diff options
Diffstat (limited to 'src/fluent-bit/plugins/in_tail/tail_multiline.c')
-rw-r--r-- | src/fluent-bit/plugins/in_tail/tail_multiline.c | 606 |
1 files changed, 0 insertions, 606 deletions
diff --git a/src/fluent-bit/plugins/in_tail/tail_multiline.c b/src/fluent-bit/plugins/in_tail/tail_multiline.c deleted file mode 100644 index 71c031014..000000000 --- a/src/fluent-bit/plugins/in_tail/tail_multiline.c +++ /dev/null @@ -1,606 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_input_plugin.h> -#include <fluent-bit/flb_config.h> -#include <fluent-bit/flb_kv.h> - -#include "tail_config.h" -#include "tail_multiline.h" - -static int tail_mult_append(struct flb_parser *parser, - struct flb_tail_config *ctx) -{ - struct flb_tail_mult *mp; - - mp = flb_malloc(sizeof(struct flb_tail_mult)); - if (!mp) { - flb_errno(); - return -1; - } - - mp->parser = parser; - mk_list_add(&mp->_head, &ctx->mult_parsers); - - return 0; -} - -int flb_tail_mult_create(struct flb_tail_config *ctx, - struct flb_input_instance *ins, - struct flb_config *config) -{ - int ret; - const char *tmp; - struct mk_list *head; - struct flb_parser *parser; - struct flb_kv *kv; - - if (ctx->multiline_flush <= 0) { - ctx->multiline_flush = 1; - } - - mk_list_init(&ctx->mult_parsers); - - /* Get firstline parser */ - tmp = flb_input_get_property("parser_firstline", ins); - if (!tmp) { - flb_plg_error(ctx->ins, "multiline: no parser defined for firstline"); - return -1; - } - parser = flb_parser_get(tmp, config); - if (!parser) { - flb_plg_error(ctx->ins, "multiline: invalid parser '%s'", tmp); - return -1; - } - - ctx->mult_parser_firstline = parser; - - /* Read all multiline rules */ - mk_list_foreach(head, &ins->properties) { - kv = mk_list_entry(head, struct flb_kv, _head); - if (strcasecmp("parser_firstline", kv->key) == 0) { - continue; - } - - if (strncasecmp("parser_", kv->key, 7) == 0) { - parser = flb_parser_get(kv->val, config); - if (!parser) { - flb_plg_error(ctx->ins, "multiline: invalid parser '%s'", kv->val); - return -1; - } - - ret = tail_mult_append(parser, ctx); - if (ret == -1) { - return -1; - } - } - } - - return 0; -} - -int flb_tail_mult_destroy(struct flb_tail_config *ctx) -{ - struct mk_list *tmp; - struct mk_list *head; - struct flb_tail_mult *mp; - - if (ctx->multiline == FLB_FALSE) { - return 0; - } - - mk_list_foreach_safe(head, tmp, &ctx->mult_parsers) { - mp = mk_list_entry(head, struct flb_tail_mult, _head); - mk_list_del(&mp->_head); - flb_free(mp); - } - - return 0; -} - -/* Process the result of a firstline match */ -int flb_tail_mult_process_first(time_t now, - char *buf, size_t size, - struct flb_time *out_time, - struct flb_tail_file *file, - struct flb_tail_config *ctx) -{ - int ret; - size_t off; - msgpack_object map; - msgpack_unpacked result; - - /* If a previous multiline context already exists, flush first */ - if (file->mult_firstline && !file->mult_skipping) { - flb_tail_mult_flush(file, ctx); - } - - /* Remark as first multiline message */ - file->mult_firstline = FLB_TRUE; - - /* Validate obtained time, if not set, set the current time */ - if (flb_time_to_nanosec(out_time) == 0L) { - flb_time_get(out_time); - } - - /* Should we skip this multiline record ? */ - if (ctx->ignore_older > 0) { - if ((now - ctx->ignore_older) > out_time->tm.tv_sec) { - flb_free(buf); - file->mult_skipping = FLB_TRUE; - file->mult_firstline = FLB_TRUE; - - /* we expect more data to skip */ - return FLB_TAIL_MULT_MORE; - } - } - - /* Re-initiate buffers */ - msgpack_sbuffer_init(&file->mult_sbuf); - msgpack_packer_init(&file->mult_pck, &file->mult_sbuf, msgpack_sbuffer_write); - - /* - * flb_parser_do() always return a msgpack buffer, so we tweak our - * local msgpack reference to avoid an extra allocation. The only - * concern is that we don't know what's the real size of the memory - * allocated, so we assume it's just 'out_size'. - */ - file->mult_flush_timeout = now + (ctx->multiline_flush - 1); - file->mult_sbuf.data = buf; - file->mult_sbuf.size = size; - file->mult_sbuf.alloc = size; - - /* Set multiline status */ - file->mult_firstline = FLB_TRUE; - file->mult_skipping = FLB_FALSE; - flb_time_copy(&file->mult_time, out_time); - - off = 0; - msgpack_unpacked_init(&result); - ret = msgpack_unpack_next(&result, buf, size, &off); - if (ret != MSGPACK_UNPACK_SUCCESS) { - msgpack_sbuffer_destroy(&file->mult_sbuf); - msgpack_unpacked_destroy(&result); - return FLB_TAIL_MULT_NA; - } - - map = result.data; - file->mult_keys = map.via.map.size; - msgpack_unpacked_destroy(&result); - - /* We expect more data */ - return FLB_TAIL_MULT_MORE; -} - -/* Append a raw log entry to the last structured field in the mult buffer */ -static inline void flb_tail_mult_append_raw(char *buf, int size, - struct flb_tail_file *file, - struct flb_tail_config *config) -{ - /* Append the raw string */ - msgpack_pack_str(&file->mult_pck, size); - msgpack_pack_str_body(&file->mult_pck, buf, size); -} - -/* Check if the last key value type of a map is string or not */ -static inline int is_last_key_val_string(char *buf, size_t size) -{ - int ret = FLB_FALSE; - size_t off; - msgpack_unpacked result; - msgpack_object v; - msgpack_object root; - - off = 0; - msgpack_unpacked_init(&result); - ret = msgpack_unpack_next(&result, buf, size, &off); - if (ret != MSGPACK_UNPACK_SUCCESS) { - return ret; - } - - root = result.data; - if (root.type != MSGPACK_OBJECT_MAP) { - ret = FLB_FALSE; - } - else { - if (root.via.map.size == 0) { - ret = FLB_FALSE; - } - else { - v = root.via.map.ptr[root.via.map.size - 1].val; - if (v.type == MSGPACK_OBJECT_STR) { - ret = FLB_TRUE; - } - } - } - - msgpack_unpacked_destroy(&result); - return ret; -} - -int flb_tail_mult_process_content(time_t now, - char *buf, size_t len, - struct flb_tail_file *file, - struct flb_tail_config *ctx, - size_t processed_bytes) -{ - int ret; - size_t off; - void *out_buf; - size_t out_size = 0; - struct mk_list *head; - struct flb_tail_mult *mult_parser = NULL; - struct flb_time out_time = {0}; - msgpack_object map; - msgpack_unpacked result; - - /* Always check if this line is the beginning of a new multiline message */ - ret = flb_parser_do(ctx->mult_parser_firstline, - buf, len, - &out_buf, &out_size, &out_time); - if (ret >= 0) { - /* - * The content is a candidate for a firstline, but we need to perform - * the extra-mandatory check where the last key value type must be - * a string, otherwise no string concatenation with continuation lines - * will be possible. - */ - ret = is_last_key_val_string(out_buf, out_size); - if (ret == FLB_TRUE) - file->mult_firstline_append = FLB_TRUE; - else - file->mult_firstline_append = FLB_FALSE; - - flb_tail_mult_process_first(now, out_buf, out_size, &out_time, - file, ctx); - return FLB_TAIL_MULT_MORE; - } - - if (file->mult_skipping == FLB_TRUE) { - return FLB_TAIL_MULT_MORE; - } - - /* - * Once here means we have some data that is a continuation, iterate - * parsers trying to find a match - */ - out_buf = NULL; - mk_list_foreach(head, &ctx->mult_parsers) { - mult_parser = mk_list_entry(head, struct flb_tail_mult, _head); - - /* Process line text with current parser */ - out_buf = NULL; - out_size = 0; - ret = flb_parser_do(mult_parser->parser, - buf, len, - &out_buf, &out_size, &out_time); - if (ret < 0) { - mult_parser = NULL; - continue; - } - - /* The line was processed, break the loop and buffer the data */ - break; - } - - if (!mult_parser) { - /* - * If no parser was found means the string log must be appended - * to the last structured field. - */ - if (file->mult_firstline && file->mult_firstline_append) { - flb_tail_mult_append_raw(buf, len, file, ctx); - } - else { - flb_tail_file_pack_line(NULL, buf, len, file, processed_bytes); - } - - return FLB_TAIL_MULT_MORE; - } - - off = 0; - msgpack_unpacked_init(&result); - msgpack_unpack_next(&result, out_buf, out_size, &off); - map = result.data; - - /* Append new map to our local msgpack buffer */ - file->mult_keys += map.via.map.size; - msgpack_unpacked_destroy(&result); - msgpack_sbuffer_write(&file->mult_sbuf, out_buf, out_size); - flb_free(out_buf); - - return FLB_TAIL_MULT_MORE; -} - -static int flb_tail_mult_pack_line_body( - struct flb_log_event_encoder *context, - struct flb_tail_file *file) -{ - size_t adjacent_object_offset; - size_t continuation_length; - msgpack_unpacked adjacent_object; - msgpack_unpacked current_object; - size_t entry_index; - msgpack_object entry_value; - msgpack_object entry_key; - msgpack_object_map *data_map; - int map_size; - size_t offset; - struct flb_tail_config *config; - int result; - - result = FLB_EVENT_ENCODER_SUCCESS; - config = (struct flb_tail_config *) file->config; - - /* New Map size */ - map_size = file->mult_keys; - - if (file->config->path_key != NULL) { - map_size++; - - result = flb_log_event_encoder_append_body_values( - context, - FLB_LOG_EVENT_CSTRING_VALUE(config->path_key), - FLB_LOG_EVENT_CSTRING_VALUE(file->name)); - } - - - msgpack_unpacked_init(¤t_object); - msgpack_unpacked_init(&adjacent_object); - - offset = 0; - - while (result == FLB_EVENT_ENCODER_SUCCESS && - msgpack_unpack_next(¤t_object, - file->mult_sbuf.data, - file->mult_sbuf.size, - &offset) == MSGPACK_UNPACK_SUCCESS) { - if (current_object.data.type != MSGPACK_OBJECT_MAP) { - continue; - } - - data_map = ¤t_object.data.via.map; - - continuation_length = 0; - - for (entry_index = 0; entry_index < data_map->size; entry_index++) { - entry_key = data_map->ptr[entry_index].key; - entry_value = data_map->ptr[entry_index].val; - - result = flb_log_event_encoder_append_body_msgpack_object(context, - &entry_key); - - if (result != FLB_EVENT_ENCODER_SUCCESS) { - break; - } - - /* Check if this is the last entry in the map and if that is - * the case then add the lengths of all the trailing string - * objects after the map in order to append them to the value - * but only if the value object is a string - */ - if (entry_index + 1 == data_map->size && - entry_value.type == MSGPACK_OBJECT_STR) { - adjacent_object_offset = offset; - - while (msgpack_unpack_next( - &adjacent_object, - file->mult_sbuf.data, - file->mult_sbuf.size, - &adjacent_object_offset) == MSGPACK_UNPACK_SUCCESS) { - if (adjacent_object.data.type != MSGPACK_OBJECT_STR) { - break; - } - - /* Sum total bytes to append */ - continuation_length += adjacent_object.data.via.str.size + 1; - } - - result = flb_log_event_encoder_append_body_string_length( - context, - entry_value.via.str.size + - continuation_length); - - if (result != FLB_EVENT_ENCODER_SUCCESS) { - break; - } - - result = flb_log_event_encoder_append_body_string_body( - context, - (char *) entry_value.via.str.ptr, - entry_value.via.str.size); - - if (result != FLB_EVENT_ENCODER_SUCCESS) { - break; - } - - if (continuation_length > 0) { - adjacent_object_offset = offset; - - while (msgpack_unpack_next( - &adjacent_object, - file->mult_sbuf.data, - file->mult_sbuf.size, - &adjacent_object_offset) == MSGPACK_UNPACK_SUCCESS) { - if (adjacent_object.data.type != MSGPACK_OBJECT_STR) { - break; - } - - result = flb_log_event_encoder_append_body_string_body( - context, - "\n", - 1); - - if (result != FLB_EVENT_ENCODER_SUCCESS) { - break; - } - - result = flb_log_event_encoder_append_body_string_body( - context, - (char *) adjacent_object.data.via.str.ptr, - adjacent_object.data.via.str.size); - - if (result != FLB_EVENT_ENCODER_SUCCESS) { - break; - } - } - } - } - else { - result = flb_log_event_encoder_append_body_msgpack_object(context, - &entry_value); - } - } - } - - msgpack_unpacked_destroy(¤t_object); - msgpack_unpacked_destroy(&adjacent_object); - - /* Reset status */ - file->mult_firstline = FLB_FALSE; - file->mult_skipping = FLB_FALSE; - file->mult_keys = 0; - file->mult_flush_timeout = 0; - - msgpack_sbuffer_destroy(&file->mult_sbuf); - - file->mult_sbuf.data = NULL; - - flb_time_zero(&file->mult_time); - - return result; -} - -/* Flush any multiline context data into outgoing buffers */ -int flb_tail_mult_flush(struct flb_tail_file *file, struct flb_tail_config *ctx) -{ - int result; - - /* nothing to flush */ - if (file->mult_firstline == FLB_FALSE) { - return -1; - } - - if (file->mult_keys == 0) { - return -1; - } - - result = flb_log_event_encoder_begin_record(file->ml_log_event_encoder); - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_set_timestamp( - file->ml_log_event_encoder, &file->mult_time); - } - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_tail_mult_pack_line_body( - file->ml_log_event_encoder, - file); - } - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - result = flb_log_event_encoder_commit_record( - file->ml_log_event_encoder); - } - - if (result == FLB_EVENT_ENCODER_SUCCESS) { - flb_input_log_append(ctx->ins, - file->tag_buf, - file->tag_len, - file->ml_log_event_encoder->output_buffer, - file->ml_log_event_encoder->output_length); - result = 0; - } - else { - flb_plg_error(file->config->ins, "error packing event : %d", result); - - result = -1; - } - - flb_log_event_encoder_reset(file->ml_log_event_encoder); - - return result; -} - -static void file_pending_flush(struct flb_tail_config *ctx, - struct flb_tail_file *file, time_t now) -{ - if (file->mult_flush_timeout > now) { - return; - } - - if (file->mult_firstline == FLB_FALSE) { - if (file->mult_sbuf.data == NULL || file->mult_sbuf.size <= 0) { - return; - } - } - - flb_tail_mult_flush(file, ctx); -} - -int flb_tail_mult_pending_flush_all(struct flb_tail_config *ctx) -{ - time_t expired; - struct mk_list *head; - struct flb_tail_file *file; - - expired = time(NULL) + 3600; - - /* Iterate promoted event files with pending bytes */ - mk_list_foreach(head, &ctx->files_static) { - file = mk_list_entry(head, struct flb_tail_file, _head); - file_pending_flush(ctx, file, expired); - } - - /* Iterate promoted event files with pending bytes */ - mk_list_foreach(head, &ctx->files_event) { - file = mk_list_entry(head, struct flb_tail_file, _head); - file_pending_flush(ctx, file, expired); - } - - return 0; -} - -int flb_tail_mult_pending_flush(struct flb_input_instance *ins, - struct flb_config *config, void *context) -{ - time_t now; - struct mk_list *head; - struct flb_tail_file *file; - struct flb_tail_config *ctx = context; - - now = time(NULL); - - /* Iterate promoted event files with pending bytes */ - mk_list_foreach(head, &ctx->files_static) { - file = mk_list_entry(head, struct flb_tail_file, _head); - - file_pending_flush(ctx, file, now); - } - - /* Iterate promoted event files with pending bytes */ - mk_list_foreach(head, &ctx->files_event) { - file = mk_list_entry(head, struct flb_tail_file, _head); - - file_pending_flush(ctx, file, now); - } - - return 0; -} |