summaryrefslogtreecommitdiffstats
path: root/fluent-bit/tests/internal/multiline.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/tests/internal/multiline.c')
-rw-r--r--fluent-bit/tests/internal/multiline.c1478
1 files changed, 1478 insertions, 0 deletions
diff --git a/fluent-bit/tests/internal/multiline.c b/fluent-bit/tests/internal/multiline.c
new file mode 100644
index 000000000..e175e1171
--- /dev/null
+++ b/fluent-bit/tests/internal/multiline.c
@@ -0,0 +1,1478 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_parser.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/multiline/flb_ml.h>
+#include <fluent-bit/multiline/flb_ml_rule.h>
+#include <fluent-bit/multiline/flb_ml_parser.h>
+
+#include "flb_tests_internal.h"
+
+struct record_check {
+ char *buf;
+};
+
+struct expected_result {
+ int current_record;
+ char *key;
+ struct record_check *out_records;
+};
+
+/* Docker */
+struct record_check docker_input[] = {
+ {"{\"log\": \"aa\\n\", \"stream\": \"stdout\", \"time\": \"2021-02-01T16:45:03.01231z\"}"},
+ {"{\"log\": \"aa\\n\", \"stream\": \"stderr\", \"time\": \"2021-02-01T16:45:03.01231z\"}"},
+ {"{\"log\": \"bb\", \"stream\": \"stdout\", \"time\": \"2021-02-01T16:45:03.01232z\"}"},
+ {"{\"log\": \"cc\n\", \"stream\": \"stdout\", \"time\": \"2021-02-01T16:45:03.01233z\"}"},
+ {"{\"log\": \"dd\", \"stream\": \"stderr\", \"time\": \"2021-02-01T16:45:03.01233z\"}"},
+ {"single line to force pending flush of the previous line"},
+ {"{\"log\": \"ee\\n\", \"stream\": \"stderr\", \"time\": \"2021-02-01T16:45:03.01234z\"}"},
+};
+
+struct record_check docker_output[] = {
+ {"aa\n"},
+ {"aa\n"},
+ {"bbcc\n"},
+ {"dd"},
+ {"single line to force pending flush of the previous line"},
+ {"ee\n"},
+};
+
+/* CRI */
+struct record_check cri_input[] = {
+ {"2019-05-07T18:57:50.904275087+00:00 stdout P 1a. some "},
+ {"2019-05-07T18:57:51.904275088+00:00 stdout P multiline "},
+ {"2019-05-07T18:57:52.904275089+00:00 stdout F log"},
+ {"2019-05-07T18:57:50.904275087+00:00 stderr P 1b. some "},
+ {"2019-05-07T18:57:51.904275088+00:00 stderr P multiline "},
+ {"2019-05-07T18:57:52.904275089+00:00 stderr F log"},
+ {"2019-05-07T18:57:53.904275090+00:00 stdout P 2a. another "},
+ {"2019-05-07T18:57:54.904275091+00:00 stdout P multiline "},
+ {"2019-05-07T18:57:55.904275092+00:00 stdout F log"},
+ {"2019-05-07T18:57:53.904275090+00:00 stderr P 2b. another "},
+ {"2019-05-07T18:57:54.904275091+00:00 stderr P multiline "},
+ {"2019-05-07T18:57:55.904275092+00:00 stderr F log"},
+ {"2019-05-07T18:57:56.904275093+00:00 stdout F 3a. non multiline 1"},
+ {"2019-05-07T18:57:57.904275094+00:00 stdout F 4a. non multiline 2"},
+ {"2019-05-07T18:57:56.904275093+00:00 stderr F 3b. non multiline 1"},
+ {"2019-05-07T18:57:57.904275094+00:00 stderr F 4b. non multiline 2"}
+};
+
+struct record_check cri_output[] = {
+ {"1a. some multiline log"},
+ {"1b. some multiline log"},
+ {"2a. another multiline log"},
+ {"2b. another multiline log"},
+ {"3a. non multiline 1"},
+ {"4a. non multiline 2"},
+ {"3b. non multiline 1"},
+ {"4b. non multiline 2"}
+};
+
+/* ENDSWITH */
+struct record_check endswith_input[] = {
+ {"1a. some multiline log \\"},
+ {"1b. some multiline log"},
+ {"2a. another multiline log\\"},
+ {"2b. another multiline log"},
+ {"3a. non multiline 1"},
+ {"4a. non multiline 2"}
+};
+
+struct record_check endswith_output[] = {
+ {"1a. some multiline log \\\n1b. some multiline log\n"},
+ {"2a. another multiline log\\\n2b. another multiline log\n"},
+ {"3a. non multiline 1\n"},
+ {"4a. non multiline 2\n"}
+};
+
+/* Mixed lines of Docker and CRI logs in different streams (stdout/stderr) */
+struct record_check container_mix_input[] = {
+ {"{\"log\": \"a1\\n\", \"stream\": \"stdout\", \"time\": \"2021-02-01T16:45:03.01231z\"}"},
+ {"{\"log\": \"a2\\n\", \"stream\": \"stderr\", \"time\": \"2021-02-01T16:45:03.01231z\"}"},
+ {"{\"log\": \"bb\", \"stream\": \"stdout\", \"time\": \"2021-02-01T16:45:03.01232z\"}"},
+ {"{\"log\": \"cc\", \"stream\": \"stdout\", \"time\": \"2021-02-01T16:45:03.01233z\"}"},
+ {"{\"log\": \"dd\", \"stream\": \"stderr\", \"time\": \"2021-02-01T16:45:03.01232z\"}"},
+ {"{\"log\": \"ee\n\", \"stream\": \"stderr\", \"time\": \"2021-02-01T16:45:03.01233z\"}"},
+ {"2019-05-07T18:57:52.904275089+00:00 stdout F single full"},
+ {"2019-05-07T18:57:50.904275087+00:00 stdout P 1a. some "},
+ {"2019-05-07T18:57:51.904275088+00:00 stdout P multiline "},
+ {"2019-05-07T18:57:52.904275089+00:00 stdout F log"},
+ {"2019-05-07T18:57:50.904275087+00:00 stderr P 1b. some "},
+ {"2019-05-07T18:57:51.904275088+00:00 stderr P multiline "},
+ {"2019-05-07T18:57:52.904275089+00:00 stderr F log"},
+ {"{\"log\": \"dd-out\\n\", \"stream\": \"stdout\", \"time\": \"2021-02-01T16:45:03.01234z\"}"},
+ {"{\"log\": \"dd-err\\n\", \"stream\": \"stderr\", \"time\": \"2021-02-01T16:45:03.01234z\"}"},
+};
+
+struct record_check container_mix_output[] = {
+ {"a1\n"},
+ {"a2\n"},
+ {"ddee\n"},
+ {"bbcc"},
+ {"single full"},
+ {"1a. some multiline log"},
+ {"1b. some multiline log"},
+ {"dd-out\n"},
+ {"dd-err\n"},
+};
+
+/* Java stacktrace detection */
+struct record_check java_input[] = {
+ {"Exception in thread \"main\" java.lang.IllegalStateException: ..null property\n"},
+ {" at com.example.myproject.Author.getBookIds(xx.java:38)\n"},
+ {" at com.example.myproject.Bootstrap.main(Bootstrap.java:14)\n"},
+ {"Caused by: java.lang.NullPointerException\n"},
+ {" at com.example.myproject.Book.getId(Book.java:22)\n"},
+ {" at com.example.myproject.Author.getBookIds(Author.java:35)\n"},
+ {" ... 1 more\n"},
+ {"single line\n"}
+};
+
+struct record_check java_output[] = {
+ {
+ "Exception in thread \"main\" java.lang.IllegalStateException: ..null property\n"
+ " at com.example.myproject.Author.getBookIds(xx.java:38)\n"
+ " at com.example.myproject.Bootstrap.main(Bootstrap.java:14)\n"
+ "Caused by: java.lang.NullPointerException\n"
+ " at com.example.myproject.Book.getId(Book.java:22)\n"
+ " at com.example.myproject.Author.getBookIds(Author.java:35)\n"
+ " ... 1 more\n"
+ },
+ {
+ "single line\n"
+ }
+};
+
+struct record_check ruby_input[] = {
+ {"/app/config/routes.rb:6:in `/': divided by 0 (ZeroDivisionError)"},
+ {" from /app/config/routes.rb:6:in `block in <main>'"},
+ {" from /var/lib/gems/3.0.0/gems/actionpack-7.0.4/lib/action_dispatch/routing/route_set.rb:428:in `instance_exec'"},
+ {" from /var/lib/gems/3.0.0/gems/actionpack-7.0.4/lib/action_dispatch/routing/route_set.rb:428:in `eval_block'"},
+ {" from /var/lib/gems/3.0.0/gems/actionpack-7.0.4/lib/action_dispatch/routing/route_set.rb:410:in `draw'"},
+ {" from /app/config/routes.rb:1:in `<main>'"},
+ {"hello world, not multiline\n"}
+};
+
+struct record_check ruby_output[] = {
+ {
+ "/app/config/routes.rb:6:in `/': divided by 0 (ZeroDivisionError)\n"
+ " from /app/config/routes.rb:6:in `block in <main>'\n"
+ " from /var/lib/gems/3.0.0/gems/actionpack-7.0.4/lib/action_dispatch/routing/route_set.rb:428:in `instance_exec'\n"
+ " from /var/lib/gems/3.0.0/gems/actionpack-7.0.4/lib/action_dispatch/routing/route_set.rb:428:in `eval_block'\n"
+ " from /var/lib/gems/3.0.0/gems/actionpack-7.0.4/lib/action_dispatch/routing/route_set.rb:410:in `draw'\n"
+ " from /app/config/routes.rb:1:in `<main>'\n"
+ },
+ {"hello world, not multiline\n"}
+};
+
+/* Python stacktrace detection */
+struct record_check python_input[] = {
+ {"Traceback (most recent call last):\n"},
+ {" File \"/base/data/home/runtimes/python27/python27_lib/versions/third_party/webapp2-2.5.2/webapp2.py\", line 1535, in __call__\n"},
+ {" rv = self.handle_exception(request, response, e)\n"},
+ {" File \"/base/data/home/apps/s~nearfieldspy/1.378705245900539993/nearfieldspy.py\", line 17, in start\n"},
+ {" return get()\n"},
+ {" File \"/base/data/home/apps/s~nearfieldspy/1.378705245900539993/nearfieldspy.py\", line 5, in get\n"},
+ {" raise Exception('spam', 'eggs')\n"},
+ {"Exception: ('spam', 'eggs')\n"},
+ {"hello world, not multiline\n"}
+};
+
+struct record_check python_output[] = {
+ {
+ "Traceback (most recent call last):\n"
+ " File \"/base/data/home/runtimes/python27/python27_lib/versions/third_party/webapp2-2.5.2/webapp2.py\", line 1535, in __call__\n"
+ " rv = self.handle_exception(request, response, e)\n"
+ " File \"/base/data/home/apps/s~nearfieldspy/1.378705245900539993/nearfieldspy.py\", line 17, in start\n"
+ " return get()\n"
+ " File \"/base/data/home/apps/s~nearfieldspy/1.378705245900539993/nearfieldspy.py\", line 5, in get\n"
+ " raise Exception('spam', 'eggs')\n"
+ "Exception: ('spam', 'eggs')\n"
+ },
+ {"hello world, not multiline\n"}
+};
+
+/* Custom example for Elasticsearch stacktrace */
+struct record_check elastic_input[] = {
+ {"[some weird test] IndexNotFoundException[no such index]\n"},
+ {" at org.elasticsearch.cluster.metadata.IndexNameExpressionResolver....\n"},
+ {" at org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.java:133)\n"},
+ {" at org.elasticsearch.action.admin.indices.delete.java:75)\n"},
+ {"another separate log line\n"}
+};
+
+struct record_check elastic_output[] = {
+ {
+ "[some weird test] IndexNotFoundException[no such index]\n"
+ " at org.elasticsearch.cluster.metadata.IndexNameExpressionResolver....\n"
+ " at org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.java:133)\n"
+ " at org.elasticsearch.action.admin.indices.delete.java:75)\n"
+ },
+ {
+ "another separate log line\n"
+ }
+};
+
+/* Go */
+struct record_check go_input[] = {
+ {"panic: my panic\n"},
+ {"\n"},
+ {"goroutine 4 [running]:\n"},
+ {"panic(0x45cb40, 0x47ad70)\n"},
+ {" /usr/local/go/src/runtime/panic.go:542 +0x46c fp=0xc42003f7b8 sp=0xc42003f710 pc=0x422f7c\n"},
+ {"main.main.func1(0xc420024120)\n"},
+ {" foo.go:6 +0x39 fp=0xc42003f7d8 sp=0xc42003f7b8 pc=0x451339\n"},
+ {"runtime.goexit()\n"},
+ {" /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003f7e0 sp=0xc42003f7d8 pc=0x44b4d1\n"},
+ {"created by main.main\n"},
+ {" foo.go:5 +0x58\n"},
+ {"\n"},
+ {"goroutine 1 [chan receive]:\n"},
+ {"runtime.gopark(0x4739b8, 0xc420024178, 0x46fcd7, 0xc, 0xc420028e17, 0x3)\n"},
+ {" /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc420053e30 sp=0xc420053e00 pc=0x42503c\n"},
+ {"runtime.goparkunlock(0xc420024178, 0x46fcd7, 0xc, 0x1000f010040c217, 0x3)\n"},
+ {" /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc420053e70 sp=0xc420053e30 pc=0x42512e\n"},
+ {"runtime.chanrecv(0xc420024120, 0x0, 0xc420053f01, 0x4512d8)\n"},
+ {" /usr/local/go/src/runtime/chan.go:506 +0x304 fp=0xc420053f20 sp=0xc420053e70 pc=0x4046b4\n"},
+ {"runtime.chanrecv1(0xc420024120, 0x0)\n"},
+ {" /usr/local/go/src/runtime/chan.go:388 +0x2b fp=0xc420053f50 sp=0xc420053f20 pc=0x40439b\n"},
+ {"main.main()\n"},
+ {" foo.go:9 +0x6f fp=0xc420053f80 sp=0xc420053f50 pc=0x4512ef\n"},
+ {"runtime.main()\n"},
+ {" /usr/local/go/src/runtime/proc.go:185 +0x20d fp=0xc420053fe0 sp=0xc420053f80 pc=0x424bad\n"},
+ {"runtime.goexit()\n"},
+ {" /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc420053fe8 sp=0xc420053fe0 pc=0x44b4d1\n"},
+ {"\n"},
+ {"goroutine 2 [force gc (idle)]:\n"},
+ {"runtime.gopark(0x4739b8, 0x4ad720, 0x47001e, 0xf, 0x14, 0x1)\n"},
+ {" /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc42003e768 sp=0xc42003e738 pc=0x42503c\n"},
+ {"runtime.goparkunlock(0x4ad720, 0x47001e, 0xf, 0xc420000114, 0x1)\n"},
+ {" /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc42003e7a8 sp=0xc42003e768 pc=0x42512e\n"},
+ {"runtime.forcegchelper()\n"},
+ {" /usr/local/go/src/runtime/proc.go:238 +0xcc fp=0xc42003e7e0 sp=0xc42003e7a8 pc=0x424e5c\n"},
+ {"runtime.goexit()\n"},
+ {" /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003e7e8 sp=0xc42003e7e0 pc=0x44b4d1\n"},
+ {"created by runtime.init.4\n"},
+ {" /usr/local/go/src/runtime/proc.go:227 +0x35\n"},
+ {"\n"},
+ {"goroutine 3 [GC sweep wait]:\n"},
+ {"runtime.gopark(0x4739b8, 0x4ad7e0, 0x46fdd2, 0xd, 0x419914, 0x1)\n"},
+ {" /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc42003ef60 sp=0xc42003ef30 pc=0x42503c\n"},
+ {"runtime.goparkunlock(0x4ad7e0, 0x46fdd2, 0xd, 0x14, 0x1)\n"},
+ {" /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc42003efa0 sp=0xc42003ef60 pc=0x42512e\n"},
+ {"runtime.bgsweep(0xc42001e150)\n"},
+ {" /usr/local/go/src/runtime/mgcsweep.go:52 +0xa3 fp=0xc42003efd8 sp=0xc42003efa0 pc=0x419973\n"},
+ {"runtime.goexit()\n"},
+ {" /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003efe0 sp=0xc42003efd8 pc=0x44b4d1\n"},
+ {"created by runtime.gcenable\n"},
+ {" /usr/local/go/src/runtime/mgc.go:216 +0x58\n"},
+ {"one more line, no multiline"}
+};
+
+struct record_check go_output[] = {
+ {
+ "panic: my panic\n"
+ "\n"
+ "goroutine 4 [running]:\n"
+ "panic(0x45cb40, 0x47ad70)\n"
+ " /usr/local/go/src/runtime/panic.go:542 +0x46c fp=0xc42003f7b8 sp=0xc42003f710 pc=0x422f7c\n"
+ "main.main.func1(0xc420024120)\n"
+ " foo.go:6 +0x39 fp=0xc42003f7d8 sp=0xc42003f7b8 pc=0x451339\n"
+ "runtime.goexit()\n"
+ " /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003f7e0 sp=0xc42003f7d8 pc=0x44b4d1\n"
+ "created by main.main\n"
+ " foo.go:5 +0x58\n"
+ "\n"
+ "goroutine 1 [chan receive]:\n"
+ "runtime.gopark(0x4739b8, 0xc420024178, 0x46fcd7, 0xc, 0xc420028e17, 0x3)\n"
+ " /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc420053e30 sp=0xc420053e00 pc=0x42503c\n"
+ "runtime.goparkunlock(0xc420024178, 0x46fcd7, 0xc, 0x1000f010040c217, 0x3)\n"
+ " /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc420053e70 sp=0xc420053e30 pc=0x42512e\n"
+ "runtime.chanrecv(0xc420024120, 0x0, 0xc420053f01, 0x4512d8)\n"
+ " /usr/local/go/src/runtime/chan.go:506 +0x304 fp=0xc420053f20 sp=0xc420053e70 pc=0x4046b4\n"
+ "runtime.chanrecv1(0xc420024120, 0x0)\n"
+ " /usr/local/go/src/runtime/chan.go:388 +0x2b fp=0xc420053f50 sp=0xc420053f20 pc=0x40439b\n"
+ "main.main()\n"
+ " foo.go:9 +0x6f fp=0xc420053f80 sp=0xc420053f50 pc=0x4512ef\n"
+ "runtime.main()\n"
+ " /usr/local/go/src/runtime/proc.go:185 +0x20d fp=0xc420053fe0 sp=0xc420053f80 pc=0x424bad\n"
+ "runtime.goexit()\n"
+ " /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc420053fe8 sp=0xc420053fe0 pc=0x44b4d1\n"
+ "\n"
+ "goroutine 2 [force gc (idle)]:\n"
+ "runtime.gopark(0x4739b8, 0x4ad720, 0x47001e, 0xf, 0x14, 0x1)\n"
+ " /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc42003e768 sp=0xc42003e738 pc=0x42503c\n"
+ "runtime.goparkunlock(0x4ad720, 0x47001e, 0xf, 0xc420000114, 0x1)\n"
+ " /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc42003e7a8 sp=0xc42003e768 pc=0x42512e\n"
+ "runtime.forcegchelper()\n"
+ " /usr/local/go/src/runtime/proc.go:238 +0xcc fp=0xc42003e7e0 sp=0xc42003e7a8 pc=0x424e5c\n"
+ "runtime.goexit()\n"
+ " /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003e7e8 sp=0xc42003e7e0 pc=0x44b4d1\n"
+ "created by runtime.init.4\n"
+ " /usr/local/go/src/runtime/proc.go:227 +0x35\n"
+ "\n"
+ "goroutine 3 [GC sweep wait]:\n"
+ "runtime.gopark(0x4739b8, 0x4ad7e0, 0x46fdd2, 0xd, 0x419914, 0x1)\n"
+ " /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc42003ef60 sp=0xc42003ef30 pc=0x42503c\n"
+ "runtime.goparkunlock(0x4ad7e0, 0x46fdd2, 0xd, 0x14, 0x1)\n"
+ " /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc42003efa0 sp=0xc42003ef60 pc=0x42512e\n"
+ "runtime.bgsweep(0xc42001e150)\n"
+ " /usr/local/go/src/runtime/mgcsweep.go:52 +0xa3 fp=0xc42003efd8 sp=0xc42003efa0 pc=0x419973\n"
+ "runtime.goexit()\n"
+ " /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003efe0 sp=0xc42003efd8 pc=0x44b4d1\n"
+ "created by runtime.gcenable\n"
+ " /usr/local/go/src/runtime/mgc.go:216 +0x58\n"
+ },
+ {"one more line, no multiline\n"}
+};
+
+/*
+ * Issue 3817 (case: 1)
+ * --------------------
+ * Source CRI messages (need first CRI multiline parsing) + a custom multiline
+ * parser.
+ *
+ * - https://github.com/fluent/fluent-bit/issues/3817
+ *
+ * The 'case 1' represents the problems of identifying two consecutive multiline
+ * messages within the same stream.
+ */
+struct record_check issue_3817_1_input[] = {
+ {"2021-05-17T17:35:01.184675702Z stdout F [DEBUG] 1 start multiline - "},
+ {"2021-05-17T17:35:01.184747208Z stdout F 1 cont A"},
+ {"2021-05-17T17:35:01.184675702Z stdout F [DEBUG] 2 start multiline - "},
+ {"2021-05-17T17:35:01.184747208Z stdout F 2 cont B"},
+ {"another isolated line"}
+};
+
+struct record_check issue_3817_1_output[] = {
+ {
+ "[DEBUG] 1 start multiline - \n"
+ "1 cont A"
+ },
+
+ {
+ "[DEBUG] 2 start multiline - \n"
+ "2 cont B"
+ },
+
+ {
+ "another isolated line"
+ }
+};
+
+/*
+ * Flush callback is invoked every time a multiline stream has completed a multiline
+ * message or a message is not multiline.
+ */
+static int flush_callback(struct flb_ml_parser *parser,
+ struct flb_ml_stream *mst,
+ void *data, char *buf_data, size_t buf_size)
+{
+ int i;
+ int ret;
+ int len;
+ int found = FLB_FALSE;
+ size_t off = 0;
+ msgpack_unpacked result;
+ msgpack_object *map;
+ msgpack_object key;
+ msgpack_object val;
+ struct flb_time tm;
+ struct expected_result *res = data;
+ struct record_check *exp;
+
+ fprintf(stdout, "\n%s----- MULTILINE FLUSH -----%s\n", ANSI_YELLOW, ANSI_RESET);
+
+ /* Print incoming flush buffer */
+ flb_pack_print(buf_data, buf_size);
+
+ fprintf(stdout, "%s----------- EOF -----------%s\n",
+ ANSI_YELLOW, ANSI_RESET);
+
+ /* Validate content */
+ msgpack_unpacked_init(&result);
+ off = 0;
+ ret = msgpack_unpack_next(&result, buf_data, buf_size, &off);
+ TEST_CHECK(ret == MSGPACK_UNPACK_SUCCESS);
+
+ flb_time_pop_from_msgpack(&tm, &result, &map);
+
+ TEST_CHECK(flb_time_to_nanosec(&tm) != 0L);
+
+ exp = &res->out_records[res->current_record];
+ len = strlen(res->key);
+ for (i = 0; i < map->via.map.size; i++) {
+ key = map->via.map.ptr[i].key;
+ val = map->via.map.ptr[i].val;
+
+ if (key.via.str.size != len) {
+ continue;
+ }
+
+ if (strncmp(key.via.str.ptr, res->key, len) == 0) {
+ found = FLB_TRUE;
+ break;
+ }
+ }
+ TEST_CHECK(found == FLB_TRUE);
+
+ len = strlen(exp->buf);
+ TEST_CHECK(val.via.str.size == len);
+ if (val.via.str.size != len) {
+ printf("expected length: %i, received: %i\n", len, val.via.str.size);
+ printf("== received ==\n");
+ msgpack_object_print(stdout, val);
+ printf("\n\n");
+ printf("== expected ==\n%s\n", exp->buf);
+ exit(1);
+ }
+ TEST_CHECK(memcmp(val.via.str.ptr, exp->buf, len) == 0);
+ res->current_record++;
+
+ msgpack_unpacked_destroy(&result);
+ return 0;
+}
+
+static void test_parser_docker()
+{
+ int i;
+ int len;
+ int ret;
+ int entries;
+ uint64_t stream_id;
+ struct record_check *r;
+ struct flb_config *config;
+ struct flb_time tm;
+ struct flb_ml *ml;
+ struct flb_ml_parser_ins *mlp_i;
+ struct expected_result res = {0};
+
+ /* Expected results context */
+ res.key = "log";
+ res.out_records = docker_output;
+
+ /* Initialize environment */
+ config = flb_config_init();
+
+ /* Create docker multiline mode */
+ ml = flb_ml_create(config, "test-docker");
+ TEST_CHECK(ml != NULL);
+
+ /* Load instances of the parsers for current 'ml' context */
+ mlp_i = flb_ml_parser_instance_create(ml, "cri");
+ TEST_CHECK(mlp_i != NULL);
+
+ /* Generate an instance of multiline docker parser */
+ mlp_i = flb_ml_parser_instance_create(ml, "docker");
+ TEST_CHECK(mlp_i != NULL);
+
+ ret = flb_ml_stream_create(ml, "docker", -1, flush_callback, (void *) &res,
+ &stream_id);
+ TEST_CHECK(ret == 0);
+
+ entries = sizeof(docker_input) / sizeof(struct record_check);
+ for (i = 0; i < entries; i++) {
+ r = &docker_input[i];
+ len = strlen(r->buf);
+
+ flb_time_get(&tm);
+
+ /* Package as msgpack */
+ flb_ml_append_text(ml, stream_id, &tm, r->buf, len);
+ }
+
+ if (ml) {
+ flb_ml_destroy(ml);
+ }
+
+ flb_config_exit(config);
+}
+
+static void test_parser_cri()
+{
+ int i;
+ int len;
+ int ret;
+ int entries;
+ uint64_t stream_id;
+ struct record_check *r;
+ struct flb_config *config;
+ struct flb_time tm;
+ struct flb_ml *ml;
+ struct flb_ml_parser_ins *mlp_i;
+ struct expected_result res = {0};
+
+ /* Expected results context */
+ res.key = "log";
+ res.out_records = cri_output;
+
+ /* Initialize environment */
+ config = flb_config_init();
+
+ /* Create docker multiline mode */
+ ml = flb_ml_create(config, "cri-test");
+ TEST_CHECK(ml != NULL);
+
+ /* Generate an instance of multiline docker parser */
+ mlp_i = flb_ml_parser_instance_create(ml, "docker");
+ TEST_CHECK(mlp_i != NULL);
+
+ /* Load instances of the parsers for current 'ml' context */
+ mlp_i = flb_ml_parser_instance_create(ml, "cri");
+ TEST_CHECK(mlp_i != NULL);
+
+ ret = flb_ml_stream_create(ml, "cri", -1, flush_callback, (void *) &res,
+ &stream_id);
+ TEST_CHECK(ret == 0);
+
+ entries = sizeof(cri_input) / sizeof(struct record_check);
+ for (i = 0; i < entries; i++) {
+ r = &cri_input[i];
+ len = strlen(r->buf);
+ flb_time_get(&tm);
+
+ /* Package as msgpack */
+ flb_ml_append_text(ml, stream_id, &tm, r->buf, len);
+ }
+
+ if (ml) {
+ flb_ml_destroy(ml);
+ }
+
+ flb_config_exit(config);
+}
+
+static void test_container_mix()
+{
+ int i;
+ int len;
+ int ret;
+ int entries;
+ uint64_t stream_id;
+ struct record_check *r;
+ struct flb_config *config;
+ struct flb_time tm;
+ struct flb_ml *ml;
+ struct flb_ml_parser_ins *mlp_i;
+ struct expected_result res = {0};
+
+ /* Expected results context */
+ res.key = "log";
+ res.out_records = container_mix_output;
+
+ /* Initialize environment */
+ config = flb_config_init();
+
+ /* Create docker multiline mode */
+ ml = flb_ml_create(config, "container-mix-test");
+ TEST_CHECK(ml != NULL);
+
+ /* Generate an instance of multiline docker parser */
+ mlp_i = flb_ml_parser_instance_create(ml, "docker");
+ TEST_CHECK(mlp_i != NULL);
+
+ /* Load instances of the parsers for current 'ml' context */
+ mlp_i = flb_ml_parser_instance_create(ml, "cri");
+ TEST_CHECK(mlp_i != NULL);
+
+ ret = flb_ml_stream_create(ml, "container-mix", -1, flush_callback, (void *) &res,
+ &stream_id);
+ TEST_CHECK(ret == 0);
+
+ entries = sizeof(container_mix_input) / sizeof(struct record_check);
+ for (i = 0; i < entries; i++) {
+ r = &container_mix_input[i];
+ len = strlen(r->buf);
+ flb_time_get(&tm);
+
+ /* Package as msgpack */
+ flb_ml_append_text(ml, stream_id, &tm, r->buf, len);
+ }
+
+ if (ml) {
+ flb_ml_destroy(ml);
+ }
+
+ flb_config_exit(config);
+}
+
+static void test_parser_java()
+{
+ int i;
+ int len;
+ int ret;
+ int entries;
+ uint64_t stream_id;
+ struct record_check *r;
+ struct flb_config *config;
+ struct flb_time tm;
+ struct flb_ml *ml;
+ struct flb_ml_parser_ins *mlp_i;
+ struct expected_result res = {0};
+ msgpack_packer mp_pck;
+ msgpack_sbuffer mp_sbuf;
+
+ /* Expected results context */
+ res.key = "log";
+ res.out_records = java_output;
+
+ /* Initialize environment */
+ config = flb_config_init();
+
+ /* Create docker multiline mode */
+ ml = flb_ml_create(config, "java-test");
+ TEST_CHECK(ml != NULL);
+
+ /* Generate an instance of multiline java parser */
+ mlp_i = flb_ml_parser_instance_create(ml, "java");
+ TEST_CHECK(mlp_i != NULL);
+
+ flb_ml_parser_instance_set(mlp_i, "key_content", "log");
+
+ ret = flb_ml_stream_create(ml, "java", -1, flush_callback, (void *) &res,
+ &stream_id);
+ TEST_CHECK(ret == 0);
+
+ /* initialize buffers */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ size_t off = 0;
+ msgpack_unpacked result;
+ msgpack_object root;
+ msgpack_object *map;
+
+ entries = sizeof(java_input) / sizeof(struct record_check);
+ for (i = 0; i < entries; i++) {
+ r = &java_input[i];
+ len = strlen(r->buf);
+
+ /* Package as msgpack */
+ flb_time_get(&tm);
+
+ /* initialize buffers */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ msgpack_pack_array(&mp_pck, 2);
+ flb_time_append_to_msgpack(&tm, &mp_pck, 0);
+
+ msgpack_pack_map(&mp_pck, 1);
+ msgpack_pack_str(&mp_pck, 3);
+ msgpack_pack_str_body(&mp_pck, "log", 3);
+ msgpack_pack_str(&mp_pck, len);
+ msgpack_pack_str_body(&mp_pck, r->buf, len);
+
+ /* Unpack and lookup the content map */
+ msgpack_unpacked_init(&result);
+ off = 0;
+ ret = msgpack_unpack_next(&result, mp_sbuf.data, mp_sbuf.size, &off);
+
+ flb_pack_print(mp_sbuf.data, mp_sbuf.size);
+
+ root = result.data;
+ map = &root.via.array.ptr[1];
+
+ /* Package as msgpack */
+ ret = flb_ml_append_object(ml, stream_id, &tm, NULL, map);
+
+ msgpack_unpacked_destroy(&result);
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ }
+
+ if (ml) {
+ flb_ml_destroy(ml);
+ }
+
+ flb_config_exit(config);
+}
+
+static void test_parser_python()
+{
+ int i;
+ int len;
+ int ret;
+ int entries;
+ uint64_t stream_id;
+ msgpack_packer mp_pck;
+ msgpack_sbuffer mp_sbuf;
+ struct record_check *r;
+ struct flb_config *config;
+ struct flb_time tm;
+ struct flb_ml *ml;
+ struct flb_ml_parser_ins *mlp_i;
+ struct expected_result res = {0};
+
+ /* Expected results context */
+ res.key = "log";
+ res.out_records = python_output;
+
+ /* initialize buffers */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ /* Initialize environment */
+ config = flb_config_init();
+
+ /* Create docker multiline mode */
+ ml = flb_ml_create(config, "python-test");
+ TEST_CHECK(ml != NULL);
+
+ /* Generate an instance of multiline python parser */
+ mlp_i = flb_ml_parser_instance_create(ml, "python");
+ TEST_CHECK(mlp_i != NULL);
+
+ ret = flb_ml_stream_create(ml, "python", -1, flush_callback, (void *) &res,
+ &stream_id);
+ TEST_CHECK(ret == 0);
+
+ flb_time_get(&tm);
+
+ printf("\n");
+ entries = sizeof(python_input) / sizeof(struct record_check);
+ for (i = 0; i < entries; i++) {
+ r = &python_input[i];
+ len = strlen(r->buf);
+
+ /* Package as msgpack */
+ flb_time_get(&tm);
+ flb_ml_append_text(ml, stream_id, &tm, r->buf, len);
+ }
+
+ if (ml) {
+ flb_ml_destroy(ml);
+ }
+
+ flb_config_exit(config);
+}
+
+static void test_parser_ruby()
+{
+ int i;
+ int len;
+ int ret;
+ int entries;
+ uint64_t stream_id;
+ msgpack_packer mp_pck;
+ msgpack_sbuffer mp_sbuf;
+ struct record_check *r;
+ struct flb_config *config;
+ struct flb_time tm;
+ struct flb_ml *ml;
+ struct flb_ml_parser_ins *mlp_i;
+ struct expected_result res = {0};
+
+ /* Expected results context */
+ res.key = "log";
+ res.out_records = ruby_output;
+
+ /* initialize buffers */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ /* Initialize environment */
+ config = flb_config_init();
+
+ /* Create docker multiline mode */
+ ml = flb_ml_create(config, "ruby-test");
+ TEST_CHECK(ml != NULL);
+
+ /* Generate an instance of multiline ruby parser */
+ mlp_i = flb_ml_parser_instance_create(ml, "ruby");
+ TEST_CHECK(mlp_i != NULL);
+
+ ret = flb_ml_stream_create(ml, "ruby", -1, flush_callback, (void *) &res,
+ &stream_id);
+ TEST_CHECK(ret == 0);
+
+ flb_time_get(&tm);
+
+ printf("\n");
+ entries = sizeof(ruby_input) / sizeof(struct record_check);
+ for (i = 0; i < entries; i++) {
+ r = &ruby_input[i];
+ len = strlen(r->buf);
+
+ /* Package as msgpack */
+ flb_time_get(&tm);
+ flb_ml_append_text(ml, stream_id, &tm, r->buf, len);
+ }
+
+ if (ml) {
+ flb_ml_destroy(ml);
+ }
+
+ flb_config_exit(config);
+}
+
+static void test_issue_4949()
+{
+ int i;
+ int len;
+ int ret;
+ int entries;
+ uint64_t stream_id;
+ msgpack_packer mp_pck;
+ msgpack_sbuffer mp_sbuf;
+ struct record_check *r;
+ struct flb_config *config;
+ struct flb_time tm;
+ struct flb_ml *ml;
+ struct flb_ml_parser_ins *mlp_i;
+ struct expected_result res = {0};
+
+ /* Expected results context */
+ res.key = "log";
+ res.out_records = python_output;
+
+ /* initialize buffers */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ /* Initialize environment */
+ config = flb_config_init();
+
+ /* Create docker multiline mode */
+ ml = flb_ml_create(config, "python-test");
+ TEST_CHECK(ml != NULL);
+
+ /* Generate an instance of multiline python parser */
+ mlp_i = flb_ml_parser_instance_create(ml, "python");
+ TEST_CHECK(mlp_i != NULL);
+
+ ret = flb_ml_stream_create(ml, "python", -1, flush_callback, (void *) &res,
+ &stream_id);
+ TEST_CHECK(ret == 0);
+
+ /* Generate an instance of multiline java parser */
+ mlp_i = flb_ml_parser_instance_create(ml, "java");
+ TEST_CHECK(mlp_i != NULL);
+
+ ret = flb_ml_stream_create(ml, "java", -1, flush_callback, (void *) &res,
+ &stream_id);
+ TEST_CHECK(ret == 0);
+
+ flb_time_get(&tm);
+
+ printf("\n");
+ entries = sizeof(python_input) / sizeof(struct record_check);
+ for (i = 0; i < entries; i++) {
+ r = &python_input[i];
+ len = strlen(r->buf);
+
+ /* Package as msgpack */
+ flb_time_get(&tm);
+ flb_ml_append_text(ml, stream_id, &tm, r->buf, len);
+ }
+
+ if (ml) {
+ flb_ml_destroy(ml);
+ }
+
+ flb_config_exit(config);
+}
+
+static void test_parser_elastic()
+{
+ int i;
+ int len;
+ int ret;
+ int entries;
+ size_t off = 0;
+ uint64_t stream_id;
+ msgpack_packer mp_pck;
+ msgpack_sbuffer mp_sbuf;
+ msgpack_unpacked result;
+ msgpack_object root;
+ msgpack_object *map;
+ struct record_check *r;
+ struct flb_config *config;
+ struct flb_time tm;
+ struct flb_ml *ml;
+ struct flb_ml_parser *mlp;
+ struct flb_ml_parser_ins *mlp_i;
+ struct expected_result res = {0};
+
+ /* Expected results context */
+ res.key = "log";
+ res.out_records = elastic_output;
+
+ /* Initialize environment */
+ config = flb_config_init();
+
+ ml = flb_ml_create(config, "test-elastic");
+ TEST_CHECK(ml != NULL);
+
+ mlp = flb_ml_parser_create(config,
+ "elastic", /* name */
+ FLB_ML_REGEX, /* type */
+ NULL, /* match_str */
+ FLB_FALSE, /* negate */
+ 1000, /* flush_ms */
+ "log", /* key_content */
+ NULL, /* key_pattern */
+ NULL, /* key_group */
+ NULL, /* parser ctx */
+ NULL); /* parser name */
+ TEST_CHECK(mlp != NULL);
+
+ mlp_i = flb_ml_parser_instance_create(ml, "elastic");
+ TEST_CHECK(mlp_i != NULL);
+
+ ret = flb_ml_rule_create(mlp, "start_state", "/^\\[/", "elastic_cont", NULL);
+ if (ret != 0) {
+ fprintf(stderr, "error creating rule 1");
+ }
+
+ ret = flb_ml_rule_create(mlp, "elastic_cont", "/^\\s+/", "elastic_cont", NULL);
+ if (ret != 0) {
+ fprintf(stderr, "error creating rule 2");
+ }
+
+ ret = flb_ml_stream_create(ml, "elastic", -1, flush_callback, (void *) &res,
+ &stream_id);
+ TEST_CHECK(ret == 0);
+
+ ret = flb_ml_parser_init(mlp);
+ if (ret != 0) {
+ fprintf(stderr, "error initializing multiline\n");
+ flb_ml_destroy(ml);
+ return;
+ }
+
+ printf("\n");
+ entries = sizeof(elastic_input) / sizeof(struct record_check);
+ for (i = 0; i < entries; i++) {
+ r = &elastic_input[i];
+ len = strlen(r->buf);
+
+ /* initialize buffers */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ /* Package raw text as a msgpack record */
+ msgpack_pack_array(&mp_pck, 2);
+
+ flb_time_get(&tm);
+ flb_time_append_to_msgpack(&tm, &mp_pck, 0);
+
+ msgpack_pack_map(&mp_pck, 1);
+ msgpack_pack_str(&mp_pck, 3);
+ msgpack_pack_str_body(&mp_pck, "log", 3);
+ msgpack_pack_str(&mp_pck, len);
+ msgpack_pack_str_body(&mp_pck, r->buf, len);
+
+ /* Unpack and lookup the content map */
+ msgpack_unpacked_init(&result);
+ off = 0;
+ ret = msgpack_unpack_next(&result, mp_sbuf.data, mp_sbuf.size, &off);
+ TEST_CHECK(ret == MSGPACK_UNPACK_SUCCESS);
+
+ root = result.data;
+ map = &root.via.array.ptr[1];
+
+ /* Package as msgpack */
+ ret = flb_ml_append_object(ml, stream_id, &tm, NULL, map);
+
+ msgpack_unpacked_destroy(&result);
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ }
+
+ if (ml) {
+ flb_ml_destroy(ml);
+ }
+
+ flb_config_exit(config);
+}
+
+static void test_endswith()
+{
+ int i;
+ int len;
+ int ret;
+ int entries;
+ uint64_t stream_id = 0;
+ struct record_check *r;
+ struct flb_config *config;
+ struct flb_time tm;
+ struct flb_ml *ml;
+ struct flb_ml_parser *mlp;
+ struct flb_ml_parser_ins *mlp_i;
+ struct expected_result res = {0};
+
+ /* Expected results context */
+ res.key = "log";
+ res.out_records = endswith_output;
+
+ /* Initialize environment */
+ config = flb_config_init();
+
+ /* Create docker multiline mode */
+ ml = flb_ml_create(config, "raw-endswith");
+ TEST_CHECK(ml != NULL);
+
+ mlp = flb_ml_parser_create(config,
+ "endswith", /* name */
+ FLB_ML_ENDSWITH, /* type */
+ "\\", /* match_str */
+ FLB_TRUE, /* negate */
+ 1000, /* flush_ms */
+ NULL, /* key_content */
+ NULL, /* key_pattern */
+ NULL, /* key_group */
+ NULL, /* parser ctx */
+ NULL); /* parser name */
+ TEST_CHECK(mlp != NULL);
+
+ /* Generate an instance of 'endswith' custom parser parser */
+ mlp_i = flb_ml_parser_instance_create(ml, "endswith");
+ TEST_CHECK(mlp_i != NULL);
+
+ ret = flb_ml_stream_create(ml, "test", -1, flush_callback, (void *) &res, &stream_id);
+ TEST_CHECK(ret == 0);
+
+ entries = sizeof(endswith_input) / sizeof(struct record_check);
+ for (i = 0; i < entries; i++) {
+ r = &endswith_input[i];
+ len = strlen(r->buf);
+
+ /* Package as msgpack */
+ flb_time_get(&tm);
+ flb_ml_append_text(ml, stream_id, &tm, r->buf, len);
+ }
+
+ if (ml) {
+ flb_ml_destroy(ml);
+ }
+
+ flb_config_exit(config);
+}
+
+static void test_parser_go()
+{
+ int i;
+ int len;
+ int ret;
+ int entries;
+ uint64_t stream_id = 0;
+ struct record_check *r;
+ struct flb_config *config;
+ struct flb_time tm;
+ struct flb_ml *ml;
+ struct flb_ml_parser_ins *mlp_i;
+ struct expected_result res = {0};
+
+ /* Expected results context */
+ res.key = "log";
+ res.out_records = go_output;
+
+ /* Initialize environment */
+ config = flb_config_init();
+
+ /* Create docker multiline mode */
+ ml = flb_ml_create(config, "go-test");
+ TEST_CHECK(ml != NULL);
+
+ /* Generate an instance of multiline java parser */
+ mlp_i = flb_ml_parser_instance_create(ml, "go");
+ TEST_CHECK(mlp_i != NULL);
+
+ ret = flb_ml_stream_create(ml, "go", -1, flush_callback, (void *) &res, &stream_id);
+ TEST_CHECK(ret == 0);
+
+ entries = sizeof(go_input) / sizeof(struct record_check);
+ for (i = 0; i < entries; i++) {
+ r = &go_input[i];
+ len = strlen(r->buf);
+
+ /* Package as msgpack */
+ flb_time_get(&tm);
+ flb_ml_append_text(ml, stream_id, &tm, r->buf, len);
+ }
+
+ if (ml) {
+ flb_ml_destroy(ml);
+ }
+
+ flb_config_exit(config);
+}
+
+static int flush_callback_to_buf(struct flb_ml_parser *parser,
+ struct flb_ml_stream *mst,
+ void *data, char *buf_data, size_t buf_size)
+{
+ msgpack_sbuffer *mp_sbuf = data;
+ msgpack_sbuffer_write(mp_sbuf, buf_data, buf_size);
+
+ return 0;
+}
+
+static void run_test(struct flb_config *config, char *test_name,
+ struct record_check *in, int in_len,
+ struct record_check *out, int out_len,
+ char *parser1, char *parser2)
+
+{
+ int i;
+ int ret;
+ int len;
+ size_t off = 0;
+ uint64_t stream1 = 0;
+ uint64_t stream2 = 0;
+ struct flb_ml *ml;
+ struct flb_ml_parser_ins *p1 = NULL;
+ struct record_check *r;
+ msgpack_sbuffer mp_sbuf1;
+ msgpack_packer mp_pck1;
+ msgpack_sbuffer mp_sbuf2;
+ msgpack_packer mp_pck2;
+ msgpack_object *map;
+ struct flb_time tm;
+ struct expected_result res = {0};
+ msgpack_unpacked result;
+
+ /* init buffers */
+ msgpack_sbuffer_init(&mp_sbuf1);
+ msgpack_packer_init(&mp_pck1, &mp_sbuf1, msgpack_sbuffer_write);
+ msgpack_sbuffer_init(&mp_sbuf2);
+ msgpack_packer_init(&mp_pck2, &mp_sbuf2, msgpack_sbuffer_write);
+
+ /* Create docker multiline mode */
+ ml = flb_ml_create(config, test_name);
+ TEST_CHECK(ml != NULL);
+
+ if (!parser1) {
+ fprintf(stderr, "run_test(): parser1 is NULL\n");
+ exit(1);
+ }
+
+ /* Parser 1 */
+ p1 = flb_ml_parser_instance_create(ml, parser1);
+ TEST_CHECK(p1 != NULL);
+
+
+ /* Stream 1: use parser name (test_name) to generate the stream id */
+ ret = flb_ml_stream_create(ml, test_name, -1,
+ flush_callback_to_buf,
+ (void *) &mp_sbuf1, &stream1);
+ TEST_CHECK(ret == 0);
+
+ /* Ingest input records into parser 1 */
+ for (i = 0; i < in_len; i++) {
+ r = &in[i];
+ len = strlen(r->buf);
+
+ flb_time_get(&tm);
+
+ /* Package as msgpack */
+ flb_ml_append_text(ml, stream1, &tm, r->buf, len);
+ }
+
+ flb_ml_destroy(ml);
+ ml = flb_ml_create(config, test_name);
+
+ flb_ml_parser_instance_create(ml, parser2);
+
+ /*
+ * After flb_ml_append above(), mp_sbuf1 has been populated with the
+ * output results as structured messages. Now this data needs to be
+ * passed to the next parser.
+ */
+
+ /* Expected results context */
+ res.key = "log";
+ res.out_records = out;
+
+ /* Stream 2 */
+ ret = flb_ml_stream_create(ml, "filter_multiline", -1,
+ flush_callback,
+ (void *) &res, &stream2);
+
+ /* Ingest input records into parser 2 */
+ off = 0;
+ msgpack_unpacked_init(&result);
+ while (msgpack_unpack_next(&result, mp_sbuf1.data, mp_sbuf1.size, &off)) {
+ flb_time_pop_from_msgpack(&tm, &result, &map);
+
+ /* Package as msgpack */
+ ret = flb_ml_append_object(ml, stream2, &tm, NULL, map);
+ }
+ flb_ml_flush_pending_now(ml);
+
+ msgpack_unpacked_destroy(&result);
+ msgpack_sbuffer_destroy(&mp_sbuf1);
+ flb_ml_destroy(ml);
+}
+
+void test_issue_3817_1()
+{
+ int ret;
+ int in_len = sizeof(issue_3817_1_input) / sizeof(struct record_check);
+ int out_len = sizeof(issue_3817_1_output) / sizeof(struct record_check);
+ struct flb_config *config;
+ struct flb_ml_parser *mlp;
+
+ /*
+ * Parser definition for a file:
+ *
+ * [MULTILINE_PARSER]
+ * name parser_3817
+ * type regex
+ * key_content log
+ * #
+ * # Regex rules for multiline parsing
+ * # ---------------------------------
+ * #
+ * # rules | state name | regex pattern | next state
+ * # ------|---------------|------------------------------------
+ * rule "start_state" "/- $/" "cont"
+ * rule "cont" "/^([1-9].*$/" "cont"
+ *
+ */
+
+ /* Initialize environment */
+ config = flb_config_init();
+
+ /* Register custom parser */
+ mlp = flb_ml_parser_create(config,
+ "parser_3817", /* name */
+ FLB_ML_REGEX, /* type */
+ NULL, /* match_str */
+ FLB_FALSE, /* negate */
+ 1000, /* flush_ms */
+ "log", /* key_content */
+ NULL, /* key_pattern */
+ NULL, /* key_group */
+ NULL, /* parser ctx */
+ NULL); /* parser name */
+ TEST_CHECK(mlp != NULL);
+
+ /* rule: start_state */
+ ret = flb_ml_rule_create(mlp, "start_state", "/- $/", "cont", NULL);
+ if (ret != 0) {
+ fprintf(stderr, "error creating rule 1");
+ }
+
+ /* rule: cont */
+ ret = flb_ml_rule_create(mlp, "cont", "/^([1-9]).*$/", "cont", NULL);
+ if (ret != 0) {
+ fprintf(stderr, "error creating rule 2");
+ }
+
+ /* initiaze the parser configuration */
+ ret = flb_ml_parser_init(mlp);
+ TEST_CHECK(ret == 0);
+
+ /* Run the test */
+ run_test(config, "issue_3817_1",
+ issue_3817_1_input, in_len,
+ issue_3817_1_output, out_len,
+ "cri", "parser_3817");
+
+ flb_config_exit(config);
+}
+
+static void test_issue_4034()
+{
+ int i;
+ int len;
+ int ret;
+ int entries;
+ uint64_t stream_id;
+ struct record_check *r;
+ struct flb_config *config;
+ struct flb_time tm;
+ struct flb_ml *ml;
+ struct flb_ml_parser_ins *mlp_i;
+ struct expected_result res = {0};
+ msgpack_packer mp_pck;
+ msgpack_sbuffer mp_sbuf;
+
+ /* Expected results context */
+ res.key = "log";
+ res.out_records = cri_output;
+
+ /* Initialize environment */
+ config = flb_config_init();
+
+ /* Create cri multiline mode */
+ ml = flb_ml_create(config, "cri-test");
+ TEST_CHECK(ml != NULL);
+
+ /* Generate an instance of multiline cri parser */
+ mlp_i = flb_ml_parser_instance_create(ml, "cri");
+ TEST_CHECK(mlp_i != NULL);
+
+ flb_ml_parser_instance_set(mlp_i, "key_content", "log");
+
+ ret = flb_ml_stream_create(ml, "cri", -1, flush_callback, (void *) &res,
+ &stream_id);
+ TEST_CHECK(ret == 0);
+
+ /* initialize buffers */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ size_t off = 0;
+ msgpack_unpacked result;
+ msgpack_object root;
+ msgpack_object *map;
+
+ entries = sizeof(cri_input) / sizeof(struct record_check);
+ for (i = 0; i < entries; i++) {
+ r = &cri_input[i];
+ len = strlen(r->buf);
+
+ /* Package as msgpack */
+ flb_time_get(&tm);
+
+ /* initialize buffers */
+ msgpack_sbuffer_init(&mp_sbuf);
+ msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+ msgpack_pack_array(&mp_pck, 2);
+ flb_time_append_to_msgpack(&tm, &mp_pck, 0);
+
+ msgpack_pack_map(&mp_pck, 1);
+ msgpack_pack_str(&mp_pck, 3);
+ msgpack_pack_str_body(&mp_pck, "log", 3);
+ msgpack_pack_str(&mp_pck, len);
+ msgpack_pack_str_body(&mp_pck, r->buf, len);
+
+ /* Unpack and lookup the content map */
+ msgpack_unpacked_init(&result);
+ off = 0;
+ ret = msgpack_unpack_next(&result, mp_sbuf.data, mp_sbuf.size, &off);
+
+ flb_pack_print(mp_sbuf.data, mp_sbuf.size);
+
+ root = result.data;
+ map = &root.via.array.ptr[1];
+
+ /* Package as msgpack */
+ ret = flb_ml_append_object(ml, stream_id, &tm, NULL, map);
+
+ msgpack_unpacked_destroy(&result);
+ msgpack_sbuffer_destroy(&mp_sbuf);
+ }
+ flb_ml_flush_pending_now(ml);
+
+ if (ml) {
+ flb_ml_destroy(ml);
+ }
+
+ flb_config_exit(config);
+}
+
+static void test_issue_5504()
+{
+ uint64_t last_flush;
+ struct flb_config *config;
+ struct flb_ml *ml;
+ struct flb_ml_parser_ins *mlp_i;
+ struct mk_event_loop *evl;
+ struct flb_sched *sched;
+ struct mk_list *tmp;
+ struct mk_list *head;
+ struct flb_sched_timer *timer;
+ void (*cb)(struct flb_config *, void *);
+ int timeout = 500;
+
+#ifdef _WIN32
+ WSADATA wsa_data;
+ WSAStartup(0x0201, &wsa_data);
+#endif
+
+ /* Initialize environment */
+ config = flb_config_init();
+
+ /* Create the event loop */
+ evl = config->evl;
+ config->evl = mk_event_loop_create(32);
+ TEST_CHECK(config->evl != NULL);
+
+ /* Initialize the scheduler */
+ sched = config->sched;
+ config->sched = flb_sched_create(config, config->evl);
+ TEST_CHECK(config->sched != NULL);
+
+ /* Set the thread local scheduler */
+ flb_sched_ctx_init();
+ flb_sched_ctx_set(config->sched);
+
+ ml = flb_ml_create(config, "5504-test");
+ TEST_CHECK(ml != NULL);
+
+ /* Generate an instance of any multiline parser */
+ mlp_i = flb_ml_parser_instance_create(ml, "cri");
+ TEST_CHECK(mlp_i != NULL);
+
+ flb_ml_parser_instance_set(mlp_i, "key_content", "log");
+
+ /* Set the flush timeout */
+ ml->flush_ms = timeout;
+
+ /* Initialize the auto flush */
+ flb_ml_auto_flush_init(ml);
+
+ /* Store the initial last_flush time */
+ last_flush = ml->last_flush;
+
+ /* Find the cb_ml_flush_timer callback from the timers */
+ mk_list_foreach_safe(head, tmp, &((struct flb_sched *)config->sched)->timers) {
+ timer = mk_list_entry(head, struct flb_sched_timer, _head);
+ if (timer->type == FLB_SCHED_TIMER_CB_PERM) {
+ cb = timer->cb;
+ }
+ }
+ TEST_CHECK(cb != NULL);
+
+ /* Trigger the callback without delay */
+ cb(config, ml);
+ /* This should not update the last_flush since it is before the timeout */
+ TEST_CHECK(ml->last_flush == last_flush);
+
+ /* Sleep just enough time to pass the timeout */
+ flb_time_msleep(timeout + 1);
+
+ /* Retrigger the callback */
+ cb(config, ml);
+ /* Ensure this time the last_flush has been updated */
+ TEST_CHECK(ml->last_flush > last_flush);
+
+ /* Cleanup */
+ flb_sched_destroy(config->sched);
+ config->sched = sched;
+ mk_event_loop_destroy(config->evl);
+ config->evl = evl;
+ flb_ml_destroy(ml);
+ flb_config_exit(config);
+
+#ifdef _WIN32
+ WSACleanup();
+#endif
+}
+
+TEST_LIST = {
+ /* Normal features tests */
+ { "parser_docker", test_parser_docker},
+ { "parser_cri", test_parser_cri},
+ { "parser_java", test_parser_java},
+ { "parser_python", test_parser_python},
+ { "parser_ruby", test_parser_ruby},
+ { "parser_elastic", test_parser_elastic},
+ { "parser_go", test_parser_go},
+ { "container_mix", test_container_mix},
+ { "endswith", test_endswith},
+
+ /* Issues reported on Github */
+ { "issue_3817_1" , test_issue_3817_1},
+ { "issue_4034" , test_issue_4034},
+ { "issue_4949" , test_issue_4949},
+ { "issue_5504" , test_issue_5504},
+ { 0 }
+};