diff options
Diffstat (limited to 'tpool/task.cc')
-rw-r--r-- | tpool/task.cc | 108 |
1 files changed, 108 insertions, 0 deletions
diff --git a/tpool/task.cc b/tpool/task.cc new file mode 100644 index 00000000..0b5253bc --- /dev/null +++ b/tpool/task.cc @@ -0,0 +1,108 @@ +/* Copyright (C) 2019, 2020, MariaDB Corporation. + +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#include <tpool.h> +#include <queue> +#include <mutex> +#include <condition_variable> +#include <tpool_structs.h> + +namespace tpool +{ + +#ifndef DBUG_OFF +static callback_func_np after_task_callback; +void set_after_task_callback(callback_func_np cb) +{ + after_task_callback= cb; +} + +void execute_after_task_callback() +{ + if (after_task_callback) + after_task_callback(); +} +#endif + + task::task(callback_func func, void* arg, task_group* group) : + m_func(func), m_arg(arg), m_group(group) {} + + void task::execute() + { + if (m_group) + { + /* Executing in a group (limiting concurrency).*/ + m_group->execute(this); + } + else + { + /* Execute directly. */ + m_func(m_arg); + dbug_execute_after_task_callback(); + release(); + } + } + + /* Task that provide wait() operation. */ + waitable_task::waitable_task(callback_func func, void* arg, task_group* group) : + task(func,arg, group),m_mtx(),m_cv(),m_ref_count(),m_waiter_count(),m_original_func(){} + + void waitable_task::add_ref() + { + std::unique_lock<std::mutex> lk(m_mtx); + m_ref_count++; + } + + void waitable_task::release() + { + std::unique_lock<std::mutex> lk(m_mtx); + m_ref_count--; + if (!m_ref_count && m_waiter_count) + m_cv.notify_all(); + } + void waitable_task::wait(std::unique_lock<std::mutex>& lk) + { + m_waiter_count++; + while (m_ref_count) + m_cv.wait(lk); + m_waiter_count--; + } + void waitable_task::wait() + { + std::unique_lock<std::mutex> lk(m_mtx); + wait(lk); + } + + static void noop(void*) + { + } + void waitable_task::disable() + { + std::unique_lock<std::mutex> lk(m_mtx); + if (m_func == noop) + return; + wait(lk); + m_original_func = m_func; + m_func = noop; + } + void waitable_task::enable() + { + std::unique_lock<std::mutex> lk(m_mtx); + if(m_func != noop) + return; + wait(lk); + m_func = m_original_func; + } +} |