summaryrefslogtreecommitdiffstats
path: root/deluge/core
diff options
context:
space:
mode:
Diffstat (limited to 'deluge/core')
-rw-r--r--deluge/core/__init__.py0
-rw-r--r--deluge/core/alertmanager.py193
-rw-r--r--deluge/core/authmanager.py285
-rw-r--r--deluge/core/core.py1302
-rw-r--r--deluge/core/daemon.py203
-rw-r--r--deluge/core/daemon_entry.py140
-rw-r--r--deluge/core/eventmanager.py66
-rw-r--r--deluge/core/filtermanager.py274
-rw-r--r--deluge/core/pluginmanager.py105
-rw-r--r--deluge/core/preferencesmanager.py476
-rw-r--r--deluge/core/rpcserver.py598
-rw-r--r--deluge/core/torrent.py1563
-rw-r--r--deluge/core/torrentmanager.py1700
13 files changed, 6905 insertions, 0 deletions
diff --git a/deluge/core/__init__.py b/deluge/core/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/deluge/core/__init__.py
diff --git a/deluge/core/alertmanager.py b/deluge/core/alertmanager.py
new file mode 100644
index 0000000..cf541f0
--- /dev/null
+++ b/deluge/core/alertmanager.py
@@ -0,0 +1,193 @@
+#
+# Copyright (C) 2007-2009 Andrew Resch <andrewresch@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+"""
+
+The AlertManager handles all the libtorrent alerts.
+
+This should typically only be used by the Core. Plugins should utilize the
+:mod:`EventManager` for similar functionality.
+
+"""
+import contextlib
+import logging
+import threading
+import time
+from collections import defaultdict
+from functools import partial
+from typing import Any, Callable
+
+from twisted.internet import reactor, task, threads
+
+import deluge.component as component
+from deluge._libtorrent import lt
+from deluge.common import decode_bytes
+
+log = logging.getLogger(__name__)
+
+
class AlertManager(component.Component):
    """AlertManager fetches and processes libtorrent alerts.

    Alerts are polled in a dedicated daemon thread and dispatched to the
    registered handlers on the reactor thread.
    """

    def __init__(self):
        log.debug('AlertManager init...')
        component.Component.__init__(self, 'AlertManager')
        # The libtorrent session owned by the Core component.
        self.session = component.get('Core').session

        # Increase the alert queue size so that alerts don't get lost.
        self.alert_queue_size = 10000
        self.set_alert_queue_size(self.alert_queue_size)

        # Categories of libtorrent alerts this manager subscribes to.
        alert_mask = (
            lt.alert.category_t.error_notification
            | lt.alert.category_t.port_mapping_notification
            | lt.alert.category_t.storage_notification
            | lt.alert.category_t.tracker_notification
            | lt.alert.category_t.status_notification
            | lt.alert.category_t.ip_block_notification
            | lt.alert.category_t.performance_warning
            | lt.alert.category_t.file_progress_notification
        )

        self.session.apply_settings({'alert_mask': alert_mask})

        # handlers is a dictionary of lists {"alert_type": [handler1,h2,..]}
        self.handlers = defaultdict(list)
        # Seconds a dispatched handler may run before its deferred times out.
        self.handlers_timeout_secs = 2
        # Deferreds for handler calls that have not completed yet.
        self.delayed_calls = []
        # Gate for the poller thread: set while alerts may be processed,
        # cleared while the component is paused.
        self._event = threading.Event()

    def update(self):
        # Nothing to do periodically; alerts are handled by the poller thread.
        pass

    def start(self):
        """Start the alert poller thread and enable alert processing."""
        # Daemon thread so it cannot block interpreter shutdown.
        thread = threading.Thread(
            target=self.wait_for_alert_in_thread, name='alert-poller', daemon=True
        )
        thread.start()
        self._event.set()

    def stop(self):
        """Cancel any handler calls still pending on the reactor."""
        self.cancel_delayed_calls()

    def pause(self):
        # Block the poller thread from dispatching further alerts.
        self._event.clear()

    def resume(self):
        # Re-enable alert dispatching.
        self._event.set()

    def wait_for_alert_in_thread(self):
        """Poller loop run in the dedicated alert thread.

        Blocks on the libtorrent alert queue and hands processing over to
        the reactor thread whenever alerts are available.
        """
        while self._component_state not in ('Stopping', 'Stopped'):
            # Back off while previously dispatched handlers are still running.
            if self.check_delayed_calls():
                time.sleep(0.05)
                continue

            # Wait up to 1000 ms for an alert to arrive in the session queue.
            if self.session.wait_for_alert(1000) is None:
                continue
            # Blocks here while the component is paused (_event cleared).
            if self._event.wait():
                threads.blockingCallFromThread(reactor, self.maybe_handle_alerts)

    def on_delayed_call_timeout(self, result, timeout, **kwargs):
        # Invoked by Deferred.addTimeout when a handler exceeded its budget.
        log.warning('Alert handler was timed-out before being called %s', kwargs)

    def cancel_delayed_calls(self):
        """Cancel all delayed handlers."""
        for delayed_call in self.delayed_calls:
            delayed_call.cancel()
        self.delayed_calls = []

    def check_delayed_calls(self) -> bool:
        """Returns True if any handler calls are delayed."""
        # Prune deferreds that have already fired.
        self.delayed_calls = [dc for dc in self.delayed_calls if not dc.called]
        return len(self.delayed_calls) > 0

    def maybe_handle_alerts(self) -> None:
        # Only process alerts while fully started (not stopping/pausing).
        if self._component_state != 'Started':
            return

        self.handle_alerts()

    def register_handler(self, alert_type: str, handler: Callable[[Any], None]) -> None:
        """
        Registers a function that will be called when 'alert_type' is pop'd
        in handle_alerts. The handler function should look like: handler(alert)
        Where 'alert' is the actual alert object from libtorrent.

        Args:
            alert_type: String representation of the libtorrent alert name.
                Can be supplied with or without `_alert` suffix.
            handler: Callback function when the alert is raised.
        """
        # Normalize to the bare alert name, matching alert.what().
        if alert_type and alert_type.endswith('_alert'):
            alert_type = alert_type[: -len('_alert')]

        self.handlers[alert_type].append(handler)
        log.debug('Registered handler for alert %s', alert_type)

    def deregister_handler(self, handler: Callable[[Any], None]):
        """
        De-registers the `handler` function from all alert types.

        Args:
            handler: The handler function to deregister.
        """
        # Removing from every list; suppress ValueError where not registered.
        for alert_type_handlers in self.handlers.values():
            with contextlib.suppress(ValueError):
                alert_type_handlers.remove(handler)

    def handle_alerts(self):
        """
        Pops all libtorrent alerts in the session queue and handles them appropriately.
        """
        alerts = self.session.pop_alerts()
        if not alerts:
            return

        num_alerts = len(alerts)
        if log.isEnabledFor(logging.DEBUG):
            log.debug('Alerts queued: %s', num_alerts)
        # Warn when the queue approaches capacity; further alerts may be lost.
        if num_alerts > 0.9 * self.alert_queue_size:
            log.warning(
                'Warning total alerts queued, %s, passes 90%% of queue size.',
                num_alerts,
            )

        for alert in alerts:
            alert_type = alert.what()

            # Display the alert message
            if log.isEnabledFor(logging.DEBUG):
                log.debug('%s: %s', alert_type, decode_bytes(alert.message()))

            if alert_type not in self.handlers:
                continue

            # Call any handlers for this alert type
            for handler in self.handlers[alert_type]:
                if log.isEnabledFor(logging.DEBUG):
                    log.debug('Handling alert: %s', alert_type)
                # Dispatch on the reactor with a per-handler timeout so a
                # stuck handler cannot stall alert processing indefinitely.
                d = task.deferLater(reactor, 0, handler, alert)
                on_handler_timeout = partial(
                    self.on_delayed_call_timeout,
                    handler=handler.__qualname__,
                    alert_type=alert_type,
                )
                d.addTimeout(
                    self.handlers_timeout_secs,
                    reactor,
                    onTimeoutCancel=on_handler_timeout,
                )
                self.delayed_calls.append(d)

    def set_alert_queue_size(self, queue_size):
        """Sets the maximum size of the libtorrent alert queue"""
        log.info('Alert Queue Size set to %s', queue_size)
        self.alert_queue_size = queue_size
        component.get('Core').apply_session_setting(
            'alert_queue_size', self.alert_queue_size
        )
diff --git a/deluge/core/authmanager.py b/deluge/core/authmanager.py
new file mode 100644
index 0000000..3ff8a3a
--- /dev/null
+++ b/deluge/core/authmanager.py
@@ -0,0 +1,285 @@
+#
+# Copyright (C) 2009 Andrew Resch <andrewresch@gmail.com>
+# Copyright (C) 2011 Pedro Algarvio <pedro@algarvio.me>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+import logging
+import os
+import shutil
+
+import deluge.component as component
+import deluge.configmanager as configmanager
+from deluge.common import (
+ AUTH_LEVEL_ADMIN,
+ AUTH_LEVEL_DEFAULT,
+ AUTH_LEVEL_NONE,
+ AUTH_LEVEL_NORMAL,
+ AUTH_LEVEL_READONLY,
+ create_localclient_account,
+)
+from deluge.error import AuthenticationRequired, AuthManagerError, BadLoginError
+
+log = logging.getLogger(__name__)
+
+AUTH_LEVELS_MAPPING = {
+ 'NONE': AUTH_LEVEL_NONE,
+ 'READONLY': AUTH_LEVEL_READONLY,
+ 'DEFAULT': AUTH_LEVEL_DEFAULT,
+ 'NORMAL': AUTH_LEVEL_NORMAL,
+ 'ADMIN': AUTH_LEVEL_ADMIN,
+}
+AUTH_LEVELS_MAPPING_REVERSE = {v: k for k, v in AUTH_LEVELS_MAPPING.items()}
+
+
class Account:
    """A single auth-file entry: username, password and privilege level."""

    __slots__ = ('username', 'password', 'authlevel')

    def __init__(self, username, password, authlevel):
        self.username = username
        self.password = password
        self.authlevel = authlevel

    def data(self):
        """Return the account as a plain dict suitable for RPC transfer."""
        return dict(
            username=self.username,
            password=self.password,
            authlevel=AUTH_LEVELS_MAPPING_REVERSE[self.authlevel],
            authlevel_int=self.authlevel,
        )

    def __repr__(self):
        template = '<Account username="{username}" authlevel={authlevel}>'
        return template.format(username=self.username, authlevel=self.authlevel)
+
+
class AuthManager(component.Component):
    """Manages the accounts in the 'auth' file and authorizes RPC logins."""

    def __init__(self):
        # interval=10: update() polls the auth file for changes every 10s.
        component.Component.__init__(self, 'AuthManager', interval=10)
        # Maps username -> Account, populated from the auth file.
        self.__auth = {}
        # mtime of the auth file when last parsed; used to skip reloads.
        self.__auth_modification_time = None

    def start(self):
        """Load the accounts from the auth file on component start."""
        self.__load_auth_file()

    def stop(self):
        """Forget all loaded accounts."""
        self.__auth = {}

    def shutdown(self):
        # No extra cleanup needed beyond what stop() already does.
        pass

    def update(self):
        """Periodic check that reloads the auth file if missing or modified."""
        auth_file = configmanager.get_config_dir('auth')
        # Check for auth file and create if necessary
        if not os.path.isfile(auth_file):
            log.info('Authfile not found, recreating it.')
            self.__load_auth_file()
            return

        auth_file_modification_time = os.stat(auth_file).st_mtime
        if self.__auth_modification_time != auth_file_modification_time:
            log.info('Auth file changed, reloading it!')
            self.__load_auth_file()

    def authorize(self, username, password):
        """Authorizes users based on username and password.

        Args:
            username (str): Username
            password (str): Password

        Returns:
            int: The auth level for this user.

        Raises:
            AuthenticationRequired: If additional details are required to authenticate.
            BadLoginError: If the username does not exist or password does not match.

        """
        if not username:
            raise AuthenticationRequired(
                'Username and Password are required.', username
            )

        if username not in self.__auth:
            # Let's try to re-load the file.. Maybe it's been updated
            self.__load_auth_file()
            if username not in self.__auth:
                raise BadLoginError('Username does not exist', username)

        if self.__auth[username].password == password:
            # Return the users auth level
            return self.__auth[username].authlevel
        elif not password and self.__auth[username].password:
            # An empty password for an account that has one set means the
            # client should be prompted for it rather than rejected outright.
            raise AuthenticationRequired('Password is required', username)
        else:
            raise BadLoginError('Password does not match', username)

    def has_account(self, username):
        """Return True if `username` exists among the loaded accounts."""
        return username in self.__auth

    def get_known_accounts(self):
        """Returns a list of known deluge usernames."""
        # Reload first so recently added accounts are included.
        self.__load_auth_file()
        return [account.data() for account in self.__auth.values()]

    def create_account(self, username, password, authlevel):
        """Add a new account and persist it to the auth file.

        Args:
            username (str): The new account name; must not already exist.
            password (str): The account password.
            authlevel (str): Auth level name, a key of AUTH_LEVELS_MAPPING.

        Returns:
            bool: True on success.

        Raises:
            AuthManagerError: If the username is taken or authlevel invalid.
        """
        if username in self.__auth:
            raise AuthManagerError('Username in use.', username)
        if authlevel not in AUTH_LEVELS_MAPPING:
            raise AuthManagerError('Invalid auth level: %s' % authlevel)
        try:
            self.__auth[username] = Account(
                username, password, AUTH_LEVELS_MAPPING[authlevel]
            )
            self.write_auth_file()
            return True
        except Exception as ex:
            log.exception(ex)
            raise ex

    def update_account(self, username, password, authlevel):
        """Change the password and auth level of an existing account.

        Args:
            username (str): The existing account name.
            password (str): The new password.
            authlevel (str): Auth level name, a key of AUTH_LEVELS_MAPPING.

        Returns:
            bool: True on success.

        Raises:
            AuthManagerError: If the username is unknown or authlevel invalid.
        """
        if username not in self.__auth:
            raise AuthManagerError('Username not known', username)
        if authlevel not in AUTH_LEVELS_MAPPING:
            raise AuthManagerError('Invalid auth level: %s' % authlevel)
        try:
            self.__auth[username].username = username
            self.__auth[username].password = password
            self.__auth[username].authlevel = AUTH_LEVELS_MAPPING[authlevel]
            self.write_auth_file()
            return True
        except Exception as ex:
            log.exception(ex)
            raise ex

    def remove_account(self, username):
        """Delete an account and persist the change to the auth file.

        Raises:
            AuthManagerError: If the username is unknown, or it is the user
                currently logged in to this RPC session.
        """
        if username not in self.__auth:
            raise AuthManagerError('Username not known', username)
        elif username == component.get('RPCServer').get_session_user():
            raise AuthManagerError(
                'You cannot delete your own account while logged in!', username
            )

        del self.__auth[username]
        self.write_auth_file()
        return True

    def write_auth_file(self):
        """Write all accounts as 'username:password:level' lines.

        Writes to a temp file and renames it into place, keeping a .bak copy
        of the previous file to restore from if the write fails.
        """
        filename = 'auth'
        filepath = os.path.join(configmanager.get_config_dir(), filename)
        filepath_bak = filepath + '.bak'
        filepath_tmp = filepath + '.tmp'

        try:
            if os.path.isfile(filepath):
                log.debug('Creating backup of %s at: %s', filename, filepath_bak)
                shutil.copy2(filepath, filepath_bak)
        except OSError as ex:
            log.error('Unable to backup %s to %s: %s', filepath, filepath_bak, ex)
        else:
            log.info('Saving the %s at: %s', filename, filepath)
            try:
                with open(filepath_tmp, 'w', encoding='utf8') as _file:
                    for account in self.__auth.values():
                        _file.write(
                            '%(username)s:%(password)s:%(authlevel_int)s\n'
                            % account.data()
                        )
                    # Flush to disk before the rename below.
                    _file.flush()
                    os.fsync(_file.fileno())
                shutil.move(filepath_tmp, filepath)
            except OSError as ex:
                log.error('Unable to save %s: %s', filename, ex)
                if os.path.isfile(filepath_bak):
                    log.info('Restoring backup of %s from: %s', filename, filepath_bak)
                    shutil.move(filepath_bak, filepath)

        # Re-parse the file so the cached state matches what is now on disk.
        self.__load_auth_file()

    def __load_auth_file(self):
        """Parse the auth file into self.__auth, creating it if missing.

        Falls back to the .bak file if the main file cannot be read, and
        rewrites the file when old two-field entries are encountered.
        """
        save_and_reload = False
        filename = 'auth'
        auth_file = configmanager.get_config_dir(filename)
        auth_file_bak = auth_file + '.bak'

        # Check for auth file and create if necessary
        if not os.path.isfile(auth_file):
            create_localclient_account()
            return self.__load_auth_file()

        auth_file_modification_time = os.stat(auth_file).st_mtime
        if self.__auth_modification_time is None:
            self.__auth_modification_time = auth_file_modification_time
        elif self.__auth_modification_time == auth_file_modification_time:
            # File didn't change, no need for re-parsing
            return

        for _filepath in (auth_file, auth_file_bak):
            log.info('Opening %s for load: %s', filename, _filepath)
            try:
                with open(_filepath, encoding='utf8') as _file:
                    file_data = _file.readlines()
            except OSError as ex:
                log.warning('Unable to load %s: %s', _filepath, ex)
                file_data = []
            else:
                log.info('Successfully loaded %s: %s', filename, _filepath)
                break

        # Load the auth file into a dictionary: {username: Account(...)}
        for line in file_data:
            line = line.strip()
            if line.startswith('#') or not line:
                # This line is a comment or empty
                continue
            lsplit = line.split(':')
            if len(lsplit) == 2:
                # Old two-field format without an auth level.
                username, password = lsplit
                log.warning(
                    'Your auth entry for %s contains no auth level, '
                    'using AUTH_LEVEL_DEFAULT(%s)..',
                    username,
                    AUTH_LEVEL_DEFAULT,
                )
                if username == 'localclient':
                    authlevel = AUTH_LEVEL_ADMIN
                else:
                    authlevel = AUTH_LEVEL_DEFAULT
                # This is probably an old auth file
                save_and_reload = True
            elif len(lsplit) == 3:
                username, password, authlevel = lsplit
            else:
                log.error('Your auth file is malformed: Incorrect number of fields!')
                continue

            username = username.strip()
            password = password.strip()
            try:
                # The level may be stored as an int or as a level name.
                authlevel = int(authlevel)
            except ValueError:
                try:
                    authlevel = AUTH_LEVELS_MAPPING[authlevel]
                except KeyError:
                    log.error(
                        'Your auth file is malformed: %r is not a valid auth level',
                        authlevel,
                    )
                    continue

            self.__auth[username] = Account(username, password, authlevel)

        if 'localclient' not in self.__auth:
            # The daemon's own local account must always exist; recreate it.
            create_localclient_account(True)
            return self.__load_auth_file()

        if save_and_reload:
            log.info('Re-writing auth file (upgrade)')
            self.write_auth_file()
        self.__auth_modification_time = auth_file_modification_time
diff --git a/deluge/core/core.py b/deluge/core/core.py
new file mode 100644
index 0000000..e2130f5
--- /dev/null
+++ b/deluge/core/core.py
@@ -0,0 +1,1302 @@
+#
+# Copyright (C) 2007-2009 Andrew Resch <andrewresch@gmail.com>
+# Copyright (C) 2011 Pedro Algarvio <pedro@algarvio.me>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+import glob
+import logging
+import os
+import shutil
+import tempfile
+from base64 import b64decode, b64encode
+from typing import Any, Dict, List, Optional, Tuple, Union
+from urllib.request import URLError, urlopen
+
+from twisted.internet import defer, reactor, task, threads
+from twisted.web.client import Agent, readBody
+
+import deluge.common
+import deluge.component as component
+from deluge import metafile, path_chooser_common
+from deluge._libtorrent import LT_VERSION, lt
+from deluge.configmanager import ConfigManager, get_config_dir
+from deluge.core.alertmanager import AlertManager
+from deluge.core.authmanager import (
+ AUTH_LEVEL_ADMIN,
+ AUTH_LEVEL_NONE,
+ AUTH_LEVELS_MAPPING,
+ AUTH_LEVELS_MAPPING_REVERSE,
+ AuthManager,
+)
+from deluge.core.eventmanager import EventManager
+from deluge.core.filtermanager import FilterManager
+from deluge.core.pluginmanager import PluginManager
+from deluge.core.preferencesmanager import PreferencesManager
+from deluge.core.rpcserver import export
+from deluge.core.torrentmanager import TorrentManager
+from deluge.decorators import deprecated, maybe_coroutine
+from deluge.error import (
+ AddTorrentError,
+ DelugeError,
+ InvalidPathError,
+ InvalidTorrentError,
+)
+from deluge.event import (
+ NewVersionAvailableEvent,
+ SessionPausedEvent,
+ SessionResumedEvent,
+ TorrentQueueChangedEvent,
+)
+from deluge.httpdownloader import download_file
+
+log = logging.getLogger(__name__)
+
+DEPR_SESSION_STATUS_KEYS = {
+ # 'active_requests': None, # In dht_stats_alert, if required.
+ 'allowed_upload_slots': 'ses.num_unchoke_slots',
+ # 'dht_global_nodes': None,
+ 'dht_node_cache': 'dht.dht_node_cache',
+ 'dht_nodes': 'dht.dht_nodes',
+ 'dht_torrents': 'dht.dht_torrents',
+ # 'dht_total_allocations': None,
+ 'down_bandwidth_bytes_queue': 'net.limiter_down_bytes',
+ 'down_bandwidth_queue': 'net.limiter_down_queue',
+ 'has_incoming_connections': 'net.has_incoming_connections',
+ 'num_peers': 'peer.num_peers_connected',
+ 'num_unchoked': 'peer.num_peers_up_unchoked',
+ # 'optimistic_unchoke_counter': None, # lt.settings_pack
+ 'total_dht_download': 'dht.dht_bytes_in',
+ 'total_dht_upload': 'dht.dht_bytes_out',
+ 'total_download': 'net.recv_bytes',
+ 'total_failed_bytes': 'net.recv_failed_bytes',
+ 'total_ip_overhead_download': 'net.recv_ip_overhead_bytes',
+ 'total_ip_overhead_upload': 'net.sent_ip_overhead_bytes',
+ 'total_payload_download': 'net.recv_payload_bytes',
+ 'total_payload_upload': 'net.sent_payload_bytes',
+ 'total_redundant_bytes': 'net.recv_redundant_bytes',
+ 'total_tracker_download': 'net.recv_tracker_bytes',
+ 'total_tracker_upload': 'net.sent_tracker_bytes',
+ 'total_upload': 'net.sent_bytes',
+ # 'unchoke_counter': None, # lt.settings_pack
+ 'up_bandwidth_bytes_queue': 'net.limiter_up_bytes',
+ 'up_bandwidth_queue': 'net.limiter_up_queue',
+ # 'utp_stats': None
+}
+
+# Session status rate keys associated with session status counters.
+SESSION_RATES_MAPPING = {
+ 'dht_download_rate': 'dht.dht_bytes_in',
+ 'dht_upload_rate': 'dht.dht_bytes_out',
+ 'ip_overhead_download_rate': 'net.recv_ip_overhead_bytes',
+ 'ip_overhead_upload_rate': 'net.sent_ip_overhead_bytes',
+ 'payload_download_rate': 'net.recv_payload_bytes',
+ 'payload_upload_rate': 'net.sent_payload_bytes',
+ 'tracker_download_rate': 'net.recv_tracker_bytes',
+ 'tracker_upload_rate': 'net.sent_tracker_bytes',
+ 'download_rate': 'net.recv_bytes',
+ 'upload_rate': 'net.sent_bytes',
+}
+
+DELUGE_VER = deluge.common.get_version()
+
+
+class Core(component.Component):
    def __init__(
        self, listen_interface=None, outgoing_interface=None, read_only_config_keys=None
    ):
        """Initialize the Core: libtorrent session, sub-components and config.

        Args:
            listen_interface (str, optional): IP address or interface name to
                listen on, overriding the configured value for this run.
            outgoing_interface (str, optional): IP address or interface name
                for outgoing connections, overriding the configured value.
            read_only_config_keys (list, optional): Config keys that the
                set_config() RPC must not modify.
        """
        component.Component.__init__(self, 'Core')

        # Start the libtorrent session.
        user_agent = f'Deluge/{DELUGE_VER} libtorrent/{LT_VERSION}'
        peer_id = self._create_peer_id(DELUGE_VER)
        log.debug('Starting session (peer_id: %s, user_agent: %s)', peer_id, user_agent)
        settings_pack = {
            'peer_fingerprint': peer_id,
            'user_agent': user_agent,
            'ignore_resume_timestamps': True,
        }
        # NOTE(review): flags=0 presumably skips lt's default session
        # add-ons since extensions are added explicitly below — confirm
        # against the libtorrent session docs.
        self.session = lt.session(settings_pack, flags=0)

        # Load the settings, if available.
        self._load_session_state()

        # Enable libtorrent extensions
        # Allows peers to download the metadata from the swarm directly
        self.session.add_extension('ut_metadata')
        # Ban peers that sends bad data
        self.session.add_extension('smart_ban')

        # Create the components
        self.eventmanager = EventManager()
        self.preferencesmanager = PreferencesManager()
        self.alertmanager = AlertManager()
        self.pluginmanager = PluginManager(self)
        self.torrentmanager = TorrentManager()
        self.filtermanager = FilterManager(self)
        self.authmanager = AuthManager()

        # New release check information
        self.new_release = None

        # External IP Address from libtorrent
        self.external_ip = None
        self.eventmanager.register_event_handler(
            'ExternalIPEvent', self._on_external_ip_event
        )

        # GeoIP instance with db loaded
        self.geoip_instance = None

        # These keys will be dropped from the set_config() RPC and are
        # configurable from the command-line.
        self.read_only_config_keys = read_only_config_keys
        log.debug('read_only_config_keys: %s', read_only_config_keys)

        # Get the core config
        self.config = ConfigManager('core.conf')
        self.config.save()

        # If there was an interface value from the command line, use it, but
        # store the one in the config so we can restore it on shutdown
        self._old_listen_interface = None
        if listen_interface:
            if deluge.common.is_interface(listen_interface):
                self._old_listen_interface = self.config['listen_interface']
                self.config['listen_interface'] = listen_interface
            else:
                log.error(
                    'Invalid listen interface (must be IP Address or Interface Name): %s',
                    listen_interface,
                )

        self._old_outgoing_interface = None
        if outgoing_interface:
            if deluge.common.is_interface(outgoing_interface):
                self._old_outgoing_interface = self.config['outgoing_interface']
                self.config['outgoing_interface'] = outgoing_interface
            else:
                log.error(
                    'Invalid outgoing interface (must be IP Address or Interface Name): %s',
                    outgoing_interface,
                )

        # New release check information
        self.__new_release = None

        # Session status timer
        # Seed session_status with every lt stats counter, then add the
        # derived rate and cache hit-ratio keys computed by this class.
        self.session_status = {k.name: 0 for k in lt.session_stats_metrics()}
        self._session_prev_bytes = {k: 0 for k in SESSION_RATES_MAPPING}
        # Initiate other session status keys.
        self.session_status.update(self._session_prev_bytes)
        hit_ratio_keys = ['write_hit_ratio', 'read_hit_ratio']
        self.session_status.update({k: 0.0 for k in hit_ratio_keys})

        self.session_status_timer_interval = 0.5
        self.session_status_timer = task.LoopingCall(self.session.post_session_stats)
        self.alertmanager.register_handler(
            'session_stats', self._on_alert_session_stats
        )
        self.session_rates_timer_interval = 2
        self.session_rates_timer = task.LoopingCall(self._update_session_rates)
+
+ def start(self):
+ """Starts the core"""
+ self.session_status_timer.start(self.session_status_timer_interval)
+ self.session_rates_timer.start(self.session_rates_timer_interval, now=False)
+
    def stop(self):
        """Stop timers, persist session state and restore config overrides."""
        log.debug('Core stopping...')

        if self.session_status_timer.running:
            self.session_status_timer.stop()

        if self.session_rates_timer.running:
            self.session_rates_timer.stop()

        # Save the libtorrent session state
        self._save_session_state()

        # We stored a copy of the old interface value
        if self._old_listen_interface is not None:
            self.config['listen_interface'] = self._old_listen_interface

        if self._old_outgoing_interface is not None:
            self.config['outgoing_interface'] = self._old_outgoing_interface

        # Make sure the config file has been saved
        self.config.save()
+
    def shutdown(self):
        # Nothing extra to do on daemon shutdown; stop() handles persistence.
        pass
+
    def apply_session_setting(self, key, value):
        """Apply a single libtorrent session setting.

        Args:
            key (str): The lt session setting name.
            value: The value to set it to.
        """
        self.apply_session_settings({key: value})
+
    def apply_session_settings(self, settings: dict) -> None:
        """Apply libtorrent session settings.

        Args:
            settings: A dict of lt session settings to apply.
        """
        self.session.apply_settings(settings)
+
+ @staticmethod
+ def _create_peer_id(version: str) -> str:
+ """Create a peer_id fingerprint.
+
+ This creates the peer_id and modifies the release char to identify
+ pre-release and development version. Using ``D`` for dev, daily or
+ nightly builds, ``a, b, r`` for pre-releases and ``s`` for
+ stable releases.
+
+ Examples:
+ ``--<client><client><major><minor><micro><release>--``
+ ``--DE200D--`` (development version of 2.0.0)
+ ``--DE200s--`` (stable release of v2.0.0)
+ ``--DE201b--`` (beta pre-release of v2.0.1)
+
+ Args:
+ version: The version string in PEP440 dotted notation.
+
+ Returns:
+ The formatted peer_id with Deluge prefix e.g. '--DE200s--'
+ """
+ split = deluge.common.VersionSplit(version)
+ # Fill list with zeros to length of 4 and use lt to create fingerprint.
+ version_list = split.version + [0] * (4 - len(split.version))
+ peer_id = lt.generate_fingerprint('DE', *version_list)
+
+ def substitute_chr(string, idx, char):
+ """Fast substitute single char in string."""
+ return string[:idx] + char + string[idx + 1 :]
+
+ if split.dev:
+ release_chr = 'D'
+ elif split.suffix:
+ # a (alpha), b (beta) or r (release candidate).
+ release_chr = split.suffix[0].lower()
+ else:
+ release_chr = 's'
+ peer_id = substitute_chr(peer_id, 6, release_chr)
+
+ return peer_id
+
    def _save_session_state(self):
        """Saves the libtorrent session state.

        Writes the bencoded state to a temp file and renames it into place,
        keeping a .bak copy of the previous file to restore if saving fails.
        """
        filename = 'session.state'
        filepath = get_config_dir(filename)
        filepath_bak = filepath + '.bak'
        filepath_tmp = filepath + '.tmp'

        try:
            if os.path.isfile(filepath):
                log.debug('Creating backup of %s at: %s', filename, filepath_bak)
                shutil.copy2(filepath, filepath_bak)
        except OSError as ex:
            log.error('Unable to backup %s to %s: %s', filepath, filepath_bak, ex)
        else:
            log.info('Saving the %s at: %s', filename, filepath)
            try:
                with open(filepath_tmp, 'wb') as _file:
                    _file.write(lt.bencode(self.session.save_state()))
                    # Flush to disk before the rename below.
                    _file.flush()
                    os.fsync(_file.fileno())
                shutil.move(filepath_tmp, filepath)
            except (OSError, EOFError) as ex:
                log.error('Unable to save %s: %s', filename, ex)
                if os.path.isfile(filepath_bak):
                    log.info('Restoring backup of %s from: %s', filename, filepath_bak)
                    shutil.move(filepath_bak, filepath)
+
+ def _load_session_state(self) -> dict:
+ """Loads the libtorrent session state
+
+ Returns:
+ A libtorrent sesion state, empty dict if unable to load it.
+ """
+ filename = 'session.state'
+ filepath = get_config_dir(filename)
+ filepath_bak = filepath + '.bak'
+
+ for _filepath in (filepath, filepath_bak):
+ log.debug('Opening %s for load: %s', filename, _filepath)
+ try:
+ with open(_filepath, 'rb') as _file:
+ state = lt.bdecode(_file.read())
+ except (OSError, EOFError, RuntimeError) as ex:
+ log.warning('Unable to load %s: %s', _filepath, ex)
+ else:
+ log.info('Successfully loaded %s: %s', filename, _filepath)
+ self.session.load_state(state)
+
    def _on_alert_session_stats(self, alert):
        """The handler for libtorrent session stats alert."""
        # alert.values maps lt stats metric names to their counter values.
        self.session_status.update(alert.values)
        # Recompute the derived cache hit-ratio keys from the new counters.
        self._update_session_cache_hit_ratio()
+
+ def _update_session_cache_hit_ratio(self):
+ """Calculates the cache read/write hit ratios for session_status."""
+ blocks_written = self.session_status['disk.num_blocks_written']
+ blocks_read = self.session_status['disk.num_blocks_read']
+
+ if blocks_written:
+ self.session_status['write_hit_ratio'] = (
+ blocks_written - self.session_status['disk.num_write_ops']
+ ) / blocks_written
+ else:
+ self.session_status['write_hit_ratio'] = 0.0
+
+ if blocks_read:
+ self.session_status['read_hit_ratio'] = (
+ blocks_read - self.session_status['disk.num_read_ops']
+ ) / blocks_read
+ else:
+ self.session_status['read_hit_ratio'] = 0.0
+
+ def _update_session_rates(self):
+ """Calculate session status rates.
+
+ Uses polling interval and counter difference for session_status rates.
+ """
+ for rate_key, prev_bytes in list(self._session_prev_bytes.items()):
+ new_bytes = self.session_status[SESSION_RATES_MAPPING[rate_key]]
+ self.session_status[rate_key] = (
+ new_bytes - prev_bytes
+ ) / self.session_rates_timer_interval
+ # Store current value for next update.
+ self._session_prev_bytes[rate_key] = new_bytes
+
+ def get_new_release(self):
+ log.debug('get_new_release')
+ try:
+ self.new_release = (
+ urlopen('http://download.deluge-torrent.org/version-2.0')
+ .read()
+ .decode()
+ .strip()
+ )
+ except URLError as ex:
+ log.debug('Unable to get release info from website: %s', ex)
+ else:
+ self.check_new_release()
+
+ def check_new_release(self):
+ if self.new_release:
+ log.debug('new_release: %s', self.new_release)
+ if deluge.common.VersionSplit(
+ self.new_release
+ ) > deluge.common.VersionSplit(deluge.common.get_version()):
+ component.get('EventManager').emit(
+ NewVersionAvailableEvent(self.new_release)
+ )
+ return self.new_release
+ return False
+
+ # Exported Methods
+ @export
+ def add_torrent_file_async(
+ self, filename: str, filedump: str, options: dict, save_state: bool = True
+ ) -> 'defer.Deferred[Optional[str]]':
+ """Adds a torrent file to the session asynchronously.
+
+ Args:
+ filename: The filename of the torrent.
+ filedump: A base64 encoded string of torrent file contents.
+ options: The options to apply to the torrent upon adding.
+ save_state: If the state should be saved after adding the file.
+
+ Returns:
+ The torrent ID or None.
+ """
+ try:
+ filedump = b64decode(filedump)
+ except TypeError as ex:
+ log.error('There was an error decoding the filedump string: %s', ex)
+
+ try:
+ d = self.torrentmanager.add_async(
+ filedump=filedump,
+ options=options,
+ filename=filename,
+ save_state=save_state,
+ )
+ except RuntimeError as ex:
+ log.error('There was an error adding the torrent file %s: %s', filename, ex)
+ raise
+ else:
+ return d
+
    @export
    @maybe_coroutine
    async def prefetch_magnet_metadata(
        self, magnet: str, timeout: int = 30
    ) -> Tuple[str, bytes]:
        """Download magnet metadata without adding to Deluge session.

        Used by UIs to get magnet files for selection before adding to session.

        The metadata is bencoded and for transfer base64 encoded.

        Args:
            magnet: The magnet URI.
            timeout: Number of seconds to wait before canceling request.

        Returns:
            A tuple of (torrent_id, metadata) for the magnet.

        """
        # Fully delegated; the TorrentManager performs the fetch and timeout.
        return await self.torrentmanager.prefetch_metadata(magnet, timeout)
+
+ @export
+ def add_torrent_file(
+ self, filename: str, filedump: Union[str, bytes], options: dict
+ ) -> Optional[str]:
+ """Adds a torrent file to the session.
+
+ Args:
+ filename: The filename of the torrent.
+ filedump: A base64 encoded string of the torrent file contents.
+ options: The options to apply to the torrent upon adding.
+
+ Returns:
+ The torrent_id or None.
+ """
+ try:
+ filedump = b64decode(filedump)
+ except Exception as ex:
+ log.error('There was an error decoding the filedump string: %s', ex)
+
+ try:
+ return self.torrentmanager.add(
+ filedump=filedump, options=options, filename=filename
+ )
+ except RuntimeError as ex:
+ log.error('There was an error adding the torrent file %s: %s', filename, ex)
+ raise
+
+ @export
+ def add_torrent_files(
+ self, torrent_files: List[Tuple[str, Union[str, bytes], dict]]
+ ) -> 'defer.Deferred[List[AddTorrentError]]':
+ """Adds multiple torrent files to the session asynchronously.
+
+ Args:
+ torrent_files: Torrent files as tuple of
+ ``(filename, filedump, options)``.
+
+ Returns:
+ A list of errors (if there were any)
+ """
+
+ @maybe_coroutine
+ async def add_torrents():
+ errors = []
+ last_index = len(torrent_files) - 1
+ for idx, torrent in enumerate(torrent_files):
+ try:
+ await self.add_torrent_file_async(
+ torrent[0], torrent[1], torrent[2], save_state=idx == last_index
+ )
+ except AddTorrentError as ex:
+ log.warning('Error when adding torrent: %s', ex)
+ errors.append(ex)
+ defer.returnValue(errors)
+
+ return task.deferLater(reactor, 0, add_torrents)
+
+ @export
+ @maybe_coroutine
+ async def add_torrent_url(
+ self, url: str, options: dict, headers: dict = None
+ ) -> 'defer.Deferred[Optional[str]]':
+ """Adds a torrent from a URL. Deluge will attempt to fetch the torrent
+ from the URL prior to adding it to the session.
+
+ Args:
+ url: the URL pointing to the torrent file
+ options: the options to apply to the torrent on add
+ headers: any optional headers to send
+
+ Returns:
+ a Deferred which returns the torrent_id as a str or None
+ """
+ log.info('Attempting to add URL %s', url)
+
+ tmp_fd, tmp_file = tempfile.mkstemp(prefix='deluge_url.', suffix='.torrent')
+ try:
+ filename = await download_file(
+ url, tmp_file, headers=headers, force_filename=True
+ )
+ except Exception:
+ log.error('Failed to add torrent from URL %s', url)
+ raise
+ else:
+ with open(filename, 'rb') as _file:
+ data = _file.read()
+ return self.add_torrent_file(filename, b64encode(data), options)
+ finally:
+ try:
+ os.close(tmp_fd)
+ os.remove(tmp_file)
+ except OSError as ex:
+ log.warning(f'Unable to delete temp file {tmp_file}: , {ex}')
+
    @export
    def add_torrent_magnet(self, uri: str, options: dict) -> str:
        """Adds a torrent from a magnet link.

        Args:
            uri: the magnet link
            options: the options to apply to the torrent on add

        Returns:
            the torrent_id
        """
        log.debug('Attempting to add by magnet URI: %s', uri)

        # The TorrentManager handles parsing the magnet URI and the actual add.
        return self.torrentmanager.add(magnet=uri, options=options)
+
    @export
    def remove_torrent(self, torrent_id: str, remove_data: bool) -> bool:
        """Removes a single torrent from the session.

        Args:
            torrent_id: The torrent ID to remove.
            remove_data: If True, also remove the downloaded data.

        Returns:
            True if removed successfully.

        Raises:
            InvalidTorrentError: If the torrent ID does not exist in the session.
        """
        log.debug('Removing torrent %s from the core.', torrent_id)
        # Validation and the InvalidTorrentError raise happen inside
        # torrentmanager.remove(); this method is a thin RPC wrapper.
        return self.torrentmanager.remove(torrent_id, remove_data)
+
+ @export
+ def remove_torrents(
+ self, torrent_ids: List[str], remove_data: bool
+ ) -> 'defer.Deferred[List[Tuple[str, str]]]':
+ """Remove multiple torrents from the session.
+
+ Args:
+ torrent_ids: The torrent IDs to remove.
+ remove_data: If True, also remove the downloaded data.
+
+ Returns:
+ An empty list if no errors occurred otherwise the list contains
+ tuples of strings, a torrent ID and an error message. For example:
+
+ [('<torrent_id>', 'Error removing torrent')]
+ """
+ log.info('Removing %d torrents from core.', len(torrent_ids))
+
+ def do_remove_torrents():
+ errors = []
+ for torrent_id in torrent_ids:
+ try:
+ self.torrentmanager.remove(
+ torrent_id, remove_data=remove_data, save_state=False
+ )
+ except InvalidTorrentError as ex:
+ errors.append((torrent_id, str(ex)))
+ # Save the session state
+ self.torrentmanager.save_state()
+ if errors:
+ log.warning(
+ 'Failed to remove %d of %d torrents.', len(errors), len(torrent_ids)
+ )
+ return errors
+
+ return task.deferLater(reactor, 0, do_remove_torrents)
+
+ @export
+ def get_session_status(self, keys: List[str]) -> Dict[str, Union[int, float]]:
+ """Gets the session status values for 'keys', these keys are taking
+ from libtorrent's session status.
+
+ See: http://www.rasterbar.com/products/libtorrent/manual.html#status
+
+ Args:
+ keys: the keys for which we want values
+
+ Returns:
+ a dictionary of {key: value, ...}
+ """
+ if not keys:
+ return self.session_status
+
+ status = {}
+ for key in keys:
+ try:
+ status[key] = self.session_status[key]
+ except KeyError:
+ if key in DEPR_SESSION_STATUS_KEYS:
+ new_key = DEPR_SESSION_STATUS_KEYS[key]
+ log.debug(
+ 'Deprecated session status key %s, please use %s', key, new_key
+ )
+ status[key] = self.session_status[new_key]
+ else:
+ log.debug('Session status key not valid: %s', key)
+ return status
+
+ @export
+ def force_reannounce(self, torrent_ids: List[str]) -> None:
+ log.debug('Forcing reannouncment to: %s', torrent_ids)
+ for torrent_id in torrent_ids:
+ self.torrentmanager[torrent_id].force_reannounce()
+
+ @export
+ def pause_torrent(self, torrent_id: str) -> None:
+ """Pauses a torrent"""
+ log.debug('Pausing: %s', torrent_id)
+ if not isinstance(torrent_id, str):
+ self.pause_torrents(torrent_id)
+ else:
+ self.torrentmanager[torrent_id].pause()
+
+ @export
+ def pause_torrents(self, torrent_ids: List[str] = None) -> None:
+ """Pauses a list of torrents"""
+ if not torrent_ids:
+ torrent_ids = self.torrentmanager.get_torrent_list()
+ for torrent_id in torrent_ids:
+ self.pause_torrent(torrent_id)
+
    @export
    def connect_peer(self, torrent_id: str, ip: str, port: int):
        """Manually adds a peer to a torrent.

        Args:
            torrent_id: The torrent to add the peer to.
            ip: The peer's IP address.
            port: The peer's port.
        """
        log.debug('adding peer %s to %s', ip, torrent_id)
        # connect_peer() returns falsy on failure; only a warning is logged.
        if not self.torrentmanager[torrent_id].connect_peer(ip, port):
            log.warning('Error adding peer %s:%s to %s', ip, port, torrent_id)
+
    @export
    def move_storage(self, torrent_ids: List[str], dest: str):
        """Moves the storage of the given torrents to ``dest``.

        Args:
            torrent_ids: The torrents whose data should be moved.
            dest: The destination directory.
        """
        log.debug('Moving storage %s to %s', torrent_ids, dest)
        # Failures are per-torrent and non-fatal; each one is only logged.
        for torrent_id in torrent_ids:
            if not self.torrentmanager[torrent_id].move_storage(dest):
                log.warning('Error moving torrent %s to %s', torrent_id, dest)
+
+ @export
+ def pause_session(self) -> None:
+ """Pause the entire session"""
+ if not self.session.is_paused():
+ self.session.pause()
+ component.get('EventManager').emit(SessionPausedEvent())
+
+ @export
+ def resume_session(self) -> None:
+ """Resume the entire session"""
+ if self.session.is_paused():
+ self.session.resume()
+ for torrent_id in self.torrentmanager.torrents:
+ self.torrentmanager[torrent_id].update_state()
+ component.get('EventManager').emit(SessionResumedEvent())
+
    @export
    def is_session_paused(self) -> bool:
        """Returns whether the libtorrent session is currently paused."""
        return self.session.is_paused()
+
+ @export
+ def resume_torrent(self, torrent_id: str) -> None:
+ """Resumes a torrent"""
+ log.debug('Resuming: %s', torrent_id)
+ if not isinstance(torrent_id, str):
+ self.resume_torrents(torrent_id)
+ else:
+ self.torrentmanager[torrent_id].resume()
+
+ @export
+ def resume_torrents(self, torrent_ids: List[str] = None) -> None:
+ """Resumes a list of torrents"""
+ if not torrent_ids:
+ torrent_ids = self.torrentmanager.get_torrent_list()
+ for torrent_id in torrent_ids:
+ self.resume_torrent(torrent_id)
+
+ def create_torrent_status(
+ self,
+ torrent_id,
+ torrent_keys,
+ plugin_keys,
+ diff=False,
+ update=False,
+ all_keys=False,
+ ):
+ try:
+ status = self.torrentmanager[torrent_id].get_status(
+ torrent_keys, diff, update=update, all_keys=all_keys
+ )
+ except KeyError:
+ import traceback
+
+ traceback.print_exc()
+ # Torrent was probably removed meanwhile
+ return {}
+
+ # Ask the plugin manager to fill in the plugin keys
+ if len(plugin_keys) > 0 or all_keys:
+ status.update(self.pluginmanager.get_status(torrent_id, plugin_keys))
+ return status
+
    @export
    def get_torrent_status(
        self, torrent_id: str, keys: List[str], diff: bool = False
    ) -> dict:
        """Gets the status of a single torrent.

        Args:
            torrent_id: The torrent to get the status of.
            keys: The status keys to fetch; an empty list fetches all keys.
            diff: If True, return only values changed since the last call.

        Returns:
            The status dict, including any plugin-provided keys.
        """
        # Split the requested keys into torrent-handled and plugin-handled sets.
        torrent_keys, plugin_keys = self.torrentmanager.separate_keys(
            keys, [torrent_id]
        )
        return self.create_torrent_status(
            torrent_id,
            torrent_keys,
            plugin_keys,
            diff=diff,
            update=True,
            all_keys=not keys,
        )
+
+ @export
+ @maybe_coroutine
+ async def get_torrents_status(
+ self, filter_dict: dict, keys: List[str], diff: bool = False
+ ) -> dict:
+ """returns all torrents , optionally filtered by filter_dict."""
+ all_keys = not keys
+ torrent_ids = self.filtermanager.filter_torrent_ids(filter_dict)
+ status_dict, plugin_keys = await self.torrentmanager.torrents_status_update(
+ torrent_ids, keys, diff=diff
+ )
+ # Ask the plugin manager to fill in the plugin keys
+ if len(plugin_keys) > 0 or all_keys:
+ for key in status_dict:
+ status_dict[key].update(self.pluginmanager.get_status(key, plugin_keys))
+ return status_dict
+
    @export
    def get_filter_tree(
        self, show_zero_hits: bool = True, hide_cat: List[str] = None
    ) -> Dict:
        """Returns the filter tree for use in sidebar(s).

        Args:
            show_zero_hits: Forwarded to the FilterManager; presumably
                includes categories with zero matches — see FilterManager.
            hide_cat: Categories to hide, forwarded to the FilterManager.

        Returns:
            ``{field: [(value, count), ...]}``
        """
        return self.filtermanager.get_filter_tree(show_zero_hits, hide_cat)
+
    @export
    def get_session_state(self) -> List[str]:
        """Returns a list of torrent_ids in the session."""
        # Get the torrent list from the TorrentManager
        return self.torrentmanager.get_torrent_list()
+
    @export
    def get_config(self) -> dict:
        """Get all the preferences as a dictionary.

        Returns the underlying config mapping directly (no copy is made).
        """
        return self.config.config
+
    @export
    def get_config_value(self, key: str) -> Any:
        """Get the config value for key"""
        return self.config.get(key)
+
+ @export
+ def get_config_values(self, keys: List[str]) -> Dict[str, Any]:
+ """Get the config values for the entered keys"""
+ return {key: self.config.get(key) for key in keys}
+
+ @export
+ def set_config(self, config: Dict[str, Any]):
+ """Set the config with values from dictionary"""
+ # Load all the values into the configuration
+ for key in config:
+ if self.read_only_config_keys and key in self.read_only_config_keys:
+ continue
+ self.config[key] = config[key]
+
    @export
    def get_listen_port(self) -> int:
        """Returns the active BitTorrent listen port from libtorrent."""
        return self.session.listen_port()
+
+ @export
+ def get_proxy(self) -> Dict[str, Any]:
+ """Returns the proxy settings
+
+ Returns:
+ Proxy settings.
+
+ Notes:
+ Proxy type names:
+ 0: None, 1: Socks4, 2: Socks5, 3: Socks5 w Auth, 4: HTTP, 5: HTTP w Auth, 6: I2P
+ """
+
+ settings = self.session.get_settings()
+ proxy_type = settings['proxy_type']
+ proxy_hostname = (
+ settings['i2p_hostname'] if proxy_type == 6 else settings['proxy_hostname']
+ )
+ proxy_port = settings['i2p_port'] if proxy_type == 6 else settings['proxy_port']
+ proxy_dict = {
+ 'type': proxy_type,
+ 'hostname': proxy_hostname,
+ 'username': settings['proxy_username'],
+ 'password': settings['proxy_password'],
+ 'port': proxy_port,
+ 'proxy_hostnames': settings['proxy_hostnames'],
+ 'proxy_peer_connections': settings['proxy_peer_connections'],
+ 'proxy_tracker_connections': settings['proxy_tracker_connections'],
+ }
+
+ return proxy_dict
+
    @export
    def get_available_plugins(self) -> List[str]:
        """Returns a list of plugins available in the core"""
        return self.pluginmanager.get_available_plugins()
+
    @export
    def get_enabled_plugins(self) -> List[str]:
        """Returns a list of enabled plugins in the core"""
        return self.pluginmanager.get_enabled_plugins()
+
    @export
    def enable_plugin(self, plugin: str) -> 'defer.Deferred[bool]':
        """Enables ``plugin``; the Deferred result comes from the plugin manager."""
        return self.pluginmanager.enable_plugin(plugin)
+
    @export
    def disable_plugin(self, plugin: str) -> 'defer.Deferred[bool]':
        """Disables ``plugin``; the Deferred result comes from the plugin manager."""
        return self.pluginmanager.disable_plugin(plugin)
+
+ @export
+ def force_recheck(self, torrent_ids: List[str]) -> None:
+ """Forces a data recheck on torrent_ids"""
+ for torrent_id in torrent_ids:
+ self.torrentmanager[torrent_id].force_recheck()
+
+ @export
+ def set_torrent_options(
+ self, torrent_ids: List[str], options: Dict[str, Any]
+ ) -> None:
+ """Sets the torrent options for torrent_ids
+
+ Args:
+ torrent_ids: A list of torrent_ids to set the options for.
+ options: A dict of torrent options to set. See
+ ``torrent.TorrentOptions`` class for valid keys.
+ """
+ if 'owner' in options and not self.authmanager.has_account(options['owner']):
+ raise DelugeError('Username "%s" is not known.' % options['owner'])
+
+ if isinstance(torrent_ids, str):
+ torrent_ids = [torrent_ids]
+
+ for torrent_id in torrent_ids:
+ self.torrentmanager[torrent_id].set_options(options)
+
    @export
    def set_torrent_trackers(
        self, torrent_id: str, trackers: List[Dict[str, Any]]
    ) -> None:
        """Sets a torrents tracker list. trackers will be ``[{"url", "tier"}]``"""
        return self.torrentmanager[torrent_id].set_trackers(trackers)
+
    @export
    def get_magnet_uri(self, torrent_id: str) -> str:
        """Returns the magnet URI for ``torrent_id``."""
        return self.torrentmanager[torrent_id].get_magnet_uri()
+
    # --- Deprecated per-option setters ---------------------------------------
    # Kept for backward RPC compatibility only; each is a thin wrapper around
    # set_torrent_options() with a single-key options dict.
    @deprecated
    @export
    def set_torrent_max_connections(self, torrent_id, value):
        """Deprecated: Use set_torrent_options with 'max_connections'"""
        self.set_torrent_options([torrent_id], {'max_connections': value})

    @deprecated
    @export
    def set_torrent_max_upload_slots(self, torrent_id, value):
        """Deprecated: Use set_torrent_options with 'max_upload_slots'"""
        self.set_torrent_options([torrent_id], {'max_upload_slots': value})

    @deprecated
    @export
    def set_torrent_max_upload_speed(self, torrent_id, value):
        """Deprecated: Use set_torrent_options with 'max_upload_speed'"""
        self.set_torrent_options([torrent_id], {'max_upload_speed': value})

    @deprecated
    @export
    def set_torrent_max_download_speed(self, torrent_id, value):
        """Deprecated: Use set_torrent_options with 'max_download_speed'"""
        self.set_torrent_options([torrent_id], {'max_download_speed': value})

    @deprecated
    @export
    def set_torrent_file_priorities(self, torrent_id, priorities):
        """Deprecated: Use set_torrent_options with 'file_priorities'"""
        self.set_torrent_options([torrent_id], {'file_priorities': priorities})

    @deprecated
    @export
    def set_torrent_prioritize_first_last(self, torrent_id, value):
        """Deprecated: Use set_torrent_options with 'prioritize_first_last'"""
        self.set_torrent_options([torrent_id], {'prioritize_first_last_pieces': value})

    @deprecated
    @export
    def set_torrent_auto_managed(self, torrent_id, value):
        """Deprecated: Use set_torrent_options with 'auto_managed'"""
        self.set_torrent_options([torrent_id], {'auto_managed': value})

    @deprecated
    @export
    def set_torrent_stop_at_ratio(self, torrent_id, value):
        """Deprecated: Use set_torrent_options with 'stop_at_ratio'"""
        self.set_torrent_options([torrent_id], {'stop_at_ratio': value})

    @deprecated
    @export
    def set_torrent_stop_ratio(self, torrent_id, value):
        """Deprecated: Use set_torrent_options with 'stop_ratio'"""
        self.set_torrent_options([torrent_id], {'stop_ratio': value})

    @deprecated
    @export
    def set_torrent_remove_at_ratio(self, torrent_id, value):
        """Deprecated: Use set_torrent_options with 'remove_at_ratio'"""
        self.set_torrent_options([torrent_id], {'remove_at_ratio': value})

    @deprecated
    @export
    def set_torrent_move_completed(self, torrent_id, value):
        """Deprecated: Use set_torrent_options with 'move_completed'"""
        self.set_torrent_options([torrent_id], {'move_completed': value})

    @deprecated
    @export
    def set_torrent_move_completed_path(self, torrent_id, value):
        """Deprecated: Use set_torrent_options with 'move_completed_path'"""
        self.set_torrent_options([torrent_id], {'move_completed_path': value})
+
    @export
    def get_path_size(self, path: str) -> int:
        """Returns the size of the file or folder 'path' and -1 if the path is
        inaccessible (non-existent or insufficient privileges)"""
        return deluge.common.get_path_size(path)
+
    @export
    def create_torrent(
        self,
        path,
        tracker,
        piece_length,
        comment=None,
        target=None,
        webseeds=None,
        private=False,
        created_by=None,
        trackers=None,
        add_to_session=False,
        torrent_format=metafile.TorrentFormat.V1,
    ):
        """Creates a torrent file in a background thread.

        Args:
            path (str): The file/folder to create the torrent from.
            tracker (str): The main tracker URL.
            piece_length (int): The piece length for the metafile.
            comment (str): Optional comment embedded in the metafile.
            target (str): Optional output path for the .torrent file.
            webseeds (list): Optional webseed URLs.
            private (bool): Mark the torrent as private.
            created_by (str): Optional creator string.
            trackers (list): Optional additional trackers.
            add_to_session (bool): Also add the new torrent to this session.
            torrent_format: A ``metafile.TorrentFormat`` or its string value.

        Returns:
            Deferred firing with ``(filename, filedump)`` from the worker.
        """
        # RPC clients send the format as a plain string; coerce to the enum.
        if isinstance(torrent_format, str):
            torrent_format = metafile.TorrentFormat(torrent_format)

        log.debug('creating torrent..')
        # Metafile hashing is CPU/IO heavy, so run it off the reactor thread.
        return threads.deferToThread(
            self._create_torrent_thread,
            path,
            tracker,
            piece_length,
            comment=comment,
            target=target,
            webseeds=webseeds,
            private=private,
            created_by=created_by,
            trackers=trackers,
            add_to_session=add_to_session,
            torrent_format=torrent_format,
        )
+
+ def _create_torrent_thread(
+ self,
+ path,
+ tracker,
+ piece_length,
+ comment,
+ target,
+ webseeds,
+ private,
+ created_by,
+ trackers,
+ add_to_session,
+ torrent_format,
+ ):
+ from deluge import metafile
+
+ filecontent = metafile.make_meta_file_content(
+ path,
+ tracker,
+ piece_length,
+ comment=comment,
+ webseeds=webseeds,
+ private=private,
+ created_by=created_by,
+ trackers=trackers,
+ torrent_format=torrent_format,
+ )
+
+ write_file = False
+ if target or not add_to_session:
+ write_file = True
+
+ if not target:
+ target = metafile.default_meta_file_path(path)
+ filename = os.path.split(target)[-1]
+
+ if write_file:
+ with open(target, 'wb') as _file:
+ _file.write(filecontent)
+
+ filedump = b64encode(filecontent)
+ log.debug('torrent created!')
+ if add_to_session:
+ options = {}
+ options['download_location'] = os.path.split(path)[0]
+ self.add_torrent_file(filename, filedump, options)
+ return filename, filedump
+
+ @export
+ def upload_plugin(self, filename: str, filedump: Union[str, bytes]) -> None:
+ """This method is used to upload new plugins to the daemon. It is used
+ when connecting to the daemon remotely and installing a new plugin on
+ the client side. ``plugin_data`` is a ``xmlrpc.Binary`` object of the file data,
+ i.e. ``plugin_file.read()``"""
+
+ try:
+ filedump = b64decode(filedump)
+ except TypeError as ex:
+ log.error('There was an error decoding the filedump string!')
+ log.exception(ex)
+ return
+
+ with open(os.path.join(get_config_dir(), 'plugins', filename), 'wb') as _file:
+ _file.write(filedump)
+ component.get('CorePluginManager').scan_for_plugins()
+
    @export
    def rescan_plugins(self) -> None:
        """Re-scans the plugin folders for new plugins"""
        component.get('CorePluginManager').scan_for_plugins()
+
+ @export
+ def rename_files(
+ self, torrent_id: str, filenames: List[Tuple[int, str]]
+ ) -> defer.Deferred:
+ """Rename files in ``torrent_id``. Since this is an asynchronous operation by
+ libtorrent, watch for the TorrentFileRenamedEvent to know when the
+ files have been renamed.
+
+ Args:
+ torrent_id: the torrent_id to rename files
+ filenames: a list of index, filename pairs
+
+ Raises:
+ InvalidTorrentError: if torrent_id is invalid
+ """
+ if torrent_id not in self.torrentmanager.torrents:
+ raise InvalidTorrentError('torrent_id is not in session')
+
+ def rename():
+ self.torrentmanager[torrent_id].rename_files(filenames)
+
+ return task.deferLater(reactor, 0, rename)
+
    @export
    def rename_folder(
        self, torrent_id: str, folder: str, new_folder: str
    ) -> defer.Deferred:
        """Renames the 'folder' to 'new_folder' in 'torrent_id'. Watch for the
        TorrentFolderRenamedEvent which is emitted when the folder has been
        renamed successfully.

        Args:
            torrent_id: the torrent to rename folder in
            folder: the folder to rename
            new_folder: the new folder name

        Raises:
            InvalidTorrentError: if the torrent_id is invalid
        """
        # Validate up front so callers get a synchronous error for bad IDs.
        if torrent_id not in self.torrentmanager.torrents:
            raise InvalidTorrentError('torrent_id is not in session')

        return self.torrentmanager[torrent_id].rename_folder(folder, new_folder)
+
+ @export
+ def queue_top(self, torrent_ids: List[str]) -> None:
+ log.debug('Attempting to queue %s to top', torrent_ids)
+ # torrent_ids must be sorted in reverse before moving to preserve order
+ for torrent_id in sorted(
+ torrent_ids, key=self.torrentmanager.get_queue_position, reverse=True
+ ):
+ try:
+ # If the queue method returns True, then we should emit a signal
+ if self.torrentmanager.queue_top(torrent_id):
+ component.get('EventManager').emit(TorrentQueueChangedEvent())
+ except KeyError:
+ log.warning('torrent_id: %s does not exist in the queue', torrent_id)
+
    @export
    def queue_up(self, torrent_ids: List[str]) -> None:
        """Moves each of the given torrents one position up the queue."""
        log.debug('Attempting to queue %s to up', torrent_ids)
        # Pair each torrent with its current queue position so the batch can
        # be processed in queue order.
        torrents = (
            (self.torrentmanager.get_queue_position(torrent_id), torrent_id)
            for torrent_id in torrent_ids
        )
        torrent_moved = True
        prev_queue_position = None
        # torrent_ids must be sorted before moving.
        for queue_position, torrent_id in sorted(torrents):
            # Move the torrent if and only if there is space (by not moving it we preserve the order)
            # Either the previous torrent moved (freeing its slot), or there is
            # a gap of more than one position above this torrent.
            if torrent_moved or queue_position - prev_queue_position > 1:
                try:
                    torrent_moved = self.torrentmanager.queue_up(torrent_id)
                except KeyError:
                    log.warning(
                        'torrent_id: %s does not exist in the queue', torrent_id
                    )
            # If the torrent moved, then we should emit a signal
            if torrent_moved:
                component.get('EventManager').emit(TorrentQueueChangedEvent())
            else:
                # Remember where the unmoved torrent sits to detect gaps.
                prev_queue_position = queue_position
+
    @export
    def queue_down(self, torrent_ids: List[str]) -> None:
        """Moves each of the given torrents one position down the queue."""
        log.debug('Attempting to queue %s to down', torrent_ids)
        # Pair each torrent with its current queue position so the batch can
        # be processed in reverse queue order.
        torrents = (
            (self.torrentmanager.get_queue_position(torrent_id), torrent_id)
            for torrent_id in torrent_ids
        )
        torrent_moved = True
        prev_queue_position = None
        # torrent_ids must be sorted before moving.
        for queue_position, torrent_id in sorted(torrents, reverse=True):
            # Move the torrent if and only if there is space (by not moving it we preserve the order)
            # Either the previous torrent moved (freeing its slot), or there is
            # a gap of more than one position below this torrent.
            if torrent_moved or prev_queue_position - queue_position > 1:
                try:
                    torrent_moved = self.torrentmanager.queue_down(torrent_id)
                except KeyError:
                    log.warning(
                        'torrent_id: %s does not exist in the queue', torrent_id
                    )
            # If the torrent moved, then we should emit a signal
            if torrent_moved:
                component.get('EventManager').emit(TorrentQueueChangedEvent())
            else:
                # Remember where the unmoved torrent sits to detect gaps.
                prev_queue_position = queue_position
+
+ @export
+ def queue_bottom(self, torrent_ids: List[str]) -> None:
+ log.debug('Attempting to queue %s to bottom', torrent_ids)
+ # torrent_ids must be sorted before moving to preserve order
+ for torrent_id in sorted(
+ torrent_ids, key=self.torrentmanager.get_queue_position
+ ):
+ try:
+ # If the queue method returns True, then we should emit a signal
+ if self.torrentmanager.queue_bottom(torrent_id):
+ component.get('EventManager').emit(TorrentQueueChangedEvent())
+ except KeyError:
+ log.warning('torrent_id: %s does not exist in the queue', torrent_id)
+
    @export
    def glob(self, path: str) -> List[str]:
        """Returns the glob matches for ``path`` (delegates to the stdlib
        ``glob`` module; the method name shadows it only inside this class)."""
        return glob.glob(path)
+
+ @export
+ def test_listen_port(self) -> 'defer.Deferred[Optional[bool]]':
+ """Checks if the active port is open
+
+ Returns:
+ True if the port is open, False if not
+ """
+ port = self.get_listen_port()
+ url = 'https://deluge-torrent.org/test_port.php?port=%s' % port
+ agent = Agent(reactor, connectTimeout=30)
+ d = agent.request(b'GET', url.encode())
+
+ def on_get_page(body):
+ return bool(int(body))
+
+ def on_error(failure):
+ log.warning('Error testing listen port: %s', failure)
+
+ d.addCallback(readBody).addCallback(on_get_page)
+ d.addErrback(on_error)
+
+ return d
+
+ @export
+ def get_free_space(self, path: str = None) -> int:
+ """Returns the number of free bytes at path
+
+ Args:
+ path: the path to check free space at, if None, use the default download location
+
+ Returns:
+ the number of free bytes at path
+
+ Raises:
+ InvalidPathError: if the path is invalid
+ """
+ if not path:
+ path = self.config['download_location']
+ try:
+ return deluge.common.free_space(path)
+ except InvalidPathError:
+ return -1
+
    def _on_external_ip_event(self, external_ip):
        # Cache the externally visible IP reported via this event so that
        # get_external_ip() can return it without querying the session.
        self.external_ip = external_ip
+
    @export
    def get_external_ip(self) -> str:
        """Returns the external IP address received from libtorrent."""
        # Value is cached by _on_external_ip_event().
        return self.external_ip
+
    @export
    def get_libtorrent_version(self) -> str:
        """Returns the libtorrent version.

        Returns:
            the version
        """
        return LT_VERSION
+
    @export
    def get_completion_paths(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Returns the available path completions for the input value."""
        # Delegates to the shared path chooser helper used by the UIs.
        return path_chooser_common.get_completion_paths(args)
