summaryrefslogtreecommitdiffstats
path: root/fluent-bit/src/multiline/flb_ml.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-07-24 09:54:23 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-07-24 09:54:44 +0000
commit836b47cb7e99a977c5a23b059ca1d0b5065d310e (patch)
tree1604da8f482d02effa033c94a84be42bc0c848c3 /fluent-bit/src/multiline/flb_ml.c
parentReleasing debian version 1.44.3-2. (diff)
downloadnetdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.tar.xz
netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.zip
Merging upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/src/multiline/flb_ml.c')
-rw-r--r--fluent-bit/src/multiline/flb_ml.c1562
1 files changed, 0 insertions, 1562 deletions
diff --git a/fluent-bit/src/multiline/flb_ml.c b/fluent-bit/src/multiline/flb_ml.c
deleted file mode 100644
index 28e123ce..00000000
--- a/fluent-bit/src/multiline/flb_ml.c
+++ /dev/null
@@ -1,1562 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-/* Fluent Bit
- * ==========
- * Copyright (C) 2015-2022 The Fluent Bit Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <fluent-bit/flb_info.h>
-#include <fluent-bit/flb_mem.h>
-#include <fluent-bit/flb_log.h>
-#include <fluent-bit/flb_regex.h>
-#include <fluent-bit/flb_time.h>
-#include <fluent-bit/flb_pack.h>
-#include <fluent-bit/flb_scheduler.h>
-#include <fluent-bit/multiline/flb_ml.h>
-#include <fluent-bit/multiline/flb_ml_rule.h>
-#include <fluent-bit/multiline/flb_ml_group.h>
-
-#include <stdarg.h>
-#include <math.h>
-
-static inline int match_negate(struct flb_ml_parser *ml_parser, int matched)
-{
- int rule_match = matched;
-
- /* Validate pattern matching against expected 'negate' condition */
- if (matched == FLB_TRUE) {
- if (ml_parser->negate == FLB_FALSE) {
- rule_match = FLB_TRUE;
- }
- else {
- rule_match = FLB_FALSE;
- }
- }
- else {
- if (ml_parser->negate == FLB_TRUE) {
- rule_match = FLB_TRUE;
- }
- }
-
- return rule_match;
-}
-
-static uint64_t time_ms_now()
-{
- uint64_t ms;
- struct flb_time tm;
-
- flb_time_get(&tm);
- ms = (tm.tm.tv_sec * 1000) + lround(tm.tm.tv_nsec/1.0e6);
- return ms;
-}
-
-
-int flb_ml_flush_stdout(struct flb_ml_parser *parser,
- struct flb_ml_stream *mst,
- void *data, char *buf_data, size_t buf_size)
-{
- fprintf(stdout, "\n%s----- MULTILINE FLUSH (stream_id=%" PRIu64 ") -----%s\n",
- ANSI_GREEN, mst->id, ANSI_RESET);
-
- /* Print incoming flush buffer */
- flb_pack_print(buf_data, buf_size);
-
- fprintf(stdout, "%s----------- EOF -----------%s\n",
- ANSI_GREEN, ANSI_RESET);
- return 0;
-}
-
-int flb_ml_type_lookup(char *str)
-{
- int type = -1;
-
- if (strcasecmp(str, "regex") == 0) {
- type = FLB_ML_REGEX;
- }
- else if (strcasecmp(str, "endswith") == 0) {
- type = FLB_ML_ENDSWITH;
- }
- else if (strcasecmp(str, "equal") == 0 || strcasecmp(str, "eq") == 0) {
- type = FLB_ML_EQ;
- }
-
- return type;
-}
-
-void flb_ml_flush_parser_instance(struct flb_ml *ml,
- struct flb_ml_parser_ins *parser_i,
- uint64_t stream_id, int forced_flush)
-{
- struct mk_list *head;
- struct mk_list *head_group;
- struct flb_ml_stream *mst;
- struct flb_ml_stream_group *group;
-
- mk_list_foreach(head, &parser_i->streams) {
- mst = mk_list_entry(head, struct flb_ml_stream, _head);
- if (stream_id != 0 && mst->id != stream_id) {
- continue;
- }
-
- /* Iterate stream groups */
- mk_list_foreach(head_group, &mst->groups) {
- group = mk_list_entry(head_group, struct flb_ml_stream_group, _head);
- flb_ml_flush_stream_group(parser_i->ml_parser, mst, group, forced_flush);
- }
- }
-}
-
-void flb_ml_flush_pending(struct flb_ml *ml, uint64_t now, int forced_flush)
-{
- struct mk_list *head;
- struct flb_ml_parser_ins *parser_i;
- struct flb_ml_group *group;
-
- /* set the last flush time */
- ml->last_flush = now;
-
- /* flush only the first group of the context */
- group = mk_list_entry_first(&ml->groups, struct flb_ml_group, _head);
-
- /* iterate group parser instances */
- mk_list_foreach(head, &group->parsers) {
- parser_i = mk_list_entry(head, struct flb_ml_parser_ins, _head);
- flb_ml_flush_parser_instance(ml, parser_i, 0, forced_flush);
- }
-}
-
-void flb_ml_flush_pending_now(struct flb_ml *ml)
-{
- uint64_t now;
-
- now = time_ms_now();
- flb_ml_flush_pending(ml, now, FLB_TRUE);
-}
-
-static void cb_ml_flush_timer(struct flb_config *ctx, void *data)
-{
- uint64_t now;
- struct flb_ml *ml = data;
-
- now = time_ms_now();
- if (ml->last_flush + ml->flush_ms > now) {
- return;
- }
-
- /*
- * Iterate over all streams and groups and for a flush for expired groups
- * which has not flushed in the last N milliseconds.
- */
- flb_ml_flush_pending(ml, now, FLB_TRUE);
-}
-
-int flb_ml_register_context(struct flb_ml_stream_group *group,
- struct flb_time *tm, msgpack_object *map)
-{
- if (tm) {
- flb_time_copy(&group->mp_time, tm);
- }
-
- if (map) {
- msgpack_pack_object(&group->mp_pck, *map);
- }
-
- return 0;
-}
-
-static inline void breakline_prepare(struct flb_ml_parser_ins *parser_i,
- struct flb_ml_stream_group *stream_group)
-{
- int len;
-
- if (parser_i->key_content) {
- return;
- }
-
- len = flb_sds_len(stream_group->buf);
- if (len <= 0) {
- return;
- }
-
- if (stream_group->buf[len - 1] != '\n') {
- flb_sds_cat_safe(&stream_group->buf, "\n", 1);
- }
-}
-
-/*
- * package content into a multiline stream:
- *
- * full_map: if the original content to process comes in msgpack map, this variable
- * reference the map. It's only used in case we will package a first line so we
- * store a copy of the other key values in the map for flush time.
- */
-static int package_content(struct flb_ml_stream *mst,
- msgpack_object *metadata,
- msgpack_object *full_map,
- void *buf, size_t size, struct flb_time *tm,
- msgpack_object *val_content,
- msgpack_object *val_pattern,
- msgpack_object *val_group)
-{
- int len;
- int ret;
- int rule_match = FLB_FALSE;
- int processed = FLB_FALSE;
- int type;
- size_t offset = 0;
- size_t buf_size;
- char *buf_data;
- msgpack_object *val = val_content;
- struct flb_ml_parser *parser;
- struct flb_ml_parser_ins *parser_i;
- struct flb_ml_stream_group *stream_group;
-
- parser_i = mst->parser;
- parser = parser_i->ml_parser;
-
- /* Get stream group */
- stream_group = flb_ml_stream_group_get(mst->parser, mst, val_group);
- if (!mst->last_stream_group) {
- mst->last_stream_group = stream_group;
- }
- else {
- if (mst->last_stream_group != stream_group) {
- mst->last_stream_group = stream_group;
- }
- }
-
- /* Set the parser type */
- type = parser->type;
-
- if (val_pattern) {
- val = val_pattern;
- }
-
- if (val) {
- buf_data = (char *) val->via.str.ptr;
- buf_size = val->via.str.size;
- }
- else {
- buf_data = buf;
- buf_size = size;
-
- }
- if (type == FLB_ML_REGEX) {
- ret = flb_ml_rule_process(parser, mst,
- stream_group, full_map, buf, size, tm,
- val_content, val_pattern);
- if (ret == -1) {
- processed = FLB_FALSE;
- }
- else {
- processed = FLB_TRUE;
- }
- }
- else if (type == FLB_ML_ENDSWITH) {
- len = flb_sds_len(parser->match_str);
- if (buf_data && len <= buf_size) {
- /* Validate if content ends with expected string */
- offset = buf_size - len;
- ret = memcmp(buf_data + offset, parser->match_str, len);
- if (ret == 0) {
- rule_match = match_negate(parser, FLB_TRUE);
- }
- else {
- rule_match = match_negate(parser, FLB_FALSE);
- }
-
- if (stream_group->mp_sbuf.size == 0) {
- flb_ml_register_context(stream_group, tm, full_map);
- }
-
- /* Prepare concatenation */
- breakline_prepare(parser_i, stream_group);
-
- /* Concatenate value */
- if (val_content) {
- flb_sds_cat_safe(&stream_group->buf,
- val_content->via.str.ptr,
- val_content->via.str.size);
- }
- else {
- flb_sds_cat_safe(&stream_group->buf, buf_data, buf_size);
- }
-
- /* on ENDSWITH mode, a rule match means flush the content */
- if (rule_match) {
- flb_ml_flush_stream_group(parser, mst, stream_group, FLB_FALSE);
- }
- processed = FLB_TRUE;
- }
- }
- else if (type == FLB_ML_EQ) {
- if (buf_size == flb_sds_len(parser->match_str) &&
- memcmp(buf_data, parser->match_str, buf_size) == 0) {
- /* EQ match */
- rule_match = match_negate(parser, FLB_TRUE);
- }
- else {
- rule_match = match_negate(parser, FLB_FALSE);
- }
-
- if (stream_group->mp_sbuf.size == 0) {
- flb_ml_register_context(stream_group, tm, full_map);
- }
-
- /* Prepare concatenation */
- breakline_prepare(parser_i, stream_group);
-
- /* Concatenate value */
- if (val_content) {
- flb_sds_cat_safe(&stream_group->buf,
- val_content->via.str.ptr,
- val_content->via.str.size);
- }
- else {
- flb_sds_cat_safe(&stream_group->buf, buf_data, buf_size);
- }
-
- /* on ENDSWITH mode, a rule match means flush the content */
- if (rule_match) {
- flb_ml_flush_stream_group(parser, mst, stream_group, FLB_FALSE);
- }
- processed = FLB_TRUE;
- }
-
- if (processed && metadata != NULL) {
- msgpack_pack_object(&stream_group->mp_md_pck, *metadata);
- }
-
- return processed;
-}
-
-/*
- * Retrieve the ID of a specific key name in a map. This function might be
- * extended later to use record accessor, since all current cases are solved
- * now quering the first level of keys in the map, we avoid record accessor
- * to avoid extra memory allocations.
- */
-static int get_key_id(msgpack_object *map, flb_sds_t key_name)
-{
- int i;
- int len;
- int found = FLB_FALSE;
- msgpack_object key;
- msgpack_object val;
-
- if (!key_name) {
- return -1;
- }
-
- len = flb_sds_len(key_name);
- for (i = 0; i < map->via.map.size; i++) {
- key = map->via.map.ptr[i].key;
- val = map->via.map.ptr[i].val;
-
- if (key.type != MSGPACK_OBJECT_STR || val.type != MSGPACK_OBJECT_STR) {
- continue;
- }
-
- if (key.via.str.size != len) {
- continue;
- }
-
- if (strncmp(key.via.str.ptr, key_name, len) == 0) {
- found = FLB_TRUE;
- break;
- }
- }
-
- if (found) {
- return i;
- }
-
- return -1;
-}
-
-static int process_append(struct flb_ml_parser_ins *parser_i,
- struct flb_ml_stream *mst,
- int type,
- struct flb_time *tm,
- msgpack_object *metadata,
- msgpack_object *obj,
- void *buf, size_t size)
-{
- int ret;
- int id_content = -1;
- int id_pattern = -1;
- int id_group = -1;
- int unpacked = FLB_FALSE;
- size_t off = 0;
- msgpack_object *full_map = NULL;
- msgpack_object *val_content = NULL;
- msgpack_object *val_pattern = NULL;
- msgpack_object *val_group = NULL;
- msgpack_unpacked result;
-
- /* Lookup the key */
- if (type == FLB_ML_TYPE_TEXT) {
- ret = package_content(mst, NULL, NULL, buf, size, tm, NULL, NULL, NULL);
- if (ret == FLB_FALSE) {
- return -1;
- }
- return 0;
- }
- else if (type == FLB_ML_TYPE_MAP) {
- full_map = obj;
- /*
- * When full_map and buf is not NULL,
- * we use 'buf' since buf is already processed from full_map at
- * ml_append_try_parser_type_map.
- */
- if (!full_map || (buf != NULL && full_map != NULL)) {
- off = 0;
- msgpack_unpacked_init(&result);
- ret = msgpack_unpack_next(&result, buf, size, &off);
- if (ret != MSGPACK_UNPACK_SUCCESS) {
- return -1;
- }
- full_map = &result.data;
- unpacked = FLB_TRUE;
- }
- else if (full_map->type != MSGPACK_OBJECT_MAP) {
- msgpack_unpacked_destroy(&result);
- return -1;
- }
- }
-
- /* Lookup for key_content entry */
- id_content = get_key_id(full_map, parser_i->key_content);
- if (id_content == -1) {
- if (unpacked) {
- msgpack_unpacked_destroy(&result);
- }
- return -1;
- }
-
- val_content = &full_map->via.map.ptr[id_content].val;
- if (val_content->type != MSGPACK_OBJECT_STR) {
- val_content = NULL;
- }
-
- /* Optional: Lookup for key_pattern entry */
- if (parser_i->key_pattern) {
- id_pattern = get_key_id(full_map, parser_i->key_pattern);
- if (id_pattern >= 0) {
- val_pattern = &full_map->via.map.ptr[id_pattern].val;
- if (val_pattern->type != MSGPACK_OBJECT_STR) {
- val_pattern = NULL;
- }
- }
- }
-
- /* Optional: lookup for key_group entry */
- if (parser_i->key_group) {
- id_group = get_key_id(full_map, parser_i->key_group);
- if (id_group >= 0) {
- val_group = &full_map->via.map.ptr[id_group].val;
- if (val_group->type != MSGPACK_OBJECT_STR) {
- val_group = NULL;
- }
- }
- }
-
- /* Package the content */
- ret = package_content(mst, metadata, full_map, buf, size, tm,
- val_content, val_pattern, val_group);
- if (unpacked) {
- msgpack_unpacked_destroy(&result);
- }
- if (ret == FLB_FALSE) {
- return -1;
- }
- return 0;
-}
-
-static int ml_append_try_parser_type_text(struct flb_ml_parser_ins *parser,
- uint64_t stream_id,
- int *type,
- struct flb_time *tm, void *buf, size_t size,
- msgpack_object *map,
- void **out_buf, size_t *out_size, int *out_release,
- struct flb_time *out_time)
-{
- int ret;
-
- if (parser->ml_parser->parser) {
- /* Parse incoming content */
- ret = flb_parser_do(parser->ml_parser->parser, (char *) buf, size,
- out_buf, out_size, out_time);
- if (flb_time_to_nanosec(out_time) == 0L) {
- flb_time_copy(out_time, tm);
- }
- if (ret >= 0) {
- *out_release = FLB_TRUE;
- *type = FLB_ML_TYPE_MAP;
- }
- else {
- *out_buf = buf;
- *out_size = size;
- return -1;
- }
- }
- else {
- *out_buf = buf;
- *out_size = size;
- }
- return 0;
-}
-
-static int ml_append_try_parser_type_map(struct flb_ml_parser_ins *parser,
- uint64_t stream_id,
- int *type,
- struct flb_time *tm, void *buf, size_t size,
- msgpack_object *map,
- void **out_buf, size_t *out_size, int *out_release,
- struct flb_time *out_time)
-{
- int map_size;
- int i;
- int len;
- msgpack_object key;
- msgpack_object val;
-
- if (map == NULL || map->type != MSGPACK_OBJECT_MAP) {
- flb_error("%s:invalid map", __FUNCTION__);
- return -1;
- }
-
- if (parser->ml_parser->parser) {
- /* lookup key_content */
-
- len = flb_sds_len(parser->key_content);
- map_size = map->via.map.size;
- for(i = 0; i < map_size; i++) {
- key = map->via.map.ptr[i].key;
- val = map->via.map.ptr[i].val;
- if (key.type == MSGPACK_OBJECT_STR &&
- parser->key_content &&
- key.via.str.size == len &&
- strncmp(key.via.str.ptr, parser->key_content, len) == 0) {
- /* key_content found */
- if (val.type == MSGPACK_OBJECT_STR) {
- /* try parse the value of key_content e*/
- return ml_append_try_parser_type_text(parser, stream_id, type,
- tm, (void*) val.via.str.ptr,
- val.via.str.size,
- map,
- out_buf, out_size, out_release,
- out_time);
- } else {
- flb_error("%s: not string", __FUNCTION__);
- return -1;
- }
- }
- }
- }
- else {
- *out_buf = buf;
- *out_size = size;
- }
- return 0;
-}
-
-static int ml_append_try_parser(struct flb_ml_parser_ins *parser,
- uint64_t stream_id,
- int type,
- struct flb_time *tm, void *buf, size_t size,
- msgpack_object *metadata,
- msgpack_object *map)
-{
- int ret;
- int release = FLB_FALSE;
- void *out_buf = NULL;
- size_t out_size = 0;
- struct flb_time out_time;
- struct flb_ml_stream *mst;
-
- flb_time_zero(&out_time);
-
- switch (type) {
- case FLB_ML_TYPE_TEXT:
- ret = ml_append_try_parser_type_text(parser, stream_id, &type,
- tm, buf, size, map,
- &out_buf, &out_size, &release,
- &out_time);
- if (ret < 0) {
- return -1;
- }
- break;
- case FLB_ML_TYPE_MAP:
- ret = ml_append_try_parser_type_map(parser, stream_id, &type,
- tm, buf, size, map,
- &out_buf, &out_size, &release,
- &out_time);
- if (ret < 0) {
- return -1;
- }
- break;
-
- default:
- flb_error("[multiline] unknown type=%d", type);
- return -1;
- }
-
- if (flb_time_to_nanosec(&out_time) == 0L) {
- if (tm && flb_time_to_nanosec(tm) != 0L) {
- flb_time_copy(&out_time, tm);
- }
- else {
- flb_time_get(&out_time);
- }
- }
-
- /* Get the stream */
- mst = flb_ml_stream_get(parser, stream_id);
- if (!mst) {
- flb_error("[multiline] invalid stream_id %" PRIu64 ", could not "
- "append content to multiline context", stream_id);
- goto exit;
- }
-
- /* Process the binary record */
- ret = process_append(parser, mst, type, &out_time, metadata, map, out_buf, out_size);
- if (ret == -1) {
- if (release == FLB_TRUE) {
- flb_free(out_buf);
- }
- return -1;
- }
-
- exit:
- if (release == FLB_TRUE) {
- flb_free(out_buf);
- }
-
- return 0;
-}
-
-int flb_ml_append_text(struct flb_ml *ml, uint64_t stream_id,
- struct flb_time *tm, void *buf, size_t size)
-{
- int ret;
- int processed = FLB_FALSE;
- struct mk_list *head;
- struct mk_list *head_group;
- struct flb_ml_group *group;
- struct flb_ml_stream *mst;
- struct flb_ml_parser_ins *lru_parser = NULL;
- struct flb_ml_parser_ins *parser_i;
- struct flb_time out_time;
- struct flb_ml_stream_group *st_group;
- int type;
-
- type = FLB_ML_TYPE_TEXT;
-
- flb_time_zero(&out_time);
-
- mk_list_foreach(head, &ml->groups) {
- group = mk_list_entry(head, struct flb_ml_group, _head);
-
- /* Check if the incoming data matches the last recently used parser */
- lru_parser = group->lru_parser;
-
- if (lru_parser && lru_parser->last_stream_id == stream_id) {
- ret = ml_append_try_parser(lru_parser, lru_parser->last_stream_id, type,
- tm, buf, size, NULL, NULL);
- if (ret == 0) {
- processed = FLB_TRUE;
- break;
- }
- else {
- flb_ml_flush_parser_instance(ml,
- lru_parser,
- lru_parser->last_stream_id,
- FLB_FALSE);
- }
- }
- else if (lru_parser && lru_parser->last_stream_id > 0) {
- /*
- * Clear last recently used parser to match new parser.
- * Do not flush last_stream_id since it should continue to parsing.
- */
- lru_parser = NULL;
- }
- }
-
- mk_list_foreach(head_group, &group->parsers) {
- parser_i = mk_list_entry(head_group, struct flb_ml_parser_ins, _head);
- if (lru_parser && lru_parser == parser_i &&
- lru_parser->last_stream_id == stream_id) {
- continue;
- }
-
- ret = ml_append_try_parser(parser_i, stream_id, type,
- tm, buf, size, NULL, NULL);
- if (ret == 0) {
- group->lru_parser = parser_i;
- group->lru_parser->last_stream_id = stream_id;
- lru_parser = parser_i;
- processed = FLB_TRUE;
- break;
- }
- else {
- parser_i = NULL;
- }
- }
-
- if (!processed) {
- if (lru_parser) {
- flb_ml_flush_parser_instance(ml, lru_parser, stream_id, FLB_FALSE);
- parser_i = lru_parser;
- }
- else {
- /* get the first parser (just to make use of it buffers) */
- parser_i = mk_list_entry_first(&group->parsers,
- struct flb_ml_parser_ins,
- _head);
- }
-
- flb_ml_flush_parser_instance(ml, parser_i, stream_id, FLB_FALSE);
- mst = flb_ml_stream_get(parser_i, stream_id);
- if (!mst) {
- flb_error("[multiline] invalid stream_id %" PRIu64 ", could not "
- "append content to multiline context", stream_id);
- return -1;
- }
-
- /* Get stream group */
- st_group = flb_ml_stream_group_get(mst->parser, mst, NULL);
- flb_sds_cat_safe(&st_group->buf, buf, size);
- flb_ml_flush_stream_group(parser_i->ml_parser, mst, st_group, FLB_FALSE);
- }
-
- return 0;
-}
-
-
-
-int flb_ml_append_object(struct flb_ml *ml,
- uint64_t stream_id,
- struct flb_time *tm,
- msgpack_object *metadata,
- msgpack_object *obj)
-{
- int ret;
- int type;
- int processed = FLB_FALSE;
- struct mk_list *head;
- struct mk_list *head_group;
- struct flb_ml_group *group;
- struct flb_ml_parser_ins *lru_parser = NULL;
- struct flb_ml_parser_ins *parser_i;
- struct flb_ml_stream *mst;
- struct flb_ml_stream_group *st_group;
- struct flb_log_event event;
-
- if (metadata == NULL) {
- metadata = ml->log_event_decoder.empty_map;
- }
-
- /*
- * As incoming objects, we accept packed events
- * and msgpack Maps containing key/value pairs.
- */
- if (obj->type == MSGPACK_OBJECT_ARRAY) {
- flb_error("[multiline] appending object with invalid type, expected "
- "map, received type=%i", obj->type);
- return -1;
-
-
- flb_log_event_decoder_reset(&ml->log_event_decoder, NULL, 0);
-
- ret = flb_event_decoder_decode_object(&ml->log_event_decoder,
- &event,
- obj);
-
- if (ret != FLB_EVENT_DECODER_SUCCESS) {
- flb_error("[multiline] invalid event object");
-
- return -1;
- }
-
- tm = &event.timestamp;
- obj = event.body;
- metadata = event.metadata;
-
- type = FLB_ML_TYPE_MAP;
- }
- else if (obj->type == MSGPACK_OBJECT_MAP) {
- type = FLB_ML_TYPE_MAP;
- }
- else {
- flb_error("[multiline] appending object with invalid type, expected "
- "array or map, received type=%i", obj->type);
- return -1;
- }
-
- mk_list_foreach(head, &ml->groups) {
- group = mk_list_entry(head, struct flb_ml_group, _head);
-
- /* Check if the incoming data matches the last recently used parser */
- lru_parser = group->lru_parser;
-
- if (lru_parser && lru_parser->last_stream_id == stream_id) {
- ret = ml_append_try_parser(lru_parser, lru_parser->last_stream_id, type,
- tm, NULL, 0, metadata, obj);
- if (ret == 0) {
- processed = FLB_TRUE;
- break;
- }
- else {
- flb_ml_flush_parser_instance(ml,
- lru_parser,
- lru_parser->last_stream_id,
- FLB_FALSE);
- }
- }
- else if (lru_parser && lru_parser->last_stream_id > 0) {
- /*
- * Clear last recently used parser to match new parser.
- * Do not flush last_stream_id since it should continue to parsing.
- */
- lru_parser = NULL;
- }
- }
-
- mk_list_foreach(head_group, &group->parsers) {
- parser_i = mk_list_entry(head_group, struct flb_ml_parser_ins, _head);
- if (lru_parser && parser_i == lru_parser) {
- continue;
- }
-
- ret = ml_append_try_parser(parser_i, stream_id, type,
- tm, NULL, 0, metadata, obj);
- if (ret == 0) {
- group->lru_parser = parser_i;
- group->lru_parser->last_stream_id = stream_id;
- lru_parser = parser_i;
- processed = FLB_TRUE;
- break;
- }
- else {
- parser_i = NULL;
- }
- }
-
- if (!processed) {
- if (lru_parser) {
- flb_ml_flush_parser_instance(ml, lru_parser, stream_id, FLB_FALSE);
- parser_i = lru_parser;
- }
- else {
- /* get the first parser (just to make use of it buffers) */
- parser_i = mk_list_entry_first(&group->parsers,
- struct flb_ml_parser_ins,
- _head);
- }
-
- flb_ml_flush_parser_instance(ml, parser_i, stream_id, FLB_FALSE);
- mst = flb_ml_stream_get(parser_i, stream_id);
- if (!mst) {
- flb_error("[multiline] invalid stream_id %" PRIu64 ", could not "
- "append content to multiline context", stream_id);
-
- return -1;
- }
-
- /* Get stream group */
- st_group = flb_ml_stream_group_get(mst->parser, mst, NULL);
-
- ret = flb_log_event_encoder_begin_record(&ml->log_event_encoder);
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_log_event_encoder_set_timestamp(
- &ml->log_event_encoder, tm);
- }
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- if (metadata != ml->log_event_decoder.empty_map) {
- ret = flb_log_event_encoder_set_metadata_from_msgpack_object(
- &ml->log_event_encoder, metadata);
- }
- }
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_log_event_encoder_set_body_from_msgpack_object(
- &ml->log_event_encoder, obj);
- }
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_log_event_encoder_commit_record(&ml->log_event_encoder);
- }
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- mst->cb_flush(parser_i->ml_parser,
- mst,
- mst->cb_data,
- ml->log_event_encoder.output_buffer,
- ml->log_event_encoder.output_length);
- }
- else {
- flb_error("[multiline] log event encoder error : %d", ret);
- }
-
- flb_log_event_encoder_reset(&ml->log_event_encoder);
-
- /* reset group buffer counters */
- st_group->mp_sbuf.size = 0;
- flb_sds_len_set(st_group->buf, 0);
-
- /* Update last flush time */
- st_group->last_flush = time_ms_now();
- }
-
- return 0;
-}
-
-int flb_ml_append_event(struct flb_ml *ml, uint64_t stream_id,
- struct flb_log_event *event)
-{
- return flb_ml_append_object(ml,
- stream_id,
- &event->timestamp,
- event->metadata,
- event->body);
-}
-
-
-struct flb_ml *flb_ml_create(struct flb_config *ctx, char *name)
-{
- int result;
- struct flb_ml *ml;
-
- ml = flb_calloc(1, sizeof(struct flb_ml));
- if (!ml) {
- flb_errno();
- return NULL;
- }
- ml->name = flb_sds_create(name);
- if (!ml) {
- flb_free(ml);
- return NULL;
- }
-
- ml->config = ctx;
- ml->last_flush = time_ms_now();
- mk_list_init(&ml->groups);
-
- result = flb_log_event_decoder_init(&ml->log_event_decoder,
- NULL,
- 0);
-
- if (result != FLB_EVENT_DECODER_SUCCESS) {
- flb_error("cannot initialize log event decoder");
-
- flb_ml_destroy(ml);
-
- return NULL;
- }
-
- result = flb_log_event_encoder_init(&ml->log_event_encoder,
- FLB_LOG_EVENT_FORMAT_DEFAULT);
-
- if (result != FLB_EVENT_ENCODER_SUCCESS) {
- flb_error("cannot initialize log event encoder");
-
- flb_ml_destroy(ml);
-
- return NULL;
- }
-
- return ml;
-}
-
-/*
- * Some multiline contexts might define a parser name but not a parser context,
- * for missing contexts, just lookup the parser and perform the assignment.
- *
- * The common use case is when reading config files with [PARSER] and [MULTILINE_PARSER]
- * definitions, so we need to delay the parser loading.
- */
-int flb_ml_parsers_init(struct flb_config *ctx)
-{
- struct mk_list *head;
- struct flb_parser *p;
- struct flb_ml_parser *ml_parser;
-
- mk_list_foreach(head, &ctx->multiline_parsers) {
- ml_parser = mk_list_entry(head, struct flb_ml_parser, _head);
- if (ml_parser->parser_name && !ml_parser->parser) {
- p = flb_parser_get(ml_parser->parser_name, ctx);
- if (!p) {
- flb_error("multiline parser '%s' points to an undefined parser '%s'",
- ml_parser->name, ml_parser->parser_name);
- return -1;
- }
- ml_parser->parser = p;
- }
- }
-
- return 0;
-}
-
-int flb_ml_auto_flush_init(struct flb_ml *ml)
-{
- struct flb_sched *scheduler;
- int ret;
-
- if (ml == NULL) {
- return -1;
- }
-
- scheduler = flb_sched_ctx_get();
-
- if (scheduler == NULL) {
- flb_error("[multiline] scheduler context has not been created");
- return -1;
- }
-
- if (ml->flush_ms < 500) {
- flb_error("[multiline] flush timeout '%i' is too low", ml->flush_ms);
- return -1;
- }
-
- /* Create flush timer */
- ret = flb_sched_timer_cb_create(scheduler,
- FLB_SCHED_TIMER_CB_PERM,
- ml->flush_ms,
- cb_ml_flush_timer,
- ml, NULL);
- return ret;
-}
-
-int flb_ml_destroy(struct flb_ml *ml)
-{
- struct mk_list *head;
- struct mk_list *tmp;
- struct flb_ml_group *group;
-
- if (!ml) {
- return 0;
- }
-
- flb_log_event_decoder_destroy(&ml->log_event_decoder);
- flb_log_event_encoder_destroy(&ml->log_event_encoder);
-
- if (ml->name) {
- flb_sds_destroy(ml->name);
- }
-
- /* destroy groups */
- mk_list_foreach_safe(head, tmp, &ml->groups) {
- group = mk_list_entry(head, struct flb_ml_group, _head);
- flb_ml_group_destroy(group);
- }
-
- flb_free(ml);
- return 0;
-}
-
-static int flb_msgpack_object_hash_internal(cfl_hash_state_t *state,
- msgpack_object *object)
-{
- void *dummy_pointer;
- int result;
- int index;
-
- if (object == NULL) {
- return 0;
- }
-
- dummy_pointer = NULL;
- result = 0;
-
- if (object->type == MSGPACK_OBJECT_NIL) {
- cfl_hash_64bits_update(state,
- &dummy_pointer,
- sizeof(dummy_pointer));
- }
- else if (object->type == MSGPACK_OBJECT_BOOLEAN) {
- cfl_hash_64bits_update(state,
- &object->via.boolean,
- sizeof(object->via.boolean));
- }
- else if (object->type == MSGPACK_OBJECT_POSITIVE_INTEGER ||
- object->type == MSGPACK_OBJECT_NEGATIVE_INTEGER) {
- cfl_hash_64bits_update(state,
- &object->via.u64,
- sizeof(object->via.u64));
- }
- else if (object->type == MSGPACK_OBJECT_FLOAT32 ||
- object->type == MSGPACK_OBJECT_FLOAT64 ||
- object->type == MSGPACK_OBJECT_FLOAT) {
- cfl_hash_64bits_update(state,
- &object->via.f64,
- sizeof(object->via.f64));
- }
- else if (object->type == MSGPACK_OBJECT_STR) {
- cfl_hash_64bits_update(state,
- object->via.str.ptr,
- object->via.str.size);
- }
- else if (object->type == MSGPACK_OBJECT_ARRAY) {
- for (index = 0 ;
- index < object->via.array.size &&
- result == 0;
- index++) {
- result = flb_msgpack_object_hash_internal(
- state,
- &object->via.array.ptr[index]);
- }
- }
- else if (object->type == MSGPACK_OBJECT_MAP) {
- for (index = 0 ;
- index < object->via.map.size &&
- result == 0;
- index++) {
- result = flb_msgpack_object_hash_internal(
- state,
- &object->via.map.ptr[index].key);
-
- if (result == 0) {
- result = flb_msgpack_object_hash_internal(
- state,
- &object->via.map.ptr[index].val);
- }
- }
- }
- else if (object->type == MSGPACK_OBJECT_BIN) {
- cfl_hash_64bits_update(state,
- object->via.bin.ptr,
- object->via.bin.size);
- }
- else if (object->type == MSGPACK_OBJECT_EXT) {
- cfl_hash_64bits_update(state,
- &object->via.ext.type,
- sizeof(object->via.ext.type));
-
- cfl_hash_64bits_update(state,
- object->via.ext.ptr,
- object->via.ext.size);
- }
-
- return result;
-}
-
-static int flb_hash_msgpack_object_list(cfl_hash_64bits_t *hash,
- size_t entry_count,
- ...)
-{
- cfl_hash_state_t hash_state;
- va_list arguments;
- msgpack_object *object;
- int result;
- size_t index;
-
- cfl_hash_64bits_reset(&hash_state);
-
- va_start(arguments, entry_count);
-
- result = 0;
-
- for (index = 0 ;
- index < entry_count &&
- result == 0 ;
- index++) {
- object = va_arg(arguments, msgpack_object *);
-
- if (object == NULL) {
- break;
- }
-
- result = flb_msgpack_object_hash_internal(&hash_state, object);
- }
-
- va_end(arguments);
-
- if (result == 0) {
- *hash = cfl_hash_64bits_digest(&hash_state);
- }
-
- return result;
-}
-
-struct flb_deduplication_list_entry {
- cfl_hash_64bits_t hash;
- struct cfl_list _head;
-};
-
-void flb_deduplication_list_init(struct cfl_list *deduplication_list)
-{
- cfl_list_init(deduplication_list);
-}
-
-int flb_deduplication_list_validate(struct cfl_list *deduplication_list,
- cfl_hash_64bits_t hash)
-{
- struct cfl_list *iterator;
- struct flb_deduplication_list_entry *entry;
-
- cfl_list_foreach(iterator, deduplication_list) {
- entry = cfl_list_entry(iterator,
- struct flb_deduplication_list_entry,
- _head);
-
- if (entry->hash == hash) {
- return FLB_TRUE;
- }
- }
-
- return FLB_FALSE;
-}
-
-int flb_deduplication_list_add(struct cfl_list *deduplication_list,
- cfl_hash_64bits_t hash)
-{
- struct flb_deduplication_list_entry *entry;
-
- entry = (struct flb_deduplication_list_entry *)
- flb_calloc(1,
- sizeof(struct flb_deduplication_list_entry));
-
- if (entry == NULL) {
- return -1;
- }
-
- cfl_list_entry_init(&entry->_head);
- entry->hash = hash;
-
- cfl_list_append(&entry->_head, deduplication_list);
-
- return 0;
-}
-
-void flb_deduplication_list_purge(struct cfl_list *deduplication_list)
-{
- struct cfl_list *iterator;
- struct cfl_list *backup;
- struct flb_deduplication_list_entry *entry;
-
- cfl_list_foreach_safe(iterator, backup, deduplication_list) {
- entry = cfl_list_entry(iterator,
- struct flb_deduplication_list_entry,
- _head);
-
- cfl_list_del(&entry->_head);
-
- free(entry);
- }
-}
-
-int flb_ml_flush_metadata_buffer(struct flb_ml_stream *mst,
- struct flb_ml_stream_group *group,
- int deduplicate_metadata)
-{
- int append_metadata_entry;
- cfl_hash_64bits_t metadata_entry_hash;
- struct cfl_list deduplication_list;
- msgpack_unpacked metadata_map;
- size_t offset;
- size_t index;
- msgpack_object value;
- msgpack_object key;
- int ret;
-
- ret = FLB_EVENT_ENCODER_SUCCESS;
-
- if (deduplicate_metadata) {
- flb_deduplication_list_init(&deduplication_list);
- }
-
- msgpack_unpacked_init(&metadata_map);
-
- offset = 0;
- while (ret == FLB_EVENT_ENCODER_SUCCESS &&
- msgpack_unpack_next(&metadata_map,
- group->mp_md_sbuf.data,
- group->mp_md_sbuf.size,
- &offset) == MSGPACK_UNPACK_SUCCESS) {
-
- for (index = 0;
- index < metadata_map.data.via.map.size &&
- ret == FLB_EVENT_ENCODER_SUCCESS;
- index++) {
- key = metadata_map.data.via.map.ptr[index].key;
- value = metadata_map.data.via.map.ptr[index].val;
-
- append_metadata_entry = FLB_TRUE;
-
- if (deduplicate_metadata) {
- ret = flb_hash_msgpack_object_list(&metadata_entry_hash,
- 2,
- &key,
- &value);
- if (ret != 0) {
- ret = FLB_EVENT_ENCODER_ERROR_INVALID_ARGUMENT;
- }
- else {
- ret = flb_deduplication_list_validate(
- &deduplication_list,
- metadata_entry_hash);
-
- if (ret) {
- append_metadata_entry = FLB_FALSE;
-
- ret = FLB_EVENT_ENCODER_SUCCESS;
- }
- else {
- ret = flb_deduplication_list_add(
- &deduplication_list,
- metadata_entry_hash);
-
- if (ret == 0) {
- ret = FLB_EVENT_ENCODER_SUCCESS;
- }
- else {
- ret = FLB_EVENT_ENCODER_ERROR_ALLOCATION_ERROR;
- }
- }
- }
- }
-
- if (append_metadata_entry) {
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_log_event_encoder_append_metadata_values(
- &mst->ml->log_event_encoder,
- FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&key),
- FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&value));
- }
- }
- }
- }
-
- msgpack_unpacked_destroy(&metadata_map);
-
- if (deduplicate_metadata) {
- flb_deduplication_list_purge(&deduplication_list);
- }
-
- return ret;
-}
-
-int flb_ml_flush_stream_group(struct flb_ml_parser *ml_parser,
- struct flb_ml_stream *mst,
- struct flb_ml_stream_group *group,
- int forced_flush)
-{
- int i;
- int ret;
- int size;
- int len;
- size_t off = 0;
- msgpack_object map;
- msgpack_object k;
- msgpack_object v;
- msgpack_sbuffer mp_sbuf;
- msgpack_packer mp_pck;
- msgpack_unpacked result;
- struct flb_ml_parser_ins *parser_i = mst->parser;
- struct flb_time *group_time;
- struct flb_time now;
-
- breakline_prepare(parser_i, group);
- len = flb_sds_len(group->buf);
-
- /* init msgpack buffer */
- msgpack_sbuffer_init(&mp_sbuf);
- msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
-
- /* if the group don't have a time set, use current time */
- if (flb_time_to_nanosec(&group->mp_time) == 0L) {
- flb_time_get(&now);
- group_time = &now;
- } else {
- group_time = &group->mp_time;
- }
-
- /* compose final record if we have a first line context */
- if (group->mp_sbuf.size > 0) {
- msgpack_unpacked_init(&result);
- ret = msgpack_unpack_next(&result,
- group->mp_sbuf.data, group->mp_sbuf.size,
- &off);
- if (ret != MSGPACK_UNPACK_SUCCESS) {
- flb_error("[multiline] could not unpack first line state buffer");
- msgpack_unpacked_destroy(&result);
- group->mp_sbuf.size = 0;
- return -1;
- }
- map = result.data;
-
- if (map.type != MSGPACK_OBJECT_MAP) {
- flb_error("[multiline] expected MAP type in first line state buffer");
- msgpack_unpacked_destroy(&result);
- group->mp_sbuf.size = 0;
- return -1;
- }
-
- /* Take the first line keys and repack */
- len = flb_sds_len(parser_i->key_content);
- size = map.via.map.size;
- msgpack_pack_map(&mp_pck, size);
-
- for (i = 0; i < size; i++) {
- k = map.via.map.ptr[i].key;
- v = map.via.map.ptr[i].val;
-
- /*
- * Check if the current key is the key that will contain the
- * concatenated multiline buffer
- */
- if (k.type == MSGPACK_OBJECT_STR &&
- parser_i->key_content &&
- k.via.str.size == len &&
- strncmp(k.via.str.ptr, parser_i->key_content, len) == 0) {
-
- /* key */
- msgpack_pack_object(&mp_pck, k);
-
- /* value */
- len = flb_sds_len(group->buf);
- msgpack_pack_str(&mp_pck, len);
- msgpack_pack_str_body(&mp_pck, group->buf, len);
- }
- else {
- /* key / val */
- msgpack_pack_object(&mp_pck, k);
- msgpack_pack_object(&mp_pck, v);
- }
- }
- msgpack_unpacked_destroy(&result);
- group->mp_sbuf.size = 0;
- }
- else if (len > 0) {
- /* Pack raw content as Fluent Bit record */
- msgpack_pack_map(&mp_pck, 1);
-
- /* key */
- if (parser_i->key_content) {
- len = flb_sds_len(parser_i->key_content);
- msgpack_pack_str(&mp_pck, len);
- msgpack_pack_str_body(&mp_pck, parser_i->key_content, len);
- }
- else {
- msgpack_pack_str(&mp_pck, 3);
- msgpack_pack_str_body(&mp_pck, "log", 3);
- }
-
- /* val */
- len = flb_sds_len(group->buf);
- msgpack_pack_str(&mp_pck, len);
- msgpack_pack_str_body(&mp_pck, group->buf, len);
- }
-
- if (mp_sbuf.size > 0) {
- /*
- * a 'forced_flush' means to alert the caller that the data 'must be flushed to it destination'. This flag is
- * only enabled when the flush process has been triggered by the multiline timer, e.g:
- *
- * - the message is complete or incomplete and its time to dispatch it.
- */
- if (forced_flush) {
- mst->forced_flush = FLB_TRUE;
- }
-
- /* encode and invoke the user callback */
-
- ret = flb_log_event_encoder_begin_record(
- &mst->ml->log_event_encoder);
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_log_event_encoder_set_timestamp(
- &mst->ml->log_event_encoder,
- group_time);
- }
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_ml_flush_metadata_buffer(mst,
- group,
- FLB_TRUE);
- }
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_log_event_encoder_set_body_from_raw_msgpack(
- &mst->ml->log_event_encoder,
- mp_sbuf.data,
- mp_sbuf.size);
- }
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- ret = flb_log_event_encoder_commit_record(
- &mst->ml->log_event_encoder);
- }
-
- if (ret != FLB_EVENT_ENCODER_SUCCESS) {
- flb_error("[multiline] error packing event");
-
- return -1;
- }
-
- if (ret == FLB_EVENT_ENCODER_SUCCESS) {
- mst->cb_flush(ml_parser,
- mst,
- mst->cb_data,
- mst->ml->log_event_encoder.output_buffer,
- mst->ml->log_event_encoder.output_length);
- }
- else {
- flb_error("[multiline] log event encoder error : %d", ret);
- }
-
- flb_log_event_encoder_reset(&mst->ml->log_event_encoder);
-
- if (forced_flush) {
- mst->forced_flush = FLB_FALSE;
- }
- }
-
- msgpack_sbuffer_destroy(&mp_sbuf);
- flb_sds_len_set(group->buf, 0);
-
- /* Update last flush time */
- group->last_flush = time_ms_now();
-
- return 0;
-}
-
-/*
- * Initialize multiline global environment.
- *
- * note: function must be invoked before any flb_ml_create() call.
- */
-int flb_ml_init(struct flb_config *config)
-{
- int ret;
-
- ret = flb_ml_parser_builtin_create(config);
- if (ret == -1) {
- return -1;
- }
-
- return 0;
-}
-
-int flb_ml_exit(struct flb_config *config)
-{
- flb_ml_parser_destroy_all(&config->multiline_parsers);
- return 0;
-}