diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
commit | 36d22d82aa202bb199967e9512281e9a53db42c9 (patch) | |
tree | 105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/libwebrtc/rtc_base/thread.cc | |
parent | Initial commit. (diff) | |
download | firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.tar.xz firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.zip |
Adding upstream version 115.7.0esr.upstream/115.7.0esrupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/libwebrtc/rtc_base/thread.cc')
-rw-r--r-- | third_party/libwebrtc/rtc_base/thread.cc | 987 |
1 files changed, 987 insertions, 0 deletions
diff --git a/third_party/libwebrtc/rtc_base/thread.cc b/third_party/libwebrtc/rtc_base/thread.cc new file mode 100644 index 0000000000..0e5df64dde --- /dev/null +++ b/third_party/libwebrtc/rtc_base/thread.cc @@ -0,0 +1,987 @@ +/* + * Copyright 2004 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "rtc_base/thread.h" + +#include "absl/strings/string_view.h" +#include "api/task_queue/task_queue_base.h" +#include "api/units/time_delta.h" +#include "rtc_base/socket_server.h" + +#if defined(WEBRTC_WIN) +#include <comdef.h> +#elif defined(WEBRTC_POSIX) +#include <time.h> +#else +#error "Either WEBRTC_WIN or WEBRTC_POSIX needs to be defined." +#endif + +#if defined(WEBRTC_WIN) +// Disable warning that we don't care about: +// warning C4722: destructor never returns, potential memory leak +#pragma warning(disable : 4722) +#endif + +#include <stdio.h> + +#include <utility> + +#include "absl/algorithm/container.h" +#include "absl/cleanup/cleanup.h" +#include "api/sequence_checker.h" +#include "rtc_base/checks.h" +#include "rtc_base/event.h" +#include "rtc_base/internal/default_socket_server.h" +#include "rtc_base/logging.h" +#include "rtc_base/null_socket_server.h" +#include "rtc_base/synchronization/mutex.h" +#include "rtc_base/time_utils.h" +#include "rtc_base/trace_event.h" + +#if defined(WEBRTC_MAC) +#include "rtc_base/system/cocoa_threading.h" + +/* + * These are forward-declarations for methods that are part of the + * ObjC runtime. They are declared in the private header objc-internal.h. + * These calls are what clang inserts when using @autoreleasepool in ObjC, + * but here they are used directly in order to keep this file C++. + * https://clang.llvm.org/docs/AutomaticReferenceCounting.html#runtime-support + */ +extern "C" { +void* objc_autoreleasePoolPush(void); +void objc_autoreleasePoolPop(void* pool); +} + +namespace { +class ScopedAutoReleasePool { + public: + ScopedAutoReleasePool() : pool_(objc_autoreleasePoolPush()) {} + ~ScopedAutoReleasePool() { objc_autoreleasePoolPop(pool_); } + + private: + void* const pool_; +}; +} // namespace +#endif + +namespace rtc { + +using ::webrtc::MutexLock; +using ::webrtc::TimeDelta; + +ThreadManager* ThreadManager::Instance() { + static ThreadManager* const thread_manager = new ThreadManager(); + return thread_manager; +} + +ThreadManager::~ThreadManager() { + // By above RTC_DEFINE_STATIC_LOCAL. + RTC_DCHECK_NOTREACHED() << "ThreadManager should never be destructed."; +} + +// static +void ThreadManager::Add(Thread* message_queue) { + return Instance()->AddInternal(message_queue); +} +void ThreadManager::AddInternal(Thread* message_queue) { + MutexLock cs(&crit_); + message_queues_.push_back(message_queue); +} + +// static +void ThreadManager::Remove(Thread* message_queue) { + return Instance()->RemoveInternal(message_queue); +} +void ThreadManager::RemoveInternal(Thread* message_queue) { + { + MutexLock cs(&crit_); + std::vector<Thread*>::iterator iter; + iter = absl::c_find(message_queues_, message_queue); + if (iter != message_queues_.end()) { + message_queues_.erase(iter); + } +#if RTC_DCHECK_IS_ON + RemoveFromSendGraph(message_queue); +#endif + } +} + +#if RTC_DCHECK_IS_ON +void ThreadManager::RemoveFromSendGraph(Thread* thread) { + for (auto it = send_graph_.begin(); it != send_graph_.end();) { + if (it->first == thread) { + it = send_graph_.erase(it); + } else { + it->second.erase(thread); + ++it; + } + } +} + +void ThreadManager::RegisterSendAndCheckForCycles(Thread* source, + Thread* target) { + RTC_DCHECK(source); + RTC_DCHECK(target); + + MutexLock cs(&crit_); + std::deque<Thread*> all_targets({target}); + // We check the pre-existing who-sends-to-who graph for any path from target + // to source. This loop is guaranteed to terminate because per the send graph + // invariant, there are no cycles in the graph. + for (size_t i = 0; i < all_targets.size(); i++) { + const auto& targets = send_graph_[all_targets[i]]; + all_targets.insert(all_targets.end(), targets.begin(), targets.end()); + } + RTC_CHECK_EQ(absl::c_count(all_targets, source), 0) + << " send loop between " << source->name() << " and " << target->name(); + + // We may now insert source -> target without creating a cycle, since there + // was no path from target to source per the prior CHECK. + send_graph_[source].insert(target); +} +#endif + +// static +void ThreadManager::ProcessAllMessageQueuesForTesting() { + return Instance()->ProcessAllMessageQueuesInternal(); +} + +void ThreadManager::ProcessAllMessageQueuesInternal() { + // This works by posting a delayed message at the current time and waiting + // for it to be dispatched on all queues, which will ensure that all messages + // that came before it were also dispatched. + std::atomic<int> queues_not_done(0); + + { + MutexLock cs(&crit_); + for (Thread* queue : message_queues_) { + if (!queue->IsProcessingMessagesForTesting()) { + // If the queue is not processing messages, it can + // be ignored. If we tried to post a message to it, it would be dropped + // or ignored. + continue; + } + queues_not_done.fetch_add(1); + // Whether the task is processed, or the thread is simply cleared, + // queues_not_done gets decremented. + absl::Cleanup sub = [&queues_not_done] { queues_not_done.fetch_sub(1); }; + // Post delayed task instead of regular task to wait for all delayed tasks + // that are ready for processing. + queue->PostDelayedTask([sub = std::move(sub)] {}, TimeDelta::Zero()); + } + } + + rtc::Thread* current = rtc::Thread::Current(); + // Note: One of the message queues may have been on this thread, which is + // why we can't synchronously wait for queues_not_done to go to 0; we need + // to process messages as well. + while (queues_not_done.load() > 0) { + if (current) { + current->ProcessMessages(0); + } + } +} + +// static +Thread* Thread::Current() { + ThreadManager* manager = ThreadManager::Instance(); + Thread* thread = manager->CurrentThread(); + + return thread; +} + +#if defined(WEBRTC_POSIX) +ThreadManager::ThreadManager() { +#if defined(WEBRTC_MAC) + InitCocoaMultiThreading(); +#endif + pthread_key_create(&key_, nullptr); +} + +Thread* ThreadManager::CurrentThread() { + return static_cast<Thread*>(pthread_getspecific(key_)); +} + +void ThreadManager::SetCurrentThreadInternal(Thread* thread) { + pthread_setspecific(key_, thread); +} +#endif + +#if defined(WEBRTC_WIN) +ThreadManager::ThreadManager() : key_(TlsAlloc()) {} + +Thread* ThreadManager::CurrentThread() { + return static_cast<Thread*>(TlsGetValue(key_)); +} + +void ThreadManager::SetCurrentThreadInternal(Thread* thread) { + TlsSetValue(key_, thread); +} +#endif + +void ThreadManager::SetCurrentThread(Thread* thread) { +#if RTC_DLOG_IS_ON + if (CurrentThread() && thread) { + RTC_DLOG(LS_ERROR) << "SetCurrentThread: Overwriting an existing value?"; + } +#endif // RTC_DLOG_IS_ON + + if (thread) { + thread->EnsureIsCurrentTaskQueue(); + } else { + Thread* current = CurrentThread(); + if (current) { + // The current thread is being cleared, e.g. as a result of + // UnwrapCurrent() being called or when a thread is being stopped + // (see PreRun()). This signals that the Thread instance is being detached + // from the thread, which also means that TaskQueue::Current() must not + // return a pointer to the Thread instance. + current->ClearCurrentTaskQueue(); + } + } + + SetCurrentThreadInternal(thread); +} + +void rtc::ThreadManager::ChangeCurrentThreadForTest(rtc::Thread* thread) { + SetCurrentThreadInternal(thread); +} + +Thread* ThreadManager::WrapCurrentThread() { + Thread* result = CurrentThread(); + if (nullptr == result) { + result = new Thread(CreateDefaultSocketServer()); + result->WrapCurrentWithThreadManager(this, true); + } + return result; +} + +void ThreadManager::UnwrapCurrentThread() { + Thread* t = CurrentThread(); + if (t && !(t->IsOwned())) { + t->UnwrapCurrent(); + delete t; + } +} + +Thread::ScopedDisallowBlockingCalls::ScopedDisallowBlockingCalls() + : thread_(Thread::Current()), + previous_state_(thread_->SetAllowBlockingCalls(false)) {} + +Thread::ScopedDisallowBlockingCalls::~ScopedDisallowBlockingCalls() { + RTC_DCHECK(thread_->IsCurrent()); + thread_->SetAllowBlockingCalls(previous_state_); +} + +#if RTC_DCHECK_IS_ON +Thread::ScopedCountBlockingCalls::ScopedCountBlockingCalls( + std::function<void(uint32_t, uint32_t)> callback) + : thread_(Thread::Current()), + base_blocking_call_count_(thread_->GetBlockingCallCount()), + base_could_be_blocking_call_count_( + thread_->GetCouldBeBlockingCallCount()), + result_callback_(std::move(callback)) {} + +Thread::ScopedCountBlockingCalls::~ScopedCountBlockingCalls() { + if (GetTotalBlockedCallCount() >= min_blocking_calls_for_callback_) { + result_callback_(GetBlockingCallCount(), GetCouldBeBlockingCallCount()); + } +} + +uint32_t Thread::ScopedCountBlockingCalls::GetBlockingCallCount() const { + return thread_->GetBlockingCallCount() - base_blocking_call_count_; +} + +uint32_t Thread::ScopedCountBlockingCalls::GetCouldBeBlockingCallCount() const { + return thread_->GetCouldBeBlockingCallCount() - + base_could_be_blocking_call_count_; +} + +uint32_t Thread::ScopedCountBlockingCalls::GetTotalBlockedCallCount() const { + return GetBlockingCallCount() + GetCouldBeBlockingCallCount(); +} +#endif + +Thread::Thread(SocketServer* ss) : Thread(ss, /*do_init=*/true) {} + +Thread::Thread(std::unique_ptr<SocketServer> ss) + : Thread(std::move(ss), /*do_init=*/true) {} + +Thread::Thread(SocketServer* ss, bool do_init) + : delayed_next_num_(0), + fInitialized_(false), + fDestroyed_(false), + stop_(0), + ss_(ss) { + RTC_DCHECK(ss); + ss_->SetMessageQueue(this); + SetName("Thread", this); // default name + if (do_init) { + DoInit(); + } +} + +Thread::Thread(std::unique_ptr<SocketServer> ss, bool do_init) + : Thread(ss.get(), do_init) { + own_ss_ = std::move(ss); +} + +Thread::~Thread() { + Stop(); + DoDestroy(); +} + +void Thread::DoInit() { + if (fInitialized_) { + return; + } + + fInitialized_ = true; + ThreadManager::Add(this); +} + +void Thread::DoDestroy() { + if (fDestroyed_) { + return; + } + + fDestroyed_ = true; + // The signal is done from here to ensure + // that it always gets called when the queue + // is going away. + if (ss_) { + ss_->SetMessageQueue(nullptr); + } + ThreadManager::Remove(this); + // Clear. + CurrentTaskQueueSetter set_current(this); + messages_ = {}; + delayed_messages_ = {}; +} + +SocketServer* Thread::socketserver() { + return ss_; +} + +void Thread::WakeUpSocketServer() { + ss_->WakeUp(); +} + +void Thread::Quit() { + stop_.store(1, std::memory_order_release); + WakeUpSocketServer(); +} + +bool Thread::IsQuitting() { + return stop_.load(std::memory_order_acquire) != 0; +} + +void Thread::Restart() { + stop_.store(0, std::memory_order_release); +} + +absl::AnyInvocable<void() &&> Thread::Get(int cmsWait) { + // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch + + int64_t cmsTotal = cmsWait; + int64_t cmsElapsed = 0; + int64_t msStart = TimeMillis(); + int64_t msCurrent = msStart; + while (true) { + // Check for posted events + int64_t cmsDelayNext = kForever; + { + // All queue operations need to be locked, but nothing else in this loop + // can happen while holding the `mutex_`. + MutexLock lock(&mutex_); + // Check for delayed messages that have been triggered and calculate the + // next trigger time. + while (!delayed_messages_.empty()) { + if (msCurrent < delayed_messages_.top().run_time_ms) { + cmsDelayNext = + TimeDiff(delayed_messages_.top().run_time_ms, msCurrent); + break; + } + messages_.push(std::move(delayed_messages_.top().functor)); + delayed_messages_.pop(); + } + // Pull a message off the message queue, if available. + if (!messages_.empty()) { + absl::AnyInvocable<void()&&> task = std::move(messages_.front()); + messages_.pop(); + return task; + } + } + + if (IsQuitting()) + break; + + // Which is shorter, the delay wait or the asked wait? + + int64_t cmsNext; + if (cmsWait == kForever) { + cmsNext = cmsDelayNext; + } else { + cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); + if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) + cmsNext = cmsDelayNext; + } + + { + // Wait and multiplex in the meantime + if (!ss_->Wait(cmsNext == kForever ? SocketServer::kForever + : webrtc::TimeDelta::Millis(cmsNext), + /*process_io=*/true)) + return nullptr; + } + + // If the specified timeout expired, return + + msCurrent = TimeMillis(); + cmsElapsed = TimeDiff(msCurrent, msStart); + if (cmsWait != kForever) { + if (cmsElapsed >= cmsWait) + return nullptr; + } + } + return nullptr; +} + +void Thread::PostTask(absl::AnyInvocable<void() &&> task) { + if (IsQuitting()) { + return; + } + + // Keep thread safe + // Add the message to the end of the queue + // Signal for the multiplexer to return + + { + MutexLock lock(&mutex_); + messages_.push(std::move(task)); + } + WakeUpSocketServer(); +} + +void Thread::PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task, + webrtc::TimeDelta delay) { + if (IsQuitting()) { + return; + } + + // Keep thread safe + // Add to the priority queue. Gets sorted soonest first. + // Signal for the multiplexer to return. + + int64_t delay_ms = delay.RoundUpTo(webrtc::TimeDelta::Millis(1)).ms<int>(); + int64_t run_time_ms = TimeAfter(delay_ms); + { + MutexLock lock(&mutex_); + delayed_messages_.push({.delay_ms = delay_ms, + .run_time_ms = run_time_ms, + .message_number = delayed_next_num_, + .functor = std::move(task)}); + // If this message queue processes 1 message every millisecond for 50 days, + // we will wrap this number. Even then, only messages with identical times + // will be misordered, and then only briefly. This is probably ok. + ++delayed_next_num_; + RTC_DCHECK_NE(0, delayed_next_num_); + } + WakeUpSocketServer(); +} + +int Thread::GetDelay() { + MutexLock lock(&mutex_); + + if (!messages_.empty()) + return 0; + + if (!delayed_messages_.empty()) { + int delay = TimeUntil(delayed_messages_.top().run_time_ms); + if (delay < 0) + delay = 0; + return delay; + } + + return kForever; +} + +void Thread::Dispatch(absl::AnyInvocable<void() &&> task) { + TRACE_EVENT0("webrtc", "Thread::Dispatch"); + RTC_DCHECK_RUN_ON(this); + int64_t start_time = TimeMillis(); + std::move(task)(); + int64_t end_time = TimeMillis(); + int64_t diff = TimeDiff(end_time, start_time); + if (diff >= dispatch_warning_ms_) { + RTC_LOG(LS_INFO) << "Message to " << name() << " took " << diff + << "ms to dispatch."; + // To avoid log spew, move the warning limit to only give warning + // for delays that are larger than the one observed. + dispatch_warning_ms_ = diff + 1; + } +} + +bool Thread::IsCurrent() const { + return ThreadManager::Instance()->CurrentThread() == this; +} + +std::unique_ptr<Thread> Thread::CreateWithSocketServer() { + return std::unique_ptr<Thread>(new Thread(CreateDefaultSocketServer())); +} + +std::unique_ptr<Thread> Thread::Create() { + return std::unique_ptr<Thread>( + new Thread(std::unique_ptr<SocketServer>(new NullSocketServer()))); +} + +bool Thread::SleepMs(int milliseconds) { + AssertBlockingIsAllowedOnCurrentThread(); + +#if defined(WEBRTC_WIN) + ::Sleep(milliseconds); + return true; +#else + // POSIX has both a usleep() and a nanosleep(), but the former is deprecated, + // so we use nanosleep() even though it has greater precision than necessary. + struct timespec ts; + ts.tv_sec = milliseconds / 1000; + ts.tv_nsec = (milliseconds % 1000) * 1000000; + int ret = nanosleep(&ts, nullptr); + if (ret != 0) { + RTC_LOG_ERR(LS_WARNING) << "nanosleep() returning early"; + return false; + } + return true; +#endif +} + +bool Thread::SetName(absl::string_view name, const void* obj) { + RTC_DCHECK(!IsRunning()); + + name_ = std::string(name); + if (obj) { + // The %p specifier typically produce at most 16 hex digits, possibly with a + // 0x prefix. But format is implementation defined, so add some margin. + char buf[30]; + snprintf(buf, sizeof(buf), " 0x%p", obj); + name_ += buf; + } + return true; +} + +void Thread::SetDispatchWarningMs(int deadline) { + if (!IsCurrent()) { + PostTask([this, deadline]() { SetDispatchWarningMs(deadline); }); + return; + } + RTC_DCHECK_RUN_ON(this); + dispatch_warning_ms_ = deadline; +} + +bool Thread::Start() { + RTC_DCHECK(!IsRunning()); + + if (IsRunning()) + return false; + + Restart(); // reset IsQuitting() if the thread is being restarted + + // Make sure that ThreadManager is created on the main thread before + // we start a new thread. + ThreadManager::Instance(); + + owned_ = true; + +#if defined(WEBRTC_WIN) + thread_ = CreateThread(nullptr, 0, PreRun, this, 0, &thread_id_); + if (!thread_) { + return false; + } +#elif defined(WEBRTC_POSIX) + pthread_attr_t attr; + pthread_attr_init(&attr); + + int error_code = pthread_create(&thread_, &attr, PreRun, this); + if (0 != error_code) { + RTC_LOG(LS_ERROR) << "Unable to create pthread, error " << error_code; + thread_ = 0; + return false; + } + RTC_DCHECK(thread_); +#endif + return true; +} + +bool Thread::WrapCurrent() { + return WrapCurrentWithThreadManager(ThreadManager::Instance(), true); +} + +void Thread::UnwrapCurrent() { + // Clears the platform-specific thread-specific storage. + ThreadManager::Instance()->SetCurrentThread(nullptr); +#if defined(WEBRTC_WIN) + if (thread_ != nullptr) { + if (!CloseHandle(thread_)) { + RTC_LOG_GLE(LS_ERROR) + << "When unwrapping thread, failed to close handle."; + } + thread_ = nullptr; + thread_id_ = 0; + } +#elif defined(WEBRTC_POSIX) + thread_ = 0; +#endif +} + +void Thread::SafeWrapCurrent() { + WrapCurrentWithThreadManager(ThreadManager::Instance(), false); +} + +void Thread::Join() { + if (!IsRunning()) + return; + + RTC_DCHECK(!IsCurrent()); + if (Current() && !Current()->blocking_calls_allowed_) { + RTC_LOG(LS_WARNING) << "Waiting for the thread to join, " + "but blocking calls have been disallowed"; + } + +#if defined(WEBRTC_WIN) + RTC_DCHECK(thread_ != nullptr); + WaitForSingleObject(thread_, INFINITE); + CloseHandle(thread_); + thread_ = nullptr; + thread_id_ = 0; +#elif defined(WEBRTC_POSIX) + pthread_join(thread_, nullptr); + thread_ = 0; +#endif +} + +bool Thread::SetAllowBlockingCalls(bool allow) { + RTC_DCHECK(IsCurrent()); + bool previous = blocking_calls_allowed_; + blocking_calls_allowed_ = allow; + return previous; +} + +// static +void Thread::AssertBlockingIsAllowedOnCurrentThread() { +#if !defined(NDEBUG) + Thread* current = Thread::Current(); + RTC_DCHECK(!current || current->blocking_calls_allowed_); +#endif +} + +// static +#if defined(WEBRTC_WIN) +DWORD WINAPI Thread::PreRun(LPVOID pv) { +#else +void* Thread::PreRun(void* pv) { +#endif + Thread* thread = static_cast<Thread*>(pv); + ThreadManager::Instance()->SetCurrentThread(thread); + rtc::SetCurrentThreadName(thread->name_.c_str()); +#if defined(WEBRTC_MAC) + ScopedAutoReleasePool pool; +#endif + thread->Run(); + + ThreadManager::Instance()->SetCurrentThread(nullptr); +#ifdef WEBRTC_WIN + return 0; +#else + return nullptr; +#endif +} // namespace rtc + +void Thread::Run() { + ProcessMessages(kForever); +} + +bool Thread::IsOwned() { + RTC_DCHECK(IsRunning()); + return owned_; +} + +void Thread::Stop() { + Thread::Quit(); + Join(); +} + +void Thread::BlockingCall(rtc::FunctionView<void()> functor) { + TRACE_EVENT0("webrtc", "Thread::BlockingCall"); + + RTC_DCHECK(!IsQuitting()); + if (IsQuitting()) + return; + + if (IsCurrent()) { +#if RTC_DCHECK_IS_ON + RTC_DCHECK(this->IsInvokeToThreadAllowed(this)); + RTC_DCHECK_RUN_ON(this); + could_be_blocking_call_count_++; +#endif + functor(); + return; + } + + AssertBlockingIsAllowedOnCurrentThread(); + + Thread* current_thread = Thread::Current(); + +#if RTC_DCHECK_IS_ON + if (current_thread) { + RTC_DCHECK_RUN_ON(current_thread); + current_thread->blocking_call_count_++; + RTC_DCHECK(current_thread->IsInvokeToThreadAllowed(this)); + ThreadManager::Instance()->RegisterSendAndCheckForCycles(current_thread, + this); + } +#endif + + // Perhaps down the line we can get rid of this workaround and always require + // current_thread to be valid when BlockingCall() is called. + std::unique_ptr<rtc::Event> done_event; + if (!current_thread) + done_event.reset(new rtc::Event()); + + bool ready = false; + absl::Cleanup cleanup = [this, &ready, current_thread, + done = done_event.get()] { + if (current_thread) { + { + MutexLock lock(&mutex_); + ready = true; + } + current_thread->socketserver()->WakeUp(); + } else { + done->Set(); + } + }; + PostTask([functor, cleanup = std::move(cleanup)] { functor(); }); + if (current_thread) { + bool waited = false; + mutex_.Lock(); + while (!ready) { + mutex_.Unlock(); + current_thread->socketserver()->Wait(SocketServer::kForever, false); + waited = true; + mutex_.Lock(); + } + mutex_.Unlock(); + + // Our Wait loop above may have consumed some WakeUp events for this + // Thread, that weren't relevant to this Send. Losing these WakeUps can + // cause problems for some SocketServers. + // + // Concrete example: + // Win32SocketServer on thread A calls Send on thread B. While processing + // the message, thread B Posts a message to A. We consume the wakeup for + // that Post while waiting for the Send to complete, which means that when + // we exit this loop, we need to issue another WakeUp, or else the Posted + // message won't be processed in a timely manner. + + if (waited) { + current_thread->socketserver()->WakeUp(); + } + } else { + done_event->Wait(rtc::Event::kForever); + } +} + +// Called by the ThreadManager when being set as the current thread. +void Thread::EnsureIsCurrentTaskQueue() { + task_queue_registration_ = + std::make_unique<TaskQueueBase::CurrentTaskQueueSetter>(this); +} + +// Called by the ThreadManager when being set as the current thread. +void Thread::ClearCurrentTaskQueue() { + task_queue_registration_.reset(); +} + +void Thread::AllowInvokesToThread(Thread* thread) { +#if (!defined(NDEBUG) || RTC_DCHECK_IS_ON) + if (!IsCurrent()) { + PostTask([thread, this]() { AllowInvokesToThread(thread); }); + return; + } + RTC_DCHECK_RUN_ON(this); + allowed_threads_.push_back(thread); + invoke_policy_enabled_ = true; +#endif +} + +void Thread::DisallowAllInvokes() { +#if (!defined(NDEBUG) || RTC_DCHECK_IS_ON) + if (!IsCurrent()) { + PostTask([this]() { DisallowAllInvokes(); }); + return; + } + RTC_DCHECK_RUN_ON(this); + allowed_threads_.clear(); + invoke_policy_enabled_ = true; +#endif +} + +#if RTC_DCHECK_IS_ON +uint32_t Thread::GetBlockingCallCount() const { + RTC_DCHECK_RUN_ON(this); + return blocking_call_count_; +} +uint32_t Thread::GetCouldBeBlockingCallCount() const { + RTC_DCHECK_RUN_ON(this); + return could_be_blocking_call_count_; +} +#endif + +// Returns true if no policies added or if there is at least one policy +// that permits invocation to `target` thread. +bool Thread::IsInvokeToThreadAllowed(rtc::Thread* target) { +#if (!defined(NDEBUG) || RTC_DCHECK_IS_ON) + RTC_DCHECK_RUN_ON(this); + if (!invoke_policy_enabled_) { + return true; + } + for (const auto* thread : allowed_threads_) { + if (thread == target) { + return true; + } + } + return false; +#else + return true; +#endif +} + +void Thread::Delete() { + Stop(); + delete this; +} + +void Thread::PostDelayedTask(absl::AnyInvocable<void() &&> task, + webrtc::TimeDelta delay) { + // This implementation does not support low precision yet. + PostDelayedHighPrecisionTask(std::move(task), delay); +} + +bool Thread::IsProcessingMessagesForTesting() { + return (owned_ || IsCurrent()) && !IsQuitting(); +} + +bool Thread::ProcessMessages(int cmsLoop) { + // Using ProcessMessages with a custom clock for testing and a time greater + // than 0 doesn't work, since it's not guaranteed to advance the custom + // clock's time, and may get stuck in an infinite loop. + RTC_DCHECK(GetClockForTesting() == nullptr || cmsLoop == 0 || + cmsLoop == kForever); + int64_t msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop); + int cmsNext = cmsLoop; + + while (true) { +#if defined(WEBRTC_MAC) + ScopedAutoReleasePool pool; +#endif + absl::AnyInvocable<void()&&> task = Get(cmsNext); + if (!task) + return !IsQuitting(); + Dispatch(std::move(task)); + + if (cmsLoop != kForever) { + cmsNext = static_cast<int>(TimeUntil(msEnd)); + if (cmsNext < 0) + return true; + } + } +} + +bool Thread::WrapCurrentWithThreadManager(ThreadManager* thread_manager, + bool need_synchronize_access) { + RTC_DCHECK(!IsRunning()); + +#if defined(WEBRTC_WIN) + if (need_synchronize_access) { + // We explicitly ask for no rights other than synchronization. + // This gives us the best chance of succeeding. + thread_ = OpenThread(SYNCHRONIZE, FALSE, GetCurrentThreadId()); + if (!thread_) { + RTC_LOG_GLE(LS_ERROR) << "Unable to get handle to thread."; + return false; + } + thread_id_ = GetCurrentThreadId(); + } +#elif defined(WEBRTC_POSIX) + thread_ = pthread_self(); +#endif + owned_ = false; + thread_manager->SetCurrentThread(this); + return true; +} + +bool Thread::IsRunning() { +#if defined(WEBRTC_WIN) + return thread_ != nullptr; +#elif defined(WEBRTC_POSIX) + return thread_ != 0; +#endif +} + +AutoThread::AutoThread() + : Thread(CreateDefaultSocketServer(), /*do_init=*/false) { + if (!ThreadManager::Instance()->CurrentThread()) { + // DoInit registers with ThreadManager. Do that only if we intend to + // be rtc::Thread::Current(), otherwise ProcessAllMessageQueuesInternal will + // post a message to a queue that no running thread is serving. + DoInit(); + ThreadManager::Instance()->SetCurrentThread(this); + } +} + +AutoThread::~AutoThread() { + Stop(); + DoDestroy(); + if (ThreadManager::Instance()->CurrentThread() == this) { + ThreadManager::Instance()->SetCurrentThread(nullptr); + } +} + +AutoSocketServerThread::AutoSocketServerThread(SocketServer* ss) + : Thread(ss, /*do_init=*/false) { + DoInit(); + old_thread_ = ThreadManager::Instance()->CurrentThread(); + // Temporarily set the current thread to nullptr so that we can keep checks + // around that catch unintentional pointer overwrites. + rtc::ThreadManager::Instance()->SetCurrentThread(nullptr); + rtc::ThreadManager::Instance()->SetCurrentThread(this); + if (old_thread_) { + ThreadManager::Remove(old_thread_); + } +} + +AutoSocketServerThread::~AutoSocketServerThread() { + RTC_DCHECK(ThreadManager::Instance()->CurrentThread() == this); + // Stop and destroy the thread before clearing it as the current thread. + // Sometimes there are messages left in the Thread that will be + // destroyed by DoDestroy, and sometimes the destructors of the message and/or + // its contents rely on this thread still being set as the current thread. + Stop(); + DoDestroy(); + rtc::ThreadManager::Instance()->SetCurrentThread(nullptr); + rtc::ThreadManager::Instance()->SetCurrentThread(old_thread_); + if (old_thread_) { + ThreadManager::Add(old_thread_); + } +} + +} // namespace rtc |