summaryrefslogtreecommitdiffstats
path: root/deps/jemalloc/include/jemalloc/internal/mpsc_queue.h
blob: 316ea9b16eddb73e80c683ef9325a6b1bbb9190d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#ifndef JEMALLOC_INTERNAL_MPSC_QUEUE_H
#define JEMALLOC_INTERNAL_MPSC_QUEUE_H

#include "jemalloc/internal/atomic.h"

/*
 * A concurrent implementation of a multi-producer, single-consumer queue.  It
 * supports three concurrent operations:
 * - Push
 * - Push batch
 * - Pop batch
 *
 * These operations are all lock-free.
 *
 * The implementation is the simple two-stack queue built on a Treiber stack.
 * It's not terribly efficient, but this isn't expected to go into anywhere with
 * hot code.  In fact, we don't really even need queue semantics in any
 * anticipated use cases; we could get away with just the stack.  But this way
 * lets us frame the API in terms of the existing list types, which is a nice
 * convenience.  We can save on cache misses by introducing our own (parallel)
 * single-linked list type here, and dropping FIFO semantics, if we need this to
 * get faster.  Since we're currently providing queue semantics though, we use
 * the prev field in the link rather than the next field for Treiber-stack
 * linkage, so that we can preserve order for batch-pushed lists (recall that
 * the two-stack trick reverses order in the lock-free first stack).
 */

/*
 * Expands to the (anonymous) struct type of a queue.  The only state is the
 * atomic pointer tail: the generated functions store NULL in it when nothing
 * is pending, and otherwise it points at the most recently pushed element.
 * a_type is not referenced by the expansion.
 */
#define mpsc_queue(a_type)						\
struct {								\
	atomic_p_t tail;						\
}

/*
 * Emits prototypes for the four queue functions.
 *
 * a_attr:       tokens placed in front of every declaration.
 * a_prefix:     token pasted in front of new / push_batch / push / pop_batch
 *               to form the function names.
 * a_queue_type: the queue type the functions operate on.
 * a_type:       the element type.
 * a_list_type:  the list type used for batch push and batch pop.
 */
#define mpsc_queue_proto(a_attr, a_prefix, a_queue_type, a_type,	\
    a_list_type)							\
/* Initialize a queue. */						\
a_attr void								\
a_prefix##new(a_queue_type *queue);					\
/* Insert all items in src into the queue, clearing src. */		\
a_attr void								\
a_prefix##push_batch(a_queue_type *queue, a_list_type *src);		\
/* Insert node into the queue. */					\
a_attr void								\
a_prefix##push(a_queue_type *queue, a_type *node);			\
/*									\
 * Pop all items in the queue into the list at dst.  dst should already	\
 * be initialized (and may contain existing items, which then remain	\
 * in dst).  The queue is single-consumer, so only one thread may be	\
 * popping at a time.							\
 */									\
a_attr void								\
a_prefix##pop_batch(a_queue_type *queue, a_list_type *dst);

/*
 * Emits the definitions of the four queue functions declared by
 * mpsc_queue_proto.  The first five parameters have the same meaning as
 * there; a_link names the ql link field embedded in a_type, whose qre_prev
 * member doubles as the stack linkage while an element sits in the queue.
 */
#define mpsc_queue_gen(a_attr, a_prefix, a_queue_type, a_type,		\
    a_list_type, a_link)						\
/* Initialize a queue to the empty state (tail == NULL). */		\
a_attr void								\
a_prefix##new(a_queue_type *queue) {					\
	atomic_store_p(&queue->tail, NULL, ATOMIC_RELAXED);		\
}									\
/*									\
 * Atomically publish every element of src onto the queue and reset	\
 * src to an empty list.  An empty src is a no-op.			\
 */									\
a_attr void								\
a_prefix##push_batch(a_queue_type *queue, a_list_type *src) {		\
	a_type *first = ql_first(src);					\
	if (first == NULL) {						\
		/*							\
		 * Empty batch: there is nothing to publish, and no	\
		 * first element whose link could be spliced below.	\
		 */							\
		return;							\
	}								\
	a_type *last = ql_last(src, a_link);				\
	/*								\
	 * Reuse the ql list prev field as the Treiber stack next	\
	 * field: the batch's first element points at the old stack	\
	 * top, and the batch's last element becomes the new top.	\
	 */								\
	void *cur_tail = atomic_load_p(&queue->tail, ATOMIC_RELAXED);	\
	do {								\
		/*							\
		 * Note that this breaks the queue ring structure;	\
		 * it's not a ring any more!				\
		 */							\
		first->a_link.qre_prev = cur_tail;			\
		/*							\
		 * Note: the upcoming CAS doesn't need acquire		\
		 * semantics; every push only needs to synchronize	\
		 * with the next pop, which we get from the release	\
		 * sequence rules.  On failure cur_tail is refreshed	\
		 * with the current tail and we relink and retry.	\
		 */							\
	} while (!atomic_compare_exchange_weak_p(&queue->tail,		\
	    &cur_tail, last, ATOMIC_RELEASE, ATOMIC_RELAXED));		\
	ql_new(src);							\
}									\
/* Publish the single element node; implemented as a one-item batch. */	\
a_attr void								\
a_prefix##push(a_queue_type *queue, a_type *node) {			\
	ql_elm_new(node, a_link);					\
	a_list_type list;						\
	ql_new(&list);							\
	ql_head_insert(&list, node, a_link);				\
	a_prefix##push_batch(queue, &list);				\
}									\
/*									\
 * Detach everything currently in the queue and concatenate it onto	\
 * dst.  Must only be called by the single consumer.			\
 */									\
a_attr void								\
a_prefix##pop_batch(a_queue_type *queue, a_list_type *dst) {		\
	a_type *tail = atomic_load_p(&queue->tail, ATOMIC_RELAXED);	\
	if (tail == NULL) {						\
		/*							\
		 * In the common special case where there are no	\
		 * pending elements, bail early without a costly RMW.	\
		 */							\
		return;							\
	}								\
	tail = atomic_exchange_p(&queue->tail, NULL, ATOMIC_ACQUIRE);	\
	/*								\
	 * It's a single-consumer queue, so if tail started non-NULL,	\
	 * it'd better stay non-NULL.					\
	 */								\
	assert(tail != NULL);						\
	/*								\
	 * We iterate through the stack and fix up the link structure	\
	 * (stack insertion broke the list requirement that the list	\
	 * be circularly linked).  It's just as efficient at this	\
	 * point to make the queue a "real" queue, so we restore FIFO	\
	 * order as well.						\
	 * If this ever gets to be a hot spot, we can omit this fixup	\
	 * and make the queue a bag (i.e. not necessarily ordered), but	\
	 * that would mean jettisoning the existing list API as the	\
	 * batch pushing/popping interface.				\
	 */								\
	a_list_type reversed;						\
	ql_new(&reversed);						\
	while (tail != NULL) {						\
		/*							\
		 * Pop an item off the stack, prepend it onto the list	\
		 * (reversing the order).  Recall that we use the	\
		 * list prev field as the Treiber stack next field to	\
		 * preserve order of batch-pushed items when reversed.	\
		 * The link must be read before ql_elm_new() resets it.	\
		 */							\
		a_type *next = tail->a_link.qre_prev;			\
		ql_elm_new(tail, a_link);				\
		ql_head_insert(&reversed, tail, a_link);		\
		tail = next;						\
	}								\
	ql_concat(dst, &reversed, a_link);				\
}

#endif /* JEMALLOC_INTERNAL_MPSC_QUEUE_H */