diff options
Diffstat (limited to 'tevent_queue.c')
-rw-r--r-- | tevent_queue.c | 404 |
1 files changed, 404 insertions, 0 deletions
diff --git a/tevent_queue.c b/tevent_queue.c new file mode 100644 index 0000000..647b8a0 --- /dev/null +++ b/tevent_queue.c @@ -0,0 +1,404 @@ +/* + Unix SMB/CIFS implementation. + Infrastructure for async requests + Copyright (C) Volker Lendecke 2008 + Copyright (C) Stefan Metzmacher 2009 + + ** NOTE! The following LGPL license applies to the tevent + ** library. This does NOT imply that all of Samba is released + ** under the LGPL + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 3 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "replace.h" +#include "tevent.h" +#include "tevent_internal.h" +#include "tevent_util.h" + +struct tevent_queue_entry { + struct tevent_queue_entry *prev, *next; + struct tevent_queue *queue; + + bool triggered; + + struct tevent_req *req; + struct tevent_context *ev; + + tevent_queue_trigger_fn_t trigger; + void *private_data; + uint64_t tag; +}; + +struct tevent_queue { + const char *name; + const char *location; + + bool running; + struct tevent_immediate *immediate; + + size_t length; + struct tevent_queue_entry *list; +}; + +static void tevent_queue_immediate_trigger(struct tevent_context *ev, + struct tevent_immediate *im, + void *private_data); + +static int tevent_queue_entry_destructor(struct tevent_queue_entry *e) +{ + struct tevent_queue *q = e->queue; + + if (!q) { + return 0; + } + + tevent_trace_queue_callback(q->list->ev, e, TEVENT_EVENT_TRACE_DETACH); + DLIST_REMOVE(q->list, e); + q->length--; + + if (!q->running) { + return 0; + } + + if (!q->list) { + return 0; + } + + if (q->list->triggered) { + return 0; + } + + tevent_schedule_immediate(q->immediate, + q->list->ev, + tevent_queue_immediate_trigger, + q); + + return 0; +} + +static int tevent_queue_destructor(struct tevent_queue *q) +{ + q->running = false; + + while (q->list) { + struct tevent_queue_entry *e = q->list; + talloc_free(e); + } + + return 0; +} + +struct tevent_queue *_tevent_queue_create(TALLOC_CTX *mem_ctx, + const char *name, + const char *location) +{ + struct tevent_queue *queue; + + queue = talloc_zero(mem_ctx, struct tevent_queue); + if (!queue) { + return NULL; + } + + queue->name = talloc_strdup(queue, name); + if (!queue->name) { + talloc_free(queue); + return NULL; + } + queue->immediate = tevent_create_immediate(queue); + if (!queue->immediate) { + talloc_free(queue); + return NULL; + } + + queue->location = location; + + /* queue is running by default */ + queue->running = true; + + talloc_set_destructor(queue, tevent_queue_destructor); + return queue; +} + +static void tevent_queue_immediate_trigger(struct tevent_context *ev, + struct tevent_immediate *im, + void *private_data) +{ + struct tevent_queue *q = + talloc_get_type_abort(private_data, + struct tevent_queue); + + if (!q->running) { + return; + } + + if (!q->list) { + return; + } + + tevent_trace_queue_callback(ev, q->list, + TEVENT_EVENT_TRACE_BEFORE_HANDLER); + /* Set the call depth of the request coming from the queue. */ + tevent_thread_call_depth_set(q->list->req->internal.call_depth); + q->list->triggered = true; + q->list->trigger(q->list->req, q->list->private_data); +} + +static void tevent_queue_noop_trigger(struct tevent_req *req, + void *_private_data) +{ + /* this is doing nothing but blocking the queue */ +} + +static struct tevent_queue_entry *tevent_queue_add_internal( + struct tevent_queue *queue, + struct tevent_context *ev, + struct tevent_req *req, + tevent_queue_trigger_fn_t trigger, + void *private_data, + bool allow_direct) +{ + struct tevent_queue_entry *e; + + e = talloc_zero(req, struct tevent_queue_entry); + if (e == NULL) { + return NULL; + } + + /* + * if there is no trigger, it is just a blocker + */ + if (trigger == NULL) { + trigger = tevent_queue_noop_trigger; + } + + e->queue = queue; + e->req = req; + e->ev = ev; + e->trigger = trigger; + e->private_data = private_data; + + if (queue->length > 0) { + /* + * if there are already entries in the + * queue do not optimize. + */ + allow_direct = false; + } + + if (req->async.fn != NULL) { + /* + * If the caller wants to optimize for the + * empty queue case, call the trigger only + * if there is no callback defined for the + * request yet. + */ + allow_direct = false; + } + + DLIST_ADD_END(queue->list, e); + queue->length++; + talloc_set_destructor(e, tevent_queue_entry_destructor); + tevent_trace_queue_callback(ev, e, TEVENT_EVENT_TRACE_ATTACH); + + if (!queue->running) { + return e; + } + + if (queue->list->triggered) { + return e; + } + + /* + * If allowed we directly call the trigger + * avoiding possible delays caused by + * an immediate event. + */ + if (allow_direct) { + tevent_trace_queue_callback(ev, + queue->list, + TEVENT_EVENT_TRACE_BEFORE_HANDLER); + queue->list->triggered = true; + queue->list->trigger(queue->list->req, + queue->list->private_data); + return e; + } + + tevent_schedule_immediate(queue->immediate, + queue->list->ev, + tevent_queue_immediate_trigger, + queue); + + return e; +} + +bool tevent_queue_add(struct tevent_queue *queue, + struct tevent_context *ev, + struct tevent_req *req, + tevent_queue_trigger_fn_t trigger, + void *private_data) +{ + struct tevent_queue_entry *e; + + e = tevent_queue_add_internal(queue, ev, req, + trigger, private_data, false); + if (e == NULL) { + return false; + } + + return true; +} + +struct tevent_queue_entry *tevent_queue_add_entry( + struct tevent_queue *queue, + struct tevent_context *ev, + struct tevent_req *req, + tevent_queue_trigger_fn_t trigger, + void *private_data) +{ + return tevent_queue_add_internal(queue, ev, req, + trigger, private_data, false); +} + +struct tevent_queue_entry *tevent_queue_add_optimize_empty( + struct tevent_queue *queue, + struct tevent_context *ev, + struct tevent_req *req, + tevent_queue_trigger_fn_t trigger, + void *private_data) +{ + return tevent_queue_add_internal(queue, ev, req, + trigger, private_data, true); +} + +void tevent_queue_entry_untrigger(struct tevent_queue_entry *entry) +{ + if (entry->queue->running) { + abort(); + } + + if (entry->queue->list != entry) { + abort(); + } + + entry->triggered = false; +} + +void tevent_queue_start(struct tevent_queue *queue) +{ + if (queue->running) { + /* already started */ + return; + } + + queue->running = true; + + if (!queue->list) { + return; + } + + if (queue->list->triggered) { + return; + } + + tevent_schedule_immediate(queue->immediate, + queue->list->ev, + tevent_queue_immediate_trigger, + queue); +} + +void tevent_queue_stop(struct tevent_queue *queue) +{ + queue->running = false; +} + +size_t tevent_queue_length(struct tevent_queue *queue) +{ + return queue->length; +} + +bool tevent_queue_running(struct tevent_queue *queue) +{ + return queue->running; +} + +struct tevent_queue_wait_state { + uint8_t dummy; +}; + +static void tevent_queue_wait_trigger(struct tevent_req *req, + void *private_data); + +struct tevent_req *tevent_queue_wait_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct tevent_queue *queue) +{ + struct tevent_req *req; + struct tevent_queue_wait_state *state; + bool ok; + + req = tevent_req_create(mem_ctx, &state, + struct tevent_queue_wait_state); + if (req == NULL) { + return NULL; + } + + ok = tevent_queue_add(queue, ev, req, + tevent_queue_wait_trigger, + NULL); + if (!ok) { + tevent_req_oom(req); + return tevent_req_post(req, ev); + } + + return req; +} + +static void tevent_queue_wait_trigger(struct tevent_req *req, + void *private_data) +{ + tevent_req_done(req); +} + +bool tevent_queue_wait_recv(struct tevent_req *req) +{ + enum tevent_req_state state; + uint64_t err; + + if (tevent_req_is_error(req, &state, &err)) { + tevent_req_received(req); + return false; + } + + tevent_req_received(req); + return true; +} + +void tevent_queue_entry_set_tag(struct tevent_queue_entry *qe, uint64_t tag) +{ + if (qe == NULL) { + return; + } + + qe->tag = tag; +} + +uint64_t tevent_queue_entry_get_tag(const struct tevent_queue_entry *qe) +{ + if (qe == NULL) { + return 0; + } + + return qe->tag; +} |