#include "test/jemalloc_test.h"

#include "jemalloc/internal/mpsc_queue.h"

/*
 * Unit tests for the mpsc_queue macros: single-threaded push / push_batch /
 * pop_batch ordering checks, plus a stress test with several pusher threads
 * and a single popper thread.
 */

typedef struct elem_s elem_t;
typedef ql_head(elem_t) elem_list_t;
typedef mpsc_queue(elem_t) elem_mpsc_queue_t;

/* Queue element; (thread, idx) identify who pushed it and in what order. */
struct elem_s {
	int thread;
	int idx;
	ql_elm(elem_t) link;
};

/* Include both proto and gen to make sure they match up. */
mpsc_queue_proto(static, elem_mpsc_queue_, elem_mpsc_queue_t, elem_t,
    elem_list_t);
mpsc_queue_gen(static, elem_mpsc_queue_, elem_mpsc_queue_t, elem_t,
    elem_list_t, link);

/*
 * Initialize elems[0..nelems) so that each element records the given thread
 * id and its own position in the array, and has a fresh list linkage.
 */
static void
init_elems_simple(elem_t *elems, int nelems, int thread) {
	for (int i = 0; i < nelems; i++) {
		elems[i].thread = thread;
		elems[i].idx = i;
		ql_elm_new(&elems[i], link);
	}
}

/*
 * Verify that list holds exactly nelems elements, all tagged with the given
 * thread id, with idx values 0, 1, ..., nelems - 1 in list order.
 */
static void
check_elems_simple(elem_list_t *list, int nelems, int thread) {
	elem_t *elem;
	int next_idx = 0;
	ql_foreach(elem, list, link) {
		expect_d_lt(next_idx, nelems, "Too many list items");
		expect_d_eq(thread, elem->thread, "");
		expect_d_eq(next_idx, elem->idx, "List out of order");
		next_idx++;
	}
	/* A list that lost elements must fail too, not just one that grew. */
	expect_d_eq(nelems, next_idx, "Wrong number of list items");
}

/* Basic pop_batch behavior for each empty/nonempty queue/list combination. */
TEST_BEGIN(test_simple) {
	enum {NELEMS = 10};
	elem_t elems[NELEMS];
	elem_list_t list;
	elem_mpsc_queue_t queue;

	/* Pop empty queue onto empty list -> empty list */
	ql_new(&list);
	elem_mpsc_queue_new(&queue);
	elem_mpsc_queue_pop_batch(&queue, &list);
	expect_true(ql_empty(&list), "");

	/* Pop empty queue onto nonempty list -> list unchanged */
	ql_new(&list);
	elem_mpsc_queue_new(&queue);
	init_elems_simple(elems, NELEMS, 0);
	for (int i = 0; i < NELEMS; i++) {
		ql_tail_insert(&list, &elems[i], link);
	}
	elem_mpsc_queue_pop_batch(&queue, &list);
	check_elems_simple(&list, NELEMS, 0);

	/* Pop nonempty queue onto empty list -> list takes queue contents */
	ql_new(&list);
	elem_mpsc_queue_new(&queue);
	init_elems_simple(elems, NELEMS, 0);
	for (int i = 0; i < NELEMS; i++) {
		elem_mpsc_queue_push(&queue, &elems[i]);
	}
	elem_mpsc_queue_pop_batch(&queue, &list);
	check_elems_simple(&list, NELEMS, 0);

	/* Pop nonempty queue onto nonempty list -> list gains queue contents */
	ql_new(&list);
	elem_mpsc_queue_new(&queue);
	init_elems_simple(elems, NELEMS, 0);
	for (int i = 0; i < NELEMS / 2; i++) {
		ql_tail_insert(&list, &elems[i], link);
	}
	for (int i = NELEMS / 2; i < NELEMS; i++) {
		elem_mpsc_queue_push(&queue, &elems[i]);
	}
	elem_mpsc_queue_pop_batch(&queue, &list);
	check_elems_simple(&list, NELEMS, 0);
}
TEST_END

/*
 * Interleave single pushes with batch pushes of growing size, then pop
 * everything at once and check that overall push order was preserved.
 */
