diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 11:32:39 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 11:32:39 +0000 |
commit | 56ae875861ab260b80a030f50c4aff9f9dc8fff0 (patch) | |
tree | 531412110fc901a5918c7f7442202804a83cada9 /lib/base/bulker.hpp | |
parent | Initial commit. (diff) | |
download | icinga2-56ae875861ab260b80a030f50c4aff9f9dc8fff0.tar.xz icinga2-56ae875861ab260b80a030f50c4aff9f9dc8fff0.zip |
Adding upstream version 2.14.2.upstream/2.14.2upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'lib/base/bulker.hpp')
-rw-r--r-- | lib/base/bulker.hpp | 119 |
1 files changed, 119 insertions, 0 deletions
diff --git a/lib/base/bulker.hpp b/lib/base/bulker.hpp new file mode 100644 index 0000000..2c30dc3 --- /dev/null +++ b/lib/base/bulker.hpp @@ -0,0 +1,119 @@ +/* Icinga 2 | (c) 2022 Icinga GmbH | GPLv2+ */ + +#ifndef BULKER_H +#define BULKER_H + +#include <boost/config.hpp> +#include <chrono> +#include <condition_variable> +#include <mutex> +#include <queue> +#include <utility> +#include <vector> + +namespace icinga +{ + +/** + * A queue which outputs the input as bulks of a defined size + * or after a defined time, whichever is reached first + * + * @ingroup base + */ +template<class T> +class Bulker +{ +private: + typedef std::chrono::steady_clock Clock; + +public: + typedef std::vector<T> Container; + typedef typename Container::size_type SizeType; + typedef typename Clock::duration Duration; + + Bulker(SizeType bulkSize, Duration threshold) + : m_BulkSize(bulkSize), m_Threshold(threshold), m_NextConsumption(NullTimePoint()) { } + + void ProduceOne(T needle); + Container ConsumeMany(); + SizeType Size(); + + inline SizeType GetBulkSize() const noexcept + { + return m_BulkSize; + } + +private: + typedef std::chrono::time_point<Clock> TimePoint; + + static inline + TimePoint NullTimePoint() + { + return TimePoint::min(); + } + + inline void UpdateNextConsumption() + { + m_NextConsumption = Clock::now() + m_Threshold; + } + + const SizeType m_BulkSize; + const Duration m_Threshold; + + std::mutex m_Mutex; + std::condition_variable m_CV; + std::queue<Container> m_Bulks; + TimePoint m_NextConsumption; +}; + +template<class T> +void Bulker<T>::ProduceOne(T needle) +{ + std::unique_lock<std::mutex> lock (m_Mutex); + + if (m_Bulks.empty() || m_Bulks.back().size() == m_BulkSize) { + m_Bulks.emplace(); + } + + m_Bulks.back().emplace_back(std::move(needle)); + + if (m_Bulks.size() == 1u && m_Bulks.back().size() == m_BulkSize) { + m_CV.notify_one(); + } +} + +template<class T> +typename Bulker<T>::Container Bulker<T>::ConsumeMany() +{ + std::unique_lock<std::mutex> lock (m_Mutex); + + if (BOOST_UNLIKELY(m_NextConsumption == NullTimePoint())) { + UpdateNextConsumption(); + } + + auto deadline (m_NextConsumption); + + m_CV.wait_until(lock, deadline, [this]() { return !m_Bulks.empty() && m_Bulks.front().size() == m_BulkSize; }); + UpdateNextConsumption(); + + if (m_Bulks.empty()) { + return Container(); + } + + auto haystack (std::move(m_Bulks.front())); + + m_Bulks.pop(); + return haystack; +} + +template<class T> +typename Bulker<T>::SizeType Bulker<T>::Size() +{ + std::unique_lock<std::mutex> lock (m_Mutex); + + return m_Bulks.empty() ? 0 : (m_Bulks.size() - 1u) * m_BulkSize + m_Bulks.back().size(); +} + +} + +#endif /* BULKER_H */ |