summaryrefslogtreecommitdiffstats
path: root/fluent-bit/src/flb_output.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/src/flb_output.c')
-rw-r--r--fluent-bit/src/flb_output.c1445
1 files changed, 1445 insertions, 0 deletions
diff --git a/fluent-bit/src/flb_output.c b/fluent-bit/src/flb_output.c
new file mode 100644
index 000000000..b1548f60d
--- /dev/null
+++ b/fluent-bit/src/flb_output.c
@@ -0,0 +1,1445 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_str.h>
+#include <fluent-bit/flb_env.h>
+#include <fluent-bit/flb_coro.h>
+#include <fluent-bit/flb_output.h>
+#include <fluent-bit/flb_kv.h>
+#include <fluent-bit/flb_io.h>
+#include <fluent-bit/flb_uri.h>
+#include <fluent-bit/flb_config.h>
+#include <fluent-bit/flb_macros.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_plugin.h>
+#include <fluent-bit/flb_plugin_proxy.h>
+#include <fluent-bit/flb_http_client_debug.h>
+#include <fluent-bit/flb_output_thread.h>
+#include <fluent-bit/flb_mp.h>
+#include <fluent-bit/flb_pack.h>
+
+FLB_TLS_DEFINE(struct flb_out_flush_params, out_flush_params);
+
+void flb_output_prepare()
+{
+ FLB_TLS_INIT(out_flush_params);
+}
+
+/* Validate the the output address protocol */
+static int check_protocol(const char *prot, const char *output)
+{
+ int len;
+ char *p;
+
+ p = strstr(output, "://");
+ if (p && p != output) {
+ len = p - output;
+ }
+ else {
+ len = strlen(output);
+ }
+
+ if (strlen(prot) != len) {
+ return 0;
+ }
+
+ /* Output plugin match */
+ if (strncasecmp(prot, output, len) == 0) {
+ return 1;
+ }
+
+ return 0;
+}
+
+
+/* Invoke pre-run call for the output plugin */
+void flb_output_pre_run(struct flb_config *config)
+{
+ struct mk_list *head;
+ struct flb_output_instance *ins;
+ struct flb_output_plugin *p;
+
+ mk_list_foreach(head, &config->outputs) {
+ ins = mk_list_entry(head, struct flb_output_instance, _head);
+ p = ins->p;
+ if (p->cb_pre_run) {
+ p->cb_pre_run(ins->context, config);
+ }
+ }
+}
+
+static void flb_output_free_properties(struct flb_output_instance *ins)
+{
+
+ flb_kv_release(&ins->properties);
+ flb_kv_release(&ins->net_properties);
+
+#ifdef FLB_HAVE_TLS
+ if (ins->tls_vhost) {
+ flb_sds_destroy(ins->tls_vhost);
+ }
+ if (ins->tls_ca_path) {
+ flb_sds_destroy(ins->tls_ca_path);
+ }
+ if (ins->tls_ca_file) {
+ flb_sds_destroy(ins->tls_ca_file);
+ }
+ if (ins->tls_crt_file) {
+ flb_sds_destroy(ins->tls_crt_file);
+ }
+ if (ins->tls_key_file) {
+ flb_sds_destroy(ins->tls_key_file);
+ }
+ if (ins->tls_key_passwd) {
+ flb_sds_destroy(ins->tls_key_passwd);
+ }
+#endif
+}
+
+void flb_output_flush_prepare_destroy(struct flb_output_flush *out_flush)
+{
+ struct flb_output_instance *ins = out_flush->o_ins;
+ struct flb_out_thread_instance *th_ins;
+
+ /* Move output coroutine context from active list to the destroy one */
+ if (flb_output_is_threaded(ins) == FLB_TRUE) {
+ th_ins = flb_output_thread_instance_get();
+ pthread_mutex_lock(&th_ins->flush_mutex);
+ mk_list_del(&out_flush->_head);
+ mk_list_add(&out_flush->_head, &th_ins->flush_list_destroy);
+ pthread_mutex_unlock(&th_ins->flush_mutex);
+ }
+ else {
+ mk_list_del(&out_flush->_head);
+ mk_list_add(&out_flush->_head, &ins->flush_list_destroy);
+ }
+}
+
+int flb_output_flush_id_get(struct flb_output_instance *ins)
+{
+ int id;
+ int max = (2 << 13) - 1; /* max for 14 bits */
+ struct flb_out_thread_instance *th_ins;
+
+ if (flb_output_is_threaded(ins) == FLB_TRUE) {
+ th_ins = flb_output_thread_instance_get();
+ id = th_ins->flush_id;
+ th_ins->flush_id++;
+
+ /* reset once it reach the maximum allowed */
+ if (th_ins->flush_id > max) {
+ th_ins->flush_id = 0;
+ }
+ }
+ else {
+ id = ins->flush_id;
+ ins->flush_id++;
+
+ /* reset once it reach the maximum allowed */
+ if (ins->flush_id > max) {
+ ins->flush_id = 0;
+ }
+ }
+
+ return id;
+}
+
+void flb_output_coro_add(struct flb_output_instance *ins, struct flb_coro *coro)
+{
+ struct flb_output_flush *out_flush;
+
+ out_flush = (struct flb_output_flush *) FLB_CORO_DATA(coro);
+ mk_list_add(&out_flush->_head, &ins->flush_list);
+}
+
+/*
+ * Queue a task to be flushed at a later time
+ * Deletes retry context if enqueue fails
+ */
+static int flb_output_task_queue_enqueue(struct flb_task_queue *queue,
+ struct flb_task_retry *retry,
+ struct flb_task *task,
+ struct flb_output_instance *out_ins,
+ struct flb_config *config)
+{
+ struct flb_task_enqueued *queued_task;
+
+ queued_task = flb_malloc(sizeof(struct flb_task_enqueued));
+ if (!queued_task) {
+ flb_errno();
+ if (retry) {
+ flb_task_retry_destroy(retry);
+ }
+ return -1;
+ }
+ queued_task->retry = retry;
+ queued_task->out_instance = out_ins;
+ queued_task->task = task;
+ queued_task->config = config;
+
+ mk_list_add(&queued_task->_head, &queue->pending);
+ return 0;
+}
+
+/*
+ * Pop task from pending queue and flush it
+ * Will delete retry context if flush fails
+ */
+static int flb_output_task_queue_flush_one(struct flb_task_queue *queue)
+{
+ struct flb_task_enqueued *queued_task;
+ int ret;
+ int is_empty;
+
+ is_empty = mk_list_is_empty(&queue->pending) == 0;
+ if (is_empty) {
+ flb_error("Attempting to flush task from an empty in_progress queue");
+ return -1;
+ }
+
+ queued_task = mk_list_entry_first(&queue->pending, struct flb_task_enqueued, _head);
+ mk_list_del(&queued_task->_head);
+ mk_list_add(&queued_task->_head, &queue->in_progress);
+
+ /*
+ * Remove temporary user now that task is out of singleplex queue.
+ * Flush will add back the user representing queued_task->out_instance if it succeeds.
+ */
+ flb_task_users_dec(queued_task->task, FLB_FALSE);
+ ret = flb_output_task_flush(queued_task->task,
+ queued_task->out_instance,
+ queued_task->config);
+
+ /* Destroy retry context if needed */
+ if (ret == -1) {
+ if (queued_task->retry) {
+ flb_task_retry_destroy(queued_task->retry);
+ }
+ /* Flush the next task */
+ flb_output_task_singleplex_flush_next(queue);
+ return -1;
+ }
+
+ return ret;
+}
+
+/*
+ * Will either run or queue running a single task
+ * Deletes retry context if enqueue fails
+ */
+int flb_output_task_singleplex_enqueue(struct flb_task_queue *queue,
+ struct flb_task_retry *retry,
+ struct flb_task *task,
+ struct flb_output_instance *out_ins,
+ struct flb_config *config)
+{
+ int ret;
+ int is_empty;
+
+ /*
+ * Add temporary user to preserve task while in singleplex queue.
+ * Temporary user will be removed when task is removed from queue.
+ *
+ * Note: if we fail to increment now, then the task may be prematurely
+ * deleted if the task's users go to 0 while we are waiting in the
+ * queue.
+ */
+ flb_task_users_inc(task);
+
+ /* Enqueue task */
+ ret = flb_output_task_queue_enqueue(queue, retry, task, out_ins, config);
+ if (ret == -1) {
+ return -1;
+ }
+
+ /* Launch task if nothing is running */
+ is_empty = mk_list_is_empty(&out_ins->singleplex_queue->in_progress) == 0;
+ if (is_empty) {
+ return flb_output_task_queue_flush_one(out_ins->singleplex_queue);
+ }
+
+ return 0;
+}
+
+/*
+ * Clear in progress task and flush a single queued task if exists
+ * Deletes retry context on next flush if flush fails
+ */
+int flb_output_task_singleplex_flush_next(struct flb_task_queue *queue)
+{
+ int is_empty;
+ struct flb_task_enqueued *ended_task;
+
+ /* Remove in progress task */
+ is_empty = mk_list_is_empty(&queue->in_progress) == 0;
+ if (!is_empty) {
+ ended_task = mk_list_entry_first(&queue->in_progress,
+ struct flb_task_enqueued, _head);
+ mk_list_del(&ended_task->_head);
+ flb_free(ended_task);
+ }
+
+ /* Flush if there is a pending task queued */
+ is_empty = mk_list_is_empty(&queue->pending) == 0;
+ if (!is_empty) {
+ return flb_output_task_queue_flush_one(queue);
+ }
+ return 0;
+}
+
+/*
+ * Flush a task through the output plugin, either using a worker thread + coroutine
+ * or a simple co-routine in the current thread.
+ */
+int flb_output_task_flush(struct flb_task *task,
+ struct flb_output_instance *out_ins,
+ struct flb_config *config)
+{
+ int ret;
+ struct flb_output_flush *out_flush;
+
+ if (flb_output_is_threaded(out_ins) == FLB_TRUE) {
+ flb_task_users_inc(task);
+
+ /* Dispatch the task to the thread pool */
+ ret = flb_output_thread_pool_flush(task, out_ins, config);
+ if (ret == -1) {
+ flb_task_users_dec(task, FLB_FALSE);
+
+ /* If we are in synchronous mode, flush one waiting task */
+ if (out_ins->flags & FLB_OUTPUT_SYNCHRONOUS) {
+ flb_output_task_singleplex_flush_next(out_ins->singleplex_queue);
+ }
+ }
+ }
+ else {
+ /* Queue co-routine handling */
+ out_flush = flb_output_flush_create(task,
+ task->i_ins,
+ out_ins,
+ config);
+ if (!out_flush) {
+ return -1;
+ }
+
+ flb_task_users_inc(task);
+ ret = flb_pipe_w(config->ch_self_events[1], &out_flush,
+ sizeof(struct flb_output_flush*));
+ if (ret == -1) {
+ flb_errno();
+ flb_output_flush_destroy(out_flush);
+ flb_task_users_dec(task, FLB_FALSE);
+
+ /* If we are in synchronous mode, flush one waiting task */
+ if (out_ins->flags & FLB_OUTPUT_SYNCHRONOUS) {
+ flb_output_task_singleplex_flush_next(out_ins->singleplex_queue);
+ }
+
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+int flb_output_instance_destroy(struct flb_output_instance *ins)
+{
+ if (ins->alias) {
+ flb_sds_destroy(ins->alias);
+ }
+
+ /* Remove URI context */
+ if (ins->host.uri) {
+ flb_uri_destroy(ins->host.uri);
+ }
+
+ flb_sds_destroy(ins->host.name);
+ flb_sds_destroy(ins->host.address);
+ flb_sds_destroy(ins->host.listen);
+ flb_sds_destroy(ins->match);
+
+#ifdef FLB_HAVE_REGEX
+ if (ins->match_regex) {
+ flb_regex_destroy(ins->match_regex);
+ }
+#endif
+
+#ifdef FLB_HAVE_TLS
+ if (ins->use_tls == FLB_TRUE) {
+ if (ins->tls) {
+ flb_tls_destroy(ins->tls);
+ }
+ }
+
+ if (ins->tls_config_map) {
+ flb_config_map_destroy(ins->tls_config_map);
+ }
+#endif
+
+ /* Remove metrics */
+#ifdef FLB_HAVE_METRICS
+ if (ins->cmt) {
+ cmt_destroy(ins->cmt);
+ }
+
+ if (ins->metrics) {
+ flb_metrics_destroy(ins->metrics);
+ }
+#endif
+
+ /* destroy callback context */
+ if (ins->callback) {
+ flb_callback_destroy(ins->callback);
+ }
+
+ /* destroy config map */
+ if (ins->config_map) {
+ flb_config_map_destroy(ins->config_map);
+ }
+
+ if (ins->net_config_map) {
+ flb_config_map_destroy(ins->net_config_map);
+ }
+
+ if (ins->ch_events[0] > 0) {
+ mk_event_closesocket(ins->ch_events[0]);
+ }
+
+ if (ins->ch_events[1] > 0) {
+ mk_event_closesocket(ins->ch_events[1]);
+ }
+
+ /* release properties */
+ flb_output_free_properties(ins);
+
+ /* free singleplex queue */
+ if (ins->flags & FLB_OUTPUT_SYNCHRONOUS) {
+ flb_task_queue_destroy(ins->singleplex_queue);
+ }
+
+ mk_list_del(&ins->_head);
+
+ /* processor */
+ if (ins->processor) {
+ flb_processor_destroy(ins->processor);
+ }
+
+ flb_free(ins);
+
+ return 0;
+}
+
+/* Invoke exit call for the output plugin */
+void flb_output_exit(struct flb_config *config)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_output_instance *ins;
+ struct flb_output_plugin *p;
+ void *params;
+
+ mk_list_foreach_safe(head, tmp, &config->outputs) {
+ ins = mk_list_entry(head, struct flb_output_instance, _head);
+ p = ins->p;
+
+ /* Stop any worker thread */
+ if (flb_output_is_threaded(ins) == FLB_TRUE) {
+ flb_output_thread_pool_destroy(ins);
+ }
+
+ /* Check a exit callback */
+ if (p->cb_exit) {
+ p->cb_exit(ins->context, config);
+ }
+ flb_output_instance_destroy(ins);
+ }
+
+ params = FLB_TLS_GET(out_flush_params);
+ if (params) {
+ flb_free(params);
+ }
+}
+
+static inline int instance_id(struct flb_config *config)
+{
+ struct flb_output_instance *ins;
+
+ if (mk_list_size(&config->outputs) == 0) {
+ return 0;
+ }
+
+ ins = mk_list_entry_last(&config->outputs, struct flb_output_instance,
+ _head);
+ return (ins->id + 1);
+}
+
+struct flb_output_instance *flb_output_get_instance(struct flb_config *config,
+ int out_id)
+{
+ struct mk_list *head;
+ struct flb_output_instance *ins;
+
+ mk_list_foreach(head, &config->outputs) {
+ ins = mk_list_entry(head, struct flb_output_instance, _head);
+ if (ins->id == out_id) {
+ break;
+ }
+ ins = NULL;
+ }
+
+ if (!ins) {
+ return NULL;
+ }
+
+ return ins;
+}
+
+/*
+ * Invoked everytime a flush callback has finished (returned). This function
+ * is called from the event loop.
+ */
+int flb_output_flush_finished(struct flb_config *config, int out_id)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct mk_list *list;
+ struct flb_output_instance *ins;
+ struct flb_output_flush *out_flush;
+ struct flb_out_thread_instance *th_ins;
+
+ ins = flb_output_get_instance(config, out_id);
+ if (!ins) {
+ return -1;
+ }
+
+ if (flb_output_is_threaded(ins) == FLB_TRUE) {
+ th_ins = flb_output_thread_instance_get();
+ list = &th_ins->flush_list_destroy;
+ }
+ else {
+ list = &ins->flush_list_destroy;
+ }
+
+ /* Look for output coroutines that needs to be destroyed */
+ mk_list_foreach_safe(head, tmp, list) {
+ out_flush = mk_list_entry(head, struct flb_output_flush, _head);
+ flb_output_flush_destroy(out_flush);
+ }
+
+ return 0;
+}
+
+
+/*
+ * It validate an output type given the string, it return the
+ * proper type and if valid, populate the global config.
+ */
+struct flb_output_instance *flb_output_new(struct flb_config *config,
+ const char *output, void *data,
+ int public_only)
+{
+ int ret = -1;
+ int flags = 0;
+ struct mk_list *head;
+ struct flb_output_plugin *plugin;
+ struct flb_output_instance *instance = NULL;
+
+ if (!output) {
+ return NULL;
+ }
+
+ mk_list_foreach(head, &config->out_plugins) {
+ plugin = mk_list_entry(head, struct flb_output_plugin, _head);
+ if (!check_protocol(plugin->name, output)) {
+ plugin = NULL;
+ continue;
+ }
+
+ if (public_only && plugin->flags & FLB_OUTPUT_PRIVATE) {
+ return NULL;
+ }
+ break;
+ }
+
+ if (!plugin) {
+ return NULL;
+ }
+
+ /* Create and load instance */
+ instance = flb_calloc(1, sizeof(struct flb_output_instance));
+ if (!instance) {
+ flb_errno();
+ return NULL;
+ }
+
+ /* Initialize event type, if not set, default to FLB_OUTPUT_LOGS */
+ if (plugin->event_type == 0) {
+ instance->event_type = FLB_OUTPUT_LOGS;
+ }
+ else {
+ instance->event_type = plugin->event_type;
+ }
+ instance->config = config;
+ instance->log_level = -1;
+ instance->log_suppress_interval = -1;
+ instance->test_mode = FLB_FALSE;
+ instance->is_threaded = FLB_FALSE;
+ instance->tp_workers = plugin->workers;
+
+ /* Retrieve an instance id for the output instance */
+ instance->id = instance_id(config);
+
+ /* format name (with instance id) */
+ snprintf(instance->name, sizeof(instance->name) - 1,
+ "%s.%i", plugin->name, instance->id);
+ instance->p = plugin;
+ instance->callback = flb_callback_create(instance->name);
+ if (!instance->callback) {
+ if (instance->flags & FLB_OUTPUT_SYNCHRONOUS) {
+ flb_task_queue_destroy(instance->singleplex_queue);
+ }
+ flb_free(instance);
+ return NULL;
+ }
+
+ if (plugin->type == FLB_OUTPUT_PLUGIN_CORE) {
+ instance->context = NULL;
+ }
+ else {
+ struct flb_plugin_proxy_context *ctx;
+
+ ctx = flb_calloc(1, sizeof(struct flb_plugin_proxy_context));
+ if (!ctx) {
+ flb_errno();
+ if (instance->flags & FLB_OUTPUT_SYNCHRONOUS) {
+ flb_task_queue_destroy(instance->singleplex_queue);
+ }
+ flb_free(instance);
+ return NULL;
+ }
+
+ ctx->proxy = plugin->proxy;
+
+ instance->context = ctx;
+ }
+
+ instance->alias = NULL;
+ instance->flags = instance->p->flags;
+ instance->data = data;
+ instance->match = NULL;
+#ifdef FLB_HAVE_REGEX
+ instance->match_regex = NULL;
+#endif
+ instance->retry_limit = 1;
+ instance->host.name = NULL;
+ instance->host.address = NULL;
+ instance->net_config_map = NULL;
+
+ /* Storage */
+ instance->total_limit_size = -1;
+
+ /* Parent plugin flags */
+ flags = instance->flags;
+ if (flags & FLB_IO_TCP) {
+ instance->use_tls = FLB_FALSE;
+ }
+ else if (flags & FLB_IO_TLS) {
+ instance->use_tls = FLB_TRUE;
+ }
+ else if (flags & FLB_IO_OPT_TLS) {
+ /* TLS must be enabled manually in the config */
+ instance->use_tls = FLB_FALSE;
+ instance->flags |= FLB_IO_TLS;
+ }
+
+#ifdef FLB_HAVE_TLS
+ instance->tls = NULL;
+ instance->tls_debug = -1;
+ instance->tls_verify = FLB_TRUE;
+ instance->tls_vhost = NULL;
+ instance->tls_ca_path = NULL;
+ instance->tls_ca_file = NULL;
+ instance->tls_crt_file = NULL;
+ instance->tls_key_file = NULL;
+ instance->tls_key_passwd = NULL;
+#endif
+
+ if (plugin->flags & FLB_OUTPUT_NET) {
+ ret = flb_net_host_set(plugin->name, &instance->host, output);
+ if (ret != 0) {
+ if (instance->flags & FLB_OUTPUT_SYNCHRONOUS) {
+ flb_task_queue_destroy(instance->singleplex_queue);
+ }
+ flb_free(instance);
+ return NULL;
+ }
+ }
+
+ /* Create singleplex queue if SYNCHRONOUS mode is used */
+ instance->singleplex_queue = NULL;
+ if (instance->flags & FLB_OUTPUT_SYNCHRONOUS) {
+ instance->singleplex_queue = flb_task_queue_create();
+ if (!instance->singleplex_queue) {
+ flb_free(instance);
+ flb_errno();
+ return NULL;
+ }
+ }
+
+ flb_kv_init(&instance->properties);
+ flb_kv_init(&instance->net_properties);
+ mk_list_init(&instance->upstreams);
+ mk_list_init(&instance->flush_list);
+ mk_list_init(&instance->flush_list_destroy);
+
+ mk_list_add(&instance->_head, &config->outputs);
+
+ /* processor instance */
+ instance->processor = flb_processor_create(config, instance->name, instance, FLB_PLUGIN_OUTPUT);
+
+ /* Tests */
+ instance->test_formatter.callback = plugin->test_formatter.callback;
+
+
+ return instance;
+}
+
+static inline int prop_key_check(const char *key, const char *kv, int k_len)
+{
+ int len;
+
+ len = strlen(key);
+ if (strncasecmp(key, kv, k_len) == 0 && len == k_len) {
+ return 0;
+ }
+
+ return -1;
+}
+
+/* Override a configuration property for the given input_instance plugin */
+int flb_output_set_property(struct flb_output_instance *ins,
+ const char *k, const char *v)
+{
+ int len;
+ int ret;
+ ssize_t limit;
+ flb_sds_t tmp;
+ struct flb_kv *kv;
+ struct flb_config *config = ins->config;
+
+ len = strlen(k);
+ tmp = flb_env_var_translate(config->env, v);
+ if (tmp) {
+ if (strlen(tmp) == 0) {
+ flb_sds_destroy(tmp);
+ tmp = NULL;
+ }
+ }
+
+ /* Check if the key is a known/shared property */
+ if (prop_key_check("match", k, len) == 0) {
+ flb_utils_set_plugin_string_property("match", &ins->match, tmp);
+ }
+#ifdef FLB_HAVE_REGEX
+ else if (prop_key_check("match_regex", k, len) == 0 && tmp) {
+ ins->match_regex = flb_regex_create(tmp);
+ flb_sds_destroy(tmp);
+ }
+#endif
+ else if (prop_key_check("alias", k, len) == 0 && tmp) {
+ flb_utils_set_plugin_string_property("alias", &ins->alias, tmp);
+ }
+ else if (prop_key_check("log_level", k, len) == 0 && tmp) {
+ ret = flb_log_get_level_str(tmp);
+ flb_sds_destroy(tmp);
+ if (ret == -1) {
+ return -1;
+ }
+ ins->log_level = ret;
+ }
+ else if (prop_key_check("log_suppress_interval", k, len) == 0 && tmp) {
+ ret = flb_utils_time_to_seconds(tmp);
+ flb_sds_destroy(tmp);
+ if (ret == -1) {
+ return -1;
+ }
+ ins->log_suppress_interval = ret;
+ }
+ else if (prop_key_check("host", k, len) == 0) {
+ flb_utils_set_plugin_string_property("host", &ins->host.name, tmp);
+ }
+ else if (prop_key_check("port", k, len) == 0) {
+ if (tmp) {
+ ins->host.port = atoi(tmp);
+ flb_sds_destroy(tmp);
+ }
+ else {
+ ins->host.port = 0;
+ }
+ }
+ else if (prop_key_check("ipv6", k, len) == 0 && tmp) {
+ ins->host.ipv6 = flb_utils_bool(tmp);
+ flb_sds_destroy(tmp);
+ }
+ else if (prop_key_check("retry_limit", k, len) == 0) {
+ if (tmp) {
+ if (strcasecmp(tmp, "no_limits") == 0 ||
+ strcasecmp(tmp, "false") == 0 ||
+ strcasecmp(tmp, "off") == 0) {
+ /* No limits for retries */
+ ins->retry_limit = FLB_OUT_RETRY_UNLIMITED;
+ }
+ else if (strcasecmp(tmp, "no_retries") == 0) {
+ ins->retry_limit = FLB_OUT_RETRY_NONE;
+ }
+ else {
+ ins->retry_limit = atoi(tmp);
+ if (ins->retry_limit <= 0) {
+ flb_warn("[config] invalid retry_limit. set default.");
+ /* set default when input is invalid number */
+ ins->retry_limit = 1;
+ }
+ }
+ flb_sds_destroy(tmp);
+ }
+ else {
+ ins->retry_limit = 1;
+ }
+ }
+ else if (strncasecmp("net.", k, 4) == 0 && tmp) {
+ kv = flb_kv_item_create(&ins->net_properties, (char *) k, NULL);
+ if (!kv) {
+ if (tmp) {
+ flb_sds_destroy(tmp);
+ }
+ return -1;
+ }
+ kv->val = tmp;
+ }
+#ifdef FLB_HAVE_HTTP_CLIENT_DEBUG
+ else if (strncasecmp("_debug.http.", k, 12) == 0 && tmp) {
+ ret = flb_http_client_debug_property_is_valid((char *) k, tmp);
+ if (ret == FLB_TRUE) {
+ kv = flb_kv_item_create(&ins->properties, (char *) k, NULL);
+ if (!kv) {
+ if (tmp) {
+ flb_sds_destroy(tmp);
+ }
+ return -1;
+ }
+ kv->val = tmp;
+ }
+ else {
+ flb_error("[config] invalid property '%s' on instance '%s'",
+ k, flb_output_name(ins));
+ flb_sds_destroy(tmp);
+ }
+ }
+#endif
+#ifdef FLB_HAVE_TLS
+ else if (prop_key_check("tls", k, len) == 0 && tmp) {
+ ins->use_tls = flb_utils_bool(tmp);
+ if (ins->use_tls == FLB_TRUE && ((ins->flags & FLB_IO_TLS) == 0)) {
+ flb_error("[config] %s does not support TLS", ins->name);
+ flb_sds_destroy(tmp);
+ return -1;
+ }
+ flb_sds_destroy(tmp);
+ }
+ else if (prop_key_check("tls.verify", k, len) == 0 && tmp) {
+ ins->tls_verify = flb_utils_bool(tmp);
+ flb_sds_destroy(tmp);
+ }
+ else if (prop_key_check("tls.debug", k, len) == 0 && tmp) {
+ ins->tls_debug = atoi(tmp);
+ flb_sds_destroy(tmp);
+ }
+ else if (prop_key_check("tls.vhost", k, len) == 0) {
+ flb_utils_set_plugin_string_property("tls.vhost", &ins->tls_vhost, tmp);
+ }
+ else if (prop_key_check("tls.ca_path", k, len) == 0) {
+ flb_utils_set_plugin_string_property("tls.ca_path", &ins->tls_ca_path, tmp);
+ }
+ else if (prop_key_check("tls.ca_file", k, len) == 0) {
+ flb_utils_set_plugin_string_property("tls.ca_file", &ins->tls_ca_file, tmp);
+ }
+ else if (prop_key_check("tls.crt_file", k, len) == 0) {
+ flb_utils_set_plugin_string_property("tls.crt_file", &ins->tls_crt_file, tmp);
+ }
+ else if (prop_key_check("tls.key_file", k, len) == 0) {
+ flb_utils_set_plugin_string_property("tls.key_file", &ins->tls_key_file, tmp);
+ }
+ else if (prop_key_check("tls.key_passwd", k, len) == 0) {
+ flb_utils_set_plugin_string_property("tls.key_passwd", &ins->tls_key_passwd, tmp);
+ }
+#endif
+ else if (prop_key_check("storage.total_limit_size", k, len) == 0 && tmp) {
+ if (strcasecmp(tmp, "off") == 0 ||
+ flb_utils_bool(tmp) == FLB_FALSE) {
+ /* no limit for filesystem storage */
+ limit = -1;
+ flb_info("[config] unlimited filesystem buffer for %s plugin",
+ ins->name);
+ }
+ else {
+ limit = flb_utils_size_to_bytes(tmp);
+ if (limit == -1) {
+ flb_sds_destroy(tmp);
+ return -1;
+ }
+
+ if (limit == 0) {
+ limit = -1;
+ }
+ }
+
+ flb_sds_destroy(tmp);
+ ins->total_limit_size = (size_t) limit;
+ }
+ else if (prop_key_check("workers", k, len) == 0 && tmp) {
+ /* Set the number of workers */
+ ins->tp_workers = atoi(tmp);
+ flb_sds_destroy(tmp);
+ }
+ else {
+ /*
+ * Create the property, we don't pass the value since we will
+ * map it directly to avoid an extra memory allocation.
+ */
+ kv = flb_kv_item_create(&ins->properties, (char *) k, NULL);
+ if (!kv) {
+ if (tmp) {
+ flb_sds_destroy(tmp);
+ }
+ return -1;
+ }
+ kv->val = tmp;
+ }
+
+ return 0;
+}
+
+/* Configure a default hostname and TCP port if they are not set */
+void flb_output_net_default(const char *host, const int port,
+ struct flb_output_instance *ins)
+{
+ /* Set default network configuration */
+ if (!ins->host.name) {
+ ins->host.name = flb_sds_create(host);
+ }
+ if (ins->host.port == 0) {
+ ins->host.port = port;
+ }
+}
+
+/* Add thread pool for output plugin if configured with workers */
+int flb_output_enable_multi_threading(struct flb_output_instance *ins, struct flb_config *config)
+{
+ /* Multi-threading enabled ? (through 'workers' property) */
+ if (ins->tp_workers > 0) {
+ if(flb_output_thread_pool_create(config, ins) != 0) {
+ flb_output_instance_destroy(ins);
+ return -1;
+ }
+ flb_output_thread_pool_start(ins);
+ }
+
+ return 0;
+}
+
+/* Return an instance name or alias */
+const char *flb_output_name(struct flb_output_instance *ins)
+{
+ if (ins->alias) {
+ return ins->alias;
+ }
+
+ return ins->name;
+}
+
+const char *flb_output_get_property(const char *key, struct flb_output_instance *ins)
+{
+ return flb_config_prop_get(key, &ins->properties);
+}
+
+#ifdef FLB_HAVE_METRICS
+void *flb_output_get_cmt_instance(struct flb_output_instance *ins)
+{
+ return (void *)ins->cmt;
+}
+#endif
+
+int flb_output_net_property_check(struct flb_output_instance *ins,
+ struct flb_config *config)
+{
+ int ret = 0;
+
+ /* Get Upstream net_setup configmap */
+ ins->net_config_map = flb_upstream_get_config_map(config);
+ if (!ins->net_config_map) {
+ flb_output_instance_destroy(ins);
+ return -1;
+ }
+
+ /*
+ * Validate 'net.*' properties: if the plugin use the Upstream interface,
+ * it might receive some networking settings.
+ */
+ if (mk_list_size(&ins->net_properties) > 0) {
+ ret = flb_config_map_properties_check(ins->p->name,
+ &ins->net_properties,
+ ins->net_config_map);
+ if (ret == -1) {
+ if (config->program_name) {
+ flb_helper("try the command: %s -o %s -h\n",
+ config->program_name, ins->p->name);
+ }
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+int flb_output_plugin_property_check(struct flb_output_instance *ins,
+ struct flb_config *config)
+{
+ int ret = 0;
+ struct mk_list *config_map;
+ struct flb_output_plugin *p = ins->p;
+
+ if (p->config_map) {
+ /*
+ * Create a dynamic version of the configmap that will be used by the specific
+ * instance in question.
+ */
+ config_map = flb_config_map_create(config, p->config_map);
+ if (!config_map) {
+ flb_error("[output] error loading config map for '%s' plugin",
+ p->name);
+ flb_output_instance_destroy(ins);
+ return -1;
+ }
+ ins->config_map = config_map;
+
+ /* Validate incoming properties against config map */
+ ret = flb_config_map_properties_check(ins->p->name,
+ &ins->properties, ins->config_map);
+ if (ret == -1) {
+ if (config->program_name) {
+ flb_helper("try the command: %s -o %s -h\n",
+ config->program_name, ins->p->name);
+ }
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+/* Trigger the output plugins setup callbacks to prepare them. */
+int flb_output_init_all(struct flb_config *config)
+{
+ int ret;
+#ifdef FLB_HAVE_METRICS
+ char *name;
+#endif
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_output_instance *ins;
+ struct flb_output_plugin *p;
+ uint64_t ts;
+
+ /* Retrieve the plugin reference */
+ mk_list_foreach_safe(head, tmp, &config->outputs) {
+ ins = mk_list_entry(head, struct flb_output_instance, _head);
+ if (ins->log_level == -1) {
+ ins->log_level = config->log->level;
+ }
+ p = ins->p;
+
+ /* Output Events Channel */
+ ret = mk_event_channel_create(config->evl,
+ &ins->ch_events[0],
+ &ins->ch_events[1],
+ ins);
+ if (ret != 0) {
+ flb_error("could not create events channels for '%s'",
+ flb_output_name(ins));
+ flb_output_instance_destroy(ins);
+ return -1;
+ }
+ flb_debug("[%s:%s] created event channels: read=%i write=%i",
+ ins->p->name, flb_output_name(ins),
+ ins->ch_events[0], ins->ch_events[1]);
+
+ /*
+ * Note: mk_event_channel_create() sets a type = MK_EVENT_NOTIFICATION by
+ * default, we need to overwrite this value so we can do a clean check
+ * into the Engine when the event is triggered.
+ */
+ ins->event.type = FLB_ENGINE_EV_OUTPUT;
+
+ /* Metrics */
+#ifdef FLB_HAVE_METRICS
+ /* Get name or alias for the instance */
+ name = (char *) flb_output_name(ins);
+
+ /* get timestamp */
+ ts = cfl_time_now();
+
+ /* CMetrics */
+ ins->cmt = cmt_create();
+ if (!ins->cmt) {
+ flb_error("[output] could not create cmetrics context");
+ return -1;
+ }
+
+ /*
+ * Register generic output plugin metrics
+ */
+
+ /* fluentbit_output_proc_records_total */
+ ins->cmt_proc_records = cmt_counter_create(ins->cmt, "fluentbit",
+ "output", "proc_records_total",
+ "Number of processed output records.",
+ 1, (char *[]) {"name"});
+ cmt_counter_set(ins->cmt_proc_records, ts, 0, 1, (char *[]) {name});
+
+
+ /* fluentbit_output_proc_bytes_total */
+ ins->cmt_proc_bytes = cmt_counter_create(ins->cmt, "fluentbit",
+ "output", "proc_bytes_total",
+ "Number of processed output bytes.",
+ 1, (char *[]) {"name"});
+ cmt_counter_set(ins->cmt_proc_bytes, ts, 0, 1, (char *[]) {name});
+
+
+ /* fluentbit_output_errors_total */
+ ins->cmt_errors = cmt_counter_create(ins->cmt, "fluentbit",
+ "output", "errors_total",
+ "Number of output errors.",
+ 1, (char *[]) {"name"});
+ cmt_counter_set(ins->cmt_errors, ts, 0, 1, (char *[]) {name});
+
+
+ /* fluentbit_output_retries_total */
+ ins->cmt_retries = cmt_counter_create(ins->cmt, "fluentbit",
+ "output", "retries_total",
+ "Number of output retries.",
+ 1, (char *[]) {"name"});
+ cmt_counter_set(ins->cmt_retries, ts, 0, 1, (char *[]) {name});
+
+ /* fluentbit_output_retries_failed_total */
+ ins->cmt_retries_failed = cmt_counter_create(ins->cmt, "fluentbit",
+ "output", "retries_failed_total",
+ "Number of abandoned batches because "
+ "the maximum number of re-tries was "
+ "reached.",
+ 1, (char *[]) {"name"});
+ cmt_counter_set(ins->cmt_retries_failed, ts, 0, 1, (char *[]) {name});
+
+
+ /* fluentbit_output_dropped_records_total */
+ ins->cmt_dropped_records = cmt_counter_create(ins->cmt, "fluentbit",
+ "output", "dropped_records_total",
+ "Number of dropped records.",
+ 1, (char *[]) {"name"});
+ cmt_counter_set(ins->cmt_dropped_records, ts, 0, 1, (char *[]) {name});
+
+ /* fluentbit_output_retried_records_total */
+ ins->cmt_retried_records = cmt_counter_create(ins->cmt, "fluentbit",
+ "output", "retried_records_total",
+ "Number of retried records.",
+ 1, (char *[]) {"name"});
+ cmt_counter_set(ins->cmt_retried_records, ts, 0, 1, (char *[]) {name});
+
+ /* output_upstream_total_connections */
+ ins->cmt_upstream_total_connections = cmt_gauge_create(ins->cmt,
+ "fluentbit",
+ "output",
+ "upstream_total_connections",
+ "Total Connection count.",
+ 1, (char *[]) {"name"});
+ cmt_gauge_set(ins->cmt_upstream_total_connections,
+ ts,
+ 0,
+ 1, (char *[]) {name});
+
+ /* output_upstream_total_connections */
+ ins->cmt_upstream_busy_connections = cmt_gauge_create(ins->cmt,
+ "fluentbit",
+ "output",
+ "upstream_busy_connections",
+ "Busy Connection count.",
+ 1, (char *[]) {"name"});
+ cmt_gauge_set(ins->cmt_upstream_busy_connections,
+ ts,
+ 0,
+ 1, (char *[]) {name});
+
+ /* old API */
+ ins->metrics = flb_metrics_create(name);
+ if (ins->metrics) {
+ flb_metrics_add(FLB_METRIC_OUT_OK_RECORDS,
+ "proc_records", ins->metrics);
+ flb_metrics_add(FLB_METRIC_OUT_OK_BYTES,
+ "proc_bytes", ins->metrics);
+ flb_metrics_add(FLB_METRIC_OUT_ERROR,
+ "errors", ins->metrics);
+ flb_metrics_add(FLB_METRIC_OUT_RETRY,
+ "retries", ins->metrics);
+ flb_metrics_add(FLB_METRIC_OUT_RETRY_FAILED,
+ "retries_failed", ins->metrics);
+ flb_metrics_add(FLB_METRIC_OUT_DROPPED_RECORDS,
+ "dropped_records", ins->metrics);
+ flb_metrics_add(FLB_METRIC_OUT_RETRIED_RECORDS,
+ "retried_records", ins->metrics);
+ }
+#endif
+
+#ifdef FLB_HAVE_PROXY_GO
+ /* Proxy plugins have their own initialization */
+ if (p->type == FLB_OUTPUT_PLUGIN_PROXY) {
+ ret = flb_plugin_proxy_output_init(p->proxy, ins, config);
+ if (ret == -1) {
+ flb_output_instance_destroy(ins);
+ return -1;
+ }
+
+ /* Multi-threading enabled if configured */
+ ret = flb_output_enable_multi_threading(ins, config);
+ if (ret == -1) {
+ flb_error("[output] could not start thread pool for '%s' plugin",
+ p->name);
+ return -1;
+ }
+
+ continue;
+ }
+#endif
+
+#ifdef FLB_HAVE_TLS
+ if (ins->use_tls == FLB_TRUE) {
+ ins->tls = flb_tls_create(FLB_TLS_CLIENT_MODE,
+ ins->tls_verify,
+ ins->tls_debug,
+ ins->tls_vhost,
+ ins->tls_ca_path,
+ ins->tls_ca_file,
+ ins->tls_crt_file,
+ ins->tls_key_file,
+ ins->tls_key_passwd);
+ if (!ins->tls) {
+ flb_error("[output %s] error initializing TLS context",
+ ins->name);
+ flb_output_instance_destroy(ins);
+ return -1;
+ }
+ }
+#endif
+ /*
+ * Before to call the initialization callback, make sure that the received
+ * configuration parameters are valid if the plugin is registering a config map.
+ */
+ if (flb_output_plugin_property_check(ins, config) == -1) {
+ flb_output_instance_destroy(ins);
+ return -1;
+ }
+
+#ifdef FLB_HAVE_TLS
+ struct flb_config_map *m;
+
+ /* TLS config map (just for 'help' formatting purposes) */
+ ins->tls_config_map = flb_tls_get_config_map(config);
+ if (!ins->tls_config_map) {
+ flb_output_instance_destroy(ins);
+ return -1;
+ }
+
+ /* Override first configmap value based on it plugin flag */
+ m = mk_list_entry_first(ins->tls_config_map, struct flb_config_map, _head);
+ if (p->flags & FLB_IO_TLS) {
+ m->value.val.boolean = FLB_TRUE;
+ }
+ else {
+ m->value.val.boolean = FLB_FALSE;
+ }
+#endif
+
+ /* Init network defaults */
+ flb_net_setup_init(&ins->net_setup);
+
+ if (flb_output_net_property_check(ins, config) == -1) {
+ flb_output_instance_destroy(ins);
+ return -1;
+ }
+
+ /* Initialize plugin through it 'init callback' */
+ ret = p->cb_init(ins, config, ins->data);
+ if (ret == -1) {
+ flb_error("[output] failed to initialize '%s' plugin",
+ p->name);
+ flb_output_instance_destroy(ins);
+ return -1;
+ }
+
+ /* Multi-threading enabled if configured */
+ ret = flb_output_enable_multi_threading(ins, config);
+ if (ret == -1) {
+ flb_error("[output] could not start thread pool for '%s' plugin",
+ flb_output_name(ins));
+ return -1;
+ }
+
+ /* initialize processors */
+ ret = flb_processor_init(ins->processor);
+ if (ret == -1) {
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+/* Assign an Configuration context to an Output */
+void flb_output_set_context(struct flb_output_instance *ins, void *context)
+{
+ ins->context = context;
+}
+
+/* Check that at least one Output is enabled */
+int flb_output_check(struct flb_config *config)
+{
+ if (mk_list_is_empty(&config->outputs) == 0) {
+ return -1;
+ }
+ return 0;
+}
+
+/* Check output plugin's log level.
+ * Not for core plugins but for Golang plugins.
+ * Golang plugins do not have thread-local flb_worker_ctx information. */
+int flb_output_log_check(struct flb_output_instance *ins, int l)
+{
+ if (ins->log_level < l) {
+ return FLB_FALSE;
+ }
+
+ return FLB_TRUE;
+}
+
+/*
+ * Output plugins might have enabled certain features that have not been passed
+ * directly to the upstream context. In order to avoid let plugins validate specific
+ * variables from the instance context like tls, tls.x, keepalive, etc, we populate
+ * them directly through this function.
+ */
+int flb_output_upstream_set(struct flb_upstream *u, struct flb_output_instance *ins)
+{
+ int flags = 0;
+
+ if (!u) {
+ return -1;
+ }
+
+ /* TLS */
+#ifdef FLB_HAVE_TLS
+ if (ins->use_tls == FLB_TRUE) {
+ flags |= FLB_IO_TLS;
+ }
+ else {
+ flags |= FLB_IO_TCP;
+ }
+#else
+ flags |= FLB_IO_TCP;
+#endif
+
+ /* IPv6 */
+ if (ins->host.ipv6 == FLB_TRUE) {
+ flags |= FLB_IO_IPV6;
+ }
+ /* keepalive */
+ if (ins->net_setup.keepalive == FLB_TRUE) {
+ flags |= FLB_IO_TCP_KA;
+ }
+
+ /* Set flags */
+ flb_stream_enable_flags(&u->base, flags);
+
+ flb_upstream_set_total_connections_label(u,
+ flb_output_name(ins));
+
+ flb_upstream_set_total_connections_gauge(u,
+ ins->cmt_upstream_total_connections);
+
+ flb_upstream_set_busy_connections_label(u,
+ flb_output_name(ins));
+
+ flb_upstream_set_busy_connections_gauge(u,
+ ins->cmt_upstream_busy_connections);
+
+ /*
+ * If the output plugin flush callbacks will run in multiple threads, enable
+ * the thread safe mode for the Upstream context.
+ */
+ if (ins->tp_workers > 0) {
+ flb_stream_enable_thread_safety(&u->base);
+
+ mk_list_add(&u->base._head, &ins->upstreams);
+ }
+
+ /* Set networking options 'net.*' received through instance properties */
+ memcpy(&u->base.net, &ins->net_setup, sizeof(struct flb_net_setup));
+
+ return 0;
+}
+
+int flb_output_upstream_ha_set(void *ha, struct flb_output_instance *ins)
+{
+ struct mk_list *head;
+ struct flb_upstream_node *node;
+ struct flb_upstream_ha *upstream_ha = ha;
+
+ mk_list_foreach(head, &upstream_ha->nodes) {
+ node = mk_list_entry(head, struct flb_upstream_node, _head);
+ flb_output_upstream_set(node->u, ins);
+ }
+
+ return 0;
+}
+
+/*
+ * Helper function to set HTTP callbacks using the output instance 'callback'
+ * context.
+ */
+int flb_output_set_http_debug_callbacks(struct flb_output_instance *ins)
+{
+#ifdef FLB_HAVE_HTTP_CLIENT_DEBUG
+ return flb_http_client_debug_setup(ins->callback, &ins->properties);
+#else
+ return 0;
+#endif
+}