summaryrefslogtreecommitdiffstats
path: root/modules/http2/h2_workers.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 06:33:50 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 06:33:50 +0000
commitfe39ffb8b90ae4e002ed73fe98617cd590abb467 (patch)
treeb80e5956907d8aeaaffe4e4f0c068c0e6157ce8b /modules/http2/h2_workers.c
parentInitial commit. (diff)
downloadapache2-upstream/2.4.56.tar.xz
apache2-upstream/2.4.56.zip
Adding upstream version 2.4.56.upstream/2.4.56
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'modules/http2/h2_workers.c')
-rw-r--r--modules/http2/h2_workers.c626
1 files changed, 626 insertions, 0 deletions
diff --git a/modules/http2/h2_workers.c b/modules/http2/h2_workers.c
new file mode 100644
index 0000000..e7e2039
--- /dev/null
+++ b/modules/http2/h2_workers.c
@@ -0,0 +1,626 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <assert.h>
+#include <apr_ring.h>
+#include <apr_thread_mutex.h>
+#include <apr_thread_cond.h>
+
+#include <mpm_common.h>
+#include <httpd.h>
+#include <http_connection.h>
+#include <http_core.h>
+#include <http_log.h>
+#include <http_protocol.h>
+
+#include "h2.h"
+#include "h2_private.h"
+#include "h2_mplx.h"
+#include "h2_c2.h"
+#include "h2_workers.h"
+#include "h2_util.h"
+
+typedef enum {
+ PROD_IDLE,
+ PROD_ACTIVE,
+ PROD_JOINED,
+} prod_state_t;
+
+struct ap_conn_producer_t {
+ APR_RING_ENTRY(ap_conn_producer_t) link;
+ const char *name;
+ void *baton;
+ ap_conn_producer_next *fn_next;
+ ap_conn_producer_done *fn_done;
+ ap_conn_producer_shutdown *fn_shutdown;
+ volatile prod_state_t state;
+ volatile int conns_active;
+};
+
+
+typedef enum {
+ H2_SLOT_FREE,
+ H2_SLOT_RUN,
+ H2_SLOT_ZOMBIE,
+} h2_slot_state_t;
+
+typedef struct h2_slot h2_slot;
+struct h2_slot {
+ APR_RING_ENTRY(h2_slot) link;
+ apr_uint32_t id;
+ apr_pool_t *pool;
+ h2_slot_state_t state;
+ volatile int should_shutdown;
+ volatile int is_idle;
+ h2_workers *workers;
+ ap_conn_producer_t *prod;
+ apr_thread_t *thread;
+ struct apr_thread_cond_t *more_work;
+ int activations;
+};
+
+struct h2_workers {
+ server_rec *s;
+ apr_pool_t *pool;
+
+ apr_uint32_t max_slots;
+ apr_uint32_t min_active;
+ volatile apr_time_t idle_limit;
+ volatile int aborted;
+ volatile int shutdown;
+ int dynamic;
+
+ volatile apr_uint32_t active_slots;
+ volatile apr_uint32_t idle_slots;
+
+ apr_threadattr_t *thread_attr;
+ h2_slot *slots;
+
+ APR_RING_HEAD(h2_slots_free, h2_slot) free;
+ APR_RING_HEAD(h2_slots_idle, h2_slot) idle;
+ APR_RING_HEAD(h2_slots_busy, h2_slot) busy;
+ APR_RING_HEAD(h2_slots_zombie, h2_slot) zombie;
+
+ APR_RING_HEAD(ap_conn_producer_active, ap_conn_producer_t) prod_active;
+ APR_RING_HEAD(ap_conn_producer_idle, ap_conn_producer_t) prod_idle;
+
+ struct apr_thread_mutex_t *lock;
+ struct apr_thread_cond_t *prod_done;
+ struct apr_thread_cond_t *all_done;
+};
+
+
+static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx);
+
+static apr_status_t activate_slot(h2_workers *workers)
+{
+ h2_slot *slot;
+ apr_pool_t *pool;
+ apr_status_t rv;
+
+ if (APR_RING_EMPTY(&workers->free, h2_slot, link)) {
+ return APR_EAGAIN;
+ }
+ slot = APR_RING_FIRST(&workers->free);
+ ap_assert(slot->state == H2_SLOT_FREE);
+ APR_RING_REMOVE(slot, link);
+
+ ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
+ "h2_workers: activate slot %d", slot->id);
+
+ slot->state = H2_SLOT_RUN;
+ slot->should_shutdown = 0;
+ slot->is_idle = 0;
+ slot->pool = NULL;
+ ++workers->active_slots;
+ rv = apr_pool_create(&pool, workers->pool);
+ if (APR_SUCCESS != rv) goto cleanup;
+ apr_pool_tag(pool, "h2_worker_slot");
+ slot->pool = pool;
+
+ rv = ap_thread_create(&slot->thread, workers->thread_attr,
+ slot_run, slot, slot->pool);
+
+cleanup:
+ if (rv != APR_SUCCESS) {
+ AP_DEBUG_ASSERT(0);
+ slot->state = H2_SLOT_FREE;
+ if (slot->pool) {
+ apr_pool_destroy(slot->pool);
+ slot->pool = NULL;
+ }
+ APR_RING_INSERT_TAIL(&workers->free, slot, h2_slot, link);
+ --workers->active_slots;
+ }
+ return rv;
+}
+
+static void join_zombies(h2_workers *workers)
+{
+ h2_slot *slot;
+ apr_status_t status;
+
+ while (!APR_RING_EMPTY(&workers->zombie, h2_slot, link)) {
+ slot = APR_RING_FIRST(&workers->zombie);
+ APR_RING_REMOVE(slot, link);
+ ap_assert(slot->state == H2_SLOT_ZOMBIE);
+ ap_assert(slot->thread != NULL);
+
+ apr_thread_mutex_unlock(workers->lock);
+ apr_thread_join(&status, slot->thread);
+ apr_thread_mutex_lock(workers->lock);
+
+ slot->thread = NULL;
+ slot->state = H2_SLOT_FREE;
+ if (slot->pool) {
+ apr_pool_destroy(slot->pool);
+ slot->pool = NULL;
+ }
+ APR_RING_INSERT_TAIL(&workers->free, slot, h2_slot, link);
+ }
+}
+
+static void wake_idle_worker(h2_workers *workers, ap_conn_producer_t *prod)
+{
+ if (!APR_RING_EMPTY(&workers->idle, h2_slot, link)) {
+ h2_slot *slot;
+ for (slot = APR_RING_FIRST(&workers->idle);
+ slot != APR_RING_SENTINEL(&workers->idle, h2_slot, link);
+ slot = APR_RING_NEXT(slot, link)) {
+ if (slot->is_idle && !slot->should_shutdown) {
+ apr_thread_cond_signal(slot->more_work);
+ slot->is_idle = 0;
+ return;
+ }
+ }
+ }
+ if (workers->dynamic && !workers->shutdown
+ && (workers->active_slots < workers->max_slots)) {
+ activate_slot(workers);
+ }
+}
+
+/**
+ * Get the next connection to work on.
+ */
+static conn_rec *get_next(h2_slot *slot)
+{
+ h2_workers *workers = slot->workers;
+ conn_rec *c = NULL;
+ ap_conn_producer_t *prod;
+ int has_more;
+
+ slot->prod = NULL;
+ if (!APR_RING_EMPTY(&workers->prod_active, ap_conn_producer_t, link)) {
+ slot->prod = prod = APR_RING_FIRST(&workers->prod_active);
+ APR_RING_REMOVE(prod, link);
+ AP_DEBUG_ASSERT(PROD_ACTIVE == prod->state);
+
+ c = prod->fn_next(prod->baton, &has_more);
+ if (c && has_more) {
+ APR_RING_INSERT_TAIL(&workers->prod_active, prod, ap_conn_producer_t, link);
+ wake_idle_worker(workers, slot->prod);
+ }
+ else {
+ prod->state = PROD_IDLE;
+ APR_RING_INSERT_TAIL(&workers->prod_idle, prod, ap_conn_producer_t, link);
+ }
+ if (c) {
+ ++prod->conns_active;
+ }
+ }
+
+ return c;
+}
+
+static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx)
+{
+ h2_slot *slot = wctx;
+ h2_workers *workers = slot->workers;
+ conn_rec *c;
+ apr_status_t rv;
+
+ apr_thread_mutex_lock(workers->lock);
+ slot->state = H2_SLOT_RUN;
+ ++slot->activations;
+ APR_RING_ELEM_INIT(slot, link);
+ for(;;) {
+ if (APR_RING_NEXT(slot, link) != slot) {
+ /* slot is part of the idle ring from the last loop */
+ APR_RING_REMOVE(slot, link);
+ --workers->idle_slots;
+ }
+ slot->is_idle = 0;
+
+ if (!workers->aborted && !slot->should_shutdown) {
+ APR_RING_INSERT_TAIL(&workers->busy, slot, h2_slot, link);
+ do {
+ c = get_next(slot);
+ if (!c) {
+ break;
+ }
+ apr_thread_mutex_unlock(workers->lock);
+ /* See the discussion at <https://github.com/icing/mod_h2/issues/195>
+ *
+ * Each conn_rec->id is supposed to be unique at a point in time. Since
+ * some modules (and maybe external code) uses this id as an identifier
+ * for the request_rec they handle, it needs to be unique for secondary
+ * connections also.
+ *
+ * The MPM module assigns the connection ids and mod_unique_id is using
+ * that one to generate identifier for requests. While the implementation
+ * works for HTTP/1.x, the parallel execution of several requests per
+ * connection will generate duplicate identifiers on load.
+ *
+ * The original implementation for secondary connection identifiers used
+ * to shift the master connection id up and assign the stream id to the
+ * lower bits. This was cramped on 32 bit systems, but on 64bit there was
+ * enough space.
+ *
+ * As issue 195 showed, mod_unique_id only uses the lower 32 bit of the
+ * connection id, even on 64bit systems. Therefore collisions in request ids.
+ *
+ * The way master connection ids are generated, there is some space "at the
+ * top" of the lower 32 bits on allmost all systems. If you have a setup
+ * with 64k threads per child and 255 child processes, you live on the edge.
+ *
+ * The new implementation shifts 8 bits and XORs in the worker
+ * id. This will experience collisions with > 256 h2 workers and heavy
+ * load still. There seems to be no way to solve this in all possible
+ * configurations by mod_h2 alone.
+ */
+ if (c->master) {
+ c->id = (c->master->id << 8)^slot->id;
+ }
+ c->current_thread = thread;
+ AP_DEBUG_ASSERT(slot->prod);
+
+#if AP_HAS_RESPONSE_BUCKETS
+ ap_process_connection(c, ap_get_conn_socket(c));
+#else
+ h2_c2_process(c, thread, slot->id);
+#endif
+ slot->prod->fn_done(slot->prod->baton, c);
+
+ apr_thread_mutex_lock(workers->lock);
+ if (--slot->prod->conns_active <= 0) {
+ apr_thread_cond_broadcast(workers->prod_done);
+ }
+ if (slot->prod->state == PROD_IDLE) {
+ APR_RING_REMOVE(slot->prod, link);
+ slot->prod->state = PROD_ACTIVE;
+ APR_RING_INSERT_TAIL(&workers->prod_active, slot->prod, ap_conn_producer_t, link);
+ }
+
+ } while (!workers->aborted && !slot->should_shutdown);
+ APR_RING_REMOVE(slot, link); /* no longer busy */
+ }
+
+ if (workers->aborted || slot->should_shutdown) {
+ break;
+ }
+
+ join_zombies(workers);
+
+ /* we are idle */
+ APR_RING_INSERT_TAIL(&workers->idle, slot, h2_slot, link);
+ ++workers->idle_slots;
+ slot->is_idle = 1;
+ if (slot->id >= workers->min_active && workers->idle_limit > 0) {
+ rv = apr_thread_cond_timedwait(slot->more_work, workers->lock,
+ workers->idle_limit);
+ if (APR_TIMEUP == rv) {
+ APR_RING_REMOVE(slot, link);
+ --workers->idle_slots;
+ ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s,
+ "h2_workers: idle timeout slot %d in state %d (%d activations)",
+ slot->id, slot->state, slot->activations);
+ break;
+ }
+ }
+ else {
+ apr_thread_cond_wait(slot->more_work, workers->lock);
+ }
+ }
+
+ ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
+ "h2_workers: terminate slot %d in state %d (%d activations)",
+ slot->id, slot->state, slot->activations);
+ slot->is_idle = 0;
+ slot->state = H2_SLOT_ZOMBIE;
+ slot->should_shutdown = 0;
+ APR_RING_INSERT_TAIL(&workers->zombie, slot, h2_slot, link);
+ --workers->active_slots;
+ if (workers->active_slots <= 0) {
+ apr_thread_cond_broadcast(workers->all_done);
+ }
+ apr_thread_mutex_unlock(workers->lock);
+
+ apr_thread_exit(thread, APR_SUCCESS);
+ return NULL;
+}
+
+static void wake_all_idles(h2_workers *workers)
+{
+ h2_slot *slot;
+ for (slot = APR_RING_FIRST(&workers->idle);
+ slot != APR_RING_SENTINEL(&workers->idle, h2_slot, link);
+ slot = APR_RING_NEXT(slot, link))
+ {
+ apr_thread_cond_signal(slot->more_work);
+ }
+}
+
+static apr_status_t workers_pool_cleanup(void *data)
+{
+ h2_workers *workers = data;
+ apr_time_t end, timeout = apr_time_from_sec(1);
+ apr_status_t rv;
+ int n = 0, wait_sec = 5;
+
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
+ "h2_workers: cleanup %d workers (%d idle)",
+ workers->active_slots, workers->idle_slots);
+ apr_thread_mutex_lock(workers->lock);
+ workers->shutdown = 1;
+ workers->aborted = 1;
+ wake_all_idles(workers);
+ apr_thread_mutex_unlock(workers->lock);
+
+ /* wait for all the workers to become zombies and join them.
+ * this gets called after the mpm shuts down and all connections
+ * have either been handled (graceful) or we are forced exiting
+ * (ungrateful). Either way, we show limited patience. */
+ end = apr_time_now() + apr_time_from_sec(wait_sec);
+ while (apr_time_now() < end) {
+ apr_thread_mutex_lock(workers->lock);
+ if (!(n = workers->active_slots)) {
+ apr_thread_mutex_unlock(workers->lock);
+ break;
+ }
+ wake_all_idles(workers);
+ rv = apr_thread_cond_timedwait(workers->all_done, workers->lock, timeout);
+ apr_thread_mutex_unlock(workers->lock);
+
+ if (APR_TIMEUP == rv) {
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
+ APLOGNO(10290) "h2_workers: waiting for workers to close, "
+ "still seeing %d workers (%d idle) living",
+ workers->active_slots, workers->idle_slots);
+ }
+ }
+ if (n) {
+ ap_log_error(APLOG_MARK, APLOG_WARNING, 0, workers->s,
+ APLOGNO(10291) "h2_workers: cleanup, %d workers (%d idle) "
+ "did not exit after %d seconds.",
+ n, workers->idle_slots, wait_sec);
+ }
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
+ "h2_workers: cleanup all workers terminated");
+ apr_thread_mutex_lock(workers->lock);
+ join_zombies(workers);
+ apr_thread_mutex_unlock(workers->lock);
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
+ "h2_workers: cleanup zombie workers joined");
+
+ return APR_SUCCESS;
+}
+
+h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pchild,
+ int max_slots, int min_active,
+ apr_time_t idle_limit)
+{
+ apr_status_t rv;
+ h2_workers *workers;
+ apr_pool_t *pool;
+ apr_allocator_t *allocator;
+ int locked = 0;
+ apr_uint32_t i;
+
+ ap_assert(s);
+ ap_assert(pchild);
+ ap_assert(idle_limit > 0);
+
+ /* let's have our own pool that will be parent to all h2_worker
+ * instances we create. This happens in various threads, but always
+ * guarded by our lock. Without this pool, all subpool creations would
+ * happen on the pool handed to us, which we do not guard.
+ */
+ rv = apr_allocator_create(&allocator);
+ if (rv != APR_SUCCESS) {
+ goto cleanup;
+ }
+ rv = apr_pool_create_ex(&pool, pchild, NULL, allocator);
+ if (rv != APR_SUCCESS) {
+ apr_allocator_destroy(allocator);
+ goto cleanup;
+ }
+ apr_allocator_owner_set(allocator, pool);
+ apr_pool_tag(pool, "h2_workers");
+ workers = apr_pcalloc(pool, sizeof(h2_workers));
+ if (!workers) {
+ return NULL;
+ }
+
+ workers->s = s;
+ workers->pool = pool;
+ workers->min_active = min_active;
+ workers->max_slots = max_slots;
+ workers->idle_limit = idle_limit;
+ workers->dynamic = (workers->min_active < workers->max_slots);
+
+ ap_log_error(APLOG_MARK, APLOG_INFO, 0, s,
+ "h2_workers: created with min=%d max=%d idle_ms=%d",
+ workers->min_active, workers->max_slots,
+ (int)apr_time_as_msec(idle_limit));
+
+ APR_RING_INIT(&workers->idle, h2_slot, link);
+ APR_RING_INIT(&workers->busy, h2_slot, link);
+ APR_RING_INIT(&workers->free, h2_slot, link);
+ APR_RING_INIT(&workers->zombie, h2_slot, link);
+
+ APR_RING_INIT(&workers->prod_active, ap_conn_producer_t, link);
+ APR_RING_INIT(&workers->prod_idle, ap_conn_producer_t, link);
+
+ rv = apr_threadattr_create(&workers->thread_attr, workers->pool);
+ if (rv != APR_SUCCESS) goto cleanup;
+
+ if (ap_thread_stacksize != 0) {
+ apr_threadattr_stacksize_set(workers->thread_attr,
+ ap_thread_stacksize);
+ ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s,
+ "h2_workers: using stacksize=%ld",
+ (long)ap_thread_stacksize);
+ }
+
+ rv = apr_thread_mutex_create(&workers->lock,
+ APR_THREAD_MUTEX_DEFAULT,
+ workers->pool);
+ if (rv != APR_SUCCESS) goto cleanup;
+ rv = apr_thread_cond_create(&workers->all_done, workers->pool);
+ if (rv != APR_SUCCESS) goto cleanup;
+ rv = apr_thread_cond_create(&workers->prod_done, workers->pool);
+ if (rv != APR_SUCCESS) goto cleanup;
+
+ apr_thread_mutex_lock(workers->lock);
+ locked = 1;
+
+ /* create the slots and put them on the free list */
+ workers->slots = apr_pcalloc(workers->pool, workers->max_slots * sizeof(h2_slot));
+
+ for (i = 0; i < workers->max_slots; ++i) {
+ workers->slots[i].id = i;
+ workers->slots[i].state = H2_SLOT_FREE;
+ workers->slots[i].workers = workers;
+ APR_RING_ELEM_INIT(&workers->slots[i], link);
+ APR_RING_INSERT_TAIL(&workers->free, &workers->slots[i], h2_slot, link);
+ rv = apr_thread_cond_create(&workers->slots[i].more_work, workers->pool);
+ if (rv != APR_SUCCESS) goto cleanup;
+ }
+
+ /* activate the min amount of workers */
+ for (i = 0; i < workers->min_active; ++i) {
+ rv = activate_slot(workers);
+ if (rv != APR_SUCCESS) goto cleanup;
+ }
+
+cleanup:
+ if (locked) {
+ apr_thread_mutex_unlock(workers->lock);
+ }
+ if (rv == APR_SUCCESS) {
+ /* Stop/join the workers threads when the MPM child exits (pchild is
+ * destroyed), and as a pre_cleanup of pchild thus before the threads
+ * pools (children of workers->pool) so that they are not destroyed
+ * before/under us.
+ */
+ apr_pool_pre_cleanup_register(pchild, workers, workers_pool_cleanup);
+ return workers;
+ }
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, rv, s,
+ "h2_workers: errors initializing");
+ return NULL;
+}
+
+apr_uint32_t h2_workers_get_max_workers(h2_workers *workers)
+{
+ return workers->max_slots;
+}
+
+void h2_workers_shutdown(h2_workers *workers, int graceful)
+{
+ ap_conn_producer_t *prod;
+
+ apr_thread_mutex_lock(workers->lock);
+ ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s,
+ "h2_workers: shutdown graceful=%d", graceful);
+ workers->shutdown = 1;
+ workers->idle_limit = apr_time_from_sec(1);
+ wake_all_idles(workers);
+ for (prod = APR_RING_FIRST(&workers->prod_idle);
+ prod != APR_RING_SENTINEL(&workers->prod_idle, ap_conn_producer_t, link);
+ prod = APR_RING_NEXT(prod, link)) {
+ if (prod->fn_shutdown) {
+ prod->fn_shutdown(prod->baton, graceful);
+ }
+ }
+ apr_thread_mutex_unlock(workers->lock);
+}
+
+ap_conn_producer_t *h2_workers_register(h2_workers *workers,
+ apr_pool_t *producer_pool,
+ const char *name,
+ ap_conn_producer_next *fn_next,
+ ap_conn_producer_done *fn_done,
+ ap_conn_producer_shutdown *fn_shutdown,
+ void *baton)
+{
+ ap_conn_producer_t *prod;
+
+ prod = apr_pcalloc(producer_pool, sizeof(*prod));
+ APR_RING_ELEM_INIT(prod, link);
+ prod->name = name;
+ prod->fn_next = fn_next;
+ prod->fn_done = fn_done;
+ prod->fn_shutdown = fn_shutdown;
+ prod->baton = baton;
+
+ apr_thread_mutex_lock(workers->lock);
+ prod->state = PROD_IDLE;
+ APR_RING_INSERT_TAIL(&workers->prod_idle, prod, ap_conn_producer_t, link);
+ apr_thread_mutex_unlock(workers->lock);
+
+ return prod;
+}
+
+apr_status_t h2_workers_join(h2_workers *workers, ap_conn_producer_t *prod)
+{
+ apr_status_t rv = APR_SUCCESS;
+
+ apr_thread_mutex_lock(workers->lock);
+ if (PROD_JOINED == prod->state) {
+ AP_DEBUG_ASSERT(APR_RING_NEXT(prod, link) == prod); /* should be in no ring */
+ rv = APR_EINVAL;
+ }
+ else {
+ AP_DEBUG_ASSERT(PROD_ACTIVE == prod->state || PROD_IDLE == prod->state);
+ APR_RING_REMOVE(prod, link);
+ prod->state = PROD_JOINED; /* prevent further activations */
+ while (prod->conns_active > 0) {
+ apr_thread_cond_wait(workers->prod_done, workers->lock);
+ }
+ APR_RING_ELEM_INIT(prod, link); /* make it link to itself */
+ }
+ apr_thread_mutex_unlock(workers->lock);
+ return rv;
+}
+
+apr_status_t h2_workers_activate(h2_workers *workers, ap_conn_producer_t *prod)
+{
+ apr_status_t rv = APR_SUCCESS;
+ apr_thread_mutex_lock(workers->lock);
+ if (PROD_IDLE == prod->state) {
+ APR_RING_REMOVE(prod, link);
+ prod->state = PROD_ACTIVE;
+ APR_RING_INSERT_TAIL(&workers->prod_active, prod, ap_conn_producer_t, link);
+ wake_idle_worker(workers, prod);
+ }
+ else if (PROD_JOINED == prod->state) {
+ rv = APR_EINVAL;
+ }
+ apr_thread_mutex_unlock(workers->lock);
+ return rv;
+}