+
    @export(AUTH_LEVEL_ADMIN)
    def get_known_accounts(self) -> List[Dict[str, Any]]:
        """Returns the accounts known to the auth manager (admin only)."""
        return self.authmanager.get_known_accounts()
+
    @export(AUTH_LEVEL_NONE)
    def get_auth_levels_mappings(self) -> Tuple[Dict[str, int], Dict[int, str]]:
        """Returns the (name->level, level->name) auth level mappings."""
        return (AUTH_LEVELS_MAPPING, AUTH_LEVELS_MAPPING_REVERSE)
+
    @export(AUTH_LEVEL_ADMIN)
    def create_account(self, username: str, password: str, authlevel: str) -> bool:
        """Creates a new daemon account (admin only)."""
        return self.authmanager.create_account(username, password, authlevel)
+
    @export(AUTH_LEVEL_ADMIN)
    def update_account(self, username: str, password: str, authlevel: str) -> bool:
        """Updates an existing daemon account (admin only)."""
        return self.authmanager.update_account(username, password, authlevel)
+
    @export(AUTH_LEVEL_ADMIN)
    def remove_account(self, username: str) -> bool:
        """Removes a daemon account (admin only)."""
        return self.authmanager.remove_account(username)
diff --git a/deluge/core/daemon.py b/deluge/core/daemon.py
new file mode 100644
index 0000000..0185dd8
--- /dev/null
+++ b/deluge/core/daemon.py
@@ -0,0 +1,203 @@
+#
+# Copyright (C) 2007-2009 Andrew Resch <andrewresch@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+"""The Deluge daemon"""
+import logging
+import os
+import socket
+
+from twisted.internet import reactor
+
+import deluge.component as component
+from deluge.common import get_version, is_ip, is_process_running, windows_check
+from deluge.configmanager import get_config_dir
+from deluge.core.core import Core
+from deluge.core.rpcserver import RPCServer, export
+from deluge.error import DaemonRunningError
+
+if windows_check():
+ from win32api import SetConsoleCtrlHandler
+ from win32con import CTRL_CLOSE_EVENT, CTRL_SHUTDOWN_EVENT
+
+log = logging.getLogger(__name__)
+
+
def is_daemon_running(pid_file):
    """
    Check for another running instance of the daemon using the same pid file.

    Args:
        pid_file: The location of the file with pid, port values.

    Returns:
        bool: True is daemon is running, False otherwise.

    """

    try:
        with open(pid_file) as _file:
            pid, port = (int(x) for x in _file.readline().strip().split(';'))
    except (OSError, ValueError):
        # A missing or malformed pid file means no daemon owns this config.
        return False

    if not is_process_running(pid):
        # Fix: previously fell off the end here, returning None instead of
        # the documented False.
        return False

    # Ensure it's a deluged process by trying to open a socket to its port.
    _socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        _socket.connect(('127.0.0.1', port))
    except OSError:
        # Can't connect, so pid is not a deluged process.
        return False
    else:
        # This is a deluged process!
        _socket.close()
        return True
