summaryrefslogtreecommitdiffstats
path: root/logsmanagement/flb_plugin.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:08:03 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:08:18 +0000
commit5da14042f70711ea5cf66e034699730335462f66 (patch)
tree0f6354ccac934ed87a2d555f45be4c831cf92f4a /logsmanagement/flb_plugin.c
parentReleasing debian version 1.44.3-2. (diff)
downloadnetdata-5da14042f70711ea5cf66e034699730335462f66.tar.xz
netdata-5da14042f70711ea5cf66e034699730335462f66.zip
Merging upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'logsmanagement/flb_plugin.c')
-rw-r--r--logsmanagement/flb_plugin.c1536
1 files changed, 0 insertions, 1536 deletions
diff --git a/logsmanagement/flb_plugin.c b/logsmanagement/flb_plugin.c
deleted file mode 100644
index 493749ed4..000000000
--- a/logsmanagement/flb_plugin.c
+++ /dev/null
@@ -1,1536 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-/** @file flb_plugin.c
- * @brief This file includes all functions that act as an API to
- * the Fluent Bit library.
- */
-
-#include "flb_plugin.h"
-#include <lz4.h>
-#include "helper.h"
-#include "defaults.h"
-#include "circular_buffer.h"
-#include "daemon/common.h"
-#include "libnetdata/libnetdata.h"
-#include "../fluent-bit/lib/msgpack-c/include/msgpack/unpack.h"
-#include "../fluent-bit/lib/msgpack-c/include/msgpack/object.h"
-#include "../fluent-bit/lib/monkey/include/monkey/mk_core/mk_list.h"
-#include <dlfcn.h>
-
-#ifdef HAVE_SYSTEMD
-#include <systemd/sd-journal.h>
-#define SD_JOURNAL_SEND_DEFAULT_FIELDS \
- "%s_LOG_SOURCE=%s" , sd_journal_field_prefix, log_src_t_str[p_file_info->log_source], \
- "%s_LOG_TYPE=%s" , sd_journal_field_prefix, log_src_type_t_str[p_file_info->log_type]
-#endif
-
-#define LOG_REC_KEY "msg" /**< key to represent log message field in most log sources **/
-#define LOG_REC_KEY_SYSTEMD "MESSAGE" /**< key to represent log message field in systemd log source **/
-#define SYSLOG_TIMESTAMP_SIZE 16
-#define UNKNOWN "unknown"
-
-
-/* Including "../fluent-bit/include/fluent-bit/flb_macros.h" causes issues
- * with CI, as it requires mk_core/mk_core_info.h which is generated only
- * after Fluent Bit has been built. We can instead just redefined a couple
- * of macros here: */
-#define FLB_FALSE 0
-#define FLB_TRUE !FLB_FALSE
-
-/* For similar reasons, (re)define the following macros from "flb_lib.h": */
-/* Lib engine status */
-#define FLB_LIB_ERROR -1
-#define FLB_LIB_NONE 0
-#define FLB_LIB_OK 1
-#define FLB_LIB_NO_CONFIG_MAP 2
-
-/* Following structs are the same as defined in fluent-bit/flb_lib.h and
- * fluent-bit/flb_time.h, but need to be redefined due to use of dlsym(). */
-
-struct flb_time {
- struct timespec tm;
-};
-
-/* Library mode context data */
-struct flb_lib_ctx {
- int status;
- struct mk_event_loop *event_loop;
- struct mk_event *event_channel;
- struct flb_config *config;
-};
-
-struct flb_parser_types {
- char *key;
- int key_len;
- int type;
-};
-
-struct flb_parser {
- /* configuration */
- int type; /* parser type */
- char *name; /* format name */
- char *p_regex; /* pattern for main regular expression */
- int skip_empty; /* skip empty regex matches */
- char *time_fmt; /* time format */
- char *time_fmt_full; /* original given time format */
- char *time_key; /* field name that contains the time */
- int time_offset; /* fixed UTC offset */
- int time_keep; /* keep time field */
- int time_strict; /* parse time field strictly */
- int logfmt_no_bare_keys; /* in logfmt parsers, require all keys to have values */
- char *time_frac_secs; /* time format have fractional seconds ? */
- struct flb_parser_types *types; /* type casting */
- int types_len;
-
- /* Field decoders */
- struct mk_list *decoders;
-
- /* internal */
- int time_with_year; /* do time_fmt consider a year (%Y) ? */
- char *time_fmt_year;
- int time_with_tz; /* do time_fmt consider a timezone ? */
- struct flb_regex *regex;
- struct mk_list _head;
-};
-
-struct flb_lib_out_cb {
- int (*cb) (void *record, size_t size, void *data);
- void *data;
-};
-
-typedef struct flb_lib_ctx flb_ctx_t;
-
-static flb_ctx_t *(*flb_create)(void);
-static int (*flb_service_set)(flb_ctx_t *ctx, ...);
-static int (*flb_start)(flb_ctx_t *ctx);
-static int (*flb_stop)(flb_ctx_t *ctx);
-static void (*flb_destroy)(flb_ctx_t *ctx);
-static int (*flb_time_pop_from_msgpack)(struct flb_time *time, msgpack_unpacked *upk, msgpack_object **map);
-static int (*flb_lib_free)(void *data);
-static struct flb_parser *(*flb_parser_create)( const char *name, const char *format, const char *p_regex, int skip_empty,
- const char *time_fmt, const char *time_key, const char *time_offset,
- int time_keep, int time_strict, int logfmt_no_bare_keys,
- struct flb_parser_types *types, int types_len,struct mk_list *decoders,
- struct flb_config *config);
-static int (*flb_input)(flb_ctx_t *ctx, const char *input, void *data);
-static int (*flb_input_set)(flb_ctx_t *ctx, int ffd, ...);
-// static int (*flb_filter)(flb_ctx_t *ctx, const char *filter, void *data);
-// static int (*flb_filter_set)(flb_ctx_t *ctx, int ffd, ...);
-static int (*flb_output)(flb_ctx_t *ctx, const char *output, struct flb_lib_out_cb *cb);
-static int (*flb_output_set)(flb_ctx_t *ctx, int ffd, ...);
-static msgpack_unpack_return (*dl_msgpack_unpack_next)(msgpack_unpacked* result, const char* data, size_t len, size_t* off);
-static void (*dl_msgpack_zone_free)(msgpack_zone* zone);
-static int (*dl_msgpack_object_print_buffer)(char *buffer, size_t buffer_size, msgpack_object o);
-
-static flb_ctx_t *ctx = NULL;
-static void *flb_lib_handle = NULL;
-
-static struct flb_lib_out_cb *fwd_input_out_cb = NULL;
-
-static const char *sd_journal_field_prefix = SD_JOURNAL_FIELD_PREFIX;
-
-extern netdata_mutex_t stdout_mut;
-
-int flb_init(flb_srvc_config_t flb_srvc_config,
- const char *const stock_config_dir,
- const char *const new_sd_journal_field_prefix){
- int rc = 0;
- char *dl_error;
-
- char *flb_lib_path = strdupz_path_subpath(stock_config_dir, "/../libfluent-bit.so");
- if (unlikely(NULL == (flb_lib_handle = dlopen(flb_lib_path, RTLD_LAZY)))){
- if (NULL != (dl_error = dlerror()))
- collector_error("dlopen() libfluent-bit.so error: %s", dl_error);
- rc = -1;
- goto do_return;
- }
-
- dlerror(); /* Clear any existing error */
-
- /* Load Fluent-Bit functions from the shared library */
- #define load_function(FUNC_NAME){ \
- *(void **) (&FUNC_NAME) = dlsym(flb_lib_handle, LOGS_MANAG_STR(FUNC_NAME)); \
- if ((dl_error = dlerror()) != NULL) { \
- collector_error("dlerror loading %s: %s", LOGS_MANAG_STR(FUNC_NAME), dl_error); \
- rc = -1; \
- goto do_return; \
- } \
- }
-
- load_function(flb_create);
- load_function(flb_service_set);
- load_function(flb_start);
- load_function(flb_stop);
- load_function(flb_destroy);
- load_function(flb_time_pop_from_msgpack);
- load_function(flb_lib_free);
- load_function(flb_parser_create);
- load_function(flb_input);
- load_function(flb_input_set);
- // load_function(flb_filter);
- // load_function(flb_filter_set);
- load_function(flb_output);
- load_function(flb_output_set);
- *(void **) (&dl_msgpack_unpack_next) = dlsym(flb_lib_handle, "msgpack_unpack_next");
- if ((dl_error = dlerror()) != NULL) {
- collector_error("dlerror loading msgpack_unpack_next: %s", dl_error);
- rc = -1;
- goto do_return;
- }
- *(void **) (&dl_msgpack_zone_free) = dlsym(flb_lib_handle, "msgpack_zone_free");
- if ((dl_error = dlerror()) != NULL) {
- collector_error("dlerror loading msgpack_zone_free: %s", dl_error);
- rc = -1;
- goto do_return;
- }
- *(void **) (&dl_msgpack_object_print_buffer) = dlsym(flb_lib_handle, "msgpack_object_print_buffer");
- if ((dl_error = dlerror()) != NULL) {
- collector_error("dlerror loading msgpack_object_print_buffer: %s", dl_error);
- rc = -1;
- goto do_return;
- }
-
- ctx = flb_create();
- if (unlikely(!ctx)){
- rc = -1;
- goto do_return;
- }
-
- /* Global service settings */
- if(unlikely(flb_service_set(ctx,
- "Flush" , flb_srvc_config.flush,
- "HTTP_Listen" , flb_srvc_config.http_listen,
- "HTTP_Port" , flb_srvc_config.http_port,
- "HTTP_Server" , flb_srvc_config.http_server,
- "Log_File" , flb_srvc_config.log_path,
- "Log_Level" , flb_srvc_config.log_level,
- "Coro_stack_size" , flb_srvc_config.coro_stack_size,
- NULL) != 0 )){
- rc = -1;
- goto do_return;
- }
-
- if(new_sd_journal_field_prefix && *new_sd_journal_field_prefix)
- sd_journal_field_prefix = new_sd_journal_field_prefix;
-
-do_return:
- freez(flb_lib_path);
- if(unlikely(rc && flb_lib_handle))
- dlclose(flb_lib_handle);
-
- return rc;
-}
-
-int flb_run(void){
- if (likely(flb_start(ctx)) == 0) return 0;
- else return -1;
-}
-
-void flb_terminate(void){
- if(ctx){
- flb_stop(ctx);
- flb_destroy(ctx);
- ctx = NULL;
- }
- if(flb_lib_handle)
- dlclose(flb_lib_handle);
-}
-
-static void flb_complete_buff_item(struct File_info *p_file_info){
-
- Circ_buff_t *buff = p_file_info->circ_buff;
-
- m_assert(buff->in->timestamp, "buff->in->timestamp cannot be 0");
- m_assert(buff->in->data, "buff->in->text cannot be NULL");
- m_assert(*buff->in->data, "*buff->in->text cannot be 0");
- m_assert(buff->in->text_size, "buff->in->text_size cannot be 0");
-
- /* Replace last '\n' with '\0' to null-terminate text */
- buff->in->data[buff->in->text_size - 1] = '\0';
-
- /* Store status (timestamp and text_size must have already been
- * stored during flb_collect_logs_cb() ). */
- buff->in->status = CIRC_BUFF_ITEM_STATUS_UNPROCESSED;
-
- /* Load max size of compressed buffer, as calculated previously */
- size_t text_compressed_buff_max_size = buff->in->text_compressed_size;
-
- /* Do compression.
- * TODO: Validate compression option? */
- buff->in->text_compressed = buff->in->data + buff->in->text_size;
- buff->in->text_compressed_size = LZ4_compress_fast( buff->in->data,
- buff->in->text_compressed,
- buff->in->text_size,
- text_compressed_buff_max_size,
- p_file_info->compression_accel);
- m_assert(buff->in->text_compressed_size != 0, "Text_compressed_size should be != 0");
-
- p_file_info->parser_metrics->last_update = buff->in->timestamp / MSEC_PER_SEC;
-
- p_file_info->parser_metrics->num_lines += buff->in->num_lines;
-
- /* Perform custom log chart parsing */
- for(int i = 0; p_file_info->parser_cus_config[i]; i++){
- p_file_info->parser_metrics->parser_cus[i]->count +=
- search_keyword( buff->in->data, buff->in->text_size, NULL, NULL,
- NULL, &p_file_info->parser_cus_config[i]->regex, 0);
- }
-
- /* Update charts */
- netdata_mutex_lock(&stdout_mut);
- p_file_info->chart_meta->update(p_file_info);
- fflush(stdout);
- netdata_mutex_unlock(&stdout_mut);
-
- circ_buff_insert(buff);
-
- uv_timer_again(&p_file_info->flb_tmp_buff_cpy_timer);
-}
-
-void flb_complete_item_timer_timeout_cb(uv_timer_t *handle) {
-
- struct File_info *p_file_info = handle->data;
- Circ_buff_t *buff = p_file_info->circ_buff;
-
- uv_mutex_lock(&p_file_info->flb_tmp_buff_mut);
- if(!buff->in->data || !*buff->in->data || !buff->in->text_size){
- p_file_info->parser_metrics->last_update = now_realtime_sec();
- netdata_mutex_lock(&stdout_mut);
- p_file_info->chart_meta->update(p_file_info);
- fflush(stdout);
- netdata_mutex_unlock(&stdout_mut);
- uv_mutex_unlock(&p_file_info->flb_tmp_buff_mut);
- return;
- }
-
- flb_complete_buff_item(p_file_info);
-
- uv_mutex_unlock(&p_file_info->flb_tmp_buff_mut);
-}
-
-static int flb_collect_logs_cb(void *record, size_t size, void *data){
-
- /* "data" is NULL for Forward-type sources and non-NULL for local sources */
- struct File_info *p_file_info = (struct File_info *) data;
- Circ_buff_t *buff = NULL;
-
- msgpack_unpacked result;
- size_t off = 0;
- struct flb_time tmp_time;
- msgpack_object *x;
-
- char timestamp_str[TIMESTAMP_MS_STR_SIZE] = "";
- msec_t timestamp = 0;
-
- struct resizable_key_val_arr {
- char **key;
- char **val;
- size_t *key_size;
- size_t *val_size;
- int size, max_size;
- };
-
- /* FLB_WEB_LOG case */
- Log_line_parsed_t line_parsed = (Log_line_parsed_t) {0};
- /* FLB_WEB_LOG case end */
-
- /* FLB_KMSG case */
- static int skip_kmsg_log_buffering = 1;
- int kmsg_sever = -1; // -1 equals invalid
- /* FLB_KMSG case end */
-
- /* FLB_SYSTEMD or FLB_SYSLOG case */
- char syslog_prival[4] = "";
- size_t syslog_prival_size = 0;
- char syslog_severity[2] = "";
- char syslog_facility[3] = "";
- char *syslog_timestamp = NULL;
- size_t syslog_timestamp_size = 0;
- char *hostname = NULL;
- size_t hostname_size = 0;
- char *syslog_identifier = NULL;
- size_t syslog_identifier_size = 0;
- char *pid = NULL;
- size_t pid_size = 0;
- char *message = NULL;
- size_t message_size = 0;
- /* FLB_SYSTEMD or FLB_SYSLOG case end */
-
- /* FLB_DOCKER_EV case */
- long docker_ev_time = 0;
- long docker_ev_timeNano = 0;
- char *docker_ev_type = NULL;
- size_t docker_ev_type_size = 0;
- char *docker_ev_action = NULL;
- size_t docker_ev_action_size = 0;
- char *docker_ev_id = NULL;
- size_t docker_ev_id_size = 0;
- static struct resizable_key_val_arr docker_ev_attr = {0};
- docker_ev_attr.size = 0;
- /* FLB_DOCKER_EV case end */
-
- /* FLB_MQTT case */
- char *mqtt_topic = NULL;
- size_t mqtt_topic_size = 0;
- static char *mqtt_message = NULL;
- static size_t mqtt_message_size_max = 0;
- /* FLB_MQTT case end */
-
- size_t new_tmp_text_size = 0;
-
- msgpack_unpacked_init(&result);
-
- int iter = 0;
- while (dl_msgpack_unpack_next(&result, record, size, &off) == MSGPACK_UNPACK_SUCCESS) {
- iter++;
- m_assert(iter == 1, "We do not expect more than one loop iteration here");
-
- flb_time_pop_from_msgpack(&tmp_time, &result, &x);
-
- if(likely(x->type == MSGPACK_OBJECT_MAP && x->via.map.size != 0)){
- msgpack_object_kv* p = x->via.map.ptr;
- msgpack_object_kv* pend = x->via.map.ptr + x->via.map.size;
-
- /* ================================================================
- * If p_file_info == NULL, it means it is a "Forward" source, so
- * we need to search for the associated p_file_info. This code can
- * be optimized further.
- * ============================================================== */
- if(p_file_info == NULL){
- do{
- if(!strncmp(p->key.via.str.ptr, "stream guid", (size_t) p->key.via.str.size)){
- char *stream_guid = (char *) p->val.via.str.ptr;
- size_t stream_guid_size = p->val.via.str.size;
- debug_log( "stream guid:%.*s", (int) stream_guid_size, stream_guid);
-
- for (int i = 0; i < p_file_infos_arr->count; i++) {
- if(!strncmp(p_file_infos_arr->data[i]->stream_guid, stream_guid, stream_guid_size)){
- p_file_info = p_file_infos_arr->data[i];
- // debug_log( "p_file_info match found: %s type[%s]",
- // p_file_info->stream_guid,
- // log_src_type_t_str[p_file_info->log_type]);
- break;
- }
- }
- }
- ++p;
- // continue;
- } while(p < pend);
- }
- if(unlikely(p_file_info == NULL))
- goto skip_collect_and_drop_logs;
-
-
- uv_mutex_lock(&p_file_info->flb_tmp_buff_mut);
- buff = p_file_info->circ_buff;
-
-
- p = x->via.map.ptr;
- pend = x->via.map.ptr + x->via.map.size;
- do{
- switch(p_file_info->log_type){
-
- case FLB_TAIL:
- case FLB_WEB_LOG:
- case FLB_SERIAL:
- {
- if( !strncmp(p->key.via.str.ptr, LOG_REC_KEY, (size_t) p->key.via.str.size) ||
- /* The following line is in case we collect systemd logs
- * (tagged as "MESSAGE") or docker_events (tagged as
- * "message") via a "Forward" source to an FLB_TAIL parent. */
- !strncasecmp(p->key.via.str.ptr, LOG_REC_KEY_SYSTEMD, (size_t) p->key.via.str.size)){
-
- message = (char *) p->val.via.str.ptr;
- message_size = p->val.via.str.size;
-
- if(p_file_info->log_type == FLB_WEB_LOG){
- parse_web_log_line( (Web_log_parser_config_t *) p_file_info->parser_config->gen_config,
- message, message_size, &line_parsed);
-
- if(likely(p_file_info->use_log_timestamp)){
- timestamp = line_parsed.timestamp * MSEC_PER_SEC; // convert to msec from sec
-
- { /* ------------------ FIXME ------------------------
- * Temporary kludge so that metrics don't break when
- * a new record has timestamp before the current one.
- */
- static msec_t previous_timestamp = 0;
- if((((long long) timestamp - (long long) previous_timestamp) < 0))
- timestamp = previous_timestamp;
-
- previous_timestamp = timestamp;
- }
- }
- }
-
- new_tmp_text_size = message_size + 1; // +1 for '\n'
-
- m_assert(message_size, "message_size is 0");
- m_assert(message, "message is NULL");
- }
-
- break;
- }
-
- case FLB_KMSG:
- {
- if(unlikely(skip_kmsg_log_buffering)){
- static time_t netdata_start_time = 0;
- if (!netdata_start_time) netdata_start_time = now_boottime_sec();
- if(now_boottime_sec() - netdata_start_time < KERNEL_LOGS_COLLECT_INIT_WAIT)
- goto skip_collect_and_drop_logs;
- else skip_kmsg_log_buffering = 0;
- }
-
- /* NOTE/WARNING:
- * kmsg timestamps are tricky. The timestamp will be
- * *wrong** if the system has gone into hibernation since
- * last boot and "p_file_info->use_log_timestamp" is set.
- * Even if "p_file_info->use_log_timestamp" is NOT set, we
- * need to use now_realtime_msec() as Fluent Bit timestamp
- * will also be wrong. */
- if( !strncmp(p->key.via.str.ptr, "sec", (size_t) p->key.via.str.size)){
- if(p_file_info->use_log_timestamp){
- timestamp += (now_realtime_sec() - now_boottime_sec() + p->val.via.i64) * MSEC_PER_SEC;
- }
- else if(!timestamp)
- timestamp = now_realtime_msec();
- }
- else if(!strncmp(p->key.via.str.ptr, "usec", (size_t) p->key.via.str.size) &&
- p_file_info->use_log_timestamp){
- timestamp += p->val.via.i64 / USEC_PER_MS;
- }
- else if(!strncmp(p->key.via.str.ptr, LOG_REC_KEY, (size_t) p->key.via.str.size)){
- message = (char *) p->val.via.str.ptr;
- message_size = p->val.via.str.size;
-
- m_assert(message, "message is NULL");
- m_assert(message_size, "message_size is 0");
-
- new_tmp_text_size += message_size + 1; // +1 for '\n'
- }
- else if(!strncmp(p->key.via.str.ptr, "priority", (size_t) p->key.via.str.size)){
- kmsg_sever = (int) p->val.via.u64;
- }
-
- break;
- }
-
- case FLB_SYSTEMD:
- case FLB_SYSLOG:
- {
- if( p_file_info->use_log_timestamp && !strncmp( p->key.via.str.ptr,
- "SOURCE_REALTIME_TIMESTAMP",
- (size_t) p->key.via.str.size)){
-
- m_assert(p->val.via.str.size - 3 == TIMESTAMP_MS_STR_SIZE - 1,
- "p->val.via.str.size - 3 != TIMESTAMP_MS_STR_SIZE");
-
- strncpyz(timestamp_str, p->val.via.str.ptr, (size_t) p->val.via.str.size);
-
- char *endptr = NULL;
- timestamp = str2ll(timestamp_str, &endptr);
- timestamp = *endptr ? 0 : timestamp / USEC_PER_MS;
- }
- else if(!strncmp(p->key.via.str.ptr, "PRIVAL", (size_t) p->key.via.str.size)){
- m_assert(p->val.via.str.size <= 3, "p->val.via.str.size > 3");
- strncpyz(syslog_prival, p->val.via.str.ptr, (size_t) p->val.via.str.size);
- syslog_prival_size = (size_t) p->val.via.str.size;
-
- m_assert(syslog_prival, "syslog_prival is NULL");
- }
- else if(!strncmp(p->key.via.str.ptr, "PRIORITY", (size_t) p->key.via.str.size)){
- m_assert(p->val.via.str.size <= 1, "p->val.via.str.size > 1");
- strncpyz(syslog_severity, p->val.via.str.ptr, (size_t) p->val.via.str.size);
-
- m_assert(syslog_severity, "syslog_severity is NULL");
- }
- else if(!strncmp(p->key.via.str.ptr, "SYSLOG_FACILITY", (size_t) p->key.via.str.size)){
- m_assert(p->val.via.str.size <= 2, "p->val.via.str.size > 2");
- strncpyz(syslog_facility, p->val.via.str.ptr, (size_t) p->val.via.str.size);
-
- m_assert(syslog_facility, "syslog_facility is NULL");
- }
- else if(!strncmp(p->key.via.str.ptr, "SYSLOG_TIMESTAMP", (size_t) p->key.via.str.size)){
- syslog_timestamp = (char *) p->val.via.str.ptr;
- syslog_timestamp_size = p->val.via.str.size;
-
- m_assert(syslog_timestamp, "syslog_timestamp is NULL");
- m_assert(syslog_timestamp_size, "syslog_timestamp_size is 0");
-
- new_tmp_text_size += syslog_timestamp_size;
- }
- else if(!strncmp(p->key.via.str.ptr, "HOSTNAME", (size_t) p->key.via.str.size)){
- hostname = (char *) p->val.via.str.ptr;
- hostname_size = p->val.via.str.size;
-
- m_assert(hostname, "hostname is NULL");
- m_assert(hostname_size, "hostname_size is 0");
-
- new_tmp_text_size += hostname_size + 1; // +1 for ' ' char
- }
- else if(!strncmp(p->key.via.str.ptr, "SYSLOG_IDENTIFIER", (size_t) p->key.via.str.size)){
- syslog_identifier = (char *) p->val.via.str.ptr;
- syslog_identifier_size = p->val.via.str.size;
-
- new_tmp_text_size += syslog_identifier_size;
- }
- else if(!strncmp(p->key.via.str.ptr, "PID", (size_t) p->key.via.str.size)){
- pid = (char *) p->val.via.str.ptr;
- pid_size = p->val.via.str.size;
-
- new_tmp_text_size += pid_size;
- }
- else if(!strncmp(p->key.via.str.ptr, LOG_REC_KEY_SYSTEMD, (size_t) p->key.via.str.size)){
-
- message = (char *) p->val.via.str.ptr;
- message_size = p->val.via.str.size;
-
- m_assert(message, "message is NULL");
- m_assert(message_size, "message_size is 0");
-
- new_tmp_text_size += message_size;
- }
-
- break;
- }
-
- case FLB_DOCKER_EV:
- {
- if(!strncmp(p->key.via.str.ptr, "time", (size_t) p->key.via.str.size)){
- docker_ev_time = p->val.via.i64;
-
- m_assert(docker_ev_time, "docker_ev_time is 0");
- }
- else if(!strncmp(p->key.via.str.ptr, "timeNano", (size_t) p->key.via.str.size)){
- docker_ev_timeNano = p->val.via.i64;
-
- m_assert(docker_ev_timeNano, "docker_ev_timeNano is 0");
-
- if(likely(p_file_info->use_log_timestamp))
- timestamp = docker_ev_timeNano / NSEC_PER_MSEC;
- }
- else if(!strncmp(p->key.via.str.ptr, "Type", (size_t) p->key.via.str.size)){
- docker_ev_type = (char *) p->val.via.str.ptr;
- docker_ev_type_size = p->val.via.str.size;
-
- m_assert(docker_ev_type, "docker_ev_type is NULL");
- m_assert(docker_ev_type_size, "docker_ev_type_size is 0");
-
- // debug_log("docker_ev_type: %.*s", docker_ev_type_size, docker_ev_type);
- }
- else if(!strncmp(p->key.via.str.ptr, "Action", (size_t) p->key.via.str.size)){
- docker_ev_action = (char *) p->val.via.str.ptr;
- docker_ev_action_size = p->val.via.str.size;
-
- m_assert(docker_ev_action, "docker_ev_action is NULL");
- m_assert(docker_ev_action_size, "docker_ev_action_size is 0");
-
- // debug_log("docker_ev_action: %.*s", docker_ev_action_size, docker_ev_action);
- }
- else if(!strncmp(p->key.via.str.ptr, "id", (size_t) p->key.via.str.size)){
- docker_ev_id = (char *) p->val.via.str.ptr;
- docker_ev_id_size = p->val.via.str.size;
-
- m_assert(docker_ev_id, "docker_ev_id is NULL");
- m_assert(docker_ev_id_size, "docker_ev_id_size is 0");
-
- // debug_log("docker_ev_id: %.*s", docker_ev_id_size, docker_ev_id);
- }
- else if(!strncmp(p->key.via.str.ptr, "Actor", (size_t) p->key.via.str.size)){
- // debug_log( "msg key:[%.*s]val:[%.*s]", (int) p->key.via.str.size,
- // p->key.via.str.ptr,
- // (int) p->val.via.str.size,
- // p->val.via.str.ptr);
- if(likely(p->val.type == MSGPACK_OBJECT_MAP && p->val.via.map.size != 0)){
- msgpack_object_kv* ac = p->val.via.map.ptr;
- msgpack_object_kv* const ac_pend= p->val.via.map.ptr + p->val.via.map.size;
- do{
- if(!strncmp(ac->key.via.str.ptr, "ID", (size_t) ac->key.via.str.size)){
- docker_ev_id = (char *) ac->val.via.str.ptr;
- docker_ev_id_size = ac->val.via.str.size;
-
- m_assert(docker_ev_id, "docker_ev_id is NULL");
- m_assert(docker_ev_id_size, "docker_ev_id_size is 0");
-
- // debug_log("docker_ev_id: %.*s", docker_ev_id_size, docker_ev_id);
- }
- else if(!strncmp(ac->key.via.str.ptr, "Attributes", (size_t) ac->key.via.str.size)){
- if(likely(ac->val.type == MSGPACK_OBJECT_MAP && ac->val.via.map.size != 0)){
- msgpack_object_kv* att = ac->val.via.map.ptr;
- msgpack_object_kv* const att_pend = ac->val.via.map.ptr + ac->val.via.map.size;
- do{
- if(unlikely(++docker_ev_attr.size > docker_ev_attr.max_size)){
- docker_ev_attr.max_size = docker_ev_attr.size;
- docker_ev_attr.key = reallocz(docker_ev_attr.key,
- docker_ev_attr.max_size * sizeof(char *));
- docker_ev_attr.val = reallocz(docker_ev_attr.val,
- docker_ev_attr.max_size * sizeof(char *));
- docker_ev_attr.key_size = reallocz(docker_ev_attr.key_size,
- docker_ev_attr.max_size * sizeof(size_t));
- docker_ev_attr.val_size = reallocz(docker_ev_attr.val_size,
- docker_ev_attr.max_size * sizeof(size_t));
- }
-
- docker_ev_attr.key[docker_ev_attr.size - 1] = (char *) att->key.via.str.ptr;
- docker_ev_attr.val[docker_ev_attr.size - 1] = (char *) att->val.via.str.ptr;
- docker_ev_attr.key_size[docker_ev_attr.size - 1] = (size_t) att->key.via.str.size;
- docker_ev_attr.val_size[docker_ev_attr.size - 1] = (size_t) att->val.via.str.size;
-
- att++;
- continue;
- } while(att < att_pend);
- }
- }
- ac++;
- continue;
- } while(ac < ac_pend);
- }
- }
-
- break;
- }
-
- case FLB_MQTT:
- {
- if(!strncmp(p->key.via.str.ptr, "topic", (size_t) p->key.via.str.size)){
- mqtt_topic = (char *) p->val.via.str.ptr;
- mqtt_topic_size = (size_t) p->val.via.str.size;
-
- while(0 == (message_size = dl_msgpack_object_print_buffer(mqtt_message, mqtt_message_size_max, *x)))
- mqtt_message = reallocz(mqtt_message, (mqtt_message_size_max += 10));
-
- new_tmp_text_size = message_size + 1; // +1 for '\n'
-
- m_assert(message_size, "message_size is 0");
- m_assert(mqtt_message, "mqtt_message is NULL");
-
- break; // watch out, MQTT requires a 'break' here, as we parse the entire 'x' msgpack_object
- }
- else m_assert(0, "missing mqtt topic");
-
- break;
- }
-
- default:
- break;
- }
-
- } while(++p < pend);
- }
- }
-
- /* If no log timestamp was found, use Fluent Bit collection timestamp. */
- if(timestamp == 0)
- timestamp = (msec_t) tmp_time.tm.tv_sec * MSEC_PER_SEC + (msec_t) tmp_time.tm.tv_nsec / (NSEC_PER_MSEC);
-
- m_assert(TEST_MS_TIMESTAMP_VALID(timestamp), "timestamp is invalid");
-
- /* If input buffer timestamp is not set, now is the time to set it,
- * else just be done with the previous buffer */
- if(unlikely(buff->in->timestamp == 0)) buff->in->timestamp = timestamp / 1000 * 1000; // rounding down
- else if((timestamp - buff->in->timestamp) >= MSEC_PER_SEC) {
- flb_complete_buff_item(p_file_info);
- buff->in->timestamp = timestamp / 1000 * 1000; // rounding down
- }
-
- m_assert(TEST_MS_TIMESTAMP_VALID(buff->in->timestamp), "buff->in->timestamp is invalid");
-
- new_tmp_text_size += buff->in->text_size;
-
- /* ========================================================================
- * Step 2: Extract metrics and reconstruct log record
- * ====================================================================== */
-
- /* Parse number of log lines - common for all log source types */
- buff->in->num_lines++;
-
- /* FLB_TAIL, FLB_WEB_LOG and FLB_SERIAL case */
- if( p_file_info->log_type == FLB_TAIL ||
- p_file_info->log_type == FLB_WEB_LOG ||
- p_file_info->log_type == FLB_SERIAL){
-
- if(p_file_info->log_type == FLB_WEB_LOG)
- extract_web_log_metrics(p_file_info->parser_config, &line_parsed,
- p_file_info->parser_metrics->web_log);
-
- // TODO: Fix: Metrics will still be collected if circ_buff_prepare_write() returns 0.
- if(unlikely(!circ_buff_prepare_write(buff, new_tmp_text_size)))
- goto skip_collect_and_drop_logs;
-
- size_t tmp_item_off = buff->in->text_size;
-
- memcpy_iscntrl_fix(&buff->in->data[tmp_item_off], message, message_size);
- tmp_item_off += message_size;
-
- buff->in->data[tmp_item_off++] = '\n';
- m_assert(tmp_item_off == new_tmp_text_size, "tmp_item_off should be == new_tmp_text_size");
- buff->in->text_size = new_tmp_text_size;
-
-#ifdef HAVE_SYSTEMD
- if(p_file_info->do_sd_journal_send){
- if(p_file_info->log_type == FLB_WEB_LOG){
- sd_journal_send(
- SD_JOURNAL_SEND_DEFAULT_FIELDS,
- *line_parsed.vhost ? "%sWEB_LOG_VHOST=%s" : "_%s=%s", sd_journal_field_prefix, line_parsed.vhost,
- line_parsed.port ? "%sWEB_LOG_PORT=%d" : "_%s=%d", sd_journal_field_prefix, line_parsed.port,
- *line_parsed.req_scheme ? "%sWEB_LOG_REQ_SCHEME=%s" : "_%s=%s", sd_journal_field_prefix, line_parsed.req_scheme,
- *line_parsed.req_client ? "%sWEB_LOG_REQ_CLIENT=%s" : "_%s=%s", sd_journal_field_prefix, line_parsed.req_client,
- "%sWEB_LOG_REQ_METHOD=%s" , sd_journal_field_prefix, line_parsed.req_method,
- *line_parsed.req_URL ? "%sWEB_LOG_REQ_URL=%s" : "_%s=%s", sd_journal_field_prefix, line_parsed.req_URL,
- *line_parsed.req_proto ? "%sWEB_LOG_REQ_PROTO=%s" : "_%s=%s", sd_journal_field_prefix, line_parsed.req_proto,
- line_parsed.req_size ? "%sWEB_LOG_REQ_SIZE=%d" : "_%s=%d", sd_journal_field_prefix, line_parsed.req_size,
- line_parsed.req_proc_time ? "%sWEB_LOG_REC_PROC_TIME=%d" : "_%s=%d", sd_journal_field_prefix, line_parsed.req_proc_time,
- line_parsed.resp_code ? "%sWEB_LOG_RESP_CODE=%d" : "_%s=%d", sd_journal_field_prefix ,line_parsed.resp_code,
- line_parsed.ups_resp_time ? "%sWEB_LOG_UPS_RESP_TIME=%d" : "_%s=%d", sd_journal_field_prefix ,line_parsed.ups_resp_time,
- *line_parsed.ssl_proto ? "%sWEB_LOG_SSL_PROTO=%s" : "_%s=%s", sd_journal_field_prefix ,line_parsed.ssl_proto,
- *line_parsed.ssl_cipher ? "%sWEB_LOB_SSL_CIPHER=%s" : "_%s=%s", sd_journal_field_prefix ,line_parsed.ssl_cipher,
- LOG_REC_KEY_SYSTEMD "=%.*s", (int) message_size, message,
- NULL
- );
- }
- else if(p_file_info->log_type == FLB_SERIAL){
- Flb_serial_config_t *serial_config = (Flb_serial_config_t *) p_file_info->flb_config;
- sd_journal_send(
- SD_JOURNAL_SEND_DEFAULT_FIELDS,
- serial_config->bitrate && *serial_config->bitrate ?
- "%sSERIAL_BITRATE=%s" : "_%s=%s", sd_journal_field_prefix, serial_config->bitrate,
- LOG_REC_KEY_SYSTEMD "=%.*s", (int) message_size, message,
- NULL
- );
- }
- else{
- sd_journal_send(
- SD_JOURNAL_SEND_DEFAULT_FIELDS,
- LOG_REC_KEY_SYSTEMD "=%.*s", (int) message_size, message,
- NULL
- );
- }
- }
-#endif
-
- } /* FLB_TAIL, FLB_WEB_LOG and FLB_SERIAL case end */
-
- /* FLB_KMSG case */
- else if(p_file_info->log_type == FLB_KMSG){
-
- char *c;
-
- // see https://www.kernel.org/doc/Documentation/ABI/testing/dev-kmsg
- if((c = memchr(message, '\n', message_size))){
-
- const char subsys_str[] = "SUBSYSTEM=",
- device_str[] = "DEVICE=";
- const size_t subsys_str_len = sizeof(subsys_str) - 1,
- device_str_len = sizeof(device_str) - 1;
-
- size_t bytes_remain = message_size - (c - message);
-
- /* Extract machine-readable info for charts, such as subsystem and device. */
- while(bytes_remain){
- size_t sz = 0;
- while(--bytes_remain && c[++sz] != '\n');
- if(bytes_remain) --sz;
- *(c++) = '\\';
- *(c++) = 'n';
- sz--;
-
- DICTIONARY *dict = NULL;
- char *str = NULL;
- size_t str_len = 0;
- if(!strncmp(c, subsys_str, subsys_str_len)){
- dict = p_file_info->parser_metrics->kernel->subsystem;
- str = &c[subsys_str_len];
- str_len = (sz - subsys_str_len);
- }
- else if (!strncmp(c, device_str, device_str_len)){
- dict = p_file_info->parser_metrics->kernel->device;
- str = &c[device_str_len];
- str_len = (sz - device_str_len);
- }
-
- if(likely(str)){
- char *const key = mallocz(str_len + 1);
- memcpy(key, str, str_len);
- key[str_len] = '\0';
- metrics_dict_item_t item = {.dim_initialized = false, .num_new = 1};
- dictionary_set_advanced(dict, key, str_len + 1, &item, sizeof(item), NULL);
- }
- c = &c[sz];
- }
- }
-
- if(likely(kmsg_sever >= 0))
- p_file_info->parser_metrics->kernel->sever[kmsg_sever]++;
-
- // TODO: Fix: Metrics will still be collected if circ_buff_prepare_write() returns 0.
- if(unlikely(!circ_buff_prepare_write(buff, new_tmp_text_size)))
- goto skip_collect_and_drop_logs;
-
- size_t tmp_item_off = buff->in->text_size;
-
- memcpy_iscntrl_fix(&buff->in->data[tmp_item_off], message, message_size);
- tmp_item_off += message_size;
-
- buff->in->data[tmp_item_off++] = '\n';
- m_assert(tmp_item_off == new_tmp_text_size, "tmp_item_off should be == new_tmp_text_size");
- buff->in->text_size = new_tmp_text_size;
- } /* FLB_KMSG case end */
-
- /* FLB_SYSTEMD or FLB_SYSLOG case */
- else if(p_file_info->log_type == FLB_SYSTEMD ||
- p_file_info->log_type == FLB_SYSLOG){
-
- int syslog_prival_d = SYSLOG_PRIOR_ARR_SIZE - 1; // Initialise to 'unknown'
- int syslog_severity_d = SYSLOG_SEVER_ARR_SIZE - 1; // Initialise to 'unknown'
- int syslog_facility_d = SYSLOG_FACIL_ARR_SIZE - 1; // Initialise to 'unknown'
-
-
- /* FLB_SYSTEMD case has syslog_severity and syslog_facility values that
- * are used to calculate syslog_prival from. FLB_SYSLOG is the opposite
- * case, as it has a syslog_prival value that is used to calculate
- * syslog_severity and syslog_facility from. */
- if(p_file_info->log_type == FLB_SYSTEMD){
-
- /* Parse syslog_severity char* field into int and extract metrics.
- * syslog_severity_s will consist of 1 char (plus '\0'),
- * see https://datatracker.ietf.org/doc/html/rfc5424#section-6.2.1 */
- if(likely(syslog_severity[0])){
- if(likely(str2int(&syslog_severity_d, syslog_severity, 10) == STR2XX_SUCCESS)){
- p_file_info->parser_metrics->systemd->sever[syslog_severity_d]++;
- } // else parsing errors ++ ??
- } else p_file_info->parser_metrics->systemd->sever[SYSLOG_SEVER_ARR_SIZE - 1]++; // 'unknown'
-
- /* Parse syslog_facility char* field into int and extract metrics.
- * syslog_facility_s will consist of up to 2 chars (plus '\0'),
- * see https://datatracker.ietf.org/doc/html/rfc5424#section-6.2.1 */
- if(likely(syslog_facility[0])){
- if(likely(str2int(&syslog_facility_d, syslog_facility, 10) == STR2XX_SUCCESS)){
- p_file_info->parser_metrics->systemd->facil[syslog_facility_d]++;
- } // else parsing errors ++ ??
- } else p_file_info->parser_metrics->systemd->facil[SYSLOG_FACIL_ARR_SIZE - 1]++; // 'unknown'
-
- if(likely(syslog_severity[0] && syslog_facility[0])){
- /* Definition of syslog priority value == facility * 8 + severity */
- syslog_prival_d = syslog_facility_d * 8 + syslog_severity_d;
- syslog_prival_size = snprintfz(syslog_prival, 4, "%d", syslog_prival_d);
- m_assert(syslog_prival_size < 4 && syslog_prival_size > 0, "error with snprintf()");
-
- new_tmp_text_size += syslog_prival_size + 2; // +2 for '<' and '>'
-
- p_file_info->parser_metrics->systemd->prior[syslog_prival_d]++;
- } else {
- new_tmp_text_size += 3; // +3 for "<->" string
- p_file_info->parser_metrics->systemd->prior[SYSLOG_PRIOR_ARR_SIZE - 1]++; // 'unknown'
- }
-
- } else if(p_file_info->log_type == FLB_SYSLOG){
-
- if(likely(syslog_prival[0])){
- if(likely(str2int(&syslog_prival_d, syslog_prival, 10) == STR2XX_SUCCESS)){
- syslog_severity_d = syslog_prival_d % 8;
- syslog_facility_d = syslog_prival_d / 8;
-
- p_file_info->parser_metrics->systemd->prior[syslog_prival_d]++;
- p_file_info->parser_metrics->systemd->sever[syslog_severity_d]++;
- p_file_info->parser_metrics->systemd->facil[syslog_facility_d]++;
-
- new_tmp_text_size += syslog_prival_size + 2; // +2 for '<' and '>'
-
- } // else parsing errors ++ ??
- } else {
- new_tmp_text_size += 3; // +3 for "<->" string
- p_file_info->parser_metrics->systemd->prior[SYSLOG_PRIOR_ARR_SIZE - 1]++; // 'unknown'
- p_file_info->parser_metrics->systemd->sever[SYSLOG_SEVER_ARR_SIZE - 1]++; // 'unknown'
- p_file_info->parser_metrics->systemd->facil[SYSLOG_FACIL_ARR_SIZE - 1]++; // 'unknown'
- }
-
- } else m_assert(0, "shoudn't get here");
-
- char syslog_time_from_flb_time[25]; // 25 just to be on the safe side, but 16 + 1 chars bytes needed only.
- if(unlikely(!syslog_timestamp)){
- const time_t ts = tmp_time.tm.tv_sec;
- struct tm *const tm = localtime(&ts);
-
- strftime(syslog_time_from_flb_time, sizeof(syslog_time_from_flb_time), "%b %d %H:%M:%S ", tm);
- new_tmp_text_size += SYSLOG_TIMESTAMP_SIZE;
- }
-
- if(unlikely(!syslog_identifier)) new_tmp_text_size += sizeof(UNKNOWN) - 1;
- if(unlikely(!pid)) new_tmp_text_size += sizeof(UNKNOWN) - 1;
-
- new_tmp_text_size += 5; // +5 for '[', ']', ':' and ' ' characters around and after pid and '\n' at the end
-
- /* Metrics extracted, now prepare circular buffer for write */
- // TODO: Fix: Metrics will still be collected if circ_buff_prepare_write() returns 0.
- if(unlikely(!circ_buff_prepare_write(buff, new_tmp_text_size)))
- goto skip_collect_and_drop_logs;
-
- size_t tmp_item_off = buff->in->text_size;
-
- buff->in->data[tmp_item_off++] = '<';
- if(likely(syslog_prival[0])){
- memcpy(&buff->in->data[tmp_item_off], syslog_prival, syslog_prival_size);
- m_assert(syslog_prival_size, "syslog_prival_size cannot be 0");
- tmp_item_off += syslog_prival_size;
- } else buff->in->data[tmp_item_off++] = '-';
- buff->in->data[tmp_item_off++] = '>';
-
- if(likely(syslog_timestamp)){
- memcpy(&buff->in->data[tmp_item_off], syslog_timestamp, syslog_timestamp_size);
- // FLB_SYSLOG doesn't add space, but FLB_SYSTEMD does:
- // if(buff->in->data[tmp_item_off] != ' ') buff->in->data[tmp_item_off++] = ' ';
- tmp_item_off += syslog_timestamp_size;
- } else {
- memcpy(&buff->in->data[tmp_item_off], syslog_time_from_flb_time, SYSLOG_TIMESTAMP_SIZE);
- tmp_item_off += SYSLOG_TIMESTAMP_SIZE;
- }
-
- if(likely(hostname)){
- memcpy(&buff->in->data[tmp_item_off], hostname, hostname_size);
- tmp_item_off += hostname_size;
- buff->in->data[tmp_item_off++] = ' ';
- }
-
- if(likely(syslog_identifier)){
- memcpy(&buff->in->data[tmp_item_off], syslog_identifier, syslog_identifier_size);
- tmp_item_off += syslog_identifier_size;
- } else {
- memcpy(&buff->in->data[tmp_item_off], UNKNOWN, sizeof(UNKNOWN) - 1);
- tmp_item_off += sizeof(UNKNOWN) - 1;
- }
-
- buff->in->data[tmp_item_off++] = '[';
- if(likely(pid)){
- memcpy(&buff->in->data[tmp_item_off], pid, pid_size);
- tmp_item_off += pid_size;
- } else {
- memcpy(&buff->in->data[tmp_item_off], UNKNOWN, sizeof(UNKNOWN) - 1);
- tmp_item_off += sizeof(UNKNOWN) - 1;
- }
- buff->in->data[tmp_item_off++] = ']';
-
- buff->in->data[tmp_item_off++] = ':';
- buff->in->data[tmp_item_off++] = ' ';
-
- if(likely(message)){
- memcpy_iscntrl_fix(&buff->in->data[tmp_item_off], message, message_size);
- tmp_item_off += message_size;
- }
-
- buff->in->data[tmp_item_off++] = '\n';
- m_assert(tmp_item_off == new_tmp_text_size, "tmp_item_off should be == new_tmp_text_size");
- buff->in->text_size = new_tmp_text_size;
- } /* FLB_SYSTEMD or FLB_SYSLOG case end */
-
- /* FLB_DOCKER_EV case */
- else if(p_file_info->log_type == FLB_DOCKER_EV){
-
- const size_t docker_ev_datetime_size = sizeof "2022-08-26T15:33:20.802840200+0000" /* example datetime */;
- char docker_ev_datetime[docker_ev_datetime_size];
- docker_ev_datetime[0] = 0;
- if(likely(docker_ev_time && docker_ev_timeNano)){
- struct timespec ts;
- ts.tv_sec = docker_ev_time;
- if(unlikely(0 == strftime( docker_ev_datetime, docker_ev_datetime_size,
- "%Y-%m-%dT%H:%M:%S.000000000%z", localtime(&ts.tv_sec)))) { /* TODO: do what if error? */};
- const size_t docker_ev_timeNano_s_size = sizeof "802840200";
- char docker_ev_timeNano_s[docker_ev_timeNano_s_size];
- snprintfz( docker_ev_timeNano_s, docker_ev_timeNano_s_size, "%0*ld",
- (int) docker_ev_timeNano_s_size, docker_ev_timeNano % 1000000000);
- memcpy(&docker_ev_datetime[20], &docker_ev_timeNano_s, docker_ev_timeNano_s_size - 1);
-
- new_tmp_text_size += docker_ev_datetime_size; // -1 for null terminator, +1 for ' ' character
- }
-
- if(likely(docker_ev_type && docker_ev_action)){
- int ev_off = -1;
- while(++ev_off < NUM_OF_DOCKER_EV_TYPES){
- if(!strncmp(docker_ev_type, docker_ev_type_string[ev_off], docker_ev_type_size)){
- p_file_info->parser_metrics->docker_ev->ev_type[ev_off]++;
-
- int act_off = -1;
- while(docker_ev_action_string[ev_off][++act_off] != NULL){
- if(!strncmp(docker_ev_action, docker_ev_action_string[ev_off][act_off], docker_ev_action_size)){
- p_file_info->parser_metrics->docker_ev->ev_action[ev_off][act_off]++;
- break;
- }
- }
- if(unlikely(docker_ev_action_string[ev_off][act_off] == NULL))
- p_file_info->parser_metrics->docker_ev->ev_action[NUM_OF_DOCKER_EV_TYPES - 1][0]++; // 'unknown'
-
- break;
- }
- }
- if(unlikely(ev_off >= NUM_OF_DOCKER_EV_TYPES - 1)){
- p_file_info->parser_metrics->docker_ev->ev_type[ev_off]++; // 'unknown'
- p_file_info->parser_metrics->docker_ev->ev_action[NUM_OF_DOCKER_EV_TYPES - 1][0]++; // 'unknown'
- }
-
- new_tmp_text_size += docker_ev_type_size + docker_ev_action_size + 2; // +2 for ' ' chars
- }
-
- if(likely(docker_ev_id)){
- // debug_log("docker_ev_id: %.*s", (int) docker_ev_id_size, docker_ev_id);
-
- new_tmp_text_size += docker_ev_id_size + 1; // +1 for ' ' char
- }
-
- if(likely(docker_ev_attr.size)){
- for(int i = 0; i < docker_ev_attr.size; i++){
- new_tmp_text_size += docker_ev_attr.key_size[i] +
- docker_ev_attr.val_size[i] + 3; // +3 for '=' ',' ' ' characters
- }
- /* new_tmp_text_size = -2 + 2;
- * -2 due to missing ',' ' ' from last attribute and +2 for the two
- * '(' and ')' characters, so no need to add or subtract */
- }
-
- new_tmp_text_size += 1; // +1 for '\n' character at the end
-
- /* Metrics extracted, now prepare circular buffer for write */
- // TODO: Fix: Metrics will still be collected if circ_buff_prepare_write() returns 0.
- if(unlikely(!circ_buff_prepare_write(buff, new_tmp_text_size)))
- goto skip_collect_and_drop_logs;
-
- size_t tmp_item_off = buff->in->text_size;
- message_size = new_tmp_text_size - 1 - tmp_item_off;
-
- if(likely(*docker_ev_datetime)){
- memcpy(&buff->in->data[tmp_item_off], docker_ev_datetime, docker_ev_datetime_size - 1);
- tmp_item_off += docker_ev_datetime_size - 1; // -1 due to null terminator
- buff->in->data[tmp_item_off++] = ' ';
- }
-
- if(likely(docker_ev_type)){
- memcpy(&buff->in->data[tmp_item_off], docker_ev_type, docker_ev_type_size);
- tmp_item_off += docker_ev_type_size;
- buff->in->data[tmp_item_off++] = ' ';
- }
-
- if(likely(docker_ev_action)){
- memcpy(&buff->in->data[tmp_item_off], docker_ev_action, docker_ev_action_size);
- tmp_item_off += docker_ev_action_size;
- buff->in->data[tmp_item_off++] = ' ';
- }
-
- if(likely(docker_ev_id)){
- memcpy(&buff->in->data[tmp_item_off], docker_ev_id, docker_ev_id_size);
- tmp_item_off += docker_ev_id_size;
- buff->in->data[tmp_item_off++] = ' ';
- }
-
- if(likely(docker_ev_attr.size)){
- buff->in->data[tmp_item_off++] = '(';
- for(int i = 0; i < docker_ev_attr.size; i++){
- memcpy(&buff->in->data[tmp_item_off], docker_ev_attr.key[i], docker_ev_attr.key_size[i]);
- tmp_item_off += docker_ev_attr.key_size[i];
- buff->in->data[tmp_item_off++] = '=';
- memcpy(&buff->in->data[tmp_item_off], docker_ev_attr.val[i], docker_ev_attr.val_size[i]);
- tmp_item_off += docker_ev_attr.val_size[i];
- buff->in->data[tmp_item_off++] = ',';
- buff->in->data[tmp_item_off++] = ' ';
- }
- tmp_item_off -= 2; // overwrite last ',' and ' ' characters with a ')' character
- buff->in->data[tmp_item_off++] = ')';
- }
-
- buff->in->data[tmp_item_off++] = '\n';
- m_assert(tmp_item_off == new_tmp_text_size, "tmp_item_off should be == new_tmp_text_size");
- buff->in->text_size = new_tmp_text_size;
-
-#ifdef HAVE_SYSTEMD
- if(p_file_info->do_sd_journal_send){
- sd_journal_send(
- SD_JOURNAL_SEND_DEFAULT_FIELDS,
- "%sDOCKER_EVENTS_TYPE=%.*s", sd_journal_field_prefix, (int) docker_ev_type_size, docker_ev_type,
- "%sDOCKER_EVENTS_ACTION=%.*s", sd_journal_field_prefix, (int) docker_ev_action_size, docker_ev_action,
- "%sDOCKER_EVENTS_ID=%.*s", sd_journal_field_prefix, (int) docker_ev_id_size, docker_ev_id,
- LOG_REC_KEY_SYSTEMD "=%.*s", (int) message_size, &buff->in->data[tmp_item_off - 1 - message_size],
- NULL
- );
- }
-#endif
-
- } /* FLB_DOCKER_EV case end */
-
- /* FLB_MQTT case */
- else if(p_file_info->log_type == FLB_MQTT){
- if(likely(mqtt_topic)){
- char *const key = mallocz(mqtt_topic_size + 1);
- memcpy(key, mqtt_topic, mqtt_topic_size);
- key[mqtt_topic_size] = '\0';
- metrics_dict_item_t item = {.dim_initialized = false, .num_new = 1};
- dictionary_set_advanced(p_file_info->parser_metrics->mqtt->topic, key, mqtt_topic_size + 1, &item, sizeof(item), NULL);
-
- // TODO: Fix: Metrics will still be collected if circ_buff_prepare_write() returns 0.
- if(unlikely(!circ_buff_prepare_write(buff, new_tmp_text_size)))
- goto skip_collect_and_drop_logs;
-
- size_t tmp_item_off = buff->in->text_size;
-
- memcpy(&buff->in->data[tmp_item_off], mqtt_message, message_size);
- tmp_item_off += message_size;
-
- buff->in->data[tmp_item_off++] = '\n';
- m_assert(tmp_item_off == new_tmp_text_size, "tmp_item_off should be == new_tmp_text_size");
- buff->in->text_size = new_tmp_text_size;
-
-#ifdef HAVE_SYSTEMD
- if(p_file_info->do_sd_journal_send){
- sd_journal_send(
- SD_JOURNAL_SEND_DEFAULT_FIELDS,
- "%sMQTT_TOPIC=%s", key,
- LOG_REC_KEY_SYSTEMD "=%.*s", (int) message_size, mqtt_message,
- NULL
- );
- }
-#endif
-
- }
- else m_assert(0, "missing mqtt topic");
- }
-
-skip_collect_and_drop_logs:
- /* Following code is equivalent to msgpack_unpacked_destroy(&result) due
- * to that function call being unavailable when using dl_open() */
- if(result.zone != NULL) {
- dl_msgpack_zone_free(result.zone);
- result.zone = NULL;
- memset(&result.data, 0, sizeof(msgpack_object));
- }
-
- if(p_file_info)
- uv_mutex_unlock(&p_file_info->flb_tmp_buff_mut);
-
- flb_lib_free(record);
- return 0;
-
-}
-
-/**
- * @brief Add a Fluent-Bit input that outputs to the "lib" Fluent-Bit plugin.
- * @param[in] p_file_info Pointer to the log source struct where the input will
- * be registered to.
- * @return 0 on success, a negative number for any errors (see enum).
- */
-int flb_add_input(struct File_info *const p_file_info){
-
- enum return_values {
- SUCCESS = 0,
- INVALID_LOG_TYPE = -1,
- CONFIG_READ_ERROR = -2,
- FLB_PARSER_CREATE_ERROR = -3,
- FLB_INPUT_ERROR = -4,
- FLB_INPUT_SET_ERROR = -5,
- FLB_OUTPUT_ERROR = -6,
- FLB_OUTPUT_SET_ERROR = -7,
- DEFAULT_ERROR = -8
- };
-
- const int tag_max_size = 5;
- static unsigned tag = 0; // incremental tag id to link flb inputs to outputs
- char tag_s[tag_max_size];
- snprintfz(tag_s, tag_max_size, "%u", tag++);
-
-
- switch(p_file_info->log_type){
- case FLB_TAIL:
- case FLB_WEB_LOG: {
-
- char update_every_str[10];
- snprintfz(update_every_str, 10, "%d", p_file_info->update_every);
-
- debug_log("Setting up %s tail for %s (basename:%s)",
- p_file_info->log_type == FLB_TAIL ? "FLB_TAIL" : "FLB_WEB_LOG",
- p_file_info->filename, p_file_info->file_basename);
-
- Flb_tail_config_t *tail_config = (Flb_tail_config_t *) p_file_info->flb_config;
- if(unlikely(!tail_config)) return CONFIG_READ_ERROR;
-
- /* Set up input from log source */
- p_file_info->flb_input = flb_input(ctx, "tail", NULL);
- if(p_file_info->flb_input < 0 ) return FLB_INPUT_ERROR;
- if(flb_input_set(ctx, p_file_info->flb_input,
- "Tag", tag_s,
- "Path", p_file_info->filename,
- "Key", LOG_REC_KEY,
- "Refresh_Interval", update_every_str,
- "Skip_Long_Lines", "On",
- "Skip_Empty_Lines", "On",
-#if defined(FLB_HAVE_INOTIFY)
- "Inotify_Watcher", tail_config->use_inotify ? "true" : "false",
-#endif
- NULL) != 0) return FLB_INPUT_SET_ERROR;
-
- break;
- }
- case FLB_KMSG: {
- debug_log( "Setting up FLB_KMSG collector");
-
- Flb_kmsg_config_t *kmsg_config = (Flb_kmsg_config_t *) p_file_info->flb_config;
- if(unlikely(!kmsg_config ||
- !kmsg_config->prio_level ||
- !*kmsg_config->prio_level)) return CONFIG_READ_ERROR;
-
- /* Set up kmsg input */
- p_file_info->flb_input = flb_input(ctx, "kmsg", NULL);
- if(p_file_info->flb_input < 0 ) return FLB_INPUT_ERROR;
- if(flb_input_set(ctx, p_file_info->flb_input,
- "Tag", tag_s,
- "Prio_Level", kmsg_config->prio_level,
- NULL) != 0) return FLB_INPUT_SET_ERROR;
-
- break;
- }
- case FLB_SYSTEMD: {
- debug_log( "Setting up FLB_SYSTEMD collector");
-
- /* Set up systemd input */
- p_file_info->flb_input = flb_input(ctx, "systemd", NULL);
- if(p_file_info->flb_input < 0 ) return FLB_INPUT_ERROR;
- if(!strcmp(p_file_info->filename, SYSTEMD_DEFAULT_PATH)){
- if(flb_input_set(ctx, p_file_info->flb_input,
- "Tag", tag_s,
- "Read_From_Tail", "On",
- "Strip_Underscores", "On",
- NULL) != 0) return FLB_INPUT_SET_ERROR;
- } else {
- if(flb_input_set(ctx, p_file_info->flb_input,
- "Tag", tag_s,
- "Read_From_Tail", "On",
- "Strip_Underscores", "On",
- "Path", p_file_info->filename,
- NULL) != 0) return FLB_INPUT_SET_ERROR;
- }
-
- break;
- }
- case FLB_DOCKER_EV: {
- debug_log( "Setting up FLB_DOCKER_EV collector");
-
- /* Set up Docker Events parser */
- if(flb_parser_create( "docker_events_parser", /* parser name */
- "json", /* backend type */
- NULL, /* regex */
- FLB_TRUE, /* skip_empty */
- NULL, /* time format */
- NULL, /* time key */
- NULL, /* time offset */
- FLB_TRUE, /* time keep */
- FLB_FALSE, /* time strict */
- FLB_FALSE, /* no bare keys */
- NULL, /* parser types */
- 0, /* types len */
- NULL, /* decoders */
- ctx->config) == NULL) return FLB_PARSER_CREATE_ERROR;
-
- /* Set up Docker Events input */
- p_file_info->flb_input = flb_input(ctx, "docker_events", NULL);
- if(p_file_info->flb_input < 0 ) return FLB_INPUT_ERROR;
- if(flb_input_set(ctx, p_file_info->flb_input,
- "Tag", tag_s,
- "Parser", "docker_events_parser",
- "Unix_Path", p_file_info->filename,
- NULL) != 0) return FLB_INPUT_SET_ERROR;
-
- break;
- }
- case FLB_SYSLOG: {
- debug_log( "Setting up FLB_SYSLOG collector");
-
- /* Set up syslog parser */
- const char syslog_parser_prfx[] = "syslog_parser_";
- size_t parser_name_size = sizeof(syslog_parser_prfx) + tag_max_size - 1;
- char parser_name[parser_name_size];
- snprintfz(parser_name, parser_name_size, "%s%u", syslog_parser_prfx, tag);
-
- Syslog_parser_config_t *syslog_config = (Syslog_parser_config_t *) p_file_info->parser_config->gen_config;
- if(unlikely(!syslog_config ||
- !syslog_config->socket_config ||
- !syslog_config->socket_config->mode ||
- !p_file_info->filename)) return CONFIG_READ_ERROR;
-
- if(flb_parser_create( parser_name, /* parser name */
- "regex", /* backend type */
- syslog_config->log_format, /* regex */
- FLB_TRUE, /* skip_empty */
- NULL, /* time format */
- NULL, /* time key */
- NULL, /* time offset */
- FLB_TRUE, /* time keep */
- FLB_TRUE, /* time strict */
- FLB_FALSE, /* no bare keys */
- NULL, /* parser types */
- 0, /* types len */
- NULL, /* decoders */
- ctx->config) == NULL) return FLB_PARSER_CREATE_ERROR;
-
- /* Set up syslog input */
- p_file_info->flb_input = flb_input(ctx, "syslog", NULL);
- if(p_file_info->flb_input < 0 ) return FLB_INPUT_ERROR;
- if( !strcmp(syslog_config->socket_config->mode, "unix_udp") ||
- !strcmp(syslog_config->socket_config->mode, "unix_tcp")){
- m_assert(syslog_config->socket_config->unix_perm, "unix_perm is not set");
- if(flb_input_set(ctx, p_file_info->flb_input,
- "Tag", tag_s,
- "Path", p_file_info->filename,
- "Parser", parser_name,
- "Mode", syslog_config->socket_config->mode,
- "Unix_Perm", syslog_config->socket_config->unix_perm,
- NULL) != 0) return FLB_INPUT_SET_ERROR;
- } else if( !strcmp(syslog_config->socket_config->mode, "udp") ||
- !strcmp(syslog_config->socket_config->mode, "tcp")){
- m_assert(syslog_config->socket_config->listen, "listen is not set");
- m_assert(syslog_config->socket_config->port, "port is not set");
- if(flb_input_set(ctx, p_file_info->flb_input,
- "Tag", tag_s,
- "Parser", parser_name,
- "Mode", syslog_config->socket_config->mode,
- "Listen", syslog_config->socket_config->listen,
- "Port", syslog_config->socket_config->port,
- NULL) != 0) return FLB_INPUT_SET_ERROR;
- } else return FLB_INPUT_SET_ERROR; // should never reach this line
-
- break;
- }
- case FLB_SERIAL: {
- debug_log( "Setting up FLB_SERIAL collector");
-
- Flb_serial_config_t *serial_config = (Flb_serial_config_t *) p_file_info->flb_config;
- if(unlikely(!serial_config ||
- !serial_config->bitrate ||
- !*serial_config->bitrate ||
- !serial_config->min_bytes ||
- !*serial_config->min_bytes ||
- !p_file_info->filename)) return CONFIG_READ_ERROR;
-
- /* Set up serial input */
- p_file_info->flb_input = flb_input(ctx, "serial", NULL);
- if(p_file_info->flb_input < 0 ) return FLB_INPUT_ERROR;
- if(flb_input_set(ctx, p_file_info->flb_input,
- "Tag", tag_s,
- "File", p_file_info->filename,
- "Bitrate", serial_config->bitrate,
- "Min_Bytes", serial_config->min_bytes,
- "Separator", serial_config->separator,
- "Format", serial_config->format,
- NULL) != 0) return FLB_INPUT_SET_ERROR;
-
- break;
- }
- case FLB_MQTT: {
- debug_log( "Setting up FLB_MQTT collector");
-
- Flb_socket_config_t *socket_config = (Flb_socket_config_t *) p_file_info->flb_config;
- if(unlikely(!socket_config || !socket_config->listen || !*socket_config->listen ||
- !socket_config->port || !*socket_config->port)) return CONFIG_READ_ERROR;
-
- /* Set up MQTT input */
- p_file_info->flb_input = flb_input(ctx, "mqtt", NULL);
- if(p_file_info->flb_input < 0 ) return FLB_INPUT_ERROR;
- if(flb_input_set(ctx, p_file_info->flb_input,
- "Tag", tag_s,
- "Listen", socket_config->listen,
- "Port", socket_config->port,
- NULL) != 0) return FLB_INPUT_SET_ERROR;
-
- break;
- }
- default: {
- m_assert(0, "default: case in flb_add_input() error");
- return DEFAULT_ERROR; // Shouldn't reach here
- }
- }
-
- /* Set up user-configured outputs */
- for(Flb_output_config_t *output = p_file_info->flb_outputs; output; output = output->next){
- debug_log( "setting up user output [%s]", output->plugin);
-
- int out = flb_output(ctx, output->plugin, NULL);
- if(out < 0) return FLB_OUTPUT_ERROR;
- if(flb_output_set(ctx, out,
- "Match", tag_s,
- NULL) != 0) return FLB_OUTPUT_SET_ERROR;
- for(struct flb_output_config_param *param = output->param; param; param = param->next){
- debug_log( "setting up param [%s][%s] of output [%s]", param->key, param->val, output->plugin);
- if(flb_output_set(ctx, out,
- param->key, param->val,
- NULL) != 0) return FLB_OUTPUT_SET_ERROR;
- }
- }
-
- /* Set up "lib" output */
- struct flb_lib_out_cb *callback = mallocz(sizeof(struct flb_lib_out_cb));
- callback->cb = flb_collect_logs_cb;
- callback->data = p_file_info;
- if(((p_file_info->flb_lib_output = flb_output(ctx, "lib", callback)) < 0) ||
- (flb_output_set(ctx, p_file_info->flb_lib_output, "Match", tag_s, NULL) != 0)){
- freez(callback);
- return FLB_OUTPUT_ERROR;
- }
-
- return SUCCESS;
-}
-
-/**
- * @brief Add a Fluent-Bit Forward input.
- * @details This creates a unix or network socket to accept logs using
- * Fluent Bit's Forward protocol. For more information see:
- * https://docs.fluentbit.io/manual/pipeline/inputs/forward
- * @param[in] forward_in_config Configuration of the Forward input socket.
- * @return 0 on success, -1 on error.
- */
-int flb_add_fwd_input(Flb_socket_config_t *forward_in_config){
-
- if(forward_in_config == NULL){
- debug_log( "forward: forward_in_config is NULL");
- collector_info("forward_in_config is NULL");
- return 0;
- }
-
- do{
- debug_log( "forward: Setting up flb_add_fwd_input()");
-
- int input, output;
-
- if((input = flb_input(ctx, "forward", NULL)) < 0) break;
-
- if( forward_in_config->unix_path && *forward_in_config->unix_path &&
- forward_in_config->unix_perm && *forward_in_config->unix_perm){
- if(flb_input_set(ctx, input,
- "Tag_Prefix", "fwd",
- "Unix_Path", forward_in_config->unix_path,
- "Unix_Perm", forward_in_config->unix_perm,
- NULL) != 0) break;
- } else if( forward_in_config->listen && *forward_in_config->listen &&
- forward_in_config->port && *forward_in_config->port){
- if(flb_input_set(ctx, input,
- "Tag_Prefix", "fwd",
- "Listen", forward_in_config->listen,
- "Port", forward_in_config->port,
- NULL) != 0) break;
- } else break; // should never reach this line
-
- fwd_input_out_cb = mallocz(sizeof(struct flb_lib_out_cb));
-
- /* Set up output */
- fwd_input_out_cb->cb = flb_collect_logs_cb;
- fwd_input_out_cb->data = NULL;
- if((output = flb_output(ctx, "lib", fwd_input_out_cb)) < 0) break;
- if(flb_output_set(ctx, output,
- "Match", "fwd*",
- NULL) != 0) break;
-
- debug_log( "forward: Set up flb_add_fwd_input() with success");
- return 0;
- } while(0);
-
- /* Error */
- if(fwd_input_out_cb) freez(fwd_input_out_cb);
- return -1;
-}
-
-void flb_free_fwd_input_out_cb(void){
- freez(fwd_input_out_cb);
-} \ No newline at end of file