From 0915b3ef56dfac3113cce55a59a5765dc94976be Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 28 Apr 2024 14:34:54 +0200 Subject: Adding upstream version 2.13.6. Signed-off-by: Daniel Baumann --- lib/base/threadpool.hpp | 98 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 98 insertions(+) create mode 100644 lib/base/threadpool.hpp (limited to 'lib/base/threadpool.hpp') diff --git a/lib/base/threadpool.hpp b/lib/base/threadpool.hpp new file mode 100644 index 0000000..af351cd --- /dev/null +++ b/lib/base/threadpool.hpp @@ -0,0 +1,98 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#ifndef THREADPOOL_H +#define THREADPOOL_H + +#include "base/atomic.hpp" +#include "base/exception.hpp" +#include "base/logger.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace icinga +{ + +enum SchedulerPolicy +{ + DefaultScheduler, + LowLatencyScheduler +}; + +/** + * A thread pool. + * + * @ingroup base + */ +class ThreadPool +{ +public: + typedef std::function WorkFunction; + + ThreadPool(size_t threads = std::thread::hardware_concurrency() * 2u); + ~ThreadPool(); + + void Start(); + void Stop(); + + /** + * Appends a work item to the work queue. Work items will be processed in FIFO order. + * + * @param callback The callback function for the work item. + * @returns true if the item was queued, false otherwise. + */ + template + bool Post(T callback, SchedulerPolicy) + { + boost::shared_lock lock (m_Mutex); + + if (m_Pool) { + m_Pending.fetch_add(1); + + boost::asio::post(*m_Pool, [this, callback]() { + m_Pending.fetch_sub(1); + + try { + callback(); + } catch (const std::exception& ex) { + Log(LogCritical, "ThreadPool") + << "Exception thrown in event handler:\n" + << DiagnosticInformation(ex); + } catch (...) { + Log(LogCritical, "ThreadPool", "Exception of unknown type thrown in event handler."); + } + }); + + return true; + } else { + return false; + } + } + + /** + * Returns the amount of queued tasks not started yet. + * + * @returns amount of queued tasks. + */ + inline uint_fast64_t GetPending() + { + return m_Pending.load(); + } + +private: + boost::shared_mutex m_Mutex; + std::unique_ptr m_Pool; + size_t m_Threads; + Atomic m_Pending; +}; + +} + +#endif /* THREADPOOL_H */ -- cgit v1.2.3