summaryrefslogtreecommitdiffstats
path: root/fluent-bit/include/fluent-bit/flb_input.h
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/include/fluent-bit/flb_input.h')
-rw-r--r--fluent-bit/include/fluent-bit/flb_input.h739
1 files changed, 739 insertions, 0 deletions
diff --git a/fluent-bit/include/fluent-bit/flb_input.h b/fluent-bit/include/fluent-bit/flb_input.h
new file mode 100644
index 00000000..1b4dff82
--- /dev/null
+++ b/fluent-bit/include/fluent-bit/flb_input.h
@@ -0,0 +1,739 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_INPUT_H
+#define FLB_INPUT_H
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_engine_macros.h>
+#include <fluent-bit/flb_coro.h>
+#include <fluent-bit/flb_config.h>
+#include <fluent-bit/flb_network.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_str.h>
+#include <fluent-bit/flb_bits.h>
+#include <fluent-bit/flb_pipe.h>
+#include <fluent-bit/flb_filter.h>
+#include <fluent-bit/flb_coro.h>
+#include <fluent-bit/flb_mp.h>
+#include <fluent-bit/flb_hash_table.h>
+
+#include <fluent-bit/flb_input_chunk.h>
+#include <fluent-bit/flb_input_log.h>
+#include <fluent-bit/flb_input_metric.h>
+#include <fluent-bit/flb_input_trace.h>
+#include <fluent-bit/flb_config_format.h>
+#include <fluent-bit/flb_processor.h>
+
+#ifdef FLB_HAVE_METRICS
+#include <fluent-bit/flb_metrics.h>
+#endif
+#include <fluent-bit/flb_pthread.h>
+
+#include <cmetrics/cmetrics.h>
+#include <monkey/mk_core.h>
+#include <msgpack.h>
+#include <inttypes.h>
+
+#define FLB_COLLECT_TIME 1
+#define FLB_COLLECT_FD_EVENT 2
+#define FLB_COLLECT_FD_SERVER 4
+
+/* Input plugin flag masks */
+#define FLB_INPUT_NET 4 /* input address may set host and port */
+#define FLB_INPUT_PLUGIN_CORE 0
+#define FLB_INPUT_PLUGIN_PROXY 1
+#define FLB_INPUT_CORO 128 /* plugin requires a thread on callbacks */
+#define FLB_INPUT_PRIVATE 256 /* plugin is not published/exposed */
+#define FLB_INPUT_NOTAG 512 /* plugin might don't have tags */
+#define FLB_INPUT_THREADED 1024 /* plugin must run in a separate thread */
+#define FLB_INPUT_NET_SERVER 2048 /* Input address may set host and port.
+ * In addition, if TLS is enabled then a
+ * private key and certificate are required.
+ */
+
+/* Input status */
+#define FLB_INPUT_RUNNING 1
+#define FLB_INPUT_PAUSED 0
+
+/* Input plugin event type */
+#define FLB_INPUT_LOGS 0
+#define FLB_INPUT_METRICS 1
+#define FLB_INPUT_TRACES 2
+
+struct flb_input_instance;
+
+struct flb_input_plugin {
+ /*
+ * The type defines if this is a core-based plugin or it's handled by
+ * some specific proxy.
+ */
+ int type;
+ void *proxy;
+
+ int flags; /* plugin flags */
+
+ /* The Input name */
+ char *name;
+
+ /* Plugin Description */
+ char *description;
+
+ struct flb_config_map *config_map;
+
+ /* Initialization */
+ int (*cb_init) (struct flb_input_instance *, struct flb_config *, void *);
+
+ /* Pre run */
+ int (*cb_pre_run) (struct flb_input_instance *, struct flb_config *, void *);
+
+ /* Collect: every certain amount of time, Fluent Bit trigger this callback */
+ int (*cb_collect) (struct flb_input_instance *, struct flb_config *, void *);
+
+ /*
+ * Flush: each plugin during a collection, it does some buffering,
+ * when the Flush timer takes place on the Engine, it will trigger
+ * the cb_flush(...) to obtain the plugin buffer data. This data is
+ * a MsgPack buffer which will be processed by the Engine and delivered
+ * to the target output.
+ */
+
+ /* Flush a buffer type (raw data) */
+ void *(*cb_flush_buf) (void *, size_t *);
+
+ /* Notify that a flush have completed on the collector (buf + iov) */
+ void (*cb_flush_end) (void *);
+
+ /*
+ * Callbacks to notify the plugin when it becomes paused (cannot longer append
+ * data) and when it can resume operations.
+ */
+ void (*cb_pause) (void *, struct flb_config *);
+ void (*cb_resume) (void *, struct flb_config *);
+
+ /*
+ * Optional callback that can be used from a parent caller to ingest
+ * data into the engine.
+ */
+ int (*cb_ingest) (void *in_context, void *, size_t);
+
+ /* Exit */
+ int (*cb_exit) (void *, struct flb_config *);
+
+ /* Destroy */
+ void (*cb_destroy) (struct flb_input_plugin *);
+
+ void *instance;
+
+ struct mk_list _head;
+};
+
+/*
+ * Each initialized plugin must have an instance, same plugin may be
+ * loaded more than one time.
+ *
+ * An instance try to contain plugin data separating what is fixed data
+ * and the variable one that is generated when the plugin is invoked.
+ */
+struct flb_input_instance {
+ struct mk_event event; /* events handler */
+
+ struct flb_processor *processor;
+
+ /*
+ * The instance flags are derived from the fixed plugin flags. This
+ * is done to offer some flexibility where a plugin instance per
+ * configuration would like to change some specific behavior.
+ *
+ * e.g By default in_tail plugin supports fixed tag, but if a wildcard
+ * is added to the 'tag', it will instruct to perform dyntag operations
+ * as the tags will be composed used the file name being watched.
+ */
+ int flags;
+
+ int id; /* instance id */
+#ifdef FLB_HAVE_CHUNK_TRACE
+ struct flb_chunk_trace_context *chunk_trace_ctxt;
+ pthread_mutex_t chunk_trace_lock;
+#endif /* FLB_HAVE_CHUNK_TRACE */
+ int log_level; /* log level for this plugin */
+ int log_suppress_interval; /* log suppression interval */
+ flb_pipefd_t channel[2]; /* pipe(2) channel */
+ int runs_in_coroutine; /* instance runs in coroutine ? */
+ char name[32]; /* numbered name (cpu -> cpu.0) */
+ char *alias; /* alias name for the instance */
+ void *context; /* plugin configuration context */
+ flb_pipefd_t ch_events[2]; /* channel for events */
+ struct flb_input_plugin *p; /* original plugin */
+
+ /* Plugin properties */
+ char *tag; /* Input tag for routing */
+ int tag_len;
+ int tag_default; /* is it using the default tag? */
+
+ /* By default all input instances are 'routable' */
+ int routable;
+
+ /* flag to pause input when storage is full */
+ int storage_pause_on_chunks_overlimit;
+
+ /*
+ * Input network info:
+ *
+ * An input plugin can be specified just using it shortname or using the
+ * complete network address format, e.g:
+ *
+ * $ fluent-bit -i cpu -o plugin://hostname:port/uri
+ *
+ * where:
+ *
+ * plugin = the output plugin shortname
+ * name = IP address or hostname of the target
+ * port = target TCP port
+ * uri = extra information that may be used by the plugin
+ */
+ struct flb_net_host host;
+
+ /* Reference to struct flb_storage_input context */
+ void *storage;
+
+ /* Type of storage: CIO_STORE_FS (filesystem) or CIO_STORE_MEM (memory) */
+ int storage_type;
+
+ /*
+ * Buffers counter: it counts the total of memory used by fixed and dynamic
+ * message pack buffers used by the input plugin instance.
+ */
+ size_t mem_chunks_size;
+ size_t mp_total_buf_size; /* FIXME: to be deprecated */
+
+ /*
+ * Buffer limit: optional limit set by configuration so this input instance
+ * cannot exceed more than mp_buf_limit (bytes unit).
+ *
+ * As a reference, if an input plugin exceeds the limit, the pause() callback
+ * will be triggered to notify the input instance it cannot longer append
+ * more data, on that moment Fluent Bit will avoid to add more records.
+ *
+ * When the buffer size goes down (because data was flushed), a resume()
+ * callback will be triggered, from that moment the plugin can append more
+ * data.
+ */
+ size_t mem_buf_limit;
+
+ /*
+ * Define the buffer status:
+ *
+ * - FLB_INPUT_RUNNING -> can append more data
+ * - FLB_INPUT_PAUSED -> cannot append data
+ */
+ int mem_buf_status;
+
+ /*
+ * Define the buffer status:
+ *
+ * - FLB_INPUT_RUNNING -> can append more data
+ * - FLB_INPUT_PAUSED -> cannot append data
+ */
+ int storage_buf_status;
+
+ /*
+ * Optional data passed to the plugin, this info is useful when
+ * running Fluent Bit in library mode and the target plugin needs
+ * some specific data from it caller.
+ */
+ void *data;
+
+ struct mk_list *config_map; /* configuration map */
+
+ struct mk_list _head; /* link to config->inputs */
+
+ struct mk_list routes_direct; /* direct routes set by API */
+ struct mk_list routes; /* flb_router_path's list */
+ struct mk_list properties; /* properties / configuration */
+ struct mk_list collectors; /* collectors */
+
+ /* Storage Chunks */
+ struct mk_list chunks; /* linked list of all chunks */
+
+ /*
+ * The following list helps to separate the chunks per its
+ * status, it can be 'up' or 'down'.
+ */
+ struct mk_list chunks_up; /* linked list of all chunks up */
+ struct mk_list chunks_down; /* linked list of all chunks down */
+
+ /*
+ * Every co-routine created by the engine when flushing data, it's
+ * linked into this list header.
+ */
+ struct mk_list tasks;
+
+ /* co-routines for input plugins with FLB_INPUT_CORO flag */
+ int input_coro_id;
+ struct mk_list input_coro_list;
+ struct mk_list input_coro_list_destroy;
+
+#ifdef FLB_HAVE_METRICS
+ /* old metrics API */
+ struct flb_metrics *metrics; /* metrics */
+#endif
+
+
+ /* is the plugin running in a separate thread ? */
+ int is_threaded;
+ struct flb_input_thread_instance *thi;
+
+ /*
+ * ring buffer: the ring buffer is used by the instance if is running
+ * in threaded mode; so when registering a msgpack buffer this happens
+ * in the ring buffer.
+ */
+ struct flb_ring_buffer *rb;
+
+ /* List of upstreams */
+ struct mk_list upstreams;
+
+ /* List of downstreams */
+ struct mk_list downstreams;
+
+ /*
+ * CMetrics
+ * --------
+ *
+ * All metrics available for an input plugin instance.
+ */
+ struct cmt *cmt; /* parent context */
+ struct cmt_counter *cmt_bytes; /* metric: input_bytes_total */
+ struct cmt_counter *cmt_records; /* metric: input_records_total */
+
+ /* is the input instance overlimit ?: 1 or 0 */
+ struct cmt_gauge *cmt_storage_overlimit;
+
+ /* memory bytes used by chunks */
+ struct cmt_gauge *cmt_storage_memory_bytes;
+
+ /* total number of chunks */
+ struct cmt_gauge *cmt_storage_chunks;
+
+ /* total number of chunks up in memory */
+ struct cmt_gauge *cmt_storage_chunks_up;
+
+ /* total number of chunks down */
+ struct cmt_gauge *cmt_storage_chunks_down;
+
+ /* number of chunks in a busy state */
+ struct cmt_gauge *cmt_storage_chunks_busy;
+
+ /* total bytes used by chunks in a busy state */
+ struct cmt_gauge *cmt_storage_chunks_busy_bytes;
+
+ /* memory ring buffer (memrb) metrics */
+ struct cmt_counter *cmt_memrb_dropped_chunks;
+ struct cmt_counter *cmt_memrb_dropped_bytes;
+
+ /*
+ * Indexes for generated chunks: simple hash tables that keeps the latest
+ * available chunks for writing data operations. This optimizes the
+ * lookup for candidates chunks to write data.
+ */
+ struct flb_hash_table *ht_log_chunks;
+ struct flb_hash_table *ht_metric_chunks;
+ struct flb_hash_table *ht_trace_chunks;
+
+ /* TLS settings */
+ int use_tls; /* bool, try to use TLS for I/O */
+ int tls_verify; /* Verify certs (default: true) */
+ int tls_debug; /* mbedtls debug level */
+ char *tls_vhost; /* Virtual hostname for SNI */
+ char *tls_ca_path; /* Path to certificates */
+ char *tls_ca_file; /* CA root cert */
+ char *tls_crt_file; /* Certificate */
+ char *tls_key_file; /* Cert Key */
+ char *tls_key_passwd; /* Cert Key Password */
+
+ struct mk_list *tls_config_map;
+
+#ifdef FLB_HAVE_TLS
+ struct flb_tls *tls;
+#else
+ void *tls;
+#endif
+
+ /* General network options like timeouts and keepalive */
+ struct flb_net_setup net_setup;
+ struct mk_list *net_config_map;
+ struct mk_list net_properties;
+
+ /* Keep a reference to the original context this instance belongs to */
+ struct flb_config *config;
+};
+
+struct flb_input_collector {
+ struct mk_event event;
+ struct mk_event_loop *evl; /* event loop */
+
+ int id; /* collector id */
+ int type; /* collector type */
+ int running; /* is running ? (True/False) */
+
+ /* FLB_COLLECT_FD_EVENT */
+ flb_pipefd_t fd_event; /* fd being watched */
+
+ /* FLB_COLLECT_TIME */
+ flb_pipefd_t fd_timer; /* timer fd */
+ time_t seconds; /* expire time in seconds */
+ long nanoseconds; /* expire nanoseconds */
+
+ /* Callback */
+ int (*cb_collect) (struct flb_input_instance *,
+ struct flb_config *, void *);
+
+
+ /* General references */
+ struct flb_input_instance *instance; /* plugin instance */
+ struct mk_list _head; /* link to instance collectors */
+};
+
+struct flb_input_coro {
+ int id; /* id returned from flb_input_coro_id_get() */
+ time_t start_time; /* start time */
+ time_t end_time; /* end time */
+ struct flb_input_instance *ins; /* parent input instance */
+ struct flb_coro *coro; /* coroutine context */
+ struct flb_config *config; /* FLB context */
+ struct mk_list _head; /* link to list on input_instance->coros */
+};
+
+int flb_input_coro_id_get(struct flb_input_instance *ins);
+
+static FLB_INLINE
+struct flb_input_coro *flb_input_coro_create(struct flb_input_instance *ins,
+ struct flb_config *config)
+{
+ struct flb_coro *coro;
+ struct flb_input_coro *input_coro;
+
+ /* input_coro context */
+ input_coro = (struct flb_input_coro *) flb_calloc(1,
+ sizeof(struct flb_input_coro));
+ if (!input_coro) {
+ flb_errno();
+ return NULL;
+ }
+
+ /* coroutine context */
+ coro = flb_coro_create(input_coro);
+ if (!coro) {
+ flb_free(input_coro);
+ return NULL;
+ }
+
+ input_coro->id = flb_input_coro_id_get(ins);
+ input_coro->ins = ins;
+ input_coro->start_time = time(NULL);
+ input_coro->coro = coro;
+ input_coro->config = config;
+
+ mk_list_add(&input_coro->_head, &ins->input_coro_list);
+
+ return input_coro;
+}
+
+struct flb_libco_in_params {
+ struct flb_config *config;
+ struct flb_input_collector *coll;
+ struct flb_coro *coro;
+};
+
+extern pthread_key_t libco_in_param_key;
+extern struct flb_libco_in_params libco_in_param;
+void flb_input_coro_prepare_destroy(struct flb_input_coro *input_coro);
+
+static FLB_INLINE void input_params_set(struct flb_coro *coro,
+ struct flb_input_collector *coll,
+ struct flb_config *config,
+ void *context)
+{
+ struct flb_libco_in_params *params;
+
+ params = pthread_getspecific(libco_in_param_key);
+ if (params == NULL) {
+ params = flb_calloc(1, sizeof(struct flb_libco_in_params));
+ if (params == NULL) {
+ flb_errno();
+ return;
+ }
+ pthread_setspecific(libco_in_param_key, params);
+ }
+
+ /* Set callback parameters */
+ params->coll = coll;
+ params->config = config;
+ params->coro = coro;
+ co_switch(coro->callee);
+}
+
+static FLB_INLINE void input_pre_cb_collect(void)
+{
+ struct flb_input_collector *coll;
+ struct flb_config *config;
+ struct flb_coro *coro;
+ struct flb_libco_in_params *params;
+
+ params = pthread_getspecific(libco_in_param_key);
+ if (params == NULL) {
+ params = flb_calloc(1, sizeof(struct flb_libco_in_params));
+ if (params == NULL) {
+ flb_errno();
+ return;
+ }
+ pthread_setspecific(libco_in_param_key, params);
+ }
+ coll = params->coll;
+ config = params->config;
+ coro = params->coro;
+
+ co_switch(coro->caller);
+ coll->cb_collect(coll->instance, config, coll->instance->context);
+}
+
+static FLB_INLINE void flb_input_coro_resume(struct flb_input_coro *input_coro)
+{
+ flb_coro_resume(input_coro->coro);
+}
+
+static void libco_in_param_key_destroy(void *data)
+{
+ struct flb_libco_inparams *params = (struct flb_libco_inparams*)data;
+
+ flb_free(params);
+}
+
+static FLB_INLINE
+struct flb_input_coro *flb_input_coro_collect(struct flb_input_collector *coll,
+ struct flb_config *config)
+{
+ size_t stack_size;
+ struct flb_coro *coro;
+ struct flb_input_coro *input_coro;
+
+ input_coro = flb_input_coro_create(coll->instance, config);
+ if (!input_coro) {
+ return NULL;
+ }
+
+ pthread_key_create(&libco_in_param_key, libco_in_param_key_destroy);
+
+ coro = input_coro->coro;
+ if (!coro) {
+ return NULL;
+ }
+
+ coro->caller = co_active();
+ coro->callee = co_create(config->coro_stack_size,
+ input_pre_cb_collect, &stack_size);
+
+#ifdef FLB_HAVE_VALGRIND
+ coro->valgrind_stack_id = VALGRIND_STACK_REGISTER(coro->callee,
+ ((char *)coro->callee) + stack_size);
+#endif
+
+ /* Set parameters */
+ input_params_set(coro, coll, config, coll->instance->context);
+ return input_coro;
+}
+
+static FLB_INLINE int flb_input_is_threaded(struct flb_input_instance *ins)
+{
+ return ins->is_threaded;
+}
+
+/*
+ * This function is used by the output plugins to return. It's mandatory
+ * as it will take care to signal the event loop letting know the flush
+ * callback has done.
+ *
+ * The signal emitted indicate the 'Task' number that have finished plus
+ * a return value. The return value is either FLB_OK, FLB_RETRY or FLB_ERROR.
+ *
+ * If the caller have requested an FLB_RETRY, it will be issued depending on the
+ * number of retries, if it has exceeded the 'retry_limit' option, an FLB_ERROR
+ * will be returned instead.
+ */
+static inline void flb_input_return(struct flb_coro *coro) {
+ int n;
+ uint64_t val;
+ struct flb_input_coro *input_coro;
+ struct flb_input_instance *ins;
+
+ input_coro = (struct flb_input_coro *) coro->data;
+ ins = input_coro->ins;
+
+ /*
+ * Message the event loop by identifying the message coming from an input
+ * coroutine and passing the input plugin ID that triggered the event.
+ */
+ val = FLB_BITS_U64_SET(FLB_ENGINE_IN_CORO, ins->id);
+ n = flb_pipe_w(ins->ch_events[1], (void *) &val, sizeof(val));
+ if (n == -1) {
+ flb_errno();
+ }
+
+ flb_input_coro_prepare_destroy(input_coro);
+}
+
+static inline void flb_input_return_do(int ret) {
+ struct flb_coro *coro = flb_coro_get();
+
+ flb_input_return(coro);
+ flb_coro_yield(coro, FLB_TRUE);
+}
+
+#define FLB_INPUT_RETURN(x) \
+ flb_input_return_do(x); \
+ return x;
+
+static inline int flb_input_buf_paused(struct flb_input_instance *i)
+{
+ if (i->mem_buf_status == FLB_INPUT_PAUSED) {
+ return FLB_TRUE;
+ }
+ if (i->storage_buf_status == FLB_INPUT_PAUSED) {
+ return FLB_TRUE;
+ }
+
+ return FLB_FALSE;
+}
+
+static inline int flb_input_config_map_set(struct flb_input_instance *ins,
+ void *context)
+{
+ int ret;
+
+ /* Process normal properties */
+ if (ins->config_map) {
+ ret = flb_config_map_set(&ins->properties, ins->config_map, context);
+
+ if (ret == -1) {
+ return -1;
+ }
+ }
+
+ /* Net properties */
+ if (ins->net_config_map) {
+ ret = flb_config_map_set(&ins->net_properties, ins->net_config_map,
+ &ins->net_setup);
+ if (ret == -1) {
+ return -1;
+ }
+ }
+
+ return ret;
+}
+
+int flb_input_register_all(struct flb_config *config);
+struct flb_input_instance *flb_input_new(struct flb_config *config,
+ const char *input, void *data,
+ int public_only);
+struct flb_input_instance *flb_input_get_instance(struct flb_config *config,
+ int ins_id);
+
+int flb_input_set_property(struct flb_input_instance *ins,
+ const char *k, const char *v);
+const char *flb_input_get_property(const char *key,
+ struct flb_input_instance *ins);
+#ifdef FLB_HAVE_METRICS
+void *flb_input_get_cmt_instance(struct flb_input_instance *ins);
+#endif
+
+int flb_input_check(struct flb_config *config);
+void flb_input_set_context(struct flb_input_instance *ins, void *context);
+int flb_input_channel_init(struct flb_input_instance *ins);
+
+
+int flb_input_collector_start(int coll_id, struct flb_input_instance *ins);
+int flb_input_collectors_start(struct flb_config *config);
+int flb_input_collector_pause(int coll_id, struct flb_input_instance *ins);
+int flb_input_collector_resume(int coll_id, struct flb_input_instance *ins);
+int flb_input_collector_delete(int coll_id, struct flb_input_instance *ins);
+int flb_input_collector_destroy(struct flb_input_collector *coll);
+int flb_input_collector_fd(flb_pipefd_t fd, struct flb_config *config);
+struct mk_event *flb_input_collector_get_event(int coll_id,
+ struct flb_input_instance *ins);
+int flb_input_set_collector_time(struct flb_input_instance *ins,
+ int (*cb_collect) (struct flb_input_instance *,
+ struct flb_config *, void *),
+ time_t seconds,
+ long nanoseconds,
+ struct flb_config *config);
+int flb_input_set_collector_event(struct flb_input_instance *ins,
+ int (*cb_collect) (struct flb_input_instance *,
+ struct flb_config *, void *),
+ flb_pipefd_t fd,
+ struct flb_config *config);
+int flb_input_set_collector_socket(struct flb_input_instance *ins,
+ int (*cb_new_connection) (struct flb_input_instance *,
+ struct flb_config *,
+ void*),
+ flb_pipefd_t fd,
+ struct flb_config *config);
+int flb_input_collector_running(int coll_id, struct flb_input_instance *ins);
+int flb_input_coro_id_get(struct flb_input_instance *ins);
+int flb_input_coro_finished(struct flb_config *config, int ins_id);
+
+int flb_input_instance_init(struct flb_input_instance *ins,
+ struct flb_config *config);
+void flb_input_instance_exit(struct flb_input_instance *ins,
+ struct flb_config *config);
+void flb_input_instance_destroy(struct flb_input_instance *ins);
+
+int flb_input_net_property_check(struct flb_input_instance *ins,
+ struct flb_config *config);
+int flb_input_plugin_property_check(struct flb_input_instance *ins,
+ struct flb_config *config);
+
+int flb_input_init_all(struct flb_config *config);
+void flb_input_pre_run_all(struct flb_config *config);
+void flb_input_exit_all(struct flb_config *config);
+
+void *flb_input_flush(struct flb_input_instance *ins, size_t *size);
+
+int flb_input_test_pause_resume(struct flb_input_instance *ins, int sleep_seconds);
+int flb_input_pause(struct flb_input_instance *ins);
+int flb_input_pause_all(struct flb_config *config);
+int flb_input_resume(struct flb_input_instance *ins);
+
+const char *flb_input_name(struct flb_input_instance *ins);
+int flb_input_name_exists(const char *name, struct flb_config *config);
+
+void flb_input_net_default_listener(const char *listen, int port,
+ struct flb_input_instance *ins);
+
+int flb_input_log_check(struct flb_input_instance *ins, int l);
+
+struct mk_event_loop *flb_input_event_loop_get(struct flb_input_instance *ins);
+int flb_input_upstream_set(struct flb_upstream *u, struct flb_input_instance *ins);
+int flb_input_downstream_set(struct flb_downstream *stream,
+ struct flb_input_instance *ins);
+
+/* processors */
+int flb_input_instance_processors_load(struct flb_input_instance *ins, struct flb_cf_group *processors);
+
+#endif