summaryrefslogtreecommitdiffstats
path: root/tevent_threads.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--tevent_threads.c601
1 files changed, 601 insertions, 0 deletions
diff --git a/tevent_threads.c b/tevent_threads.c
new file mode 100644
index 0000000..2dc1f17
--- /dev/null
+++ b/tevent_threads.c
@@ -0,0 +1,601 @@
+/*
+ tevent event library.
+
+ Copyright (C) Jeremy Allison 2015
+
+ ** NOTE! The following LGPL license applies to the tevent
+ ** library. This does NOT imply that all of Samba is released
+ ** under the LGPL
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 3 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "replace.h"
+#include "system/filesys.h"
+#include "talloc.h"
+#include "tevent.h"
+#include "tevent_internal.h"
+#include "tevent_util.h"
+
+#ifdef HAVE_PTHREAD
+#include "system/threads.h"
+
+struct tevent_immediate_list {
+ struct tevent_immediate_list *next, *prev;
+ tevent_immediate_handler_t handler;
+ struct tevent_immediate *im;
+ void *private_ptr;
+};
+
+struct tevent_thread_proxy {
+ pthread_mutex_t mutex;
+ struct tevent_context *dest_ev_ctx;
+ int read_fd;
+ int write_fd;
+ struct tevent_fd *pipe_read_fde;
+ /* Pending events list. */
+ struct tevent_immediate_list *im_list;
+ /* Completed events list. */
+ struct tevent_immediate_list *tofree_im_list;
+ struct tevent_immediate *free_im;
+};
+
+static void free_im_list(struct tevent_immediate_list **pp_list_head)
+{
+ struct tevent_immediate_list *im_entry = NULL;
+ struct tevent_immediate_list *im_next = NULL;
+
+ for (im_entry = *pp_list_head; im_entry; im_entry = im_next) {
+ im_next = im_entry->next;
+ DLIST_REMOVE(*pp_list_head, im_entry);
+ TALLOC_FREE(im_entry);
+ }
+}
+
+static void free_list_handler(struct tevent_context *ev,
+ struct tevent_immediate *im,
+ void *private_ptr)
+{
+ struct tevent_thread_proxy *tp =
+ talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
+ int ret;
+
+ ret = pthread_mutex_lock(&tp->mutex);
+ if (ret != 0) {
+ abort();
+ /* Notreached. */
+ return;
+ }
+
+ free_im_list(&tp->tofree_im_list);
+
+ ret = pthread_mutex_unlock(&tp->mutex);
+ if (ret != 0) {
+ abort();
+ /* Notreached. */
+ return;
+ }
+}
+
+static void schedule_immediate_functions(struct tevent_thread_proxy *tp)
+{
+ struct tevent_immediate_list *im_entry = NULL;
+ struct tevent_immediate_list *im_next = NULL;
+
+ for (im_entry = tp->im_list; im_entry; im_entry = im_next) {
+ im_next = im_entry->next;
+ DLIST_REMOVE(tp->im_list, im_entry);
+
+ tevent_schedule_immediate(im_entry->im,
+ tp->dest_ev_ctx,
+ im_entry->handler,
+ im_entry->private_ptr);
+
+ /* Move from pending list to free list. */
+ DLIST_ADD(tp->tofree_im_list, im_entry);
+ }
+ if (tp->tofree_im_list != NULL) {
+ /*
+ * Once the current immediate events
+ * are processed, we need to reschedule
+ * ourselves to free them. This works
+ * as tevent_schedule_immediate()
+ * always adds events to the *END* of
+ * the immediate events list.
+ */
+ tevent_schedule_immediate(tp->free_im,
+ tp->dest_ev_ctx,
+ free_list_handler,
+ tp);
+ }
+}
+
+static void pipe_read_handler(struct tevent_context *ev,
+ struct tevent_fd *fde,
+ uint16_t flags,
+ void *private_ptr)
+{
+ struct tevent_thread_proxy *tp =
+ talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
+ ssize_t len = 64;
+ int ret;
+
+ ret = pthread_mutex_lock(&tp->mutex);
+ if (ret != 0) {
+ abort();
+ /* Notreached. */
+ return;
+ }
+
+ /*
+ * Clear out all data in the pipe. We
+ * don't really care if this returns -1.
+ */
+ while (len == 64) {
+ char buf[64];
+ len = read(tp->read_fd, buf, 64);
+ };
+
+ schedule_immediate_functions(tp);
+
+ ret = pthread_mutex_unlock(&tp->mutex);
+ if (ret != 0) {
+ abort();
+ /* Notreached. */
+ return;
+ }
+}
+
+static int tevent_thread_proxy_destructor(struct tevent_thread_proxy *tp)
+{
+ int ret;
+
+ ret = pthread_mutex_lock(&tp->mutex);
+ if (ret != 0) {
+ abort();
+ /* Notreached. */
+ return 0;
+ }
+
+ TALLOC_FREE(tp->pipe_read_fde);
+
+ if (tp->read_fd != -1) {
+ (void)close(tp->read_fd);
+ tp->read_fd = -1;
+ }
+ if (tp->write_fd != -1) {
+ (void)close(tp->write_fd);
+ tp->write_fd = -1;
+ }
+
+ /* Hmmm. It's probably an error if we get here with
+ any non-NULL immediate entries.. */
+
+ free_im_list(&tp->im_list);
+ free_im_list(&tp->tofree_im_list);
+
+ TALLOC_FREE(tp->free_im);
+
+ ret = pthread_mutex_unlock(&tp->mutex);
+ if (ret != 0) {
+ abort();
+ /* Notreached. */
+ return 0;
+ }
+
+ ret = pthread_mutex_destroy(&tp->mutex);
+ if (ret != 0) {
+ abort();
+ /* Notreached. */
+ return 0;
+ }
+
+ return 0;
+}
+
+/*
+ * Create a struct that can be passed to other threads
+ * to allow them to signal the struct tevent_context *
+ * passed in.
+ */
+
+struct tevent_thread_proxy *tevent_thread_proxy_create(
+ struct tevent_context *dest_ev_ctx)
+{
+ int ret;
+ int pipefds[2];
+ struct tevent_thread_proxy *tp;
+
+ if (dest_ev_ctx->wrapper.glue != NULL) {
+ /*
+ * stacking of wrappers is not supported
+ */
+ tevent_debug(dest_ev_ctx->wrapper.glue->main_ev,
+ TEVENT_DEBUG_FATAL,
+ "%s() not allowed on a wrapper context\n",
+ __func__);
+ errno = EINVAL;
+ return NULL;
+ }
+
+ tp = talloc_zero(dest_ev_ctx, struct tevent_thread_proxy);
+ if (tp == NULL) {
+ return NULL;
+ }
+
+ ret = pthread_mutex_init(&tp->mutex, NULL);
+ if (ret != 0) {
+ goto fail;
+ }
+
+ tp->dest_ev_ctx = dest_ev_ctx;
+ tp->read_fd = -1;
+ tp->write_fd = -1;
+
+ talloc_set_destructor(tp, tevent_thread_proxy_destructor);
+
+ ret = pipe(pipefds);
+ if (ret == -1) {
+ goto fail;
+ }
+
+ tp->read_fd = pipefds[0];
+ tp->write_fd = pipefds[1];
+
+ ret = ev_set_blocking(pipefds[0], false);
+ if (ret != 0) {
+ goto fail;
+ }
+ ret = ev_set_blocking(pipefds[1], false);
+ if (ret != 0) {
+ goto fail;
+ }
+ if (!ev_set_close_on_exec(pipefds[0])) {
+ goto fail;
+ }
+ if (!ev_set_close_on_exec(pipefds[1])) {
+ goto fail;
+ }
+
+ tp->pipe_read_fde = tevent_add_fd(dest_ev_ctx,
+ tp,
+ tp->read_fd,
+ TEVENT_FD_READ,
+ pipe_read_handler,
+ tp);
+ if (tp->pipe_read_fde == NULL) {
+ goto fail;
+ }
+
+ /*
+ * Create an immediate event to free
+ * completed lists.
+ */
+ tp->free_im = tevent_create_immediate(tp);
+ if (tp->free_im == NULL) {
+ goto fail;
+ }
+
+ return tp;
+
+ fail:
+
+ TALLOC_FREE(tp);
+ return NULL;
+}
+
+/*
+ * This function schedules an immediate event to be called with argument
+ * *pp_private in the thread context of dest_ev_ctx. Caller doesn't
+ * wait for activation to take place, this is simply fire-and-forget.
+ *
+ * pp_im must be a pointer to an immediate event talloced on
+ * a context owned by the calling thread, or the NULL context.
+ * Ownership of *pp_im will be transferred to the tevent library.
+ *
+ * pp_private can be null, or contents of *pp_private must be
+ * talloc'ed memory on a context owned by the calling thread
+ * or the NULL context. If non-null, ownership of *pp_private will
+ * be transferred to the tevent library.
+ *
+ * If you want to return a message, have the destination use the
+ * same function call to send back to the caller.
+ */
+
+
+void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
+ struct tevent_immediate **pp_im,
+ tevent_immediate_handler_t handler,
+ void *pp_private_data)
+{
+ struct tevent_immediate_list *im_entry;
+ int ret;
+ char c;
+ ssize_t written;
+
+ ret = pthread_mutex_lock(&tp->mutex);
+ if (ret != 0) {
+ abort();
+ /* Notreached. */
+ return;
+ }
+
+ if (tp->write_fd == -1) {
+ /* In the process of being destroyed. Ignore. */
+ goto end;
+ }
+
+ /* Create a new immediate_list entry. MUST BE ON THE NULL CONTEXT */
+ im_entry = talloc_zero(NULL, struct tevent_immediate_list);
+ if (im_entry == NULL) {
+ goto end;
+ }
+
+ im_entry->handler = handler;
+ im_entry->im = talloc_move(im_entry, pp_im);
+
+ if (pp_private_data != NULL) {
+ void **pptr = (void **)pp_private_data;
+ im_entry->private_ptr = talloc_move(im_entry, pptr);
+ }
+
+ DLIST_ADD(tp->im_list, im_entry);
+
+ /* And notify the dest_ev_ctx to wake up. */
+ c = '\0';
+ do {
+ written = write(tp->write_fd, &c, 1);
+ } while (written == -1 && errno == EINTR);
+
+ end:
+
+ ret = pthread_mutex_unlock(&tp->mutex);
+ if (ret != 0) {
+ abort();
+ /* Notreached. */
+ }
+}
+#else
+/* !HAVE_PTHREAD */
+struct tevent_thread_proxy *tevent_thread_proxy_create(
+ struct tevent_context *dest_ev_ctx)
+{
+ errno = ENOSYS;
+ return NULL;
+}
+
+void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
+ struct tevent_immediate **pp_im,
+ tevent_immediate_handler_t handler,
+ void *pp_private_data)
+{
+ ;
+}
+#endif
+
+static int tevent_threaded_context_destructor(
+ struct tevent_threaded_context *tctx)
+{
+ struct tevent_context *main_ev = tevent_wrapper_main_ev(tctx->event_ctx);
+ int ret;
+
+ if (main_ev != NULL) {
+ DLIST_REMOVE(main_ev->threaded_contexts, tctx);
+ }
+
+ /*
+ * We have to coordinate with _tevent_threaded_schedule_immediate's
+ * unlock of the event_ctx_mutex. We're in the main thread here,
+ * and we can be scheduled before the helper thread finalizes its
+ * call _tevent_threaded_schedule_immediate. This means we would
+ * pthreadpool_destroy a locked mutex, which is illegal.
+ */
+ ret = pthread_mutex_lock(&tctx->event_ctx_mutex);
+ if (ret != 0) {
+ abort();
+ }
+
+ ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
+ if (ret != 0) {
+ abort();
+ }
+
+ ret = pthread_mutex_destroy(&tctx->event_ctx_mutex);
+ if (ret != 0) {
+ abort();
+ }
+
+ return 0;
+}
+
+struct tevent_threaded_context *tevent_threaded_context_create(
+ TALLOC_CTX *mem_ctx, struct tevent_context *ev)
+{
+#ifdef HAVE_PTHREAD
+ struct tevent_context *main_ev = tevent_wrapper_main_ev(ev);
+ struct tevent_threaded_context *tctx;
+ int ret;
+
+ ret = tevent_common_wakeup_init(main_ev);
+ if (ret != 0) {
+ errno = ret;
+ return NULL;
+ }
+
+ tctx = talloc(mem_ctx, struct tevent_threaded_context);
+ if (tctx == NULL) {
+ return NULL;
+ }
+ tctx->event_ctx = ev;
+
+ ret = pthread_mutex_init(&tctx->event_ctx_mutex, NULL);
+ if (ret != 0) {
+ TALLOC_FREE(tctx);
+ return NULL;
+ }
+
+ DLIST_ADD(main_ev->threaded_contexts, tctx);
+ talloc_set_destructor(tctx, tevent_threaded_context_destructor);
+
+ return tctx;
+#else
+ errno = ENOSYS;
+ return NULL;
+#endif
+}
+
+static int tevent_threaded_schedule_immediate_destructor(struct tevent_immediate *im)
+{
+ if (im->event_ctx != NULL) {
+ abort();
+ }
+ return 0;
+}
+
+void _tevent_threaded_schedule_immediate(struct tevent_threaded_context *tctx,
+ struct tevent_immediate *im,
+ tevent_immediate_handler_t handler,
+ void *private_data,
+ const char *handler_name,
+ const char *location)
+{
+#ifdef HAVE_PTHREAD
+ const char *create_location = im->create_location;
+ struct tevent_context *main_ev = NULL;
+ struct tevent_wrapper_glue *glue = NULL;
+ int ret, wakeup_fd;
+
+ ret = pthread_mutex_lock(&tctx->event_ctx_mutex);
+ if (ret != 0) {
+ abort();
+ }
+
+ if (tctx->event_ctx == NULL) {
+ /*
+ * Our event context is already gone.
+ */
+ ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
+ if (ret != 0) {
+ abort();
+ }
+ return;
+ }
+
+ glue = tctx->event_ctx->wrapper.glue;
+
+ if ((im->event_ctx != NULL) || (handler == NULL)) {
+ abort();
+ }
+ if (im->destroyed) {
+ abort();
+ }
+ if (im->busy) {
+ abort();
+ }
+
+ main_ev = tevent_wrapper_main_ev(tctx->event_ctx);
+
+ *im = (struct tevent_immediate) {
+ .event_ctx = tctx->event_ctx,
+ .wrapper = glue,
+ .handler = handler,
+ .private_data = private_data,
+ .handler_name = handler_name,
+ .create_location = create_location,
+ .schedule_location = location,
+ };
+
+ /*
+ * Make sure the event won't be destroyed while
+ * it's part of the ev->scheduled_immediates list.
+ * _tevent_schedule_immediate() will reset the destructor
+ * in tevent_common_threaded_activate_immediate().
+ */
+ talloc_set_destructor(im, tevent_threaded_schedule_immediate_destructor);
+
+ ret = pthread_mutex_lock(&main_ev->scheduled_mutex);
+ if (ret != 0) {
+ abort();
+ }
+
+ DLIST_ADD_END(main_ev->scheduled_immediates, im);
+ wakeup_fd = main_ev->wakeup_fd;
+
+ ret = pthread_mutex_unlock(&main_ev->scheduled_mutex);
+ if (ret != 0) {
+ abort();
+ }
+
+ ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
+ if (ret != 0) {
+ abort();
+ }
+
+ /*
+ * We might want to wake up the main thread under the lock. We
+ * had a slightly similar situation in pthreadpool, changed
+ * with 1c4284c7395f23. This is not exactly the same, as the
+ * wakeup is only a last-resort thing in case the main thread
+ * is sleeping. Doing the wakeup under the lock can easily
+ * lead to a contended mutex, which is much more expensive
+ * than a noncontended one. So I'd opt for the lower footprint
+ * initially. Maybe we have to change that later.
+ */
+ tevent_common_wakeup_fd(wakeup_fd);
+#else
+ /*
+ * tevent_threaded_context_create() returned NULL with ENOSYS...
+ */
+ abort();
+#endif
+}
+
+void tevent_common_threaded_activate_immediate(struct tevent_context *ev)
+{
+#ifdef HAVE_PTHREAD
+ int ret;
+ ret = pthread_mutex_lock(&ev->scheduled_mutex);
+ if (ret != 0) {
+ abort();
+ }
+
+ while (ev->scheduled_immediates != NULL) {
+ struct tevent_immediate *im = ev->scheduled_immediates;
+ struct tevent_immediate copy = *im;
+
+ DLIST_REMOVE(ev->scheduled_immediates, im);
+
+ TEVENT_DEBUG(ev, TEVENT_DEBUG_TRACE,
+ "Schedule immediate event \"%s\": %p from thread into main\n",
+ im->handler_name, im);
+ im->handler_name = NULL;
+ _tevent_schedule_immediate(im,
+ ev,
+ copy.handler,
+ copy.private_data,
+ copy.handler_name,
+ copy.schedule_location);
+ }
+
+ ret = pthread_mutex_unlock(&ev->scheduled_mutex);
+ if (ret != 0) {
+ abort();
+ }
+#else
+ /*
+ * tevent_threaded_context_create() returned NULL with ENOSYS...
+ */
+ abort();
+#endif
+}