TEST_BEGIN(test_push_single_or_batch) {
	enum {
		BATCH_MAX = 10,
		/*
		 * We'll push i items one-at-a-time, then i items as a batch,
		 * then i items as a batch again, as i ranges from 1 to
		 * BATCH_MAX - 1.  So we need 3 times the sum of the numbers
		 * from 1 to BATCH_MAX - 1 elements total.
		 */
		NELEMS = 3 * BATCH_MAX * (BATCH_MAX - 1) / 2
	};
	elem_t elems[NELEMS];
	init_elems_simple(elems, NELEMS, 0);
	elem_list_t list;
	ql_new(&list);
	elem_mpsc_queue_t queue;
	elem_mpsc_queue_new(&queue);
	int next_idx = 0;
	/* Bound must stay in sync with the NELEMS formula above. */
	for (int i = 1; i < BATCH_MAX; i++) {
		/* Push i items 1 at a time. */
		for (int j = 0; j < i; j++) {
			elem_mpsc_queue_push(&queue, &elems[next_idx]);
			next_idx++;
		}
		/* Push i items in batch. */
		for (int j = 0; j < i; j++) {
			ql_tail_insert(&list, &elems[next_idx], link);
			next_idx++;
		}
		elem_mpsc_queue_push_batch(&queue, &list);
		expect_true(ql_empty(&list), "Batch push should empty source");
		/*
		 * Push i items in batch, again.  This tests two batches
		 * proceeding one after the other.
		 */
		for (int j = 0; j < i; j++) {
			ql_tail_insert(&list, &elems[next_idx], link);
			next_idx++;
		}
		elem_mpsc_queue_push_batch(&queue, &list);
		expect_true(ql_empty(&list), "Batch push should empty source");
	}
	expect_d_eq(NELEMS, next_idx, "Miscomputed number of elems to push.");

	expect_true(ql_empty(&list), "");
	elem_mpsc_queue_pop_batch(&queue, &list);
	check_elems_simple(&list, NELEMS, 0);
}
TEST_END

/*
 * Mix pushes and pops: two quarters pushed, popped, then two more quarters
 * pushed and popped onto the same (already nonempty) result list.
 */
TEST_BEGIN(test_multi_op) {
	enum {NELEMS = 20};
	elem_t elems[NELEMS];
	init_elems_simple(elems, NELEMS, 0);
	elem_list_t push_list;
	ql_new(&push_list);
	elem_list_t result_list;
	ql_new(&result_list);
	elem_mpsc_queue_t queue;
	elem_mpsc_queue_new(&queue);

	int next_idx = 0;
	/* Push first quarter 1-at-a-time. */
	for (int i = 0; i < NELEMS / 4; i++) {
		elem_mpsc_queue_push(&queue, &elems[next_idx]);
		next_idx++;
	}
	/* Push second quarter in batch. */
	for (int i = NELEMS / 4; i < NELEMS / 2; i++) {
		ql_tail_insert(&push_list, &elems[next_idx], link);
		next_idx++;
	}
	elem_mpsc_queue_push_batch(&queue, &push_list);
	/* Batch pop all pushed elements. */
	elem_mpsc_queue_pop_batch(&queue, &result_list);
	/* Push third quarter in batch. */
	for (int i = NELEMS / 2; i < 3 * NELEMS / 4; i++) {
		ql_tail_insert(&push_list, &elems[next_idx], link);
		next_idx++;
	}
	elem_mpsc_queue_push_batch(&queue, &push_list);
	/* Push last quarter one-at-a-time. */
	for (int i = 3 * NELEMS / 4; i < NELEMS; i++) {
		elem_mpsc_queue_push(&queue, &elems[next_idx]);
		next_idx++;
	}
	/* Pop them again.  Order of existing list should be preserved. */
	elem_mpsc_queue_pop_batch(&queue, &result_list);

	check_elems_simple(&result_list, NELEMS, 0);
}
TEST_END

/* Per-pusher-thread arguments: the shared queue and this thread's elements. */
typedef struct pusher_arg_s pusher_arg_t;
struct pusher_arg_s {
	elem_mpsc_queue_t *queue;
	int thread;
	elem_t *elems;
	int nelems;
};

/*
 * Popper-thread arguments.  pusher_counts[t] is the number of elements from
 * pusher t that the popper has seen so far; it has npushers entries.
 */
typedef struct popper_arg_s popper_arg_t;
struct popper_arg_s {
	elem_mpsc_queue_t *queue;
	int npushers;
	int nelems_per_pusher;
	int *pusher_counts;
};

/*
 * Pusher thread body: pushes arg->elems[0..nelems) in index order,
 * alternating between a batch push of up to PUSH_CHUNK items and up to
 * PUSH_CHUNK single pushes.  Always returns NULL.
 */
