From 06eaf7232e9a920468c0f8d74dcf2fe8b555501c Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 13 Apr 2024 14:24:36 +0200 Subject: Adding upstream version 1:10.11.6. Signed-off-by: Daniel Baumann --- tpool/task_group.cc | 115 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 tpool/task_group.cc (limited to 'tpool/task_group.cc') diff --git a/tpool/task_group.cc b/tpool/task_group.cc new file mode 100644 index 00000000..eb57a8be --- /dev/null +++ b/tpool/task_group.cc @@ -0,0 +1,115 @@ +/* Copyright(C) 2019 MariaDB Corporation. + +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#include +#include +#include +#include +#include +#include +#include +#ifndef _WIN32 +#include // usleep +#endif +namespace tpool +{ + + /** + Task_group constructor + + @param max_threads - maximum number of threads allowed to execute + tasks from the group at the same time. + + @param enable_task_release - if true (default), task::release() will be + called after task execution.'false' should only be used in rare cases + when accessing memory, pointed by task structures, would be unsafe after. + the callback. Also 'false' is only possible ,if task::release() is a trivial function + */ + task_group::task_group(unsigned int max_concurrency, + bool enable_task_release) + : + m_queue(8), + m_mtx(), + m_tasks_running(), + m_max_concurrent_tasks(max_concurrency), + m_enable_task_release(enable_task_release) + {}; + + void task_group::set_max_tasks(unsigned int max_concurrency) + { + std::unique_lock lk(m_mtx); + m_max_concurrent_tasks = max_concurrency; + } + void task_group::execute(task* t) + { + std::unique_lock lk(m_mtx); + if (m_tasks_running == m_max_concurrent_tasks) + { + /* Queue for later execution by another thread.*/ + m_queue.push(t); + return; + } + m_tasks_running++; + for (;;) + { + lk.unlock(); + if (t) + { + t->m_func(t->m_arg); + if (m_enable_task_release) + t->release(); + } + lk.lock(); + + if (m_queue.empty()) + break; + t = m_queue.front(); + m_queue.pop(); + } + m_tasks_running--; + } + + void task_group::cancel_pending(task* t) + { + std::unique_lock lk(m_mtx); + if (!t) + m_queue.clear(); + for (auto it = m_queue.begin(); it != m_queue.end(); it++) + { + if (*it == t) + { + (*it)->release(); + (*it) = nullptr; + } + } + } + + task_group::~task_group() + { + std::unique_lock lk(m_mtx); + assert(m_queue.empty()); + + while (m_tasks_running) + { + lk.unlock(); +#ifndef _WIN32 + usleep(1000); +#else + Sleep(1); +#endif + lk.lock(); + } + } +} -- cgit v1.2.3