/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*-*/
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
 * You can obtain one at http://mozilla.org/MPL/2.0/. */
|
|
|
|
#ifndef DOM_MEDIA_WEBRTC_LIBWEBRTCGLUE_TASKQUEUEWRAPPER_H_
|
|
#define DOM_MEDIA_WEBRTC_LIBWEBRTCGLUE_TASKQUEUEWRAPPER_H_
|
|
|
|
#include <cstdint>

#include "api/task_queue/task_queue_factory.h"
#include "mozilla/DataMutex.h"
#include "mozilla/RecursiveMutex.h"
#include "mozilla/ProfilerRunnable.h"
#include "mozilla/TaskQueue.h"
#include "VideoUtils.h"
#include "mozilla/media/MediaUtils.h"  // For media::Await
|
|
|
|
namespace mozilla {
|
|
|
|
/**
 * Selects how TaskQueueWrapper::Delete() treats the shutdown of the wrapped
 * TaskQueue.
 */
enum class DeletionPolicy : uint8_t {
  // Delete() awaits (media::Await) the shutdown promise before returning.
  Blocking,
  // Delete() sets up the shutdown promise chain and returns without awaiting
  // it.
  NonBlocking
};
|
|
|
|
/**
|
|
* A wrapper around Mozilla TaskQueues in the shape of a libwebrtc TaskQueue.
|
|
*
|
|
* Allows libwebrtc to use Mozilla threads where tooling, e.g. profiling, is set
|
|
* up and just works.
|
|
*
|
|
* Mozilla APIs like Runnables, MozPromise, etc. can also be used with the
|
|
* wrapped TaskQueue to run things on the right thread when interacting with
|
|
* libwebrtc.
|
|
*/
|
|
template <DeletionPolicy Deletion>
|
|
class TaskQueueWrapper : public webrtc::TaskQueueBase {
|
|
public:
|
|
TaskQueueWrapper(RefPtr<TaskQueue> aTaskQueue, nsCString aName)
|
|
: mTaskQueue(std::move(aTaskQueue)), mName(std::move(aName)) {}
|
|
~TaskQueueWrapper() = default;
|
|
|
|
void Delete() override {
|
|
{
|
|
// Scope this to make sure it does not race against the promise chain we
|
|
// set up below.
|
|
auto hasShutdown = mHasShutdown.Lock();
|
|
*hasShutdown = true;
|
|
}
|
|
|
|
MOZ_RELEASE_ASSERT(Deletion == DeletionPolicy::NonBlocking ||
|
|
!mTaskQueue->IsOnCurrentThread());
|
|
|
|
nsCOMPtr<nsISerialEventTarget> backgroundTaskQueue;
|
|
NS_CreateBackgroundTaskQueue(__func__, getter_AddRefs(backgroundTaskQueue));
|
|
if (NS_WARN_IF(!backgroundTaskQueue)) {
|
|
// Ok... that's pretty broken. Try main instead.
|
|
MOZ_ASSERT(false);
|
|
backgroundTaskQueue = GetMainThreadSerialEventTarget();
|
|
}
|
|
|
|
RefPtr<GenericPromise> shutdownPromise = mTaskQueue->BeginShutdown()->Then(
|
|
backgroundTaskQueue, __func__, [this] {
|
|
// Wait until shutdown is complete, then delete for real. Although we
|
|
// prevent queued tasks from executing with mHasShutdown, that is a
|
|
// member variable, which means we still need to ensure that the
|
|
// queue is done executing tasks before destroying it.
|
|
delete this;
|
|
return GenericPromise::CreateAndResolve(true, __func__);
|
|
});
|
|
if constexpr (Deletion == DeletionPolicy::Blocking) {
|
|
media::Await(backgroundTaskQueue.forget(), shutdownPromise);
|
|
} else {
|
|
Unused << shutdownPromise;
|
|
}
|
|
}
|
|
|
|
already_AddRefed<Runnable> CreateTaskRunner(
|
|
absl::AnyInvocable<void() &&> aTask) {
|
|
return NS_NewRunnableFunction(
|
|
"TaskQueueWrapper::CreateTaskRunner",
|
|
[this, task = std::move(aTask),
|
|
name = nsPrintfCString("TQ %s: webrtc::QueuedTask",
|
|
mName.get())]() mutable {
|
|
CurrentTaskQueueSetter current(this);
|
|
auto hasShutdown = mHasShutdown.Lock();
|
|
if (*hasShutdown) {
|
|
return;
|
|
}
|
|
AUTO_PROFILE_FOLLOWING_RUNNABLE(name);
|
|
std::move(task)();
|
|
});
|
|
}
|
|
|
|
already_AddRefed<Runnable> CreateTaskRunner(nsCOMPtr<nsIRunnable> aRunnable) {
|
|
return NS_NewRunnableFunction(
|
|
"TaskQueueWrapper::CreateTaskRunner",
|
|
[this, runnable = std::move(aRunnable)]() mutable {
|
|
CurrentTaskQueueSetter current(this);
|
|
auto hasShutdown = mHasShutdown.Lock();
|
|
if (*hasShutdown) {
|
|
return;
|
|
}
|
|
AUTO_PROFILE_FOLLOWING_RUNNABLE(runnable);
|
|
runnable->Run();
|
|
});
|
|
}
|
|
|
|
void PostTaskImpl(absl::AnyInvocable<void() &&> aTask,
|
|
const PostTaskTraits& aTraits,
|
|
const webrtc::Location& aLocation) override {
|
|
MOZ_ALWAYS_SUCCEEDS(
|
|
mTaskQueue->Dispatch(CreateTaskRunner(std::move(aTask))));
|
|
}
|
|
|
|
void PostDelayedTaskImpl(absl::AnyInvocable<void() &&> aTask,
|
|
webrtc::TimeDelta aDelay,
|
|
const PostDelayedTaskTraits& aTraits,
|
|
const webrtc::Location& aLocation) override {
|
|
if (aDelay.ms() == 0) {
|
|
// AbstractThread::DelayedDispatch doesn't support delay 0
|
|
PostTaskImpl(std::move(aTask), PostTaskTraits{}, aLocation);
|
|
return;
|
|
}
|
|
MOZ_ALWAYS_SUCCEEDS(mTaskQueue->DelayedDispatch(
|
|
CreateTaskRunner(std::move(aTask)), aDelay.ms()));
|
|
}
|
|
|
|
const RefPtr<TaskQueue> mTaskQueue;
|
|
const nsCString mName;
|
|
|
|
// This is a recursive mutex because a TaskRunner holding this mutex while
|
|
// running its runnable may end up running other - tail dispatched - runnables
|
|
// too, and they'll again try to grab the mutex.
|
|
// The mutex must be held while running the runnable since otherwise there'd
|
|
// be a race between shutting down the underlying task queue and the runnable
|
|
// dispatching to that task queue (and we assert it succeeds in e.g.,
|
|
// PostTask()).
|
|
DataMutexBase<bool, RecursiveMutex> mHasShutdown{
|
|
false, "TaskQueueWrapper::mHasShutdown"};
|
|
};
|
|
|
|
template <DeletionPolicy Deletion>
|
|
class DefaultDelete<TaskQueueWrapper<Deletion>>
|
|
: public webrtc::TaskQueueDeleter {
|
|
public:
|
|
void operator()(TaskQueueWrapper<Deletion>* aPtr) const {
|
|
webrtc::TaskQueueDeleter::operator()(aPtr);
|
|
}
|
|
};
|
|
|
|
class SharedThreadPoolWebRtcTaskQueueFactory : public webrtc::TaskQueueFactory {
|
|
public:
|
|
SharedThreadPoolWebRtcTaskQueueFactory() {}
|
|
|
|
template <DeletionPolicy Deletion>
|
|
UniquePtr<TaskQueueWrapper<Deletion>> CreateTaskQueueWrapper(
|
|
absl::string_view aName, bool aSupportTailDispatch, Priority aPriority,
|
|
MediaThreadType aThreadType = MediaThreadType::WEBRTC_WORKER) const {
|
|
// XXX Do something with aPriority
|
|
nsCString name(aName.data(), aName.size());
|
|
auto taskQueue = TaskQueue::Create(GetMediaThreadPool(aThreadType),
|
|
name.get(), aSupportTailDispatch);
|
|
return MakeUnique<TaskQueueWrapper<Deletion>>(std::move(taskQueue),
|
|
std::move(name));
|
|
}
|
|
|
|
std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>
|
|
CreateTaskQueue(absl::string_view aName, Priority aPriority) const override {
|
|
// libwebrtc will dispatch some tasks sync, i.e., block the origin thread
|
|
// until they've run, and that doesn't play nice with tail dispatching since
|
|
// there will never be a tail.
|
|
// DeletionPolicy::Blocking because this is for libwebrtc use and that's
|
|
// what they expect.
|
|
constexpr bool supportTailDispatch = false;
|
|
return std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>(
|
|
CreateTaskQueueWrapper<DeletionPolicy::Blocking>(
|
|
std::move(aName), supportTailDispatch, aPriority)
|
|
.release(),
|
|
webrtc::TaskQueueDeleter());
|
|
}
|
|
};
|
|
|
|
} // namespace mozilla
|
|
|
|
#endif
|