diff options
Diffstat (limited to 'lib/base/workqueue.hpp')
-rw-r--r-- | lib/base/workqueue.hpp | 154 |
1 files changed, 154 insertions, 0 deletions
diff --git a/lib/base/workqueue.hpp b/lib/base/workqueue.hpp new file mode 100644 index 0000000..9c8a6b8 --- /dev/null +++ b/lib/base/workqueue.hpp @@ -0,0 +1,154 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#ifndef WORKQUEUE_H +#define WORKQUEUE_H + +#include "base/i2-base.hpp" +#include "base/timer.hpp" +#include "base/ringbuffer.hpp" +#include "base/logger.hpp" +#include <boost/thread/thread.hpp> +#include <boost/exception_ptr.hpp> +#include <condition_variable> +#include <mutex> +#include <queue> +#include <deque> +#include <atomic> + +namespace icinga +{ + +enum WorkQueuePriority +{ + PriorityLow = 0, + PriorityNormal = 1, + PriorityHigh = 2, + PriorityImmediate = 4 +}; + +using TaskFunction = std::function<void ()>; + +struct Task +{ + Task() = default; + + Task(TaskFunction function, WorkQueuePriority priority, int id) + : Function(std::move(function)), Priority(priority), ID(id) + { } + + TaskFunction Function; + WorkQueuePriority Priority{PriorityNormal}; + int ID{-1}; +}; + +bool operator<(const Task& a, const Task& b); + +/** + * A workqueue. + * + * @ingroup base + */ +class WorkQueue +{ +public: + typedef std::function<void (boost::exception_ptr)> ExceptionCallback; + + WorkQueue(size_t maxItems = 0, int threadCount = 1, LogSeverity statsLogLevel = LogInformation); + ~WorkQueue(); + + void SetName(const String& name); + String GetName() const; + + std::unique_lock<std::mutex> AcquireLock(); + void EnqueueUnlocked(std::unique_lock<std::mutex>& lock, TaskFunction&& function, WorkQueuePriority priority = PriorityNormal); + void Enqueue(TaskFunction&& function, WorkQueuePriority priority = PriorityNormal, + bool allowInterleaved = false); + void Join(bool stop = false); + + template<typename VectorType, typename FuncType> + void ParallelFor(const VectorType& items, const FuncType& func) + { + ParallelFor(items, true, func); + } + + template<typename VectorType, typename FuncType> + void ParallelFor(const VectorType& items, bool preChunk, const FuncType& func) + { + using SizeType = decltype(items.size()); + + SizeType totalCount = items.size(); + SizeType chunks = preChunk ? m_ThreadCount : totalCount; + + auto lock = AcquireLock(); + + SizeType offset = 0; + + for (SizeType i = 0; i < chunks; i++) { + SizeType count = totalCount / chunks; + if (i < totalCount % chunks) + count++; + + EnqueueUnlocked(lock, [&items, func, offset, count, this]() { + for (SizeType j = offset; j < offset + count; j++) { + RunTaskFunction([&func, &items, j]() { + func(items[j]); + }); + } + }); + + offset += count; + } + + ASSERT(offset == items.size()); + } + + bool IsWorkerThread() const; + + size_t GetLength() const; + size_t GetTaskCount(RingBuffer::SizeType span); + + void SetExceptionCallback(const ExceptionCallback& callback); + + bool HasExceptions() const; + std::vector<boost::exception_ptr> GetExceptions() const; + void ReportExceptions(const String& facility, bool verbose = false) const; + +protected: + void IncreaseTaskCount(); + +private: + int m_ID; + String m_Name; + static std::atomic<int> m_NextID; + int m_ThreadCount; + bool m_Spawned{false}; + + mutable std::mutex m_Mutex; + std::condition_variable m_CVEmpty; + std::condition_variable m_CVFull; + std::condition_variable m_CVStarved; + boost::thread_group m_Threads; + size_t m_MaxItems; + bool m_Stopped{false}; + int m_Processing{0}; + std::priority_queue<Task, std::deque<Task> > m_Tasks; + int m_NextTaskID{0}; + ExceptionCallback m_ExceptionCallback; + std::vector<boost::exception_ptr> m_Exceptions; + Timer::Ptr m_StatusTimer; + double m_StatusTimerTimeout; + LogSeverity m_StatsLogLevel; + + RingBuffer m_TaskStats; + size_t m_PendingTasks{0}; + double m_PendingTasksTimestamp{0}; + + void WorkerThreadProc(); + void StatusTimerHandler(); + + void RunTaskFunction(const TaskFunction& func); +}; + +} + +#endif /* WORKQUEUE_H */ |