summaryrefslogtreecommitdiffstats
path: root/tpool/task_group.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 12:24:36 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 12:24:36 +0000
commit06eaf7232e9a920468c0f8d74dcf2fe8b555501c (patch)
treee2c7b5777f728320e5b5542b6213fd3591ba51e2 /tpool/task_group.cc
parentInitial commit. (diff)
downloadmariadb-06eaf7232e9a920468c0f8d74dcf2fe8b555501c.tar.xz
mariadb-06eaf7232e9a920468c0f8d74dcf2fe8b555501c.zip
Adding upstream version 1:10.11.6.upstream/1%10.11.6
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'tpool/task_group.cc')
-rw-r--r--tpool/task_group.cc115
1 files changed, 115 insertions, 0 deletions
diff --git a/tpool/task_group.cc b/tpool/task_group.cc
new file mode 100644
index 00000000..eb57a8be
--- /dev/null
+++ b/tpool/task_group.cc
@@ -0,0 +1,115 @@
+/* Copyright(C) 2019 MariaDB Corporation.
+
+This program is free software; you can redistribute itand /or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
+
+#include <tpool.h>
+#include <queue>
+#include <mutex>
+#include <condition_variable>
+#include <tpool_structs.h>
+#include <thread>
+#include <assert.h>
+#ifndef _WIN32
+#include <unistd.h> // usleep
+#endif
+namespace tpool
+{
+
+ /**
+ Task_group constructor
+
+ @param max_threads - maximum number of threads allowed to execute
+ tasks from the group at the same time.
+
+ @param enable_task_release - if true (default), task::release() will be
+ called after task execution.'false' should only be used in rare cases
+ when accessing memory, pointed by task structures, would be unsafe after.
+ the callback. Also 'false' is only possible ,if task::release() is a trivial function
+ */
+ task_group::task_group(unsigned int max_concurrency,
+ bool enable_task_release)
+ :
+ m_queue(8),
+ m_mtx(),
+ m_tasks_running(),
+ m_max_concurrent_tasks(max_concurrency),
+ m_enable_task_release(enable_task_release)
+ {};
+
+ void task_group::set_max_tasks(unsigned int max_concurrency)
+ {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ m_max_concurrent_tasks = max_concurrency;
+ }
+ void task_group::execute(task* t)
+ {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ if (m_tasks_running == m_max_concurrent_tasks)
+ {
+ /* Queue for later execution by another thread.*/
+ m_queue.push(t);
+ return;
+ }
+ m_tasks_running++;
+ for (;;)
+ {
+ lk.unlock();
+ if (t)
+ {
+ t->m_func(t->m_arg);
+ if (m_enable_task_release)
+ t->release();
+ }
+ lk.lock();
+
+ if (m_queue.empty())
+ break;
+ t = m_queue.front();
+ m_queue.pop();
+ }
+ m_tasks_running--;
+ }
+
+ void task_group::cancel_pending(task* t)
+ {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ if (!t)
+ m_queue.clear();
+ for (auto it = m_queue.begin(); it != m_queue.end(); it++)
+ {
+ if (*it == t)
+ {
+ (*it)->release();
+ (*it) = nullptr;
+ }
+ }
+ }
+
+ task_group::~task_group()
+ {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ assert(m_queue.empty());
+
+ while (m_tasks_running)
+ {
+ lk.unlock();
+#ifndef _WIN32
+ usleep(1000);
+#else
+ Sleep(1);
+#endif
+ lk.lock();
+ }
+ }
+}