diff options
Diffstat (limited to 'src/libserver/symcache')
-rw-r--r-- | src/libserver/symcache/symcache_c.cxx | 715 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_id_list.hxx | 95 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_impl.cxx | 1316 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_internal.hxx | 652 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_item.cxx | 652 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_item.hxx | 561 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_periodic.hxx | 89 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_runtime.cxx | 823 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_runtime.hxx | 209 |
9 files changed, 5112 insertions, 0 deletions
diff --git a/src/libserver/symcache/symcache_c.cxx b/src/libserver/symcache/symcache_c.cxx new file mode 100644 index 0000000..6a7e41c --- /dev/null +++ b/src/libserver/symcache/symcache_c.cxx @@ -0,0 +1,715 @@ +/* + * Copyright 2023 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "symcache_internal.hxx" +#include "symcache_periodic.hxx" +#include "symcache_item.hxx" +#include "symcache_runtime.hxx" + +/** + * C API for symcache + */ + +#define C_API_SYMCACHE(ptr) (reinterpret_cast<rspamd::symcache::symcache *>(ptr)) +#define C_API_SYMCACHE_RUNTIME(ptr) (reinterpret_cast<rspamd::symcache::symcache_runtime *>(ptr)) +#define C_API_SYMCACHE_ITEM(ptr) (reinterpret_cast<rspamd::symcache::cache_item *>(ptr)) +#define C_API_SYMCACHE_DYN_ITEM(ptr) (reinterpret_cast<rspamd::symcache::cache_dynamic_item *>(ptr)) + +void rspamd_symcache_destroy(struct rspamd_symcache *cache) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + delete real_cache; +} + +struct rspamd_symcache * +rspamd_symcache_new(struct rspamd_config *cfg) +{ + auto *ncache = new rspamd::symcache::symcache(cfg); + + return (struct rspamd_symcache *) ncache; +} + +gboolean +rspamd_symcache_init(struct rspamd_symcache *cache) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + return real_cache->init(); +} + +void rspamd_symcache_save(struct rspamd_symcache *cache) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + real_cache->save_items(); +} + +gint rspamd_symcache_add_symbol(struct rspamd_symcache *cache, + const gchar *name, + gint priority, + symbol_func_t func, + gpointer user_data, + int type, + gint parent) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + /* Legacy stuff */ + if (name == nullptr) { + name = ""; + } + + if (parent == -1) { + return real_cache->add_symbol_with_callback(name, priority, func, user_data, type); + } + else { + return real_cache->add_virtual_symbol(name, parent, type); + } +} + +bool rspamd_symcache_add_symbol_augmentation(struct rspamd_symcache *cache, + int sym_id, + const char *augmentation, + const char *value) +{ + auto *real_cache = C_API_SYMCACHE(cache); + auto log_tag = [&]() { return real_cache->log_tag(); }; + + if (augmentation == nullptr) { + msg_err_cache("null augmentation is not allowed for item %d", sym_id); + return false; + } + + + auto *item = real_cache->get_item_by_id_mut(sym_id, false); + + if (item == nullptr) { + msg_err_cache("item %d is not found", sym_id); + return false; + } + + /* Handle empty or absent strings equally */ + if (value == nullptr || value[0] == '\0') { + return item->add_augmentation(*real_cache, augmentation, std::nullopt); + } + + return item->add_augmentation(*real_cache, augmentation, value); +} + +void rspamd_symcache_set_peak_callback(struct rspamd_symcache *cache, gint cbref) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + real_cache->set_peak_cb(cbref); +} + +gboolean +rspamd_symcache_add_condition_delayed(struct rspamd_symcache *cache, + const gchar *sym, lua_State *L, gint cbref) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + real_cache->add_delayed_condition(sym, cbref); + + return TRUE; +} + +gint rspamd_symcache_find_symbol(struct rspamd_symcache *cache, + const gchar *name) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + /* Legacy stuff but used */ + if (name == nullptr) { + return -1; + } + + auto sym_maybe = real_cache->get_item_by_name(name, false); + + if (sym_maybe != nullptr) { + return sym_maybe->id; + } + + return -1; +} + +gboolean +rspamd_symcache_stat_symbol(struct rspamd_symcache *cache, + const gchar *name, + gdouble *frequency, + gdouble *freq_stddev, + gdouble *tm, + guint *nhits) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + auto sym_maybe = real_cache->get_item_by_name(name, false); + + if (sym_maybe != nullptr) { + *frequency = sym_maybe->st->avg_frequency; + *freq_stddev = sqrt(sym_maybe->st->stddev_frequency); + *tm = sym_maybe->st->time_counter.mean; + + if (nhits) { + *nhits = sym_maybe->st->hits; + } + + return TRUE; + } + + return FALSE; +} + + +guint rspamd_symcache_stats_symbols_count(struct rspamd_symcache *cache) +{ + auto *real_cache = C_API_SYMCACHE(cache); + return real_cache->get_stats_symbols_count(); +} + +guint64 +rspamd_symcache_get_cksum(struct rspamd_symcache *cache) +{ + auto *real_cache = C_API_SYMCACHE(cache); + return real_cache->get_cksum(); +} + +gboolean +rspamd_symcache_validate(struct rspamd_symcache *cache, + struct rspamd_config *cfg, + gboolean strict) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + return real_cache->validate(strict); +} + +ucl_object_t * +rspamd_symcache_counters(struct rspamd_symcache *cache) +{ + auto *real_cache = C_API_SYMCACHE(cache); + return real_cache->counters(); +} + +void * +rspamd_symcache_start_refresh(struct rspamd_symcache *cache, + struct ev_loop *ev_base, struct rspamd_worker *w) +{ + auto *real_cache = C_API_SYMCACHE(cache); + return new rspamd::symcache::cache_refresh_cbdata{real_cache, ev_base, w}; +} + +void rspamd_symcache_inc_frequency(struct rspamd_symcache *cache, struct rspamd_symcache_item *item, + const char *sym_name) +{ + auto *real_item = C_API_SYMCACHE_ITEM(item); + auto *real_cache = C_API_SYMCACHE(cache); + + if (real_item) { + real_item->inc_frequency(sym_name, *real_cache); + } +} + +void rspamd_symcache_add_delayed_dependency(struct rspamd_symcache *cache, + const gchar *from, const gchar *to) +{ + auto *real_cache = C_API_SYMCACHE(cache); + real_cache->add_delayed_dependency(from, to); +} + +const gchar * +rspamd_symcache_get_parent(struct rspamd_symcache *cache, + const gchar *symbol) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + auto *sym = real_cache->get_item_by_name(symbol, false); + + if (sym && sym->is_virtual()) { + auto *parent = sym->get_parent(*real_cache); + + if (parent) { + return parent->get_name().c_str(); + } + } + + return nullptr; +} + +const gchar * +rspamd_symcache_item_name(struct rspamd_symcache_item *item) +{ + auto *real_item = C_API_SYMCACHE_ITEM(item); + + if (real_item == nullptr) { + return nullptr; + } + + return real_item->get_name().c_str(); +} + +gint rspamd_symcache_item_flags(struct rspamd_symcache_item *item) +{ + auto *real_item = C_API_SYMCACHE_ITEM(item); + + if (real_item == nullptr) { + return 0; + } + + return real_item->get_flags(); +} + + +const gchar * +rspamd_symcache_dyn_item_name(struct rspamd_task *task, + struct rspamd_symcache_dynamic_item *dyn_item) +{ + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + auto *real_dyn_item = C_API_SYMCACHE_DYN_ITEM(dyn_item); + + if (cache_runtime == nullptr || real_dyn_item == nullptr) { + return nullptr; + } + + auto static_item = cache_runtime->get_item_by_dynamic_item(real_dyn_item); + + return static_item->get_name().c_str(); +} + +gint rspamd_symcache_item_flags(struct rspamd_task *task, + struct rspamd_symcache_dynamic_item *dyn_item) +{ + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + auto *real_dyn_item = C_API_SYMCACHE_DYN_ITEM(dyn_item); + + if (cache_runtime == nullptr || real_dyn_item == nullptr) { + return 0; + } + + auto static_item = cache_runtime->get_item_by_dynamic_item(real_dyn_item); + + return static_item->get_flags(); +} + +guint rspamd_symcache_get_symbol_flags(struct rspamd_symcache *cache, + const gchar *symbol) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + auto *sym = real_cache->get_item_by_name(symbol, false); + + if (sym) { + return sym->get_flags(); + } + + return 0; +} + +const struct rspamd_symcache_item_stat * +rspamd_symcache_item_stat(struct rspamd_symcache_item *item) +{ + auto *real_item = C_API_SYMCACHE_ITEM(item); + return real_item->st; +} + +void rspamd_symcache_get_symbol_details(struct rspamd_symcache *cache, + const gchar *symbol, + ucl_object_t *this_sym_ucl) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + auto *sym = real_cache->get_item_by_name(symbol, false); + + if (sym) { + ucl_object_insert_key(this_sym_ucl, + ucl_object_fromstring(sym->get_type_str()), + "type", strlen("type"), false); + } +} + +void rspamd_symcache_foreach(struct rspamd_symcache *cache, + void (*func)(struct rspamd_symcache_item *item, gpointer /* userdata */), + gpointer ud) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + real_cache->symbols_foreach([&](const rspamd::symcache::cache_item *item) { + func((struct rspamd_symcache_item *) item, ud); + }); +} + +void rspamd_symcache_process_settings_elt(struct rspamd_symcache *cache, + struct rspamd_config_settings_elt *elt) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + real_cache->process_settings_elt(elt); +} + +bool rspamd_symcache_set_allowed_settings_ids(struct rspamd_symcache *cache, + const gchar *symbol, + const guint32 *ids, + guint nids) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + auto *item = real_cache->get_item_by_name_mut(symbol, false); + + if (item == nullptr) { + return false; + } + + item->allowed_ids.set_ids(ids, nids); + return true; +} + +bool rspamd_symcache_set_forbidden_settings_ids(struct rspamd_symcache *cache, + const gchar *symbol, + const guint32 *ids, + guint nids) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + auto *item = real_cache->get_item_by_name_mut(symbol, false); + + if (item == nullptr) { + return false; + } + + item->forbidden_ids.set_ids(ids, nids); + return true; +} + +const guint32 * +rspamd_symcache_get_allowed_settings_ids(struct rspamd_symcache *cache, + const gchar *symbol, + guint *nids) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + const auto *item = real_cache->get_item_by_name(symbol, false); + return item->allowed_ids.get_ids(*nids); +} + +const guint32 * +rspamd_symcache_get_forbidden_settings_ids(struct rspamd_symcache *cache, + const gchar *symbol, + guint *nids) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + const auto *item = real_cache->get_item_by_name(symbol, false); + return item->forbidden_ids.get_ids(*nids); +} + +void rspamd_symcache_disable_all_symbols(struct rspamd_task *task, + struct rspamd_symcache *_cache, + guint skip_mask) +{ + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + + cache_runtime->disable_all_symbols(skip_mask); +} + +gboolean +rspamd_symcache_disable_symbol(struct rspamd_task *task, + struct rspamd_symcache *cache, + const gchar *symbol) +{ + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + auto *real_cache = C_API_SYMCACHE(cache); + + if (cache_runtime == nullptr) { + return FALSE; + } + + return cache_runtime->disable_symbol(task, *real_cache, symbol); +} + +gboolean +rspamd_symcache_enable_symbol(struct rspamd_task *task, + struct rspamd_symcache *cache, + const gchar *symbol) +{ + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + auto *real_cache = C_API_SYMCACHE(cache); + + if (cache_runtime == nullptr) { + return FALSE; + } + + return cache_runtime->enable_symbol(task, *real_cache, symbol); +} + +void rspamd_symcache_disable_symbol_static(struct rspamd_symcache *cache, + const gchar *symbol) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + real_cache->disable_symbol_delayed(symbol); +} + +void rspamd_symcache_enable_symbol_static(struct rspamd_symcache *cache, + const gchar *symbol) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + real_cache->enable_symbol_delayed(symbol); +} + +/* A real structure to match C results without extra copying */ +struct rspamd_symcache_real_timeout_result { + struct rspamd_symcache_timeout_result c_api_result; + std::vector<std::pair<double, const rspamd::symcache::cache_item *>> elts; +}; + +struct rspamd_symcache_timeout_result * +rspamd_symcache_get_max_timeout(struct rspamd_symcache *cache) +{ + auto *real_cache = C_API_SYMCACHE(cache); + auto *res = new rspamd_symcache_real_timeout_result; + + res->c_api_result.max_timeout = real_cache->get_max_timeout(res->elts); + res->c_api_result.items = reinterpret_cast<struct rspamd_symcache_timeout_item *>(res->elts.data()); + res->c_api_result.nitems = res->elts.size(); + + return &res->c_api_result; +} + +void rspamd_symcache_timeout_result_free(struct rspamd_symcache_timeout_result *res) +{ + auto *real_result = reinterpret_cast<rspamd_symcache_real_timeout_result *>(res); + delete real_result; +} + +gboolean +rspamd_symcache_is_checked(struct rspamd_task *task, + struct rspamd_symcache *cache, + const gchar *symbol) +{ + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + auto *real_cache = C_API_SYMCACHE(cache); + + if (cache_runtime == nullptr) { + return FALSE; + } + + return cache_runtime->is_symbol_checked(*real_cache, symbol); +} + +gboolean +rspamd_symcache_process_settings(struct rspamd_task *task, + struct rspamd_symcache *cache) +{ + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + auto *real_cache = C_API_SYMCACHE(cache); + + if (cache_runtime == nullptr) { + return FALSE; + } + + return cache_runtime->process_settings(task, *real_cache); +} + +gboolean +rspamd_symcache_is_item_allowed(struct rspamd_task *task, + struct rspamd_symcache_item *item, + gboolean exec_only) +{ + auto *real_item = C_API_SYMCACHE_ITEM(item); + + if (real_item == nullptr) { + return TRUE; + } + + return real_item->is_allowed(task, exec_only); +} + +gboolean +rspamd_symcache_is_symbol_enabled(struct rspamd_task *task, + struct rspamd_symcache *cache, + const gchar *symbol) +{ + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + auto *real_cache = C_API_SYMCACHE(cache); + + if (!cache_runtime) { + return TRUE; + } + + return cache_runtime->is_symbol_enabled(task, *real_cache, symbol); +} + +struct rspamd_symcache_dynamic_item * +rspamd_symcache_get_cur_item(struct rspamd_task *task) +{ + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + + if (!cache_runtime) { + return nullptr; + } + + return (struct rspamd_symcache_dynamic_item *) cache_runtime->get_cur_item(); +} + +struct rspamd_symcache_dynamic_item * +rspamd_symcache_set_cur_item(struct rspamd_task *task, struct rspamd_symcache_dynamic_item *item) +{ + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + auto *real_dyn_item = C_API_SYMCACHE_DYN_ITEM(item); + + if (!cache_runtime || !real_dyn_item) { + return nullptr; + } + + return (struct rspamd_symcache_dynamic_item *) cache_runtime->set_cur_item(real_dyn_item); +} + +void rspamd_symcache_enable_profile(struct rspamd_task *task) +{ + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + if (!cache_runtime) { + return; + } + + cache_runtime->set_profile_mode(true); +} + +guint rspamd_symcache_item_async_inc_full(struct rspamd_task *task, + struct rspamd_symcache_dynamic_item *item, + const gchar *subsystem, + const gchar *loc) +{ + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + auto *real_dyn_item = C_API_SYMCACHE_DYN_ITEM(item); + + auto *static_item = cache_runtime->get_item_by_dynamic_item(real_dyn_item); + msg_debug_cache_task("increase async events counter for %s(%d) = %d + 1; " + "subsystem %s (%s)", + static_item->symbol.c_str(), static_item->id, + real_dyn_item->async_events, subsystem, loc); + + return ++real_dyn_item->async_events; +} + +guint rspamd_symcache_item_async_dec_full(struct rspamd_task *task, + struct rspamd_symcache_dynamic_item *item, + const gchar *subsystem, + const gchar *loc) +{ + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + auto *real_dyn_item = C_API_SYMCACHE_DYN_ITEM(item); + + auto *static_item = cache_runtime->get_item_by_dynamic_item(real_dyn_item); + msg_debug_cache_task("decrease async events counter for %s(%d) = %d - 1; " + "subsystem %s (%s)", + static_item->symbol.c_str(), static_item->id, + real_dyn_item->async_events, subsystem, loc); + + if (G_UNLIKELY(real_dyn_item->async_events == 0)) { + msg_err_cache_task("INTERNAL ERROR: trying decrease async events counter for %s(%d) that is already zero; " + "subsystem %s (%s)", + static_item->symbol.c_str(), static_item->id, + real_dyn_item->async_events, subsystem, loc); + g_abort(); + g_assert_not_reached(); + } + + return --real_dyn_item->async_events; +} + +gboolean +rspamd_symcache_item_async_dec_check_full(struct rspamd_task *task, + struct rspamd_symcache_dynamic_item *item, + const gchar *subsystem, + const gchar *loc) +{ + if (rspamd_symcache_item_async_dec_full(task, item, subsystem, loc) == 0) { + rspamd_symcache_finalize_item(task, item); + + return TRUE; + } + + return FALSE; +} + +struct rspamd_abstract_callback_data * +rspamd_symcache_get_cbdata(struct rspamd_symcache *cache, + const gchar *symbol) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + auto *item = real_cache->get_item_by_name(symbol, true); + + if (item) { + return (struct rspamd_abstract_callback_data *) item->get_cbdata(); + } + + return nullptr; +} + +void rspamd_symcache_composites_foreach(struct rspamd_task *task, + struct rspamd_symcache *cache, + GHFunc func, + gpointer fd) +{ + auto *real_cache = C_API_SYMCACHE(cache); + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + + real_cache->composites_foreach([&](const auto *item) { + auto *dyn_item = cache_runtime->get_dynamic_item(item->id); + + if (dyn_item && !dyn_item->started) { + auto *old_item = cache_runtime->set_cur_item(dyn_item); + func((void *) item->get_name().c_str(), item->get_cbdata(), fd); + dyn_item->finished = true; + cache_runtime->set_cur_item(old_item); + } + }); + + cache_runtime->set_cur_item(nullptr); +} + +gboolean +rspamd_symcache_process_symbols(struct rspamd_task *task, + struct rspamd_symcache *cache, + guint stage) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + if (task->symcache_runtime == nullptr) { + task->symcache_runtime = rspamd::symcache::symcache_runtime::create(task, *real_cache); + } + + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + return cache_runtime->process_symbols(task, *real_cache, stage); +} + +void rspamd_symcache_finalize_item(struct rspamd_task *task, + struct rspamd_symcache_dynamic_item *item) +{ + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + auto *real_dyn_item = C_API_SYMCACHE_DYN_ITEM(item); + + cache_runtime->finalize_item(task, real_dyn_item); +} + +void rspamd_symcache_runtime_destroy(struct rspamd_task *task) +{ + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + cache_runtime->savepoint_dtor(); +}
\ No newline at end of file diff --git a/src/libserver/symcache/symcache_id_list.hxx b/src/libserver/symcache/symcache_id_list.hxx new file mode 100644 index 0000000..bef4fa9 --- /dev/null +++ b/src/libserver/symcache/symcache_id_list.hxx @@ -0,0 +1,95 @@ +/*- + * Copyright 2022 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef RSPAMD_SYMCACHE_ID_LIST_HXX +#define RSPAMD_SYMCACHE_ID_LIST_HXX +#pragma once + +#include <cstdint> +#include <cstring> // for memset +#include <algorithm>// for sort/bsearch + +#include "config.h" +#include "libutil/mem_pool.h" +#include "contrib/ankerl/svector.h" + +namespace rspamd::symcache { +/* + * This structure is optimised to store ids list: + * - If the first element is -1 then use dynamic part, else use static part + * There is no std::variant to save space + */ + +constexpr const auto id_capacity = 4; +constexpr const auto id_sort_threshold = 32; + +struct id_list { + ankerl::svector<std::uint32_t, id_capacity> data; + + id_list() = default; + + auto reset() + { + data.clear(); + } + + /** + * Returns ids from a compressed list, accepting a mutable reference for number of elements + * @param nids output of the number of elements + * @return + */ + auto get_ids(unsigned &nids) const -> const std::uint32_t * + { + nids = data.size(); + + return data.data(); + } + + auto add_id(std::uint32_t id) -> void + { + data.push_back(id); + + /* Check sort threshold */ + if (data.size() > id_sort_threshold) { + std::sort(data.begin(), data.end()); + } + } + + auto set_ids(const std::uint32_t *ids, std::size_t nids) -> void + { + data.resize(nids); + + for (auto &id: data) { + id = *ids++; + } + + if (data.size() > id_sort_threshold) { + std::sort(data.begin(), data.end()); + } + } + + auto check_id(unsigned int id) const -> bool + { + if (data.size() > id_sort_threshold) { + return std::binary_search(data.begin(), data.end(), id); + } + return std::find(data.begin(), data.end(), id) != data.end(); + } +}; + +}// namespace rspamd::symcache + +#endif//RSPAMD_SYMCACHE_ID_LIST_HXX diff --git a/src/libserver/symcache/symcache_impl.cxx b/src/libserver/symcache/symcache_impl.cxx new file mode 100644 index 0000000..93675ac --- /dev/null +++ b/src/libserver/symcache/symcache_impl.cxx @@ -0,0 +1,1316 @@ +/* + * Copyright 2023 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "lua/lua_common.h" +#include "symcache_internal.hxx" +#include "symcache_item.hxx" +#include "symcache_runtime.hxx" +#include "unix-std.h" +#include "libutil/cxx/file_util.hxx" +#include "libutil/cxx/util.hxx" +#include "fmt/core.h" +#include "contrib/t1ha/t1ha.h" + +#ifdef __has_include +#if __has_include(<version>) +#include <version> +#endif +#endif +#include <cmath> + +namespace rspamd::symcache { + +INIT_LOG_MODULE_PUBLIC(symcache) + +auto symcache::init() -> bool +{ + auto res = true; + reload_time = cfg->cache_reload_time; + + if (cfg->cache_filename != nullptr) { + msg_debug_cache("loading symcache saved data from %s", cfg->cache_filename); + load_items(); + } + + ankerl::unordered_dense::set<int> disabled_ids; + /* Process enabled/disabled symbols */ + for (const auto &[id, it]: items_by_id) { + if (disabled_symbols) { + /* + * Due to the ability to add patterns, this is now O(N^2), but it is done + * once on configuration and the amount of static patterns is usually low + * The possible optimization is to store non patterns in a different set to check it + * quickly. However, it is unlikely that this would be used to something really heavy. + */ + for (const auto &disable_pat: *disabled_symbols) { + if (disable_pat.matches(it->get_name())) { + msg_debug_cache("symbol %s matches %*s disable pattern", it->get_name().c_str(), + (int) disable_pat.to_string_view().size(), disable_pat.to_string_view().data()); + auto need_disable = true; + + if (enabled_symbols) { + for (const auto &enable_pat: *enabled_symbols) { + if (enable_pat.matches(it->get_name())) { + msg_debug_cache("symbol %s matches %*s enable pattern; skip disabling", it->get_name().c_str(), + (int) enable_pat.to_string_view().size(), enable_pat.to_string_view().data()); + need_disable = false; + break; + } + } + } + + if (need_disable) { + disabled_ids.insert(it->id); + + if (it->is_virtual()) { + auto real_elt = it->get_parent(*this); + + if (real_elt) { + disabled_ids.insert(real_elt->id); + + const auto *children = real_elt->get_children(); + if (children != nullptr) { + for (const auto &cld: *children) { + msg_debug_cache("symbol %s is a virtual sibling of the disabled symbol %s", + cld->get_name().c_str(), it->get_name().c_str()); + disabled_ids.insert(cld->id); + } + } + } + } + else { + /* Also disable all virtual children of this element */ + const auto *children = it->get_children(); + + if (children != nullptr) { + for (const auto &cld: *children) { + msg_debug_cache("symbol %s is a virtual child of the disabled symbol %s", + cld->get_name().c_str(), it->get_name().c_str()); + disabled_ids.insert(cld->id); + } + } + } + } + } + } + } + } + + /* Deal with the delayed dependencies */ + msg_debug_cache("resolving delayed dependencies: %d in list", (int) delayed_deps->size()); + for (const auto &delayed_dep: *delayed_deps) { + auto virt_item = get_item_by_name(delayed_dep.from, false); + auto real_item = get_item_by_name(delayed_dep.from, true); + + if (virt_item == nullptr || real_item == nullptr) { + msg_err_cache("cannot register delayed dependency between %s and %s: " + "%s is missing", + delayed_dep.from.data(), + delayed_dep.to.data(), delayed_dep.from.data()); + } + else { + + if (!disabled_ids.contains(real_item->id)) { + msg_debug_cache("delayed between %s(%d:%d) -> %s", + delayed_dep.from.data(), + real_item->id, virt_item->id, + delayed_dep.to.data()); + add_dependency(real_item->id, delayed_dep.to, + virt_item != real_item ? virt_item->id : -1); + } + else { + msg_debug_cache("no delayed between %s(%d:%d) -> %s; %s is disabled", + delayed_dep.from.data(), + real_item->id, virt_item->id, + delayed_dep.to.data(), + delayed_dep.from.data()); + } + } + } + + /* Remove delayed dependencies, as they are no longer needed at this point */ + delayed_deps.reset(); + + /* Physically remove ids that are disabled statically */ + for (auto id_to_disable: disabled_ids) { + /* + * This erasure is inefficient, we can swap the last element with the removed id + * But in this way, our ids are still sorted by addition + */ + + /* Preserve refcount here */ + auto deleted_element_refcount = items_by_id[id_to_disable]; + items_by_id.erase(id_to_disable); + items_by_symbol.erase(deleted_element_refcount->get_name()); + + auto &additional_vec = get_item_specific_vector(*deleted_element_refcount); +#if defined(__cpp_lib_erase_if) + std::erase_if(additional_vec, [id_to_disable](cache_item *elt) { + return elt->id == id_to_disable; + }); +#else + auto it = std::remove_if(additional_vec.begin(), + additional_vec.end(), [id_to_disable](cache_item *elt) { + return elt->id == id_to_disable; + }); + additional_vec.erase(it, additional_vec.end()); +#endif + + /* Refcount is dropped, so the symbol should be freed, ensure that nothing else owns this symbol */ + g_assert(deleted_element_refcount.use_count() == 1); + } + + /* Remove no longer used stuff */ + enabled_symbols.reset(); + disabled_symbols.reset(); + + /* Deal with the delayed conditions */ + msg_debug_cache("resolving delayed conditions: %d in list", (int) delayed_conditions->size()); + for (const auto &delayed_cond: *delayed_conditions) { + auto it = get_item_by_name_mut(delayed_cond.sym, true); + + if (it == nullptr) { + msg_err_cache( + "cannot register delayed condition for %s", + delayed_cond.sym.c_str()); + luaL_unref(delayed_cond.L, LUA_REGISTRYINDEX, delayed_cond.cbref); + } + else { + if (!it->add_condition(delayed_cond.L, delayed_cond.cbref)) { + msg_err_cache( + "cannot register delayed condition for %s: virtual parent; qed", + delayed_cond.sym.c_str()); + g_abort(); + } + + msg_debug_cache("added a condition to the symbol %s", it->symbol.c_str()); + } + } + delayed_conditions.reset(); + + msg_debug_cache("process dependencies"); + for (const auto &[_id, it]: items_by_id) { + it->process_deps(*this); + } + + /* Sorting stuff */ + constexpr auto postfilters_cmp = [](const auto &it1, const auto &it2) -> bool { + return it1->priority < it2->priority; + }; + constexpr auto prefilters_cmp = [](const auto &it1, const auto &it2) -> bool { + return it1->priority > it2->priority; + }; + + msg_debug_cache("sorting stuff"); + std::stable_sort(std::begin(connfilters), std::end(connfilters), prefilters_cmp); + std::stable_sort(std::begin(prefilters), std::end(prefilters), prefilters_cmp); + std::stable_sort(std::begin(postfilters), std::end(postfilters), postfilters_cmp); + std::stable_sort(std::begin(idempotent), std::end(idempotent), postfilters_cmp); + + resort(); + + /* Connect metric symbols with symcache symbols */ + if (cfg->symbols) { + msg_debug_cache("connect metrics"); + g_hash_table_foreach(cfg->symbols, + symcache::metric_connect_cb, + (void *) this); + } + + return res; +} + +auto symcache::load_items() -> bool +{ + auto cached_map = util::raii_mmaped_file::mmap_shared(cfg->cache_filename, + O_RDONLY, PROT_READ); + + if (!cached_map.has_value()) { + if (cached_map.error().category == util::error_category::CRITICAL) { + msg_err_cache("%s", cached_map.error().error_message.data()); + } + else { + msg_info_cache("%s", cached_map.error().error_message.data()); + } + return false; + } + + + if (cached_map->get_size() < (gint) sizeof(symcache_header)) { + msg_info_cache("cannot use file %s, truncated: %z", cfg->cache_filename, + errno, strerror(errno)); + return false; + } + + const auto *hdr = (struct symcache_header *) cached_map->get_map(); + + if (memcmp(hdr->magic, symcache_magic, + sizeof(symcache_magic)) != 0) { + msg_info_cache("cannot use file %s, bad magic", cfg->cache_filename); + + return false; + } + + auto *parser = ucl_parser_new(0); + const auto *p = (const std::uint8_t *) (hdr + 1); + + if (!ucl_parser_add_chunk(parser, p, cached_map->get_size() - sizeof(*hdr))) { + msg_info_cache("cannot use file %s, cannot parse: %s", cfg->cache_filename, + ucl_parser_get_error(parser)); + ucl_parser_free(parser); + + return false; + } + + auto *top = ucl_parser_get_object(parser); + ucl_parser_free(parser); + + if (top == nullptr || ucl_object_type(top) != UCL_OBJECT) { + msg_info_cache("cannot use file %s, bad object", cfg->cache_filename); + ucl_object_unref(top); + + return false; + } + + auto it = ucl_object_iterate_new(top); + const ucl_object_t *cur; + while ((cur = ucl_object_iterate_safe(it, true)) != nullptr) { + auto item_it = items_by_symbol.find(ucl_object_key(cur)); + + if (item_it != items_by_symbol.end()) { + auto item = item_it->second; + /* Copy saved info */ + /* + * XXX: don't save or load weight, it should be obtained from the + * metric + */ +#if 0 + elt = ucl_object_lookup (cur, "weight"); + + if (elt) { + w = ucl_object_todouble (elt); + if (w != 0) { + item->weight = w; + } + } +#endif + const auto *elt = ucl_object_lookup(cur, "time"); + if (elt) { + item->st->avg_time = ucl_object_todouble(elt); + } + + elt = ucl_object_lookup(cur, "count"); + if (elt) { + item->st->total_hits = ucl_object_toint(elt); + item->last_count = item->st->total_hits; + } + + elt = ucl_object_lookup(cur, "frequency"); + if (elt && ucl_object_type(elt) == UCL_OBJECT) { + const ucl_object_t *freq_elt; + + freq_elt = ucl_object_lookup(elt, "avg"); + + if (freq_elt) { + item->st->avg_frequency = ucl_object_todouble(freq_elt); + } + freq_elt = ucl_object_lookup(elt, "stddev"); + + if (freq_elt) { + item->st->stddev_frequency = ucl_object_todouble(freq_elt); + } + } + + if (item->is_virtual() && !item->is_ghost()) { + const auto &parent = item->get_parent(*this); + + if (parent) { + if (parent->st->weight < item->st->weight) { + parent->st->weight = item->st->weight; + } + } + /* + * We maintain avg_time for virtual symbols equal to the + * parent item avg_time + */ + item->st->avg_time = parent->st->avg_time; + } + + total_weight += fabs(item->st->weight); + total_hits += item->st->total_hits; + } + } + + ucl_object_iterate_free(it); + ucl_object_unref(top); + + return true; +} + +template<typename T> +static constexpr auto round_to_hundreds(T x) +{ + return (::floor(x) * 100.0) / 100.0; +} + +bool symcache::save_items() const +{ + if (cfg->cache_filename == nullptr) { + return false; + } + + auto file_sink = util::raii_file_sink::create(cfg->cache_filename, + O_WRONLY | O_TRUNC, 00644); + + if (!file_sink.has_value()) { + if (errno == EEXIST) { + /* Some other process is already writing data, give up silently */ + return false; + } + + msg_err_cache("%s", file_sink.error().error_message.data()); + + return false; + } + + struct symcache_header hdr; + memset(&hdr, 0, sizeof(hdr)); + memcpy(hdr.magic, symcache_magic, sizeof(symcache_magic)); + + if (write(file_sink->get_fd(), &hdr, sizeof(hdr)) == -1) { + msg_err_cache("cannot write to file %s, error %d, %s", cfg->cache_filename, + errno, strerror(errno)); + + return false; + } + + auto *top = ucl_object_typed_new(UCL_OBJECT); + + for (const auto &it: items_by_symbol) { + auto item = it.second; + auto elt = ucl_object_typed_new(UCL_OBJECT); + ucl_object_insert_key(elt, + ucl_object_fromdouble(round_to_hundreds(item->st->weight)), + "weight", 0, false); + ucl_object_insert_key(elt, + ucl_object_fromdouble(round_to_hundreds(item->st->time_counter.mean)), + "time", 0, false); + ucl_object_insert_key(elt, ucl_object_fromint(item->st->total_hits), + "count", 0, false); + + auto *freq = ucl_object_typed_new(UCL_OBJECT); + ucl_object_insert_key(freq, + ucl_object_fromdouble(round_to_hundreds(item->st->frequency_counter.mean)), + "avg", 0, false); + ucl_object_insert_key(freq, + ucl_object_fromdouble(round_to_hundreds(item->st->frequency_counter.stddev)), + "stddev", 0, false); + ucl_object_insert_key(elt, freq, "frequency", 0, false); + + ucl_object_insert_key(top, elt, it.first.data(), 0, true); + } + + auto fp = fdopen(file_sink->get_fd(), "a"); + auto *efunc = ucl_object_emit_file_funcs(fp); + auto ret = ucl_object_emit_full(top, UCL_EMIT_JSON_COMPACT, efunc, nullptr); + ucl_object_emit_funcs_free(efunc); + ucl_object_unref(top); + fclose(fp); + + return ret; +} + +auto symcache::metric_connect_cb(void *k, void *v, void *ud) -> void +{ + auto *cache = (symcache *) ud; + const auto *sym = (const char *) k; + auto *s = (struct rspamd_symbol *) v; + auto weight = *s->weight_ptr; + auto *item = cache->get_item_by_name_mut(sym, false); + + if (item) { + item->st->weight = weight; + s->cache_item = (void *) item; + } +} + + +auto symcache::get_item_by_id(int id, bool resolve_parent) const -> const cache_item * +{ + if (id < 0 || id >= items_by_id.size()) { + msg_err_cache("internal error: requested item with id %d, when we have just %d items in the cache", + id, (int) items_by_id.size()); + return nullptr; + } + + const auto &maybe_item = rspamd::find_map(items_by_id, id); + + if (!maybe_item.has_value()) { + msg_err_cache("internal error: requested item with id %d but it is empty; qed", + id); + return nullptr; + } + + const auto &item = maybe_item.value().get(); + + if (resolve_parent && item->is_virtual()) { + return item->get_parent(*this); + } + + return item.get(); +} + +auto symcache::get_item_by_id_mut(int id, bool resolve_parent) const -> cache_item * +{ + if (id < 0 || id >= items_by_id.size()) { + msg_err_cache("internal error: requested item with id %d, when we have just %d items in the cache", + id, (int) items_by_id.size()); + return nullptr; + } + + const auto &maybe_item = rspamd::find_map(items_by_id, id); + + if (!maybe_item.has_value()) { + msg_err_cache("internal error: requested item with id %d but it is empty; qed", + id); + return nullptr; + } + + const auto &item = maybe_item.value().get(); + + if (resolve_parent && item->is_virtual()) { + return const_cast<cache_item *>(item->get_parent(*this)); + } + + return item.get(); +} + +auto symcache::get_item_by_name(std::string_view name, bool resolve_parent) const -> const cache_item * +{ + auto it = items_by_symbol.find(name); + + if (it == items_by_symbol.end()) { + return nullptr; + } + + if (resolve_parent && it->second->is_virtual()) { + it->second->resolve_parent(*this); + return it->second->get_parent(*this); + } + + return it->second; +} + +auto symcache::get_item_by_name_mut(std::string_view name, bool resolve_parent) const -> cache_item * +{ + auto it = items_by_symbol.find(name); + + if (it == items_by_symbol.end()) { + return nullptr; + } + + if (resolve_parent && it->second->is_virtual()) { + return (cache_item *) it->second->get_parent(*this); + } + + return it->second; +} + +auto symcache::add_dependency(int id_from, std::string_view to, int virtual_id_from) -> void +{ + g_assert(id_from >= 0 && id_from < (gint) items_by_id.size()); + const auto &source = items_by_id[id_from]; + g_assert(source.get() != nullptr); + + source->deps.emplace_back(nullptr, + std::string(to), + id_from, + -1); + + + if (virtual_id_from >= 0) { + g_assert(virtual_id_from < (gint) items_by_id.size()); + /* We need that for settings id propagation */ + const auto &vsource = items_by_id[virtual_id_from]; + g_assert(vsource.get() != nullptr); + vsource->deps.emplace_back(nullptr, + std::string(to), + -1, + virtual_id_from); + } +} + +auto symcache::resort() -> void +{ + auto log_func = RSPAMD_LOG_FUNC; + auto ord = std::make_shared<order_generation>(filters.size() + + prefilters.size() + + composites.size() + + postfilters.size() + + idempotent.size() + + connfilters.size() + + classifiers.size(), + cur_order_gen); + + for (auto &it: filters) { + if (it) { + total_hits += it->st->total_hits; + /* Unmask topological order */ + it->order = 0; + ord->d.emplace_back(it->getptr()); + } + } + + enum class tsort_mask { + PERM, + TEMP + }; + + constexpr auto tsort_unmask = [](cache_item *it) -> auto { + return (it->order & ~((1u << 31) | (1u << 30))); + }; + + /* Recursive topological sort helper */ + const auto tsort_visit = [&](cache_item *it, unsigned cur_order, auto &&rec) { + constexpr auto tsort_mark = [](cache_item *it, tsort_mask how) { + switch (how) { + case tsort_mask::PERM: + it->order |= (1u << 31); + break; + case tsort_mask::TEMP: + it->order |= (1u << 30); + break; + } + }; + constexpr auto tsort_is_marked = [](cache_item *it, tsort_mask how) { + switch (how) { + case tsort_mask::PERM: + return (it->order & (1u << 31)); + case tsort_mask::TEMP: + return (it->order & (1u << 30)); + } + + return 100500u; /* Because fuck compilers, that's why */ + }; + + if (tsort_is_marked(it, tsort_mask::PERM)) { + if (cur_order > tsort_unmask(it)) { + /* Need to recalculate the whole chain */ + it->order = cur_order; /* That also removes all masking */ + } + else { + /* We are fine, stop DFS */ + return; + } + } + else if (tsort_is_marked(it, tsort_mask::TEMP)) { + msg_err_cache_lambda("cyclic dependencies found when checking '%s'!", + it->symbol.c_str()); + return; + } + + tsort_mark(it, tsort_mask::TEMP); + msg_debug_cache_lambda("visiting node: %s (%d)", it->symbol.c_str(), cur_order); + + for (const auto &dep: it->deps) { + msg_debug_cache_lambda("visiting dep: %s (%d)", dep.item->symbol.c_str(), cur_order + 1); + rec(dep.item, cur_order + 1, rec); + } + + it->order = cur_order; + tsort_mark(it, tsort_mask::PERM); + }; + /* + * Topological sort + */ + total_hits = 0; + auto used_items = ord->d.size(); + + for (const auto &it: ord->d) { + if (it->order == 0) { + tsort_visit(it.get(), 0, tsort_visit); + } + } + + + /* Main sorting comparator */ + constexpr auto score_functor = [](auto w, auto f, auto t) -> auto { + auto time_alpha = 1.0, weight_alpha = 0.1, freq_alpha = 0.01; + + return ((w > 0.0 ? w : weight_alpha) * (f > 0.0 ? f : freq_alpha) / + (t > time_alpha ? t : time_alpha)); + }; + + auto cache_order_cmp = [&](const auto &it1, const auto &it2) -> auto { + constexpr const auto topology_mult = 1e7, + priority_mult = 1e6, + augmentations1_mult = 1e5; + auto w1 = tsort_unmask(it1.get()) * topology_mult, + w2 = tsort_unmask(it2.get()) * topology_mult; + + w1 += it1->priority * priority_mult; + w2 += it2->priority * priority_mult; + w1 += it1->get_augmentation_weight() * augmentations1_mult; + w2 += it2->get_augmentation_weight() * augmentations1_mult; + + auto avg_freq = ((double) total_hits / used_items); + auto avg_weight = (total_weight / used_items); + auto f1 = (double) it1->st->total_hits / avg_freq; + auto f2 = (double) it2->st->total_hits / avg_freq; + auto weight1 = std::fabs(it1->st->weight) / avg_weight; + auto weight2 = std::fabs(it2->st->weight) / avg_weight; + auto t1 = it1->st->avg_time; + auto t2 = it2->st->avg_time; + w1 += score_functor(weight1, f1, t1); + w2 += score_functor(weight2, f2, t2); + + return w1 > w2; + }; + + std::stable_sort(std::begin(ord->d), std::end(ord->d), cache_order_cmp); + /* + * Here lives some ugly legacy! + * We have several filters classes, connfilters, prefilters, filters... etc + * + * Our order is meaningful merely for filters, but we have to add other classes + * to understand if those symbols are checked or disabled. + * We can disable symbols for almost everything but not for virtual symbols. + * The rule of thumb is that if a symbol has explicit parent, then it is a + * virtual symbol that follows it's special rules + */ + + /* + * We enrich ord with all other symbol types without any sorting, + * as it is done in another place + */ + constexpr auto append_items_vec = [](const auto &vec, auto &out) { + for (const auto &it: vec) { + if (it) { + out.emplace_back(it->getptr()); + } + } + }; + + append_items_vec(connfilters, ord->d); + append_items_vec(prefilters, ord->d); + append_items_vec(postfilters, ord->d); + append_items_vec(idempotent, ord->d); + append_items_vec(composites, ord->d); + append_items_vec(classifiers, ord->d); + + /* After sorting is done, we can assign all elements in the by_symbol hash */ + for (const auto [i, it]: rspamd::enumerate(ord->d)) { + ord->by_symbol.emplace(it->get_name(), i); + ord->by_cache_id[it->id] = i; + } + /* Finally set the current order */ + std::swap(ord, items_by_order); +} + +auto symcache::add_symbol_with_callback(std::string_view name, + int priority, + symbol_func_t func, + void *user_data, + int flags_and_type) -> int +{ + auto real_type_pair_maybe = item_type_from_c(flags_and_type); + + if (!real_type_pair_maybe.has_value()) { + msg_err_cache("incompatible flags when adding %s: %s", name.data(), + real_type_pair_maybe.error().c_str()); + return -1; + } + + auto real_type_pair = real_type_pair_maybe.value(); + + if (real_type_pair.first != symcache_item_type::FILTER) { + real_type_pair.second |= SYMBOL_TYPE_NOSTAT; + } + if (real_type_pair.second & (SYMBOL_TYPE_GHOST | SYMBOL_TYPE_CALLBACK)) { + real_type_pair.second |= SYMBOL_TYPE_NOSTAT; + } + + if (real_type_pair.first == symcache_item_type::VIRTUAL) { + msg_err_cache("trying to add virtual symbol %s as real (no parent)", name.data()); + return -1; + } + + std::string static_string_name; + + if (name.empty()) { + static_string_name = fmt::format("AUTO_{}_{}", (void *) func, user_data); + msg_warn_cache("trying to add an empty symbol name, convert it to %s", + static_string_name.c_str()); + } + else { + static_string_name = name; + } + + if (real_type_pair.first == symcache_item_type::IDEMPOTENT && priority != 0) { + msg_warn_cache("priority has been set for idempotent symbol %s: %d", + static_string_name.c_str(), priority); + } + + if ((real_type_pair.second & SYMBOL_TYPE_FINE) && priority == 0) { + /* Adjust priority for negative weighted symbols */ + priority = 1; + } + + if (items_by_symbol.contains(static_string_name)) { + msg_err_cache("duplicate symbol name: %s", static_string_name.data()); + return -1; + } + + auto id = items_by_id.size(); + + auto item = cache_item::create_with_function(static_pool, id, + std::move(static_string_name), + priority, func, user_data, + real_type_pair.first, real_type_pair.second); + + items_by_symbol.emplace(item->get_name(), item.get()); + get_item_specific_vector(*item).push_back(item.get()); + items_by_id.emplace(id, std::move(item));// Takes ownership + + if (!(real_type_pair.second & SYMBOL_TYPE_NOSTAT)) { + cksum = t1ha(name.data(), name.size(), cksum); + stats_symbols_count++; + } + + return id; +} + +auto symcache::add_virtual_symbol(std::string_view name, int parent_id, int flags_and_type) -> int +{ + if (name.empty()) { + msg_err_cache("cannot register a virtual symbol with no name; qed"); + return -1; + } + + auto real_type_pair_maybe = item_type_from_c(flags_and_type); + + if (!real_type_pair_maybe.has_value()) { + msg_err_cache("incompatible flags when adding %s: %s", name.data(), + real_type_pair_maybe.error().c_str()); + return -1; + } + + auto real_type_pair = real_type_pair_maybe.value(); + + if (items_by_symbol.contains(name)) { + msg_err_cache("duplicate symbol name: %s", name.data()); + return -1; + } + + if (items_by_id.size() < parent_id) { + msg_err_cache("parent id %d is out of bounds for virtual symbol %s", parent_id, name.data()); + return -1; + } + + auto id = items_by_id.size(); + + auto item = cache_item::create_with_virtual(static_pool, + id, + std::string{name}, + parent_id, real_type_pair.first, real_type_pair.second); + const auto &parent = items_by_id[parent_id].get(); + parent->add_child(item.get()); + items_by_symbol.emplace(item->get_name(), item.get()); + get_item_specific_vector(*item).push_back(item.get()); + items_by_id.emplace(id, std::move(item));// Takes ownership + + return id; +} + +auto symcache::set_peak_cb(int cbref) -> void +{ + if (peak_cb != -1) { + luaL_unref(L, LUA_REGISTRYINDEX, peak_cb); + } + + peak_cb = cbref; + msg_info_cache("registered peak callback"); +} + +auto symcache::add_delayed_condition(std::string_view sym, int cbref) -> void +{ + delayed_conditions->emplace_back(sym, cbref, (lua_State *) cfg->lua_state); +} + +auto symcache::validate(bool strict) -> bool +{ + total_weight = 1.0; + + for (auto &pair: items_by_symbol) { + auto &item = pair.second; + auto ghost = item->st->weight == 0 ? true : false; + auto skipped = !ghost; + + if (item->is_scoreable() && g_hash_table_lookup(cfg->symbols, item->symbol.c_str()) == nullptr) { + if (!std::isnan(cfg->unknown_weight)) { + item->st->weight = cfg->unknown_weight; + auto *s = rspamd_mempool_alloc0_type(static_pool, + struct rspamd_symbol); + /* Legit as we actually never modify this data */ + s->name = (char *) item->symbol.c_str(); + s->weight_ptr = &item->st->weight; + g_hash_table_insert(cfg->symbols, (void *) s->name, (void *) s); + + msg_info_cache("adding unknown symbol %s with weight: %.2f", + item->symbol.c_str(), cfg->unknown_weight); + ghost = false; + skipped = false; + } + else { + skipped = true; + } + } + else { + skipped = false; + } + + if (!ghost && skipped) { + if (!(item->flags & SYMBOL_TYPE_SKIPPED)) { + item->flags |= SYMBOL_TYPE_SKIPPED; + msg_warn_cache("symbol %s has no score registered, skip its check", + item->symbol.c_str()); + } + } + + if (ghost) { + msg_debug_cache("symbol %s is registered as ghost symbol, it won't be inserted " + "to any metric", + item->symbol.c_str()); + } + + if (item->st->weight < 0 && item->priority == 0) { + item->priority++; + } + + if (item->is_virtual()) { + if (!(item->flags & SYMBOL_TYPE_GHOST)) { + auto *parent = const_cast<cache_item *>(item->get_parent(*this)); + + if (parent == nullptr) { + item->resolve_parent(*this); + parent = const_cast<cache_item *>(item->get_parent(*this)); + } + + if (::fabs(parent->st->weight) < ::fabs(item->st->weight)) { + parent->st->weight = item->st->weight; + } + + auto p1 = ::abs(item->priority); + auto p2 = ::abs(parent->priority); + + if (p1 != p2) { + parent->priority = MAX(p1, p2); + item->priority = parent->priority; + } + } + } + + total_weight += fabs(item->st->weight); + } + + /* Now check each metric item and find corresponding symbol in a cache */ + auto ret = true; + GHashTableIter it; + void *k, *v; + g_hash_table_iter_init(&it, cfg->symbols); + + while (g_hash_table_iter_next(&it, &k, &v)) { + auto ignore_symbol = false; + auto sym_def = (struct rspamd_symbol *) v; + + if (sym_def && (sym_def->flags & + (RSPAMD_SYMBOL_FLAG_IGNORE_METRIC | RSPAMD_SYMBOL_FLAG_DISABLED))) { + ignore_symbol = true; + } + + if (!ignore_symbol) { + if (!items_by_symbol.contains((const char *) k)) { + msg_debug_cache( + "symbol '%s' has its score defined but there is no " + "corresponding rule registered", + k); + } + } + else if (sym_def->flags & RSPAMD_SYMBOL_FLAG_DISABLED) { + auto item = get_item_by_name_mut((const char *) k, false); + + if (item) { + item->enabled = FALSE; + } + } + } + + return ret; +} + +auto symcache::counters() const -> ucl_object_t * +{ + auto *top = ucl_object_typed_new(UCL_ARRAY); + constexpr const auto round_float = [](const auto x, const int digits) -> auto { + const auto power10 = ::pow(10, digits); + return (::floor(x * power10) / power10); + }; + + for (auto &pair: items_by_symbol) { + auto &item = pair.second; + auto symbol = pair.first; + + auto *obj = ucl_object_typed_new(UCL_OBJECT); + ucl_object_insert_key(obj, ucl_object_fromlstring(symbol.data(), symbol.size()), + "symbol", 0, false); + + if (item->is_virtual()) { + if (!(item->flags & SYMBOL_TYPE_GHOST)) { + const auto *parent = item->get_parent(*this); + ucl_object_insert_key(obj, + ucl_object_fromdouble(round_float(item->st->weight, 3)), + "weight", 0, false); + ucl_object_insert_key(obj, + ucl_object_fromdouble(round_float(parent->st->avg_frequency, 3)), + "frequency", 0, false); + ucl_object_insert_key(obj, + ucl_object_fromint(parent->st->total_hits), + "hits", 0, false); + ucl_object_insert_key(obj, + ucl_object_fromdouble(round_float(parent->st->avg_time, 3)), + "time", 0, false); + } + else { + ucl_object_insert_key(obj, + ucl_object_fromdouble(round_float(item->st->weight, 3)), + "weight", 0, false); + ucl_object_insert_key(obj, + ucl_object_fromdouble(0.0), + "frequency", 0, false); + ucl_object_insert_key(obj, + ucl_object_fromdouble(0.0), + "hits", 0, false); + ucl_object_insert_key(obj, + ucl_object_fromdouble(0.0), + "time", 0, false); + } + } + else { + ucl_object_insert_key(obj, + ucl_object_fromdouble(round_float(item->st->weight, 3)), + "weight", 0, false); + ucl_object_insert_key(obj, + ucl_object_fromdouble(round_float(item->st->avg_frequency, 3)), + "frequency", 0, false); + ucl_object_insert_key(obj, + ucl_object_fromint(item->st->total_hits), + "hits", 0, false); + ucl_object_insert_key(obj, + ucl_object_fromdouble(round_float(item->st->avg_time, 3)), + "time", 0, false); + } + + ucl_array_append(top, obj); + } + + return top; +} + +auto symcache::periodic_resort(struct ev_loop *ev_loop, double cur_time, double last_resort) -> void +{ + for (const auto &item: filters) { + + if (item->update_counters_check_peak(L, ev_loop, cur_time, last_resort)) { + auto cur_value = (item->st->total_hits - item->last_count) / + (cur_time - last_resort); + auto cur_err = (item->st->avg_frequency - cur_value); + cur_err *= cur_err; + msg_debug_cache("peak found for %s is %.2f, avg: %.2f, " + "stddev: %.2f, error: %.2f, peaks: %d", + item->symbol.c_str(), cur_value, + item->st->avg_frequency, + item->st->stddev_frequency, + cur_err, + item->frequency_peaks); + + if (peak_cb != -1) { + struct ev_loop **pbase; + + lua_rawgeti(L, LUA_REGISTRYINDEX, peak_cb); + pbase = (struct ev_loop **) lua_newuserdata(L, sizeof(*pbase)); + *pbase = ev_loop; + rspamd_lua_setclass(L, "rspamd{ev_base}", -1); + lua_pushlstring(L, item->symbol.c_str(), item->symbol.size()); + lua_pushnumber(L, item->st->avg_frequency); + lua_pushnumber(L, ::sqrt(item->st->stddev_frequency)); + lua_pushnumber(L, cur_value); + lua_pushnumber(L, cur_err); + + if (lua_pcall(L, 6, 0, 0) != 0) { + msg_info_cache("call to peak function for %s failed: %s", + item->symbol.c_str(), lua_tostring(L, -1)); + lua_pop(L, 1); + } + } + } + } +} + +symcache::~symcache() +{ + if (peak_cb != -1) { + luaL_unref(L, LUA_REGISTRYINDEX, peak_cb); + } +} + +auto symcache::maybe_resort() -> bool +{ + if (items_by_order->generation_id != cur_order_gen) { + /* + * Cache has been modified, need to resort it + */ + msg_info_cache("symbols cache has been modified since last check:" + " old id: %ud, new id: %ud", + items_by_order->generation_id, cur_order_gen); + resort(); + + return true; + } + + return false; +} + +auto symcache::get_item_specific_vector(const cache_item &it) -> symcache::items_ptr_vec & +{ + switch (it.get_type()) { + case symcache_item_type::CONNFILTER: + return connfilters; + case symcache_item_type::FILTER: + return filters; + case symcache_item_type::IDEMPOTENT: + return idempotent; + case symcache_item_type::PREFILTER: + return prefilters; + case symcache_item_type::POSTFILTER: + return postfilters; + case symcache_item_type::COMPOSITE: + return composites; + case symcache_item_type::CLASSIFIER: + return classifiers; + case symcache_item_type::VIRTUAL: + return virtual_symbols; + } + + RSPAMD_UNREACHABLE; +} + +auto symcache::process_settings_elt(struct rspamd_config_settings_elt *elt) -> void +{ + + auto id = elt->id; + + if (elt->symbols_disabled) { + /* Process denied symbols */ + ucl_object_iter_t iter = nullptr; + const ucl_object_t *cur; + + while ((cur = ucl_object_iterate(elt->symbols_disabled, &iter, true)) != NULL) { + const auto *sym = ucl_object_key(cur); + auto *item = get_item_by_name_mut(sym, false); + + if (item != nullptr) { + if (item->is_virtual()) { + /* + * Virtual symbols are special: + * we ignore them in symcache but prevent them from being + * inserted. + */ + item->forbidden_ids.add_id(id); + msg_debug_cache("deny virtual symbol %s for settings %ud (%s); " + "parent can still be executed", + sym, id, elt->name); + } + else { + /* Normal symbol, disable it */ + item->forbidden_ids.add_id(id); + msg_debug_cache("deny symbol %s for settings %ud (%s)", + sym, id, elt->name); + } + } + else { + msg_warn_cache("cannot find a symbol to disable %s " + "when processing settings %ud (%s)", + sym, id, elt->name); + } + } + } + + if (elt->symbols_enabled) { + ucl_object_iter_t iter = nullptr; + const ucl_object_t *cur; + + while ((cur = ucl_object_iterate(elt->symbols_enabled, &iter, true)) != nullptr) { + /* Here, we resolve parent and explicitly allow it */ + const auto *sym = ucl_object_key(cur); + + auto *item = get_item_by_name_mut(sym, false); + + if (item != nullptr) { + if (item->is_virtual()) { + auto *parent = get_item_by_name_mut(sym, true); + + if (parent) { + if (elt->symbols_disabled && + ucl_object_lookup(elt->symbols_disabled, parent->symbol.data())) { + msg_err_cache("conflict in %s: cannot enable disabled symbol %s, " + "wanted to enable symbol %s", + elt->name, parent->symbol.data(), sym); + continue; + } + + parent->exec_only_ids.add_id(id); + msg_debug_cache("allow just execution of symbol %s for settings %ud (%s)", + parent->symbol.data(), id, elt->name); + } + } + + item->allowed_ids.add_id(id); + msg_debug_cache("allow execution of symbol %s for settings %ud (%s)", + sym, id, elt->name); + } + else { + msg_warn_cache("cannot find a symbol to enable %s " + "when processing settings %ud (%s)", + sym, id, elt->name); + } + } + } +} + +auto symcache::get_max_timeout(std::vector<std::pair<double, const cache_item *>> &elts) const -> double +{ + auto accumulated_timeout = 0.0; + auto log_func = RSPAMD_LOG_FUNC; + ankerl::unordered_dense::set<const cache_item *> seen_items; + + auto get_item_timeout = [](cache_item *it) { + return it->get_numeric_augmentation("timeout").value_or(0.0); + }; + + /* This function returns the timeout for an item and all it's dependencies */ + auto get_filter_timeout = [&](cache_item *it, auto self) -> double { + auto own_timeout = get_item_timeout(it); + auto max_child_timeout = 0.0; + + for (const auto &dep: it->deps) { + auto cld_timeout = self(dep.item, self); + + if (cld_timeout > max_child_timeout) { + max_child_timeout = cld_timeout; + } + } + + return own_timeout + max_child_timeout; + }; + + /* For prefilters and postfilters, we just care about priorities */ + auto pre_postfilter_iter = [&](const items_ptr_vec &vec) -> double { + auto saved_priority = -1; + auto max_timeout = 0.0, added_timeout = 0.0; + const cache_item *max_elt = nullptr; + for (const auto &it: vec) { + if (it->priority != saved_priority && max_elt != nullptr && max_timeout > 0) { + if (!seen_items.contains(max_elt)) { + accumulated_timeout += max_timeout; + added_timeout += max_timeout; + + msg_debug_cache_lambda("added %.2f to the timeout (%.2f) as the priority has changed (%d -> %d); " + "symbol: %s", + max_timeout, accumulated_timeout, saved_priority, it->priority, + max_elt->symbol.c_str()); + elts.emplace_back(max_timeout, max_elt); + seen_items.insert(max_elt); + } + max_timeout = 0; + saved_priority = it->priority; + max_elt = nullptr; + } + + auto timeout = get_item_timeout(it); + + if (timeout > max_timeout) { + max_timeout = timeout; + max_elt = it; + } + } + + if (max_elt != nullptr && max_timeout > 0) { + if (!seen_items.contains(max_elt)) { + accumulated_timeout += max_timeout; + added_timeout += max_timeout; + + msg_debug_cache_lambda("added %.2f to the timeout (%.2f) end of processing; " + "symbol: %s", + max_timeout, accumulated_timeout, + max_elt->symbol.c_str()); + elts.emplace_back(max_timeout, max_elt); + seen_items.insert(max_elt); + } + } + + return added_timeout; + }; + + auto prefilters_timeout = pre_postfilter_iter(this->prefilters); + + /* For normal filters, we check the maximum chain of the dependencies + * This function might have O(N^2) complexity if all symbols are in a single + * dependencies chain. But it is not the case in practice + */ + double max_filters_timeout = 0; + for (const auto &it: this->filters) { + auto timeout = get_filter_timeout(it, get_filter_timeout); + + if (timeout > max_filters_timeout) { + max_filters_timeout = timeout; + if (!seen_items.contains(it)) { + elts.emplace_back(timeout, it); + seen_items.insert(it); + } + } + } + + accumulated_timeout += max_filters_timeout; + + auto postfilters_timeout = pre_postfilter_iter(this->postfilters); + auto idempotent_timeout = pre_postfilter_iter(this->idempotent); + + /* Sort in decreasing order by timeout */ + std::stable_sort(std::begin(elts), std::end(elts), + [](const auto &p1, const auto &p2) { + return p1.first > p2.first; + }); + + msg_debug_cache("overall cache timeout: %.2f, %.2f from prefilters," + " %.2f from postfilters, %.2f from idempotent filters," + " %.2f from normal filters", + accumulated_timeout, prefilters_timeout, postfilters_timeout, + idempotent_timeout, max_filters_timeout); + + return accumulated_timeout; +} + +}// namespace rspamd::symcache
\ No newline at end of file diff --git a/src/libserver/symcache/symcache_internal.hxx b/src/libserver/symcache/symcache_internal.hxx new file mode 100644 index 0000000..255a4b1 --- /dev/null +++ b/src/libserver/symcache/symcache_internal.hxx @@ -0,0 +1,652 @@ +/* + * Copyright 2023 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Internal C++ structures and classes for symcache + */ + +#ifndef RSPAMD_SYMCACHE_INTERNAL_HXX +#define RSPAMD_SYMCACHE_INTERNAL_HXX +#pragma once + +#include <cmath> +#include <cstdlib> +#include <cstdint> +#include <utility> +#include <vector> +#include <string> +#include <string_view> +#include <memory> +#include <variant> + +#include "rspamd_symcache.h" +#include "contrib/libev/ev.h" +#include "contrib/ankerl/unordered_dense.h" +#include "contrib/expected/expected.hpp" +#include "cfg_file.h" + +#include "symcache_id_list.hxx" + +#define msg_err_cache(...) rspamd_default_log_function(G_LOG_LEVEL_CRITICAL, \ + "symcache", log_tag(), \ + RSPAMD_LOG_FUNC, \ + __VA_ARGS__) +#define msg_err_cache_lambda(...) rspamd_default_log_function(G_LOG_LEVEL_CRITICAL, \ + "symcache", log_tag(), \ + log_func, \ + __VA_ARGS__) +#define msg_err_cache_task(...) rspamd_default_log_function(G_LOG_LEVEL_CRITICAL, \ + "symcache", task->task_pool->tag.uid, \ + RSPAMD_LOG_FUNC, \ + __VA_ARGS__) +#define msg_warn_cache(...) rspamd_default_log_function(G_LOG_LEVEL_WARNING, \ + "symcache", log_tag(), \ + RSPAMD_LOG_FUNC, \ + __VA_ARGS__) +#define msg_info_cache(...) rspamd_default_log_function(G_LOG_LEVEL_INFO, \ + "symcache", log_tag(), \ + RSPAMD_LOG_FUNC, \ + __VA_ARGS__) +#define msg_debug_cache(...) rspamd_conditional_debug_fast(NULL, NULL, \ + ::rspamd::symcache::rspamd_symcache_log_id, "symcache", log_tag(), \ + RSPAMD_LOG_FUNC, \ + __VA_ARGS__) +#define msg_debug_cache_lambda(...) rspamd_conditional_debug_fast(NULL, NULL, \ + ::rspamd::symcache::rspamd_symcache_log_id, "symcache", log_tag(), \ + log_func, \ + __VA_ARGS__) +#define msg_debug_cache_task(...) rspamd_conditional_debug_fast(NULL, NULL, \ + ::rspamd::symcache::rspamd_symcache_log_id, "symcache", task->task_pool->tag.uid, \ + RSPAMD_LOG_FUNC, \ + __VA_ARGS__) +#define msg_debug_cache_task_lambda(...) rspamd_conditional_debug_fast(NULL, NULL, \ + ::rspamd::symcache::rspamd_symcache_log_id, "symcache", task->task_pool->tag.uid, \ + log_func, \ + __VA_ARGS__) + +struct lua_State; + +namespace rspamd::symcache { + +/* Defined in symcache_impl.cxx */ +extern int rspamd_symcache_log_id; + +static const std::uint8_t symcache_magic[8] = {'r', 's', 'c', 2, 0, 0, 0, 0}; + +struct symcache_header { + std::uint8_t magic[8]; + unsigned int nitems; + std::uint8_t checksum[64]; + std::uint8_t unused[128]; +}; + +struct cache_item; +using cache_item_ptr = std::shared_ptr<cache_item>; + +/** + * This structure is intended to keep the current ordering for all symbols + * It is designed to be shared among all tasks and keep references to the real + * symbols. + * If some symbol has been added or removed to the symbol cache, it will not affect + * the current order, and it will only be regenerated for the subsequent tasks. + * This allows safe and no copy sharing and keeping track of all symbols in the + * cache runtime. + */ +struct order_generation { + /* All items ordered */ + std::vector<cache_item_ptr> d; + /* Mapping from symbol name to the position in the order array */ + ankerl::unordered_dense::map<std::string_view, unsigned int> by_symbol; + /* Mapping from symbol id to the position in the order array */ + ankerl::unordered_dense::map<unsigned int, unsigned int> by_cache_id; + /* It matches cache->generation_id; if not, a fresh ordering is required */ + unsigned int generation_id; + + explicit order_generation(std::size_t nelts, unsigned id) + : generation_id(id) + { + d.reserve(nelts); + by_symbol.reserve(nelts); + by_cache_id.reserve(nelts); + } + + auto size() const -> auto + { + return d.size(); + } +}; + +using order_generation_ptr = std::shared_ptr<order_generation>; + + +struct delayed_cache_dependency { + std::string from; + std::string to; + + delayed_cache_dependency(std::string_view _from, std::string_view _to) + : from(_from), to(_to) + { + } +}; + +struct delayed_cache_condition { + std::string sym; + int cbref; + lua_State *L; + +public: + delayed_cache_condition(std::string_view sym, int cbref, lua_State *L) + : sym(sym), cbref(cbref), L(L) + { + } +}; + +class delayed_symbol_elt { +private: + std::variant<std::string, rspamd_regexp_t *> content; + +public: + /* Disable copy */ + delayed_symbol_elt() = delete; + delayed_symbol_elt(const delayed_symbol_elt &) = delete; + delayed_symbol_elt &operator=(const delayed_symbol_elt &) = delete; + /* Enable move */ + delayed_symbol_elt(delayed_symbol_elt &&other) noexcept = default; + delayed_symbol_elt &operator=(delayed_symbol_elt &&other) noexcept = default; + + explicit delayed_symbol_elt(std::string_view elt) noexcept + { + if (!elt.empty() && elt[0] == '/') { + /* Possibly regexp */ + auto *re = rspamd_regexp_new_len(elt.data(), elt.size(), nullptr, nullptr); + + if (re != nullptr) { + std::get<rspamd_regexp_t *>(content) = re; + } + else { + std::get<std::string>(content) = elt; + } + } + else { + std::get<std::string>(content) = elt; + } + } + + ~delayed_symbol_elt() + { + if (std::holds_alternative<rspamd_regexp_t *>(content)) { + rspamd_regexp_unref(std::get<rspamd_regexp_t *>(content)); + } + } + + auto matches(std::string_view what) const -> bool + { + return std::visit([&](auto &elt) { + using T = typeof(elt); + if constexpr (std::is_same_v<T, rspamd_regexp_t *>) { + if (rspamd_regexp_match(elt, what.data(), what.size(), false)) { + return true; + } + } + else if constexpr (std::is_same_v<T, std::string>) { + return elt == what; + } + + return false; + }, + content); + } + + auto to_string_view() const -> std::string_view + { + return std::visit([&](auto &elt) { + using T = typeof(elt); + if constexpr (std::is_same_v<T, rspamd_regexp_t *>) { + return std::string_view{rspamd_regexp_get_pattern(elt)}; + } + else if constexpr (std::is_same_v<T, std::string>) { + return std::string_view{elt}; + } + + return std::string_view{}; + }, + content); + } +}; + +struct delayed_symbol_elt_equal { + using is_transparent = void; + auto operator()(const delayed_symbol_elt &a, const delayed_symbol_elt &b) const + { + return a.to_string_view() == b.to_string_view(); + } + auto operator()(const delayed_symbol_elt &a, const std::string_view &b) const + { + return a.to_string_view() == b; + } + auto operator()(const std::string_view &a, const delayed_symbol_elt &b) const + { + return a == b.to_string_view(); + } +}; + +struct delayed_symbol_elt_hash { + using is_transparent = void; + auto operator()(const delayed_symbol_elt &a) const + { + return ankerl::unordered_dense::hash<std::string_view>()(a.to_string_view()); + } + auto operator()(const std::string_view &a) const + { + return ankerl::unordered_dense::hash<std::string_view>()(a); + } +}; + +class symcache { +private: + using items_ptr_vec = std::vector<cache_item *>; + /* Map indexed by symbol name: all symbols must have unique names, so this map holds ownership */ + ankerl::unordered_dense::map<std::string_view, cache_item *> items_by_symbol; + ankerl::unordered_dense::map<int, cache_item_ptr> items_by_id; + + /* Items sorted into some order */ + order_generation_ptr items_by_order; + unsigned int cur_order_gen; + + /* Specific vectors for execution/iteration */ + items_ptr_vec connfilters; + items_ptr_vec prefilters; + items_ptr_vec filters; + items_ptr_vec postfilters; + items_ptr_vec composites; + items_ptr_vec idempotent; + items_ptr_vec classifiers; + items_ptr_vec virtual_symbols; + + /* These are stored within pointer to clean up after init */ + std::unique_ptr<std::vector<delayed_cache_dependency>> delayed_deps; + std::unique_ptr<std::vector<delayed_cache_condition>> delayed_conditions; + /* Delayed statically enabled or disabled symbols */ + using delayed_symbol_names = ankerl::unordered_dense::set<delayed_symbol_elt, + delayed_symbol_elt_hash, delayed_symbol_elt_equal>; + std::unique_ptr<delayed_symbol_names> disabled_symbols; + std::unique_ptr<delayed_symbol_names> enabled_symbols; + + rspamd_mempool_t *static_pool; + std::uint64_t cksum; + double total_weight; + std::size_t stats_symbols_count; + +private: + std::uint64_t total_hits; + + struct rspamd_config *cfg; + lua_State *L; + double reload_time; + double last_profile; + +private: + int peak_cb; + int cache_id; + +private: + /* Internal methods */ + auto load_items() -> bool; + auto resort() -> void; + auto get_item_specific_vector(const cache_item &) -> items_ptr_vec &; + /* Helper for g_hash_table_foreach */ + static auto metric_connect_cb(void *k, void *v, void *ud) -> void; + +public: + explicit symcache(struct rspamd_config *cfg) + : cfg(cfg) + { + /* XXX: do we need a special pool for symcache? I don't think so */ + static_pool = cfg->cfg_pool; + reload_time = cfg->cache_reload_time; + total_hits = 1; + total_weight = 1.0; + cksum = 0xdeadbabe; + peak_cb = -1; + cache_id = rspamd_random_uint64_fast(); + L = (lua_State *) cfg->lua_state; + delayed_conditions = std::make_unique<std::vector<delayed_cache_condition>>(); + delayed_deps = std::make_unique<std::vector<delayed_cache_dependency>>(); + } + + virtual ~symcache(); + + /** + * Saves items on disk (if possible) + * @return + */ + auto save_items() const -> bool; + + /** + * Get an item by ID + * @param id + * @param resolve_parent + * @return + */ + auto get_item_by_id(int id, bool resolve_parent) const -> const cache_item *; + auto get_item_by_id_mut(int id, bool resolve_parent) const -> cache_item *; + /** + * Get an item by it's name + * @param name + * @param resolve_parent + * @return + */ + auto get_item_by_name(std::string_view name, bool resolve_parent) const -> const cache_item *; + /** + * Get an item by it's name, mutable pointer + * @param name + * @param resolve_parent + * @return + */ + auto get_item_by_name_mut(std::string_view name, bool resolve_parent) const -> cache_item *; + + /** + * Add a direct dependency + * @param id_from + * @param to + * @param virtual_id_from + * @return + */ + auto add_dependency(int id_from, std::string_view to, int virtual_id_from) -> void; + + /** + * Add a delayed dependency between symbols that will be resolved on the init stage + * @param from + * @param to + */ + auto add_delayed_dependency(std::string_view from, std::string_view to) -> void + { + if (!delayed_deps) { + delayed_deps = std::make_unique<std::vector<delayed_cache_dependency>>(); + } + + delayed_deps->emplace_back(from, to); + } + + /** + * Adds a symbol to the list of the disabled symbols + * @param sym + * @return + */ + auto disable_symbol_delayed(std::string_view sym) -> bool + { + if (!disabled_symbols) { + disabled_symbols = std::make_unique<delayed_symbol_names>(); + } + + if (!disabled_symbols->contains(sym)) { + disabled_symbols->emplace(sym); + + return true; + } + + return false; + } + + /** + * Adds a symbol to the list of the enabled symbols + * @param sym + * @return + */ + auto enable_symbol_delayed(std::string_view sym) -> bool + { + if (!enabled_symbols) { + enabled_symbols = std::make_unique<delayed_symbol_names>(); + } + + if (!enabled_symbols->contains(sym)) { + enabled_symbols->emplace(sym); + + return true; + } + + return false; + } + + /** + * Initialises the symbols cache, must be called after all symbols are added + * and the config file is loaded + */ + auto init() -> bool; + + /** + * Log helper that returns cfg checksum + * @return + */ + auto log_tag() const -> const char * + { + return cfg->checksum; + } + + /** + * Helper to return a memory pool associated with the cache + * @return + */ + auto get_pool() const + { + return static_pool; + } + + /** + * A method to add a generic symbol with a callback to couple with C API + * @param name name of the symbol, unlike C API it must be "" for callback only (compat) symbols, in this case an automatic name is generated + * @param priority + * @param func + * @param user_data + * @param flags_and_type mix of flags and type in a messy C enum + * @return id of a new symbol or -1 in case of failure + */ + auto add_symbol_with_callback(std::string_view name, + int priority, + symbol_func_t func, + void *user_data, + int flags_and_type) -> int; + /** + * A method to add a generic virtual symbol with no function associated + * @param name must have some value, or a fatal error will strike you + * @param parent_id if this param is -1 then this symbol is associated with nothing + * @param flags_and_type mix of flags and type in a messy C enum + * @return id of a new symbol or -1 in case of failure + */ + auto add_virtual_symbol(std::string_view name, int parent_id, + int flags_and_type) -> int; + + /** + * Sets a lua callback to be called on peaks in execution time + * @param cbref + */ + auto set_peak_cb(int cbref) -> void; + + /** + * Add a delayed condition for a symbol that might not be registered yet + * @param sym + * @param cbref + */ + auto add_delayed_condition(std::string_view sym, int cbref) -> void; + + /** + * Returns number of symbols that needs to be checked in statistical algorithm + * @return + */ + auto get_stats_symbols_count() const + { + return stats_symbols_count; + } + + /** + * Returns a checksum for the cache + * @return + */ + auto get_cksum() const + { + return cksum; + } + + /** + * Validate symbols in the cache + * @param strict + * @return + */ + auto validate(bool strict) -> bool; + + /** + * Returns counters for the cache + * @return + */ + auto counters() const -> ucl_object_t *; + + /** + * Adjusts stats of the cache for the periodic counter + */ + auto periodic_resort(struct ev_loop *ev_loop, double cur_time, double last_resort) -> void; + + /** + * A simple helper to get the reload time + * @return + */ + auto get_reload_time() const + { + return reload_time; + }; + + /** + * Iterate over all symbols using a specific functor + * @tparam Functor + * @param f + */ + template<typename Functor> + auto symbols_foreach(Functor f) -> void + { + for (const auto &sym_it: items_by_symbol) { + f(sym_it.second); + } + } + + /** + * Iterate over all composites using a specific functor + * @tparam Functor + * @param f + */ + template<typename Functor> + auto composites_foreach(Functor f) -> void + { + for (const auto &sym_it: composites) { + f(sym_it); + } + } + + /** + * Iterate over all composites using a specific functor + * @tparam Functor + * @param f + */ + template<typename Functor> + auto connfilters_foreach(Functor f) -> bool + { + return std::all_of(std::begin(connfilters), std::end(connfilters), + [&](const auto &sym_it) { + return f(sym_it); + }); + } + template<typename Functor> + auto prefilters_foreach(Functor f) -> bool + { + return std::all_of(std::begin(prefilters), std::end(prefilters), + [&](const auto &sym_it) { + return f(sym_it); + }); + } + template<typename Functor> + auto postfilters_foreach(Functor f) -> bool + { + return std::all_of(std::begin(postfilters), std::end(postfilters), + [&](const auto &sym_it) { + return f(sym_it); + }); + } + template<typename Functor> + auto idempotent_foreach(Functor f) -> bool + { + return std::all_of(std::begin(idempotent), std::end(idempotent), + [&](const auto &sym_it) { + return f(sym_it); + }); + } + template<typename Functor> + auto filters_foreach(Functor f) -> bool + { + return std::all_of(std::begin(filters), std::end(filters), + [&](const auto &sym_it) { + return f(sym_it); + }); + } + + /** + * Resort cache if anything has been changed since last time + * @return + */ + auto maybe_resort() -> bool; + + /** + * Returns current set of items ordered for sharing ownership + * @return + */ + auto get_cache_order() const -> auto + { + return items_by_order; + } + + /** + * Get last profile timestamp + * @return + */ + auto get_last_profile() const -> auto + { + return last_profile; + } + + /** + * Sets last profile timestamp + * @param last_profile + * @return + */ + auto set_last_profile(double last_profile) + { + symcache::last_profile = last_profile; + } + + /** + * Process settings elt identified by id + * @param elt + */ + auto process_settings_elt(struct rspamd_config_settings_elt *elt) -> void; + + /** + * Returns maximum timeout that is requested by all rules + * @return + */ + auto get_max_timeout(std::vector<std::pair<double, const cache_item *>> &elts) const -> double; +}; + + +}// namespace rspamd::symcache + +#endif//RSPAMD_SYMCACHE_INTERNAL_HXX diff --git a/src/libserver/symcache/symcache_item.cxx b/src/libserver/symcache/symcache_item.cxx new file mode 100644 index 0000000..ac901f5 --- /dev/null +++ b/src/libserver/symcache/symcache_item.cxx @@ -0,0 +1,652 @@ +/* + * Copyright 2023 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "lua/lua_common.h" +#include "symcache_internal.hxx" +#include "symcache_item.hxx" +#include "fmt/core.h" +#include "libserver/task.h" +#include "libutil/cxx/util.hxx" +#include <numeric> +#include <functional> + +namespace rspamd::symcache { + +enum class augmentation_value_type { + NO_VALUE, + STRING_VALUE, + NUMBER_VALUE, +}; + +struct augmentation_info { + int weight = 0; + int implied_flags = 0; + augmentation_value_type value_type = augmentation_value_type::NO_VALUE; +}; + +/* A list of internal augmentations that are known to Rspamd with their weight */ +static const auto known_augmentations = + ankerl::unordered_dense::map<std::string, augmentation_info, rspamd::smart_str_hash, rspamd::smart_str_equal>{ + {"passthrough", {.weight = 10, .implied_flags = SYMBOL_TYPE_IGNORE_PASSTHROUGH}}, + {"single_network", {.weight = 1, .implied_flags = 0}}, + {"no_network", {.weight = 0, .implied_flags = 0}}, + {"many_network", {.weight = 1, .implied_flags = 0}}, + {"important", {.weight = 5, .implied_flags = SYMBOL_TYPE_FINE}}, + {"timeout", { + .weight = 0, + .implied_flags = 0, + .value_type = augmentation_value_type::NUMBER_VALUE, + }}}; + +auto cache_item::get_parent(const symcache &cache) const -> const cache_item * +{ + if (is_virtual()) { + const auto &virtual_sp = std::get<virtual_item>(specific); + + return virtual_sp.get_parent(cache); + } + + return nullptr; +} + +auto cache_item::get_parent_mut(const symcache &cache) -> cache_item * +{ + if (is_virtual()) { + auto &virtual_sp = std::get<virtual_item>(specific); + + return virtual_sp.get_parent_mut(cache); + } + + return nullptr; +} + +auto cache_item::process_deps(const symcache &cache) -> void +{ + /* Allow logging macros to work */ + auto log_tag = [&]() { return cache.log_tag(); }; + + for (auto &dep: deps) { + msg_debug_cache("process real dependency %s on %s", symbol.c_str(), dep.sym.c_str()); + auto *dit = cache.get_item_by_name_mut(dep.sym, true); + + if (dep.vid >= 0) { + /* Case of the virtual symbol that depends on another (maybe virtual) symbol */ + const auto *vdit = cache.get_item_by_name(dep.sym, false); + + if (!vdit) { + if (dit) { + msg_err_cache("cannot add dependency from %s on %s: no dependency symbol registered", + dep.sym.c_str(), dit->symbol.c_str()); + } + } + else { + msg_debug_cache("process virtual dependency %s(%d) on %s(%d)", symbol.c_str(), + dep.vid, vdit->symbol.c_str(), vdit->id); + + unsigned nids = 0; + + /* Propagate ids */ + msg_debug_cache("check id propagation for dependency %s from %s", + symbol.c_str(), dit->symbol.c_str()); + + const auto *ids = dit->allowed_ids.get_ids(nids); + + if (nids > 0) { + msg_debug_cache("propagate allowed ids from %s to %s", + dit->symbol.c_str(), symbol.c_str()); + + allowed_ids.set_ids(ids, nids); + } + + ids = dit->forbidden_ids.get_ids(nids); + + if (nids > 0) { + msg_debug_cache("propagate forbidden ids from %s to %s", + dit->symbol.c_str(), symbol.c_str()); + + forbidden_ids.set_ids(ids, nids); + } + } + } + + if (dit != nullptr) { + if (!dit->is_filter()) { + /* + * Check sanity: + * - filters -> prefilter dependency is OK and always satisfied + * - postfilter -> (filter, prefilter) dep is ok + * - idempotent -> (any) dep is OK + * + * Otherwise, emit error + * However, even if everything is fine this dep is useless ¯\_(ツ)_/¯ + */ + auto ok_dep = false; + + if (dit->get_type() == type) { + ok_dep = true; + } + else if (type < dit->get_type()) { + ok_dep = true; + } + + if (!ok_dep) { + msg_err_cache("cannot add dependency from %s on %s: invalid symbol types", + dep.sym.c_str(), symbol.c_str()); + + continue; + } + } + else { + if (dit->id == id) { + msg_err_cache("cannot add dependency on self: %s -> %s " + "(resolved to %s)", + symbol.c_str(), dep.sym.c_str(), dit->symbol.c_str()); + } + else { + /* Create a reverse dep */ + if (is_virtual()) { + auto *parent = get_parent_mut(cache); + + if (parent) { + dit->rdeps.emplace_back(parent, parent->symbol, parent->id, -1); + dep.item = dit; + dep.id = dit->id; + + msg_debug_cache("added reverse dependency from %d on %d", parent->id, + dit->id); + } + } + else { + dep.item = dit; + dep.id = dit->id; + dit->rdeps.emplace_back(this, symbol, id, -1); + msg_debug_cache("added reverse dependency from %d on %d", id, + dit->id); + } + } + } + } + else if (dep.id >= 0) { + msg_err_cache("cannot find dependency on symbol %s for symbol %s", + dep.sym.c_str(), symbol.c_str()); + + continue; + } + } + + // Remove empty deps + deps.erase(std::remove_if(std::begin(deps), std::end(deps), + [](const auto &dep) { return !dep.item; }), + std::end(deps)); +} + +auto cache_item::resolve_parent(const symcache &cache) -> bool +{ + auto log_tag = [&]() { return cache.log_tag(); }; + + if (is_virtual()) { + auto &virt = std::get<virtual_item>(specific); + + if (virt.get_parent(cache)) { + msg_debug_cache("trying to resolve parent twice for %s", symbol.c_str()); + + return false; + } + + return virt.resolve_parent(cache); + } + else { + msg_warn_cache("trying to resolve a parent for non-virtual symbol %s", symbol.c_str()); + } + + return false; +} + +auto cache_item::update_counters_check_peak(lua_State *L, + struct ev_loop *ev_loop, + double cur_time, + double last_resort) -> bool +{ + auto ret = false; + static const double decay_rate = 0.25; + + st->total_hits += st->hits; + g_atomic_int_set(&st->hits, 0); + + if (last_count > 0) { + auto cur_value = (st->total_hits - last_count) / + (cur_time - last_resort); + rspamd_set_counter_ema(&st->frequency_counter, + cur_value, decay_rate); + st->avg_frequency = st->frequency_counter.mean; + st->stddev_frequency = st->frequency_counter.stddev; + + auto cur_err = (st->avg_frequency - cur_value); + cur_err *= cur_err; + + if (st->frequency_counter.number > 10 && + cur_err > ::sqrt(st->stddev_frequency) * 3) { + frequency_peaks++; + ret = true; + } + } + + last_count = st->total_hits; + + if (cd->number > 0) { + if (!is_virtual()) { + st->avg_time = cd->mean; + rspamd_set_counter_ema(&st->time_counter, + st->avg_time, decay_rate); + st->avg_time = st->time_counter.mean; + memset(cd, 0, sizeof(*cd)); + } + } + + return ret; +} + +auto cache_item::inc_frequency(const char *sym_name, symcache &cache) -> void +{ + if (sym_name && symbol != sym_name) { + if (is_filter()) { + const auto *children = get_children(); + if (children) { + /* Likely a callback symbol with some virtual symbol that needs to be adjusted */ + for (const auto &cld: *children) { + if (cld->get_name() == sym_name) { + cld->inc_frequency(sym_name, cache); + } + } + } + } + else { + /* Name not equal to symbol name, so we need to find the proper name */ + auto *another_item = cache.get_item_by_name_mut(sym_name, false); + if (another_item != nullptr) { + another_item->inc_frequency(sym_name, cache); + } + } + } + else { + /* Symbol and sym name are the same */ + g_atomic_int_inc(&st->hits); + } +} + +auto cache_item::get_type_str() const -> const char * +{ + switch (type) { + case symcache_item_type::CONNFILTER: + return "connfilter"; + case symcache_item_type::FILTER: + return "filter"; + case symcache_item_type::IDEMPOTENT: + return "idempotent"; + case symcache_item_type::PREFILTER: + return "prefilter"; + case symcache_item_type::POSTFILTER: + return "postfilter"; + case symcache_item_type::COMPOSITE: + return "composite"; + case symcache_item_type::CLASSIFIER: + return "classifier"; + case symcache_item_type::VIRTUAL: + return "virtual"; + } + + RSPAMD_UNREACHABLE; +} + +auto cache_item::is_allowed(struct rspamd_task *task, bool exec_only) const -> bool +{ + const auto *what = "execution"; + + if (!exec_only) { + what = "symbol insertion"; + } + + /* Static checks */ + if (!enabled || + (RSPAMD_TASK_IS_EMPTY(task) && !(flags & SYMBOL_TYPE_EMPTY)) || + (flags & SYMBOL_TYPE_MIME_ONLY && !RSPAMD_TASK_IS_MIME(task))) { + + if (!enabled) { + msg_debug_cache_task("skipping %s of %s as it is permanently disabled", + what, symbol.c_str()); + + return false; + } + else { + /* + * If we check merely execution (not insertion), then we disallow + * mime symbols for non mime tasks and vice versa + */ + if (exec_only) { + msg_debug_cache_task("skipping check of %s as it cannot be " + "executed for this task type", + symbol.c_str()); + + return FALSE; + } + } + } + + /* Settings checks */ + if (task->settings_elt != nullptr) { + if (forbidden_ids.check_id(task->settings_elt->id)) { + msg_debug_cache_task("deny %s of %s as it is forbidden for " + "settings id %ud", + what, + symbol.c_str(), + task->settings_elt->id); + + return false; + } + + if (!(flags & SYMBOL_TYPE_EXPLICIT_DISABLE)) { + if (!allowed_ids.check_id(task->settings_elt->id)) { + + if (task->settings_elt->policy == RSPAMD_SETTINGS_POLICY_IMPLICIT_ALLOW) { + msg_debug_cache_task("allow execution of %s settings id %ud " + "allows implicit execution of the symbols;", + symbol.c_str(), + id); + + return true; + } + + if (exec_only) { + /* + * Special case if any of our virtual children are enabled + */ + if (exec_only_ids.check_id(task->settings_elt->id)) { + return true; + } + } + + msg_debug_cache_task("deny %s of %s as it is not listed " + "as allowed for settings id %ud", + what, + symbol.c_str(), + task->settings_elt->id); + return false; + } + } + else { + msg_debug_cache_task("allow %s of %s for " + "settings id %ud as it can be only disabled explicitly", + what, + symbol.c_str(), + task->settings_elt->id); + } + } + else if (flags & SYMBOL_TYPE_EXPLICIT_ENABLE) { + msg_debug_cache_task("deny %s of %s as it must be explicitly enabled", + what, + symbol.c_str()); + return false; + } + + /* Allow all symbols with no settings id */ + return true; +} + +auto cache_item::add_augmentation(const symcache &cache, std::string_view augmentation, + std::optional<std::string_view> value) -> bool +{ + auto log_tag = [&]() { return cache.log_tag(); }; + + if (augmentations.contains(augmentation)) { + msg_warn_cache("duplicate augmentation: %s", augmentation.data()); + + return false; + } + + auto maybe_known = rspamd::find_map(known_augmentations, augmentation); + + if (maybe_known.has_value()) { + auto &known_info = maybe_known.value().get(); + + if (known_info.implied_flags) { + if ((known_info.implied_flags & flags) == 0) { + msg_info_cache("added implied flags (%bd) for symbol %s as it has %s augmentation", + known_info.implied_flags, symbol.data(), augmentation.data()); + flags |= known_info.implied_flags; + } + } + + if (known_info.value_type == augmentation_value_type::NO_VALUE) { + if (value.has_value()) { + msg_err_cache("value specified for augmentation %s, that has no value", + augmentation.data()); + + return false; + } + return augmentations.try_emplace(augmentation, known_info.weight).second; + } + else { + if (!value.has_value()) { + msg_err_cache("value is not specified for augmentation %s, that requires explicit value", + augmentation.data()); + + return false; + } + + if (known_info.value_type == augmentation_value_type::STRING_VALUE) { + return augmentations.try_emplace(augmentation, std::string{value.value()}, + known_info.weight) + .second; + } + else if (known_info.value_type == augmentation_value_type::NUMBER_VALUE) { + /* I wish it was supported properly */ + //auto conv_res = std::from_chars(value->data(), value->size(), num); + char numbuf[128], *endptr = nullptr; + rspamd_strlcpy(numbuf, value->data(), MIN(value->size(), sizeof(numbuf))); + auto num = g_ascii_strtod(numbuf, &endptr); + + if (fabs(num) >= G_MAXFLOAT || std::isnan(num)) { + msg_err_cache("value for augmentation %s is not numeric: %*s", + augmentation.data(), + (int) value->size(), value->data()); + return false; + } + + return augmentations.try_emplace(augmentation, num, + known_info.weight) + .second; + } + } + } + else { + msg_debug_cache("added unknown augmentation %s for symbol %s", + "unknown", augmentation.data(), symbol.data()); + return augmentations.try_emplace(augmentation, 0).second; + } + + // Should not be reached + return false; +} + +auto cache_item::get_augmentation_weight() const -> int +{ + return std::accumulate(std::begin(augmentations), std::end(augmentations), + 0, [](int acc, const auto &map_pair) { + return acc + map_pair.second.weight; + }); +} + +auto cache_item::get_numeric_augmentation(std::string_view name) const -> std::optional<double> +{ + const auto augmentation_value_maybe = rspamd::find_map(this->augmentations, name); + + if (augmentation_value_maybe.has_value()) { + const auto &augmentation = augmentation_value_maybe.value().get(); + + if (std::holds_alternative<double>(augmentation.value)) { + return std::get<double>(augmentation.value); + } + } + + return std::nullopt; +} + + +auto virtual_item::get_parent(const symcache &cache) const -> const cache_item * +{ + if (parent) { + return parent; + } + + return cache.get_item_by_id(parent_id, false); +} + +auto virtual_item::get_parent_mut(const symcache &cache) -> cache_item * +{ + if (parent) { + return parent; + } + + return const_cast<cache_item *>(cache.get_item_by_id(parent_id, false)); +} + +auto virtual_item::resolve_parent(const symcache &cache) -> bool +{ + if (parent) { + return false; + } + + auto item_ptr = cache.get_item_by_id(parent_id, true); + + if (item_ptr) { + parent = const_cast<cache_item *>(item_ptr); + + return true; + } + + return false; +} + +auto item_type_from_c(int type) -> tl::expected<std::pair<symcache_item_type, int>, std::string> +{ + constexpr const auto trivial_types = SYMBOL_TYPE_CONNFILTER | SYMBOL_TYPE_PREFILTER | SYMBOL_TYPE_POSTFILTER | SYMBOL_TYPE_IDEMPOTENT | SYMBOL_TYPE_COMPOSITE | SYMBOL_TYPE_CLASSIFIER | SYMBOL_TYPE_VIRTUAL; + + constexpr auto all_but_one_ty = [&](int type, int exclude_bit) -> auto { + return (type & trivial_types) & (trivial_types & ~exclude_bit); + }; + + if (type & trivial_types) { + auto check_trivial = [&](auto flag, + symcache_item_type ty) -> tl::expected<std::pair<symcache_item_type, int>, std::string> { + if (all_but_one_ty(type, flag)) { + return tl::make_unexpected(fmt::format("invalid flags for a symbol: {}", (int) type)); + } + + return std::make_pair(ty, type & ~flag); + }; + if (type & SYMBOL_TYPE_CONNFILTER) { + return check_trivial(SYMBOL_TYPE_CONNFILTER, symcache_item_type::CONNFILTER); + } + else if (type & SYMBOL_TYPE_PREFILTER) { + return check_trivial(SYMBOL_TYPE_PREFILTER, symcache_item_type::PREFILTER); + } + else if (type & SYMBOL_TYPE_POSTFILTER) { + return check_trivial(SYMBOL_TYPE_POSTFILTER, symcache_item_type::POSTFILTER); + } + else if (type & SYMBOL_TYPE_IDEMPOTENT) { + return check_trivial(SYMBOL_TYPE_IDEMPOTENT, symcache_item_type::IDEMPOTENT); + } + else if (type & SYMBOL_TYPE_COMPOSITE) { + return check_trivial(SYMBOL_TYPE_COMPOSITE, symcache_item_type::COMPOSITE); + } + else if (type & SYMBOL_TYPE_CLASSIFIER) { + return check_trivial(SYMBOL_TYPE_CLASSIFIER, symcache_item_type::CLASSIFIER); + } + else if (type & SYMBOL_TYPE_VIRTUAL) { + return check_trivial(SYMBOL_TYPE_VIRTUAL, symcache_item_type::VIRTUAL); + } + + return tl::make_unexpected(fmt::format("internal error: impossible flags combination: {}", (int) type)); + } + + /* Maybe check other flags combination here? */ + return std::make_pair(symcache_item_type::FILTER, type); +} + +bool operator<(symcache_item_type lhs, symcache_item_type rhs) +{ + auto ret = false; + switch (lhs) { + case symcache_item_type::CONNFILTER: + break; + case symcache_item_type::PREFILTER: + if (rhs == symcache_item_type::CONNFILTER) { + ret = true; + } + break; + case symcache_item_type::FILTER: + if (rhs == symcache_item_type::CONNFILTER || rhs == symcache_item_type::PREFILTER) { + ret = true; + } + break; + case symcache_item_type::POSTFILTER: + if (rhs != symcache_item_type::IDEMPOTENT) { + ret = true; + } + break; + case symcache_item_type::IDEMPOTENT: + default: + break; + } + + return ret; +} + +item_condition::~item_condition() +{ + if (cb != -1 && L != nullptr) { + luaL_unref(L, LUA_REGISTRYINDEX, cb); + } +} + +auto item_condition::check(std::string_view sym_name, struct rspamd_task *task) const -> bool +{ + if (cb != -1 && L != nullptr) { + auto ret = false; + + lua_pushcfunction(L, &rspamd_lua_traceback); + auto err_idx = lua_gettop(L); + + lua_rawgeti(L, LUA_REGISTRYINDEX, cb); + rspamd_lua_task_push(L, task); + + if (lua_pcall(L, 1, 1, err_idx) != 0) { + msg_info_task("call to condition for %s failed: %s", + sym_name.data(), lua_tostring(L, -1)); + } + else { + ret = lua_toboolean(L, -1); + } + + lua_settop(L, err_idx - 1); + + return ret; + } + + return true; +} + +}// namespace rspamd::symcache diff --git a/src/libserver/symcache/symcache_item.hxx b/src/libserver/symcache/symcache_item.hxx new file mode 100644 index 0000000..a60213a --- /dev/null +++ b/src/libserver/symcache/symcache_item.hxx @@ -0,0 +1,561 @@ +/* + * Copyright 2023 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef RSPAMD_SYMCACHE_ITEM_HXX +#define RSPAMD_SYMCACHE_ITEM_HXX + +#pragma once + +#include <utility> +#include <vector> +#include <string> +#include <string_view> +#include <memory> +#include <variant> +#include <algorithm> +#include <optional> + +#include "rspamd_symcache.h" +#include "symcache_id_list.hxx" +#include "contrib/expected/expected.hpp" +#include "contrib/libev/ev.h" +#include "symcache_runtime.hxx" +#include "libutil/cxx/hash_util.hxx" + +namespace rspamd::symcache { + +class symcache; +struct cache_item; +using cache_item_ptr = std::shared_ptr<cache_item>; + +enum class symcache_item_type { + CONNFILTER, /* Executed on connection stage */ + PREFILTER, /* Executed before all filters */ + FILTER, /* Normal symbol with a callback */ + POSTFILTER, /* Executed after all filters */ + IDEMPOTENT, /* Executed after postfilters, cannot change results */ + CLASSIFIER, /* A virtual classifier symbol */ + COMPOSITE, /* A virtual composite symbol */ + VIRTUAL, /* A virtual symbol... */ +}; + +/* + * Compare item types: earlier stages symbols are > than later stages symbols + * Order for virtual stuff is not defined. + */ +bool operator<(symcache_item_type lhs, symcache_item_type rhs); + +constexpr static auto item_type_to_str(symcache_item_type t) -> const char * +{ + switch (t) { + case symcache_item_type::CONNFILTER: + return "connfilter"; + case symcache_item_type::PREFILTER: + return "prefilter"; + case symcache_item_type::FILTER: + return "filter"; + case symcache_item_type::POSTFILTER: + return "postfilter"; + case symcache_item_type::IDEMPOTENT: + return "idempotent"; + case symcache_item_type::CLASSIFIER: + return "classifier"; + case symcache_item_type::COMPOSITE: + return "composite"; + case symcache_item_type::VIRTUAL: + return "virtual"; + } +} + +/** + * This is a public helper to convert a legacy C type to a more static type + * @param type input type as a C enum + * @return pair of type safe symcache_item_type + the remaining flags or an error + */ +auto item_type_from_c(int type) -> tl::expected<std::pair<symcache_item_type, int>, std::string>; + +struct item_condition { +private: + lua_State *L = nullptr; + int cb = -1; + +public: + explicit item_condition(lua_State *L_, int cb_) noexcept + : L(L_), cb(cb_) + { + } + item_condition(item_condition &&other) noexcept + { + *this = std::move(other); + } + /* Make it move only */ + item_condition(const item_condition &) = delete; + item_condition &operator=(item_condition &&other) noexcept + { + std::swap(other.L, L); + std::swap(other.cb, cb); + return *this; + } + ~item_condition(); + + auto check(std::string_view sym_name, struct rspamd_task *task) const -> bool; +}; + +class normal_item { +private: + symbol_func_t func = nullptr; + void *user_data = nullptr; + std::vector<cache_item *> virtual_children; + std::vector<item_condition> conditions; + +public: + explicit normal_item(symbol_func_t _func, void *_user_data) + : func(_func), user_data(_user_data) + { + } + + auto add_condition(lua_State *L, int cbref) -> void + { + conditions.emplace_back(L, cbref); + } + + auto call(struct rspamd_task *task, struct rspamd_symcache_dynamic_item *item) const -> void + { + func(task, item, user_data); + } + + auto check_conditions(std::string_view sym_name, struct rspamd_task *task) const -> bool + { + return std::all_of(std::begin(conditions), std::end(conditions), + [&](const auto &cond) { return cond.check(sym_name, task); }); + } + + auto get_cbdata() const -> auto + { + return user_data; + } + + auto add_child(cache_item *ptr) -> void + { + virtual_children.push_back(ptr); + } + + auto get_childen() const -> const std::vector<cache_item *> & + { + return virtual_children; + } +}; + +class virtual_item { +private: + int parent_id = -1; + cache_item *parent = nullptr; + +public: + explicit virtual_item(int _parent_id) + : parent_id(_parent_id) + { + } + + auto get_parent(const symcache &cache) const -> const cache_item *; + auto get_parent_mut(const symcache &cache) -> cache_item *; + + auto resolve_parent(const symcache &cache) -> bool; +}; + +struct cache_dependency { + cache_item *item; /* Real dependency */ + std::string sym; /* Symbolic dep name */ + int id; /* Real from */ + int vid; /* Virtual from */ +public: + /* Default piecewise constructor */ + explicit cache_dependency(cache_item *_item, std::string _sym, int _id, int _vid) + : item(_item), sym(std::move(_sym)), id(_id), vid(_vid) + { + } +}; + +/* + * Used to store augmentation values + */ +struct item_augmentation { + std::variant<std::monostate, std::string, double> value; + int weight; + + explicit item_augmentation(int weight) + : value(std::monostate{}), weight(weight) + { + } + explicit item_augmentation(std::string str_value, int weight) + : value(str_value), weight(weight) + { + } + explicit item_augmentation(double double_value, int weight) + : value(double_value), weight(weight) + { + } +}; + +struct cache_item : std::enable_shared_from_this<cache_item> { + /* The following fields will live in shared memory */ + struct rspamd_symcache_item_stat *st = nullptr; + struct rspamd_counter_data *cd = nullptr; + + /* Unique id - counter */ + int id; + std::uint64_t last_count = 0; + std::string symbol; + symcache_item_type type; + int flags; + + /* Condition of execution */ + bool enabled = true; + + /* Priority */ + int priority = 0; + /* Topological order */ + unsigned int order = 0; + int frequency_peaks = 0; + + /* Specific data for virtual and callback symbols */ + std::variant<normal_item, virtual_item> specific; + + /* Settings ids */ + id_list allowed_ids; + /* Allows execution but not symbols insertion */ + id_list exec_only_ids; + id_list forbidden_ids; + + /* Set of augmentations */ + ankerl::unordered_dense::map<std::string, item_augmentation, + rspamd::smart_str_hash, rspamd::smart_str_equal> + augmentations; + + /* Dependencies */ + std::vector<cache_dependency> deps; + /* Reverse dependencies */ + std::vector<cache_dependency> rdeps; + +public: + /** + * Create a normal item with a callback + * @param name + * @param priority + * @param func + * @param user_data + * @param type + * @param flags + * @return + */ + template<typename T> + static auto create_with_function(rspamd_mempool_t *pool, + int id, + T &&name, + int priority, + symbol_func_t func, + void *user_data, + symcache_item_type type, + int flags) -> cache_item_ptr + { + return std::shared_ptr<cache_item>(new cache_item(pool, + id, std::forward<T>(name), priority, + func, user_data, + type, flags)); + } + + /** + * Create a virtual item + * @param name + * @param priority + * @param parent + * @param type + * @param flags + * @return + */ + template<typename T> + static auto create_with_virtual(rspamd_mempool_t *pool, + int id, + T &&name, + int parent, + symcache_item_type type, + int flags) -> cache_item_ptr + { + return std::shared_ptr<cache_item>(new cache_item(pool, id, std::forward<T>(name), + parent, type, flags)); + } + + /** + * Share ownership on the item + * @return + */ + auto getptr() -> cache_item_ptr + { + return shared_from_this(); + } + + /** + * Process and resolve dependencies for the item + * @param cache + */ + auto process_deps(const symcache &cache) -> void; + + auto is_virtual() const -> bool + { + return std::holds_alternative<virtual_item>(specific); + } + + auto is_filter() const -> bool + { + return std::holds_alternative<normal_item>(specific) && + (type == symcache_item_type::FILTER); + } + + /** + * Returns true if a symbol should have some score defined + * @return + */ + auto is_scoreable() const -> bool + { + return !(flags & SYMBOL_TYPE_CALLBACK) && + ((type == symcache_item_type::FILTER) || + is_virtual() || + (type == symcache_item_type::COMPOSITE) || + (type == symcache_item_type::CLASSIFIER)); + } + + auto is_ghost() const -> bool + { + return flags & SYMBOL_TYPE_GHOST; + } + + auto get_parent(const symcache &cache) const -> const cache_item *; + auto get_parent_mut(const symcache &cache) -> cache_item *; + + auto resolve_parent(const symcache &cache) -> bool; + + auto get_type() const -> auto + { + return type; + } + + auto get_type_str() const -> const char *; + + auto get_name() const -> const std::string & + { + return symbol; + } + + auto get_flags() const -> auto + { + return flags; + }; + + auto add_condition(lua_State *L, int cbref) -> bool + { + if (!is_virtual()) { + auto &normal = std::get<normal_item>(specific); + normal.add_condition(L, cbref); + + return true; + } + + return false; + } + + auto update_counters_check_peak(lua_State *L, + struct ev_loop *ev_loop, + double cur_time, + double last_resort) -> bool; + + /** + * Increase frequency for a symbol + */ + auto inc_frequency(const char *sym_name, symcache &cache) -> void; + + /** + * Check if an item is allowed to be executed not checking item conditions + * @param task + * @param exec_only + * @return + */ + auto is_allowed(struct rspamd_task *task, bool exec_only) const -> bool; + + /** + * Returns callback data + * @return + */ + auto get_cbdata() const -> void * + { + if (std::holds_alternative<normal_item>(specific)) { + const auto &filter_data = std::get<normal_item>(specific); + + return filter_data.get_cbdata(); + } + + return nullptr; + } + + /** + * Check all conditions for an item + * @param task + * @return + */ + auto check_conditions(struct rspamd_task *task) const -> auto + { + if (std::holds_alternative<normal_item>(specific)) { + const auto &filter_data = std::get<normal_item>(specific); + + return filter_data.check_conditions(symbol, task); + } + + return false; + } + + auto call(struct rspamd_task *task, cache_dynamic_item *dyn_item) const -> void + { + if (std::holds_alternative<normal_item>(specific)) { + const auto &filter_data = std::get<normal_item>(specific); + + filter_data.call(task, (struct rspamd_symcache_dynamic_item *) dyn_item); + } + } + + /** + * Add an augmentation to the item, returns `true` if augmentation is known and unique, false otherwise + * @param augmentation + * @return + */ + auto add_augmentation(const symcache &cache, std::string_view augmentation, + std::optional<std::string_view> value) -> bool; + + /** + * Return sum weight of all known augmentations + * @return + */ + auto get_augmentation_weight() const -> int; + + /** + * Returns numeric augmentation value + * @param name + * @return + */ + auto get_numeric_augmentation(std::string_view name) const -> std::optional<double>; + + /** + * Returns string augmentation value + * @param name + * @return + */ + auto get_string_augmentation(std::string_view name) const -> std::optional<std::string_view>; + + /** + * Add a virtual symbol as a child of some normal symbol + * @param ptr + */ + auto add_child(cache_item *ptr) -> void + { + if (std::holds_alternative<normal_item>(specific)) { + auto &filter_data = std::get<normal_item>(specific); + + filter_data.add_child(ptr); + } + else { + g_assert("add child is called for a virtual symbol!"); + } + } + + /** + * Returns virtual children for a normal item + * @param ptr + * @return + */ + auto get_children() const -> const std::vector<cache_item *> * + { + if (std::holds_alternative<normal_item>(specific)) { + const auto &filter_data = std::get<normal_item>(specific); + + return &filter_data.get_childen(); + } + + return nullptr; + } + +private: + /** + * Constructor for a normal symbols with callback + * @param name + * @param _priority + * @param func + * @param user_data + * @param _type + * @param _flags + */ + cache_item(rspamd_mempool_t *pool, + int _id, + std::string &&name, + int _priority, + symbol_func_t func, + void *user_data, + symcache_item_type _type, + int _flags) + : id(_id), + symbol(std::move(name)), + type(_type), + flags(_flags), + priority(_priority), + specific(normal_item{func, user_data}) + { + /* These structures are kept trivial, so they need to be explicitly reset */ + forbidden_ids.reset(); + allowed_ids.reset(); + exec_only_ids.reset(); + st = rspamd_mempool_alloc0_shared_type(pool, std::remove_pointer_t<decltype(st)>); + cd = rspamd_mempool_alloc0_shared_type(pool, std::remove_pointer_t<decltype(cd)>); + } + + /** + * Constructor for a virtual symbol + * @param name + * @param _priority + * @param parent + * @param _type + * @param _flags + */ + cache_item(rspamd_mempool_t *pool, + int _id, + std::string &&name, + int parent, + symcache_item_type _type, + int _flags) + : id(_id), + symbol(std::move(name)), + type(_type), + flags(_flags), + specific(virtual_item{parent}) + { + /* These structures are kept trivial, so they need to be explicitly reset */ + forbidden_ids.reset(); + allowed_ids.reset(); + exec_only_ids.reset(); + st = rspamd_mempool_alloc0_shared_type(pool, std::remove_pointer_t<decltype(st)>); + cd = rspamd_mempool_alloc0_shared_type(pool, std::remove_pointer_t<decltype(cd)>); + } +}; + +}// namespace rspamd::symcache + +#endif//RSPAMD_SYMCACHE_ITEM_HXX diff --git a/src/libserver/symcache/symcache_periodic.hxx b/src/libserver/symcache/symcache_periodic.hxx new file mode 100644 index 0000000..535956b --- /dev/null +++ b/src/libserver/symcache/symcache_periodic.hxx @@ -0,0 +1,89 @@ +/*- + * Copyright 2022 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#ifndef RSPAMD_SYMCACHE_PERIODIC_HXX +#define RSPAMD_SYMCACHE_PERIODIC_HXX + +#pragma once + +#include "config.h" +#include "contrib/libev/ev.h" +#include "symcache_internal.hxx" +#include "worker_util.h" + +namespace rspamd::symcache { +struct cache_refresh_cbdata { +private: + symcache *cache; + struct ev_loop *event_loop; + struct rspamd_worker *w; + double reload_time; + double last_resort; + ev_timer resort_ev; + +public: + explicit cache_refresh_cbdata(symcache *_cache, + struct ev_loop *_ev_base, + struct rspamd_worker *_w) + : cache(_cache), event_loop(_ev_base), w(_w) + { + auto log_tag = [&]() { return cache->log_tag(); }; + last_resort = rspamd_get_ticks(TRUE); + reload_time = cache->get_reload_time(); + auto tm = rspamd_time_jitter(reload_time, 0); + msg_debug_cache("next reload in %.2f seconds", tm); + ev_timer_init(&resort_ev, cache_refresh_cbdata::resort_cb, + tm, tm); + resort_ev.data = (void *) this; + ev_timer_start(event_loop, &resort_ev); + rspamd_mempool_add_destructor(cache->get_pool(), + cache_refresh_cbdata::refresh_dtor, (void *) this); + } + + static void refresh_dtor(void *d) + { + auto *cbdata = (struct cache_refresh_cbdata *) d; + delete cbdata; + } + + static void resort_cb(EV_P_ ev_timer *w, int _revents) + { + auto *cbdata = (struct cache_refresh_cbdata *) w->data; + + auto log_tag = [&]() { return cbdata->cache->log_tag(); }; + + if (rspamd_worker_is_primary_controller(cbdata->w)) { + /* Plan new event */ + auto tm = rspamd_time_jitter(cbdata->reload_time, 0); + msg_debug_cache("resort symbols cache, next reload in %.2f seconds", tm); + cbdata->resort_ev.repeat = tm; + ev_timer_again(EV_A_ w); + auto cur_time = rspamd_get_ticks(FALSE); + cbdata->cache->periodic_resort(cbdata->event_loop, cur_time, cbdata->last_resort); + cbdata->last_resort = cur_time; + } + } + +private: + ~cache_refresh_cbdata() + { + ev_timer_stop(event_loop, &resort_ev); + } +}; +}// namespace rspamd::symcache + +#endif//RSPAMD_SYMCACHE_PERIODIC_HXX diff --git a/src/libserver/symcache/symcache_runtime.cxx b/src/libserver/symcache/symcache_runtime.cxx new file mode 100644 index 0000000..d9622d8 --- /dev/null +++ b/src/libserver/symcache/symcache_runtime.cxx @@ -0,0 +1,823 @@ +/* + * Copyright 2023 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "symcache_internal.hxx" +#include "symcache_item.hxx" +#include "symcache_runtime.hxx" +#include "libutil/cxx/util.hxx" +#include "libserver/task.h" +#include "libmime/scan_result.h" +#include "utlist.h" +#include "libserver/worker_util.h" +#include <limits> +#include <cmath> + +namespace rspamd::symcache { + +/* At least once per minute */ +constexpr static const auto PROFILE_MAX_TIME = 60.0; +/* For messages larger than 2Mb enable profiling */ +constexpr static const auto PROFILE_MESSAGE_SIZE_THRESHOLD = 1024ul * 1024 * 2; +/* Enable profile at least once per this amount of messages processed */ +constexpr static const auto PROFILE_PROBABILITY = 0.01; + +auto symcache_runtime::create(struct rspamd_task *task, symcache &cache) -> symcache_runtime * +{ + cache.maybe_resort(); + + auto &&cur_order = cache.get_cache_order(); + auto *checkpoint = (symcache_runtime *) rspamd_mempool_alloc0(task->task_pool, + sizeof(symcache_runtime) + + sizeof(struct cache_dynamic_item) * cur_order->size()); + + checkpoint->order = cache.get_cache_order(); + + /* Calculate profile probability */ + ev_now_update_if_cheap(task->event_loop); + ev_tstamp now = ev_now(task->event_loop); + checkpoint->profile_start = now; + checkpoint->lim = rspamd_task_get_required_score(task, task->result); + + if ((cache.get_last_profile() == 0.0 || now > cache.get_last_profile() + PROFILE_MAX_TIME) || + (task->msg.len >= PROFILE_MESSAGE_SIZE_THRESHOLD) || + (rspamd_random_double_fast() >= (1 - PROFILE_PROBABILITY))) { + msg_debug_cache_task("enable profiling of symbols for task"); + checkpoint->profile = true; + cache.set_last_profile(now); + } + + task->symcache_runtime = (void *) checkpoint; + + return checkpoint; +} + +auto symcache_runtime::process_settings(struct rspamd_task *task, const symcache &cache) -> bool +{ + if (!task->settings) { + msg_err_task("`process_settings` is called with no settings"); + return false; + } + + const auto *wl = ucl_object_lookup(task->settings, "whitelist"); + + if (wl != nullptr) { + msg_info_task("task is whitelisted"); + task->flags |= RSPAMD_TASK_FLAG_SKIP; + return true; + } + + auto already_disabled = false; + + auto process_group = [&](const ucl_object_t *gr_obj, auto functor) -> void { + ucl_object_iter_t it = nullptr; + const ucl_object_t *cur; + + if (gr_obj) { + while ((cur = ucl_iterate_object(gr_obj, &it, true)) != nullptr) { + if (ucl_object_type(cur) == UCL_STRING) { + auto *gr = (struct rspamd_symbols_group *) + g_hash_table_lookup(task->cfg->groups, + ucl_object_tostring(cur)); + + if (gr) { + GHashTableIter gr_it; + void *k, *v; + g_hash_table_iter_init(&gr_it, gr->symbols); + + while (g_hash_table_iter_next(&gr_it, &k, &v)) { + functor((const char *) k); + } + } + } + } + } + }; + + ucl_object_iter_t it = nullptr; + const ucl_object_t *cur; + + const auto *enabled = ucl_object_lookup(task->settings, "symbols_enabled"); + + if (enabled) { + msg_debug_cache_task("disable all symbols as `symbols_enabled` is found"); + /* Disable all symbols but selected */ + disable_all_symbols(SYMBOL_TYPE_EXPLICIT_DISABLE); + already_disabled = true; + it = nullptr; + + while ((cur = ucl_iterate_object(enabled, &it, true)) != nullptr) { + enable_symbol(task, cache, ucl_object_tostring(cur)); + } + } + + /* Enable groups of symbols */ + enabled = ucl_object_lookup(task->settings, "groups_enabled"); + if (enabled && !already_disabled) { + disable_all_symbols(SYMBOL_TYPE_EXPLICIT_DISABLE); + } + process_group(enabled, [&](const char *sym) { + enable_symbol(task, cache, sym); + }); + + const auto *disabled = ucl_object_lookup(task->settings, "symbols_disabled"); + + if (disabled) { + it = nullptr; + + while ((cur = ucl_iterate_object(disabled, &it, true)) != nullptr) { + disable_symbol(task, cache, ucl_object_tostring(cur)); + } + } + + /* Disable groups of symbols */ + disabled = ucl_object_lookup(task->settings, "groups_disabled"); + process_group(disabled, [&](const char *sym) { + disable_symbol(task, cache, sym); + }); + + /* Update required limit */ + lim = rspamd_task_get_required_score(task, task->result); + + return false; +} + +auto symcache_runtime::disable_all_symbols(int skip_mask) -> void +{ + for (auto [i, item]: rspamd::enumerate(order->d)) { + auto *dyn_item = &dynamic_items[i]; + + if (!(item->get_flags() & skip_mask)) { + dyn_item->finished = true; + dyn_item->started = true; + } + } +} + +auto symcache_runtime::disable_symbol(struct rspamd_task *task, const symcache &cache, std::string_view name) -> bool +{ + const auto *item = cache.get_item_by_name(name, true); + + if (item != nullptr) { + + auto *dyn_item = get_dynamic_item(item->id); + + if (dyn_item) { + dyn_item->finished = true; + dyn_item->started = true; + msg_debug_cache_task("disable execution of %s", name.data()); + + return true; + } + else { + msg_debug_cache_task("cannot disable %s: id not found %d", name.data(), item->id); + } + } + else { + msg_debug_cache_task("cannot disable %s: symbol not found", name.data()); + } + + return false; +} + +auto symcache_runtime::enable_symbol(struct rspamd_task *task, const symcache &cache, std::string_view name) -> bool +{ + const auto *item = cache.get_item_by_name(name, true); + + if (item != nullptr) { + + auto *dyn_item = get_dynamic_item(item->id); + + if (dyn_item) { + dyn_item->finished = false; + dyn_item->started = false; + msg_debug_cache_task("enable execution of %s", name.data()); + + return true; + } + else { + msg_debug_cache_task("cannot enable %s: id not found %d", name.data(), item->id); + } + } + else { + msg_debug_cache_task("cannot enable %s: symbol not found", name.data()); + } + + return false; +} + +auto symcache_runtime::is_symbol_checked(const symcache &cache, std::string_view name) -> bool +{ + const auto *item = cache.get_item_by_name(name, true); + + if (item != nullptr) { + + auto *dyn_item = get_dynamic_item(item->id); + + if (dyn_item) { + return dyn_item->started; + } + } + + return false; +} + +auto symcache_runtime::is_symbol_enabled(struct rspamd_task *task, const symcache &cache, std::string_view name) -> bool +{ + + const auto *item = cache.get_item_by_name(name, true); + if (item) { + + if (!item->is_allowed(task, true)) { + return false; + } + else { + auto *dyn_item = get_dynamic_item(item->id); + + if (dyn_item) { + if (dyn_item->started) { + /* Already started */ + return false; + } + + if (!item->is_virtual()) { + return std::get<normal_item>(item->specific).check_conditions(item->symbol, task); + } + } + else { + /* Unknown item */ + msg_debug_cache_task("cannot enable %s: symbol not found", name.data()); + } + } + } + + return true; +} + +auto symcache_runtime::get_dynamic_item(int id) const -> cache_dynamic_item * +{ + + /* Not found in the cache, do a hash lookup */ + auto our_id_maybe = rspamd::find_map(order->by_cache_id, id); + + if (our_id_maybe) { + return &dynamic_items[our_id_maybe.value()]; + } + + return nullptr; +} + +auto symcache_runtime::process_symbols(struct rspamd_task *task, symcache &cache, unsigned int stage) -> bool +{ + msg_debug_cache_task("symbols processing stage at pass: %d", stage); + + if (RSPAMD_TASK_IS_SKIPPED(task)) { + return true; + } + + switch (stage) { + case RSPAMD_TASK_STAGE_CONNFILTERS: + case RSPAMD_TASK_STAGE_PRE_FILTERS: + case RSPAMD_TASK_STAGE_POST_FILTERS: + case RSPAMD_TASK_STAGE_IDEMPOTENT: + return process_pre_postfilters(task, cache, + rspamd_session_events_pending(task->s), stage); + break; + + case RSPAMD_TASK_STAGE_FILTERS: + return process_filters(task, cache, rspamd_session_events_pending(task->s)); + break; + + default: + g_assert_not_reached(); + } +} + +auto symcache_runtime::process_pre_postfilters(struct rspamd_task *task, + symcache &cache, + int start_events, + unsigned int stage) -> bool +{ + auto saved_priority = std::numeric_limits<int>::min(); + auto all_done = true; + auto log_func = RSPAMD_LOG_FUNC; + auto compare_functor = +[](int a, int b) { return a < b; }; + + auto proc_func = [&](cache_item *item) { + /* + * We can safely ignore all pre/postfilters except idempotent ones and + * those that are marked as ignore passthrough result + */ + if (stage != RSPAMD_TASK_STAGE_IDEMPOTENT && + !(item->flags & SYMBOL_TYPE_IGNORE_PASSTHROUGH)) { + if (check_metric_limit(task)) { + msg_debug_cache_task_lambda("task has already the result being set, ignore further checks"); + + return true; + } + } + + auto dyn_item = get_dynamic_item(item->id); + + if (!dyn_item->started && !dyn_item->finished) { + if (has_slow) { + /* Delay */ + has_slow = false; + + return false; + } + + if (saved_priority == std::numeric_limits<int>::min()) { + saved_priority = item->priority; + } + else { + if (compare_functor(item->priority, saved_priority) && + rspamd_session_events_pending(task->s) > start_events) { + /* + * Delay further checks as we have higher + * priority filters to be processed + */ + return false; + } + } + + return process_symbol(task, cache, item, dyn_item); + } + + /* Continue processing */ + return true; + }; + + switch (stage) { + case RSPAMD_TASK_STAGE_CONNFILTERS: + all_done = cache.connfilters_foreach(proc_func); + break; + case RSPAMD_TASK_STAGE_PRE_FILTERS: + all_done = cache.prefilters_foreach(proc_func); + break; + case RSPAMD_TASK_STAGE_POST_FILTERS: + compare_functor = +[](int a, int b) { return a > b; }; + all_done = cache.postfilters_foreach(proc_func); + break; + case RSPAMD_TASK_STAGE_IDEMPOTENT: + compare_functor = +[](int a, int b) { return a > b; }; + all_done = cache.idempotent_foreach(proc_func); + break; + default: + g_error("invalid invocation"); + break; + } + + return all_done; +} + +auto symcache_runtime::process_filters(struct rspamd_task *task, symcache &cache, int start_events) -> bool +{ + auto all_done = true; + auto log_func = RSPAMD_LOG_FUNC; + auto has_passtrough = false; + + for (const auto [idx, item]: rspamd::enumerate(order->d)) { + /* Exclude all non filters */ + if (item->type != symcache_item_type::FILTER) { + /* + * We use breaking the loop as we append non-filters to the end of the list + * so, it is safe to stop processing immediately + */ + break; + } + + if (!(item->flags & (SYMBOL_TYPE_FINE | SYMBOL_TYPE_IGNORE_PASSTHROUGH))) { + if (has_passtrough || check_metric_limit(task)) { + msg_debug_cache_task_lambda("task has already the result being set, ignore further checks"); + has_passtrough = true; + /* Skip this item */ + continue; + } + } + + auto dyn_item = &dynamic_items[idx]; + + if (!dyn_item->started) { + all_done = false; + + if (!check_item_deps(task, cache, item.get(), + dyn_item, false)) { + msg_debug_cache_task("blocked execution of %d(%s) unless deps are " + "resolved", + item->id, item->symbol.c_str()); + + continue; + } + + process_symbol(task, cache, item.get(), dyn_item); + + if (has_slow) { + /* Delay */ + has_slow = false; + + return false; + } + } + } + + return all_done; +} + +auto symcache_runtime::process_symbol(struct rspamd_task *task, symcache &cache, cache_item *item, + cache_dynamic_item *dyn_item) -> bool +{ + if (item->type == symcache_item_type::CLASSIFIER || item->type == symcache_item_type::COMPOSITE) { + /* Classifiers are special :( */ + return true; + } + + if (rspamd_session_blocked(task->s)) { + /* + * We cannot add new events as session is either destroyed or + * being cleaned up. + */ + return true; + } + + g_assert(!item->is_virtual()); + if (dyn_item->started) { + /* + * This can actually happen when deps span over different layers + */ + return dyn_item->finished; + } + + /* Check has been started */ + dyn_item->started = true; + auto check = true; + + if (!item->is_allowed(task, true) || !item->check_conditions(task)) { + check = false; + } + + if (check) { + msg_debug_cache_task("execute %s, %d; symbol type = %s", item->symbol.data(), + item->id, item_type_to_str(item->type)); + + if (profile) { + ev_now_update_if_cheap(task->event_loop); + dyn_item->start_msec = (ev_now(task->event_loop) - + profile_start) * + 1e3; + } + dyn_item->async_events = 0; + cur_item = dyn_item; + items_inflight++; + /* Callback now must finalize itself */ + item->call(task, dyn_item); + cur_item = nullptr; + + if (items_inflight == 0) { + return true; + } + + if (dyn_item->async_events == 0 && !dyn_item->finished) { + msg_err_cache_task("critical error: item %s has no async events pending, " + "but it is not finalised", + item->symbol.data()); + g_assert_not_reached(); + } + + return false; + } + else { + dyn_item->finished = true; + } + + return true; +} + +auto symcache_runtime::check_metric_limit(struct rspamd_task *task) -> bool +{ + if (task->flags & RSPAMD_TASK_FLAG_PASS_ALL) { + return false; + } + + /* Check score limit */ + if (!std::isnan(lim)) { + if (task->result->score > lim) { + return true; + } + } + + if (task->result->passthrough_result != nullptr) { + /* We also need to check passthrough results */ + auto *pr = task->result->passthrough_result; + DL_FOREACH(task->result->passthrough_result, pr) + { + struct rspamd_action_config *act_config = + rspamd_find_action_config_for_action(task->result, pr->action); + + /* Skip least results */ + if (pr->flags & RSPAMD_PASSTHROUGH_LEAST) { + continue; + } + + /* Skip disabled actions */ + if (act_config && (act_config->flags & RSPAMD_ACTION_RESULT_DISABLED)) { + continue; + } + + /* Immediately stop on non least passthrough action */ + return true; + } + } + + return false; +} + +auto symcache_runtime::check_item_deps(struct rspamd_task *task, symcache &cache, cache_item *item, + cache_dynamic_item *dyn_item, bool check_only) -> bool +{ + constexpr const auto max_recursion = 20; + auto log_func = RSPAMD_LOG_FUNC; + + auto inner_functor = [&](int recursion, cache_item *item, cache_dynamic_item *dyn_item, auto rec_functor) -> bool { + if (recursion > max_recursion) { + msg_err_task_lambda("cyclic dependencies: maximum check level %ud exceed when " + "checking dependencies for %s", + max_recursion, item->symbol.c_str()); + + return true; + } + + auto ret = true; + + for (const auto &dep: item->deps) { + if (!dep.item) { + /* Assume invalid deps as done */ + msg_debug_cache_task_lambda("symbol %d(%s) has invalid dependencies on %d(%s)", + item->id, item->symbol.c_str(), dep.id, dep.sym.c_str()); + continue; + } + + auto *dep_dyn_item = get_dynamic_item(dep.item->id); + + if (!dep_dyn_item->finished) { + if (!dep_dyn_item->started) { + /* Not started */ + if (!check_only) { + if (!rec_functor(recursion + 1, + dep.item, + dep_dyn_item, + rec_functor)) { + + ret = false; + msg_debug_cache_task_lambda("delayed dependency %d(%s) for " + "symbol %d(%s)", + dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); + } + else if (!process_symbol(task, cache, dep.item, dep_dyn_item)) { + /* Now started, but has events pending */ + ret = false; + msg_debug_cache_task_lambda("started check of %d(%s) symbol " + "as dep for " + "%d(%s)", + dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); + } + else { + msg_debug_cache_task_lambda("dependency %d(%s) for symbol %d(%s) is " + "already processed", + dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); + } + } + else { + msg_debug_cache_task_lambda("dependency %d(%s) for symbol %d(%s) " + "cannot be started now", + dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); + ret = false; + } + } + else { + /* Started but not finished */ + msg_debug_cache_task_lambda("dependency %d(%s) for symbol %d(%s) is " + "still executing", + dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); + ret = false; + } + } + else { + msg_debug_cache_task_lambda("dependency %d(%s) for symbol %d(%s) is already " + "checked", + dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); + } + } + + return ret; + }; + + return inner_functor(0, item, dyn_item, inner_functor); +} + + +struct rspamd_symcache_delayed_cbdata { + cache_item *item; + struct rspamd_task *task; + symcache_runtime *runtime; + struct rspamd_async_event *event; + struct ev_timer tm; +}; + +static void +rspamd_symcache_delayed_item_fin(gpointer ud) +{ + auto *cbd = (struct rspamd_symcache_delayed_cbdata *) ud; + + cbd->event = nullptr; + cbd->runtime->unset_slow(); + ev_timer_stop(cbd->task->event_loop, &cbd->tm); +} + +static void +rspamd_symcache_delayed_item_cb(EV_P_ ev_timer *w, int what) +{ + auto *cbd = (struct rspamd_symcache_delayed_cbdata *) w->data; + + if (cbd->event) { + cbd->event = nullptr; + + /* Timer will be stopped here */ + rspamd_session_remove_event(cbd->task->s, + rspamd_symcache_delayed_item_fin, cbd); + + cbd->runtime->process_item_rdeps(cbd->task, cbd->item); + } +} + +static void +rspamd_delayed_timer_dtor(gpointer d) +{ + auto *cbd = (struct rspamd_symcache_delayed_cbdata *) d; + + if (cbd->event) { + /* Event has not been executed, this will also stop a timer */ + rspamd_session_remove_event(cbd->task->s, + rspamd_symcache_delayed_item_fin, cbd); + cbd->event = nullptr; + } +} + +auto symcache_runtime::finalize_item(struct rspamd_task *task, cache_dynamic_item *dyn_item) -> void +{ + /* Limit to consider a rule as slow (in milliseconds) */ + constexpr const gdouble slow_diff_limit = 300; + auto *item = get_item_by_dynamic_item(dyn_item); + /* Sanity checks */ + g_assert(items_inflight > 0); + g_assert(item != nullptr); + + if (dyn_item->async_events > 0) { + /* + * XXX: Race condition + * + * It is possible that some async event is still in flight, but we + * already know its result, however, it is the responsibility of that + * event to decrease async events count and call this function + * one more time + */ + msg_debug_cache_task("postpone finalisation of %s(%d) as there are %d " + "async events pending", + item->symbol.c_str(), item->id, dyn_item->async_events); + + return; + } + + msg_debug_cache_task("process finalize for item %s(%d)", item->symbol.c_str(), item->id); + dyn_item->finished = true; + items_inflight--; + cur_item = nullptr; + + auto enable_slow_timer = [&]() -> bool { + auto *cbd = rspamd_mempool_alloc0_type(task->task_pool, rspamd_symcache_delayed_cbdata); + /* Add timer to allow something else to be executed */ + ev_timer *tm = &cbd->tm; + + cbd->event = rspamd_session_add_event(task->s, + rspamd_symcache_delayed_item_fin, cbd, + "symcache"); + cbd->runtime = this; + + /* + * If no event could be added, then we are already in the destruction + * phase. So the main issue is to deal with has slow here + */ + if (cbd->event) { + ev_timer_init(tm, rspamd_symcache_delayed_item_cb, 0.1, 0.0); + ev_set_priority(tm, EV_MINPRI); + rspamd_mempool_add_destructor(task->task_pool, + rspamd_delayed_timer_dtor, cbd); + + cbd->task = task; + cbd->item = item; + tm->data = cbd; + ev_timer_start(task->event_loop, tm); + } + else { + /* Just reset as no timer is added */ + has_slow = FALSE; + return false; + } + + return true; + }; + + if (profile) { + ev_now_update_if_cheap(task->event_loop); + auto diff = ((ev_now(task->event_loop) - profile_start) * 1e3 - + dyn_item->start_msec); + + if (diff > slow_diff_limit) { + + if (!has_slow) { + has_slow = true; + + msg_info_task("slow rule: %s(%d): %.2f ms; enable slow timer delay", + item->symbol.c_str(), item->id, + diff); + + if (enable_slow_timer()) { + /* Allow network execution */ + return; + } + } + else { + msg_info_task("slow rule: %s(%d): %.2f ms", + item->symbol.c_str(), item->id, + diff); + } + } + + if (G_UNLIKELY(RSPAMD_TASK_IS_PROFILING(task))) { + rspamd_task_profile_set(task, item->symbol.c_str(), diff); + } + + if (rspamd_worker_is_scanner(task->worker)) { + rspamd_set_counter(item->cd, diff); + } + } + + process_item_rdeps(task, item); +} + +auto symcache_runtime::process_item_rdeps(struct rspamd_task *task, cache_item *item) -> void +{ + auto *cache_ptr = reinterpret_cast<symcache *>(task->cfg->cache); + + // Avoid race condition with the runtime destruction and the delay timer + if (!order) { + return; + } + + for (const auto &rdep: item->rdeps) { + if (rdep.item) { + auto *dyn_item = get_dynamic_item(rdep.item->id); + if (!dyn_item->started) { + msg_debug_cache_task("check item %d(%s) rdep of %s ", + rdep.item->id, rdep.item->symbol.c_str(), item->symbol.c_str()); + + if (!check_item_deps(task, *cache_ptr, rdep.item, dyn_item, false)) { + msg_debug_cache_task("blocked execution of %d(%s) rdep of %s " + "unless deps are resolved", + rdep.item->id, rdep.item->symbol.c_str(), item->symbol.c_str()); + } + else { + process_symbol(task, *cache_ptr, rdep.item, + dyn_item); + } + } + } + } +} + +auto symcache_runtime::get_item_by_dynamic_item(cache_dynamic_item *dyn_item) const -> cache_item * +{ + auto idx = dyn_item - dynamic_items; + + if (idx >= 0 && idx < order->size()) { + return order->d[idx].get(); + } + + msg_err("internal error: invalid index to get: %d", (int) idx); + + return nullptr; +} + +}// namespace rspamd::symcache diff --git a/src/libserver/symcache/symcache_runtime.hxx b/src/libserver/symcache/symcache_runtime.hxx new file mode 100644 index 0000000..aa8f66c --- /dev/null +++ b/src/libserver/symcache/symcache_runtime.hxx @@ -0,0 +1,209 @@ +/* + * Copyright 2023 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +/** + * Symcache runtime is produced for each task and it consists of symbols + * being executed, being dynamically disabled/enabled and it also captures + * the current order of the symbols (produced by resort periodic) + */ + +#ifndef RSPAMD_SYMCACHE_RUNTIME_HXX +#define RSPAMD_SYMCACHE_RUNTIME_HXX +#pragma once + +#include "symcache_internal.hxx" + +struct rspamd_scan_result; + +namespace rspamd::symcache { +/** + * These items are saved within task structure and are used to track + * symbols execution. + * Each symcache item occupies a single dynamic item, that currently has 8 bytes + * length + */ +struct cache_dynamic_item { + std::uint16_t start_msec; /* Relative to task time */ + bool started; + bool finished; + std::uint32_t async_events; +}; + +static_assert(sizeof(cache_dynamic_item) == sizeof(std::uint64_t)); +static_assert(std::is_trivial_v<cache_dynamic_item>); + +class symcache_runtime { + unsigned items_inflight; + bool profile; + bool has_slow; + + double profile_start; + double lim; + + struct cache_dynamic_item *cur_item; + order_generation_ptr order; + /* Dynamically expanded as needed */ + mutable struct cache_dynamic_item dynamic_items[]; + /* We allocate this structure merely in memory pool, so destructor is absent */ + ~symcache_runtime() = delete; + + auto process_symbol(struct rspamd_task *task, symcache &cache, cache_item *item, + cache_dynamic_item *dyn_item) -> bool; + /* Specific stages of the processing */ + auto process_pre_postfilters(struct rspamd_task *task, symcache &cache, int start_events, unsigned int stage) -> bool; + auto process_filters(struct rspamd_task *task, symcache &cache, int start_events) -> bool; + auto check_metric_limit(struct rspamd_task *task) -> bool; + auto check_item_deps(struct rspamd_task *task, symcache &cache, cache_item *item, + cache_dynamic_item *dyn_item, bool check_only) -> bool; + +public: + /* Dropper for a shared ownership */ + auto savepoint_dtor() -> void + { + + /* Drop shared ownership */ + order.reset(); + } + /** + * Creates a cache runtime using task mempool + * @param task + * @param cache + * @return + */ + static auto create(struct rspamd_task *task, symcache &cache) -> symcache_runtime *; + /** + * Process task settings + * @param task + * @return + */ + auto process_settings(struct rspamd_task *task, const symcache &cache) -> bool; + + /** + * Disable all symbols but not touching ones that are in the specific mask + * @param skip_mask + */ + auto disable_all_symbols(int skip_mask) -> void; + + /** + * Disable a symbol (or it's parent) + * @param name + * @return + */ + auto disable_symbol(struct rspamd_task *task, const symcache &cache, std::string_view name) -> bool; + + /** + * Enable a symbol (or it's parent) + * @param name + * @return + */ + auto enable_symbol(struct rspamd_task *task, const symcache &cache, std::string_view name) -> bool; + + /** + * Checks if an item has been checked/disabled + * @param cache + * @param name + * @return + */ + auto is_symbol_checked(const symcache &cache, std::string_view name) -> bool; + + /** + * Checks if a symbol is enabled for execution, checking all pending conditions + * @param task + * @param cache + * @param name + * @return + */ + auto is_symbol_enabled(struct rspamd_task *task, const symcache &cache, std::string_view name) -> bool; + + /** + * Get the current processed item + * @return + */ + auto get_cur_item() const -> auto + { + return cur_item; + } + + /** + * Set the current processed item + * @param item + * @return + */ + auto set_cur_item(cache_dynamic_item *item) -> auto + { + std::swap(item, cur_item); + return item; + } + + /** + * Set profile mode for the runtime + * @param enable + * @return + */ + auto set_profile_mode(bool enable) -> auto + { + std::swap(profile, enable); + return enable; + } + + /** + * Returns the dynamic item by static item id + * @param id + * @return + */ + auto get_dynamic_item(int id) const -> cache_dynamic_item *; + + /** + * Returns static cache item by dynamic cache item + * @return + */ + auto get_item_by_dynamic_item(cache_dynamic_item *) const -> cache_item *; + + /** + * Process symbols in the cache + * @param task + * @param cache + * @param stage + * @return + */ + auto process_symbols(struct rspamd_task *task, symcache &cache, unsigned int stage) -> bool; + + /** + * Finalize execution of some item in the cache + * @param task + * @param item + */ + auto finalize_item(struct rspamd_task *task, cache_dynamic_item *item) -> void; + + /** + * Process unblocked reverse dependencies of the specific item + * @param task + * @param item + */ + auto process_item_rdeps(struct rspamd_task *task, cache_item *item) -> void; + + /* XXX: a helper to allow hiding internal implementation of the slow timer structure */ + auto unset_slow() -> void + { + has_slow = false; + } +}; + + +}// namespace rspamd::symcache + +#endif//RSPAMD_SYMCACHE_RUNTIME_HXX |