summaryrefslogtreecommitdiffstats
path: root/src/libserver/async_session.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-10 21:30:40 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-10 21:30:40 +0000
commit133a45c109da5310add55824db21af5239951f93 (patch)
treeba6ac4c0a950a0dda56451944315d66409923918 /src/libserver/async_session.c
parentInitial commit. (diff)
downloadrspamd-133a45c109da5310add55824db21af5239951f93.tar.xz
rspamd-133a45c109da5310add55824db21af5239951f93.zip
Adding upstream version 3.8.1.upstream/3.8.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/libserver/async_session.c')
-rw-r--r--src/libserver/async_session.c364
1 files changed, 364 insertions, 0 deletions
diff --git a/src/libserver/async_session.c b/src/libserver/async_session.c
new file mode 100644
index 0000000..baaee62
--- /dev/null
+++ b/src/libserver/async_session.c
@@ -0,0 +1,364 @@
+/*
+ * Copyright 2023 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include "rspamd.h"
+#include "contrib/uthash/utlist.h"
+#include "contrib/libucl/khash.h"
+#include "async_session.h"
+#include "cryptobox.h"
+
+#define RSPAMD_SESSION_FLAG_DESTROYING (1 << 1)
+#define RSPAMD_SESSION_FLAG_CLEANUP (1 << 2)
+
+#define RSPAMD_SESSION_CAN_ADD_EVENT(s) (!((s)->flags & (RSPAMD_SESSION_FLAG_DESTROYING | RSPAMD_SESSION_FLAG_CLEANUP)))
+
+#define msg_err_session(...) rspamd_default_log_function(G_LOG_LEVEL_CRITICAL, \
+ "events", session->pool->tag.uid, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+#define msg_warn_session(...) rspamd_default_log_function(G_LOG_LEVEL_WARNING, \
+ "events", session->pool->tag.uid, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+#define msg_info_session(...) rspamd_default_log_function(G_LOG_LEVEL_INFO, \
+ "events", session->pool->tag.uid, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+#define msg_debug_session(...) rspamd_conditional_debug_fast(NULL, NULL, \
+ rspamd_events_log_id, "events", session->pool->tag.uid, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+
+INIT_LOG_MODULE(events)
+
+/* Average symbols count to optimize hash allocation */
+static struct rspamd_counter_data events_count;
+
+
+struct rspamd_async_event {
+ const gchar *subsystem;
+ const gchar *event_source;
+ event_finalizer_t fin;
+ void *user_data;
+};
+
+static inline bool
+rspamd_event_equal(const struct rspamd_async_event *ev1, const struct rspamd_async_event *ev2)
+{
+ return ev1->fin == ev2->fin && ev1->user_data == ev2->user_data;
+}
+
+static inline guint64
+rspamd_event_hash(const struct rspamd_async_event *ev)
+{
+ union _pointer_fp_thunk {
+ event_finalizer_t f;
+ gpointer p;
+ };
+ struct ev_storage {
+ union _pointer_fp_thunk p;
+ gpointer ud;
+ } st;
+
+ st.p.f = ev->fin;
+ st.ud = ev->user_data;
+
+ return rspamd_cryptobox_fast_hash(&st, sizeof(st), rspamd_hash_seed());
+}
+
+/* Define **SET** of events */
+KHASH_INIT(rspamd_events_hash,
+ struct rspamd_async_event *,
+ char,
+ false,
+ rspamd_event_hash,
+ rspamd_event_equal);
+
+struct rspamd_async_session {
+ session_finalizer_t fin;
+ event_finalizer_t restore;
+ event_finalizer_t cleanup;
+ khash_t(rspamd_events_hash) * events;
+ void *user_data;
+ rspamd_mempool_t *pool;
+ guint flags;
+};
+
+static void
+rspamd_session_dtor(gpointer d)
+{
+ struct rspamd_async_session *s = (struct rspamd_async_session *) d;
+
+ /* Events are usually empty at this point */
+ rspamd_set_counter_ema(&events_count, s->events->n_buckets, 0.5);
+ kh_destroy(rspamd_events_hash, s->events);
+}
+
+struct rspamd_async_session *
+rspamd_session_create(rspamd_mempool_t *pool,
+ session_finalizer_t fin,
+ event_finalizer_t restore,
+ event_finalizer_t cleanup,
+ void *user_data)
+{
+ struct rspamd_async_session *s;
+
+ s = rspamd_mempool_alloc0(pool, sizeof(struct rspamd_async_session));
+ s->pool = pool;
+ s->fin = fin;
+ s->restore = restore;
+ s->cleanup = cleanup;
+ s->user_data = user_data;
+ s->events = kh_init(rspamd_events_hash);
+
+ kh_resize(rspamd_events_hash, s->events, MAX(4, events_count.mean));
+ rspamd_mempool_add_destructor(pool, rspamd_session_dtor, s);
+
+ return s;
+}
+
+struct rspamd_async_event *
+rspamd_session_add_event_full(struct rspamd_async_session *session,
+ event_finalizer_t fin,
+ gpointer user_data,
+ const gchar *subsystem,
+ const gchar *event_source)
+{
+ struct rspamd_async_event *new_event;
+ gint ret;
+
+ if (session == NULL) {
+ msg_err("session is NULL");
+ g_assert_not_reached();
+ }
+
+ if (!RSPAMD_SESSION_CAN_ADD_EVENT(session)) {
+ msg_debug_session("skip adding event subsystem: %s: "
+ "session is destroying/cleaning",
+ subsystem);
+
+ return NULL;
+ }
+
+ new_event = rspamd_mempool_alloc(session->pool,
+ sizeof(struct rspamd_async_event));
+ new_event->fin = fin;
+ new_event->user_data = user_data;
+ new_event->subsystem = subsystem;
+ new_event->event_source = event_source;
+
+ msg_debug_session("added event: %p, pending %d (+1) events, "
+ "subsystem: %s (%s)",
+ user_data,
+ kh_size(session->events),
+ subsystem,
+ event_source);
+
+ kh_put(rspamd_events_hash, session->events, new_event, &ret);
+ g_assert(ret > 0);
+
+ return new_event;
+}
+
+void rspamd_session_remove_event_full(struct rspamd_async_session *session,
+ event_finalizer_t fin,
+ void *ud,
+ const gchar *event_source)
+{
+ struct rspamd_async_event search_ev, *found_ev;
+ khiter_t k;
+
+ if (session == NULL) {
+ msg_err("session is NULL");
+ return;
+ }
+
+ if (!RSPAMD_SESSION_CAN_ADD_EVENT(session)) {
+ /* Session is already cleaned up, ignore this */
+ return;
+ }
+
+ /* Search for event */
+ search_ev.fin = fin;
+ search_ev.user_data = ud;
+ k = kh_get(rspamd_events_hash, session->events, &search_ev);
+ if (k == kh_end(session->events)) {
+
+ msg_err_session("cannot find event: %p(%p) from %s (%d total events)", fin, ud,
+ event_source, (int) kh_size(session->events));
+ kh_foreach_key(session->events, found_ev, {
+ msg_err_session("existing event %s (%s): %p(%p)",
+ found_ev->subsystem,
+ found_ev->event_source,
+ found_ev->fin,
+ found_ev->user_data);
+ });
+
+ g_assert_not_reached();
+ }
+
+ found_ev = kh_key(session->events, k);
+ msg_debug_session("removed event: %p, pending %d (-1) events, "
+ "subsystem: %s (%s), added at %s",
+ ud,
+ kh_size(session->events),
+ found_ev->subsystem,
+ event_source,
+ found_ev->event_source);
+ kh_del(rspamd_events_hash, session->events, k);
+
+ /* Remove event */
+ if (fin) {
+ fin(ud);
+ }
+
+ rspamd_session_pending(session);
+}
+
+gboolean
+rspamd_session_destroy(struct rspamd_async_session *session)
+{
+ if (session == NULL) {
+ msg_err("session is NULL");
+ return FALSE;
+ }
+
+ if (!rspamd_session_blocked(session)) {
+ session->flags |= RSPAMD_SESSION_FLAG_DESTROYING;
+ rspamd_session_cleanup(session, false);
+
+ if (session->cleanup != NULL) {
+ session->cleanup(session->user_data);
+ }
+ }
+
+ return TRUE;
+}
+
+void rspamd_session_cleanup(struct rspamd_async_session *session, bool forced_cleanup)
+{
+ struct rspamd_async_event *ev;
+
+ if (session == NULL) {
+ msg_err("session is NULL");
+ return;
+ }
+
+ session->flags |= RSPAMD_SESSION_FLAG_CLEANUP;
+ khash_t(rspamd_events_hash) *uncancellable_events = kh_init(rspamd_events_hash);
+
+ kh_foreach_key(session->events, ev, {
+ /* Call event's finalizer */
+ int ret;
+
+ if (ev->fin != NULL) {
+ if (forced_cleanup) {
+ msg_info_session("forced removed event on destroy: %p, subsystem: %s, scheduled from: %s",
+ ev->user_data,
+ ev->subsystem,
+ ev->event_source);
+ }
+ else {
+ msg_debug_session("removed event on destroy: %p, subsystem: %s",
+ ev->user_data,
+ ev->subsystem);
+ }
+ ev->fin(ev->user_data);
+ }
+ else {
+ if (forced_cleanup) {
+ msg_info_session("NOT forced removed event on destroy - uncancellable: "
+ "%p, subsystem: %s, scheduled from: %s",
+ ev->user_data,
+ ev->subsystem,
+ ev->event_source);
+ }
+ else {
+ msg_debug_session("NOT removed event on destroy - uncancellable: %p, subsystem: %s",
+ ev->user_data,
+ ev->subsystem);
+ }
+ /* Assume an event is uncancellable, move it to a new hash table */
+ kh_put(rspamd_events_hash, uncancellable_events, ev, &ret);
+ }
+ });
+
+ kh_destroy(rspamd_events_hash, session->events);
+ session->events = uncancellable_events;
+ if (forced_cleanup) {
+ msg_info_session("pending %d uncancellable events", kh_size(uncancellable_events));
+ }
+ else {
+ msg_debug_session("pending %d uncancellable events", kh_size(uncancellable_events));
+ }
+
+ session->flags &= ~RSPAMD_SESSION_FLAG_CLEANUP;
+}
+
+gboolean
+rspamd_session_pending(struct rspamd_async_session *session)
+{
+ gboolean ret = TRUE;
+
+ if (kh_size(session->events) == 0) {
+ if (session->fin != NULL) {
+ msg_debug_session("call fin handler, as no events are pending");
+
+ if (!session->fin(session->user_data)) {
+ /* Session finished incompletely, perform restoration */
+ msg_debug_session("restore incomplete session");
+ if (session->restore != NULL) {
+ session->restore(session->user_data);
+ }
+ }
+ else {
+ ret = FALSE;
+ }
+ }
+
+ ret = FALSE;
+ }
+
+ return ret;
+}
+
+guint rspamd_session_events_pending(struct rspamd_async_session *session)
+{
+ guint npending;
+
+ g_assert(session != NULL);
+
+ npending = kh_size(session->events);
+ msg_debug_session("pending %d events", npending);
+
+ return npending;
+}
+
+rspamd_mempool_t *
+rspamd_session_mempool(struct rspamd_async_session *session)
+{
+ g_assert(session != NULL);
+
+ return session->pool;
+}
+
+gboolean
+rspamd_session_blocked(struct rspamd_async_session *session)
+{
+ g_assert(session != NULL);
+
+ return !RSPAMD_SESSION_CAN_ADD_EVENT(session);
+} \ No newline at end of file