From 0915b3ef56dfac3113cce55a59a5765dc94976be Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 28 Apr 2024 14:34:54 +0200 Subject: Adding upstream version 2.13.6. Signed-off-by: Daniel Baumann --- lib/remote/eventqueue.hpp | 177 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 177 insertions(+) create mode 100644 lib/remote/eventqueue.hpp (limited to 'lib/remote/eventqueue.hpp') diff --git a/lib/remote/eventqueue.hpp b/lib/remote/eventqueue.hpp new file mode 100644 index 0000000..32bd34a --- /dev/null +++ b/lib/remote/eventqueue.hpp @@ -0,0 +1,177 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#ifndef EVENTQUEUE_H +#define EVENTQUEUE_H + +#include "remote/httphandler.hpp" +#include "base/object.hpp" +#include "config/expression.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace icinga +{ + +class EventQueue final : public Object +{ +public: + DECLARE_PTR_TYPEDEFS(EventQueue); + + EventQueue(String name); + + bool CanProcessEvent(const String& type) const; + void ProcessEvent(const Dictionary::Ptr& event); + void AddClient(void *client); + void RemoveClient(void *client); + + void SetTypes(const std::set& types); + void SetFilter(std::unique_ptr filter); + + Dictionary::Ptr WaitForEvent(void *client, double timeout = 5); + + static std::vector GetQueuesForType(const String& type); + static void UnregisterIfUnused(const String& name, const EventQueue::Ptr& queue); + + static EventQueue::Ptr GetByName(const String& name); + static void Register(const String& name, const EventQueue::Ptr& function); + static void Unregister(const String& name); + +private: + String m_Name; + + mutable std::mutex m_Mutex; + std::condition_variable m_CV; + + std::set m_Types; + std::unique_ptr m_Filter; + + std::map > m_Events; +}; + +/** + * A registry for API event queues. + * + * @ingroup base + */ +class EventQueueRegistry : public Registry +{ +public: + static EventQueueRegistry *GetInstance(); +}; + +enum class EventType : uint_fast8_t +{ + AcknowledgementCleared, + AcknowledgementSet, + CheckResult, + CommentAdded, + CommentRemoved, + DowntimeAdded, + DowntimeRemoved, + DowntimeStarted, + DowntimeTriggered, + Flapping, + Notification, + StateChange, + ObjectCreated, + ObjectDeleted, + ObjectModified +}; + +class EventsInbox : public Object +{ +public: + DECLARE_PTR_TYPEDEFS(EventsInbox); + + EventsInbox(String filter, const String& filterSource); + EventsInbox(const EventsInbox&) = delete; + EventsInbox(EventsInbox&&) = delete; + EventsInbox& operator=(const EventsInbox&) = delete; + EventsInbox& operator=(EventsInbox&&) = delete; + ~EventsInbox(); + + const Expression::Ptr& GetFilter(); + + void Push(Dictionary::Ptr event); + Dictionary::Ptr Shift(boost::asio::yield_context yc, double timeout = 5); + +private: + struct Filter + { + std::size_t Refs; + Expression::Ptr Expr; + }; + + static std::mutex m_FiltersMutex; + static std::map m_Filters; + + std::mutex m_Mutex; + decltype(m_Filters.begin()) m_Filter; + std::queue m_Queue; + boost::asio::deadline_timer m_Timer; +}; + +class EventsSubscriber +{ +public: + EventsSubscriber(std::set types, String filter, const String& filterSource); + EventsSubscriber(const EventsSubscriber&) = delete; + EventsSubscriber(EventsSubscriber&&) = delete; + EventsSubscriber& operator=(const EventsSubscriber&) = delete; + EventsSubscriber& operator=(EventsSubscriber&&) = delete; + ~EventsSubscriber(); + + const EventsInbox::Ptr& GetInbox(); + +private: + std::set m_Types; + EventsInbox::Ptr m_Inbox; +}; + +class EventsFilter +{ +public: + EventsFilter(std::map> inboxes); + + operator bool(); + + void Push(Dictionary::Ptr event); + +private: + std::map> m_Inboxes; +}; + +class EventsRouter +{ +public: + static EventsRouter& GetInstance(); + + void Subscribe(const std::set& types, const EventsInbox::Ptr& inbox); + void Unsubscribe(const std::set& types, const EventsInbox::Ptr& inbox); + EventsFilter GetInboxes(EventType type); + +private: + static EventsRouter m_Instance; + + EventsRouter() = default; + EventsRouter(const EventsRouter&) = delete; + EventsRouter(EventsRouter&&) = delete; + EventsRouter& operator=(const EventsRouter&) = delete; + EventsRouter& operator=(EventsRouter&&) = delete; + ~EventsRouter() = default; + + std::mutex m_Mutex; + std::map>> m_Subscribers; +}; + +} + +#endif /* EVENTQUEUE_H */ -- cgit v1.2.3