summaryrefslogtreecommitdiffstats
path: root/fluent-bit/src/proxy/go/go.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/src/proxy/go/go.c')
-rw-r--r--fluent-bit/src/proxy/go/go.c289
1 files changed, 289 insertions, 0 deletions
diff --git a/fluent-bit/src/proxy/go/go.c b/fluent-bit/src/proxy/go/go.c
new file mode 100644
index 000000000..9d95a88af
--- /dev/null
+++ b/fluent-bit/src/proxy/go/go.c
@@ -0,0 +1,289 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_config.h>
+#include <fluent-bit/flb_plugin_proxy.h>
+#include <fluent-bit/flb_output.h>
+#include "./go.h"
+
+/*
+ * These functions needs to be moved to a better place, still in
+ * experimental mode.
+ *
+ * ------------------------start------------------------------------------------
+ */
+
+/*
+ * Go Plugin phases
+ * ================
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * 1. FLBPluginRegister(context)
+ * 2. Inside FLBPluginRegister, it needs to register it self using Fluent Bit API
+ * where it basically set:
+ *
+ * - name: shortname of the plugin.
+ * - description: plugin description.
+ * - type: input, output, filter, whatever.
+ * - proxy: type of proxy e.g. GOLANG
+ * - flags: optional flags, not used by Go plugins at the moment.
+ *
+ * this is done through Go Wrapper:
+ *
+ * output.FLBPluginRegister(ctx, name, description, type, flags);
+ *
+ * 3. Plugin Initialization
+ */
+/*------------------------EOF------------------------------------------------*/
+
+int proxy_go_output_register(struct flb_plugin_proxy *proxy,
+ struct flb_plugin_proxy_def *def)
+{
+ struct flbgo_output_plugin *plugin;
+
+ plugin = flb_malloc(sizeof(struct flbgo_output_plugin));
+ if (!plugin) {
+ return -1;
+ }
+
+ /*
+ * Lookup the entry point function:
+ *
+ * - FLBPluginInit
+ * - FLBPluginFlush
+ * - FLBPluginFlushCtx
+ * - FLBPluginExit
+ *
+ * note: registration callback FLBPluginRegister() is resolved by the
+ * parent proxy interface.
+ */
+
+ plugin->cb_init = flb_plugin_proxy_symbol(proxy, "FLBPluginInit");
+ if (!plugin->cb_init) {
+ flb_error("[go proxy]: could not load FLBPluginInit symbol");
+ flb_free(plugin);
+ return -1;
+ }
+
+ plugin->cb_flush = flb_plugin_proxy_symbol(proxy, "FLBPluginFlush");
+ plugin->cb_flush_ctx = flb_plugin_proxy_symbol(proxy, "FLBPluginFlushCtx");
+ plugin->cb_exit = flb_plugin_proxy_symbol(proxy, "FLBPluginExit");
+ plugin->cb_exit_ctx = flb_plugin_proxy_symbol(proxy, "FLBPluginExitCtx");
+ plugin->name = flb_strdup(def->name);
+
+ /* This Go plugin context is an opaque data for the parent proxy */
+ proxy->data = plugin;
+
+ return 0;
+}
+
+int proxy_go_output_init(struct flb_plugin_proxy *proxy)
+{
+ int ret;
+ struct flbgo_output_plugin *plugin = proxy->data;
+
+ /* set the API */
+ plugin->api = proxy->api;
+ plugin->o_ins = proxy->instance;
+ // In order to avoid having the whole instance as part of the ABI we
+ // copy the context pointer into the plugin.
+ plugin->context = ((struct flb_output_instance *)proxy->instance)->context;
+
+ ret = plugin->cb_init(plugin);
+ if (ret <= 0) {
+ flb_error("[go proxy]: plugin '%s' failed to initialize",
+ plugin->name);
+ flb_free(plugin);
+ return -1;
+ }
+
+ return ret;
+}
+
+int proxy_go_output_flush(struct flb_plugin_proxy_context *ctx,
+ const void *data, size_t size,
+ const char *tag, int tag_len)
+{
+ int ret;
+ char *buf;
+ struct flbgo_output_plugin *plugin = ctx->proxy->data;
+
+ /* temporary buffer for the tag */
+ buf = flb_malloc(tag_len + 1);
+ if (!buf) {
+ flb_errno();
+ return -1;
+ }
+
+ memcpy(buf, tag, tag_len);
+ buf[tag_len] = '\0';
+
+ if (plugin->cb_flush_ctx) {
+ ret = plugin->cb_flush_ctx(ctx->remote_context, data, size, buf);
+ }
+ else {
+ ret = plugin->cb_flush(data, size, buf);
+ }
+ flb_free(buf);
+ return ret;
+}
+
+int proxy_go_output_destroy(struct flb_plugin_proxy_context *ctx)
+{
+ int ret = 0;
+ struct flbgo_output_plugin *plugin;
+
+ plugin = (struct flbgo_output_plugin *) ctx->proxy->data;
+ flb_debug("[GO] running exit callback");
+
+ if (plugin->cb_exit_ctx) {
+ ret = plugin->cb_exit_ctx(ctx->remote_context);
+ }
+ else if (plugin->cb_exit) {
+ ret = plugin->cb_exit();
+ }
+ return ret;
+}
+
+void proxy_go_output_unregister(void *data) {
+ struct flbgo_output_plugin *plugin;
+
+ plugin = (struct flbgo_output_plugin *) data;
+ flb_free(plugin->name);
+ flb_free(plugin);
+}
+
+int proxy_go_input_register(struct flb_plugin_proxy *proxy,
+ struct flb_plugin_proxy_def *def)
+{
+ struct flbgo_input_plugin *plugin;
+
+ plugin = flb_malloc(sizeof(struct flbgo_input_plugin));
+ if (!plugin) {
+ return -1;
+ }
+
+ /*
+ * Lookup the entry point function:
+ *
+ * - FLBPluginInit
+ * - FLBPluginInputCallback
+ * - FLBPluginExit
+ *
+ * note: registration callback FLBPluginRegister() is resolved by the
+ * parent proxy interface.
+ */
+
+ plugin->cb_init = flb_plugin_proxy_symbol(proxy, "FLBPluginInit");
+ if (!plugin->cb_init) {
+ flb_error("[go proxy]: could not load FLBPluginInit symbol");
+ flb_free(plugin);
+ return -1;
+ }
+
+ plugin->cb_collect = flb_plugin_proxy_symbol(proxy, "FLBPluginInputCallback");
+ plugin->cb_cleanup = flb_plugin_proxy_symbol(proxy, "FLBPluginInputCleanupCallback");
+ plugin->cb_exit = flb_plugin_proxy_symbol(proxy, "FLBPluginExit");
+ plugin->name = flb_strdup(def->name);
+
+ /* This Go plugin context is an opaque data for the parent proxy */
+ proxy->data = plugin;
+
+ return 0;
+}
+
+int proxy_go_input_init(struct flb_plugin_proxy *proxy)
+{
+ int ret;
+ struct flbgo_input_plugin *plugin = proxy->data;
+
+ /* set the API */
+ plugin->api = proxy->api;
+ plugin->i_ins = proxy->instance;
+ // In order to avoid having the whole instance as part of the ABI we
+ // copy the context pointer into the plugin.
+ plugin->context = ((struct flb_input_instance *)proxy->instance)->context;
+
+ ret = plugin->cb_init(plugin);
+ if (ret <= 0) {
+ flb_error("[go proxy]: plugin '%s' failed to initialize",
+ plugin->name);
+ flb_free(plugin);
+ return -1;
+ }
+
+ return ret;
+}
+
+int proxy_go_input_collect(struct flb_plugin_proxy *ctx,
+ void **collected_data, size_t *len)
+{
+ int ret;
+ void *data = NULL;
+ struct flbgo_input_plugin *plugin = ctx->data;
+
+ ret = plugin->cb_collect(&data, len);
+
+ *collected_data = data;
+
+ return ret;
+}
+
+int proxy_go_input_cleanup(struct flb_plugin_proxy *ctx,
+ void *allocated_data)
+{
+ int ret = 0;
+ struct flbgo_input_plugin *plugin = ctx->data;
+
+ if (plugin->cb_cleanup) {
+ ret = plugin->cb_cleanup(allocated_data);
+ }
+ else {
+ /* If cleanup callback is not registered, we need to cleanup
+ * allocated memory on fluent-bit side. */
+ if (allocated_data != NULL) {
+ free(allocated_data);
+ }
+ }
+
+ return ret;
+}
+
+int proxy_go_input_destroy(struct flb_plugin_input_proxy_context *ctx)
+{
+ int ret = 0;
+ struct flbgo_input_plugin *plugin;
+
+ plugin = (struct flbgo_input_plugin *) ctx->proxy->data;
+ flb_debug("[GO] running exit callback");
+
+ if (plugin->cb_exit) {
+ ret = plugin->cb_exit();
+ }
+ return ret;
+}
+
+void proxy_go_input_unregister(void *data) {
+ struct flbgo_input_plugin *plugin;
+
+ plugin = (struct flbgo_input_plugin *) data;
+ flb_free(plugin->name);
+ flb_free(plugin);
+}