summaryrefslogtreecommitdiffstats
path: root/drivers/md/dm-vdo/funnel-queue.c
blob: a63b2f2bfd7de9e6e6d5298e440ba37db5a30823 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "funnel-queue.h"

#include "cpu.h"
#include "memory-alloc.h"
#include "permassert.h"

int vdo_make_funnel_queue(struct funnel_queue **queue_ptr)
{
	int result;
	struct funnel_queue *queue;

	result = vdo_allocate(1, struct funnel_queue, "funnel queue", &queue);
	if (result != VDO_SUCCESS)
		return result;

	/*
	 * Initialize the stub entry and put it in the queue, establishing the invariant that
	 * queue->newest and queue->oldest are never null.
	 */
	queue->stub.next = NULL;
	queue->newest = &queue->stub;
	queue->oldest = &queue->stub;

	*queue_ptr = queue;
	return VDO_SUCCESS;
}

void vdo_free_funnel_queue(struct funnel_queue *queue)
{
	vdo_free(queue);
}

static struct funnel_queue_entry *get_oldest(struct funnel_queue *queue)
{
	/*
	 * Barrier requirements: We need a read barrier between reading a "next" field pointer
	 * value and reading anything it points to. There's an accompanying barrier in
	 * vdo_funnel_queue_put() between its caller setting up the entry and making it visible.
	 */
	struct funnel_queue_entry *oldest = queue->oldest;
	struct funnel_queue_entry *next = READ_ONCE(oldest->next);

	if (oldest == &queue->stub) {
		/*
		 * When the oldest entry is the stub and it has no successor, the queue is
		 * logically empty.
		 */
		if (next == NULL)
			return NULL;
		/*
		 * The stub entry has a successor, so the stub can be dequeued and ignored without
		 * breaking the queue invariants.
		 */
		oldest = next;
		queue->oldest = oldest;
		next = READ_ONCE(oldest->next);
	}

	/*
	 * We have a non-stub candidate to dequeue. If it lacks a successor, we'll need to put the
	 * stub entry back on the queue first.
	 */
	if (next == NULL) {
		struct funnel_queue_entry *newest = READ_ONCE(queue->newest);

		if (oldest != newest) {
			/*
			 * Another thread has already swung queue->newest atomically, but not yet
			 * assigned previous->next. The queue is really still empty.
			 */
			return NULL;
		}

		/*
		 * Put the stub entry back on the queue, ensuring a successor will eventually be
		 * seen.
		 */
		vdo_funnel_queue_put(queue, &queue->stub);

		/* Check again for a successor. */
		next = READ_ONCE(oldest->next);
		if (next == NULL) {
			/*
			 * We lost a race with a producer who swapped queue->newest before we did,
			 * but who hasn't yet updated previous->next. Try again later.
			 */
			return NULL;
		}
	}

	return oldest;
}

/*
 * Poll a queue, removing the oldest entry if the queue is not empty. This function must only be
 * called from a single consumer thread.
 */
struct funnel_queue_entry *vdo_funnel_queue_poll(struct funnel_queue *queue)
{
	struct funnel_queue_entry *oldest = get_oldest(queue);

	if (oldest == NULL)
		return oldest;

	/*
	 * Dequeue the oldest entry and return it. Only one consumer thread may call this function,
	 * so no locking, atomic operations, or fences are needed; queue->oldest is owned by the
	 * consumer and oldest->next is never used by a producer thread after it is swung from NULL
	 * to non-NULL.
	 */
	queue->oldest = READ_ONCE(oldest->next);
	/*
	 * Make sure the caller sees the proper stored data for this entry. Since we've already
	 * fetched the entry pointer we stored in "queue->oldest", this also ensures that on entry
	 * to the next call we'll properly see the dependent data.
	 */
	smp_rmb();
	/*
	 * If "oldest" is a very light-weight work item, we'll be looking for the next one very
	 * soon, so prefetch it now.
	 */
	uds_prefetch_address(queue->oldest, true);
	WRITE_ONCE(oldest->next, NULL);
	return oldest;
}

/*
 * Check whether the funnel queue is empty or not. If the queue is in a transition state with one
 * or more entries being added such that the list view is incomplete, this function will report the
 * queue as empty.
 */
bool vdo_is_funnel_queue_empty(struct funnel_queue *queue)
{
	return get_oldest(queue) == NULL;
}

/*
 * Check whether the funnel queue is idle or not. If the queue has entries available to be
 * retrieved, it is not idle. If the queue is in a transition state with one or more entries being
 * added such that the list view is incomplete, it may not be possible to retrieve an entry with
 * the vdo_funnel_queue_poll() function, but the queue will not be considered idle.
 */
bool vdo_is_funnel_queue_idle(struct funnel_queue *queue)
{
	/*
	 * Oldest is not the stub, so there's another entry, though if next is NULL we can't
	 * retrieve it yet.
	 */
	if (queue->oldest != &queue->stub)
		return false;

	/*
	 * Oldest is the stub, but newest has been updated by _put(); either there's another,
	 * retrievable entry in the list, or the list is officially empty but in the intermediate
	 * state of having an entry added.
	 *
	 * Whether anything is retrievable depends on whether stub.next has been updated and become
	 * visible to us, but for idleness we don't care. And due to memory ordering in _put(), the
	 * update to newest would be visible to us at the same time or sooner.
	 */
	if (READ_ONCE(queue->newest) != &queue->stub)
		return false;

	return true;
}