summaryrefslogtreecommitdiffstats
path: root/third_party/libwebrtc/rtc_base/thread.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--third_party/libwebrtc/rtc_base/thread.cc987
1 files changed, 987 insertions, 0 deletions
diff --git a/third_party/libwebrtc/rtc_base/thread.cc b/third_party/libwebrtc/rtc_base/thread.cc
new file mode 100644
index 0000000000..0e5df64dde
--- /dev/null
+++ b/third_party/libwebrtc/rtc_base/thread.cc
@@ -0,0 +1,987 @@
+/*
+ * Copyright 2004 The WebRTC Project Authors. All rights reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "rtc_base/thread.h"
+
+#include "absl/strings/string_view.h"
+#include "api/task_queue/task_queue_base.h"
+#include "api/units/time_delta.h"
+#include "rtc_base/socket_server.h"
+
+#if defined(WEBRTC_WIN)
+#include <comdef.h>
+#elif defined(WEBRTC_POSIX)
+#include <time.h>
+#else
+#error "Either WEBRTC_WIN or WEBRTC_POSIX needs to be defined."
+#endif
+
+#if defined(WEBRTC_WIN)
+// Disable warning that we don't care about:
+// warning C4722: destructor never returns, potential memory leak
+#pragma warning(disable : 4722)
+#endif
+
+#include <stdio.h>
+
+#include <utility>
+
+#include "absl/algorithm/container.h"
+#include "absl/cleanup/cleanup.h"
+#include "api/sequence_checker.h"
+#include "rtc_base/checks.h"
+#include "rtc_base/event.h"
+#include "rtc_base/internal/default_socket_server.h"
+#include "rtc_base/logging.h"
+#include "rtc_base/null_socket_server.h"
+#include "rtc_base/synchronization/mutex.h"
+#include "rtc_base/time_utils.h"
+#include "rtc_base/trace_event.h"
+
+#if defined(WEBRTC_MAC)
+#include "rtc_base/system/cocoa_threading.h"
+
+/*
+ * These are forward-declarations for methods that are part of the
+ * ObjC runtime. They are declared in the private header objc-internal.h.
+ * These calls are what clang inserts when using @autoreleasepool in ObjC,
+ * but here they are used directly in order to keep this file C++.
+ * https://clang.llvm.org/docs/AutomaticReferenceCounting.html#runtime-support
+ */
+extern "C" {
+void* objc_autoreleasePoolPush(void);
+void objc_autoreleasePoolPop(void* pool);
+}
+
+namespace {
+class ScopedAutoReleasePool {
+ public:
+ ScopedAutoReleasePool() : pool_(objc_autoreleasePoolPush()) {}
+ ~ScopedAutoReleasePool() { objc_autoreleasePoolPop(pool_); }
+
+ private:
+ void* const pool_;
+};
+} // namespace
+#endif
+
+namespace rtc {
+
+using ::webrtc::MutexLock;
+using ::webrtc::TimeDelta;
+
+ThreadManager* ThreadManager::Instance() {
+ static ThreadManager* const thread_manager = new ThreadManager();
+ return thread_manager;
+}
+
+ThreadManager::~ThreadManager() {
+ // By above RTC_DEFINE_STATIC_LOCAL.
+ RTC_DCHECK_NOTREACHED() << "ThreadManager should never be destructed.";
+}
+
+// static
+void ThreadManager::Add(Thread* message_queue) {
+ return Instance()->AddInternal(message_queue);
+}
+void ThreadManager::AddInternal(Thread* message_queue) {
+ MutexLock cs(&crit_);
+ message_queues_.push_back(message_queue);
+}
+
+// static
+void ThreadManager::Remove(Thread* message_queue) {
+ return Instance()->RemoveInternal(message_queue);
+}
+void ThreadManager::RemoveInternal(Thread* message_queue) {
+ {
+ MutexLock cs(&crit_);
+ std::vector<Thread*>::iterator iter;
+ iter = absl::c_find(message_queues_, message_queue);
+ if (iter != message_queues_.end()) {
+ message_queues_.erase(iter);
+ }
+#if RTC_DCHECK_IS_ON
+ RemoveFromSendGraph(message_queue);
+#endif
+ }
+}
+
+#if RTC_DCHECK_IS_ON
+void ThreadManager::RemoveFromSendGraph(Thread* thread) {
+ for (auto it = send_graph_.begin(); it != send_graph_.end();) {
+ if (it->first == thread) {
+ it = send_graph_.erase(it);
+ } else {
+ it->second.erase(thread);
+ ++it;
+ }
+ }
+}
+
+void ThreadManager::RegisterSendAndCheckForCycles(Thread* source,
+ Thread* target) {
+ RTC_DCHECK(source);
+ RTC_DCHECK(target);
+
+ MutexLock cs(&crit_);
+ std::deque<Thread*> all_targets({target});
+ // We check the pre-existing who-sends-to-who graph for any path from target
+ // to source. This loop is guaranteed to terminate because per the send graph
+ // invariant, there are no cycles in the graph.
+ for (size_t i = 0; i < all_targets.size(); i++) {
+ const auto& targets = send_graph_[all_targets[i]];
+ all_targets.insert(all_targets.end(), targets.begin(), targets.end());
+ }
+ RTC_CHECK_EQ(absl::c_count(all_targets, source), 0)
+ << " send loop between " << source->name() << " and " << target->name();
+
+ // We may now insert source -> target without creating a cycle, since there
+ // was no path from target to source per the prior CHECK.
+ send_graph_[source].insert(target);
+}
+#endif
+
+// static
+void ThreadManager::ProcessAllMessageQueuesForTesting() {
+ return Instance()->ProcessAllMessageQueuesInternal();
+}
+
+void ThreadManager::ProcessAllMessageQueuesInternal() {
+ // This works by posting a delayed message at the current time and waiting
+ // for it to be dispatched on all queues, which will ensure that all messages
+ // that came before it were also dispatched.
+ std::atomic<int> queues_not_done(0);
+
+ {
+ MutexLock cs(&crit_);
+ for (Thread* queue : message_queues_) {
+ if (!queue->IsProcessingMessagesForTesting()) {
+ // If the queue is not processing messages, it can
+ // be ignored. If we tried to post a message to it, it would be dropped
+ // or ignored.
+ continue;
+ }
+ queues_not_done.fetch_add(1);
+ // Whether the task is processed, or the thread is simply cleared,
+ // queues_not_done gets decremented.
+ absl::Cleanup sub = [&queues_not_done] { queues_not_done.fetch_sub(1); };
+ // Post delayed task instead of regular task to wait for all delayed tasks
+ // that are ready for processing.
+ queue->PostDelayedTask([sub = std::move(sub)] {}, TimeDelta::Zero());
+ }
+ }
+
+ rtc::Thread* current = rtc::Thread::Current();
+ // Note: One of the message queues may have been on this thread, which is
+ // why we can't synchronously wait for queues_not_done to go to 0; we need
+ // to process messages as well.
+ while (queues_not_done.load() > 0) {
+ if (current) {
+ current->ProcessMessages(0);
+ }
+ }
+}
+
+// static
+Thread* Thread::Current() {
+ ThreadManager* manager = ThreadManager::Instance();
+ Thread* thread = manager->CurrentThread();
+
+ return thread;
+}
+
+#if defined(WEBRTC_POSIX)
+ThreadManager::ThreadManager() {
+#if defined(WEBRTC_MAC)
+ InitCocoaMultiThreading();
+#endif
+ pthread_key_create(&key_, nullptr);
+}
+
+Thread* ThreadManager::CurrentThread() {
+ return static_cast<Thread*>(pthread_getspecific(key_));
+}
+
+void ThreadManager::SetCurrentThreadInternal(Thread* thread) {
+ pthread_setspecific(key_, thread);
+}
+#endif
+
+#if defined(WEBRTC_WIN)
+ThreadManager::ThreadManager() : key_(TlsAlloc()) {}
+
+Thread* ThreadManager::CurrentThread() {
+ return static_cast<Thread*>(TlsGetValue(key_));
+}
+
+void ThreadManager::SetCurrentThreadInternal(Thread* thread) {
+ TlsSetValue(key_, thread);
+}
+#endif
+
+void ThreadManager::SetCurrentThread(Thread* thread) {
+#if RTC_DLOG_IS_ON
+ if (CurrentThread() && thread) {
+ RTC_DLOG(LS_ERROR) << "SetCurrentThread: Overwriting an existing value?";
+ }
+#endif // RTC_DLOG_IS_ON
+
+ if (thread) {
+ thread->EnsureIsCurrentTaskQueue();
+ } else {
+ Thread* current = CurrentThread();
+ if (current) {
+ // The current thread is being cleared, e.g. as a result of
+ // UnwrapCurrent() being called or when a thread is being stopped
+ // (see PreRun()). This signals that the Thread instance is being detached
+ // from the thread, which also means that TaskQueue::Current() must not
+ // return a pointer to the Thread instance.
+ current->ClearCurrentTaskQueue();
+ }
+ }
+
+ SetCurrentThreadInternal(thread);
+}
+
+void rtc::ThreadManager::ChangeCurrentThreadForTest(rtc::Thread* thread) {
+ SetCurrentThreadInternal(thread);
+}
+
+Thread* ThreadManager::WrapCurrentThread() {
+ Thread* result = CurrentThread();
+ if (nullptr == result) {
+ result = new Thread(CreateDefaultSocketServer());
+ result->WrapCurrentWithThreadManager(this, true);
+ }
+ return result;
+}
+
+void ThreadManager::UnwrapCurrentThread() {
+ Thread* t = CurrentThread();
+ if (t && !(t->IsOwned())) {
+ t->UnwrapCurrent();
+ delete t;
+ }
+}
+
+Thread::ScopedDisallowBlockingCalls::ScopedDisallowBlockingCalls()
+ : thread_(Thread::Current()),
+ previous_state_(thread_->SetAllowBlockingCalls(false)) {}
+
+Thread::ScopedDisallowBlockingCalls::~ScopedDisallowBlockingCalls() {
+ RTC_DCHECK(thread_->IsCurrent());
+ thread_->SetAllowBlockingCalls(previous_state_);
+}
+
+#if RTC_DCHECK_IS_ON
+Thread::ScopedCountBlockingCalls::ScopedCountBlockingCalls(
+ std::function<void(uint32_t, uint32_t)> callback)
+ : thread_(Thread::Current()),
+ base_blocking_call_count_(thread_->GetBlockingCallCount()),
+ base_could_be_blocking_call_count_(
+ thread_->GetCouldBeBlockingCallCount()),
+ result_callback_(std::move(callback)) {}
+
+Thread::ScopedCountBlockingCalls::~ScopedCountBlockingCalls() {
+ if (GetTotalBlockedCallCount() >= min_blocking_calls_for_callback_) {
+ result_callback_(GetBlockingCallCount(), GetCouldBeBlockingCallCount());
+ }
+}
+
+uint32_t Thread::ScopedCountBlockingCalls::GetBlockingCallCount() const {
+ return thread_->GetBlockingCallCount() - base_blocking_call_count_;
+}
+
+uint32_t Thread::ScopedCountBlockingCalls::GetCouldBeBlockingCallCount() const {
+ return thread_->GetCouldBeBlockingCallCount() -
+ base_could_be_blocking_call_count_;
+}
+
+uint32_t Thread::ScopedCountBlockingCalls::GetTotalBlockedCallCount() const {
+ return GetBlockingCallCount() + GetCouldBeBlockingCallCount();
+}
+#endif
+
+Thread::Thread(SocketServer* ss) : Thread(ss, /*do_init=*/true) {}
+
+Thread::Thread(std::unique_ptr<SocketServer> ss)
+ : Thread(std::move(ss), /*do_init=*/true) {}
+
+Thread::Thread(SocketServer* ss, bool do_init)
+ : delayed_next_num_(0),
+ fInitialized_(false),
+ fDestroyed_(false),
+ stop_(0),
+ ss_(ss) {
+ RTC_DCHECK(ss);
+ ss_->SetMessageQueue(this);
+ SetName("Thread", this); // default name
+ if (do_init) {
+ DoInit();
+ }
+}
+
+Thread::Thread(std::unique_ptr<SocketServer> ss, bool do_init)
+ : Thread(ss.get(), do_init) {
+ own_ss_ = std::move(ss);
+}
+
+Thread::~Thread() {
+ Stop();
+ DoDestroy();
+}
+
+void Thread::DoInit() {
+ if (fInitialized_) {
+ return;
+ }
+
+ fInitialized_ = true;
+ ThreadManager::Add(this);
+}
+
+void Thread::DoDestroy() {
+ if (fDestroyed_) {
+ return;
+ }
+
+ fDestroyed_ = true;
+ // The signal is done from here to ensure
+ // that it always gets called when the queue
+ // is going away.
+ if (ss_) {
+ ss_->SetMessageQueue(nullptr);
+ }
+ ThreadManager::Remove(this);
+ // Clear.
+ CurrentTaskQueueSetter set_current(this);
+ messages_ = {};
+ delayed_messages_ = {};
+}
+
+SocketServer* Thread::socketserver() {
+ return ss_;
+}
+
+void Thread::WakeUpSocketServer() {
+ ss_->WakeUp();
+}
+
+void Thread::Quit() {
+ stop_.store(1, std::memory_order_release);
+ WakeUpSocketServer();
+}
+
+bool Thread::IsQuitting() {
+ return stop_.load(std::memory_order_acquire) != 0;
+}
+
+void Thread::Restart() {
+ stop_.store(0, std::memory_order_release);
+}
+
+absl::AnyInvocable<void() &&> Thread::Get(int cmsWait) {
+ // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
+
+ int64_t cmsTotal = cmsWait;
+ int64_t cmsElapsed = 0;
+ int64_t msStart = TimeMillis();
+ int64_t msCurrent = msStart;
+ while (true) {
+ // Check for posted events
+ int64_t cmsDelayNext = kForever;
+ {
+ // All queue operations need to be locked, but nothing else in this loop
+ // can happen while holding the `mutex_`.
+ MutexLock lock(&mutex_);
+ // Check for delayed messages that have been triggered and calculate the
+ // next trigger time.
+ while (!delayed_messages_.empty()) {
+ if (msCurrent < delayed_messages_.top().run_time_ms) {
+ cmsDelayNext =
+ TimeDiff(delayed_messages_.top().run_time_ms, msCurrent);
+ break;
+ }
+ messages_.push(std::move(delayed_messages_.top().functor));
+ delayed_messages_.pop();
+ }
+ // Pull a message off the message queue, if available.
+ if (!messages_.empty()) {
+ absl::AnyInvocable<void()&&> task = std::move(messages_.front());
+ messages_.pop();
+ return task;
+ }
+ }
+
+ if (IsQuitting())
+ break;
+
+ // Which is shorter, the delay wait or the asked wait?
+
+ int64_t cmsNext;
+ if (cmsWait == kForever) {
+ cmsNext = cmsDelayNext;
+ } else {
+ cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
+ if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
+ cmsNext = cmsDelayNext;
+ }
+
+ {
+ // Wait and multiplex in the meantime
+ if (!ss_->Wait(cmsNext == kForever ? SocketServer::kForever
+ : webrtc::TimeDelta::Millis(cmsNext),
+ /*process_io=*/true))
+ return nullptr;
+ }
+
+ // If the specified timeout expired, return
+
+ msCurrent = TimeMillis();
+ cmsElapsed = TimeDiff(msCurrent, msStart);
+ if (cmsWait != kForever) {
+ if (cmsElapsed >= cmsWait)
+ return nullptr;
+ }
+ }
+ return nullptr;
+}
+
+void Thread::PostTask(absl::AnyInvocable<void() &&> task) {
+ if (IsQuitting()) {
+ return;
+ }
+
+ // Keep thread safe
+ // Add the message to the end of the queue
+ // Signal for the multiplexer to return
+
+ {
+ MutexLock lock(&mutex_);
+ messages_.push(std::move(task));
+ }
+ WakeUpSocketServer();
+}
+
+void Thread::PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
+ webrtc::TimeDelta delay) {
+ if (IsQuitting()) {
+ return;
+ }
+
+ // Keep thread safe
+ // Add to the priority queue. Gets sorted soonest first.
+ // Signal for the multiplexer to return.
+
+ int64_t delay_ms = delay.RoundUpTo(webrtc::TimeDelta::Millis(1)).ms<int>();
+ int64_t run_time_ms = TimeAfter(delay_ms);
+ {
+ MutexLock lock(&mutex_);
+ delayed_messages_.push({.delay_ms = delay_ms,
+ .run_time_ms = run_time_ms,
+ .message_number = delayed_next_num_,
+ .functor = std::move(task)});
+ // If this message queue processes 1 message every millisecond for 50 days,
+ // we will wrap this number. Even then, only messages with identical times
+ // will be misordered, and then only briefly. This is probably ok.
+ ++delayed_next_num_;
+ RTC_DCHECK_NE(0, delayed_next_num_);
+ }
+ WakeUpSocketServer();
+}
+
+int Thread::GetDelay() {
+ MutexLock lock(&mutex_);
+
+ if (!messages_.empty())
+ return 0;
+
+ if (!delayed_messages_.empty()) {
+ int delay = TimeUntil(delayed_messages_.top().run_time_ms);
+ if (delay < 0)
+ delay = 0;
+ return delay;
+ }
+
+ return kForever;
+}
+
+void Thread::Dispatch(absl::AnyInvocable<void() &&> task) {
+ TRACE_EVENT0("webrtc", "Thread::Dispatch");
+ RTC_DCHECK_RUN_ON(this);
+ int64_t start_time = TimeMillis();
+ std::move(task)();
+ int64_t end_time = TimeMillis();
+ int64_t diff = TimeDiff(end_time, start_time);
+ if (diff >= dispatch_warning_ms_) {
+ RTC_LOG(LS_INFO) << "Message to " << name() << " took " << diff
+ << "ms to dispatch.";
+ // To avoid log spew, move the warning limit to only give warning
+ // for delays that are larger than the one observed.
+ dispatch_warning_ms_ = diff + 1;
+ }
+}
+
+bool Thread::IsCurrent() const {
+ return ThreadManager::Instance()->CurrentThread() == this;
+}
+
+std::unique_ptr<Thread> Thread::CreateWithSocketServer() {
+ return std::unique_ptr<Thread>(new Thread(CreateDefaultSocketServer()));
+}
+
+std::unique_ptr<Thread> Thread::Create() {
+ return std::unique_ptr<Thread>(
+ new Thread(std::unique_ptr<SocketServer>(new NullSocketServer())));
+}
+
+bool Thread::SleepMs(int milliseconds) {
+ AssertBlockingIsAllowedOnCurrentThread();
+
+#if defined(WEBRTC_WIN)
+ ::Sleep(milliseconds);
+ return true;
+#else
+ // POSIX has both a usleep() and a nanosleep(), but the former is deprecated,
+ // so we use nanosleep() even though it has greater precision than necessary.
+ struct timespec ts;
+ ts.tv_sec = milliseconds / 1000;
+ ts.tv_nsec = (milliseconds % 1000) * 1000000;
+ int ret = nanosleep(&ts, nullptr);
+ if (ret != 0) {
+ RTC_LOG_ERR(LS_WARNING) << "nanosleep() returning early";
+ return false;
+ }
+ return true;
+#endif
+}
+
+bool Thread::SetName(absl::string_view name, const void* obj) {
+ RTC_DCHECK(!IsRunning());
+
+ name_ = std::string(name);
+ if (obj) {
+ // The %p specifier typically produce at most 16 hex digits, possibly with a
+ // 0x prefix. But format is implementation defined, so add some margin.
+ char buf[30];
+ snprintf(buf, sizeof(buf), " 0x%p", obj);
+ name_ += buf;
+ }
+ return true;
+}
+
+void Thread::SetDispatchWarningMs(int deadline) {
+ if (!IsCurrent()) {
+ PostTask([this, deadline]() { SetDispatchWarningMs(deadline); });
+ return;
+ }
+ RTC_DCHECK_RUN_ON(this);
+ dispatch_warning_ms_ = deadline;
+}
+
+bool Thread::Start() {
+ RTC_DCHECK(!IsRunning());
+
+ if (IsRunning())
+ return false;
+
+ Restart(); // reset IsQuitting() if the thread is being restarted
+
+ // Make sure that ThreadManager is created on the main thread before
+ // we start a new thread.
+ ThreadManager::Instance();
+
+ owned_ = true;
+
+#if defined(WEBRTC_WIN)
+ thread_ = CreateThread(nullptr, 0, PreRun, this, 0, &thread_id_);
+ if (!thread_) {
+ return false;
+ }
+#elif defined(WEBRTC_POSIX)
+ pthread_attr_t attr;
+ pthread_attr_init(&attr);
+
+ int error_code = pthread_create(&thread_, &attr, PreRun, this);
+ if (0 != error_code) {
+ RTC_LOG(LS_ERROR) << "Unable to create pthread, error " << error_code;
+ thread_ = 0;
+ return false;
+ }
+ RTC_DCHECK(thread_);
+#endif
+ return true;
+}
+
+bool Thread::WrapCurrent() {
+ return WrapCurrentWithThreadManager(ThreadManager::Instance(), true);
+}
+
+void Thread::UnwrapCurrent() {
+ // Clears the platform-specific thread-specific storage.
+ ThreadManager::Instance()->SetCurrentThread(nullptr);
+#if defined(WEBRTC_WIN)
+ if (thread_ != nullptr) {
+ if (!CloseHandle(thread_)) {
+ RTC_LOG_GLE(LS_ERROR)
+ << "When unwrapping thread, failed to close handle.";
+ }
+ thread_ = nullptr;
+ thread_id_ = 0;
+ }
+#elif defined(WEBRTC_POSIX)
+ thread_ = 0;
+#endif
+}
+
+void Thread::SafeWrapCurrent() {
+ WrapCurrentWithThreadManager(ThreadManager::Instance(), false);
+}
+
+void Thread::Join() {
+ if (!IsRunning())
+ return;
+
+ RTC_DCHECK(!IsCurrent());
+ if (Current() && !Current()->blocking_calls_allowed_) {
+ RTC_LOG(LS_WARNING) << "Waiting for the thread to join, "
+ "but blocking calls have been disallowed";
+ }
+
+#if defined(WEBRTC_WIN)
+ RTC_DCHECK(thread_ != nullptr);
+ WaitForSingleObject(thread_, INFINITE);
+ CloseHandle(thread_);
+ thread_ = nullptr;
+ thread_id_ = 0;
+#elif defined(WEBRTC_POSIX)
+ pthread_join(thread_, nullptr);
+ thread_ = 0;
+#endif
+}
+
+bool Thread::SetAllowBlockingCalls(bool allow) {
+ RTC_DCHECK(IsCurrent());
+ bool previous = blocking_calls_allowed_;
+ blocking_calls_allowed_ = allow;
+ return previous;
+}
+
+// static
+void Thread::AssertBlockingIsAllowedOnCurrentThread() {
+#if !defined(NDEBUG)
+ Thread* current = Thread::Current();
+ RTC_DCHECK(!current || current->blocking_calls_allowed_);
+#endif
+}
+
+// static
+#if defined(WEBRTC_WIN)
+DWORD WINAPI Thread::PreRun(LPVOID pv) {
+#else
+void* Thread::PreRun(void* pv) {
+#endif
+ Thread* thread = static_cast<Thread*>(pv);
+ ThreadManager::Instance()->SetCurrentThread(thread);
+ rtc::SetCurrentThreadName(thread->name_.c_str());
+#if defined(WEBRTC_MAC)
+ ScopedAutoReleasePool pool;
+#endif
+ thread->Run();
+
+ ThreadManager::Instance()->SetCurrentThread(nullptr);
+#ifdef WEBRTC_WIN
+ return 0;
+#else
+ return nullptr;
+#endif
+} // namespace rtc
+
+void Thread::Run() {
+ ProcessMessages(kForever);
+}
+
+bool Thread::IsOwned() {
+ RTC_DCHECK(IsRunning());
+ return owned_;
+}
+
+void Thread::Stop() {
+ Thread::Quit();
+ Join();
+}
+
+void Thread::BlockingCall(rtc::FunctionView<void()> functor) {
+ TRACE_EVENT0("webrtc", "Thread::BlockingCall");
+
+ RTC_DCHECK(!IsQuitting());
+ if (IsQuitting())
+ return;
+
+ if (IsCurrent()) {
+#if RTC_DCHECK_IS_ON
+ RTC_DCHECK(this->IsInvokeToThreadAllowed(this));
+ RTC_DCHECK_RUN_ON(this);
+ could_be_blocking_call_count_++;
+#endif
+ functor();
+ return;
+ }
+
+ AssertBlockingIsAllowedOnCurrentThread();
+
+ Thread* current_thread = Thread::Current();
+
+#if RTC_DCHECK_IS_ON
+ if (current_thread) {
+ RTC_DCHECK_RUN_ON(current_thread);
+ current_thread->blocking_call_count_++;
+ RTC_DCHECK(current_thread->IsInvokeToThreadAllowed(this));
+ ThreadManager::Instance()->RegisterSendAndCheckForCycles(current_thread,
+ this);
+ }
+#endif
+
+ // Perhaps down the line we can get rid of this workaround and always require
+ // current_thread to be valid when BlockingCall() is called.
+ std::unique_ptr<rtc::Event> done_event;
+ if (!current_thread)
+ done_event.reset(new rtc::Event());
+
+ bool ready = false;
+ absl::Cleanup cleanup = [this, &ready, current_thread,
+ done = done_event.get()] {
+ if (current_thread) {
+ {
+ MutexLock lock(&mutex_);
+ ready = true;
+ }
+ current_thread->socketserver()->WakeUp();
+ } else {
+ done->Set();
+ }
+ };
+ PostTask([functor, cleanup = std::move(cleanup)] { functor(); });
+ if (current_thread) {
+ bool waited = false;
+ mutex_.Lock();
+ while (!ready) {
+ mutex_.Unlock();
+ current_thread->socketserver()->Wait(SocketServer::kForever, false);
+ waited = true;
+ mutex_.Lock();
+ }
+ mutex_.Unlock();
+
+ // Our Wait loop above may have consumed some WakeUp events for this
+ // Thread, that weren't relevant to this Send. Losing these WakeUps can
+ // cause problems for some SocketServers.
+ //
+ // Concrete example:
+ // Win32SocketServer on thread A calls Send on thread B. While processing
+ // the message, thread B Posts a message to A. We consume the wakeup for
+ // that Post while waiting for the Send to complete, which means that when
+ // we exit this loop, we need to issue another WakeUp, or else the Posted
+ // message won't be processed in a timely manner.
+
+ if (waited) {
+ current_thread->socketserver()->WakeUp();
+ }
+ } else {
+ done_event->Wait(rtc::Event::kForever);
+ }
+}
+
+// Called by the ThreadManager when being set as the current thread.
+void Thread::EnsureIsCurrentTaskQueue() {
+ task_queue_registration_ =
+ std::make_unique<TaskQueueBase::CurrentTaskQueueSetter>(this);
+}
+
+// Called by the ThreadManager when being set as the current thread.
+void Thread::ClearCurrentTaskQueue() {
+ task_queue_registration_.reset();
+}
+
+void Thread::AllowInvokesToThread(Thread* thread) {
+#if (!defined(NDEBUG) || RTC_DCHECK_IS_ON)
+ if (!IsCurrent()) {
+ PostTask([thread, this]() { AllowInvokesToThread(thread); });
+ return;
+ }
+ RTC_DCHECK_RUN_ON(this);
+ allowed_threads_.push_back(thread);
+ invoke_policy_enabled_ = true;
+#endif
+}
+
+void Thread::DisallowAllInvokes() {
+#if (!defined(NDEBUG) || RTC_DCHECK_IS_ON)
+ if (!IsCurrent()) {
+ PostTask([this]() { DisallowAllInvokes(); });
+ return;
+ }
+ RTC_DCHECK_RUN_ON(this);
+ allowed_threads_.clear();
+ invoke_policy_enabled_ = true;
+#endif
+}
+
+#if RTC_DCHECK_IS_ON
+uint32_t Thread::GetBlockingCallCount() const {
+ RTC_DCHECK_RUN_ON(this);
+ return blocking_call_count_;
+}
+uint32_t Thread::GetCouldBeBlockingCallCount() const {
+ RTC_DCHECK_RUN_ON(this);
+ return could_be_blocking_call_count_;
+}
+#endif
+
+// Returns true if no policies added or if there is at least one policy
+// that permits invocation to `target` thread.
+bool Thread::IsInvokeToThreadAllowed(rtc::Thread* target) {
+#if (!defined(NDEBUG) || RTC_DCHECK_IS_ON)
+ RTC_DCHECK_RUN_ON(this);
+ if (!invoke_policy_enabled_) {
+ return true;
+ }
+ for (const auto* thread : allowed_threads_) {
+ if (thread == target) {
+ return true;
+ }
+ }
+ return false;
+#else
+ return true;
+#endif
+}
+
+void Thread::Delete() {
+ Stop();
+ delete this;
+}
+
+void Thread::PostDelayedTask(absl::AnyInvocable<void() &&> task,
+ webrtc::TimeDelta delay) {
+ // This implementation does not support low precision yet.
+ PostDelayedHighPrecisionTask(std::move(task), delay);
+}
+
+bool Thread::IsProcessingMessagesForTesting() {
+ return (owned_ || IsCurrent()) && !IsQuitting();
+}
+
+bool Thread::ProcessMessages(int cmsLoop) {
+ // Using ProcessMessages with a custom clock for testing and a time greater
+ // than 0 doesn't work, since it's not guaranteed to advance the custom
+ // clock's time, and may get stuck in an infinite loop.
+ RTC_DCHECK(GetClockForTesting() == nullptr || cmsLoop == 0 ||
+ cmsLoop == kForever);
+ int64_t msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop);
+ int cmsNext = cmsLoop;
+
+ while (true) {
+#if defined(WEBRTC_MAC)
+ ScopedAutoReleasePool pool;
+#endif
+ absl::AnyInvocable<void()&&> task = Get(cmsNext);
+ if (!task)
+ return !IsQuitting();
+ Dispatch(std::move(task));
+
+ if (cmsLoop != kForever) {
+ cmsNext = static_cast<int>(TimeUntil(msEnd));
+ if (cmsNext < 0)
+ return true;
+ }
+ }
+}
+
+bool Thread::WrapCurrentWithThreadManager(ThreadManager* thread_manager,
+ bool need_synchronize_access) {
+ RTC_DCHECK(!IsRunning());
+
+#if defined(WEBRTC_WIN)
+ if (need_synchronize_access) {
+ // We explicitly ask for no rights other than synchronization.
+ // This gives us the best chance of succeeding.
+ thread_ = OpenThread(SYNCHRONIZE, FALSE, GetCurrentThreadId());
+ if (!thread_) {
+ RTC_LOG_GLE(LS_ERROR) << "Unable to get handle to thread.";
+ return false;
+ }
+ thread_id_ = GetCurrentThreadId();
+ }
+#elif defined(WEBRTC_POSIX)
+ thread_ = pthread_self();
+#endif
+ owned_ = false;
+ thread_manager->SetCurrentThread(this);
+ return true;
+}
+
+bool Thread::IsRunning() {
+#if defined(WEBRTC_WIN)
+ return thread_ != nullptr;
+#elif defined(WEBRTC_POSIX)
+ return thread_ != 0;
+#endif
+}
+
+AutoThread::AutoThread()
+ : Thread(CreateDefaultSocketServer(), /*do_init=*/false) {
+ if (!ThreadManager::Instance()->CurrentThread()) {
+ // DoInit registers with ThreadManager. Do that only if we intend to
+ // be rtc::Thread::Current(), otherwise ProcessAllMessageQueuesInternal will
+ // post a message to a queue that no running thread is serving.
+ DoInit();
+ ThreadManager::Instance()->SetCurrentThread(this);
+ }
+}
+
+AutoThread::~AutoThread() {
+ Stop();
+ DoDestroy();
+ if (ThreadManager::Instance()->CurrentThread() == this) {
+ ThreadManager::Instance()->SetCurrentThread(nullptr);
+ }
+}
+
+AutoSocketServerThread::AutoSocketServerThread(SocketServer* ss)
+ : Thread(ss, /*do_init=*/false) {
+ DoInit();
+ old_thread_ = ThreadManager::Instance()->CurrentThread();
+ // Temporarily set the current thread to nullptr so that we can keep checks
+ // around that catch unintentional pointer overwrites.
+ rtc::ThreadManager::Instance()->SetCurrentThread(nullptr);
+ rtc::ThreadManager::Instance()->SetCurrentThread(this);
+ if (old_thread_) {
+ ThreadManager::Remove(old_thread_);
+ }
+}
+
+AutoSocketServerThread::~AutoSocketServerThread() {
+ RTC_DCHECK(ThreadManager::Instance()->CurrentThread() == this);
+ // Stop and destroy the thread before clearing it as the current thread.
+ // Sometimes there are messages left in the Thread that will be
+ // destroyed by DoDestroy, and sometimes the destructors of the message and/or
+ // its contents rely on this thread still being set as the current thread.
+ Stop();
+ DoDestroy();
+ rtc::ThreadManager::Instance()->SetCurrentThread(nullptr);
+ rtc::ThreadManager::Instance()->SetCurrentThread(old_thread_);
+ if (old_thread_) {
+ ThreadManager::Add(old_thread_);
+ }
+}
+
+} // namespace rtc