summaryrefslogtreecommitdiffstats
path: root/ipc/chromium/src/mojo/core/ports/message_queue.cc
diff options
context:
space:
mode:
Diffstat (limited to 'ipc/chromium/src/mojo/core/ports/message_queue.cc')
-rw-r--r--ipc/chromium/src/mojo/core/ports/message_queue.cc97
1 files changed, 97 insertions, 0 deletions
diff --git a/ipc/chromium/src/mojo/core/ports/message_queue.cc b/ipc/chromium/src/mojo/core/ports/message_queue.cc
new file mode 100644
index 0000000000..0e785bec2b
--- /dev/null
+++ b/ipc/chromium/src/mojo/core/ports/message_queue.cc
@@ -0,0 +1,97 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/core/ports/message_queue.h"
+
+#include <algorithm>
+
+#include "base/compiler_specific.h"
+#include "base/logging.h"
+#include "mojo/core/ports/message_filter.h"
+#include "mozilla/Likely.h"
+
+namespace mojo {
+namespace core {
+namespace ports {
+
+// Used by std::{push,pop}_heap functions
+inline bool operator<(const mozilla::UniquePtr<UserMessageEvent>& a,
+ const mozilla::UniquePtr<UserMessageEvent>& b) {
+ return a->sequence_num() > b->sequence_num();
+}
+
+MessageQueue::MessageQueue() : MessageQueue(kInitialSequenceNum) {}
+
+MessageQueue::MessageQueue(uint64_t next_sequence_num)
+ : next_sequence_num_(next_sequence_num) {
+ // The message queue is blocked waiting for a message with sequence number
+ // equal to |next_sequence_num|.
+}
+
+MessageQueue::~MessageQueue() {
+#ifdef DEBUG
+ size_t num_leaked_ports = 0;
+ for (const auto& message : heap_) {
+ num_leaked_ports += message->num_ports();
+ }
+ if (num_leaked_ports > 0) {
+ DVLOG(1) << "Leaking " << num_leaked_ports
+ << " ports in unreceived messages";
+ }
+#endif
+}
+
+bool MessageQueue::HasNextMessage() const {
+ return !heap_.empty() && heap_[0]->sequence_num() == next_sequence_num_;
+}
+
+void MessageQueue::GetNextMessage(mozilla::UniquePtr<UserMessageEvent>* message,
+ MessageFilter* filter) {
+ if (!HasNextMessage() || (filter && !filter->Match(*heap_[0]))) {
+ message->reset();
+ return;
+ }
+
+ std::pop_heap(heap_.begin(), heap_.end());
+ *message = std::move(heap_.back());
+ total_queued_bytes_ -= (*message)->GetSizeIfSerialized();
+ heap_.pop_back();
+
+ // We keep the capacity of |heap_| in check so that a large batch of incoming
+ // messages doesn't permanently wreck available memory. The choice of interval
+ // here is somewhat arbitrary.
+ constexpr size_t kHeapMinimumShrinkSize = 16;
+ constexpr size_t kHeapShrinkInterval = 512;
+ if (MOZ_UNLIKELY(heap_.size() > kHeapMinimumShrinkSize &&
+ heap_.size() % kHeapShrinkInterval == 0)) {
+ heap_.shrink_to_fit();
+ }
+
+ next_sequence_num_++;
+}
+
+void MessageQueue::AcceptMessage(mozilla::UniquePtr<UserMessageEvent> message,
+ bool* has_next_message) {
+ // TODO: Handle sequence number roll-over.
+
+ total_queued_bytes_ += message->GetSizeIfSerialized();
+ heap_.emplace_back(std::move(message));
+ std::push_heap(heap_.begin(), heap_.end());
+
+ if (!signalable_) {
+ *has_next_message = false;
+ } else {
+ *has_next_message = (heap_[0]->sequence_num() == next_sequence_num_);
+ }
+}
+
+void MessageQueue::TakeAllMessages(
+ std::vector<mozilla::UniquePtr<UserMessageEvent>>* messages) {
+ *messages = std::move(heap_);
+ total_queued_bytes_ = 0;
+}
+
+} // namespace ports
+} // namespace core
+} // namespace mojo