diff options
Diffstat (limited to 'drivers/md/dm-vdo/indexer/funnel-requestqueue.c')
-rw-r--r-- | drivers/md/dm-vdo/indexer/funnel-requestqueue.c | 279 |
1 files changed, 279 insertions, 0 deletions
diff --git a/drivers/md/dm-vdo/indexer/funnel-requestqueue.c b/drivers/md/dm-vdo/indexer/funnel-requestqueue.c new file mode 100644 index 0000000000..1a5735375d --- /dev/null +++ b/drivers/md/dm-vdo/indexer/funnel-requestqueue.c @@ -0,0 +1,279 @@ +// SPDX-License-Identifier: GPL-2.0-only +/* + * Copyright 2023 Red Hat + */ + +#include "funnel-requestqueue.h" + +#include <linux/atomic.h> +#include <linux/compiler.h> +#include <linux/wait.h> + +#include "funnel-queue.h" +#include "logger.h" +#include "memory-alloc.h" +#include "thread-utils.h" + +/* + * This queue will attempt to handle requests in reasonably sized batches instead of reacting + * immediately to each new request. The wait time between batches is dynamically adjusted up or + * down to try to balance responsiveness against wasted thread run time. + * + * If the wait time becomes long enough, the queue will become dormant and must be explicitly + * awoken when a new request is enqueued. The enqueue operation updates "newest" in the funnel + * queue via xchg (which is a memory barrier), and later checks "dormant" to decide whether to do a + * wakeup of the worker thread. + * + * When deciding to go to sleep, the worker thread sets "dormant" and then examines "newest" to + * decide if the funnel queue is idle. In dormant mode, the last examination of "newest" before + * going to sleep is done inside the wait_event_interruptible() macro, after a point where one or + * more memory barriers have been issued. (Preparing to sleep uses spin locks.) Even if the funnel + * queue's "next" field update isn't visible yet to make the entry accessible, its existence will + * kick the worker thread out of dormant mode and back into timer-based mode. + * + * Unbatched requests are used to communicate between different zone threads and will also cause + * the queue to awaken immediately. + */ + +enum { + NANOSECOND = 1, + MICROSECOND = 1000 * NANOSECOND, + MILLISECOND = 1000 * MICROSECOND, + DEFAULT_WAIT_TIME = 20 * MICROSECOND, + MINIMUM_WAIT_TIME = DEFAULT_WAIT_TIME / 2, + MAXIMUM_WAIT_TIME = MILLISECOND, + MINIMUM_BATCH = 32, + MAXIMUM_BATCH = 64, +}; + +struct uds_request_queue { + /* Wait queue for synchronizing producers and consumer */ + struct wait_queue_head wait_head; + /* Function to process a request */ + uds_request_queue_processor_fn processor; + /* Queue of new incoming requests */ + struct funnel_queue *main_queue; + /* Queue of old requests to retry */ + struct funnel_queue *retry_queue; + /* The thread id of the worker thread */ + struct thread *thread; + /* True if the worker was started */ + bool started; + /* When true, requests can be enqueued */ + bool running; + /* A flag set when the worker is waiting without a timeout */ + atomic_t dormant; +}; + +static inline struct uds_request *poll_queues(struct uds_request_queue *queue) +{ + struct funnel_queue_entry *entry; + + entry = vdo_funnel_queue_poll(queue->retry_queue); + if (entry != NULL) + return container_of(entry, struct uds_request, queue_link); + + entry = vdo_funnel_queue_poll(queue->main_queue); + if (entry != NULL) + return container_of(entry, struct uds_request, queue_link); + + return NULL; +} + +static inline bool are_queues_idle(struct uds_request_queue *queue) +{ + return vdo_is_funnel_queue_idle(queue->retry_queue) && + vdo_is_funnel_queue_idle(queue->main_queue); +} + +/* + * Determine if there is a next request to process, and return it if there is. Also return flags + * indicating whether the worker thread can sleep (for the use of wait_event() macros) and whether + * the thread did sleep before returning a new request. + */ +static inline bool dequeue_request(struct uds_request_queue *queue, + struct uds_request **request_ptr, bool *waited_ptr) +{ + struct uds_request *request = poll_queues(queue); + + if (request != NULL) { + *request_ptr = request; + return true; + } + + if (!READ_ONCE(queue->running)) { + /* Wake the worker thread so it can exit. */ + *request_ptr = NULL; + return true; + } + + *request_ptr = NULL; + *waited_ptr = true; + return false; +} + +static void wait_for_request(struct uds_request_queue *queue, bool dormant, + unsigned long timeout, struct uds_request **request, + bool *waited) +{ + if (dormant) { + wait_event_interruptible(queue->wait_head, + (dequeue_request(queue, request, waited) || + !are_queues_idle(queue))); + return; + } + + wait_event_interruptible_hrtimeout(queue->wait_head, + dequeue_request(queue, request, waited), + ns_to_ktime(timeout)); +} + +static void request_queue_worker(void *arg) +{ + struct uds_request_queue *queue = arg; + struct uds_request *request = NULL; + unsigned long time_batch = DEFAULT_WAIT_TIME; + bool dormant = atomic_read(&queue->dormant); + bool waited = false; + long current_batch = 0; + + for (;;) { + wait_for_request(queue, dormant, time_batch, &request, &waited); + if (likely(request != NULL)) { + current_batch++; + queue->processor(request); + } else if (!READ_ONCE(queue->running)) { + break; + } + + if (dormant) { + /* + * The queue has been roused from dormancy. Clear the flag so enqueuers can + * stop broadcasting. No fence is needed for this transition. + */ + atomic_set(&queue->dormant, false); + dormant = false; + time_batch = DEFAULT_WAIT_TIME; + } else if (waited) { + /* + * We waited for this request to show up. Adjust the wait time to smooth + * out the batch size. + */ + if (current_batch < MINIMUM_BATCH) { + /* + * If the last batch of requests was too small, increase the wait + * time. + */ + time_batch += time_batch / 4; + if (time_batch >= MAXIMUM_WAIT_TIME) { + atomic_set(&queue->dormant, true); + dormant = true; + } + } else if (current_batch > MAXIMUM_BATCH) { + /* + * If the last batch of requests was too large, decrease the wait + * time. + */ + time_batch -= time_batch / 4; + if (time_batch < MINIMUM_WAIT_TIME) + time_batch = MINIMUM_WAIT_TIME; + } + current_batch = 0; + } + } + + /* + * Ensure that we process any remaining requests that were enqueued before trying to shut + * down. The corresponding write barrier is in uds_request_queue_finish(). + */ + smp_rmb(); + while ((request = poll_queues(queue)) != NULL) + queue->processor(request); +} + +int uds_make_request_queue(const char *queue_name, + uds_request_queue_processor_fn processor, + struct uds_request_queue **queue_ptr) +{ + int result; + struct uds_request_queue *queue; + + result = vdo_allocate(1, struct uds_request_queue, __func__, &queue); + if (result != VDO_SUCCESS) + return result; + + queue->processor = processor; + queue->running = true; + atomic_set(&queue->dormant, false); + init_waitqueue_head(&queue->wait_head); + + result = vdo_make_funnel_queue(&queue->main_queue); + if (result != VDO_SUCCESS) { + uds_request_queue_finish(queue); + return result; + } + + result = vdo_make_funnel_queue(&queue->retry_queue); + if (result != VDO_SUCCESS) { + uds_request_queue_finish(queue); + return result; + } + + result = vdo_create_thread(request_queue_worker, queue, queue_name, + &queue->thread); + if (result != VDO_SUCCESS) { + uds_request_queue_finish(queue); + return result; + } + + queue->started = true; + *queue_ptr = queue; + return UDS_SUCCESS; +} + +static inline void wake_up_worker(struct uds_request_queue *queue) +{ + if (wq_has_sleeper(&queue->wait_head)) + wake_up(&queue->wait_head); +} + +void uds_request_queue_enqueue(struct uds_request_queue *queue, + struct uds_request *request) +{ + struct funnel_queue *sub_queue; + bool unbatched = request->unbatched; + + sub_queue = request->requeued ? queue->retry_queue : queue->main_queue; + vdo_funnel_queue_put(sub_queue, &request->queue_link); + + /* + * We must wake the worker thread when it is dormant. A read fence isn't needed here since + * we know the queue operation acts as one. + */ + if (atomic_read(&queue->dormant) || unbatched) + wake_up_worker(queue); +} + +void uds_request_queue_finish(struct uds_request_queue *queue) +{ + if (queue == NULL) + return; + + /* + * This memory barrier ensures that any requests we queued will be seen. The point is that + * when dequeue_request() sees the following update to the running flag, it will also be + * able to see any change we made to a next field in the funnel queue entry. The + * corresponding read barrier is in request_queue_worker(). + */ + smp_wmb(); + WRITE_ONCE(queue->running, false); + + if (queue->started) { + wake_up_worker(queue); + vdo_join_threads(queue->thread); + } + + vdo_free_funnel_queue(queue->main_queue); + vdo_free_funnel_queue(queue->retry_queue); + vdo_free(queue); +} |