summaryrefslogtreecommitdiffstats
path: root/tpool/task_group.cc
diff options
context:
space:
mode:
Diffstat (limited to 'tpool/task_group.cc')
-rw-r--r--tpool/task_group.cc100
1 files changed, 100 insertions, 0 deletions
diff --git a/tpool/task_group.cc b/tpool/task_group.cc
new file mode 100644
index 00000000..97fbb091
--- /dev/null
+++ b/tpool/task_group.cc
@@ -0,0 +1,100 @@
+/* Copyright(C) 2019 MariaDB Corporation.
+
+This program is free software; you can redistribute itand /or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
+
+#include <tpool.h>
+#include <queue>
+#include <mutex>
+#include <condition_variable>
+#include <tpool_structs.h>
+#include <thread>
+#include <assert.h>
+#ifndef _WIN32
+#include <unistd.h> // usleep
+#endif
+namespace tpool
+{
+ task_group::task_group(unsigned int max_concurrency) :
+ m_queue(8),
+ m_mtx(),
+ m_tasks_running(),
+ m_max_concurrent_tasks(max_concurrency)
+ {};
+
+ void task_group::set_max_tasks(unsigned int max_concurrency)
+ {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ m_max_concurrent_tasks = max_concurrency;
+ }
+ void task_group::execute(task* t)
+ {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ if (m_tasks_running == m_max_concurrent_tasks)
+ {
+ /* Queue for later execution by another thread.*/
+ m_queue.push(t);
+ return;
+ }
+ m_tasks_running++;
+ for (;;)
+ {
+ lk.unlock();
+ if (t)
+ {
+ t->m_func(t->m_arg);
+ dbug_execute_after_task_callback();
+ t->release();
+ }
+ lk.lock();
+
+ if (m_queue.empty())
+ break;
+ t = m_queue.front();
+ m_queue.pop();
+ }
+ m_tasks_running--;
+ }
+
+ void task_group::cancel_pending(task* t)
+ {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ if (!t)
+ m_queue.clear();
+ for (auto it = m_queue.begin(); it != m_queue.end(); it++)
+ {
+ if (*it == t)
+ {
+ (*it)->release();
+ (*it) = nullptr;
+ }
+ }
+ }
+
+ task_group::~task_group()
+ {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ assert(m_queue.empty());
+
+ while (m_tasks_running)
+ {
+ lk.unlock();
+#ifndef _WIN32
+ usleep(1000);
+#else
+ Sleep(1);
+#endif
+ lk.lock();
+ }
+ }
+}