/* * Copyright (C) 2016-2018 Team Kodi * This file is part of Kodi - https://kodi.tv * * SPDX-License-Identifier: GPL-2.0-or-later * See LICENSES/README.md for more information. */ #pragma once #include "EventStreamDetail.h" #include "JobManager.h" #include "threads/CriticalSection.h" #include #include #include #include template class CEventStream { public: template void Subscribe(A* owner, void (A::*fn)(const Event&)) { auto subscription = std::make_shared>(owner, fn); std::unique_lock lock(m_criticalSection); m_subscriptions.emplace_back(std::move(subscription)); } template void Unsubscribe(A* obj) { std::vector>> toCancel; { std::unique_lock lock(m_criticalSection); auto it = m_subscriptions.begin(); while (it != m_subscriptions.end()) { if ((*it)->IsOwnedBy(obj)) { toCancel.push_back(*it); it = m_subscriptions.erase(it); } else { ++it; } } } for (auto& subscription : toCancel) subscription->Cancel(); } protected: std::vector>> m_subscriptions; CCriticalSection m_criticalSection; }; template class CEventSource : public CEventStream { public: explicit CEventSource() : m_queue(false, 1, CJob::PRIORITY_HIGH) {} template void Publish(A event) { std::unique_lock lock(this->m_criticalSection); auto& subscriptions = this->m_subscriptions; auto task = [subscriptions, event](){ for (auto& s: subscriptions) s->HandleEvent(event); }; lock.unlock(); m_queue.Submit(std::move(task)); } private: CJobQueue m_queue; }; template class CBlockingEventSource : public CEventStream { public: template void HandleEvent(A event) { std::unique_lock lock(this->m_criticalSection); for (const auto& subscription : this->m_subscriptions) { subscription->HandleEvent(event); } } };