summaryrefslogtreecommitdiffstats
path: root/fluent-bit/src/multiline
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/src/multiline')
-rw-r--r--fluent-bit/src/multiline/CMakeLists.txt15
-rw-r--r--fluent-bit/src/multiline/flb_ml.c1562
-rw-r--r--fluent-bit/src/multiline/flb_ml_group.c86
-rw-r--r--fluent-bit/src/multiline/flb_ml_mode.c111
-rw-r--r--fluent-bit/src/multiline/flb_ml_parser.c347
-rw-r--r--fluent-bit/src/multiline/flb_ml_parser_cri.c81
-rw-r--r--fluent-bit/src/multiline/flb_ml_parser_docker.c110
-rw-r--r--fluent-bit/src/multiline/flb_ml_parser_go.c140
-rw-r--r--fluent-bit/src/multiline/flb_ml_parser_java.c143
-rw-r--r--fluent-bit/src/multiline/flb_ml_parser_python.c98
-rw-r--r--fluent-bit/src/multiline/flb_ml_parser_ruby.c87
-rw-r--r--fluent-bit/src/multiline/flb_ml_rule.c421
-rw-r--r--fluent-bit/src/multiline/flb_ml_stream.c338
13 files changed, 3539 insertions, 0 deletions
diff --git a/fluent-bit/src/multiline/CMakeLists.txt b/fluent-bit/src/multiline/CMakeLists.txt
new file mode 100644
index 000000000..294ef3e8f
--- /dev/null
+++ b/fluent-bit/src/multiline/CMakeLists.txt
@@ -0,0 +1,15 @@
+set(src_multiline
+ # built-in parsers
+ multiline/flb_ml_parser_cri.c
+ multiline/flb_ml_parser_docker.c
+ multiline/flb_ml_parser_python.c
+ multiline/flb_ml_parser_java.c
+ multiline/flb_ml_parser_go.c
+ multiline/flb_ml_parser_ruby.c
+ # core
+ multiline/flb_ml_stream.c
+ multiline/flb_ml_parser.c
+ multiline/flb_ml_group.c
+ multiline/flb_ml_rule.c
+ multiline/flb_ml.c PARENT_SCOPE
+ )
diff --git a/fluent-bit/src/multiline/flb_ml.c b/fluent-bit/src/multiline/flb_ml.c
new file mode 100644
index 000000000..28e123cee
--- /dev/null
+++ b/fluent-bit/src/multiline/flb_ml.c
@@ -0,0 +1,1562 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/flb_regex.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_scheduler.h>
+#include <fluent-bit/multiline/flb_ml.h>
+#include <fluent-bit/multiline/flb_ml_rule.h>
+#include <fluent-bit/multiline/flb_ml_group.h>
+
+#include <stdarg.h>
+#include <math.h>
+
+static inline int match_negate(struct flb_ml_parser *ml_parser, int matched)
+{
+ int rule_match = matched;
+
+ /* Validate pattern matching against expected 'negate' condition */
+ if (matched == FLB_TRUE) {
+ if (ml_parser->negate == FLB_FALSE) {
+ rule_match = FLB_TRUE;
+ }
+ else {
+ rule_match = FLB_FALSE;
+ }
+ }
+ else {
+ if (ml_parser->negate == FLB_TRUE) {
+ rule_match = FLB_TRUE;
+ }
+ }
+
+ return rule_match;
+}
+
+static uint64_t time_ms_now()
+{
+ uint64_t ms;
+ struct flb_time tm;
+
+ flb_time_get(&tm);
+ ms = (tm.tm.tv_sec * 1000) + lround(tm.tm.tv_nsec/1.0e6);
+ return ms;
+}
+
+
+int flb_ml_flush_stdout(struct flb_ml_parser *parser,
+ struct flb_ml_stream *mst,
+ void *data, char *buf_data, size_t buf_size)
+{
+ fprintf(stdout, "\n%s----- MULTILINE FLUSH (stream_id=%" PRIu64 ") -----%s\n",
+ ANSI_GREEN, mst->id, ANSI_RESET);
+
+ /* Print incoming flush buffer */
+ flb_pack_print(buf_data, buf_size);
+
+ fprintf(stdout, "%s----------- EOF -----------%s\n",
+ ANSI_GREEN, ANSI_RESET);
+ return 0;
+}
+
+int flb_ml_type_lookup(char *str)
+{
+ int type = -1;
+
+ if (strcasecmp(str, "regex") == 0) {
+ type = FLB_ML_REGEX;
+ }
+ else if (strcasecmp(str, "endswith") == 0) {
+ type = FLB_ML_ENDSWITH;
+ }
+ else if (strcasecmp(str, "equal") == 0 || strcasecmp(str, "eq") == 0) {
+ type = FLB_ML_EQ;
+ }
+
+ return type;
+}
+
+void flb_ml_flush_parser_instance(struct flb_ml *ml,
+ struct flb_ml_parser_ins *parser_i,
+ uint64_t stream_id, int forced_flush)
+{
+ struct mk_list *head;
+ struct mk_list *head_group;
+ struct flb_ml_stream *mst;
+ struct flb_ml_stream_group *group;
+
+ mk_list_foreach(head, &parser_i->streams) {
+ mst = mk_list_entry(head, struct flb_ml_stream, _head);
+ if (stream_id != 0 && mst->id != stream_id) {
+ continue;
+ }
+
+ /* Iterate stream groups */
+ mk_list_foreach(head_group, &mst->groups) {
+ group = mk_list_entry(head_group, struct flb_ml_stream_group, _head);
+ flb_ml_flush_stream_group(parser_i->ml_parser, mst, group, forced_flush);
+ }
+ }
+}
+
+void flb_ml_flush_pending(struct flb_ml *ml, uint64_t now, int forced_flush)
+{
+ struct mk_list *head;
+ struct flb_ml_parser_ins *parser_i;
+ struct flb_ml_group *group;
+
+ /* set the last flush time */
+ ml->last_flush = now;
+
+ /* flush only the first group of the context */
+ group = mk_list_entry_first(&ml->groups, struct flb_ml_group, _head);
+
+ /* iterate group parser instances */
+ mk_list_foreach(head, &group->parsers) {
+ parser_i = mk_list_entry(head, struct flb_ml_parser_ins, _head);
+ flb_ml_flush_parser_instance(ml, parser_i, 0, forced_flush);
+ }
+}
+
+void flb_ml_flush_pending_now(struct flb_ml *ml)
+{
+ uint64_t now;
+
+ now = time_ms_now();
+ flb_ml_flush_pending(ml, now, FLB_TRUE);
+}
+
+static void cb_ml_flush_timer(struct flb_config *ctx, void *data)
+{
+ uint64_t now;
+ struct flb_ml *ml = data;
+
+ now = time_ms_now();
+ if (ml->last_flush + ml->flush_ms > now) {
+ return;
+ }
+
+ /*
+ * Iterate over all streams and groups and for a flush for expired groups
+ * which has not flushed in the last N milliseconds.
+ */
+ flb_ml_flush_pending(ml, now, FLB_TRUE);
+}
+
+int flb_ml_register_context(struct flb_ml_stream_group *group,
+ struct flb_time *tm, msgpack_object *map)
+{
+ if (tm) {
+ flb_time_copy(&group->mp_time, tm);
+ }
+
+ if (map) {
+ msgpack_pack_object(&group->mp_pck, *map);
+ }
+
+ return 0;
+}
+
+static inline void breakline_prepare(struct flb_ml_parser_ins *parser_i,
+ struct flb_ml_stream_group *stream_group)
+{
+ int len;
+
+ if (parser_i->key_content) {
+ return;
+ }
+
+ len = flb_sds_len(stream_group->buf);
+ if (len <= 0) {
+ return;
+ }
+
+ if (stream_group->buf[len - 1] != '\n') {
+ flb_sds_cat_safe(&stream_group->buf, "\n", 1);
+ }
+}
+
+/*
+ * package content into a multiline stream:
+ *
+ * full_map: if the original content to process comes in msgpack map, this variable
+ * reference the map. It's only used in case we will package a first line so we
+ * store a copy of the other key values in the map for flush time.
+ */
+static int package_content(struct flb_ml_stream *mst,
+ msgpack_object *metadata,
+ msgpack_object *full_map,
+ void *buf, size_t size, struct flb_time *tm,
+ msgpack_object *val_content,
+ msgpack_object *val_pattern,
+ msgpack_object *val_group)
+{
+ int len;
+ int ret;
+ int rule_match = FLB_FALSE;
+ int processed = FLB_FALSE;
+ int type;
+ size_t offset = 0;
+ size_t buf_size;
+ char *buf_data;
+ msgpack_object *val = val_content;
+ struct flb_ml_parser *parser;
+ struct flb_ml_parser_ins *parser_i;
+ struct flb_ml_stream_group *stream_group;
+
+ parser_i = mst->parser;
+ parser = parser_i->ml_parser;
+
+ /* Get stream group */
+ stream_group = flb_ml_stream_group_get(mst->parser, mst, val_group);
+ if (!mst->last_stream_group) {
+ mst->last_stream_group = stream_group;
+ }
+ else {
+ if (mst->last_stream_group != stream_group) {
+ mst->last_stream_group = stream_group;
+ }
+ }
+
+ /* Set the parser type */
+ type = parser->type;
+
+ if (val_pattern) {
+ val = val_pattern;
+ }
+
+ if (val) {
+ buf_data = (char *) val->via.str.ptr;
+ buf_size = val->via.str.size;
+ }
+ else {
+ buf_data = buf;
+ buf_size = size;
+
+ }
+ if (type == FLB_ML_REGEX) {
+ ret = flb_ml_rule_process(parser, mst,
+ stream_group, full_map, buf, size, tm,
+ val_content, val_pattern);
+ if (ret == -1) {
+ processed = FLB_FALSE;
+ }
+ else {
+ processed = FLB_TRUE;
+ }
+ }
+ else if (type == FLB_ML_ENDSWITH) {
+ len = flb_sds_len(parser->match_str);
+ if (buf_data && len <= buf_size) {
+ /* Validate if content ends with expected string */
+ offset = buf_size - len;
+ ret = memcmp(buf_data + offset, parser->match_str, len);
+ if (ret == 0) {
+ rule_match = match_negate(parser, FLB_TRUE);
+ }
+ else {
+ rule_match = match_negate(parser, FLB_FALSE);
+ }
+
+ if (stream_group->mp_sbuf.size == 0) {
+ flb_ml_register_context(stream_group, tm, full_map);
+ }
+
+ /* Prepare concatenation */
+ breakline_prepare(parser_i, stream_group);
+
+ /* Concatenate value */
+ if (val_content) {
+ flb_sds_cat_safe(&stream_group->buf,
+ val_content->via.str.ptr,
+ val_content->via.str.size);
+ }
+ else {
+ flb_sds_cat_safe(&stream_group->buf, buf_data, buf_size);
+ }
+
+ /* on ENDSWITH mode, a rule match means flush the content */
+ if (rule_match) {
+ flb_ml_flush_stream_group(parser, mst, stream_group, FLB_FALSE);
+ }
+ processed = FLB_TRUE;
+ }
+ }
+ else if (type == FLB_ML_EQ) {
+ if (buf_size == flb_sds_len(parser->match_str) &&
+ memcmp(buf_data, parser->match_str, buf_size) == 0) {
+ /* EQ match */
+ rule_match = match_negate(parser, FLB_TRUE);
+ }
+ else {
+ rule_match = match_negate(parser, FLB_FALSE);
+ }
+
+ if (stream_group->mp_sbuf.size == 0) {
+ flb_ml_register_context(stream_group, tm, full_map);
+ }
+
+ /* Prepare concatenation */
+ breakline_prepare(parser_i, stream_group);
+
+ /* Concatenate value */
+ if (val_content) {
+ flb_sds_cat_safe(&stream_group->buf,
+ val_content->via.str.ptr,
+ val_content->via.str.size);
+ }
+ else {
+ flb_sds_cat_safe(&stream_group->buf, buf_data, buf_size);
+ }
+
+ /* on ENDSWITH mode, a rule match means flush the content */
+ if (rule_match) {
+ flb_ml_flush_stream_group(parser, mst, stream_group, FLB_FALSE);
+ }
+ processed = FLB_TRUE;
+ }
+
+ if (processed && metadata != NULL) {
+ msgpack_pack_object(&stream_group->mp_md_pck, *metadata);
+ }
+
+ return processed;
+}
+
+/*
+ * Retrieve the ID of a specific key name in a map. This function might be
+ * extended later to use record accessor, since all current cases are solved
+ * now quering the first level of keys in the map, we avoid record accessor
+ * to avoid extra memory allocations.
+ */
+static int get_key_id(msgpack_object *map, flb_sds_t key_name)
+{
+ int i;
+ int len;
+ int found = FLB_FALSE;
+ msgpack_object key;
+ msgpack_object val;
+
+ if (!key_name) {
+ return -1;
+ }
+
+ len = flb_sds_len(key_name);
+ for (i = 0; i < map->via.map.size; i++) {
+ key = map->via.map.ptr[i].key;
+ val = map->via.map.ptr[i].val;
+
+ if (key.type != MSGPACK_OBJECT_STR || val.type != MSGPACK_OBJECT_STR) {
+ continue;
+ }
+
+ if (key.via.str.size != len) {
+ continue;
+ }
+
+ if (strncmp(key.via.str.ptr, key_name, len) == 0) {
+ found = FLB_TRUE;
+ break;
+ }
+ }
+
+ if (found) {
+ return i;
+ }
+
+ return -1;
+}
+
+static int process_append(struct flb_ml_parser_ins *parser_i,
+ struct flb_ml_stream *mst,
+ int type,
+ struct flb_time *tm,
+ msgpack_object *metadata,
+ msgpack_object *obj,
+ void *buf, size_t size)
+{
+ int ret;
+ int id_content = -1;
+ int id_pattern = -1;
+ int id_group = -1;
+ int unpacked = FLB_FALSE;
+ size_t off = 0;
+ msgpack_object *full_map = NULL;
+ msgpack_object *val_content = NULL;
+ msgpack_object *val_pattern = NULL;
+ msgpack_object *val_group = NULL;
+ msgpack_unpacked result;
+
+ /* Lookup the key */
+ if (type == FLB_ML_TYPE_TEXT) {
+ ret = package_content(mst, NULL, NULL, buf, size, tm, NULL, NULL, NULL);
+ if (ret == FLB_FALSE) {
+ return -1;
+ }
+ return 0;
+ }
+ else if (type == FLB_ML_TYPE_MAP) {
+ full_map = obj;
+ /*
+ * When full_map and buf is not NULL,
+ * we use 'buf' since buf is already processed from full_map at
+ * ml_append_try_parser_type_map.
+ */
+ if (!full_map || (buf != NULL && full_map != NULL)) {
+ off = 0;
+ msgpack_unpacked_init(&result);
+ ret = msgpack_unpack_next(&result, buf, size, &off);
+ if (ret != MSGPACK_UNPACK_SUCCESS) {
+ return -1;
+ }
+ full_map = &result.data;
+ unpacked = FLB_TRUE;
+ }
+ else if (full_map->type != MSGPACK_OBJECT_MAP) {
+ msgpack_unpacked_destroy(&result);
+ return -1;
+ }
+ }
+
+ /* Lookup for key_content entry */
+ id_content = get_key_id(full_map, parser_i->key_content);
+ if (id_content == -1) {
+ if (unpacked) {
+ msgpack_unpacked_destroy(&result);
+ }
+ return -1;
+ }
+
+ val_content = &full_map->via.map.ptr[id_content].val;
+ if (val_content->type != MSGPACK_OBJECT_STR) {
+ val_content = NULL;
+ }
+
+ /* Optional: Lookup for key_pattern entry */
+ if (parser_i->key_pattern) {
+ id_pattern = get_key_id(full_map, parser_i->key_pattern);
+ if (id_pattern >= 0) {
+ val_pattern = &full_map->via.map.ptr[id_pattern].val;
+ if (val_pattern->type != MSGPACK_OBJECT_STR) {
+ val_pattern = NULL;
+ }
+ }
+ }
+
+ /* Optional: lookup for key_group entry */
+ if (parser_i->key_group) {
+ id_group = get_key_id(full_map, parser_i->key_group);
+ if (id_group >= 0) {
+ val_group = &full_map->via.map.ptr[id_group].val;
+ if (val_group->type != MSGPACK_OBJECT_STR) {
+ val_group = NULL;
+ }
+ }
+ }
+
+ /* Package the content */
+ ret = package_content(mst, metadata, full_map, buf, size, tm,
+ val_content, val_pattern, val_group);
+ if (unpacked) {
+ msgpack_unpacked_destroy(&result);
+ }
+ if (ret == FLB_FALSE) {
+ return -1;
+ }
+ return 0;
+}
+
+static int ml_append_try_parser_type_text(struct flb_ml_parser_ins *parser,
+ uint64_t stream_id,
+ int *type,
+ struct flb_time *tm, void *buf, size_t size,
+ msgpack_object *map,
+ void **out_buf, size_t *out_size, int *out_release,
+ struct flb_time *out_time)
+{
+ int ret;
+
+ if (parser->ml_parser->parser) {
+ /* Parse incoming content */
+ ret = flb_parser_do(parser->ml_parser->parser, (char *) buf, size,
+ out_buf, out_size, out_time);
+ if (flb_time_to_nanosec(out_time) == 0L) {
+ flb_time_copy(out_time, tm);
+ }
+ if (ret >= 0) {
+ *out_release = FLB_TRUE;
+ *type = FLB_ML_TYPE_MAP;
+ }
+ else {
+ *out_buf = buf;
+ *out_size = size;
+ return -1;
+ }
+ }
+ else {
+ *out_buf = buf;
+ *out_size = size;
+ }
+ return 0;
+}
+
+static int ml_append_try_parser_type_map(struct flb_ml_parser_ins *parser,
+ uint64_t stream_id,
+ int *type,
+ struct flb_time *tm, void *buf, size_t size,
+ msgpack_object *map,
+ void **out_buf, size_t *out_size, int *out_release,
+ struct flb_time *out_time)
+{
+ int map_size;
+ int i;
+ int len;
+ msgpack_object key;
+ msgpack_object val;
+
+ if (map == NULL || map->type != MSGPACK_OBJECT_MAP) {
+ flb_error("%s:invalid map", __FUNCTION__);
+ return -1;
+ }
+
+ if (parser->ml_parser->parser) {
+ /* lookup key_content */
+
+ len = flb_sds_len(parser->key_content);
+ map_size = map->via.map.size;
+ for(i = 0; i < map_size; i++) {
+ key = map->via.map.ptr[i].key;
+ val = map->via.map.ptr[i].val;
+ if (key.type == MSGPACK_OBJECT_STR &&
+ parser->key_content &&
+ key.via.str.size == len &&
+ strncmp(key.via.str.ptr, parser->key_content, len) == 0) {
+ /* key_content found */
+ if (val.type == MSGPACK_OBJECT_STR) {
+ /* try parse the value of key_content e*/
+ return ml_append_try_parser_type_text(parser, stream_id, type,
+ tm, (void*) val.via.str.ptr,
+ val.via.str.size,
+ map,
+ out_buf, out_size, out_release,
+ out_time);
+ } else {
+ flb_error("%s: not string", __FUNCTION__);
+ return -1;
+ }
+ }
+ }
+ }
+ else {
+ *out_buf = buf;
+ *out_size = size;
+ }
+ return 0;
+}
+
+static int ml_append_try_parser(struct flb_ml_parser_ins *parser,
+ uint64_t stream_id,
+ int type,
+ struct flb_time *tm, void *buf, size_t size,
+ msgpack_object *metadata,
+ msgpack_object *map)
+{
+ int ret;
+ int release = FLB_FALSE;
+ void *out_buf = NULL;
+ size_t out_size = 0;
+ struct flb_time out_time;
+ struct flb_ml_stream *mst;
+
+ flb_time_zero(&out_time);
+
+ switch (type) {
+ case FLB_ML_TYPE_TEXT:
+ ret = ml_append_try_parser_type_text(parser, stream_id, &type,
+ tm, buf, size, map,
+ &out_buf, &out_size, &release,
+ &out_time);
+ if (ret < 0) {
+ return -1;
+ }
+ break;
+ case FLB_ML_TYPE_MAP:
+ ret = ml_append_try_parser_type_map(parser, stream_id, &type,
+ tm, buf, size, map,
+ &out_buf, &out_size, &release,
+ &out_time);
+ if (ret < 0) {
+ return -1;
+ }
+ break;
+
+ default:
+ flb_error("[multiline] unknown type=%d", type);
+ return -1;
+ }
+
+ if (flb_time_to_nanosec(&out_time) == 0L) {
+ if (tm && flb_time_to_nanosec(tm) != 0L) {
+ flb_time_copy(&out_time, tm);
+ }
+ else {
+ flb_time_get(&out_time);
+ }
+ }
+
+ /* Get the stream */
+ mst = flb_ml_stream_get(parser, stream_id);
+ if (!mst) {
+ flb_error("[multiline] invalid stream_id %" PRIu64 ", could not "
+ "append content to multiline context", stream_id);
+ goto exit;
+ }
+
+ /* Process the binary record */
+ ret = process_append(parser, mst, type, &out_time, metadata, map, out_buf, out_size);
+ if (ret == -1) {
+ if (release == FLB_TRUE) {
+ flb_free(out_buf);
+ }
+ return -1;
+ }
+
+ exit:
+ if (release == FLB_TRUE) {
+ flb_free(out_buf);
+ }
+
+ return 0;
+}
+
+int flb_ml_append_text(struct flb_ml *ml, uint64_t stream_id,
+ struct flb_time *tm, void *buf, size_t size)
+{
+ int ret;
+ int processed = FLB_FALSE;
+ struct mk_list *head;
+ struct mk_list *head_group;
+ struct flb_ml_group *group;
+ struct flb_ml_stream *mst;
+ struct flb_ml_parser_ins *lru_parser = NULL;
+ struct flb_ml_parser_ins *parser_i;
+ struct flb_time out_time;
+ struct flb_ml_stream_group *st_group;
+ int type;
+
+ type = FLB_ML_TYPE_TEXT;
+
+ flb_time_zero(&out_time);
+
+ mk_list_foreach(head, &ml->groups) {
+ group = mk_list_entry(head, struct flb_ml_group, _head);
+
+ /* Check if the incoming data matches the last recently used parser */
+ lru_parser = group->lru_parser;
+
+ if (lru_parser && lru_parser->last_stream_id == stream_id) {
+ ret = ml_append_try_parser(lru_parser, lru_parser->last_stream_id, type,
+ tm, buf, size, NULL, NULL);
+ if (ret == 0) {
+ processed = FLB_TRUE;
+ break;
+ }
+ else {
+ flb_ml_flush_parser_instance(ml,
+ lru_parser,
+ lru_parser->last_stream_id,
+ FLB_FALSE);
+ }
+ }
+ else if (lru_parser && lru_parser->last_stream_id > 0) {
+ /*
+ * Clear last recently used parser to match new parser.
+ * Do not flush last_stream_id since it should continue to parsing.
+ */
+ lru_parser = NULL;
+ }
+ }
+
+ mk_list_foreach(head_group, &group->parsers) {
+ parser_i = mk_list_entry(head_group, struct flb_ml_parser_ins, _head);
+ if (lru_parser && lru_parser == parser_i &&
+ lru_parser->last_stream_id == stream_id) {
+ continue;
+ }
+
+ ret = ml_append_try_parser(parser_i, stream_id, type,
+ tm, buf, size, NULL, NULL);
+ if (ret == 0) {
+ group->lru_parser = parser_i;
+ group->lru_parser->last_stream_id = stream_id;
+ lru_parser = parser_i;
+ processed = FLB_TRUE;
+ break;
+ }
+ else {
+ parser_i = NULL;
+ }
+ }
+
+ if (!processed) {
+ if (lru_parser) {
+ flb_ml_flush_parser_instance(ml, lru_parser, stream_id, FLB_FALSE);
+ parser_i = lru_parser;
+ }
+ else {
+ /* get the first parser (just to make use of it buffers) */
+ parser_i = mk_list_entry_first(&group->parsers,
+ struct flb_ml_parser_ins,
+ _head);
+ }
+
+ flb_ml_flush_parser_instance(ml, parser_i, stream_id, FLB_FALSE);
+ mst = flb_ml_stream_get(parser_i, stream_id);
+ if (!mst) {
+ flb_error("[multiline] invalid stream_id %" PRIu64 ", could not "
+ "append content to multiline context", stream_id);
+ return -1;
+ }
+
+ /* Get stream group */
+ st_group = flb_ml_stream_group_get(mst->parser, mst, NULL);
+ flb_sds_cat_safe(&st_group->buf, buf, size);
+ flb_ml_flush_stream_group(parser_i->ml_parser, mst, st_group, FLB_FALSE);
+ }
+
+ return 0;
+}
+
+
+
+int flb_ml_append_object(struct flb_ml *ml,
+ uint64_t stream_id,
+ struct flb_time *tm,
+ msgpack_object *metadata,
+ msgpack_object *obj)
+{
+ int ret;
+ int type;
+ int processed = FLB_FALSE;
+ struct mk_list *head;
+ struct mk_list *head_group;
+ struct flb_ml_group *group;
+ struct flb_ml_parser_ins *lru_parser = NULL;
+ struct flb_ml_parser_ins *parser_i;
+ struct flb_ml_stream *mst;
+ struct flb_ml_stream_group *st_group;
+ struct flb_log_event event;
+
+ if (metadata == NULL) {
+ metadata = ml->log_event_decoder.empty_map;
+ }
+
+ /*
+ * As incoming objects, we accept packed events
+ * and msgpack Maps containing key/value pairs.
+ */
+ if (obj->type == MSGPACK_OBJECT_ARRAY) {
+ flb_error("[multiline] appending object with invalid type, expected "
+ "map, received type=%i", obj->type);
+ return -1;
+
+
+ flb_log_event_decoder_reset(&ml->log_event_decoder, NULL, 0);
+
+ ret = flb_event_decoder_decode_object(&ml->log_event_decoder,
+ &event,
+ obj);
+
+ if (ret != FLB_EVENT_DECODER_SUCCESS) {
+ flb_error("[multiline] invalid event object");
+
+ return -1;
+ }
+
+ tm = &event.timestamp;
+ obj = event.body;
+ metadata = event.metadata;
+
+ type = FLB_ML_TYPE_MAP;
+ }
+ else if (obj->type == MSGPACK_OBJECT_MAP) {
+ type = FLB_ML_TYPE_MAP;
+ }
+ else {
+ flb_error("[multiline] appending object with invalid type, expected "
+ "array or map, received type=%i", obj->type);
+ return -1;
+ }
+
+ mk_list_foreach(head, &ml->groups) {
+ group = mk_list_entry(head, struct flb_ml_group, _head);
+
+ /* Check if the incoming data matches the last recently used parser */
+ lru_parser = group->lru_parser;
+
+ if (lru_parser && lru_parser->last_stream_id == stream_id) {
+ ret = ml_append_try_parser(lru_parser, lru_parser->last_stream_id, type,
+ tm, NULL, 0, metadata, obj);
+ if (ret == 0) {
+ processed = FLB_TRUE;
+ break;
+ }
+ else {
+ flb_ml_flush_parser_instance(ml,
+ lru_parser,
+ lru_parser->last_stream_id,
+ FLB_FALSE);
+ }
+ }
+ else if (lru_parser && lru_parser->last_stream_id > 0) {
+ /*
+ * Clear last recently used parser to match new parser.
+ * Do not flush last_stream_id since it should continue to parsing.
+ */
+ lru_parser = NULL;
+ }
+ }
+
+ mk_list_foreach(head_group, &group->parsers) {
+ parser_i = mk_list_entry(head_group, struct flb_ml_parser_ins, _head);
+ if (lru_parser && parser_i == lru_parser) {
+ continue;
+ }
+
+ ret = ml_append_try_parser(parser_i, stream_id, type,
+ tm, NULL, 0, metadata, obj);
+ if (ret == 0) {
+ group->lru_parser = parser_i;
+ group->lru_parser->last_stream_id = stream_id;
+ lru_parser = parser_i;
+ processed = FLB_TRUE;
+ break;
+ }
+ else {
+ parser_i = NULL;
+ }
+ }
+
+ if (!processed) {
+ if (lru_parser) {
+ flb_ml_flush_parser_instance(ml, lru_parser, stream_id, FLB_FALSE);
+ parser_i = lru_parser;
+ }
+ else {
+ /* get the first parser (just to make use of it buffers) */
+ parser_i = mk_list_entry_first(&group->parsers,
+ struct flb_ml_parser_ins,
+ _head);
+ }
+
+ flb_ml_flush_parser_instance(ml, parser_i, stream_id, FLB_FALSE);
+ mst = flb_ml_stream_get(parser_i, stream_id);
+ if (!mst) {
+ flb_error("[multiline] invalid stream_id %" PRIu64 ", could not "
+ "append content to multiline context", stream_id);
+
+ return -1;
+ }
+
+ /* Get stream group */
+ st_group = flb_ml_stream_group_get(mst->parser, mst, NULL);
+
+ ret = flb_log_event_encoder_begin_record(&ml->log_event_encoder);
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_set_timestamp(
+ &ml->log_event_encoder, tm);
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ if (metadata != ml->log_event_decoder.empty_map) {
+ ret = flb_log_event_encoder_set_metadata_from_msgpack_object(
+ &ml->log_event_encoder, metadata);
+ }
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_set_body_from_msgpack_object(
+ &ml->log_event_encoder, obj);
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_commit_record(&ml->log_event_encoder);
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ mst->cb_flush(parser_i->ml_parser,
+ mst,
+ mst->cb_data,
+ ml->log_event_encoder.output_buffer,
+ ml->log_event_encoder.output_length);
+ }
+ else {
+ flb_error("[multiline] log event encoder error : %d", ret);
+ }
+
+ flb_log_event_encoder_reset(&ml->log_event_encoder);
+
+ /* reset group buffer counters */
+ st_group->mp_sbuf.size = 0;
+ flb_sds_len_set(st_group->buf, 0);
+
+ /* Update last flush time */
+ st_group->last_flush = time_ms_now();
+ }
+
+ return 0;
+}
+
+int flb_ml_append_event(struct flb_ml *ml, uint64_t stream_id,
+ struct flb_log_event *event)
+{
+ return flb_ml_append_object(ml,
+ stream_id,
+ &event->timestamp,
+ event->metadata,
+ event->body);
+}
+
+
+struct flb_ml *flb_ml_create(struct flb_config *ctx, char *name)
+{
+ int result;
+ struct flb_ml *ml;
+
+ ml = flb_calloc(1, sizeof(struct flb_ml));
+ if (!ml) {
+ flb_errno();
+ return NULL;
+ }
+ ml->name = flb_sds_create(name);
+ if (!ml) {
+ flb_free(ml);
+ return NULL;
+ }
+
+ ml->config = ctx;
+ ml->last_flush = time_ms_now();
+ mk_list_init(&ml->groups);
+
+ result = flb_log_event_decoder_init(&ml->log_event_decoder,
+ NULL,
+ 0);
+
+ if (result != FLB_EVENT_DECODER_SUCCESS) {
+ flb_error("cannot initialize log event decoder");
+
+ flb_ml_destroy(ml);
+
+ return NULL;
+ }
+
+ result = flb_log_event_encoder_init(&ml->log_event_encoder,
+ FLB_LOG_EVENT_FORMAT_DEFAULT);
+
+ if (result != FLB_EVENT_ENCODER_SUCCESS) {
+ flb_error("cannot initialize log event encoder");
+
+ flb_ml_destroy(ml);
+
+ return NULL;
+ }
+
+ return ml;
+}
+
+/*
+ * Some multiline contexts might define a parser name but not a parser context,
+ * for missing contexts, just lookup the parser and perform the assignment.
+ *
+ * The common use case is when reading config files with [PARSER] and [MULTILINE_PARSER]
+ * definitions, so we need to delay the parser loading.
+ */
+int flb_ml_parsers_init(struct flb_config *ctx)
+{
+ struct mk_list *head;
+ struct flb_parser *p;
+ struct flb_ml_parser *ml_parser;
+
+ mk_list_foreach(head, &ctx->multiline_parsers) {
+ ml_parser = mk_list_entry(head, struct flb_ml_parser, _head);
+ if (ml_parser->parser_name && !ml_parser->parser) {
+ p = flb_parser_get(ml_parser->parser_name, ctx);
+ if (!p) {
+ flb_error("multiline parser '%s' points to an undefined parser '%s'",
+ ml_parser->name, ml_parser->parser_name);
+ return -1;
+ }
+ ml_parser->parser = p;
+ }
+ }
+
+ return 0;
+}
+
+int flb_ml_auto_flush_init(struct flb_ml *ml)
+{
+ struct flb_sched *scheduler;
+ int ret;
+
+ if (ml == NULL) {
+ return -1;
+ }
+
+ scheduler = flb_sched_ctx_get();
+
+ if (scheduler == NULL) {
+ flb_error("[multiline] scheduler context has not been created");
+ return -1;
+ }
+
+ if (ml->flush_ms < 500) {
+ flb_error("[multiline] flush timeout '%i' is too low", ml->flush_ms);
+ return -1;
+ }
+
+ /* Create flush timer */
+ ret = flb_sched_timer_cb_create(scheduler,
+ FLB_SCHED_TIMER_CB_PERM,
+ ml->flush_ms,
+ cb_ml_flush_timer,
+ ml, NULL);
+ return ret;
+}
+
+int flb_ml_destroy(struct flb_ml *ml)
+{
+ struct mk_list *head;
+ struct mk_list *tmp;
+ struct flb_ml_group *group;
+
+ if (!ml) {
+ return 0;
+ }
+
+ flb_log_event_decoder_destroy(&ml->log_event_decoder);
+ flb_log_event_encoder_destroy(&ml->log_event_encoder);
+
+ if (ml->name) {
+ flb_sds_destroy(ml->name);
+ }
+
+ /* destroy groups */
+ mk_list_foreach_safe(head, tmp, &ml->groups) {
+ group = mk_list_entry(head, struct flb_ml_group, _head);
+ flb_ml_group_destroy(group);
+ }
+
+ flb_free(ml);
+ return 0;
+}
+
+static int flb_msgpack_object_hash_internal(cfl_hash_state_t *state,
+ msgpack_object *object)
+{
+ void *dummy_pointer;
+ int result;
+ int index;
+
+ if (object == NULL) {
+ return 0;
+ }
+
+ dummy_pointer = NULL;
+ result = 0;
+
+ if (object->type == MSGPACK_OBJECT_NIL) {
+ cfl_hash_64bits_update(state,
+ &dummy_pointer,
+ sizeof(dummy_pointer));
+ }
+ else if (object->type == MSGPACK_OBJECT_BOOLEAN) {
+ cfl_hash_64bits_update(state,
+ &object->via.boolean,
+ sizeof(object->via.boolean));
+ }
+ else if (object->type == MSGPACK_OBJECT_POSITIVE_INTEGER ||
+ object->type == MSGPACK_OBJECT_NEGATIVE_INTEGER) {
+ cfl_hash_64bits_update(state,
+ &object->via.u64,
+ sizeof(object->via.u64));
+ }
+ else if (object->type == MSGPACK_OBJECT_FLOAT32 ||
+ object->type == MSGPACK_OBJECT_FLOAT64 ||
+ object->type == MSGPACK_OBJECT_FLOAT) {
+ cfl_hash_64bits_update(state,
+ &object->via.f64,
+ sizeof(object->via.f64));
+ }
+ else if (object->type == MSGPACK_OBJECT_STR) {
+ cfl_hash_64bits_update(state,
+ object->via.str.ptr,
+ object->via.str.size);
+ }
+ else if (object->type == MSGPACK_OBJECT_ARRAY) {
+ for (index = 0 ;
+ index < object->via.array.size &&
+ result == 0;
+ index++) {
+ result = flb_msgpack_object_hash_internal(
+ state,
+ &object->via.array.ptr[index]);
+ }
+ }
+ else if (object->type == MSGPACK_OBJECT_MAP) {
+ for (index = 0 ;
+ index < object->via.map.size &&
+ result == 0;
+ index++) {
+ result = flb_msgpack_object_hash_internal(
+ state,
+ &object->via.map.ptr[index].key);
+
+ if (result == 0) {
+ result = flb_msgpack_object_hash_internal(
+ state,
+ &object->via.map.ptr[index].val);
+ }
+ }
+ }
+ else if (object->type == MSGPACK_OBJECT_BIN) {
+ cfl_hash_64bits_update(state,
+ object->via.bin.ptr,
+ object->via.bin.size);
+ }
+ else if (object->type == MSGPACK_OBJECT_EXT) {
+ cfl_hash_64bits_update(state,
+ &object->via.ext.type,
+ sizeof(object->via.ext.type));
+
+ cfl_hash_64bits_update(state,
+ object->via.ext.ptr,
+ object->via.ext.size);
+ }
+
+ return result;
+}
+
+static int flb_hash_msgpack_object_list(cfl_hash_64bits_t *hash,
+ size_t entry_count,
+ ...)
+{
+ cfl_hash_state_t hash_state;
+ va_list arguments;
+ msgpack_object *object;
+ int result;
+ size_t index;
+
+ cfl_hash_64bits_reset(&hash_state);
+
+ va_start(arguments, entry_count);
+
+ result = 0;
+
+ for (index = 0 ;
+ index < entry_count &&
+ result == 0 ;
+ index++) {
+ object = va_arg(arguments, msgpack_object *);
+
+ if (object == NULL) {
+ break;
+ }
+
+ result = flb_msgpack_object_hash_internal(&hash_state, object);
+ }
+
+ va_end(arguments);
+
+ if (result == 0) {
+ *hash = cfl_hash_64bits_digest(&hash_state);
+ }
+
+ return result;
+}
+
+struct flb_deduplication_list_entry {
+ cfl_hash_64bits_t hash;
+ struct cfl_list _head;
+};
+
+void flb_deduplication_list_init(struct cfl_list *deduplication_list)
+{
+ cfl_list_init(deduplication_list);
+}
+
+int flb_deduplication_list_validate(struct cfl_list *deduplication_list,
+ cfl_hash_64bits_t hash)
+{
+ struct cfl_list *iterator;
+ struct flb_deduplication_list_entry *entry;
+
+ cfl_list_foreach(iterator, deduplication_list) {
+ entry = cfl_list_entry(iterator,
+ struct flb_deduplication_list_entry,
+ _head);
+
+ if (entry->hash == hash) {
+ return FLB_TRUE;
+ }
+ }
+
+ return FLB_FALSE;
+}
+
+int flb_deduplication_list_add(struct cfl_list *deduplication_list,
+ cfl_hash_64bits_t hash)
+{
+ struct flb_deduplication_list_entry *entry;
+
+ entry = (struct flb_deduplication_list_entry *)
+ flb_calloc(1,
+ sizeof(struct flb_deduplication_list_entry));
+
+ if (entry == NULL) {
+ return -1;
+ }
+
+ cfl_list_entry_init(&entry->_head);
+ entry->hash = hash;
+
+ cfl_list_append(&entry->_head, deduplication_list);
+
+ return 0;
+}
+
+void flb_deduplication_list_purge(struct cfl_list *deduplication_list)
+{
+ struct cfl_list *iterator;
+ struct cfl_list *backup;
+ struct flb_deduplication_list_entry *entry;
+
+ cfl_list_foreach_safe(iterator, backup, deduplication_list) {
+ entry = cfl_list_entry(iterator,
+ struct flb_deduplication_list_entry,
+ _head);
+
+ cfl_list_del(&entry->_head);
+
+ free(entry);
+ }
+}
+
+int flb_ml_flush_metadata_buffer(struct flb_ml_stream *mst,
+ struct flb_ml_stream_group *group,
+ int deduplicate_metadata)
+{
+ int append_metadata_entry;
+ cfl_hash_64bits_t metadata_entry_hash;
+ struct cfl_list deduplication_list;
+ msgpack_unpacked metadata_map;
+ size_t offset;
+ size_t index;
+ msgpack_object value;
+ msgpack_object key;
+ int ret;
+
+ ret = FLB_EVENT_ENCODER_SUCCESS;
+
+ if (deduplicate_metadata) {
+ flb_deduplication_list_init(&deduplication_list);
+ }
+
+ msgpack_unpacked_init(&metadata_map);
+
+ offset = 0;
+ while (ret == FLB_EVENT_ENCODER_SUCCESS &&
+ msgpack_unpack_next(&metadata_map,
+ group->mp_md_sbuf.data,
+ group->mp_md_sbuf.size,
+ &offset) == MSGPACK_UNPACK_SUCCESS) {
+
+ for (index = 0;
+ index < metadata_map.data.via.map.size &&
+ ret == FLB_EVENT_ENCODER_SUCCESS;
+ index++) {
+ key = metadata_map.data.via.map.ptr[index].key;
+ value = metadata_map.data.via.map.ptr[index].val;
+
+ append_metadata_entry = FLB_TRUE;
+
+ if (deduplicate_metadata) {
+ ret = flb_hash_msgpack_object_list(&metadata_entry_hash,
+ 2,
+ &key,
+ &value);
+ if (ret != 0) {
+ ret = FLB_EVENT_ENCODER_ERROR_INVALID_ARGUMENT;
+ }
+ else {
+ ret = flb_deduplication_list_validate(
+ &deduplication_list,
+ metadata_entry_hash);
+
+ if (ret) {
+ append_metadata_entry = FLB_FALSE;
+
+ ret = FLB_EVENT_ENCODER_SUCCESS;
+ }
+ else {
+ ret = flb_deduplication_list_add(
+ &deduplication_list,
+ metadata_entry_hash);
+
+ if (ret == 0) {
+ ret = FLB_EVENT_ENCODER_SUCCESS;
+ }
+ else {
+ ret = FLB_EVENT_ENCODER_ERROR_ALLOCATION_ERROR;
+ }
+ }
+ }
+ }
+
+ if (append_metadata_entry) {
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_append_metadata_values(
+ &mst->ml->log_event_encoder,
+ FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&key),
+ FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&value));
+ }
+ }
+ }
+ }
+
+ msgpack_unpacked_destroy(&metadata_map);
+
+ if (deduplicate_metadata) {
+ flb_deduplication_list_purge(&deduplication_list);
+ }
+
+ return ret;
+}
+
+int flb_ml_flush_stream_group(struct flb_ml_parser *ml_parser,
+ struct flb_ml_stream *mst,
+ struct flb_ml_stream_group *group,
+ int forced_flush)
+{
+ int i;
+ int ret;
+ int size;
+ int len;
+ size_t off = 0;
+ msgpack_object map;
+ msgpack_object k;
+ msgpack_object v;
+ msgpack_sbuffer mp_sbuf;
+ msgpack_packer mp_pck;
+ msgpack_unpacked result;
+ struct flb_ml_parser_ins *parser_i = mst->parser;
+ struct flb_time *group_time;
+ struct flb_time now;
+
+ breakline_prepare(parser_i, group);
+ len = flb_sds_len(group->buf);
+
+ /* init msgpack buffer */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ /* if the group don't have a time set, use current time */
+ if (flb_time_to_nanosec(&group->mp_time) == 0L) {
+ flb_time_get(&now);
+ group_time = &now;
+ } else {
+ group_time = &group->mp_time;
+ }
+
+ /* compose final record if we have a first line context */
+ if (group->mp_sbuf.size > 0) {
+ msgpack_unpacked_init(&result);
+ ret = msgpack_unpack_next(&result,
+ group->mp_sbuf.data, group->mp_sbuf.size,
+ &off);
+ if (ret != MSGPACK_UNPACK_SUCCESS) {
+ flb_error("[multiline] could not unpack first line state buffer");
+ msgpack_unpacked_destroy(&result);
+ group->mp_sbuf.size = 0;
+ return -1;
+ }
+ map = result.data;
+
+ if (map.type != MSGPACK_OBJECT_MAP) {
+ flb_error("[multiline] expected MAP type in first line state buffer");
+ msgpack_unpacked_destroy(&result);
+ group->mp_sbuf.size = 0;
+ return -1;
+ }
+
+ /* Take the first line keys and repack */
+ len = flb_sds_len(parser_i->key_content);
+ size = map.via.map.size;
+ msgpack_pack_map(&mp_pck, size);
+
+ for (i = 0; i < size; i++) {
+ k = map.via.map.ptr[i].key;
+ v = map.via.map.ptr[i].val;
+
+ /*
+ * Check if the current key is the key that will contain the
+ * concatenated multiline buffer
+ */
+ if (k.type == MSGPACK_OBJECT_STR &&
+ parser_i->key_content &&
+ k.via.str.size == len &&
+ strncmp(k.via.str.ptr, parser_i->key_content, len) == 0) {
+
+ /* key */
+ msgpack_pack_object(&mp_pck, k);
+
+ /* value */
+ len = flb_sds_len(group->buf);
+ msgpack_pack_str(&mp_pck, len);
+ msgpack_pack_str_body(&mp_pck, group->buf, len);
+ }
+ else {
+ /* key / val */
+ msgpack_pack_object(&mp_pck, k);
+ msgpack_pack_object(&mp_pck, v);
+ }
+ }
+ msgpack_unpacked_destroy(&result);
+ group->mp_sbuf.size = 0;
+ }
+ else if (len > 0) {
+ /* Pack raw content as Fluent Bit record */
+ msgpack_pack_map(&mp_pck, 1);
+
+ /* key */
+ if (parser_i->key_content) {
+ len = flb_sds_len(parser_i->key_content);
+ msgpack_pack_str(&mp_pck, len);
+ msgpack_pack_str_body(&mp_pck, parser_i->key_content, len);
+ }
+ else {
+ msgpack_pack_str(&mp_pck, 3);
+ msgpack_pack_str_body(&mp_pck, "log", 3);
+ }
+
+ /* val */
+ len = flb_sds_len(group->buf);
+ msgpack_pack_str(&mp_pck, len);
+ msgpack_pack_str_body(&mp_pck, group->buf, len);
+ }
+
+ if (mp_sbuf.size > 0) {
+ /*
+ * a 'forced_flush' means to alert the caller that the data 'must be flushed to it destination'. This flag is
+ * only enabled when the flush process has been triggered by the multiline timer, e.g:
+ *
+ * - the message is complete or incomplete and its time to dispatch it.
+ */
+ if (forced_flush) {
+ mst->forced_flush = FLB_TRUE;
+ }
+
+ /* encode and invoke the user callback */
+
+ ret = flb_log_event_encoder_begin_record(
+ &mst->ml->log_event_encoder);
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_set_timestamp(
+ &mst->ml->log_event_encoder,
+ group_time);
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_ml_flush_metadata_buffer(mst,
+ group,
+ FLB_TRUE);
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_set_body_from_raw_msgpack(
+ &mst->ml->log_event_encoder,
+ mp_sbuf.data,
+ mp_sbuf.size);
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_log_event_encoder_commit_record(
+ &mst->ml->log_event_encoder);
+ }
+
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ flb_error("[multiline] error packing event");
+
+ return -1;
+ }
+
+ if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+ mst->cb_flush(ml_parser,
+ mst,
+ mst->cb_data,
+ mst->ml->log_event_encoder.output_buffer,
+ mst->ml->log_event_encoder.output_length);
+ }
+ else {
+ flb_error("[multiline] log event encoder error : %d", ret);
+ }
+
+ flb_log_event_encoder_reset(&mst->ml->log_event_encoder);
+
+ if (forced_flush) {
+ mst->forced_flush = FLB_FALSE;
+ }
+ }
+
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ flb_sds_len_set(group->buf, 0);
+
+ /* Update last flush time */
+ group->last_flush = time_ms_now();
+
+ return 0;
+}
+
+/*
+ * Initialize multiline global environment.
+ *
+ * note: function must be invoked before any flb_ml_create() call.
+ */
+int flb_ml_init(struct flb_config *config)
+{
+ int ret;
+
+ ret = flb_ml_parser_builtin_create(config);
+ if (ret == -1) {
+ return -1;
+ }
+
+ return 0;
+}
+
+int flb_ml_exit(struct flb_config *config)
+{
+ flb_ml_parser_destroy_all(&config->multiline_parsers);
+ return 0;
+}
diff --git a/fluent-bit/src/multiline/flb_ml_group.c b/fluent-bit/src/multiline/flb_ml_group.c
new file mode 100644
index 000000000..895a71056
--- /dev/null
+++ b/fluent-bit/src/multiline/flb_ml_group.c
@@ -0,0 +1,86 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/multiline/flb_ml.h>
+#include <fluent-bit/multiline/flb_ml_parser.h>
+
+struct flb_ml_group *flb_ml_group_create(struct flb_ml *ml)
+{
+ struct flb_ml_group *group;
+
+ group = flb_calloc(1, sizeof(struct flb_ml_group));
+ if (!group) {
+ flb_errno();
+ return NULL;
+ }
+ group->id = mk_list_size(&ml->groups);
+ group->ml = ml;
+ group->lru_parser = NULL;
+ mk_list_init(&group->parsers);
+
+ mk_list_add(&group->_head, &ml->groups);
+
+ return group;
+}
+
+/*
+ * Link a parser instance into the active group, if no group exists, a default
+ * one is created.
+ */
+int flb_ml_group_add_parser(struct flb_ml *ctx, struct flb_ml_parser_ins *p)
+{
+ struct flb_ml_group *group = NULL;
+
+ if (mk_list_size(&ctx->groups) == 0) {
+ group = flb_ml_group_create(ctx);
+ if (!group) {
+ return -1;
+ }
+ }
+ else {
+ /* retrieve the latest active group */
+ group = mk_list_entry_last(&ctx->groups, struct flb_ml_group, _head);
+ }
+
+ if (!group) {
+ return -1;
+ }
+
+ mk_list_add(&p->_head, &group->parsers);
+ return 0;
+}
+
+void flb_ml_group_destroy(struct flb_ml_group *group)
+{
+ struct mk_list *head;
+ struct mk_list *tmp;
+ struct flb_ml_parser_ins *parser_i;
+
+ /* destroy parser instances */
+ mk_list_foreach_safe(head, tmp, &group->parsers) {
+ parser_i = mk_list_entry(head, struct flb_ml_parser_ins, _head);
+ flb_ml_parser_instance_destroy(parser_i);
+ }
+
+ mk_list_del(&group->_head);
+ flb_free(group);
+}
diff --git a/fluent-bit/src/multiline/flb_ml_mode.c b/fluent-bit/src/multiline/flb_ml_mode.c
new file mode 100644
index 000000000..964672d82
--- /dev/null
+++ b/fluent-bit/src/multiline/flb_ml_mode.c
@@ -0,0 +1,111 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/multiline/flb_ml.h>
+#include <fluent-bit/multiline/flb_ml_mode.h>
+
+struct flb_ml *flb_ml_mode_create(struct flb_config *config, char *mode, int flush_ms,
+ char *key)
+{
+ if (strcmp(mode, "docker") == 0) {
+ return flb_ml_mode_docker(config, flush_ms);
+ }
+ else if (strcmp(mode, "cri") == 0) {
+ return flb_ml_mode_cri(config, flush_ms);
+ }
+ else if (strcmp(mode, "python") == 0) {
+ return flb_ml_mode_python(config, flush_ms, key);
+ }
+ else if (strcmp(mode, "java") == 0) {
+ return flb_ml_mode_java(config, flush_ms, key);
+ }
+ else if (strcmp(mode, "go") == 0) {
+ return flb_ml_mode_go(config, flush_ms, key);
+ }
+
+ flb_error("[multiline] built-in mode '%s' not found", mode);
+ return NULL;
+}
+
+
+struct flb_ml_mode *flb_ml_parser_create(struct flb_config *ctx,
+ char *name,
+ int type, char *match_str, int negate,
+ int flush_ms,
+ char *key_content,
+ char *key_group,
+ char *key_pattern,
+ struct flb_parser *parser_ctx,
+ char *parser_name)
+{
+ struct flb_ml_mode *ml;
+
+ ml = flb_calloc(1, sizeof(struct flb_ml));
+ if (!ml) {
+ flb_errno();
+ return NULL;
+ }
+ ml->name = flb_sds_create(name);
+ ml->type = type;
+
+ if (match_str) {
+ ml->match_str = flb_sds_create(match_str);
+ if (!ml->match_str) {
+ flb_free(ml);
+ return NULL;
+ }
+ }
+
+ ml->parser = parser_ctx;
+ if (parser_name) {
+ ml->parser_name = flb_sds_create(parser_name);
+ }
+
+ ml->negate = negate;
+ mk_list_init(&ml->streams);
+ mk_list_init(&ml->regex_rules);
+ mk_list_add(&ml->_head, &ctx->multilines);
+
+ if (key_content) {
+ ml->key_content = flb_sds_create(key_content);
+ if (!ml->key_content) {
+ flb_ml_destroy(ml);
+ return NULL;
+ }
+ }
+
+ if (key_group) {
+ ml->key_group = flb_sds_create(key_group);
+ if (!ml->key_group) {
+ flb_ml_destroy(ml);
+ return NULL;
+ }
+ }
+
+ if (key_pattern) {
+ ml->key_pattern = flb_sds_create(key_pattern);
+ if (!ml->key_pattern) {
+ flb_ml_destroy(ml);
+ return NULL;
+ }
+ }
+ return ml;
+}
diff --git a/fluent-bit/src/multiline/flb_ml_parser.c b/fluent-bit/src/multiline/flb_ml_parser.c
new file mode 100644
index 000000000..7aa33789d
--- /dev/null
+++ b/fluent-bit/src/multiline/flb_ml_parser.c
@@ -0,0 +1,347 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/multiline/flb_ml.h>
+#include <fluent-bit/multiline/flb_ml_rule.h>
+#include <fluent-bit/multiline/flb_ml_mode.h>
+#include <fluent-bit/multiline/flb_ml_group.h>
+
+int flb_ml_parser_init(struct flb_ml_parser *ml_parser)
+{
+ int ret;
+
+ ret = flb_ml_rule_init(ml_parser);
+ if (ret == -1) {
+ return -1;
+ }
+
+ return 0;
+}
+
+/* Create built-in multiline parsers */
+int flb_ml_parser_builtin_create(struct flb_config *config)
+{
+ struct flb_ml_parser *mlp;
+
+ /* Docker */
+ mlp = flb_ml_parser_docker(config);
+ if (!mlp) {
+ flb_error("[multiline] could not init 'docker' built-in parser");
+ return -1;
+ }
+
+ /* CRI */
+ mlp = flb_ml_parser_cri(config);
+ if (!mlp) {
+ flb_error("[multiline] could not init 'cri' built-in parser");
+ return -1;
+ }
+
+ /* Java */
+ mlp = flb_ml_parser_java(config, NULL);
+ if (!mlp) {
+ flb_error("[multiline] could not init 'java' built-in parser");
+ return -1;
+ }
+
+ /* Go */
+ mlp = flb_ml_parser_go(config, NULL);
+ if (!mlp) {
+ flb_error("[multiline] could not init 'go' built-in parser");
+ return -1;
+ }
+
+ /* Ruby */
+ mlp = flb_ml_parser_ruby(config, NULL);
+ if (!mlp) {
+ flb_error("[multiline] could not init 'ruby' built-in parser");
+ return -1;
+ }
+
+ /* Python */
+ mlp = flb_ml_parser_python(config, NULL);
+ if (!mlp) {
+ flb_error("[multiline] could not init 'python' built-in parser");
+ return -1;
+ }
+
+ return 0;
+}
+
+struct flb_ml_parser *flb_ml_parser_create(struct flb_config *ctx,
+ char *name,
+ int type, char *match_str, int negate,
+ int flush_ms,
+ char *key_content,
+ char *key_group,
+ char *key_pattern,
+ struct flb_parser *parser_ctx,
+ char *parser_name)
+{
+ struct flb_ml_parser *ml_parser;
+
+ ml_parser = flb_calloc(1, sizeof(struct flb_ml_parser));
+ if (!ml_parser) {
+ flb_errno();
+ return NULL;
+ }
+ ml_parser->name = flb_sds_create(name);
+ ml_parser->type = type;
+
+ if (match_str) {
+ ml_parser->match_str = flb_sds_create(match_str);
+ if (!ml_parser->match_str) {
+ if (ml_parser->name) {
+ flb_sds_destroy(ml_parser->name);
+ }
+ flb_free(ml_parser);
+ return NULL;
+ }
+ }
+
+ ml_parser->parser = parser_ctx;
+
+ if (parser_name) {
+ ml_parser->parser_name = flb_sds_create(parser_name);
+ }
+ ml_parser->negate = negate;
+ ml_parser->flush_ms = flush_ms;
+ mk_list_init(&ml_parser->regex_rules);
+ mk_list_add(&ml_parser->_head, &ctx->multiline_parsers);
+
+ if (key_content) {
+ ml_parser->key_content = flb_sds_create(key_content);
+ if (!ml_parser->key_content) {
+ flb_ml_parser_destroy(ml_parser);
+ return NULL;
+ }
+ }
+
+ if (key_group) {
+ ml_parser->key_group = flb_sds_create(key_group);
+ if (!ml_parser->key_group) {
+ flb_ml_parser_destroy(ml_parser);
+ return NULL;
+ }
+ }
+
+ if (key_pattern) {
+ ml_parser->key_pattern = flb_sds_create(key_pattern);
+ if (!ml_parser->key_pattern) {
+ flb_ml_parser_destroy(ml_parser);
+ return NULL;
+ }
+ }
+
+ return ml_parser;
+}
+
+struct flb_ml_parser *flb_ml_parser_get(struct flb_config *ctx, char *name)
+{
+ struct mk_list *head;
+ struct flb_ml_parser *ml_parser;
+
+ mk_list_foreach(head, &ctx->multiline_parsers) {
+ ml_parser = mk_list_entry(head, struct flb_ml_parser, _head);
+ if (strcasecmp(ml_parser->name, name) == 0) {
+ return ml_parser;
+ }
+ }
+
+ return NULL;
+}
+
+int flb_ml_parser_instance_has_data(struct flb_ml_parser_ins *ins)
+{
+ struct mk_list *head;
+ struct mk_list *head_group;
+ struct flb_ml_stream *st;
+ struct flb_ml_stream_group *st_group;
+
+ mk_list_foreach(head, &ins->streams) {
+ st = mk_list_entry(head, struct flb_ml_stream, _head);
+ mk_list_foreach(head_group, &st->groups) {
+ st_group = mk_list_entry(head_group, struct flb_ml_stream_group, _head);
+ if (st_group->mp_sbuf.size > 0) {
+ return FLB_TRUE;
+ }
+ }
+ }
+
+ return FLB_FALSE;
+}
+
+struct flb_ml_parser_ins *flb_ml_parser_instance_create(struct flb_ml *ml,
+ char *name)
+{
+ int ret;
+ struct flb_ml_parser_ins *ins;
+ struct flb_ml_parser *parser;
+
+ parser = flb_ml_parser_get(ml->config, name);
+ if (!parser) {
+ flb_error("[multiline] parser '%s' not registered", name);
+ return NULL;
+ }
+
+ ins = flb_calloc(1, sizeof(struct flb_ml_parser_ins));
+ if (!ins) {
+ flb_errno();
+ return NULL;
+ }
+ ins->last_stream_id = 0;
+ ins->ml_parser = parser;
+ mk_list_init(&ins->streams);
+
+ /* Copy parent configuration */
+ if (parser->key_content) {
+ ins->key_content = flb_sds_create(parser->key_content);
+ }
+ if (parser->key_pattern) {
+ ins->key_pattern = flb_sds_create(parser->key_pattern);
+ }
+ if (parser->key_group) {
+ ins->key_group = flb_sds_create(parser->key_group);
+ }
+
+ /* Append this multiline parser instance to the active multiline group */
+ ret = flb_ml_group_add_parser(ml, ins);
+ if (ret != 0) {
+ flb_error("[multiline] could not register parser '%s' on "
+ "multiline '%s 'group", name, ml->name);
+ flb_free(ins);
+ return NULL;
+ }
+
+ /*
+ * Update flush_interval for pending records on multiline context. We always
+ * use the greater value found.
+ */
+ if (parser->flush_ms > ml->flush_ms) {
+ ml->flush_ms = parser->flush_ms;
+ }
+
+ return ins;
+}
+
+/* Override a fixed parser property for the instance only*/
+int flb_ml_parser_instance_set(struct flb_ml_parser_ins *p, char *prop, char *val)
+{
+ if (strcasecmp(prop, "key_content") == 0) {
+ if (p->key_content) {
+ flb_sds_destroy(p->key_content);
+ }
+ p->key_content = flb_sds_create(val);
+ }
+ else if (strcasecmp(prop, "key_pattern") == 0) {
+ if (p->key_pattern) {
+ flb_sds_destroy(p->key_pattern);
+ }
+ p->key_pattern = flb_sds_create(val);
+ }
+ else if (strcasecmp(prop, "key_group") == 0) {
+ if (p->key_group) {
+ flb_sds_destroy(p->key_group);
+ }
+ p->key_group = flb_sds_create(val);
+ }
+ else {
+ return -1;
+ }
+
+ return 0;
+}
+
+int flb_ml_parser_destroy(struct flb_ml_parser *ml_parser)
+{
+ if (!ml_parser) {
+ return 0;
+ }
+
+ if (ml_parser->name) {
+ flb_sds_destroy(ml_parser->name);
+ }
+
+ if (ml_parser->parser_name) {
+ flb_sds_destroy(ml_parser->parser_name);
+ }
+
+ if (ml_parser->match_str) {
+ flb_sds_destroy(ml_parser->match_str);
+ }
+ if (ml_parser->key_content) {
+ flb_sds_destroy(ml_parser->key_content);
+ }
+ if (ml_parser->key_group) {
+ flb_sds_destroy(ml_parser->key_group);
+ }
+ if (ml_parser->key_pattern) {
+ flb_sds_destroy(ml_parser->key_pattern);
+ }
+
+ /* Regex rules */
+ flb_ml_rule_destroy_all(ml_parser);
+
+ /* Unlink from struct flb_config->multiline_parsers */
+ mk_list_del(&ml_parser->_head);
+
+ flb_free(ml_parser);
+ return 0;
+}
+
+int flb_ml_parser_instance_destroy(struct flb_ml_parser_ins *ins)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_ml_stream *stream;
+
+ /* Destroy streams */
+ mk_list_foreach_safe(head, tmp, &ins->streams) {
+ stream = mk_list_entry(head, struct flb_ml_stream, _head);
+ flb_ml_stream_destroy(stream);
+ }
+
+ if (ins->key_content) {
+ flb_sds_destroy(ins->key_content);
+ }
+ if (ins->key_pattern) {
+ flb_sds_destroy(ins->key_pattern);
+ }
+ if (ins->key_group) {
+ flb_sds_destroy(ins->key_group);
+ }
+
+ flb_free(ins);
+
+ return 0;
+}
+
+void flb_ml_parser_destroy_all(struct mk_list *list)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_ml_parser *parser;
+
+ mk_list_foreach_safe(head, tmp, list) {
+ parser = mk_list_entry(head, struct flb_ml_parser, _head);
+ flb_ml_parser_destroy(parser);
+ }
+}
diff --git a/fluent-bit/src/multiline/flb_ml_parser_cri.c b/fluent-bit/src/multiline/flb_ml_parser_cri.c
new file mode 100644
index 000000000..669fa39a2
--- /dev/null
+++ b/fluent-bit/src/multiline/flb_ml_parser_cri.c
@@ -0,0 +1,81 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/multiline/flb_ml.h>
+#include <fluent-bit/multiline/flb_ml_parser.h>
+
+#define FLB_ML_CRI_REGEX \
+ "^(?<time>.+?) (?<stream>stdout|stderr) (?<_p>F|P) (?<log>.*)$"
+#define FLB_ML_CRI_TIME \
+ "%Y-%m-%dT%H:%M:%S.%L%z"
+
+/* Creates a parser for Docker */
+static struct flb_parser *cri_parser_create(struct flb_config *config)
+{
+ struct flb_parser *p;
+
+ p = flb_parser_create("_ml_cri", /* parser name */
+ "regex", /* backend type */
+ FLB_ML_CRI_REGEX, /* regex */
+ FLB_FALSE, /* skip_empty */
+ FLB_ML_CRI_TIME, /* time format */
+ "time", /* time key */
+ NULL, /* time offset */
+ FLB_TRUE, /* time keep */
+ FLB_FALSE, /* time strict */
+ FLB_FALSE, /* no bare keys */
+ NULL, /* parser types */
+ 0, /* types len */
+ NULL, /* decoders */
+ config); /* Fluent Bit context */
+ return p;
+}
+
+/* Our first multiline mode: 'docker' */
+struct flb_ml_parser *flb_ml_parser_cri(struct flb_config *config)
+{
+ struct flb_parser *parser;
+ struct flb_ml_parser *mlp;
+
+ /* Create a Docker parser */
+ parser = cri_parser_create(config);
+ if (!parser) {
+ return NULL;
+ }
+
+ mlp = flb_ml_parser_create(config,
+ "cri", /* name */
+ FLB_ML_EQ, /* type */
+ "F", /* match_str */
+ FLB_FALSE, /* negate */
+ FLB_ML_FLUSH_TIMEOUT, /* flush_ms */
+ "log", /* key_content */
+ "stream", /* key_group */
+ "_p", /* key_pattern */
+ parser, /* parser ctx */
+ NULL); /* parser name */
+
+ if (!mlp) {
+ flb_error("[multiline] could not create 'cri mode'");
+ return NULL;
+ }
+
+ return mlp;
+}
diff --git a/fluent-bit/src/multiline/flb_ml_parser_docker.c b/fluent-bit/src/multiline/flb_ml_parser_docker.c
new file mode 100644
index 000000000..5b622d32c
--- /dev/null
+++ b/fluent-bit/src/multiline/flb_ml_parser_docker.c
@@ -0,0 +1,110 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/multiline/flb_ml.h>
+#include <fluent-bit/multiline/flb_ml_parser.h>
+
+/* Creates a parser for Docker */
+static struct flb_parser *docker_parser_create(struct flb_config *config)
+{
+ struct flb_parser *p;
+
+ p = flb_parser_create("_ml_json_docker", /* parser name */
+ "json", /* backend type */
+ NULL, /* regex */
+ FLB_TRUE, /* skip_empty */
+ "%Y-%m-%dT%H:%M:%S.%L", /* time format */
+ "time", /* time key */
+ NULL, /* time offset */
+ FLB_TRUE, /* time keep */
+ FLB_FALSE, /* time strict */
+ FLB_FALSE, /* no bare keys */
+ NULL, /* parser types */
+ 0, /* types len */
+ NULL, /* decoders */
+ config); /* Fluent Bit context */
+ return p;
+}
+
+/* Our first multiline mode: 'docker' */
+struct flb_ml_parser *flb_ml_parser_docker(struct flb_config *config)
+{
+ struct flb_parser *parser;
+ struct flb_ml_parser *mlp;
+
+ /* Create a Docker parser */
+ parser = docker_parser_create(config);
+ if (!parser) {
+ return NULL;
+ }
+
+ /*
+ * Let's explain this multiline mode, then you (the reader) might want
+ * to submit a PR with new built-in modes :)
+ *
+ * Containerized apps under Docker writes logs to stdout/stderr. These streams
+ * (stdout/stderr) are handled by Docker, in most of cases the content is
+ * stored in a .json file in your file system. A message like "hey!" gets into
+ * a JSON map like this:
+ *
+ * {"log": "hey!\n", "stream": "stdout", "time": "2021-02-01T01:40:03.53412Z"}
+ *
+ * By Docker log spec, any 'log' key that "ends with a \n" it's a complete
+ * log record, but Docker also limits the log record size to 16KB, so a long
+ * message that does not fit into 16KB can be split in multiple JSON lines,
+ * the following example use short words to describe the context:
+ *
+ * - original message: 'one, two, three\n'
+ *
+ * Docker log interpretation:
+ *
+ * - {"log": "one, ", "stream": "stdout", "time": "2021-02-01T01:40:03.53413Z"}
+ * - {"log": "two, ", "stream": "stdout", "time": "2021-02-01T01:40:03.53414Z"}
+ * - {"log": "three\n", "stream": "stdout", "time": "2021-02-01T01:40:03.53415Z"}
+ *
+ * So every 'log' key that does not ends with '\n', it's a partial log record
+ * and for logging purposes it needs to be concatenated with further messages
+ * until a final '\n' is found.
+ *
+ * We setup the Multiline mode as follows:
+ *
+ * - Use the type 'FLB_ML_ENDSWITH' to specify that we expect the 'log'
+ * key must ends with a '\n' for complete messages, otherwise it means is
+ * a continuation message. In case a message is not complete just wait until
+ * 500 milliseconds (0.5 second) and flush the buffer.
+ */
+ mlp = flb_ml_parser_create(config, /* Fluent Bit context */
+ "docker", /* name */
+ FLB_ML_ENDSWITH, /* type */
+ "\n", /* match_str */
+ FLB_FALSE, /* negate */
+ FLB_ML_FLUSH_TIMEOUT, /* flush_ms */
+ "log", /* key_content */
+ "stream", /* key_group */
+ NULL, /* key_pattern */
+ parser, /* parser ctx */
+ NULL); /* parser name */
+ if (!mlp) {
+ flb_error("[multiline] could not create 'docker mode'");
+ return NULL;
+ }
+
+ return mlp;
+}
diff --git a/fluent-bit/src/multiline/flb_ml_parser_go.c b/fluent-bit/src/multiline/flb_ml_parser_go.c
new file mode 100644
index 000000000..f1cd5407f
--- /dev/null
+++ b/fluent-bit/src/multiline/flb_ml_parser_go.c
@@ -0,0 +1,140 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/multiline/flb_ml.h>
+#include <fluent-bit/multiline/flb_ml_rule.h>
+#include <fluent-bit/multiline/flb_ml_parser.h>
+
+#define rule flb_ml_rule_create
+
+static void rule_error(struct flb_ml_parser *mlp)
+{
+ int id;
+
+ id = mk_list_size(&mlp->regex_rules);
+ flb_error("[multiline: go] rule #%i could not be created", id);
+ flb_ml_parser_destroy(mlp);
+}
+
+/* Go mode */
+struct flb_ml_parser *flb_ml_parser_go(struct flb_config *config, char *key)
+{
+ int ret;
+ struct flb_ml_parser *mlp;
+
+ mlp = flb_ml_parser_create(config, /* Fluent Bit context */
+ "go", /* name */
+ FLB_ML_REGEX, /* type */
+ NULL, /* match_str */
+ FLB_FALSE, /* negate */
+ FLB_ML_FLUSH_TIMEOUT, /* flush_ms */
+ key, /* key_content */
+ NULL, /* key_group */
+ NULL, /* key_pattern */
+ NULL, /* parser ctx */
+ NULL); /* parser name */
+
+ if (!mlp) {
+ flb_error("[multiline] could not create 'go mode'");
+ return NULL;
+ }
+
+ ret = rule(mlp,
+ "start_state",
+ "/\\bpanic: /",
+ "go_after_panic", NULL);
+ if (ret != 0) {
+ rule_error(mlp);
+ return NULL;
+ }
+
+ ret = rule(mlp,
+ "start_state",
+ "/http: panic serving/",
+ "go_goroutine", NULL);
+ if (ret != 0) {
+ rule_error(mlp);
+ return NULL;
+ }
+
+ ret = rule(mlp,
+ "go_after_panic",
+ "/^$/",
+ "go_goroutine", NULL);
+ if (ret != 0) {
+ rule_error(mlp);
+ return NULL;
+ }
+
+ ret = rule(mlp,
+ "go_after_panic, go_after_signal, go_frame_1",
+ "/^$/",
+ "go_goroutine", NULL);
+ if (ret != 0) {
+ rule_error(mlp);
+ return NULL;
+ }
+
+ ret = rule(mlp,
+ "go_after_panic",
+ "/^\\[signal /",
+ "go_after_signal", NULL);
+ if (ret != 0) {
+ rule_error(mlp);
+ return NULL;
+ }
+
+ ret = rule(mlp,
+ "go_goroutine",
+ "/^goroutine \\d+ \\[[^\\]]+\\]:$/",
+ "go_frame_1", NULL);
+ if (ret != 0) {
+ rule_error(mlp);
+ return NULL;
+ }
+
+ ret = rule(mlp,
+ "go_frame_1",
+ "/^(?:[^\\s.:]+\\.)*[^\\s.():]+\\(|^created by /",
+ "go_frame_2", NULL);
+ if (ret != 0) {
+ rule_error(mlp);
+ return NULL;
+ }
+
+ ret = rule(mlp,
+ "go_frame_2",
+ "/^\\s/",
+ "go_frame_1", NULL);
+ if (ret != 0) {
+ rule_error(mlp);
+ return NULL;
+ }
+
+ /* Map the rules (mandatory for regex rules) */
+ ret = flb_ml_parser_init(mlp);
+ if (ret != 0) {
+ flb_error("[multiline: go] error on mapping rules");
+ flb_ml_parser_destroy(mlp);
+ return NULL;
+ }
+
+ return mlp;
+}
diff --git a/fluent-bit/src/multiline/flb_ml_parser_java.c b/fluent-bit/src/multiline/flb_ml_parser_java.c
new file mode 100644
index 000000000..4df5a00f7
--- /dev/null
+++ b/fluent-bit/src/multiline/flb_ml_parser_java.c
@@ -0,0 +1,143 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/multiline/flb_ml.h>
+#include <fluent-bit/multiline/flb_ml_rule.h>
+#include <fluent-bit/multiline/flb_ml_parser.h>
+
+#define rule flb_ml_rule_create
+
+static void rule_error(struct flb_ml_parser *ml_parser)
+{
+ int id;
+
+ id = mk_list_size(&ml_parser->regex_rules);
+ flb_error("[multiline: java] rule #%i could not be created", id);
+ flb_ml_parser_destroy(ml_parser);
+}
+
+/* Java mode */
+struct flb_ml_parser *flb_ml_parser_java(struct flb_config *config, char *key)
+{
+ int ret;
+ struct flb_ml_parser *mlp;
+
+ mlp = flb_ml_parser_create(config, /* Fluent Bit context */
+ "java", /* name */
+ FLB_ML_REGEX, /* type */
+ NULL, /* match_str */
+ FLB_FALSE, /* negate */
+ FLB_ML_FLUSH_TIMEOUT, /* flush_ms */
+ key, /* key_content */
+ NULL, /* key_group */
+ NULL, /* key_pattern */
+ NULL, /* parser ctx */
+ NULL); /* parser name */
+
+ if (!mlp) {
+ flb_error("[multiline] could not create 'java mode'");
+ return NULL;
+ }
+
+ ret = rule(mlp,
+ "start_state, java_start_exception",
+ "/(.)(?:Exception|Error|Throwable|V8 errors stack trace)[:\\r\\n]/",
+ "java_after_exception", NULL);
+ if (ret != 0) {
+ rule_error(mlp);
+ return NULL;
+ }
+
+ ret = rule(mlp,
+ "java_after_exception",
+ "/^[\\t ]*nested exception is:[\\t ]*/",
+ "java_start_exception", NULL);
+ if (ret != 0) {
+ rule_error(mlp);
+ return NULL;
+ }
+
+ ret = rule(mlp,
+ "java_after_exception",
+ "/^[\\r\\n]*$/",
+ "java_after_exception", NULL);
+ if (ret != 0) {
+ rule_error(mlp);
+ return NULL;
+ }
+
+ ret = rule(mlp,
+ "java_after_exception, java",
+ "/^[\\t ]+(?:eval )?at /",
+ "java", NULL);
+ if (ret != 0) {
+ rule_error(mlp);
+ return NULL;
+ }
+
+ ret = rule(mlp,
+ "java_after_exception, java",
+ /* C# nested exception */
+ "/^[\\t ]+--- End of inner exception stack trace ---$/",
+ "java", NULL);
+ if (ret != 0) {
+ rule_error(mlp);
+ return NULL;
+ }
+
+ ret = rule(mlp,
+ "java_after_exception, java",
+ /* C# exception from async code */
+ "/^--- End of stack trace from previous (?x:"
+ ")location where exception was thrown ---$/",
+ "java", NULL);
+ if (ret != 0) {
+ rule_error(mlp);
+ return NULL;
+ }
+
+ ret = rule(mlp,
+ "java_after_exception, java",
+ "/^[\\t ]*(?:Caused by|Suppressed):/",
+ "java_after_exception", NULL);
+ if (ret != 0) {
+ rule_error(mlp);
+ return NULL;
+ }
+
+ ret = rule(mlp,
+ "java_after_exception, java",
+ "/^[\\t ]*... \\d+ (?:more|common frames omitted)/",
+ "java", NULL);
+ if (ret != 0) {
+ rule_error(mlp);
+ return NULL;
+ }
+
+ /* Map the rules (mandatory for regex rules) */
+ ret = flb_ml_parser_init(mlp);
+ if (ret != 0) {
+ flb_error("[multiline: java] error on mapping rules");
+ flb_ml_parser_destroy(mlp);
+ return NULL;
+ }
+
+ return mlp;
+}
diff --git a/fluent-bit/src/multiline/flb_ml_parser_python.c b/fluent-bit/src/multiline/flb_ml_parser_python.c
new file mode 100644
index 000000000..a92088397
--- /dev/null
+++ b/fluent-bit/src/multiline/flb_ml_parser_python.c
@@ -0,0 +1,98 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/multiline/flb_ml.h>
+#include <fluent-bit/multiline/flb_ml_rule.h>
+#include <fluent-bit/multiline/flb_ml_parser.h>
+
+#define rule flb_ml_rule_create
+
+static void rule_error(struct flb_ml_parser *mlp)
+{
+ int id;
+
+ id = mk_list_size(&mlp->regex_rules);
+ flb_error("[multiline: python] rule #%i could not be created", id);
+ flb_ml_parser_destroy(mlp);
+}
+
+/* Python */
+struct flb_ml_parser *flb_ml_parser_python(struct flb_config *config, char *key)
+{
+ int ret;
+ struct flb_ml_parser *mlp;
+
+ mlp = flb_ml_parser_create(config, /* Fluent Bit context */
+ "python", /* name */
+ FLB_ML_REGEX, /* type */
+ NULL, /* match_str */
+ FLB_FALSE, /* negate */
+ FLB_ML_FLUSH_TIMEOUT, /* flush_ms */
+ key, /* key_content */
+ NULL, /* key_group */
+ NULL, /* key_pattern */
+ NULL, /* parser ctx */
+ NULL); /* parser name */
+
+ if (!mlp) {
+ flb_error("[multiline] could not create 'python mode'");
+ return NULL;
+ }
+
+ /* rule(:start_state, /^Traceback \(most recent call last\):$/, :python) */
+ ret = rule(mlp,
+ "start_state", "/^Traceback \\(most recent call last\\):$/",
+ "python", NULL);
+ if (ret != 0) {
+ rule_error(mlp);
+ return NULL;
+ }
+
+ /* rule(:python, /^[\t ]+File /, :python_code) */
+ ret = rule(mlp, "python", "/^[\\t ]+File /", "python_code", NULL);
+ if (ret != 0) {
+ rule_error(mlp);
+ return NULL;
+ }
+
+ /* rule(:python_code, /[^\t ]/, :python) */
+ ret = rule(mlp, "python_code", "/[^\\t ]/", "python", NULL);
+ if (ret != 0) {
+ rule_error(mlp);
+ return NULL;
+ }
+
+ /* rule(:python, /^(?:[^\s.():]+\.)*[^\s.():]+:/, :start_state) */
+ ret = rule(mlp, "python", "/^(?:[^\\s.():]+\\.)*[^\\s.():]+:/", "start_state", NULL);
+ if (ret != 0) {
+ rule_error(mlp);
+ return NULL;
+ }
+
+ /* Map the rules (mandatory for regex rules) */
+ ret = flb_ml_parser_init(mlp);
+ if (ret != 0) {
+ flb_error("[multiline: python] error on mapping rules");
+ flb_ml_parser_destroy(mlp);
+ return NULL;
+ }
+
+ return mlp;
+}
diff --git a/fluent-bit/src/multiline/flb_ml_parser_ruby.c b/fluent-bit/src/multiline/flb_ml_parser_ruby.c
new file mode 100644
index 000000000..780f829d0
--- /dev/null
+++ b/fluent-bit/src/multiline/flb_ml_parser_ruby.c
@@ -0,0 +1,87 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/multiline/flb_ml.h>
+#include <fluent-bit/multiline/flb_ml_rule.h>
+#include <fluent-bit/multiline/flb_ml_parser.h>
+
+#define rule flb_ml_rule_create
+
+static void rule_error(struct flb_ml_parser *mlp)
+{
+ int id;
+
+ id = mk_list_size(&mlp->regex_rules);
+ flb_error("[multiline: ruby] rule #%i could not be created", id);
+ flb_ml_parser_destroy(mlp);
+}
+
+/* Ruby mode */
+struct flb_ml_parser *flb_ml_parser_ruby(struct flb_config *config, char *key)
+{
+ int ret;
+ struct flb_ml_parser *mlp;
+
+ mlp = flb_ml_parser_create(config, /* Fluent Bit context */
+ "ruby", /* name */
+ FLB_ML_REGEX, /* type */
+ NULL, /* match_str */
+ FLB_FALSE, /* negate */
+ FLB_ML_FLUSH_TIMEOUT, /* flush_ms */
+ key, /* key_content */
+ NULL, /* key_group */
+ NULL, /* key_pattern */
+ NULL, /* parser ctx */
+ NULL); /* parser name */
+
+ if (!mlp) {
+ flb_error("[multiline] could not create 'ruby mode'");
+ return NULL;
+ }
+
+ ret = rule(mlp,
+ "start_state, ruby_start_exception",
+ "/^.+:\\d+:in\\s+.*/",
+ "ruby_after_exception", NULL);
+ if (ret != 0) {
+ rule_error(mlp);
+ return NULL;
+ }
+
+ ret = rule(mlp,
+ "ruby_after_exception, ruby",
+ "/^\\s+from\\s+.*:\\d+:in\\s+.*/",
+ "ruby", NULL);
+ if (ret != 0) {
+ rule_error(mlp);
+ return NULL;
+ }
+
+
+ /* Map the rules (mandatory for regex rules) */
+ ret = flb_ml_parser_init(mlp);
+ if (ret != 0) {
+ flb_error("[multiline: ruby] error on mapping rules");
+ flb_ml_parser_destroy(mlp);
+ return NULL;
+ }
+
+ return mlp;
+}
diff --git a/fluent-bit/src/multiline/flb_ml_rule.c b/fluent-bit/src/multiline/flb_ml_rule.c
new file mode 100644
index 000000000..26520dfed
--- /dev/null
+++ b/fluent-bit/src/multiline/flb_ml_rule.c
@@ -0,0 +1,421 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_regex.h>
+#include <fluent-bit/flb_slist.h>
+
+#include <fluent-bit/multiline/flb_ml.h>
+#include <fluent-bit/multiline/flb_ml_rule.h>
+
+struct to_state {
+ struct flb_ml_rule *rule;
+ struct mk_list _head;
+};
+
+struct flb_slist_entry *get_start_state(struct mk_list *list)
+{
+ struct mk_list *head;
+ struct flb_slist_entry *e;
+
+ mk_list_foreach(head, list) {
+ e = mk_list_entry(head, struct flb_slist_entry, _head);
+ if (strcmp(e->str, "start_state") == 0) {
+ return e;
+ }
+ }
+
+ return NULL;
+}
+
+int flb_ml_rule_create(struct flb_ml_parser *ml_parser,
+ flb_sds_t from_states,
+ char *regex_pattern,
+ flb_sds_t to_state,
+ char *end_pattern)
+{
+ int ret;
+ int first_rule = FLB_FALSE;
+ struct flb_ml_rule *rule;
+
+ rule = flb_calloc(1, sizeof(struct flb_ml_rule));
+ if (!rule) {
+ flb_errno();
+ return -1;
+ }
+ flb_slist_create(&rule->from_states);
+ mk_list_init(&rule->to_state_map);
+
+ if (mk_list_size(&ml_parser->regex_rules) == 0) {
+ first_rule = FLB_TRUE;
+ }
+ mk_list_add(&rule->_head, &ml_parser->regex_rules);
+
+ /* from_states */
+ ret = flb_slist_split_string(&rule->from_states, from_states, ',', -1);
+ if (ret <= 0) {
+ flb_error("[multiline] rule is empty or has invalid 'from_states' tokens");
+ flb_ml_rule_destroy(rule);
+ return -1;
+ }
+
+ /* Check if the rule contains a 'start_state' */
+ if (get_start_state(&rule->from_states)) {
+ rule->start_state = FLB_TRUE;
+ }
+ else if (first_rule) {
+ flb_error("[multiline] rule don't contain a 'start_state'");
+ flb_ml_rule_destroy(rule);
+ return -1;
+ }
+
+ /* regex content pattern */
+ rule->regex = flb_regex_create(regex_pattern);
+ if (!rule->regex) {
+ flb_ml_rule_destroy(rule);
+ return -1;
+ }
+
+ /* to_state */
+ if (to_state) {
+ rule->to_state = flb_sds_create(to_state);
+ if (!rule->to_state) {
+ flb_ml_rule_destroy(rule);
+ return -1;
+ }
+ }
+
+ /* regex end pattern */
+ if (end_pattern) {
+ rule->regex_end = flb_regex_create(end_pattern);
+ if (!rule->regex_end) {
+ flb_ml_rule_destroy(rule);
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+void flb_ml_rule_destroy(struct flb_ml_rule *rule)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct to_state *st;
+
+ flb_slist_destroy(&rule->from_states);
+
+ if (rule->regex) {
+ flb_regex_destroy(rule->regex);
+ }
+
+
+ if (rule->to_state) {
+ flb_sds_destroy(rule->to_state);
+ }
+
+ mk_list_foreach_safe(head, tmp, &rule->to_state_map) {
+ st = mk_list_entry(head, struct to_state, _head);
+ mk_list_del(&st->_head);
+ flb_free(st);
+ }
+
+ if (rule->regex_end) {
+ flb_regex_destroy(rule->regex_end);
+ }
+
+ mk_list_del(&rule->_head);
+ flb_free(rule);
+}
+
+void flb_ml_rule_destroy_all(struct flb_ml_parser *ml_parser)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_ml_rule *rule;
+
+ mk_list_foreach_safe(head, tmp, &ml_parser->regex_rules) {
+ rule = mk_list_entry(head, struct flb_ml_rule, _head);
+ flb_ml_rule_destroy(rule);
+ }
+}
+
+static inline int to_states_exists(struct flb_ml_parser *ml_parser,
+ flb_sds_t state)
+{
+ struct mk_list *head;
+ struct mk_list *r_head;
+ struct flb_ml_rule *rule;
+ struct flb_slist_entry *e;
+
+ mk_list_foreach(head, &ml_parser->regex_rules) {
+ rule = mk_list_entry(head, struct flb_ml_rule, _head);
+
+ mk_list_foreach(r_head, &rule->from_states) {
+ e = mk_list_entry(r_head, struct flb_slist_entry, _head);
+ if (strcmp(e->str, state) == 0) {
+ return FLB_TRUE;
+ }
+ }
+ }
+
+ return FLB_FALSE;
+}
+
+static inline int to_states_matches_rule(struct flb_ml_rule *rule,
+ flb_sds_t state)
+{
+ struct mk_list *head;
+ struct flb_slist_entry *e;
+
+ mk_list_foreach(head, &rule->from_states) {
+ e = mk_list_entry(head, struct flb_slist_entry, _head);
+ if (strcmp(e->str, state) == 0) {
+ return FLB_TRUE;
+ }
+ }
+
+ return FLB_FALSE;
+}
+
+static int set_to_state_map(struct flb_ml_parser *ml_parser,
+ struct flb_ml_rule *rule)
+{
+ int ret;
+ struct to_state *s;
+ struct mk_list *head;
+ struct flb_ml_rule *r;
+
+ if (!rule->to_state) {
+ /* no to_state */
+ return 0;
+ }
+
+ /* Iterate all rules that matches the to_state */
+ mk_list_foreach(head, &ml_parser->regex_rules) {
+ r = mk_list_entry(head, struct flb_ml_rule, _head);
+
+ /* Check if rule->to_state, matches an existing (registered) from_state */
+ ret = to_states_exists(ml_parser, rule->to_state);
+ if (!ret) {
+ flb_error("[multiline parser: %s] to_state='%s' is not registered",
+ ml_parser->name, rule->to_state);
+ return -1;
+ }
+
+ /*
+ * A rule can have many 'from_states', check if the current 'rule->to_state'
+ * matches any 'r->from_states'
+ */
+ ret = to_states_matches_rule(r, rule->to_state);
+ if (!ret) {
+ continue;
+ }
+
+ /* We have a match. Create a 'to_state' entry into the 'to_state_map' list */
+ s = flb_malloc(sizeof(struct to_state));
+ if (!s) {
+ flb_errno();
+ return -1;
+ }
+ s->rule = r;
+ mk_list_add(&s->_head, &rule->to_state_map);
+ }
+
+ return 0;
+}
+
+static int try_flushing_buffer(struct flb_ml_parser *ml_parser,
+ struct flb_ml_stream *mst,
+ struct flb_ml_stream_group *group)
+{
+ int next_start = FLB_FALSE;
+ struct mk_list *head;
+ struct to_state *st;
+ struct flb_ml_rule *rule;
+
+ rule = group->rule_to_state;
+ if (!rule) {
+ if (flb_sds_len(group->buf) > 0) {
+ flb_ml_flush_stream_group(ml_parser, mst, group, FLB_FALSE);
+ group->first_line = FLB_TRUE;
+ }
+ return 0;
+ }
+
+ /* Check if any 'to_state_map' referenced rules is a possible start */
+ mk_list_foreach(head, &rule->to_state_map) {
+ st = mk_list_entry(head, struct to_state, _head);
+ if (st->rule->start_state) {
+ next_start = FLB_TRUE;
+ break;
+ }
+ }
+
+ if (next_start && flb_sds_len(group->buf) > 0) {
+ flb_ml_flush_stream_group(ml_parser, mst, group, FLB_FALSE);
+ group->first_line = FLB_TRUE;
+ }
+
+ return 0;
+}
+
+/* Initialize all rules */
+int flb_ml_rule_init(struct flb_ml_parser *ml_parser)
+{
+ int ret;
+ struct mk_list *head;
+ struct flb_ml_rule *rule;
+
+ /* FIXME: sort rules by start_state first (let's trust in the caller) */
+
+ /* For each rule, compose it to_state_map list */
+ mk_list_foreach(head, &ml_parser->regex_rules) {
+ rule = mk_list_entry(head, struct flb_ml_rule, _head);
+ /* Populate 'rule->to_state_map' list */
+ ret = set_to_state_map(ml_parser, rule);
+ if (ret == -1) {
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+/* Search any 'start_state' matching the incoming 'buf_data' */
+static struct flb_ml_rule *try_start_state(struct flb_ml_parser *ml_parser,
+ char *buf_data, size_t buf_size)
+{
+ int ret = -1;
+ struct mk_list *head;
+ struct flb_ml_rule *rule = NULL;
+
+ mk_list_foreach(head, &ml_parser->regex_rules) {
+ rule = mk_list_entry(head, struct flb_ml_rule, _head);
+
+ /* Is this rule matching a start_state ? */
+ if (!rule->start_state) {
+ rule = NULL;
+ continue;
+ }
+
+ /* Matched a start_state. Check if we have a regex match */
+ ret = flb_regex_match(rule->regex, (unsigned char *) buf_data, buf_size);
+ if (ret) {
+ return rule;
+ }
+ }
+
+ return NULL;
+}
+
+int flb_ml_rule_process(struct flb_ml_parser *ml_parser,
+ struct flb_ml_stream *mst,
+ struct flb_ml_stream_group *group,
+ msgpack_object *full_map,
+ void *buf, size_t size, struct flb_time *tm,
+ msgpack_object *val_content,
+ msgpack_object *val_pattern)
+{
+ int ret;
+ int len;
+ char *buf_data = NULL;
+ size_t buf_size = 0;
+ struct mk_list *head;
+ struct to_state *st = NULL;
+ struct flb_ml_rule *rule = NULL;
+ struct flb_ml_rule *tmp_rule = NULL;
+
+ if (val_content) {
+ buf_data = (char *) val_content->via.str.ptr;
+ buf_size = val_content->via.str.size;
+ }
+ else {
+ buf_data = buf;
+ buf_size = size;
+ }
+
+ if (group->rule_to_state) {
+ /* are we in a continuation ? */
+ tmp_rule = group->rule_to_state;
+
+ /* Lookup all possible next rules by state reference */
+ rule = NULL;
+ mk_list_foreach(head, &tmp_rule->to_state_map) {
+ st = mk_list_entry(head, struct to_state, _head);
+
+ /* skip start states */
+ if (st->rule->start_state) {
+ continue;
+ }
+
+ /* Try regex match */
+ ret = flb_regex_match(st->rule->regex,
+ (unsigned char *) buf_data, buf_size);
+ if (ret) {
+ /* Regex matched */
+ len = flb_sds_len(group->buf);
+ if (len >= 1 && group->buf[len - 1] != '\n') {
+ flb_sds_cat_safe(&group->buf, "\n", 1);
+ }
+
+ if (buf_size == 0) {
+ flb_sds_cat_safe(&group->buf, "\n", 1);
+ }
+ else {
+ flb_sds_cat_safe(&group->buf, buf_data, buf_size);
+ }
+ rule = st->rule;
+ break;
+ }
+ rule = NULL;
+ }
+
+ }
+
+ if (!rule) {
+ /* Check if we are in a 'start_state' */
+ rule = try_start_state(ml_parser, buf_data, buf_size);
+ if (rule) {
+ /* if the group buffer has any previous data just flush it */
+ if (flb_sds_len(group->buf) > 0) {
+ flb_ml_flush_stream_group(ml_parser, mst, group, FLB_FALSE);
+ }
+
+ /* set the rule state */
+ group->rule_to_state = rule;
+
+ /* concatenate the data */
+ flb_sds_cat_safe(&group->buf, buf_data, buf_size);
+
+ /* Copy full map content in stream buffer */
+ flb_ml_register_context(group, tm, full_map);
+
+ return 0;
+ }
+ }
+
+ if (rule) {
+ group->rule_to_state = rule;
+ try_flushing_buffer(ml_parser, mst, group);
+ return 0;
+ }
+
+ return -1;
+}
diff --git a/fluent-bit/src/multiline/flb_ml_stream.c b/fluent-bit/src/multiline/flb_ml_stream.c
new file mode 100644
index 000000000..c92ba3220
--- /dev/null
+++ b/fluent-bit/src/multiline/flb_ml_stream.c
@@ -0,0 +1,338 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_log.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/multiline/flb_ml.h>
+#include <fluent-bit/multiline/flb_ml_rule.h>
+#include <cfl/cfl.h>
+
+static int ml_flush_stdout(struct flb_ml_parser *parser,
+ struct flb_ml_stream *mst,
+ void *data, char *buf_data, size_t buf_size)
+{
+ fprintf(stdout, "\n%s----- MULTILINE FLUSH -----%s\n",
+ ANSI_GREEN, ANSI_RESET);
+
+ /* Print incoming flush buffer */
+ flb_pack_print(buf_data, buf_size);
+
+ fprintf(stdout, "%s----------- EOF -----------%s\n",
+ ANSI_GREEN, ANSI_RESET);
+ return 0;
+}
+
+static struct flb_ml_stream_group *stream_group_create(struct flb_ml_stream *mst,
+ char *name, int len)
+{
+ struct flb_ml_stream_group *group;
+
+ if (!name) {
+ name = "_default";
+ }
+
+ group = flb_calloc(1, sizeof(struct flb_ml_stream_group));
+ if (!group) {
+ flb_errno();
+ return NULL;
+ }
+ group->name = flb_sds_create_len(name, len);
+ if (!group->name) {
+ flb_free(group);
+ return NULL;
+ }
+
+ /* status */
+ group->first_line = FLB_TRUE;
+
+ /* multiline buffer */
+ group->buf = flb_sds_create_size(FLB_ML_BUF_SIZE);
+ if (!group->buf) {
+ flb_error("cannot allocate multiline stream buffer in group %s", name);
+ flb_sds_destroy(group->name);
+ flb_free(group);
+ return NULL;
+ }
+
+ /* msgpack buffer */
+ msgpack_sbuffer_init(&group->mp_md_sbuf);
+ msgpack_packer_init(&group->mp_md_pck, &group->mp_md_sbuf, msgpack_sbuffer_write);
+
+ msgpack_sbuffer_init(&group->mp_sbuf);
+ msgpack_packer_init(&group->mp_pck, &group->mp_sbuf, msgpack_sbuffer_write);
+
+ mk_list_add(&group->_head, &mst->groups);
+
+ return group;
+}
+
+struct flb_ml_stream_group *flb_ml_stream_group_get(struct flb_ml_parser_ins *parser_i,
+ struct flb_ml_stream *mst,
+ msgpack_object *group_name)
+{
+ int len;
+ char *name;
+ struct flb_ml_parser *mlp;
+ struct mk_list *head;
+ struct flb_ml_stream_group *group = NULL;
+
+ mlp = parser_i->ml_parser;
+
+ /* If key_group was not defined, we already have a default group */
+ if (!mlp->key_group || !group_name) {
+ group = mk_list_entry_first(&mst->groups,
+ struct flb_ml_stream_group,
+ _head);
+ return group;
+ }
+
+ /* Lookup for a candidate group */
+ len = group_name->via.str.size;
+ name = (char *)group_name->via.str.ptr;
+
+ mk_list_foreach(head, &mst->groups) {
+ group = mk_list_entry(head, struct flb_ml_stream_group, _head);
+ if (flb_sds_cmp(group->name, name, len) == 0) {
+ return group;
+ }
+ else {
+ group = NULL;
+ continue;
+ }
+ }
+
+ /* No group has been found, create a new one */
+ if (mk_list_size(&mst->groups) >= FLB_ML_MAX_GROUPS) {
+ flb_error("[multiline] stream %s exceeded number of allowed groups (%i)",
+ mst->name, FLB_ML_MAX_GROUPS);
+ return NULL;
+ }
+
+ group = stream_group_create(mst, name, len);
+ return group;
+}
+
+static void stream_group_destroy(struct flb_ml_stream_group *group)
+{
+ if (group->name) {
+ flb_sds_destroy(group->name);
+ }
+ if (group->buf) {
+ flb_sds_destroy(group->buf);
+ }
+
+ msgpack_sbuffer_destroy(&group->mp_md_sbuf);
+ msgpack_sbuffer_destroy(&group->mp_sbuf);
+
+ mk_list_del(&group->_head);
+ flb_free(group);
+}
+
+static void stream_group_destroy_all(struct flb_ml_stream *mst)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_ml_stream_group *group;
+
+ mk_list_foreach_safe(head, tmp, &mst->groups) {
+ group = mk_list_entry(head, struct flb_ml_stream_group, _head);
+ stream_group_destroy(group);
+ }
+}
+
+static int stream_group_init(struct flb_ml_stream *stream)
+{
+ struct flb_ml_stream_group *group = NULL;
+
+ mk_list_init(&stream->groups);
+
+ /* create a default group */
+ group = stream_group_create(stream, NULL, 0);
+ if (!group) {
+ flb_error("[multiline] error initializing default group for "
+ "stream '%s'", stream->name);
+ return -1;
+ }
+
+ return 0;
+}
+
+static struct flb_ml_stream *stream_create(struct flb_ml *ml,
+ uint64_t id,
+ struct flb_ml_parser_ins *parser,
+ int (*cb_flush) (struct flb_ml_parser *,
+ struct flb_ml_stream *,
+ void *cb_data,
+ char *buf_data,
+ size_t buf_size),
+ void *cb_data)
+{
+ int ret;
+ struct flb_ml_stream *stream;
+
+ stream = flb_calloc(1, sizeof(struct flb_ml_stream));
+ if (!stream) {
+ flb_errno();
+ return NULL;
+ }
+ stream->ml = ml;
+ stream->id = id;
+ stream->parser = parser;
+
+ /* Flush Callback and opaque data type */
+ if (cb_flush) {
+ stream->cb_flush = cb_flush;
+ }
+ else {
+ stream->cb_flush = ml_flush_stdout;
+ }
+ stream->cb_data = cb_data;
+
+ ret = stream_group_init(stream);
+ if (ret != 0) {
+ flb_free(stream);
+ return NULL;
+ }
+
+ mk_list_add(&stream->_head, &parser->streams);
+ return stream;
+}
+
+int flb_ml_stream_create(struct flb_ml *ml,
+ char *name,
+ int name_len,
+ int (*cb_flush) (struct flb_ml_parser *,
+ struct flb_ml_stream *,
+ void *cb_data,
+ char *buf_data,
+ size_t buf_size),
+ void *cb_data,
+ uint64_t *stream_id)
+{
+ uint64_t id;
+ struct mk_list *head;
+ struct mk_list *head_group;
+ struct flb_ml_stream *mst;
+ struct flb_ml_group *group;
+ struct flb_ml_parser_ins *parser;
+
+ if (!name) {
+ return -1;
+ }
+
+ if (name_len <= 0) {
+ name_len = strlen(name);
+ }
+
+ /* Set the stream id by creating a hash using the name */
+ id = cfl_hash_64bits(name, name_len);
+
+ /* For every group and parser, create a stream for this stream_id/hash */
+ mk_list_foreach(head, &ml->groups) {
+ group = mk_list_entry(head, struct flb_ml_group, _head);
+ mk_list_foreach(head_group, &group->parsers) {
+ parser = mk_list_entry(head_group, struct flb_ml_parser_ins, _head);
+
+ /* Check if the stream already exists on the parser */
+ if (flb_ml_stream_get(parser, id) != NULL) {
+ continue;
+ }
+
+ /* Create the stream */
+ mst = stream_create(ml, id, parser, cb_flush, cb_data);
+ if (!mst) {
+ flb_error("[multiline] could not create stream_id=%" PRIu64
+ "for stream '%s' on parser '%s'",
+ *stream_id, name, parser->ml_parser->name);
+ return -1;
+ }
+ }
+ }
+
+ *stream_id = id;
+ return 0;
+}
+
+struct flb_ml_stream *flb_ml_stream_get(struct flb_ml_parser_ins *parser,
+ uint64_t stream_id)
+{
+ struct mk_list *head;
+ struct flb_ml_stream *mst = NULL;
+
+ mk_list_foreach(head, &parser->streams) {
+ mst = mk_list_entry(head, struct flb_ml_stream, _head);
+ if (mst->id == stream_id) {
+ return mst;
+ }
+ }
+
+ return NULL;
+}
+
+void flb_ml_stream_id_destroy_all(struct flb_ml *ml, uint64_t stream_id)
+{
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct mk_list *head_group;
+ struct mk_list *head_stream;
+ struct flb_ml_group *group;
+ struct flb_ml_stream *mst;
+ struct flb_ml_parser_ins *parser_i;
+
+ /* groups */
+ mk_list_foreach(head, &ml->groups) {
+ group = mk_list_entry(head, struct flb_ml_group, _head);
+
+ /* parser instances */
+ mk_list_foreach(head_group, &group->parsers) {
+ parser_i = mk_list_entry(head_group, struct flb_ml_parser_ins, _head);
+
+ /* streams */
+ mk_list_foreach_safe(head_stream, tmp, &parser_i->streams) {
+ mst = mk_list_entry(head_stream, struct flb_ml_stream, _head);
+ if (mst->id != stream_id) {
+ continue;
+ }
+
+ /* flush any pending data */
+ flb_ml_flush_parser_instance(ml, parser_i, stream_id, FLB_TRUE);
+
+ /* destroy internal groups of the stream */
+ flb_ml_stream_destroy(mst);
+ }
+ }
+ }
+}
+
+int flb_ml_stream_destroy(struct flb_ml_stream *mst)
+{
+ mk_list_del(&mst->_head);
+ if (mst->name) {
+ flb_sds_destroy(mst->name);
+ }
+
+ /* destroy groups */
+ stream_group_destroy_all(mst);
+
+ flb_free(mst);
+
+ return 0;
+}