diff options
Diffstat (limited to 'third_party/libwebrtc/webrtc/rtc_base/asyncinvoker.cc')
-rw-r--r-- | third_party/libwebrtc/webrtc/rtc_base/asyncinvoker.cc | 143 |
1 files changed, 143 insertions, 0 deletions
diff --git a/third_party/libwebrtc/webrtc/rtc_base/asyncinvoker.cc b/third_party/libwebrtc/webrtc/rtc_base/asyncinvoker.cc new file mode 100644 index 0000000000..7f65dfc35f --- /dev/null +++ b/third_party/libwebrtc/webrtc/rtc_base/asyncinvoker.cc @@ -0,0 +1,143 @@ +/* + * Copyright 2014 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "rtc_base/asyncinvoker.h" + +#include "rtc_base/checks.h" +#include "rtc_base/logging.h" + +namespace rtc { + +AsyncInvoker::AsyncInvoker() + : pending_invocations_(0), + invocation_complete_(new RefCountedObject<Event>(false, false)), + destroying_(false) {} + +AsyncInvoker::~AsyncInvoker() { + destroying_.store(true, std::memory_order_relaxed); + // Messages for this need to be cleared *before* our destructor is complete. + MessageQueueManager::Clear(this); + // And we need to wait for any invocations that are still in progress on + // other threads. Using memory_order_acquire for synchronization with + // AsyncClosure destructors. + while (pending_invocations_.load(std::memory_order_acquire) > 0) { + // If the destructor was called while AsyncInvoke was being called by + // another thread, WITHIN an AsyncInvoked functor, it may do another + // Thread::Post even after we called MessageQueueManager::Clear(this). So + // we need to keep calling Clear to discard these posts. + Thread::Current()->Clear(this); + invocation_complete_->Wait(Event::kForever); + } +} + +void AsyncInvoker::OnMessage(Message* msg) { + // Get the AsyncClosure shared ptr from this message's data. + ScopedMessageData<AsyncClosure>* data = + static_cast<ScopedMessageData<AsyncClosure>*>(msg->pdata); + // Execute the closure and trigger the return message if needed. + data->inner_data().Execute(); + delete data; +} + +void AsyncInvoker::Flush(Thread* thread, uint32_t id /*= MQID_ANY*/) { + // If the destructor is waiting for invocations to finish, don't start + // running even more tasks. + if (destroying_.load(std::memory_order_relaxed)) + return; + + // Run this on |thread| to reduce the number of context switches. + if (Thread::Current() != thread) { + thread->Invoke<void>(RTC_FROM_HERE, + Bind(&AsyncInvoker::Flush, this, thread, id)); + return; + } + + MessageList removed; + thread->Clear(this, id, &removed); + for (MessageList::iterator it = removed.begin(); it != removed.end(); ++it) { + // This message was pending on this thread, so run it now. + thread->Send(it->posted_from, it->phandler, it->message_id, it->pdata); + } +} + +void AsyncInvoker::DoInvoke(const Location& posted_from, + Thread* thread, + std::unique_ptr<AsyncClosure> closure, + uint32_t id) { + if (destroying_.load(std::memory_order_relaxed)) { + // Note that this may be expected, if the application is AsyncInvoking + // tasks that AsyncInvoke other tasks. But otherwise it indicates a race + // between a thread destroying the AsyncInvoker and a thread still trying + // to use it. + RTC_LOG(LS_WARNING) << "Tried to invoke while destroying the invoker."; + return; + } + thread->Post(posted_from, this, id, + new ScopedMessageData<AsyncClosure>(std::move(closure))); +} + +void AsyncInvoker::DoInvokeDelayed(const Location& posted_from, + Thread* thread, + std::unique_ptr<AsyncClosure> closure, + uint32_t delay_ms, + uint32_t id) { + if (destroying_.load(std::memory_order_relaxed)) { + // See above comment. + RTC_LOG(LS_WARNING) << "Tried to invoke while destroying the invoker."; + return; + } + thread->PostDelayed(posted_from, delay_ms, this, id, + new ScopedMessageData<AsyncClosure>(std::move(closure))); +} + +GuardedAsyncInvoker::GuardedAsyncInvoker() : thread_(Thread::Current()) { + thread_->SignalQueueDestroyed.connect(this, + &GuardedAsyncInvoker::ThreadDestroyed); +} + +GuardedAsyncInvoker::~GuardedAsyncInvoker() { +} + +bool GuardedAsyncInvoker::Flush(uint32_t id) { + CritScope cs(&crit_); + if (thread_ == nullptr) + return false; + invoker_.Flush(thread_, id); + return true; +} + +void GuardedAsyncInvoker::ThreadDestroyed() { + CritScope cs(&crit_); + // We should never get more than one notification about the thread dying. + RTC_DCHECK(thread_ != nullptr); + thread_ = nullptr; +} + +AsyncClosure::AsyncClosure(AsyncInvoker* invoker) + : invoker_(invoker), invocation_complete_(invoker_->invocation_complete_) { + invoker_->pending_invocations_.fetch_add(1, std::memory_order_relaxed); +} + +AsyncClosure::~AsyncClosure() { + // Using memory_order_release for synchronization with the AsyncInvoker + // destructor. + invoker_->pending_invocations_.fetch_sub(1, std::memory_order_release); + + // After |pending_invocations_| is decremented, we may need to signal + // |invocation_complete_| in case the AsyncInvoker is being destroyed and + // waiting for pending tasks to complete. + // + // It's also possible that the destructor finishes before "Set()" is called, + // which is safe because the event is reference counted (and in a thread-safe + // way). + invocation_complete_->Set(); +} + +} // namespace rtc |