diff options
Diffstat (limited to 'src/libserver/maps')
-rw-r--r-- | src/libserver/maps/map.c | 3195 | ||||
-rw-r--r-- | src/libserver/maps/map.h | 168 | ||||
-rw-r--r-- | src/libserver/maps/map_helpers.c | 1845 | ||||
-rw-r--r-- | src/libserver/maps/map_helpers.h | 269 | ||||
-rw-r--r-- | src/libserver/maps/map_private.h | 226 |
5 files changed, 5703 insertions, 0 deletions
diff --git a/src/libserver/maps/map.c b/src/libserver/maps/map.c new file mode 100644 index 0000000..7f6a48f --- /dev/null +++ b/src/libserver/maps/map.c @@ -0,0 +1,3195 @@ +/* + * Copyright 2023 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Implementation of map files handling + */ + +#include "config.h" +#include "map.h" +#include "map_private.h" +#include "libserver/http/http_connection.h" +#include "libserver/http/http_private.h" +#include "rspamd.h" +#include "contrib/libev/ev.h" +#include "contrib/uthash/utlist.h" + +#ifdef SYS_ZSTD +#include "zstd.h" +#else +#include "contrib/zstd/zstd.h" +#endif + +#undef MAP_DEBUG_REFS +#ifdef MAP_DEBUG_REFS +#define MAP_RETAIN(x, t) \ + do { \ + msg_err(G_GNUC_PRETTY_FUNCTION ": " t ": retain ref %p, refcount: %d -> %d", (x), (x)->ref.refcount, (x)->ref.refcount + 1); \ + REF_RETAIN(x); \ + } while (0) + +#define MAP_RELEASE(x, t) \ + do { \ + msg_err(G_GNUC_PRETTY_FUNCTION ": " t ": release ref %p, refcount: %d -> %d", (x), (x)->ref.refcount, (x)->ref.refcount - 1); \ + REF_RELEASE(x); \ + } while (0) +#else +#define MAP_RETAIN(x, t) REF_RETAIN(x) +#define MAP_RELEASE(x, t) REF_RELEASE(x) +#endif + +enum rspamd_map_periodic_opts { + RSPAMD_MAP_SCHEDULE_NORMAL = 0, + RSPAMD_MAP_SCHEDULE_ERROR = (1u << 0u), + RSPAMD_MAP_SCHEDULE_LOCKED = (1u << 1u), + RSPAMD_MAP_SCHEDULE_INIT = (1u << 2u), +}; + +static void free_http_cbdata_common(struct http_callback_data *cbd, + gboolean plan_new); +static void free_http_cbdata_dtor(gpointer p); +static void free_http_cbdata(struct http_callback_data *cbd); +static void rspamd_map_process_periodic(struct map_periodic_cbdata *cbd); +static void rspamd_map_schedule_periodic(struct rspamd_map *map, int how); +static gboolean read_map_file_chunks(struct rspamd_map *map, + struct map_cb_data *cbdata, + const gchar *fname, + gsize len, + goffset off); +static gboolean rspamd_map_save_http_cached_file(struct rspamd_map *map, + struct rspamd_map_backend *bk, + struct http_map_data *htdata, + const guchar *data, + gsize len); +static gboolean rspamd_map_update_http_cached_file(struct rspamd_map *map, + struct rspamd_map_backend *bk, + struct http_map_data *htdata); + +guint rspamd_map_log_id = (guint) -1; +RSPAMD_CONSTRUCTOR(rspamd_map_log_init) +{ + rspamd_map_log_id = rspamd_logger_add_debug_module("map"); +} + +/** + * Write HTTP request + */ +static void +write_http_request(struct http_callback_data *cbd) +{ + gchar datebuf[128]; + struct rspamd_http_message *msg; + + msg = rspamd_http_new_message(HTTP_REQUEST); + if (cbd->check) { + msg->method = HTTP_HEAD; + } + + msg->url = rspamd_fstring_append(msg->url, + cbd->data->path, strlen(cbd->data->path)); + + if (cbd->check) { + if (cbd->data->last_modified != 0) { + rspamd_http_date_format(datebuf, sizeof(datebuf), + cbd->data->last_modified); + rspamd_http_message_add_header(msg, "If-Modified-Since", + datebuf); + } + if (cbd->data->etag) { + rspamd_http_message_add_header_len(msg, "If-None-Match", + cbd->data->etag->str, cbd->data->etag->len); + } + } + + msg->url = rspamd_fstring_append(msg->url, cbd->data->rest, + strlen(cbd->data->rest)); + + if (cbd->data->userinfo) { + rspamd_http_message_add_header(msg, "Authorization", + cbd->data->userinfo); + } + + MAP_RETAIN(cbd, "http_callback_data"); + rspamd_http_connection_write_message(cbd->conn, + msg, + cbd->data->host, + NULL, + cbd, + cbd->timeout); +} + +/** + * Callback for destroying HTTP callback data + */ +static void +free_http_cbdata_common(struct http_callback_data *cbd, gboolean plan_new) +{ + struct map_periodic_cbdata *periodic = cbd->periodic; + + if (cbd->shmem_data) { + rspamd_http_message_shmem_unref(cbd->shmem_data); + } + + if (cbd->pk) { + rspamd_pubkey_unref(cbd->pk); + } + + if (cbd->conn) { + rspamd_http_connection_unref(cbd->conn); + cbd->conn = NULL; + } + + if (cbd->addrs) { + rspamd_inet_addr_t *addr; + guint i; + + PTR_ARRAY_FOREACH(cbd->addrs, i, addr) + { + rspamd_inet_address_free(addr); + } + + g_ptr_array_free(cbd->addrs, TRUE); + } + + + MAP_RELEASE(cbd->bk, "rspamd_map_backend"); + + if (periodic) { + /* Detached in case of HTTP error */ + MAP_RELEASE(periodic, "periodic"); + } + + g_free(cbd); +} + +static void +free_http_cbdata(struct http_callback_data *cbd) +{ + cbd->map->tmp_dtor = NULL; + cbd->map->tmp_dtor_data = NULL; + + free_http_cbdata_common(cbd, TRUE); +} + +static void +free_http_cbdata_dtor(gpointer p) +{ + struct http_callback_data *cbd = p; + struct rspamd_map *map; + + map = cbd->map; + if (cbd->stage == http_map_http_conn) { + REF_RELEASE(cbd); + } + else { + /* We cannot terminate DNS requests sent */ + cbd->stage = http_map_terminated; + } + + msg_warn_map("%s: " + "connection with http server is terminated: worker is stopping", + map->name); +} + +/* + * HTTP callbacks + */ +static void +http_map_error(struct rspamd_http_connection *conn, + GError *err) +{ + struct http_callback_data *cbd = conn->ud; + struct rspamd_map *map; + + map = cbd->map; + + if (cbd->periodic) { + cbd->periodic->errored = TRUE; + msg_err_map("error reading %s(%s): " + "connection with http server terminated incorrectly: %e", + cbd->bk->uri, + cbd->addr ? rspamd_inet_address_to_string_pretty(cbd->addr) : "", + err); + + rspamd_map_process_periodic(cbd->periodic); + } + + MAP_RELEASE(cbd, "http_callback_data"); +} + +static void +rspamd_map_cache_cb(struct ev_loop *loop, ev_timer *w, int revents) +{ + struct rspamd_http_map_cached_cbdata *cache_cbd = (struct rspamd_http_map_cached_cbdata *) + w->data; + struct rspamd_map *map; + struct http_map_data *data; + + map = cache_cbd->map; + data = cache_cbd->data; + + if (cache_cbd->gen != cache_cbd->data->gen) { + /* We have another update, so this cache element is obviously expired */ + /* + * Important!: we do not set cache availability to zero here, as there + * might be fresh cache + */ + msg_info_map("cached data is now expired (gen mismatch %L != %L) for %s; shm name=%s; refcount=%d", + cache_cbd->gen, cache_cbd->data->gen, map->name, cache_cbd->shm->shm_name, + cache_cbd->shm->ref.refcount); + MAP_RELEASE(cache_cbd->shm, "rspamd_http_map_cached_cbdata"); + ev_timer_stop(loop, &cache_cbd->timeout); + g_free(cache_cbd); + } + else if (cache_cbd->data->last_checked >= cache_cbd->last_checked) { + /* + * We checked map but we have not found anything more recent, + * reschedule cache check + */ + if (cache_cbd->map->poll_timeout > + rspamd_get_calendar_ticks() - cache_cbd->data->last_checked) { + w->repeat = cache_cbd->map->poll_timeout - + (rspamd_get_calendar_ticks() - cache_cbd->data->last_checked); + } + else { + w->repeat = cache_cbd->map->poll_timeout; + } + + if (w->repeat < 0) { + msg_info_map("cached data for %s has skewed check time: %d last checked, " + "%d poll timeout, %.2f diff; shm name=%s; refcount=%d", + map->name, (int) cache_cbd->data->last_checked, + (int) cache_cbd->map->poll_timeout, + (rspamd_get_calendar_ticks() - cache_cbd->data->last_checked), + cache_cbd->shm->shm_name, + cache_cbd->shm->ref.refcount); + w->repeat = 0.0; + } + + cache_cbd->last_checked = cache_cbd->data->last_checked; + msg_debug_map("cached data is up to date for %s", map->name); + ev_timer_again(loop, &cache_cbd->timeout); + } + else { + data->cur_cache_cbd = NULL; + g_atomic_int_set(&data->cache->available, 0); + msg_info_map("cached data is now expired for %s; shm name=%s; refcount=%d", + map->name, + cache_cbd->shm->shm_name, + cache_cbd->shm->ref.refcount); + MAP_RELEASE(cache_cbd->shm, "rspamd_http_map_cached_cbdata"); + ev_timer_stop(loop, &cache_cbd->timeout); + g_free(cache_cbd); + } +} + +static int +http_map_finish(struct rspamd_http_connection *conn, + struct rspamd_http_message *msg) +{ + struct http_callback_data *cbd = conn->ud; + struct rspamd_map *map; + struct rspamd_map_backend *bk; + struct http_map_data *data; + struct rspamd_http_map_cached_cbdata *cache_cbd; + const rspamd_ftok_t *expires_hdr, *etag_hdr; + char next_check_date[128]; + guchar *in = NULL; + gsize dlen = 0; + + map = cbd->map; + bk = cbd->bk; + data = bk->data.hd; + + if (msg->code == 200) { + + if (cbd->check) { + msg_info_map("need to reread map from %s", cbd->bk->uri); + cbd->periodic->need_modify = TRUE; + /* Reset the whole chain */ + cbd->periodic->cur_backend = 0; + /* Reset cache, old cached data will be cleaned on timeout */ + g_atomic_int_set(&data->cache->available, 0); + data->cur_cache_cbd = NULL; + + rspamd_map_process_periodic(cbd->periodic); + MAP_RELEASE(cbd, "http_callback_data"); + + return 0; + } + + cbd->data->last_checked = msg->date; + + if (msg->last_modified) { + cbd->data->last_modified = msg->last_modified; + } + else { + cbd->data->last_modified = msg->date; + } + + + /* Unsigned version - just open file */ + cbd->shmem_data = rspamd_http_message_shmem_ref(msg); + cbd->data_len = msg->body_buf.len; + + if (cbd->data_len == 0) { + msg_err_map("cannot read empty map"); + goto err; + } + + g_assert(cbd->shmem_data != NULL); + + in = rspamd_shmem_xmap(cbd->shmem_data->shm_name, PROT_READ, &dlen); + + if (in == NULL) { + msg_err_map("cannot read tempfile %s: %s", + cbd->shmem_data->shm_name, + strerror(errno)); + goto err; + } + + /* Check for expires */ + double cached_timeout = map->poll_timeout * 2; + + expires_hdr = rspamd_http_message_find_header(msg, "Expires"); + + if (expires_hdr) { + time_t hdate; + + hdate = rspamd_http_parse_date(expires_hdr->begin, expires_hdr->len); + + if (hdate != (time_t) -1 && hdate > msg->date) { + cached_timeout = map->next_check - msg->date + + map->poll_timeout * 2; + + map->next_check = hdate; + } + else { + msg_info_map("invalid expires header: %T, ignore it", expires_hdr); + map->next_check = 0; + } + } + + /* Check for etag */ + etag_hdr = rspamd_http_message_find_header(msg, "ETag"); + + if (etag_hdr) { + if (cbd->data->etag) { + /* Remove old etag */ + rspamd_fstring_free(cbd->data->etag); + } + + cbd->data->etag = rspamd_fstring_new_init(etag_hdr->begin, + etag_hdr->len); + } + else { + if (cbd->data->etag) { + /* Remove and clear old etag */ + rspamd_fstring_free(cbd->data->etag); + cbd->data->etag = NULL; + } + } + + MAP_RETAIN(cbd->shmem_data, "shmem_data"); + cbd->data->gen++; + /* + * We know that a map is in the locked state + */ + g_atomic_int_set(&data->cache->available, 1); + /* Store cached data */ + rspamd_strlcpy(data->cache->shmem_name, cbd->shmem_data->shm_name, + sizeof(data->cache->shmem_name)); + data->cache->len = cbd->data_len; + data->cache->last_modified = cbd->data->last_modified; + cache_cbd = g_malloc0(sizeof(*cache_cbd)); + cache_cbd->shm = cbd->shmem_data; + cache_cbd->event_loop = cbd->event_loop; + cache_cbd->map = map; + cache_cbd->data = cbd->data; + cache_cbd->last_checked = cbd->data->last_checked; + cache_cbd->gen = cbd->data->gen; + MAP_RETAIN(cache_cbd->shm, "shmem_data"); + msg_info_map("stored map data in a shared memory cache: %s", + cache_cbd->shm->shm_name); + + ev_timer_init(&cache_cbd->timeout, rspamd_map_cache_cb, cached_timeout, + 0.0); + ev_timer_start(cbd->event_loop, &cache_cbd->timeout); + cache_cbd->timeout.data = cache_cbd; + data->cur_cache_cbd = cache_cbd; + + if (map->next_check) { + rspamd_http_date_format(next_check_date, sizeof(next_check_date), + map->next_check); + } + else { + rspamd_http_date_format(next_check_date, sizeof(next_check_date), + rspamd_get_calendar_ticks() + map->poll_timeout); + } + + + if (cbd->bk->is_compressed) { + ZSTD_DStream *zstream; + ZSTD_inBuffer zin; + ZSTD_outBuffer zout; + guchar *out; + gsize outlen, r; + + zstream = ZSTD_createDStream(); + ZSTD_initDStream(zstream); + + zin.pos = 0; + zin.src = in; + zin.size = dlen; + + if ((outlen = ZSTD_getDecompressedSize(zin.src, zin.size)) == 0) { + outlen = ZSTD_DStreamOutSize(); + } + + out = g_malloc(outlen); + + zout.dst = out; + zout.pos = 0; + zout.size = outlen; + + while (zin.pos < zin.size) { + r = ZSTD_decompressStream(zstream, &zout, &zin); + + if (ZSTD_isError(r)) { + msg_err_map("%s(%s): cannot decompress data: %s", + cbd->bk->uri, + rspamd_inet_address_to_string_pretty(cbd->addr), + ZSTD_getErrorName(r)); + ZSTD_freeDStream(zstream); + g_free(out); + MAP_RELEASE(cbd->shmem_data, "shmem_data"); + goto err; + } + + if (zout.pos == zout.size) { + /* We need to extend output buffer */ + zout.size = zout.size * 2 + 1.0; + out = g_realloc(zout.dst, zout.size); + zout.dst = out; + } + } + + ZSTD_freeDStream(zstream); + msg_info_map("%s(%s): read map data %z bytes compressed, " + "%z uncompressed, next check at %s", + cbd->bk->uri, + rspamd_inet_address_to_string_pretty(cbd->addr), + dlen, zout.pos, next_check_date); + map->read_callback(out, zout.pos, &cbd->periodic->cbdata, TRUE); + rspamd_map_save_http_cached_file(map, bk, cbd->data, out, zout.pos); + g_free(out); + } + else { + msg_info_map("%s(%s): read map data %z bytes, next check at %s", + cbd->bk->uri, + rspamd_inet_address_to_string_pretty(cbd->addr), + dlen, next_check_date); + rspamd_map_save_http_cached_file(map, bk, cbd->data, in, cbd->data_len); + map->read_callback(in, cbd->data_len, &cbd->periodic->cbdata, TRUE); + } + + MAP_RELEASE(cbd->shmem_data, "shmem_data"); + + cbd->periodic->cur_backend++; + munmap(in, dlen); + rspamd_map_process_periodic(cbd->periodic); + } + else if (msg->code == 304 && cbd->check) { + cbd->data->last_checked = msg->date; + + if (msg->last_modified) { + cbd->data->last_modified = msg->last_modified; + } + else { + cbd->data->last_modified = msg->date; + } + + expires_hdr = rspamd_http_message_find_header(msg, "Expires"); + + if (expires_hdr) { + time_t hdate; + + hdate = rspamd_http_parse_date(expires_hdr->begin, expires_hdr->len); + if (hdate != (time_t) -1 && hdate > msg->date) { + map->next_check = hdate; + } + else { + msg_info_map("invalid expires header: %T, ignore it", expires_hdr); + map->next_check = 0; + } + } + + etag_hdr = rspamd_http_message_find_header(msg, "ETag"); + + if (etag_hdr) { + if (cbd->data->etag) { + /* Remove old etag */ + rspamd_fstring_free(cbd->data->etag); + cbd->data->etag = rspamd_fstring_new_init(etag_hdr->begin, + etag_hdr->len); + } + } + + if (map->next_check) { + rspamd_http_date_format(next_check_date, sizeof(next_check_date), + map->next_check); + msg_info_map("data is not modified for server %s, next check at %s " + "(http cache based: %T)", + cbd->data->host, next_check_date, expires_hdr); + } + else { + rspamd_http_date_format(next_check_date, sizeof(next_check_date), + rspamd_get_calendar_ticks() + map->poll_timeout); + msg_info_map("data is not modified for server %s, next check at %s " + "(timer based)", + cbd->data->host, next_check_date); + } + + rspamd_map_update_http_cached_file(map, bk, cbd->data); + cbd->periodic->cur_backend++; + rspamd_map_process_periodic(cbd->periodic); + } + else { + msg_info_map("cannot load map %s from %s: HTTP error %d", + bk->uri, cbd->data->host, msg->code); + goto err; + } + + MAP_RELEASE(cbd, "http_callback_data"); + return 0; + +err: + cbd->periodic->errored = 1; + rspamd_map_process_periodic(cbd->periodic); + MAP_RELEASE(cbd, "http_callback_data"); + + return 0; +} + +static gboolean +read_map_file_chunks(struct rspamd_map *map, struct map_cb_data *cbdata, + const gchar *fname, gsize len, goffset off) +{ + gint fd; + gssize r, avail; + gsize buflen = 1024 * 1024; + gchar *pos, *bytes; + + fd = rspamd_file_xopen(fname, O_RDONLY, 0, TRUE); + + if (fd == -1) { + msg_err_map("can't open map for buffered reading %s: %s", + fname, strerror(errno)); + return FALSE; + } + + if (lseek(fd, off, SEEK_SET) == -1) { + msg_err_map("can't seek in map to pos %d for buffered reading %s: %s", + (gint) off, fname, strerror(errno)); + close(fd); + + return FALSE; + } + + buflen = MIN(len, buflen); + bytes = g_malloc(buflen); + avail = buflen; + pos = bytes; + + while ((r = read(fd, pos, avail)) > 0) { + gchar *end = bytes + (pos - bytes) + r; + msg_debug_map("%s: read map chunk, %z bytes", fname, + r); + pos = map->read_callback(bytes, end - bytes, cbdata, r == len); + + if (pos && pos > bytes && pos < end) { + guint remain = end - pos; + + memmove(bytes, pos, remain); + pos = bytes + remain; + /* Need to preserve the remain */ + avail = ((gssize) buflen) - remain; + + if (avail <= 0) { + /* Try realloc, too large element */ + g_assert(buflen >= remain); + bytes = g_realloc(bytes, buflen * 2); + + pos = bytes + remain; /* Adjust */ + avail += buflen; + buflen *= 2; + } + } + else { + avail = buflen; + pos = bytes; + } + + len -= r; + } + + if (r == -1) { + msg_err_map("can't read from map %s: %s", fname, strerror(errno)); + close(fd); + g_free(bytes); + + return FALSE; + } + + close(fd); + g_free(bytes); + + return TRUE; +} + +static gboolean +rspamd_map_check_sig_pk_mem(const guchar *sig, + gsize siglen, + struct rspamd_map *map, + const guchar *input, + gsize inlen, + struct rspamd_cryptobox_pubkey *pk) +{ + GString *b32_key; + gboolean ret = TRUE; + + if (siglen != rspamd_cryptobox_signature_bytes(RSPAMD_CRYPTOBOX_MODE_25519)) { + msg_err_map("can't open signature for %s: invalid size: %z", map->name, siglen); + + ret = FALSE; + } + + if (ret && !rspamd_cryptobox_verify(sig, siglen, input, inlen, + rspamd_pubkey_get_pk(pk, NULL), RSPAMD_CRYPTOBOX_MODE_25519)) { + msg_err_map("can't verify signature for %s: incorrect signature", map->name); + + ret = FALSE; + } + + if (ret) { + b32_key = rspamd_pubkey_print(pk, + RSPAMD_KEYPAIR_BASE32 | RSPAMD_KEYPAIR_PUBKEY); + msg_info_map("verified signature for %s using trusted key %v", + map->name, b32_key); + g_string_free(b32_key, TRUE); + } + + return ret; +} + +static gboolean +rspamd_map_check_file_sig(const char *fname, + struct rspamd_map *map, + struct rspamd_map_backend *bk, + const guchar *input, + gsize inlen) +{ + guchar *data; + struct rspamd_cryptobox_pubkey *pk = NULL; + GString *b32_key; + gboolean ret = TRUE; + gsize len = 0; + gchar fpath[PATH_MAX]; + + if (bk->trusted_pubkey == NULL) { + /* Try to load and check pubkey */ + rspamd_snprintf(fpath, sizeof(fpath), "%s.pub", fname); + data = rspamd_file_xmap(fpath, PROT_READ, &len, TRUE); + + if (data == NULL) { + msg_err_map("can't open pubkey %s: %s", fpath, strerror(errno)); + return FALSE; + } + + pk = rspamd_pubkey_from_base32(data, len, RSPAMD_KEYPAIR_SIGN, + RSPAMD_CRYPTOBOX_MODE_25519); + munmap(data, len); + + if (pk == NULL) { + msg_err_map("can't load pubkey %s", fpath); + return FALSE; + } + + /* We just check pk against the trusted db of keys */ + b32_key = rspamd_pubkey_print(pk, + RSPAMD_KEYPAIR_BASE32 | RSPAMD_KEYPAIR_PUBKEY); + g_assert(b32_key != NULL); + + if (g_hash_table_lookup(map->cfg->trusted_keys, b32_key->str) == NULL) { + msg_err_map("pubkey loaded from %s is untrusted: %v", fpath, + b32_key); + g_string_free(b32_key, TRUE); + rspamd_pubkey_unref(pk); + + return FALSE; + } + + g_string_free(b32_key, TRUE); + } + else { + pk = rspamd_pubkey_ref(bk->trusted_pubkey); + } + + rspamd_snprintf(fpath, sizeof(fpath), "%s.sig", fname); + data = rspamd_shmem_xmap(fpath, PROT_READ, &len); + + if (data == NULL) { + msg_err_map("can't open signature %s: %s", fpath, strerror(errno)); + ret = FALSE; + } + + if (ret) { + ret = rspamd_map_check_sig_pk_mem(data, len, map, input, inlen, pk); + munmap(data, len); + } + + rspamd_pubkey_unref(pk); + + return ret; +} + +/** + * Callback for reading data from file + */ +static gboolean +read_map_file(struct rspamd_map *map, struct file_map_data *data, + struct rspamd_map_backend *bk, struct map_periodic_cbdata *periodic) +{ + gchar *bytes; + gsize len; + struct stat st; + + if (map->read_callback == NULL || map->fin_callback == NULL) { + msg_err_map("%s: bad callback for reading map file", + data->filename); + return FALSE; + } + + if (stat(data->filename, &st) == -1) { + /* File does not exist, skipping */ + if (errno != ENOENT) { + msg_err_map("%s: map file is unavailable for reading: %s", + data->filename, strerror(errno)); + + return FALSE; + } + else { + msg_info_map("%s: map file is not found; " + "it will be read automatically if created", + data->filename); + return TRUE; + } + } + + ev_stat_stat(map->event_loop, &data->st_ev); + len = st.st_size; + + if (bk->is_signed) { + bytes = rspamd_file_xmap(data->filename, PROT_READ, &len, TRUE); + + if (bytes == NULL) { + msg_err_map("can't open map %s: %s", data->filename, strerror(errno)); + return FALSE; + } + + if (!rspamd_map_check_file_sig(data->filename, map, bk, bytes, len)) { + munmap(bytes, len); + + return FALSE; + } + + munmap(bytes, len); + } + + if (len > 0) { + if (map->no_file_read) { + /* We just call read callback with backend name */ + map->read_callback(data->filename, strlen(data->filename), + &periodic->cbdata, TRUE); + } + else { + if (bk->is_compressed) { + bytes = rspamd_file_xmap(data->filename, PROT_READ, &len, TRUE); + + if (bytes == NULL) { + msg_err_map("can't open map %s: %s", data->filename, strerror(errno)); + return FALSE; + } + + ZSTD_DStream *zstream; + ZSTD_inBuffer zin; + ZSTD_outBuffer zout; + guchar *out; + gsize outlen, r; + + zstream = ZSTD_createDStream(); + ZSTD_initDStream(zstream); + + zin.pos = 0; + zin.src = bytes; + zin.size = len; + + if ((outlen = ZSTD_getDecompressedSize(zin.src, zin.size)) == 0) { + outlen = ZSTD_DStreamOutSize(); + } + + out = g_malloc(outlen); + + zout.dst = out; + zout.pos = 0; + zout.size = outlen; + + while (zin.pos < zin.size) { + r = ZSTD_decompressStream(zstream, &zout, &zin); + + if (ZSTD_isError(r)) { + msg_err_map("%s: cannot decompress data: %s", + data->filename, + ZSTD_getErrorName(r)); + ZSTD_freeDStream(zstream); + g_free(out); + munmap(bytes, len); + return FALSE; + } + + if (zout.pos == zout.size) { + /* We need to extend output buffer */ + zout.size = zout.size * 2 + 1; + out = g_realloc(zout.dst, zout.size); + zout.dst = out; + } + } + + ZSTD_freeDStream(zstream); + msg_info_map("%s: read map data, %z bytes compressed, " + "%z uncompressed)", + data->filename, + len, zout.pos); + map->read_callback(out, zout.pos, &periodic->cbdata, TRUE); + g_free(out); + + munmap(bytes, len); + } + else { + /* Perform buffered read: fail-safe */ + if (!read_map_file_chunks(map, &periodic->cbdata, data->filename, + len, 0)) { + return FALSE; + } + } + } + } + else { + /* Empty map */ + map->read_callback(NULL, 0, &periodic->cbdata, TRUE); + } + + return TRUE; +} + +static gboolean +read_map_static(struct rspamd_map *map, struct static_map_data *data, + struct rspamd_map_backend *bk, struct map_periodic_cbdata *periodic) +{ + guchar *bytes; + gsize len; + + if (map->read_callback == NULL || map->fin_callback == NULL) { + msg_err_map("%s: bad callback for reading map file", map->name); + data->processed = TRUE; + return FALSE; + } + + bytes = data->data; + len = data->len; + + if (len > 0) { + if (bk->is_compressed) { + ZSTD_DStream *zstream; + ZSTD_inBuffer zin; + ZSTD_outBuffer zout; + guchar *out; + gsize outlen, r; + + zstream = ZSTD_createDStream(); + ZSTD_initDStream(zstream); + + zin.pos = 0; + zin.src = bytes; + zin.size = len; + + if ((outlen = ZSTD_getDecompressedSize(zin.src, zin.size)) == 0) { + outlen = ZSTD_DStreamOutSize(); + } + + out = g_malloc(outlen); + + zout.dst = out; + zout.pos = 0; + zout.size = outlen; + + while (zin.pos < zin.size) { + r = ZSTD_decompressStream(zstream, &zout, &zin); + + if (ZSTD_isError(r)) { + msg_err_map("%s: cannot decompress data: %s", + map->name, + ZSTD_getErrorName(r)); + ZSTD_freeDStream(zstream); + g_free(out); + + return FALSE; + } + + if (zout.pos == zout.size) { + /* We need to extend output buffer */ + zout.size = zout.size * 2 + 1; + out = g_realloc(zout.dst, zout.size); + zout.dst = out; + } + } + + ZSTD_freeDStream(zstream); + msg_info_map("%s: read map data, %z bytes compressed, " + "%z uncompressed)", + map->name, + len, zout.pos); + map->read_callback(out, zout.pos, &periodic->cbdata, TRUE); + g_free(out); + } + else { + msg_info_map("%s: read map data, %z bytes", + map->name, len); + map->read_callback(bytes, len, &periodic->cbdata, TRUE); + } + } + else { + map->read_callback(NULL, 0, &periodic->cbdata, TRUE); + } + + data->processed = TRUE; + + return TRUE; +} + +static void +rspamd_map_periodic_dtor(struct map_periodic_cbdata *periodic) +{ + struct rspamd_map *map; + + map = periodic->map; + msg_debug_map("periodic dtor %p", periodic); + + if (periodic->need_modify || periodic->cbdata.errored) { + /* Need to notify the real data structure */ + periodic->map->fin_callback(&periodic->cbdata, periodic->map->user_data); + + if (map->on_load_function) { + map->on_load_function(map, map->on_load_ud); + } + } + else { + /* Not modified */ + } + + if (periodic->locked) { + g_atomic_int_set(periodic->map->locked, 0); + msg_debug_map("unlocked map %s", periodic->map->name); + + if (periodic->map->wrk->state == rspamd_worker_state_running) { + rspamd_map_schedule_periodic(periodic->map, + RSPAMD_SYMBOL_RESULT_NORMAL); + } + else { + msg_debug_map("stop scheduling periodics for %s; terminating state", + periodic->map->name); + } + } + + g_free(periodic); +} + +/* Called on timer execution */ +static void +rspamd_map_periodic_callback(struct ev_loop *loop, ev_timer *w, int revents) +{ + struct map_periodic_cbdata *cbd = (struct map_periodic_cbdata *) w->data; + + MAP_RETAIN(cbd, "periodic"); + ev_timer_stop(loop, w); + rspamd_map_process_periodic(cbd); + MAP_RELEASE(cbd, "periodic"); +} + +static void +rspamd_map_schedule_periodic(struct rspamd_map *map, int how) +{ + const gdouble error_mult = 20.0, lock_mult = 0.1; + static const gdouble min_timer_interval = 2.0; + const gchar *reason = "unknown reason"; + gdouble jittered_sec; + gdouble timeout; + struct map_periodic_cbdata *cbd; + + if (map->scheduled_check || (map->wrk && + map->wrk->state != rspamd_worker_state_running)) { + /* + * Do not schedule check if some check is already scheduled or + * if worker is going to die + */ + return; + } + + if (!(how & RSPAMD_MAP_SCHEDULE_INIT) && map->static_only) { + /* No need to schedule anything for static maps */ + return; + } + + if (map->non_trivial && map->next_check != 0) { + timeout = map->next_check - rspamd_get_calendar_ticks(); + map->next_check = 0; + + if (timeout > 0 && timeout < map->poll_timeout) { + /* Early check case, jitter */ + gdouble poll_timeout = map->poll_timeout; + + if (how & RSPAMD_MAP_SCHEDULE_ERROR) { + poll_timeout = map->poll_timeout * error_mult; + reason = "early active non-trivial check (after error)"; + } + else if (how & RSPAMD_MAP_SCHEDULE_LOCKED) { + poll_timeout = map->poll_timeout * lock_mult; + reason = "early active non-trivial check (after being locked)"; + } + else { + reason = "early active non-trivial check"; + } + + jittered_sec = MIN(timeout, poll_timeout); + } + else if (timeout <= 0) { + /* Data is already expired, need to check */ + if (how & RSPAMD_MAP_SCHEDULE_ERROR) { + /* In case of error we still need to increase delay */ + jittered_sec = map->poll_timeout * error_mult; + reason = "expired non-trivial data (after error)"; + } + else { + jittered_sec = 0.0; + reason = "expired non-trivial data"; + } + } + else { + /* No need to check now, wait till next_check */ + jittered_sec = timeout; + reason = "valid non-trivial data"; + } + } + else { + /* No valid information when to check a map, plan a timer based check */ + timeout = map->poll_timeout; + + if (how & RSPAMD_MAP_SCHEDULE_INIT) { + if (map->active_http) { + /* Spill maps load to get better chances to hit ssl cache */ + timeout = rspamd_time_jitter(0.0, 2.0); + } + else { + timeout = 0.0; + } + + reason = "init scheduled check"; + } + else { + if (how & RSPAMD_MAP_SCHEDULE_ERROR) { + timeout = map->poll_timeout * error_mult; + reason = "errored scheduled check"; + } + else if (how & RSPAMD_MAP_SCHEDULE_LOCKED) { + timeout = map->poll_timeout * lock_mult; + reason = "locked scheduled check"; + } + else { + reason = "normal scheduled check"; + } + } + + jittered_sec = rspamd_time_jitter(timeout, 0); + } + + /* Now, we do some sanity checks for jittered seconds */ + if (!(how & RSPAMD_MAP_SCHEDULE_INIT)) { + /* Never allow too low interval between timer checks, it is expensive */ + if (jittered_sec < min_timer_interval) { + jittered_sec = rspamd_time_jitter(min_timer_interval, 0); + } + + if (map->non_trivial) { + /* + * Even if we are reported that we need to reload cache often, we + * still want to be sane in terms of events... + */ + if (jittered_sec < min_timer_interval * 2.0) { + if (map->nelts > 0) { + jittered_sec = min_timer_interval * 3.0; + } + } + } + } + + cbd = g_malloc0(sizeof(*cbd)); + cbd->cbdata.prev_data = *map->user_data; + cbd->cbdata.cur_data = NULL; + cbd->cbdata.map = map; + cbd->map = map; + map->scheduled_check = cbd; + REF_INIT_RETAIN(cbd, rspamd_map_periodic_dtor); + + cbd->ev.data = cbd; + ev_timer_init(&cbd->ev, rspamd_map_periodic_callback, jittered_sec, 0.0); + ev_timer_start(map->event_loop, &cbd->ev); + + msg_debug_map("schedule new periodic event %p in %.3f seconds for %s; reason: %s", + cbd, jittered_sec, map->name, reason); +} + +static gint +rspamd_map_af_to_weight(const rspamd_inet_addr_t *addr) +{ + int ret; + + switch (rspamd_inet_address_get_af(addr)) { + case AF_UNIX: + ret = 2; + break; + case AF_INET: + ret = 1; + break; + default: + ret = 0; + break; + } + + return ret; +} + +static gint +rspamd_map_dns_address_sort_func(gconstpointer a, gconstpointer b) +{ + const rspamd_inet_addr_t *ip1 = *(const rspamd_inet_addr_t **) a, + *ip2 = *(const rspamd_inet_addr_t **) b; + gint w1, w2; + + w1 = rspamd_map_af_to_weight(ip1); + w2 = rspamd_map_af_to_weight(ip2); + + /* Inverse order */ + return w2 - w1; +} + +static void +rspamd_map_dns_callback(struct rdns_reply *reply, void *arg) +{ + struct http_callback_data *cbd = arg; + struct rdns_reply_entry *cur_rep; + struct rspamd_map *map; + guint flags = RSPAMD_HTTP_CLIENT_SIMPLE | RSPAMD_HTTP_CLIENT_SHARED; + + map = cbd->map; + + msg_debug_map("got dns reply with code %s on stage %d", + rdns_strerror(reply->code), cbd->stage); + + if (cbd->stage == http_map_terminated) { + MAP_RELEASE(cbd, "http_callback_data"); + return; + } + + if (reply->code == RDNS_RC_NOERROR) { + DL_FOREACH(reply->entries, cur_rep) + { + rspamd_inet_addr_t *addr; + addr = rspamd_inet_address_from_rnds(cur_rep); + + if (addr != NULL) { + rspamd_inet_address_set_port(addr, cbd->data->port); + g_ptr_array_add(cbd->addrs, (void *) addr); + } + } + + if (cbd->stage == http_map_resolve_host2) { + /* We have still one request pending */ + cbd->stage = http_map_resolve_host1; + } + else if (cbd->stage == http_map_resolve_host1) { + cbd->stage = http_map_http_conn; + } + } + else if (cbd->stage < http_map_http_conn) { + if (cbd->stage == http_map_resolve_host2) { + /* We have still one request pending */ + cbd->stage = http_map_resolve_host1; + } + else if (cbd->addrs->len == 0) { + /* We could not resolve host, so cowardly fail here */ + msg_err_map("cannot resolve %s: %s", cbd->data->host, + rdns_strerror(reply->code)); + cbd->periodic->errored = 1; + rspamd_map_process_periodic(cbd->periodic); + } + else { + /* We have at least one address, so we can continue... */ + cbd->stage = http_map_http_conn; + } + } + + if (cbd->stage == http_map_http_conn && cbd->addrs->len > 0) { + rspamd_ptr_array_shuffle(cbd->addrs); + gint idx = 0; + /* + * For the existing addr we can just select any address as we have + * data available + */ + if (cbd->map->nelts > 0 && rspamd_random_double_fast() > 0.5) { + /* Already shuffled, use whatever is the first */ + cbd->addr = (rspamd_inet_addr_t *) g_ptr_array_index(cbd->addrs, idx); + } + else { + /* Always prefer IPv4 as IPv6 is almost all the time broken */ + g_ptr_array_sort(cbd->addrs, rspamd_map_dns_address_sort_func); + cbd->addr = (rspamd_inet_addr_t *) g_ptr_array_index(cbd->addrs, idx); + } + + retry: + msg_debug_map("try open http connection to %s", + rspamd_inet_address_to_string_pretty(cbd->addr)); + if (cbd->bk->protocol == MAP_PROTO_HTTPS) { + flags |= RSPAMD_HTTP_CLIENT_SSL; + } + cbd->conn = rspamd_http_connection_new_client(NULL, + NULL, + http_map_error, + http_map_finish, + flags, + cbd->addr); + + if (cbd->conn != NULL) { + write_http_request(cbd); + } + else { + if (idx < cbd->addrs->len - 1) { + /* We can retry */ + idx++; + rspamd_inet_addr_t *prev_addr = cbd->addr; + cbd->addr = (rspamd_inet_addr_t *) g_ptr_array_index(cbd->addrs, idx); + msg_info_map("cannot connect to %s to get data for %s: %s, retry with %s (%d of %d)", + rspamd_inet_address_to_string_pretty(prev_addr), + cbd->bk->uri, + strerror(errno), + rspamd_inet_address_to_string_pretty(cbd->addr), + idx + 1, cbd->addrs->len); + goto retry; + } + else { + /* Nothing else left */ + cbd->periodic->errored = TRUE; + msg_err_map("error reading %s(%s): " + "connection with http server terminated incorrectly: %s", + cbd->bk->uri, + cbd->addr ? rspamd_inet_address_to_string_pretty(cbd->addr) : "", + strerror(errno)); + + rspamd_map_process_periodic(cbd->periodic); + } + } + } + + MAP_RELEASE(cbd, "http_callback_data"); +} + +static gboolean +rspamd_map_read_cached(struct rspamd_map *map, struct rspamd_map_backend *bk, + struct map_periodic_cbdata *periodic, const gchar *host) +{ + gsize mmap_len, len; + gpointer in; + struct http_map_data *data; + + data = bk->data.hd; + + in = rspamd_shmem_xmap(data->cache->shmem_name, PROT_READ, &mmap_len); + + if (in == NULL) { + msg_err("cannot map cache from %s: %s", data->cache->shmem_name, + strerror(errno)); + return FALSE; + } + + if (mmap_len < data->cache->len) { + msg_err("cannot map cache from %s: truncated length %z, %z expected", + data->cache->shmem_name, + mmap_len, data->cache->len); + munmap(in, mmap_len); + + return FALSE; + } + + /* + * Len is taken from the shmem file size that can be larger than the + * actual data length, as we use shared memory as a growing buffer for the + * HTTP input. + * Hence, we need to use len from the saved cache data, counting that it is + * at least not more than the cached file length (this is checked above). + */ + len = data->cache->len; + + if (bk->is_compressed) { + ZSTD_DStream *zstream; + ZSTD_inBuffer zin; + ZSTD_outBuffer zout; + guchar *out; + gsize outlen, r; + + zstream = ZSTD_createDStream(); + ZSTD_initDStream(zstream); + + zin.pos = 0; + zin.src = in; + zin.size = len; + + if ((outlen = ZSTD_getDecompressedSize(zin.src, zin.size)) == 0) { + outlen = ZSTD_DStreamOutSize(); + } + + out = g_malloc(outlen); + + zout.dst = out; + zout.pos = 0; + zout.size = outlen; + + while (zin.pos < zin.size) { + r = ZSTD_decompressStream(zstream, &zout, &zin); + + if (ZSTD_isError(r)) { + msg_err_map("%s: cannot decompress data: %s", + bk->uri, + ZSTD_getErrorName(r)); + ZSTD_freeDStream(zstream); + g_free(out); + munmap(in, mmap_len); + return FALSE; + } + + if (zout.pos == zout.size) { + /* We need to extend output buffer */ + zout.size = zout.size * 2 + 1; + out = g_realloc(zout.dst, zout.size); + zout.dst = out; + } + } + + ZSTD_freeDStream(zstream); + msg_info_map("%s: read map data cached %z bytes compressed, " + "%z uncompressed", + bk->uri, + len, zout.pos); + map->read_callback(out, zout.pos, &periodic->cbdata, TRUE); + g_free(out); + } + else { + msg_info_map("%s: read map data cached %z bytes", bk->uri, len); + map->read_callback(in, len, &periodic->cbdata, TRUE); + } + + munmap(in, mmap_len); + + return TRUE; +} + +static gboolean +rspamd_map_has_http_cached_file(struct rspamd_map *map, + struct rspamd_map_backend *bk) +{ + gchar path[PATH_MAX]; + guchar digest[rspamd_cryptobox_HASHBYTES]; + struct rspamd_config *cfg = map->cfg; + struct stat st; + + if (cfg->maps_cache_dir == NULL || cfg->maps_cache_dir[0] == '\0') { + return FALSE; + } + + rspamd_cryptobox_hash(digest, bk->uri, strlen(bk->uri), NULL, 0); + rspamd_snprintf(path, sizeof(path), "%s%c%*xs.map", cfg->maps_cache_dir, + G_DIR_SEPARATOR, 20, digest); + + if (stat(path, &st) != -1 && st.st_size > + sizeof(struct rspamd_http_file_data)) { + return TRUE; + } + + return FALSE; +} + +static gboolean +rspamd_map_save_http_cached_file(struct rspamd_map *map, + struct rspamd_map_backend *bk, + struct http_map_data *htdata, + const guchar *data, + gsize len) +{ + gchar path[PATH_MAX]; + guchar digest[rspamd_cryptobox_HASHBYTES]; + struct rspamd_config *cfg = map->cfg; + gint fd; + struct rspamd_http_file_data header; + + if (cfg->maps_cache_dir == NULL || cfg->maps_cache_dir[0] == '\0') { + return FALSE; + } + + rspamd_cryptobox_hash(digest, bk->uri, strlen(bk->uri), NULL, 0); + rspamd_snprintf(path, sizeof(path), "%s%c%*xs.map", cfg->maps_cache_dir, + G_DIR_SEPARATOR, 20, digest); + + fd = rspamd_file_xopen(path, O_WRONLY | O_TRUNC | O_CREAT, + 00600, FALSE); + + if (fd == -1) { + return FALSE; + } + + if (!rspamd_file_lock(fd, FALSE)) { + msg_err_map("cannot lock file %s: %s", path, strerror(errno)); + close(fd); + + return FALSE; + } + + memcpy(header.magic, rspamd_http_file_magic, sizeof(rspamd_http_file_magic)); + header.mtime = htdata->last_modified; + header.next_check = map->next_check; + header.data_off = sizeof(header); + + if (htdata->etag) { + header.data_off += RSPAMD_FSTRING_LEN(htdata->etag); + header.etag_len = RSPAMD_FSTRING_LEN(htdata->etag); + } + else { + header.etag_len = 0; + } + + if (write(fd, &header, sizeof(header)) != sizeof(header)) { + msg_err_map("cannot write file %s (header stage): %s", path, strerror(errno)); + rspamd_file_unlock(fd, FALSE); + close(fd); + + return FALSE; + } + + if (header.etag_len > 0) { + if (write(fd, RSPAMD_FSTRING_DATA(htdata->etag), header.etag_len) != + header.etag_len) { + msg_err_map("cannot write file %s (etag stage): %s", path, strerror(errno)); + rspamd_file_unlock(fd, FALSE); + close(fd); + + return FALSE; + } + } + + /* Now write the rest */ + if (write(fd, data, len) != len) { + msg_err_map("cannot write file %s (data stage): %s", path, strerror(errno)); + rspamd_file_unlock(fd, FALSE); + close(fd); + + return FALSE; + } + + rspamd_file_unlock(fd, FALSE); + close(fd); + + msg_info_map("saved data from %s in %s, %uz bytes", bk->uri, path, len + sizeof(header) + header.etag_len); + + return TRUE; +} + +static gboolean +rspamd_map_update_http_cached_file(struct rspamd_map *map, + struct rspamd_map_backend *bk, + struct http_map_data *htdata) +{ + gchar path[PATH_MAX]; + guchar digest[rspamd_cryptobox_HASHBYTES]; + struct rspamd_config *cfg = map->cfg; + gint fd; + struct rspamd_http_file_data header; + + if (!rspamd_map_has_http_cached_file(map, bk)) { + return FALSE; + } + + rspamd_cryptobox_hash(digest, bk->uri, strlen(bk->uri), NULL, 0); + rspamd_snprintf(path, sizeof(path), "%s%c%*xs.map", cfg->maps_cache_dir, + G_DIR_SEPARATOR, 20, digest); + + fd = rspamd_file_xopen(path, O_WRONLY, + 00600, FALSE); + + if (fd == -1) { + return FALSE; + } + + if (!rspamd_file_lock(fd, FALSE)) { + msg_err_map("cannot lock file %s: %s", path, strerror(errno)); + close(fd); + + return FALSE; + } + + memcpy(header.magic, rspamd_http_file_magic, sizeof(rspamd_http_file_magic)); + header.mtime = htdata->last_modified; + header.next_check = map->next_check; + header.data_off = sizeof(header); + + if (htdata->etag) { + header.data_off += RSPAMD_FSTRING_LEN(htdata->etag); + header.etag_len = RSPAMD_FSTRING_LEN(htdata->etag); + } + else { + header.etag_len = 0; + } + + if (write(fd, &header, sizeof(header)) != sizeof(header)) { + msg_err_map("cannot update file %s (header stage): %s", path, strerror(errno)); + rspamd_file_unlock(fd, FALSE); + close(fd); + + return FALSE; + } + + if (header.etag_len > 0) { + if (write(fd, RSPAMD_FSTRING_DATA(htdata->etag), header.etag_len) != + header.etag_len) { + msg_err_map("cannot update file %s (etag stage): %s", path, strerror(errno)); + rspamd_file_unlock(fd, FALSE); + close(fd); + + return FALSE; + } + } + + rspamd_file_unlock(fd, FALSE); + close(fd); + + return TRUE; +} + + +static gboolean +rspamd_map_read_http_cached_file(struct rspamd_map *map, + struct rspamd_map_backend *bk, + struct http_map_data *htdata, + struct map_cb_data *cbdata) +{ + gchar path[PATH_MAX]; + guchar digest[rspamd_cryptobox_HASHBYTES]; + struct rspamd_config *cfg = map->cfg; + gint fd; + struct stat st; + struct rspamd_http_file_data header; + + if (cfg->maps_cache_dir == NULL || cfg->maps_cache_dir[0] == '\0') { + return FALSE; + } + + rspamd_cryptobox_hash(digest, bk->uri, strlen(bk->uri), NULL, 0); + rspamd_snprintf(path, sizeof(path), "%s%c%*xs.map", cfg->maps_cache_dir, + G_DIR_SEPARATOR, 20, digest); + + fd = rspamd_file_xopen(path, O_RDONLY, 00600, FALSE); + + if (fd == -1) { + return FALSE; + } + + if (!rspamd_file_lock(fd, FALSE)) { + msg_err_map("cannot lock file %s: %s", path, strerror(errno)); + close(fd); + + return FALSE; + } + + (void) fstat(fd, &st); + + if (read(fd, &header, sizeof(header)) != sizeof(header)) { + msg_err_map("cannot read file %s (header stage): %s", path, strerror(errno)); + rspamd_file_unlock(fd, FALSE); + close(fd); + + return FALSE; + } + + if (memcmp(header.magic, rspamd_http_file_magic, + sizeof(rspamd_http_file_magic)) != 0) { + msg_warn_map("invalid or old version magic in file %s; ignore it", path); + rspamd_file_unlock(fd, FALSE); + close(fd); + + return FALSE; + } + + double now = rspamd_get_calendar_ticks(); + + if (header.next_check > now) { + map->next_check = header.next_check; + } + else { + map->next_check = now; + } + + htdata->last_modified = header.mtime; + + if (header.etag_len > 0) { + rspamd_fstring_t *etag = rspamd_fstring_sized_new(header.etag_len); + + if (read(fd, RSPAMD_FSTRING_DATA(etag), header.etag_len) != header.etag_len) { + msg_err_map("cannot read file %s (etag stage): %s", path, + strerror(errno)); + rspamd_file_unlock(fd, FALSE); + rspamd_fstring_free(etag); + close(fd); + + return FALSE; + } + + etag->len = header.etag_len; + + if (htdata->etag) { + /* FIXME: should be dealt somehow better */ + msg_warn_map("etag is already defined as %V; cached is %V; ignore cached", + htdata->etag, etag); + rspamd_fstring_free(etag); + } + else { + htdata->etag = etag; + } + } + + rspamd_file_unlock(fd, FALSE); + close(fd); + + /* Now read file data */ + /* Perform buffered read: fail-safe */ + if (!read_map_file_chunks(map, cbdata, path, + st.st_size - header.data_off, header.data_off)) { + return FALSE; + } + + struct tm tm; + gchar ncheck_buf[32], lm_buf[32]; + + rspamd_localtime(map->next_check, &tm); + strftime(ncheck_buf, sizeof(ncheck_buf) - 1, "%Y-%m-%d %H:%M:%S", &tm); + rspamd_localtime(htdata->last_modified, &tm); + strftime(lm_buf, sizeof(lm_buf) - 1, "%Y-%m-%d %H:%M:%S", &tm); + + msg_info_map("read cached data for %s from %s, %uz bytes; next check at: %s;" + " last modified on: %s; etag: %V", + bk->uri, + path, + (size_t) (st.st_size - header.data_off), + ncheck_buf, + lm_buf, + htdata->etag); + + return TRUE; +} + +/** + * Async HTTP callback + */ +static void +rspamd_map_common_http_callback(struct rspamd_map *map, + struct rspamd_map_backend *bk, + struct map_periodic_cbdata *periodic, + gboolean check) +{ + struct http_map_data *data; + struct http_callback_data *cbd; + guint flags = RSPAMD_HTTP_CLIENT_SIMPLE | RSPAMD_HTTP_CLIENT_SHARED; + + data = bk->data.hd; + + if (g_atomic_int_get(&data->cache->available) == 1) { + /* Read cached data */ + if (check) { + if (data->last_modified < data->cache->last_modified) { + msg_info_map("need to reread cached map triggered by %s " + "(%d our modify time, %d cached modify time)", + bk->uri, + (int) data->last_modified, + (int) data->cache->last_modified); + periodic->need_modify = TRUE; + /* Reset the whole chain */ + periodic->cur_backend = 0; + rspamd_map_process_periodic(periodic); + } + else { + if (map->active_http) { + /* Check even if there is a cached version */ + goto check; + } + else { + /* Switch to the next backend */ + periodic->cur_backend++; + rspamd_map_process_periodic(periodic); + } + } + + return; + } + else { + if (map->active_http && + data->last_modified > data->cache->last_modified) { + goto check; + } + else if (rspamd_map_read_cached(map, bk, periodic, data->host)) { + /* Switch to the next backend */ + periodic->cur_backend++; + data->last_modified = data->cache->last_modified; + rspamd_map_process_periodic(periodic); + + return; + } + } + } + else if (!map->active_http) { + /* Switch to the next backend */ + periodic->cur_backend++; + rspamd_map_process_periodic(periodic); + + return; + } + +check: + cbd = g_malloc0(sizeof(struct http_callback_data)); + + cbd->event_loop = map->event_loop; + cbd->addrs = g_ptr_array_sized_new(4); + cbd->map = map; + cbd->data = data; + cbd->check = check; + cbd->periodic = periodic; + MAP_RETAIN(periodic, "periodic"); + cbd->bk = bk; + MAP_RETAIN(bk, "rspamd_map_backend"); + cbd->stage = http_map_terminated; + REF_INIT_RETAIN(cbd, free_http_cbdata); + + msg_debug_map("%s map data from %s", check ? "checking" : "reading", + data->host); + + /* Try address */ + rspamd_inet_addr_t *addr = NULL; + + if (rspamd_parse_inet_address(&addr, data->host, + strlen(data->host), RSPAMD_INET_ADDRESS_PARSE_DEFAULT)) { + rspamd_inet_address_set_port(addr, cbd->data->port); + g_ptr_array_add(cbd->addrs, (void *) addr); + + if (bk->protocol == MAP_PROTO_HTTPS) { + flags |= RSPAMD_HTTP_CLIENT_SSL; + } + + cbd->conn = rspamd_http_connection_new_client( + NULL, + NULL, + http_map_error, + http_map_finish, + flags, + addr); + + if (cbd->conn != NULL) { + cbd->stage = http_map_http_conn; + write_http_request(cbd); + cbd->addr = addr; + MAP_RELEASE(cbd, "http_callback_data"); + } + else { + msg_warn_map("cannot load map: cannot connect to %s: %s", + data->host, strerror(errno)); + MAP_RELEASE(cbd, "http_callback_data"); + } + + return; + } + else if (map->r->r) { + /* Send both A and AAAA requests */ + guint nreq = 0; + + if (rdns_make_request_full(map->r->r, rspamd_map_dns_callback, cbd, + map->cfg->dns_timeout, map->cfg->dns_retransmits, 1, + data->host, RDNS_REQUEST_A)) { + MAP_RETAIN(cbd, "http_callback_data"); + nreq++; + } + if (rdns_make_request_full(map->r->r, rspamd_map_dns_callback, cbd, + map->cfg->dns_timeout, map->cfg->dns_retransmits, 1, + data->host, RDNS_REQUEST_AAAA)) { + MAP_RETAIN(cbd, "http_callback_data"); + nreq++; + } + + if (nreq == 2) { + cbd->stage = http_map_resolve_host2; + } + else if (nreq == 1) { + cbd->stage = http_map_resolve_host1; + } + + map->tmp_dtor = free_http_cbdata_dtor; + map->tmp_dtor_data = cbd; + } + else { + msg_warn_map("cannot load map: DNS resolver is not initialized"); + cbd->periodic->errored = TRUE; + } + + MAP_RELEASE(cbd, "http_callback_data"); +} + +static void +rspamd_map_http_check_callback(struct map_periodic_cbdata *cbd) +{ + struct rspamd_map *map; + struct rspamd_map_backend *bk; + + map = cbd->map; + bk = g_ptr_array_index(cbd->map->backends, cbd->cur_backend); + + rspamd_map_common_http_callback(map, bk, cbd, TRUE); +} + +static void +rspamd_map_http_read_callback(struct map_periodic_cbdata *cbd) +{ + struct rspamd_map *map; + struct rspamd_map_backend *bk; + + map = cbd->map; + bk = g_ptr_array_index(cbd->map->backends, cbd->cur_backend); + rspamd_map_common_http_callback(map, bk, cbd, FALSE); +} + +static void +rspamd_map_file_check_callback(struct map_periodic_cbdata *periodic) +{ + struct rspamd_map *map; + struct file_map_data *data; + struct rspamd_map_backend *bk; + + map = periodic->map; + bk = g_ptr_array_index(map->backends, periodic->cur_backend); + data = bk->data.fd; + + if (data->need_modify) { + periodic->need_modify = TRUE; + periodic->cur_backend = 0; + data->need_modify = FALSE; + + rspamd_map_process_periodic(periodic); + + return; + } + + map = periodic->map; + /* Switch to the next backend as the rest is handled by ev_stat */ + periodic->cur_backend++; + rspamd_map_process_periodic(periodic); +} + +static void +rspamd_map_static_check_callback(struct map_periodic_cbdata *periodic) +{ + struct rspamd_map *map; + struct static_map_data *data; + struct rspamd_map_backend *bk; + + map = periodic->map; + bk = g_ptr_array_index(map->backends, periodic->cur_backend); + data = bk->data.sd; + + if (!data->processed) { + periodic->need_modify = TRUE; + periodic->cur_backend = 0; + + rspamd_map_process_periodic(periodic); + + return; + } + + /* Switch to the next backend */ + periodic->cur_backend++; + rspamd_map_process_periodic(periodic); +} + +static void +rspamd_map_file_read_callback(struct map_periodic_cbdata *periodic) +{ + struct rspamd_map *map; + struct file_map_data *data; + struct rspamd_map_backend *bk; + + map = periodic->map; + + bk = g_ptr_array_index(map->backends, periodic->cur_backend); + data = bk->data.fd; + + msg_info_map("rereading map file %s", data->filename); + + if (!read_map_file(map, data, bk, periodic)) { + periodic->errored = TRUE; + } + + /* Switch to the next backend */ + periodic->cur_backend++; + rspamd_map_process_periodic(periodic); +} + +static void +rspamd_map_static_read_callback(struct map_periodic_cbdata *periodic) +{ + struct rspamd_map *map; + struct static_map_data *data; + struct rspamd_map_backend *bk; + + map = periodic->map; + + bk = g_ptr_array_index(map->backends, periodic->cur_backend); + data = bk->data.sd; + + msg_info_map("rereading static map"); + + if (!read_map_static(map, data, bk, periodic)) { + periodic->errored = TRUE; + } + + /* Switch to the next backend */ + periodic->cur_backend++; + rspamd_map_process_periodic(periodic); +} + +static void +rspamd_map_process_periodic(struct map_periodic_cbdata *cbd) +{ + struct rspamd_map_backend *bk; + struct rspamd_map *map; + + map = cbd->map; + map->scheduled_check = NULL; + + if (!map->file_only && !cbd->locked) { + if (!g_atomic_int_compare_and_exchange(cbd->map->locked, + 0, 1)) { + msg_debug_map( + "don't try to reread map %s as it is locked by other process, " + "will reread it later", + cbd->map->name); + rspamd_map_schedule_periodic(map, RSPAMD_MAP_SCHEDULE_LOCKED); + MAP_RELEASE(cbd, "periodic"); + + return; + } + else { + msg_debug_map("locked map %s", cbd->map->name); + cbd->locked = TRUE; + } + } + + if (cbd->errored) { + /* We should not check other backends if some backend has failed*/ + rspamd_map_schedule_periodic(cbd->map, RSPAMD_MAP_SCHEDULE_ERROR); + + if (cbd->locked) { + g_atomic_int_set(cbd->map->locked, 0); + cbd->locked = FALSE; + } + + /* Also set error flag for the map consumer */ + cbd->cbdata.errored = true; + + msg_debug_map("unlocked map %s, refcount=%d", cbd->map->name, + cbd->ref.refcount); + MAP_RELEASE(cbd, "periodic"); + + return; + } + + /* For each backend we need to check for modifications */ + if (cbd->cur_backend >= cbd->map->backends->len) { + /* Last backend */ + msg_debug_map("finished map: %d of %d", cbd->cur_backend, + cbd->map->backends->len); + MAP_RELEASE(cbd, "periodic"); + + return; + } + + if (cbd->map->wrk && cbd->map->wrk->state == rspamd_worker_state_running) { + bk = g_ptr_array_index(cbd->map->backends, cbd->cur_backend); + g_assert(bk != NULL); + + if (cbd->need_modify) { + /* Load data from the next backend */ + switch (bk->protocol) { + case MAP_PROTO_HTTP: + case MAP_PROTO_HTTPS: + rspamd_map_http_read_callback(cbd); + break; + case MAP_PROTO_FILE: + rspamd_map_file_read_callback(cbd); + break; + case MAP_PROTO_STATIC: + rspamd_map_static_read_callback(cbd); + break; + } + } + else { + /* Check the next backend */ + switch (bk->protocol) { + case MAP_PROTO_HTTP: + case MAP_PROTO_HTTPS: + rspamd_map_http_check_callback(cbd); + break; + case MAP_PROTO_FILE: + rspamd_map_file_check_callback(cbd); + break; + case MAP_PROTO_STATIC: + rspamd_map_static_check_callback(cbd); + break; + } + } + } +} + +static void +rspamd_map_on_stat(struct ev_loop *loop, ev_stat *w, int revents) +{ + struct rspamd_map *map = (struct rspamd_map *) w->data; + + if (w->attr.st_nlink > 0) { + msg_info_map("old mtime is %t (size = %Hz), " + "new mtime is %t (size = %Hz) for map file %s", + w->prev.st_mtime, (gsize) w->prev.st_size, + w->attr.st_mtime, (gsize) w->attr.st_size, + w->path); + + /* Fire need modify flag */ + struct rspamd_map_backend *bk; + guint i; + + PTR_ARRAY_FOREACH(map->backends, i, bk) + { + if (bk->protocol == MAP_PROTO_FILE) { + bk->data.fd->need_modify = TRUE; + } + } + + map->next_check = 0; + + if (map->scheduled_check) { + ev_timer_stop(map->event_loop, &map->scheduled_check->ev); + MAP_RELEASE(map->scheduled_check, "rspamd_map_on_stat"); + map->scheduled_check = NULL; + } + + rspamd_map_schedule_periodic(map, RSPAMD_MAP_SCHEDULE_INIT); + } +} + +/* Start watching event for all maps */ +void rspamd_map_watch(struct rspamd_config *cfg, + struct ev_loop *event_loop, + struct rspamd_dns_resolver *resolver, + struct rspamd_worker *worker, + enum rspamd_map_watch_type how) +{ + GList *cur = cfg->maps; + struct rspamd_map *map; + struct rspamd_map_backend *bk; + guint i; + + g_assert(how > RSPAMD_MAP_WATCH_MIN && how < RSPAMD_MAP_WATCH_MAX); + + /* First of all do synced read of data */ + while (cur) { + map = cur->data; + map->event_loop = event_loop; + map->r = resolver; + + if (map->wrk == NULL && how != RSPAMD_MAP_WATCH_WORKER) { + /* Generic scanner map */ + map->wrk = worker; + + if (how == RSPAMD_MAP_WATCH_PRIMARY_CONTROLLER) { + map->active_http = TRUE; + } + else { + map->active_http = FALSE; + } + } + else if (map->wrk != NULL && map->wrk == worker) { + /* Map is bound to a specific worker */ + map->active_http = TRUE; + } + else { + /* Skip map for this worker as irrelevant */ + cur = g_list_next(cur); + continue; + } + + if (!map->active_http) { + /* Check cached version more frequently as it is cheap */ + + if (map->poll_timeout >= cfg->map_timeout && + cfg->map_file_watch_multiplier < 1.0) { + map->poll_timeout = + map->poll_timeout * cfg->map_file_watch_multiplier; + } + } + + map->file_only = TRUE; + map->static_only = TRUE; + + PTR_ARRAY_FOREACH(map->backends, i, bk) + { + bk->event_loop = event_loop; + + if (bk->protocol == MAP_PROTO_FILE) { + struct file_map_data *data; + + data = bk->data.fd; + + if (map->user_data == NULL || *map->user_data == NULL) { + /* Map has not been read, init it's reading if possible */ + struct stat st; + + if (stat(data->filename, &st) != -1) { + data->need_modify = TRUE; + } + } + + ev_stat_init(&data->st_ev, rspamd_map_on_stat, + data->filename, map->poll_timeout * cfg->map_file_watch_multiplier); + data->st_ev.data = map; + ev_stat_start(event_loop, &data->st_ev); + map->static_only = FALSE; + } + else if ((bk->protocol == MAP_PROTO_HTTP || + bk->protocol == MAP_PROTO_HTTPS)) { + if (map->active_http) { + map->non_trivial = TRUE; + } + + map->static_only = FALSE; + map->file_only = FALSE; + } + } + + rspamd_map_schedule_periodic(map, RSPAMD_MAP_SCHEDULE_INIT); + + cur = g_list_next(cur); + } +} + +void rspamd_map_preload(struct rspamd_config *cfg) +{ + GList *cur = cfg->maps; + struct rspamd_map *map; + struct rspamd_map_backend *bk; + guint i; + gboolean map_ok; + + /* First of all do synced read of data */ + while (cur) { + map = cur->data; + map_ok = TRUE; + + PTR_ARRAY_FOREACH(map->backends, i, bk) + { + if (!(bk->protocol == MAP_PROTO_FILE || + bk->protocol == MAP_PROTO_STATIC)) { + + if (bk->protocol == MAP_PROTO_HTTP || + bk->protocol == MAP_PROTO_HTTPS) { + if (!rspamd_map_has_http_cached_file(map, bk)) { + + if (!map->fallback_backend) { + map_ok = FALSE; + } + break; + } + else { + continue; /* We are yet fine */ + } + } + map_ok = FALSE; + break; + } + } + + if (map_ok) { + struct map_periodic_cbdata fake_cbd; + gboolean succeed = TRUE; + + memset(&fake_cbd, 0, sizeof(fake_cbd)); + fake_cbd.cbdata.state = 0; + fake_cbd.cbdata.prev_data = *map->user_data; + fake_cbd.cbdata.cur_data = NULL; + fake_cbd.cbdata.map = map; + fake_cbd.map = map; + + PTR_ARRAY_FOREACH(map->backends, i, bk) + { + fake_cbd.cur_backend = i; + + if (bk->protocol == MAP_PROTO_FILE) { + if (!read_map_file(map, bk->data.fd, bk, &fake_cbd)) { + succeed = FALSE; + break; + } + } + else if (bk->protocol == MAP_PROTO_STATIC) { + if (!read_map_static(map, bk->data.sd, bk, &fake_cbd)) { + succeed = FALSE; + break; + } + } + else if (bk->protocol == MAP_PROTO_HTTP || + bk->protocol == MAP_PROTO_HTTPS) { + if (!rspamd_map_read_http_cached_file(map, bk, bk->data.hd, + &fake_cbd.cbdata)) { + + if (map->fallback_backend) { + /* Try fallback */ + g_assert(map->fallback_backend->protocol == + MAP_PROTO_FILE); + if (!read_map_file(map, + map->fallback_backend->data.fd, + map->fallback_backend, &fake_cbd)) { + succeed = FALSE; + break; + } + } + else { + succeed = FALSE; + break; + } + } + } + else { + g_assert_not_reached(); + } + } + + if (succeed) { + map->fin_callback(&fake_cbd.cbdata, map->user_data); + + if (map->on_load_function) { + map->on_load_function(map, map->on_load_ud); + } + } + else { + msg_info_map("preload of %s failed", map->name); + } + } + + cur = g_list_next(cur); + } +} + +void rspamd_map_remove_all(struct rspamd_config *cfg) +{ + struct rspamd_map *map; + GList *cur; + struct rspamd_map_backend *bk; + struct map_cb_data cbdata; + guint i; + + for (cur = cfg->maps; cur != NULL; cur = g_list_next(cur)) { + map = cur->data; + + if (map->tmp_dtor) { + map->tmp_dtor(map->tmp_dtor_data); + } + + if (map->dtor) { + cbdata.prev_data = NULL; + cbdata.map = map; + cbdata.cur_data = *map->user_data; + + map->dtor(&cbdata); + *map->user_data = NULL; + } + + if (map->on_load_ud_dtor) { + map->on_load_ud_dtor(map->on_load_ud); + } + + for (i = 0; i < map->backends->len; i++) { + bk = g_ptr_array_index(map->backends, i); + + MAP_RELEASE(bk, "rspamd_map_backend"); + } + + if (map->fallback_backend) { + MAP_RELEASE(map->fallback_backend, "rspamd_map_backend"); + } + } + + g_list_free(cfg->maps); + cfg->maps = NULL; +} + +static const gchar * +rspamd_map_check_proto(struct rspamd_config *cfg, + const gchar *map_line, struct rspamd_map_backend *bk) +{ + const gchar *pos = map_line, *end, *end_key; + + g_assert(bk != NULL); + g_assert(pos != NULL); + + end = pos + strlen(pos); + + /* Static check */ + if (g_ascii_strcasecmp(pos, "static") == 0) { + bk->protocol = MAP_PROTO_STATIC; + bk->uri = g_strdup(pos); + + return pos; + } + else if (g_ascii_strcasecmp(pos, "zst+static") == 0) { + bk->protocol = MAP_PROTO_STATIC; + bk->uri = g_strdup(pos + 4); + bk->is_compressed = TRUE; + + return pos + 4; + } + + for (;;) { + if (g_ascii_strncasecmp(pos, "sign+", sizeof("sign+") - 1) == 0) { + bk->is_signed = TRUE; + pos += sizeof("sign+") - 1; + } + else if (g_ascii_strncasecmp(pos, "fallback+", sizeof("fallback+") - 1) == 0) { + bk->is_fallback = TRUE; + pos += sizeof("fallback+") - 1; + } + else if (g_ascii_strncasecmp(pos, "key=", sizeof("key=") - 1) == 0) { + pos += sizeof("key=") - 1; + end_key = memchr(pos, '+', end - pos); + + if (end_key != NULL) { + bk->trusted_pubkey = rspamd_pubkey_from_base32(pos, end_key - pos, + RSPAMD_KEYPAIR_SIGN, RSPAMD_CRYPTOBOX_MODE_25519); + + if (bk->trusted_pubkey == NULL) { + msg_err_config("cannot read pubkey from map: %s", + map_line); + return NULL; + } + pos = end_key + 1; + } + else if (end - pos > 64) { + /* Try hex encoding */ + bk->trusted_pubkey = rspamd_pubkey_from_hex(pos, 64, + RSPAMD_KEYPAIR_SIGN, RSPAMD_CRYPTOBOX_MODE_25519); + + if (bk->trusted_pubkey == NULL) { + msg_err_config("cannot read pubkey from map: %s", + map_line); + return NULL; + } + pos += 64; + } + else { + msg_err_config("cannot read pubkey from map: %s", + map_line); + return NULL; + } + + if (*pos == '+' || *pos == ':') { + pos++; + } + } + else { + /* No known flags */ + break; + } + } + + bk->protocol = MAP_PROTO_FILE; + + if (g_ascii_strncasecmp(pos, "http://", sizeof("http://") - 1) == 0) { + bk->protocol = MAP_PROTO_HTTP; + /* Include http:// */ + bk->uri = g_strdup(pos); + pos += sizeof("http://") - 1; + } + else if (g_ascii_strncasecmp(pos, "https://", sizeof("https://") - 1) == 0) { + bk->protocol = MAP_PROTO_HTTPS; + /* Include https:// */ + bk->uri = g_strdup(pos); + pos += sizeof("https://") - 1; + } + else if (g_ascii_strncasecmp(pos, "file://", sizeof("file://") - 1) == 0) { + pos += sizeof("file://") - 1; + /* Exclude file:// */ + bk->uri = g_strdup(pos); + } + else if (*pos == '/') { + /* Trivial file case */ + bk->uri = g_strdup(pos); + } + else { + msg_err_config("invalid map fetching protocol: %s", map_line); + + return NULL; + } + + if (bk->protocol != MAP_PROTO_FILE && bk->is_signed) { + msg_err_config("signed maps are no longer supported for HTTP(s): %s", map_line); + } + + return pos; +} + +gboolean +rspamd_map_is_map(const gchar *map_line) +{ + gboolean ret = FALSE; + + g_assert(map_line != NULL); + + if (map_line[0] == '/') { + ret = TRUE; + } + else if (g_ascii_strncasecmp(map_line, "sign+", sizeof("sign+") - 1) == 0) { + ret = TRUE; + } + else if (g_ascii_strncasecmp(map_line, "fallback+", sizeof("fallback+") - 1) == 0) { + ret = TRUE; + } + else if (g_ascii_strncasecmp(map_line, "file://", sizeof("file://") - 1) == 0) { + ret = TRUE; + } + else if (g_ascii_strncasecmp(map_line, "http://", sizeof("http://") - 1) == 0) { + ret = TRUE; + } + else if (g_ascii_strncasecmp(map_line, "https://", sizeof("https://") - 1) == 0) { + ret = TRUE; + } + + return ret; +} + +static void +rspamd_map_backend_dtor(struct rspamd_map_backend *bk) +{ + switch (bk->protocol) { + case MAP_PROTO_FILE: + if (bk->data.fd) { + ev_stat_stop(bk->event_loop, &bk->data.fd->st_ev); + g_free(bk->data.fd->filename); + g_free(bk->data.fd); + } + break; + case MAP_PROTO_STATIC: + if (bk->data.sd) { + if (bk->data.sd->data) { + g_free(bk->data.sd->data); + } + + g_free(bk->data.sd); + } + break; + case MAP_PROTO_HTTP: + case MAP_PROTO_HTTPS: + if (bk->data.hd) { + struct http_map_data *data = bk->data.hd; + + g_free(data->host); + g_free(data->path); + g_free(data->rest); + + if (data->userinfo) { + g_free(data->userinfo); + } + + if (data->etag) { + rspamd_fstring_free(data->etag); + } + + /* + * Clear cached file, but check if a worker is an active http worker + * as cur_cache_cbd is meaningful merely for active worker, who actually + * owns the cache + */ + if (bk->map && bk->map->active_http) { + if (g_atomic_int_compare_and_exchange(&data->cache->available, 1, 0)) { + if (data->cur_cache_cbd) { + msg_info("clear shared memory cache for a map in %s as backend \"%s\" is closing", + data->cur_cache_cbd->shm->shm_name, + bk->uri); + MAP_RELEASE(data->cur_cache_cbd->shm, + "rspamd_http_map_cached_cbdata"); + ev_timer_stop(data->cur_cache_cbd->event_loop, + &data->cur_cache_cbd->timeout); + g_free(data->cur_cache_cbd); + data->cur_cache_cbd = NULL; + } + } + } + + g_free(bk->data.hd); + } + break; + } + + if (bk->trusted_pubkey) { + rspamd_pubkey_unref(bk->trusted_pubkey); + } + + g_free(bk->uri); + g_free(bk); +} + +static struct rspamd_map_backend * +rspamd_map_parse_backend(struct rspamd_config *cfg, const gchar *map_line) +{ + struct rspamd_map_backend *bk; + struct file_map_data *fdata = NULL; + struct http_map_data *hdata = NULL; + struct static_map_data *sdata = NULL; + struct http_parser_url up; + const gchar *end, *p; + rspamd_ftok_t tok; + + bk = g_malloc0(sizeof(*bk)); + REF_INIT_RETAIN(bk, rspamd_map_backend_dtor); + + if (!rspamd_map_check_proto(cfg, map_line, bk)) { + goto err; + } + + if (bk->is_fallback && bk->protocol != MAP_PROTO_FILE) { + msg_err_config("fallback backend must be file for %s", bk->uri); + + goto err; + } + + end = map_line + strlen(map_line); + if (end - map_line > 5) { + p = end - 5; + if (g_ascii_strcasecmp(p, ".zstd") == 0) { + bk->is_compressed = TRUE; + } + p = end - 4; + if (g_ascii_strcasecmp(p, ".zst") == 0) { + bk->is_compressed = TRUE; + } + } + + /* Now check for each proto separately */ + if (bk->protocol == MAP_PROTO_FILE) { + fdata = g_malloc0(sizeof(struct file_map_data)); + + if (access(bk->uri, R_OK) == -1) { + if (errno != ENOENT) { + msg_err_config("cannot open file '%s': %s", bk->uri, strerror(errno)); + goto err; + } + + msg_info_config( + "map '%s' is not found, but it can be loaded automatically later", + bk->uri); + } + + fdata->filename = g_strdup(bk->uri); + bk->data.fd = fdata; + } + else if (bk->protocol == MAP_PROTO_HTTP || bk->protocol == MAP_PROTO_HTTPS) { + hdata = g_malloc0(sizeof(struct http_map_data)); + + memset(&up, 0, sizeof(up)); + if (http_parser_parse_url(bk->uri, strlen(bk->uri), FALSE, + &up) != 0) { + msg_err_config("cannot parse HTTP url: %s", bk->uri); + goto err; + } + else { + if (!(up.field_set & 1u << UF_HOST)) { + msg_err_config("cannot parse HTTP url: %s: no host", bk->uri); + goto err; + } + + tok.begin = bk->uri + up.field_data[UF_HOST].off; + tok.len = up.field_data[UF_HOST].len; + hdata->host = rspamd_ftokdup(&tok); + + if (up.field_set & (1u << UF_PORT)) { + hdata->port = up.port; + } + else { + if (bk->protocol == MAP_PROTO_HTTP) { + hdata->port = 80; + } + else { + hdata->port = 443; + } + } + + if (up.field_set & (1u << UF_PATH)) { + tok.begin = bk->uri + up.field_data[UF_PATH].off; + tok.len = up.field_data[UF_PATH].len; + + hdata->path = rspamd_ftokdup(&tok); + + /* We also need to check query + fragment */ + if (up.field_set & ((1u << UF_QUERY) | (1u << UF_FRAGMENT))) { + tok.begin = bk->uri + up.field_data[UF_PATH].off + + up.field_data[UF_PATH].len; + tok.len = strlen(tok.begin); + hdata->rest = rspamd_ftokdup(&tok); + } + else { + hdata->rest = g_strdup(""); + } + } + + if (up.field_set & (1u << UF_USERINFO)) { + /* Create authorisation header for basic auth */ + guint len = sizeof("Basic ") + + up.field_data[UF_USERINFO].len * 8 / 5 + 4; + hdata->userinfo = g_malloc(len); + rspamd_snprintf(hdata->userinfo, len, "Basic %*Bs", + (int) up.field_data[UF_USERINFO].len, + bk->uri + up.field_data[UF_USERINFO].off); + + msg_debug("added userinfo for the map from the URL: %s", hdata->host); + } + else { + /* Try to obtain authentication data from options in the configuration */ + const ucl_object_t *auth_obj, *opts_obj; + + opts_obj = ucl_object_lookup(cfg->cfg_ucl_obj, "options"); + if (opts_obj != NULL) { + auth_obj = ucl_object_lookup(opts_obj, "http_auth"); + if (auth_obj != NULL && ucl_object_type(auth_obj) == UCL_OBJECT) { + const ucl_object_t *host_obj; + + /* + * Search first by the full URL and then by the host part + */ + host_obj = ucl_object_lookup(auth_obj, map_line); + + if (host_obj == NULL) { + host_obj = ucl_object_lookup(auth_obj, hdata->host); + } + + if (host_obj != NULL && ucl_object_type(host_obj) == UCL_OBJECT) { + const ucl_object_t *user_obj, *password_obj; + + user_obj = ucl_object_lookup(host_obj, "user"); + password_obj = ucl_object_lookup(host_obj, "password"); + + if (user_obj != NULL && password_obj != NULL && + ucl_object_type(user_obj) == UCL_STRING && + ucl_object_type(password_obj) == UCL_STRING) { + + gchar *tmpbuf; + unsigned tlen; + + /* User + password + ':' */ + tlen = strlen(ucl_object_tostring(user_obj)) + + strlen(ucl_object_tostring(password_obj)) + 1; + tmpbuf = g_malloc(tlen + 1); + rspamd_snprintf(tmpbuf, tlen + 1, "%s:%s", + ucl_object_tostring(user_obj), + ucl_object_tostring(password_obj)); + /* Base64 encoding is not so greedy, but we add some space for simplicity */ + tlen *= 2; + tlen += sizeof("Basic ") - 1; + hdata->userinfo = g_malloc(tlen + 1); + rspamd_snprintf(hdata->userinfo, tlen + 1, "Basic %Bs", tmpbuf); + g_free(tmpbuf); + msg_debug("added userinfo for the map from the configuration: %s", map_line); + } + } + } + } + } + } + + hdata->cache = rspamd_mempool_alloc0_shared(cfg->cfg_pool, + sizeof(*hdata->cache)); + + bk->data.hd = hdata; + } + else if (bk->protocol == MAP_PROTO_STATIC) { + sdata = g_malloc0(sizeof(*sdata)); + bk->data.sd = sdata; + } + + bk->id = rspamd_cryptobox_fast_hash_specific(RSPAMD_CRYPTOBOX_T1HA, + bk->uri, strlen(bk->uri), + 0xdeadbabe); + + return bk; + +err: + MAP_RELEASE(bk, "rspamd_map_backend"); + + if (hdata) { + g_free(hdata); + } + + if (fdata) { + g_free(fdata); + } + + if (sdata) { + g_free(sdata); + } + + return NULL; +} + +static void +rspamd_map_calculate_hash(struct rspamd_map *map) +{ + struct rspamd_map_backend *bk; + guint i; + rspamd_cryptobox_hash_state_t st; + gchar *cksum_encoded, cksum[rspamd_cryptobox_HASHBYTES]; + + rspamd_cryptobox_hash_init(&st, NULL, 0); + + for (i = 0; i < map->backends->len; i++) { + bk = g_ptr_array_index(map->backends, i); + rspamd_cryptobox_hash_update(&st, bk->uri, strlen(bk->uri)); + } + + rspamd_cryptobox_hash_final(&st, cksum); + cksum_encoded = rspamd_encode_base32(cksum, sizeof(cksum), RSPAMD_BASE32_DEFAULT); + rspamd_strlcpy(map->tag, cksum_encoded, sizeof(map->tag)); + g_free(cksum_encoded); +} + +static gboolean +rspamd_map_add_static_string(struct rspamd_config *cfg, + const ucl_object_t *elt, + GString *target) +{ + gsize sz; + const gchar *dline; + + if (ucl_object_type(elt) != UCL_STRING) { + msg_err_config("map has static backend but `data` is " + "not string like: %s", + ucl_object_type_to_string(elt->type)); + return FALSE; + } + + /* Otherwise, we copy data to the backend */ + dline = ucl_object_tolstring(elt, &sz); + + if (sz == 0) { + msg_err_config("map has static backend but empty no data"); + return FALSE; + } + + g_string_append_len(target, dline, sz); + g_string_append_c(target, '\n'); + + return TRUE; +} + +struct rspamd_map * +rspamd_map_add(struct rspamd_config *cfg, + const gchar *map_line, + const gchar *description, + map_cb_t read_callback, + map_fin_cb_t fin_callback, + map_dtor_t dtor, + void **user_data, + struct rspamd_worker *worker, + int flags) +{ + struct rspamd_map *map; + struct rspamd_map_backend *bk; + + bk = rspamd_map_parse_backend(cfg, map_line); + if (bk == NULL) { + return NULL; + } + + if (bk->is_fallback) { + msg_err_config("cannot add map with fallback only backend: %s", bk->uri); + REF_RELEASE(bk); + + return NULL; + } + + map = rspamd_mempool_alloc0(cfg->cfg_pool, sizeof(struct rspamd_map)); + map->read_callback = read_callback; + map->fin_callback = fin_callback; + map->dtor = dtor; + map->user_data = user_data; + map->cfg = cfg; + map->id = rspamd_random_uint64_fast(); + map->locked = + rspamd_mempool_alloc0_shared(cfg->cfg_pool, sizeof(gint)); + map->backends = g_ptr_array_sized_new(1); + map->wrk = worker; + rspamd_mempool_add_destructor(cfg->cfg_pool, rspamd_ptr_array_free_hard, + map->backends); + g_ptr_array_add(map->backends, bk); + map->name = rspamd_mempool_strdup(cfg->cfg_pool, map_line); + map->no_file_read = (flags & RSPAMD_MAP_FILE_NO_READ); + + if (bk->protocol == MAP_PROTO_FILE) { + map->poll_timeout = (cfg->map_timeout * cfg->map_file_watch_multiplier); + } + else { + map->poll_timeout = cfg->map_timeout; + } + + if (description != NULL) { + map->description = rspamd_mempool_strdup(cfg->cfg_pool, description); + } + + rspamd_map_calculate_hash(map); + msg_info_map("added map %s", bk->uri); + bk->map = map; + + cfg->maps = g_list_prepend(cfg->maps, map); + + return map; +} + +struct rspamd_map * +rspamd_map_add_fake(struct rspamd_config *cfg, + const gchar *description, + const gchar *name) +{ + struct rspamd_map *map; + + map = rspamd_mempool_alloc0(cfg->cfg_pool, sizeof(struct rspamd_map)); + map->cfg = cfg; + map->id = rspamd_random_uint64_fast(); + map->name = rspamd_mempool_strdup(cfg->cfg_pool, name); + map->user_data = (void **) ↦ /* to prevent null pointer dereferencing */ + + if (description != NULL) { + map->description = rspamd_mempool_strdup(cfg->cfg_pool, description); + } + + return map; +} + +static inline void +rspamd_map_add_backend(struct rspamd_map *map, struct rspamd_map_backend *bk) +{ + if (bk->is_fallback) { + if (map->fallback_backend) { + msg_warn_map("redefining fallback backend from %s to %s", + map->fallback_backend->uri, bk->uri); + } + + map->fallback_backend = bk; + } + else { + g_ptr_array_add(map->backends, bk); + } + + bk->map = map; +} + +struct rspamd_map * +rspamd_map_add_from_ucl(struct rspamd_config *cfg, + const ucl_object_t *obj, + const gchar *description, + map_cb_t read_callback, + map_fin_cb_t fin_callback, + map_dtor_t dtor, + void **user_data, + struct rspamd_worker *worker, + gint flags) +{ + ucl_object_iter_t it = NULL; + const ucl_object_t *cur, *elt; + struct rspamd_map *map; + struct rspamd_map_backend *bk; + guint i; + + g_assert(obj != NULL); + + if (ucl_object_type(obj) == UCL_STRING) { + /* Just a plain string */ + return rspamd_map_add(cfg, ucl_object_tostring(obj), description, + read_callback, fin_callback, dtor, user_data, worker, flags); + } + + map = rspamd_mempool_alloc0(cfg->cfg_pool, sizeof(struct rspamd_map)); + map->read_callback = read_callback; + map->fin_callback = fin_callback; + map->dtor = dtor; + map->user_data = user_data; + map->cfg = cfg; + map->id = rspamd_random_uint64_fast(); + map->locked = + rspamd_mempool_alloc0_shared(cfg->cfg_pool, sizeof(gint)); + map->backends = g_ptr_array_new(); + map->wrk = worker; + map->no_file_read = (flags & RSPAMD_MAP_FILE_NO_READ); + rspamd_mempool_add_destructor(cfg->cfg_pool, rspamd_ptr_array_free_hard, + map->backends); + map->poll_timeout = cfg->map_timeout; + + if (description) { + map->description = rspamd_mempool_strdup(cfg->cfg_pool, description); + } + + if (ucl_object_type(obj) == UCL_ARRAY) { + /* Add array of maps as multiple backends */ + while ((cur = ucl_object_iterate(obj, &it, true)) != NULL) { + if (ucl_object_type(cur) == UCL_STRING) { + bk = rspamd_map_parse_backend(cfg, ucl_object_tostring(cur)); + + if (bk != NULL) { + rspamd_map_add_backend(map, bk); + + if (!map->name) { + map->name = rspamd_mempool_strdup(cfg->cfg_pool, + ucl_object_tostring(cur)); + } + } + } + else { + msg_err_config("bad map element type: %s", + ucl_object_type_to_string(ucl_object_type(cur))); + } + } + + if (map->backends->len == 0) { + msg_err_config("map has no urls to be loaded: empty list"); + goto err; + } + } + else if (ucl_object_type(obj) == UCL_OBJECT) { + elt = ucl_object_lookup(obj, "name"); + if (elt && ucl_object_type(elt) == UCL_STRING) { + map->name = rspamd_mempool_strdup(cfg->cfg_pool, + ucl_object_tostring(elt)); + } + + elt = ucl_object_lookup(obj, "description"); + if (elt && ucl_object_type(elt) == UCL_STRING) { + map->description = rspamd_mempool_strdup(cfg->cfg_pool, + ucl_object_tostring(elt)); + } + + elt = ucl_object_lookup_any(obj, "timeout", "poll", "poll_time", + "watch_interval", NULL); + if (elt) { + map->poll_timeout = ucl_object_todouble(elt); + } + + elt = ucl_object_lookup_any(obj, "upstreams", "url", "urls", NULL); + if (elt == NULL) { + msg_err_config("map has no urls to be loaded: no elt"); + goto err; + } + + if (ucl_object_type(elt) == UCL_ARRAY) { + /* Add array of maps as multiple backends */ + it = ucl_object_iterate_new(elt); + + while ((cur = ucl_object_iterate_safe(it, true)) != NULL) { + if (ucl_object_type(cur) == UCL_STRING) { + bk = rspamd_map_parse_backend(cfg, ucl_object_tostring(cur)); + + if (bk != NULL) { + rspamd_map_add_backend(map, bk); + + if (!map->name) { + map->name = rspamd_mempool_strdup(cfg->cfg_pool, + ucl_object_tostring(cur)); + } + } + } + else { + msg_err_config("bad map element type: %s", + ucl_object_type_to_string(ucl_object_type(cur))); + ucl_object_iterate_free(it); + goto err; + } + } + + ucl_object_iterate_free(it); + + if (map->backends->len == 0) { + msg_err_config("map has no urls to be loaded: empty object list"); + goto err; + } + } + else if (ucl_object_type(elt) == UCL_STRING) { + bk = rspamd_map_parse_backend(cfg, ucl_object_tostring(elt)); + + if (bk != NULL) { + rspamd_map_add_backend(map, bk); + + if (!map->name) { + map->name = rspamd_mempool_strdup(cfg->cfg_pool, + ucl_object_tostring(elt)); + } + } + } + + if (!map->backends || map->backends->len == 0) { + msg_err_config("map has no urls to be loaded: no valid backends"); + goto err; + } + } + else { + msg_err_config("map has invalid type for value: %s", + ucl_object_type_to_string(ucl_object_type(obj))); + goto err; + } + + gboolean all_local = TRUE; + + PTR_ARRAY_FOREACH(map->backends, i, bk) + { + if (bk->protocol == MAP_PROTO_STATIC) { + GString *map_data; + /* We need data field in ucl */ + elt = ucl_object_lookup(obj, "data"); + + if (elt == NULL) { + msg_err_config("map has static backend but no `data` field"); + goto err; + } + + + if (ucl_object_type(elt) == UCL_STRING) { + map_data = g_string_sized_new(32); + + if (rspamd_map_add_static_string(cfg, elt, map_data)) { + bk->data.sd->data = map_data->str; + bk->data.sd->len = map_data->len; + g_string_free(map_data, FALSE); + } + else { + g_string_free(map_data, TRUE); + msg_err_config("map has static backend with invalid `data` field"); + goto err; + } + } + else if (ucl_object_type(elt) == UCL_ARRAY) { + map_data = g_string_sized_new(32); + it = ucl_object_iterate_new(elt); + + while ((cur = ucl_object_iterate_safe(it, true))) { + if (!rspamd_map_add_static_string(cfg, cur, map_data)) { + g_string_free(map_data, TRUE); + msg_err_config("map has static backend with invalid " + "`data` field"); + ucl_object_iterate_free(it); + goto err; + } + } + + ucl_object_iterate_free(it); + bk->data.sd->data = map_data->str; + bk->data.sd->len = map_data->len; + g_string_free(map_data, FALSE); + } + } + else if (bk->protocol != MAP_PROTO_FILE) { + all_local = FALSE; + } + } + + if (all_local) { + map->poll_timeout = (map->poll_timeout * + cfg->map_file_watch_multiplier); + } + + rspamd_map_calculate_hash(map); + msg_debug_map("added map from ucl"); + + cfg->maps = g_list_prepend(cfg->maps, map); + + return map; + +err: + + if (map) { + PTR_ARRAY_FOREACH(map->backends, i, bk) + { + MAP_RELEASE(bk, "rspamd_map_backend"); + } + } + + return NULL; +} + +rspamd_map_traverse_function +rspamd_map_get_traverse_function(struct rspamd_map *map) +{ + if (map) { + return map->traverse_function; + } + + return NULL; +} + +void rspamd_map_traverse(struct rspamd_map *map, rspamd_map_traverse_cb cb, + gpointer cbdata, gboolean reset_hits) +{ + if (*map->user_data && map->traverse_function) { + map->traverse_function(*map->user_data, cb, cbdata, reset_hits); + } +} + +void rspamd_map_set_on_load_function(struct rspamd_map *map, rspamd_map_on_load_function cb, + gpointer cbdata, GDestroyNotify dtor) +{ + if (map) { + map->on_load_function = cb; + map->on_load_ud = cbdata; + map->on_load_ud_dtor = dtor; + } +} diff --git a/src/libserver/maps/map.h b/src/libserver/maps/map.h new file mode 100644 index 0000000..04df16e --- /dev/null +++ b/src/libserver/maps/map.h @@ -0,0 +1,168 @@ +#ifndef RSPAMD_MAP_H +#define RSPAMD_MAP_H + +#include "config.h" +#include "contrib/libev/ev.h" + +#include "ucl.h" +#include "mem_pool.h" +#include "radix.h" +#include "dns.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Maps API is designed to load lists data from different dynamic sources. + * It monitor files and HTTP locations for modifications and reload them if they are + * modified. + */ +struct map_cb_data; +struct rspamd_worker; + +/** + * Common map object + */ +struct rspamd_config; +struct rspamd_map; + +/** + * Callback types + */ +typedef gchar *(*map_cb_t)(gchar *chunk, gint len, + struct map_cb_data *data, gboolean final); + +typedef void (*map_fin_cb_t)(struct map_cb_data *data, void **target); + +typedef void (*map_dtor_t)(struct map_cb_data *data); + +typedef gboolean (*rspamd_map_traverse_cb)(gconstpointer key, + gconstpointer value, gsize hits, gpointer ud); + +typedef void (*rspamd_map_traverse_function)(void *data, + rspamd_map_traverse_cb cb, + gpointer cbdata, gboolean reset_hits); +typedef void (*rspamd_map_on_load_function)(struct rspamd_map *map, gpointer ud); + +/** + * Callback data for async load + */ +struct map_cb_data { + struct rspamd_map *map; + gint state; + bool errored; + void *prev_data; + void *cur_data; +}; + +/** + * Returns TRUE if line looks like a map definition + * @param map_line + * @return + */ +gboolean rspamd_map_is_map(const gchar *map_line); + +enum rspamd_map_flags { + RSPAMD_MAP_DEFAULT = 0, + RSPAMD_MAP_FILE_ONLY = 1u << 0u, + RSPAMD_MAP_FILE_NO_READ = 1u << 1u, +}; + +/** + * Add map from line + */ +struct rspamd_map *rspamd_map_add(struct rspamd_config *cfg, + const gchar *map_line, + const gchar *description, + map_cb_t read_callback, + map_fin_cb_t fin_callback, + map_dtor_t dtor, + void **user_data, + struct rspamd_worker *worker, + int flags); + +/** + * Add map from ucl + */ +struct rspamd_map *rspamd_map_add_from_ucl(struct rspamd_config *cfg, + const ucl_object_t *obj, + const gchar *description, + map_cb_t read_callback, + map_fin_cb_t fin_callback, + map_dtor_t dtor, + void **user_data, + struct rspamd_worker *worker, + int flags); + +/** + * Adds a fake map structure (for logging purposes mainly) + * @param cfg + * @param description + * @return + */ +struct rspamd_map *rspamd_map_add_fake(struct rspamd_config *cfg, + const gchar *description, + const gchar *name); + + +enum rspamd_map_watch_type { + RSPAMD_MAP_WATCH_MIN = 9, + RSPAMD_MAP_WATCH_PRIMARY_CONTROLLER, + RSPAMD_MAP_WATCH_SCANNER, + RSPAMD_MAP_WATCH_WORKER, + RSPAMD_MAP_WATCH_MAX +}; + +/** + * Start watching of maps by adding events to libevent event loop + */ +void rspamd_map_watch(struct rspamd_config *cfg, + struct ev_loop *event_loop, + struct rspamd_dns_resolver *resolver, + struct rspamd_worker *worker, + enum rspamd_map_watch_type how); + +/** + * Preloads maps where all backends are file + * @param cfg + */ +void rspamd_map_preload(struct rspamd_config *cfg); + +/** + * Remove all maps watched (remove events) + */ +void rspamd_map_remove_all(struct rspamd_config *cfg); + +/** + * Get traverse function for specific map + * @param map + * @return + */ +rspamd_map_traverse_function rspamd_map_get_traverse_function(struct rspamd_map *map); + +/** + * Perform map traverse + * @param map + * @param cb + * @param cbdata + * @param reset_hits + * @return + */ +void rspamd_map_traverse(struct rspamd_map *map, rspamd_map_traverse_cb cb, + gpointer cbdata, gboolean reset_hits); + +/** + * Set map on load callback + * @param map + * @param cb + * @param cbdata + */ +void rspamd_map_set_on_load_function(struct rspamd_map *map, rspamd_map_on_load_function cb, + gpointer cbdata, GDestroyNotify dtor); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/libserver/maps/map_helpers.c b/src/libserver/maps/map_helpers.c new file mode 100644 index 0000000..65478c5 --- /dev/null +++ b/src/libserver/maps/map_helpers.c @@ -0,0 +1,1845 @@ +/* + * Copyright 2023 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "map_helpers.h" +#include "map_private.h" +#include "khash.h" +#include "radix.h" +#include "rspamd.h" +#include "cryptobox.h" +#include "mempool_vars_internal.h" +#include "contrib/fastutf8/fastutf8.h" +#include "contrib/cdb/cdb.h" + +#ifdef WITH_HYPERSCAN +#include "hs.h" +#include "hyperscan_tools.h" +#endif +#ifndef WITH_PCRE2 +#include <pcre.h> +#else +#include <pcre2.h> +#endif + + +static const guint64 map_hash_seed = 0xdeadbabeULL; +static const gchar *const hash_fill = "1"; + +struct rspamd_map_helper_value { + gsize hits; + gconstpointer key; + gchar value[]; /* Null terminated */ +}; + +#define rspamd_map_ftok_hash(t) (rspamd_icase_hash((t).begin, (t).len, rspamd_hash_seed())) +#define rspamd_map_ftok_equal(a, b) ((a).len == (b).len && rspamd_lc_cmp((a).begin, (b).begin, (a).len) == 0) + +KHASH_INIT(rspamd_map_hash, rspamd_ftok_t, + struct rspamd_map_helper_value *, true, + rspamd_map_ftok_hash, rspamd_map_ftok_equal); + +struct rspamd_radix_map_helper { + rspamd_mempool_t *pool; + khash_t(rspamd_map_hash) * htb; + radix_compressed_t *trie; + struct rspamd_map *map; + rspamd_cryptobox_fast_hash_state_t hst; +}; + +struct rspamd_hash_map_helper { + rspamd_mempool_t *pool; + khash_t(rspamd_map_hash) * htb; + struct rspamd_map *map; + rspamd_cryptobox_fast_hash_state_t hst; +}; + +struct rspamd_cdb_map_helper { + GQueue cdbs; + struct rspamd_map *map; + rspamd_cryptobox_fast_hash_state_t hst; + gsize total_size; +}; + +struct rspamd_regexp_map_helper { + rspamd_cryptobox_hash_state_t hst; + guchar re_digest[rspamd_cryptobox_HASHBYTES]; + rspamd_mempool_t *pool; + struct rspamd_map *map; + GPtrArray *regexps; + GPtrArray *values; + khash_t(rspamd_map_hash) * htb; + enum rspamd_regexp_map_flags map_flags; +#ifdef WITH_HYPERSCAN + rspamd_hyperscan_t *hs_db; + hs_scratch_t *hs_scratch; + gchar **patterns; + gint *flags; + gint *ids; +#endif +}; + +/** + * FSM for parsing lists + */ + +#define MAP_STORE_KEY \ + do { \ + while (g_ascii_isspace(*c) && p > c) { c++; } \ + key = g_malloc(p - c + 1); \ + rspamd_strlcpy(key, c, p - c + 1); \ + stripped_key = g_strstrip(key); \ + } while (0) + +#define MAP_STORE_VALUE \ + do { \ + while (g_ascii_isspace(*c) && p > c) { c++; } \ + value = g_malloc(p - c + 1); \ + rspamd_strlcpy(value, c, p - c + 1); \ + stripped_value = g_strstrip(value); \ + } while (0) + +gchar * +rspamd_parse_kv_list( + gchar *chunk, + gint len, + struct map_cb_data *data, + rspamd_map_insert_func func, + const gchar *default_value, + gboolean final) +{ + enum { + map_skip_spaces_before_key = 0, + map_read_key, + map_read_key_quoted, + map_read_key_slashed, + map_skip_spaces_after_key, + map_backslash_quoted, + map_backslash_slashed, + map_read_key_after_slash, + map_read_value, + map_read_comment_start, + map_skip_comment, + map_read_eol, + }; + + gchar *c, *p, *key = NULL, *value = NULL, *stripped_key, *stripped_value, *end; + struct rspamd_map *map = data->map; + guint line_number = 0; + + p = chunk; + c = p; + end = p + len; + + while (p < end) { + switch (data->state) { + case map_skip_spaces_before_key: + if (g_ascii_isspace(*p)) { + p++; + } + else { + if (*p == '"') { + p++; + c = p; + data->state = map_read_key_quoted; + } + else if (*p == '/') { + /* Note that c is on '/' here as '/' is a part of key */ + c = p; + p++; + data->state = map_read_key_slashed; + } + else { + c = p; + data->state = map_read_key; + } + } + break; + case map_read_key: + /* read key */ + /* Check here comments, eol and end of buffer */ + if (*p == '#' && (p == c || *(p - 1) != '\\')) { + if (p - c > 0) { + /* Store a single key */ + MAP_STORE_KEY; + func(data->cur_data, stripped_key, default_value); + msg_debug_map("insert key only pair: %s -> %s; line: %d", + stripped_key, default_value, line_number); + g_free(key); + } + + key = NULL; + data->state = map_read_comment_start; + } + else if (*p == '\r' || *p == '\n') { + if (p - c > 0) { + /* Store a single key */ + MAP_STORE_KEY; + func(data->cur_data, stripped_key, default_value); + msg_debug_map("insert key only pair: %s -> %s; line: %d", + stripped_key, default_value, line_number); + g_free(key); + } + + data->state = map_read_eol; + key = NULL; + } + else if (g_ascii_isspace(*p)) { + if (p - c > 0) { + MAP_STORE_KEY; + data->state = map_skip_spaces_after_key; + } + else { + msg_err_map("empty or invalid key found on line %d", line_number); + data->state = map_skip_comment; + } + } + else { + p++; + } + break; + case map_read_key_quoted: + if (*p == '\\') { + data->state = map_backslash_quoted; + p++; + } + else if (*p == '"') { + /* Allow empty keys in this case */ + if (p - c >= 0) { + MAP_STORE_KEY; + data->state = map_skip_spaces_after_key; + } + else { + g_assert_not_reached(); + } + p++; + } + else { + p++; + } + break; + case map_read_key_slashed: + if (*p == '\\') { + data->state = map_backslash_slashed; + p++; + } + else if (*p == '/') { + /* Allow empty keys in this case */ + if (p - c >= 0) { + data->state = map_read_key_after_slash; + } + else { + g_assert_not_reached(); + } + } + else { + p++; + } + break; + case map_read_key_after_slash: + /* + * This state is equal to reading of key but '/' is not + * treated specially + */ + if (*p == '#') { + if (p - c > 0) { + /* Store a single key */ + MAP_STORE_KEY; + func(data->cur_data, stripped_key, default_value); + msg_debug_map("insert key only pair: %s -> %s; line: %d", + stripped_key, default_value, line_number); + g_free(key); + key = NULL; + } + + data->state = map_read_comment_start; + } + else if (*p == '\r' || *p == '\n') { + if (p - c > 0) { + /* Store a single key */ + MAP_STORE_KEY; + func(data->cur_data, stripped_key, default_value); + + msg_debug_map("insert key only pair: %s -> %s; line: %d", + stripped_key, default_value, line_number); + g_free(key); + key = NULL; + } + + data->state = map_read_eol; + key = NULL; + } + else if (g_ascii_isspace(*p)) { + if (p - c > 0) { + MAP_STORE_KEY; + data->state = map_skip_spaces_after_key; + } + else { + msg_err_map("empty or invalid key found on line %d", line_number); + data->state = map_skip_comment; + } + } + else { + p++; + } + break; + case map_backslash_quoted: + p++; + data->state = map_read_key_quoted; + break; + case map_backslash_slashed: + p++; + data->state = map_read_key_slashed; + break; + case map_skip_spaces_after_key: + if (*p == ' ' || *p == '\t') { + p++; + } + else { + c = p; + data->state = map_read_value; + } + break; + case map_read_value: + if (key == NULL) { + /* Ignore line */ + msg_err_map("empty or invalid key found on line %d", line_number); + data->state = map_skip_comment; + } + else { + if (*p == '#') { + if (p - c > 0) { + /* Store a single key */ + MAP_STORE_VALUE; + func(data->cur_data, stripped_key, stripped_value); + msg_debug_map("insert key value pair: %s -> %s; line: %d", + stripped_key, stripped_value, line_number); + g_free(key); + g_free(value); + key = NULL; + value = NULL; + } + else { + func(data->cur_data, stripped_key, default_value); + msg_debug_map("insert key only pair: %s -> %s; line: %d", + stripped_key, default_value, line_number); + g_free(key); + key = NULL; + } + + data->state = map_read_comment_start; + } + else if (*p == '\r' || *p == '\n') { + if (p - c > 0) { + /* Store a single key */ + MAP_STORE_VALUE; + func(data->cur_data, stripped_key, stripped_value); + msg_debug_map("insert key value pair: %s -> %s", + stripped_key, stripped_value); + g_free(key); + g_free(value); + key = NULL; + value = NULL; + } + else { + func(data->cur_data, stripped_key, default_value); + msg_debug_map("insert key only pair: %s -> %s", + stripped_key, default_value); + g_free(key); + key = NULL; + } + + data->state = map_read_eol; + key = NULL; + } + else { + p++; + } + } + break; + case map_read_comment_start: + if (*p == '#') { + data->state = map_skip_comment; + p++; + key = NULL; + value = NULL; + } + else { + g_assert_not_reached(); + } + break; + case map_skip_comment: + if (*p == '\r' || *p == '\n') { + data->state = map_read_eol; + } + else { + p++; + } + break; + case map_read_eol: + /* Skip \r\n and whitespaces */ + if (*p == '\r' || *p == '\n') { + if (*p == '\n') { + /* We don't care about \r only line separators, they are too rare */ + line_number++; + } + p++; + } + else { + data->state = map_skip_spaces_before_key; + } + break; + default: + g_assert_not_reached(); + break; + } + } + + if (final) { + /* Examine the state */ + switch (data->state) { + case map_read_key: + case map_read_key_slashed: + case map_read_key_quoted: + case map_read_key_after_slash: + if (p - c > 0) { + /* Store a single key */ + MAP_STORE_KEY; + func(data->cur_data, stripped_key, default_value); + msg_debug_map("insert key only pair: %s -> %s", + stripped_key, default_value); + g_free(key); + key = NULL; + } + break; + case map_read_value: + if (key == NULL) { + /* Ignore line */ + msg_err_map("empty or invalid key found on line %d", line_number); + data->state = map_skip_comment; + } + else { + if (p - c > 0) { + /* Store a single key */ + MAP_STORE_VALUE; + func(data->cur_data, stripped_key, stripped_value); + msg_debug_map("insert key value pair: %s -> %s", + stripped_key, stripped_value); + g_free(key); + g_free(value); + key = NULL; + value = NULL; + } + else { + func(data->cur_data, stripped_key, default_value); + msg_debug_map("insert key only pair: %s -> %s", + stripped_key, default_value); + g_free(key); + key = NULL; + } + } + break; + } + + data->state = map_skip_spaces_before_key; + } + + return c; +} + +/** + * Radix tree helper function + */ +void rspamd_map_helper_insert_radix(gpointer st, gconstpointer key, gconstpointer value) +{ + struct rspamd_radix_map_helper *r = (struct rspamd_radix_map_helper *) st; + struct rspamd_map_helper_value *val; + gsize vlen; + khiter_t k; + gconstpointer nk; + rspamd_ftok_t tok; + gint res; + struct rspamd_map *map; + + map = r->map; + tok.begin = key; + tok.len = strlen(key); + + k = kh_get(rspamd_map_hash, r->htb, tok); + + if (k == kh_end(r->htb)) { + nk = rspamd_mempool_strdup(r->pool, key); + tok.begin = nk; + k = kh_put(rspamd_map_hash, r->htb, tok, &res); + } + else { + val = kh_value(r->htb, k); + + if (strcmp(value, val->value) == 0) { + /* Same element, skip */ + return; + } + else { + msg_warn_map("duplicate radix entry found for map %s: %s (old value: '%s', new: '%s')", + map->name, key, val->value, value); + } + + nk = kh_key(r->htb, k).begin; + val->key = nk; + kh_value(r->htb, k) = val; + + return; /* do not touch radix in case of exact duplicate */ + } + + vlen = strlen(value); + val = rspamd_mempool_alloc0(r->pool, sizeof(*val) + + vlen + 1); + memcpy(val->value, value, vlen); + + nk = kh_key(r->htb, k).begin; + val->key = nk; + kh_value(r->htb, k) = val; + rspamd_radix_add_iplist(key, ",", r->trie, val, FALSE, + r->map->name); + rspamd_cryptobox_fast_hash_update(&r->hst, nk, tok.len); +} + +void rspamd_map_helper_insert_radix_resolve(gpointer st, gconstpointer key, gconstpointer value) +{ + struct rspamd_radix_map_helper *r = (struct rspamd_radix_map_helper *) st; + struct rspamd_map_helper_value *val; + gsize vlen; + khiter_t k; + gconstpointer nk; + rspamd_ftok_t tok; + gint res; + struct rspamd_map *map; + + map = r->map; + + if (!key) { + msg_warn_map("cannot insert NULL value in the map: %s", + map->name); + return; + } + + tok.begin = key; + tok.len = strlen(key); + + k = kh_get(rspamd_map_hash, r->htb, tok); + + if (k == kh_end(r->htb)) { + nk = rspamd_mempool_strdup(r->pool, key); + tok.begin = nk; + k = kh_put(rspamd_map_hash, r->htb, tok, &res); + } + else { + val = kh_value(r->htb, k); + + if (strcmp(value, val->value) == 0) { + /* Same element, skip */ + return; + } + else { + msg_warn_map("duplicate radix entry found for map %s: %s (old value: '%s', new: '%s')", + map->name, key, val->value, value); + } + + nk = kh_key(r->htb, k).begin; + val->key = nk; + kh_value(r->htb, k) = val; + + return; /* do not touch radix in case of exact duplicate */ + } + + vlen = strlen(value); + val = rspamd_mempool_alloc0(r->pool, sizeof(*val) + + vlen + 1); + memcpy(val->value, value, vlen); + nk = kh_key(r->htb, k).begin; + val->key = nk; + kh_value(r->htb, k) = val; + rspamd_radix_add_iplist(key, ",", r->trie, val, TRUE, + r->map->name); + rspamd_cryptobox_fast_hash_update(&r->hst, nk, tok.len); +} + +void rspamd_map_helper_insert_hash(gpointer st, gconstpointer key, gconstpointer value) +{ + struct rspamd_hash_map_helper *ht = st; + struct rspamd_map_helper_value *val; + khiter_t k; + gconstpointer nk; + gsize vlen; + gint r; + rspamd_ftok_t tok; + struct rspamd_map *map; + + tok.begin = key; + tok.len = strlen(key); + map = ht->map; + + k = kh_get(rspamd_map_hash, ht->htb, tok); + + if (k == kh_end(ht->htb)) { + nk = rspamd_mempool_strdup(ht->pool, key); + tok.begin = nk; + k = kh_put(rspamd_map_hash, ht->htb, tok, &r); + } + else { + val = kh_value(ht->htb, k); + + if (strcmp(value, val->value) == 0) { + /* Same element, skip */ + return; + } + else { + msg_warn_map("duplicate hash entry found for map %s: %s (old value: '%s', new: '%s')", + map->name, key, val->value, value); + } + } + + /* Null termination due to alloc0 */ + vlen = strlen(value); + val = rspamd_mempool_alloc0(ht->pool, sizeof(*val) + vlen + 1); + memcpy(val->value, value, vlen); + + tok = kh_key(ht->htb, k); + nk = tok.begin; + val->key = nk; + kh_value(ht->htb, k) = val; + + rspamd_cryptobox_fast_hash_update(&ht->hst, nk, tok.len); +} + +void rspamd_map_helper_insert_re(gpointer st, gconstpointer key, gconstpointer value) +{ + struct rspamd_regexp_map_helper *re_map = st; + struct rspamd_map *map; + rspamd_regexp_t *re; + gchar *escaped; + GError *err = NULL; + gint pcre_flags; + gsize escaped_len; + struct rspamd_map_helper_value *val; + khiter_t k; + rspamd_ftok_t tok; + gconstpointer nk; + gsize vlen; + gint r; + + map = re_map->map; + + tok.begin = key; + tok.len = strlen(key); + + k = kh_get(rspamd_map_hash, re_map->htb, tok); + + if (k == kh_end(re_map->htb)) { + nk = rspamd_mempool_strdup(re_map->pool, key); + tok.begin = nk; + k = kh_put(rspamd_map_hash, re_map->htb, tok, &r); + } + else { + val = kh_value(re_map->htb, k); + + /* Always warn about regexp duplicate as it's likely a bad mistake */ + msg_warn_map("duplicate re entry found for map %s: %s (old value: '%s', new: '%s')", + map->name, key, val->value, value); + + if (strcmp(val->value, value) == 0) { + /* Same value, skip */ + return; + } + + /* Replace value but do not touch regexp */ + nk = kh_key(re_map->htb, k).begin; + val->key = nk; + kh_value(re_map->htb, k) = val; + + return; + } + + /* Check regexp stuff */ + if (re_map->map_flags & RSPAMD_REGEXP_MAP_FLAG_GLOB) { + escaped = rspamd_str_regexp_escape(key, strlen(key), &escaped_len, + RSPAMD_REGEXP_ESCAPE_GLOB | RSPAMD_REGEXP_ESCAPE_UTF); + re = rspamd_regexp_new(escaped, NULL, &err); + g_free(escaped); + } + else { + re = rspamd_regexp_new(key, NULL, &err); + } + + if (re == NULL) { + msg_err_map("cannot parse regexp %s: %e", key, err); + + if (err) { + g_error_free(err); + } + + return; + } + + vlen = strlen(value); + val = rspamd_mempool_alloc0(re_map->pool, sizeof(*val) + + vlen + 1); + memcpy(val->value, value, vlen); /* Null terminated due to alloc0 previously */ + nk = kh_key(re_map->htb, k).begin; + val->key = nk; + kh_value(re_map->htb, k) = val; + rspamd_cryptobox_hash_update(&re_map->hst, nk, tok.len); + + pcre_flags = rspamd_regexp_get_pcre_flags(re); + +#ifndef WITH_PCRE2 + if (pcre_flags & PCRE_FLAG(UTF8)) { + re_map->map_flags |= RSPAMD_REGEXP_MAP_FLAG_UTF; + } +#else + if (pcre_flags & PCRE_FLAG(UTF)) { + re_map->map_flags |= RSPAMD_REGEXP_MAP_FLAG_UTF; + } +#endif + + g_ptr_array_add(re_map->regexps, re); + g_ptr_array_add(re_map->values, val); +} + +static void +rspamd_map_helper_traverse_regexp(void *data, + rspamd_map_traverse_cb cb, + gpointer cbdata, + gboolean reset_hits) +{ + rspamd_ftok_t tok; + struct rspamd_map_helper_value *val; + struct rspamd_regexp_map_helper *re_map = data; + + kh_foreach(re_map->htb, tok, val, { + if (!cb(tok.begin, val->value, val->hits, cbdata)) { + break; + } + + if (reset_hits) { + val->hits = 0; + } + }); +} + +struct rspamd_hash_map_helper * +rspamd_map_helper_new_hash(struct rspamd_map *map) +{ + struct rspamd_hash_map_helper *htb; + rspamd_mempool_t *pool; + + if (map) { + pool = rspamd_mempool_new(rspamd_mempool_suggest_size(), + map->tag, 0); + } + else { + pool = rspamd_mempool_new(rspamd_mempool_suggest_size(), + NULL, 0); + } + + htb = rspamd_mempool_alloc0_type(pool, struct rspamd_hash_map_helper); + htb->htb = kh_init(rspamd_map_hash); + htb->pool = pool; + htb->map = map; + rspamd_cryptobox_fast_hash_init(&htb->hst, map_hash_seed); + + return htb; +} + +void rspamd_map_helper_destroy_hash(struct rspamd_hash_map_helper *r) +{ + if (r == NULL || r->pool == NULL) { + return; + } + + rspamd_mempool_t *pool = r->pool; + kh_destroy(rspamd_map_hash, r->htb); + memset(r, 0, sizeof(*r)); + rspamd_mempool_delete(pool); +} + +static void +rspamd_map_helper_traverse_hash(void *data, + rspamd_map_traverse_cb cb, + gpointer cbdata, + gboolean reset_hits) +{ + rspamd_ftok_t tok; + struct rspamd_map_helper_value *val; + struct rspamd_hash_map_helper *ht = data; + + kh_foreach(ht->htb, tok, val, { + if (!cb(tok.begin, val->value, val->hits, cbdata)) { + break; + } + + if (reset_hits) { + val->hits = 0; + } + }); +} + +struct rspamd_radix_map_helper * +rspamd_map_helper_new_radix(struct rspamd_map *map) +{ + struct rspamd_radix_map_helper *r; + rspamd_mempool_t *pool; + const gchar *name = "unnamed"; + + if (map) { + pool = rspamd_mempool_new(rspamd_mempool_suggest_size(), + map->tag, 0); + name = map->name; + } + else { + pool = rspamd_mempool_new(rspamd_mempool_suggest_size(), + NULL, 0); + } + + r = rspamd_mempool_alloc0_type(pool, struct rspamd_radix_map_helper); + r->trie = radix_create_compressed_with_pool(pool, name); + r->htb = kh_init(rspamd_map_hash); + r->pool = pool; + r->map = map; + rspamd_cryptobox_fast_hash_init(&r->hst, map_hash_seed); + + return r; +} + +void rspamd_map_helper_destroy_radix(struct rspamd_radix_map_helper *r) +{ + if (r == NULL || !r->pool) { + return; + } + + kh_destroy(rspamd_map_hash, r->htb); + rspamd_mempool_t *pool = r->pool; + memset(r, 0, sizeof(*r)); + rspamd_mempool_delete(pool); +} + +static void +rspamd_map_helper_traverse_radix(void *data, + rspamd_map_traverse_cb cb, + gpointer cbdata, + gboolean reset_hits) +{ + rspamd_ftok_t tok; + struct rspamd_map_helper_value *val; + struct rspamd_radix_map_helper *r = data; + + kh_foreach(r->htb, tok, val, { + if (!cb(tok.begin, val->value, val->hits, cbdata)) { + break; + } + + if (reset_hits) { + val->hits = 0; + } + }); +} + +struct rspamd_regexp_map_helper * +rspamd_map_helper_new_regexp(struct rspamd_map *map, + enum rspamd_regexp_map_flags flags) +{ + struct rspamd_regexp_map_helper *re_map; + rspamd_mempool_t *pool; + + pool = rspamd_mempool_new(rspamd_mempool_suggest_size(), + map->tag, 0); + + re_map = rspamd_mempool_alloc0_type(pool, struct rspamd_regexp_map_helper); + re_map->pool = pool; + re_map->values = g_ptr_array_new(); + re_map->regexps = g_ptr_array_new(); + re_map->map = map; + re_map->map_flags = flags; + re_map->htb = kh_init(rspamd_map_hash); + rspamd_cryptobox_hash_init(&re_map->hst, NULL, 0); + + return re_map; +} + + +void rspamd_map_helper_destroy_regexp(struct rspamd_regexp_map_helper *re_map) +{ + rspamd_regexp_t *re; + guint i; + + if (!re_map || !re_map->regexps) { + return; + } + +#ifdef WITH_HYPERSCAN + if (re_map->hs_scratch) { + hs_free_scratch(re_map->hs_scratch); + } + if (re_map->hs_db) { + rspamd_hyperscan_free(re_map->hs_db, false); + } + if (re_map->patterns) { + for (i = 0; i < re_map->regexps->len; i++) { + g_free(re_map->patterns[i]); + } + + g_free(re_map->patterns); + } + if (re_map->flags) { + g_free(re_map->flags); + } + if (re_map->ids) { + g_free(re_map->ids); + } +#endif + + for (i = 0; i < re_map->regexps->len; i++) { + re = g_ptr_array_index(re_map->regexps, i); + rspamd_regexp_unref(re); + } + + g_ptr_array_free(re_map->regexps, TRUE); + g_ptr_array_free(re_map->values, TRUE); + kh_destroy(rspamd_map_hash, re_map->htb); + + rspamd_mempool_t *pool = re_map->pool; + memset(re_map, 0, sizeof(*re_map)); + rspamd_mempool_delete(pool); +} + +gchar * +rspamd_kv_list_read( + gchar *chunk, + gint len, + struct map_cb_data *data, + gboolean final) +{ + if (data->cur_data == NULL) { + data->cur_data = rspamd_map_helper_new_hash(data->map); + } + + return rspamd_parse_kv_list( + chunk, + len, + data, + rspamd_map_helper_insert_hash, + "", + final); +} + +void rspamd_kv_list_fin(struct map_cb_data *data, void **target) +{ + struct rspamd_map *map = data->map; + struct rspamd_hash_map_helper *htb; + + if (data->errored) { + /* Clean up the current data and do not touch prev data */ + if (data->cur_data) { + msg_info_map("cleanup unfinished new data as error occurred for %s", + map->name); + htb = (struct rspamd_hash_map_helper *) data->cur_data; + rspamd_map_helper_destroy_hash(htb); + data->cur_data = NULL; + } + } + else { + if (data->cur_data) { + htb = (struct rspamd_hash_map_helper *) data->cur_data; + msg_info_map("read hash of %d elements from %s", kh_size(htb->htb), + map->name); + data->map->traverse_function = rspamd_map_helper_traverse_hash; + data->map->nelts = kh_size(htb->htb); + data->map->digest = rspamd_cryptobox_fast_hash_final(&htb->hst); + } + + if (target) { + *target = data->cur_data; + } + + if (data->prev_data) { + htb = (struct rspamd_hash_map_helper *) data->prev_data; + rspamd_map_helper_destroy_hash(htb); + } + } +} + +void rspamd_kv_list_dtor(struct map_cb_data *data) +{ + struct rspamd_hash_map_helper *htb; + + if (data->cur_data) { + htb = (struct rspamd_hash_map_helper *) data->cur_data; + rspamd_map_helper_destroy_hash(htb); + } +} + +gchar * +rspamd_radix_read( + gchar *chunk, + gint len, + struct map_cb_data *data, + gboolean final) +{ + struct rspamd_radix_map_helper *r; + struct rspamd_map *map = data->map; + + if (data->cur_data == NULL) { + r = rspamd_map_helper_new_radix(map); + data->cur_data = r; + } + + return rspamd_parse_kv_list( + chunk, + len, + data, + rspamd_map_helper_insert_radix, + hash_fill, + final); +} + +void rspamd_radix_fin(struct map_cb_data *data, void **target) +{ + struct rspamd_map *map = data->map; + struct rspamd_radix_map_helper *r; + + if (data->errored) { + /* Clean up the current data and do not touch prev data */ + if (data->cur_data) { + msg_info_map("cleanup unfinished new data as error occurred for %s", + map->name); + r = (struct rspamd_radix_map_helper *) data->cur_data; + rspamd_map_helper_destroy_radix(r); + data->cur_data = NULL; + } + } + else { + if (data->cur_data) { + r = (struct rspamd_radix_map_helper *) data->cur_data; + msg_info_map("read radix trie of %z elements: %s", + radix_get_size(r->trie), radix_get_info(r->trie)); + data->map->traverse_function = rspamd_map_helper_traverse_radix; + data->map->nelts = kh_size(r->htb); + data->map->digest = rspamd_cryptobox_fast_hash_final(&r->hst); + } + + if (target) { + *target = data->cur_data; + } + + if (data->prev_data) { + r = (struct rspamd_radix_map_helper *) data->prev_data; + rspamd_map_helper_destroy_radix(r); + } + } +} + +void rspamd_radix_dtor(struct map_cb_data *data) +{ + struct rspamd_radix_map_helper *r; + + if (data->cur_data) { + r = (struct rspamd_radix_map_helper *) data->cur_data; + rspamd_map_helper_destroy_radix(r); + } +} + +#ifdef WITH_HYPERSCAN + +static gboolean +rspamd_try_load_re_map_cache(struct rspamd_regexp_map_helper *re_map) +{ + gchar fp[PATH_MAX]; + struct rspamd_map *map; + + map = re_map->map; + + if (!map->cfg->hs_cache_dir) { + return FALSE; + } + + rspamd_snprintf(fp, sizeof(fp), "%s/%*xs.hsmc", + map->cfg->hs_cache_dir, + (gint) rspamd_cryptobox_HASHBYTES / 2, re_map->re_digest); + + re_map->hs_db = rspamd_hyperscan_maybe_load(fp, 0); + + return re_map->hs_db != NULL; +} + +static gboolean +rspamd_try_save_re_map_cache(struct rspamd_regexp_map_helper *re_map) +{ + gchar fp[PATH_MAX], np[PATH_MAX]; + gsize len; + gint fd; + char *bytes = NULL; + struct rspamd_map *map; + + map = re_map->map; + + if (!map->cfg->hs_cache_dir) { + return FALSE; + } + + rspamd_snprintf(fp, sizeof(fp), "%s/hsmc-XXXXXXXXXXXXX", + re_map->map->cfg->hs_cache_dir); + + if ((fd = g_mkstemp_full(fp, O_WRONLY | O_CREAT | O_EXCL, 00644)) != -1) { + if (hs_serialize_database(rspamd_hyperscan_get_database(re_map->hs_db), &bytes, &len) == HS_SUCCESS) { + if (write(fd, bytes, len) == -1) { + msg_warn_map("cannot write hyperscan cache to %s: %s", + fp, strerror(errno)); + unlink(fp); + free(bytes); + } + else { + free(bytes); + fsync(fd); + + rspamd_snprintf(np, sizeof(np), "%s/%*xs.hsmc", + re_map->map->cfg->hs_cache_dir, + (gint) rspamd_cryptobox_HASHBYTES / 2, re_map->re_digest); + + if (rename(fp, np) == -1) { + msg_warn_map("cannot rename hyperscan cache from %s to %s: %s", + fp, np, strerror(errno)); + unlink(fp); + } + else { + msg_info_map("written cached hyperscan data for %s to %s (%Hz length)", + map->name, np, len); + rspamd_hyperscan_notice_known(np); + } + } + } + else { + msg_warn_map("cannot serialize hyperscan cache to %s: %s", + fp, strerror(errno)); + unlink(fp); + } + + + close(fd); + } + + return FALSE; +} + +#endif + +static void +rspamd_re_map_finalize(struct rspamd_regexp_map_helper *re_map) +{ +#ifdef WITH_HYPERSCAN + guint i; + hs_platform_info_t plt; + hs_compile_error_t *err; + struct rspamd_map *map; + rspamd_regexp_t *re; + gint pcre_flags; + + map = re_map->map; + +#if !defined(__aarch64__) && !defined(__powerpc64__) + if (!(map->cfg->libs_ctx->crypto_ctx->cpu_config & CPUID_SSSE3)) { + msg_info_map("disable hyperscan for map %s, ssse3 instructions are not supported by CPU", + map->name); + return; + } +#endif + + if (hs_populate_platform(&plt) != HS_SUCCESS) { + msg_err_map("cannot populate hyperscan platform"); + return; + } + + re_map->patterns = g_new(gchar *, re_map->regexps->len); + re_map->flags = g_new(gint, re_map->regexps->len); + re_map->ids = g_new(gint, re_map->regexps->len); + + for (i = 0; i < re_map->regexps->len; i++) { + const gchar *pat; + gchar *escaped; + gint pat_flags; + + re = g_ptr_array_index(re_map->regexps, i); + pcre_flags = rspamd_regexp_get_pcre_flags(re); + pat = rspamd_regexp_get_pattern(re); + pat_flags = rspamd_regexp_get_flags(re); + + if (pat_flags & RSPAMD_REGEXP_FLAG_UTF) { + escaped = rspamd_str_regexp_escape(pat, strlen(pat), NULL, + RSPAMD_REGEXP_ESCAPE_RE | RSPAMD_REGEXP_ESCAPE_UTF); + re_map->flags[i] |= HS_FLAG_UTF8; + } + else { + escaped = rspamd_str_regexp_escape(pat, strlen(pat), NULL, + RSPAMD_REGEXP_ESCAPE_RE); + } + + re_map->patterns[i] = escaped; + re_map->flags[i] = HS_FLAG_SINGLEMATCH; + +#ifndef WITH_PCRE2 + if (pcre_flags & PCRE_FLAG(UTF8)) { + re_map->flags[i] |= HS_FLAG_UTF8; + } +#else + if (pcre_flags & PCRE_FLAG(UTF)) { + re_map->flags[i] |= HS_FLAG_UTF8; + } +#endif + if (pcre_flags & PCRE_FLAG(CASELESS)) { + re_map->flags[i] |= HS_FLAG_CASELESS; + } + if (pcre_flags & PCRE_FLAG(MULTILINE)) { + re_map->flags[i] |= HS_FLAG_MULTILINE; + } + if (pcre_flags & PCRE_FLAG(DOTALL)) { + re_map->flags[i] |= HS_FLAG_DOTALL; + } + if (rspamd_regexp_get_maxhits(re) == 1) { + re_map->flags[i] |= HS_FLAG_SINGLEMATCH; + } + + re_map->ids[i] = i; + } + + if (re_map->regexps->len > 0 && re_map->patterns) { + + if (!rspamd_try_load_re_map_cache(re_map)) { + gdouble ts1 = rspamd_get_ticks(FALSE); + hs_database_t *hs_db = NULL; + + if (hs_compile_multi((const gchar **) re_map->patterns, + re_map->flags, + re_map->ids, + re_map->regexps->len, + HS_MODE_BLOCK, + &plt, + &hs_db, + &err) != HS_SUCCESS) { + + msg_err_map("cannot create tree of regexp when processing '%s': %s", + err->expression >= 0 ? re_map->patterns[err->expression] : "unknown regexp", err->message); + re_map->hs_db = NULL; + hs_free_compile_error(err); + + return; + } + + if (re_map->map->cfg->hs_cache_dir) { + char fpath[PATH_MAX]; + rspamd_snprintf(fpath, sizeof(fpath), "%s/%*xs.hsmc", + re_map->map->cfg->hs_cache_dir, + (gint) rspamd_cryptobox_HASHBYTES / 2, re_map->re_digest); + re_map->hs_db = rspamd_hyperscan_from_raw_db(hs_db, fpath); + } + else { + re_map->hs_db = rspamd_hyperscan_from_raw_db(hs_db, NULL); + } + + ts1 = (rspamd_get_ticks(FALSE) - ts1) * 1000.0; + msg_info_map("hyperscan compiled %d regular expressions from %s in %.1f ms", + re_map->regexps->len, re_map->map->name, ts1); + rspamd_try_save_re_map_cache(re_map); + } + else { + msg_info_map("hyperscan read %d cached regular expressions from %s", + re_map->regexps->len, re_map->map->name); + } + + if (hs_alloc_scratch(rspamd_hyperscan_get_database(re_map->hs_db), &re_map->hs_scratch) != HS_SUCCESS) { + msg_err_map("cannot allocate scratch space for hyperscan"); + rspamd_hyperscan_free(re_map->hs_db, true); + re_map->hs_db = NULL; + } + } + else { + msg_err_map("regexp map is empty"); + } +#endif +} + +gchar * +rspamd_regexp_list_read_single( + gchar *chunk, + gint len, + struct map_cb_data *data, + gboolean final) +{ + struct rspamd_regexp_map_helper *re_map; + + if (data->cur_data == NULL) { + re_map = rspamd_map_helper_new_regexp(data->map, 0); + data->cur_data = re_map; + } + + return rspamd_parse_kv_list( + chunk, + len, + data, + rspamd_map_helper_insert_re, + hash_fill, + final); +} + +gchar * +rspamd_glob_list_read_single( + gchar *chunk, + gint len, + struct map_cb_data *data, + gboolean final) +{ + struct rspamd_regexp_map_helper *re_map; + + if (data->cur_data == NULL) { + re_map = rspamd_map_helper_new_regexp(data->map, RSPAMD_REGEXP_MAP_FLAG_GLOB); + data->cur_data = re_map; + } + + return rspamd_parse_kv_list( + chunk, + len, + data, + rspamd_map_helper_insert_re, + hash_fill, + final); +} + +gchar * +rspamd_regexp_list_read_multiple( + gchar *chunk, + gint len, + struct map_cb_data *data, + gboolean final) +{ + struct rspamd_regexp_map_helper *re_map; + + if (data->cur_data == NULL) { + re_map = rspamd_map_helper_new_regexp(data->map, + RSPAMD_REGEXP_MAP_FLAG_MULTIPLE); + data->cur_data = re_map; + } + + return rspamd_parse_kv_list( + chunk, + len, + data, + rspamd_map_helper_insert_re, + hash_fill, + final); +} + +gchar * +rspamd_glob_list_read_multiple( + gchar *chunk, + gint len, + struct map_cb_data *data, + gboolean final) +{ + struct rspamd_regexp_map_helper *re_map; + + if (data->cur_data == NULL) { + re_map = rspamd_map_helper_new_regexp(data->map, + RSPAMD_REGEXP_MAP_FLAG_GLOB | RSPAMD_REGEXP_MAP_FLAG_MULTIPLE); + data->cur_data = re_map; + } + + return rspamd_parse_kv_list( + chunk, + len, + data, + rspamd_map_helper_insert_re, + hash_fill, + final); +} + + +void rspamd_regexp_list_fin(struct map_cb_data *data, void **target) +{ + struct rspamd_regexp_map_helper *re_map = NULL, *old_re_map; + struct rspamd_map *map = data->map; + + if (data->errored) { + /* Clean up the current data and do not touch prev data */ + if (data->cur_data) { + msg_info_map("cleanup unfinished new data as error occurred for %s", + map->name); + re_map = (struct rspamd_regexp_map_helper *) data->cur_data; + rspamd_map_helper_destroy_regexp(re_map); + data->cur_data = NULL; + } + } + else { + if (data->cur_data) { + re_map = data->cur_data; + rspamd_cryptobox_hash_final(&re_map->hst, re_map->re_digest); + memcpy(&data->map->digest, re_map->re_digest, sizeof(data->map->digest)); + rspamd_re_map_finalize(re_map); + msg_info_map("read regexp list of %ud elements", + re_map->regexps->len); + data->map->traverse_function = rspamd_map_helper_traverse_regexp; + data->map->nelts = kh_size(re_map->htb); + } + + if (target) { + *target = data->cur_data; + } + + if (data->prev_data) { + old_re_map = data->prev_data; + rspamd_map_helper_destroy_regexp(old_re_map); + } + } +} +void rspamd_regexp_list_dtor(struct map_cb_data *data) +{ + if (data->cur_data) { + rspamd_map_helper_destroy_regexp(data->cur_data); + } +} + +#ifdef WITH_HYPERSCAN +static int +rspamd_match_hs_single_handler(unsigned int id, unsigned long long from, + unsigned long long to, + unsigned int flags, void *context) +{ + guint *i = context; + /* Always return non-zero as we need a single match here */ + + *i = id; + + return 1; +} +#endif + +gconstpointer +rspamd_match_regexp_map_single(struct rspamd_regexp_map_helper *map, + const gchar *in, gsize len) +{ + guint i; + rspamd_regexp_t *re; + gint res = 0; + gpointer ret = NULL; + struct rspamd_map_helper_value *val; + gboolean validated = FALSE; + + g_assert(in != NULL); + + if (map == NULL || len == 0 || map->regexps == NULL) { + return NULL; + } + + if (map->map_flags & RSPAMD_REGEXP_MAP_FLAG_UTF) { + if (rspamd_fast_utf8_validate(in, len) == 0) { + validated = TRUE; + } + } + else { + validated = TRUE; + } + +#ifdef WITH_HYPERSCAN + if (map->hs_db && map->hs_scratch) { + + if (validated) { + + res = hs_scan(rspamd_hyperscan_get_database(map->hs_db), in, len, 0, + map->hs_scratch, + rspamd_match_hs_single_handler, (void *) &i); + + if (res == HS_SCAN_TERMINATED) { + res = 1; + val = g_ptr_array_index(map->values, i); + + ret = val->value; + val->hits++; + } + + return ret; + } + } +#endif + + if (!res) { + /* PCRE version */ + for (i = 0; i < map->regexps->len; i++) { + re = g_ptr_array_index(map->regexps, i); + + if (rspamd_regexp_search(re, in, len, NULL, NULL, !validated, NULL)) { + val = g_ptr_array_index(map->values, i); + + ret = val->value; + val->hits++; + break; + } + } + } + + return ret; +} + +#ifdef WITH_HYPERSCAN +struct rspamd_multiple_cbdata { + GPtrArray *ar; + struct rspamd_regexp_map_helper *map; +}; + +static int +rspamd_match_hs_multiple_handler(unsigned int id, unsigned long long from, + unsigned long long to, + unsigned int flags, void *context) +{ + struct rspamd_multiple_cbdata *cbd = context; + struct rspamd_map_helper_value *val; + + + if (id < cbd->map->values->len) { + val = g_ptr_array_index(cbd->map->values, id); + val->hits++; + g_ptr_array_add(cbd->ar, val->value); + } + + /* Always return zero as we need all matches here */ + return 0; +} +#endif + +GPtrArray * +rspamd_match_regexp_map_all(struct rspamd_regexp_map_helper *map, + const gchar *in, gsize len) +{ + guint i; + rspamd_regexp_t *re; + GPtrArray *ret; + gint res = 0; + gboolean validated = FALSE; + struct rspamd_map_helper_value *val; + + if (map == NULL || map->regexps == NULL || len == 0) { + return NULL; + } + + g_assert(in != NULL); + + if (map->map_flags & RSPAMD_REGEXP_MAP_FLAG_UTF) { + if (rspamd_fast_utf8_validate(in, len) == 0) { + validated = TRUE; + } + } + else { + validated = TRUE; + } + + ret = g_ptr_array_new(); + +#ifdef WITH_HYPERSCAN + if (map->hs_db && map->hs_scratch) { + + if (validated) { + struct rspamd_multiple_cbdata cbd; + + cbd.ar = ret; + cbd.map = map; + + if (hs_scan(rspamd_hyperscan_get_database(map->hs_db), in, len, + 0, map->hs_scratch, + rspamd_match_hs_multiple_handler, &cbd) == HS_SUCCESS) { + res = 1; + } + } + } +#endif + + if (!res) { + /* PCRE version */ + for (i = 0; i < map->regexps->len; i++) { + re = g_ptr_array_index(map->regexps, i); + + if (rspamd_regexp_search(re, in, len, NULL, NULL, + !validated, NULL)) { + val = g_ptr_array_index(map->values, i); + val->hits++; + g_ptr_array_add(ret, val->value); + } + } + } + + if (ret->len > 0) { + return ret; + } + + g_ptr_array_free(ret, TRUE); + + return NULL; +} + +gconstpointer +rspamd_match_hash_map(struct rspamd_hash_map_helper *map, const gchar *in, + gsize len) +{ + khiter_t k; + struct rspamd_map_helper_value *val; + rspamd_ftok_t tok; + + if (map == NULL || map->htb == NULL) { + return NULL; + } + + tok.begin = in; + tok.len = len; + + k = kh_get(rspamd_map_hash, map->htb, tok); + + if (k != kh_end(map->htb)) { + val = kh_value(map->htb, k); + val->hits++; + + return val->value; + } + + return NULL; +} + +gconstpointer +rspamd_match_radix_map(struct rspamd_radix_map_helper *map, + const guchar *in, gsize inlen) +{ + struct rspamd_map_helper_value *val; + + if (map == NULL || map->trie == NULL) { + return NULL; + } + + val = (struct rspamd_map_helper_value *) radix_find_compressed(map->trie, + in, inlen); + + if (val != (gconstpointer) RADIX_NO_VALUE) { + val->hits++; + + return val->value; + } + + return NULL; +} + +gconstpointer +rspamd_match_radix_map_addr(struct rspamd_radix_map_helper *map, + const rspamd_inet_addr_t *addr) +{ + struct rspamd_map_helper_value *val; + + if (map == NULL || map->trie == NULL) { + return NULL; + } + + val = (struct rspamd_map_helper_value *) radix_find_compressed_addr(map->trie, addr); + + if (val != (gconstpointer) RADIX_NO_VALUE) { + val->hits++; + + return val->value; + } + + return NULL; +} + + +/* + * CBD stuff + */ + +struct rspamd_cdb_map_helper * +rspamd_map_helper_new_cdb(struct rspamd_map *map) +{ + struct rspamd_cdb_map_helper *n; + + n = g_malloc0(sizeof(*n)); + n->cdbs = (GQueue) G_QUEUE_INIT; + n->map = map; + + rspamd_cryptobox_fast_hash_init(&n->hst, map_hash_seed); + + return n; +} + +void rspamd_map_helper_destroy_cdb(struct rspamd_cdb_map_helper *c) +{ + if (c == NULL) { + return; + } + + GList *cur = c->cdbs.head; + + while (cur) { + struct cdb *cdb = (struct cdb *) cur->data; + + cdb_free(cdb); + g_free(cdb->filename); + close(cdb->cdb_fd); + g_free(cdb); + + cur = g_list_next(cur); + } + + g_queue_clear(&c->cdbs); + + g_free(c); +} + +gchar * +rspamd_cdb_list_read(gchar *chunk, + gint len, + struct map_cb_data *data, + gboolean final) +{ + struct rspamd_cdb_map_helper *cdb_data; + struct cdb *found = NULL; + struct rspamd_map *map = data->map; + + g_assert(map->no_file_read); + + if (data->cur_data == NULL) { + cdb_data = rspamd_map_helper_new_cdb(data->map); + data->cur_data = cdb_data; + } + else { + cdb_data = (struct rspamd_cdb_map_helper *) data->cur_data; + } + + GList *cur = cdb_data->cdbs.head; + + while (cur) { + struct cdb *elt = (struct cdb *) cur->data; + + if (strcmp(elt->filename, chunk) == 0) { + found = elt; + break; + } + + cur = g_list_next(cur); + } + + if (found == NULL) { + /* New cdb */ + gint fd; + struct cdb *cdb; + + fd = rspamd_file_xopen(chunk, O_RDONLY, 0, TRUE); + + if (fd == -1) { + msg_err_map("cannot open cdb map from %s: %s", chunk, strerror(errno)); + + return NULL; + } + + cdb = g_malloc0(sizeof(struct cdb)); + + if (cdb_init(cdb, fd) == -1) { + g_free(cdb); + msg_err_map("cannot init cdb map from %s: %s", chunk, strerror(errno)); + + return NULL; + } + + cdb->filename = g_strdup(chunk); + g_queue_push_tail(&cdb_data->cdbs, cdb); + cdb_data->total_size += cdb->cdb_fsize; + rspamd_cryptobox_fast_hash_update(&cdb_data->hst, chunk, len); + } + + return chunk + len; +} + +void rspamd_cdb_list_fin(struct map_cb_data *data, void **target) +{ + struct rspamd_map *map = data->map; + struct rspamd_cdb_map_helper *cdb_data; + + if (data->errored) { + /* Clean up the current data and do not touch prev data */ + if (data->cur_data) { + msg_info_map("cleanup unfinished new data as error occurred for %s", + map->name); + cdb_data = (struct rspamd_cdb_map_helper *) data->cur_data; + rspamd_map_helper_destroy_cdb(cdb_data); + data->cur_data = NULL; + } + } + else { + if (data->cur_data) { + cdb_data = (struct rspamd_cdb_map_helper *) data->cur_data; + msg_info_map("read cdb of %Hz size", cdb_data->total_size); + data->map->traverse_function = NULL; + data->map->nelts = 0; + data->map->digest = rspamd_cryptobox_fast_hash_final(&cdb_data->hst); + } + + if (target) { + *target = data->cur_data; + } + + if (data->prev_data) { + cdb_data = (struct rspamd_cdb_map_helper *) data->prev_data; + rspamd_map_helper_destroy_cdb(cdb_data); + } + } +} +void rspamd_cdb_list_dtor(struct map_cb_data *data) +{ + if (data->cur_data) { + rspamd_map_helper_destroy_cdb(data->cur_data); + } +} + +gconstpointer +rspamd_match_cdb_map(struct rspamd_cdb_map_helper *map, + const gchar *in, gsize inlen) +{ + if (map == NULL || map->cdbs.head == NULL) { + return NULL; + } + + GList *cur = map->cdbs.head; + static rspamd_ftok_t found; + + while (cur) { + struct cdb *cdb = (struct cdb *) cur->data; + + if (cdb_find(cdb, in, inlen) > 0) { + /* Extract and push value to lua as string */ + unsigned vlen; + gconstpointer vpos; + + vpos = cdb->cdb_mem + cdb_datapos(cdb); + vlen = cdb_datalen(cdb); + found.len = vlen; + found.begin = vpos; + + return &found; /* Do not reuse! */ + } + + cur = g_list_next(cur); + } + + return NULL; +} diff --git a/src/libserver/maps/map_helpers.h b/src/libserver/maps/map_helpers.h new file mode 100644 index 0000000..82c62b6 --- /dev/null +++ b/src/libserver/maps/map_helpers.h @@ -0,0 +1,269 @@ +/*- + * Copyright 2018 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef RSPAMD_MAP_HELPERS_H +#define RSPAMD_MAP_HELPERS_H + +#include "config.h" +#include "map.h" +#include "addr.h" + +/** + * @file map_helpers.h + * + * Defines helper structures to deal with different map types + */ + + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Common structures, abstract for simplicity + */ +struct rspamd_radix_map_helper; +struct rspamd_hash_map_helper; +struct rspamd_regexp_map_helper; +struct rspamd_cdb_map_helper; +struct rspamd_map_helper_value; + +enum rspamd_regexp_map_flags { + RSPAMD_REGEXP_MAP_FLAG_UTF = (1u << 0), + RSPAMD_REGEXP_MAP_FLAG_MULTIPLE = (1u << 1), + RSPAMD_REGEXP_MAP_FLAG_GLOB = (1u << 2), +}; + +typedef void (*rspamd_map_insert_func)(gpointer st, gconstpointer key, + gconstpointer value); + +/** + * Radix list is a list like ip/mask + */ +gchar *rspamd_radix_read( + gchar *chunk, + gint len, + struct map_cb_data *data, + gboolean final); + +void rspamd_radix_fin(struct map_cb_data *data, void **target); + +void rspamd_radix_dtor(struct map_cb_data *data); + +/** + * Kv list is an ordinal list of keys and values separated by whitespace + */ +gchar *rspamd_kv_list_read( + gchar *chunk, + gint len, + struct map_cb_data *data, + gboolean final); + +void rspamd_kv_list_fin(struct map_cb_data *data, void **target); + +void rspamd_kv_list_dtor(struct map_cb_data *data); + +/** + * Cdb is a cdb mapped file with shared data + * chunk must be filename! + */ +gchar *rspamd_cdb_list_read( + gchar *chunk, + gint len, + struct map_cb_data *data, + gboolean final); +void rspamd_cdb_list_fin(struct map_cb_data *data, void **target); +void rspamd_cdb_list_dtor(struct map_cb_data *data); + +/** + * Regexp list is a list of regular expressions + */ + +gchar *rspamd_regexp_list_read_single( + gchar *chunk, + gint len, + struct map_cb_data *data, + gboolean final); + +gchar *rspamd_regexp_list_read_multiple( + gchar *chunk, + gint len, + struct map_cb_data *data, + gboolean final); + +gchar *rspamd_glob_list_read_single( + gchar *chunk, + gint len, + struct map_cb_data *data, + gboolean final); + +gchar *rspamd_glob_list_read_multiple( + gchar *chunk, + gint len, + struct map_cb_data *data, + gboolean final); + +void rspamd_regexp_list_fin(struct map_cb_data *data, void **target); + +void rspamd_regexp_list_dtor(struct map_cb_data *data); + +/** + * FSM for lists parsing (support comments, blank lines and partial replies) + */ +gchar * +rspamd_parse_kv_list( + gchar *chunk, + gint len, + struct map_cb_data *data, + rspamd_map_insert_func func, + const gchar *default_value, + gboolean final); + +/** + * Find a single (any) matching regexp for the specified text or NULL if + * no matches found + * @param map + * @param in + * @param len + * @return + */ +gconstpointer rspamd_match_regexp_map_single(struct rspamd_regexp_map_helper *map, + const gchar *in, gsize len); + +/** + * Find a multiple (all) matching regexp for the specified text or NULL if + * no matches found. Returns GPtrArray that *must* be freed by a caller if not NULL + * @param map + * @param in + * @param len + * @return + */ +GPtrArray *rspamd_match_regexp_map_all(struct rspamd_regexp_map_helper *map, + const gchar *in, gsize len); + +/** + * Find value matching specific key in a hash map + * @param map + * @param in + * @param len + * @return + */ +gconstpointer rspamd_match_hash_map(struct rspamd_hash_map_helper *map, + const gchar *in, gsize len); + +/** + * Find value matching specific key in a cdb map + * @param map + * @param in + * @param len + * @return rspamd_ftok_t pointer (allocated in a static buffer!) + */ +gconstpointer rspamd_match_cdb_map(struct rspamd_cdb_map_helper *map, + const gchar *in, gsize len); + +/** + * Find value matching specific key in a hash map + * @param map + * @param in raw ip address + * @param inlen ip address length (4 for IPv4 and 16 for IPv6) + * @return + */ +gconstpointer rspamd_match_radix_map(struct rspamd_radix_map_helper *map, + const guchar *in, gsize inlen); + +gconstpointer rspamd_match_radix_map_addr(struct rspamd_radix_map_helper *map, + const rspamd_inet_addr_t *addr); + +/** + * Creates radix map helper + * @param map + * @return + */ +struct rspamd_radix_map_helper *rspamd_map_helper_new_radix(struct rspamd_map *map); + +/** + * Inserts new value into radix map + * @param st + * @param key + * @param value + */ +void rspamd_map_helper_insert_radix(gpointer st, gconstpointer key, gconstpointer value); + +/** + * Inserts new value into radix map performing synchronous resolving + * @param st + * @param key + * @param value + */ +void rspamd_map_helper_insert_radix_resolve(gpointer st, gconstpointer key, + gconstpointer value); + +/** + * Destroys radix map helper + * @param r + */ +void rspamd_map_helper_destroy_radix(struct rspamd_radix_map_helper *r); + + +/** + * Creates hash map helper + * @param map + * @return + */ +struct rspamd_hash_map_helper *rspamd_map_helper_new_hash(struct rspamd_map *map); + +/** + * Inserts a new value into a hash map + * @param st + * @param key + * @param value + */ +void rspamd_map_helper_insert_hash(gpointer st, gconstpointer key, gconstpointer value); + +/** + * Destroys hash map helper + * @param r + */ +void rspamd_map_helper_destroy_hash(struct rspamd_hash_map_helper *r); + +/** + * Create new regexp map + * @param map + * @param flags + * @return + */ +struct rspamd_regexp_map_helper *rspamd_map_helper_new_regexp(struct rspamd_map *map, + enum rspamd_regexp_map_flags flags); + +/** + * Inserts a new regexp into regexp map + * @param st + * @param key + * @param value + */ +void rspamd_map_helper_insert_re(gpointer st, gconstpointer key, gconstpointer value); + +/** + * Destroy regexp map + * @param re_map + */ +void rspamd_map_helper_destroy_regexp(struct rspamd_regexp_map_helper *re_map); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/libserver/maps/map_private.h b/src/libserver/maps/map_private.h new file mode 100644 index 0000000..60751c0 --- /dev/null +++ b/src/libserver/maps/map_private.h @@ -0,0 +1,226 @@ +/*- + * Copyright 2016 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef SRC_LIBUTIL_MAP_PRIVATE_H_ +#define SRC_LIBUTIL_MAP_PRIVATE_H_ + +#include "config.h" +#include "mem_pool.h" +#include "keypair.h" +#include "unix-std.h" +#include "map.h" +#include "ref.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef void (*rspamd_map_tmp_dtor)(gpointer p); + +extern guint rspamd_map_log_id; +#define msg_err_map(...) rspamd_default_log_function(G_LOG_LEVEL_CRITICAL, \ + "map", map->tag, \ + RSPAMD_LOG_FUNC, \ + __VA_ARGS__) +#define msg_warn_map(...) rspamd_default_log_function(G_LOG_LEVEL_WARNING, \ + "map", map->tag, \ + RSPAMD_LOG_FUNC, \ + __VA_ARGS__) +#define msg_info_map(...) rspamd_default_log_function(G_LOG_LEVEL_INFO, \ + "map", map->tag, \ + RSPAMD_LOG_FUNC, \ + __VA_ARGS__) +#define msg_debug_map(...) rspamd_conditional_debug_fast(NULL, NULL, \ + rspamd_map_log_id, "map", map->tag, \ + RSPAMD_LOG_FUNC, \ + __VA_ARGS__) + +enum fetch_proto { + MAP_PROTO_FILE, + MAP_PROTO_HTTP, + MAP_PROTO_HTTPS, + MAP_PROTO_STATIC +}; + +/** + * Data specific to file maps + */ +struct file_map_data { + gchar *filename; + gboolean need_modify; + ev_stat st_ev; +}; + + +struct http_map_data; + +struct rspamd_http_map_cached_cbdata { + ev_timer timeout; + struct ev_loop *event_loop; + struct rspamd_storage_shmem *shm; + struct rspamd_map *map; + struct http_map_data *data; + guint64 gen; + time_t last_checked; +}; + +struct rspamd_map_cachepoint { + gint available; + gsize len; + time_t last_modified; + gchar shmem_name[256]; +}; + +/** + * Data specific to HTTP maps + */ +struct http_map_data { + /* Shared cache data */ + struct rspamd_map_cachepoint *cache; + /* Non-shared for cache owner, used to cleanup cache */ + struct rspamd_http_map_cached_cbdata *cur_cache_cbd; + gchar *userinfo; + gchar *path; + gchar *host; + gchar *rest; + rspamd_fstring_t *etag; + time_t last_modified; + time_t last_checked; + gboolean request_sent; + guint64 gen; + guint16 port; +}; + +struct static_map_data { + guchar *data; + gsize len; + gboolean processed; +}; + +union rspamd_map_backend_data { + struct file_map_data *fd; + struct http_map_data *hd; + struct static_map_data *sd; +}; + + +struct rspamd_map; +struct rspamd_map_backend { + enum fetch_proto protocol; + gboolean is_signed; + gboolean is_compressed; + gboolean is_fallback; + struct rspamd_map *map; + struct ev_loop *event_loop; + guint32 id; + struct rspamd_cryptobox_pubkey *trusted_pubkey; + union rspamd_map_backend_data data; + gchar *uri; + ref_entry_t ref; +}; + +struct map_periodic_cbdata; + +struct rspamd_map { + struct rspamd_dns_resolver *r; + struct rspamd_config *cfg; + GPtrArray *backends; + struct rspamd_map_backend *fallback_backend; + map_cb_t read_callback; + map_fin_cb_t fin_callback; + map_dtor_t dtor; + void **user_data; + struct ev_loop *event_loop; + struct rspamd_worker *wrk; + gchar *description; + gchar *name; + guint32 id; + struct map_periodic_cbdata *scheduled_check; + rspamd_map_tmp_dtor tmp_dtor; + gpointer tmp_dtor_data; + rspamd_map_traverse_function traverse_function; + rspamd_map_on_load_function on_load_function; + gpointer on_load_ud; + GDestroyNotify on_load_ud_dtor; + gpointer lua_map; + gsize nelts; + guint64 digest; + /* Should we check HTTP or just load cached data */ + ev_tstamp timeout; + gdouble poll_timeout; + time_t next_check; + bool active_http; + bool non_trivial; /* E.g. has http backends in active mode */ + bool file_only; /* No HTTP backends found */ + bool static_only; /* No need to check */ + bool no_file_read; /* Do not read files */ + /* Shared lock for temporary disabling of map reading (e.g. when this map is written by UI) */ + gint *locked; + gchar tag[MEMPOOL_UID_LEN]; +}; + +enum rspamd_map_http_stage { + http_map_resolve_host2 = 0, /* 2 requests sent */ + http_map_resolve_host1, /* 1 requests sent */ + http_map_http_conn, /* http connection */ + http_map_terminated /* terminated when doing resolving */ +}; + +struct map_periodic_cbdata { + struct rspamd_map *map; + struct map_cb_data cbdata; + ev_timer ev; + gboolean need_modify; + gboolean errored; + gboolean locked; + guint cur_backend; + ref_entry_t ref; +}; + +static const gchar rspamd_http_file_magic[] = + {'r', 'm', 'c', 'd', '2', '0', '0', '0'}; + +struct rspamd_http_file_data { + guchar magic[sizeof(rspamd_http_file_magic)]; + goffset data_off; + gulong mtime; + gulong next_check; + gulong etag_len; +}; + +struct http_callback_data { + struct ev_loop *event_loop; + struct rspamd_http_connection *conn; + GPtrArray *addrs; + rspamd_inet_addr_t *addr; + struct rspamd_map *map; + struct rspamd_map_backend *bk; + struct http_map_data *data; + struct map_periodic_cbdata *periodic; + struct rspamd_cryptobox_pubkey *pk; + struct rspamd_storage_shmem *shmem_data; + gsize data_len; + gboolean check; + enum rspamd_map_http_stage stage; + ev_tstamp timeout; + + ref_entry_t ref; +}; + +#ifdef __cplusplus +} +#endif + +#endif /* SRC_LIBUTIL_MAP_PRIVATE_H_ */ |