diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 17:57:27 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 17:57:27 +0000 |
commit | 6f0f7d1b40a8fa8d46a2d6f4317600001cdbbb18 (patch) | |
tree | d423850ae901365e582137bdf2b5cbdffd7ca266 /lib/gs-worker-thread.c | |
parent | Initial commit. (diff) | |
download | gnome-software-6f0f7d1b40a8fa8d46a2d6f4317600001cdbbb18.tar.xz gnome-software-6f0f7d1b40a8fa8d46a2d6f4317600001cdbbb18.zip |
Adding upstream version 43.5.upstream/43.5upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | lib/gs-worker-thread.c | 425 |
1 files changed, 425 insertions, 0 deletions
diff --git a/lib/gs-worker-thread.c b/lib/gs-worker-thread.c new file mode 100644 index 0000000..6e6686d --- /dev/null +++ b/lib/gs-worker-thread.c @@ -0,0 +1,425 @@ +/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- + * vi:set noexpandtab tabstop=8 shiftwidth=8: + * + * Copyright (C) 2021 Endless OS Foundation LLC + * + * Author: Philip Withnall <pwithnall@endlessos.org> + * + * SPDX-License-Identifier: GPL-2.0+ + */ + +/** + * SECTION:gs-worker-thread + * @short_description: A worker thread which executes queued #GTasks until stopped + * + * #GsWorkerThread is a thread-safe wrapper around a #GTask queue and a single + * worker thread which executes tasks on that queue. + * + * Tasks can be added to the queue using gs_worker_thread_queue(). The worker + * thread (which is created when #GsWorkerThread is constructed) will execute + * them in (priority, queue order) order. Each #GTaskThreadFunc is responsible + * for calling `g_task_return_*()` on its #GTask to complete that task. + * + * The priority passed to gs_worker_thread_queue() will be used to adjust the + * worker thread’s I/O priority (using `ioprio_set()`) when executing that task. + * + * It is intended that gs_worker_thread_queue() is an alternative to using + * g_task_run_in_thread(). g_task_run_in_thread() queues tasks into a single + * process-wide thread pool, so they are mixed in with other tasks, and it can + * become hard to ensure the thread pool isn’t overwhelmed and that tasks are + * executed in the right order. + * + * The worker thread will continue executing tasks until + * gs_worker_thread_shutdown_async() is called. This must be called before the + * final reference to the #GsWorkerThread is dropped. + * + * Since: 42 + */ + +#include "config.h" + +#include <glib.h> +#include <glib-object.h> + +#include "gs-ioprio.h" +#include "gs-worker-thread.h" + +typedef enum { + GS_WORKER_THREAD_STATE_RUNNING = 0, + GS_WORKER_THREAD_STATE_SHUTTING_DOWN = 1, + GS_WORKER_THREAD_STATE_SHUT_DOWN = 2, +} GsWorkerThreadState; + +struct _GsWorkerThread +{ + GObject parent; + + gchar *name; /* (nullable) (owned) */ + + GsWorkerThreadState worker_state; /* (atomic) */ + GMainContext *worker_context; /* (owned); may be NULL before setup or after shutdown */ + GThread *worker_thread; /* (atomic); may be NULL before setup or after shutdown */ +}; + +typedef enum { + PROP_NAME = 1, +} GsWorkerThreadProperty; + +static GParamSpec *props[PROP_NAME + 1] = { NULL, }; + +G_DEFINE_TYPE (GsWorkerThread, gs_worker_thread, G_TYPE_OBJECT) + +static void +gs_worker_thread_get_property (GObject *object, + guint prop_id, + GValue *value, + GParamSpec *pspec) +{ + GsWorkerThread *self = GS_WORKER_THREAD (object); + + switch ((GsWorkerThreadProperty) prop_id) { + case PROP_NAME: + g_value_set_string (value, self->name); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gs_worker_thread_set_property (GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + GsWorkerThread *self = GS_WORKER_THREAD (object); + + switch ((GsWorkerThreadProperty) prop_id) { + case PROP_NAME: + /* Construct only */ + g_assert (self->name == NULL); + self->name = g_value_dup_string (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gs_worker_thread_dispose (GObject *object) +{ + GsWorkerThread *self = GS_WORKER_THREAD (object); + + /* Should have stopped by now. */ + g_assert (self->worker_thread == NULL); + + g_clear_pointer (&self->name, g_free); + g_clear_pointer (&self->worker_context, g_main_context_unref); + + G_OBJECT_CLASS (gs_worker_thread_parent_class)->dispose (object); +} + +static gpointer thread_cb (gpointer data); + +static void +gs_worker_thread_constructed (GObject *object) +{ + GsWorkerThread *self = GS_WORKER_THREAD (object); + + G_OBJECT_CLASS (gs_worker_thread_parent_class)->constructed (object); + + /* Start up a worker thread and its #GMainContext. The worker will run + * and process events on @worker_context until @worker_state changes + * from %GS_WORKER_THREAD_STATE_RUNNING. */ + self->worker_state = GS_WORKER_THREAD_STATE_RUNNING; + self->worker_context = g_main_context_new (); + self->worker_thread = g_thread_new (self->name, thread_cb, self); +} + +static void +gs_worker_thread_class_init (GsWorkerThreadClass *klass) +{ + GObjectClass *object_class = G_OBJECT_CLASS (klass); + + object_class->constructed = gs_worker_thread_constructed; + object_class->get_property = gs_worker_thread_get_property; + object_class->set_property = gs_worker_thread_set_property; + object_class->dispose = gs_worker_thread_dispose; + + /** + * GsWorkerThread:name: (not nullable): + * + * Name for the worker thread to use in debug output. This must be set. + * + * Since: 42 + */ + props[PROP_NAME] = + g_param_spec_string ("name", + "Name", + "Name for the worker thread to use in debug output.", + NULL, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS | G_PARAM_EXPLICIT_NOTIFY); + + g_object_class_install_properties (object_class, G_N_ELEMENTS (props), props); +} + +static gpointer +thread_cb (gpointer data) +{ + GsWorkerThread *self = GS_WORKER_THREAD (data); + g_autoptr(GMainContextPusher) pusher = g_main_context_pusher_new (self->worker_context); + + while (g_atomic_int_get (&self->worker_state) != GS_WORKER_THREAD_STATE_SHUT_DOWN) + g_main_context_iteration (self->worker_context, TRUE); + + return NULL; +} + +static void +gs_worker_thread_init (GsWorkerThread *self) +{ +} + +/** + * gs_worker_thread_new: + * @name: (not nullable): name for the worker thread + * + * Create and start a new #GsWorkerThread. + * + * @name will be used to set the thread name and in debug output. + * + * Returns: (transfer full): a new #GsWorkerThread + * Since: 42 + */ +GsWorkerThread * +gs_worker_thread_new (const gchar *name) +{ + g_return_val_if_fail (name != NULL, NULL); + + return g_object_new (GS_TYPE_WORKER_THREAD, + "name", name, + NULL); +} + +/* Essentially a wrapper around these elements to avoid the caller having to + * return `G_SOURCE_REMOVE` from their `work_func` every time. */ +typedef struct { + GTaskThreadFunc work_func; + GTask *task; /* (owned) */ + gint priority; +} WorkData; + +static void +work_data_free (WorkData *data) +{ + g_clear_object (&data->task); + g_free (data); +} + +G_DEFINE_AUTOPTR_CLEANUP_FUNC (WorkData, work_data_free) + +static gboolean +work_run_cb (gpointer _data) +{ + WorkData *data = _data; + GTask *task = data->task; + gpointer source_object = g_task_get_source_object (task); + gpointer task_data = g_task_get_task_data (task); + GCancellable *cancellable = g_task_get_cancellable (task); + + /* Set the I/O priority of the thread to match the priority of the + * task. */ + gs_ioprio_set (data->priority); + + data->work_func (task, source_object, task_data, cancellable); + + return G_SOURCE_REMOVE; +} + +/** + * gs_worker_thread_queue: + * @self: a #GsWorkerThread + * @priority: (default G_PRIORITY_DEFAULT): priority to queue the task at, + * typically #G_PRIORITY_DEFAULT + * @work_func: (not nullable): function to run the task + * @task: (transfer full) (not nullable): the #GTask containing context data to + * pass to @work_func + * + * Queue @task to be run in the worker thread at the given @priority. + * + * This function takes ownership of @task. + * + * @priority sets the order of the task in the queue, and also affects the I/O + * priority of the worker thread when the task is executed — high priorities + * result in a high I/O priority, low priorities result in an idle I/O priority, + * as per `ioprio_set()`. + * + * When the task is run, @work_func will be executed and passed @task and the + * source object, task data and cancellable set on @task. + * + * @work_func is responsible for calling `g_task_return_*()` on @task once the + * task is complete. + * + * If a task is cancelled using its #GCancellable after it’s queued to the + * #GsWorkerThread, @work_func will still be executed. @work_func is responsible + * for checking whether the #GCancellable has been cancelled. + * + * It is an error to call this function after gs_worker_thread_shutdown_async() + * has called. + * + * Since: 42 + */ +void +gs_worker_thread_queue (GsWorkerThread *self, + gint priority, + GTaskThreadFunc work_func, + GTask *task) +{ + g_autoptr(WorkData) data = NULL; + + g_return_if_fail (GS_IS_WORKER_THREAD (self)); + g_return_if_fail (work_func != NULL); + g_return_if_fail (G_IS_TASK (task)); + + g_assert (g_atomic_int_get (&self->worker_state) == GS_WORKER_THREAD_STATE_RUNNING || + g_task_get_source_tag (task) == gs_worker_thread_shutdown_async); + + data = g_new0 (WorkData, 1); + data->work_func = work_func; + data->task = g_steal_pointer (&task); + data->priority = priority; + + g_main_context_invoke_full (self->worker_context, priority, + work_run_cb, g_steal_pointer (&data), (GDestroyNotify) work_data_free); +} + +/** + * gs_worker_thread_is_in_worker_context: + * @self: a #GsWorkerThread + * + * Returns whether the calling thread is the worker thread. + * + * This is intended to be used as a precondition check to ensure that worker + * code is not accidentally run from the wrong thread. + * + * |[ + * static void + * do_work (MyPlugin *self) + * { + * g_assert (gs_worker_thread_is_in_worker_context (self->worker_thread)); + * + * // do some work + * } + * ]| + * + * Returns: %TRUE if running in the worker context, %FALSE otherwise + * Since: 42 + */ +gboolean +gs_worker_thread_is_in_worker_context (GsWorkerThread *self) +{ + return g_main_context_is_owner (self->worker_context); +} + +static void shutdown_cb (GTask *task, + gpointer source_object, + gpointer task_data, + GCancellable *cancellable); + +/** + * gs_worker_thread_shutdown_async: + * @self: a #GsWorkerThread + * @cancellable: (nullable): a #GCancellable, or %NULL + * @callback: callback for once the asynchronous operation is complete + * @user_data: data to pass to @callback + * + * Shut down the worker thread. + * + * The thread will finish processing whatever task it’s currently processing + * (if any), will return %G_IO_ERROR_CANCELLED for all remaining queued + * tasks, and will then join the main process. + * + * This is a no-op if called subsequently. + * + * Since: 42 + */ +void +gs_worker_thread_shutdown_async (GsWorkerThread *self, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + g_autoptr(GTask) task = NULL; + + g_return_if_fail (GS_IS_WORKER_THREAD (self)); + g_return_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable)); + + task = g_task_new (self, cancellable, callback, user_data); + g_task_set_source_tag (task, gs_worker_thread_shutdown_async); + + /* Already called? */ + if (g_atomic_int_get (&self->worker_state) != GS_WORKER_THREAD_STATE_RUNNING) { + g_task_return_boolean (task, TRUE); + return; + } + + /* Signal the worker thread to stop processing tasks. */ + g_atomic_int_set (&self->worker_state, GS_WORKER_THREAD_STATE_SHUTTING_DOWN); + gs_worker_thread_queue (self, G_MAXINT /* lowest priority */, + shutdown_cb, g_steal_pointer (&task)); +} + +static void +shutdown_cb (GTask *task, + gpointer source_object, + gpointer task_data, + GCancellable *cancellable) +{ + GsWorkerThread *self = GS_WORKER_THREAD (source_object); + gboolean updated_state; + + updated_state = g_atomic_int_compare_and_exchange (&self->worker_state, + GS_WORKER_THREAD_STATE_SHUTTING_DOWN, + GS_WORKER_THREAD_STATE_SHUT_DOWN); + g_assert (updated_state); + + /* Tidy up. We can’t join the thread here as this function is executing + * within the thread and that would deadlock. */ + g_clear_pointer (&self->worker_context, g_main_context_unref); + + g_task_return_boolean (task, TRUE); +} + +/** + * gs_worker_thread_shutdown_finish: + * @self: a #GsWorkerThread + * @result: a #GAsyncResult + * @error: return location for a #GError, or %NULL + * + * Finish an asynchronous shutdown operation started with + * gs_worker_thread_shutdown_async(); + * + * Returns: %TRUE on success, %FALSE otherwise + * Since: 42 + */ +gboolean +gs_worker_thread_shutdown_finish (GsWorkerThread *self, + GAsyncResult *result, + GError **error) +{ + gboolean success; + + g_return_val_if_fail (GS_IS_WORKER_THREAD (self), FALSE); + g_return_val_if_fail (g_async_result_is_tagged (result, gs_worker_thread_shutdown_async), FALSE); + g_return_val_if_fail (g_task_is_valid (result, self), FALSE); + g_return_val_if_fail (error == NULL || *error == NULL, FALSE); + + success = g_task_propagate_boolean (G_TASK (result), error); + + if (success) + g_thread_join (g_steal_pointer (&self->worker_thread)); + + return success; +} |