// SPDX-License-Identifier: GPL-2.0-only /* * Copyright 2023 Red Hat */ #include "funnel-requestqueue.h" #include #include #include #include "funnel-queue.h" #include "logger.h" #include "memory-alloc.h" #include "thread-utils.h" /* * This queue will attempt to handle requests in reasonably sized batches instead of reacting * immediately to each new request. The wait time between batches is dynamically adjusted up or * down to try to balance responsiveness against wasted thread run time. * * If the wait time becomes long enough, the queue will become dormant and must be explicitly * awoken when a new request is enqueued. The enqueue operation updates "newest" in the funnel * queue via xchg (which is a memory barrier), and later checks "dormant" to decide whether to do a * wakeup of the worker thread. * * When deciding to go to sleep, the worker thread sets "dormant" and then examines "newest" to * decide if the funnel queue is idle. In dormant mode, the last examination of "newest" before * going to sleep is done inside the wait_event_interruptible() macro, after a point where one or * more memory barriers have been issued. (Preparing to sleep uses spin locks.) Even if the funnel * queue's "next" field update isn't visible yet to make the entry accessible, its existence will * kick the worker thread out of dormant mode and back into timer-based mode. * * Unbatched requests are used to communicate between different zone threads and will also cause * the queue to awaken immediately. */ enum { NANOSECOND = 1, MICROSECOND = 1000 * NANOSECOND, MILLISECOND = 1000 * MICROSECOND, DEFAULT_WAIT_TIME = 20 * MICROSECOND, MINIMUM_WAIT_TIME = DEFAULT_WAIT_TIME / 2, MAXIMUM_WAIT_TIME = MILLISECOND, MINIMUM_BATCH = 32, MAXIMUM_BATCH = 64, }; struct uds_request_queue { /* Wait queue for synchronizing producers and consumer */ struct wait_queue_head wait_head; /* Function to process a request */ uds_request_queue_processor_fn processor; /* Queue of new incoming requests */ struct funnel_queue *main_queue; /* Queue of old requests to retry */ struct funnel_queue *retry_queue; /* The thread id of the worker thread */ struct thread *thread; /* True if the worker was started */ bool started; /* When true, requests can be enqueued */ bool running; /* A flag set when the worker is waiting without a timeout */ atomic_t dormant; }; static inline struct uds_request *poll_queues(struct uds_request_queue *queue) { struct funnel_queue_entry *entry; entry = vdo_funnel_queue_poll(queue->retry_queue); if (entry != NULL) return container_of(entry, struct uds_request, queue_link); entry = vdo_funnel_queue_poll(queue->main_queue); if (entry != NULL) return container_of(entry, struct uds_request, queue_link); return NULL; } static inline bool are_queues_idle(struct uds_request_queue *queue) { return vdo_is_funnel_queue_idle(queue->retry_queue) && vdo_is_funnel_queue_idle(queue->main_queue); } /* * Determine if there is a next request to process, and return it if there is. Also return flags * indicating whether the worker thread can sleep (for the use of wait_event() macros) and whether * the thread did sleep before returning a new request. */ static inline bool dequeue_request(struct uds_request_queue *queue, struct uds_request **request_ptr, bool *waited_ptr) { struct uds_request *request = poll_queues(queue); if (request != NULL) { *request_ptr = request; return true; } if (!READ_ONCE(queue->running)) { /* Wake the worker thread so it can exit. */ *request_ptr = NULL; return true; } *request_ptr = NULL; *waited_ptr = true; return false; } static void wait_for_request(struct uds_request_queue *queue, bool dormant, unsigned long timeout, struct uds_request **request, bool *waited) { if (dormant) { wait_event_interruptible(queue->wait_head, (dequeue_request(queue, request, waited) || !are_queues_idle(queue))); return; } wait_event_interruptible_hrtimeout(queue->wait_head, dequeue_request(queue, request, waited), ns_to_ktime(timeout)); } static void request_queue_worker(void *arg) { struct uds_request_queue *queue = arg; struct uds_request *request = NULL; unsigned long time_batch = DEFAULT_WAIT_TIME; bool dormant = atomic_read(&queue->dormant); bool waited = false; long current_batch = 0; for (;;) { wait_for_request(queue, dormant, time_batch, &request, &waited); if (likely(request != NULL)) { current_batch++; queue->processor(request); } else if (!READ_ONCE(queue->running)) { break; } if (dormant) { /* * The queue has been roused from dormancy. Clear the flag so enqueuers can * stop broadcasting. No fence is needed for this transition. */ atomic_set(&queue->dormant, false); dormant = false; time_batch = DEFAULT_WAIT_TIME; } else if (waited) { /* * We waited for this request to show up. Adjust the wait time to smooth * out the batch size. */ if (current_batch < MINIMUM_BATCH) { /* * If the last batch of requests was too small, increase the wait * time. */ time_batch += time_batch / 4; if (time_batch >= MAXIMUM_WAIT_TIME) { atomic_set(&queue->dormant, true); dormant = true; } } else if (current_batch > MAXIMUM_BATCH) { /* * If the last batch of requests was too large, decrease the wait * time. */ time_batch -= time_batch / 4; if (time_batch < MINIMUM_WAIT_TIME) time_batch = MINIMUM_WAIT_TIME; } current_batch = 0; } } /* * Ensure that we process any remaining requests that were enqueued before trying to shut * down. The corresponding write barrier is in uds_request_queue_finish(). */ smp_rmb(); while ((request = poll_queues(queue)) != NULL) queue->processor(request); } int uds_make_request_queue(const char *queue_name, uds_request_queue_processor_fn processor, struct uds_request_queue **queue_ptr) { int result; struct uds_request_queue *queue; result = vdo_allocate(1, struct uds_request_queue, __func__, &queue); if (result != VDO_SUCCESS) return result; queue->processor = processor; queue->running = true; atomic_set(&queue->dormant, false); init_waitqueue_head(&queue->wait_head); result = vdo_make_funnel_queue(&queue->main_queue); if (result != VDO_SUCCESS) { uds_request_queue_finish(queue); return result; } result = vdo_make_funnel_queue(&queue->retry_queue); if (result != VDO_SUCCESS) { uds_request_queue_finish(queue); return result; } result = vdo_create_thread(request_queue_worker, queue, queue_name, &queue->thread); if (result != VDO_SUCCESS) { uds_request_queue_finish(queue); return result; } queue->started = true; *queue_ptr = queue; return UDS_SUCCESS; } static inline void wake_up_worker(struct uds_request_queue *queue) { if (wq_has_sleeper(&queue->wait_head)) wake_up(&queue->wait_head); } void uds_request_queue_enqueue(struct uds_request_queue *queue, struct uds_request *request) { struct funnel_queue *sub_queue; bool unbatched = request->unbatched; sub_queue = request->requeued ? queue->retry_queue : queue->main_queue; vdo_funnel_queue_put(sub_queue, &request->queue_link); /* * We must wake the worker thread when it is dormant. A read fence isn't needed here since * we know the queue operation acts as one. */ if (atomic_read(&queue->dormant) || unbatched) wake_up_worker(queue); } void uds_request_queue_finish(struct uds_request_queue *queue) { if (queue == NULL) return; /* * This memory barrier ensures that any requests we queued will be seen. The point is that * when dequeue_request() sees the following update to the running flag, it will also be * able to see any change we made to a next field in the funnel queue entry. The * corresponding read barrier is in request_queue_worker(). */ smp_wmb(); WRITE_ONCE(queue->running, false); if (queue->started) { wake_up_worker(queue); vdo_join_threads(queue->thread); } vdo_free_funnel_queue(queue->main_queue); vdo_free_funnel_queue(queue->retry_queue); vdo_free(queue); }