diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-15 20:36:56 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-15 20:36:56 +0000 |
commit | 51de1d8436100f725f3576aefa24a2bd2057bc28 (patch) | |
tree | c6d1d5264b6d40a8d7ca34129f36b7d61e188af3 /misc/dispatch.c | |
parent | Initial commit. (diff) | |
download | mpv-51de1d8436100f725f3576aefa24a2bd2057bc28.tar.xz mpv-51de1d8436100f725f3576aefa24a2bd2057bc28.zip |
Adding upstream version 0.37.0.upstream/0.37.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'misc/dispatch.c')
-rw-r--r-- | misc/dispatch.c | 417 |
1 files changed, 417 insertions, 0 deletions
diff --git a/misc/dispatch.c b/misc/dispatch.c new file mode 100644 index 0000000..6fd9fe1 --- /dev/null +++ b/misc/dispatch.c @@ -0,0 +1,417 @@ +/* + * This file is part of mpv. + * + * mpv is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * mpv is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with mpv. If not, see <http://www.gnu.org/licenses/>. + */ + +#include <stdbool.h> +#include <assert.h> + +#include "common/common.h" +#include "osdep/threads.h" +#include "osdep/timer.h" + +#include "dispatch.h" + +struct mp_dispatch_queue { + struct mp_dispatch_item *head, *tail; + mp_mutex lock; + mp_cond cond; + void (*wakeup_fn)(void *wakeup_ctx); + void *wakeup_ctx; + void (*onlock_fn)(void *onlock_ctx); + void *onlock_ctx; + // Time at which mp_dispatch_queue_process() should return. + int64_t wait; + // Make mp_dispatch_queue_process() exit if it's idle. + bool interrupted; + // The target thread is in mp_dispatch_queue_process() (and either idling, + // locked, or running a dispatch callback). + bool in_process; + mp_thread_id in_process_thread_id; + // The target thread is in mp_dispatch_queue_process(), and currently + // something has exclusive access to it (e.g. running a dispatch callback, + // or a different thread got it with mp_dispatch_lock()). + bool locked; + // A mp_dispatch_lock() call is requesting an exclusive lock. + size_t lock_requests; + // locked==true is due to a mp_dispatch_lock() call (for debugging). + bool locked_explicit; + mp_thread_id locked_explicit_thread_id; +}; + +struct mp_dispatch_item { + mp_dispatch_fn fn; + void *fn_data; + bool asynchronous; + bool mergeable; + bool completed; + struct mp_dispatch_item *next; +}; + +static void queue_dtor(void *p) +{ + struct mp_dispatch_queue *queue = p; + assert(!queue->head); + assert(!queue->in_process); + assert(!queue->lock_requests); + assert(!queue->locked); + mp_cond_destroy(&queue->cond); + mp_mutex_destroy(&queue->lock); +} + +// A dispatch queue lets other threads run callbacks in a target thread. +// The target thread is the thread which calls mp_dispatch_queue_process(). +// Free the dispatch queue with talloc_free(). At the time of destruction, +// the queue must be empty. The easiest way to guarantee this is to +// terminate all potential senders, then call mp_dispatch_run() with a +// function that e.g. makes the target thread exit, then mp_thread_join() the +// target thread, and finally destroy the queue. Another way is calling +// mp_dispatch_queue_process() after terminating all potential senders, and +// then destroying the queue. +struct mp_dispatch_queue *mp_dispatch_create(void *ta_parent) +{ + struct mp_dispatch_queue *queue = talloc_ptrtype(ta_parent, queue); + *queue = (struct mp_dispatch_queue){0}; + talloc_set_destructor(queue, queue_dtor); + mp_mutex_init(&queue->lock); + mp_cond_init(&queue->cond); + return queue; +} + +// Set a custom function that should be called to guarantee that the target +// thread wakes up. This is intended for use with code that needs to block +// on non-pthread primitives, such as e.g. select(). In the case of select(), +// the wakeup_fn could for example write a byte into a "wakeup" pipe in order +// to unblock the select(). The wakeup_fn is called from the dispatch queue +// when there are new dispatch items, and the target thread should then enter +// mp_dispatch_queue_process() as soon as possible. +// Note that this setter does not do internal synchronization, so you must set +// it before other threads see it. +void mp_dispatch_set_wakeup_fn(struct mp_dispatch_queue *queue, + void (*wakeup_fn)(void *wakeup_ctx), + void *wakeup_ctx) +{ + queue->wakeup_fn = wakeup_fn; + queue->wakeup_ctx = wakeup_ctx; +} + +// Set a function that will be called by mp_dispatch_lock() if the target thread +// is not calling mp_dispatch_queue_process() right now. This is an obscure, +// optional mechanism to make a worker thread react to external events more +// quickly. The idea is that the callback will make the worker thread to stop +// doing whatever (e.g. by setting a flag), and call mp_dispatch_queue_process() +// in order to let mp_dispatch_lock() calls continue sooner. +// Like wakeup_fn, this setter does no internal synchronization, and you must +// not access the dispatch queue itself from the callback. +void mp_dispatch_set_onlock_fn(struct mp_dispatch_queue *queue, + void (*onlock_fn)(void *onlock_ctx), + void *onlock_ctx) +{ + queue->onlock_fn = onlock_fn; + queue->onlock_ctx = onlock_ctx; +} + +static void mp_dispatch_append(struct mp_dispatch_queue *queue, + struct mp_dispatch_item *item) +{ + mp_mutex_lock(&queue->lock); + if (item->mergeable) { + for (struct mp_dispatch_item *cur = queue->head; cur; cur = cur->next) { + if (cur->mergeable && cur->fn == item->fn && + cur->fn_data == item->fn_data) + { + talloc_free(item); + mp_mutex_unlock(&queue->lock); + return; + } + } + } + + if (queue->tail) { + queue->tail->next = item; + } else { + queue->head = item; + } + queue->tail = item; + + // Wake up the main thread; note that other threads might wait on this + // condition for reasons, so broadcast the condition. + mp_cond_broadcast(&queue->cond); + // No wakeup callback -> assume mp_dispatch_queue_process() needs to be + // interrupted instead. + if (!queue->wakeup_fn) + queue->interrupted = true; + mp_mutex_unlock(&queue->lock); + + if (queue->wakeup_fn) + queue->wakeup_fn(queue->wakeup_ctx); +} + +// Enqueue a callback to run it on the target thread asynchronously. The target +// thread will run fn(fn_data) as soon as it enter mp_dispatch_queue_process. +// Note that mp_dispatch_enqueue() will usually return long before that happens. +// It's up to the user to signal completion of the callback. It's also up to +// the user to guarantee that the context fn_data has correct lifetime, i.e. +// lives until the callback is run, and is freed after that. +void mp_dispatch_enqueue(struct mp_dispatch_queue *queue, + mp_dispatch_fn fn, void *fn_data) +{ + struct mp_dispatch_item *item = talloc_ptrtype(NULL, item); + *item = (struct mp_dispatch_item){ + .fn = fn, + .fn_data = fn_data, + .asynchronous = true, + }; + mp_dispatch_append(queue, item); +} + +// Like mp_dispatch_enqueue(), but the queue code will call talloc_free(fn_data) +// after the fn callback has been run. (The callback could trivially do that +// itself, but it makes it easier to implement synchronous and asynchronous +// requests with the same callback implementation.) +void mp_dispatch_enqueue_autofree(struct mp_dispatch_queue *queue, + mp_dispatch_fn fn, void *fn_data) +{ + struct mp_dispatch_item *item = talloc_ptrtype(NULL, item); + *item = (struct mp_dispatch_item){ + .fn = fn, + .fn_data = talloc_steal(item, fn_data), + .asynchronous = true, + }; + mp_dispatch_append(queue, item); +} + +// Like mp_dispatch_enqueue(), but +void mp_dispatch_enqueue_notify(struct mp_dispatch_queue *queue, + mp_dispatch_fn fn, void *fn_data) +{ + struct mp_dispatch_item *item = talloc_ptrtype(NULL, item); + *item = (struct mp_dispatch_item){ + .fn = fn, + .fn_data = fn_data, + .mergeable = true, + .asynchronous = true, + }; + mp_dispatch_append(queue, item); +} + +// Remove already queued item. Only items enqueued with the following functions +// can be canceled: +// - mp_dispatch_enqueue() +// - mp_dispatch_enqueue_notify() +// Items which were enqueued, and which are currently executing, can not be +// canceled anymore. This function is mostly for being called from the same +// context as mp_dispatch_queue_process(), where the "currently executing" case +// can be excluded. +void mp_dispatch_cancel_fn(struct mp_dispatch_queue *queue, + mp_dispatch_fn fn, void *fn_data) +{ + mp_mutex_lock(&queue->lock); + struct mp_dispatch_item **pcur = &queue->head; + queue->tail = NULL; + while (*pcur) { + struct mp_dispatch_item *cur = *pcur; + if (cur->fn == fn && cur->fn_data == fn_data) { + *pcur = cur->next; + talloc_free(cur); + } else { + queue->tail = cur; + pcur = &cur->next; + } + } + mp_mutex_unlock(&queue->lock); +} + +// Run fn(fn_data) on the target thread synchronously. This function enqueues +// the callback and waits until the target thread is done doing this. +// This is redundant to calling the function inside mp_dispatch_[un]lock(), +// but can be helpful with code that relies on TLS (such as OpenGL). +void mp_dispatch_run(struct mp_dispatch_queue *queue, + mp_dispatch_fn fn, void *fn_data) +{ + struct mp_dispatch_item item = { + .fn = fn, + .fn_data = fn_data, + }; + mp_dispatch_append(queue, &item); + + mp_mutex_lock(&queue->lock); + while (!item.completed) + mp_cond_wait(&queue->cond, &queue->lock); + mp_mutex_unlock(&queue->lock); +} + +// Process any outstanding dispatch items in the queue. This also handles +// suspending or locking the this thread from another thread via +// mp_dispatch_lock(). +// The timeout specifies the minimum wait time. The actual time spent in this +// function can be much higher if the suspending/locking functions are used, or +// if executing the dispatch items takes time. On the other hand, this function +// can return much earlier than the timeout due to sporadic wakeups. +// Note that this will strictly return only after: +// - timeout has passed, +// - all queue items were processed, +// - the possibly acquired lock has been released +// It's possible to cancel the timeout by calling mp_dispatch_interrupt(). +// Reentrant calls are not allowed. There can be only 1 thread calling +// mp_dispatch_queue_process() at a time. In addition, mp_dispatch_lock() can +// not be called from a thread that is calling mp_dispatch_queue_process() (i.e. +// no enqueued callback can call the lock/unlock functions). +void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout) +{ + mp_mutex_lock(&queue->lock); + queue->wait = timeout > 0 ? mp_time_ns_add(mp_time_ns(), timeout) : 0; + assert(!queue->in_process); // recursion not allowed + queue->in_process = true; + queue->in_process_thread_id = mp_thread_current_id(); + // Wake up thread which called mp_dispatch_lock(). + if (queue->lock_requests) + mp_cond_broadcast(&queue->cond); + while (1) { + if (queue->lock_requests) { + // Block due to something having called mp_dispatch_lock(). + mp_cond_wait(&queue->cond, &queue->lock); + } else if (queue->head) { + struct mp_dispatch_item *item = queue->head; + queue->head = item->next; + if (!queue->head) + queue->tail = NULL; + item->next = NULL; + // Unlock, because we want to allow other threads to queue items + // while the dispatch item is processed. + // At the same time, we must prevent other threads from returning + // from mp_dispatch_lock(), which is done by locked=true. + assert(!queue->locked); + queue->locked = true; + mp_mutex_unlock(&queue->lock); + + item->fn(item->fn_data); + + mp_mutex_lock(&queue->lock); + assert(queue->locked); + queue->locked = false; + // Wakeup mp_dispatch_run(), also mp_dispatch_lock(). + mp_cond_broadcast(&queue->cond); + if (item->asynchronous) { + talloc_free(item); + } else { + item->completed = true; + } + } else if (queue->wait > 0 && !queue->interrupted) { + if (mp_cond_timedwait_until(&queue->cond, &queue->lock, queue->wait)) + queue->wait = 0; + } else { + break; + } + } + assert(!queue->locked); + queue->in_process = false; + queue->interrupted = false; + mp_mutex_unlock(&queue->lock); +} + +// If the queue is inside of mp_dispatch_queue_process(), make it return as +// soon as all work items have been run, without waiting for the timeout. This +// does not make it return early if it's blocked by a mp_dispatch_lock(). +// If the queue is _not_ inside of mp_dispatch_queue_process(), make the next +// call of it use a timeout of 0 (this is useful behavior if you need to +// wakeup the main thread from another thread in a race free way). +void mp_dispatch_interrupt(struct mp_dispatch_queue *queue) +{ + mp_mutex_lock(&queue->lock); + queue->interrupted = true; + mp_cond_broadcast(&queue->cond); + mp_mutex_unlock(&queue->lock); +} + +// If a mp_dispatch_queue_process() call is in progress, then adjust the maximum +// time it blocks due to its timeout argument. Otherwise does nothing. (It +// makes sense to call this in code that uses both mp_dispatch_[un]lock() and +// a normal event loop.) +// Does not work correctly with queues that have mp_dispatch_set_wakeup_fn() +// called on them, because this implies you actually do waiting via +// mp_dispatch_queue_process(), while wakeup callbacks are used when you need +// to wait in external APIs. +void mp_dispatch_adjust_timeout(struct mp_dispatch_queue *queue, int64_t until) +{ + mp_mutex_lock(&queue->lock); + if (queue->in_process && queue->wait > until) { + queue->wait = until; + mp_cond_broadcast(&queue->cond); + } + mp_mutex_unlock(&queue->lock); +} + +// Grant exclusive access to the target thread's state. While this is active, +// no other thread can return from mp_dispatch_lock() (i.e. it behaves like +// a pthread mutex), and no other thread can get dispatch items completed. +// Other threads can still queue asynchronous dispatch items without waiting, +// and the mutex behavior applies to this function and dispatch callbacks only. +// The lock is non-recursive, and dispatch callback functions can be thought of +// already holding the dispatch lock. +void mp_dispatch_lock(struct mp_dispatch_queue *queue) +{ + mp_mutex_lock(&queue->lock); + // Must not be called recursively from dispatched callbacks. + if (queue->in_process) + assert(!mp_thread_id_equal(queue->in_process_thread_id, mp_thread_current_id())); + // Must not be called recursively at all. + if (queue->locked_explicit) + assert(!mp_thread_id_equal(queue->locked_explicit_thread_id, mp_thread_current_id())); + queue->lock_requests += 1; + // And now wait until the target thread gets "trapped" within the + // mp_dispatch_queue_process() call, which will mean we get exclusive + // access to the target's thread state. + if (queue->onlock_fn) + queue->onlock_fn(queue->onlock_ctx); + while (!queue->in_process) { + mp_mutex_unlock(&queue->lock); + if (queue->wakeup_fn) + queue->wakeup_fn(queue->wakeup_ctx); + mp_mutex_lock(&queue->lock); + if (queue->in_process) + break; + mp_cond_wait(&queue->cond, &queue->lock); + } + // Wait until we can get the lock. + while (!queue->in_process || queue->locked) + mp_cond_wait(&queue->cond, &queue->lock); + // "Lock". + assert(queue->lock_requests); + assert(!queue->locked); + assert(!queue->locked_explicit); + queue->locked = true; + queue->locked_explicit = true; + queue->locked_explicit_thread_id = mp_thread_current_id(); + mp_mutex_unlock(&queue->lock); +} + +// Undo mp_dispatch_lock(). +void mp_dispatch_unlock(struct mp_dispatch_queue *queue) +{ + mp_mutex_lock(&queue->lock); + assert(queue->locked); + // Must be called after a mp_dispatch_lock(), from the same thread. + assert(queue->locked_explicit); + assert(mp_thread_id_equal(queue->locked_explicit_thread_id, mp_thread_current_id())); + // "Unlock". + queue->locked = false; + queue->locked_explicit = false; + queue->lock_requests -= 1; + // Wakeup mp_dispatch_queue_process(), and maybe other mp_dispatch_lock()s. + // (Would be nice to wake up only 1 other locker if lock_requests>0.) + mp_cond_broadcast(&queue->cond); + mp_mutex_unlock(&queue->lock); +} |