diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-10 21:30:40 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-10 21:30:40 +0000 |
commit | 133a45c109da5310add55824db21af5239951f93 (patch) | |
tree | ba6ac4c0a950a0dda56451944315d66409923918 /src/libserver/http/http_context.c | |
parent | Initial commit. (diff) | |
download | rspamd-133a45c109da5310add55824db21af5239951f93.tar.xz rspamd-133a45c109da5310add55824db21af5239951f93.zip |
Adding upstream version 3.8.1.upstream/3.8.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/libserver/http/http_context.c')
-rw-r--r-- | src/libserver/http/http_context.c | 670 |
1 files changed, 670 insertions, 0 deletions
diff --git a/src/libserver/http/http_context.c b/src/libserver/http/http_context.c new file mode 100644 index 0000000..f08e33b --- /dev/null +++ b/src/libserver/http/http_context.c @@ -0,0 +1,670 @@ +/* + * Copyright 2023 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "http_context.h" +#include "http_private.h" +#include "keypair.h" +#include "keypairs_cache.h" +#include "cfg_file.h" +#include "contrib/libottery/ottery.h" +#include "contrib/http-parser/http_parser.h" +#include "ssl_util.h" +#include "rspamd.h" +#include "libev_helper.h" + +INIT_LOG_MODULE(http_context) + +#define msg_debug_http_context(...) rspamd_conditional_debug_fast(NULL, NULL, \ + rspamd_http_context_log_id, "http_context", NULL, \ + G_STRFUNC, \ + __VA_ARGS__) + +static struct rspamd_http_context *default_ctx = NULL; + +struct rspamd_http_keepalive_cbdata { + struct rspamd_http_connection *conn; + struct rspamd_http_context *ctx; + GQueue *queue; + GList *link; + struct rspamd_io_ev ev; +}; + +static void +rspamd_http_keepalive_queue_cleanup(GQueue *conns) +{ + GList *cur; + + cur = conns->head; + + while (cur) { + struct rspamd_http_keepalive_cbdata *cbd; + + cbd = (struct rspamd_http_keepalive_cbdata *) cur->data; + /* unref call closes fd, so we need to remove ev watcher first! */ + rspamd_ev_watcher_stop(cbd->ctx->event_loop, &cbd->ev); + rspamd_http_connection_unref(cbd->conn); + g_free(cbd); + + cur = cur->next; + } + + g_queue_clear(conns); +} + +static void +rspamd_http_context_client_rotate_ev(struct ev_loop *loop, ev_timer *w, int revents) +{ + struct rspamd_http_context *ctx = (struct rspamd_http_context *) w->data; + gpointer kp; + + w->repeat = rspamd_time_jitter(ctx->config.client_key_rotate_time, 0); + msg_debug_http_context("rotate local keypair, next rotate in %.0f seconds", + w->repeat); + + ev_timer_again(loop, w); + + kp = ctx->client_kp; + ctx->client_kp = rspamd_keypair_new(RSPAMD_KEYPAIR_KEX, + RSPAMD_CRYPTOBOX_MODE_25519); + rspamd_keypair_unref(kp); +} + +static struct rspamd_http_context * +rspamd_http_context_new_default(struct rspamd_config *cfg, + struct ev_loop *ev_base, + struct upstream_ctx *ups_ctx) +{ + struct rspamd_http_context *ctx; + + static const int default_kp_size = 1024; + static const gdouble default_rotate_time = 120; + static const gdouble default_keepalive_interval = 65; + static const gchar *default_user_agent = "rspamd-" RSPAMD_VERSION_FULL; + static const gchar *default_server_hdr = "rspamd/" RSPAMD_VERSION_FULL; + + ctx = g_malloc0(sizeof(*ctx)); + ctx->config.kp_cache_size_client = default_kp_size; + ctx->config.kp_cache_size_server = default_kp_size; + ctx->config.client_key_rotate_time = default_rotate_time; + ctx->config.user_agent = default_user_agent; + ctx->config.keepalive_interval = default_keepalive_interval; + ctx->config.server_hdr = default_server_hdr; + ctx->ups_ctx = ups_ctx; + + if (cfg) { + ctx->ssl_ctx = cfg->libs_ctx->ssl_ctx; + ctx->ssl_ctx_noverify = cfg->libs_ctx->ssl_ctx_noverify; + } + else { + ctx->ssl_ctx = rspamd_init_ssl_ctx(); + ctx->ssl_ctx_noverify = rspamd_init_ssl_ctx_noverify(); + } + + ctx->event_loop = ev_base; + + ctx->keep_alive_hash = kh_init(rspamd_keep_alive_hash); + + return ctx; +} + +static void +rspamd_http_context_parse_proxy(struct rspamd_http_context *ctx, + const gchar *name, + struct upstream_list **pls) +{ + struct http_parser_url u; + struct upstream_list *uls; + + if (!ctx->ups_ctx) { + msg_err("cannot parse http_proxy %s - upstreams context is undefined", name); + return; + } + + memset(&u, 0, sizeof(u)); + + if (http_parser_parse_url(name, strlen(name), 1, &u) == 0) { + if (!(u.field_set & (1u << UF_HOST)) || u.port == 0) { + msg_err("cannot parse http(s) proxy %s - invalid host or port", name); + + return; + } + + uls = rspamd_upstreams_create(ctx->ups_ctx); + + if (!rspamd_upstreams_parse_line_len(uls, + name + u.field_data[UF_HOST].off, + u.field_data[UF_HOST].len, u.port, NULL)) { + msg_err("cannot parse http(s) proxy %s - invalid data", name); + + rspamd_upstreams_destroy(uls); + } + else { + *pls = uls; + msg_info("set http(s) proxy to %s", name); + } + } + else { + uls = rspamd_upstreams_create(ctx->ups_ctx); + + if (!rspamd_upstreams_parse_line(uls, + name, 3128, NULL)) { + msg_err("cannot parse http(s) proxy %s - invalid data", name); + + rspamd_upstreams_destroy(uls); + } + else { + *pls = uls; + msg_info("set http(s) proxy to %s", name); + } + } +} + +static void +rspamd_http_context_init(struct rspamd_http_context *ctx) +{ + if (ctx->config.kp_cache_size_client > 0) { + ctx->client_kp_cache = rspamd_keypair_cache_new(ctx->config.kp_cache_size_client); + } + + if (ctx->config.kp_cache_size_server > 0) { + ctx->server_kp_cache = rspamd_keypair_cache_new(ctx->config.kp_cache_size_server); + } + + if (ctx->config.client_key_rotate_time > 0 && ctx->event_loop) { + double jittered = rspamd_time_jitter(ctx->config.client_key_rotate_time, + 0); + + ev_timer_init(&ctx->client_rotate_ev, + rspamd_http_context_client_rotate_ev, jittered, 0); + ev_timer_start(ctx->event_loop, &ctx->client_rotate_ev); + ctx->client_rotate_ev.data = ctx; + } + + if (ctx->config.http_proxy) { + rspamd_http_context_parse_proxy(ctx, ctx->config.http_proxy, + &ctx->http_proxies); + } + + default_ctx = ctx; +} + +struct rspamd_http_context * +rspamd_http_context_create(struct rspamd_config *cfg, + struct ev_loop *ev_base, + struct upstream_ctx *ups_ctx) +{ + struct rspamd_http_context *ctx; + const ucl_object_t *http_obj; + + ctx = rspamd_http_context_new_default(cfg, ev_base, ups_ctx); + http_obj = ucl_object_lookup(cfg->cfg_ucl_obj, "http"); + + if (http_obj) { + const ucl_object_t *server_obj, *client_obj; + + client_obj = ucl_object_lookup(http_obj, "client"); + + if (client_obj) { + const ucl_object_t *kp_size; + + kp_size = ucl_object_lookup(client_obj, "cache_size"); + + if (kp_size) { + ctx->config.kp_cache_size_client = ucl_object_toint(kp_size); + } + + const ucl_object_t *rotate_time; + + rotate_time = ucl_object_lookup(client_obj, "rotate_time"); + + if (rotate_time) { + ctx->config.client_key_rotate_time = ucl_object_todouble(rotate_time); + } + + const ucl_object_t *user_agent; + + user_agent = ucl_object_lookup(client_obj, "user_agent"); + + if (user_agent) { + ctx->config.user_agent = ucl_object_tostring(user_agent); + + if (ctx->config.user_agent && strlen(ctx->config.user_agent) == 0) { + ctx->config.user_agent = NULL; + } + } + + const ucl_object_t *server_hdr; + server_hdr = ucl_object_lookup(client_obj, "server_hdr"); + + if (server_hdr) { + ctx->config.server_hdr = ucl_object_tostring(server_hdr); + + if (ctx->config.server_hdr && strlen(ctx->config.server_hdr) == 0) { + ctx->config.server_hdr = ""; + } + } + + const ucl_object_t *keepalive_interval; + + keepalive_interval = ucl_object_lookup(client_obj, "keepalive_interval"); + + if (keepalive_interval) { + ctx->config.keepalive_interval = ucl_object_todouble(keepalive_interval); + } + + const ucl_object_t *http_proxy; + http_proxy = ucl_object_lookup(client_obj, "http_proxy"); + + if (http_proxy) { + ctx->config.http_proxy = ucl_object_tostring(http_proxy); + } + } + + server_obj = ucl_object_lookup(http_obj, "server"); + + if (server_obj) { + const ucl_object_t *kp_size; + + kp_size = ucl_object_lookup(server_obj, "cache_size"); + + if (kp_size) { + ctx->config.kp_cache_size_server = ucl_object_toint(kp_size); + } + } + } + + rspamd_http_context_init(ctx); + + return ctx; +} + + +void rspamd_http_context_free(struct rspamd_http_context *ctx) +{ + if (ctx == default_ctx) { + default_ctx = NULL; + } + + if (ctx->client_kp_cache) { + rspamd_keypair_cache_destroy(ctx->client_kp_cache); + } + + if (ctx->server_kp_cache) { + rspamd_keypair_cache_destroy(ctx->server_kp_cache); + } + + if (ctx->config.client_key_rotate_time > 0) { + ev_timer_stop(ctx->event_loop, &ctx->client_rotate_ev); + + if (ctx->client_kp) { + rspamd_keypair_unref(ctx->client_kp); + } + } + + struct rspamd_keepalive_hash_key *hk; + + kh_foreach_key(ctx->keep_alive_hash, hk, { + msg_debug_http_context("cleanup keepalive elt %s (%s)", + rspamd_inet_address_to_string_pretty(hk->addr), + hk->host); + + if (hk->host) { + g_free(hk->host); + } + + rspamd_inet_address_free(hk->addr); + rspamd_http_keepalive_queue_cleanup(&hk->conns); + g_free(hk); + }); + + kh_destroy(rspamd_keep_alive_hash, ctx->keep_alive_hash); + + if (ctx->http_proxies) { + rspamd_upstreams_destroy(ctx->http_proxies); + } + + g_free(ctx); +} + +struct rspamd_http_context * +rspamd_http_context_create_config(struct rspamd_http_context_cfg *cfg, + struct ev_loop *ev_base, + struct upstream_ctx *ups_ctx) +{ + struct rspamd_http_context *ctx; + + ctx = rspamd_http_context_new_default(NULL, ev_base, ups_ctx); + memcpy(&ctx->config, cfg, sizeof(*cfg)); + rspamd_http_context_init(ctx); + + return ctx; +} + +struct rspamd_http_context * +rspamd_http_context_default(void) +{ + g_assert(default_ctx != NULL); + + return default_ctx; +} + +gint32 +rspamd_keep_alive_key_hash(struct rspamd_keepalive_hash_key *k) +{ + rspamd_cryptobox_fast_hash_state_t hst; + + rspamd_cryptobox_fast_hash_init(&hst, 0); + + if (k->host) { + rspamd_cryptobox_fast_hash_update(&hst, k->host, strlen(k->host)); + } + + rspamd_cryptobox_fast_hash_update(&hst, &k->port, sizeof(k->port)); + rspamd_cryptobox_fast_hash_update(&hst, &k->is_ssl, sizeof(k->is_ssl)); + + return rspamd_cryptobox_fast_hash_final(&hst); +} + +bool rspamd_keep_alive_key_equal(struct rspamd_keepalive_hash_key *k1, + struct rspamd_keepalive_hash_key *k2) +{ + if (k1->is_ssl != k2->is_ssl) { + return false; + } + + if (k1->host && k2->host) { + if (k1->port == k2->port) { + return strcmp(k1->host, k2->host) == 0; + } + } + else if (!k1->host && !k2->host) { + return (k1->port == k2->port); + } + + /* One has host and another has no host */ + return false; +} + +struct rspamd_http_connection * +rspamd_http_context_check_keepalive(struct rspamd_http_context *ctx, + const rspamd_inet_addr_t *addr, + const gchar *host, + bool is_ssl) +{ + struct rspamd_keepalive_hash_key hk, *phk; + khiter_t k; + + if (ctx == NULL) { + ctx = rspamd_http_context_default(); + } + + hk.addr = (rspamd_inet_addr_t *) addr; + hk.host = (gchar *) host; + hk.port = rspamd_inet_address_get_port(addr); + hk.is_ssl = is_ssl; + + k = kh_get(rspamd_keep_alive_hash, ctx->keep_alive_hash, &hk); + + if (k != kh_end(ctx->keep_alive_hash)) { + phk = kh_key(ctx->keep_alive_hash, k); + GQueue *conns = &phk->conns; + + /* Use stack based approach */ + + if (g_queue_get_length(conns) > 0) { + struct rspamd_http_keepalive_cbdata *cbd; + struct rspamd_http_connection *conn; + gint err; + socklen_t len = sizeof(gint); + + cbd = g_queue_pop_head(conns); + rspamd_ev_watcher_stop(ctx->event_loop, &cbd->ev); + conn = cbd->conn; + g_free(cbd); + + if (getsockopt(conn->fd, SOL_SOCKET, SO_ERROR, (void *) &err, &len) == -1) { + err = errno; + } + + if (err != 0) { + rspamd_http_connection_unref(conn); + + msg_debug_http_context("invalid reused keepalive element %s (%s, ssl=%d); " + "%s error; " + "%d connections queued", + rspamd_inet_address_to_string_pretty(phk->addr), + phk->host, + (int) phk->is_ssl, + g_strerror(err), + conns->length); + + return NULL; + } + + msg_debug_http_context("reused keepalive element %s (%s, ssl=%d), %d connections queued", + rspamd_inet_address_to_string_pretty(phk->addr), + phk->host, + (int) phk->is_ssl, + conns->length); + + /* We transfer refcount here! */ + return conn; + } + else { + msg_debug_http_context("found empty keepalive element %s (%s), cannot reuse", + rspamd_inet_address_to_string_pretty(phk->addr), + phk->host); + } + } + + return NULL; +} + +const rspamd_inet_addr_t * +rspamd_http_context_has_keepalive(struct rspamd_http_context *ctx, + const gchar *host, + unsigned port, + bool is_ssl) +{ + struct rspamd_keepalive_hash_key hk, *phk; + khiter_t k; + + if (ctx == NULL) { + ctx = rspamd_http_context_default(); + } + + hk.host = (gchar *) host; + hk.port = port; + hk.is_ssl = is_ssl; + + k = kh_get(rspamd_keep_alive_hash, ctx->keep_alive_hash, &hk); + + if (k != kh_end(ctx->keep_alive_hash)) { + phk = kh_key(ctx->keep_alive_hash, k); + GQueue *conns = &phk->conns; + + if (g_queue_get_length(conns) > 0) { + return phk->addr; + } + } + + return NULL; +} + +void rspamd_http_context_prepare_keepalive(struct rspamd_http_context *ctx, + struct rspamd_http_connection *conn, + const rspamd_inet_addr_t *addr, + const gchar *host, + bool is_ssl) +{ + struct rspamd_keepalive_hash_key hk, *phk; + khiter_t k; + + hk.addr = (rspamd_inet_addr_t *) addr; + hk.host = (gchar *) host; + hk.is_ssl = is_ssl; + hk.port = rspamd_inet_address_get_port(addr); + + k = kh_get(rspamd_keep_alive_hash, ctx->keep_alive_hash, &hk); + + if (k != kh_end(ctx->keep_alive_hash)) { + /* Reuse existing */ + conn->keepalive_hash_key = kh_key(ctx->keep_alive_hash, k); + msg_debug_http_context("use existing keepalive element %s (%s)", + rspamd_inet_address_to_string_pretty(conn->keepalive_hash_key->addr), + conn->keepalive_hash_key->host); + } + else { + /* Create new one */ + GQueue empty_init = G_QUEUE_INIT; + gint r; + + phk = g_malloc(sizeof(*phk)); + phk->conns = empty_init; + phk->host = g_strdup(host); + phk->is_ssl = is_ssl; + phk->addr = rspamd_inet_address_copy(addr, NULL); + phk->port = hk.port; + + + kh_put(rspamd_keep_alive_hash, ctx->keep_alive_hash, phk, &r); + conn->keepalive_hash_key = phk; + + msg_debug_http_context("create new keepalive element %s (%s)", + rspamd_inet_address_to_string_pretty(conn->keepalive_hash_key->addr), + conn->keepalive_hash_key->host); + } +} + +static void +rspamd_http_keepalive_handler(gint fd, short what, gpointer ud) +{ + struct rspamd_http_keepalive_cbdata *cbdata = + (struct rspamd_http_keepalive_cbdata *) ud; /* + * We can get here if a remote side reported something or it has + * timed out. In both cases we just terminate keepalive connection. + */ + + g_queue_delete_link(cbdata->queue, cbdata->link); + msg_debug_http_context("remove keepalive element %s (%s), %d connections left", + rspamd_inet_address_to_string_pretty(cbdata->conn->keepalive_hash_key->addr), + cbdata->conn->keepalive_hash_key->host, + cbdata->queue->length); + /* unref call closes fd, so we need to remove ev watcher first! */ + rspamd_ev_watcher_stop(cbdata->ctx->event_loop, &cbdata->ev); + rspamd_http_connection_unref(cbdata->conn); + g_free(cbdata); +} + +/* Non-static for unit testing */ +long rspamd_http_parse_keepalive_timeout(const rspamd_ftok_t *tok) +{ + long timeout = -1; + goffset pos = rspamd_substring_search(tok->begin, + tok->len, "timeout", sizeof("timeout") - 1); + + if (pos != -1) { + pos += sizeof("timeout") - 1; + + /* Skip spaces and equal sign */ + while (pos < tok->len) { + if (tok->begin[pos] != '=' && !g_ascii_isspace(tok->begin[pos])) { + break; + } + pos++; + } + + gsize ndigits = rspamd_memspn(tok->begin + pos, "0123456789", tok->len - pos); + glong real_timeout; + + if (ndigits > 0) { + if (rspamd_strtoul(tok->begin + pos, ndigits, &real_timeout)) { + timeout = real_timeout; + msg_debug_http_context("got timeout attr %l", timeout); + } + } + } + + return timeout; +} + +void rspamd_http_context_push_keepalive(struct rspamd_http_context *ctx, + struct rspamd_http_connection *conn, + struct rspamd_http_message *msg, + struct ev_loop *event_loop) +{ + struct rspamd_http_keepalive_cbdata *cbdata; + gdouble timeout = ctx->config.keepalive_interval; + + g_assert(conn->keepalive_hash_key != NULL); + + if (msg) { + const rspamd_ftok_t *tok; + rspamd_ftok_t cmp; + + tok = rspamd_http_message_find_header(msg, "Connection"); + + if (!tok) { + /* Server has not stated that it can do keep alive */ + conn->finished = TRUE; + msg_debug_http_context("no Connection header"); + return; + } + + RSPAMD_FTOK_ASSIGN(&cmp, "keep-alive"); + + if (rspamd_ftok_casecmp(&cmp, tok) != 0) { + conn->finished = TRUE; + msg_debug_http_context("connection header is not `keep-alive`"); + return; + } + + /* We can proceed, check timeout */ + + tok = rspamd_http_message_find_header(msg, "Keep-Alive"); + + if (tok) { + long maybe_timeout = rspamd_http_parse_keepalive_timeout(tok); + + if (maybe_timeout > 0) { + timeout = maybe_timeout; + } + } + } + + /* Move connection to the keepalive pool */ + cbdata = g_malloc0(sizeof(*cbdata)); + + cbdata->conn = rspamd_http_connection_ref(conn); + /* Use stack like approach to that would easy reading */ + g_queue_push_head(&conn->keepalive_hash_key->conns, cbdata); + cbdata->link = conn->keepalive_hash_key->conns.head; + + cbdata->queue = &conn->keepalive_hash_key->conns; + cbdata->ctx = ctx; + conn->finished = FALSE; + + rspamd_ev_watcher_init(&cbdata->ev, conn->fd, EV_READ, + rspamd_http_keepalive_handler, + cbdata); + rspamd_ev_watcher_start(event_loop, &cbdata->ev, timeout); + + msg_debug_http_context("push keepalive element %s (%s), %d connections queued, %.1f timeout", + rspamd_inet_address_to_string_pretty(cbdata->conn->keepalive_hash_key->addr), + cbdata->conn->keepalive_hash_key->host, + cbdata->queue->length, + timeout); +}
\ No newline at end of file |