summaryrefslogtreecommitdiffstats
path: root/src/libstat/backends/http_backend.cxx
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstat/backends/http_backend.cxx')
-rw-r--r--src/libstat/backends/http_backend.cxx440
1 files changed, 440 insertions, 0 deletions
diff --git a/src/libstat/backends/http_backend.cxx b/src/libstat/backends/http_backend.cxx
new file mode 100644
index 0000000..075e508
--- /dev/null
+++ b/src/libstat/backends/http_backend.cxx
@@ -0,0 +1,440 @@
+/*-
+ * Copyright 2022 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "config.h"
+#include "stat_internal.h"
+#include "libserver/http/http_connection.h"
+#include "libserver/mempool_vars_internal.h"
+#include "upstream.h"
+#include "contrib/ankerl/unordered_dense.h"
+#include <algorithm>
+#include <vector>
+
+namespace rspamd::stat::http {
+
+#define msg_debug_stat_http(...) rspamd_conditional_debug_fast(NULL, NULL, \
+ rspamd_stat_http_log_id, "stat_http", task->task_pool->tag.uid, \
+ RSPAMD_LOG_FUNC, \
+ __VA_ARGS__)
+
+INIT_LOG_MODULE(stat_http)
+
+/* Represents all http backends defined in some configuration */
+class http_backends_collection {
+ std::vector<struct rspamd_statfile *> backends;
+ double timeout = 1.0; /* Default timeout */
+ struct upstream_list *read_servers = nullptr;
+ struct upstream_list *write_servers = nullptr;
+
+public:
+ static auto get() -> http_backends_collection &
+ {
+ static http_backends_collection *singleton = nullptr;
+
+ if (singleton == nullptr) {
+ singleton = new http_backends_collection;
+ }
+
+ return *singleton;
+ }
+
+ /**
+ * Add a new backend and (optionally initialize the basic backend parameters
+ * @param ctx
+ * @param cfg
+ * @param st
+ * @return
+ */
+ auto add_backend(struct rspamd_stat_ctx *ctx,
+ struct rspamd_config *cfg,
+ struct rspamd_statfile *st) -> bool;
+ /**
+ * Remove a statfile cleaning things up if the last statfile is removed
+ * @param st
+ * @return
+ */
+ auto remove_backend(struct rspamd_statfile *st) -> bool;
+
+ upstream *get_upstream(bool is_learn);
+
+private:
+ http_backends_collection() = default;
+ auto first_init(struct rspamd_stat_ctx *ctx,
+ struct rspamd_config *cfg,
+ struct rspamd_statfile *st) -> bool;
+};
+
+/*
+ * Created one per each task
+ */
+class http_backend_runtime final {
+public:
+ static auto create(struct rspamd_task *task, bool is_learn) -> http_backend_runtime *;
+ /* Add a new statfile with a specific id to the list of statfiles */
+ auto notice_statfile(int id, const struct rspamd_statfile_config *st) -> void
+ {
+ seen_statfiles[id] = st;
+ }
+
+ auto process_tokens(struct rspamd_task *task,
+ GPtrArray *tokens,
+ gint id,
+ bool learn) -> bool;
+
+private:
+ http_backends_collection *all_backends;
+ ankerl::unordered_dense::map<int, const struct rspamd_statfile_config *> seen_statfiles;
+ struct upstream *selected;
+
+private:
+ http_backend_runtime(struct rspamd_task *task, bool is_learn)
+ : all_backends(&http_backends_collection::get())
+ {
+ selected = all_backends->get_upstream(is_learn);
+ }
+ ~http_backend_runtime() = default;
+ static auto dtor(void *p) -> void
+ {
+ ((http_backend_runtime *) p)->~http_backend_runtime();
+ }
+};
+
+/*
+ * Efficient way to make a messagepack payload from stat tokens,
+ * avoiding any intermediate libraries, as we would send many tokens
+ * all together
+ */
+static auto
+stat_tokens_to_msgpack(GPtrArray *tokens) -> std::vector<std::uint8_t>
+{
+ std::vector<std::uint8_t> ret;
+ rspamd_token_t *cur;
+ int i;
+
+ /*
+ * We define array, it's size and N elements each is uint64_t
+ * Layout:
+ * 0xdd - array marker
+ * [4 bytes be] - size of the array
+ * [ 0xcf + <8 bytes BE integer>] * N - array elements
+ */
+ ret.resize(tokens->len * (sizeof(std::uint64_t) + 1) + 5);
+ ret.push_back('\xdd');
+ std::uint32_t ulen = GUINT32_TO_BE(tokens->len);
+ std::copy((const std::uint8_t *) &ulen,
+ ((const std::uint8_t *) &ulen) + sizeof(ulen), std::back_inserter(ret));
+
+ PTR_ARRAY_FOREACH(tokens, i, cur)
+ {
+ ret.push_back('\xcf');
+ std::uint64_t val = GUINT64_TO_BE(cur->data);
+ std::copy((const std::uint8_t *) &val,
+ ((const std::uint8_t *) &val) + sizeof(val), std::back_inserter(ret));
+ }
+
+ return ret;
+}
+
+auto http_backend_runtime::create(struct rspamd_task *task, bool is_learn) -> http_backend_runtime *
+{
+ /* Alloc type provide proper size and alignment */
+ auto *allocated_runtime = rspamd_mempool_alloc_type(task->task_pool, http_backend_runtime);
+
+ rspamd_mempool_add_destructor(task->task_pool, http_backend_runtime::dtor, allocated_runtime);
+
+ return new (allocated_runtime) http_backend_runtime{task, is_learn};
+}
+
+auto http_backend_runtime::process_tokens(struct rspamd_task *task, GPtrArray *tokens, gint id, bool learn) -> bool
+{
+ if (!learn) {
+ if (id == seen_statfiles.size() - 1) {
+ /* Emit http request on the last statfile */
+ }
+ }
+ else {
+ /* On learn we need to learn all statfiles that we were requested to learn */
+ if (seen_statfiles.empty()) {
+ /* Request has been already set, or nothing to learn */
+ return true;
+ }
+ else {
+ seen_statfiles.clear();
+ }
+ }
+
+ return true;
+}
+
+auto http_backends_collection::add_backend(struct rspamd_stat_ctx *ctx,
+ struct rspamd_config *cfg,
+ struct rspamd_statfile *st) -> bool
+{
+ /* On empty list of backends we know that we need to load backend data actually */
+ if (backends.empty()) {
+ if (!first_init(ctx, cfg, st)) {
+ return false;
+ }
+ }
+
+ backends.push_back(st);
+
+ return true;
+}
+
+auto http_backends_collection::first_init(struct rspamd_stat_ctx *ctx,
+ struct rspamd_config *cfg,
+ struct rspamd_statfile *st) -> bool
+{
+ auto try_load_backend_config = [&](const ucl_object_t *obj) -> bool {
+ if (!obj || ucl_object_type(obj) != UCL_OBJECT) {
+ return false;
+ }
+
+ /* First try to load read servers */
+ auto *rs = ucl_object_lookup_any(obj, "read_servers", "servers", nullptr);
+ if (rs) {
+ read_servers = rspamd_upstreams_create(cfg->ups_ctx);
+
+ if (read_servers == nullptr) {
+ return false;
+ }
+
+ if (!rspamd_upstreams_from_ucl(read_servers, rs, 80, this)) {
+ rspamd_upstreams_destroy(read_servers);
+ return false;
+ }
+ }
+ auto *ws = ucl_object_lookup_any(obj, "write_servers", "servers", nullptr);
+ if (ws) {
+ write_servers = rspamd_upstreams_create(cfg->ups_ctx);
+
+ if (write_servers == nullptr) {
+ return false;
+ }
+
+ if (!rspamd_upstreams_from_ucl(write_servers, rs, 80, this)) {
+ rspamd_upstreams_destroy(write_servers);
+ return false;
+ }
+ }
+
+ auto *tim = ucl_object_lookup(obj, "timeout");
+
+ if (tim) {
+ timeout = ucl_object_todouble(tim);
+ }
+
+ return true;
+ };
+
+ auto ret = false;
+ auto obj = ucl_object_lookup(st->classifier->cfg->opts, "backend");
+ if (obj != nullptr) {
+ ret = try_load_backend_config(obj);
+ }
+
+ /* Now try statfiles config */
+ if (!ret && st->stcf->opts) {
+ ret = try_load_backend_config(st->stcf->opts);
+ }
+
+ /* Now try classifier config */
+ if (!ret && st->classifier->cfg->opts) {
+ ret = try_load_backend_config(st->classifier->cfg->opts);
+ }
+
+ return ret;
+}
+
+auto http_backends_collection::remove_backend(struct rspamd_statfile *st) -> bool
+{
+ auto backend_it = std::remove(std::begin(backends), std::end(backends), st);
+
+ if (backend_it != std::end(backends)) {
+ /* Fast erasure with no order preservation */
+ std::swap(*backend_it, backends.back());
+ backends.pop_back();
+
+ if (backends.empty()) {
+ /* De-init collection - likely config reload */
+ if (read_servers) {
+ rspamd_upstreams_destroy(read_servers);
+ read_servers = nullptr;
+ }
+
+ if (write_servers) {
+ rspamd_upstreams_destroy(write_servers);
+ write_servers = nullptr;
+ }
+ }
+
+ return true;
+ }
+
+ return false;
+}
+
+upstream *http_backends_collection::get_upstream(bool is_learn)
+{
+ auto *ups_list = read_servers;
+ if (is_learn) {
+ ups_list = write_servers;
+ }
+
+ return rspamd_upstream_get(ups_list, RSPAMD_UPSTREAM_ROUND_ROBIN, nullptr, 0);
+}
+
+}// namespace rspamd::stat::http
+
+/* C API */
+
+gpointer
+rspamd_http_init(struct rspamd_stat_ctx *ctx,
+ struct rspamd_config *cfg,
+ struct rspamd_statfile *st)
+{
+ auto &collections = rspamd::stat::http::http_backends_collection::get();
+
+ if (!collections.add_backend(ctx, cfg, st)) {
+ msg_err_config("cannot load http backend");
+
+ return nullptr;
+ }
+
+ return (void *) &collections;
+}
+gpointer
+rspamd_http_runtime(struct rspamd_task *task,
+ struct rspamd_statfile_config *stcf,
+ gboolean learn,
+ gpointer ctx,
+ gint id)
+{
+ auto maybe_existing = rspamd_mempool_get_variable(task->task_pool, RSPAMD_MEMPOOL_HTTP_STAT_BACKEND_RUNTIME);
+
+ if (maybe_existing != nullptr) {
+ auto real_runtime = (rspamd::stat::http::http_backend_runtime *) maybe_existing;
+ real_runtime->notice_statfile(id, stcf);
+
+ return maybe_existing;
+ }
+
+ auto runtime = rspamd::stat::http::http_backend_runtime::create(task, learn);
+
+ if (runtime) {
+ runtime->notice_statfile(id, stcf);
+ rspamd_mempool_set_variable(task->task_pool, RSPAMD_MEMPOOL_HTTP_STAT_BACKEND_RUNTIME,
+ (void *) runtime, nullptr);
+ }
+
+ return (void *) runtime;
+}
+
+gboolean
+rspamd_http_process_tokens(struct rspamd_task *task,
+ GPtrArray *tokens,
+ gint id,
+ gpointer runtime)
+{
+ auto real_runtime = (rspamd::stat::http::http_backend_runtime *) runtime;
+
+ if (real_runtime) {
+ return real_runtime->process_tokens(task, tokens, id, false);
+ }
+
+
+ return false;
+}
+gboolean
+rspamd_http_finalize_process(struct rspamd_task *task,
+ gpointer runtime,
+ gpointer ctx)
+{
+ /* Not needed */
+ return true;
+}
+
+gboolean
+rspamd_http_learn_tokens(struct rspamd_task *task,
+ GPtrArray *tokens,
+ gint id,
+ gpointer runtime)
+{
+ auto real_runtime = (rspamd::stat::http::http_backend_runtime *) runtime;
+
+ if (real_runtime) {
+ return real_runtime->process_tokens(task, tokens, id, true);
+ }
+
+
+ return false;
+}
+gboolean
+rspamd_http_finalize_learn(struct rspamd_task *task,
+ gpointer runtime,
+ gpointer ctx,
+ GError **err)
+{
+ return false;
+}
+
+gulong rspamd_http_total_learns(struct rspamd_task *task,
+ gpointer runtime,
+ gpointer ctx)
+{
+ /* TODO */
+ return 0;
+}
+gulong
+rspamd_http_inc_learns(struct rspamd_task *task,
+ gpointer runtime,
+ gpointer ctx)
+{
+ /* TODO */
+ return 0;
+}
+gulong
+rspamd_http_dec_learns(struct rspamd_task *task,
+ gpointer runtime,
+ gpointer ctx)
+{
+ /* TODO */
+ return (gulong) -1;
+}
+gulong
+rspamd_http_learns(struct rspamd_task *task,
+ gpointer runtime,
+ gpointer ctx)
+{
+ /* TODO */
+ return 0;
+}
+ucl_object_t *
+rspamd_http_get_stat(gpointer runtime, gpointer ctx)
+{
+ /* TODO */
+ return nullptr;
+}
+gpointer
+rspamd_http_load_tokenizer_config(gpointer runtime, gsize *len)
+{
+ return nullptr;
+}
+void rspamd_http_close(gpointer ctx)
+{
+ /* TODO */
+} \ No newline at end of file