diff options
Diffstat (limited to 'modules/http2/h2_workers.c')
-rw-r--r-- | modules/http2/h2_workers.c | 755 |
1 files changed, 499 insertions, 256 deletions
diff --git a/modules/http2/h2_workers.c b/modules/http2/h2_workers.c index 699f533..e7e2039 100644 --- a/modules/http2/h2_workers.c +++ b/modules/http2/h2_workers.c @@ -15,285 +15,440 @@ */ #include <assert.h> -#include <apr_atomic.h> +#include <apr_ring.h> #include <apr_thread_mutex.h> #include <apr_thread_cond.h> #include <mpm_common.h> #include <httpd.h> +#include <http_connection.h> #include <http_core.h> #include <http_log.h> +#include <http_protocol.h> #include "h2.h" #include "h2_private.h" #include "h2_mplx.h" -#include "h2_task.h" +#include "h2_c2.h" #include "h2_workers.h" #include "h2_util.h" +typedef enum { + PROD_IDLE, + PROD_ACTIVE, + PROD_JOINED, +} prod_state_t; + +struct ap_conn_producer_t { + APR_RING_ENTRY(ap_conn_producer_t) link; + const char *name; + void *baton; + ap_conn_producer_next *fn_next; + ap_conn_producer_done *fn_done; + ap_conn_producer_shutdown *fn_shutdown; + volatile prod_state_t state; + volatile int conns_active; +}; + + +typedef enum { + H2_SLOT_FREE, + H2_SLOT_RUN, + H2_SLOT_ZOMBIE, +} h2_slot_state_t; + typedef struct h2_slot h2_slot; struct h2_slot { - int id; - h2_slot *next; + APR_RING_ENTRY(h2_slot) link; + apr_uint32_t id; + apr_pool_t *pool; + h2_slot_state_t state; + volatile int should_shutdown; + volatile int is_idle; h2_workers *workers; - int aborted; - int sticks; - h2_task *task; + ap_conn_producer_t *prod; apr_thread_t *thread; - apr_thread_mutex_t *lock; - apr_thread_cond_t *not_idle; + struct apr_thread_cond_t *more_work; + int activations; }; -static h2_slot *pop_slot(h2_slot **phead) -{ - /* Atomically pop a slot from the list */ - for (;;) { - h2_slot *first = *phead; - if (first == NULL) { - return NULL; - } - if (apr_atomic_casptr((void*)phead, first->next, first) == first) { - first->next = NULL; - return first; - } - } -} +struct h2_workers { + server_rec *s; + apr_pool_t *pool; + + apr_uint32_t max_slots; + apr_uint32_t min_active; + volatile apr_time_t idle_limit; + volatile int aborted; + volatile int shutdown; + int dynamic; + + volatile apr_uint32_t active_slots; + volatile apr_uint32_t idle_slots; + + apr_threadattr_t *thread_attr; + h2_slot *slots; + + APR_RING_HEAD(h2_slots_free, h2_slot) free; + APR_RING_HEAD(h2_slots_idle, h2_slot) idle; + APR_RING_HEAD(h2_slots_busy, h2_slot) busy; + APR_RING_HEAD(h2_slots_zombie, h2_slot) zombie; + + APR_RING_HEAD(ap_conn_producer_active, ap_conn_producer_t) prod_active; + APR_RING_HEAD(ap_conn_producer_idle, ap_conn_producer_t) prod_idle; + + struct apr_thread_mutex_t *lock; + struct apr_thread_cond_t *prod_done; + struct apr_thread_cond_t *all_done; +}; -static void push_slot(h2_slot **phead, h2_slot *slot) -{ - /* Atomically push a slot to the list */ - ap_assert(!slot->next); - for (;;) { - h2_slot *next = slot->next = *phead; - if (apr_atomic_casptr((void*)phead, slot, next) == next) { - return; - } - } -} static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx); -static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot) +static apr_status_t activate_slot(h2_workers *workers) { - apr_status_t status; - - slot->workers = workers; - slot->aborted = 0; - slot->task = NULL; - - if (!slot->lock) { - status = apr_thread_mutex_create(&slot->lock, - APR_THREAD_MUTEX_DEFAULT, - workers->pool); - if (status != APR_SUCCESS) { - push_slot(&workers->free, slot); - return status; - } - } + h2_slot *slot; + apr_pool_t *pool; + apr_status_t rv; - if (!slot->not_idle) { - status = apr_thread_cond_create(&slot->not_idle, workers->pool); - if (status != APR_SUCCESS) { - push_slot(&workers->free, slot); - return status; - } + if (APR_RING_EMPTY(&workers->free, h2_slot, link)) { + return APR_EAGAIN; } - - ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s, - "h2_workers: new thread for slot %d", slot->id); - /* thread will either immediately start work or add itself - * to the idle queue */ - apr_thread_create(&slot->thread, workers->thread_attr, slot_run, slot, - workers->pool); - if (!slot->thread) { - push_slot(&workers->free, slot); - return APR_ENOMEM; - } - - apr_atomic_inc32(&workers->worker_count); - return APR_SUCCESS; -} + slot = APR_RING_FIRST(&workers->free); + ap_assert(slot->state == H2_SLOT_FREE); + APR_RING_REMOVE(slot, link); -static apr_status_t add_worker(h2_workers *workers) -{ - h2_slot *slot = pop_slot(&workers->free); - if (slot) { - return activate_slot(workers, slot); - } - return APR_EAGAIN; -} + ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, + "h2_workers: activate slot %d", slot->id); -static void wake_idle_worker(h2_workers *workers) -{ - h2_slot *slot = pop_slot(&workers->idle); - if (slot) { - apr_thread_mutex_lock(slot->lock); - apr_thread_cond_signal(slot->not_idle); - apr_thread_mutex_unlock(slot->lock); - } - else if (workers->dynamic) { - add_worker(workers); + slot->state = H2_SLOT_RUN; + slot->should_shutdown = 0; + slot->is_idle = 0; + slot->pool = NULL; + ++workers->active_slots; + rv = apr_pool_create(&pool, workers->pool); + if (APR_SUCCESS != rv) goto cleanup; + apr_pool_tag(pool, "h2_worker_slot"); + slot->pool = pool; + + rv = ap_thread_create(&slot->thread, workers->thread_attr, + slot_run, slot, slot->pool); + +cleanup: + if (rv != APR_SUCCESS) { + AP_DEBUG_ASSERT(0); + slot->state = H2_SLOT_FREE; + if (slot->pool) { + apr_pool_destroy(slot->pool); + slot->pool = NULL; + } + APR_RING_INSERT_TAIL(&workers->free, slot, h2_slot, link); + --workers->active_slots; } + return rv; } -static void cleanup_zombies(h2_workers *workers) +static void join_zombies(h2_workers *workers) { h2_slot *slot; - while ((slot = pop_slot(&workers->zombies))) { - if (slot->thread) { - apr_status_t status; - apr_thread_join(&status, slot->thread); - slot->thread = NULL; + apr_status_t status; + + while (!APR_RING_EMPTY(&workers->zombie, h2_slot, link)) { + slot = APR_RING_FIRST(&workers->zombie); + APR_RING_REMOVE(slot, link); + ap_assert(slot->state == H2_SLOT_ZOMBIE); + ap_assert(slot->thread != NULL); + + apr_thread_mutex_unlock(workers->lock); + apr_thread_join(&status, slot->thread); + apr_thread_mutex_lock(workers->lock); + + slot->thread = NULL; + slot->state = H2_SLOT_FREE; + if (slot->pool) { + apr_pool_destroy(slot->pool); + slot->pool = NULL; } - apr_atomic_dec32(&workers->worker_count); - slot->next = NULL; - push_slot(&workers->free, slot); + APR_RING_INSERT_TAIL(&workers->free, slot, h2_slot, link); } } -static apr_status_t slot_pull_task(h2_slot *slot, h2_mplx *m) +static void wake_idle_worker(h2_workers *workers, ap_conn_producer_t *prod) { - apr_status_t rv; - - rv = h2_mplx_pop_task(m, &slot->task); - if (slot->task) { - /* Ok, we got something to give back to the worker for execution. - * If we still have idle workers, we let the worker be sticky, - * e.g. making it poll the task's h2_mplx instance for more work - * before asking back here. */ - slot->sticks = slot->workers->max_workers; - return rv; + if (!APR_RING_EMPTY(&workers->idle, h2_slot, link)) { + h2_slot *slot; + for (slot = APR_RING_FIRST(&workers->idle); + slot != APR_RING_SENTINEL(&workers->idle, h2_slot, link); + slot = APR_RING_NEXT(slot, link)) { + if (slot->is_idle && !slot->should_shutdown) { + apr_thread_cond_signal(slot->more_work); + slot->is_idle = 0; + return; + } + } + } + if (workers->dynamic && !workers->shutdown + && (workers->active_slots < workers->max_slots)) { + activate_slot(workers); } - slot->sticks = 0; - return APR_EOF; -} - -static h2_fifo_op_t mplx_peek(void *head, void *ctx) -{ - h2_mplx *m = head; - h2_slot *slot = ctx; - - if (slot_pull_task(slot, m) == APR_EAGAIN) { - wake_idle_worker(slot->workers); - return H2_FIFO_OP_REPUSH; - } - return H2_FIFO_OP_PULL; } /** - * Get the next task for the given worker. Will block until a task arrives - * or the max_wait timer expires and more than min workers exist. + * Get the next connection to work on. */ -static apr_status_t get_next(h2_slot *slot) +static conn_rec *get_next(h2_slot *slot) { h2_workers *workers = slot->workers; - apr_status_t status; - - slot->task = NULL; - while (!slot->aborted) { - if (!slot->task) { - status = h2_fifo_try_peek(workers->mplxs, mplx_peek, slot); - if (status == APR_EOF) { - return status; - } + conn_rec *c = NULL; + ap_conn_producer_t *prod; + int has_more; + + slot->prod = NULL; + if (!APR_RING_EMPTY(&workers->prod_active, ap_conn_producer_t, link)) { + slot->prod = prod = APR_RING_FIRST(&workers->prod_active); + APR_RING_REMOVE(prod, link); + AP_DEBUG_ASSERT(PROD_ACTIVE == prod->state); + + c = prod->fn_next(prod->baton, &has_more); + if (c && has_more) { + APR_RING_INSERT_TAIL(&workers->prod_active, prod, ap_conn_producer_t, link); + wake_idle_worker(workers, slot->prod); } - - if (slot->task) { - return APR_SUCCESS; + else { + prod->state = PROD_IDLE; + APR_RING_INSERT_TAIL(&workers->prod_idle, prod, ap_conn_producer_t, link); + } + if (c) { + ++prod->conns_active; } - - cleanup_zombies(workers); - - apr_thread_mutex_lock(slot->lock); - push_slot(&workers->idle, slot); - apr_thread_cond_wait(slot->not_idle, slot->lock); - apr_thread_mutex_unlock(slot->lock); } - return APR_EOF; -} -static void slot_done(h2_slot *slot) -{ - push_slot(&(slot->workers->zombies), slot); + return c; } - static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx) { h2_slot *slot = wctx; - - while (!slot->aborted) { - - /* Get a h2_task from the mplxs queue. */ - get_next(slot); - while (slot->task) { - - h2_task_do(slot->task, thread, slot->id); - - /* Report the task as done. If stickyness is left, offer the - * mplx the opportunity to give us back a new task right away. - */ - if (!slot->aborted && (--slot->sticks > 0)) { - h2_mplx_task_done(slot->task->mplx, slot->task, &slot->task); - } - else { - h2_mplx_task_done(slot->task->mplx, slot->task, NULL); - slot->task = NULL; + h2_workers *workers = slot->workers; + conn_rec *c; + apr_status_t rv; + + apr_thread_mutex_lock(workers->lock); + slot->state = H2_SLOT_RUN; + ++slot->activations; + APR_RING_ELEM_INIT(slot, link); + for(;;) { + if (APR_RING_NEXT(slot, link) != slot) { + /* slot is part of the idle ring from the last loop */ + APR_RING_REMOVE(slot, link); + --workers->idle_slots; + } + slot->is_idle = 0; + + if (!workers->aborted && !slot->should_shutdown) { + APR_RING_INSERT_TAIL(&workers->busy, slot, h2_slot, link); + do { + c = get_next(slot); + if (!c) { + break; + } + apr_thread_mutex_unlock(workers->lock); + /* See the discussion at <https://github.com/icing/mod_h2/issues/195> + * + * Each conn_rec->id is supposed to be unique at a point in time. Since + * some modules (and maybe external code) uses this id as an identifier + * for the request_rec they handle, it needs to be unique for secondary + * connections also. + * + * The MPM module assigns the connection ids and mod_unique_id is using + * that one to generate identifier for requests. While the implementation + * works for HTTP/1.x, the parallel execution of several requests per + * connection will generate duplicate identifiers on load. + * + * The original implementation for secondary connection identifiers used + * to shift the master connection id up and assign the stream id to the + * lower bits. This was cramped on 32 bit systems, but on 64bit there was + * enough space. + * + * As issue 195 showed, mod_unique_id only uses the lower 32 bit of the + * connection id, even on 64bit systems. Therefore collisions in request ids. + * + * The way master connection ids are generated, there is some space "at the + * top" of the lower 32 bits on allmost all systems. If you have a setup + * with 64k threads per child and 255 child processes, you live on the edge. + * + * The new implementation shifts 8 bits and XORs in the worker + * id. This will experience collisions with > 256 h2 workers and heavy + * load still. There seems to be no way to solve this in all possible + * configurations by mod_h2 alone. + */ + if (c->master) { + c->id = (c->master->id << 8)^slot->id; + } + c->current_thread = thread; + AP_DEBUG_ASSERT(slot->prod); + +#if AP_HAS_RESPONSE_BUCKETS + ap_process_connection(c, ap_get_conn_socket(c)); +#else + h2_c2_process(c, thread, slot->id); +#endif + slot->prod->fn_done(slot->prod->baton, c); + + apr_thread_mutex_lock(workers->lock); + if (--slot->prod->conns_active <= 0) { + apr_thread_cond_broadcast(workers->prod_done); + } + if (slot->prod->state == PROD_IDLE) { + APR_RING_REMOVE(slot->prod, link); + slot->prod->state = PROD_ACTIVE; + APR_RING_INSERT_TAIL(&workers->prod_active, slot->prod, ap_conn_producer_t, link); + } + + } while (!workers->aborted && !slot->should_shutdown); + APR_RING_REMOVE(slot, link); /* no longer busy */ + } + + if (workers->aborted || slot->should_shutdown) { + break; + } + + join_zombies(workers); + + /* we are idle */ + APR_RING_INSERT_TAIL(&workers->idle, slot, h2_slot, link); + ++workers->idle_slots; + slot->is_idle = 1; + if (slot->id >= workers->min_active && workers->idle_limit > 0) { + rv = apr_thread_cond_timedwait(slot->more_work, workers->lock, + workers->idle_limit); + if (APR_TIMEUP == rv) { + APR_RING_REMOVE(slot, link); + --workers->idle_slots; + ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s, + "h2_workers: idle timeout slot %d in state %d (%d activations)", + slot->id, slot->state, slot->activations); + break; } } + else { + apr_thread_cond_wait(slot->more_work, workers->lock); + } + } + + ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, + "h2_workers: terminate slot %d in state %d (%d activations)", + slot->id, slot->state, slot->activations); + slot->is_idle = 0; + slot->state = H2_SLOT_ZOMBIE; + slot->should_shutdown = 0; + APR_RING_INSERT_TAIL(&workers->zombie, slot, h2_slot, link); + --workers->active_slots; + if (workers->active_slots <= 0) { + apr_thread_cond_broadcast(workers->all_done); } + apr_thread_mutex_unlock(workers->lock); - slot_done(slot); + apr_thread_exit(thread, APR_SUCCESS); return NULL; } +static void wake_all_idles(h2_workers *workers) +{ + h2_slot *slot; + for (slot = APR_RING_FIRST(&workers->idle); + slot != APR_RING_SENTINEL(&workers->idle, h2_slot, link); + slot = APR_RING_NEXT(slot, link)) + { + apr_thread_cond_signal(slot->more_work); + } +} + static apr_status_t workers_pool_cleanup(void *data) { h2_workers *workers = data; - h2_slot *slot; - - if (!workers->aborted) { - workers->aborted = 1; - /* abort all idle slots */ - for (;;) { - slot = pop_slot(&workers->idle); - if (slot) { - apr_thread_mutex_lock(slot->lock); - slot->aborted = 1; - apr_thread_cond_signal(slot->not_idle); - apr_thread_mutex_unlock(slot->lock); - } - else { - break; - } - } + apr_time_t end, timeout = apr_time_from_sec(1); + apr_status_t rv; + int n = 0, wait_sec = 5; - h2_fifo_term(workers->mplxs); - h2_fifo_interrupt(workers->mplxs); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s, + "h2_workers: cleanup %d workers (%d idle)", + workers->active_slots, workers->idle_slots); + apr_thread_mutex_lock(workers->lock); + workers->shutdown = 1; + workers->aborted = 1; + wake_all_idles(workers); + apr_thread_mutex_unlock(workers->lock); + + /* wait for all the workers to become zombies and join them. + * this gets called after the mpm shuts down and all connections + * have either been handled (graceful) or we are forced exiting + * (ungrateful). Either way, we show limited patience. */ + end = apr_time_now() + apr_time_from_sec(wait_sec); + while (apr_time_now() < end) { + apr_thread_mutex_lock(workers->lock); + if (!(n = workers->active_slots)) { + apr_thread_mutex_unlock(workers->lock); + break; + } + wake_all_idles(workers); + rv = apr_thread_cond_timedwait(workers->all_done, workers->lock, timeout); + apr_thread_mutex_unlock(workers->lock); - cleanup_zombies(workers); + if (APR_TIMEUP == rv) { + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s, + APLOGNO(10290) "h2_workers: waiting for workers to close, " + "still seeing %d workers (%d idle) living", + workers->active_slots, workers->idle_slots); + } + } + if (n) { + ap_log_error(APLOG_MARK, APLOG_WARNING, 0, workers->s, + APLOGNO(10291) "h2_workers: cleanup, %d workers (%d idle) " + "did not exit after %d seconds.", + n, workers->idle_slots, wait_sec); } + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s, + "h2_workers: cleanup all workers terminated"); + apr_thread_mutex_lock(workers->lock); + join_zombies(workers); + apr_thread_mutex_unlock(workers->lock); + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s, + "h2_workers: cleanup zombie workers joined"); + return APR_SUCCESS; } -h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool, - int min_workers, int max_workers, - int idle_secs) +h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pchild, + int max_slots, int min_active, + apr_time_t idle_limit) { - apr_status_t status; + apr_status_t rv; h2_workers *workers; apr_pool_t *pool; - int i, n; + apr_allocator_t *allocator; + int locked = 0; + apr_uint32_t i; ap_assert(s); - ap_assert(server_pool); + ap_assert(pchild); + ap_assert(idle_limit > 0); /* let's have our own pool that will be parent to all h2_worker * instances we create. This happens in various threads, but always * guarded by our lock. Without this pool, all subpool creations would * happen on the pool handed to us, which we do not guard. */ - apr_pool_create(&pool, server_pool); + rv = apr_allocator_create(&allocator); + if (rv != APR_SUCCESS) { + goto cleanup; + } + rv = apr_pool_create_ex(&pool, pchild, NULL, allocator); + if (rv != APR_SUCCESS) { + apr_allocator_destroy(allocator); + goto cleanup; + } + apr_allocator_owner_set(allocator, pool); apr_pool_tag(pool, "h2_workers"); workers = apr_pcalloc(pool, sizeof(h2_workers)); if (!workers) { @@ -302,31 +457,27 @@ h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool, workers->s = s; workers->pool = pool; - workers->min_workers = min_workers; - workers->max_workers = max_workers; - workers->max_idle_secs = (idle_secs > 0)? idle_secs : 10; - - /* FIXME: the fifo set we use here has limited capacity. Once the - * set is full, connections with new requests do a wait. Unfortunately, - * we have optimizations in place there that makes such waiting "unfair" - * in the sense that it may take connections a looong time to get scheduled. - * - * Need to rewrite this to use one of our double-linked lists and a mutex - * to have unlimited capacity and fair scheduling. - * - * For now, we just make enough room to have many connections inside one - * process. - */ - status = h2_fifo_set_create(&workers->mplxs, pool, 8 * 1024); - if (status != APR_SUCCESS) { - return NULL; - } - - status = apr_threadattr_create(&workers->thread_attr, workers->pool); - if (status != APR_SUCCESS) { - return NULL; - } - + workers->min_active = min_active; + workers->max_slots = max_slots; + workers->idle_limit = idle_limit; + workers->dynamic = (workers->min_active < workers->max_slots); + + ap_log_error(APLOG_MARK, APLOG_INFO, 0, s, + "h2_workers: created with min=%d max=%d idle_ms=%d", + workers->min_active, workers->max_slots, + (int)apr_time_as_msec(idle_limit)); + + APR_RING_INIT(&workers->idle, h2_slot, link); + APR_RING_INIT(&workers->busy, h2_slot, link); + APR_RING_INIT(&workers->free, h2_slot, link); + APR_RING_INIT(&workers->zombie, h2_slot, link); + + APR_RING_INIT(&workers->prod_active, ap_conn_producer_t, link); + APR_RING_INIT(&workers->prod_idle, ap_conn_producer_t, link); + + rv = apr_threadattr_create(&workers->thread_attr, workers->pool); + if (rv != APR_SUCCESS) goto cleanup; + if (ap_thread_stacksize != 0) { apr_threadattr_stacksize_set(workers->thread_attr, ap_thread_stacksize); @@ -335,49 +486,141 @@ h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool, (long)ap_thread_stacksize); } - status = apr_thread_mutex_create(&workers->lock, - APR_THREAD_MUTEX_DEFAULT, - workers->pool); - if (status == APR_SUCCESS) { - n = workers->nslots = workers->max_workers; - workers->slots = apr_pcalloc(workers->pool, n * sizeof(h2_slot)); - if (workers->slots == NULL) { - workers->nslots = 0; - status = APR_ENOMEM; - } - for (i = 0; i < n; ++i) { - workers->slots[i].id = i; - } + rv = apr_thread_mutex_create(&workers->lock, + APR_THREAD_MUTEX_DEFAULT, + workers->pool); + if (rv != APR_SUCCESS) goto cleanup; + rv = apr_thread_cond_create(&workers->all_done, workers->pool); + if (rv != APR_SUCCESS) goto cleanup; + rv = apr_thread_cond_create(&workers->prod_done, workers->pool); + if (rv != APR_SUCCESS) goto cleanup; + + apr_thread_mutex_lock(workers->lock); + locked = 1; + + /* create the slots and put them on the free list */ + workers->slots = apr_pcalloc(workers->pool, workers->max_slots * sizeof(h2_slot)); + + for (i = 0; i < workers->max_slots; ++i) { + workers->slots[i].id = i; + workers->slots[i].state = H2_SLOT_FREE; + workers->slots[i].workers = workers; + APR_RING_ELEM_INIT(&workers->slots[i], link); + APR_RING_INSERT_TAIL(&workers->free, &workers->slots[i], h2_slot, link); + rv = apr_thread_cond_create(&workers->slots[i].more_work, workers->pool); + if (rv != APR_SUCCESS) goto cleanup; } - if (status == APR_SUCCESS) { - /* we activate all for now, TODO: support min_workers again. - * do this in reverse for vanity reasons so slot 0 will most - * likely be at head of idle queue. */ - n = workers->max_workers; - for (i = n-1; i >= 0; --i) { - status = activate_slot(workers, &workers->slots[i]); - } - /* the rest of the slots go on the free list */ - for(i = n; i < workers->nslots; ++i) { - push_slot(&workers->free, &workers->slots[i]); - } - workers->dynamic = (workers->worker_count < workers->max_workers); + + /* activate the min amount of workers */ + for (i = 0; i < workers->min_active; ++i) { + rv = activate_slot(workers); + if (rv != APR_SUCCESS) goto cleanup; } - if (status == APR_SUCCESS) { - apr_pool_pre_cleanup_register(pool, workers, workers_pool_cleanup); + +cleanup: + if (locked) { + apr_thread_mutex_unlock(workers->lock); + } + if (rv == APR_SUCCESS) { + /* Stop/join the workers threads when the MPM child exits (pchild is + * destroyed), and as a pre_cleanup of pchild thus before the threads + * pools (children of workers->pool) so that they are not destroyed + * before/under us. + */ + apr_pool_pre_cleanup_register(pchild, workers, workers_pool_cleanup); return workers; } + ap_log_error(APLOG_MARK, APLOG_DEBUG, rv, s, + "h2_workers: errors initializing"); return NULL; } -apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m) +apr_uint32_t h2_workers_get_max_workers(h2_workers *workers) { - apr_status_t status = h2_fifo_push(workers->mplxs, m); - wake_idle_worker(workers); - return status; + return workers->max_slots; } -apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m) +void h2_workers_shutdown(h2_workers *workers, int graceful) { - return h2_fifo_remove(workers->mplxs, m); + ap_conn_producer_t *prod; + + apr_thread_mutex_lock(workers->lock); + ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s, + "h2_workers: shutdown graceful=%d", graceful); + workers->shutdown = 1; + workers->idle_limit = apr_time_from_sec(1); + wake_all_idles(workers); + for (prod = APR_RING_FIRST(&workers->prod_idle); + prod != APR_RING_SENTINEL(&workers->prod_idle, ap_conn_producer_t, link); + prod = APR_RING_NEXT(prod, link)) { + if (prod->fn_shutdown) { + prod->fn_shutdown(prod->baton, graceful); + } + } + apr_thread_mutex_unlock(workers->lock); +} + +ap_conn_producer_t *h2_workers_register(h2_workers *workers, + apr_pool_t *producer_pool, + const char *name, + ap_conn_producer_next *fn_next, + ap_conn_producer_done *fn_done, + ap_conn_producer_shutdown *fn_shutdown, + void *baton) +{ + ap_conn_producer_t *prod; + + prod = apr_pcalloc(producer_pool, sizeof(*prod)); + APR_RING_ELEM_INIT(prod, link); + prod->name = name; + prod->fn_next = fn_next; + prod->fn_done = fn_done; + prod->fn_shutdown = fn_shutdown; + prod->baton = baton; + + apr_thread_mutex_lock(workers->lock); + prod->state = PROD_IDLE; + APR_RING_INSERT_TAIL(&workers->prod_idle, prod, ap_conn_producer_t, link); + apr_thread_mutex_unlock(workers->lock); + + return prod; +} + +apr_status_t h2_workers_join(h2_workers *workers, ap_conn_producer_t *prod) +{ + apr_status_t rv = APR_SUCCESS; + + apr_thread_mutex_lock(workers->lock); + if (PROD_JOINED == prod->state) { + AP_DEBUG_ASSERT(APR_RING_NEXT(prod, link) == prod); /* should be in no ring */ + rv = APR_EINVAL; + } + else { + AP_DEBUG_ASSERT(PROD_ACTIVE == prod->state || PROD_IDLE == prod->state); + APR_RING_REMOVE(prod, link); + prod->state = PROD_JOINED; /* prevent further activations */ + while (prod->conns_active > 0) { + apr_thread_cond_wait(workers->prod_done, workers->lock); + } + APR_RING_ELEM_INIT(prod, link); /* make it link to itself */ + } + apr_thread_mutex_unlock(workers->lock); + return rv; +} + +apr_status_t h2_workers_activate(h2_workers *workers, ap_conn_producer_t *prod) +{ + apr_status_t rv = APR_SUCCESS; + apr_thread_mutex_lock(workers->lock); + if (PROD_IDLE == prod->state) { + APR_RING_REMOVE(prod, link); + prod->state = PROD_ACTIVE; + APR_RING_INSERT_TAIL(&workers->prod_active, prod, ap_conn_producer_t, link); + wake_idle_worker(workers, prod); + } + else if (PROD_JOINED == prod->state) { + rv = APR_EINVAL; + } + apr_thread_mutex_unlock(workers->lock); + return rv; } |