diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 12:24:36 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 12:24:36 +0000 |
commit | 06eaf7232e9a920468c0f8d74dcf2fe8b555501c (patch) | |
tree | e2c7b5777f728320e5b5542b6213fd3591ba51e2 /tpool/tpool_generic.cc | |
parent | Initial commit. (diff) | |
download | mariadb-06eaf7232e9a920468c0f8d74dcf2fe8b555501c.tar.xz mariadb-06eaf7232e9a920468c0f8d74dcf2fe8b555501c.zip |
Adding upstream version 1:10.11.6.upstream/1%10.11.6
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'tpool/tpool_generic.cc')
-rw-r--r-- | tpool/tpool_generic.cc | 965 |
1 files changed, 965 insertions, 0 deletions
diff --git a/tpool/tpool_generic.cc b/tpool/tpool_generic.cc new file mode 100644 index 00000000..fd97b446 --- /dev/null +++ b/tpool/tpool_generic.cc @@ -0,0 +1,965 @@ +/* Copyright (C) 2019, 2022, MariaDB Corporation. + +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#include "tpool_structs.h" +#include <limits.h> +#include <algorithm> +#include <assert.h> +#include <atomic> +#include <chrono> +#include <condition_variable> +#include <iostream> +#include <limits.h> +#include <mutex> +#include <queue> +#include <stack> +#include <thread> +#include <vector> +#include "tpool.h" +#include <assert.h> +#include <my_global.h> +#include <my_dbug.h> +#include <thr_timer.h> +#include <stdlib.h> +#include "aligned.h" + +namespace tpool +{ + +#ifdef __linux__ +#if defined(HAVE_URING) || defined(LINUX_NATIVE_AIO) + extern aio* create_linux_aio(thread_pool* tp, int max_io); +#else + aio *create_linux_aio(thread_pool *, int) { return nullptr; }; +#endif +#endif +#ifdef _WIN32 + extern aio* create_win_aio(thread_pool* tp, int max_io); +#endif + + static const std::chrono::milliseconds LONG_TASK_DURATION = std::chrono::milliseconds(500); + static const int OVERSUBSCRIBE_FACTOR = 2; + +/** + Process the cb synchronously +*/ +void aio::synchronous(aiocb *cb) +{ +#ifdef _WIN32 + size_t ret_len; +#else + ssize_t ret_len; +#endif + int err= 0; + switch (cb->m_opcode) + { + case aio_opcode::AIO_PREAD: + ret_len= pread(cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset); + break; + case aio_opcode::AIO_PWRITE: + ret_len= pwrite(cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset); + break; + default: + abort(); + } +#ifdef _WIN32 + if (static_cast<int>(ret_len) < 0) + err= GetLastError(); +#else + if (ret_len < 0) + { + err= errno; + ret_len= 0; + } +#endif + cb->m_ret_len = ret_len; + cb->m_err = err; + if (ret_len) + finish_synchronous(cb); +} + + +/** + Implementation of generic threadpool. + This threadpool consists of the following components + + - The task queue. This queue is populated by submit() + - Worker that execute the work items. + - Timer thread that takes care of pool health + + The task queue is populated by submit() method. + on submit(), a worker thread can be woken, or created + to execute tasks. + + The timer thread watches if work items are being dequeued, and if not, + this can indicate potential deadlock. + Thus the timer thread can also wake or create a thread, to ensure some progress. + + Optimizations: + + - worker threads that are idle for long time will shutdown. + - worker threads are woken in LIFO order, which minimizes context switching + and also ensures that idle timeout works well. LIFO wakeup order ensures + that active threads stay active, and idle ones stay idle. + +*/ + +/** + Worker wakeup flags. +*/ +enum worker_wake_reason +{ + WAKE_REASON_NONE, + WAKE_REASON_TASK, + WAKE_REASON_SHUTDOWN +}; + + + +/* A per-worker thread structure.*/ +struct alignas(CPU_LEVEL1_DCACHE_LINESIZE) worker_data +{ + /** Condition variable to wakeup this worker.*/ + std::condition_variable m_cv; + + /** Reason why worker was woken. */ + worker_wake_reason m_wake_reason; + + /** + If worker wakes up with WAKE_REASON_TASK, this the task it needs to execute. + */ + task* m_task; + + /** Struct is member of intrusive doubly linked list */ + worker_data* m_prev; + worker_data* m_next; + + /* Current state of the worker.*/ + enum state + { + NONE = 0, + EXECUTING_TASK = 1, + LONG_TASK = 2, + WAITING = 4 + }; + + int m_state; + + bool is_executing_task() + { + return m_state & EXECUTING_TASK; + } + bool is_long_task() + { + return m_state & LONG_TASK; + } + bool is_waiting() + { + return m_state & WAITING; + } + std::chrono::system_clock::time_point m_task_start_time; + worker_data() : + m_cv(), + m_wake_reason(WAKE_REASON_NONE), + m_task(), + m_prev(), + m_next(), + m_state(NONE), + m_task_start_time() + {} + + /*Define custom new/delete because of overaligned structure. */ + static void *operator new(size_t size) + { + return aligned_malloc(size, CPU_LEVEL1_DCACHE_LINESIZE); + } + static void operator delete(void* p) + { + aligned_free(p); + } +}; + + +static thread_local worker_data* tls_worker_data; + +class thread_pool_generic : public thread_pool +{ + /** Cache for per-worker structures */ + cache<worker_data> m_thread_data_cache; + + /** The task queue */ + circular_queue<task*> m_task_queue; + + /** List of standby (idle) workers */ + doubly_linked_list<worker_data> m_standby_threads; + + /** List of threads that are executing tasks */ + doubly_linked_list<worker_data> m_active_threads; + + /* Mutex that protects the whole struct, most importantly + the standby threads list, and task queue */ + std::mutex m_mtx; + + /** Timeout after which idle worker shuts down */ + std::chrono::milliseconds m_thread_timeout; + + /** How often should timer wakeup.*/ + std::chrono::milliseconds m_timer_interval; + + /** Another condition variable, used in pool shutdown */ + std::condition_variable m_cv_no_threads; + + /** Condition variable for the timer thread. Signaled on shutdown. */ + std::condition_variable m_cv_timer; + + /** Overall number of enqueues*/ + unsigned long long m_tasks_enqueued; + unsigned long long m_group_enqueued; + /** Overall number of dequeued tasks. */ + unsigned long long m_tasks_dequeued; + + /** Statistic related, number of worker thread wakeups */ + int m_wakeups; + + /** + Statistic related, number of spurious thread wakeups + (i.e thread woke up, and the task queue is empty) + */ + int m_spurious_wakeups; + + /** The desired concurrency. This number of workers should be + actively executing. */ + unsigned int m_concurrency; + + /** True, if threadpool is being shutdown, false otherwise */ + bool m_in_shutdown; + + /** Maintenance timer state : true = active(ON),false = inactive(OFF)*/ + enum class timer_state_t + { + OFF, ON + }; + timer_state_t m_timer_state= timer_state_t::OFF; + void switch_timer(timer_state_t state); + + /* Updates idle_since, and maybe switches the timer off */ + void check_idle(std::chrono::system_clock::time_point now); + + /** time point when timer last ran, used as a coarse clock. */ + std::chrono::system_clock::time_point m_timestamp; + + /** Number of long running tasks. The long running tasks are excluded when + adjusting concurrency */ + unsigned int m_long_tasks_count; + + unsigned int m_waiting_task_count; + + /** Last time thread was created*/ + std::chrono::system_clock::time_point m_last_thread_creation; + + /** Minimumum number of threads in this pool.*/ + unsigned int m_min_threads; + + /** Maximimum number of threads in this pool. */ + unsigned int m_max_threads; + + /* maintenance related statistics (see maintenance()) */ + size_t m_last_thread_count; + unsigned long long m_last_activity; + std::atomic_flag m_thread_creation_pending= ATOMIC_FLAG_INIT; + + void worker_main(worker_data *thread_data); + void worker_end(worker_data* thread_data); + + /* Checks threadpool responsiveness, adjusts thread_counts */ + void maintenance(); + static void maintenance_func(void* arg) + { + ((thread_pool_generic *)arg)->maintenance(); + } + bool add_thread(); + bool wake(worker_wake_reason reason, task *t = nullptr); + void maybe_wake_or_create_thread(); + bool too_many_active_threads(); + bool get_task(worker_data *thread_var, task **t); + bool wait_for_tasks(std::unique_lock<std::mutex> &lk, + worker_data *thread_var); + void cancel_pending(task* t); + + size_t thread_count() + { + return m_active_threads.size() + m_standby_threads.size(); + } +public: + thread_pool_generic(int min_threads, int max_threads); + ~thread_pool_generic(); + void wait_begin() override; + void wait_end() override; + void submit_task(task *task) override; + virtual aio *create_native_aio(int max_io) override + { +#ifdef _WIN32 + return create_win_aio(this, max_io); +#elif defined(__linux__) + return create_linux_aio(this,max_io); +#else + return nullptr; +#endif + } + + class timer_generic : public thr_timer_t, public timer + { + thread_pool_generic* m_pool; + waitable_task m_task; + callback_func m_callback; + void* m_data; + int m_period; + std::mutex m_mtx; + bool m_on; + std::atomic<int> m_running; + + void run() + { + /* + In rare cases, multiple callbacks can be scheduled, + at the same time,. e.g with set_time(0,0) in a loop. + We do not allow parallel execution, since it is against the expectations. + */ + if (m_running.fetch_add(1, std::memory_order_acquire) > 0) + return; + do + { + m_callback(m_data); + } + while (m_running.fetch_sub(1, std::memory_order_release) != 1); + + if (m_pool && m_period) + { + std::unique_lock<std::mutex> lk(m_mtx); + if (m_on) + { + DBUG_PUSH_EMPTY; + thr_timer_end(this); + thr_timer_settime(this, 1000ULL * m_period); + DBUG_POP_EMPTY; + } + } + } + + static void execute(void* arg) + { + auto timer = (timer_generic*)arg; + timer->run(); + } + + static void submit_task(void* arg) + { + timer_generic* timer = (timer_generic*)arg; + timer->m_pool->submit_task(&timer->m_task); + } + + public: + timer_generic(callback_func func, void* data, thread_pool_generic * pool): + m_pool(pool), + m_task(timer_generic::execute,this), + m_callback(func),m_data(data),m_period(0),m_mtx(), + m_on(true),m_running() + { + if (pool) + { + /* EXecute callback in threadpool*/ + thr_timer_init(this, submit_task, this); + } + else + { + /* run in "timer" thread */ + thr_timer_init(this, m_task.get_func(), m_task.get_arg()); + } + } + + void set_time(int initial_delay_ms, int period_ms) override + { + std::unique_lock<std::mutex> lk(m_mtx); + if (!m_on) + return; + thr_timer_end(this); + if (!m_pool) + thr_timer_set_period(this, 1000ULL * period_ms); + else + m_period = period_ms; + thr_timer_settime(this, 1000ULL * initial_delay_ms); + } + + /* + Change only period of a periodic timer + (after the next execution). Workarounds + mysys timer deadlocks + */ + void set_period(int period_ms) + { + std::unique_lock<std::mutex> lk(m_mtx); + if (!m_on) + return; + if (!m_pool) + thr_timer_set_period(this, 1000ULL * period_ms); + else + m_period = period_ms; + } + + void disarm() override + { + std::unique_lock<std::mutex> lk(m_mtx); + m_on = false; + thr_timer_end(this); + lk.unlock(); + + if (m_task.m_group) + { + m_task.m_group->cancel_pending(&m_task); + } + if (m_pool) + { + m_pool->cancel_pending(&m_task); + } + m_task.wait(); + } + + virtual ~timer_generic() + { + disarm(); + } + }; + timer_generic m_maintenance_timer; + virtual timer* create_timer(callback_func func, void *data) override + { + return new timer_generic(func, data, this); + } + void set_concurrency(unsigned int concurrency=0) override; +}; + +void thread_pool_generic::cancel_pending(task* t) +{ + std::unique_lock <std::mutex> lk(m_mtx); + for (auto it = m_task_queue.begin(); it != m_task_queue.end(); it++) + { + if (*it == t) + { + t->release(); + *it = nullptr; + } + } +} +/** + Register worker in standby list, and wait to be woken. + + @retval true if thread was woken + @retval false idle wait timeout exceeded (the current thread must shutdown) +*/ +bool thread_pool_generic::wait_for_tasks(std::unique_lock<std::mutex> &lk, + worker_data *thread_data) +{ + assert(m_task_queue.empty()); + assert(!m_in_shutdown); + + thread_data->m_wake_reason= WAKE_REASON_NONE; + m_active_threads.erase(thread_data); + m_standby_threads.push_back(thread_data); + + for (;;) + { + thread_data->m_cv.wait_for(lk, m_thread_timeout); + + if (thread_data->m_wake_reason != WAKE_REASON_NONE) + { + /* Woke up not due to timeout.*/ + return true; + } + + if (thread_count() <= m_min_threads) + { + /* Do not shutdown thread, maintain required minimum of worker + threads.*/ + continue; + } + + /* + Woke up due to timeout, remove this thread's from the standby list. In + all other cases where it is signaled it is removed by the signaling + thread. + */ + m_standby_threads.erase(thread_data); + m_active_threads.push_back(thread_data); + return false; + } +} + + +/** + Workers "get next task" routine. + + A task can be handed over to the current thread directly during submit(). + if thread_var->m_wake_reason == WAKE_REASON_TASK. + + Or a task can be taken from the task queue. + In case task queue is empty, the worker thread will park (wait for wakeup). +*/ +bool thread_pool_generic::get_task(worker_data *thread_var, task **t) +{ + std::unique_lock<std::mutex> lk(m_mtx); + + if (thread_var->is_long_task()) + { + DBUG_ASSERT(m_long_tasks_count); + m_long_tasks_count--; + } + DBUG_ASSERT(!thread_var->is_waiting()); + thread_var->m_state = worker_data::NONE; + + while (m_task_queue.empty()) + { + if (m_in_shutdown) + return false; + + if (!wait_for_tasks(lk, thread_var)) + return false; + if (m_task_queue.empty()) + { + m_spurious_wakeups++; + continue; + } + } + + /* Dequeue from the task queue.*/ + *t= m_task_queue.front(); + m_task_queue.pop(); + m_tasks_dequeued++; + thread_var->m_state |= worker_data::EXECUTING_TASK; + thread_var->m_task_start_time = m_timestamp; + return true; +} + +/** Worker thread shutdown routine. */ +void thread_pool_generic::worker_end(worker_data* thread_data) +{ + std::lock_guard<std::mutex> lk(m_mtx); + DBUG_ASSERT(!thread_data->is_long_task()); + m_active_threads.erase(thread_data); + m_thread_data_cache.put(thread_data); + + if (!thread_count() && m_in_shutdown) + { + /* Signal the destructor that no more threads are left. */ + m_cv_no_threads.notify_all(); + } +} + +extern "C" void set_tls_pool(tpool::thread_pool* pool); + +/* The worker get/execute task loop.*/ +void thread_pool_generic::worker_main(worker_data *thread_var) +{ + task* task; + set_tls_pool(this); + if(m_worker_init_callback) + m_worker_init_callback(); + + tls_worker_data = thread_var; + m_thread_creation_pending.clear(); + + while (get_task(thread_var, &task) && task) + { + task->execute(); + } + + if (m_worker_destroy_callback) + m_worker_destroy_callback(); + + worker_end(thread_var); +} + + +/* + Check if threadpool had been idle for a while + Switch off maintenance timer if it is in idle state + for too long. + + Helper function, to be used inside maintenance callback, + before m_last_activity is updated +*/ + +static const auto invalid_timestamp= std::chrono::system_clock::time_point::max(); +constexpr auto max_idle_time= std::chrono::minutes(1); + +/* Time since maintenance timer had nothing to do */ +static std::chrono::system_clock::time_point idle_since= invalid_timestamp; +void thread_pool_generic::check_idle(std::chrono::system_clock::time_point now) +{ + DBUG_ASSERT(m_task_queue.empty()); + + /* + We think that there is no activity, if there were at most 2 tasks + since last time, and there is a spare thread. + The 2 tasks (and not 0) is to account for some periodic timers. + */ + bool idle= m_standby_threads.m_count > 0; + + if (!idle) + { + idle_since= invalid_timestamp; + return; + } + + if (idle_since == invalid_timestamp) + { + idle_since= now; + return; + } + + /* Switch timer off after 1 minute of idle time */ + if (now - idle_since > max_idle_time) + { + idle_since= invalid_timestamp; + switch_timer(timer_state_t::OFF); + } +} + + +/* + Periodic job to fix thread count and concurrency, + in case of long tasks, etc +*/ +void thread_pool_generic::maintenance() +{ + /* + If pool is busy (i.e the its mutex is currently locked), we can + skip the maintenance task, some times, to lower mutex contention + */ + static int skip_counter; + const int MAX_SKIPS = 10; + std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock); + if (skip_counter == MAX_SKIPS) + { + lk.lock(); + } + else if (!lk.try_lock()) + { + skip_counter++; + return; + } + + skip_counter = 0; + + m_timestamp = std::chrono::system_clock::now(); + + if (m_task_queue.empty()) + { + check_idle(m_timestamp); + m_last_activity = m_tasks_dequeued + m_wakeups; + return; + } + + m_long_tasks_count = 0; + for (auto thread_data = m_active_threads.front(); + thread_data; + thread_data = thread_data->m_next) + { + if (thread_data->is_executing_task() && + !thread_data->is_waiting() && + (thread_data->is_long_task() + || (m_timestamp - thread_data->m_task_start_time > LONG_TASK_DURATION))) + { + thread_data->m_state |= worker_data::LONG_TASK; + m_long_tasks_count++; + } + } + + maybe_wake_or_create_thread(); + + size_t thread_cnt = (int)thread_count(); + if (m_last_activity == m_tasks_dequeued + m_wakeups && + m_last_thread_count <= thread_cnt && m_active_threads.size() == thread_cnt) + { + // no progress made since last iteration. create new + // thread + add_thread(); + } + m_last_activity = m_tasks_dequeued + m_wakeups; + m_last_thread_count= thread_cnt; +} + +/* + Heuristic used for thread creation throttling. + Returns interval in milliseconds between thread creation + (depending on number of threads already in the pool, and + desired concurrency level) +*/ +static int throttling_interval_ms(size_t n_threads,size_t concurrency) +{ + if (n_threads < concurrency*4) + return 0; + + if (n_threads < concurrency*8) + return 50; + + if (n_threads < concurrency*16) + return 100; + + return 200; +} + +/* Create a new worker.*/ +bool thread_pool_generic::add_thread() +{ + if (m_thread_creation_pending.test_and_set()) + return false; + + size_t n_threads = thread_count(); + + if (n_threads >= m_max_threads) + return false; + if (n_threads >= m_min_threads) + { + auto now = std::chrono::system_clock::now(); + if (now - m_last_thread_creation < + std::chrono::milliseconds(throttling_interval_ms(n_threads, m_concurrency))) + { + /* + Throttle thread creation and wakeup deadlock detection timer, + if is it off. + */ + switch_timer(timer_state_t::ON); + + return false; + } + } + + worker_data *thread_data = m_thread_data_cache.get(); + m_active_threads.push_back(thread_data); + try + { + std::thread thread(&thread_pool_generic::worker_main, this, thread_data); + m_last_thread_creation = std::chrono::system_clock::now(); + thread.detach(); + } + catch (std::system_error& e) + { + m_active_threads.erase(thread_data); + m_thread_data_cache.put(thread_data); + static bool warning_written; + if (!warning_written) + { + fprintf(stderr, "Warning : threadpool thread could not be created :%s," + "current number of threads in pool %zu\n", e.what(), thread_count()); + warning_written = true; + } + return false; + } + return true; +} + +/** Wake a standby thread, and hand the given task over to this thread. */ +bool thread_pool_generic::wake(worker_wake_reason reason, task *) +{ + assert(reason != WAKE_REASON_NONE); + + if (m_standby_threads.empty()) + return false; + auto var= m_standby_threads.back(); + m_standby_threads.pop_back(); + m_active_threads.push_back(var); + assert(var->m_wake_reason == WAKE_REASON_NONE); + var->m_wake_reason= reason; + var->m_cv.notify_one(); + m_wakeups++; + return true; +} + + +thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) : + m_thread_data_cache(max_threads), + m_task_queue(10000), + m_standby_threads(), + m_active_threads(), + m_mtx(), + m_thread_timeout(std::chrono::milliseconds(60000)), + m_timer_interval(std::chrono::milliseconds(400)), + m_cv_no_threads(), + m_cv_timer(), + m_tasks_enqueued(), + m_tasks_dequeued(), + m_wakeups(), + m_spurious_wakeups(), + m_in_shutdown(), + m_timestamp(), + m_long_tasks_count(), + m_waiting_task_count(), + m_last_thread_creation(), + m_min_threads(min_threads), + m_max_threads(max_threads), + m_last_thread_count(), + m_last_activity(), + m_maintenance_timer(thread_pool_generic::maintenance_func, this, nullptr) +{ + set_concurrency(); + // start the timer + m_maintenance_timer.set_time(0, (int)m_timer_interval.count()); +} + + +void thread_pool_generic::maybe_wake_or_create_thread() +{ + if (m_task_queue.empty()) + return; + DBUG_ASSERT(m_active_threads.size() >= static_cast<size_t>(m_long_tasks_count + m_waiting_task_count)); + if (m_active_threads.size() - m_long_tasks_count - m_waiting_task_count > m_concurrency) + return; + if (!m_standby_threads.empty()) + { + wake(WAKE_REASON_TASK); + } + else + { + add_thread(); + } +} + +bool thread_pool_generic::too_many_active_threads() +{ + return m_active_threads.size() - m_long_tasks_count - m_waiting_task_count > + m_concurrency* OVERSUBSCRIBE_FACTOR; +} + +void thread_pool_generic::set_concurrency(unsigned int concurrency) +{ + std::unique_lock<std::mutex> lk(m_mtx); + if (concurrency == 0) + concurrency= 2 * std::thread::hardware_concurrency(); + m_concurrency = concurrency; + if (m_concurrency > m_max_threads) + m_concurrency = m_max_threads; + if (m_concurrency < m_min_threads) + m_concurrency = m_min_threads; + if (m_concurrency < 1) + m_concurrency = 1; +} + +/** Submit a new task*/ +void thread_pool_generic::submit_task(task* task) +{ + std::unique_lock<std::mutex> lk(m_mtx); + if (m_in_shutdown) + return; + task->add_ref(); + m_tasks_enqueued++; + m_task_queue.push(task); + maybe_wake_or_create_thread(); +} + + +/* Notify thread pool that current thread is going to wait */ +void thread_pool_generic::wait_begin() +{ + if (!tls_worker_data || tls_worker_data->is_long_task()) + return; + std::unique_lock<std::mutex> lk(m_mtx); + if(tls_worker_data->is_long_task()) + { + /* + Current task flag could have become "long-running" + while waiting for the lock, thus recheck. + */ + return; + } + DBUG_ASSERT(!tls_worker_data->is_waiting()); + tls_worker_data->m_state |= worker_data::WAITING; + m_waiting_task_count++; + + /* Maintain concurrency */ + maybe_wake_or_create_thread(); +} + + +void thread_pool_generic::wait_end() +{ + if (tls_worker_data && tls_worker_data->is_waiting()) + { + std::unique_lock<std::mutex> lk(m_mtx); + tls_worker_data->m_state &= ~worker_data::WAITING; + m_waiting_task_count--; + } +} + + +void thread_pool_generic::switch_timer(timer_state_t state) +{ + if (m_timer_state == state) + return; + /* + We can't use timer::set_time, because mysys timers are deadlock + prone. + + Instead, to switch off we increase the timer period + and decrease period to switch on. + + This might introduce delays in thread creation when needed, + as period will only be changed when timer fires next time. + For this reason, we can't use very long periods for the "off" state. + */ + m_timer_state= state; + long long period= (state == timer_state_t::OFF) ? + m_timer_interval.count()*10: m_timer_interval.count(); + + m_maintenance_timer.set_period((int)period); +} + + +/** + Wake up all workers, and wait until they are gone + Stop the timer. +*/ +thread_pool_generic::~thread_pool_generic() +{ + /* + Stop AIO early. + This is needed to prevent AIO completion thread + from calling submit_task() on an object that is being destroyed. + */ + m_aio.reset(); + + /* Also stop the maintanence task early. */ + m_maintenance_timer.disarm(); + + std::unique_lock<std::mutex> lk(m_mtx); + m_in_shutdown= true; + + /* Wake up idle threads. */ + while (wake(WAKE_REASON_SHUTDOWN)) + { + } + + while (thread_count()) + { + m_cv_no_threads.wait(lk); + } + + lk.unlock(); +} + +thread_pool *create_thread_pool_generic(int min_threads, int max_threads) +{ + return new thread_pool_generic(min_threads, max_threads); +} + +} // namespace tpool |