+
+
class Daemon:
    """The Deluge Daemon class"""

    def __init__(
        self,
        listen_interface=None,
        outgoing_interface=None,
        interface=None,
        port=None,
        standalone=False,
        read_only_config_keys=None,
    ):
        """
        Args:
            listen_interface (str, optional): The IP address to listen to
                BitTorrent connections on.
            outgoing_interface (str, optional): The network interface name or
                IP address to open outgoing BitTorrent connections on.
            interface (str, optional): The IP address the daemon will
                listen for UI connections on.
            port (int, optional): The port the daemon will listen for UI
                connections on.
            standalone (bool, optional): If True the client is in Standalone
                mode otherwise, if False, start the daemon as separate process.
            read_only_config_keys (list of str, optional): A list of config
                keys that will not be altered by core.set_config() RPC method.

        Raises:
            DaemonRunningError: If a daemon already owns this config directory.
        """
        self.standalone = standalone
        self.pid_file = get_config_dir('deluged.pid')
        log.info('Deluge daemon %s', get_version())
        # Refuse to start if another daemon owns this config directory.
        if is_daemon_running(self.pid_file):
            raise DaemonRunningError(
                'Deluge daemon already running with this config directory!'
            )

        # Twisted catches signals to terminate, so just have it call the shutdown method.
        reactor.addSystemEventTrigger('before', 'shutdown', self._shutdown)

        # Catch some Windows specific signals
        if windows_check():

            def win_handler(ctrl_type):
                """Handle the Windows shutdown or close events."""
                log.debug('windows handler ctrl_type: %s', ctrl_type)
                if ctrl_type == CTRL_CLOSE_EVENT or ctrl_type == CTRL_SHUTDOWN_EVENT:
                    self._shutdown()
                    return 1

            SetConsoleCtrlHandler(win_handler)

        # Start the core as a thread and join it until it's done
        self.core = Core(
            listen_interface=listen_interface,
            outgoing_interface=outgoing_interface,
            read_only_config_keys=read_only_config_keys,
        )

        # Fall back to the configured daemon port when none is given.
        if port is None:
            port = self.core.config['daemon_port']
        self.port = port

        if interface and not is_ip(interface):
            log.error('Invalid UI interface (must be IP Address): %s', interface)
            interface = None

        self.rpcserver = RPCServer(
            port=port,
            allow_remote=self.core.config['allow_remote'],
            listen=not standalone,
            interface=interface,
        )

        log.debug(
            'Listening to UI on: %s:%s and bittorrent on: %s Making connections out on: %s',
            interface,
            port,
            listen_interface,
            outgoing_interface,
        )

    def start(self):
        """Start the daemon: register RPC objects, write the pid file and
        run the reactor until shutdown."""
        # Register the daemon and the core RPCs
        self.rpcserver.register_object(self.core)
        self.rpcserver.register_object(self)

        # Make sure we start the PreferencesManager first
        component.start('PreferencesManager')

        if not self.standalone:
            log.info('Deluge daemon starting...')
            # Create pid file to track if deluged is running, also includes the port number.
            pid = os.getpid()
            log.debug('Storing pid %s & port %s in: %s', pid, self.port, self.pid_file)
            with open(self.pid_file, 'w') as _file:
                _file.write(f'{pid};{self.port}\n')

        component.start()

        try:
            reactor.run()
        finally:
            # NOTE(review): in standalone mode no pid file was written above,
            # so this remove would raise — verify start() is only invoked by
            # the non-standalone daemon process. TODO confirm.
            log.debug('Remove pid file: %s', self.pid_file)
            os.remove(self.pid_file)
            log.info('Deluge daemon shutdown successfully')

    @export()
    def shutdown(self, *args, **kwargs):
        """RPC-exposed shutdown: stops the reactor on the next iteration."""
        log.debug('Deluge daemon shutdown requested...')
        reactor.callLater(0, reactor.stop)

    def _shutdown(self, *args, **kwargs):
        # Called by the reactor's 'before shutdown' trigger (and the Windows
        # console handler); shuts down components unless running standalone.
        log.info('Deluge daemon shutting down, waiting for components to shutdown...')
        if not self.standalone:
            return component.shutdown()

    @export()
    def get_method_list(self):
        """Returns a list of the exported methods."""
        return self.rpcserver.get_method_list()

    @export()
    def get_version(self):
        """Returns the daemon version"""
        return get_version()

    @export(1)
    def authorized_call(self, rpc):
        """Determines if session auth_level is authorized to call RPC.

        Args:
            rpc (str): A RPC, e.g. core.get_torrents_status

        Returns:
            bool: True if authorized to call RPC, otherwise False.
        """
        if rpc not in self.get_method_list():
            return False

        return (
            self.rpcserver.get_session_auth_level()
            >= self.rpcserver.get_rpc_auth_level(rpc)
        )
diff --git a/deluge/core/daemon_entry.py b/deluge/core/daemon_entry.py
new file mode 100644
index 0000000..c49fd2a
--- /dev/null
+++ b/deluge/core/daemon_entry.py
@@ -0,0 +1,140 @@
+#
+# Copyright (C) 2007 Andrew Resch <andrewresch@gmail.com>
+# Copyright (C) 2010 Pedro Algarvio <pedro@algarvio.me>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+import os
+import sys
+from logging import DEBUG, FileHandler, getLogger
+
+from twisted.internet.error import CannotListenError
+
+from deluge.argparserbase import ArgParserBase
+from deluge.common import run_profiled
+from deluge.configmanager import get_config_dir
+from deluge.i18n import setup_mock_translation
+
+
def add_daemon_options(parser):
    """Registers the deluged-specific command line arguments on *parser*.

    Args:
        parser (ArgParserBase): The argument parser to extend in place.
    """
    group = parser.add_argument_group(_('Daemon Options'))
    group.add_argument(
        '-u',
        '--ui-interface',
        metavar='<ip-addr>',
        action='store',
        help=_('IP address to listen for UI connections'),
    )
    group.add_argument(
        '-p',
        '--port',
        metavar='<port>',
        action='store',
        type=int,
        help=_('Port to listen for UI connections on'),
    )
    group.add_argument(
        '-i',
        '--interface',
        metavar='<ip-addr>',
        dest='listen_interface',
        action='store',
        help=_('IP address to listen for BitTorrent connections'),
    )
    group.add_argument(
        '-o',
        '--outgoing-interface',
        metavar='<interface>',
        dest='outgoing_interface',
        action='store',
        help=_(
            'The network interface name or IP address for outgoing BitTorrent connections.'
        ),
    )
    group.add_argument(
        '--read-only-config-keys',
        metavar='<comma-separated-keys>',
        action='store',
        help=_('Config keys to be unmodified by `set_config` RPC'),
        type=str,
        default='',
    )
    # Adds the common process options (pidfile, profiling, etc.).
    parser.add_process_arg_group()
+
+
def start_daemon(skip_start=False):
    """
    Entry point for daemon script

    Args:
        skip_start (bool): If starting daemon should be skipped.

    Returns:
        deluge.core.daemon.Daemon: A new daemon object

    """
    setup_mock_translation()

    # Setup the argument parser
    parser = ArgParserBase()
    add_daemon_options(parser)

    options = parser.parse_args()

    # Check for any daemons running with this same config
    from deluge.core.daemon import is_daemon_running

    pid_file = get_config_dir('deluged.pid')
    if is_daemon_running(pid_file):
        print(
            'Cannot run multiple daemons with same config directory.\n'
            'If you believe this is an error, force starting by deleting: %s' % pid_file
        )
        sys.exit(1)

    log = getLogger(__name__)

    # If no logfile specified add logging to default location (as well as stdout)
    if not options.logfile:
        options.logfile = get_config_dir('deluged.log')
        file_handler = FileHandler(options.logfile)
        log.addHandler(file_handler)

    def run_daemon(options):
        # Actual daemon construction/start; wrapped so run_profiled can
        # optionally profile it.
        try:
            from deluge.core.daemon import Daemon

            daemon = Daemon(
                listen_interface=options.listen_interface,
                outgoing_interface=options.outgoing_interface,
                interface=options.ui_interface,
                port=options.port,
                read_only_config_keys=options.read_only_config_keys.split(','),
            )
            if skip_start:
                return daemon
            else:
                daemon.start()
        except CannotListenError as ex:
            log.error(
                'Cannot start deluged, listen port in use.\n'
                ' Check for other running daemons or services using this port: %s:%s',
                ex.interface,
                ex.port,
            )
            sys.exit(1)
        except Exception as ex:
            log.error('Unable to start deluged: %s', ex)
            if log.isEnabledFor(DEBUG):
                log.exception(ex)
            sys.exit(1)
        finally:
            # Runs on every exit path, including the skip_start early return.
            log.info('Exiting...')
            if options.pidfile:
                os.remove(options.pidfile)

    return run_profiled(
        run_daemon, options, output_file=options.profile, do_profile=options.profile
    )
