summaryrefslogtreecommitdiffstats
path: root/third_party/libwebrtc/rtc_base/thread.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--third_party/libwebrtc/rtc_base/thread.cc1221
1 files changed, 1221 insertions, 0 deletions
diff --git a/third_party/libwebrtc/rtc_base/thread.cc b/third_party/libwebrtc/rtc_base/thread.cc
new file mode 100644
index 0000000000..56b6b43ef3
--- /dev/null
+++ b/third_party/libwebrtc/rtc_base/thread.cc
@@ -0,0 +1,1221 @@
+/*
+ * Copyright 2004 The WebRTC Project Authors. All rights reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "rtc_base/thread.h"
+
+#include "absl/strings/string_view.h"
+
+#if defined(WEBRTC_WIN)
+#include <comdef.h>
+#elif defined(WEBRTC_POSIX)
+#include <time.h>
+#else
+#error "Either WEBRTC_WIN or WEBRTC_POSIX needs to be defined."
+#endif
+
+#if defined(WEBRTC_WIN)
+// Disable warning that we don't care about:
+// warning C4722: destructor never returns, potential memory leak
+#pragma warning(disable : 4722)
+#endif
+
+#include <stdio.h>
+
+#include <utility>
+
+#include "absl/algorithm/container.h"
+#include "absl/cleanup/cleanup.h"
+#include "api/sequence_checker.h"
+#include "rtc_base/checks.h"
+#include "rtc_base/deprecated/recursive_critical_section.h"
+#include "rtc_base/event.h"
+#include "rtc_base/internal/default_socket_server.h"
+#include "rtc_base/logging.h"
+#include "rtc_base/null_socket_server.h"
+#include "rtc_base/time_utils.h"
+#include "rtc_base/trace_event.h"
+
+#if defined(WEBRTC_MAC)
+#include "rtc_base/system/cocoa_threading.h"
+
+/*
+ * These are forward-declarations for methods that are part of the
+ * ObjC runtime. They are declared in the private header objc-internal.h.
+ * These calls are what clang inserts when using @autoreleasepool in ObjC,
+ * but here they are used directly in order to keep this file C++.
+ * https://clang.llvm.org/docs/AutomaticReferenceCounting.html#runtime-support
+ */
+extern "C" {
+void* objc_autoreleasePoolPush(void);
+void objc_autoreleasePoolPop(void* pool);
+}
+
+namespace {
+class ScopedAutoReleasePool {
+ public:
+ ScopedAutoReleasePool() : pool_(objc_autoreleasePoolPush()) {}
+ ~ScopedAutoReleasePool() { objc_autoreleasePoolPop(pool_); }
+
+ private:
+ void* const pool_;
+};
+} // namespace
+#endif
+
+namespace rtc {
+namespace {
+
+struct AnyInvocableMessage final : public MessageData {
+ explicit AnyInvocableMessage(absl::AnyInvocable<void() &&> task)
+ : task(std::move(task)) {}
+ absl::AnyInvocable<void() &&> task;
+};
+
+class AnyInvocableMessageHandler final : public MessageHandler {
+ public:
+ void OnMessage(Message* msg) override {
+ std::move(static_cast<AnyInvocableMessage*>(msg->pdata)->task)();
+ delete msg->pdata;
+ }
+};
+
+MessageHandler* GetAnyInvocableMessageHandler() {
+ static MessageHandler* const handler = new AnyInvocableMessageHandler;
+ return handler;
+}
+
+class RTC_SCOPED_LOCKABLE MarkProcessingCritScope {
+ public:
+ MarkProcessingCritScope(const RecursiveCriticalSection* cs,
+ size_t* processing) RTC_EXCLUSIVE_LOCK_FUNCTION(cs)
+ : cs_(cs), processing_(processing) {
+ cs_->Enter();
+ *processing_ += 1;
+ }
+
+ ~MarkProcessingCritScope() RTC_UNLOCK_FUNCTION() {
+ *processing_ -= 1;
+ cs_->Leave();
+ }
+
+ MarkProcessingCritScope(const MarkProcessingCritScope&) = delete;
+ MarkProcessingCritScope& operator=(const MarkProcessingCritScope&) = delete;
+
+ private:
+ const RecursiveCriticalSection* const cs_;
+ size_t* processing_;
+};
+
+} // namespace
+
+ThreadManager* ThreadManager::Instance() {
+ static ThreadManager* const thread_manager = new ThreadManager();
+ return thread_manager;
+}
+
+ThreadManager::~ThreadManager() {
+ // By above RTC_DEFINE_STATIC_LOCAL.
+ RTC_DCHECK_NOTREACHED() << "ThreadManager should never be destructed.";
+}
+
+// static
+void ThreadManager::Add(Thread* message_queue) {
+ return Instance()->AddInternal(message_queue);
+}
+void ThreadManager::AddInternal(Thread* message_queue) {
+ CritScope cs(&crit_);
+ // Prevent changes while the list of message queues is processed.
+ RTC_DCHECK_EQ(processing_, 0);
+ message_queues_.push_back(message_queue);
+}
+
+// static
+void ThreadManager::Remove(Thread* message_queue) {
+ return Instance()->RemoveInternal(message_queue);
+}
+void ThreadManager::RemoveInternal(Thread* message_queue) {
+ {
+ CritScope cs(&crit_);
+ // Prevent changes while the list of message queues is processed.
+ RTC_DCHECK_EQ(processing_, 0);
+ std::vector<Thread*>::iterator iter;
+ iter = absl::c_find(message_queues_, message_queue);
+ if (iter != message_queues_.end()) {
+ message_queues_.erase(iter);
+ }
+#if RTC_DCHECK_IS_ON
+ RemoveFromSendGraph(message_queue);
+#endif
+ }
+}
+
+#if RTC_DCHECK_IS_ON
+void ThreadManager::RemoveFromSendGraph(Thread* thread) {
+ for (auto it = send_graph_.begin(); it != send_graph_.end();) {
+ if (it->first == thread) {
+ it = send_graph_.erase(it);
+ } else {
+ it->second.erase(thread);
+ ++it;
+ }
+ }
+}
+
+void ThreadManager::RegisterSendAndCheckForCycles(Thread* source,
+ Thread* target) {
+ RTC_DCHECK(source);
+ RTC_DCHECK(target);
+
+ CritScope cs(&crit_);
+ std::deque<Thread*> all_targets({target});
+ // We check the pre-existing who-sends-to-who graph for any path from target
+ // to source. This loop is guaranteed to terminate because per the send graph
+ // invariant, there are no cycles in the graph.
+ for (size_t i = 0; i < all_targets.size(); i++) {
+ const auto& targets = send_graph_[all_targets[i]];
+ all_targets.insert(all_targets.end(), targets.begin(), targets.end());
+ }
+ RTC_CHECK_EQ(absl::c_count(all_targets, source), 0)
+ << " send loop between " << source->name() << " and " << target->name();
+
+ // We may now insert source -> target without creating a cycle, since there
+ // was no path from target to source per the prior CHECK.
+ send_graph_[source].insert(target);
+}
+#endif
+
+// static
+void ThreadManager::Clear(MessageHandler* handler) {
+ return Instance()->ClearInternal(handler);
+}
+void ThreadManager::ClearInternal(MessageHandler* handler) {
+ // Deleted objects may cause re-entrant calls to ClearInternal. This is
+ // allowed as the list of message queues does not change while queues are
+ // cleared.
+ MarkProcessingCritScope cs(&crit_, &processing_);
+ for (Thread* queue : message_queues_) {
+ queue->Clear(handler);
+ }
+}
+
+// static
+void ThreadManager::ProcessAllMessageQueuesForTesting() {
+ return Instance()->ProcessAllMessageQueuesInternal();
+}
+
+void ThreadManager::ProcessAllMessageQueuesInternal() {
+ // This works by posting a delayed message at the current time and waiting
+ // for it to be dispatched on all queues, which will ensure that all messages
+ // that came before it were also dispatched.
+ std::atomic<int> queues_not_done(0);
+
+ // This class is used so that whether the posted message is processed, or the
+ // message queue is simply cleared, queues_not_done gets decremented.
+ class ScopedIncrement : public MessageData {
+ public:
+ ScopedIncrement(std::atomic<int>* value) : value_(value) {
+ value_->fetch_add(1);
+ }
+ ~ScopedIncrement() override { value_->fetch_sub(1); }
+
+ private:
+ std::atomic<int>* value_;
+ };
+
+ {
+ MarkProcessingCritScope cs(&crit_, &processing_);
+ for (Thread* queue : message_queues_) {
+ if (!queue->IsProcessingMessagesForTesting()) {
+ // If the queue is not processing messages, it can
+ // be ignored. If we tried to post a message to it, it would be dropped
+ // or ignored.
+ continue;
+ }
+ queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE,
+ new ScopedIncrement(&queues_not_done));
+ }
+ }
+
+ rtc::Thread* current = rtc::Thread::Current();
+ // Note: One of the message queues may have been on this thread, which is
+ // why we can't synchronously wait for queues_not_done to go to 0; we need
+ // to process messages as well.
+ while (queues_not_done.load() > 0) {
+ if (current) {
+ current->ProcessMessages(0);
+ }
+ }
+}
+
+// static
+Thread* Thread::Current() {
+ ThreadManager* manager = ThreadManager::Instance();
+ Thread* thread = manager->CurrentThread();
+
+ return thread;
+}
+
+#if defined(WEBRTC_POSIX)
+ThreadManager::ThreadManager() {
+#if defined(WEBRTC_MAC)
+ InitCocoaMultiThreading();
+#endif
+ pthread_key_create(&key_, nullptr);
+}
+
+Thread* ThreadManager::CurrentThread() {
+ return static_cast<Thread*>(pthread_getspecific(key_));
+}
+
+void ThreadManager::SetCurrentThreadInternal(Thread* thread) {
+ pthread_setspecific(key_, thread);
+}
+#endif
+
+#if defined(WEBRTC_WIN)
+ThreadManager::ThreadManager() : key_(TlsAlloc()) {}
+
+Thread* ThreadManager::CurrentThread() {
+ return static_cast<Thread*>(TlsGetValue(key_));
+}
+
+void ThreadManager::SetCurrentThreadInternal(Thread* thread) {
+ TlsSetValue(key_, thread);
+}
+#endif
+
+void ThreadManager::SetCurrentThread(Thread* thread) {
+#if RTC_DLOG_IS_ON
+ if (CurrentThread() && thread) {
+ RTC_DLOG(LS_ERROR) << "SetCurrentThread: Overwriting an existing value?";
+ }
+#endif // RTC_DLOG_IS_ON
+
+ if (thread) {
+ thread->EnsureIsCurrentTaskQueue();
+ } else {
+ Thread* current = CurrentThread();
+ if (current) {
+ // The current thread is being cleared, e.g. as a result of
+ // UnwrapCurrent() being called or when a thread is being stopped
+ // (see PreRun()). This signals that the Thread instance is being detached
+ // from the thread, which also means that TaskQueue::Current() must not
+ // return a pointer to the Thread instance.
+ current->ClearCurrentTaskQueue();
+ }
+ }
+
+ SetCurrentThreadInternal(thread);
+}
+
+void rtc::ThreadManager::ChangeCurrentThreadForTest(rtc::Thread* thread) {
+ SetCurrentThreadInternal(thread);
+}
+
+Thread* ThreadManager::WrapCurrentThread() {
+ Thread* result = CurrentThread();
+ if (nullptr == result) {
+ result = new Thread(CreateDefaultSocketServer());
+ result->WrapCurrentWithThreadManager(this, true);
+ }
+ return result;
+}
+
+void ThreadManager::UnwrapCurrentThread() {
+ Thread* t = CurrentThread();
+ if (t && !(t->IsOwned())) {
+ t->UnwrapCurrent();
+ delete t;
+ }
+}
+
+Thread::ScopedDisallowBlockingCalls::ScopedDisallowBlockingCalls()
+ : thread_(Thread::Current()),
+ previous_state_(thread_->SetAllowBlockingCalls(false)) {}
+
+Thread::ScopedDisallowBlockingCalls::~ScopedDisallowBlockingCalls() {
+ RTC_DCHECK(thread_->IsCurrent());
+ thread_->SetAllowBlockingCalls(previous_state_);
+}
+
+#if RTC_DCHECK_IS_ON
+Thread::ScopedCountBlockingCalls::ScopedCountBlockingCalls(
+ std::function<void(uint32_t, uint32_t)> callback)
+ : thread_(Thread::Current()),
+ base_blocking_call_count_(thread_->GetBlockingCallCount()),
+ base_could_be_blocking_call_count_(
+ thread_->GetCouldBeBlockingCallCount()),
+ result_callback_(std::move(callback)) {}
+
+Thread::ScopedCountBlockingCalls::~ScopedCountBlockingCalls() {
+ if (GetTotalBlockedCallCount() >= min_blocking_calls_for_callback_) {
+ result_callback_(GetBlockingCallCount(), GetCouldBeBlockingCallCount());
+ }
+}
+
+uint32_t Thread::ScopedCountBlockingCalls::GetBlockingCallCount() const {
+ return thread_->GetBlockingCallCount() - base_blocking_call_count_;
+}
+
+uint32_t Thread::ScopedCountBlockingCalls::GetCouldBeBlockingCallCount() const {
+ return thread_->GetCouldBeBlockingCallCount() -
+ base_could_be_blocking_call_count_;
+}
+
+uint32_t Thread::ScopedCountBlockingCalls::GetTotalBlockedCallCount() const {
+ return GetBlockingCallCount() + GetCouldBeBlockingCallCount();
+}
+#endif
+
+Thread::Thread(SocketServer* ss) : Thread(ss, /*do_init=*/true) {}
+
+Thread::Thread(std::unique_ptr<SocketServer> ss)
+ : Thread(std::move(ss), /*do_init=*/true) {}
+
+Thread::Thread(SocketServer* ss, bool do_init)
+ : fPeekKeep_(false),
+ delayed_next_num_(0),
+ fInitialized_(false),
+ fDestroyed_(false),
+ stop_(0),
+ ss_(ss) {
+ RTC_DCHECK(ss);
+ ss_->SetMessageQueue(this);
+ SetName("Thread", this); // default name
+ if (do_init) {
+ DoInit();
+ }
+}
+
+Thread::Thread(std::unique_ptr<SocketServer> ss, bool do_init)
+ : Thread(ss.get(), do_init) {
+ own_ss_ = std::move(ss);
+}
+
+Thread::~Thread() {
+ Stop();
+ DoDestroy();
+}
+
+void Thread::DoInit() {
+ if (fInitialized_) {
+ return;
+ }
+
+ fInitialized_ = true;
+ ThreadManager::Add(this);
+}
+
+void Thread::DoDestroy() {
+ if (fDestroyed_) {
+ return;
+ }
+
+ fDestroyed_ = true;
+ // The signal is done from here to ensure
+ // that it always gets called when the queue
+ // is going away.
+ if (ss_) {
+ ss_->SetMessageQueue(nullptr);
+ }
+ ThreadManager::Remove(this);
+ ClearInternal(nullptr, MQID_ANY, nullptr);
+}
+
+SocketServer* Thread::socketserver() {
+ return ss_;
+}
+
+void Thread::WakeUpSocketServer() {
+ ss_->WakeUp();
+}
+
+void Thread::Quit() {
+ stop_.store(1, std::memory_order_release);
+ WakeUpSocketServer();
+}
+
+bool Thread::IsQuitting() {
+ return stop_.load(std::memory_order_acquire) != 0;
+}
+
+void Thread::Restart() {
+ stop_.store(0, std::memory_order_release);
+}
+
+bool Thread::Peek(Message* pmsg, int cmsWait) {
+ if (fPeekKeep_) {
+ *pmsg = msgPeek_;
+ return true;
+ }
+ if (!Get(pmsg, cmsWait))
+ return false;
+ msgPeek_ = *pmsg;
+ fPeekKeep_ = true;
+ return true;
+}
+
+bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) {
+ // Return and clear peek if present
+ // Always return the peek if it exists so there is Peek/Get symmetry
+
+ if (fPeekKeep_) {
+ *pmsg = msgPeek_;
+ fPeekKeep_ = false;
+ return true;
+ }
+
+ // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
+
+ int64_t cmsTotal = cmsWait;
+ int64_t cmsElapsed = 0;
+ int64_t msStart = TimeMillis();
+ int64_t msCurrent = msStart;
+ while (true) {
+ // Check for posted events
+ int64_t cmsDelayNext = kForever;
+ bool first_pass = true;
+ while (true) {
+ // All queue operations need to be locked, but nothing else in this loop
+ // (specifically handling disposed message) can happen inside the crit.
+ // Otherwise, disposed MessageHandlers will cause deadlocks.
+ {
+ CritScope cs(&crit_);
+ // On the first pass, check for delayed messages that have been
+ // triggered and calculate the next trigger time.
+ if (first_pass) {
+ first_pass = false;
+ while (!delayed_messages_.empty()) {
+ if (msCurrent < delayed_messages_.top().run_time_ms_) {
+ cmsDelayNext =
+ TimeDiff(delayed_messages_.top().run_time_ms_, msCurrent);
+ break;
+ }
+ messages_.push_back(delayed_messages_.top().msg_);
+ delayed_messages_.pop();
+ }
+ }
+ // Pull a message off the message queue, if available.
+ if (messages_.empty()) {
+ break;
+ } else {
+ *pmsg = messages_.front();
+ messages_.pop_front();
+ }
+ } // crit_ is released here.
+
+ // If this was a dispose message, delete it and skip it.
+ if (MQID_DISPOSE == pmsg->message_id) {
+ RTC_DCHECK(nullptr == pmsg->phandler);
+ delete pmsg->pdata;
+ *pmsg = Message();
+ continue;
+ }
+ return true;
+ }
+
+ if (IsQuitting())
+ break;
+
+ // Which is shorter, the delay wait or the asked wait?
+
+ int64_t cmsNext;
+ if (cmsWait == kForever) {
+ cmsNext = cmsDelayNext;
+ } else {
+ cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
+ if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
+ cmsNext = cmsDelayNext;
+ }
+
+ {
+ // Wait and multiplex in the meantime
+ if (!ss_->Wait(static_cast<int>(cmsNext), process_io))
+ return false;
+ }
+
+ // If the specified timeout expired, return
+
+ msCurrent = TimeMillis();
+ cmsElapsed = TimeDiff(msCurrent, msStart);
+ if (cmsWait != kForever) {
+ if (cmsElapsed >= cmsWait)
+ return false;
+ }
+ }
+ return false;
+}
+
+void Thread::Post(const Location& posted_from,
+ MessageHandler* phandler,
+ uint32_t id,
+ MessageData* pdata,
+ bool time_sensitive) {
+ RTC_DCHECK(!time_sensitive);
+ if (IsQuitting()) {
+ delete pdata;
+ return;
+ }
+
+ // Keep thread safe
+ // Add the message to the end of the queue
+ // Signal for the multiplexer to return
+
+ {
+ CritScope cs(&crit_);
+ Message msg;
+ msg.posted_from = posted_from;
+ msg.phandler = phandler;
+ msg.message_id = id;
+ msg.pdata = pdata;
+ messages_.push_back(msg);
+ }
+ WakeUpSocketServer();
+}
+
+void Thread::PostDelayed(const Location& posted_from,
+ int delay_ms,
+ MessageHandler* phandler,
+ uint32_t id,
+ MessageData* pdata) {
+ return DoDelayPost(posted_from, delay_ms, TimeAfter(delay_ms), phandler, id,
+ pdata);
+}
+
+void Thread::PostAt(const Location& posted_from,
+ int64_t run_at_ms,
+ MessageHandler* phandler,
+ uint32_t id,
+ MessageData* pdata) {
+ return DoDelayPost(posted_from, TimeUntil(run_at_ms), run_at_ms, phandler, id,
+ pdata);
+}
+
+void Thread::DoDelayPost(const Location& posted_from,
+ int64_t delay_ms,
+ int64_t run_at_ms,
+ MessageHandler* phandler,
+ uint32_t id,
+ MessageData* pdata) {
+ if (IsQuitting()) {
+ delete pdata;
+ return;
+ }
+
+ // Keep thread safe
+ // Add to the priority queue. Gets sorted soonest first.
+ // Signal for the multiplexer to return.
+
+ {
+ CritScope cs(&crit_);
+ Message msg;
+ msg.posted_from = posted_from;
+ msg.phandler = phandler;
+ msg.message_id = id;
+ msg.pdata = pdata;
+ DelayedMessage delayed(delay_ms, run_at_ms, delayed_next_num_, msg);
+ delayed_messages_.push(delayed);
+ // If this message queue processes 1 message every millisecond for 50 days,
+ // we will wrap this number. Even then, only messages with identical times
+ // will be misordered, and then only briefly. This is probably ok.
+ ++delayed_next_num_;
+ RTC_DCHECK_NE(0, delayed_next_num_);
+ }
+ WakeUpSocketServer();
+}
+
+int Thread::GetDelay() {
+ CritScope cs(&crit_);
+
+ if (!messages_.empty())
+ return 0;
+
+ if (!delayed_messages_.empty()) {
+ int delay = TimeUntil(delayed_messages_.top().run_time_ms_);
+ if (delay < 0)
+ delay = 0;
+ return delay;
+ }
+
+ return kForever;
+}
+
+void Thread::ClearInternal(MessageHandler* phandler,
+ uint32_t id,
+ MessageList* removed) {
+ // Remove messages with phandler
+
+ if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
+ if (removed) {
+ removed->push_back(msgPeek_);
+ } else {
+ delete msgPeek_.pdata;
+ }
+ fPeekKeep_ = false;
+ }
+
+ // Remove from ordered message queue
+
+ for (auto it = messages_.begin(); it != messages_.end();) {
+ if (it->Match(phandler, id)) {
+ if (removed) {
+ removed->push_back(*it);
+ } else {
+ delete it->pdata;
+ }
+ it = messages_.erase(it);
+ } else {
+ ++it;
+ }
+ }
+
+ // Remove from priority queue. Not directly iterable, so use this approach
+
+ auto new_end = delayed_messages_.container().begin();
+ for (auto it = new_end; it != delayed_messages_.container().end(); ++it) {
+ if (it->msg_.Match(phandler, id)) {
+ if (removed) {
+ removed->push_back(it->msg_);
+ } else {
+ delete it->msg_.pdata;
+ }
+ } else {
+ *new_end++ = *it;
+ }
+ }
+ delayed_messages_.container().erase(new_end,
+ delayed_messages_.container().end());
+ delayed_messages_.reheap();
+}
+
+void Thread::Dispatch(Message* pmsg) {
+ TRACE_EVENT2("webrtc", "Thread::Dispatch", "src_file",
+ pmsg->posted_from.file_name(), "src_func",
+ pmsg->posted_from.function_name());
+ RTC_DCHECK_RUN_ON(this);
+ int64_t start_time = TimeMillis();
+ pmsg->phandler->OnMessage(pmsg);
+ int64_t end_time = TimeMillis();
+ int64_t diff = TimeDiff(end_time, start_time);
+ if (diff >= dispatch_warning_ms_) {
+ RTC_LOG(LS_INFO) << "Message to " << name() << " took " << diff
+ << "ms to dispatch. Posted from: "
+ << pmsg->posted_from.ToString();
+ // To avoid log spew, move the warning limit to only give warning
+ // for delays that are larger than the one observed.
+ dispatch_warning_ms_ = diff + 1;
+ }
+}
+
+bool Thread::IsCurrent() const {
+ return ThreadManager::Instance()->CurrentThread() == this;
+}
+
+std::unique_ptr<Thread> Thread::CreateWithSocketServer() {
+ return std::unique_ptr<Thread>(new Thread(CreateDefaultSocketServer()));
+}
+
+std::unique_ptr<Thread> Thread::Create() {
+ return std::unique_ptr<Thread>(
+ new Thread(std::unique_ptr<SocketServer>(new NullSocketServer())));
+}
+
+bool Thread::SleepMs(int milliseconds) {
+ AssertBlockingIsAllowedOnCurrentThread();
+
+#if defined(WEBRTC_WIN)
+ ::Sleep(milliseconds);
+ return true;
+#else
+ // POSIX has both a usleep() and a nanosleep(), but the former is deprecated,
+ // so we use nanosleep() even though it has greater precision than necessary.
+ struct timespec ts;
+ ts.tv_sec = milliseconds / 1000;
+ ts.tv_nsec = (milliseconds % 1000) * 1000000;
+ int ret = nanosleep(&ts, nullptr);
+ if (ret != 0) {
+ RTC_LOG_ERR(LS_WARNING) << "nanosleep() returning early";
+ return false;
+ }
+ return true;
+#endif
+}
+
+bool Thread::SetName(absl::string_view name, const void* obj) {
+ RTC_DCHECK(!IsRunning());
+
+ name_ = std::string(name);
+ if (obj) {
+ // The %p specifier typically produce at most 16 hex digits, possibly with a
+ // 0x prefix. But format is implementation defined, so add some margin.
+ char buf[30];
+ snprintf(buf, sizeof(buf), " 0x%p", obj);
+ name_ += buf;
+ }
+ return true;
+}
+
+void Thread::SetDispatchWarningMs(int deadline) {
+ if (!IsCurrent()) {
+ PostTask([this, deadline]() { SetDispatchWarningMs(deadline); });
+ return;
+ }
+ RTC_DCHECK_RUN_ON(this);
+ dispatch_warning_ms_ = deadline;
+}
+
+bool Thread::Start() {
+ RTC_DCHECK(!IsRunning());
+
+ if (IsRunning())
+ return false;
+
+ Restart(); // reset IsQuitting() if the thread is being restarted
+
+ // Make sure that ThreadManager is created on the main thread before
+ // we start a new thread.
+ ThreadManager::Instance();
+
+ owned_ = true;
+
+#if defined(WEBRTC_WIN)
+ thread_ = CreateThread(nullptr, 0, PreRun, this, 0, &thread_id_);
+ if (!thread_) {
+ return false;
+ }
+#elif defined(WEBRTC_POSIX)
+ pthread_attr_t attr;
+ pthread_attr_init(&attr);
+
+ int error_code = pthread_create(&thread_, &attr, PreRun, this);
+ if (0 != error_code) {
+ RTC_LOG(LS_ERROR) << "Unable to create pthread, error " << error_code;
+ thread_ = 0;
+ return false;
+ }
+ RTC_DCHECK(thread_);
+#endif
+ return true;
+}
+
+bool Thread::WrapCurrent() {
+ return WrapCurrentWithThreadManager(ThreadManager::Instance(), true);
+}
+
+void Thread::UnwrapCurrent() {
+ // Clears the platform-specific thread-specific storage.
+ ThreadManager::Instance()->SetCurrentThread(nullptr);
+#if defined(WEBRTC_WIN)
+ if (thread_ != nullptr) {
+ if (!CloseHandle(thread_)) {
+ RTC_LOG_GLE(LS_ERROR)
+ << "When unwrapping thread, failed to close handle.";
+ }
+ thread_ = nullptr;
+ thread_id_ = 0;
+ }
+#elif defined(WEBRTC_POSIX)
+ thread_ = 0;
+#endif
+}
+
+void Thread::SafeWrapCurrent() {
+ WrapCurrentWithThreadManager(ThreadManager::Instance(), false);
+}
+
+void Thread::Join() {
+ if (!IsRunning())
+ return;
+
+ RTC_DCHECK(!IsCurrent());
+ if (Current() && !Current()->blocking_calls_allowed_) {
+ RTC_LOG(LS_WARNING) << "Waiting for the thread to join, "
+ "but blocking calls have been disallowed";
+ }
+
+#if defined(WEBRTC_WIN)
+ RTC_DCHECK(thread_ != nullptr);
+ WaitForSingleObject(thread_, INFINITE);
+ CloseHandle(thread_);
+ thread_ = nullptr;
+ thread_id_ = 0;
+#elif defined(WEBRTC_POSIX)
+ pthread_join(thread_, nullptr);
+ thread_ = 0;
+#endif
+}
+
+bool Thread::SetAllowBlockingCalls(bool allow) {
+ RTC_DCHECK(IsCurrent());
+ bool previous = blocking_calls_allowed_;
+ blocking_calls_allowed_ = allow;
+ return previous;
+}
+
+// static
+void Thread::AssertBlockingIsAllowedOnCurrentThread() {
+#if !defined(NDEBUG)
+ Thread* current = Thread::Current();
+ RTC_DCHECK(!current || current->blocking_calls_allowed_);
+#endif
+}
+
+// static
+#if defined(WEBRTC_WIN)
+DWORD WINAPI Thread::PreRun(LPVOID pv) {
+#else
+void* Thread::PreRun(void* pv) {
+#endif
+ Thread* thread = static_cast<Thread*>(pv);
+ ThreadManager::Instance()->SetCurrentThread(thread);
+ rtc::SetCurrentThreadName(thread->name_.c_str());
+#if defined(WEBRTC_MAC)
+ ScopedAutoReleasePool pool;
+#endif
+ thread->Run();
+
+ ThreadManager::Instance()->SetCurrentThread(nullptr);
+#ifdef WEBRTC_WIN
+ return 0;
+#else
+ return nullptr;
+#endif
+} // namespace rtc
+
+void Thread::Run() {
+ ProcessMessages(kForever);
+}
+
+bool Thread::IsOwned() {
+ RTC_DCHECK(IsRunning());
+ return owned_;
+}
+
+void Thread::Stop() {
+ Thread::Quit();
+ Join();
+}
+
+void Thread::Send(const Location& posted_from,
+ MessageHandler* phandler,
+ uint32_t id,
+ MessageData* pdata) {
+ RTC_DCHECK(!IsQuitting());
+ if (IsQuitting())
+ return;
+
+ // Sent messages are sent to the MessageHandler directly, in the context
+ // of "thread", like Win32 SendMessage. If in the right context,
+ // call the handler directly.
+ Message msg;
+ msg.posted_from = posted_from;
+ msg.phandler = phandler;
+ msg.message_id = id;
+ msg.pdata = pdata;
+ if (IsCurrent()) {
+#if RTC_DCHECK_IS_ON
+ RTC_DCHECK(this->IsInvokeToThreadAllowed(this));
+ RTC_DCHECK_RUN_ON(this);
+ could_be_blocking_call_count_++;
+#endif
+ msg.phandler->OnMessage(&msg);
+ return;
+ }
+
+ AssertBlockingIsAllowedOnCurrentThread();
+
+ Thread* current_thread = Thread::Current();
+
+#if RTC_DCHECK_IS_ON
+ if (current_thread) {
+ RTC_DCHECK_RUN_ON(current_thread);
+ current_thread->blocking_call_count_++;
+ RTC_DCHECK(current_thread->IsInvokeToThreadAllowed(this));
+ ThreadManager::Instance()->RegisterSendAndCheckForCycles(current_thread,
+ this);
+ }
+#endif
+
+ // Perhaps down the line we can get rid of this workaround and always require
+ // current_thread to be valid when Send() is called.
+ std::unique_ptr<rtc::Event> done_event;
+ if (!current_thread)
+ done_event.reset(new rtc::Event());
+
+ bool ready = false;
+ absl::Cleanup cleanup = [this, &ready, current_thread,
+ done = done_event.get()] {
+ if (current_thread) {
+ CritScope cs(&crit_);
+ ready = true;
+ current_thread->socketserver()->WakeUp();
+ } else {
+ done->Set();
+ }
+ };
+ PostTask([&msg, cleanup = std::move(cleanup)]() mutable {
+ msg.phandler->OnMessage(&msg);
+ });
+ if (current_thread) {
+ bool waited = false;
+ crit_.Enter();
+ while (!ready) {
+ crit_.Leave();
+ current_thread->socketserver()->Wait(kForever, false);
+ waited = true;
+ crit_.Enter();
+ }
+ crit_.Leave();
+
+ // Our Wait loop above may have consumed some WakeUp events for this
+ // Thread, that weren't relevant to this Send. Losing these WakeUps can
+ // cause problems for some SocketServers.
+ //
+ // Concrete example:
+ // Win32SocketServer on thread A calls Send on thread B. While processing
+ // the message, thread B Posts a message to A. We consume the wakeup for
+ // that Post while waiting for the Send to complete, which means that when
+ // we exit this loop, we need to issue another WakeUp, or else the Posted
+ // message won't be processed in a timely manner.
+
+ if (waited) {
+ current_thread->socketserver()->WakeUp();
+ }
+ } else {
+ done_event->Wait(rtc::Event::kForever);
+ }
+}
+
+void Thread::InvokeInternal(const Location& posted_from,
+ rtc::FunctionView<void()> functor) {
+ TRACE_EVENT2("webrtc", "Thread::Invoke", "src_file", posted_from.file_name(),
+ "src_func", posted_from.function_name());
+
+ class FunctorMessageHandler : public MessageHandler {
+ public:
+ explicit FunctorMessageHandler(rtc::FunctionView<void()> functor)
+ : functor_(functor) {}
+ void OnMessage(Message* msg) override { functor_(); }
+
+ private:
+ rtc::FunctionView<void()> functor_;
+ } handler(functor);
+
+ Send(posted_from, &handler);
+}
+
+// Called by the ThreadManager when being set as the current thread.
+void Thread::EnsureIsCurrentTaskQueue() {
+ task_queue_registration_ =
+ std::make_unique<TaskQueueBase::CurrentTaskQueueSetter>(this);
+}
+
+// Called by the ThreadManager when being set as the current thread.
+void Thread::ClearCurrentTaskQueue() {
+ task_queue_registration_.reset();
+}
+
+void Thread::AllowInvokesToThread(Thread* thread) {
+#if (!defined(NDEBUG) || RTC_DCHECK_IS_ON)
+ if (!IsCurrent()) {
+ PostTask([thread, this]() { AllowInvokesToThread(thread); });
+ return;
+ }
+ RTC_DCHECK_RUN_ON(this);
+ allowed_threads_.push_back(thread);
+ invoke_policy_enabled_ = true;
+#endif
+}
+
+void Thread::DisallowAllInvokes() {
+#if (!defined(NDEBUG) || RTC_DCHECK_IS_ON)
+ if (!IsCurrent()) {
+ PostTask([this]() { DisallowAllInvokes(); });
+ return;
+ }
+ RTC_DCHECK_RUN_ON(this);
+ allowed_threads_.clear();
+ invoke_policy_enabled_ = true;
+#endif
+}
+
+#if RTC_DCHECK_IS_ON
+uint32_t Thread::GetBlockingCallCount() const {
+ RTC_DCHECK_RUN_ON(this);
+ return blocking_call_count_;
+}
+uint32_t Thread::GetCouldBeBlockingCallCount() const {
+ RTC_DCHECK_RUN_ON(this);
+ return could_be_blocking_call_count_;
+}
+#endif
+
+// Returns true if no policies added or if there is at least one policy
+// that permits invocation to `target` thread.
+bool Thread::IsInvokeToThreadAllowed(rtc::Thread* target) {
+#if (!defined(NDEBUG) || RTC_DCHECK_IS_ON)
+ RTC_DCHECK_RUN_ON(this);
+ if (!invoke_policy_enabled_) {
+ return true;
+ }
+ for (const auto* thread : allowed_threads_) {
+ if (thread == target) {
+ return true;
+ }
+ }
+ return false;
+#else
+ return true;
+#endif
+}
+
+void Thread::Delete() {
+ Stop();
+ delete this;
+}
+
+void Thread::PostTask(absl::AnyInvocable<void() &&> task) {
+ // Though Post takes MessageData by raw pointer (last parameter), it still
+ // takes it with ownership.
+ Post(RTC_FROM_HERE, GetAnyInvocableMessageHandler(),
+ /*id=*/0, new AnyInvocableMessage(std::move(task)));
+}
+
+void Thread::PostDelayedTask(absl::AnyInvocable<void() &&> task,
+ webrtc::TimeDelta delay) {
+ // This implementation does not support low precision yet.
+ PostDelayedHighPrecisionTask(std::move(task), delay);
+}
+
+void Thread::PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
+ webrtc::TimeDelta delay) {
+ int delay_ms = delay.RoundUpTo(webrtc::TimeDelta::Millis(1)).ms<int>();
+ // Though PostDelayed takes MessageData by raw pointer (last parameter),
+ // it still takes it with ownership.
+ PostDelayed(RTC_FROM_HERE, delay_ms, GetAnyInvocableMessageHandler(),
+ /*id=*/0, new AnyInvocableMessage(std::move(task)));
+}
+
+bool Thread::IsProcessingMessagesForTesting() {
+ return (owned_ || IsCurrent()) && !IsQuitting();
+}
+
+void Thread::Clear(MessageHandler* phandler,
+ uint32_t id,
+ MessageList* removed) {
+ CritScope cs(&crit_);
+ ClearInternal(phandler, id, removed);
+}
+
+bool Thread::ProcessMessages(int cmsLoop) {
+ // Using ProcessMessages with a custom clock for testing and a time greater
+ // than 0 doesn't work, since it's not guaranteed to advance the custom
+ // clock's time, and may get stuck in an infinite loop.
+ RTC_DCHECK(GetClockForTesting() == nullptr || cmsLoop == 0 ||
+ cmsLoop == kForever);
+ int64_t msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop);
+ int cmsNext = cmsLoop;
+
+ while (true) {
+#if defined(WEBRTC_MAC)
+ ScopedAutoReleasePool pool;
+#endif
+ Message msg;
+ if (!Get(&msg, cmsNext))
+ return !IsQuitting();
+ Dispatch(&msg);
+
+ if (cmsLoop != kForever) {
+ cmsNext = static_cast<int>(TimeUntil(msEnd));
+ if (cmsNext < 0)
+ return true;
+ }
+ }
+}
+
+bool Thread::WrapCurrentWithThreadManager(ThreadManager* thread_manager,
+ bool need_synchronize_access) {
+ RTC_DCHECK(!IsRunning());
+
+#if defined(WEBRTC_WIN)
+ if (need_synchronize_access) {
+ // We explicitly ask for no rights other than synchronization.
+ // This gives us the best chance of succeeding.
+ thread_ = OpenThread(SYNCHRONIZE, FALSE, GetCurrentThreadId());
+ if (!thread_) {
+ RTC_LOG_GLE(LS_ERROR) << "Unable to get handle to thread.";
+ return false;
+ }
+ thread_id_ = GetCurrentThreadId();
+ }
+#elif defined(WEBRTC_POSIX)
+ thread_ = pthread_self();
+#endif
+ owned_ = false;
+ thread_manager->SetCurrentThread(this);
+ return true;
+}
+
+bool Thread::IsRunning() {
+#if defined(WEBRTC_WIN)
+ return thread_ != nullptr;
+#elif defined(WEBRTC_POSIX)
+ return thread_ != 0;
+#endif
+}
+
+AutoThread::AutoThread()
+ : Thread(CreateDefaultSocketServer(), /*do_init=*/false) {
+ if (!ThreadManager::Instance()->CurrentThread()) {
+ // DoInit registers with ThreadManager. Do that only if we intend to
+ // be rtc::Thread::Current(), otherwise ProcessAllMessageQueuesInternal will
+ // post a message to a queue that no running thread is serving.
+ DoInit();
+ ThreadManager::Instance()->SetCurrentThread(this);
+ }
+}
+
+AutoThread::~AutoThread() {
+ Stop();
+ DoDestroy();
+ if (ThreadManager::Instance()->CurrentThread() == this) {
+ ThreadManager::Instance()->SetCurrentThread(nullptr);
+ }
+}
+
+AutoSocketServerThread::AutoSocketServerThread(SocketServer* ss)
+ : Thread(ss, /*do_init=*/false) {
+ DoInit();
+ old_thread_ = ThreadManager::Instance()->CurrentThread();
+ // Temporarily set the current thread to nullptr so that we can keep checks
+ // around that catch unintentional pointer overwrites.
+ rtc::ThreadManager::Instance()->SetCurrentThread(nullptr);
+ rtc::ThreadManager::Instance()->SetCurrentThread(this);
+ if (old_thread_) {
+ ThreadManager::Remove(old_thread_);
+ }
+}
+
+AutoSocketServerThread::~AutoSocketServerThread() {
+ RTC_DCHECK(ThreadManager::Instance()->CurrentThread() == this);
+ // Stop and destroy the thread before clearing it as the current thread.
+ // Sometimes there are messages left in the Thread that will be
+ // destroyed by DoDestroy, and sometimes the destructors of the message and/or
+ // its contents rely on this thread still being set as the current thread.
+ Stop();
+ DoDestroy();
+ rtc::ThreadManager::Instance()->SetCurrentThread(nullptr);
+ rtc::ThreadManager::Instance()->SetCurrentThread(old_thread_);
+ if (old_thread_) {
+ ThreadManager::Add(old_thread_);
+ }
+}
+
+} // namespace rtc