diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:23 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:44 +0000 |
commit | 836b47cb7e99a977c5a23b059ca1d0b5065d310e (patch) | |
tree | 1604da8f482d02effa033c94a84be42bc0c848c3 /fluent-bit/src/multiline | |
parent | Releasing debian version 1.44.3-2. (diff) | |
download | netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.tar.xz netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.zip |
Merging upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/src/multiline')
-rw-r--r-- | fluent-bit/src/multiline/CMakeLists.txt | 15 | ||||
-rw-r--r-- | fluent-bit/src/multiline/flb_ml.c | 1562 | ||||
-rw-r--r-- | fluent-bit/src/multiline/flb_ml_group.c | 86 | ||||
-rw-r--r-- | fluent-bit/src/multiline/flb_ml_mode.c | 111 | ||||
-rw-r--r-- | fluent-bit/src/multiline/flb_ml_parser.c | 347 | ||||
-rw-r--r-- | fluent-bit/src/multiline/flb_ml_parser_cri.c | 81 | ||||
-rw-r--r-- | fluent-bit/src/multiline/flb_ml_parser_docker.c | 110 | ||||
-rw-r--r-- | fluent-bit/src/multiline/flb_ml_parser_go.c | 140 | ||||
-rw-r--r-- | fluent-bit/src/multiline/flb_ml_parser_java.c | 143 | ||||
-rw-r--r-- | fluent-bit/src/multiline/flb_ml_parser_python.c | 98 | ||||
-rw-r--r-- | fluent-bit/src/multiline/flb_ml_parser_ruby.c | 87 | ||||
-rw-r--r-- | fluent-bit/src/multiline/flb_ml_rule.c | 421 | ||||
-rw-r--r-- | fluent-bit/src/multiline/flb_ml_stream.c | 338 |
13 files changed, 0 insertions, 3539 deletions
diff --git a/fluent-bit/src/multiline/CMakeLists.txt b/fluent-bit/src/multiline/CMakeLists.txt deleted file mode 100644 index 294ef3e8..00000000 --- a/fluent-bit/src/multiline/CMakeLists.txt +++ /dev/null @@ -1,15 +0,0 @@ -set(src_multiline - # built-in parsers - multiline/flb_ml_parser_cri.c - multiline/flb_ml_parser_docker.c - multiline/flb_ml_parser_python.c - multiline/flb_ml_parser_java.c - multiline/flb_ml_parser_go.c - multiline/flb_ml_parser_ruby.c - # core - multiline/flb_ml_stream.c - multiline/flb_ml_parser.c - multiline/flb_ml_group.c - multiline/flb_ml_rule.c - multiline/flb_ml.c PARENT_SCOPE - ) diff --git a/fluent-bit/src/multiline/flb_ml.c b/fluent-bit/src/multiline/flb_ml.c deleted file mode 100644 index 28e123ce..00000000 --- a/fluent-bit/src/multiline/flb_ml.c +++ /dev/null @@ -1,1562 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_mem.h> -#include <fluent-bit/flb_log.h> -#include <fluent-bit/flb_regex.h> -#include <fluent-bit/flb_time.h> -#include <fluent-bit/flb_pack.h> -#include <fluent-bit/flb_scheduler.h> -#include <fluent-bit/multiline/flb_ml.h> -#include <fluent-bit/multiline/flb_ml_rule.h> -#include <fluent-bit/multiline/flb_ml_group.h> - -#include <stdarg.h> -#include <math.h> - -static inline int match_negate(struct flb_ml_parser *ml_parser, int matched) -{ - int rule_match = matched; - - /* Validate pattern matching against expected 'negate' condition */ - if (matched == FLB_TRUE) { - if (ml_parser->negate == FLB_FALSE) { - rule_match = FLB_TRUE; - } - else { - rule_match = FLB_FALSE; - } - } - else { - if (ml_parser->negate == FLB_TRUE) { - rule_match = FLB_TRUE; - } - } - - return rule_match; -} - -static uint64_t time_ms_now() -{ - uint64_t ms; - struct flb_time tm; - - flb_time_get(&tm); - ms = (tm.tm.tv_sec * 1000) + lround(tm.tm.tv_nsec/1.0e6); - return ms; -} - - -int flb_ml_flush_stdout(struct flb_ml_parser *parser, - struct flb_ml_stream *mst, - void *data, char *buf_data, size_t buf_size) -{ - fprintf(stdout, "\n%s----- MULTILINE FLUSH (stream_id=%" PRIu64 ") -----%s\n", - ANSI_GREEN, mst->id, ANSI_RESET); - - /* Print incoming flush buffer */ - flb_pack_print(buf_data, buf_size); - - fprintf(stdout, "%s----------- EOF -----------%s\n", - ANSI_GREEN, ANSI_RESET); - return 0; -} - -int flb_ml_type_lookup(char *str) -{ - int type = -1; - - if (strcasecmp(str, "regex") == 0) { - type = FLB_ML_REGEX; - } - else if (strcasecmp(str, "endswith") == 0) { - type = FLB_ML_ENDSWITH; - } - else if (strcasecmp(str, "equal") == 0 || strcasecmp(str, "eq") == 0) { - type = FLB_ML_EQ; - } - - return type; -} - -void flb_ml_flush_parser_instance(struct flb_ml *ml, - struct flb_ml_parser_ins *parser_i, - uint64_t stream_id, int forced_flush) -{ - struct mk_list *head; - struct mk_list *head_group; - struct flb_ml_stream *mst; - struct flb_ml_stream_group *group; - - mk_list_foreach(head, &parser_i->streams) { - mst = mk_list_entry(head, struct flb_ml_stream, _head); - if (stream_id != 0 && mst->id != stream_id) { - continue; - } - - /* Iterate stream groups */ - mk_list_foreach(head_group, &mst->groups) { - group = mk_list_entry(head_group, struct flb_ml_stream_group, _head); - flb_ml_flush_stream_group(parser_i->ml_parser, mst, group, forced_flush); - } - } -} - -void flb_ml_flush_pending(struct flb_ml *ml, uint64_t now, int forced_flush) -{ - struct mk_list *head; - struct flb_ml_parser_ins *parser_i; - struct flb_ml_group *group; - - /* set the last flush time */ - ml->last_flush = now; - - /* flush only the first group of the context */ - group = mk_list_entry_first(&ml->groups, struct flb_ml_group, _head); - - /* iterate group parser instances */ - mk_list_foreach(head, &group->parsers) { - parser_i = mk_list_entry(head, struct flb_ml_parser_ins, _head); - flb_ml_flush_parser_instance(ml, parser_i, 0, forced_flush); - } -} - -void flb_ml_flush_pending_now(struct flb_ml *ml) -{ - uint64_t now; - - now = time_ms_now(); - flb_ml_flush_pending(ml, now, FLB_TRUE); -} - -static void cb_ml_flush_timer(struct flb_config *ctx, void *data) -{ - uint64_t now; - struct flb_ml *ml = data; - - now = time_ms_now(); - if (ml->last_flush + ml->flush_ms > now) { - return; - } - - /* - * Iterate over all streams and groups and for a flush for expired groups - * which has not flushed in the last N milliseconds. - */ - flb_ml_flush_pending(ml, now, FLB_TRUE); -} - -int flb_ml_register_context(struct flb_ml_stream_group *group, - struct flb_time *tm, msgpack_object *map) -{ - if (tm) { - flb_time_copy(&group->mp_time, tm); - } - - if (map) { - msgpack_pack_object(&group->mp_pck, *map); - } - - return 0; -} - -static inline void breakline_prepare(struct flb_ml_parser_ins *parser_i, - struct flb_ml_stream_group *stream_group) -{ - int len; - - if (parser_i->key_content) { - return; - } - - len = flb_sds_len(stream_group->buf); - if (len <= 0) { - return; - } - - if (stream_group->buf[len - 1] != '\n') { - flb_sds_cat_safe(&stream_group->buf, "\n", 1); - } -} - -/* - * package content into a multiline stream: - * - * full_map: if the original content to process comes in msgpack map, this variable - * reference the map. It's only used in case we will package a first line so we - * store a copy of the other key values in the map for flush time. - */ -static int package_content(struct flb_ml_stream *mst, - msgpack_object *metadata, - msgpack_object *full_map, - void *buf, size_t size, struct flb_time *tm, - msgpack_object *val_content, - msgpack_object *val_pattern, - msgpack_object *val_group) -{ - int len; - int ret; - int rule_match = FLB_FALSE; - int processed = FLB_FALSE; - int type; - size_t offset = 0; - size_t buf_size; - char *buf_data; - msgpack_object *val = val_content; - struct flb_ml_parser *parser; - struct flb_ml_parser_ins *parser_i; - struct flb_ml_stream_group *stream_group; - - parser_i = mst->parser; - parser = parser_i->ml_parser; - - /* Get stream group */ - stream_group = flb_ml_stream_group_get(mst->parser, mst, val_group); - if (!mst->last_stream_group) { - mst->last_stream_group = stream_group; - } - else { - if (mst->last_stream_group != stream_group) { - mst->last_stream_group = stream_group; - } - } - - /* Set the parser type */ - type = parser->type; - - if (val_pattern) { - val = val_pattern; - } - - if (val) { - buf_data = (char *) val->via.str.ptr; - buf_size = val->via.str.size; - } - else { - buf_data = buf; - buf_size = size; - - } - if (type == FLB_ML_REGEX) { - ret = flb_ml_rule_process(parser, mst, - stream_group, full_map, buf, size, tm, - val_content, val_pattern); - if (ret == -1) { - processed = FLB_FALSE; - } - else { - processed = FLB_TRUE; - } - } - else if (type == FLB_ML_ENDSWITH) { - len = flb_sds_len(parser->match_str); - if (buf_data && len <= buf_size) { - /* Validate if content ends with expected string */ - offset = buf_size - len; - ret = memcmp(buf_data + offset, parser->match_str, len); - if (ret == 0) { - rule_match = match_negate(parser, FLB_TRUE); - } - else { - rule_match = match_negate(parser, FLB_FALSE); - } - - if (stream_group->mp_sbuf.size == 0) { - flb_ml_register_context(stream_group, tm, full_map); - } - - /* Prepare concatenation */ - breakline_prepare(parser_i, stream_group); - - /* Concatenate value */ - if (val_content) { - flb_sds_cat_safe(&stream_group->buf, - val_content->via.str.ptr, - val_content->via.str.size); - } - else { - flb_sds_cat_safe(&stream_group->buf, buf_data, buf_size); - } - - /* on ENDSWITH mode, a rule match means flush the content */ - if (rule_match) { - flb_ml_flush_stream_group(parser, mst, stream_group, FLB_FALSE); - } - processed = FLB_TRUE; - } - } - else if (type == FLB_ML_EQ) { - if (buf_size == flb_sds_len(parser->match_str) && - memcmp(buf_data, parser->match_str, buf_size) == 0) { - /* EQ match */ - rule_match = match_negate(parser, FLB_TRUE); - } - else { - rule_match = match_negate(parser, FLB_FALSE); - } - - if (stream_group->mp_sbuf.size == 0) { - flb_ml_register_context(stream_group, tm, full_map); - } - - /* Prepare concatenation */ - breakline_prepare(parser_i, stream_group); - - /* Concatenate value */ - if (val_content) { - flb_sds_cat_safe(&stream_group->buf, - val_content->via.str.ptr, - val_content->via.str.size); - } - else { - flb_sds_cat_safe(&stream_group->buf, buf_data, buf_size); - } - - /* on ENDSWITH mode, a rule match means flush the content */ - if (rule_match) { - flb_ml_flush_stream_group(parser, mst, stream_group, FLB_FALSE); - } - processed = FLB_TRUE; - } - - if (processed && metadata != NULL) { - msgpack_pack_object(&stream_group->mp_md_pck, *metadata); - } - - return processed; -} - -/* - * Retrieve the ID of a specific key name in a map. This function might be - * extended later to use record accessor, since all current cases are solved - * now quering the first level of keys in the map, we avoid record accessor - * to avoid extra memory allocations. - */ -static int get_key_id(msgpack_object *map, flb_sds_t key_name) -{ - int i; - int len; - int found = FLB_FALSE; - msgpack_object key; - msgpack_object val; - - if (!key_name) { - return -1; - } - - len = flb_sds_len(key_name); - for (i = 0; i < map->via.map.size; i++) { - key = map->via.map.ptr[i].key; - val = map->via.map.ptr[i].val; - - if (key.type != MSGPACK_OBJECT_STR || val.type != MSGPACK_OBJECT_STR) { - continue; - } - - if (key.via.str.size != len) { - continue; - } - - if (strncmp(key.via.str.ptr, key_name, len) == 0) { - found = FLB_TRUE; - break; - } - } - - if (found) { - return i; - } - - return -1; -} - -static int process_append(struct flb_ml_parser_ins *parser_i, - struct flb_ml_stream *mst, - int type, - struct flb_time *tm, - msgpack_object *metadata, - msgpack_object *obj, - void *buf, size_t size) -{ - int ret; - int id_content = -1; - int id_pattern = -1; - int id_group = -1; - int unpacked = FLB_FALSE; - size_t off = 0; - msgpack_object *full_map = NULL; - msgpack_object *val_content = NULL; - msgpack_object *val_pattern = NULL; - msgpack_object *val_group = NULL; - msgpack_unpacked result; - - /* Lookup the key */ - if (type == FLB_ML_TYPE_TEXT) { - ret = package_content(mst, NULL, NULL, buf, size, tm, NULL, NULL, NULL); - if (ret == FLB_FALSE) { - return -1; - } - return 0; - } - else if (type == FLB_ML_TYPE_MAP) { - full_map = obj; - /* - * When full_map and buf is not NULL, - * we use 'buf' since buf is already processed from full_map at - * ml_append_try_parser_type_map. - */ - if (!full_map || (buf != NULL && full_map != NULL)) { - off = 0; - msgpack_unpacked_init(&result); - ret = msgpack_unpack_next(&result, buf, size, &off); - if (ret != MSGPACK_UNPACK_SUCCESS) { - return -1; - } - full_map = &result.data; - unpacked = FLB_TRUE; - } - else if (full_map->type != MSGPACK_OBJECT_MAP) { - msgpack_unpacked_destroy(&result); - return -1; - } - } - - /* Lookup for key_content entry */ - id_content = get_key_id(full_map, parser_i->key_content); - if (id_content == -1) { - if (unpacked) { - msgpack_unpacked_destroy(&result); - } - return -1; - } - - val_content = &full_map->via.map.ptr[id_content].val; - if (val_content->type != MSGPACK_OBJECT_STR) { - val_content = NULL; - } - - /* Optional: Lookup for key_pattern entry */ - if (parser_i->key_pattern) { - id_pattern = get_key_id(full_map, parser_i->key_pattern); - if (id_pattern >= 0) { - val_pattern = &full_map->via.map.ptr[id_pattern].val; - if (val_pattern->type != MSGPACK_OBJECT_STR) { - val_pattern = NULL; - } - } - } - - /* Optional: lookup for key_group entry */ - if (parser_i->key_group) { - id_group = get_key_id(full_map, parser_i->key_group); - if (id_group >= 0) { - val_group = &full_map->via.map.ptr[id_group].val; - if (val_group->type != MSGPACK_OBJECT_STR) { - val_group = NULL; - } - } - } - - /* Package the content */ - ret = package_content(mst, metadata, full_map, buf, size, tm, - val_content, val_pattern, val_group); - if (unpacked) { - msgpack_unpacked_destroy(&result); - } - if (ret == FLB_FALSE) { - return -1; - } - return 0; -} - -static int ml_append_try_parser_type_text(struct flb_ml_parser_ins *parser, - uint64_t stream_id, - int *type, - struct flb_time *tm, void *buf, size_t size, - msgpack_object *map, - void **out_buf, size_t *out_size, int *out_release, - struct flb_time *out_time) -{ - int ret; - - if (parser->ml_parser->parser) { - /* Parse incoming content */ - ret = flb_parser_do(parser->ml_parser->parser, (char *) buf, size, - out_buf, out_size, out_time); - if (flb_time_to_nanosec(out_time) == 0L) { - flb_time_copy(out_time, tm); - } - if (ret >= 0) { - *out_release = FLB_TRUE; - *type = FLB_ML_TYPE_MAP; - } - else { - *out_buf = buf; - *out_size = size; - return -1; - } - } - else { - *out_buf = buf; - *out_size = size; - } - return 0; -} - -static int ml_append_try_parser_type_map(struct flb_ml_parser_ins *parser, - uint64_t stream_id, - int *type, - struct flb_time *tm, void *buf, size_t size, - msgpack_object *map, - void **out_buf, size_t *out_size, int *out_release, - struct flb_time *out_time) -{ - int map_size; - int i; - int len; - msgpack_object key; - msgpack_object val; - - if (map == NULL || map->type != MSGPACK_OBJECT_MAP) { - flb_error("%s:invalid map", __FUNCTION__); - return -1; - } - - if (parser->ml_parser->parser) { - /* lookup key_content */ - - len = flb_sds_len(parser->key_content); - map_size = map->via.map.size; - for(i = 0; i < map_size; i++) { - key = map->via.map.ptr[i].key; - val = map->via.map.ptr[i].val; - if (key.type == MSGPACK_OBJECT_STR && - parser->key_content && - key.via.str.size == len && - strncmp(key.via.str.ptr, parser->key_content, len) == 0) { - /* key_content found */ - if (val.type == MSGPACK_OBJECT_STR) { - /* try parse the value of key_content e*/ - return ml_append_try_parser_type_text(parser, stream_id, type, - tm, (void*) val.via.str.ptr, - val.via.str.size, - map, - out_buf, out_size, out_release, - out_time); - } else { - flb_error("%s: not string", __FUNCTION__); - return -1; - } - } - } - } - else { - *out_buf = buf; - *out_size = size; - } - return 0; -} - -static int ml_append_try_parser(struct flb_ml_parser_ins *parser, - uint64_t stream_id, - int type, - struct flb_time *tm, void *buf, size_t size, - msgpack_object *metadata, - msgpack_object *map) -{ - int ret; - int release = FLB_FALSE; - void *out_buf = NULL; - size_t out_size = 0; - struct flb_time out_time; - struct flb_ml_stream *mst; - - flb_time_zero(&out_time); - - switch (type) { - case FLB_ML_TYPE_TEXT: - ret = ml_append_try_parser_type_text(parser, stream_id, &type, - tm, buf, size, map, - &out_buf, &out_size, &release, - &out_time); - if (ret < 0) { - return -1; - } - break; - case FLB_ML_TYPE_MAP: - ret = ml_append_try_parser_type_map(parser, stream_id, &type, - tm, buf, size, map, - &out_buf, &out_size, &release, - &out_time); - if (ret < 0) { - return -1; - } - break; - - default: - flb_error("[multiline] unknown type=%d", type); - return -1; - } - - if (flb_time_to_nanosec(&out_time) == 0L) { - if (tm && flb_time_to_nanosec(tm) != 0L) { - flb_time_copy(&out_time, tm); - } - else { - flb_time_get(&out_time); - } - } - - /* Get the stream */ - mst = flb_ml_stream_get(parser, stream_id); - if (!mst) { - flb_error("[multiline] invalid stream_id %" PRIu64 ", could not " - "append content to multiline context", stream_id); - goto exit; - } - - /* Process the binary record */ - ret = process_append(parser, mst, type, &out_time, metadata, map, out_buf, out_size); - if (ret == -1) { - if (release == FLB_TRUE) { - flb_free(out_buf); - } - return -1; - } - - exit: - if (release == FLB_TRUE) { - flb_free(out_buf); - } - - return 0; -} - -int flb_ml_append_text(struct flb_ml *ml, uint64_t stream_id, - struct flb_time *tm, void *buf, size_t size) -{ - int ret; - int processed = FLB_FALSE; - struct mk_list *head; - struct mk_list *head_group; - struct flb_ml_group *group; - struct flb_ml_stream *mst; - struct flb_ml_parser_ins *lru_parser = NULL; - struct flb_ml_parser_ins *parser_i; - struct flb_time out_time; - struct flb_ml_stream_group *st_group; - int type; - - type = FLB_ML_TYPE_TEXT; - - flb_time_zero(&out_time); - - mk_list_foreach(head, &ml->groups) { - group = mk_list_entry(head, struct flb_ml_group, _head); - - /* Check if the incoming data matches the last recently used parser */ - lru_parser = group->lru_parser; - - if (lru_parser && lru_parser->last_stream_id == stream_id) { - ret = ml_append_try_parser(lru_parser, lru_parser->last_stream_id, type, - tm, buf, size, NULL, NULL); - if (ret == 0) { - processed = FLB_TRUE; - break; - } - else { - flb_ml_flush_parser_instance(ml, - lru_parser, - lru_parser->last_stream_id, - FLB_FALSE); - } - } - else if (lru_parser && lru_parser->last_stream_id > 0) { - /* - * Clear last recently used parser to match new parser. - * Do not flush last_stream_id since it should continue to parsing. - */ - lru_parser = NULL; - } - } - - mk_list_foreach(head_group, &group->parsers) { - parser_i = mk_list_entry(head_group, struct flb_ml_parser_ins, _head); - if (lru_parser && lru_parser == parser_i && - lru_parser->last_stream_id == stream_id) { - continue; - } - - ret = ml_append_try_parser(parser_i, stream_id, type, - tm, buf, size, NULL, NULL); - if (ret == 0) { - group->lru_parser = parser_i; - group->lru_parser->last_stream_id = stream_id; - lru_parser = parser_i; - processed = FLB_TRUE; - break; - } - else { - parser_i = NULL; - } - } - - if (!processed) { - if (lru_parser) { - flb_ml_flush_parser_instance(ml, lru_parser, stream_id, FLB_FALSE); - parser_i = lru_parser; - } - else { - /* get the first parser (just to make use of it buffers) */ - parser_i = mk_list_entry_first(&group->parsers, - struct flb_ml_parser_ins, - _head); - } - - flb_ml_flush_parser_instance(ml, parser_i, stream_id, FLB_FALSE); - mst = flb_ml_stream_get(parser_i, stream_id); - if (!mst) { - flb_error("[multiline] invalid stream_id %" PRIu64 ", could not " - "append content to multiline context", stream_id); - return -1; - } - - /* Get stream group */ - st_group = flb_ml_stream_group_get(mst->parser, mst, NULL); - flb_sds_cat_safe(&st_group->buf, buf, size); - flb_ml_flush_stream_group(parser_i->ml_parser, mst, st_group, FLB_FALSE); - } - - return 0; -} - - - -int flb_ml_append_object(struct flb_ml *ml, - uint64_t stream_id, - struct flb_time *tm, - msgpack_object *metadata, - msgpack_object *obj) -{ - int ret; - int type; - int processed = FLB_FALSE; - struct mk_list *head; - struct mk_list *head_group; - struct flb_ml_group *group; - struct flb_ml_parser_ins *lru_parser = NULL; - struct flb_ml_parser_ins *parser_i; - struct flb_ml_stream *mst; - struct flb_ml_stream_group *st_group; - struct flb_log_event event; - - if (metadata == NULL) { - metadata = ml->log_event_decoder.empty_map; - } - - /* - * As incoming objects, we accept packed events - * and msgpack Maps containing key/value pairs. - */ - if (obj->type == MSGPACK_OBJECT_ARRAY) { - flb_error("[multiline] appending object with invalid type, expected " - "map, received type=%i", obj->type); - return -1; - - - flb_log_event_decoder_reset(&ml->log_event_decoder, NULL, 0); - - ret = flb_event_decoder_decode_object(&ml->log_event_decoder, - &event, - obj); - - if (ret != FLB_EVENT_DECODER_SUCCESS) { - flb_error("[multiline] invalid event object"); - - return -1; - } - - tm = &event.timestamp; - obj = event.body; - metadata = event.metadata; - - type = FLB_ML_TYPE_MAP; - } - else if (obj->type == MSGPACK_OBJECT_MAP) { - type = FLB_ML_TYPE_MAP; - } - else { - flb_error("[multiline] appending object with invalid type, expected " - "array or map, received type=%i", obj->type); - return -1; - } - - mk_list_foreach(head, &ml->groups) { - group = mk_list_entry(head, struct flb_ml_group, _head); - - /* Check if the incoming data matches the last recently used parser */ - lru_parser = group->lru_parser; - - if (lru_parser && lru_parser->last_stream_id == stream_id) { - ret = ml_append_try_parser(lru_parser, lru_parser->last_stream_id, type, - tm, NULL, 0, metadata, obj); - if (ret == 0) { - processed = FLB_TRUE; - break; - } - else { - flb_ml_flush_parser_instance(ml, - lru_parser, - lru_parser->last_stream_id, - FLB_FALSE); - } - } - else if (lru_parser && lru_parser->last_stream_id > 0) { - /* - * Clear last recently used parser to match new parser. - * Do not flush last_stream_id since it should continue to parsing. - */ - lru_parser = NULL; - } - } - - mk_list_foreach(head_group, &group->parsers) { - parser_i = mk_list_entry(head_group, struct flb_ml_parser_ins, _head); - if (lru_parser && parser_i == lru_parser) { - continue; - } - - ret = ml_append_try_parser(parser_i, stream_id, type, - tm, NULL, 0, metadata, obj); - if (ret == 0) { - group->lru_parser = parser_i; - group->lru_parser->last_stream_id = stream_id; - lru_parser = parser_i; - processed = FLB_TRUE; - break; - } - else { - parser_i = NULL; - } - } - - if (!processed) { - if (lru_parser) { - flb_ml_flush_parser_instance(ml, lru_parser, stream_id, FLB_FALSE); - parser_i = lru_parser; - } - else { - /* get the first parser (just to make use of it buffers) */ - parser_i = mk_list_entry_first(&group->parsers, - struct flb_ml_parser_ins, - _head); - } - - flb_ml_flush_parser_instance(ml, parser_i, stream_id, FLB_FALSE); - mst = flb_ml_stream_get(parser_i, stream_id); - if (!mst) { - flb_error("[multiline] invalid stream_id %" PRIu64 ", could not " - "append content to multiline context", stream_id); - - return -1; - } - - /* Get stream group */ - st_group = flb_ml_stream_group_get(mst->parser, mst, NULL); - - ret = flb_log_event_encoder_begin_record(&ml->log_event_encoder); - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_timestamp( - &ml->log_event_encoder, tm); - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - if (metadata != ml->log_event_decoder.empty_map) { - ret = flb_log_event_encoder_set_metadata_from_msgpack_object( - &ml->log_event_encoder, metadata); - } - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_body_from_msgpack_object( - &ml->log_event_encoder, obj); - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_commit_record(&ml->log_event_encoder); - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - mst->cb_flush(parser_i->ml_parser, - mst, - mst->cb_data, - ml->log_event_encoder.output_buffer, - ml->log_event_encoder.output_length); - } - else { - flb_error("[multiline] log event encoder error : %d", ret); - } - - flb_log_event_encoder_reset(&ml->log_event_encoder); - - /* reset group buffer counters */ - st_group->mp_sbuf.size = 0; - flb_sds_len_set(st_group->buf, 0); - - /* Update last flush time */ - st_group->last_flush = time_ms_now(); - } - - return 0; -} - -int flb_ml_append_event(struct flb_ml *ml, uint64_t stream_id, - struct flb_log_event *event) -{ - return flb_ml_append_object(ml, - stream_id, - &event->timestamp, - event->metadata, - event->body); -} - - -struct flb_ml *flb_ml_create(struct flb_config *ctx, char *name) -{ - int result; - struct flb_ml *ml; - - ml = flb_calloc(1, sizeof(struct flb_ml)); - if (!ml) { - flb_errno(); - return NULL; - } - ml->name = flb_sds_create(name); - if (!ml) { - flb_free(ml); - return NULL; - } - - ml->config = ctx; - ml->last_flush = time_ms_now(); - mk_list_init(&ml->groups); - - result = flb_log_event_decoder_init(&ml->log_event_decoder, - NULL, - 0); - - if (result != FLB_EVENT_DECODER_SUCCESS) { - flb_error("cannot initialize log event decoder"); - - flb_ml_destroy(ml); - - return NULL; - } - - result = flb_log_event_encoder_init(&ml->log_event_encoder, - FLB_LOG_EVENT_FORMAT_DEFAULT); - - if (result != FLB_EVENT_ENCODER_SUCCESS) { - flb_error("cannot initialize log event encoder"); - - flb_ml_destroy(ml); - - return NULL; - } - - return ml; -} - -/* - * Some multiline contexts might define a parser name but not a parser context, - * for missing contexts, just lookup the parser and perform the assignment. - * - * The common use case is when reading config files with [PARSER] and [MULTILINE_PARSER] - * definitions, so we need to delay the parser loading. - */ -int flb_ml_parsers_init(struct flb_config *ctx) -{ - struct mk_list *head; - struct flb_parser *p; - struct flb_ml_parser *ml_parser; - - mk_list_foreach(head, &ctx->multiline_parsers) { - ml_parser = mk_list_entry(head, struct flb_ml_parser, _head); - if (ml_parser->parser_name && !ml_parser->parser) { - p = flb_parser_get(ml_parser->parser_name, ctx); - if (!p) { - flb_error("multiline parser '%s' points to an undefined parser '%s'", - ml_parser->name, ml_parser->parser_name); - return -1; - } - ml_parser->parser = p; - } - } - - return 0; -} - -int flb_ml_auto_flush_init(struct flb_ml *ml) -{ - struct flb_sched *scheduler; - int ret; - - if (ml == NULL) { - return -1; - } - - scheduler = flb_sched_ctx_get(); - - if (scheduler == NULL) { - flb_error("[multiline] scheduler context has not been created"); - return -1; - } - - if (ml->flush_ms < 500) { - flb_error("[multiline] flush timeout '%i' is too low", ml->flush_ms); - return -1; - } - - /* Create flush timer */ - ret = flb_sched_timer_cb_create(scheduler, - FLB_SCHED_TIMER_CB_PERM, - ml->flush_ms, - cb_ml_flush_timer, - ml, NULL); - return ret; -} - -int flb_ml_destroy(struct flb_ml *ml) -{ - struct mk_list *head; - struct mk_list *tmp; - struct flb_ml_group *group; - - if (!ml) { - return 0; - } - - flb_log_event_decoder_destroy(&ml->log_event_decoder); - flb_log_event_encoder_destroy(&ml->log_event_encoder); - - if (ml->name) { - flb_sds_destroy(ml->name); - } - - /* destroy groups */ - mk_list_foreach_safe(head, tmp, &ml->groups) { - group = mk_list_entry(head, struct flb_ml_group, _head); - flb_ml_group_destroy(group); - } - - flb_free(ml); - return 0; -} - -static int flb_msgpack_object_hash_internal(cfl_hash_state_t *state, - msgpack_object *object) -{ - void *dummy_pointer; - int result; - int index; - - if (object == NULL) { - return 0; - } - - dummy_pointer = NULL; - result = 0; - - if (object->type == MSGPACK_OBJECT_NIL) { - cfl_hash_64bits_update(state, - &dummy_pointer, - sizeof(dummy_pointer)); - } - else if (object->type == MSGPACK_OBJECT_BOOLEAN) { - cfl_hash_64bits_update(state, - &object->via.boolean, - sizeof(object->via.boolean)); - } - else if (object->type == MSGPACK_OBJECT_POSITIVE_INTEGER || - object->type == MSGPACK_OBJECT_NEGATIVE_INTEGER) { - cfl_hash_64bits_update(state, - &object->via.u64, - sizeof(object->via.u64)); - } - else if (object->type == MSGPACK_OBJECT_FLOAT32 || - object->type == MSGPACK_OBJECT_FLOAT64 || - object->type == MSGPACK_OBJECT_FLOAT) { - cfl_hash_64bits_update(state, - &object->via.f64, - sizeof(object->via.f64)); - } - else if (object->type == MSGPACK_OBJECT_STR) { - cfl_hash_64bits_update(state, - object->via.str.ptr, - object->via.str.size); - } - else if (object->type == MSGPACK_OBJECT_ARRAY) { - for (index = 0 ; - index < object->via.array.size && - result == 0; - index++) { - result = flb_msgpack_object_hash_internal( - state, - &object->via.array.ptr[index]); - } - } - else if (object->type == MSGPACK_OBJECT_MAP) { - for (index = 0 ; - index < object->via.map.size && - result == 0; - index++) { - result = flb_msgpack_object_hash_internal( - state, - &object->via.map.ptr[index].key); - - if (result == 0) { - result = flb_msgpack_object_hash_internal( - state, - &object->via.map.ptr[index].val); - } - } - } - else if (object->type == MSGPACK_OBJECT_BIN) { - cfl_hash_64bits_update(state, - object->via.bin.ptr, - object->via.bin.size); - } - else if (object->type == MSGPACK_OBJECT_EXT) { - cfl_hash_64bits_update(state, - &object->via.ext.type, - sizeof(object->via.ext.type)); - - cfl_hash_64bits_update(state, - object->via.ext.ptr, - object->via.ext.size); - } - - return result; -} - -static int flb_hash_msgpack_object_list(cfl_hash_64bits_t *hash, - size_t entry_count, - ...) -{ - cfl_hash_state_t hash_state; - va_list arguments; - msgpack_object *object; - int result; - size_t index; - - cfl_hash_64bits_reset(&hash_state); - - va_start(arguments, entry_count); - - result = 0; - - for (index = 0 ; - index < entry_count && - result == 0 ; - index++) { - object = va_arg(arguments, msgpack_object *); - - if (object == NULL) { - break; - } - - result = flb_msgpack_object_hash_internal(&hash_state, object); - } - - va_end(arguments); - - if (result == 0) { - *hash = cfl_hash_64bits_digest(&hash_state); - } - - return result; -} - -struct flb_deduplication_list_entry { - cfl_hash_64bits_t hash; - struct cfl_list _head; -}; - -void flb_deduplication_list_init(struct cfl_list *deduplication_list) -{ - cfl_list_init(deduplication_list); -} - -int flb_deduplication_list_validate(struct cfl_list *deduplication_list, - cfl_hash_64bits_t hash) -{ - struct cfl_list *iterator; - struct flb_deduplication_list_entry *entry; - - cfl_list_foreach(iterator, deduplication_list) { - entry = cfl_list_entry(iterator, - struct flb_deduplication_list_entry, - _head); - - if (entry->hash == hash) { - return FLB_TRUE; - } - } - - return FLB_FALSE; -} - -int flb_deduplication_list_add(struct cfl_list *deduplication_list, - cfl_hash_64bits_t hash) -{ - struct flb_deduplication_list_entry *entry; - - entry = (struct flb_deduplication_list_entry *) - flb_calloc(1, - sizeof(struct flb_deduplication_list_entry)); - - if (entry == NULL) { - return -1; - } - - cfl_list_entry_init(&entry->_head); - entry->hash = hash; - - cfl_list_append(&entry->_head, deduplication_list); - - return 0; -} - -void flb_deduplication_list_purge(struct cfl_list *deduplication_list) -{ - struct cfl_list *iterator; - struct cfl_list *backup; - struct flb_deduplication_list_entry *entry; - - cfl_list_foreach_safe(iterator, backup, deduplication_list) { - entry = cfl_list_entry(iterator, - struct flb_deduplication_list_entry, - _head); - - cfl_list_del(&entry->_head); - - free(entry); - } -} - -int flb_ml_flush_metadata_buffer(struct flb_ml_stream *mst, - struct flb_ml_stream_group *group, - int deduplicate_metadata) -{ - int append_metadata_entry; - cfl_hash_64bits_t metadata_entry_hash; - struct cfl_list deduplication_list; - msgpack_unpacked metadata_map; - size_t offset; - size_t index; - msgpack_object value; - msgpack_object key; - int ret; - - ret = FLB_EVENT_ENCODER_SUCCESS; - - if (deduplicate_metadata) { - flb_deduplication_list_init(&deduplication_list); - } - - msgpack_unpacked_init(&metadata_map); - - offset = 0; - while (ret == FLB_EVENT_ENCODER_SUCCESS && - msgpack_unpack_next(&metadata_map, - group->mp_md_sbuf.data, - group->mp_md_sbuf.size, - &offset) == MSGPACK_UNPACK_SUCCESS) { - - for (index = 0; - index < metadata_map.data.via.map.size && - ret == FLB_EVENT_ENCODER_SUCCESS; - index++) { - key = metadata_map.data.via.map.ptr[index].key; - value = metadata_map.data.via.map.ptr[index].val; - - append_metadata_entry = FLB_TRUE; - - if (deduplicate_metadata) { - ret = flb_hash_msgpack_object_list(&metadata_entry_hash, - 2, - &key, - &value); - if (ret != 0) { - ret = FLB_EVENT_ENCODER_ERROR_INVALID_ARGUMENT; - } - else { - ret = flb_deduplication_list_validate( - &deduplication_list, - metadata_entry_hash); - - if (ret) { - append_metadata_entry = FLB_FALSE; - - ret = FLB_EVENT_ENCODER_SUCCESS; - } - else { - ret = flb_deduplication_list_add( - &deduplication_list, - metadata_entry_hash); - - if (ret == 0) { - ret = FLB_EVENT_ENCODER_SUCCESS; - } - else { - ret = FLB_EVENT_ENCODER_ERROR_ALLOCATION_ERROR; - } - } - } - } - - if (append_metadata_entry) { - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_metadata_values( - &mst->ml->log_event_encoder, - FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&key), - FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&value)); - } - } - } - } - - msgpack_unpacked_destroy(&metadata_map); - - if (deduplicate_metadata) { - flb_deduplication_list_purge(&deduplication_list); - } - - return ret; -} - -int flb_ml_flush_stream_group(struct flb_ml_parser *ml_parser, - struct flb_ml_stream *mst, - struct flb_ml_stream_group *group, - int forced_flush) -{ - int i; - int ret; - int size; - int len; - size_t off = 0; - msgpack_object map; - msgpack_object k; - msgpack_object v; - msgpack_sbuffer mp_sbuf; - msgpack_packer mp_pck; - msgpack_unpacked result; - struct flb_ml_parser_ins *parser_i = mst->parser; - struct flb_time *group_time; - struct flb_time now; - - breakline_prepare(parser_i, group); - len = flb_sds_len(group->buf); - - /* init msgpack buffer */ - msgpack_sbuffer_init(&mp_sbuf); - msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); - - /* if the group don't have a time set, use current time */ - if (flb_time_to_nanosec(&group->mp_time) == 0L) { - flb_time_get(&now); - group_time = &now; - } else { - group_time = &group->mp_time; - } - - /* compose final record if we have a first line context */ - if (group->mp_sbuf.size > 0) { - msgpack_unpacked_init(&result); - ret = msgpack_unpack_next(&result, - group->mp_sbuf.data, group->mp_sbuf.size, - &off); - if (ret != MSGPACK_UNPACK_SUCCESS) { - flb_error("[multiline] could not unpack first line state buffer"); - msgpack_unpacked_destroy(&result); - group->mp_sbuf.size = 0; - return -1; - } - map = result.data; - - if (map.type != MSGPACK_OBJECT_MAP) { - flb_error("[multiline] expected MAP type in first line state buffer"); - msgpack_unpacked_destroy(&result); - group->mp_sbuf.size = 0; - return -1; - } - - /* Take the first line keys and repack */ - len = flb_sds_len(parser_i->key_content); - size = map.via.map.size; - msgpack_pack_map(&mp_pck, size); - - for (i = 0; i < size; i++) { - k = map.via.map.ptr[i].key; - v = map.via.map.ptr[i].val; - - /* - * Check if the current key is the key that will contain the - * concatenated multiline buffer - */ - if (k.type == MSGPACK_OBJECT_STR && - parser_i->key_content && - k.via.str.size == len && - strncmp(k.via.str.ptr, parser_i->key_content, len) == 0) { - - /* key */ - msgpack_pack_object(&mp_pck, k); - - /* value */ - len = flb_sds_len(group->buf); - msgpack_pack_str(&mp_pck, len); - msgpack_pack_str_body(&mp_pck, group->buf, len); - } - else { - /* key / val */ - msgpack_pack_object(&mp_pck, k); - msgpack_pack_object(&mp_pck, v); - } - } - msgpack_unpacked_destroy(&result); - group->mp_sbuf.size = 0; - } - else if (len > 0) { - /* Pack raw content as Fluent Bit record */ - msgpack_pack_map(&mp_pck, 1); - - /* key */ - if (parser_i->key_content) { - len = flb_sds_len(parser_i->key_content); - msgpack_pack_str(&mp_pck, len); - msgpack_pack_str_body(&mp_pck, parser_i->key_content, len); - } - else { - msgpack_pack_str(&mp_pck, 3); - msgpack_pack_str_body(&mp_pck, "log", 3); - } - - /* val */ - len = flb_sds_len(group->buf); - msgpack_pack_str(&mp_pck, len); - msgpack_pack_str_body(&mp_pck, group->buf, len); - } - - if (mp_sbuf.size > 0) { - /* - * a 'forced_flush' means to alert the caller that the data 'must be flushed to it destination'. This flag is - * only enabled when the flush process has been triggered by the multiline timer, e.g: - * - * - the message is complete or incomplete and its time to dispatch it. - */ - if (forced_flush) { - mst->forced_flush = FLB_TRUE; - } - - /* encode and invoke the user callback */ - - ret = flb_log_event_encoder_begin_record( - &mst->ml->log_event_encoder); - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_timestamp( - &mst->ml->log_event_encoder, - group_time); - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_ml_flush_metadata_buffer(mst, - group, - FLB_TRUE); - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_body_from_raw_msgpack( - &mst->ml->log_event_encoder, - mp_sbuf.data, - mp_sbuf.size); - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_commit_record( - &mst->ml->log_event_encoder); - } - - if (ret != FLB_EVENT_ENCODER_SUCCESS) { - flb_error("[multiline] error packing event"); - - return -1; - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - mst->cb_flush(ml_parser, - mst, - mst->cb_data, - mst->ml->log_event_encoder.output_buffer, - mst->ml->log_event_encoder.output_length); - } - else { - flb_error("[multiline] log event encoder error : %d", ret); - } - - flb_log_event_encoder_reset(&mst->ml->log_event_encoder); - - if (forced_flush) { - mst->forced_flush = FLB_FALSE; - } - } - - msgpack_sbuffer_destroy(&mp_sbuf); - flb_sds_len_set(group->buf, 0); - - /* Update last flush time */ - group->last_flush = time_ms_now(); - - return 0; -} - -/* - * Initialize multiline global environment. - * - * note: function must be invoked before any flb_ml_create() call. - */ -int flb_ml_init(struct flb_config *config) -{ - int ret; - - ret = flb_ml_parser_builtin_create(config); - if (ret == -1) { - return -1; - } - - return 0; -} - -int flb_ml_exit(struct flb_config *config) -{ - flb_ml_parser_destroy_all(&config->multiline_parsers); - return 0; -} diff --git a/fluent-bit/src/multiline/flb_ml_group.c b/fluent-bit/src/multiline/flb_ml_group.c deleted file mode 100644 index 895a7105..00000000 --- a/fluent-bit/src/multiline/flb_ml_group.c +++ /dev/null @@ -1,86 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_mem.h> -#include <fluent-bit/flb_log.h> -#include <fluent-bit/multiline/flb_ml.h> -#include <fluent-bit/multiline/flb_ml_parser.h> - -struct flb_ml_group *flb_ml_group_create(struct flb_ml *ml) -{ - struct flb_ml_group *group; - - group = flb_calloc(1, sizeof(struct flb_ml_group)); - if (!group) { - flb_errno(); - return NULL; - } - group->id = mk_list_size(&ml->groups); - group->ml = ml; - group->lru_parser = NULL; - mk_list_init(&group->parsers); - - mk_list_add(&group->_head, &ml->groups); - - return group; -} - -/* - * Link a parser instance into the active group, if no group exists, a default - * one is created. - */ -int flb_ml_group_add_parser(struct flb_ml *ctx, struct flb_ml_parser_ins *p) -{ - struct flb_ml_group *group = NULL; - - if (mk_list_size(&ctx->groups) == 0) { - group = flb_ml_group_create(ctx); - if (!group) { - return -1; - } - } - else { - /* retrieve the latest active group */ - group = mk_list_entry_last(&ctx->groups, struct flb_ml_group, _head); - } - - if (!group) { - return -1; - } - - mk_list_add(&p->_head, &group->parsers); - return 0; -} - -void flb_ml_group_destroy(struct flb_ml_group *group) -{ - struct mk_list *head; - struct mk_list *tmp; - struct flb_ml_parser_ins *parser_i; - - /* destroy parser instances */ - mk_list_foreach_safe(head, tmp, &group->parsers) { - parser_i = mk_list_entry(head, struct flb_ml_parser_ins, _head); - flb_ml_parser_instance_destroy(parser_i); - } - - mk_list_del(&group->_head); - flb_free(group); -} diff --git a/fluent-bit/src/multiline/flb_ml_mode.c b/fluent-bit/src/multiline/flb_ml_mode.c deleted file mode 100644 index 964672d8..00000000 --- a/fluent-bit/src/multiline/flb_ml_mode.c +++ /dev/null @@ -1,111 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_log.h> -#include <fluent-bit/multiline/flb_ml.h> -#include <fluent-bit/multiline/flb_ml_mode.h> - -struct flb_ml *flb_ml_mode_create(struct flb_config *config, char *mode, int flush_ms, - char *key) -{ - if (strcmp(mode, "docker") == 0) { - return flb_ml_mode_docker(config, flush_ms); - } - else if (strcmp(mode, "cri") == 0) { - return flb_ml_mode_cri(config, flush_ms); - } - else if (strcmp(mode, "python") == 0) { - return flb_ml_mode_python(config, flush_ms, key); - } - else if (strcmp(mode, "java") == 0) { - return flb_ml_mode_java(config, flush_ms, key); - } - else if (strcmp(mode, "go") == 0) { - return flb_ml_mode_go(config, flush_ms, key); - } - - flb_error("[multiline] built-in mode '%s' not found", mode); - return NULL; -} - - -struct flb_ml_mode *flb_ml_parser_create(struct flb_config *ctx, - char *name, - int type, char *match_str, int negate, - int flush_ms, - char *key_content, - char *key_group, - char *key_pattern, - struct flb_parser *parser_ctx, - char *parser_name) -{ - struct flb_ml_mode *ml; - - ml = flb_calloc(1, sizeof(struct flb_ml)); - if (!ml) { - flb_errno(); - return NULL; - } - ml->name = flb_sds_create(name); - ml->type = type; - - if (match_str) { - ml->match_str = flb_sds_create(match_str); - if (!ml->match_str) { - flb_free(ml); - return NULL; - } - } - - ml->parser = parser_ctx; - if (parser_name) { - ml->parser_name = flb_sds_create(parser_name); - } - - ml->negate = negate; - mk_list_init(&ml->streams); - mk_list_init(&ml->regex_rules); - mk_list_add(&ml->_head, &ctx->multilines); - - if (key_content) { - ml->key_content = flb_sds_create(key_content); - if (!ml->key_content) { - flb_ml_destroy(ml); - return NULL; - } - } - - if (key_group) { - ml->key_group = flb_sds_create(key_group); - if (!ml->key_group) { - flb_ml_destroy(ml); - return NULL; - } - } - - if (key_pattern) { - ml->key_pattern = flb_sds_create(key_pattern); - if (!ml->key_pattern) { - flb_ml_destroy(ml); - return NULL; - } - } - return ml; -} diff --git a/fluent-bit/src/multiline/flb_ml_parser.c b/fluent-bit/src/multiline/flb_ml_parser.c deleted file mode 100644 index 7aa33789..00000000 --- a/fluent-bit/src/multiline/flb_ml_parser.c +++ /dev/null @@ -1,347 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_log.h> -#include <fluent-bit/multiline/flb_ml.h> -#include <fluent-bit/multiline/flb_ml_rule.h> -#include <fluent-bit/multiline/flb_ml_mode.h> -#include <fluent-bit/multiline/flb_ml_group.h> - -int flb_ml_parser_init(struct flb_ml_parser *ml_parser) -{ - int ret; - - ret = flb_ml_rule_init(ml_parser); - if (ret == -1) { - return -1; - } - - return 0; -} - -/* Create built-in multiline parsers */ -int flb_ml_parser_builtin_create(struct flb_config *config) -{ - struct flb_ml_parser *mlp; - - /* Docker */ - mlp = flb_ml_parser_docker(config); - if (!mlp) { - flb_error("[multiline] could not init 'docker' built-in parser"); - return -1; - } - - /* CRI */ - mlp = flb_ml_parser_cri(config); - if (!mlp) { - flb_error("[multiline] could not init 'cri' built-in parser"); - return -1; - } - - /* Java */ - mlp = flb_ml_parser_java(config, NULL); - if (!mlp) { - flb_error("[multiline] could not init 'java' built-in parser"); - return -1; - } - - /* Go */ - mlp = flb_ml_parser_go(config, NULL); - if (!mlp) { - flb_error("[multiline] could not init 'go' built-in parser"); - return -1; - } - - /* Ruby */ - mlp = flb_ml_parser_ruby(config, NULL); - if (!mlp) { - flb_error("[multiline] could not init 'ruby' built-in parser"); - return -1; - } - - /* Python */ - mlp = flb_ml_parser_python(config, NULL); - if (!mlp) { - flb_error("[multiline] could not init 'python' built-in parser"); - return -1; - } - - return 0; -} - -struct flb_ml_parser *flb_ml_parser_create(struct flb_config *ctx, - char *name, - int type, char *match_str, int negate, - int flush_ms, - char *key_content, - char *key_group, - char *key_pattern, - struct flb_parser *parser_ctx, - char *parser_name) -{ - struct flb_ml_parser *ml_parser; - - ml_parser = flb_calloc(1, sizeof(struct flb_ml_parser)); - if (!ml_parser) { - flb_errno(); - return NULL; - } - ml_parser->name = flb_sds_create(name); - ml_parser->type = type; - - if (match_str) { - ml_parser->match_str = flb_sds_create(match_str); - if (!ml_parser->match_str) { - if (ml_parser->name) { - flb_sds_destroy(ml_parser->name); - } - flb_free(ml_parser); - return NULL; - } - } - - ml_parser->parser = parser_ctx; - - if (parser_name) { - ml_parser->parser_name = flb_sds_create(parser_name); - } - ml_parser->negate = negate; - ml_parser->flush_ms = flush_ms; - mk_list_init(&ml_parser->regex_rules); - mk_list_add(&ml_parser->_head, &ctx->multiline_parsers); - - if (key_content) { - ml_parser->key_content = flb_sds_create(key_content); - if (!ml_parser->key_content) { - flb_ml_parser_destroy(ml_parser); - return NULL; - } - } - - if (key_group) { - ml_parser->key_group = flb_sds_create(key_group); - if (!ml_parser->key_group) { - flb_ml_parser_destroy(ml_parser); - return NULL; - } - } - - if (key_pattern) { - ml_parser->key_pattern = flb_sds_create(key_pattern); - if (!ml_parser->key_pattern) { - flb_ml_parser_destroy(ml_parser); - return NULL; - } - } - - return ml_parser; -} - -struct flb_ml_parser *flb_ml_parser_get(struct flb_config *ctx, char *name) -{ - struct mk_list *head; - struct flb_ml_parser *ml_parser; - - mk_list_foreach(head, &ctx->multiline_parsers) { - ml_parser = mk_list_entry(head, struct flb_ml_parser, _head); - if (strcasecmp(ml_parser->name, name) == 0) { - return ml_parser; - } - } - - return NULL; -} - -int flb_ml_parser_instance_has_data(struct flb_ml_parser_ins *ins) -{ - struct mk_list *head; - struct mk_list *head_group; - struct flb_ml_stream *st; - struct flb_ml_stream_group *st_group; - - mk_list_foreach(head, &ins->streams) { - st = mk_list_entry(head, struct flb_ml_stream, _head); - mk_list_foreach(head_group, &st->groups) { - st_group = mk_list_entry(head_group, struct flb_ml_stream_group, _head); - if (st_group->mp_sbuf.size > 0) { - return FLB_TRUE; - } - } - } - - return FLB_FALSE; -} - -struct flb_ml_parser_ins *flb_ml_parser_instance_create(struct flb_ml *ml, - char *name) -{ - int ret; - struct flb_ml_parser_ins *ins; - struct flb_ml_parser *parser; - - parser = flb_ml_parser_get(ml->config, name); - if (!parser) { - flb_error("[multiline] parser '%s' not registered", name); - return NULL; - } - - ins = flb_calloc(1, sizeof(struct flb_ml_parser_ins)); - if (!ins) { - flb_errno(); - return NULL; - } - ins->last_stream_id = 0; - ins->ml_parser = parser; - mk_list_init(&ins->streams); - - /* Copy parent configuration */ - if (parser->key_content) { - ins->key_content = flb_sds_create(parser->key_content); - } - if (parser->key_pattern) { - ins->key_pattern = flb_sds_create(parser->key_pattern); - } - if (parser->key_group) { - ins->key_group = flb_sds_create(parser->key_group); - } - - /* Append this multiline parser instance to the active multiline group */ - ret = flb_ml_group_add_parser(ml, ins); - if (ret != 0) { - flb_error("[multiline] could not register parser '%s' on " - "multiline '%s 'group", name, ml->name); - flb_free(ins); - return NULL; - } - - /* - * Update flush_interval for pending records on multiline context. We always - * use the greater value found. - */ - if (parser->flush_ms > ml->flush_ms) { - ml->flush_ms = parser->flush_ms; - } - - return ins; -} - -/* Override a fixed parser property for the instance only*/ -int flb_ml_parser_instance_set(struct flb_ml_parser_ins *p, char *prop, char *val) -{ - if (strcasecmp(prop, "key_content") == 0) { - if (p->key_content) { - flb_sds_destroy(p->key_content); - } - p->key_content = flb_sds_create(val); - } - else if (strcasecmp(prop, "key_pattern") == 0) { - if (p->key_pattern) { - flb_sds_destroy(p->key_pattern); - } - p->key_pattern = flb_sds_create(val); - } - else if (strcasecmp(prop, "key_group") == 0) { - if (p->key_group) { - flb_sds_destroy(p->key_group); - } - p->key_group = flb_sds_create(val); - } - else { - return -1; - } - - return 0; -} - -int flb_ml_parser_destroy(struct flb_ml_parser *ml_parser) -{ - if (!ml_parser) { - return 0; - } - - if (ml_parser->name) { - flb_sds_destroy(ml_parser->name); - } - - if (ml_parser->parser_name) { - flb_sds_destroy(ml_parser->parser_name); - } - - if (ml_parser->match_str) { - flb_sds_destroy(ml_parser->match_str); - } - if (ml_parser->key_content) { - flb_sds_destroy(ml_parser->key_content); - } - if (ml_parser->key_group) { - flb_sds_destroy(ml_parser->key_group); - } - if (ml_parser->key_pattern) { - flb_sds_destroy(ml_parser->key_pattern); - } - - /* Regex rules */ - flb_ml_rule_destroy_all(ml_parser); - - /* Unlink from struct flb_config->multiline_parsers */ - mk_list_del(&ml_parser->_head); - - flb_free(ml_parser); - return 0; -} - -int flb_ml_parser_instance_destroy(struct flb_ml_parser_ins *ins) -{ - struct mk_list *tmp; - struct mk_list *head; - struct flb_ml_stream *stream; - - /* Destroy streams */ - mk_list_foreach_safe(head, tmp, &ins->streams) { - stream = mk_list_entry(head, struct flb_ml_stream, _head); - flb_ml_stream_destroy(stream); - } - - if (ins->key_content) { - flb_sds_destroy(ins->key_content); - } - if (ins->key_pattern) { - flb_sds_destroy(ins->key_pattern); - } - if (ins->key_group) { - flb_sds_destroy(ins->key_group); - } - - flb_free(ins); - - return 0; -} - -void flb_ml_parser_destroy_all(struct mk_list *list) -{ - struct mk_list *tmp; - struct mk_list *head; - struct flb_ml_parser *parser; - - mk_list_foreach_safe(head, tmp, list) { - parser = mk_list_entry(head, struct flb_ml_parser, _head); - flb_ml_parser_destroy(parser); - } -} diff --git a/fluent-bit/src/multiline/flb_ml_parser_cri.c b/fluent-bit/src/multiline/flb_ml_parser_cri.c deleted file mode 100644 index 669fa39a..00000000 --- a/fluent-bit/src/multiline/flb_ml_parser_cri.c +++ /dev/null @@ -1,81 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/multiline/flb_ml.h> -#include <fluent-bit/multiline/flb_ml_parser.h> - -#define FLB_ML_CRI_REGEX \ - "^(?<time>.+?) (?<stream>stdout|stderr) (?<_p>F|P) (?<log>.*)$" -#define FLB_ML_CRI_TIME \ - "%Y-%m-%dT%H:%M:%S.%L%z" - -/* Creates a parser for Docker */ -static struct flb_parser *cri_parser_create(struct flb_config *config) -{ - struct flb_parser *p; - - p = flb_parser_create("_ml_cri", /* parser name */ - "regex", /* backend type */ - FLB_ML_CRI_REGEX, /* regex */ - FLB_FALSE, /* skip_empty */ - FLB_ML_CRI_TIME, /* time format */ - "time", /* time key */ - NULL, /* time offset */ - FLB_TRUE, /* time keep */ - FLB_FALSE, /* time strict */ - FLB_FALSE, /* no bare keys */ - NULL, /* parser types */ - 0, /* types len */ - NULL, /* decoders */ - config); /* Fluent Bit context */ - return p; -} - -/* Our first multiline mode: 'docker' */ -struct flb_ml_parser *flb_ml_parser_cri(struct flb_config *config) -{ - struct flb_parser *parser; - struct flb_ml_parser *mlp; - - /* Create a Docker parser */ - parser = cri_parser_create(config); - if (!parser) { - return NULL; - } - - mlp = flb_ml_parser_create(config, - "cri", /* name */ - FLB_ML_EQ, /* type */ - "F", /* match_str */ - FLB_FALSE, /* negate */ - FLB_ML_FLUSH_TIMEOUT, /* flush_ms */ - "log", /* key_content */ - "stream", /* key_group */ - "_p", /* key_pattern */ - parser, /* parser ctx */ - NULL); /* parser name */ - - if (!mlp) { - flb_error("[multiline] could not create 'cri mode'"); - return NULL; - } - - return mlp; -} diff --git a/fluent-bit/src/multiline/flb_ml_parser_docker.c b/fluent-bit/src/multiline/flb_ml_parser_docker.c deleted file mode 100644 index 5b622d32..00000000 --- a/fluent-bit/src/multiline/flb_ml_parser_docker.c +++ /dev/null @@ -1,110 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/multiline/flb_ml.h> -#include <fluent-bit/multiline/flb_ml_parser.h> - -/* Creates a parser for Docker */ -static struct flb_parser *docker_parser_create(struct flb_config *config) -{ - struct flb_parser *p; - - p = flb_parser_create("_ml_json_docker", /* parser name */ - "json", /* backend type */ - NULL, /* regex */ - FLB_TRUE, /* skip_empty */ - "%Y-%m-%dT%H:%M:%S.%L", /* time format */ - "time", /* time key */ - NULL, /* time offset */ - FLB_TRUE, /* time keep */ - FLB_FALSE, /* time strict */ - FLB_FALSE, /* no bare keys */ - NULL, /* parser types */ - 0, /* types len */ - NULL, /* decoders */ - config); /* Fluent Bit context */ - return p; -} - -/* Our first multiline mode: 'docker' */ -struct flb_ml_parser *flb_ml_parser_docker(struct flb_config *config) -{ - struct flb_parser *parser; - struct flb_ml_parser *mlp; - - /* Create a Docker parser */ - parser = docker_parser_create(config); - if (!parser) { - return NULL; - } - - /* - * Let's explain this multiline mode, then you (the reader) might want - * to submit a PR with new built-in modes :) - * - * Containerized apps under Docker writes logs to stdout/stderr. These streams - * (stdout/stderr) are handled by Docker, in most of cases the content is - * stored in a .json file in your file system. A message like "hey!" gets into - * a JSON map like this: - * - * {"log": "hey!\n", "stream": "stdout", "time": "2021-02-01T01:40:03.53412Z"} - * - * By Docker log spec, any 'log' key that "ends with a \n" it's a complete - * log record, but Docker also limits the log record size to 16KB, so a long - * message that does not fit into 16KB can be split in multiple JSON lines, - * the following example use short words to describe the context: - * - * - original message: 'one, two, three\n' - * - * Docker log interpretation: - * - * - {"log": "one, ", "stream": "stdout", "time": "2021-02-01T01:40:03.53413Z"} - * - {"log": "two, ", "stream": "stdout", "time": "2021-02-01T01:40:03.53414Z"} - * - {"log": "three\n", "stream": "stdout", "time": "2021-02-01T01:40:03.53415Z"} - * - * So every 'log' key that does not ends with '\n', it's a partial log record - * and for logging purposes it needs to be concatenated with further messages - * until a final '\n' is found. - * - * We setup the Multiline mode as follows: - * - * - Use the type 'FLB_ML_ENDSWITH' to specify that we expect the 'log' - * key must ends with a '\n' for complete messages, otherwise it means is - * a continuation message. In case a message is not complete just wait until - * 500 milliseconds (0.5 second) and flush the buffer. - */ - mlp = flb_ml_parser_create(config, /* Fluent Bit context */ - "docker", /* name */ - FLB_ML_ENDSWITH, /* type */ - "\n", /* match_str */ - FLB_FALSE, /* negate */ - FLB_ML_FLUSH_TIMEOUT, /* flush_ms */ - "log", /* key_content */ - "stream", /* key_group */ - NULL, /* key_pattern */ - parser, /* parser ctx */ - NULL); /* parser name */ - if (!mlp) { - flb_error("[multiline] could not create 'docker mode'"); - return NULL; - } - - return mlp; -} diff --git a/fluent-bit/src/multiline/flb_ml_parser_go.c b/fluent-bit/src/multiline/flb_ml_parser_go.c deleted file mode 100644 index f1cd5407..00000000 --- a/fluent-bit/src/multiline/flb_ml_parser_go.c +++ /dev/null @@ -1,140 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/multiline/flb_ml.h> -#include <fluent-bit/multiline/flb_ml_rule.h> -#include <fluent-bit/multiline/flb_ml_parser.h> - -#define rule flb_ml_rule_create - -static void rule_error(struct flb_ml_parser *mlp) -{ - int id; - - id = mk_list_size(&mlp->regex_rules); - flb_error("[multiline: go] rule #%i could not be created", id); - flb_ml_parser_destroy(mlp); -} - -/* Go mode */ -struct flb_ml_parser *flb_ml_parser_go(struct flb_config *config, char *key) -{ - int ret; - struct flb_ml_parser *mlp; - - mlp = flb_ml_parser_create(config, /* Fluent Bit context */ - "go", /* name */ - FLB_ML_REGEX, /* type */ - NULL, /* match_str */ - FLB_FALSE, /* negate */ - FLB_ML_FLUSH_TIMEOUT, /* flush_ms */ - key, /* key_content */ - NULL, /* key_group */ - NULL, /* key_pattern */ - NULL, /* parser ctx */ - NULL); /* parser name */ - - if (!mlp) { - flb_error("[multiline] could not create 'go mode'"); - return NULL; - } - - ret = rule(mlp, - "start_state", - "/\\bpanic: /", - "go_after_panic", NULL); - if (ret != 0) { - rule_error(mlp); - return NULL; - } - - ret = rule(mlp, - "start_state", - "/http: panic serving/", - "go_goroutine", NULL); - if (ret != 0) { - rule_error(mlp); - return NULL; - } - - ret = rule(mlp, - "go_after_panic", - "/^$/", - "go_goroutine", NULL); - if (ret != 0) { - rule_error(mlp); - return NULL; - } - - ret = rule(mlp, - "go_after_panic, go_after_signal, go_frame_1", - "/^$/", - "go_goroutine", NULL); - if (ret != 0) { - rule_error(mlp); - return NULL; - } - - ret = rule(mlp, - "go_after_panic", - "/^\\[signal /", - "go_after_signal", NULL); - if (ret != 0) { - rule_error(mlp); - return NULL; - } - - ret = rule(mlp, - "go_goroutine", - "/^goroutine \\d+ \\[[^\\]]+\\]:$/", - "go_frame_1", NULL); - if (ret != 0) { - rule_error(mlp); - return NULL; - } - - ret = rule(mlp, - "go_frame_1", - "/^(?:[^\\s.:]+\\.)*[^\\s.():]+\\(|^created by /", - "go_frame_2", NULL); - if (ret != 0) { - rule_error(mlp); - return NULL; - } - - ret = rule(mlp, - "go_frame_2", - "/^\\s/", - "go_frame_1", NULL); - if (ret != 0) { - rule_error(mlp); - return NULL; - } - - /* Map the rules (mandatory for regex rules) */ - ret = flb_ml_parser_init(mlp); - if (ret != 0) { - flb_error("[multiline: go] error on mapping rules"); - flb_ml_parser_destroy(mlp); - return NULL; - } - - return mlp; -} diff --git a/fluent-bit/src/multiline/flb_ml_parser_java.c b/fluent-bit/src/multiline/flb_ml_parser_java.c deleted file mode 100644 index 4df5a00f..00000000 --- a/fluent-bit/src/multiline/flb_ml_parser_java.c +++ /dev/null @@ -1,143 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/multiline/flb_ml.h> -#include <fluent-bit/multiline/flb_ml_rule.h> -#include <fluent-bit/multiline/flb_ml_parser.h> - -#define rule flb_ml_rule_create - -static void rule_error(struct flb_ml_parser *ml_parser) -{ - int id; - - id = mk_list_size(&ml_parser->regex_rules); - flb_error("[multiline: java] rule #%i could not be created", id); - flb_ml_parser_destroy(ml_parser); -} - -/* Java mode */ -struct flb_ml_parser *flb_ml_parser_java(struct flb_config *config, char *key) -{ - int ret; - struct flb_ml_parser *mlp; - - mlp = flb_ml_parser_create(config, /* Fluent Bit context */ - "java", /* name */ - FLB_ML_REGEX, /* type */ - NULL, /* match_str */ - FLB_FALSE, /* negate */ - FLB_ML_FLUSH_TIMEOUT, /* flush_ms */ - key, /* key_content */ - NULL, /* key_group */ - NULL, /* key_pattern */ - NULL, /* parser ctx */ - NULL); /* parser name */ - - if (!mlp) { - flb_error("[multiline] could not create 'java mode'"); - return NULL; - } - - ret = rule(mlp, - "start_state, java_start_exception", - "/(.)(?:Exception|Error|Throwable|V8 errors stack trace)[:\\r\\n]/", - "java_after_exception", NULL); - if (ret != 0) { - rule_error(mlp); - return NULL; - } - - ret = rule(mlp, - "java_after_exception", - "/^[\\t ]*nested exception is:[\\t ]*/", - "java_start_exception", NULL); - if (ret != 0) { - rule_error(mlp); - return NULL; - } - - ret = rule(mlp, - "java_after_exception", - "/^[\\r\\n]*$/", - "java_after_exception", NULL); - if (ret != 0) { - rule_error(mlp); - return NULL; - } - - ret = rule(mlp, - "java_after_exception, java", - "/^[\\t ]+(?:eval )?at /", - "java", NULL); - if (ret != 0) { - rule_error(mlp); - return NULL; - } - - ret = rule(mlp, - "java_after_exception, java", - /* C# nested exception */ - "/^[\\t ]+--- End of inner exception stack trace ---$/", - "java", NULL); - if (ret != 0) { - rule_error(mlp); - return NULL; - } - - ret = rule(mlp, - "java_after_exception, java", - /* C# exception from async code */ - "/^--- End of stack trace from previous (?x:" - ")location where exception was thrown ---$/", - "java", NULL); - if (ret != 0) { - rule_error(mlp); - return NULL; - } - - ret = rule(mlp, - "java_after_exception, java", - "/^[\\t ]*(?:Caused by|Suppressed):/", - "java_after_exception", NULL); - if (ret != 0) { - rule_error(mlp); - return NULL; - } - - ret = rule(mlp, - "java_after_exception, java", - "/^[\\t ]*... \\d+ (?:more|common frames omitted)/", - "java", NULL); - if (ret != 0) { - rule_error(mlp); - return NULL; - } - - /* Map the rules (mandatory for regex rules) */ - ret = flb_ml_parser_init(mlp); - if (ret != 0) { - flb_error("[multiline: java] error on mapping rules"); - flb_ml_parser_destroy(mlp); - return NULL; - } - - return mlp; -} diff --git a/fluent-bit/src/multiline/flb_ml_parser_python.c b/fluent-bit/src/multiline/flb_ml_parser_python.c deleted file mode 100644 index a9208839..00000000 --- a/fluent-bit/src/multiline/flb_ml_parser_python.c +++ /dev/null @@ -1,98 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/multiline/flb_ml.h> -#include <fluent-bit/multiline/flb_ml_rule.h> -#include <fluent-bit/multiline/flb_ml_parser.h> - -#define rule flb_ml_rule_create - -static void rule_error(struct flb_ml_parser *mlp) -{ - int id; - - id = mk_list_size(&mlp->regex_rules); - flb_error("[multiline: python] rule #%i could not be created", id); - flb_ml_parser_destroy(mlp); -} - -/* Python */ -struct flb_ml_parser *flb_ml_parser_python(struct flb_config *config, char *key) -{ - int ret; - struct flb_ml_parser *mlp; - - mlp = flb_ml_parser_create(config, /* Fluent Bit context */ - "python", /* name */ - FLB_ML_REGEX, /* type */ - NULL, /* match_str */ - FLB_FALSE, /* negate */ - FLB_ML_FLUSH_TIMEOUT, /* flush_ms */ - key, /* key_content */ - NULL, /* key_group */ - NULL, /* key_pattern */ - NULL, /* parser ctx */ - NULL); /* parser name */ - - if (!mlp) { - flb_error("[multiline] could not create 'python mode'"); - return NULL; - } - - /* rule(:start_state, /^Traceback \(most recent call last\):$/, :python) */ - ret = rule(mlp, - "start_state", "/^Traceback \\(most recent call last\\):$/", - "python", NULL); - if (ret != 0) { - rule_error(mlp); - return NULL; - } - - /* rule(:python, /^[\t ]+File /, :python_code) */ - ret = rule(mlp, "python", "/^[\\t ]+File /", "python_code", NULL); - if (ret != 0) { - rule_error(mlp); - return NULL; - } - - /* rule(:python_code, /[^\t ]/, :python) */ - ret = rule(mlp, "python_code", "/[^\\t ]/", "python", NULL); - if (ret != 0) { - rule_error(mlp); - return NULL; - } - - /* rule(:python, /^(?:[^\s.():]+\.)*[^\s.():]+:/, :start_state) */ - ret = rule(mlp, "python", "/^(?:[^\\s.():]+\\.)*[^\\s.():]+:/", "start_state", NULL); - if (ret != 0) { - rule_error(mlp); - return NULL; - } - - /* Map the rules (mandatory for regex rules) */ - ret = flb_ml_parser_init(mlp); - if (ret != 0) { - flb_error("[multiline: python] error on mapping rules"); - flb_ml_parser_destroy(mlp); - return NULL; - } - - return mlp; -} diff --git a/fluent-bit/src/multiline/flb_ml_parser_ruby.c b/fluent-bit/src/multiline/flb_ml_parser_ruby.c deleted file mode 100644 index 780f829d..00000000 --- a/fluent-bit/src/multiline/flb_ml_parser_ruby.c +++ /dev/null @@ -1,87 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/multiline/flb_ml.h> -#include <fluent-bit/multiline/flb_ml_rule.h> -#include <fluent-bit/multiline/flb_ml_parser.h> - -#define rule flb_ml_rule_create - -static void rule_error(struct flb_ml_parser *mlp) -{ - int id; - - id = mk_list_size(&mlp->regex_rules); - flb_error("[multiline: ruby] rule #%i could not be created", id); - flb_ml_parser_destroy(mlp); -} - -/* Ruby mode */ -struct flb_ml_parser *flb_ml_parser_ruby(struct flb_config *config, char *key) -{ - int ret; - struct flb_ml_parser *mlp; - - mlp = flb_ml_parser_create(config, /* Fluent Bit context */ - "ruby", /* name */ - FLB_ML_REGEX, /* type */ - NULL, /* match_str */ - FLB_FALSE, /* negate */ - FLB_ML_FLUSH_TIMEOUT, /* flush_ms */ - key, /* key_content */ - NULL, /* key_group */ - NULL, /* key_pattern */ - NULL, /* parser ctx */ - NULL); /* parser name */ - - if (!mlp) { - flb_error("[multiline] could not create 'ruby mode'"); - return NULL; - } - - ret = rule(mlp, - "start_state, ruby_start_exception", - "/^.+:\\d+:in\\s+.*/", - "ruby_after_exception", NULL); - if (ret != 0) { - rule_error(mlp); - return NULL; - } - - ret = rule(mlp, - "ruby_after_exception, ruby", - "/^\\s+from\\s+.*:\\d+:in\\s+.*/", - "ruby", NULL); - if (ret != 0) { - rule_error(mlp); - return NULL; - } - - - /* Map the rules (mandatory for regex rules) */ - ret = flb_ml_parser_init(mlp); - if (ret != 0) { - flb_error("[multiline: ruby] error on mapping rules"); - flb_ml_parser_destroy(mlp); - return NULL; - } - - return mlp; -} diff --git a/fluent-bit/src/multiline/flb_ml_rule.c b/fluent-bit/src/multiline/flb_ml_rule.c deleted file mode 100644 index 26520dfe..00000000 --- a/fluent-bit/src/multiline/flb_ml_rule.c +++ /dev/null @@ -1,421 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_regex.h> -#include <fluent-bit/flb_slist.h> - -#include <fluent-bit/multiline/flb_ml.h> -#include <fluent-bit/multiline/flb_ml_rule.h> - -struct to_state { - struct flb_ml_rule *rule; - struct mk_list _head; -}; - -struct flb_slist_entry *get_start_state(struct mk_list *list) -{ - struct mk_list *head; - struct flb_slist_entry *e; - - mk_list_foreach(head, list) { - e = mk_list_entry(head, struct flb_slist_entry, _head); - if (strcmp(e->str, "start_state") == 0) { - return e; - } - } - - return NULL; -} - -int flb_ml_rule_create(struct flb_ml_parser *ml_parser, - flb_sds_t from_states, - char *regex_pattern, - flb_sds_t to_state, - char *end_pattern) -{ - int ret; - int first_rule = FLB_FALSE; - struct flb_ml_rule *rule; - - rule = flb_calloc(1, sizeof(struct flb_ml_rule)); - if (!rule) { - flb_errno(); - return -1; - } - flb_slist_create(&rule->from_states); - mk_list_init(&rule->to_state_map); - - if (mk_list_size(&ml_parser->regex_rules) == 0) { - first_rule = FLB_TRUE; - } - mk_list_add(&rule->_head, &ml_parser->regex_rules); - - /* from_states */ - ret = flb_slist_split_string(&rule->from_states, from_states, ',', -1); - if (ret <= 0) { - flb_error("[multiline] rule is empty or has invalid 'from_states' tokens"); - flb_ml_rule_destroy(rule); - return -1; - } - - /* Check if the rule contains a 'start_state' */ - if (get_start_state(&rule->from_states)) { - rule->start_state = FLB_TRUE; - } - else if (first_rule) { - flb_error("[multiline] rule don't contain a 'start_state'"); - flb_ml_rule_destroy(rule); - return -1; - } - - /* regex content pattern */ - rule->regex = flb_regex_create(regex_pattern); - if (!rule->regex) { - flb_ml_rule_destroy(rule); - return -1; - } - - /* to_state */ - if (to_state) { - rule->to_state = flb_sds_create(to_state); - if (!rule->to_state) { - flb_ml_rule_destroy(rule); - return -1; - } - } - - /* regex end pattern */ - if (end_pattern) { - rule->regex_end = flb_regex_create(end_pattern); - if (!rule->regex_end) { - flb_ml_rule_destroy(rule); - return -1; - } - } - - return 0; -} - -void flb_ml_rule_destroy(struct flb_ml_rule *rule) -{ - struct mk_list *tmp; - struct mk_list *head; - struct to_state *st; - - flb_slist_destroy(&rule->from_states); - - if (rule->regex) { - flb_regex_destroy(rule->regex); - } - - - if (rule->to_state) { - flb_sds_destroy(rule->to_state); - } - - mk_list_foreach_safe(head, tmp, &rule->to_state_map) { - st = mk_list_entry(head, struct to_state, _head); - mk_list_del(&st->_head); - flb_free(st); - } - - if (rule->regex_end) { - flb_regex_destroy(rule->regex_end); - } - - mk_list_del(&rule->_head); - flb_free(rule); -} - -void flb_ml_rule_destroy_all(struct flb_ml_parser *ml_parser) -{ - struct mk_list *tmp; - struct mk_list *head; - struct flb_ml_rule *rule; - - mk_list_foreach_safe(head, tmp, &ml_parser->regex_rules) { - rule = mk_list_entry(head, struct flb_ml_rule, _head); - flb_ml_rule_destroy(rule); - } -} - -static inline int to_states_exists(struct flb_ml_parser *ml_parser, - flb_sds_t state) -{ - struct mk_list *head; - struct mk_list *r_head; - struct flb_ml_rule *rule; - struct flb_slist_entry *e; - - mk_list_foreach(head, &ml_parser->regex_rules) { - rule = mk_list_entry(head, struct flb_ml_rule, _head); - - mk_list_foreach(r_head, &rule->from_states) { - e = mk_list_entry(r_head, struct flb_slist_entry, _head); - if (strcmp(e->str, state) == 0) { - return FLB_TRUE; - } - } - } - - return FLB_FALSE; -} - -static inline int to_states_matches_rule(struct flb_ml_rule *rule, - flb_sds_t state) -{ - struct mk_list *head; - struct flb_slist_entry *e; - - mk_list_foreach(head, &rule->from_states) { - e = mk_list_entry(head, struct flb_slist_entry, _head); - if (strcmp(e->str, state) == 0) { - return FLB_TRUE; - } - } - - return FLB_FALSE; -} - -static int set_to_state_map(struct flb_ml_parser *ml_parser, - struct flb_ml_rule *rule) -{ - int ret; - struct to_state *s; - struct mk_list *head; - struct flb_ml_rule *r; - - if (!rule->to_state) { - /* no to_state */ - return 0; - } - - /* Iterate all rules that matches the to_state */ - mk_list_foreach(head, &ml_parser->regex_rules) { - r = mk_list_entry(head, struct flb_ml_rule, _head); - - /* Check if rule->to_state, matches an existing (registered) from_state */ - ret = to_states_exists(ml_parser, rule->to_state); - if (!ret) { - flb_error("[multiline parser: %s] to_state='%s' is not registered", - ml_parser->name, rule->to_state); - return -1; - } - - /* - * A rule can have many 'from_states', check if the current 'rule->to_state' - * matches any 'r->from_states' - */ - ret = to_states_matches_rule(r, rule->to_state); - if (!ret) { - continue; - } - - /* We have a match. Create a 'to_state' entry into the 'to_state_map' list */ - s = flb_malloc(sizeof(struct to_state)); - if (!s) { - flb_errno(); - return -1; - } - s->rule = r; - mk_list_add(&s->_head, &rule->to_state_map); - } - - return 0; -} - -static int try_flushing_buffer(struct flb_ml_parser *ml_parser, - struct flb_ml_stream *mst, - struct flb_ml_stream_group *group) -{ - int next_start = FLB_FALSE; - struct mk_list *head; - struct to_state *st; - struct flb_ml_rule *rule; - - rule = group->rule_to_state; - if (!rule) { - if (flb_sds_len(group->buf) > 0) { - flb_ml_flush_stream_group(ml_parser, mst, group, FLB_FALSE); - group->first_line = FLB_TRUE; - } - return 0; - } - - /* Check if any 'to_state_map' referenced rules is a possible start */ - mk_list_foreach(head, &rule->to_state_map) { - st = mk_list_entry(head, struct to_state, _head); - if (st->rule->start_state) { - next_start = FLB_TRUE; - break; - } - } - - if (next_start && flb_sds_len(group->buf) > 0) { - flb_ml_flush_stream_group(ml_parser, mst, group, FLB_FALSE); - group->first_line = FLB_TRUE; - } - - return 0; -} - -/* Initialize all rules */ -int flb_ml_rule_init(struct flb_ml_parser *ml_parser) -{ - int ret; - struct mk_list *head; - struct flb_ml_rule *rule; - - /* FIXME: sort rules by start_state first (let's trust in the caller) */ - - /* For each rule, compose it to_state_map list */ - mk_list_foreach(head, &ml_parser->regex_rules) { - rule = mk_list_entry(head, struct flb_ml_rule, _head); - /* Populate 'rule->to_state_map' list */ - ret = set_to_state_map(ml_parser, rule); - if (ret == -1) { - return -1; - } - } - - return 0; -} - -/* Search any 'start_state' matching the incoming 'buf_data' */ -static struct flb_ml_rule *try_start_state(struct flb_ml_parser *ml_parser, - char *buf_data, size_t buf_size) -{ - int ret = -1; - struct mk_list *head; - struct flb_ml_rule *rule = NULL; - - mk_list_foreach(head, &ml_parser->regex_rules) { - rule = mk_list_entry(head, struct flb_ml_rule, _head); - - /* Is this rule matching a start_state ? */ - if (!rule->start_state) { - rule = NULL; - continue; - } - - /* Matched a start_state. Check if we have a regex match */ - ret = flb_regex_match(rule->regex, (unsigned char *) buf_data, buf_size); - if (ret) { - return rule; - } - } - - return NULL; -} - -int flb_ml_rule_process(struct flb_ml_parser *ml_parser, - struct flb_ml_stream *mst, - struct flb_ml_stream_group *group, - msgpack_object *full_map, - void *buf, size_t size, struct flb_time *tm, - msgpack_object *val_content, - msgpack_object *val_pattern) -{ - int ret; - int len; - char *buf_data = NULL; - size_t buf_size = 0; - struct mk_list *head; - struct to_state *st = NULL; - struct flb_ml_rule *rule = NULL; - struct flb_ml_rule *tmp_rule = NULL; - - if (val_content) { - buf_data = (char *) val_content->via.str.ptr; - buf_size = val_content->via.str.size; - } - else { - buf_data = buf; - buf_size = size; - } - - if (group->rule_to_state) { - /* are we in a continuation ? */ - tmp_rule = group->rule_to_state; - - /* Lookup all possible next rules by state reference */ - rule = NULL; - mk_list_foreach(head, &tmp_rule->to_state_map) { - st = mk_list_entry(head, struct to_state, _head); - - /* skip start states */ - if (st->rule->start_state) { - continue; - } - - /* Try regex match */ - ret = flb_regex_match(st->rule->regex, - (unsigned char *) buf_data, buf_size); - if (ret) { - /* Regex matched */ - len = flb_sds_len(group->buf); - if (len >= 1 && group->buf[len - 1] != '\n') { - flb_sds_cat_safe(&group->buf, "\n", 1); - } - - if (buf_size == 0) { - flb_sds_cat_safe(&group->buf, "\n", 1); - } - else { - flb_sds_cat_safe(&group->buf, buf_data, buf_size); - } - rule = st->rule; - break; - } - rule = NULL; - } - - } - - if (!rule) { - /* Check if we are in a 'start_state' */ - rule = try_start_state(ml_parser, buf_data, buf_size); - if (rule) { - /* if the group buffer has any previous data just flush it */ - if (flb_sds_len(group->buf) > 0) { - flb_ml_flush_stream_group(ml_parser, mst, group, FLB_FALSE); - } - - /* set the rule state */ - group->rule_to_state = rule; - - /* concatenate the data */ - flb_sds_cat_safe(&group->buf, buf_data, buf_size); - - /* Copy full map content in stream buffer */ - flb_ml_register_context(group, tm, full_map); - - return 0; - } - } - - if (rule) { - group->rule_to_state = rule; - try_flushing_buffer(ml_parser, mst, group); - return 0; - } - - return -1; -} diff --git a/fluent-bit/src/multiline/flb_ml_stream.c b/fluent-bit/src/multiline/flb_ml_stream.c deleted file mode 100644 index c92ba322..00000000 --- a/fluent-bit/src/multiline/flb_ml_stream.c +++ /dev/null @@ -1,338 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <fluent-bit/flb_info.h> -#include <fluent-bit/flb_mem.h> -#include <fluent-bit/flb_log.h> -#include <fluent-bit/flb_pack.h> -#include <fluent-bit/multiline/flb_ml.h> -#include <fluent-bit/multiline/flb_ml_rule.h> -#include <cfl/cfl.h> - -static int ml_flush_stdout(struct flb_ml_parser *parser, - struct flb_ml_stream *mst, - void *data, char *buf_data, size_t buf_size) -{ - fprintf(stdout, "\n%s----- MULTILINE FLUSH -----%s\n", - ANSI_GREEN, ANSI_RESET); - - /* Print incoming flush buffer */ - flb_pack_print(buf_data, buf_size); - - fprintf(stdout, "%s----------- EOF -----------%s\n", - ANSI_GREEN, ANSI_RESET); - return 0; -} - -static struct flb_ml_stream_group *stream_group_create(struct flb_ml_stream *mst, - char *name, int len) -{ - struct flb_ml_stream_group *group; - - if (!name) { - name = "_default"; - } - - group = flb_calloc(1, sizeof(struct flb_ml_stream_group)); - if (!group) { - flb_errno(); - return NULL; - } - group->name = flb_sds_create_len(name, len); - if (!group->name) { - flb_free(group); - return NULL; - } - - /* status */ - group->first_line = FLB_TRUE; - - /* multiline buffer */ - group->buf = flb_sds_create_size(FLB_ML_BUF_SIZE); - if (!group->buf) { - flb_error("cannot allocate multiline stream buffer in group %s", name); - flb_sds_destroy(group->name); - flb_free(group); - return NULL; - } - - /* msgpack buffer */ - msgpack_sbuffer_init(&group->mp_md_sbuf); - msgpack_packer_init(&group->mp_md_pck, &group->mp_md_sbuf, msgpack_sbuffer_write); - - msgpack_sbuffer_init(&group->mp_sbuf); - msgpack_packer_init(&group->mp_pck, &group->mp_sbuf, msgpack_sbuffer_write); - - mk_list_add(&group->_head, &mst->groups); - - return group; -} - -struct flb_ml_stream_group *flb_ml_stream_group_get(struct flb_ml_parser_ins *parser_i, - struct flb_ml_stream *mst, - msgpack_object *group_name) -{ - int len; - char *name; - struct flb_ml_parser *mlp; - struct mk_list *head; - struct flb_ml_stream_group *group = NULL; - - mlp = parser_i->ml_parser; - - /* If key_group was not defined, we already have a default group */ - if (!mlp->key_group || !group_name) { - group = mk_list_entry_first(&mst->groups, - struct flb_ml_stream_group, - _head); - return group; - } - - /* Lookup for a candidate group */ - len = group_name->via.str.size; - name = (char *)group_name->via.str.ptr; - - mk_list_foreach(head, &mst->groups) { - group = mk_list_entry(head, struct flb_ml_stream_group, _head); - if (flb_sds_cmp(group->name, name, len) == 0) { - return group; - } - else { - group = NULL; - continue; - } - } - - /* No group has been found, create a new one */ - if (mk_list_size(&mst->groups) >= FLB_ML_MAX_GROUPS) { - flb_error("[multiline] stream %s exceeded number of allowed groups (%i)", - mst->name, FLB_ML_MAX_GROUPS); - return NULL; - } - - group = stream_group_create(mst, name, len); - return group; -} - -static void stream_group_destroy(struct flb_ml_stream_group *group) -{ - if (group->name) { - flb_sds_destroy(group->name); - } - if (group->buf) { - flb_sds_destroy(group->buf); - } - - msgpack_sbuffer_destroy(&group->mp_md_sbuf); - msgpack_sbuffer_destroy(&group->mp_sbuf); - - mk_list_del(&group->_head); - flb_free(group); -} - -static void stream_group_destroy_all(struct flb_ml_stream *mst) -{ - struct mk_list *tmp; - struct mk_list *head; - struct flb_ml_stream_group *group; - - mk_list_foreach_safe(head, tmp, &mst->groups) { - group = mk_list_entry(head, struct flb_ml_stream_group, _head); - stream_group_destroy(group); - } -} - -static int stream_group_init(struct flb_ml_stream *stream) -{ - struct flb_ml_stream_group *group = NULL; - - mk_list_init(&stream->groups); - - /* create a default group */ - group = stream_group_create(stream, NULL, 0); - if (!group) { - flb_error("[multiline] error initializing default group for " - "stream '%s'", stream->name); - return -1; - } - - return 0; -} - -static struct flb_ml_stream *stream_create(struct flb_ml *ml, - uint64_t id, - struct flb_ml_parser_ins *parser, - int (*cb_flush) (struct flb_ml_parser *, - struct flb_ml_stream *, - void *cb_data, - char *buf_data, - size_t buf_size), - void *cb_data) -{ - int ret; - struct flb_ml_stream *stream; - - stream = flb_calloc(1, sizeof(struct flb_ml_stream)); - if (!stream) { - flb_errno(); - return NULL; - } - stream->ml = ml; - stream->id = id; - stream->parser = parser; - - /* Flush Callback and opaque data type */ - if (cb_flush) { - stream->cb_flush = cb_flush; - } - else { - stream->cb_flush = ml_flush_stdout; - } - stream->cb_data = cb_data; - - ret = stream_group_init(stream); - if (ret != 0) { - flb_free(stream); - return NULL; - } - - mk_list_add(&stream->_head, &parser->streams); - return stream; -} - -int flb_ml_stream_create(struct flb_ml *ml, - char *name, - int name_len, - int (*cb_flush) (struct flb_ml_parser *, - struct flb_ml_stream *, - void *cb_data, - char *buf_data, - size_t buf_size), - void *cb_data, - uint64_t *stream_id) -{ - uint64_t id; - struct mk_list *head; - struct mk_list *head_group; - struct flb_ml_stream *mst; - struct flb_ml_group *group; - struct flb_ml_parser_ins *parser; - - if (!name) { - return -1; - } - - if (name_len <= 0) { - name_len = strlen(name); - } - - /* Set the stream id by creating a hash using the name */ - id = cfl_hash_64bits(name, name_len); - - /* For every group and parser, create a stream for this stream_id/hash */ - mk_list_foreach(head, &ml->groups) { - group = mk_list_entry(head, struct flb_ml_group, _head); - mk_list_foreach(head_group, &group->parsers) { - parser = mk_list_entry(head_group, struct flb_ml_parser_ins, _head); - - /* Check if the stream already exists on the parser */ - if (flb_ml_stream_get(parser, id) != NULL) { - continue; - } - - /* Create the stream */ - mst = stream_create(ml, id, parser, cb_flush, cb_data); - if (!mst) { - flb_error("[multiline] could not create stream_id=%" PRIu64 - "for stream '%s' on parser '%s'", - *stream_id, name, parser->ml_parser->name); - return -1; - } - } - } - - *stream_id = id; - return 0; -} - -struct flb_ml_stream *flb_ml_stream_get(struct flb_ml_parser_ins *parser, - uint64_t stream_id) -{ - struct mk_list *head; - struct flb_ml_stream *mst = NULL; - - mk_list_foreach(head, &parser->streams) { - mst = mk_list_entry(head, struct flb_ml_stream, _head); - if (mst->id == stream_id) { - return mst; - } - } - - return NULL; -} - -void flb_ml_stream_id_destroy_all(struct flb_ml *ml, uint64_t stream_id) -{ - struct mk_list *tmp; - struct mk_list *head; - struct mk_list *head_group; - struct mk_list *head_stream; - struct flb_ml_group *group; - struct flb_ml_stream *mst; - struct flb_ml_parser_ins *parser_i; - - /* groups */ - mk_list_foreach(head, &ml->groups) { - group = mk_list_entry(head, struct flb_ml_group, _head); - - /* parser instances */ - mk_list_foreach(head_group, &group->parsers) { - parser_i = mk_list_entry(head_group, struct flb_ml_parser_ins, _head); - - /* streams */ - mk_list_foreach_safe(head_stream, tmp, &parser_i->streams) { - mst = mk_list_entry(head_stream, struct flb_ml_stream, _head); - if (mst->id != stream_id) { - continue; - } - - /* flush any pending data */ - flb_ml_flush_parser_instance(ml, parser_i, stream_id, FLB_TRUE); - - /* destroy internal groups of the stream */ - flb_ml_stream_destroy(mst); - } - } - } -} - -int flb_ml_stream_destroy(struct flb_ml_stream *mst) -{ - mk_list_del(&mst->_head); - if (mst->name) { - flb_sds_destroy(mst->name); - } - - /* destroy groups */ - stream_group_destroy_all(mst); - - flb_free(mst); - - return 0; -} |