diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
commit | 2aa4a82499d4becd2284cdb482213d541b8804dd (patch) | |
tree | b80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/libwebrtc/webrtc/rtc_base/messagequeue.cc | |
parent | Initial commit. (diff) | |
download | firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.tar.xz firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.zip |
Adding upstream version 86.0.1.upstream/86.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/libwebrtc/webrtc/rtc_base/messagequeue.cc')
-rw-r--r-- | third_party/libwebrtc/webrtc/rtc_base/messagequeue.cc | 541 |
1 files changed, 541 insertions, 0 deletions
diff --git a/third_party/libwebrtc/webrtc/rtc_base/messagequeue.cc b/third_party/libwebrtc/webrtc/rtc_base/messagequeue.cc new file mode 100644 index 0000000000..001d3ed148 --- /dev/null +++ b/third_party/libwebrtc/webrtc/rtc_base/messagequeue.cc @@ -0,0 +1,541 @@ +/* + * Copyright 2004 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include <algorithm> + +#include "rtc_base/atomicops.h" +#include "rtc_base/checks.h" +#include "rtc_base/logging.h" +#include "rtc_base/messagequeue.h" +#include "rtc_base/stringencode.h" +#include "rtc_base/thread.h" +#include "rtc_base/trace_event.h" + +namespace rtc { +namespace { + +const int kMaxMsgLatency = 150; // 150 ms +const int kSlowDispatchLoggingThreshold = 50; // 50 ms + +class RTC_SCOPED_LOCKABLE MarkProcessingCritScope { + public: + MarkProcessingCritScope(const CriticalSection* cs, size_t* processing) + RTC_EXCLUSIVE_LOCK_FUNCTION(cs) + : cs_(cs), processing_(processing) { + cs_->Enter(); + *processing_ += 1; + } + + ~MarkProcessingCritScope() RTC_UNLOCK_FUNCTION() { + *processing_ -= 1; + cs_->Leave(); + } + + private: + const CriticalSection* const cs_; + size_t* processing_; + + RTC_DISALLOW_COPY_AND_ASSIGN(MarkProcessingCritScope); +}; +} // namespace + +//------------------------------------------------------------------ +// MessageQueueManager + +MessageQueueManager* MessageQueueManager::instance_ = nullptr; + +MessageQueueManager* MessageQueueManager::Instance() { + // Note: This is not thread safe, but it is first called before threads are + // spawned. + if (!instance_) + instance_ = new MessageQueueManager; + return instance_; +} + +bool MessageQueueManager::IsInitialized() { + return instance_ != nullptr; +} + +MessageQueueManager::MessageQueueManager() : processing_(0) {} + +MessageQueueManager::~MessageQueueManager() { +} + +void MessageQueueManager::Add(MessageQueue *message_queue) { + return Instance()->AddInternal(message_queue); +} +void MessageQueueManager::AddInternal(MessageQueue *message_queue) { + CritScope cs(&crit_); + // Prevent changes while the list of message queues is processed. + RTC_DCHECK_EQ(processing_, 0); + message_queues_.push_back(message_queue); +} + +void MessageQueueManager::Remove(MessageQueue *message_queue) { + // If there isn't a message queue manager instance, then there isn't a queue + // to remove. + if (!instance_) return; + return Instance()->RemoveInternal(message_queue); +} +void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) { + // If this is the last MessageQueue, destroy the manager as well so that + // we don't leak this object at program shutdown. As mentioned above, this is + // not thread-safe, but this should only happen at program termination (when + // the ThreadManager is destroyed, and threads are no longer active). + bool destroy = false; + { + CritScope cs(&crit_); + // Prevent changes while the list of message queues is processed. + RTC_DCHECK_EQ(processing_, 0); + std::vector<MessageQueue *>::iterator iter; + iter = std::find(message_queues_.begin(), message_queues_.end(), + message_queue); + if (iter != message_queues_.end()) { + message_queues_.erase(iter); + } + destroy = message_queues_.empty(); + } + if (destroy) { + instance_ = nullptr; + delete this; + } +} + +void MessageQueueManager::Clear(MessageHandler *handler) { + // If there isn't a message queue manager instance, then there aren't any + // queues to remove this handler from. + if (!instance_) return; + return Instance()->ClearInternal(handler); +} +void MessageQueueManager::ClearInternal(MessageHandler *handler) { + // Deleted objects may cause re-entrant calls to ClearInternal. This is + // allowed as the list of message queues does not change while queues are + // cleared. + MarkProcessingCritScope cs(&crit_, &processing_); + std::vector<MessageQueue *>::iterator iter; + for (MessageQueue* queue : message_queues_) { + queue->Clear(handler); + } +} + +void MessageQueueManager::ProcessAllMessageQueues() { + if (!instance_) { + return; + } + return Instance()->ProcessAllMessageQueuesInternal(); +} + +void MessageQueueManager::ProcessAllMessageQueuesInternal() { + // This works by posting a delayed message at the current time and waiting + // for it to be dispatched on all queues, which will ensure that all messages + // that came before it were also dispatched. + volatile int queues_not_done = 0; + + // This class is used so that whether the posted message is processed, or the + // message queue is simply cleared, queues_not_done gets decremented. + class ScopedIncrement : public MessageData { + public: + ScopedIncrement(volatile int* value) : value_(value) { + AtomicOps::Increment(value_); + } + ~ScopedIncrement() override { AtomicOps::Decrement(value_); } + + private: + volatile int* value_; + }; + + { + MarkProcessingCritScope cs(&crit_, &processing_); + for (MessageQueue* queue : message_queues_) { + if (!queue->IsProcessingMessages()) { + // If the queue is not processing messages, it can + // be ignored. If we tried to post a message to it, it would be dropped + // or ignored. + continue; + } + queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE, + new ScopedIncrement(&queues_not_done)); + } + } + // Note: One of the message queues may have been on this thread, which is why + // we can't synchronously wait for queues_not_done to go to 0; we need to + // process messages as well. + while (AtomicOps::AcquireLoad(&queues_not_done) > 0) { + rtc::Thread::Current()->ProcessMessages(0); + } +} + +//------------------------------------------------------------------ +// MessageQueue +MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) + : fPeekKeep_(false), + dmsgq_next_num_(0), + fInitialized_(false), + fDestroyed_(false), + stop_(0), + ss_(ss) { + RTC_DCHECK(ss); + // Currently, MessageQueue holds a socket server, and is the base class for + // Thread. It seems like it makes more sense for Thread to hold the socket + // server, and provide it to the MessageQueue, since the Thread controls + // the I/O model, and MQ is agnostic to those details. Anyway, this causes + // messagequeue_unittest to depend on network libraries... yuck. + ss_->SetMessageQueue(this); + if (init_queue) { + DoInit(); + } +} + +MessageQueue::MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue) + : MessageQueue(ss.get(), init_queue) { + own_ss_ = std::move(ss); +} + +MessageQueue::~MessageQueue() { + DoDestroy(); +} + +void MessageQueue::DoInit() { + if (fInitialized_) { + return; + } + + fInitialized_ = true; + MessageQueueManager::Add(this); +} + +void MessageQueue::DoDestroy() { + if (fDestroyed_) { + return; + } + + fDestroyed_ = true; + // The signal is done from here to ensure + // that it always gets called when the queue + // is going away. + SignalQueueDestroyed(); + MessageQueueManager::Remove(this); + Clear(nullptr); + + if (ss_) { + ss_->SetMessageQueue(nullptr); + } +} + +SocketServer* MessageQueue::socketserver() { + return ss_; +} + +void MessageQueue::WakeUpSocketServer() { + ss_->WakeUp(); +} + +void MessageQueue::Quit() { + AtomicOps::ReleaseStore(&stop_, 1); + WakeUpSocketServer(); +} + +bool MessageQueue::IsQuitting() { + return AtomicOps::AcquireLoad(&stop_) != 0; +} + +bool MessageQueue::IsProcessingMessages() { + return !IsQuitting(); +} + +void MessageQueue::Restart() { + AtomicOps::ReleaseStore(&stop_, 0); +} + +bool MessageQueue::Peek(Message *pmsg, int cmsWait) { + if (fPeekKeep_) { + *pmsg = msgPeek_; + return true; + } + if (!Get(pmsg, cmsWait)) + return false; + msgPeek_ = *pmsg; + fPeekKeep_ = true; + return true; +} + +bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) { + // Return and clear peek if present + // Always return the peek if it exists so there is Peek/Get symmetry + + if (fPeekKeep_) { + *pmsg = msgPeek_; + fPeekKeep_ = false; + return true; + } + + // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch + + int64_t cmsTotal = cmsWait; + int64_t cmsElapsed = 0; + int64_t msStart = TimeMillis(); + int64_t msCurrent = msStart; + while (true) { + // Check for sent messages + ReceiveSends(); + + // Check for posted events + int64_t cmsDelayNext = kForever; + bool first_pass = true; + while (true) { + // All queue operations need to be locked, but nothing else in this loop + // (specifically handling disposed message) can happen inside the crit. + // Otherwise, disposed MessageHandlers will cause deadlocks. + { + CritScope cs(&crit_); + // On the first pass, check for delayed messages that have been + // triggered and calculate the next trigger time. + if (first_pass) { + first_pass = false; + while (!dmsgq_.empty()) { + if (msCurrent < dmsgq_.top().msTrigger_) { + cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent); + break; + } + msgq_.push_back(dmsgq_.top().msg_); + dmsgq_.pop(); + } + } + // Pull a message off the message queue, if available. + if (msgq_.empty()) { + break; + } else { + *pmsg = msgq_.front(); + msgq_.pop_front(); + } + } // crit_ is released here. + + // Log a warning for time-sensitive messages that we're late to deliver. + if (pmsg->ts_sensitive) { + int64_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive); + if (delay > 0) { + RTC_LOG_F(LS_WARNING) + << "id: " << pmsg->message_id + << " delay: " << (delay + kMaxMsgLatency) << "ms"; + } + } + // If this was a dispose message, delete it and skip it. + if (MQID_DISPOSE == pmsg->message_id) { + RTC_DCHECK(nullptr == pmsg->phandler); + delete pmsg->pdata; + *pmsg = Message(); + continue; + } + return true; + } + + if (IsQuitting()) + break; + + // Which is shorter, the delay wait or the asked wait? + + int64_t cmsNext; + if (cmsWait == kForever) { + cmsNext = cmsDelayNext; + } else { + cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); + if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) + cmsNext = cmsDelayNext; + } + + { + // Wait and multiplex in the meantime + if (!ss_->Wait(static_cast<int>(cmsNext), process_io)) + return false; + } + + // If the specified timeout expired, return + + msCurrent = TimeMillis(); + cmsElapsed = TimeDiff(msCurrent, msStart); + if (cmsWait != kForever) { + if (cmsElapsed >= cmsWait) + return false; + } + } + return false; +} + +void MessageQueue::ReceiveSends() { +} + +void MessageQueue::Post(const Location& posted_from, + MessageHandler* phandler, + uint32_t id, + MessageData* pdata, + bool time_sensitive) { + if (IsQuitting()) + return; + + // Keep thread safe + // Add the message to the end of the queue + // Signal for the multiplexer to return + + { + CritScope cs(&crit_); + Message msg; + msg.posted_from = posted_from; + msg.phandler = phandler; + msg.message_id = id; + msg.pdata = pdata; + if (time_sensitive) { + msg.ts_sensitive = TimeMillis() + kMaxMsgLatency; + } + msgq_.push_back(msg); + } + WakeUpSocketServer(); +} + +void MessageQueue::PostDelayed(const Location& posted_from, + int cmsDelay, + MessageHandler* phandler, + uint32_t id, + MessageData* pdata) { + return DoDelayPost(posted_from, cmsDelay, TimeAfter(cmsDelay), phandler, id, + pdata); +} + +void MessageQueue::PostAt(const Location& posted_from, + uint32_t tstamp, + MessageHandler* phandler, + uint32_t id, + MessageData* pdata) { + // This should work even if it is used (unexpectedly). + int64_t delay = static_cast<uint32_t>(TimeMillis()) - tstamp; + return DoDelayPost(posted_from, delay, tstamp, phandler, id, pdata); +} + +void MessageQueue::PostAt(const Location& posted_from, + int64_t tstamp, + MessageHandler* phandler, + uint32_t id, + MessageData* pdata) { + return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id, + pdata); +} + +void MessageQueue::DoDelayPost(const Location& posted_from, + int64_t cmsDelay, + int64_t tstamp, + MessageHandler* phandler, + uint32_t id, + MessageData* pdata) { + if (IsQuitting()) { + return; + } + + // Keep thread safe + // Add to the priority queue. Gets sorted soonest first. + // Signal for the multiplexer to return. + + { + CritScope cs(&crit_); + Message msg; + msg.posted_from = posted_from; + msg.phandler = phandler; + msg.message_id = id; + msg.pdata = pdata; + DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); + dmsgq_.push(dmsg); + // If this message queue processes 1 message every millisecond for 50 days, + // we will wrap this number. Even then, only messages with identical times + // will be misordered, and then only briefly. This is probably ok. + ++dmsgq_next_num_; + RTC_DCHECK_NE(0, dmsgq_next_num_); + } + WakeUpSocketServer(); +} + +int MessageQueue::GetDelay() { + CritScope cs(&crit_); + + if (!msgq_.empty()) + return 0; + + if (!dmsgq_.empty()) { + int delay = TimeUntil(dmsgq_.top().msTrigger_); + if (delay < 0) + delay = 0; + return delay; + } + + return kForever; +} + +void MessageQueue::Clear(MessageHandler* phandler, + uint32_t id, + MessageList* removed) { + CritScope cs(&crit_); + + // Remove messages with phandler + + if (fPeekKeep_ && msgPeek_.Match(phandler, id)) { + if (removed) { + removed->push_back(msgPeek_); + } else { + delete msgPeek_.pdata; + } + fPeekKeep_ = false; + } + + // Remove from ordered message queue + + for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) { + if (it->Match(phandler, id)) { + if (removed) { + removed->push_back(*it); + } else { + delete it->pdata; + } + it = msgq_.erase(it); + } else { + ++it; + } + } + + // Remove from priority queue. Not directly iterable, so use this approach + + PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin(); + for (PriorityQueue::container_type::iterator it = new_end; + it != dmsgq_.container().end(); ++it) { + if (it->msg_.Match(phandler, id)) { + if (removed) { + removed->push_back(it->msg_); + } else { + delete it->msg_.pdata; + } + } else { + *new_end++ = *it; + } + } + dmsgq_.container().erase(new_end, dmsgq_.container().end()); + dmsgq_.reheap(); +} + +void MessageQueue::Dispatch(Message *pmsg) { + TRACE_EVENT2("webrtc", "MessageQueue::Dispatch", "src_file_and_line", + pmsg->posted_from.file_and_line(), "src_func", + pmsg->posted_from.function_name()); + int64_t start_time = TimeMillis(); + pmsg->phandler->OnMessage(pmsg); + int64_t end_time = TimeMillis(); + int64_t diff = TimeDiff(end_time, start_time); + if (diff >= kSlowDispatchLoggingThreshold) { + RTC_LOG(LS_INFO) << "Message took " << diff + << "ms to dispatch. Posted from: " + << pmsg->posted_from.ToString(); + } +} + +} // namespace rtc |