summaryrefslogtreecommitdiffstats
path: root/deluge/core/alertmanager.py
diff options
context:
space:
mode:
Diffstat (limited to 'deluge/core/alertmanager.py')
-rw-r--r--deluge/core/alertmanager.py81
1 files changed, 61 insertions, 20 deletions
diff --git a/deluge/core/alertmanager.py b/deluge/core/alertmanager.py
index 51a7f29..71045b0 100644
--- a/deluge/core/alertmanager.py
+++ b/deluge/core/alertmanager.py
@@ -16,11 +16,11 @@ This should typically only be used by the Core. Plugins should utilize the
"""
import contextlib
import logging
+import threading
from collections import defaultdict
-from types import SimpleNamespace
from typing import Any, Callable
-from twisted.internet import reactor
+from twisted.internet import reactor, threads
import deluge.component as component
from deluge._libtorrent import lt
@@ -34,7 +34,7 @@ class AlertManager(component.Component):
def __init__(self):
log.debug('AlertManager init...')
- component.Component.__init__(self, 'AlertManager', interval=0.3)
+ component.Component.__init__(self, 'AlertManager')
self.session = component.get('Core').session
# Increase the alert queue size so that alerts don't get lost.
@@ -56,18 +56,73 @@ class AlertManager(component.Component):
# handlers is a dictionary of lists {"alert_type": [handler1,h2,..]}
self.handlers = defaultdict(list)
+ self.handlers_retry_timeout = 0.3
+ self.handlers_retry_count = 6
self.delayed_calls = []
+ self._event = threading.Event()
def update(self):
- self.delayed_calls = [dc for dc in self.delayed_calls if dc.active()]
- self.handle_alerts()
+ pass
+
+ def start(self):
+ thread = threading.Thread(
+ target=self.wait_for_alert_in_thread, name='alert-poller', daemon=True
+ )
+ thread.start()
+ self._event.set()
def stop(self):
+ self.cancel_delayed_calls()
+
+ def pause(self):
+ self._event.clear()
+
+ def resume(self):
+ self._event.set()
+
+ def wait_for_alert_in_thread(self):
+ while self._component_state not in ('Stopping', 'Stopped'):
+ if self.session.wait_for_alert(1000) is None:
+ continue
+ if self._event.wait():
+ threads.blockingCallFromThread(reactor, self.maybe_handle_alerts)
+
+ def cancel_delayed_calls(self):
+ """Cancel all delayed handlers."""
for delayed_call in self.delayed_calls:
if delayed_call.active():
delayed_call.cancel()
self.delayed_calls = []
+ def check_delayed_calls(self, retries: int = 0) -> bool:
+ """Returns True if any handler calls are delayed (upto retry limit)."""
+ self.delayed_calls = [dc for dc in self.delayed_calls if dc.active()]
+ if not self.delayed_calls:
+ return False
+
+ if retries > self.handlers_retry_count:
+ log.warning(
+ 'Alert handlers timeout reached, cancelling: %s', self.delayed_calls
+ )
+ self.cancel_delayed_calls()
+ return False
+
+ return True
+
+ def maybe_handle_alerts(self, retries: int = 0) -> None:
+ if self._component_state != 'Started':
+ return
+
+ if self.check_delayed_calls(retries):
+ log.debug('Waiting for delayed alerts: %s', self.delayed_calls)
+ retries += 1
+ reactor.callLater(
+ self.handlers_retry_timeout, self.maybe_handle_alerts, retries
+ )
+ return
+
+ self.handle_alerts()
+
def register_handler(self, alert_type: str, handler: Callable[[Any], None]) -> None:
"""
Registers a function that will be called when 'alert_type' is pop'd
@@ -128,21 +183,7 @@ class AlertManager(component.Component):
if log.isEnabledFor(logging.DEBUG):
log.debug('Handling alert: %s', alert_type)
- alert_copy = self.create_alert_copy(alert)
- self.delayed_calls.append(reactor.callLater(0, handler, alert_copy))
-
- @staticmethod
- def create_alert_copy(alert):
- """Create a Python copy of libtorrent alert
-
- Avoid segfault if an alert is handled after next pop_alert call"""
- return SimpleNamespace(
- **{
- attr: getattr(alert, attr)
- for attr in dir(alert)
- if not attr.startswith('__')
- }
- )
+ self.delayed_calls.append(reactor.callLater(0, handler, alert))
def set_alert_queue_size(self, queue_size):
"""Sets the maximum size of the libtorrent alert queue"""