diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-17 06:30:05 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-17 06:30:05 +0000 |
commit | a1e354165254cd9e346751e6c2ddc554feeb0e6d (patch) | |
tree | 5fd273cc604fd00efd630eb387a6f79ce102f4e3 /misc/apr_thread_pool.c | |
parent | Initial commit. (diff) | |
download | apr-util-a1e354165254cd9e346751e6c2ddc554feeb0e6d.tar.xz apr-util-a1e354165254cd9e346751e6c2ddc554feeb0e6d.zip |
Adding upstream version 1.6.3.upstream/1.6.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'misc/apr_thread_pool.c')
-rw-r--r-- | misc/apr_thread_pool.c | 1019 |
1 files changed, 1019 insertions, 0 deletions
diff --git a/misc/apr_thread_pool.c b/misc/apr_thread_pool.c new file mode 100644 index 0000000..5aa3b65 --- /dev/null +++ b/misc/apr_thread_pool.c @@ -0,0 +1,1019 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +#include <assert.h> +#include "apr_thread_pool.h" +#include "apr_ring.h" +#include "apr_thread_cond.h" +#include "apr_portable.h" + +#if APR_HAS_THREADS + +#define TASK_PRIORITY_SEGS 4 +#define TASK_PRIORITY_SEG(x) (((x)->dispatch.priority & 0xFF) / 64) + +typedef struct apr_thread_pool_task +{ + APR_RING_ENTRY(apr_thread_pool_task) link; + apr_thread_start_t func; + void *param; + void *owner; + union + { + apr_byte_t priority; + apr_time_t time; + } dispatch; +} apr_thread_pool_task_t; + +APR_RING_HEAD(apr_thread_pool_tasks, apr_thread_pool_task); + +struct apr_thread_list_elt +{ + APR_RING_ENTRY(apr_thread_list_elt) link; + apr_thread_t *thd; + void *current_owner; + enum { TH_RUN, TH_STOP, TH_PROBATION } state; + int signal_work_done; +}; + +APR_RING_HEAD(apr_thread_list, apr_thread_list_elt); + +struct apr_thread_pool +{ + apr_pool_t *pool; + volatile apr_size_t thd_max; + volatile apr_size_t idle_max; + volatile apr_interval_time_t idle_wait; + volatile apr_size_t thd_cnt; + volatile apr_size_t idle_cnt; + volatile apr_size_t busy_cnt; + volatile apr_size_t task_cnt; + volatile apr_size_t scheduled_task_cnt; + volatile apr_size_t threshold; + volatile apr_size_t tasks_run; + volatile apr_size_t tasks_high; + volatile apr_size_t thd_high; + volatile apr_size_t thd_timed_out; + struct apr_thread_pool_tasks *tasks; + struct apr_thread_pool_tasks *scheduled_tasks; + struct apr_thread_list *busy_thds; + struct apr_thread_list *idle_thds; + struct apr_thread_list *dead_thds; + apr_thread_cond_t *more_work; + apr_thread_cond_t *work_done; + apr_thread_cond_t *all_done; + apr_thread_mutex_t *lock; + volatile int terminated; + struct apr_thread_pool_tasks *recycled_tasks; + struct apr_thread_list *recycled_thds; + apr_thread_pool_task_t *task_idx[TASK_PRIORITY_SEGS]; +}; + +static apr_status_t thread_pool_construct(apr_thread_pool_t **tp, + apr_size_t init_threads, + apr_size_t max_threads, + apr_pool_t *pool) +{ + apr_status_t rv; + apr_thread_pool_t *me; + + me = *tp = apr_pcalloc(pool, sizeof(apr_thread_pool_t)); + me->thd_max = max_threads; + me->idle_max = init_threads; + me->threshold = init_threads / 2; + + /* This pool will be used by different threads. As we cannot ensure that + * our caller won't use the pool without acquiring the mutex, we must + * create a new sub pool. + */ + rv = apr_pool_create(&me->pool, pool); + if (APR_SUCCESS != rv) { + return rv; + } + /* Create the mutex on the parent pool such that it's always alive from + * apr_thread_pool_{push,schedule,top}() callers. + */ + rv = apr_thread_mutex_create(&me->lock, APR_THREAD_MUTEX_NESTED, pool); + if (APR_SUCCESS != rv) { + return rv; + } + rv = apr_thread_cond_create(&me->more_work, me->pool); + if (APR_SUCCESS != rv) { + apr_thread_mutex_destroy(me->lock); + return rv; + } + rv = apr_thread_cond_create(&me->work_done, me->pool); + if (APR_SUCCESS != rv) { + apr_thread_cond_destroy(me->more_work); + apr_thread_mutex_destroy(me->lock); + return rv; + } + rv = apr_thread_cond_create(&me->all_done, me->pool); + if (APR_SUCCESS != rv) { + apr_thread_cond_destroy(me->work_done); + apr_thread_cond_destroy(me->more_work); + apr_thread_mutex_destroy(me->lock); + return rv; + } + me->tasks = apr_palloc(me->pool, sizeof(*me->tasks)); + if (!me->tasks) { + goto CATCH_ENOMEM; + } + APR_RING_INIT(me->tasks, apr_thread_pool_task, link); + me->scheduled_tasks = apr_palloc(me->pool, sizeof(*me->scheduled_tasks)); + if (!me->scheduled_tasks) { + goto CATCH_ENOMEM; + } + APR_RING_INIT(me->scheduled_tasks, apr_thread_pool_task, link); + me->recycled_tasks = apr_palloc(me->pool, sizeof(*me->recycled_tasks)); + if (!me->recycled_tasks) { + goto CATCH_ENOMEM; + } + APR_RING_INIT(me->recycled_tasks, apr_thread_pool_task, link); + me->busy_thds = apr_palloc(me->pool, sizeof(*me->busy_thds)); + if (!me->busy_thds) { + goto CATCH_ENOMEM; + } + APR_RING_INIT(me->busy_thds, apr_thread_list_elt, link); + me->idle_thds = apr_palloc(me->pool, sizeof(*me->idle_thds)); + if (!me->idle_thds) { + goto CATCH_ENOMEM; + } + APR_RING_INIT(me->idle_thds, apr_thread_list_elt, link); + me->dead_thds = apr_palloc(me->pool, sizeof(*me->dead_thds)); + if (!me->dead_thds) { + goto CATCH_ENOMEM; + } + APR_RING_INIT(me->dead_thds, apr_thread_list_elt, link); + me->recycled_thds = apr_palloc(me->pool, sizeof(*me->recycled_thds)); + if (!me->recycled_thds) { + goto CATCH_ENOMEM; + } + APR_RING_INIT(me->recycled_thds, apr_thread_list_elt, link); + goto FINAL_EXIT; + CATCH_ENOMEM: + rv = APR_ENOMEM; + apr_thread_cond_destroy(me->all_done); + apr_thread_cond_destroy(me->work_done); + apr_thread_cond_destroy(me->more_work); + apr_thread_mutex_destroy(me->lock); + FINAL_EXIT: + return rv; +} + +/* + * NOTE: This function is not thread safe by itself. Caller should hold the lock + */ +static apr_thread_pool_task_t *pop_task(apr_thread_pool_t * me) +{ + apr_thread_pool_task_t *task = NULL; + int seg; + + /* check for scheduled tasks */ + if (me->scheduled_task_cnt > 0) { + task = APR_RING_FIRST(me->scheduled_tasks); + assert(task != NULL); + assert(task != + APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task, + link)); + /* if it's time */ + if (task->dispatch.time <= apr_time_now()) { + --me->scheduled_task_cnt; + APR_RING_REMOVE(task, link); + return task; + } + } + /* check for normal tasks if we're not returning a scheduled task */ + if (me->task_cnt == 0) { + return NULL; + } + + task = APR_RING_FIRST(me->tasks); + assert(task != NULL); + assert(task != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link)); + --me->task_cnt; + seg = TASK_PRIORITY_SEG(task); + if (task == me->task_idx[seg]) { + me->task_idx[seg] = APR_RING_NEXT(task, link); + if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks, + apr_thread_pool_task, link) + || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) { + me->task_idx[seg] = NULL; + } + } + APR_RING_REMOVE(task, link); + return task; +} + +static apr_interval_time_t waiting_time(apr_thread_pool_t * me) +{ + apr_thread_pool_task_t *task = NULL; + + task = APR_RING_FIRST(me->scheduled_tasks); + assert(task != NULL); + assert(task != + APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task, + link)); + return task->dispatch.time - apr_time_now(); +} + +/* + * NOTE: This function is not thread safe by itself. Caller should hold the lock + */ +static struct apr_thread_list_elt *elt_new(apr_thread_pool_t * me, + apr_thread_t * t) +{ + struct apr_thread_list_elt *elt; + + if (APR_RING_EMPTY(me->recycled_thds, apr_thread_list_elt, link)) { + elt = apr_palloc(me->pool, sizeof(*elt)); + if (NULL == elt) { + return NULL; + } + } + else { + elt = APR_RING_FIRST(me->recycled_thds); + APR_RING_REMOVE(elt, link); + } + + APR_RING_ELEM_INIT(elt, link); + elt->thd = t; + elt->current_owner = NULL; + elt->signal_work_done = 0; + elt->state = TH_RUN; + return elt; +} + +/* + * The worker thread function. Take a task from the queue and perform it if + * there is any. Otherwise, put itself into the idle thread list and waiting + * for signal to wake up. + * The thread terminates directly and exits when it is asked to stop, after + * handling its task if busy. The thread will then be in the dead_thds list + * and should be joined. + */ +static void *APR_THREAD_FUNC thread_pool_func(apr_thread_t * t, void *param) +{ + apr_thread_pool_t *me = param; + apr_thread_pool_task_t *task = NULL; + apr_interval_time_t wait; + struct apr_thread_list_elt *elt; + + apr_thread_mutex_lock(me->lock); + + elt = elt_new(me, t); + if (!elt) { + apr_thread_mutex_unlock(me->lock); + apr_thread_exit(t, APR_ENOMEM); + } + + for (;;) { + /* Test if not new element, it is awakened from idle */ + if (APR_RING_NEXT(elt, link) != elt) { + --me->idle_cnt; + APR_RING_REMOVE(elt, link); + } + + if (elt->state != TH_STOP) { + ++me->busy_cnt; + APR_RING_INSERT_TAIL(me->busy_thds, elt, + apr_thread_list_elt, link); + do { + task = pop_task(me); + if (!task) { + break; + } + ++me->tasks_run; + elt->current_owner = task->owner; + apr_thread_mutex_unlock(me->lock); + + /* Run the task (or drop it if terminated already) */ + if (!me->terminated) { + apr_thread_data_set(task, "apr_thread_pool_task", NULL, t); + task->func(t, task->param); + } + + apr_thread_mutex_lock(me->lock); + APR_RING_INSERT_TAIL(me->recycled_tasks, task, + apr_thread_pool_task, link); + elt->current_owner = NULL; + if (elt->signal_work_done) { + elt->signal_work_done = 0; + apr_thread_cond_signal(me->work_done); + } + } while (elt->state != TH_STOP); + APR_RING_REMOVE(elt, link); + --me->busy_cnt; + } + assert(NULL == elt->current_owner); + + /* thread should die? */ + if (me->terminated + || elt->state != TH_RUN + || (me->idle_cnt >= me->idle_max + && (me->idle_max || !me->scheduled_task_cnt) + && !me->idle_wait)) { + if ((TH_PROBATION == elt->state) && me->idle_wait) + ++me->thd_timed_out; + break; + } + + /* busy thread become idle */ + ++me->idle_cnt; + APR_RING_INSERT_TAIL(me->idle_thds, elt, apr_thread_list_elt, link); + + /* + * If there is a scheduled task, always scheduled to perform that task. + * Since there is no guarantee that current idle threads are scheduled + * for next scheduled task. + */ + if (me->scheduled_task_cnt) + wait = waiting_time(me); + else if (me->idle_cnt > me->idle_max) { + wait = me->idle_wait; + elt->state = TH_PROBATION; + } + else + wait = -1; + + if (wait >= 0) { + apr_thread_cond_timedwait(me->more_work, me->lock, wait); + } + else { + apr_thread_cond_wait(me->more_work, me->lock); + } + } + + /* Dead thread, to be joined */ + APR_RING_INSERT_TAIL(me->dead_thds, elt, apr_thread_list_elt, link); + if (--me->thd_cnt == 0 && me->terminated) { + apr_thread_cond_signal(me->all_done); + } + apr_thread_mutex_unlock(me->lock); + + apr_thread_exit(t, APR_SUCCESS); + return NULL; /* should not be here, safe net */ +} + +/* Must be locked by the caller */ +static void join_dead_threads(apr_thread_pool_t *me) +{ + while (!APR_RING_EMPTY(me->dead_thds, apr_thread_list_elt, link)) { + struct apr_thread_list_elt *elt; + apr_status_t status; + + elt = APR_RING_FIRST(me->dead_thds); + APR_RING_REMOVE(elt, link); + apr_thread_mutex_unlock(me->lock); + + apr_thread_join(&status, elt->thd); + + apr_thread_mutex_lock(me->lock); + APR_RING_INSERT_TAIL(me->recycled_thds, elt, + apr_thread_list_elt, link); + } +} + +static apr_status_t thread_pool_cleanup(void *me) +{ + apr_thread_pool_t *_myself = me; + + _myself->terminated = 1; + apr_thread_pool_tasks_cancel(_myself, NULL); + apr_thread_pool_thread_max_set(_myself, 0); + apr_thread_mutex_lock(_myself->lock); + + if (_myself->thd_cnt) { + apr_thread_cond_wait(_myself->all_done, _myself->lock); + } + + /* All threads should be dead now, join them */ + join_dead_threads(_myself); + + apr_thread_mutex_unlock(_myself->lock); + + return APR_SUCCESS; +} + +APU_DECLARE(apr_status_t) apr_thread_pool_create(apr_thread_pool_t ** me, + apr_size_t init_threads, + apr_size_t max_threads, + apr_pool_t * pool) +{ + apr_thread_t *t; + apr_status_t rv = APR_SUCCESS; + apr_thread_pool_t *tp; + + *me = NULL; + + rv = thread_pool_construct(&tp, init_threads, max_threads, pool); + if (APR_SUCCESS != rv) + return rv; + apr_pool_pre_cleanup_register(tp->pool, tp, thread_pool_cleanup); + + /* Grab the mutex as apr_thread_create() and thread_pool_func() will + * allocate from (*me)->pool. This is dangerous if there are multiple + * initial threads to create. + */ + apr_thread_mutex_lock(tp->lock); + while (init_threads--) { + rv = apr_thread_create(&t, NULL, thread_pool_func, tp, tp->pool); + if (APR_SUCCESS != rv) { + break; + } + tp->thd_cnt++; + if (tp->thd_cnt > tp->thd_high) { + tp->thd_high = tp->thd_cnt; + } + } + apr_thread_mutex_unlock(tp->lock); + + if (rv == APR_SUCCESS) { + *me = tp; + } + + return rv; +} + +APU_DECLARE(apr_status_t) apr_thread_pool_destroy(apr_thread_pool_t * me) +{ + /* Stop the threads before destroying me->pool: with APR <= 1.7 the + * threads' pools are children of me->pool and APR_POOL_DEBUG would + * deadlock if thread_pool_cleanup() is called while me->pool is + * destroyed (because of parent locking). + * With APR > 1.7 the threads' pools are unmanaged so there is no + * such issue, yet it does not hurt to stop the threads before. + */ + apr_pool_cleanup_run(me->pool, me, thread_pool_cleanup); + apr_pool_destroy(me->pool); + return APR_SUCCESS; +} + +/* + * NOTE: This function is not thread safe by itself. Caller should hold the lock + */ +static apr_thread_pool_task_t *task_new(apr_thread_pool_t * me, + apr_thread_start_t func, + void *param, apr_byte_t priority, + void *owner, apr_time_t time) +{ + apr_thread_pool_task_t *t; + + if (APR_RING_EMPTY(me->recycled_tasks, apr_thread_pool_task, link)) { + t = apr_palloc(me->pool, sizeof(*t)); + if (NULL == t) { + return NULL; + } + } + else { + t = APR_RING_FIRST(me->recycled_tasks); + APR_RING_REMOVE(t, link); + } + APR_RING_ELEM_INIT(t, link); + + t->func = func; + t->param = param; + t->owner = owner; + if (time > 0) { + t->dispatch.time = apr_time_now() + time; + } + else { + t->dispatch.priority = priority; + } + return t; +} + +/* + * Test it the task is the only one within the priority segment. + * If it is not, return the first element with same or lower priority. + * Otherwise, add the task into the queue and return NULL. + * + * NOTE: This function is not thread safe by itself. Caller should hold the lock + */ +static apr_thread_pool_task_t *add_if_empty(apr_thread_pool_t * me, + apr_thread_pool_task_t * const t) +{ + int seg; + int next; + apr_thread_pool_task_t *t_next; + + seg = TASK_PRIORITY_SEG(t); + if (me->task_idx[seg]) { + assert(APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) != + me->task_idx[seg]); + t_next = me->task_idx[seg]; + while (t_next->dispatch.priority > t->dispatch.priority) { + t_next = APR_RING_NEXT(t_next, link); + if (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) == + t_next) { + return t_next; + } + } + return t_next; + } + + for (next = seg - 1; next >= 0; next--) { + if (me->task_idx[next]) { + APR_RING_INSERT_BEFORE(me->task_idx[next], t, link); + break; + } + } + if (0 > next) { + APR_RING_INSERT_TAIL(me->tasks, t, apr_thread_pool_task, link); + } + me->task_idx[seg] = t; + return NULL; +} + +/* +* schedule a task to run in "time" microseconds. Find the spot in the ring where +* the time fits. Adjust the short_time so the thread wakes up when the time is reached. +*/ +static apr_status_t schedule_task(apr_thread_pool_t *me, + apr_thread_start_t func, void *param, + void *owner, apr_interval_time_t time) +{ + apr_thread_pool_task_t *t; + apr_thread_pool_task_t *t_loc; + apr_thread_t *thd; + apr_status_t rv = APR_SUCCESS; + + apr_thread_mutex_lock(me->lock); + + if (me->terminated) { + /* Let the caller know that we are done */ + apr_thread_mutex_unlock(me->lock); + return APR_NOTFOUND; + } + + /* Maintain dead threads */ + join_dead_threads(me); + + t = task_new(me, func, param, 0, owner, time); + if (NULL == t) { + apr_thread_mutex_unlock(me->lock); + return APR_ENOMEM; + } + t_loc = APR_RING_FIRST(me->scheduled_tasks); + while (NULL != t_loc) { + /* if the time is less than the entry insert ahead of it */ + if (t->dispatch.time < t_loc->dispatch.time) { + ++me->scheduled_task_cnt; + APR_RING_INSERT_BEFORE(t_loc, t, link); + break; + } + else { + t_loc = APR_RING_NEXT(t_loc, link); + if (t_loc == + APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task, + link)) { + ++me->scheduled_task_cnt; + APR_RING_INSERT_TAIL(me->scheduled_tasks, t, + apr_thread_pool_task, link); + break; + } + } + } + /* there should be at least one thread for scheduled tasks */ + if (0 == me->thd_cnt) { + rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool); + if (APR_SUCCESS == rv) { + ++me->thd_cnt; + if (me->thd_cnt > me->thd_high) + me->thd_high = me->thd_cnt; + } + } + apr_thread_cond_signal(me->more_work); + apr_thread_mutex_unlock(me->lock); + + return rv; +} + +static apr_status_t add_task(apr_thread_pool_t *me, apr_thread_start_t func, + void *param, apr_byte_t priority, int push, + void *owner) +{ + apr_thread_pool_task_t *t; + apr_thread_pool_task_t *t_loc; + apr_thread_t *thd; + apr_status_t rv = APR_SUCCESS; + + apr_thread_mutex_lock(me->lock); + + if (me->terminated) { + /* Let the caller know that we are done */ + apr_thread_mutex_unlock(me->lock); + return APR_NOTFOUND; + } + + /* Maintain dead threads */ + join_dead_threads(me); + + t = task_new(me, func, param, priority, owner, 0); + if (NULL == t) { + apr_thread_mutex_unlock(me->lock); + return APR_ENOMEM; + } + + t_loc = add_if_empty(me, t); + if (NULL == t_loc) { + goto FINAL_EXIT; + } + + if (push) { + while (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) != + t_loc && t_loc->dispatch.priority >= t->dispatch.priority) { + t_loc = APR_RING_NEXT(t_loc, link); + } + } + APR_RING_INSERT_BEFORE(t_loc, t, link); + if (!push) { + if (t_loc == me->task_idx[TASK_PRIORITY_SEG(t)]) { + me->task_idx[TASK_PRIORITY_SEG(t)] = t; + } + } + + FINAL_EXIT: + me->task_cnt++; + if (me->task_cnt > me->tasks_high) + me->tasks_high = me->task_cnt; + if (0 == me->thd_cnt || (0 == me->idle_cnt && me->thd_cnt < me->thd_max && + me->task_cnt > me->threshold)) { + rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool); + if (APR_SUCCESS == rv) { + ++me->thd_cnt; + if (me->thd_cnt > me->thd_high) + me->thd_high = me->thd_cnt; + } + } + + apr_thread_cond_signal(me->more_work); + apr_thread_mutex_unlock(me->lock); + + return rv; +} + +APU_DECLARE(apr_status_t) apr_thread_pool_push(apr_thread_pool_t *me, + apr_thread_start_t func, + void *param, + apr_byte_t priority, + void *owner) +{ + return add_task(me, func, param, priority, 1, owner); +} + +APU_DECLARE(apr_status_t) apr_thread_pool_schedule(apr_thread_pool_t *me, + apr_thread_start_t func, + void *param, + apr_interval_time_t time, + void *owner) +{ + return schedule_task(me, func, param, owner, time); +} + +APU_DECLARE(apr_status_t) apr_thread_pool_top(apr_thread_pool_t *me, + apr_thread_start_t func, + void *param, + apr_byte_t priority, + void *owner) +{ + return add_task(me, func, param, priority, 0, owner); +} + +static apr_status_t remove_scheduled_tasks(apr_thread_pool_t *me, + void *owner) +{ + apr_thread_pool_task_t *t_loc; + apr_thread_pool_task_t *next; + + t_loc = APR_RING_FIRST(me->scheduled_tasks); + while (t_loc != + APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task, + link)) { + next = APR_RING_NEXT(t_loc, link); + /* if this is the owner remove it */ + if (!owner || t_loc->owner == owner) { + --me->scheduled_task_cnt; + APR_RING_REMOVE(t_loc, link); + } + t_loc = next; + } + return APR_SUCCESS; +} + +static apr_status_t remove_tasks(apr_thread_pool_t *me, void *owner) +{ + apr_thread_pool_task_t *t_loc; + apr_thread_pool_task_t *next; + int seg; + + t_loc = APR_RING_FIRST(me->tasks); + while (t_loc != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link)) { + next = APR_RING_NEXT(t_loc, link); + if (!owner || t_loc->owner == owner) { + --me->task_cnt; + seg = TASK_PRIORITY_SEG(t_loc); + if (t_loc == me->task_idx[seg]) { + me->task_idx[seg] = APR_RING_NEXT(t_loc, link); + if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks, + apr_thread_pool_task, + link) + || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) { + me->task_idx[seg] = NULL; + } + } + APR_RING_REMOVE(t_loc, link); + } + t_loc = next; + } + return APR_SUCCESS; +} + +/* Must be locked by the caller */ +static void wait_on_busy_threads(apr_thread_pool_t *me, void *owner) +{ +#ifndef NDEBUG + apr_os_thread_t *os_thread; +#endif + struct apr_thread_list_elt *elt; + + elt = APR_RING_FIRST(me->busy_thds); + while (elt != APR_RING_SENTINEL(me->busy_thds, apr_thread_list_elt, link)) { + if (owner ? owner != elt->current_owner : !elt->current_owner) { + elt = APR_RING_NEXT(elt, link); + continue; + } + +#ifndef NDEBUG + /* make sure the thread is not the one calling tasks_cancel */ + apr_os_thread_get(&os_thread, elt->thd); +#ifdef WIN32 + /* hack for apr win32 bug */ + assert(!apr_os_thread_equal(apr_os_thread_current(), os_thread)); +#else + assert(!apr_os_thread_equal(apr_os_thread_current(), *os_thread)); +#endif +#endif + + elt->signal_work_done = 1; + apr_thread_cond_wait(me->work_done, me->lock); + + /* Restart */ + elt = APR_RING_FIRST(me->busy_thds); + } + + /* Maintain dead threads */ + join_dead_threads(me); +} + +APU_DECLARE(apr_status_t) apr_thread_pool_tasks_cancel(apr_thread_pool_t *me, + void *owner) +{ + apr_status_t rv = APR_SUCCESS; + + apr_thread_mutex_lock(me->lock); + + if (me->task_cnt > 0) { + rv = remove_tasks(me, owner); + } + if (me->scheduled_task_cnt > 0) { + rv = remove_scheduled_tasks(me, owner); + } + + wait_on_busy_threads(me, owner); + + apr_thread_mutex_unlock(me->lock); + + return rv; +} + +APU_DECLARE(apr_size_t) apr_thread_pool_tasks_count(apr_thread_pool_t *me) +{ + return me->task_cnt; +} + +APU_DECLARE(apr_size_t) + apr_thread_pool_scheduled_tasks_count(apr_thread_pool_t *me) +{ + return me->scheduled_task_cnt; +} + +APU_DECLARE(apr_size_t) apr_thread_pool_threads_count(apr_thread_pool_t *me) +{ + return me->thd_cnt; +} + +APU_DECLARE(apr_size_t) apr_thread_pool_busy_count(apr_thread_pool_t *me) +{ + return me->busy_cnt; +} + +APU_DECLARE(apr_size_t) apr_thread_pool_idle_count(apr_thread_pool_t *me) +{ + return me->idle_cnt; +} + +APU_DECLARE(apr_size_t) + apr_thread_pool_tasks_run_count(apr_thread_pool_t * me) +{ + return me->tasks_run; +} + +APU_DECLARE(apr_size_t) + apr_thread_pool_tasks_high_count(apr_thread_pool_t * me) +{ + return me->tasks_high; +} + +APU_DECLARE(apr_size_t) + apr_thread_pool_threads_high_count(apr_thread_pool_t * me) +{ + return me->thd_high; +} + +APU_DECLARE(apr_size_t) + apr_thread_pool_threads_idle_timeout_count(apr_thread_pool_t * me) +{ + return me->thd_timed_out; +} + + +APU_DECLARE(apr_size_t) apr_thread_pool_idle_max_get(apr_thread_pool_t *me) +{ + return me->idle_max; +} + +APU_DECLARE(apr_interval_time_t) + apr_thread_pool_idle_wait_get(apr_thread_pool_t * me) +{ + return me->idle_wait; +} + +/* + * Stop threads above given *cnt, set the number of threads stopped in *cnt. + * NOTE: There could be busy threads become idle during this function + */ +static void stop_threads(apr_thread_pool_t *me, apr_size_t *cnt, int idle) +{ + struct apr_thread_list *thds; + struct apr_thread_list_elt *elt, *last; + apr_size_t n, i; + + apr_thread_mutex_lock(me->lock); + + if (idle) { + thds = me->idle_thds; + n = me->idle_cnt; + } + else { + thds = me->busy_thds; + n = me->busy_cnt; + } + if (n <= *cnt) { + apr_thread_mutex_unlock(me->lock); + *cnt = 0; + return; + } + + elt = APR_RING_FIRST(thds); + last = APR_RING_LAST(thds); + for (i = 0; i < *cnt; ++i) { + elt = APR_RING_NEXT(elt, link); + } + for (; i < n; ++i) { + elt->state = TH_STOP; + if (elt == last) { + break; + } + elt = APR_RING_NEXT(elt, link); + } + assert(i + 1 == n); + *cnt -= n; + + join_dead_threads(me); + + apr_thread_mutex_unlock(me->lock); +} + +static apr_size_t stop_idle_threads(apr_thread_pool_t *me, apr_size_t cnt) +{ + stop_threads(me, &cnt, 1); + if (cnt) { + apr_thread_mutex_lock(me->lock); + apr_thread_cond_broadcast(me->more_work); + apr_thread_mutex_unlock(me->lock); + } + return cnt; +} + +static apr_size_t stop_busy_threads(apr_thread_pool_t *me, apr_size_t cnt) +{ + stop_threads(me, &cnt, 0); + return cnt; +} + +APU_DECLARE(apr_size_t) apr_thread_pool_idle_max_set(apr_thread_pool_t *me, + apr_size_t cnt) +{ + me->idle_max = cnt; + return stop_idle_threads(me, cnt); +} + +APU_DECLARE(apr_interval_time_t) + apr_thread_pool_idle_wait_set(apr_thread_pool_t * me, + apr_interval_time_t timeout) +{ + apr_interval_time_t oldtime; + + oldtime = me->idle_wait; + me->idle_wait = timeout; + + return oldtime; +} + +APU_DECLARE(apr_size_t) apr_thread_pool_thread_max_get(apr_thread_pool_t *me) +{ + return me->thd_max; +} + +/* + * This function stop extra working threads to the new limit. + * NOTE: There could be busy threads become idle during this function + */ +APU_DECLARE(apr_size_t) apr_thread_pool_thread_max_set(apr_thread_pool_t *me, + apr_size_t cnt) +{ + apr_size_t n, i; + + me->thd_max = cnt; + n = me->thd_cnt; + if (n <= cnt) { + return 0; + } + n -= cnt; /* #threads to stop */ + + i = me->idle_cnt; + if (n >= i) { + stop_busy_threads(me, n - i); + n = i; /* stop all idle threads */ + } + stop_idle_threads(me, i - n); + + return n; +} + +APU_DECLARE(apr_size_t) apr_thread_pool_threshold_get(apr_thread_pool_t *me) +{ + return me->threshold; +} + +APU_DECLARE(apr_size_t) apr_thread_pool_threshold_set(apr_thread_pool_t *me, + apr_size_t val) +{ + apr_size_t ov; + + ov = me->threshold; + me->threshold = val; + return ov; +} + +APU_DECLARE(apr_status_t) apr_thread_pool_task_owner_get(apr_thread_t *thd, + void **owner) +{ + apr_status_t rv; + apr_thread_pool_task_t *task; + void *data; + + rv = apr_thread_data_get(&data, "apr_thread_pool_task", thd); + if (rv != APR_SUCCESS) { + return rv; + } + + task = data; + if (!task) { + *owner = NULL; + return APR_BADARG; + } + + *owner = task->owner; + return APR_SUCCESS; +} + +#endif /* APR_HAS_THREADS */ + +/* vim: set ts=4 sw=4 et cin tw=80: */ |