diff --git a/deluge/core/eventmanager.py b/deluge/core/eventmanager.py
new file mode 100644
index 0000000..d43847a
--- /dev/null
+++ b/deluge/core/eventmanager.py
@@ -0,0 +1,66 @@
+#
+# Copyright (C) 2009 Andrew Resch <andrewresch@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+import logging
+
+import deluge.component as component
+
+log = logging.getLogger(__name__)
+
+
+class EventManager(component.Component):
+    """Dispatches Deluge events to RPC clients and locally registered handlers."""
+
+    def __init__(self):
+        component.Component.__init__(self, 'EventManager')
+        # Maps event name (str) -> list of handler callables.
+        self.handlers = {}
+
+    def emit(self, event):
+        """
+        Emits the event to interested clients.
+
+        :param event: DelugeEvent
+        """
+        # Emit the event to the interested clients
+        component.get('RPCServer').emit_event(event)
+        # Call any handlers for the event
+        if event.name in self.handlers:
+            for handler in self.handlers[event.name]:
+                # log.debug('Running handler %s for event %s with args: %s', event.name, handler, event.args)
+                try:
+                    handler(*event.args)
+                except Exception as ex:
+                    # NOTE(review): the message reads 'Event handler %s failed in %s'
+                    # but the arguments are (event.name, handler) — the placeholders
+                    # appear swapped relative to the wording; confirm intended order.
+                    log.error(
+                        'Event handler %s failed in %s with exception %s',
+                        event.name,
+                        handler,
+                        ex,
+                    )
+
+    def register_event_handler(self, event, handler):
+        """
+        Registers a function to be called when a `:param:event` is emitted.
+
+        :param event: str, the event name
+        :param handler: function, to be called when `:param:event` is emitted
+
+        """
+        # Deduplicated: registering the same handler twice is a no-op.
+        if event not in self.handlers:
+            self.handlers[event] = []
+
+        if handler not in self.handlers[event]:
+            self.handlers[event].append(handler)
+
+    def deregister_event_handler(self, event, handler):
+        """
+        Deregisters an event handler function.
+
+        :param event: str, the event name
+        :param handler: function, currently registered to handle `:param:event`
+
+        """
+        # Silently ignores handlers that were never registered.
+        if event in self.handlers and handler in self.handlers[event]:
+            self.handlers[event].remove(handler)
diff --git a/deluge/core/filtermanager.py b/deluge/core/filtermanager.py
new file mode 100644
index 0000000..a60cc5b
--- /dev/null
+++ b/deluge/core/filtermanager.py
@@ -0,0 +1,274 @@
+#
+# Copyright (C) 2008 Martijn Voncken <mvoncken@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+import logging
+
+import deluge.component as component
+from deluge.common import TORRENT_STATE
+
+log = logging.getLogger(__name__)
+
+# Sidebar sort order for the 'state' filter tree: 'All' and the synthetic
+# 'Active' entry first, then the libtorrent states in TORRENT_STATE order.
+STATE_SORT = ['All', 'Active'] + TORRENT_STATE
+
+
+# Special purpose filters:
+def filter_keywords(torrent_ids, values):
+    """Filter torrent_ids on every comma-separated keyword in *values*.
+
+    Keywords are ANDed together: each keyword wraps the previous result in
+    another filter_one_keyword generator.
+    """
+    # Cleanup: lowercase all values and flatten any embedded comma lists.
+    keywords = ','.join([v.lower() for v in values])
+    keywords = keywords.split(',')
+
+    for keyword in keywords:
+        torrent_ids = filter_one_keyword(torrent_ids, keyword)
+    return torrent_ids
+
+
+def filter_one_keyword(torrent_ids, keyword):
+    """
+    search torrent on keyword.
+    searches title,state,tracker-status,tracker,files
+
+    Yields each matching torrent_id; *keyword* is expected lowercase
+    (callers lowercase it in filter_keywords).
+    """
+    all_torrents = component.get('TorrentManager').torrents
+
+    for torrent_id in torrent_ids:
+        torrent = all_torrents[torrent_id]
+        if keyword in torrent.filename.lower():
+            yield torrent_id
+        elif keyword in torrent.state.lower():
+            yield torrent_id
+        # Only the first tracker's URL is searched (not lowercased here).
+        elif torrent.trackers and keyword in torrent.trackers[0]['url']:
+            yield torrent_id
+        elif keyword in torrent_id:
+            yield torrent_id
+        # Want to find broken torrents (search on "error", or "unregistered")
+        elif keyword in torrent.tracker_status.lower():
+            yield torrent_id
+        else:
+            # Fall back to file paths; break so a torrent is yielded at most once.
+            for t_file in torrent.get_files():
+                if keyword in t_file['path'].lower():
+                    yield torrent_id
+                    break
+
+
+def filter_by_name(torrent_ids, search_string):
+    """Yield torrent ids whose name contains search_string[0].
+
+    Appending '::match' to the search string requests a case-sensitive match.
+    """
+    all_torrents = component.get('TorrentManager').torrents
+    try:
+        # 'foo::match' splits to ('foo', ''); match_case is then a str, not the
+        # False singleton, so the 'is False' identity checks below treat it as
+        # case-sensitive even though '' is falsy.
+        search_string, match_case = search_string[0].split('::match')
+    except ValueError:
+        search_string = search_string[0]
+        match_case = False
+
+    if match_case is False:
+        search_string = search_string.lower()
+
+    for torrent_id in torrent_ids:
+        # NOTE(review): this assignment is dead code — it is unconditionally
+        # overwritten by both branches of the if/else below.
+        torrent_name = all_torrents[torrent_id].get_name()
+        if match_case is False:
+            torrent_name = all_torrents[torrent_id].get_name().lower()
+        else:
+            torrent_name = all_torrents[torrent_id].get_name()
+
+        if search_string in torrent_name:
+            yield torrent_id
+
+
+def tracker_error_filter(torrent_ids, values):
+    """Filter by tracker host, or by tracker error when values[0] == 'Error'.
+
+    Returns a list (not a generator, unlike the other filters) of matching ids.
+    """
+    filtered_torrent_ids = []
+    tm = component.get('TorrentManager')
+
+    # If this is a tracker_host, then we need to filter on it
+    if values[0] != 'Error':
+        for torrent_id in torrent_ids:
+            if values[0] == tm[torrent_id].get_status(['tracker_host'])['tracker_host']:
+                filtered_torrent_ids.append(torrent_id)
+        return filtered_torrent_ids
+
+    # Check torrent's tracker_status for 'Error:' and return those torrent_ids
+    for torrent_id in torrent_ids:
+        if 'Error:' in tm[torrent_id].get_status(['tracker_status'])['tracker_status']:
+            filtered_torrent_ids.append(torrent_id)
+    return filtered_torrent_ids
+
+
+class FilterManager(component.Component):
+    """FilterManager
+
+    Applies registered per-field filters to the torrent list and builds the
+    value/count tree used by UI sidebars.
+    """
+
+    def __init__(self, core):
+        component.Component.__init__(self, 'FilterManager')
+        log.debug('FilterManager init..')
+        self.core = core
+        self.torrents = core.torrentmanager
+        # Maps field name -> callable(torrent_ids, values) returning matching ids.
+        self.registered_filters = {}
+        self.register_filter('keyword', filter_keywords)
+        self.register_filter('name', filter_by_name)
+        # Maps tree field name -> init callable returning {value: initial_count}.
+        self.tree_fields = {}
+
+        self.register_tree_field('state', self._init_state_tree)
+
+        def _init_tracker_tree():
+            return {'Error': 0}
+
+        self.register_tree_field('tracker_host', _init_tracker_tree)
+
+        self.register_filter('tracker_host', tracker_error_filter)
+
+        def _init_users_tree():
+            return {'': 0}
+
+        self.register_tree_field('owner', _init_users_tree)
+
+    def filter_torrent_ids(self, filter_dict):
+        """
+        returns a list of torrent_id's matching filter_dict.
+        core filter method
+
+        Mutates filter_dict while consuming handled keys; any leftover keys
+        fall back to a status-field equality match at the end.
+        """
+        if not filter_dict:
+            return self.torrents.get_torrent_list()
+
+        # Sanitize input: filter-value must be a list of strings
+        for key, value in filter_dict.items():
+            if isinstance(value, str):
+                filter_dict[key] = [value]
+
+        # Optimized filter for id
+        if 'id' in filter_dict:
+            torrent_ids = list(filter_dict['id'])
+            del filter_dict['id']
+        else:
+            torrent_ids = self.torrents.get_torrent_list()
+
+        # Return if there's nothing more to filter
+        if not filter_dict:
+            return torrent_ids
+
+        # Special purpose, state=Active.
+        if 'state' in filter_dict:
+            # We need to make sure this is a list for the logic below
+            filter_dict['state'] = list(filter_dict['state'])
+
+        if 'state' in filter_dict and 'Active' in filter_dict['state']:
+            filter_dict['state'].remove('Active')
+            if not filter_dict['state']:
+                del filter_dict['state']
+            torrent_ids = self.filter_state_active(torrent_ids)
+
+        if not filter_dict:
+            return torrent_ids
+
+        # Registered filters
+        for field, values in list(filter_dict.items()):
+            if field in self.registered_filters:
+                # Filters out doubles
+                torrent_ids = list(
+                    set(self.registered_filters[field](torrent_ids, values))
+                )
+                del filter_dict[field]
+
+        if not filter_dict:
+            return torrent_ids
+
+        torrent_keys, plugin_keys = self.torrents.separate_keys(
+            list(filter_dict), torrent_ids
+        )
+        # Leftover filter arguments, default filter on status fields.
+        # A torrent is removed as soon as any remaining field fails to match.
+        for torrent_id in list(torrent_ids):
+            status = self.core.create_torrent_status(
+                torrent_id, torrent_keys, plugin_keys
+            )
+            for field, values in filter_dict.items():
+                if field in status and status[field] in values:
+                    continue
+                elif torrent_id in torrent_ids:
+                    torrent_ids.remove(torrent_id)
+        return torrent_ids
+
+    def get_filter_tree(self, show_zero_hits=True, hide_cat=None):
+        """
+        returns {field: [(value,count)] }
+        for use in sidebar.
+
+        :param show_zero_hits: when False, zero-count entries (except 'All')
+            are dropped from state/owner/tracker_host trees.
+        :param hide_cat: optional list of tree field names to omit entirely.
+        """
+        torrent_ids = self.torrents.get_torrent_list()
+        tree_keys = list(self.tree_fields)
+        if hide_cat:
+            for cat in hide_cat:
+                tree_keys.remove(cat)
+
+        torrent_keys, plugin_keys = self.torrents.separate_keys(tree_keys, torrent_ids)
+        # Seed each field's counters from its registered init function.
+        items = {field: self.tree_fields[field]() for field in tree_keys}
+
+        for torrent_id in list(torrent_ids):
+            status = self.core.create_torrent_status(
+                torrent_id, torrent_keys, plugin_keys
+            )  # status={key:value}
+            for field in tree_keys:
+                value = status[field]
+                items[field][value] = items[field].get(value, 0) + 1
+
+        if 'tracker_host' in items:
+            items['tracker_host']['All'] = len(torrent_ids)
+            items['tracker_host']['Error'] = len(
+                tracker_error_filter(torrent_ids, ('Error',))
+            )
+
+        if not show_zero_hits:
+            for cat in ['state', 'owner', 'tracker_host']:
+                if cat in tree_keys:
+                    self._hide_state_items(items[cat])
+
+        # Return a dict of tuples:
+        sorted_items = {field: sorted(items[field].items()) for field in tree_keys}
+
+        if 'state' in tree_keys:
+            sorted_items['state'].sort(key=self._sort_state_item)
+
+        return sorted_items
+
+    def _init_state_tree(self):
+        # Counters for the 'state' tree: 'All', each TORRENT_STATE, and the
+        # synthetic 'Active' state (torrents with any payload transfer).
+        init_state = {}
+        init_state['All'] = len(self.torrents.get_torrent_list())
+        for state in TORRENT_STATE:
+            init_state[state] = 0
+        init_state['Active'] = len(
+            self.filter_state_active(self.torrents.get_torrent_list())
+        )
+        return init_state
+
+    def register_filter(self, filter_id, filter_func, filter_value=None):
+        # filter_value is currently unused; kept for interface compatibility.
+        self.registered_filters[filter_id] = filter_func
+
+    def deregister_filter(self, filter_id):
+        # Raises KeyError if filter_id was never registered.
+        del self.registered_filters[filter_id]
+
+    def register_tree_field(self, field, init_func=lambda: {}):
+        self.tree_fields[field] = init_func
+
+    def deregister_tree_field(self, field):
+        if field in self.tree_fields:
+            del self.tree_fields[field]
+
+    def filter_state_active(self, torrent_ids):
+        """Reduce torrent_ids to those with upload or download payload traffic.
+
+        NOTE: mutates and returns the list passed in (iterates over a copy).
+        """
+        for torrent_id in list(torrent_ids):
+            status = self.torrents[torrent_id].get_status(
+                ['download_payload_rate', 'upload_payload_rate']
+            )
+            if status['download_payload_rate'] or status['upload_payload_rate']:
+                pass
+            else:
+                torrent_ids.remove(torrent_id)
+        return torrent_ids
+
+    def _hide_state_items(self, state_items):
+        """For hide(show)-zero hits"""
+        for value, count in list(state_items.items()):
+            if value != 'All' and count == 0:
+                del state_items[value]
+
+    def _sort_state_item(self, item):
+        # Unknown states sort after all entries in STATE_SORT.
+        try:
+            return STATE_SORT.index(item[0])
+        except ValueError:
+            return 99
diff --git a/deluge/core/pluginmanager.py b/deluge/core/pluginmanager.py
new file mode 100644
index 0000000..0482b16
--- /dev/null
+++ b/deluge/core/pluginmanager.py
@@ -0,0 +1,105 @@
+#
+# Copyright (C) 2007 Andrew Resch <andrewresch@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+
+"""PluginManager for Core"""
+import logging
+
+from twisted.internet import defer
+
+import deluge.component as component
+import deluge.pluginmanagerbase
+from deluge.event import PluginDisabledEvent, PluginEnabledEvent
+
+log = logging.getLogger(__name__)
+
+
+class PluginManager(deluge.pluginmanagerbase.PluginManagerBase, component.Component):
+    """PluginManager handles the loading of plugins and provides plugins with
+    functions to access parts of the core."""
+
+    def __init__(self, core):
+        component.Component.__init__(self, 'CorePluginManager')
+
+        # Maps status field name -> callable(torrent_id) registered by plugins.
+        self.status_fields = {}
+
+        # Call the PluginManagerBase constructor
+        deluge.pluginmanagerbase.PluginManagerBase.__init__(
+            self, 'core.conf', 'deluge.plugin.core'
+        )
+
+    def start(self):
+        # Enable plugins that are enabled in the config
+        self.enable_plugins()
+
+    def stop(self):
+        # Disable all enabled plugins
+        self.disable_plugins()
+
+    def shutdown(self):
+        self.stop()
+
+    def update_plugins(self):
+        # Give each loaded plugin with an update() hook a chance to run;
+        # one plugin's failure must not stop the others.
+        for plugin in self.plugins:
+            if hasattr(self.plugins[plugin], 'update'):
+                try:
+                    self.plugins[plugin].update()
+                except Exception as ex:
+                    log.exception(ex)
+
+    def enable_plugin(self, name):
+        """Enable *name* and emit PluginEnabledEvent on success.
+
+        Returns a Deferred firing True on success (immediately True when the
+        plugin is already enabled).
+        """
+        d = defer.succeed(True)
+        if name not in self.plugins:
+            d = deluge.pluginmanagerbase.PluginManagerBase.enable_plugin(self, name)
+
+        def on_enable_plugin(result):
+            if result is True and name in self.plugins:
+                component.get('EventManager').emit(PluginEnabledEvent(name))
+            return result
+
+        d.addBoth(on_enable_plugin)
+        return d
+
+    def disable_plugin(self, name):
+        """Disable *name* and emit PluginDisabledEvent once it is unloaded."""
+        d = defer.succeed(True)
+        if name in self.plugins:
+            d = deluge.pluginmanagerbase.PluginManagerBase.disable_plugin(self, name)
+
+        def on_disable_plugin(result):
+            if name not in self.plugins:
+                component.get('EventManager').emit(PluginDisabledEvent(name))
+            return result
+
+        d.addBoth(on_disable_plugin)
+        return d
+
+    def get_status(self, torrent_id, fields):
+        """Return the value of status fields for the selected torrent_id."""
+        status = {}
+        if len(fields) == 0:
+            fields = list(self.status_fields)
+        for field in fields:
+            try:
+                status[field] = self.status_fields[field](torrent_id)
+            except KeyError:
+                # Unregistered fields are silently omitted from the result.
+                pass
+        return status
+
+    def register_status_field(self, field, function):
+        """Register a new status field. This can be used in the same way the
+        client requests other status information from core."""
+        log.debug('Registering status field %s with PluginManager', field)
+        self.status_fields[field] = function
+
+    def deregister_status_field(self, field):
+        """Deregisters a status field"""
+        log.debug('Deregistering status field %s with PluginManager', field)
+        try:
+            del self.status_fields[field]
+        except Exception:
+            log.warning('Unable to deregister status field %s', field)
diff --git a/deluge/core/preferencesmanager.py b/deluge/core/preferencesmanager.py
new file mode 100644
index 0000000..7e5c207
--- /dev/null
+++ b/deluge/core/preferencesmanager.py
@@ -0,0 +1,476 @@
+#
+# Copyright (C) 2008-2010 Andrew Resch <andrewresch@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+
+import logging
+import os
+import platform
+import random
+import threading
+from urllib.parse import quote_plus
+from urllib.request import urlopen
+
+from twisted.internet.task import LoopingCall
+
+import deluge.common
+import deluge.component as component
+import deluge.configmanager
+from deluge._libtorrent import lt
+from deluge.event import ConfigValueChangedEvent
+
+# Optional GeoIP support for country look-ups: prefer the C GeoIP binding,
+# fall back to pure-python pygeoip, else leave GeoIP as None (look-ups disabled).
+GeoIP = None
+try:
+    from GeoIP import GeoIP
+except ImportError:
+    try:
+        from pygeoip import GeoIP
+    except ImportError:
+        pass
+
+log = logging.getLogger(__name__)
+
+# Default core configuration values; used to seed core.conf and to backfill
+# missing keys (see PreferencesManager.__init__).
+DEFAULT_PREFS = {
+    'send_info': False,
+    'info_sent': 0.0,
+    'daemon_port': 58846,
+    'allow_remote': False,
+    'pre_allocate_storage': False,
+    'download_location': deluge.common.get_default_download_dir(),
+    'listen_ports': [6881, 6891],
+    'listen_interface': '',
+    'outgoing_interface': '',
+    'random_port': True,
+    'listen_random_port': None,
+    'listen_use_sys_port': False,
+    'listen_reuse_port': True,
+    'outgoing_ports': [0, 0],
+    'random_outgoing_ports': True,
+    'copy_torrent_file': False,
+    'del_copy_torrent_file': False,
+    'torrentfiles_location': deluge.common.get_default_download_dir(),
+    'plugins_location': os.path.join(deluge.configmanager.get_config_dir(), 'plugins'),
+    'prioritize_first_last_pieces': False,
+    'sequential_download': False,
+    'dht': True,
+    'upnp': True,
+    'natpmp': True,
+    'utpex': True,
+    'lsd': True,
+    'enc_in_policy': 1,
+    'enc_out_policy': 1,
+    'enc_level': 2,
+    'max_connections_global': 200,
+    'max_upload_speed': -1.0,
+    'max_download_speed': -1.0,
+    'max_upload_slots_global': 4,
+    # Windows Vista limits half-open connections (4), older Windows gets 8,
+    # all other platforms 50; evaluated once at import time.
+    'max_half_open_connections': (
+        lambda: deluge.common.windows_check()
+        and (lambda: deluge.common.vista_check() and 4 or 8)()
+        or 50
+    )(),
+    'max_connections_per_second': 20,
+    'ignore_limits_on_local_network': True,
+    'max_connections_per_torrent': -1,
+    'max_upload_slots_per_torrent': -1,
+    'max_upload_speed_per_torrent': -1,
+    'max_download_speed_per_torrent': -1,
+    'enabled_plugins': [],
+    'add_paused': False,
+    'max_active_seeding': 5,
+    'max_active_downloading': 3,
+    'max_active_limit': 8,
+    'dont_count_slow_torrents': False,
+    'queue_new_to_top': False,
+    'stop_seed_at_ratio': False,
+    'remove_seed_at_ratio': False,
+    'stop_seed_ratio': 2.00,
+    'share_ratio_limit': 2.00,
+    'seed_time_ratio_limit': 7.00,
+    # Stored in minutes (converted to seconds for libtorrent on apply).
+    'seed_time_limit': 180,
+    'auto_managed': True,
+    'move_completed': False,
+    'move_completed_path': deluge.common.get_default_download_dir(),
+    'move_completed_paths_list': [],
+    'download_location_paths_list': [],
+    'path_chooser_show_chooser_button_on_localhost': True,
+    'path_chooser_auto_complete_enabled': True,
+    'path_chooser_accelerator_string': 'Tab',
+    'path_chooser_max_popup_rows': 20,
+    'path_chooser_show_hidden_files': False,
+    'new_release_check': True,
+    'proxy': {
+        'type': 0,
+        'hostname': '',
+        'username': '',
+        'password': '',
+        'port': 8080,
+        'proxy_hostnames': True,
+        'proxy_peer_connections': True,
+        'proxy_tracker_connections': True,
+        'force_proxy': False,
+        'anonymous_mode': False,
+    },
+    'peer_tos': '0x00',
+    'rate_limit_ip_overhead': True,
+    'geoip_db_location': '/usr/share/GeoIP/GeoIP.dat',
+    'cache_size': 512,
+    'cache_expiry': 60,
+    'auto_manage_prefer_seeds': False,
+    'shared': False,
+    'super_seeding': False,
+}
+
+
+class PreferencesManager(component.Component):
+    """Applies core.conf values to the libtorrent session.
+
+    Each config key may have an `_on_set_<key>` method; do_config_set_func
+    dispatches to it on start-up and on every subsequent config change.
+    """
+
+    def __init__(self):
+        self.config = deluge.configmanager.ConfigManager('core.conf', DEFAULT_PREFS)
+        # Migrate pre-2.x config layouts into the single 'proxy' dict.
+        if 'proxies' in self.config:
+            log.warning(
+                'Updating config file for proxy, using "peer" values to fill new "proxy" setting'
+            )
+            self.config['proxy'].update(self.config['proxies']['peer'])
+            log.warning('New proxy config is: %s', self.config['proxy'])
+            del self.config['proxies']
+        if 'i2p_proxy' in self.config and self.config['i2p_proxy']['hostname']:
+            self.config['proxy'].update(self.config['i2p_proxy'])
+            self.config['proxy']['type'] = 6
+            del self.config['i2p_proxy']
+        if 'anonymous_mode' in self.config:
+            self.config['proxy']['anonymous_mode'] = self.config['anonymous_mode']
+            del self.config['anonymous_mode']
+        # Backfill any proxy sub-keys missing from older configs.
+        if 'proxy' in self.config:
+            for key in DEFAULT_PREFS['proxy']:
+                if key not in self.config['proxy']:
+                    self.config['proxy'][key] = DEFAULT_PREFS['proxy'][key]
+
+        self.core = component.get('Core')
+        self.new_release_timer = None
+
+    def start(self):
+        # Set the initial preferences on start-up
+        for key in DEFAULT_PREFS:
+            self.do_config_set_func(key, self.config[key])
+
+        self.config.register_change_callback(self._on_config_value_change)
+
+    def stop(self):
+        if self.new_release_timer and self.new_release_timer.running:
+            self.new_release_timer.stop()
+
+    # Config set functions
+    def do_config_set_func(self, key, value):
+        """Dispatch to the `_on_set_<key>` handler for *key*, if one exists."""
+        on_set_func = getattr(self, '_on_set_' + key, None)
+        if on_set_func:
+            if log.isEnabledFor(logging.DEBUG):
+                log.debug('Config key: %s set to %s..', key, value)
+            on_set_func(key, value)
+
+    def _on_config_value_change(self, key, value):
+        # Only apply changes once the component is running; always notify clients.
+        if self.get_state() == 'Started':
+            self.do_config_set_func(key, value)
+            component.get('EventManager').emit(ConfigValueChangedEvent(key, value))
+
+    def _on_set_torrentfiles_location(self, key, value):
+        if self.config['copy_torrent_file']:
+            try:
+                os.makedirs(value)
+            except OSError as ex:
+                # Directory may already exist; best-effort only.
+                log.debug('Unable to make directory: %s', ex)
+
+    def _on_set_listen_ports(self, key, value):
+        self.__set_listen_on()
+
+    def _on_set_listen_interface(self, key, value):
+        self.__set_listen_on()
+
+    def _on_set_outgoing_interface(self, key, value):
+        """Set interface name or IP address for outgoing BitTorrent connections."""
+        value = value.strip() if value else ''
+        self.core.apply_session_settings({'outgoing_interfaces': value})
+
+    def _on_set_random_port(self, key, value):
+        self.__set_listen_on()
+
+    def __set_listen_on(self):
+        """Set the ports and interface address to listen for incoming connections on."""
+        if self.config['random_port']:
+            # Pick (and persist) one random ephemeral port so it is stable
+            # for the rest of the session.
+            # NOTE(review): upper bound 65525 excludes 65525-65535 of the IANA
+            # dynamic range (49152-65535) — confirm whether that is intended.
+            if not self.config['listen_random_port']:
+                self.config['listen_random_port'] = random.randrange(49152, 65525)
+            listen_ports = [
+                self.config['listen_random_port']
+            ] * 2  # use single port range
+        else:
+            self.config['listen_random_port'] = None
+            listen_ports = self.config['listen_ports']
+
+        if self.config['listen_interface']:
+            interface = self.config['listen_interface'].strip()
+        else:
+            interface = '0.0.0.0'
+
+        log.debug(
+            'Listen Interface: %s, Ports: %s with use_sys_port: %s',
+            interface,
+            listen_ports,
+            self.config['listen_use_sys_port'],
+        )
+        # libtorrent expects a comma-separated 'iface:port' list.
+        interfaces = [
+            f'{interface}:{port}'
+            for port in range(listen_ports[0], listen_ports[1] + 1)
+        ]
+        self.core.apply_session_settings(
+            {
+                'listen_system_port_fallback': self.config['listen_use_sys_port'],
+                'listen_interfaces': ','.join(interfaces),
+            }
+        )
+
+    def _on_set_outgoing_ports(self, key, value):
+        self.__set_outgoing_ports()
+
+    def _on_set_random_outgoing_ports(self, key, value):
+        self.__set_outgoing_ports()
+
+    def __set_outgoing_ports(self):
+        # port 0 (random) disables the explicit outgoing port range.
+        port = (
+            0
+            if self.config['random_outgoing_ports']
+            else self.config['outgoing_ports'][0]
+        )
+        if port:
+            # A degenerate range (width <= 1) falls back to 5 ports.
+            num_ports = (
+                self.config['outgoing_ports'][1] - self.config['outgoing_ports'][0]
+            )
+            num_ports = num_ports if num_ports > 1 else 5
+        else:
+            num_ports = 0
+        log.debug('Outgoing port set to %s with range: %s', port, num_ports)
+        self.core.apply_session_settings(
+            {'outgoing_port': port, 'num_outgoing_ports': num_ports}
+        )
+
+    def _on_set_peer_tos(self, key, value):
+        # Config stores the ToS byte as a hex string (e.g. '0x00').
+        try:
+            self.core.apply_session_setting('peer_tos', int(value, 16))
+        except ValueError as ex:
+            log.error('Invalid tos byte: %s', ex)
+
+    def _on_set_dht(self, key, value):
+        lt_bootstraps = self.core.session.get_settings()['dht_bootstrap_nodes']
+        # Update list of lt bootstraps, using set to remove duplicates.
+        dht_bootstraps = set(
+            lt_bootstraps.split(',')
+            + [
+                'router.bittorrent.com:6881',
+                'router.utorrent.com:6881',
+                'router.bitcomet.com:6881',
+                'dht.transmissionbt.com:6881',
+                'dht.aelitis.com:6881',
+            ]
+        )
+        self.core.apply_session_settings(
+            {'dht_bootstrap_nodes': ','.join(dht_bootstraps), 'enable_dht': value}
+        )
+
+    def _on_set_upnp(self, key, value):
+        self.core.apply_session_setting('enable_upnp', value)
+
+    def _on_set_natpmp(self, key, value):
+        self.core.apply_session_setting('enable_natpmp', value)
+
+    def _on_set_lsd(self, key, value):
+        self.core.apply_session_setting('enable_lsd', value)
+
+    def _on_set_utpex(self, key, value):
+        # Extensions can only be added, not removed, from a running session,
+        # so a False value is a no-op here.
+        if value:
+            self.core.session.add_extension('ut_pex')
+
+    def _on_set_enc_in_policy(self, key, value):
+        self._on_set_encryption(key, value)
+
+    def _on_set_enc_out_policy(self, key, value):
+        self._on_set_encryption(key, value)
+
+    def _on_set_enc_level(self, key, value):
+        self._on_set_encryption(key, value)
+
+    def _on_set_encryption(self, key, value):
+        # Convert Deluge enc_level values to libtorrent enc_level values.
+        pe_enc_level = {
+            0: lt.enc_level.plaintext,
+            1: lt.enc_level.rc4,
+            2: lt.enc_level.both,
+        }
+        # All three encryption keys are re-applied together from config,
+        # regardless of which one triggered the call.
+        self.core.apply_session_settings(
+            {
+                'out_enc_policy': lt.enc_policy(self.config['enc_out_policy']),
+                'in_enc_policy': lt.enc_policy(self.config['enc_in_policy']),
+                'allowed_enc_level': lt.enc_level(
+                    pe_enc_level[self.config['enc_level']]
+                ),
+                'prefer_rc4': True,
+            }
+        )
+
+    def _on_set_max_connections_global(self, key, value):
+        self.core.apply_session_setting('connections_limit', value)
+
+    def _on_set_max_upload_speed(self, key, value):
+        # We need to convert Kb/s to B/s
+        value = -1 if value < 0 else int(value * 1024)
+        self.core.apply_session_setting('upload_rate_limit', value)
+
+    def _on_set_max_download_speed(self, key, value):
+        # We need to convert Kb/s to B/s
+        value = -1 if value < 0 else int(value * 1024)
+        self.core.apply_session_setting('download_rate_limit', value)
+
+    def _on_set_max_upload_slots_global(self, key, value):
+        self.core.apply_session_setting('unchoke_slots_limit', value)
+
+    def _on_set_max_half_open_connections(self, key, value):
+        self.core.apply_session_setting('half_open_limit', value)
+
+    def _on_set_max_connections_per_second(self, key, value):
+        self.core.apply_session_setting('connection_speed', value)
+
+    def _on_set_ignore_limits_on_local_network(self, key, value):
+        self.core.apply_session_setting('ignore_limits_on_local_network', value)
+
+    def _on_set_share_ratio_limit(self, key, value):
+        # This value is a float percentage in deluge, but libtorrent needs int percentage.
+        self.core.apply_session_setting('share_ratio_limit', int(value * 100))
+
+    def _on_set_seed_time_ratio_limit(self, key, value):
+        # This value is a float percentage in deluge, but libtorrent needs int percentage.
+        self.core.apply_session_setting('seed_time_ratio_limit', int(value * 100))
+
+    def _on_set_seed_time_limit(self, key, value):
+        # This value is stored in minutes in deluge, but libtorrent wants seconds
+        self.core.apply_session_setting('seed_time_limit', int(value * 60))
+
+    def _on_set_max_active_downloading(self, key, value):
+        self.core.apply_session_setting('active_downloads', value)
+
+    def _on_set_max_active_seeding(self, key, value):
+        self.core.apply_session_setting('active_seeds', value)
+
+    def _on_set_max_active_limit(self, key, value):
+        self.core.apply_session_setting('active_limit', value)
+
+    def _on_set_dont_count_slow_torrents(self, key, value):
+        self.core.apply_session_setting('dont_count_slow_torrents', value)
+
+    def _on_set_send_info(self, key, value):
+        """sends anonymous stats home"""
+        log.debug('Sending anonymous stats..')
+
+        # Runs in a thread so the blocking urlopen never stalls the reactor.
+        class SendInfoThread(threading.Thread):
+            def __init__(self, config):
+                self.config = config
+                threading.Thread.__init__(self)
+
+            def run(self):
+                import time
+
+                now = time.time()
+                # check if we've done this within the last week or never
+                if (now - self.config['info_sent']) >= (60 * 60 * 24 * 7):
+                    try:
+                        url = (
+                            'http://deluge-torrent.org/stats_get.php?processor='
+                            + platform.machine()
+                            + '&python='
+                            + platform.python_version()
+                            + '&deluge='
+                            + deluge.common.get_version()
+                            + '&os='
+                            + platform.system()
+                            + '&plugins='
+                            + quote_plus(':'.join(self.config['enabled_plugins']))
+                        )
+                        urlopen(url)
+                    except OSError as ex:
+                        log.debug('Network error while trying to send info: %s', ex)
+                    else:
+                        # Only record the timestamp when the request succeeded.
+                        self.config['info_sent'] = now
+
+        if value:
+            SendInfoThread(self.config).start()
+
+    def _on_set_new_release_check(self, key, value):
+        if value:
+            log.debug('Checking for new release..')
+            threading.Thread(target=self.core.get_new_release).start()
+            if self.new_release_timer and self.new_release_timer.running:
+                self.new_release_timer.stop()
+            # Set a timer to check for a new release every 3 days
+            # (the LoopingCall re-enters this handler with value=True).
+            self.new_release_timer = LoopingCall(
+                self._on_set_new_release_check, 'new_release_check', True
+            )
+            self.new_release_timer.start(72 * 60 * 60, False)
+        else:
+            if self.new_release_timer and self.new_release_timer.running:
+                self.new_release_timer.stop()
+
+    def _on_set_proxy(self, key, value):
+        # Initialise with type none and blank hostnames.
+        proxy_settings = {
+            'proxy_type': lt.proxy_type_t.none,
+            'i2p_hostname': '',
+            'proxy_hostname': '',
+            'proxy_hostnames': value['proxy_hostnames'],
+            'proxy_peer_connections': value['proxy_peer_connections'],
+            'proxy_tracker_connections': value['proxy_tracker_connections'],
+            'force_proxy': value['force_proxy'],
+            'anonymous_mode': value['anonymous_mode'],
+        }
+
+        # I2P uses its own hostname/port keys; every other non-none type uses
+        # the generic proxy_* keys including credentials.
+        if value['type'] == lt.proxy_type_t.i2p_proxy:
+            proxy_settings.update(
+                {
+                    'proxy_type': lt.proxy_type_t.i2p_proxy,
+                    'i2p_hostname': value['hostname'],
+                    'i2p_port': value['port'],
+                }
+            )
+        elif value['type'] != lt.proxy_type_t.none:
+            proxy_settings.update(
+                {
+                    'proxy_type': value['type'],
+                    'proxy_hostname': value['hostname'],
+                    'proxy_port': value['port'],
+                    'proxy_username': value['username'],
+                    'proxy_password': value['password'],
+                }
+            )
+
+        self.core.apply_session_settings(proxy_settings)
+
+    def _on_set_rate_limit_ip_overhead(self, key, value):
+        self.core.apply_session_setting('rate_limit_ip_overhead', value)
+
+    def _on_set_geoip_db_location(self, key, geoipdb_path):
+        # Load the GeoIP DB for country look-ups if available
+        if os.path.exists(geoipdb_path):
+            try:
+                self.core.geoip_instance = GeoIP(geoipdb_path, 0)
+            except Exception as ex:
+                log.warning('GeoIP Unavailable: %s', ex)
+        else:
+            log.warning('Unable to find GeoIP database file: %s', geoipdb_path)
+
+    def _on_set_cache_size(self, key, value):
+        self.core.apply_session_setting('cache_size', value)
+
+    def _on_set_cache_expiry(self, key, value):
+        self.core.apply_session_setting('cache_expiry', value)
+
+    def _on_auto_manage_prefer_seeds(self, key, value):
+        # NOTE(review): this handler lacks the '_on_set_' prefix, so
+        # do_config_set_func's getattr('_on_set_' + key) can never dispatch to
+        # it — the 'auto_manage_prefer_seeds' setting appears to never be
+        # applied to the session. Confirm and rename if unintended.
+        self.core.apply_session_setting('auto_manage_prefer_seeds', value)
diff --git a/deluge/core/rpcserver.py b/deluge/core/rpcserver.py
new file mode 100644
index 0000000..81ab2e0
--- /dev/null
+++ b/deluge/core/rpcserver.py
@@ -0,0 +1,598 @@
+#
+# Copyright (C) 2008,2009 Andrew Resch <andrewresch@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+"""RPCServer Module"""
+import logging
+import os
+import sys
+import traceback
+from collections import namedtuple
+from types import FunctionType
+from typing import Callable, TypeVar, overload
+
+from twisted.internet import defer, reactor
+from twisted.internet.protocol import Factory, connectionDone
+
+import deluge.component as component
+import deluge.configmanager
+from deluge.core.authmanager import (
+ AUTH_LEVEL_ADMIN,
+ AUTH_LEVEL_DEFAULT,
+ AUTH_LEVEL_NONE,
+)
+from deluge.crypto_utils import check_ssl_keys, get_context_factory
+from deluge.error import (
+ DelugeError,
+ IncompatibleClient,
+ NotAuthorizedError,
+ WrappedException,
+ _ClientSideRecreateError,
+)
+from deluge.event import ClientDisconnectedEvent
+from deluge.transfer import DelugeTransferProtocol
+
+RPC_RESPONSE = 1
+RPC_ERROR = 2
+RPC_EVENT = 3
+
+log = logging.getLogger(__name__)
+
+TCallable = TypeVar('TCallable', bound=Callable)
+
+
+@overload
+def export(func: TCallable) -> TCallable:
+    ...
+
+
+@overload
+def export(auth_level: int) -> Callable[[TCallable], TCallable]:
+    ...
+
+
+def export(auth_level=AUTH_LEVEL_DEFAULT):
+    """
+    Decorator function to register an object's method as an RPC. The object
+    will need to be registered with an :class:`RPCServer` to be effective.
+
+    Usable both bare (``@export``) and parameterized
+    (``@export(AUTH_LEVEL_ADMIN)``); the overloads above describe the two forms.
+
+    :param func: the function to export
+    :type func: function
+    :param auth_level: the auth level required to call this method
+    :type auth_level: int
+
+    """
+
+    def wrap(func, *args, **kwargs):
+        # Marker attributes read later by RPCServer.register_object / dispatch.
+        func._rpcserver_export = True
+        func._rpcserver_auth_level = auth_level
+
+        rpc_text = '**RPC exported method** (*Auth level: %s*)' % auth_level
+
+        # Append the RPC text while ensuring correct docstring formatting.
+        if func.__doc__:
+            if func.__doc__.endswith(' '):
+                # Reuse the trailing indentation so the appended line lines up.
+                indent = func.__doc__.split('\n')[-1]
+                func.__doc__ += f'\n{indent}'
+            else:
+                func.__doc__ += '\n\n'
+            func.__doc__ += rpc_text
+        else:
+            func.__doc__ = rpc_text
+
+        return func
+
+    # Bare usage: @export passes the function itself as 'auth_level'.
+    if isinstance(auth_level, FunctionType):
+        func = auth_level
+        auth_level = AUTH_LEVEL_DEFAULT
+        return wrap(func)
+    else:
+        return wrap
+
+
+def format_request(call):
+    """
+    Format the RPCRequest message for debug printing
+
+    :param call: the request
+    :type call: a RPCRequest
+
+    :returns: a formatted string for printing
+    :rtype: str
+
+    """
+    # Request tuple layout (see dispatch): call[1]=method name,
+    # call[2]=positional args, call[3]=keyword args.
+    try:
+        s = call[1] + '('
+        if call[2]:
+            s += ', '.join([str(x) for x in call[2]])
+        if call[3]:
+            if call[2]:
+                s += ', '
+            s += ', '.join([key + '=' + str(value) for key, value in call[3].items()])
+        s += ')'
+    except UnicodeEncodeError:
+        # Fall back to the raw tuple repr if any argument cannot be encoded.
+        return 'UnicodeEncodeError, call: %s' % call
+    else:
+        return s
+
+
+class DelugeRPCProtocol(DelugeTransferProtocol):
+    def __init__(self):
+        """Initialize the protocol and its per-session auth record type."""
+        super().__init__()
+        # namedtuple subclass with auth_level, username for the connected session.
+        self.AuthLevel = namedtuple('SessionAuthlevel', 'auth_level, username')
+
+    def message_received(self, request):
+        """
+        This method is called whenever a message is received from a client. The
+        only message that a client sends to the server is a RPC Request message.
+        If the RPC Request message is valid, then the method is called in
+        :meth:`dispatch`.
+
+        :param request: the request from the client.
+        :type data: tuple
+
+        """
+        # Malformed messages are logged and dropped, never answered.
+        if not isinstance(request, tuple):
+            log.debug('Received invalid message: type is not tuple')
+            return
+
+        if len(request) < 1:
+            log.debug('Received invalid message: there are no items')
+            return
+
+        # A request may batch several calls; each must be a 4-tuple of
+        # (request_id, method, args, kwargs).
+        for call in request:
+            if len(call) != 4:
+                log.debug(
+                    'Received invalid rpc request: number of items ' 'in request is %s',
+                    len(call),
+                )
+                continue
+            # log.debug('RPCRequest: %s', format_request(call))
+            # Dispatch asynchronously so the transport loop is not blocked.
+            reactor.callLater(0, self.dispatch, *call)
+
+    def sendData(self, data):  # NOQA: N802
+        """
+        Sends the data to the client.
+
+        :param data: the object that is to be sent to the client. This should
+            be one of the RPC message types.
+        :type data: object
+
+        :raises Exception: re-raises whatever ``transfer_message`` raised,
+            after logging it.
+        """
+        try:
+            self.transfer_message(data)
+        except Exception as ex:
+            log.warning('Error occurred when sending message: %s.', ex)
+            log.exception(ex)
+            raise
+
+    def connectionMade(self):  # NOQA: N802
+        """
+        This method is called when a new client connects.
+        """
+        peer = self.transport.getPeer()
+        log.info('Deluge Client connection made from: %s:%s', peer.host, peer.port)
+        # Set the initial auth level of this session to AUTH_LEVEL_NONE
+        # until daemon.login upgrades it in dispatch().
+        self.factory.authorized_sessions[self.transport.sessionno] = self.AuthLevel(
+            AUTH_LEVEL_NONE, ''
+        )
+
+    def connectionLost(self, reason=connectionDone):  # NOQA: N802
+        """
+        This method is called when the client is disconnected.
+
+        :param reason: the reason the client disconnected.
+        :type reason: str
+
+        """
+
+        # We need to remove this session from various dicts
+        del self.factory.authorized_sessions[self.transport.sessionno]
+        if self.transport.sessionno in self.factory.session_protocols:
+            del self.factory.session_protocols[self.transport.sessionno]
+        if self.transport.sessionno in self.factory.interested_events:
+            del self.factory.interested_events[self.transport.sessionno]
+
+        # Only emit the event while the server is still running (see stop()).
+        if self.factory.state == 'running':
+            component.get('EventManager').emit(
+                ClientDisconnectedEvent(self.factory.session_id)
+            )
+        log.info('Deluge client disconnected: %s', reason.value)
+
+    def valid_session(self):
+        """Return True if this transport's session is still authorized."""
+        return self.transport.sessionno in self.factory.authorized_sessions
+
+    def dispatch(self, request_id, method, args, kwargs):
+        """
+        This method is run when a RPC Request is made. It will run the local method
+        and will send either a RPC Response or RPC Error back to the client.
+
+        :param request_id: the request_id from the client (sent in the RPC Request)
+        :type request_id: int
+        :param method: the local method to call. It must be registered with
+            the :class:`RPCServer`.
+        :type method: str
+        :param args: the arguments to pass to `method`
+        :type args: list
+        :param kwargs: the keyword-arguments to pass to `method`
+        :type kwargs: dict
+
+        """
+
+        def send_error():
+            """
+            Sends an error response with the contents of the exception that was raised.
+            """
+            exc_type, exc_value, dummy_exc_trace = sys.exc_info()
+            formated_tb = traceback.format_exc()
+            try:
+                self.sendData(
+                    (
+                        RPC_ERROR,
+                        request_id,
+                        exc_type.__name__,
+                        exc_value._args,
+                        exc_value._kwargs,
+                        formated_tb,
+                    )
+                )
+            except AttributeError:
+                # This is not a deluge exception (object has no attribute '_args), let's wrap it
+                log.warning(
+                    'An exception occurred while sending RPC_ERROR to '
+                    'client. Wrapping it and resending. Error to '
+                    'send(causing exception goes next):\n%s',
+                    formated_tb,
+                )
+                try:
+                    # Recurse once with a WrappedException, which does have
+                    # the _args/_kwargs attributes sendData expects.
+                    raise WrappedException(
+                        str(exc_value), exc_type.__name__, formated_tb
+                    )
+                except WrappedException:
+                    send_error()
+            except Exception as ex:
+                log.error(
+                    'An exception occurred while sending RPC_ERROR to client: %s', ex
+                )
+
+        # NOTE(review): relies on deluge.common being importable via the
+        # 'deluge' package already imported above — confirm no direct import
+        # is needed.
+        if method == 'daemon.info':
+            # This is a special case and used in the initial connection process
+            self.sendData((RPC_RESPONSE, request_id, deluge.common.get_version()))
+            return
+        elif method == 'daemon.login':
+            # This is a special case and used in the initial connection process
+            # We need to authenticate the user here
+            log.debug('RPC dispatch daemon.login')
+            try:
+                client_version = kwargs.pop('client_version', None)
+                if client_version is None:
+                    raise IncompatibleClient(deluge.common.get_version())
+                ret = component.get('AuthManager').authorize(*args, **kwargs)
+                if ret:
+                    # Upgrade the session from AUTH_LEVEL_NONE to the
+                    # authorized level; args[0] is the username.
+                    self.factory.authorized_sessions[
+                        self.transport.sessionno
+                    ] = self.AuthLevel(ret, args[0])
+                    self.factory.session_protocols[self.transport.sessionno] = self
+            except Exception as ex:
+                send_error()
+                if not isinstance(ex, _ClientSideRecreateError):
+                    log.exception(ex)
+            else:
+                self.sendData((RPC_RESPONSE, request_id, (ret)))
+                if not ret:
+                    # Failed login: drop the connection.
+                    self.transport.loseConnection()
+            return
+
+        # Anything below requires a valid session
+        if not self.valid_session():
+            return
+
+        if method == 'daemon.set_event_interest':
+            log.debug('RPC dispatch daemon.set_event_interest')
+            # This special case is to allow clients to set which events they are
+            # interested in receiving.
+            # We are expecting a sequence from the client.
+            try:
+                if self.transport.sessionno not in self.factory.interested_events:
+                    self.factory.interested_events[self.transport.sessionno] = []
+                self.factory.interested_events[self.transport.sessionno].extend(args[0])
+            except Exception:
+                send_error()
+            else:
+                self.sendData((RPC_RESPONSE, request_id, (True)))
+            return
+
+        if method not in self.factory.methods:
+            try:
+                # Raise exception to be sent back to client
+                raise AttributeError('RPC call on invalid function: %s' % method)
+            except AttributeError:
+                send_error()
+                return
+
+        log.debug('RPC dispatch %s', method)
+        try:
+            # Enforce the auth level attached by the @export decorator.
+            method_auth_requirement = self.factory.methods[method]._rpcserver_auth_level
+            auth_level = self.factory.authorized_sessions[
+                self.transport.sessionno
+            ].auth_level
+            if auth_level < method_auth_requirement:
+                # This session is not allowed to call this method
+                log.debug(
+                    'Session %s is attempting an unauthorized method call!',
+                    self.transport.sessionno,
+                )
+                raise NotAuthorizedError(auth_level, method_auth_requirement)
+            # Set the session_id in the factory so that methods can know
+            # which session is calling it.
+            self.factory.session_id = self.transport.sessionno
+            ret = self.factory.methods[method](*args, **kwargs)
+        except Exception as ex:
+            send_error()
+            # Don't bother printing out DelugeErrors, because they are just
+            # for the client
+            if not isinstance(ex, DelugeError):
+                log.exception('Exception calling RPC request: %s', ex)
+        else:
+            # Check if the return value is a deferred, since we'll need to
+            # wait for it to fire before sending the RPC_RESPONSE
+            if isinstance(ret, defer.Deferred):
+
+                def on_success(result):
+                    try:
+                        self.sendData((RPC_RESPONSE, request_id, result))
+                    except Exception:
+                        send_error()
+                    return result
+
+                def on_fail(failure):
+                    try:
+                        failure.raiseException()
+                    except Exception:
+                        send_error()
+                    return failure
+
+                ret.addCallbacks(on_success, on_fail)
+            else:
+                self.sendData((RPC_RESPONSE, request_id, ret))
+
+
+class RPCServer(component.Component):
+ """
+ This class is used to handle rpc requests from the client. Objects are
+ registered with this class and their methods are exported using the export
+ decorator.
+
+ :param port: the port the RPCServer will listen on
+ :type port: int
+ :param interface: the interface to listen on, this may override the `allow_remote` setting
+ :type interface: str
+ :param allow_remote: set True if the server should allow remote connections
+ :type allow_remote: bool
+ :param listen: if False, will not start listening.. This is only useful in Classic Mode
+ :type listen: bool
+ """
+
+    def __init__(self, port=58846, interface='', allow_remote=False, listen=True):
+        """Set up the factory state and, unless listen=False, start the
+        SSL listener.
+
+        :raises Exception: re-raises listenSSL failures (e.g. port in use).
+        """
+        component.Component.__init__(self, 'RPCServer')
+
+        self.factory = Factory()
+        self.factory.protocol = DelugeRPCProtocol
+        self.factory.session_id = -1
+        self.factory.state = 'running'
+
+        # Holds the registered methods
+        self.factory.methods = {}
+        # Holds the session_ids and auth levels
+        self.factory.authorized_sessions = {}
+        # Holds the protocol objects with the session_id as key
+        self.factory.session_protocols = {}
+        # Holds the interested event list for the sessions
+        self.factory.interested_events = {}
+
+        self.listen = listen
+        if not listen:
+            return
+
+        if allow_remote:
+            hostname = ''
+        else:
+            hostname = 'localhost'
+
+        # An explicit interface overrides the allow_remote choice.
+        if interface:
+            hostname = interface
+
+        log.info('Starting DelugeRPC server %s:%s', hostname, port)
+
+        # Check for SSL keys and generate some if needed
+        check_ssl_keys()
+
+        cert = os.path.join(deluge.configmanager.get_config_dir('ssl'), 'daemon.cert')
+        pkey = os.path.join(deluge.configmanager.get_config_dir('ssl'), 'daemon.pkey')
+
+        try:
+            reactor.listenSSL(
+                port, self.factory, get_context_factory(cert, pkey), interface=hostname
+            )
+        except Exception as ex:
+            log.debug('Daemon already running or port not available.: %s', ex)
+            raise
+
+    def register_object(self, obj, name=None):
+        """
+        Registers an object to export its rpc methods. These methods should
+        be exported with the export decorator prior to registering the object.
+
+        :param obj: the object that we want to export
+        :type obj: object
+        :param name: the name to use, if None, it will be the class name of the object
+        :type name: str
+        """
+        if not name:
+            name = obj.__class__.__name__.lower()
+
+        for d in dir(obj):
+            # Skip private/dunder attributes.
+            if d[0] == '_':
+                continue
+            # Only methods marked by the @export decorator are registered.
+            if getattr(getattr(obj, d), '_rpcserver_export', False):
+                log.debug('Registering method: %s', name + '.' + d)
+                self.factory.methods[name + '.' + d] = getattr(obj, d)
+
+ def deregister_object(self, obj):
+ """
+ Deregisters an objects exported rpc methods.
+
+ :param obj: the object that was previously registered
+
+ """
+ for key, value in self.factory.methods.items():
+ if value.__self__ == obj:
+ del self.factory.methods[key]
+
+    def get_object_method(self, name):
+        """
+        Returns a registered method.
+
+        :param name: the name of the method, usually in the form of 'object.method'
+        :type name: str
+
+        :returns: method
+
+        :raises KeyError: if `name` is not registered
+
+        """
+        return self.factory.methods[name]
+
+    def get_method_list(self):
+        """
+        Returns a list of the exported methods.
+
+        :returns: the exported methods
+        :rtype: list
+        """
+        # dict iteration yields the 'object.method' key names.
+        return list(self.factory.methods)
+
+    def get_session_id(self):
+        """
+        Returns the session id of the current RPC.
+
+        :returns: the session id, this will be -1 if no connections have been made
+        :rtype: int
+
+        """
+        return self.factory.session_id
+
+    def get_session_user(self):
+        """
+        Returns the username calling the current RPC.
+
+        :returns: the username of the user calling the current RPC
+        :rtype: string
+
+        """
+        # Non-listening (Classic Mode) servers have a single implicit user.
+        if not self.listen:
+            return 'localclient'
+        session_id = self.get_session_id()
+        if session_id > -1 and session_id in self.factory.authorized_sessions:
+            return self.factory.authorized_sessions[session_id].username
+        else:
+            # No connections made yet
+            return ''
+
+    def get_session_auth_level(self):
+        """
+        Returns the auth level of the user calling the current RPC.
+
+        :returns: the auth level
+        :rtype: int
+        """
+        # Local (non-listening) or sessionless access is treated as admin.
+        if not self.listen or not self.is_session_valid(self.get_session_id()):
+            return AUTH_LEVEL_ADMIN
+        return self.factory.authorized_sessions[self.get_session_id()].auth_level
+
+    def get_rpc_auth_level(self, rpc):
+        """
+        Returns the auth level requirement for an exported rpc.
+
+        :returns: the auth level
+        :rtype: int
+        """
+        # Attribute set by the @export decorator.
+        return self.factory.methods[rpc]._rpcserver_auth_level
+
+    def is_session_valid(self, session_id):
+        """
+        Checks if the session is still valid, eg, if the client is still connected.
+
+        :param session_id: the session id
+        :type session_id: int
+
+        :returns: True if the session is valid
+        :rtype: bool
+
+        """
+        return session_id in self.factory.authorized_sessions
+
+    def emit_event(self, event):
+        """
+        Emits the event to interested clients.
+
+        :param event: the event to emit
+        :type event: :class:`deluge.event.DelugeEvent`
+        """
+        log.debug('intevents: %s', self.factory.interested_events)
+        # Use copy of `interested_events` since it can mutate while iterating.
+        for session_id, interest in self.factory.interested_events.copy().items():
+            if event.name in interest:
+                log.debug('Emit Event: %s %s', event.name, event.args)
+                # This session is interested so send a RPC_EVENT
+                self.factory.session_protocols[session_id].sendData(
+                    (RPC_EVENT, event.name, event.args)
+                )
+
+    def emit_event_for_session_id(self, session_id, event):
+        """
+        Emits the event to specified session_id.
+
+        :param session_id: the session to send the event to
+        :type session_id: int
+        :param event: the event to emit
+        :type event: :class:`deluge.event.DelugeEvent`
+        """
+        # Guard clauses: silently skip invalid or uninterested sessions.
+        if not self.is_session_valid(session_id):
+            log.debug(
+                'Session ID %s is not valid. Not sending event "%s".',
+                session_id,
+                event.name,
+            )
+            return
+        if session_id not in self.factory.interested_events:
+            log.debug(
+                'Session ID %s is not interested in any events. Not sending event "%s".',
+                session_id,
+                event.name,
+            )
+            return
+        if event.name not in self.factory.interested_events[session_id]:
+            log.debug(
+                'Session ID %s is not interested in event "%s". Not sending it.',
+                session_id,
+                event.name,
+            )
+            return
+        log.debug(
+            'Sending event "%s" with args "%s" to session id "%s".',
+            event.name,
+            event.args,
+            session_id,
+        )
+        self.factory.session_protocols[session_id].sendData(
+            (RPC_EVENT, event.name, event.args)
+        )
+
+    def stop(self):
+        # Flag checked by DelugeRPCProtocol.connectionLost to suppress
+        # ClientDisconnectedEvent emission during shutdown.
+        self.factory.state = 'stopping'
diff --git a/deluge/core/torrent.py b/deluge/core/torrent.py
new file mode 100644
index 0000000..57ec26f
--- /dev/null
+++ b/deluge/core/torrent.py
@@ -0,0 +1,1563 @@
+#
+# Copyright (C) 2007-2009 Andrew Resch <andrewresch@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+"""Internal Torrent class
+
+Attributes:
+ LT_TORRENT_STATE_MAP (dict): Maps the torrent state from libtorrent to Deluge state.
+
+"""
+
+import logging
+import os
+import socket
+import time
+from typing import Optional
+from urllib.parse import urlparse
+
+from twisted.internet.defer import Deferred, DeferredList
+
+import deluge.component as component
+from deluge._libtorrent import lt
+from deluge.common import decode_bytes
+from deluge.configmanager import ConfigManager, get_config_dir
+from deluge.core.authmanager import AUTH_LEVEL_ADMIN
+from deluge.decorators import deprecated
+from deluge.event import (
+ TorrentFolderRenamedEvent,
+ TorrentStateChangedEvent,
+ TorrentTrackerStatusEvent,
+)
+
+log = logging.getLogger(__name__)
+
+LT_TORRENT_STATE_MAP = {
+ 'queued_for_checking': 'Checking',
+ 'checking_files': 'Checking',
+ 'downloading_metadata': 'Downloading',
+ 'downloading': 'Downloading',
+ 'finished': 'Seeding',
+ 'seeding': 'Seeding',
+ 'allocating': 'Allocating',
+ 'checking_resume_data': 'Checking',
+}
+
+
+def sanitize_filepath(filepath, folder=False):
+    """Returns a sanitized filepath to pass to libtorrent rename_file().
+
+    The filepath will have backslashes substituted along with whitespace
+    padding and duplicate slashes stripped.
+
+    Args:
+        filepath (str): The path to sanitize.
+        folder (bool): A trailing slash is appended to the returned filepath.
+
+    Returns:
+        str: The sanitized path, using '/' separators.
+    """
+
+    def clean_filename(filename):
+        """Strips whitespace and discards dotted filenames"""
+        filename = filename.strip()
+        # Names made only of dots ('.', '..', '...') are dropped entirely.
+        if filename.replace('.', '') == '':
+            return ''
+        return filename
+
+    if '\\' in filepath or '/' in filepath:
+        # Normalize to '/' separators, clean each component and drop
+        # the empty ones (which also collapses duplicate slashes).
+        folderpath = filepath.replace('\\', '/').split('/')
+        folderpath = [clean_filename(x) for x in folderpath]
+        newfilepath = '/'.join([path for path in folderpath if path])
+    else:
+        newfilepath = clean_filename(filepath)
+
+    if folder is True:
+        newfilepath += '/'
+
+    return newfilepath
+
+
+def convert_lt_files(files):
+    """Indexes and decodes files from libtorrent get_files().
+
+    Args:
+        files (file_storage): The libtorrent torrent files.
+
+    Returns:
+        list of dict: The files.
+
+    The format for the file dict::
+
+        {
+            "index": int,
+            "path": str,
+            "size": int,
+            "offset": int
+        }
+    """
+    filelist = []
+    for index in range(files.num_files()):
+        try:
+            # Older libtorrent bindings return bytes; decode them.
+            file_path = files.file_path(index).decode('utf8')
+        except AttributeError:
+            # Already a str: nothing to decode.
+            file_path = files.file_path(index)
+
+        filelist.append(
+            {
+                'index': index,
+                # Normalize Windows-style separators to '/'.
+                'path': file_path.replace('\\', '/'),
+                'size': files.file_size(index),
+                'offset': files.file_offset(index),
+            }
+        )
+
+    return filelist
+
+
+class TorrentOptions(dict):
+    """TorrentOptions create a dict of the torrent options.
+
+    Attributes:
+        add_paused (bool): Add the torrent in a paused state.
+        auto_managed (bool): Set torrent to auto managed mode, i.e. will be started or queued automatically.
+        download_location (str): The path for the torrent data to be stored while downloading.
+        file_priorities (list of int): The priority for files in torrent, range is [0..7] however
+            only [0, 1, 4, 7] are normally used and correspond to [Skip, Low, Normal, High]
+        mapped_files (dict): A mapping of the renamed filenames in 'index:filename' pairs.
+        max_connections (int): Sets maximum number of connections this torrent will open.
+            This must be at least 2. The default is unlimited (-1).
+        max_download_speed (float): Will limit the download bandwidth used by this torrent to the
+            limit you set. The default is unlimited (-1) but will not exceed global limit.
+        max_upload_slots (int): Sets the maximum number of peers that are
+            unchoked at the same time on this torrent. This defaults to infinite (-1).
+        max_upload_speed (float): Will limit the upload bandwidth used by this torrent to the limit
+            you set. The default is unlimited (-1) but will not exceed global limit.
+        move_completed (bool): Move the torrent when downloading has finished.
+        move_completed_path (str): The path to move torrent to when downloading has finished.
+        name (str): The display name of the torrent.
+        owner (str): The user this torrent belongs to.
+        pre_allocate_storage (bool): When adding the torrent should all files be pre-allocated.
+        prioritize_first_last_pieces (bool): Prioritize the first and last pieces in the torrent.
+        remove_at_ratio (bool): Remove the torrent when it has reached the stop_ratio.
+        seed_mode (bool): Assume that all files are present for this torrent (Only used when adding a torrent).
+        sequential_download (bool): Download the pieces of the torrent in order.
+        shared (bool): Enable the torrent to be seen by other Deluge users.
+        stop_at_ratio (bool): Stop the torrent when it has reached stop_ratio.
+        stop_ratio (float): The seeding ratio to stop (or remove) the torrent at.
+        super_seeding (bool): Enable super seeding/initial seeding.
+    """
+
+    def __init__(self):
+        """Populate the options from the current core.conf defaults."""
+        super().__init__()
+        config = ConfigManager('core.conf').config
+        # Maps option key -> core.conf key (names differ for several entries).
+        options_conf_map = {
+            'add_paused': 'add_paused',
+            'auto_managed': 'auto_managed',
+            'download_location': 'download_location',
+            'max_connections': 'max_connections_per_torrent',
+            'max_download_speed': 'max_download_speed_per_torrent',
+            'max_upload_slots': 'max_upload_slots_per_torrent',
+            'max_upload_speed': 'max_upload_speed_per_torrent',
+            'move_completed': 'move_completed',
+            'move_completed_path': 'move_completed_path',
+            'pre_allocate_storage': 'pre_allocate_storage',
+            'prioritize_first_last_pieces': 'prioritize_first_last_pieces',
+            'remove_at_ratio': 'remove_seed_at_ratio',
+            'sequential_download': 'sequential_download',
+            'shared': 'shared',
+            'stop_at_ratio': 'stop_seed_at_ratio',
+            'stop_ratio': 'stop_seed_ratio',
+            'super_seeding': 'super_seeding',
+        }
+        for opt_k, conf_k in options_conf_map.items():
+            self[opt_k] = config[conf_k]
+        # Options with no config-file counterpart get fixed defaults.
+        self['file_priorities'] = []
+        self['mapped_files'] = {}
+        self['name'] = ''
+        self['owner'] = ''
+        self['seed_mode'] = False
+
+
+class TorrentError:
+    """Value object describing a forced torrent error state.
+
+    Args:
+        error_message (str): Human-readable description of the error.
+        was_paused (bool): Whether the torrent was paused when the error occurred.
+        restart_to_resume (bool): Whether a daemon restart is needed to resume.
+    """
+
+    def __init__(self, error_message, was_paused=False, restart_to_resume=False):
+        self.error_message = error_message
+        self.was_paused = was_paused
+        self.restart_to_resume = restart_to_resume
+
+
+class Torrent:
+ """Torrent holds information about torrents added to the libtorrent session.
+
+ Args:
+ handle: The libtorrent torrent handle.
+ options (dict): The torrent options.
+ state (TorrentState): The torrent state.
+ filename (str): The filename of the torrent file.
+ magnet (str): The magnet URI.
+
+ Attributes:
+ torrent_id (str): The torrent_id for this torrent
+ handle: Holds the libtorrent torrent handle
+ magnet (str): The magnet URI used to add this torrent (if available).
+ status: Holds status info so that we don"t need to keep getting it from libtorrent.
+ torrent_info: store the torrent info.
+ has_metadata (bool): True if the metadata for the torrent is available, False otherwise.
+ status_funcs (dict): The function mappings to get torrent status
+ prev_status (dict): Previous status dicts returned for this torrent. We use this to return
+ dicts that only contain changes from the previous.
+ {session_id: status_dict, ...}
+ waiting_on_folder_rename (list of dict): A list of Deferreds for file indexes we're waiting for file_rename
+ alerts on. This is so we can send one folder_renamed signal instead of multiple file_renamed signals.
+ [{index: Deferred, ...}, ...]
+ options (dict): The torrent options.
+ filename (str): The filename of the torrent file in case it is required.
+ is_finished (bool): Keep track if torrent is finished to prevent some weird things on state load.
+ statusmsg (str): Status message holds error/extra info about the torrent.
+ state (str): The torrent's state
+ trackers (list of dict): The torrent's trackers
+ tracker_status (str): Status message of currently connected tracker
+ tracker_host (str): Hostname of the currently connected tracker
+ forcing_recheck (bool): Keep track if we're forcing a recheck of the torrent
+ forcing_recheck_paused (bool): Keep track if we're forcing a recheck of the torrent so that
+ we can re-pause it after its done if necessary
+ forced_error (TorrentError): Keep track if we have forced this torrent to be in Error state.
+ """
+
+    def __init__(self, handle, options, state=None, filename=None, magnet=None):
+        """Build the torrent wrapper around a libtorrent handle and apply
+        options, either from a saved state or from the supplied dict."""
+        self.torrent_id = str(handle.info_hash())
+        if log.isEnabledFor(logging.DEBUG):
+            log.debug('Creating torrent object %s', self.torrent_id)
+
+        # Get the core config
+        self.config = ConfigManager('core.conf')
+        self.rpcserver = component.get('RPCServer')
+
+        self.handle = handle
+
+        self.magnet = magnet
+        # Cached libtorrent status and its timestamp (managed elsewhere).
+        self._status: Optional['lt.torrent_status'] = None
+        self._status_last_update: float = 0.0
+
+        self.torrent_info = self.handle.torrent_file()
+        # NOTE(review): self.status is presumably a property defined later in
+        # this class — confirm.
+        self.has_metadata = self.status.has_metadata
+
+        # Start from the config defaults, then overlay caller-supplied options.
+        self.options = TorrentOptions()
+        self.options.update(options)
+
+        # Load values from state if we have it
+        if state:
+            self.set_trackers(state.trackers)
+            self.is_finished = state.is_finished
+            self.filename = state.filename
+        else:
+            self.set_trackers()
+            self.is_finished = False
+            self.filename = filename
+
+        if not self.filename:
+            self.filename = ''
+
+        self.forced_error = None
+        self.statusmsg = None
+        self.state = None
+        self.moving_storage_dest_path = None
+        self.tracker_status = ''
+        self.tracker_host = None
+        self.forcing_recheck = False
+        self.forcing_recheck_paused = False
+        self.status_funcs = None
+        self.prev_status = {}
+        self.waiting_on_folder_rename = []
+
+        self._create_status_funcs()
+        self.set_options(self.options)
+        self.update_state()
+
+        if log.isEnabledFor(logging.DEBUG):
+            log.debug('Torrent object created.')
+
+    def _set_handle_flags(self, flag: lt.torrent_flags, set_flag: bool):
+        """set or unset a flag to the lt handle
+
+        Args:
+            flag (lt.torrent_flags): the flag to set/unset
+            set_flag (bool): True for setting the flag, False for unsetting it
+        """
+        if set_flag:
+            self.handle.set_flags(flag)
+        else:
+            self.handle.unset_flags(flag)
+
+    def on_metadata_received(self):
+        """Process the metadata received alert for this torrent"""
+        self.has_metadata = True
+        self.torrent_info = self.handle.get_torrent_info()
+        # Piece priorities could not be applied before metadata arrived.
+        if self.options['prioritize_first_last_pieces']:
+            self.set_prioritize_first_last_pieces(True)
+        # Persist a .torrent file now that we have the full metadata.
+        self.write_torrentfile()
+
+ # --- Options methods ---
+    def set_options(self, options):
+        """Set the torrent options.
+
+        Args:
+            options (dict): Torrent options, see TorrentOptions class for valid keys.
+        """
+
+        # Skip set_prioritize_first_last if set_file_priorities is in options as it also calls the method.
+        if 'file_priorities' in options and 'prioritize_first_last_pieces' in options:
+            self.options['prioritize_first_last_pieces'] = options.pop(
+                'prioritize_first_last_pieces'
+            )
+
+        for key, value in options.items():
+            if key in self.options:
+                # Prefer a dedicated set_<key> method when one exists, since
+                # those also push the value to the libtorrent handle.
+                options_set_func = getattr(self, 'set_' + key, None)
+                if options_set_func:
+                    options_set_func(value)
+                else:
+                    # Update config options that do not have funcs
+                    self.options[key] = value
+
+    def get_options(self):
+        """Get the torrent options.
+
+        Returns:
+            dict: the torrent options.
+        """
+        return self.options
+
+    def set_max_connections(self, max_connections):
+        """Sets maximum number of connections this torrent will open.
+
+        Args:
+            max_connections (int): Maximum number of connections
+
+        Note:
+            The minimum value for handle.max_connections is 2 (or -1 for unlimited connections).
+            This is enforced by libtorrent and values 0 or 1 raise an assert with lt debug builds.
+        """
+
+        # Clamp disallowed values 0 and 1 (see note above).
+        if max_connections == 0:
+            max_connections = -1
+        elif max_connections == 1:
+            max_connections = 2
+
+        self.options['max_connections'] = max_connections
+        self.handle.set_max_connections(max_connections)
+
+    def set_max_upload_slots(self, max_slots):
+        """Sets maximum number of upload slots for this torrent.
+
+        Args:
+            max_slots (int): Maximum upload slots
+        """
+        self.options['max_upload_slots'] = max_slots
+        self.handle.set_max_uploads(max_slots)
+
+    def set_max_upload_speed(self, m_up_speed):
+        """Sets maximum upload speed for this torrent.
+
+        Args:
+            m_up_speed (float): Maximum upload speed in KiB/s.
+        """
+        self.options['max_upload_speed'] = m_up_speed
+        # libtorrent expects bytes/s; negative means unlimited.
+        if m_up_speed < 0:
+            value = -1
+        else:
+            value = int(m_up_speed * 1024)
+        self.handle.set_upload_limit(value)
+
+    def set_max_download_speed(self, m_down_speed):
+        """Sets maximum download speed for this torrent.
+
+        Args:
+            m_down_speed (float): Maximum download speed in KiB/s.
+        """
+        self.options['max_download_speed'] = m_down_speed
+        # libtorrent expects bytes/s; negative means unlimited.
+        if m_down_speed < 0:
+            value = -1
+        else:
+            value = int(m_down_speed * 1024)
+        self.handle.set_download_limit(value)
+
+    @deprecated
+    def set_prioritize_first_last(self, prioritize):
+        """Deprecated: Use set_prioritize_first_last_pieces."""
+        self.set_prioritize_first_last_pieces(prioritize)
+
+    def set_prioritize_first_last_pieces(self, prioritize):
+        """Prioritize the first and last pieces in the torrent.
+
+        Args:
+            prioritize (bool): Prioritize the first and last pieces.
+
+        """
+        # Piece layout is unknown without metadata; on_metadata_received
+        # re-applies this option later.
+        if not self.has_metadata:
+            return
+
+        self.options['prioritize_first_last_pieces'] = prioritize
+        if not prioritize:
+            # If we are turning off this option, call set_file_priorities to
+            # reset all the piece priorities
+            self.set_file_priorities(self.options['file_priorities'])
+            return
+
+        # A list of priorities for each piece in the torrent
+        priorities = self.handle.get_piece_priorities()
+
+        def get_file_piece(idx, byte_offset):
+            return self.torrent_info.map_file(idx, byte_offset, 0).piece
+
+        for idx in range(self.torrent_info.num_files()):
+            file_size = self.torrent_info.files().file_size(idx)
+            two_percent_bytes = int(0.02 * file_size)
+            # Get the pieces for the byte offsets
+            first_start = get_file_piece(idx, 0)
+            first_end = get_file_piece(idx, two_percent_bytes) + 1
+            last_start = get_file_piece(idx, file_size - two_percent_bytes)
+            last_end = get_file_piece(idx, max(file_size - 1, 0)) + 1
+
+            # Set the pieces in first and last ranges to priority 7
+            # if they are not marked as do not download
+            # ('p and 7' keeps 0 as 0, raises any non-zero priority to 7).
+            priorities[first_start:first_end] = [
+                p and 7 for p in priorities[first_start:first_end]
+            ]
+            priorities[last_start:last_end] = [
+                p and 7 for p in priorities[last_start:last_end]
+            ]
+
+        # Setting the priorities for all the pieces of this torrent
+        self.handle.prioritize_pieces(priorities)
+
+    def set_sequential_download(self, sequential):
+        """Sets whether to download the pieces of the torrent in order.
+
+        Args:
+            sequential (bool): Enable sequential downloading.
+        """
+        self.options['sequential_download'] = sequential
+        self._set_handle_flags(
+            flag=lt.torrent_flags.sequential_download,
+            set_flag=sequential,
+        )
+
+    def set_auto_managed(self, auto_managed):
+        """Set auto managed mode, i.e. will be started or queued automatically.
+
+        Args:
+            auto_managed (bool): Enable auto managed.
+        """
+        self.options['auto_managed'] = auto_managed
+        # Don't touch the handle flag for a torrent the user paused manually
+        # (paused and not auto-managed).
+        if not (self.status.paused and not self.status.auto_managed):
+            self._set_handle_flags(
+                flag=lt.torrent_flags.auto_managed,
+                set_flag=auto_managed,
+            )
+            self.update_state()
+
+    def set_super_seeding(self, super_seeding):
+        """Set super seeding/initial seeding.
+
+        Args:
+            super_seeding (bool): Enable super seeding.
+        """
+        self.options['super_seeding'] = super_seeding
+        self._set_handle_flags(
+            flag=lt.torrent_flags.super_seeding,
+            set_flag=super_seeding,
+        )
+
+ def set_stop_ratio(self, stop_ratio):
+ """The seeding ratio to stop (or remove) the torrent at.
+
+ Args:
+ stop_ratio (float): The seeding ratio.
+ """
+ self.options['stop_ratio'] = stop_ratio
+
+ def set_stop_at_ratio(self, stop_at_ratio):
+ """Stop the torrent when it has reached stop_ratio.
+
+ Args:
+ stop_at_ratio (bool): Stop the torrent.
+ """
+ self.options['stop_at_ratio'] = stop_at_ratio
+
+ def set_remove_at_ratio(self, remove_at_ratio):
+ """Remove the torrent when it has reached the stop_ratio.
+
+ Args:
+ remove_at_ratio (bool): Remove the torrent.
+ """
+ self.options['remove_at_ratio'] = remove_at_ratio
+
+ def set_move_completed(self, move_completed):
+ """Set whether to move the torrent when downloading has finished.
+
+ Args:
+ move_completed (bool): Move the torrent.
+
+ """
+ self.options['move_completed'] = move_completed
+
+ def set_move_completed_path(self, move_completed_path):
+ """Set the path to move torrent to when downloading has finished.
+
+ Args:
+ move_completed_path (str): The move path.
+ """
+ self.options['move_completed_path'] = move_completed_path
+
    def set_file_priorities(self, file_priorities):
        """Sets the file priorities.

        Args:
            file_priorities (list of int): List of file priorities, one per
                file. An empty or wrongly-sized list leaves the current
                libtorrent priorities in effect.
        """
        if not self.has_metadata:
            return

        if log.isEnabledFor(logging.DEBUG):
            log.debug(
                'Setting %s file priorities to: %s', self.torrent_id, file_priorities
            )

        # Only apply the new priorities when one was supplied per file;
        # otherwise fall back to whatever libtorrent currently reports.
        if file_priorities and len(file_priorities) == len(self.get_files()):
            self.handle.prioritize_files(file_priorities)
        else:
            log.debug('Unable to set new file priorities.')
            file_priorities = self.handle.get_file_priorities()

        if 0 in self.options['file_priorities']:
            # Previously marked a file 'skip' so check for any 0's now >0.
            for index, priority in enumerate(self.options['file_priorities']):
                if priority == 0 and file_priorities[index] > 0:
                    # Changed priority from skip to download so update state.
                    self.is_finished = False
                    self.update_state()
                    break

        # Store the priorities.
        self.options['file_priorities'] = file_priorities

        # Set the first/last priorities if needed.
        if self.options['prioritize_first_last_pieces']:
            self.set_prioritize_first_last_pieces(True)
+
    @deprecated
    def set_save_path(self, download_location):
        """Deprecated: Use set_download_location."""
        # Kept only for backward API compatibility; delegates directly.
        self.set_download_location(download_location)
+
+ def set_download_location(self, download_location):
+ """The location for downloading torrent data."""
+ self.options['download_location'] = download_location
+
+ def set_owner(self, account):
+ """Sets the owner of this torrent.
+
+ Args:
+ account (str): The new owner account name.
+
+ Notes:
+ Only a user with admin level auth can change this value.
+
+ """
+
+ if self.rpcserver.get_session_auth_level() == AUTH_LEVEL_ADMIN:
+ self.options['owner'] = account
+
+ # End Options methods #
+
    def set_trackers(self, trackers=None):
        """Sets the trackers for this torrent.

        Args:
            trackers (list of dicts, optional): A list of trackers with 'url'
                and 'tier' keys. If None, the tracker list is re-read from the
                libtorrent handle instead of being replaced.
        """
        if trackers is None:
            # Refresh our cached view of the trackers from libtorrent.
            self.trackers = list(self.handle.trackers())
            self.tracker_host = None
            return

        if log.isEnabledFor(logging.DEBUG):
            log.debug('Setting trackers for %s: %s', self.torrent_id, trackers)

        tracker_list = []

        for tracker in trackers:
            new_entry = lt.announce_entry(str(tracker['url']))
            new_entry.tier = tracker['tier']
            tracker_list.append(new_entry)
        self.handle.replace_trackers(tracker_list)

        # Print out the trackers
        if log.isEnabledFor(logging.DEBUG):
            log.debug('Trackers set for %s:', self.torrent_id)
            for tracker in self.handle.trackers():
                log.debug(' [tier %s]: %s', tracker['tier'], tracker['url'])
        # Set the tracker list in the torrent object
        self.trackers = trackers
        if len(trackers) > 0:
            # Force a re-announce if there is at least 1 tracker
            self.force_reannounce()
        # Invalidate the cached host; get_tracker_host recomputes it lazily.
        self.tracker_host = None
+
    def set_tracker_status(self, status):
        """Sets the tracker status.

        Args:
            status (str): The tracker status.

        Emits:
            TorrentTrackerStatusEvent upon tracker status change.

        """

        # Invalidate the cached host; get_tracker_host recomputes it lazily.
        self.tracker_host = None

        # Only emit an event when the status actually changed.
        if self.tracker_status != status:
            self.tracker_status = status
            component.get('EventManager').emit(
                TorrentTrackerStatusEvent(self.torrent_id, self.tracker_status)
            )
+
    def merge_trackers(self, torrent_info):
        """Merges new trackers in torrent_info into torrent.

        Args:
            torrent_info: libtorrent torrent_info whose trackers should be
                added to this torrent. No-op if falsy.
        """
        log.info(
            'Adding any new trackers to torrent (%s) already in session...',
            self.torrent_id,
        )
        if not torrent_info:
            return
        # Don't merge trackers if either torrent has private flag set.
        if torrent_info.priv() or self.get_status(['private'])['private']:
            log.info('Adding trackers aborted: Torrent has private flag set.')
        else:
            for tracker in torrent_info.trackers():
                self.handle.add_tracker({'url': tracker.url, 'tier': tracker.tier})
            # Update torrent.trackers from libtorrent handle.
            self.set_trackers()
+
    def update_state(self):
        """Updates the state, based on libtorrent's torrent state"""
        # Fetch a fresh status so decisions below are not based on stale data.
        status = self.get_lt_status()
        session_paused = component.get('Core').session.is_paused()
        old_state = self.state
        # Reset the status message; error branches overwrite it below.
        self.set_status_message()
        status_error = status.errc.message() if status.errc.value() else ''

        if self.forced_error:
            # A Deluge-level forced error takes precedence over lt state.
            self.state = 'Error'
            self.set_status_message(self.forced_error.error_message)
        elif status_error:
            self.state = 'Error'
            # auto-manage status will be reverted upon resuming.
            self._set_handle_flags(
                flag=lt.torrent_flags.auto_managed,
                set_flag=False,
            )
            self.set_status_message(decode_bytes(status_error))
        elif status.moving_storage:
            self.state = 'Moving'
        elif not session_paused and status.paused and status.auto_managed:
            # Paused by libtorrent's queueing system, not by the user.
            self.state = 'Queued'
        elif session_paused or status.paused:
            self.state = 'Paused'
        else:
            self.state = LT_TORRENT_STATE_MAP.get(str(status.state), str(status.state))

        if self.state != old_state:
            component.get('EventManager').emit(
                TorrentStateChangedEvent(self.torrent_id, self.state)
            )

        if log.isEnabledFor(logging.DEBUG):
            log.debug(
                'State from lt was: %s | Session is paused: %s\nTorrent state set from "%s" to "%s" (%s)',
                'error' if status_error else status.state,
                session_paused,
                old_state,
                self.state,
                self.torrent_id,
            )
            if self.forced_error:
                log.debug(
                    'Torrent Error state message: %s', self.forced_error.error_message
                )
