diff options
Diffstat (limited to 'fluent-bit/tests/internal/multiline.c')
-rw-r--r-- | fluent-bit/tests/internal/multiline.c | 1478 |
1 files changed, 1478 insertions, 0 deletions
diff --git a/fluent-bit/tests/internal/multiline.c b/fluent-bit/tests/internal/multiline.c new file mode 100644 index 000000000..e175e1171 --- /dev/null +++ b/fluent-bit/tests/internal/multiline.c @@ -0,0 +1,1478 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#include <fluent-bit/flb_info.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_parser.h> +#include <fluent-bit/flb_mem.h> +#include <fluent-bit/multiline/flb_ml.h> +#include <fluent-bit/multiline/flb_ml_rule.h> +#include <fluent-bit/multiline/flb_ml_parser.h> + +#include "flb_tests_internal.h" + +struct record_check { + char *buf; +}; + +struct expected_result { + int current_record; + char *key; + struct record_check *out_records; +}; + +/* Docker */ +struct record_check docker_input[] = { + {"{\"log\": \"aa\\n\", \"stream\": \"stdout\", \"time\": \"2021-02-01T16:45:03.01231z\"}"}, + {"{\"log\": \"aa\\n\", \"stream\": \"stderr\", \"time\": \"2021-02-01T16:45:03.01231z\"}"}, + {"{\"log\": \"bb\", \"stream\": \"stdout\", \"time\": \"2021-02-01T16:45:03.01232z\"}"}, + {"{\"log\": \"cc\n\", \"stream\": \"stdout\", \"time\": \"2021-02-01T16:45:03.01233z\"}"}, + {"{\"log\": \"dd\", \"stream\": \"stderr\", \"time\": \"2021-02-01T16:45:03.01233z\"}"}, + {"single line to force pending flush of the previous line"}, + {"{\"log\": \"ee\\n\", \"stream\": \"stderr\", \"time\": \"2021-02-01T16:45:03.01234z\"}"}, +}; + +struct record_check docker_output[] = { + {"aa\n"}, + {"aa\n"}, + {"bbcc\n"}, + {"dd"}, + {"single line to force pending flush of the previous line"}, + {"ee\n"}, +}; + +/* CRI */ +struct record_check cri_input[] = { + {"2019-05-07T18:57:50.904275087+00:00 stdout P 1a. some "}, + {"2019-05-07T18:57:51.904275088+00:00 stdout P multiline "}, + {"2019-05-07T18:57:52.904275089+00:00 stdout F log"}, + {"2019-05-07T18:57:50.904275087+00:00 stderr P 1b. some "}, + {"2019-05-07T18:57:51.904275088+00:00 stderr P multiline "}, + {"2019-05-07T18:57:52.904275089+00:00 stderr F log"}, + {"2019-05-07T18:57:53.904275090+00:00 stdout P 2a. another "}, + {"2019-05-07T18:57:54.904275091+00:00 stdout P multiline "}, + {"2019-05-07T18:57:55.904275092+00:00 stdout F log"}, + {"2019-05-07T18:57:53.904275090+00:00 stderr P 2b. another "}, + {"2019-05-07T18:57:54.904275091+00:00 stderr P multiline "}, + {"2019-05-07T18:57:55.904275092+00:00 stderr F log"}, + {"2019-05-07T18:57:56.904275093+00:00 stdout F 3a. non multiline 1"}, + {"2019-05-07T18:57:57.904275094+00:00 stdout F 4a. non multiline 2"}, + {"2019-05-07T18:57:56.904275093+00:00 stderr F 3b. non multiline 1"}, + {"2019-05-07T18:57:57.904275094+00:00 stderr F 4b. non multiline 2"} +}; + +struct record_check cri_output[] = { + {"1a. some multiline log"}, + {"1b. some multiline log"}, + {"2a. another multiline log"}, + {"2b. another multiline log"}, + {"3a. non multiline 1"}, + {"4a. non multiline 2"}, + {"3b. non multiline 1"}, + {"4b. non multiline 2"} +}; + +/* ENDSWITH */ +struct record_check endswith_input[] = { + {"1a. some multiline log \\"}, + {"1b. some multiline log"}, + {"2a. another multiline log\\"}, + {"2b. another multiline log"}, + {"3a. non multiline 1"}, + {"4a. non multiline 2"} +}; + +struct record_check endswith_output[] = { + {"1a. some multiline log \\\n1b. some multiline log\n"}, + {"2a. another multiline log\\\n2b. another multiline log\n"}, + {"3a. non multiline 1\n"}, + {"4a. non multiline 2\n"} +}; + +/* Mixed lines of Docker and CRI logs in different streams (stdout/stderr) */ +struct record_check container_mix_input[] = { + {"{\"log\": \"a1\\n\", \"stream\": \"stdout\", \"time\": \"2021-02-01T16:45:03.01231z\"}"}, + {"{\"log\": \"a2\\n\", \"stream\": \"stderr\", \"time\": \"2021-02-01T16:45:03.01231z\"}"}, + {"{\"log\": \"bb\", \"stream\": \"stdout\", \"time\": \"2021-02-01T16:45:03.01232z\"}"}, + {"{\"log\": \"cc\", \"stream\": \"stdout\", \"time\": \"2021-02-01T16:45:03.01233z\"}"}, + {"{\"log\": \"dd\", \"stream\": \"stderr\", \"time\": \"2021-02-01T16:45:03.01232z\"}"}, + {"{\"log\": \"ee\n\", \"stream\": \"stderr\", \"time\": \"2021-02-01T16:45:03.01233z\"}"}, + {"2019-05-07T18:57:52.904275089+00:00 stdout F single full"}, + {"2019-05-07T18:57:50.904275087+00:00 stdout P 1a. some "}, + {"2019-05-07T18:57:51.904275088+00:00 stdout P multiline "}, + {"2019-05-07T18:57:52.904275089+00:00 stdout F log"}, + {"2019-05-07T18:57:50.904275087+00:00 stderr P 1b. some "}, + {"2019-05-07T18:57:51.904275088+00:00 stderr P multiline "}, + {"2019-05-07T18:57:52.904275089+00:00 stderr F log"}, + {"{\"log\": \"dd-out\\n\", \"stream\": \"stdout\", \"time\": \"2021-02-01T16:45:03.01234z\"}"}, + {"{\"log\": \"dd-err\\n\", \"stream\": \"stderr\", \"time\": \"2021-02-01T16:45:03.01234z\"}"}, +}; + +struct record_check container_mix_output[] = { + {"a1\n"}, + {"a2\n"}, + {"ddee\n"}, + {"bbcc"}, + {"single full"}, + {"1a. some multiline log"}, + {"1b. some multiline log"}, + {"dd-out\n"}, + {"dd-err\n"}, +}; + +/* Java stacktrace detection */ +struct record_check java_input[] = { + {"Exception in thread \"main\" java.lang.IllegalStateException: ..null property\n"}, + {" at com.example.myproject.Author.getBookIds(xx.java:38)\n"}, + {" at com.example.myproject.Bootstrap.main(Bootstrap.java:14)\n"}, + {"Caused by: java.lang.NullPointerException\n"}, + {" at com.example.myproject.Book.getId(Book.java:22)\n"}, + {" at com.example.myproject.Author.getBookIds(Author.java:35)\n"}, + {" ... 1 more\n"}, + {"single line\n"} +}; + +struct record_check java_output[] = { + { + "Exception in thread \"main\" java.lang.IllegalStateException: ..null property\n" + " at com.example.myproject.Author.getBookIds(xx.java:38)\n" + " at com.example.myproject.Bootstrap.main(Bootstrap.java:14)\n" + "Caused by: java.lang.NullPointerException\n" + " at com.example.myproject.Book.getId(Book.java:22)\n" + " at com.example.myproject.Author.getBookIds(Author.java:35)\n" + " ... 1 more\n" + }, + { + "single line\n" + } +}; + +struct record_check ruby_input[] = { + {"/app/config/routes.rb:6:in `/': divided by 0 (ZeroDivisionError)"}, + {" from /app/config/routes.rb:6:in `block in <main>'"}, + {" from /var/lib/gems/3.0.0/gems/actionpack-7.0.4/lib/action_dispatch/routing/route_set.rb:428:in `instance_exec'"}, + {" from /var/lib/gems/3.0.0/gems/actionpack-7.0.4/lib/action_dispatch/routing/route_set.rb:428:in `eval_block'"}, + {" from /var/lib/gems/3.0.0/gems/actionpack-7.0.4/lib/action_dispatch/routing/route_set.rb:410:in `draw'"}, + {" from /app/config/routes.rb:1:in `<main>'"}, + {"hello world, not multiline\n"} +}; + +struct record_check ruby_output[] = { + { + "/app/config/routes.rb:6:in `/': divided by 0 (ZeroDivisionError)\n" + " from /app/config/routes.rb:6:in `block in <main>'\n" + " from /var/lib/gems/3.0.0/gems/actionpack-7.0.4/lib/action_dispatch/routing/route_set.rb:428:in `instance_exec'\n" + " from /var/lib/gems/3.0.0/gems/actionpack-7.0.4/lib/action_dispatch/routing/route_set.rb:428:in `eval_block'\n" + " from /var/lib/gems/3.0.0/gems/actionpack-7.0.4/lib/action_dispatch/routing/route_set.rb:410:in `draw'\n" + " from /app/config/routes.rb:1:in `<main>'\n" + }, + {"hello world, not multiline\n"} +}; + +/* Python stacktrace detection */ +struct record_check python_input[] = { + {"Traceback (most recent call last):\n"}, + {" File \"/base/data/home/runtimes/python27/python27_lib/versions/third_party/webapp2-2.5.2/webapp2.py\", line 1535, in __call__\n"}, + {" rv = self.handle_exception(request, response, e)\n"}, + {" File \"/base/data/home/apps/s~nearfieldspy/1.378705245900539993/nearfieldspy.py\", line 17, in start\n"}, + {" return get()\n"}, + {" File \"/base/data/home/apps/s~nearfieldspy/1.378705245900539993/nearfieldspy.py\", line 5, in get\n"}, + {" raise Exception('spam', 'eggs')\n"}, + {"Exception: ('spam', 'eggs')\n"}, + {"hello world, not multiline\n"} +}; + +struct record_check python_output[] = { + { + "Traceback (most recent call last):\n" + " File \"/base/data/home/runtimes/python27/python27_lib/versions/third_party/webapp2-2.5.2/webapp2.py\", line 1535, in __call__\n" + " rv = self.handle_exception(request, response, e)\n" + " File \"/base/data/home/apps/s~nearfieldspy/1.378705245900539993/nearfieldspy.py\", line 17, in start\n" + " return get()\n" + " File \"/base/data/home/apps/s~nearfieldspy/1.378705245900539993/nearfieldspy.py\", line 5, in get\n" + " raise Exception('spam', 'eggs')\n" + "Exception: ('spam', 'eggs')\n" + }, + {"hello world, not multiline\n"} +}; + +/* Custom example for Elasticsearch stacktrace */ +struct record_check elastic_input[] = { + {"[some weird test] IndexNotFoundException[no such index]\n"}, + {" at org.elasticsearch.cluster.metadata.IndexNameExpressionResolver....\n"}, + {" at org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.java:133)\n"}, + {" at org.elasticsearch.action.admin.indices.delete.java:75)\n"}, + {"another separate log line\n"} +}; + +struct record_check elastic_output[] = { + { + "[some weird test] IndexNotFoundException[no such index]\n" + " at org.elasticsearch.cluster.metadata.IndexNameExpressionResolver....\n" + " at org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.java:133)\n" + " at org.elasticsearch.action.admin.indices.delete.java:75)\n" + }, + { + "another separate log line\n" + } +}; + +/* Go */ +struct record_check go_input[] = { + {"panic: my panic\n"}, + {"\n"}, + {"goroutine 4 [running]:\n"}, + {"panic(0x45cb40, 0x47ad70)\n"}, + {" /usr/local/go/src/runtime/panic.go:542 +0x46c fp=0xc42003f7b8 sp=0xc42003f710 pc=0x422f7c\n"}, + {"main.main.func1(0xc420024120)\n"}, + {" foo.go:6 +0x39 fp=0xc42003f7d8 sp=0xc42003f7b8 pc=0x451339\n"}, + {"runtime.goexit()\n"}, + {" /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003f7e0 sp=0xc42003f7d8 pc=0x44b4d1\n"}, + {"created by main.main\n"}, + {" foo.go:5 +0x58\n"}, + {"\n"}, + {"goroutine 1 [chan receive]:\n"}, + {"runtime.gopark(0x4739b8, 0xc420024178, 0x46fcd7, 0xc, 0xc420028e17, 0x3)\n"}, + {" /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc420053e30 sp=0xc420053e00 pc=0x42503c\n"}, + {"runtime.goparkunlock(0xc420024178, 0x46fcd7, 0xc, 0x1000f010040c217, 0x3)\n"}, + {" /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc420053e70 sp=0xc420053e30 pc=0x42512e\n"}, + {"runtime.chanrecv(0xc420024120, 0x0, 0xc420053f01, 0x4512d8)\n"}, + {" /usr/local/go/src/runtime/chan.go:506 +0x304 fp=0xc420053f20 sp=0xc420053e70 pc=0x4046b4\n"}, + {"runtime.chanrecv1(0xc420024120, 0x0)\n"}, + {" /usr/local/go/src/runtime/chan.go:388 +0x2b fp=0xc420053f50 sp=0xc420053f20 pc=0x40439b\n"}, + {"main.main()\n"}, + {" foo.go:9 +0x6f fp=0xc420053f80 sp=0xc420053f50 pc=0x4512ef\n"}, + {"runtime.main()\n"}, + {" /usr/local/go/src/runtime/proc.go:185 +0x20d fp=0xc420053fe0 sp=0xc420053f80 pc=0x424bad\n"}, + {"runtime.goexit()\n"}, + {" /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc420053fe8 sp=0xc420053fe0 pc=0x44b4d1\n"}, + {"\n"}, + {"goroutine 2 [force gc (idle)]:\n"}, + {"runtime.gopark(0x4739b8, 0x4ad720, 0x47001e, 0xf, 0x14, 0x1)\n"}, + {" /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc42003e768 sp=0xc42003e738 pc=0x42503c\n"}, + {"runtime.goparkunlock(0x4ad720, 0x47001e, 0xf, 0xc420000114, 0x1)\n"}, + {" /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc42003e7a8 sp=0xc42003e768 pc=0x42512e\n"}, + {"runtime.forcegchelper()\n"}, + {" /usr/local/go/src/runtime/proc.go:238 +0xcc fp=0xc42003e7e0 sp=0xc42003e7a8 pc=0x424e5c\n"}, + {"runtime.goexit()\n"}, + {" /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003e7e8 sp=0xc42003e7e0 pc=0x44b4d1\n"}, + {"created by runtime.init.4\n"}, + {" /usr/local/go/src/runtime/proc.go:227 +0x35\n"}, + {"\n"}, + {"goroutine 3 [GC sweep wait]:\n"}, + {"runtime.gopark(0x4739b8, 0x4ad7e0, 0x46fdd2, 0xd, 0x419914, 0x1)\n"}, + {" /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc42003ef60 sp=0xc42003ef30 pc=0x42503c\n"}, + {"runtime.goparkunlock(0x4ad7e0, 0x46fdd2, 0xd, 0x14, 0x1)\n"}, + {" /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc42003efa0 sp=0xc42003ef60 pc=0x42512e\n"}, + {"runtime.bgsweep(0xc42001e150)\n"}, + {" /usr/local/go/src/runtime/mgcsweep.go:52 +0xa3 fp=0xc42003efd8 sp=0xc42003efa0 pc=0x419973\n"}, + {"runtime.goexit()\n"}, + {" /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003efe0 sp=0xc42003efd8 pc=0x44b4d1\n"}, + {"created by runtime.gcenable\n"}, + {" /usr/local/go/src/runtime/mgc.go:216 +0x58\n"}, + {"one more line, no multiline"} +}; + +struct record_check go_output[] = { + { + "panic: my panic\n" + "\n" + "goroutine 4 [running]:\n" + "panic(0x45cb40, 0x47ad70)\n" + " /usr/local/go/src/runtime/panic.go:542 +0x46c fp=0xc42003f7b8 sp=0xc42003f710 pc=0x422f7c\n" + "main.main.func1(0xc420024120)\n" + " foo.go:6 +0x39 fp=0xc42003f7d8 sp=0xc42003f7b8 pc=0x451339\n" + "runtime.goexit()\n" + " /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003f7e0 sp=0xc42003f7d8 pc=0x44b4d1\n" + "created by main.main\n" + " foo.go:5 +0x58\n" + "\n" + "goroutine 1 [chan receive]:\n" + "runtime.gopark(0x4739b8, 0xc420024178, 0x46fcd7, 0xc, 0xc420028e17, 0x3)\n" + " /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc420053e30 sp=0xc420053e00 pc=0x42503c\n" + "runtime.goparkunlock(0xc420024178, 0x46fcd7, 0xc, 0x1000f010040c217, 0x3)\n" + " /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc420053e70 sp=0xc420053e30 pc=0x42512e\n" + "runtime.chanrecv(0xc420024120, 0x0, 0xc420053f01, 0x4512d8)\n" + " /usr/local/go/src/runtime/chan.go:506 +0x304 fp=0xc420053f20 sp=0xc420053e70 pc=0x4046b4\n" + "runtime.chanrecv1(0xc420024120, 0x0)\n" + " /usr/local/go/src/runtime/chan.go:388 +0x2b fp=0xc420053f50 sp=0xc420053f20 pc=0x40439b\n" + "main.main()\n" + " foo.go:9 +0x6f fp=0xc420053f80 sp=0xc420053f50 pc=0x4512ef\n" + "runtime.main()\n" + " /usr/local/go/src/runtime/proc.go:185 +0x20d fp=0xc420053fe0 sp=0xc420053f80 pc=0x424bad\n" + "runtime.goexit()\n" + " /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc420053fe8 sp=0xc420053fe0 pc=0x44b4d1\n" + "\n" + "goroutine 2 [force gc (idle)]:\n" + "runtime.gopark(0x4739b8, 0x4ad720, 0x47001e, 0xf, 0x14, 0x1)\n" + " /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc42003e768 sp=0xc42003e738 pc=0x42503c\n" + "runtime.goparkunlock(0x4ad720, 0x47001e, 0xf, 0xc420000114, 0x1)\n" + " /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc42003e7a8 sp=0xc42003e768 pc=0x42512e\n" + "runtime.forcegchelper()\n" + " /usr/local/go/src/runtime/proc.go:238 +0xcc fp=0xc42003e7e0 sp=0xc42003e7a8 pc=0x424e5c\n" + "runtime.goexit()\n" + " /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003e7e8 sp=0xc42003e7e0 pc=0x44b4d1\n" + "created by runtime.init.4\n" + " /usr/local/go/src/runtime/proc.go:227 +0x35\n" + "\n" + "goroutine 3 [GC sweep wait]:\n" + "runtime.gopark(0x4739b8, 0x4ad7e0, 0x46fdd2, 0xd, 0x419914, 0x1)\n" + " /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc42003ef60 sp=0xc42003ef30 pc=0x42503c\n" + "runtime.goparkunlock(0x4ad7e0, 0x46fdd2, 0xd, 0x14, 0x1)\n" + " /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc42003efa0 sp=0xc42003ef60 pc=0x42512e\n" + "runtime.bgsweep(0xc42001e150)\n" + " /usr/local/go/src/runtime/mgcsweep.go:52 +0xa3 fp=0xc42003efd8 sp=0xc42003efa0 pc=0x419973\n" + "runtime.goexit()\n" + " /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003efe0 sp=0xc42003efd8 pc=0x44b4d1\n" + "created by runtime.gcenable\n" + " /usr/local/go/src/runtime/mgc.go:216 +0x58\n" + }, + {"one more line, no multiline\n"} +}; + +/* + * Issue 3817 (case: 1) + * -------------------- + * Source CRI messages (need first CRI multiline parsing) + a custom multiline + * parser. + * + * - https://github.com/fluent/fluent-bit/issues/3817 + * + * The 'case 1' represents the problems of identifying two consecutive multiline + * messages within the same stream. + */ +struct record_check issue_3817_1_input[] = { + {"2021-05-17T17:35:01.184675702Z stdout F [DEBUG] 1 start multiline - "}, + {"2021-05-17T17:35:01.184747208Z stdout F 1 cont A"}, + {"2021-05-17T17:35:01.184675702Z stdout F [DEBUG] 2 start multiline - "}, + {"2021-05-17T17:35:01.184747208Z stdout F 2 cont B"}, + {"another isolated line"} +}; + +struct record_check issue_3817_1_output[] = { + { + "[DEBUG] 1 start multiline - \n" + "1 cont A" + }, + + { + "[DEBUG] 2 start multiline - \n" + "2 cont B" + }, + + { + "another isolated line" + } +}; + +/* + * Flush callback is invoked every time a multiline stream has completed a multiline + * message or a message is not multiline. + */ +static int flush_callback(struct flb_ml_parser *parser, + struct flb_ml_stream *mst, + void *data, char *buf_data, size_t buf_size) +{ + int i; + int ret; + int len; + int found = FLB_FALSE; + size_t off = 0; + msgpack_unpacked result; + msgpack_object *map; + msgpack_object key; + msgpack_object val; + struct flb_time tm; + struct expected_result *res = data; + struct record_check *exp; + + fprintf(stdout, "\n%s----- MULTILINE FLUSH -----%s\n", ANSI_YELLOW, ANSI_RESET); + + /* Print incoming flush buffer */ + flb_pack_print(buf_data, buf_size); + + fprintf(stdout, "%s----------- EOF -----------%s\n", + ANSI_YELLOW, ANSI_RESET); + + /* Validate content */ + msgpack_unpacked_init(&result); + off = 0; + ret = msgpack_unpack_next(&result, buf_data, buf_size, &off); + TEST_CHECK(ret == MSGPACK_UNPACK_SUCCESS); + + flb_time_pop_from_msgpack(&tm, &result, &map); + + TEST_CHECK(flb_time_to_nanosec(&tm) != 0L); + + exp = &res->out_records[res->current_record]; + len = strlen(res->key); + for (i = 0; i < map->via.map.size; i++) { + key = map->via.map.ptr[i].key; + val = map->via.map.ptr[i].val; + + if (key.via.str.size != len) { + continue; + } + + if (strncmp(key.via.str.ptr, res->key, len) == 0) { + found = FLB_TRUE; + break; + } + } + TEST_CHECK(found == FLB_TRUE); + + len = strlen(exp->buf); + TEST_CHECK(val.via.str.size == len); + if (val.via.str.size != len) { + printf("expected length: %i, received: %i\n", len, val.via.str.size); + printf("== received ==\n"); + msgpack_object_print(stdout, val); + printf("\n\n"); + printf("== expected ==\n%s\n", exp->buf); + exit(1); + } + TEST_CHECK(memcmp(val.via.str.ptr, exp->buf, len) == 0); + res->current_record++; + + msgpack_unpacked_destroy(&result); + return 0; +} + +static void test_parser_docker() +{ + int i; + int len; + int ret; + int entries; + uint64_t stream_id; + struct record_check *r; + struct flb_config *config; + struct flb_time tm; + struct flb_ml *ml; + struct flb_ml_parser_ins *mlp_i; + struct expected_result res = {0}; + + /* Expected results context */ + res.key = "log"; + res.out_records = docker_output; + + /* Initialize environment */ + config = flb_config_init(); + + /* Create docker multiline mode */ + ml = flb_ml_create(config, "test-docker"); + TEST_CHECK(ml != NULL); + + /* Load instances of the parsers for current 'ml' context */ + mlp_i = flb_ml_parser_instance_create(ml, "cri"); + TEST_CHECK(mlp_i != NULL); + + /* Generate an instance of multiline docker parser */ + mlp_i = flb_ml_parser_instance_create(ml, "docker"); + TEST_CHECK(mlp_i != NULL); + + ret = flb_ml_stream_create(ml, "docker", -1, flush_callback, (void *) &res, + &stream_id); + TEST_CHECK(ret == 0); + + entries = sizeof(docker_input) / sizeof(struct record_check); + for (i = 0; i < entries; i++) { + r = &docker_input[i]; + len = strlen(r->buf); + + flb_time_get(&tm); + + /* Package as msgpack */ + flb_ml_append_text(ml, stream_id, &tm, r->buf, len); + } + + if (ml) { + flb_ml_destroy(ml); + } + + flb_config_exit(config); +} + +static void test_parser_cri() +{ + int i; + int len; + int ret; + int entries; + uint64_t stream_id; + struct record_check *r; + struct flb_config *config; + struct flb_time tm; + struct flb_ml *ml; + struct flb_ml_parser_ins *mlp_i; + struct expected_result res = {0}; + + /* Expected results context */ + res.key = "log"; + res.out_records = cri_output; + + /* Initialize environment */ + config = flb_config_init(); + + /* Create docker multiline mode */ + ml = flb_ml_create(config, "cri-test"); + TEST_CHECK(ml != NULL); + + /* Generate an instance of multiline docker parser */ + mlp_i = flb_ml_parser_instance_create(ml, "docker"); + TEST_CHECK(mlp_i != NULL); + + /* Load instances of the parsers for current 'ml' context */ + mlp_i = flb_ml_parser_instance_create(ml, "cri"); + TEST_CHECK(mlp_i != NULL); + + ret = flb_ml_stream_create(ml, "cri", -1, flush_callback, (void *) &res, + &stream_id); + TEST_CHECK(ret == 0); + + entries = sizeof(cri_input) / sizeof(struct record_check); + for (i = 0; i < entries; i++) { + r = &cri_input[i]; + len = strlen(r->buf); + flb_time_get(&tm); + + /* Package as msgpack */ + flb_ml_append_text(ml, stream_id, &tm, r->buf, len); + } + + if (ml) { + flb_ml_destroy(ml); + } + + flb_config_exit(config); +} + +static void test_container_mix() +{ + int i; + int len; + int ret; + int entries; + uint64_t stream_id; + struct record_check *r; + struct flb_config *config; + struct flb_time tm; + struct flb_ml *ml; + struct flb_ml_parser_ins *mlp_i; + struct expected_result res = {0}; + + /* Expected results context */ + res.key = "log"; + res.out_records = container_mix_output; + + /* Initialize environment */ + config = flb_config_init(); + + /* Create docker multiline mode */ + ml = flb_ml_create(config, "container-mix-test"); + TEST_CHECK(ml != NULL); + + /* Generate an instance of multiline docker parser */ + mlp_i = flb_ml_parser_instance_create(ml, "docker"); + TEST_CHECK(mlp_i != NULL); + + /* Load instances of the parsers for current 'ml' context */ + mlp_i = flb_ml_parser_instance_create(ml, "cri"); + TEST_CHECK(mlp_i != NULL); + + ret = flb_ml_stream_create(ml, "container-mix", -1, flush_callback, (void *) &res, + &stream_id); + TEST_CHECK(ret == 0); + + entries = sizeof(container_mix_input) / sizeof(struct record_check); + for (i = 0; i < entries; i++) { + r = &container_mix_input[i]; + len = strlen(r->buf); + flb_time_get(&tm); + + /* Package as msgpack */ + flb_ml_append_text(ml, stream_id, &tm, r->buf, len); + } + + if (ml) { + flb_ml_destroy(ml); + } + + flb_config_exit(config); +} + +static void test_parser_java() +{ + int i; + int len; + int ret; + int entries; + uint64_t stream_id; + struct record_check *r; + struct flb_config *config; + struct flb_time tm; + struct flb_ml *ml; + struct flb_ml_parser_ins *mlp_i; + struct expected_result res = {0}; + msgpack_packer mp_pck; + msgpack_sbuffer mp_sbuf; + + /* Expected results context */ + res.key = "log"; + res.out_records = java_output; + + /* Initialize environment */ + config = flb_config_init(); + + /* Create docker multiline mode */ + ml = flb_ml_create(config, "java-test"); + TEST_CHECK(ml != NULL); + + /* Generate an instance of multiline java parser */ + mlp_i = flb_ml_parser_instance_create(ml, "java"); + TEST_CHECK(mlp_i != NULL); + + flb_ml_parser_instance_set(mlp_i, "key_content", "log"); + + ret = flb_ml_stream_create(ml, "java", -1, flush_callback, (void *) &res, + &stream_id); + TEST_CHECK(ret == 0); + + /* initialize buffers */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + size_t off = 0; + msgpack_unpacked result; + msgpack_object root; + msgpack_object *map; + + entries = sizeof(java_input) / sizeof(struct record_check); + for (i = 0; i < entries; i++) { + r = &java_input[i]; + len = strlen(r->buf); + + /* Package as msgpack */ + flb_time_get(&tm); + + /* initialize buffers */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + msgpack_pack_array(&mp_pck, 2); + flb_time_append_to_msgpack(&tm, &mp_pck, 0); + + msgpack_pack_map(&mp_pck, 1); + msgpack_pack_str(&mp_pck, 3); + msgpack_pack_str_body(&mp_pck, "log", 3); + msgpack_pack_str(&mp_pck, len); + msgpack_pack_str_body(&mp_pck, r->buf, len); + + /* Unpack and lookup the content map */ + msgpack_unpacked_init(&result); + off = 0; + ret = msgpack_unpack_next(&result, mp_sbuf.data, mp_sbuf.size, &off); + + flb_pack_print(mp_sbuf.data, mp_sbuf.size); + + root = result.data; + map = &root.via.array.ptr[1]; + + /* Package as msgpack */ + ret = flb_ml_append_object(ml, stream_id, &tm, NULL, map); + + msgpack_unpacked_destroy(&result); + msgpack_sbuffer_destroy(&mp_sbuf); + } + + if (ml) { + flb_ml_destroy(ml); + } + + flb_config_exit(config); +} + +static void test_parser_python() +{ + int i; + int len; + int ret; + int entries; + uint64_t stream_id; + msgpack_packer mp_pck; + msgpack_sbuffer mp_sbuf; + struct record_check *r; + struct flb_config *config; + struct flb_time tm; + struct flb_ml *ml; + struct flb_ml_parser_ins *mlp_i; + struct expected_result res = {0}; + + /* Expected results context */ + res.key = "log"; + res.out_records = python_output; + + /* initialize buffers */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + /* Initialize environment */ + config = flb_config_init(); + + /* Create docker multiline mode */ + ml = flb_ml_create(config, "python-test"); + TEST_CHECK(ml != NULL); + + /* Generate an instance of multiline python parser */ + mlp_i = flb_ml_parser_instance_create(ml, "python"); + TEST_CHECK(mlp_i != NULL); + + ret = flb_ml_stream_create(ml, "python", -1, flush_callback, (void *) &res, + &stream_id); + TEST_CHECK(ret == 0); + + flb_time_get(&tm); + + printf("\n"); + entries = sizeof(python_input) / sizeof(struct record_check); + for (i = 0; i < entries; i++) { + r = &python_input[i]; + len = strlen(r->buf); + + /* Package as msgpack */ + flb_time_get(&tm); + flb_ml_append_text(ml, stream_id, &tm, r->buf, len); + } + + if (ml) { + flb_ml_destroy(ml); + } + + flb_config_exit(config); +} + +static void test_parser_ruby() +{ + int i; + int len; + int ret; + int entries; + uint64_t stream_id; + msgpack_packer mp_pck; + msgpack_sbuffer mp_sbuf; + struct record_check *r; + struct flb_config *config; + struct flb_time tm; + struct flb_ml *ml; + struct flb_ml_parser_ins *mlp_i; + struct expected_result res = {0}; + + /* Expected results context */ + res.key = "log"; + res.out_records = ruby_output; + + /* initialize buffers */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + /* Initialize environment */ + config = flb_config_init(); + + /* Create docker multiline mode */ + ml = flb_ml_create(config, "ruby-test"); + TEST_CHECK(ml != NULL); + + /* Generate an instance of multiline ruby parser */ + mlp_i = flb_ml_parser_instance_create(ml, "ruby"); + TEST_CHECK(mlp_i != NULL); + + ret = flb_ml_stream_create(ml, "ruby", -1, flush_callback, (void *) &res, + &stream_id); + TEST_CHECK(ret == 0); + + flb_time_get(&tm); + + printf("\n"); + entries = sizeof(ruby_input) / sizeof(struct record_check); + for (i = 0; i < entries; i++) { + r = &ruby_input[i]; + len = strlen(r->buf); + + /* Package as msgpack */ + flb_time_get(&tm); + flb_ml_append_text(ml, stream_id, &tm, r->buf, len); + } + + if (ml) { + flb_ml_destroy(ml); + } + + flb_config_exit(config); +} + +static void test_issue_4949() +{ + int i; + int len; + int ret; + int entries; + uint64_t stream_id; + msgpack_packer mp_pck; + msgpack_sbuffer mp_sbuf; + struct record_check *r; + struct flb_config *config; + struct flb_time tm; + struct flb_ml *ml; + struct flb_ml_parser_ins *mlp_i; + struct expected_result res = {0}; + + /* Expected results context */ + res.key = "log"; + res.out_records = python_output; + + /* initialize buffers */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + /* Initialize environment */ + config = flb_config_init(); + + /* Create docker multiline mode */ + ml = flb_ml_create(config, "python-test"); + TEST_CHECK(ml != NULL); + + /* Generate an instance of multiline python parser */ + mlp_i = flb_ml_parser_instance_create(ml, "python"); + TEST_CHECK(mlp_i != NULL); + + ret = flb_ml_stream_create(ml, "python", -1, flush_callback, (void *) &res, + &stream_id); + TEST_CHECK(ret == 0); + + /* Generate an instance of multiline java parser */ + mlp_i = flb_ml_parser_instance_create(ml, "java"); + TEST_CHECK(mlp_i != NULL); + + ret = flb_ml_stream_create(ml, "java", -1, flush_callback, (void *) &res, + &stream_id); + TEST_CHECK(ret == 0); + + flb_time_get(&tm); + + printf("\n"); + entries = sizeof(python_input) / sizeof(struct record_check); + for (i = 0; i < entries; i++) { + r = &python_input[i]; + len = strlen(r->buf); + + /* Package as msgpack */ + flb_time_get(&tm); + flb_ml_append_text(ml, stream_id, &tm, r->buf, len); + } + + if (ml) { + flb_ml_destroy(ml); + } + + flb_config_exit(config); +} + +static void test_parser_elastic() +{ + int i; + int len; + int ret; + int entries; + size_t off = 0; + uint64_t stream_id; + msgpack_packer mp_pck; + msgpack_sbuffer mp_sbuf; + msgpack_unpacked result; + msgpack_object root; + msgpack_object *map; + struct record_check *r; + struct flb_config *config; + struct flb_time tm; + struct flb_ml *ml; + struct flb_ml_parser *mlp; + struct flb_ml_parser_ins *mlp_i; + struct expected_result res = {0}; + + /* Expected results context */ + res.key = "log"; + res.out_records = elastic_output; + + /* Initialize environment */ + config = flb_config_init(); + + ml = flb_ml_create(config, "test-elastic"); + TEST_CHECK(ml != NULL); + + mlp = flb_ml_parser_create(config, + "elastic", /* name */ + FLB_ML_REGEX, /* type */ + NULL, /* match_str */ + FLB_FALSE, /* negate */ + 1000, /* flush_ms */ + "log", /* key_content */ + NULL, /* key_pattern */ + NULL, /* key_group */ + NULL, /* parser ctx */ + NULL); /* parser name */ + TEST_CHECK(mlp != NULL); + + mlp_i = flb_ml_parser_instance_create(ml, "elastic"); + TEST_CHECK(mlp_i != NULL); + + ret = flb_ml_rule_create(mlp, "start_state", "/^\\[/", "elastic_cont", NULL); + if (ret != 0) { + fprintf(stderr, "error creating rule 1"); + } + + ret = flb_ml_rule_create(mlp, "elastic_cont", "/^\\s+/", "elastic_cont", NULL); + if (ret != 0) { + fprintf(stderr, "error creating rule 2"); + } + + ret = flb_ml_stream_create(ml, "elastic", -1, flush_callback, (void *) &res, + &stream_id); + TEST_CHECK(ret == 0); + + ret = flb_ml_parser_init(mlp); + if (ret != 0) { + fprintf(stderr, "error initializing multiline\n"); + flb_ml_destroy(ml); + return; + } + + printf("\n"); + entries = sizeof(elastic_input) / sizeof(struct record_check); + for (i = 0; i < entries; i++) { + r = &elastic_input[i]; + len = strlen(r->buf); + + /* initialize buffers */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + /* Package raw text as a msgpack record */ + msgpack_pack_array(&mp_pck, 2); + + flb_time_get(&tm); + flb_time_append_to_msgpack(&tm, &mp_pck, 0); + + msgpack_pack_map(&mp_pck, 1); + msgpack_pack_str(&mp_pck, 3); + msgpack_pack_str_body(&mp_pck, "log", 3); + msgpack_pack_str(&mp_pck, len); + msgpack_pack_str_body(&mp_pck, r->buf, len); + + /* Unpack and lookup the content map */ + msgpack_unpacked_init(&result); + off = 0; + ret = msgpack_unpack_next(&result, mp_sbuf.data, mp_sbuf.size, &off); + TEST_CHECK(ret == MSGPACK_UNPACK_SUCCESS); + + root = result.data; + map = &root.via.array.ptr[1]; + + /* Package as msgpack */ + ret = flb_ml_append_object(ml, stream_id, &tm, NULL, map); + + msgpack_unpacked_destroy(&result); + msgpack_sbuffer_destroy(&mp_sbuf); + } + + if (ml) { + flb_ml_destroy(ml); + } + + flb_config_exit(config); +} + +static void test_endswith() +{ + int i; + int len; + int ret; + int entries; + uint64_t stream_id = 0; + struct record_check *r; + struct flb_config *config; + struct flb_time tm; + struct flb_ml *ml; + struct flb_ml_parser *mlp; + struct flb_ml_parser_ins *mlp_i; + struct expected_result res = {0}; + + /* Expected results context */ + res.key = "log"; + res.out_records = endswith_output; + + /* Initialize environment */ + config = flb_config_init(); + + /* Create docker multiline mode */ + ml = flb_ml_create(config, "raw-endswith"); + TEST_CHECK(ml != NULL); + + mlp = flb_ml_parser_create(config, + "endswith", /* name */ + FLB_ML_ENDSWITH, /* type */ + "\\", /* match_str */ + FLB_TRUE, /* negate */ + 1000, /* flush_ms */ + NULL, /* key_content */ + NULL, /* key_pattern */ + NULL, /* key_group */ + NULL, /* parser ctx */ + NULL); /* parser name */ + TEST_CHECK(mlp != NULL); + + /* Generate an instance of 'endswith' custom parser parser */ + mlp_i = flb_ml_parser_instance_create(ml, "endswith"); + TEST_CHECK(mlp_i != NULL); + + ret = flb_ml_stream_create(ml, "test", -1, flush_callback, (void *) &res, &stream_id); + TEST_CHECK(ret == 0); + + entries = sizeof(endswith_input) / sizeof(struct record_check); + for (i = 0; i < entries; i++) { + r = &endswith_input[i]; + len = strlen(r->buf); + + /* Package as msgpack */ + flb_time_get(&tm); + flb_ml_append_text(ml, stream_id, &tm, r->buf, len); + } + + if (ml) { + flb_ml_destroy(ml); + } + + flb_config_exit(config); +} + +static void test_parser_go() +{ + int i; + int len; + int ret; + int entries; + uint64_t stream_id = 0; + struct record_check *r; + struct flb_config *config; + struct flb_time tm; + struct flb_ml *ml; + struct flb_ml_parser_ins *mlp_i; + struct expected_result res = {0}; + + /* Expected results context */ + res.key = "log"; + res.out_records = go_output; + + /* Initialize environment */ + config = flb_config_init(); + + /* Create docker multiline mode */ + ml = flb_ml_create(config, "go-test"); + TEST_CHECK(ml != NULL); + + /* Generate an instance of multiline java parser */ + mlp_i = flb_ml_parser_instance_create(ml, "go"); + TEST_CHECK(mlp_i != NULL); + + ret = flb_ml_stream_create(ml, "go", -1, flush_callback, (void *) &res, &stream_id); + TEST_CHECK(ret == 0); + + entries = sizeof(go_input) / sizeof(struct record_check); + for (i = 0; i < entries; i++) { + r = &go_input[i]; + len = strlen(r->buf); + + /* Package as msgpack */ + flb_time_get(&tm); + flb_ml_append_text(ml, stream_id, &tm, r->buf, len); + } + + if (ml) { + flb_ml_destroy(ml); + } + + flb_config_exit(config); +} + +static int flush_callback_to_buf(struct flb_ml_parser *parser, + struct flb_ml_stream *mst, + void *data, char *buf_data, size_t buf_size) +{ + msgpack_sbuffer *mp_sbuf = data; + msgpack_sbuffer_write(mp_sbuf, buf_data, buf_size); + + return 0; +} + +static void run_test(struct flb_config *config, char *test_name, + struct record_check *in, int in_len, + struct record_check *out, int out_len, + char *parser1, char *parser2) + +{ + int i; + int ret; + int len; + size_t off = 0; + uint64_t stream1 = 0; + uint64_t stream2 = 0; + struct flb_ml *ml; + struct flb_ml_parser_ins *p1 = NULL; + struct record_check *r; + msgpack_sbuffer mp_sbuf1; + msgpack_packer mp_pck1; + msgpack_sbuffer mp_sbuf2; + msgpack_packer mp_pck2; + msgpack_object *map; + struct flb_time tm; + struct expected_result res = {0}; + msgpack_unpacked result; + + /* init buffers */ + msgpack_sbuffer_init(&mp_sbuf1); + msgpack_packer_init(&mp_pck1, &mp_sbuf1, msgpack_sbuffer_write); + msgpack_sbuffer_init(&mp_sbuf2); + msgpack_packer_init(&mp_pck2, &mp_sbuf2, msgpack_sbuffer_write); + + /* Create docker multiline mode */ + ml = flb_ml_create(config, test_name); + TEST_CHECK(ml != NULL); + + if (!parser1) { + fprintf(stderr, "run_test(): parser1 is NULL\n"); + exit(1); + } + + /* Parser 1 */ + p1 = flb_ml_parser_instance_create(ml, parser1); + TEST_CHECK(p1 != NULL); + + + /* Stream 1: use parser name (test_name) to generate the stream id */ + ret = flb_ml_stream_create(ml, test_name, -1, + flush_callback_to_buf, + (void *) &mp_sbuf1, &stream1); + TEST_CHECK(ret == 0); + + /* Ingest input records into parser 1 */ + for (i = 0; i < in_len; i++) { + r = &in[i]; + len = strlen(r->buf); + + flb_time_get(&tm); + + /* Package as msgpack */ + flb_ml_append_text(ml, stream1, &tm, r->buf, len); + } + + flb_ml_destroy(ml); + ml = flb_ml_create(config, test_name); + + flb_ml_parser_instance_create(ml, parser2); + + /* + * After flb_ml_append above(), mp_sbuf1 has been populated with the + * output results as structured messages. Now this data needs to be + * passed to the next parser. + */ + + /* Expected results context */ + res.key = "log"; + res.out_records = out; + + /* Stream 2 */ + ret = flb_ml_stream_create(ml, "filter_multiline", -1, + flush_callback, + (void *) &res, &stream2); + + /* Ingest input records into parser 2 */ + off = 0; + msgpack_unpacked_init(&result); + while (msgpack_unpack_next(&result, mp_sbuf1.data, mp_sbuf1.size, &off)) { + flb_time_pop_from_msgpack(&tm, &result, &map); + + /* Package as msgpack */ + ret = flb_ml_append_object(ml, stream2, &tm, NULL, map); + } + flb_ml_flush_pending_now(ml); + + msgpack_unpacked_destroy(&result); + msgpack_sbuffer_destroy(&mp_sbuf1); + flb_ml_destroy(ml); +} + +void test_issue_3817_1() +{ + int ret; + int in_len = sizeof(issue_3817_1_input) / sizeof(struct record_check); + int out_len = sizeof(issue_3817_1_output) / sizeof(struct record_check); + struct flb_config *config; + struct flb_ml_parser *mlp; + + /* + * Parser definition for a file: + * + * [MULTILINE_PARSER] + * name parser_3817 + * type regex + * key_content log + * # + * # Regex rules for multiline parsing + * # --------------------------------- + * # + * # rules | state name | regex pattern | next state + * # ------|---------------|------------------------------------ + * rule "start_state" "/- $/" "cont" + * rule "cont" "/^([1-9].*$/" "cont" + * + */ + + /* Initialize environment */ + config = flb_config_init(); + + /* Register custom parser */ + mlp = flb_ml_parser_create(config, + "parser_3817", /* name */ + FLB_ML_REGEX, /* type */ + NULL, /* match_str */ + FLB_FALSE, /* negate */ + 1000, /* flush_ms */ + "log", /* key_content */ + NULL, /* key_pattern */ + NULL, /* key_group */ + NULL, /* parser ctx */ + NULL); /* parser name */ + TEST_CHECK(mlp != NULL); + + /* rule: start_state */ + ret = flb_ml_rule_create(mlp, "start_state", "/- $/", "cont", NULL); + if (ret != 0) { + fprintf(stderr, "error creating rule 1"); + } + + /* rule: cont */ + ret = flb_ml_rule_create(mlp, "cont", "/^([1-9]).*$/", "cont", NULL); + if (ret != 0) { + fprintf(stderr, "error creating rule 2"); + } + + /* initiaze the parser configuration */ + ret = flb_ml_parser_init(mlp); + TEST_CHECK(ret == 0); + + /* Run the test */ + run_test(config, "issue_3817_1", + issue_3817_1_input, in_len, + issue_3817_1_output, out_len, + "cri", "parser_3817"); + + flb_config_exit(config); +} + +static void test_issue_4034() +{ + int i; + int len; + int ret; + int entries; + uint64_t stream_id; + struct record_check *r; + struct flb_config *config; + struct flb_time tm; + struct flb_ml *ml; + struct flb_ml_parser_ins *mlp_i; + struct expected_result res = {0}; + msgpack_packer mp_pck; + msgpack_sbuffer mp_sbuf; + + /* Expected results context */ + res.key = "log"; + res.out_records = cri_output; + + /* Initialize environment */ + config = flb_config_init(); + + /* Create cri multiline mode */ + ml = flb_ml_create(config, "cri-test"); + TEST_CHECK(ml != NULL); + + /* Generate an instance of multiline cri parser */ + mlp_i = flb_ml_parser_instance_create(ml, "cri"); + TEST_CHECK(mlp_i != NULL); + + flb_ml_parser_instance_set(mlp_i, "key_content", "log"); + + ret = flb_ml_stream_create(ml, "cri", -1, flush_callback, (void *) &res, + &stream_id); + TEST_CHECK(ret == 0); + + /* initialize buffers */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + size_t off = 0; + msgpack_unpacked result; + msgpack_object root; + msgpack_object *map; + + entries = sizeof(cri_input) / sizeof(struct record_check); + for (i = 0; i < entries; i++) { + r = &cri_input[i]; + len = strlen(r->buf); + + /* Package as msgpack */ + flb_time_get(&tm); + + /* initialize buffers */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + msgpack_pack_array(&mp_pck, 2); + flb_time_append_to_msgpack(&tm, &mp_pck, 0); + + msgpack_pack_map(&mp_pck, 1); + msgpack_pack_str(&mp_pck, 3); + msgpack_pack_str_body(&mp_pck, "log", 3); + msgpack_pack_str(&mp_pck, len); + msgpack_pack_str_body(&mp_pck, r->buf, len); + + /* Unpack and lookup the content map */ + msgpack_unpacked_init(&result); + off = 0; + ret = msgpack_unpack_next(&result, mp_sbuf.data, mp_sbuf.size, &off); + + flb_pack_print(mp_sbuf.data, mp_sbuf.size); + + root = result.data; + map = &root.via.array.ptr[1]; + + /* Package as msgpack */ + ret = flb_ml_append_object(ml, stream_id, &tm, NULL, map); + + msgpack_unpacked_destroy(&result); + msgpack_sbuffer_destroy(&mp_sbuf); + } + flb_ml_flush_pending_now(ml); + + if (ml) { + flb_ml_destroy(ml); + } + + flb_config_exit(config); +} + +static void test_issue_5504() +{ + uint64_t last_flush; + struct flb_config *config; + struct flb_ml *ml; + struct flb_ml_parser_ins *mlp_i; + struct mk_event_loop *evl; + struct flb_sched *sched; + struct mk_list *tmp; + struct mk_list *head; + struct flb_sched_timer *timer; + void (*cb)(struct flb_config *, void *); + int timeout = 500; + +#ifdef _WIN32 + WSADATA wsa_data; + WSAStartup(0x0201, &wsa_data); +#endif + + /* Initialize environment */ + config = flb_config_init(); + + /* Create the event loop */ + evl = config->evl; + config->evl = mk_event_loop_create(32); + TEST_CHECK(config->evl != NULL); + + /* Initialize the scheduler */ + sched = config->sched; + config->sched = flb_sched_create(config, config->evl); + TEST_CHECK(config->sched != NULL); + + /* Set the thread local scheduler */ + flb_sched_ctx_init(); + flb_sched_ctx_set(config->sched); + + ml = flb_ml_create(config, "5504-test"); + TEST_CHECK(ml != NULL); + + /* Generate an instance of any multiline parser */ + mlp_i = flb_ml_parser_instance_create(ml, "cri"); + TEST_CHECK(mlp_i != NULL); + + flb_ml_parser_instance_set(mlp_i, "key_content", "log"); + + /* Set the flush timeout */ + ml->flush_ms = timeout; + + /* Initialize the auto flush */ + flb_ml_auto_flush_init(ml); + + /* Store the initial last_flush time */ + last_flush = ml->last_flush; + + /* Find the cb_ml_flush_timer callback from the timers */ + mk_list_foreach_safe(head, tmp, &((struct flb_sched *)config->sched)->timers) { + timer = mk_list_entry(head, struct flb_sched_timer, _head); + if (timer->type == FLB_SCHED_TIMER_CB_PERM) { + cb = timer->cb; + } + } + TEST_CHECK(cb != NULL); + + /* Trigger the callback without delay */ + cb(config, ml); + /* This should not update the last_flush since it is before the timeout */ + TEST_CHECK(ml->last_flush == last_flush); + + /* Sleep just enough time to pass the timeout */ + flb_time_msleep(timeout + 1); + + /* Retrigger the callback */ + cb(config, ml); + /* Ensure this time the last_flush has been updated */ + TEST_CHECK(ml->last_flush > last_flush); + + /* Cleanup */ + flb_sched_destroy(config->sched); + config->sched = sched; + mk_event_loop_destroy(config->evl); + config->evl = evl; + flb_ml_destroy(ml); + flb_config_exit(config); + +#ifdef _WIN32 + WSACleanup(); +#endif +} + +TEST_LIST = { + /* Normal features tests */ + { "parser_docker", test_parser_docker}, + { "parser_cri", test_parser_cri}, + { "parser_java", test_parser_java}, + { "parser_python", test_parser_python}, + { "parser_ruby", test_parser_ruby}, + { "parser_elastic", test_parser_elastic}, + { "parser_go", test_parser_go}, + { "container_mix", test_container_mix}, + { "endswith", test_endswith}, + + /* Issues reported on Github */ + { "issue_3817_1" , test_issue_3817_1}, + { "issue_4034" , test_issue_4034}, + { "issue_4949" , test_issue_4949}, + { "issue_5504" , test_issue_5504}, + { 0 } +}; |