/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim:set ts=2 sw=2 sts=2 et cindent: */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
 * You can obtain one at http://mozilla.org/MPL/2.0/. */

#include <algorithm>
#include <utility>

#include "MediaEventSource.h"
#include "MediaTimer.h"
#include "nsDeque.h"

#ifndef DOM_MEDIA_PACER_H_
#  define DOM_MEDIA_PACER_H_

extern mozilla::LazyLogModule gMediaPipelineLog;
#  define LOG(level, msg, ...) \
    MOZ_LOG(gMediaPipelineLog, level, (msg, ##__VA_ARGS__))

namespace mozilla {

/**
 * Pacer<T> takes a queue of Ts tied to timestamps, and emits PacedItemEvents
 * for every T at its corresponding timestamp.
 *
 * The queue is ordered. Enqueuing an item at time t will drop all items at
 * times equal to or later than t. This is because of how video sources work
 * (some send out frames in the future, some don't), and to allow swapping one
 * source for another.
 *
 * It supports a duplication interval. If there is no new item enqueued within
 * the duplication interval since the last enqueued item, the last enqueued
 * item is emitted again.
 */
template <typename T>
class Pacer {
 public:
  NS_INLINE_DECL_THREADSAFE_REFCOUNTING(Pacer)

  /**
   * @param aTarget The event target on which the queue is manipulated, the
   *                timer callbacks run, and PacedItemEvents are notified.
   * @param aDuplicationInterval The initial duplication interval.
   */
  Pacer(already_AddRefed<nsISerialEventTarget> aTarget,
        TimeDuration aDuplicationInterval)
      : mTarget(aTarget),
        mDuplicationInterval(aDuplicationInterval),
        mTimer(MakeAndAddRef<MediaTimer<TimeStamp>>()) {
    LOG(LogLevel::Info, "Pacer %p constructed. Duplication interval is %.2fms",
        this, mDuplicationInterval.ToMilliseconds());
  }

  /**
   * Enqueues an item and schedules a timer to pass it on to PacedItemEvent() at
   * t=aTime. Already queued items with t>=aTime will be dropped.
   *
   * The actual queue manipulation is dispatched to mTarget.
   */
  void Enqueue(T aItem, TimeStamp aTime) {
    LOG(LogLevel::Verbose, "Pacer %p: Enqueue t=%.4fs now=%.4fs", this,
        (aTime - mStart).ToSeconds(),
        (TimeStamp::Now() - mStart).ToSeconds());
    // The lambda is `mutable` so that the captured item can really be moved
    // into the queue; in a non-mutable lambda the capture is const and
    // std::move() would silently degrade to a copy.
    MOZ_ALWAYS_SUCCEEDS(mTarget->Dispatch(NS_NewRunnableFunction(
        __func__, [this, self = RefPtr<Pacer>(this),
                   aItem = std::move(aItem), aTime]() mutable {
          MOZ_DIAGNOSTIC_ASSERT(!mIsShutdown);
          LOG(LogLevel::Verbose, "Pacer %p: InnerEnqueue t=%.4fs, now=%.4fs",
              self.get(), (aTime - mStart).ToSeconds(),
              (TimeStamp::Now() - mStart).ToSeconds());
          // Drop everything at the back of the queue that is not strictly
          // earlier than the new item.
          while (const auto* item = mQueue.Peek()) {
            if (item->mTime < aTime) {
              break;
            }
            RefPtr<QueueItem> dropping = mQueue.Pop();
          }
          mQueue.Push(MakeAndAddRef<QueueItem>(std::move(aItem), aTime, false));
          EnsureTimerScheduled(aTime);
        })));
  }

  /**
   * Sets the duplication interval. The change is applied on mTarget. If the
   * item at the front of the queue is a pending duplicate, its time is moved
   * to match the new interval, but never to before the current time.
   */
  void SetDuplicationInterval(TimeDuration aInterval) {
    LOG(LogLevel::Info, "Pacer %p: SetDuplicationInterval(%.3fs) now=%.4fs",
        this, aInterval.ToSeconds(), (TimeStamp::Now() - mStart).ToSeconds());
    MOZ_ALWAYS_SUCCEEDS(mTarget->Dispatch(NS_NewRunnableFunction(
        __func__, [this, self = RefPtr<Pacer>(this), aInterval] {
          LOG(LogLevel::Debug,
              "Pacer %p: InnerSetDuplicationInterval(%.3fs) now=%.4fs",
              self.get(), aInterval.ToSeconds(),
              (TimeStamp::Now() - mStart).ToSeconds());
          if (auto* next = mQueue.PeekFront(); next && next->mIsDuplicate) {
            // Adjust the time of the next duplication frame.
            next->mTime =
                std::max(TimeStamp::Now(),
                         next->mTime - mDuplicationInterval + aInterval);
            EnsureTimerScheduled(next->mTime);
          }
          mDuplicationInterval = aInterval;
        })));
  }

  /**
   * Shuts the pacer down on mTarget: cancels the timer and erases the queue.
   *
   * @return A promise that is resolved with `true` once the shutdown steps
   *         have run on mTarget.
   */
  RefPtr<GenericPromise> Shutdown() {
    LOG(LogLevel::Info, "Pacer %p: Shutdown, now=%.4fs", this,
        (TimeStamp::Now() - mStart).ToSeconds());
    return InvokeAsync(mTarget, __func__, [this, self = RefPtr<Pacer>(this)] {
      LOG(LogLevel::Debug, "Pacer %p: InnerShutdown, now=%.4fs", self.get(),
          (TimeStamp::Now() - mStart).ToSeconds());
      mIsShutdown = true;
      mTimer->Cancel();
      mQueue.Erase();
      mCurrentTimerTarget = Nothing();
      return GenericPromise::CreateAndResolve(true, "Pacer::Shutdown");
    });
  }

  /**
   * The event source through which paced items are notified, together with
   * the timestamp they were scheduled for.
   */
  MediaEventSourceExc<T, TimeStamp>& PacedItemEvent() {
    return mPacedItemEvent;
  }

 protected:
  ~Pacer() = default;

  /**
   * Makes sure the timer fires no later than aTime. Does nothing if a timer
   * is already pending for a time at or before aTime; otherwise any pending
   * timer is cancelled and a new one is armed for aTime.
   */
  void EnsureTimerScheduled(TimeStamp aTime) {
    if (mCurrentTimerTarget && *mCurrentTimerTarget <= aTime) {
      return;
    }

    if (mCurrentTimerTarget) {
      mTimer->Cancel();
      mCurrentTimerTarget = Nothing();
    }

    LOG(LogLevel::Verbose, "Pacer %p: Waiting until t=%.4fs", this,
        (aTime - mStart).ToSeconds());
    mTimer->WaitUntil(aTime, __func__)
        ->Then(
            mTarget, __func__,
            [this, self = RefPtr<Pacer>(this), aTime] {
              LOG(LogLevel::Verbose,
                  "Pacer %p: OnTimerTick t=%.4fs, now=%.4fs", self.get(),
                  (aTime - mStart).ToSeconds(),
                  (TimeStamp::Now() - mStart).ToSeconds());
              OnTimerTick();
            },
            [] {
              // Timer was rejected. This is fine.
            });
    mCurrentTimerTarget = Some(aTime);
  }

  /**
   * Timer callback. Notifies every queued item whose time has passed, queues
   * a duplicate where no follow-up item exists within the duplication
   * interval, and re-arms the timer for the next item in the queue, if any.
   */
  void OnTimerTick() {
    MOZ_ASSERT(mTarget->IsOnCurrentThread());

    mCurrentTimerTarget = Nothing();

    while (RefPtr<QueueItem> item = mQueue.PopFront()) {
      auto now = TimeStamp::Now();

      if (item->mTime <= now) {
        // It's time to process this item.
        if (const auto& next = mQueue.PeekFront();
            !next || next->mTime > (item->mTime + mDuplicationInterval)) {
          // No future frame within the duplication interval exists. Schedule
          // a copy.
          mQueue.PushFront(MakeAndAddRef<QueueItem>(
              item->mItem, item->mTime + mDuplicationInterval, true));
        }
        LOG(LogLevel::Verbose, "Pacer %p: NotifyPacedItem t=%.4fs, now=%.4fs",
            this, (item->mTime - mStart).ToSeconds(),
            (TimeStamp::Now() - mStart).ToSeconds());
        mPacedItemEvent.Notify(std::move(item->mItem), item->mTime);
        continue;
      }

      // This item is in the future. Put it back.
      mQueue.PushFront(item.forget());
      break;
    }

    if (const auto& next = mQueue.PeekFront(); next) {
      // The queue is not empty. Schedule the timer.
      EnsureTimerScheduled(next->mTime);
    }
  }

 public:
  const nsCOMPtr<nsISerialEventTarget> mTarget;

#  ifdef MOZ_LOGGING
  // Reference point for the relative timestamps printed in log messages.
  const TimeStamp mStart = TimeStamp::Now();
#  endif

 protected:
  // An entry in the queue: the item, the time at which it is to be emitted,
  // and whether it is a duplicate created by OnTimerTick().
  struct QueueItem {
    NS_INLINE_DECL_THREADSAFE_REFCOUNTING(QueueItem)

    QueueItem(T aItem, TimeStamp aTime, bool aIsDuplicate)
        : mItem(std::forward<T>(aItem)),
          mTime(aTime),
          mIsDuplicate(aIsDuplicate) {
      MOZ_ASSERT(!aTime.IsNull());
    }

    T mItem;
    TimeStamp mTime;
    bool mIsDuplicate;

   private:
    ~QueueItem() = default;
  };

  // Accessed on mTarget.
  nsRefPtrDeque<QueueItem> mQueue;

  // Maximum interval at which a frame should be issued, even if it means
  // duplicating the previous.
  TimeDuration mDuplicationInterval;

  // Accessed on mTarget.
  RefPtr<MediaTimer<TimeStamp>> mTimer;

  // Accessed on mTarget.
  Maybe<TimeStamp> mCurrentTimerTarget;

  // Accessed on mTarget.
  bool mIsShutdown = false;

  MediaEventProducerExc<T, TimeStamp> mPacedItemEvent;
};

}  // namespace mozilla

#  undef LOG

#endif