diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:19:48 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:20:02 +0000 |
commit | 58daab21cd043e1dc37024a7f99b396788372918 (patch) | |
tree | 96771e43bb69f7c1c2b0b4f7374cb74d7866d0cb /fluent-bit/tests/runtime/out_forward.c | |
parent | Releasing debian version 1.43.2-1. (diff) | |
download | netdata-58daab21cd043e1dc37024a7f99b396788372918.tar.xz netdata-58daab21cd043e1dc37024a7f99b396788372918.zip |
Merging upstream version 1.44.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/tests/runtime/out_forward.c')
-rw-r--r-- | fluent-bit/tests/runtime/out_forward.c | 364 |
1 files changed, 364 insertions, 0 deletions
diff --git a/fluent-bit/tests/runtime/out_forward.c b/fluent-bit/tests/runtime/out_forward.c new file mode 100644 index 000000000..7e3b90aec --- /dev/null +++ b/fluent-bit/tests/runtime/out_forward.c @@ -0,0 +1,364 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <fluent-bit.h> +#include <fluent-bit/flb_pack.h> +#include <fluent-bit/flb_record_accessor.h> + +#include "flb_tests_runtime.h" + +/* Include plugin header to get the flush_ctx structure definition */ +#include "../../plugins/out_forward/forward.h" + +static void cb_check_message_mode(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + int ret; + size_t off = 0; + msgpack_object tag; + msgpack_object ts; + msgpack_object record; + msgpack_object root; + msgpack_unpacked result; + struct flb_time time = {0}; + + msgpack_unpacked_init(&result); + ret = msgpack_unpack_next(&result, res_data, res_size, &off); + root = result.data; + + TEST_CHECK(ret == MSGPACK_UNPACK_SUCCESS); + TEST_CHECK(root.type == MSGPACK_OBJECT_ARRAY); + TEST_CHECK(root.via.array.size == 4); + + /* Tag */ + tag = root.via.array.ptr[0]; + TEST_CHECK(tag.type == MSGPACK_OBJECT_STR); + ret = strncmp(tag.via.str.ptr, "new.tag.fluent", tag.via.str.size); + TEST_CHECK(ret == 0); + + /* Timestamp */ + ts = root.via.array.ptr[1]; + TEST_CHECK(ts.type == MSGPACK_OBJECT_EXT); + + ret = flb_time_msgpack_to_time(&time, &ts); + TEST_CHECK(ret == 0); + TEST_CHECK(time.tm.tv_nsec != 0); + + /* Record */ + record = root.via.array.ptr[2]; + TEST_CHECK(record.type == MSGPACK_OBJECT_MAP); + TEST_CHECK(record.via.map.size == 2); + + msgpack_unpacked_destroy(&result); + flb_free(res_data); +} + +static void cb_check_message_compat_mode(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + int ret; + size_t off = 0; + msgpack_object tag; + msgpack_object ts; + msgpack_object record; + msgpack_object root; + msgpack_unpacked result; + struct flb_time time = {0}; + + msgpack_unpacked_init(&result); + ret = msgpack_unpack_next(&result, res_data, res_size, &off); + root = result.data; + + TEST_CHECK(ret == MSGPACK_UNPACK_SUCCESS); + TEST_CHECK(root.type == MSGPACK_OBJECT_ARRAY); + TEST_CHECK(root.via.array.size == 4); + + /* Tag */ + tag = root.via.array.ptr[0]; + TEST_CHECK(tag.type == MSGPACK_OBJECT_STR); + ret = strncmp(tag.via.str.ptr, "new.tag.fluent", tag.via.str.size); + TEST_CHECK(ret == 0); + + /* Timestamp */ + ts = root.via.array.ptr[1]; + TEST_CHECK(ts.type == MSGPACK_OBJECT_POSITIVE_INTEGER); + + ret = flb_time_msgpack_to_time(&time, &ts); + TEST_CHECK(ret == 0); + TEST_CHECK(time.tm.tv_nsec == 0); + + /* Record */ + record = root.via.array.ptr[2]; + TEST_CHECK(record.type == MSGPACK_OBJECT_MAP); + TEST_CHECK(record.via.map.size == 2); + + msgpack_unpacked_destroy(&result); + flb_free(res_data); +} + +static void cb_check_forward_mode(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + int ret; + size_t off = 0; + msgpack_object key; + msgpack_object val; + msgpack_object root; + msgpack_unpacked result; + + /* + * the check for forward mode is a bit special, since no data is formatted, instead the formatter callback + * will return the "options" map that will be send after the records chunk. The options are set because the + * caller specified 'send_options true'. + */ + TEST_CHECK(res_ret == MODE_FORWARD); + + msgpack_unpacked_init(&result); + ret = msgpack_unpack_next(&result, res_data, res_size, &off); + root = result.data; + + TEST_CHECK(ret == MSGPACK_UNPACK_SUCCESS); + TEST_CHECK(root.type == MSGPACK_OBJECT_MAP); + + /* fluent_signal and size */ + TEST_CHECK(root.via.map.size == 2); + + /* Record */ + key = root.via.map.ptr[1].key; + val = root.via.map.ptr[1].val; + + ret = strncmp(key.via.str.ptr, "fluent_signal", 13); + TEST_CHECK(ret == 0); + TEST_CHECK(val.type == MSGPACK_OBJECT_POSITIVE_INTEGER); + TEST_CHECK(val.via.u64 == 0); + + msgpack_unpacked_destroy(&result); + flb_free(res_data); +} + +static void cb_check_forward_compat_mode(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + int ret; + size_t off = 0; + msgpack_object root; + msgpack_object records; + msgpack_object entry; + msgpack_unpacked result; + + TEST_CHECK(res_ret == MODE_FORWARD_COMPAT); + + msgpack_unpacked_init(&result); + ret = msgpack_unpack_next(&result, res_data, res_size, &off); + root = result.data; + records = root.via.array.ptr[1]; + + TEST_CHECK(ret == MSGPACK_UNPACK_SUCCESS); + TEST_CHECK(root.type == MSGPACK_OBJECT_ARRAY); + + /* Record */ + entry = records.via.array.ptr[0]; + TEST_CHECK(entry.type == MSGPACK_OBJECT_ARRAY); + TEST_CHECK(entry.via.array.ptr[0].type == MSGPACK_OBJECT_POSITIVE_INTEGER); + + msgpack_unpacked_destroy(&result); + flb_free(res_data); +} + +void flb_test_message_mode() +{ + int ret; + int in_ffd; + int out_ffd; + flb_ctx_t *ctx; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "2", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "dummy", NULL); + flb_input_set(ctx, in_ffd, + "tag", "test", + "samples", "1", + "dummy", "{\"key1\": 123, \"key2\": {\"s1\": \"fluent\"}}", + NULL); + + + /* Forward output */ + out_ffd = flb_output(ctx, (char *) "forward", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + "tag", "new.tag.$key2['s1']", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_message_mode, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_message_compat_mode() +{ + int ret; + int in_ffd; + int out_ffd; + flb_ctx_t *ctx; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "2", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "dummy", NULL); + flb_input_set(ctx, in_ffd, + "tag", "test", + "samples", "1", + "dummy", "{\"key1\": 123, \"key2\": {\"s1\": \"fluent\"}}", + NULL); + + + /* Forward output with timestamp in integer mode (compat) */ + out_ffd = flb_output(ctx, (char *) "forward", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + "time_as_integer", "true", + "tag", "new.tag.$key2['s1']", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_message_compat_mode, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_forward_mode() +{ + int ret; + int in_ffd; + int out_ffd; + flb_ctx_t *ctx; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "2", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "dummy", NULL); + flb_input_set(ctx, in_ffd, + "tag", "test", + "samples", "1", + "dummy", "{\"key1\": 123, \"key2\": {\"s1\": \"fluent\"}}", + NULL); + + + /* Forward output: without a tag key access, forward mode is used */ + out_ffd = flb_output(ctx, (char *) "forward", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + "tag", "new.tag", + "send_options", "true", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_forward_mode, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_forward_compat_mode() +{ + int ret; + int in_ffd; + int out_ffd; + flb_ctx_t *ctx; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "3", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "dummy", NULL); + flb_input_set(ctx, in_ffd, + "tag", "test", + "samples", "2", + "dummy", "{\"key1\": 123, \"key2\": {\"s1\": \"fluent\"}}", + NULL); + + + /* Forward output: without a tag key access, forward mode is used */ + out_ffd = flb_output(ctx, (char *) "forward", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + "tag", "new.tag", + "time_as_integer", "true", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_forward_compat_mode, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +/* Test list */ +TEST_LIST = { +#ifdef FLB_HAVE_RECORD_ACCESSOR + {"message_mode" , flb_test_message_mode }, + {"message_compat_mode", flb_test_message_compat_mode }, +#endif + {"forward_mode" , flb_test_forward_mode }, + {"forward_compat_mode", flb_test_forward_compat_mode }, + {NULL, NULL} +}; |