diff options
Diffstat (limited to '')
-rw-r--r-- | modules/http2/h2_workers.c | 383 |
1 files changed, 383 insertions, 0 deletions
diff --git a/modules/http2/h2_workers.c b/modules/http2/h2_workers.c new file mode 100644 index 0000000..699f533 --- /dev/null +++ b/modules/http2/h2_workers.c @@ -0,0 +1,383 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <assert.h> +#include <apr_atomic.h> +#include <apr_thread_mutex.h> +#include <apr_thread_cond.h> + +#include <mpm_common.h> +#include <httpd.h> +#include <http_core.h> +#include <http_log.h> + +#include "h2.h" +#include "h2_private.h" +#include "h2_mplx.h" +#include "h2_task.h" +#include "h2_workers.h" +#include "h2_util.h" + +typedef struct h2_slot h2_slot; +struct h2_slot { + int id; + h2_slot *next; + h2_workers *workers; + int aborted; + int sticks; + h2_task *task; + apr_thread_t *thread; + apr_thread_mutex_t *lock; + apr_thread_cond_t *not_idle; +}; + +static h2_slot *pop_slot(h2_slot **phead) +{ + /* Atomically pop a slot from the list */ + for (;;) { + h2_slot *first = *phead; + if (first == NULL) { + return NULL; + } + if (apr_atomic_casptr((void*)phead, first->next, first) == first) { + first->next = NULL; + return first; + } + } +} + +static void push_slot(h2_slot **phead, h2_slot *slot) +{ + /* Atomically push a slot to the list */ + ap_assert(!slot->next); + for (;;) { + h2_slot *next = slot->next = *phead; + if (apr_atomic_casptr((void*)phead, slot, next) == next) { + return; + } + } +} + +static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx); + +static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot) +{ + apr_status_t status; + + slot->workers = workers; + slot->aborted = 0; + slot->task = NULL; + + if (!slot->lock) { + status = apr_thread_mutex_create(&slot->lock, + APR_THREAD_MUTEX_DEFAULT, + workers->pool); + if (status != APR_SUCCESS) { + push_slot(&workers->free, slot); + return status; + } + } + + if (!slot->not_idle) { + status = apr_thread_cond_create(&slot->not_idle, workers->pool); + if (status != APR_SUCCESS) { + push_slot(&workers->free, slot); + return status; + } + } + + ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s, + "h2_workers: new thread for slot %d", slot->id); + /* thread will either immediately start work or add itself + * to the idle queue */ + apr_thread_create(&slot->thread, workers->thread_attr, slot_run, slot, + workers->pool); + if (!slot->thread) { + push_slot(&workers->free, slot); + return APR_ENOMEM; + } + + apr_atomic_inc32(&workers->worker_count); + return APR_SUCCESS; +} + +static apr_status_t add_worker(h2_workers *workers) +{ + h2_slot *slot = pop_slot(&workers->free); + if (slot) { + return activate_slot(workers, slot); + } + return APR_EAGAIN; +} + +static void wake_idle_worker(h2_workers *workers) +{ + h2_slot *slot = pop_slot(&workers->idle); + if (slot) { + apr_thread_mutex_lock(slot->lock); + apr_thread_cond_signal(slot->not_idle); + apr_thread_mutex_unlock(slot->lock); + } + else if (workers->dynamic) { + add_worker(workers); + } +} + +static void cleanup_zombies(h2_workers *workers) +{ + h2_slot *slot; + while ((slot = pop_slot(&workers->zombies))) { + if (slot->thread) { + apr_status_t status; + apr_thread_join(&status, slot->thread); + slot->thread = NULL; + } + apr_atomic_dec32(&workers->worker_count); + slot->next = NULL; + push_slot(&workers->free, slot); + } +} + +static apr_status_t slot_pull_task(h2_slot *slot, h2_mplx *m) +{ + apr_status_t rv; + + rv = h2_mplx_pop_task(m, &slot->task); + if (slot->task) { + /* Ok, we got something to give back to the worker for execution. + * If we still have idle workers, we let the worker be sticky, + * e.g. making it poll the task's h2_mplx instance for more work + * before asking back here. */ + slot->sticks = slot->workers->max_workers; + return rv; + } + slot->sticks = 0; + return APR_EOF; +} + +static h2_fifo_op_t mplx_peek(void *head, void *ctx) +{ + h2_mplx *m = head; + h2_slot *slot = ctx; + + if (slot_pull_task(slot, m) == APR_EAGAIN) { + wake_idle_worker(slot->workers); + return H2_FIFO_OP_REPUSH; + } + return H2_FIFO_OP_PULL; +} + +/** + * Get the next task for the given worker. Will block until a task arrives + * or the max_wait timer expires and more than min workers exist. + */ +static apr_status_t get_next(h2_slot *slot) +{ + h2_workers *workers = slot->workers; + apr_status_t status; + + slot->task = NULL; + while (!slot->aborted) { + if (!slot->task) { + status = h2_fifo_try_peek(workers->mplxs, mplx_peek, slot); + if (status == APR_EOF) { + return status; + } + } + + if (slot->task) { + return APR_SUCCESS; + } + + cleanup_zombies(workers); + + apr_thread_mutex_lock(slot->lock); + push_slot(&workers->idle, slot); + apr_thread_cond_wait(slot->not_idle, slot->lock); + apr_thread_mutex_unlock(slot->lock); + } + return APR_EOF; +} + +static void slot_done(h2_slot *slot) +{ + push_slot(&(slot->workers->zombies), slot); +} + + +static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx) +{ + h2_slot *slot = wctx; + + while (!slot->aborted) { + + /* Get a h2_task from the mplxs queue. */ + get_next(slot); + while (slot->task) { + + h2_task_do(slot->task, thread, slot->id); + + /* Report the task as done. If stickyness is left, offer the + * mplx the opportunity to give us back a new task right away. + */ + if (!slot->aborted && (--slot->sticks > 0)) { + h2_mplx_task_done(slot->task->mplx, slot->task, &slot->task); + } + else { + h2_mplx_task_done(slot->task->mplx, slot->task, NULL); + slot->task = NULL; + } + } + } + + slot_done(slot); + return NULL; +} + +static apr_status_t workers_pool_cleanup(void *data) +{ + h2_workers *workers = data; + h2_slot *slot; + + if (!workers->aborted) { + workers->aborted = 1; + /* abort all idle slots */ + for (;;) { + slot = pop_slot(&workers->idle); + if (slot) { + apr_thread_mutex_lock(slot->lock); + slot->aborted = 1; + apr_thread_cond_signal(slot->not_idle); + apr_thread_mutex_unlock(slot->lock); + } + else { + break; + } + } + + h2_fifo_term(workers->mplxs); + h2_fifo_interrupt(workers->mplxs); + + cleanup_zombies(workers); + } + return APR_SUCCESS; +} + +h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool, + int min_workers, int max_workers, + int idle_secs) +{ + apr_status_t status; + h2_workers *workers; + apr_pool_t *pool; + int i, n; + + ap_assert(s); + ap_assert(server_pool); + + /* let's have our own pool that will be parent to all h2_worker + * instances we create. This happens in various threads, but always + * guarded by our lock. Without this pool, all subpool creations would + * happen on the pool handed to us, which we do not guard. + */ + apr_pool_create(&pool, server_pool); + apr_pool_tag(pool, "h2_workers"); + workers = apr_pcalloc(pool, sizeof(h2_workers)); + if (!workers) { + return NULL; + } + + workers->s = s; + workers->pool = pool; + workers->min_workers = min_workers; + workers->max_workers = max_workers; + workers->max_idle_secs = (idle_secs > 0)? idle_secs : 10; + + /* FIXME: the fifo set we use here has limited capacity. Once the + * set is full, connections with new requests do a wait. Unfortunately, + * we have optimizations in place there that makes such waiting "unfair" + * in the sense that it may take connections a looong time to get scheduled. + * + * Need to rewrite this to use one of our double-linked lists and a mutex + * to have unlimited capacity and fair scheduling. + * + * For now, we just make enough room to have many connections inside one + * process. + */ + status = h2_fifo_set_create(&workers->mplxs, pool, 8 * 1024); + if (status != APR_SUCCESS) { + return NULL; + } + + status = apr_threadattr_create(&workers->thread_attr, workers->pool); + if (status != APR_SUCCESS) { + return NULL; + } + + if (ap_thread_stacksize != 0) { + apr_threadattr_stacksize_set(workers->thread_attr, + ap_thread_stacksize); + ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s, + "h2_workers: using stacksize=%ld", + (long)ap_thread_stacksize); + } + + status = apr_thread_mutex_create(&workers->lock, + APR_THREAD_MUTEX_DEFAULT, + workers->pool); + if (status == APR_SUCCESS) { + n = workers->nslots = workers->max_workers; + workers->slots = apr_pcalloc(workers->pool, n * sizeof(h2_slot)); + if (workers->slots == NULL) { + workers->nslots = 0; + status = APR_ENOMEM; + } + for (i = 0; i < n; ++i) { + workers->slots[i].id = i; + } + } + if (status == APR_SUCCESS) { + /* we activate all for now, TODO: support min_workers again. + * do this in reverse for vanity reasons so slot 0 will most + * likely be at head of idle queue. */ + n = workers->max_workers; + for (i = n-1; i >= 0; --i) { + status = activate_slot(workers, &workers->slots[i]); + } + /* the rest of the slots go on the free list */ + for(i = n; i < workers->nslots; ++i) { + push_slot(&workers->free, &workers->slots[i]); + } + workers->dynamic = (workers->worker_count < workers->max_workers); + } + if (status == APR_SUCCESS) { + apr_pool_pre_cleanup_register(pool, workers, workers_pool_cleanup); + return workers; + } + return NULL; +} + +apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m) +{ + apr_status_t status = h2_fifo_push(workers->mplxs, m); + wake_idle_worker(workers); + return status; +} + +apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m) +{ + return h2_fifo_remove(workers->mplxs, m); +} |