summaryrefslogtreecommitdiffstats
path: root/src/libserver/maps/map.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-10 21:30:40 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-10 21:30:40 +0000
commit133a45c109da5310add55824db21af5239951f93 (patch)
treeba6ac4c0a950a0dda56451944315d66409923918 /src/libserver/maps/map.c
parentInitial commit. (diff)
downloadrspamd-133a45c109da5310add55824db21af5239951f93.tar.xz
rspamd-133a45c109da5310add55824db21af5239951f93.zip
Adding upstream version 3.8.1.upstream/3.8.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/libserver/maps/map.c')
-rw-r--r--src/libserver/maps/map.c3195
1 files changed, 3195 insertions, 0 deletions
diff --git a/src/libserver/maps/map.c b/src/libserver/maps/map.c
new file mode 100644
index 0000000..7f6a48f
--- /dev/null
+++ b/src/libserver/maps/map.c
@@ -0,0 +1,3195 @@
+/*
+ * Copyright 2023 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Implementation of map files handling
+ */
+
+#include "config.h"
+#include "map.h"
+#include "map_private.h"
+#include "libserver/http/http_connection.h"
+#include "libserver/http/http_private.h"
+#include "rspamd.h"
+#include "contrib/libev/ev.h"
+#include "contrib/uthash/utlist.h"
+
+#ifdef SYS_ZSTD
+#include "zstd.h"
+#else
+#include "contrib/zstd/zstd.h"
+#endif
+
+#undef MAP_DEBUG_REFS
+#ifdef MAP_DEBUG_REFS
+#define MAP_RETAIN(x, t) \
+ do { \
+ msg_err(G_GNUC_PRETTY_FUNCTION ": " t ": retain ref %p, refcount: %d -> %d", (x), (x)->ref.refcount, (x)->ref.refcount + 1); \
+ REF_RETAIN(x); \
+ } while (0)
+
+#define MAP_RELEASE(x, t) \
+ do { \
+ msg_err(G_GNUC_PRETTY_FUNCTION ": " t ": release ref %p, refcount: %d -> %d", (x), (x)->ref.refcount, (x)->ref.refcount - 1); \
+ REF_RELEASE(x); \
+ } while (0)
+#else
+#define MAP_RETAIN(x, t) REF_RETAIN(x)
+#define MAP_RELEASE(x, t) REF_RELEASE(x)
+#endif
+
+enum rspamd_map_periodic_opts {
+ RSPAMD_MAP_SCHEDULE_NORMAL = 0,
+ RSPAMD_MAP_SCHEDULE_ERROR = (1u << 0u),
+ RSPAMD_MAP_SCHEDULE_LOCKED = (1u << 1u),
+ RSPAMD_MAP_SCHEDULE_INIT = (1u << 2u),
+};
+
+static void free_http_cbdata_common(struct http_callback_data *cbd,
+ gboolean plan_new);
+static void free_http_cbdata_dtor(gpointer p);
+static void free_http_cbdata(struct http_callback_data *cbd);
+static void rspamd_map_process_periodic(struct map_periodic_cbdata *cbd);
+static void rspamd_map_schedule_periodic(struct rspamd_map *map, int how);
+static gboolean read_map_file_chunks(struct rspamd_map *map,
+ struct map_cb_data *cbdata,
+ const gchar *fname,
+ gsize len,
+ goffset off);
+static gboolean rspamd_map_save_http_cached_file(struct rspamd_map *map,
+ struct rspamd_map_backend *bk,
+ struct http_map_data *htdata,
+ const guchar *data,
+ gsize len);
+static gboolean rspamd_map_update_http_cached_file(struct rspamd_map *map,
+ struct rspamd_map_backend *bk,
+ struct http_map_data *htdata);
+
+guint rspamd_map_log_id = (guint) -1;
+RSPAMD_CONSTRUCTOR(rspamd_map_log_init)
+{
+ rspamd_map_log_id = rspamd_logger_add_debug_module("map");
+}
+
+/**
+ * Write HTTP request
+ */
+static void
+write_http_request(struct http_callback_data *cbd)
+{
+ gchar datebuf[128];
+ struct rspamd_http_message *msg;
+
+ msg = rspamd_http_new_message(HTTP_REQUEST);
+ if (cbd->check) {
+ msg->method = HTTP_HEAD;
+ }
+
+ msg->url = rspamd_fstring_append(msg->url,
+ cbd->data->path, strlen(cbd->data->path));
+
+ if (cbd->check) {
+ if (cbd->data->last_modified != 0) {
+ rspamd_http_date_format(datebuf, sizeof(datebuf),
+ cbd->data->last_modified);
+ rspamd_http_message_add_header(msg, "If-Modified-Since",
+ datebuf);
+ }
+ if (cbd->data->etag) {
+ rspamd_http_message_add_header_len(msg, "If-None-Match",
+ cbd->data->etag->str, cbd->data->etag->len);
+ }
+ }
+
+ msg->url = rspamd_fstring_append(msg->url, cbd->data->rest,
+ strlen(cbd->data->rest));
+
+ if (cbd->data->userinfo) {
+ rspamd_http_message_add_header(msg, "Authorization",
+ cbd->data->userinfo);
+ }
+
+ MAP_RETAIN(cbd, "http_callback_data");
+ rspamd_http_connection_write_message(cbd->conn,
+ msg,
+ cbd->data->host,
+ NULL,
+ cbd,
+ cbd->timeout);
+}
+
+/**
+ * Callback for destroying HTTP callback data
+ */
+static void
+free_http_cbdata_common(struct http_callback_data *cbd, gboolean plan_new)
+{
+ struct map_periodic_cbdata *periodic = cbd->periodic;
+
+ if (cbd->shmem_data) {
+ rspamd_http_message_shmem_unref(cbd->shmem_data);
+ }
+
+ if (cbd->pk) {
+ rspamd_pubkey_unref(cbd->pk);
+ }
+
+ if (cbd->conn) {
+ rspamd_http_connection_unref(cbd->conn);
+ cbd->conn = NULL;
+ }
+
+ if (cbd->addrs) {
+ rspamd_inet_addr_t *addr;
+ guint i;
+
+ PTR_ARRAY_FOREACH(cbd->addrs, i, addr)
+ {
+ rspamd_inet_address_free(addr);
+ }
+
+ g_ptr_array_free(cbd->addrs, TRUE);
+ }
+
+
+ MAP_RELEASE(cbd->bk, "rspamd_map_backend");
+
+ if (periodic) {
+ /* Detached in case of HTTP error */
+ MAP_RELEASE(periodic, "periodic");
+ }
+
+ g_free(cbd);
+}
+
+static void
+free_http_cbdata(struct http_callback_data *cbd)
+{
+ cbd->map->tmp_dtor = NULL;
+ cbd->map->tmp_dtor_data = NULL;
+
+ free_http_cbdata_common(cbd, TRUE);
+}
+
+static void
+free_http_cbdata_dtor(gpointer p)
+{
+ struct http_callback_data *cbd = p;
+ struct rspamd_map *map;
+
+ map = cbd->map;
+ if (cbd->stage == http_map_http_conn) {
+ REF_RELEASE(cbd);
+ }
+ else {
+ /* We cannot terminate DNS requests sent */
+ cbd->stage = http_map_terminated;
+ }
+
+ msg_warn_map("%s: "
+ "connection with http server is terminated: worker is stopping",
+ map->name);
+}
+
+/*
+ * HTTP callbacks
+ */
+static void
+http_map_error(struct rspamd_http_connection *conn,
+ GError *err)
+{
+ struct http_callback_data *cbd = conn->ud;
+ struct rspamd_map *map;
+
+ map = cbd->map;
+
+ if (cbd->periodic) {
+ cbd->periodic->errored = TRUE;
+ msg_err_map("error reading %s(%s): "
+ "connection with http server terminated incorrectly: %e",
+ cbd->bk->uri,
+ cbd->addr ? rspamd_inet_address_to_string_pretty(cbd->addr) : "",
+ err);
+
+ rspamd_map_process_periodic(cbd->periodic);
+ }
+
+ MAP_RELEASE(cbd, "http_callback_data");
+}
+
+static void
+rspamd_map_cache_cb(struct ev_loop *loop, ev_timer *w, int revents)
+{
+ struct rspamd_http_map_cached_cbdata *cache_cbd = (struct rspamd_http_map_cached_cbdata *)
+ w->data;
+ struct rspamd_map *map;
+ struct http_map_data *data;
+
+ map = cache_cbd->map;
+ data = cache_cbd->data;
+
+ if (cache_cbd->gen != cache_cbd->data->gen) {
+ /* We have another update, so this cache element is obviously expired */
+ /*
+ * Important!: we do not set cache availability to zero here, as there
+ * might be fresh cache
+ */
+ msg_info_map("cached data is now expired (gen mismatch %L != %L) for %s; shm name=%s; refcount=%d",
+ cache_cbd->gen, cache_cbd->data->gen, map->name, cache_cbd->shm->shm_name,
+ cache_cbd->shm->ref.refcount);
+ MAP_RELEASE(cache_cbd->shm, "rspamd_http_map_cached_cbdata");
+ ev_timer_stop(loop, &cache_cbd->timeout);
+ g_free(cache_cbd);
+ }
+ else if (cache_cbd->data->last_checked >= cache_cbd->last_checked) {
+ /*
+ * We checked map but we have not found anything more recent,
+ * reschedule cache check
+ */
+ if (cache_cbd->map->poll_timeout >
+ rspamd_get_calendar_ticks() - cache_cbd->data->last_checked) {
+ w->repeat = cache_cbd->map->poll_timeout -
+ (rspamd_get_calendar_ticks() - cache_cbd->data->last_checked);
+ }
+ else {
+ w->repeat = cache_cbd->map->poll_timeout;
+ }
+
+ if (w->repeat < 0) {
+ msg_info_map("cached data for %s has skewed check time: %d last checked, "
+ "%d poll timeout, %.2f diff; shm name=%s; refcount=%d",
+ map->name, (int) cache_cbd->data->last_checked,
+ (int) cache_cbd->map->poll_timeout,
+ (rspamd_get_calendar_ticks() - cache_cbd->data->last_checked),
+ cache_cbd->shm->shm_name,
+ cache_cbd->shm->ref.refcount);
+ w->repeat = 0.0;
+ }
+
+ cache_cbd->last_checked = cache_cbd->data->last_checked;
+ msg_debug_map("cached data is up to date for %s", map->name);
+ ev_timer_again(loop, &cache_cbd->timeout);
+ }
+ else {
+ data->cur_cache_cbd = NULL;
+ g_atomic_int_set(&data->cache->available, 0);
+ msg_info_map("cached data is now expired for %s; shm name=%s; refcount=%d",
+ map->name,
+ cache_cbd->shm->shm_name,
+ cache_cbd->shm->ref.refcount);
+ MAP_RELEASE(cache_cbd->shm, "rspamd_http_map_cached_cbdata");
+ ev_timer_stop(loop, &cache_cbd->timeout);
+ g_free(cache_cbd);
+ }
+}
+
+static int
+http_map_finish(struct rspamd_http_connection *conn,
+ struct rspamd_http_message *msg)
+{
+ struct http_callback_data *cbd = conn->ud;
+ struct rspamd_map *map;
+ struct rspamd_map_backend *bk;
+ struct http_map_data *data;
+ struct rspamd_http_map_cached_cbdata *cache_cbd;
+ const rspamd_ftok_t *expires_hdr, *etag_hdr;
+ char next_check_date[128];
+ guchar *in = NULL;
+ gsize dlen = 0;
+
+ map = cbd->map;
+ bk = cbd->bk;
+ data = bk->data.hd;
+
+ if (msg->code == 200) {
+
+ if (cbd->check) {
+ msg_info_map("need to reread map from %s", cbd->bk->uri);
+ cbd->periodic->need_modify = TRUE;
+ /* Reset the whole chain */
+ cbd->periodic->cur_backend = 0;
+ /* Reset cache, old cached data will be cleaned on timeout */
+ g_atomic_int_set(&data->cache->available, 0);
+ data->cur_cache_cbd = NULL;
+
+ rspamd_map_process_periodic(cbd->periodic);
+ MAP_RELEASE(cbd, "http_callback_data");
+
+ return 0;
+ }
+
+ cbd->data->last_checked = msg->date;
+
+ if (msg->last_modified) {
+ cbd->data->last_modified = msg->last_modified;
+ }
+ else {
+ cbd->data->last_modified = msg->date;
+ }
+
+
+ /* Unsigned version - just open file */
+ cbd->shmem_data = rspamd_http_message_shmem_ref(msg);
+ cbd->data_len = msg->body_buf.len;
+
+ if (cbd->data_len == 0) {
+ msg_err_map("cannot read empty map");
+ goto err;
+ }
+
+ g_assert(cbd->shmem_data != NULL);
+
+ in = rspamd_shmem_xmap(cbd->shmem_data->shm_name, PROT_READ, &dlen);
+
+ if (in == NULL) {
+ msg_err_map("cannot read tempfile %s: %s",
+ cbd->shmem_data->shm_name,
+ strerror(errno));
+ goto err;
+ }
+
+ /* Check for expires */
+ double cached_timeout = map->poll_timeout * 2;
+
+ expires_hdr = rspamd_http_message_find_header(msg, "Expires");
+
+ if (expires_hdr) {
+ time_t hdate;
+
+ hdate = rspamd_http_parse_date(expires_hdr->begin, expires_hdr->len);
+
+ if (hdate != (time_t) -1 && hdate > msg->date) {
+ cached_timeout = map->next_check - msg->date +
+ map->poll_timeout * 2;
+
+ map->next_check = hdate;
+ }
+ else {
+ msg_info_map("invalid expires header: %T, ignore it", expires_hdr);
+ map->next_check = 0;
+ }
+ }
+
+ /* Check for etag */
+ etag_hdr = rspamd_http_message_find_header(msg, "ETag");
+
+ if (etag_hdr) {
+ if (cbd->data->etag) {
+ /* Remove old etag */
+ rspamd_fstring_free(cbd->data->etag);
+ }
+
+ cbd->data->etag = rspamd_fstring_new_init(etag_hdr->begin,
+ etag_hdr->len);
+ }
+ else {
+ if (cbd->data->etag) {
+ /* Remove and clear old etag */
+ rspamd_fstring_free(cbd->data->etag);
+ cbd->data->etag = NULL;
+ }
+ }
+
+ MAP_RETAIN(cbd->shmem_data, "shmem_data");
+ cbd->data->gen++;
+ /*
+ * We know that a map is in the locked state
+ */
+ g_atomic_int_set(&data->cache->available, 1);
+ /* Store cached data */
+ rspamd_strlcpy(data->cache->shmem_name, cbd->shmem_data->shm_name,
+ sizeof(data->cache->shmem_name));
+ data->cache->len = cbd->data_len;
+ data->cache->last_modified = cbd->data->last_modified;
+ cache_cbd = g_malloc0(sizeof(*cache_cbd));
+ cache_cbd->shm = cbd->shmem_data;
+ cache_cbd->event_loop = cbd->event_loop;
+ cache_cbd->map = map;
+ cache_cbd->data = cbd->data;
+ cache_cbd->last_checked = cbd->data->last_checked;
+ cache_cbd->gen = cbd->data->gen;
+ MAP_RETAIN(cache_cbd->shm, "shmem_data");
+ msg_info_map("stored map data in a shared memory cache: %s",
+ cache_cbd->shm->shm_name);
+
+ ev_timer_init(&cache_cbd->timeout, rspamd_map_cache_cb, cached_timeout,
+ 0.0);
+ ev_timer_start(cbd->event_loop, &cache_cbd->timeout);
+ cache_cbd->timeout.data = cache_cbd;
+ data->cur_cache_cbd = cache_cbd;
+
+ if (map->next_check) {
+ rspamd_http_date_format(next_check_date, sizeof(next_check_date),
+ map->next_check);
+ }
+ else {
+ rspamd_http_date_format(next_check_date, sizeof(next_check_date),
+ rspamd_get_calendar_ticks() + map->poll_timeout);
+ }
+
+
+ if (cbd->bk->is_compressed) {
+ ZSTD_DStream *zstream;
+ ZSTD_inBuffer zin;
+ ZSTD_outBuffer zout;
+ guchar *out;
+ gsize outlen, r;
+
+ zstream = ZSTD_createDStream();
+ ZSTD_initDStream(zstream);
+
+ zin.pos = 0;
+ zin.src = in;
+ zin.size = dlen;
+
+ if ((outlen = ZSTD_getDecompressedSize(zin.src, zin.size)) == 0) {
+ outlen = ZSTD_DStreamOutSize();
+ }
+
+ out = g_malloc(outlen);
+
+ zout.dst = out;
+ zout.pos = 0;
+ zout.size = outlen;
+
+ while (zin.pos < zin.size) {
+ r = ZSTD_decompressStream(zstream, &zout, &zin);
+
+ if (ZSTD_isError(r)) {
+ msg_err_map("%s(%s): cannot decompress data: %s",
+ cbd->bk->uri,
+ rspamd_inet_address_to_string_pretty(cbd->addr),
+ ZSTD_getErrorName(r));
+ ZSTD_freeDStream(zstream);
+ g_free(out);
+ MAP_RELEASE(cbd->shmem_data, "shmem_data");
+ goto err;
+ }
+
+ if (zout.pos == zout.size) {
+ /* We need to extend output buffer */
+ zout.size = zout.size * 2 + 1.0;
+ out = g_realloc(zout.dst, zout.size);
+ zout.dst = out;
+ }
+ }
+
+ ZSTD_freeDStream(zstream);
+ msg_info_map("%s(%s): read map data %z bytes compressed, "
+ "%z uncompressed, next check at %s",
+ cbd->bk->uri,
+ rspamd_inet_address_to_string_pretty(cbd->addr),
+ dlen, zout.pos, next_check_date);
+ map->read_callback(out, zout.pos, &cbd->periodic->cbdata, TRUE);
+ rspamd_map_save_http_cached_file(map, bk, cbd->data, out, zout.pos);
+ g_free(out);
+ }
+ else {
+ msg_info_map("%s(%s): read map data %z bytes, next check at %s",
+ cbd->bk->uri,
+ rspamd_inet_address_to_string_pretty(cbd->addr),
+ dlen, next_check_date);
+ rspamd_map_save_http_cached_file(map, bk, cbd->data, in, cbd->data_len);
+ map->read_callback(in, cbd->data_len, &cbd->periodic->cbdata, TRUE);
+ }
+
+ MAP_RELEASE(cbd->shmem_data, "shmem_data");
+
+ cbd->periodic->cur_backend++;
+ munmap(in, dlen);
+ rspamd_map_process_periodic(cbd->periodic);
+ }
+ else if (msg->code == 304 && cbd->check) {
+ cbd->data->last_checked = msg->date;
+
+ if (msg->last_modified) {
+ cbd->data->last_modified = msg->last_modified;
+ }
+ else {
+ cbd->data->last_modified = msg->date;
+ }
+
+ expires_hdr = rspamd_http_message_find_header(msg, "Expires");
+
+ if (expires_hdr) {
+ time_t hdate;
+
+ hdate = rspamd_http_parse_date(expires_hdr->begin, expires_hdr->len);
+ if (hdate != (time_t) -1 && hdate > msg->date) {
+ map->next_check = hdate;
+ }
+ else {
+ msg_info_map("invalid expires header: %T, ignore it", expires_hdr);
+ map->next_check = 0;
+ }
+ }
+
+ etag_hdr = rspamd_http_message_find_header(msg, "ETag");
+
+ if (etag_hdr) {
+ if (cbd->data->etag) {
+ /* Remove old etag */
+ rspamd_fstring_free(cbd->data->etag);
+ cbd->data->etag = rspamd_fstring_new_init(etag_hdr->begin,
+ etag_hdr->len);
+ }
+ }
+
+ if (map->next_check) {
+ rspamd_http_date_format(next_check_date, sizeof(next_check_date),
+ map->next_check);
+ msg_info_map("data is not modified for server %s, next check at %s "
+ "(http cache based: %T)",
+ cbd->data->host, next_check_date, expires_hdr);
+ }
+ else {
+ rspamd_http_date_format(next_check_date, sizeof(next_check_date),
+ rspamd_get_calendar_ticks() + map->poll_timeout);
+ msg_info_map("data is not modified for server %s, next check at %s "
+ "(timer based)",
+ cbd->data->host, next_check_date);
+ }
+
+ rspamd_map_update_http_cached_file(map, bk, cbd->data);
+ cbd->periodic->cur_backend++;
+ rspamd_map_process_periodic(cbd->periodic);
+ }
+ else {
+ msg_info_map("cannot load map %s from %s: HTTP error %d",
+ bk->uri, cbd->data->host, msg->code);
+ goto err;
+ }
+
+ MAP_RELEASE(cbd, "http_callback_data");
+ return 0;
+
+err:
+ cbd->periodic->errored = 1;
+ rspamd_map_process_periodic(cbd->periodic);
+ MAP_RELEASE(cbd, "http_callback_data");
+
+ return 0;
+}
+
+static gboolean
+read_map_file_chunks(struct rspamd_map *map, struct map_cb_data *cbdata,
+ const gchar *fname, gsize len, goffset off)
+{
+ gint fd;
+ gssize r, avail;
+ gsize buflen = 1024 * 1024;
+ gchar *pos, *bytes;
+
+ fd = rspamd_file_xopen(fname, O_RDONLY, 0, TRUE);
+
+ if (fd == -1) {
+ msg_err_map("can't open map for buffered reading %s: %s",
+ fname, strerror(errno));
+ return FALSE;
+ }
+
+ if (lseek(fd, off, SEEK_SET) == -1) {
+ msg_err_map("can't seek in map to pos %d for buffered reading %s: %s",
+ (gint) off, fname, strerror(errno));
+ close(fd);
+
+ return FALSE;
+ }
+
+ buflen = MIN(len, buflen);
+ bytes = g_malloc(buflen);
+ avail = buflen;
+ pos = bytes;
+
+ while ((r = read(fd, pos, avail)) > 0) {
+ gchar *end = bytes + (pos - bytes) + r;
+ msg_debug_map("%s: read map chunk, %z bytes", fname,
+ r);
+ pos = map->read_callback(bytes, end - bytes, cbdata, r == len);
+
+ if (pos && pos > bytes && pos < end) {
+ guint remain = end - pos;
+
+ memmove(bytes, pos, remain);
+ pos = bytes + remain;
+ /* Need to preserve the remain */
+ avail = ((gssize) buflen) - remain;
+
+ if (avail <= 0) {
+ /* Try realloc, too large element */
+ g_assert(buflen >= remain);
+ bytes = g_realloc(bytes, buflen * 2);
+
+ pos = bytes + remain; /* Adjust */
+ avail += buflen;
+ buflen *= 2;
+ }
+ }
+ else {
+ avail = buflen;
+ pos = bytes;
+ }
+
+ len -= r;
+ }
+
+ if (r == -1) {
+ msg_err_map("can't read from map %s: %s", fname, strerror(errno));
+ close(fd);
+ g_free(bytes);
+
+ return FALSE;
+ }
+
+ close(fd);
+ g_free(bytes);
+
+ return TRUE;
+}
+
+static gboolean
+rspamd_map_check_sig_pk_mem(const guchar *sig,
+ gsize siglen,
+ struct rspamd_map *map,
+ const guchar *input,
+ gsize inlen,
+ struct rspamd_cryptobox_pubkey *pk)
+{
+ GString *b32_key;
+ gboolean ret = TRUE;
+
+ if (siglen != rspamd_cryptobox_signature_bytes(RSPAMD_CRYPTOBOX_MODE_25519)) {
+ msg_err_map("can't open signature for %s: invalid size: %z", map->name, siglen);
+
+ ret = FALSE;
+ }
+
+ if (ret && !rspamd_cryptobox_verify(sig, siglen, input, inlen,
+ rspamd_pubkey_get_pk(pk, NULL), RSPAMD_CRYPTOBOX_MODE_25519)) {
+ msg_err_map("can't verify signature for %s: incorrect signature", map->name);
+
+ ret = FALSE;
+ }
+
+ if (ret) {
+ b32_key = rspamd_pubkey_print(pk,
+ RSPAMD_KEYPAIR_BASE32 | RSPAMD_KEYPAIR_PUBKEY);
+ msg_info_map("verified signature for %s using trusted key %v",
+ map->name, b32_key);
+ g_string_free(b32_key, TRUE);
+ }
+
+ return ret;
+}
+
+static gboolean
+rspamd_map_check_file_sig(const char *fname,
+ struct rspamd_map *map,
+ struct rspamd_map_backend *bk,
+ const guchar *input,
+ gsize inlen)
+{
+ guchar *data;
+ struct rspamd_cryptobox_pubkey *pk = NULL;
+ GString *b32_key;
+ gboolean ret = TRUE;
+ gsize len = 0;
+ gchar fpath[PATH_MAX];
+
+ if (bk->trusted_pubkey == NULL) {
+ /* Try to load and check pubkey */
+ rspamd_snprintf(fpath, sizeof(fpath), "%s.pub", fname);
+ data = rspamd_file_xmap(fpath, PROT_READ, &len, TRUE);
+
+ if (data == NULL) {
+ msg_err_map("can't open pubkey %s: %s", fpath, strerror(errno));
+ return FALSE;
+ }
+
+ pk = rspamd_pubkey_from_base32(data, len, RSPAMD_KEYPAIR_SIGN,
+ RSPAMD_CRYPTOBOX_MODE_25519);
+ munmap(data, len);
+
+ if (pk == NULL) {
+ msg_err_map("can't load pubkey %s", fpath);
+ return FALSE;
+ }
+
+ /* We just check pk against the trusted db of keys */
+ b32_key = rspamd_pubkey_print(pk,
+ RSPAMD_KEYPAIR_BASE32 | RSPAMD_KEYPAIR_PUBKEY);
+ g_assert(b32_key != NULL);
+
+ if (g_hash_table_lookup(map->cfg->trusted_keys, b32_key->str) == NULL) {
+ msg_err_map("pubkey loaded from %s is untrusted: %v", fpath,
+ b32_key);
+ g_string_free(b32_key, TRUE);
+ rspamd_pubkey_unref(pk);
+
+ return FALSE;
+ }
+
+ g_string_free(b32_key, TRUE);
+ }
+ else {
+ pk = rspamd_pubkey_ref(bk->trusted_pubkey);
+ }
+
+ rspamd_snprintf(fpath, sizeof(fpath), "%s.sig", fname);
+ data = rspamd_shmem_xmap(fpath, PROT_READ, &len);
+
+ if (data == NULL) {
+ msg_err_map("can't open signature %s: %s", fpath, strerror(errno));
+ ret = FALSE;
+ }
+
+ if (ret) {
+ ret = rspamd_map_check_sig_pk_mem(data, len, map, input, inlen, pk);
+ munmap(data, len);
+ }
+
+ rspamd_pubkey_unref(pk);
+
+ return ret;
+}
+
+/**
+ * Callback for reading data from file
+ */
+static gboolean
+read_map_file(struct rspamd_map *map, struct file_map_data *data,
+ struct rspamd_map_backend *bk, struct map_periodic_cbdata *periodic)
+{
+ gchar *bytes;
+ gsize len;
+ struct stat st;
+
+ if (map->read_callback == NULL || map->fin_callback == NULL) {
+ msg_err_map("%s: bad callback for reading map file",
+ data->filename);
+ return FALSE;
+ }
+
+ if (stat(data->filename, &st) == -1) {
+ /* File does not exist, skipping */
+ if (errno != ENOENT) {
+ msg_err_map("%s: map file is unavailable for reading: %s",
+ data->filename, strerror(errno));
+
+ return FALSE;
+ }
+ else {
+ msg_info_map("%s: map file is not found; "
+ "it will be read automatically if created",
+ data->filename);
+ return TRUE;
+ }
+ }
+
+ ev_stat_stat(map->event_loop, &data->st_ev);
+ len = st.st_size;
+
+ if (bk->is_signed) {
+ bytes = rspamd_file_xmap(data->filename, PROT_READ, &len, TRUE);
+
+ if (bytes == NULL) {
+ msg_err_map("can't open map %s: %s", data->filename, strerror(errno));
+ return FALSE;
+ }
+
+ if (!rspamd_map_check_file_sig(data->filename, map, bk, bytes, len)) {
+ munmap(bytes, len);
+
+ return FALSE;
+ }
+
+ munmap(bytes, len);
+ }
+
+ if (len > 0) {
+ if (map->no_file_read) {
+ /* We just call read callback with backend name */
+ map->read_callback(data->filename, strlen(data->filename),
+ &periodic->cbdata, TRUE);
+ }
+ else {
+ if (bk->is_compressed) {
+ bytes = rspamd_file_xmap(data->filename, PROT_READ, &len, TRUE);
+
+ if (bytes == NULL) {
+ msg_err_map("can't open map %s: %s", data->filename, strerror(errno));
+ return FALSE;
+ }
+
+ ZSTD_DStream *zstream;
+ ZSTD_inBuffer zin;
+ ZSTD_outBuffer zout;
+ guchar *out;
+ gsize outlen, r;
+
+ zstream = ZSTD_createDStream();
+ ZSTD_initDStream(zstream);
+
+ zin.pos = 0;
+ zin.src = bytes;
+ zin.size = len;
+
+ if ((outlen = ZSTD_getDecompressedSize(zin.src, zin.size)) == 0) {
+ outlen = ZSTD_DStreamOutSize();
+ }
+
+ out = g_malloc(outlen);
+
+ zout.dst = out;
+ zout.pos = 0;
+ zout.size = outlen;
+
+ while (zin.pos < zin.size) {
+ r = ZSTD_decompressStream(zstream, &zout, &zin);
+
+ if (ZSTD_isError(r)) {
+ msg_err_map("%s: cannot decompress data: %s",
+ data->filename,
+ ZSTD_getErrorName(r));
+ ZSTD_freeDStream(zstream);
+ g_free(out);
+ munmap(bytes, len);
+ return FALSE;
+ }
+
+ if (zout.pos == zout.size) {
+ /* We need to extend output buffer */
+ zout.size = zout.size * 2 + 1;
+ out = g_realloc(zout.dst, zout.size);
+ zout.dst = out;
+ }
+ }
+
+ ZSTD_freeDStream(zstream);
+ msg_info_map("%s: read map data, %z bytes compressed, "
+ "%z uncompressed)",
+ data->filename,
+ len, zout.pos);
+ map->read_callback(out, zout.pos, &periodic->cbdata, TRUE);
+ g_free(out);
+
+ munmap(bytes, len);
+ }
+ else {
+ /* Perform buffered read: fail-safe */
+ if (!read_map_file_chunks(map, &periodic->cbdata, data->filename,
+ len, 0)) {
+ return FALSE;
+ }
+ }
+ }
+ }
+ else {
+ /* Empty map */
+ map->read_callback(NULL, 0, &periodic->cbdata, TRUE);
+ }
+
+ return TRUE;
+}
+
+static gboolean
+read_map_static(struct rspamd_map *map, struct static_map_data *data,
+ struct rspamd_map_backend *bk, struct map_periodic_cbdata *periodic)
+{
+ guchar *bytes;
+ gsize len;
+
+ if (map->read_callback == NULL || map->fin_callback == NULL) {
+ msg_err_map("%s: bad callback for reading map file", map->name);
+ data->processed = TRUE;
+ return FALSE;
+ }
+
+ bytes = data->data;
+ len = data->len;
+
+ if (len > 0) {
+ if (bk->is_compressed) {
+ ZSTD_DStream *zstream;
+ ZSTD_inBuffer zin;
+ ZSTD_outBuffer zout;
+ guchar *out;
+ gsize outlen, r;
+
+ zstream = ZSTD_createDStream();
+ ZSTD_initDStream(zstream);
+
+ zin.pos = 0;
+ zin.src = bytes;
+ zin.size = len;
+
+ if ((outlen = ZSTD_getDecompressedSize(zin.src, zin.size)) == 0) {
+ outlen = ZSTD_DStreamOutSize();
+ }
+
+ out = g_malloc(outlen);
+
+ zout.dst = out;
+ zout.pos = 0;
+ zout.size = outlen;
+
+ while (zin.pos < zin.size) {
+ r = ZSTD_decompressStream(zstream, &zout, &zin);
+
+ if (ZSTD_isError(r)) {
+ msg_err_map("%s: cannot decompress data: %s",
+ map->name,
+ ZSTD_getErrorName(r));
+ ZSTD_freeDStream(zstream);
+ g_free(out);
+
+ return FALSE;
+ }
+
+ if (zout.pos == zout.size) {
+ /* We need to extend output buffer */
+ zout.size = zout.size * 2 + 1;
+ out = g_realloc(zout.dst, zout.size);
+ zout.dst = out;
+ }
+ }
+
+ ZSTD_freeDStream(zstream);
+ msg_info_map("%s: read map data, %z bytes compressed, "
+ "%z uncompressed)",
+ map->name,
+ len, zout.pos);
+ map->read_callback(out, zout.pos, &periodic->cbdata, TRUE);
+ g_free(out);
+ }
+ else {
+ msg_info_map("%s: read map data, %z bytes",
+ map->name, len);
+ map->read_callback(bytes, len, &periodic->cbdata, TRUE);
+ }
+ }
+ else {
+ map->read_callback(NULL, 0, &periodic->cbdata, TRUE);
+ }
+
+ data->processed = TRUE;
+
+ return TRUE;
+}
+
+static void
+rspamd_map_periodic_dtor(struct map_periodic_cbdata *periodic)
+{
+ struct rspamd_map *map;
+
+ map = periodic->map;
+ msg_debug_map("periodic dtor %p", periodic);
+
+ if (periodic->need_modify || periodic->cbdata.errored) {
+ /* Need to notify the real data structure */
+ periodic->map->fin_callback(&periodic->cbdata, periodic->map->user_data);
+
+ if (map->on_load_function) {
+ map->on_load_function(map, map->on_load_ud);
+ }
+ }
+ else {
+ /* Not modified */
+ }
+
+ if (periodic->locked) {
+ g_atomic_int_set(periodic->map->locked, 0);
+ msg_debug_map("unlocked map %s", periodic->map->name);
+
+ if (periodic->map->wrk->state == rspamd_worker_state_running) {
+ rspamd_map_schedule_periodic(periodic->map,
+ RSPAMD_SYMBOL_RESULT_NORMAL);
+ }
+ else {
+ msg_debug_map("stop scheduling periodics for %s; terminating state",
+ periodic->map->name);
+ }
+ }
+
+ g_free(periodic);
+}
+
+/* Called on timer execution */
+static void
+rspamd_map_periodic_callback(struct ev_loop *loop, ev_timer *w, int revents)
+{
+ struct map_periodic_cbdata *cbd = (struct map_periodic_cbdata *) w->data;
+
+ MAP_RETAIN(cbd, "periodic");
+ ev_timer_stop(loop, w);
+ rspamd_map_process_periodic(cbd);
+ MAP_RELEASE(cbd, "periodic");
+}
+
+static void
+rspamd_map_schedule_periodic(struct rspamd_map *map, int how)
+{
+ const gdouble error_mult = 20.0, lock_mult = 0.1;
+ static const gdouble min_timer_interval = 2.0;
+ const gchar *reason = "unknown reason";
+ gdouble jittered_sec;
+ gdouble timeout;
+ struct map_periodic_cbdata *cbd;
+
+ if (map->scheduled_check || (map->wrk &&
+ map->wrk->state != rspamd_worker_state_running)) {
+ /*
+ * Do not schedule check if some check is already scheduled or
+ * if worker is going to die
+ */
+ return;
+ }
+
+ if (!(how & RSPAMD_MAP_SCHEDULE_INIT) && map->static_only) {
+ /* No need to schedule anything for static maps */
+ return;
+ }
+
+ if (map->non_trivial && map->next_check != 0) {
+ timeout = map->next_check - rspamd_get_calendar_ticks();
+ map->next_check = 0;
+
+ if (timeout > 0 && timeout < map->poll_timeout) {
+ /* Early check case, jitter */
+ gdouble poll_timeout = map->poll_timeout;
+
+ if (how & RSPAMD_MAP_SCHEDULE_ERROR) {
+ poll_timeout = map->poll_timeout * error_mult;
+ reason = "early active non-trivial check (after error)";
+ }
+ else if (how & RSPAMD_MAP_SCHEDULE_LOCKED) {
+ poll_timeout = map->poll_timeout * lock_mult;
+ reason = "early active non-trivial check (after being locked)";
+ }
+ else {
+ reason = "early active non-trivial check";
+ }
+
+ jittered_sec = MIN(timeout, poll_timeout);
+ }
+ else if (timeout <= 0) {
+ /* Data is already expired, need to check */
+ if (how & RSPAMD_MAP_SCHEDULE_ERROR) {
+ /* In case of error we still need to increase delay */
+ jittered_sec = map->poll_timeout * error_mult;
+ reason = "expired non-trivial data (after error)";
+ }
+ else {
+ jittered_sec = 0.0;
+ reason = "expired non-trivial data";
+ }
+ }
+ else {
+ /* No need to check now, wait till next_check */
+ jittered_sec = timeout;
+ reason = "valid non-trivial data";
+ }
+ }
+ else {
+ /* No valid information when to check a map, plan a timer based check */
+ timeout = map->poll_timeout;
+
+ if (how & RSPAMD_MAP_SCHEDULE_INIT) {
+ if (map->active_http) {
+ /* Spill maps load to get better chances to hit ssl cache */
+ timeout = rspamd_time_jitter(0.0, 2.0);
+ }
+ else {
+ timeout = 0.0;
+ }
+
+ reason = "init scheduled check";
+ }
+ else {
+ if (how & RSPAMD_MAP_SCHEDULE_ERROR) {
+ timeout = map->poll_timeout * error_mult;
+ reason = "errored scheduled check";
+ }
+ else if (how & RSPAMD_MAP_SCHEDULE_LOCKED) {
+ timeout = map->poll_timeout * lock_mult;
+ reason = "locked scheduled check";
+ }
+ else {
+ reason = "normal scheduled check";
+ }
+ }
+
+ jittered_sec = rspamd_time_jitter(timeout, 0);
+ }
+
+ /* Now, we do some sanity checks for jittered seconds */
+ if (!(how & RSPAMD_MAP_SCHEDULE_INIT)) {
+ /* Never allow too low interval between timer checks, it is expensive */
+ if (jittered_sec < min_timer_interval) {
+ jittered_sec = rspamd_time_jitter(min_timer_interval, 0);
+ }
+
+ if (map->non_trivial) {
+ /*
+ * Even if we are reported that we need to reload cache often, we
+ * still want to be sane in terms of events...
+ */
+ if (jittered_sec < min_timer_interval * 2.0) {
+ if (map->nelts > 0) {
+ jittered_sec = min_timer_interval * 3.0;
+ }
+ }
+ }
+ }
+
+ cbd = g_malloc0(sizeof(*cbd));
+ cbd->cbdata.prev_data = *map->user_data;
+ cbd->cbdata.cur_data = NULL;
+ cbd->cbdata.map = map;
+ cbd->map = map;
+ map->scheduled_check = cbd;
+ REF_INIT_RETAIN(cbd, rspamd_map_periodic_dtor);
+
+ cbd->ev.data = cbd;
+ ev_timer_init(&cbd->ev, rspamd_map_periodic_callback, jittered_sec, 0.0);
+ ev_timer_start(map->event_loop, &cbd->ev);
+
+ msg_debug_map("schedule new periodic event %p in %.3f seconds for %s; reason: %s",
+ cbd, jittered_sec, map->name, reason);
+}
+
+static gint
+rspamd_map_af_to_weight(const rspamd_inet_addr_t *addr)
+{
+ int ret;
+
+ switch (rspamd_inet_address_get_af(addr)) {
+ case AF_UNIX:
+ ret = 2;
+ break;
+ case AF_INET:
+ ret = 1;
+ break;
+ default:
+ ret = 0;
+ break;
+ }
+
+ return ret;
+}
+
+static gint
+rspamd_map_dns_address_sort_func(gconstpointer a, gconstpointer b)
+{
+ const rspamd_inet_addr_t *ip1 = *(const rspamd_inet_addr_t **) a,
+ *ip2 = *(const rspamd_inet_addr_t **) b;
+ gint w1, w2;
+
+ w1 = rspamd_map_af_to_weight(ip1);
+ w2 = rspamd_map_af_to_weight(ip2);
+
+ /* Inverse order */
+ return w2 - w1;
+}
+
+static void
+rspamd_map_dns_callback(struct rdns_reply *reply, void *arg)
+{
+ struct http_callback_data *cbd = arg;
+ struct rdns_reply_entry *cur_rep;
+ struct rspamd_map *map;
+ guint flags = RSPAMD_HTTP_CLIENT_SIMPLE | RSPAMD_HTTP_CLIENT_SHARED;
+
+ map = cbd->map;
+
+ msg_debug_map("got dns reply with code %s on stage %d",
+ rdns_strerror(reply->code), cbd->stage);
+
+ if (cbd->stage == http_map_terminated) {
+ MAP_RELEASE(cbd, "http_callback_data");
+ return;
+ }
+
+ if (reply->code == RDNS_RC_NOERROR) {
+ DL_FOREACH(reply->entries, cur_rep)
+ {
+ rspamd_inet_addr_t *addr;
+ addr = rspamd_inet_address_from_rnds(cur_rep);
+
+ if (addr != NULL) {
+ rspamd_inet_address_set_port(addr, cbd->data->port);
+ g_ptr_array_add(cbd->addrs, (void *) addr);
+ }
+ }
+
+ if (cbd->stage == http_map_resolve_host2) {
+ /* We have still one request pending */
+ cbd->stage = http_map_resolve_host1;
+ }
+ else if (cbd->stage == http_map_resolve_host1) {
+ cbd->stage = http_map_http_conn;
+ }
+ }
+ else if (cbd->stage < http_map_http_conn) {
+ if (cbd->stage == http_map_resolve_host2) {
+ /* We have still one request pending */
+ cbd->stage = http_map_resolve_host1;
+ }
+ else if (cbd->addrs->len == 0) {
+ /* We could not resolve host, so cowardly fail here */
+ msg_err_map("cannot resolve %s: %s", cbd->data->host,
+ rdns_strerror(reply->code));
+ cbd->periodic->errored = 1;
+ rspamd_map_process_periodic(cbd->periodic);
+ }
+ else {
+ /* We have at least one address, so we can continue... */
+ cbd->stage = http_map_http_conn;
+ }
+ }
+
+ if (cbd->stage == http_map_http_conn && cbd->addrs->len > 0) {
+ rspamd_ptr_array_shuffle(cbd->addrs);
+ gint idx = 0;
+ /*
+ * For the existing addr we can just select any address as we have
+ * data available
+ */
+ if (cbd->map->nelts > 0 && rspamd_random_double_fast() > 0.5) {
+ /* Already shuffled, use whatever is the first */
+ cbd->addr = (rspamd_inet_addr_t *) g_ptr_array_index(cbd->addrs, idx);
+ }
+ else {
+ /* Always prefer IPv4 as IPv6 is almost all the time broken */
+ g_ptr_array_sort(cbd->addrs, rspamd_map_dns_address_sort_func);
+ cbd->addr = (rspamd_inet_addr_t *) g_ptr_array_index(cbd->addrs, idx);
+ }
+
+ retry:
+ msg_debug_map("try open http connection to %s",
+ rspamd_inet_address_to_string_pretty(cbd->addr));
+ if (cbd->bk->protocol == MAP_PROTO_HTTPS) {
+ flags |= RSPAMD_HTTP_CLIENT_SSL;
+ }
+ cbd->conn = rspamd_http_connection_new_client(NULL,
+ NULL,
+ http_map_error,
+ http_map_finish,
+ flags,
+ cbd->addr);
+
+ if (cbd->conn != NULL) {
+ write_http_request(cbd);
+ }
+ else {
+ if (idx < cbd->addrs->len - 1) {
+ /* We can retry */
+ idx++;
+ rspamd_inet_addr_t *prev_addr = cbd->addr;
+ cbd->addr = (rspamd_inet_addr_t *) g_ptr_array_index(cbd->addrs, idx);
+ msg_info_map("cannot connect to %s to get data for %s: %s, retry with %s (%d of %d)",
+ rspamd_inet_address_to_string_pretty(prev_addr),
+ cbd->bk->uri,
+ strerror(errno),
+ rspamd_inet_address_to_string_pretty(cbd->addr),
+ idx + 1, cbd->addrs->len);
+ goto retry;
+ }
+ else {
+ /* Nothing else left */
+ cbd->periodic->errored = TRUE;
+ msg_err_map("error reading %s(%s): "
+ "connection with http server terminated incorrectly: %s",
+ cbd->bk->uri,
+ cbd->addr ? rspamd_inet_address_to_string_pretty(cbd->addr) : "",
+ strerror(errno));
+
+ rspamd_map_process_periodic(cbd->periodic);
+ }
+ }
+ }
+
+ MAP_RELEASE(cbd, "http_callback_data");
+}
+
+static gboolean
+rspamd_map_read_cached(struct rspamd_map *map, struct rspamd_map_backend *bk,
+ struct map_periodic_cbdata *periodic, const gchar *host)
+{
+ gsize mmap_len, len;
+ gpointer in;
+ struct http_map_data *data;
+
+ data = bk->data.hd;
+
+ in = rspamd_shmem_xmap(data->cache->shmem_name, PROT_READ, &mmap_len);
+
+ if (in == NULL) {
+ msg_err("cannot map cache from %s: %s", data->cache->shmem_name,
+ strerror(errno));
+ return FALSE;
+ }
+
+ if (mmap_len < data->cache->len) {
+ msg_err("cannot map cache from %s: truncated length %z, %z expected",
+ data->cache->shmem_name,
+ mmap_len, data->cache->len);
+ munmap(in, mmap_len);
+
+ return FALSE;
+ }
+
+ /*
+ * Len is taken from the shmem file size that can be larger than the
+ * actual data length, as we use shared memory as a growing buffer for the
+ * HTTP input.
+ * Hence, we need to use len from the saved cache data, counting that it is
+ * at least not more than the cached file length (this is checked above).
+ */
+ len = data->cache->len;
+
+ if (bk->is_compressed) {
+ ZSTD_DStream *zstream;
+ ZSTD_inBuffer zin;
+ ZSTD_outBuffer zout;
+ guchar *out;
+ gsize outlen, r;
+
+ zstream = ZSTD_createDStream();
+ ZSTD_initDStream(zstream);
+
+ zin.pos = 0;
+ zin.src = in;
+ zin.size = len;
+
+ if ((outlen = ZSTD_getDecompressedSize(zin.src, zin.size)) == 0) {
+ outlen = ZSTD_DStreamOutSize();
+ }
+
+ out = g_malloc(outlen);
+
+ zout.dst = out;
+ zout.pos = 0;
+ zout.size = outlen;
+
+ while (zin.pos < zin.size) {
+ r = ZSTD_decompressStream(zstream, &zout, &zin);
+
+ if (ZSTD_isError(r)) {
+ msg_err_map("%s: cannot decompress data: %s",
+ bk->uri,
+ ZSTD_getErrorName(r));
+ ZSTD_freeDStream(zstream);
+ g_free(out);
+ munmap(in, mmap_len);
+ return FALSE;
+ }
+
+ if (zout.pos == zout.size) {
+ /* We need to extend output buffer */
+ zout.size = zout.size * 2 + 1;
+ out = g_realloc(zout.dst, zout.size);
+ zout.dst = out;
+ }
+ }
+
+ ZSTD_freeDStream(zstream);
+ msg_info_map("%s: read map data cached %z bytes compressed, "
+ "%z uncompressed",
+ bk->uri,
+ len, zout.pos);
+ map->read_callback(out, zout.pos, &periodic->cbdata, TRUE);
+ g_free(out);
+ }
+ else {
+ msg_info_map("%s: read map data cached %z bytes", bk->uri, len);
+ map->read_callback(in, len, &periodic->cbdata, TRUE);
+ }
+
+ munmap(in, mmap_len);
+
+ return TRUE;
+}
+
+static gboolean
+rspamd_map_has_http_cached_file(struct rspamd_map *map,
+ struct rspamd_map_backend *bk)
+{
+ gchar path[PATH_MAX];
+ guchar digest[rspamd_cryptobox_HASHBYTES];
+ struct rspamd_config *cfg = map->cfg;
+ struct stat st;
+
+ if (cfg->maps_cache_dir == NULL || cfg->maps_cache_dir[0] == '\0') {
+ return FALSE;
+ }
+
+ rspamd_cryptobox_hash(digest, bk->uri, strlen(bk->uri), NULL, 0);
+ rspamd_snprintf(path, sizeof(path), "%s%c%*xs.map", cfg->maps_cache_dir,
+ G_DIR_SEPARATOR, 20, digest);
+
+ if (stat(path, &st) != -1 && st.st_size >
+ sizeof(struct rspamd_http_file_data)) {
+ return TRUE;
+ }
+
+ return FALSE;
+}
+
+static gboolean
+rspamd_map_save_http_cached_file(struct rspamd_map *map,
+ struct rspamd_map_backend *bk,
+ struct http_map_data *htdata,
+ const guchar *data,
+ gsize len)
+{
+ gchar path[PATH_MAX];
+ guchar digest[rspamd_cryptobox_HASHBYTES];
+ struct rspamd_config *cfg = map->cfg;
+ gint fd;
+ struct rspamd_http_file_data header;
+
+ if (cfg->maps_cache_dir == NULL || cfg->maps_cache_dir[0] == '\0') {
+ return FALSE;
+ }
+
+ rspamd_cryptobox_hash(digest, bk->uri, strlen(bk->uri), NULL, 0);
+ rspamd_snprintf(path, sizeof(path), "%s%c%*xs.map", cfg->maps_cache_dir,
+ G_DIR_SEPARATOR, 20, digest);
+
+ fd = rspamd_file_xopen(path, O_WRONLY | O_TRUNC | O_CREAT,
+ 00600, FALSE);
+
+ if (fd == -1) {
+ return FALSE;
+ }
+
+ if (!rspamd_file_lock(fd, FALSE)) {
+ msg_err_map("cannot lock file %s: %s", path, strerror(errno));
+ close(fd);
+
+ return FALSE;
+ }
+
+ memcpy(header.magic, rspamd_http_file_magic, sizeof(rspamd_http_file_magic));
+ header.mtime = htdata->last_modified;
+ header.next_check = map->next_check;
+ header.data_off = sizeof(header);
+
+ if (htdata->etag) {
+ header.data_off += RSPAMD_FSTRING_LEN(htdata->etag);
+ header.etag_len = RSPAMD_FSTRING_LEN(htdata->etag);
+ }
+ else {
+ header.etag_len = 0;
+ }
+
+ if (write(fd, &header, sizeof(header)) != sizeof(header)) {
+ msg_err_map("cannot write file %s (header stage): %s", path, strerror(errno));
+ rspamd_file_unlock(fd, FALSE);
+ close(fd);
+
+ return FALSE;
+ }
+
+ if (header.etag_len > 0) {
+ if (write(fd, RSPAMD_FSTRING_DATA(htdata->etag), header.etag_len) !=
+ header.etag_len) {
+ msg_err_map("cannot write file %s (etag stage): %s", path, strerror(errno));
+ rspamd_file_unlock(fd, FALSE);
+ close(fd);
+
+ return FALSE;
+ }
+ }
+
+ /* Now write the rest */
+ if (write(fd, data, len) != len) {
+ msg_err_map("cannot write file %s (data stage): %s", path, strerror(errno));
+ rspamd_file_unlock(fd, FALSE);
+ close(fd);
+
+ return FALSE;
+ }
+
+ rspamd_file_unlock(fd, FALSE);
+ close(fd);
+
+ msg_info_map("saved data from %s in %s, %uz bytes", bk->uri, path, len + sizeof(header) + header.etag_len);
+
+ return TRUE;
+}
+
+static gboolean
+rspamd_map_update_http_cached_file(struct rspamd_map *map,
+ struct rspamd_map_backend *bk,
+ struct http_map_data *htdata)
+{
+ gchar path[PATH_MAX];
+ guchar digest[rspamd_cryptobox_HASHBYTES];
+ struct rspamd_config *cfg = map->cfg;
+ gint fd;
+ struct rspamd_http_file_data header;
+
+ if (!rspamd_map_has_http_cached_file(map, bk)) {
+ return FALSE;
+ }
+
+ rspamd_cryptobox_hash(digest, bk->uri, strlen(bk->uri), NULL, 0);
+ rspamd_snprintf(path, sizeof(path), "%s%c%*xs.map", cfg->maps_cache_dir,
+ G_DIR_SEPARATOR, 20, digest);
+
+ fd = rspamd_file_xopen(path, O_WRONLY,
+ 00600, FALSE);
+
+ if (fd == -1) {
+ return FALSE;
+ }
+
+ if (!rspamd_file_lock(fd, FALSE)) {
+ msg_err_map("cannot lock file %s: %s", path, strerror(errno));
+ close(fd);
+
+ return FALSE;
+ }
+
+ memcpy(header.magic, rspamd_http_file_magic, sizeof(rspamd_http_file_magic));
+ header.mtime = htdata->last_modified;
+ header.next_check = map->next_check;
+ header.data_off = sizeof(header);
+
+ if (htdata->etag) {
+ header.data_off += RSPAMD_FSTRING_LEN(htdata->etag);
+ header.etag_len = RSPAMD_FSTRING_LEN(htdata->etag);
+ }
+ else {
+ header.etag_len = 0;
+ }
+
+ if (write(fd, &header, sizeof(header)) != sizeof(header)) {
+ msg_err_map("cannot update file %s (header stage): %s", path, strerror(errno));
+ rspamd_file_unlock(fd, FALSE);
+ close(fd);
+
+ return FALSE;
+ }
+
+ if (header.etag_len > 0) {
+ if (write(fd, RSPAMD_FSTRING_DATA(htdata->etag), header.etag_len) !=
+ header.etag_len) {
+ msg_err_map("cannot update file %s (etag stage): %s", path, strerror(errno));
+ rspamd_file_unlock(fd, FALSE);
+ close(fd);
+
+ return FALSE;
+ }
+ }
+
+ rspamd_file_unlock(fd, FALSE);
+ close(fd);
+
+ return TRUE;
+}
+
+
+static gboolean
+rspamd_map_read_http_cached_file(struct rspamd_map *map,
+ struct rspamd_map_backend *bk,
+ struct http_map_data *htdata,
+ struct map_cb_data *cbdata)
+{
+ gchar path[PATH_MAX];
+ guchar digest[rspamd_cryptobox_HASHBYTES];
+ struct rspamd_config *cfg = map->cfg;
+ gint fd;
+ struct stat st;
+ struct rspamd_http_file_data header;
+
+ if (cfg->maps_cache_dir == NULL || cfg->maps_cache_dir[0] == '\0') {
+ return FALSE;
+ }
+
+ rspamd_cryptobox_hash(digest, bk->uri, strlen(bk->uri), NULL, 0);
+ rspamd_snprintf(path, sizeof(path), "%s%c%*xs.map", cfg->maps_cache_dir,
+ G_DIR_SEPARATOR, 20, digest);
+
+ fd = rspamd_file_xopen(path, O_RDONLY, 00600, FALSE);
+
+ if (fd == -1) {
+ return FALSE;
+ }
+
+ if (!rspamd_file_lock(fd, FALSE)) {
+ msg_err_map("cannot lock file %s: %s", path, strerror(errno));
+ close(fd);
+
+ return FALSE;
+ }
+
+ (void) fstat(fd, &st);
+
+ if (read(fd, &header, sizeof(header)) != sizeof(header)) {
+ msg_err_map("cannot read file %s (header stage): %s", path, strerror(errno));
+ rspamd_file_unlock(fd, FALSE);
+ close(fd);
+
+ return FALSE;
+ }
+
+ if (memcmp(header.magic, rspamd_http_file_magic,
+ sizeof(rspamd_http_file_magic)) != 0) {
+ msg_warn_map("invalid or old version magic in file %s; ignore it", path);
+ rspamd_file_unlock(fd, FALSE);
+ close(fd);
+
+ return FALSE;
+ }
+
+ double now = rspamd_get_calendar_ticks();
+
+ if (header.next_check > now) {
+ map->next_check = header.next_check;
+ }
+ else {
+ map->next_check = now;
+ }
+
+ htdata->last_modified = header.mtime;
+
+ if (header.etag_len > 0) {
+ rspamd_fstring_t *etag = rspamd_fstring_sized_new(header.etag_len);
+
+ if (read(fd, RSPAMD_FSTRING_DATA(etag), header.etag_len) != header.etag_len) {
+ msg_err_map("cannot read file %s (etag stage): %s", path,
+ strerror(errno));
+ rspamd_file_unlock(fd, FALSE);
+ rspamd_fstring_free(etag);
+ close(fd);
+
+ return FALSE;
+ }
+
+ etag->len = header.etag_len;
+
+ if (htdata->etag) {
+ /* FIXME: should be dealt somehow better */
+ msg_warn_map("etag is already defined as %V; cached is %V; ignore cached",
+ htdata->etag, etag);
+ rspamd_fstring_free(etag);
+ }
+ else {
+ htdata->etag = etag;
+ }
+ }
+
+ rspamd_file_unlock(fd, FALSE);
+ close(fd);
+
+ /* Now read file data */
+ /* Perform buffered read: fail-safe */
+ if (!read_map_file_chunks(map, cbdata, path,
+ st.st_size - header.data_off, header.data_off)) {
+ return FALSE;
+ }
+
+ struct tm tm;
+ gchar ncheck_buf[32], lm_buf[32];
+
+ rspamd_localtime(map->next_check, &tm);
+ strftime(ncheck_buf, sizeof(ncheck_buf) - 1, "%Y-%m-%d %H:%M:%S", &tm);
+ rspamd_localtime(htdata->last_modified, &tm);
+ strftime(lm_buf, sizeof(lm_buf) - 1, "%Y-%m-%d %H:%M:%S", &tm);
+
+ msg_info_map("read cached data for %s from %s, %uz bytes; next check at: %s;"
+ " last modified on: %s; etag: %V",
+ bk->uri,
+ path,
+ (size_t) (st.st_size - header.data_off),
+ ncheck_buf,
+ lm_buf,
+ htdata->etag);
+
+ return TRUE;
+}
+
+/**
+ * Async HTTP callback
+ */
+static void
+rspamd_map_common_http_callback(struct rspamd_map *map,
+ struct rspamd_map_backend *bk,
+ struct map_periodic_cbdata *periodic,
+ gboolean check)
+{
+ struct http_map_data *data;
+ struct http_callback_data *cbd;
+ guint flags = RSPAMD_HTTP_CLIENT_SIMPLE | RSPAMD_HTTP_CLIENT_SHARED;
+
+ data = bk->data.hd;
+
+ if (g_atomic_int_get(&data->cache->available) == 1) {
+ /* Read cached data */
+ if (check) {
+ if (data->last_modified < data->cache->last_modified) {
+ msg_info_map("need to reread cached map triggered by %s "
+ "(%d our modify time, %d cached modify time)",
+ bk->uri,
+ (int) data->last_modified,
+ (int) data->cache->last_modified);
+ periodic->need_modify = TRUE;
+ /* Reset the whole chain */
+ periodic->cur_backend = 0;
+ rspamd_map_process_periodic(periodic);
+ }
+ else {
+ if (map->active_http) {
+ /* Check even if there is a cached version */
+ goto check;
+ }
+ else {
+ /* Switch to the next backend */
+ periodic->cur_backend++;
+ rspamd_map_process_periodic(periodic);
+ }
+ }
+
+ return;
+ }
+ else {
+ if (map->active_http &&
+ data->last_modified > data->cache->last_modified) {
+ goto check;
+ }
+ else if (rspamd_map_read_cached(map, bk, periodic, data->host)) {
+ /* Switch to the next backend */
+ periodic->cur_backend++;
+ data->last_modified = data->cache->last_modified;
+ rspamd_map_process_periodic(periodic);
+
+ return;
+ }
+ }
+ }
+ else if (!map->active_http) {
+ /* Switch to the next backend */
+ periodic->cur_backend++;
+ rspamd_map_process_periodic(periodic);
+
+ return;
+ }
+
+check:
+ cbd = g_malloc0(sizeof(struct http_callback_data));
+
+ cbd->event_loop = map->event_loop;
+ cbd->addrs = g_ptr_array_sized_new(4);
+ cbd->map = map;
+ cbd->data = data;
+ cbd->check = check;
+ cbd->periodic = periodic;
+ MAP_RETAIN(periodic, "periodic");
+ cbd->bk = bk;
+ MAP_RETAIN(bk, "rspamd_map_backend");
+ cbd->stage = http_map_terminated;
+ REF_INIT_RETAIN(cbd, free_http_cbdata);
+
+ msg_debug_map("%s map data from %s", check ? "checking" : "reading",
+ data->host);
+
+ /* Try address */
+ rspamd_inet_addr_t *addr = NULL;
+
+ if (rspamd_parse_inet_address(&addr, data->host,
+ strlen(data->host), RSPAMD_INET_ADDRESS_PARSE_DEFAULT)) {
+ rspamd_inet_address_set_port(addr, cbd->data->port);
+ g_ptr_array_add(cbd->addrs, (void *) addr);
+
+ if (bk->protocol == MAP_PROTO_HTTPS) {
+ flags |= RSPAMD_HTTP_CLIENT_SSL;
+ }
+
+ cbd->conn = rspamd_http_connection_new_client(
+ NULL,
+ NULL,
+ http_map_error,
+ http_map_finish,
+ flags,
+ addr);
+
+ if (cbd->conn != NULL) {
+ cbd->stage = http_map_http_conn;
+ write_http_request(cbd);
+ cbd->addr = addr;
+ MAP_RELEASE(cbd, "http_callback_data");
+ }
+ else {
+ msg_warn_map("cannot load map: cannot connect to %s: %s",
+ data->host, strerror(errno));
+ MAP_RELEASE(cbd, "http_callback_data");
+ }
+
+ return;
+ }
+ else if (map->r->r) {
+ /* Send both A and AAAA requests */
+ guint nreq = 0;
+
+ if (rdns_make_request_full(map->r->r, rspamd_map_dns_callback, cbd,
+ map->cfg->dns_timeout, map->cfg->dns_retransmits, 1,
+ data->host, RDNS_REQUEST_A)) {
+ MAP_RETAIN(cbd, "http_callback_data");
+ nreq++;
+ }
+ if (rdns_make_request_full(map->r->r, rspamd_map_dns_callback, cbd,
+ map->cfg->dns_timeout, map->cfg->dns_retransmits, 1,
+ data->host, RDNS_REQUEST_AAAA)) {
+ MAP_RETAIN(cbd, "http_callback_data");
+ nreq++;
+ }
+
+ if (nreq == 2) {
+ cbd->stage = http_map_resolve_host2;
+ }
+ else if (nreq == 1) {
+ cbd->stage = http_map_resolve_host1;
+ }
+
+ map->tmp_dtor = free_http_cbdata_dtor;
+ map->tmp_dtor_data = cbd;
+ }
+ else {
+ msg_warn_map("cannot load map: DNS resolver is not initialized");
+ cbd->periodic->errored = TRUE;
+ }
+
+ MAP_RELEASE(cbd, "http_callback_data");
+}
+
+static void
+rspamd_map_http_check_callback(struct map_periodic_cbdata *cbd)
+{
+ struct rspamd_map *map;
+ struct rspamd_map_backend *bk;
+
+ map = cbd->map;
+ bk = g_ptr_array_index(cbd->map->backends, cbd->cur_backend);
+
+ rspamd_map_common_http_callback(map, bk, cbd, TRUE);
+}
+
+static void
+rspamd_map_http_read_callback(struct map_periodic_cbdata *cbd)
+{
+ struct rspamd_map *map;
+ struct rspamd_map_backend *bk;
+
+ map = cbd->map;
+ bk = g_ptr_array_index(cbd->map->backends, cbd->cur_backend);
+ rspamd_map_common_http_callback(map, bk, cbd, FALSE);
+}
+
+static void
+rspamd_map_file_check_callback(struct map_periodic_cbdata *periodic)
+{
+ struct rspamd_map *map;
+ struct file_map_data *data;
+ struct rspamd_map_backend *bk;
+
+ map = periodic->map;
+ bk = g_ptr_array_index(map->backends, periodic->cur_backend);
+ data = bk->data.fd;
+
+ if (data->need_modify) {
+ periodic->need_modify = TRUE;
+ periodic->cur_backend = 0;
+ data->need_modify = FALSE;
+
+ rspamd_map_process_periodic(periodic);
+
+ return;
+ }
+
+ map = periodic->map;
+ /* Switch to the next backend as the rest is handled by ev_stat */
+ periodic->cur_backend++;
+ rspamd_map_process_periodic(periodic);
+}
+
+static void
+rspamd_map_static_check_callback(struct map_periodic_cbdata *periodic)
+{
+ struct rspamd_map *map;
+ struct static_map_data *data;
+ struct rspamd_map_backend *bk;
+
+ map = periodic->map;
+ bk = g_ptr_array_index(map->backends, periodic->cur_backend);
+ data = bk->data.sd;
+
+ if (!data->processed) {
+ periodic->need_modify = TRUE;
+ periodic->cur_backend = 0;
+
+ rspamd_map_process_periodic(periodic);
+
+ return;
+ }
+
+ /* Switch to the next backend */
+ periodic->cur_backend++;
+ rspamd_map_process_periodic(periodic);
+}
+
+static void
+rspamd_map_file_read_callback(struct map_periodic_cbdata *periodic)
+{
+ struct rspamd_map *map;
+ struct file_map_data *data;
+ struct rspamd_map_backend *bk;
+
+ map = periodic->map;
+
+ bk = g_ptr_array_index(map->backends, periodic->cur_backend);
+ data = bk->data.fd;
+
+ msg_info_map("rereading map file %s", data->filename);
+
+ if (!read_map_file(map, data, bk, periodic)) {
+ periodic->errored = TRUE;
+ }
+
+ /* Switch to the next backend */
+ periodic->cur_backend++;
+ rspamd_map_process_periodic(periodic);
+}
+
+static void
+rspamd_map_static_read_callback(struct map_periodic_cbdata *periodic)
+{
+ struct rspamd_map *map;
+ struct static_map_data *data;
+ struct rspamd_map_backend *bk;
+
+ map = periodic->map;
+
+ bk = g_ptr_array_index(map->backends, periodic->cur_backend);
+ data = bk->data.sd;
+
+ msg_info_map("rereading static map");
+
+ if (!read_map_static(map, data, bk, periodic)) {
+ periodic->errored = TRUE;
+ }
+
+ /* Switch to the next backend */
+ periodic->cur_backend++;
+ rspamd_map_process_periodic(periodic);
+}
+
+static void
+rspamd_map_process_periodic(struct map_periodic_cbdata *cbd)
+{
+ struct rspamd_map_backend *bk;
+ struct rspamd_map *map;
+
+ map = cbd->map;
+ map->scheduled_check = NULL;
+
+ if (!map->file_only && !cbd->locked) {
+ if (!g_atomic_int_compare_and_exchange(cbd->map->locked,
+ 0, 1)) {
+ msg_debug_map(
+ "don't try to reread map %s as it is locked by other process, "
+ "will reread it later",
+ cbd->map->name);
+ rspamd_map_schedule_periodic(map, RSPAMD_MAP_SCHEDULE_LOCKED);
+ MAP_RELEASE(cbd, "periodic");
+
+ return;
+ }
+ else {
+ msg_debug_map("locked map %s", cbd->map->name);
+ cbd->locked = TRUE;
+ }
+ }
+
+ if (cbd->errored) {
+ /* We should not check other backends if some backend has failed*/
+ rspamd_map_schedule_periodic(cbd->map, RSPAMD_MAP_SCHEDULE_ERROR);
+
+ if (cbd->locked) {
+ g_atomic_int_set(cbd->map->locked, 0);
+ cbd->locked = FALSE;
+ }
+
+ /* Also set error flag for the map consumer */
+ cbd->cbdata.errored = true;
+
+ msg_debug_map("unlocked map %s, refcount=%d", cbd->map->name,
+ cbd->ref.refcount);
+ MAP_RELEASE(cbd, "periodic");
+
+ return;
+ }
+
+ /* For each backend we need to check for modifications */
+ if (cbd->cur_backend >= cbd->map->backends->len) {
+ /* Last backend */
+ msg_debug_map("finished map: %d of %d", cbd->cur_backend,
+ cbd->map->backends->len);
+ MAP_RELEASE(cbd, "periodic");
+
+ return;
+ }
+
+ if (cbd->map->wrk && cbd->map->wrk->state == rspamd_worker_state_running) {
+ bk = g_ptr_array_index(cbd->map->backends, cbd->cur_backend);
+ g_assert(bk != NULL);
+
+ if (cbd->need_modify) {
+ /* Load data from the next backend */
+ switch (bk->protocol) {
+ case MAP_PROTO_HTTP:
+ case MAP_PROTO_HTTPS:
+ rspamd_map_http_read_callback(cbd);
+ break;
+ case MAP_PROTO_FILE:
+ rspamd_map_file_read_callback(cbd);
+ break;
+ case MAP_PROTO_STATIC:
+ rspamd_map_static_read_callback(cbd);
+ break;
+ }
+ }
+ else {
+ /* Check the next backend */
+ switch (bk->protocol) {
+ case MAP_PROTO_HTTP:
+ case MAP_PROTO_HTTPS:
+ rspamd_map_http_check_callback(cbd);
+ break;
+ case MAP_PROTO_FILE:
+ rspamd_map_file_check_callback(cbd);
+ break;
+ case MAP_PROTO_STATIC:
+ rspamd_map_static_check_callback(cbd);
+ break;
+ }
+ }
+ }
+}
+
+static void
+rspamd_map_on_stat(struct ev_loop *loop, ev_stat *w, int revents)
+{
+ struct rspamd_map *map = (struct rspamd_map *) w->data;
+
+ if (w->attr.st_nlink > 0) {
+ msg_info_map("old mtime is %t (size = %Hz), "
+ "new mtime is %t (size = %Hz) for map file %s",
+ w->prev.st_mtime, (gsize) w->prev.st_size,
+ w->attr.st_mtime, (gsize) w->attr.st_size,
+ w->path);
+
+ /* Fire need modify flag */
+ struct rspamd_map_backend *bk;
+ guint i;
+
+ PTR_ARRAY_FOREACH(map->backends, i, bk)
+ {
+ if (bk->protocol == MAP_PROTO_FILE) {
+ bk->data.fd->need_modify = TRUE;
+ }
+ }
+
+ map->next_check = 0;
+
+ if (map->scheduled_check) {
+ ev_timer_stop(map->event_loop, &map->scheduled_check->ev);
+ MAP_RELEASE(map->scheduled_check, "rspamd_map_on_stat");
+ map->scheduled_check = NULL;
+ }
+
+ rspamd_map_schedule_periodic(map, RSPAMD_MAP_SCHEDULE_INIT);
+ }
+}
+
+/* Start watching event for all maps */
+void rspamd_map_watch(struct rspamd_config *cfg,
+ struct ev_loop *event_loop,
+ struct rspamd_dns_resolver *resolver,
+ struct rspamd_worker *worker,
+ enum rspamd_map_watch_type how)
+{
+ GList *cur = cfg->maps;
+ struct rspamd_map *map;
+ struct rspamd_map_backend *bk;
+ guint i;
+
+ g_assert(how > RSPAMD_MAP_WATCH_MIN && how < RSPAMD_MAP_WATCH_MAX);
+
+ /* First of all do synced read of data */
+ while (cur) {
+ map = cur->data;
+ map->event_loop = event_loop;
+ map->r = resolver;
+
+ if (map->wrk == NULL && how != RSPAMD_MAP_WATCH_WORKER) {
+ /* Generic scanner map */
+ map->wrk = worker;
+
+ if (how == RSPAMD_MAP_WATCH_PRIMARY_CONTROLLER) {
+ map->active_http = TRUE;
+ }
+ else {
+ map->active_http = FALSE;
+ }
+ }
+ else if (map->wrk != NULL && map->wrk == worker) {
+ /* Map is bound to a specific worker */
+ map->active_http = TRUE;
+ }
+ else {
+ /* Skip map for this worker as irrelevant */
+ cur = g_list_next(cur);
+ continue;
+ }
+
+ if (!map->active_http) {
+ /* Check cached version more frequently as it is cheap */
+
+ if (map->poll_timeout >= cfg->map_timeout &&
+ cfg->map_file_watch_multiplier < 1.0) {
+ map->poll_timeout =
+ map->poll_timeout * cfg->map_file_watch_multiplier;
+ }
+ }
+
+ map->file_only = TRUE;
+ map->static_only = TRUE;
+
+ PTR_ARRAY_FOREACH(map->backends, i, bk)
+ {
+ bk->event_loop = event_loop;
+
+ if (bk->protocol == MAP_PROTO_FILE) {
+ struct file_map_data *data;
+
+ data = bk->data.fd;
+
+ if (map->user_data == NULL || *map->user_data == NULL) {
+ /* Map has not been read, init it's reading if possible */
+ struct stat st;
+
+ if (stat(data->filename, &st) != -1) {
+ data->need_modify = TRUE;
+ }
+ }
+
+ ev_stat_init(&data->st_ev, rspamd_map_on_stat,
+ data->filename, map->poll_timeout * cfg->map_file_watch_multiplier);
+ data->st_ev.data = map;
+ ev_stat_start(event_loop, &data->st_ev);
+ map->static_only = FALSE;
+ }
+ else if ((bk->protocol == MAP_PROTO_HTTP ||
+ bk->protocol == MAP_PROTO_HTTPS)) {
+ if (map->active_http) {
+ map->non_trivial = TRUE;
+ }
+
+ map->static_only = FALSE;
+ map->file_only = FALSE;
+ }
+ }
+
+ rspamd_map_schedule_periodic(map, RSPAMD_MAP_SCHEDULE_INIT);
+
+ cur = g_list_next(cur);
+ }
+}
+
+void rspamd_map_preload(struct rspamd_config *cfg)
+{
+ GList *cur = cfg->maps;
+ struct rspamd_map *map;
+ struct rspamd_map_backend *bk;
+ guint i;
+ gboolean map_ok;
+
+ /* First of all do synced read of data */
+ while (cur) {
+ map = cur->data;
+ map_ok = TRUE;
+
+ PTR_ARRAY_FOREACH(map->backends, i, bk)
+ {
+ if (!(bk->protocol == MAP_PROTO_FILE ||
+ bk->protocol == MAP_PROTO_STATIC)) {
+
+ if (bk->protocol == MAP_PROTO_HTTP ||
+ bk->protocol == MAP_PROTO_HTTPS) {
+ if (!rspamd_map_has_http_cached_file(map, bk)) {
+
+ if (!map->fallback_backend) {
+ map_ok = FALSE;
+ }
+ break;
+ }
+ else {
+ continue; /* We are yet fine */
+ }
+ }
+ map_ok = FALSE;
+ break;
+ }
+ }
+
+ if (map_ok) {
+ struct map_periodic_cbdata fake_cbd;
+ gboolean succeed = TRUE;
+
+ memset(&fake_cbd, 0, sizeof(fake_cbd));
+ fake_cbd.cbdata.state = 0;
+ fake_cbd.cbdata.prev_data = *map->user_data;
+ fake_cbd.cbdata.cur_data = NULL;
+ fake_cbd.cbdata.map = map;
+ fake_cbd.map = map;
+
+ PTR_ARRAY_FOREACH(map->backends, i, bk)
+ {
+ fake_cbd.cur_backend = i;
+
+ if (bk->protocol == MAP_PROTO_FILE) {
+ if (!read_map_file(map, bk->data.fd, bk, &fake_cbd)) {
+ succeed = FALSE;
+ break;
+ }
+ }
+ else if (bk->protocol == MAP_PROTO_STATIC) {
+ if (!read_map_static(map, bk->data.sd, bk, &fake_cbd)) {
+ succeed = FALSE;
+ break;
+ }
+ }
+ else if (bk->protocol == MAP_PROTO_HTTP ||
+ bk->protocol == MAP_PROTO_HTTPS) {
+ if (!rspamd_map_read_http_cached_file(map, bk, bk->data.hd,
+ &fake_cbd.cbdata)) {
+
+ if (map->fallback_backend) {
+ /* Try fallback */
+ g_assert(map->fallback_backend->protocol ==
+ MAP_PROTO_FILE);
+ if (!read_map_file(map,
+ map->fallback_backend->data.fd,
+ map->fallback_backend, &fake_cbd)) {
+ succeed = FALSE;
+ break;
+ }
+ }
+ else {
+ succeed = FALSE;
+ break;
+ }
+ }
+ }
+ else {
+ g_assert_not_reached();
+ }
+ }
+
+ if (succeed) {
+ map->fin_callback(&fake_cbd.cbdata, map->user_data);
+
+ if (map->on_load_function) {
+ map->on_load_function(map, map->on_load_ud);
+ }
+ }
+ else {
+ msg_info_map("preload of %s failed", map->name);
+ }
+ }
+
+ cur = g_list_next(cur);
+ }
+}
+
+void rspamd_map_remove_all(struct rspamd_config *cfg)
+{
+ struct rspamd_map *map;
+ GList *cur;
+ struct rspamd_map_backend *bk;
+ struct map_cb_data cbdata;
+ guint i;
+
+ for (cur = cfg->maps; cur != NULL; cur = g_list_next(cur)) {
+ map = cur->data;
+
+ if (map->tmp_dtor) {
+ map->tmp_dtor(map->tmp_dtor_data);
+ }
+
+ if (map->dtor) {
+ cbdata.prev_data = NULL;
+ cbdata.map = map;
+ cbdata.cur_data = *map->user_data;
+
+ map->dtor(&cbdata);
+ *map->user_data = NULL;
+ }
+
+ if (map->on_load_ud_dtor) {
+ map->on_load_ud_dtor(map->on_load_ud);
+ }
+
+ for (i = 0; i < map->backends->len; i++) {
+ bk = g_ptr_array_index(map->backends, i);
+
+ MAP_RELEASE(bk, "rspamd_map_backend");
+ }
+
+ if (map->fallback_backend) {
+ MAP_RELEASE(map->fallback_backend, "rspamd_map_backend");
+ }
+ }
+
+ g_list_free(cfg->maps);
+ cfg->maps = NULL;
+}
+
+static const gchar *
+rspamd_map_check_proto(struct rspamd_config *cfg,
+ const gchar *map_line, struct rspamd_map_backend *bk)
+{
+ const gchar *pos = map_line, *end, *end_key;
+
+ g_assert(bk != NULL);
+ g_assert(pos != NULL);
+
+ end = pos + strlen(pos);
+
+ /* Static check */
+ if (g_ascii_strcasecmp(pos, "static") == 0) {
+ bk->protocol = MAP_PROTO_STATIC;
+ bk->uri = g_strdup(pos);
+
+ return pos;
+ }
+ else if (g_ascii_strcasecmp(pos, "zst+static") == 0) {
+ bk->protocol = MAP_PROTO_STATIC;
+ bk->uri = g_strdup(pos + 4);
+ bk->is_compressed = TRUE;
+
+ return pos + 4;
+ }
+
+ for (;;) {
+ if (g_ascii_strncasecmp(pos, "sign+", sizeof("sign+") - 1) == 0) {
+ bk->is_signed = TRUE;
+ pos += sizeof("sign+") - 1;
+ }
+ else if (g_ascii_strncasecmp(pos, "fallback+", sizeof("fallback+") - 1) == 0) {
+ bk->is_fallback = TRUE;
+ pos += sizeof("fallback+") - 1;
+ }
+ else if (g_ascii_strncasecmp(pos, "key=", sizeof("key=") - 1) == 0) {
+ pos += sizeof("key=") - 1;
+ end_key = memchr(pos, '+', end - pos);
+
+ if (end_key != NULL) {
+ bk->trusted_pubkey = rspamd_pubkey_from_base32(pos, end_key - pos,
+ RSPAMD_KEYPAIR_SIGN, RSPAMD_CRYPTOBOX_MODE_25519);
+
+ if (bk->trusted_pubkey == NULL) {
+ msg_err_config("cannot read pubkey from map: %s",
+ map_line);
+ return NULL;
+ }
+ pos = end_key + 1;
+ }
+ else if (end - pos > 64) {
+ /* Try hex encoding */
+ bk->trusted_pubkey = rspamd_pubkey_from_hex(pos, 64,
+ RSPAMD_KEYPAIR_SIGN, RSPAMD_CRYPTOBOX_MODE_25519);
+
+ if (bk->trusted_pubkey == NULL) {
+ msg_err_config("cannot read pubkey from map: %s",
+ map_line);
+ return NULL;
+ }
+ pos += 64;
+ }
+ else {
+ msg_err_config("cannot read pubkey from map: %s",
+ map_line);
+ return NULL;
+ }
+
+ if (*pos == '+' || *pos == ':') {
+ pos++;
+ }
+ }
+ else {
+ /* No known flags */
+ break;
+ }
+ }
+
+ bk->protocol = MAP_PROTO_FILE;
+
+ if (g_ascii_strncasecmp(pos, "http://", sizeof("http://") - 1) == 0) {
+ bk->protocol = MAP_PROTO_HTTP;
+ /* Include http:// */
+ bk->uri = g_strdup(pos);
+ pos += sizeof("http://") - 1;
+ }
+ else if (g_ascii_strncasecmp(pos, "https://", sizeof("https://") - 1) == 0) {
+ bk->protocol = MAP_PROTO_HTTPS;
+ /* Include https:// */
+ bk->uri = g_strdup(pos);
+ pos += sizeof("https://") - 1;
+ }
+ else if (g_ascii_strncasecmp(pos, "file://", sizeof("file://") - 1) == 0) {
+ pos += sizeof("file://") - 1;
+ /* Exclude file:// */
+ bk->uri = g_strdup(pos);
+ }
+ else if (*pos == '/') {
+ /* Trivial file case */
+ bk->uri = g_strdup(pos);
+ }
+ else {
+ msg_err_config("invalid map fetching protocol: %s", map_line);
+
+ return NULL;
+ }
+
+ if (bk->protocol != MAP_PROTO_FILE && bk->is_signed) {
+ msg_err_config("signed maps are no longer supported for HTTP(s): %s", map_line);
+ }
+
+ return pos;
+}
+
+gboolean
+rspamd_map_is_map(const gchar *map_line)
+{
+ gboolean ret = FALSE;
+
+ g_assert(map_line != NULL);
+
+ if (map_line[0] == '/') {
+ ret = TRUE;
+ }
+ else if (g_ascii_strncasecmp(map_line, "sign+", sizeof("sign+") - 1) == 0) {
+ ret = TRUE;
+ }
+ else if (g_ascii_strncasecmp(map_line, "fallback+", sizeof("fallback+") - 1) == 0) {
+ ret = TRUE;
+ }
+ else if (g_ascii_strncasecmp(map_line, "file://", sizeof("file://") - 1) == 0) {
+ ret = TRUE;
+ }
+ else if (g_ascii_strncasecmp(map_line, "http://", sizeof("http://") - 1) == 0) {
+ ret = TRUE;
+ }
+ else if (g_ascii_strncasecmp(map_line, "https://", sizeof("https://") - 1) == 0) {
+ ret = TRUE;
+ }
+
+ return ret;
+}
+
+static void
+rspamd_map_backend_dtor(struct rspamd_map_backend *bk)
+{
+ switch (bk->protocol) {
+ case MAP_PROTO_FILE:
+ if (bk->data.fd) {
+ ev_stat_stop(bk->event_loop, &bk->data.fd->st_ev);
+ g_free(bk->data.fd->filename);
+ g_free(bk->data.fd);
+ }
+ break;
+ case MAP_PROTO_STATIC:
+ if (bk->data.sd) {
+ if (bk->data.sd->data) {
+ g_free(bk->data.sd->data);
+ }
+
+ g_free(bk->data.sd);
+ }
+ break;
+ case MAP_PROTO_HTTP:
+ case MAP_PROTO_HTTPS:
+ if (bk->data.hd) {
+ struct http_map_data *data = bk->data.hd;
+
+ g_free(data->host);
+ g_free(data->path);
+ g_free(data->rest);
+
+ if (data->userinfo) {
+ g_free(data->userinfo);
+ }
+
+ if (data->etag) {
+ rspamd_fstring_free(data->etag);
+ }
+
+ /*
+ * Clear cached file, but check if a worker is an active http worker
+ * as cur_cache_cbd is meaningful merely for active worker, who actually
+ * owns the cache
+ */
+ if (bk->map && bk->map->active_http) {
+ if (g_atomic_int_compare_and_exchange(&data->cache->available, 1, 0)) {
+ if (data->cur_cache_cbd) {
+ msg_info("clear shared memory cache for a map in %s as backend \"%s\" is closing",
+ data->cur_cache_cbd->shm->shm_name,
+ bk->uri);
+ MAP_RELEASE(data->cur_cache_cbd->shm,
+ "rspamd_http_map_cached_cbdata");
+ ev_timer_stop(data->cur_cache_cbd->event_loop,
+ &data->cur_cache_cbd->timeout);
+ g_free(data->cur_cache_cbd);
+ data->cur_cache_cbd = NULL;
+ }
+ }
+ }
+
+ g_free(bk->data.hd);
+ }
+ break;
+ }
+
+ if (bk->trusted_pubkey) {
+ rspamd_pubkey_unref(bk->trusted_pubkey);
+ }
+
+ g_free(bk->uri);
+ g_free(bk);
+}
+
+static struct rspamd_map_backend *
+rspamd_map_parse_backend(struct rspamd_config *cfg, const gchar *map_line)
+{
+ struct rspamd_map_backend *bk;
+ struct file_map_data *fdata = NULL;
+ struct http_map_data *hdata = NULL;
+ struct static_map_data *sdata = NULL;
+ struct http_parser_url up;
+ const gchar *end, *p;
+ rspamd_ftok_t tok;
+
+ bk = g_malloc0(sizeof(*bk));
+ REF_INIT_RETAIN(bk, rspamd_map_backend_dtor);
+
+ if (!rspamd_map_check_proto(cfg, map_line, bk)) {
+ goto err;
+ }
+
+ if (bk->is_fallback && bk->protocol != MAP_PROTO_FILE) {
+ msg_err_config("fallback backend must be file for %s", bk->uri);
+
+ goto err;
+ }
+
+ end = map_line + strlen(map_line);
+ if (end - map_line > 5) {
+ p = end - 5;
+ if (g_ascii_strcasecmp(p, ".zstd") == 0) {
+ bk->is_compressed = TRUE;
+ }
+ p = end - 4;
+ if (g_ascii_strcasecmp(p, ".zst") == 0) {
+ bk->is_compressed = TRUE;
+ }
+ }
+
+ /* Now check for each proto separately */
+ if (bk->protocol == MAP_PROTO_FILE) {
+ fdata = g_malloc0(sizeof(struct file_map_data));
+
+ if (access(bk->uri, R_OK) == -1) {
+ if (errno != ENOENT) {
+ msg_err_config("cannot open file '%s': %s", bk->uri, strerror(errno));
+ goto err;
+ }
+
+ msg_info_config(
+ "map '%s' is not found, but it can be loaded automatically later",
+ bk->uri);
+ }
+
+ fdata->filename = g_strdup(bk->uri);
+ bk->data.fd = fdata;
+ }
+ else if (bk->protocol == MAP_PROTO_HTTP || bk->protocol == MAP_PROTO_HTTPS) {
+ hdata = g_malloc0(sizeof(struct http_map_data));
+
+ memset(&up, 0, sizeof(up));
+ if (http_parser_parse_url(bk->uri, strlen(bk->uri), FALSE,
+ &up) != 0) {
+ msg_err_config("cannot parse HTTP url: %s", bk->uri);
+ goto err;
+ }
+ else {
+ if (!(up.field_set & 1u << UF_HOST)) {
+ msg_err_config("cannot parse HTTP url: %s: no host", bk->uri);
+ goto err;
+ }
+
+ tok.begin = bk->uri + up.field_data[UF_HOST].off;
+ tok.len = up.field_data[UF_HOST].len;
+ hdata->host = rspamd_ftokdup(&tok);
+
+ if (up.field_set & (1u << UF_PORT)) {
+ hdata->port = up.port;
+ }
+ else {
+ if (bk->protocol == MAP_PROTO_HTTP) {
+ hdata->port = 80;
+ }
+ else {
+ hdata->port = 443;
+ }
+ }
+
+ if (up.field_set & (1u << UF_PATH)) {
+ tok.begin = bk->uri + up.field_data[UF_PATH].off;
+ tok.len = up.field_data[UF_PATH].len;
+
+ hdata->path = rspamd_ftokdup(&tok);
+
+ /* We also need to check query + fragment */
+ if (up.field_set & ((1u << UF_QUERY) | (1u << UF_FRAGMENT))) {
+ tok.begin = bk->uri + up.field_data[UF_PATH].off +
+ up.field_data[UF_PATH].len;
+ tok.len = strlen(tok.begin);
+ hdata->rest = rspamd_ftokdup(&tok);
+ }
+ else {
+ hdata->rest = g_strdup("");
+ }
+ }
+
+ if (up.field_set & (1u << UF_USERINFO)) {
+ /* Create authorisation header for basic auth */
+ guint len = sizeof("Basic ") +
+ up.field_data[UF_USERINFO].len * 8 / 5 + 4;
+ hdata->userinfo = g_malloc(len);
+ rspamd_snprintf(hdata->userinfo, len, "Basic %*Bs",
+ (int) up.field_data[UF_USERINFO].len,
+ bk->uri + up.field_data[UF_USERINFO].off);
+
+ msg_debug("added userinfo for the map from the URL: %s", hdata->host);
+ }
+ else {
+ /* Try to obtain authentication data from options in the configuration */
+ const ucl_object_t *auth_obj, *opts_obj;
+
+ opts_obj = ucl_object_lookup(cfg->cfg_ucl_obj, "options");
+ if (opts_obj != NULL) {
+ auth_obj = ucl_object_lookup(opts_obj, "http_auth");
+ if (auth_obj != NULL && ucl_object_type(auth_obj) == UCL_OBJECT) {
+ const ucl_object_t *host_obj;
+
+ /*
+ * Search first by the full URL and then by the host part
+ */
+ host_obj = ucl_object_lookup(auth_obj, map_line);
+
+ if (host_obj == NULL) {
+ host_obj = ucl_object_lookup(auth_obj, hdata->host);
+ }
+
+ if (host_obj != NULL && ucl_object_type(host_obj) == UCL_OBJECT) {
+ const ucl_object_t *user_obj, *password_obj;
+
+ user_obj = ucl_object_lookup(host_obj, "user");
+ password_obj = ucl_object_lookup(host_obj, "password");
+
+ if (user_obj != NULL && password_obj != NULL &&
+ ucl_object_type(user_obj) == UCL_STRING &&
+ ucl_object_type(password_obj) == UCL_STRING) {
+
+ gchar *tmpbuf;
+ unsigned tlen;
+
+ /* User + password + ':' */
+ tlen = strlen(ucl_object_tostring(user_obj)) +
+ strlen(ucl_object_tostring(password_obj)) + 1;
+ tmpbuf = g_malloc(tlen + 1);
+ rspamd_snprintf(tmpbuf, tlen + 1, "%s:%s",
+ ucl_object_tostring(user_obj),
+ ucl_object_tostring(password_obj));
+ /* Base64 encoding is not so greedy, but we add some space for simplicity */
+ tlen *= 2;
+ tlen += sizeof("Basic ") - 1;
+ hdata->userinfo = g_malloc(tlen + 1);
+ rspamd_snprintf(hdata->userinfo, tlen + 1, "Basic %Bs", tmpbuf);
+ g_free(tmpbuf);
+ msg_debug("added userinfo for the map from the configuration: %s", map_line);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ hdata->cache = rspamd_mempool_alloc0_shared(cfg->cfg_pool,
+ sizeof(*hdata->cache));
+
+ bk->data.hd = hdata;
+ }
+ else if (bk->protocol == MAP_PROTO_STATIC) {
+ sdata = g_malloc0(sizeof(*sdata));
+ bk->data.sd = sdata;
+ }
+
+ bk->id = rspamd_cryptobox_fast_hash_specific(RSPAMD_CRYPTOBOX_T1HA,
+ bk->uri, strlen(bk->uri),
+ 0xdeadbabe);
+
+ return bk;
+
+err:
+ MAP_RELEASE(bk, "rspamd_map_backend");
+
+ if (hdata) {
+ g_free(hdata);
+ }
+
+ if (fdata) {
+ g_free(fdata);
+ }
+
+ if (sdata) {
+ g_free(sdata);
+ }
+
+ return NULL;
+}
+
+static void
+rspamd_map_calculate_hash(struct rspamd_map *map)
+{
+ struct rspamd_map_backend *bk;
+ guint i;
+ rspamd_cryptobox_hash_state_t st;
+ gchar *cksum_encoded, cksum[rspamd_cryptobox_HASHBYTES];
+
+ rspamd_cryptobox_hash_init(&st, NULL, 0);
+
+ for (i = 0; i < map->backends->len; i++) {
+ bk = g_ptr_array_index(map->backends, i);
+ rspamd_cryptobox_hash_update(&st, bk->uri, strlen(bk->uri));
+ }
+
+ rspamd_cryptobox_hash_final(&st, cksum);
+ cksum_encoded = rspamd_encode_base32(cksum, sizeof(cksum), RSPAMD_BASE32_DEFAULT);
+ rspamd_strlcpy(map->tag, cksum_encoded, sizeof(map->tag));
+ g_free(cksum_encoded);
+}
+
+static gboolean
+rspamd_map_add_static_string(struct rspamd_config *cfg,
+ const ucl_object_t *elt,
+ GString *target)
+{
+ gsize sz;
+ const gchar *dline;
+
+ if (ucl_object_type(elt) != UCL_STRING) {
+ msg_err_config("map has static backend but `data` is "
+ "not string like: %s",
+ ucl_object_type_to_string(elt->type));
+ return FALSE;
+ }
+
+ /* Otherwise, we copy data to the backend */
+ dline = ucl_object_tolstring(elt, &sz);
+
+ if (sz == 0) {
+ msg_err_config("map has static backend but empty no data");
+ return FALSE;
+ }
+
+ g_string_append_len(target, dline, sz);
+ g_string_append_c(target, '\n');
+
+ return TRUE;
+}
+
+struct rspamd_map *
+rspamd_map_add(struct rspamd_config *cfg,
+ const gchar *map_line,
+ const gchar *description,
+ map_cb_t read_callback,
+ map_fin_cb_t fin_callback,
+ map_dtor_t dtor,
+ void **user_data,
+ struct rspamd_worker *worker,
+ int flags)
+{
+ struct rspamd_map *map;
+ struct rspamd_map_backend *bk;
+
+ bk = rspamd_map_parse_backend(cfg, map_line);
+ if (bk == NULL) {
+ return NULL;
+ }
+
+ if (bk->is_fallback) {
+ msg_err_config("cannot add map with fallback only backend: %s", bk->uri);
+ REF_RELEASE(bk);
+
+ return NULL;
+ }
+
+ map = rspamd_mempool_alloc0(cfg->cfg_pool, sizeof(struct rspamd_map));
+ map->read_callback = read_callback;
+ map->fin_callback = fin_callback;
+ map->dtor = dtor;
+ map->user_data = user_data;
+ map->cfg = cfg;
+ map->id = rspamd_random_uint64_fast();
+ map->locked =
+ rspamd_mempool_alloc0_shared(cfg->cfg_pool, sizeof(gint));
+ map->backends = g_ptr_array_sized_new(1);
+ map->wrk = worker;
+ rspamd_mempool_add_destructor(cfg->cfg_pool, rspamd_ptr_array_free_hard,
+ map->backends);
+ g_ptr_array_add(map->backends, bk);
+ map->name = rspamd_mempool_strdup(cfg->cfg_pool, map_line);
+ map->no_file_read = (flags & RSPAMD_MAP_FILE_NO_READ);
+
+ if (bk->protocol == MAP_PROTO_FILE) {
+ map->poll_timeout = (cfg->map_timeout * cfg->map_file_watch_multiplier);
+ }
+ else {
+ map->poll_timeout = cfg->map_timeout;
+ }
+
+ if (description != NULL) {
+ map->description = rspamd_mempool_strdup(cfg->cfg_pool, description);
+ }
+
+ rspamd_map_calculate_hash(map);
+ msg_info_map("added map %s", bk->uri);
+ bk->map = map;
+
+ cfg->maps = g_list_prepend(cfg->maps, map);
+
+ return map;
+}
+
+struct rspamd_map *
+rspamd_map_add_fake(struct rspamd_config *cfg,
+ const gchar *description,
+ const gchar *name)
+{
+ struct rspamd_map *map;
+
+ map = rspamd_mempool_alloc0(cfg->cfg_pool, sizeof(struct rspamd_map));
+ map->cfg = cfg;
+ map->id = rspamd_random_uint64_fast();
+ map->name = rspamd_mempool_strdup(cfg->cfg_pool, name);
+ map->user_data = (void **) &map; /* to prevent null pointer dereferencing */
+
+ if (description != NULL) {
+ map->description = rspamd_mempool_strdup(cfg->cfg_pool, description);
+ }
+
+ return map;
+}
+
+static inline void
+rspamd_map_add_backend(struct rspamd_map *map, struct rspamd_map_backend *bk)
+{
+ if (bk->is_fallback) {
+ if (map->fallback_backend) {
+ msg_warn_map("redefining fallback backend from %s to %s",
+ map->fallback_backend->uri, bk->uri);
+ }
+
+ map->fallback_backend = bk;
+ }
+ else {
+ g_ptr_array_add(map->backends, bk);
+ }
+
+ bk->map = map;
+}
+
+struct rspamd_map *
+rspamd_map_add_from_ucl(struct rspamd_config *cfg,
+ const ucl_object_t *obj,
+ const gchar *description,
+ map_cb_t read_callback,
+ map_fin_cb_t fin_callback,
+ map_dtor_t dtor,
+ void **user_data,
+ struct rspamd_worker *worker,
+ gint flags)
+{
+ ucl_object_iter_t it = NULL;
+ const ucl_object_t *cur, *elt;
+ struct rspamd_map *map;
+ struct rspamd_map_backend *bk;
+ guint i;
+
+ g_assert(obj != NULL);
+
+ if (ucl_object_type(obj) == UCL_STRING) {
+ /* Just a plain string */
+ return rspamd_map_add(cfg, ucl_object_tostring(obj), description,
+ read_callback, fin_callback, dtor, user_data, worker, flags);
+ }
+
+ map = rspamd_mempool_alloc0(cfg->cfg_pool, sizeof(struct rspamd_map));
+ map->read_callback = read_callback;
+ map->fin_callback = fin_callback;
+ map->dtor = dtor;
+ map->user_data = user_data;
+ map->cfg = cfg;
+ map->id = rspamd_random_uint64_fast();
+ map->locked =
+ rspamd_mempool_alloc0_shared(cfg->cfg_pool, sizeof(gint));
+ map->backends = g_ptr_array_new();
+ map->wrk = worker;
+ map->no_file_read = (flags & RSPAMD_MAP_FILE_NO_READ);
+ rspamd_mempool_add_destructor(cfg->cfg_pool, rspamd_ptr_array_free_hard,
+ map->backends);
+ map->poll_timeout = cfg->map_timeout;
+
+ if (description) {
+ map->description = rspamd_mempool_strdup(cfg->cfg_pool, description);
+ }
+
+ if (ucl_object_type(obj) == UCL_ARRAY) {
+ /* Add array of maps as multiple backends */
+ while ((cur = ucl_object_iterate(obj, &it, true)) != NULL) {
+ if (ucl_object_type(cur) == UCL_STRING) {
+ bk = rspamd_map_parse_backend(cfg, ucl_object_tostring(cur));
+
+ if (bk != NULL) {
+ rspamd_map_add_backend(map, bk);
+
+ if (!map->name) {
+ map->name = rspamd_mempool_strdup(cfg->cfg_pool,
+ ucl_object_tostring(cur));
+ }
+ }
+ }
+ else {
+ msg_err_config("bad map element type: %s",
+ ucl_object_type_to_string(ucl_object_type(cur)));
+ }
+ }
+
+ if (map->backends->len == 0) {
+ msg_err_config("map has no urls to be loaded: empty list");
+ goto err;
+ }
+ }
+ else if (ucl_object_type(obj) == UCL_OBJECT) {
+ elt = ucl_object_lookup(obj, "name");
+ if (elt && ucl_object_type(elt) == UCL_STRING) {
+ map->name = rspamd_mempool_strdup(cfg->cfg_pool,
+ ucl_object_tostring(elt));
+ }
+
+ elt = ucl_object_lookup(obj, "description");
+ if (elt && ucl_object_type(elt) == UCL_STRING) {
+ map->description = rspamd_mempool_strdup(cfg->cfg_pool,
+ ucl_object_tostring(elt));
+ }
+
+ elt = ucl_object_lookup_any(obj, "timeout", "poll", "poll_time",
+ "watch_interval", NULL);
+ if (elt) {
+ map->poll_timeout = ucl_object_todouble(elt);
+ }
+
+ elt = ucl_object_lookup_any(obj, "upstreams", "url", "urls", NULL);
+ if (elt == NULL) {
+ msg_err_config("map has no urls to be loaded: no elt");
+ goto err;
+ }
+
+ if (ucl_object_type(elt) == UCL_ARRAY) {
+ /* Add array of maps as multiple backends */
+ it = ucl_object_iterate_new(elt);
+
+ while ((cur = ucl_object_iterate_safe(it, true)) != NULL) {
+ if (ucl_object_type(cur) == UCL_STRING) {
+ bk = rspamd_map_parse_backend(cfg, ucl_object_tostring(cur));
+
+ if (bk != NULL) {
+ rspamd_map_add_backend(map, bk);
+
+ if (!map->name) {
+ map->name = rspamd_mempool_strdup(cfg->cfg_pool,
+ ucl_object_tostring(cur));
+ }
+ }
+ }
+ else {
+ msg_err_config("bad map element type: %s",
+ ucl_object_type_to_string(ucl_object_type(cur)));
+ ucl_object_iterate_free(it);
+ goto err;
+ }
+ }
+
+ ucl_object_iterate_free(it);
+
+ if (map->backends->len == 0) {
+ msg_err_config("map has no urls to be loaded: empty object list");
+ goto err;
+ }
+ }
+ else if (ucl_object_type(elt) == UCL_STRING) {
+ bk = rspamd_map_parse_backend(cfg, ucl_object_tostring(elt));
+
+ if (bk != NULL) {
+ rspamd_map_add_backend(map, bk);
+
+ if (!map->name) {
+ map->name = rspamd_mempool_strdup(cfg->cfg_pool,
+ ucl_object_tostring(elt));
+ }
+ }
+ }
+
+ if (!map->backends || map->backends->len == 0) {
+ msg_err_config("map has no urls to be loaded: no valid backends");
+ goto err;
+ }
+ }
+ else {
+ msg_err_config("map has invalid type for value: %s",
+ ucl_object_type_to_string(ucl_object_type(obj)));
+ goto err;
+ }
+
+ gboolean all_local = TRUE;
+
+ PTR_ARRAY_FOREACH(map->backends, i, bk)
+ {
+ if (bk->protocol == MAP_PROTO_STATIC) {
+ GString *map_data;
+ /* We need data field in ucl */
+ elt = ucl_object_lookup(obj, "data");
+
+ if (elt == NULL) {
+ msg_err_config("map has static backend but no `data` field");
+ goto err;
+ }
+
+
+ if (ucl_object_type(elt) == UCL_STRING) {
+ map_data = g_string_sized_new(32);
+
+ if (rspamd_map_add_static_string(cfg, elt, map_data)) {
+ bk->data.sd->data = map_data->str;
+ bk->data.sd->len = map_data->len;
+ g_string_free(map_data, FALSE);
+ }
+ else {
+ g_string_free(map_data, TRUE);
+ msg_err_config("map has static backend with invalid `data` field");
+ goto err;
+ }
+ }
+ else if (ucl_object_type(elt) == UCL_ARRAY) {
+ map_data = g_string_sized_new(32);
+ it = ucl_object_iterate_new(elt);
+
+ while ((cur = ucl_object_iterate_safe(it, true))) {
+ if (!rspamd_map_add_static_string(cfg, cur, map_data)) {
+ g_string_free(map_data, TRUE);
+ msg_err_config("map has static backend with invalid "
+ "`data` field");
+ ucl_object_iterate_free(it);
+ goto err;
+ }
+ }
+
+ ucl_object_iterate_free(it);
+ bk->data.sd->data = map_data->str;
+ bk->data.sd->len = map_data->len;
+ g_string_free(map_data, FALSE);
+ }
+ }
+ else if (bk->protocol != MAP_PROTO_FILE) {
+ all_local = FALSE;
+ }
+ }
+
+ if (all_local) {
+ map->poll_timeout = (map->poll_timeout *
+ cfg->map_file_watch_multiplier);
+ }
+
+ rspamd_map_calculate_hash(map);
+ msg_debug_map("added map from ucl");
+
+ cfg->maps = g_list_prepend(cfg->maps, map);
+
+ return map;
+
+err:
+
+ if (map) {
+ PTR_ARRAY_FOREACH(map->backends, i, bk)
+ {
+ MAP_RELEASE(bk, "rspamd_map_backend");
+ }
+ }
+
+ return NULL;
+}
+
+rspamd_map_traverse_function
+rspamd_map_get_traverse_function(struct rspamd_map *map)
+{
+ if (map) {
+ return map->traverse_function;
+ }
+
+ return NULL;
+}
+
+void rspamd_map_traverse(struct rspamd_map *map, rspamd_map_traverse_cb cb,
+ gpointer cbdata, gboolean reset_hits)
+{
+ if (*map->user_data && map->traverse_function) {
+ map->traverse_function(*map->user_data, cb, cbdata, reset_hits);
+ }
+}
+
+void rspamd_map_set_on_load_function(struct rspamd_map *map, rspamd_map_on_load_function cb,
+ gpointer cbdata, GDestroyNotify dtor)
+{
+ if (map) {
+ map->on_load_function = cb;
+ map->on_load_ud = cbdata;
+ map->on_load_ud_dtor = dtor;
+ }
+}