diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-10 21:30:40 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-10 21:30:40 +0000 |
commit | 133a45c109da5310add55824db21af5239951f93 (patch) | |
tree | ba6ac4c0a950a0dda56451944315d66409923918 /src/libserver/maps/map.c | |
parent | Initial commit. (diff) | |
download | rspamd-133a45c109da5310add55824db21af5239951f93.tar.xz rspamd-133a45c109da5310add55824db21af5239951f93.zip |
Adding upstream version 3.8.1.upstream/3.8.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/libserver/maps/map.c')
-rw-r--r-- | src/libserver/maps/map.c | 3195 |
1 files changed, 3195 insertions, 0 deletions
diff --git a/src/libserver/maps/map.c b/src/libserver/maps/map.c new file mode 100644 index 0000000..7f6a48f --- /dev/null +++ b/src/libserver/maps/map.c @@ -0,0 +1,3195 @@ +/* + * Copyright 2023 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Implementation of map files handling + */ + +#include "config.h" +#include "map.h" +#include "map_private.h" +#include "libserver/http/http_connection.h" +#include "libserver/http/http_private.h" +#include "rspamd.h" +#include "contrib/libev/ev.h" +#include "contrib/uthash/utlist.h" + +#ifdef SYS_ZSTD +#include "zstd.h" +#else +#include "contrib/zstd/zstd.h" +#endif + +#undef MAP_DEBUG_REFS +#ifdef MAP_DEBUG_REFS +#define MAP_RETAIN(x, t) \ + do { \ + msg_err(G_GNUC_PRETTY_FUNCTION ": " t ": retain ref %p, refcount: %d -> %d", (x), (x)->ref.refcount, (x)->ref.refcount + 1); \ + REF_RETAIN(x); \ + } while (0) + +#define MAP_RELEASE(x, t) \ + do { \ + msg_err(G_GNUC_PRETTY_FUNCTION ": " t ": release ref %p, refcount: %d -> %d", (x), (x)->ref.refcount, (x)->ref.refcount - 1); \ + REF_RELEASE(x); \ + } while (0) +#else +#define MAP_RETAIN(x, t) REF_RETAIN(x) +#define MAP_RELEASE(x, t) REF_RELEASE(x) +#endif + +enum rspamd_map_periodic_opts { + RSPAMD_MAP_SCHEDULE_NORMAL = 0, + RSPAMD_MAP_SCHEDULE_ERROR = (1u << 0u), + RSPAMD_MAP_SCHEDULE_LOCKED = (1u << 1u), + RSPAMD_MAP_SCHEDULE_INIT = (1u << 2u), +}; + +static void free_http_cbdata_common(struct http_callback_data *cbd, + gboolean plan_new); +static void free_http_cbdata_dtor(gpointer p); +static void free_http_cbdata(struct http_callback_data *cbd); +static void rspamd_map_process_periodic(struct map_periodic_cbdata *cbd); +static void rspamd_map_schedule_periodic(struct rspamd_map *map, int how); +static gboolean read_map_file_chunks(struct rspamd_map *map, + struct map_cb_data *cbdata, + const gchar *fname, + gsize len, + goffset off); +static gboolean rspamd_map_save_http_cached_file(struct rspamd_map *map, + struct rspamd_map_backend *bk, + struct http_map_data *htdata, + const guchar *data, + gsize len); +static gboolean rspamd_map_update_http_cached_file(struct rspamd_map *map, + struct rspamd_map_backend *bk, + struct http_map_data *htdata); + +guint rspamd_map_log_id = (guint) -1; +RSPAMD_CONSTRUCTOR(rspamd_map_log_init) +{ + rspamd_map_log_id = rspamd_logger_add_debug_module("map"); +} + +/** + * Write HTTP request + */ +static void +write_http_request(struct http_callback_data *cbd) +{ + gchar datebuf[128]; + struct rspamd_http_message *msg; + + msg = rspamd_http_new_message(HTTP_REQUEST); + if (cbd->check) { + msg->method = HTTP_HEAD; + } + + msg->url = rspamd_fstring_append(msg->url, + cbd->data->path, strlen(cbd->data->path)); + + if (cbd->check) { + if (cbd->data->last_modified != 0) { + rspamd_http_date_format(datebuf, sizeof(datebuf), + cbd->data->last_modified); + rspamd_http_message_add_header(msg, "If-Modified-Since", + datebuf); + } + if (cbd->data->etag) { + rspamd_http_message_add_header_len(msg, "If-None-Match", + cbd->data->etag->str, cbd->data->etag->len); + } + } + + msg->url = rspamd_fstring_append(msg->url, cbd->data->rest, + strlen(cbd->data->rest)); + + if (cbd->data->userinfo) { + rspamd_http_message_add_header(msg, "Authorization", + cbd->data->userinfo); + } + + MAP_RETAIN(cbd, "http_callback_data"); + rspamd_http_connection_write_message(cbd->conn, + msg, + cbd->data->host, + NULL, + cbd, + cbd->timeout); +} + +/** + * Callback for destroying HTTP callback data + */ +static void +free_http_cbdata_common(struct http_callback_data *cbd, gboolean plan_new) +{ + struct map_periodic_cbdata *periodic = cbd->periodic; + + if (cbd->shmem_data) { + rspamd_http_message_shmem_unref(cbd->shmem_data); + } + + if (cbd->pk) { + rspamd_pubkey_unref(cbd->pk); + } + + if (cbd->conn) { + rspamd_http_connection_unref(cbd->conn); + cbd->conn = NULL; + } + + if (cbd->addrs) { + rspamd_inet_addr_t *addr; + guint i; + + PTR_ARRAY_FOREACH(cbd->addrs, i, addr) + { + rspamd_inet_address_free(addr); + } + + g_ptr_array_free(cbd->addrs, TRUE); + } + + + MAP_RELEASE(cbd->bk, "rspamd_map_backend"); + + if (periodic) { + /* Detached in case of HTTP error */ + MAP_RELEASE(periodic, "periodic"); + } + + g_free(cbd); +} + +static void +free_http_cbdata(struct http_callback_data *cbd) +{ + cbd->map->tmp_dtor = NULL; + cbd->map->tmp_dtor_data = NULL; + + free_http_cbdata_common(cbd, TRUE); +} + +static void +free_http_cbdata_dtor(gpointer p) +{ + struct http_callback_data *cbd = p; + struct rspamd_map *map; + + map = cbd->map; + if (cbd->stage == http_map_http_conn) { + REF_RELEASE(cbd); + } + else { + /* We cannot terminate DNS requests sent */ + cbd->stage = http_map_terminated; + } + + msg_warn_map("%s: " + "connection with http server is terminated: worker is stopping", + map->name); +} + +/* + * HTTP callbacks + */ +static void +http_map_error(struct rspamd_http_connection *conn, + GError *err) +{ + struct http_callback_data *cbd = conn->ud; + struct rspamd_map *map; + + map = cbd->map; + + if (cbd->periodic) { + cbd->periodic->errored = TRUE; + msg_err_map("error reading %s(%s): " + "connection with http server terminated incorrectly: %e", + cbd->bk->uri, + cbd->addr ? rspamd_inet_address_to_string_pretty(cbd->addr) : "", + err); + + rspamd_map_process_periodic(cbd->periodic); + } + + MAP_RELEASE(cbd, "http_callback_data"); +} + +static void +rspamd_map_cache_cb(struct ev_loop *loop, ev_timer *w, int revents) +{ + struct rspamd_http_map_cached_cbdata *cache_cbd = (struct rspamd_http_map_cached_cbdata *) + w->data; + struct rspamd_map *map; + struct http_map_data *data; + + map = cache_cbd->map; + data = cache_cbd->data; + + if (cache_cbd->gen != cache_cbd->data->gen) { + /* We have another update, so this cache element is obviously expired */ + /* + * Important!: we do not set cache availability to zero here, as there + * might be fresh cache + */ + msg_info_map("cached data is now expired (gen mismatch %L != %L) for %s; shm name=%s; refcount=%d", + cache_cbd->gen, cache_cbd->data->gen, map->name, cache_cbd->shm->shm_name, + cache_cbd->shm->ref.refcount); + MAP_RELEASE(cache_cbd->shm, "rspamd_http_map_cached_cbdata"); + ev_timer_stop(loop, &cache_cbd->timeout); + g_free(cache_cbd); + } + else if (cache_cbd->data->last_checked >= cache_cbd->last_checked) { + /* + * We checked map but we have not found anything more recent, + * reschedule cache check + */ + if (cache_cbd->map->poll_timeout > + rspamd_get_calendar_ticks() - cache_cbd->data->last_checked) { + w->repeat = cache_cbd->map->poll_timeout - + (rspamd_get_calendar_ticks() - cache_cbd->data->last_checked); + } + else { + w->repeat = cache_cbd->map->poll_timeout; + } + + if (w->repeat < 0) { + msg_info_map("cached data for %s has skewed check time: %d last checked, " + "%d poll timeout, %.2f diff; shm name=%s; refcount=%d", + map->name, (int) cache_cbd->data->last_checked, + (int) cache_cbd->map->poll_timeout, + (rspamd_get_calendar_ticks() - cache_cbd->data->last_checked), + cache_cbd->shm->shm_name, + cache_cbd->shm->ref.refcount); + w->repeat = 0.0; + } + + cache_cbd->last_checked = cache_cbd->data->last_checked; + msg_debug_map("cached data is up to date for %s", map->name); + ev_timer_again(loop, &cache_cbd->timeout); + } + else { + data->cur_cache_cbd = NULL; + g_atomic_int_set(&data->cache->available, 0); + msg_info_map("cached data is now expired for %s; shm name=%s; refcount=%d", + map->name, + cache_cbd->shm->shm_name, + cache_cbd->shm->ref.refcount); + MAP_RELEASE(cache_cbd->shm, "rspamd_http_map_cached_cbdata"); + ev_timer_stop(loop, &cache_cbd->timeout); + g_free(cache_cbd); + } +} + +static int +http_map_finish(struct rspamd_http_connection *conn, + struct rspamd_http_message *msg) +{ + struct http_callback_data *cbd = conn->ud; + struct rspamd_map *map; + struct rspamd_map_backend *bk; + struct http_map_data *data; + struct rspamd_http_map_cached_cbdata *cache_cbd; + const rspamd_ftok_t *expires_hdr, *etag_hdr; + char next_check_date[128]; + guchar *in = NULL; + gsize dlen = 0; + + map = cbd->map; + bk = cbd->bk; + data = bk->data.hd; + + if (msg->code == 200) { + + if (cbd->check) { + msg_info_map("need to reread map from %s", cbd->bk->uri); + cbd->periodic->need_modify = TRUE; + /* Reset the whole chain */ + cbd->periodic->cur_backend = 0; + /* Reset cache, old cached data will be cleaned on timeout */ + g_atomic_int_set(&data->cache->available, 0); + data->cur_cache_cbd = NULL; + + rspamd_map_process_periodic(cbd->periodic); + MAP_RELEASE(cbd, "http_callback_data"); + + return 0; + } + + cbd->data->last_checked = msg->date; + + if (msg->last_modified) { + cbd->data->last_modified = msg->last_modified; + } + else { + cbd->data->last_modified = msg->date; + } + + + /* Unsigned version - just open file */ + cbd->shmem_data = rspamd_http_message_shmem_ref(msg); + cbd->data_len = msg->body_buf.len; + + if (cbd->data_len == 0) { + msg_err_map("cannot read empty map"); + goto err; + } + + g_assert(cbd->shmem_data != NULL); + + in = rspamd_shmem_xmap(cbd->shmem_data->shm_name, PROT_READ, &dlen); + + if (in == NULL) { + msg_err_map("cannot read tempfile %s: %s", + cbd->shmem_data->shm_name, + strerror(errno)); + goto err; + } + + /* Check for expires */ + double cached_timeout = map->poll_timeout * 2; + + expires_hdr = rspamd_http_message_find_header(msg, "Expires"); + + if (expires_hdr) { + time_t hdate; + + hdate = rspamd_http_parse_date(expires_hdr->begin, expires_hdr->len); + + if (hdate != (time_t) -1 && hdate > msg->date) { + cached_timeout = map->next_check - msg->date + + map->poll_timeout * 2; + + map->next_check = hdate; + } + else { + msg_info_map("invalid expires header: %T, ignore it", expires_hdr); + map->next_check = 0; + } + } + + /* Check for etag */ + etag_hdr = rspamd_http_message_find_header(msg, "ETag"); + + if (etag_hdr) { + if (cbd->data->etag) { + /* Remove old etag */ + rspamd_fstring_free(cbd->data->etag); + } + + cbd->data->etag = rspamd_fstring_new_init(etag_hdr->begin, + etag_hdr->len); + } + else { + if (cbd->data->etag) { + /* Remove and clear old etag */ + rspamd_fstring_free(cbd->data->etag); + cbd->data->etag = NULL; + } + } + + MAP_RETAIN(cbd->shmem_data, "shmem_data"); + cbd->data->gen++; + /* + * We know that a map is in the locked state + */ + g_atomic_int_set(&data->cache->available, 1); + /* Store cached data */ + rspamd_strlcpy(data->cache->shmem_name, cbd->shmem_data->shm_name, + sizeof(data->cache->shmem_name)); + data->cache->len = cbd->data_len; + data->cache->last_modified = cbd->data->last_modified; + cache_cbd = g_malloc0(sizeof(*cache_cbd)); + cache_cbd->shm = cbd->shmem_data; + cache_cbd->event_loop = cbd->event_loop; + cache_cbd->map = map; + cache_cbd->data = cbd->data; + cache_cbd->last_checked = cbd->data->last_checked; + cache_cbd->gen = cbd->data->gen; + MAP_RETAIN(cache_cbd->shm, "shmem_data"); + msg_info_map("stored map data in a shared memory cache: %s", + cache_cbd->shm->shm_name); + + ev_timer_init(&cache_cbd->timeout, rspamd_map_cache_cb, cached_timeout, + 0.0); + ev_timer_start(cbd->event_loop, &cache_cbd->timeout); + cache_cbd->timeout.data = cache_cbd; + data->cur_cache_cbd = cache_cbd; + + if (map->next_check) { + rspamd_http_date_format(next_check_date, sizeof(next_check_date), + map->next_check); + } + else { + rspamd_http_date_format(next_check_date, sizeof(next_check_date), + rspamd_get_calendar_ticks() + map->poll_timeout); + } + + + if (cbd->bk->is_compressed) { + ZSTD_DStream *zstream; + ZSTD_inBuffer zin; + ZSTD_outBuffer zout; + guchar *out; + gsize outlen, r; + + zstream = ZSTD_createDStream(); + ZSTD_initDStream(zstream); + + zin.pos = 0; + zin.src = in; + zin.size = dlen; + + if ((outlen = ZSTD_getDecompressedSize(zin.src, zin.size)) == 0) { + outlen = ZSTD_DStreamOutSize(); + } + + out = g_malloc(outlen); + + zout.dst = out; + zout.pos = 0; + zout.size = outlen; + + while (zin.pos < zin.size) { + r = ZSTD_decompressStream(zstream, &zout, &zin); + + if (ZSTD_isError(r)) { + msg_err_map("%s(%s): cannot decompress data: %s", + cbd->bk->uri, + rspamd_inet_address_to_string_pretty(cbd->addr), + ZSTD_getErrorName(r)); + ZSTD_freeDStream(zstream); + g_free(out); + MAP_RELEASE(cbd->shmem_data, "shmem_data"); + goto err; + } + + if (zout.pos == zout.size) { + /* We need to extend output buffer */ + zout.size = zout.size * 2 + 1.0; + out = g_realloc(zout.dst, zout.size); + zout.dst = out; + } + } + + ZSTD_freeDStream(zstream); + msg_info_map("%s(%s): read map data %z bytes compressed, " + "%z uncompressed, next check at %s", + cbd->bk->uri, + rspamd_inet_address_to_string_pretty(cbd->addr), + dlen, zout.pos, next_check_date); + map->read_callback(out, zout.pos, &cbd->periodic->cbdata, TRUE); + rspamd_map_save_http_cached_file(map, bk, cbd->data, out, zout.pos); + g_free(out); + } + else { + msg_info_map("%s(%s): read map data %z bytes, next check at %s", + cbd->bk->uri, + rspamd_inet_address_to_string_pretty(cbd->addr), + dlen, next_check_date); + rspamd_map_save_http_cached_file(map, bk, cbd->data, in, cbd->data_len); + map->read_callback(in, cbd->data_len, &cbd->periodic->cbdata, TRUE); + } + + MAP_RELEASE(cbd->shmem_data, "shmem_data"); + + cbd->periodic->cur_backend++; + munmap(in, dlen); + rspamd_map_process_periodic(cbd->periodic); + } + else if (msg->code == 304 && cbd->check) { + cbd->data->last_checked = msg->date; + + if (msg->last_modified) { + cbd->data->last_modified = msg->last_modified; + } + else { + cbd->data->last_modified = msg->date; + } + + expires_hdr = rspamd_http_message_find_header(msg, "Expires"); + + if (expires_hdr) { + time_t hdate; + + hdate = rspamd_http_parse_date(expires_hdr->begin, expires_hdr->len); + if (hdate != (time_t) -1 && hdate > msg->date) { + map->next_check = hdate; + } + else { + msg_info_map("invalid expires header: %T, ignore it", expires_hdr); + map->next_check = 0; + } + } + + etag_hdr = rspamd_http_message_find_header(msg, "ETag"); + + if (etag_hdr) { + if (cbd->data->etag) { + /* Remove old etag */ + rspamd_fstring_free(cbd->data->etag); + cbd->data->etag = rspamd_fstring_new_init(etag_hdr->begin, + etag_hdr->len); + } + } + + if (map->next_check) { + rspamd_http_date_format(next_check_date, sizeof(next_check_date), + map->next_check); + msg_info_map("data is not modified for server %s, next check at %s " + "(http cache based: %T)", + cbd->data->host, next_check_date, expires_hdr); + } + else { + rspamd_http_date_format(next_check_date, sizeof(next_check_date), + rspamd_get_calendar_ticks() + map->poll_timeout); + msg_info_map("data is not modified for server %s, next check at %s " + "(timer based)", + cbd->data->host, next_check_date); + } + + rspamd_map_update_http_cached_file(map, bk, cbd->data); + cbd->periodic->cur_backend++; + rspamd_map_process_periodic(cbd->periodic); + } + else { + msg_info_map("cannot load map %s from %s: HTTP error %d", + bk->uri, cbd->data->host, msg->code); + goto err; + } + + MAP_RELEASE(cbd, "http_callback_data"); + return 0; + +err: + cbd->periodic->errored = 1; + rspamd_map_process_periodic(cbd->periodic); + MAP_RELEASE(cbd, "http_callback_data"); + + return 0; +} + +static gboolean +read_map_file_chunks(struct rspamd_map *map, struct map_cb_data *cbdata, + const gchar *fname, gsize len, goffset off) +{ + gint fd; + gssize r, avail; + gsize buflen = 1024 * 1024; + gchar *pos, *bytes; + + fd = rspamd_file_xopen(fname, O_RDONLY, 0, TRUE); + + if (fd == -1) { + msg_err_map("can't open map for buffered reading %s: %s", + fname, strerror(errno)); + return FALSE; + } + + if (lseek(fd, off, SEEK_SET) == -1) { + msg_err_map("can't seek in map to pos %d for buffered reading %s: %s", + (gint) off, fname, strerror(errno)); + close(fd); + + return FALSE; + } + + buflen = MIN(len, buflen); + bytes = g_malloc(buflen); + avail = buflen; + pos = bytes; + + while ((r = read(fd, pos, avail)) > 0) { + gchar *end = bytes + (pos - bytes) + r; + msg_debug_map("%s: read map chunk, %z bytes", fname, + r); + pos = map->read_callback(bytes, end - bytes, cbdata, r == len); + + if (pos && pos > bytes && pos < end) { + guint remain = end - pos; + + memmove(bytes, pos, remain); + pos = bytes + remain; + /* Need to preserve the remain */ + avail = ((gssize) buflen) - remain; + + if (avail <= 0) { + /* Try realloc, too large element */ + g_assert(buflen >= remain); + bytes = g_realloc(bytes, buflen * 2); + + pos = bytes + remain; /* Adjust */ + avail += buflen; + buflen *= 2; + } + } + else { + avail = buflen; + pos = bytes; + } + + len -= r; + } + + if (r == -1) { + msg_err_map("can't read from map %s: %s", fname, strerror(errno)); + close(fd); + g_free(bytes); + + return FALSE; + } + + close(fd); + g_free(bytes); + + return TRUE; +} + +static gboolean +rspamd_map_check_sig_pk_mem(const guchar *sig, + gsize siglen, + struct rspamd_map *map, + const guchar *input, + gsize inlen, + struct rspamd_cryptobox_pubkey *pk) +{ + GString *b32_key; + gboolean ret = TRUE; + + if (siglen != rspamd_cryptobox_signature_bytes(RSPAMD_CRYPTOBOX_MODE_25519)) { + msg_err_map("can't open signature for %s: invalid size: %z", map->name, siglen); + + ret = FALSE; + } + + if (ret && !rspamd_cryptobox_verify(sig, siglen, input, inlen, + rspamd_pubkey_get_pk(pk, NULL), RSPAMD_CRYPTOBOX_MODE_25519)) { + msg_err_map("can't verify signature for %s: incorrect signature", map->name); + + ret = FALSE; + } + + if (ret) { + b32_key = rspamd_pubkey_print(pk, + RSPAMD_KEYPAIR_BASE32 | RSPAMD_KEYPAIR_PUBKEY); + msg_info_map("verified signature for %s using trusted key %v", + map->name, b32_key); + g_string_free(b32_key, TRUE); + } + + return ret; +} + +static gboolean +rspamd_map_check_file_sig(const char *fname, + struct rspamd_map *map, + struct rspamd_map_backend *bk, + const guchar *input, + gsize inlen) +{ + guchar *data; + struct rspamd_cryptobox_pubkey *pk = NULL; + GString *b32_key; + gboolean ret = TRUE; + gsize len = 0; + gchar fpath[PATH_MAX]; + + if (bk->trusted_pubkey == NULL) { + /* Try to load and check pubkey */ + rspamd_snprintf(fpath, sizeof(fpath), "%s.pub", fname); + data = rspamd_file_xmap(fpath, PROT_READ, &len, TRUE); + + if (data == NULL) { + msg_err_map("can't open pubkey %s: %s", fpath, strerror(errno)); + return FALSE; + } + + pk = rspamd_pubkey_from_base32(data, len, RSPAMD_KEYPAIR_SIGN, + RSPAMD_CRYPTOBOX_MODE_25519); + munmap(data, len); + + if (pk == NULL) { + msg_err_map("can't load pubkey %s", fpath); + return FALSE; + } + + /* We just check pk against the trusted db of keys */ + b32_key = rspamd_pubkey_print(pk, + RSPAMD_KEYPAIR_BASE32 | RSPAMD_KEYPAIR_PUBKEY); + g_assert(b32_key != NULL); + + if (g_hash_table_lookup(map->cfg->trusted_keys, b32_key->str) == NULL) { + msg_err_map("pubkey loaded from %s is untrusted: %v", fpath, + b32_key); + g_string_free(b32_key, TRUE); + rspamd_pubkey_unref(pk); + + return FALSE; + } + + g_string_free(b32_key, TRUE); + } + else { + pk = rspamd_pubkey_ref(bk->trusted_pubkey); + } + + rspamd_snprintf(fpath, sizeof(fpath), "%s.sig", fname); + data = rspamd_shmem_xmap(fpath, PROT_READ, &len); + + if (data == NULL) { + msg_err_map("can't open signature %s: %s", fpath, strerror(errno)); + ret = FALSE; + } + + if (ret) { + ret = rspamd_map_check_sig_pk_mem(data, len, map, input, inlen, pk); + munmap(data, len); + } + + rspamd_pubkey_unref(pk); + + return ret; +} + +/** + * Callback for reading data from file + */ +static gboolean +read_map_file(struct rspamd_map *map, struct file_map_data *data, + struct rspamd_map_backend *bk, struct map_periodic_cbdata *periodic) +{ + gchar *bytes; + gsize len; + struct stat st; + + if (map->read_callback == NULL || map->fin_callback == NULL) { + msg_err_map("%s: bad callback for reading map file", + data->filename); + return FALSE; + } + + if (stat(data->filename, &st) == -1) { + /* File does not exist, skipping */ + if (errno != ENOENT) { + msg_err_map("%s: map file is unavailable for reading: %s", + data->filename, strerror(errno)); + + return FALSE; + } + else { + msg_info_map("%s: map file is not found; " + "it will be read automatically if created", + data->filename); + return TRUE; + } + } + + ev_stat_stat(map->event_loop, &data->st_ev); + len = st.st_size; + + if (bk->is_signed) { + bytes = rspamd_file_xmap(data->filename, PROT_READ, &len, TRUE); + + if (bytes == NULL) { + msg_err_map("can't open map %s: %s", data->filename, strerror(errno)); + return FALSE; + } + + if (!rspamd_map_check_file_sig(data->filename, map, bk, bytes, len)) { + munmap(bytes, len); + + return FALSE; + } + + munmap(bytes, len); + } + + if (len > 0) { + if (map->no_file_read) { + /* We just call read callback with backend name */ + map->read_callback(data->filename, strlen(data->filename), + &periodic->cbdata, TRUE); + } + else { + if (bk->is_compressed) { + bytes = rspamd_file_xmap(data->filename, PROT_READ, &len, TRUE); + + if (bytes == NULL) { + msg_err_map("can't open map %s: %s", data->filename, strerror(errno)); + return FALSE; + } + + ZSTD_DStream *zstream; + ZSTD_inBuffer zin; + ZSTD_outBuffer zout; + guchar *out; + gsize outlen, r; + + zstream = ZSTD_createDStream(); + ZSTD_initDStream(zstream); + + zin.pos = 0; + zin.src = bytes; + zin.size = len; + + if ((outlen = ZSTD_getDecompressedSize(zin.src, zin.size)) == 0) { + outlen = ZSTD_DStreamOutSize(); + } + + out = g_malloc(outlen); + + zout.dst = out; + zout.pos = 0; + zout.size = outlen; + + while (zin.pos < zin.size) { + r = ZSTD_decompressStream(zstream, &zout, &zin); + + if (ZSTD_isError(r)) { + msg_err_map("%s: cannot decompress data: %s", + data->filename, + ZSTD_getErrorName(r)); + ZSTD_freeDStream(zstream); + g_free(out); + munmap(bytes, len); + return FALSE; + } + + if (zout.pos == zout.size) { + /* We need to extend output buffer */ + zout.size = zout.size * 2 + 1; + out = g_realloc(zout.dst, zout.size); + zout.dst = out; + } + } + + ZSTD_freeDStream(zstream); + msg_info_map("%s: read map data, %z bytes compressed, " + "%z uncompressed)", + data->filename, + len, zout.pos); + map->read_callback(out, zout.pos, &periodic->cbdata, TRUE); + g_free(out); + + munmap(bytes, len); + } + else { + /* Perform buffered read: fail-safe */ + if (!read_map_file_chunks(map, &periodic->cbdata, data->filename, + len, 0)) { + return FALSE; + } + } + } + } + else { + /* Empty map */ + map->read_callback(NULL, 0, &periodic->cbdata, TRUE); + } + + return TRUE; +} + +static gboolean +read_map_static(struct rspamd_map *map, struct static_map_data *data, + struct rspamd_map_backend *bk, struct map_periodic_cbdata *periodic) +{ + guchar *bytes; + gsize len; + + if (map->read_callback == NULL || map->fin_callback == NULL) { + msg_err_map("%s: bad callback for reading map file", map->name); + data->processed = TRUE; + return FALSE; + } + + bytes = data->data; + len = data->len; + + if (len > 0) { + if (bk->is_compressed) { + ZSTD_DStream *zstream; + ZSTD_inBuffer zin; + ZSTD_outBuffer zout; + guchar *out; + gsize outlen, r; + + zstream = ZSTD_createDStream(); + ZSTD_initDStream(zstream); + + zin.pos = 0; + zin.src = bytes; + zin.size = len; + + if ((outlen = ZSTD_getDecompressedSize(zin.src, zin.size)) == 0) { + outlen = ZSTD_DStreamOutSize(); + } + + out = g_malloc(outlen); + + zout.dst = out; + zout.pos = 0; + zout.size = outlen; + + while (zin.pos < zin.size) { + r = ZSTD_decompressStream(zstream, &zout, &zin); + + if (ZSTD_isError(r)) { + msg_err_map("%s: cannot decompress data: %s", + map->name, + ZSTD_getErrorName(r)); + ZSTD_freeDStream(zstream); + g_free(out); + + return FALSE; + } + + if (zout.pos == zout.size) { + /* We need to extend output buffer */ + zout.size = zout.size * 2 + 1; + out = g_realloc(zout.dst, zout.size); + zout.dst = out; + } + } + + ZSTD_freeDStream(zstream); + msg_info_map("%s: read map data, %z bytes compressed, " + "%z uncompressed)", + map->name, + len, zout.pos); + map->read_callback(out, zout.pos, &periodic->cbdata, TRUE); + g_free(out); + } + else { + msg_info_map("%s: read map data, %z bytes", + map->name, len); + map->read_callback(bytes, len, &periodic->cbdata, TRUE); + } + } + else { + map->read_callback(NULL, 0, &periodic->cbdata, TRUE); + } + + data->processed = TRUE; + + return TRUE; +} + +static void +rspamd_map_periodic_dtor(struct map_periodic_cbdata *periodic) +{ + struct rspamd_map *map; + + map = periodic->map; + msg_debug_map("periodic dtor %p", periodic); + + if (periodic->need_modify || periodic->cbdata.errored) { + /* Need to notify the real data structure */ + periodic->map->fin_callback(&periodic->cbdata, periodic->map->user_data); + + if (map->on_load_function) { + map->on_load_function(map, map->on_load_ud); + } + } + else { + /* Not modified */ + } + + if (periodic->locked) { + g_atomic_int_set(periodic->map->locked, 0); + msg_debug_map("unlocked map %s", periodic->map->name); + + if (periodic->map->wrk->state == rspamd_worker_state_running) { + rspamd_map_schedule_periodic(periodic->map, + RSPAMD_SYMBOL_RESULT_NORMAL); + } + else { + msg_debug_map("stop scheduling periodics for %s; terminating state", + periodic->map->name); + } + } + + g_free(periodic); +} + +/* Called on timer execution */ +static void +rspamd_map_periodic_callback(struct ev_loop *loop, ev_timer *w, int revents) +{ + struct map_periodic_cbdata *cbd = (struct map_periodic_cbdata *) w->data; + + MAP_RETAIN(cbd, "periodic"); + ev_timer_stop(loop, w); + rspamd_map_process_periodic(cbd); + MAP_RELEASE(cbd, "periodic"); +} + +static void +rspamd_map_schedule_periodic(struct rspamd_map *map, int how) +{ + const gdouble error_mult = 20.0, lock_mult = 0.1; + static const gdouble min_timer_interval = 2.0; + const gchar *reason = "unknown reason"; + gdouble jittered_sec; + gdouble timeout; + struct map_periodic_cbdata *cbd; + + if (map->scheduled_check || (map->wrk && + map->wrk->state != rspamd_worker_state_running)) { + /* + * Do not schedule check if some check is already scheduled or + * if worker is going to die + */ + return; + } + + if (!(how & RSPAMD_MAP_SCHEDULE_INIT) && map->static_only) { + /* No need to schedule anything for static maps */ + return; + } + + if (map->non_trivial && map->next_check != 0) { + timeout = map->next_check - rspamd_get_calendar_ticks(); + map->next_check = 0; + + if (timeout > 0 && timeout < map->poll_timeout) { + /* Early check case, jitter */ + gdouble poll_timeout = map->poll_timeout; + + if (how & RSPAMD_MAP_SCHEDULE_ERROR) { + poll_timeout = map->poll_timeout * error_mult; + reason = "early active non-trivial check (after error)"; + } + else if (how & RSPAMD_MAP_SCHEDULE_LOCKED) { + poll_timeout = map->poll_timeout * lock_mult; + reason = "early active non-trivial check (after being locked)"; + } + else { + reason = "early active non-trivial check"; + } + + jittered_sec = MIN(timeout, poll_timeout); + } + else if (timeout <= 0) { + /* Data is already expired, need to check */ + if (how & RSPAMD_MAP_SCHEDULE_ERROR) { + /* In case of error we still need to increase delay */ + jittered_sec = map->poll_timeout * error_mult; + reason = "expired non-trivial data (after error)"; + } + else { + jittered_sec = 0.0; + reason = "expired non-trivial data"; + } + } + else { + /* No need to check now, wait till next_check */ + jittered_sec = timeout; + reason = "valid non-trivial data"; + } + } + else { + /* No valid information when to check a map, plan a timer based check */ + timeout = map->poll_timeout; + + if (how & RSPAMD_MAP_SCHEDULE_INIT) { + if (map->active_http) { + /* Spill maps load to get better chances to hit ssl cache */ + timeout = rspamd_time_jitter(0.0, 2.0); + } + else { + timeout = 0.0; + } + + reason = "init scheduled check"; + } + else { + if (how & RSPAMD_MAP_SCHEDULE_ERROR) { + timeout = map->poll_timeout * error_mult; + reason = "errored scheduled check"; + } + else if (how & RSPAMD_MAP_SCHEDULE_LOCKED) { + timeout = map->poll_timeout * lock_mult; + reason = "locked scheduled check"; + } + else { + reason = "normal scheduled check"; + } + } + + jittered_sec = rspamd_time_jitter(timeout, 0); + } + + /* Now, we do some sanity checks for jittered seconds */ + if (!(how & RSPAMD_MAP_SCHEDULE_INIT)) { + /* Never allow too low interval between timer checks, it is expensive */ + if (jittered_sec < min_timer_interval) { + jittered_sec = rspamd_time_jitter(min_timer_interval, 0); + } + + if (map->non_trivial) { + /* + * Even if we are reported that we need to reload cache often, we + * still want to be sane in terms of events... + */ + if (jittered_sec < min_timer_interval * 2.0) { + if (map->nelts > 0) { + jittered_sec = min_timer_interval * 3.0; + } + } + } + } + + cbd = g_malloc0(sizeof(*cbd)); + cbd->cbdata.prev_data = *map->user_data; + cbd->cbdata.cur_data = NULL; + cbd->cbdata.map = map; + cbd->map = map; + map->scheduled_check = cbd; + REF_INIT_RETAIN(cbd, rspamd_map_periodic_dtor); + + cbd->ev.data = cbd; + ev_timer_init(&cbd->ev, rspamd_map_periodic_callback, jittered_sec, 0.0); + ev_timer_start(map->event_loop, &cbd->ev); + + msg_debug_map("schedule new periodic event %p in %.3f seconds for %s; reason: %s", + cbd, jittered_sec, map->name, reason); +} + +static gint +rspamd_map_af_to_weight(const rspamd_inet_addr_t *addr) +{ + int ret; + + switch (rspamd_inet_address_get_af(addr)) { + case AF_UNIX: + ret = 2; + break; + case AF_INET: + ret = 1; + break; + default: + ret = 0; + break; + } + + return ret; +} + +static gint +rspamd_map_dns_address_sort_func(gconstpointer a, gconstpointer b) +{ + const rspamd_inet_addr_t *ip1 = *(const rspamd_inet_addr_t **) a, + *ip2 = *(const rspamd_inet_addr_t **) b; + gint w1, w2; + + w1 = rspamd_map_af_to_weight(ip1); + w2 = rspamd_map_af_to_weight(ip2); + + /* Inverse order */ + return w2 - w1; +} + +static void +rspamd_map_dns_callback(struct rdns_reply *reply, void *arg) +{ + struct http_callback_data *cbd = arg; + struct rdns_reply_entry *cur_rep; + struct rspamd_map *map; + guint flags = RSPAMD_HTTP_CLIENT_SIMPLE | RSPAMD_HTTP_CLIENT_SHARED; + + map = cbd->map; + + msg_debug_map("got dns reply with code %s on stage %d", + rdns_strerror(reply->code), cbd->stage); + + if (cbd->stage == http_map_terminated) { + MAP_RELEASE(cbd, "http_callback_data"); + return; + } + + if (reply->code == RDNS_RC_NOERROR) { + DL_FOREACH(reply->entries, cur_rep) + { + rspamd_inet_addr_t *addr; + addr = rspamd_inet_address_from_rnds(cur_rep); + + if (addr != NULL) { + rspamd_inet_address_set_port(addr, cbd->data->port); + g_ptr_array_add(cbd->addrs, (void *) addr); + } + } + + if (cbd->stage == http_map_resolve_host2) { + /* We have still one request pending */ + cbd->stage = http_map_resolve_host1; + } + else if (cbd->stage == http_map_resolve_host1) { + cbd->stage = http_map_http_conn; + } + } + else if (cbd->stage < http_map_http_conn) { + if (cbd->stage == http_map_resolve_host2) { + /* We have still one request pending */ + cbd->stage = http_map_resolve_host1; + } + else if (cbd->addrs->len == 0) { + /* We could not resolve host, so cowardly fail here */ + msg_err_map("cannot resolve %s: %s", cbd->data->host, + rdns_strerror(reply->code)); + cbd->periodic->errored = 1; + rspamd_map_process_periodic(cbd->periodic); + } + else { + /* We have at least one address, so we can continue... */ + cbd->stage = http_map_http_conn; + } + } + + if (cbd->stage == http_map_http_conn && cbd->addrs->len > 0) { + rspamd_ptr_array_shuffle(cbd->addrs); + gint idx = 0; + /* + * For the existing addr we can just select any address as we have + * data available + */ + if (cbd->map->nelts > 0 && rspamd_random_double_fast() > 0.5) { + /* Already shuffled, use whatever is the first */ + cbd->addr = (rspamd_inet_addr_t *) g_ptr_array_index(cbd->addrs, idx); + } + else { + /* Always prefer IPv4 as IPv6 is almost all the time broken */ + g_ptr_array_sort(cbd->addrs, rspamd_map_dns_address_sort_func); + cbd->addr = (rspamd_inet_addr_t *) g_ptr_array_index(cbd->addrs, idx); + } + + retry: + msg_debug_map("try open http connection to %s", + rspamd_inet_address_to_string_pretty(cbd->addr)); + if (cbd->bk->protocol == MAP_PROTO_HTTPS) { + flags |= RSPAMD_HTTP_CLIENT_SSL; + } + cbd->conn = rspamd_http_connection_new_client(NULL, + NULL, + http_map_error, + http_map_finish, + flags, + cbd->addr); + + if (cbd->conn != NULL) { + write_http_request(cbd); + } + else { + if (idx < cbd->addrs->len - 1) { + /* We can retry */ + idx++; + rspamd_inet_addr_t *prev_addr = cbd->addr; + cbd->addr = (rspamd_inet_addr_t *) g_ptr_array_index(cbd->addrs, idx); + msg_info_map("cannot connect to %s to get data for %s: %s, retry with %s (%d of %d)", + rspamd_inet_address_to_string_pretty(prev_addr), + cbd->bk->uri, + strerror(errno), + rspamd_inet_address_to_string_pretty(cbd->addr), + idx + 1, cbd->addrs->len); + goto retry; + } + else { + /* Nothing else left */ + cbd->periodic->errored = TRUE; + msg_err_map("error reading %s(%s): " + "connection with http server terminated incorrectly: %s", + cbd->bk->uri, + cbd->addr ? rspamd_inet_address_to_string_pretty(cbd->addr) : "", + strerror(errno)); + + rspamd_map_process_periodic(cbd->periodic); + } + } + } + + MAP_RELEASE(cbd, "http_callback_data"); +} + +static gboolean +rspamd_map_read_cached(struct rspamd_map *map, struct rspamd_map_backend *bk, + struct map_periodic_cbdata *periodic, const gchar *host) +{ + gsize mmap_len, len; + gpointer in; + struct http_map_data *data; + + data = bk->data.hd; + + in = rspamd_shmem_xmap(data->cache->shmem_name, PROT_READ, &mmap_len); + + if (in == NULL) { + msg_err("cannot map cache from %s: %s", data->cache->shmem_name, + strerror(errno)); + return FALSE; + } + + if (mmap_len < data->cache->len) { + msg_err("cannot map cache from %s: truncated length %z, %z expected", + data->cache->shmem_name, + mmap_len, data->cache->len); + munmap(in, mmap_len); + + return FALSE; + } + + /* + * Len is taken from the shmem file size that can be larger than the + * actual data length, as we use shared memory as a growing buffer for the + * HTTP input. + * Hence, we need to use len from the saved cache data, counting that it is + * at least not more than the cached file length (this is checked above). + */ + len = data->cache->len; + + if (bk->is_compressed) { + ZSTD_DStream *zstream; + ZSTD_inBuffer zin; + ZSTD_outBuffer zout; + guchar *out; + gsize outlen, r; + + zstream = ZSTD_createDStream(); + ZSTD_initDStream(zstream); + + zin.pos = 0; + zin.src = in; + zin.size = len; + + if ((outlen = ZSTD_getDecompressedSize(zin.src, zin.size)) == 0) { + outlen = ZSTD_DStreamOutSize(); + } + + out = g_malloc(outlen); + + zout.dst = out; + zout.pos = 0; + zout.size = outlen; + + while (zin.pos < zin.size) { + r = ZSTD_decompressStream(zstream, &zout, &zin); + + if (ZSTD_isError(r)) { + msg_err_map("%s: cannot decompress data: %s", + bk->uri, + ZSTD_getErrorName(r)); + ZSTD_freeDStream(zstream); + g_free(out); + munmap(in, mmap_len); + return FALSE; + } + + if (zout.pos == zout.size) { + /* We need to extend output buffer */ + zout.size = zout.size * 2 + 1; + out = g_realloc(zout.dst, zout.size); + zout.dst = out; + } + } + + ZSTD_freeDStream(zstream); + msg_info_map("%s: read map data cached %z bytes compressed, " + "%z uncompressed", + bk->uri, + len, zout.pos); + map->read_callback(out, zout.pos, &periodic->cbdata, TRUE); + g_free(out); + } + else { + msg_info_map("%s: read map data cached %z bytes", bk->uri, len); + map->read_callback(in, len, &periodic->cbdata, TRUE); + } + + munmap(in, mmap_len); + + return TRUE; +} + +static gboolean +rspamd_map_has_http_cached_file(struct rspamd_map *map, + struct rspamd_map_backend *bk) +{ + gchar path[PATH_MAX]; + guchar digest[rspamd_cryptobox_HASHBYTES]; + struct rspamd_config *cfg = map->cfg; + struct stat st; + + if (cfg->maps_cache_dir == NULL || cfg->maps_cache_dir[0] == '\0') { + return FALSE; + } + + rspamd_cryptobox_hash(digest, bk->uri, strlen(bk->uri), NULL, 0); + rspamd_snprintf(path, sizeof(path), "%s%c%*xs.map", cfg->maps_cache_dir, + G_DIR_SEPARATOR, 20, digest); + + if (stat(path, &st) != -1 && st.st_size > + sizeof(struct rspamd_http_file_data)) { + return TRUE; + } + + return FALSE; +} + +static gboolean +rspamd_map_save_http_cached_file(struct rspamd_map *map, + struct rspamd_map_backend *bk, + struct http_map_data *htdata, + const guchar *data, + gsize len) +{ + gchar path[PATH_MAX]; + guchar digest[rspamd_cryptobox_HASHBYTES]; + struct rspamd_config *cfg = map->cfg; + gint fd; + struct rspamd_http_file_data header; + + if (cfg->maps_cache_dir == NULL || cfg->maps_cache_dir[0] == '\0') { + return FALSE; + } + + rspamd_cryptobox_hash(digest, bk->uri, strlen(bk->uri), NULL, 0); + rspamd_snprintf(path, sizeof(path), "%s%c%*xs.map", cfg->maps_cache_dir, + G_DIR_SEPARATOR, 20, digest); + + fd = rspamd_file_xopen(path, O_WRONLY | O_TRUNC | O_CREAT, + 00600, FALSE); + + if (fd == -1) { + return FALSE; + } + + if (!rspamd_file_lock(fd, FALSE)) { + msg_err_map("cannot lock file %s: %s", path, strerror(errno)); + close(fd); + + return FALSE; + } + + memcpy(header.magic, rspamd_http_file_magic, sizeof(rspamd_http_file_magic)); + header.mtime = htdata->last_modified; + header.next_check = map->next_check; + header.data_off = sizeof(header); + + if (htdata->etag) { + header.data_off += RSPAMD_FSTRING_LEN(htdata->etag); + header.etag_len = RSPAMD_FSTRING_LEN(htdata->etag); + } + else { + header.etag_len = 0; + } + + if (write(fd, &header, sizeof(header)) != sizeof(header)) { + msg_err_map("cannot write file %s (header stage): %s", path, strerror(errno)); + rspamd_file_unlock(fd, FALSE); + close(fd); + + return FALSE; + } + + if (header.etag_len > 0) { + if (write(fd, RSPAMD_FSTRING_DATA(htdata->etag), header.etag_len) != + header.etag_len) { + msg_err_map("cannot write file %s (etag stage): %s", path, strerror(errno)); + rspamd_file_unlock(fd, FALSE); + close(fd); + + return FALSE; + } + } + + /* Now write the rest */ + if (write(fd, data, len) != len) { + msg_err_map("cannot write file %s (data stage): %s", path, strerror(errno)); + rspamd_file_unlock(fd, FALSE); + close(fd); + + return FALSE; + } + + rspamd_file_unlock(fd, FALSE); + close(fd); + + msg_info_map("saved data from %s in %s, %uz bytes", bk->uri, path, len + sizeof(header) + header.etag_len); + + return TRUE; +} + +static gboolean +rspamd_map_update_http_cached_file(struct rspamd_map *map, + struct rspamd_map_backend *bk, + struct http_map_data *htdata) +{ + gchar path[PATH_MAX]; + guchar digest[rspamd_cryptobox_HASHBYTES]; + struct rspamd_config *cfg = map->cfg; + gint fd; + struct rspamd_http_file_data header; + + if (!rspamd_map_has_http_cached_file(map, bk)) { + return FALSE; + } + + rspamd_cryptobox_hash(digest, bk->uri, strlen(bk->uri), NULL, 0); + rspamd_snprintf(path, sizeof(path), "%s%c%*xs.map", cfg->maps_cache_dir, + G_DIR_SEPARATOR, 20, digest); + + fd = rspamd_file_xopen(path, O_WRONLY, + 00600, FALSE); + + if (fd == -1) { + return FALSE; + } + + if (!rspamd_file_lock(fd, FALSE)) { + msg_err_map("cannot lock file %s: %s", path, strerror(errno)); + close(fd); + + return FALSE; + } + + memcpy(header.magic, rspamd_http_file_magic, sizeof(rspamd_http_file_magic)); + header.mtime = htdata->last_modified; + header.next_check = map->next_check; + header.data_off = sizeof(header); + + if (htdata->etag) { + header.data_off += RSPAMD_FSTRING_LEN(htdata->etag); + header.etag_len = RSPAMD_FSTRING_LEN(htdata->etag); + } + else { + header.etag_len = 0; + } + + if (write(fd, &header, sizeof(header)) != sizeof(header)) { + msg_err_map("cannot update file %s (header stage): %s", path, strerror(errno)); + rspamd_file_unlock(fd, FALSE); + close(fd); + + return FALSE; + } + + if (header.etag_len > 0) { + if (write(fd, RSPAMD_FSTRING_DATA(htdata->etag), header.etag_len) != + header.etag_len) { + msg_err_map("cannot update file %s (etag stage): %s", path, strerror(errno)); + rspamd_file_unlock(fd, FALSE); + close(fd); + + return FALSE; + } + } + + rspamd_file_unlock(fd, FALSE); + close(fd); + + return TRUE; +} + + +static gboolean +rspamd_map_read_http_cached_file(struct rspamd_map *map, + struct rspamd_map_backend *bk, + struct http_map_data *htdata, + struct map_cb_data *cbdata) +{ + gchar path[PATH_MAX]; + guchar digest[rspamd_cryptobox_HASHBYTES]; + struct rspamd_config *cfg = map->cfg; + gint fd; + struct stat st; + struct rspamd_http_file_data header; + + if (cfg->maps_cache_dir == NULL || cfg->maps_cache_dir[0] == '\0') { + return FALSE; + } + + rspamd_cryptobox_hash(digest, bk->uri, strlen(bk->uri), NULL, 0); + rspamd_snprintf(path, sizeof(path), "%s%c%*xs.map", cfg->maps_cache_dir, + G_DIR_SEPARATOR, 20, digest); + + fd = rspamd_file_xopen(path, O_RDONLY, 00600, FALSE); + + if (fd == -1) { + return FALSE; + } + + if (!rspamd_file_lock(fd, FALSE)) { + msg_err_map("cannot lock file %s: %s", path, strerror(errno)); + close(fd); + + return FALSE; + } + + (void) fstat(fd, &st); + + if (read(fd, &header, sizeof(header)) != sizeof(header)) { + msg_err_map("cannot read file %s (header stage): %s", path, strerror(errno)); + rspamd_file_unlock(fd, FALSE); + close(fd); + + return FALSE; + } + + if (memcmp(header.magic, rspamd_http_file_magic, + sizeof(rspamd_http_file_magic)) != 0) { + msg_warn_map("invalid or old version magic in file %s; ignore it", path); + rspamd_file_unlock(fd, FALSE); + close(fd); + + return FALSE; + } + + double now = rspamd_get_calendar_ticks(); + + if (header.next_check > now) { + map->next_check = header.next_check; + } + else { + map->next_check = now; + } + + htdata->last_modified = header.mtime; + + if (header.etag_len > 0) { + rspamd_fstring_t *etag = rspamd_fstring_sized_new(header.etag_len); + + if (read(fd, RSPAMD_FSTRING_DATA(etag), header.etag_len) != header.etag_len) { + msg_err_map("cannot read file %s (etag stage): %s", path, + strerror(errno)); + rspamd_file_unlock(fd, FALSE); + rspamd_fstring_free(etag); + close(fd); + + return FALSE; + } + + etag->len = header.etag_len; + + if (htdata->etag) { + /* FIXME: should be dealt somehow better */ + msg_warn_map("etag is already defined as %V; cached is %V; ignore cached", + htdata->etag, etag); + rspamd_fstring_free(etag); + } + else { + htdata->etag = etag; + } + } + + rspamd_file_unlock(fd, FALSE); + close(fd); + + /* Now read file data */ + /* Perform buffered read: fail-safe */ + if (!read_map_file_chunks(map, cbdata, path, + st.st_size - header.data_off, header.data_off)) { + return FALSE; + } + + struct tm tm; + gchar ncheck_buf[32], lm_buf[32]; + + rspamd_localtime(map->next_check, &tm); + strftime(ncheck_buf, sizeof(ncheck_buf) - 1, "%Y-%m-%d %H:%M:%S", &tm); + rspamd_localtime(htdata->last_modified, &tm); + strftime(lm_buf, sizeof(lm_buf) - 1, "%Y-%m-%d %H:%M:%S", &tm); + + msg_info_map("read cached data for %s from %s, %uz bytes; next check at: %s;" + " last modified on: %s; etag: %V", + bk->uri, + path, + (size_t) (st.st_size - header.data_off), + ncheck_buf, + lm_buf, + htdata->etag); + + return TRUE; +} + +/** + * Async HTTP callback + */ +static void +rspamd_map_common_http_callback(struct rspamd_map *map, + struct rspamd_map_backend *bk, + struct map_periodic_cbdata *periodic, + gboolean check) +{ + struct http_map_data *data; + struct http_callback_data *cbd; + guint flags = RSPAMD_HTTP_CLIENT_SIMPLE | RSPAMD_HTTP_CLIENT_SHARED; + + data = bk->data.hd; + + if (g_atomic_int_get(&data->cache->available) == 1) { + /* Read cached data */ + if (check) { + if (data->last_modified < data->cache->last_modified) { + msg_info_map("need to reread cached map triggered by %s " + "(%d our modify time, %d cached modify time)", + bk->uri, + (int) data->last_modified, + (int) data->cache->last_modified); + periodic->need_modify = TRUE; + /* Reset the whole chain */ + periodic->cur_backend = 0; + rspamd_map_process_periodic(periodic); + } + else { + if (map->active_http) { + /* Check even if there is a cached version */ + goto check; + } + else { + /* Switch to the next backend */ + periodic->cur_backend++; + rspamd_map_process_periodic(periodic); + } + } + + return; + } + else { + if (map->active_http && + data->last_modified > data->cache->last_modified) { + goto check; + } + else if (rspamd_map_read_cached(map, bk, periodic, data->host)) { + /* Switch to the next backend */ + periodic->cur_backend++; + data->last_modified = data->cache->last_modified; + rspamd_map_process_periodic(periodic); + + return; + } + } + } + else if (!map->active_http) { + /* Switch to the next backend */ + periodic->cur_backend++; + rspamd_map_process_periodic(periodic); + + return; + } + +check: + cbd = g_malloc0(sizeof(struct http_callback_data)); + + cbd->event_loop = map->event_loop; + cbd->addrs = g_ptr_array_sized_new(4); + cbd->map = map; + cbd->data = data; + cbd->check = check; + cbd->periodic = periodic; + MAP_RETAIN(periodic, "periodic"); + cbd->bk = bk; + MAP_RETAIN(bk, "rspamd_map_backend"); + cbd->stage = http_map_terminated; + REF_INIT_RETAIN(cbd, free_http_cbdata); + + msg_debug_map("%s map data from %s", check ? "checking" : "reading", + data->host); + + /* Try address */ + rspamd_inet_addr_t *addr = NULL; + + if (rspamd_parse_inet_address(&addr, data->host, + strlen(data->host), RSPAMD_INET_ADDRESS_PARSE_DEFAULT)) { + rspamd_inet_address_set_port(addr, cbd->data->port); + g_ptr_array_add(cbd->addrs, (void *) addr); + + if (bk->protocol == MAP_PROTO_HTTPS) { + flags |= RSPAMD_HTTP_CLIENT_SSL; + } + + cbd->conn = rspamd_http_connection_new_client( + NULL, + NULL, + http_map_error, + http_map_finish, + flags, + addr); + + if (cbd->conn != NULL) { + cbd->stage = http_map_http_conn; + write_http_request(cbd); + cbd->addr = addr; + MAP_RELEASE(cbd, "http_callback_data"); + } + else { + msg_warn_map("cannot load map: cannot connect to %s: %s", + data->host, strerror(errno)); + MAP_RELEASE(cbd, "http_callback_data"); + } + + return; + } + else if (map->r->r) { + /* Send both A and AAAA requests */ + guint nreq = 0; + + if (rdns_make_request_full(map->r->r, rspamd_map_dns_callback, cbd, + map->cfg->dns_timeout, map->cfg->dns_retransmits, 1, + data->host, RDNS_REQUEST_A)) { + MAP_RETAIN(cbd, "http_callback_data"); + nreq++; + } + if (rdns_make_request_full(map->r->r, rspamd_map_dns_callback, cbd, + map->cfg->dns_timeout, map->cfg->dns_retransmits, 1, + data->host, RDNS_REQUEST_AAAA)) { + MAP_RETAIN(cbd, "http_callback_data"); + nreq++; + } + + if (nreq == 2) { + cbd->stage = http_map_resolve_host2; + } + else if (nreq == 1) { + cbd->stage = http_map_resolve_host1; + } + + map->tmp_dtor = free_http_cbdata_dtor; + map->tmp_dtor_data = cbd; + } + else { + msg_warn_map("cannot load map: DNS resolver is not initialized"); + cbd->periodic->errored = TRUE; + } + + MAP_RELEASE(cbd, "http_callback_data"); +} + +static void +rspamd_map_http_check_callback(struct map_periodic_cbdata *cbd) +{ + struct rspamd_map *map; + struct rspamd_map_backend *bk; + + map = cbd->map; + bk = g_ptr_array_index(cbd->map->backends, cbd->cur_backend); + + rspamd_map_common_http_callback(map, bk, cbd, TRUE); +} + +static void +rspamd_map_http_read_callback(struct map_periodic_cbdata *cbd) +{ + struct rspamd_map *map; + struct rspamd_map_backend *bk; + + map = cbd->map; + bk = g_ptr_array_index(cbd->map->backends, cbd->cur_backend); + rspamd_map_common_http_callback(map, bk, cbd, FALSE); +} + +static void +rspamd_map_file_check_callback(struct map_periodic_cbdata *periodic) +{ + struct rspamd_map *map; + struct file_map_data *data; + struct rspamd_map_backend *bk; + + map = periodic->map; + bk = g_ptr_array_index(map->backends, periodic->cur_backend); + data = bk->data.fd; + + if (data->need_modify) { + periodic->need_modify = TRUE; + periodic->cur_backend = 0; + data->need_modify = FALSE; + + rspamd_map_process_periodic(periodic); + + return; + } + + map = periodic->map; + /* Switch to the next backend as the rest is handled by ev_stat */ + periodic->cur_backend++; + rspamd_map_process_periodic(periodic); +} + +static void +rspamd_map_static_check_callback(struct map_periodic_cbdata *periodic) +{ + struct rspamd_map *map; + struct static_map_data *data; + struct rspamd_map_backend *bk; + + map = periodic->map; + bk = g_ptr_array_index(map->backends, periodic->cur_backend); + data = bk->data.sd; + + if (!data->processed) { + periodic->need_modify = TRUE; + periodic->cur_backend = 0; + + rspamd_map_process_periodic(periodic); + + return; + } + + /* Switch to the next backend */ + periodic->cur_backend++; + rspamd_map_process_periodic(periodic); +} + +static void +rspamd_map_file_read_callback(struct map_periodic_cbdata *periodic) +{ + struct rspamd_map *map; + struct file_map_data *data; + struct rspamd_map_backend *bk; + + map = periodic->map; + + bk = g_ptr_array_index(map->backends, periodic->cur_backend); + data = bk->data.fd; + + msg_info_map("rereading map file %s", data->filename); + + if (!read_map_file(map, data, bk, periodic)) { + periodic->errored = TRUE; + } + + /* Switch to the next backend */ + periodic->cur_backend++; + rspamd_map_process_periodic(periodic); +} + +static void +rspamd_map_static_read_callback(struct map_periodic_cbdata *periodic) +{ + struct rspamd_map *map; + struct static_map_data *data; + struct rspamd_map_backend *bk; + + map = periodic->map; + + bk = g_ptr_array_index(map->backends, periodic->cur_backend); + data = bk->data.sd; + + msg_info_map("rereading static map"); + + if (!read_map_static(map, data, bk, periodic)) { + periodic->errored = TRUE; + } + + /* Switch to the next backend */ + periodic->cur_backend++; + rspamd_map_process_periodic(periodic); +} + +static void +rspamd_map_process_periodic(struct map_periodic_cbdata *cbd) +{ + struct rspamd_map_backend *bk; + struct rspamd_map *map; + + map = cbd->map; + map->scheduled_check = NULL; + + if (!map->file_only && !cbd->locked) { + if (!g_atomic_int_compare_and_exchange(cbd->map->locked, + 0, 1)) { + msg_debug_map( + "don't try to reread map %s as it is locked by other process, " + "will reread it later", + cbd->map->name); + rspamd_map_schedule_periodic(map, RSPAMD_MAP_SCHEDULE_LOCKED); + MAP_RELEASE(cbd, "periodic"); + + return; + } + else { + msg_debug_map("locked map %s", cbd->map->name); + cbd->locked = TRUE; + } + } + + if (cbd->errored) { + /* We should not check other backends if some backend has failed*/ + rspamd_map_schedule_periodic(cbd->map, RSPAMD_MAP_SCHEDULE_ERROR); + + if (cbd->locked) { + g_atomic_int_set(cbd->map->locked, 0); + cbd->locked = FALSE; + } + + /* Also set error flag for the map consumer */ + cbd->cbdata.errored = true; + + msg_debug_map("unlocked map %s, refcount=%d", cbd->map->name, + cbd->ref.refcount); + MAP_RELEASE(cbd, "periodic"); + + return; + } + + /* For each backend we need to check for modifications */ + if (cbd->cur_backend >= cbd->map->backends->len) { + /* Last backend */ + msg_debug_map("finished map: %d of %d", cbd->cur_backend, + cbd->map->backends->len); + MAP_RELEASE(cbd, "periodic"); + + return; + } + + if (cbd->map->wrk && cbd->map->wrk->state == rspamd_worker_state_running) { + bk = g_ptr_array_index(cbd->map->backends, cbd->cur_backend); + g_assert(bk != NULL); + + if (cbd->need_modify) { + /* Load data from the next backend */ + switch (bk->protocol) { + case MAP_PROTO_HTTP: + case MAP_PROTO_HTTPS: + rspamd_map_http_read_callback(cbd); + break; + case MAP_PROTO_FILE: + rspamd_map_file_read_callback(cbd); + break; + case MAP_PROTO_STATIC: + rspamd_map_static_read_callback(cbd); + break; + } + } + else { + /* Check the next backend */ + switch (bk->protocol) { + case MAP_PROTO_HTTP: + case MAP_PROTO_HTTPS: + rspamd_map_http_check_callback(cbd); + break; + case MAP_PROTO_FILE: + rspamd_map_file_check_callback(cbd); + break; + case MAP_PROTO_STATIC: + rspamd_map_static_check_callback(cbd); + break; + } + } + } +} + +static void +rspamd_map_on_stat(struct ev_loop *loop, ev_stat *w, int revents) +{ + struct rspamd_map *map = (struct rspamd_map *) w->data; + + if (w->attr.st_nlink > 0) { + msg_info_map("old mtime is %t (size = %Hz), " + "new mtime is %t (size = %Hz) for map file %s", + w->prev.st_mtime, (gsize) w->prev.st_size, + w->attr.st_mtime, (gsize) w->attr.st_size, + w->path); + + /* Fire need modify flag */ + struct rspamd_map_backend *bk; + guint i; + + PTR_ARRAY_FOREACH(map->backends, i, bk) + { + if (bk->protocol == MAP_PROTO_FILE) { + bk->data.fd->need_modify = TRUE; + } + } + + map->next_check = 0; + + if (map->scheduled_check) { + ev_timer_stop(map->event_loop, &map->scheduled_check->ev); + MAP_RELEASE(map->scheduled_check, "rspamd_map_on_stat"); + map->scheduled_check = NULL; + } + + rspamd_map_schedule_periodic(map, RSPAMD_MAP_SCHEDULE_INIT); + } +} + +/* Start watching event for all maps */ +void rspamd_map_watch(struct rspamd_config *cfg, + struct ev_loop *event_loop, + struct rspamd_dns_resolver *resolver, + struct rspamd_worker *worker, + enum rspamd_map_watch_type how) +{ + GList *cur = cfg->maps; + struct rspamd_map *map; + struct rspamd_map_backend *bk; + guint i; + + g_assert(how > RSPAMD_MAP_WATCH_MIN && how < RSPAMD_MAP_WATCH_MAX); + + /* First of all do synced read of data */ + while (cur) { + map = cur->data; + map->event_loop = event_loop; + map->r = resolver; + + if (map->wrk == NULL && how != RSPAMD_MAP_WATCH_WORKER) { + /* Generic scanner map */ + map->wrk = worker; + + if (how == RSPAMD_MAP_WATCH_PRIMARY_CONTROLLER) { + map->active_http = TRUE; + } + else { + map->active_http = FALSE; + } + } + else if (map->wrk != NULL && map->wrk == worker) { + /* Map is bound to a specific worker */ + map->active_http = TRUE; + } + else { + /* Skip map for this worker as irrelevant */ + cur = g_list_next(cur); + continue; + } + + if (!map->active_http) { + /* Check cached version more frequently as it is cheap */ + + if (map->poll_timeout >= cfg->map_timeout && + cfg->map_file_watch_multiplier < 1.0) { + map->poll_timeout = + map->poll_timeout * cfg->map_file_watch_multiplier; + } + } + + map->file_only = TRUE; + map->static_only = TRUE; + + PTR_ARRAY_FOREACH(map->backends, i, bk) + { + bk->event_loop = event_loop; + + if (bk->protocol == MAP_PROTO_FILE) { + struct file_map_data *data; + + data = bk->data.fd; + + if (map->user_data == NULL || *map->user_data == NULL) { + /* Map has not been read, init it's reading if possible */ + struct stat st; + + if (stat(data->filename, &st) != -1) { + data->need_modify = TRUE; + } + } + + ev_stat_init(&data->st_ev, rspamd_map_on_stat, + data->filename, map->poll_timeout * cfg->map_file_watch_multiplier); + data->st_ev.data = map; + ev_stat_start(event_loop, &data->st_ev); + map->static_only = FALSE; + } + else if ((bk->protocol == MAP_PROTO_HTTP || + bk->protocol == MAP_PROTO_HTTPS)) { + if (map->active_http) { + map->non_trivial = TRUE; + } + + map->static_only = FALSE; + map->file_only = FALSE; + } + } + + rspamd_map_schedule_periodic(map, RSPAMD_MAP_SCHEDULE_INIT); + + cur = g_list_next(cur); + } +} + +void rspamd_map_preload(struct rspamd_config *cfg) +{ + GList *cur = cfg->maps; + struct rspamd_map *map; + struct rspamd_map_backend *bk; + guint i; + gboolean map_ok; + + /* First of all do synced read of data */ + while (cur) { + map = cur->data; + map_ok = TRUE; + + PTR_ARRAY_FOREACH(map->backends, i, bk) + { + if (!(bk->protocol == MAP_PROTO_FILE || + bk->protocol == MAP_PROTO_STATIC)) { + + if (bk->protocol == MAP_PROTO_HTTP || + bk->protocol == MAP_PROTO_HTTPS) { + if (!rspamd_map_has_http_cached_file(map, bk)) { + + if (!map->fallback_backend) { + map_ok = FALSE; + } + break; + } + else { + continue; /* We are yet fine */ + } + } + map_ok = FALSE; + break; + } + } + + if (map_ok) { + struct map_periodic_cbdata fake_cbd; + gboolean succeed = TRUE; + + memset(&fake_cbd, 0, sizeof(fake_cbd)); + fake_cbd.cbdata.state = 0; + fake_cbd.cbdata.prev_data = *map->user_data; + fake_cbd.cbdata.cur_data = NULL; + fake_cbd.cbdata.map = map; + fake_cbd.map = map; + + PTR_ARRAY_FOREACH(map->backends, i, bk) + { + fake_cbd.cur_backend = i; + + if (bk->protocol == MAP_PROTO_FILE) { + if (!read_map_file(map, bk->data.fd, bk, &fake_cbd)) { + succeed = FALSE; + break; + } + } + else if (bk->protocol == MAP_PROTO_STATIC) { + if (!read_map_static(map, bk->data.sd, bk, &fake_cbd)) { + succeed = FALSE; + break; + } + } + else if (bk->protocol == MAP_PROTO_HTTP || + bk->protocol == MAP_PROTO_HTTPS) { + if (!rspamd_map_read_http_cached_file(map, bk, bk->data.hd, + &fake_cbd.cbdata)) { + + if (map->fallback_backend) { + /* Try fallback */ + g_assert(map->fallback_backend->protocol == + MAP_PROTO_FILE); + if (!read_map_file(map, + map->fallback_backend->data.fd, + map->fallback_backend, &fake_cbd)) { + succeed = FALSE; + break; + } + } + else { + succeed = FALSE; + break; + } + } + } + else { + g_assert_not_reached(); + } + } + + if (succeed) { + map->fin_callback(&fake_cbd.cbdata, map->user_data); + + if (map->on_load_function) { + map->on_load_function(map, map->on_load_ud); + } + } + else { + msg_info_map("preload of %s failed", map->name); + } + } + + cur = g_list_next(cur); + } +} + +void rspamd_map_remove_all(struct rspamd_config *cfg) +{ + struct rspamd_map *map; + GList *cur; + struct rspamd_map_backend *bk; + struct map_cb_data cbdata; + guint i; + + for (cur = cfg->maps; cur != NULL; cur = g_list_next(cur)) { + map = cur->data; + + if (map->tmp_dtor) { + map->tmp_dtor(map->tmp_dtor_data); + } + + if (map->dtor) { + cbdata.prev_data = NULL; + cbdata.map = map; + cbdata.cur_data = *map->user_data; + + map->dtor(&cbdata); + *map->user_data = NULL; + } + + if (map->on_load_ud_dtor) { + map->on_load_ud_dtor(map->on_load_ud); + } + + for (i = 0; i < map->backends->len; i++) { + bk = g_ptr_array_index(map->backends, i); + + MAP_RELEASE(bk, "rspamd_map_backend"); + } + + if (map->fallback_backend) { + MAP_RELEASE(map->fallback_backend, "rspamd_map_backend"); + } + } + + g_list_free(cfg->maps); + cfg->maps = NULL; +} + +static const gchar * +rspamd_map_check_proto(struct rspamd_config *cfg, + const gchar *map_line, struct rspamd_map_backend *bk) +{ + const gchar *pos = map_line, *end, *end_key; + + g_assert(bk != NULL); + g_assert(pos != NULL); + + end = pos + strlen(pos); + + /* Static check */ + if (g_ascii_strcasecmp(pos, "static") == 0) { + bk->protocol = MAP_PROTO_STATIC; + bk->uri = g_strdup(pos); + + return pos; + } + else if (g_ascii_strcasecmp(pos, "zst+static") == 0) { + bk->protocol = MAP_PROTO_STATIC; + bk->uri = g_strdup(pos + 4); + bk->is_compressed = TRUE; + + return pos + 4; + } + + for (;;) { + if (g_ascii_strncasecmp(pos, "sign+", sizeof("sign+") - 1) == 0) { + bk->is_signed = TRUE; + pos += sizeof("sign+") - 1; + } + else if (g_ascii_strncasecmp(pos, "fallback+", sizeof("fallback+") - 1) == 0) { + bk->is_fallback = TRUE; + pos += sizeof("fallback+") - 1; + } + else if (g_ascii_strncasecmp(pos, "key=", sizeof("key=") - 1) == 0) { + pos += sizeof("key=") - 1; + end_key = memchr(pos, '+', end - pos); + + if (end_key != NULL) { + bk->trusted_pubkey = rspamd_pubkey_from_base32(pos, end_key - pos, + RSPAMD_KEYPAIR_SIGN, RSPAMD_CRYPTOBOX_MODE_25519); + + if (bk->trusted_pubkey == NULL) { + msg_err_config("cannot read pubkey from map: %s", + map_line); + return NULL; + } + pos = end_key + 1; + } + else if (end - pos > 64) { + /* Try hex encoding */ + bk->trusted_pubkey = rspamd_pubkey_from_hex(pos, 64, + RSPAMD_KEYPAIR_SIGN, RSPAMD_CRYPTOBOX_MODE_25519); + + if (bk->trusted_pubkey == NULL) { + msg_err_config("cannot read pubkey from map: %s", + map_line); + return NULL; + } + pos += 64; + } + else { + msg_err_config("cannot read pubkey from map: %s", + map_line); + return NULL; + } + + if (*pos == '+' || *pos == ':') { + pos++; + } + } + else { + /* No known flags */ + break; + } + } + + bk->protocol = MAP_PROTO_FILE; + + if (g_ascii_strncasecmp(pos, "http://", sizeof("http://") - 1) == 0) { + bk->protocol = MAP_PROTO_HTTP; + /* Include http:// */ + bk->uri = g_strdup(pos); + pos += sizeof("http://") - 1; + } + else if (g_ascii_strncasecmp(pos, "https://", sizeof("https://") - 1) == 0) { + bk->protocol = MAP_PROTO_HTTPS; + /* Include https:// */ + bk->uri = g_strdup(pos); + pos += sizeof("https://") - 1; + } + else if (g_ascii_strncasecmp(pos, "file://", sizeof("file://") - 1) == 0) { + pos += sizeof("file://") - 1; + /* Exclude file:// */ + bk->uri = g_strdup(pos); + } + else if (*pos == '/') { + /* Trivial file case */ + bk->uri = g_strdup(pos); + } + else { + msg_err_config("invalid map fetching protocol: %s", map_line); + + return NULL; + } + + if (bk->protocol != MAP_PROTO_FILE && bk->is_signed) { + msg_err_config("signed maps are no longer supported for HTTP(s): %s", map_line); + } + + return pos; +} + +gboolean +rspamd_map_is_map(const gchar *map_line) +{ + gboolean ret = FALSE; + + g_assert(map_line != NULL); + + if (map_line[0] == '/') { + ret = TRUE; + } + else if (g_ascii_strncasecmp(map_line, "sign+", sizeof("sign+") - 1) == 0) { + ret = TRUE; + } + else if (g_ascii_strncasecmp(map_line, "fallback+", sizeof("fallback+") - 1) == 0) { + ret = TRUE; + } + else if (g_ascii_strncasecmp(map_line, "file://", sizeof("file://") - 1) == 0) { + ret = TRUE; + } + else if (g_ascii_strncasecmp(map_line, "http://", sizeof("http://") - 1) == 0) { + ret = TRUE; + } + else if (g_ascii_strncasecmp(map_line, "https://", sizeof("https://") - 1) == 0) { + ret = TRUE; + } + + return ret; +} + +static void +rspamd_map_backend_dtor(struct rspamd_map_backend *bk) +{ + switch (bk->protocol) { + case MAP_PROTO_FILE: + if (bk->data.fd) { + ev_stat_stop(bk->event_loop, &bk->data.fd->st_ev); + g_free(bk->data.fd->filename); + g_free(bk->data.fd); + } + break; + case MAP_PROTO_STATIC: + if (bk->data.sd) { + if (bk->data.sd->data) { + g_free(bk->data.sd->data); + } + + g_free(bk->data.sd); + } + break; + case MAP_PROTO_HTTP: + case MAP_PROTO_HTTPS: + if (bk->data.hd) { + struct http_map_data *data = bk->data.hd; + + g_free(data->host); + g_free(data->path); + g_free(data->rest); + + if (data->userinfo) { + g_free(data->userinfo); + } + + if (data->etag) { + rspamd_fstring_free(data->etag); + } + + /* + * Clear cached file, but check if a worker is an active http worker + * as cur_cache_cbd is meaningful merely for active worker, who actually + * owns the cache + */ + if (bk->map && bk->map->active_http) { + if (g_atomic_int_compare_and_exchange(&data->cache->available, 1, 0)) { + if (data->cur_cache_cbd) { + msg_info("clear shared memory cache for a map in %s as backend \"%s\" is closing", + data->cur_cache_cbd->shm->shm_name, + bk->uri); + MAP_RELEASE(data->cur_cache_cbd->shm, + "rspamd_http_map_cached_cbdata"); + ev_timer_stop(data->cur_cache_cbd->event_loop, + &data->cur_cache_cbd->timeout); + g_free(data->cur_cache_cbd); + data->cur_cache_cbd = NULL; + } + } + } + + g_free(bk->data.hd); + } + break; + } + + if (bk->trusted_pubkey) { + rspamd_pubkey_unref(bk->trusted_pubkey); + } + + g_free(bk->uri); + g_free(bk); +} + +static struct rspamd_map_backend * +rspamd_map_parse_backend(struct rspamd_config *cfg, const gchar *map_line) +{ + struct rspamd_map_backend *bk; + struct file_map_data *fdata = NULL; + struct http_map_data *hdata = NULL; + struct static_map_data *sdata = NULL; + struct http_parser_url up; + const gchar *end, *p; + rspamd_ftok_t tok; + + bk = g_malloc0(sizeof(*bk)); + REF_INIT_RETAIN(bk, rspamd_map_backend_dtor); + + if (!rspamd_map_check_proto(cfg, map_line, bk)) { + goto err; + } + + if (bk->is_fallback && bk->protocol != MAP_PROTO_FILE) { + msg_err_config("fallback backend must be file for %s", bk->uri); + + goto err; + } + + end = map_line + strlen(map_line); + if (end - map_line > 5) { + p = end - 5; + if (g_ascii_strcasecmp(p, ".zstd") == 0) { + bk->is_compressed = TRUE; + } + p = end - 4; + if (g_ascii_strcasecmp(p, ".zst") == 0) { + bk->is_compressed = TRUE; + } + } + + /* Now check for each proto separately */ + if (bk->protocol == MAP_PROTO_FILE) { + fdata = g_malloc0(sizeof(struct file_map_data)); + + if (access(bk->uri, R_OK) == -1) { + if (errno != ENOENT) { + msg_err_config("cannot open file '%s': %s", bk->uri, strerror(errno)); + goto err; + } + + msg_info_config( + "map '%s' is not found, but it can be loaded automatically later", + bk->uri); + } + + fdata->filename = g_strdup(bk->uri); + bk->data.fd = fdata; + } + else if (bk->protocol == MAP_PROTO_HTTP || bk->protocol == MAP_PROTO_HTTPS) { + hdata = g_malloc0(sizeof(struct http_map_data)); + + memset(&up, 0, sizeof(up)); + if (http_parser_parse_url(bk->uri, strlen(bk->uri), FALSE, + &up) != 0) { + msg_err_config("cannot parse HTTP url: %s", bk->uri); + goto err; + } + else { + if (!(up.field_set & 1u << UF_HOST)) { + msg_err_config("cannot parse HTTP url: %s: no host", bk->uri); + goto err; + } + + tok.begin = bk->uri + up.field_data[UF_HOST].off; + tok.len = up.field_data[UF_HOST].len; + hdata->host = rspamd_ftokdup(&tok); + + if (up.field_set & (1u << UF_PORT)) { + hdata->port = up.port; + } + else { + if (bk->protocol == MAP_PROTO_HTTP) { + hdata->port = 80; + } + else { + hdata->port = 443; + } + } + + if (up.field_set & (1u << UF_PATH)) { + tok.begin = bk->uri + up.field_data[UF_PATH].off; + tok.len = up.field_data[UF_PATH].len; + + hdata->path = rspamd_ftokdup(&tok); + + /* We also need to check query + fragment */ + if (up.field_set & ((1u << UF_QUERY) | (1u << UF_FRAGMENT))) { + tok.begin = bk->uri + up.field_data[UF_PATH].off + + up.field_data[UF_PATH].len; + tok.len = strlen(tok.begin); + hdata->rest = rspamd_ftokdup(&tok); + } + else { + hdata->rest = g_strdup(""); + } + } + + if (up.field_set & (1u << UF_USERINFO)) { + /* Create authorisation header for basic auth */ + guint len = sizeof("Basic ") + + up.field_data[UF_USERINFO].len * 8 / 5 + 4; + hdata->userinfo = g_malloc(len); + rspamd_snprintf(hdata->userinfo, len, "Basic %*Bs", + (int) up.field_data[UF_USERINFO].len, + bk->uri + up.field_data[UF_USERINFO].off); + + msg_debug("added userinfo for the map from the URL: %s", hdata->host); + } + else { + /* Try to obtain authentication data from options in the configuration */ + const ucl_object_t *auth_obj, *opts_obj; + + opts_obj = ucl_object_lookup(cfg->cfg_ucl_obj, "options"); + if (opts_obj != NULL) { + auth_obj = ucl_object_lookup(opts_obj, "http_auth"); + if (auth_obj != NULL && ucl_object_type(auth_obj) == UCL_OBJECT) { + const ucl_object_t *host_obj; + + /* + * Search first by the full URL and then by the host part + */ + host_obj = ucl_object_lookup(auth_obj, map_line); + + if (host_obj == NULL) { + host_obj = ucl_object_lookup(auth_obj, hdata->host); + } + + if (host_obj != NULL && ucl_object_type(host_obj) == UCL_OBJECT) { + const ucl_object_t *user_obj, *password_obj; + + user_obj = ucl_object_lookup(host_obj, "user"); + password_obj = ucl_object_lookup(host_obj, "password"); + + if (user_obj != NULL && password_obj != NULL && + ucl_object_type(user_obj) == UCL_STRING && + ucl_object_type(password_obj) == UCL_STRING) { + + gchar *tmpbuf; + unsigned tlen; + + /* User + password + ':' */ + tlen = strlen(ucl_object_tostring(user_obj)) + + strlen(ucl_object_tostring(password_obj)) + 1; + tmpbuf = g_malloc(tlen + 1); + rspamd_snprintf(tmpbuf, tlen + 1, "%s:%s", + ucl_object_tostring(user_obj), + ucl_object_tostring(password_obj)); + /* Base64 encoding is not so greedy, but we add some space for simplicity */ + tlen *= 2; + tlen += sizeof("Basic ") - 1; + hdata->userinfo = g_malloc(tlen + 1); + rspamd_snprintf(hdata->userinfo, tlen + 1, "Basic %Bs", tmpbuf); + g_free(tmpbuf); + msg_debug("added userinfo for the map from the configuration: %s", map_line); + } + } + } + } + } + } + + hdata->cache = rspamd_mempool_alloc0_shared(cfg->cfg_pool, + sizeof(*hdata->cache)); + + bk->data.hd = hdata; + } + else if (bk->protocol == MAP_PROTO_STATIC) { + sdata = g_malloc0(sizeof(*sdata)); + bk->data.sd = sdata; + } + + bk->id = rspamd_cryptobox_fast_hash_specific(RSPAMD_CRYPTOBOX_T1HA, + bk->uri, strlen(bk->uri), + 0xdeadbabe); + + return bk; + +err: + MAP_RELEASE(bk, "rspamd_map_backend"); + + if (hdata) { + g_free(hdata); + } + + if (fdata) { + g_free(fdata); + } + + if (sdata) { + g_free(sdata); + } + + return NULL; +} + +static void +rspamd_map_calculate_hash(struct rspamd_map *map) +{ + struct rspamd_map_backend *bk; + guint i; + rspamd_cryptobox_hash_state_t st; + gchar *cksum_encoded, cksum[rspamd_cryptobox_HASHBYTES]; + + rspamd_cryptobox_hash_init(&st, NULL, 0); + + for (i = 0; i < map->backends->len; i++) { + bk = g_ptr_array_index(map->backends, i); + rspamd_cryptobox_hash_update(&st, bk->uri, strlen(bk->uri)); + } + + rspamd_cryptobox_hash_final(&st, cksum); + cksum_encoded = rspamd_encode_base32(cksum, sizeof(cksum), RSPAMD_BASE32_DEFAULT); + rspamd_strlcpy(map->tag, cksum_encoded, sizeof(map->tag)); + g_free(cksum_encoded); +} + +static gboolean +rspamd_map_add_static_string(struct rspamd_config *cfg, + const ucl_object_t *elt, + GString *target) +{ + gsize sz; + const gchar *dline; + + if (ucl_object_type(elt) != UCL_STRING) { + msg_err_config("map has static backend but `data` is " + "not string like: %s", + ucl_object_type_to_string(elt->type)); + return FALSE; + } + + /* Otherwise, we copy data to the backend */ + dline = ucl_object_tolstring(elt, &sz); + + if (sz == 0) { + msg_err_config("map has static backend but empty no data"); + return FALSE; + } + + g_string_append_len(target, dline, sz); + g_string_append_c(target, '\n'); + + return TRUE; +} + +struct rspamd_map * +rspamd_map_add(struct rspamd_config *cfg, + const gchar *map_line, + const gchar *description, + map_cb_t read_callback, + map_fin_cb_t fin_callback, + map_dtor_t dtor, + void **user_data, + struct rspamd_worker *worker, + int flags) +{ + struct rspamd_map *map; + struct rspamd_map_backend *bk; + + bk = rspamd_map_parse_backend(cfg, map_line); + if (bk == NULL) { + return NULL; + } + + if (bk->is_fallback) { + msg_err_config("cannot add map with fallback only backend: %s", bk->uri); + REF_RELEASE(bk); + + return NULL; + } + + map = rspamd_mempool_alloc0(cfg->cfg_pool, sizeof(struct rspamd_map)); + map->read_callback = read_callback; + map->fin_callback = fin_callback; + map->dtor = dtor; + map->user_data = user_data; + map->cfg = cfg; + map->id = rspamd_random_uint64_fast(); + map->locked = + rspamd_mempool_alloc0_shared(cfg->cfg_pool, sizeof(gint)); + map->backends = g_ptr_array_sized_new(1); + map->wrk = worker; + rspamd_mempool_add_destructor(cfg->cfg_pool, rspamd_ptr_array_free_hard, + map->backends); + g_ptr_array_add(map->backends, bk); + map->name = rspamd_mempool_strdup(cfg->cfg_pool, map_line); + map->no_file_read = (flags & RSPAMD_MAP_FILE_NO_READ); + + if (bk->protocol == MAP_PROTO_FILE) { + map->poll_timeout = (cfg->map_timeout * cfg->map_file_watch_multiplier); + } + else { + map->poll_timeout = cfg->map_timeout; + } + + if (description != NULL) { + map->description = rspamd_mempool_strdup(cfg->cfg_pool, description); + } + + rspamd_map_calculate_hash(map); + msg_info_map("added map %s", bk->uri); + bk->map = map; + + cfg->maps = g_list_prepend(cfg->maps, map); + + return map; +} + +struct rspamd_map * +rspamd_map_add_fake(struct rspamd_config *cfg, + const gchar *description, + const gchar *name) +{ + struct rspamd_map *map; + + map = rspamd_mempool_alloc0(cfg->cfg_pool, sizeof(struct rspamd_map)); + map->cfg = cfg; + map->id = rspamd_random_uint64_fast(); + map->name = rspamd_mempool_strdup(cfg->cfg_pool, name); + map->user_data = (void **) ↦ /* to prevent null pointer dereferencing */ + + if (description != NULL) { + map->description = rspamd_mempool_strdup(cfg->cfg_pool, description); + } + + return map; +} + +static inline void +rspamd_map_add_backend(struct rspamd_map *map, struct rspamd_map_backend *bk) +{ + if (bk->is_fallback) { + if (map->fallback_backend) { + msg_warn_map("redefining fallback backend from %s to %s", + map->fallback_backend->uri, bk->uri); + } + + map->fallback_backend = bk; + } + else { + g_ptr_array_add(map->backends, bk); + } + + bk->map = map; +} + +struct rspamd_map * +rspamd_map_add_from_ucl(struct rspamd_config *cfg, + const ucl_object_t *obj, + const gchar *description, + map_cb_t read_callback, + map_fin_cb_t fin_callback, + map_dtor_t dtor, + void **user_data, + struct rspamd_worker *worker, + gint flags) +{ + ucl_object_iter_t it = NULL; + const ucl_object_t *cur, *elt; + struct rspamd_map *map; + struct rspamd_map_backend *bk; + guint i; + + g_assert(obj != NULL); + + if (ucl_object_type(obj) == UCL_STRING) { + /* Just a plain string */ + return rspamd_map_add(cfg, ucl_object_tostring(obj), description, + read_callback, fin_callback, dtor, user_data, worker, flags); + } + + map = rspamd_mempool_alloc0(cfg->cfg_pool, sizeof(struct rspamd_map)); + map->read_callback = read_callback; + map->fin_callback = fin_callback; + map->dtor = dtor; + map->user_data = user_data; + map->cfg = cfg; + map->id = rspamd_random_uint64_fast(); + map->locked = + rspamd_mempool_alloc0_shared(cfg->cfg_pool, sizeof(gint)); + map->backends = g_ptr_array_new(); + map->wrk = worker; + map->no_file_read = (flags & RSPAMD_MAP_FILE_NO_READ); + rspamd_mempool_add_destructor(cfg->cfg_pool, rspamd_ptr_array_free_hard, + map->backends); + map->poll_timeout = cfg->map_timeout; + + if (description) { + map->description = rspamd_mempool_strdup(cfg->cfg_pool, description); + } + + if (ucl_object_type(obj) == UCL_ARRAY) { + /* Add array of maps as multiple backends */ + while ((cur = ucl_object_iterate(obj, &it, true)) != NULL) { + if (ucl_object_type(cur) == UCL_STRING) { + bk = rspamd_map_parse_backend(cfg, ucl_object_tostring(cur)); + + if (bk != NULL) { + rspamd_map_add_backend(map, bk); + + if (!map->name) { + map->name = rspamd_mempool_strdup(cfg->cfg_pool, + ucl_object_tostring(cur)); + } + } + } + else { + msg_err_config("bad map element type: %s", + ucl_object_type_to_string(ucl_object_type(cur))); + } + } + + if (map->backends->len == 0) { + msg_err_config("map has no urls to be loaded: empty list"); + goto err; + } + } + else if (ucl_object_type(obj) == UCL_OBJECT) { + elt = ucl_object_lookup(obj, "name"); + if (elt && ucl_object_type(elt) == UCL_STRING) { + map->name = rspamd_mempool_strdup(cfg->cfg_pool, + ucl_object_tostring(elt)); + } + + elt = ucl_object_lookup(obj, "description"); + if (elt && ucl_object_type(elt) == UCL_STRING) { + map->description = rspamd_mempool_strdup(cfg->cfg_pool, + ucl_object_tostring(elt)); + } + + elt = ucl_object_lookup_any(obj, "timeout", "poll", "poll_time", + "watch_interval", NULL); + if (elt) { + map->poll_timeout = ucl_object_todouble(elt); + } + + elt = ucl_object_lookup_any(obj, "upstreams", "url", "urls", NULL); + if (elt == NULL) { + msg_err_config("map has no urls to be loaded: no elt"); + goto err; + } + + if (ucl_object_type(elt) == UCL_ARRAY) { + /* Add array of maps as multiple backends */ + it = ucl_object_iterate_new(elt); + + while ((cur = ucl_object_iterate_safe(it, true)) != NULL) { + if (ucl_object_type(cur) == UCL_STRING) { + bk = rspamd_map_parse_backend(cfg, ucl_object_tostring(cur)); + + if (bk != NULL) { + rspamd_map_add_backend(map, bk); + + if (!map->name) { + map->name = rspamd_mempool_strdup(cfg->cfg_pool, + ucl_object_tostring(cur)); + } + } + } + else { + msg_err_config("bad map element type: %s", + ucl_object_type_to_string(ucl_object_type(cur))); + ucl_object_iterate_free(it); + goto err; + } + } + + ucl_object_iterate_free(it); + + if (map->backends->len == 0) { + msg_err_config("map has no urls to be loaded: empty object list"); + goto err; + } + } + else if (ucl_object_type(elt) == UCL_STRING) { + bk = rspamd_map_parse_backend(cfg, ucl_object_tostring(elt)); + + if (bk != NULL) { + rspamd_map_add_backend(map, bk); + + if (!map->name) { + map->name = rspamd_mempool_strdup(cfg->cfg_pool, + ucl_object_tostring(elt)); + } + } + } + + if (!map->backends || map->backends->len == 0) { + msg_err_config("map has no urls to be loaded: no valid backends"); + goto err; + } + } + else { + msg_err_config("map has invalid type for value: %s", + ucl_object_type_to_string(ucl_object_type(obj))); + goto err; + } + + gboolean all_local = TRUE; + + PTR_ARRAY_FOREACH(map->backends, i, bk) + { + if (bk->protocol == MAP_PROTO_STATIC) { + GString *map_data; + /* We need data field in ucl */ + elt = ucl_object_lookup(obj, "data"); + + if (elt == NULL) { + msg_err_config("map has static backend but no `data` field"); + goto err; + } + + + if (ucl_object_type(elt) == UCL_STRING) { + map_data = g_string_sized_new(32); + + if (rspamd_map_add_static_string(cfg, elt, map_data)) { + bk->data.sd->data = map_data->str; + bk->data.sd->len = map_data->len; + g_string_free(map_data, FALSE); + } + else { + g_string_free(map_data, TRUE); + msg_err_config("map has static backend with invalid `data` field"); + goto err; + } + } + else if (ucl_object_type(elt) == UCL_ARRAY) { + map_data = g_string_sized_new(32); + it = ucl_object_iterate_new(elt); + + while ((cur = ucl_object_iterate_safe(it, true))) { + if (!rspamd_map_add_static_string(cfg, cur, map_data)) { + g_string_free(map_data, TRUE); + msg_err_config("map has static backend with invalid " + "`data` field"); + ucl_object_iterate_free(it); + goto err; + } + } + + ucl_object_iterate_free(it); + bk->data.sd->data = map_data->str; + bk->data.sd->len = map_data->len; + g_string_free(map_data, FALSE); + } + } + else if (bk->protocol != MAP_PROTO_FILE) { + all_local = FALSE; + } + } + + if (all_local) { + map->poll_timeout = (map->poll_timeout * + cfg->map_file_watch_multiplier); + } + + rspamd_map_calculate_hash(map); + msg_debug_map("added map from ucl"); + + cfg->maps = g_list_prepend(cfg->maps, map); + + return map; + +err: + + if (map) { + PTR_ARRAY_FOREACH(map->backends, i, bk) + { + MAP_RELEASE(bk, "rspamd_map_backend"); + } + } + + return NULL; +} + +rspamd_map_traverse_function +rspamd_map_get_traverse_function(struct rspamd_map *map) +{ + if (map) { + return map->traverse_function; + } + + return NULL; +} + +void rspamd_map_traverse(struct rspamd_map *map, rspamd_map_traverse_cb cb, + gpointer cbdata, gboolean reset_hits) +{ + if (*map->user_data && map->traverse_function) { + map->traverse_function(*map->user_data, cb, cbdata, reset_hits); + } +} + +void rspamd_map_set_on_load_function(struct rspamd_map *map, rspamd_map_on_load_function cb, + gpointer cbdata, GDestroyNotify dtor) +{ + if (map) { + map->on_load_function = cb; + map->on_load_ud = cbdata; + map->on_load_ud_dtor = dtor; + } +} |