/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*-*/ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this file, * You can obtain one at http://mozilla.org/MPL/2.0/. */ #ifndef DOM_MEDIA_WEBRTC_LIBWEBRTCGLUE_TASKQUEUEWRAPPER_H_ #define DOM_MEDIA_WEBRTC_LIBWEBRTCGLUE_TASKQUEUEWRAPPER_H_ #include "api/task_queue/task_queue_factory.h" #include "mozilla/DataMutex.h" #include "mozilla/RecursiveMutex.h" #include "mozilla/ProfilerRunnable.h" #include "mozilla/TaskQueue.h" #include "VideoUtils.h" #include "mozilla/media/MediaUtils.h" // For media::Await namespace mozilla { enum class DeletionPolicy : uint8_t { Blocking, NonBlocking }; /** * A wrapper around Mozilla TaskQueues in the shape of a libwebrtc TaskQueue. * * Allows libwebrtc to use Mozilla threads where tooling, e.g. profiling, is set * up and just works. * * Mozilla APIs like Runnables, MozPromise, etc. can also be used with the * wrapped TaskQueue to run things on the right thread when interacting with * libwebrtc. */ template class TaskQueueWrapper : public webrtc::TaskQueueBase { public: TaskQueueWrapper(RefPtr aTaskQueue, nsCString aName) : mTaskQueue(std::move(aTaskQueue)), mName(std::move(aName)) {} ~TaskQueueWrapper() = default; void Delete() override { { // Scope this to make sure it does not race against the promise chain we // set up below. auto hasShutdown = mHasShutdown.Lock(); *hasShutdown = true; } MOZ_RELEASE_ASSERT(Deletion == DeletionPolicy::NonBlocking || !mTaskQueue->IsOnCurrentThread()); nsCOMPtr backgroundTaskQueue; NS_CreateBackgroundTaskQueue(__func__, getter_AddRefs(backgroundTaskQueue)); if (NS_WARN_IF(!backgroundTaskQueue)) { // Ok... that's pretty broken. Try main instead. MOZ_ASSERT(false); backgroundTaskQueue = GetMainThreadSerialEventTarget(); } RefPtr shutdownPromise = mTaskQueue->BeginShutdown()->Then( backgroundTaskQueue, __func__, [this] { // Wait until shutdown is complete, then delete for real. Although we // prevent queued tasks from executing with mHasShutdown, that is a // member variable, which means we still need to ensure that the // queue is done executing tasks before destroying it. delete this; return GenericPromise::CreateAndResolve(true, __func__); }); if constexpr (Deletion == DeletionPolicy::Blocking) { media::Await(backgroundTaskQueue.forget(), shutdownPromise); } else { Unused << shutdownPromise; } } already_AddRefed CreateTaskRunner( absl::AnyInvocable aTask) { return NS_NewRunnableFunction( "TaskQueueWrapper::CreateTaskRunner", [this, task = std::move(aTask), name = nsPrintfCString("TQ %s: webrtc::QueuedTask", mName.get())]() mutable { CurrentTaskQueueSetter current(this); auto hasShutdown = mHasShutdown.Lock(); if (*hasShutdown) { return; } AUTO_PROFILE_FOLLOWING_RUNNABLE(name); std::move(task)(); }); } already_AddRefed CreateTaskRunner(nsCOMPtr aRunnable) { return NS_NewRunnableFunction( "TaskQueueWrapper::CreateTaskRunner", [this, runnable = std::move(aRunnable)]() mutable { CurrentTaskQueueSetter current(this); auto hasShutdown = mHasShutdown.Lock(); if (*hasShutdown) { return; } AUTO_PROFILE_FOLLOWING_RUNNABLE(runnable); runnable->Run(); }); } void PostTask(absl::AnyInvocable aTask) override { MOZ_ALWAYS_SUCCEEDS( mTaskQueue->Dispatch(CreateTaskRunner(std::move(aTask)))); } void PostDelayedTask(absl::AnyInvocable aTask, webrtc::TimeDelta aDelay) override { if (aDelay.ms() == 0) { // AbstractThread::DelayedDispatch doesn't support delay 0 PostTask(std::move(aTask)); return; } MOZ_ALWAYS_SUCCEEDS(mTaskQueue->DelayedDispatch( CreateTaskRunner(std::move(aTask)), aDelay.ms())); } void PostDelayedHighPrecisionTask(absl::AnyInvocable aTask, webrtc::TimeDelta aDelay) override { PostDelayedTask(std::move(aTask), aDelay); } const RefPtr mTaskQueue; const nsCString mName; // This is a recursive mutex because a TaskRunner holding this mutex while // running its runnable may end up running other - tail dispatched - runnables // too, and they'll again try to grab the mutex. // The mutex must be held while running the runnable since otherwise there'd // be a race between shutting down the underlying task queue and the runnable // dispatching to that task queue (and we assert it succeeds in e.g., // PostTask()). DataMutexBase mHasShutdown{ false, "TaskQueueWrapper::mHasShutdown"}; }; template class DefaultDelete> : public webrtc::TaskQueueDeleter { public: void operator()(TaskQueueWrapper* aPtr) const { webrtc::TaskQueueDeleter::operator()(aPtr); } }; class SharedThreadPoolWebRtcTaskQueueFactory : public webrtc::TaskQueueFactory { public: SharedThreadPoolWebRtcTaskQueueFactory() {} template UniquePtr> CreateTaskQueueWrapper( absl::string_view aName, bool aSupportTailDispatch, Priority aPriority, MediaThreadType aThreadType = MediaThreadType::WEBRTC_WORKER) const { // XXX Do something with aPriority nsCString name(aName.data(), aName.size()); auto taskQueue = TaskQueue::Create(GetMediaThreadPool(aThreadType), name.get(), aSupportTailDispatch); return MakeUnique>(std::move(taskQueue), std::move(name)); } std::unique_ptr CreateTaskQueue(absl::string_view aName, Priority aPriority) const override { // libwebrtc will dispatch some tasks sync, i.e., block the origin thread // until they've run, and that doesn't play nice with tail dispatching since // there will never be a tail. // DeletionPolicy::Blocking because this is for libwebrtc use and that's // what they expect. constexpr bool supportTailDispatch = false; return std::unique_ptr( CreateTaskQueueWrapper( std::move(aName), supportTailDispatch, aPriority) .release(), webrtc::TaskQueueDeleter()); } }; } // namespace mozilla #endif