/* Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <assert.h>

#include <apr_ring.h>
#include <apr_thread_mutex.h>
#include <apr_thread_cond.h>

#include <mpm_common.h>
#include <httpd.h>
#include <http_connection.h>
#include <http_core.h>
#include <http_log.h>
#include <http_protocol.h>

#include "h2.h"
#include "h2_private.h"
#include "h2_mplx.h"
#include "h2_c2.h"
#include "h2_workers.h"
#include "h2_util.h"

typedef enum {
    PROD_IDLE,
    PROD_ACTIVE,
    PROD_JOINED,
} prod_state_t;

struct ap_conn_producer_t {
    APR_RING_ENTRY(ap_conn_producer_t) link;
    const char *name;
    void *baton;
    ap_conn_producer_next *fn_next;
    ap_conn_producer_done *fn_done;
    ap_conn_producer_shutdown *fn_shutdown;
    volatile prod_state_t state;
    volatile int conns_active;
};

typedef enum {
    H2_SLOT_FREE,
    H2_SLOT_RUN,
    H2_SLOT_ZOMBIE,
} h2_slot_state_t;

typedef struct h2_slot h2_slot;
struct h2_slot {
    APR_RING_ENTRY(h2_slot) link;
    apr_uint32_t id;
    apr_pool_t *pool;
    h2_slot_state_t state;
    volatile int should_shutdown;
    volatile int is_idle;
    h2_workers *workers;
    ap_conn_producer_t *prod;
    apr_thread_t *thread;
    struct apr_thread_cond_t *more_work;
    int activations;
};

struct h2_workers {
    server_rec *s;
    apr_pool_t *pool;

    apr_uint32_t max_slots;
    apr_uint32_t min_active;
    volatile apr_time_t idle_limit;
    volatile int aborted;
    volatile int shutdown;
    int dynamic;

    volatile apr_uint32_t active_slots;
    volatile apr_uint32_t idle_slots;

    apr_threadattr_t *thread_attr;
    h2_slot *slots;

    APR_RING_HEAD(h2_slots_free, h2_slot) free;
    APR_RING_HEAD(h2_slots_idle, h2_slot) idle;
    APR_RING_HEAD(h2_slots_busy, h2_slot) busy;
    APR_RING_HEAD(h2_slots_zombie, h2_slot) zombie;

    APR_RING_HEAD(ap_conn_producer_active, ap_conn_producer_t) prod_active;
    APR_RING_HEAD(ap_conn_producer_idle, ap_conn_producer_t) prod_idle;

    struct apr_thread_mutex_t *lock;
    struct apr_thread_cond_t *prod_done;
    struct apr_thread_cond_t *all_done;
};

static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx);

static apr_status_t activate_slot(h2_workers *workers)
{
    h2_slot *slot;
    apr_pool_t *pool;
    apr_status_t rv;

    if (APR_RING_EMPTY(&workers->free, h2_slot, link)) {
        return APR_EAGAIN;
    }
    slot = APR_RING_FIRST(&workers->free);
    ap_assert(slot->state == H2_SLOT_FREE);
    APR_RING_REMOVE(slot, link);

    ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
                 "h2_workers: activate slot %d", slot->id);
    slot->state = H2_SLOT_RUN;
    slot->should_shutdown = 0;
    slot->is_idle = 0;
    slot->pool = NULL;
    ++workers->active_slots;
    rv = apr_pool_create(&pool, workers->pool);
    if (APR_SUCCESS != rv) goto cleanup;
    apr_pool_tag(pool, "h2_worker_slot");
    slot->pool = pool;

    rv = ap_thread_create(&slot->thread, workers->thread_attr,
                          slot_run, slot, slot->pool);

cleanup:
    if (rv != APR_SUCCESS) {
        AP_DEBUG_ASSERT(0);
        slot->state = H2_SLOT_FREE;
        if (slot->pool) {
            apr_pool_destroy(slot->pool);
            slot->pool = NULL;
        }
        APR_RING_INSERT_TAIL(&workers->free, slot, h2_slot, link);
        --workers->active_slots;
    }
    return rv;
}
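/* Join all worker threads that have entered the zombie ring and move
 * their slots back to the free ring. Called with workers->lock held;
 * the lock is released around apr_thread_join() so other workers are
 * not stalled while a thread finishes exiting.
 */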
static void join_zombies(h2_workers *workers)
{
    h2_slot *slot;
    apr_status_t status;

    while (!APR_RING_EMPTY(&workers->zombie, h2_slot, link)) {
        slot = APR_RING_FIRST(&workers->zombie);
        APR_RING_REMOVE(slot, link);
        ap_assert(slot->state == H2_SLOT_ZOMBIE);
        ap_assert(slot->thread != NULL);

        apr_thread_mutex_unlock(workers->lock);
        apr_thread_join(&status, slot->thread);
        apr_thread_mutex_lock(workers->lock);

        slot->thread = NULL;
        slot->state = H2_SLOT_FREE;
        if (slot->pool) {
            apr_pool_destroy(slot->pool);
            slot->pool = NULL;
        }
        APR_RING_INSERT_TAIL(&workers->free, slot, h2_slot, link);
    }
}

static void wake_idle_worker(h2_workers *workers, ap_conn_producer_t *prod)
{
    if (!APR_RING_EMPTY(&workers->idle, h2_slot, link)) {
        h2_slot *slot;
        for (slot = APR_RING_FIRST(&workers->idle);
             slot != APR_RING_SENTINEL(&workers->idle, h2_slot, link);
             slot = APR_RING_NEXT(slot, link)) {
            if (slot->is_idle && !slot->should_shutdown) {
                apr_thread_cond_signal(slot->more_work);
                slot->is_idle = 0;
                return;
            }
        }
    }
    if (workers->dynamic && !workers->shutdown
        && (workers->active_slots < workers->max_slots)) {
        activate_slot(workers);
    }
}

/**
 * Get the next connection to work on.
 */
static conn_rec *get_next(h2_slot *slot)
{
    h2_workers *workers = slot->workers;
    conn_rec *c = NULL;
    ap_conn_producer_t *prod;
    int has_more;

    slot->prod = NULL;
    if (!APR_RING_EMPTY(&workers->prod_active, ap_conn_producer_t, link)) {
        slot->prod = prod = APR_RING_FIRST(&workers->prod_active);
        APR_RING_REMOVE(prod, link);
        AP_DEBUG_ASSERT(PROD_ACTIVE == prod->state);

        c = prod->fn_next(prod->baton, &has_more);
        if (c && has_more) {
            APR_RING_INSERT_TAIL(&workers->prod_active, prod,
                                 ap_conn_producer_t, link);
            wake_idle_worker(workers, slot->prod);
        }
        else {
            prod->state = PROD_IDLE;
            APR_RING_INSERT_TAIL(&workers->prod_idle, prod,
                                 ap_conn_producer_t, link);
        }
        if (c) {
            ++prod->conns_active;
        }
    }

    return c;
}
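/* Body of a worker thread. Under workers->lock, the slot pulls
 * connections from active producers and processes them (the lock is
 * dropped while a connection runs). With no work available, the slot
 * parks itself on the idle ring; slots beyond min_active give up after
 * idle_limit and become zombies to be joined.
 */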
static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx)
{
    h2_slot *slot = wctx;
    h2_workers *workers = slot->workers;
    conn_rec *c;
    apr_status_t rv;

    apr_thread_mutex_lock(workers->lock);
    slot->state = H2_SLOT_RUN;
    ++slot->activations;
    APR_RING_ELEM_INIT(slot, link);
    for (;;) {
        if (APR_RING_NEXT(slot, link) != slot) {
            /* slot is part of the idle ring from the last loop */
            APR_RING_REMOVE(slot, link);
            --workers->idle_slots;
        }
        slot->is_idle = 0;

        if (!workers->aborted && !slot->should_shutdown) {
            APR_RING_INSERT_TAIL(&workers->busy, slot, h2_slot, link);
            do {
                c = get_next(slot);
                if (!c) {
                    break;
                }
                apr_thread_mutex_unlock(workers->lock);
                /* See the discussion at <https://github.com/icing/mod_h2/issues/195>
                 *
                 * Each conn_rec->id is supposed to be unique at a point in time.
                 * Since some modules (and maybe external code) use this id as an
                 * identifier for the request_rec they handle, it needs to be
                 * unique for secondary connections also.
                 *
                 * The MPM module assigns the connection ids and mod_unique_id
                 * uses them to generate identifiers for requests. While that
                 * works for HTTP/1.x, the parallel execution of several requests
                 * per connection generates duplicate identifiers under load.
                 *
                 * The original implementation of secondary connection identifiers
                 * shifted the master connection id up and assigned the stream id
                 * to the lower bits. This was cramped on 32 bit systems, but on
                 * 64 bit there was enough space.
                 *
                 * As issue 195 showed, mod_unique_id only uses the lower 32 bits
                 * of the connection id, even on 64 bit systems, which leads to
                 * collisions in request ids.
                 *
                 * The way master connection ids are generated, there is some
                 * space "at the top" of the lower 32 bits on almost all systems.
                 * If you have a setup with 64k threads per child and 255 child
                 * processes, you live on the edge.
                 *
                 * The new implementation shifts the master id up 8 bits and XORs
                 * in the worker id, e.g. a master id of 0x1234 and worker id 7
                 * give (0x1234 << 8) ^ 7 = 0x123407. This can still collide with
                 * more than 256 h2 workers under heavy load. There seems to be no
                 * way for mod_h2 alone to solve this in all possible
                 * configurations.
                 */
                if (c->master) {
                    c->id = (c->master->id << 8) ^ slot->id;
                }
                c->current_thread = thread;
                AP_DEBUG_ASSERT(slot->prod);

#if AP_HAS_RESPONSE_BUCKETS
                ap_process_connection(c, ap_get_conn_socket(c));
#else
                h2_c2_process(c, thread, slot->id);
#endif
                slot->prod->fn_done(slot->prod->baton, c);

                apr_thread_mutex_lock(workers->lock);
                if (--slot->prod->conns_active <= 0) {
                    apr_thread_cond_broadcast(workers->prod_done);
                }
                if (slot->prod->state == PROD_IDLE) {
                    APR_RING_REMOVE(slot->prod, link);
                    slot->prod->state = PROD_ACTIVE;
                    APR_RING_INSERT_TAIL(&workers->prod_active, slot->prod,
                                         ap_conn_producer_t, link);
                }
            } while (!workers->aborted && !slot->should_shutdown);
            APR_RING_REMOVE(slot, link); /* no longer busy */
        }

        if (workers->aborted || slot->should_shutdown) {
            break;
        }

        join_zombies(workers);

        /* we are idle */
        APR_RING_INSERT_TAIL(&workers->idle, slot, h2_slot, link);
        ++workers->idle_slots;
        slot->is_idle = 1;
        if (slot->id >= workers->min_active && workers->idle_limit > 0) {
            rv = apr_thread_cond_timedwait(slot->more_work, workers->lock,
                                           workers->idle_limit);
            if (APR_TIMEUP == rv) {
                APR_RING_REMOVE(slot, link);
                --workers->idle_slots;
                ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s,
                             "h2_workers: idle timeout slot %d in state %d (%d activations)",
                             slot->id, slot->state, slot->activations);
                break;
            }
        }
        else {
            apr_thread_cond_wait(slot->more_work, workers->lock);
        }
    }

    ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
                 "h2_workers: terminate slot %d in state %d (%d activations)",
                 slot->id, slot->state, slot->activations);
    slot->is_idle = 0;
    slot->state = H2_SLOT_ZOMBIE;
    slot->should_shutdown = 0;
    APR_RING_INSERT_TAIL(&workers->zombie, slot, h2_slot, link);
    --workers->active_slots;
    if (workers->active_slots <= 0) {
        apr_thread_cond_broadcast(workers->all_done);
    }
    apr_thread_mutex_unlock(workers->lock);

    apr_thread_exit(thread, APR_SUCCESS);
    return NULL;
}

static void wake_all_idles(h2_workers *workers)
{
    h2_slot *slot;
    for (slot = APR_RING_FIRST(&workers->idle);
         slot != APR_RING_SENTINEL(&workers->idle, h2_slot, link);
         slot = APR_RING_NEXT(slot, link)) {
        apr_thread_cond_signal(slot->more_work);
    }
}
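/* Pre-cleanup of the pool the workers were created for: mark the set
 * as shut down and aborted, wake all idle slots, wait up to wait_sec
 * seconds for active slots to terminate, then join the remaining
 * zombie threads.
 */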
static apr_status_t workers_pool_cleanup(void *data)
{
    h2_workers *workers = data;
    apr_time_t end, timeout = apr_time_from_sec(1);
    apr_status_t rv;
    int n = 0, wait_sec = 5;

    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
                 "h2_workers: cleanup %d workers (%d idle)",
                 workers->active_slots, workers->idle_slots);
    apr_thread_mutex_lock(workers->lock);
    workers->shutdown = 1;
    workers->aborted = 1;
    wake_all_idles(workers);
    apr_thread_mutex_unlock(workers->lock);

    /* wait for all the workers to become zombies and join them.
     * this gets called after the mpm shuts down and all connections
     * have either been handled (graceful) or we are forced exiting
     * (ungraceful). Either way, we show limited patience. */
    end = apr_time_now() + apr_time_from_sec(wait_sec);
    while (apr_time_now() < end) {
        apr_thread_mutex_lock(workers->lock);
        if (!(n = workers->active_slots)) {
            apr_thread_mutex_unlock(workers->lock);
            break;
        }
        wake_all_idles(workers);
        rv = apr_thread_cond_timedwait(workers->all_done, workers->lock, timeout);
        apr_thread_mutex_unlock(workers->lock);

        if (APR_TIMEUP == rv) {
            ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
                         APLOGNO(10290) "h2_workers: waiting for workers to close, "
                         "still seeing %d workers (%d idle) living",
                         workers->active_slots, workers->idle_slots);
        }
    }
    if (n) {
        ap_log_error(APLOG_MARK, APLOG_WARNING, 0, workers->s,
                     APLOGNO(10291) "h2_workers: cleanup, %d workers (%d idle) "
                     "did not exit after %d seconds.",
                     n, workers->idle_slots, wait_sec);
    }
    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
                 "h2_workers: cleanup all workers terminated");
    apr_thread_mutex_lock(workers->lock);
    join_zombies(workers);
    apr_thread_mutex_unlock(workers->lock);
    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
                 "h2_workers: cleanup zombie workers joined");
    return APR_SUCCESS;
}
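/* Set up the h2 workers for a child process: create their own pool and
 * lock, allocate max_slots slots, start the min_active worker threads
 * and register a pre-cleanup on pchild that stops and joins them when
 * the child exits.
 */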
h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pchild,
                              int max_slots, int min_active,
                              apr_time_t idle_limit)
{
    apr_status_t rv;
    h2_workers *workers;
    apr_pool_t *pool;
    apr_allocator_t *allocator;
    int locked = 0;
    apr_uint32_t i;

    ap_assert(s);
    ap_assert(pchild);
    ap_assert(idle_limit > 0);

    /* let's have our own pool that will be parent to all h2_worker
     * instances we create. This happens in various threads, but always
     * guarded by our lock. Without this pool, all subpool creations would
     * happen on the pool handed to us, which we do not guard.
     */
    rv = apr_allocator_create(&allocator);
    if (rv != APR_SUCCESS) {
        goto cleanup;
    }
    rv = apr_pool_create_ex(&pool, pchild, NULL, allocator);
    if (rv != APR_SUCCESS) {
        apr_allocator_destroy(allocator);
        goto cleanup;
    }
    apr_allocator_owner_set(allocator, pool);
    apr_pool_tag(pool, "h2_workers");
    workers = apr_pcalloc(pool, sizeof(h2_workers));
    if (!workers) {
        return NULL;
    }

    workers->s = s;
    workers->pool = pool;
    workers->min_active = min_active;
    workers->max_slots = max_slots;
    workers->idle_limit = idle_limit;
    workers->dynamic = (workers->min_active < workers->max_slots);

    ap_log_error(APLOG_MARK, APLOG_INFO, 0, s,
                 "h2_workers: created with min=%d max=%d idle_ms=%d",
                 workers->min_active, workers->max_slots,
                 (int)apr_time_as_msec(idle_limit));

    APR_RING_INIT(&workers->idle, h2_slot, link);
    APR_RING_INIT(&workers->busy, h2_slot, link);
    APR_RING_INIT(&workers->free, h2_slot, link);
    APR_RING_INIT(&workers->zombie, h2_slot, link);

    APR_RING_INIT(&workers->prod_active, ap_conn_producer_t, link);
    APR_RING_INIT(&workers->prod_idle, ap_conn_producer_t, link);

    rv = apr_threadattr_create(&workers->thread_attr, workers->pool);
    if (rv != APR_SUCCESS) goto cleanup;

    if (ap_thread_stacksize != 0) {
        apr_threadattr_stacksize_set(workers->thread_attr, ap_thread_stacksize);
        ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s,
                     "h2_workers: using stacksize=%ld",
                     (long)ap_thread_stacksize);
    }

    rv = apr_thread_mutex_create(&workers->lock, APR_THREAD_MUTEX_DEFAULT,
                                 workers->pool);
    if (rv != APR_SUCCESS) goto cleanup;
    rv = apr_thread_cond_create(&workers->all_done, workers->pool);
    if (rv != APR_SUCCESS) goto cleanup;
    rv = apr_thread_cond_create(&workers->prod_done, workers->pool);
    if (rv != APR_SUCCESS) goto cleanup;

    apr_thread_mutex_lock(workers->lock);
    locked = 1;

    /* create the slots and put them on the free list */
    workers->slots = apr_pcalloc(workers->pool, workers->max_slots * sizeof(h2_slot));

    for (i = 0; i < workers->max_slots; ++i) {
        workers->slots[i].id = i;
        workers->slots[i].state = H2_SLOT_FREE;
        workers->slots[i].workers = workers;
        APR_RING_ELEM_INIT(&workers->slots[i], link);
        APR_RING_INSERT_TAIL(&workers->free, &workers->slots[i], h2_slot, link);
        rv = apr_thread_cond_create(&workers->slots[i].more_work, workers->pool);
        if (rv != APR_SUCCESS) goto cleanup;
    }

    /* activate the minimum number of workers */
    for (i = 0; i < workers->min_active; ++i) {
        rv = activate_slot(workers);
        if (rv != APR_SUCCESS) goto cleanup;
    }

cleanup:
    if (locked) {
        apr_thread_mutex_unlock(workers->lock);
    }
    if (rv == APR_SUCCESS) {
        /* Stop/join the worker threads when the MPM child exits (pchild is
         * destroyed), and as a pre_cleanup of pchild thus before the threads'
         * pools (children of workers->pool) so that they are not destroyed
         * before/under us. */
        apr_pool_pre_cleanup_register(pchild, workers, workers_pool_cleanup);
        return workers;
    }
    ap_log_error(APLOG_MARK, APLOG_DEBUG, rv, s,
                 "h2_workers: errors initializing");
    return NULL;
}

apr_uint32_t h2_workers_get_max_workers(h2_workers *workers)
{
    return workers->max_slots;
}

void h2_workers_shutdown(h2_workers *workers, int graceful)
{
    ap_conn_producer_t *prod;

    apr_thread_mutex_lock(workers->lock);
    ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s,
                 "h2_workers: shutdown graceful=%d", graceful);
    workers->shutdown = 1;
    workers->idle_limit = apr_time_from_sec(1);
    wake_all_idles(workers);
    for (prod = APR_RING_FIRST(&workers->prod_idle);
         prod != APR_RING_SENTINEL(&workers->prod_idle, ap_conn_producer_t, link);
         prod = APR_RING_NEXT(prod, link)) {
        if (prod->fn_shutdown) {
            prod->fn_shutdown(prod->baton, graceful);
        }
    }
    apr_thread_mutex_unlock(workers->lock);
}

ap_conn_producer_t *h2_workers_register(h2_workers *workers,
                                        apr_pool_t *producer_pool,
                                        const char *name,
                                        ap_conn_producer_next *fn_next,
                                        ap_conn_producer_done *fn_done,
                                        ap_conn_producer_shutdown *fn_shutdown,
                                        void *baton)
{
    ap_conn_producer_t *prod;

    prod = apr_pcalloc(producer_pool, sizeof(*prod));
    APR_RING_ELEM_INIT(prod, link);
    prod->name = name;
    prod->fn_next = fn_next;
    prod->fn_done = fn_done;
    prod->fn_shutdown = fn_shutdown;
    prod->baton = baton;
    apr_thread_mutex_lock(workers->lock);
    prod->state = PROD_IDLE;
    APR_RING_INSERT_TAIL(&workers->prod_idle, prod, ap_conn_producer_t, link);
    apr_thread_mutex_unlock(workers->lock);
    return prod;
}

apr_status_t h2_workers_join(h2_workers *workers, ap_conn_producer_t *prod)
{
    apr_status_t rv = APR_SUCCESS;

    apr_thread_mutex_lock(workers->lock);
    if (PROD_JOINED == prod->state) {
        AP_DEBUG_ASSERT(APR_RING_NEXT(prod, link) == prod); /* should be in no ring */
        rv = APR_EINVAL;
    }
    else {
        AP_DEBUG_ASSERT(PROD_ACTIVE == prod->state || PROD_IDLE == prod->state);
        APR_RING_REMOVE(prod, link);
        prod->state = PROD_JOINED; /* prevent further activations */
        while (prod->conns_active > 0) {
            apr_thread_cond_wait(workers->prod_done, workers->lock);
        }
        APR_RING_ELEM_INIT(prod, link); /* make it link to itself */
    }
    apr_thread_mutex_unlock(workers->lock);
    return rv;
}

apr_status_t h2_workers_activate(h2_workers *workers, ap_conn_producer_t *prod)
{
    apr_status_t rv = APR_SUCCESS;

    apr_thread_mutex_lock(workers->lock);
    if (PROD_IDLE == prod->state) {
        APR_RING_REMOVE(prod, link);
        prod->state = PROD_ACTIVE;
        APR_RING_INSERT_TAIL(&workers->prod_active, prod, ap_conn_producer_t, link);
        wake_idle_worker(workers, prod);
    }
    else if (PROD_JOINED == prod->state) {
        rv = APR_EINVAL;
    }
    apr_thread_mutex_unlock(workers->lock);
    return rv;
}
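/* Usage sketch for the producer API above (illustrative only; the names
 * my_pool, my_next, my_done, my_shutdown and my_ctx are hypothetical):
 * a connection producer registers its callbacks once, calls
 * h2_workers_activate() whenever it has connections ready, and
 * h2_workers_join() before its pool is destroyed.
 *
 *     ap_conn_producer_t *prod;
 *
 *     prod = h2_workers_register(workers, my_pool, "my-producer",
 *                                my_next, my_done, my_shutdown, my_ctx);
 *     ...
 *     h2_workers_activate(workers, prod);  /- new connections are pending
 *     ...
 *     h2_workers_join(workers, prod);      /- wait for in-flight connections
 *
 * ("/-" stands in for a nested comment marker, which C does not allow.)
 */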