From 56ae875861ab260b80a030f50c4aff9f9dc8fff0 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 13 Apr 2024 13:32:39 +0200 Subject: Adding upstream version 2.14.2. Signed-off-by: Daniel Baumann --- lib/base/threadpool.hpp | 101 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) create mode 100644 lib/base/threadpool.hpp (limited to 'lib/base/threadpool.hpp') diff --git a/lib/base/threadpool.hpp b/lib/base/threadpool.hpp new file mode 100644 index 0000000..d30fa69 --- /dev/null +++ b/lib/base/threadpool.hpp @@ -0,0 +1,101 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#ifndef THREADPOOL_H +#define THREADPOOL_H + +#include "base/atomic.hpp" +#include "base/configuration.hpp" +#include "base/exception.hpp" +#include "base/logger.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace icinga +{ + +enum SchedulerPolicy +{ + DefaultScheduler, + LowLatencyScheduler +}; + +/** + * A thread pool. + * + * @ingroup base + */ +class ThreadPool +{ +public: + typedef std::function WorkFunction; + + ThreadPool(); + ~ThreadPool(); + + void Start(); + void Stop(); + void Restart(); + + /** + * Appends a work item to the work queue. Work items will be processed in FIFO order. + * + * @param callback The callback function for the work item. + * @returns true if the item was queued, false otherwise. + */ + template + bool Post(T callback, SchedulerPolicy) + { + boost::shared_lock lock (m_Mutex); + + if (m_Pool) { + m_Pending.fetch_add(1); + + boost::asio::post(*m_Pool, [this, callback]() { + m_Pending.fetch_sub(1); + + try { + callback(); + } catch (const std::exception& ex) { + Log(LogCritical, "ThreadPool") + << "Exception thrown in event handler:\n" + << DiagnosticInformation(ex); + } catch (...) { + Log(LogCritical, "ThreadPool", "Exception of unknown type thrown in event handler."); + } + }); + + return true; + } else { + return false; + } + } + + /** + * Returns the amount of queued tasks not started yet. + * + * @returns amount of queued tasks. + */ + inline uint_fast64_t GetPending() + { + return m_Pending.load(); + } + +private: + boost::shared_mutex m_Mutex; + std::unique_ptr m_Pool; + Atomic m_Pending; + + void InitializePool(); +}; + +} + +#endif /* THREADPOOL_H */ -- cgit v1.2.3