diff options
Diffstat (limited to 'src/libserver/roll_history.c')
-rw-r--r-- | src/libserver/roll_history.c | 432 |
1 files changed, 432 insertions, 0 deletions
diff --git a/src/libserver/roll_history.c b/src/libserver/roll_history.c new file mode 100644 index 0000000..f567b0b --- /dev/null +++ b/src/libserver/roll_history.c @@ -0,0 +1,432 @@ +/*- + * Copyright 2016 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "config.h" +#include "rspamd.h" +#include "libmime/message.h" +#include "lua/lua_common.h" +#include "unix-std.h" +#include "cfg_file_private.h" + +static const gchar rspamd_history_magic_old[] = {'r', 's', 'h', '1'}; + +/** + * Returns new roll history + * @param pool pool for shared memory + * @return new structure + */ +struct roll_history * +rspamd_roll_history_new(rspamd_mempool_t *pool, guint max_rows, + struct rspamd_config *cfg) +{ + struct roll_history *history; + lua_State *L = cfg->lua_state; + + if (pool == NULL || max_rows == 0) { + return NULL; + } + + history = rspamd_mempool_alloc0_shared(pool, sizeof(struct roll_history)); + + /* + * Here, we check if there is any plugin that handles history, + * in this case, we disable this code completely + */ + lua_getglobal(L, "rspamd_plugins"); + if (lua_istable(L, -1)) { + lua_pushstring(L, "history"); + lua_gettable(L, -2); + + if (lua_istable(L, -1)) { + history->disabled = TRUE; + } + + lua_pop(L, 1); + } + + lua_pop(L, 1); + + if (!history->disabled) { + history->rows = rspamd_mempool_alloc0_shared(pool, + sizeof(struct roll_history_row) * max_rows); + history->nrows = max_rows; + } + + return history; +} + +struct history_metric_callback_data { + gchar *pos; + gint remain; +}; + +static void +roll_history_symbols_callback(gpointer key, gpointer value, void *user_data) +{ + struct history_metric_callback_data *cb = user_data; + struct rspamd_symbol_result *s = value; + guint wr; + + if (s->flags & RSPAMD_SYMBOL_RESULT_IGNORED) { + return; + } + + if (cb->remain > 0) { + wr = rspamd_snprintf(cb->pos, cb->remain, "%s, ", s->name); + cb->pos += wr; + cb->remain -= wr; + } +} + +/** + * Update roll history with data from task + * @param history roll history object + * @param task task object + */ +void rspamd_roll_history_update(struct roll_history *history, + struct rspamd_task *task) +{ + guint row_num; + struct roll_history_row *row; + struct rspamd_scan_result *metric_res; + struct history_metric_callback_data cbdata; + struct rspamd_action *action; + + if (history->disabled) { + return; + } + + /* First of all obtain check and obtain row number */ + g_atomic_int_compare_and_exchange(&history->cur_row, history->nrows, 0); +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30)) + row_num = g_atomic_int_add(&history->cur_row, 1); +#else + row_num = g_atomic_int_exchange_and_add(&history->cur_row, 1); +#endif + + if (row_num < history->nrows) { + row = &history->rows[row_num]; + g_atomic_int_set(&row->completed, FALSE); + } + else { + /* Race condition */ + history->cur_row = 0; + return; + } + + /* Add information from task to roll history */ + if (task->from_addr) { + rspamd_strlcpy(row->from_addr, + rspamd_inet_address_to_string(task->from_addr), + sizeof(row->from_addr)); + } + else { + rspamd_strlcpy(row->from_addr, "unknown", sizeof(row->from_addr)); + } + + row->timestamp = task->task_timestamp; + + /* Strings */ + if (task->message) { + rspamd_strlcpy(row->message_id, MESSAGE_FIELD(task, message_id), + sizeof(row->message_id)); + } + if (task->auth_user) { + rspamd_strlcpy(row->user, task->auth_user, sizeof(row->user)); + } + else { + row->user[0] = '\0'; + } + + /* Get default metric */ + metric_res = task->result; + + if (metric_res == NULL) { + row->symbols[0] = '\0'; + row->action = METRIC_ACTION_NOACTION; + } + else { + row->score = metric_res->score; + action = rspamd_check_action_metric(task, NULL, NULL); + row->action = action->action_type; + row->required_score = rspamd_task_get_required_score(task, metric_res); + cbdata.pos = row->symbols; + cbdata.remain = sizeof(row->symbols); + rspamd_task_symbol_result_foreach(task, NULL, + roll_history_symbols_callback, + &cbdata); + if (cbdata.remain > 0) { + /* Remove last whitespace and comma */ + *cbdata.pos-- = '\0'; + *cbdata.pos-- = '\0'; + *cbdata.pos = '\0'; + } + } + + row->scan_time = task->time_real_finish - task->task_timestamp; + row->len = task->msg.len; + g_atomic_int_set(&row->completed, TRUE); +} + +/** + * Load previously saved history from file + * @param history roll history object + * @param filename filename to load from + * @return TRUE if history has been loaded + */ +gboolean +rspamd_roll_history_load(struct roll_history *history, const gchar *filename) +{ + gint fd; + struct stat st; + gchar magic[sizeof(rspamd_history_magic_old)]; + ucl_object_t *top; + const ucl_object_t *cur, *elt; + struct ucl_parser *parser; + struct roll_history_row *row; + guint n, i; + + g_assert(history != NULL); + if (history->disabled) { + return TRUE; + } + + if (stat(filename, &st) == -1) { + msg_info("cannot load history from %s: %s", filename, + strerror(errno)); + return FALSE; + } + + if ((fd = open(filename, O_RDONLY)) == -1) { + msg_info("cannot load history from %s: %s", filename, + strerror(errno)); + return FALSE; + } + + /* Check for old format */ + if (read(fd, magic, sizeof(magic)) == -1) { + close(fd); + msg_info("cannot read history from %s: %s", filename, + strerror(errno)); + return FALSE; + } + + if (memcmp(magic, rspamd_history_magic_old, sizeof(magic)) == 0) { + close(fd); + msg_warn("cannot read history from old format %s, " + "it will be replaced after restart", + filename); + return FALSE; + } + + parser = ucl_parser_new(0); + + if (!ucl_parser_add_fd(parser, fd)) { + msg_warn("cannot parse history file %s: %s", filename, + ucl_parser_get_error(parser)); + ucl_parser_free(parser); + close(fd); + + return FALSE; + } + + top = ucl_parser_get_object(parser); + ucl_parser_free(parser); + close(fd); + + if (top == NULL) { + msg_warn("cannot parse history file %s: no object", filename); + + return FALSE; + } + + if (ucl_object_type(top) != UCL_ARRAY) { + msg_warn("invalid object type read from: %s", filename); + ucl_object_unref(top); + + return FALSE; + } + + if (top->len > history->nrows) { + msg_warn("stored history is larger than the current one: %ud (file) vs " + "%ud (history)", + top->len, history->nrows); + n = history->nrows; + } + else if (top->len < history->nrows) { + msg_warn( + "stored history is smaller than the current one: %ud (file) vs " + "%ud (history)", + top->len, history->nrows); + n = top->len; + } + else { + n = top->len; + } + + for (i = 0; i < n; i++) { + cur = ucl_array_find_index(top, i); + + if (cur != NULL && ucl_object_type(cur) == UCL_OBJECT) { + row = &history->rows[i]; + memset(row, 0, sizeof(*row)); + + elt = ucl_object_lookup(cur, "time"); + + if (elt && ucl_object_type(elt) == UCL_FLOAT) { + row->timestamp = ucl_object_todouble(elt); + } + + elt = ucl_object_lookup(cur, "id"); + + if (elt && ucl_object_type(elt) == UCL_STRING) { + rspamd_strlcpy(row->message_id, ucl_object_tostring(elt), + sizeof(row->message_id)); + } + + elt = ucl_object_lookup(cur, "symbols"); + + if (elt && ucl_object_type(elt) == UCL_STRING) { + rspamd_strlcpy(row->symbols, ucl_object_tostring(elt), + sizeof(row->symbols)); + } + + elt = ucl_object_lookup(cur, "user"); + + if (elt && ucl_object_type(elt) == UCL_STRING) { + rspamd_strlcpy(row->user, ucl_object_tostring(elt), + sizeof(row->user)); + } + + elt = ucl_object_lookup(cur, "from"); + + if (elt && ucl_object_type(elt) == UCL_STRING) { + rspamd_strlcpy(row->from_addr, ucl_object_tostring(elt), + sizeof(row->from_addr)); + } + + elt = ucl_object_lookup(cur, "len"); + + if (elt && ucl_object_type(elt) == UCL_INT) { + row->len = ucl_object_toint(elt); + } + + elt = ucl_object_lookup(cur, "scan_time"); + + if (elt && ucl_object_type(elt) == UCL_FLOAT) { + row->scan_time = ucl_object_todouble(elt); + } + + elt = ucl_object_lookup(cur, "score"); + + if (elt && ucl_object_type(elt) == UCL_FLOAT) { + row->score = ucl_object_todouble(elt); + } + + elt = ucl_object_lookup(cur, "required_score"); + + if (elt && ucl_object_type(elt) == UCL_FLOAT) { + row->required_score = ucl_object_todouble(elt); + } + + elt = ucl_object_lookup(cur, "action"); + + if (elt && ucl_object_type(elt) == UCL_INT) { + row->action = ucl_object_toint(elt); + } + + row->completed = TRUE; + } + } + + ucl_object_unref(top); + + history->cur_row = n; + + return TRUE; +} + +/** + * Save history to file + * @param history roll history object + * @param filename filename to load from + * @return TRUE if history has been saved + */ +gboolean +rspamd_roll_history_save(struct roll_history *history, const gchar *filename) +{ + gint fd; + FILE *fp; + ucl_object_t *obj, *elt; + guint i; + struct roll_history_row *row; + struct ucl_emitter_functions *emitter_func; + + g_assert(history != NULL); + + if (history->disabled) { + return TRUE; + } + + if ((fd = open(filename, O_WRONLY | O_CREAT | O_TRUNC, 00600)) == -1) { + msg_info("cannot save history to %s: %s", filename, strerror(errno)); + return FALSE; + } + + fp = fdopen(fd, "w"); + obj = ucl_object_typed_new(UCL_ARRAY); + + for (i = 0; i < history->nrows; i++) { + row = &history->rows[i]; + + if (!row->completed) { + continue; + } + + elt = ucl_object_typed_new(UCL_OBJECT); + + ucl_object_insert_key(elt, ucl_object_fromdouble(row->timestamp), + "time", 0, false); + ucl_object_insert_key(elt, ucl_object_fromstring(row->message_id), + "id", 0, false); + ucl_object_insert_key(elt, ucl_object_fromstring(row->symbols), + "symbols", 0, false); + ucl_object_insert_key(elt, ucl_object_fromstring(row->user), + "user", 0, false); + ucl_object_insert_key(elt, ucl_object_fromstring(row->from_addr), + "from", 0, false); + ucl_object_insert_key(elt, ucl_object_fromint(row->len), + "len", 0, false); + ucl_object_insert_key(elt, ucl_object_fromdouble(row->scan_time), + "scan_time", 0, false); + ucl_object_insert_key(elt, ucl_object_fromdouble(row->score), + "score", 0, false); + ucl_object_insert_key(elt, ucl_object_fromdouble(row->required_score), + "required_score", 0, false); + ucl_object_insert_key(elt, ucl_object_fromint(row->action), + "action", 0, false); + + ucl_array_append(obj, elt); + } + + emitter_func = ucl_object_emit_file_funcs(fp); + ucl_object_emit_full(obj, UCL_EMIT_JSON_COMPACT, emitter_func, NULL); + ucl_object_emit_funcs_free(emitter_func); + ucl_object_unref(obj); + + fclose(fp); + + return TRUE; +} |