summaryrefslogtreecommitdiffstats
path: root/fluent-bit/tests/internal/flb_event_loop.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/tests/internal/flb_event_loop.c')
-rw-r--r--fluent-bit/tests/internal/flb_event_loop.c592
1 files changed, 592 insertions, 0 deletions
diff --git a/fluent-bit/tests/internal/flb_event_loop.c b/fluent-bit/tests/internal/flb_event_loop.c
new file mode 100644
index 000000000..669475d71
--- /dev/null
+++ b/fluent-bit/tests/internal/flb_event_loop.c
@@ -0,0 +1,592 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+#include <fluent-bit/flb_compat.h>
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_pack.h>
+#include <fluent-bit/flb_mp.h>
+#include <msgpack.h>
+
+#include "flb_tests_internal.h"
+
+#include <sys/types.h>
+#include <sys/stat.h>
+
+#include <fluent-bit/flb_event_loop.h>
+#include <fluent-bit/flb_bucket_queue.h>
+#include <monkey/mk_core/mk_list.h>
+
+#define EVENT_LOOP_TEST_PRIORITIES 7
+#define EVENT_LOOP_MAX_EVENTS 64
+#ifdef _WIN32
+ #define TIME_EPSILON_MS 30
+#elif FLB_SYSTEM_MACOS
+ #define TIME_EPSILON_MS 200
+#else
+ #define TIME_EPSILON_MS 10
+#endif
+
+#define TIMER_COARSE_EPSION_MS 300
+
+struct test_evl_context {
+ struct mk_event_loop *evl;
+ struct flb_bucket_queue *bktq;
+};
+
+/*
+ * The following implements a uniform custom delayed event
+ * primarily to help Windows. It is needed to simulate
+ * another thread or network call writing to fd
+ * activating an event without disrupting the event (libevent)
+ * loop. LibEvent timeouts disrupt the libevent loop
+ * making the testcases non-homogeneous between platforms
+ */
+struct delay_worker_args {
+ int fd; /* pipefd */
+ int sec; /* seconds */
+};
+
+/* Writes to pipe after set delay. Cleans self up after pipe closure */
+void _delay_worker(void *arg) {
+ int ret;
+ uint64_t val = 1;
+ char tmp[100];
+
+ struct delay_worker_args *args = (struct delay_worker_args *) arg;
+ static int idx = 0;
+
+ sprintf(tmp, "delay-timer-%i", ++idx);
+ mk_utils_worker_rename(tmp);
+
+ /* Sleep for the delay period */
+ sleep(args->sec);
+
+ /* Send delayed event a notification */
+ flb_pipe_set_nonblocking(args->fd);
+ ret = flb_pipe_w(args->fd, &val, sizeof(uint64_t)); /* supposedly blocking */
+ if (ret == -1) {
+ flb_error("Delayed event: unable to trigger event via write to pipe");
+ }
+
+ /* Clean up */
+ flb_pipe_close(args->fd);
+ flb_free(args);
+}
+
+void test_timeout_create(struct mk_event_loop *loop,
+ time_t sec, long nsec, void *data)
+{
+ flb_pipefd_t fd[2];
+ int ret;
+ pthread_t tid;
+
+ ret = flb_pipe_create(fd);
+ if (ret == -1) {
+ flb_error("pipe creation failure");
+ return;
+ }
+
+ /*
+ * Spin up another thread to keep
+ * track of our delay (don't let register in event loop (like libevent))
+ */
+ struct delay_worker_args *args = flb_malloc(sizeof(struct delay_worker_args));
+
+ /* Register write end of data pipe */
+ args->fd = fd[1];
+ args->sec = sec;
+ ret = mk_utils_worker_spawn(_delay_worker, args, &tid); /* worker handles freeing up args. */
+
+ /* Convert read end to monkey event */
+ MK_EVENT_NEW(data);
+ mk_event_add(loop, fd[0], MK_EVENT_NOTIFICATION, MK_EVENT_READ, data);
+}
+
+void test_timeout_destroy(struct mk_event_loop *loop, void *data)
+{
+ struct mk_event *event = (struct mk_event *) data;
+ mk_event_del(loop, event);
+ flb_pipe_close(event->fd);
+}
+
+struct test_evl_context *evl_context_create()
+{
+ struct test_evl_context *ctx = flb_malloc(sizeof(struct test_evl_context));
+ ctx->evl = mk_event_loop_create(EVENT_LOOP_MAX_EVENTS);
+ ctx->bktq = flb_bucket_queue_create(EVENT_LOOP_TEST_PRIORITIES);
+ return ctx;
+}
+
+void evl_context_destroy(struct test_evl_context *ctx)
+{
+ flb_bucket_queue_destroy(ctx->bktq);
+ mk_event_loop_destroy(ctx->evl);
+ flb_free(ctx);
+}
+
+void test_simple_timeout_1000ms()
+{
+ struct test_evl_context *ctx;
+
+ struct flb_time start_time;
+ struct flb_time end_time;
+ struct flb_time diff_time;
+ uint64_t elapsed_time_flb;
+ int target;
+
+#ifdef _WIN32
+ WSADATA wsa_data;
+ WSAStartup(0x0201, &wsa_data);
+#endif
+
+ target = 1000;
+ ctx = evl_context_create();
+
+ flb_time_get(&start_time);
+
+ mk_event_wait_2(ctx->evl, target);
+
+ flb_time_get(&end_time);
+ flb_time_diff(&end_time, &start_time, &diff_time);
+ elapsed_time_flb = flb_time_to_nanosec(&diff_time) / 1000000;
+ TEST_CHECK(elapsed_time_flb < target + TIME_EPSILON_MS
+ && elapsed_time_flb > target - TIME_EPSILON_MS);
+ TEST_MSG("Target time failed for mk_wait_2. Expect %d ms. Waited %d ms\n", target,
+ (int) elapsed_time_flb);
+
+ evl_context_destroy(ctx);
+
+#ifdef _WIN32
+ WSACleanup();
+#endif
+}
+
+/*
+ * Non-block wait: 0ms, no event
+ * Add timer - 1s (very inexact)
+ * Non-block wait: 0ms, no event
+ * Blocking wait with 2.1s timeout: ~1s (very inexact), 1event
+ * Non-blocking wait: 0ms, 1 event
+ * Remove timer event
+ * Blocking wait with 2.1s timeout: 2.1s, no event
+ */
+void test_non_blocking_and_blocking_timeout()
+{
+ struct test_evl_context *ctx;
+
+ struct mk_event event = {0};
+
+ struct flb_time start_time;
+ struct flb_time end_time;
+ struct flb_time diff_time;
+ uint64_t elapsed_time_flb;
+ int n_events;
+
+ int target;
+ int wait_2_timeout = 2100;
+#ifdef _WIN32
+ WSADATA wsa_data;
+ WSAStartup(0x0201, &wsa_data);
+#endif
+
+ ctx = evl_context_create();
+
+ /* Non blocking wait -- no event */
+ target = 0;
+ flb_time_get(&start_time);
+ n_events = mk_event_wait_2(ctx->evl, 0);
+ flb_time_get(&end_time);
+
+ flb_time_diff(&end_time, &start_time, &diff_time);
+ elapsed_time_flb = flb_time_to_nanosec(&diff_time) / 1000000;
+ TEST_CHECK(elapsed_time_flb <= target + TIME_EPSILON_MS);
+ TEST_MSG("Target time failed for mk_wait_2. Expect %d ms. Waited %d ms\n", target,
+ (int) elapsed_time_flb);
+ TEST_CHECK(n_events == 0);
+
+ /* Add somewhat inexact 1 second timer */
+ target = 1000;
+ event.mask = MK_EVENT_EMPTY;
+ event.status = MK_EVENT_NONE;
+ test_timeout_create(ctx->evl, target / 1000, 0, &event);
+
+ /* Non blocking wait -- one event */
+ target = 0;
+ flb_time_get(&start_time);
+ n_events = mk_event_wait_2(ctx->evl, 0);
+ flb_time_get(&end_time);
+
+ flb_time_diff(&end_time, &start_time, &diff_time);
+ elapsed_time_flb = flb_time_to_nanosec(&diff_time) / 1000000;
+ TEST_CHECK(elapsed_time_flb <= target + TIME_EPSILON_MS);
+ TEST_MSG("Target time failed for mk_wait_2. Expect %d ms. Waited %d ms\n", target,
+ (int) elapsed_time_flb);
+ TEST_CHECK(n_events == 0);
+
+ /* Blocking wait with unused timeout */
+ target = 1000;
+ flb_time_get(&start_time);
+ n_events = mk_event_wait_2(ctx->evl, wait_2_timeout);
+ flb_time_get(&end_time);
+
+ flb_time_diff(&end_time, &start_time, &diff_time);
+ elapsed_time_flb = flb_time_to_nanosec(&diff_time) / 1000000;
+ TEST_CHECK(elapsed_time_flb < target + TIMER_COARSE_EPSION_MS
+ && elapsed_time_flb > 100); /* accommodate for timer inaccuracy */
+ TEST_MSG("Target time failed for mk_wait_2. Expect %d ms. Waited %d ms\n", target,
+ (int) elapsed_time_flb);
+ TEST_CHECK(n_events == 1);
+
+ /* Non blocking wait -- one event */
+ target = 0;
+ flb_time_get(&start_time);
+ n_events = mk_event_wait_2(ctx->evl, 0);
+ flb_time_get(&end_time);
+
+ flb_time_diff(&end_time, &start_time, &diff_time);
+ elapsed_time_flb = flb_time_to_nanosec(&diff_time) / 1000000;
+ TEST_CHECK(elapsed_time_flb <= target + TIME_EPSILON_MS);
+ TEST_MSG("Target time failed for mk_wait_2. Expect %d ms. Waited %d ms\n", target,
+ (int) elapsed_time_flb);
+ TEST_CHECK(n_events == 1);
+
+ /* Remove triggered 1s timer event */
+ test_timeout_destroy(ctx->evl, &event);
+
+ /* Blocking wait, used timeout */
+ target = wait_2_timeout;
+ flb_time_get(&start_time);
+ n_events = mk_event_wait_2(ctx->evl, wait_2_timeout);
+ flb_time_get(&end_time);
+ flb_time_diff(&end_time, &start_time, &diff_time);
+ elapsed_time_flb = flb_time_to_nanosec(&diff_time) / 1000000;
+ TEST_CHECK(elapsed_time_flb < target + TIME_EPSILON_MS
+ && elapsed_time_flb > target - TIME_EPSILON_MS);
+ TEST_MSG("Target time failed for mk_wait_2. Expect %d ms. Waited %d ms\n", target,
+ (int) elapsed_time_flb);
+ TEST_CHECK(n_events == 0);
+
+ evl_context_destroy(ctx);
+#ifdef _WIN32
+ WSACleanup();
+#endif
+}
+
+/*
+ * Add 1s timer
+ * Infinite wait: 1 event, < 1s + epsilon
+ * Remove timer
+ */
+void test_infinite_wait()
+{
+ struct test_evl_context *ctx;
+
+ struct mk_event event = {0};
+
+ struct flb_time start_time;
+ struct flb_time end_time;
+ struct flb_time diff_time;
+ uint64_t elapsed_time_flb;
+ int n_events;
+
+ int target;
+#ifdef _WIN32
+ WSADATA wsa_data;
+ WSAStartup(0x0201, &wsa_data);
+#endif
+
+ ctx = evl_context_create();
+
+ /* Add somewhat inexact 1 second timer */
+ target = 1000;
+ test_timeout_create(ctx->evl, target / 1000, 0, &event);
+
+ /* Infinite wait -- 1 event */
+ target = 1000;
+ flb_time_get(&start_time);
+ n_events = mk_event_wait(ctx->evl);
+ flb_time_get(&end_time);
+
+ flb_time_diff(&end_time, &start_time, &diff_time);
+ elapsed_time_flb = flb_time_to_nanosec(&diff_time) / 1000000;
+ TEST_CHECK(elapsed_time_flb < target + TIMER_COARSE_EPSION_MS
+ && elapsed_time_flb > 100); /* expect timer to be inexact */
+ TEST_MSG("Target time failed for mk_wait_2. Expect %d ms. Waited %d ms\n", target,
+ (int) elapsed_time_flb);
+ TEST_CHECK(n_events == 1);
+
+ /* Remove triggered 1s timer event */
+ test_timeout_destroy(ctx->evl, &event);
+
+ evl_context_destroy(ctx);
+#ifdef _WIN32
+ WSACleanup();
+#endif
+}
+
+void synchronize_tests()
+{
+ test_non_blocking_and_blocking_timeout();
+ test_infinite_wait();
+}
+
+/*
+ * Add non-delayed and delayed timers of varying priority to priority event loop, and
+ * verify timers are processed by order of priority and order of activation. Delete also
+ * checked by deleting events in several cases and confirming deleted events are not
+ * processed.
+ *
+ * Method:
+ * Add n_timers / 2 non-delayed timers
+ * delete 1/4th of the non-delayed timers
+ * Wait for non_delayed timers to activate
+ * delete 1/4th of the non-delayed timers
+ * Process events with mk_event_loop deleting each processed event
+ * Add n_timers / 2 non-delayed timers
+ * Add n_timers / 2 2s delayed timers
+ * Wait for non_delayed timers to activate
+ * Start looping though processing the events
+ * on the first iteration, (after initial events are tracked)
+ * Delete 1/2 of the non-delayed timers
+ * Wait for the delayed timers to activate
+ * Check that deleted events are not processed
+ * Check that non-delayed timers which are tracked first are processed before
+ * non-delayed events.
+ *
+ * Summary:
+ * Track priorities and confirm that all added events were processed
+ * Verify non-delayed timers are triggered before delayed timers
+ * Confirm delete properly deletes events before ready, after ready,
+ * and after tracked by event loop.
+ */
+void event_loop_stress_priority_add_delete()
+{
+ struct test_evl_context *ctx;
+
+ const int n_timers = EVENT_LOOP_MAX_EVENTS;
+ struct mk_event events[EVENT_LOOP_MAX_EVENTS] = {0};
+ struct mk_event *event_cronology[EVENT_LOOP_TEST_PRIORITIES] = {0}; /* event loop priority fifo */
+ int priority_cronology = 0;
+
+ struct mk_event *event;
+ int immediate_timers[EVENT_LOOP_TEST_PRIORITIES] = {0}; /* by priority */
+ int delayed_timers[EVENT_LOOP_TEST_PRIORITIES] = {0}; /* by priority */
+
+ int immediate_timers_triggered[EVENT_LOOP_TEST_PRIORITIES] = {0};
+ int delayed_timers_triggered[EVENT_LOOP_TEST_PRIORITIES] = {0};
+
+ int priority;
+ int n_events;
+ int target;
+
+ int i;
+ int j;
+ int ret = 0;
+ int immediate_timer_count;
+#ifdef _WIN32
+ WSADATA wsa_data;
+ WSAStartup(0x0201, &wsa_data);
+#endif
+
+ ctx = evl_context_create();
+ srand(20);
+
+ /* Add timers with no delay */
+ for (i = 0; i < n_timers / 2; ++i) {
+ priority = rand() % EVENT_LOOP_TEST_PRIORITIES;
+ target = 0;
+ memset(&events[i], 0, sizeof(struct mk_event));
+ test_timeout_create(ctx->evl, 0, 0, &events[i]);
+ events[i].priority = priority;
+ ++immediate_timers[priority];
+ }
+
+ usleep(400000); /* sleep 400 milliseconds for the 0delay events to register */
+
+ /* Remove the first n/8 events */
+ for (i = 0; i < n_timers / 8; ++i) {
+ test_timeout_destroy(ctx->evl, &events[i]);
+ --immediate_timers[(int) events[i].priority];
+ }
+
+ /* Wait on the no delay timers */
+ n_events = mk_event_wait(ctx->evl);
+ TEST_CHECK(n_events == n_timers / 2 - n_timers / 8);
+ TEST_MSG("Expected %i ready events from the no delay timers. Recieved %i",
+ n_timers / 2 - n_timers / 8, ret);
+
+ /* Remove the first n/8 events */
+ for (i = n_timers / 8; i < n_timers / 4; ++i) {
+ test_timeout_destroy(ctx->evl, &events[i]);
+ --immediate_timers[(int) events[i].priority];
+ }
+
+ i = 0;
+ do { /* variable closure */
+ flb_event_priority_live_foreach(event, ctx->bktq, ctx->evl, n_timers) {
+ /* check priority cronology */
+ TEST_CHECK(event->priority >= priority_cronology);
+ TEST_MSG("Priority event loop processed events out of order.");
+ priority_cronology = event->priority;
+
+ /* check none of the deleted records appear */
+ TEST_CHECK(event >= &events[n_timers / 4]);
+ TEST_MSG("Deleted event appeared in priority event loop.");
+
+ /* update records */
+
+ /* delete event */
+ test_timeout_destroy(ctx->evl, event);
+
+ /* update records */
+ test_timeout_destroy(ctx->evl, event);
+ if (event < &events[n_timers/2]) {
+ /* immediate timer */
+ --immediate_timers[(int) event->priority];
+ ++immediate_timers_triggered[(int) event->priority];
+ }
+ else {
+ /* delayed timer */
+ --delayed_timers[(int) event->priority];
+ ++delayed_timers_triggered[(int) event->priority];
+ }
+ ++i;
+ }
+ } while (0);
+ TEST_CHECK(i == n_timers / 4);
+ TEST_MSG("Not all no-wait timers activated");
+
+ /* verify number of immediate timers triggered */
+ for (i = 0; i < EVENT_LOOP_TEST_PRIORITIES; ++i) {
+ TEST_CHECK(immediate_timers[i] == 0);
+ TEST_MSG("Priority event register and triggered mismatch for priority %i. "
+ "Remaining: %i out of: %i", i, immediate_timers[i],
+ immediate_timers_triggered[i]);
+ }
+
+ /* Re-add timers with no delay */
+ for (i = 0; i < n_timers / 2; ++i) {
+ priority = rand() % EVENT_LOOP_TEST_PRIORITIES;
+ target = 0;
+ memset(&events[i], 0, sizeof(struct mk_event));
+ test_timeout_create(ctx->evl, target, 0, &events[i]);
+ events[i].priority = priority;
+ ++immediate_timers[priority];
+ }
+
+ usleep(400000); /* sleep 200 milliseconds for the 0delay events to register */
+
+ /* Add timers with delay */
+ for (i = n_timers / 2; i < n_timers; ++i) {
+ priority = rand() % EVENT_LOOP_TEST_PRIORITIES;
+ target = 2; /* 2 second delay */
+ memset(&events[i], 0, sizeof(struct mk_event));
+ test_timeout_create(ctx->evl, target, 0, &events[i]);
+ events[i].priority = priority;
+ ++delayed_timers[priority];
+ }
+
+ /* Wait on the timers */
+ n_events = mk_event_wait(ctx->evl);
+ TEST_CHECK(n_events == n_timers / 2);
+ TEST_MSG("Expected %i ready events from the no delay timers. Recieved %i", n_timers / 2, ret);
+ j = 0;
+ priority_cronology = 0;
+ do { /* variable closure */
+ flb_event_priority_live_foreach(event, ctx->bktq, ctx->evl, n_timers) {
+
+ /* first round, delete half of all 0delay timers */
+ if (j == 0) {
+
+ /* this tests propper removal from bucket queue */
+ for (i = 0; i < n_timers/4; ++i) {
+ if (&events[i] == event) {
+ continue;
+ }
+ test_timeout_destroy(ctx->evl, &events[i]);
+ --immediate_timers[(int) events[i].priority];
+ }
+
+ /* check priority cronology */
+ TEST_CHECK(event->priority >= priority_cronology);
+ priority_cronology = event->priority;
+
+ /* delete actual event */
+ test_timeout_destroy(ctx->evl, event);
+ --immediate_timers[(int) event->priority];
+ TEST_CHECK(event < &events[n_timers/2]);
+ TEST_MSG("Processed delayed timer first. Should process immediate timer first");
+
+ /* delay for the delayed timers to register */
+ usleep(2500000); /* 2.5 seconds */
+ ++j;
+ continue;
+ }
+
+ /* validate fifo nature. inspect cross from no-timeout to timeout event */
+ /* check event fifo cronology */
+ /* (priority A) [immediate timer] -> [delay timer]: check all immediate timers processed */
+ if (event_cronology[(int) event->priority] < &events[n_timers / 2]
+ && event >= &events[n_timers / 2]) {
+ /* verify that all of the immediate_timers in priority have been removed */
+ immediate_timer_count = 0;
+ for (i = 0; i < EVENT_LOOP_TEST_PRIORITIES; ++i) {
+ immediate_timer_count += immediate_timers[i];
+ }
+ TEST_CHECK(immediate_timers[(int) event->priority] == 0);
+ TEST_MSG("immediate timer events are not all processed before delayed timer events for priority %i", event->priority);
+ }
+ /* check for non fifo behavior */
+ /* (priority A) [delay timer] -> [immediate timer]: disallow */
+ if (!TEST_CHECK(!(event_cronology[(int) event->priority] >= &events[n_timers / 2]
+ && event < &events[n_timers / 2]))) {
+ TEST_MSG("Non fifo behavior within priority. Delayed event processed before immediate event.");
+ }
+ event_cronology[(int) event->priority] = event;
+
+ /* check priority cronology */
+ TEST_CHECK(event->priority >= priority_cronology);
+ TEST_MSG("Priority event loop processed events out of order.");
+ priority_cronology = event->priority;
+
+ /* verify none of the deleted timers are processed */
+ TEST_CHECK(event >= &events[n_timers / 4]);
+ TEST_MSG("Processed a deleted timer. Delete performed after event is registered in the event loop bucket queue.");
+
+ /* update records */
+ test_timeout_destroy(ctx->evl, event);
+ if (event < &events[n_timers/2]) {
+ /* immediate timer */
+ --immediate_timers[(int) event->priority];
+ }
+ else {
+ /* delayed timer */
+ --delayed_timers[(int) event->priority];
+ }
+ ++j;
+ }
+ } while (0);
+
+ /* validate all timers processed */
+ for (i = 0; i < EVENT_LOOP_TEST_PRIORITIES; ++i) {
+ TEST_CHECK(immediate_timers[i] == 0);
+ TEST_MSG("Not all immediate timers processed");
+ }
+ for (i = 0; i < EVENT_LOOP_TEST_PRIORITIES; ++i) {
+ TEST_CHECK(delayed_timers[i] == 0);
+ TEST_MSG("Not all delayed timers processed");
+ }
+
+ evl_context_destroy(ctx);
+#ifdef _WIN32
+ WSACleanup();
+#endif
+}
+
+TEST_LIST = {
+ {"test_simple_timeout_1000ms", test_simple_timeout_1000ms},
+ {"test_non_blocking_and_blocking_timeout", test_non_blocking_and_blocking_timeout},
+ {"test_infinite_wait", test_infinite_wait},
+ {"event_loop_stress_priority_add_delete", event_loop_stress_priority_add_delete},
+ { 0 }
+};