/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- * vi:set noexpandtab tabstop=8 shiftwidth=8: * * Copyright (C) 2021 Endless OS Foundation LLC * * Author: Philip Withnall * * SPDX-License-Identifier: GPL-2.0+ */ /** * SECTION:gs-worker-thread * @short_description: A worker thread which executes queued #GTasks until stopped * * #GsWorkerThread is a thread-safe wrapper around a #GTask queue and a single * worker thread which executes tasks on that queue. * * Tasks can be added to the queue using gs_worker_thread_queue(). The worker * thread (which is created when #GsWorkerThread is constructed) will execute * them in (priority, queue order) order. Each #GTaskThreadFunc is responsible * for calling `g_task_return_*()` on its #GTask to complete that task. * * The priority passed to gs_worker_thread_queue() will be used to adjust the * worker thread’s I/O priority (using `ioprio_set()`) when executing that task. * * It is intended that gs_worker_thread_queue() is an alternative to using * g_task_run_in_thread(). g_task_run_in_thread() queues tasks into a single * process-wide thread pool, so they are mixed in with other tasks, and it can * become hard to ensure the thread pool isn’t overwhelmed and that tasks are * executed in the right order. * * The worker thread will continue executing tasks until * gs_worker_thread_shutdown_async() is called. This must be called before the * final reference to the #GsWorkerThread is dropped. * * Since: 42 */ #include "config.h" #include #include #include "gs-ioprio.h" #include "gs-worker-thread.h" typedef enum { GS_WORKER_THREAD_STATE_RUNNING = 0, GS_WORKER_THREAD_STATE_SHUTTING_DOWN = 1, GS_WORKER_THREAD_STATE_SHUT_DOWN = 2, } GsWorkerThreadState; struct _GsWorkerThread { GObject parent; gchar *name; /* (nullable) (owned) */ GsWorkerThreadState worker_state; /* (atomic) */ GMainContext *worker_context; /* (owned); may be NULL before setup or after shutdown */ GThread *worker_thread; /* (atomic); may be NULL before setup or after shutdown */ }; typedef enum { PROP_NAME = 1, } GsWorkerThreadProperty; static GParamSpec *props[PROP_NAME + 1] = { NULL, }; G_DEFINE_TYPE (GsWorkerThread, gs_worker_thread, G_TYPE_OBJECT) static void gs_worker_thread_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec) { GsWorkerThread *self = GS_WORKER_THREAD (object); switch ((GsWorkerThreadProperty) prop_id) { case PROP_NAME: g_value_set_string (value, self->name); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } } static void gs_worker_thread_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec) { GsWorkerThread *self = GS_WORKER_THREAD (object); switch ((GsWorkerThreadProperty) prop_id) { case PROP_NAME: /* Construct only */ g_assert (self->name == NULL); self->name = g_value_dup_string (value); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } } static void gs_worker_thread_dispose (GObject *object) { GsWorkerThread *self = GS_WORKER_THREAD (object); /* Should have stopped by now. */ g_assert (self->worker_thread == NULL); g_clear_pointer (&self->name, g_free); g_clear_pointer (&self->worker_context, g_main_context_unref); G_OBJECT_CLASS (gs_worker_thread_parent_class)->dispose (object); } static gpointer thread_cb (gpointer data); static void gs_worker_thread_constructed (GObject *object) { GsWorkerThread *self = GS_WORKER_THREAD (object); G_OBJECT_CLASS (gs_worker_thread_parent_class)->constructed (object); /* Start up a worker thread and its #GMainContext. The worker will run * and process events on @worker_context until @worker_state changes * from %GS_WORKER_THREAD_STATE_RUNNING. */ self->worker_state = GS_WORKER_THREAD_STATE_RUNNING; self->worker_context = g_main_context_new (); self->worker_thread = g_thread_new (self->name, thread_cb, self); } static void gs_worker_thread_class_init (GsWorkerThreadClass *klass) { GObjectClass *object_class = G_OBJECT_CLASS (klass); object_class->constructed = gs_worker_thread_constructed; object_class->get_property = gs_worker_thread_get_property; object_class->set_property = gs_worker_thread_set_property; object_class->dispose = gs_worker_thread_dispose; /** * GsWorkerThread:name: (not nullable): * * Name for the worker thread to use in debug output. This must be set. * * Since: 42 */ props[PROP_NAME] = g_param_spec_string ("name", "Name", "Name for the worker thread to use in debug output.", NULL, G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS | G_PARAM_EXPLICIT_NOTIFY); g_object_class_install_properties (object_class, G_N_ELEMENTS (props), props); } static gpointer thread_cb (gpointer data) { GsWorkerThread *self = GS_WORKER_THREAD (data); g_autoptr(GMainContextPusher) pusher = g_main_context_pusher_new (self->worker_context); while (g_atomic_int_get (&self->worker_state) != GS_WORKER_THREAD_STATE_SHUT_DOWN) g_main_context_iteration (self->worker_context, TRUE); return NULL; } static void gs_worker_thread_init (GsWorkerThread *self) { } /** * gs_worker_thread_new: * @name: (not nullable): name for the worker thread * * Create and start a new #GsWorkerThread. * * @name will be used to set the thread name and in debug output. * * Returns: (transfer full): a new #GsWorkerThread * Since: 42 */ GsWorkerThread * gs_worker_thread_new (const gchar *name) { g_return_val_if_fail (name != NULL, NULL); return g_object_new (GS_TYPE_WORKER_THREAD, "name", name, NULL); } /* Essentially a wrapper around these elements to avoid the caller having to * return `G_SOURCE_REMOVE` from their `work_func` every time. */ typedef struct { GTaskThreadFunc work_func; GTask *task; /* (owned) */ gint priority; } WorkData; static void work_data_free (WorkData *data) { g_clear_object (&data->task); g_free (data); } G_DEFINE_AUTOPTR_CLEANUP_FUNC (WorkData, work_data_free) static gboolean work_run_cb (gpointer _data) { WorkData *data = _data; GTask *task = data->task; gpointer source_object = g_task_get_source_object (task); gpointer task_data = g_task_get_task_data (task); GCancellable *cancellable = g_task_get_cancellable (task); /* Set the I/O priority of the thread to match the priority of the * task. */ gs_ioprio_set (data->priority); data->work_func (task, source_object, task_data, cancellable); return G_SOURCE_REMOVE; } /** * gs_worker_thread_queue: * @self: a #GsWorkerThread * @priority: (default G_PRIORITY_DEFAULT): priority to queue the task at, * typically #G_PRIORITY_DEFAULT * @work_func: (not nullable): function to run the task * @task: (transfer full) (not nullable): the #GTask containing context data to * pass to @work_func * * Queue @task to be run in the worker thread at the given @priority. * * This function takes ownership of @task. * * @priority sets the order of the task in the queue, and also affects the I/O * priority of the worker thread when the task is executed — high priorities * result in a high I/O priority, low priorities result in an idle I/O priority, * as per `ioprio_set()`. * * When the task is run, @work_func will be executed and passed @task and the * source object, task data and cancellable set on @task. * * @work_func is responsible for calling `g_task_return_*()` on @task once the * task is complete. * * If a task is cancelled using its #GCancellable after it’s queued to the * #GsWorkerThread, @work_func will still be executed. @work_func is responsible * for checking whether the #GCancellable has been cancelled. * * It is an error to call this function after gs_worker_thread_shutdown_async() * has called. * * Since: 42 */ void gs_worker_thread_queue (GsWorkerThread *self, gint priority, GTaskThreadFunc work_func, GTask *task) { g_autoptr(WorkData) data = NULL; g_return_if_fail (GS_IS_WORKER_THREAD (self)); g_return_if_fail (work_func != NULL); g_return_if_fail (G_IS_TASK (task)); g_assert (g_atomic_int_get (&self->worker_state) == GS_WORKER_THREAD_STATE_RUNNING || g_task_get_source_tag (task) == gs_worker_thread_shutdown_async); data = g_new0 (WorkData, 1); data->work_func = work_func; data->task = g_steal_pointer (&task); data->priority = priority; g_main_context_invoke_full (self->worker_context, priority, work_run_cb, g_steal_pointer (&data), (GDestroyNotify) work_data_free); } /** * gs_worker_thread_is_in_worker_context: * @self: a #GsWorkerThread * * Returns whether the calling thread is the worker thread. * * This is intended to be used as a precondition check to ensure that worker * code is not accidentally run from the wrong thread. * * |[ * static void * do_work (MyPlugin *self) * { * g_assert (gs_worker_thread_is_in_worker_context (self->worker_thread)); * * // do some work * } * ]| * * Returns: %TRUE if running in the worker context, %FALSE otherwise * Since: 42 */ gboolean gs_worker_thread_is_in_worker_context (GsWorkerThread *self) { return g_main_context_is_owner (self->worker_context); } static void shutdown_cb (GTask *task, gpointer source_object, gpointer task_data, GCancellable *cancellable); /** * gs_worker_thread_shutdown_async: * @self: a #GsWorkerThread * @cancellable: (nullable): a #GCancellable, or %NULL * @callback: callback for once the asynchronous operation is complete * @user_data: data to pass to @callback * * Shut down the worker thread. * * The thread will finish processing whatever task it’s currently processing * (if any), will return %G_IO_ERROR_CANCELLED for all remaining queued * tasks, and will then join the main process. * * This is a no-op if called subsequently. * * Since: 42 */ void gs_worker_thread_shutdown_async (GsWorkerThread *self, GCancellable *cancellable, GAsyncReadyCallback callback, gpointer user_data) { g_autoptr(GTask) task = NULL; g_return_if_fail (GS_IS_WORKER_THREAD (self)); g_return_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable)); task = g_task_new (self, cancellable, callback, user_data); g_task_set_source_tag (task, gs_worker_thread_shutdown_async); /* Already called? */ if (g_atomic_int_get (&self->worker_state) != GS_WORKER_THREAD_STATE_RUNNING) { g_task_return_boolean (task, TRUE); return; } /* Signal the worker thread to stop processing tasks. */ g_atomic_int_set (&self->worker_state, GS_WORKER_THREAD_STATE_SHUTTING_DOWN); gs_worker_thread_queue (self, G_MAXINT /* lowest priority */, shutdown_cb, g_steal_pointer (&task)); } static void shutdown_cb (GTask *task, gpointer source_object, gpointer task_data, GCancellable *cancellable) { GsWorkerThread *self = GS_WORKER_THREAD (source_object); gboolean updated_state; updated_state = g_atomic_int_compare_and_exchange (&self->worker_state, GS_WORKER_THREAD_STATE_SHUTTING_DOWN, GS_WORKER_THREAD_STATE_SHUT_DOWN); g_assert (updated_state); /* Tidy up. We can’t join the thread here as this function is executing * within the thread and that would deadlock. */ g_clear_pointer (&self->worker_context, g_main_context_unref); g_task_return_boolean (task, TRUE); } /** * gs_worker_thread_shutdown_finish: * @self: a #GsWorkerThread * @result: a #GAsyncResult * @error: return location for a #GError, or %NULL * * Finish an asynchronous shutdown operation started with * gs_worker_thread_shutdown_async(); * * Returns: %TRUE on success, %FALSE otherwise * Since: 42 */ gboolean gs_worker_thread_shutdown_finish (GsWorkerThread *self, GAsyncResult *result, GError **error) { gboolean success; g_return_val_if_fail (GS_IS_WORKER_THREAD (self), FALSE); g_return_val_if_fail (g_async_result_is_tagged (result, gs_worker_thread_shutdown_async), FALSE); g_return_val_if_fail (g_task_is_valid (result, self), FALSE); g_return_val_if_fail (error == NULL || *error == NULL, FALSE); success = g_task_propagate_boolean (G_TASK (result), error); if (success) g_thread_join (g_steal_pointer (&self->worker_thread)); return success; }