diff options
Diffstat (limited to 'src/knot/worker')
-rw-r--r-- | src/knot/worker/pool.c | 254 | ||||
-rw-r--r-- | src/knot/worker/pool.h | 93 | ||||
-rw-r--r-- | src/knot/worker/queue.c | 67 | ||||
-rw-r--r-- | src/knot/worker/queue.h | 65 |
4 files changed, 479 insertions, 0 deletions
diff --git a/src/knot/worker/pool.c b/src/knot/worker/pool.c new file mode 100644 index 0000000..ff74970 --- /dev/null +++ b/src/knot/worker/pool.c @@ -0,0 +1,254 @@ +/* Copyright (C) 2021 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#include <assert.h> +#include <pthread.h> +#include <stdbool.h> +#include <stdlib.h> +#include <string.h> + +#include "libknot/libknot.h" +#include "knot/server/dthreads.h" +#include "knot/worker/pool.h" + +/*! + * \brief Worker pool state. + */ +struct worker_pool { + dt_unit_t *threads; + + pthread_mutex_t lock; + pthread_cond_t wake; + + bool terminating; /*!< Is the pool terminating? .*/ + bool suspended; /*!< Is execution temporarily suspended? .*/ + int running; /*!< Number of running threads. */ + worker_queue_t tasks; +}; + +/*! + * \brief Worker thread. + * + * The thread takes a task from the tasks queue and runs it, while checking + * if the dispatching of new tasks is allowed by the thread pool. + * + * An execution of a running thread cannot be enforced. + * + */ +static int worker_main(dthread_t *thread) +{ + assert(thread); + + worker_pool_t *pool = thread->data; + + pthread_mutex_lock(&pool->lock); + + for (;;) { + if (pool->terminating) { + break; + } + + worker_task_t *task = NULL; + if (!pool->suspended) { + task = worker_queue_dequeue(&pool->tasks); + } + + if (task == NULL) { + pthread_cond_wait(&pool->wake, &pool->lock); + continue; + } + + assert(task->run); + pool->running += 1; + + pthread_mutex_unlock(&pool->lock); + task->run(task); + pthread_mutex_lock(&pool->lock); + + pool->running -= 1; + pthread_cond_broadcast(&pool->wake); + } + + pthread_mutex_unlock(&pool->lock); + + return KNOT_EOK; +} + +/* -- public API ------------------------------------------------------------ */ + +worker_pool_t *worker_pool_create(unsigned threads) +{ + worker_pool_t *pool = malloc(sizeof(worker_pool_t)); + if (pool == NULL) { + return NULL; + } + + memset(pool, 0, sizeof(worker_pool_t)); + pool->threads = dt_create(threads, worker_main, NULL, pool); + if (pool->threads == NULL) { + goto fail; + } + + if (pthread_mutex_init(&pool->lock, NULL) != 0) { + goto fail; + } + + if (pthread_cond_init(&pool->wake, NULL) != 0) { + goto fail; + } + + worker_queue_init(&pool->tasks); + + return pool; + +fail: + dt_delete(&pool->threads); + free(pool); + return NULL; +} + +void worker_pool_destroy(worker_pool_t *pool) +{ + if (!pool) { + return; + } + + dt_delete(&pool->threads); + + pthread_mutex_destroy(&pool->lock); + pthread_cond_destroy(&pool->wake); + + worker_queue_deinit(&pool->tasks); + + free(pool); +} + +void worker_pool_start(worker_pool_t *pool) +{ + if (!pool) { + return; + } + + dt_start(pool->threads); +} + +void worker_pool_stop(worker_pool_t *pool) +{ + if (!pool) { + return; + } + + pthread_mutex_lock(&pool->lock); + pool->terminating = true; + pthread_cond_broadcast(&pool->wake); + pthread_mutex_unlock(&pool->lock); + + dt_stop(pool->threads); +} + +void worker_pool_suspend(worker_pool_t *pool) +{ + if (!pool) { + return; + } + + pthread_mutex_lock(&pool->lock); + pool->suspended = true; + pthread_mutex_unlock(&pool->lock); +} + +void worker_pool_resume(worker_pool_t *pool) +{ + if (!pool) { + return; + } + + pthread_mutex_lock(&pool->lock); + pool->suspended = false; + pthread_cond_broadcast(&pool->wake); + pthread_mutex_unlock(&pool->lock); +} + +void worker_pool_join(worker_pool_t *pool) +{ + if (!pool) { + return; + } + + dt_join(pool->threads); +} + +void worker_pool_wait_cb(worker_pool_t *pool, wait_callback_t cb) +{ + if (!pool) { + return; + } + + pthread_mutex_lock(&pool->lock); + while (!EMPTY_LIST(pool->tasks.list) || pool->running > 0) { + if (cb != NULL) { + cb(pool); + } + pthread_cond_wait(&pool->wake, &pool->lock); + } + pthread_mutex_unlock(&pool->lock); +} + +void worker_pool_wait(worker_pool_t *pool) +{ + worker_pool_wait_cb(pool, NULL); +} + +void worker_pool_assign(worker_pool_t *pool, struct task *task) +{ + if (!pool || !task) { + return; + } + + pthread_mutex_lock(&pool->lock); + worker_queue_enqueue(&pool->tasks, task); + pthread_cond_signal(&pool->wake); + pthread_mutex_unlock(&pool->lock); +} + +void worker_pool_clear(worker_pool_t *pool) +{ + if (!pool) { + return; + } + + pthread_mutex_lock(&pool->lock); + worker_queue_deinit(&pool->tasks); + worker_queue_init(&pool->tasks); + pthread_mutex_unlock(&pool->lock); +} + +void worker_pool_status(worker_pool_t *pool, bool locked, int *running, int *queued) +{ + if (!pool) { + *running = *queued = 0; + return; + } + + if (!locked) { + pthread_mutex_lock(&pool->lock); + } + *running = pool->running; + *queued = worker_queue_length(&pool->tasks); + if (!locked) { + pthread_mutex_unlock(&pool->lock); + } +} diff --git a/src/knot/worker/pool.h b/src/knot/worker/pool.h new file mode 100644 index 0000000..f843ea7 --- /dev/null +++ b/src/knot/worker/pool.h @@ -0,0 +1,93 @@ +/* Copyright (C) 2021 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <stdbool.h> + +#include "knot/worker/queue.h" + +struct worker_pool; +typedef struct worker_pool worker_pool_t; + +typedef void(*wait_callback_t)(worker_pool_t *); + +/*! + * \brief Initialize worker pool. + * + * \param threads Number of threads to be created. + * + * \return Thread pool or NULL in case of error. + */ +worker_pool_t *worker_pool_create(unsigned threads); + +/*! + * \brief Destroy the worker pool. + */ +void worker_pool_destroy(worker_pool_t *pool); + +/*! + * \brief Start all threads in the worker pool. + */ +void worker_pool_start(worker_pool_t *pool); + +/*! + * \brief Stop processing of new tasks, start stopping worker threads when possible. + */ +void worker_pool_stop(worker_pool_t *pool); + +/*! + * \brief Temporarily suspend the execution of worker pool. + */ +void worker_pool_suspend(worker_pool_t *pool); + +/*! + * \brief Resume the execution of worker pool. + */ +void worker_pool_resume(worker_pool_t *pool); + +/*! + * \brief Wait for all threads to terminate. + */ +void worker_pool_join(worker_pool_t *pool); + +/*! + * \brief Wait till the number of pending tasks is zero. + */ +void worker_pool_wait(worker_pool_t *pool); + +/*! + * \brief Wait till the number of pending tasks is zero. Callback emitted on + * thread wakeup can be specified. + */ +void worker_pool_wait_cb(worker_pool_t *pool, wait_callback_t cb); + +/*! + * \brief Assign a task to be performed by a worker in the pool. + */ +void worker_pool_assign(worker_pool_t *pool, struct task *task); + +/*! + * \brief Clear all tasks enqueued in pool processing queue. + */ +void worker_pool_clear(worker_pool_t *pool); + +/*! + * \brief Obtain info regarding how the pool is busy. + * + * \note Locked means if the mutex `pool->lock` is locked. + */ +void worker_pool_status(worker_pool_t *pool, bool locked, int *running, int *queued); diff --git a/src/knot/worker/queue.c b/src/knot/worker/queue.c new file mode 100644 index 0000000..d9fc2b6 --- /dev/null +++ b/src/knot/worker/queue.c @@ -0,0 +1,67 @@ +/* Copyright (C) 2021 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#include "knot/worker/queue.h" +#include "contrib/mempattern.h" + +void worker_queue_init(worker_queue_t *queue) +{ + if (!queue) { + return; + } + + memset(queue, 0, sizeof(worker_queue_t)); + + init_list(&queue->list); + mm_ctx_init(&queue->mm_ctx); +} + +void worker_queue_deinit(worker_queue_t *queue) +{ + ptrlist_free(&queue->list, &queue->mm_ctx); +} + +void worker_queue_enqueue(worker_queue_t *queue, worker_task_t *task) +{ + if (!queue || !task) { + return; + } + + ptrlist_add(&queue->list, task, &queue->mm_ctx); +} + +worker_task_t *worker_queue_dequeue(worker_queue_t *queue) +{ + if (!queue) { + return NULL; + } + + worker_task_t *task = NULL; + + if (!EMPTY_LIST(queue->list)) { + ptrnode_t *node = HEAD(queue->list); + task = (void *)node->d; + rem_node(&node->n); + queue->mm_ctx.free(&node->n); + } + + return task; +} + +size_t worker_queue_length(worker_queue_t *queue) +{ + return queue ? list_size(&queue->list) : 0; +} diff --git a/src/knot/worker/queue.h b/src/knot/worker/queue.h new file mode 100644 index 0000000..0ade7ab --- /dev/null +++ b/src/knot/worker/queue.h @@ -0,0 +1,65 @@ +/* Copyright (C) 2021 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "contrib/ucw/lists.h" + +struct task; +typedef void (*task_cb)(struct task *); + +/*! + * \brief Task executable by a worker. + */ +typedef struct task { + void *ctx; + task_cb run; +} worker_task_t; + +/*! + * \brief Worker queue. + */ +typedef struct worker_queue { + knot_mm_t mm_ctx; + list_t list; +} worker_queue_t; + +/*! + * \brief Initialize worker queue. + */ +void worker_queue_init(worker_queue_t *queue); + +/*! + * \brief Deinitialize worker queue. + */ +void worker_queue_deinit(worker_queue_t *queue); + +/*! + * \brief Insert new item into the queue. + */ +void worker_queue_enqueue(worker_queue_t *queue, worker_task_t *task); + +/*! + * \brief Remove item from the queue. + * + * \return Task or NULL if the queue is empty. + */ +worker_task_t *worker_queue_dequeue(worker_queue_t *queue); + +/*! + * \brief Return number of tasks in worker queue. + */ +size_t worker_queue_length(worker_queue_t *queue); |