From 2e2851dc13d73352530dd4495c7e05603b2e520d Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Wed, 10 Apr 2024 23:38:38 +0200 Subject: Adding upstream version 2.1.2~dev0+20240219. Signed-off-by: Daniel Baumann --- deluge/ui/sessionproxy.py | 282 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 282 insertions(+) create mode 100644 deluge/ui/sessionproxy.py (limited to 'deluge/ui/sessionproxy.py') diff --git a/deluge/ui/sessionproxy.py b/deluge/ui/sessionproxy.py new file mode 100644 index 0000000..6cb8550 --- /dev/null +++ b/deluge/ui/sessionproxy.py @@ -0,0 +1,282 @@ +# +# Copyright (C) 2010 Andrew Resch +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# +import logging +from time import time + +from twisted.internet.defer import maybeDeferred, succeed + +import deluge.component as component +from deluge.ui.client import client + +log = logging.getLogger(__name__) + + +class SessionProxy(component.Component): + """ + The SessionProxy component is used to cache session information client-side + to reduce the number of RPCs needed to provide a rich user interface. + + It will query the Core for only changes in the status of the torrents + and will try to satisfy client requests from the cache. + + """ + + def __init__(self): + log.debug('SessionProxy init..') + component.Component.__init__(self, 'SessionProxy', interval=5) + + # Set the cache time in seconds + # This is how long data will be valid before re-fetching from the core + self.cache_time = 1.5 + + # Hold the torrents' status.. {torrent_id: [time, {status_dict}], ...} + self.torrents = {} + + # Holds the time of the last key update.. {torrent_id: {key1, time, ...}, ...} + self.cache_times = {} + + def start(self): + client.register_event_handler( + 'TorrentStateChangedEvent', self.on_torrent_state_changed + ) + client.register_event_handler('TorrentRemovedEvent', self.on_torrent_removed) + client.register_event_handler('TorrentAddedEvent', self.on_torrent_added) + + def on_get_session_state(torrent_ids): + for torrent_id in torrent_ids: + # Let's at least store the torrent ids with empty statuses + # so that upcoming queries or status updates don't throw errors. + self.torrents.setdefault(torrent_id, [time(), {}]) + self.cache_times.setdefault(torrent_id, {}) + return torrent_ids + + return client.core.get_session_state().addCallback(on_get_session_state) + + def stop(self): + client.deregister_event_handler( + 'TorrentStateChangedEvent', self.on_torrent_state_changed + ) + client.deregister_event_handler('TorrentRemovedEvent', self.on_torrent_removed) + client.deregister_event_handler('TorrentAddedEvent', self.on_torrent_added) + self.torrents = {} + + def create_status_dict(self, torrent_ids, keys): + """ + Creates a status dict from the cache. + + :param torrent_ids: the torrent_ids + :type torrent_ids: list of strings + :param keys: the status keys + :type keys: list of strings + + :returns: a dict with the status information for the *torrent_ids* + :rtype: dict + + """ + sd = {} + keys = set(keys) + keys_len = ( + -1 + ) # The number of keys for the current cache (not the len of keys_diff_cached) + keys_diff_cached = [] + + for torrent_id in torrent_ids: + try: + if keys: + sd[torrent_id] = self.torrents[torrent_id][1].copy() + + # Have to remove the keys that weren't requested + if len(sd[torrent_id]) == keys_len: + # If the number of keys are equal they are the same keys + # so we use the cached diff of the keys we need to remove + keys_to_remove = keys_diff_cached + else: + # Not the same keys so create a new diff + keys_to_remove = set(sd[torrent_id]) - keys + # Update the cached diff + keys_diff_cached = keys_to_remove + keys_len = len(sd[torrent_id]) + + # Usually there are no keys to remove, so it's cheaper with + # this if-test than a for-loop with no iterations. + if keys_to_remove: + for k in keys_to_remove: + del sd[torrent_id][k] + else: + sd[torrent_id] = dict(self.torrents[torrent_id][1]) + except KeyError: + continue + return sd + + def get_torrent_status(self, torrent_id, keys): + """ + Get a status dict for one torrent. + + :param torrent_id: the torrent_id + :type torrent_id: string + :param keys: the status keys + :type keys: list of strings + + :returns: a dict of status information + :rtype: dict + + """ + if torrent_id in self.torrents: + # Keep track of keys we need to request from the core + keys_to_get = [] + if not keys: + keys = list(self.torrents[torrent_id][1]) + + for key in keys: + if ( + time() - self.cache_times[torrent_id].get(key, 0.0) + > self.cache_time + ): + keys_to_get.append(key) + if not keys_to_get: + return succeed(self.create_status_dict([torrent_id], keys)[torrent_id]) + else: + d = client.core.get_torrent_status(torrent_id, keys_to_get, True) + + def on_status(result, torrent_id): + t = time() + try: + self.torrents[torrent_id][0] = t + self.torrents[torrent_id][1].update(result) + for key in keys_to_get: + self.cache_times[torrent_id][key] = t + return self.create_status_dict([torrent_id], keys)[torrent_id] + except KeyError: + log.debug( + f'Status missing for torrent (removed?): {torrent_id}' + ) + return {} + + return d.addCallback(on_status, torrent_id) + else: + d = client.core.get_torrent_status(torrent_id, keys, True) + + def on_status(result): + if result: + t = time() + self.torrents[torrent_id] = (t, result) + self.cache_times[torrent_id] = {} + for key in result: + self.cache_times[torrent_id][key] = t + + return result + + return d.addCallback(on_status) + + def get_torrents_status(self, filter_dict, keys): + """ + Get a dict of torrent statuses. + + The filter can take 2 keys, *state* and *id*. The state filter can be + one of the torrent states or the special one *Active*. The *id* key is + simply a list of torrent_ids. + + :param filter_dict: the filter used for this query + :type filter_dict: dict + :param keys: the status keys + :type keys: list of strings + + :returns: a dict of torrent_ids and their status dicts + :rtype: dict + + """ + + # Helper functions and callbacks --------------------------------------- + def on_status(result, torrent_ids, keys): + # Update the internal torrent status dict with the update values + t = time() + for key, value in result.items(): + try: + self.torrents[key][0] = t + self.torrents[key][1].update(value) + for k in value: + self.cache_times[key][k] = t + except KeyError: + # The torrent was removed + continue + + # Create the status dict + if not torrent_ids: + torrent_ids = list(result) + + return self.create_status_dict(torrent_ids, keys) + + def find_torrents_to_fetch(torrent_ids): + to_fetch = [] + t = time() + for torrent_id in torrent_ids: + torrent = self.torrents[torrent_id] + if t - torrent[0] > self.cache_time: + to_fetch.append(torrent_id) + else: + # We need to check if a key is expired + for key in keys: + if ( + t - self.cache_times[torrent_id].get(key, 0.0) + > self.cache_time + ): + to_fetch.append(torrent_id) + break + + return to_fetch + + # ----------------------------------------------------------------------- + + if not filter_dict: + # This means we want all the torrents status + # We get a list of any torrent_ids with expired status dicts + torrents_list = list(self.torrents) + to_fetch = find_torrents_to_fetch(torrents_list) + if to_fetch: + d = client.core.get_torrents_status({'id': to_fetch}, keys, True) + return d.addCallback(on_status, torrents_list, keys) + + # Don't need to fetch anything + return maybeDeferred(self.create_status_dict, torrents_list, keys) + + if len(filter_dict) == 1 and 'id' in filter_dict: + # At this point we should have a filter with just "id" in it + to_fetch = find_torrents_to_fetch(filter_dict['id']) + if to_fetch: + d = client.core.get_torrents_status({'id': to_fetch}, keys, True) + return d.addCallback(on_status, filter_dict['id'], keys) + else: + # Don't need to fetch anything, so just return data from the cache + return maybeDeferred(self.create_status_dict, filter_dict['id'], keys) + else: + # This is a keyworded filter so lets just pass it onto the core + # XXX: Add more caching here. + d = client.core.get_torrents_status(filter_dict, keys, True) + return d.addCallback(on_status, None, keys) + + def on_torrent_state_changed(self, torrent_id, state): + if torrent_id in self.torrents: + self.torrents[torrent_id][1].setdefault('state', state) + self.cache_times.setdefault(torrent_id, {}).update(state=time()) + + def on_torrent_added(self, torrent_id, from_state): + self.torrents[torrent_id] = [time() - self.cache_time - 1, {}] + self.cache_times[torrent_id] = {} + + def on_status(status): + self.torrents[torrent_id][1].update(status) + t = time() + for key in status: + self.cache_times[torrent_id][key] = t + + client.core.get_torrent_status(torrent_id, []).addCallback(on_status) + + def on_torrent_removed(self, torrent_id): + if torrent_id in self.torrents: + del self.torrents[torrent_id] + del self.cache_times[torrent_id] -- cgit v1.2.3