+
+ def set_status_message(self, message=None):
+ """Sets the torrent status message.
+
+ Calling method without a message will reset the message to 'OK'.
+
+ Args:
+ message (str, optional): The status message.
+
+ """
+ if not message:
+ message = 'OK'
+ self.statusmsg = message
+
    def force_error_state(self, message, restart_to_resume=True):
        """Forces the torrent into an error state.

        For setting an error state not covered by libtorrent.

        Args:
            message (str): The error status message.
            restart_to_resume (bool, optional): Prevent resuming clearing the error, only restarting
                session can resume.
        """
        status = self.get_lt_status()
        # Disable auto-management so lt queueing cannot unpause the torrent.
        self._set_handle_flags(
            flag=lt.torrent_flags.auto_managed,
            set_flag=False,
        )
        # Remember the pre-error paused state so it can be restored on clear.
        self.forced_error = TorrentError(message, status.paused, restart_to_resume)
        if not status.paused:
            self.handle.pause()
        self.update_state()
+
    def clear_forced_error_state(self, update_state=True):
        """Clears a forced error state previously set by force_error_state.

        Args:
            update_state (bool, optional): Refresh the torrent state after
                clearing the error. Defaults to True.
        """
        if not self.forced_error:
            return

        if self.forced_error.restart_to_resume:
            log.error('Restart deluge to clear this torrent error')

        # Restore auto-management if the torrent was running before the error
        # and the user wants it auto-managed.
        if not self.forced_error.was_paused and self.options['auto_managed']:
            self._set_handle_flags(
                flag=lt.torrent_flags.auto_managed,
                set_flag=True,
            )
        self.forced_error = None
        self.set_status_message('OK')
        if update_state:
            self.update_state()
