summaryrefslogtreecommitdiffstats
path: root/extra/mariabackup/thread_pool.cc
diff options
context:
space:
mode:
Diffstat (limited to 'extra/mariabackup/thread_pool.cc')
-rw-r--r--extra/mariabackup/thread_pool.cc50
1 files changed, 50 insertions, 0 deletions
diff --git a/extra/mariabackup/thread_pool.cc b/extra/mariabackup/thread_pool.cc
new file mode 100644
index 00000000..e18581f4
--- /dev/null
+++ b/extra/mariabackup/thread_pool.cc
@@ -0,0 +1,50 @@
+#include "thread_pool.h"
+#include "common.h"
+
+bool ThreadPool::start(size_t threads_count) {
+ if (!m_stopped)
+ return false;
+ m_stopped = false;
+ for (unsigned i = 0; i < threads_count; ++i)
+ m_threads.emplace_back(&ThreadPool::thread_func, this, i);
+ return true;
+}
+
+void ThreadPool::stop() {
+ if (m_stopped)
+ return;
+ m_stop = true;
+ m_cv.notify_all();
+ for (auto &t : m_threads)
+ t.join();
+ m_stopped = true;
+};
+
+void ThreadPool::push(ThreadPool::job_t &&j) {
+ std::unique_lock<std::mutex> lock(m_mutex);
+ m_jobs.push(j);
+ lock.unlock();
+ m_cv.notify_one();
+}
+
+void ThreadPool::thread_func(unsigned thread_num) {
+ if (my_thread_init())
+ die("Can't init mysql thread");
+ std::unique_lock<std::mutex> lock(m_mutex);
+ while(true) {
+ if (m_stop)
+ goto exit;
+ while (!m_jobs.empty()) {
+ if (m_stop)
+ goto exit;
+ job_t j = std::move(m_jobs.front());
+ m_jobs.pop();
+ lock.unlock();
+ j(thread_num);
+ lock.lock();
+ }
+ m_cv.wait(lock, [&] { return m_stop || !m_jobs.empty(); });
+ }
+exit:
+ my_thread_end();
+}