summaryrefslogtreecommitdiffstats
path: root/fluent-bit/plugins/in_event_test
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/plugins/in_event_test')
-rw-r--r--fluent-bit/plugins/in_event_test/CMakeLists.txt4
-rw-r--r--fluent-bit/plugins/in_event_test/event_test.c407
2 files changed, 411 insertions, 0 deletions
diff --git a/fluent-bit/plugins/in_event_test/CMakeLists.txt b/fluent-bit/plugins/in_event_test/CMakeLists.txt
new file mode 100644
index 000000000..9a9577a6c
--- /dev/null
+++ b/fluent-bit/plugins/in_event_test/CMakeLists.txt
@@ -0,0 +1,4 @@
+set(src
+ event_test.c)
+
+FLB_PLUGIN(in_event_test "${src}" "")
diff --git a/fluent-bit/plugins/in_event_test/event_test.c b/fluent-bit/plugins/in_event_test/event_test.c
new file mode 100644
index 000000000..557017fe2
--- /dev/null
+++ b/fluent-bit/plugins/in_event_test/event_test.c
@@ -0,0 +1,407 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_config.h>
+#include <fluent-bit/flb_config_map.h>
+#include <fluent-bit/flb_error.h>
+#include <fluent-bit/flb_pipe.h>
+#include <fluent-bit/flb_time.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_upstream.h>
+#include <fluent-bit/flb_time_utils.h>
+
+#define STATUS_OK 1
+#define STATUS_ERROR 0
+#define STATUS_PENDING -1
+#define CALLBACK_TIME 2 /* 2 seconds */
+
+#define SERVER_PORT "9092"
+#define SERVER_IFACE "0.0.0.0"
+
+struct unit_test {
+ int id;
+ int coll_id;
+ int status;
+ char *desc;
+};
+
+struct unit_test tests[] = {
+ {0, 0, STATUS_PENDING, "collector time"},
+ {1, 0, STATUS_PENDING, "collector fd_event"},
+ {2, 0, STATUS_PENDING, "collector fd_server | socket"},
+ {3, 0, STATUS_PENDING, "plugin paused from engine"},
+ {4, 0, STATUS_PENDING, "plugin resumed from engine"},
+};
+
+#define UNIT_TESTS_SIZE (sizeof(tests) / sizeof(struct unit_test))
+
+struct event_test {
+ flb_pipefd_t pipe[2];
+ int server_fd;
+ int client_coll_id;
+ struct flb_upstream *upstream;
+ struct unit_test *tests;
+ struct flb_input_instance *ins;
+};
+
+static void set_unit_test_status(struct event_test *ctx, int id, int status)
+{
+ struct unit_test *ut;
+
+ ut = &ctx->tests[id];
+ ut->status = status;
+}
+
+static int config_destroy(struct event_test *ctx)
+{
+ if (!ctx) {
+ return 0;
+ }
+
+ if (ctx->tests) {
+ flb_free(ctx->tests);
+ }
+
+ if (ctx->pipe[0] > 0) {
+ flb_socket_close(ctx->pipe[0]);
+ }
+ if (ctx->pipe[1] > 0) {
+ flb_socket_close(ctx->pipe[1]);
+ }
+ if (ctx->server_fd > 0) {
+ flb_socket_close(ctx->server_fd);
+ }
+
+ if (ctx->upstream) {
+ flb_upstream_destroy(ctx->upstream);
+ }
+
+ flb_free(ctx);
+ return 0;
+}
+
+static int cb_collector_time(struct flb_input_instance *ins,
+ struct flb_config *config, void *in_context)
+{
+ int diff;
+ int ret;
+ uint64_t val;
+ time_t now;
+ struct unit_test *ut;
+ struct event_test *ctx = (struct event_test *) in_context;
+
+ now = time(NULL);
+ diff = now - config->init_time;
+ /* For macOS, we sometimes get the +1 longer time elapse.
+ * To handle this, we simply add +1 as a delta for checking interval. */
+ if (diff > (CALLBACK_TIME + 1)) {
+ flb_plg_error(ins, "cb_collector_time difference failed: %i seconds", diff);
+ set_unit_test_status(ctx, 0, STATUS_ERROR);
+ flb_engine_exit(config);
+ }
+
+ /* disable the collector */
+ ut = &ctx->tests[0];
+ flb_input_collector_pause(ut->coll_id, ins);
+
+ /*
+ * before to return, trigger test 1 (collector_fd_event) by writing a byte
+ * to our local pipe.
+ */
+ val = 1;
+ ret = write(ctx->pipe[1], &val, sizeof(val));
+ if (ret == -1) {
+ flb_errno();
+ set_unit_test_status(ctx, 0, STATUS_ERROR);
+ flb_engine_exit(config);
+ }
+
+ set_unit_test_status(ctx, 0, STATUS_OK);
+ flb_plg_info(ins, "[OK] collector_time");
+ FLB_INPUT_RETURN(0);
+}
+
+static int cb_collector_fd(struct flb_input_instance *ins,
+ struct flb_config *config, void *in_context)
+{
+ uint64_t val = 0;
+ size_t bytes;
+ struct unit_test *ut;
+ struct event_test *ctx = (struct event_test *) in_context;
+
+ bytes = read(ctx->pipe[0], &val, sizeof(val));
+ if (bytes <= 0) {
+ flb_errno();
+ set_unit_test_status(ctx, 1, STATUS_ERROR);
+ flb_engine_exit(config);
+ }
+ else {
+ flb_plg_info(ins, "[OK] collector_fd");
+ }
+
+ /* disable the collector */
+ ut = &ctx->tests[1];
+ flb_input_collector_pause(ut->coll_id, ins);
+ set_unit_test_status(ctx, 1, STATUS_OK);
+
+ FLB_INPUT_RETURN(0);
+}
+
+static int cb_collector_server_socket(struct flb_input_instance *ins,
+ struct flb_config *config, void *in_context)
+{
+ int fd;
+ struct unit_test *ut;
+ struct event_test *ctx = in_context;
+
+ /* Accept the new connection */
+ fd = flb_net_accept(ctx->server_fd);
+ if (fd == -1) {
+ flb_plg_error(ins, "could not accept new connection");
+ return -1;
+ }
+
+ /* sleep co-routine for 500ms */
+ flb_time_sleep(500);
+ flb_socket_close(fd);
+
+ ut = &ctx->tests[2];
+ flb_input_collector_pause(ut->coll_id, ins);
+ set_unit_test_status(ctx, 2, STATUS_OK);
+
+ flb_plg_info(ins, "[OK] collector_server_socket");
+
+ /* tell the engine to deliver a pause request */
+ flb_plg_info(ins, "test pause/resume in 5 seconds...");
+ flb_input_test_pause_resume(ins, 5);
+
+ /* return */
+ FLB_INPUT_RETURN(0);
+}
+
+static int cb_collector_server_client(struct flb_input_instance *ins,
+ struct flb_config *config, void *in_context)
+{
+ struct flb_connection *u_conn;
+ struct event_test *ctx = (struct event_test *) in_context;
+
+ /* get the upstream connection (localhost) */
+ u_conn = flb_upstream_conn_get(ctx->upstream);
+ if (!u_conn) {
+ flb_plg_error(ins, "could not connect to socket server");
+ return -1;
+ }
+
+ flb_time_sleep(200);
+ flb_upstream_conn_release(u_conn);
+
+ /* disable this collector */
+ flb_input_collector_pause(ctx->client_coll_id, ins);
+ FLB_INPUT_RETURN(0);
+}
+
+static struct event_test *config_create(struct flb_input_instance *ins)
+{
+ size_t size;
+ struct event_test *ctx;
+
+ /* Allocate space for the configuration */
+ ctx = flb_calloc(1, sizeof(struct event_test));
+ if (!ctx) {
+ flb_errno();
+ return NULL;
+ }
+ ctx->ins = ins;
+
+ size = sizeof(struct unit_test) * UNIT_TESTS_SIZE;
+ ctx->tests = flb_malloc(size);
+ if (!ctx->tests) {
+ flb_errno();
+ flb_free(ctx);
+ return NULL;
+ }
+ memcpy(ctx->tests, &tests, size);
+ return ctx;
+}
+
+/* Initialize plugin */
+static int cb_event_test_init(struct flb_input_instance *ins,
+ struct flb_config *config, void *data)
+{
+ int fd;
+ int ret;
+ struct unit_test *ut;
+ struct event_test *ctx = NULL;
+ struct flb_upstream *upstream;
+
+ /* Allocate space for the configuration */
+ ctx = config_create(ins);
+ if (!ctx) {
+ return -1;
+ }
+ flb_input_set_context(ins, ctx);
+
+ /* unit test 0: collector_time */
+ ret = flb_input_set_collector_time(ins, cb_collector_time,
+ CALLBACK_TIME, 0, config);
+ if (ret < 0) {
+ config_destroy(ctx);
+ return -1;
+ }
+ ut = &ctx->tests[0];
+ ut->coll_id = ret;
+
+ /* unit test 1: collector_fd_event */
+ ret = flb_pipe_create(ctx->pipe);
+ if (ret == -1) {
+ flb_errno();
+ config_destroy(ctx);
+ return -1;
+ }
+ ret = flb_input_set_collector_event(ins,
+ cb_collector_fd,
+ ctx->pipe[0],
+ config);
+ if (ret < 0) {
+ config_destroy(ctx);
+ return -1;
+ }
+ ut = &ctx->tests[1];
+ ut->coll_id = ret;
+
+ /* unit test 2: collector_socket */
+ fd = flb_net_server(SERVER_PORT, SERVER_IFACE);
+ if (fd < 0) {
+ flb_errno();
+ config_destroy(ctx);
+ return -1;
+ }
+ flb_net_socket_nonblocking(fd);
+ ctx->server_fd = fd;
+
+ /* socket server */
+ ret = flb_input_set_collector_socket(ins,
+ cb_collector_server_socket,
+ ctx->server_fd,
+ config);
+ if (ret == -1) {
+ config_destroy(ctx);
+ return -1;
+ }
+ ut = &ctx->tests[2];
+ ut->coll_id = ret;
+
+ /* socket client: connect to socket server to trigger the event */
+ ret = flb_input_set_collector_time(ins, cb_collector_server_client,
+ CALLBACK_TIME * 2, 0, config);
+ if (ret < 0) {
+ config_destroy(ctx);
+ return -1;
+ }
+ ctx->client_coll_id = ret;
+
+ /* upstream context for socket client */
+ upstream = flb_upstream_create(config, "127.0.0.1", atoi(SERVER_PORT),
+ FLB_IO_TCP, NULL);
+ if (!upstream) {
+ config_destroy(ctx);
+ return -1;
+ }
+ ctx->upstream = upstream;
+ flb_input_upstream_set(ctx->upstream, ins);
+
+ return 0;
+}
+
+static int cb_event_test_pre_run(struct flb_input_instance *ins,
+ struct flb_config *config, void *in_context)
+{
+ flb_plg_info(ins, "pre run OK");
+ return -1;
+}
+
+static void cb_event_test_pause(void *data, struct flb_config *config)
+{
+ struct event_test *ctx = data;
+
+ set_unit_test_status(ctx, 3, STATUS_OK);
+ flb_plg_info(ctx->ins, "[OK] engine has paused the plugin");
+}
+
+static void cb_event_test_resume(void *data, struct flb_config *config)
+{
+ struct event_test *ctx = data;
+
+ set_unit_test_status(ctx, 4, STATUS_OK);
+ flb_plg_info(ctx->ins, "[OK] engine has resumed the plugin");
+
+ flb_engine_exit(config);
+}
+
+static int in_event_test_exit(void *data, struct flb_config *config)
+{
+ int i;
+ int failed = FLB_FALSE;
+ struct event_test *ctx = data;
+ struct unit_test *ut;
+ (void) *config;
+
+ /* check tests */
+ for (i = 0; i < UNIT_TESTS_SIZE; i++) {
+ ut = &ctx->tests[i];
+ if (ut->status != STATUS_OK) {
+ flb_plg_error(ctx->ins, "unit test #%i '%s' failed",
+ i, ut->desc);
+ failed = FLB_TRUE;
+ }
+ else {
+ flb_plg_info(ctx->ins, "unit test #%i '%s' succeeded",
+ i, ut->desc);
+ }
+ }
+
+ /* if one test failed, perform an abrupt exit with proper error */
+ if (failed) {
+ exit(EXIT_FAILURE);
+ }
+
+ config_destroy(ctx);
+ return 0;
+}
+
+/* Configuration properties map */
+static struct flb_config_map config_map[] = {
+ /* EOF */
+ {0}
+};
+
+struct flb_input_plugin in_event_test_plugin = {
+ .name = "event_test",
+ .description = "Event tests for input plugins",
+ .cb_init = cb_event_test_init,
+ .cb_pre_run = cb_event_test_pre_run,
+ .cb_collect = NULL,
+ .cb_flush_buf = NULL,
+ .cb_pause = cb_event_test_pause,
+ .cb_resume = cb_event_test_resume,
+ .cb_exit = in_event_test_exit,
+ .config_map = config_map,
+ .flags = FLB_INPUT_CORO | FLB_INPUT_THREADED
+};