summaryrefslogtreecommitdiffstats
path: root/fluent-bit/tests/internal/multiline.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/tests/internal/multiline.c')
-rw-r--r--fluent-bit/tests/internal/multiline.c1478
1 files changed, 0 insertions, 1478 deletions
diff --git a/fluent-bit/tests/internal/multiline.c b/fluent-bit/tests/internal/multiline.c
deleted file mode 100644
index e175e117..00000000
--- a/fluent-bit/tests/internal/multiline.c
+++ /dev/null
@@ -1,1478 +0,0 @@
-/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
-
-#include <fluent-bit/flb_info.h>
-#include <fluent-bit/flb_pack.h>
-#include <fluent-bit/flb_parser.h>
-#include <fluent-bit/flb_mem.h>
-#include <fluent-bit/multiline/flb_ml.h>
-#include <fluent-bit/multiline/flb_ml_rule.h>
-#include <fluent-bit/multiline/flb_ml_parser.h>
-
-#include "flb_tests_internal.h"
-
-struct record_check {
- char *buf;
-};
-
-struct expected_result {
- int current_record;
- char *key;
- struct record_check *out_records;
-};
-
-/* Docker */
-struct record_check docker_input[] = {
- {"{\"log\": \"aa\\n\", \"stream\": \"stdout\", \"time\": \"2021-02-01T16:45:03.01231z\"}"},
- {"{\"log\": \"aa\\n\", \"stream\": \"stderr\", \"time\": \"2021-02-01T16:45:03.01231z\"}"},
- {"{\"log\": \"bb\", \"stream\": \"stdout\", \"time\": \"2021-02-01T16:45:03.01232z\"}"},
- {"{\"log\": \"cc\n\", \"stream\": \"stdout\", \"time\": \"2021-02-01T16:45:03.01233z\"}"},
- {"{\"log\": \"dd\", \"stream\": \"stderr\", \"time\": \"2021-02-01T16:45:03.01233z\"}"},
- {"single line to force pending flush of the previous line"},
- {"{\"log\": \"ee\\n\", \"stream\": \"stderr\", \"time\": \"2021-02-01T16:45:03.01234z\"}"},
-};
-
-struct record_check docker_output[] = {
- {"aa\n"},
- {"aa\n"},
- {"bbcc\n"},
- {"dd"},
- {"single line to force pending flush of the previous line"},
- {"ee\n"},
-};
-
-/* CRI */
-struct record_check cri_input[] = {
- {"2019-05-07T18:57:50.904275087+00:00 stdout P 1a. some "},
- {"2019-05-07T18:57:51.904275088+00:00 stdout P multiline "},
- {"2019-05-07T18:57:52.904275089+00:00 stdout F log"},
- {"2019-05-07T18:57:50.904275087+00:00 stderr P 1b. some "},
- {"2019-05-07T18:57:51.904275088+00:00 stderr P multiline "},
- {"2019-05-07T18:57:52.904275089+00:00 stderr F log"},
- {"2019-05-07T18:57:53.904275090+00:00 stdout P 2a. another "},
- {"2019-05-07T18:57:54.904275091+00:00 stdout P multiline "},
- {"2019-05-07T18:57:55.904275092+00:00 stdout F log"},
- {"2019-05-07T18:57:53.904275090+00:00 stderr P 2b. another "},
- {"2019-05-07T18:57:54.904275091+00:00 stderr P multiline "},
- {"2019-05-07T18:57:55.904275092+00:00 stderr F log"},
- {"2019-05-07T18:57:56.904275093+00:00 stdout F 3a. non multiline 1"},
- {"2019-05-07T18:57:57.904275094+00:00 stdout F 4a. non multiline 2"},
- {"2019-05-07T18:57:56.904275093+00:00 stderr F 3b. non multiline 1"},
- {"2019-05-07T18:57:57.904275094+00:00 stderr F 4b. non multiline 2"}
-};
-
-struct record_check cri_output[] = {
- {"1a. some multiline log"},
- {"1b. some multiline log"},
- {"2a. another multiline log"},
- {"2b. another multiline log"},
- {"3a. non multiline 1"},
- {"4a. non multiline 2"},
- {"3b. non multiline 1"},
- {"4b. non multiline 2"}
-};
-
-/* ENDSWITH */
-struct record_check endswith_input[] = {
- {"1a. some multiline log \\"},
- {"1b. some multiline log"},
- {"2a. another multiline log\\"},
- {"2b. another multiline log"},
- {"3a. non multiline 1"},
- {"4a. non multiline 2"}
-};
-
-struct record_check endswith_output[] = {
- {"1a. some multiline log \\\n1b. some multiline log\n"},
- {"2a. another multiline log\\\n2b. another multiline log\n"},
- {"3a. non multiline 1\n"},
- {"4a. non multiline 2\n"}
-};
-
-/* Mixed lines of Docker and CRI logs in different streams (stdout/stderr) */
-struct record_check container_mix_input[] = {
- {"{\"log\": \"a1\\n\", \"stream\": \"stdout\", \"time\": \"2021-02-01T16:45:03.01231z\"}"},
- {"{\"log\": \"a2\\n\", \"stream\": \"stderr\", \"time\": \"2021-02-01T16:45:03.01231z\"}"},
- {"{\"log\": \"bb\", \"stream\": \"stdout\", \"time\": \"2021-02-01T16:45:03.01232z\"}"},
- {"{\"log\": \"cc\", \"stream\": \"stdout\", \"time\": \"2021-02-01T16:45:03.01233z\"}"},
- {"{\"log\": \"dd\", \"stream\": \"stderr\", \"time\": \"2021-02-01T16:45:03.01232z\"}"},
- {"{\"log\": \"ee\n\", \"stream\": \"stderr\", \"time\": \"2021-02-01T16:45:03.01233z\"}"},
- {"2019-05-07T18:57:52.904275089+00:00 stdout F single full"},
- {"2019-05-07T18:57:50.904275087+00:00 stdout P 1a. some "},
- {"2019-05-07T18:57:51.904275088+00:00 stdout P multiline "},
- {"2019-05-07T18:57:52.904275089+00:00 stdout F log"},
- {"2019-05-07T18:57:50.904275087+00:00 stderr P 1b. some "},
- {"2019-05-07T18:57:51.904275088+00:00 stderr P multiline "},
- {"2019-05-07T18:57:52.904275089+00:00 stderr F log"},
- {"{\"log\": \"dd-out\\n\", \"stream\": \"stdout\", \"time\": \"2021-02-01T16:45:03.01234z\"}"},
- {"{\"log\": \"dd-err\\n\", \"stream\": \"stderr\", \"time\": \"2021-02-01T16:45:03.01234z\"}"},
-};
-
-struct record_check container_mix_output[] = {
- {"a1\n"},
- {"a2\n"},
- {"ddee\n"},
- {"bbcc"},
- {"single full"},
- {"1a. some multiline log"},
- {"1b. some multiline log"},
- {"dd-out\n"},
- {"dd-err\n"},
-};
-
-/* Java stacktrace detection */
-struct record_check java_input[] = {
- {"Exception in thread \"main\" java.lang.IllegalStateException: ..null property\n"},
- {" at com.example.myproject.Author.getBookIds(xx.java:38)\n"},
- {" at com.example.myproject.Bootstrap.main(Bootstrap.java:14)\n"},
- {"Caused by: java.lang.NullPointerException\n"},
- {" at com.example.myproject.Book.getId(Book.java:22)\n"},
- {" at com.example.myproject.Author.getBookIds(Author.java:35)\n"},
- {" ... 1 more\n"},
- {"single line\n"}
-};
-
-struct record_check java_output[] = {
- {
- "Exception in thread \"main\" java.lang.IllegalStateException: ..null property\n"
- " at com.example.myproject.Author.getBookIds(xx.java:38)\n"
- " at com.example.myproject.Bootstrap.main(Bootstrap.java:14)\n"
- "Caused by: java.lang.NullPointerException\n"
- " at com.example.myproject.Book.getId(Book.java:22)\n"
- " at com.example.myproject.Author.getBookIds(Author.java:35)\n"
- " ... 1 more\n"
- },
- {
- "single line\n"
- }
-};
-
-struct record_check ruby_input[] = {
- {"/app/config/routes.rb:6:in `/': divided by 0 (ZeroDivisionError)"},
- {" from /app/config/routes.rb:6:in `block in <main>'"},
- {" from /var/lib/gems/3.0.0/gems/actionpack-7.0.4/lib/action_dispatch/routing/route_set.rb:428:in `instance_exec'"},
- {" from /var/lib/gems/3.0.0/gems/actionpack-7.0.4/lib/action_dispatch/routing/route_set.rb:428:in `eval_block'"},
- {" from /var/lib/gems/3.0.0/gems/actionpack-7.0.4/lib/action_dispatch/routing/route_set.rb:410:in `draw'"},
- {" from /app/config/routes.rb:1:in `<main>'"},
- {"hello world, not multiline\n"}
-};
-
-struct record_check ruby_output[] = {
- {
- "/app/config/routes.rb:6:in `/': divided by 0 (ZeroDivisionError)\n"
- " from /app/config/routes.rb:6:in `block in <main>'\n"
- " from /var/lib/gems/3.0.0/gems/actionpack-7.0.4/lib/action_dispatch/routing/route_set.rb:428:in `instance_exec'\n"
- " from /var/lib/gems/3.0.0/gems/actionpack-7.0.4/lib/action_dispatch/routing/route_set.rb:428:in `eval_block'\n"
- " from /var/lib/gems/3.0.0/gems/actionpack-7.0.4/lib/action_dispatch/routing/route_set.rb:410:in `draw'\n"
- " from /app/config/routes.rb:1:in `<main>'\n"
- },
- {"hello world, not multiline\n"}
-};
-
-/* Python stacktrace detection */
-struct record_check python_input[] = {
- {"Traceback (most recent call last):\n"},
- {" File \"/base/data/home/runtimes/python27/python27_lib/versions/third_party/webapp2-2.5.2/webapp2.py\", line 1535, in __call__\n"},
- {" rv = self.handle_exception(request, response, e)\n"},
- {" File \"/base/data/home/apps/s~nearfieldspy/1.378705245900539993/nearfieldspy.py\", line 17, in start\n"},
- {" return get()\n"},
- {" File \"/base/data/home/apps/s~nearfieldspy/1.378705245900539993/nearfieldspy.py\", line 5, in get\n"},
- {" raise Exception('spam', 'eggs')\n"},
- {"Exception: ('spam', 'eggs')\n"},
- {"hello world, not multiline\n"}
-};
-
-struct record_check python_output[] = {
- {
- "Traceback (most recent call last):\n"
- " File \"/base/data/home/runtimes/python27/python27_lib/versions/third_party/webapp2-2.5.2/webapp2.py\", line 1535, in __call__\n"
- " rv = self.handle_exception(request, response, e)\n"
- " File \"/base/data/home/apps/s~nearfieldspy/1.378705245900539993/nearfieldspy.py\", line 17, in start\n"
- " return get()\n"
- " File \"/base/data/home/apps/s~nearfieldspy/1.378705245900539993/nearfieldspy.py\", line 5, in get\n"
- " raise Exception('spam', 'eggs')\n"
- "Exception: ('spam', 'eggs')\n"
- },
- {"hello world, not multiline\n"}
-};
-
-/* Custom example for Elasticsearch stacktrace */
-struct record_check elastic_input[] = {
- {"[some weird test] IndexNotFoundException[no such index]\n"},
- {" at org.elasticsearch.cluster.metadata.IndexNameExpressionResolver....\n"},
- {" at org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.java:133)\n"},
- {" at org.elasticsearch.action.admin.indices.delete.java:75)\n"},
- {"another separate log line\n"}
-};
-
-struct record_check elastic_output[] = {
- {
- "[some weird test] IndexNotFoundException[no such index]\n"
- " at org.elasticsearch.cluster.metadata.IndexNameExpressionResolver....\n"
- " at org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.java:133)\n"
- " at org.elasticsearch.action.admin.indices.delete.java:75)\n"
- },
- {
- "another separate log line\n"
- }
-};
-
-/* Go */
-struct record_check go_input[] = {
- {"panic: my panic\n"},
- {"\n"},
- {"goroutine 4 [running]:\n"},
- {"panic(0x45cb40, 0x47ad70)\n"},
- {" /usr/local/go/src/runtime/panic.go:542 +0x46c fp=0xc42003f7b8 sp=0xc42003f710 pc=0x422f7c\n"},
- {"main.main.func1(0xc420024120)\n"},
- {" foo.go:6 +0x39 fp=0xc42003f7d8 sp=0xc42003f7b8 pc=0x451339\n"},
- {"runtime.goexit()\n"},
- {" /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003f7e0 sp=0xc42003f7d8 pc=0x44b4d1\n"},
- {"created by main.main\n"},
- {" foo.go:5 +0x58\n"},
- {"\n"},
- {"goroutine 1 [chan receive]:\n"},
- {"runtime.gopark(0x4739b8, 0xc420024178, 0x46fcd7, 0xc, 0xc420028e17, 0x3)\n"},
- {" /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc420053e30 sp=0xc420053e00 pc=0x42503c\n"},
- {"runtime.goparkunlock(0xc420024178, 0x46fcd7, 0xc, 0x1000f010040c217, 0x3)\n"},
- {" /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc420053e70 sp=0xc420053e30 pc=0x42512e\n"},
- {"runtime.chanrecv(0xc420024120, 0x0, 0xc420053f01, 0x4512d8)\n"},
- {" /usr/local/go/src/runtime/chan.go:506 +0x304 fp=0xc420053f20 sp=0xc420053e70 pc=0x4046b4\n"},
- {"runtime.chanrecv1(0xc420024120, 0x0)\n"},
- {" /usr/local/go/src/runtime/chan.go:388 +0x2b fp=0xc420053f50 sp=0xc420053f20 pc=0x40439b\n"},
- {"main.main()\n"},
- {" foo.go:9 +0x6f fp=0xc420053f80 sp=0xc420053f50 pc=0x4512ef\n"},
- {"runtime.main()\n"},
- {" /usr/local/go/src/runtime/proc.go:185 +0x20d fp=0xc420053fe0 sp=0xc420053f80 pc=0x424bad\n"},
- {"runtime.goexit()\n"},
- {" /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc420053fe8 sp=0xc420053fe0 pc=0x44b4d1\n"},
- {"\n"},
- {"goroutine 2 [force gc (idle)]:\n"},
- {"runtime.gopark(0x4739b8, 0x4ad720, 0x47001e, 0xf, 0x14, 0x1)\n"},
- {" /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc42003e768 sp=0xc42003e738 pc=0x42503c\n"},
- {"runtime.goparkunlock(0x4ad720, 0x47001e, 0xf, 0xc420000114, 0x1)\n"},
- {" /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc42003e7a8 sp=0xc42003e768 pc=0x42512e\n"},
- {"runtime.forcegchelper()\n"},
- {" /usr/local/go/src/runtime/proc.go:238 +0xcc fp=0xc42003e7e0 sp=0xc42003e7a8 pc=0x424e5c\n"},
- {"runtime.goexit()\n"},
- {" /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003e7e8 sp=0xc42003e7e0 pc=0x44b4d1\n"},
- {"created by runtime.init.4\n"},
- {" /usr/local/go/src/runtime/proc.go:227 +0x35\n"},
- {"\n"},
- {"goroutine 3 [GC sweep wait]:\n"},
- {"runtime.gopark(0x4739b8, 0x4ad7e0, 0x46fdd2, 0xd, 0x419914, 0x1)\n"},
- {" /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc42003ef60 sp=0xc42003ef30 pc=0x42503c\n"},
- {"runtime.goparkunlock(0x4ad7e0, 0x46fdd2, 0xd, 0x14, 0x1)\n"},
- {" /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc42003efa0 sp=0xc42003ef60 pc=0x42512e\n"},
- {"runtime.bgsweep(0xc42001e150)\n"},
- {" /usr/local/go/src/runtime/mgcsweep.go:52 +0xa3 fp=0xc42003efd8 sp=0xc42003efa0 pc=0x419973\n"},
- {"runtime.goexit()\n"},
- {" /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003efe0 sp=0xc42003efd8 pc=0x44b4d1\n"},
- {"created by runtime.gcenable\n"},
- {" /usr/local/go/src/runtime/mgc.go:216 +0x58\n"},
- {"one more line, no multiline"}
-};
-
-struct record_check go_output[] = {
- {
- "panic: my panic\n"
- "\n"
- "goroutine 4 [running]:\n"
- "panic(0x45cb40, 0x47ad70)\n"
- " /usr/local/go/src/runtime/panic.go:542 +0x46c fp=0xc42003f7b8 sp=0xc42003f710 pc=0x422f7c\n"
- "main.main.func1(0xc420024120)\n"
- " foo.go:6 +0x39 fp=0xc42003f7d8 sp=0xc42003f7b8 pc=0x451339\n"
- "runtime.goexit()\n"
- " /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003f7e0 sp=0xc42003f7d8 pc=0x44b4d1\n"
- "created by main.main\n"
- " foo.go:5 +0x58\n"
- "\n"
- "goroutine 1 [chan receive]:\n"
- "runtime.gopark(0x4739b8, 0xc420024178, 0x46fcd7, 0xc, 0xc420028e17, 0x3)\n"
- " /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc420053e30 sp=0xc420053e00 pc=0x42503c\n"
- "runtime.goparkunlock(0xc420024178, 0x46fcd7, 0xc, 0x1000f010040c217, 0x3)\n"
- " /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc420053e70 sp=0xc420053e30 pc=0x42512e\n"
- "runtime.chanrecv(0xc420024120, 0x0, 0xc420053f01, 0x4512d8)\n"
- " /usr/local/go/src/runtime/chan.go:506 +0x304 fp=0xc420053f20 sp=0xc420053e70 pc=0x4046b4\n"
- "runtime.chanrecv1(0xc420024120, 0x0)\n"
- " /usr/local/go/src/runtime/chan.go:388 +0x2b fp=0xc420053f50 sp=0xc420053f20 pc=0x40439b\n"
- "main.main()\n"
- " foo.go:9 +0x6f fp=0xc420053f80 sp=0xc420053f50 pc=0x4512ef\n"
- "runtime.main()\n"
- " /usr/local/go/src/runtime/proc.go:185 +0x20d fp=0xc420053fe0 sp=0xc420053f80 pc=0x424bad\n"
- "runtime.goexit()\n"
- " /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc420053fe8 sp=0xc420053fe0 pc=0x44b4d1\n"
- "\n"
- "goroutine 2 [force gc (idle)]:\n"
- "runtime.gopark(0x4739b8, 0x4ad720, 0x47001e, 0xf, 0x14, 0x1)\n"
- " /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc42003e768 sp=0xc42003e738 pc=0x42503c\n"
- "runtime.goparkunlock(0x4ad720, 0x47001e, 0xf, 0xc420000114, 0x1)\n"
- " /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc42003e7a8 sp=0xc42003e768 pc=0x42512e\n"
- "runtime.forcegchelper()\n"
- " /usr/local/go/src/runtime/proc.go:238 +0xcc fp=0xc42003e7e0 sp=0xc42003e7a8 pc=0x424e5c\n"
- "runtime.goexit()\n"
- " /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003e7e8 sp=0xc42003e7e0 pc=0x44b4d1\n"
- "created by runtime.init.4\n"
- " /usr/local/go/src/runtime/proc.go:227 +0x35\n"
- "\n"
- "goroutine 3 [GC sweep wait]:\n"
- "runtime.gopark(0x4739b8, 0x4ad7e0, 0x46fdd2, 0xd, 0x419914, 0x1)\n"
- " /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc42003ef60 sp=0xc42003ef30 pc=0x42503c\n"
- "runtime.goparkunlock(0x4ad7e0, 0x46fdd2, 0xd, 0x14, 0x1)\n"
- " /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc42003efa0 sp=0xc42003ef60 pc=0x42512e\n"
- "runtime.bgsweep(0xc42001e150)\n"
- " /usr/local/go/src/runtime/mgcsweep.go:52 +0xa3 fp=0xc42003efd8 sp=0xc42003efa0 pc=0x419973\n"
- "runtime.goexit()\n"
- " /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003efe0 sp=0xc42003efd8 pc=0x44b4d1\n"
- "created by runtime.gcenable\n"
- " /usr/local/go/src/runtime/mgc.go:216 +0x58\n"
- },
- {"one more line, no multiline\n"}
-};
-
-/*
- * Issue 3817 (case: 1)
- * --------------------
- * Source CRI messages (need first CRI multiline parsing) + a custom multiline
- * parser.
- *
- * - https://github.com/fluent/fluent-bit/issues/3817
- *
- * The 'case 1' represents the problems of identifying two consecutive multiline
- * messages within the same stream.
- */
-struct record_check issue_3817_1_input[] = {
- {"2021-05-17T17:35:01.184675702Z stdout F [DEBUG] 1 start multiline - "},
- {"2021-05-17T17:35:01.184747208Z stdout F 1 cont A"},
- {"2021-05-17T17:35:01.184675702Z stdout F [DEBUG] 2 start multiline - "},
- {"2021-05-17T17:35:01.184747208Z stdout F 2 cont B"},
- {"another isolated line"}
-};
-
-struct record_check issue_3817_1_output[] = {
- {
- "[DEBUG] 1 start multiline - \n"
- "1 cont A"
- },
-
- {
- "[DEBUG] 2 start multiline - \n"
- "2 cont B"
- },
-
- {
- "another isolated line"
- }
-};
-
-/*
- * Flush callback is invoked every time a multiline stream has completed a multiline
- * message or a message is not multiline.
- */
-static int flush_callback(struct flb_ml_parser *parser,
- struct flb_ml_stream *mst,
- void *data, char *buf_data, size_t buf_size)
-{
- int i;
- int ret;
- int len;
- int found = FLB_FALSE;
- size_t off = 0;
- msgpack_unpacked result;
- msgpack_object *map;
- msgpack_object key;
- msgpack_object val;
- struct flb_time tm;
- struct expected_result *res = data;
- struct record_check *exp;
-
- fprintf(stdout, "\n%s----- MULTILINE FLUSH -----%s\n", ANSI_YELLOW, ANSI_RESET);
-
- /* Print incoming flush buffer */
- flb_pack_print(buf_data, buf_size);
-
- fprintf(stdout, "%s----------- EOF -----------%s\n",
- ANSI_YELLOW, ANSI_RESET);
-
- /* Validate content */
- msgpack_unpacked_init(&result);
- off = 0;
- ret = msgpack_unpack_next(&result, buf_data, buf_size, &off);
- TEST_CHECK(ret == MSGPACK_UNPACK_SUCCESS);
-
- flb_time_pop_from_msgpack(&tm, &result, &map);
-
- TEST_CHECK(flb_time_to_nanosec(&tm) != 0L);
-
- exp = &res->out_records[res->current_record];
- len = strlen(res->key);
- for (i = 0; i < map->via.map.size; i++) {
- key = map->via.map.ptr[i].key;
- val = map->via.map.ptr[i].val;
-
- if (key.via.str.size != len) {
- continue;
- }
-
- if (strncmp(key.via.str.ptr, res->key, len) == 0) {
- found = FLB_TRUE;
- break;
- }
- }
- TEST_CHECK(found == FLB_TRUE);
-
- len = strlen(exp->buf);
- TEST_CHECK(val.via.str.size == len);
- if (val.via.str.size != len) {
- printf("expected length: %i, received: %i\n", len, val.via.str.size);
- printf("== received ==\n");
- msgpack_object_print(stdout, val);
- printf("\n\n");
- printf("== expected ==\n%s\n", exp->buf);
- exit(1);
- }
- TEST_CHECK(memcmp(val.via.str.ptr, exp->buf, len) == 0);
- res->current_record++;
-
- msgpack_unpacked_destroy(&result);
- return 0;
-}
-
-static void test_parser_docker()
-{
- int i;
- int len;
- int ret;
- int entries;
- uint64_t stream_id;
- struct record_check *r;
- struct flb_config *config;
- struct flb_time tm;
- struct flb_ml *ml;
- struct flb_ml_parser_ins *mlp_i;
- struct expected_result res = {0};
-
- /* Expected results context */
- res.key = "log";
- res.out_records = docker_output;
-
- /* Initialize environment */
- config = flb_config_init();
-
- /* Create docker multiline mode */
- ml = flb_ml_create(config, "test-docker");
- TEST_CHECK(ml != NULL);
-
- /* Load instances of the parsers for current 'ml' context */
- mlp_i = flb_ml_parser_instance_create(ml, "cri");
- TEST_CHECK(mlp_i != NULL);
-
- /* Generate an instance of multiline docker parser */
- mlp_i = flb_ml_parser_instance_create(ml, "docker");
- TEST_CHECK(mlp_i != NULL);
-
- ret = flb_ml_stream_create(ml, "docker", -1, flush_callback, (void *) &res,
- &stream_id);
- TEST_CHECK(ret == 0);
-
- entries = sizeof(docker_input) / sizeof(struct record_check);
- for (i = 0; i < entries; i++) {
- r = &docker_input[i];
- len = strlen(r->buf);
-
- flb_time_get(&tm);
-
- /* Package as msgpack */
- flb_ml_append_text(ml, stream_id, &tm, r->buf, len);
- }
-
- if (ml) {
- flb_ml_destroy(ml);
- }
-
- flb_config_exit(config);
-}
-
-static void test_parser_cri()
-{
- int i;
- int len;
- int ret;
- int entries;
- uint64_t stream_id;
- struct record_check *r;
- struct flb_config *config;
- struct flb_time tm;
- struct flb_ml *ml;
- struct flb_ml_parser_ins *mlp_i;
- struct expected_result res = {0};
-
- /* Expected results context */
- res.key = "log";
- res.out_records = cri_output;
-
- /* Initialize environment */
- config = flb_config_init();
-
- /* Create docker multiline mode */
- ml = flb_ml_create(config, "cri-test");
- TEST_CHECK(ml != NULL);
-
- /* Generate an instance of multiline docker parser */
- mlp_i = flb_ml_parser_instance_create(ml, "docker");
- TEST_CHECK(mlp_i != NULL);
-
- /* Load instances of the parsers for current 'ml' context */
- mlp_i = flb_ml_parser_instance_create(ml, "cri");
- TEST_CHECK(mlp_i != NULL);
-
- ret = flb_ml_stream_create(ml, "cri", -1, flush_callback, (void *) &res,
- &stream_id);
- TEST_CHECK(ret == 0);
-
- entries = sizeof(cri_input) / sizeof(struct record_check);
- for (i = 0; i < entries; i++) {
- r = &cri_input[i];
- len = strlen(r->buf);
- flb_time_get(&tm);
-
- /* Package as msgpack */
- flb_ml_append_text(ml, stream_id, &tm, r->buf, len);
- }
-
- if (ml) {
- flb_ml_destroy(ml);
- }
-
- flb_config_exit(config);
-}
-
-static void test_container_mix()
-{
- int i;
- int len;
- int ret;
- int entries;
- uint64_t stream_id;
- struct record_check *r;
- struct flb_config *config;
- struct flb_time tm;
- struct flb_ml *ml;
- struct flb_ml_parser_ins *mlp_i;
- struct expected_result res = {0};
-
- /* Expected results context */
- res.key = "log";
- res.out_records = container_mix_output;
-
- /* Initialize environment */
- config = flb_config_init();
-
- /* Create docker multiline mode */
- ml = flb_ml_create(config, "container-mix-test");
- TEST_CHECK(ml != NULL);
-
- /* Generate an instance of multiline docker parser */
- mlp_i = flb_ml_parser_instance_create(ml, "docker");
- TEST_CHECK(mlp_i != NULL);
-
- /* Load instances of the parsers for current 'ml' context */
- mlp_i = flb_ml_parser_instance_create(ml, "cri");
- TEST_CHECK(mlp_i != NULL);
-
- ret = flb_ml_stream_create(ml, "container-mix", -1, flush_callback, (void *) &res,
- &stream_id);
- TEST_CHECK(ret == 0);
-
- entries = sizeof(container_mix_input) / sizeof(struct record_check);
- for (i = 0; i < entries; i++) {
- r = &container_mix_input[i];
- len = strlen(r->buf);
- flb_time_get(&tm);
-
- /* Package as msgpack */
- flb_ml_append_text(ml, stream_id, &tm, r->buf, len);
- }
-
- if (ml) {
- flb_ml_destroy(ml);
- }
-
- flb_config_exit(config);
-}
-
-static void test_parser_java()
-{
- int i;
- int len;
- int ret;
- int entries;
- uint64_t stream_id;
- struct record_check *r;
- struct flb_config *config;
- struct flb_time tm;
- struct flb_ml *ml;
- struct flb_ml_parser_ins *mlp_i;
- struct expected_result res = {0};
- msgpack_packer mp_pck;
- msgpack_sbuffer mp_sbuf;
-
- /* Expected results context */
- res.key = "log";
- res.out_records = java_output;
-
- /* Initialize environment */
- config = flb_config_init();
-
- /* Create docker multiline mode */
- ml = flb_ml_create(config, "java-test");
- TEST_CHECK(ml != NULL);
-
- /* Generate an instance of multiline java parser */
- mlp_i = flb_ml_parser_instance_create(ml, "java");
- TEST_CHECK(mlp_i != NULL);
-
- flb_ml_parser_instance_set(mlp_i, "key_content", "log");
-
- ret = flb_ml_stream_create(ml, "java", -1, flush_callback, (void *) &res,
- &stream_id);
- TEST_CHECK(ret == 0);
-
- /* initialize buffers */
- msgpack_sbuffer_init(&mp_sbuf);
- msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
-
- size_t off = 0;
- msgpack_unpacked result;
- msgpack_object root;
- msgpack_object *map;
-
- entries = sizeof(java_input) / sizeof(struct record_check);
- for (i = 0; i < entries; i++) {
- r = &java_input[i];
- len = strlen(r->buf);
-
- /* Package as msgpack */
- flb_time_get(&tm);
-
- /* initialize buffers */
- msgpack_sbuffer_init(&mp_sbuf);
- msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
-
- msgpack_pack_array(&mp_pck, 2);
- flb_time_append_to_msgpack(&tm, &mp_pck, 0);
-
- msgpack_pack_map(&mp_pck, 1);
- msgpack_pack_str(&mp_pck, 3);
- msgpack_pack_str_body(&mp_pck, "log", 3);
- msgpack_pack_str(&mp_pck, len);
- msgpack_pack_str_body(&mp_pck, r->buf, len);
-
- /* Unpack and lookup the content map */
- msgpack_unpacked_init(&result);
- off = 0;
- ret = msgpack_unpack_next(&result, mp_sbuf.data, mp_sbuf.size, &off);
-
- flb_pack_print(mp_sbuf.data, mp_sbuf.size);
-
- root = result.data;
- map = &root.via.array.ptr[1];
-
- /* Package as msgpack */
- ret = flb_ml_append_object(ml, stream_id, &tm, NULL, map);
-
- msgpack_unpacked_destroy(&result);
- msgpack_sbuffer_destroy(&mp_sbuf);
- }
-
- if (ml) {
- flb_ml_destroy(ml);
- }
-
- flb_config_exit(config);
-}
-
-static void test_parser_python()
-{
- int i;
- int len;
- int ret;
- int entries;
- uint64_t stream_id;
- msgpack_packer mp_pck;
- msgpack_sbuffer mp_sbuf;
- struct record_check *r;
- struct flb_config *config;
- struct flb_time tm;
- struct flb_ml *ml;
- struct flb_ml_parser_ins *mlp_i;
- struct expected_result res = {0};
-
- /* Expected results context */
- res.key = "log";
- res.out_records = python_output;
-
- /* initialize buffers */
- msgpack_sbuffer_init(&mp_sbuf);
- msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
-
- /* Initialize environment */
- config = flb_config_init();
-
- /* Create docker multiline mode */
- ml = flb_ml_create(config, "python-test");
- TEST_CHECK(ml != NULL);
-
- /* Generate an instance of multiline python parser */
- mlp_i = flb_ml_parser_instance_create(ml, "python");
- TEST_CHECK(mlp_i != NULL);
-
- ret = flb_ml_stream_create(ml, "python", -1, flush_callback, (void *) &res,
- &stream_id);
- TEST_CHECK(ret == 0);
-
- flb_time_get(&tm);
-
- printf("\n");
- entries = sizeof(python_input) / sizeof(struct record_check);
- for (i = 0; i < entries; i++) {
- r = &python_input[i];
- len = strlen(r->buf);
-
- /* Package as msgpack */
- flb_time_get(&tm);
- flb_ml_append_text(ml, stream_id, &tm, r->buf, len);
- }
-
- if (ml) {
- flb_ml_destroy(ml);
- }
-
- flb_config_exit(config);
-}
-
-static void test_parser_ruby()
-{
- int i;
- int len;
- int ret;
- int entries;
- uint64_t stream_id;
- msgpack_packer mp_pck;
- msgpack_sbuffer mp_sbuf;
- struct record_check *r;
- struct flb_config *config;
- struct flb_time tm;
- struct flb_ml *ml;
- struct flb_ml_parser_ins *mlp_i;
- struct expected_result res = {0};
-
- /* Expected results context */
- res.key = "log";
- res.out_records = ruby_output;
-
- /* initialize buffers */
- msgpack_sbuffer_init(&mp_sbuf);
- msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
-
- /* Initialize environment */
- config = flb_config_init();
-
- /* Create docker multiline mode */
- ml = flb_ml_create(config, "ruby-test");
- TEST_CHECK(ml != NULL);
-
- /* Generate an instance of multiline ruby parser */
- mlp_i = flb_ml_parser_instance_create(ml, "ruby");
- TEST_CHECK(mlp_i != NULL);
-
- ret = flb_ml_stream_create(ml, "ruby", -1, flush_callback, (void *) &res,
- &stream_id);
- TEST_CHECK(ret == 0);
-
- flb_time_get(&tm);
-
- printf("\n");
- entries = sizeof(ruby_input) / sizeof(struct record_check);
- for (i = 0; i < entries; i++) {
- r = &ruby_input[i];
- len = strlen(r->buf);
-
- /* Package as msgpack */
- flb_time_get(&tm);
- flb_ml_append_text(ml, stream_id, &tm, r->buf, len);
- }
-
- if (ml) {
- flb_ml_destroy(ml);
- }
-
- flb_config_exit(config);
-}
-
-static void test_issue_4949()
-{
- int i;
- int len;
- int ret;
- int entries;
- uint64_t stream_id;
- msgpack_packer mp_pck;
- msgpack_sbuffer mp_sbuf;
- struct record_check *r;
- struct flb_config *config;
- struct flb_time tm;
- struct flb_ml *ml;
- struct flb_ml_parser_ins *mlp_i;
- struct expected_result res = {0};
-
- /* Expected results context */
- res.key = "log";
- res.out_records = python_output;
-
- /* initialize buffers */
- msgpack_sbuffer_init(&mp_sbuf);
- msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
-
- /* Initialize environment */
- config = flb_config_init();
-
- /* Create docker multiline mode */
- ml = flb_ml_create(config, "python-test");
- TEST_CHECK(ml != NULL);
-
- /* Generate an instance of multiline python parser */
- mlp_i = flb_ml_parser_instance_create(ml, "python");
- TEST_CHECK(mlp_i != NULL);
-
- ret = flb_ml_stream_create(ml, "python", -1, flush_callback, (void *) &res,
- &stream_id);
- TEST_CHECK(ret == 0);
-
- /* Generate an instance of multiline java parser */
- mlp_i = flb_ml_parser_instance_create(ml, "java");
- TEST_CHECK(mlp_i != NULL);
-
- ret = flb_ml_stream_create(ml, "java", -1, flush_callback, (void *) &res,
- &stream_id);
- TEST_CHECK(ret == 0);
-
- flb_time_get(&tm);
-
- printf("\n");
- entries = sizeof(python_input) / sizeof(struct record_check);
- for (i = 0; i < entries; i++) {
- r = &python_input[i];
- len = strlen(r->buf);
-
- /* Package as msgpack */
- flb_time_get(&tm);
- flb_ml_append_text(ml, stream_id, &tm, r->buf, len);
- }
-
- if (ml) {
- flb_ml_destroy(ml);
- }
-
- flb_config_exit(config);
-}
-
-static void test_parser_elastic()
-{
- int i;
- int len;
- int ret;
- int entries;
- size_t off = 0;
- uint64_t stream_id;
- msgpack_packer mp_pck;
- msgpack_sbuffer mp_sbuf;
- msgpack_unpacked result;
- msgpack_object root;
- msgpack_object *map;
- struct record_check *r;
- struct flb_config *config;
- struct flb_time tm;
- struct flb_ml *ml;
- struct flb_ml_parser *mlp;
- struct flb_ml_parser_ins *mlp_i;
- struct expected_result res = {0};
-
- /* Expected results context */
- res.key = "log";
- res.out_records = elastic_output;
-
- /* Initialize environment */
- config = flb_config_init();
-
- ml = flb_ml_create(config, "test-elastic");
- TEST_CHECK(ml != NULL);
-
- mlp = flb_ml_parser_create(config,
- "elastic", /* name */
- FLB_ML_REGEX, /* type */
- NULL, /* match_str */
- FLB_FALSE, /* negate */
- 1000, /* flush_ms */
- "log", /* key_content */
- NULL, /* key_pattern */
- NULL, /* key_group */
- NULL, /* parser ctx */
- NULL); /* parser name */
- TEST_CHECK(mlp != NULL);
-
- mlp_i = flb_ml_parser_instance_create(ml, "elastic");
- TEST_CHECK(mlp_i != NULL);
-
- ret = flb_ml_rule_create(mlp, "start_state", "/^\\[/", "elastic_cont", NULL);
- if (ret != 0) {
- fprintf(stderr, "error creating rule 1");
- }
-
- ret = flb_ml_rule_create(mlp, "elastic_cont", "/^\\s+/", "elastic_cont", NULL);
- if (ret != 0) {
- fprintf(stderr, "error creating rule 2");
- }
-
- ret = flb_ml_stream_create(ml, "elastic", -1, flush_callback, (void *) &res,
- &stream_id);
- TEST_CHECK(ret == 0);
-
- ret = flb_ml_parser_init(mlp);
- if (ret != 0) {
- fprintf(stderr, "error initializing multiline\n");
- flb_ml_destroy(ml);
- return;
- }
-
- printf("\n");
- entries = sizeof(elastic_input) / sizeof(struct record_check);
- for (i = 0; i < entries; i++) {
- r = &elastic_input[i];
- len = strlen(r->buf);
-
- /* initialize buffers */
- msgpack_sbuffer_init(&mp_sbuf);
- msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
-
- /* Package raw text as a msgpack record */
- msgpack_pack_array(&mp_pck, 2);
-
- flb_time_get(&tm);
- flb_time_append_to_msgpack(&tm, &mp_pck, 0);
-
- msgpack_pack_map(&mp_pck, 1);
- msgpack_pack_str(&mp_pck, 3);
- msgpack_pack_str_body(&mp_pck, "log", 3);
- msgpack_pack_str(&mp_pck, len);
- msgpack_pack_str_body(&mp_pck, r->buf, len);
-
- /* Unpack and lookup the content map */
- msgpack_unpacked_init(&result);
- off = 0;
- ret = msgpack_unpack_next(&result, mp_sbuf.data, mp_sbuf.size, &off);
- TEST_CHECK(ret == MSGPACK_UNPACK_SUCCESS);
-
- root = result.data;
- map = &root.via.array.ptr[1];
-
- /* Package as msgpack */
- ret = flb_ml_append_object(ml, stream_id, &tm, NULL, map);
-
- msgpack_unpacked_destroy(&result);
- msgpack_sbuffer_destroy(&mp_sbuf);
- }
-
- if (ml) {
- flb_ml_destroy(ml);
- }
-
- flb_config_exit(config);
-}
-
-static void test_endswith()
-{
- int i;
- int len;
- int ret;
- int entries;
- uint64_t stream_id = 0;
- struct record_check *r;
- struct flb_config *config;
- struct flb_time tm;
- struct flb_ml *ml;
- struct flb_ml_parser *mlp;
- struct flb_ml_parser_ins *mlp_i;
- struct expected_result res = {0};
-
- /* Expected results context */
- res.key = "log";
- res.out_records = endswith_output;
-
- /* Initialize environment */
- config = flb_config_init();
-
- /* Create docker multiline mode */
- ml = flb_ml_create(config, "raw-endswith");
- TEST_CHECK(ml != NULL);
-
- mlp = flb_ml_parser_create(config,
- "endswith", /* name */
- FLB_ML_ENDSWITH, /* type */
- "\\", /* match_str */
- FLB_TRUE, /* negate */
- 1000, /* flush_ms */
- NULL, /* key_content */
- NULL, /* key_pattern */
- NULL, /* key_group */
- NULL, /* parser ctx */
- NULL); /* parser name */
- TEST_CHECK(mlp != NULL);
-
- /* Generate an instance of 'endswith' custom parser parser */
- mlp_i = flb_ml_parser_instance_create(ml, "endswith");
- TEST_CHECK(mlp_i != NULL);
-
- ret = flb_ml_stream_create(ml, "test", -1, flush_callback, (void *) &res, &stream_id);
- TEST_CHECK(ret == 0);
-
- entries = sizeof(endswith_input) / sizeof(struct record_check);
- for (i = 0; i < entries; i++) {
- r = &endswith_input[i];
- len = strlen(r->buf);
-
- /* Package as msgpack */
- flb_time_get(&tm);
- flb_ml_append_text(ml, stream_id, &tm, r->buf, len);
- }
-
- if (ml) {
- flb_ml_destroy(ml);
- }
-
- flb_config_exit(config);
-}
-
-static void test_parser_go()
-{
- int i;
- int len;
- int ret;
- int entries;
- uint64_t stream_id = 0;
- struct record_check *r;
- struct flb_config *config;
- struct flb_time tm;
- struct flb_ml *ml;
- struct flb_ml_parser_ins *mlp_i;
- struct expected_result res = {0};
-
- /* Expected results context */
- res.key = "log";
- res.out_records = go_output;
-
- /* Initialize environment */
- config = flb_config_init();
-
- /* Create docker multiline mode */
- ml = flb_ml_create(config, "go-test");
- TEST_CHECK(ml != NULL);
-
- /* Generate an instance of multiline java parser */
- mlp_i = flb_ml_parser_instance_create(ml, "go");
- TEST_CHECK(mlp_i != NULL);
-
- ret = flb_ml_stream_create(ml, "go", -1, flush_callback, (void *) &res, &stream_id);
- TEST_CHECK(ret == 0);
-
- entries = sizeof(go_input) / sizeof(struct record_check);
- for (i = 0; i < entries; i++) {
- r = &go_input[i];
- len = strlen(r->buf);
-
- /* Package as msgpack */
- flb_time_get(&tm);
- flb_ml_append_text(ml, stream_id, &tm, r->buf, len);
- }
-
- if (ml) {
- flb_ml_destroy(ml);
- }
-
- flb_config_exit(config);
-}
-
-static int flush_callback_to_buf(struct flb_ml_parser *parser,
- struct flb_ml_stream *mst,
- void *data, char *buf_data, size_t buf_size)
-{
- msgpack_sbuffer *mp_sbuf = data;
- msgpack_sbuffer_write(mp_sbuf, buf_data, buf_size);
-
- return 0;
-}
-
-static void run_test(struct flb_config *config, char *test_name,
- struct record_check *in, int in_len,
- struct record_check *out, int out_len,
- char *parser1, char *parser2)
-
-{
- int i;
- int ret;
- int len;
- size_t off = 0;
- uint64_t stream1 = 0;
- uint64_t stream2 = 0;
- struct flb_ml *ml;
- struct flb_ml_parser_ins *p1 = NULL;
- struct record_check *r;
- msgpack_sbuffer mp_sbuf1;
- msgpack_packer mp_pck1;
- msgpack_sbuffer mp_sbuf2;
- msgpack_packer mp_pck2;
- msgpack_object *map;
- struct flb_time tm;
- struct expected_result res = {0};
- msgpack_unpacked result;
-
- /* init buffers */
- msgpack_sbuffer_init(&mp_sbuf1);
- msgpack_packer_init(&mp_pck1, &mp_sbuf1, msgpack_sbuffer_write);
- msgpack_sbuffer_init(&mp_sbuf2);
- msgpack_packer_init(&mp_pck2, &mp_sbuf2, msgpack_sbuffer_write);
-
- /* Create docker multiline mode */
- ml = flb_ml_create(config, test_name);
- TEST_CHECK(ml != NULL);
-
- if (!parser1) {
- fprintf(stderr, "run_test(): parser1 is NULL\n");
- exit(1);
- }
-
- /* Parser 1 */
- p1 = flb_ml_parser_instance_create(ml, parser1);
- TEST_CHECK(p1 != NULL);
-
-
- /* Stream 1: use parser name (test_name) to generate the stream id */
- ret = flb_ml_stream_create(ml, test_name, -1,
- flush_callback_to_buf,
- (void *) &mp_sbuf1, &stream1);
- TEST_CHECK(ret == 0);
-
- /* Ingest input records into parser 1 */
- for (i = 0; i < in_len; i++) {
- r = &in[i];
- len = strlen(r->buf);
-
- flb_time_get(&tm);
-
- /* Package as msgpack */
- flb_ml_append_text(ml, stream1, &tm, r->buf, len);
- }
-
- flb_ml_destroy(ml);
- ml = flb_ml_create(config, test_name);
-
- flb_ml_parser_instance_create(ml, parser2);
-
- /*
- * After flb_ml_append above(), mp_sbuf1 has been populated with the
- * output results as structured messages. Now this data needs to be
- * passed to the next parser.
- */
-
- /* Expected results context */
- res.key = "log";
- res.out_records = out;
-
- /* Stream 2 */
- ret = flb_ml_stream_create(ml, "filter_multiline", -1,
- flush_callback,
- (void *) &res, &stream2);
-
- /* Ingest input records into parser 2 */
- off = 0;
- msgpack_unpacked_init(&result);
- while (msgpack_unpack_next(&result, mp_sbuf1.data, mp_sbuf1.size, &off)) {
- flb_time_pop_from_msgpack(&tm, &result, &map);
-
- /* Package as msgpack */
- ret = flb_ml_append_object(ml, stream2, &tm, NULL, map);
- }
- flb_ml_flush_pending_now(ml);
-
- msgpack_unpacked_destroy(&result);
- msgpack_sbuffer_destroy(&mp_sbuf1);
- flb_ml_destroy(ml);
-}
-
-void test_issue_3817_1()
-{
- int ret;
- int in_len = sizeof(issue_3817_1_input) / sizeof(struct record_check);
- int out_len = sizeof(issue_3817_1_output) / sizeof(struct record_check);
- struct flb_config *config;
- struct flb_ml_parser *mlp;
-
- /*
- * Parser definition for a file:
- *
- * [MULTILINE_PARSER]
- * name parser_3817
- * type regex
- * key_content log
- * #
- * # Regex rules for multiline parsing
- * # ---------------------------------
- * #
- * # rules | state name | regex pattern | next state
- * # ------|---------------|------------------------------------
- * rule "start_state" "/- $/" "cont"
- * rule "cont" "/^([1-9].*$/" "cont"
- *
- */
-
- /* Initialize environment */
- config = flb_config_init();
-
- /* Register custom parser */
- mlp = flb_ml_parser_create(config,
- "parser_3817", /* name */
- FLB_ML_REGEX, /* type */
- NULL, /* match_str */
- FLB_FALSE, /* negate */
- 1000, /* flush_ms */
- "log", /* key_content */
- NULL, /* key_pattern */
- NULL, /* key_group */
- NULL, /* parser ctx */
- NULL); /* parser name */
- TEST_CHECK(mlp != NULL);
-
- /* rule: start_state */
- ret = flb_ml_rule_create(mlp, "start_state", "/- $/", "cont", NULL);
- if (ret != 0) {
- fprintf(stderr, "error creating rule 1");
- }
-
- /* rule: cont */
- ret = flb_ml_rule_create(mlp, "cont", "/^([1-9]).*$/", "cont", NULL);
- if (ret != 0) {
- fprintf(stderr, "error creating rule 2");
- }
-
- /* initiaze the parser configuration */
- ret = flb_ml_parser_init(mlp);
- TEST_CHECK(ret == 0);
-
- /* Run the test */
- run_test(config, "issue_3817_1",
- issue_3817_1_input, in_len,
- issue_3817_1_output, out_len,
- "cri", "parser_3817");
-
- flb_config_exit(config);
-}
-
-static void test_issue_4034()
-{
- int i;
- int len;
- int ret;
- int entries;
- uint64_t stream_id;
- struct record_check *r;
- struct flb_config *config;
- struct flb_time tm;
- struct flb_ml *ml;
- struct flb_ml_parser_ins *mlp_i;
- struct expected_result res = {0};
- msgpack_packer mp_pck;
- msgpack_sbuffer mp_sbuf;
-
- /* Expected results context */
- res.key = "log";
- res.out_records = cri_output;
-
- /* Initialize environment */
- config = flb_config_init();
-
- /* Create cri multiline mode */
- ml = flb_ml_create(config, "cri-test");
- TEST_CHECK(ml != NULL);
-
- /* Generate an instance of multiline cri parser */
- mlp_i = flb_ml_parser_instance_create(ml, "cri");
- TEST_CHECK(mlp_i != NULL);
-
- flb_ml_parser_instance_set(mlp_i, "key_content", "log");
-
- ret = flb_ml_stream_create(ml, "cri", -1, flush_callback, (void *) &res,
- &stream_id);
- TEST_CHECK(ret == 0);
-
- /* initialize buffers */
- msgpack_sbuffer_init(&mp_sbuf);
- msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
-
- size_t off = 0;
- msgpack_unpacked result;
- msgpack_object root;
- msgpack_object *map;
-
- entries = sizeof(cri_input) / sizeof(struct record_check);
- for (i = 0; i < entries; i++) {
- r = &cri_input[i];
- len = strlen(r->buf);
-
- /* Package as msgpack */
- flb_time_get(&tm);
-
- /* initialize buffers */
- msgpack_sbuffer_init(&mp_sbuf);
- msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
-
- msgpack_pack_array(&mp_pck, 2);
- flb_time_append_to_msgpack(&tm, &mp_pck, 0);
-
- msgpack_pack_map(&mp_pck, 1);
- msgpack_pack_str(&mp_pck, 3);
- msgpack_pack_str_body(&mp_pck, "log", 3);
- msgpack_pack_str(&mp_pck, len);
- msgpack_pack_str_body(&mp_pck, r->buf, len);
-
- /* Unpack and lookup the content map */
- msgpack_unpacked_init(&result);
- off = 0;
- ret = msgpack_unpack_next(&result, mp_sbuf.data, mp_sbuf.size, &off);
-
- flb_pack_print(mp_sbuf.data, mp_sbuf.size);
-
- root = result.data;
- map = &root.via.array.ptr[1];
-
- /* Package as msgpack */
- ret = flb_ml_append_object(ml, stream_id, &tm, NULL, map);
-
- msgpack_unpacked_destroy(&result);
- msgpack_sbuffer_destroy(&mp_sbuf);
- }
- flb_ml_flush_pending_now(ml);
-
- if (ml) {
- flb_ml_destroy(ml);
- }
-
- flb_config_exit(config);
-}
-
-static void test_issue_5504()
-{
- uint64_t last_flush;
- struct flb_config *config;
- struct flb_ml *ml;
- struct flb_ml_parser_ins *mlp_i;
- struct mk_event_loop *evl;
- struct flb_sched *sched;
- struct mk_list *tmp;
- struct mk_list *head;
- struct flb_sched_timer *timer;
- void (*cb)(struct flb_config *, void *);
- int timeout = 500;
-
-#ifdef _WIN32
- WSADATA wsa_data;
- WSAStartup(0x0201, &wsa_data);
-#endif
-
- /* Initialize environment */
- config = flb_config_init();
-
- /* Create the event loop */
- evl = config->evl;
- config->evl = mk_event_loop_create(32);
- TEST_CHECK(config->evl != NULL);
-
- /* Initialize the scheduler */
- sched = config->sched;
- config->sched = flb_sched_create(config, config->evl);
- TEST_CHECK(config->sched != NULL);
-
- /* Set the thread local scheduler */
- flb_sched_ctx_init();
- flb_sched_ctx_set(config->sched);
-
- ml = flb_ml_create(config, "5504-test");
- TEST_CHECK(ml != NULL);
-
- /* Generate an instance of any multiline parser */
- mlp_i = flb_ml_parser_instance_create(ml, "cri");
- TEST_CHECK(mlp_i != NULL);
-
- flb_ml_parser_instance_set(mlp_i, "key_content", "log");
-
- /* Set the flush timeout */
- ml->flush_ms = timeout;
-
- /* Initialize the auto flush */
- flb_ml_auto_flush_init(ml);
-
- /* Store the initial last_flush time */
- last_flush = ml->last_flush;
-
- /* Find the cb_ml_flush_timer callback from the timers */
- mk_list_foreach_safe(head, tmp, &((struct flb_sched *)config->sched)->timers) {
- timer = mk_list_entry(head, struct flb_sched_timer, _head);
- if (timer->type == FLB_SCHED_TIMER_CB_PERM) {
- cb = timer->cb;
- }
- }
- TEST_CHECK(cb != NULL);
-
- /* Trigger the callback without delay */
- cb(config, ml);
- /* This should not update the last_flush since it is before the timeout */
- TEST_CHECK(ml->last_flush == last_flush);
-
- /* Sleep just enough time to pass the timeout */
- flb_time_msleep(timeout + 1);
-
- /* Retrigger the callback */
- cb(config, ml);
- /* Ensure this time the last_flush has been updated */
- TEST_CHECK(ml->last_flush > last_flush);
-
- /* Cleanup */
- flb_sched_destroy(config->sched);
- config->sched = sched;
- mk_event_loop_destroy(config->evl);
- config->evl = evl;
- flb_ml_destroy(ml);
- flb_config_exit(config);
-
-#ifdef _WIN32
- WSACleanup();
-#endif
-}
-
-TEST_LIST = {
- /* Normal features tests */
- { "parser_docker", test_parser_docker},
- { "parser_cri", test_parser_cri},
- { "parser_java", test_parser_java},
- { "parser_python", test_parser_python},
- { "parser_ruby", test_parser_ruby},
- { "parser_elastic", test_parser_elastic},
- { "parser_go", test_parser_go},
- { "container_mix", test_container_mix},
- { "endswith", test_endswith},
-
- /* Issues reported on Github */
- { "issue_3817_1" , test_issue_3817_1},
- { "issue_4034" , test_issue_4034},
- { "issue_4949" , test_issue_4949},
- { "issue_5504" , test_issue_5504},
- { 0 }
-};