diff options
Diffstat (limited to 'deluge/core')
-rw-r--r-- | deluge/core/alertmanager.py | 81 | ||||
-rw-r--r-- | deluge/core/core.py | 76 |
2 files changed, 107 insertions, 50 deletions
diff --git a/deluge/core/alertmanager.py b/deluge/core/alertmanager.py index 51a7f29..71045b0 100644 --- a/deluge/core/alertmanager.py +++ b/deluge/core/alertmanager.py @@ -16,11 +16,11 @@ This should typically only be used by the Core. Plugins should utilize the """ import contextlib import logging +import threading from collections import defaultdict -from types import SimpleNamespace from typing import Any, Callable -from twisted.internet import reactor +from twisted.internet import reactor, threads import deluge.component as component from deluge._libtorrent import lt @@ -34,7 +34,7 @@ class AlertManager(component.Component): def __init__(self): log.debug('AlertManager init...') - component.Component.__init__(self, 'AlertManager', interval=0.3) + component.Component.__init__(self, 'AlertManager') self.session = component.get('Core').session # Increase the alert queue size so that alerts don't get lost. @@ -56,18 +56,73 @@ class AlertManager(component.Component): # handlers is a dictionary of lists {"alert_type": [handler1,h2,..]} self.handlers = defaultdict(list) + self.handlers_retry_timeout = 0.3 + self.handlers_retry_count = 6 self.delayed_calls = [] + self._event = threading.Event() def update(self): - self.delayed_calls = [dc for dc in self.delayed_calls if dc.active()] - self.handle_alerts() + pass + + def start(self): + thread = threading.Thread( + target=self.wait_for_alert_in_thread, name='alert-poller', daemon=True + ) + thread.start() + self._event.set() def stop(self): + self.cancel_delayed_calls() + + def pause(self): + self._event.clear() + + def resume(self): + self._event.set() + + def wait_for_alert_in_thread(self): + while self._component_state not in ('Stopping', 'Stopped'): + if self.session.wait_for_alert(1000) is None: + continue + if self._event.wait(): + threads.blockingCallFromThread(reactor, self.maybe_handle_alerts) + + def cancel_delayed_calls(self): + """Cancel all delayed handlers.""" for delayed_call in self.delayed_calls: if delayed_call.active(): delayed_call.cancel() self.delayed_calls = [] + def check_delayed_calls(self, retries: int = 0) -> bool: + """Returns True if any handler calls are delayed (upto retry limit).""" + self.delayed_calls = [dc for dc in self.delayed_calls if dc.active()] + if not self.delayed_calls: + return False + + if retries > self.handlers_retry_count: + log.warning( + 'Alert handlers timeout reached, cancelling: %s', self.delayed_calls + ) + self.cancel_delayed_calls() + return False + + return True + + def maybe_handle_alerts(self, retries: int = 0) -> None: + if self._component_state != 'Started': + return + + if self.check_delayed_calls(retries): + log.debug('Waiting for delayed alerts: %s', self.delayed_calls) + retries += 1 + reactor.callLater( + self.handlers_retry_timeout, self.maybe_handle_alerts, retries + ) + return + + self.handle_alerts() + def register_handler(self, alert_type: str, handler: Callable[[Any], None]) -> None: """ Registers a function that will be called when 'alert_type' is pop'd @@ -128,21 +183,7 @@ class AlertManager(component.Component): if log.isEnabledFor(logging.DEBUG): log.debug('Handling alert: %s', alert_type) - alert_copy = self.create_alert_copy(alert) - self.delayed_calls.append(reactor.callLater(0, handler, alert_copy)) - - @staticmethod - def create_alert_copy(alert): - """Create a Python copy of libtorrent alert - - Avoid segfault if an alert is handled after next pop_alert call""" - return SimpleNamespace( - **{ - attr: getattr(alert, attr) - for attr in dir(alert) - if not attr.startswith('__') - } - ) + self.delayed_calls.append(reactor.callLater(0, handler, alert)) def set_alert_queue_size(self, queue_size): """Sets the maximum size of the libtorrent alert queue""" diff --git a/deluge/core/core.py b/deluge/core/core.py index 198410e..e2130f5 100644 --- a/deluge/core/core.py +++ b/deluge/core/core.py @@ -12,17 +12,16 @@ import logging import os import shutil import tempfile -import threading from base64 import b64decode, b64encode from typing import Any, Dict, List, Optional, Tuple, Union from urllib.request import URLError, urlopen -from twisted.internet import defer, reactor, task +from twisted.internet import defer, reactor, task, threads from twisted.web.client import Agent, readBody import deluge.common import deluge.component as component -from deluge import path_chooser_common +from deluge import metafile, path_chooser_common from deluge._libtorrent import LT_VERSION, lt from deluge.configmanager import ConfigManager, get_config_dir from deluge.core.alertmanager import AlertManager @@ -992,30 +991,33 @@ class Core(component.Component): path, tracker, piece_length, - comment, - target, - webseeds, - private, - created_by, - trackers, - add_to_session, + comment=None, + target=None, + webseeds=None, + private=False, + created_by=None, + trackers=None, + add_to_session=False, + torrent_format=metafile.TorrentFormat.V1, ): + if isinstance(torrent_format, str): + torrent_format = metafile.TorrentFormat(torrent_format) + log.debug('creating torrent..') - threading.Thread( - target=self._create_torrent_thread, - args=( - path, - tracker, - piece_length, - comment, - target, - webseeds, - private, - created_by, - trackers, - add_to_session, - ), - ).start() + return threads.deferToThread( + self._create_torrent_thread, + path, + tracker, + piece_length, + comment=comment, + target=target, + webseeds=webseeds, + private=private, + created_by=created_by, + trackers=trackers, + add_to_session=add_to_session, + torrent_format=torrent_format, + ) def _create_torrent_thread( self, @@ -1029,27 +1031,41 @@ class Core(component.Component): created_by, trackers, add_to_session, + torrent_format, ): from deluge import metafile - metafile.make_meta_file( + filecontent = metafile.make_meta_file_content( path, tracker, piece_length, comment=comment, - target=target, webseeds=webseeds, private=private, created_by=created_by, trackers=trackers, + torrent_format=torrent_format, ) + + write_file = False + if target or not add_to_session: + write_file = True + + if not target: + target = metafile.default_meta_file_path(path) + filename = os.path.split(target)[-1] + + if write_file: + with open(target, 'wb') as _file: + _file.write(filecontent) + + filedump = b64encode(filecontent) log.debug('torrent created!') if add_to_session: options = {} options['download_location'] = os.path.split(path)[0] - with open(target, 'rb') as _file: - filedump = b64encode(_file.read()) - self.add_torrent_file(os.path.split(target)[1], filedump, options) + self.add_torrent_file(filename, filedump, options) + return filename, filedump @export def upload_plugin(self, filename: str, filedump: Union[str, bytes]) -> None: |