diff options
Diffstat (limited to 'fluent-bit/src/multiline')
-rw-r--r-- | fluent-bit/src/multiline/CMakeLists.txt | 15 | ||||
-rw-r--r-- | fluent-bit/src/multiline/flb_ml.c | 1562 | ||||
-rw-r--r-- | fluent-bit/src/multiline/flb_ml_group.c | 86 | ||||
-rw-r--r-- | fluent-bit/src/multiline/flb_ml_mode.c | 111 | ||||
-rw-r--r-- | fluent-bit/src/multiline/flb_ml_parser.c | 347 | ||||
-rw-r--r-- | fluent-bit/src/multiline/flb_ml_parser_cri.c | 81 | ||||
-rw-r--r-- | fluent-bit/src/multiline/flb_ml_parser_docker.c | 110 | ||||
-rw-r--r-- | fluent-bit/src/multiline/flb_ml_parser_go.c | 140 | ||||
-rw-r--r-- | fluent-bit/src/multiline/flb_ml_parser_java.c | 143 | ||||
-rw-r--r-- | fluent-bit/src/multiline/flb_ml_parser_python.c | 98 | ||||
-rw-r--r-- | fluent-bit/src/multiline/flb_ml_parser_ruby.c | 87 | ||||
-rw-r--r-- | fluent-bit/src/multiline/flb_ml_rule.c | 421 | ||||
-rw-r--r-- | fluent-bit/src/multiline/flb_ml_stream.c | 338 |
13 files changed, 3539 insertions, 0 deletions
diff --git a/fluent-bit/src/multiline/CMakeLists.txt b/fluent-bit/src/multiline/CMakeLists.txt new file mode 100644 index 000000000..294ef3e8f --- /dev/null +++ b/fluent-bit/src/multiline/CMakeLists.txt @@ -0,0 +1,15 @@ +set(src_multiline + # built-in parsers + multiline/flb_ml_parser_cri.c + multiline/flb_ml_parser_docker.c + multiline/flb_ml_parser_python.c + multiline/flb_ml_parser_java.c + multiline/flb_ml_parser_go.c + multiline/flb_ml_parser_ruby.c + # core + multiline/flb_ml_stream.c + multiline/flb_ml_parser.c + multiline/flb_ml_group.c + multiline/flb_ml_rule.c + multiline/flb_ml.c PARENT_SCOPE + ) diff --git a/fluent-bit/src/multiline/flb_ml.c b/fluent-bit/src/multiline/flb_ml.c new file mode 100644 index 000000000..28e123cee --- /dev/null +++ b/fluent-bit/src/multiline/flb_ml.c @@ -0,0 +1,1562 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_mem.h> +#include <fluent-bit/flb_log.h> +#include <fluent-bit/flb_regex.h> +#include <fluent-bit/flb_time.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_scheduler.h> +#include <fluent-bit/multiline/flb_ml.h> +#include <fluent-bit/multiline/flb_ml_rule.h> +#include <fluent-bit/multiline/flb_ml_group.h> + +#include <stdarg.h> +#include <math.h> + +static inline int match_negate(struct flb_ml_parser *ml_parser, int matched) +{ + int rule_match = matched; + + /* Validate pattern matching against expected 'negate' condition */ + if (matched == FLB_TRUE) { + if (ml_parser->negate == FLB_FALSE) { + rule_match = FLB_TRUE; + } + else { + rule_match = FLB_FALSE; + } + } + else { + if (ml_parser->negate == FLB_TRUE) { + rule_match = FLB_TRUE; + } + } + + return rule_match; +} + +static uint64_t time_ms_now() +{ + uint64_t ms; + struct flb_time tm; + + flb_time_get(&tm); + ms = (tm.tm.tv_sec * 1000) + lround(tm.tm.tv_nsec/1.0e6); + return ms; +} + + +int flb_ml_flush_stdout(struct flb_ml_parser *parser, + struct flb_ml_stream *mst, + void *data, char *buf_data, size_t buf_size) +{ + fprintf(stdout, "\n%s----- MULTILINE FLUSH (stream_id=%" PRIu64 ") -----%s\n", + ANSI_GREEN, mst->id, ANSI_RESET); + + /* Print incoming flush buffer */ + flb_pack_print(buf_data, buf_size); + + fprintf(stdout, "%s----------- EOF -----------%s\n", + ANSI_GREEN, ANSI_RESET); + return 0; +} + +int flb_ml_type_lookup(char *str) +{ + int type = -1; + + if (strcasecmp(str, "regex") == 0) { + type = FLB_ML_REGEX; + } + else if (strcasecmp(str, "endswith") == 0) { + type = FLB_ML_ENDSWITH; + } + else if (strcasecmp(str, "equal") == 0 || strcasecmp(str, "eq") == 0) { + type = FLB_ML_EQ; + } + + return type; +} + +void flb_ml_flush_parser_instance(struct flb_ml *ml, + struct flb_ml_parser_ins *parser_i, + uint64_t stream_id, int forced_flush) +{ + struct mk_list *head; + struct mk_list *head_group; + struct flb_ml_stream *mst; + struct flb_ml_stream_group *group; + + mk_list_foreach(head, &parser_i->streams) { + mst = mk_list_entry(head, struct flb_ml_stream, _head); + if (stream_id != 0 && mst->id != stream_id) { + continue; + } + + /* Iterate stream groups */ + mk_list_foreach(head_group, &mst->groups) { + group = mk_list_entry(head_group, struct flb_ml_stream_group, _head); + flb_ml_flush_stream_group(parser_i->ml_parser, mst, group, forced_flush); + } + } +} + +void flb_ml_flush_pending(struct flb_ml *ml, uint64_t now, int forced_flush) +{ + struct mk_list *head; + struct flb_ml_parser_ins *parser_i; + struct flb_ml_group *group; + + /* set the last flush time */ + ml->last_flush = now; + + /* flush only the first group of the context */ + group = mk_list_entry_first(&ml->groups, struct flb_ml_group, _head); + + /* iterate group parser instances */ + mk_list_foreach(head, &group->parsers) { + parser_i = mk_list_entry(head, struct flb_ml_parser_ins, _head); + flb_ml_flush_parser_instance(ml, parser_i, 0, forced_flush); + } +} + +void flb_ml_flush_pending_now(struct flb_ml *ml) +{ + uint64_t now; + + now = time_ms_now(); + flb_ml_flush_pending(ml, now, FLB_TRUE); +} + +static void cb_ml_flush_timer(struct flb_config *ctx, void *data) +{ + uint64_t now; + struct flb_ml *ml = data; + + now = time_ms_now(); + if (ml->last_flush + ml->flush_ms > now) { + return; + } + + /* + * Iterate over all streams and groups and for a flush for expired groups + * which has not flushed in the last N milliseconds. + */ + flb_ml_flush_pending(ml, now, FLB_TRUE); +} + +int flb_ml_register_context(struct flb_ml_stream_group *group, + struct flb_time *tm, msgpack_object *map) +{ + if (tm) { + flb_time_copy(&group->mp_time, tm); + } + + if (map) { + msgpack_pack_object(&group->mp_pck, *map); + } + + return 0; +} + +static inline void breakline_prepare(struct flb_ml_parser_ins *parser_i, + struct flb_ml_stream_group *stream_group) +{ + int len; + + if (parser_i->key_content) { + return; + } + + len = flb_sds_len(stream_group->buf); + if (len <= 0) { + return; + } + + if (stream_group->buf[len - 1] != '\n') { + flb_sds_cat_safe(&stream_group->buf, "\n", 1); + } +} + +/* + * package content into a multiline stream: + * + * full_map: if the original content to process comes in msgpack map, this variable + * reference the map. It's only used in case we will package a first line so we + * store a copy of the other key values in the map for flush time. + */ +static int package_content(struct flb_ml_stream *mst, + msgpack_object *metadata, + msgpack_object *full_map, + void *buf, size_t size, struct flb_time *tm, + msgpack_object *val_content, + msgpack_object *val_pattern, + msgpack_object *val_group) +{ + int len; + int ret; + int rule_match = FLB_FALSE; + int processed = FLB_FALSE; + int type; + size_t offset = 0; + size_t buf_size; + char *buf_data; + msgpack_object *val = val_content; + struct flb_ml_parser *parser; + struct flb_ml_parser_ins *parser_i; + struct flb_ml_stream_group *stream_group; + + parser_i = mst->parser; + parser = parser_i->ml_parser; + + /* Get stream group */ + stream_group = flb_ml_stream_group_get(mst->parser, mst, val_group); + if (!mst->last_stream_group) { + mst->last_stream_group = stream_group; + } + else { + if (mst->last_stream_group != stream_group) { + mst->last_stream_group = stream_group; + } + } + + /* Set the parser type */ + type = parser->type; + + if (val_pattern) { + val = val_pattern; + } + + if (val) { + buf_data = (char *) val->via.str.ptr; + buf_size = val->via.str.size; + } + else { + buf_data = buf; + buf_size = size; + + } + if (type == FLB_ML_REGEX) { + ret = flb_ml_rule_process(parser, mst, + stream_group, full_map, buf, size, tm, + val_content, val_pattern); + if (ret == -1) { + processed = FLB_FALSE; + } + else { + processed = FLB_TRUE; + } + } + else if (type == FLB_ML_ENDSWITH) { + len = flb_sds_len(parser->match_str); + if (buf_data && len <= buf_size) { + /* Validate if content ends with expected string */ + offset = buf_size - len; + ret = memcmp(buf_data + offset, parser->match_str, len); + if (ret == 0) { + rule_match = match_negate(parser, FLB_TRUE); + } + else { + rule_match = match_negate(parser, FLB_FALSE); + } + + if (stream_group->mp_sbuf.size == 0) { + flb_ml_register_context(stream_group, tm, full_map); + } + + /* Prepare concatenation */ + breakline_prepare(parser_i, stream_group); + + /* Concatenate value */ + if (val_content) { + flb_sds_cat_safe(&stream_group->buf, + val_content->via.str.ptr, + val_content->via.str.size); + } + else { + flb_sds_cat_safe(&stream_group->buf, buf_data, buf_size); + } + + /* on ENDSWITH mode, a rule match means flush the content */ + if (rule_match) { + flb_ml_flush_stream_group(parser, mst, stream_group, FLB_FALSE); + } + processed = FLB_TRUE; + } + } + else if (type == FLB_ML_EQ) { + if (buf_size == flb_sds_len(parser->match_str) && + memcmp(buf_data, parser->match_str, buf_size) == 0) { + /* EQ match */ + rule_match = match_negate(parser, FLB_TRUE); + } + else { + rule_match = match_negate(parser, FLB_FALSE); + } + + if (stream_group->mp_sbuf.size == 0) { + flb_ml_register_context(stream_group, tm, full_map); + } + + /* Prepare concatenation */ + breakline_prepare(parser_i, stream_group); + + /* Concatenate value */ + if (val_content) { + flb_sds_cat_safe(&stream_group->buf, + val_content->via.str.ptr, + val_content->via.str.size); + } + else { + flb_sds_cat_safe(&stream_group->buf, buf_data, buf_size); + } + + /* on ENDSWITH mode, a rule match means flush the content */ + if (rule_match) { + flb_ml_flush_stream_group(parser, mst, stream_group, FLB_FALSE); + } + processed = FLB_TRUE; + } + + if (processed && metadata != NULL) { + msgpack_pack_object(&stream_group->mp_md_pck, *metadata); + } + + return processed; +} + +/* + * Retrieve the ID of a specific key name in a map. This function might be + * extended later to use record accessor, since all current cases are solved + * now quering the first level of keys in the map, we avoid record accessor + * to avoid extra memory allocations. + */ +static int get_key_id(msgpack_object *map, flb_sds_t key_name) +{ + int i; + int len; + int found = FLB_FALSE; + msgpack_object key; + msgpack_object val; + + if (!key_name) { + return -1; + } + + len = flb_sds_len(key_name); + for (i = 0; i < map->via.map.size; i++) { + key = map->via.map.ptr[i].key; + val = map->via.map.ptr[i].val; + + if (key.type != MSGPACK_OBJECT_STR || val.type != MSGPACK_OBJECT_STR) { + continue; + } + + if (key.via.str.size != len) { + continue; + } + + if (strncmp(key.via.str.ptr, key_name, len) == 0) { + found = FLB_TRUE; + break; + } + } + + if (found) { + return i; + } + + return -1; +} + +static int process_append(struct flb_ml_parser_ins *parser_i, + struct flb_ml_stream *mst, + int type, + struct flb_time *tm, + msgpack_object *metadata, + msgpack_object *obj, + void *buf, size_t size) +{ + int ret; + int id_content = -1; + int id_pattern = -1; + int id_group = -1; + int unpacked = FLB_FALSE; + size_t off = 0; + msgpack_object *full_map = NULL; + msgpack_object *val_content = NULL; + msgpack_object *val_pattern = NULL; + msgpack_object *val_group = NULL; + msgpack_unpacked result; + + /* Lookup the key */ + if (type == FLB_ML_TYPE_TEXT) { + ret = package_content(mst, NULL, NULL, buf, size, tm, NULL, NULL, NULL); + if (ret == FLB_FALSE) { + return -1; + } + return 0; + } + else if (type == FLB_ML_TYPE_MAP) { + full_map = obj; + /* + * When full_map and buf is not NULL, + * we use 'buf' since buf is already processed from full_map at + * ml_append_try_parser_type_map. + */ + if (!full_map || (buf != NULL && full_map != NULL)) { + off = 0; + msgpack_unpacked_init(&result); + ret = msgpack_unpack_next(&result, buf, size, &off); + if (ret != MSGPACK_UNPACK_SUCCESS) { + return -1; + } + full_map = &result.data; + unpacked = FLB_TRUE; + } + else if (full_map->type != MSGPACK_OBJECT_MAP) { + msgpack_unpacked_destroy(&result); + return -1; + } + } + + /* Lookup for key_content entry */ + id_content = get_key_id(full_map, parser_i->key_content); + if (id_content == -1) { + if (unpacked) { + msgpack_unpacked_destroy(&result); + } + return -1; + } + + val_content = &full_map->via.map.ptr[id_content].val; + if (val_content->type != MSGPACK_OBJECT_STR) { + val_content = NULL; + } + + /* Optional: Lookup for key_pattern entry */ + if (parser_i->key_pattern) { + id_pattern = get_key_id(full_map, parser_i->key_pattern); + if (id_pattern >= 0) { + val_pattern = &full_map->via.map.ptr[id_pattern].val; + if (val_pattern->type != MSGPACK_OBJECT_STR) { + val_pattern = NULL; + } + } + } + + /* Optional: lookup for key_group entry */ + if (parser_i->key_group) { + id_group = get_key_id(full_map, parser_i->key_group); + if (id_group >= 0) { + val_group = &full_map->via.map.ptr[id_group].val; + if (val_group->type != MSGPACK_OBJECT_STR) { + val_group = NULL; + } + } + } + + /* Package the content */ + ret = package_content(mst, metadata, full_map, buf, size, tm, + val_content, val_pattern, val_group); + if (unpacked) { + msgpack_unpacked_destroy(&result); + } + if (ret == FLB_FALSE) { + return -1; + } + return 0; +} + +static int ml_append_try_parser_type_text(struct flb_ml_parser_ins *parser, + uint64_t stream_id, + int *type, + struct flb_time *tm, void *buf, size_t size, + msgpack_object *map, + void **out_buf, size_t *out_size, int *out_release, + struct flb_time *out_time) +{ + int ret; + + if (parser->ml_parser->parser) { + /* Parse incoming content */ + ret = flb_parser_do(parser->ml_parser->parser, (char *) buf, size, + out_buf, out_size, out_time); + if (flb_time_to_nanosec(out_time) == 0L) { + flb_time_copy(out_time, tm); + } + if (ret >= 0) { + *out_release = FLB_TRUE; + *type = FLB_ML_TYPE_MAP; + } + else { + *out_buf = buf; + *out_size = size; + return -1; + } + } + else { + *out_buf = buf; + *out_size = size; + } + return 0; +} + +static int ml_append_try_parser_type_map(struct flb_ml_parser_ins *parser, + uint64_t stream_id, + int *type, + struct flb_time *tm, void *buf, size_t size, + msgpack_object *map, + void **out_buf, size_t *out_size, int *out_release, + struct flb_time *out_time) +{ + int map_size; + int i; + int len; + msgpack_object key; + msgpack_object val; + + if (map == NULL || map->type != MSGPACK_OBJECT_MAP) { + flb_error("%s:invalid map", __FUNCTION__); + return -1; + } + + if (parser->ml_parser->parser) { + /* lookup key_content */ + + len = flb_sds_len(parser->key_content); + map_size = map->via.map.size; + for(i = 0; i < map_size; i++) { + key = map->via.map.ptr[i].key; + val = map->via.map.ptr[i].val; + if (key.type == MSGPACK_OBJECT_STR && + parser->key_content && + key.via.str.size == len && + strncmp(key.via.str.ptr, parser->key_content, len) == 0) { + /* key_content found */ + if (val.type == MSGPACK_OBJECT_STR) { + /* try parse the value of key_content e*/ + return ml_append_try_parser_type_text(parser, stream_id, type, + tm, (void*) val.via.str.ptr, + val.via.str.size, + map, + out_buf, out_size, out_release, + out_time); + } else { + flb_error("%s: not string", __FUNCTION__); + return -1; + } + } + } + } + else { + *out_buf = buf; + *out_size = size; + } + return 0; +} + +static int ml_append_try_parser(struct flb_ml_parser_ins *parser, + uint64_t stream_id, + int type, + struct flb_time *tm, void *buf, size_t size, + msgpack_object *metadata, + msgpack_object *map) +{ + int ret; + int release = FLB_FALSE; + void *out_buf = NULL; + size_t out_size = 0; + struct flb_time out_time; + struct flb_ml_stream *mst; + + flb_time_zero(&out_time); + + switch (type) { + case FLB_ML_TYPE_TEXT: + ret = ml_append_try_parser_type_text(parser, stream_id, &type, + tm, buf, size, map, + &out_buf, &out_size, &release, + &out_time); + if (ret < 0) { + return -1; + } + break; + case FLB_ML_TYPE_MAP: + ret = ml_append_try_parser_type_map(parser, stream_id, &type, + tm, buf, size, map, + &out_buf, &out_size, &release, + &out_time); + if (ret < 0) { + return -1; + } + break; + + default: + flb_error("[multiline] unknown type=%d", type); + return -1; + } + + if (flb_time_to_nanosec(&out_time) == 0L) { + if (tm && flb_time_to_nanosec(tm) != 0L) { + flb_time_copy(&out_time, tm); + } + else { + flb_time_get(&out_time); + } + } + + /* Get the stream */ + mst = flb_ml_stream_get(parser, stream_id); + if (!mst) { + flb_error("[multiline] invalid stream_id %" PRIu64 ", could not " + "append content to multiline context", stream_id); + goto exit; + } + + /* Process the binary record */ + ret = process_append(parser, mst, type, &out_time, metadata, map, out_buf, out_size); + if (ret == -1) { + if (release == FLB_TRUE) { + flb_free(out_buf); + } + return -1; + } + + exit: + if (release == FLB_TRUE) { + flb_free(out_buf); + } + + return 0; +} + +int flb_ml_append_text(struct flb_ml *ml, uint64_t stream_id, + struct flb_time *tm, void *buf, size_t size) +{ + int ret; + int processed = FLB_FALSE; + struct mk_list *head; + struct mk_list *head_group; + struct flb_ml_group *group; + struct flb_ml_stream *mst; + struct flb_ml_parser_ins *lru_parser = NULL; + struct flb_ml_parser_ins *parser_i; + struct flb_time out_time; + struct flb_ml_stream_group *st_group; + int type; + + type = FLB_ML_TYPE_TEXT; + + flb_time_zero(&out_time); + + mk_list_foreach(head, &ml->groups) { + group = mk_list_entry(head, struct flb_ml_group, _head); + + /* Check if the incoming data matches the last recently used parser */ + lru_parser = group->lru_parser; + + if (lru_parser && lru_parser->last_stream_id == stream_id) { + ret = ml_append_try_parser(lru_parser, lru_parser->last_stream_id, type, + tm, buf, size, NULL, NULL); + if (ret == 0) { + processed = FLB_TRUE; + break; + } + else { + flb_ml_flush_parser_instance(ml, + lru_parser, + lru_parser->last_stream_id, + FLB_FALSE); + } + } + else if (lru_parser && lru_parser->last_stream_id > 0) { + /* + * Clear last recently used parser to match new parser. + * Do not flush last_stream_id since it should continue to parsing. + */ + lru_parser = NULL; + } + } + + mk_list_foreach(head_group, &group->parsers) { + parser_i = mk_list_entry(head_group, struct flb_ml_parser_ins, _head); + if (lru_parser && lru_parser == parser_i && + lru_parser->last_stream_id == stream_id) { + continue; + } + + ret = ml_append_try_parser(parser_i, stream_id, type, + tm, buf, size, NULL, NULL); + if (ret == 0) { + group->lru_parser = parser_i; + group->lru_parser->last_stream_id = stream_id; + lru_parser = parser_i; + processed = FLB_TRUE; + break; + } + else { + parser_i = NULL; + } + } + + if (!processed) { + if (lru_parser) { + flb_ml_flush_parser_instance(ml, lru_parser, stream_id, FLB_FALSE); + parser_i = lru_parser; + } + else { + /* get the first parser (just to make use of it buffers) */ + parser_i = mk_list_entry_first(&group->parsers, + struct flb_ml_parser_ins, + _head); + } + + flb_ml_flush_parser_instance(ml, parser_i, stream_id, FLB_FALSE); + mst = flb_ml_stream_get(parser_i, stream_id); + if (!mst) { + flb_error("[multiline] invalid stream_id %" PRIu64 ", could not " + "append content to multiline context", stream_id); + return -1; + } + + /* Get stream group */ + st_group = flb_ml_stream_group_get(mst->parser, mst, NULL); + flb_sds_cat_safe(&st_group->buf, buf, size); + flb_ml_flush_stream_group(parser_i->ml_parser, mst, st_group, FLB_FALSE); + } + + return 0; +} + + + +int flb_ml_append_object(struct flb_ml *ml, + uint64_t stream_id, + struct flb_time *tm, + msgpack_object *metadata, + msgpack_object *obj) +{ + int ret; + int type; + int processed = FLB_FALSE; + struct mk_list *head; + struct mk_list *head_group; + struct flb_ml_group *group; + struct flb_ml_parser_ins *lru_parser = NULL; + struct flb_ml_parser_ins *parser_i; + struct flb_ml_stream *mst; + struct flb_ml_stream_group *st_group; + struct flb_log_event event; + + if (metadata == NULL) { + metadata = ml->log_event_decoder.empty_map; + } + + /* + * As incoming objects, we accept packed events + * and msgpack Maps containing key/value pairs. + */ + if (obj->type == MSGPACK_OBJECT_ARRAY) { + flb_error("[multiline] appending object with invalid type, expected " + "map, received type=%i", obj->type); + return -1; + + + flb_log_event_decoder_reset(&ml->log_event_decoder, NULL, 0); + + ret = flb_event_decoder_decode_object(&ml->log_event_decoder, + &event, + obj); + + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_error("[multiline] invalid event object"); + + return -1; + } + + tm = &event.timestamp; + obj = event.body; + metadata = event.metadata; + + type = FLB_ML_TYPE_MAP; + } + else if (obj->type == MSGPACK_OBJECT_MAP) { + type = FLB_ML_TYPE_MAP; + } + else { + flb_error("[multiline] appending object with invalid type, expected " + "array or map, received type=%i", obj->type); + return -1; + } + + mk_list_foreach(head, &ml->groups) { + group = mk_list_entry(head, struct flb_ml_group, _head); + + /* Check if the incoming data matches the last recently used parser */ + lru_parser = group->lru_parser; + + if (lru_parser && lru_parser->last_stream_id == stream_id) { + ret = ml_append_try_parser(lru_parser, lru_parser->last_stream_id, type, + tm, NULL, 0, metadata, obj); + if (ret == 0) { + processed = FLB_TRUE; + break; + } + else { + flb_ml_flush_parser_instance(ml, + lru_parser, + lru_parser->last_stream_id, + FLB_FALSE); + } + } + else if (lru_parser && lru_parser->last_stream_id > 0) { + /* + * Clear last recently used parser to match new parser. + * Do not flush last_stream_id since it should continue to parsing. + */ + lru_parser = NULL; + } + } + + mk_list_foreach(head_group, &group->parsers) { + parser_i = mk_list_entry(head_group, struct flb_ml_parser_ins, _head); + if (lru_parser && parser_i == lru_parser) { + continue; + } + + ret = ml_append_try_parser(parser_i, stream_id, type, + tm, NULL, 0, metadata, obj); + if (ret == 0) { + group->lru_parser = parser_i; + group->lru_parser->last_stream_id = stream_id; + lru_parser = parser_i; + processed = FLB_TRUE; + break; + } + else { + parser_i = NULL; + } + } + + if (!processed) { + if (lru_parser) { + flb_ml_flush_parser_instance(ml, lru_parser, stream_id, FLB_FALSE); + parser_i = lru_parser; + } + else { + /* get the first parser (just to make use of it buffers) */ + parser_i = mk_list_entry_first(&group->parsers, + struct flb_ml_parser_ins, + _head); + } + + flb_ml_flush_parser_instance(ml, parser_i, stream_id, FLB_FALSE); + mst = flb_ml_stream_get(parser_i, stream_id); + if (!mst) { + flb_error("[multiline] invalid stream_id %" PRIu64 ", could not " + "append content to multiline context", stream_id); + + return -1; + } + + /* Get stream group */ + st_group = flb_ml_stream_group_get(mst->parser, mst, NULL); + + ret = flb_log_event_encoder_begin_record(&ml->log_event_encoder); + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_set_timestamp( + &ml->log_event_encoder, tm); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + if (metadata != ml->log_event_decoder.empty_map) { + ret = flb_log_event_encoder_set_metadata_from_msgpack_object( + &ml->log_event_encoder, metadata); + } + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_set_body_from_msgpack_object( + &ml->log_event_encoder, obj); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_commit_record(&ml->log_event_encoder); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + mst->cb_flush(parser_i->ml_parser, + mst, + mst->cb_data, + ml->log_event_encoder.output_buffer, + ml->log_event_encoder.output_length); + } + else { + flb_error("[multiline] log event encoder error : %d", ret); + } + + flb_log_event_encoder_reset(&ml->log_event_encoder); + + /* reset group buffer counters */ + st_group->mp_sbuf.size = 0; + flb_sds_len_set(st_group->buf, 0); + + /* Update last flush time */ + st_group->last_flush = time_ms_now(); + } + + return 0; +} + +int flb_ml_append_event(struct flb_ml *ml, uint64_t stream_id, + struct flb_log_event *event) +{ + return flb_ml_append_object(ml, + stream_id, + &event->timestamp, + event->metadata, + event->body); +} + + +struct flb_ml *flb_ml_create(struct flb_config *ctx, char *name) +{ + int result; + struct flb_ml *ml; + + ml = flb_calloc(1, sizeof(struct flb_ml)); + if (!ml) { + flb_errno(); + return NULL; + } + ml->name = flb_sds_create(name); + if (!ml) { + flb_free(ml); + return NULL; + } + + ml->config = ctx; + ml->last_flush = time_ms_now(); + mk_list_init(&ml->groups); + + result = flb_log_event_decoder_init(&ml->log_event_decoder, + NULL, + 0); + + if (result != FLB_EVENT_DECODER_SUCCESS) { + flb_error("cannot initialize log event decoder"); + + flb_ml_destroy(ml); + + return NULL; + } + + result = flb_log_event_encoder_init(&ml->log_event_encoder, + FLB_LOG_EVENT_FORMAT_DEFAULT); + + if (result != FLB_EVENT_ENCODER_SUCCESS) { + flb_error("cannot initialize log event encoder"); + + flb_ml_destroy(ml); + + return NULL; + } + + return ml; +} + +/* + * Some multiline contexts might define a parser name but not a parser context, + * for missing contexts, just lookup the parser and perform the assignment. + * + * The common use case is when reading config files with [PARSER] and [MULTILINE_PARSER] + * definitions, so we need to delay the parser loading. + */ +int flb_ml_parsers_init(struct flb_config *ctx) +{ + struct mk_list *head; + struct flb_parser *p; + struct flb_ml_parser *ml_parser; + + mk_list_foreach(head, &ctx->multiline_parsers) { + ml_parser = mk_list_entry(head, struct flb_ml_parser, _head); + if (ml_parser->parser_name && !ml_parser->parser) { + p = flb_parser_get(ml_parser->parser_name, ctx); + if (!p) { + flb_error("multiline parser '%s' points to an undefined parser '%s'", + ml_parser->name, ml_parser->parser_name); + return -1; + } + ml_parser->parser = p; + } + } + + return 0; +} + +int flb_ml_auto_flush_init(struct flb_ml *ml) +{ + struct flb_sched *scheduler; + int ret; + + if (ml == NULL) { + return -1; + } + + scheduler = flb_sched_ctx_get(); + + if (scheduler == NULL) { + flb_error("[multiline] scheduler context has not been created"); + return -1; + } + + if (ml->flush_ms < 500) { + flb_error("[multiline] flush timeout '%i' is too low", ml->flush_ms); + return -1; + } + + /* Create flush timer */ + ret = flb_sched_timer_cb_create(scheduler, + FLB_SCHED_TIMER_CB_PERM, + ml->flush_ms, + cb_ml_flush_timer, + ml, NULL); + return ret; +} + +int flb_ml_destroy(struct flb_ml *ml) +{ + struct mk_list *head; + struct mk_list *tmp; + struct flb_ml_group *group; + + if (!ml) { + return 0; + } + + flb_log_event_decoder_destroy(&ml->log_event_decoder); + flb_log_event_encoder_destroy(&ml->log_event_encoder); + + if (ml->name) { + flb_sds_destroy(ml->name); + } + + /* destroy groups */ + mk_list_foreach_safe(head, tmp, &ml->groups) { + group = mk_list_entry(head, struct flb_ml_group, _head); + flb_ml_group_destroy(group); + } + + flb_free(ml); + return 0; +} + +static int flb_msgpack_object_hash_internal(cfl_hash_state_t *state, + msgpack_object *object) +{ + void *dummy_pointer; + int result; + int index; + + if (object == NULL) { + return 0; + } + + dummy_pointer = NULL; + result = 0; + + if (object->type == MSGPACK_OBJECT_NIL) { + cfl_hash_64bits_update(state, + &dummy_pointer, + sizeof(dummy_pointer)); + } + else if (object->type == MSGPACK_OBJECT_BOOLEAN) { + cfl_hash_64bits_update(state, + &object->via.boolean, + sizeof(object->via.boolean)); + } + else if (object->type == MSGPACK_OBJECT_POSITIVE_INTEGER || + object->type == MSGPACK_OBJECT_NEGATIVE_INTEGER) { + cfl_hash_64bits_update(state, + &object->via.u64, + sizeof(object->via.u64)); + } + else if (object->type == MSGPACK_OBJECT_FLOAT32 || + object->type == MSGPACK_OBJECT_FLOAT64 || + object->type == MSGPACK_OBJECT_FLOAT) { + cfl_hash_64bits_update(state, + &object->via.f64, + sizeof(object->via.f64)); + } + else if (object->type == MSGPACK_OBJECT_STR) { + cfl_hash_64bits_update(state, + object->via.str.ptr, + object->via.str.size); + } + else if (object->type == MSGPACK_OBJECT_ARRAY) { + for (index = 0 ; + index < object->via.array.size && + result == 0; + index++) { + result = flb_msgpack_object_hash_internal( + state, + &object->via.array.ptr[index]); + } + } + else if (object->type == MSGPACK_OBJECT_MAP) { + for (index = 0 ; + index < object->via.map.size && + result == 0; + index++) { + result = flb_msgpack_object_hash_internal( + state, + &object->via.map.ptr[index].key); + + if (result == 0) { + result = flb_msgpack_object_hash_internal( + state, + &object->via.map.ptr[index].val); + } + } + } + else if (object->type == MSGPACK_OBJECT_BIN) { + cfl_hash_64bits_update(state, + object->via.bin.ptr, + object->via.bin.size); + } + else if (object->type == MSGPACK_OBJECT_EXT) { + cfl_hash_64bits_update(state, + &object->via.ext.type, + sizeof(object->via.ext.type)); + + cfl_hash_64bits_update(state, + object->via.ext.ptr, + object->via.ext.size); + } + + return result; +} + +static int flb_hash_msgpack_object_list(cfl_hash_64bits_t *hash, + size_t entry_count, + ...) +{ + cfl_hash_state_t hash_state; + va_list arguments; + msgpack_object *object; + int result; + size_t index; + + cfl_hash_64bits_reset(&hash_state); + + va_start(arguments, entry_count); + + result = 0; + + for (index = 0 ; + index < entry_count && + result == 0 ; + index++) { + object = va_arg(arguments, msgpack_object *); + + if (object == NULL) { + break; + } + + result = flb_msgpack_object_hash_internal(&hash_state, object); + } + + va_end(arguments); + + if (result == 0) { + *hash = cfl_hash_64bits_digest(&hash_state); + } + + return result; +} + +struct flb_deduplication_list_entry { + cfl_hash_64bits_t hash; + struct cfl_list _head; +}; + +void flb_deduplication_list_init(struct cfl_list *deduplication_list) +{ + cfl_list_init(deduplication_list); +} + +int flb_deduplication_list_validate(struct cfl_list *deduplication_list, + cfl_hash_64bits_t hash) +{ + struct cfl_list *iterator; + struct flb_deduplication_list_entry *entry; + + cfl_list_foreach(iterator, deduplication_list) { + entry = cfl_list_entry(iterator, + struct flb_deduplication_list_entry, + _head); + + if (entry->hash == hash) { + return FLB_TRUE; + } + } + + return FLB_FALSE; +} + +int flb_deduplication_list_add(struct cfl_list *deduplication_list, + cfl_hash_64bits_t hash) +{ + struct flb_deduplication_list_entry *entry; + + entry = (struct flb_deduplication_list_entry *) + flb_calloc(1, + sizeof(struct flb_deduplication_list_entry)); + + if (entry == NULL) { + return -1; + } + + cfl_list_entry_init(&entry->_head); + entry->hash = hash; + + cfl_list_append(&entry->_head, deduplication_list); + + return 0; +} + +void flb_deduplication_list_purge(struct cfl_list *deduplication_list) +{ + struct cfl_list *iterator; + struct cfl_list *backup; + struct flb_deduplication_list_entry *entry; + + cfl_list_foreach_safe(iterator, backup, deduplication_list) { + entry = cfl_list_entry(iterator, + struct flb_deduplication_list_entry, + _head); + + cfl_list_del(&entry->_head); + + free(entry); + } +} + +int flb_ml_flush_metadata_buffer(struct flb_ml_stream *mst, + struct flb_ml_stream_group *group, + int deduplicate_metadata) +{ + int append_metadata_entry; + cfl_hash_64bits_t metadata_entry_hash; + struct cfl_list deduplication_list; + msgpack_unpacked metadata_map; + size_t offset; + size_t index; + msgpack_object value; + msgpack_object key; + int ret; + + ret = FLB_EVENT_ENCODER_SUCCESS; + + if (deduplicate_metadata) { + flb_deduplication_list_init(&deduplication_list); + } + + msgpack_unpacked_init(&metadata_map); + + offset = 0; + while (ret == FLB_EVENT_ENCODER_SUCCESS && + msgpack_unpack_next(&metadata_map, + group->mp_md_sbuf.data, + group->mp_md_sbuf.size, + &offset) == MSGPACK_UNPACK_SUCCESS) { + + for (index = 0; + index < metadata_map.data.via.map.size && + ret == FLB_EVENT_ENCODER_SUCCESS; + index++) { + key = metadata_map.data.via.map.ptr[index].key; + value = metadata_map.data.via.map.ptr[index].val; + + append_metadata_entry = FLB_TRUE; + + if (deduplicate_metadata) { + ret = flb_hash_msgpack_object_list(&metadata_entry_hash, + 2, + &key, + &value); + if (ret != 0) { + ret = FLB_EVENT_ENCODER_ERROR_INVALID_ARGUMENT; + } + else { + ret = flb_deduplication_list_validate( + &deduplication_list, + metadata_entry_hash); + + if (ret) { + append_metadata_entry = FLB_FALSE; + + ret = FLB_EVENT_ENCODER_SUCCESS; + } + else { + ret = flb_deduplication_list_add( + &deduplication_list, + metadata_entry_hash); + + if (ret == 0) { + ret = FLB_EVENT_ENCODER_SUCCESS; + } + else { + ret = FLB_EVENT_ENCODER_ERROR_ALLOCATION_ERROR; + } + } + } + } + + if (append_metadata_entry) { + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_metadata_values( + &mst->ml->log_event_encoder, + FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&key), + FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&value)); + } + } + } + } + + msgpack_unpacked_destroy(&metadata_map); + + if (deduplicate_metadata) { + flb_deduplication_list_purge(&deduplication_list); + } + + return ret; +} + +int flb_ml_flush_stream_group(struct flb_ml_parser *ml_parser, + struct flb_ml_stream *mst, + struct flb_ml_stream_group *group, + int forced_flush) +{ + int i; + int ret; + int size; + int len; + size_t off = 0; + msgpack_object map; + msgpack_object k; + msgpack_object v; + msgpack_sbuffer mp_sbuf; + msgpack_packer mp_pck; + msgpack_unpacked result; + struct flb_ml_parser_ins *parser_i = mst->parser; + struct flb_time *group_time; + struct flb_time now; + + breakline_prepare(parser_i, group); + len = flb_sds_len(group->buf); + + /* init msgpack buffer */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + /* if the group don't have a time set, use current time */ + if (flb_time_to_nanosec(&group->mp_time) == 0L) { + flb_time_get(&now); + group_time = &now; + } else { + group_time = &group->mp_time; + } + + /* compose final record if we have a first line context */ + if (group->mp_sbuf.size > 0) { + msgpack_unpacked_init(&result); + ret = msgpack_unpack_next(&result, + group->mp_sbuf.data, group->mp_sbuf.size, + &off); + if (ret != MSGPACK_UNPACK_SUCCESS) { + flb_error("[multiline] could not unpack first line state buffer"); + msgpack_unpacked_destroy(&result); + group->mp_sbuf.size = 0; + return -1; + } + map = result.data; + + if (map.type != MSGPACK_OBJECT_MAP) { + flb_error("[multiline] expected MAP type in first line state buffer"); + msgpack_unpacked_destroy(&result); + group->mp_sbuf.size = 0; + return -1; + } + + /* Take the first line keys and repack */ + len = flb_sds_len(parser_i->key_content); + size = map.via.map.size; + msgpack_pack_map(&mp_pck, size); + + for (i = 0; i < size; i++) { + k = map.via.map.ptr[i].key; + v = map.via.map.ptr[i].val; + + /* + * Check if the current key is the key that will contain the + * concatenated multiline buffer + */ + if (k.type == MSGPACK_OBJECT_STR && + parser_i->key_content && + k.via.str.size == len && + strncmp(k.via.str.ptr, parser_i->key_content, len) == 0) { + + /* key */ + msgpack_pack_object(&mp_pck, k); + + /* value */ + len = flb_sds_len(group->buf); + msgpack_pack_str(&mp_pck, len); + msgpack_pack_str_body(&mp_pck, group->buf, len); + } + else { + /* key / val */ + msgpack_pack_object(&mp_pck, k); + msgpack_pack_object(&mp_pck, v); + } + } + msgpack_unpacked_destroy(&result); + group->mp_sbuf.size = 0; + } + else if (len > 0) { + /* Pack raw content as Fluent Bit record */ + msgpack_pack_map(&mp_pck, 1); + + /* key */ + if (parser_i->key_content) { + len = flb_sds_len(parser_i->key_content); + msgpack_pack_str(&mp_pck, len); + msgpack_pack_str_body(&mp_pck, parser_i->key_content, len); + } + else { + msgpack_pack_str(&mp_pck, 3); + msgpack_pack_str_body(&mp_pck, "log", 3); + } + + /* val */ + len = flb_sds_len(group->buf); + msgpack_pack_str(&mp_pck, len); + msgpack_pack_str_body(&mp_pck, group->buf, len); + } + + if (mp_sbuf.size > 0) { + /* + * a 'forced_flush' means to alert the caller that the data 'must be flushed to it destination'. This flag is + * only enabled when the flush process has been triggered by the multiline timer, e.g: + * + * - the message is complete or incomplete and its time to dispatch it. + */ + if (forced_flush) { + mst->forced_flush = FLB_TRUE; + } + + /* encode and invoke the user callback */ + + ret = flb_log_event_encoder_begin_record( + &mst->ml->log_event_encoder); + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_set_timestamp( + &mst->ml->log_event_encoder, + group_time); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_ml_flush_metadata_buffer(mst, + group, + FLB_TRUE); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_set_body_from_raw_msgpack( + &mst->ml->log_event_encoder, + mp_sbuf.data, + mp_sbuf.size); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_commit_record( + &mst->ml->log_event_encoder); + } + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_error("[multiline] error packing event"); + + return -1; + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + mst->cb_flush(ml_parser, + mst, + mst->cb_data, + mst->ml->log_event_encoder.output_buffer, + mst->ml->log_event_encoder.output_length); + } + else { + flb_error("[multiline] log event encoder error : %d", ret); + } + + flb_log_event_encoder_reset(&mst->ml->log_event_encoder); + + if (forced_flush) { + mst->forced_flush = FLB_FALSE; + } + } + + msgpack_sbuffer_destroy(&mp_sbuf); + flb_sds_len_set(group->buf, 0); + + /* Update last flush time */ + group->last_flush = time_ms_now(); + + return 0; +} + +/* + * Initialize multiline global environment. + * + * note: function must be invoked before any flb_ml_create() call. + */ +int flb_ml_init(struct flb_config *config) +{ + int ret; + + ret = flb_ml_parser_builtin_create(config); + if (ret == -1) { + return -1; + } + + return 0; +} + +int flb_ml_exit(struct flb_config *config) +{ + flb_ml_parser_destroy_all(&config->multiline_parsers); + return 0; +} diff --git a/fluent-bit/src/multiline/flb_ml_group.c b/fluent-bit/src/multiline/flb_ml_group.c new file mode 100644 index 000000000..895a71056 --- /dev/null +++ b/fluent-bit/src/multiline/flb_ml_group.c @@ -0,0 +1,86 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_mem.h> +#include <fluent-bit/flb_log.h> +#include <fluent-bit/multiline/flb_ml.h> +#include <fluent-bit/multiline/flb_ml_parser.h> + +struct flb_ml_group *flb_ml_group_create(struct flb_ml *ml) +{ + struct flb_ml_group *group; + + group = flb_calloc(1, sizeof(struct flb_ml_group)); + if (!group) { + flb_errno(); + return NULL; + } + group->id = mk_list_size(&ml->groups); + group->ml = ml; + group->lru_parser = NULL; + mk_list_init(&group->parsers); + + mk_list_add(&group->_head, &ml->groups); + + return group; +} + +/* + * Link a parser instance into the active group, if no group exists, a default + * one is created. + */ +int flb_ml_group_add_parser(struct flb_ml *ctx, struct flb_ml_parser_ins *p) +{ + struct flb_ml_group *group = NULL; + + if (mk_list_size(&ctx->groups) == 0) { + group = flb_ml_group_create(ctx); + if (!group) { + return -1; + } + } + else { + /* retrieve the latest active group */ + group = mk_list_entry_last(&ctx->groups, struct flb_ml_group, _head); + } + + if (!group) { + return -1; + } + + mk_list_add(&p->_head, &group->parsers); + return 0; +} + +void flb_ml_group_destroy(struct flb_ml_group *group) +{ + struct mk_list *head; + struct mk_list *tmp; + struct flb_ml_parser_ins *parser_i; + + /* destroy parser instances */ + mk_list_foreach_safe(head, tmp, &group->parsers) { + parser_i = mk_list_entry(head, struct flb_ml_parser_ins, _head); + flb_ml_parser_instance_destroy(parser_i); + } + + mk_list_del(&group->_head); + flb_free(group); +} diff --git a/fluent-bit/src/multiline/flb_ml_mode.c b/fluent-bit/src/multiline/flb_ml_mode.c new file mode 100644 index 000000000..964672d82 --- /dev/null +++ b/fluent-bit/src/multiline/flb_ml_mode.c @@ -0,0 +1,111 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_log.h> +#include <fluent-bit/multiline/flb_ml.h> +#include <fluent-bit/multiline/flb_ml_mode.h> + +struct flb_ml *flb_ml_mode_create(struct flb_config *config, char *mode, int flush_ms, + char *key) +{ + if (strcmp(mode, "docker") == 0) { + return flb_ml_mode_docker(config, flush_ms); + } + else if (strcmp(mode, "cri") == 0) { + return flb_ml_mode_cri(config, flush_ms); + } + else if (strcmp(mode, "python") == 0) { + return flb_ml_mode_python(config, flush_ms, key); + } + else if (strcmp(mode, "java") == 0) { + return flb_ml_mode_java(config, flush_ms, key); + } + else if (strcmp(mode, "go") == 0) { + return flb_ml_mode_go(config, flush_ms, key); + } + + flb_error("[multiline] built-in mode '%s' not found", mode); + return NULL; +} + + +struct flb_ml_mode *flb_ml_parser_create(struct flb_config *ctx, + char *name, + int type, char *match_str, int negate, + int flush_ms, + char *key_content, + char *key_group, + char *key_pattern, + struct flb_parser *parser_ctx, + char *parser_name) +{ + struct flb_ml_mode *ml; + + ml = flb_calloc(1, sizeof(struct flb_ml)); + if (!ml) { + flb_errno(); + return NULL; + } + ml->name = flb_sds_create(name); + ml->type = type; + + if (match_str) { + ml->match_str = flb_sds_create(match_str); + if (!ml->match_str) { + flb_free(ml); + return NULL; + } + } + + ml->parser = parser_ctx; + if (parser_name) { + ml->parser_name = flb_sds_create(parser_name); + } + + ml->negate = negate; + mk_list_init(&ml->streams); + mk_list_init(&ml->regex_rules); + mk_list_add(&ml->_head, &ctx->multilines); + + if (key_content) { + ml->key_content = flb_sds_create(key_content); + if (!ml->key_content) { + flb_ml_destroy(ml); + return NULL; + } + } + + if (key_group) { + ml->key_group = flb_sds_create(key_group); + if (!ml->key_group) { + flb_ml_destroy(ml); + return NULL; + } + } + + if (key_pattern) { + ml->key_pattern = flb_sds_create(key_pattern); + if (!ml->key_pattern) { + flb_ml_destroy(ml); + return NULL; + } + } + return ml; +} diff --git a/fluent-bit/src/multiline/flb_ml_parser.c b/fluent-bit/src/multiline/flb_ml_parser.c new file mode 100644 index 000000000..7aa33789d --- /dev/null +++ b/fluent-bit/src/multiline/flb_ml_parser.c @@ -0,0 +1,347 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_log.h> +#include <fluent-bit/multiline/flb_ml.h> +#include <fluent-bit/multiline/flb_ml_rule.h> +#include <fluent-bit/multiline/flb_ml_mode.h> +#include <fluent-bit/multiline/flb_ml_group.h> + +int flb_ml_parser_init(struct flb_ml_parser *ml_parser) +{ + int ret; + + ret = flb_ml_rule_init(ml_parser); + if (ret == -1) { + return -1; + } + + return 0; +} + +/* Create built-in multiline parsers */ +int flb_ml_parser_builtin_create(struct flb_config *config) +{ + struct flb_ml_parser *mlp; + + /* Docker */ + mlp = flb_ml_parser_docker(config); + if (!mlp) { + flb_error("[multiline] could not init 'docker' built-in parser"); + return -1; + } + + /* CRI */ + mlp = flb_ml_parser_cri(config); + if (!mlp) { + flb_error("[multiline] could not init 'cri' built-in parser"); + return -1; + } + + /* Java */ + mlp = flb_ml_parser_java(config, NULL); + if (!mlp) { + flb_error("[multiline] could not init 'java' built-in parser"); + return -1; + } + + /* Go */ + mlp = flb_ml_parser_go(config, NULL); + if (!mlp) { + flb_error("[multiline] could not init 'go' built-in parser"); + return -1; + } + + /* Ruby */ + mlp = flb_ml_parser_ruby(config, NULL); + if (!mlp) { + flb_error("[multiline] could not init 'ruby' built-in parser"); + return -1; + } + + /* Python */ + mlp = flb_ml_parser_python(config, NULL); + if (!mlp) { + flb_error("[multiline] could not init 'python' built-in parser"); + return -1; + } + + return 0; +} + +struct flb_ml_parser *flb_ml_parser_create(struct flb_config *ctx, + char *name, + int type, char *match_str, int negate, + int flush_ms, + char *key_content, + char *key_group, + char *key_pattern, + struct flb_parser *parser_ctx, + char *parser_name) +{ + struct flb_ml_parser *ml_parser; + + ml_parser = flb_calloc(1, sizeof(struct flb_ml_parser)); + if (!ml_parser) { + flb_errno(); + return NULL; + } + ml_parser->name = flb_sds_create(name); + ml_parser->type = type; + + if (match_str) { + ml_parser->match_str = flb_sds_create(match_str); + if (!ml_parser->match_str) { + if (ml_parser->name) { + flb_sds_destroy(ml_parser->name); + } + flb_free(ml_parser); + return NULL; + } + } + + ml_parser->parser = parser_ctx; + + if (parser_name) { + ml_parser->parser_name = flb_sds_create(parser_name); + } + ml_parser->negate = negate; + ml_parser->flush_ms = flush_ms; + mk_list_init(&ml_parser->regex_rules); + mk_list_add(&ml_parser->_head, &ctx->multiline_parsers); + + if (key_content) { + ml_parser->key_content = flb_sds_create(key_content); + if (!ml_parser->key_content) { + flb_ml_parser_destroy(ml_parser); + return NULL; + } + } + + if (key_group) { + ml_parser->key_group = flb_sds_create(key_group); + if (!ml_parser->key_group) { + flb_ml_parser_destroy(ml_parser); + return NULL; + } + } + + if (key_pattern) { + ml_parser->key_pattern = flb_sds_create(key_pattern); + if (!ml_parser->key_pattern) { + flb_ml_parser_destroy(ml_parser); + return NULL; + } + } + + return ml_parser; +} + +struct flb_ml_parser *flb_ml_parser_get(struct flb_config *ctx, char *name) +{ + struct mk_list *head; + struct flb_ml_parser *ml_parser; + + mk_list_foreach(head, &ctx->multiline_parsers) { + ml_parser = mk_list_entry(head, struct flb_ml_parser, _head); + if (strcasecmp(ml_parser->name, name) == 0) { + return ml_parser; + } + } + + return NULL; +} + +int flb_ml_parser_instance_has_data(struct flb_ml_parser_ins *ins) +{ + struct mk_list *head; + struct mk_list *head_group; + struct flb_ml_stream *st; + struct flb_ml_stream_group *st_group; + + mk_list_foreach(head, &ins->streams) { + st = mk_list_entry(head, struct flb_ml_stream, _head); + mk_list_foreach(head_group, &st->groups) { + st_group = mk_list_entry(head_group, struct flb_ml_stream_group, _head); + if (st_group->mp_sbuf.size > 0) { + return FLB_TRUE; + } + } + } + + return FLB_FALSE; +} + +struct flb_ml_parser_ins *flb_ml_parser_instance_create(struct flb_ml *ml, + char *name) +{ + int ret; + struct flb_ml_parser_ins *ins; + struct flb_ml_parser *parser; + + parser = flb_ml_parser_get(ml->config, name); + if (!parser) { + flb_error("[multiline] parser '%s' not registered", name); + return NULL; + } + + ins = flb_calloc(1, sizeof(struct flb_ml_parser_ins)); + if (!ins) { + flb_errno(); + return NULL; + } + ins->last_stream_id = 0; + ins->ml_parser = parser; + mk_list_init(&ins->streams); + + /* Copy parent configuration */ + if (parser->key_content) { + ins->key_content = flb_sds_create(parser->key_content); + } + if (parser->key_pattern) { + ins->key_pattern = flb_sds_create(parser->key_pattern); + } + if (parser->key_group) { + ins->key_group = flb_sds_create(parser->key_group); + } + + /* Append this multiline parser instance to the active multiline group */ + ret = flb_ml_group_add_parser(ml, ins); + if (ret != 0) { + flb_error("[multiline] could not register parser '%s' on " + "multiline '%s 'group", name, ml->name); + flb_free(ins); + return NULL; + } + + /* + * Update flush_interval for pending records on multiline context. We always + * use the greater value found. + */ + if (parser->flush_ms > ml->flush_ms) { + ml->flush_ms = parser->flush_ms; + } + + return ins; +} + +/* Override a fixed parser property for the instance only*/ +int flb_ml_parser_instance_set(struct flb_ml_parser_ins *p, char *prop, char *val) +{ + if (strcasecmp(prop, "key_content") == 0) { + if (p->key_content) { + flb_sds_destroy(p->key_content); + } + p->key_content = flb_sds_create(val); + } + else if (strcasecmp(prop, "key_pattern") == 0) { + if (p->key_pattern) { + flb_sds_destroy(p->key_pattern); + } + p->key_pattern = flb_sds_create(val); + } + else if (strcasecmp(prop, "key_group") == 0) { + if (p->key_group) { + flb_sds_destroy(p->key_group); + } + p->key_group = flb_sds_create(val); + } + else { + return -1; + } + + return 0; +} + +int flb_ml_parser_destroy(struct flb_ml_parser *ml_parser) +{ + if (!ml_parser) { + return 0; + } + + if (ml_parser->name) { + flb_sds_destroy(ml_parser->name); + } + + if (ml_parser->parser_name) { + flb_sds_destroy(ml_parser->parser_name); + } + + if (ml_parser->match_str) { + flb_sds_destroy(ml_parser->match_str); + } + if (ml_parser->key_content) { + flb_sds_destroy(ml_parser->key_content); + } + if (ml_parser->key_group) { + flb_sds_destroy(ml_parser->key_group); + } + if (ml_parser->key_pattern) { + flb_sds_destroy(ml_parser->key_pattern); + } + + /* Regex rules */ + flb_ml_rule_destroy_all(ml_parser); + + /* Unlink from struct flb_config->multiline_parsers */ + mk_list_del(&ml_parser->_head); + + flb_free(ml_parser); + return 0; +} + +int flb_ml_parser_instance_destroy(struct flb_ml_parser_ins *ins) +{ + struct mk_list *tmp; + struct mk_list *head; + struct flb_ml_stream *stream; + + /* Destroy streams */ + mk_list_foreach_safe(head, tmp, &ins->streams) { + stream = mk_list_entry(head, struct flb_ml_stream, _head); + flb_ml_stream_destroy(stream); + } + + if (ins->key_content) { + flb_sds_destroy(ins->key_content); + } + if (ins->key_pattern) { + flb_sds_destroy(ins->key_pattern); + } + if (ins->key_group) { + flb_sds_destroy(ins->key_group); + } + + flb_free(ins); + + return 0; +} + +void flb_ml_parser_destroy_all(struct mk_list *list) +{ + struct mk_list *tmp; + struct mk_list *head; + struct flb_ml_parser *parser; + + mk_list_foreach_safe(head, tmp, list) { + parser = mk_list_entry(head, struct flb_ml_parser, _head); + flb_ml_parser_destroy(parser); + } +} diff --git a/fluent-bit/src/multiline/flb_ml_parser_cri.c b/fluent-bit/src/multiline/flb_ml_parser_cri.c new file mode 100644 index 000000000..669fa39a2 --- /dev/null +++ b/fluent-bit/src/multiline/flb_ml_parser_cri.c @@ -0,0 +1,81 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/multiline/flb_ml.h> +#include <fluent-bit/multiline/flb_ml_parser.h> + +#define FLB_ML_CRI_REGEX \ + "^(?<time>.+?) (?<stream>stdout|stderr) (?<_p>F|P) (?<log>.*)$" +#define FLB_ML_CRI_TIME \ + "%Y-%m-%dT%H:%M:%S.%L%z" + +/* Creates a parser for Docker */ +static struct flb_parser *cri_parser_create(struct flb_config *config) +{ + struct flb_parser *p; + + p = flb_parser_create("_ml_cri", /* parser name */ + "regex", /* backend type */ + FLB_ML_CRI_REGEX, /* regex */ + FLB_FALSE, /* skip_empty */ + FLB_ML_CRI_TIME, /* time format */ + "time", /* time key */ + NULL, /* time offset */ + FLB_TRUE, /* time keep */ + FLB_FALSE, /* time strict */ + FLB_FALSE, /* no bare keys */ + NULL, /* parser types */ + 0, /* types len */ + NULL, /* decoders */ + config); /* Fluent Bit context */ + return p; +} + +/* Our first multiline mode: 'docker' */ +struct flb_ml_parser *flb_ml_parser_cri(struct flb_config *config) +{ + struct flb_parser *parser; + struct flb_ml_parser *mlp; + + /* Create a Docker parser */ + parser = cri_parser_create(config); + if (!parser) { + return NULL; + } + + mlp = flb_ml_parser_create(config, + "cri", /* name */ + FLB_ML_EQ, /* type */ + "F", /* match_str */ + FLB_FALSE, /* negate */ + FLB_ML_FLUSH_TIMEOUT, /* flush_ms */ + "log", /* key_content */ + "stream", /* key_group */ + "_p", /* key_pattern */ + parser, /* parser ctx */ + NULL); /* parser name */ + + if (!mlp) { + flb_error("[multiline] could not create 'cri mode'"); + return NULL; + } + + return mlp; +} diff --git a/fluent-bit/src/multiline/flb_ml_parser_docker.c b/fluent-bit/src/multiline/flb_ml_parser_docker.c new file mode 100644 index 000000000..5b622d32c --- /dev/null +++ b/fluent-bit/src/multiline/flb_ml_parser_docker.c @@ -0,0 +1,110 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/multiline/flb_ml.h> +#include <fluent-bit/multiline/flb_ml_parser.h> + +/* Creates a parser for Docker */ +static struct flb_parser *docker_parser_create(struct flb_config *config) +{ + struct flb_parser *p; + + p = flb_parser_create("_ml_json_docker", /* parser name */ + "json", /* backend type */ + NULL, /* regex */ + FLB_TRUE, /* skip_empty */ + "%Y-%m-%dT%H:%M:%S.%L", /* time format */ + "time", /* time key */ + NULL, /* time offset */ + FLB_TRUE, /* time keep */ + FLB_FALSE, /* time strict */ + FLB_FALSE, /* no bare keys */ + NULL, /* parser types */ + 0, /* types len */ + NULL, /* decoders */ + config); /* Fluent Bit context */ + return p; +} + +/* Our first multiline mode: 'docker' */ +struct flb_ml_parser *flb_ml_parser_docker(struct flb_config *config) +{ + struct flb_parser *parser; + struct flb_ml_parser *mlp; + + /* Create a Docker parser */ + parser = docker_parser_create(config); + if (!parser) { + return NULL; + } + + /* + * Let's explain this multiline mode, then you (the reader) might want + * to submit a PR with new built-in modes :) + * + * Containerized apps under Docker writes logs to stdout/stderr. These streams + * (stdout/stderr) are handled by Docker, in most of cases the content is + * stored in a .json file in your file system. A message like "hey!" gets into + * a JSON map like this: + * + * {"log": "hey!\n", "stream": "stdout", "time": "2021-02-01T01:40:03.53412Z"} + * + * By Docker log spec, any 'log' key that "ends with a \n" it's a complete + * log record, but Docker also limits the log record size to 16KB, so a long + * message that does not fit into 16KB can be split in multiple JSON lines, + * the following example use short words to describe the context: + * + * - original message: 'one, two, three\n' + * + * Docker log interpretation: + * + * - {"log": "one, ", "stream": "stdout", "time": "2021-02-01T01:40:03.53413Z"} + * - {"log": "two, ", "stream": "stdout", "time": "2021-02-01T01:40:03.53414Z"} + * - {"log": "three\n", "stream": "stdout", "time": "2021-02-01T01:40:03.53415Z"} + * + * So every 'log' key that does not ends with '\n', it's a partial log record + * and for logging purposes it needs to be concatenated with further messages + * until a final '\n' is found. + * + * We setup the Multiline mode as follows: + * + * - Use the type 'FLB_ML_ENDSWITH' to specify that we expect the 'log' + * key must ends with a '\n' for complete messages, otherwise it means is + * a continuation message. In case a message is not complete just wait until + * 500 milliseconds (0.5 second) and flush the buffer. + */ + mlp = flb_ml_parser_create(config, /* Fluent Bit context */ + "docker", /* name */ + FLB_ML_ENDSWITH, /* type */ + "\n", /* match_str */ + FLB_FALSE, /* negate */ + FLB_ML_FLUSH_TIMEOUT, /* flush_ms */ + "log", /* key_content */ + "stream", /* key_group */ + NULL, /* key_pattern */ + parser, /* parser ctx */ + NULL); /* parser name */ + if (!mlp) { + flb_error("[multiline] could not create 'docker mode'"); + return NULL; + } + + return mlp; +} diff --git a/fluent-bit/src/multiline/flb_ml_parser_go.c b/fluent-bit/src/multiline/flb_ml_parser_go.c new file mode 100644 index 000000000..f1cd5407f --- /dev/null +++ b/fluent-bit/src/multiline/flb_ml_parser_go.c @@ -0,0 +1,140 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/multiline/flb_ml.h> +#include <fluent-bit/multiline/flb_ml_rule.h> +#include <fluent-bit/multiline/flb_ml_parser.h> + +#define rule flb_ml_rule_create + +static void rule_error(struct flb_ml_parser *mlp) +{ + int id; + + id = mk_list_size(&mlp->regex_rules); + flb_error("[multiline: go] rule #%i could not be created", id); + flb_ml_parser_destroy(mlp); +} + +/* Go mode */ +struct flb_ml_parser *flb_ml_parser_go(struct flb_config *config, char *key) +{ + int ret; + struct flb_ml_parser *mlp; + + mlp = flb_ml_parser_create(config, /* Fluent Bit context */ + "go", /* name */ + FLB_ML_REGEX, /* type */ + NULL, /* match_str */ + FLB_FALSE, /* negate */ + FLB_ML_FLUSH_TIMEOUT, /* flush_ms */ + key, /* key_content */ + NULL, /* key_group */ + NULL, /* key_pattern */ + NULL, /* parser ctx */ + NULL); /* parser name */ + + if (!mlp) { + flb_error("[multiline] could not create 'go mode'"); + return NULL; + } + + ret = rule(mlp, + "start_state", + "/\\bpanic: /", + "go_after_panic", NULL); + if (ret != 0) { + rule_error(mlp); + return NULL; + } + + ret = rule(mlp, + "start_state", + "/http: panic serving/", + "go_goroutine", NULL); + if (ret != 0) { + rule_error(mlp); + return NULL; + } + + ret = rule(mlp, + "go_after_panic", + "/^$/", + "go_goroutine", NULL); + if (ret != 0) { + rule_error(mlp); + return NULL; + } + + ret = rule(mlp, + "go_after_panic, go_after_signal, go_frame_1", + "/^$/", + "go_goroutine", NULL); + if (ret != 0) { + rule_error(mlp); + return NULL; + } + + ret = rule(mlp, + "go_after_panic", + "/^\\[signal /", + "go_after_signal", NULL); + if (ret != 0) { + rule_error(mlp); + return NULL; + } + + ret = rule(mlp, + "go_goroutine", + "/^goroutine \\d+ \\[[^\\]]+\\]:$/", + "go_frame_1", NULL); + if (ret != 0) { + rule_error(mlp); + return NULL; + } + + ret = rule(mlp, + "go_frame_1", + "/^(?:[^\\s.:]+\\.)*[^\\s.():]+\\(|^created by /", + "go_frame_2", NULL); + if (ret != 0) { + rule_error(mlp); + return NULL; + } + + ret = rule(mlp, + "go_frame_2", + "/^\\s/", + "go_frame_1", NULL); + if (ret != 0) { + rule_error(mlp); + return NULL; + } + + /* Map the rules (mandatory for regex rules) */ + ret = flb_ml_parser_init(mlp); + if (ret != 0) { + flb_error("[multiline: go] error on mapping rules"); + flb_ml_parser_destroy(mlp); + return NULL; + } + + return mlp; +} diff --git a/fluent-bit/src/multiline/flb_ml_parser_java.c b/fluent-bit/src/multiline/flb_ml_parser_java.c new file mode 100644 index 000000000..4df5a00f7 --- /dev/null +++ b/fluent-bit/src/multiline/flb_ml_parser_java.c @@ -0,0 +1,143 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/multiline/flb_ml.h> +#include <fluent-bit/multiline/flb_ml_rule.h> +#include <fluent-bit/multiline/flb_ml_parser.h> + +#define rule flb_ml_rule_create + +static void rule_error(struct flb_ml_parser *ml_parser) +{ + int id; + + id = mk_list_size(&ml_parser->regex_rules); + flb_error("[multiline: java] rule #%i could not be created", id); + flb_ml_parser_destroy(ml_parser); +} + +/* Java mode */ +struct flb_ml_parser *flb_ml_parser_java(struct flb_config *config, char *key) +{ + int ret; + struct flb_ml_parser *mlp; + + mlp = flb_ml_parser_create(config, /* Fluent Bit context */ + "java", /* name */ + FLB_ML_REGEX, /* type */ + NULL, /* match_str */ + FLB_FALSE, /* negate */ + FLB_ML_FLUSH_TIMEOUT, /* flush_ms */ + key, /* key_content */ + NULL, /* key_group */ + NULL, /* key_pattern */ + NULL, /* parser ctx */ + NULL); /* parser name */ + + if (!mlp) { + flb_error("[multiline] could not create 'java mode'"); + return NULL; + } + + ret = rule(mlp, + "start_state, java_start_exception", + "/(.)(?:Exception|Error|Throwable|V8 errors stack trace)[:\\r\\n]/", + "java_after_exception", NULL); + if (ret != 0) { + rule_error(mlp); + return NULL; + } + + ret = rule(mlp, + "java_after_exception", + "/^[\\t ]*nested exception is:[\\t ]*/", + "java_start_exception", NULL); + if (ret != 0) { + rule_error(mlp); + return NULL; + } + + ret = rule(mlp, + "java_after_exception", + "/^[\\r\\n]*$/", + "java_after_exception", NULL); + if (ret != 0) { + rule_error(mlp); + return NULL; + } + + ret = rule(mlp, + "java_after_exception, java", + "/^[\\t ]+(?:eval )?at /", + "java", NULL); + if (ret != 0) { + rule_error(mlp); + return NULL; + } + + ret = rule(mlp, + "java_after_exception, java", + /* C# nested exception */ + "/^[\\t ]+--- End of inner exception stack trace ---$/", + "java", NULL); + if (ret != 0) { + rule_error(mlp); + return NULL; + } + + ret = rule(mlp, + "java_after_exception, java", + /* C# exception from async code */ + "/^--- End of stack trace from previous (?x:" + ")location where exception was thrown ---$/", + "java", NULL); + if (ret != 0) { + rule_error(mlp); + return NULL; + } + + ret = rule(mlp, + "java_after_exception, java", + "/^[\\t ]*(?:Caused by|Suppressed):/", + "java_after_exception", NULL); + if (ret != 0) { + rule_error(mlp); + return NULL; + } + + ret = rule(mlp, + "java_after_exception, java", + "/^[\\t ]*... \\d+ (?:more|common frames omitted)/", + "java", NULL); + if (ret != 0) { + rule_error(mlp); + return NULL; + } + + /* Map the rules (mandatory for regex rules) */ + ret = flb_ml_parser_init(mlp); + if (ret != 0) { + flb_error("[multiline: java] error on mapping rules"); + flb_ml_parser_destroy(mlp); + return NULL; + } + + return mlp; +} diff --git a/fluent-bit/src/multiline/flb_ml_parser_python.c b/fluent-bit/src/multiline/flb_ml_parser_python.c new file mode 100644 index 000000000..a92088397 --- /dev/null +++ b/fluent-bit/src/multiline/flb_ml_parser_python.c @@ -0,0 +1,98 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/multiline/flb_ml.h> +#include <fluent-bit/multiline/flb_ml_rule.h> +#include <fluent-bit/multiline/flb_ml_parser.h> + +#define rule flb_ml_rule_create + +static void rule_error(struct flb_ml_parser *mlp) +{ + int id; + + id = mk_list_size(&mlp->regex_rules); + flb_error("[multiline: python] rule #%i could not be created", id); + flb_ml_parser_destroy(mlp); +} + +/* Python */ +struct flb_ml_parser *flb_ml_parser_python(struct flb_config *config, char *key) +{ + int ret; + struct flb_ml_parser *mlp; + + mlp = flb_ml_parser_create(config, /* Fluent Bit context */ + "python", /* name */ + FLB_ML_REGEX, /* type */ + NULL, /* match_str */ + FLB_FALSE, /* negate */ + FLB_ML_FLUSH_TIMEOUT, /* flush_ms */ + key, /* key_content */ + NULL, /* key_group */ + NULL, /* key_pattern */ + NULL, /* parser ctx */ + NULL); /* parser name */ + + if (!mlp) { + flb_error("[multiline] could not create 'python mode'"); + return NULL; + } + + /* rule(:start_state, /^Traceback \(most recent call last\):$/, :python) */ + ret = rule(mlp, + "start_state", "/^Traceback \\(most recent call last\\):$/", + "python", NULL); + if (ret != 0) { + rule_error(mlp); + return NULL; + } + + /* rule(:python, /^[\t ]+File /, :python_code) */ + ret = rule(mlp, "python", "/^[\\t ]+File /", "python_code", NULL); + if (ret != 0) { + rule_error(mlp); + return NULL; + } + + /* rule(:python_code, /[^\t ]/, :python) */ + ret = rule(mlp, "python_code", "/[^\\t ]/", "python", NULL); + if (ret != 0) { + rule_error(mlp); + return NULL; + } + + /* rule(:python, /^(?:[^\s.():]+\.)*[^\s.():]+:/, :start_state) */ + ret = rule(mlp, "python", "/^(?:[^\\s.():]+\\.)*[^\\s.():]+:/", "start_state", NULL); + if (ret != 0) { + rule_error(mlp); + return NULL; + } + + /* Map the rules (mandatory for regex rules) */ + ret = flb_ml_parser_init(mlp); + if (ret != 0) { + flb_error("[multiline: python] error on mapping rules"); + flb_ml_parser_destroy(mlp); + return NULL; + } + + return mlp; +} diff --git a/fluent-bit/src/multiline/flb_ml_parser_ruby.c b/fluent-bit/src/multiline/flb_ml_parser_ruby.c new file mode 100644 index 000000000..780f829d0 --- /dev/null +++ b/fluent-bit/src/multiline/flb_ml_parser_ruby.c @@ -0,0 +1,87 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/multiline/flb_ml.h> +#include <fluent-bit/multiline/flb_ml_rule.h> +#include <fluent-bit/multiline/flb_ml_parser.h> + +#define rule flb_ml_rule_create + +static void rule_error(struct flb_ml_parser *mlp) +{ + int id; + + id = mk_list_size(&mlp->regex_rules); + flb_error("[multiline: ruby] rule #%i could not be created", id); + flb_ml_parser_destroy(mlp); +} + +/* Ruby mode */ +struct flb_ml_parser *flb_ml_parser_ruby(struct flb_config *config, char *key) +{ + int ret; + struct flb_ml_parser *mlp; + + mlp = flb_ml_parser_create(config, /* Fluent Bit context */ + "ruby", /* name */ + FLB_ML_REGEX, /* type */ + NULL, /* match_str */ + FLB_FALSE, /* negate */ + FLB_ML_FLUSH_TIMEOUT, /* flush_ms */ + key, /* key_content */ + NULL, /* key_group */ + NULL, /* key_pattern */ + NULL, /* parser ctx */ + NULL); /* parser name */ + + if (!mlp) { + flb_error("[multiline] could not create 'ruby mode'"); + return NULL; + } + + ret = rule(mlp, + "start_state, ruby_start_exception", + "/^.+:\\d+:in\\s+.*/", + "ruby_after_exception", NULL); + if (ret != 0) { + rule_error(mlp); + return NULL; + } + + ret = rule(mlp, + "ruby_after_exception, ruby", + "/^\\s+from\\s+.*:\\d+:in\\s+.*/", + "ruby", NULL); + if (ret != 0) { + rule_error(mlp); + return NULL; + } + + + /* Map the rules (mandatory for regex rules) */ + ret = flb_ml_parser_init(mlp); + if (ret != 0) { + flb_error("[multiline: ruby] error on mapping rules"); + flb_ml_parser_destroy(mlp); + return NULL; + } + + return mlp; +} diff --git a/fluent-bit/src/multiline/flb_ml_rule.c b/fluent-bit/src/multiline/flb_ml_rule.c new file mode 100644 index 000000000..26520dfed --- /dev/null +++ b/fluent-bit/src/multiline/flb_ml_rule.c @@ -0,0 +1,421 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_regex.h> +#include <fluent-bit/flb_slist.h> + +#include <fluent-bit/multiline/flb_ml.h> +#include <fluent-bit/multiline/flb_ml_rule.h> + +struct to_state { + struct flb_ml_rule *rule; + struct mk_list _head; +}; + +struct flb_slist_entry *get_start_state(struct mk_list *list) +{ + struct mk_list *head; + struct flb_slist_entry *e; + + mk_list_foreach(head, list) { + e = mk_list_entry(head, struct flb_slist_entry, _head); + if (strcmp(e->str, "start_state") == 0) { + return e; + } + } + + return NULL; +} + +int flb_ml_rule_create(struct flb_ml_parser *ml_parser, + flb_sds_t from_states, + char *regex_pattern, + flb_sds_t to_state, + char *end_pattern) +{ + int ret; + int first_rule = FLB_FALSE; + struct flb_ml_rule *rule; + + rule = flb_calloc(1, sizeof(struct flb_ml_rule)); + if (!rule) { + flb_errno(); + return -1; + } + flb_slist_create(&rule->from_states); + mk_list_init(&rule->to_state_map); + + if (mk_list_size(&ml_parser->regex_rules) == 0) { + first_rule = FLB_TRUE; + } + mk_list_add(&rule->_head, &ml_parser->regex_rules); + + /* from_states */ + ret = flb_slist_split_string(&rule->from_states, from_states, ',', -1); + if (ret <= 0) { + flb_error("[multiline] rule is empty or has invalid 'from_states' tokens"); + flb_ml_rule_destroy(rule); + return -1; + } + + /* Check if the rule contains a 'start_state' */ + if (get_start_state(&rule->from_states)) { + rule->start_state = FLB_TRUE; + } + else if (first_rule) { + flb_error("[multiline] rule don't contain a 'start_state'"); + flb_ml_rule_destroy(rule); + return -1; + } + + /* regex content pattern */ + rule->regex = flb_regex_create(regex_pattern); + if (!rule->regex) { + flb_ml_rule_destroy(rule); + return -1; + } + + /* to_state */ + if (to_state) { + rule->to_state = flb_sds_create(to_state); + if (!rule->to_state) { + flb_ml_rule_destroy(rule); + return -1; + } + } + + /* regex end pattern */ + if (end_pattern) { + rule->regex_end = flb_regex_create(end_pattern); + if (!rule->regex_end) { + flb_ml_rule_destroy(rule); + return -1; + } + } + + return 0; +} + +void flb_ml_rule_destroy(struct flb_ml_rule *rule) +{ + struct mk_list *tmp; + struct mk_list *head; + struct to_state *st; + + flb_slist_destroy(&rule->from_states); + + if (rule->regex) { + flb_regex_destroy(rule->regex); + } + + + if (rule->to_state) { + flb_sds_destroy(rule->to_state); + } + + mk_list_foreach_safe(head, tmp, &rule->to_state_map) { + st = mk_list_entry(head, struct to_state, _head); + mk_list_del(&st->_head); + flb_free(st); + } + + if (rule->regex_end) { + flb_regex_destroy(rule->regex_end); + } + + mk_list_del(&rule->_head); + flb_free(rule); +} + +void flb_ml_rule_destroy_all(struct flb_ml_parser *ml_parser) +{ + struct mk_list *tmp; + struct mk_list *head; + struct flb_ml_rule *rule; + + mk_list_foreach_safe(head, tmp, &ml_parser->regex_rules) { + rule = mk_list_entry(head, struct flb_ml_rule, _head); + flb_ml_rule_destroy(rule); + } +} + +static inline int to_states_exists(struct flb_ml_parser *ml_parser, + flb_sds_t state) +{ + struct mk_list *head; + struct mk_list *r_head; + struct flb_ml_rule *rule; + struct flb_slist_entry *e; + + mk_list_foreach(head, &ml_parser->regex_rules) { + rule = mk_list_entry(head, struct flb_ml_rule, _head); + + mk_list_foreach(r_head, &rule->from_states) { + e = mk_list_entry(r_head, struct flb_slist_entry, _head); + if (strcmp(e->str, state) == 0) { + return FLB_TRUE; + } + } + } + + return FLB_FALSE; +} + +static inline int to_states_matches_rule(struct flb_ml_rule *rule, + flb_sds_t state) +{ + struct mk_list *head; + struct flb_slist_entry *e; + + mk_list_foreach(head, &rule->from_states) { + e = mk_list_entry(head, struct flb_slist_entry, _head); + if (strcmp(e->str, state) == 0) { + return FLB_TRUE; + } + } + + return FLB_FALSE; +} + +static int set_to_state_map(struct flb_ml_parser *ml_parser, + struct flb_ml_rule *rule) +{ + int ret; + struct to_state *s; + struct mk_list *head; + struct flb_ml_rule *r; + + if (!rule->to_state) { + /* no to_state */ + return 0; + } + + /* Iterate all rules that matches the to_state */ + mk_list_foreach(head, &ml_parser->regex_rules) { + r = mk_list_entry(head, struct flb_ml_rule, _head); + + /* Check if rule->to_state, matches an existing (registered) from_state */ + ret = to_states_exists(ml_parser, rule->to_state); + if (!ret) { + flb_error("[multiline parser: %s] to_state='%s' is not registered", + ml_parser->name, rule->to_state); + return -1; + } + + /* + * A rule can have many 'from_states', check if the current 'rule->to_state' + * matches any 'r->from_states' + */ + ret = to_states_matches_rule(r, rule->to_state); + if (!ret) { + continue; + } + + /* We have a match. Create a 'to_state' entry into the 'to_state_map' list */ + s = flb_malloc(sizeof(struct to_state)); + if (!s) { + flb_errno(); + return -1; + } + s->rule = r; + mk_list_add(&s->_head, &rule->to_state_map); + } + + return 0; +} + +static int try_flushing_buffer(struct flb_ml_parser *ml_parser, + struct flb_ml_stream *mst, + struct flb_ml_stream_group *group) +{ + int next_start = FLB_FALSE; + struct mk_list *head; + struct to_state *st; + struct flb_ml_rule *rule; + + rule = group->rule_to_state; + if (!rule) { + if (flb_sds_len(group->buf) > 0) { + flb_ml_flush_stream_group(ml_parser, mst, group, FLB_FALSE); + group->first_line = FLB_TRUE; + } + return 0; + } + + /* Check if any 'to_state_map' referenced rules is a possible start */ + mk_list_foreach(head, &rule->to_state_map) { + st = mk_list_entry(head, struct to_state, _head); + if (st->rule->start_state) { + next_start = FLB_TRUE; + break; + } + } + + if (next_start && flb_sds_len(group->buf) > 0) { + flb_ml_flush_stream_group(ml_parser, mst, group, FLB_FALSE); + group->first_line = FLB_TRUE; + } + + return 0; +} + +/* Initialize all rules */ +int flb_ml_rule_init(struct flb_ml_parser *ml_parser) +{ + int ret; + struct mk_list *head; + struct flb_ml_rule *rule; + + /* FIXME: sort rules by start_state first (let's trust in the caller) */ + + /* For each rule, compose it to_state_map list */ + mk_list_foreach(head, &ml_parser->regex_rules) { + rule = mk_list_entry(head, struct flb_ml_rule, _head); + /* Populate 'rule->to_state_map' list */ + ret = set_to_state_map(ml_parser, rule); + if (ret == -1) { + return -1; + } + } + + return 0; +} + +/* Search any 'start_state' matching the incoming 'buf_data' */ +static struct flb_ml_rule *try_start_state(struct flb_ml_parser *ml_parser, + char *buf_data, size_t buf_size) +{ + int ret = -1; + struct mk_list *head; + struct flb_ml_rule *rule = NULL; + + mk_list_foreach(head, &ml_parser->regex_rules) { + rule = mk_list_entry(head, struct flb_ml_rule, _head); + + /* Is this rule matching a start_state ? */ + if (!rule->start_state) { + rule = NULL; + continue; + } + + /* Matched a start_state. Check if we have a regex match */ + ret = flb_regex_match(rule->regex, (unsigned char *) buf_data, buf_size); + if (ret) { + return rule; + } + } + + return NULL; +} + +int flb_ml_rule_process(struct flb_ml_parser *ml_parser, + struct flb_ml_stream *mst, + struct flb_ml_stream_group *group, + msgpack_object *full_map, + void *buf, size_t size, struct flb_time *tm, + msgpack_object *val_content, + msgpack_object *val_pattern) +{ + int ret; + int len; + char *buf_data = NULL; + size_t buf_size = 0; + struct mk_list *head; + struct to_state *st = NULL; + struct flb_ml_rule *rule = NULL; + struct flb_ml_rule *tmp_rule = NULL; + + if (val_content) { + buf_data = (char *) val_content->via.str.ptr; + buf_size = val_content->via.str.size; + } + else { + buf_data = buf; + buf_size = size; + } + + if (group->rule_to_state) { + /* are we in a continuation ? */ + tmp_rule = group->rule_to_state; + + /* Lookup all possible next rules by state reference */ + rule = NULL; + mk_list_foreach(head, &tmp_rule->to_state_map) { + st = mk_list_entry(head, struct to_state, _head); + + /* skip start states */ + if (st->rule->start_state) { + continue; + } + + /* Try regex match */ + ret = flb_regex_match(st->rule->regex, + (unsigned char *) buf_data, buf_size); + if (ret) { + /* Regex matched */ + len = flb_sds_len(group->buf); + if (len >= 1 && group->buf[len - 1] != '\n') { + flb_sds_cat_safe(&group->buf, "\n", 1); + } + + if (buf_size == 0) { + flb_sds_cat_safe(&group->buf, "\n", 1); + } + else { + flb_sds_cat_safe(&group->buf, buf_data, buf_size); + } + rule = st->rule; + break; + } + rule = NULL; + } + + } + + if (!rule) { + /* Check if we are in a 'start_state' */ + rule = try_start_state(ml_parser, buf_data, buf_size); + if (rule) { + /* if the group buffer has any previous data just flush it */ + if (flb_sds_len(group->buf) > 0) { + flb_ml_flush_stream_group(ml_parser, mst, group, FLB_FALSE); + } + + /* set the rule state */ + group->rule_to_state = rule; + + /* concatenate the data */ + flb_sds_cat_safe(&group->buf, buf_data, buf_size); + + /* Copy full map content in stream buffer */ + flb_ml_register_context(group, tm, full_map); + + return 0; + } + } + + if (rule) { + group->rule_to_state = rule; + try_flushing_buffer(ml_parser, mst, group); + return 0; + } + + return -1; +} diff --git a/fluent-bit/src/multiline/flb_ml_stream.c b/fluent-bit/src/multiline/flb_ml_stream.c new file mode 100644 index 000000000..c92ba3220 --- /dev/null +++ b/fluent-bit/src/multiline/flb_ml_stream.c @@ -0,0 +1,338 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_mem.h> +#include <fluent-bit/flb_log.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/multiline/flb_ml.h> +#include <fluent-bit/multiline/flb_ml_rule.h> +#include <cfl/cfl.h> + +static int ml_flush_stdout(struct flb_ml_parser *parser, + struct flb_ml_stream *mst, + void *data, char *buf_data, size_t buf_size) +{ + fprintf(stdout, "\n%s----- MULTILINE FLUSH -----%s\n", + ANSI_GREEN, ANSI_RESET); + + /* Print incoming flush buffer */ + flb_pack_print(buf_data, buf_size); + + fprintf(stdout, "%s----------- EOF -----------%s\n", + ANSI_GREEN, ANSI_RESET); + return 0; +} + +static struct flb_ml_stream_group *stream_group_create(struct flb_ml_stream *mst, + char *name, int len) +{ + struct flb_ml_stream_group *group; + + if (!name) { + name = "_default"; + } + + group = flb_calloc(1, sizeof(struct flb_ml_stream_group)); + if (!group) { + flb_errno(); + return NULL; + } + group->name = flb_sds_create_len(name, len); + if (!group->name) { + flb_free(group); + return NULL; + } + + /* status */ + group->first_line = FLB_TRUE; + + /* multiline buffer */ + group->buf = flb_sds_create_size(FLB_ML_BUF_SIZE); + if (!group->buf) { + flb_error("cannot allocate multiline stream buffer in group %s", name); + flb_sds_destroy(group->name); + flb_free(group); + return NULL; + } + + /* msgpack buffer */ + msgpack_sbuffer_init(&group->mp_md_sbuf); + msgpack_packer_init(&group->mp_md_pck, &group->mp_md_sbuf, msgpack_sbuffer_write); + + msgpack_sbuffer_init(&group->mp_sbuf); + msgpack_packer_init(&group->mp_pck, &group->mp_sbuf, msgpack_sbuffer_write); + + mk_list_add(&group->_head, &mst->groups); + + return group; +} + +struct flb_ml_stream_group *flb_ml_stream_group_get(struct flb_ml_parser_ins *parser_i, + struct flb_ml_stream *mst, + msgpack_object *group_name) +{ + int len; + char *name; + struct flb_ml_parser *mlp; + struct mk_list *head; + struct flb_ml_stream_group *group = NULL; + + mlp = parser_i->ml_parser; + + /* If key_group was not defined, we already have a default group */ + if (!mlp->key_group || !group_name) { + group = mk_list_entry_first(&mst->groups, + struct flb_ml_stream_group, + _head); + return group; + } + + /* Lookup for a candidate group */ + len = group_name->via.str.size; + name = (char *)group_name->via.str.ptr; + + mk_list_foreach(head, &mst->groups) { + group = mk_list_entry(head, struct flb_ml_stream_group, _head); + if (flb_sds_cmp(group->name, name, len) == 0) { + return group; + } + else { + group = NULL; + continue; + } + } + + /* No group has been found, create a new one */ + if (mk_list_size(&mst->groups) >= FLB_ML_MAX_GROUPS) { + flb_error("[multiline] stream %s exceeded number of allowed groups (%i)", + mst->name, FLB_ML_MAX_GROUPS); + return NULL; + } + + group = stream_group_create(mst, name, len); + return group; +} + +static void stream_group_destroy(struct flb_ml_stream_group *group) +{ + if (group->name) { + flb_sds_destroy(group->name); + } + if (group->buf) { + flb_sds_destroy(group->buf); + } + + msgpack_sbuffer_destroy(&group->mp_md_sbuf); + msgpack_sbuffer_destroy(&group->mp_sbuf); + + mk_list_del(&group->_head); + flb_free(group); +} + +static void stream_group_destroy_all(struct flb_ml_stream *mst) +{ + struct mk_list *tmp; + struct mk_list *head; + struct flb_ml_stream_group *group; + + mk_list_foreach_safe(head, tmp, &mst->groups) { + group = mk_list_entry(head, struct flb_ml_stream_group, _head); + stream_group_destroy(group); + } +} + +static int stream_group_init(struct flb_ml_stream *stream) +{ + struct flb_ml_stream_group *group = NULL; + + mk_list_init(&stream->groups); + + /* create a default group */ + group = stream_group_create(stream, NULL, 0); + if (!group) { + flb_error("[multiline] error initializing default group for " + "stream '%s'", stream->name); + return -1; + } + + return 0; +} + +static struct flb_ml_stream *stream_create(struct flb_ml *ml, + uint64_t id, + struct flb_ml_parser_ins *parser, + int (*cb_flush) (struct flb_ml_parser *, + struct flb_ml_stream *, + void *cb_data, + char *buf_data, + size_t buf_size), + void *cb_data) +{ + int ret; + struct flb_ml_stream *stream; + + stream = flb_calloc(1, sizeof(struct flb_ml_stream)); + if (!stream) { + flb_errno(); + return NULL; + } + stream->ml = ml; + stream->id = id; + stream->parser = parser; + + /* Flush Callback and opaque data type */ + if (cb_flush) { + stream->cb_flush = cb_flush; + } + else { + stream->cb_flush = ml_flush_stdout; + } + stream->cb_data = cb_data; + + ret = stream_group_init(stream); + if (ret != 0) { + flb_free(stream); + return NULL; + } + + mk_list_add(&stream->_head, &parser->streams); + return stream; +} + +int flb_ml_stream_create(struct flb_ml *ml, + char *name, + int name_len, + int (*cb_flush) (struct flb_ml_parser *, + struct flb_ml_stream *, + void *cb_data, + char *buf_data, + size_t buf_size), + void *cb_data, + uint64_t *stream_id) +{ + uint64_t id; + struct mk_list *head; + struct mk_list *head_group; + struct flb_ml_stream *mst; + struct flb_ml_group *group; + struct flb_ml_parser_ins *parser; + + if (!name) { + return -1; + } + + if (name_len <= 0) { + name_len = strlen(name); + } + + /* Set the stream id by creating a hash using the name */ + id = cfl_hash_64bits(name, name_len); + + /* For every group and parser, create a stream for this stream_id/hash */ + mk_list_foreach(head, &ml->groups) { + group = mk_list_entry(head, struct flb_ml_group, _head); + mk_list_foreach(head_group, &group->parsers) { + parser = mk_list_entry(head_group, struct flb_ml_parser_ins, _head); + + /* Check if the stream already exists on the parser */ + if (flb_ml_stream_get(parser, id) != NULL) { + continue; + } + + /* Create the stream */ + mst = stream_create(ml, id, parser, cb_flush, cb_data); + if (!mst) { + flb_error("[multiline] could not create stream_id=%" PRIu64 + "for stream '%s' on parser '%s'", + *stream_id, name, parser->ml_parser->name); + return -1; + } + } + } + + *stream_id = id; + return 0; +} + +struct flb_ml_stream *flb_ml_stream_get(struct flb_ml_parser_ins *parser, + uint64_t stream_id) +{ + struct mk_list *head; + struct flb_ml_stream *mst = NULL; + + mk_list_foreach(head, &parser->streams) { + mst = mk_list_entry(head, struct flb_ml_stream, _head); + if (mst->id == stream_id) { + return mst; + } + } + + return NULL; +} + +void flb_ml_stream_id_destroy_all(struct flb_ml *ml, uint64_t stream_id) +{ + struct mk_list *tmp; + struct mk_list *head; + struct mk_list *head_group; + struct mk_list *head_stream; + struct flb_ml_group *group; + struct flb_ml_stream *mst; + struct flb_ml_parser_ins *parser_i; + + /* groups */ + mk_list_foreach(head, &ml->groups) { + group = mk_list_entry(head, struct flb_ml_group, _head); + + /* parser instances */ + mk_list_foreach(head_group, &group->parsers) { + parser_i = mk_list_entry(head_group, struct flb_ml_parser_ins, _head); + + /* streams */ + mk_list_foreach_safe(head_stream, tmp, &parser_i->streams) { + mst = mk_list_entry(head_stream, struct flb_ml_stream, _head); + if (mst->id != stream_id) { + continue; + } + + /* flush any pending data */ + flb_ml_flush_parser_instance(ml, parser_i, stream_id, FLB_TRUE); + + /* destroy internal groups of the stream */ + flb_ml_stream_destroy(mst); + } + } + } +} + +int flb_ml_stream_destroy(struct flb_ml_stream *mst) +{ + mk_list_del(&mst->_head); + if (mst->name) { + flb_sds_destroy(mst->name); + } + + /* destroy groups */ + stream_group_destroy_all(mst); + + flb_free(mst); + + return 0; +} |