summaryrefslogtreecommitdiffstats
path: root/fluent-bit/src/flb_input.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/src/flb_input.c')
-rw-r--r--fluent-bit/src/flb_input.c1965
1 files changed, 1965 insertions, 0 deletions
diff --git a/fluent-bit/src/flb_input.c b/fluent-bit/src/flb_input.c
new file mode 100644
index 000000000..1c4faad4d
--- /dev/null
+++ b/fluent-bit/src/flb_input.c
@@ -0,0 +1,1965 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdlib.h>
+
+#include <monkey/mk_core.h>
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_str.h>
+#include <fluent-bit/flb_env.h>
+#include <fluent-bit/flb_pipe.h>
+#include <fluent-bit/flb_macros.h>
+#include <fluent-bit/flb_input.h>
+#include <fluent-bit/flb_input_thread.h>
+#include <fluent-bit/flb_error.h>
+#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_plugin_proxy.h>
+#include <fluent-bit/flb_engine.h>
+#include <fluent-bit/flb_metrics.h>
+#include <fluent-bit/flb_storage.h>
+#include <fluent-bit/flb_downstream.h>
+#include <fluent-bit/flb_plugin.h>
+#include <fluent-bit/flb_kv.h>
+#include <fluent-bit/flb_hash_table.h>
+#include <fluent-bit/flb_scheduler.h>
+#include <fluent-bit/flb_ring_buffer.h>
+#include <fluent-bit/flb_processor.h>
+
+/* input plugin macro helpers */
+#include <fluent-bit/flb_input_plugin.h>
+
+#ifdef FLB_HAVE_CHUNK_TRACE
+#include <fluent-bit/flb_chunk_trace.h>
+#endif /* FLB_HAVE_CHUNK_TRACE */
+
+struct flb_libco_in_params libco_in_param;
+pthread_key_t libco_in_param_key;
+
+#define protcmp(a, b) strncasecmp(a, b, strlen(a))
+
+/*
+ * Ring buffer size: we make space for 512 entries that each input instance can
+ * use to enqueue data. Note that this value is fixed and only affect input plugins
+ * which runs in threaded mode (separate thread)
+ *
+ * Ring buffer window: the current window size is set to 5% which means that the
+ * ring buffer will emit a flush request whenever there are 51 records or more
+ * awaiting to be consumed.
+ */
+
+#define FLB_INPUT_RING_BUFFER_SIZE (sizeof(void *) * 1024)
+#define FLB_INPUT_RING_BUFFER_WINDOW (5)
+
+
+static int check_protocol(const char *prot, const char *output)
+{
+ int len;
+
+ len = strlen(prot);
+ if (len != strlen(output)) {
+ return 0;
+ }
+
+ if (protcmp(prot, output) != 0) {
+ return 0;
+ }
+
+ return 1;
+}
+
+static inline int instance_id(struct flb_input_plugin *p,
+ struct flb_config *config) \
+{
+ int c = 0;
+ struct mk_list *head;
+ struct flb_input_instance *entry;
+
+ mk_list_foreach(head, &config->inputs) {
+ entry = mk_list_entry(head, struct flb_input_instance, _head);
+ if (entry->id == c) {
+ c++;
+ }
+ }
+
+ return c;
+}
+
+/* Generate a new collector ID for the instance in question */
+static int collector_id(struct flb_input_instance *ins)
+{
+ int id = 0;
+ struct flb_input_collector *collector;
+
+ if (mk_list_is_empty(&ins->collectors) == 0) {
+ return id;
+ }
+
+ collector = mk_list_entry_last(&ins->collectors,
+ struct flb_input_collector,
+ _head);
+ return (collector->id + 1);
+}
+
+void flb_input_net_default_listener(const char *listen, int port,
+ struct flb_input_instance *ins)
+{
+ /* Set default network configuration */
+ if (!ins->host.listen) {
+ ins->host.listen = flb_sds_create(listen);
+ }
+ if (ins->host.port == 0) {
+ ins->host.port = port;
+ }
+}
+
+/* Check input plugin's log level.
+ * Not for core plugins but for Golang plugins.
+ * Golang plugins do not have thread-local flb_worker_ctx information. */
+int flb_input_log_check(struct flb_input_instance *ins, int l)
+{
+ if (ins->log_level < l) {
+ return FLB_FALSE;
+ }
+
+ return FLB_TRUE;
+}
+
+/* Create an input plugin instance */
+struct flb_input_instance *flb_input_new(struct flb_config *config,
+ const char *input, void *data,
+ int public_only)
+{
+ int id;
+ int ret;
+ int flags = 0;
+ struct mk_list *head;
+ struct flb_input_plugin *plugin;
+ struct flb_input_instance *instance = NULL;
+
+/* use for locking the use of the chunk trace context. */
+#ifdef FLB_HAVE_CHUNK_TRACE
+ pthread_mutexattr_t attr = {0};
+ pthread_mutexattr_init(&attr);
+#endif
+
+ if (!input) {
+ return NULL;
+ }
+
+ mk_list_foreach(head, &config->in_plugins) {
+ plugin = mk_list_entry(head, struct flb_input_plugin, _head);
+ if (!check_protocol(plugin->name, input)) {
+ plugin = NULL;
+ continue;
+ }
+
+ /*
+ * Check if the plugin is private and validate the 'public_only'
+ * requirement.
+ */
+ if (public_only == FLB_TRUE && plugin->flags & FLB_INPUT_PRIVATE) {
+ return NULL;
+ }
+
+ /* Create plugin instance */
+ instance = flb_calloc(1, sizeof(struct flb_input_instance));
+ if (!instance) {
+ flb_errno();
+ return NULL;
+ }
+ instance->config = config;
+
+ /* Get an ID */
+ id = instance_id(plugin, config);
+
+ /* Index for log Chunks (hash table) */
+ instance->ht_log_chunks = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE,
+ 512, 0);
+ if (!instance->ht_log_chunks) {
+ flb_free(instance);
+ return NULL;
+ }
+
+ /* Index for metric Chunks (hash table) */
+ instance->ht_metric_chunks = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE,
+ 512, 0);
+ if (!instance->ht_metric_chunks) {
+ flb_hash_table_destroy(instance->ht_log_chunks);
+ flb_free(instance);
+ return NULL;
+ }
+
+ /* Index for trace Chunks (hash table) */
+ instance->ht_trace_chunks = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE,
+ 512, 0);
+ if (!instance->ht_trace_chunks) {
+ flb_hash_table_destroy(instance->ht_log_chunks);
+ flb_hash_table_destroy(instance->ht_metric_chunks);
+ flb_free(instance);
+ return NULL;
+ }
+
+ /* format name (with instance id) */
+ snprintf(instance->name, sizeof(instance->name) - 1,
+ "%s.%i", plugin->name, id);
+
+ if (plugin->type == FLB_INPUT_PLUGIN_CORE) {
+ instance->context = NULL;
+ }
+ else {
+ struct flb_plugin_proxy_context *ctx;
+
+ ctx = flb_calloc(1, sizeof(struct flb_plugin_proxy_context));
+ if (!ctx) {
+ flb_errno();
+ flb_free(instance);
+ return NULL;
+ }
+
+ ctx->proxy = plugin->proxy;
+
+ instance->context = ctx;
+ }
+
+ /* initialize remaining vars */
+ instance->alias = NULL;
+ instance->id = id;
+ instance->flags = plugin->flags;
+ instance->p = plugin;
+ instance->tag = NULL;
+ instance->tag_len = 0;
+ instance->tag_default = FLB_FALSE;
+ instance->routable = FLB_TRUE;
+ instance->data = data;
+ instance->storage = NULL;
+ instance->storage_type = -1;
+ instance->log_level = -1;
+ instance->log_suppress_interval = -1;
+ instance->runs_in_coroutine = FLB_FALSE;
+
+ /* net */
+ instance->host.name = NULL;
+ instance->host.address = NULL;
+ instance->host.uri = NULL;
+ instance->host.listen = NULL;
+ instance->host.ipv6 = FLB_FALSE;
+
+ /* Initialize list heads */
+ mk_list_init(&instance->routes_direct);
+ mk_list_init(&instance->routes);
+ mk_list_init(&instance->tasks);
+ mk_list_init(&instance->chunks);
+ mk_list_init(&instance->collectors);
+ mk_list_init(&instance->input_coro_list);
+ mk_list_init(&instance->input_coro_list_destroy);
+ mk_list_init(&instance->downstreams);
+ mk_list_init(&instance->upstreams);
+
+ /* Initialize properties list */
+ flb_kv_init(&instance->properties);
+ flb_kv_init(&instance->net_properties);
+
+ /* Plugin use networking */
+ if (plugin->flags & (FLB_INPUT_NET | FLB_INPUT_NET_SERVER)) {
+ ret = flb_net_host_set(plugin->name, &instance->host, input);
+ if (ret != 0) {
+ flb_free(instance);
+ return NULL;
+ }
+ }
+
+/* initialize lock for access to chunk trace context. */
+#ifdef FLB_HAVE_CHUNK_TRACE
+ pthread_mutex_init(&instance->chunk_trace_lock, &attr);
+#endif
+
+ /* Parent plugin flags */
+ flags = instance->flags;
+ if (flags & FLB_IO_TCP) {
+ instance->use_tls = FLB_FALSE;
+ }
+ else if (flags & FLB_IO_TLS) {
+ instance->use_tls = FLB_TRUE;
+ }
+ else if (flags & FLB_IO_OPT_TLS) {
+ /* TLS must be enabled manually in the config */
+ instance->use_tls = FLB_FALSE;
+ instance->flags |= FLB_IO_TLS;
+ }
+
+#ifdef FLB_HAVE_TLS
+ instance->tls = NULL;
+ instance->tls_debug = -1;
+ instance->tls_verify = FLB_TRUE;
+ instance->tls_vhost = NULL;
+ instance->tls_ca_path = NULL;
+ instance->tls_ca_file = NULL;
+ instance->tls_crt_file = NULL;
+ instance->tls_key_file = NULL;
+ instance->tls_key_passwd = NULL;
+#endif
+
+ /* Plugin requires a co-routine context ? */
+ if (plugin->flags & FLB_INPUT_CORO) {
+ instance->runs_in_coroutine = FLB_TRUE;
+ }
+
+ /* Plugin will run in a separate thread ? */
+ if (plugin->flags & FLB_INPUT_THREADED) {
+ instance->is_threaded = FLB_TRUE;
+
+ }
+
+ /* allocate a ring buffer */
+ instance->rb = flb_ring_buffer_create(FLB_INPUT_RING_BUFFER_SIZE);
+ if (!instance->rb) {
+ flb_error("instance %s could not initialize ring buffer",
+ flb_input_name(instance));
+ flb_free(instance);
+ return NULL;
+ }
+
+ instance->mem_buf_status = FLB_INPUT_RUNNING;
+ instance->mem_buf_limit = 0;
+ instance->mem_chunks_size = 0;
+ instance->storage_buf_status = FLB_INPUT_RUNNING;
+ mk_list_add(&instance->_head, &config->inputs);
+
+ /* processor instance */
+ instance->processor = flb_processor_create(config, instance->name, instance, FLB_PLUGIN_INPUT);
+ }
+
+ return instance;
+}
+
+static inline int prop_key_check(const char *key, const char *kv, int k_len)
+{
+ int len;
+
+ len = strlen(key);
+
+ if (strncasecmp(key, kv, k_len) == 0 && len == k_len) {
+ return 0;
+ }
+
+ return -1;
+}
+
+struct flb_input_instance *flb_input_get_instance(struct flb_config *config,
+ int ins_id)
+{
+ struct mk_list *head;
+ struct flb_input_instance *ins;
+
+ mk_list_foreach(head, &config->inputs) {
+ ins = mk_list_entry(head, struct flb_input_instance, _head);
+ if (ins->id == ins_id) {
+ break;
+ }
+ ins = NULL;
+ }
+
+ if (!ins) {
+ return NULL;
+ }
+
+ return ins;
+}
+
+static void flb_input_coro_destroy(struct flb_input_coro *input_coro)
+{
+ flb_debug("[input coro] destroy coro_id=%i", input_coro->id);
+
+ mk_list_del(&input_coro->_head);
+ flb_coro_destroy(input_coro->coro);
+ flb_free(input_coro);
+}
+
+int flb_input_coro_finished(struct flb_config *config, int ins_id)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_input_instance *ins;
+ struct flb_input_coro *input_coro;
+
+ ins = flb_input_get_instance(config, ins_id);
+ if (!ins) {
+ return -1;
+ }
+
+ /* Look for input coroutines that needs to be destroyed */
+ mk_list_foreach_safe(head, tmp, &ins->input_coro_list_destroy) {
+ input_coro = mk_list_entry(head, struct flb_input_coro, _head);
+ flb_input_coro_destroy(input_coro);
+ }
+
+ return 0;
+}
+
+void flb_input_coro_prepare_destroy(struct flb_input_coro *input_coro)
+{
+ struct flb_input_instance *ins = input_coro->ins;
+
+ /* move flb_input_coro from 'input_coro_list' to 'input_coro_list_destroy' */
+ mk_list_del(&input_coro->_head);
+ mk_list_add(&input_coro->_head, &ins->input_coro_list_destroy);
+}
+
+int flb_input_name_exists(const char *name, struct flb_config *config)
+{
+ struct mk_list *head;
+ struct flb_input_instance *ins;
+
+ mk_list_foreach(head, &config->inputs) {
+ ins = mk_list_entry(head, struct flb_input_instance, _head);
+ if (strcmp(ins->name, name) == 0) {
+ return FLB_TRUE;
+ }
+
+ if (ins->alias) {
+ if (strcmp(ins->alias, name) == 0) {
+ return FLB_TRUE;
+ }
+ }
+ }
+
+ return FLB_FALSE;
+}
+
+struct mk_event_loop *flb_input_event_loop_get(struct flb_input_instance *ins)
+{
+ struct flb_input_thread_instance *thi;
+
+ if (flb_input_is_threaded(ins)) {
+ thi = ins->thi;
+ return thi->evl;
+ }
+
+ return ins->config->evl;
+}
+
+/* Override a configuration property for the given input_instance plugin */
+int flb_input_set_property(struct flb_input_instance *ins,
+ const char *k, const char *v)
+{
+ int len;
+ int ret;
+ int enabled;
+ ssize_t limit;
+ flb_sds_t tmp = NULL;
+ struct flb_kv *kv;
+
+ len = strlen(k);
+ tmp = flb_env_var_translate(ins->config->env, v);
+ if (tmp) {
+ if (flb_sds_len(tmp) == 0) {
+ flb_sds_destroy(tmp);
+ tmp = NULL;
+ }
+ }
+
+ /* Check if the key is a known/shared property */
+ if (prop_key_check("tag", k, len) == 0 && tmp) {
+ flb_utils_set_plugin_string_property("tag", &ins->tag, tmp);
+ ins->tag_len = flb_sds_len(tmp);
+ ins->tag_default = FLB_FALSE;
+ }
+ else if (prop_key_check("log_level", k, len) == 0 && tmp) {
+ ret = flb_log_get_level_str(tmp);
+ flb_sds_destroy(tmp);
+ if (ret == -1) {
+ return -1;
+ }
+ ins->log_level = ret;
+ }
+ else if (prop_key_check("log_suppress_interval", k, len) == 0 && tmp) {
+ ret = flb_utils_time_to_seconds(tmp);
+ flb_sds_destroy(tmp);
+ if (ret == -1) {
+ return -1;
+ }
+ ins->log_suppress_interval = ret;
+ }
+ else if (prop_key_check("routable", k, len) == 0 && tmp) {
+ ins->routable = flb_utils_bool(tmp);
+ flb_sds_destroy(tmp);
+ }
+ else if (prop_key_check("alias", k, len) == 0 && tmp) {
+ flb_utils_set_plugin_string_property("alias", &ins->alias, tmp);
+ }
+ else if (prop_key_check("mem_buf_limit", k, len) == 0 && tmp) {
+ limit = flb_utils_size_to_bytes(tmp);
+ flb_sds_destroy(tmp);
+ if (limit == -1) {
+ return -1;
+ }
+ ins->mem_buf_limit = (size_t) limit;
+ }
+ else if (prop_key_check("listen", k, len) == 0) {
+ flb_utils_set_plugin_string_property("listen", &ins->host.listen, tmp);
+ }
+ else if (prop_key_check("host", k, len) == 0) {
+ flb_utils_set_plugin_string_property("host", &ins->host.name, tmp);
+ }
+ else if (prop_key_check("port", k, len) == 0) {
+ if (tmp) {
+ ins->host.port = atoi(tmp);
+ flb_sds_destroy(tmp);
+ }
+ }
+ else if (prop_key_check("ipv6", k, len) == 0 && tmp) {
+ ins->host.ipv6 = flb_utils_bool(tmp);
+ flb_sds_destroy(tmp);
+ }
+ else if (strncasecmp("net.", k, 4) == 0 && tmp) {
+ kv = flb_kv_item_create(&ins->net_properties, (char *) k, NULL);
+ if (!kv) {
+ if (tmp) {
+ flb_sds_destroy(tmp);
+ }
+ return -1;
+ }
+ kv->val = tmp;
+ }
+
+#ifdef FLB_HAVE_TLS
+ else if (prop_key_check("tls", k, len) == 0 && tmp) {
+ ins->use_tls = flb_utils_bool(tmp);
+ if (ins->use_tls == FLB_TRUE && ((ins->flags & FLB_IO_TLS) == 0)) {
+ flb_error("[config] %s does not support TLS", ins->name);
+ flb_sds_destroy(tmp);
+ return -1;
+ }
+ flb_sds_destroy(tmp);
+ }
+ else if (prop_key_check("tls.verify", k, len) == 0 && tmp) {
+ ins->tls_verify = flb_utils_bool(tmp);
+ flb_sds_destroy(tmp);
+ }
+ else if (prop_key_check("tls.debug", k, len) == 0 && tmp) {
+ ins->tls_debug = atoi(tmp);
+ flb_sds_destroy(tmp);
+ }
+ else if (prop_key_check("tls.vhost", k, len) == 0) {
+ flb_utils_set_plugin_string_property("tls.vhost", &ins->tls_vhost, tmp);
+ }
+ else if (prop_key_check("tls.ca_path", k, len) == 0) {
+ flb_utils_set_plugin_string_property("tls.ca_path", &ins->tls_ca_path, tmp);
+ }
+ else if (prop_key_check("tls.ca_file", k, len) == 0) {
+ flb_utils_set_plugin_string_property("tls.ca_file", &ins->tls_ca_file, tmp);
+ }
+ else if (prop_key_check("tls.crt_file", k, len) == 0) {
+ flb_utils_set_plugin_string_property("tls.crt_file", &ins->tls_crt_file, tmp);
+ }
+ else if (prop_key_check("tls.key_file", k, len) == 0) {
+ flb_utils_set_plugin_string_property("tls.key_file", &ins->tls_key_file, tmp);
+ }
+ else if (prop_key_check("tls.key_passwd", k, len) == 0) {
+ flb_utils_set_plugin_string_property("tls.key_passwd", &ins->tls_key_passwd, tmp);
+ }
+#endif
+ else if (prop_key_check("storage.type", k, len) == 0 && tmp) {
+ /* Set the storage type */
+ if (strcasecmp(tmp, "filesystem") == 0) {
+ ins->storage_type = FLB_STORAGE_FS;
+ }
+ else if (strcasecmp(tmp, "memory") == 0) {
+ ins->storage_type = FLB_STORAGE_MEM;
+ }
+ else if (strcasecmp(tmp, "memrb") == 0) {
+ ins->storage_type = FLB_STORAGE_MEMRB;
+ }
+ else {
+ flb_sds_destroy(tmp);
+ return -1;
+ }
+ flb_sds_destroy(tmp);
+ }
+ else if (prop_key_check("threaded", k, len) == 0 && tmp) {
+ enabled = flb_utils_bool(tmp);
+ flb_sds_destroy(tmp);
+
+ if (enabled == -1) {
+ return -1;
+ }
+
+ ins->is_threaded = enabled;
+ }
+ else if (prop_key_check("storage.pause_on_chunks_overlimit", k, len) == 0 && tmp) {
+ if (ins->storage_type == FLB_STORAGE_FS) {
+ ret = flb_utils_bool(tmp);
+ flb_sds_destroy(tmp);
+ if (ret == -1) {
+ return -1;
+ }
+ ins->storage_pause_on_chunks_overlimit = ret;
+ }
+ }
+ else {
+ /*
+ * Create the property, we don't pass the value since we will
+ * map it directly to avoid an extra memory allocation.
+ */
+ kv = flb_kv_item_create(&ins->properties, (char *) k, NULL);
+ if (!kv) {
+ if (tmp) {
+ flb_sds_destroy(tmp);
+ }
+ return -1;
+ }
+ kv->val = tmp;
+ }
+
+ return 0;
+}
+
+const char *flb_input_get_property(const char *key,
+ struct flb_input_instance *ins)
+{
+ return flb_config_prop_get(key, &ins->properties);
+}
+
+#ifdef FLB_HAVE_METRICS
+void *flb_input_get_cmt_instance(struct flb_input_instance *ins)
+{
+ return (void *)ins->cmt;
+}
+#endif
+
+/* Return an instance name or alias */
+const char *flb_input_name(struct flb_input_instance *ins)
+{
+ if (ins->alias) {
+ return ins->alias;
+ }
+
+ return ins->name;
+}
+
+void flb_input_instance_destroy(struct flb_input_instance *ins)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_input_collector *collector;
+
+ if (ins->alias) {
+ flb_sds_destroy(ins->alias);
+ }
+
+ /* Remove URI context */
+ if (ins->host.uri) {
+ flb_uri_destroy(ins->host.uri);
+ }
+
+ if (ins->host.name) {
+ flb_sds_destroy(ins->host.name);
+ }
+ if (ins->host.address) {
+ flb_sds_destroy(ins->host.address);
+ }
+ if (ins->host.listen) {
+ flb_sds_destroy(ins->host.listen);
+ }
+
+#ifdef FLB_HAVE_TLS
+ if (ins->use_tls) {
+ if (ins->tls != NULL) {
+ flb_tls_destroy(ins->tls);
+ }
+ }
+
+ if (ins->tls_config_map) {
+ flb_config_map_destroy(ins->tls_config_map);
+ }
+#endif
+
+ if (ins->tls_vhost) {
+ flb_sds_destroy(ins->tls_vhost);
+ }
+
+ if (ins->tls_ca_path) {
+ flb_sds_destroy(ins->tls_ca_path);
+ }
+
+ if (ins->tls_ca_file) {
+ flb_sds_destroy(ins->tls_ca_file);
+ }
+
+ if (ins->tls_crt_file) {
+ flb_sds_destroy(ins->tls_crt_file);
+ }
+
+ if (ins->tls_key_file) {
+ flb_sds_destroy(ins->tls_key_file);
+ }
+
+ if (ins->tls_key_passwd) {
+ flb_sds_destroy(ins->tls_key_passwd);
+ }
+
+ /* release the tag if any */
+ flb_sds_destroy(ins->tag);
+
+ /* Let the engine remove any pending task */
+ flb_engine_destroy_tasks(&ins->tasks);
+
+ /* release properties */
+ flb_kv_release(&ins->properties);
+ flb_kv_release(&ins->net_properties);
+
+
+#ifdef FLB_HAVE_CHUNK_TRACE
+ flb_chunk_trace_context_destroy(ins);
+#endif /* FLB_HAVE_CHUNK_TRACE */
+
+ /* Remove metrics */
+#ifdef FLB_HAVE_METRICS
+ if (ins->cmt) {
+ cmt_destroy(ins->cmt);
+ }
+
+ if (ins->metrics) {
+ flb_metrics_destroy(ins->metrics);
+ }
+#endif
+
+ if (ins->storage) {
+ flb_storage_input_destroy(ins);
+ }
+
+ /* destroy config map */
+ if (ins->config_map) {
+ flb_config_map_destroy(ins->config_map);
+ }
+
+ if (ins->net_config_map) {
+ flb_config_map_destroy(ins->net_config_map);
+ }
+
+ /* hash table for chunks */
+ if (ins->ht_log_chunks) {
+ flb_hash_table_destroy(ins->ht_log_chunks);
+ }
+
+ if (ins->ht_metric_chunks) {
+ flb_hash_table_destroy(ins->ht_metric_chunks);
+ }
+
+ if (ins->ht_trace_chunks) {
+ flb_hash_table_destroy(ins->ht_trace_chunks);
+ }
+
+ if (ins->ch_events[0] > 0) {
+ mk_event_closesocket(ins->ch_events[0]);
+ }
+
+ if (ins->ch_events[1] > 0) {
+ mk_event_closesocket(ins->ch_events[1]);
+ }
+
+ /* Collectors */
+ mk_list_foreach_safe(head, tmp, &ins->collectors) {
+ collector = mk_list_entry(head, struct flb_input_collector, _head);
+ mk_list_del(&collector->_head);
+ flb_input_collector_destroy(collector);
+ }
+
+ /* delete storage context */
+ flb_storage_input_destroy(ins);
+
+ mk_list_del(&ins->_head);
+
+ /* ring buffer */
+ if (ins->rb) {
+ flb_input_chunk_ring_buffer_cleanup(ins);
+ flb_ring_buffer_destroy(ins->rb);
+ }
+
+ /* processor */
+ if (ins->processor) {
+ flb_processor_destroy(ins->processor);
+ }
+ flb_free(ins);
+}
+
+int flb_input_coro_id_get(struct flb_input_instance *ins)
+{
+ int id;
+ int max = (2 << 13) - 1; /* max for 14 bits */
+
+ id = ins->input_coro_id;
+ ins->input_coro_id++;
+
+ /* reset once it reach the maximum allowed */
+ if (ins->input_coro_id > max) {
+ ins->input_coro_id = 0;
+ }
+
+ return id;
+}
+
+static int input_instance_channel_events_init(struct flb_input_instance *ins)
+{
+ int ret;
+ struct mk_event_loop *evl;
+
+ evl = flb_input_event_loop_get(ins);
+
+ /* Input event channel: used for co-routines to report return status */
+ ret = mk_event_channel_create(evl,
+ &ins->ch_events[0],
+ &ins->ch_events[1],
+ ins);
+ if (ret != 0) {
+ flb_error("could not create events channels for '%s'",
+ flb_input_name(ins));
+ return -1;
+ }
+
+ flb_debug("[%s:%s] created event channels: read=%i write=%i",
+ ins->p->name, flb_input_name(ins),
+ ins->ch_events[0], ins->ch_events[1]);
+
+ /*
+ * Note: mk_event_channel_create() sets a type = MK_EVENT_NOTIFICATION by
+ * default, we need to overwrite this value so we can do a clean check
+ * into the Engine when the event is triggered.
+ */
+ ins->event.type = FLB_ENGINE_EV_INPUT;
+
+ return 0;
+}
+
+int flb_input_net_property_check(struct flb_input_instance *ins,
+ struct flb_config *config)
+{
+ int ret = 0;
+
+ /* Get Downstream net_setup configmap */
+ ins->net_config_map = flb_downstream_get_config_map(config);
+ if (!ins->net_config_map) {
+ flb_input_instance_destroy(ins);
+ return -1;
+ }
+
+ /*
+ * Validate 'net.*' properties: if the plugin use the Downstream interface,
+ * it might receive some networking settings.
+ */
+ if (mk_list_size(&ins->net_properties) > 0) {
+ ret = flb_config_map_properties_check(ins->p->name,
+ &ins->net_properties,
+ ins->net_config_map);
+ if (ret == -1) {
+ if (config->program_name) {
+ flb_helper("try the command: %s -i %s -h\n",
+ config->program_name, ins->p->name);
+ }
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+int flb_input_plugin_property_check(struct flb_input_instance *ins,
+ struct flb_config *config)
+{
+ int ret = 0;
+ struct mk_list *config_map;
+ struct flb_input_plugin *p = ins->p;
+
+ if (p->config_map) {
+ /*
+ * Create a dynamic version of the configmap that will be used by the specific
+ * instance in question.
+ */
+ config_map = flb_config_map_create(config, p->config_map);
+ if (!config_map) {
+ flb_error("[input] error loading config map for '%s' plugin",
+ p->name);
+ flb_input_instance_destroy(ins);
+ return -1;
+ }
+ ins->config_map = config_map;
+
+ /* Validate incoming properties against config map */
+ ret = flb_config_map_properties_check(ins->p->name,
+ &ins->properties, ins->config_map);
+ if (ret == -1) {
+ if (config->program_name) {
+ flb_helper("try the command: %s -i %s -h\n",
+ config->program_name, ins->p->name);
+ }
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+int flb_input_instance_init(struct flb_input_instance *ins,
+ struct flb_config *config)
+{
+ int ret;
+ struct flb_config *ctx = ins->config;
+ struct flb_input_plugin *p = ins->p;
+ int tls_session_mode;
+
+ if (ins->log_level == -1 && config->log != NULL) {
+ ins->log_level = config->log->level;
+ }
+
+ /* Skip pseudo input plugins */
+ if (!p) {
+ return 0;
+ }
+
+
+#ifdef FLB_HAVE_METRICS
+ uint64_t ts;
+ char *name;
+
+ name = (char *) flb_input_name(ins);
+ ts = cfl_time_now();
+
+ /* CMetrics */
+ ins->cmt = cmt_create();
+ if (!ins->cmt) {
+ flb_error("[input] could not create cmetrics context: %s",
+ flb_input_name(ins));
+ return -1;
+ }
+
+ /*
+ * Register generic input plugin metrics
+ * -------------------------------------
+ */
+
+ /* fluentbit_input_bytes_total */
+ ins->cmt_bytes = \
+ cmt_counter_create(ins->cmt,
+ "fluentbit", "input", "bytes_total",
+ "Number of input bytes.",
+ 1, (char *[]) {"name"});
+ cmt_counter_set(ins->cmt_bytes, ts, 0, 1, (char *[]) {name});
+
+ /* fluentbit_input_records_total */
+ ins->cmt_records = \
+ cmt_counter_create(ins->cmt,
+ "fluentbit", "input", "records_total",
+ "Number of input records.",
+ 1, (char *[]) {"name"});
+ cmt_counter_set(ins->cmt_records, ts, 0, 1, (char *[]) {name});
+
+ /* Storage Metrics */
+ if (ctx->storage_metrics == FLB_TRUE) {
+ /* fluentbit_input_storage_overlimit */
+ ins->cmt_storage_overlimit = \
+ cmt_gauge_create(ins->cmt,
+ "fluentbit", "input",
+ "storage_overlimit",
+ "Is the input memory usage overlimit ?.",
+ 1, (char *[]) {"name"});
+ cmt_gauge_set(ins->cmt_storage_overlimit, ts, 0, 1, (char *[]) {name});
+
+ /* fluentbit_input_storage_memory_bytes */
+ ins->cmt_storage_memory_bytes = \
+ cmt_gauge_create(ins->cmt,
+ "fluentbit", "input",
+ "storage_memory_bytes",
+ "Memory bytes used by the chunks.",
+ 1, (char *[]) {"name"});
+ cmt_gauge_set(ins->cmt_storage_memory_bytes, ts, 0, 1, (char *[]) {name});
+
+ /* fluentbit_input_storage_chunks */
+ ins->cmt_storage_chunks = \
+ cmt_gauge_create(ins->cmt,
+ "fluentbit", "input",
+ "storage_chunks",
+ "Total number of chunks.",
+ 1, (char *[]) {"name"});
+ cmt_gauge_set(ins->cmt_storage_chunks, ts, 0, 1, (char *[]) {name});
+
+ /* fluentbit_input_storage_chunks_up */
+ ins->cmt_storage_chunks_up = \
+ cmt_gauge_create(ins->cmt,
+ "fluentbit", "input",
+ "storage_chunks_up",
+ "Total number of chunks up in memory.",
+ 1, (char *[]) {"name"});
+ cmt_gauge_set(ins->cmt_storage_chunks_up, ts, 0, 1, (char *[]) {name});
+
+ /* fluentbit_input_storage_chunks_down */
+ ins->cmt_storage_chunks_down = \
+ cmt_gauge_create(ins->cmt,
+ "fluentbit", "input",
+ "storage_chunks_down",
+ "Total number of chunks down.",
+ 1, (char *[]) {"name"});
+ cmt_gauge_set(ins->cmt_storage_chunks_down, ts, 0, 1, (char *[]) {name});
+
+ /* fluentbit_input_storage_chunks_busy */
+ ins->cmt_storage_chunks_busy = \
+ cmt_gauge_create(ins->cmt,
+ "fluentbit", "input",
+ "storage_chunks_busy",
+ "Total number of chunks in a busy state.",
+ 1, (char *[]) {"name"});
+ cmt_gauge_set(ins->cmt_storage_chunks_busy, ts, 0, 1, (char *[]) {name});
+
+ /* fluentbit_input_storage_chunks_busy_bytes */
+ ins->cmt_storage_chunks_busy_bytes = \
+ cmt_gauge_create(ins->cmt,
+ "fluentbit", "input",
+ "storage_chunks_busy_bytes",
+ "Total number of bytes used by chunks in a busy state.",
+ 1, (char *[]) {"name"});
+ cmt_gauge_set(ins->cmt_storage_chunks_busy_bytes, ts, 0, 1, (char *[]) {name});
+ }
+
+ if (ins->storage_type == FLB_STORAGE_MEMRB) {
+ /* fluentbit_input_memrb_dropped_chunks */
+ ins->cmt_memrb_dropped_chunks = cmt_counter_create(ins->cmt,
+ "fluentbit", "input",
+ "memrb_dropped_chunks",
+ "Number of memrb dropped chunks.",
+ 1, (char *[]) {"name"});
+ cmt_counter_set(ins->cmt_memrb_dropped_chunks, ts, 0, 1, (char *[]) {name});
+
+
+ /* fluentbit_input_memrb_dropped_bytes */
+ ins->cmt_memrb_dropped_bytes = cmt_counter_create(ins->cmt,
+ "fluentbit", "input",
+ "memrb_dropped_bytes",
+ "Number of memrb dropped bytes.",
+ 1, (char *[]) {"name"});
+
+ cmt_counter_set(ins->cmt_memrb_dropped_bytes, ts, 0, 1, (char *[]) {name});
+ }
+
+ /* OLD Metrics */
+ ins->metrics = flb_metrics_create(name);
+ if (ins->metrics) {
+ flb_metrics_add(FLB_METRIC_N_RECORDS, "records", ins->metrics);
+ flb_metrics_add(FLB_METRIC_N_BYTES, "bytes", ins->metrics);
+ }
+#endif
+
+ /*
+ * Before to call the initialization callback, make sure that the received
+ * configuration parameters are valid if the plugin is registering a config map.
+ */
+ if (flb_input_plugin_property_check(ins, config) == -1) {
+ return -1;
+ }
+
+#ifdef FLB_HAVE_TLS
+ if (ins->use_tls == FLB_TRUE) {
+ if ((p->flags & FLB_INPUT_NET_SERVER) != 0) {
+ if (ins->tls_crt_file == NULL) {
+ flb_error("[input %s] error initializing TLS context "
+ "(certificate file missing)",
+ ins->name);
+
+ return -1;
+ }
+ else if (ins->tls_key_file == NULL) {
+ flb_error("[input %s] error initializing TLS context "
+ "(private key file missing)",
+ ins->name);
+
+ return -1;
+ }
+
+ tls_session_mode = FLB_TLS_SERVER_MODE;
+ }
+ else {
+ tls_session_mode = FLB_TLS_CLIENT_MODE;
+ }
+
+ ins->tls = flb_tls_create(tls_session_mode,
+ ins->tls_verify,
+ ins->tls_debug,
+ ins->tls_vhost,
+ ins->tls_ca_path,
+ ins->tls_ca_file,
+ ins->tls_crt_file,
+ ins->tls_key_file,
+ ins->tls_key_passwd);
+
+ if (ins->tls == NULL) {
+ flb_error("[input %s] error initializing TLS context",
+ ins->name);
+
+ return -1;
+ }
+ }
+
+ struct flb_config_map *m;
+
+ /* TLS config map (just for 'help' formatting purposes) */
+ ins->tls_config_map = flb_tls_get_config_map(config);
+
+ if (ins->tls_config_map == NULL) {
+ return -1;
+ }
+
+ /* Override first configmap value based on it plugin flag */
+ m = mk_list_entry_first(ins->tls_config_map, struct flb_config_map, _head);
+ if (p->flags & FLB_IO_TLS) {
+ m->value.val.boolean = FLB_TRUE;
+ }
+ else {
+ m->value.val.boolean = FLB_FALSE;
+ }
+#endif
+
+ /* Init network defaults */
+ flb_net_setup_init(&ins->net_setup);
+
+ if (flb_input_net_property_check(ins, config) == -1) {
+ return -1;
+ }
+
+ /* Initialize the input */
+ if (p->cb_init) {
+ flb_plg_info(ins, "initializing");
+ flb_plg_info(ins, "storage_strategy=%s", flb_storage_get_type(ins->storage_type));
+
+ /* Sanity check: all non-dynamic tag input plugins must have a tag */
+ if (!ins->tag) {
+ flb_input_set_property(ins, "tag", ins->name);
+ ins->tag_default = FLB_TRUE;
+ }
+
+ if (flb_input_is_threaded(ins)) {
+ /*
+ * Create a thread for a new instance. Now the plugin initialization callback will be invoked and report an early failure
+ * or an 'ok' status, we will wait for that return value on flb_input_thread_instance_get_status() below.
+ */
+ ret = flb_input_thread_instance_init(config, ins);
+ if (ret != 0) {
+ flb_error("failed initialize input %s",
+ ins->name);
+ return -1;
+ }
+
+ /* initialize channel events */
+ ret = input_instance_channel_events_init(ins);
+ if (ret != 0) {
+ flb_error("failed initialize channel events on input %s",
+ ins->name);
+ return -1;
+ }
+
+ /* register the ring buffer */
+ ret = flb_ring_buffer_add_event_loop(ins->rb, config->evl, FLB_INPUT_RING_BUFFER_WINDOW);
+ if (ret) {
+ flb_error("failed while registering ring buffer events on input %s",
+ ins->name);
+ return -1;
+ }
+ }
+ else {
+ /* initialize channel events */
+ ret = input_instance_channel_events_init(ins);
+ if (ret != 0) {
+ flb_error("failed initialize channel events on input %s",
+ ins->name);
+ }
+ ret = p->cb_init(ins, config, ins->data);
+ if (ret != 0) {
+ flb_error("failed initialize input %s",
+ ins->name);
+ return -1;
+ }
+ }
+ }
+
+ /* initialize processors */
+ ret = flb_processor_init(ins->processor);
+ if (ret == -1) {
+ return -1;
+ }
+
+ return 0;
+}
+
+int flb_input_instance_pre_run(struct flb_input_instance *ins, struct flb_config *config)
+{
+ int ret;
+
+ if (flb_input_is_threaded(ins)) {
+ return flb_input_thread_instance_pre_run(config, ins);
+ }
+ else if (ins->p->cb_pre_run) {
+ ret = ins->p->cb_pre_run(ins, config, ins->context);
+ if (ret == -1) {
+ return -1;
+ }
+ return 0;
+ }
+
+ return 0;
+}
+
+/* Initialize all inputs */
+int flb_input_init_all(struct flb_config *config)
+{
+ int ret;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_input_instance *ins;
+ struct flb_input_plugin *p;
+
+ /* Initialize thread-id table */
+ memset(&config->in_table_id, '\0', sizeof(config->in_table_id));
+
+ /* Iterate all active input instance plugins */
+ mk_list_foreach_safe(head, tmp, &config->inputs) {
+ ins = mk_list_entry(head, struct flb_input_instance, _head);
+ p = ins->p;
+
+ /* Skip pseudo input plugins */
+ if (!p) {
+ continue;
+ }
+
+ /* Initialize instance */
+ ret = flb_input_instance_init(ins, config);
+ if (ret == -1) {
+ flb_input_instance_destroy(ins);
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+/* Invoke all pre-run input callbacks */
+void flb_input_pre_run_all(struct flb_config *config)
+{
+ struct mk_list *head;
+ struct flb_input_instance *ins;
+ struct flb_input_plugin *p;
+
+ mk_list_foreach(head, &config->inputs) {
+ ins = mk_list_entry(head, struct flb_input_instance, _head);
+ p = ins->p;
+ if (!p) {
+ continue;
+ }
+
+ flb_input_instance_pre_run(ins, config);
+ }
+}
+
+void flb_input_instance_exit(struct flb_input_instance *ins,
+ struct flb_config *config)
+{
+ struct flb_input_plugin *p;
+
+ /* if the instance runs in a separate thread, signal the thread */
+ if (flb_input_is_threaded(ins)) {
+ flb_input_thread_instance_exit(ins);
+ return;
+ }
+
+ p = ins->p;
+ if (p->cb_exit && ins->context) {
+ /* Multi-threaded input plugins use the same function signature for exit callbacks. */
+ p->cb_exit(ins->context, config);
+ }
+}
+
+/* Invoke all exit input callbacks */
+void flb_input_exit_all(struct flb_config *config)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_input_instance *ins;
+ struct flb_input_plugin *p;
+
+ /* Iterate instances */
+ mk_list_foreach_safe_r(head, tmp, &config->inputs) {
+ ins = mk_list_entry(head, struct flb_input_instance, _head);
+ p = ins->p;
+ if (!p) {
+ continue;
+ }
+
+ /* invoke plugin instance exit callback */
+ flb_input_instance_exit(ins, config);
+
+ /* destroy the instance */
+ flb_input_instance_destroy(ins);
+ }
+}
+
+/* Check that at least one Input is enabled */
+int flb_input_check(struct flb_config *config)
+{
+ if (mk_list_is_empty(&config->inputs) == 0) {
+ return -1;
+ }
+
+ return 0;
+}
+
+/*
+ * API for Input plugins
+ * =====================
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ * The Input interface provides a certain number of functions that can be
+ * used by Input plugins to configure it own behavior and request specific
+ *
+ * 1. flb_input_set_context()
+ *
+ * let an Input plugin set a context data reference that can be used
+ * later when invoking other callbacks.
+ *
+ * 2. flb_input_set_collector_time()
+ *
+ * request the Engine to trigger a specific collector callback at a
+ * certain interval time. Note that this callback will run in the main
+ * thread so it computing time must be short, otherwise it will block
+ * the main loop.
+ *
+ * The collector can runs in timeouts of the order of seconds.nanoseconds
+ *
+ * note: 1 Second = 1000000000 Nanosecond
+ *
+ * 3. flb_input_set_collector_event()
+ *
+ * for a registered file descriptor, associate the READ events to a
+ * specified plugin. Every time there is some data to read, the collector
+ * callback will be triggered. Oriented to a file descriptor that already
+ * have information that may be read through iotctl(..FIONREAD..);
+ *
+ * 4. flb_input_set_collector_server()
+ *
+ * it register a collector based on TCP socket events. It register a socket
+ * who did bind() and listen() and for each event on the socket it triggers
+ * the registered callbacks.
+ */
+
+/* Assign an Configuration context to an Input */
+void flb_input_set_context(struct flb_input_instance *in, void *context)
+{
+ in->context = context;
+}
+
+int flb_input_channel_init(struct flb_input_instance *in)
+{
+ return flb_pipe_create(in->channel);
+}
+
+static struct flb_input_collector *collector_create(int type,
+ struct flb_input_instance *ins,
+ int (*cb) (
+ struct flb_input_instance *,
+ struct flb_config *, void *),
+ struct flb_config *config)
+{
+ struct flb_input_collector *coll;
+ struct flb_input_thread_instance *thi;
+
+ coll = flb_calloc(1, sizeof(struct flb_input_collector));
+ if (!coll) {
+ flb_errno();
+ return NULL;
+ }
+
+ coll->id = collector_id(ins);
+ coll->type = type;
+ coll->running = FLB_FALSE;
+ coll->fd_event = -1;
+ coll->fd_timer = -1;
+ coll->seconds = -1;
+ coll->nanoseconds = -1;
+ coll->cb_collect = cb;
+ coll->instance = ins;
+ MK_EVENT_ZERO(&coll->event);
+
+ if (flb_input_is_threaded(ins)) {
+ thi = ins->thi;
+ coll->evl = thi->evl;
+ }
+ else {
+ coll->evl = config->evl;
+ }
+
+ /*
+ * Collectors created from a threaded input instance are only added to the
+ * instance `collectors` list. For instances in non-threaded mode, they are
+ * added to both lists, the global config collectors list and the instance
+ * list.
+ */
+ mk_list_add(&coll->_head, &ins->collectors);
+
+ return coll;
+}
+
+
+int flb_input_set_collector_time(struct flb_input_instance *ins,
+ int (*cb_collect) (struct flb_input_instance *,
+ struct flb_config *, void *),
+ time_t seconds,
+ long nanoseconds,
+ struct flb_config *config)
+{
+ struct flb_input_collector *coll;
+
+ coll = collector_create(FLB_COLLECT_TIME, ins, cb_collect, config);
+ if (!coll) {
+ return -1;
+ }
+
+ /* specific collector initialization */
+ coll->seconds = seconds;
+ coll->nanoseconds = nanoseconds;
+
+ return coll->id;
+}
+
+int flb_input_set_collector_event(struct flb_input_instance *ins,
+ int (*cb_collect) (struct flb_input_instance *,
+ struct flb_config *, void *),
+ flb_pipefd_t fd,
+ struct flb_config *config)
+{
+ struct flb_input_collector *coll;
+
+ coll = collector_create(FLB_COLLECT_FD_EVENT, ins, cb_collect, config);
+ if (!coll) {
+ return -1;
+ }
+
+ /* specific collector initialization */
+ coll->fd_event = fd;
+
+ return coll->id;
+}
+
+int flb_input_set_collector_socket(struct flb_input_instance *ins,
+ int (*cb_new_connection) (struct flb_input_instance *,
+ struct flb_config *,
+ void *),
+ flb_pipefd_t fd,
+ struct flb_config *config)
+{
+ struct flb_input_collector *coll;
+
+
+ coll = collector_create(FLB_COLLECT_FD_SERVER, ins, cb_new_connection, config);
+ if (!coll) {
+ return -1;
+ }
+
+ /* specific collector initialization */
+ coll->fd_event = fd;
+
+ return coll->id;
+}
+
+
+static int collector_start(struct flb_input_collector *coll,
+ struct flb_config *config)
+{
+ int fd;
+ int ret;
+ struct mk_event *event;
+
+ if (coll->running == FLB_TRUE) {
+ return 0;
+ }
+
+ event = &coll->event;
+ event->mask = MK_EVENT_EMPTY;
+ event->status = MK_EVENT_NONE;
+
+ if (coll->type == FLB_COLLECT_TIME) {
+ fd = mk_event_timeout_create(coll->evl, coll->seconds,
+ coll->nanoseconds, event);
+ if (fd == -1) {
+ flb_error("[input collector] COLLECT_TIME registration failed");
+ coll->running = FLB_FALSE;
+ return -1;
+ }
+ coll->fd_timer = fd;
+ }
+ else if (coll->type & (FLB_COLLECT_FD_EVENT | FLB_COLLECT_FD_SERVER)) {
+ event->fd = coll->fd_event;
+ ret = mk_event_add(coll->evl,
+ coll->fd_event,
+ FLB_ENGINE_EV_CORE,
+ MK_EVENT_READ, event);
+ if (ret == -1) {
+ flb_error("[input collector] COLLECT_EVENT registration failed");
+ mk_event_closesocket(coll->fd_event);
+ coll->running = FLB_FALSE;
+ return -1;
+ }
+ }
+
+ coll->running = FLB_TRUE;
+ return 0;
+}
+
+int flb_input_collector_start(int coll_id, struct flb_input_instance *in)
+{
+ int ret;
+ struct mk_list *head;
+ struct flb_input_collector *coll;
+
+ mk_list_foreach(head, &in->collectors) {
+ coll = mk_list_entry(head, struct flb_input_collector, _head);
+ if (coll->id == coll_id) {
+ ret = collector_start(coll, in->config);
+ if (ret == -1) {
+ flb_error("[input] error starting collector #%i: %s",
+ coll_id, in->name);
+ }
+ return ret;
+ }
+ }
+
+ return -1;
+}
+
+/* start collectors for main thread, no threaded plugins */
+int flb_input_collectors_signal_start(struct flb_input_instance *ins)
+{
+ int ret;
+ struct mk_list *head;
+ struct flb_input_collector *coll;
+
+ if (flb_input_is_threaded(ins)) {
+ flb_error("input plugin '%s' is threaded", flb_input_name(ins));
+ return -1;
+ }
+
+ mk_list_foreach(head, &ins->collectors) {
+ coll = mk_list_entry(head, struct flb_input_collector, _head);
+ ret = flb_input_collector_start(coll->id, ins);
+ if (ret < 0) {
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+/*
+ * Start all collectors: this function is invoked from the engine interface and aim
+ * to start the local collectors and also signal the threaded input plugins to start
+ * their own collectors.
+ */
+int flb_input_collectors_start(struct flb_config *config)
+{
+ int ret;
+ struct mk_list *head;
+ struct flb_input_instance *ins;
+
+ /* Signal threaded input plugins to start their collectors */
+ mk_list_foreach(head, &config->inputs) {
+ ins = mk_list_entry(head, struct flb_input_instance, _head);
+ if (flb_input_is_threaded(ins)) {
+ ret = flb_input_thread_collectors_signal_start(ins);
+ if (ret != 0) {
+ flb_error("could not start collectors for threaded plugin '%s'",
+ flb_input_name(ins));
+ }
+ }
+ else {
+ ret = flb_input_collectors_signal_start(ins);
+ if (ret != 0) {
+ flb_error("could not start collectors for plugin '%s'",
+ flb_input_name(ins));
+ }
+ }
+ }
+
+ return 0;
+}
+
+static struct flb_input_collector *get_collector(int id,
+ struct flb_input_instance *in)
+{
+ struct mk_list *head;
+ struct flb_input_collector *coll;
+
+ mk_list_foreach(head, &in->collectors) {
+ coll = mk_list_entry(head, struct flb_input_collector, _head);
+ if (coll->id == id) {
+ return coll;
+ }
+ }
+
+ return NULL;
+}
+
+int flb_input_collector_running(int coll_id, struct flb_input_instance *in)
+{
+ struct flb_input_collector *coll;
+
+ coll = get_collector(coll_id, in);
+ if (!coll) {
+ return FLB_FALSE;
+ }
+
+ return coll->running;
+}
+
+struct mk_event *flb_input_collector_get_event(int coll_id,
+ struct flb_input_instance *ins)
+{
+ struct flb_input_collector *collector;
+
+ collector = get_collector(coll_id, ins);
+
+ if (collector == NULL) {
+ return NULL;
+ }
+
+ return &collector->event;
+}
+
+/*
+ * TEST: this is a test function that can be used by input plugins to check the
+ * 'pause' and 'resume' callback operations.
+ *
+ * After is invoked, it will schedule an internal event to wake up the instance
+ * after 'sleep_seconds'.
+ */
+int flb_input_test_pause_resume(struct flb_input_instance *ins, int sleep_seconds)
+{
+ /*
+ * This is a fake pause/resume implementation since it's only used to test the plugin
+ * callbacks for such purposes.
+ */
+
+ /* pause the instance */
+ flb_input_pause(ins);
+
+ /* wait */
+ sleep(sleep_seconds);
+
+ /* resume again */
+ flb_input_resume(ins);
+
+ return 0;
+}
+
+int flb_input_pause(struct flb_input_instance *ins)
+{
+ /* if the instance is already paused, just return */
+ if (flb_input_buf_paused(ins)) {
+ return -1;
+ }
+
+ /* Pause only if a callback is set and a local context exists */
+ if (ins->p->cb_pause && ins->context) {
+ if (flb_input_is_threaded(ins)) {
+ /* signal the thread event loop about the 'pause' operation */
+ flb_input_thread_instance_pause(ins);
+ }
+ else {
+ flb_info("[input] pausing %s", flb_input_name(ins));
+ ins->p->cb_pause(ins->context, ins->config);
+ }
+ }
+
+ return 0;
+}
+
+int flb_input_resume(struct flb_input_instance *ins)
+{
+ if (ins->p->cb_resume) {
+ if (flb_input_is_threaded(ins)) {
+ /* signal the thread event loop about the 'resume' operation */
+ flb_input_thread_instance_resume(ins);
+ }
+ else {
+ ins->p->cb_resume(ins->context, ins->config);
+ }
+ }
+
+ return 0;
+}
+
+int flb_input_pause_all(struct flb_config *config)
+{
+ int ret;
+ int paused = 0;
+ struct mk_list *head;
+ struct flb_input_instance *ins;
+
+ mk_list_foreach(head, &config->inputs) {
+ ins = mk_list_entry(head, struct flb_input_instance, _head);
+ /*
+ * Inform the plugin that is being paused, the source type is set to 'FLB_INPUT_PAUSE_MEM_BUF', no real reason, we
+ * just need to get it paused.
+ */
+ ret = flb_input_pause(ins);
+ if (ret == 0) {
+ paused++;
+ }
+ }
+
+ return paused;
+}
+
+int flb_input_collector_destroy(struct flb_input_collector *coll)
+{
+ struct flb_config *config = coll->instance->config;
+
+ if (coll->type == FLB_COLLECT_TIME) {
+ if (coll->fd_timer > 0) {
+ mk_event_timeout_destroy(config->evl, &coll->event);
+ mk_event_closesocket(coll->fd_timer);
+ }
+ }
+ else {
+ mk_event_del(config->evl, &coll->event);
+ }
+
+ flb_free(coll);
+
+ return 0;
+}
+
+int flb_input_collector_pause(int coll_id, struct flb_input_instance *in)
+{
+ int ret;
+ flb_pipefd_t fd;
+ struct flb_input_collector *coll;
+
+ coll = get_collector(coll_id, in);
+ if (!coll) {
+ return -1;
+ }
+
+ if (coll->running == FLB_FALSE) {
+ return 0;
+ }
+
+ if (coll->type == FLB_COLLECT_TIME) {
+ /*
+ * For a collector time, it's better to just remove the file
+ * descriptor associated to the time out, when resumed a new
+ * one can be created.
+ *
+ * Note: Invalidate fd_timer first in case closing a socket
+ * invokes another event.
+ */
+ fd = coll->fd_timer;
+ coll->fd_timer = -1;
+ mk_event_timeout_destroy(coll->evl, &coll->event);
+ mk_event_closesocket(fd);
+ }
+ else if (coll->type & (FLB_COLLECT_FD_SERVER | FLB_COLLECT_FD_EVENT)) {
+ ret = mk_event_del(coll->evl, &coll->event);
+ if (ret != 0) {
+ flb_warn("[input] cannot disable event for %s", in->name);
+ return -1;
+ }
+ }
+
+ coll->running = FLB_FALSE;
+
+ return 0;
+}
+
+int flb_input_collector_delete(int coll_id, struct flb_input_instance *in)
+{
+ struct flb_input_collector *coll;
+
+ coll = get_collector(coll_id, in);
+ if (!coll) {
+ return -1;
+ }
+ if (flb_input_collector_pause(coll_id, in) < 0) {
+ return -1;
+ }
+
+
+ pthread_mutex_lock(&in->config->collectors_mutex);
+ mk_list_del(&coll->_head);
+ pthread_mutex_unlock(&in->config->collectors_mutex);
+
+ flb_free(coll);
+ return 0;
+}
+
+int flb_input_collector_resume(int coll_id, struct flb_input_instance *in)
+{
+ int fd;
+ int ret;
+ struct flb_input_collector *coll;
+ struct flb_config *config;
+ struct mk_event *event;
+
+ coll = get_collector(coll_id, in);
+ if (!coll) {
+ return -1;
+ }
+
+ if (coll->running == FLB_TRUE) {
+ flb_error("[input] cannot resume collector %s:%i, already running",
+ in->name, coll_id);
+ return -1;
+ }
+
+ config = in->config;
+ event = &coll->event;
+
+ /* If data ingestion has been paused, the collector cannot resume */
+ if (config->is_ingestion_active == FLB_FALSE) {
+ return 0;
+ }
+
+ if (coll->type == FLB_COLLECT_TIME) {
+ event->mask = MK_EVENT_EMPTY;
+ event->status = MK_EVENT_NONE;
+ fd = mk_event_timeout_create(coll->evl, coll->seconds,
+ coll->nanoseconds, event);
+ if (fd == -1) {
+ flb_error("[input collector] resume COLLECT_TIME failed");
+ return -1;
+ }
+ coll->fd_timer = fd;
+ }
+ else if (coll->type & (FLB_COLLECT_FD_SERVER | FLB_COLLECT_FD_EVENT)) {
+ event->fd = coll->fd_event;
+ event->mask = MK_EVENT_EMPTY;
+ event->status = MK_EVENT_NONE;
+
+ ret = mk_event_add(coll->evl,
+ coll->fd_event,
+ FLB_ENGINE_EV_CORE,
+ MK_EVENT_READ, event);
+ if (ret == -1) {
+ flb_error("[input] cannot disable/pause event for %s", in->name);
+ return -1;
+ }
+ }
+
+ coll->running = FLB_TRUE;
+
+ return 0;
+}
+
+int flb_input_collector_fd(flb_pipefd_t fd, struct flb_config *config)
+{
+ struct mk_list *head;
+ struct mk_list *head_coll;
+ struct flb_input_instance *ins;
+ struct flb_input_collector *collector = NULL;
+ struct flb_input_coro *input_coro;
+
+ mk_list_foreach(head, &config->inputs) {
+ ins = mk_list_entry(head, struct flb_input_instance, _head);
+ mk_list_foreach(head_coll, &ins->collectors) {
+ collector = mk_list_entry(head_coll, struct flb_input_collector, _head);
+ if (collector->fd_event == fd) {
+ break;
+ }
+ else if (collector->fd_timer == fd) {
+ flb_utils_timer_consume(fd);
+ break;
+ }
+ collector = NULL;
+ }
+
+ if (collector) {
+ break;
+ }
+ }
+
+ /* No matches */
+ if (!collector) {
+ return -1;
+ }
+
+ if (collector->running == FLB_FALSE) {
+ return -1;
+ }
+
+ /* Trigger the collector callback */
+ if (collector->instance->runs_in_coroutine) {
+ input_coro = flb_input_coro_collect(collector, config);
+ if (!input_coro) {
+ return -1;
+ }
+ flb_input_coro_resume(input_coro);
+ }
+ else {
+ if (collector->cb_collect(collector->instance, config,
+ collector->instance->context) == -1) {
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+int flb_input_upstream_set(struct flb_upstream *u, struct flb_input_instance *ins)
+{
+ if (!u) {
+ return -1;
+ }
+
+ /*
+ * if the input instance runs in threaded mode, make sure to flag the
+ * upstream context so the lists operations are done in thread safe mode
+ */
+ if (flb_input_is_threaded(ins)) {
+ flb_upstream_thread_safe(u);
+ mk_list_add(&u->base._head, &ins->upstreams);
+ }
+
+ /* Set networking options 'net.*' received through instance properties */
+ memcpy(&u->base.net, &ins->net_setup, sizeof(struct flb_net_setup));
+
+ return 0;
+}
+
+int flb_input_downstream_set(struct flb_downstream *stream,
+ struct flb_input_instance *ins)
+{
+ if (stream == NULL) {
+ return -1;
+ }
+
+ /*
+ * If the input plugin will run in multiple threads, enable
+ * the thread safe mode for the Downstream context.
+ */
+ if (flb_input_is_threaded(ins)) {
+ flb_stream_enable_thread_safety(&stream->base);
+
+ mk_list_add(&stream->base._head, &ins->downstreams);
+ }
+
+ return 0;
+}