From be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 19 Apr 2024 04:57:58 +0200 Subject: Adding upstream version 1.44.3. Signed-off-by: Daniel Baumann --- logsmanagement/flb_plugin.c | 1536 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1536 insertions(+) create mode 100644 logsmanagement/flb_plugin.c (limited to 'logsmanagement/flb_plugin.c') diff --git a/logsmanagement/flb_plugin.c b/logsmanagement/flb_plugin.c new file mode 100644 index 00000000..493749ed --- /dev/null +++ b/logsmanagement/flb_plugin.c @@ -0,0 +1,1536 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +/** @file flb_plugin.c + * @brief This file includes all functions that act as an API to + * the Fluent Bit library. + */ + +#include "flb_plugin.h" +#include +#include "helper.h" +#include "defaults.h" +#include "circular_buffer.h" +#include "daemon/common.h" +#include "libnetdata/libnetdata.h" +#include "../fluent-bit/lib/msgpack-c/include/msgpack/unpack.h" +#include "../fluent-bit/lib/msgpack-c/include/msgpack/object.h" +#include "../fluent-bit/lib/monkey/include/monkey/mk_core/mk_list.h" +#include + +#ifdef HAVE_SYSTEMD +#include +#define SD_JOURNAL_SEND_DEFAULT_FIELDS \ + "%s_LOG_SOURCE=%s" , sd_journal_field_prefix, log_src_t_str[p_file_info->log_source], \ + "%s_LOG_TYPE=%s" , sd_journal_field_prefix, log_src_type_t_str[p_file_info->log_type] +#endif + +#define LOG_REC_KEY "msg" /**< key to represent log message field in most log sources **/ +#define LOG_REC_KEY_SYSTEMD "MESSAGE" /**< key to represent log message field in systemd log source **/ +#define SYSLOG_TIMESTAMP_SIZE 16 +#define UNKNOWN "unknown" + + +/* Including "../fluent-bit/include/fluent-bit/flb_macros.h" causes issues + * with CI, as it requires mk_core/mk_core_info.h which is generated only + * after Fluent Bit has been built. We can instead just redefined a couple + * of macros here: */ +#define FLB_FALSE 0 +#define FLB_TRUE !FLB_FALSE + +/* For similar reasons, (re)define the following macros from "flb_lib.h": */ +/* Lib engine status */ +#define FLB_LIB_ERROR -1 +#define FLB_LIB_NONE 0 +#define FLB_LIB_OK 1 +#define FLB_LIB_NO_CONFIG_MAP 2 + +/* Following structs are the same as defined in fluent-bit/flb_lib.h and + * fluent-bit/flb_time.h, but need to be redefined due to use of dlsym(). */ + +struct flb_time { + struct timespec tm; +}; + +/* Library mode context data */ +struct flb_lib_ctx { + int status; + struct mk_event_loop *event_loop; + struct mk_event *event_channel; + struct flb_config *config; +}; + +struct flb_parser_types { + char *key; + int key_len; + int type; +}; + +struct flb_parser { + /* configuration */ + int type; /* parser type */ + char *name; /* format name */ + char *p_regex; /* pattern for main regular expression */ + int skip_empty; /* skip empty regex matches */ + char *time_fmt; /* time format */ + char *time_fmt_full; /* original given time format */ + char *time_key; /* field name that contains the time */ + int time_offset; /* fixed UTC offset */ + int time_keep; /* keep time field */ + int time_strict; /* parse time field strictly */ + int logfmt_no_bare_keys; /* in logfmt parsers, require all keys to have values */ + char *time_frac_secs; /* time format have fractional seconds ? */ + struct flb_parser_types *types; /* type casting */ + int types_len; + + /* Field decoders */ + struct mk_list *decoders; + + /* internal */ + int time_with_year; /* do time_fmt consider a year (%Y) ? */ + char *time_fmt_year; + int time_with_tz; /* do time_fmt consider a timezone ? */ + struct flb_regex *regex; + struct mk_list _head; +}; + +struct flb_lib_out_cb { + int (*cb) (void *record, size_t size, void *data); + void *data; +}; + +typedef struct flb_lib_ctx flb_ctx_t; + +static flb_ctx_t *(*flb_create)(void); +static int (*flb_service_set)(flb_ctx_t *ctx, ...); +static int (*flb_start)(flb_ctx_t *ctx); +static int (*flb_stop)(flb_ctx_t *ctx); +static void (*flb_destroy)(flb_ctx_t *ctx); +static int (*flb_time_pop_from_msgpack)(struct flb_time *time, msgpack_unpacked *upk, msgpack_object **map); +static int (*flb_lib_free)(void *data); +static struct flb_parser *(*flb_parser_create)( const char *name, const char *format, const char *p_regex, int skip_empty, + const char *time_fmt, const char *time_key, const char *time_offset, + int time_keep, int time_strict, int logfmt_no_bare_keys, + struct flb_parser_types *types, int types_len,struct mk_list *decoders, + struct flb_config *config); +static int (*flb_input)(flb_ctx_t *ctx, const char *input, void *data); +static int (*flb_input_set)(flb_ctx_t *ctx, int ffd, ...); +// static int (*flb_filter)(flb_ctx_t *ctx, const char *filter, void *data); +// static int (*flb_filter_set)(flb_ctx_t *ctx, int ffd, ...); +static int (*flb_output)(flb_ctx_t *ctx, const char *output, struct flb_lib_out_cb *cb); +static int (*flb_output_set)(flb_ctx_t *ctx, int ffd, ...); +static msgpack_unpack_return (*dl_msgpack_unpack_next)(msgpack_unpacked* result, const char* data, size_t len, size_t* off); +static void (*dl_msgpack_zone_free)(msgpack_zone* zone); +static int (*dl_msgpack_object_print_buffer)(char *buffer, size_t buffer_size, msgpack_object o); + +static flb_ctx_t *ctx = NULL; +static void *flb_lib_handle = NULL; + +static struct flb_lib_out_cb *fwd_input_out_cb = NULL; + +static const char *sd_journal_field_prefix = SD_JOURNAL_FIELD_PREFIX; + +extern netdata_mutex_t stdout_mut; + +int flb_init(flb_srvc_config_t flb_srvc_config, + const char *const stock_config_dir, + const char *const new_sd_journal_field_prefix){ + int rc = 0; + char *dl_error; + + char *flb_lib_path = strdupz_path_subpath(stock_config_dir, "/../libfluent-bit.so"); + if (unlikely(NULL == (flb_lib_handle = dlopen(flb_lib_path, RTLD_LAZY)))){ + if (NULL != (dl_error = dlerror())) + collector_error("dlopen() libfluent-bit.so error: %s", dl_error); + rc = -1; + goto do_return; + } + + dlerror(); /* Clear any existing error */ + + /* Load Fluent-Bit functions from the shared library */ + #define load_function(FUNC_NAME){ \ + *(void **) (&FUNC_NAME) = dlsym(flb_lib_handle, LOGS_MANAG_STR(FUNC_NAME)); \ + if ((dl_error = dlerror()) != NULL) { \ + collector_error("dlerror loading %s: %s", LOGS_MANAG_STR(FUNC_NAME), dl_error); \ + rc = -1; \ + goto do_return; \ + } \ + } + + load_function(flb_create); + load_function(flb_service_set); + load_function(flb_start); + load_function(flb_stop); + load_function(flb_destroy); + load_function(flb_time_pop_from_msgpack); + load_function(flb_lib_free); + load_function(flb_parser_create); + load_function(flb_input); + load_function(flb_input_set); + // load_function(flb_filter); + // load_function(flb_filter_set); + load_function(flb_output); + load_function(flb_output_set); + *(void **) (&dl_msgpack_unpack_next) = dlsym(flb_lib_handle, "msgpack_unpack_next"); + if ((dl_error = dlerror()) != NULL) { + collector_error("dlerror loading msgpack_unpack_next: %s", dl_error); + rc = -1; + goto do_return; + } + *(void **) (&dl_msgpack_zone_free) = dlsym(flb_lib_handle, "msgpack_zone_free"); + if ((dl_error = dlerror()) != NULL) { + collector_error("dlerror loading msgpack_zone_free: %s", dl_error); + rc = -1; + goto do_return; + } + *(void **) (&dl_msgpack_object_print_buffer) = dlsym(flb_lib_handle, "msgpack_object_print_buffer"); + if ((dl_error = dlerror()) != NULL) { + collector_error("dlerror loading msgpack_object_print_buffer: %s", dl_error); + rc = -1; + goto do_return; + } + + ctx = flb_create(); + if (unlikely(!ctx)){ + rc = -1; + goto do_return; + } + + /* Global service settings */ + if(unlikely(flb_service_set(ctx, + "Flush" , flb_srvc_config.flush, + "HTTP_Listen" , flb_srvc_config.http_listen, + "HTTP_Port" , flb_srvc_config.http_port, + "HTTP_Server" , flb_srvc_config.http_server, + "Log_File" , flb_srvc_config.log_path, + "Log_Level" , flb_srvc_config.log_level, + "Coro_stack_size" , flb_srvc_config.coro_stack_size, + NULL) != 0 )){ + rc = -1; + goto do_return; + } + + if(new_sd_journal_field_prefix && *new_sd_journal_field_prefix) + sd_journal_field_prefix = new_sd_journal_field_prefix; + +do_return: + freez(flb_lib_path); + if(unlikely(rc && flb_lib_handle)) + dlclose(flb_lib_handle); + + return rc; +} + +int flb_run(void){ + if (likely(flb_start(ctx)) == 0) return 0; + else return -1; +} + +void flb_terminate(void){ + if(ctx){ + flb_stop(ctx); + flb_destroy(ctx); + ctx = NULL; + } + if(flb_lib_handle) + dlclose(flb_lib_handle); +} + +static void flb_complete_buff_item(struct File_info *p_file_info){ + + Circ_buff_t *buff = p_file_info->circ_buff; + + m_assert(buff->in->timestamp, "buff->in->timestamp cannot be 0"); + m_assert(buff->in->data, "buff->in->text cannot be NULL"); + m_assert(*buff->in->data, "*buff->in->text cannot be 0"); + m_assert(buff->in->text_size, "buff->in->text_size cannot be 0"); + + /* Replace last '\n' with '\0' to null-terminate text */ + buff->in->data[buff->in->text_size - 1] = '\0'; + + /* Store status (timestamp and text_size must have already been + * stored during flb_collect_logs_cb() ). */ + buff->in->status = CIRC_BUFF_ITEM_STATUS_UNPROCESSED; + + /* Load max size of compressed buffer, as calculated previously */ + size_t text_compressed_buff_max_size = buff->in->text_compressed_size; + + /* Do compression. + * TODO: Validate compression option? */ + buff->in->text_compressed = buff->in->data + buff->in->text_size; + buff->in->text_compressed_size = LZ4_compress_fast( buff->in->data, + buff->in->text_compressed, + buff->in->text_size, + text_compressed_buff_max_size, + p_file_info->compression_accel); + m_assert(buff->in->text_compressed_size != 0, "Text_compressed_size should be != 0"); + + p_file_info->parser_metrics->last_update = buff->in->timestamp / MSEC_PER_SEC; + + p_file_info->parser_metrics->num_lines += buff->in->num_lines; + + /* Perform custom log chart parsing */ + for(int i = 0; p_file_info->parser_cus_config[i]; i++){ + p_file_info->parser_metrics->parser_cus[i]->count += + search_keyword( buff->in->data, buff->in->text_size, NULL, NULL, + NULL, &p_file_info->parser_cus_config[i]->regex, 0); + } + + /* Update charts */ + netdata_mutex_lock(&stdout_mut); + p_file_info->chart_meta->update(p_file_info); + fflush(stdout); + netdata_mutex_unlock(&stdout_mut); + + circ_buff_insert(buff); + + uv_timer_again(&p_file_info->flb_tmp_buff_cpy_timer); +} + +void flb_complete_item_timer_timeout_cb(uv_timer_t *handle) { + + struct File_info *p_file_info = handle->data; + Circ_buff_t *buff = p_file_info->circ_buff; + + uv_mutex_lock(&p_file_info->flb_tmp_buff_mut); + if(!buff->in->data || !*buff->in->data || !buff->in->text_size){ + p_file_info->parser_metrics->last_update = now_realtime_sec(); + netdata_mutex_lock(&stdout_mut); + p_file_info->chart_meta->update(p_file_info); + fflush(stdout); + netdata_mutex_unlock(&stdout_mut); + uv_mutex_unlock(&p_file_info->flb_tmp_buff_mut); + return; + } + + flb_complete_buff_item(p_file_info); + + uv_mutex_unlock(&p_file_info->flb_tmp_buff_mut); +} + +static int flb_collect_logs_cb(void *record, size_t size, void *data){ + + /* "data" is NULL for Forward-type sources and non-NULL for local sources */ + struct File_info *p_file_info = (struct File_info *) data; + Circ_buff_t *buff = NULL; + + msgpack_unpacked result; + size_t off = 0; + struct flb_time tmp_time; + msgpack_object *x; + + char timestamp_str[TIMESTAMP_MS_STR_SIZE] = ""; + msec_t timestamp = 0; + + struct resizable_key_val_arr { + char **key; + char **val; + size_t *key_size; + size_t *val_size; + int size, max_size; + }; + + /* FLB_WEB_LOG case */ + Log_line_parsed_t line_parsed = (Log_line_parsed_t) {0}; + /* FLB_WEB_LOG case end */ + + /* FLB_KMSG case */ + static int skip_kmsg_log_buffering = 1; + int kmsg_sever = -1; // -1 equals invalid + /* FLB_KMSG case end */ + + /* FLB_SYSTEMD or FLB_SYSLOG case */ + char syslog_prival[4] = ""; + size_t syslog_prival_size = 0; + char syslog_severity[2] = ""; + char syslog_facility[3] = ""; + char *syslog_timestamp = NULL; + size_t syslog_timestamp_size = 0; + char *hostname = NULL; + size_t hostname_size = 0; + char *syslog_identifier = NULL; + size_t syslog_identifier_size = 0; + char *pid = NULL; + size_t pid_size = 0; + char *message = NULL; + size_t message_size = 0; + /* FLB_SYSTEMD or FLB_SYSLOG case end */ + + /* FLB_DOCKER_EV case */ + long docker_ev_time = 0; + long docker_ev_timeNano = 0; + char *docker_ev_type = NULL; + size_t docker_ev_type_size = 0; + char *docker_ev_action = NULL; + size_t docker_ev_action_size = 0; + char *docker_ev_id = NULL; + size_t docker_ev_id_size = 0; + static struct resizable_key_val_arr docker_ev_attr = {0}; + docker_ev_attr.size = 0; + /* FLB_DOCKER_EV case end */ + + /* FLB_MQTT case */ + char *mqtt_topic = NULL; + size_t mqtt_topic_size = 0; + static char *mqtt_message = NULL; + static size_t mqtt_message_size_max = 0; + /* FLB_MQTT case end */ + + size_t new_tmp_text_size = 0; + + msgpack_unpacked_init(&result); + + int iter = 0; + while (dl_msgpack_unpack_next(&result, record, size, &off) == MSGPACK_UNPACK_SUCCESS) { + iter++; + m_assert(iter == 1, "We do not expect more than one loop iteration here"); + + flb_time_pop_from_msgpack(&tmp_time, &result, &x); + + if(likely(x->type == MSGPACK_OBJECT_MAP && x->via.map.size != 0)){ + msgpack_object_kv* p = x->via.map.ptr; + msgpack_object_kv* pend = x->via.map.ptr + x->via.map.size; + + /* ================================================================ + * If p_file_info == NULL, it means it is a "Forward" source, so + * we need to search for the associated p_file_info. This code can + * be optimized further. + * ============================================================== */ + if(p_file_info == NULL){ + do{ + if(!strncmp(p->key.via.str.ptr, "stream guid", (size_t) p->key.via.str.size)){ + char *stream_guid = (char *) p->val.via.str.ptr; + size_t stream_guid_size = p->val.via.str.size; + debug_log( "stream guid:%.*s", (int) stream_guid_size, stream_guid); + + for (int i = 0; i < p_file_infos_arr->count; i++) { + if(!strncmp(p_file_infos_arr->data[i]->stream_guid, stream_guid, stream_guid_size)){ + p_file_info = p_file_infos_arr->data[i]; + // debug_log( "p_file_info match found: %s type[%s]", + // p_file_info->stream_guid, + // log_src_type_t_str[p_file_info->log_type]); + break; + } + } + } + ++p; + // continue; + } while(p < pend); + } + if(unlikely(p_file_info == NULL)) + goto skip_collect_and_drop_logs; + + + uv_mutex_lock(&p_file_info->flb_tmp_buff_mut); + buff = p_file_info->circ_buff; + + + p = x->via.map.ptr; + pend = x->via.map.ptr + x->via.map.size; + do{ + switch(p_file_info->log_type){ + + case FLB_TAIL: + case FLB_WEB_LOG: + case FLB_SERIAL: + { + if( !strncmp(p->key.via.str.ptr, LOG_REC_KEY, (size_t) p->key.via.str.size) || + /* The following line is in case we collect systemd logs + * (tagged as "MESSAGE") or docker_events (tagged as + * "message") via a "Forward" source to an FLB_TAIL parent. */ + !strncasecmp(p->key.via.str.ptr, LOG_REC_KEY_SYSTEMD, (size_t) p->key.via.str.size)){ + + message = (char *) p->val.via.str.ptr; + message_size = p->val.via.str.size; + + if(p_file_info->log_type == FLB_WEB_LOG){ + parse_web_log_line( (Web_log_parser_config_t *) p_file_info->parser_config->gen_config, + message, message_size, &line_parsed); + + if(likely(p_file_info->use_log_timestamp)){ + timestamp = line_parsed.timestamp * MSEC_PER_SEC; // convert to msec from sec + + { /* ------------------ FIXME ------------------------ + * Temporary kludge so that metrics don't break when + * a new record has timestamp before the current one. + */ + static msec_t previous_timestamp = 0; + if((((long long) timestamp - (long long) previous_timestamp) < 0)) + timestamp = previous_timestamp; + + previous_timestamp = timestamp; + } + } + } + + new_tmp_text_size = message_size + 1; // +1 for '\n' + + m_assert(message_size, "message_size is 0"); + m_assert(message, "message is NULL"); + } + + break; + } + + case FLB_KMSG: + { + if(unlikely(skip_kmsg_log_buffering)){ + static time_t netdata_start_time = 0; + if (!netdata_start_time) netdata_start_time = now_boottime_sec(); + if(now_boottime_sec() - netdata_start_time < KERNEL_LOGS_COLLECT_INIT_WAIT) + goto skip_collect_and_drop_logs; + else skip_kmsg_log_buffering = 0; + } + + /* NOTE/WARNING: + * kmsg timestamps are tricky. The timestamp will be + * *wrong** if the system has gone into hibernation since + * last boot and "p_file_info->use_log_timestamp" is set. + * Even if "p_file_info->use_log_timestamp" is NOT set, we + * need to use now_realtime_msec() as Fluent Bit timestamp + * will also be wrong. */ + if( !strncmp(p->key.via.str.ptr, "sec", (size_t) p->key.via.str.size)){ + if(p_file_info->use_log_timestamp){ + timestamp += (now_realtime_sec() - now_boottime_sec() + p->val.via.i64) * MSEC_PER_SEC; + } + else if(!timestamp) + timestamp = now_realtime_msec(); + } + else if(!strncmp(p->key.via.str.ptr, "usec", (size_t) p->key.via.str.size) && + p_file_info->use_log_timestamp){ + timestamp += p->val.via.i64 / USEC_PER_MS; + } + else if(!strncmp(p->key.via.str.ptr, LOG_REC_KEY, (size_t) p->key.via.str.size)){ + message = (char *) p->val.via.str.ptr; + message_size = p->val.via.str.size; + + m_assert(message, "message is NULL"); + m_assert(message_size, "message_size is 0"); + + new_tmp_text_size += message_size + 1; // +1 for '\n' + } + else if(!strncmp(p->key.via.str.ptr, "priority", (size_t) p->key.via.str.size)){ + kmsg_sever = (int) p->val.via.u64; + } + + break; + } + + case FLB_SYSTEMD: + case FLB_SYSLOG: + { + if( p_file_info->use_log_timestamp && !strncmp( p->key.via.str.ptr, + "SOURCE_REALTIME_TIMESTAMP", + (size_t) p->key.via.str.size)){ + + m_assert(p->val.via.str.size - 3 == TIMESTAMP_MS_STR_SIZE - 1, + "p->val.via.str.size - 3 != TIMESTAMP_MS_STR_SIZE"); + + strncpyz(timestamp_str, p->val.via.str.ptr, (size_t) p->val.via.str.size); + + char *endptr = NULL; + timestamp = str2ll(timestamp_str, &endptr); + timestamp = *endptr ? 0 : timestamp / USEC_PER_MS; + } + else if(!strncmp(p->key.via.str.ptr, "PRIVAL", (size_t) p->key.via.str.size)){ + m_assert(p->val.via.str.size <= 3, "p->val.via.str.size > 3"); + strncpyz(syslog_prival, p->val.via.str.ptr, (size_t) p->val.via.str.size); + syslog_prival_size = (size_t) p->val.via.str.size; + + m_assert(syslog_prival, "syslog_prival is NULL"); + } + else if(!strncmp(p->key.via.str.ptr, "PRIORITY", (size_t) p->key.via.str.size)){ + m_assert(p->val.via.str.size <= 1, "p->val.via.str.size > 1"); + strncpyz(syslog_severity, p->val.via.str.ptr, (size_t) p->val.via.str.size); + + m_assert(syslog_severity, "syslog_severity is NULL"); + } + else if(!strncmp(p->key.via.str.ptr, "SYSLOG_FACILITY", (size_t) p->key.via.str.size)){ + m_assert(p->val.via.str.size <= 2, "p->val.via.str.size > 2"); + strncpyz(syslog_facility, p->val.via.str.ptr, (size_t) p->val.via.str.size); + + m_assert(syslog_facility, "syslog_facility is NULL"); + } + else if(!strncmp(p->key.via.str.ptr, "SYSLOG_TIMESTAMP", (size_t) p->key.via.str.size)){ + syslog_timestamp = (char *) p->val.via.str.ptr; + syslog_timestamp_size = p->val.via.str.size; + + m_assert(syslog_timestamp, "syslog_timestamp is NULL"); + m_assert(syslog_timestamp_size, "syslog_timestamp_size is 0"); + + new_tmp_text_size += syslog_timestamp_size; + } + else if(!strncmp(p->key.via.str.ptr, "HOSTNAME", (size_t) p->key.via.str.size)){ + hostname = (char *) p->val.via.str.ptr; + hostname_size = p->val.via.str.size; + + m_assert(hostname, "hostname is NULL"); + m_assert(hostname_size, "hostname_size is 0"); + + new_tmp_text_size += hostname_size + 1; // +1 for ' ' char + } + else if(!strncmp(p->key.via.str.ptr, "SYSLOG_IDENTIFIER", (size_t) p->key.via.str.size)){ + syslog_identifier = (char *) p->val.via.str.ptr; + syslog_identifier_size = p->val.via.str.size; + + new_tmp_text_size += syslog_identifier_size; + } + else if(!strncmp(p->key.via.str.ptr, "PID", (size_t) p->key.via.str.size)){ + pid = (char *) p->val.via.str.ptr; + pid_size = p->val.via.str.size; + + new_tmp_text_size += pid_size; + } + else if(!strncmp(p->key.via.str.ptr, LOG_REC_KEY_SYSTEMD, (size_t) p->key.via.str.size)){ + + message = (char *) p->val.via.str.ptr; + message_size = p->val.via.str.size; + + m_assert(message, "message is NULL"); + m_assert(message_size, "message_size is 0"); + + new_tmp_text_size += message_size; + } + + break; + } + + case FLB_DOCKER_EV: + { + if(!strncmp(p->key.via.str.ptr, "time", (size_t) p->key.via.str.size)){ + docker_ev_time = p->val.via.i64; + + m_assert(docker_ev_time, "docker_ev_time is 0"); + } + else if(!strncmp(p->key.via.str.ptr, "timeNano", (size_t) p->key.via.str.size)){ + docker_ev_timeNano = p->val.via.i64; + + m_assert(docker_ev_timeNano, "docker_ev_timeNano is 0"); + + if(likely(p_file_info->use_log_timestamp)) + timestamp = docker_ev_timeNano / NSEC_PER_MSEC; + } + else if(!strncmp(p->key.via.str.ptr, "Type", (size_t) p->key.via.str.size)){ + docker_ev_type = (char *) p->val.via.str.ptr; + docker_ev_type_size = p->val.via.str.size; + + m_assert(docker_ev_type, "docker_ev_type is NULL"); + m_assert(docker_ev_type_size, "docker_ev_type_size is 0"); + + // debug_log("docker_ev_type: %.*s", docker_ev_type_size, docker_ev_type); + } + else if(!strncmp(p->key.via.str.ptr, "Action", (size_t) p->key.via.str.size)){ + docker_ev_action = (char *) p->val.via.str.ptr; + docker_ev_action_size = p->val.via.str.size; + + m_assert(docker_ev_action, "docker_ev_action is NULL"); + m_assert(docker_ev_action_size, "docker_ev_action_size is 0"); + + // debug_log("docker_ev_action: %.*s", docker_ev_action_size, docker_ev_action); + } + else if(!strncmp(p->key.via.str.ptr, "id", (size_t) p->key.via.str.size)){ + docker_ev_id = (char *) p->val.via.str.ptr; + docker_ev_id_size = p->val.via.str.size; + + m_assert(docker_ev_id, "docker_ev_id is NULL"); + m_assert(docker_ev_id_size, "docker_ev_id_size is 0"); + + // debug_log("docker_ev_id: %.*s", docker_ev_id_size, docker_ev_id); + } + else if(!strncmp(p->key.via.str.ptr, "Actor", (size_t) p->key.via.str.size)){ + // debug_log( "msg key:[%.*s]val:[%.*s]", (int) p->key.via.str.size, + // p->key.via.str.ptr, + // (int) p->val.via.str.size, + // p->val.via.str.ptr); + if(likely(p->val.type == MSGPACK_OBJECT_MAP && p->val.via.map.size != 0)){ + msgpack_object_kv* ac = p->val.via.map.ptr; + msgpack_object_kv* const ac_pend= p->val.via.map.ptr + p->val.via.map.size; + do{ + if(!strncmp(ac->key.via.str.ptr, "ID", (size_t) ac->key.via.str.size)){ + docker_ev_id = (char *) ac->val.via.str.ptr; + docker_ev_id_size = ac->val.via.str.size; + + m_assert(docker_ev_id, "docker_ev_id is NULL"); + m_assert(docker_ev_id_size, "docker_ev_id_size is 0"); + + // debug_log("docker_ev_id: %.*s", docker_ev_id_size, docker_ev_id); + } + else if(!strncmp(ac->key.via.str.ptr, "Attributes", (size_t) ac->key.via.str.size)){ + if(likely(ac->val.type == MSGPACK_OBJECT_MAP && ac->val.via.map.size != 0)){ + msgpack_object_kv* att = ac->val.via.map.ptr; + msgpack_object_kv* const att_pend = ac->val.via.map.ptr + ac->val.via.map.size; + do{ + if(unlikely(++docker_ev_attr.size > docker_ev_attr.max_size)){ + docker_ev_attr.max_size = docker_ev_attr.size; + docker_ev_attr.key = reallocz(docker_ev_attr.key, + docker_ev_attr.max_size * sizeof(char *)); + docker_ev_attr.val = reallocz(docker_ev_attr.val, + docker_ev_attr.max_size * sizeof(char *)); + docker_ev_attr.key_size = reallocz(docker_ev_attr.key_size, + docker_ev_attr.max_size * sizeof(size_t)); + docker_ev_attr.val_size = reallocz(docker_ev_attr.val_size, + docker_ev_attr.max_size * sizeof(size_t)); + } + + docker_ev_attr.key[docker_ev_attr.size - 1] = (char *) att->key.via.str.ptr; + docker_ev_attr.val[docker_ev_attr.size - 1] = (char *) att->val.via.str.ptr; + docker_ev_attr.key_size[docker_ev_attr.size - 1] = (size_t) att->key.via.str.size; + docker_ev_attr.val_size[docker_ev_attr.size - 1] = (size_t) att->val.via.str.size; + + att++; + continue; + } while(att < att_pend); + } + } + ac++; + continue; + } while(ac < ac_pend); + } + } + + break; + } + + case FLB_MQTT: + { + if(!strncmp(p->key.via.str.ptr, "topic", (size_t) p->key.via.str.size)){ + mqtt_topic = (char *) p->val.via.str.ptr; + mqtt_topic_size = (size_t) p->val.via.str.size; + + while(0 == (message_size = dl_msgpack_object_print_buffer(mqtt_message, mqtt_message_size_max, *x))) + mqtt_message = reallocz(mqtt_message, (mqtt_message_size_max += 10)); + + new_tmp_text_size = message_size + 1; // +1 for '\n' + + m_assert(message_size, "message_size is 0"); + m_assert(mqtt_message, "mqtt_message is NULL"); + + break; // watch out, MQTT requires a 'break' here, as we parse the entire 'x' msgpack_object + } + else m_assert(0, "missing mqtt topic"); + + break; + } + + default: + break; + } + + } while(++p < pend); + } + } + + /* If no log timestamp was found, use Fluent Bit collection timestamp. */ + if(timestamp == 0) + timestamp = (msec_t) tmp_time.tm.tv_sec * MSEC_PER_SEC + (msec_t) tmp_time.tm.tv_nsec / (NSEC_PER_MSEC); + + m_assert(TEST_MS_TIMESTAMP_VALID(timestamp), "timestamp is invalid"); + + /* If input buffer timestamp is not set, now is the time to set it, + * else just be done with the previous buffer */ + if(unlikely(buff->in->timestamp == 0)) buff->in->timestamp = timestamp / 1000 * 1000; // rounding down + else if((timestamp - buff->in->timestamp) >= MSEC_PER_SEC) { + flb_complete_buff_item(p_file_info); + buff->in->timestamp = timestamp / 1000 * 1000; // rounding down + } + + m_assert(TEST_MS_TIMESTAMP_VALID(buff->in->timestamp), "buff->in->timestamp is invalid"); + + new_tmp_text_size += buff->in->text_size; + + /* ======================================================================== + * Step 2: Extract metrics and reconstruct log record + * ====================================================================== */ + + /* Parse number of log lines - common for all log source types */ + buff->in->num_lines++; + + /* FLB_TAIL, FLB_WEB_LOG and FLB_SERIAL case */ + if( p_file_info->log_type == FLB_TAIL || + p_file_info->log_type == FLB_WEB_LOG || + p_file_info->log_type == FLB_SERIAL){ + + if(p_file_info->log_type == FLB_WEB_LOG) + extract_web_log_metrics(p_file_info->parser_config, &line_parsed, + p_file_info->parser_metrics->web_log); + + // TODO: Fix: Metrics will still be collected if circ_buff_prepare_write() returns 0. + if(unlikely(!circ_buff_prepare_write(buff, new_tmp_text_size))) + goto skip_collect_and_drop_logs; + + size_t tmp_item_off = buff->in->text_size; + + memcpy_iscntrl_fix(&buff->in->data[tmp_item_off], message, message_size); + tmp_item_off += message_size; + + buff->in->data[tmp_item_off++] = '\n'; + m_assert(tmp_item_off == new_tmp_text_size, "tmp_item_off should be == new_tmp_text_size"); + buff->in->text_size = new_tmp_text_size; + +#ifdef HAVE_SYSTEMD + if(p_file_info->do_sd_journal_send){ + if(p_file_info->log_type == FLB_WEB_LOG){ + sd_journal_send( + SD_JOURNAL_SEND_DEFAULT_FIELDS, + *line_parsed.vhost ? "%sWEB_LOG_VHOST=%s" : "_%s=%s", sd_journal_field_prefix, line_parsed.vhost, + line_parsed.port ? "%sWEB_LOG_PORT=%d" : "_%s=%d", sd_journal_field_prefix, line_parsed.port, + *line_parsed.req_scheme ? "%sWEB_LOG_REQ_SCHEME=%s" : "_%s=%s", sd_journal_field_prefix, line_parsed.req_scheme, + *line_parsed.req_client ? "%sWEB_LOG_REQ_CLIENT=%s" : "_%s=%s", sd_journal_field_prefix, line_parsed.req_client, + "%sWEB_LOG_REQ_METHOD=%s" , sd_journal_field_prefix, line_parsed.req_method, + *line_parsed.req_URL ? "%sWEB_LOG_REQ_URL=%s" : "_%s=%s", sd_journal_field_prefix, line_parsed.req_URL, + *line_parsed.req_proto ? "%sWEB_LOG_REQ_PROTO=%s" : "_%s=%s", sd_journal_field_prefix, line_parsed.req_proto, + line_parsed.req_size ? "%sWEB_LOG_REQ_SIZE=%d" : "_%s=%d", sd_journal_field_prefix, line_parsed.req_size, + line_parsed.req_proc_time ? "%sWEB_LOG_REC_PROC_TIME=%d" : "_%s=%d", sd_journal_field_prefix, line_parsed.req_proc_time, + line_parsed.resp_code ? "%sWEB_LOG_RESP_CODE=%d" : "_%s=%d", sd_journal_field_prefix ,line_parsed.resp_code, + line_parsed.ups_resp_time ? "%sWEB_LOG_UPS_RESP_TIME=%d" : "_%s=%d", sd_journal_field_prefix ,line_parsed.ups_resp_time, + *line_parsed.ssl_proto ? "%sWEB_LOG_SSL_PROTO=%s" : "_%s=%s", sd_journal_field_prefix ,line_parsed.ssl_proto, + *line_parsed.ssl_cipher ? "%sWEB_LOB_SSL_CIPHER=%s" : "_%s=%s", sd_journal_field_prefix ,line_parsed.ssl_cipher, + LOG_REC_KEY_SYSTEMD "=%.*s", (int) message_size, message, + NULL + ); + } + else if(p_file_info->log_type == FLB_SERIAL){ + Flb_serial_config_t *serial_config = (Flb_serial_config_t *) p_file_info->flb_config; + sd_journal_send( + SD_JOURNAL_SEND_DEFAULT_FIELDS, + serial_config->bitrate && *serial_config->bitrate ? + "%sSERIAL_BITRATE=%s" : "_%s=%s", sd_journal_field_prefix, serial_config->bitrate, + LOG_REC_KEY_SYSTEMD "=%.*s", (int) message_size, message, + NULL + ); + } + else{ + sd_journal_send( + SD_JOURNAL_SEND_DEFAULT_FIELDS, + LOG_REC_KEY_SYSTEMD "=%.*s", (int) message_size, message, + NULL + ); + } + } +#endif + + } /* FLB_TAIL, FLB_WEB_LOG and FLB_SERIAL case end */ + + /* FLB_KMSG case */ + else if(p_file_info->log_type == FLB_KMSG){ + + char *c; + + // see https://www.kernel.org/doc/Documentation/ABI/testing/dev-kmsg + if((c = memchr(message, '\n', message_size))){ + + const char subsys_str[] = "SUBSYSTEM=", + device_str[] = "DEVICE="; + const size_t subsys_str_len = sizeof(subsys_str) - 1, + device_str_len = sizeof(device_str) - 1; + + size_t bytes_remain = message_size - (c - message); + + /* Extract machine-readable info for charts, such as subsystem and device. */ + while(bytes_remain){ + size_t sz = 0; + while(--bytes_remain && c[++sz] != '\n'); + if(bytes_remain) --sz; + *(c++) = '\\'; + *(c++) = 'n'; + sz--; + + DICTIONARY *dict = NULL; + char *str = NULL; + size_t str_len = 0; + if(!strncmp(c, subsys_str, subsys_str_len)){ + dict = p_file_info->parser_metrics->kernel->subsystem; + str = &c[subsys_str_len]; + str_len = (sz - subsys_str_len); + } + else if (!strncmp(c, device_str, device_str_len)){ + dict = p_file_info->parser_metrics->kernel->device; + str = &c[device_str_len]; + str_len = (sz - device_str_len); + } + + if(likely(str)){ + char *const key = mallocz(str_len + 1); + memcpy(key, str, str_len); + key[str_len] = '\0'; + metrics_dict_item_t item = {.dim_initialized = false, .num_new = 1}; + dictionary_set_advanced(dict, key, str_len + 1, &item, sizeof(item), NULL); + } + c = &c[sz]; + } + } + + if(likely(kmsg_sever >= 0)) + p_file_info->parser_metrics->kernel->sever[kmsg_sever]++; + + // TODO: Fix: Metrics will still be collected if circ_buff_prepare_write() returns 0. + if(unlikely(!circ_buff_prepare_write(buff, new_tmp_text_size))) + goto skip_collect_and_drop_logs; + + size_t tmp_item_off = buff->in->text_size; + + memcpy_iscntrl_fix(&buff->in->data[tmp_item_off], message, message_size); + tmp_item_off += message_size; + + buff->in->data[tmp_item_off++] = '\n'; + m_assert(tmp_item_off == new_tmp_text_size, "tmp_item_off should be == new_tmp_text_size"); + buff->in->text_size = new_tmp_text_size; + } /* FLB_KMSG case end */ + + /* FLB_SYSTEMD or FLB_SYSLOG case */ + else if(p_file_info->log_type == FLB_SYSTEMD || + p_file_info->log_type == FLB_SYSLOG){ + + int syslog_prival_d = SYSLOG_PRIOR_ARR_SIZE - 1; // Initialise to 'unknown' + int syslog_severity_d = SYSLOG_SEVER_ARR_SIZE - 1; // Initialise to 'unknown' + int syslog_facility_d = SYSLOG_FACIL_ARR_SIZE - 1; // Initialise to 'unknown' + + + /* FLB_SYSTEMD case has syslog_severity and syslog_facility values that + * are used to calculate syslog_prival from. FLB_SYSLOG is the opposite + * case, as it has a syslog_prival value that is used to calculate + * syslog_severity and syslog_facility from. */ + if(p_file_info->log_type == FLB_SYSTEMD){ + + /* Parse syslog_severity char* field into int and extract metrics. + * syslog_severity_s will consist of 1 char (plus '\0'), + * see https://datatracker.ietf.org/doc/html/rfc5424#section-6.2.1 */ + if(likely(syslog_severity[0])){ + if(likely(str2int(&syslog_severity_d, syslog_severity, 10) == STR2XX_SUCCESS)){ + p_file_info->parser_metrics->systemd->sever[syslog_severity_d]++; + } // else parsing errors ++ ?? + } else p_file_info->parser_metrics->systemd->sever[SYSLOG_SEVER_ARR_SIZE - 1]++; // 'unknown' + + /* Parse syslog_facility char* field into int and extract metrics. + * syslog_facility_s will consist of up to 2 chars (plus '\0'), + * see https://datatracker.ietf.org/doc/html/rfc5424#section-6.2.1 */ + if(likely(syslog_facility[0])){ + if(likely(str2int(&syslog_facility_d, syslog_facility, 10) == STR2XX_SUCCESS)){ + p_file_info->parser_metrics->systemd->facil[syslog_facility_d]++; + } // else parsing errors ++ ?? + } else p_file_info->parser_metrics->systemd->facil[SYSLOG_FACIL_ARR_SIZE - 1]++; // 'unknown' + + if(likely(syslog_severity[0] && syslog_facility[0])){ + /* Definition of syslog priority value == facility * 8 + severity */ + syslog_prival_d = syslog_facility_d * 8 + syslog_severity_d; + syslog_prival_size = snprintfz(syslog_prival, 4, "%d", syslog_prival_d); + m_assert(syslog_prival_size < 4 && syslog_prival_size > 0, "error with snprintf()"); + + new_tmp_text_size += syslog_prival_size + 2; // +2 for '<' and '>' + + p_file_info->parser_metrics->systemd->prior[syslog_prival_d]++; + } else { + new_tmp_text_size += 3; // +3 for "<->" string + p_file_info->parser_metrics->systemd->prior[SYSLOG_PRIOR_ARR_SIZE - 1]++; // 'unknown' + } + + } else if(p_file_info->log_type == FLB_SYSLOG){ + + if(likely(syslog_prival[0])){ + if(likely(str2int(&syslog_prival_d, syslog_prival, 10) == STR2XX_SUCCESS)){ + syslog_severity_d = syslog_prival_d % 8; + syslog_facility_d = syslog_prival_d / 8; + + p_file_info->parser_metrics->systemd->prior[syslog_prival_d]++; + p_file_info->parser_metrics->systemd->sever[syslog_severity_d]++; + p_file_info->parser_metrics->systemd->facil[syslog_facility_d]++; + + new_tmp_text_size += syslog_prival_size + 2; // +2 for '<' and '>' + + } // else parsing errors ++ ?? + } else { + new_tmp_text_size += 3; // +3 for "<->" string + p_file_info->parser_metrics->systemd->prior[SYSLOG_PRIOR_ARR_SIZE - 1]++; // 'unknown' + p_file_info->parser_metrics->systemd->sever[SYSLOG_SEVER_ARR_SIZE - 1]++; // 'unknown' + p_file_info->parser_metrics->systemd->facil[SYSLOG_FACIL_ARR_SIZE - 1]++; // 'unknown' + } + + } else m_assert(0, "shoudn't get here"); + + char syslog_time_from_flb_time[25]; // 25 just to be on the safe side, but 16 + 1 chars bytes needed only. + if(unlikely(!syslog_timestamp)){ + const time_t ts = tmp_time.tm.tv_sec; + struct tm *const tm = localtime(&ts); + + strftime(syslog_time_from_flb_time, sizeof(syslog_time_from_flb_time), "%b %d %H:%M:%S ", tm); + new_tmp_text_size += SYSLOG_TIMESTAMP_SIZE; + } + + if(unlikely(!syslog_identifier)) new_tmp_text_size += sizeof(UNKNOWN) - 1; + if(unlikely(!pid)) new_tmp_text_size += sizeof(UNKNOWN) - 1; + + new_tmp_text_size += 5; // +5 for '[', ']', ':' and ' ' characters around and after pid and '\n' at the end + + /* Metrics extracted, now prepare circular buffer for write */ + // TODO: Fix: Metrics will still be collected if circ_buff_prepare_write() returns 0. + if(unlikely(!circ_buff_prepare_write(buff, new_tmp_text_size))) + goto skip_collect_and_drop_logs; + + size_t tmp_item_off = buff->in->text_size; + + buff->in->data[tmp_item_off++] = '<'; + if(likely(syslog_prival[0])){ + memcpy(&buff->in->data[tmp_item_off], syslog_prival, syslog_prival_size); + m_assert(syslog_prival_size, "syslog_prival_size cannot be 0"); + tmp_item_off += syslog_prival_size; + } else buff->in->data[tmp_item_off++] = '-'; + buff->in->data[tmp_item_off++] = '>'; + + if(likely(syslog_timestamp)){ + memcpy(&buff->in->data[tmp_item_off], syslog_timestamp, syslog_timestamp_size); + // FLB_SYSLOG doesn't add space, but FLB_SYSTEMD does: + // if(buff->in->data[tmp_item_off] != ' ') buff->in->data[tmp_item_off++] = ' '; + tmp_item_off += syslog_timestamp_size; + } else { + memcpy(&buff->in->data[tmp_item_off], syslog_time_from_flb_time, SYSLOG_TIMESTAMP_SIZE); + tmp_item_off += SYSLOG_TIMESTAMP_SIZE; + } + + if(likely(hostname)){ + memcpy(&buff->in->data[tmp_item_off], hostname, hostname_size); + tmp_item_off += hostname_size; + buff->in->data[tmp_item_off++] = ' '; + } + + if(likely(syslog_identifier)){ + memcpy(&buff->in->data[tmp_item_off], syslog_identifier, syslog_identifier_size); + tmp_item_off += syslog_identifier_size; + } else { + memcpy(&buff->in->data[tmp_item_off], UNKNOWN, sizeof(UNKNOWN) - 1); + tmp_item_off += sizeof(UNKNOWN) - 1; + } + + buff->in->data[tmp_item_off++] = '['; + if(likely(pid)){ + memcpy(&buff->in->data[tmp_item_off], pid, pid_size); + tmp_item_off += pid_size; + } else { + memcpy(&buff->in->data[tmp_item_off], UNKNOWN, sizeof(UNKNOWN) - 1); + tmp_item_off += sizeof(UNKNOWN) - 1; + } + buff->in->data[tmp_item_off++] = ']'; + + buff->in->data[tmp_item_off++] = ':'; + buff->in->data[tmp_item_off++] = ' '; + + if(likely(message)){ + memcpy_iscntrl_fix(&buff->in->data[tmp_item_off], message, message_size); + tmp_item_off += message_size; + } + + buff->in->data[tmp_item_off++] = '\n'; + m_assert(tmp_item_off == new_tmp_text_size, "tmp_item_off should be == new_tmp_text_size"); + buff->in->text_size = new_tmp_text_size; + } /* FLB_SYSTEMD or FLB_SYSLOG case end */ + + /* FLB_DOCKER_EV case */ + else if(p_file_info->log_type == FLB_DOCKER_EV){ + + const size_t docker_ev_datetime_size = sizeof "2022-08-26T15:33:20.802840200+0000" /* example datetime */; + char docker_ev_datetime[docker_ev_datetime_size]; + docker_ev_datetime[0] = 0; + if(likely(docker_ev_time && docker_ev_timeNano)){ + struct timespec ts; + ts.tv_sec = docker_ev_time; + if(unlikely(0 == strftime( docker_ev_datetime, docker_ev_datetime_size, + "%Y-%m-%dT%H:%M:%S.000000000%z", localtime(&ts.tv_sec)))) { /* TODO: do what if error? */}; + const size_t docker_ev_timeNano_s_size = sizeof "802840200"; + char docker_ev_timeNano_s[docker_ev_timeNano_s_size]; + snprintfz( docker_ev_timeNano_s, docker_ev_timeNano_s_size, "%0*ld", + (int) docker_ev_timeNano_s_size, docker_ev_timeNano % 1000000000); + memcpy(&docker_ev_datetime[20], &docker_ev_timeNano_s, docker_ev_timeNano_s_size - 1); + + new_tmp_text_size += docker_ev_datetime_size; // -1 for null terminator, +1 for ' ' character + } + + if(likely(docker_ev_type && docker_ev_action)){ + int ev_off = -1; + while(++ev_off < NUM_OF_DOCKER_EV_TYPES){ + if(!strncmp(docker_ev_type, docker_ev_type_string[ev_off], docker_ev_type_size)){ + p_file_info->parser_metrics->docker_ev->ev_type[ev_off]++; + + int act_off = -1; + while(docker_ev_action_string[ev_off][++act_off] != NULL){ + if(!strncmp(docker_ev_action, docker_ev_action_string[ev_off][act_off], docker_ev_action_size)){ + p_file_info->parser_metrics->docker_ev->ev_action[ev_off][act_off]++; + break; + } + } + if(unlikely(docker_ev_action_string[ev_off][act_off] == NULL)) + p_file_info->parser_metrics->docker_ev->ev_action[NUM_OF_DOCKER_EV_TYPES - 1][0]++; // 'unknown' + + break; + } + } + if(unlikely(ev_off >= NUM_OF_DOCKER_EV_TYPES - 1)){ + p_file_info->parser_metrics->docker_ev->ev_type[ev_off]++; // 'unknown' + p_file_info->parser_metrics->docker_ev->ev_action[NUM_OF_DOCKER_EV_TYPES - 1][0]++; // 'unknown' + } + + new_tmp_text_size += docker_ev_type_size + docker_ev_action_size + 2; // +2 for ' ' chars + } + + if(likely(docker_ev_id)){ + // debug_log("docker_ev_id: %.*s", (int) docker_ev_id_size, docker_ev_id); + + new_tmp_text_size += docker_ev_id_size + 1; // +1 for ' ' char + } + + if(likely(docker_ev_attr.size)){ + for(int i = 0; i < docker_ev_attr.size; i++){ + new_tmp_text_size += docker_ev_attr.key_size[i] + + docker_ev_attr.val_size[i] + 3; // +3 for '=' ',' ' ' characters + } + /* new_tmp_text_size = -2 + 2; + * -2 due to missing ',' ' ' from last attribute and +2 for the two + * '(' and ')' characters, so no need to add or subtract */ + } + + new_tmp_text_size += 1; // +1 for '\n' character at the end + + /* Metrics extracted, now prepare circular buffer for write */ + // TODO: Fix: Metrics will still be collected if circ_buff_prepare_write() returns 0. + if(unlikely(!circ_buff_prepare_write(buff, new_tmp_text_size))) + goto skip_collect_and_drop_logs; + + size_t tmp_item_off = buff->in->text_size; + message_size = new_tmp_text_size - 1 - tmp_item_off; + + if(likely(*docker_ev_datetime)){ + memcpy(&buff->in->data[tmp_item_off], docker_ev_datetime, docker_ev_datetime_size - 1); + tmp_item_off += docker_ev_datetime_size - 1; // -1 due to null terminator + buff->in->data[tmp_item_off++] = ' '; + } + + if(likely(docker_ev_type)){ + memcpy(&buff->in->data[tmp_item_off], docker_ev_type, docker_ev_type_size); + tmp_item_off += docker_ev_type_size; + buff->in->data[tmp_item_off++] = ' '; + } + + if(likely(docker_ev_action)){ + memcpy(&buff->in->data[tmp_item_off], docker_ev_action, docker_ev_action_size); + tmp_item_off += docker_ev_action_size; + buff->in->data[tmp_item_off++] = ' '; + } + + if(likely(docker_ev_id)){ + memcpy(&buff->in->data[tmp_item_off], docker_ev_id, docker_ev_id_size); + tmp_item_off += docker_ev_id_size; + buff->in->data[tmp_item_off++] = ' '; + } + + if(likely(docker_ev_attr.size)){ + buff->in->data[tmp_item_off++] = '('; + for(int i = 0; i < docker_ev_attr.size; i++){ + memcpy(&buff->in->data[tmp_item_off], docker_ev_attr.key[i], docker_ev_attr.key_size[i]); + tmp_item_off += docker_ev_attr.key_size[i]; + buff->in->data[tmp_item_off++] = '='; + memcpy(&buff->in->data[tmp_item_off], docker_ev_attr.val[i], docker_ev_attr.val_size[i]); + tmp_item_off += docker_ev_attr.val_size[i]; + buff->in->data[tmp_item_off++] = ','; + buff->in->data[tmp_item_off++] = ' '; + } + tmp_item_off -= 2; // overwrite last ',' and ' ' characters with a ')' character + buff->in->data[tmp_item_off++] = ')'; + } + + buff->in->data[tmp_item_off++] = '\n'; + m_assert(tmp_item_off == new_tmp_text_size, "tmp_item_off should be == new_tmp_text_size"); + buff->in->text_size = new_tmp_text_size; + +#ifdef HAVE_SYSTEMD + if(p_file_info->do_sd_journal_send){ + sd_journal_send( + SD_JOURNAL_SEND_DEFAULT_FIELDS, + "%sDOCKER_EVENTS_TYPE=%.*s", sd_journal_field_prefix, (int) docker_ev_type_size, docker_ev_type, + "%sDOCKER_EVENTS_ACTION=%.*s", sd_journal_field_prefix, (int) docker_ev_action_size, docker_ev_action, + "%sDOCKER_EVENTS_ID=%.*s", sd_journal_field_prefix, (int) docker_ev_id_size, docker_ev_id, + LOG_REC_KEY_SYSTEMD "=%.*s", (int) message_size, &buff->in->data[tmp_item_off - 1 - message_size], + NULL + ); + } +#endif + + } /* FLB_DOCKER_EV case end */ + + /* FLB_MQTT case */ + else if(p_file_info->log_type == FLB_MQTT){ + if(likely(mqtt_topic)){ + char *const key = mallocz(mqtt_topic_size + 1); + memcpy(key, mqtt_topic, mqtt_topic_size); + key[mqtt_topic_size] = '\0'; + metrics_dict_item_t item = {.dim_initialized = false, .num_new = 1}; + dictionary_set_advanced(p_file_info->parser_metrics->mqtt->topic, key, mqtt_topic_size + 1, &item, sizeof(item), NULL); + + // TODO: Fix: Metrics will still be collected if circ_buff_prepare_write() returns 0. + if(unlikely(!circ_buff_prepare_write(buff, new_tmp_text_size))) + goto skip_collect_and_drop_logs; + + size_t tmp_item_off = buff->in->text_size; + + memcpy(&buff->in->data[tmp_item_off], mqtt_message, message_size); + tmp_item_off += message_size; + + buff->in->data[tmp_item_off++] = '\n'; + m_assert(tmp_item_off == new_tmp_text_size, "tmp_item_off should be == new_tmp_text_size"); + buff->in->text_size = new_tmp_text_size; + +#ifdef HAVE_SYSTEMD + if(p_file_info->do_sd_journal_send){ + sd_journal_send( + SD_JOURNAL_SEND_DEFAULT_FIELDS, + "%sMQTT_TOPIC=%s", key, + LOG_REC_KEY_SYSTEMD "=%.*s", (int) message_size, mqtt_message, + NULL + ); + } +#endif + + } + else m_assert(0, "missing mqtt topic"); + } + +skip_collect_and_drop_logs: + /* Following code is equivalent to msgpack_unpacked_destroy(&result) due + * to that function call being unavailable when using dl_open() */ + if(result.zone != NULL) { + dl_msgpack_zone_free(result.zone); + result.zone = NULL; + memset(&result.data, 0, sizeof(msgpack_object)); + } + + if(p_file_info) + uv_mutex_unlock(&p_file_info->flb_tmp_buff_mut); + + flb_lib_free(record); + return 0; + +} + +/** + * @brief Add a Fluent-Bit input that outputs to the "lib" Fluent-Bit plugin. + * @param[in] p_file_info Pointer to the log source struct where the input will + * be registered to. + * @return 0 on success, a negative number for any errors (see enum). + */ +int flb_add_input(struct File_info *const p_file_info){ + + enum return_values { + SUCCESS = 0, + INVALID_LOG_TYPE = -1, + CONFIG_READ_ERROR = -2, + FLB_PARSER_CREATE_ERROR = -3, + FLB_INPUT_ERROR = -4, + FLB_INPUT_SET_ERROR = -5, + FLB_OUTPUT_ERROR = -6, + FLB_OUTPUT_SET_ERROR = -7, + DEFAULT_ERROR = -8 + }; + + const int tag_max_size = 5; + static unsigned tag = 0; // incremental tag id to link flb inputs to outputs + char tag_s[tag_max_size]; + snprintfz(tag_s, tag_max_size, "%u", tag++); + + + switch(p_file_info->log_type){ + case FLB_TAIL: + case FLB_WEB_LOG: { + + char update_every_str[10]; + snprintfz(update_every_str, 10, "%d", p_file_info->update_every); + + debug_log("Setting up %s tail for %s (basename:%s)", + p_file_info->log_type == FLB_TAIL ? "FLB_TAIL" : "FLB_WEB_LOG", + p_file_info->filename, p_file_info->file_basename); + + Flb_tail_config_t *tail_config = (Flb_tail_config_t *) p_file_info->flb_config; + if(unlikely(!tail_config)) return CONFIG_READ_ERROR; + + /* Set up input from log source */ + p_file_info->flb_input = flb_input(ctx, "tail", NULL); + if(p_file_info->flb_input < 0 ) return FLB_INPUT_ERROR; + if(flb_input_set(ctx, p_file_info->flb_input, + "Tag", tag_s, + "Path", p_file_info->filename, + "Key", LOG_REC_KEY, + "Refresh_Interval", update_every_str, + "Skip_Long_Lines", "On", + "Skip_Empty_Lines", "On", +#if defined(FLB_HAVE_INOTIFY) + "Inotify_Watcher", tail_config->use_inotify ? "true" : "false", +#endif + NULL) != 0) return FLB_INPUT_SET_ERROR; + + break; + } + case FLB_KMSG: { + debug_log( "Setting up FLB_KMSG collector"); + + Flb_kmsg_config_t *kmsg_config = (Flb_kmsg_config_t *) p_file_info->flb_config; + if(unlikely(!kmsg_config || + !kmsg_config->prio_level || + !*kmsg_config->prio_level)) return CONFIG_READ_ERROR; + + /* Set up kmsg input */ + p_file_info->flb_input = flb_input(ctx, "kmsg", NULL); + if(p_file_info->flb_input < 0 ) return FLB_INPUT_ERROR; + if(flb_input_set(ctx, p_file_info->flb_input, + "Tag", tag_s, + "Prio_Level", kmsg_config->prio_level, + NULL) != 0) return FLB_INPUT_SET_ERROR; + + break; + } + case FLB_SYSTEMD: { + debug_log( "Setting up FLB_SYSTEMD collector"); + + /* Set up systemd input */ + p_file_info->flb_input = flb_input(ctx, "systemd", NULL); + if(p_file_info->flb_input < 0 ) return FLB_INPUT_ERROR; + if(!strcmp(p_file_info->filename, SYSTEMD_DEFAULT_PATH)){ + if(flb_input_set(ctx, p_file_info->flb_input, + "Tag", tag_s, + "Read_From_Tail", "On", + "Strip_Underscores", "On", + NULL) != 0) return FLB_INPUT_SET_ERROR; + } else { + if(flb_input_set(ctx, p_file_info->flb_input, + "Tag", tag_s, + "Read_From_Tail", "On", + "Strip_Underscores", "On", + "Path", p_file_info->filename, + NULL) != 0) return FLB_INPUT_SET_ERROR; + } + + break; + } + case FLB_DOCKER_EV: { + debug_log( "Setting up FLB_DOCKER_EV collector"); + + /* Set up Docker Events parser */ + if(flb_parser_create( "docker_events_parser", /* parser name */ + "json", /* backend type */ + NULL, /* regex */ + FLB_TRUE, /* skip_empty */ + NULL, /* time format */ + NULL, /* time key */ + NULL, /* time offset */ + FLB_TRUE, /* time keep */ + FLB_FALSE, /* time strict */ + FLB_FALSE, /* no bare keys */ + NULL, /* parser types */ + 0, /* types len */ + NULL, /* decoders */ + ctx->config) == NULL) return FLB_PARSER_CREATE_ERROR; + + /* Set up Docker Events input */ + p_file_info->flb_input = flb_input(ctx, "docker_events", NULL); + if(p_file_info->flb_input < 0 ) return FLB_INPUT_ERROR; + if(flb_input_set(ctx, p_file_info->flb_input, + "Tag", tag_s, + "Parser", "docker_events_parser", + "Unix_Path", p_file_info->filename, + NULL) != 0) return FLB_INPUT_SET_ERROR; + + break; + } + case FLB_SYSLOG: { + debug_log( "Setting up FLB_SYSLOG collector"); + + /* Set up syslog parser */ + const char syslog_parser_prfx[] = "syslog_parser_"; + size_t parser_name_size = sizeof(syslog_parser_prfx) + tag_max_size - 1; + char parser_name[parser_name_size]; + snprintfz(parser_name, parser_name_size, "%s%u", syslog_parser_prfx, tag); + + Syslog_parser_config_t *syslog_config = (Syslog_parser_config_t *) p_file_info->parser_config->gen_config; + if(unlikely(!syslog_config || + !syslog_config->socket_config || + !syslog_config->socket_config->mode || + !p_file_info->filename)) return CONFIG_READ_ERROR; + + if(flb_parser_create( parser_name, /* parser name */ + "regex", /* backend type */ + syslog_config->log_format, /* regex */ + FLB_TRUE, /* skip_empty */ + NULL, /* time format */ + NULL, /* time key */ + NULL, /* time offset */ + FLB_TRUE, /* time keep */ + FLB_TRUE, /* time strict */ + FLB_FALSE, /* no bare keys */ + NULL, /* parser types */ + 0, /* types len */ + NULL, /* decoders */ + ctx->config) == NULL) return FLB_PARSER_CREATE_ERROR; + + /* Set up syslog input */ + p_file_info->flb_input = flb_input(ctx, "syslog", NULL); + if(p_file_info->flb_input < 0 ) return FLB_INPUT_ERROR; + if( !strcmp(syslog_config->socket_config->mode, "unix_udp") || + !strcmp(syslog_config->socket_config->mode, "unix_tcp")){ + m_assert(syslog_config->socket_config->unix_perm, "unix_perm is not set"); + if(flb_input_set(ctx, p_file_info->flb_input, + "Tag", tag_s, + "Path", p_file_info->filename, + "Parser", parser_name, + "Mode", syslog_config->socket_config->mode, + "Unix_Perm", syslog_config->socket_config->unix_perm, + NULL) != 0) return FLB_INPUT_SET_ERROR; + } else if( !strcmp(syslog_config->socket_config->mode, "udp") || + !strcmp(syslog_config->socket_config->mode, "tcp")){ + m_assert(syslog_config->socket_config->listen, "listen is not set"); + m_assert(syslog_config->socket_config->port, "port is not set"); + if(flb_input_set(ctx, p_file_info->flb_input, + "Tag", tag_s, + "Parser", parser_name, + "Mode", syslog_config->socket_config->mode, + "Listen", syslog_config->socket_config->listen, + "Port", syslog_config->socket_config->port, + NULL) != 0) return FLB_INPUT_SET_ERROR; + } else return FLB_INPUT_SET_ERROR; // should never reach this line + + break; + } + case FLB_SERIAL: { + debug_log( "Setting up FLB_SERIAL collector"); + + Flb_serial_config_t *serial_config = (Flb_serial_config_t *) p_file_info->flb_config; + if(unlikely(!serial_config || + !serial_config->bitrate || + !*serial_config->bitrate || + !serial_config->min_bytes || + !*serial_config->min_bytes || + !p_file_info->filename)) return CONFIG_READ_ERROR; + + /* Set up serial input */ + p_file_info->flb_input = flb_input(ctx, "serial", NULL); + if(p_file_info->flb_input < 0 ) return FLB_INPUT_ERROR; + if(flb_input_set(ctx, p_file_info->flb_input, + "Tag", tag_s, + "File", p_file_info->filename, + "Bitrate", serial_config->bitrate, + "Min_Bytes", serial_config->min_bytes, + "Separator", serial_config->separator, + "Format", serial_config->format, + NULL) != 0) return FLB_INPUT_SET_ERROR; + + break; + } + case FLB_MQTT: { + debug_log( "Setting up FLB_MQTT collector"); + + Flb_socket_config_t *socket_config = (Flb_socket_config_t *) p_file_info->flb_config; + if(unlikely(!socket_config || !socket_config->listen || !*socket_config->listen || + !socket_config->port || !*socket_config->port)) return CONFIG_READ_ERROR; + + /* Set up MQTT input */ + p_file_info->flb_input = flb_input(ctx, "mqtt", NULL); + if(p_file_info->flb_input < 0 ) return FLB_INPUT_ERROR; + if(flb_input_set(ctx, p_file_info->flb_input, + "Tag", tag_s, + "Listen", socket_config->listen, + "Port", socket_config->port, + NULL) != 0) return FLB_INPUT_SET_ERROR; + + break; + } + default: { + m_assert(0, "default: case in flb_add_input() error"); + return DEFAULT_ERROR; // Shouldn't reach here + } + } + + /* Set up user-configured outputs */ + for(Flb_output_config_t *output = p_file_info->flb_outputs; output; output = output->next){ + debug_log( "setting up user output [%s]", output->plugin); + + int out = flb_output(ctx, output->plugin, NULL); + if(out < 0) return FLB_OUTPUT_ERROR; + if(flb_output_set(ctx, out, + "Match", tag_s, + NULL) != 0) return FLB_OUTPUT_SET_ERROR; + for(struct flb_output_config_param *param = output->param; param; param = param->next){ + debug_log( "setting up param [%s][%s] of output [%s]", param->key, param->val, output->plugin); + if(flb_output_set(ctx, out, + param->key, param->val, + NULL) != 0) return FLB_OUTPUT_SET_ERROR; + } + } + + /* Set up "lib" output */ + struct flb_lib_out_cb *callback = mallocz(sizeof(struct flb_lib_out_cb)); + callback->cb = flb_collect_logs_cb; + callback->data = p_file_info; + if(((p_file_info->flb_lib_output = flb_output(ctx, "lib", callback)) < 0) || + (flb_output_set(ctx, p_file_info->flb_lib_output, "Match", tag_s, NULL) != 0)){ + freez(callback); + return FLB_OUTPUT_ERROR; + } + + return SUCCESS; +} + +/** + * @brief Add a Fluent-Bit Forward input. + * @details This creates a unix or network socket to accept logs using + * Fluent Bit's Forward protocol. For more information see: + * https://docs.fluentbit.io/manual/pipeline/inputs/forward + * @param[in] forward_in_config Configuration of the Forward input socket. + * @return 0 on success, -1 on error. + */ +int flb_add_fwd_input(Flb_socket_config_t *forward_in_config){ + + if(forward_in_config == NULL){ + debug_log( "forward: forward_in_config is NULL"); + collector_info("forward_in_config is NULL"); + return 0; + } + + do{ + debug_log( "forward: Setting up flb_add_fwd_input()"); + + int input, output; + + if((input = flb_input(ctx, "forward", NULL)) < 0) break; + + if( forward_in_config->unix_path && *forward_in_config->unix_path && + forward_in_config->unix_perm && *forward_in_config->unix_perm){ + if(flb_input_set(ctx, input, + "Tag_Prefix", "fwd", + "Unix_Path", forward_in_config->unix_path, + "Unix_Perm", forward_in_config->unix_perm, + NULL) != 0) break; + } else if( forward_in_config->listen && *forward_in_config->listen && + forward_in_config->port && *forward_in_config->port){ + if(flb_input_set(ctx, input, + "Tag_Prefix", "fwd", + "Listen", forward_in_config->listen, + "Port", forward_in_config->port, + NULL) != 0) break; + } else break; // should never reach this line + + fwd_input_out_cb = mallocz(sizeof(struct flb_lib_out_cb)); + + /* Set up output */ + fwd_input_out_cb->cb = flb_collect_logs_cb; + fwd_input_out_cb->data = NULL; + if((output = flb_output(ctx, "lib", fwd_input_out_cb)) < 0) break; + if(flb_output_set(ctx, output, + "Match", "fwd*", + NULL) != 0) break; + + debug_log( "forward: Set up flb_add_fwd_input() with success"); + return 0; + } while(0); + + /* Error */ + if(fwd_input_out_cb) freez(fwd_input_out_cb); + return -1; +} + +void flb_free_fwd_input_out_cb(void){ + freez(fwd_input_out_cb); +} \ No newline at end of file -- cgit v1.2.3