+
+ def get_eta(self):
+ """Get the ETA for this torrent.
+
+ Returns:
+ int: The ETA in seconds.
+
+ """
+ status = self.status
+ eta = 0
+ if (
+ self.is_finished
+ and self.options['stop_at_ratio']
+ and status.upload_payload_rate
+ ):
+ # We're a seed, so calculate the time to the 'stop_share_ratio'
+ eta = (
+ int(status.all_time_download * self.options['stop_ratio'])
+ - status.all_time_upload
+ ) // status.upload_payload_rate
+ elif status.download_payload_rate:
+ left = status.total_wanted - status.total_wanted_done
+ if left > 0:
+ eta = left // status.download_payload_rate
+
+ # Limit to 1 year, avoid excessive values and prevent GTK int overflow.
+ return eta if eta < 31557600 else -1
+
+ def get_ratio(self):
+ """Get the ratio of upload/download for this torrent.
+
+ Returns:
+ float: The ratio or -1.0 (for infinity).
+
+ """
+ if self.status.total_done > 0:
+ return self.status.all_time_upload / self.status.total_done
+ else:
+ return -1.0
+
+ def get_files(self):
+ """Get the files this torrent contains.
+
+ Returns:
+ list of dict: The files.
+
+ """
+ if not self.has_metadata:
+ return []
+
+ files = self.torrent_info.files()
+ return convert_lt_files(files)
+
+ def get_orig_files(self):
+ """Get the original filenames of files in this torrent.
+
+ Returns:
+ list of dict: The files with original filenames.
+
+ """
+ if not self.has_metadata:
+ return []
+
+ files = self.torrent_info.orig_files()
+ return convert_lt_files(files)
+
    def get_peers(self):
        """Get the peers for this torrent.

        A list of peers and various information about them.

        Returns:
            list of dict: The peers.

        The format for the peer dict::

            {
                "client": str,
                "country": str,
                "down_speed": int,
                "ip": str,
                "progress": float,
                "seed": bool,
                "up_speed": int
            }
        """
        ret = []
        peers = self.handle.get_peer_info()

        for peer in peers:
            # We do not want to report peers that are half-connected
            if peer.flags & peer.connecting or peer.flags & peer.handshake:
                continue

            try:
                client = decode_bytes(peer.client)
            except UnicodeDecodeError:
                # libtorrent on Py3 can raise UnicodeDecodeError for peer_info.client
                client = 'unknown'

            try:
                country = component.get('Core').geoip_instance.country_code_by_addr(
                    peer.ip[0]
                )
            except AttributeError:
                # geoip lookup unavailable (e.g. geoip_instance is None).
                country = ''
            else:
                try:
                    # Keep alphabetic characters of the country code only.
                    country = ''.join(
                        [char if char.isalpha() else ' ' for char in country]
                    )
                except TypeError:
                    # Lookup returned a non-iterable (e.g. None).
                    country = ''

            ret.append(
                {
                    'client': client,
                    'country': country,
                    'down_speed': peer.payload_down_speed,
                    'ip': f'{peer.ip[0]}:{peer.ip[1]}',
                    'progress': peer.progress,
                    'seed': peer.flags & peer.seed,
                    'up_speed': peer.payload_up_speed,
                }
            )

        return ret
+
+ def get_queue_position(self):
+ """Get the torrents queue position
+
+ Returns:
+ int: queue position
+ """
+ return self.handle.queue_position()
+
+ def get_file_priorities(self):
+ """Return the file priorities"""
+ if not self.handle.status().has_metadata:
+ return []
+
+ if not self.options['file_priorities']:
+ # Ensure file_priorities option is populated.
+ self.set_file_priorities([])
+
+ return self.options['file_priorities']
+
+ def get_file_progress(self):
+ """Calculates the file progress as a percentage.
+
+ Returns:
+ list of floats: The file progress (0.0 -> 1.0), empty list if n/a.
+ """
+ if not self.has_metadata:
+ return []
+
+ try:
+ files_progresses = zip(
+ self.handle.file_progress(), self.torrent_info.files()
+ )
+ except Exception:
+ # Handle libtorrent >=2.0.0,<=2.0.4 file_progress error
+ files_progresses = zip(iter(lambda: 0, 1), self.torrent_info.files())
+
+ return [
+ progress / _file.size if _file.size else 0.0
+ for progress, _file in files_progresses
+ ]
+
    def get_tracker_host(self):
        """Get the hostname of the currently connected tracker.

        If no tracker is connected, it uses the 1st tracker.

        Returns:
            str: The tracker host, '' if no tracker is known at all.
        """
        # Return the cached value; set_trackers/set_tracker_status clear it.
        if self.tracker_host:
            return self.tracker_host

        tracker = self.status.current_tracker
        if not tracker and self.trackers:
            tracker = self.trackers[0]['url']

        if tracker:
            # urlparse does not split udp:// URLs, so pretend they are http.
            url = urlparse(tracker.replace('udp://', 'http://'))
            if hasattr(url, 'hostname'):
                host = url.hostname or 'DHT'
                # Check if hostname is an IP address and just return it if that's the case
                try:
                    socket.inet_aton(host)
                except OSError:
                    pass
                else:
                    # This is an IP address because an exception wasn't raised
                    return url.hostname

                # Reduce the hostname to its significant domain labels,
                # keeping three for common second-level domains
                # (e.g. 'tracker.example.co.uk' -> 'example.co.uk').
                parts = host.split('.')
                if len(parts) > 2:
                    if parts[-2] in ('co', 'com', 'net', 'org') or parts[-1] == 'uk':
                        host = '.'.join(parts[-3:])
                    else:
                        host = '.'.join(parts[-2:])
                self.tracker_host = host
                return host
        return ''
+
    def get_magnet_uri(self):
        """Returns a magnet URI for this torrent.

        Returns:
            str: The magnet URI generated by libtorrent from the handle.
        """
        return lt.make_magnet_uri(self.handle)
+
+ def get_name(self):
+ """The name of the torrent (distinct from the filenames).
+
+ Note:
+ Can be manually set in options through `name` key. If the key is
+ reset to empty string "" it will return the original torrent name.
+
+ Returns:
+ str: the name of the torrent.
+
+ """
+ if self.options['name']:
+ return self.options['name']
+
+ if self.has_metadata:
+ # Use the top-level folder as torrent name.
+ filename = decode_bytes(self.torrent_info.files().file_path(0))
+ name = filename.replace('\\', '/', 1).split('/', 1)[0]
+ else:
+ name = decode_bytes(self.handle.status().name)
+
+ if not name:
+ name = self.torrent_id
+
+ return name
+
+ def get_progress(self):
+ """The progress of this torrent's current task.
+
+ Returns:
+ float: The progress percentage (0 to 100).
+
+ """
+
+ def get_size(files, path):
+ """Returns total size of 'files' currently located in 'path'"""
+ files = [os.path.join(path, f) for f in files]
+ return sum(os.stat(f).st_size for f in files if os.path.exists(f))
+
+ if self.state == 'Error':
+ progress = 100.0
+ elif self.state == 'Moving':
+ # Check if torrent has downloaded any data yet.
+ if self.status.total_done:
+ torrent_files = [f['path'] for f in self.get_files()]
+ dest_path_size = get_size(torrent_files, self.moving_storage_dest_path)
+ progress = dest_path_size / self.status.total_done * 100
+ else:
+ progress = 100.0
+ else:
+ progress = self.status.progress * 100
+
+ return progress
+
+ def get_time_since_transfer(self):
+ """The time since either upload/download from peers"""
+ time_since = (self.status.time_since_download, self.status.time_since_upload)
+ try:
+ return min(x for x in time_since if x != -1)
+ except ValueError:
+ return -1
+
+ def get_status(self, keys, diff=False, update=False, all_keys=False):
+ """Returns the status of the torrent based on the keys provided
+
+ Args:
+ keys (list of str): the keys to get the status on
+ diff (bool): Will return a diff of the changes since the last
+ call to get_status based on the session_id
+ update (bool): If True the status will be updated from libtorrent
+ if False, the cached values will be returned
+ all_keys (bool): If True return all keys while ignoring the keys param
+ if False, return only the requested keys
+
+ Returns:
+ dict: a dictionary of the status keys and their values
+ """
+ if update:
+ self.get_lt_status()
+
+ if all_keys:
+ keys = list(self.status_funcs)
+
+ status_dict = {}
+
+ for key in keys:
+ status_dict[key] = self.status_funcs[key]()
+
+ if diff:
+ session_id = self.rpcserver.get_session_id()
+ if session_id in self.prev_status:
+ # We have a previous status dict, so lets make a diff
+ status_diff = {}
+ for key, value in status_dict.items():
+ if key in self.prev_status[session_id]:
+ if value != self.prev_status[session_id][key]:
+ status_diff[key] = value
+ else:
+ status_diff[key] = value
+
+ self.prev_status[session_id] = status_dict
+ return status_diff
+
+ self.prev_status[session_id] = status_dict
+ return status_dict
+
+ return status_dict
+
+ def get_lt_status(self) -> 'lt.torrent_status':
+ """Get the torrent status fresh, not from cache.
+
+ This should be used when a guaranteed fresh status is needed rather than
+ `torrent.handle.status()` because it will update the cache as well.
+ """
+ self.status = self.handle.status()
+ return self.status
+
    @property
    def status(self) -> 'lt.torrent_status':
        """Cached copy of the libtorrent status for this torrent.

        If it has not been updated within the last five seconds, it will be
        automatically refreshed.
        """
        if self._status_last_update < (time.time() - 5):
            # Assign through the property setter so the timestamp updates too.
            self.status = self.handle.status()
        return self._status
+
    @status.setter
    def status(self, status: 'lt.torrent_status') -> None:
        """Updates the cached status and its refresh timestamp.

        Args:
            status: a libtorrent torrent status
        """
        self._status = status
        # Record when the cache was last refreshed (see the getter's 5s TTL).
        self._status_last_update = time.time()
+
    def _create_status_funcs(self):
        """Creates the functions for getting torrent status.

        Builds the `status_funcs` lookup table mapping each status key to a
        zero-argument callable; `get_status` evaluates these lazily so only
        the requested keys are computed. Most lambdas read the (cached)
        `self.status` property or the `options` dict.
        """
        self.status_funcs = {
            'active_time': lambda: self.status.active_time,
            'seeding_time': lambda: self.status.seeding_time,
            'finished_time': lambda: self.status.finished_time,
            'all_time_download': lambda: self.status.all_time_download,
            'storage_mode': lambda: self.status.storage_mode.name.split('_')[
                2
            ],  # sparse or allocate
            'distributed_copies': lambda: max(0.0, self.status.distributed_copies),
            'download_payload_rate': lambda: self.status.download_payload_rate,
            'file_priorities': self.get_file_priorities,
            'hash': lambda: self.torrent_id,
            'auto_managed': lambda: self.options['auto_managed'],
            'is_auto_managed': lambda: self.options['auto_managed'],
            'is_finished': lambda: self.is_finished,
            'max_connections': lambda: self.options['max_connections'],
            'max_download_speed': lambda: self.options['max_download_speed'],
            'max_upload_slots': lambda: self.options['max_upload_slots'],
            'max_upload_speed': lambda: self.options['max_upload_speed'],
            'message': lambda: self.statusmsg,
            'move_on_completed_path': lambda: self.options[
                'move_completed_path'
            ],  # Deprecated: move_completed_path
            'move_on_completed': lambda: self.options[
                'move_completed'
            ],  # Deprecated: Use move_completed
            'move_completed_path': lambda: self.options['move_completed_path'],
            'move_completed': lambda: self.options['move_completed'],
            'next_announce': lambda: self.status.next_announce.seconds,
            'num_peers': lambda: self.status.num_peers - self.status.num_seeds,
            'num_seeds': lambda: self.status.num_seeds,
            'owner': lambda: self.options['owner'],
            'paused': lambda: self.status.paused,
            'prioritize_first_last': lambda: self.options[
                'prioritize_first_last_pieces'
            ],
            # Deprecated: Use prioritize_first_last_pieces
            'prioritize_first_last_pieces': lambda: self.options[
                'prioritize_first_last_pieces'
            ],
            'sequential_download': lambda: self.options['sequential_download'],
            'progress': self.get_progress,
            'shared': lambda: self.options['shared'],
            'remove_at_ratio': lambda: self.options['remove_at_ratio'],
            'save_path': lambda: self.options[
                'download_location'
            ],  # Deprecated: Use download_location
            'download_location': lambda: self.options['download_location'],
            'seeds_peers_ratio': lambda: -1.0
            if self.status.num_incomplete == 0
            else (  # Use -1.0 to signify infinity
                self.status.num_complete / self.status.num_incomplete
            ),
            'seed_rank': lambda: self.status.seed_rank,
            'state': lambda: self.state,
            'stop_at_ratio': lambda: self.options['stop_at_ratio'],
            'stop_ratio': lambda: self.options['stop_ratio'],
            'time_added': lambda: self.status.added_time,
            'total_done': lambda: self.status.total_done,
            'total_payload_download': lambda: self.status.total_payload_download,
            'total_payload_upload': lambda: self.status.total_payload_upload,
            'total_peers': lambda: self.status.num_incomplete,
            'total_seeds': lambda: self.status.num_complete,
            'total_uploaded': lambda: self.status.all_time_upload,
            'total_wanted': lambda: self.status.total_wanted,
            'total_remaining': lambda: self.status.total_wanted
            - self.status.total_wanted_done,
            'tracker': lambda: self.status.current_tracker,
            'tracker_host': self.get_tracker_host,
            'trackers': lambda: self.trackers,
            'tracker_status': lambda: self.tracker_status,
            'upload_payload_rate': lambda: self.status.upload_payload_rate,
            # The keys below require metadata; they fall back to a neutral
            # value ('', 0, False) until the torrent has metadata.
            'comment': lambda: decode_bytes(self.torrent_info.comment())
            if self.has_metadata
            else '',
            'creator': lambda: decode_bytes(self.torrent_info.creator())
            if self.has_metadata
            else '',
            'num_files': lambda: self.torrent_info.num_files()
            if self.has_metadata
            else 0,
            'num_pieces': lambda: self.torrent_info.num_pieces()
            if self.has_metadata
            else 0,
            'piece_length': lambda: self.torrent_info.piece_length()
            if self.has_metadata
            else 0,
            'private': lambda: self.torrent_info.priv() if self.has_metadata else False,
            'total_size': lambda: self.torrent_info.total_size()
            if self.has_metadata
            else 0,
            'eta': self.get_eta,
            'file_progress': self.get_file_progress,
            'files': self.get_files,
            'orig_files': self.get_orig_files,
            'is_seed': lambda: self.status.is_seeding,
            'peers': self.get_peers,
            'queue': lambda: self.status.queue_position,
            'ratio': self.get_ratio,
            'completed_time': lambda: self.status.completed_time,
            'last_seen_complete': lambda: self.status.last_seen_complete,
            'name': self.get_name,
            'pieces': self._get_pieces_info,
            'seed_mode': lambda: self.status.seed_mode,
            'super_seeding': lambda: self.status.super_seeding,
            'time_since_download': lambda: self.status.time_since_download,
            'time_since_upload': lambda: self.status.time_since_upload,
            'time_since_transfer': self.get_time_since_transfer,
        }
+
    def pause(self):
        """Pause this torrent.

        Returns:
            bool: True is successful, otherwise False.

        """
        # Turn off auto-management so the torrent will not be unpaused by lt queueing
        self._set_handle_flags(
            flag=lt.torrent_flags.auto_managed,
            set_flag=False,
        )
        if self.state == 'Error':
            log.debug('Unable to pause torrent while in Error state')
        elif self.status.paused:
            # This torrent was probably paused due to being auto managed by lt
            # Since we turned auto_managed off, we should update the state which should
            # show it as 'Paused'. We need to emit a torrent_paused signal because
            # the torrent_paused alert from libtorrent will not be generated.
            self.update_state()
            component.get('EventManager').emit(
                TorrentStateChangedEvent(self.torrent_id, 'Paused')
            )
        else:
            try:
                self.handle.pause()
            except RuntimeError as ex:
                log.debug('Unable to pause torrent: %s', ex)
+
    def resume(self):
        """Resumes this torrent.

        Resume is skipped for auto-managed paused torrents, torrents that
        were paused before a forced error, and finished torrents already at
        their stop ratio.
        """
        if self.status.paused and self.status.auto_managed:
            log.debug('Resume not possible for auto-managed torrent!')
        elif self.forced_error and self.forced_error.was_paused:
            log.debug(
                'Resume skipped for forced_error torrent as it was originally paused.'
            )
        elif (
            self.status.is_finished
            and self.options['stop_at_ratio']
            and self.get_ratio() >= self.options['stop_ratio']
        ):
            log.debug('Resume skipped for torrent as it has reached "stop_seed_ratio".')
        else:
            # Check if torrent was originally being auto-managed.
            if self.options['auto_managed']:
                self._set_handle_flags(
                    flag=lt.torrent_flags.auto_managed,
                    set_flag=True,
                )
            try:
                self.handle.resume()
            except RuntimeError as ex:
                log.debug('Unable to resume torrent: %s', ex)

        # Clear torrent error state.
        if self.forced_error and not self.forced_error.restart_to_resume:
            self.clear_forced_error_state()
        elif self.state == 'Error' and not self.forced_error:
            # libtorrent-level error: clear it on the handle directly.
            self.handle.clear_error()
+
+ def connect_peer(self, peer_ip, peer_port):
+ """Manually add a peer to the torrent
+
+ Args:
+ peer_ip (str) : Peer IP Address
+ peer_port (int): Peer Port
+
+ Returns:
+ bool: True is successful, otherwise False
+ """
+ try:
+ self.handle.connect_peer((peer_ip, int(peer_port)), 0)
+ except (RuntimeError, ValueError) as ex:
+ log.debug('Unable to connect to peer: %s', ex)
+ return False
+ return True
+
    def move_storage(self, dest):
        """Move a torrent's storage location

        Args:
            dest (str): The destination folder for the torrent data

        Returns:
            bool: True if successful, otherwise False

        """
        dest = decode_bytes(dest)

        if not os.path.exists(dest):
            try:
                os.makedirs(dest)
            except OSError as ex:
                log.error(
                    'Could not move storage for torrent %s since %s does '
                    'not exist and could not create the directory: %s',
                    self.torrent_id,
                    dest,
                    ex,
                )
                return False

        try:
            # lt needs utf8 byte-string. Otherwise if wstrings enabled, unicode string.
            # Keyword argument flags=2 (dont_replace) dont overwrite target files but delete source.
            try:
                self.handle.move_storage(dest.encode('utf8'), flags=2)
            except TypeError:
                # Binding rejected bytes; retry with the unicode string.
                self.handle.move_storage(dest, flags=2)
        except RuntimeError as ex:
            log.error('Error calling libtorrent move_storage: %s', ex)
            return False
        # Remember the destination so get_progress can track the move.
        self.moving_storage_dest_path = dest
        self.update_state()
        return True
+
    def save_resume_data(self, flush_disk_cache=False):
        """Signals libtorrent to build resume data for this torrent.

        Args:
            flush_disk_cache (bool): Avoids potential issue with file timestamps
                and is only needed when stopping the session.

        Returns:
            None: The response with resume data is returned in a libtorrent save_resume_data_alert.

        """
        if log.isEnabledFor(logging.DEBUG):
            log.debug('Requesting save_resume_data for torrent: %s', self.torrent_id)
        flags = lt.save_resume_flags_t.flush_disk_cache if flush_disk_cache else 0
        # Don't generate fastresume data if torrent is in a Deluge Error state.
        if self.forced_error:
            # Fail the pending deferred so the TorrentManager does not wait
            # for an alert that will never come.
            component.get('TorrentManager').waiting_on_resume_data[
                self.torrent_id
            ].errback(UserWarning('Skipped creating resume_data while in Error state'))
        else:
            self.handle.save_resume_data(flags)
+
    def write_torrentfile(self, filedump=None):
        """Writes the torrent file to the state dir and optional 'copy of' dir.

        Args:
            filedump (bytes, optional): bencoded filedump of a torrent file.
                If None, it is regenerated from this torrent's metadata.

        """

        def write_file(filepath, filedump):
            """Write out the torrent file"""
            log.debug('Writing torrent file to: %s', filepath)
            try:
                with open(filepath, 'wb') as save_file:
                    save_file.write(filedump)
            except OSError as ex:
                log.error('Unable to save torrent file to: %s', ex)

        filepath = os.path.join(get_config_dir(), 'state', self.torrent_id + '.torrent')

        if filedump is None:
            # Recreate the bencoded torrent data from the session metadata.
            lt_ct = lt.create_torrent(self.torrent_info)
            filedump = lt.bencode(lt_ct.generate())

        write_file(filepath, filedump)

        # If the user has requested a copy of the torrent be saved elsewhere we need to do that.
        if self.config['copy_torrent_file']:
            if not self.filename:
                self.filename = self.get_name() + '.torrent'
            filepath = os.path.join(self.config['torrentfiles_location'], self.filename)
            write_file(filepath, filedump)
+
+ def delete_torrentfile(self, delete_copies=False):
+ """Deletes the .torrent file in the state directory in config"""
+ torrent_files = [
+ os.path.join(get_config_dir(), 'state', self.torrent_id + '.torrent')
+ ]
+ if delete_copies and self.filename:
+ torrent_files.append(
+ os.path.join(self.config['torrentfiles_location'], self.filename)
+ )
+
+ for torrent_file in torrent_files:
+ log.debug('Deleting torrent file: %s', torrent_file)
+ try:
+ os.remove(torrent_file)
+ except OSError as ex:
+ log.warning('Unable to delete the torrent file: %s', ex)
+
+ def force_reannounce(self):
+ """Force a tracker reannounce"""
+ try:
+ self.handle.force_reannounce()
+ except RuntimeError as ex:
+ log.debug('Unable to force reannounce: %s', ex)
+ return False
+ return True
+
+ def scrape_tracker(self):
+ """Scrape the tracker
+
+ A scrape request queries the tracker for statistics such as total
+ number of incomplete peers, complete peers, number of downloads etc.
+ """
+ try:
+ self.handle.scrape_tracker()
+ except RuntimeError as ex:
+ log.debug('Unable to scrape tracker: %s', ex)
+ return False
+ return True
+
+ def force_recheck(self):
+ """Forces a recheck of the torrent's pieces"""
+ if self.forced_error:
+ self.forcing_recheck_paused = self.forced_error.was_paused
+ self.clear_forced_error_state(update_state=False)
+ else:
+ self.forcing_recheck_paused = self.status.paused
+
+ try:
+ self.handle.force_recheck()
+ self.handle.resume()
+ self.forcing_recheck = True
+ except RuntimeError as ex:
+ log.debug('Unable to force recheck: %s', ex)
+ self.forcing_recheck = False
+ return self.forcing_recheck
+
    def rename_files(self, filenames):
        """Renames files in the torrent.

        Args:
            filenames (list): A list of (index, filename) pairs.
        """
        for index, filename in filenames:
            # Make sure filename is a sanitized unicode string.
            filename = sanitize_filepath(decode_bytes(filename))
            # lt needs utf8 byte-string. Otherwise if wstrings enabled, unicode string.
            try:
                self.handle.rename_file(index, filename.encode('utf8'))
            except (UnicodeDecodeError, TypeError):
                # Binding rejected bytes; retry with the unicode string.
                self.handle.rename_file(index, filename)
+
    def rename_folder(self, folder, new_folder):
        """Renames a folder within a torrent.

        This basically does a file rename on all of the folders children.

        Args:
            folder (str): The original folder name
            new_folder (str): The new folder name

        Returns:
            twisted.internet.defer.Deferred: A deferred which fires when the rename is complete
        """
        log.debug('Attempting to rename folder: %s to %s', folder, new_folder)

        # Empty string means remove the dir and move its content to the parent
        if len(new_folder) > 0:
            new_folder = sanitize_filepath(new_folder, folder=True)

        def on_file_rename_complete(dummy_result, wait_dict, index):
            """File rename complete"""
            # Remove the finished entry; fired from the alert handler.
            wait_dict.pop(index, None)

        # Maps file index -> Deferred for every child rename still pending.
        wait_on_folder = {}
        self.waiting_on_folder_rename.append(wait_on_folder)
        for _file in self.get_files():
            if _file['path'].startswith(folder):
                # Keep track of filerenames we're waiting on
                wait_on_folder[_file['index']] = Deferred().addBoth(
                    on_file_rename_complete, wait_on_folder, _file['index']
                )
                new_path = _file['path'].replace(folder, new_folder, 1)
                try:
                    self.handle.rename_file(_file['index'], new_path.encode('utf8'))
                except (UnicodeDecodeError, TypeError):
                    # Binding rejected bytes; retry with the unicode string.
                    self.handle.rename_file(_file['index'], new_path)

        def on_folder_rename_complete(dummy_result, torrent, folder, new_folder):
            """Folder rename complete"""
            component.get('EventManager').emit(
                TorrentFolderRenamedEvent(torrent.torrent_id, folder, new_folder)
            )
            # Empty folders are removed after libtorrent folder renames
            self.remove_empty_folders(folder)
            # Drop the now-empty wait dicts from the pending list.
            torrent.waiting_on_folder_rename = [
                _dir for _dir in torrent.waiting_on_folder_rename if _dir
            ]
            component.get('TorrentManager').save_resume_data((self.torrent_id,))

        d = DeferredList(list(wait_on_folder.values()))
        d.addBoth(on_folder_rename_complete, self, folder, new_folder)
        return d
