diff options
Diffstat (limited to 'src/libserver/async_session.c')
-rw-r--r-- | src/libserver/async_session.c | 364 |
1 files changed, 364 insertions, 0 deletions
diff --git a/src/libserver/async_session.c b/src/libserver/async_session.c new file mode 100644 index 0000000..baaee62 --- /dev/null +++ b/src/libserver/async_session.c @@ -0,0 +1,364 @@ +/* + * Copyright 2023 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "config.h" +#include "rspamd.h" +#include "contrib/uthash/utlist.h" +#include "contrib/libucl/khash.h" +#include "async_session.h" +#include "cryptobox.h" + +#define RSPAMD_SESSION_FLAG_DESTROYING (1 << 1) +#define RSPAMD_SESSION_FLAG_CLEANUP (1 << 2) + +#define RSPAMD_SESSION_CAN_ADD_EVENT(s) (!((s)->flags & (RSPAMD_SESSION_FLAG_DESTROYING | RSPAMD_SESSION_FLAG_CLEANUP))) + +#define msg_err_session(...) rspamd_default_log_function(G_LOG_LEVEL_CRITICAL, \ + "events", session->pool->tag.uid, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_warn_session(...) rspamd_default_log_function(G_LOG_LEVEL_WARNING, \ + "events", session->pool->tag.uid, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_info_session(...) rspamd_default_log_function(G_LOG_LEVEL_INFO, \ + "events", session->pool->tag.uid, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_debug_session(...) rspamd_conditional_debug_fast(NULL, NULL, \ + rspamd_events_log_id, "events", session->pool->tag.uid, \ + G_STRFUNC, \ + __VA_ARGS__) + +INIT_LOG_MODULE(events) + +/* Average symbols count to optimize hash allocation */ +static struct rspamd_counter_data events_count; + + +struct rspamd_async_event { + const gchar *subsystem; + const gchar *event_source; + event_finalizer_t fin; + void *user_data; +}; + +static inline bool +rspamd_event_equal(const struct rspamd_async_event *ev1, const struct rspamd_async_event *ev2) +{ + return ev1->fin == ev2->fin && ev1->user_data == ev2->user_data; +} + +static inline guint64 +rspamd_event_hash(const struct rspamd_async_event *ev) +{ + union _pointer_fp_thunk { + event_finalizer_t f; + gpointer p; + }; + struct ev_storage { + union _pointer_fp_thunk p; + gpointer ud; + } st; + + st.p.f = ev->fin; + st.ud = ev->user_data; + + return rspamd_cryptobox_fast_hash(&st, sizeof(st), rspamd_hash_seed()); +} + +/* Define **SET** of events */ +KHASH_INIT(rspamd_events_hash, + struct rspamd_async_event *, + char, + false, + rspamd_event_hash, + rspamd_event_equal); + +struct rspamd_async_session { + session_finalizer_t fin; + event_finalizer_t restore; + event_finalizer_t cleanup; + khash_t(rspamd_events_hash) * events; + void *user_data; + rspamd_mempool_t *pool; + guint flags; +}; + +static void +rspamd_session_dtor(gpointer d) +{ + struct rspamd_async_session *s = (struct rspamd_async_session *) d; + + /* Events are usually empty at this point */ + rspamd_set_counter_ema(&events_count, s->events->n_buckets, 0.5); + kh_destroy(rspamd_events_hash, s->events); +} + +struct rspamd_async_session * +rspamd_session_create(rspamd_mempool_t *pool, + session_finalizer_t fin, + event_finalizer_t restore, + event_finalizer_t cleanup, + void *user_data) +{ + struct rspamd_async_session *s; + + s = rspamd_mempool_alloc0(pool, sizeof(struct rspamd_async_session)); + s->pool = pool; + s->fin = fin; + s->restore = restore; + s->cleanup = cleanup; + s->user_data = user_data; + s->events = kh_init(rspamd_events_hash); + + kh_resize(rspamd_events_hash, s->events, MAX(4, events_count.mean)); + rspamd_mempool_add_destructor(pool, rspamd_session_dtor, s); + + return s; +} + +struct rspamd_async_event * +rspamd_session_add_event_full(struct rspamd_async_session *session, + event_finalizer_t fin, + gpointer user_data, + const gchar *subsystem, + const gchar *event_source) +{ + struct rspamd_async_event *new_event; + gint ret; + + if (session == NULL) { + msg_err("session is NULL"); + g_assert_not_reached(); + } + + if (!RSPAMD_SESSION_CAN_ADD_EVENT(session)) { + msg_debug_session("skip adding event subsystem: %s: " + "session is destroying/cleaning", + subsystem); + + return NULL; + } + + new_event = rspamd_mempool_alloc(session->pool, + sizeof(struct rspamd_async_event)); + new_event->fin = fin; + new_event->user_data = user_data; + new_event->subsystem = subsystem; + new_event->event_source = event_source; + + msg_debug_session("added event: %p, pending %d (+1) events, " + "subsystem: %s (%s)", + user_data, + kh_size(session->events), + subsystem, + event_source); + + kh_put(rspamd_events_hash, session->events, new_event, &ret); + g_assert(ret > 0); + + return new_event; +} + +void rspamd_session_remove_event_full(struct rspamd_async_session *session, + event_finalizer_t fin, + void *ud, + const gchar *event_source) +{ + struct rspamd_async_event search_ev, *found_ev; + khiter_t k; + + if (session == NULL) { + msg_err("session is NULL"); + return; + } + + if (!RSPAMD_SESSION_CAN_ADD_EVENT(session)) { + /* Session is already cleaned up, ignore this */ + return; + } + + /* Search for event */ + search_ev.fin = fin; + search_ev.user_data = ud; + k = kh_get(rspamd_events_hash, session->events, &search_ev); + if (k == kh_end(session->events)) { + + msg_err_session("cannot find event: %p(%p) from %s (%d total events)", fin, ud, + event_source, (int) kh_size(session->events)); + kh_foreach_key(session->events, found_ev, { + msg_err_session("existing event %s (%s): %p(%p)", + found_ev->subsystem, + found_ev->event_source, + found_ev->fin, + found_ev->user_data); + }); + + g_assert_not_reached(); + } + + found_ev = kh_key(session->events, k); + msg_debug_session("removed event: %p, pending %d (-1) events, " + "subsystem: %s (%s), added at %s", + ud, + kh_size(session->events), + found_ev->subsystem, + event_source, + found_ev->event_source); + kh_del(rspamd_events_hash, session->events, k); + + /* Remove event */ + if (fin) { + fin(ud); + } + + rspamd_session_pending(session); +} + +gboolean +rspamd_session_destroy(struct rspamd_async_session *session) +{ + if (session == NULL) { + msg_err("session is NULL"); + return FALSE; + } + + if (!rspamd_session_blocked(session)) { + session->flags |= RSPAMD_SESSION_FLAG_DESTROYING; + rspamd_session_cleanup(session, false); + + if (session->cleanup != NULL) { + session->cleanup(session->user_data); + } + } + + return TRUE; +} + +void rspamd_session_cleanup(struct rspamd_async_session *session, bool forced_cleanup) +{ + struct rspamd_async_event *ev; + + if (session == NULL) { + msg_err("session is NULL"); + return; + } + + session->flags |= RSPAMD_SESSION_FLAG_CLEANUP; + khash_t(rspamd_events_hash) *uncancellable_events = kh_init(rspamd_events_hash); + + kh_foreach_key(session->events, ev, { + /* Call event's finalizer */ + int ret; + + if (ev->fin != NULL) { + if (forced_cleanup) { + msg_info_session("forced removed event on destroy: %p, subsystem: %s, scheduled from: %s", + ev->user_data, + ev->subsystem, + ev->event_source); + } + else { + msg_debug_session("removed event on destroy: %p, subsystem: %s", + ev->user_data, + ev->subsystem); + } + ev->fin(ev->user_data); + } + else { + if (forced_cleanup) { + msg_info_session("NOT forced removed event on destroy - uncancellable: " + "%p, subsystem: %s, scheduled from: %s", + ev->user_data, + ev->subsystem, + ev->event_source); + } + else { + msg_debug_session("NOT removed event on destroy - uncancellable: %p, subsystem: %s", + ev->user_data, + ev->subsystem); + } + /* Assume an event is uncancellable, move it to a new hash table */ + kh_put(rspamd_events_hash, uncancellable_events, ev, &ret); + } + }); + + kh_destroy(rspamd_events_hash, session->events); + session->events = uncancellable_events; + if (forced_cleanup) { + msg_info_session("pending %d uncancellable events", kh_size(uncancellable_events)); + } + else { + msg_debug_session("pending %d uncancellable events", kh_size(uncancellable_events)); + } + + session->flags &= ~RSPAMD_SESSION_FLAG_CLEANUP; +} + +gboolean +rspamd_session_pending(struct rspamd_async_session *session) +{ + gboolean ret = TRUE; + + if (kh_size(session->events) == 0) { + if (session->fin != NULL) { + msg_debug_session("call fin handler, as no events are pending"); + + if (!session->fin(session->user_data)) { + /* Session finished incompletely, perform restoration */ + msg_debug_session("restore incomplete session"); + if (session->restore != NULL) { + session->restore(session->user_data); + } + } + else { + ret = FALSE; + } + } + + ret = FALSE; + } + + return ret; +} + +guint rspamd_session_events_pending(struct rspamd_async_session *session) +{ + guint npending; + + g_assert(session != NULL); + + npending = kh_size(session->events); + msg_debug_session("pending %d events", npending); + + return npending; +} + +rspamd_mempool_t * +rspamd_session_mempool(struct rspamd_async_session *session) +{ + g_assert(session != NULL); + + return session->pool; +} + +gboolean +rspamd_session_blocked(struct rspamd_async_session *session) +{ + g_assert(session != NULL); + + return !RSPAMD_SESSION_CAN_ADD_EVENT(session); +}
\ No newline at end of file |