summaryrefslogtreecommitdiffstats
path: root/modules/http2/h2_workers.c
diff options
context:
space:
mode:
Diffstat (limited to 'modules/http2/h2_workers.c')
-rw-r--r--modules/http2/h2_workers.c755
1 files changed, 499 insertions, 256 deletions
diff --git a/modules/http2/h2_workers.c b/modules/http2/h2_workers.c
index 699f533..e7e2039 100644
--- a/modules/http2/h2_workers.c
+++ b/modules/http2/h2_workers.c
@@ -15,285 +15,440 @@
*/
#include <assert.h>
-#include <apr_atomic.h>
+#include <apr_ring.h>
#include <apr_thread_mutex.h>
#include <apr_thread_cond.h>
#include <mpm_common.h>
#include <httpd.h>
+#include <http_connection.h>
#include <http_core.h>
#include <http_log.h>
+#include <http_protocol.h>
#include "h2.h"
#include "h2_private.h"
#include "h2_mplx.h"
-#include "h2_task.h"
+#include "h2_c2.h"
#include "h2_workers.h"
#include "h2_util.h"
+typedef enum {
+ PROD_IDLE,
+ PROD_ACTIVE,
+ PROD_JOINED,
+} prod_state_t;
+
+struct ap_conn_producer_t {
+ APR_RING_ENTRY(ap_conn_producer_t) link;
+ const char *name;
+ void *baton;
+ ap_conn_producer_next *fn_next;
+ ap_conn_producer_done *fn_done;
+ ap_conn_producer_shutdown *fn_shutdown;
+ volatile prod_state_t state;
+ volatile int conns_active;
+};
+
+
+typedef enum {
+ H2_SLOT_FREE,
+ H2_SLOT_RUN,
+ H2_SLOT_ZOMBIE,
+} h2_slot_state_t;
+
typedef struct h2_slot h2_slot;
struct h2_slot {
- int id;
- h2_slot *next;
+ APR_RING_ENTRY(h2_slot) link;
+ apr_uint32_t id;
+ apr_pool_t *pool;
+ h2_slot_state_t state;
+ volatile int should_shutdown;
+ volatile int is_idle;
h2_workers *workers;
- int aborted;
- int sticks;
- h2_task *task;
+ ap_conn_producer_t *prod;
apr_thread_t *thread;
- apr_thread_mutex_t *lock;
- apr_thread_cond_t *not_idle;
+ struct apr_thread_cond_t *more_work;
+ int activations;
};
-static h2_slot *pop_slot(h2_slot **phead)
-{
- /* Atomically pop a slot from the list */
- for (;;) {
- h2_slot *first = *phead;
- if (first == NULL) {
- return NULL;
- }
- if (apr_atomic_casptr((void*)phead, first->next, first) == first) {
- first->next = NULL;
- return first;
- }
- }
-}
+struct h2_workers {
+ server_rec *s;
+ apr_pool_t *pool;
+
+ apr_uint32_t max_slots;
+ apr_uint32_t min_active;
+ volatile apr_time_t idle_limit;
+ volatile int aborted;
+ volatile int shutdown;
+ int dynamic;
+
+ volatile apr_uint32_t active_slots;
+ volatile apr_uint32_t idle_slots;
+
+ apr_threadattr_t *thread_attr;
+ h2_slot *slots;
+
+ APR_RING_HEAD(h2_slots_free, h2_slot) free;
+ APR_RING_HEAD(h2_slots_idle, h2_slot) idle;
+ APR_RING_HEAD(h2_slots_busy, h2_slot) busy;
+ APR_RING_HEAD(h2_slots_zombie, h2_slot) zombie;
+
+ APR_RING_HEAD(ap_conn_producer_active, ap_conn_producer_t) prod_active;
+ APR_RING_HEAD(ap_conn_producer_idle, ap_conn_producer_t) prod_idle;
+
+ struct apr_thread_mutex_t *lock;
+ struct apr_thread_cond_t *prod_done;
+ struct apr_thread_cond_t *all_done;
+};
-static void push_slot(h2_slot **phead, h2_slot *slot)
-{
- /* Atomically push a slot to the list */
- ap_assert(!slot->next);
- for (;;) {
- h2_slot *next = slot->next = *phead;
- if (apr_atomic_casptr((void*)phead, slot, next) == next) {
- return;
- }
- }
-}
static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx);
-static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot)
+static apr_status_t activate_slot(h2_workers *workers)
{
- apr_status_t status;
-
- slot->workers = workers;
- slot->aborted = 0;
- slot->task = NULL;
-
- if (!slot->lock) {
- status = apr_thread_mutex_create(&slot->lock,
- APR_THREAD_MUTEX_DEFAULT,
- workers->pool);
- if (status != APR_SUCCESS) {
- push_slot(&workers->free, slot);
- return status;
- }
- }
+ h2_slot *slot;
+ apr_pool_t *pool;
+ apr_status_t rv;
- if (!slot->not_idle) {
- status = apr_thread_cond_create(&slot->not_idle, workers->pool);
- if (status != APR_SUCCESS) {
- push_slot(&workers->free, slot);
- return status;
- }
+ if (APR_RING_EMPTY(&workers->free, h2_slot, link)) {
+ return APR_EAGAIN;
}
-
- ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s,
- "h2_workers: new thread for slot %d", slot->id);
- /* thread will either immediately start work or add itself
- * to the idle queue */
- apr_thread_create(&slot->thread, workers->thread_attr, slot_run, slot,
- workers->pool);
- if (!slot->thread) {
- push_slot(&workers->free, slot);
- return APR_ENOMEM;
- }
-
- apr_atomic_inc32(&workers->worker_count);
- return APR_SUCCESS;
-}
+ slot = APR_RING_FIRST(&workers->free);
+ ap_assert(slot->state == H2_SLOT_FREE);
+ APR_RING_REMOVE(slot, link);
-static apr_status_t add_worker(h2_workers *workers)
-{
- h2_slot *slot = pop_slot(&workers->free);
- if (slot) {
- return activate_slot(workers, slot);
- }
- return APR_EAGAIN;
-}
+ ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
+ "h2_workers: activate slot %d", slot->id);
-static void wake_idle_worker(h2_workers *workers)
-{
- h2_slot *slot = pop_slot(&workers->idle);
- if (slot) {
- apr_thread_mutex_lock(slot->lock);
- apr_thread_cond_signal(slot->not_idle);
- apr_thread_mutex_unlock(slot->lock);
- }
- else if (workers->dynamic) {
- add_worker(workers);
+ slot->state = H2_SLOT_RUN;
+ slot->should_shutdown = 0;
+ slot->is_idle = 0;
+ slot->pool = NULL;
+ ++workers->active_slots;
+ rv = apr_pool_create(&pool, workers->pool);
+ if (APR_SUCCESS != rv) goto cleanup;
+ apr_pool_tag(pool, "h2_worker_slot");
+ slot->pool = pool;
+
+ rv = ap_thread_create(&slot->thread, workers->thread_attr,
+ slot_run, slot, slot->pool);
+
+cleanup:
+ if (rv != APR_SUCCESS) {
+ AP_DEBUG_ASSERT(0);
+ slot->state = H2_SLOT_FREE;
+ if (slot->pool) {
+ apr_pool_destroy(slot->pool);
+ slot->pool = NULL;
+ }
+ APR_RING_INSERT_TAIL(&workers->free, slot, h2_slot, link);
+ --workers->active_slots;
}
+ return rv;
}
-static void cleanup_zombies(h2_workers *workers)
+static void join_zombies(h2_workers *workers)
{
h2_slot *slot;
- while ((slot = pop_slot(&workers->zombies))) {
- if (slot->thread) {
- apr_status_t status;
- apr_thread_join(&status, slot->thread);
- slot->thread = NULL;
+ apr_status_t status;
+
+ while (!APR_RING_EMPTY(&workers->zombie, h2_slot, link)) {
+ slot = APR_RING_FIRST(&workers->zombie);
+ APR_RING_REMOVE(slot, link);
+ ap_assert(slot->state == H2_SLOT_ZOMBIE);
+ ap_assert(slot->thread != NULL);
+
+ apr_thread_mutex_unlock(workers->lock);
+ apr_thread_join(&status, slot->thread);
+ apr_thread_mutex_lock(workers->lock);
+
+ slot->thread = NULL;
+ slot->state = H2_SLOT_FREE;
+ if (slot->pool) {
+ apr_pool_destroy(slot->pool);
+ slot->pool = NULL;
}
- apr_atomic_dec32(&workers->worker_count);
- slot->next = NULL;
- push_slot(&workers->free, slot);
+ APR_RING_INSERT_TAIL(&workers->free, slot, h2_slot, link);
}
}
-static apr_status_t slot_pull_task(h2_slot *slot, h2_mplx *m)
+static void wake_idle_worker(h2_workers *workers, ap_conn_producer_t *prod)
{
- apr_status_t rv;
-
- rv = h2_mplx_pop_task(m, &slot->task);
- if (slot->task) {
- /* Ok, we got something to give back to the worker for execution.
- * If we still have idle workers, we let the worker be sticky,
- * e.g. making it poll the task's h2_mplx instance for more work
- * before asking back here. */
- slot->sticks = slot->workers->max_workers;
- return rv;
+ if (!APR_RING_EMPTY(&workers->idle, h2_slot, link)) {
+ h2_slot *slot;
+ for (slot = APR_RING_FIRST(&workers->idle);
+ slot != APR_RING_SENTINEL(&workers->idle, h2_slot, link);
+ slot = APR_RING_NEXT(slot, link)) {
+ if (slot->is_idle && !slot->should_shutdown) {
+ apr_thread_cond_signal(slot->more_work);
+ slot->is_idle = 0;
+ return;
+ }
+ }
+ }
+ if (workers->dynamic && !workers->shutdown
+ && (workers->active_slots < workers->max_slots)) {
+ activate_slot(workers);
}
- slot->sticks = 0;
- return APR_EOF;
-}
-
-static h2_fifo_op_t mplx_peek(void *head, void *ctx)
-{
- h2_mplx *m = head;
- h2_slot *slot = ctx;
-
- if (slot_pull_task(slot, m) == APR_EAGAIN) {
- wake_idle_worker(slot->workers);
- return H2_FIFO_OP_REPUSH;
- }
- return H2_FIFO_OP_PULL;
}
/**
- * Get the next task for the given worker. Will block until a task arrives
- * or the max_wait timer expires and more than min workers exist.
+ * Get the next connection to work on.
*/
-static apr_status_t get_next(h2_slot *slot)
+static conn_rec *get_next(h2_slot *slot)
{
h2_workers *workers = slot->workers;
- apr_status_t status;
-
- slot->task = NULL;
- while (!slot->aborted) {
- if (!slot->task) {
- status = h2_fifo_try_peek(workers->mplxs, mplx_peek, slot);
- if (status == APR_EOF) {
- return status;
- }
+ conn_rec *c = NULL;
+ ap_conn_producer_t *prod;
+ int has_more;
+
+ slot->prod = NULL;
+ if (!APR_RING_EMPTY(&workers->prod_active, ap_conn_producer_t, link)) {
+ slot->prod = prod = APR_RING_FIRST(&workers->prod_active);
+ APR_RING_REMOVE(prod, link);
+ AP_DEBUG_ASSERT(PROD_ACTIVE == prod->state);
+
+ c = prod->fn_next(prod->baton, &has_more);
+ if (c && has_more) {
+ APR_RING_INSERT_TAIL(&workers->prod_active, prod, ap_conn_producer_t, link);
+ wake_idle_worker(workers, slot->prod);
}
-
- if (slot->task) {
- return APR_SUCCESS;
+ else {
+ prod->state = PROD_IDLE;
+ APR_RING_INSERT_TAIL(&workers->prod_idle, prod, ap_conn_producer_t, link);
+ }
+ if (c) {
+ ++prod->conns_active;
}
-
- cleanup_zombies(workers);
-
- apr_thread_mutex_lock(slot->lock);
- push_slot(&workers->idle, slot);
- apr_thread_cond_wait(slot->not_idle, slot->lock);
- apr_thread_mutex_unlock(slot->lock);
}
- return APR_EOF;
-}
-static void slot_done(h2_slot *slot)
-{
- push_slot(&(slot->workers->zombies), slot);
+ return c;
}
-
static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx)
{
h2_slot *slot = wctx;
-
- while (!slot->aborted) {
-
- /* Get a h2_task from the mplxs queue. */
- get_next(slot);
- while (slot->task) {
-
- h2_task_do(slot->task, thread, slot->id);
-
- /* Report the task as done. If stickyness is left, offer the
- * mplx the opportunity to give us back a new task right away.
- */
- if (!slot->aborted && (--slot->sticks > 0)) {
- h2_mplx_task_done(slot->task->mplx, slot->task, &slot->task);
- }
- else {
- h2_mplx_task_done(slot->task->mplx, slot->task, NULL);
- slot->task = NULL;
+ h2_workers *workers = slot->workers;
+ conn_rec *c;
+ apr_status_t rv;
+
+ apr_thread_mutex_lock(workers->lock);
+ slot->state = H2_SLOT_RUN;
+ ++slot->activations;
+ APR_RING_ELEM_INIT(slot, link);
+ for(;;) {
+ if (APR_RING_NEXT(slot, link) != slot) {
+ /* slot is part of the idle ring from the last loop */
+ APR_RING_REMOVE(slot, link);
+ --workers->idle_slots;
+ }
+ slot->is_idle = 0;
+
+ if (!workers->aborted && !slot->should_shutdown) {
+ APR_RING_INSERT_TAIL(&workers->busy, slot, h2_slot, link);
+ do {
+ c = get_next(slot);
+ if (!c) {
+ break;
+ }
+ apr_thread_mutex_unlock(workers->lock);
+ /* See the discussion at <https://github.com/icing/mod_h2/issues/195>
+ *
+ * Each conn_rec->id is supposed to be unique at a point in time. Since
+ * some modules (and maybe external code) uses this id as an identifier
+ * for the request_rec they handle, it needs to be unique for secondary
+ * connections also.
+ *
+ * The MPM module assigns the connection ids and mod_unique_id is using
+ * that one to generate identifier for requests. While the implementation
+ * works for HTTP/1.x, the parallel execution of several requests per
+ * connection will generate duplicate identifiers on load.
+ *
+ * The original implementation for secondary connection identifiers used
+ * to shift the master connection id up and assign the stream id to the
+ * lower bits. This was cramped on 32 bit systems, but on 64bit there was
+ * enough space.
+ *
+ * As issue 195 showed, mod_unique_id only uses the lower 32 bit of the
+ * connection id, even on 64bit systems. Therefore collisions in request ids.
+ *
+ * The way master connection ids are generated, there is some space "at the
+ * top" of the lower 32 bits on allmost all systems. If you have a setup
+ * with 64k threads per child and 255 child processes, you live on the edge.
+ *
+ * The new implementation shifts 8 bits and XORs in the worker
+ * id. This will experience collisions with > 256 h2 workers and heavy
+ * load still. There seems to be no way to solve this in all possible
+ * configurations by mod_h2 alone.
+ */
+ if (c->master) {
+ c->id = (c->master->id << 8)^slot->id;
+ }
+ c->current_thread = thread;
+ AP_DEBUG_ASSERT(slot->prod);
+
+#if AP_HAS_RESPONSE_BUCKETS
+ ap_process_connection(c, ap_get_conn_socket(c));
+#else
+ h2_c2_process(c, thread, slot->id);
+#endif
+ slot->prod->fn_done(slot->prod->baton, c);
+
+ apr_thread_mutex_lock(workers->lock);
+ if (--slot->prod->conns_active <= 0) {
+ apr_thread_cond_broadcast(workers->prod_done);
+ }
+ if (slot->prod->state == PROD_IDLE) {
+ APR_RING_REMOVE(slot->prod, link);
+ slot->prod->state = PROD_ACTIVE;
+ APR_RING_INSERT_TAIL(&workers->prod_active, slot->prod, ap_conn_producer_t, link);
+ }
+
+ } while (!workers->aborted && !slot->should_shutdown);
+ APR_RING_REMOVE(slot, link); /* no longer busy */
+ }
+
+ if (workers->aborted || slot->should_shutdown) {
+ break;
+ }
+
+ join_zombies(workers);
+
+ /* we are idle */
+ APR_RING_INSERT_TAIL(&workers->idle, slot, h2_slot, link);
+ ++workers->idle_slots;
+ slot->is_idle = 1;
+ if (slot->id >= workers->min_active && workers->idle_limit > 0) {
+ rv = apr_thread_cond_timedwait(slot->more_work, workers->lock,
+ workers->idle_limit);
+ if (APR_TIMEUP == rv) {
+ APR_RING_REMOVE(slot, link);
+ --workers->idle_slots;
+ ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s,
+ "h2_workers: idle timeout slot %d in state %d (%d activations)",
+ slot->id, slot->state, slot->activations);
+ break;
}
}
+ else {
+ apr_thread_cond_wait(slot->more_work, workers->lock);
+ }
+ }
+
+ ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
+ "h2_workers: terminate slot %d in state %d (%d activations)",
+ slot->id, slot->state, slot->activations);
+ slot->is_idle = 0;
+ slot->state = H2_SLOT_ZOMBIE;
+ slot->should_shutdown = 0;
+ APR_RING_INSERT_TAIL(&workers->zombie, slot, h2_slot, link);
+ --workers->active_slots;
+ if (workers->active_slots <= 0) {
+ apr_thread_cond_broadcast(workers->all_done);
}
+ apr_thread_mutex_unlock(workers->lock);
- slot_done(slot);
+ apr_thread_exit(thread, APR_SUCCESS);
return NULL;
}
+static void wake_all_idles(h2_workers *workers)
+{
+ h2_slot *slot;
+ for (slot = APR_RING_FIRST(&workers->idle);
+ slot != APR_RING_SENTINEL(&workers->idle, h2_slot, link);
+ slot = APR_RING_NEXT(slot, link))
+ {
+ apr_thread_cond_signal(slot->more_work);
+ }
+}
+
static apr_status_t workers_pool_cleanup(void *data)
{
h2_workers *workers = data;
- h2_slot *slot;
-
- if (!workers->aborted) {
- workers->aborted = 1;
- /* abort all idle slots */
- for (;;) {
- slot = pop_slot(&workers->idle);
- if (slot) {
- apr_thread_mutex_lock(slot->lock);
- slot->aborted = 1;
- apr_thread_cond_signal(slot->not_idle);
- apr_thread_mutex_unlock(slot->lock);
- }
- else {
- break;
- }
- }
+ apr_time_t end, timeout = apr_time_from_sec(1);
+ apr_status_t rv;
+ int n = 0, wait_sec = 5;
- h2_fifo_term(workers->mplxs);
- h2_fifo_interrupt(workers->mplxs);
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
+ "h2_workers: cleanup %d workers (%d idle)",
+ workers->active_slots, workers->idle_slots);
+ apr_thread_mutex_lock(workers->lock);
+ workers->shutdown = 1;
+ workers->aborted = 1;
+ wake_all_idles(workers);
+ apr_thread_mutex_unlock(workers->lock);
+
+ /* wait for all the workers to become zombies and join them.
+ * this gets called after the mpm shuts down and all connections
+ * have either been handled (graceful) or we are forced exiting
+ * (ungrateful). Either way, we show limited patience. */
+ end = apr_time_now() + apr_time_from_sec(wait_sec);
+ while (apr_time_now() < end) {
+ apr_thread_mutex_lock(workers->lock);
+ if (!(n = workers->active_slots)) {
+ apr_thread_mutex_unlock(workers->lock);
+ break;
+ }
+ wake_all_idles(workers);
+ rv = apr_thread_cond_timedwait(workers->all_done, workers->lock, timeout);
+ apr_thread_mutex_unlock(workers->lock);
- cleanup_zombies(workers);
+ if (APR_TIMEUP == rv) {
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
+ APLOGNO(10290) "h2_workers: waiting for workers to close, "
+ "still seeing %d workers (%d idle) living",
+ workers->active_slots, workers->idle_slots);
+ }
+ }
+ if (n) {
+ ap_log_error(APLOG_MARK, APLOG_WARNING, 0, workers->s,
+ APLOGNO(10291) "h2_workers: cleanup, %d workers (%d idle) "
+ "did not exit after %d seconds.",
+ n, workers->idle_slots, wait_sec);
}
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
+ "h2_workers: cleanup all workers terminated");
+ apr_thread_mutex_lock(workers->lock);
+ join_zombies(workers);
+ apr_thread_mutex_unlock(workers->lock);
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
+ "h2_workers: cleanup zombie workers joined");
+
return APR_SUCCESS;
}
-h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool,
- int min_workers, int max_workers,
- int idle_secs)
+h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pchild,
+ int max_slots, int min_active,
+ apr_time_t idle_limit)
{
- apr_status_t status;
+ apr_status_t rv;
h2_workers *workers;
apr_pool_t *pool;
- int i, n;
+ apr_allocator_t *allocator;
+ int locked = 0;
+ apr_uint32_t i;
ap_assert(s);
- ap_assert(server_pool);
+ ap_assert(pchild);
+ ap_assert(idle_limit > 0);
/* let's have our own pool that will be parent to all h2_worker
* instances we create. This happens in various threads, but always
* guarded by our lock. Without this pool, all subpool creations would
* happen on the pool handed to us, which we do not guard.
*/
- apr_pool_create(&pool, server_pool);
+ rv = apr_allocator_create(&allocator);
+ if (rv != APR_SUCCESS) {
+ goto cleanup;
+ }
+ rv = apr_pool_create_ex(&pool, pchild, NULL, allocator);
+ if (rv != APR_SUCCESS) {
+ apr_allocator_destroy(allocator);
+ goto cleanup;
+ }
+ apr_allocator_owner_set(allocator, pool);
apr_pool_tag(pool, "h2_workers");
workers = apr_pcalloc(pool, sizeof(h2_workers));
if (!workers) {
@@ -302,31 +457,27 @@ h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool,
workers->s = s;
workers->pool = pool;
- workers->min_workers = min_workers;
- workers->max_workers = max_workers;
- workers->max_idle_secs = (idle_secs > 0)? idle_secs : 10;
-
- /* FIXME: the fifo set we use here has limited capacity. Once the
- * set is full, connections with new requests do a wait. Unfortunately,
- * we have optimizations in place there that makes such waiting "unfair"
- * in the sense that it may take connections a looong time to get scheduled.
- *
- * Need to rewrite this to use one of our double-linked lists and a mutex
- * to have unlimited capacity and fair scheduling.
- *
- * For now, we just make enough room to have many connections inside one
- * process.
- */
- status = h2_fifo_set_create(&workers->mplxs, pool, 8 * 1024);
- if (status != APR_SUCCESS) {
- return NULL;
- }
-
- status = apr_threadattr_create(&workers->thread_attr, workers->pool);
- if (status != APR_SUCCESS) {
- return NULL;
- }
-
+ workers->min_active = min_active;
+ workers->max_slots = max_slots;
+ workers->idle_limit = idle_limit;
+ workers->dynamic = (workers->min_active < workers->max_slots);
+
+ ap_log_error(APLOG_MARK, APLOG_INFO, 0, s,
+ "h2_workers: created with min=%d max=%d idle_ms=%d",
+ workers->min_active, workers->max_slots,
+ (int)apr_time_as_msec(idle_limit));
+
+ APR_RING_INIT(&workers->idle, h2_slot, link);
+ APR_RING_INIT(&workers->busy, h2_slot, link);
+ APR_RING_INIT(&workers->free, h2_slot, link);
+ APR_RING_INIT(&workers->zombie, h2_slot, link);
+
+ APR_RING_INIT(&workers->prod_active, ap_conn_producer_t, link);
+ APR_RING_INIT(&workers->prod_idle, ap_conn_producer_t, link);
+
+ rv = apr_threadattr_create(&workers->thread_attr, workers->pool);
+ if (rv != APR_SUCCESS) goto cleanup;
+
if (ap_thread_stacksize != 0) {
apr_threadattr_stacksize_set(workers->thread_attr,
ap_thread_stacksize);
@@ -335,49 +486,141 @@ h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool,
(long)ap_thread_stacksize);
}
- status = apr_thread_mutex_create(&workers->lock,
- APR_THREAD_MUTEX_DEFAULT,
- workers->pool);
- if (status == APR_SUCCESS) {
- n = workers->nslots = workers->max_workers;
- workers->slots = apr_pcalloc(workers->pool, n * sizeof(h2_slot));
- if (workers->slots == NULL) {
- workers->nslots = 0;
- status = APR_ENOMEM;
- }
- for (i = 0; i < n; ++i) {
- workers->slots[i].id = i;
- }
+ rv = apr_thread_mutex_create(&workers->lock,
+ APR_THREAD_MUTEX_DEFAULT,
+ workers->pool);
+ if (rv != APR_SUCCESS) goto cleanup;
+ rv = apr_thread_cond_create(&workers->all_done, workers->pool);
+ if (rv != APR_SUCCESS) goto cleanup;
+ rv = apr_thread_cond_create(&workers->prod_done, workers->pool);
+ if (rv != APR_SUCCESS) goto cleanup;
+
+ apr_thread_mutex_lock(workers->lock);
+ locked = 1;
+
+ /* create the slots and put them on the free list */
+ workers->slots = apr_pcalloc(workers->pool, workers->max_slots * sizeof(h2_slot));
+
+ for (i = 0; i < workers->max_slots; ++i) {
+ workers->slots[i].id = i;
+ workers->slots[i].state = H2_SLOT_FREE;
+ workers->slots[i].workers = workers;
+ APR_RING_ELEM_INIT(&workers->slots[i], link);
+ APR_RING_INSERT_TAIL(&workers->free, &workers->slots[i], h2_slot, link);
+ rv = apr_thread_cond_create(&workers->slots[i].more_work, workers->pool);
+ if (rv != APR_SUCCESS) goto cleanup;
}
- if (status == APR_SUCCESS) {
- /* we activate all for now, TODO: support min_workers again.
- * do this in reverse for vanity reasons so slot 0 will most
- * likely be at head of idle queue. */
- n = workers->max_workers;
- for (i = n-1; i >= 0; --i) {
- status = activate_slot(workers, &workers->slots[i]);
- }
- /* the rest of the slots go on the free list */
- for(i = n; i < workers->nslots; ++i) {
- push_slot(&workers->free, &workers->slots[i]);
- }
- workers->dynamic = (workers->worker_count < workers->max_workers);
+
+ /* activate the min amount of workers */
+ for (i = 0; i < workers->min_active; ++i) {
+ rv = activate_slot(workers);
+ if (rv != APR_SUCCESS) goto cleanup;
}
- if (status == APR_SUCCESS) {
- apr_pool_pre_cleanup_register(pool, workers, workers_pool_cleanup);
+
+cleanup:
+ if (locked) {
+ apr_thread_mutex_unlock(workers->lock);
+ }
+ if (rv == APR_SUCCESS) {
+ /* Stop/join the workers threads when the MPM child exits (pchild is
+ * destroyed), and as a pre_cleanup of pchild thus before the threads
+ * pools (children of workers->pool) so that they are not destroyed
+ * before/under us.
+ */
+ apr_pool_pre_cleanup_register(pchild, workers, workers_pool_cleanup);
return workers;
}
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, rv, s,
+ "h2_workers: errors initializing");
return NULL;
}
-apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m)
+apr_uint32_t h2_workers_get_max_workers(h2_workers *workers)
{
- apr_status_t status = h2_fifo_push(workers->mplxs, m);
- wake_idle_worker(workers);
- return status;
+ return workers->max_slots;
}
-apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m)
+void h2_workers_shutdown(h2_workers *workers, int graceful)
{
- return h2_fifo_remove(workers->mplxs, m);
+ ap_conn_producer_t *prod;
+
+ apr_thread_mutex_lock(workers->lock);
+ ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s,
+ "h2_workers: shutdown graceful=%d", graceful);
+ workers->shutdown = 1;
+ workers->idle_limit = apr_time_from_sec(1);
+ wake_all_idles(workers);
+ for (prod = APR_RING_FIRST(&workers->prod_idle);
+ prod != APR_RING_SENTINEL(&workers->prod_idle, ap_conn_producer_t, link);
+ prod = APR_RING_NEXT(prod, link)) {
+ if (prod->fn_shutdown) {
+ prod->fn_shutdown(prod->baton, graceful);
+ }
+ }
+ apr_thread_mutex_unlock(workers->lock);
+}
+
+ap_conn_producer_t *h2_workers_register(h2_workers *workers,
+ apr_pool_t *producer_pool,
+ const char *name,
+ ap_conn_producer_next *fn_next,
+ ap_conn_producer_done *fn_done,
+ ap_conn_producer_shutdown *fn_shutdown,
+ void *baton)
+{
+ ap_conn_producer_t *prod;
+
+ prod = apr_pcalloc(producer_pool, sizeof(*prod));
+ APR_RING_ELEM_INIT(prod, link);
+ prod->name = name;
+ prod->fn_next = fn_next;
+ prod->fn_done = fn_done;
+ prod->fn_shutdown = fn_shutdown;
+ prod->baton = baton;
+
+ apr_thread_mutex_lock(workers->lock);
+ prod->state = PROD_IDLE;
+ APR_RING_INSERT_TAIL(&workers->prod_idle, prod, ap_conn_producer_t, link);
+ apr_thread_mutex_unlock(workers->lock);
+
+ return prod;
+}
+
+apr_status_t h2_workers_join(h2_workers *workers, ap_conn_producer_t *prod)
+{
+ apr_status_t rv = APR_SUCCESS;
+
+ apr_thread_mutex_lock(workers->lock);
+ if (PROD_JOINED == prod->state) {
+ AP_DEBUG_ASSERT(APR_RING_NEXT(prod, link) == prod); /* should be in no ring */
+ rv = APR_EINVAL;
+ }
+ else {
+ AP_DEBUG_ASSERT(PROD_ACTIVE == prod->state || PROD_IDLE == prod->state);
+ APR_RING_REMOVE(prod, link);
+ prod->state = PROD_JOINED; /* prevent further activations */
+ while (prod->conns_active > 0) {
+ apr_thread_cond_wait(workers->prod_done, workers->lock);
+ }
+ APR_RING_ELEM_INIT(prod, link); /* make it link to itself */
+ }
+ apr_thread_mutex_unlock(workers->lock);
+ return rv;
+}
+
+apr_status_t h2_workers_activate(h2_workers *workers, ap_conn_producer_t *prod)
+{
+ apr_status_t rv = APR_SUCCESS;
+ apr_thread_mutex_lock(workers->lock);
+ if (PROD_IDLE == prod->state) {
+ APR_RING_REMOVE(prod, link);
+ prod->state = PROD_ACTIVE;
+ APR_RING_INSERT_TAIL(&workers->prod_active, prod, ap_conn_producer_t, link);
+ wake_idle_worker(workers, prod);
+ }
+ else if (PROD_JOINED == prod->state) {
+ rv = APR_EINVAL;
+ }
+ apr_thread_mutex_unlock(workers->lock);
+ return rv;
}