summaryrefslogtreecommitdiffstats
path: root/lib/gs-worker-thread.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/gs-worker-thread.c')
-rw-r--r--lib/gs-worker-thread.c425
1 files changed, 425 insertions, 0 deletions
diff --git a/lib/gs-worker-thread.c b/lib/gs-worker-thread.c
new file mode 100644
index 0000000..6e6686d
--- /dev/null
+++ b/lib/gs-worker-thread.c
@@ -0,0 +1,425 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*-
+ * vi:set noexpandtab tabstop=8 shiftwidth=8:
+ *
+ * Copyright (C) 2021 Endless OS Foundation LLC
+ *
+ * Author: Philip Withnall <pwithnall@endlessos.org>
+ *
+ * SPDX-License-Identifier: GPL-2.0+
+ */
+
+/**
+ * SECTION:gs-worker-thread
+ * @short_description: A worker thread which executes queued #GTasks until stopped
+ *
+ * #GsWorkerThread is a thread-safe wrapper around a #GTask queue and a single
+ * worker thread which executes tasks on that queue.
+ *
+ * Tasks can be added to the queue using gs_worker_thread_queue(). The worker
+ * thread (which is created when #GsWorkerThread is constructed) will execute
+ * them in (priority, queue order) order. Each #GTaskThreadFunc is responsible
+ * for calling `g_task_return_*()` on its #GTask to complete that task.
+ *
+ * The priority passed to gs_worker_thread_queue() will be used to adjust the
+ * worker thread’s I/O priority (using `ioprio_set()`) when executing that task.
+ *
+ * It is intended that gs_worker_thread_queue() is an alternative to using
+ * g_task_run_in_thread(). g_task_run_in_thread() queues tasks into a single
+ * process-wide thread pool, so they are mixed in with other tasks, and it can
+ * become hard to ensure the thread pool isn’t overwhelmed and that tasks are
+ * executed in the right order.
+ *
+ * The worker thread will continue executing tasks until
+ * gs_worker_thread_shutdown_async() is called. This must be called before the
+ * final reference to the #GsWorkerThread is dropped.
+ *
+ * Since: 42
+ */
+
+#include "config.h"
+
+#include <glib.h>
+#include <glib-object.h>
+
+#include "gs-ioprio.h"
+#include "gs-worker-thread.h"
+
+typedef enum {
+ GS_WORKER_THREAD_STATE_RUNNING = 0,
+ GS_WORKER_THREAD_STATE_SHUTTING_DOWN = 1,
+ GS_WORKER_THREAD_STATE_SHUT_DOWN = 2,
+} GsWorkerThreadState;
+
+struct _GsWorkerThread
+{
+ GObject parent;
+
+ gchar *name; /* (nullable) (owned) */
+
+ GsWorkerThreadState worker_state; /* (atomic) */
+ GMainContext *worker_context; /* (owned); may be NULL before setup or after shutdown */
+ GThread *worker_thread; /* (atomic); may be NULL before setup or after shutdown */
+};
+
+typedef enum {
+ PROP_NAME = 1,
+} GsWorkerThreadProperty;
+
+static GParamSpec *props[PROP_NAME + 1] = { NULL, };
+
+G_DEFINE_TYPE (GsWorkerThread, gs_worker_thread, G_TYPE_OBJECT)
+
+static void
+gs_worker_thread_get_property (GObject *object,
+ guint prop_id,
+ GValue *value,
+ GParamSpec *pspec)
+{
+ GsWorkerThread *self = GS_WORKER_THREAD (object);
+
+ switch ((GsWorkerThreadProperty) prop_id) {
+ case PROP_NAME:
+ g_value_set_string (value, self->name);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gs_worker_thread_set_property (GObject *object,
+ guint prop_id,
+ const GValue *value,
+ GParamSpec *pspec)
+{
+ GsWorkerThread *self = GS_WORKER_THREAD (object);
+
+ switch ((GsWorkerThreadProperty) prop_id) {
+ case PROP_NAME:
+ /* Construct only */
+ g_assert (self->name == NULL);
+ self->name = g_value_dup_string (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gs_worker_thread_dispose (GObject *object)
+{
+ GsWorkerThread *self = GS_WORKER_THREAD (object);
+
+ /* Should have stopped by now. */
+ g_assert (self->worker_thread == NULL);
+
+ g_clear_pointer (&self->name, g_free);
+ g_clear_pointer (&self->worker_context, g_main_context_unref);
+
+ G_OBJECT_CLASS (gs_worker_thread_parent_class)->dispose (object);
+}
+
+static gpointer thread_cb (gpointer data);
+
+static void
+gs_worker_thread_constructed (GObject *object)
+{
+ GsWorkerThread *self = GS_WORKER_THREAD (object);
+
+ G_OBJECT_CLASS (gs_worker_thread_parent_class)->constructed (object);
+
+ /* Start up a worker thread and its #GMainContext. The worker will run
+ * and process events on @worker_context until @worker_state changes
+ * from %GS_WORKER_THREAD_STATE_RUNNING. */
+ self->worker_state = GS_WORKER_THREAD_STATE_RUNNING;
+ self->worker_context = g_main_context_new ();
+ self->worker_thread = g_thread_new (self->name, thread_cb, self);
+}
+
+static void
+gs_worker_thread_class_init (GsWorkerThreadClass *klass)
+{
+ GObjectClass *object_class = G_OBJECT_CLASS (klass);
+
+ object_class->constructed = gs_worker_thread_constructed;
+ object_class->get_property = gs_worker_thread_get_property;
+ object_class->set_property = gs_worker_thread_set_property;
+ object_class->dispose = gs_worker_thread_dispose;
+
+ /**
+ * GsWorkerThread:name: (not nullable):
+ *
+ * Name for the worker thread to use in debug output. This must be set.
+ *
+ * Since: 42
+ */
+ props[PROP_NAME] =
+ g_param_spec_string ("name",
+ "Name",
+ "Name for the worker thread to use in debug output.",
+ NULL,
+ G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS | G_PARAM_EXPLICIT_NOTIFY);
+
+ g_object_class_install_properties (object_class, G_N_ELEMENTS (props), props);
+}
+
+static gpointer
+thread_cb (gpointer data)
+{
+ GsWorkerThread *self = GS_WORKER_THREAD (data);
+ g_autoptr(GMainContextPusher) pusher = g_main_context_pusher_new (self->worker_context);
+
+ while (g_atomic_int_get (&self->worker_state) != GS_WORKER_THREAD_STATE_SHUT_DOWN)
+ g_main_context_iteration (self->worker_context, TRUE);
+
+ return NULL;
+}
+
+static void
+gs_worker_thread_init (GsWorkerThread *self)
+{
+}
+
+/**
+ * gs_worker_thread_new:
+ * @name: (not nullable): name for the worker thread
+ *
+ * Create and start a new #GsWorkerThread.
+ *
+ * @name will be used to set the thread name and in debug output.
+ *
+ * Returns: (transfer full): a new #GsWorkerThread
+ * Since: 42
+ */
+GsWorkerThread *
+gs_worker_thread_new (const gchar *name)
+{
+ g_return_val_if_fail (name != NULL, NULL);
+
+ return g_object_new (GS_TYPE_WORKER_THREAD,
+ "name", name,
+ NULL);
+}
+
+/* Essentially a wrapper around these elements to avoid the caller having to
+ * return `G_SOURCE_REMOVE` from their `work_func` every time. */
+typedef struct {
+ GTaskThreadFunc work_func;
+ GTask *task; /* (owned) */
+ gint priority;
+} WorkData;
+
+static void
+work_data_free (WorkData *data)
+{
+ g_clear_object (&data->task);
+ g_free (data);
+}
+
+G_DEFINE_AUTOPTR_CLEANUP_FUNC (WorkData, work_data_free)
+
+static gboolean
+work_run_cb (gpointer _data)
+{
+ WorkData *data = _data;
+ GTask *task = data->task;
+ gpointer source_object = g_task_get_source_object (task);
+ gpointer task_data = g_task_get_task_data (task);
+ GCancellable *cancellable = g_task_get_cancellable (task);
+
+ /* Set the I/O priority of the thread to match the priority of the
+ * task. */
+ gs_ioprio_set (data->priority);
+
+ data->work_func (task, source_object, task_data, cancellable);
+
+ return G_SOURCE_REMOVE;
+}
+
+/**
+ * gs_worker_thread_queue:
+ * @self: a #GsWorkerThread
+ * @priority: (default G_PRIORITY_DEFAULT): priority to queue the task at,
+ * typically #G_PRIORITY_DEFAULT
+ * @work_func: (not nullable): function to run the task
+ * @task: (transfer full) (not nullable): the #GTask containing context data to
+ * pass to @work_func
+ *
+ * Queue @task to be run in the worker thread at the given @priority.
+ *
+ * This function takes ownership of @task.
+ *
+ * @priority sets the order of the task in the queue, and also affects the I/O
+ * priority of the worker thread when the task is executed — high priorities
+ * result in a high I/O priority, low priorities result in an idle I/O priority,
+ * as per `ioprio_set()`.
+ *
+ * When the task is run, @work_func will be executed and passed @task and the
+ * source object, task data and cancellable set on @task.
+ *
+ * @work_func is responsible for calling `g_task_return_*()` on @task once the
+ * task is complete.
+ *
+ * If a task is cancelled using its #GCancellable after it’s queued to the
+ * #GsWorkerThread, @work_func will still be executed. @work_func is responsible
+ * for checking whether the #GCancellable has been cancelled.
+ *
+ * It is an error to call this function after gs_worker_thread_shutdown_async()
+ * has called.
+ *
+ * Since: 42
+ */
+void
+gs_worker_thread_queue (GsWorkerThread *self,
+ gint priority,
+ GTaskThreadFunc work_func,
+ GTask *task)
+{
+ g_autoptr(WorkData) data = NULL;
+
+ g_return_if_fail (GS_IS_WORKER_THREAD (self));
+ g_return_if_fail (work_func != NULL);
+ g_return_if_fail (G_IS_TASK (task));
+
+ g_assert (g_atomic_int_get (&self->worker_state) == GS_WORKER_THREAD_STATE_RUNNING ||
+ g_task_get_source_tag (task) == gs_worker_thread_shutdown_async);
+
+ data = g_new0 (WorkData, 1);
+ data->work_func = work_func;
+ data->task = g_steal_pointer (&task);
+ data->priority = priority;
+
+ g_main_context_invoke_full (self->worker_context, priority,
+ work_run_cb, g_steal_pointer (&data), (GDestroyNotify) work_data_free);
+}
+
+/**
+ * gs_worker_thread_is_in_worker_context:
+ * @self: a #GsWorkerThread
+ *
+ * Returns whether the calling thread is the worker thread.
+ *
+ * This is intended to be used as a precondition check to ensure that worker
+ * code is not accidentally run from the wrong thread.
+ *
+ * |[
+ * static void
+ * do_work (MyPlugin *self)
+ * {
+ * g_assert (gs_worker_thread_is_in_worker_context (self->worker_thread));
+ *
+ * // do some work
+ * }
+ * ]|
+ *
+ * Returns: %TRUE if running in the worker context, %FALSE otherwise
+ * Since: 42
+ */
+gboolean
+gs_worker_thread_is_in_worker_context (GsWorkerThread *self)
+{
+ return g_main_context_is_owner (self->worker_context);
+}
+
+static void shutdown_cb (GTask *task,
+ gpointer source_object,
+ gpointer task_data,
+ GCancellable *cancellable);
+
+/**
+ * gs_worker_thread_shutdown_async:
+ * @self: a #GsWorkerThread
+ * @cancellable: (nullable): a #GCancellable, or %NULL
+ * @callback: callback for once the asynchronous operation is complete
+ * @user_data: data to pass to @callback
+ *
+ * Shut down the worker thread.
+ *
+ * The thread will finish processing whatever task it’s currently processing
+ * (if any), will return %G_IO_ERROR_CANCELLED for all remaining queued
+ * tasks, and will then join the main process.
+ *
+ * This is a no-op if called subsequently.
+ *
+ * Since: 42
+ */
+void
+gs_worker_thread_shutdown_async (GsWorkerThread *self,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ g_autoptr(GTask) task = NULL;
+
+ g_return_if_fail (GS_IS_WORKER_THREAD (self));
+ g_return_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable));
+
+ task = g_task_new (self, cancellable, callback, user_data);
+ g_task_set_source_tag (task, gs_worker_thread_shutdown_async);
+
+ /* Already called? */
+ if (g_atomic_int_get (&self->worker_state) != GS_WORKER_THREAD_STATE_RUNNING) {
+ g_task_return_boolean (task, TRUE);
+ return;
+ }
+
+ /* Signal the worker thread to stop processing tasks. */
+ g_atomic_int_set (&self->worker_state, GS_WORKER_THREAD_STATE_SHUTTING_DOWN);
+ gs_worker_thread_queue (self, G_MAXINT /* lowest priority */,
+ shutdown_cb, g_steal_pointer (&task));
+}
+
+static void
+shutdown_cb (GTask *task,
+ gpointer source_object,
+ gpointer task_data,
+ GCancellable *cancellable)
+{
+ GsWorkerThread *self = GS_WORKER_THREAD (source_object);
+ gboolean updated_state;
+
+ updated_state = g_atomic_int_compare_and_exchange (&self->worker_state,
+ GS_WORKER_THREAD_STATE_SHUTTING_DOWN,
+ GS_WORKER_THREAD_STATE_SHUT_DOWN);
+ g_assert (updated_state);
+
+ /* Tidy up. We can’t join the thread here as this function is executing
+ * within the thread and that would deadlock. */
+ g_clear_pointer (&self->worker_context, g_main_context_unref);
+
+ g_task_return_boolean (task, TRUE);
+}
+
+/**
+ * gs_worker_thread_shutdown_finish:
+ * @self: a #GsWorkerThread
+ * @result: a #GAsyncResult
+ * @error: return location for a #GError, or %NULL
+ *
+ * Finish an asynchronous shutdown operation started with
+ * gs_worker_thread_shutdown_async();
+ *
+ * Returns: %TRUE on success, %FALSE otherwise
+ * Since: 42
+ */
+gboolean
+gs_worker_thread_shutdown_finish (GsWorkerThread *self,
+ GAsyncResult *result,
+ GError **error)
+{
+ gboolean success;
+
+ g_return_val_if_fail (GS_IS_WORKER_THREAD (self), FALSE);
+ g_return_val_if_fail (g_async_result_is_tagged (result, gs_worker_thread_shutdown_async), FALSE);
+ g_return_val_if_fail (g_task_is_valid (result, self), FALSE);
+ g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
+
+ success = g_task_propagate_boolean (G_TASK (result), error);
+
+ if (success)
+ g_thread_join (g_steal_pointer (&self->worker_thread));
+
+ return success;
+}