/* * Copyright 2023 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "config.h" #include "ref.h" #include "fuzzy_backend.h" #include "fuzzy_backend_redis.h" #include "redis_pool.h" #include "cryptobox.h" #include "str_util.h" #include "upstream.h" #include "contrib/hiredis/hiredis.h" #include "contrib/hiredis/async.h" #include "lua/lua_common.h" #define REDIS_DEFAULT_PORT 6379 #define REDIS_DEFAULT_OBJECT "fuzzy" #define REDIS_DEFAULT_TIMEOUT 2.0 #define msg_err_redis_session(...) rspamd_default_log_function(G_LOG_LEVEL_CRITICAL, \ "fuzzy_redis", session->backend->id, \ G_STRFUNC, \ __VA_ARGS__) #define msg_warn_redis_session(...) rspamd_default_log_function(G_LOG_LEVEL_WARNING, \ "fuzzy_redis", session->backend->id, \ G_STRFUNC, \ __VA_ARGS__) #define msg_info_redis_session(...) rspamd_default_log_function(G_LOG_LEVEL_INFO, \ "fuzzy_redis", session->backend->id, \ G_STRFUNC, \ __VA_ARGS__) #define msg_debug_redis_session(...) rspamd_conditional_debug_fast(NULL, NULL, \ rspamd_fuzzy_redis_log_id, "fuzzy_redis", session->backend->id, \ G_STRFUNC, \ __VA_ARGS__) INIT_LOG_MODULE(fuzzy_redis) struct rspamd_fuzzy_backend_redis { lua_State *L; const gchar *redis_object; const gchar *username; const gchar *password; const gchar *dbname; gchar *id; struct rspamd_redis_pool *pool; gdouble timeout; gint conf_ref; bool terminated; ref_entry_t ref; }; enum rspamd_fuzzy_redis_command { RSPAMD_FUZZY_REDIS_COMMAND_COUNT, RSPAMD_FUZZY_REDIS_COMMAND_VERSION, RSPAMD_FUZZY_REDIS_COMMAND_UPDATES, RSPAMD_FUZZY_REDIS_COMMAND_CHECK }; struct rspamd_fuzzy_redis_session { struct rspamd_fuzzy_backend_redis *backend; redisAsyncContext *ctx; ev_timer timeout; const struct rspamd_fuzzy_cmd *cmd; struct ev_loop *event_loop; float prob; gboolean shingles_checked; enum rspamd_fuzzy_redis_command command; guint nargs; guint nadded; guint ndeleted; guint nextended; guint nignored; union { rspamd_fuzzy_check_cb cb_check; rspamd_fuzzy_update_cb cb_update; rspamd_fuzzy_version_cb cb_version; rspamd_fuzzy_count_cb cb_count; } callback; void *cbdata; gchar **argv; gsize *argv_lens; struct upstream *up; guchar found_digest[rspamd_cryptobox_HASHBYTES]; }; static inline struct upstream_list * rspamd_redis_get_servers(struct rspamd_fuzzy_backend_redis *ctx, const gchar *what) { lua_State *L = ctx->L; struct upstream_list *res = NULL; lua_rawgeti(L, LUA_REGISTRYINDEX, ctx->conf_ref); lua_pushstring(L, what); lua_gettable(L, -2); if (lua_type(L, -1) == LUA_TUSERDATA) { res = *((struct upstream_list **) lua_touserdata(L, -1)); } else { struct lua_logger_trace tr; gchar outbuf[8192]; memset(&tr, 0, sizeof(tr)); lua_logger_out_type(L, -2, outbuf, sizeof(outbuf) - 1, &tr, LUA_ESCAPE_UNPRINTABLE); msg_err("cannot get %s upstreams for Redis fuzzy storage %s; table content: %s", what, ctx->id, outbuf); } lua_settop(L, 0); return res; } static inline void rspamd_fuzzy_redis_session_free_args(struct rspamd_fuzzy_redis_session *session) { guint i; if (session->argv) { for (i = 0; i < session->nargs; i++) { g_free(session->argv[i]); } g_free(session->argv); g_free(session->argv_lens); } } static void rspamd_fuzzy_redis_session_dtor(struct rspamd_fuzzy_redis_session *session, gboolean is_fatal) { redisAsyncContext *ac; if (session->ctx) { ac = session->ctx; session->ctx = NULL; rspamd_redis_pool_release_connection(session->backend->pool, ac, is_fatal ? RSPAMD_REDIS_RELEASE_FATAL : RSPAMD_REDIS_RELEASE_DEFAULT); } ev_timer_stop(session->event_loop, &session->timeout); rspamd_fuzzy_redis_session_free_args(session); REF_RELEASE(session->backend); rspamd_upstream_unref(session->up); g_free(session); } static void rspamd_fuzzy_backend_redis_dtor(struct rspamd_fuzzy_backend_redis *backend) { if (!backend->terminated && backend->conf_ref != -1) { luaL_unref(backend->L, LUA_REGISTRYINDEX, backend->conf_ref); } if (backend->id) { g_free(backend->id); } g_free(backend); } void * rspamd_fuzzy_backend_init_redis(struct rspamd_fuzzy_backend *bk, const ucl_object_t *obj, struct rspamd_config *cfg, GError **err) { struct rspamd_fuzzy_backend_redis *backend; const ucl_object_t *elt; gboolean ret = FALSE; guchar id_hash[rspamd_cryptobox_HASHBYTES]; rspamd_cryptobox_hash_state_t st; lua_State *L = (lua_State *) cfg->lua_state; gint conf_ref = -1; backend = g_malloc0(sizeof(*backend)); backend->timeout = REDIS_DEFAULT_TIMEOUT; backend->redis_object = REDIS_DEFAULT_OBJECT; backend->L = L; ret = rspamd_lua_try_load_redis(L, obj, cfg, &conf_ref); /* Now try global redis settings */ if (!ret) { elt = ucl_object_lookup(cfg->cfg_ucl_obj, "redis"); if (elt) { const ucl_object_t *specific_obj; specific_obj = ucl_object_lookup_any(elt, "fuzzy", "fuzzy_storage", NULL); if (specific_obj) { ret = rspamd_lua_try_load_redis(L, specific_obj, cfg, &conf_ref); } else { ret = rspamd_lua_try_load_redis(L, elt, cfg, &conf_ref); } } } if (!ret) { msg_err_config("cannot init redis backend for fuzzy storage"); g_free(backend); return NULL; } elt = ucl_object_lookup(obj, "prefix"); if (elt == NULL || ucl_object_type(elt) != UCL_STRING) { backend->redis_object = REDIS_DEFAULT_OBJECT; } else { backend->redis_object = ucl_object_tostring(elt); } backend->conf_ref = conf_ref; /* Check some common table values */ lua_rawgeti(L, LUA_REGISTRYINDEX, conf_ref); lua_pushstring(L, "timeout"); lua_gettable(L, -2); if (lua_type(L, -1) == LUA_TNUMBER) { backend->timeout = lua_tonumber(L, -1); } lua_pop(L, 1); lua_pushstring(L, "db"); lua_gettable(L, -2); if (lua_type(L, -1) == LUA_TSTRING) { backend->dbname = rspamd_mempool_strdup(cfg->cfg_pool, lua_tostring(L, -1)); } lua_pop(L, 1); lua_pushstring(L, "username"); lua_gettable(L, -2); if (lua_type(L, -1) == LUA_TSTRING) { backend->username = rspamd_mempool_strdup(cfg->cfg_pool, lua_tostring(L, -1)); } lua_pop(L, 1); lua_pushstring(L, "password"); lua_gettable(L, -2); if (lua_type(L, -1) == LUA_TSTRING) { backend->password = rspamd_mempool_strdup(cfg->cfg_pool, lua_tostring(L, -1)); } lua_pop(L, 1); lua_settop(L, 0); REF_INIT_RETAIN(backend, rspamd_fuzzy_backend_redis_dtor); backend->pool = cfg->redis_pool; rspamd_cryptobox_hash_init(&st, NULL, 0); rspamd_cryptobox_hash_update(&st, backend->redis_object, strlen(backend->redis_object)); if (backend->dbname) { rspamd_cryptobox_hash_update(&st, backend->dbname, strlen(backend->dbname)); } if (backend->username) { rspamd_cryptobox_hash_update(&st, backend->username, strlen(backend->username)); } if (backend->password) { rspamd_cryptobox_hash_update(&st, backend->password, strlen(backend->password)); } rspamd_cryptobox_hash_final(&st, id_hash); backend->id = rspamd_encode_base32(id_hash, sizeof(id_hash), RSPAMD_BASE32_DEFAULT); return backend; } static void rspamd_fuzzy_redis_timeout(EV_P_ ev_timer *w, int revents) { struct rspamd_fuzzy_redis_session *session = (struct rspamd_fuzzy_redis_session *) w->data; redisAsyncContext *ac; static char errstr[128]; if (session->ctx) { ac = session->ctx; session->ctx = NULL; ac->err = REDIS_ERR_IO; /* Should be safe as in hiredis it is char[128] */ rspamd_snprintf(errstr, sizeof(errstr), "%s", strerror(ETIMEDOUT)); ac->errstr = errstr; /* This will cause session closing */ rspamd_redis_pool_release_connection(session->backend->pool, ac, RSPAMD_REDIS_RELEASE_FATAL); } } static void rspamd_fuzzy_redis_check_callback(redisAsyncContext *c, gpointer r, gpointer priv); struct _rspamd_fuzzy_shingles_helper { guchar digest[64]; guint found; }; static gint rspamd_fuzzy_backend_redis_shingles_cmp(const void *a, const void *b) { const struct _rspamd_fuzzy_shingles_helper *sha = a, *shb = b; return memcmp(sha->digest, shb->digest, sizeof(sha->digest)); } static void rspamd_fuzzy_redis_shingles_callback(redisAsyncContext *c, gpointer r, gpointer priv) { struct rspamd_fuzzy_redis_session *session = priv; redisReply *reply = r, *cur; struct rspamd_fuzzy_reply rep; GString *key; struct _rspamd_fuzzy_shingles_helper *shingles, *prev = NULL, *sel = NULL; guint i, found = 0, max_found = 0, cur_found = 0; ev_timer_stop(session->event_loop, &session->timeout); memset(&rep, 0, sizeof(rep)); if (c->err == 0 && reply != NULL) { rspamd_upstream_ok(session->up); if (reply->type == REDIS_REPLY_ARRAY && reply->elements == RSPAMD_SHINGLE_SIZE) { shingles = g_alloca(sizeof(struct _rspamd_fuzzy_shingles_helper) * RSPAMD_SHINGLE_SIZE); for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) { cur = reply->element[i]; if (cur->type == REDIS_REPLY_STRING) { shingles[i].found = 1; memcpy(shingles[i].digest, cur->str, MIN(64, cur->len)); found++; } else { memset(shingles[i].digest, 0, sizeof(shingles[i].digest)); shingles[i].found = 0; } } if (found > RSPAMD_SHINGLE_SIZE / 2) { /* Now sort to find the most frequent element */ qsort(shingles, RSPAMD_SHINGLE_SIZE, sizeof(struct _rspamd_fuzzy_shingles_helper), rspamd_fuzzy_backend_redis_shingles_cmp); prev = &shingles[0]; for (i = 1; i < RSPAMD_SHINGLE_SIZE; i++) { if (!shingles[i].found) { continue; } if (memcmp(shingles[i].digest, prev->digest, 64) == 0) { cur_found++; if (cur_found > max_found) { max_found = cur_found; sel = &shingles[i]; } } else { cur_found = 1; prev = &shingles[i]; } } if (max_found > RSPAMD_SHINGLE_SIZE / 2) { session->prob = ((float) max_found) / RSPAMD_SHINGLE_SIZE; rep.v1.prob = session->prob; g_assert(sel != NULL); /* Prepare new check command */ rspamd_fuzzy_redis_session_free_args(session); session->nargs = 5; session->argv = g_malloc(sizeof(gchar *) * session->nargs); session->argv_lens = g_malloc(sizeof(gsize) * session->nargs); key = g_string_new(session->backend->redis_object); g_string_append_len(key, sel->digest, sizeof(sel->digest)); session->argv[0] = g_strdup("HMGET"); session->argv_lens[0] = 5; session->argv[1] = key->str; session->argv_lens[1] = key->len; session->argv[2] = g_strdup("V"); session->argv_lens[2] = 1; session->argv[3] = g_strdup("F"); session->argv_lens[3] = 1; session->argv[4] = g_strdup("C"); session->argv_lens[4] = 1; g_string_free(key, FALSE); /* Do not free underlying array */ memcpy(session->found_digest, sel->digest, sizeof(session->cmd->digest)); g_assert(session->ctx != NULL); if (redisAsyncCommandArgv(session->ctx, rspamd_fuzzy_redis_check_callback, session, session->nargs, (const gchar **) session->argv, session->argv_lens) != REDIS_OK) { if (session->callback.cb_check) { memset(&rep, 0, sizeof(rep)); session->callback.cb_check(&rep, session->cbdata); } rspamd_fuzzy_redis_session_dtor(session, TRUE); } else { /* Add timeout */ session->timeout.data = session; ev_now_update_if_cheap((struct ev_loop *) session->event_loop); ev_timer_init(&session->timeout, rspamd_fuzzy_redis_timeout, session->backend->timeout, 0.0); ev_timer_start(session->event_loop, &session->timeout); } return; } } } else if (reply->type == REDIS_REPLY_ERROR) { msg_err_redis_session("fuzzy backend redis error: \"%s\"", reply->str); } if (session->callback.cb_check) { session->callback.cb_check(&rep, session->cbdata); } } else { if (session->callback.cb_check) { session->callback.cb_check(&rep, session->cbdata); } if (c->errstr) { msg_err_redis_session("error getting shingles: %s", c->errstr); rspamd_upstream_fail(session->up, FALSE, c->errstr); } } rspamd_fuzzy_redis_session_dtor(session, FALSE); } static void rspamd_fuzzy_backend_check_shingles(struct rspamd_fuzzy_redis_session *session) { struct rspamd_fuzzy_reply rep; const struct rspamd_fuzzy_shingle_cmd *shcmd; GString *key; guint i, init_len; rspamd_fuzzy_redis_session_free_args(session); /* First of all check digest */ session->nargs = RSPAMD_SHINGLE_SIZE + 1; session->argv = g_malloc(sizeof(gchar *) * session->nargs); session->argv_lens = g_malloc(sizeof(gsize) * session->nargs); shcmd = (const struct rspamd_fuzzy_shingle_cmd *) session->cmd; session->argv[0] = g_strdup("MGET"); session->argv_lens[0] = 4; init_len = strlen(session->backend->redis_object); for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) { key = g_string_sized_new(init_len + 2 + 2 + sizeof("18446744073709551616")); rspamd_printf_gstring(key, "%s_%d_%uL", session->backend->redis_object, i, shcmd->sgl.hashes[i]); session->argv[i + 1] = key->str; session->argv_lens[i + 1] = key->len; g_string_free(key, FALSE); /* Do not free underlying array */ } session->shingles_checked = TRUE; g_assert(session->ctx != NULL); if (redisAsyncCommandArgv(session->ctx, rspamd_fuzzy_redis_shingles_callback, session, session->nargs, (const gchar **) session->argv, session->argv_lens) != REDIS_OK) { msg_err("cannot execute redis command on %s: %s", rspamd_inet_address_to_string_pretty(rspamd_upstream_addr_cur(session->up)), session->ctx->errstr); if (session->callback.cb_check) { memset(&rep, 0, sizeof(rep)); session->callback.cb_check(&rep, session->cbdata); } rspamd_fuzzy_redis_session_dtor(session, TRUE); } else { /* Add timeout */ session->timeout.data = session; ev_now_update_if_cheap((struct ev_loop *) session->event_loop); ev_timer_init(&session->timeout, rspamd_fuzzy_redis_timeout, session->backend->timeout, 0.0); ev_timer_start(session->event_loop, &session->timeout); } } static void rspamd_fuzzy_redis_check_callback(redisAsyncContext *c, gpointer r, gpointer priv) { struct rspamd_fuzzy_redis_session *session = priv; redisReply *reply = r, *cur; struct rspamd_fuzzy_reply rep; gulong value; guint found_elts = 0; ev_timer_stop(session->event_loop, &session->timeout); memset(&rep, 0, sizeof(rep)); if (c->err == 0 && reply != NULL) { rspamd_upstream_ok(session->up); if (reply->type == REDIS_REPLY_ARRAY && reply->elements >= 2) { cur = reply->element[0]; if (cur->type == REDIS_REPLY_STRING) { value = strtoul(cur->str, NULL, 10); rep.v1.value = value; found_elts++; } cur = reply->element[1]; if (cur->type == REDIS_REPLY_STRING) { value = strtoul(cur->str, NULL, 10); rep.v1.flag = value; found_elts++; } if (found_elts >= 2) { rep.v1.prob = session->prob; memcpy(rep.digest, session->found_digest, sizeof(rep.digest)); } rep.ts = 0; if (reply->elements > 2) { cur = reply->element[2]; if (cur->type == REDIS_REPLY_STRING) { rep.ts = strtoul(cur->str, NULL, 10); } } } else if (reply->type == REDIS_REPLY_ERROR) { msg_err_redis_session("fuzzy backend redis error: \"%s\"", reply->str); } if (found_elts < 2) { if (session->cmd->shingles_count > 0 && !session->shingles_checked) { /* We also need to check all shingles here */ rspamd_fuzzy_backend_check_shingles(session); /* Do not free session */ return; } else { if (session->callback.cb_check) { session->callback.cb_check(&rep, session->cbdata); } } } else { if (session->callback.cb_check) { session->callback.cb_check(&rep, session->cbdata); } } } else { if (session->callback.cb_check) { session->callback.cb_check(&rep, session->cbdata); } if (c->errstr) { msg_err_redis_session("error getting hashes on %s: %s", rspamd_inet_address_to_string_pretty(rspamd_upstream_addr_cur(session->up)), c->errstr); rspamd_upstream_fail(session->up, FALSE, c->errstr); } } rspamd_fuzzy_redis_session_dtor(session, FALSE); } void rspamd_fuzzy_backend_check_redis(struct rspamd_fuzzy_backend *bk, const struct rspamd_fuzzy_cmd *cmd, rspamd_fuzzy_check_cb cb, void *ud, void *subr_ud) { struct rspamd_fuzzy_backend_redis *backend = subr_ud; struct rspamd_fuzzy_redis_session *session; struct upstream *up; struct upstream_list *ups; rspamd_inet_addr_t *addr; struct rspamd_fuzzy_reply rep; GString *key; g_assert(backend != NULL); ups = rspamd_redis_get_servers(backend, "read_servers"); if (!ups) { if (cb) { memset(&rep, 0, sizeof(rep)); cb(&rep, ud); } return; } session = g_malloc0(sizeof(*session)); session->backend = backend; REF_RETAIN(session->backend); session->callback.cb_check = cb; session->cbdata = ud; session->command = RSPAMD_FUZZY_REDIS_COMMAND_CHECK; session->cmd = cmd; session->prob = 1.0; memcpy(rep.digest, session->cmd->digest, sizeof(rep.digest)); memcpy(session->found_digest, session->cmd->digest, sizeof(rep.digest)); session->event_loop = rspamd_fuzzy_backend_event_base(bk); /* First of all check digest */ session->nargs = 5; session->argv = g_malloc(sizeof(gchar *) * session->nargs); session->argv_lens = g_malloc(sizeof(gsize) * session->nargs); key = g_string_new(backend->redis_object); g_string_append_len(key, cmd->digest, sizeof(cmd->digest)); session->argv[0] = g_strdup("HMGET"); session->argv_lens[0] = 5; session->argv[1] = key->str; session->argv_lens[1] = key->len; session->argv[2] = g_strdup("V"); session->argv_lens[2] = 1; session->argv[3] = g_strdup("F"); session->argv_lens[3] = 1; session->argv[4] = g_strdup("C"); session->argv_lens[4] = 1; g_string_free(key, FALSE); /* Do not free underlying array */ up = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0); session->up = rspamd_upstream_ref(up); addr = rspamd_upstream_addr_next(up); g_assert(addr != NULL); session->ctx = rspamd_redis_pool_connect(backend->pool, backend->dbname, backend->username, backend->password, rspamd_inet_address_to_string(addr), rspamd_inet_address_get_port(addr)); if (session->ctx == NULL) { rspamd_upstream_fail(up, TRUE, strerror(errno)); rspamd_fuzzy_redis_session_dtor(session, TRUE); if (cb) { memset(&rep, 0, sizeof(rep)); cb(&rep, ud); } } else { if (redisAsyncCommandArgv(session->ctx, rspamd_fuzzy_redis_check_callback, session, session->nargs, (const gchar **) session->argv, session->argv_lens) != REDIS_OK) { rspamd_fuzzy_redis_session_dtor(session, TRUE); if (cb) { memset(&rep, 0, sizeof(rep)); cb(&rep, ud); } } else { /* Add timeout */ session->timeout.data = session; ev_now_update_if_cheap((struct ev_loop *) session->event_loop); ev_timer_init(&session->timeout, rspamd_fuzzy_redis_timeout, session->backend->timeout, 0.0); ev_timer_start(session->event_loop, &session->timeout); } } } static void rspamd_fuzzy_redis_count_callback(redisAsyncContext *c, gpointer r, gpointer priv) { struct rspamd_fuzzy_redis_session *session = priv; redisReply *reply = r; gulong nelts; ev_timer_stop(session->event_loop, &session->timeout); if (c->err == 0 && reply != NULL) { rspamd_upstream_ok(session->up); if (reply->type == REDIS_REPLY_INTEGER) { if (session->callback.cb_count) { session->callback.cb_count(reply->integer, session->cbdata); } } else if (reply->type == REDIS_REPLY_STRING) { nelts = strtoul(reply->str, NULL, 10); if (session->callback.cb_count) { session->callback.cb_count(nelts, session->cbdata); } } else { if (reply->type == REDIS_REPLY_ERROR) { msg_err_redis_session("fuzzy backend redis error: \"%s\"", reply->str); } if (session->callback.cb_count) { session->callback.cb_count(0, session->cbdata); } } } else { if (session->callback.cb_count) { session->callback.cb_count(0, session->cbdata); } if (c->errstr) { msg_err_redis_session("error getting count on %s: %s", rspamd_inet_address_to_string_pretty(rspamd_upstream_addr_cur(session->up)), c->errstr); rspamd_upstream_fail(session->up, FALSE, c->errstr); } } rspamd_fuzzy_redis_session_dtor(session, FALSE); } void rspamd_fuzzy_backend_count_redis(struct rspamd_fuzzy_backend *bk, rspamd_fuzzy_count_cb cb, void *ud, void *subr_ud) { struct rspamd_fuzzy_backend_redis *backend = subr_ud; struct rspamd_fuzzy_redis_session *session; struct upstream *up; struct upstream_list *ups; rspamd_inet_addr_t *addr; GString *key; g_assert(backend != NULL); ups = rspamd_redis_get_servers(backend, "read_servers"); if (!ups) { if (cb) { cb(0, ud); } return; } session = g_malloc0(sizeof(*session)); session->backend = backend; REF_RETAIN(session->backend); session->callback.cb_count = cb; session->cbdata = ud; session->command = RSPAMD_FUZZY_REDIS_COMMAND_COUNT; session->event_loop = rspamd_fuzzy_backend_event_base(bk); session->nargs = 2; session->argv = g_malloc(sizeof(gchar *) * 2); session->argv_lens = g_malloc(sizeof(gsize) * 2); key = g_string_new(backend->redis_object); g_string_append(key, "_count"); session->argv[0] = g_strdup("GET"); session->argv_lens[0] = 3; session->argv[1] = key->str; session->argv_lens[1] = key->len; g_string_free(key, FALSE); /* Do not free underlying array */ up = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0); session->up = rspamd_upstream_ref(up); addr = rspamd_upstream_addr_next(up); g_assert(addr != NULL); session->ctx = rspamd_redis_pool_connect(backend->pool, backend->dbname, backend->username, backend->password, rspamd_inet_address_to_string(addr), rspamd_inet_address_get_port(addr)); if (session->ctx == NULL) { rspamd_upstream_fail(up, TRUE, strerror(errno)); rspamd_fuzzy_redis_session_dtor(session, TRUE); if (cb) { cb(0, ud); } } else { if (redisAsyncCommandArgv(session->ctx, rspamd_fuzzy_redis_count_callback, session, session->nargs, (const gchar **) session->argv, session->argv_lens) != REDIS_OK) { rspamd_fuzzy_redis_session_dtor(session, TRUE); if (cb) { cb(0, ud); } } else { /* Add timeout */ session->timeout.data = session; ev_now_update_if_cheap((struct ev_loop *) session->event_loop); ev_timer_init(&session->timeout, rspamd_fuzzy_redis_timeout, session->backend->timeout, 0.0); ev_timer_start(session->event_loop, &session->timeout); } } } static void rspamd_fuzzy_redis_version_callback(redisAsyncContext *c, gpointer r, gpointer priv) { struct rspamd_fuzzy_redis_session *session = priv; redisReply *reply = r; gulong nelts; ev_timer_stop(session->event_loop, &session->timeout); if (c->err == 0 && reply != NULL) { rspamd_upstream_ok(session->up); if (reply->type == REDIS_REPLY_INTEGER) { if (session->callback.cb_version) { session->callback.cb_version(reply->integer, session->cbdata); } } else if (reply->type == REDIS_REPLY_STRING) { nelts = strtoul(reply->str, NULL, 10); if (session->callback.cb_version) { session->callback.cb_version(nelts, session->cbdata); } } else { if (reply->type == REDIS_REPLY_ERROR) { msg_err_redis_session("fuzzy backend redis error: \"%s\"", reply->str); } if (session->callback.cb_version) { session->callback.cb_version(0, session->cbdata); } } } else { if (session->callback.cb_version) { session->callback.cb_version(0, session->cbdata); } if (c->errstr) { msg_err_redis_session("error getting version on %s: %s", rspamd_inet_address_to_string_pretty(rspamd_upstream_addr_cur(session->up)), c->errstr); rspamd_upstream_fail(session->up, FALSE, c->errstr); } } rspamd_fuzzy_redis_session_dtor(session, FALSE); } void rspamd_fuzzy_backend_version_redis(struct rspamd_fuzzy_backend *bk, const gchar *src, rspamd_fuzzy_version_cb cb, void *ud, void *subr_ud) { struct rspamd_fuzzy_backend_redis *backend = subr_ud; struct rspamd_fuzzy_redis_session *session; struct upstream *up; struct upstream_list *ups; rspamd_inet_addr_t *addr; GString *key; g_assert(backend != NULL); ups = rspamd_redis_get_servers(backend, "read_servers"); if (!ups) { if (cb) { cb(0, ud); } return; } session = g_malloc0(sizeof(*session)); session->backend = backend; REF_RETAIN(session->backend); session->callback.cb_version = cb; session->cbdata = ud; session->command = RSPAMD_FUZZY_REDIS_COMMAND_VERSION; session->event_loop = rspamd_fuzzy_backend_event_base(bk); session->nargs = 2; session->argv = g_malloc(sizeof(gchar *) * 2); session->argv_lens = g_malloc(sizeof(gsize) * 2); key = g_string_new(backend->redis_object); g_string_append(key, src); session->argv[0] = g_strdup("GET"); session->argv_lens[0] = 3; session->argv[1] = key->str; session->argv_lens[1] = key->len; g_string_free(key, FALSE); /* Do not free underlying array */ up = rspamd_upstream_get(ups, RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0); session->up = rspamd_upstream_ref(up); addr = rspamd_upstream_addr_next(up); g_assert(addr != NULL); session->ctx = rspamd_redis_pool_connect(backend->pool, backend->dbname, backend->username, backend->password, rspamd_inet_address_to_string(addr), rspamd_inet_address_get_port(addr)); if (session->ctx == NULL) { rspamd_upstream_fail(up, FALSE, strerror(errno)); rspamd_fuzzy_redis_session_dtor(session, TRUE); if (cb) { cb(0, ud); } } else { if (redisAsyncCommandArgv(session->ctx, rspamd_fuzzy_redis_version_callback, session, session->nargs, (const gchar **) session->argv, session->argv_lens) != REDIS_OK) { rspamd_fuzzy_redis_session_dtor(session, TRUE); if (cb) { cb(0, ud); } } else { /* Add timeout */ session->timeout.data = session; ev_now_update_if_cheap((struct ev_loop *) session->event_loop); ev_timer_init(&session->timeout, rspamd_fuzzy_redis_timeout, session->backend->timeout, 0.0); ev_timer_start(session->event_loop, &session->timeout); } } } const gchar * rspamd_fuzzy_backend_id_redis(struct rspamd_fuzzy_backend *bk, void *subr_ud) { struct rspamd_fuzzy_backend_redis *backend = subr_ud; g_assert(backend != NULL); return backend->id; } void rspamd_fuzzy_backend_expire_redis(struct rspamd_fuzzy_backend *bk, void *subr_ud) { struct rspamd_fuzzy_backend_redis *backend = subr_ud; g_assert(backend != NULL); } static gboolean rspamd_fuzzy_update_append_command(struct rspamd_fuzzy_backend *bk, struct rspamd_fuzzy_redis_session *session, struct fuzzy_peer_cmd *io_cmd, guint *shift) { GString *key, *value; guint cur_shift = *shift; guint i, klen; struct rspamd_fuzzy_cmd *cmd; if (io_cmd->is_shingle) { cmd = &io_cmd->cmd.shingle.basic; } else { cmd = &io_cmd->cmd.normal; } if (cmd->cmd == FUZZY_WRITE) { /* * For each normal hash addition we do 5 redis commands: * HSET F * HSETNX C