From 5da14042f70711ea5cf66e034699730335462f66 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 5 May 2024 14:08:03 +0200 Subject: Merging upstream version 1.45.3+dfsg. Signed-off-by: Daniel Baumann --- .../plugins/filter_multiline/ml_concat.c | 473 +++++++++++++++++++++ 1 file changed, 473 insertions(+) create mode 100644 src/fluent-bit/plugins/filter_multiline/ml_concat.c (limited to 'src/fluent-bit/plugins/filter_multiline/ml_concat.c') diff --git a/src/fluent-bit/plugins/filter_multiline/ml_concat.c b/src/fluent-bit/plugins/filter_multiline/ml_concat.c new file mode 100644 index 000000000..27121f48d --- /dev/null +++ b/src/fluent-bit/plugins/filter_multiline/ml_concat.c @@ -0,0 +1,473 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019-2021 The Fluent Bit Authors + * Copyright (C) 2015-2018 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "ml_concat.h" + +msgpack_object_kv *ml_get_key(msgpack_object *map, char *check_for_key) +{ + int i; + char *key_str = NULL; + size_t key_str_size = 0; + msgpack_object_kv *kv; + msgpack_object key; + int check_key = FLB_FALSE; + + kv = map->via.map.ptr; + + for(i=0; i < map->via.map.size; i++) { + check_key = FLB_FALSE; + + key = (kv+i)->key; + if (key.type == MSGPACK_OBJECT_BIN) { + key_str = (char *) key.via.bin.ptr; + key_str_size = key.via.bin.size; + check_key = FLB_TRUE; + } + if (key.type == MSGPACK_OBJECT_STR) { + key_str = (char *) key.via.str.ptr; + key_str_size = key.via.str.size; + check_key = FLB_TRUE; + } + + if (check_key == FLB_TRUE) { + if (strncmp(check_for_key, key_str, key_str_size) == 0) { + return (kv+i); + } + } + } + return NULL; +} + +int ml_is_partial(msgpack_object *map) +{ + char *val_str = NULL; + msgpack_object_kv *kv; + msgpack_object val; + + kv = ml_get_key(map, FLB_MULTILINE_PARTIAL_MESSAGE_KEY); + + if (kv == NULL) { + return FLB_FALSE; + } + + val = kv->val; + if (val.type == MSGPACK_OBJECT_BIN) { + val_str = (char *) val.via.bin.ptr; + } + if (val.type == MSGPACK_OBJECT_STR) { + val_str = (char *) val.via.str.ptr; + } + + if (strncasecmp("true", val_str, 4) == 0) { + return FLB_TRUE; + } + return FLB_FALSE; +} + +int ml_is_partial_last(msgpack_object *map) +{ + char *val_str = NULL; + msgpack_object_kv *kv; + msgpack_object val; + + kv = ml_get_key(map, FLB_MULTILINE_PARTIAL_LAST_KEY); + + if (kv == NULL) { + return FLB_FALSE; + } + + val = kv->val; + if (val.type == MSGPACK_OBJECT_BIN) { + val_str = (char *) val.via.bin.ptr; + } + if (val.type == MSGPACK_OBJECT_STR) { + val_str = (char *) val.via.str.ptr; + } + + if (strncasecmp("true", val_str, 4) == 0) { + return FLB_TRUE; + } + return FLB_FALSE; +} + +int ml_get_partial_id(msgpack_object *map, + char **partial_id_str, + size_t *partial_id_size) +{ + char *val_str = NULL; + size_t val_str_size = 0; + msgpack_object_kv *kv; + msgpack_object val; + + kv = ml_get_key(map, FLB_MULTILINE_PARTIAL_ID_KEY); + + if (kv == NULL) { + return -1; + } + + val = kv->val; + if (val.type == MSGPACK_OBJECT_BIN) { + val_str = (char *) val.via.bin.ptr; + val_str_size = val.via.bin.size; + } + if (val.type == MSGPACK_OBJECT_STR) { + val_str = (char *) val.via.str.ptr; + val_str_size = val.via.str.size; + } + + *partial_id_str = val_str; + *partial_id_size = val_str_size; + + return 0; +} + +struct split_message_packer *ml_get_packer(struct mk_list *packers, const char *tag, + char *input_name, + char *partial_id_str, size_t partial_id_size) +{ + struct mk_list *tmp; + struct mk_list *head; + struct split_message_packer *packer; + int name_check; + int tag_check; + int id_check; + + + mk_list_foreach_safe(head, tmp, packers) { + packer = mk_list_entry(head, struct split_message_packer, _head); + id_check = strncmp(packer->partial_id, partial_id_str, partial_id_size); + if (id_check != 0) { + continue; + } + name_check = strcmp(packer->input_name, input_name); + if (name_check != 0) { + continue; + } + tag_check = strcmp(packer->tag, tag); + if (tag_check == 0) { + return packer; + } + } + + return NULL; +} + +struct split_message_packer *ml_create_packer(const char *tag, char *input_name, + char *partial_id_str, size_t partial_id_size, + msgpack_object *map, char *multiline_key_content, + struct flb_time *tm) +{ + struct split_message_packer *packer; + msgpack_object_kv *kv; + msgpack_object_kv *split_kv; + flb_sds_t tmp; + int i; + char *key_str = NULL; + size_t key_str_size = 0; + msgpack_object key; + int check_key = FLB_FALSE; + size_t len; + int ret; + + packer = flb_calloc(1, sizeof(struct split_message_packer)); + if (!packer) { + flb_errno(); + return NULL; + } + + tmp = flb_sds_create(input_name); + if (!tmp) { + flb_errno(); + flb_free(packer); + return NULL; + } + packer->input_name = tmp; + + tmp = flb_sds_create(tag); + if (!tmp) { + flb_errno(); + ml_split_message_packer_destroy(packer); + return NULL; + } + packer->tag = tmp; + + tmp = flb_sds_create_len(partial_id_str, partial_id_size); + if (!tmp) { + flb_errno(); + ml_split_message_packer_destroy(packer); + return NULL; + } + packer->partial_id = tmp; + + packer->buf = flb_sds_create_size(FLB_MULTILINE_PARTIAL_BUF_SIZE); + if (!packer->buf) { + flb_errno(); + ml_split_message_packer_destroy(packer); + return NULL; + } + + ret = flb_log_event_encoder_init(&packer->log_encoder, + FLB_LOG_EVENT_FORMAT_DEFAULT); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_error("[partial message concat] Log event encoder initialization error : %d", ret); + + ml_split_message_packer_destroy(packer); + + return NULL; + } + + /* get the key that is split */ + split_kv = ml_get_key(map, multiline_key_content); + if (split_kv == NULL) { + flb_error("[partial message concat] Could not find key %s in record", multiline_key_content); + ml_split_message_packer_destroy(packer); + return NULL; + } + + ret = flb_log_event_encoder_begin_record(&packer->log_encoder); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_error("[partial message concat] Log event encoder error : %d", ret); + + ml_split_message_packer_destroy(packer); + + return NULL; + } + + /* write all of the keys except the split one and the partial metadata */ + ret = flb_log_event_encoder_set_timestamp( + &packer->log_encoder, tm); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_error("[partial message concat] Log event encoder error : %d", ret); + + ml_split_message_packer_destroy(packer); + + return NULL; + } + + kv = map->via.map.ptr; + + for(i=0; + i < map->via.map.size && + ret == FLB_EVENT_ENCODER_SUCCESS; + i++) { + if ((kv+i) == split_kv) { + continue; + } + + key = (kv+i)->key; + if (key.type == MSGPACK_OBJECT_BIN) { + key_str = (char *) key.via.bin.ptr; + key_str_size = key.via.bin.size; + check_key = FLB_TRUE; + } + if (key.type == MSGPACK_OBJECT_STR) { + key_str = (char *) key.via.str.ptr; + key_str_size = key.via.str.size; + check_key = FLB_TRUE; + } + + len = FLB_MULTILINE_PARTIAL_PREFIX_LEN; + if (key_str_size < len) { + len = key_str_size; + } + + if (check_key == FLB_TRUE) { + if (strncmp(FLB_MULTILINE_PARTIAL_PREFIX, key_str, len) == 0) { + /* don't pack the partial keys */ + continue; + } + } + + ret = flb_log_event_encoder_append_body_values( + &packer->log_encoder, + FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv[i].key), + FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv[i].val)); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + /* write split kv last, so we can append to it later as needed */ + ret = flb_log_event_encoder_append_body_msgpack_object( + &packer->log_encoder, + &split_kv->key); + } + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_error("[partial message concat] Log event encoder error : %d", ret); + + ml_split_message_packer_destroy(packer); + + return NULL; + } + + return packer; +} + +unsigned long long ml_current_timestamp() { + struct flb_time te; + flb_time_get(&te); + return flb_time_to_nanosec(&te) / 1000000LL; +} + +int ml_split_message_packer_write(struct split_message_packer *packer, + msgpack_object *map, char *multiline_key_content) +{ + char *val_str = NULL; + size_t val_str_size = 0; + msgpack_object_kv *kv; + msgpack_object val; + + kv = ml_get_key(map, multiline_key_content); + + if (kv == NULL) { + flb_error("[partial message concat] Could not find key %s in record", multiline_key_content); + return -1; + } + + val = kv->val; + if (val.type == MSGPACK_OBJECT_BIN) { + val_str = (char *) val.via.bin.ptr; + val_str_size = val.via.bin.size; + } else if (val.type == MSGPACK_OBJECT_STR) { + val_str = (char *) val.via.str.ptr; + val_str_size = val.via.str.size; + } else { + return -1; + } + + flb_sds_cat_safe(&packer->buf, val_str, val_str_size); + packer->last_write_time = ml_current_timestamp(); + + return 0; +} + +void ml_split_message_packer_complete(struct split_message_packer *packer) +{ + flb_log_event_encoder_append_body_string(&packer->log_encoder, + packer->buf, + flb_sds_len(packer->buf)); + + flb_log_event_encoder_commit_record(&packer->log_encoder); +} + +void ml_append_complete_record(struct split_message_packer *packer, + struct flb_log_event_encoder *log_encoder) +{ + struct flb_log_event_decoder log_decoder; + struct flb_log_event log_event; + int ret; + + ret = flb_log_event_decoder_init( + &log_decoder, + packer->log_encoder.output_buffer, + packer->log_encoder.output_length); + + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_error("[partial message concat] Log event decoder error : %d", + ret); + + return; + } + + while ((ret = flb_log_event_decoder_next( + &log_decoder, + &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + ret = flb_log_event_encoder_begin_record(log_encoder); + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_set_timestamp( + log_encoder, + &log_event.timestamp); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_set_metadata_from_msgpack_object( + log_encoder, + log_event.metadata); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_set_body_from_msgpack_object( + log_encoder, + log_event.body); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_commit_record(log_encoder); + } + else { + flb_log_event_encoder_rollback_record(log_encoder); + + break; + } + } + + if (ret == FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA && + log_decoder.offset == packer->log_encoder.output_length) { + ret = FLB_EVENT_ENCODER_SUCCESS; + } + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_error("[partial message concat] Log event encoder error : %d", + ret); + + return; + } + + flb_log_event_decoder_destroy(&log_decoder); +} + +void ml_split_message_packer_destroy(struct split_message_packer *packer) +{ + if (!packer) { + return; + } + + if (packer->tag) { + flb_sds_destroy(packer->tag); + } + if (packer->buf) { + flb_sds_destroy(packer->buf); + } + if (packer->input_name) { + flb_sds_destroy(packer->input_name); + } + if (packer->partial_id) { + flb_sds_destroy(packer->partial_id); + } + + flb_log_event_encoder_destroy(&packer->log_encoder); + + flb_free(packer); +} + -- cgit v1.2.3