diff options
Diffstat (limited to 'deluge/core/alertmanager.py')
-rw-r--r-- | deluge/core/alertmanager.py | 81 |
1 files changed, 61 insertions, 20 deletions
diff --git a/deluge/core/alertmanager.py b/deluge/core/alertmanager.py index 51a7f29..71045b0 100644 --- a/deluge/core/alertmanager.py +++ b/deluge/core/alertmanager.py @@ -16,11 +16,11 @@ This should typically only be used by the Core. Plugins should utilize the """ import contextlib import logging +import threading from collections import defaultdict -from types import SimpleNamespace from typing import Any, Callable -from twisted.internet import reactor +from twisted.internet import reactor, threads import deluge.component as component from deluge._libtorrent import lt @@ -34,7 +34,7 @@ class AlertManager(component.Component): def __init__(self): log.debug('AlertManager init...') - component.Component.__init__(self, 'AlertManager', interval=0.3) + component.Component.__init__(self, 'AlertManager') self.session = component.get('Core').session # Increase the alert queue size so that alerts don't get lost. @@ -56,18 +56,73 @@ class AlertManager(component.Component): # handlers is a dictionary of lists {"alert_type": [handler1,h2,..]} self.handlers = defaultdict(list) + self.handlers_retry_timeout = 0.3 + self.handlers_retry_count = 6 self.delayed_calls = [] + self._event = threading.Event() def update(self): - self.delayed_calls = [dc for dc in self.delayed_calls if dc.active()] - self.handle_alerts() + pass + + def start(self): + thread = threading.Thread( + target=self.wait_for_alert_in_thread, name='alert-poller', daemon=True + ) + thread.start() + self._event.set() def stop(self): + self.cancel_delayed_calls() + + def pause(self): + self._event.clear() + + def resume(self): + self._event.set() + + def wait_for_alert_in_thread(self): + while self._component_state not in ('Stopping', 'Stopped'): + if self.session.wait_for_alert(1000) is None: + continue + if self._event.wait(): + threads.blockingCallFromThread(reactor, self.maybe_handle_alerts) + + def cancel_delayed_calls(self): + """Cancel all delayed handlers.""" for delayed_call in self.delayed_calls: if delayed_call.active(): delayed_call.cancel() self.delayed_calls = [] + def check_delayed_calls(self, retries: int = 0) -> bool: + """Returns True if any handler calls are delayed (upto retry limit).""" + self.delayed_calls = [dc for dc in self.delayed_calls if dc.active()] + if not self.delayed_calls: + return False + + if retries > self.handlers_retry_count: + log.warning( + 'Alert handlers timeout reached, cancelling: %s', self.delayed_calls + ) + self.cancel_delayed_calls() + return False + + return True + + def maybe_handle_alerts(self, retries: int = 0) -> None: + if self._component_state != 'Started': + return + + if self.check_delayed_calls(retries): + log.debug('Waiting for delayed alerts: %s', self.delayed_calls) + retries += 1 + reactor.callLater( + self.handlers_retry_timeout, self.maybe_handle_alerts, retries + ) + return + + self.handle_alerts() + def register_handler(self, alert_type: str, handler: Callable[[Any], None]) -> None: """ Registers a function that will be called when 'alert_type' is pop'd @@ -128,21 +183,7 @@ class AlertManager(component.Component): if log.isEnabledFor(logging.DEBUG): log.debug('Handling alert: %s', alert_type) - alert_copy = self.create_alert_copy(alert) - self.delayed_calls.append(reactor.callLater(0, handler, alert_copy)) - - @staticmethod - def create_alert_copy(alert): - """Create a Python copy of libtorrent alert - - Avoid segfault if an alert is handled after next pop_alert call""" - return SimpleNamespace( - **{ - attr: getattr(alert, attr) - for attr in dir(alert) - if not attr.startswith('__') - } - ) + self.delayed_calls.append(reactor.callLater(0, handler, alert)) def set_alert_queue_size(self, queue_size): """Sets the maximum size of the libtorrent alert queue""" |