summaryrefslogtreecommitdiffstats
path: root/fluent-bit/src/flb_output.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:08:03 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:08:18 +0000
commit5da14042f70711ea5cf66e034699730335462f66 (patch)
tree0f6354ccac934ed87a2d555f45be4c831cf92f4a /fluent-bit/src/flb_output.c
parentReleasing debian version 1.44.3-2. (diff)
downloadnetdata-5da14042f70711ea5cf66e034699730335462f66.tar.xz
netdata-5da14042f70711ea5cf66e034699730335462f66.zip
Merging upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/src/flb_output.c')
-rw-r--r--fluent-bit/src/flb_output.c1445
1 files changed, 0 insertions, 1445 deletions
diff --git a/fluent-bit/src/flb_output.c b/fluent-bit/src/flb_output.c
deleted file mode 100644
index b1548f60d..000000000
--- a/fluent-bit/src/flb_output.c
+++ /dev/null
@@ -1,1445 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-
-#include <fluent-bit/flb_info.h>
-#include <fluent-bit/flb_mem.h>
-#include <fluent-bit/flb_str.h>
-#include <fluent-bit/flb_env.h>
-#include <fluent-bit/flb_coro.h>
-#include <fluent-bit/flb_output.h>
-#include <fluent-bit/flb_kv.h>
-#include <fluent-bit/flb_io.h>
-#include <fluent-bit/flb_uri.h>
-#include <fluent-bit/flb_config.h>
-#include <fluent-bit/flb_macros.h>
-#include <fluent-bit/flb_utils.h>
-#include <fluent-bit/flb_plugin.h>
-#include <fluent-bit/flb_plugin_proxy.h>
-#include <fluent-bit/flb_http_client_debug.h>
-#include <fluent-bit/flb_output_thread.h>
-#include <fluent-bit/flb_mp.h>
-#include <fluent-bit/flb_pack.h>
-
-FLB_TLS_DEFINE(struct flb_out_flush_params, out_flush_params);
-
-void flb_output_prepare()
-{
- FLB_TLS_INIT(out_flush_params);
-}
-
-/* Validate the the output address protocol */
-static int check_protocol(const char *prot, const char *output)
-{
- int len;
- char *p;
-
- p = strstr(output, "://");
- if (p && p != output) {
- len = p - output;
- }
- else {
- len = strlen(output);
- }
-
- if (strlen(prot) != len) {
- return 0;
- }
-
- /* Output plugin match */
- if (strncasecmp(prot, output, len) == 0) {
- return 1;
- }
-
- return 0;
-}
-
-
-/* Invoke pre-run call for the output plugin */
-void flb_output_pre_run(struct flb_config *config)
-{
- struct mk_list *head;
- struct flb_output_instance *ins;
- struct flb_output_plugin *p;
-
- mk_list_foreach(head, &config->outputs) {
- ins = mk_list_entry(head, struct flb_output_instance, _head);
- p = ins->p;
- if (p->cb_pre_run) {
- p->cb_pre_run(ins->context, config);
- }
- }
-}
-
-static void flb_output_free_properties(struct flb_output_instance *ins)
-{
-
- flb_kv_release(&ins->properties);
- flb_kv_release(&ins->net_properties);
-
-#ifdef FLB_HAVE_TLS
- if (ins->tls_vhost) {
- flb_sds_destroy(ins->tls_vhost);
- }
- if (ins->tls_ca_path) {
- flb_sds_destroy(ins->tls_ca_path);
- }
- if (ins->tls_ca_file) {
- flb_sds_destroy(ins->tls_ca_file);
- }
- if (ins->tls_crt_file) {
- flb_sds_destroy(ins->tls_crt_file);
- }
- if (ins->tls_key_file) {
- flb_sds_destroy(ins->tls_key_file);
- }
- if (ins->tls_key_passwd) {
- flb_sds_destroy(ins->tls_key_passwd);
- }
-#endif
-}
-
-void flb_output_flush_prepare_destroy(struct flb_output_flush *out_flush)
-{
- struct flb_output_instance *ins = out_flush->o_ins;
- struct flb_out_thread_instance *th_ins;
-
- /* Move output coroutine context from active list to the destroy one */
- if (flb_output_is_threaded(ins) == FLB_TRUE) {
- th_ins = flb_output_thread_instance_get();
- pthread_mutex_lock(&th_ins->flush_mutex);
- mk_list_del(&out_flush->_head);
- mk_list_add(&out_flush->_head, &th_ins->flush_list_destroy);
- pthread_mutex_unlock(&th_ins->flush_mutex);
- }
- else {
- mk_list_del(&out_flush->_head);
- mk_list_add(&out_flush->_head, &ins->flush_list_destroy);
- }
-}
-
-int flb_output_flush_id_get(struct flb_output_instance *ins)
-{
- int id;
- int max = (2 << 13) - 1; /* max for 14 bits */
- struct flb_out_thread_instance *th_ins;
-
- if (flb_output_is_threaded(ins) == FLB_TRUE) {
- th_ins = flb_output_thread_instance_get();
- id = th_ins->flush_id;
- th_ins->flush_id++;
-
- /* reset once it reach the maximum allowed */
- if (th_ins->flush_id > max) {
- th_ins->flush_id = 0;
- }
- }
- else {
- id = ins->flush_id;
- ins->flush_id++;
-
- /* reset once it reach the maximum allowed */
- if (ins->flush_id > max) {
- ins->flush_id = 0;
- }
- }
-
- return id;
-}
-
-void flb_output_coro_add(struct flb_output_instance *ins, struct flb_coro *coro)
-{
- struct flb_output_flush *out_flush;
-
- out_flush = (struct flb_output_flush *) FLB_CORO_DATA(coro);
- mk_list_add(&out_flush->_head, &ins->flush_list);
-}
-
-/*
- * Queue a task to be flushed at a later time
- * Deletes retry context if enqueue fails
- */
-static int flb_output_task_queue_enqueue(struct flb_task_queue *queue,
- struct flb_task_retry *retry,
- struct flb_task *task,
- struct flb_output_instance *out_ins,
- struct flb_config *config)
-{
- struct flb_task_enqueued *queued_task;
-
- queued_task = flb_malloc(sizeof(struct flb_task_enqueued));
- if (!queued_task) {
- flb_errno();
- if (retry) {
- flb_task_retry_destroy(retry);
- }
- return -1;
- }
- queued_task->retry = retry;
- queued_task->out_instance = out_ins;
- queued_task->task = task;
- queued_task->config = config;
-
- mk_list_add(&queued_task->_head, &queue->pending);
- return 0;
-}
-
-/*
- * Pop task from pending queue and flush it
- * Will delete retry context if flush fails
- */
-static int flb_output_task_queue_flush_one(struct flb_task_queue *queue)
-{
- struct flb_task_enqueued *queued_task;
- int ret;
- int is_empty;
-
- is_empty = mk_list_is_empty(&queue->pending) == 0;
- if (is_empty) {
- flb_error("Attempting to flush task from an empty in_progress queue");
- return -1;
- }
-
- queued_task = mk_list_entry_first(&queue->pending, struct flb_task_enqueued, _head);
- mk_list_del(&queued_task->_head);
- mk_list_add(&queued_task->_head, &queue->in_progress);
-
- /*
- * Remove temporary user now that task is out of singleplex queue.
- * Flush will add back the user representing queued_task->out_instance if it succeeds.
- */
- flb_task_users_dec(queued_task->task, FLB_FALSE);
- ret = flb_output_task_flush(queued_task->task,
- queued_task->out_instance,
- queued_task->config);
-
- /* Destroy retry context if needed */
- if (ret == -1) {
- if (queued_task->retry) {
- flb_task_retry_destroy(queued_task->retry);
- }
- /* Flush the next task */
- flb_output_task_singleplex_flush_next(queue);
- return -1;
- }
-
- return ret;
-}
-
-/*
- * Will either run or queue running a single task
- * Deletes retry context if enqueue fails
- */
-int flb_output_task_singleplex_enqueue(struct flb_task_queue *queue,
- struct flb_task_retry *retry,
- struct flb_task *task,
- struct flb_output_instance *out_ins,
- struct flb_config *config)
-{
- int ret;
- int is_empty;
-
- /*
- * Add temporary user to preserve task while in singleplex queue.
- * Temporary user will be removed when task is removed from queue.
- *
- * Note: if we fail to increment now, then the task may be prematurely
- * deleted if the task's users go to 0 while we are waiting in the
- * queue.
- */
- flb_task_users_inc(task);
-
- /* Enqueue task */
- ret = flb_output_task_queue_enqueue(queue, retry, task, out_ins, config);
- if (ret == -1) {
- return -1;
- }
-
- /* Launch task if nothing is running */
- is_empty = mk_list_is_empty(&out_ins->singleplex_queue->in_progress) == 0;
- if (is_empty) {
- return flb_output_task_queue_flush_one(out_ins->singleplex_queue);
- }
-
- return 0;
-}
-
-/*
- * Clear in progress task and flush a single queued task if exists
- * Deletes retry context on next flush if flush fails
- */
-int flb_output_task_singleplex_flush_next(struct flb_task_queue *queue)
-{
- int is_empty;
- struct flb_task_enqueued *ended_task;
-
- /* Remove in progress task */
- is_empty = mk_list_is_empty(&queue->in_progress) == 0;
- if (!is_empty) {
- ended_task = mk_list_entry_first(&queue->in_progress,
- struct flb_task_enqueued, _head);
- mk_list_del(&ended_task->_head);
- flb_free(ended_task);
- }
-
- /* Flush if there is a pending task queued */
- is_empty = mk_list_is_empty(&queue->pending) == 0;
- if (!is_empty) {
- return flb_output_task_queue_flush_one(queue);
- }
- return 0;
-}
-
-/*
- * Flush a task through the output plugin, either using a worker thread + coroutine
- * or a simple co-routine in the current thread.
- */
-int flb_output_task_flush(struct flb_task *task,
- struct flb_output_instance *out_ins,
- struct flb_config *config)
-{
- int ret;
- struct flb_output_flush *out_flush;
-
- if (flb_output_is_threaded(out_ins) == FLB_TRUE) {
- flb_task_users_inc(task);
-
- /* Dispatch the task to the thread pool */
- ret = flb_output_thread_pool_flush(task, out_ins, config);
- if (ret == -1) {
- flb_task_users_dec(task, FLB_FALSE);
-
- /* If we are in synchronous mode, flush one waiting task */
- if (out_ins->flags & FLB_OUTPUT_SYNCHRONOUS) {
- flb_output_task_singleplex_flush_next(out_ins->singleplex_queue);
- }
- }
- }
- else {
- /* Queue co-routine handling */
- out_flush = flb_output_flush_create(task,
- task->i_ins,
- out_ins,
- config);
- if (!out_flush) {
- return -1;
- }
-
- flb_task_users_inc(task);
- ret = flb_pipe_w(config->ch_self_events[1], &out_flush,
- sizeof(struct flb_output_flush*));
- if (ret == -1) {
- flb_errno();
- flb_output_flush_destroy(out_flush);
- flb_task_users_dec(task, FLB_FALSE);
-
- /* If we are in synchronous mode, flush one waiting task */
- if (out_ins->flags & FLB_OUTPUT_SYNCHRONOUS) {
- flb_output_task_singleplex_flush_next(out_ins->singleplex_queue);
- }
-
- return -1;
- }
- }
-
- return 0;
-}
-
-int flb_output_instance_destroy(struct flb_output_instance *ins)
-{
- if (ins->alias) {
- flb_sds_destroy(ins->alias);
- }
-
- /* Remove URI context */
- if (ins->host.uri) {
- flb_uri_destroy(ins->host.uri);
- }
-
- flb_sds_destroy(ins->host.name);
- flb_sds_destroy(ins->host.address);
- flb_sds_destroy(ins->host.listen);
- flb_sds_destroy(ins->match);
-
-#ifdef FLB_HAVE_REGEX
- if (ins->match_regex) {
- flb_regex_destroy(ins->match_regex);
- }
-#endif
-
-#ifdef FLB_HAVE_TLS
- if (ins->use_tls == FLB_TRUE) {
- if (ins->tls) {
- flb_tls_destroy(ins->tls);
- }
- }
-
- if (ins->tls_config_map) {
- flb_config_map_destroy(ins->tls_config_map);
- }
-#endif
-
- /* Remove metrics */
-#ifdef FLB_HAVE_METRICS
- if (ins->cmt) {
- cmt_destroy(ins->cmt);
- }
-
- if (ins->metrics) {
- flb_metrics_destroy(ins->metrics);
- }
-#endif
-
- /* destroy callback context */
- if (ins->callback) {
- flb_callback_destroy(ins->callback);
- }
-
- /* destroy config map */
- if (ins->config_map) {
- flb_config_map_destroy(ins->config_map);
- }
-
- if (ins->net_config_map) {
- flb_config_map_destroy(ins->net_config_map);
- }
-
- if (ins->ch_events[0] > 0) {
- mk_event_closesocket(ins->ch_events[0]);
- }
-
- if (ins->ch_events[1] > 0) {
- mk_event_closesocket(ins->ch_events[1]);
- }
-
- /* release properties */
- flb_output_free_properties(ins);
-
- /* free singleplex queue */
- if (ins->flags & FLB_OUTPUT_SYNCHRONOUS) {
- flb_task_queue_destroy(ins->singleplex_queue);
- }
-
- mk_list_del(&ins->_head);
-
- /* processor */
- if (ins->processor) {
- flb_processor_destroy(ins->processor);
- }
-
- flb_free(ins);
-
- return 0;
-}
-
-/* Invoke exit call for the output plugin */
-void flb_output_exit(struct flb_config *config)
-{
- struct mk_list *tmp;
- struct mk_list *head;
- struct flb_output_instance *ins;
- struct flb_output_plugin *p;
- void *params;
-
- mk_list_foreach_safe(head, tmp, &config->outputs) {
- ins = mk_list_entry(head, struct flb_output_instance, _head);
- p = ins->p;
-
- /* Stop any worker thread */
- if (flb_output_is_threaded(ins) == FLB_TRUE) {
- flb_output_thread_pool_destroy(ins);
- }
-
- /* Check a exit callback */
- if (p->cb_exit) {
- p->cb_exit(ins->context, config);
- }
- flb_output_instance_destroy(ins);
- }
-
- params = FLB_TLS_GET(out_flush_params);
- if (params) {
- flb_free(params);
- }
-}
-
-static inline int instance_id(struct flb_config *config)
-{
- struct flb_output_instance *ins;
-
- if (mk_list_size(&config->outputs) == 0) {
- return 0;
- }
-
- ins = mk_list_entry_last(&config->outputs, struct flb_output_instance,
- _head);
- return (ins->id + 1);
-}
-
-struct flb_output_instance *flb_output_get_instance(struct flb_config *config,
- int out_id)
-{
- struct mk_list *head;
- struct flb_output_instance *ins;
-
- mk_list_foreach(head, &config->outputs) {
- ins = mk_list_entry(head, struct flb_output_instance, _head);
- if (ins->id == out_id) {
- break;
- }
- ins = NULL;
- }
-
- if (!ins) {
- return NULL;
- }
-
- return ins;
-}
-
-/*
- * Invoked everytime a flush callback has finished (returned). This function
- * is called from the event loop.
- */
-int flb_output_flush_finished(struct flb_config *config, int out_id)
-{
- struct mk_list *tmp;
- struct mk_list *head;
- struct mk_list *list;
- struct flb_output_instance *ins;
- struct flb_output_flush *out_flush;
- struct flb_out_thread_instance *th_ins;
-
- ins = flb_output_get_instance(config, out_id);
- if (!ins) {
- return -1;
- }
-
- if (flb_output_is_threaded(ins) == FLB_TRUE) {
- th_ins = flb_output_thread_instance_get();
- list = &th_ins->flush_list_destroy;
- }
- else {
- list = &ins->flush_list_destroy;
- }
-
- /* Look for output coroutines that needs to be destroyed */
- mk_list_foreach_safe(head, tmp, list) {
- out_flush = mk_list_entry(head, struct flb_output_flush, _head);
- flb_output_flush_destroy(out_flush);
- }
-
- return 0;
-}
-
-
-/*
- * It validate an output type given the string, it return the
- * proper type and if valid, populate the global config.
- */
-struct flb_output_instance *flb_output_new(struct flb_config *config,
- const char *output, void *data,
- int public_only)
-{
- int ret = -1;
- int flags = 0;
- struct mk_list *head;
- struct flb_output_plugin *plugin;
- struct flb_output_instance *instance = NULL;
-
- if (!output) {
- return NULL;
- }
-
- mk_list_foreach(head, &config->out_plugins) {
- plugin = mk_list_entry(head, struct flb_output_plugin, _head);
- if (!check_protocol(plugin->name, output)) {
- plugin = NULL;
- continue;
- }
-
- if (public_only && plugin->flags & FLB_OUTPUT_PRIVATE) {
- return NULL;
- }
- break;
- }
-
- if (!plugin) {
- return NULL;
- }
-
- /* Create and load instance */
- instance = flb_calloc(1, sizeof(struct flb_output_instance));
- if (!instance) {
- flb_errno();
- return NULL;
- }
-
- /* Initialize event type, if not set, default to FLB_OUTPUT_LOGS */
- if (plugin->event_type == 0) {
- instance->event_type = FLB_OUTPUT_LOGS;
- }
- else {
- instance->event_type = plugin->event_type;
- }
- instance->config = config;
- instance->log_level = -1;
- instance->log_suppress_interval = -1;
- instance->test_mode = FLB_FALSE;
- instance->is_threaded = FLB_FALSE;
- instance->tp_workers = plugin->workers;
-
- /* Retrieve an instance id for the output instance */
- instance->id = instance_id(config);
-
- /* format name (with instance id) */
- snprintf(instance->name, sizeof(instance->name) - 1,
- "%s.%i", plugin->name, instance->id);
- instance->p = plugin;
- instance->callback = flb_callback_create(instance->name);
- if (!instance->callback) {
- if (instance->flags & FLB_OUTPUT_SYNCHRONOUS) {
- flb_task_queue_destroy(instance->singleplex_queue);
- }
- flb_free(instance);
- return NULL;
- }
-
- if (plugin->type == FLB_OUTPUT_PLUGIN_CORE) {
- instance->context = NULL;
- }
- else {
- struct flb_plugin_proxy_context *ctx;
-
- ctx = flb_calloc(1, sizeof(struct flb_plugin_proxy_context));
- if (!ctx) {
- flb_errno();
- if (instance->flags & FLB_OUTPUT_SYNCHRONOUS) {
- flb_task_queue_destroy(instance->singleplex_queue);
- }
- flb_free(instance);
- return NULL;
- }
-
- ctx->proxy = plugin->proxy;
-
- instance->context = ctx;
- }
-
- instance->alias = NULL;
- instance->flags = instance->p->flags;
- instance->data = data;
- instance->match = NULL;
-#ifdef FLB_HAVE_REGEX
- instance->match_regex = NULL;
-#endif
- instance->retry_limit = 1;
- instance->host.name = NULL;
- instance->host.address = NULL;
- instance->net_config_map = NULL;
-
- /* Storage */
- instance->total_limit_size = -1;
-
- /* Parent plugin flags */
- flags = instance->flags;
- if (flags & FLB_IO_TCP) {
- instance->use_tls = FLB_FALSE;
- }
- else if (flags & FLB_IO_TLS) {
- instance->use_tls = FLB_TRUE;
- }
- else if (flags & FLB_IO_OPT_TLS) {
- /* TLS must be enabled manually in the config */
- instance->use_tls = FLB_FALSE;
- instance->flags |= FLB_IO_TLS;
- }
-
-#ifdef FLB_HAVE_TLS
- instance->tls = NULL;
- instance->tls_debug = -1;
- instance->tls_verify = FLB_TRUE;
- instance->tls_vhost = NULL;
- instance->tls_ca_path = NULL;
- instance->tls_ca_file = NULL;
- instance->tls_crt_file = NULL;
- instance->tls_key_file = NULL;
- instance->tls_key_passwd = NULL;
-#endif
-
- if (plugin->flags & FLB_OUTPUT_NET) {
- ret = flb_net_host_set(plugin->name, &instance->host, output);
- if (ret != 0) {
- if (instance->flags & FLB_OUTPUT_SYNCHRONOUS) {
- flb_task_queue_destroy(instance->singleplex_queue);
- }
- flb_free(instance);
- return NULL;
- }
- }
-
- /* Create singleplex queue if SYNCHRONOUS mode is used */
- instance->singleplex_queue = NULL;
- if (instance->flags & FLB_OUTPUT_SYNCHRONOUS) {
- instance->singleplex_queue = flb_task_queue_create();
- if (!instance->singleplex_queue) {
- flb_free(instance);
- flb_errno();
- return NULL;
- }
- }
-
- flb_kv_init(&instance->properties);
- flb_kv_init(&instance->net_properties);
- mk_list_init(&instance->upstreams);
- mk_list_init(&instance->flush_list);
- mk_list_init(&instance->flush_list_destroy);
-
- mk_list_add(&instance->_head, &config->outputs);
-
- /* processor instance */
- instance->processor = flb_processor_create(config, instance->name, instance, FLB_PLUGIN_OUTPUT);
-
- /* Tests */
- instance->test_formatter.callback = plugin->test_formatter.callback;
-
-
- return instance;
-}
-
-static inline int prop_key_check(const char *key, const char *kv, int k_len)
-{
- int len;
-
- len = strlen(key);
- if (strncasecmp(key, kv, k_len) == 0 && len == k_len) {
- return 0;
- }
-
- return -1;
-}
-
-/* Override a configuration property for the given input_instance plugin */
-int flb_output_set_property(struct flb_output_instance *ins,
- const char *k, const char *v)
-{
- int len;
- int ret;
- ssize_t limit;
- flb_sds_t tmp;
- struct flb_kv *kv;
- struct flb_config *config = ins->config;
-
- len = strlen(k);
- tmp = flb_env_var_translate(config->env, v);
- if (tmp) {
- if (strlen(tmp) == 0) {
- flb_sds_destroy(tmp);
- tmp = NULL;
- }
- }
-
- /* Check if the key is a known/shared property */
- if (prop_key_check("match", k, len) == 0) {
- flb_utils_set_plugin_string_property("match", &ins->match, tmp);
- }
-#ifdef FLB_HAVE_REGEX
- else if (prop_key_check("match_regex", k, len) == 0 && tmp) {
- ins->match_regex = flb_regex_create(tmp);
- flb_sds_destroy(tmp);
- }
-#endif
- else if (prop_key_check("alias", k, len) == 0 && tmp) {
- flb_utils_set_plugin_string_property("alias", &ins->alias, tmp);
- }
- else if (prop_key_check("log_level", k, len) == 0 && tmp) {
- ret = flb_log_get_level_str(tmp);
- flb_sds_destroy(tmp);
- if (ret == -1) {
- return -1;
- }
- ins->log_level = ret;
- }
- else if (prop_key_check("log_suppress_interval", k, len) == 0 && tmp) {
- ret = flb_utils_time_to_seconds(tmp);
- flb_sds_destroy(tmp);
- if (ret == -1) {
- return -1;
- }
- ins->log_suppress_interval = ret;
- }
- else if (prop_key_check("host", k, len) == 0) {
- flb_utils_set_plugin_string_property("host", &ins->host.name, tmp);
- }
- else if (prop_key_check("port", k, len) == 0) {
- if (tmp) {
- ins->host.port = atoi(tmp);
- flb_sds_destroy(tmp);
- }
- else {
- ins->host.port = 0;
- }
- }
- else if (prop_key_check("ipv6", k, len) == 0 && tmp) {
- ins->host.ipv6 = flb_utils_bool(tmp);
- flb_sds_destroy(tmp);
- }
- else if (prop_key_check("retry_limit", k, len) == 0) {
- if (tmp) {
- if (strcasecmp(tmp, "no_limits") == 0 ||
- strcasecmp(tmp, "false") == 0 ||
- strcasecmp(tmp, "off") == 0) {
- /* No limits for retries */
- ins->retry_limit = FLB_OUT_RETRY_UNLIMITED;
- }
- else if (strcasecmp(tmp, "no_retries") == 0) {
- ins->retry_limit = FLB_OUT_RETRY_NONE;
- }
- else {
- ins->retry_limit = atoi(tmp);
- if (ins->retry_limit <= 0) {
- flb_warn("[config] invalid retry_limit. set default.");
- /* set default when input is invalid number */
- ins->retry_limit = 1;
- }
- }
- flb_sds_destroy(tmp);
- }
- else {
- ins->retry_limit = 1;
- }
- }
- else if (strncasecmp("net.", k, 4) == 0 && tmp) {
- kv = flb_kv_item_create(&ins->net_properties, (char *) k, NULL);
- if (!kv) {
- if (tmp) {
- flb_sds_destroy(tmp);
- }
- return -1;
- }
- kv->val = tmp;
- }
-#ifdef FLB_HAVE_HTTP_CLIENT_DEBUG
- else if (strncasecmp("_debug.http.", k, 12) == 0 && tmp) {
- ret = flb_http_client_debug_property_is_valid((char *) k, tmp);
- if (ret == FLB_TRUE) {
- kv = flb_kv_item_create(&ins->properties, (char *) k, NULL);
- if (!kv) {
- if (tmp) {
- flb_sds_destroy(tmp);
- }
- return -1;
- }
- kv->val = tmp;
- }
- else {
- flb_error("[config] invalid property '%s' on instance '%s'",
- k, flb_output_name(ins));
- flb_sds_destroy(tmp);
- }
- }
-#endif
-#ifdef FLB_HAVE_TLS
- else if (prop_key_check("tls", k, len) == 0 && tmp) {
- ins->use_tls = flb_utils_bool(tmp);
- if (ins->use_tls == FLB_TRUE && ((ins->flags & FLB_IO_TLS) == 0)) {
- flb_error("[config] %s does not support TLS", ins->name);
- flb_sds_destroy(tmp);
- return -1;
- }
- flb_sds_destroy(tmp);
- }
- else if (prop_key_check("tls.verify", k, len) == 0 && tmp) {
- ins->tls_verify = flb_utils_bool(tmp);
- flb_sds_destroy(tmp);
- }
- else if (prop_key_check("tls.debug", k, len) == 0 && tmp) {
- ins->tls_debug = atoi(tmp);
- flb_sds_destroy(tmp);
- }
- else if (prop_key_check("tls.vhost", k, len) == 0) {
- flb_utils_set_plugin_string_property("tls.vhost", &ins->tls_vhost, tmp);
- }
- else if (prop_key_check("tls.ca_path", k, len) == 0) {
- flb_utils_set_plugin_string_property("tls.ca_path", &ins->tls_ca_path, tmp);
- }
- else if (prop_key_check("tls.ca_file", k, len) == 0) {
- flb_utils_set_plugin_string_property("tls.ca_file", &ins->tls_ca_file, tmp);
- }
- else if (prop_key_check("tls.crt_file", k, len) == 0) {
- flb_utils_set_plugin_string_property("tls.crt_file", &ins->tls_crt_file, tmp);
- }
- else if (prop_key_check("tls.key_file", k, len) == 0) {
- flb_utils_set_plugin_string_property("tls.key_file", &ins->tls_key_file, tmp);
- }
- else if (prop_key_check("tls.key_passwd", k, len) == 0) {
- flb_utils_set_plugin_string_property("tls.key_passwd", &ins->tls_key_passwd, tmp);
- }
-#endif
- else if (prop_key_check("storage.total_limit_size", k, len) == 0 && tmp) {
- if (strcasecmp(tmp, "off") == 0 ||
- flb_utils_bool(tmp) == FLB_FALSE) {
- /* no limit for filesystem storage */
- limit = -1;
- flb_info("[config] unlimited filesystem buffer for %s plugin",
- ins->name);
- }
- else {
- limit = flb_utils_size_to_bytes(tmp);
- if (limit == -1) {
- flb_sds_destroy(tmp);
- return -1;
- }
-
- if (limit == 0) {
- limit = -1;
- }
- }
-
- flb_sds_destroy(tmp);
- ins->total_limit_size = (size_t) limit;
- }
- else if (prop_key_check("workers", k, len) == 0 && tmp) {
- /* Set the number of workers */
- ins->tp_workers = atoi(tmp);
- flb_sds_destroy(tmp);
- }
- else {
- /*
- * Create the property, we don't pass the value since we will
- * map it directly to avoid an extra memory allocation.
- */
- kv = flb_kv_item_create(&ins->properties, (char *) k, NULL);
- if (!kv) {
- if (tmp) {
- flb_sds_destroy(tmp);
- }
- return -1;
- }
- kv->val = tmp;
- }
-
- return 0;
-}
-
-/* Configure a default hostname and TCP port if they are not set */
-void flb_output_net_default(const char *host, const int port,
- struct flb_output_instance *ins)
-{
- /* Set default network configuration */
- if (!ins->host.name) {
- ins->host.name = flb_sds_create(host);
- }
- if (ins->host.port == 0) {
- ins->host.port = port;
- }
-}
-
-/* Add thread pool for output plugin if configured with workers */
-int flb_output_enable_multi_threading(struct flb_output_instance *ins, struct flb_config *config)
-{
- /* Multi-threading enabled ? (through 'workers' property) */
- if (ins->tp_workers > 0) {
- if(flb_output_thread_pool_create(config, ins) != 0) {
- flb_output_instance_destroy(ins);
- return -1;
- }
- flb_output_thread_pool_start(ins);
- }
-
- return 0;
-}
-
-/* Return an instance name or alias */
-const char *flb_output_name(struct flb_output_instance *ins)
-{
- if (ins->alias) {
- return ins->alias;
- }
-
- return ins->name;
-}
-
-const char *flb_output_get_property(const char *key, struct flb_output_instance *ins)
-{
- return flb_config_prop_get(key, &ins->properties);
-}
-
-#ifdef FLB_HAVE_METRICS
-void *flb_output_get_cmt_instance(struct flb_output_instance *ins)
-{
- return (void *)ins->cmt;
-}
-#endif
-
-int flb_output_net_property_check(struct flb_output_instance *ins,
- struct flb_config *config)
-{
- int ret = 0;
-
- /* Get Upstream net_setup configmap */
- ins->net_config_map = flb_upstream_get_config_map(config);
- if (!ins->net_config_map) {
- flb_output_instance_destroy(ins);
- return -1;
- }
-
- /*
- * Validate 'net.*' properties: if the plugin use the Upstream interface,
- * it might receive some networking settings.
- */
- if (mk_list_size(&ins->net_properties) > 0) {
- ret = flb_config_map_properties_check(ins->p->name,
- &ins->net_properties,
- ins->net_config_map);
- if (ret == -1) {
- if (config->program_name) {
- flb_helper("try the command: %s -o %s -h\n",
- config->program_name, ins->p->name);
- }
- return -1;
- }
- }
-
- return 0;
-}
-
-int flb_output_plugin_property_check(struct flb_output_instance *ins,
- struct flb_config *config)
-{
- int ret = 0;
- struct mk_list *config_map;
- struct flb_output_plugin *p = ins->p;
-
- if (p->config_map) {
- /*
- * Create a dynamic version of the configmap that will be used by the specific
- * instance in question.
- */
- config_map = flb_config_map_create(config, p->config_map);
- if (!config_map) {
- flb_error("[output] error loading config map for '%s' plugin",
- p->name);
- flb_output_instance_destroy(ins);
- return -1;
- }
- ins->config_map = config_map;
-
- /* Validate incoming properties against config map */
- ret = flb_config_map_properties_check(ins->p->name,
- &ins->properties, ins->config_map);
- if (ret == -1) {
- if (config->program_name) {
- flb_helper("try the command: %s -o %s -h\n",
- config->program_name, ins->p->name);
- }
- return -1;
- }
- }
-
- return 0;
-}
-
-/* Trigger the output plugins setup callbacks to prepare them. */
-int flb_output_init_all(struct flb_config *config)
-{
- int ret;
-#ifdef FLB_HAVE_METRICS
- char *name;
-#endif
- struct mk_list *tmp;
- struct mk_list *head;
- struct flb_output_instance *ins;
- struct flb_output_plugin *p;
- uint64_t ts;
-
- /* Retrieve the plugin reference */
- mk_list_foreach_safe(head, tmp, &config->outputs) {
- ins = mk_list_entry(head, struct flb_output_instance, _head);
- if (ins->log_level == -1) {
- ins->log_level = config->log->level;
- }
- p = ins->p;
-
- /* Output Events Channel */
- ret = mk_event_channel_create(config->evl,
- &ins->ch_events[0],
- &ins->ch_events[1],
- ins);
- if (ret != 0) {
- flb_error("could not create events channels for '%s'",
- flb_output_name(ins));
- flb_output_instance_destroy(ins);
- return -1;
- }
- flb_debug("[%s:%s] created event channels: read=%i write=%i",
- ins->p->name, flb_output_name(ins),
- ins->ch_events[0], ins->ch_events[1]);
-
- /*
- * Note: mk_event_channel_create() sets a type = MK_EVENT_NOTIFICATION by
- * default, we need to overwrite this value so we can do a clean check
- * into the Engine when the event is triggered.
- */
- ins->event.type = FLB_ENGINE_EV_OUTPUT;
-
- /* Metrics */
-#ifdef FLB_HAVE_METRICS
- /* Get name or alias for the instance */
- name = (char *) flb_output_name(ins);
-
- /* get timestamp */
- ts = cfl_time_now();
-
- /* CMetrics */
- ins->cmt = cmt_create();
- if (!ins->cmt) {
- flb_error("[output] could not create cmetrics context");
- return -1;
- }
-
- /*
- * Register generic output plugin metrics
- */
-
- /* fluentbit_output_proc_records_total */
- ins->cmt_proc_records = cmt_counter_create(ins->cmt, "fluentbit",
- "output", "proc_records_total",
- "Number of processed output records.",
- 1, (char *[]) {"name"});
- cmt_counter_set(ins->cmt_proc_records, ts, 0, 1, (char *[]) {name});
-
-
- /* fluentbit_output_proc_bytes_total */
- ins->cmt_proc_bytes = cmt_counter_create(ins->cmt, "fluentbit",
- "output", "proc_bytes_total",
- "Number of processed output bytes.",
- 1, (char *[]) {"name"});
- cmt_counter_set(ins->cmt_proc_bytes, ts, 0, 1, (char *[]) {name});
-
-
- /* fluentbit_output_errors_total */
- ins->cmt_errors = cmt_counter_create(ins->cmt, "fluentbit",
- "output", "errors_total",
- "Number of output errors.",
- 1, (char *[]) {"name"});
- cmt_counter_set(ins->cmt_errors, ts, 0, 1, (char *[]) {name});
-
-
- /* fluentbit_output_retries_total */
- ins->cmt_retries = cmt_counter_create(ins->cmt, "fluentbit",
- "output", "retries_total",
- "Number of output retries.",
- 1, (char *[]) {"name"});
- cmt_counter_set(ins->cmt_retries, ts, 0, 1, (char *[]) {name});
-
- /* fluentbit_output_retries_failed_total */
- ins->cmt_retries_failed = cmt_counter_create(ins->cmt, "fluentbit",
- "output", "retries_failed_total",
- "Number of abandoned batches because "
- "the maximum number of re-tries was "
- "reached.",
- 1, (char *[]) {"name"});
- cmt_counter_set(ins->cmt_retries_failed, ts, 0, 1, (char *[]) {name});
-
-
- /* fluentbit_output_dropped_records_total */
- ins->cmt_dropped_records = cmt_counter_create(ins->cmt, "fluentbit",
- "output", "dropped_records_total",
- "Number of dropped records.",
- 1, (char *[]) {"name"});
- cmt_counter_set(ins->cmt_dropped_records, ts, 0, 1, (char *[]) {name});
-
- /* fluentbit_output_retried_records_total */
- ins->cmt_retried_records = cmt_counter_create(ins->cmt, "fluentbit",
- "output", "retried_records_total",
- "Number of retried records.",
- 1, (char *[]) {"name"});
- cmt_counter_set(ins->cmt_retried_records, ts, 0, 1, (char *[]) {name});
-
- /* output_upstream_total_connections */
- ins->cmt_upstream_total_connections = cmt_gauge_create(ins->cmt,
- "fluentbit",
- "output",
- "upstream_total_connections",
- "Total Connection count.",
- 1, (char *[]) {"name"});
- cmt_gauge_set(ins->cmt_upstream_total_connections,
- ts,
- 0,
- 1, (char *[]) {name});
-
- /* output_upstream_total_connections */
- ins->cmt_upstream_busy_connections = cmt_gauge_create(ins->cmt,
- "fluentbit",
- "output",
- "upstream_busy_connections",
- "Busy Connection count.",
- 1, (char *[]) {"name"});
- cmt_gauge_set(ins->cmt_upstream_busy_connections,
- ts,
- 0,
- 1, (char *[]) {name});
-
- /* old API */
- ins->metrics = flb_metrics_create(name);
- if (ins->metrics) {
- flb_metrics_add(FLB_METRIC_OUT_OK_RECORDS,
- "proc_records", ins->metrics);
- flb_metrics_add(FLB_METRIC_OUT_OK_BYTES,
- "proc_bytes", ins->metrics);
- flb_metrics_add(FLB_METRIC_OUT_ERROR,
- "errors", ins->metrics);
- flb_metrics_add(FLB_METRIC_OUT_RETRY,
- "retries", ins->metrics);
- flb_metrics_add(FLB_METRIC_OUT_RETRY_FAILED,
- "retries_failed", ins->metrics);
- flb_metrics_add(FLB_METRIC_OUT_DROPPED_RECORDS,
- "dropped_records", ins->metrics);
- flb_metrics_add(FLB_METRIC_OUT_RETRIED_RECORDS,
- "retried_records", ins->metrics);
- }
-#endif
-
-#ifdef FLB_HAVE_PROXY_GO
- /* Proxy plugins have their own initialization */
- if (p->type == FLB_OUTPUT_PLUGIN_PROXY) {
- ret = flb_plugin_proxy_output_init(p->proxy, ins, config);
- if (ret == -1) {
- flb_output_instance_destroy(ins);
- return -1;
- }
-
- /* Multi-threading enabled if configured */
- ret = flb_output_enable_multi_threading(ins, config);
- if (ret == -1) {
- flb_error("[output] could not start thread pool for '%s' plugin",
- p->name);
- return -1;
- }
-
- continue;
- }
-#endif
-
-#ifdef FLB_HAVE_TLS
- if (ins->use_tls == FLB_TRUE) {
- ins->tls = flb_tls_create(FLB_TLS_CLIENT_MODE,
- ins->tls_verify,
- ins->tls_debug,
- ins->tls_vhost,
- ins->tls_ca_path,
- ins->tls_ca_file,
- ins->tls_crt_file,
- ins->tls_key_file,
- ins->tls_key_passwd);
- if (!ins->tls) {
- flb_error("[output %s] error initializing TLS context",
- ins->name);
- flb_output_instance_destroy(ins);
- return -1;
- }
- }
-#endif
- /*
- * Before to call the initialization callback, make sure that the received
- * configuration parameters are valid if the plugin is registering a config map.
- */
- if (flb_output_plugin_property_check(ins, config) == -1) {
- flb_output_instance_destroy(ins);
- return -1;
- }
-
-#ifdef FLB_HAVE_TLS
- struct flb_config_map *m;
-
- /* TLS config map (just for 'help' formatting purposes) */
- ins->tls_config_map = flb_tls_get_config_map(config);
- if (!ins->tls_config_map) {
- flb_output_instance_destroy(ins);
- return -1;
- }
-
- /* Override first configmap value based on it plugin flag */
- m = mk_list_entry_first(ins->tls_config_map, struct flb_config_map, _head);
- if (p->flags & FLB_IO_TLS) {
- m->value.val.boolean = FLB_TRUE;
- }
- else {
- m->value.val.boolean = FLB_FALSE;
- }
-#endif
-
- /* Init network defaults */
- flb_net_setup_init(&ins->net_setup);
-
- if (flb_output_net_property_check(ins, config) == -1) {
- flb_output_instance_destroy(ins);
- return -1;
- }
-
- /* Initialize plugin through it 'init callback' */
- ret = p->cb_init(ins, config, ins->data);
- if (ret == -1) {
- flb_error("[output] failed to initialize '%s' plugin",
- p->name);
- flb_output_instance_destroy(ins);
- return -1;
- }
-
- /* Multi-threading enabled if configured */
- ret = flb_output_enable_multi_threading(ins, config);
- if (ret == -1) {
- flb_error("[output] could not start thread pool for '%s' plugin",
- flb_output_name(ins));
- return -1;
- }
-
- /* initialize processors */
- ret = flb_processor_init(ins->processor);
- if (ret == -1) {
- return -1;
- }
- }
-
- return 0;
-}
-
-/* Assign an Configuration context to an Output */
-void flb_output_set_context(struct flb_output_instance *ins, void *context)
-{
- ins->context = context;
-}
-
-/* Check that at least one Output is enabled */
-int flb_output_check(struct flb_config *config)
-{
- if (mk_list_is_empty(&config->outputs) == 0) {
- return -1;
- }
- return 0;
-}
-
-/* Check output plugin's log level.
- * Not for core plugins but for Golang plugins.
- * Golang plugins do not have thread-local flb_worker_ctx information. */
-int flb_output_log_check(struct flb_output_instance *ins, int l)
-{
- if (ins->log_level < l) {
- return FLB_FALSE;
- }
-
- return FLB_TRUE;
-}
-
-/*
- * Output plugins might have enabled certain features that have not been passed
- * directly to the upstream context. In order to avoid let plugins validate specific
- * variables from the instance context like tls, tls.x, keepalive, etc, we populate
- * them directly through this function.
- */
-int flb_output_upstream_set(struct flb_upstream *u, struct flb_output_instance *ins)
-{
- int flags = 0;
-
- if (!u) {
- return -1;
- }
-
- /* TLS */
-#ifdef FLB_HAVE_TLS
- if (ins->use_tls == FLB_TRUE) {
- flags |= FLB_IO_TLS;
- }
- else {
- flags |= FLB_IO_TCP;
- }
-#else
- flags |= FLB_IO_TCP;
-#endif
-
- /* IPv6 */
- if (ins->host.ipv6 == FLB_TRUE) {
- flags |= FLB_IO_IPV6;
- }
- /* keepalive */
- if (ins->net_setup.keepalive == FLB_TRUE) {
- flags |= FLB_IO_TCP_KA;
- }
-
- /* Set flags */
- flb_stream_enable_flags(&u->base, flags);
-
- flb_upstream_set_total_connections_label(u,
- flb_output_name(ins));
-
- flb_upstream_set_total_connections_gauge(u,
- ins->cmt_upstream_total_connections);
-
- flb_upstream_set_busy_connections_label(u,
- flb_output_name(ins));
-
- flb_upstream_set_busy_connections_gauge(u,
- ins->cmt_upstream_busy_connections);
-
- /*
- * If the output plugin flush callbacks will run in multiple threads, enable
- * the thread safe mode for the Upstream context.
- */
- if (ins->tp_workers > 0) {
- flb_stream_enable_thread_safety(&u->base);
-
- mk_list_add(&u->base._head, &ins->upstreams);
- }
-
- /* Set networking options 'net.*' received through instance properties */
- memcpy(&u->base.net, &ins->net_setup, sizeof(struct flb_net_setup));
-
- return 0;
-}
-
-int flb_output_upstream_ha_set(void *ha, struct flb_output_instance *ins)
-{
- struct mk_list *head;
- struct flb_upstream_node *node;
- struct flb_upstream_ha *upstream_ha = ha;
-
- mk_list_foreach(head, &upstream_ha->nodes) {
- node = mk_list_entry(head, struct flb_upstream_node, _head);
- flb_output_upstream_set(node->u, ins);
- }
-
- return 0;
-}
-
-/*
- * Helper function to set HTTP callbacks using the output instance 'callback'
- * context.
- */
-int flb_output_set_http_debug_callbacks(struct flb_output_instance *ins)
-{
-#ifdef FLB_HAVE_HTTP_CLIENT_DEBUG
- return flb_http_client_debug_setup(ins->callback, &ins->properties);
-#else
- return 0;
-#endif
-}