/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ /* Fluent Bit * ========== * Copyright (C) 2015-2022 The Fluent Bit Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include FLB_TLS_DEFINE(struct flb_out_flush_params, out_flush_params); void flb_output_prepare() { FLB_TLS_INIT(out_flush_params); } /* Validate the the output address protocol */ static int check_protocol(const char *prot, const char *output) { int len; char *p; p = strstr(output, "://"); if (p && p != output) { len = p - output; } else { len = strlen(output); } if (strlen(prot) != len) { return 0; } /* Output plugin match */ if (strncasecmp(prot, output, len) == 0) { return 1; } return 0; } /* Invoke pre-run call for the output plugin */ void flb_output_pre_run(struct flb_config *config) { struct mk_list *head; struct flb_output_instance *ins; struct flb_output_plugin *p; mk_list_foreach(head, &config->outputs) { ins = mk_list_entry(head, struct flb_output_instance, _head); p = ins->p; if (p->cb_pre_run) { p->cb_pre_run(ins->context, config); } } } static void flb_output_free_properties(struct flb_output_instance *ins) { flb_kv_release(&ins->properties); flb_kv_release(&ins->net_properties); #ifdef FLB_HAVE_TLS if (ins->tls_vhost) { flb_sds_destroy(ins->tls_vhost); } if (ins->tls_ca_path) { flb_sds_destroy(ins->tls_ca_path); } if (ins->tls_ca_file) { flb_sds_destroy(ins->tls_ca_file); } if (ins->tls_crt_file) { flb_sds_destroy(ins->tls_crt_file); } if (ins->tls_key_file) { flb_sds_destroy(ins->tls_key_file); } if (ins->tls_key_passwd) { flb_sds_destroy(ins->tls_key_passwd); } #endif } void flb_output_flush_prepare_destroy(struct flb_output_flush *out_flush) { struct flb_output_instance *ins = out_flush->o_ins; struct flb_out_thread_instance *th_ins; /* Move output coroutine context from active list to the destroy one */ if (flb_output_is_threaded(ins) == FLB_TRUE) { th_ins = flb_output_thread_instance_get(); pthread_mutex_lock(&th_ins->flush_mutex); mk_list_del(&out_flush->_head); mk_list_add(&out_flush->_head, &th_ins->flush_list_destroy); pthread_mutex_unlock(&th_ins->flush_mutex); } else { mk_list_del(&out_flush->_head); mk_list_add(&out_flush->_head, &ins->flush_list_destroy); } } int flb_output_flush_id_get(struct flb_output_instance *ins) { int id; int max = (2 << 13) - 1; /* max for 14 bits */ struct flb_out_thread_instance *th_ins; if (flb_output_is_threaded(ins) == FLB_TRUE) { th_ins = flb_output_thread_instance_get(); id = th_ins->flush_id; th_ins->flush_id++; /* reset once it reach the maximum allowed */ if (th_ins->flush_id > max) { th_ins->flush_id = 0; } } else { id = ins->flush_id; ins->flush_id++; /* reset once it reach the maximum allowed */ if (ins->flush_id > max) { ins->flush_id = 0; } } return id; } void flb_output_coro_add(struct flb_output_instance *ins, struct flb_coro *coro) { struct flb_output_flush *out_flush; out_flush = (struct flb_output_flush *) FLB_CORO_DATA(coro); mk_list_add(&out_flush->_head, &ins->flush_list); } /* * Queue a task to be flushed at a later time * Deletes retry context if enqueue fails */ static int flb_output_task_queue_enqueue(struct flb_task_queue *queue, struct flb_task_retry *retry, struct flb_task *task, struct flb_output_instance *out_ins, struct flb_config *config) { struct flb_task_enqueued *queued_task; queued_task = flb_malloc(sizeof(struct flb_task_enqueued)); if (!queued_task) { flb_errno(); if (retry) { flb_task_retry_destroy(retry); } return -1; } queued_task->retry = retry; queued_task->out_instance = out_ins; queued_task->task = task; queued_task->config = config; mk_list_add(&queued_task->_head, &queue->pending); return 0; } /* * Pop task from pending queue and flush it * Will delete retry context if flush fails */ static int flb_output_task_queue_flush_one(struct flb_task_queue *queue) { struct flb_task_enqueued *queued_task; int ret; int is_empty; is_empty = mk_list_is_empty(&queue->pending) == 0; if (is_empty) { flb_error("Attempting to flush task from an empty in_progress queue"); return -1; } queued_task = mk_list_entry_first(&queue->pending, struct flb_task_enqueued, _head); mk_list_del(&queued_task->_head); mk_list_add(&queued_task->_head, &queue->in_progress); /* * Remove temporary user now that task is out of singleplex queue. * Flush will add back the user representing queued_task->out_instance if it succeeds. */ flb_task_users_dec(queued_task->task, FLB_FALSE); ret = flb_output_task_flush(queued_task->task, queued_task->out_instance, queued_task->config); /* Destroy retry context if needed */ if (ret == -1) { if (queued_task->retry) { flb_task_retry_destroy(queued_task->retry); } /* Flush the next task */ flb_output_task_singleplex_flush_next(queue); return -1; } return ret; } /* * Will either run or queue running a single task * Deletes retry context if enqueue fails */ int flb_output_task_singleplex_enqueue(struct flb_task_queue *queue, struct flb_task_retry *retry, struct flb_task *task, struct flb_output_instance *out_ins, struct flb_config *config) { int ret; int is_empty; /* * Add temporary user to preserve task while in singleplex queue. * Temporary user will be removed when task is removed from queue. * * Note: if we fail to increment now, then the task may be prematurely * deleted if the task's users go to 0 while we are waiting in the * queue. */ flb_task_users_inc(task); /* Enqueue task */ ret = flb_output_task_queue_enqueue(queue, retry, task, out_ins, config); if (ret == -1) { return -1; } /* Launch task if nothing is running */ is_empty = mk_list_is_empty(&out_ins->singleplex_queue->in_progress) == 0; if (is_empty) { return flb_output_task_queue_flush_one(out_ins->singleplex_queue); } return 0; } /* * Clear in progress task and flush a single queued task if exists * Deletes retry context on next flush if flush fails */ int flb_output_task_singleplex_flush_next(struct flb_task_queue *queue) { int is_empty; struct flb_task_enqueued *ended_task; /* Remove in progress task */ is_empty = mk_list_is_empty(&queue->in_progress) == 0; if (!is_empty) { ended_task = mk_list_entry_first(&queue->in_progress, struct flb_task_enqueued, _head); mk_list_del(&ended_task->_head); flb_free(ended_task); } /* Flush if there is a pending task queued */ is_empty = mk_list_is_empty(&queue->pending) == 0; if (!is_empty) { return flb_output_task_queue_flush_one(queue); } return 0; } /* * Flush a task through the output plugin, either using a worker thread + coroutine * or a simple co-routine in the current thread. */ int flb_output_task_flush(struct flb_task *task, struct flb_output_instance *out_ins, struct flb_config *config) { int ret; struct flb_output_flush *out_flush; if (flb_output_is_threaded(out_ins) == FLB_TRUE) { flb_task_users_inc(task); /* Dispatch the task to the thread pool */ ret = flb_output_thread_pool_flush(task, out_ins, config); if (ret == -1) { flb_task_users_dec(task, FLB_FALSE); /* If we are in synchronous mode, flush one waiting task */ if (out_ins->flags & FLB_OUTPUT_SYNCHRONOUS) { flb_output_task_singleplex_flush_next(out_ins->singleplex_queue); } } } else { /* Queue co-routine handling */ out_flush = flb_output_flush_create(task, task->i_ins, out_ins, config); if (!out_flush) { return -1; } flb_task_users_inc(task); ret = flb_pipe_w(config->ch_self_events[1], &out_flush, sizeof(struct flb_output_flush*)); if (ret == -1) { flb_errno(); flb_output_flush_destroy(out_flush); flb_task_users_dec(task, FLB_FALSE); /* If we are in synchronous mode, flush one waiting task */ if (out_ins->flags & FLB_OUTPUT_SYNCHRONOUS) { flb_output_task_singleplex_flush_next(out_ins->singleplex_queue); } return -1; } } return 0; } int flb_output_instance_destroy(struct flb_output_instance *ins) { if (ins->alias) { flb_sds_destroy(ins->alias); } /* Remove URI context */ if (ins->host.uri) { flb_uri_destroy(ins->host.uri); } flb_sds_destroy(ins->host.name); flb_sds_destroy(ins->host.address); flb_sds_destroy(ins->host.listen); flb_sds_destroy(ins->match); #ifdef FLB_HAVE_REGEX if (ins->match_regex) { flb_regex_destroy(ins->match_regex); } #endif #ifdef FLB_HAVE_TLS if (ins->use_tls == FLB_TRUE) { if (ins->tls) { flb_tls_destroy(ins->tls); } } if (ins->tls_config_map) { flb_config_map_destroy(ins->tls_config_map); } #endif /* Remove metrics */ #ifdef FLB_HAVE_METRICS if (ins->cmt) { cmt_destroy(ins->cmt); } if (ins->metrics) { flb_metrics_destroy(ins->metrics); } #endif /* destroy callback context */ if (ins->callback) { flb_callback_destroy(ins->callback); } /* destroy config map */ if (ins->config_map) { flb_config_map_destroy(ins->config_map); } if (ins->net_config_map) { flb_config_map_destroy(ins->net_config_map); } if (ins->ch_events[0] > 0) { mk_event_closesocket(ins->ch_events[0]); } if (ins->ch_events[1] > 0) { mk_event_closesocket(ins->ch_events[1]); } /* release properties */ flb_output_free_properties(ins); /* free singleplex queue */ if (ins->flags & FLB_OUTPUT_SYNCHRONOUS) { flb_task_queue_destroy(ins->singleplex_queue); } mk_list_del(&ins->_head); /* processor */ if (ins->processor) { flb_processor_destroy(ins->processor); } flb_free(ins); return 0; } /* Invoke exit call for the output plugin */ void flb_output_exit(struct flb_config *config) { struct mk_list *tmp; struct mk_list *head; struct flb_output_instance *ins; struct flb_output_plugin *p; void *params; mk_list_foreach_safe(head, tmp, &config->outputs) { ins = mk_list_entry(head, struct flb_output_instance, _head); p = ins->p; /* Stop any worker thread */ if (flb_output_is_threaded(ins) == FLB_TRUE) { flb_output_thread_pool_destroy(ins); } /* Check a exit callback */ if (p->cb_exit) { p->cb_exit(ins->context, config); } flb_output_instance_destroy(ins); } params = FLB_TLS_GET(out_flush_params); if (params) { flb_free(params); } } static inline int instance_id(struct flb_config *config) { struct flb_output_instance *ins; if (mk_list_size(&config->outputs) == 0) { return 0; } ins = mk_list_entry_last(&config->outputs, struct flb_output_instance, _head); return (ins->id + 1); } struct flb_output_instance *flb_output_get_instance(struct flb_config *config, int out_id) { struct mk_list *head; struct flb_output_instance *ins; mk_list_foreach(head, &config->outputs) { ins = mk_list_entry(head, struct flb_output_instance, _head); if (ins->id == out_id) { break; } ins = NULL; } if (!ins) { return NULL; } return ins; } /* * Invoked everytime a flush callback has finished (returned). This function * is called from the event loop. */ int flb_output_flush_finished(struct flb_config *config, int out_id) { struct mk_list *tmp; struct mk_list *head; struct mk_list *list; struct flb_output_instance *ins; struct flb_output_flush *out_flush; struct flb_out_thread_instance *th_ins; ins = flb_output_get_instance(config, out_id); if (!ins) { return -1; } if (flb_output_is_threaded(ins) == FLB_TRUE) { th_ins = flb_output_thread_instance_get(); list = &th_ins->flush_list_destroy; } else { list = &ins->flush_list_destroy; } /* Look for output coroutines that needs to be destroyed */ mk_list_foreach_safe(head, tmp, list) { out_flush = mk_list_entry(head, struct flb_output_flush, _head); flb_output_flush_destroy(out_flush); } return 0; } /* * It validate an output type given the string, it return the * proper type and if valid, populate the global config. */ struct flb_output_instance *flb_output_new(struct flb_config *config, const char *output, void *data, int public_only) { int ret = -1; int flags = 0; struct mk_list *head; struct flb_output_plugin *plugin; struct flb_output_instance *instance = NULL; if (!output) { return NULL; } mk_list_foreach(head, &config->out_plugins) { plugin = mk_list_entry(head, struct flb_output_plugin, _head); if (!check_protocol(plugin->name, output)) { plugin = NULL; continue; } if (public_only && plugin->flags & FLB_OUTPUT_PRIVATE) { return NULL; } break; } if (!plugin) { return NULL; } /* Create and load instance */ instance = flb_calloc(1, sizeof(struct flb_output_instance)); if (!instance) { flb_errno(); return NULL; } /* Initialize event type, if not set, default to FLB_OUTPUT_LOGS */ if (plugin->event_type == 0) { instance->event_type = FLB_OUTPUT_LOGS; } else { instance->event_type = plugin->event_type; } instance->config = config; instance->log_level = -1; instance->log_suppress_interval = -1; instance->test_mode = FLB_FALSE; instance->is_threaded = FLB_FALSE; instance->tp_workers = plugin->workers; /* Retrieve an instance id for the output instance */ instance->id = instance_id(config); /* format name (with instance id) */ snprintf(instance->name, sizeof(instance->name) - 1, "%s.%i", plugin->name, instance->id); instance->p = plugin; instance->callback = flb_callback_create(instance->name); if (!instance->callback) { if (instance->flags & FLB_OUTPUT_SYNCHRONOUS) { flb_task_queue_destroy(instance->singleplex_queue); } flb_free(instance); return NULL; } if (plugin->type == FLB_OUTPUT_PLUGIN_CORE) { instance->context = NULL; } else { struct flb_plugin_proxy_context *ctx; ctx = flb_calloc(1, sizeof(struct flb_plugin_proxy_context)); if (!ctx) { flb_errno(); if (instance->flags & FLB_OUTPUT_SYNCHRONOUS) { flb_task_queue_destroy(instance->singleplex_queue); } flb_free(instance); return NULL; } ctx->proxy = plugin->proxy; instance->context = ctx; } instance->alias = NULL; instance->flags = instance->p->flags; instance->data = data; instance->match = NULL; #ifdef FLB_HAVE_REGEX instance->match_regex = NULL; #endif instance->retry_limit = 1; instance->host.name = NULL; instance->host.address = NULL; instance->net_config_map = NULL; /* Storage */ instance->total_limit_size = -1; /* Parent plugin flags */ flags = instance->flags; if (flags & FLB_IO_TCP) { instance->use_tls = FLB_FALSE; } else if (flags & FLB_IO_TLS) { instance->use_tls = FLB_TRUE; } else if (flags & FLB_IO_OPT_TLS) { /* TLS must be enabled manually in the config */ instance->use_tls = FLB_FALSE; instance->flags |= FLB_IO_TLS; } #ifdef FLB_HAVE_TLS instance->tls = NULL; instance->tls_debug = -1; instance->tls_verify = FLB_TRUE; instance->tls_vhost = NULL; instance->tls_ca_path = NULL; instance->tls_ca_file = NULL; instance->tls_crt_file = NULL; instance->tls_key_file = NULL; instance->tls_key_passwd = NULL; #endif if (plugin->flags & FLB_OUTPUT_NET) { ret = flb_net_host_set(plugin->name, &instance->host, output); if (ret != 0) { if (instance->flags & FLB_OUTPUT_SYNCHRONOUS) { flb_task_queue_destroy(instance->singleplex_queue); } flb_free(instance); return NULL; } } /* Create singleplex queue if SYNCHRONOUS mode is used */ instance->singleplex_queue = NULL; if (instance->flags & FLB_OUTPUT_SYNCHRONOUS) { instance->singleplex_queue = flb_task_queue_create(); if (!instance->singleplex_queue) { flb_free(instance); flb_errno(); return NULL; } } flb_kv_init(&instance->properties); flb_kv_init(&instance->net_properties); mk_list_init(&instance->upstreams); mk_list_init(&instance->flush_list); mk_list_init(&instance->flush_list_destroy); mk_list_add(&instance->_head, &config->outputs); /* processor instance */ instance->processor = flb_processor_create(config, instance->name, instance, FLB_PLUGIN_OUTPUT); /* Tests */ instance->test_formatter.callback = plugin->test_formatter.callback; return instance; } static inline int prop_key_check(const char *key, const char *kv, int k_len) { int len; len = strlen(key); if (strncasecmp(key, kv, k_len) == 0 && len == k_len) { return 0; } return -1; } /* Override a configuration property for the given input_instance plugin */ int flb_output_set_property(struct flb_output_instance *ins, const char *k, const char *v) { int len; int ret; ssize_t limit; flb_sds_t tmp; struct flb_kv *kv; struct flb_config *config = ins->config; len = strlen(k); tmp = flb_env_var_translate(config->env, v); if (tmp) { if (strlen(tmp) == 0) { flb_sds_destroy(tmp); tmp = NULL; } } /* Check if the key is a known/shared property */ if (prop_key_check("match", k, len) == 0) { flb_utils_set_plugin_string_property("match", &ins->match, tmp); } #ifdef FLB_HAVE_REGEX else if (prop_key_check("match_regex", k, len) == 0 && tmp) { ins->match_regex = flb_regex_create(tmp); flb_sds_destroy(tmp); } #endif else if (prop_key_check("alias", k, len) == 0 && tmp) { flb_utils_set_plugin_string_property("alias", &ins->alias, tmp); } else if (prop_key_check("log_level", k, len) == 0 && tmp) { ret = flb_log_get_level_str(tmp); flb_sds_destroy(tmp); if (ret == -1) { return -1; } ins->log_level = ret; } else if (prop_key_check("log_suppress_interval", k, len) == 0 && tmp) { ret = flb_utils_time_to_seconds(tmp); flb_sds_destroy(tmp); if (ret == -1) { return -1; } ins->log_suppress_interval = ret; } else if (prop_key_check("host", k, len) == 0) { flb_utils_set_plugin_string_property("host", &ins->host.name, tmp); } else if (prop_key_check("port", k, len) == 0) { if (tmp) { ins->host.port = atoi(tmp); flb_sds_destroy(tmp); } else { ins->host.port = 0; } } else if (prop_key_check("ipv6", k, len) == 0 && tmp) { ins->host.ipv6 = flb_utils_bool(tmp); flb_sds_destroy(tmp); } else if (prop_key_check("retry_limit", k, len) == 0) { if (tmp) { if (strcasecmp(tmp, "no_limits") == 0 || strcasecmp(tmp, "false") == 0 || strcasecmp(tmp, "off") == 0) { /* No limits for retries */ ins->retry_limit = FLB_OUT_RETRY_UNLIMITED; } else if (strcasecmp(tmp, "no_retries") == 0) { ins->retry_limit = FLB_OUT_RETRY_NONE; } else { ins->retry_limit = atoi(tmp); if (ins->retry_limit <= 0) { flb_warn("[config] invalid retry_limit. set default."); /* set default when input is invalid number */ ins->retry_limit = 1; } } flb_sds_destroy(tmp); } else { ins->retry_limit = 1; } } else if (strncasecmp("net.", k, 4) == 0 && tmp) { kv = flb_kv_item_create(&ins->net_properties, (char *) k, NULL); if (!kv) { if (tmp) { flb_sds_destroy(tmp); } return -1; } kv->val = tmp; } #ifdef FLB_HAVE_HTTP_CLIENT_DEBUG else if (strncasecmp("_debug.http.", k, 12) == 0 && tmp) { ret = flb_http_client_debug_property_is_valid((char *) k, tmp); if (ret == FLB_TRUE) { kv = flb_kv_item_create(&ins->properties, (char *) k, NULL); if (!kv) { if (tmp) { flb_sds_destroy(tmp); } return -1; } kv->val = tmp; } else { flb_error("[config] invalid property '%s' on instance '%s'", k, flb_output_name(ins)); flb_sds_destroy(tmp); } } #endif #ifdef FLB_HAVE_TLS else if (prop_key_check("tls", k, len) == 0 && tmp) { ins->use_tls = flb_utils_bool(tmp); if (ins->use_tls == FLB_TRUE && ((ins->flags & FLB_IO_TLS) == 0)) { flb_error("[config] %s does not support TLS", ins->name); flb_sds_destroy(tmp); return -1; } flb_sds_destroy(tmp); } else if (prop_key_check("tls.verify", k, len) == 0 && tmp) { ins->tls_verify = flb_utils_bool(tmp); flb_sds_destroy(tmp); } else if (prop_key_check("tls.debug", k, len) == 0 && tmp) { ins->tls_debug = atoi(tmp); flb_sds_destroy(tmp); } else if (prop_key_check("tls.vhost", k, len) == 0) { flb_utils_set_plugin_string_property("tls.vhost", &ins->tls_vhost, tmp); } else if (prop_key_check("tls.ca_path", k, len) == 0) { flb_utils_set_plugin_string_property("tls.ca_path", &ins->tls_ca_path, tmp); } else if (prop_key_check("tls.ca_file", k, len) == 0) { flb_utils_set_plugin_string_property("tls.ca_file", &ins->tls_ca_file, tmp); } else if (prop_key_check("tls.crt_file", k, len) == 0) { flb_utils_set_plugin_string_property("tls.crt_file", &ins->tls_crt_file, tmp); } else if (prop_key_check("tls.key_file", k, len) == 0) { flb_utils_set_plugin_string_property("tls.key_file", &ins->tls_key_file, tmp); } else if (prop_key_check("tls.key_passwd", k, len) == 0) { flb_utils_set_plugin_string_property("tls.key_passwd", &ins->tls_key_passwd, tmp); } #endif else if (prop_key_check("storage.total_limit_size", k, len) == 0 && tmp) { if (strcasecmp(tmp, "off") == 0 || flb_utils_bool(tmp) == FLB_FALSE) { /* no limit for filesystem storage */ limit = -1; flb_info("[config] unlimited filesystem buffer for %s plugin", ins->name); } else { limit = flb_utils_size_to_bytes(tmp); if (limit == -1) { flb_sds_destroy(tmp); return -1; } if (limit == 0) { limit = -1; } } flb_sds_destroy(tmp); ins->total_limit_size = (size_t) limit; } else if (prop_key_check("workers", k, len) == 0 && tmp) { /* Set the number of workers */ ins->tp_workers = atoi(tmp); flb_sds_destroy(tmp); } else { /* * Create the property, we don't pass the value since we will * map it directly to avoid an extra memory allocation. */ kv = flb_kv_item_create(&ins->properties, (char *) k, NULL); if (!kv) { if (tmp) { flb_sds_destroy(tmp); } return -1; } kv->val = tmp; } return 0; } /* Configure a default hostname and TCP port if they are not set */ void flb_output_net_default(const char *host, const int port, struct flb_output_instance *ins) { /* Set default network configuration */ if (!ins->host.name) { ins->host.name = flb_sds_create(host); } if (ins->host.port == 0) { ins->host.port = port; } } /* Add thread pool for output plugin if configured with workers */ int flb_output_enable_multi_threading(struct flb_output_instance *ins, struct flb_config *config) { /* Multi-threading enabled ? (through 'workers' property) */ if (ins->tp_workers > 0) { if(flb_output_thread_pool_create(config, ins) != 0) { flb_output_instance_destroy(ins); return -1; } flb_output_thread_pool_start(ins); } return 0; } /* Return an instance name or alias */ const char *flb_output_name(struct flb_output_instance *ins) { if (ins->alias) { return ins->alias; } return ins->name; } const char *flb_output_get_property(const char *key, struct flb_output_instance *ins) { return flb_config_prop_get(key, &ins->properties); } #ifdef FLB_HAVE_METRICS void *flb_output_get_cmt_instance(struct flb_output_instance *ins) { return (void *)ins->cmt; } #endif int flb_output_net_property_check(struct flb_output_instance *ins, struct flb_config *config) { int ret = 0; /* Get Upstream net_setup configmap */ ins->net_config_map = flb_upstream_get_config_map(config); if (!ins->net_config_map) { flb_output_instance_destroy(ins); return -1; } /* * Validate 'net.*' properties: if the plugin use the Upstream interface, * it might receive some networking settings. */ if (mk_list_size(&ins->net_properties) > 0) { ret = flb_config_map_properties_check(ins->p->name, &ins->net_properties, ins->net_config_map); if (ret == -1) { if (config->program_name) { flb_helper("try the command: %s -o %s -h\n", config->program_name, ins->p->name); } return -1; } } return 0; } int flb_output_plugin_property_check(struct flb_output_instance *ins, struct flb_config *config) { int ret = 0; struct mk_list *config_map; struct flb_output_plugin *p = ins->p; if (p->config_map) { /* * Create a dynamic version of the configmap that will be used by the specific * instance in question. */ config_map = flb_config_map_create(config, p->config_map); if (!config_map) { flb_error("[output] error loading config map for '%s' plugin", p->name); flb_output_instance_destroy(ins); return -1; } ins->config_map = config_map; /* Validate incoming properties against config map */ ret = flb_config_map_properties_check(ins->p->name, &ins->properties, ins->config_map); if (ret == -1) { if (config->program_name) { flb_helper("try the command: %s -o %s -h\n", config->program_name, ins->p->name); } return -1; } } return 0; } /* Trigger the output plugins setup callbacks to prepare them. */ int flb_output_init_all(struct flb_config *config) { int ret; #ifdef FLB_HAVE_METRICS char *name; #endif struct mk_list *tmp; struct mk_list *head; struct flb_output_instance *ins; struct flb_output_plugin *p; uint64_t ts; /* Retrieve the plugin reference */ mk_list_foreach_safe(head, tmp, &config->outputs) { ins = mk_list_entry(head, struct flb_output_instance, _head); if (ins->log_level == -1) { ins->log_level = config->log->level; } p = ins->p; /* Output Events Channel */ ret = mk_event_channel_create(config->evl, &ins->ch_events[0], &ins->ch_events[1], ins); if (ret != 0) { flb_error("could not create events channels for '%s'", flb_output_name(ins)); flb_output_instance_destroy(ins); return -1; } flb_debug("[%s:%s] created event channels: read=%i write=%i", ins->p->name, flb_output_name(ins), ins->ch_events[0], ins->ch_events[1]); /* * Note: mk_event_channel_create() sets a type = MK_EVENT_NOTIFICATION by * default, we need to overwrite this value so we can do a clean check * into the Engine when the event is triggered. */ ins->event.type = FLB_ENGINE_EV_OUTPUT; /* Metrics */ #ifdef FLB_HAVE_METRICS /* Get name or alias for the instance */ name = (char *) flb_output_name(ins); /* get timestamp */ ts = cfl_time_now(); /* CMetrics */ ins->cmt = cmt_create(); if (!ins->cmt) { flb_error("[output] could not create cmetrics context"); return -1; } /* * Register generic output plugin metrics */ /* fluentbit_output_proc_records_total */ ins->cmt_proc_records = cmt_counter_create(ins->cmt, "fluentbit", "output", "proc_records_total", "Number of processed output records.", 1, (char *[]) {"name"}); cmt_counter_set(ins->cmt_proc_records, ts, 0, 1, (char *[]) {name}); /* fluentbit_output_proc_bytes_total */ ins->cmt_proc_bytes = cmt_counter_create(ins->cmt, "fluentbit", "output", "proc_bytes_total", "Number of processed output bytes.", 1, (char *[]) {"name"}); cmt_counter_set(ins->cmt_proc_bytes, ts, 0, 1, (char *[]) {name}); /* fluentbit_output_errors_total */ ins->cmt_errors = cmt_counter_create(ins->cmt, "fluentbit", "output", "errors_total", "Number of output errors.", 1, (char *[]) {"name"}); cmt_counter_set(ins->cmt_errors, ts, 0, 1, (char *[]) {name}); /* fluentbit_output_retries_total */ ins->cmt_retries = cmt_counter_create(ins->cmt, "fluentbit", "output", "retries_total", "Number of output retries.", 1, (char *[]) {"name"}); cmt_counter_set(ins->cmt_retries, ts, 0, 1, (char *[]) {name}); /* fluentbit_output_retries_failed_total */ ins->cmt_retries_failed = cmt_counter_create(ins->cmt, "fluentbit", "output", "retries_failed_total", "Number of abandoned batches because " "the maximum number of re-tries was " "reached.", 1, (char *[]) {"name"}); cmt_counter_set(ins->cmt_retries_failed, ts, 0, 1, (char *[]) {name}); /* fluentbit_output_dropped_records_total */ ins->cmt_dropped_records = cmt_counter_create(ins->cmt, "fluentbit", "output", "dropped_records_total", "Number of dropped records.", 1, (char *[]) {"name"}); cmt_counter_set(ins->cmt_dropped_records, ts, 0, 1, (char *[]) {name}); /* fluentbit_output_retried_records_total */ ins->cmt_retried_records = cmt_counter_create(ins->cmt, "fluentbit", "output", "retried_records_total", "Number of retried records.", 1, (char *[]) {"name"}); cmt_counter_set(ins->cmt_retried_records, ts, 0, 1, (char *[]) {name}); /* output_upstream_total_connections */ ins->cmt_upstream_total_connections = cmt_gauge_create(ins->cmt, "fluentbit", "output", "upstream_total_connections", "Total Connection count.", 1, (char *[]) {"name"}); cmt_gauge_set(ins->cmt_upstream_total_connections, ts, 0, 1, (char *[]) {name}); /* output_upstream_total_connections */ ins->cmt_upstream_busy_connections = cmt_gauge_create(ins->cmt, "fluentbit", "output", "upstream_busy_connections", "Busy Connection count.", 1, (char *[]) {"name"}); cmt_gauge_set(ins->cmt_upstream_busy_connections, ts, 0, 1, (char *[]) {name}); /* old API */ ins->metrics = flb_metrics_create(name); if (ins->metrics) { flb_metrics_add(FLB_METRIC_OUT_OK_RECORDS, "proc_records", ins->metrics); flb_metrics_add(FLB_METRIC_OUT_OK_BYTES, "proc_bytes", ins->metrics); flb_metrics_add(FLB_METRIC_OUT_ERROR, "errors", ins->metrics); flb_metrics_add(FLB_METRIC_OUT_RETRY, "retries", ins->metrics); flb_metrics_add(FLB_METRIC_OUT_RETRY_FAILED, "retries_failed", ins->metrics); flb_metrics_add(FLB_METRIC_OUT_DROPPED_RECORDS, "dropped_records", ins->metrics); flb_metrics_add(FLB_METRIC_OUT_RETRIED_RECORDS, "retried_records", ins->metrics); } #endif #ifdef FLB_HAVE_PROXY_GO /* Proxy plugins have their own initialization */ if (p->type == FLB_OUTPUT_PLUGIN_PROXY) { ret = flb_plugin_proxy_output_init(p->proxy, ins, config); if (ret == -1) { flb_output_instance_destroy(ins); return -1; } /* Multi-threading enabled if configured */ ret = flb_output_enable_multi_threading(ins, config); if (ret == -1) { flb_error("[output] could not start thread pool for '%s' plugin", p->name); return -1; } continue; } #endif #ifdef FLB_HAVE_TLS if (ins->use_tls == FLB_TRUE) { ins->tls = flb_tls_create(FLB_TLS_CLIENT_MODE, ins->tls_verify, ins->tls_debug, ins->tls_vhost, ins->tls_ca_path, ins->tls_ca_file, ins->tls_crt_file, ins->tls_key_file, ins->tls_key_passwd); if (!ins->tls) { flb_error("[output %s] error initializing TLS context", ins->name); flb_output_instance_destroy(ins); return -1; } } #endif /* * Before to call the initialization callback, make sure that the received * configuration parameters are valid if the plugin is registering a config map. */ if (flb_output_plugin_property_check(ins, config) == -1) { flb_output_instance_destroy(ins); return -1; } #ifdef FLB_HAVE_TLS struct flb_config_map *m; /* TLS config map (just for 'help' formatting purposes) */ ins->tls_config_map = flb_tls_get_config_map(config); if (!ins->tls_config_map) { flb_output_instance_destroy(ins); return -1; } /* Override first configmap value based on it plugin flag */ m = mk_list_entry_first(ins->tls_config_map, struct flb_config_map, _head); if (p->flags & FLB_IO_TLS) { m->value.val.boolean = FLB_TRUE; } else { m->value.val.boolean = FLB_FALSE; } #endif /* Init network defaults */ flb_net_setup_init(&ins->net_setup); if (flb_output_net_property_check(ins, config) == -1) { flb_output_instance_destroy(ins); return -1; } /* Initialize plugin through it 'init callback' */ ret = p->cb_init(ins, config, ins->data); if (ret == -1) { flb_error("[output] failed to initialize '%s' plugin", p->name); flb_output_instance_destroy(ins); return -1; } /* Multi-threading enabled if configured */ ret = flb_output_enable_multi_threading(ins, config); if (ret == -1) { flb_error("[output] could not start thread pool for '%s' plugin", flb_output_name(ins)); return -1; } /* initialize processors */ ret = flb_processor_init(ins->processor); if (ret == -1) { return -1; } } return 0; } /* Assign an Configuration context to an Output */ void flb_output_set_context(struct flb_output_instance *ins, void *context) { ins->context = context; } /* Check that at least one Output is enabled */ int flb_output_check(struct flb_config *config) { if (mk_list_is_empty(&config->outputs) == 0) { return -1; } return 0; } /* Check output plugin's log level. * Not for core plugins but for Golang plugins. * Golang plugins do not have thread-local flb_worker_ctx information. */ int flb_output_log_check(struct flb_output_instance *ins, int l) { if (ins->log_level < l) { return FLB_FALSE; } return FLB_TRUE; } /* * Output plugins might have enabled certain features that have not been passed * directly to the upstream context. In order to avoid let plugins validate specific * variables from the instance context like tls, tls.x, keepalive, etc, we populate * them directly through this function. */ int flb_output_upstream_set(struct flb_upstream *u, struct flb_output_instance *ins) { int flags = 0; if (!u) { return -1; } /* TLS */ #ifdef FLB_HAVE_TLS if (ins->use_tls == FLB_TRUE) { flags |= FLB_IO_TLS; } else { flags |= FLB_IO_TCP; } #else flags |= FLB_IO_TCP; #endif /* IPv6 */ if (ins->host.ipv6 == FLB_TRUE) { flags |= FLB_IO_IPV6; } /* keepalive */ if (ins->net_setup.keepalive == FLB_TRUE) { flags |= FLB_IO_TCP_KA; } /* Set flags */ flb_stream_enable_flags(&u->base, flags); flb_upstream_set_total_connections_label(u, flb_output_name(ins)); flb_upstream_set_total_connections_gauge(u, ins->cmt_upstream_total_connections); flb_upstream_set_busy_connections_label(u, flb_output_name(ins)); flb_upstream_set_busy_connections_gauge(u, ins->cmt_upstream_busy_connections); /* * If the output plugin flush callbacks will run in multiple threads, enable * the thread safe mode for the Upstream context. */ if (ins->tp_workers > 0) { flb_stream_enable_thread_safety(&u->base); mk_list_add(&u->base._head, &ins->upstreams); } /* Set networking options 'net.*' received through instance properties */ memcpy(&u->base.net, &ins->net_setup, sizeof(struct flb_net_setup)); return 0; } int flb_output_upstream_ha_set(void *ha, struct flb_output_instance *ins) { struct mk_list *head; struct flb_upstream_node *node; struct flb_upstream_ha *upstream_ha = ha; mk_list_foreach(head, &upstream_ha->nodes) { node = mk_list_entry(head, struct flb_upstream_node, _head); flb_output_upstream_set(node->u, ins); } return 0; } /* * Helper function to set HTTP callbacks using the output instance 'callback' * context. */ int flb_output_set_http_debug_callbacks(struct flb_output_instance *ins) { #ifdef FLB_HAVE_HTTP_CLIENT_DEBUG return flb_http_client_debug_setup(ins->callback, &ins->properties); #else return 0; #endif }