summaryrefslogtreecommitdiffstats
path: root/xbmc/utils/EventStream.h
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--xbmc/utils/EventStream.h100
1 files changed, 100 insertions, 0 deletions
diff --git a/xbmc/utils/EventStream.h b/xbmc/utils/EventStream.h
new file mode 100644
index 0000000..6fcb651
--- /dev/null
+++ b/xbmc/utils/EventStream.h
@@ -0,0 +1,100 @@
+/*
+ * Copyright (C) 2016-2018 Team Kodi
+ * This file is part of Kodi - https://kodi.tv
+ *
+ * SPDX-License-Identifier: GPL-2.0-or-later
+ * See LICENSES/README.md for more information.
+ */
+
+#pragma once
+
+#include "EventStreamDetail.h"
+#include "JobManager.h"
+#include "threads/CriticalSection.h"
+
+#include <algorithm>
+#include <memory>
+#include <mutex>
+#include <vector>
+
+
+template<typename Event>
+class CEventStream
+{
+public:
+
+ template<typename A>
+ void Subscribe(A* owner, void (A::*fn)(const Event&))
+ {
+ auto subscription = std::make_shared<detail::CSubscription<Event, A>>(owner, fn);
+ std::unique_lock<CCriticalSection> lock(m_criticalSection);
+ m_subscriptions.emplace_back(std::move(subscription));
+ }
+
+ template<typename A>
+ void Unsubscribe(A* obj)
+ {
+ std::vector<std::shared_ptr<detail::ISubscription<Event>>> toCancel;
+ {
+ std::unique_lock<CCriticalSection> lock(m_criticalSection);
+ auto it = m_subscriptions.begin();
+ while (it != m_subscriptions.end())
+ {
+ if ((*it)->IsOwnedBy(obj))
+ {
+ toCancel.push_back(*it);
+ it = m_subscriptions.erase(it);
+ }
+ else
+ {
+ ++it;
+ }
+ }
+ }
+ for (auto& subscription : toCancel)
+ subscription->Cancel();
+ }
+
+protected:
+ std::vector<std::shared_ptr<detail::ISubscription<Event>>> m_subscriptions;
+ CCriticalSection m_criticalSection;
+};
+
+
+template<typename Event>
+class CEventSource : public CEventStream<Event>
+{
+public:
+ explicit CEventSource() : m_queue(false, 1, CJob::PRIORITY_HIGH) {}
+
+ template<typename A>
+ void Publish(A event)
+ {
+ std::unique_lock<CCriticalSection> lock(this->m_criticalSection);
+ auto& subscriptions = this->m_subscriptions;
+ auto task = [subscriptions, event](){
+ for (auto& s: subscriptions)
+ s->HandleEvent(event);
+ };
+ lock.unlock();
+ m_queue.Submit(std::move(task));
+ }
+
+private:
+ CJobQueue m_queue;
+};
+
+template<typename Event>
+class CBlockingEventSource : public CEventStream<Event>
+{
+public:
+ template<typename A>
+ void HandleEvent(A event)
+ {
+ std::unique_lock<CCriticalSection> lock(this->m_criticalSection);
+ for (const auto& subscription : this->m_subscriptions)
+ {
+ subscription->HandleEvent(event);
+ }
+ }
+};