summaryrefslogtreecommitdiffstats
path: root/misc/apr_thread_pool.c
diff options
context:
space:
mode:
Diffstat (limited to 'misc/apr_thread_pool.c')
-rw-r--r--misc/apr_thread_pool.c1019
1 files changed, 1019 insertions, 0 deletions
diff --git a/misc/apr_thread_pool.c b/misc/apr_thread_pool.c
new file mode 100644
index 0000000..5aa3b65
--- /dev/null
+++ b/misc/apr_thread_pool.c
@@ -0,0 +1,1019 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding copyright
+ * ownership. The ASF licenses this file to you under the Apache
+ * License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+#include <assert.h>
+#include "apr_thread_pool.h"
+#include "apr_ring.h"
+#include "apr_thread_cond.h"
+#include "apr_portable.h"
+
+#if APR_HAS_THREADS
+
+#define TASK_PRIORITY_SEGS 4
+#define TASK_PRIORITY_SEG(x) (((x)->dispatch.priority & 0xFF) / 64)
+
+typedef struct apr_thread_pool_task
+{
+ APR_RING_ENTRY(apr_thread_pool_task) link;
+ apr_thread_start_t func;
+ void *param;
+ void *owner;
+ union
+ {
+ apr_byte_t priority;
+ apr_time_t time;
+ } dispatch;
+} apr_thread_pool_task_t;
+
+APR_RING_HEAD(apr_thread_pool_tasks, apr_thread_pool_task);
+
+struct apr_thread_list_elt
+{
+ APR_RING_ENTRY(apr_thread_list_elt) link;
+ apr_thread_t *thd;
+ void *current_owner;
+ enum { TH_RUN, TH_STOP, TH_PROBATION } state;
+ int signal_work_done;
+};
+
+APR_RING_HEAD(apr_thread_list, apr_thread_list_elt);
+
+struct apr_thread_pool
+{
+ apr_pool_t *pool;
+ volatile apr_size_t thd_max;
+ volatile apr_size_t idle_max;
+ volatile apr_interval_time_t idle_wait;
+ volatile apr_size_t thd_cnt;
+ volatile apr_size_t idle_cnt;
+ volatile apr_size_t busy_cnt;
+ volatile apr_size_t task_cnt;
+ volatile apr_size_t scheduled_task_cnt;
+ volatile apr_size_t threshold;
+ volatile apr_size_t tasks_run;
+ volatile apr_size_t tasks_high;
+ volatile apr_size_t thd_high;
+ volatile apr_size_t thd_timed_out;
+ struct apr_thread_pool_tasks *tasks;
+ struct apr_thread_pool_tasks *scheduled_tasks;
+ struct apr_thread_list *busy_thds;
+ struct apr_thread_list *idle_thds;
+ struct apr_thread_list *dead_thds;
+ apr_thread_cond_t *more_work;
+ apr_thread_cond_t *work_done;
+ apr_thread_cond_t *all_done;
+ apr_thread_mutex_t *lock;
+ volatile int terminated;
+ struct apr_thread_pool_tasks *recycled_tasks;
+ struct apr_thread_list *recycled_thds;
+ apr_thread_pool_task_t *task_idx[TASK_PRIORITY_SEGS];
+};
+
+static apr_status_t thread_pool_construct(apr_thread_pool_t **tp,
+ apr_size_t init_threads,
+ apr_size_t max_threads,
+ apr_pool_t *pool)
+{
+ apr_status_t rv;
+ apr_thread_pool_t *me;
+
+ me = *tp = apr_pcalloc(pool, sizeof(apr_thread_pool_t));
+ me->thd_max = max_threads;
+ me->idle_max = init_threads;
+ me->threshold = init_threads / 2;
+
+ /* This pool will be used by different threads. As we cannot ensure that
+ * our caller won't use the pool without acquiring the mutex, we must
+ * create a new sub pool.
+ */
+ rv = apr_pool_create(&me->pool, pool);
+ if (APR_SUCCESS != rv) {
+ return rv;
+ }
+ /* Create the mutex on the parent pool such that it's always alive from
+ * apr_thread_pool_{push,schedule,top}() callers.
+ */
+ rv = apr_thread_mutex_create(&me->lock, APR_THREAD_MUTEX_NESTED, pool);
+ if (APR_SUCCESS != rv) {
+ return rv;
+ }
+ rv = apr_thread_cond_create(&me->more_work, me->pool);
+ if (APR_SUCCESS != rv) {
+ apr_thread_mutex_destroy(me->lock);
+ return rv;
+ }
+ rv = apr_thread_cond_create(&me->work_done, me->pool);
+ if (APR_SUCCESS != rv) {
+ apr_thread_cond_destroy(me->more_work);
+ apr_thread_mutex_destroy(me->lock);
+ return rv;
+ }
+ rv = apr_thread_cond_create(&me->all_done, me->pool);
+ if (APR_SUCCESS != rv) {
+ apr_thread_cond_destroy(me->work_done);
+ apr_thread_cond_destroy(me->more_work);
+ apr_thread_mutex_destroy(me->lock);
+ return rv;
+ }
+ me->tasks = apr_palloc(me->pool, sizeof(*me->tasks));
+ if (!me->tasks) {
+ goto CATCH_ENOMEM;
+ }
+ APR_RING_INIT(me->tasks, apr_thread_pool_task, link);
+ me->scheduled_tasks = apr_palloc(me->pool, sizeof(*me->scheduled_tasks));
+ if (!me->scheduled_tasks) {
+ goto CATCH_ENOMEM;
+ }
+ APR_RING_INIT(me->scheduled_tasks, apr_thread_pool_task, link);
+ me->recycled_tasks = apr_palloc(me->pool, sizeof(*me->recycled_tasks));
+ if (!me->recycled_tasks) {
+ goto CATCH_ENOMEM;
+ }
+ APR_RING_INIT(me->recycled_tasks, apr_thread_pool_task, link);
+ me->busy_thds = apr_palloc(me->pool, sizeof(*me->busy_thds));
+ if (!me->busy_thds) {
+ goto CATCH_ENOMEM;
+ }
+ APR_RING_INIT(me->busy_thds, apr_thread_list_elt, link);
+ me->idle_thds = apr_palloc(me->pool, sizeof(*me->idle_thds));
+ if (!me->idle_thds) {
+ goto CATCH_ENOMEM;
+ }
+ APR_RING_INIT(me->idle_thds, apr_thread_list_elt, link);
+ me->dead_thds = apr_palloc(me->pool, sizeof(*me->dead_thds));
+ if (!me->dead_thds) {
+ goto CATCH_ENOMEM;
+ }
+ APR_RING_INIT(me->dead_thds, apr_thread_list_elt, link);
+ me->recycled_thds = apr_palloc(me->pool, sizeof(*me->recycled_thds));
+ if (!me->recycled_thds) {
+ goto CATCH_ENOMEM;
+ }
+ APR_RING_INIT(me->recycled_thds, apr_thread_list_elt, link);
+ goto FINAL_EXIT;
+ CATCH_ENOMEM:
+ rv = APR_ENOMEM;
+ apr_thread_cond_destroy(me->all_done);
+ apr_thread_cond_destroy(me->work_done);
+ apr_thread_cond_destroy(me->more_work);
+ apr_thread_mutex_destroy(me->lock);
+ FINAL_EXIT:
+ return rv;
+}
+
+/*
+ * NOTE: This function is not thread safe by itself. Caller should hold the lock
+ */
+static apr_thread_pool_task_t *pop_task(apr_thread_pool_t * me)
+{
+ apr_thread_pool_task_t *task = NULL;
+ int seg;
+
+ /* check for scheduled tasks */
+ if (me->scheduled_task_cnt > 0) {
+ task = APR_RING_FIRST(me->scheduled_tasks);
+ assert(task != NULL);
+ assert(task !=
+ APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
+ link));
+ /* if it's time */
+ if (task->dispatch.time <= apr_time_now()) {
+ --me->scheduled_task_cnt;
+ APR_RING_REMOVE(task, link);
+ return task;
+ }
+ }
+ /* check for normal tasks if we're not returning a scheduled task */
+ if (me->task_cnt == 0) {
+ return NULL;
+ }
+
+ task = APR_RING_FIRST(me->tasks);
+ assert(task != NULL);
+ assert(task != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link));
+ --me->task_cnt;
+ seg = TASK_PRIORITY_SEG(task);
+ if (task == me->task_idx[seg]) {
+ me->task_idx[seg] = APR_RING_NEXT(task, link);
+ if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
+ apr_thread_pool_task, link)
+ || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
+ me->task_idx[seg] = NULL;
+ }
+ }
+ APR_RING_REMOVE(task, link);
+ return task;
+}
+
+static apr_interval_time_t waiting_time(apr_thread_pool_t * me)
+{
+ apr_thread_pool_task_t *task = NULL;
+
+ task = APR_RING_FIRST(me->scheduled_tasks);
+ assert(task != NULL);
+ assert(task !=
+ APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
+ link));
+ return task->dispatch.time - apr_time_now();
+}
+
+/*
+ * NOTE: This function is not thread safe by itself. Caller should hold the lock
+ */
+static struct apr_thread_list_elt *elt_new(apr_thread_pool_t * me,
+ apr_thread_t * t)
+{
+ struct apr_thread_list_elt *elt;
+
+ if (APR_RING_EMPTY(me->recycled_thds, apr_thread_list_elt, link)) {
+ elt = apr_palloc(me->pool, sizeof(*elt));
+ if (NULL == elt) {
+ return NULL;
+ }
+ }
+ else {
+ elt = APR_RING_FIRST(me->recycled_thds);
+ APR_RING_REMOVE(elt, link);
+ }
+
+ APR_RING_ELEM_INIT(elt, link);
+ elt->thd = t;
+ elt->current_owner = NULL;
+ elt->signal_work_done = 0;
+ elt->state = TH_RUN;
+ return elt;
+}
+
+/*
+ * The worker thread function. Take a task from the queue and perform it if
+ * there is any. Otherwise, put itself into the idle thread list and waiting
+ * for signal to wake up.
+ * The thread terminates directly and exits when it is asked to stop, after
+ * handling its task if busy. The thread will then be in the dead_thds list
+ * and should be joined.
+ */
+static void *APR_THREAD_FUNC thread_pool_func(apr_thread_t * t, void *param)
+{
+ apr_thread_pool_t *me = param;
+ apr_thread_pool_task_t *task = NULL;
+ apr_interval_time_t wait;
+ struct apr_thread_list_elt *elt;
+
+ apr_thread_mutex_lock(me->lock);
+
+ elt = elt_new(me, t);
+ if (!elt) {
+ apr_thread_mutex_unlock(me->lock);
+ apr_thread_exit(t, APR_ENOMEM);
+ }
+
+ for (;;) {
+ /* Test if not new element, it is awakened from idle */
+ if (APR_RING_NEXT(elt, link) != elt) {
+ --me->idle_cnt;
+ APR_RING_REMOVE(elt, link);
+ }
+
+ if (elt->state != TH_STOP) {
+ ++me->busy_cnt;
+ APR_RING_INSERT_TAIL(me->busy_thds, elt,
+ apr_thread_list_elt, link);
+ do {
+ task = pop_task(me);
+ if (!task) {
+ break;
+ }
+ ++me->tasks_run;
+ elt->current_owner = task->owner;
+ apr_thread_mutex_unlock(me->lock);
+
+ /* Run the task (or drop it if terminated already) */
+ if (!me->terminated) {
+ apr_thread_data_set(task, "apr_thread_pool_task", NULL, t);
+ task->func(t, task->param);
+ }
+
+ apr_thread_mutex_lock(me->lock);
+ APR_RING_INSERT_TAIL(me->recycled_tasks, task,
+ apr_thread_pool_task, link);
+ elt->current_owner = NULL;
+ if (elt->signal_work_done) {
+ elt->signal_work_done = 0;
+ apr_thread_cond_signal(me->work_done);
+ }
+ } while (elt->state != TH_STOP);
+ APR_RING_REMOVE(elt, link);
+ --me->busy_cnt;
+ }
+ assert(NULL == elt->current_owner);
+
+ /* thread should die? */
+ if (me->terminated
+ || elt->state != TH_RUN
+ || (me->idle_cnt >= me->idle_max
+ && (me->idle_max || !me->scheduled_task_cnt)
+ && !me->idle_wait)) {
+ if ((TH_PROBATION == elt->state) && me->idle_wait)
+ ++me->thd_timed_out;
+ break;
+ }
+
+ /* busy thread become idle */
+ ++me->idle_cnt;
+ APR_RING_INSERT_TAIL(me->idle_thds, elt, apr_thread_list_elt, link);
+
+ /*
+ * If there is a scheduled task, always scheduled to perform that task.
+ * Since there is no guarantee that current idle threads are scheduled
+ * for next scheduled task.
+ */
+ if (me->scheduled_task_cnt)
+ wait = waiting_time(me);
+ else if (me->idle_cnt > me->idle_max) {
+ wait = me->idle_wait;
+ elt->state = TH_PROBATION;
+ }
+ else
+ wait = -1;
+
+ if (wait >= 0) {
+ apr_thread_cond_timedwait(me->more_work, me->lock, wait);
+ }
+ else {
+ apr_thread_cond_wait(me->more_work, me->lock);
+ }
+ }
+
+ /* Dead thread, to be joined */
+ APR_RING_INSERT_TAIL(me->dead_thds, elt, apr_thread_list_elt, link);
+ if (--me->thd_cnt == 0 && me->terminated) {
+ apr_thread_cond_signal(me->all_done);
+ }
+ apr_thread_mutex_unlock(me->lock);
+
+ apr_thread_exit(t, APR_SUCCESS);
+ return NULL; /* should not be here, safe net */
+}
+
+/* Must be locked by the caller */
+static void join_dead_threads(apr_thread_pool_t *me)
+{
+ while (!APR_RING_EMPTY(me->dead_thds, apr_thread_list_elt, link)) {
+ struct apr_thread_list_elt *elt;
+ apr_status_t status;
+
+ elt = APR_RING_FIRST(me->dead_thds);
+ APR_RING_REMOVE(elt, link);
+ apr_thread_mutex_unlock(me->lock);
+
+ apr_thread_join(&status, elt->thd);
+
+ apr_thread_mutex_lock(me->lock);
+ APR_RING_INSERT_TAIL(me->recycled_thds, elt,
+ apr_thread_list_elt, link);
+ }
+}
+
+static apr_status_t thread_pool_cleanup(void *me)
+{
+ apr_thread_pool_t *_myself = me;
+
+ _myself->terminated = 1;
+ apr_thread_pool_tasks_cancel(_myself, NULL);
+ apr_thread_pool_thread_max_set(_myself, 0);
+ apr_thread_mutex_lock(_myself->lock);
+
+ if (_myself->thd_cnt) {
+ apr_thread_cond_wait(_myself->all_done, _myself->lock);
+ }
+
+ /* All threads should be dead now, join them */
+ join_dead_threads(_myself);
+
+ apr_thread_mutex_unlock(_myself->lock);
+
+ return APR_SUCCESS;
+}
+
+APU_DECLARE(apr_status_t) apr_thread_pool_create(apr_thread_pool_t ** me,
+ apr_size_t init_threads,
+ apr_size_t max_threads,
+ apr_pool_t * pool)
+{
+ apr_thread_t *t;
+ apr_status_t rv = APR_SUCCESS;
+ apr_thread_pool_t *tp;
+
+ *me = NULL;
+
+ rv = thread_pool_construct(&tp, init_threads, max_threads, pool);
+ if (APR_SUCCESS != rv)
+ return rv;
+ apr_pool_pre_cleanup_register(tp->pool, tp, thread_pool_cleanup);
+
+ /* Grab the mutex as apr_thread_create() and thread_pool_func() will
+ * allocate from (*me)->pool. This is dangerous if there are multiple
+ * initial threads to create.
+ */
+ apr_thread_mutex_lock(tp->lock);
+ while (init_threads--) {
+ rv = apr_thread_create(&t, NULL, thread_pool_func, tp, tp->pool);
+ if (APR_SUCCESS != rv) {
+ break;
+ }
+ tp->thd_cnt++;
+ if (tp->thd_cnt > tp->thd_high) {
+ tp->thd_high = tp->thd_cnt;
+ }
+ }
+ apr_thread_mutex_unlock(tp->lock);
+
+ if (rv == APR_SUCCESS) {
+ *me = tp;
+ }
+
+ return rv;
+}
+
+APU_DECLARE(apr_status_t) apr_thread_pool_destroy(apr_thread_pool_t * me)
+{
+ /* Stop the threads before destroying me->pool: with APR <= 1.7 the
+ * threads' pools are children of me->pool and APR_POOL_DEBUG would
+ * deadlock if thread_pool_cleanup() is called while me->pool is
+ * destroyed (because of parent locking).
+ * With APR > 1.7 the threads' pools are unmanaged so there is no
+ * such issue, yet it does not hurt to stop the threads before.
+ */
+ apr_pool_cleanup_run(me->pool, me, thread_pool_cleanup);
+ apr_pool_destroy(me->pool);
+ return APR_SUCCESS;
+}
+
+/*
+ * NOTE: This function is not thread safe by itself. Caller should hold the lock
+ */
+static apr_thread_pool_task_t *task_new(apr_thread_pool_t * me,
+ apr_thread_start_t func,
+ void *param, apr_byte_t priority,
+ void *owner, apr_time_t time)
+{
+ apr_thread_pool_task_t *t;
+
+ if (APR_RING_EMPTY(me->recycled_tasks, apr_thread_pool_task, link)) {
+ t = apr_palloc(me->pool, sizeof(*t));
+ if (NULL == t) {
+ return NULL;
+ }
+ }
+ else {
+ t = APR_RING_FIRST(me->recycled_tasks);
+ APR_RING_REMOVE(t, link);
+ }
+ APR_RING_ELEM_INIT(t, link);
+
+ t->func = func;
+ t->param = param;
+ t->owner = owner;
+ if (time > 0) {
+ t->dispatch.time = apr_time_now() + time;
+ }
+ else {
+ t->dispatch.priority = priority;
+ }
+ return t;
+}
+
+/*
+ * Test it the task is the only one within the priority segment.
+ * If it is not, return the first element with same or lower priority.
+ * Otherwise, add the task into the queue and return NULL.
+ *
+ * NOTE: This function is not thread safe by itself. Caller should hold the lock
+ */
+static apr_thread_pool_task_t *add_if_empty(apr_thread_pool_t * me,
+ apr_thread_pool_task_t * const t)
+{
+ int seg;
+ int next;
+ apr_thread_pool_task_t *t_next;
+
+ seg = TASK_PRIORITY_SEG(t);
+ if (me->task_idx[seg]) {
+ assert(APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
+ me->task_idx[seg]);
+ t_next = me->task_idx[seg];
+ while (t_next->dispatch.priority > t->dispatch.priority) {
+ t_next = APR_RING_NEXT(t_next, link);
+ if (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) ==
+ t_next) {
+ return t_next;
+ }
+ }
+ return t_next;
+ }
+
+ for (next = seg - 1; next >= 0; next--) {
+ if (me->task_idx[next]) {
+ APR_RING_INSERT_BEFORE(me->task_idx[next], t, link);
+ break;
+ }
+ }
+ if (0 > next) {
+ APR_RING_INSERT_TAIL(me->tasks, t, apr_thread_pool_task, link);
+ }
+ me->task_idx[seg] = t;
+ return NULL;
+}
+
+/*
+* schedule a task to run in "time" microseconds. Find the spot in the ring where
+* the time fits. Adjust the short_time so the thread wakes up when the time is reached.
+*/
+static apr_status_t schedule_task(apr_thread_pool_t *me,
+ apr_thread_start_t func, void *param,
+ void *owner, apr_interval_time_t time)
+{
+ apr_thread_pool_task_t *t;
+ apr_thread_pool_task_t *t_loc;
+ apr_thread_t *thd;
+ apr_status_t rv = APR_SUCCESS;
+
+ apr_thread_mutex_lock(me->lock);
+
+ if (me->terminated) {
+ /* Let the caller know that we are done */
+ apr_thread_mutex_unlock(me->lock);
+ return APR_NOTFOUND;
+ }
+
+ /* Maintain dead threads */
+ join_dead_threads(me);
+
+ t = task_new(me, func, param, 0, owner, time);
+ if (NULL == t) {
+ apr_thread_mutex_unlock(me->lock);
+ return APR_ENOMEM;
+ }
+ t_loc = APR_RING_FIRST(me->scheduled_tasks);
+ while (NULL != t_loc) {
+ /* if the time is less than the entry insert ahead of it */
+ if (t->dispatch.time < t_loc->dispatch.time) {
+ ++me->scheduled_task_cnt;
+ APR_RING_INSERT_BEFORE(t_loc, t, link);
+ break;
+ }
+ else {
+ t_loc = APR_RING_NEXT(t_loc, link);
+ if (t_loc ==
+ APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
+ link)) {
+ ++me->scheduled_task_cnt;
+ APR_RING_INSERT_TAIL(me->scheduled_tasks, t,
+ apr_thread_pool_task, link);
+ break;
+ }
+ }
+ }
+ /* there should be at least one thread for scheduled tasks */
+ if (0 == me->thd_cnt) {
+ rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
+ if (APR_SUCCESS == rv) {
+ ++me->thd_cnt;
+ if (me->thd_cnt > me->thd_high)
+ me->thd_high = me->thd_cnt;
+ }
+ }
+ apr_thread_cond_signal(me->more_work);
+ apr_thread_mutex_unlock(me->lock);
+
+ return rv;
+}
+
+static apr_status_t add_task(apr_thread_pool_t *me, apr_thread_start_t func,
+ void *param, apr_byte_t priority, int push,
+ void *owner)
+{
+ apr_thread_pool_task_t *t;
+ apr_thread_pool_task_t *t_loc;
+ apr_thread_t *thd;
+ apr_status_t rv = APR_SUCCESS;
+
+ apr_thread_mutex_lock(me->lock);
+
+ if (me->terminated) {
+ /* Let the caller know that we are done */
+ apr_thread_mutex_unlock(me->lock);
+ return APR_NOTFOUND;
+ }
+
+ /* Maintain dead threads */
+ join_dead_threads(me);
+
+ t = task_new(me, func, param, priority, owner, 0);
+ if (NULL == t) {
+ apr_thread_mutex_unlock(me->lock);
+ return APR_ENOMEM;
+ }
+
+ t_loc = add_if_empty(me, t);
+ if (NULL == t_loc) {
+ goto FINAL_EXIT;
+ }
+
+ if (push) {
+ while (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
+ t_loc && t_loc->dispatch.priority >= t->dispatch.priority) {
+ t_loc = APR_RING_NEXT(t_loc, link);
+ }
+ }
+ APR_RING_INSERT_BEFORE(t_loc, t, link);
+ if (!push) {
+ if (t_loc == me->task_idx[TASK_PRIORITY_SEG(t)]) {
+ me->task_idx[TASK_PRIORITY_SEG(t)] = t;
+ }
+ }
+
+ FINAL_EXIT:
+ me->task_cnt++;
+ if (me->task_cnt > me->tasks_high)
+ me->tasks_high = me->task_cnt;
+ if (0 == me->thd_cnt || (0 == me->idle_cnt && me->thd_cnt < me->thd_max &&
+ me->task_cnt > me->threshold)) {
+ rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
+ if (APR_SUCCESS == rv) {
+ ++me->thd_cnt;
+ if (me->thd_cnt > me->thd_high)
+ me->thd_high = me->thd_cnt;
+ }
+ }
+
+ apr_thread_cond_signal(me->more_work);
+ apr_thread_mutex_unlock(me->lock);
+
+ return rv;
+}
+
+APU_DECLARE(apr_status_t) apr_thread_pool_push(apr_thread_pool_t *me,
+ apr_thread_start_t func,
+ void *param,
+ apr_byte_t priority,
+ void *owner)
+{
+ return add_task(me, func, param, priority, 1, owner);
+}
+
+APU_DECLARE(apr_status_t) apr_thread_pool_schedule(apr_thread_pool_t *me,
+ apr_thread_start_t func,
+ void *param,
+ apr_interval_time_t time,
+ void *owner)
+{
+ return schedule_task(me, func, param, owner, time);
+}
+
+APU_DECLARE(apr_status_t) apr_thread_pool_top(apr_thread_pool_t *me,
+ apr_thread_start_t func,
+ void *param,
+ apr_byte_t priority,
+ void *owner)
+{
+ return add_task(me, func, param, priority, 0, owner);
+}
+
+static apr_status_t remove_scheduled_tasks(apr_thread_pool_t *me,
+ void *owner)
+{
+ apr_thread_pool_task_t *t_loc;
+ apr_thread_pool_task_t *next;
+
+ t_loc = APR_RING_FIRST(me->scheduled_tasks);
+ while (t_loc !=
+ APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
+ link)) {
+ next = APR_RING_NEXT(t_loc, link);
+ /* if this is the owner remove it */
+ if (!owner || t_loc->owner == owner) {
+ --me->scheduled_task_cnt;
+ APR_RING_REMOVE(t_loc, link);
+ }
+ t_loc = next;
+ }
+ return APR_SUCCESS;
+}
+
+static apr_status_t remove_tasks(apr_thread_pool_t *me, void *owner)
+{
+ apr_thread_pool_task_t *t_loc;
+ apr_thread_pool_task_t *next;
+ int seg;
+
+ t_loc = APR_RING_FIRST(me->tasks);
+ while (t_loc != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link)) {
+ next = APR_RING_NEXT(t_loc, link);
+ if (!owner || t_loc->owner == owner) {
+ --me->task_cnt;
+ seg = TASK_PRIORITY_SEG(t_loc);
+ if (t_loc == me->task_idx[seg]) {
+ me->task_idx[seg] = APR_RING_NEXT(t_loc, link);
+ if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
+ apr_thread_pool_task,
+ link)
+ || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
+ me->task_idx[seg] = NULL;
+ }
+ }
+ APR_RING_REMOVE(t_loc, link);
+ }
+ t_loc = next;
+ }
+ return APR_SUCCESS;
+}
+
+/* Must be locked by the caller */
+static void wait_on_busy_threads(apr_thread_pool_t *me, void *owner)
+{
+#ifndef NDEBUG
+ apr_os_thread_t *os_thread;
+#endif
+ struct apr_thread_list_elt *elt;
+
+ elt = APR_RING_FIRST(me->busy_thds);
+ while (elt != APR_RING_SENTINEL(me->busy_thds, apr_thread_list_elt, link)) {
+ if (owner ? owner != elt->current_owner : !elt->current_owner) {
+ elt = APR_RING_NEXT(elt, link);
+ continue;
+ }
+
+#ifndef NDEBUG
+ /* make sure the thread is not the one calling tasks_cancel */
+ apr_os_thread_get(&os_thread, elt->thd);
+#ifdef WIN32
+ /* hack for apr win32 bug */
+ assert(!apr_os_thread_equal(apr_os_thread_current(), os_thread));
+#else
+ assert(!apr_os_thread_equal(apr_os_thread_current(), *os_thread));
+#endif
+#endif
+
+ elt->signal_work_done = 1;
+ apr_thread_cond_wait(me->work_done, me->lock);
+
+ /* Restart */
+ elt = APR_RING_FIRST(me->busy_thds);
+ }
+
+ /* Maintain dead threads */
+ join_dead_threads(me);
+}
+
+APU_DECLARE(apr_status_t) apr_thread_pool_tasks_cancel(apr_thread_pool_t *me,
+ void *owner)
+{
+ apr_status_t rv = APR_SUCCESS;
+
+ apr_thread_mutex_lock(me->lock);
+
+ if (me->task_cnt > 0) {
+ rv = remove_tasks(me, owner);
+ }
+ if (me->scheduled_task_cnt > 0) {
+ rv = remove_scheduled_tasks(me, owner);
+ }
+
+ wait_on_busy_threads(me, owner);
+
+ apr_thread_mutex_unlock(me->lock);
+
+ return rv;
+}
+
+APU_DECLARE(apr_size_t) apr_thread_pool_tasks_count(apr_thread_pool_t *me)
+{
+ return me->task_cnt;
+}
+
+APU_DECLARE(apr_size_t)
+ apr_thread_pool_scheduled_tasks_count(apr_thread_pool_t *me)
+{
+ return me->scheduled_task_cnt;
+}
+
+APU_DECLARE(apr_size_t) apr_thread_pool_threads_count(apr_thread_pool_t *me)
+{
+ return me->thd_cnt;
+}
+
+APU_DECLARE(apr_size_t) apr_thread_pool_busy_count(apr_thread_pool_t *me)
+{
+ return me->busy_cnt;
+}
+
+APU_DECLARE(apr_size_t) apr_thread_pool_idle_count(apr_thread_pool_t *me)
+{
+ return me->idle_cnt;
+}
+
+APU_DECLARE(apr_size_t)
+ apr_thread_pool_tasks_run_count(apr_thread_pool_t * me)
+{
+ return me->tasks_run;
+}
+
+APU_DECLARE(apr_size_t)
+ apr_thread_pool_tasks_high_count(apr_thread_pool_t * me)
+{
+ return me->tasks_high;
+}
+
+APU_DECLARE(apr_size_t)
+ apr_thread_pool_threads_high_count(apr_thread_pool_t * me)
+{
+ return me->thd_high;
+}
+
+APU_DECLARE(apr_size_t)
+ apr_thread_pool_threads_idle_timeout_count(apr_thread_pool_t * me)
+{
+ return me->thd_timed_out;
+}
+
+
+APU_DECLARE(apr_size_t) apr_thread_pool_idle_max_get(apr_thread_pool_t *me)
+{
+ return me->idle_max;
+}
+
+APU_DECLARE(apr_interval_time_t)
+ apr_thread_pool_idle_wait_get(apr_thread_pool_t * me)
+{
+ return me->idle_wait;
+}
+
+/*
+ * Stop threads above given *cnt, set the number of threads stopped in *cnt.
+ * NOTE: There could be busy threads become idle during this function
+ */
+static void stop_threads(apr_thread_pool_t *me, apr_size_t *cnt, int idle)
+{
+ struct apr_thread_list *thds;
+ struct apr_thread_list_elt *elt, *last;
+ apr_size_t n, i;
+
+ apr_thread_mutex_lock(me->lock);
+
+ if (idle) {
+ thds = me->idle_thds;
+ n = me->idle_cnt;
+ }
+ else {
+ thds = me->busy_thds;
+ n = me->busy_cnt;
+ }
+ if (n <= *cnt) {
+ apr_thread_mutex_unlock(me->lock);
+ *cnt = 0;
+ return;
+ }
+
+ elt = APR_RING_FIRST(thds);
+ last = APR_RING_LAST(thds);
+ for (i = 0; i < *cnt; ++i) {
+ elt = APR_RING_NEXT(elt, link);
+ }
+ for (; i < n; ++i) {
+ elt->state = TH_STOP;
+ if (elt == last) {
+ break;
+ }
+ elt = APR_RING_NEXT(elt, link);
+ }
+ assert(i + 1 == n);
+ *cnt -= n;
+
+ join_dead_threads(me);
+
+ apr_thread_mutex_unlock(me->lock);
+}
+
+static apr_size_t stop_idle_threads(apr_thread_pool_t *me, apr_size_t cnt)
+{
+ stop_threads(me, &cnt, 1);
+ if (cnt) {
+ apr_thread_mutex_lock(me->lock);
+ apr_thread_cond_broadcast(me->more_work);
+ apr_thread_mutex_unlock(me->lock);
+ }
+ return cnt;
+}
+
+static apr_size_t stop_busy_threads(apr_thread_pool_t *me, apr_size_t cnt)
+{
+ stop_threads(me, &cnt, 0);
+ return cnt;
+}
+
+APU_DECLARE(apr_size_t) apr_thread_pool_idle_max_set(apr_thread_pool_t *me,
+ apr_size_t cnt)
+{
+ me->idle_max = cnt;
+ return stop_idle_threads(me, cnt);
+}
+
+APU_DECLARE(apr_interval_time_t)
+ apr_thread_pool_idle_wait_set(apr_thread_pool_t * me,
+ apr_interval_time_t timeout)
+{
+ apr_interval_time_t oldtime;
+
+ oldtime = me->idle_wait;
+ me->idle_wait = timeout;
+
+ return oldtime;
+}
+
+APU_DECLARE(apr_size_t) apr_thread_pool_thread_max_get(apr_thread_pool_t *me)
+{
+ return me->thd_max;
+}
+
+/*
+ * This function stop extra working threads to the new limit.
+ * NOTE: There could be busy threads become idle during this function
+ */
+APU_DECLARE(apr_size_t) apr_thread_pool_thread_max_set(apr_thread_pool_t *me,
+ apr_size_t cnt)
+{
+ apr_size_t n, i;
+
+ me->thd_max = cnt;
+ n = me->thd_cnt;
+ if (n <= cnt) {
+ return 0;
+ }
+ n -= cnt; /* #threads to stop */
+
+ i = me->idle_cnt;
+ if (n >= i) {
+ stop_busy_threads(me, n - i);
+ n = i; /* stop all idle threads */
+ }
+ stop_idle_threads(me, i - n);
+
+ return n;
+}
+
+APU_DECLARE(apr_size_t) apr_thread_pool_threshold_get(apr_thread_pool_t *me)
+{
+ return me->threshold;
+}
+
+APU_DECLARE(apr_size_t) apr_thread_pool_threshold_set(apr_thread_pool_t *me,
+ apr_size_t val)
+{
+ apr_size_t ov;
+
+ ov = me->threshold;
+ me->threshold = val;
+ return ov;
+}
+
+APU_DECLARE(apr_status_t) apr_thread_pool_task_owner_get(apr_thread_t *thd,
+ void **owner)
+{
+ apr_status_t rv;
+ apr_thread_pool_task_t *task;
+ void *data;
+
+ rv = apr_thread_data_get(&data, "apr_thread_pool_task", thd);
+ if (rv != APR_SUCCESS) {
+ return rv;
+ }
+
+ task = data;
+ if (!task) {
+ *owner = NULL;
+ return APR_BADARG;
+ }
+
+ *owner = task->owner;
+ return APR_SUCCESS;
+}
+
+#endif /* APR_HAS_THREADS */
+
+/* vim: set ts=4 sw=4 et cin tw=80: */