diff options
Diffstat (limited to 'lib/tevent/tevent_threads.c')
-rw-r--r-- | lib/tevent/tevent_threads.c | 601 |
1 files changed, 601 insertions, 0 deletions
diff --git a/lib/tevent/tevent_threads.c b/lib/tevent/tevent_threads.c new file mode 100644 index 0000000..a89990f --- /dev/null +++ b/lib/tevent/tevent_threads.c @@ -0,0 +1,601 @@ +/* + tevent event library. + + Copyright (C) Jeremy Allison 2015 + + ** NOTE! The following LGPL license applies to the tevent + ** library. This does NOT imply that all of Samba is released + ** under the LGPL + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 3 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "replace.h" +#include "system/filesys.h" +#include "talloc.h" +#include "tevent.h" +#include "tevent_internal.h" +#include "tevent_util.h" + +#ifdef HAVE_PTHREAD +#include "system/threads.h" + +struct tevent_immediate_list { + struct tevent_immediate_list *next, *prev; + tevent_immediate_handler_t handler; + struct tevent_immediate *im; + void *private_ptr; +}; + +struct tevent_thread_proxy { + pthread_mutex_t mutex; + struct tevent_context *dest_ev_ctx; + int read_fd; + int write_fd; + struct tevent_fd *pipe_read_fde; + /* Pending events list. */ + struct tevent_immediate_list *im_list; + /* Completed events list. */ + struct tevent_immediate_list *tofree_im_list; + struct tevent_immediate *free_im; +}; + +static void free_im_list(struct tevent_immediate_list **pp_list_head) +{ + struct tevent_immediate_list *im_entry = NULL; + struct tevent_immediate_list *im_next = NULL; + + for (im_entry = *pp_list_head; im_entry; im_entry = im_next) { + im_next = im_entry->next; + DLIST_REMOVE(*pp_list_head, im_entry); + TALLOC_FREE(im_entry); + } +} + +static void free_list_handler(struct tevent_context *ev, + struct tevent_immediate *im, + void *private_ptr) +{ + struct tevent_thread_proxy *tp = + talloc_get_type_abort(private_ptr, struct tevent_thread_proxy); + int ret; + + ret = pthread_mutex_lock(&tp->mutex); + if (ret != 0) { + abort(); + /* Notreached. */ + return; + } + + free_im_list(&tp->tofree_im_list); + + ret = pthread_mutex_unlock(&tp->mutex); + if (ret != 0) { + abort(); + /* Notreached. */ + return; + } +} + +static void schedule_immediate_functions(struct tevent_thread_proxy *tp) +{ + struct tevent_immediate_list *im_entry = NULL; + struct tevent_immediate_list *im_next = NULL; + + for (im_entry = tp->im_list; im_entry; im_entry = im_next) { + im_next = im_entry->next; + DLIST_REMOVE(tp->im_list, im_entry); + + tevent_schedule_immediate(im_entry->im, + tp->dest_ev_ctx, + im_entry->handler, + im_entry->private_ptr); + + /* Move from pending list to free list. */ + DLIST_ADD(tp->tofree_im_list, im_entry); + } + if (tp->tofree_im_list != NULL) { + /* + * Once the current immediate events + * are processed, we need to reschedule + * ourselves to free them. This works + * as tevent_schedule_immediate() + * always adds events to the *END* of + * the immediate events list. + */ + tevent_schedule_immediate(tp->free_im, + tp->dest_ev_ctx, + free_list_handler, + tp); + } +} + +static void pipe_read_handler(struct tevent_context *ev, + struct tevent_fd *fde, + uint16_t flags, + void *private_ptr) +{ + struct tevent_thread_proxy *tp = + talloc_get_type_abort(private_ptr, struct tevent_thread_proxy); + ssize_t len = 64; + int ret; + + ret = pthread_mutex_lock(&tp->mutex); + if (ret != 0) { + abort(); + /* Notreached. */ + return; + } + + /* + * Clear out all data in the pipe. We + * don't really care if this returns -1. + */ + while (len == 64) { + char buf[64]; + len = read(tp->read_fd, buf, 64); + }; + + schedule_immediate_functions(tp); + + ret = pthread_mutex_unlock(&tp->mutex); + if (ret != 0) { + abort(); + /* Notreached. */ + return; + } +} + +static int tevent_thread_proxy_destructor(struct tevent_thread_proxy *tp) +{ + int ret; + + ret = pthread_mutex_lock(&tp->mutex); + if (ret != 0) { + abort(); + /* Notreached. */ + return 0; + } + + TALLOC_FREE(tp->pipe_read_fde); + + if (tp->read_fd != -1) { + (void)close(tp->read_fd); + tp->read_fd = -1; + } + if (tp->write_fd != -1) { + (void)close(tp->write_fd); + tp->write_fd = -1; + } + + /* Hmmm. It's probably an error if we get here with + any non-NULL immediate entries.. */ + + free_im_list(&tp->im_list); + free_im_list(&tp->tofree_im_list); + + TALLOC_FREE(tp->free_im); + + ret = pthread_mutex_unlock(&tp->mutex); + if (ret != 0) { + abort(); + /* Notreached. */ + return 0; + } + + ret = pthread_mutex_destroy(&tp->mutex); + if (ret != 0) { + abort(); + /* Notreached. */ + return 0; + } + + return 0; +} + +/* + * Create a struct that can be passed to other threads + * to allow them to signal the struct tevent_context * + * passed in. + */ + +struct tevent_thread_proxy *tevent_thread_proxy_create( + struct tevent_context *dest_ev_ctx) +{ + int ret; + int pipefds[2]; + struct tevent_thread_proxy *tp; + + if (dest_ev_ctx->wrapper.glue != NULL) { + /* + * stacking of wrappers is not supported + */ + tevent_debug(dest_ev_ctx->wrapper.glue->main_ev, + TEVENT_DEBUG_FATAL, + "%s() not allowed on a wrapper context\n", + __func__); + errno = EINVAL; + return NULL; + } + + tp = talloc_zero(dest_ev_ctx, struct tevent_thread_proxy); + if (tp == NULL) { + return NULL; + } + + ret = pthread_mutex_init(&tp->mutex, NULL); + if (ret != 0) { + goto fail; + } + + tp->dest_ev_ctx = dest_ev_ctx; + tp->read_fd = -1; + tp->write_fd = -1; + + talloc_set_destructor(tp, tevent_thread_proxy_destructor); + + ret = pipe(pipefds); + if (ret == -1) { + goto fail; + } + + tp->read_fd = pipefds[0]; + tp->write_fd = pipefds[1]; + + ret = ev_set_blocking(pipefds[0], false); + if (ret != 0) { + goto fail; + } + ret = ev_set_blocking(pipefds[1], false); + if (ret != 0) { + goto fail; + } + if (!ev_set_close_on_exec(pipefds[0])) { + goto fail; + } + if (!ev_set_close_on_exec(pipefds[1])) { + goto fail; + } + + tp->pipe_read_fde = tevent_add_fd(dest_ev_ctx, + tp, + tp->read_fd, + TEVENT_FD_READ, + pipe_read_handler, + tp); + if (tp->pipe_read_fde == NULL) { + goto fail; + } + + /* + * Create an immediate event to free + * completed lists. + */ + tp->free_im = tevent_create_immediate(tp); + if (tp->free_im == NULL) { + goto fail; + } + + return tp; + + fail: + + TALLOC_FREE(tp); + return NULL; +} + +/* + * This function schedules an immediate event to be called with argument + * *pp_private in the thread context of dest_ev_ctx. Caller doesn't + * wait for activation to take place, this is simply fire-and-forget. + * + * pp_im must be a pointer to an immediate event talloced on + * a context owned by the calling thread, or the NULL context. + * Ownership of *pp_im will be transfered to the tevent library. + * + * pp_private can be null, or contents of *pp_private must be + * talloc'ed memory on a context owned by the calling thread + * or the NULL context. If non-null, ownership of *pp_private will + * be transfered to the tevent library. + * + * If you want to return a message, have the destination use the + * same function call to send back to the caller. + */ + + +void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp, + struct tevent_immediate **pp_im, + tevent_immediate_handler_t handler, + void *pp_private_data) +{ + struct tevent_immediate_list *im_entry; + int ret; + char c; + ssize_t written; + + ret = pthread_mutex_lock(&tp->mutex); + if (ret != 0) { + abort(); + /* Notreached. */ + return; + } + + if (tp->write_fd == -1) { + /* In the process of being destroyed. Ignore. */ + goto end; + } + + /* Create a new immediate_list entry. MUST BE ON THE NULL CONTEXT */ + im_entry = talloc_zero(NULL, struct tevent_immediate_list); + if (im_entry == NULL) { + goto end; + } + + im_entry->handler = handler; + im_entry->im = talloc_move(im_entry, pp_im); + + if (pp_private_data != NULL) { + void **pptr = (void **)pp_private_data; + im_entry->private_ptr = talloc_move(im_entry, pptr); + } + + DLIST_ADD(tp->im_list, im_entry); + + /* And notify the dest_ev_ctx to wake up. */ + c = '\0'; + do { + written = write(tp->write_fd, &c, 1); + } while (written == -1 && errno == EINTR); + + end: + + ret = pthread_mutex_unlock(&tp->mutex); + if (ret != 0) { + abort(); + /* Notreached. */ + } +} +#else +/* !HAVE_PTHREAD */ +struct tevent_thread_proxy *tevent_thread_proxy_create( + struct tevent_context *dest_ev_ctx) +{ + errno = ENOSYS; + return NULL; +} + +void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp, + struct tevent_immediate **pp_im, + tevent_immediate_handler_t handler, + void *pp_private_data) +{ + ; +} +#endif + +static int tevent_threaded_context_destructor( + struct tevent_threaded_context *tctx) +{ + struct tevent_context *main_ev = tevent_wrapper_main_ev(tctx->event_ctx); + int ret; + + if (main_ev != NULL) { + DLIST_REMOVE(main_ev->threaded_contexts, tctx); + } + + /* + * We have to coordinate with _tevent_threaded_schedule_immediate's + * unlock of the event_ctx_mutex. We're in the main thread here, + * and we can be scheduled before the helper thread finalizes its + * call _tevent_threaded_schedule_immediate. This means we would + * pthreadpool_destroy a locked mutex, which is illegal. + */ + ret = pthread_mutex_lock(&tctx->event_ctx_mutex); + if (ret != 0) { + abort(); + } + + ret = pthread_mutex_unlock(&tctx->event_ctx_mutex); + if (ret != 0) { + abort(); + } + + ret = pthread_mutex_destroy(&tctx->event_ctx_mutex); + if (ret != 0) { + abort(); + } + + return 0; +} + +struct tevent_threaded_context *tevent_threaded_context_create( + TALLOC_CTX *mem_ctx, struct tevent_context *ev) +{ +#ifdef HAVE_PTHREAD + struct tevent_context *main_ev = tevent_wrapper_main_ev(ev); + struct tevent_threaded_context *tctx; + int ret; + + ret = tevent_common_wakeup_init(main_ev); + if (ret != 0) { + errno = ret; + return NULL; + } + + tctx = talloc(mem_ctx, struct tevent_threaded_context); + if (tctx == NULL) { + return NULL; + } + tctx->event_ctx = ev; + + ret = pthread_mutex_init(&tctx->event_ctx_mutex, NULL); + if (ret != 0) { + TALLOC_FREE(tctx); + return NULL; + } + + DLIST_ADD(main_ev->threaded_contexts, tctx); + talloc_set_destructor(tctx, tevent_threaded_context_destructor); + + return tctx; +#else + errno = ENOSYS; + return NULL; +#endif +} + +static int tevent_threaded_schedule_immediate_destructor(struct tevent_immediate *im) +{ + if (im->event_ctx != NULL) { + abort(); + } + return 0; +} + +void _tevent_threaded_schedule_immediate(struct tevent_threaded_context *tctx, + struct tevent_immediate *im, + tevent_immediate_handler_t handler, + void *private_data, + const char *handler_name, + const char *location) +{ +#ifdef HAVE_PTHREAD + const char *create_location = im->create_location; + struct tevent_context *main_ev = NULL; + struct tevent_wrapper_glue *glue = NULL; + int ret, wakeup_fd; + + ret = pthread_mutex_lock(&tctx->event_ctx_mutex); + if (ret != 0) { + abort(); + } + + if (tctx->event_ctx == NULL) { + /* + * Our event context is already gone. + */ + ret = pthread_mutex_unlock(&tctx->event_ctx_mutex); + if (ret != 0) { + abort(); + } + return; + } + + glue = tctx->event_ctx->wrapper.glue; + + if ((im->event_ctx != NULL) || (handler == NULL)) { + abort(); + } + if (im->destroyed) { + abort(); + } + if (im->busy) { + abort(); + } + + main_ev = tevent_wrapper_main_ev(tctx->event_ctx); + + *im = (struct tevent_immediate) { + .event_ctx = tctx->event_ctx, + .wrapper = glue, + .handler = handler, + .private_data = private_data, + .handler_name = handler_name, + .create_location = create_location, + .schedule_location = location, + }; + + /* + * Make sure the event won't be destroyed while + * it's part of the ev->scheduled_immediates list. + * _tevent_schedule_immediate() will reset the destructor + * in tevent_common_threaded_activate_immediate(). + */ + talloc_set_destructor(im, tevent_threaded_schedule_immediate_destructor); + + ret = pthread_mutex_lock(&main_ev->scheduled_mutex); + if (ret != 0) { + abort(); + } + + DLIST_ADD_END(main_ev->scheduled_immediates, im); + wakeup_fd = main_ev->wakeup_fd; + + ret = pthread_mutex_unlock(&main_ev->scheduled_mutex); + if (ret != 0) { + abort(); + } + + ret = pthread_mutex_unlock(&tctx->event_ctx_mutex); + if (ret != 0) { + abort(); + } + + /* + * We might want to wake up the main thread under the lock. We + * had a slightly similar situation in pthreadpool, changed + * with 1c4284c7395f23. This is not exactly the same, as the + * wakeup is only a last-resort thing in case the main thread + * is sleeping. Doing the wakeup under the lock can easily + * lead to a contended mutex, which is much more expensive + * than a noncontended one. So I'd opt for the lower footprint + * initially. Maybe we have to change that later. + */ + tevent_common_wakeup_fd(wakeup_fd); +#else + /* + * tevent_threaded_context_create() returned NULL with ENOSYS... + */ + abort(); +#endif +} + +void tevent_common_threaded_activate_immediate(struct tevent_context *ev) +{ +#ifdef HAVE_PTHREAD + int ret; + ret = pthread_mutex_lock(&ev->scheduled_mutex); + if (ret != 0) { + abort(); + } + + while (ev->scheduled_immediates != NULL) { + struct tevent_immediate *im = ev->scheduled_immediates; + struct tevent_immediate copy = *im; + + DLIST_REMOVE(ev->scheduled_immediates, im); + + tevent_debug(ev, TEVENT_DEBUG_TRACE, + "Schedule immediate event \"%s\": %p from thread into main\n", + im->handler_name, im); + im->handler_name = NULL; + _tevent_schedule_immediate(im, + ev, + copy.handler, + copy.private_data, + copy.handler_name, + copy.schedule_location); + } + + ret = pthread_mutex_unlock(&ev->scheduled_mutex); + if (ret != 0) { + abort(); + } +#else + /* + * tevent_threaded_context_create() returned NULL with ENOSYS... + */ + abort(); +#endif +} |