+
    def remove_empty_folders(self, folder):
        """Recursively removes folders but only if they are empty.

        This cleans up after libtorrent folder renames.

        Args:
            folder (str): The folder to recursively check
        """
        # Removes leading slashes that can cause join to ignore download_location
        download_location = self.options['download_location']
        folder_full_path = os.path.normpath(
            os.path.join(download_location, folder.lstrip('\\/'))
        )

        try:
            if not os.listdir(folder_full_path):
                # Empty: removedirs also prunes now-empty parent directories.
                os.removedirs(folder_full_path)
                log.debug('Removed Empty Folder %s', folder_full_path)
            else:
                # Walk bottom-up so children are removed before their parents.
                for root, dirs, dummy_files in os.walk(folder_full_path, topdown=False):
                    for name in dirs:
                        try:
                            os.removedirs(os.path.join(root, name))
                            log.debug(
                                'Removed Empty Folder %s', os.path.join(root, name)
                            )
                        except OSError as ex:
                            # Not empty (or already gone) - leave it.
                            log.debug(ex)

        except OSError as ex:
            log.debug('Cannot Remove Folder: %s', ex)
+
+ def cleanup_prev_status(self):
+ """Checks the validity of the keys in the prev_status dict.
+
+ If the key is no longer valid, the dict will be deleted.
+ """
+ # Dict will be modified so iterate over generated list
+ for key in list(self.prev_status):
+ if not self.rpcserver.is_session_valid(key):
+ del self.prev_status[key]
+
+ def _get_pieces_info(self):
+ """Get the pieces for this torrent."""
+ if not self.has_metadata or self.status.is_seeding:
+ pieces = None
+ else:
+ pieces = []
+ for piece, avail_piece in zip(
+ self.status.pieces, self.handle.piece_availability()
+ ):
+ if piece:
+ pieces.append(3) # Completed.
+ elif avail_piece:
+ pieces.append(
+ 1
+ ) # Available, just not downloaded nor being downloaded.
+ else:
+ pieces.append(
+ 0
+ ) # Missing, no known peer with piece, or not asked for yet.
+
+ for peer_info in self.handle.get_peer_info():
+ if peer_info.downloading_piece_index >= 0:
+ pieces[
+ peer_info.downloading_piece_index
+ ] = 2 # Being downloaded from peer.
+
+ return pieces
diff --git a/deluge/core/torrentmanager.py b/deluge/core/torrentmanager.py
new file mode 100644
index 0000000..c43a7a2
--- /dev/null
+++ b/deluge/core/torrentmanager.py
@@ -0,0 +1,1700 @@
+#
+# Copyright (C) 2007-2009 Andrew Resch <andrewresch@gmail.com>
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+
+"""TorrentManager handles Torrent objects"""
+import datetime
+import logging
+import operator
+import os
+import pickle
+import time
+from base64 import b64encode
+from tempfile import gettempdir
+from typing import Dict, List, NamedTuple, Tuple
+
+from twisted.internet import defer, reactor, threads
+from twisted.internet.defer import Deferred, DeferredList
+from twisted.internet.task import LoopingCall
+
+import deluge.component as component
+from deluge._libtorrent import LT_VERSION, lt
+from deluge.common import (
+ VersionSplit,
+ archive_files,
+ decode_bytes,
+ get_magnet_info,
+ is_magnet,
+)
+from deluge.configmanager import ConfigManager, get_config_dir
+from deluge.core.authmanager import AUTH_LEVEL_ADMIN
+from deluge.core.torrent import Torrent, TorrentOptions, sanitize_filepath
+from deluge.decorators import maybe_coroutine
+from deluge.error import AddTorrentError, InvalidTorrentError
+from deluge.event import (
+ ExternalIPEvent,
+ PreTorrentRemovedEvent,
+ SessionStartedEvent,
+ TorrentAddedEvent,
+ TorrentFileCompletedEvent,
+ TorrentFileRenamedEvent,
+ TorrentFinishedEvent,
+ TorrentRemovedEvent,
+ TorrentResumedEvent,
+)
+
+log = logging.getLogger(__name__)
+
# Baseline libtorrent flags for torrents added to the session: start paused
# and auto-managed, subscribe to state updates, and apply the session IP filter.
# Callers below selectively XOR bits off this mask (e.g. to start active).
LT_DEFAULT_ADD_TORRENT_FLAGS = (
    lt.torrent_flags.paused
    | lt.torrent_flags.auto_managed
    | lt.torrent_flags.update_subscribe
    | lt.torrent_flags.apply_ip_filter
)
+
+
class PrefetchQueueItem(NamedTuple):
    """Bookkeeping for one in-flight magnet metadata prefetch."""

    # Fires with the torrent_info once metadata arrives, or is cancelled
    # when the torrent is added for real while the prefetch is pending.
    alert_deferred: Deferred
    # Deferreds from duplicate prefetch requests for the same torrent; each
    # is fired with the (torrent_id, b64-encoded metadata) result.
    result_queue: List[Deferred]
+
+
class TorrentState:  # pylint: disable=old-style-class
    """Snapshot of a single torrent's persisted settings.

    Note:
        This must be old style class to avoid breaking torrent.state file.

    """

    def __init__(
        self,
        torrent_id=None,
        filename=None,
        trackers=None,
        storage_mode='sparse',
        paused=False,
        save_path=None,
        max_connections=-1,
        max_upload_slots=-1,
        max_upload_speed=-1.0,
        max_download_speed=-1.0,
        prioritize_first_last=False,
        sequential_download=False,
        file_priorities=None,
        queue=None,
        auto_managed=True,
        is_finished=False,
        stop_ratio=2.00,
        stop_at_ratio=False,
        remove_at_ratio=False,
        move_completed=False,
        move_completed_path=None,
        magnet=None,
        owner=None,
        shared=False,
        super_seeding=False,
        name=None,
    ):
        # Mirror every constructor argument onto the instance.
        attrs = dict(locals())
        del attrs['self']
        self.__dict__.update(attrs)

    def __eq__(self, other):
        if not isinstance(other, TorrentState):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other):
        return not self.__eq__(other)
+
+
class TorrentManagerState:  # pylint: disable=old-style-class
    """Container for the list of persisted TorrentState objects.

    Note:
        This must be old style class to avoid breaking torrent.state file.

    """

    def __init__(self):
        # Filled with TorrentState instances when the session state is built.
        self.torrents = []

    def __eq__(self, other):
        if not isinstance(other, TorrentManagerState):
            return False
        return self.torrents == other.torrents

    def __ne__(self, other):
        return not self.__eq__(other)
+
+
+class TorrentManager(component.Component):
+ """TorrentManager contains a list of torrents in the current libtorrent session.
+
+ This object is also responsible for saving the state of the session for use on restart.
+
+ """
+
+ # This is used in the test to mock out timeouts
+ clock = reactor
+
    def __init__(self):
        """Set up session references, bookkeeping structures and timers.

        Config set-functions and libtorrent alert handlers are registered by
        naming convention: methods named ``on_set_<key>`` and
        ``on_alert_<handle>`` must exist on this class.
        """
        component.Component.__init__(
            self,
            'TorrentManager',
            interval=5,
            depend=['CorePluginManager', 'AlertManager'],
        )
        log.debug('TorrentManager init...')
        # Set the libtorrent session
        self.session = component.get('Core').session
        # Set the alertmanager
        self.alerts = component.get('AlertManager')
        # Get the core config
        self.config = ConfigManager('core.conf')

        # Make sure the state folder has been created
        self.state_dir = os.path.join(get_config_dir(), 'state')
        if not os.path.exists(self.state_dir):
            os.makedirs(self.state_dir)
        # Marker file: present on start-up only if the previous shutdown did
        # not complete cleanly (see start()/stop()).
        self.temp_file = os.path.join(self.state_dir, '.safe_state_check')

        # Create the torrents dict { torrent_id: Torrent }
        self.torrents = {}
        self.queued_torrents = set()
        self.is_saving_state = False
        self.save_resume_data_file_lock = defer.DeferredLock()
        # torrent_id -> add_async() callback args while libtorrent adds the torrent.
        self.torrents_loading = {}
        self.prefetching_metadata: Dict[str, PrefetchQueueItem] = {}

        # This is a map of torrent_ids to Deferreds used to track needed resume data.
        # The Deferreds will be completed when resume data has been saved.
        self.waiting_on_resume_data = {}

        # Keep track of torrents finished but moving storage
        self.waiting_on_finish_moving = []

        # Keeps track of resume data
        self.resume_data = {}

        self.torrents_status_requests = []
        self.status_dict = {}
        self.last_state_update_alert_ts = 0

        # Keep the previous saved state
        self.prev_saved_state = None

        # Register set functions
        set_config_keys = [
            'max_connections_per_torrent',
            'max_upload_slots_per_torrent',
            'max_upload_speed_per_torrent',
            'max_download_speed_per_torrent',
        ]

        for config_key in set_config_keys:
            on_set_func = getattr(self, ''.join(['on_set_', config_key]))
            self.config.register_set_function(config_key, on_set_func)

        # Register alert functions
        alert_handles = [
            'external_ip',
            'performance',
            'add_torrent',
            'metadata_received',
            'torrent_finished',
            'torrent_paused',
            'torrent_checked',
            'torrent_resumed',
            'tracker_reply',
            'tracker_announce',
            'tracker_warning',
            'tracker_error',
            'file_renamed',
            'file_error',
            'file_completed',
            'storage_moved',
            'storage_moved_failed',
            'state_update',
            'state_changed',
            'save_resume_data',
            'save_resume_data_failed',
            'fastresume_rejected',
        ]

        for alert_handle in alert_handles:
            on_alert_func = getattr(self, ''.join(['on_alert_', alert_handle]))
            self.alerts.register_handler(alert_handle, on_alert_func)

        # Define timers
        self.save_state_timer = LoopingCall(self.save_state)
        self.save_resume_data_timer = LoopingCall(self.save_resume_data)
        self.prev_status_cleanup_loop = LoopingCall(self.cleanup_torrents_prev_status)
+
    def start(self):
        """Verify the last shutdown was clean, load state and start timers."""
        # Check for old temp file to verify safe shutdown
        if os.path.isfile(self.temp_file):
            self.archive_state('Bad shutdown detected so archiving state files')
            os.remove(self.temp_file)

        # Recreate the marker; it is removed again on a clean shutdown in stop().
        with open(self.temp_file, 'a'):
            os.utime(self.temp_file, None)

        # Try to load the state from file
        self.load_state()

        # Save the state periodically
        self.save_state_timer.start(200, False)
        self.save_resume_data_timer.start(190, False)
        # Unlike the save timers, this leaves LoopingCall's `now` at its default.
        self.prev_status_cleanup_loop.start(10)
+
+ @maybe_coroutine
+ async def stop(self):
+ # Stop timers
+ if self.save_state_timer.running:
+ self.save_state_timer.stop()
+
+ if self.save_resume_data_timer.running:
+ self.save_resume_data_timer.stop()
+
+ if self.prev_status_cleanup_loop.running:
+ self.prev_status_cleanup_loop.stop()
+
+ # Save state on shutdown
+ await self.save_state()
+
+ self.session.pause()
+
+ result = await self.save_resume_data(flush_disk_cache=True)
+ # Remove the temp_file to signify successfully saved state
+ if result and os.path.isfile(self.temp_file):
+ os.remove(self.temp_file)
+
+ def update(self):
+ for torrent_id, torrent in self.torrents.items():
+ # XXX: Should the state check be those that _can_ be stopped at ratio
+ if torrent.options['stop_at_ratio'] and torrent.state not in (
+ 'Checking',
+ 'Allocating',
+ 'Paused',
+ 'Queued',
+ ):
+ if (
+ torrent.get_ratio() >= torrent.options['stop_ratio']
+ and torrent.is_finished
+ ):
+ if torrent.options['remove_at_ratio']:
+ self.remove(torrent_id)
+ break
+
+ torrent.pause()
+
    def __getitem__(self, torrent_id):
        """Return the Torrent with torrent_id.

        Args:
            torrent_id (str): The torrent_id.

        Returns:
            Torrent: A torrent object.

        Raises:
            KeyError: If torrent_id is not in the session.

        """
        return self.torrents[torrent_id]
+
+ def get_torrent_list(self):
+ """Creates a list of torrent_ids, owned by current user and any marked shared.
+
+ Returns:
+ list: A list of torrent_ids.
+
+ """
+ torrent_ids = list(self.torrents)
+ if component.get('RPCServer').get_session_auth_level() == AUTH_LEVEL_ADMIN:
+ return torrent_ids
+
+ current_user = component.get('RPCServer').get_session_user()
+ for torrent_id in torrent_ids[:]:
+ torrent_status = self.torrents[torrent_id].get_status(['owner', 'shared'])
+ if torrent_status['owner'] != current_user and not torrent_status['shared']:
+ torrent_ids.pop(torrent_ids.index(torrent_id))
+ return torrent_ids
+
+ def get_torrent_info_from_file(self, filepath):
+ """Retrieves torrent_info from the file specified.
+
+ Args:
+ filepath (str): The filepath to extract torrent info from.
+
+ Returns:
+ lt.torrent_info: A libtorrent torrent_info dict or None if invalid file or data.
+
+ """
+ # Get the torrent data from the torrent file
+ if log.isEnabledFor(logging.DEBUG):
+ log.debug('Attempting to extract torrent_info from %s', filepath)
+ try:
+ torrent_info = lt.torrent_info(filepath)
+ except RuntimeError as ex:
+ log.warning('Unable to open torrent file %s: %s', filepath, ex)
+ else:
+ return torrent_info
+
    @maybe_coroutine
    async def prefetch_metadata(self, magnet: str, timeout: int) -> Tuple[str, bytes]:
        """Download the metadata for a magnet URI.

        Args:
            magnet: A magnet URI to download the metadata for.
            timeout: Number of seconds to wait before canceling.

        Returns:
            A tuple of (torrent_id, metadata); metadata is b64-encoded, and
            empty on timeout/cancellation.

        """

        torrent_id = get_magnet_info(magnet)['info_hash']
        # A prefetch for this torrent is already running: queue onto it and
        # share its eventual result instead of adding a second lt torrent.
        if torrent_id in self.prefetching_metadata:
            d = Deferred()
            self.prefetching_metadata[torrent_id].result_queue.append(d)
            return await d

        add_torrent_params = lt.parse_magnet_uri(magnet)
        add_torrent_params.save_path = gettempdir()
        # XOR clears the paused/auto_managed bits set in the defaults so the
        # temporary torrent starts active; upload_mode presumably prevents
        # downloading payload data while metadata transfers -- see lt docs.
        add_torrent_params.flags = (
            (
                LT_DEFAULT_ADD_TORRENT_FLAGS
                | lt.torrent_flags.duplicate_is_error
                | lt.torrent_flags.upload_mode
            )
            ^ lt.torrent_flags.auto_managed
            ^ lt.torrent_flags.paused
        )

        torrent_handle = self.session.add_torrent(add_torrent_params)

        d = Deferred()
        # Cancel the defer if timeout reached.
        d.addTimeout(timeout, self.clock)
        self.prefetching_metadata[torrent_id] = PrefetchQueueItem(d, [])

        try:
            torrent_info = await d
        except (defer.TimeoutError, defer.CancelledError):
            log.debug(f'Prefetching metadata for {torrent_id} timed out or cancelled.')
            metadata = b''
        else:
            log.debug('prefetch metadata received')
            # libtorrent 2.0 renamed metadata() to info_section().
            if VersionSplit(LT_VERSION) < VersionSplit('2.0.0.0'):
                metadata = torrent_info.metadata()
            else:
                metadata = torrent_info.info_section()

        log.debug('remove prefetch magnet from session')
        result_queue = self.prefetching_metadata.pop(torrent_id).result_queue
        # The second arg matches the delete-data option used in remove().
        self.session.remove_torrent(torrent_handle, 1)
        result = torrent_id, b64encode(metadata)

        # Fan the result out to any queued duplicate requests.
        for d in result_queue:
            d.callback(result)
        return result
+
+ def _build_torrent_options(self, options):
+ """Load default options and update if needed."""
+ _options = TorrentOptions()
+ if options:
+ _options.update(options)
+ options = _options
+
+ if not options['owner']:
+ options['owner'] = component.get('RPCServer').get_session_user()
+ if not component.get('AuthManager').has_account(options['owner']):
+ options['owner'] = 'localclient'
+
+ return options
+
    def _build_torrent_params(
        self, torrent_info=None, magnet=None, options=None, resume_data=None
    ):
        """Create the add_torrent_params dict for adding torrent to libtorrent.

        Returns:
            tuple(str, dict): The torrent_id and the add_torrent_params dict.

        Raises:
            AddTorrentError: If the magnet info is invalid, or the torrent is
                already in the session or currently being added.
        """
        add_torrent_params = {}
        if torrent_info:
            add_torrent_params['ti'] = torrent_info
            name = torrent_info.name()
            if not name:
                # Fall back to the top-level directory of the first file.
                name = (
                    torrent_info.file_at(0).path.replace('\\', '/', 1).split('/', 1)[0]
                )
            add_torrent_params['name'] = name
            torrent_id = str(torrent_info.info_hash())
        elif magnet:
            magnet_info = get_magnet_info(magnet)
            if magnet_info:
                add_torrent_params['name'] = magnet_info['name']
                add_torrent_params['trackers'] = list(magnet_info['trackers'])
                torrent_id = magnet_info['info_hash']
                add_torrent_params['info_hash'] = bytes(bytearray.fromhex(torrent_id))
            else:
                raise AddTorrentError(
                    'Unable to add magnet, invalid magnet info: %s' % magnet
                )

        # Check for existing torrent in session.
        if torrent_id in self.get_torrent_list():
            # Attempt merge trackers before returning.
            # NOTE(review): torrent_info is None on the magnet path here --
            # confirm merge_trackers handles that.
            self.torrents[torrent_id].merge_trackers(torrent_info)
            raise AddTorrentError('Torrent already in session (%s).' % torrent_id)
        elif torrent_id in self.torrents_loading:
            raise AddTorrentError('Torrent already being added (%s).' % torrent_id)
        elif torrent_id in self.prefetching_metadata:
            # Cancel and remove metadata fetching torrent.
            self.prefetching_metadata[torrent_id].alert_deferred.cancel()

        # Check for renamed files and if so, rename them in the torrent_info before adding.
        if options['mapped_files'] and torrent_info:
            for index, fname in options['mapped_files'].items():
                fname = sanitize_filepath(decode_bytes(fname))
                if log.isEnabledFor(logging.DEBUG):
                    log.debug('renaming file index %s to %s', index, fname)
                try:
                    torrent_info.rename_file(index, fname.encode('utf8'))
                except TypeError:
                    # Some lt bindings expect str rather than bytes -- retry.
                    torrent_info.rename_file(index, fname)
            add_torrent_params['ti'] = torrent_info

        if log.isEnabledFor(logging.DEBUG):
            log.debug('options: %s', options)

        # Fill in the rest of the add_torrent_params dictionary.
        add_torrent_params['save_path'] = options['download_location'].encode('utf8')
        if options['name']:
            add_torrent_params['name'] = options['name']
        if options['pre_allocate_storage']:
            add_torrent_params['storage_mode'] = lt.storage_mode_t.storage_mode_allocate
        if resume_data:
            add_torrent_params['resume_data'] = resume_data

        # Set flags: enable duplicate_is_error & override_resume_data, disable auto_managed.
        add_torrent_params['flags'] = (
            LT_DEFAULT_ADD_TORRENT_FLAGS | lt.torrent_flags.duplicate_is_error
        ) ^ lt.torrent_flags.auto_managed
        if options['seed_mode']:
            add_torrent_params['flags'] |= lt.torrent_flags.seed_mode
        if options['super_seeding']:
            add_torrent_params['flags'] |= lt.torrent_flags.super_seeding

        return torrent_id, add_torrent_params
+
+ def add(
+ self,
+ torrent_info=None,
+ state=None,
+ options=None,
+ save_state=True,
+ filedump=None,
+ filename=None,
+ magnet=None,
+ resume_data=None,
+ ):
+ """Adds a torrent to the torrent manager.
+
+ Args:
+ torrent_info (lt.torrent_info, optional): A libtorrent torrent_info object.
+ state (TorrentState, optional): The torrent state.
+ options (dict, optional): The options to apply to the torrent on adding.
+ save_state (bool, optional): If True save the session state after adding torrent, defaults to True.
+ filedump (str, optional): bencoded filedump of a torrent file.
+ filename (str, optional): The filename of the torrent file.
+ magnet (str, optional): The magnet URI.
+ resume_data (lt.entry, optional): libtorrent fast resume data.
+
+ Returns:
+ str: If successful the torrent_id of the added torrent, None if adding the torrent failed.
+
+ Emits:
+ TorrentAddedEvent: Torrent with torrent_id added to session.
+
+ """
+ if not torrent_info and not filedump and not magnet:
+ raise AddTorrentError(
+ 'You must specify a valid torrent_info, torrent state or magnet.'
+ )
+
+ if filedump:
+ try:
+ torrent_info = lt.torrent_info(lt.bdecode(filedump))
+ except RuntimeError as ex:
+ raise AddTorrentError(
+ 'Unable to add torrent, decoding filedump failed: %s' % ex
+ )
+
+ options = self._build_torrent_options(options)
+ __, add_torrent_params = self._build_torrent_params(
+ torrent_info, magnet, options, resume_data
+ )
+
+ # We need to pause the AlertManager momentarily to prevent alerts
+ # for this torrent being generated before a Torrent object is created.
+ component.pause('AlertManager')
+
+ try:
+ handle = self.session.add_torrent(add_torrent_params)
+ if not handle.is_valid():
+ raise InvalidTorrentError('Torrent handle is invalid!')
+ except (RuntimeError, InvalidTorrentError) as ex:
+ component.resume('AlertManager')
+ raise AddTorrentError('Unable to add torrent to session: %s' % ex)
+
+ torrent = self._add_torrent_obj(
+ handle, options, state, filename, magnet, resume_data, filedump, save_state
+ )
+ return torrent.torrent_id
+
+ def add_async(
+ self,
+ torrent_info=None,
+ state=None,
+ options=None,
+ save_state=True,
+ filedump=None,
+ filename=None,
+ magnet=None,
+ resume_data=None,
+ ):
+ """Adds a torrent to the torrent manager using libtorrent async add torrent method.
+
+ Args:
+ torrent_info (lt.torrent_info, optional): A libtorrent torrent_info object.
+ state (TorrentState, optional): The torrent state.
+ options (dict, optional): The options to apply to the torrent on adding.
+ save_state (bool, optional): If True save the session state after adding torrent, defaults to True.
+ filedump (str, optional): bencoded filedump of a torrent file.
+ filename (str, optional): The filename of the torrent file.
+ magnet (str, optional): The magnet URI.
+ resume_data (lt.entry, optional): libtorrent fast resume data.
+
+ Returns:
+ Deferred: If successful the torrent_id of the added torrent, None if adding the torrent failed.
+
+ Emits:
+ TorrentAddedEvent: Torrent with torrent_id added to session.
+
+ """
+ if not torrent_info and not filedump and not magnet:
+ raise AddTorrentError(
+ 'You must specify a valid torrent_info, torrent state or magnet.'
+ )
+
+ if filedump:
+ try:
+ torrent_info = lt.torrent_info(lt.bdecode(filedump))
+ except RuntimeError as ex:
+ raise AddTorrentError(
+ 'Unable to add torrent, decoding filedump failed: %s' % ex
+ )
+
+ options = self._build_torrent_options(options)
+ torrent_id, add_torrent_params = self._build_torrent_params(
+ torrent_info, magnet, options, resume_data
+ )
+
+ d = Deferred()
+ self.torrents_loading[torrent_id] = (
+ d,
+ options,
+ state,
+ filename,
+ magnet,
+ resume_data,
+ filedump,
+ save_state,
+ )
+ try:
+ self.session.async_add_torrent(add_torrent_params)
+ except RuntimeError as ex:
+ raise AddTorrentError('Unable to add torrent to session: %s' % ex)
+ return d
+
    def _add_torrent_obj(
        self,
        handle,
        options,
        state,
        filename,
        magnet,
        resume_data,
        filedump,
        save_state,
    ):
        """Wrap a libtorrent handle in a Torrent object and register it.

        Assumes the AlertManager was paused by the caller; it is resumed
        here once the Torrent object exists.

        Returns:
            Torrent: The newly created torrent object.
        """
        # For magnets added with metadata, filename is used so set as magnet.
        if not magnet and is_magnet(filename):
            magnet = filename
            filename = None

        # Create a Torrent object and add to the dictionary.
        torrent = Torrent(handle, options, state, filename, magnet)
        self.torrents[torrent.torrent_id] = torrent

        # Resume AlertManager if paused for adding torrent to libtorrent.
        component.resume('AlertManager')

        # Store the original resume_data, in case of errors.
        if resume_data:
            self.resume_data[torrent.torrent_id] = resume_data

        # Add to queued torrents set.
        self.queued_torrents.add(torrent.torrent_id)
        if self.config['queue_new_to_top']:
            self.queue_top(torrent.torrent_id)

        # Resume the torrent if needed.
        if not options['add_paused']:
            torrent.resume()

        # Emit torrent_added signal.
        from_state = state is not None
        component.get('EventManager').emit(
            TorrentAddedEvent(torrent.torrent_id, from_state)
        )

        if log.isEnabledFor(logging.DEBUG):
            log.debug('Torrent added: %s', str(handle.info_hash()))
        if log.isEnabledFor(logging.INFO):
            name_and_owner = torrent.get_status(['name', 'owner'])
            log.info(
                'Torrent %s from user "%s" %s',
                name_and_owner['name'],
                name_and_owner['owner'],
                from_state and 'loaded' or 'added',
            )

        # Write the .torrent file to the state directory.
        if filedump:
            torrent.write_torrentfile(filedump)

        # Save the session state.
        if save_state:
            self.save_state()

        return torrent
+
    def add_async_callback(
        self,
        handle,
        d,
        options,
        state,
        filename,
        magnet,
        resume_data,
        filedump,
        save_state,
    ):
        """Complete an add_async() call once libtorrent supplies the handle.

        Creates the Torrent object for the handle and fires the deferred
        returned by add_async() with the new torrent_id.
        """
        torrent = self._add_torrent_obj(
            handle, options, state, filename, magnet, resume_data, filedump, save_state
        )

        d.callback(torrent.torrent_id)
+
    def remove(self, torrent_id, remove_data=False, save_state=True):
        """Remove a torrent from the session.

        Args:
            torrent_id (str): The torrent ID to remove.
            remove_data (bool, optional): If True, remove the downloaded data, defaults to False.
            save_state (bool, optional): If True, save the session state after removal, defaults to True.

        Returns:
            bool: True if removed successfully, False if not.

        Emits:
            PreTorrentRemovedEvent: Torrent is about to be removed from session.
            TorrentRemovedEvent: Torrent with torrent_id removed from session.

        Raises:
            InvalidTorrentError: If the torrent_id is not in the session.

        """
        try:
            torrent = self.torrents[torrent_id]
        except KeyError:
            raise InvalidTorrentError('torrent_id %s not in session.' % torrent_id)

        # Capture the name now; the Torrent object is about to be discarded.
        torrent_name = torrent.get_status(['name'])['name']

        # Emit the signal to the clients
        component.get('EventManager').emit(PreTorrentRemovedEvent(torrent_id))

        try:
            self.session.remove_torrent(torrent.handle, 1 if remove_data else 0)
        except RuntimeError as ex:
            log.warning('Error removing torrent: %s', ex)
            return False

        # Remove fastresume data if it is exists
        self.resume_data.pop(torrent_id, None)

        # Remove the .torrent file in the state and copy location, if user requested.
        delete_copies = (
            self.config['copy_torrent_file'] and self.config['del_copy_torrent_file']
        )
        torrent.delete_torrentfile(delete_copies)

        # Remove from set if it wasn't finished
        if not torrent.is_finished:
            try:
                self.queued_torrents.remove(torrent_id)
            except KeyError:
                log.debug('%s is not in queued torrents set.', torrent_id)
                # NOTE(review): raising here leaves the torrent removed from the
                # libtorrent session but still present in self.torrents --
                # confirm callers expect that partial-removal state.
                raise InvalidTorrentError(
                    '%s is not in queued torrents set.' % torrent_id
                )

        # Remove the torrent from deluge's session
        del self.torrents[torrent_id]

        if save_state:
            self.save_state()

        # Emit the signal to the clients
        component.get('EventManager').emit(TorrentRemovedEvent(torrent_id))
        log.info(
            'Torrent %s removed by user: %s',
            torrent_name,
            component.get('RPCServer').get_session_user(),
        )
        return True
+
+ def fixup_state(self, state):
+ """Fixup an old state by adding missing TorrentState options and assigning default values.
+
+ Args:
+ state (TorrentManagerState): A torrentmanager state containing torrent details.
+
+ Returns:
+ TorrentManagerState: A fixedup TorrentManager state.
+
+ """
+ if state.torrents:
+ t_state_tmp = TorrentState()
+ if dir(state.torrents[0]) != dir(t_state_tmp):
+ self.archive_state('Migration of TorrentState required.')
+ try:
+ for attr in set(dir(t_state_tmp)) - set(dir(state.torrents[0])):
+ for t_state in state.torrents:
+ setattr(t_state, attr, getattr(t_state_tmp, attr, None))
+ except AttributeError as ex:
+ log.error(
+ 'Unable to update state file to a compatible version: %s', ex
+ )
+ return state
+
+ def open_state(self):
+ """Open the torrents.state file containing a TorrentManager state with session torrents.
+
+ Returns:
+ TorrentManagerState: The TorrentManager state.
+
+ """
+ torrents_state = os.path.join(self.state_dir, 'torrents.state')
+ state = None
+ for filepath in (torrents_state, torrents_state + '.bak'):
+ log.info('Loading torrent state: %s', filepath)
+ if not os.path.isfile(filepath):
+ continue
+
+ try:
+ with open(filepath, 'rb') as _file:
+ state = pickle.load(_file, encoding='utf8')
+ except (OSError, EOFError, pickle.UnpicklingError) as ex:
+ message = f'Unable to load {filepath}: {ex}'
+ log.error(message)
+ if not filepath.endswith('.bak'):
+ self.archive_state(message)
+ else:
+ log.info('Successfully loaded %s', filepath)
+ break
+
+ return state if state else TorrentManagerState()
+
    def load_state(self):
        """Load all the torrents from TorrentManager state into session.

        Emits:
            SessionStartedEvent: Emitted after all torrents are added to the session.

        """
        start = datetime.datetime.now()
        state = self.open_state()
        state = self.fixup_state(state)

        # Reorder the state.torrents list to add torrents in the correct queue order.
        state.torrents.sort(
            key=operator.attrgetter('queue'), reverse=self.config['queue_new_to_top']
        )
        resume_data = self.load_resume_data_file()

        deferreds = []
        for t_state in state.torrents:
            # Populate the options dict from state
            options = TorrentOptions()
            for option in options:
                try:
                    options[option] = getattr(t_state, option)
                except AttributeError:
                    pass
            # Manually update unmatched attributes
            options['download_location'] = t_state.save_path
            options['pre_allocate_storage'] = t_state.storage_mode == 'allocate'
            options['prioritize_first_last_pieces'] = t_state.prioritize_first_last
            options['add_paused'] = t_state.paused

            magnet = t_state.magnet
            # None when no .torrent file exists for this id (e.g. magnet-only).
            torrent_info = self.get_torrent_info_from_file(
                os.path.join(self.state_dir, t_state.torrent_id + '.torrent')
            )

            try:
                d = self.add_async(
                    torrent_info=torrent_info,
                    state=t_state,
                    options=options,
                    save_state=False,
                    magnet=magnet,
                    resume_data=resume_data.get(t_state.torrent_id),
                )
            except AddTorrentError as ex:
                log.warning(
                    'Error when adding torrent "%s" to session: %s',
                    t_state.torrent_id,
                    ex,
                )
            else:
                deferreds.append(d)

        deferred_list = DeferredList(deferreds, consumeErrors=False)

        def on_complete(result):
            # All async adds have settled; announce the session is up.
            log.info(
                'Finished loading %d torrents in %s',
                len(state.torrents),
                str(datetime.datetime.now() - start),
            )
            component.get('EventManager').emit(SessionStartedEvent())

        deferred_list.addCallback(on_complete)
