summaryrefslogtreecommitdiffstats
path: root/third_party/libwebrtc/webrtc/rtc_base/asyncinvoker.cc
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/libwebrtc/webrtc/rtc_base/asyncinvoker.cc')
-rw-r--r--third_party/libwebrtc/webrtc/rtc_base/asyncinvoker.cc143
1 files changed, 143 insertions, 0 deletions
diff --git a/third_party/libwebrtc/webrtc/rtc_base/asyncinvoker.cc b/third_party/libwebrtc/webrtc/rtc_base/asyncinvoker.cc
new file mode 100644
index 0000000000..7f65dfc35f
--- /dev/null
+++ b/third_party/libwebrtc/webrtc/rtc_base/asyncinvoker.cc
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2014 The WebRTC Project Authors. All rights reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "rtc_base/asyncinvoker.h"
+
+#include "rtc_base/checks.h"
+#include "rtc_base/logging.h"
+
+namespace rtc {
+
+AsyncInvoker::AsyncInvoker()
+ : pending_invocations_(0),
+ invocation_complete_(new RefCountedObject<Event>(false, false)),
+ destroying_(false) {}
+
+AsyncInvoker::~AsyncInvoker() {
+ destroying_.store(true, std::memory_order_relaxed);
+ // Messages for this need to be cleared *before* our destructor is complete.
+ MessageQueueManager::Clear(this);
+ // And we need to wait for any invocations that are still in progress on
+ // other threads. Using memory_order_acquire for synchronization with
+ // AsyncClosure destructors.
+ while (pending_invocations_.load(std::memory_order_acquire) > 0) {
+ // If the destructor was called while AsyncInvoke was being called by
+ // another thread, WITHIN an AsyncInvoked functor, it may do another
+ // Thread::Post even after we called MessageQueueManager::Clear(this). So
+ // we need to keep calling Clear to discard these posts.
+ Thread::Current()->Clear(this);
+ invocation_complete_->Wait(Event::kForever);
+ }
+}
+
+void AsyncInvoker::OnMessage(Message* msg) {
+ // Get the AsyncClosure shared ptr from this message's data.
+ ScopedMessageData<AsyncClosure>* data =
+ static_cast<ScopedMessageData<AsyncClosure>*>(msg->pdata);
+ // Execute the closure and trigger the return message if needed.
+ data->inner_data().Execute();
+ delete data;
+}
+
+void AsyncInvoker::Flush(Thread* thread, uint32_t id /*= MQID_ANY*/) {
+ // If the destructor is waiting for invocations to finish, don't start
+ // running even more tasks.
+ if (destroying_.load(std::memory_order_relaxed))
+ return;
+
+ // Run this on |thread| to reduce the number of context switches.
+ if (Thread::Current() != thread) {
+ thread->Invoke<void>(RTC_FROM_HERE,
+ Bind(&AsyncInvoker::Flush, this, thread, id));
+ return;
+ }
+
+ MessageList removed;
+ thread->Clear(this, id, &removed);
+ for (MessageList::iterator it = removed.begin(); it != removed.end(); ++it) {
+ // This message was pending on this thread, so run it now.
+ thread->Send(it->posted_from, it->phandler, it->message_id, it->pdata);
+ }
+}
+
+void AsyncInvoker::DoInvoke(const Location& posted_from,
+ Thread* thread,
+ std::unique_ptr<AsyncClosure> closure,
+ uint32_t id) {
+ if (destroying_.load(std::memory_order_relaxed)) {
+ // Note that this may be expected, if the application is AsyncInvoking
+ // tasks that AsyncInvoke other tasks. But otherwise it indicates a race
+ // between a thread destroying the AsyncInvoker and a thread still trying
+ // to use it.
+ RTC_LOG(LS_WARNING) << "Tried to invoke while destroying the invoker.";
+ return;
+ }
+ thread->Post(posted_from, this, id,
+ new ScopedMessageData<AsyncClosure>(std::move(closure)));
+}
+
+void AsyncInvoker::DoInvokeDelayed(const Location& posted_from,
+ Thread* thread,
+ std::unique_ptr<AsyncClosure> closure,
+ uint32_t delay_ms,
+ uint32_t id) {
+ if (destroying_.load(std::memory_order_relaxed)) {
+ // See above comment.
+ RTC_LOG(LS_WARNING) << "Tried to invoke while destroying the invoker.";
+ return;
+ }
+ thread->PostDelayed(posted_from, delay_ms, this, id,
+ new ScopedMessageData<AsyncClosure>(std::move(closure)));
+}
+
+GuardedAsyncInvoker::GuardedAsyncInvoker() : thread_(Thread::Current()) {
+ thread_->SignalQueueDestroyed.connect(this,
+ &GuardedAsyncInvoker::ThreadDestroyed);
+}
+
+GuardedAsyncInvoker::~GuardedAsyncInvoker() {
+}
+
+bool GuardedAsyncInvoker::Flush(uint32_t id) {
+ CritScope cs(&crit_);
+ if (thread_ == nullptr)
+ return false;
+ invoker_.Flush(thread_, id);
+ return true;
+}
+
+void GuardedAsyncInvoker::ThreadDestroyed() {
+ CritScope cs(&crit_);
+ // We should never get more than one notification about the thread dying.
+ RTC_DCHECK(thread_ != nullptr);
+ thread_ = nullptr;
+}
+
+AsyncClosure::AsyncClosure(AsyncInvoker* invoker)
+ : invoker_(invoker), invocation_complete_(invoker_->invocation_complete_) {
+ invoker_->pending_invocations_.fetch_add(1, std::memory_order_relaxed);
+}
+
+AsyncClosure::~AsyncClosure() {
+ // Using memory_order_release for synchronization with the AsyncInvoker
+ // destructor.
+ invoker_->pending_invocations_.fetch_sub(1, std::memory_order_release);
+
+ // After |pending_invocations_| is decremented, we may need to signal
+ // |invocation_complete_| in case the AsyncInvoker is being destroyed and
+ // waiting for pending tasks to complete.
+ //
+ // It's also possible that the destructor finishes before "Set()" is called,
+ // which is safe because the event is reference counted (and in a thread-safe
+ // way).
+ invocation_complete_->Set();
+}
+
+} // namespace rtc