summaryrefslogtreecommitdiffstats
path: root/tpool/task.cc
diff options
context:
space:
mode:
Diffstat (limited to 'tpool/task.cc')
-rw-r--r--tpool/task.cc108
1 files changed, 108 insertions, 0 deletions
diff --git a/tpool/task.cc b/tpool/task.cc
new file mode 100644
index 00000000..0b5253bc
--- /dev/null
+++ b/tpool/task.cc
@@ -0,0 +1,108 @@
+/* Copyright (C) 2019, 2020, MariaDB Corporation.
+
+This program is free software; you can redistribute itand /or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
+
+#include <tpool.h>
+#include <queue>
+#include <mutex>
+#include <condition_variable>
+#include <tpool_structs.h>
+
+namespace tpool
+{
+
+#ifndef DBUG_OFF
+static callback_func_np after_task_callback;
+void set_after_task_callback(callback_func_np cb)
+{
+ after_task_callback= cb;
+}
+
+void execute_after_task_callback()
+{
+ if (after_task_callback)
+ after_task_callback();
+}
+#endif
+
+ task::task(callback_func func, void* arg, task_group* group) :
+ m_func(func), m_arg(arg), m_group(group) {}
+
+ void task::execute()
+ {
+ if (m_group)
+ {
+ /* Executing in a group (limiting concurrency).*/
+ m_group->execute(this);
+ }
+ else
+ {
+ /* Execute directly. */
+ m_func(m_arg);
+ dbug_execute_after_task_callback();
+ release();
+ }
+ }
+
+ /* Task that provide wait() operation. */
+ waitable_task::waitable_task(callback_func func, void* arg, task_group* group) :
+ task(func,arg, group),m_mtx(),m_cv(),m_ref_count(),m_waiter_count(),m_original_func(){}
+
+ void waitable_task::add_ref()
+ {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ m_ref_count++;
+ }
+
+ void waitable_task::release()
+ {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ m_ref_count--;
+ if (!m_ref_count && m_waiter_count)
+ m_cv.notify_all();
+ }
+ void waitable_task::wait(std::unique_lock<std::mutex>& lk)
+ {
+ m_waiter_count++;
+ while (m_ref_count)
+ m_cv.wait(lk);
+ m_waiter_count--;
+ }
+ void waitable_task::wait()
+ {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ wait(lk);
+ }
+
+ static void noop(void*)
+ {
+ }
+ void waitable_task::disable()
+ {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ if (m_func == noop)
+ return;
+ wait(lk);
+ m_original_func = m_func;
+ m_func = noop;
+ }
+ void waitable_task::enable()
+ {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ if(m_func != noop)
+ return;
+ wait(lk);
+ m_func = m_original_func;
+ }
+}