diff options
Diffstat (limited to 'lib/pthreadpool')
-rw-r--r-- | lib/pthreadpool/Makefile | 9 | ||||
-rw-r--r-- | lib/pthreadpool/pthreadpool.c | 863 | ||||
-rw-r--r-- | lib/pthreadpool/pthreadpool.h | 158 | ||||
-rw-r--r-- | lib/pthreadpool/pthreadpool_pipe.c | 202 | ||||
-rw-r--r-- | lib/pthreadpool/pthreadpool_pipe.h | 39 | ||||
-rw-r--r-- | lib/pthreadpool/pthreadpool_sync.c | 97 | ||||
-rw-r--r-- | lib/pthreadpool/pthreadpool_tevent.c | 428 | ||||
-rw-r--r-- | lib/pthreadpool/pthreadpool_tevent.h | 40 | ||||
-rw-r--r-- | lib/pthreadpool/tests.c | 517 | ||||
-rw-r--r-- | lib/pthreadpool/tests_cmocka.c | 247 | ||||
-rw-r--r-- | lib/pthreadpool/wscript_build | 35 |
11 files changed, 2635 insertions, 0 deletions
diff --git a/lib/pthreadpool/Makefile b/lib/pthreadpool/Makefile new file mode 100644 index 0000000..48626bd --- /dev/null +++ b/lib/pthreadpool/Makefile @@ -0,0 +1,9 @@ +all: tests + +CFLAGS=-O3 -g -Wall + +pthreadpool.o: pthreadpool.c pthreadpool.h + gcc -c -O3 -o pthreadpool.o pthreadpool.c -I../../.. + +tests: tests.o pthreadpool.o + gcc -o tests tests.o pthreadpool.o -lpthread
\ No newline at end of file diff --git a/lib/pthreadpool/pthreadpool.c b/lib/pthreadpool/pthreadpool.c new file mode 100644 index 0000000..cbabec9 --- /dev/null +++ b/lib/pthreadpool/pthreadpool.c @@ -0,0 +1,863 @@ +/* + * Unix SMB/CIFS implementation. + * thread pool implementation + * Copyright (C) Volker Lendecke 2009 + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "replace.h" +#include "system/time.h" +#include "system/wait.h" +#include "system/threads.h" +#include "system/filesys.h" +#include "pthreadpool.h" +#include "lib/util/dlinklist.h" + +#ifdef NDEBUG +#undef NDEBUG +#endif + +#include <assert.h> + +struct pthreadpool_job { + int id; + void (*fn)(void *private_data); + void *private_data; +}; + +struct pthreadpool { + /* + * List pthreadpools for fork safety + */ + struct pthreadpool *prev, *next; + + /* + * Control access to this struct + */ + pthread_mutex_t mutex; + + /* + * Threads waiting for work do so here + */ + pthread_cond_t condvar; + + /* + * Array of jobs + */ + size_t jobs_array_len; + struct pthreadpool_job *jobs; + + size_t head; + size_t num_jobs; + + /* + * Indicate job completion + */ + int (*signal_fn)(int jobid, + void (*job_fn)(void *private_data), + void *job_fn_private_data, + void *private_data); + void *signal_fn_private_data; + + /* + * indicator to worker threads to stop processing further jobs + * and exit. + */ + bool stopped; + + /* + * indicator to the last worker thread to free the pool + * resources. + */ + bool destroyed; + + /* + * maximum number of threads + * 0 means no real thread, only strict sync processing. + */ + unsigned max_threads; + + /* + * Number of threads + */ + unsigned num_threads; + + /* + * Number of idle threads + */ + unsigned num_idle; + + /* + * Condition variable indicating that helper threads should + * quickly go away making way for fork() without anybody + * waiting on pool->condvar. + */ + pthread_cond_t *prefork_cond; + + /* + * Waiting position for helper threads while fork is + * running. The forking thread will have locked it, and all + * idle helper threads will sit here until after the fork, + * where the forking thread will unlock it again. + */ + pthread_mutex_t fork_mutex; +}; + +static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER; +static struct pthreadpool *pthreadpools = NULL; +static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT; + +static void pthreadpool_prep_atfork(void); + +/* + * Initialize a thread pool + */ + +int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult, + int (*signal_fn)(int jobid, + void (*job_fn)(void *private_data), + void *job_fn_private_data, + void *private_data), + void *signal_fn_private_data) +{ + struct pthreadpool *pool; + int ret; + + pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool)); + if (pool == NULL) { + return ENOMEM; + } + pool->signal_fn = signal_fn; + pool->signal_fn_private_data = signal_fn_private_data; + + pool->jobs_array_len = 4; + pool->jobs = calloc( + pool->jobs_array_len, sizeof(struct pthreadpool_job)); + + if (pool->jobs == NULL) { + free(pool); + return ENOMEM; + } + + pool->head = pool->num_jobs = 0; + + ret = pthread_mutex_init(&pool->mutex, NULL); + if (ret != 0) { + free(pool->jobs); + free(pool); + return ret; + } + + ret = pthread_cond_init(&pool->condvar, NULL); + if (ret != 0) { + pthread_mutex_destroy(&pool->mutex); + free(pool->jobs); + free(pool); + return ret; + } + + ret = pthread_mutex_init(&pool->fork_mutex, NULL); + if (ret != 0) { + pthread_cond_destroy(&pool->condvar); + pthread_mutex_destroy(&pool->mutex); + free(pool->jobs); + free(pool); + return ret; + } + + pool->stopped = false; + pool->destroyed = false; + pool->num_threads = 0; + pool->max_threads = max_threads; + pool->num_idle = 0; + pool->prefork_cond = NULL; + + ret = pthread_mutex_lock(&pthreadpools_mutex); + if (ret != 0) { + pthread_mutex_destroy(&pool->fork_mutex); + pthread_cond_destroy(&pool->condvar); + pthread_mutex_destroy(&pool->mutex); + free(pool->jobs); + free(pool); + return ret; + } + DLIST_ADD(pthreadpools, pool); + + ret = pthread_mutex_unlock(&pthreadpools_mutex); + assert(ret == 0); + + pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork); + + *presult = pool; + + return 0; +} + +size_t pthreadpool_max_threads(struct pthreadpool *pool) +{ + if (pool->stopped) { + return 0; + } + + return pool->max_threads; +} + +size_t pthreadpool_queued_jobs(struct pthreadpool *pool) +{ + int res; + int unlock_res; + size_t ret; + + if (pool->stopped) { + return 0; + } + + res = pthread_mutex_lock(&pool->mutex); + if (res != 0) { + return res; + } + + if (pool->stopped) { + unlock_res = pthread_mutex_unlock(&pool->mutex); + assert(unlock_res == 0); + return 0; + } + + ret = pool->num_jobs; + + unlock_res = pthread_mutex_unlock(&pool->mutex); + assert(unlock_res == 0); + return ret; +} + +static void pthreadpool_prepare_pool(struct pthreadpool *pool) +{ + int ret; + + ret = pthread_mutex_lock(&pool->fork_mutex); + assert(ret == 0); + + ret = pthread_mutex_lock(&pool->mutex); + assert(ret == 0); + + while (pool->num_idle != 0) { + unsigned num_idle = pool->num_idle; + pthread_cond_t prefork_cond; + + ret = pthread_cond_init(&prefork_cond, NULL); + assert(ret == 0); + + /* + * Push all idle threads off pool->condvar. In the + * child we can destroy the pool, which would result + * in undefined behaviour in the + * pthread_cond_destroy(pool->condvar). glibc just + * blocks here. + */ + pool->prefork_cond = &prefork_cond; + + ret = pthread_cond_signal(&pool->condvar); + assert(ret == 0); + + while (pool->num_idle == num_idle) { + ret = pthread_cond_wait(&prefork_cond, &pool->mutex); + assert(ret == 0); + } + + pool->prefork_cond = NULL; + + ret = pthread_cond_destroy(&prefork_cond); + assert(ret == 0); + } + + /* + * Probably it's well-defined somewhere: What happens to + * condvars after a fork? The rationale of pthread_atfork only + * writes about mutexes. So better be safe than sorry and + * destroy/reinit pool->condvar across a fork. + */ + + ret = pthread_cond_destroy(&pool->condvar); + assert(ret == 0); +} + +static void pthreadpool_prepare(void) +{ + int ret; + struct pthreadpool *pool; + + ret = pthread_mutex_lock(&pthreadpools_mutex); + assert(ret == 0); + + pool = pthreadpools; + + while (pool != NULL) { + pthreadpool_prepare_pool(pool); + pool = pool->next; + } +} + +static void pthreadpool_parent(void) +{ + int ret; + struct pthreadpool *pool; + + for (pool = DLIST_TAIL(pthreadpools); + pool != NULL; + pool = DLIST_PREV(pool)) { + ret = pthread_cond_init(&pool->condvar, NULL); + assert(ret == 0); + ret = pthread_mutex_unlock(&pool->mutex); + assert(ret == 0); + ret = pthread_mutex_unlock(&pool->fork_mutex); + assert(ret == 0); + } + + ret = pthread_mutex_unlock(&pthreadpools_mutex); + assert(ret == 0); +} + +static void pthreadpool_child(void) +{ + int ret; + struct pthreadpool *pool; + + for (pool = DLIST_TAIL(pthreadpools); + pool != NULL; + pool = DLIST_PREV(pool)) { + + pool->num_threads = 0; + pool->num_idle = 0; + pool->head = 0; + pool->num_jobs = 0; + pool->stopped = true; + + ret = pthread_cond_init(&pool->condvar, NULL); + assert(ret == 0); + + ret = pthread_mutex_unlock(&pool->mutex); + assert(ret == 0); + + ret = pthread_mutex_unlock(&pool->fork_mutex); + assert(ret == 0); + } + + ret = pthread_mutex_unlock(&pthreadpools_mutex); + assert(ret == 0); +} + +static void pthreadpool_prep_atfork(void) +{ + pthread_atfork(pthreadpool_prepare, pthreadpool_parent, + pthreadpool_child); +} + +static int pthreadpool_free(struct pthreadpool *pool) +{ + int ret, ret1, ret2; + + ret = pthread_mutex_lock(&pthreadpools_mutex); + if (ret != 0) { + return ret; + } + DLIST_REMOVE(pthreadpools, pool); + ret = pthread_mutex_unlock(&pthreadpools_mutex); + assert(ret == 0); + + ret = pthread_mutex_lock(&pool->mutex); + assert(ret == 0); + ret = pthread_mutex_unlock(&pool->mutex); + assert(ret == 0); + + ret = pthread_mutex_destroy(&pool->mutex); + ret1 = pthread_cond_destroy(&pool->condvar); + ret2 = pthread_mutex_destroy(&pool->fork_mutex); + + if (ret != 0) { + return ret; + } + if (ret1 != 0) { + return ret1; + } + if (ret2 != 0) { + return ret2; + } + + free(pool->jobs); + free(pool); + + return 0; +} + +/* + * Stop a thread pool. Wake up all idle threads for exit. + */ + +static int pthreadpool_stop_locked(struct pthreadpool *pool) +{ + int ret; + + pool->stopped = true; + + if (pool->num_threads == 0) { + return 0; + } + + /* + * We have active threads, tell them to finish. + */ + + ret = pthread_cond_broadcast(&pool->condvar); + + return ret; +} + +/* + * Stop a thread pool. Wake up all idle threads for exit. + */ + +int pthreadpool_stop(struct pthreadpool *pool) +{ + int ret, ret1; + + ret = pthread_mutex_lock(&pool->mutex); + if (ret != 0) { + return ret; + } + + if (!pool->stopped) { + ret = pthreadpool_stop_locked(pool); + } + + ret1 = pthread_mutex_unlock(&pool->mutex); + assert(ret1 == 0); + + return ret; +} + +/* + * Destroy a thread pool. Wake up all idle threads for exit. The last + * one will free the pool. + */ + +int pthreadpool_destroy(struct pthreadpool *pool) +{ + int ret, ret1; + bool free_it; + + assert(!pool->destroyed); + + ret = pthread_mutex_lock(&pool->mutex); + if (ret != 0) { + return ret; + } + + pool->destroyed = true; + + if (!pool->stopped) { + ret = pthreadpool_stop_locked(pool); + } + + free_it = (pool->num_threads == 0); + + ret1 = pthread_mutex_unlock(&pool->mutex); + assert(ret1 == 0); + + if (free_it) { + pthreadpool_free(pool); + } + + return ret; +} +/* + * Prepare for pthread_exit(), pool->mutex must be locked and will be + * unlocked here. This is a bit of a layering violation, but here we + * also take care of removing the pool if we're the last thread. + */ +static void pthreadpool_server_exit(struct pthreadpool *pool) +{ + int ret; + bool free_it; + + pool->num_threads -= 1; + + free_it = (pool->destroyed && (pool->num_threads == 0)); + + ret = pthread_mutex_unlock(&pool->mutex); + assert(ret == 0); + + if (free_it) { + pthreadpool_free(pool); + } +} + +static bool pthreadpool_get_job(struct pthreadpool *p, + struct pthreadpool_job *job) +{ + if (p->stopped) { + return false; + } + + if (p->num_jobs == 0) { + return false; + } + *job = p->jobs[p->head]; + p->head = (p->head+1) % p->jobs_array_len; + p->num_jobs -= 1; + return true; +} + +static bool pthreadpool_put_job(struct pthreadpool *p, + int id, + void (*fn)(void *private_data), + void *private_data) +{ + struct pthreadpool_job *job; + + if (p->num_jobs == p->jobs_array_len) { + struct pthreadpool_job *tmp; + size_t new_len = p->jobs_array_len * 2; + + tmp = realloc( + p->jobs, sizeof(struct pthreadpool_job) * new_len); + if (tmp == NULL) { + return false; + } + p->jobs = tmp; + + /* + * We just doubled the jobs array. The array implements a FIFO + * queue with a modulo-based wraparound, so we have to memcpy + * the jobs that are logically at the queue end but physically + * before the queue head into the reallocated area. The new + * space starts at the current jobs_array_len, and we have to + * copy everything before the current head job into the new + * area. + */ + memcpy(&p->jobs[p->jobs_array_len], p->jobs, + sizeof(struct pthreadpool_job) * p->head); + + p->jobs_array_len = new_len; + } + + job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len]; + job->id = id; + job->fn = fn; + job->private_data = private_data; + + p->num_jobs += 1; + + return true; +} + +static void pthreadpool_undo_put_job(struct pthreadpool *p) +{ + p->num_jobs -= 1; +} + +static void *pthreadpool_server(void *arg) +{ + struct pthreadpool *pool = (struct pthreadpool *)arg; + int res; + + res = pthread_mutex_lock(&pool->mutex); + if (res != 0) { + return NULL; + } + + while (1) { + struct timespec ts; + struct pthreadpool_job job; + + /* + * idle-wait at most 1 second. If nothing happens in that + * time, exit this thread. + */ + + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += 1; + + while ((pool->num_jobs == 0) && !pool->stopped) { + + pool->num_idle += 1; + res = pthread_cond_timedwait( + &pool->condvar, &pool->mutex, &ts); + pool->num_idle -= 1; + + if (pool->prefork_cond != NULL) { + /* + * Me must allow fork() to continue + * without anybody waiting on + * &pool->condvar. Tell + * pthreadpool_prepare_pool that we + * got that message. + */ + + res = pthread_cond_signal(pool->prefork_cond); + assert(res == 0); + + res = pthread_mutex_unlock(&pool->mutex); + assert(res == 0); + + /* + * pthreadpool_prepare_pool has + * already locked this mutex across + * the fork. This makes us wait + * without sitting in a condvar. + */ + res = pthread_mutex_lock(&pool->fork_mutex); + assert(res == 0); + res = pthread_mutex_unlock(&pool->fork_mutex); + assert(res == 0); + + res = pthread_mutex_lock(&pool->mutex); + assert(res == 0); + } + + if (res == ETIMEDOUT) { + + if (pool->num_jobs == 0) { + /* + * we timed out and still no work for + * us. Exit. + */ + pthreadpool_server_exit(pool); + return NULL; + } + + break; + } + assert(res == 0); + } + + if (pthreadpool_get_job(pool, &job)) { + int ret; + + /* + * Do the work with the mutex unlocked + */ + + res = pthread_mutex_unlock(&pool->mutex); + assert(res == 0); + + job.fn(job.private_data); + + ret = pool->signal_fn(job.id, + job.fn, job.private_data, + pool->signal_fn_private_data); + + res = pthread_mutex_lock(&pool->mutex); + assert(res == 0); + + if (ret != 0) { + pthreadpool_server_exit(pool); + return NULL; + } + } + + if (pool->stopped) { + /* + * we're asked to stop processing jobs, so exit + */ + pthreadpool_server_exit(pool); + return NULL; + } + } +} + +static int pthreadpool_create_thread(struct pthreadpool *pool) +{ + pthread_attr_t thread_attr; + pthread_t thread_id; + int res; + sigset_t mask, omask; + + /* + * Create a new worker thread. It should not receive any signals. + */ + + sigfillset(&mask); + + res = pthread_attr_init(&thread_attr); + if (res != 0) { + return res; + } + + res = pthread_attr_setdetachstate( + &thread_attr, PTHREAD_CREATE_DETACHED); + if (res != 0) { + pthread_attr_destroy(&thread_attr); + return res; + } + + res = pthread_sigmask(SIG_BLOCK, &mask, &omask); + if (res != 0) { + pthread_attr_destroy(&thread_attr); + return res; + } + + res = pthread_create(&thread_id, &thread_attr, pthreadpool_server, + (void *)pool); + + assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0); + + pthread_attr_destroy(&thread_attr); + + if (res == 0) { + pool->num_threads += 1; + } + + return res; +} + +int pthreadpool_add_job(struct pthreadpool *pool, int job_id, + void (*fn)(void *private_data), void *private_data) +{ + int res; + int unlock_res; + + assert(!pool->destroyed); + + res = pthread_mutex_lock(&pool->mutex); + if (res != 0) { + return res; + } + + if (pool->stopped) { + /* + * Protect against the pool being shut down while + * trying to add a job + */ + unlock_res = pthread_mutex_unlock(&pool->mutex); + assert(unlock_res == 0); + return EINVAL; + } + + if (pool->max_threads == 0) { + unlock_res = pthread_mutex_unlock(&pool->mutex); + assert(unlock_res == 0); + + /* + * If no thread are allowed we do strict sync processing. + */ + fn(private_data); + res = pool->signal_fn(job_id, fn, private_data, + pool->signal_fn_private_data); + return res; + } + + /* + * Add job to the end of the queue + */ + if (!pthreadpool_put_job(pool, job_id, fn, private_data)) { + unlock_res = pthread_mutex_unlock(&pool->mutex); + assert(unlock_res == 0); + return ENOMEM; + } + + if (pool->num_idle > 0) { + /* + * We have idle threads, wake one. + */ + res = pthread_cond_signal(&pool->condvar); + if (res != 0) { + pthreadpool_undo_put_job(pool); + } + unlock_res = pthread_mutex_unlock(&pool->mutex); + assert(unlock_res == 0); + return res; + } + + if (pool->num_threads >= pool->max_threads) { + /* + * No more new threads, we just queue the request + */ + unlock_res = pthread_mutex_unlock(&pool->mutex); + assert(unlock_res == 0); + return 0; + } + + res = pthreadpool_create_thread(pool); + if (res == 0) { + unlock_res = pthread_mutex_unlock(&pool->mutex); + assert(unlock_res == 0); + return 0; + } + + if (pool->num_threads != 0) { + /* + * At least one thread is still available, let + * that one run the queued job. + */ + unlock_res = pthread_mutex_unlock(&pool->mutex); + assert(unlock_res == 0); + return 0; + } + + pthreadpool_undo_put_job(pool); + + unlock_res = pthread_mutex_unlock(&pool->mutex); + assert(unlock_res == 0); + + return res; +} + +size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id, + void (*fn)(void *private_data), void *private_data) +{ + int res; + size_t i, j; + size_t num = 0; + + assert(!pool->destroyed); + + res = pthread_mutex_lock(&pool->mutex); + if (res != 0) { + return res; + } + + for (i = 0, j = 0; i < pool->num_jobs; i++) { + size_t idx = (pool->head + i) % pool->jobs_array_len; + size_t new_idx = (pool->head + j) % pool->jobs_array_len; + struct pthreadpool_job *job = &pool->jobs[idx]; + + if ((job->private_data == private_data) && + (job->id == job_id) && + (job->fn == fn)) + { + /* + * Just skip the entry. + */ + num++; + continue; + } + + /* + * If we already removed one or more jobs (so j will be smaller + * then i), we need to fill possible gaps in the logical list. + */ + if (j < i) { + pool->jobs[new_idx] = *job; + } + j++; + } + + pool->num_jobs -= num; + + res = pthread_mutex_unlock(&pool->mutex); + assert(res == 0); + + return num; +} diff --git a/lib/pthreadpool/pthreadpool.h b/lib/pthreadpool/pthreadpool.h new file mode 100644 index 0000000..b473358 --- /dev/null +++ b/lib/pthreadpool/pthreadpool.h @@ -0,0 +1,158 @@ +/* + * Unix SMB/CIFS implementation. + * threadpool implementation based on pthreads + * Copyright (C) Volker Lendecke 2009,2011 + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef __PTHREADPOOL_H__ +#define __PTHREADPOOL_H__ + +struct pthreadpool; + +/** + * @defgroup pthreadpool The pthreadpool API + * + * This API provides a way to run threadsafe functions in a helper + * thread. It is initially intended to run getaddrinfo asynchronously. + */ + + +/** + * @brief Create a pthreadpool + * + * A struct pthreadpool is the basis for for running threads in the + * background. + * + * @param[in] max_threads Maximum parallelism in this pool + * @param[out] presult Pointer to the threadpool returned + * @return success: 0, failure: errno + * + * max_threads=0 means unlimited parallelism. The caller has to take + * care to not overload the system. + */ +int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult, + int (*signal_fn)(int jobid, + void (*job_fn)(void *private_data), + void *job_fn_private_data, + void *private_data), + void *signal_fn_private_data); + +/** + * @brief Get the max threads value of pthreadpool + * + * @note This can be 0 for strict sync processing. + * + * @param[in] pool The pool + * @return number of possible threads + */ +size_t pthreadpool_max_threads(struct pthreadpool *pool); + +/** + * @brief The number of queued jobs of pthreadpool + * + * This is the number of jobs added by pthreadpool_add_job(), + * which are not yet processed by a thread. + * + * @param[in] pool The pool + * @return The number of jobs + */ +size_t pthreadpool_queued_jobs(struct pthreadpool *pool); + +/** + * @brief Stop a pthreadpool + * + * Stop a pthreadpool. If jobs are submitted, but not yet active in + * a thread, they won't get executed. If a job has already been + * submitted to a thread, the job function will continue running, and + * the signal function might still be called. + * + * This allows a multi step shutdown using pthreadpool_stop(), + * pthreadpool_cancel_job() and pthreadpool_destroy(). + * + * @param[in] pool The pool to stop + * @return success: 0, failure: errno + * + * @see pthreadpool_cancel_job() + * @see pthreadpool_destroy() + */ +int pthreadpool_stop(struct pthreadpool *pool); + +/** + * @brief Destroy a pthreadpool + * + * This basically implies pthreadpool_stop() if the pool + * isn't already stopped. + * + * Destroy a pthreadpool. If jobs are submitted, but not yet active in + * a thread, they won't get executed. If a job has already been + * submitted to a thread, the job function will continue running, and + * the signal function might still be called. The caller of + * pthreadpool_init must make sure the required resources are still + * around when the pool is destroyed with pending jobs. The last + * thread to exit will finally free() the pool memory. + * + * @param[in] pool The pool to destroy + * @return success: 0, failure: errno + * + * @see pthreadpool_stop() + */ +int pthreadpool_destroy(struct pthreadpool *pool); + +/** + * @brief Add a job to a pthreadpool + * + * This adds a job to a pthreadpool. The job can be identified by + * job_id. This integer will be passed to signal_fn() when the + * job is completed. + * + * @param[in] pool The pool to run the job on + * @param[in] job_id A custom identifier + * @param[in] fn The function to run asynchronously + * @param[in] private_data Pointer passed to fn + * @return success: 0, failure: errno + */ +int pthreadpool_add_job(struct pthreadpool *pool, int job_id, + void (*fn)(void *private_data), void *private_data); + +/** + * @brief Try to cancel a job in a pthreadpool + * + * This tries to cancel a job in a pthreadpool. The same + * arguments, which were given to pthreadpool_add_job() + * needs to be passed. + * + * The combination of id, fn, private_data might not be unique. + * So the function tries to cancel as much matching jobs as possible. + * Note once a job is scheduled in a thread it's to late to + * cancel it. + * + * Canceled jobs that weren't started yet won't be reported via a + * pool's signal_fn. + * + * @param[in] pool The pool to run the job on + * @param[in] job_id A custom identifier + * @param[in] fn The function to run asynchronously + * @param[in] private_data Pointer passed to fn + * @return The number of canceled jobs + * + * @see pthreadpool_add_job() + * @see pthreadpool_stop() + * @see pthreadpool_destroy() + */ +size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id, + void (*fn)(void *private_data), void *private_data); + +#endif diff --git a/lib/pthreadpool/pthreadpool_pipe.c b/lib/pthreadpool/pthreadpool_pipe.c new file mode 100644 index 0000000..d6d519a --- /dev/null +++ b/lib/pthreadpool/pthreadpool_pipe.c @@ -0,0 +1,202 @@ +/* + * Unix SMB/CIFS implementation. + * threadpool implementation based on pthreads + * Copyright (C) Volker Lendecke 2009,2011 + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "replace.h" +#include "system/filesys.h" +#include "pthreadpool_pipe.h" +#include "pthreadpool.h" + +struct pthreadpool_pipe { + struct pthreadpool *pool; + int num_jobs; + pid_t pid; + int pipe_fds[2]; +}; + +static int pthreadpool_pipe_signal(int jobid, + void (*job_fn)(void *private_data), + void *job_private_data, + void *private_data); + +int pthreadpool_pipe_init(unsigned max_threads, + struct pthreadpool_pipe **presult) +{ + struct pthreadpool_pipe *pool; + int ret; + + pool = calloc(1, sizeof(struct pthreadpool_pipe)); + if (pool == NULL) { + return ENOMEM; + } + pool->pid = getpid(); + + ret = pipe(pool->pipe_fds); + if (ret == -1) { + int err = errno; + free(pool); + return err; + } + + ret = pthreadpool_init(max_threads, &pool->pool, + pthreadpool_pipe_signal, pool); + if (ret != 0) { + close(pool->pipe_fds[0]); + close(pool->pipe_fds[1]); + free(pool); + return ret; + } + + *presult = pool; + return 0; +} + +static int pthreadpool_pipe_signal(int jobid, + void (*job_fn)(void *private_data), + void *job_private_data, + void *private_data) +{ + struct pthreadpool_pipe *pool = private_data; + ssize_t written; + + do { + written = write(pool->pipe_fds[1], &jobid, sizeof(jobid)); + } while ((written == -1) && (errno == EINTR)); + + if (written != sizeof(jobid)) { + return errno; + } + + return 0; +} + +int pthreadpool_pipe_destroy(struct pthreadpool_pipe *pool) +{ + int ret; + + if (pool->num_jobs != 0) { + return EBUSY; + } + + ret = pthreadpool_destroy(pool->pool); + if (ret != 0) { + return ret; + } + + close(pool->pipe_fds[0]); + pool->pipe_fds[0] = -1; + + close(pool->pipe_fds[1]); + pool->pipe_fds[1] = -1; + + free(pool); + return 0; +} + +static int pthreadpool_pipe_reinit(struct pthreadpool_pipe *pool) +{ + pid_t pid = getpid(); + int signal_fd; + int ret; + + if (pid == pool->pid) { + return 0; + } + + signal_fd = pool->pipe_fds[0]; + + close(pool->pipe_fds[0]); + pool->pipe_fds[0] = -1; + + close(pool->pipe_fds[1]); + pool->pipe_fds[1] = -1; + + ret = pipe(pool->pipe_fds); + if (ret != 0) { + return errno; + } + + ret = dup2(pool->pipe_fds[0], signal_fd); + if (ret != 0) { + return errno; + } + + pool->pipe_fds[0] = signal_fd; + pool->num_jobs = 0; + + return 0; +} + +int pthreadpool_pipe_add_job(struct pthreadpool_pipe *pool, int job_id, + void (*fn)(void *private_data), + void *private_data) +{ + int ret; + + ret = pthreadpool_pipe_reinit(pool); + if (ret != 0) { + return ret; + } + + ret = pthreadpool_add_job(pool->pool, job_id, fn, private_data); + if (ret != 0) { + return ret; + } + + pool->num_jobs += 1; + + return 0; +} + +int pthreadpool_pipe_signal_fd(struct pthreadpool_pipe *pool) +{ + return pool->pipe_fds[0]; +} + +int pthreadpool_pipe_finished_jobs(struct pthreadpool_pipe *pool, int *jobids, + unsigned num_jobids) +{ + ssize_t to_read, nread, num_jobs; + pid_t pid = getpid(); + + if (pool->pid != pid) { + return EINVAL; + } + + to_read = sizeof(int) * num_jobids; + + do { + nread = read(pool->pipe_fds[0], jobids, to_read); + } while ((nread == -1) && (errno == EINTR)); + + if (nread == -1) { + return -errno; + } + if ((nread % sizeof(int)) != 0) { + return -EINVAL; + } + + num_jobs = nread / sizeof(int); + + if (num_jobs > pool->num_jobs) { + return -EINVAL; + } + pool->num_jobs -= num_jobs; + + return num_jobs; +} diff --git a/lib/pthreadpool/pthreadpool_pipe.h b/lib/pthreadpool/pthreadpool_pipe.h new file mode 100644 index 0000000..77516f7 --- /dev/null +++ b/lib/pthreadpool/pthreadpool_pipe.h @@ -0,0 +1,39 @@ +/* + * Unix SMB/CIFS implementation. + * threadpool implementation based on pthreads + * Copyright (C) Volker Lendecke 2009,2011 + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef __PTHREADPOOL_PIPE_H__ +#define __PTHREADPOOL_PIPE_H__ + +struct pthreadpool_pipe; + +int pthreadpool_pipe_init(unsigned max_threads, + struct pthreadpool_pipe **presult); + +int pthreadpool_pipe_destroy(struct pthreadpool_pipe *pool); + +int pthreadpool_pipe_add_job(struct pthreadpool_pipe *pool, int job_id, + void (*fn)(void *private_data), + void *private_data); + +int pthreadpool_pipe_signal_fd(struct pthreadpool_pipe *pool); + +int pthreadpool_pipe_finished_jobs(struct pthreadpool_pipe *pool, int *jobids, + unsigned num_jobids); + +#endif diff --git a/lib/pthreadpool/pthreadpool_sync.c b/lib/pthreadpool/pthreadpool_sync.c new file mode 100644 index 0000000..48e6a0d --- /dev/null +++ b/lib/pthreadpool/pthreadpool_sync.c @@ -0,0 +1,97 @@ +/* + * Unix SMB/CIFS implementation. + * sync dummy implementation of the pthreadpool API + * Copyright (C) Volker Lendecke 2009 + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + + +#include "replace.h" +#include "pthreadpool.h" + +struct pthreadpool { + bool stopped; + + /* + * Indicate job completion + */ + int (*signal_fn)(int jobid, + void (*job_fn)(void *private_data), + void *job_fn_private_data, + void *private_data); + void *signal_fn_private_data; +}; + +int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult, + int (*signal_fn)(int jobid, + void (*job_fn)(void *private_data), + void *job_fn_private_data, + void *private_data), + void *signal_fn_private_data) +{ + struct pthreadpool *pool; + + pool = (struct pthreadpool *)calloc(1, sizeof(struct pthreadpool)); + if (pool == NULL) { + return ENOMEM; + } + pool->stopped = false; + pool->signal_fn = signal_fn; + pool->signal_fn_private_data = signal_fn_private_data; + + *presult = pool; + return 0; +} + +size_t pthreadpool_max_threads(struct pthreadpool *pool) +{ + return 0; +} + +size_t pthreadpool_queued_jobs(struct pthreadpool *pool) +{ + return 0; +} + +int pthreadpool_add_job(struct pthreadpool *pool, int job_id, + void (*fn)(void *private_data), void *private_data) +{ + if (pool->stopped) { + return EINVAL; + } + + fn(private_data); + + return pool->signal_fn(job_id, fn, private_data, + pool->signal_fn_private_data); +} + +size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id, + void (*fn)(void *private_data), void *private_data) +{ + return 0; +} + +int pthreadpool_stop(struct pthreadpool *pool) +{ + pool->stopped = true; + return 0; +} + +int pthreadpool_destroy(struct pthreadpool *pool) +{ + free(pool); + return 0; +} diff --git a/lib/pthreadpool/pthreadpool_tevent.c b/lib/pthreadpool/pthreadpool_tevent.c new file mode 100644 index 0000000..389bb06 --- /dev/null +++ b/lib/pthreadpool/pthreadpool_tevent.c @@ -0,0 +1,428 @@ +/* + * Unix SMB/CIFS implementation. + * threadpool implementation based on pthreads + * Copyright (C) Volker Lendecke 2009,2011 + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "replace.h" +#include "system/filesys.h" +#include "pthreadpool_tevent.h" +#include "pthreadpool.h" +#include "lib/util/tevent_unix.h" +#include "lib/util/dlinklist.h" + +struct pthreadpool_tevent_job_state; + +/* + * We need one pthreadpool_tevent_glue object per unique combintaion of tevent + * contexts and pthreadpool_tevent objects. Maintain a list of used tevent + * contexts in a pthreadpool_tevent. + */ +struct pthreadpool_tevent_glue { + struct pthreadpool_tevent_glue *prev, *next; + struct pthreadpool_tevent *pool; /* back-pointer to owning object. */ + /* Tuple we are keeping track of in this list. */ + struct tevent_context *ev; + struct tevent_threaded_context *tctx; + /* Pointer to link object owned by *ev. */ + struct pthreadpool_tevent_glue_ev_link *ev_link; +}; + +/* + * The pthreadpool_tevent_glue_ev_link and its destructor ensure we remove the + * tevent context from our list of active event contexts if the event context + * is destroyed. + * This structure is talloc()'ed from the struct tevent_context *, and is a + * back-pointer allowing the related struct pthreadpool_tevent_glue object + * to be removed from the struct pthreadpool_tevent glue list if the owning + * tevent_context is talloc_free()'ed. + */ +struct pthreadpool_tevent_glue_ev_link { + struct pthreadpool_tevent_glue *glue; +}; + +struct pthreadpool_tevent { + struct pthreadpool *pool; + struct pthreadpool_tevent_glue *glue_list; + + struct pthreadpool_tevent_job_state *jobs; +}; + +struct pthreadpool_tevent_job_state { + struct pthreadpool_tevent_job_state *prev, *next; + struct pthreadpool_tevent *pool; + struct tevent_context *ev; + struct tevent_immediate *im; + struct tevent_req *req; + + void (*fn)(void *private_data); + void *private_data; +}; + +static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool); + +static int pthreadpool_tevent_job_signal(int jobid, + void (*job_fn)(void *private_data), + void *job_private_data, + void *private_data); + +int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads, + struct pthreadpool_tevent **presult) +{ + struct pthreadpool_tevent *pool; + int ret; + + pool = talloc_zero(mem_ctx, struct pthreadpool_tevent); + if (pool == NULL) { + return ENOMEM; + } + + ret = pthreadpool_init(max_threads, &pool->pool, + pthreadpool_tevent_job_signal, pool); + if (ret != 0) { + TALLOC_FREE(pool); + return ret; + } + + talloc_set_destructor(pool, pthreadpool_tevent_destructor); + + *presult = pool; + return 0; +} + +size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool) +{ + if (pool->pool == NULL) { + return 0; + } + + return pthreadpool_max_threads(pool->pool); +} + +size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool) +{ + if (pool->pool == NULL) { + return 0; + } + + return pthreadpool_queued_jobs(pool->pool); +} + +static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool) +{ + struct pthreadpool_tevent_job_state *state, *next; + struct pthreadpool_tevent_glue *glue = NULL; + int ret; + + ret = pthreadpool_stop(pool->pool); + if (ret != 0) { + return ret; + } + + for (state = pool->jobs; state != NULL; state = next) { + next = state->next; + DLIST_REMOVE(pool->jobs, state); + state->pool = NULL; + } + + /* + * Delete all the registered + * tevent_context/tevent_threaded_context + * pairs. + */ + for (glue = pool->glue_list; glue != NULL; glue = pool->glue_list) { + /* The glue destructor removes it from the list */ + TALLOC_FREE(glue); + } + pool->glue_list = NULL; + + ret = pthreadpool_destroy(pool->pool); + if (ret != 0) { + return ret; + } + pool->pool = NULL; + + return 0; +} + +static int pthreadpool_tevent_glue_destructor( + struct pthreadpool_tevent_glue *glue) +{ + if (glue->pool->glue_list != NULL) { + DLIST_REMOVE(glue->pool->glue_list, glue); + } + + /* Ensure the ev_link destructor knows we're gone */ + glue->ev_link->glue = NULL; + + TALLOC_FREE(glue->ev_link); + TALLOC_FREE(glue->tctx); + + return 0; +} + +/* + * Destructor called either explicitly from + * pthreadpool_tevent_glue_destructor(), or indirectly + * when owning tevent_context is destroyed. + * + * When called from pthreadpool_tevent_glue_destructor() + * ev_link->glue is already NULL, so this does nothing. + * + * When called from talloc_free() of the owning + * tevent_context we must ensure we also remove the + * linked glue object from the list inside + * struct pthreadpool_tevent. + */ +static int pthreadpool_tevent_glue_link_destructor( + struct pthreadpool_tevent_glue_ev_link *ev_link) +{ + TALLOC_FREE(ev_link->glue); + return 0; +} + +static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool, + struct tevent_context *ev) +{ + struct pthreadpool_tevent_glue *glue = NULL; + struct pthreadpool_tevent_glue_ev_link *ev_link = NULL; + + /* + * See if this tevent_context was already registered by + * searching the glue object list. If so we have nothing + * to do here - we already have a tevent_context/tevent_threaded_context + * pair. + */ + for (glue = pool->glue_list; glue != NULL; glue = glue->next) { + if (glue->ev == ev) { + return 0; + } + } + + /* + * Event context not yet registered - create a new glue + * object containing a tevent_context/tevent_threaded_context + * pair and put it on the list to remember this registration. + * We also need a link object to ensure the event context + * can't go away without us knowing about it. + */ + glue = talloc_zero(pool, struct pthreadpool_tevent_glue); + if (glue == NULL) { + return ENOMEM; + } + *glue = (struct pthreadpool_tevent_glue) { + .pool = pool, + .ev = ev, + }; + talloc_set_destructor(glue, pthreadpool_tevent_glue_destructor); + + /* + * Now allocate the link object to the event context. Note this + * is allocated OFF THE EVENT CONTEXT ITSELF, so if the event + * context is freed we are able to cleanup the glue object + * in the link object destructor. + */ + + ev_link = talloc_zero(ev, struct pthreadpool_tevent_glue_ev_link); + if (ev_link == NULL) { + TALLOC_FREE(glue); + return ENOMEM; + } + ev_link->glue = glue; + talloc_set_destructor(ev_link, pthreadpool_tevent_glue_link_destructor); + + glue->ev_link = ev_link; + +#ifdef HAVE_PTHREAD + glue->tctx = tevent_threaded_context_create(glue, ev); + if (glue->tctx == NULL) { + TALLOC_FREE(ev_link); + TALLOC_FREE(glue); + return ENOMEM; + } +#endif + + DLIST_ADD(pool->glue_list, glue); + return 0; +} + +static void pthreadpool_tevent_job_fn(void *private_data); +static void pthreadpool_tevent_job_done(struct tevent_context *ctx, + struct tevent_immediate *im, + void *private_data); + +static int pthreadpool_tevent_job_state_destructor( + struct pthreadpool_tevent_job_state *state) +{ + if (state->pool == NULL) { + return 0; + } + + /* + * We should never be called with state->req == NULL, + * state->pool must be cleared before the 2nd talloc_free(). + */ + if (state->req == NULL) { + abort(); + } + + /* + * We need to reparent to a long term context. + */ + (void)talloc_reparent(state->req, NULL, state); + state->req = NULL; + return -1; +} + +struct tevent_req *pthreadpool_tevent_job_send( + TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct pthreadpool_tevent *pool, + void (*fn)(void *private_data), void *private_data) +{ + struct tevent_req *req; + struct pthreadpool_tevent_job_state *state; + int ret; + + req = tevent_req_create(mem_ctx, &state, + struct pthreadpool_tevent_job_state); + if (req == NULL) { + return NULL; + } + state->pool = pool; + state->ev = ev; + state->req = req; + state->fn = fn; + state->private_data = private_data; + + if (pool == NULL) { + tevent_req_error(req, EINVAL); + return tevent_req_post(req, ev); + } + if (pool->pool == NULL) { + tevent_req_error(req, EINVAL); + return tevent_req_post(req, ev); + } + + state->im = tevent_create_immediate(state); + if (tevent_req_nomem(state->im, req)) { + return tevent_req_post(req, ev); + } + + ret = pthreadpool_tevent_register_ev(pool, ev); + if (tevent_req_error(req, ret)) { + return tevent_req_post(req, ev); + } + + ret = pthreadpool_add_job(pool->pool, 0, + pthreadpool_tevent_job_fn, + state); + if (tevent_req_error(req, ret)) { + return tevent_req_post(req, ev); + } + + /* + * Once the job is scheduled, we need to protect + * our memory. + */ + talloc_set_destructor(state, pthreadpool_tevent_job_state_destructor); + + DLIST_ADD_END(pool->jobs, state); + + return req; +} + +static void pthreadpool_tevent_job_fn(void *private_data) +{ + struct pthreadpool_tevent_job_state *state = talloc_get_type_abort( + private_data, struct pthreadpool_tevent_job_state); + state->fn(state->private_data); +} + +static int pthreadpool_tevent_job_signal(int jobid, + void (*job_fn)(void *private_data), + void *job_private_data, + void *private_data) +{ + struct pthreadpool_tevent_job_state *state = talloc_get_type_abort( + job_private_data, struct pthreadpool_tevent_job_state); + struct tevent_threaded_context *tctx = NULL; + struct pthreadpool_tevent_glue *g = NULL; + + if (state->pool == NULL) { + /* The pthreadpool_tevent is already gone */ + return 0; + } + +#ifdef HAVE_PTHREAD + for (g = state->pool->glue_list; g != NULL; g = g->next) { + if (g->ev == state->ev) { + tctx = g->tctx; + break; + } + } + + if (tctx == NULL) { + abort(); + } +#endif + + if (tctx != NULL) { + /* with HAVE_PTHREAD */ + tevent_threaded_schedule_immediate(tctx, state->im, + pthreadpool_tevent_job_done, + state); + } else { + /* without HAVE_PTHREAD */ + tevent_schedule_immediate(state->im, state->ev, + pthreadpool_tevent_job_done, + state); + } + + return 0; +} + +static void pthreadpool_tevent_job_done(struct tevent_context *ctx, + struct tevent_immediate *im, + void *private_data) +{ + struct pthreadpool_tevent_job_state *state = talloc_get_type_abort( + private_data, struct pthreadpool_tevent_job_state); + + if (state->pool != NULL) { + DLIST_REMOVE(state->pool->jobs, state); + state->pool = NULL; + } + + if (state->req == NULL) { + /* + * There was a talloc_free() state->req + * while the job was pending, + * which mean we're reparented on a longterm + * talloc context. + * + * We just cleanup here... + */ + talloc_free(state); + return; + } + + tevent_req_done(state->req); +} + +int pthreadpool_tevent_job_recv(struct tevent_req *req) +{ + return tevent_req_simple_recv_unix(req); +} diff --git a/lib/pthreadpool/pthreadpool_tevent.h b/lib/pthreadpool/pthreadpool_tevent.h new file mode 100644 index 0000000..10d3a71 --- /dev/null +++ b/lib/pthreadpool/pthreadpool_tevent.h @@ -0,0 +1,40 @@ +/* + * Unix SMB/CIFS implementation. + * threadpool implementation based on pthreads + * Copyright (C) Volker Lendecke 2016 + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef __PTHREADPOOL_TEVENT_H__ +#define __PTHREADPOOL_TEVENT_H__ + +#include <tevent.h> + +struct pthreadpool_tevent; + +int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads, + struct pthreadpool_tevent **presult); + +size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool); +size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool); + +struct tevent_req *pthreadpool_tevent_job_send( + TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct pthreadpool_tevent *pool, + void (*fn)(void *private_data), void *private_data); + +int pthreadpool_tevent_job_recv(struct tevent_req *req); + +#endif diff --git a/lib/pthreadpool/tests.c b/lib/pthreadpool/tests.c new file mode 100644 index 0000000..08cb59e --- /dev/null +++ b/lib/pthreadpool/tests.c @@ -0,0 +1,517 @@ +#include <stdio.h> +#include <string.h> +#include <poll.h> +#include <errno.h> +#include <stdlib.h> +#include <limits.h> +#include <pthread.h> +#include <unistd.h> +#include <sys/types.h> +#include <sys/wait.h> +#include <signal.h> +#include "pthreadpool_pipe.h" +#include "pthreadpool_tevent.h" + +static int test_init(void) +{ + struct pthreadpool_pipe *p; + int ret; + + ret = pthreadpool_pipe_init(1, &p); + if (ret != 0) { + fprintf(stderr, "pthreadpool_pipe_init failed: %s\n", + strerror(ret)); + return -1; + } + ret = pthreadpool_pipe_destroy(p); + if (ret != 0) { + fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n", + strerror(ret)); + return -1; + } + return 0; +} + +static void test_sleep(void *ptr) +{ + int *ptimeout = (int *)ptr; + int ret; + ret = poll(NULL, 0, *ptimeout); + if (ret != 0) { + fprintf(stderr, "poll returned %d (%s)\n", + ret, strerror(errno)); + } +} + +static int test_jobs(int num_threads, int num_jobs) +{ + char *finished; + struct pthreadpool_pipe *p; + int timeout = 1; + int i, ret; + + finished = (char *)calloc(1, num_jobs); + if (finished == NULL) { + fprintf(stderr, "calloc failed\n"); + return -1; + } + + ret = pthreadpool_pipe_init(num_threads, &p); + if (ret != 0) { + fprintf(stderr, "pthreadpool_pipe_init failed: %s\n", + strerror(ret)); + free(finished); + return -1; + } + + for (i=0; i<num_jobs; i++) { + ret = pthreadpool_pipe_add_job(p, i, test_sleep, &timeout); + if (ret != 0) { + fprintf(stderr, "pthreadpool_pipe_add_job failed: " + "%s\n", strerror(ret)); + free(finished); + return -1; + } + } + + for (i=0; i<num_jobs; i++) { + int jobid = -1; + ret = pthreadpool_pipe_finished_jobs(p, &jobid, 1); + if (ret < 0) { + fprintf(stderr, "pthreadpool_pipe_finished_jobs " + "failed: %s\n", strerror(-ret)); + free(finished); + return -1; + } + if ((ret != 1) || (jobid >= num_jobs)) { + fprintf(stderr, "invalid job number %d\n", jobid); + free(finished); + return -1; + } + finished[jobid] += 1; + } + + for (i=0; i<num_jobs; i++) { + if (finished[i] != 1) { + fprintf(stderr, "finished[%d] = %d\n", + i, finished[i]); + free(finished); + return -1; + } + } + + ret = pthreadpool_pipe_destroy(p); + if (ret != 0) { + fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n", + strerror(ret)); + free(finished); + return -1; + } + + free(finished); + return 0; +} + +static int test_busydestroy(void) +{ + struct pthreadpool_pipe *p; + int timeout = 50; + struct pollfd pfd; + int ret, jobid; + + ret = pthreadpool_pipe_init(1, &p); + if (ret != 0) { + fprintf(stderr, "pthreadpool_pipe_init failed: %s\n", + strerror(ret)); + return -1; + } + ret = pthreadpool_pipe_add_job(p, 1, test_sleep, &timeout); + if (ret != 0) { + fprintf(stderr, "pthreadpool_pipe_add_job failed: %s\n", + strerror(ret)); + return -1; + } + ret = pthreadpool_pipe_destroy(p); + if (ret != EBUSY) { + fprintf(stderr, "Could destroy a busy pool\n"); + return -1; + } + + pfd.fd = pthreadpool_pipe_signal_fd(p); + pfd.events = POLLIN|POLLERR; + + do { + ret = poll(&pfd, 1, -1); + } while ((ret == -1) && (errno == EINTR)); + + ret = pthreadpool_pipe_finished_jobs(p, &jobid, 1); + if (ret < 0) { + fprintf(stderr, "pthreadpool_pipe_finished_jobs failed: %s\n", + strerror(-ret)); + return -1; + } + + ret = pthreadpool_pipe_destroy(p); + if (ret != 0) { + fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n", + strerror(ret)); + return -1; + } + return 0; +} + +static int test_fork(void) +{ + struct pthreadpool_pipe *p; + pid_t child, waited; + int status, ret; + + ret = pthreadpool_pipe_init(1, &p); + if (ret != 0) { + fprintf(stderr, "pthreadpool_pipe_init failed: %s\n", + strerror(ret)); + return -1; + } + ret = pthreadpool_pipe_destroy(p); + if (ret != 0) { + fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n", + strerror(ret)); + return -1; + } + + child = fork(); + if (child < 0) { + perror("fork failed"); + return -1; + } + if (child == 0) { + exit(0); + } + waited = wait(&status); + if (waited == -1) { + perror("wait failed"); + return -1; + } + if (waited != child) { + fprintf(stderr, "expected child %d, got %d\n", + (int)child, (int)waited); + return -1; + } + return 0; +} + +static void busyfork_job(void *private_data) +{ + return; +} + +static int test_busyfork(void) +{ + struct pthreadpool_pipe *p; + int fds[2]; + struct pollfd pfd; + pid_t child, waitret; + int ret, jobnum, wstatus; + + ret = pipe(fds); + if (ret == -1) { + perror("pipe failed"); + return -1; + } + + ret = pthreadpool_pipe_init(1, &p); + if (ret != 0) { + fprintf(stderr, "pthreadpool_pipe_init failed: %s\n", + strerror(ret)); + return -1; + } + + ret = pthreadpool_pipe_add_job(p, 1, busyfork_job, NULL); + if (ret != 0) { + fprintf(stderr, "pthreadpool_add_job failed: %s\n", + strerror(ret)); + return -1; + } + + ret = pthreadpool_pipe_finished_jobs(p, &jobnum, 1); + if (ret != 1) { + fprintf(stderr, "pthreadpool_pipe_finished_jobs failed\n"); + return -1; + } + + ret = poll(NULL, 0, 200); + if (ret == -1) { + perror("poll failed"); + return -1; + } + + child = fork(); + if (child < 0) { + perror("fork failed"); + return -1; + } + + if (child == 0) { + ret = pthreadpool_pipe_destroy(p); + if (ret != 0) { + fprintf(stderr, "pthreadpool_pipe_destroy failed: " + "%s\n", strerror(ret)); + exit(1); + } + exit(0); + } + + ret = close(fds[1]); + if (ret == -1) { + perror("close failed"); + return -1; + } + + pfd = (struct pollfd) { .fd = fds[0], .events = POLLIN }; + + ret = poll(&pfd, 1, 5000); + if (ret == -1) { + perror("poll failed"); + return -1; + } + if (ret == 0) { + fprintf(stderr, "Child did not exit for 5 seconds\n"); + /* + * The child might hang forever in + * pthread_cond_destroy for example. Be kind to the + * system and kill it. + */ + kill(child, SIGTERM); + return -1; + } + if (ret != 1) { + fprintf(stderr, "poll returned %d -- huh??\n", ret); + return -1; + } + + ret = poll(NULL, 0, 200); + if (ret == -1) { + perror("poll failed"); + return -1; + } + + waitret = waitpid(child, &wstatus, WNOHANG); + if (waitret != child) { + fprintf(stderr, "waitpid returned %d\n", (int)waitret); + return -1; + } + + if (!WIFEXITED(wstatus)) { + fprintf(stderr, "child did not properly exit\n"); + return -1; + } + + ret = WEXITSTATUS(wstatus); + if (ret != 0) { + fprintf(stderr, "child returned %d\n", ret); + return -1; + } + + return 0; +} + +static int test_busyfork2(void) +{ + struct pthreadpool_pipe *p; + pid_t child; + int ret, jobnum; + struct pollfd pfd; + + ret = pthreadpool_pipe_init(1, &p); + if (ret != 0) { + fprintf(stderr, "pthreadpool_pipe_init failed: %s\n", + strerror(ret)); + return -1; + } + + ret = pthreadpool_pipe_add_job(p, 1, busyfork_job, NULL); + if (ret != 0) { + fprintf(stderr, "pthreadpool_add_job failed: %s\n", + strerror(ret)); + return -1; + } + + ret = pthreadpool_pipe_finished_jobs(p, &jobnum, 1); + if (ret != 1) { + fprintf(stderr, "pthreadpool_pipe_finished_jobs failed\n"); + return -1; + } + + ret = poll(NULL, 0, 10); + if (ret == -1) { + perror("poll failed"); + return -1; + } + + ret = pthreadpool_pipe_add_job(p, 1, busyfork_job, NULL); + if (ret != 0) { + fprintf(stderr, "pthreadpool_add_job failed: %s\n", + strerror(ret)); + return -1; + } + + /* + * Do the fork right after the add_job. This tests a race + * where the atfork prepare handler gets all idle threads off + * the condvar. If we are faster doing the fork than the + * existing idle thread could get out of idle and take the + * job, after the fork we end up with no threads to take care + * of the job. + */ + + child = fork(); + if (child < 0) { + perror("fork failed"); + return -1; + } + + if (child == 0) { + exit(0); + } + + pfd = (struct pollfd) { + .fd = pthreadpool_pipe_signal_fd(p), + .events = POLLIN|POLLERR + }; + + do { + ret = poll(&pfd, 1, 5000); + } while ((ret == -1) && (errno == EINTR)); + + if (ret == 0) { + fprintf(stderr, "job unfinished after 5 seconds\n"); + return -1; + } + + return 0; +} + +static void test_tevent_wait(void *private_data) +{ + int *timeout = private_data; + poll(NULL, 0, *timeout); +} + +static int test_tevent_1(void) +{ + struct tevent_context *ev; + struct pthreadpool_tevent *pool; + struct tevent_req *req1, *req2; + int timeout10 = 10; + int timeout100 = 100; + int ret; + bool ok; + + ev = tevent_context_init(NULL); + if (ev == NULL) { + ret = errno; + fprintf(stderr, "tevent_context_init failed: %s\n", + strerror(ret)); + return ret; + } + ret = pthreadpool_tevent_init(ev, UINT_MAX, &pool); + if (ret != 0) { + fprintf(stderr, "pthreadpool_tevent_init failed: %s\n", + strerror(ret)); + TALLOC_FREE(ev); + return ret; + } + req1 = pthreadpool_tevent_job_send( + ev, ev, pool, test_tevent_wait, &timeout10); + if (req1 == NULL) { + fprintf(stderr, "pthreadpool_tevent_job_send failed\n"); + TALLOC_FREE(ev); + return ENOMEM; + } + req2 = pthreadpool_tevent_job_send( + ev, ev, pool, test_tevent_wait, &timeout100); + if (req2 == NULL) { + fprintf(stderr, "pthreadpool_tevent_job_send failed\n"); + TALLOC_FREE(ev); + return ENOMEM; + } + ok = tevent_req_poll(req2, ev); + if (!ok) { + ret = errno; + fprintf(stderr, "tevent_req_poll failed: %s\n", + strerror(ret)); + TALLOC_FREE(ev); + return ret; + } + ret = pthreadpool_tevent_job_recv(req1); + TALLOC_FREE(req1); + if (ret != 0) { + fprintf(stderr, "tevent_req_poll failed: %s\n", + strerror(ret)); + TALLOC_FREE(ev); + return ret; + } + + TALLOC_FREE(req2); + + ret = tevent_loop_wait(ev); + if (ret != 0) { + fprintf(stderr, "tevent_loop_wait failed\n"); + return ret; + } + + TALLOC_FREE(pool); + TALLOC_FREE(ev); + return 0; +} + +int main(void) +{ + int ret; + + ret = test_tevent_1(); + if (ret != 0) { + fprintf(stderr, "test_event_1 failed: %s\n", + strerror(ret)); + return 1; + } + + ret = test_init(); + if (ret != 0) { + fprintf(stderr, "test_init failed\n"); + return 1; + } + + ret = test_fork(); + if (ret != 0) { + fprintf(stderr, "test_fork failed\n"); + return 1; + } + + ret = test_jobs(10, 10000); + if (ret != 0) { + fprintf(stderr, "test_jobs failed\n"); + return 1; + } + + ret = test_busydestroy(); + if (ret != 0) { + fprintf(stderr, "test_busydestroy failed\n"); + return 1; + } + + ret = test_busyfork(); + if (ret != 0) { + fprintf(stderr, "test_busyfork failed\n"); + return 1; + } + + ret = test_busyfork2(); + if (ret != 0) { + fprintf(stderr, "test_busyfork2 failed\n"); + return 1; + } + + printf("success\n"); + return 0; +} diff --git a/lib/pthreadpool/tests_cmocka.c b/lib/pthreadpool/tests_cmocka.c new file mode 100644 index 0000000..e6af884 --- /dev/null +++ b/lib/pthreadpool/tests_cmocka.c @@ -0,0 +1,247 @@ +/* + * Unix SMB/CIFS implementation. + * cmocka tests for thread pool implementation + * Copyright (C) Christof Schmitt 2017 + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include <errno.h> +#include <pthread.h> +#include <setjmp.h> +#include <stdlib.h> +#include <string.h> +#include <limits.h> + +#include <talloc.h> +#include <tevent.h> +#include <pthreadpool_tevent.h> + +#include <cmocka.h> +#include <poll.h> + +struct pthreadpool_tevent_test { + struct tevent_context *ev; + struct pthreadpool_tevent *upool; + struct pthreadpool_tevent *spool; + struct pthreadpool_tevent *opool; +}; + +static int setup_pthreadpool_tevent(void **state) +{ + struct pthreadpool_tevent_test *t; + int ret; + size_t max_threads; + + t = talloc_zero(NULL, struct pthreadpool_tevent_test); + assert_non_null(t); + + t->ev = tevent_context_init(t); + assert_non_null(t->ev); + + ret = pthreadpool_tevent_init(t->ev, UINT_MAX, &t->upool); + assert_int_equal(ret, 0); + + max_threads = pthreadpool_tevent_max_threads(t->upool); + assert_int_equal(max_threads, UINT_MAX); + + ret = pthreadpool_tevent_init(t->ev, 1, &t->opool); + assert_int_equal(ret, 0); + + max_threads = pthreadpool_tevent_max_threads(t->opool); + assert_int_equal(max_threads, 1); + + ret = pthreadpool_tevent_init(t->ev, 0, &t->spool); + assert_int_equal(ret, 0); + + max_threads = pthreadpool_tevent_max_threads(t->spool); + assert_int_equal(max_threads, 0); + + *state = t; + + return 0; +} + +static int teardown_pthreadpool_tevent(void **state) +{ + struct pthreadpool_tevent_test *t = *state; + + TALLOC_FREE(t); + + return 0; +} + +int __wrap_pthread_create(pthread_t *thread, const pthread_attr_t *attr, + void *(*start_routine) (void *), void *arg); +int __real_pthread_create(pthread_t *thread, const pthread_attr_t *attr, + void *(*start_routine) (void *), void *arg); + +int __wrap_pthread_create(pthread_t *thread, const pthread_attr_t *attr, + void *(*start_routine) (void *), void *arg) +{ + int error; + + error = mock_type(int); + if (error != 0) { + return error; + } + + return __real_pthread_create(thread, attr, start_routine, arg); +} + +static void test_job_threadid(void *ptr) +{ + pthread_t *threadid = ptr; + + *threadid = pthread_self(); +} + +static int test_create_do(struct tevent_context *ev, + struct pthreadpool_tevent *pool, + bool *executed, + bool *in_main_thread) +{ + struct tevent_req *req; + pthread_t zero_thread; + pthread_t main_thread; + pthread_t worker_thread; + bool ok; + int ret; + + *executed = false; + *in_main_thread = false; + + memset(&zero_thread, 0, sizeof(zero_thread)); + main_thread = pthread_self(); + worker_thread = zero_thread; + + req = pthreadpool_tevent_job_send( + ev, ev, pool, test_job_threadid, &worker_thread); + if (req == NULL) { + fprintf(stderr, "pthreadpool_tevent_job_send failed\n"); + return ENOMEM; + } + + ok = tevent_req_poll(req, ev); + if (!ok) { + ret = errno; + fprintf(stderr, "tevent_req_poll failed: %s\n", + strerror(ret)); + *executed = !pthread_equal(worker_thread, zero_thread); + *in_main_thread = pthread_equal(worker_thread, main_thread); + return ret; + } + + + ret = pthreadpool_tevent_job_recv(req); + TALLOC_FREE(req); + *executed = !pthread_equal(worker_thread, zero_thread); + *in_main_thread = pthread_equal(worker_thread, main_thread); + if (ret != 0) { + fprintf(stderr, "tevent_req_recv failed: %s\n", + strerror(ret)); + return ret; + } + + return 0; +} + +static void test_create(void **state) +{ + struct pthreadpool_tevent_test *t = *state; + bool executed; + bool in_main_thread; + int ret; + + /* + * When pthreadpool cannot create the first worker thread, + * this job will run in the sync fallback in the main thread. + */ + will_return(__wrap_pthread_create, EAGAIN); + ret = test_create_do(t->ev, t->upool, &executed, &in_main_thread); + assert_int_equal(ret, EAGAIN); + assert_false(executed); + assert_false(in_main_thread); + + /* + * The sync pool won't trigger pthread_create() + * It will be triggered by the one pool. + */ + will_return(__wrap_pthread_create, EAGAIN); + + ret = test_create_do(t->ev, t->spool, &executed, &in_main_thread); + assert_int_equal(ret, 0); + assert_true(executed); + assert_true(in_main_thread); + + ret = test_create_do(t->ev, t->opool, &executed, &in_main_thread); + assert_int_equal(ret, EAGAIN); + assert_false(executed); + assert_false(in_main_thread); + + /* + * When a thread can be created, the job will run in the worker thread. + */ + will_return(__wrap_pthread_create, 0); + ret = test_create_do(t->ev, t->upool, &executed, &in_main_thread); + assert_int_equal(ret, 0); + assert_true(executed); + assert_false(in_main_thread); + + poll(NULL, 0, 10); + + /* + * Workerthread will still be active for a second; immediately + * running another job will also use the worker thread, even + * if a new thread cannot be created. + */ + ret = test_create_do(t->ev, t->upool, &executed, &in_main_thread); + assert_int_equal(ret, 0); + assert_true(executed); + assert_false(in_main_thread); + + /* + * When a thread can be created, the job will run in the worker thread. + */ + will_return(__wrap_pthread_create, 0); + ret = test_create_do(t->ev, t->opool, &executed, &in_main_thread); + assert_int_equal(ret, 0); + assert_true(executed); + assert_false(in_main_thread); + + poll(NULL, 0, 10); + + /* + * Workerthread will still be active for a second; immediately + * running another job will also use the worker thread, even + * if a new thread cannot be created. + */ + ret = test_create_do(t->ev, t->opool, &executed, &in_main_thread); + assert_int_equal(ret, 0); + assert_true(executed); + assert_false(in_main_thread); +} + +int main(int argc, char **argv) +{ + const struct CMUnitTest tests[] = { + cmocka_unit_test_setup_teardown(test_create, + setup_pthreadpool_tevent, + teardown_pthreadpool_tevent), + }; + + cmocka_set_message_output(CM_OUTPUT_SUBUNIT); + + return cmocka_run_group_tests(tests, NULL, NULL); +} diff --git a/lib/pthreadpool/wscript_build b/lib/pthreadpool/wscript_build new file mode 100644 index 0000000..e270f90 --- /dev/null +++ b/lib/pthreadpool/wscript_build @@ -0,0 +1,35 @@ +#!/usr/bin/env python + +if bld.env.WITH_PTHREADPOOL: + extra_libs='' + + # Link to librt if needed for clock_gettime() + if bld.CONFIG_SET('HAVE_LIBRT'): extra_libs += ' rt' + + bld.SAMBA_SUBSYSTEM('PTHREADPOOL', + source='''pthreadpool.c + pthreadpool_pipe.c + pthreadpool_tevent.c + ''', + deps='pthread replace tevent-util' + extra_libs) +else: + bld.SAMBA_SUBSYSTEM('PTHREADPOOL', + source='''pthreadpool_sync.c + pthreadpool_pipe.c + pthreadpool_tevent.c + ''', + deps='replace tevent-util') + + +bld.SAMBA_BINARY('pthreadpooltest', + source='tests.c', + deps='PTHREADPOOL', + enabled=bld.env.WITH_PTHREADPOOL, + for_selftest=True) + +bld.SAMBA_BINARY('pthreadpooltest_cmocka', + source='tests_cmocka.c', + deps='PTHREADPOOL cmocka', + ldflags='-Wl,--wrap=pthread_create', + enabled=bld.env.WITH_PTHREADPOOL and bld.env['HAVE_LDWRAP'], + for_selftest=True) |