diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 11:32:39 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 11:32:39 +0000 |
commit | 56ae875861ab260b80a030f50c4aff9f9dc8fff0 (patch) | |
tree | 531412110fc901a5918c7f7442202804a83cada9 /lib/base/threadpool.hpp | |
parent | Initial commit. (diff) | |
download | icinga2-56ae875861ab260b80a030f50c4aff9f9dc8fff0.tar.xz icinga2-56ae875861ab260b80a030f50c4aff9f9dc8fff0.zip |
Adding upstream version 2.14.2.upstream/2.14.2upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'lib/base/threadpool.hpp')
-rw-r--r-- | lib/base/threadpool.hpp | 101 |
1 files changed, 101 insertions, 0 deletions
diff --git a/lib/base/threadpool.hpp b/lib/base/threadpool.hpp new file mode 100644 index 0000000..d30fa69 --- /dev/null +++ b/lib/base/threadpool.hpp @@ -0,0 +1,101 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#ifndef THREADPOOL_H +#define THREADPOOL_H + +#include "base/atomic.hpp" +#include "base/configuration.hpp" +#include "base/exception.hpp" +#include "base/logger.hpp" +#include <cstddef> +#include <exception> +#include <functional> +#include <memory> +#include <thread> +#include <boost/asio/post.hpp> +#include <boost/asio/thread_pool.hpp> +#include <boost/thread/locks.hpp> +#include <boost/thread/shared_mutex.hpp> +#include <cstdint> + +namespace icinga +{ + +enum SchedulerPolicy +{ + DefaultScheduler, + LowLatencyScheduler +}; + +/** + * A thread pool. + * + * @ingroup base + */ +class ThreadPool +{ +public: + typedef std::function<void ()> WorkFunction; + + ThreadPool(); + ~ThreadPool(); + + void Start(); + void Stop(); + void Restart(); + + /** + * Appends a work item to the work queue. Work items will be processed in FIFO order. + * + * @param callback The callback function for the work item. + * @returns true if the item was queued, false otherwise. + */ + template<class T> + bool Post(T callback, SchedulerPolicy) + { + boost::shared_lock<decltype(m_Mutex)> lock (m_Mutex); + + if (m_Pool) { + m_Pending.fetch_add(1); + + boost::asio::post(*m_Pool, [this, callback]() { + m_Pending.fetch_sub(1); + + try { + callback(); + } catch (const std::exception& ex) { + Log(LogCritical, "ThreadPool") + << "Exception thrown in event handler:\n" + << DiagnosticInformation(ex); + } catch (...) { + Log(LogCritical, "ThreadPool", "Exception of unknown type thrown in event handler."); + } + }); + + return true; + } else { + return false; + } + } + + /** + * Returns the amount of queued tasks not started yet. + * + * @returns amount of queued tasks. + */ + inline uint_fast64_t GetPending() + { + return m_Pending.load(); + } + +private: + boost::shared_mutex m_Mutex; + std::unique_ptr<boost::asio::thread_pool> m_Pool; + Atomic<uint_fast64_t> m_Pending; + + void InitializePool(); +}; + +} + +#endif /* THREADPOOL_H */ |