diff options
Diffstat (limited to 'fluent-bit/plugins/in_tail/tail_multiline.c')
-rw-r--r-- | fluent-bit/plugins/in_tail/tail_multiline.c | 606 |
1 files changed, 606 insertions, 0 deletions
diff --git a/fluent-bit/plugins/in_tail/tail_multiline.c b/fluent-bit/plugins/in_tail/tail_multiline.c new file mode 100644 index 000000000..71c031014 --- /dev/null +++ b/fluent-bit/plugins/in_tail/tail_multiline.c @@ -0,0 +1,606 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_input_plugin.h> +#include <fluent-bit/flb_config.h> +#include <fluent-bit/flb_kv.h> + +#include "tail_config.h" +#include "tail_multiline.h" + +static int tail_mult_append(struct flb_parser *parser, + struct flb_tail_config *ctx) +{ + struct flb_tail_mult *mp; + + mp = flb_malloc(sizeof(struct flb_tail_mult)); + if (!mp) { + flb_errno(); + return -1; + } + + mp->parser = parser; + mk_list_add(&mp->_head, &ctx->mult_parsers); + + return 0; +} + +int flb_tail_mult_create(struct flb_tail_config *ctx, + struct flb_input_instance *ins, + struct flb_config *config) +{ + int ret; + const char *tmp; + struct mk_list *head; + struct flb_parser *parser; + struct flb_kv *kv; + + if (ctx->multiline_flush <= 0) { + ctx->multiline_flush = 1; + } + + mk_list_init(&ctx->mult_parsers); + + /* Get firstline parser */ + tmp = flb_input_get_property("parser_firstline", ins); + if (!tmp) { + flb_plg_error(ctx->ins, "multiline: no parser defined for firstline"); + return -1; + } + parser = flb_parser_get(tmp, config); + if (!parser) { + flb_plg_error(ctx->ins, "multiline: invalid parser '%s'", tmp); + return -1; + } + + ctx->mult_parser_firstline = parser; + + /* Read all multiline rules */ + mk_list_foreach(head, &ins->properties) { + kv = mk_list_entry(head, struct flb_kv, _head); + if (strcasecmp("parser_firstline", kv->key) == 0) { + continue; + } + + if (strncasecmp("parser_", kv->key, 7) == 0) { + parser = flb_parser_get(kv->val, config); + if (!parser) { + flb_plg_error(ctx->ins, "multiline: invalid parser '%s'", kv->val); + return -1; + } + + ret = tail_mult_append(parser, ctx); + if (ret == -1) { + return -1; + } + } + } + + return 0; +} + +int flb_tail_mult_destroy(struct flb_tail_config *ctx) +{ + struct mk_list *tmp; + struct mk_list *head; + struct flb_tail_mult *mp; + + if (ctx->multiline == FLB_FALSE) { + return 0; + } + + mk_list_foreach_safe(head, tmp, &ctx->mult_parsers) { + mp = mk_list_entry(head, struct flb_tail_mult, _head); + mk_list_del(&mp->_head); + flb_free(mp); + } + + return 0; +} + +/* Process the result of a firstline match */ +int flb_tail_mult_process_first(time_t now, + char *buf, size_t size, + struct flb_time *out_time, + struct flb_tail_file *file, + struct flb_tail_config *ctx) +{ + int ret; + size_t off; + msgpack_object map; + msgpack_unpacked result; + + /* If a previous multiline context already exists, flush first */ + if (file->mult_firstline && !file->mult_skipping) { + flb_tail_mult_flush(file, ctx); + } + + /* Remark as first multiline message */ + file->mult_firstline = FLB_TRUE; + + /* Validate obtained time, if not set, set the current time */ + if (flb_time_to_nanosec(out_time) == 0L) { + flb_time_get(out_time); + } + + /* Should we skip this multiline record ? */ + if (ctx->ignore_older > 0) { + if ((now - ctx->ignore_older) > out_time->tm.tv_sec) { + flb_free(buf); + file->mult_skipping = FLB_TRUE; + file->mult_firstline = FLB_TRUE; + + /* we expect more data to skip */ + return FLB_TAIL_MULT_MORE; + } + } + + /* Re-initiate buffers */ + msgpack_sbuffer_init(&file->mult_sbuf); + msgpack_packer_init(&file->mult_pck, &file->mult_sbuf, msgpack_sbuffer_write); + + /* + * flb_parser_do() always return a msgpack buffer, so we tweak our + * local msgpack reference to avoid an extra allocation. The only + * concern is that we don't know what's the real size of the memory + * allocated, so we assume it's just 'out_size'. + */ + file->mult_flush_timeout = now + (ctx->multiline_flush - 1); + file->mult_sbuf.data = buf; + file->mult_sbuf.size = size; + file->mult_sbuf.alloc = size; + + /* Set multiline status */ + file->mult_firstline = FLB_TRUE; + file->mult_skipping = FLB_FALSE; + flb_time_copy(&file->mult_time, out_time); + + off = 0; + msgpack_unpacked_init(&result); + ret = msgpack_unpack_next(&result, buf, size, &off); + if (ret != MSGPACK_UNPACK_SUCCESS) { + msgpack_sbuffer_destroy(&file->mult_sbuf); + msgpack_unpacked_destroy(&result); + return FLB_TAIL_MULT_NA; + } + + map = result.data; + file->mult_keys = map.via.map.size; + msgpack_unpacked_destroy(&result); + + /* We expect more data */ + return FLB_TAIL_MULT_MORE; +} + +/* Append a raw log entry to the last structured field in the mult buffer */ +static inline void flb_tail_mult_append_raw(char *buf, int size, + struct flb_tail_file *file, + struct flb_tail_config *config) +{ + /* Append the raw string */ + msgpack_pack_str(&file->mult_pck, size); + msgpack_pack_str_body(&file->mult_pck, buf, size); +} + +/* Check if the last key value type of a map is string or not */ +static inline int is_last_key_val_string(char *buf, size_t size) +{ + int ret = FLB_FALSE; + size_t off; + msgpack_unpacked result; + msgpack_object v; + msgpack_object root; + + off = 0; + msgpack_unpacked_init(&result); + ret = msgpack_unpack_next(&result, buf, size, &off); + if (ret != MSGPACK_UNPACK_SUCCESS) { + return ret; + } + + root = result.data; + if (root.type != MSGPACK_OBJECT_MAP) { + ret = FLB_FALSE; + } + else { + if (root.via.map.size == 0) { + ret = FLB_FALSE; + } + else { + v = root.via.map.ptr[root.via.map.size - 1].val; + if (v.type == MSGPACK_OBJECT_STR) { + ret = FLB_TRUE; + } + } + } + + msgpack_unpacked_destroy(&result); + return ret; +} + +int flb_tail_mult_process_content(time_t now, + char *buf, size_t len, + struct flb_tail_file *file, + struct flb_tail_config *ctx, + size_t processed_bytes) +{ + int ret; + size_t off; + void *out_buf; + size_t out_size = 0; + struct mk_list *head; + struct flb_tail_mult *mult_parser = NULL; + struct flb_time out_time = {0}; + msgpack_object map; + msgpack_unpacked result; + + /* Always check if this line is the beginning of a new multiline message */ + ret = flb_parser_do(ctx->mult_parser_firstline, + buf, len, + &out_buf, &out_size, &out_time); + if (ret >= 0) { + /* + * The content is a candidate for a firstline, but we need to perform + * the extra-mandatory check where the last key value type must be + * a string, otherwise no string concatenation with continuation lines + * will be possible. + */ + ret = is_last_key_val_string(out_buf, out_size); + if (ret == FLB_TRUE) + file->mult_firstline_append = FLB_TRUE; + else + file->mult_firstline_append = FLB_FALSE; + + flb_tail_mult_process_first(now, out_buf, out_size, &out_time, + file, ctx); + return FLB_TAIL_MULT_MORE; + } + + if (file->mult_skipping == FLB_TRUE) { + return FLB_TAIL_MULT_MORE; + } + + /* + * Once here means we have some data that is a continuation, iterate + * parsers trying to find a match + */ + out_buf = NULL; + mk_list_foreach(head, &ctx->mult_parsers) { + mult_parser = mk_list_entry(head, struct flb_tail_mult, _head); + + /* Process line text with current parser */ + out_buf = NULL; + out_size = 0; + ret = flb_parser_do(mult_parser->parser, + buf, len, + &out_buf, &out_size, &out_time); + if (ret < 0) { + mult_parser = NULL; + continue; + } + + /* The line was processed, break the loop and buffer the data */ + break; + } + + if (!mult_parser) { + /* + * If no parser was found means the string log must be appended + * to the last structured field. + */ + if (file->mult_firstline && file->mult_firstline_append) { + flb_tail_mult_append_raw(buf, len, file, ctx); + } + else { + flb_tail_file_pack_line(NULL, buf, len, file, processed_bytes); + } + + return FLB_TAIL_MULT_MORE; + } + + off = 0; + msgpack_unpacked_init(&result); + msgpack_unpack_next(&result, out_buf, out_size, &off); + map = result.data; + + /* Append new map to our local msgpack buffer */ + file->mult_keys += map.via.map.size; + msgpack_unpacked_destroy(&result); + msgpack_sbuffer_write(&file->mult_sbuf, out_buf, out_size); + flb_free(out_buf); + + return FLB_TAIL_MULT_MORE; +} + +static int flb_tail_mult_pack_line_body( + struct flb_log_event_encoder *context, + struct flb_tail_file *file) +{ + size_t adjacent_object_offset; + size_t continuation_length; + msgpack_unpacked adjacent_object; + msgpack_unpacked current_object; + size_t entry_index; + msgpack_object entry_value; + msgpack_object entry_key; + msgpack_object_map *data_map; + int map_size; + size_t offset; + struct flb_tail_config *config; + int result; + + result = FLB_EVENT_ENCODER_SUCCESS; + config = (struct flb_tail_config *) file->config; + + /* New Map size */ + map_size = file->mult_keys; + + if (file->config->path_key != NULL) { + map_size++; + + result = flb_log_event_encoder_append_body_values( + context, + FLB_LOG_EVENT_CSTRING_VALUE(config->path_key), + FLB_LOG_EVENT_CSTRING_VALUE(file->name)); + } + + + msgpack_unpacked_init(¤t_object); + msgpack_unpacked_init(&adjacent_object); + + offset = 0; + + while (result == FLB_EVENT_ENCODER_SUCCESS && + msgpack_unpack_next(¤t_object, + file->mult_sbuf.data, + file->mult_sbuf.size, + &offset) == MSGPACK_UNPACK_SUCCESS) { + if (current_object.data.type != MSGPACK_OBJECT_MAP) { + continue; + } + + data_map = ¤t_object.data.via.map; + + continuation_length = 0; + + for (entry_index = 0; entry_index < data_map->size; entry_index++) { + entry_key = data_map->ptr[entry_index].key; + entry_value = data_map->ptr[entry_index].val; + + result = flb_log_event_encoder_append_body_msgpack_object(context, + &entry_key); + + if (result != FLB_EVENT_ENCODER_SUCCESS) { + break; + } + + /* Check if this is the last entry in the map and if that is + * the case then add the lengths of all the trailing string + * objects after the map in order to append them to the value + * but only if the value object is a string + */ + if (entry_index + 1 == data_map->size && + entry_value.type == MSGPACK_OBJECT_STR) { + adjacent_object_offset = offset; + + while (msgpack_unpack_next( + &adjacent_object, + file->mult_sbuf.data, + file->mult_sbuf.size, + &adjacent_object_offset) == MSGPACK_UNPACK_SUCCESS) { + if (adjacent_object.data.type != MSGPACK_OBJECT_STR) { + break; + } + + /* Sum total bytes to append */ + continuation_length += adjacent_object.data.via.str.size + 1; + } + + result = flb_log_event_encoder_append_body_string_length( + context, + entry_value.via.str.size + + continuation_length); + + if (result != FLB_EVENT_ENCODER_SUCCESS) { + break; + } + + result = flb_log_event_encoder_append_body_string_body( + context, + (char *) entry_value.via.str.ptr, + entry_value.via.str.size); + + if (result != FLB_EVENT_ENCODER_SUCCESS) { + break; + } + + if (continuation_length > 0) { + adjacent_object_offset = offset; + + while (msgpack_unpack_next( + &adjacent_object, + file->mult_sbuf.data, + file->mult_sbuf.size, + &adjacent_object_offset) == MSGPACK_UNPACK_SUCCESS) { + if (adjacent_object.data.type != MSGPACK_OBJECT_STR) { + break; + } + + result = flb_log_event_encoder_append_body_string_body( + context, + "\n", + 1); + + if (result != FLB_EVENT_ENCODER_SUCCESS) { + break; + } + + result = flb_log_event_encoder_append_body_string_body( + context, + (char *) adjacent_object.data.via.str.ptr, + adjacent_object.data.via.str.size); + + if (result != FLB_EVENT_ENCODER_SUCCESS) { + break; + } + } + } + } + else { + result = flb_log_event_encoder_append_body_msgpack_object(context, + &entry_value); + } + } + } + + msgpack_unpacked_destroy(¤t_object); + msgpack_unpacked_destroy(&adjacent_object); + + /* Reset status */ + file->mult_firstline = FLB_FALSE; + file->mult_skipping = FLB_FALSE; + file->mult_keys = 0; + file->mult_flush_timeout = 0; + + msgpack_sbuffer_destroy(&file->mult_sbuf); + + file->mult_sbuf.data = NULL; + + flb_time_zero(&file->mult_time); + + return result; +} + +/* Flush any multiline context data into outgoing buffers */ +int flb_tail_mult_flush(struct flb_tail_file *file, struct flb_tail_config *ctx) +{ + int result; + + /* nothing to flush */ + if (file->mult_firstline == FLB_FALSE) { + return -1; + } + + if (file->mult_keys == 0) { + return -1; + } + + result = flb_log_event_encoder_begin_record(file->ml_log_event_encoder); + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_set_timestamp( + file->ml_log_event_encoder, &file->mult_time); + } + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_tail_mult_pack_line_body( + file->ml_log_event_encoder, + file); + } + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_commit_record( + file->ml_log_event_encoder); + } + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + flb_input_log_append(ctx->ins, + file->tag_buf, + file->tag_len, + file->ml_log_event_encoder->output_buffer, + file->ml_log_event_encoder->output_length); + result = 0; + } + else { + flb_plg_error(file->config->ins, "error packing event : %d", result); + + result = -1; + } + + flb_log_event_encoder_reset(file->ml_log_event_encoder); + + return result; +} + +static void file_pending_flush(struct flb_tail_config *ctx, + struct flb_tail_file *file, time_t now) +{ + if (file->mult_flush_timeout > now) { + return; + } + + if (file->mult_firstline == FLB_FALSE) { + if (file->mult_sbuf.data == NULL || file->mult_sbuf.size <= 0) { + return; + } + } + + flb_tail_mult_flush(file, ctx); +} + +int flb_tail_mult_pending_flush_all(struct flb_tail_config *ctx) +{ + time_t expired; + struct mk_list *head; + struct flb_tail_file *file; + + expired = time(NULL) + 3600; + + /* Iterate promoted event files with pending bytes */ + mk_list_foreach(head, &ctx->files_static) { + file = mk_list_entry(head, struct flb_tail_file, _head); + file_pending_flush(ctx, file, expired); + } + + /* Iterate promoted event files with pending bytes */ + mk_list_foreach(head, &ctx->files_event) { + file = mk_list_entry(head, struct flb_tail_file, _head); + file_pending_flush(ctx, file, expired); + } + + return 0; +} + +int flb_tail_mult_pending_flush(struct flb_input_instance *ins, + struct flb_config *config, void *context) +{ + time_t now; + struct mk_list *head; + struct flb_tail_file *file; + struct flb_tail_config *ctx = context; + + now = time(NULL); + + /* Iterate promoted event files with pending bytes */ + mk_list_foreach(head, &ctx->files_static) { + file = mk_list_entry(head, struct flb_tail_file, _head); + + file_pending_flush(ctx, file, now); + } + + /* Iterate promoted event files with pending bytes */ + mk_list_foreach(head, &ctx->files_event) { + file = mk_list_entry(head, struct flb_tail_file, _head); + + file_pending_flush(ctx, file, now); + } + + return 0; +} |