+
    def create_state(self):
        """Create a state of all the torrents in TorrentManager.

        Returns:
            TorrentManagerState: The TorrentManager state.

        """
        state = TorrentManagerState()
        # Create the state for each Torrent and append to the list
        for torrent in self.torrents.values():
            # Determine the effective paused flag: while the whole session is
            # paused, per-torrent state is unreliable, so ask the handle.
            if self.session.is_paused():
                paused = torrent.handle.is_paused()
            elif torrent.forced_error:
                paused = torrent.forced_error.was_paused
            elif torrent.state == 'Paused':
                paused = True
            else:
                paused = False

            # Positional args: order must match TorrentState.__init__.
            torrent_state = TorrentState(
                torrent.torrent_id,
                torrent.filename,
                torrent.trackers,
                torrent.get_status(['storage_mode'])['storage_mode'],
                paused,
                torrent.options['download_location'],
                torrent.options['max_connections'],
                torrent.options['max_upload_slots'],
                torrent.options['max_upload_speed'],
                torrent.options['max_download_speed'],
                torrent.options['prioritize_first_last_pieces'],
                torrent.options['sequential_download'],
                torrent.options['file_priorities'],
                torrent.get_queue_position(),
                torrent.options['auto_managed'],
                torrent.is_finished,
                torrent.options['stop_ratio'],
                torrent.options['stop_at_ratio'],
                torrent.options['remove_at_ratio'],
                torrent.options['move_completed'],
                torrent.options['move_completed_path'],
                torrent.magnet,
                torrent.options['owner'],
                torrent.options['shared'],
                torrent.options['super_seeding'],
                torrent.options['name'],
            )
            state.torrents.append(torrent_state)
        return state
+
+ def save_state(self):
+ """Run the save state task in a separate thread to avoid blocking main thread.
+
+ Note:
+ If a save task is already running, this call is ignored.
+
+ """
+ if self.is_saving_state:
+ return defer.succeed(None)
+ self.is_saving_state = True
+ d = threads.deferToThread(self._save_state)
+
+ def on_state_saved(arg):
+ self.is_saving_state = False
+ if self.save_state_timer.running:
+ self.save_state_timer.reset()
+
+ d.addBoth(on_state_saved)
+ return d
+
    def _save_state(self):
        """Save the state of the TorrentManager to the torrents.state file.

        Runs in a worker thread (see save_state). Writes to a .tmp file,
        rotates the current file to .bak, then renames .tmp into place so a
        crash mid-save never leaves a half-written state file.
        """
        state = self.create_state()

        # If the state hasn't changed, no need to save it
        if self.prev_saved_state == state:
            return

        filename = 'torrents.state'
        filepath = os.path.join(self.state_dir, filename)
        filepath_bak = filepath + '.bak'
        filepath_tmp = filepath + '.tmp'

        try:
            log.debug('Creating the temporary file: %s', filepath_tmp)
            with open(filepath_tmp, 'wb', 0) as _file:
                pickle.dump(state, _file, protocol=2)
                _file.flush()
                os.fsync(_file.fileno())
        except (OSError, pickle.PicklingError) as ex:
            log.error('Unable to save %s: %s', filename, ex)
            return

        try:
            log.debug('Creating backup of %s at: %s', filename, filepath_bak)
            if os.path.isfile(filepath_bak):
                os.remove(filepath_bak)
            if os.path.isfile(filepath):
                os.rename(filepath, filepath_bak)
        except OSError as ex:
            log.error('Unable to backup %s to %s: %s', filepath, filepath_bak, ex)
            return

        try:
            log.debug('Saving %s to: %s', filename, filepath)
            os.rename(filepath_tmp, filepath)
            # Remember what was written so identical saves can be skipped.
            self.prev_saved_state = state
        except OSError as ex:
            log.error('Failed to set new state file %s: %s', filepath, ex)
            if os.path.isfile(filepath_bak):
                log.info('Restoring backup of state from: %s', filepath_bak)
                os.rename(filepath_bak, filepath)
+
    def save_resume_data(self, torrent_ids=None, flush_disk_cache=False):
        """Saves torrents resume data.

        Args:
            torrent_ids (list of str): A list of torrents to save the resume data for, defaults
                to None which saves all torrents resume data.
            flush_disk_cache (bool, optional): If True flushes the disk cache which avoids potential
                issue with file timestamps, defaults to False. This is only needed when stopping the session.

        Returns:
            t.i.d.DeferredList: A list of twisted Deferred callbacks to be invoked when save is complete.

        """
        if torrent_ids is None:
            # Only bother with torrents libtorrent says have unsaved changes.
            torrent_ids = (
                tid
                for tid, t in self.torrents.items()
                if t.handle.need_save_resume_data()
            )

        def on_torrent_resume_save(dummy_result, torrent_id):
            """Received torrent resume_data alert so remove from waiting list"""
            self.waiting_on_resume_data.pop(torrent_id, None)

        deferreds = []
        for torrent_id in torrent_ids:
            # Reuse an outstanding deferred if a save is already pending for
            # this torrent, otherwise create and track a new one.
            d = self.waiting_on_resume_data.get(torrent_id)
            if not d:
                d = Deferred().addBoth(on_torrent_resume_save, torrent_id)
                self.waiting_on_resume_data[torrent_id] = d
            deferreds.append(d)
            self.torrents[torrent_id].save_resume_data(flush_disk_cache)

        def on_all_resume_data_finished(dummy_result):
            """Saves resume data file when no more torrents waiting for resume data.

            Returns:
                bool: True if fastresume file is saved.

            This return value determines removal of `self.temp_file` in `self.stop()`.

            """
            # Use flush_disk_cache as a marker for shutdown so fastresume is
            # saved even if torrents are waiting.
            if not self.waiting_on_resume_data or flush_disk_cache:
                return self.save_resume_data_file(queue_task=flush_disk_cache)

        return DeferredList(deferreds).addBoth(on_all_resume_data_finished)
+
+ def load_resume_data_file(self):
+ """Load the resume data from file for all torrents.
+
+ Returns:
+ dict: A dict of torrents and their resume_data.
+
+ """
+ filename = 'torrents.fastresume'
+ filepath = os.path.join(self.state_dir, filename)
+ filepath_bak = filepath + '.bak'
+ old_data_filepath = os.path.join(get_config_dir(), filename)
+
+ for _filepath in (filepath, filepath_bak, old_data_filepath):
+ log.info('Opening %s for load: %s', filename, _filepath)
+ try:
+ with open(_filepath, 'rb') as _file:
+ resume_data = lt.bdecode(_file.read())
+ except (OSError, EOFError, RuntimeError) as ex:
+ if self.torrents:
+ log.warning('Unable to load %s: %s', _filepath, ex)
+ resume_data = None
+ else:
+ # lt.bdecode returns the dict keys as bytes so decode them.
+ resume_data = {k.decode(): v for k, v in resume_data.items()}
+ log.info('Successfully loaded %s: %s', filename, _filepath)
+ break
+
+ # If the libtorrent bdecode doesn't happen properly, it will return None
+ # so we need to make sure we return a {}
+ if resume_data is None:
+ return {}
+ else:
+ return resume_data
+
+ def save_resume_data_file(self, queue_task=False):
+ """Save resume data to file in a separate thread to avoid blocking main thread.
+
+ Args:
+ queue_task (bool): If True and a save task is already running then queue
+ this save task to run next. Default is to not queue save tasks.
+
+ Returns:
+ Deferred: Fires with arg, True if save task was successful, False if
+ not and None if task was not performed.
+
+ """
+ if not queue_task and self.save_resume_data_file_lock.locked:
+ return defer.succeed(None)
+
+ def on_lock_aquired():
+ d = threads.deferToThread(self._save_resume_data_file)
+
+ def on_resume_data_file_saved(arg):
+ if self.save_resume_data_timer.running:
+ self.save_resume_data_timer.reset()
+ return arg
+
+ d.addBoth(on_resume_data_file_saved)
+ return d
+
+ return self.save_resume_data_file_lock.run(on_lock_aquired)
+
    def _save_resume_data_file(self):
        """Saves the resume data file with the contents of self.resume_data.

        Runs in a worker thread (dispatched by save_resume_data_file) and uses
        a write-to-temp / rotate-backup / rename-into-place sequence so a
        crash mid-save never leaves a corrupt fastresume file on disk.

        Returns:
            bool: True if saved successfully (or nothing to save), False on error.
        """
        # Nothing pending; treat as a successful no-op.
        if not self.resume_data:
            return True

        filename = 'torrents.fastresume'
        filepath = os.path.join(self.state_dir, filename)
        filepath_bak = filepath + '.bak'
        filepath_tmp = filepath + '.tmp'

        try:
            log.debug('Creating the temporary file: %s', filepath_tmp)
            # Unbuffered binary write, flushed and fsynced so the bytes are
            # on disk before the rename sequence below begins.
            with open(filepath_tmp, 'wb', 0) as _file:
                _file.write(lt.bencode(self.resume_data))
                _file.flush()
                os.fsync(_file.fileno())
        except (OSError, EOFError) as ex:
            log.error('Unable to save %s: %s', filename, ex)
            return False

        try:
            log.debug('Creating backup of %s at: %s', filename, filepath_bak)
            # Rotate the current file to .bak, replacing any stale backup.
            if os.path.isfile(filepath_bak):
                os.remove(filepath_bak)
            if os.path.isfile(filepath):
                os.rename(filepath, filepath_bak)
        except OSError as ex:
            log.error('Unable to backup %s to %s: %s', filepath, filepath_bak, ex)
            return False

        try:
            log.debug('Saving %s to: %s', filename, filepath)
            # Final step: move the fully-written temp file into place.
            os.rename(filepath_tmp, filepath)
        except OSError as ex:
            log.error('Failed to set new file %s: %s', filepath, ex)
            # Final rename failed: restore the previous fastresume file.
            if os.path.isfile(filepath_bak):
                log.info('Restoring backup from: %s', filepath_bak)
                os.rename(filepath_bak, filepath)
        else:
            # Sync the rename operations for the directory
            # (POSIX-only; os.O_DIRECTORY is unavailable e.g. on Windows).
            if hasattr(os, 'O_DIRECTORY'):
                dirfd = os.open(os.path.dirname(filepath), os.O_DIRECTORY)
                os.fsync(dirfd)
                os.close(dirfd)
        return True
+
+ def archive_state(self, message):
+ log.warning(message)
+ arc_filepaths = []
+ for filename in ('torrents.fastresume', 'torrents.state'):
+ filepath = os.path.join(self.state_dir, filename)
+ arc_filepaths.extend([filepath, filepath + '.bak'])
+
+ archive_files('state', arc_filepaths, message=message)
+
+ def get_queue_position(self, torrent_id):
+ """Get queue position of torrent"""
+ return self.torrents[torrent_id].get_queue_position()
+
+ def queue_top(self, torrent_id):
+ """Queue torrent to top"""
+ if self.torrents[torrent_id].get_queue_position() == 0:
+ return False
+
+ self.torrents[torrent_id].handle.queue_position_top()
+ return True
+
+ def queue_up(self, torrent_id):
+ """Queue torrent up one position"""
+ if self.torrents[torrent_id].get_queue_position() == 0:
+ return False
+
+ self.torrents[torrent_id].handle.queue_position_up()
+ return True
+
+ def queue_down(self, torrent_id):
+ """Queue torrent down one position"""
+ if self.torrents[torrent_id].get_queue_position() == (
+ len(self.queued_torrents) - 1
+ ):
+ return False
+
+ self.torrents[torrent_id].handle.queue_position_down()
+ return True
+
+ def queue_bottom(self, torrent_id):
+ """Queue torrent to bottom"""
+ if self.torrents[torrent_id].get_queue_position() == (
+ len(self.queued_torrents) - 1
+ ):
+ return False
+
+ self.torrents[torrent_id].handle.queue_position_bottom()
+ return True
+
+ def cleanup_torrents_prev_status(self):
+ """Run cleanup_prev_status for each registered torrent"""
+ for torrent in self.torrents.values():
+ torrent.cleanup_prev_status()
+
+ def on_set_max_connections_per_torrent(self, key, value):
+ """Sets the per-torrent connection limit"""
+ log.debug('max_connections_per_torrent set to %s...', value)
+ for key in self.torrents:
+ self.torrents[key].set_max_connections(value)
+
+ def on_set_max_upload_slots_per_torrent(self, key, value):
+ """Sets the per-torrent upload slot limit"""
+ log.debug('max_upload_slots_per_torrent set to %s...', value)
+ for key in self.torrents:
+ self.torrents[key].set_max_upload_slots(value)
+
+ def on_set_max_upload_speed_per_torrent(self, key, value):
+ """Sets the per-torrent upload speed limit"""
+ log.debug('max_upload_speed_per_torrent set to %s...', value)
+ for key in self.torrents:
+ self.torrents[key].set_max_upload_speed(value)
+
+ def on_set_max_download_speed_per_torrent(self, key, value):
+ """Sets the per-torrent download speed limit"""
+ log.debug('max_download_speed_per_torrent set to %s...', value)
+ for key in self.torrents:
+ self.torrents[key].set_max_download_speed(value)
+
+ # --- Alert handlers ---
+ def on_alert_add_torrent(self, alert):
+ """Alert handler for libtorrent add_torrent_alert"""
+ if not alert.handle.is_valid():
+ log.warning('Torrent handle is invalid: %s', alert.error.message())
+ return
+
+ try:
+ torrent_id = str(alert.handle.info_hash())
+ except RuntimeError as ex:
+ log.warning('Failed to get torrent id from handle: %s', ex)
+ return
+
+ try:
+ add_async_params = self.torrents_loading.pop(torrent_id)
+ except KeyError as ex:
+ log.warning('Torrent id not in torrents loading list: %s', ex)
+ return
+
+ self.add_async_callback(alert.handle, *add_async_params)
+
    def on_alert_torrent_finished(self, alert):
        """Alert handler for libtorrent torrent_finished_alert.

        Applies the move-on-completion option, marks the torrent finished,
        removes it from the queued set and persists resume data for torrents
        that actually downloaded something.
        """
        try:
            torrent_id = str(alert.handle.info_hash())
            torrent = self.torrents[torrent_id]
        except (RuntimeError, KeyError):
            # Handle is invalid or the torrent is not managed here.
            return

        # If total_download is 0, do not move, it's likely the torrent wasn't downloaded, but just added.
        # Get fresh data from libtorrent, the cache isn't always up to date
        total_download = torrent.get_status(['total_payload_download'], update=True)[
            'total_payload_download'
        ]

        if log.isEnabledFor(logging.DEBUG):
            log.debug('Finished %s ', torrent_id)
            log.debug(
                'Torrent settings: is_finished: %s, total_download: %s, move_completed: %s, move_path: %s',
                torrent.is_finished,
                total_download,
                torrent.options['move_completed'],
                torrent.options['move_completed_path'],
            )

        torrent.update_state()
        if not torrent.is_finished and total_download:
            # Move completed download to completed folder if needed.
            # When a move is started, is_finished stays False and the
            # TorrentFinishedEvent is emitted from on_alert_storage_moved
            # (or on_alert_storage_moved_failed) once the move completes.
            if (
                torrent.options['move_completed']
                and torrent.options['download_location']
                != torrent.options['move_completed_path']
            ):
                self.waiting_on_finish_moving.append(torrent_id)
                torrent.move_storage(torrent.options['move_completed_path'])
            else:
                torrent.is_finished = True
                component.get('EventManager').emit(TorrentFinishedEvent(torrent_id))
        else:
            # Already finished (duplicate alert) or nothing was downloaded.
            torrent.is_finished = True

        # Torrent is no longer part of the queue
        try:
            self.queued_torrents.remove(torrent_id)
        except KeyError:
            # Sometimes libtorrent fires a TorrentFinishedEvent twice
            if log.isEnabledFor(logging.DEBUG):
                log.debug('%s is not in queued torrents set.', torrent_id)

        # Only save resume data if it actually downloaded something. Helps
        # on startup with big queues with lots of seeding torrents. Libtorrent
        # emits alert_torrent_finished for them, but there seems like nothing
        # worth really to save in resume data, we just read it up in
        # self.load_state().
        if total_download:
            self.save_resume_data((torrent_id,))
+
+ def on_alert_torrent_paused(self, alert):
+ """Alert handler for libtorrent torrent_paused_alert"""
+ try:
+ torrent_id = str(alert.handle.info_hash())
+ torrent = self.torrents[torrent_id]
+ except (RuntimeError, KeyError):
+ return
+ torrent.update_state()
+ # Write the fastresume file if we are not waiting on a bulk write
+ if torrent_id not in self.waiting_on_resume_data:
+ self.save_resume_data((torrent_id,))
+
+ def on_alert_torrent_checked(self, alert):
+ """Alert handler for libtorrent torrent_checked_alert"""
+ try:
+ torrent = self.torrents[str(alert.handle.info_hash())]
+ except (RuntimeError, KeyError):
+ return
+
+ # Check to see if we're forcing a recheck and set it back to paused if necessary.
+ if torrent.forcing_recheck:
+ torrent.forcing_recheck = False
+ if torrent.forcing_recheck_paused:
+ torrent.handle.pause()
+
+ torrent.update_state()
+
+ def on_alert_tracker_reply(self, alert):
+ """Alert handler for libtorrent tracker_reply_alert"""
+ try:
+ torrent = self.torrents[str(alert.handle.info_hash())]
+ except (RuntimeError, KeyError):
+ return
+
+ # Set the tracker status for the torrent
+ torrent.set_tracker_status('Announce OK')
+
+ # Check for peer information from the tracker, if none then send a scrape request.
+ torrent.get_lt_status()
+ if torrent.status.num_complete == -1 or torrent.status.num_incomplete == -1:
+ torrent.scrape_tracker()
+
+ def on_alert_tracker_announce(self, alert):
+ """Alert handler for libtorrent tracker_announce_alert"""
+ try:
+ torrent = self.torrents[str(alert.handle.info_hash())]
+ except (RuntimeError, KeyError):
+ return
+
+ # Set the tracker status for the torrent
+ torrent.set_tracker_status('Announce Sent')
+
+ def on_alert_tracker_warning(self, alert):
+ """Alert handler for libtorrent tracker_warning_alert"""
+ try:
+ torrent = self.torrents[str(alert.handle.info_hash())]
+ except (RuntimeError, KeyError):
+ return
+ # Set the tracker status for the torrent
+ torrent.set_tracker_status('Warning: %s' % decode_bytes(alert.message()))
+
+ def on_alert_tracker_error(self, alert):
+ """Alert handler for libtorrent tracker_error_alert"""
+ try:
+ torrent = self.torrents[str(alert.handle.info_hash())]
+ except (RuntimeError, KeyError):
+ return
+
+ error_message = decode_bytes(alert.error_message())
+ if not error_message:
+ error_message = decode_bytes(alert.error.message())
+ log.debug(
+ 'Tracker Error Alert: %s [%s]', decode_bytes(alert.message()), error_message
+ )
+ # libtorrent 1.2 added endpoint struct to each tracker. to prevent false updates
+ # we will need to verify that at least one endpoint to the errored tracker is working
+ for tracker in torrent.handle.trackers():
+ if tracker['url'] == alert.url:
+ if any(
+ endpoint['last_error']['value'] == 0
+ for endpoint in tracker['endpoints']
+ ):
+ torrent.set_tracker_status('Announce OK')
+ else:
+ torrent.set_tracker_status('Error: ' + error_message)
+ break
+
+ def on_alert_storage_moved(self, alert):
+ """Alert handler for libtorrent storage_moved_alert"""
+ try:
+ torrent_id = str(alert.handle.info_hash())
+ torrent = self.torrents[torrent_id]
+ except (RuntimeError, KeyError):
+ return
+
+ torrent.set_download_location(os.path.normpath(alert.storage_path()))
+ torrent.set_move_completed(False)
+ torrent.update_state()
+
+ if torrent_id in self.waiting_on_finish_moving:
+ self.waiting_on_finish_moving.remove(torrent_id)
+ torrent.is_finished = True
+ component.get('EventManager').emit(TorrentFinishedEvent(torrent_id))
+
+ def on_alert_storage_moved_failed(self, alert):
+ """Alert handler for libtorrent storage_moved_failed_alert"""
+ try:
+ torrent_id = str(alert.handle.info_hash())
+ torrent = self.torrents[torrent_id]
+ except (RuntimeError, KeyError):
+ return
+
+ log.warning('on_alert_storage_moved_failed: %s', decode_bytes(alert.message()))
+ # Set an Error message and pause the torrent
+ alert_msg = decode_bytes(alert.message()).split(':', 1)[1].strip()
+ torrent.force_error_state('Failed to move download folder: %s' % alert_msg)
+
+ if torrent_id in self.waiting_on_finish_moving:
+ self.waiting_on_finish_moving.remove(torrent_id)
+ torrent.is_finished = True
+ component.get('EventManager').emit(TorrentFinishedEvent(torrent_id))
+
+ def on_alert_torrent_resumed(self, alert):
+ """Alert handler for libtorrent torrent_resumed_alert"""
+ try:
+ torrent_id = str(alert.handle.info_hash())
+ torrent = self.torrents[torrent_id]
+ except (RuntimeError, KeyError):
+ return
+ torrent.update_state()
+ component.get('EventManager').emit(TorrentResumedEvent(torrent_id))
+
+ def on_alert_state_changed(self, alert):
+ """Alert handler for libtorrent state_changed_alert.
+
+ Emits:
+ TorrentStateChangedEvent: The state has changed.
+
+ """
+ try:
+ torrent_id = str(alert.handle.info_hash())
+ torrent = self.torrents[torrent_id]
+ except (RuntimeError, KeyError):
+ return
+
+ torrent.update_state()
+ # Torrent may need to download data after checking.
+ if torrent.state in ('Checking', 'Downloading'):
+ torrent.is_finished = False
+ self.queued_torrents.add(torrent_id)
+
+ def on_alert_save_resume_data(self, alert):
+ """Alert handler for libtorrent save_resume_data_alert"""
+ try:
+ torrent_id = str(alert.handle.info_hash())
+ except RuntimeError:
+ return
+ if torrent_id in self.torrents:
+ # libtorrent add_torrent expects bencoded resume_data.
+ self.resume_data[torrent_id] = lt.bencode(
+ lt.write_resume_data(alert.params)
+ )
+
+ if torrent_id in self.waiting_on_resume_data:
+ self.waiting_on_resume_data[torrent_id].callback(None)
+
+ def on_alert_save_resume_data_failed(self, alert):
+ """Alert handler for libtorrent save_resume_data_failed_alert"""
+ try:
+ torrent_id = str(alert.handle.info_hash())
+ except RuntimeError:
+ return
+
+ if torrent_id in self.waiting_on_resume_data:
+ self.waiting_on_resume_data[torrent_id].errback(
+ Exception(decode_bytes(alert.message()))
+ )
+
+ def on_alert_fastresume_rejected(self, alert):
+ """Alert handler for libtorrent fastresume_rejected_alert"""
+ try:
+ torrent_id = str(alert.handle.info_hash())
+ torrent = self.torrents[torrent_id]
+ except (RuntimeError, KeyError):
+ return
+
+ alert_msg = decode_bytes(alert.message())
+ log.error('on_alert_fastresume_rejected: %s', alert_msg)
+ if alert.error.value() == 134:
+ if not os.path.isdir(torrent.options['download_location']):
+ error_msg = 'Unable to locate Download Folder!'
+ else:
+ error_msg = 'Missing or invalid torrent data!'
+ else:
+ error_msg = (
+ 'Problem with resume data: %s' % alert_msg.split(':', 1)[1].strip()
+ )
+ torrent.force_error_state(error_msg, restart_to_resume=True)
+
+ def on_alert_file_renamed(self, alert):
+ """Alert handler for libtorrent file_renamed_alert.
+
+ Emits:
+ TorrentFileRenamedEvent: Files in the torrent have been renamed.
+
+ """
+ try:
+ torrent_id = str(alert.handle.info_hash())
+ torrent = self.torrents[torrent_id]
+ except (RuntimeError, KeyError):
+ return
+
+ new_name = decode_bytes(alert.new_name())
+ log.debug('index: %s name: %s', alert.index, new_name)
+
+ # We need to see if this file index is in a waiting_on_folder dict
+ for wait_on_folder in torrent.waiting_on_folder_rename:
+ if alert.index in wait_on_folder:
+ wait_on_folder[alert.index].callback(None)
+ break
+ else:
+ # This is just a regular file rename so send the signal
+ component.get('EventManager').emit(
+ TorrentFileRenamedEvent(torrent_id, alert.index, new_name)
+ )
+ self.save_resume_data((torrent_id,))
+
+ def on_alert_metadata_received(self, alert):
+ """Alert handler for libtorrent metadata_received_alert"""
+ try:
+ torrent_id = str(alert.handle.info_hash())
+ except RuntimeError:
+ return
+
+ try:
+ torrent = self.torrents[torrent_id]
+ except KeyError:
+ pass
+ else:
+ return torrent.on_metadata_received()
+
+ # Try callback to prefetch_metadata method.
+ try:
+ d = self.prefetching_metadata[torrent_id].alert_deferred
+ except KeyError:
+ pass
+ else:
+ torrent_info = alert.handle.get_torrent_info()
+ return d.callback(torrent_info)
+
+ def on_alert_file_error(self, alert):
+ """Alert handler for libtorrent file_error_alert"""
+ try:
+ torrent = self.torrents[str(alert.handle.info_hash())]
+ except (RuntimeError, KeyError):
+ return
+ torrent.update_state()
+
+ def on_alert_file_completed(self, alert):
+ """Alert handler for libtorrent file_completed_alert
+
+ Emits:
+ TorrentFileCompletedEvent: When an individual file completes downloading.
+
+ """
+ try:
+ torrent_id = str(alert.handle.info_hash())
+ except RuntimeError:
+ return
+ if torrent_id in self.torrents:
+ component.get('EventManager').emit(
+ TorrentFileCompletedEvent(torrent_id, alert.index)
+ )
+
+ def on_alert_state_update(self, alert):
+ """Alert handler for libtorrent state_update_alert
+
+ Result of a session.post_torrent_updates() call and contains the torrent status
+ of all torrents that changed since last time this was posted.
+
+ """
+ self.last_state_update_alert_ts = time.time()
+
+ for t_status in alert.status:
+ try:
+ torrent_id = str(t_status.info_hash)
+ except RuntimeError:
+ continue
+ if torrent_id in self.torrents:
+ self.torrents[torrent_id].status = t_status
+
+ self.handle_torrents_status_callback(self.torrents_status_requests.pop())
+
+ def on_alert_external_ip(self, alert):
+ """Alert handler for libtorrent external_ip_alert"""
+ log.info('on_alert_external_ip: %s', alert.external_address)
+ component.get('EventManager').emit(ExternalIPEvent(alert.external_address))
+
+ def on_alert_performance(self, alert):
+ """Alert handler for libtorrent performance_alert"""
+ log.warning(
+ 'on_alert_performance: %s, %s',
+ decode_bytes(alert.message()),
+ alert.warning_code,
+ )
+ if alert.warning_code == lt.performance_warning_t.send_buffer_watermark_too_low:
+ max_send_buffer_watermark = 3 * 1024 * 1024 # 3MiB
+ settings = self.session.get_settings()
+ send_buffer_watermark = settings['send_buffer_watermark']
+
+ # If send buffer is too small, try increasing its size by 512KiB (up to max_send_buffer_watermark)
+ if send_buffer_watermark < max_send_buffer_watermark:
+ value = send_buffer_watermark + (500 * 1024)
+ log.info(
+ 'Increasing send_buffer_watermark from %s to %s Bytes',
+ send_buffer_watermark,
+ value,
+ )
+ component.get('Core').apply_session_setting(
+ 'send_buffer_watermark', value
+ )
+ else:
+ log.warning(
+ 'send_buffer_watermark reached maximum value: %s Bytes',
+ max_send_buffer_watermark,
+ )
+
+ def separate_keys(self, keys, torrent_ids):
+ """Separates the input keys into torrent class keys and plugins keys"""
+ if self.torrents:
+ for torrent_id in torrent_ids:
+ if torrent_id in self.torrents:
+ status_keys = list(self.torrents[torrent_id].status_funcs)
+ leftover_keys = list(set(keys) - set(status_keys))
+ torrent_keys = list(set(keys) - set(leftover_keys))
+ return torrent_keys, leftover_keys
+ return [], []
+
+ def handle_torrents_status_callback(self, status_request):
+ """Build the status dictionary with torrent values"""
+ d, torrent_ids, keys, diff = status_request
+ status_dict = {}.fromkeys(torrent_ids)
+ torrent_keys, plugin_keys = self.separate_keys(keys, torrent_ids)
+
+ # Get the torrent status for each torrent_id
+ for torrent_id in torrent_ids:
+ if torrent_id not in self.torrents:
+ # The torrent_id does not exist in the dict.
+ # Could be the clients cache (sessionproxy) isn't up to speed.
+ del status_dict[torrent_id]
+ else:
+ status_dict[torrent_id] = self.torrents[torrent_id].get_status(
+ torrent_keys, diff, all_keys=not keys
+ )
+ self.status_dict = status_dict
+ d.callback((status_dict, plugin_keys))
+
+ def torrents_status_update(self, torrent_ids, keys, diff=False):
+ """Returns status dict for the supplied torrent_ids async.
+
+ Note:
+ If torrent states was updated recently post_torrent_updates is not called and
+ instead cached state is used.
+
+ Args:
+ torrent_ids (list of str): The torrent IDs to get the status of.
+ keys (list of str): The keys to get the status on.
+ diff (bool, optional): If True, will return a diff of the changes since the
+ last call to get_status based on the session_id, defaults to False.
+
+ Returns:
+ dict: A status dictionary for the requested torrents.
+
+ """
+ d = Deferred()
+ now = time.time()
+ # If last update was recent, use cached data instead of request updates from libtorrent
+ if (now - self.last_state_update_alert_ts) < 1.5:
+ reactor.callLater(
+ 0, self.handle_torrents_status_callback, (d, torrent_ids, keys, diff)
+ )
+ else:
+ # Ask libtorrent for status update
+ self.torrents_status_requests.insert(0, (d, torrent_ids, keys, diff))
+ self.session.post_torrent_updates()
+ return d