diff options
Diffstat (limited to '')
-rw-r--r-- | third_party/libwebrtc/rtc_base/thread.cc | 1221 |
1 files changed, 1221 insertions, 0 deletions
diff --git a/third_party/libwebrtc/rtc_base/thread.cc b/third_party/libwebrtc/rtc_base/thread.cc new file mode 100644 index 0000000000..56b6b43ef3 --- /dev/null +++ b/third_party/libwebrtc/rtc_base/thread.cc @@ -0,0 +1,1221 @@ +/* + * Copyright 2004 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "rtc_base/thread.h" + +#include "absl/strings/string_view.h" + +#if defined(WEBRTC_WIN) +#include <comdef.h> +#elif defined(WEBRTC_POSIX) +#include <time.h> +#else +#error "Either WEBRTC_WIN or WEBRTC_POSIX needs to be defined." +#endif + +#if defined(WEBRTC_WIN) +// Disable warning that we don't care about: +// warning C4722: destructor never returns, potential memory leak +#pragma warning(disable : 4722) +#endif + +#include <stdio.h> + +#include <utility> + +#include "absl/algorithm/container.h" +#include "absl/cleanup/cleanup.h" +#include "api/sequence_checker.h" +#include "rtc_base/checks.h" +#include "rtc_base/deprecated/recursive_critical_section.h" +#include "rtc_base/event.h" +#include "rtc_base/internal/default_socket_server.h" +#include "rtc_base/logging.h" +#include "rtc_base/null_socket_server.h" +#include "rtc_base/time_utils.h" +#include "rtc_base/trace_event.h" + +#if defined(WEBRTC_MAC) +#include "rtc_base/system/cocoa_threading.h" + +/* + * These are forward-declarations for methods that are part of the + * ObjC runtime. They are declared in the private header objc-internal.h. + * These calls are what clang inserts when using @autoreleasepool in ObjC, + * but here they are used directly in order to keep this file C++. + * https://clang.llvm.org/docs/AutomaticReferenceCounting.html#runtime-support + */ +extern "C" { +void* objc_autoreleasePoolPush(void); +void objc_autoreleasePoolPop(void* pool); +} + +namespace { +class ScopedAutoReleasePool { + public: + ScopedAutoReleasePool() : pool_(objc_autoreleasePoolPush()) {} + ~ScopedAutoReleasePool() { objc_autoreleasePoolPop(pool_); } + + private: + void* const pool_; +}; +} // namespace +#endif + +namespace rtc { +namespace { + +struct AnyInvocableMessage final : public MessageData { + explicit AnyInvocableMessage(absl::AnyInvocable<void() &&> task) + : task(std::move(task)) {} + absl::AnyInvocable<void() &&> task; +}; + +class AnyInvocableMessageHandler final : public MessageHandler { + public: + void OnMessage(Message* msg) override { + std::move(static_cast<AnyInvocableMessage*>(msg->pdata)->task)(); + delete msg->pdata; + } +}; + +MessageHandler* GetAnyInvocableMessageHandler() { + static MessageHandler* const handler = new AnyInvocableMessageHandler; + return handler; +} + +class RTC_SCOPED_LOCKABLE MarkProcessingCritScope { + public: + MarkProcessingCritScope(const RecursiveCriticalSection* cs, + size_t* processing) RTC_EXCLUSIVE_LOCK_FUNCTION(cs) + : cs_(cs), processing_(processing) { + cs_->Enter(); + *processing_ += 1; + } + + ~MarkProcessingCritScope() RTC_UNLOCK_FUNCTION() { + *processing_ -= 1; + cs_->Leave(); + } + + MarkProcessingCritScope(const MarkProcessingCritScope&) = delete; + MarkProcessingCritScope& operator=(const MarkProcessingCritScope&) = delete; + + private: + const RecursiveCriticalSection* const cs_; + size_t* processing_; +}; + +} // namespace + +ThreadManager* ThreadManager::Instance() { + static ThreadManager* const thread_manager = new ThreadManager(); + return thread_manager; +} + +ThreadManager::~ThreadManager() { + // By above RTC_DEFINE_STATIC_LOCAL. + RTC_DCHECK_NOTREACHED() << "ThreadManager should never be destructed."; +} + +// static +void ThreadManager::Add(Thread* message_queue) { + return Instance()->AddInternal(message_queue); +} +void ThreadManager::AddInternal(Thread* message_queue) { + CritScope cs(&crit_); + // Prevent changes while the list of message queues is processed. + RTC_DCHECK_EQ(processing_, 0); + message_queues_.push_back(message_queue); +} + +// static +void ThreadManager::Remove(Thread* message_queue) { + return Instance()->RemoveInternal(message_queue); +} +void ThreadManager::RemoveInternal(Thread* message_queue) { + { + CritScope cs(&crit_); + // Prevent changes while the list of message queues is processed. + RTC_DCHECK_EQ(processing_, 0); + std::vector<Thread*>::iterator iter; + iter = absl::c_find(message_queues_, message_queue); + if (iter != message_queues_.end()) { + message_queues_.erase(iter); + } +#if RTC_DCHECK_IS_ON + RemoveFromSendGraph(message_queue); +#endif + } +} + +#if RTC_DCHECK_IS_ON +void ThreadManager::RemoveFromSendGraph(Thread* thread) { + for (auto it = send_graph_.begin(); it != send_graph_.end();) { + if (it->first == thread) { + it = send_graph_.erase(it); + } else { + it->second.erase(thread); + ++it; + } + } +} + +void ThreadManager::RegisterSendAndCheckForCycles(Thread* source, + Thread* target) { + RTC_DCHECK(source); + RTC_DCHECK(target); + + CritScope cs(&crit_); + std::deque<Thread*> all_targets({target}); + // We check the pre-existing who-sends-to-who graph for any path from target + // to source. This loop is guaranteed to terminate because per the send graph + // invariant, there are no cycles in the graph. + for (size_t i = 0; i < all_targets.size(); i++) { + const auto& targets = send_graph_[all_targets[i]]; + all_targets.insert(all_targets.end(), targets.begin(), targets.end()); + } + RTC_CHECK_EQ(absl::c_count(all_targets, source), 0) + << " send loop between " << source->name() << " and " << target->name(); + + // We may now insert source -> target without creating a cycle, since there + // was no path from target to source per the prior CHECK. + send_graph_[source].insert(target); +} +#endif + +// static +void ThreadManager::Clear(MessageHandler* handler) { + return Instance()->ClearInternal(handler); +} +void ThreadManager::ClearInternal(MessageHandler* handler) { + // Deleted objects may cause re-entrant calls to ClearInternal. This is + // allowed as the list of message queues does not change while queues are + // cleared. + MarkProcessingCritScope cs(&crit_, &processing_); + for (Thread* queue : message_queues_) { + queue->Clear(handler); + } +} + +// static +void ThreadManager::ProcessAllMessageQueuesForTesting() { + return Instance()->ProcessAllMessageQueuesInternal(); +} + +void ThreadManager::ProcessAllMessageQueuesInternal() { + // This works by posting a delayed message at the current time and waiting + // for it to be dispatched on all queues, which will ensure that all messages + // that came before it were also dispatched. + std::atomic<int> queues_not_done(0); + + // This class is used so that whether the posted message is processed, or the + // message queue is simply cleared, queues_not_done gets decremented. + class ScopedIncrement : public MessageData { + public: + ScopedIncrement(std::atomic<int>* value) : value_(value) { + value_->fetch_add(1); + } + ~ScopedIncrement() override { value_->fetch_sub(1); } + + private: + std::atomic<int>* value_; + }; + + { + MarkProcessingCritScope cs(&crit_, &processing_); + for (Thread* queue : message_queues_) { + if (!queue->IsProcessingMessagesForTesting()) { + // If the queue is not processing messages, it can + // be ignored. If we tried to post a message to it, it would be dropped + // or ignored. + continue; + } + queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE, + new ScopedIncrement(&queues_not_done)); + } + } + + rtc::Thread* current = rtc::Thread::Current(); + // Note: One of the message queues may have been on this thread, which is + // why we can't synchronously wait for queues_not_done to go to 0; we need + // to process messages as well. + while (queues_not_done.load() > 0) { + if (current) { + current->ProcessMessages(0); + } + } +} + +// static +Thread* Thread::Current() { + ThreadManager* manager = ThreadManager::Instance(); + Thread* thread = manager->CurrentThread(); + + return thread; +} + +#if defined(WEBRTC_POSIX) +ThreadManager::ThreadManager() { +#if defined(WEBRTC_MAC) + InitCocoaMultiThreading(); +#endif + pthread_key_create(&key_, nullptr); +} + +Thread* ThreadManager::CurrentThread() { + return static_cast<Thread*>(pthread_getspecific(key_)); +} + +void ThreadManager::SetCurrentThreadInternal(Thread* thread) { + pthread_setspecific(key_, thread); +} +#endif + +#if defined(WEBRTC_WIN) +ThreadManager::ThreadManager() : key_(TlsAlloc()) {} + +Thread* ThreadManager::CurrentThread() { + return static_cast<Thread*>(TlsGetValue(key_)); +} + +void ThreadManager::SetCurrentThreadInternal(Thread* thread) { + TlsSetValue(key_, thread); +} +#endif + +void ThreadManager::SetCurrentThread(Thread* thread) { +#if RTC_DLOG_IS_ON + if (CurrentThread() && thread) { + RTC_DLOG(LS_ERROR) << "SetCurrentThread: Overwriting an existing value?"; + } +#endif // RTC_DLOG_IS_ON + + if (thread) { + thread->EnsureIsCurrentTaskQueue(); + } else { + Thread* current = CurrentThread(); + if (current) { + // The current thread is being cleared, e.g. as a result of + // UnwrapCurrent() being called or when a thread is being stopped + // (see PreRun()). This signals that the Thread instance is being detached + // from the thread, which also means that TaskQueue::Current() must not + // return a pointer to the Thread instance. + current->ClearCurrentTaskQueue(); + } + } + + SetCurrentThreadInternal(thread); +} + +void rtc::ThreadManager::ChangeCurrentThreadForTest(rtc::Thread* thread) { + SetCurrentThreadInternal(thread); +} + +Thread* ThreadManager::WrapCurrentThread() { + Thread* result = CurrentThread(); + if (nullptr == result) { + result = new Thread(CreateDefaultSocketServer()); + result->WrapCurrentWithThreadManager(this, true); + } + return result; +} + +void ThreadManager::UnwrapCurrentThread() { + Thread* t = CurrentThread(); + if (t && !(t->IsOwned())) { + t->UnwrapCurrent(); + delete t; + } +} + +Thread::ScopedDisallowBlockingCalls::ScopedDisallowBlockingCalls() + : thread_(Thread::Current()), + previous_state_(thread_->SetAllowBlockingCalls(false)) {} + +Thread::ScopedDisallowBlockingCalls::~ScopedDisallowBlockingCalls() { + RTC_DCHECK(thread_->IsCurrent()); + thread_->SetAllowBlockingCalls(previous_state_); +} + +#if RTC_DCHECK_IS_ON +Thread::ScopedCountBlockingCalls::ScopedCountBlockingCalls( + std::function<void(uint32_t, uint32_t)> callback) + : thread_(Thread::Current()), + base_blocking_call_count_(thread_->GetBlockingCallCount()), + base_could_be_blocking_call_count_( + thread_->GetCouldBeBlockingCallCount()), + result_callback_(std::move(callback)) {} + +Thread::ScopedCountBlockingCalls::~ScopedCountBlockingCalls() { + if (GetTotalBlockedCallCount() >= min_blocking_calls_for_callback_) { + result_callback_(GetBlockingCallCount(), GetCouldBeBlockingCallCount()); + } +} + +uint32_t Thread::ScopedCountBlockingCalls::GetBlockingCallCount() const { + return thread_->GetBlockingCallCount() - base_blocking_call_count_; +} + +uint32_t Thread::ScopedCountBlockingCalls::GetCouldBeBlockingCallCount() const { + return thread_->GetCouldBeBlockingCallCount() - + base_could_be_blocking_call_count_; +} + +uint32_t Thread::ScopedCountBlockingCalls::GetTotalBlockedCallCount() const { + return GetBlockingCallCount() + GetCouldBeBlockingCallCount(); +} +#endif + +Thread::Thread(SocketServer* ss) : Thread(ss, /*do_init=*/true) {} + +Thread::Thread(std::unique_ptr<SocketServer> ss) + : Thread(std::move(ss), /*do_init=*/true) {} + +Thread::Thread(SocketServer* ss, bool do_init) + : fPeekKeep_(false), + delayed_next_num_(0), + fInitialized_(false), + fDestroyed_(false), + stop_(0), + ss_(ss) { + RTC_DCHECK(ss); + ss_->SetMessageQueue(this); + SetName("Thread", this); // default name + if (do_init) { + DoInit(); + } +} + +Thread::Thread(std::unique_ptr<SocketServer> ss, bool do_init) + : Thread(ss.get(), do_init) { + own_ss_ = std::move(ss); +} + +Thread::~Thread() { + Stop(); + DoDestroy(); +} + +void Thread::DoInit() { + if (fInitialized_) { + return; + } + + fInitialized_ = true; + ThreadManager::Add(this); +} + +void Thread::DoDestroy() { + if (fDestroyed_) { + return; + } + + fDestroyed_ = true; + // The signal is done from here to ensure + // that it always gets called when the queue + // is going away. + if (ss_) { + ss_->SetMessageQueue(nullptr); + } + ThreadManager::Remove(this); + ClearInternal(nullptr, MQID_ANY, nullptr); +} + +SocketServer* Thread::socketserver() { + return ss_; +} + +void Thread::WakeUpSocketServer() { + ss_->WakeUp(); +} + +void Thread::Quit() { + stop_.store(1, std::memory_order_release); + WakeUpSocketServer(); +} + +bool Thread::IsQuitting() { + return stop_.load(std::memory_order_acquire) != 0; +} + +void Thread::Restart() { + stop_.store(0, std::memory_order_release); +} + +bool Thread::Peek(Message* pmsg, int cmsWait) { + if (fPeekKeep_) { + *pmsg = msgPeek_; + return true; + } + if (!Get(pmsg, cmsWait)) + return false; + msgPeek_ = *pmsg; + fPeekKeep_ = true; + return true; +} + +bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) { + // Return and clear peek if present + // Always return the peek if it exists so there is Peek/Get symmetry + + if (fPeekKeep_) { + *pmsg = msgPeek_; + fPeekKeep_ = false; + return true; + } + + // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch + + int64_t cmsTotal = cmsWait; + int64_t cmsElapsed = 0; + int64_t msStart = TimeMillis(); + int64_t msCurrent = msStart; + while (true) { + // Check for posted events + int64_t cmsDelayNext = kForever; + bool first_pass = true; + while (true) { + // All queue operations need to be locked, but nothing else in this loop + // (specifically handling disposed message) can happen inside the crit. + // Otherwise, disposed MessageHandlers will cause deadlocks. + { + CritScope cs(&crit_); + // On the first pass, check for delayed messages that have been + // triggered and calculate the next trigger time. + if (first_pass) { + first_pass = false; + while (!delayed_messages_.empty()) { + if (msCurrent < delayed_messages_.top().run_time_ms_) { + cmsDelayNext = + TimeDiff(delayed_messages_.top().run_time_ms_, msCurrent); + break; + } + messages_.push_back(delayed_messages_.top().msg_); + delayed_messages_.pop(); + } + } + // Pull a message off the message queue, if available. + if (messages_.empty()) { + break; + } else { + *pmsg = messages_.front(); + messages_.pop_front(); + } + } // crit_ is released here. + + // If this was a dispose message, delete it and skip it. + if (MQID_DISPOSE == pmsg->message_id) { + RTC_DCHECK(nullptr == pmsg->phandler); + delete pmsg->pdata; + *pmsg = Message(); + continue; + } + return true; + } + + if (IsQuitting()) + break; + + // Which is shorter, the delay wait or the asked wait? + + int64_t cmsNext; + if (cmsWait == kForever) { + cmsNext = cmsDelayNext; + } else { + cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); + if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) + cmsNext = cmsDelayNext; + } + + { + // Wait and multiplex in the meantime + if (!ss_->Wait(static_cast<int>(cmsNext), process_io)) + return false; + } + + // If the specified timeout expired, return + + msCurrent = TimeMillis(); + cmsElapsed = TimeDiff(msCurrent, msStart); + if (cmsWait != kForever) { + if (cmsElapsed >= cmsWait) + return false; + } + } + return false; +} + +void Thread::Post(const Location& posted_from, + MessageHandler* phandler, + uint32_t id, + MessageData* pdata, + bool time_sensitive) { + RTC_DCHECK(!time_sensitive); + if (IsQuitting()) { + delete pdata; + return; + } + + // Keep thread safe + // Add the message to the end of the queue + // Signal for the multiplexer to return + + { + CritScope cs(&crit_); + Message msg; + msg.posted_from = posted_from; + msg.phandler = phandler; + msg.message_id = id; + msg.pdata = pdata; + messages_.push_back(msg); + } + WakeUpSocketServer(); +} + +void Thread::PostDelayed(const Location& posted_from, + int delay_ms, + MessageHandler* phandler, + uint32_t id, + MessageData* pdata) { + return DoDelayPost(posted_from, delay_ms, TimeAfter(delay_ms), phandler, id, + pdata); +} + +void Thread::PostAt(const Location& posted_from, + int64_t run_at_ms, + MessageHandler* phandler, + uint32_t id, + MessageData* pdata) { + return DoDelayPost(posted_from, TimeUntil(run_at_ms), run_at_ms, phandler, id, + pdata); +} + +void Thread::DoDelayPost(const Location& posted_from, + int64_t delay_ms, + int64_t run_at_ms, + MessageHandler* phandler, + uint32_t id, + MessageData* pdata) { + if (IsQuitting()) { + delete pdata; + return; + } + + // Keep thread safe + // Add to the priority queue. Gets sorted soonest first. + // Signal for the multiplexer to return. + + { + CritScope cs(&crit_); + Message msg; + msg.posted_from = posted_from; + msg.phandler = phandler; + msg.message_id = id; + msg.pdata = pdata; + DelayedMessage delayed(delay_ms, run_at_ms, delayed_next_num_, msg); + delayed_messages_.push(delayed); + // If this message queue processes 1 message every millisecond for 50 days, + // we will wrap this number. Even then, only messages with identical times + // will be misordered, and then only briefly. This is probably ok. + ++delayed_next_num_; + RTC_DCHECK_NE(0, delayed_next_num_); + } + WakeUpSocketServer(); +} + +int Thread::GetDelay() { + CritScope cs(&crit_); + + if (!messages_.empty()) + return 0; + + if (!delayed_messages_.empty()) { + int delay = TimeUntil(delayed_messages_.top().run_time_ms_); + if (delay < 0) + delay = 0; + return delay; + } + + return kForever; +} + +void Thread::ClearInternal(MessageHandler* phandler, + uint32_t id, + MessageList* removed) { + // Remove messages with phandler + + if (fPeekKeep_ && msgPeek_.Match(phandler, id)) { + if (removed) { + removed->push_back(msgPeek_); + } else { + delete msgPeek_.pdata; + } + fPeekKeep_ = false; + } + + // Remove from ordered message queue + + for (auto it = messages_.begin(); it != messages_.end();) { + if (it->Match(phandler, id)) { + if (removed) { + removed->push_back(*it); + } else { + delete it->pdata; + } + it = messages_.erase(it); + } else { + ++it; + } + } + + // Remove from priority queue. Not directly iterable, so use this approach + + auto new_end = delayed_messages_.container().begin(); + for (auto it = new_end; it != delayed_messages_.container().end(); ++it) { + if (it->msg_.Match(phandler, id)) { + if (removed) { + removed->push_back(it->msg_); + } else { + delete it->msg_.pdata; + } + } else { + *new_end++ = *it; + } + } + delayed_messages_.container().erase(new_end, + delayed_messages_.container().end()); + delayed_messages_.reheap(); +} + +void Thread::Dispatch(Message* pmsg) { + TRACE_EVENT2("webrtc", "Thread::Dispatch", "src_file", + pmsg->posted_from.file_name(), "src_func", + pmsg->posted_from.function_name()); + RTC_DCHECK_RUN_ON(this); + int64_t start_time = TimeMillis(); + pmsg->phandler->OnMessage(pmsg); + int64_t end_time = TimeMillis(); + int64_t diff = TimeDiff(end_time, start_time); + if (diff >= dispatch_warning_ms_) { + RTC_LOG(LS_INFO) << "Message to " << name() << " took " << diff + << "ms to dispatch. Posted from: " + << pmsg->posted_from.ToString(); + // To avoid log spew, move the warning limit to only give warning + // for delays that are larger than the one observed. + dispatch_warning_ms_ = diff + 1; + } +} + +bool Thread::IsCurrent() const { + return ThreadManager::Instance()->CurrentThread() == this; +} + +std::unique_ptr<Thread> Thread::CreateWithSocketServer() { + return std::unique_ptr<Thread>(new Thread(CreateDefaultSocketServer())); +} + +std::unique_ptr<Thread> Thread::Create() { + return std::unique_ptr<Thread>( + new Thread(std::unique_ptr<SocketServer>(new NullSocketServer()))); +} + +bool Thread::SleepMs(int milliseconds) { + AssertBlockingIsAllowedOnCurrentThread(); + +#if defined(WEBRTC_WIN) + ::Sleep(milliseconds); + return true; +#else + // POSIX has both a usleep() and a nanosleep(), but the former is deprecated, + // so we use nanosleep() even though it has greater precision than necessary. + struct timespec ts; + ts.tv_sec = milliseconds / 1000; + ts.tv_nsec = (milliseconds % 1000) * 1000000; + int ret = nanosleep(&ts, nullptr); + if (ret != 0) { + RTC_LOG_ERR(LS_WARNING) << "nanosleep() returning early"; + return false; + } + return true; +#endif +} + +bool Thread::SetName(absl::string_view name, const void* obj) { + RTC_DCHECK(!IsRunning()); + + name_ = std::string(name); + if (obj) { + // The %p specifier typically produce at most 16 hex digits, possibly with a + // 0x prefix. But format is implementation defined, so add some margin. + char buf[30]; + snprintf(buf, sizeof(buf), " 0x%p", obj); + name_ += buf; + } + return true; +} + +void Thread::SetDispatchWarningMs(int deadline) { + if (!IsCurrent()) { + PostTask([this, deadline]() { SetDispatchWarningMs(deadline); }); + return; + } + RTC_DCHECK_RUN_ON(this); + dispatch_warning_ms_ = deadline; +} + +bool Thread::Start() { + RTC_DCHECK(!IsRunning()); + + if (IsRunning()) + return false; + + Restart(); // reset IsQuitting() if the thread is being restarted + + // Make sure that ThreadManager is created on the main thread before + // we start a new thread. + ThreadManager::Instance(); + + owned_ = true; + +#if defined(WEBRTC_WIN) + thread_ = CreateThread(nullptr, 0, PreRun, this, 0, &thread_id_); + if (!thread_) { + return false; + } +#elif defined(WEBRTC_POSIX) + pthread_attr_t attr; + pthread_attr_init(&attr); + + int error_code = pthread_create(&thread_, &attr, PreRun, this); + if (0 != error_code) { + RTC_LOG(LS_ERROR) << "Unable to create pthread, error " << error_code; + thread_ = 0; + return false; + } + RTC_DCHECK(thread_); +#endif + return true; +} + +bool Thread::WrapCurrent() { + return WrapCurrentWithThreadManager(ThreadManager::Instance(), true); +} + +void Thread::UnwrapCurrent() { + // Clears the platform-specific thread-specific storage. + ThreadManager::Instance()->SetCurrentThread(nullptr); +#if defined(WEBRTC_WIN) + if (thread_ != nullptr) { + if (!CloseHandle(thread_)) { + RTC_LOG_GLE(LS_ERROR) + << "When unwrapping thread, failed to close handle."; + } + thread_ = nullptr; + thread_id_ = 0; + } +#elif defined(WEBRTC_POSIX) + thread_ = 0; +#endif +} + +void Thread::SafeWrapCurrent() { + WrapCurrentWithThreadManager(ThreadManager::Instance(), false); +} + +void Thread::Join() { + if (!IsRunning()) + return; + + RTC_DCHECK(!IsCurrent()); + if (Current() && !Current()->blocking_calls_allowed_) { + RTC_LOG(LS_WARNING) << "Waiting for the thread to join, " + "but blocking calls have been disallowed"; + } + +#if defined(WEBRTC_WIN) + RTC_DCHECK(thread_ != nullptr); + WaitForSingleObject(thread_, INFINITE); + CloseHandle(thread_); + thread_ = nullptr; + thread_id_ = 0; +#elif defined(WEBRTC_POSIX) + pthread_join(thread_, nullptr); + thread_ = 0; +#endif +} + +bool Thread::SetAllowBlockingCalls(bool allow) { + RTC_DCHECK(IsCurrent()); + bool previous = blocking_calls_allowed_; + blocking_calls_allowed_ = allow; + return previous; +} + +// static +void Thread::AssertBlockingIsAllowedOnCurrentThread() { +#if !defined(NDEBUG) + Thread* current = Thread::Current(); + RTC_DCHECK(!current || current->blocking_calls_allowed_); +#endif +} + +// static +#if defined(WEBRTC_WIN) +DWORD WINAPI Thread::PreRun(LPVOID pv) { +#else +void* Thread::PreRun(void* pv) { +#endif + Thread* thread = static_cast<Thread*>(pv); + ThreadManager::Instance()->SetCurrentThread(thread); + rtc::SetCurrentThreadName(thread->name_.c_str()); +#if defined(WEBRTC_MAC) + ScopedAutoReleasePool pool; +#endif + thread->Run(); + + ThreadManager::Instance()->SetCurrentThread(nullptr); +#ifdef WEBRTC_WIN + return 0; +#else + return nullptr; +#endif +} // namespace rtc + +void Thread::Run() { + ProcessMessages(kForever); +} + +bool Thread::IsOwned() { + RTC_DCHECK(IsRunning()); + return owned_; +} + +void Thread::Stop() { + Thread::Quit(); + Join(); +} + +void Thread::Send(const Location& posted_from, + MessageHandler* phandler, + uint32_t id, + MessageData* pdata) { + RTC_DCHECK(!IsQuitting()); + if (IsQuitting()) + return; + + // Sent messages are sent to the MessageHandler directly, in the context + // of "thread", like Win32 SendMessage. If in the right context, + // call the handler directly. + Message msg; + msg.posted_from = posted_from; + msg.phandler = phandler; + msg.message_id = id; + msg.pdata = pdata; + if (IsCurrent()) { +#if RTC_DCHECK_IS_ON + RTC_DCHECK(this->IsInvokeToThreadAllowed(this)); + RTC_DCHECK_RUN_ON(this); + could_be_blocking_call_count_++; +#endif + msg.phandler->OnMessage(&msg); + return; + } + + AssertBlockingIsAllowedOnCurrentThread(); + + Thread* current_thread = Thread::Current(); + +#if RTC_DCHECK_IS_ON + if (current_thread) { + RTC_DCHECK_RUN_ON(current_thread); + current_thread->blocking_call_count_++; + RTC_DCHECK(current_thread->IsInvokeToThreadAllowed(this)); + ThreadManager::Instance()->RegisterSendAndCheckForCycles(current_thread, + this); + } +#endif + + // Perhaps down the line we can get rid of this workaround and always require + // current_thread to be valid when Send() is called. + std::unique_ptr<rtc::Event> done_event; + if (!current_thread) + done_event.reset(new rtc::Event()); + + bool ready = false; + absl::Cleanup cleanup = [this, &ready, current_thread, + done = done_event.get()] { + if (current_thread) { + CritScope cs(&crit_); + ready = true; + current_thread->socketserver()->WakeUp(); + } else { + done->Set(); + } + }; + PostTask([&msg, cleanup = std::move(cleanup)]() mutable { + msg.phandler->OnMessage(&msg); + }); + if (current_thread) { + bool waited = false; + crit_.Enter(); + while (!ready) { + crit_.Leave(); + current_thread->socketserver()->Wait(kForever, false); + waited = true; + crit_.Enter(); + } + crit_.Leave(); + + // Our Wait loop above may have consumed some WakeUp events for this + // Thread, that weren't relevant to this Send. Losing these WakeUps can + // cause problems for some SocketServers. + // + // Concrete example: + // Win32SocketServer on thread A calls Send on thread B. While processing + // the message, thread B Posts a message to A. We consume the wakeup for + // that Post while waiting for the Send to complete, which means that when + // we exit this loop, we need to issue another WakeUp, or else the Posted + // message won't be processed in a timely manner. + + if (waited) { + current_thread->socketserver()->WakeUp(); + } + } else { + done_event->Wait(rtc::Event::kForever); + } +} + +void Thread::InvokeInternal(const Location& posted_from, + rtc::FunctionView<void()> functor) { + TRACE_EVENT2("webrtc", "Thread::Invoke", "src_file", posted_from.file_name(), + "src_func", posted_from.function_name()); + + class FunctorMessageHandler : public MessageHandler { + public: + explicit FunctorMessageHandler(rtc::FunctionView<void()> functor) + : functor_(functor) {} + void OnMessage(Message* msg) override { functor_(); } + + private: + rtc::FunctionView<void()> functor_; + } handler(functor); + + Send(posted_from, &handler); +} + +// Called by the ThreadManager when being set as the current thread. +void Thread::EnsureIsCurrentTaskQueue() { + task_queue_registration_ = + std::make_unique<TaskQueueBase::CurrentTaskQueueSetter>(this); +} + +// Called by the ThreadManager when being set as the current thread. +void Thread::ClearCurrentTaskQueue() { + task_queue_registration_.reset(); +} + +void Thread::AllowInvokesToThread(Thread* thread) { +#if (!defined(NDEBUG) || RTC_DCHECK_IS_ON) + if (!IsCurrent()) { + PostTask([thread, this]() { AllowInvokesToThread(thread); }); + return; + } + RTC_DCHECK_RUN_ON(this); + allowed_threads_.push_back(thread); + invoke_policy_enabled_ = true; +#endif +} + +void Thread::DisallowAllInvokes() { +#if (!defined(NDEBUG) || RTC_DCHECK_IS_ON) + if (!IsCurrent()) { + PostTask([this]() { DisallowAllInvokes(); }); + return; + } + RTC_DCHECK_RUN_ON(this); + allowed_threads_.clear(); + invoke_policy_enabled_ = true; +#endif +} + +#if RTC_DCHECK_IS_ON +uint32_t Thread::GetBlockingCallCount() const { + RTC_DCHECK_RUN_ON(this); + return blocking_call_count_; +} +uint32_t Thread::GetCouldBeBlockingCallCount() const { + RTC_DCHECK_RUN_ON(this); + return could_be_blocking_call_count_; +} +#endif + +// Returns true if no policies added or if there is at least one policy +// that permits invocation to `target` thread. +bool Thread::IsInvokeToThreadAllowed(rtc::Thread* target) { +#if (!defined(NDEBUG) || RTC_DCHECK_IS_ON) + RTC_DCHECK_RUN_ON(this); + if (!invoke_policy_enabled_) { + return true; + } + for (const auto* thread : allowed_threads_) { + if (thread == target) { + return true; + } + } + return false; +#else + return true; +#endif +} + +void Thread::Delete() { + Stop(); + delete this; +} + +void Thread::PostTask(absl::AnyInvocable<void() &&> task) { + // Though Post takes MessageData by raw pointer (last parameter), it still + // takes it with ownership. + Post(RTC_FROM_HERE, GetAnyInvocableMessageHandler(), + /*id=*/0, new AnyInvocableMessage(std::move(task))); +} + +void Thread::PostDelayedTask(absl::AnyInvocable<void() &&> task, + webrtc::TimeDelta delay) { + // This implementation does not support low precision yet. + PostDelayedHighPrecisionTask(std::move(task), delay); +} + +void Thread::PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task, + webrtc::TimeDelta delay) { + int delay_ms = delay.RoundUpTo(webrtc::TimeDelta::Millis(1)).ms<int>(); + // Though PostDelayed takes MessageData by raw pointer (last parameter), + // it still takes it with ownership. + PostDelayed(RTC_FROM_HERE, delay_ms, GetAnyInvocableMessageHandler(), + /*id=*/0, new AnyInvocableMessage(std::move(task))); +} + +bool Thread::IsProcessingMessagesForTesting() { + return (owned_ || IsCurrent()) && !IsQuitting(); +} + +void Thread::Clear(MessageHandler* phandler, + uint32_t id, + MessageList* removed) { + CritScope cs(&crit_); + ClearInternal(phandler, id, removed); +} + +bool Thread::ProcessMessages(int cmsLoop) { + // Using ProcessMessages with a custom clock for testing and a time greater + // than 0 doesn't work, since it's not guaranteed to advance the custom + // clock's time, and may get stuck in an infinite loop. + RTC_DCHECK(GetClockForTesting() == nullptr || cmsLoop == 0 || + cmsLoop == kForever); + int64_t msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop); + int cmsNext = cmsLoop; + + while (true) { +#if defined(WEBRTC_MAC) + ScopedAutoReleasePool pool; +#endif + Message msg; + if (!Get(&msg, cmsNext)) + return !IsQuitting(); + Dispatch(&msg); + + if (cmsLoop != kForever) { + cmsNext = static_cast<int>(TimeUntil(msEnd)); + if (cmsNext < 0) + return true; + } + } +} + +bool Thread::WrapCurrentWithThreadManager(ThreadManager* thread_manager, + bool need_synchronize_access) { + RTC_DCHECK(!IsRunning()); + +#if defined(WEBRTC_WIN) + if (need_synchronize_access) { + // We explicitly ask for no rights other than synchronization. + // This gives us the best chance of succeeding. + thread_ = OpenThread(SYNCHRONIZE, FALSE, GetCurrentThreadId()); + if (!thread_) { + RTC_LOG_GLE(LS_ERROR) << "Unable to get handle to thread."; + return false; + } + thread_id_ = GetCurrentThreadId(); + } +#elif defined(WEBRTC_POSIX) + thread_ = pthread_self(); +#endif + owned_ = false; + thread_manager->SetCurrentThread(this); + return true; +} + +bool Thread::IsRunning() { +#if defined(WEBRTC_WIN) + return thread_ != nullptr; +#elif defined(WEBRTC_POSIX) + return thread_ != 0; +#endif +} + +AutoThread::AutoThread() + : Thread(CreateDefaultSocketServer(), /*do_init=*/false) { + if (!ThreadManager::Instance()->CurrentThread()) { + // DoInit registers with ThreadManager. Do that only if we intend to + // be rtc::Thread::Current(), otherwise ProcessAllMessageQueuesInternal will + // post a message to a queue that no running thread is serving. + DoInit(); + ThreadManager::Instance()->SetCurrentThread(this); + } +} + +AutoThread::~AutoThread() { + Stop(); + DoDestroy(); + if (ThreadManager::Instance()->CurrentThread() == this) { + ThreadManager::Instance()->SetCurrentThread(nullptr); + } +} + +AutoSocketServerThread::AutoSocketServerThread(SocketServer* ss) + : Thread(ss, /*do_init=*/false) { + DoInit(); + old_thread_ = ThreadManager::Instance()->CurrentThread(); + // Temporarily set the current thread to nullptr so that we can keep checks + // around that catch unintentional pointer overwrites. + rtc::ThreadManager::Instance()->SetCurrentThread(nullptr); + rtc::ThreadManager::Instance()->SetCurrentThread(this); + if (old_thread_) { + ThreadManager::Remove(old_thread_); + } +} + +AutoSocketServerThread::~AutoSocketServerThread() { + RTC_DCHECK(ThreadManager::Instance()->CurrentThread() == this); + // Stop and destroy the thread before clearing it as the current thread. + // Sometimes there are messages left in the Thread that will be + // destroyed by DoDestroy, and sometimes the destructors of the message and/or + // its contents rely on this thread still being set as the current thread. + Stop(); + DoDestroy(); + rtc::ThreadManager::Instance()->SetCurrentThread(nullptr); + rtc::ThreadManager::Instance()->SetCurrentThread(old_thread_); + if (old_thread_) { + ThreadManager::Add(old_thread_); + } +} + +} // namespace rtc |