summaryrefslogtreecommitdiffstats
path: root/lib/remote/eventqueue.hpp
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 12:34:54 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 12:34:54 +0000
commit0915b3ef56dfac3113cce55a59a5765dc94976be (patch)
treea8fea11d50b4f083e1bf0f90025ece7f0824784a /lib/remote/eventqueue.hpp
parentInitial commit. (diff)
downloadicinga2-upstream.tar.xz
icinga2-upstream.zip
Adding upstream version 2.13.6.upstream/2.13.6upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--lib/remote/eventqueue.hpp177
1 files changed, 177 insertions, 0 deletions
diff --git a/lib/remote/eventqueue.hpp b/lib/remote/eventqueue.hpp
new file mode 100644
index 0000000..32bd34a
--- /dev/null
+++ b/lib/remote/eventqueue.hpp
@@ -0,0 +1,177 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#ifndef EVENTQUEUE_H
+#define EVENTQUEUE_H
+
+#include "remote/httphandler.hpp"
+#include "base/object.hpp"
+#include "config/expression.hpp"
+#include <boost/asio/deadline_timer.hpp>
+#include <boost/asio/spawn.hpp>
+#include <condition_variable>
+#include <cstddef>
+#include <cstdint>
+#include <mutex>
+#include <set>
+#include <map>
+#include <deque>
+#include <queue>
+
+namespace icinga
+{
+
+class EventQueue final : public Object
+{
+public:
+ DECLARE_PTR_TYPEDEFS(EventQueue);
+
+ EventQueue(String name);
+
+ bool CanProcessEvent(const String& type) const;
+ void ProcessEvent(const Dictionary::Ptr& event);
+ void AddClient(void *client);
+ void RemoveClient(void *client);
+
+ void SetTypes(const std::set<String>& types);
+ void SetFilter(std::unique_ptr<Expression> filter);
+
+ Dictionary::Ptr WaitForEvent(void *client, double timeout = 5);
+
+ static std::vector<EventQueue::Ptr> GetQueuesForType(const String& type);
+ static void UnregisterIfUnused(const String& name, const EventQueue::Ptr& queue);
+
+ static EventQueue::Ptr GetByName(const String& name);
+ static void Register(const String& name, const EventQueue::Ptr& function);
+ static void Unregister(const String& name);
+
+private:
+ String m_Name;
+
+ mutable std::mutex m_Mutex;
+ std::condition_variable m_CV;
+
+ std::set<String> m_Types;
+ std::unique_ptr<Expression> m_Filter;
+
+ std::map<void *, std::deque<Dictionary::Ptr> > m_Events;
+};
+
+/**
+ * A registry for API event queues.
+ *
+ * @ingroup base
+ */
+class EventQueueRegistry : public Registry<EventQueueRegistry, EventQueue::Ptr>
+{
+public:
+ static EventQueueRegistry *GetInstance();
+};
+
+enum class EventType : uint_fast8_t
+{
+ AcknowledgementCleared,
+ AcknowledgementSet,
+ CheckResult,
+ CommentAdded,
+ CommentRemoved,
+ DowntimeAdded,
+ DowntimeRemoved,
+ DowntimeStarted,
+ DowntimeTriggered,
+ Flapping,
+ Notification,
+ StateChange,
+ ObjectCreated,
+ ObjectDeleted,
+ ObjectModified
+};
+
+class EventsInbox : public Object
+{
+public:
+ DECLARE_PTR_TYPEDEFS(EventsInbox);
+
+ EventsInbox(String filter, const String& filterSource);
+ EventsInbox(const EventsInbox&) = delete;
+ EventsInbox(EventsInbox&&) = delete;
+ EventsInbox& operator=(const EventsInbox&) = delete;
+ EventsInbox& operator=(EventsInbox&&) = delete;
+ ~EventsInbox();
+
+ const Expression::Ptr& GetFilter();
+
+ void Push(Dictionary::Ptr event);
+ Dictionary::Ptr Shift(boost::asio::yield_context yc, double timeout = 5);
+
+private:
+ struct Filter
+ {
+ std::size_t Refs;
+ Expression::Ptr Expr;
+ };
+
+ static std::mutex m_FiltersMutex;
+ static std::map<String, Filter> m_Filters;
+
+ std::mutex m_Mutex;
+ decltype(m_Filters.begin()) m_Filter;
+ std::queue<Dictionary::Ptr> m_Queue;
+ boost::asio::deadline_timer m_Timer;
+};
+
+class EventsSubscriber
+{
+public:
+ EventsSubscriber(std::set<EventType> types, String filter, const String& filterSource);
+ EventsSubscriber(const EventsSubscriber&) = delete;
+ EventsSubscriber(EventsSubscriber&&) = delete;
+ EventsSubscriber& operator=(const EventsSubscriber&) = delete;
+ EventsSubscriber& operator=(EventsSubscriber&&) = delete;
+ ~EventsSubscriber();
+
+ const EventsInbox::Ptr& GetInbox();
+
+private:
+ std::set<EventType> m_Types;
+ EventsInbox::Ptr m_Inbox;
+};
+
+class EventsFilter
+{
+public:
+ EventsFilter(std::map<Expression::Ptr, std::set<EventsInbox::Ptr>> inboxes);
+
+ operator bool();
+
+ void Push(Dictionary::Ptr event);
+
+private:
+ std::map<Expression::Ptr, std::set<EventsInbox::Ptr>> m_Inboxes;
+};
+
+class EventsRouter
+{
+public:
+ static EventsRouter& GetInstance();
+
+ void Subscribe(const std::set<EventType>& types, const EventsInbox::Ptr& inbox);
+ void Unsubscribe(const std::set<EventType>& types, const EventsInbox::Ptr& inbox);
+ EventsFilter GetInboxes(EventType type);
+
+private:
+ static EventsRouter m_Instance;
+
+ EventsRouter() = default;
+ EventsRouter(const EventsRouter&) = delete;
+ EventsRouter(EventsRouter&&) = delete;
+ EventsRouter& operator=(const EventsRouter&) = delete;
+ EventsRouter& operator=(EventsRouter&&) = delete;
+ ~EventsRouter() = default;
+
+ std::mutex m_Mutex;
+ std::map<EventType, std::map<Expression::Ptr, std::set<EventsInbox::Ptr>>> m_Subscribers;
+};
+
+}
+
+#endif /* EVENTQUEUE_H */