summaryrefslogtreecommitdiffstats
path: root/src/libserver/fuzzy_backend
diff options
context:
space:
mode:
Diffstat (limited to 'src/libserver/fuzzy_backend')
-rw-r--r--src/libserver/fuzzy_backend/fuzzy_backend.c560
-rw-r--r--src/libserver/fuzzy_backend/fuzzy_backend.h131
-rw-r--r--src/libserver/fuzzy_backend/fuzzy_backend_redis.c1666
-rw-r--r--src/libserver/fuzzy_backend/fuzzy_backend_redis.h67
-rw-r--r--src/libserver/fuzzy_backend/fuzzy_backend_sqlite.c1029
-rw-r--r--src/libserver/fuzzy_backend/fuzzy_backend_sqlite.h107
6 files changed, 3560 insertions, 0 deletions
diff --git a/src/libserver/fuzzy_backend/fuzzy_backend.c b/src/libserver/fuzzy_backend/fuzzy_backend.c
new file mode 100644
index 0000000..9099f38
--- /dev/null
+++ b/src/libserver/fuzzy_backend/fuzzy_backend.c
@@ -0,0 +1,560 @@
+/*-
+ * Copyright 2016 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "config.h"
+#include "fuzzy_backend.h"
+#include "fuzzy_backend_sqlite.h"
+#include "fuzzy_backend_redis.h"
+#include "cfg_file.h"
+#include "fuzzy_wire.h"
+
+#define DEFAULT_EXPIRE 172800L
+
+enum rspamd_fuzzy_backend_type {
+ RSPAMD_FUZZY_BACKEND_SQLITE = 0,
+ RSPAMD_FUZZY_BACKEND_REDIS = 1,
+};
+
+static void *rspamd_fuzzy_backend_init_sqlite(struct rspamd_fuzzy_backend *bk,
+ const ucl_object_t *obj, struct rspamd_config *cfg, GError **err);
+static void rspamd_fuzzy_backend_check_sqlite(struct rspamd_fuzzy_backend *bk,
+ const struct rspamd_fuzzy_cmd *cmd,
+ rspamd_fuzzy_check_cb cb, void *ud,
+ void *subr_ud);
+static void rspamd_fuzzy_backend_update_sqlite(struct rspamd_fuzzy_backend *bk,
+ GArray *updates, const gchar *src,
+ rspamd_fuzzy_update_cb cb, void *ud,
+ void *subr_ud);
+static void rspamd_fuzzy_backend_count_sqlite(struct rspamd_fuzzy_backend *bk,
+ rspamd_fuzzy_count_cb cb, void *ud,
+ void *subr_ud);
+static void rspamd_fuzzy_backend_version_sqlite(struct rspamd_fuzzy_backend *bk,
+ const gchar *src,
+ rspamd_fuzzy_version_cb cb, void *ud,
+ void *subr_ud);
+static const gchar *rspamd_fuzzy_backend_id_sqlite(struct rspamd_fuzzy_backend *bk,
+ void *subr_ud);
+static void rspamd_fuzzy_backend_expire_sqlite(struct rspamd_fuzzy_backend *bk,
+ void *subr_ud);
+static void rspamd_fuzzy_backend_close_sqlite(struct rspamd_fuzzy_backend *bk,
+ void *subr_ud);
+
+struct rspamd_fuzzy_backend_subr {
+ void *(*init)(struct rspamd_fuzzy_backend *bk, const ucl_object_t *obj,
+ struct rspamd_config *cfg,
+ GError **err);
+ void (*check)(struct rspamd_fuzzy_backend *bk,
+ const struct rspamd_fuzzy_cmd *cmd,
+ rspamd_fuzzy_check_cb cb, void *ud,
+ void *subr_ud);
+ void (*update)(struct rspamd_fuzzy_backend *bk,
+ GArray *updates, const gchar *src,
+ rspamd_fuzzy_update_cb cb, void *ud,
+ void *subr_ud);
+ void (*count)(struct rspamd_fuzzy_backend *bk,
+ rspamd_fuzzy_count_cb cb, void *ud,
+ void *subr_ud);
+ void (*version)(struct rspamd_fuzzy_backend *bk,
+ const gchar *src,
+ rspamd_fuzzy_version_cb cb, void *ud,
+ void *subr_ud);
+ const gchar *(*id)(struct rspamd_fuzzy_backend *bk, void *subr_ud);
+ void (*periodic)(struct rspamd_fuzzy_backend *bk, void *subr_ud);
+ void (*close)(struct rspamd_fuzzy_backend *bk, void *subr_ud);
+};
+
+static const struct rspamd_fuzzy_backend_subr fuzzy_subrs[] = {
+ [RSPAMD_FUZZY_BACKEND_SQLITE] = {
+ .init = rspamd_fuzzy_backend_init_sqlite,
+ .check = rspamd_fuzzy_backend_check_sqlite,
+ .update = rspamd_fuzzy_backend_update_sqlite,
+ .count = rspamd_fuzzy_backend_count_sqlite,
+ .version = rspamd_fuzzy_backend_version_sqlite,
+ .id = rspamd_fuzzy_backend_id_sqlite,
+ .periodic = rspamd_fuzzy_backend_expire_sqlite,
+ .close = rspamd_fuzzy_backend_close_sqlite,
+ },
+ [RSPAMD_FUZZY_BACKEND_REDIS] = {
+ .init = rspamd_fuzzy_backend_init_redis,
+ .check = rspamd_fuzzy_backend_check_redis,
+ .update = rspamd_fuzzy_backend_update_redis,
+ .count = rspamd_fuzzy_backend_count_redis,
+ .version = rspamd_fuzzy_backend_version_redis,
+ .id = rspamd_fuzzy_backend_id_redis,
+ .periodic = rspamd_fuzzy_backend_expire_redis,
+ .close = rspamd_fuzzy_backend_close_redis,
+ }};
+
+struct rspamd_fuzzy_backend {
+ enum rspamd_fuzzy_backend_type type;
+ gdouble expire;
+ gdouble sync;
+ struct ev_loop *event_loop;
+ rspamd_fuzzy_periodic_cb periodic_cb;
+ void *periodic_ud;
+ const struct rspamd_fuzzy_backend_subr *subr;
+ void *subr_ud;
+ ev_timer periodic_event;
+};
+
+static GQuark
+rspamd_fuzzy_backend_quark(void)
+{
+ return g_quark_from_static_string("fuzzy-backend");
+}
+
+static void *
+rspamd_fuzzy_backend_init_sqlite(struct rspamd_fuzzy_backend *bk,
+ const ucl_object_t *obj, struct rspamd_config *cfg, GError **err)
+{
+ const ucl_object_t *elt;
+
+ elt = ucl_object_lookup_any(obj, "hashfile", "hash_file", "file",
+ "database", NULL);
+
+ if (elt == NULL || ucl_object_type(elt) != UCL_STRING) {
+ g_set_error(err, rspamd_fuzzy_backend_quark(),
+ EINVAL, "missing sqlite3 path");
+ return NULL;
+ }
+
+ return rspamd_fuzzy_backend_sqlite_open(ucl_object_tostring(elt),
+ FALSE, err);
+}
+
+static void
+rspamd_fuzzy_backend_check_sqlite(struct rspamd_fuzzy_backend *bk,
+ const struct rspamd_fuzzy_cmd *cmd,
+ rspamd_fuzzy_check_cb cb, void *ud,
+ void *subr_ud)
+{
+ struct rspamd_fuzzy_backend_sqlite *sq = subr_ud;
+ struct rspamd_fuzzy_reply rep;
+
+ rep = rspamd_fuzzy_backend_sqlite_check(sq, cmd, bk->expire);
+
+ if (cb) {
+ cb(&rep, ud);
+ }
+}
+
+static void
+rspamd_fuzzy_backend_update_sqlite(struct rspamd_fuzzy_backend *bk,
+ GArray *updates, const gchar *src,
+ rspamd_fuzzy_update_cb cb, void *ud,
+ void *subr_ud)
+{
+ struct rspamd_fuzzy_backend_sqlite *sq = subr_ud;
+ gboolean success = FALSE;
+ guint i;
+ struct fuzzy_peer_cmd *io_cmd;
+ struct rspamd_fuzzy_cmd *cmd;
+ gpointer ptr;
+ guint nupdates = 0, nadded = 0, ndeleted = 0, nextended = 0, nignored = 0;
+
+ if (rspamd_fuzzy_backend_sqlite_prepare_update(sq, src)) {
+ for (i = 0; i < updates->len; i++) {
+ io_cmd = &g_array_index(updates, struct fuzzy_peer_cmd, i);
+
+ if (io_cmd->is_shingle) {
+ cmd = &io_cmd->cmd.shingle.basic;
+ ptr = &io_cmd->cmd.shingle;
+ }
+ else {
+ cmd = &io_cmd->cmd.normal;
+ ptr = &io_cmd->cmd.normal;
+ }
+
+ if (cmd->cmd == FUZZY_WRITE) {
+ rspamd_fuzzy_backend_sqlite_add(sq, ptr);
+ nadded++;
+ nupdates++;
+ }
+ else if (cmd->cmd == FUZZY_DEL) {
+ rspamd_fuzzy_backend_sqlite_del(sq, ptr);
+ ndeleted++;
+ nupdates++;
+ }
+ else {
+ if (cmd->cmd == FUZZY_REFRESH) {
+ nextended++;
+ }
+ else {
+ nignored++;
+ }
+ }
+ }
+
+ if (rspamd_fuzzy_backend_sqlite_finish_update(sq, src,
+ nupdates > 0)) {
+ success = TRUE;
+ }
+ }
+
+ if (cb) {
+ cb(success, nadded, ndeleted, nextended, nignored, ud);
+ }
+}
+
+static void
+rspamd_fuzzy_backend_count_sqlite(struct rspamd_fuzzy_backend *bk,
+ rspamd_fuzzy_count_cb cb, void *ud,
+ void *subr_ud)
+{
+ struct rspamd_fuzzy_backend_sqlite *sq = subr_ud;
+ guint64 nhashes;
+
+ nhashes = rspamd_fuzzy_backend_sqlite_count(sq);
+
+ if (cb) {
+ cb(nhashes, ud);
+ }
+}
+
+static void
+rspamd_fuzzy_backend_version_sqlite(struct rspamd_fuzzy_backend *bk,
+ const gchar *src,
+ rspamd_fuzzy_version_cb cb, void *ud,
+ void *subr_ud)
+{
+ struct rspamd_fuzzy_backend_sqlite *sq = subr_ud;
+ guint64 rev;
+
+ rev = rspamd_fuzzy_backend_sqlite_version(sq, src);
+
+ if (cb) {
+ cb(rev, ud);
+ }
+}
+
+static const gchar *
+rspamd_fuzzy_backend_id_sqlite(struct rspamd_fuzzy_backend *bk,
+ void *subr_ud)
+{
+ struct rspamd_fuzzy_backend_sqlite *sq = subr_ud;
+
+ return rspamd_fuzzy_sqlite_backend_id(sq);
+}
+static void
+rspamd_fuzzy_backend_expire_sqlite(struct rspamd_fuzzy_backend *bk,
+ void *subr_ud)
+{
+ struct rspamd_fuzzy_backend_sqlite *sq = subr_ud;
+
+ rspamd_fuzzy_backend_sqlite_sync(sq, bk->expire, TRUE);
+}
+
+static void
+rspamd_fuzzy_backend_close_sqlite(struct rspamd_fuzzy_backend *bk,
+ void *subr_ud)
+{
+ struct rspamd_fuzzy_backend_sqlite *sq = subr_ud;
+
+ rspamd_fuzzy_backend_sqlite_close(sq);
+}
+
+
+struct rspamd_fuzzy_backend *
+rspamd_fuzzy_backend_create(struct ev_loop *ev_base,
+ const ucl_object_t *config,
+ struct rspamd_config *cfg,
+ GError **err)
+{
+ struct rspamd_fuzzy_backend *bk;
+ enum rspamd_fuzzy_backend_type type = RSPAMD_FUZZY_BACKEND_SQLITE;
+ const ucl_object_t *elt;
+ gdouble expire = DEFAULT_EXPIRE;
+
+ if (config != NULL) {
+ elt = ucl_object_lookup(config, "backend");
+
+ if (elt != NULL && ucl_object_type(elt) == UCL_STRING) {
+ if (strcmp(ucl_object_tostring(elt), "sqlite") == 0) {
+ type = RSPAMD_FUZZY_BACKEND_SQLITE;
+ }
+ else if (strcmp(ucl_object_tostring(elt), "redis") == 0) {
+ type = RSPAMD_FUZZY_BACKEND_REDIS;
+ }
+ else {
+ g_set_error(err, rspamd_fuzzy_backend_quark(),
+ EINVAL, "invalid backend type: %s",
+ ucl_object_tostring(elt));
+ return NULL;
+ }
+ }
+
+ elt = ucl_object_lookup(config, "expire");
+
+ if (elt != NULL) {
+ expire = ucl_object_todouble(elt);
+ }
+ }
+
+ bk = g_malloc0(sizeof(*bk));
+ bk->event_loop = ev_base;
+ bk->expire = expire;
+ bk->type = type;
+ bk->subr = &fuzzy_subrs[type];
+
+ if ((bk->subr_ud = bk->subr->init(bk, config, cfg, err)) == NULL) {
+ g_free(bk);
+
+ return NULL;
+ }
+
+ return bk;
+}
+
+
+void rspamd_fuzzy_backend_check(struct rspamd_fuzzy_backend *bk,
+ const struct rspamd_fuzzy_cmd *cmd,
+ rspamd_fuzzy_check_cb cb, void *ud)
+{
+ g_assert(bk != NULL);
+
+ bk->subr->check(bk, cmd, cb, ud, bk->subr_ud);
+}
+
+static guint
+rspamd_fuzzy_digest_hash(gconstpointer key)
+{
+ guint ret;
+
+ /* Distributed uniformly already */
+ memcpy(&ret, key, sizeof(ret));
+
+ return ret;
+}
+
+static gboolean
+rspamd_fuzzy_digest_equal(gconstpointer v, gconstpointer v2)
+{
+ return memcmp(v, v2, rspamd_cryptobox_HASHBYTES) == 0;
+}
+
+static void
+rspamd_fuzzy_backend_deduplicate_queue(GArray *updates)
+{
+ GHashTable *seen = g_hash_table_new(rspamd_fuzzy_digest_hash,
+ rspamd_fuzzy_digest_equal);
+ struct fuzzy_peer_cmd *io_cmd, *found;
+ struct rspamd_fuzzy_cmd *cmd;
+ guchar *digest;
+ guint i;
+
+ for (i = 0; i < updates->len; i++) {
+ io_cmd = &g_array_index(updates, struct fuzzy_peer_cmd, i);
+
+ if (io_cmd->is_shingle) {
+ cmd = &io_cmd->cmd.shingle.basic;
+ }
+ else {
+ cmd = &io_cmd->cmd.normal;
+ }
+
+ digest = cmd->digest;
+
+ found = g_hash_table_lookup(seen, digest);
+
+ if (found == NULL) {
+ /* Add to the seen list, if not a duplicate (huh?) */
+ if (cmd->cmd != FUZZY_DUP) {
+ g_hash_table_insert(seen, digest, io_cmd);
+ }
+ }
+ else {
+ if (found->cmd.normal.flag != cmd->flag) {
+ /* TODO: deal with flags better at some point */
+ continue;
+ }
+
+ /* Apply heuristic */
+ switch (cmd->cmd) {
+ case FUZZY_WRITE:
+ if (found->cmd.normal.cmd == FUZZY_WRITE) {
+ /* Already seen */
+ found->cmd.normal.value += cmd->value;
+ cmd->cmd = FUZZY_DUP; /* Ignore this one */
+ }
+ else if (found->cmd.normal.cmd == FUZZY_REFRESH) {
+ /* Seen refresh command, remove it as write has higher priority */
+ g_hash_table_replace(seen, digest, io_cmd);
+ found->cmd.normal.cmd = FUZZY_DUP;
+ }
+ else if (found->cmd.normal.cmd == FUZZY_DEL) {
+ /* Request delete + add, weird, but ignore add */
+ cmd->cmd = FUZZY_DUP; /* Ignore this one */
+ }
+ break;
+ case FUZZY_REFRESH:
+ if (found->cmd.normal.cmd == FUZZY_WRITE) {
+ /* No need to expire, handled by addition */
+ cmd->cmd = FUZZY_DUP; /* Ignore this one */
+ }
+ else if (found->cmd.normal.cmd == FUZZY_DEL) {
+ /* Request delete + expire, ignore expire */
+ cmd->cmd = FUZZY_DUP; /* Ignore this one */
+ }
+ else if (found->cmd.normal.cmd == FUZZY_REFRESH) {
+ /* Already handled */
+ cmd->cmd = FUZZY_DUP; /* Ignore this one */
+ }
+ break;
+ case FUZZY_DEL:
+ /* Delete has priority over all other commands */
+ g_hash_table_replace(seen, digest, io_cmd);
+ found->cmd.normal.cmd = FUZZY_DUP;
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ g_hash_table_unref(seen);
+}
+
+void rspamd_fuzzy_backend_process_updates(struct rspamd_fuzzy_backend *bk,
+ GArray *updates, const gchar *src, rspamd_fuzzy_update_cb cb,
+ void *ud)
+{
+ g_assert(bk != NULL);
+ g_assert(updates != NULL);
+
+ if (updates) {
+ rspamd_fuzzy_backend_deduplicate_queue(updates);
+ bk->subr->update(bk, updates, src, cb, ud, bk->subr_ud);
+ }
+ else if (cb) {
+ cb(TRUE, 0, 0, 0, 0, ud);
+ }
+}
+
+
+void rspamd_fuzzy_backend_count(struct rspamd_fuzzy_backend *bk,
+ rspamd_fuzzy_count_cb cb, void *ud)
+{
+ g_assert(bk != NULL);
+
+ bk->subr->count(bk, cb, ud, bk->subr_ud);
+}
+
+
+void rspamd_fuzzy_backend_version(struct rspamd_fuzzy_backend *bk,
+ const gchar *src,
+ rspamd_fuzzy_version_cb cb, void *ud)
+{
+ g_assert(bk != NULL);
+
+ bk->subr->version(bk, src, cb, ud, bk->subr_ud);
+}
+
+const gchar *
+rspamd_fuzzy_backend_id(struct rspamd_fuzzy_backend *bk)
+{
+ g_assert(bk != NULL);
+
+ if (bk->subr->id) {
+ return bk->subr->id(bk, bk->subr_ud);
+ }
+
+ return NULL;
+}
+
+static inline void
+rspamd_fuzzy_backend_periodic_sync(struct rspamd_fuzzy_backend *bk)
+{
+ if (bk->periodic_cb) {
+ if (bk->periodic_cb(bk->periodic_ud)) {
+ if (bk->subr->periodic) {
+ bk->subr->periodic(bk, bk->subr_ud);
+ }
+ }
+ }
+ else {
+ if (bk->subr->periodic) {
+ bk->subr->periodic(bk, bk->subr_ud);
+ }
+ }
+}
+
+static void
+rspamd_fuzzy_backend_periodic_cb(EV_P_ ev_timer *w, int revents)
+{
+ struct rspamd_fuzzy_backend *bk = (struct rspamd_fuzzy_backend *) w->data;
+ gdouble jittered;
+
+ jittered = rspamd_time_jitter(bk->sync, bk->sync / 2.0);
+ w->repeat = jittered;
+ rspamd_fuzzy_backend_periodic_sync(bk);
+ ev_timer_again(EV_A_ w);
+}
+
+void rspamd_fuzzy_backend_start_update(struct rspamd_fuzzy_backend *bk,
+ gdouble timeout,
+ rspamd_fuzzy_periodic_cb cb,
+ void *ud)
+{
+ gdouble jittered;
+
+ g_assert(bk != NULL);
+
+ if (bk->subr->periodic) {
+ if (bk->sync > 0.0) {
+ ev_timer_stop(bk->event_loop, &bk->periodic_event);
+ }
+
+ if (cb) {
+ bk->periodic_cb = cb;
+ bk->periodic_ud = ud;
+ }
+
+ rspamd_fuzzy_backend_periodic_sync(bk);
+ bk->sync = timeout;
+ jittered = rspamd_time_jitter(timeout, timeout / 2.0);
+
+ bk->periodic_event.data = bk;
+ ev_timer_init(&bk->periodic_event, rspamd_fuzzy_backend_periodic_cb,
+ jittered, 0.0);
+ ev_timer_start(bk->event_loop, &bk->periodic_event);
+ }
+}
+
+void rspamd_fuzzy_backend_close(struct rspamd_fuzzy_backend *bk)
+{
+ g_assert(bk != NULL);
+
+ if (bk->sync > 0.0) {
+ rspamd_fuzzy_backend_periodic_sync(bk);
+ ev_timer_stop(bk->event_loop, &bk->periodic_event);
+ }
+
+ bk->subr->close(bk, bk->subr_ud);
+
+ g_free(bk);
+}
+
+struct ev_loop *
+rspamd_fuzzy_backend_event_base(struct rspamd_fuzzy_backend *backend)
+{
+ return backend->event_loop;
+}
+
+gdouble
+rspamd_fuzzy_backend_get_expire(struct rspamd_fuzzy_backend *backend)
+{
+ return backend->expire;
+}
diff --git a/src/libserver/fuzzy_backend/fuzzy_backend.h b/src/libserver/fuzzy_backend/fuzzy_backend.h
new file mode 100644
index 0000000..a1b74bc
--- /dev/null
+++ b/src/libserver/fuzzy_backend/fuzzy_backend.h
@@ -0,0 +1,131 @@
+/*-
+ * Copyright 2016 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef SRC_LIBSERVER_FUZZY_BACKEND_H_
+#define SRC_LIBSERVER_FUZZY_BACKEND_H_
+
+#include "config.h"
+#include "contrib/libev/ev.h"
+#include "fuzzy_wire.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct rspamd_fuzzy_backend;
+struct rspamd_config;
+
+/*
+ * Callbacks for fuzzy methods
+ */
+typedef void (*rspamd_fuzzy_check_cb)(struct rspamd_fuzzy_reply *rep, void *ud);
+
+typedef void (*rspamd_fuzzy_update_cb)(gboolean success,
+ guint nadded,
+ guint ndeleted,
+ guint nextended,
+ guint nignored,
+ void *ud);
+
+typedef void (*rspamd_fuzzy_version_cb)(guint64 rev, void *ud);
+
+typedef void (*rspamd_fuzzy_count_cb)(guint64 count, void *ud);
+
+typedef gboolean (*rspamd_fuzzy_periodic_cb)(void *ud);
+
+/**
+ * Open fuzzy backend
+ * @param ev_base
+ * @param config
+ * @param err
+ * @return
+ */
+struct rspamd_fuzzy_backend *rspamd_fuzzy_backend_create(struct ev_loop *ev_base,
+ const ucl_object_t *config,
+ struct rspamd_config *cfg,
+ GError **err);
+
+
+/**
+ * Check a specific hash in storage
+ * @param cmd
+ * @param cb
+ * @param ud
+ */
+void rspamd_fuzzy_backend_check(struct rspamd_fuzzy_backend *bk,
+ const struct rspamd_fuzzy_cmd *cmd,
+ rspamd_fuzzy_check_cb cb, void *ud);
+
+/**
+ * Process updates for a specific queue
+ * @param bk
+ * @param updates queue of struct fuzzy_peer_cmd
+ * @param src
+ */
+void rspamd_fuzzy_backend_process_updates(struct rspamd_fuzzy_backend *bk,
+ GArray *updates, const gchar *src, rspamd_fuzzy_update_cb cb,
+ void *ud);
+
+/**
+ * Gets number of hashes from the backend
+ * @param bk
+ * @param cb
+ * @param ud
+ */
+void rspamd_fuzzy_backend_count(struct rspamd_fuzzy_backend *bk,
+ rspamd_fuzzy_count_cb cb, void *ud);
+
+/**
+ * Returns number of revision for a specific source
+ * @param bk
+ * @param src
+ * @param cb
+ * @param ud
+ */
+void rspamd_fuzzy_backend_version(struct rspamd_fuzzy_backend *bk,
+ const gchar *src,
+ rspamd_fuzzy_version_cb cb, void *ud);
+
+/**
+ * Returns unique id for backend
+ * @param backend
+ * @return
+ */
+const gchar *rspamd_fuzzy_backend_id(struct rspamd_fuzzy_backend *backend);
+
+/**
+ * Starts expire process for the backend
+ * @param backend
+ */
+void rspamd_fuzzy_backend_start_update(struct rspamd_fuzzy_backend *backend,
+ gdouble timeout,
+ rspamd_fuzzy_periodic_cb cb,
+ void *ud);
+
+struct ev_loop *rspamd_fuzzy_backend_event_base(struct rspamd_fuzzy_backend *backend);
+
+gdouble rspamd_fuzzy_backend_get_expire(struct rspamd_fuzzy_backend *backend);
+
+/**
+ * Closes backend
+ * @param backend
+ */
+void rspamd_fuzzy_backend_close(struct rspamd_fuzzy_backend *backend);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* SRC_LIBSERVER_FUZZY_BACKEND_H_ */
diff --git a/src/libserver/fuzzy_backend/fuzzy_backend_redis.c b/src/libserver/fuzzy_backend/fuzzy_backend_redis.c
new file mode 100644
index 0000000..7ab7ca6
--- /dev/null
+++ b/src/libserver/fuzzy_backend/fuzzy_backend_redis.c
@@ -0,0 +1,1666 @@
+/*
+ * Copyright 2023 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "config.h"
+#include "ref.h"
+#include "fuzzy_backend.h"
+#include "fuzzy_backend_redis.h"
+#include "redis_pool.h"
+#include "cryptobox.h"
+#include "str_util.h"
+#include "upstream.h"
+#include "contrib/hiredis/hiredis.h"
+#include "contrib/hiredis/async.h"
+#include "lua/lua_common.h"
+
+#define REDIS_DEFAULT_PORT 6379
+#define REDIS_DEFAULT_OBJECT "fuzzy"
+#define REDIS_DEFAULT_TIMEOUT 2.0
+
+#define msg_err_redis_session(...) rspamd_default_log_function(G_LOG_LEVEL_CRITICAL, \
+ "fuzzy_redis", session->backend->id, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+#define msg_warn_redis_session(...) rspamd_default_log_function(G_LOG_LEVEL_WARNING, \
+ "fuzzy_redis", session->backend->id, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+#define msg_info_redis_session(...) rspamd_default_log_function(G_LOG_LEVEL_INFO, \
+ "fuzzy_redis", session->backend->id, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+#define msg_debug_redis_session(...) rspamd_conditional_debug_fast(NULL, NULL, \
+ rspamd_fuzzy_redis_log_id, "fuzzy_redis", session->backend->id, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+
+INIT_LOG_MODULE(fuzzy_redis)
+
+struct rspamd_fuzzy_backend_redis {
+ lua_State *L;
+ const gchar *redis_object;
+ const gchar *username;
+ const gchar *password;
+ const gchar *dbname;
+ gchar *id;
+ struct rspamd_redis_pool *pool;
+ gdouble timeout;
+ gint conf_ref;
+ bool terminated;
+ ref_entry_t ref;
+};
+
+enum rspamd_fuzzy_redis_command {
+ RSPAMD_FUZZY_REDIS_COMMAND_COUNT,
+ RSPAMD_FUZZY_REDIS_COMMAND_VERSION,
+ RSPAMD_FUZZY_REDIS_COMMAND_UPDATES,
+ RSPAMD_FUZZY_REDIS_COMMAND_CHECK
+};
+
+struct rspamd_fuzzy_redis_session {
+ struct rspamd_fuzzy_backend_redis *backend;
+ redisAsyncContext *ctx;
+ ev_timer timeout;
+ const struct rspamd_fuzzy_cmd *cmd;
+ struct ev_loop *event_loop;
+ float prob;
+ gboolean shingles_checked;
+
+ enum rspamd_fuzzy_redis_command command;
+ guint nargs;
+
+ guint nadded;
+ guint ndeleted;
+ guint nextended;
+ guint nignored;
+
+ union {
+ rspamd_fuzzy_check_cb cb_check;
+ rspamd_fuzzy_update_cb cb_update;
+ rspamd_fuzzy_version_cb cb_version;
+ rspamd_fuzzy_count_cb cb_count;
+ } callback;
+ void *cbdata;
+
+ gchar **argv;
+ gsize *argv_lens;
+ struct upstream *up;
+ guchar found_digest[rspamd_cryptobox_HASHBYTES];
+};
+
+static inline struct upstream_list *
+rspamd_redis_get_servers(struct rspamd_fuzzy_backend_redis *ctx,
+ const gchar *what)
+{
+ lua_State *L = ctx->L;
+ struct upstream_list *res = NULL;
+
+ lua_rawgeti(L, LUA_REGISTRYINDEX, ctx->conf_ref);
+ lua_pushstring(L, what);
+ lua_gettable(L, -2);
+
+ if (lua_type(L, -1) == LUA_TUSERDATA) {
+ res = *((struct upstream_list **) lua_touserdata(L, -1));
+ }
+ else {
+ struct lua_logger_trace tr;
+ gchar outbuf[8192];
+
+ memset(&tr, 0, sizeof(tr));
+ lua_logger_out_type(L, -2, outbuf, sizeof(outbuf) - 1, &tr,
+ LUA_ESCAPE_UNPRINTABLE);
+
+ msg_err("cannot get %s upstreams for Redis fuzzy storage %s; table content: %s",
+ what, ctx->id, outbuf);
+ }
+
+ lua_settop(L, 0);
+
+ return res;
+}
+
+static inline void
+rspamd_fuzzy_redis_session_free_args(struct rspamd_fuzzy_redis_session *session)
+{
+ guint i;
+
+ if (session->argv) {
+ for (i = 0; i < session->nargs; i++) {
+ g_free(session->argv[i]);
+ }
+
+ g_free(session->argv);
+ g_free(session->argv_lens);
+ }
+}
+static void
+rspamd_fuzzy_redis_session_dtor(struct rspamd_fuzzy_redis_session *session,
+ gboolean is_fatal)
+{
+ redisAsyncContext *ac;
+
+
+ if (session->ctx) {
+ ac = session->ctx;
+ session->ctx = NULL;
+ rspamd_redis_pool_release_connection(session->backend->pool,
+ ac,
+ is_fatal ? RSPAMD_REDIS_RELEASE_FATAL : RSPAMD_REDIS_RELEASE_DEFAULT);
+ }
+
+ ev_timer_stop(session->event_loop, &session->timeout);
+ rspamd_fuzzy_redis_session_free_args(session);
+
+ REF_RELEASE(session->backend);
+ rspamd_upstream_unref(session->up);
+ g_free(session);
+}
+
+static void
+rspamd_fuzzy_backend_redis_dtor(struct rspamd_fuzzy_backend_redis *backend)
+{
+ if (!backend->terminated && backend->conf_ref != -1) {
+ luaL_unref(backend->L, LUA_REGISTRYINDEX, backend->conf_ref);
+ }
+
+ if (backend->id) {
+ g_free(backend->id);
+ }
+
+ g_free(backend);
+}
+
+void *
+rspamd_fuzzy_backend_init_redis(struct rspamd_fuzzy_backend *bk,
+ const ucl_object_t *obj, struct rspamd_config *cfg, GError **err)
+{
+ struct rspamd_fuzzy_backend_redis *backend;
+ const ucl_object_t *elt;
+ gboolean ret = FALSE;
+ guchar id_hash[rspamd_cryptobox_HASHBYTES];
+ rspamd_cryptobox_hash_state_t st;
+ lua_State *L = (lua_State *) cfg->lua_state;
+ gint conf_ref = -1;
+
+ backend = g_malloc0(sizeof(*backend));
+
+ backend->timeout = REDIS_DEFAULT_TIMEOUT;
+ backend->redis_object = REDIS_DEFAULT_OBJECT;
+ backend->L = L;
+
+ ret = rspamd_lua_try_load_redis(L, obj, cfg, &conf_ref);
+
+ /* Now try global redis settings */
+ if (!ret) {
+ elt = ucl_object_lookup(cfg->cfg_ucl_obj, "redis");
+
+ if (elt) {
+ const ucl_object_t *specific_obj;
+
+ specific_obj = ucl_object_lookup_any(elt, "fuzzy", "fuzzy_storage",
+ NULL);
+
+ if (specific_obj) {
+ ret = rspamd_lua_try_load_redis(L, specific_obj, cfg, &conf_ref);
+ }
+ else {
+ ret = rspamd_lua_try_load_redis(L, elt, cfg, &conf_ref);
+ }
+ }
+ }
+
+ if (!ret) {
+ msg_err_config("cannot init redis backend for fuzzy storage");
+ g_free(backend);
+
+ return NULL;
+ }
+
+ elt = ucl_object_lookup(obj, "prefix");
+ if (elt == NULL || ucl_object_type(elt) != UCL_STRING) {
+ backend->redis_object = REDIS_DEFAULT_OBJECT;
+ }
+ else {
+ backend->redis_object = ucl_object_tostring(elt);
+ }
+
+ backend->conf_ref = conf_ref;
+
+ /* Check some common table values */
+ lua_rawgeti(L, LUA_REGISTRYINDEX, conf_ref);
+
+ lua_pushstring(L, "timeout");
+ lua_gettable(L, -2);
+ if (lua_type(L, -1) == LUA_TNUMBER) {
+ backend->timeout = lua_tonumber(L, -1);
+ }
+ lua_pop(L, 1);
+
+ lua_pushstring(L, "db");
+ lua_gettable(L, -2);
+ if (lua_type(L, -1) == LUA_TSTRING) {
+ backend->dbname = rspamd_mempool_strdup(cfg->cfg_pool,
+ lua_tostring(L, -1));
+ }
+ lua_pop(L, 1);
+
+ lua_pushstring(L, "username");
+ lua_gettable(L, -2);
+ if (lua_type(L, -1) == LUA_TSTRING) {
+ backend->username = rspamd_mempool_strdup(cfg->cfg_pool,
+ lua_tostring(L, -1));
+ }
+ lua_pop(L, 1);
+
+ lua_pushstring(L, "password");
+ lua_gettable(L, -2);
+ if (lua_type(L, -1) == LUA_TSTRING) {
+ backend->password = rspamd_mempool_strdup(cfg->cfg_pool,
+ lua_tostring(L, -1));
+ }
+ lua_pop(L, 1);
+
+ lua_settop(L, 0);
+
+ REF_INIT_RETAIN(backend, rspamd_fuzzy_backend_redis_dtor);
+ backend->pool = cfg->redis_pool;
+ rspamd_cryptobox_hash_init(&st, NULL, 0);
+ rspamd_cryptobox_hash_update(&st, backend->redis_object,
+ strlen(backend->redis_object));
+
+ if (backend->dbname) {
+ rspamd_cryptobox_hash_update(&st, backend->dbname,
+ strlen(backend->dbname));
+ }
+
+ if (backend->username) {
+ rspamd_cryptobox_hash_update(&st, backend->username,
+ strlen(backend->username));
+ }
+
+ if (backend->password) {
+ rspamd_cryptobox_hash_update(&st, backend->password,
+ strlen(backend->password));
+ }
+
+ rspamd_cryptobox_hash_final(&st, id_hash);
+ backend->id = rspamd_encode_base32(id_hash, sizeof(id_hash), RSPAMD_BASE32_DEFAULT);
+
+ return backend;
+}
+
+static void
+rspamd_fuzzy_redis_timeout(EV_P_ ev_timer *w, int revents)
+{
+ struct rspamd_fuzzy_redis_session *session =
+ (struct rspamd_fuzzy_redis_session *) w->data;
+ redisAsyncContext *ac;
+ static char errstr[128];
+
+ if (session->ctx) {
+ ac = session->ctx;
+ session->ctx = NULL;
+ ac->err = REDIS_ERR_IO;
+ /* Should be safe as in hiredis it is char[128] */
+ rspamd_snprintf(errstr, sizeof(errstr), "%s", strerror(ETIMEDOUT));
+ ac->errstr = errstr;
+
+ /* This will cause session closing */
+ rspamd_redis_pool_release_connection(session->backend->pool,
+ ac, RSPAMD_REDIS_RELEASE_FATAL);
+ }
+}
+
+static void rspamd_fuzzy_redis_check_callback(redisAsyncContext *c, gpointer r,
+ gpointer priv);
+
+struct _rspamd_fuzzy_shingles_helper {
+ guchar digest[64];
+ guint found;
+};
+
+static gint
+rspamd_fuzzy_backend_redis_shingles_cmp(const void *a, const void *b)
+{
+ const struct _rspamd_fuzzy_shingles_helper *sha = a,
+ *shb = b;
+
+ return memcmp(sha->digest, shb->digest, sizeof(sha->digest));
+}
+
+static void
+rspamd_fuzzy_redis_shingles_callback(redisAsyncContext *c, gpointer r,
+ gpointer priv)
+{
+ struct rspamd_fuzzy_redis_session *session = priv;
+ redisReply *reply = r, *cur;
+ struct rspamd_fuzzy_reply rep;
+ GString *key;
+ struct _rspamd_fuzzy_shingles_helper *shingles, *prev = NULL, *sel = NULL;
+ guint i, found = 0, max_found = 0, cur_found = 0;
+
+ ev_timer_stop(session->event_loop, &session->timeout);
+ memset(&rep, 0, sizeof(rep));
+
+ if (c->err == 0 && reply != NULL) {
+ rspamd_upstream_ok(session->up);
+
+ if (reply->type == REDIS_REPLY_ARRAY &&
+ reply->elements == RSPAMD_SHINGLE_SIZE) {
+ shingles = g_alloca(sizeof(struct _rspamd_fuzzy_shingles_helper) *
+ RSPAMD_SHINGLE_SIZE);
+
+ for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) {
+ cur = reply->element[i];
+
+ if (cur->type == REDIS_REPLY_STRING) {
+ shingles[i].found = 1;
+ memcpy(shingles[i].digest, cur->str, MIN(64, cur->len));
+ found++;
+ }
+ else {
+ memset(shingles[i].digest, 0, sizeof(shingles[i].digest));
+ shingles[i].found = 0;
+ }
+ }
+
+ if (found > RSPAMD_SHINGLE_SIZE / 2) {
+ /* Now sort to find the most frequent element */
+ qsort(shingles, RSPAMD_SHINGLE_SIZE,
+ sizeof(struct _rspamd_fuzzy_shingles_helper),
+ rspamd_fuzzy_backend_redis_shingles_cmp);
+
+ prev = &shingles[0];
+
+ for (i = 1; i < RSPAMD_SHINGLE_SIZE; i++) {
+ if (!shingles[i].found) {
+ continue;
+ }
+
+ if (memcmp(shingles[i].digest, prev->digest, 64) == 0) {
+ cur_found++;
+
+ if (cur_found > max_found) {
+ max_found = cur_found;
+ sel = &shingles[i];
+ }
+ }
+ else {
+ cur_found = 1;
+ prev = &shingles[i];
+ }
+ }
+
+ if (max_found > RSPAMD_SHINGLE_SIZE / 2) {
+ session->prob = ((float) max_found) / RSPAMD_SHINGLE_SIZE;
+ rep.v1.prob = session->prob;
+
+ g_assert(sel != NULL);
+
+ /* Prepare new check command */
+ rspamd_fuzzy_redis_session_free_args(session);
+ session->nargs = 5;
+ session->argv = g_malloc(sizeof(gchar *) * session->nargs);
+ session->argv_lens = g_malloc(sizeof(gsize) * session->nargs);
+
+ key = g_string_new(session->backend->redis_object);
+ g_string_append_len(key, sel->digest, sizeof(sel->digest));
+ session->argv[0] = g_strdup("HMGET");
+ session->argv_lens[0] = 5;
+ session->argv[1] = key->str;
+ session->argv_lens[1] = key->len;
+ session->argv[2] = g_strdup("V");
+ session->argv_lens[2] = 1;
+ session->argv[3] = g_strdup("F");
+ session->argv_lens[3] = 1;
+ session->argv[4] = g_strdup("C");
+ session->argv_lens[4] = 1;
+ g_string_free(key, FALSE); /* Do not free underlying array */
+ memcpy(session->found_digest, sel->digest,
+ sizeof(session->cmd->digest));
+
+ g_assert(session->ctx != NULL);
+ if (redisAsyncCommandArgv(session->ctx,
+ rspamd_fuzzy_redis_check_callback,
+ session, session->nargs,
+ (const gchar **) session->argv,
+ session->argv_lens) != REDIS_OK) {
+
+ if (session->callback.cb_check) {
+ memset(&rep, 0, sizeof(rep));
+ session->callback.cb_check(&rep, session->cbdata);
+ }
+
+ rspamd_fuzzy_redis_session_dtor(session, TRUE);
+ }
+ else {
+ /* Add timeout */
+ session->timeout.data = session;
+ ev_now_update_if_cheap((struct ev_loop *) session->event_loop);
+ ev_timer_init(&session->timeout,
+ rspamd_fuzzy_redis_timeout,
+ session->backend->timeout, 0.0);
+ ev_timer_start(session->event_loop, &session->timeout);
+ }
+
+ return;
+ }
+ }
+ }
+ else if (reply->type == REDIS_REPLY_ERROR) {
+ msg_err_redis_session("fuzzy backend redis error: \"%s\"",
+ reply->str);
+ }
+
+ if (session->callback.cb_check) {
+ session->callback.cb_check(&rep, session->cbdata);
+ }
+ }
+ else {
+ if (session->callback.cb_check) {
+ session->callback.cb_check(&rep, session->cbdata);
+ }
+
+ if (c->errstr) {
+ msg_err_redis_session("error getting shingles: %s", c->errstr);
+ rspamd_upstream_fail(session->up, FALSE, c->errstr);
+ }
+ }
+
+ rspamd_fuzzy_redis_session_dtor(session, FALSE);
+}
+
+static void
+rspamd_fuzzy_backend_check_shingles(struct rspamd_fuzzy_redis_session *session)
+{
+ struct rspamd_fuzzy_reply rep;
+ const struct rspamd_fuzzy_shingle_cmd *shcmd;
+ GString *key;
+ guint i, init_len;
+
+ rspamd_fuzzy_redis_session_free_args(session);
+ /* First of all check digest */
+ session->nargs = RSPAMD_SHINGLE_SIZE + 1;
+ session->argv = g_malloc(sizeof(gchar *) * session->nargs);
+ session->argv_lens = g_malloc(sizeof(gsize) * session->nargs);
+ shcmd = (const struct rspamd_fuzzy_shingle_cmd *) session->cmd;
+
+ session->argv[0] = g_strdup("MGET");
+ session->argv_lens[0] = 4;
+ init_len = strlen(session->backend->redis_object);
+
+ for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) {
+
+ key = g_string_sized_new(init_len + 2 + 2 + sizeof("18446744073709551616"));
+ rspamd_printf_gstring(key, "%s_%d_%uL", session->backend->redis_object,
+ i, shcmd->sgl.hashes[i]);
+ session->argv[i + 1] = key->str;
+ session->argv_lens[i + 1] = key->len;
+ g_string_free(key, FALSE); /* Do not free underlying array */
+ }
+
+ session->shingles_checked = TRUE;
+
+ g_assert(session->ctx != NULL);
+
+ if (redisAsyncCommandArgv(session->ctx, rspamd_fuzzy_redis_shingles_callback,
+ session, session->nargs,
+ (const gchar **) session->argv, session->argv_lens) != REDIS_OK) {
+ msg_err("cannot execute redis command on %s: %s",
+ rspamd_inet_address_to_string_pretty(rspamd_upstream_addr_cur(session->up)),
+ session->ctx->errstr);
+
+ if (session->callback.cb_check) {
+ memset(&rep, 0, sizeof(rep));
+ session->callback.cb_check(&rep, session->cbdata);
+ }
+
+ rspamd_fuzzy_redis_session_dtor(session, TRUE);
+ }
+ else {
+ /* Add timeout */
+ session->timeout.data = session;
+ ev_now_update_if_cheap((struct ev_loop *) session->event_loop);
+ ev_timer_init(&session->timeout,
+ rspamd_fuzzy_redis_timeout,
+ session->backend->timeout, 0.0);
+ ev_timer_start(session->event_loop, &session->timeout);
+ }
+}
+
+static void
+rspamd_fuzzy_redis_check_callback(redisAsyncContext *c, gpointer r,
+ gpointer priv)
+{
+ struct rspamd_fuzzy_redis_session *session = priv;
+ redisReply *reply = r, *cur;
+ struct rspamd_fuzzy_reply rep;
+ gulong value;
+ guint found_elts = 0;
+
+ ev_timer_stop(session->event_loop, &session->timeout);
+ memset(&rep, 0, sizeof(rep));
+
+ if (c->err == 0 && reply != NULL) {
+ rspamd_upstream_ok(session->up);
+
+ if (reply->type == REDIS_REPLY_ARRAY && reply->elements >= 2) {
+ cur = reply->element[0];
+
+ if (cur->type == REDIS_REPLY_STRING) {
+ value = strtoul(cur->str, NULL, 10);
+ rep.v1.value = value;
+ found_elts++;
+ }
+
+ cur = reply->element[1];
+
+ if (cur->type == REDIS_REPLY_STRING) {
+ value = strtoul(cur->str, NULL, 10);
+ rep.v1.flag = value;
+ found_elts++;
+ }
+
+ if (found_elts >= 2) {
+ rep.v1.prob = session->prob;
+ memcpy(rep.digest, session->found_digest, sizeof(rep.digest));
+ }
+
+ rep.ts = 0;
+
+ if (reply->elements > 2) {
+ cur = reply->element[2];
+
+ if (cur->type == REDIS_REPLY_STRING) {
+ rep.ts = strtoul(cur->str, NULL, 10);
+ }
+ }
+ }
+ else if (reply->type == REDIS_REPLY_ERROR) {
+ msg_err_redis_session("fuzzy backend redis error: \"%s\"",
+ reply->str);
+ }
+
+ if (found_elts < 2) {
+ if (session->cmd->shingles_count > 0 && !session->shingles_checked) {
+ /* We also need to check all shingles here */
+ rspamd_fuzzy_backend_check_shingles(session);
+ /* Do not free session */
+ return;
+ }
+ else {
+ if (session->callback.cb_check) {
+ session->callback.cb_check(&rep, session->cbdata);
+ }
+ }
+ }
+ else {
+ if (session->callback.cb_check) {
+ session->callback.cb_check(&rep, session->cbdata);
+ }
+ }
+ }
+ else {
+ if (session->callback.cb_check) {
+ session->callback.cb_check(&rep, session->cbdata);
+ }
+
+ if (c->errstr) {
+ msg_err_redis_session("error getting hashes on %s: %s",
+ rspamd_inet_address_to_string_pretty(rspamd_upstream_addr_cur(session->up)),
+ c->errstr);
+ rspamd_upstream_fail(session->up, FALSE, c->errstr);
+ }
+ }
+
+ rspamd_fuzzy_redis_session_dtor(session, FALSE);
+}
+
+void rspamd_fuzzy_backend_check_redis(struct rspamd_fuzzy_backend *bk,
+ const struct rspamd_fuzzy_cmd *cmd,
+ rspamd_fuzzy_check_cb cb, void *ud,
+ void *subr_ud)
+{
+ struct rspamd_fuzzy_backend_redis *backend = subr_ud;
+ struct rspamd_fuzzy_redis_session *session;
+ struct upstream *up;
+ struct upstream_list *ups;
+ rspamd_inet_addr_t *addr;
+ struct rspamd_fuzzy_reply rep;
+ GString *key;
+
+ g_assert(backend != NULL);
+
+ ups = rspamd_redis_get_servers(backend, "read_servers");
+ if (!ups) {
+ if (cb) {
+ memset(&rep, 0, sizeof(rep));
+ cb(&rep, ud);
+ }
+
+ return;
+ }
+
+ session = g_malloc0(sizeof(*session));
+ session->backend = backend;
+ REF_RETAIN(session->backend);
+
+ session->callback.cb_check = cb;
+ session->cbdata = ud;
+ session->command = RSPAMD_FUZZY_REDIS_COMMAND_CHECK;
+ session->cmd = cmd;
+ session->prob = 1.0;
+ memcpy(rep.digest, session->cmd->digest, sizeof(rep.digest));
+ memcpy(session->found_digest, session->cmd->digest, sizeof(rep.digest));
+ session->event_loop = rspamd_fuzzy_backend_event_base(bk);
+
+ /* First of all check digest */
+ session->nargs = 5;
+ session->argv = g_malloc(sizeof(gchar *) * session->nargs);
+ session->argv_lens = g_malloc(sizeof(gsize) * session->nargs);
+
+ key = g_string_new(backend->redis_object);
+ g_string_append_len(key, cmd->digest, sizeof(cmd->digest));
+ session->argv[0] = g_strdup("HMGET");
+ session->argv_lens[0] = 5;
+ session->argv[1] = key->str;
+ session->argv_lens[1] = key->len;
+ session->argv[2] = g_strdup("V");
+ session->argv_lens[2] = 1;
+ session->argv[3] = g_strdup("F");
+ session->argv_lens[3] = 1;
+ session->argv[4] = g_strdup("C");
+ session->argv_lens[4] = 1;
+ g_string_free(key, FALSE); /* Do not free underlying array */
+
+ up = rspamd_upstream_get(ups,
+ RSPAMD_UPSTREAM_ROUND_ROBIN,
+ NULL,
+ 0);
+
+ session->up = rspamd_upstream_ref(up);
+ addr = rspamd_upstream_addr_next(up);
+ g_assert(addr != NULL);
+ session->ctx = rspamd_redis_pool_connect(backend->pool,
+ backend->dbname,
+ backend->username, backend->password,
+ rspamd_inet_address_to_string(addr),
+ rspamd_inet_address_get_port(addr));
+
+ if (session->ctx == NULL) {
+ rspamd_upstream_fail(up, TRUE, strerror(errno));
+ rspamd_fuzzy_redis_session_dtor(session, TRUE);
+
+ if (cb) {
+ memset(&rep, 0, sizeof(rep));
+ cb(&rep, ud);
+ }
+ }
+ else {
+ if (redisAsyncCommandArgv(session->ctx, rspamd_fuzzy_redis_check_callback,
+ session, session->nargs,
+ (const gchar **) session->argv, session->argv_lens) != REDIS_OK) {
+ rspamd_fuzzy_redis_session_dtor(session, TRUE);
+
+ if (cb) {
+ memset(&rep, 0, sizeof(rep));
+ cb(&rep, ud);
+ }
+ }
+ else {
+ /* Add timeout */
+ session->timeout.data = session;
+ ev_now_update_if_cheap((struct ev_loop *) session->event_loop);
+ ev_timer_init(&session->timeout,
+ rspamd_fuzzy_redis_timeout,
+ session->backend->timeout, 0.0);
+ ev_timer_start(session->event_loop, &session->timeout);
+ }
+ }
+}
+
+static void
+rspamd_fuzzy_redis_count_callback(redisAsyncContext *c, gpointer r,
+ gpointer priv)
+{
+ struct rspamd_fuzzy_redis_session *session = priv;
+ redisReply *reply = r;
+ gulong nelts;
+
+ ev_timer_stop(session->event_loop, &session->timeout);
+
+ if (c->err == 0 && reply != NULL) {
+ rspamd_upstream_ok(session->up);
+
+ if (reply->type == REDIS_REPLY_INTEGER) {
+ if (session->callback.cb_count) {
+ session->callback.cb_count(reply->integer, session->cbdata);
+ }
+ }
+ else if (reply->type == REDIS_REPLY_STRING) {
+ nelts = strtoul(reply->str, NULL, 10);
+
+ if (session->callback.cb_count) {
+ session->callback.cb_count(nelts, session->cbdata);
+ }
+ }
+ else {
+ if (reply->type == REDIS_REPLY_ERROR) {
+ msg_err_redis_session("fuzzy backend redis error: \"%s\"",
+ reply->str);
+ }
+ if (session->callback.cb_count) {
+ session->callback.cb_count(0, session->cbdata);
+ }
+ }
+ }
+ else {
+ if (session->callback.cb_count) {
+ session->callback.cb_count(0, session->cbdata);
+ }
+
+ if (c->errstr) {
+ msg_err_redis_session("error getting count on %s: %s",
+ rspamd_inet_address_to_string_pretty(rspamd_upstream_addr_cur(session->up)),
+ c->errstr);
+ rspamd_upstream_fail(session->up, FALSE, c->errstr);
+ }
+ }
+
+ rspamd_fuzzy_redis_session_dtor(session, FALSE);
+}
+
+void rspamd_fuzzy_backend_count_redis(struct rspamd_fuzzy_backend *bk,
+ rspamd_fuzzy_count_cb cb, void *ud,
+ void *subr_ud)
+{
+ struct rspamd_fuzzy_backend_redis *backend = subr_ud;
+ struct rspamd_fuzzy_redis_session *session;
+ struct upstream *up;
+ struct upstream_list *ups;
+ rspamd_inet_addr_t *addr;
+ GString *key;
+
+ g_assert(backend != NULL);
+
+ ups = rspamd_redis_get_servers(backend, "read_servers");
+ if (!ups) {
+ if (cb) {
+ cb(0, ud);
+ }
+
+ return;
+ }
+
+ session = g_malloc0(sizeof(*session));
+ session->backend = backend;
+ REF_RETAIN(session->backend);
+
+ session->callback.cb_count = cb;
+ session->cbdata = ud;
+ session->command = RSPAMD_FUZZY_REDIS_COMMAND_COUNT;
+ session->event_loop = rspamd_fuzzy_backend_event_base(bk);
+
+ session->nargs = 2;
+ session->argv = g_malloc(sizeof(gchar *) * 2);
+ session->argv_lens = g_malloc(sizeof(gsize) * 2);
+ key = g_string_new(backend->redis_object);
+ g_string_append(key, "_count");
+ session->argv[0] = g_strdup("GET");
+ session->argv_lens[0] = 3;
+ session->argv[1] = key->str;
+ session->argv_lens[1] = key->len;
+ g_string_free(key, FALSE); /* Do not free underlying array */
+
+ up = rspamd_upstream_get(ups,
+ RSPAMD_UPSTREAM_ROUND_ROBIN,
+ NULL,
+ 0);
+
+ session->up = rspamd_upstream_ref(up);
+ addr = rspamd_upstream_addr_next(up);
+ g_assert(addr != NULL);
+ session->ctx = rspamd_redis_pool_connect(backend->pool,
+ backend->dbname,
+ backend->username, backend->password,
+ rspamd_inet_address_to_string(addr),
+ rspamd_inet_address_get_port(addr));
+
+ if (session->ctx == NULL) {
+ rspamd_upstream_fail(up, TRUE, strerror(errno));
+ rspamd_fuzzy_redis_session_dtor(session, TRUE);
+
+ if (cb) {
+ cb(0, ud);
+ }
+ }
+ else {
+ if (redisAsyncCommandArgv(session->ctx, rspamd_fuzzy_redis_count_callback,
+ session, session->nargs,
+ (const gchar **) session->argv, session->argv_lens) != REDIS_OK) {
+ rspamd_fuzzy_redis_session_dtor(session, TRUE);
+
+ if (cb) {
+ cb(0, ud);
+ }
+ }
+ else {
+ /* Add timeout */
+ session->timeout.data = session;
+ ev_now_update_if_cheap((struct ev_loop *) session->event_loop);
+ ev_timer_init(&session->timeout,
+ rspamd_fuzzy_redis_timeout,
+ session->backend->timeout, 0.0);
+ ev_timer_start(session->event_loop, &session->timeout);
+ }
+ }
+}
+
+static void
+rspamd_fuzzy_redis_version_callback(redisAsyncContext *c, gpointer r,
+ gpointer priv)
+{
+ struct rspamd_fuzzy_redis_session *session = priv;
+ redisReply *reply = r;
+ gulong nelts;
+
+ ev_timer_stop(session->event_loop, &session->timeout);
+
+ if (c->err == 0 && reply != NULL) {
+ rspamd_upstream_ok(session->up);
+
+ if (reply->type == REDIS_REPLY_INTEGER) {
+ if (session->callback.cb_version) {
+ session->callback.cb_version(reply->integer, session->cbdata);
+ }
+ }
+ else if (reply->type == REDIS_REPLY_STRING) {
+ nelts = strtoul(reply->str, NULL, 10);
+
+ if (session->callback.cb_version) {
+ session->callback.cb_version(nelts, session->cbdata);
+ }
+ }
+ else {
+ if (reply->type == REDIS_REPLY_ERROR) {
+ msg_err_redis_session("fuzzy backend redis error: \"%s\"",
+ reply->str);
+ }
+ if (session->callback.cb_version) {
+ session->callback.cb_version(0, session->cbdata);
+ }
+ }
+ }
+ else {
+ if (session->callback.cb_version) {
+ session->callback.cb_version(0, session->cbdata);
+ }
+
+ if (c->errstr) {
+ msg_err_redis_session("error getting version on %s: %s",
+ rspamd_inet_address_to_string_pretty(rspamd_upstream_addr_cur(session->up)),
+ c->errstr);
+ rspamd_upstream_fail(session->up, FALSE, c->errstr);
+ }
+ }
+
+ rspamd_fuzzy_redis_session_dtor(session, FALSE);
+}
+
+void rspamd_fuzzy_backend_version_redis(struct rspamd_fuzzy_backend *bk,
+ const gchar *src,
+ rspamd_fuzzy_version_cb cb, void *ud,
+ void *subr_ud)
+{
+ struct rspamd_fuzzy_backend_redis *backend = subr_ud;
+ struct rspamd_fuzzy_redis_session *session;
+ struct upstream *up;
+ struct upstream_list *ups;
+ rspamd_inet_addr_t *addr;
+ GString *key;
+
+ g_assert(backend != NULL);
+
+ ups = rspamd_redis_get_servers(backend, "read_servers");
+ if (!ups) {
+ if (cb) {
+ cb(0, ud);
+ }
+
+ return;
+ }
+
+ session = g_malloc0(sizeof(*session));
+ session->backend = backend;
+ REF_RETAIN(session->backend);
+
+ session->callback.cb_version = cb;
+ session->cbdata = ud;
+ session->command = RSPAMD_FUZZY_REDIS_COMMAND_VERSION;
+ session->event_loop = rspamd_fuzzy_backend_event_base(bk);
+
+ session->nargs = 2;
+ session->argv = g_malloc(sizeof(gchar *) * 2);
+ session->argv_lens = g_malloc(sizeof(gsize) * 2);
+ key = g_string_new(backend->redis_object);
+ g_string_append(key, src);
+ session->argv[0] = g_strdup("GET");
+ session->argv_lens[0] = 3;
+ session->argv[1] = key->str;
+ session->argv_lens[1] = key->len;
+ g_string_free(key, FALSE); /* Do not free underlying array */
+
+ up = rspamd_upstream_get(ups,
+ RSPAMD_UPSTREAM_ROUND_ROBIN,
+ NULL,
+ 0);
+
+ session->up = rspamd_upstream_ref(up);
+ addr = rspamd_upstream_addr_next(up);
+ g_assert(addr != NULL);
+ session->ctx = rspamd_redis_pool_connect(backend->pool,
+ backend->dbname,
+ backend->username, backend->password,
+ rspamd_inet_address_to_string(addr),
+ rspamd_inet_address_get_port(addr));
+
+ if (session->ctx == NULL) {
+ rspamd_upstream_fail(up, FALSE, strerror(errno));
+ rspamd_fuzzy_redis_session_dtor(session, TRUE);
+
+ if (cb) {
+ cb(0, ud);
+ }
+ }
+ else {
+ if (redisAsyncCommandArgv(session->ctx, rspamd_fuzzy_redis_version_callback,
+ session, session->nargs,
+ (const gchar **) session->argv, session->argv_lens) != REDIS_OK) {
+ rspamd_fuzzy_redis_session_dtor(session, TRUE);
+
+ if (cb) {
+ cb(0, ud);
+ }
+ }
+ else {
+ /* Add timeout */
+ session->timeout.data = session;
+ ev_now_update_if_cheap((struct ev_loop *) session->event_loop);
+ ev_timer_init(&session->timeout,
+ rspamd_fuzzy_redis_timeout,
+ session->backend->timeout, 0.0);
+ ev_timer_start(session->event_loop, &session->timeout);
+ }
+ }
+}
+
+const gchar *
+rspamd_fuzzy_backend_id_redis(struct rspamd_fuzzy_backend *bk,
+ void *subr_ud)
+{
+ struct rspamd_fuzzy_backend_redis *backend = subr_ud;
+ g_assert(backend != NULL);
+
+ return backend->id;
+}
+
+void rspamd_fuzzy_backend_expire_redis(struct rspamd_fuzzy_backend *bk,
+ void *subr_ud)
+{
+ struct rspamd_fuzzy_backend_redis *backend = subr_ud;
+
+ g_assert(backend != NULL);
+}
+
+static gboolean
+rspamd_fuzzy_update_append_command(struct rspamd_fuzzy_backend *bk,
+ struct rspamd_fuzzy_redis_session *session,
+ struct fuzzy_peer_cmd *io_cmd, guint *shift)
+{
+ GString *key, *value;
+ guint cur_shift = *shift;
+ guint i, klen;
+ struct rspamd_fuzzy_cmd *cmd;
+
+ if (io_cmd->is_shingle) {
+ cmd = &io_cmd->cmd.shingle.basic;
+ }
+ else {
+ cmd = &io_cmd->cmd.normal;
+ }
+
+ if (cmd->cmd == FUZZY_WRITE) {
+ /*
+ * For each normal hash addition we do 5 redis commands:
+ * HSET <key> F <flag>
+ * HSETNX <key> C <time>
+ * HINCRBY <key> V <weight>
+ * EXPIRE <key> <expire>
+ * Where <key> is <prefix> || <digest>
+ */
+
+ /* HSET */
+ klen = strlen(session->backend->redis_object) +
+ sizeof(cmd->digest) + 1;
+ key = g_string_sized_new(klen);
+ g_string_append(key, session->backend->redis_object);
+ g_string_append_len(key, cmd->digest, sizeof(cmd->digest));
+ value = g_string_sized_new(sizeof("4294967296"));
+ rspamd_printf_gstring(value, "%d", cmd->flag);
+
+ if (cmd->version & RSPAMD_FUZZY_FLAG_WEAK) {
+ session->argv[cur_shift] = g_strdup("HSETNX");
+ session->argv_lens[cur_shift++] = sizeof("HSETNX") - 1;
+ }
+ else {
+ session->argv[cur_shift] = g_strdup("HSET");
+ session->argv_lens[cur_shift++] = sizeof("HSET") - 1;
+ }
+
+ session->argv[cur_shift] = key->str;
+ session->argv_lens[cur_shift++] = key->len;
+ session->argv[cur_shift] = g_strdup("F");
+ session->argv_lens[cur_shift++] = sizeof("F") - 1;
+ session->argv[cur_shift] = value->str;
+ session->argv_lens[cur_shift++] = value->len;
+ g_string_free(key, FALSE);
+ g_string_free(value, FALSE);
+
+ if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
+ 4,
+ (const gchar **) &session->argv[cur_shift - 4],
+ &session->argv_lens[cur_shift - 4]) != REDIS_OK) {
+
+ return FALSE;
+ }
+
+ /* HSETNX */
+ klen = strlen(session->backend->redis_object) +
+ sizeof(cmd->digest) + 1;
+ key = g_string_sized_new(klen);
+ g_string_append(key, session->backend->redis_object);
+ g_string_append_len(key, cmd->digest, sizeof(cmd->digest));
+ value = g_string_sized_new(sizeof("18446744073709551616"));
+ rspamd_printf_gstring(value, "%L", (gint64) rspamd_get_calendar_ticks());
+ session->argv[cur_shift] = g_strdup("HSETNX");
+ session->argv_lens[cur_shift++] = sizeof("HSETNX") - 1;
+ session->argv[cur_shift] = key->str;
+ session->argv_lens[cur_shift++] = key->len;
+ session->argv[cur_shift] = g_strdup("C");
+ session->argv_lens[cur_shift++] = sizeof("C") - 1;
+ session->argv[cur_shift] = value->str;
+ session->argv_lens[cur_shift++] = value->len;
+ g_string_free(key, FALSE);
+ g_string_free(value, FALSE);
+
+ if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
+ 4,
+ (const gchar **) &session->argv[cur_shift - 4],
+ &session->argv_lens[cur_shift - 4]) != REDIS_OK) {
+
+ return FALSE;
+ }
+
+ /* HINCRBY */
+ key = g_string_sized_new(klen);
+ g_string_append(key, session->backend->redis_object);
+ g_string_append_len(key, cmd->digest, sizeof(cmd->digest));
+ value = g_string_sized_new(sizeof("4294967296"));
+ rspamd_printf_gstring(value, "%d", cmd->value);
+ session->argv[cur_shift] = g_strdup("HINCRBY");
+ session->argv_lens[cur_shift++] = sizeof("HINCRBY") - 1;
+ session->argv[cur_shift] = key->str;
+ session->argv_lens[cur_shift++] = key->len;
+ session->argv[cur_shift] = g_strdup("V");
+ session->argv_lens[cur_shift++] = sizeof("V") - 1;
+ session->argv[cur_shift] = value->str;
+ session->argv_lens[cur_shift++] = value->len;
+ g_string_free(key, FALSE);
+ g_string_free(value, FALSE);
+
+ if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
+ 4,
+ (const gchar **) &session->argv[cur_shift - 4],
+ &session->argv_lens[cur_shift - 4]) != REDIS_OK) {
+
+ return FALSE;
+ }
+
+ /* EXPIRE */
+ key = g_string_sized_new(klen);
+ g_string_append(key, session->backend->redis_object);
+ g_string_append_len(key, cmd->digest, sizeof(cmd->digest));
+ value = g_string_sized_new(sizeof("4294967296"));
+ rspamd_printf_gstring(value, "%d",
+ (gint) rspamd_fuzzy_backend_get_expire(bk));
+ session->argv[cur_shift] = g_strdup("EXPIRE");
+ session->argv_lens[cur_shift++] = sizeof("EXPIRE") - 1;
+ session->argv[cur_shift] = key->str;
+ session->argv_lens[cur_shift++] = key->len;
+ session->argv[cur_shift] = value->str;
+ session->argv_lens[cur_shift++] = value->len;
+ g_string_free(key, FALSE);
+ g_string_free(value, FALSE);
+
+ if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
+ 3,
+ (const gchar **) &session->argv[cur_shift - 3],
+ &session->argv_lens[cur_shift - 3]) != REDIS_OK) {
+
+ return FALSE;
+ }
+
+ /* INCR */
+ key = g_string_sized_new(klen);
+ g_string_append(key, session->backend->redis_object);
+ g_string_append(key, "_count");
+ session->argv[cur_shift] = g_strdup("INCR");
+ session->argv_lens[cur_shift++] = sizeof("INCR") - 1;
+ session->argv[cur_shift] = key->str;
+ session->argv_lens[cur_shift++] = key->len;
+ g_string_free(key, FALSE);
+
+ if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
+ 2,
+ (const gchar **) &session->argv[cur_shift - 2],
+ &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
+
+ return FALSE;
+ }
+ }
+ else if (cmd->cmd == FUZZY_DEL) {
+ /* DEL */
+ klen = strlen(session->backend->redis_object) +
+ sizeof(cmd->digest) + 1;
+
+ key = g_string_sized_new(klen);
+ g_string_append(key, session->backend->redis_object);
+ g_string_append_len(key, cmd->digest, sizeof(cmd->digest));
+ session->argv[cur_shift] = g_strdup("DEL");
+ session->argv_lens[cur_shift++] = sizeof("DEL") - 1;
+ session->argv[cur_shift] = key->str;
+ session->argv_lens[cur_shift++] = key->len;
+ g_string_free(key, FALSE);
+
+ if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
+ 2,
+ (const gchar **) &session->argv[cur_shift - 2],
+ &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
+
+ return FALSE;
+ }
+
+ /* DECR */
+ key = g_string_sized_new(klen);
+ g_string_append(key, session->backend->redis_object);
+ g_string_append(key, "_count");
+ session->argv[cur_shift] = g_strdup("DECR");
+ session->argv_lens[cur_shift++] = sizeof("DECR") - 1;
+ session->argv[cur_shift] = key->str;
+ session->argv_lens[cur_shift++] = key->len;
+ g_string_free(key, FALSE);
+
+ if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
+ 2,
+ (const gchar **) &session->argv[cur_shift - 2],
+ &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
+
+ return FALSE;
+ }
+ }
+ else if (cmd->cmd == FUZZY_REFRESH) {
+ /*
+ * Issue refresh command by just EXPIRE command
+ * EXPIRE <key> <expire>
+ * Where <key> is <prefix> || <digest>
+ */
+
+ klen = strlen(session->backend->redis_object) +
+ sizeof(cmd->digest) + 1;
+
+ /* EXPIRE */
+ key = g_string_sized_new(klen);
+ g_string_append(key, session->backend->redis_object);
+ g_string_append_len(key, cmd->digest, sizeof(cmd->digest));
+ value = g_string_sized_new(sizeof("4294967296"));
+ rspamd_printf_gstring(value, "%d",
+ (gint) rspamd_fuzzy_backend_get_expire(bk));
+ session->argv[cur_shift] = g_strdup("EXPIRE");
+ session->argv_lens[cur_shift++] = sizeof("EXPIRE") - 1;
+ session->argv[cur_shift] = key->str;
+ session->argv_lens[cur_shift++] = key->len;
+ session->argv[cur_shift] = value->str;
+ session->argv_lens[cur_shift++] = value->len;
+ g_string_free(key, FALSE);
+ g_string_free(value, FALSE);
+
+ if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
+ 3,
+ (const gchar **) &session->argv[cur_shift - 3],
+ &session->argv_lens[cur_shift - 3]) != REDIS_OK) {
+
+ return FALSE;
+ }
+ }
+ else if (cmd->cmd == FUZZY_DUP) {
+ /* Ignore */
+ }
+ else {
+ g_assert_not_reached();
+ }
+
+ if (io_cmd->is_shingle) {
+ if (cmd->cmd == FUZZY_WRITE) {
+ klen = strlen(session->backend->redis_object) +
+ 64 + 1;
+
+ for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) {
+ guchar *hval;
+ /*
+ * For each command with shingles we additionally emit 32 commands:
+ * SETEX <prefix>_<number>_<value> <expire> <digest>
+ */
+
+ /* SETEX */
+ key = g_string_sized_new(klen);
+ rspamd_printf_gstring(key, "%s_%d_%uL",
+ session->backend->redis_object,
+ i,
+ io_cmd->cmd.shingle.sgl.hashes[i]);
+ value = g_string_sized_new(sizeof("4294967296"));
+ rspamd_printf_gstring(value, "%d",
+ (gint) rspamd_fuzzy_backend_get_expire(bk));
+ hval = g_malloc(sizeof(io_cmd->cmd.shingle.basic.digest));
+ memcpy(hval, io_cmd->cmd.shingle.basic.digest,
+ sizeof(io_cmd->cmd.shingle.basic.digest));
+ session->argv[cur_shift] = g_strdup("SETEX");
+ session->argv_lens[cur_shift++] = sizeof("SETEX") - 1;
+ session->argv[cur_shift] = key->str;
+ session->argv_lens[cur_shift++] = key->len;
+ session->argv[cur_shift] = value->str;
+ session->argv_lens[cur_shift++] = value->len;
+ session->argv[cur_shift] = hval;
+ session->argv_lens[cur_shift++] = sizeof(io_cmd->cmd.shingle.basic.digest);
+ g_string_free(key, FALSE);
+ g_string_free(value, FALSE);
+
+ if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
+ 4,
+ (const gchar **) &session->argv[cur_shift - 4],
+ &session->argv_lens[cur_shift - 4]) != REDIS_OK) {
+
+ return FALSE;
+ }
+ }
+ }
+ else if (cmd->cmd == FUZZY_DEL) {
+ klen = strlen(session->backend->redis_object) +
+ 64 + 1;
+
+ for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) {
+ key = g_string_sized_new(klen);
+ rspamd_printf_gstring(key, "%s_%d_%uL",
+ session->backend->redis_object,
+ i,
+ io_cmd->cmd.shingle.sgl.hashes[i]);
+ session->argv[cur_shift] = g_strdup("DEL");
+ session->argv_lens[cur_shift++] = sizeof("DEL") - 1;
+ session->argv[cur_shift] = key->str;
+ session->argv_lens[cur_shift++] = key->len;
+ g_string_free(key, FALSE);
+
+ if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
+ 2,
+ (const gchar **) &session->argv[cur_shift - 2],
+ &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
+
+ return FALSE;
+ }
+ }
+ }
+ else if (cmd->cmd == FUZZY_REFRESH) {
+ klen = strlen(session->backend->redis_object) +
+ 64 + 1;
+
+ for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) {
+ /*
+ * For each command with shingles we additionally emit 32 commands:
+ * EXPIRE <prefix>_<number>_<value> <expire>
+ */
+
+ /* Expire */
+ key = g_string_sized_new(klen);
+ rspamd_printf_gstring(key, "%s_%d_%uL",
+ session->backend->redis_object,
+ i,
+ io_cmd->cmd.shingle.sgl.hashes[i]);
+ value = g_string_sized_new(sizeof("18446744073709551616"));
+ rspamd_printf_gstring(value, "%d",
+ (gint) rspamd_fuzzy_backend_get_expire(bk));
+ session->argv[cur_shift] = g_strdup("EXPIRE");
+ session->argv_lens[cur_shift++] = sizeof("EXPIRE") - 1;
+ session->argv[cur_shift] = key->str;
+ session->argv_lens[cur_shift++] = key->len;
+ session->argv[cur_shift] = value->str;
+ session->argv_lens[cur_shift++] = value->len;
+ g_string_free(key, FALSE);
+ g_string_free(value, FALSE);
+
+ if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
+ 3,
+ (const gchar **) &session->argv[cur_shift - 3],
+ &session->argv_lens[cur_shift - 3]) != REDIS_OK) {
+
+ return FALSE;
+ }
+ }
+ }
+ else if (cmd->cmd == FUZZY_DUP) {
+ /* Ignore */
+ }
+ else {
+ g_assert_not_reached();
+ }
+ }
+
+ *shift = cur_shift;
+
+ return TRUE;
+}
+
+static void
+rspamd_fuzzy_redis_update_callback(redisAsyncContext *c, gpointer r,
+ gpointer priv)
+{
+ struct rspamd_fuzzy_redis_session *session = priv;
+ redisReply *reply = r;
+
+ ev_timer_stop(session->event_loop, &session->timeout);
+
+ if (c->err == 0 && reply != NULL) {
+ rspamd_upstream_ok(session->up);
+
+ if (reply->type == REDIS_REPLY_ARRAY) {
+ /* TODO: check all replies somehow */
+ if (session->callback.cb_update) {
+ session->callback.cb_update(TRUE,
+ session->nadded,
+ session->ndeleted,
+ session->nextended,
+ session->nignored,
+ session->cbdata);
+ }
+ }
+ else {
+ if (reply->type == REDIS_REPLY_ERROR) {
+ msg_err_redis_session("fuzzy backend redis error: \"%s\"",
+ reply->str);
+ }
+ if (session->callback.cb_update) {
+ session->callback.cb_update(FALSE, 0, 0, 0, 0, session->cbdata);
+ }
+ }
+ }
+ else {
+ if (session->callback.cb_update) {
+ session->callback.cb_update(FALSE, 0, 0, 0, 0, session->cbdata);
+ }
+
+ if (c->errstr) {
+ msg_err_redis_session("error sending update to redis %s: %s",
+ rspamd_inet_address_to_string_pretty(rspamd_upstream_addr_cur(session->up)),
+ c->errstr);
+ rspamd_upstream_fail(session->up, FALSE, c->errstr);
+ }
+ }
+
+ rspamd_fuzzy_redis_session_dtor(session, FALSE);
+}
+
+void rspamd_fuzzy_backend_update_redis(struct rspamd_fuzzy_backend *bk,
+ GArray *updates, const gchar *src,
+ rspamd_fuzzy_update_cb cb, void *ud,
+ void *subr_ud)
+{
+ struct rspamd_fuzzy_backend_redis *backend = subr_ud;
+ struct rspamd_fuzzy_redis_session *session;
+ struct upstream *up;
+ struct upstream_list *ups;
+ rspamd_inet_addr_t *addr;
+ guint i;
+ GString *key;
+ struct fuzzy_peer_cmd *io_cmd;
+ struct rspamd_fuzzy_cmd *cmd = NULL;
+ guint nargs, cur_shift;
+
+ g_assert(backend != NULL);
+
+ ups = rspamd_redis_get_servers(backend, "write_servers");
+ if (!ups) {
+ if (cb) {
+ cb(FALSE, 0, 0, 0, 0, ud);
+ }
+
+ return;
+ }
+
+ session = g_malloc0(sizeof(*session));
+ session->backend = backend;
+ REF_RETAIN(session->backend);
+
+ /*
+ * For each normal hash addition we do 3 redis commands:
+ * HSET <key> F <flag> **OR** HSETNX <key> F <flag> when flag is weak
+ * HINCRBY <key> V <weight>
+ * EXPIRE <key> <expire>
+ * INCR <prefix||fuzzy_count>
+ *
+ * Where <key> is <prefix> || <digest>
+ *
+ * For each command with shingles we additionally emit 32 commands:
+ * SETEX <prefix>_<number>_<value> <expire> <digest>
+ *
+ * For each delete command we emit:
+ * DEL <key>
+ *
+ * For each delete command with shingles we emit also 32 commands:
+ * DEL <prefix>_<number>_<value>
+ * DECR <prefix||fuzzy_count>
+ */
+
+ nargs = 4;
+
+ for (i = 0; i < updates->len; i++) {
+ io_cmd = &g_array_index(updates, struct fuzzy_peer_cmd, i);
+
+ if (io_cmd->is_shingle) {
+ cmd = &io_cmd->cmd.shingle.basic;
+ }
+ else {
+ cmd = &io_cmd->cmd.normal;
+ }
+
+ if (cmd->cmd == FUZZY_WRITE) {
+ nargs += 17;
+ session->nadded++;
+
+ if (io_cmd->is_shingle) {
+ nargs += RSPAMD_SHINGLE_SIZE * 4;
+ }
+ }
+ else if (cmd->cmd == FUZZY_DEL) {
+ nargs += 4;
+ session->ndeleted++;
+
+ if (io_cmd->is_shingle) {
+ nargs += RSPAMD_SHINGLE_SIZE * 2;
+ }
+ }
+ else if (cmd->cmd == FUZZY_REFRESH) {
+ nargs += 3;
+ session->nextended++;
+
+ if (io_cmd->is_shingle) {
+ nargs += RSPAMD_SHINGLE_SIZE * 3;
+ }
+ }
+ else {
+ session->nignored++;
+ }
+ }
+
+ /* Now we need to create a new request */
+ session->callback.cb_update = cb;
+ session->cbdata = ud;
+ session->command = RSPAMD_FUZZY_REDIS_COMMAND_UPDATES;
+ session->cmd = cmd;
+ session->prob = 1.0f;
+ session->event_loop = rspamd_fuzzy_backend_event_base(bk);
+
+ /* First of all check digest */
+ session->nargs = nargs;
+ session->argv = g_malloc0(sizeof(gchar *) * session->nargs);
+ session->argv_lens = g_malloc0(sizeof(gsize) * session->nargs);
+
+ up = rspamd_upstream_get(ups,
+ RSPAMD_UPSTREAM_MASTER_SLAVE,
+ NULL,
+ 0);
+
+ session->up = rspamd_upstream_ref(up);
+ addr = rspamd_upstream_addr_next(up);
+ g_assert(addr != NULL);
+ session->ctx = rspamd_redis_pool_connect(backend->pool,
+ backend->dbname,
+ backend->username, backend->password,
+ rspamd_inet_address_to_string(addr),
+ rspamd_inet_address_get_port(addr));
+
+ if (session->ctx == NULL) {
+ rspamd_upstream_fail(up, TRUE, strerror(errno));
+ rspamd_fuzzy_redis_session_dtor(session, TRUE);
+
+ if (cb) {
+ cb(FALSE, 0, 0, 0, 0, ud);
+ }
+ }
+ else {
+ /* Start with MULTI command */
+ session->argv[0] = g_strdup("MULTI");
+ session->argv_lens[0] = 5;
+
+ if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
+ 1,
+ (const gchar **) session->argv,
+ session->argv_lens) != REDIS_OK) {
+
+ if (cb) {
+ cb(FALSE, 0, 0, 0, 0, ud);
+ }
+ rspamd_fuzzy_redis_session_dtor(session, TRUE);
+
+ return;
+ }
+
+ /* Now split the rest of commands in packs and emit them command by command */
+ cur_shift = 1;
+
+ for (i = 0; i < updates->len; i++) {
+ io_cmd = &g_array_index(updates, struct fuzzy_peer_cmd, i);
+
+ if (!rspamd_fuzzy_update_append_command(bk, session, io_cmd,
+ &cur_shift)) {
+ if (cb) {
+ cb(FALSE, 0, 0, 0, 0, ud);
+ }
+ rspamd_fuzzy_redis_session_dtor(session, TRUE);
+
+ return;
+ }
+ }
+
+ /* Now INCR command for the source */
+ key = g_string_new(backend->redis_object);
+ g_string_append(key, src);
+ session->argv[cur_shift] = g_strdup("INCR");
+ session->argv_lens[cur_shift++] = 4;
+ session->argv[cur_shift] = key->str;
+ session->argv_lens[cur_shift++] = key->len;
+ g_string_free(key, FALSE);
+
+ if (redisAsyncCommandArgv(session->ctx, NULL, NULL,
+ 2,
+ (const gchar **) &session->argv[cur_shift - 2],
+ &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
+
+ if (cb) {
+ cb(FALSE, 0, 0, 0, 0, ud);
+ }
+ rspamd_fuzzy_redis_session_dtor(session, TRUE);
+
+ return;
+ }
+
+ /* Finally we call EXEC with a specific callback */
+ session->argv[cur_shift] = g_strdup("EXEC");
+ session->argv_lens[cur_shift] = 4;
+
+ if (redisAsyncCommandArgv(session->ctx,
+ rspamd_fuzzy_redis_update_callback, session,
+ 1,
+ (const gchar **) &session->argv[cur_shift],
+ &session->argv_lens[cur_shift]) != REDIS_OK) {
+
+ if (cb) {
+ cb(FALSE, 0, 0, 0, 0, ud);
+ }
+ rspamd_fuzzy_redis_session_dtor(session, TRUE);
+
+ return;
+ }
+ else {
+ /* Add timeout */
+ session->timeout.data = session;
+ ev_now_update_if_cheap((struct ev_loop *) session->event_loop);
+ ev_timer_init(&session->timeout,
+ rspamd_fuzzy_redis_timeout,
+ session->backend->timeout, 0.0);
+ ev_timer_start(session->event_loop, &session->timeout);
+ }
+ }
+}
+
+void rspamd_fuzzy_backend_close_redis(struct rspamd_fuzzy_backend *bk,
+ void *subr_ud)
+{
+ struct rspamd_fuzzy_backend_redis *backend = subr_ud;
+
+ g_assert(backend != NULL);
+
+ /*
+ * XXX: we leak lua registry element there to avoid crashing
+ * due to chicken-egg problem between lua state termination and
+ * redis pool termination.
+ * Here, we assume that redis pool is destroyed AFTER lua_state,
+ * so all connections pending will release references but due to
+ * `terminated` hack they will not try to access Lua stuff
+ * This is enabled merely if we have connections pending (e.g. refcount > 1)
+ */
+ if (backend->ref.refcount > 1) {
+ backend->terminated = true;
+ }
+ REF_RELEASE(backend);
+}
diff --git a/src/libserver/fuzzy_backend/fuzzy_backend_redis.h b/src/libserver/fuzzy_backend/fuzzy_backend_redis.h
new file mode 100644
index 0000000..3cfa162
--- /dev/null
+++ b/src/libserver/fuzzy_backend/fuzzy_backend_redis.h
@@ -0,0 +1,67 @@
+/*-
+ * Copyright 2016 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef SRC_LIBSERVER_FUZZY_BACKEND_REDIS_H_
+#define SRC_LIBSERVER_FUZZY_BACKEND_REDIS_H_
+
+#include "config.h"
+#include "fuzzy_backend.h"
+
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/*
+ * Subroutines for fuzzy_backend
+ */
+void *rspamd_fuzzy_backend_init_redis(struct rspamd_fuzzy_backend *bk,
+ const ucl_object_t *obj,
+ struct rspamd_config *cfg,
+ GError **err);
+
+void rspamd_fuzzy_backend_check_redis(struct rspamd_fuzzy_backend *bk,
+ const struct rspamd_fuzzy_cmd *cmd,
+ rspamd_fuzzy_check_cb cb, void *ud,
+ void *subr_ud);
+
+void rspamd_fuzzy_backend_update_redis(struct rspamd_fuzzy_backend *bk,
+ GArray *updates, const gchar *src,
+ rspamd_fuzzy_update_cb cb, void *ud,
+ void *subr_ud);
+
+void rspamd_fuzzy_backend_count_redis(struct rspamd_fuzzy_backend *bk,
+ rspamd_fuzzy_count_cb cb, void *ud,
+ void *subr_ud);
+
+void rspamd_fuzzy_backend_version_redis(struct rspamd_fuzzy_backend *bk,
+ const gchar *src,
+ rspamd_fuzzy_version_cb cb, void *ud,
+ void *subr_ud);
+
+const gchar *rspamd_fuzzy_backend_id_redis(struct rspamd_fuzzy_backend *bk,
+ void *subr_ud);
+
+void rspamd_fuzzy_backend_expire_redis(struct rspamd_fuzzy_backend *bk,
+ void *subr_ud);
+
+void rspamd_fuzzy_backend_close_redis(struct rspamd_fuzzy_backend *bk,
+ void *subr_ud);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* SRC_LIBSERVER_FUZZY_BACKEND_REDIS_H_ */
diff --git a/src/libserver/fuzzy_backend/fuzzy_backend_sqlite.c b/src/libserver/fuzzy_backend/fuzzy_backend_sqlite.c
new file mode 100644
index 0000000..9ec448e
--- /dev/null
+++ b/src/libserver/fuzzy_backend/fuzzy_backend_sqlite.c
@@ -0,0 +1,1029 @@
+/*-
+ * Copyright 2016 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include "rspamd.h"
+#include "fuzzy_backend.h"
+#include "fuzzy_backend_sqlite.h"
+#include "unix-std.h"
+
+#include <sqlite3.h>
+#include "libutil/sqlite_utils.h"
+
+struct rspamd_fuzzy_backend_sqlite {
+ sqlite3 *db;
+ char *path;
+ gchar id[MEMPOOL_UID_LEN];
+ gsize count;
+ gsize expired;
+ rspamd_mempool_t *pool;
+};
+
+static const gdouble sql_sleep_time = 0.1;
+static const guint max_retries = 10;
+
+#define msg_err_fuzzy_backend(...) rspamd_default_log_function(G_LOG_LEVEL_CRITICAL, \
+ backend->pool->tag.tagname, backend->pool->tag.uid, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+#define msg_warn_fuzzy_backend(...) rspamd_default_log_function(G_LOG_LEVEL_WARNING, \
+ backend->pool->tag.tagname, backend->pool->tag.uid, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+#define msg_info_fuzzy_backend(...) rspamd_default_log_function(G_LOG_LEVEL_INFO, \
+ backend->pool->tag.tagname, backend->pool->tag.uid, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+#define msg_debug_fuzzy_backend(...) rspamd_conditional_debug_fast(NULL, NULL, \
+ rspamd_fuzzy_sqlite_log_id, backend->pool->tag.tagname, backend->pool->tag.uid, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+
+INIT_LOG_MODULE(fuzzy_sqlite)
+
+static const char *create_tables_sql =
+ "BEGIN;"
+ "CREATE TABLE IF NOT EXISTS digests("
+ " id INTEGER PRIMARY KEY,"
+ " flag INTEGER NOT NULL,"
+ " digest TEXT NOT NULL,"
+ " value INTEGER,"
+ " time INTEGER);"
+ "CREATE TABLE IF NOT EXISTS shingles("
+ " value INTEGER NOT NULL,"
+ " number INTEGER NOT NULL,"
+ " digest_id INTEGER REFERENCES digests(id) ON DELETE CASCADE "
+ " ON UPDATE CASCADE);"
+ "CREATE TABLE IF NOT EXISTS sources("
+ " name TEXT UNIQUE,"
+ " version INTEGER,"
+ " last INTEGER);"
+ "CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest);"
+ "CREATE INDEX IF NOT EXISTS t ON digests(time);"
+ "CREATE INDEX IF NOT EXISTS dgst_id ON shingles(digest_id);"
+ "CREATE UNIQUE INDEX IF NOT EXISTS s ON shingles(value, number);"
+ "COMMIT;";
+#if 0
+static const char *create_index_sql =
+ "BEGIN;"
+ "CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest);"
+ "CREATE INDEX IF NOT EXISTS t ON digests(time);"
+ "CREATE INDEX IF NOT EXISTS dgst_id ON shingles(digest_id);"
+ "CREATE UNIQUE INDEX IF NOT EXISTS s ON shingles(value, number);"
+ "COMMIT;";
+#endif
+enum rspamd_fuzzy_statement_idx {
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_START = 0,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK,
+ RSPAMD_FUZZY_BACKEND_INSERT,
+ RSPAMD_FUZZY_BACKEND_UPDATE,
+ RSPAMD_FUZZY_BACKEND_UPDATE_FLAG,
+ RSPAMD_FUZZY_BACKEND_INSERT_SHINGLE,
+ RSPAMD_FUZZY_BACKEND_CHECK,
+ RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE,
+ RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID,
+ RSPAMD_FUZZY_BACKEND_DELETE,
+ RSPAMD_FUZZY_BACKEND_COUNT,
+ RSPAMD_FUZZY_BACKEND_EXPIRE,
+ RSPAMD_FUZZY_BACKEND_VACUUM,
+ RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED,
+ RSPAMD_FUZZY_BACKEND_ADD_SOURCE,
+ RSPAMD_FUZZY_BACKEND_VERSION,
+ RSPAMD_FUZZY_BACKEND_SET_VERSION,
+ RSPAMD_FUZZY_BACKEND_MAX
+};
+static struct rspamd_fuzzy_stmts {
+ enum rspamd_fuzzy_statement_idx idx;
+ const gchar *sql;
+ const gchar *args;
+ sqlite3_stmt *stmt;
+ gint result;
+} prepared_stmts[RSPAMD_FUZZY_BACKEND_MAX] =
+ {
+ {.idx = RSPAMD_FUZZY_BACKEND_TRANSACTION_START,
+ .sql = "BEGIN TRANSACTION;",
+ .args = "",
+ .stmt = NULL,
+ .result = SQLITE_DONE},
+ {.idx = RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT,
+ .sql = "COMMIT;",
+ .args = "",
+ .stmt = NULL,
+ .result = SQLITE_DONE},
+ {.idx = RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK,
+ .sql = "ROLLBACK;",
+ .args = "",
+ .stmt = NULL,
+ .result = SQLITE_DONE},
+ {.idx = RSPAMD_FUZZY_BACKEND_INSERT,
+ .sql = "INSERT INTO digests(flag, digest, value, time) VALUES"
+ "(?1, ?2, ?3, strftime('%s','now'));",
+ .args = "SDI",
+ .stmt = NULL,
+ .result = SQLITE_DONE},
+ {.idx = RSPAMD_FUZZY_BACKEND_UPDATE,
+ .sql = "UPDATE digests SET value = value + ?1, time = strftime('%s','now') WHERE "
+ "digest==?2;",
+ .args = "ID",
+ .stmt = NULL,
+ .result = SQLITE_DONE},
+ {.idx = RSPAMD_FUZZY_BACKEND_UPDATE_FLAG,
+ .sql = "UPDATE digests SET value = ?1, flag = ?2, time = strftime('%s','now') WHERE "
+ "digest==?3;",
+ .args = "IID",
+ .stmt = NULL,
+ .result = SQLITE_DONE},
+ {.idx = RSPAMD_FUZZY_BACKEND_INSERT_SHINGLE,
+ .sql = "INSERT OR REPLACE INTO shingles(value, number, digest_id) "
+ "VALUES (?1, ?2, ?3);",
+ .args = "III",
+ .stmt = NULL,
+ .result = SQLITE_DONE},
+ {.idx = RSPAMD_FUZZY_BACKEND_CHECK,
+ .sql = "SELECT value, time, flag FROM digests WHERE digest==?1;",
+ .args = "D",
+ .stmt = NULL,
+ .result = SQLITE_ROW},
+ {.idx = RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE,
+ .sql = "SELECT digest_id FROM shingles WHERE value=?1 AND number=?2",
+ .args = "IS",
+ .stmt = NULL,
+ .result = SQLITE_ROW},
+ {.idx = RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID,
+ .sql = "SELECT digest, value, time, flag FROM digests WHERE id=?1",
+ .args = "I",
+ .stmt = NULL,
+ .result = SQLITE_ROW},
+ {.idx = RSPAMD_FUZZY_BACKEND_DELETE,
+ .sql = "DELETE FROM digests WHERE digest==?1;",
+ .args = "D",
+ .stmt = NULL,
+ .result = SQLITE_DONE},
+ {.idx = RSPAMD_FUZZY_BACKEND_COUNT,
+ .sql = "SELECT COUNT(*) FROM digests;",
+ .args = "",
+ .stmt = NULL,
+ .result = SQLITE_ROW},
+ {.idx = RSPAMD_FUZZY_BACKEND_EXPIRE,
+ .sql = "DELETE FROM digests WHERE id IN (SELECT id FROM digests WHERE time < ?1 LIMIT ?2);",
+ .args = "II",
+ .stmt = NULL,
+ .result = SQLITE_DONE},
+ {.idx = RSPAMD_FUZZY_BACKEND_VACUUM,
+ .sql = "VACUUM;",
+ .args = "",
+ .stmt = NULL,
+ .result = SQLITE_DONE},
+ {.idx = RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED,
+ .sql = "DELETE FROM shingles WHERE value=?1 AND number=?2;",
+ .args = "II",
+ .stmt = NULL,
+ .result = SQLITE_DONE},
+ {.idx = RSPAMD_FUZZY_BACKEND_ADD_SOURCE,
+ .sql = "INSERT OR IGNORE INTO sources(name, version, last) VALUES (?1, ?2, ?3);",
+ .args = "TII",
+ .stmt = NULL,
+ .result = SQLITE_DONE},
+ {.idx = RSPAMD_FUZZY_BACKEND_VERSION,
+ .sql = "SELECT version FROM sources WHERE name=?1;",
+ .args = "T",
+ .stmt = NULL,
+ .result = SQLITE_ROW},
+ {.idx = RSPAMD_FUZZY_BACKEND_SET_VERSION,
+ .sql = "INSERT OR REPLACE INTO sources (name, version, last) VALUES (?3, ?1, ?2);",
+ .args = "IIT",
+ .stmt = NULL,
+ .result = SQLITE_DONE},
+};
+
+static GQuark
+rspamd_fuzzy_backend_sqlite_quark(void)
+{
+ return g_quark_from_static_string("fuzzy-backend-sqlite");
+}
+
+static gboolean
+rspamd_fuzzy_backend_sqlite_prepare_stmts(struct rspamd_fuzzy_backend_sqlite *bk, GError **err)
+{
+ int i;
+
+ for (i = 0; i < RSPAMD_FUZZY_BACKEND_MAX; i++) {
+ if (prepared_stmts[i].stmt != NULL) {
+ /* Skip already prepared statements */
+ continue;
+ }
+ if (sqlite3_prepare_v2(bk->db, prepared_stmts[i].sql, -1,
+ &prepared_stmts[i].stmt, NULL) != SQLITE_OK) {
+ g_set_error(err, rspamd_fuzzy_backend_sqlite_quark(),
+ -1, "Cannot initialize prepared sql `%s`: %s",
+ prepared_stmts[i].sql, sqlite3_errmsg(bk->db));
+
+ return FALSE;
+ }
+ }
+
+ return TRUE;
+}
+
+static int
+rspamd_fuzzy_backend_sqlite_cleanup_stmt(struct rspamd_fuzzy_backend_sqlite *backend,
+ int idx)
+{
+ sqlite3_stmt *stmt;
+
+ if (idx < 0 || idx >= RSPAMD_FUZZY_BACKEND_MAX) {
+
+ return -1;
+ }
+
+ msg_debug_fuzzy_backend("resetting `%s`", prepared_stmts[idx].sql);
+ stmt = prepared_stmts[idx].stmt;
+ sqlite3_clear_bindings(stmt);
+ sqlite3_reset(stmt);
+
+ return SQLITE_OK;
+}
+
+static int
+rspamd_fuzzy_backend_sqlite_run_stmt(struct rspamd_fuzzy_backend_sqlite *backend,
+ gboolean auto_cleanup,
+ int idx, ...)
+{
+ int retcode;
+ va_list ap;
+ sqlite3_stmt *stmt;
+ int i;
+ const char *argtypes;
+ guint retries = 0;
+ struct timespec ts;
+
+ if (idx < 0 || idx >= RSPAMD_FUZZY_BACKEND_MAX) {
+
+ return -1;
+ }
+
+ stmt = prepared_stmts[idx].stmt;
+ g_assert((int) prepared_stmts[idx].idx == idx);
+
+ if (stmt == NULL) {
+ if ((retcode = sqlite3_prepare_v2(backend->db, prepared_stmts[idx].sql, -1,
+ &prepared_stmts[idx].stmt, NULL)) != SQLITE_OK) {
+ msg_err_fuzzy_backend("Cannot initialize prepared sql `%s`: %s",
+ prepared_stmts[idx].sql, sqlite3_errmsg(backend->db));
+
+ return retcode;
+ }
+ stmt = prepared_stmts[idx].stmt;
+ }
+
+ msg_debug_fuzzy_backend("executing `%s` %s auto cleanup",
+ prepared_stmts[idx].sql, auto_cleanup ? "with" : "without");
+ argtypes = prepared_stmts[idx].args;
+ sqlite3_clear_bindings(stmt);
+ sqlite3_reset(stmt);
+ va_start(ap, idx);
+
+ for (i = 0; argtypes[i] != '\0'; i++) {
+ switch (argtypes[i]) {
+ case 'T':
+ sqlite3_bind_text(stmt, i + 1, va_arg(ap, const char *), -1,
+ SQLITE_STATIC);
+ break;
+ case 'I':
+ sqlite3_bind_int64(stmt, i + 1, va_arg(ap, gint64));
+ break;
+ case 'S':
+ sqlite3_bind_int(stmt, i + 1, va_arg(ap, gint));
+ break;
+ case 'D':
+ /* Special case for digests variable */
+ sqlite3_bind_text(stmt, i + 1, va_arg(ap, const char *), 64,
+ SQLITE_STATIC);
+ break;
+ }
+ }
+
+ va_end(ap);
+
+retry:
+ retcode = sqlite3_step(stmt);
+
+ if (retcode == prepared_stmts[idx].result) {
+ retcode = SQLITE_OK;
+ }
+ else {
+ if ((retcode == SQLITE_BUSY ||
+ retcode == SQLITE_LOCKED) &&
+ retries++ < max_retries) {
+ double_to_ts(sql_sleep_time, &ts);
+ nanosleep(&ts, NULL);
+ goto retry;
+ }
+
+ msg_debug_fuzzy_backend("failed to execute query %s: %d, %s", prepared_stmts[idx].sql,
+ retcode, sqlite3_errmsg(backend->db));
+ }
+
+ if (auto_cleanup) {
+ sqlite3_clear_bindings(stmt);
+ sqlite3_reset(stmt);
+ }
+
+ return retcode;
+}
+
+static void
+rspamd_fuzzy_backend_sqlite_close_stmts(struct rspamd_fuzzy_backend_sqlite *bk)
+{
+ int i;
+
+ for (i = 0; i < RSPAMD_FUZZY_BACKEND_MAX; i++) {
+ if (prepared_stmts[i].stmt != NULL) {
+ sqlite3_finalize(prepared_stmts[i].stmt);
+ prepared_stmts[i].stmt = NULL;
+ }
+ }
+
+ return;
+}
+
+static gboolean
+rspamd_fuzzy_backend_sqlite_run_sql(const gchar *sql, struct rspamd_fuzzy_backend_sqlite *bk,
+ GError **err)
+{
+ guint retries = 0;
+ struct timespec ts;
+ gint ret;
+
+ do {
+ ret = sqlite3_exec(bk->db, sql, NULL, NULL, NULL);
+ double_to_ts(sql_sleep_time, &ts);
+ } while (ret == SQLITE_BUSY && retries++ < max_retries &&
+ nanosleep(&ts, NULL) == 0);
+
+ if (ret != SQLITE_OK) {
+ g_set_error(err, rspamd_fuzzy_backend_sqlite_quark(),
+ -1, "Cannot execute raw sql `%s`: %s",
+ sql, sqlite3_errmsg(bk->db));
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+static struct rspamd_fuzzy_backend_sqlite *
+rspamd_fuzzy_backend_sqlite_open_db(const gchar *path, GError **err)
+{
+ struct rspamd_fuzzy_backend_sqlite *bk;
+ rspamd_cryptobox_hash_state_t st;
+ guchar hash_out[rspamd_cryptobox_HASHBYTES];
+
+ g_assert(path != NULL);
+
+ bk = g_malloc0(sizeof(*bk));
+ bk->path = g_strdup(path);
+ bk->expired = 0;
+ bk->pool = rspamd_mempool_new(rspamd_mempool_suggest_size(),
+ "fuzzy_backend", 0);
+ bk->db = rspamd_sqlite3_open_or_create(bk->pool, bk->path,
+ create_tables_sql, 1, err);
+
+ if (bk->db == NULL) {
+ rspamd_fuzzy_backend_sqlite_close(bk);
+
+ return NULL;
+ }
+
+ if (!rspamd_fuzzy_backend_sqlite_prepare_stmts(bk, err)) {
+ rspamd_fuzzy_backend_sqlite_close(bk);
+
+ return NULL;
+ }
+
+ /* Set id for the backend */
+ rspamd_cryptobox_hash_init(&st, NULL, 0);
+ rspamd_cryptobox_hash_update(&st, path, strlen(path));
+ rspamd_cryptobox_hash_final(&st, hash_out);
+ rspamd_snprintf(bk->id, sizeof(bk->id), "%xs", hash_out);
+ memcpy(bk->pool->tag.uid, bk->id, sizeof(bk->pool->tag.uid));
+
+ return bk;
+}
+
+struct rspamd_fuzzy_backend_sqlite *
+rspamd_fuzzy_backend_sqlite_open(const gchar *path,
+ gboolean vacuum,
+ GError **err)
+{
+ struct rspamd_fuzzy_backend_sqlite *backend;
+
+ if (path == NULL) {
+ g_set_error(err, rspamd_fuzzy_backend_sqlite_quark(),
+ ENOENT, "Path has not been specified");
+ return NULL;
+ }
+
+ /* Open database */
+ if ((backend = rspamd_fuzzy_backend_sqlite_open_db(path, err)) == NULL) {
+ return NULL;
+ }
+
+ if (rspamd_fuzzy_backend_sqlite_run_stmt(backend, FALSE, RSPAMD_FUZZY_BACKEND_COUNT) == SQLITE_OK) {
+ backend->count = sqlite3_column_int64(
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_COUNT].stmt, 0);
+ }
+
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt(backend, RSPAMD_FUZZY_BACKEND_COUNT);
+
+ return backend;
+}
+
+static gint
+rspamd_fuzzy_backend_sqlite_int64_cmp(const void *a, const void *b)
+{
+ gint64 ia = *(gint64 *) a, ib = *(gint64 *) b;
+
+ return (ia - ib);
+}
+
+struct rspamd_fuzzy_reply
+rspamd_fuzzy_backend_sqlite_check(struct rspamd_fuzzy_backend_sqlite *backend,
+ const struct rspamd_fuzzy_cmd *cmd, gint64 expire)
+{
+ struct rspamd_fuzzy_reply rep;
+ const struct rspamd_fuzzy_shingle_cmd *shcmd;
+ int rc;
+ gint64 timestamp;
+ gint64 shingle_values[RSPAMD_SHINGLE_SIZE], i, sel_id, cur_id,
+ cur_cnt, max_cnt;
+
+ memset(&rep, 0, sizeof(rep));
+ memcpy(rep.digest, cmd->digest, sizeof(rep.digest));
+
+ if (backend == NULL) {
+ return rep;
+ }
+
+ /* Try direct match first of all */
+ rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_START);
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt(backend, FALSE,
+ RSPAMD_FUZZY_BACKEND_CHECK,
+ cmd->digest);
+
+ if (rc == SQLITE_OK) {
+ timestamp = sqlite3_column_int64(
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 1);
+ if (time(NULL) - timestamp > expire) {
+ /* Expire element */
+ msg_debug_fuzzy_backend("requested hash has been expired");
+ }
+ else {
+ rep.v1.value = sqlite3_column_int64(
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 0);
+ rep.v1.prob = 1.0;
+ rep.v1.flag = sqlite3_column_int(
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 2);
+ }
+ }
+ else if (cmd->shingles_count > 0) {
+ /* Fuzzy match */
+
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt(backend, RSPAMD_FUZZY_BACKEND_CHECK);
+ shcmd = (const struct rspamd_fuzzy_shingle_cmd *) cmd;
+
+ for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) {
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt(backend, FALSE,
+ RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE,
+ shcmd->sgl.hashes[i], i);
+ if (rc == SQLITE_OK) {
+ shingle_values[i] = sqlite3_column_int64(
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE].stmt,
+ 0);
+ }
+ else {
+ shingle_values[i] = -1;
+ }
+ msg_debug_fuzzy_backend("looking for shingle %L -> %L: %d", i,
+ shcmd->sgl.hashes[i], rc);
+ }
+
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt(backend,
+ RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE);
+
+ qsort(shingle_values, RSPAMD_SHINGLE_SIZE, sizeof(gint64),
+ rspamd_fuzzy_backend_sqlite_int64_cmp);
+ sel_id = -1;
+ cur_id = -1;
+ cur_cnt = 0;
+ max_cnt = 0;
+
+ for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) {
+ if (shingle_values[i] == -1) {
+ continue;
+ }
+
+ /* We have some value here, so we need to check it */
+ if (shingle_values[i] == cur_id) {
+ cur_cnt++;
+ }
+ else {
+ cur_id = shingle_values[i];
+ if (cur_cnt >= max_cnt) {
+ max_cnt = cur_cnt;
+ sel_id = cur_id;
+ }
+ cur_cnt = 0;
+ }
+ }
+
+ if (cur_cnt > max_cnt) {
+ max_cnt = cur_cnt;
+ }
+
+ if (sel_id != -1) {
+ /* We have some id selected here */
+ rep.v1.prob = (float) max_cnt / (float) RSPAMD_SHINGLE_SIZE;
+
+ if (rep.v1.prob > 0.5) {
+ msg_debug_fuzzy_backend(
+ "found fuzzy hash with probability %.2f",
+ rep.v1.prob);
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt(backend, FALSE,
+ RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID, sel_id);
+ if (rc == SQLITE_OK) {
+ timestamp = sqlite3_column_int64(
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt,
+ 2);
+ if (time(NULL) - timestamp > expire) {
+ /* Expire element */
+ msg_debug_fuzzy_backend(
+ "requested hash has been expired");
+ rep.v1.prob = 0.0;
+ }
+ else {
+ rep.ts = timestamp;
+ memcpy(rep.digest, sqlite3_column_blob(prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt, 0), sizeof(rep.digest));
+ rep.v1.value = sqlite3_column_int64(
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt,
+ 1);
+ rep.v1.flag = sqlite3_column_int(
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt,
+ 3);
+ }
+ }
+ }
+ else {
+ /* Otherwise we assume that as error */
+ rep.v1.value = 0;
+ }
+
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt(backend,
+ RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID);
+ }
+ }
+
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt(backend, RSPAMD_FUZZY_BACKEND_CHECK);
+ rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
+
+ return rep;
+}
+
+gboolean
+rspamd_fuzzy_backend_sqlite_prepare_update(struct rspamd_fuzzy_backend_sqlite *backend,
+ const gchar *source)
+{
+ gint rc;
+
+ if (backend == NULL) {
+ return FALSE;
+ }
+
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_START);
+
+ if (rc != SQLITE_OK) {
+ msg_warn_fuzzy_backend("cannot start transaction for updates: %s",
+ sqlite3_errmsg(backend->db));
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+gboolean
+rspamd_fuzzy_backend_sqlite_add(struct rspamd_fuzzy_backend_sqlite *backend,
+ const struct rspamd_fuzzy_cmd *cmd)
+{
+ int rc, i;
+ gint64 id, flag;
+ const struct rspamd_fuzzy_shingle_cmd *shcmd;
+
+ if (backend == NULL) {
+ return FALSE;
+ }
+
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt(backend, FALSE,
+ RSPAMD_FUZZY_BACKEND_CHECK,
+ cmd->digest);
+
+ if (rc == SQLITE_OK) {
+ /* Check flag */
+ flag = sqlite3_column_int64(
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt,
+ 2);
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt(backend, RSPAMD_FUZZY_BACKEND_CHECK);
+
+ if (flag == cmd->flag) {
+ /* We need to increase weight */
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_UPDATE,
+ (gint64) cmd->value,
+ cmd->digest);
+ if (rc != SQLITE_OK) {
+ msg_warn_fuzzy_backend("cannot update hash to %d -> "
+ "%*xs: %s",
+ (gint) cmd->flag,
+ (gint) sizeof(cmd->digest), cmd->digest,
+ sqlite3_errmsg(backend->db));
+ }
+ }
+ else {
+ /* We need to relearn actually */
+
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_UPDATE_FLAG,
+ (gint64) cmd->value,
+ (gint64) cmd->flag,
+ cmd->digest);
+
+ if (rc != SQLITE_OK) {
+ msg_warn_fuzzy_backend("cannot update hash to %d -> "
+ "%*xs: %s",
+ (gint) cmd->flag,
+ (gint) sizeof(cmd->digest), cmd->digest,
+ sqlite3_errmsg(backend->db));
+ }
+ }
+ }
+ else {
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt(backend, RSPAMD_FUZZY_BACKEND_CHECK);
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt(backend, FALSE,
+ RSPAMD_FUZZY_BACKEND_INSERT,
+ (gint) cmd->flag,
+ cmd->digest,
+ (gint64) cmd->value);
+
+ if (rc == SQLITE_OK) {
+ if (cmd->shingles_count > 0) {
+ id = sqlite3_last_insert_rowid(backend->db);
+ shcmd = (const struct rspamd_fuzzy_shingle_cmd *) cmd;
+
+ for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) {
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_INSERT_SHINGLE,
+ shcmd->sgl.hashes[i], (gint64) i, id);
+ msg_debug_fuzzy_backend("add shingle %d -> %L: %L",
+ i,
+ shcmd->sgl.hashes[i],
+ id);
+
+ if (rc != SQLITE_OK) {
+ msg_warn_fuzzy_backend("cannot add shingle %d -> "
+ "%L: %L: %s",
+ i,
+ shcmd->sgl.hashes[i],
+ id, sqlite3_errmsg(backend->db));
+ }
+ }
+ }
+ }
+ else {
+ msg_warn_fuzzy_backend("cannot add hash to %d -> "
+ "%*xs: %s",
+ (gint) cmd->flag,
+ (gint) sizeof(cmd->digest), cmd->digest,
+ sqlite3_errmsg(backend->db));
+ }
+
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt(backend,
+ RSPAMD_FUZZY_BACKEND_INSERT);
+ }
+
+ return (rc == SQLITE_OK);
+}
+
+gboolean
+rspamd_fuzzy_backend_sqlite_finish_update(struct rspamd_fuzzy_backend_sqlite *backend,
+ const gchar *source, gboolean version_bump)
+{
+ gint rc = SQLITE_OK, wal_frames, wal_checkpointed, ver;
+
+ /* Get and update version */
+ if (version_bump) {
+ ver = rspamd_fuzzy_backend_sqlite_version(backend, source);
+ ++ver;
+
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_SET_VERSION,
+ (gint64) ver, (gint64) time(NULL), source);
+ }
+
+ if (rc == SQLITE_OK) {
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
+
+ if (rc != SQLITE_OK) {
+ msg_warn_fuzzy_backend("cannot commit updates: %s",
+ sqlite3_errmsg(backend->db));
+ rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
+ return FALSE;
+ }
+ else {
+ if (!rspamd_sqlite3_sync(backend->db, &wal_frames, &wal_checkpointed)) {
+ msg_warn_fuzzy_backend("cannot commit checkpoint: %s",
+ sqlite3_errmsg(backend->db));
+ }
+ else if (wal_checkpointed > 0) {
+ msg_info_fuzzy_backend("total number of frames in the wal file: "
+ "%d, checkpointed: %d",
+ wal_frames, wal_checkpointed);
+ }
+ }
+ }
+ else {
+ msg_warn_fuzzy_backend("cannot update version for %s: %s", source,
+ sqlite3_errmsg(backend->db));
+ rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+gboolean
+rspamd_fuzzy_backend_sqlite_del(struct rspamd_fuzzy_backend_sqlite *backend,
+ const struct rspamd_fuzzy_cmd *cmd)
+{
+ int rc = -1;
+
+ if (backend == NULL) {
+ return FALSE;
+ }
+
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt(backend, FALSE,
+ RSPAMD_FUZZY_BACKEND_CHECK,
+ cmd->digest);
+
+ if (rc == SQLITE_OK) {
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt(backend, RSPAMD_FUZZY_BACKEND_CHECK);
+
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_DELETE,
+ cmd->digest);
+ if (rc != SQLITE_OK) {
+ msg_warn_fuzzy_backend("cannot update hash to %d -> "
+ "%*xs: %s",
+ (gint) cmd->flag,
+ (gint) sizeof(cmd->digest), cmd->digest,
+ sqlite3_errmsg(backend->db));
+ }
+ }
+ else {
+ /* Hash is missing */
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt(backend, RSPAMD_FUZZY_BACKEND_CHECK);
+ }
+
+ return (rc == SQLITE_OK);
+}
+
+gboolean
+rspamd_fuzzy_backend_sqlite_sync(struct rspamd_fuzzy_backend_sqlite *backend,
+ gint64 expire,
+ gboolean clean_orphaned)
+{
+ struct orphaned_shingle_elt {
+ gint64 value;
+ gint64 number;
+ };
+
+ /* Do not do more than 5k ops per step */
+ const guint64 max_changes = 5000;
+ gboolean ret = FALSE;
+ gint64 expire_lim, expired;
+ gint rc, i, orphaned_cnt = 0;
+ GError *err = NULL;
+ static const gchar orphaned_shingles[] = "SELECT shingles.value,shingles.number "
+ "FROM shingles "
+ "LEFT JOIN digests ON "
+ "shingles.digest_id=digests.id WHERE "
+ "digests.id IS NULL;";
+ sqlite3_stmt *stmt;
+ GArray *orphaned;
+ struct orphaned_shingle_elt orphaned_elt, *pelt;
+
+
+ if (backend == NULL) {
+ return FALSE;
+ }
+
+ /* Perform expire */
+ if (expire > 0) {
+ expire_lim = time(NULL) - expire;
+
+ if (expire_lim > 0) {
+ ret = rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_START);
+
+ if (ret == SQLITE_OK) {
+
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt(backend, FALSE,
+ RSPAMD_FUZZY_BACKEND_EXPIRE, expire_lim, max_changes);
+
+ if (rc == SQLITE_OK) {
+ expired = sqlite3_changes(backend->db);
+
+ if (expired > 0) {
+ backend->expired += expired;
+ msg_info_fuzzy_backend("expired %L hashes", expired);
+ }
+ }
+ else {
+ msg_warn_fuzzy_backend(
+ "cannot execute expired statement: %s",
+ sqlite3_errmsg(backend->db));
+ }
+
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt(backend,
+ RSPAMD_FUZZY_BACKEND_EXPIRE);
+
+ ret = rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
+
+ if (ret != SQLITE_OK) {
+ rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
+ }
+ }
+ if (ret != SQLITE_OK) {
+ msg_warn_fuzzy_backend("cannot expire db: %s",
+ sqlite3_errmsg(backend->db));
+ }
+ }
+ }
+
+ /* Cleanup database */
+ if (clean_orphaned) {
+ ret = rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_START);
+
+ if (ret == SQLITE_OK) {
+ if ((rc = sqlite3_prepare_v2(backend->db,
+ orphaned_shingles,
+ -1,
+ &stmt,
+ NULL)) != SQLITE_OK) {
+ msg_warn_fuzzy_backend("cannot cleanup shingles: %s",
+ sqlite3_errmsg(backend->db));
+ }
+ else {
+ orphaned = g_array_new(FALSE,
+ FALSE,
+ sizeof(struct orphaned_shingle_elt));
+
+ while (sqlite3_step(stmt) == SQLITE_ROW) {
+ orphaned_elt.value = sqlite3_column_int64(stmt, 0);
+ orphaned_elt.number = sqlite3_column_int64(stmt, 1);
+ g_array_append_val(orphaned, orphaned_elt);
+
+ if (orphaned->len > max_changes) {
+ break;
+ }
+ }
+
+ sqlite3_finalize(stmt);
+ orphaned_cnt = orphaned->len;
+
+ if (orphaned_cnt > 0) {
+ msg_info_fuzzy_backend(
+ "going to delete %ud orphaned shingles",
+ orphaned_cnt);
+ /* Need to delete orphaned elements */
+ for (i = 0; i < (gint) orphaned_cnt; i++) {
+ pelt = &g_array_index(orphaned,
+ struct orphaned_shingle_elt,
+ i);
+ rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED,
+ pelt->value, pelt->number);
+ }
+ }
+
+
+ g_array_free(orphaned, TRUE);
+ }
+
+ ret = rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
+
+ if (ret == SQLITE_OK) {
+ msg_info_fuzzy_backend(
+ "deleted %ud orphaned shingles",
+ orphaned_cnt);
+ }
+ else {
+ msg_warn_fuzzy_backend(
+ "cannot synchronize fuzzy backend: %e",
+ err);
+ rspamd_fuzzy_backend_sqlite_run_stmt(backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
+ }
+ }
+ }
+
+ return ret;
+}
+
+
+void rspamd_fuzzy_backend_sqlite_close(struct rspamd_fuzzy_backend_sqlite *backend)
+{
+ if (backend != NULL) {
+ if (backend->db != NULL) {
+ rspamd_fuzzy_backend_sqlite_close_stmts(backend);
+ sqlite3_close(backend->db);
+ }
+
+ if (backend->path != NULL) {
+ g_free(backend->path);
+ }
+
+ if (backend->pool) {
+ rspamd_mempool_delete(backend->pool);
+ }
+
+ g_free(backend);
+ }
+}
+
+
+gsize rspamd_fuzzy_backend_sqlite_count(struct rspamd_fuzzy_backend_sqlite *backend)
+{
+ if (backend) {
+ if (rspamd_fuzzy_backend_sqlite_run_stmt(backend, FALSE,
+ RSPAMD_FUZZY_BACKEND_COUNT) == SQLITE_OK) {
+ backend->count = sqlite3_column_int64(
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_COUNT].stmt, 0);
+ }
+
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt(backend, RSPAMD_FUZZY_BACKEND_COUNT);
+
+ return backend->count;
+ }
+
+ return 0;
+}
+
+gint rspamd_fuzzy_backend_sqlite_version(struct rspamd_fuzzy_backend_sqlite *backend,
+ const gchar *source)
+{
+ gint ret = 0;
+
+ if (backend) {
+ if (rspamd_fuzzy_backend_sqlite_run_stmt(backend, FALSE,
+ RSPAMD_FUZZY_BACKEND_VERSION, source) == SQLITE_OK) {
+ ret = sqlite3_column_int64(
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_VERSION].stmt, 0);
+ }
+
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt(backend, RSPAMD_FUZZY_BACKEND_VERSION);
+ }
+
+ return ret;
+}
+
+gsize rspamd_fuzzy_backend_sqlite_expired(struct rspamd_fuzzy_backend_sqlite *backend)
+{
+ return backend != NULL ? backend->expired : 0;
+}
+
+const gchar *
+rspamd_fuzzy_sqlite_backend_id(struct rspamd_fuzzy_backend_sqlite *backend)
+{
+ return backend != NULL ? backend->id : 0;
+}
diff --git a/src/libserver/fuzzy_backend/fuzzy_backend_sqlite.h b/src/libserver/fuzzy_backend/fuzzy_backend_sqlite.h
new file mode 100644
index 0000000..766f7c9
--- /dev/null
+++ b/src/libserver/fuzzy_backend/fuzzy_backend_sqlite.h
@@ -0,0 +1,107 @@
+/*-
+ * Copyright 2016 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef FUZZY_BACKEND_H_
+#define FUZZY_BACKEND_H_
+
+#include "config.h"
+#include "fuzzy_wire.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct rspamd_fuzzy_backend_sqlite;
+
+/**
+ * Open fuzzy backend
+ * @param path file to open (legacy file will be converted automatically)
+ * @param err error pointer
+ * @return backend structure or NULL
+ */
+struct rspamd_fuzzy_backend_sqlite *rspamd_fuzzy_backend_sqlite_open(const gchar *path,
+ gboolean vacuum,
+ GError **err);
+
+/**
+ * Check specified fuzzy in the backend
+ * @param backend
+ * @param cmd
+ * @return reply with probability and weight
+ */
+struct rspamd_fuzzy_reply rspamd_fuzzy_backend_sqlite_check(
+ struct rspamd_fuzzy_backend_sqlite *backend,
+ const struct rspamd_fuzzy_cmd *cmd,
+ gint64 expire);
+
+/**
+ * Prepare storage for updates (by starting transaction)
+ */
+gboolean rspamd_fuzzy_backend_sqlite_prepare_update(struct rspamd_fuzzy_backend_sqlite *backend,
+ const gchar *source);
+
+/**
+ * Add digest to the database
+ * @param backend
+ * @param cmd
+ * @return
+ */
+gboolean rspamd_fuzzy_backend_sqlite_add(struct rspamd_fuzzy_backend_sqlite *backend,
+ const struct rspamd_fuzzy_cmd *cmd);
+
+/**
+ * Delete digest from the database
+ * @param backend
+ * @param cmd
+ * @return
+ */
+gboolean rspamd_fuzzy_backend_sqlite_del(
+ struct rspamd_fuzzy_backend_sqlite *backend,
+ const struct rspamd_fuzzy_cmd *cmd);
+
+/**
+ * Commit updates to storage
+ */
+gboolean rspamd_fuzzy_backend_sqlite_finish_update(struct rspamd_fuzzy_backend_sqlite *backend,
+ const gchar *source, gboolean version_bump);
+
+/**
+ * Sync storage
+ * @param backend
+ * @return
+ */
+gboolean rspamd_fuzzy_backend_sqlite_sync(struct rspamd_fuzzy_backend_sqlite *backend,
+ gint64 expire,
+ gboolean clean_orphaned);
+
+/**
+ * Close storage
+ * @param backend
+ */
+void rspamd_fuzzy_backend_sqlite_close(struct rspamd_fuzzy_backend_sqlite *backend);
+
+gsize rspamd_fuzzy_backend_sqlite_count(struct rspamd_fuzzy_backend_sqlite *backend);
+
+gint rspamd_fuzzy_backend_sqlite_version(struct rspamd_fuzzy_backend_sqlite *backend, const gchar *source);
+
+gsize rspamd_fuzzy_backend_sqlite_expired(struct rspamd_fuzzy_backend_sqlite *backend);
+
+const gchar *rspamd_fuzzy_sqlite_backend_id(struct rspamd_fuzzy_backend_sqlite *backend);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* FUZZY_BACKEND_H_ */