diff options
Diffstat (limited to 'src/libserver/fuzzy_backend')
-rw-r--r-- | src/libserver/fuzzy_backend/fuzzy_backend.c | 560 | ||||
-rw-r--r-- | src/libserver/fuzzy_backend/fuzzy_backend.h | 131 | ||||
-rw-r--r-- | src/libserver/fuzzy_backend/fuzzy_backend_redis.c | 1666 | ||||
-rw-r--r-- | src/libserver/fuzzy_backend/fuzzy_backend_redis.h | 67 | ||||
-rw-r--r-- | src/libserver/fuzzy_backend/fuzzy_backend_sqlite.c | 1029 | ||||
-rw-r--r-- | src/libserver/fuzzy_backend/fuzzy_backend_sqlite.h | 107 |
6 files changed, 3560 insertions, 0 deletions
diff --git a/src/libserver/fuzzy_backend/fuzzy_backend.c b/src/libserver/fuzzy_backend/fuzzy_backend.c new file mode 100644 index 0000000..9099f38 --- /dev/null +++ b/src/libserver/fuzzy_backend/fuzzy_backend.c @@ -0,0 +1,560 @@ +/*- + * Copyright 2016 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "config.h" +#include "fuzzy_backend.h" +#include "fuzzy_backend_sqlite.h" +#include "fuzzy_backend_redis.h" +#include "cfg_file.h" +#include "fuzzy_wire.h" + +#define DEFAULT_EXPIRE 172800L + +enum rspamd_fuzzy_backend_type { + RSPAMD_FUZZY_BACKEND_SQLITE = 0, + RSPAMD_FUZZY_BACKEND_REDIS = 1, +}; + +static void *rspamd_fuzzy_backend_init_sqlite(struct rspamd_fuzzy_backend *bk, + const ucl_object_t *obj, struct rspamd_config *cfg, GError **err); +static void rspamd_fuzzy_backend_check_sqlite(struct rspamd_fuzzy_backend *bk, + const struct rspamd_fuzzy_cmd *cmd, + rspamd_fuzzy_check_cb cb, void *ud, + void *subr_ud); +static void rspamd_fuzzy_backend_update_sqlite(struct rspamd_fuzzy_backend *bk, + GArray *updates, const gchar *src, + rspamd_fuzzy_update_cb cb, void *ud, + void *subr_ud); +static void rspamd_fuzzy_backend_count_sqlite(struct rspamd_fuzzy_backend *bk, + rspamd_fuzzy_count_cb cb, void *ud, + void *subr_ud); +static void rspamd_fuzzy_backend_version_sqlite(struct rspamd_fuzzy_backend *bk, + const gchar *src, + rspamd_fuzzy_version_cb cb, void *ud, + void *subr_ud); +static const gchar *rspamd_fuzzy_backend_id_sqlite(struct rspamd_fuzzy_backend *bk, + void *subr_ud); +static void rspamd_fuzzy_backend_expire_sqlite(struct rspamd_fuzzy_backend *bk, + void *subr_ud); +static void rspamd_fuzzy_backend_close_sqlite(struct rspamd_fuzzy_backend *bk, + void *subr_ud); + +struct rspamd_fuzzy_backend_subr { + void *(*init)(struct rspamd_fuzzy_backend *bk, const ucl_object_t *obj, + struct rspamd_config *cfg, + GError **err); + void (*check)(struct rspamd_fuzzy_backend *bk, + const struct rspamd_fuzzy_cmd *cmd, + rspamd_fuzzy_check_cb cb, void *ud, + void *subr_ud); + void (*update)(struct rspamd_fuzzy_backend *bk, + GArray *updates, const gchar *src, + rspamd_fuzzy_update_cb cb, void *ud, + void *subr_ud); + void (*count)(struct rspamd_fuzzy_backend *bk, + rspamd_fuzzy_count_cb cb, void *ud, + void *subr_ud); + void (*version)(struct rspamd_fuzzy_backend *bk, + const gchar *src, + rspamd_fuzzy_version_cb cb, void *ud, + void *subr_ud); + const gchar *(*id)(struct rspamd_fuzzy_backend *bk, void *subr_ud); + void (*periodic)(struct rspamd_fuzzy_backend *bk, void *subr_ud); + void (*close)(struct rspamd_fuzzy_backend *bk, void *subr_ud); +}; + +static const struct rspamd_fuzzy_backend_subr fuzzy_subrs[] = { + [RSPAMD_FUZZY_BACKEND_SQLITE] = { + .init = rspamd_fuzzy_backend_init_sqlite, + .check = rspamd_fuzzy_backend_check_sqlite, + .update = rspamd_fuzzy_backend_update_sqlite, + .count = rspamd_fuzzy_backend_count_sqlite, + .version = rspamd_fuzzy_backend_version_sqlite, + .id = rspamd_fuzzy_backend_id_sqlite, + .periodic = rspamd_fuzzy_backend_expire_sqlite, + .close = rspamd_fuzzy_backend_close_sqlite, + }, + [RSPAMD_FUZZY_BACKEND_REDIS] = { + .init = rspamd_fuzzy_backend_init_redis, + .check = rspamd_fuzzy_backend_check_redis, + .update = rspamd_fuzzy_backend_update_redis, + .count = rspamd_fuzzy_backend_count_redis, + .version = rspamd_fuzzy_backend_version_redis, + .id = rspamd_fuzzy_backend_id_redis, + .periodic = rspamd_fuzzy_backend_expire_redis, + .close = rspamd_fuzzy_backend_close_redis, + }}; + +struct rspamd_fuzzy_backend { + enum rspamd_fuzzy_backend_type type; + gdouble expire; + gdouble sync; + struct ev_loop *event_loop; + rspamd_fuzzy_periodic_cb periodic_cb; + void *periodic_ud; + const struct rspamd_fuzzy_backend_subr *subr; + void *subr_ud; + ev_timer periodic_event; +}; + +static GQuark +rspamd_fuzzy_backend_quark(void) +{ + return g_quark_from_static_string("fuzzy-backend"); +} + +static void * +rspamd_fuzzy_backend_init_sqlite(struct rspamd_fuzzy_backend *bk, + const ucl_object_t *obj, struct rspamd_config *cfg, GError **err) +{ + const ucl_object_t *elt; + + elt = ucl_object_lookup_any(obj, "hashfile", "hash_file", "file", + "database", NULL); + + if (elt == NULL || ucl_object_type(elt) != UCL_STRING) { + g_set_error(err, rspamd_fuzzy_backend_quark(), + EINVAL, "missing sqlite3 path"); + return NULL; + } + + return rspamd_fuzzy_backend_sqlite_open(ucl_object_tostring(elt), + FALSE, err); +} + +static void +rspamd_fuzzy_backend_check_sqlite(struct rspamd_fuzzy_backend *bk, + const struct rspamd_fuzzy_cmd *cmd, + rspamd_fuzzy_check_cb cb, void *ud, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_sqlite *sq = subr_ud; + struct rspamd_fuzzy_reply rep; + + rep = rspamd_fuzzy_backend_sqlite_check(sq, cmd, bk->expire); + + if (cb) { + cb(&rep, ud); + } +} + +static void +rspamd_fuzzy_backend_update_sqlite(struct rspamd_fuzzy_backend *bk, + GArray *updates, const gchar *src, + rspamd_fuzzy_update_cb cb, void *ud, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_sqlite *sq = subr_ud; + gboolean success = FALSE; + guint i; + struct fuzzy_peer_cmd *io_cmd; + struct rspamd_fuzzy_cmd *cmd; + gpointer ptr; + guint nupdates = 0, nadded = 0, ndeleted = 0, nextended = 0, nignored = 0; + + if (rspamd_fuzzy_backend_sqlite_prepare_update(sq, src)) { + for (i = 0; i < updates->len; i++) { + io_cmd = &g_array_index(updates, struct fuzzy_peer_cmd, i); + + if (io_cmd->is_shingle) { + cmd = &io_cmd->cmd.shingle.basic; + ptr = &io_cmd->cmd.shingle; + } + else { + cmd = &io_cmd->cmd.normal; + ptr = &io_cmd->cmd.normal; + } + + if (cmd->cmd == FUZZY_WRITE) { + rspamd_fuzzy_backend_sqlite_add(sq, ptr); + nadded++; + nupdates++; + } + else if (cmd->cmd == FUZZY_DEL) { + rspamd_fuzzy_backend_sqlite_del(sq, ptr); + ndeleted++; + nupdates++; + } + else { + if (cmd->cmd == FUZZY_REFRESH) { + nextended++; + } + else { + nignored++; + } + } + } + + if (rspamd_fuzzy_backend_sqlite_finish_update(sq, src, + nupdates > 0)) { + success = TRUE; + } + } + + if (cb) { + cb(success, nadded, ndeleted, nextended, nignored, ud); + } +} + +static void +rspamd_fuzzy_backend_count_sqlite(struct rspamd_fuzzy_backend *bk, + rspamd_fuzzy_count_cb cb, void *ud, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_sqlite *sq = subr_ud; + guint64 nhashes; + + nhashes = rspamd_fuzzy_backend_sqlite_count(sq); + + if (cb) { + cb(nhashes, ud); + } +} + +static void +rspamd_fuzzy_backend_version_sqlite(struct rspamd_fuzzy_backend *bk, + const gchar *src, + rspamd_fuzzy_version_cb cb, void *ud, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_sqlite *sq = subr_ud; + guint64 rev; + + rev = rspamd_fuzzy_backend_sqlite_version(sq, src); + + if (cb) { + cb(rev, ud); + } +} + +static const gchar * +rspamd_fuzzy_backend_id_sqlite(struct rspamd_fuzzy_backend *bk, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_sqlite *sq = subr_ud; + + return rspamd_fuzzy_sqlite_backend_id(sq); +} +static void +rspamd_fuzzy_backend_expire_sqlite(struct rspamd_fuzzy_backend *bk, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_sqlite *sq = subr_ud; + + rspamd_fuzzy_backend_sqlite_sync(sq, bk->expire, TRUE); +} + +static void +rspamd_fuzzy_backend_close_sqlite(struct rspamd_fuzzy_backend *bk, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_sqlite *sq = subr_ud; + + rspamd_fuzzy_backend_sqlite_close(sq); +} + + +struct rspamd_fuzzy_backend * +rspamd_fuzzy_backend_create(struct ev_loop *ev_base, + const ucl_object_t *config, + struct rspamd_config *cfg, + GError **err) +{ + struct rspamd_fuzzy_backend *bk; + enum rspamd_fuzzy_backend_type type = RSPAMD_FUZZY_BACKEND_SQLITE; + const ucl_object_t *elt; + gdouble expire = DEFAULT_EXPIRE; + + if (config != NULL) { + elt = ucl_object_lookup(config, "backend"); + + if (elt != NULL && ucl_object_type(elt) == UCL_STRING) { + if (strcmp(ucl_object_tostring(elt), "sqlite") == 0) { + type = RSPAMD_FUZZY_BACKEND_SQLITE; + } + else if (strcmp(ucl_object_tostring(elt), "redis") == 0) { + type = RSPAMD_FUZZY_BACKEND_REDIS; + } + else { + g_set_error(err, rspamd_fuzzy_backend_quark(), + EINVAL, "invalid backend type: %s", + ucl_object_tostring(elt)); + return NULL; + } + } + + elt = ucl_object_lookup(config, "expire"); + + if (elt != NULL) { + expire = ucl_object_todouble(elt); + } + } + + bk = g_malloc0(sizeof(*bk)); + bk->event_loop = ev_base; + bk->expire = expire; + bk->type = type; + bk->subr = &fuzzy_subrs[type]; + + if ((bk->subr_ud = bk->subr->init(bk, config, cfg, err)) == NULL) { + g_free(bk); + + return NULL; + } + + return bk; +} + + +void rspamd_fuzzy_backend_check(struct rspamd_fuzzy_backend *bk, + const struct rspamd_fuzzy_cmd *cmd, + rspamd_fuzzy_check_cb cb, void *ud) +{ + g_assert(bk != NULL); + + bk->subr->check(bk, cmd, cb, ud, bk->subr_ud); +} + +static guint +rspamd_fuzzy_digest_hash(gconstpointer key) +{ + guint ret; + + /* Distributed uniformly already */ + memcpy(&ret, key, sizeof(ret)); + + return ret; +} + +static gboolean +rspamd_fuzzy_digest_equal(gconstpointer v, gconstpointer v2) +{ + return memcmp(v, v2, rspamd_cryptobox_HASHBYTES) == 0; +} + +static void +rspamd_fuzzy_backend_deduplicate_queue(GArray *updates) +{ + GHashTable *seen = g_hash_table_new(rspamd_fuzzy_digest_hash, + rspamd_fuzzy_digest_equal); + struct fuzzy_peer_cmd *io_cmd, *found; + struct rspamd_fuzzy_cmd *cmd; + guchar *digest; + guint i; + + for (i = 0; i < updates->len; i++) { + io_cmd = &g_array_index(updates, struct fuzzy_peer_cmd, i); + + if (io_cmd->is_shingle) { + cmd = &io_cmd->cmd.shingle.basic; + } + else { + cmd = &io_cmd->cmd.normal; + } + + digest = cmd->digest; + + found = g_hash_table_lookup(seen, digest); + + if (found == NULL) { + /* Add to the seen list, if not a duplicate (huh?) */ + if (cmd->cmd != FUZZY_DUP) { + g_hash_table_insert(seen, digest, io_cmd); + } + } + else { + if (found->cmd.normal.flag != cmd->flag) { + /* TODO: deal with flags better at some point */ + continue; + } + + /* Apply heuristic */ + switch (cmd->cmd) { + case FUZZY_WRITE: + if (found->cmd.normal.cmd == FUZZY_WRITE) { + /* Already seen */ + found->cmd.normal.value += cmd->value; + cmd->cmd = FUZZY_DUP; /* Ignore this one */ + } + else if (found->cmd.normal.cmd == FUZZY_REFRESH) { + /* Seen refresh command, remove it as write has higher priority */ + g_hash_table_replace(seen, digest, io_cmd); + found->cmd.normal.cmd = FUZZY_DUP; + } + else if (found->cmd.normal.cmd == FUZZY_DEL) { + /* Request delete + add, weird, but ignore add */ + cmd->cmd = FUZZY_DUP; /* Ignore this one */ + } + break; + case FUZZY_REFRESH: + if (found->cmd.normal.cmd == FUZZY_WRITE) { + /* No need to expire, handled by addition */ + cmd->cmd = FUZZY_DUP; /* Ignore this one */ + } + else if (found->cmd.normal.cmd == FUZZY_DEL) { + /* Request delete + expire, ignore expire */ + cmd->cmd = FUZZY_DUP; /* Ignore this one */ + } + else if (found->cmd.normal.cmd == FUZZY_REFRESH) { + /* Already handled */ + cmd->cmd = FUZZY_DUP; /* Ignore this one */ + } + break; + case FUZZY_DEL: + /* Delete has priority over all other commands */ + g_hash_table_replace(seen, digest, io_cmd); + found->cmd.normal.cmd = FUZZY_DUP; + break; + default: + break; + } + } + } + + g_hash_table_unref(seen); +} + +void rspamd_fuzzy_backend_process_updates(struct rspamd_fuzzy_backend *bk, + GArray *updates, const gchar *src, rspamd_fuzzy_update_cb cb, + void *ud) +{ + g_assert(bk != NULL); + g_assert(updates != NULL); + + if (updates) { + rspamd_fuzzy_backend_deduplicate_queue(updates); + bk->subr->update(bk, updates, src, cb, ud, bk->subr_ud); + } + else if (cb) { + cb(TRUE, 0, 0, 0, 0, ud); + } +} + + +void rspamd_fuzzy_backend_count(struct rspamd_fuzzy_backend *bk, + rspamd_fuzzy_count_cb cb, void *ud) +{ + g_assert(bk != NULL); + + bk->subr->count(bk, cb, ud, bk->subr_ud); +} + + +void rspamd_fuzzy_backend_version(struct rspamd_fuzzy_backend *bk, + const gchar *src, + rspamd_fuzzy_version_cb cb, void *ud) +{ + g_assert(bk != NULL); + + bk->subr->version(bk, src, cb, ud, bk->subr_ud); +} + +const gchar * +rspamd_fuzzy_backend_id(struct rspamd_fuzzy_backend *bk) +{ + g_assert(bk != NULL); + + if (bk->subr->id) { + return bk->subr->id(bk, bk->subr_ud); + } + + return NULL; +} + +static inline void +rspamd_fuzzy_backend_periodic_sync(struct rspamd_fuzzy_backend *bk) +{ + if (bk->periodic_cb) { + if (bk->periodic_cb(bk->periodic_ud)) { + if (bk->subr->periodic) { + bk->subr->periodic(bk, bk->subr_ud); + } + } + } + else { + if (bk->subr->periodic) { + bk->subr->periodic(bk, bk->subr_ud); + } + } +} + +static void +rspamd_fuzzy_backend_periodic_cb(EV_P_ ev_timer *w, int revents) +{ + struct rspamd_fuzzy_backend *bk = (struct rspamd_fuzzy_backend *) w->data; + gdouble jittered; + + jittered = rspamd_time_jitter(bk->sync, bk->sync / 2.0); + w->repeat = jittered; + rspamd_fuzzy_backend_periodic_sync(bk); + ev_timer_again(EV_A_ w); +} + +void rspamd_fuzzy_backend_start_update(struct rspamd_fuzzy_backend *bk, + gdouble timeout, + rspamd_fuzzy_periodic_cb cb, + void *ud) +{ + gdouble jittered; + + g_assert(bk != NULL); + + if (bk->subr->periodic) { + if (bk->sync > 0.0) { + ev_timer_stop(bk->event_loop, &bk->periodic_event); + } + + if (cb) { + bk->periodic_cb = cb; + bk->periodic_ud = ud; + } + + rspamd_fuzzy_backend_periodic_sync(bk); + bk->sync = timeout; + jittered = rspamd_time_jitter(timeout, timeout / 2.0); + + bk->periodic_event.data = bk; + ev_timer_init(&bk->periodic_event, rspamd_fuzzy_backend_periodic_cb, + jittered, 0.0); + ev_timer_start(bk->event_loop, &bk->periodic_event); + } +} + +void rspamd_fuzzy_backend_close(struct rspamd_fuzzy_backend *bk) +{ + g_assert(bk != NULL); + + if (bk->sync > 0.0) { + rspamd_fuzzy_backend_periodic_sync(bk); + ev_timer_stop(bk->event_loop, &bk->periodic_event); + } + + bk->subr->close(bk, bk->subr_ud); + + g_free(bk); +} + +struct ev_loop * +rspamd_fuzzy_backend_event_base(struct rspamd_fuzzy_backend *backend) +{ + return backend->event_loop; +} + +gdouble +rspamd_fuzzy_backend_get_expire(struct rspamd_fuzzy_backend *backend) +{ + return backend->expire; +} diff --git a/src/libserver/fuzzy_backend/fuzzy_backend.h b/src/libserver/fuzzy_backend/fuzzy_backend.h new file mode 100644 index 0000000..a1b74bc --- /dev/null +++ b/src/libserver/fuzzy_backend/fuzzy_backend.h @@ -0,0 +1,131 @@ +/*- + * Copyright 2016 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef SRC_LIBSERVER_FUZZY_BACKEND_H_ +#define SRC_LIBSERVER_FUZZY_BACKEND_H_ + +#include "config.h" +#include "contrib/libev/ev.h" +#include "fuzzy_wire.h" + +#ifdef __cplusplus +extern "C" { +#endif + +struct rspamd_fuzzy_backend; +struct rspamd_config; + +/* + * Callbacks for fuzzy methods + */ +typedef void (*rspamd_fuzzy_check_cb)(struct rspamd_fuzzy_reply *rep, void *ud); + +typedef void (*rspamd_fuzzy_update_cb)(gboolean success, + guint nadded, + guint ndeleted, + guint nextended, + guint nignored, + void *ud); + +typedef void (*rspamd_fuzzy_version_cb)(guint64 rev, void *ud); + +typedef void (*rspamd_fuzzy_count_cb)(guint64 count, void *ud); + +typedef gboolean (*rspamd_fuzzy_periodic_cb)(void *ud); + +/** + * Open fuzzy backend + * @param ev_base + * @param config + * @param err + * @return + */ +struct rspamd_fuzzy_backend *rspamd_fuzzy_backend_create(struct ev_loop *ev_base, + const ucl_object_t *config, + struct rspamd_config *cfg, + GError **err); + + +/** + * Check a specific hash in storage + * @param cmd + * @param cb + * @param ud + */ +void rspamd_fuzzy_backend_check(struct rspamd_fuzzy_backend *bk, + const struct rspamd_fuzzy_cmd *cmd, + rspamd_fuzzy_check_cb cb, void *ud); + +/** + * Process updates for a specific queue + * @param bk + * @param updates queue of struct fuzzy_peer_cmd + * @param src + */ +void rspamd_fuzzy_backend_process_updates(struct rspamd_fuzzy_backend *bk, + GArray *updates, const gchar *src, rspamd_fuzzy_update_cb cb, + void *ud); + +/** + * Gets number of hashes from the backend + * @param bk + * @param cb + * @param ud + */ +void rspamd_fuzzy_backend_count(struct rspamd_fuzzy_backend *bk, + rspamd_fuzzy_count_cb cb, void *ud); + +/** + * Returns number of revision for a specific source + * @param bk + * @param src + * @param cb + * @param ud + */ +void rspamd_fuzzy_backend_version(struct rspamd_fuzzy_backend *bk, + const gchar *src, + rspamd_fuzzy_version_cb cb, void *ud); + +/** + * Returns unique id for backend + * @param backend + * @return + */ +const gchar *rspamd_fuzzy_backend_id(struct rspamd_fuzzy_backend *backend); + +/** + * Starts expire process for the backend + * @param backend + */ +void rspamd_fuzzy_backend_start_update(struct rspamd_fuzzy_backend *backend, + gdouble timeout, + rspamd_fuzzy_periodic_cb cb, + void *ud); + +struct ev_loop *rspamd_fuzzy_backend_event_base(struct rspamd_fuzzy_backend *backend); + +gdouble rspamd_fuzzy_backend_get_expire(struct rspamd_fuzzy_backend *backend); + +/** + * Closes backend + * @param backend + */ +void rspamd_fuzzy_backend_close(struct rspamd_fuzzy_backend *backend); + +#ifdef __cplusplus +} +#endif + +#endif /* SRC_LIBSERVER_FUZZY_BACKEND_H_ */ diff --git a/src/libserver/fuzzy_backend/fuzzy_backend_redis.c b/src/libserver/fuzzy_backend/fuzzy_backend_redis.c new file mode 100644 index 0000000..7ab7ca6 --- /dev/null +++ b/src/libserver/fuzzy_backend/fuzzy_backend_redis.c @@ -0,0 +1,1666 @@ +/* + * Copyright 2023 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "config.h" +#include "ref.h" +#include "fuzzy_backend.h" +#include "fuzzy_backend_redis.h" +#include "redis_pool.h" +#include "cryptobox.h" +#include "str_util.h" +#include "upstream.h" +#include "contrib/hiredis/hiredis.h" +#include "contrib/hiredis/async.h" +#include "lua/lua_common.h" + +#define REDIS_DEFAULT_PORT 6379 +#define REDIS_DEFAULT_OBJECT "fuzzy" +#define REDIS_DEFAULT_TIMEOUT 2.0 + +#define msg_err_redis_session(...) rspamd_default_log_function(G_LOG_LEVEL_CRITICAL, \ + "fuzzy_redis", session->backend->id, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_warn_redis_session(...) rspamd_default_log_function(G_LOG_LEVEL_WARNING, \ + "fuzzy_redis", session->backend->id, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_info_redis_session(...) rspamd_default_log_function(G_LOG_LEVEL_INFO, \ + "fuzzy_redis", session->backend->id, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_debug_redis_session(...) rspamd_conditional_debug_fast(NULL, NULL, \ + rspamd_fuzzy_redis_log_id, "fuzzy_redis", session->backend->id, \ + G_STRFUNC, \ + __VA_ARGS__) + +INIT_LOG_MODULE(fuzzy_redis) + +struct rspamd_fuzzy_backend_redis { + lua_State *L; + const gchar *redis_object; + const gchar *username; + const gchar *password; + const gchar *dbname; + gchar *id; + struct rspamd_redis_pool *pool; + gdouble timeout; + gint conf_ref; + bool terminated; + ref_entry_t ref; +}; + +enum rspamd_fuzzy_redis_command { + RSPAMD_FUZZY_REDIS_COMMAND_COUNT, + RSPAMD_FUZZY_REDIS_COMMAND_VERSION, + RSPAMD_FUZZY_REDIS_COMMAND_UPDATES, + RSPAMD_FUZZY_REDIS_COMMAND_CHECK +}; + +struct rspamd_fuzzy_redis_session { + struct rspamd_fuzzy_backend_redis *backend; + redisAsyncContext *ctx; + ev_timer timeout; + const struct rspamd_fuzzy_cmd *cmd; + struct ev_loop *event_loop; + float prob; + gboolean shingles_checked; + + enum rspamd_fuzzy_redis_command command; + guint nargs; + + guint nadded; + guint ndeleted; + guint nextended; + guint nignored; + + union { + rspamd_fuzzy_check_cb cb_check; + rspamd_fuzzy_update_cb cb_update; + rspamd_fuzzy_version_cb cb_version; + rspamd_fuzzy_count_cb cb_count; + } callback; + void *cbdata; + + gchar **argv; + gsize *argv_lens; + struct upstream *up; + guchar found_digest[rspamd_cryptobox_HASHBYTES]; +}; + +static inline struct upstream_list * +rspamd_redis_get_servers(struct rspamd_fuzzy_backend_redis *ctx, + const gchar *what) +{ + lua_State *L = ctx->L; + struct upstream_list *res = NULL; + + lua_rawgeti(L, LUA_REGISTRYINDEX, ctx->conf_ref); + lua_pushstring(L, what); + lua_gettable(L, -2); + + if (lua_type(L, -1) == LUA_TUSERDATA) { + res = *((struct upstream_list **) lua_touserdata(L, -1)); + } + else { + struct lua_logger_trace tr; + gchar outbuf[8192]; + + memset(&tr, 0, sizeof(tr)); + lua_logger_out_type(L, -2, outbuf, sizeof(outbuf) - 1, &tr, + LUA_ESCAPE_UNPRINTABLE); + + msg_err("cannot get %s upstreams for Redis fuzzy storage %s; table content: %s", + what, ctx->id, outbuf); + } + + lua_settop(L, 0); + + return res; +} + +static inline void +rspamd_fuzzy_redis_session_free_args(struct rspamd_fuzzy_redis_session *session) +{ + guint i; + + if (session->argv) { + for (i = 0; i < session->nargs; i++) { + g_free(session->argv[i]); + } + + g_free(session->argv); + g_free(session->argv_lens); + } +} +static void +rspamd_fuzzy_redis_session_dtor(struct rspamd_fuzzy_redis_session *session, + gboolean is_fatal) +{ + redisAsyncContext *ac; + + + if (session->ctx) { + ac = session->ctx; + session->ctx = NULL; + rspamd_redis_pool_release_connection(session->backend->pool, + ac, + is_fatal ? RSPAMD_REDIS_RELEASE_FATAL : RSPAMD_REDIS_RELEASE_DEFAULT); + } + + ev_timer_stop(session->event_loop, &session->timeout); + rspamd_fuzzy_redis_session_free_args(session); + + REF_RELEASE(session->backend); + rspamd_upstream_unref(session->up); + g_free(session); +} + +static void +rspamd_fuzzy_backend_redis_dtor(struct rspamd_fuzzy_backend_redis *backend) +{ + if (!backend->terminated && backend->conf_ref != -1) { + luaL_unref(backend->L, LUA_REGISTRYINDEX, backend->conf_ref); + } + + if (backend->id) { + g_free(backend->id); + } + + g_free(backend); +} + +void * +rspamd_fuzzy_backend_init_redis(struct rspamd_fuzzy_backend *bk, + const ucl_object_t *obj, struct rspamd_config *cfg, GError **err) +{ + struct rspamd_fuzzy_backend_redis *backend; + const ucl_object_t *elt; + gboolean ret = FALSE; + guchar id_hash[rspamd_cryptobox_HASHBYTES]; + rspamd_cryptobox_hash_state_t st; + lua_State *L = (lua_State *) cfg->lua_state; + gint conf_ref = -1; + + backend = g_malloc0(sizeof(*backend)); + + backend->timeout = REDIS_DEFAULT_TIMEOUT; + backend->redis_object = REDIS_DEFAULT_OBJECT; + backend->L = L; + + ret = rspamd_lua_try_load_redis(L, obj, cfg, &conf_ref); + + /* Now try global redis settings */ + if (!ret) { + elt = ucl_object_lookup(cfg->cfg_ucl_obj, "redis"); + + if (elt) { + const ucl_object_t *specific_obj; + + specific_obj = ucl_object_lookup_any(elt, "fuzzy", "fuzzy_storage", + NULL); + + if (specific_obj) { + ret = rspamd_lua_try_load_redis(L, specific_obj, cfg, &conf_ref); + } + else { + ret = rspamd_lua_try_load_redis(L, elt, cfg, &conf_ref); + } + } + } + + if (!ret) { + msg_err_config("cannot init redis backend for fuzzy storage"); + g_free(backend); + + return NULL; + } + + elt = ucl_object_lookup(obj, "prefix"); + if (elt == NULL || ucl_object_type(elt) != UCL_STRING) { + backend->redis_object = REDIS_DEFAULT_OBJECT; + } + else { + backend->redis_object = ucl_object_tostring(elt); + } + + backend->conf_ref = conf_ref; + + /* Check some common table values */ + lua_rawgeti(L, LUA_REGISTRYINDEX, conf_ref); + + lua_pushstring(L, "timeout"); + lua_gettable(L, -2); + if (lua_type(L, -1) == LUA_TNUMBER) { + backend->timeout = lua_tonumber(L, -1); + } + lua_pop(L, 1); + + lua_pushstring(L, "db"); + lua_gettable(L, -2); + if (lua_type(L, -1) == LUA_TSTRING) { + backend->dbname = rspamd_mempool_strdup(cfg->cfg_pool, + lua_tostring(L, -1)); + } + lua_pop(L, 1); + + lua_pushstring(L, "username"); + lua_gettable(L, -2); + if (lua_type(L, -1) == LUA_TSTRING) { + backend->username = rspamd_mempool_strdup(cfg->cfg_pool, + lua_tostring(L, -1)); + } + lua_pop(L, 1); + + lua_pushstring(L, "password"); + lua_gettable(L, -2); + if (lua_type(L, -1) == LUA_TSTRING) { + backend->password = rspamd_mempool_strdup(cfg->cfg_pool, + lua_tostring(L, -1)); + } + lua_pop(L, 1); + + lua_settop(L, 0); + + REF_INIT_RETAIN(backend, rspamd_fuzzy_backend_redis_dtor); + backend->pool = cfg->redis_pool; + rspamd_cryptobox_hash_init(&st, NULL, 0); + rspamd_cryptobox_hash_update(&st, backend->redis_object, + strlen(backend->redis_object)); + + if (backend->dbname) { + rspamd_cryptobox_hash_update(&st, backend->dbname, + strlen(backend->dbname)); + } + + if (backend->username) { + rspamd_cryptobox_hash_update(&st, backend->username, + strlen(backend->username)); + } + + if (backend->password) { + rspamd_cryptobox_hash_update(&st, backend->password, + strlen(backend->password)); + } + + rspamd_cryptobox_hash_final(&st, id_hash); + backend->id = rspamd_encode_base32(id_hash, sizeof(id_hash), RSPAMD_BASE32_DEFAULT); + + return backend; +} + +static void +rspamd_fuzzy_redis_timeout(EV_P_ ev_timer *w, int revents) +{ + struct rspamd_fuzzy_redis_session *session = + (struct rspamd_fuzzy_redis_session *) w->data; + redisAsyncContext *ac; + static char errstr[128]; + + if (session->ctx) { + ac = session->ctx; + session->ctx = NULL; + ac->err = REDIS_ERR_IO; + /* Should be safe as in hiredis it is char[128] */ + rspamd_snprintf(errstr, sizeof(errstr), "%s", strerror(ETIMEDOUT)); + ac->errstr = errstr; + + /* This will cause session closing */ + rspamd_redis_pool_release_connection(session->backend->pool, + ac, RSPAMD_REDIS_RELEASE_FATAL); + } +} + +static void rspamd_fuzzy_redis_check_callback(redisAsyncContext *c, gpointer r, + gpointer priv); + +struct _rspamd_fuzzy_shingles_helper { + guchar digest[64]; + guint found; +}; + +static gint +rspamd_fuzzy_backend_redis_shingles_cmp(const void *a, const void *b) +{ + const struct _rspamd_fuzzy_shingles_helper *sha = a, + *shb = b; + + return memcmp(sha->digest, shb->digest, sizeof(sha->digest)); +} + +static void +rspamd_fuzzy_redis_shingles_callback(redisAsyncContext *c, gpointer r, + gpointer priv) +{ + struct rspamd_fuzzy_redis_session *session = priv; + redisReply *reply = r, *cur; + struct rspamd_fuzzy_reply rep; + GString *key; + struct _rspamd_fuzzy_shingles_helper *shingles, *prev = NULL, *sel = NULL; + guint i, found = 0, max_found = 0, cur_found = 0; + + ev_timer_stop(session->event_loop, &session->timeout); + memset(&rep, 0, sizeof(rep)); + + if (c->err == 0 && reply != NULL) { + rspamd_upstream_ok(session->up); + + if (reply->type == REDIS_REPLY_ARRAY && + reply->elements == RSPAMD_SHINGLE_SIZE) { + shingles = g_alloca(sizeof(struct _rspamd_fuzzy_shingles_helper) * + RSPAMD_SHINGLE_SIZE); + + for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) { + cur = reply->element[i]; + + if (cur->type == REDIS_REPLY_STRING) { + shingles[i].found = 1; + memcpy(shingles[i].digest, cur->str, MIN(64, cur->len)); + found++; + } + else { + memset(shingles[i].digest, 0, sizeof(shingles[i].digest)); + shingles[i].found = 0; + } + } + + if (found > RSPAMD_SHINGLE_SIZE / 2) { + /* Now sort to find the most frequent element */ + qsort(shingles, RSPAMD_SHINGLE_SIZE, + sizeof(struct _rspamd_fuzzy_shingles_helper), + rspamd_fuzzy_backend_redis_shingles_cmp); + + prev = &shingles[0]; + + for (i = 1; i < RSPAMD_SHINGLE_SIZE; i++) { + if (!shingles[i].found) { + continue; + } + + if (memcmp(shingles[i].digest, prev->digest, 64) == 0) { + cur_found++; + + if (cur_found > max_found) { + max_found = cur_found; + sel = &shingles[i]; + } + } + else { + cur_found = 1; + prev = &shingles[i]; + } + } + + if (max_found > RSPAMD_SHINGLE_SIZE / 2) { + session->prob = ((float) max_found) / RSPAMD_SHINGLE_SIZE; + rep.v1.prob = session->prob; + + g_assert(sel != NULL); + + /* Prepare new check command */ + rspamd_fuzzy_redis_session_free_args(session); + session->nargs = 5; + session->argv = g_malloc(sizeof(gchar *) * session->nargs); + session->argv_lens = g_malloc(sizeof(gsize) * session->nargs); + + key = g_string_new(session->backend->redis_object); + g_string_append_len(key, sel->digest, sizeof(sel->digest)); + session->argv[0] = g_strdup("HMGET"); + session->argv_lens[0] = 5; + session->argv[1] = key->str; + session->argv_lens[1] = key->len; + session->argv[2] = g_strdup("V"); + session->argv_lens[2] = 1; + session->argv[3] = g_strdup("F"); + session->argv_lens[3] = 1; + session->argv[4] = g_strdup("C"); + session->argv_lens[4] = 1; + g_string_free(key, FALSE); /* Do not free underlying array */ + memcpy(session->found_digest, sel->digest, + sizeof(session->cmd->digest)); + + g_assert(session->ctx != NULL); + if (redisAsyncCommandArgv(session->ctx, + rspamd_fuzzy_redis_check_callback, + session, session->nargs, + (const gchar **) session->argv, + session->argv_lens) != REDIS_OK) { + + if (session->callback.cb_check) { + memset(&rep, 0, sizeof(rep)); + session->callback.cb_check(&rep, session->cbdata); + } + + rspamd_fuzzy_redis_session_dtor(session, TRUE); + } + else { + /* Add timeout */ + session->timeout.data = session; + ev_now_update_if_cheap((struct ev_loop *) session->event_loop); + ev_timer_init(&session->timeout, + rspamd_fuzzy_redis_timeout, + session->backend->timeout, 0.0); + ev_timer_start(session->event_loop, &session->timeout); + } + + return; + } + } + } + else if (reply->type == REDIS_REPLY_ERROR) { + msg_err_redis_session("fuzzy backend redis error: \"%s\"", + reply->str); + } + + if (session->callback.cb_check) { + session->callback.cb_check(&rep, session->cbdata); + } + } + else { + if (session->callback.cb_check) { + session->callback.cb_check(&rep, session->cbdata); + } + + if (c->errstr) { + msg_err_redis_session("error getting shingles: %s", c->errstr); + rspamd_upstream_fail(session->up, FALSE, c->errstr); + } + } + + rspamd_fuzzy_redis_session_dtor(session, FALSE); +} + +static void +rspamd_fuzzy_backend_check_shingles(struct rspamd_fuzzy_redis_session *session) +{ + struct rspamd_fuzzy_reply rep; + const struct rspamd_fuzzy_shingle_cmd *shcmd; + GString *key; + guint i, init_len; + + rspamd_fuzzy_redis_session_free_args(session); + /* First of all check digest */ + session->nargs = RSPAMD_SHINGLE_SIZE + 1; + session->argv = g_malloc(sizeof(gchar *) * session->nargs); + session->argv_lens = g_malloc(sizeof(gsize) * session->nargs); + shcmd = (const struct rspamd_fuzzy_shingle_cmd *) session->cmd; + + session->argv[0] = g_strdup("MGET"); + session->argv_lens[0] = 4; + init_len = strlen(session->backend->redis_object); + + for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) { + + key = g_string_sized_new(init_len + 2 + 2 + sizeof("18446744073709551616")); + rspamd_printf_gstring(key, "%s_%d_%uL", session->backend->redis_object, + i, shcmd->sgl.hashes[i]); + session->argv[i + 1] = key->str; + session->argv_lens[i + 1] = key->len; + g_string_free(key, FALSE); /* Do not free underlying array */ + } + + session->shingles_checked = TRUE; + + g_assert(session->ctx != NULL); + + if (redisAsyncCommandArgv(session->ctx, rspamd_fuzzy_redis_shingles_callback, + session, session->nargs, + (const gchar **) session->argv, session->argv_lens) != REDIS_OK) { + msg_err("cannot execute redis command on %s: %s", + rspamd_inet_address_to_string_pretty(rspamd_upstream_addr_cur(session->up)), + session->ctx->errstr); + + if (session->callback.cb_check) { + memset(&rep, 0, sizeof(rep)); + session->callback.cb_check(&rep, session->cbdata); + } + + rspamd_fuzzy_redis_session_dtor(session, TRUE); + } + else { + /* Add timeout */ + session->timeout.data = session; + ev_now_update_if_cheap((struct ev_loop *) session->event_loop); + ev_timer_init(&session->timeout, + rspamd_fuzzy_redis_timeout, + session->backend->timeout, 0.0); + ev_timer_start(session->event_loop, &session->timeout); + } +} + +static void +rspamd_fuzzy_redis_check_callback(redisAsyncContext *c, gpointer r, + gpointer priv) +{ + struct rspamd_fuzzy_redis_session *session = priv; + redisReply *reply = r, *cur; + struct rspamd_fuzzy_reply rep; + gulong value; + guint found_elts = 0; + + ev_timer_stop(session->event_loop, &session->timeout); + memset(&rep, 0, sizeof(rep)); + + if (c->err == 0 && reply != NULL) { + rspamd_upstream_ok(session->up); + + if (reply->type == REDIS_REPLY_ARRAY && reply->elements >= 2) { + cur = reply->element[0]; + + if (cur->type == REDIS_REPLY_STRING) { + value = strtoul(cur->str, NULL, 10); + rep.v1.value = value; + found_elts++; + } + + cur = reply->element[1]; + + if (cur->type == REDIS_REPLY_STRING) { + value = strtoul(cur->str, NULL, 10); + rep.v1.flag = value; + found_elts++; + } + + if (found_elts >= 2) { + rep.v1.prob = session->prob; + memcpy(rep.digest, session->found_digest, sizeof(rep.digest)); + } + + rep.ts = 0; + + if (reply->elements > 2) { + cur = reply->element[2]; + + if (cur->type == REDIS_REPLY_STRING) { + rep.ts = strtoul(cur->str, NULL, 10); + } + } + } + else if (reply->type == REDIS_REPLY_ERROR) { + msg_err_redis_session("fuzzy backend redis error: \"%s\"", + reply->str); + } + + if (found_elts < 2) { + if (session->cmd->shingles_count > 0 && !session->shingles_checked) { + /* We also need to check all shingles here */ + rspamd_fuzzy_backend_check_shingles(session); + /* Do not free session */ + return; + } + else { + if (session->callback.cb_check) { + session->callback.cb_check(&rep, session->cbdata); + } + } + } + else { + if (session->callback.cb_check) { + session->callback.cb_check(&rep, session->cbdata); + } + } + } + else { + if (session->callback.cb_check) { + session->callback.cb_check(&rep, session->cbdata); + } + + if (c->errstr) { + msg_err_redis_session("error getting hashes on %s: %s", + rspamd_inet_address_to_string_pretty(rspamd_upstream_addr_cur(session->up)), + c->errstr); + rspamd_upstream_fail(session->up, FALSE, c->errstr); + } + } + + rspamd_fuzzy_redis_session_dtor(session, FALSE); +} + +void rspamd_fuzzy_backend_check_redis(struct rspamd_fuzzy_backend *bk, + const struct rspamd_fuzzy_cmd *cmd, + rspamd_fuzzy_check_cb cb, void *ud, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_redis *backend = subr_ud; + struct rspamd_fuzzy_redis_session *session; + struct upstream *up; + struct upstream_list *ups; + rspamd_inet_addr_t *addr; + struct rspamd_fuzzy_reply rep; + GString *key; + + g_assert(backend != NULL); + + ups = rspamd_redis_get_servers(backend, "read_servers"); + if (!ups) { + if (cb) { + memset(&rep, 0, sizeof(rep)); + cb(&rep, ud); + } + + return; + } + + session = g_malloc0(sizeof(*session)); + session->backend = backend; + REF_RETAIN(session->backend); + + session->callback.cb_check = cb; + session->cbdata = ud; + session->command = RSPAMD_FUZZY_REDIS_COMMAND_CHECK; + session->cmd = cmd; + session->prob = 1.0; + memcpy(rep.digest, session->cmd->digest, sizeof(rep.digest)); + memcpy(session->found_digest, session->cmd->digest, sizeof(rep.digest)); + session->event_loop = rspamd_fuzzy_backend_event_base(bk); + + /* First of all check digest */ + session->nargs = 5; + session->argv = g_malloc(sizeof(gchar *) * session->nargs); + session->argv_lens = g_malloc(sizeof(gsize) * session->nargs); + + key = g_string_new(backend->redis_object); + g_string_append_len(key, cmd->digest, sizeof(cmd->digest)); + session->argv[0] = g_strdup("HMGET"); + session->argv_lens[0] = 5; + session->argv[1] = key->str; + session->argv_lens[1] = key->len; + session->argv[2] = g_strdup("V"); + session->argv_lens[2] = 1; + session->argv[3] = g_strdup("F"); + session->argv_lens[3] = 1; + session->argv[4] = g_strdup("C"); + session->argv_lens[4] = 1; + g_string_free(key, FALSE); /* Do not free underlying array */ + + up = rspamd_upstream_get(ups, + RSPAMD_UPSTREAM_ROUND_ROBIN, + NULL, + 0); + + session->up = rspamd_upstream_ref(up); + addr = rspamd_upstream_addr_next(up); + g_assert(addr != NULL); + session->ctx = rspamd_redis_pool_connect(backend->pool, + backend->dbname, + backend->username, backend->password, + rspamd_inet_address_to_string(addr), + rspamd_inet_address_get_port(addr)); + + if (session->ctx == NULL) { + rspamd_upstream_fail(up, TRUE, strerror(errno)); + rspamd_fuzzy_redis_session_dtor(session, TRUE); + + if (cb) { + memset(&rep, 0, sizeof(rep)); + cb(&rep, ud); + } + } + else { + if (redisAsyncCommandArgv(session->ctx, rspamd_fuzzy_redis_check_callback, + session, session->nargs, + (const gchar **) session->argv, session->argv_lens) != REDIS_OK) { + rspamd_fuzzy_redis_session_dtor(session, TRUE); + + if (cb) { + memset(&rep, 0, sizeof(rep)); + cb(&rep, ud); + } + } + else { + /* Add timeout */ + session->timeout.data = session; + ev_now_update_if_cheap((struct ev_loop *) session->event_loop); + ev_timer_init(&session->timeout, + rspamd_fuzzy_redis_timeout, + session->backend->timeout, 0.0); + ev_timer_start(session->event_loop, &session->timeout); + } + } +} + +static void +rspamd_fuzzy_redis_count_callback(redisAsyncContext *c, gpointer r, + gpointer priv) +{ + struct rspamd_fuzzy_redis_session *session = priv; + redisReply *reply = r; + gulong nelts; + + ev_timer_stop(session->event_loop, &session->timeout); + + if (c->err == 0 && reply != NULL) { + rspamd_upstream_ok(session->up); + + if (reply->type == REDIS_REPLY_INTEGER) { + if (session->callback.cb_count) { + session->callback.cb_count(reply->integer, session->cbdata); + } + } + else if (reply->type == REDIS_REPLY_STRING) { + nelts = strtoul(reply->str, NULL, 10); + + if (session->callback.cb_count) { + session->callback.cb_count(nelts, session->cbdata); + } + } + else { + if (reply->type == REDIS_REPLY_ERROR) { + msg_err_redis_session("fuzzy backend redis error: \"%s\"", + reply->str); + } + if (session->callback.cb_count) { + session->callback.cb_count(0, session->cbdata); + } + } + } + else { + if (session->callback.cb_count) { + session->callback.cb_count(0, session->cbdata); + } + + if (c->errstr) { + msg_err_redis_session("error getting count on %s: %s", + rspamd_inet_address_to_string_pretty(rspamd_upstream_addr_cur(session->up)), + c->errstr); + rspamd_upstream_fail(session->up, FALSE, c->errstr); + } + } + + rspamd_fuzzy_redis_session_dtor(session, FALSE); +} + +void rspamd_fuzzy_backend_count_redis(struct rspamd_fuzzy_backend *bk, + rspamd_fuzzy_count_cb cb, void *ud, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_redis *backend = subr_ud; + struct rspamd_fuzzy_redis_session *session; + struct upstream *up; + struct upstream_list *ups; + rspamd_inet_addr_t *addr; + GString *key; + + g_assert(backend != NULL); + + ups = rspamd_redis_get_servers(backend, "read_servers"); + if (!ups) { + if (cb) { + cb(0, ud); + } + + return; + } + + session = g_malloc0(sizeof(*session)); + session->backend = backend; + REF_RETAIN(session->backend); + + session->callback.cb_count = cb; + session->cbdata = ud; + session->command = RSPAMD_FUZZY_REDIS_COMMAND_COUNT; + session->event_loop = rspamd_fuzzy_backend_event_base(bk); + + session->nargs = 2; + session->argv = g_malloc(sizeof(gchar *) * 2); + session->argv_lens = g_malloc(sizeof(gsize) * 2); + key = g_string_new(backend->redis_object); + g_string_append(key, "_count"); + session->argv[0] = g_strdup("GET"); + session->argv_lens[0] = 3; + session->argv[1] = key->str; + session->argv_lens[1] = key->len; + g_string_free(key, FALSE); /* Do not free underlying array */ + + up = rspamd_upstream_get(ups, + RSPAMD_UPSTREAM_ROUND_ROBIN, + NULL, + 0); + + session->up = rspamd_upstream_ref(up); + addr = rspamd_upstream_addr_next(up); + g_assert(addr != NULL); + session->ctx = rspamd_redis_pool_connect(backend->pool, + backend->dbname, + backend->username, backend->password, + rspamd_inet_address_to_string(addr), + rspamd_inet_address_get_port(addr)); + + if (session->ctx == NULL) { + rspamd_upstream_fail(up, TRUE, strerror(errno)); + rspamd_fuzzy_redis_session_dtor(session, TRUE); + + if (cb) { + cb(0, ud); + } + } + else { + if (redisAsyncCommandArgv(session->ctx, rspamd_fuzzy_redis_count_callback, + session, session->nargs, + (const gchar **) session->argv, session->argv_lens) != REDIS_OK) { + rspamd_fuzzy_redis_session_dtor(session, TRUE); + + if (cb) { + cb(0, ud); + } + } + else { + /* Add timeout */ + session->timeout.data = session; + ev_now_update_if_cheap((struct ev_loop *) session->event_loop); + ev_timer_init(&session->timeout, + rspamd_fuzzy_redis_timeout, + session->backend->timeout, 0.0); + ev_timer_start(session->event_loop, &session->timeout); + } + } +} + +static void +rspamd_fuzzy_redis_version_callback(redisAsyncContext *c, gpointer r, + gpointer priv) +{ + struct rspamd_fuzzy_redis_session *session = priv; + redisReply *reply = r; + gulong nelts; + + ev_timer_stop(session->event_loop, &session->timeout); + + if (c->err == 0 && reply != NULL) { + rspamd_upstream_ok(session->up); + + if (reply->type == REDIS_REPLY_INTEGER) { + if (session->callback.cb_version) { + session->callback.cb_version(reply->integer, session->cbdata); + } + } + else if (reply->type == REDIS_REPLY_STRING) { + nelts = strtoul(reply->str, NULL, 10); + + if (session->callback.cb_version) { + session->callback.cb_version(nelts, session->cbdata); + } + } + else { + if (reply->type == REDIS_REPLY_ERROR) { + msg_err_redis_session("fuzzy backend redis error: \"%s\"", + reply->str); + } + if (session->callback.cb_version) { + session->callback.cb_version(0, session->cbdata); + } + } + } + else { + if (session->callback.cb_version) { + session->callback.cb_version(0, session->cbdata); + } + + if (c->errstr) { + msg_err_redis_session("error getting version on %s: %s", + rspamd_inet_address_to_string_pretty(rspamd_upstream_addr_cur(session->up)), + c->errstr); + rspamd_upstream_fail(session->up, FALSE, c->errstr); + } + } + + rspamd_fuzzy_redis_session_dtor(session, FALSE); +} + +void rspamd_fuzzy_backend_version_redis(struct rspamd_fuzzy_backend *bk, + const gchar *src, + rspamd_fuzzy_version_cb cb, void *ud, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_redis *backend = subr_ud; + struct rspamd_fuzzy_redis_session *session; + struct upstream *up; + struct upstream_list *ups; + rspamd_inet_addr_t *addr; + GString *key; + + g_assert(backend != NULL); + + ups = rspamd_redis_get_servers(backend, "read_servers"); + if (!ups) { + if (cb) { + cb(0, ud); + } + + return; + } + + session = g_malloc0(sizeof(*session)); + session->backend = backend; + REF_RETAIN(session->backend); + + session->callback.cb_version = cb; + session->cbdata = ud; + session->command = RSPAMD_FUZZY_REDIS_COMMAND_VERSION; + session->event_loop = rspamd_fuzzy_backend_event_base(bk); + + session->nargs = 2; + session->argv = g_malloc(sizeof(gchar *) * 2); + session->argv_lens = g_malloc(sizeof(gsize) * 2); + key = g_string_new(backend->redis_object); + g_string_append(key, src); + session->argv[0] = g_strdup("GET"); + session->argv_lens[0] = 3; + session->argv[1] = key->str; + session->argv_lens[1] = key->len; + g_string_free(key, FALSE); /* Do not free underlying array */ + + up = rspamd_upstream_get(ups, + RSPAMD_UPSTREAM_ROUND_ROBIN, + NULL, + 0); + + session->up = rspamd_upstream_ref(up); + addr = rspamd_upstream_addr_next(up); + g_assert(addr != NULL); + session->ctx = rspamd_redis_pool_connect(backend->pool, + backend->dbname, + backend->username, backend->password, + rspamd_inet_address_to_string(addr), + rspamd_inet_address_get_port(addr)); + + if (session->ctx == NULL) { + rspamd_upstream_fail(up, FALSE, strerror(errno)); + rspamd_fuzzy_redis_session_dtor(session, TRUE); + + if (cb) { + cb(0, ud); + } + } + else { + if (redisAsyncCommandArgv(session->ctx, rspamd_fuzzy_redis_version_callback, + session, session->nargs, + (const gchar **) session->argv, session->argv_lens) != REDIS_OK) { + rspamd_fuzzy_redis_session_dtor(session, TRUE); + + if (cb) { + cb(0, ud); + } + } + else { + /* Add timeout */ + session->timeout.data = session; + ev_now_update_if_cheap((struct ev_loop *) session->event_loop); + ev_timer_init(&session->timeout, + rspamd_fuzzy_redis_timeout, + session->backend->timeout, 0.0); + ev_timer_start(session->event_loop, &session->timeout); + } + } +} + +const gchar * +rspamd_fuzzy_backend_id_redis(struct rspamd_fuzzy_backend *bk, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_redis *backend = subr_ud; + g_assert(backend != NULL); + + return backend->id; +} + +void rspamd_fuzzy_backend_expire_redis(struct rspamd_fuzzy_backend *bk, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_redis *backend = subr_ud; + + g_assert(backend != NULL); +} + +static gboolean +rspamd_fuzzy_update_append_command(struct rspamd_fuzzy_backend *bk, + struct rspamd_fuzzy_redis_session *session, + struct fuzzy_peer_cmd *io_cmd, guint *shift) +{ + GString *key, *value; + guint cur_shift = *shift; + guint i, klen; + struct rspamd_fuzzy_cmd *cmd; + + if (io_cmd->is_shingle) { + cmd = &io_cmd->cmd.shingle.basic; + } + else { + cmd = &io_cmd->cmd.normal; + } + + if (cmd->cmd == FUZZY_WRITE) { + /* + * For each normal hash addition we do 5 redis commands: + * HSET <key> F <flag> + * HSETNX <key> C <time> + * HINCRBY <key> V <weight> + * EXPIRE <key> <expire> + * Where <key> is <prefix> || <digest> + */ + + /* HSET */ + klen = strlen(session->backend->redis_object) + + sizeof(cmd->digest) + 1; + key = g_string_sized_new(klen); + g_string_append(key, session->backend->redis_object); + g_string_append_len(key, cmd->digest, sizeof(cmd->digest)); + value = g_string_sized_new(sizeof("4294967296")); + rspamd_printf_gstring(value, "%d", cmd->flag); + + if (cmd->version & RSPAMD_FUZZY_FLAG_WEAK) { + session->argv[cur_shift] = g_strdup("HSETNX"); + session->argv_lens[cur_shift++] = sizeof("HSETNX") - 1; + } + else { + session->argv[cur_shift] = g_strdup("HSET"); + session->argv_lens[cur_shift++] = sizeof("HSET") - 1; + } + + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + session->argv[cur_shift] = g_strdup("F"); + session->argv_lens[cur_shift++] = sizeof("F") - 1; + session->argv[cur_shift] = value->str; + session->argv_lens[cur_shift++] = value->len; + g_string_free(key, FALSE); + g_string_free(value, FALSE); + + if (redisAsyncCommandArgv(session->ctx, NULL, NULL, + 4, + (const gchar **) &session->argv[cur_shift - 4], + &session->argv_lens[cur_shift - 4]) != REDIS_OK) { + + return FALSE; + } + + /* HSETNX */ + klen = strlen(session->backend->redis_object) + + sizeof(cmd->digest) + 1; + key = g_string_sized_new(klen); + g_string_append(key, session->backend->redis_object); + g_string_append_len(key, cmd->digest, sizeof(cmd->digest)); + value = g_string_sized_new(sizeof("18446744073709551616")); + rspamd_printf_gstring(value, "%L", (gint64) rspamd_get_calendar_ticks()); + session->argv[cur_shift] = g_strdup("HSETNX"); + session->argv_lens[cur_shift++] = sizeof("HSETNX") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + session->argv[cur_shift] = g_strdup("C"); + session->argv_lens[cur_shift++] = sizeof("C") - 1; + session->argv[cur_shift] = value->str; + session->argv_lens[cur_shift++] = value->len; + g_string_free(key, FALSE); + g_string_free(value, FALSE); + + if (redisAsyncCommandArgv(session->ctx, NULL, NULL, + 4, + (const gchar **) &session->argv[cur_shift - 4], + &session->argv_lens[cur_shift - 4]) != REDIS_OK) { + + return FALSE; + } + + /* HINCRBY */ + key = g_string_sized_new(klen); + g_string_append(key, session->backend->redis_object); + g_string_append_len(key, cmd->digest, sizeof(cmd->digest)); + value = g_string_sized_new(sizeof("4294967296")); + rspamd_printf_gstring(value, "%d", cmd->value); + session->argv[cur_shift] = g_strdup("HINCRBY"); + session->argv_lens[cur_shift++] = sizeof("HINCRBY") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + session->argv[cur_shift] = g_strdup("V"); + session->argv_lens[cur_shift++] = sizeof("V") - 1; + session->argv[cur_shift] = value->str; + session->argv_lens[cur_shift++] = value->len; + g_string_free(key, FALSE); + g_string_free(value, FALSE); + + if (redisAsyncCommandArgv(session->ctx, NULL, NULL, + 4, + (const gchar **) &session->argv[cur_shift - 4], + &session->argv_lens[cur_shift - 4]) != REDIS_OK) { + + return FALSE; + } + + /* EXPIRE */ + key = g_string_sized_new(klen); + g_string_append(key, session->backend->redis_object); + g_string_append_len(key, cmd->digest, sizeof(cmd->digest)); + value = g_string_sized_new(sizeof("4294967296")); + rspamd_printf_gstring(value, "%d", + (gint) rspamd_fuzzy_backend_get_expire(bk)); + session->argv[cur_shift] = g_strdup("EXPIRE"); + session->argv_lens[cur_shift++] = sizeof("EXPIRE") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + session->argv[cur_shift] = value->str; + session->argv_lens[cur_shift++] = value->len; + g_string_free(key, FALSE); + g_string_free(value, FALSE); + + if (redisAsyncCommandArgv(session->ctx, NULL, NULL, + 3, + (const gchar **) &session->argv[cur_shift - 3], + &session->argv_lens[cur_shift - 3]) != REDIS_OK) { + + return FALSE; + } + + /* INCR */ + key = g_string_sized_new(klen); + g_string_append(key, session->backend->redis_object); + g_string_append(key, "_count"); + session->argv[cur_shift] = g_strdup("INCR"); + session->argv_lens[cur_shift++] = sizeof("INCR") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + g_string_free(key, FALSE); + + if (redisAsyncCommandArgv(session->ctx, NULL, NULL, + 2, + (const gchar **) &session->argv[cur_shift - 2], + &session->argv_lens[cur_shift - 2]) != REDIS_OK) { + + return FALSE; + } + } + else if (cmd->cmd == FUZZY_DEL) { + /* DEL */ + klen = strlen(session->backend->redis_object) + + sizeof(cmd->digest) + 1; + + key = g_string_sized_new(klen); + g_string_append(key, session->backend->redis_object); + g_string_append_len(key, cmd->digest, sizeof(cmd->digest)); + session->argv[cur_shift] = g_strdup("DEL"); + session->argv_lens[cur_shift++] = sizeof("DEL") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + g_string_free(key, FALSE); + + if (redisAsyncCommandArgv(session->ctx, NULL, NULL, + 2, + (const gchar **) &session->argv[cur_shift - 2], + &session->argv_lens[cur_shift - 2]) != REDIS_OK) { + + return FALSE; + } + + /* DECR */ + key = g_string_sized_new(klen); + g_string_append(key, session->backend->redis_object); + g_string_append(key, "_count"); + session->argv[cur_shift] = g_strdup("DECR"); + session->argv_lens[cur_shift++] = sizeof("DECR") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + g_string_free(key, FALSE); + + if (redisAsyncCommandArgv(session->ctx, NULL, NULL, + 2, + (const gchar **) &session->argv[cur_shift - 2], + &session->argv_lens[cur_shift - 2]) != REDIS_OK) { + + return FALSE; + } + } + else if (cmd->cmd == FUZZY_REFRESH) { + /* + * Issue refresh command by just EXPIRE command + * EXPIRE <key> <expire> + * Where <key> is <prefix> || <digest> + */ + + klen = strlen(session->backend->redis_object) + + sizeof(cmd->digest) + 1; + + /* EXPIRE */ + key = g_string_sized_new(klen); + g_string_append(key, session->backend->redis_object); + g_string_append_len(key, cmd->digest, sizeof(cmd->digest)); + value = g_string_sized_new(sizeof("4294967296")); + rspamd_printf_gstring(value, "%d", + (gint) rspamd_fuzzy_backend_get_expire(bk)); + session->argv[cur_shift] = g_strdup("EXPIRE"); + session->argv_lens[cur_shift++] = sizeof("EXPIRE") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + session->argv[cur_shift] = value->str; + session->argv_lens[cur_shift++] = value->len; + g_string_free(key, FALSE); + g_string_free(value, FALSE); + + if (redisAsyncCommandArgv(session->ctx, NULL, NULL, + 3, + (const gchar **) &session->argv[cur_shift - 3], + &session->argv_lens[cur_shift - 3]) != REDIS_OK) { + + return FALSE; + } + } + else if (cmd->cmd == FUZZY_DUP) { + /* Ignore */ + } + else { + g_assert_not_reached(); + } + + if (io_cmd->is_shingle) { + if (cmd->cmd == FUZZY_WRITE) { + klen = strlen(session->backend->redis_object) + + 64 + 1; + + for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) { + guchar *hval; + /* + * For each command with shingles we additionally emit 32 commands: + * SETEX <prefix>_<number>_<value> <expire> <digest> + */ + + /* SETEX */ + key = g_string_sized_new(klen); + rspamd_printf_gstring(key, "%s_%d_%uL", + session->backend->redis_object, + i, + io_cmd->cmd.shingle.sgl.hashes[i]); + value = g_string_sized_new(sizeof("4294967296")); + rspamd_printf_gstring(value, "%d", + (gint) rspamd_fuzzy_backend_get_expire(bk)); + hval = g_malloc(sizeof(io_cmd->cmd.shingle.basic.digest)); + memcpy(hval, io_cmd->cmd.shingle.basic.digest, + sizeof(io_cmd->cmd.shingle.basic.digest)); + session->argv[cur_shift] = g_strdup("SETEX"); + session->argv_lens[cur_shift++] = sizeof("SETEX") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + session->argv[cur_shift] = value->str; + session->argv_lens[cur_shift++] = value->len; + session->argv[cur_shift] = hval; + session->argv_lens[cur_shift++] = sizeof(io_cmd->cmd.shingle.basic.digest); + g_string_free(key, FALSE); + g_string_free(value, FALSE); + + if (redisAsyncCommandArgv(session->ctx, NULL, NULL, + 4, + (const gchar **) &session->argv[cur_shift - 4], + &session->argv_lens[cur_shift - 4]) != REDIS_OK) { + + return FALSE; + } + } + } + else if (cmd->cmd == FUZZY_DEL) { + klen = strlen(session->backend->redis_object) + + 64 + 1; + + for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) { + key = g_string_sized_new(klen); + rspamd_printf_gstring(key, "%s_%d_%uL", + session->backend->redis_object, + i, + io_cmd->cmd.shingle.sgl.hashes[i]); + session->argv[cur_shift] = g_strdup("DEL"); + session->argv_lens[cur_shift++] = sizeof("DEL") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + g_string_free(key, FALSE); + + if (redisAsyncCommandArgv(session->ctx, NULL, NULL, + 2, + (const gchar **) &session->argv[cur_shift - 2], + &session->argv_lens[cur_shift - 2]) != REDIS_OK) { + + return FALSE; + } + } + } + else if (cmd->cmd == FUZZY_REFRESH) { + klen = strlen(session->backend->redis_object) + + 64 + 1; + + for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) { + /* + * For each command with shingles we additionally emit 32 commands: + * EXPIRE <prefix>_<number>_<value> <expire> + */ + + /* Expire */ + key = g_string_sized_new(klen); + rspamd_printf_gstring(key, "%s_%d_%uL", + session->backend->redis_object, + i, + io_cmd->cmd.shingle.sgl.hashes[i]); + value = g_string_sized_new(sizeof("18446744073709551616")); + rspamd_printf_gstring(value, "%d", + (gint) rspamd_fuzzy_backend_get_expire(bk)); + session->argv[cur_shift] = g_strdup("EXPIRE"); + session->argv_lens[cur_shift++] = sizeof("EXPIRE") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + session->argv[cur_shift] = value->str; + session->argv_lens[cur_shift++] = value->len; + g_string_free(key, FALSE); + g_string_free(value, FALSE); + + if (redisAsyncCommandArgv(session->ctx, NULL, NULL, + 3, + (const gchar **) &session->argv[cur_shift - 3], + &session->argv_lens[cur_shift - 3]) != REDIS_OK) { + + return FALSE; + } + } + } + else if (cmd->cmd == FUZZY_DUP) { + /* Ignore */ + } + else { + g_assert_not_reached(); + } + } + + *shift = cur_shift; + + return TRUE; +} + +static void +rspamd_fuzzy_redis_update_callback(redisAsyncContext *c, gpointer r, + gpointer priv) +{ + struct rspamd_fuzzy_redis_session *session = priv; + redisReply *reply = r; + + ev_timer_stop(session->event_loop, &session->timeout); + + if (c->err == 0 && reply != NULL) { + rspamd_upstream_ok(session->up); + + if (reply->type == REDIS_REPLY_ARRAY) { + /* TODO: check all replies somehow */ + if (session->callback.cb_update) { + session->callback.cb_update(TRUE, + session->nadded, + session->ndeleted, + session->nextended, + session->nignored, + session->cbdata); + } + } + else { + if (reply->type == REDIS_REPLY_ERROR) { + msg_err_redis_session("fuzzy backend redis error: \"%s\"", + reply->str); + } + if (session->callback.cb_update) { + session->callback.cb_update(FALSE, 0, 0, 0, 0, session->cbdata); + } + } + } + else { + if (session->callback.cb_update) { + session->callback.cb_update(FALSE, 0, 0, 0, 0, session->cbdata); + } + + if (c->errstr) { + msg_err_redis_session("error sending update to redis %s: %s", + rspamd_inet_address_to_string_pretty(rspamd_upstream_addr_cur(session->up)), + c->errstr); + rspamd_upstream_fail(session->up, FALSE, c->errstr); + } + } + + rspamd_fuzzy_redis_session_dtor(session, FALSE); +} + +void rspamd_fuzzy_backend_update_redis(struct rspamd_fuzzy_backend *bk, + GArray *updates, const gchar *src, + rspamd_fuzzy_update_cb cb, void *ud, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_redis *backend = subr_ud; + struct rspamd_fuzzy_redis_session *session; + struct upstream *up; + struct upstream_list *ups; + rspamd_inet_addr_t *addr; + guint i; + GString *key; + struct fuzzy_peer_cmd *io_cmd; + struct rspamd_fuzzy_cmd *cmd = NULL; + guint nargs, cur_shift; + + g_assert(backend != NULL); + + ups = rspamd_redis_get_servers(backend, "write_servers"); + if (!ups) { + if (cb) { + cb(FALSE, 0, 0, 0, 0, ud); + } + + return; + } + + session = g_malloc0(sizeof(*session)); + session->backend = backend; + REF_RETAIN(session->backend); + + /* + * For each normal hash addition we do 3 redis commands: + * HSET <key> F <flag> **OR** HSETNX <key> F <flag> when flag is weak + * HINCRBY <key> V <weight> + * EXPIRE <key> <expire> + * INCR <prefix||fuzzy_count> + * + * Where <key> is <prefix> || <digest> + * + * For each command with shingles we additionally emit 32 commands: + * SETEX <prefix>_<number>_<value> <expire> <digest> + * + * For each delete command we emit: + * DEL <key> + * + * For each delete command with shingles we emit also 32 commands: + * DEL <prefix>_<number>_<value> + * DECR <prefix||fuzzy_count> + */ + + nargs = 4; + + for (i = 0; i < updates->len; i++) { + io_cmd = &g_array_index(updates, struct fuzzy_peer_cmd, i); + + if (io_cmd->is_shingle) { + cmd = &io_cmd->cmd.shingle.basic; + } + else { + cmd = &io_cmd->cmd.normal; + } + + if (cmd->cmd == FUZZY_WRITE) { + nargs += 17; + session->nadded++; + + if (io_cmd->is_shingle) { + nargs += RSPAMD_SHINGLE_SIZE * 4; + } + } + else if (cmd->cmd == FUZZY_DEL) { + nargs += 4; + session->ndeleted++; + + if (io_cmd->is_shingle) { + nargs += RSPAMD_SHINGLE_SIZE * 2; + } + } + else if (cmd->cmd == FUZZY_REFRESH) { + nargs += 3; + session->nextended++; + + if (io_cmd->is_shingle) { + nargs += RSPAMD_SHINGLE_SIZE * 3; + } + } + else { + session->nignored++; + } + } + + /* Now we need to create a new request */ + session->callback.cb_update = cb; + session->cbdata = ud; + session->command = RSPAMD_FUZZY_REDIS_COMMAND_UPDATES; + session->cmd = cmd; + session->prob = 1.0f; + session->event_loop = rspamd_fuzzy_backend_event_base(bk); + + /* First of all check digest */ + session->nargs = nargs; + session->argv = g_malloc0(sizeof(gchar *) * session->nargs); + session->argv_lens = g_malloc0(sizeof(gsize) * session->nargs); + + up = rspamd_upstream_get(ups, + RSPAMD_UPSTREAM_MASTER_SLAVE, + NULL, + 0); + + session->up = rspamd_upstream_ref(up); + addr = rspamd_upstream_addr_next(up); + g_assert(addr != NULL); + session->ctx = rspamd_redis_pool_connect(backend->pool, + backend->dbname, + backend->username, backend->password, + rspamd_inet_address_to_string(addr), + rspamd_inet_address_get_port(addr)); + + if (session->ctx == NULL) { + rspamd_upstream_fail(up, TRUE, strerror(errno)); + rspamd_fuzzy_redis_session_dtor(session, TRUE); + + if (cb) { + cb(FALSE, 0, 0, 0, 0, ud); + } + } + else { + /* Start with MULTI command */ + session->argv[0] = g_strdup("MULTI"); + session->argv_lens[0] = 5; + + if (redisAsyncCommandArgv(session->ctx, NULL, NULL, + 1, + (const gchar **) session->argv, + session->argv_lens) != REDIS_OK) { + + if (cb) { + cb(FALSE, 0, 0, 0, 0, ud); + } + rspamd_fuzzy_redis_session_dtor(session, TRUE); + + return; + } + + /* Now split the rest of commands in packs and emit them command by command */ + cur_shift = 1; + + for (i = 0; i < updates->len; i++) { + io_cmd = &g_array_index(updates, struct fuzzy_peer_cmd, i); + + if (!rspamd_fuzzy_update_append_command(bk, session, io_cmd, + &cur_shift)) { + if (cb) { + cb(FALSE, 0, 0, 0, 0, ud); + } + rspamd_fuzzy_redis_session_dtor(session, TRUE); + + return; + } + } + + /* Now INCR command for the source */ + key = g_string_new(backend->redis_object); + g_string_append(key, src); + session->argv[cur_shift] = g_strdup("INCR"); + session->argv_lens[cur_shift++] = 4; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + g_string_free(key, FALSE); + + if (redisAsyncCommandArgv(session->ctx, NULL, NULL, + 2, + (const gchar **) &session->argv[cur_shift - 2], + &session->argv_lens[cur_shift - 2]) != REDIS_OK) { + + if (cb) { + cb(FALSE, 0, 0, 0, 0, ud); + } + rspamd_fuzzy_redis_session_dtor(session, TRUE); + + return; + } + + /* Finally we call EXEC with a specific callback */ + session->argv[cur_shift] = g_strdup("EXEC"); + session->argv_lens[cur_shift] = 4; + + if (redisAsyncCommandArgv(session->ctx, + rspamd_fuzzy_redis_update_callback, session, + 1, + (const gchar **) &session->argv[cur_shift], + &session->argv_lens[cur_shift]) != REDIS_OK) { + + if (cb) { + cb(FALSE, 0, 0, 0, 0, ud); + } + rspamd_fuzzy_redis_session_dtor(session, TRUE); + + return; + } + else { + /* Add timeout */ + session->timeout.data = session; + ev_now_update_if_cheap((struct ev_loop *) session->event_loop); + ev_timer_init(&session->timeout, + rspamd_fuzzy_redis_timeout, + session->backend->timeout, 0.0); + ev_timer_start(session->event_loop, &session->timeout); + } + } +} + +void rspamd_fuzzy_backend_close_redis(struct rspamd_fuzzy_backend *bk, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_redis *backend = subr_ud; + + g_assert(backend != NULL); + + /* + * XXX: we leak lua registry element there to avoid crashing + * due to chicken-egg problem between lua state termination and + * redis pool termination. + * Here, we assume that redis pool is destroyed AFTER lua_state, + * so all connections pending will release references but due to + * `terminated` hack they will not try to access Lua stuff + * This is enabled merely if we have connections pending (e.g. refcount > 1) + */ + if (backend->ref.refcount > 1) { + backend->terminated = true; + } + REF_RELEASE(backend); +} diff --git a/src/libserver/fuzzy_backend/fuzzy_backend_redis.h b/src/libserver/fuzzy_backend/fuzzy_backend_redis.h new file mode 100644 index 0000000..3cfa162 --- /dev/null +++ b/src/libserver/fuzzy_backend/fuzzy_backend_redis.h @@ -0,0 +1,67 @@ +/*- + * Copyright 2016 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef SRC_LIBSERVER_FUZZY_BACKEND_REDIS_H_ +#define SRC_LIBSERVER_FUZZY_BACKEND_REDIS_H_ + +#include "config.h" +#include "fuzzy_backend.h" + + +#ifdef __cplusplus +extern "C" { +#endif + +/* + * Subroutines for fuzzy_backend + */ +void *rspamd_fuzzy_backend_init_redis(struct rspamd_fuzzy_backend *bk, + const ucl_object_t *obj, + struct rspamd_config *cfg, + GError **err); + +void rspamd_fuzzy_backend_check_redis(struct rspamd_fuzzy_backend *bk, + const struct rspamd_fuzzy_cmd *cmd, + rspamd_fuzzy_check_cb cb, void *ud, + void *subr_ud); + +void rspamd_fuzzy_backend_update_redis(struct rspamd_fuzzy_backend *bk, + GArray *updates, const gchar *src, + rspamd_fuzzy_update_cb cb, void *ud, + void *subr_ud); + +void rspamd_fuzzy_backend_count_redis(struct rspamd_fuzzy_backend *bk, + rspamd_fuzzy_count_cb cb, void *ud, + void *subr_ud); + +void rspamd_fuzzy_backend_version_redis(struct rspamd_fuzzy_backend *bk, + const gchar *src, + rspamd_fuzzy_version_cb cb, void *ud, + void *subr_ud); + +const gchar *rspamd_fuzzy_backend_id_redis(struct rspamd_fuzzy_backend *bk, + void *subr_ud); + +void rspamd_fuzzy_backend_expire_redis(struct rspamd_fuzzy_backend *bk, + void *subr_ud); + +void rspamd_fuzzy_backend_close_redis(struct rspamd_fuzzy_backend *bk, + void *subr_ud); + +#ifdef __cplusplus +} +#endif + +#endif /* SRC_LIBSERVER_FUZZY_BACKEND_REDIS_H_ */ diff --git a/src/libserver/fuzzy_backend/fuzzy_backend_sqlite.c b/src/libserver/fuzzy_backend/fuzzy_backend_sqlite.c new file mode 100644 index 0000000..9ec448e --- /dev/null +++ b/src/libserver/fuzzy_backend/fuzzy_backend_sqlite.c @@ -0,0 +1,1029 @@ +/*- + * Copyright 2016 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "config.h" +#include "rspamd.h" +#include "fuzzy_backend.h" +#include "fuzzy_backend_sqlite.h" +#include "unix-std.h" + +#include <sqlite3.h> +#include "libutil/sqlite_utils.h" + +struct rspamd_fuzzy_backend_sqlite { + sqlite3 *db; + char *path; + gchar id[MEMPOOL_UID_LEN]; + gsize count; + gsize expired; + rspamd_mempool_t *pool; +}; + +static const gdouble sql_sleep_time = 0.1; +static const guint max_retries = 10; + +#define msg_err_fuzzy_backend(...) rspamd_default_log_function(G_LOG_LEVEL_CRITICAL, \ + backend->pool->tag.tagname, backend->pool->tag.uid, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_warn_fuzzy_backend(...) rspamd_default_log_function(G_LOG_LEVEL_WARNING, \ + backend->pool->tag.tagname, backend->pool->tag.uid, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_info_fuzzy_backend(...) rspamd_default_log_function(G_LOG_LEVEL_INFO, \ + backend->pool->tag.tagname, backend->pool->tag.uid, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_debug_fuzzy_backend(...) rspamd_conditional_debug_fast(NULL, NULL, \ + rspamd_fuzzy_sqlite_log_id, backend->pool->tag.tagname, backend->pool->tag.uid, \ + G_STRFUNC, \ + __VA_ARGS__) + +INIT_LOG_MODULE(fuzzy_sqlite) + +static const char *create_tables_sql = + "BEGIN;" + "CREATE TABLE IF NOT EXISTS digests(" + " id INTEGER PRIMARY KEY," + " flag INTEGER NOT NULL," + " digest TEXT NOT NULL," + " value INTEGER," + " time INTEGER);" + "CREATE TABLE IF NOT EXISTS shingles(" + " value INTEGER NOT NULL," + " number INTEGER NOT NULL," + " digest_id INTEGER REFERENCES digests(id) ON DELETE CASCADE " + " ON UPDATE CASCADE);" + "CREATE TABLE IF NOT EXISTS sources(" + " name TEXT UNIQUE," + " version INTEGER," + " last INTEGER);" + "CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest);" + "CREATE INDEX IF NOT EXISTS t ON digests(time);" + "CREATE INDEX IF NOT EXISTS dgst_id ON shingles(digest_id);" + "CREATE UNIQUE INDEX IF NOT EXISTS s ON shingles(value, number);" + "COMMIT;"; +#if 0 +static const char *create_index_sql = + "BEGIN;" + "CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest);" + "CREATE INDEX IF NOT EXISTS t ON digests(time);" + "CREATE INDEX IF NOT EXISTS dgst_id ON shingles(digest_id);" + "CREATE UNIQUE INDEX IF NOT EXISTS s ON shingles(value, number);" + "COMMIT;"; +#endif +enum rspamd_fuzzy_statement_idx { + RSPAMD_FUZZY_BACKEND_TRANSACTION_START = 0, + RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT, + RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK, + RSPAMD_FUZZY_BACKEND_INSERT, + RSPAMD_FUZZY_BACKEND_UPDATE, + RSPAMD_FUZZY_BACKEND_UPDATE_FLAG, + RSPAMD_FUZZY_BACKEND_INSERT_SHINGLE, + RSPAMD_FUZZY_BACKEND_CHECK, + RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE, + RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID, + RSPAMD_FUZZY_BACKEND_DELETE, + RSPAMD_FUZZY_BACKEND_COUNT, + RSPAMD_FUZZY_BACKEND_EXPIRE, + RSPAMD_FUZZY_BACKEND_VACUUM, + RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED, + RSPAMD_FUZZY_BACKEND_ADD_SOURCE, + RSPAMD_FUZZY_BACKEND_VERSION, + RSPAMD_FUZZY_BACKEND_SET_VERSION, + RSPAMD_FUZZY_BACKEND_MAX +}; +static struct rspamd_fuzzy_stmts { + enum rspamd_fuzzy_statement_idx idx; + const gchar *sql; + const gchar *args; + sqlite3_stmt *stmt; + gint result; +} prepared_stmts[RSPAMD_FUZZY_BACKEND_MAX] = + { + {.idx = RSPAMD_FUZZY_BACKEND_TRANSACTION_START, + .sql = "BEGIN TRANSACTION;", + .args = "", + .stmt = NULL, + .result = SQLITE_DONE}, + {.idx = RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT, + .sql = "COMMIT;", + .args = "", + .stmt = NULL, + .result = SQLITE_DONE}, + {.idx = RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK, + .sql = "ROLLBACK;", + .args = "", + .stmt = NULL, + .result = SQLITE_DONE}, + {.idx = RSPAMD_FUZZY_BACKEND_INSERT, + .sql = "INSERT INTO digests(flag, digest, value, time) VALUES" + "(?1, ?2, ?3, strftime('%s','now'));", + .args = "SDI", + .stmt = NULL, + .result = SQLITE_DONE}, + {.idx = RSPAMD_FUZZY_BACKEND_UPDATE, + .sql = "UPDATE digests SET value = value + ?1, time = strftime('%s','now') WHERE " + "digest==?2;", + .args = "ID", + .stmt = NULL, + .result = SQLITE_DONE}, + {.idx = RSPAMD_FUZZY_BACKEND_UPDATE_FLAG, + .sql = "UPDATE digests SET value = ?1, flag = ?2, time = strftime('%s','now') WHERE " + "digest==?3;", + .args = "IID", + .stmt = NULL, + .result = SQLITE_DONE}, + {.idx = RSPAMD_FUZZY_BACKEND_INSERT_SHINGLE, + .sql = "INSERT OR REPLACE INTO shingles(value, number, digest_id) " + "VALUES (?1, ?2, ?3);", + .args = "III", + .stmt = NULL, + .result = SQLITE_DONE}, + {.idx = RSPAMD_FUZZY_BACKEND_CHECK, + .sql = "SELECT value, time, flag FROM digests WHERE digest==?1;", + .args = "D", + .stmt = NULL, + .result = SQLITE_ROW}, + {.idx = RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE, + .sql = "SELECT digest_id FROM shingles WHERE value=?1 AND number=?2", + .args = "IS", + .stmt = NULL, + .result = SQLITE_ROW}, + {.idx = RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID, + .sql = "SELECT digest, value, time, flag FROM digests WHERE id=?1", + .args = "I", + .stmt = NULL, + .result = SQLITE_ROW}, + {.idx = RSPAMD_FUZZY_BACKEND_DELETE, + .sql = "DELETE FROM digests WHERE digest==?1;", + .args = "D", + .stmt = NULL, + .result = SQLITE_DONE}, + {.idx = RSPAMD_FUZZY_BACKEND_COUNT, + .sql = "SELECT COUNT(*) FROM digests;", + .args = "", + .stmt = NULL, + .result = SQLITE_ROW}, + {.idx = RSPAMD_FUZZY_BACKEND_EXPIRE, + .sql = "DELETE FROM digests WHERE id IN (SELECT id FROM digests WHERE time < ?1 LIMIT ?2);", + .args = "II", + .stmt = NULL, + .result = SQLITE_DONE}, + {.idx = RSPAMD_FUZZY_BACKEND_VACUUM, + .sql = "VACUUM;", + .args = "", + .stmt = NULL, + .result = SQLITE_DONE}, + {.idx = RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED, + .sql = "DELETE FROM shingles WHERE value=?1 AND number=?2;", + .args = "II", + .stmt = NULL, + .result = SQLITE_DONE}, + {.idx = RSPAMD_FUZZY_BACKEND_ADD_SOURCE, + .sql = "INSERT OR IGNORE INTO sources(name, version, last) VALUES (?1, ?2, ?3);", + .args = "TII", + .stmt = NULL, + .result = SQLITE_DONE}, + {.idx = RSPAMD_FUZZY_BACKEND_VERSION, + .sql = "SELECT version FROM sources WHERE name=?1;", + .args = "T", + .stmt = NULL, + .result = SQLITE_ROW}, + {.idx = RSPAMD_FUZZY_BACKEND_SET_VERSION, + .sql = "INSERT OR REPLACE INTO sources (name, version, last) VALUES (?3, ?1, ?2);", + .args = "IIT", + .stmt = NULL, + .result = SQLITE_DONE}, +}; + +static GQuark +rspamd_fuzzy_backend_sqlite_quark(void) +{ + return g_quark_from_static_string("fuzzy-backend-sqlite"); +} + +static gboolean +rspamd_fuzzy_backend_sqlite_prepare_stmts(struct rspamd_fuzzy_backend_sqlite *bk, GError **err) +{ + int i; + + for (i = 0; i < RSPAMD_FUZZY_BACKEND_MAX; i++) { + if (prepared_stmts[i].stmt != NULL) { + /* Skip already prepared statements */ + continue; + } + if (sqlite3_prepare_v2(bk->db, prepared_stmts[i].sql, -1, + &prepared_stmts[i].stmt, NULL) != SQLITE_OK) { + g_set_error(err, rspamd_fuzzy_backend_sqlite_quark(), + -1, "Cannot initialize prepared sql `%s`: %s", + prepared_stmts[i].sql, sqlite3_errmsg(bk->db)); + + return FALSE; + } + } + + return TRUE; +} + +static int +rspamd_fuzzy_backend_sqlite_cleanup_stmt(struct rspamd_fuzzy_backend_sqlite *backend, + int idx) +{ + sqlite3_stmt *stmt; + + if (idx < 0 || idx >= RSPAMD_FUZZY_BACKEND_MAX) { + + return -1; + } + + msg_debug_fuzzy_backend("resetting `%s`", prepared_stmts[idx].sql); + stmt = prepared_stmts[idx].stmt; + sqlite3_clear_bindings(stmt); + sqlite3_reset(stmt); + + return SQLITE_OK; +} + +static int +rspamd_fuzzy_backend_sqlite_run_stmt(struct rspamd_fuzzy_backend_sqlite *backend, + gboolean auto_cleanup, + int idx, ...) +{ + int retcode; + va_list ap; + sqlite3_stmt *stmt; + int i; + const char *argtypes; + guint retries = 0; + struct timespec ts; + + if (idx < 0 || idx >= RSPAMD_FUZZY_BACKEND_MAX) { + + return -1; + } + + stmt = prepared_stmts[idx].stmt; + g_assert((int) prepared_stmts[idx].idx == idx); + + if (stmt == NULL) { + if ((retcode = sqlite3_prepare_v2(backend->db, prepared_stmts[idx].sql, -1, + &prepared_stmts[idx].stmt, NULL)) != SQLITE_OK) { + msg_err_fuzzy_backend("Cannot initialize prepared sql `%s`: %s", + prepared_stmts[idx].sql, sqlite3_errmsg(backend->db)); + + return retcode; + } + stmt = prepared_stmts[idx].stmt; + } + + msg_debug_fuzzy_backend("executing `%s` %s auto cleanup", + prepared_stmts[idx].sql, auto_cleanup ? "with" : "without"); + argtypes = prepared_stmts[idx].args; + sqlite3_clear_bindings(stmt); + sqlite3_reset(stmt); + va_start(ap, idx); + + for (i = 0; argtypes[i] != '\0'; i++) { + switch (argtypes[i]) { + case 'T': + sqlite3_bind_text(stmt, i + 1, va_arg(ap, const char *), -1, + SQLITE_STATIC); + break; + case 'I': + sqlite3_bind_int64(stmt, i + 1, va_arg(ap, gint64)); + break; + case 'S': + sqlite3_bind_int(stmt, i + 1, va_arg(ap, gint)); + break; + case 'D': + /* Special case for digests variable */ + sqlite3_bind_text(stmt, i + 1, va_arg(ap, const char *), 64, + SQLITE_STATIC); + break; + } + } + + va_end(ap); + +retry: + retcode = sqlite3_step(stmt); + + if (retcode == prepared_stmts[idx].result) { + retcode = SQLITE_OK; + } + else { + if ((retcode == SQLITE_BUSY || + retcode == SQLITE_LOCKED) && + retries++ < max_retries) { + double_to_ts(sql_sleep_time, &ts); + nanosleep(&ts, NULL); + goto retry; + } + + msg_debug_fuzzy_backend("failed to execute query %s: %d, %s", prepared_stmts[idx].sql, + retcode, sqlite3_errmsg(backend->db)); + } + + if (auto_cleanup) { + sqlite3_clear_bindings(stmt); + sqlite3_reset(stmt); + } + + return retcode; +} + +static void +rspamd_fuzzy_backend_sqlite_close_stmts(struct rspamd_fuzzy_backend_sqlite *bk) +{ + int i; + + for (i = 0; i < RSPAMD_FUZZY_BACKEND_MAX; i++) { + if (prepared_stmts[i].stmt != NULL) { + sqlite3_finalize(prepared_stmts[i].stmt); + prepared_stmts[i].stmt = NULL; + } + } + + return; +} + +static gboolean +rspamd_fuzzy_backend_sqlite_run_sql(const gchar *sql, struct rspamd_fuzzy_backend_sqlite *bk, + GError **err) +{ + guint retries = 0; + struct timespec ts; + gint ret; + + do { + ret = sqlite3_exec(bk->db, sql, NULL, NULL, NULL); + double_to_ts(sql_sleep_time, &ts); + } while (ret == SQLITE_BUSY && retries++ < max_retries && + nanosleep(&ts, NULL) == 0); + + if (ret != SQLITE_OK) { + g_set_error(err, rspamd_fuzzy_backend_sqlite_quark(), + -1, "Cannot execute raw sql `%s`: %s", + sql, sqlite3_errmsg(bk->db)); + return FALSE; + } + + return TRUE; +} + +static struct rspamd_fuzzy_backend_sqlite * +rspamd_fuzzy_backend_sqlite_open_db(const gchar *path, GError **err) +{ + struct rspamd_fuzzy_backend_sqlite *bk; + rspamd_cryptobox_hash_state_t st; + guchar hash_out[rspamd_cryptobox_HASHBYTES]; + + g_assert(path != NULL); + + bk = g_malloc0(sizeof(*bk)); + bk->path = g_strdup(path); + bk->expired = 0; + bk->pool = rspamd_mempool_new(rspamd_mempool_suggest_size(), + "fuzzy_backend", 0); + bk->db = rspamd_sqlite3_open_or_create(bk->pool, bk->path, + create_tables_sql, 1, err); + + if (bk->db == NULL) { + rspamd_fuzzy_backend_sqlite_close(bk); + + return NULL; + } + + if (!rspamd_fuzzy_backend_sqlite_prepare_stmts(bk, err)) { + rspamd_fuzzy_backend_sqlite_close(bk); + + return NULL; + } + + /* Set id for the backend */ + rspamd_cryptobox_hash_init(&st, NULL, 0); + rspamd_cryptobox_hash_update(&st, path, strlen(path)); + rspamd_cryptobox_hash_final(&st, hash_out); + rspamd_snprintf(bk->id, sizeof(bk->id), "%xs", hash_out); + memcpy(bk->pool->tag.uid, bk->id, sizeof(bk->pool->tag.uid)); + + return bk; +} + +struct rspamd_fuzzy_backend_sqlite * +rspamd_fuzzy_backend_sqlite_open(const gchar *path, + gboolean vacuum, + GError **err) +{ + struct rspamd_fuzzy_backend_sqlite *backend; + + if (path == NULL) { + g_set_error(err, rspamd_fuzzy_backend_sqlite_quark(), + ENOENT, "Path has not been specified"); + return NULL; + } + + /* Open database */ + if ((backend = rspamd_fuzzy_backend_sqlite_open_db(path, err)) == NULL) { + return NULL; + } + + if (rspamd_fuzzy_backend_sqlite_run_stmt(backend, FALSE, RSPAMD_FUZZY_BACKEND_COUNT) == SQLITE_OK) { + backend->count = sqlite3_column_int64( + prepared_stmts[RSPAMD_FUZZY_BACKEND_COUNT].stmt, 0); + } + + rspamd_fuzzy_backend_sqlite_cleanup_stmt(backend, RSPAMD_FUZZY_BACKEND_COUNT); + + return backend; +} + +static gint +rspamd_fuzzy_backend_sqlite_int64_cmp(const void *a, const void *b) +{ + gint64 ia = *(gint64 *) a, ib = *(gint64 *) b; + + return (ia - ib); +} + +struct rspamd_fuzzy_reply +rspamd_fuzzy_backend_sqlite_check(struct rspamd_fuzzy_backend_sqlite *backend, + const struct rspamd_fuzzy_cmd *cmd, gint64 expire) +{ + struct rspamd_fuzzy_reply rep; + const struct rspamd_fuzzy_shingle_cmd *shcmd; + int rc; + gint64 timestamp; + gint64 shingle_values[RSPAMD_SHINGLE_SIZE], i, sel_id, cur_id, + cur_cnt, max_cnt; + + memset(&rep, 0, sizeof(rep)); + memcpy(rep.digest, cmd->digest, sizeof(rep.digest)); + + if (backend == NULL) { + return rep; + } + + /* Try direct match first of all */ + rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_START); + rc = rspamd_fuzzy_backend_sqlite_run_stmt(backend, FALSE, + RSPAMD_FUZZY_BACKEND_CHECK, + cmd->digest); + + if (rc == SQLITE_OK) { + timestamp = sqlite3_column_int64( + prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 1); + if (time(NULL) - timestamp > expire) { + /* Expire element */ + msg_debug_fuzzy_backend("requested hash has been expired"); + } + else { + rep.v1.value = sqlite3_column_int64( + prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 0); + rep.v1.prob = 1.0; + rep.v1.flag = sqlite3_column_int( + prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 2); + } + } + else if (cmd->shingles_count > 0) { + /* Fuzzy match */ + + rspamd_fuzzy_backend_sqlite_cleanup_stmt(backend, RSPAMD_FUZZY_BACKEND_CHECK); + shcmd = (const struct rspamd_fuzzy_shingle_cmd *) cmd; + + for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) { + rc = rspamd_fuzzy_backend_sqlite_run_stmt(backend, FALSE, + RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE, + shcmd->sgl.hashes[i], i); + if (rc == SQLITE_OK) { + shingle_values[i] = sqlite3_column_int64( + prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE].stmt, + 0); + } + else { + shingle_values[i] = -1; + } + msg_debug_fuzzy_backend("looking for shingle %L -> %L: %d", i, + shcmd->sgl.hashes[i], rc); + } + + rspamd_fuzzy_backend_sqlite_cleanup_stmt(backend, + RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE); + + qsort(shingle_values, RSPAMD_SHINGLE_SIZE, sizeof(gint64), + rspamd_fuzzy_backend_sqlite_int64_cmp); + sel_id = -1; + cur_id = -1; + cur_cnt = 0; + max_cnt = 0; + + for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) { + if (shingle_values[i] == -1) { + continue; + } + + /* We have some value here, so we need to check it */ + if (shingle_values[i] == cur_id) { + cur_cnt++; + } + else { + cur_id = shingle_values[i]; + if (cur_cnt >= max_cnt) { + max_cnt = cur_cnt; + sel_id = cur_id; + } + cur_cnt = 0; + } + } + + if (cur_cnt > max_cnt) { + max_cnt = cur_cnt; + } + + if (sel_id != -1) { + /* We have some id selected here */ + rep.v1.prob = (float) max_cnt / (float) RSPAMD_SHINGLE_SIZE; + + if (rep.v1.prob > 0.5) { + msg_debug_fuzzy_backend( + "found fuzzy hash with probability %.2f", + rep.v1.prob); + rc = rspamd_fuzzy_backend_sqlite_run_stmt(backend, FALSE, + RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID, sel_id); + if (rc == SQLITE_OK) { + timestamp = sqlite3_column_int64( + prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt, + 2); + if (time(NULL) - timestamp > expire) { + /* Expire element */ + msg_debug_fuzzy_backend( + "requested hash has been expired"); + rep.v1.prob = 0.0; + } + else { + rep.ts = timestamp; + memcpy(rep.digest, sqlite3_column_blob(prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt, 0), sizeof(rep.digest)); + rep.v1.value = sqlite3_column_int64( + prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt, + 1); + rep.v1.flag = sqlite3_column_int( + prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt, + 3); + } + } + } + else { + /* Otherwise we assume that as error */ + rep.v1.value = 0; + } + + rspamd_fuzzy_backend_sqlite_cleanup_stmt(backend, + RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID); + } + } + + rspamd_fuzzy_backend_sqlite_cleanup_stmt(backend, RSPAMD_FUZZY_BACKEND_CHECK); + rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT); + + return rep; +} + +gboolean +rspamd_fuzzy_backend_sqlite_prepare_update(struct rspamd_fuzzy_backend_sqlite *backend, + const gchar *source) +{ + gint rc; + + if (backend == NULL) { + return FALSE; + } + + rc = rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_START); + + if (rc != SQLITE_OK) { + msg_warn_fuzzy_backend("cannot start transaction for updates: %s", + sqlite3_errmsg(backend->db)); + return FALSE; + } + + return TRUE; +} + +gboolean +rspamd_fuzzy_backend_sqlite_add(struct rspamd_fuzzy_backend_sqlite *backend, + const struct rspamd_fuzzy_cmd *cmd) +{ + int rc, i; + gint64 id, flag; + const struct rspamd_fuzzy_shingle_cmd *shcmd; + + if (backend == NULL) { + return FALSE; + } + + rc = rspamd_fuzzy_backend_sqlite_run_stmt(backend, FALSE, + RSPAMD_FUZZY_BACKEND_CHECK, + cmd->digest); + + if (rc == SQLITE_OK) { + /* Check flag */ + flag = sqlite3_column_int64( + prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, + 2); + rspamd_fuzzy_backend_sqlite_cleanup_stmt(backend, RSPAMD_FUZZY_BACKEND_CHECK); + + if (flag == cmd->flag) { + /* We need to increase weight */ + rc = rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE, + RSPAMD_FUZZY_BACKEND_UPDATE, + (gint64) cmd->value, + cmd->digest); + if (rc != SQLITE_OK) { + msg_warn_fuzzy_backend("cannot update hash to %d -> " + "%*xs: %s", + (gint) cmd->flag, + (gint) sizeof(cmd->digest), cmd->digest, + sqlite3_errmsg(backend->db)); + } + } + else { + /* We need to relearn actually */ + + rc = rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE, + RSPAMD_FUZZY_BACKEND_UPDATE_FLAG, + (gint64) cmd->value, + (gint64) cmd->flag, + cmd->digest); + + if (rc != SQLITE_OK) { + msg_warn_fuzzy_backend("cannot update hash to %d -> " + "%*xs: %s", + (gint) cmd->flag, + (gint) sizeof(cmd->digest), cmd->digest, + sqlite3_errmsg(backend->db)); + } + } + } + else { + rspamd_fuzzy_backend_sqlite_cleanup_stmt(backend, RSPAMD_FUZZY_BACKEND_CHECK); + rc = rspamd_fuzzy_backend_sqlite_run_stmt(backend, FALSE, + RSPAMD_FUZZY_BACKEND_INSERT, + (gint) cmd->flag, + cmd->digest, + (gint64) cmd->value); + + if (rc == SQLITE_OK) { + if (cmd->shingles_count > 0) { + id = sqlite3_last_insert_rowid(backend->db); + shcmd = (const struct rspamd_fuzzy_shingle_cmd *) cmd; + + for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) { + rc = rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE, + RSPAMD_FUZZY_BACKEND_INSERT_SHINGLE, + shcmd->sgl.hashes[i], (gint64) i, id); + msg_debug_fuzzy_backend("add shingle %d -> %L: %L", + i, + shcmd->sgl.hashes[i], + id); + + if (rc != SQLITE_OK) { + msg_warn_fuzzy_backend("cannot add shingle %d -> " + "%L: %L: %s", + i, + shcmd->sgl.hashes[i], + id, sqlite3_errmsg(backend->db)); + } + } + } + } + else { + msg_warn_fuzzy_backend("cannot add hash to %d -> " + "%*xs: %s", + (gint) cmd->flag, + (gint) sizeof(cmd->digest), cmd->digest, + sqlite3_errmsg(backend->db)); + } + + rspamd_fuzzy_backend_sqlite_cleanup_stmt(backend, + RSPAMD_FUZZY_BACKEND_INSERT); + } + + return (rc == SQLITE_OK); +} + +gboolean +rspamd_fuzzy_backend_sqlite_finish_update(struct rspamd_fuzzy_backend_sqlite *backend, + const gchar *source, gboolean version_bump) +{ + gint rc = SQLITE_OK, wal_frames, wal_checkpointed, ver; + + /* Get and update version */ + if (version_bump) { + ver = rspamd_fuzzy_backend_sqlite_version(backend, source); + ++ver; + + rc = rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE, + RSPAMD_FUZZY_BACKEND_SET_VERSION, + (gint64) ver, (gint64) time(NULL), source); + } + + if (rc == SQLITE_OK) { + rc = rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT); + + if (rc != SQLITE_OK) { + msg_warn_fuzzy_backend("cannot commit updates: %s", + sqlite3_errmsg(backend->db)); + rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK); + return FALSE; + } + else { + if (!rspamd_sqlite3_sync(backend->db, &wal_frames, &wal_checkpointed)) { + msg_warn_fuzzy_backend("cannot commit checkpoint: %s", + sqlite3_errmsg(backend->db)); + } + else if (wal_checkpointed > 0) { + msg_info_fuzzy_backend("total number of frames in the wal file: " + "%d, checkpointed: %d", + wal_frames, wal_checkpointed); + } + } + } + else { + msg_warn_fuzzy_backend("cannot update version for %s: %s", source, + sqlite3_errmsg(backend->db)); + rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK); + return FALSE; + } + + return TRUE; +} + +gboolean +rspamd_fuzzy_backend_sqlite_del(struct rspamd_fuzzy_backend_sqlite *backend, + const struct rspamd_fuzzy_cmd *cmd) +{ + int rc = -1; + + if (backend == NULL) { + return FALSE; + } + + rc = rspamd_fuzzy_backend_sqlite_run_stmt(backend, FALSE, + RSPAMD_FUZZY_BACKEND_CHECK, + cmd->digest); + + if (rc == SQLITE_OK) { + rspamd_fuzzy_backend_sqlite_cleanup_stmt(backend, RSPAMD_FUZZY_BACKEND_CHECK); + + rc = rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE, + RSPAMD_FUZZY_BACKEND_DELETE, + cmd->digest); + if (rc != SQLITE_OK) { + msg_warn_fuzzy_backend("cannot update hash to %d -> " + "%*xs: %s", + (gint) cmd->flag, + (gint) sizeof(cmd->digest), cmd->digest, + sqlite3_errmsg(backend->db)); + } + } + else { + /* Hash is missing */ + rspamd_fuzzy_backend_sqlite_cleanup_stmt(backend, RSPAMD_FUZZY_BACKEND_CHECK); + } + + return (rc == SQLITE_OK); +} + +gboolean +rspamd_fuzzy_backend_sqlite_sync(struct rspamd_fuzzy_backend_sqlite *backend, + gint64 expire, + gboolean clean_orphaned) +{ + struct orphaned_shingle_elt { + gint64 value; + gint64 number; + }; + + /* Do not do more than 5k ops per step */ + const guint64 max_changes = 5000; + gboolean ret = FALSE; + gint64 expire_lim, expired; + gint rc, i, orphaned_cnt = 0; + GError *err = NULL; + static const gchar orphaned_shingles[] = "SELECT shingles.value,shingles.number " + "FROM shingles " + "LEFT JOIN digests ON " + "shingles.digest_id=digests.id WHERE " + "digests.id IS NULL;"; + sqlite3_stmt *stmt; + GArray *orphaned; + struct orphaned_shingle_elt orphaned_elt, *pelt; + + + if (backend == NULL) { + return FALSE; + } + + /* Perform expire */ + if (expire > 0) { + expire_lim = time(NULL) - expire; + + if (expire_lim > 0) { + ret = rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_START); + + if (ret == SQLITE_OK) { + + rc = rspamd_fuzzy_backend_sqlite_run_stmt(backend, FALSE, + RSPAMD_FUZZY_BACKEND_EXPIRE, expire_lim, max_changes); + + if (rc == SQLITE_OK) { + expired = sqlite3_changes(backend->db); + + if (expired > 0) { + backend->expired += expired; + msg_info_fuzzy_backend("expired %L hashes", expired); + } + } + else { + msg_warn_fuzzy_backend( + "cannot execute expired statement: %s", + sqlite3_errmsg(backend->db)); + } + + rspamd_fuzzy_backend_sqlite_cleanup_stmt(backend, + RSPAMD_FUZZY_BACKEND_EXPIRE); + + ret = rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT); + + if (ret != SQLITE_OK) { + rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK); + } + } + if (ret != SQLITE_OK) { + msg_warn_fuzzy_backend("cannot expire db: %s", + sqlite3_errmsg(backend->db)); + } + } + } + + /* Cleanup database */ + if (clean_orphaned) { + ret = rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_START); + + if (ret == SQLITE_OK) { + if ((rc = sqlite3_prepare_v2(backend->db, + orphaned_shingles, + -1, + &stmt, + NULL)) != SQLITE_OK) { + msg_warn_fuzzy_backend("cannot cleanup shingles: %s", + sqlite3_errmsg(backend->db)); + } + else { + orphaned = g_array_new(FALSE, + FALSE, + sizeof(struct orphaned_shingle_elt)); + + while (sqlite3_step(stmt) == SQLITE_ROW) { + orphaned_elt.value = sqlite3_column_int64(stmt, 0); + orphaned_elt.number = sqlite3_column_int64(stmt, 1); + g_array_append_val(orphaned, orphaned_elt); + + if (orphaned->len > max_changes) { + break; + } + } + + sqlite3_finalize(stmt); + orphaned_cnt = orphaned->len; + + if (orphaned_cnt > 0) { + msg_info_fuzzy_backend( + "going to delete %ud orphaned shingles", + orphaned_cnt); + /* Need to delete orphaned elements */ + for (i = 0; i < (gint) orphaned_cnt; i++) { + pelt = &g_array_index(orphaned, + struct orphaned_shingle_elt, + i); + rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE, + RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED, + pelt->value, pelt->number); + } + } + + + g_array_free(orphaned, TRUE); + } + + ret = rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT); + + if (ret == SQLITE_OK) { + msg_info_fuzzy_backend( + "deleted %ud orphaned shingles", + orphaned_cnt); + } + else { + msg_warn_fuzzy_backend( + "cannot synchronize fuzzy backend: %e", + err); + rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK); + } + } + } + + return ret; +} + + +void rspamd_fuzzy_backend_sqlite_close(struct rspamd_fuzzy_backend_sqlite *backend) +{ + if (backend != NULL) { + if (backend->db != NULL) { + rspamd_fuzzy_backend_sqlite_close_stmts(backend); + sqlite3_close(backend->db); + } + + if (backend->path != NULL) { + g_free(backend->path); + } + + if (backend->pool) { + rspamd_mempool_delete(backend->pool); + } + + g_free(backend); + } +} + + +gsize rspamd_fuzzy_backend_sqlite_count(struct rspamd_fuzzy_backend_sqlite *backend) +{ + if (backend) { + if (rspamd_fuzzy_backend_sqlite_run_stmt(backend, FALSE, + RSPAMD_FUZZY_BACKEND_COUNT) == SQLITE_OK) { + backend->count = sqlite3_column_int64( + prepared_stmts[RSPAMD_FUZZY_BACKEND_COUNT].stmt, 0); + } + + rspamd_fuzzy_backend_sqlite_cleanup_stmt(backend, RSPAMD_FUZZY_BACKEND_COUNT); + + return backend->count; + } + + return 0; +} + +gint rspamd_fuzzy_backend_sqlite_version(struct rspamd_fuzzy_backend_sqlite *backend, + const gchar *source) +{ + gint ret = 0; + + if (backend) { + if (rspamd_fuzzy_backend_sqlite_run_stmt(backend, FALSE, + RSPAMD_FUZZY_BACKEND_VERSION, source) == SQLITE_OK) { + ret = sqlite3_column_int64( + prepared_stmts[RSPAMD_FUZZY_BACKEND_VERSION].stmt, 0); + } + + rspamd_fuzzy_backend_sqlite_cleanup_stmt(backend, RSPAMD_FUZZY_BACKEND_VERSION); + } + + return ret; +} + +gsize rspamd_fuzzy_backend_sqlite_expired(struct rspamd_fuzzy_backend_sqlite *backend) +{ + return backend != NULL ? backend->expired : 0; +} + +const gchar * +rspamd_fuzzy_sqlite_backend_id(struct rspamd_fuzzy_backend_sqlite *backend) +{ + return backend != NULL ? backend->id : 0; +} diff --git a/src/libserver/fuzzy_backend/fuzzy_backend_sqlite.h b/src/libserver/fuzzy_backend/fuzzy_backend_sqlite.h new file mode 100644 index 0000000..766f7c9 --- /dev/null +++ b/src/libserver/fuzzy_backend/fuzzy_backend_sqlite.h @@ -0,0 +1,107 @@ +/*- + * Copyright 2016 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef FUZZY_BACKEND_H_ +#define FUZZY_BACKEND_H_ + +#include "config.h" +#include "fuzzy_wire.h" + +#ifdef __cplusplus +extern "C" { +#endif + +struct rspamd_fuzzy_backend_sqlite; + +/** + * Open fuzzy backend + * @param path file to open (legacy file will be converted automatically) + * @param err error pointer + * @return backend structure or NULL + */ +struct rspamd_fuzzy_backend_sqlite *rspamd_fuzzy_backend_sqlite_open(const gchar *path, + gboolean vacuum, + GError **err); + +/** + * Check specified fuzzy in the backend + * @param backend + * @param cmd + * @return reply with probability and weight + */ +struct rspamd_fuzzy_reply rspamd_fuzzy_backend_sqlite_check( + struct rspamd_fuzzy_backend_sqlite *backend, + const struct rspamd_fuzzy_cmd *cmd, + gint64 expire); + +/** + * Prepare storage for updates (by starting transaction) + */ +gboolean rspamd_fuzzy_backend_sqlite_prepare_update(struct rspamd_fuzzy_backend_sqlite *backend, + const gchar *source); + +/** + * Add digest to the database + * @param backend + * @param cmd + * @return + */ +gboolean rspamd_fuzzy_backend_sqlite_add(struct rspamd_fuzzy_backend_sqlite *backend, + const struct rspamd_fuzzy_cmd *cmd); + +/** + * Delete digest from the database + * @param backend + * @param cmd + * @return + */ +gboolean rspamd_fuzzy_backend_sqlite_del( + struct rspamd_fuzzy_backend_sqlite *backend, + const struct rspamd_fuzzy_cmd *cmd); + +/** + * Commit updates to storage + */ +gboolean rspamd_fuzzy_backend_sqlite_finish_update(struct rspamd_fuzzy_backend_sqlite *backend, + const gchar *source, gboolean version_bump); + +/** + * Sync storage + * @param backend + * @return + */ +gboolean rspamd_fuzzy_backend_sqlite_sync(struct rspamd_fuzzy_backend_sqlite *backend, + gint64 expire, + gboolean clean_orphaned); + +/** + * Close storage + * @param backend + */ +void rspamd_fuzzy_backend_sqlite_close(struct rspamd_fuzzy_backend_sqlite *backend); + +gsize rspamd_fuzzy_backend_sqlite_count(struct rspamd_fuzzy_backend_sqlite *backend); + +gint rspamd_fuzzy_backend_sqlite_version(struct rspamd_fuzzy_backend_sqlite *backend, const gchar *source); + +gsize rspamd_fuzzy_backend_sqlite_expired(struct rspamd_fuzzy_backend_sqlite *backend); + +const gchar *rspamd_fuzzy_sqlite_backend_id(struct rspamd_fuzzy_backend_sqlite *backend); + +#ifdef __cplusplus +} +#endif + +#endif /* FUZZY_BACKEND_H_ */ |