static void *
thd_pusher(void *void_arg) {
	enum {PUSH_CHUNK = 10};
	pusher_arg_t *arg = (pusher_arg_t *)void_arg;
	int next_idx = 0;
	while (next_idx < arg->nelems) {
		/* Push 10 items in batch. */
		elem_list_t list;
		ql_new(&list);
		int limit = next_idx + PUSH_CHUNK;
		while (next_idx < arg->nelems && next_idx < limit) {
			ql_tail_insert(&list, &arg->elems[next_idx], link);
			next_idx++;
		}
		elem_mpsc_queue_push_batch(arg->queue, &list);
		/* Push 10 items one-at-a-time. */
		limit = next_idx + PUSH_CHUNK;
		while (next_idx < arg->nelems && next_idx < limit) {
			elem_mpsc_queue_push(arg->queue, &arg->elems[next_idx]);
			next_idx++;
		}
	}
	return NULL;
}

/*
 * Popper thread body: repeatedly pops whatever is in the queue and checks
 * that each pusher's elements arrive in increasing idx order.  Loops until
 * nelems_per_pusher elements have been seen from every pusher.  Always
 * returns NULL.
 */
static void *
thd_popper(void *void_arg) {
	popper_arg_t *arg = (popper_arg_t *)void_arg;
	int done_pushers = 0;
	while (done_pushers < arg->npushers) {
		elem_list_t list;
		ql_new(&list);
		elem_mpsc_queue_pop_batch(arg->queue, &list);
		elem_t *elem;
		ql_foreach(elem, &list, link) {
			int thread = elem->thread;
			int idx = elem->idx;
			/* The next element from a pusher must be its next idx. */
			expect_d_eq(arg->pusher_counts[thread], idx,
			    "Thread's pushes reordered");
			arg->pusher_counts[thread]++;
			if (arg->pusher_counts[thread]
			    == arg->nelems_per_pusher) {
				done_pushers++;
			}
		}
	}
	return NULL;
}

/*
 * Stress test: NPUSHERS threads each push NELEMS_PER_PUSHER elements while a
 * single popper thread drains the queue and checks per-pusher ordering.
 */
TEST_BEGIN(test_multiple_threads) {
	enum {
		NPUSHERS = 4,
		NELEMS_PER_PUSHER = 1000*1000,
	};
	thd_t pushers[NPUSHERS];
	pusher_arg_t pusher_arg[NPUSHERS];

	thd_t popper;
	popper_arg_t popper_arg;

	elem_mpsc_queue_t queue;
	elem_mpsc_queue_new(&queue);

	elem_t *elems = calloc(NPUSHERS * NELEMS_PER_PUSHER, sizeof(elem_t));
	/* Nothing below is meaningful without the backing array; bail out. */
	assert_ptr_not_null(elems, "Unexpected calloc() failure");
	/* Carve the array into one contiguous slice per pusher. */
	elem_t *elem_iter = elems;
	for (int i = 0; i < NPUSHERS; i++) {
		pusher_arg[i].queue = &queue;
		pusher_arg[i].thread = i;
		pusher_arg[i].elems = elem_iter;
		pusher_arg[i].nelems = NELEMS_PER_PUSHER;

		init_elems_simple(elem_iter, NELEMS_PER_PUSHER, i);
		elem_iter += NELEMS_PER_PUSHER;
	}
	popper_arg.queue = &queue;
	popper_arg.npushers = NPUSHERS;
	popper_arg.nelems_per_pusher = NELEMS_PER_PUSHER;
	int pusher_counts[NPUSHERS] = {0};
	popper_arg.pusher_counts = pusher_counts;

	thd_create(&popper, thd_popper, (void *)&popper_arg);
	for (int i = 0; i < NPUSHERS; i++) {
		thd_create(&pushers[i], thd_pusher, &pusher_arg[i]);
	}

	thd_join(popper, NULL);
	for (int i = 0; i < NPUSHERS; i++) {
		thd_join(pushers[i], NULL);
	}

	for (int i = 0; i < NPUSHERS; i++) {
		expect_d_eq(NELEMS_PER_PUSHER, pusher_counts[i], "");
	}

	free(elems);
}
TEST_END

int
main(void) {
	return test_no_reentrancy(
	    test_simple,
	    test_push_single_or_batch,
	    test_multi_op,
	    test_multiple_threads);
}