summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/tests/runtime/filter_wasm.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:08:03 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:08:18 +0000
commit5da14042f70711ea5cf66e034699730335462f66 (patch)
tree0f6354ccac934ed87a2d555f45be4c831cf92f4a /src/fluent-bit/tests/runtime/filter_wasm.c
parentReleasing debian version 1.44.3-2. (diff)
downloadnetdata-5da14042f70711ea5cf66e034699730335462f66.tar.xz
netdata-5da14042f70711ea5cf66e034699730335462f66.zip
Merging upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/fluent-bit/tests/runtime/filter_wasm.c')
-rw-r--r--src/fluent-bit/tests/runtime/filter_wasm.c468
1 files changed, 468 insertions, 0 deletions
diff --git a/src/fluent-bit/tests/runtime/filter_wasm.c b/src/fluent-bit/tests/runtime/filter_wasm.c
new file mode 100644
index 000000000..4cc1afcb8
--- /dev/null
+++ b/src/fluent-bit/tests/runtime/filter_wasm.c
@@ -0,0 +1,468 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit.h>
+#include <fluent-bit/flb_time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include "flb_tests_runtime.h"
+
+#define DPATH_WASM FLB_TESTS_DATA_PATH "/data/wasm"
+#define FLUSH_INTERVAL "1.0"
+#ifdef _WIN32
+ #define TIME_EPSILON_MS 30
+#else
+ #define TIME_EPSILON_MS 10
+#endif
+
+pthread_mutex_t result_mutex = PTHREAD_MUTEX_INITIALIZER;
+char *output = NULL;
+int num_output = 0;
+
+void set_output(char *val)
+{
+ pthread_mutex_lock(&result_mutex);
+ output = val;
+ pthread_mutex_unlock(&result_mutex);
+}
+
+char *get_output(void)
+{
+ char *val;
+
+ pthread_mutex_lock(&result_mutex);
+ val = output;
+ pthread_mutex_unlock(&result_mutex);
+
+ return val;
+}
+
+static void clear_output()
+{
+ pthread_mutex_lock(&result_mutex);
+ output = NULL;
+ pthread_mutex_unlock(&result_mutex);
+}
+
+static void clear_output_num()
+{
+ pthread_mutex_lock(&result_mutex);
+ num_output = 0;
+ pthread_mutex_unlock(&result_mutex);
+}
+
+static int get_output_num()
+{
+ int ret;
+ pthread_mutex_lock(&result_mutex);
+ ret = num_output;
+ pthread_mutex_unlock(&result_mutex);
+
+ return ret;
+}
+
+static int cb_count_msgpack_events(void *record, size_t size, void *data)
+{
+ msgpack_unpacked result;
+ size_t off = 0;
+
+ if (!TEST_CHECK(data != NULL)) {
+ flb_error("data is NULL");
+ }
+
+ /* Iterate each item array and apply rules */
+ msgpack_unpacked_init(&result);
+ while (msgpack_unpack_next(&result, record, size, &off) == MSGPACK_UNPACK_SUCCESS) {
+ pthread_mutex_lock(&result_mutex);
+ num_output++;
+ pthread_mutex_unlock(&result_mutex);
+ }
+ msgpack_unpacked_destroy(&result);
+
+ flb_free(record);
+ return 0;
+}
+
+int callback_test(void* data, size_t size, void* cb_data)
+{
+ if (size > 0) {
+ flb_debug("[test_filter_wasm] received message: %s", (char*)data);
+ set_output(data); /* success */
+ }
+ return 0;
+}
+
+void wait_with_timeout(uint32_t timeout_ms, char **out_result)
+{
+ struct flb_time start_time;
+ struct flb_time end_time;
+ struct flb_time diff_time;
+ uint64_t elapsed_time_flb = 0;
+ char *output = NULL;
+
+ flb_time_get(&start_time);
+
+ while (true) {
+ output = get_output();
+
+ if (output != NULL) {
+ *out_result = output;
+ break;
+ }
+
+ flb_time_msleep(100);
+ flb_time_get(&end_time);
+ flb_time_diff(&end_time, &start_time, &diff_time);
+ elapsed_time_flb = flb_time_to_nanosec(&diff_time) / 1000000;
+
+ if (elapsed_time_flb > timeout_ms - TIME_EPSILON_MS) {
+ flb_warn("[timeout] elapsed_time: %ld", elapsed_time_flb);
+ // Reached timeout.
+ break;
+ }
+ }
+}
+
+
+void flb_test_append_tag(void)
+{
+ int ret;
+ flb_ctx_t *ctx;
+ int in_ffd;
+ int out_ffd;
+ int filter_ffd;
+ char *output = NULL;
+ char *input = "[0, {\"key\":\"val\"}]";
+ char *result;
+ struct flb_lib_out_cb cb_data;
+
+ /* clear previous output */
+ clear_output();
+
+ /* Create context, flush every second (some checks omitted here) */
+ ctx = flb_create();
+ flb_service_set(ctx, "flush", FLUSH_INTERVAL, "grace", "1", NULL);
+
+ /* Prepare output callback context*/
+ cb_data.cb = callback_test;
+ cb_data.data = NULL;
+
+ /* Filter */
+ filter_ffd = flb_filter(ctx, (char *) "wasm", NULL);
+ TEST_CHECK(filter_ffd >= 0);
+ ret = flb_filter_set(ctx, filter_ffd,
+ "Match", "*",
+ "wasm_path", DPATH_WASM "/append_tag.wasm",
+ "function_name", "filter_append_tag",
+ NULL);
+
+ /* Input */
+ in_ffd = flb_input(ctx, (char *) "lib", NULL);
+ flb_input_set(ctx, in_ffd, "tag", "test.wasm", NULL);
+ TEST_CHECK(in_ffd >= 0);
+
+ /* Lib output */
+ out_ffd = flb_output(ctx, (char *) "lib", (void *)&cb_data);
+ TEST_CHECK(out_ffd >= 0);
+ flb_output_set(ctx, out_ffd,
+ "match", "test.wasm",
+ "format", "json",
+ NULL);
+
+ ret = flb_start(ctx);
+ TEST_CHECK(ret==0);
+
+ flb_lib_push(ctx, in_ffd, input, strlen(input));
+ wait_with_timeout(2000, &output);
+ result = strstr(output, "\"tag\":\"test.wasm\"");
+ TEST_CHECK(result != NULL);
+
+ /* clean up */
+ flb_lib_free(output);
+
+ flb_stop(ctx);
+ flb_destroy(ctx);
+}
+
+void flb_test_helloworld(void)
+{
+ int ret;
+ flb_ctx_t *ctx;
+ int in_ffd;
+ int out_ffd;
+ int filter_ffd;
+
+ /* Create context, flush every second (some checks omitted here) */
+ ctx = flb_create();
+ flb_service_set(ctx, "flush", FLUSH_INTERVAL, "grace", "1", NULL);
+
+ /* Filter */
+ filter_ffd = flb_filter(ctx, (char *) "wasm", NULL);
+ TEST_CHECK(filter_ffd >= 0);
+ ret = flb_filter_set(ctx, filter_ffd,
+ "Match", "*",
+ "wasm_path", DPATH_WASM "/say_hello.wasm",
+ "function_name", "filter_say_hello",
+ NULL);
+ /* Input */
+ in_ffd = flb_input(ctx, (char *) "dummy", NULL);
+ flb_input_set(ctx, in_ffd, "tag", "test", NULL);
+ TEST_CHECK(in_ffd >= 0);
+
+ /* Lib output */
+ out_ffd = flb_output(ctx, (char *) "stdout", NULL);
+ TEST_CHECK(out_ffd >= 0);
+ flb_output_set(ctx, out_ffd,
+ "match", "test",
+ NULL);
+
+ ret = flb_start(ctx);
+ TEST_CHECK(ret==0);
+
+ flb_stop(ctx);
+ flb_destroy(ctx);
+}
+
+void flb_test_numerics_records(void)
+{
+ int ret;
+ flb_ctx_t *ctx;
+ int in_ffd;
+ int out_ffd;
+ int filter_ffd;
+ char *output = NULL;
+ char *input = "[0, {\"key\":\"val\"}]";
+ char *result;
+ struct flb_lib_out_cb cb_data;
+
+ /* clear previous output */
+ clear_output();
+
+ /* Create context, flush every second (some checks omitted here) */
+ ctx = flb_create();
+ flb_service_set(ctx, "flush", FLUSH_INTERVAL, "grace", "1", NULL);
+
+ /* Prepare output callback context*/
+ cb_data.cb = callback_test;
+ cb_data.data = NULL;
+
+ /* Filter */
+ filter_ffd = flb_filter(ctx, (char *) "wasm", NULL);
+ TEST_CHECK(filter_ffd >= 0);
+ ret = flb_filter_set(ctx, filter_ffd,
+ "Match", "*",
+ "wasm_path", DPATH_WASM "/numeric_records.wasm",
+ "function_name", "filter_numeric_records",
+ NULL);
+
+ /* Input */
+ in_ffd = flb_input(ctx, (char *) "lib", NULL);
+ flb_input_set(ctx, in_ffd, "tag", "test.wasm", NULL);
+ TEST_CHECK(in_ffd >= 0);
+
+ /* Lib output */
+ out_ffd = flb_output(ctx, (char *) "lib", (void *)&cb_data);
+ TEST_CHECK(out_ffd >= 0);
+ flb_output_set(ctx, out_ffd,
+ "match", "test.wasm",
+ "format", "json",
+ NULL);
+
+ ret = flb_start(ctx);
+ TEST_CHECK(ret==0);
+
+ flb_lib_push(ctx, in_ffd, input, strlen(input));
+ wait_with_timeout(2000, &output);
+
+ /* check if float (for int keys)*/
+ result = strstr(output, "\"wasm_int1\":10.");
+ if (!TEST_CHECK(result == NULL)) {
+ TEST_MSG("output:%s\n", output);
+ }
+ result = strstr(output, "\"wasm_int2\":100.");
+ if (!TEST_CHECK(result == NULL)) {
+ TEST_MSG("output:%s\n", output);
+ }
+
+ /* check if float (for float keys)*/
+ result = strstr(output, "\"wasm_float1\":10.5");
+ if (!TEST_CHECK(result != NULL)) {
+ TEST_MSG("output:%s\n", output);
+ }
+ result = strstr(output, "\"wasm_float2\":100.5");
+ if (!TEST_CHECK(result != NULL)) {
+ TEST_MSG("output:%s\n", output);
+ }
+
+ /* check if float (for exp style float key)*/
+ result = strstr(output, "\"wasm_exp_float\":0.00354");
+ if (!TEST_CHECK(result != NULL)) {
+ TEST_MSG("output:%s\n", output);
+ }
+
+ /* check if float (for truncated float key)*/
+ result = strstr(output, "\"wasm_truncate_float\":120");
+ if (!TEST_CHECK(result != NULL)) {
+ TEST_MSG("output:%s\n", output);
+ }
+ result = strstr(output, "\"wasm_truncate_float\":120.");
+ if (!TEST_CHECK(result == NULL)) {
+ TEST_MSG("output:%s\n", output);
+ }
+
+ /* clean up */
+ flb_lib_free(output);
+
+ flb_stop(ctx);
+ flb_destroy(ctx);
+}
+
+void flb_test_array_contains_null(void)
+{
+ int ret;
+ flb_ctx_t *ctx;
+ int in_ffd;
+ int out_ffd;
+ int filter_ffd;
+ char *output = NULL;
+ char *input = "[0, {\"hello\": [1, null, \"world\"]}]";
+ char *result;
+ struct flb_lib_out_cb cb_data;
+
+ /* clear previous output */
+ clear_output();
+
+ /* Create context, flush every second (some checks omitted here) */
+ ctx = flb_create();
+ flb_service_set(ctx, "flush", FLUSH_INTERVAL, "grace", "1", NULL);
+
+ /* Prepare output callback context*/
+ cb_data.cb = callback_test;
+ cb_data.data = NULL;
+
+ /* Filter */
+ filter_ffd = flb_filter(ctx, (char *) "wasm", NULL);
+ TEST_CHECK(filter_ffd >= 0);
+ ret = flb_filter_set(ctx, filter_ffd,
+ "Match", "*",
+ "wasm_path", DPATH_WASM "/modify_record.wasm",
+ "function_name", "filter_modify_record",
+ NULL);
+
+ /* Input */
+ in_ffd = flb_input(ctx, (char *) "lib", NULL);
+ flb_input_set(ctx, in_ffd, "tag", "test.wasm", NULL);
+ TEST_CHECK(in_ffd >= 0);
+
+ /* Lib output */
+ out_ffd = flb_output(ctx, (char *) "lib", (void *)&cb_data);
+ TEST_CHECK(out_ffd >= 0);
+ flb_output_set(ctx, out_ffd,
+ "match", "test.wasm",
+ "format", "json",
+ NULL);
+
+ ret = flb_start(ctx);
+ TEST_CHECK(ret==0);
+
+ flb_lib_push(ctx, in_ffd, input, strlen(input));
+ wait_with_timeout(2000, &output);
+
+ result = strstr(output, "[1,null,\"world\"]");
+ if(!TEST_CHECK(result != NULL)) {
+ TEST_MSG("output:%s\n", output);
+ }
+ result = strstr(output, "\"modify\":\"yes\"");
+ if(!TEST_CHECK(result != NULL)) {
+ TEST_MSG("output:%s\n", output);
+ }
+
+ /* clean up */
+ flb_lib_free(output);
+
+ flb_stop(ctx);
+ flb_destroy(ctx);
+}
+
+void flb_test_drop_all_records(void)
+{
+ int ret;
+ flb_ctx_t *ctx;
+ int in_ffd;
+ int out_ffd;
+ int filter_ffd;
+ char *output = NULL;
+ char *input = "[0, {\"key\":\"val\"}]";
+ struct flb_lib_out_cb cb_data;
+
+ clear_output_num();
+
+ /* Create context, flush every second (some checks omitted here) */
+ ctx = flb_create();
+ flb_service_set(ctx, "flush", FLUSH_INTERVAL, "grace", "1", NULL);
+
+ /* Prepare output callback context*/
+ cb_data.cb = cb_count_msgpack_events;
+ cb_data.data = NULL;
+
+ /* Filter */
+ filter_ffd = flb_filter(ctx, (char *) "wasm", NULL);
+ TEST_CHECK(filter_ffd >= 0);
+ ret = flb_filter_set(ctx, filter_ffd,
+ "Match", "*",
+ "wasm_path", DPATH_WASM "/drop_record.wasm",
+ "function_name", "filter_drop_record",
+ NULL);
+ /* Input */
+ in_ffd = flb_input(ctx, (char *) "lib", NULL);
+ flb_input_set(ctx, in_ffd, "tag", "test", NULL);
+ TEST_CHECK(in_ffd >= 0);
+
+ /* Lib output */
+ out_ffd = flb_output(ctx, (char *) "lib", (void *)&cb_data);
+ TEST_CHECK(out_ffd >= 0);
+ flb_output_set(ctx, out_ffd,
+ "match", "test",
+ NULL);
+
+ ret = flb_start(ctx);
+ TEST_CHECK(ret==0);
+
+ flb_lib_push(ctx, in_ffd, input, strlen(input));
+ wait_with_timeout(2000, &output); /* waiting flush */
+
+ ret = get_output_num();
+ if (!TEST_CHECK(ret == 0)) {
+ TEST_MSG("error. got %d expect 0", ret);
+ }
+
+ flb_stop(ctx);
+ flb_destroy(ctx);
+}
+
+
+TEST_LIST = {
+ {"hello_world", flb_test_helloworld},
+ {"append_tag", flb_test_append_tag},
+ {"numeric_records", flb_test_numerics_records},
+ {"array_contains_null", flb_test_array_contains_null},
+ {"drop_all_records", flb_test_drop_all_records},
+ {NULL, NULL}
+};