// Copyright 2016 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "mojo/core/ports/message_queue.h" #include #include "base/compiler_specific.h" #include "base/logging.h" #include "mojo/core/ports/message_filter.h" #include "mozilla/Likely.h" namespace mojo { namespace core { namespace ports { // Used by std::{push,pop}_heap functions inline bool operator<(const mozilla::UniquePtr& a, const mozilla::UniquePtr& b) { return a->sequence_num() > b->sequence_num(); } MessageQueue::MessageQueue() : MessageQueue(kInitialSequenceNum) {} MessageQueue::MessageQueue(uint64_t next_sequence_num) : next_sequence_num_(next_sequence_num) { // The message queue is blocked waiting for a message with sequence number // equal to |next_sequence_num|. } MessageQueue::~MessageQueue() { #ifdef DEBUG size_t num_leaked_ports = 0; for (const auto& message : heap_) { num_leaked_ports += message->num_ports(); } if (num_leaked_ports > 0) { DVLOG(1) << "Leaking " << num_leaked_ports << " ports in unreceived messages"; } #endif } bool MessageQueue::HasNextMessage() const { return !heap_.empty() && heap_[0]->sequence_num() == next_sequence_num_; } void MessageQueue::GetNextMessage(mozilla::UniquePtr* message, MessageFilter* filter) { if (!HasNextMessage() || (filter && !filter->Match(*heap_[0]))) { message->reset(); return; } std::pop_heap(heap_.begin(), heap_.end()); *message = std::move(heap_.back()); total_queued_bytes_ -= (*message)->GetSizeIfSerialized(); heap_.pop_back(); // We keep the capacity of |heap_| in check so that a large batch of incoming // messages doesn't permanently wreck available memory. The choice of interval // here is somewhat arbitrary. constexpr size_t kHeapMinimumShrinkSize = 16; constexpr size_t kHeapShrinkInterval = 512; if (MOZ_UNLIKELY(heap_.size() > kHeapMinimumShrinkSize && heap_.size() % kHeapShrinkInterval == 0)) { heap_.shrink_to_fit(); } next_sequence_num_++; } void MessageQueue::AcceptMessage(mozilla::UniquePtr message, bool* has_next_message) { // TODO: Handle sequence number roll-over. total_queued_bytes_ += message->GetSizeIfSerialized(); heap_.emplace_back(std::move(message)); std::push_heap(heap_.begin(), heap_.end()); if (!signalable_) { *has_next_message = false; } else { *has_next_message = (heap_[0]->sequence_num() == next_sequence_num_); } } void MessageQueue::TakeAllMessages( std::vector>* messages) { *messages = std::move(heap_); total_queued_bytes_ = 0; } } // namespace ports } // namespace core } // namespace mojo