summaryrefslogtreecommitdiffstats
path: root/fluent-bit/src/flb_plugin_proxy.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 02:57:58 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 02:57:58 +0000
commitbe1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 (patch)
tree9754ff1ca740f6346cf8483ec915d4054bc5da2d /fluent-bit/src/flb_plugin_proxy.c
parentInitial commit. (diff)
downloadnetdata-be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97.tar.xz
netdata-be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97.zip
Adding upstream version 1.44.3.upstream/1.44.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/src/flb_plugin_proxy.c')
-rw-r--r--fluent-bit/src/flb_plugin_proxy.c498
1 files changed, 498 insertions, 0 deletions
diff --git a/fluent-bit/src/flb_plugin_proxy.c b/fluent-bit/src/flb_plugin_proxy.c
new file mode 100644
index 00000000..440c5452
--- /dev/null
+++ b/fluent-bit/src/flb_plugin_proxy.c
@@ -0,0 +1,498 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+
+#include <monkey/mk_core.h>
+#include <fluent-bit/flb_compat.h>
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_output.h>
+#include <fluent-bit/flb_api.h>
+#include <fluent-bit/flb_error.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_plugin_proxy.h>
+#include <fluent-bit/flb_input_log.h>
+
+/* Proxies */
+#include "proxy/go/go.h"
+
+#define PROXY_CALLBACK_TIME 1 /* 1 seconds */
+
+static void proxy_cb_flush(struct flb_event_chunk *event_chunk,
+ struct flb_output_flush *out_flush,
+ struct flb_input_instance *i_ins,
+ void *out_context,
+ struct flb_config *config)
+{
+ int ret = FLB_ERROR;
+ struct flb_plugin_proxy_context *ctx = out_context;
+ (void) i_ins;
+ (void) config;
+
+
+#ifdef FLB_HAVE_PROXY_GO
+ if (ctx->proxy->def->proxy == FLB_PROXY_GOLANG) {
+ flb_trace("[GO] entering go_flush()");
+ ret = proxy_go_output_flush(ctx,
+ event_chunk->data,
+ event_chunk->size,
+ event_chunk->tag,
+ flb_sds_len(event_chunk->tag));
+ }
+#else
+ (void) ctx;
+#endif
+
+ if (ret != FLB_OK && ret != FLB_RETRY && ret != FLB_ERROR) {
+ FLB_OUTPUT_RETURN(FLB_ERROR);
+ }
+
+ FLB_OUTPUT_RETURN(ret);
+}
+
+static int flb_proxy_input_cb_collect(struct flb_input_instance *ins,
+ struct flb_config *config, void *in_context)
+{
+ int ret = FLB_OK;
+ size_t len = 0;
+ void *data = NULL;
+ struct flb_plugin_input_proxy_context *ctx = (struct flb_plugin_input_proxy_context *) in_context;
+
+#ifdef FLB_HAVE_PROXY_GO
+ if (ctx->proxy->def->proxy == FLB_PROXY_GOLANG) {
+ flb_trace("[GO] entering go_collect()");
+ ret = proxy_go_input_collect(ctx->proxy, &data, &len);
+
+ if (len == 0) {
+ flb_trace("[GO] No logs are ingested");
+ return -1;
+ }
+
+ if (ret == -1) {
+ flb_errno();
+ return -1;
+ }
+
+ flb_input_log_append(ins, NULL, 0, data, len);
+
+ ret = proxy_go_input_cleanup(ctx->proxy, data);
+ if (ret == -1) {
+ flb_errno();
+ return -1;
+ }
+ }
+#endif
+
+ return 0;
+}
+
+static int flb_proxy_input_cb_init(struct flb_input_instance *ins,
+ struct flb_config *config, void *data)
+{
+ int ret = -1;
+ struct flb_plugin_input_proxy_context *ctx;
+ struct flb_plugin_proxy_context *pc;
+
+ /* Allocate space for the configuration context */
+ ctx = flb_malloc(sizeof(struct flb_plugin_input_proxy_context));
+ if (!ctx) {
+ flb_errno();
+ return -1;
+ }
+
+ /* Before to initialize for proxy, set the proxy instance reference */
+ pc = (struct flb_plugin_proxy_context *)(ins->context);
+ ctx->proxy = pc->proxy;
+
+ /* Before to initialize, set the instance reference */
+ pc->proxy->instance = ins;
+
+ /* Based on 'proxy', use the proper handler */
+ if (pc->proxy->def->proxy == FLB_PROXY_GOLANG) {
+#ifdef FLB_HAVE_PROXY_GO
+ ret = proxy_go_input_init(pc->proxy);
+
+ if (ret == -1) {
+ flb_error("Could not initialize proxy for threaded input plugin");
+ goto init_error;
+ }
+#else
+ flb_error("Could not find initializing function on proxy for threaded input plugin");
+ goto init_error;
+#endif
+ }
+ else {
+ flb_error("[proxy] unrecognized input proxy handler %i",
+ pc->proxy->def->proxy);
+ }
+
+ /* Set the context */
+ flb_input_set_context(ins, ctx);
+
+ /* Collect upon data available on timer */
+ ret = flb_input_set_collector_time(ins,
+ flb_proxy_input_cb_collect,
+ PROXY_CALLBACK_TIME, 0,
+ config);
+
+ if (ret == -1) {
+ flb_error("Could not set collector for threaded proxy input plugin");
+ goto init_error;
+ }
+ ctx->coll_fd = ret;
+
+ return ret;
+
+init_error:
+ flb_free(ctx);
+
+ return -1;
+}
+
+static void flb_proxy_input_cb_pause(void *data, struct flb_config *config)
+{
+ struct flb_plugin_input_proxy_context *ctx = data;
+
+ flb_input_collector_pause(ctx->coll_fd, ctx->proxy->instance);
+}
+
+static void flb_proxy_input_cb_resume(void *data, struct flb_config *config)
+{
+ struct flb_plugin_input_proxy_context *ctx = data;
+
+ flb_input_collector_resume(ctx->coll_fd, ctx->proxy->instance);
+}
+
+static void flb_plugin_proxy_destroy(struct flb_plugin_proxy *proxy);
+
+static int flb_proxy_output_cb_exit(void *out_context, struct flb_config *config)
+{
+ struct flb_plugin_proxy_context *ctx = out_context;
+ struct flb_plugin_proxy *proxy = (ctx->proxy);
+
+ if (!out_context) {
+ return 0;
+ }
+
+ if (proxy->def->proxy == FLB_PROXY_GOLANG) {
+#ifdef FLB_HAVE_PROXY_GO
+ proxy_go_output_destroy(ctx);
+#endif
+ }
+
+ flb_free(ctx);
+ return 0;
+}
+
+static void flb_proxy_output_cb_destroy(struct flb_output_plugin *plugin)
+{
+ struct flb_plugin_proxy *proxy = (struct flb_plugin_proxy *) plugin->proxy;
+ /* cleanup */
+ void (*cb_unregister)(struct flb_plugin_proxy_def *def);
+
+ cb_unregister = flb_plugin_proxy_symbol(proxy, "FLBPluginUnregister");
+ if (cb_unregister != NULL) {
+ cb_unregister(proxy->def);
+ }
+
+ if (proxy->def->proxy == FLB_PROXY_GOLANG) {
+#ifdef FLB_HAVE_PROXY_GO
+ proxy_go_output_unregister(proxy->data);
+#endif
+ }
+
+ flb_plugin_proxy_destroy(proxy);
+}
+
+static int flb_proxy_input_cb_exit(void *in_context, struct flb_config *config)
+{
+ struct flb_plugin_input_proxy_context *ctx = in_context;
+ struct flb_plugin_proxy *proxy = (ctx->proxy);
+
+ if (!in_context) {
+ return 0;
+ }
+
+ if (proxy->def->proxy == FLB_PROXY_GOLANG) {
+#ifdef FLB_HAVE_PROXY_GO
+ proxy_go_input_destroy(ctx);
+#endif
+ }
+
+ flb_free(ctx);
+ return 0;
+}
+
+static void flb_proxy_input_cb_destroy(struct flb_input_plugin *plugin)
+{
+ struct flb_plugin_proxy *proxy = (struct flb_plugin_proxy *) plugin->proxy;
+ /* cleanup */
+ void (*cb_unregister)(struct flb_plugin_proxy_def *def);
+
+ cb_unregister = flb_plugin_proxy_symbol(proxy, "FLBPluginUnregister");
+ if (cb_unregister != NULL) {
+ cb_unregister(proxy->def);
+ }
+
+ if (proxy->def->proxy == FLB_PROXY_GOLANG) {
+#ifdef FLB_HAVE_PROXY_GO
+ proxy_go_input_unregister(proxy->data);
+#endif
+ }
+
+ flb_plugin_proxy_destroy(proxy);
+}
+
+static int flb_proxy_register_output(struct flb_plugin_proxy *proxy,
+ struct flb_plugin_proxy_def *def,
+ struct flb_config *config)
+{
+ struct flb_output_plugin *out;
+
+ out = flb_calloc(1, sizeof(struct flb_output_plugin));
+ if (!out) {
+ flb_errno();
+ return -1;
+ }
+
+ /* Plugin registration */
+ out->type = FLB_OUTPUT_PLUGIN_PROXY;
+ out->proxy = proxy;
+ out->flags = def->flags;
+ out->name = def->name;
+ out->description = def->description;
+ mk_list_add(&out->_head, &config->out_plugins);
+
+ /*
+ * Set proxy callbacks: external plugins which are not following
+ * the core plugins specs, have a different callback approach, so
+ * we put our proxy-middle callbacks to do the translation properly.
+ */
+ out->cb_flush = proxy_cb_flush;
+ out->cb_exit = flb_proxy_output_cb_exit;
+ out->cb_destroy = flb_proxy_output_cb_destroy;
+ return 0;
+}
+
+static int flb_proxy_register_input(struct flb_plugin_proxy *proxy,
+ struct flb_plugin_proxy_def *def,
+ struct flb_config *config)
+{
+ struct flb_input_plugin *in;
+
+ in = flb_calloc(1, sizeof(struct flb_input_plugin));
+ if (!in) {
+ flb_errno();
+ return -1;
+ }
+
+ /* Plugin registration */
+ in->type = FLB_INPUT_PLUGIN_PROXY;
+ in->proxy = proxy;
+ in->flags = def->flags | FLB_INPUT_THREADED;
+ in->name = flb_strdup(def->name);
+ in->description = def->description;
+ mk_list_add(&in->_head, &config->in_plugins);
+
+ /*
+ * Set proxy callbacks: external plugins which are not following
+ * the core plugins specs, have a different callback approach, so
+ * we put our proxy-middle callbacks to do the translation properly.
+ */
+ in->cb_init = flb_proxy_input_cb_init;
+ in->cb_collect = flb_proxy_input_cb_collect;
+ in->cb_flush_buf = NULL;
+ in->cb_exit = flb_proxy_input_cb_exit;
+ in->cb_destroy = flb_proxy_input_cb_destroy;
+ in->cb_pause = flb_proxy_input_cb_pause;
+ in->cb_resume = flb_proxy_input_cb_resume;
+ return 0;
+}
+
+void *flb_plugin_proxy_symbol(struct flb_plugin_proxy *proxy,
+ const char *symbol)
+{
+ void *s;
+
+ dlerror();
+ s = dlsym(proxy->dso_handler, symbol);
+ if (dlerror() != NULL) {
+ return NULL;
+ }
+ return s;
+}
+
+int flb_plugin_proxy_register(struct flb_plugin_proxy *proxy,
+ struct flb_config *config)
+{
+ int ret;
+ int (*cb_register)(struct flb_plugin_proxy_def *);
+ struct flb_plugin_proxy_def *def = proxy->def;
+
+ /* Lookup the registration callback */
+ cb_register = flb_plugin_proxy_symbol(proxy, "FLBPluginRegister");
+ if (!cb_register) {
+ return -1;
+ }
+
+ /*
+ * Create a temporary definition used for registration. This definition
+ * aims to be be populated by plugin in the registration phase with:
+ *
+ * - plugin type (or proxy type, e.g: Golang)
+ * - plugin name
+ * - plugin description
+ */
+
+ /* Do the registration */
+ ret = cb_register(def);
+ if (ret == -1) {
+ flb_free(def);
+ return -1;
+ }
+
+ /*
+ * Each plugin proxy/type, have their own handler, based on the data
+ * provided in the registration invoke the proper handler.
+ */
+ ret = -1;
+ if (def->proxy == FLB_PROXY_GOLANG) {
+#ifdef FLB_HAVE_PROXY_GO
+ if (def->type == FLB_PROXY_OUTPUT_PLUGIN) {
+ ret = proxy_go_output_register(proxy, def);
+ }
+ else if (def->type == FLB_PROXY_INPUT_PLUGIN) {
+ ret = proxy_go_input_register(proxy, def);
+ }
+#endif
+ }
+ if (ret == 0) {
+ /*
+ * We got a plugin that can do it job, now we need to create the
+ * real link to the 'output' interface
+ */
+ if (def->type == FLB_PROXY_OUTPUT_PLUGIN) {
+ flb_proxy_register_output(proxy, def, config);
+ }
+ else if (def->type == FLB_PROXY_INPUT_PLUGIN) {
+ flb_proxy_register_input(proxy, def, config);
+ }
+ }
+
+ return 0;
+}
+
+int flb_plugin_proxy_output_init(struct flb_plugin_proxy *proxy,
+ struct flb_output_instance *o_ins,
+ struct flb_config *config)
+{
+ int ret = -1;
+
+ /* Before to initialize, set the instance reference */
+ proxy->instance = o_ins;
+
+ /* Based on 'proxy', use the proper handler */
+ if (proxy->def->proxy == FLB_PROXY_GOLANG) {
+#ifdef FLB_HAVE_PROXY_GO
+ ret = proxy_go_output_init(proxy);
+#endif
+ }
+ else {
+ flb_error("[proxy] unrecognized proxy handler %i",
+ proxy->def->proxy);
+ }
+
+ return ret;
+}
+
+struct flb_plugin_proxy *flb_plugin_proxy_create(const char *dso_path, int type,
+ struct flb_config *config)
+{
+ void *handle;
+ struct flb_plugin_proxy *proxy;
+
+ /* Load shared library */
+ handle = dlopen(dso_path, RTLD_LAZY);
+ if (!handle) {
+ flb_error("[proxy] error opening plugin %s: '%s'",
+ dso_path, dlerror());
+ return NULL;
+ }
+
+ /* Proxy Context */
+ proxy = flb_malloc(sizeof(struct flb_plugin_proxy));
+ if (!proxy) {
+ flb_errno();
+ dlclose(handle);
+ return NULL;
+ }
+
+ /* API Context */
+ proxy->api = flb_api_create();
+ if (!proxy->api) {
+ dlclose(handle);
+ flb_free(proxy);
+ return NULL;
+ }
+
+ proxy->def = flb_malloc(sizeof(struct flb_plugin_proxy_def));
+ if (!proxy->def) {
+ flb_errno();
+ dlclose(handle);
+ flb_api_destroy(proxy->api);
+ flb_free(proxy);
+ return NULL;
+ }
+
+ /* Set fields and add it to the list */
+ proxy->def->type = type;
+ proxy->dso_handler = handle;
+ proxy->data = NULL;
+ mk_list_add(&proxy->_head, &config->proxies);
+
+ /* Register plugin */
+ flb_plugin_proxy_register(proxy, config);
+
+ return proxy;
+}
+
+static void flb_plugin_proxy_destroy(struct flb_plugin_proxy *proxy)
+{
+ flb_free(proxy->def);
+ flb_api_destroy(proxy->api);
+ dlclose(proxy->dso_handler);
+ mk_list_del(&proxy->_head);
+ flb_free(proxy);
+}
+
+int flb_plugin_proxy_set(struct flb_plugin_proxy_def *def, int type,
+ int proxy, char *name, char *description)
+{
+ def->type = type;
+ def->proxy = proxy;
+ def->name = flb_strdup(name);
+ def->description = flb_strdup(description);
+
+ return 0;
+}