diff options
Diffstat (limited to 'fluent-bit/lib/jemalloc-5.3.0/test/unit/mpsc_queue.c')
-rw-r--r-- | fluent-bit/lib/jemalloc-5.3.0/test/unit/mpsc_queue.c | 304 |
1 files changed, 304 insertions, 0 deletions
diff --git a/fluent-bit/lib/jemalloc-5.3.0/test/unit/mpsc_queue.c b/fluent-bit/lib/jemalloc-5.3.0/test/unit/mpsc_queue.c new file mode 100644 index 00000000..895edf84 --- /dev/null +++ b/fluent-bit/lib/jemalloc-5.3.0/test/unit/mpsc_queue.c @@ -0,0 +1,304 @@ +#include "test/jemalloc_test.h" + +#include "jemalloc/internal/mpsc_queue.h" + +typedef struct elem_s elem_t; +typedef ql_head(elem_t) elem_list_t; +typedef mpsc_queue(elem_t) elem_mpsc_queue_t; +struct elem_s { + int thread; + int idx; + ql_elm(elem_t) link; +}; + +/* Include both proto and gen to make sure they match up. */ +mpsc_queue_proto(static, elem_mpsc_queue_, elem_mpsc_queue_t, elem_t, + elem_list_t); +mpsc_queue_gen(static, elem_mpsc_queue_, elem_mpsc_queue_t, elem_t, + elem_list_t, link); + +static void +init_elems_simple(elem_t *elems, int nelems, int thread) { + for (int i = 0; i < nelems; i++) { + elems[i].thread = thread; + elems[i].idx = i; + ql_elm_new(&elems[i], link); + } +} + +static void +check_elems_simple(elem_list_t *list, int nelems, int thread) { + elem_t *elem; + int next_idx = 0; + ql_foreach(elem, list, link) { + expect_d_lt(next_idx, nelems, "Too many list items"); + expect_d_eq(thread, elem->thread, ""); + expect_d_eq(next_idx, elem->idx, "List out of order"); + next_idx++; + } +} + +TEST_BEGIN(test_simple) { + enum {NELEMS = 10}; + elem_t elems[NELEMS]; + elem_list_t list; + elem_mpsc_queue_t queue; + + /* Pop empty queue onto empty list -> empty list */ + ql_new(&list); + elem_mpsc_queue_new(&queue); + elem_mpsc_queue_pop_batch(&queue, &list); + expect_true(ql_empty(&list), ""); + + /* Pop empty queue onto nonempty list -> list unchanged */ + ql_new(&list); + elem_mpsc_queue_new(&queue); + init_elems_simple(elems, NELEMS, 0); + for (int i = 0; i < NELEMS; i++) { + ql_tail_insert(&list, &elems[i], link); + } + elem_mpsc_queue_pop_batch(&queue, &list); + check_elems_simple(&list, NELEMS, 0); + + /* Pop nonempty queue onto empty list -> list takes queue contents */ + ql_new(&list); + elem_mpsc_queue_new(&queue); + init_elems_simple(elems, NELEMS, 0); + for (int i = 0; i < NELEMS; i++) { + elem_mpsc_queue_push(&queue, &elems[i]); + } + elem_mpsc_queue_pop_batch(&queue, &list); + check_elems_simple(&list, NELEMS, 0); + + /* Pop nonempty queue onto nonempty list -> list gains queue contents */ + ql_new(&list); + elem_mpsc_queue_new(&queue); + init_elems_simple(elems, NELEMS, 0); + for (int i = 0; i < NELEMS / 2; i++) { + ql_tail_insert(&list, &elems[i], link); + } + for (int i = NELEMS / 2; i < NELEMS; i++) { + elem_mpsc_queue_push(&queue, &elems[i]); + } + elem_mpsc_queue_pop_batch(&queue, &list); + check_elems_simple(&list, NELEMS, 0); + +} +TEST_END + +TEST_BEGIN(test_push_single_or_batch) { + enum { + BATCH_MAX = 10, + /* + * We'll push i items one-at-a-time, then i items as a batch, + * then i items as a batch again, as i ranges from 1 to + * BATCH_MAX. So we need 3 times the sum of the numbers from 1 + * to BATCH_MAX elements total. + */ + NELEMS = 3 * BATCH_MAX * (BATCH_MAX - 1) / 2 + }; + elem_t elems[NELEMS]; + init_elems_simple(elems, NELEMS, 0); + elem_list_t list; + ql_new(&list); + elem_mpsc_queue_t queue; + elem_mpsc_queue_new(&queue); + int next_idx = 0; + for (int i = 1; i < 10; i++) { + /* Push i items 1 at a time. */ + for (int j = 0; j < i; j++) { + elem_mpsc_queue_push(&queue, &elems[next_idx]); + next_idx++; + } + /* Push i items in batch. */ + for (int j = 0; j < i; j++) { + ql_tail_insert(&list, &elems[next_idx], link); + next_idx++; + } + elem_mpsc_queue_push_batch(&queue, &list); + expect_true(ql_empty(&list), "Batch push should empty source"); + /* + * Push i items in batch, again. This tests two batches + * proceeding one after the other. + */ + for (int j = 0; j < i; j++) { + ql_tail_insert(&list, &elems[next_idx], link); + next_idx++; + } + elem_mpsc_queue_push_batch(&queue, &list); + expect_true(ql_empty(&list), "Batch push should empty source"); + } + expect_d_eq(NELEMS, next_idx, "Miscomputed number of elems to push."); + + expect_true(ql_empty(&list), ""); + elem_mpsc_queue_pop_batch(&queue, &list); + check_elems_simple(&list, NELEMS, 0); +} +TEST_END + +TEST_BEGIN(test_multi_op) { + enum {NELEMS = 20}; + elem_t elems[NELEMS]; + init_elems_simple(elems, NELEMS, 0); + elem_list_t push_list; + ql_new(&push_list); + elem_list_t result_list; + ql_new(&result_list); + elem_mpsc_queue_t queue; + elem_mpsc_queue_new(&queue); + + int next_idx = 0; + /* Push first quarter 1-at-a-time. */ + for (int i = 0; i < NELEMS / 4; i++) { + elem_mpsc_queue_push(&queue, &elems[next_idx]); + next_idx++; + } + /* Push second quarter in batch. */ + for (int i = NELEMS / 4; i < NELEMS / 2; i++) { + ql_tail_insert(&push_list, &elems[next_idx], link); + next_idx++; + } + elem_mpsc_queue_push_batch(&queue, &push_list); + /* Batch pop all pushed elements. */ + elem_mpsc_queue_pop_batch(&queue, &result_list); + /* Push third quarter in batch. */ + for (int i = NELEMS / 2; i < 3 * NELEMS / 4; i++) { + ql_tail_insert(&push_list, &elems[next_idx], link); + next_idx++; + } + elem_mpsc_queue_push_batch(&queue, &push_list); + /* Push last quarter one-at-a-time. */ + for (int i = 3 * NELEMS / 4; i < NELEMS; i++) { + elem_mpsc_queue_push(&queue, &elems[next_idx]); + next_idx++; + } + /* Pop them again. Order of existing list should be preserved. */ + elem_mpsc_queue_pop_batch(&queue, &result_list); + + check_elems_simple(&result_list, NELEMS, 0); + +} +TEST_END + +typedef struct pusher_arg_s pusher_arg_t; +struct pusher_arg_s { + elem_mpsc_queue_t *queue; + int thread; + elem_t *elems; + int nelems; +}; + +typedef struct popper_arg_s popper_arg_t; +struct popper_arg_s { + elem_mpsc_queue_t *queue; + int npushers; + int nelems_per_pusher; + int *pusher_counts; +}; + +static void * +thd_pusher(void *void_arg) { + pusher_arg_t *arg = (pusher_arg_t *)void_arg; + int next_idx = 0; + while (next_idx < arg->nelems) { + /* Push 10 items in batch. */ + elem_list_t list; + ql_new(&list); + int limit = next_idx + 10; + while (next_idx < arg->nelems && next_idx < limit) { + ql_tail_insert(&list, &arg->elems[next_idx], link); + next_idx++; + } + elem_mpsc_queue_push_batch(arg->queue, &list); + /* Push 10 items one-at-a-time. */ + limit = next_idx + 10; + while (next_idx < arg->nelems && next_idx < limit) { + elem_mpsc_queue_push(arg->queue, &arg->elems[next_idx]); + next_idx++; + } + + } + return NULL; +} + +static void * +thd_popper(void *void_arg) { + popper_arg_t *arg = (popper_arg_t *)void_arg; + int done_pushers = 0; + while (done_pushers < arg->npushers) { + elem_list_t list; + ql_new(&list); + elem_mpsc_queue_pop_batch(arg->queue, &list); + elem_t *elem; + ql_foreach(elem, &list, link) { + int thread = elem->thread; + int idx = elem->idx; + expect_d_eq(arg->pusher_counts[thread], idx, + "Thread's pushes reordered"); + arg->pusher_counts[thread]++; + if (arg->pusher_counts[thread] + == arg->nelems_per_pusher) { + done_pushers++; + } + } + } + return NULL; +} + +TEST_BEGIN(test_multiple_threads) { + enum { + NPUSHERS = 4, + NELEMS_PER_PUSHER = 1000*1000, + }; + thd_t pushers[NPUSHERS]; + pusher_arg_t pusher_arg[NPUSHERS]; + + thd_t popper; + popper_arg_t popper_arg; + + elem_mpsc_queue_t queue; + elem_mpsc_queue_new(&queue); + + elem_t *elems = calloc(NPUSHERS * NELEMS_PER_PUSHER, sizeof(elem_t)); + elem_t *elem_iter = elems; + for (int i = 0; i < NPUSHERS; i++) { + pusher_arg[i].queue = &queue; + pusher_arg[i].thread = i; + pusher_arg[i].elems = elem_iter; + pusher_arg[i].nelems = NELEMS_PER_PUSHER; + + init_elems_simple(elem_iter, NELEMS_PER_PUSHER, i); + elem_iter += NELEMS_PER_PUSHER; + } + popper_arg.queue = &queue; + popper_arg.npushers = NPUSHERS; + popper_arg.nelems_per_pusher = NELEMS_PER_PUSHER; + int pusher_counts[NPUSHERS] = {0}; + popper_arg.pusher_counts = pusher_counts; + + thd_create(&popper, thd_popper, (void *)&popper_arg); + for (int i = 0; i < NPUSHERS; i++) { + thd_create(&pushers[i], thd_pusher, &pusher_arg[i]); + } + + thd_join(popper, NULL); + for (int i = 0; i < NPUSHERS; i++) { + thd_join(pushers[i], NULL); + } + + for (int i = 0; i < NPUSHERS; i++) { + expect_d_eq(NELEMS_PER_PUSHER, pusher_counts[i], ""); + } + + free(elems); +} +TEST_END + +int +main(void) { + return test_no_reentrancy( + test_simple, + test_push_single_or_batch, + test_multi_op, + test_multiple_threads); +} |