Diffstat (limited to 'ipc/chromium/src/mojo/core')
-rw-r--r--  ipc/chromium/src/mojo/core/ports/event.cc            565
-rw-r--r--  ipc/chromium/src/mojo/core/ports/event.h             393
-rw-r--r--  ipc/chromium/src/mojo/core/ports/message_filter.h     29
-rw-r--r--  ipc/chromium/src/mojo/core/ports/message_queue.cc     97
-rw-r--r--  ipc/chromium/src/mojo/core/ports/message_queue.h      88
-rw-r--r--  ipc/chromium/src/mojo/core/ports/name.cc              54
-rw-r--r--  ipc/chromium/src/mojo/core/ports/name.h              122
-rw-r--r--  ipc/chromium/src/mojo/core/ports/node.cc            2130
-rw-r--r--  ipc/chromium/src/mojo/core/ports/node.h               355
-rw-r--r--  ipc/chromium/src/mojo/core/ports/node_delegate.h       38
-rw-r--r--  ipc/chromium/src/mojo/core/ports/port.cc               95
-rw-r--r--  ipc/chromium/src/mojo/core/ports/port.h               277
-rw-r--r--  ipc/chromium/src/mojo/core/ports/port_locker.cc        75
-rw-r--r--  ipc/chromium/src/mojo/core/ports/port_locker.h         90
-rw-r--r--  ipc/chromium/src/mojo/core/ports/port_ref.cc           30
-rw-r--r--  ipc/chromium/src/mojo/core/ports/port_ref.h            47
-rw-r--r--  ipc/chromium/src/mojo/core/ports/user_data.h           25
-rw-r--r--  ipc/chromium/src/mojo/core/ports/user_message.cc       21
-rw-r--r--  ipc/chromium/src/mojo/core/ports/user_message.h        62
19 files changed, 4593 insertions, 0 deletions
diff --git a/ipc/chromium/src/mojo/core/ports/event.cc b/ipc/chromium/src/mojo/core/ports/event.cc
new file mode 100644
index 0000000000..5caec8ae55
--- /dev/null
+++ b/ipc/chromium/src/mojo/core/ports/event.cc
@@ -0,0 +1,565 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/core/ports/event.h"
+
+#include <stdint.h>
+#include <string.h>
+
+#include "base/logging.h"
+#include "mojo/core/ports/name.h"
+#include "mojo/core/ports/user_message.h"
+#include "mozilla/Assertions.h"
+#include "mozilla/CheckedInt.h"
+
+namespace mojo {
+namespace core {
+namespace ports {
+
+namespace {
+
+const size_t kPortsMessageAlignment = 8;
+
+#pragma pack(push, 1)
+
+struct SerializedHeader {
+ Event::Type type;
+ uint32_t padding;
+ PortName port_name;
+ PortName from_port;
+ uint64_t control_sequence_num;
+};
+
+struct UserMessageEventData {
+ uint64_t sequence_num;
+ uint32_t num_ports;
+ uint32_t padding;
+};
+
+struct ObserveProxyEventData {
+ NodeName proxy_node_name;
+ PortName proxy_port_name;
+ NodeName proxy_target_node_name;
+ PortName proxy_target_port_name;
+};
+
+struct ObserveProxyAckEventData {
+ uint64_t last_sequence_num;
+};
+
+struct ObserveClosureEventData {
+ uint64_t last_sequence_num;
+};
+
+struct MergePortEventData {
+ PortName new_port_name;
+ Event::PortDescriptor new_port_descriptor;
+};
+
+struct UserMessageReadAckRequestEventData {
+ uint64_t sequence_num_to_acknowledge;
+};
+
+struct UserMessageReadAckEventData {
+ uint64_t sequence_num_acknowledged;
+};
+
+struct UpdatePreviousPeerEventData {
+ NodeName new_node_name;
+ PortName new_port_name;
+};
+
+#pragma pack(pop)
+
+static_assert(sizeof(Event::PortDescriptor) % kPortsMessageAlignment == 0,
+ "Invalid PortDescriptor size.");
+
+static_assert(sizeof(SerializedHeader) % kPortsMessageAlignment == 0,
+ "Invalid SerializedHeader size.");
+
+static_assert(sizeof(UserMessageEventData) % kPortsMessageAlignment == 0,
+ "Invalid UserEventData size.");
+
+static_assert(sizeof(ObserveProxyEventData) % kPortsMessageAlignment == 0,
+ "Invalid ObserveProxyEventData size.");
+
+static_assert(sizeof(ObserveProxyAckEventData) % kPortsMessageAlignment == 0,
+ "Invalid ObserveProxyAckEventData size.");
+
+static_assert(sizeof(ObserveClosureEventData) % kPortsMessageAlignment == 0,
+ "Invalid ObserveClosureEventData size.");
+
+static_assert(sizeof(MergePortEventData) % kPortsMessageAlignment == 0,
+ "Invalid MergePortEventData size.");
+
+static_assert(sizeof(UserMessageReadAckRequestEventData) %
+ kPortsMessageAlignment ==
+ 0,
+ "Invalid UserMessageReadAckRequestEventData size.");
+
+static_assert(sizeof(UserMessageReadAckEventData) % kPortsMessageAlignment == 0,
+ "Invalid UserMessageReadAckEventData size.");
+
+static_assert(sizeof(UpdatePreviousPeerEventData) % kPortsMessageAlignment == 0,
+ "Invalid UpdatePreviousPeerEventData size.");
+
+} // namespace
+
+Event::PortDescriptor::PortDescriptor() { memset(padding, 0, sizeof(padding)); }
+
+Event::~Event() = default;
+
+// static
+ScopedEvent Event::Deserialize(const void* buffer, size_t num_bytes) {
+ if (num_bytes < sizeof(SerializedHeader)) {
+ return nullptr;
+ }
+
+ const auto* header = static_cast<const SerializedHeader*>(buffer);
+ const PortName& port_name = header->port_name;
+ const PortName& from_port = header->from_port;
+ const uint64_t control_sequence_num = header->control_sequence_num;
+ const size_t data_size = num_bytes - sizeof(*header);
+ switch (header->type) {
+ case Type::kUserMessage:
+ return UserMessageEvent::Deserialize(
+ port_name, from_port, control_sequence_num, header + 1, data_size);
+ case Type::kPortAccepted:
+ return PortAcceptedEvent::Deserialize(
+ port_name, from_port, control_sequence_num, header + 1, data_size);
+ case Type::kObserveProxy:
+ return ObserveProxyEvent::Deserialize(
+ port_name, from_port, control_sequence_num, header + 1, data_size);
+ case Type::kObserveProxyAck:
+ return ObserveProxyAckEvent::Deserialize(
+ port_name, from_port, control_sequence_num, header + 1, data_size);
+ case Type::kObserveClosure:
+ return ObserveClosureEvent::Deserialize(
+ port_name, from_port, control_sequence_num, header + 1, data_size);
+ case Type::kMergePort:
+ return MergePortEvent::Deserialize(
+ port_name, from_port, control_sequence_num, header + 1, data_size);
+ case Type::kUserMessageReadAckRequest:
+ return UserMessageReadAckRequestEvent::Deserialize(
+ port_name, from_port, control_sequence_num, header + 1, data_size);
+ case Type::kUserMessageReadAck:
+ return UserMessageReadAckEvent::Deserialize(
+ port_name, from_port, control_sequence_num, header + 1, data_size);
+ case Type::kUpdatePreviousPeer:
+ return UpdatePreviousPeerEvent::Deserialize(
+ port_name, from_port, control_sequence_num, header + 1, data_size);
+ default:
+ DVLOG(2) << "Ingoring unknown port event type: "
+ << static_cast<uint32_t>(header->type);
+ return nullptr;
+ }
+}
+
+Event::Event(Type type, const PortName& port_name, const PortName& from_port,
+ uint64_t control_sequence_num)
+ : type_(type),
+ port_name_(port_name),
+ from_port_(from_port),
+ control_sequence_num_(control_sequence_num) {}
+
+size_t Event::GetSerializedSize() const {
+ return sizeof(SerializedHeader) + GetSerializedDataSize();
+}
+
+void Event::Serialize(void* buffer) const {
+ auto* header = static_cast<SerializedHeader*>(buffer);
+ header->type = type_;
+ header->padding = 0;
+ header->port_name = port_name_;
+ header->from_port = from_port_;
+ header->control_sequence_num = control_sequence_num_;
+ SerializeData(header + 1);
+}
+
+ScopedEvent Event::CloneForBroadcast() const { return nullptr; }
+
+UserMessageEvent::~UserMessageEvent() = default;
+
+UserMessageEvent::UserMessageEvent(size_t num_ports)
+ : Event(Type::kUserMessage, kInvalidPortName, kInvalidPortName, -1) {
+ ReservePorts(num_ports);
+}
+
+void UserMessageEvent::AttachMessage(mozilla::UniquePtr<UserMessage> message) {
+ DCHECK(!message_);
+ message_ = std::move(message);
+}
+
+void UserMessageEvent::ReservePorts(size_t num_ports) {
+ port_descriptors_.resize(num_ports);
+ ports_.resize(num_ports);
+}
+
+bool UserMessageEvent::NotifyWillBeRoutedExternally() {
+ DCHECK(message_);
+ return message_->WillBeRoutedExternally(*this);
+}
+
+// static
+ScopedEvent UserMessageEvent::Deserialize(const PortName& port_name,
+ const PortName& from_port,
+ uint64_t control_sequence_num,
+ const void* buffer,
+ size_t num_bytes) {
+ if (num_bytes < sizeof(UserMessageEventData)) {
+ return nullptr;
+ }
+
+ const auto* data = static_cast<const UserMessageEventData*>(buffer);
+ mozilla::CheckedInt<size_t> port_data_size = data->num_ports;
+ port_data_size *= sizeof(PortDescriptor) + sizeof(PortName);
+ if (!port_data_size.isValid()) {
+ return nullptr;
+ }
+
+ mozilla::CheckedInt<size_t> total_size = port_data_size.value();
+ total_size += sizeof(UserMessageEventData);
+ if (!total_size.isValid() || num_bytes < total_size.value()) {
+ return nullptr;
+ }
+
+ auto event = mozilla::WrapUnique(new UserMessageEvent(
+ port_name, from_port, control_sequence_num, data->sequence_num));
+ event->ReservePorts(data->num_ports);
+ const auto* in_descriptors =
+ reinterpret_cast<const PortDescriptor*>(data + 1);
+ std::copy(in_descriptors, in_descriptors + data->num_ports,
+ event->port_descriptors());
+
+ const auto* in_names =
+ reinterpret_cast<const PortName*>(in_descriptors + data->num_ports);
+ std::copy(in_names, in_names + data->num_ports, event->ports());
+ return event;
+}
+
+UserMessageEvent::UserMessageEvent(const PortName& port_name,
+ const PortName& from_port,
+ uint64_t control_sequence_num,
+ uint64_t sequence_num)
+ : Event(Type::kUserMessage, port_name, from_port, control_sequence_num),
+ sequence_num_(sequence_num) {}
+
+size_t UserMessageEvent::GetSizeIfSerialized() const {
+ if (!message_) {
+ return 0;
+ }
+ return message_->GetSizeIfSerialized();
+}
+
+size_t UserMessageEvent::GetSerializedDataSize() const {
+ DCHECK_EQ(ports_.size(), port_descriptors_.size());
+ mozilla::CheckedInt<size_t> size = sizeof(UserMessageEventData);
+ mozilla::CheckedInt<size_t> ports_size =
+ sizeof(PortDescriptor) + sizeof(PortName);
+ ports_size *= ports_.size();
+ mozilla::CheckedInt<size_t> combined = size + ports_size;
+ MOZ_RELEASE_ASSERT(combined.isValid());
+ return combined.value();
+}
+
+void UserMessageEvent::SerializeData(void* buffer) const {
+ DCHECK_EQ(ports_.size(), port_descriptors_.size());
+ auto* data = static_cast<UserMessageEventData*>(buffer);
+ data->sequence_num = sequence_num_;
+ mozilla::CheckedInt<uint32_t> num_ports{ports_.size()};
+ DCHECK(num_ports.isValid());
+ data->num_ports = num_ports.value();
+ data->padding = 0;
+
+ auto* ports_data = reinterpret_cast<PortDescriptor*>(data + 1);
+ std::copy(port_descriptors_.begin(), port_descriptors_.end(), ports_data);
+
+ auto* port_names_data =
+ reinterpret_cast<PortName*>(ports_data + ports_.size());
+ std::copy(ports_.begin(), ports_.end(), port_names_data);
+}
+
+PortAcceptedEvent::PortAcceptedEvent(const PortName& port_name,
+ const PortName& from_port,
+ uint64_t control_sequence_num)
+ : Event(Type::kPortAccepted, port_name, from_port, control_sequence_num) {}
+
+PortAcceptedEvent::~PortAcceptedEvent() = default;
+
+// static
+ScopedEvent PortAcceptedEvent::Deserialize(const PortName& port_name,
+ const PortName& from_port,
+ uint64_t control_sequence_num,
+ const void* buffer,
+ size_t num_bytes) {
+ return mozilla::MakeUnique<PortAcceptedEvent>(port_name, from_port,
+ control_sequence_num);
+}
+
+size_t PortAcceptedEvent::GetSerializedDataSize() const { return 0; }
+
+void PortAcceptedEvent::SerializeData(void* buffer) const {}
+
+ObserveProxyEvent::ObserveProxyEvent(const PortName& port_name,
+ const PortName& from_port,
+ uint64_t control_sequence_num,
+ const NodeName& proxy_node_name,
+ const PortName& proxy_port_name,
+ const NodeName& proxy_target_node_name,
+ const PortName& proxy_target_port_name)
+ : Event(Type::kObserveProxy, port_name, from_port, control_sequence_num),
+ proxy_node_name_(proxy_node_name),
+ proxy_port_name_(proxy_port_name),
+ proxy_target_node_name_(proxy_target_node_name),
+ proxy_target_port_name_(proxy_target_port_name) {}
+
+ObserveProxyEvent::~ObserveProxyEvent() = default;
+
+// static
+ScopedEvent ObserveProxyEvent::Deserialize(const PortName& port_name,
+ const PortName& from_port,
+ uint64_t control_sequence_num,
+ const void* buffer,
+ size_t num_bytes) {
+ if (num_bytes < sizeof(ObserveProxyEventData)) {
+ return nullptr;
+ }
+
+ const auto* data = static_cast<const ObserveProxyEventData*>(buffer);
+ return mozilla::MakeUnique<ObserveProxyEvent>(
+ port_name, from_port, control_sequence_num, data->proxy_node_name,
+ data->proxy_port_name, data->proxy_target_node_name,
+ data->proxy_target_port_name);
+}
+
+size_t ObserveProxyEvent::GetSerializedDataSize() const {
+ return sizeof(ObserveProxyEventData);
+}
+
+void ObserveProxyEvent::SerializeData(void* buffer) const {
+ auto* data = static_cast<ObserveProxyEventData*>(buffer);
+ data->proxy_node_name = proxy_node_name_;
+ data->proxy_port_name = proxy_port_name_;
+ data->proxy_target_node_name = proxy_target_node_name_;
+ data->proxy_target_port_name = proxy_target_port_name_;
+}
+
+ScopedEvent ObserveProxyEvent::CloneForBroadcast() const {
+ // Don't broadcast events targeted at specific ports. Otherwise a malicious
+ // node can use this to bypass sender verification.
+ if (port_name() != kInvalidPortName) {
+ return nullptr;
+ }
+ return mozilla::MakeUnique<ObserveProxyEvent>(
+ port_name(), from_port(), control_sequence_num(), proxy_node_name_,
+ proxy_port_name_, proxy_target_node_name_, proxy_target_port_name_);
+}
+
+ObserveProxyAckEvent::ObserveProxyAckEvent(const PortName& port_name,
+ const PortName& from_port,
+ uint64_t control_sequence_num,
+ uint64_t last_sequence_num)
+ : Event(Type::kObserveProxyAck, port_name, from_port, control_sequence_num),
+ last_sequence_num_(last_sequence_num) {}
+
+ObserveProxyAckEvent::~ObserveProxyAckEvent() = default;
+
+// static
+ScopedEvent ObserveProxyAckEvent::Deserialize(const PortName& port_name,
+ const PortName& from_port,
+ uint64_t control_sequence_num,
+ const void* buffer,
+ size_t num_bytes) {
+ if (num_bytes < sizeof(ObserveProxyAckEventData)) {
+ return nullptr;
+ }
+
+ const auto* data = static_cast<const ObserveProxyAckEventData*>(buffer);
+ return mozilla::MakeUnique<ObserveProxyAckEvent>(
+ port_name, from_port, control_sequence_num, data->last_sequence_num);
+}
+
+size_t ObserveProxyAckEvent::GetSerializedDataSize() const {
+ return sizeof(ObserveProxyAckEventData);
+}
+
+void ObserveProxyAckEvent::SerializeData(void* buffer) const {
+ auto* data = static_cast<ObserveProxyAckEventData*>(buffer);
+ data->last_sequence_num = last_sequence_num_;
+}
+
+ObserveClosureEvent::ObserveClosureEvent(const PortName& port_name,
+ const PortName& from_port,
+ uint64_t control_sequence_num,
+ uint64_t last_sequence_num)
+ : Event(Type::kObserveClosure, port_name, from_port, control_sequence_num),
+ last_sequence_num_(last_sequence_num) {}
+
+ObserveClosureEvent::~ObserveClosureEvent() = default;
+
+// static
+ScopedEvent ObserveClosureEvent::Deserialize(const PortName& port_name,
+ const PortName& from_port,
+ uint64_t control_sequence_num,
+ const void* buffer,
+ size_t num_bytes) {
+ if (num_bytes < sizeof(ObserveClosureEventData)) {
+ return nullptr;
+ }
+
+ const auto* data = static_cast<const ObserveClosureEventData*>(buffer);
+ return mozilla::MakeUnique<ObserveClosureEvent>(
+ port_name, from_port, control_sequence_num, data->last_sequence_num);
+}
+
+size_t ObserveClosureEvent::GetSerializedDataSize() const {
+ return sizeof(ObserveClosureEventData);
+}
+
+void ObserveClosureEvent::SerializeData(void* buffer) const {
+ auto* data = static_cast<ObserveClosureEventData*>(buffer);
+ data->last_sequence_num = last_sequence_num_;
+}
+
+MergePortEvent::MergePortEvent(const PortName& port_name,
+ const PortName& from_port,
+ uint64_t control_sequence_num,
+ const PortName& new_port_name,
+ const PortDescriptor& new_port_descriptor)
+ : Event(Type::kMergePort, port_name, from_port, control_sequence_num),
+ new_port_name_(new_port_name),
+ new_port_descriptor_(new_port_descriptor) {}
+
+MergePortEvent::~MergePortEvent() = default;
+
+// static
+ScopedEvent MergePortEvent::Deserialize(const PortName& port_name,
+ const PortName& from_port,
+ uint64_t control_sequence_num,
+ const void* buffer, size_t num_bytes) {
+ if (num_bytes < sizeof(MergePortEventData)) {
+ return nullptr;
+ }
+
+ const auto* data = static_cast<const MergePortEventData*>(buffer);
+ return mozilla::MakeUnique<MergePortEvent>(
+ port_name, from_port, control_sequence_num, data->new_port_name,
+ data->new_port_descriptor);
+}
+
+size_t MergePortEvent::GetSerializedDataSize() const {
+ return sizeof(MergePortEventData);
+}
+
+void MergePortEvent::SerializeData(void* buffer) const {
+ auto* data = static_cast<MergePortEventData*>(buffer);
+ data->new_port_name = new_port_name_;
+ data->new_port_descriptor = new_port_descriptor_;
+}
+
+UserMessageReadAckRequestEvent::UserMessageReadAckRequestEvent(
+ const PortName& port_name, const PortName& from_port,
+ uint64_t control_sequence_num, uint64_t sequence_num_to_acknowledge)
+ : Event(Type::kUserMessageReadAckRequest, port_name, from_port,
+ control_sequence_num),
+ sequence_num_to_acknowledge_(sequence_num_to_acknowledge) {}
+
+UserMessageReadAckRequestEvent::~UserMessageReadAckRequestEvent() = default;
+
+// static
+ScopedEvent UserMessageReadAckRequestEvent::Deserialize(
+ const PortName& port_name, const PortName& from_port,
+ uint64_t control_sequence_num, const void* buffer, size_t num_bytes) {
+ if (num_bytes < sizeof(UserMessageReadAckRequestEventData)) {
+ return nullptr;
+ }
+
+ const auto* data =
+ static_cast<const UserMessageReadAckRequestEventData*>(buffer);
+ return mozilla::MakeUnique<UserMessageReadAckRequestEvent>(
+ port_name, from_port, control_sequence_num,
+ data->sequence_num_to_acknowledge);
+}
+
+size_t UserMessageReadAckRequestEvent::GetSerializedDataSize() const {
+ return sizeof(UserMessageReadAckRequestEventData);
+}
+
+void UserMessageReadAckRequestEvent::SerializeData(void* buffer) const {
+ auto* data = static_cast<UserMessageReadAckRequestEventData*>(buffer);
+ data->sequence_num_to_acknowledge = sequence_num_to_acknowledge_;
+}
+
+UserMessageReadAckEvent::UserMessageReadAckEvent(
+ const PortName& port_name, const PortName& from_port,
+ uint64_t control_sequence_num, uint64_t sequence_num_acknowledged)
+ : Event(Type::kUserMessageReadAck, port_name, from_port,
+ control_sequence_num),
+ sequence_num_acknowledged_(sequence_num_acknowledged) {}
+
+UserMessageReadAckEvent::~UserMessageReadAckEvent() = default;
+
+// static
+ScopedEvent UserMessageReadAckEvent::Deserialize(const PortName& port_name,
+ const PortName& from_port,
+ uint64_t control_sequence_num,
+ const void* buffer,
+ size_t num_bytes) {
+ if (num_bytes < sizeof(UserMessageReadAckEventData)) {
+ return nullptr;
+ }
+
+ const auto* data = static_cast<const UserMessageReadAckEventData*>(buffer);
+ return mozilla::MakeUnique<UserMessageReadAckEvent>(
+ port_name, from_port, control_sequence_num,
+ data->sequence_num_acknowledged);
+}
+
+size_t UserMessageReadAckEvent::GetSerializedDataSize() const {
+ return sizeof(UserMessageReadAckEventData);
+}
+
+void UserMessageReadAckEvent::SerializeData(void* buffer) const {
+ auto* data = static_cast<UserMessageReadAckEventData*>(buffer);
+ data->sequence_num_acknowledged = sequence_num_acknowledged_;
+}
+
+UpdatePreviousPeerEvent::UpdatePreviousPeerEvent(const PortName& port_name,
+ const PortName& from_port,
+ uint64_t control_sequence_num,
+ const NodeName& new_node_name,
+ const PortName& new_port_name)
+ : Event(Type::kUpdatePreviousPeer, port_name, from_port,
+ control_sequence_num),
+ new_node_name_(new_node_name),
+ new_port_name_(new_port_name) {}
+
+UpdatePreviousPeerEvent::~UpdatePreviousPeerEvent() = default;
+
+// static
+ScopedEvent UpdatePreviousPeerEvent::Deserialize(const PortName& port_name,
+ const PortName& from_port,
+ uint64_t control_sequence_num,
+ const void* buffer,
+ size_t num_bytes) {
+ if (num_bytes < sizeof(UpdatePreviousPeerEventData)) return nullptr;
+ const auto* data = static_cast<const UpdatePreviousPeerEventData*>(buffer);
+ return mozilla::MakeUnique<UpdatePreviousPeerEvent>(
+ port_name, from_port, control_sequence_num, data->new_node_name,
+ data->new_port_name);
+}
+
+size_t UpdatePreviousPeerEvent::GetSerializedDataSize() const {
+ return sizeof(UpdatePreviousPeerEventData);
+}
+
+void UpdatePreviousPeerEvent::SerializeData(void* buffer) const {
+ auto* data = static_cast<UpdatePreviousPeerEventData*>(buffer);
+ data->new_node_name = new_node_name_;
+ data->new_port_name = new_port_name_;
+}
+
+} // namespace ports
+} // namespace core
+} // namespace mojo
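For orientation, the wire format implemented above is a fixed SerializedHeader followed by event-specific data, and Event::Deserialize() dispatches on the header's Type field. The following is a minimal round-trip sketch using only the APIs defined in this file; it is an illustration, not part of the patch, and the buffer handling is an assumption:

    #include <vector>
    #include "mojo/core/ports/event.h"
    #include "mozilla/Assertions.h"

    using namespace mojo::core::ports;

    void RoundTripExample() {
      // Build a control event: destination port, sending port, control
      // sequence number, and the last user-message sequence number.
      ObserveClosureEvent event(PortName(1, 2), PortName(3, 4),
                                /* control_sequence_num */ 7,
                                /* last_sequence_num */ 42);

      // GetSerializedSize() = sizeof(SerializedHeader) + event-specific data.
      std::vector<char> buffer(event.GetSerializedSize());
      event.Serialize(buffer.data());

      // Deserialize() returns null for truncated buffers or unknown types.
      ScopedEvent parsed = Event::Deserialize(buffer.data(), buffer.size());
      MOZ_RELEASE_ASSERT(parsed && parsed->type() == Event::Type::kObserveClosure);
    }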
diff --git a/ipc/chromium/src/mojo/core/ports/event.h b/ipc/chromium/src/mojo/core/ports/event.h
new file mode 100644
index 0000000000..e0b6027e0b
--- /dev/null
+++ b/ipc/chromium/src/mojo/core/ports/event.h
@@ -0,0 +1,393 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_CORE_PORTS_EVENT_H_
+#define MOJO_CORE_PORTS_EVENT_H_
+
+#include <stdint.h>
+
+#include <memory>
+#include <vector>
+
+#include "mojo/core/ports/name.h"
+#include "mojo/core/ports/user_message.h"
+
+namespace mojo {
+namespace core {
+namespace ports {
+
+class Event;
+
+using ScopedEvent = mozilla::UniquePtr<Event>;
+
+// An Event is the fundamental unit of operation and communication within and
+// between Nodes.
+class Event {
+ public:
+ enum Type : uint32_t {
+ // A user message event contains arbitrary user-specified payload data
+ // which may include any number of ports and/or system handles (e.g. FDs).
+ kUserMessage,
+
+ // When a Node receives a user message with one or more ports attached, it
+ // sends back an instance of this event for every attached port to indicate
+ // that the port has been accepted by its destination node.
+ kPortAccepted,
+
+ // This event begins circulation any time a port enters a proxying state. It
+ // may be re-circulated in certain edge cases, but the ultimate purpose of
+ // the event is to ensure that every port along a route is (if necessary)
+ // aware that the proxying port is indeed proxying (and to where) so that it
+ // can begin to be bypassed along the route.
+ kObserveProxy,
+
+ // An event used to acknowledge to a proxy that all concerned nodes and
+ // ports are aware of its proxying state and that no more user messages will
+ // be routed to it beyond a given final sequence number.
+ kObserveProxyAck,
+
+ // Indicates that a port has been closed. This event fully circulates a
+ // route to ensure that all ports are aware of closure.
+ kObserveClosure,
+
+ // Used to request the merging of two routes via two sacrificial receiving
+ // ports, one from each route.
+ kMergePort,
+
+ // Used to request that the conjugate port acknowledge read messages by
+ // sending back a UserMessageReadAck.
+ kUserMessageReadAckRequest,
+
+ // Used to acknowledge read messages to the conjugate.
+ kUserMessageReadAck,
+
+ // Used to update the previous node and port name of a port.
+ kUpdatePreviousPeer,
+ };
+
+#pragma pack(push, 1)
+ struct PortDescriptor {
+ PortDescriptor();
+
+ NodeName peer_node_name;
+ PortName peer_port_name;
+ NodeName referring_node_name;
+ PortName referring_port_name;
+ uint64_t next_sequence_num_to_send;
+ uint64_t next_sequence_num_to_receive;
+ uint64_t last_sequence_num_to_receive;
+ bool peer_closed;
+ char padding[7];
+ };
+#pragma pack(pop)
+ virtual ~Event();
+
+ static ScopedEvent Deserialize(const void* buffer, size_t num_bytes);
+
+ template <typename T>
+ static mozilla::UniquePtr<T> Cast(ScopedEvent* event) {
+ return mozilla::WrapUnique(static_cast<T*>(event->release()));
+ }
+
+ Type type() const { return type_; }
+ const PortName& port_name() const { return port_name_; }
+ void set_port_name(const PortName& port_name) { port_name_ = port_name; }
+
+ size_t GetSerializedSize() const;
+ void Serialize(void* buffer) const;
+ virtual ScopedEvent CloneForBroadcast() const;
+
+ const PortName& from_port() const { return from_port_; }
+ void set_from_port(const PortName& from_port) { from_port_ = from_port; }
+
+ uint64_t control_sequence_num() const { return control_sequence_num_; }
+ void set_control_sequence_num(uint64_t control_sequence_num) {
+ control_sequence_num_ = control_sequence_num;
+ }
+
+ protected:
+ Event(Type type, const PortName& port_name, const PortName& from_port,
+ uint64_t control_sequence_num);
+
+ virtual size_t GetSerializedDataSize() const = 0;
+ virtual void SerializeData(void* buffer) const = 0;
+
+ private:
+ const Type type_;
+ PortName port_name_;
+ PortName from_port_;
+ uint64_t control_sequence_num_;
+
+ DISALLOW_COPY_AND_ASSIGN(Event);
+};
+
+class UserMessageEvent : public Event {
+ public:
+ explicit UserMessageEvent(size_t num_ports);
+ ~UserMessageEvent() override;
+
+ bool HasMessage() const { return !!message_; }
+ void AttachMessage(mozilla::UniquePtr<UserMessage> message);
+
+ template <typename T>
+ T* GetMessage() {
+ DCHECK(HasMessage());
+ DCHECK_EQ(&T::kUserMessageTypeInfo, message_->type_info());
+ return static_cast<T*>(message_.get());
+ }
+
+ template <typename T>
+ const T* GetMessage() const {
+ DCHECK(HasMessage());
+ DCHECK_EQ(&T::kUserMessageTypeInfo, message_->type_info());
+ return static_cast<const T*>(message_.get());
+ }
+
+ template <typename T>
+ mozilla::UniquePtr<T> TakeMessage() {
+ DCHECK(HasMessage());
+ DCHECK_EQ(&T::kUserMessageTypeInfo, message_->type_info());
+ return mozilla::UniquePtr<T>{static_cast<T*>(message_.release())};
+ }
+
+ void ReservePorts(size_t num_ports);
+ bool NotifyWillBeRoutedExternally();
+
+ uint64_t sequence_num() const { return sequence_num_; }
+ void set_sequence_num(uint64_t sequence_num) { sequence_num_ = sequence_num; }
+
+ size_t num_ports() const { return ports_.size(); }
+ PortDescriptor* port_descriptors() { return port_descriptors_.data(); }
+ PortName* ports() { return ports_.data(); }
+
+ static ScopedEvent Deserialize(const PortName& port_name,
+ const PortName& from_port,
+ uint64_t control_sequence_num,
+ const void* buffer, size_t num_bytes);
+
+ size_t GetSizeIfSerialized() const;
+
+ private:
+ UserMessageEvent(const PortName& port_name, const PortName& from_port,
+ uint64_t control_sequence_num, uint64_t sequence_num);
+
+ size_t GetSerializedDataSize() const override;
+ void SerializeData(void* buffer) const override;
+
+ uint64_t sequence_num_ = 0;
+ std::vector<PortDescriptor> port_descriptors_;
+ std::vector<PortName> ports_;
+ mozilla::UniquePtr<UserMessage> message_;
+
+ DISALLOW_COPY_AND_ASSIGN(UserMessageEvent);
+};
+
+class PortAcceptedEvent : public Event {
+ public:
+ PortAcceptedEvent(const PortName& port_name, const PortName& from_port,
+ uint64_t control_sequence_num);
+ ~PortAcceptedEvent() override;
+
+ static ScopedEvent Deserialize(const PortName& port_name,
+ const PortName& from_port,
+ uint64_t control_sequence_num,
+ const void* buffer, size_t num_bytes);
+
+ private:
+ size_t GetSerializedDataSize() const override;
+ void SerializeData(void* buffer) const override;
+
+ DISALLOW_COPY_AND_ASSIGN(PortAcceptedEvent);
+};
+
+class ObserveProxyEvent : public Event {
+ public:
+ ObserveProxyEvent(const PortName& port_name, const PortName& from_port,
+ uint64_t control_sequence_num,
+ const NodeName& proxy_node_name,
+ const PortName& proxy_port_name,
+ const NodeName& proxy_target_node_name,
+ const PortName& proxy_target_port_name);
+ ~ObserveProxyEvent() override;
+
+ const NodeName& proxy_node_name() const { return proxy_node_name_; }
+ const PortName& proxy_port_name() const { return proxy_port_name_; }
+ const NodeName& proxy_target_node_name() const {
+ return proxy_target_node_name_;
+ }
+ const PortName& proxy_target_port_name() const {
+ return proxy_target_port_name_;
+ }
+
+ static ScopedEvent Deserialize(const PortName& port_name,
+ const PortName& from_port,
+ uint64_t control_sequence_num,
+ const void* buffer, size_t num_bytes);
+
+ private:
+ size_t GetSerializedDataSize() const override;
+ void SerializeData(void* buffer) const override;
+ ScopedEvent CloneForBroadcast() const override;
+
+ const NodeName proxy_node_name_;
+ const PortName proxy_port_name_;
+ const NodeName proxy_target_node_name_;
+ const PortName proxy_target_port_name_;
+
+ DISALLOW_COPY_AND_ASSIGN(ObserveProxyEvent);
+};
+
+class ObserveProxyAckEvent : public Event {
+ public:
+ ObserveProxyAckEvent(const PortName& port_name, const PortName& from_port,
+ uint64_t control_sequence_num,
+ uint64_t last_sequence_num);
+ ~ObserveProxyAckEvent() override;
+
+ uint64_t last_sequence_num() const { return last_sequence_num_; }
+
+ static ScopedEvent Deserialize(const PortName& port_name,
+ const PortName& from_port,
+ uint64_t control_sequence_num,
+ const void* buffer, size_t num_bytes);
+
+ private:
+ size_t GetSerializedDataSize() const override;
+ void SerializeData(void* buffer) const override;
+
+ const uint64_t last_sequence_num_;
+
+ DISALLOW_COPY_AND_ASSIGN(ObserveProxyAckEvent);
+};
+
+class ObserveClosureEvent : public Event {
+ public:
+ ObserveClosureEvent(const PortName& port_name, const PortName& from_port,
+ uint64_t control_sequence_num,
+ uint64_t last_sequence_num);
+ ~ObserveClosureEvent() override;
+
+ uint64_t last_sequence_num() const { return last_sequence_num_; }
+ void set_last_sequence_num(uint64_t last_sequence_num) {
+ last_sequence_num_ = last_sequence_num;
+ }
+
+ static ScopedEvent Deserialize(const PortName& port_name,
+ const PortName& from_port,
+ uint64_t control_sequence_num,
+ const void* buffer, size_t num_bytes);
+
+ private:
+ size_t GetSerializedDataSize() const override;
+ void SerializeData(void* buffer) const override;
+
+ uint64_t last_sequence_num_;
+
+ DISALLOW_COPY_AND_ASSIGN(ObserveClosureEvent);
+};
+
+class MergePortEvent : public Event {
+ public:
+ MergePortEvent(const PortName& port_name, const PortName& from_port,
+ uint64_t control_sequence_num, const PortName& new_port_name,
+ const PortDescriptor& new_port_descriptor);
+ ~MergePortEvent() override;
+
+ const PortName& new_port_name() const { return new_port_name_; }
+ const PortDescriptor& new_port_descriptor() const {
+ return new_port_descriptor_;
+ }
+
+ static ScopedEvent Deserialize(const PortName& port_name,
+ const PortName& from_port,
+ uint64_t control_sequence_num,
+ const void* buffer, size_t num_bytes);
+
+ private:
+ size_t GetSerializedDataSize() const override;
+ void SerializeData(void* buffer) const override;
+
+ const PortName new_port_name_;
+ const PortDescriptor new_port_descriptor_;
+
+ DISALLOW_COPY_AND_ASSIGN(MergePortEvent);
+};
+
+class UserMessageReadAckRequestEvent : public Event {
+ public:
+ UserMessageReadAckRequestEvent(const PortName& port_name,
+ const PortName& from_port,
+ uint64_t control_sequence_num,
+ uint64_t sequence_num_to_acknowledge);
+ ~UserMessageReadAckRequestEvent() override;
+
+ uint64_t sequence_num_to_acknowledge() const {
+ return sequence_num_to_acknowledge_;
+ }
+
+ static ScopedEvent Deserialize(const PortName& port_name,
+ const PortName& from_port,
+ uint64_t control_sequence_num,
+ const void* buffer, size_t num_bytes);
+
+ private:
+ size_t GetSerializedDataSize() const override;
+ void SerializeData(void* buffer) const override;
+
+ uint64_t sequence_num_to_acknowledge_;
+};
+
+class UserMessageReadAckEvent : public Event {
+ public:
+ UserMessageReadAckEvent(const PortName& port_name, const PortName& from_port,
+ uint64_t control_sequence_num,
+ uint64_t sequence_num_acknowledged);
+ ~UserMessageReadAckEvent() override;
+
+ uint64_t sequence_num_acknowledged() const {
+ return sequence_num_acknowledged_;
+ }
+
+ static ScopedEvent Deserialize(const PortName& port_name,
+ const PortName& from_port,
+ uint64_t control_sequence_num,
+ const void* buffer, size_t num_bytes);
+
+ private:
+ size_t GetSerializedDataSize() const override;
+ void SerializeData(void* buffer) const override;
+
+ uint64_t sequence_num_acknowledged_;
+};
+
+class UpdatePreviousPeerEvent : public Event {
+ public:
+ UpdatePreviousPeerEvent(const PortName& port_name, const PortName& from_port,
+ uint64_t control_sequence_num,
+ const NodeName& new_node_name,
+ const PortName& new_port_name);
+ ~UpdatePreviousPeerEvent() override;
+
+ const NodeName& new_node_name() const { return new_node_name_; }
+
+ const PortName& new_port_name() const { return new_port_name_; }
+
+ static ScopedEvent Deserialize(const PortName& port_name,
+ const PortName& from_port,
+ uint64_t control_sequence_num,
+ const void* buffer, size_t num_bytes);
+
+ private:
+ size_t GetSerializedDataSize() const override;
+ void SerializeData(void* buffer) const override;
+
+ const NodeName new_node_name_;
+ const PortName new_port_name_;
+};
+
+} // namespace ports
+} // namespace core
+} // namespace mojo
+
+#endif // MOJO_CORE_PORTS_EVENT_H_
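Callers of Event::Deserialize() typically switch on type() and then downcast with Event::Cast<T>(), which takes ownership out of the ScopedEvent. A short usage sketch (the handler function and namespace are hypothetical, not from the patch):

    #include "base/logging.h"
    #include "mojo/core/ports/event.h"

    namespace example {  // hypothetical namespace for illustration

    void HandleEvent(mojo::core::ports::ScopedEvent event) {
      using mojo::core::ports::Event;
      using mojo::core::ports::ObserveClosureEvent;

      switch (event->type()) {
        case Event::Type::kObserveClosure: {
          // Cast() releases the raw pointer from |event| and rewraps it.
          mozilla::UniquePtr<ObserveClosureEvent> closure =
              Event::Cast<ObserveClosureEvent>(&event);
          DVLOG(2) << "Peer closed; last sequence num "
                   << closure->last_sequence_num();
          break;
        }
        default:
          break;  // Other event types are handled analogously.
      }
    }

    }  // namespace example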
diff --git a/ipc/chromium/src/mojo/core/ports/message_filter.h b/ipc/chromium/src/mojo/core/ports/message_filter.h
new file mode 100644
index 0000000000..21f8c1c869
--- /dev/null
+++ b/ipc/chromium/src/mojo/core/ports/message_filter.h
@@ -0,0 +1,29 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_CORE_PORTS_MESSAGE_FILTER_H_
+#define MOJO_CORE_PORTS_MESSAGE_FILTER_H_
+
+namespace mojo {
+namespace core {
+namespace ports {
+
+class UserMessageEvent;
+
+// An interface which can be implemented to filter user message events
+// according to arbitrary policy.
+class MessageFilter {
+ public:
+ virtual ~MessageFilter() = default;
+
+ // Returns true if |message| should be accepted by whoever is applying this
+ // filter. See MessageQueue::GetNextMessage(), for example.
+ virtual bool Match(const UserMessageEvent& message) = 0;
+};
+
+} // namespace ports
+} // namespace core
+} // namespace mojo
+
+#endif // MOJO_CORE_PORTS_MESSAGE_FILTER_H_
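A concrete filter is just a predicate over UserMessageEvent. As an illustration (the class name and policy below are hypothetical, not from the patch):

    #include "mojo/core/ports/event.h"
    #include "mojo/core/ports/message_filter.h"

    // Sketch: accept only messages whose sequence number does not exceed a
    // cap, e.g. to drain a port up to a known point.
    class MaxSequenceNumFilter : public mojo::core::ports::MessageFilter {
     public:
      explicit MaxSequenceNumFilter(uint64_t max) : max_(max) {}

      bool Match(const mojo::core::ports::UserMessageEvent& message) override {
        return message.sequence_num() <= max_;
      }

     private:
      const uint64_t max_;
    };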
diff --git a/ipc/chromium/src/mojo/core/ports/message_queue.cc b/ipc/chromium/src/mojo/core/ports/message_queue.cc
new file mode 100644
index 0000000000..24c5c7fa91
--- /dev/null
+++ b/ipc/chromium/src/mojo/core/ports/message_queue.cc
@@ -0,0 +1,97 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/core/ports/message_queue.h"
+
+#include <algorithm>
+
+#include "base/compiler_specific.h"
+#include "base/logging.h"
+#include "mojo/core/ports/message_filter.h"
+#include "mozilla/Likely.h"
+
+namespace mojo {
+namespace core {
+namespace ports {
+
+// Used by std::{push,pop}_heap functions
+inline bool operator<(const mozilla::UniquePtr<UserMessageEvent>& a,
+ const mozilla::UniquePtr<UserMessageEvent>& b) {
+ return a->sequence_num() > b->sequence_num();
+}
+
+MessageQueue::MessageQueue() : MessageQueue(kInitialSequenceNum) {}
+
+MessageQueue::MessageQueue(uint64_t next_sequence_num)
+ : next_sequence_num_(next_sequence_num) {
+ // The message queue is blocked waiting for a message with sequence number
+ // equal to |next_sequence_num|.
+}
+
+MessageQueue::~MessageQueue() {
+#ifdef DEBUG
+ size_t num_leaked_ports = 0;
+ for (const auto& message : heap_) {
+ num_leaked_ports += message->num_ports();
+ }
+ if (num_leaked_ports > 0) {
+ DVLOG(1) << "Leaking " << num_leaked_ports
+ << " ports in unreceived messages";
+ }
+#endif
+}
+
+bool MessageQueue::HasNextMessage() const {
+ return !heap_.empty() && heap_[0]->sequence_num() == next_sequence_num_;
+}
+
+void MessageQueue::GetNextMessage(mozilla::UniquePtr<UserMessageEvent>* message,
+ MessageFilter* filter) {
+ if (!HasNextMessage() || (filter && !filter->Match(*heap_[0]))) {
+ message->reset();
+ return;
+ }
+
+ std::pop_heap(heap_.begin(), heap_.end());
+ *message = std::move(heap_.back());
+ total_queued_bytes_ -= (*message)->GetSizeIfSerialized();
+ heap_.pop_back();
+
+ // We keep the capacity of |heap_| in check so that a large batch of incoming
+ // messages doesn't permanently wreck available memory. The choice of interval
+ // here is somewhat arbitrary.
+ constexpr size_t kHeapMinimumShrinkSize = 16;
+ constexpr size_t kHeapShrinkInterval = 512;
+ if (MOZ_UNLIKELY(heap_.size() > kHeapMinimumShrinkSize &&
+ heap_.size() % kHeapShrinkInterval == 0)) {
+ heap_.shrink_to_fit();
+ }
+}
+
+void MessageQueue::AcceptMessage(mozilla::UniquePtr<UserMessageEvent> message,
+ bool* has_next_message) {
+ // TODO: Handle sequence number roll-over.
+
+ total_queued_bytes_ += message->GetSizeIfSerialized();
+ heap_.emplace_back(std::move(message));
+ std::push_heap(heap_.begin(), heap_.end());
+
+ if (!signalable_) {
+ *has_next_message = false;
+ } else {
+ *has_next_message = (heap_[0]->sequence_num() == next_sequence_num_);
+ }
+}
+
+void MessageQueue::TakeAllMessages(
+ std::vector<mozilla::UniquePtr<UserMessageEvent>>* messages) {
+ *messages = std::move(heap_);
+ total_queued_bytes_ = 0;
+}
+
+void MessageQueue::MessageProcessed() { next_sequence_num_++; }
+
+} // namespace ports
+} // namespace core
+} // namespace mojo
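The inverted comparator above makes |heap_| a min-heap on sequence number, so messages accepted out of order are released strictly in order by GetNextMessage(). A sketch of that behaviour (illustrative values only, not part of the patch):

    #include <utility>
    #include "mojo/core/ports/message_queue.h"
    #include "mozilla/UniquePtr.h"

    using namespace mojo::core::ports;

    void OrderingExample() {
      MessageQueue queue;  // expects kInitialSequenceNum (1) first

      auto msg2 = mozilla::MakeUnique<UserMessageEvent>(/* num_ports */ 0);
      msg2->set_sequence_num(2);
      auto msg1 = mozilla::MakeUnique<UserMessageEvent>(0);
      msg1->set_sequence_num(1);

      bool has_next = false;
      queue.AcceptMessage(std::move(msg2), &has_next);  // false: still waiting on #1
      queue.AcceptMessage(std::move(msg1), &has_next);  // true: #1 is now available

      mozilla::UniquePtr<UserMessageEvent> out;
      queue.GetNextMessage(&out, /* filter */ nullptr);  // returns #1
      queue.MessageProcessed();                          // advance to expect #2
      queue.GetNextMessage(&out, nullptr);               // returns #2
    }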
diff --git a/ipc/chromium/src/mojo/core/ports/message_queue.h b/ipc/chromium/src/mojo/core/ports/message_queue.h
new file mode 100644
index 0000000000..c8c56da8b3
--- /dev/null
+++ b/ipc/chromium/src/mojo/core/ports/message_queue.h
@@ -0,0 +1,88 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_CORE_PORTS_MESSAGE_QUEUE_H_
+#define MOJO_CORE_PORTS_MESSAGE_QUEUE_H_
+
+#include <stdint.h>
+
+#include <limits>
+#include <memory>
+#include <vector>
+
+#include "mojo/core/ports/event.h"
+
+namespace mojo {
+namespace core {
+namespace ports {
+
+constexpr uint64_t kInitialSequenceNum = 1;
+constexpr uint64_t kInvalidSequenceNum = std::numeric_limits<uint64_t>::max();
+
+class MessageFilter;
+
+// An incoming message queue for a port. MessageQueue keeps track of the highest
+// known sequence number and can indicate whether the next sequential message is
+// available. Thus the queue enforces message ordering for the consumer without
+// enforcing it for the producer (see AcceptMessage() below.)
+class MessageQueue {
+ public:
+ explicit MessageQueue();
+ explicit MessageQueue(uint64_t next_sequence_num);
+ ~MessageQueue();
+
+ MessageQueue(const MessageQueue&) = delete;
+ void operator=(const MessageQueue&) = delete;
+
+ void set_signalable(bool value) { signalable_ = value; }
+
+ uint64_t next_sequence_num() const { return next_sequence_num_; }
+
+ bool HasNextMessage() const;
+
+ // Gives ownership of the message. If |filter| is non-null, the next message
+ // will only be retrieved if the filter successfully matches it.
+ void GetNextMessage(mozilla::UniquePtr<UserMessageEvent>* message,
+ MessageFilter* filter);
+
+ // Mark the message from |GetNextMessage| as processed.
+ void MessageProcessed();
+
+ // Takes ownership of the message. Note: Messages are ordered, so while we
+ // have added a message to the queue, we may still be waiting on a message
+ // ahead of this one before we can let any of the messages be returned by
+ // GetNextMessage.
+ //
+ // Furthermore, once |has_next_message| has been reported as true, it will be
+ // reported as false on subsequent calls until GetNextMessage has been called
+ // enough times to return a null message. In other words, |has_next_message|
+ // acts like an edge trigger.
+ //
+ void AcceptMessage(mozilla::UniquePtr<UserMessageEvent> message,
+ bool* has_next_message);
+
+ // Takes all messages from this queue. Used to safely destroy queued messages
+ // without holding any Port lock.
+ void TakeAllMessages(
+ std::vector<mozilla::UniquePtr<UserMessageEvent>>* messages);
+
+ // The number of messages queued here, regardless of whether the next expected
+ // message has arrived yet.
+ size_t queued_message_count() const { return heap_.size(); }
+
+ // The aggregate memory size in bytes of all messages queued here, regardless
+ // of whether the next expected message has arrived yet.
+ size_t queued_num_bytes() const { return total_queued_bytes_; }
+
+ private:
+ std::vector<mozilla::UniquePtr<UserMessageEvent>> heap_;
+ uint64_t next_sequence_num_;
+ bool signalable_ = true;
+ size_t total_queued_bytes_ = 0;
+};
+
+} // namespace ports
+} // namespace core
+} // namespace mojo
+
+#endif // MOJO_CORE_PORTS_MESSAGE_QUEUE_H_
diff --git a/ipc/chromium/src/mojo/core/ports/name.cc b/ipc/chromium/src/mojo/core/ports/name.cc
new file mode 100644
index 0000000000..17a787d933
--- /dev/null
+++ b/ipc/chromium/src/mojo/core/ports/name.cc
@@ -0,0 +1,54 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/core/ports/name.h"
+#include "chrome/common/ipc_message_utils.h"
+
+namespace mojo {
+namespace core {
+namespace ports {
+
+std::ostream& operator<<(std::ostream& stream, const Name& name) {
+ std::ios::fmtflags flags(stream.flags());
+ stream << std::hex << std::uppercase << name.v1;
+ if (name.v2 != 0) {
+ stream << '.' << name.v2;
+ }
+ stream.flags(flags);
+ return stream;
+}
+
+mozilla::Logger& operator<<(mozilla::Logger& log, const Name& name) {
+ log.printf("%" PRIX64, name.v1);
+ if (name.v2 != 0) {
+ log.printf(".%" PRIX64, name.v2);
+ }
+ return log;
+}
+
+} // namespace ports
+} // namespace core
+} // namespace mojo
+
+void IPC::ParamTraits<mojo::core::ports::PortName>::Write(
+ MessageWriter* aWriter, const paramType& aParam) {
+ WriteParam(aWriter, aParam.v1);
+ WriteParam(aWriter, aParam.v2);
+}
+
+bool IPC::ParamTraits<mojo::core::ports::PortName>::Read(MessageReader* aReader,
+ paramType* aResult) {
+ return ReadParam(aReader, &aResult->v1) && ReadParam(aReader, &aResult->v2);
+}
+
+void IPC::ParamTraits<mojo::core::ports::NodeName>::Write(
+ MessageWriter* aWriter, const paramType& aParam) {
+ WriteParam(aWriter, aParam.v1);
+ WriteParam(aWriter, aParam.v2);
+}
+
+bool IPC::ParamTraits<mojo::core::ports::NodeName>::Read(MessageReader* aReader,
+ paramType* aResult) {
+ return ReadParam(aReader, &aResult->v1) && ReadParam(aReader, &aResult->v2);
+}
diff --git a/ipc/chromium/src/mojo/core/ports/name.h b/ipc/chromium/src/mojo/core/ports/name.h
new file mode 100644
index 0000000000..0e668ebc40
--- /dev/null
+++ b/ipc/chromium/src/mojo/core/ports/name.h
@@ -0,0 +1,122 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_CORE_PORTS_NAME_H_
+#define MOJO_CORE_PORTS_NAME_H_
+
+#include <stdint.h>
+
+#include <ostream>
+#include <tuple>
+
+#include "base/logging.h"
+#include "mozilla/HashFunctions.h"
+#include "nsHashKeys.h"
+
+namespace mojo {
+namespace core {
+namespace ports {
+
+struct Name {
+ constexpr Name(uint64_t v1, uint64_t v2) : v1(v1), v2(v2) {}
+ uint64_t v1, v2;
+};
+
+inline bool operator==(const Name& a, const Name& b) {
+ return a.v1 == b.v1 && a.v2 == b.v2;
+}
+
+inline bool operator!=(const Name& a, const Name& b) { return !(a == b); }
+
+inline bool operator<(const Name& a, const Name& b) {
+ return std::tie(a.v1, a.v2) < std::tie(b.v1, b.v2);
+}
+
+std::ostream& operator<<(std::ostream& stream, const Name& name);
+mozilla::Logger& operator<<(mozilla::Logger& log, const Name& name);
+
+struct PortName : Name {
+ constexpr PortName() : Name(0, 0) {}
+ constexpr PortName(uint64_t v1, uint64_t v2) : Name(v1, v2) {}
+};
+
+constexpr PortName kInvalidPortName{0, 0};
+
+struct NodeName : Name {
+ constexpr NodeName() : Name(0, 0) {}
+ constexpr NodeName(uint64_t v1, uint64_t v2) : Name(v1, v2) {}
+};
+
+constexpr NodeName kInvalidNodeName{0, 0};
+
+} // namespace ports
+} // namespace core
+} // namespace mojo
+
+namespace mozilla {
+
+template <>
+inline PLDHashNumber Hash<mojo::core::ports::PortName>(
+ const mojo::core::ports::PortName& aValue) {
+ return mozilla::HashGeneric(aValue.v1, aValue.v2);
+}
+
+template <>
+inline PLDHashNumber Hash<mojo::core::ports::NodeName>(
+ const mojo::core::ports::NodeName& aValue) {
+ return mozilla::HashGeneric(aValue.v1, aValue.v2);
+}
+
+using PortNameHashKey = nsGenericHashKey<mojo::core::ports::PortName>;
+using NodeNameHashKey = nsGenericHashKey<mojo::core::ports::NodeName>;
+
+} // namespace mozilla
+
+namespace std {
+
+template <>
+struct hash<mojo::core::ports::PortName> {
+ std::size_t operator()(const mojo::core::ports::PortName& name) const {
+ // FIXME: PLDHashNumber is only 32-bits
+ return mozilla::Hash(name);
+ }
+};
+
+template <>
+struct hash<mojo::core::ports::NodeName> {
+ std::size_t operator()(const mojo::core::ports::NodeName& name) const {
+ // FIXME: PLDHashNumber is only 32-bits
+ return mozilla::Hash(name);
+ }
+};
+
+} // namespace std
+
+class PickleIterator;
+
+namespace IPC {
+
+template <typename T>
+struct ParamTraits;
+class Message;
+class MessageReader;
+class MessageWriter;
+
+template <>
+struct ParamTraits<mojo::core::ports::PortName> {
+ using paramType = mojo::core::ports::PortName;
+ static void Write(MessageWriter* aWriter, const paramType& aParam);
+ static bool Read(MessageReader* aReader, paramType* aResult);
+};
+
+template <>
+struct ParamTraits<mojo::core::ports::NodeName> {
+ using paramType = mojo::core::ports::NodeName;
+ static void Write(MessageWriter* aWriter, const paramType& aParam);
+ static bool Read(MessageReader* aReader, paramType* aResult);
+};
+
+} // namespace IPC
+
+#endif // MOJO_CORE_PORTS_NAME_H_
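The Hash and std::hash specializations above let names key both Mozilla and STL hash containers. For instance (the container choice and map below are illustrative, not mandated by the patch):

    #include <unordered_map>
    #include "mojo/core/ports/name.h"

    using mojo::core::ports::PortName;

    void NameMapExample() {
      // std::hash<PortName> and operator== from name.h make this work directly.
      std::unordered_map<PortName, int> pending_acks;
      pending_acks[PortName(0x1234, 0x5678)] = 1;

      // On the Gecko side, nsTHashMap keyed by mozilla::PortNameHashKey would
      // serve the same purpose via the Hash<PortName> specialization.
    }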
diff --git a/ipc/chromium/src/mojo/core/ports/node.cc b/ipc/chromium/src/mojo/core/ports/node.cc
new file mode 100644
index 0000000000..63e58c9928
--- /dev/null
+++ b/ipc/chromium/src/mojo/core/ports/node.cc
@@ -0,0 +1,2130 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/core/ports/node.h"
+
+#include <string.h>
+
+#include <algorithm>
+#include <atomic>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "mozilla/Mutex.h"
+#include "mozilla/RandomNum.h"
+#include "nsTArray.h"
+
+#include "base/logging.h"
+#include "mojo/core/ports/event.h"
+#include "mojo/core/ports/node_delegate.h"
+#include "mojo/core/ports/port_locker.h"
+
+namespace mojo {
+namespace core {
+namespace ports {
+
+namespace {
+
+int DebugError(const char* message, int error_code) {
+ NOTREACHED() << "Oops: " << message;
+ return error_code;
+}
+
+#define OOPS(x) DebugError(#x, x)
+
+bool CanAcceptMoreMessages(const Port* port) {
+ // Have we already doled out the last message (i.e., do we expect to NOT
+ // receive further messages)?
+ uint64_t next_sequence_num = port->message_queue.next_sequence_num();
+ if (port->state == Port::kClosed) {
+ return false;
+ }
+ if (port->peer_closed || port->remove_proxy_on_last_message) {
+ if (port->peer_lost_unexpectedly) {
+ return port->message_queue.HasNextMessage();
+ }
+ if (port->last_sequence_num_to_receive == next_sequence_num - 1) {
+ return false;
+ }
+ }
+ return true;
+}
+
+void GenerateRandomPortName(PortName* name) {
+ // FIXME: Chrome uses a cache to avoid extra calls to the system RNG when
+ // generating port names to keep this overhead down. If this method starts
+ // showing up on profiles we should consider doing the same.
+ *name = PortName{mozilla::RandomUint64OrDie(), mozilla::RandomUint64OrDie()};
+}
+
+} // namespace
+
+Node::Node(const NodeName& name, NodeDelegate* delegate)
+ : name_(name), delegate_(this, delegate) {}
+
+Node::~Node() {
+ if (!ports_.empty()) {
+ DLOG(WARNING) << "Unclean shutdown for node " << name_;
+ }
+}
+
+bool Node::CanShutdownCleanly(ShutdownPolicy policy) {
+ PortLocker::AssertNoPortsLockedOnCurrentThread();
+ mozilla::MutexAutoLock ports_lock(ports_lock_);
+
+ if (policy == ShutdownPolicy::DONT_ALLOW_LOCAL_PORTS) {
+#ifdef DEBUG
+ for (auto& entry : ports_) {
+ DVLOG(2) << "Port " << entry.first << " referencing node "
+ << entry.second->peer_node_name << " is blocking shutdown of "
+ << "node " << name_ << " (state=" << entry.second->state << ")";
+ }
+#endif
+ return ports_.empty();
+ }
+
+ DCHECK_EQ(policy, ShutdownPolicy::ALLOW_LOCAL_PORTS);
+
+ // NOTE: This is not efficient, though it probably doesn't need to be since
+ // relatively few ports should be open during shutdown and shutdown doesn't
+ // need to be blazingly fast.
+ bool can_shutdown = true;
+ for (auto& entry : ports_) {
+ PortRef port_ref(entry.first, entry.second);
+ SinglePortLocker locker(&port_ref);
+ auto* port = locker.port();
+ if (port->peer_node_name != name_ && port->state != Port::kReceiving) {
+ can_shutdown = false;
+#ifdef DEBUG
+ DVLOG(2) << "Port " << entry.first << " referencing node "
+ << port->peer_node_name << " is blocking shutdown of "
+ << "node " << name_ << " (state=" << port->state << ")";
+#else
+ // Exit early when not debugging.
+ break;
+#endif
+ }
+ }
+
+ return can_shutdown;
+}
+
+int Node::GetPort(const PortName& port_name, PortRef* port_ref) {
+ PortLocker::AssertNoPortsLockedOnCurrentThread();
+ mozilla::MutexAutoLock lock(ports_lock_);
+ auto iter = ports_.find(port_name);
+ if (iter == ports_.end()) {
+ return ERROR_PORT_UNKNOWN;
+ }
+
+#if defined(ANDROID) && defined(__aarch64__)
+ // Workaround for https://crbug.com/665869.
+ std::atomic_thread_fence(std::memory_order_seq_cst);
+#endif
+
+ *port_ref = PortRef(port_name, iter->second);
+ return OK;
+}
+
+int Node::CreateUninitializedPort(PortRef* port_ref) {
+ PortName port_name;
+ GenerateRandomPortName(&port_name);
+
+ RefPtr<Port> port(new Port(kInitialSequenceNum, kInitialSequenceNum));
+ int rv = AddPortWithName(port_name, port);
+ if (rv != OK) {
+ return rv;
+ }
+
+ *port_ref = PortRef(port_name, std::move(port));
+ return OK;
+}
+
+int Node::InitializePort(const PortRef& port_ref,
+ const NodeName& peer_node_name,
+ const PortName& peer_port_name,
+ const NodeName& prev_node_name,
+ const PortName& prev_port_name) {
+ {
+ // Must be acquired for UpdatePortPeerAddress below.
+ PortLocker::AssertNoPortsLockedOnCurrentThread();
+ mozilla::MutexAutoLock ports_lock(ports_lock_);
+
+ SinglePortLocker locker(&port_ref);
+ auto* port = locker.port();
+ if (port->state != Port::kUninitialized) {
+ return ERROR_PORT_STATE_UNEXPECTED;
+ }
+
+ port->state = Port::kReceiving;
+ UpdatePortPeerAddress(port_ref.name(), port, peer_node_name,
+ peer_port_name);
+
+ port->prev_node_name = prev_node_name;
+ port->prev_port_name = prev_port_name;
+ }
+
+ delegate_->PortStatusChanged(port_ref);
+
+ return OK;
+}
+
+int Node::CreatePortPair(PortRef* port0_ref, PortRef* port1_ref) {
+ int rv;
+
+ rv = CreateUninitializedPort(port0_ref);
+ if (rv != OK) {
+ return rv;
+ }
+
+ rv = CreateUninitializedPort(port1_ref);
+ if (rv != OK) {
+ return rv;
+ }
+
+ rv = InitializePort(*port0_ref, name_, port1_ref->name(), name_,
+ port1_ref->name());
+ if (rv != OK) {
+ return rv;
+ }
+
+ rv = InitializePort(*port1_ref, name_, port0_ref->name(), name_,
+ port0_ref->name());
+ if (rv != OK) {
+ return rv;
+ }
+
+ return OK;
+}
+
+int Node::SetUserData(const PortRef& port_ref, RefPtr<UserData> user_data) {
+ SinglePortLocker locker(&port_ref);
+ auto* port = locker.port();
+ if (port->state == Port::kClosed) {
+ return ERROR_PORT_STATE_UNEXPECTED;
+ }
+
+ port->user_data = std::move(user_data);
+
+ return OK;
+}
+
+int Node::GetUserData(const PortRef& port_ref, RefPtr<UserData>* user_data) {
+ SinglePortLocker locker(&port_ref);
+ auto* port = locker.port();
+ if (port->state == Port::kClosed) {
+ return ERROR_PORT_STATE_UNEXPECTED;
+ }
+
+ *user_data = port->user_data;
+
+ return OK;
+}
+
+int Node::ClosePort(const PortRef& port_ref) {
+ std::vector<mozilla::UniquePtr<UserMessageEvent>> undelivered_messages;
+ NodeName peer_node_name;
+ PortName peer_port_name;
+ uint64_t sequence_num = 0;
+ uint64_t last_sequence_num = 0;
+ bool was_initialized = false;
+ {
+ SinglePortLocker locker(&port_ref);
+ auto* port = locker.port();
+ switch (port->state) {
+ case Port::kUninitialized:
+ break;
+
+ case Port::kReceiving:
+ was_initialized = true;
+ port->state = Port::kClosed;
+
+ // We pass along the sequence number of the last message sent from this
+ // port to allow the peer to have the opportunity to consume all inbound
+ // messages before notifying the embedder that this port is closed.
+ last_sequence_num = port->next_sequence_num_to_send - 1;
+
+ peer_node_name = port->peer_node_name;
+ peer_port_name = port->peer_port_name;
+
+ sequence_num = port->next_control_sequence_num_to_send++;
+
+ // If the port being closed still has unread messages, then we need to
+ // take care to close those ports so as to avoid leaking memory.
+ port->message_queue.TakeAllMessages(&undelivered_messages);
+ port->TakePendingMessages(undelivered_messages);
+ break;
+
+ default:
+ return ERROR_PORT_STATE_UNEXPECTED;
+ }
+ }
+
+ ErasePort(port_ref.name());
+
+ if (was_initialized) {
+ DVLOG(2) << "Sending ObserveClosure from " << port_ref.name() << "@"
+ << name_ << " to " << peer_port_name << "@" << peer_node_name;
+ delegate_->ForwardEvent(
+ peer_node_name,
+ mozilla::MakeUnique<ObserveClosureEvent>(
+ peer_port_name, port_ref.name(), sequence_num, last_sequence_num));
+ for (const auto& message : undelivered_messages) {
+ for (size_t i = 0; i < message->num_ports(); ++i) {
+ PortRef ref;
+ if (GetPort(message->ports()[i], &ref) == OK) {
+ ClosePort(ref);
+ }
+ }
+ }
+ }
+ return OK;
+}
+
+int Node::GetStatus(const PortRef& port_ref, PortStatus* port_status) {
+ SinglePortLocker locker(&port_ref);
+ auto* port = locker.port();
+ if (port->state != Port::kReceiving) {
+ return ERROR_PORT_STATE_UNEXPECTED;
+ }
+
+ // TODO(sroettger): include messages pending sender verification here?
+ port_status->has_messages = port->message_queue.HasNextMessage();
+ port_status->receiving_messages = CanAcceptMoreMessages(port);
+ port_status->peer_closed = port->peer_closed;
+ port_status->peer_remote = port->peer_node_name != name_;
+ port_status->queued_message_count =
+ port->message_queue.queued_message_count();
+ port_status->queued_num_bytes = port->message_queue.queued_num_bytes();
+ port_status->unacknowledged_message_count =
+ port->next_sequence_num_to_send - port->last_sequence_num_acknowledged -
+ 1;
+
+#ifdef FUZZING_SNAPSHOT
+ port_status->peer_node_name = port->peer_node_name;
+#endif
+
+ return OK;
+}
+
+int Node::GetMessage(const PortRef& port_ref,
+ mozilla::UniquePtr<UserMessageEvent>* message,
+ MessageFilter* filter) {
+ *message = nullptr;
+
+ DVLOG(4) << "GetMessage for " << port_ref.name() << "@" << name_;
+
+ NodeName peer_node_name;
+ ScopedEvent ack_event;
+ {
+ SinglePortLocker locker(&port_ref);
+ auto* port = locker.port();
+
+ // This could also be treated like the port being unknown since the
+ // embedder should no longer be referring to a port that has been sent.
+ if (port->state != Port::kReceiving) {
+ return ERROR_PORT_STATE_UNEXPECTED;
+ }
+
+ // Let the embedder get messages until there are no more before reporting
+ // that the peer closed its end.
+ if (!CanAcceptMoreMessages(port)) {
+ return ERROR_PORT_PEER_CLOSED;
+ }
+
+ port->message_queue.GetNextMessage(message, filter);
+ if (*message &&
+ (*message)->sequence_num() == port->sequence_num_to_acknowledge) {
+ peer_node_name = port->peer_node_name;
+ ack_event = mozilla::MakeUnique<UserMessageReadAckEvent>(
+ port->peer_port_name, port_ref.name(),
+ port->next_control_sequence_num_to_send++,
+ port->sequence_num_to_acknowledge);
+ }
+ if (*message) {
+ // Message will be passed to the user, no need to block the queue.
+ port->message_queue.MessageProcessed();
+ }
+ }
+
+ if (ack_event) {
+ delegate_->ForwardEvent(peer_node_name, std::move(ack_event));
+ }
+
+ // Allow referenced ports to trigger PortStatusChanged calls.
+ if (*message) {
+ for (size_t i = 0; i < (*message)->num_ports(); ++i) {
+ PortRef new_port_ref;
+ int rv = GetPort((*message)->ports()[i], &new_port_ref);
+
+ DCHECK_EQ(OK, rv) << "Port " << new_port_ref.name() << "@" << name_
+ << " does not exist!";
+
+ SinglePortLocker locker(&new_port_ref);
+ DCHECK_EQ(locker.port()->state, Port::kReceiving);
+ locker.port()->message_queue.set_signalable(true);
+ }
+
+ // The user may retransmit this message from another port. We reset the
+ // sequence number so that the message will get a new one if that happens.
+ (*message)->set_sequence_num(0);
+ }
+
+ return OK;
+}
+
+int Node::SendUserMessage(const PortRef& port_ref,
+ mozilla::UniquePtr<UserMessageEvent> message) {
+ int rv = SendUserMessageInternal(port_ref, &message);
+ if (rv != OK) {
+ // If send failed, close all carried ports. Note that we're careful not to
+ // close the sending port itself if it happened to be one of the encoded
+ // ports (an invalid but possible condition.)
+ for (size_t i = 0; i < message->num_ports(); ++i) {
+ if (message->ports()[i] == port_ref.name()) {
+ continue;
+ }
+
+ PortRef port;
+ if (GetPort(message->ports()[i], &port) == OK) {
+ ClosePort(port);
+ }
+ }
+ }
+ return rv;
+}
+
+int Node::SetAcknowledgeRequestInterval(
+ const PortRef& port_ref, uint64_t sequence_num_acknowledge_interval) {
+ NodeName peer_node_name;
+ PortName peer_port_name;
+ uint64_t sequence_num_to_request_ack = 0;
+ uint64_t sequence_num = 0;
+ {
+ SinglePortLocker locker(&port_ref);
+ auto* port = locker.port();
+ if (port->state != Port::kReceiving) {
+ return ERROR_PORT_STATE_UNEXPECTED;
+ }
+
+ port->sequence_num_acknowledge_interval = sequence_num_acknowledge_interval;
+ if (!sequence_num_acknowledge_interval) {
+ return OK;
+ }
+
+ peer_node_name = port->peer_node_name;
+ peer_port_name = port->peer_port_name;
+
+ sequence_num_to_request_ack = port->last_sequence_num_acknowledged +
+ sequence_num_acknowledge_interval;
+ sequence_num = port->next_control_sequence_num_to_send++;
+ }
+
+ delegate_->ForwardEvent(peer_node_name,
+ mozilla::MakeUnique<UserMessageReadAckRequestEvent>(
+ peer_port_name, port_ref.name(), sequence_num,
+ sequence_num_to_request_ack));
+ return OK;
+}
+
+bool Node::IsEventFromPreviousPeer(const Event& event) {
+ switch (event.type()) {
+ case Event::Type::kUserMessage:
+ return true;
+ case Event::Type::kPortAccepted:
+ // PortAccepted is sent by the next peer
+ return false;
+ case Event::Type::kObserveProxy:
+ // ObserveProxy with an invalid port name is a broadcast event
+ return event.port_name() != kInvalidPortName;
+ case Event::Type::kObserveProxyAck:
+ return true;
+ case Event::Type::kObserveClosure:
+ return true;
+ case Event::Type::kMergePort:
+ // MergePort is not from the previous peer
+ return false;
+ case Event::Type::kUserMessageReadAckRequest:
+ return true;
+ case Event::Type::kUserMessageReadAck:
+ return true;
+ case Event::Type::kUpdatePreviousPeer:
+ return true;
+ default:
+ // No need to check unknown message types since AcceptPeer will return
+ // an error.
+ return false;
+ }
+}
+
+int Node::AcceptEventInternal(const PortRef& port_ref,
+ const NodeName& from_node, ScopedEvent event) {
+ switch (event->type()) {
+ case Event::Type::kUserMessage:
+ return OnUserMessage(port_ref, from_node,
+ Event::Cast<UserMessageEvent>(&event));
+ case Event::Type::kPortAccepted:
+ return OnPortAccepted(port_ref, Event::Cast<PortAcceptedEvent>(&event));
+ case Event::Type::kObserveProxy:
+ return OnObserveProxy(port_ref, Event::Cast<ObserveProxyEvent>(&event));
+ case Event::Type::kObserveProxyAck:
+ return OnObserveProxyAck(port_ref,
+ Event::Cast<ObserveProxyAckEvent>(&event));
+ case Event::Type::kObserveClosure:
+ return OnObserveClosure(port_ref,
+ Event::Cast<ObserveClosureEvent>(&event));
+ case Event::Type::kMergePort:
+ return OnMergePort(port_ref, Event::Cast<MergePortEvent>(&event));
+ case Event::Type::kUserMessageReadAckRequest:
+ return OnUserMessageReadAckRequest(
+ port_ref, Event::Cast<UserMessageReadAckRequestEvent>(&event));
+ case Event::Type::kUserMessageReadAck:
+ return OnUserMessageReadAck(port_ref,
+ Event::Cast<UserMessageReadAckEvent>(&event));
+ case Event::Type::kUpdatePreviousPeer:
+ return OnUpdatePreviousPeer(port_ref,
+ Event::Cast<UpdatePreviousPeerEvent>(&event));
+ }
+ return OOPS(ERROR_NOT_IMPLEMENTED);
+}
+
+int Node::AcceptEvent(const NodeName& from_node, ScopedEvent event) {
+ PortRef port_ref;
+ GetPort(event->port_name(), &port_ref);
+
+ DVLOG(2) << "AcceptEvent type: " << event->type() << ", "
+ << event->from_port() << "@" << from_node << " => "
+ << port_ref.name() << "@" << name_
+ << " seq nr: " << event->control_sequence_num() << " port valid? "
+ << port_ref.is_valid();
+
+ if (!IsEventFromPreviousPeer(*event)) {
+ DCHECK_EQ(event->control_sequence_num(), kInvalidSequenceNum);
+ // Some events are not coming from the previous peer, e.g. broadcasts or
+ // PortAccepted events. No need to check the sequence number or sender.
+ return AcceptEventInternal(port_ref, from_node, std::move(event));
+ }
+
+ DCHECK_NE(event->control_sequence_num(), kInvalidSequenceNum);
+
+ if (!port_ref.is_valid()) {
+ // If we don't have a valid port, there's nothing for us to check. However,
+ // we pass the ref on to AcceptEventInternal to make sure there's no race
+ // where it becomes valid and we skipped the peer check.
+ return AcceptEventInternal(port_ref, from_node, std::move(event));
+ }
+
+ // Before processing the event, verify the sender and sequence number.
+ {
+ SinglePortLocker locker(&port_ref);
+ auto* port = locker.port();
+ if (!port->IsNextEvent(from_node, *event)) {
+ DVLOG(2) << "Buffering event (type " << event->type()
+ << "): " << event->from_port() << "@" << from_node << " => "
+ << port_ref.name() << "@" << name_
+ << " seq nr: " << event->control_sequence_num() << " / "
+ << port->next_control_sequence_num_to_receive << ", want "
+ << port->prev_port_name << "@" << port->prev_node_name;
+
+ port->BufferEvent(from_node, std::move(event));
+ return OK;
+ }
+ }
+
+ int ret = AcceptEventInternal(port_ref, from_node, std::move(event));
+
+ // More events might have been enqueued during processing.
+ while (true) {
+ ScopedEvent next_event;
+ NodeName next_from_node;
+ {
+ SinglePortLocker locker(&port_ref);
+ auto* port = locker.port();
+ // We always increment the control sequence number after we finished
+ // processing the event. That way we ensure that the events are handled
+ // in order without keeping a lock the whole time.
+ port->next_control_sequence_num_to_receive++;
+ port->NextEvent(&next_from_node, &next_event);
+
+ if (next_event) {
+ DVLOG(2) << "Handling buffered event (type " << next_event->type()
+ << "): " << next_event->from_port() << "@" << next_from_node
+ << " => " << port_ref.name() << "@" << name_
+ << " seq nr: " << next_event->control_sequence_num() << " / "
+ << port->next_control_sequence_num_to_receive;
+ }
+ }
+ if (!next_event) {
+ break;
+ }
+ AcceptEventInternal(port_ref, next_from_node, std::move(next_event));
+ }
+
+ return ret;
+}
+
+int Node::MergePorts(const PortRef& port_ref,
+ const NodeName& destination_node_name,
+ const PortName& destination_port_name) {
+ PortName new_port_name;
+ Event::PortDescriptor new_port_descriptor;
+ PendingUpdatePreviousPeer pending_update_event{.from_port = port_ref.name()};
+ {
+ // Must be held for ConvertToProxy.
+ PortLocker::AssertNoPortsLockedOnCurrentThread();
+ mozilla::MutexAutoLock ports_locker(ports_lock_);
+
+ SinglePortLocker locker(&port_ref);
+
+ DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_
+ << " to " << destination_port_name << "@" << destination_node_name;
+
+ // Send the port-to-merge over to the destination node so it can be merged
+ // into the port cycle atomically there.
+ new_port_name = port_ref.name();
+ ConvertToProxy(locker.port(), destination_node_name, &new_port_name,
+ &new_port_descriptor, &pending_update_event);
+ }
+
+ delegate_->ForwardEvent(
+ pending_update_event.receiver,
+ mozilla::MakeUnique<UpdatePreviousPeerEvent>(
+ pending_update_event.port, pending_update_event.from_port,
+ pending_update_event.sequence_num, pending_update_event.new_prev_node,
+ pending_update_event.new_prev_port));
+
+ if (new_port_descriptor.peer_node_name == name_ &&
+ destination_node_name != name_) {
+ // Ensure that the locally retained peer of the new proxy gets a status
+ // update so it notices that its peer is now remote.
+ PortRef local_peer;
+ if (GetPort(new_port_descriptor.peer_port_name, &local_peer) == OK) {
+ delegate_->PortStatusChanged(local_peer);
+ }
+ }
+
+ delegate_->ForwardEvent(
+ destination_node_name,
+ mozilla::MakeUnique<MergePortEvent>(destination_port_name,
+ kInvalidPortName, kInvalidSequenceNum,
+ new_port_name, new_port_descriptor));
+ return OK;
+}
+
+int Node::MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref) {
+ DVLOG(1) << "Merging local ports " << port0_ref.name() << "@" << name_
+ << " and " << port1_ref.name() << "@" << name_;
+ return MergePortsInternal(port0_ref, port1_ref,
+ true /* allow_close_on_bad_state */);
+}
+
+int Node::LostConnectionToNode(const NodeName& node_name) {
+ // We can no longer send events to the given node. We also can't expect any
+ // PortAccepted events.
+
+ DVLOG(1) << "Observing lost connection from node " << name_ << " to node "
+ << node_name;
+
+ DestroyAllPortsWithPeer(node_name, kInvalidPortName);
+ return OK;
+}
+
+int Node::OnUserMessage(const PortRef& port_ref, const NodeName& from_node,
+ mozilla::UniquePtr<UserMessageEvent> message) {
+#ifdef DEBUG
+ std::ostringstream ports_buf;
+ for (size_t i = 0; i < message->num_ports(); ++i) {
+ if (i > 0) {
+ ports_buf << ",";
+ }
+ ports_buf << message->ports()[i];
+ }
+
+ DVLOG(4) << "OnUserMessage " << message->sequence_num()
+ << " [ports=" << ports_buf.str() << "] at " << message->port_name()
+ << "@" << name_;
+#endif
+
+  // Even if this port does not exist, cannot receive any more messages, or is
+  // buffering or proxying messages, we still need these ports to be bound to
+ // this node. When the message is forwarded, these ports will get transferred
+ // following the usual method. If the message cannot be accepted, then the
+ // newly bound ports will simply be closed.
+ if (from_node != name_) {
+ for (size_t i = 0; i < message->num_ports(); ++i) {
+ Event::PortDescriptor& descriptor = message->port_descriptors()[i];
+ int rv = AcceptPort(message->ports()[i], descriptor);
+ if (rv != OK) {
+ return rv;
+ }
+ }
+ }
+
+ bool has_next_message = false;
+ bool message_accepted = false;
+ bool should_forward_messages = false;
+ if (port_ref.is_valid()) {
+ SinglePortLocker locker(&port_ref);
+ auto* port = locker.port();
+
+ // Reject spurious messages if we've already received the last expected
+ // message.
+ if (CanAcceptMoreMessages(port)) {
+ message_accepted = true;
+ port->message_queue.AcceptMessage(std::move(message), &has_next_message);
+
+ if (port->state == Port::kBuffering) {
+ has_next_message = false;
+ } else if (port->state == Port::kProxying) {
+ has_next_message = false;
+ should_forward_messages = true;
+ }
+ }
+ }
+
+ if (should_forward_messages) {
+ int rv = ForwardUserMessagesFromProxy(port_ref);
+ if (rv != OK) {
+ return rv;
+ }
+ TryRemoveProxy(port_ref);
+ }
+
+ if (!message_accepted) {
+ DVLOG(2) << "Message not accepted!\n";
+ // Close all newly accepted ports as they are effectively orphaned.
+ for (size_t i = 0; i < message->num_ports(); ++i) {
+ PortRef attached_port_ref;
+ if (GetPort(message->ports()[i], &attached_port_ref) == OK) {
+ ClosePort(attached_port_ref);
+ } else {
+ DLOG(WARNING) << "Cannot close non-existent port!\n";
+ }
+ }
+ } else if (has_next_message) {
+ delegate_->PortStatusChanged(port_ref);
+ }
+
+ return OK;
+}
+
+int Node::OnPortAccepted(const PortRef& port_ref,
+ mozilla::UniquePtr<PortAcceptedEvent> event) {
+ if (!port_ref.is_valid()) {
+ return ERROR_PORT_UNKNOWN;
+ }
+
+#ifdef DEBUG
+ {
+ SinglePortLocker locker(&port_ref);
+ DVLOG(2) << "PortAccepted at " << port_ref.name() << "@" << name_
+ << " pointing to " << locker.port()->peer_port_name << "@"
+ << locker.port()->peer_node_name;
+ }
+#endif
+
+ return BeginProxying(port_ref);
+}
+
+int Node::OnObserveProxy(const PortRef& port_ref,
+ mozilla::UniquePtr<ObserveProxyEvent> event) {
+ if (event->port_name() == kInvalidPortName) {
+ // An ObserveProxy with an invalid target port name is a broadcast used to
+ // inform ports when their peer (which was itself a proxy) has become
+ // defunct due to unexpected node disconnection.
+ //
+ // Receiving ports affected by this treat it as equivalent to peer closure.
+ // Proxies affected by this can be removed and will in turn broadcast their
+ // own death with a similar message.
+ DCHECK_EQ(event->proxy_target_node_name(), kInvalidNodeName);
+ DCHECK_EQ(event->proxy_target_port_name(), kInvalidPortName);
+ DestroyAllPortsWithPeer(event->proxy_node_name(), event->proxy_port_name());
+ return OK;
+ }
+
+ // The port may have already been closed locally, in which case the
+ // ObserveClosure message will contain the last_sequence_num field.
+ // We can then silently ignore this message.
+ if (!port_ref.is_valid()) {
+ DVLOG(1) << "ObserveProxy: " << event->port_name() << "@" << name_
+ << " not found";
+ return OK;
+ }
+
+ DVLOG(2) << "ObserveProxy at " << port_ref.name() << "@" << name_
+ << ", proxy at " << event->proxy_port_name() << "@"
+ << event->proxy_node_name() << " pointing to "
+ << event->proxy_target_port_name() << "@"
+ << event->proxy_target_node_name();
+
+ bool peer_changed = false;
+ ScopedEvent event_to_forward;
+ NodeName event_target_node;
+ {
+ // Must be acquired for UpdatePortPeerAddress below.
+ PortLocker::AssertNoPortsLockedOnCurrentThread();
+ mozilla::MutexAutoLock ports_locker(ports_lock_);
+
+ SinglePortLocker locker(&port_ref);
+ auto* port = locker.port();
+
+ if (port->peer_node_name == event->proxy_node_name() &&
+ port->peer_port_name == event->proxy_port_name()) {
+ if (port->state == Port::kReceiving) {
+        // Updating the port peer will reset the sequence num. Grab it now.
+ uint64_t sequence_num = port->next_control_sequence_num_to_send++;
+ UpdatePortPeerAddress(port_ref.name(), port,
+ event->proxy_target_node_name(),
+ event->proxy_target_port_name());
+ event_target_node = event->proxy_node_name();
+ event_to_forward = mozilla::MakeUnique<ObserveProxyAckEvent>(
+ event->proxy_port_name(), port_ref.name(), sequence_num,
+ port->next_sequence_num_to_send - 1);
+ peer_changed = true;
+ DVLOG(2) << "Forwarding ObserveProxyAck from " << event->port_name()
+ << "@" << name_ << " to " << event->proxy_port_name() << "@"
+ << event_target_node;
+ } else {
+ // As a proxy ourselves, we don't know how to honor the ObserveProxy
+ // event or to populate the last_sequence_num field of ObserveProxyAck.
+        // After all, another port could be sending messages to our peer now
+ // that we've sent out our own ObserveProxy event. Instead, we will
+ // send an ObserveProxyAck indicating that the ObserveProxy event
+ // should be re-sent (last_sequence_num set to kInvalidSequenceNum).
+ // However, this has to be done after we are removed as a proxy.
+ // Otherwise, we might just find ourselves back here again, which
+ // would be akin to a busy loop.
+
+ DVLOG(2) << "Delaying ObserveProxyAck to " << event->proxy_port_name()
+ << "@" << event->proxy_node_name();
+
+ port->send_on_proxy_removal =
+ mozilla::MakeUnique<std::pair<NodeName, ScopedEvent>>(
+ event->proxy_node_name(),
+ mozilla::MakeUnique<ObserveProxyAckEvent>(
+ event->proxy_port_name(), port_ref.name(),
+ kInvalidSequenceNum, kInvalidSequenceNum));
+ }
+ } else {
+ // Forward this event along to our peer. Eventually, it should find the
+ // port referring to the proxy.
+ event_target_node = port->peer_node_name;
+ event->set_port_name(port->peer_port_name);
+ event->set_from_port(port_ref.name());
+ event->set_control_sequence_num(
+ port->next_control_sequence_num_to_send++);
+ if (port->state == Port::kBuffering) {
+ port->control_message_queue.push_back(
+ {event_target_node, std::move(event)});
+ } else {
+ event_to_forward = std::move(event);
+ }
+ }
+ }
+
+ if (event_to_forward) {
+ delegate_->ForwardEvent(event_target_node, std::move(event_to_forward));
+ }
+
+ if (peer_changed) {
+ // Re-send ack and/or ack requests, as the previous peer proxy may not have
+ // forwarded the previous request before it died.
+ MaybeResendAck(port_ref);
+ MaybeResendAckRequest(port_ref);
+
+ delegate_->PortStatusChanged(port_ref);
+ }
+
+ return OK;
+}
+
+int Node::OnObserveProxyAck(const PortRef& port_ref,
+ mozilla::UniquePtr<ObserveProxyAckEvent> event) {
+ DVLOG(2) << "ObserveProxyAck at " << event->port_name() << "@" << name_
+ << " (last_sequence_num=" << event->last_sequence_num() << ")";
+
+ if (!port_ref.is_valid()) {
+ return ERROR_PORT_UNKNOWN; // The port may have observed closure first.
+ }
+
+ bool try_remove_proxy_immediately;
+ bool erase_port = false;
+ {
+ SinglePortLocker locker(&port_ref);
+ auto* port = locker.port();
+
+ if (port->state == Port::kProxying) {
+ // If the last sequence number is invalid, this is a signal that we need
+ // to retransmit the ObserveProxy event for this port rather than flagging
+      // the proxy for removal ASAP.
+ try_remove_proxy_immediately =
+ event->last_sequence_num() != kInvalidSequenceNum;
+ if (try_remove_proxy_immediately) {
+ // We can now remove this port once we have received and forwarded the
+ // last message addressed to this port.
+ port->remove_proxy_on_last_message = true;
+ port->last_sequence_num_to_receive = event->last_sequence_num();
+ }
+ } else if (port->state == Port::kClosed) {
+ erase_port = true;
+ } else {
+ return OOPS(ERROR_PORT_STATE_UNEXPECTED);
+ }
+ }
+
+ if (erase_port) {
+ ErasePort(port_ref.name());
+ return OK;
+ }
+
+ if (try_remove_proxy_immediately) {
+ TryRemoveProxy(port_ref);
+ } else {
+ InitiateProxyRemoval(port_ref);
+ }
+
+ return OK;
+}
+
+int Node::OnObserveClosure(const PortRef& port_ref,
+ mozilla::UniquePtr<ObserveClosureEvent> event) {
+ // OK if the port doesn't exist, as it may have been closed already.
+ if (!port_ref.is_valid()) {
+ return OK;
+ }
+
+  // This event tells the port that it should no longer expect messages beyond
+  // last_sequence_num. It is forwarded along until it reaches the receiving
+  // end, where it also serves as an equivalent to ObserveProxyAck.
+
+ bool notify_delegate = false;
+ NodeName peer_node_name;
+ bool try_remove_proxy = false;
+ bool erase_port = false;
+ {
+ SinglePortLocker locker(&port_ref);
+ auto* port = locker.port();
+
+ port->peer_closed = true;
+ port->last_sequence_num_to_receive = event->last_sequence_num();
+
+ DVLOG(2) << "ObserveClosure at " << port_ref.name() << "@" << name_
+ << " (state=" << port->state << ") pointing to "
+ << port->peer_port_name << "@" << port->peer_node_name
+ << " (last_sequence_num=" << event->last_sequence_num() << ")";
+
+ // We always forward ObserveClosure, even beyond the receiving port which
+ // cares about it. This ensures that any dead-end proxies beyond that port
+ // are notified to remove themselves.
+
+ if (port->state == Port::kReceiving) {
+ notify_delegate = true;
+
+ // When forwarding along the other half of the port cycle, this will only
+ // reach dead-end proxies. Tell them we've sent our last message so they
+ // can go away.
+ //
+ // TODO: Repurposing ObserveClosure for this has the desired result but
+ // may be semantically confusing since the forwarding port is not actually
+ // closed. Consider replacing this with a new event type.
+ event->set_last_sequence_num(port->next_sequence_num_to_send - 1);
+
+ // Treat the closure as an acknowledge that all sent messages have been
+ // read from the other end.
+ port->last_sequence_num_acknowledged =
+ port->next_sequence_num_to_send - 1;
+ } else if (port->state == Port::kClosed) {
+ // This is the ack for a closed proxy port notification. Now it's fine to
+ // delete the port.
+ erase_port = true;
+ } else {
+ // We haven't yet reached the receiving peer of the closed port, so we'll
+ // forward the message along as-is.
+      // Also see about removing the port if it is a proxy, since our peer
+      // won't be able to participate in proxy removal.
+ port->remove_proxy_on_last_message = true;
+ if (port->state == Port::kProxying) {
+ try_remove_proxy = true;
+ }
+ }
+
+ DVLOG(2) << "Forwarding ObserveClosure from " << port_ref.name() << "@"
+ << name_ << " to peer " << port->peer_port_name << "@"
+ << port->peer_node_name
+ << " (last_sequence_num=" << event->last_sequence_num() << ")";
+
+ event->set_port_name(port->peer_port_name);
+ event->set_from_port(port_ref.name());
+ event->set_control_sequence_num(port->next_control_sequence_num_to_send++);
+ peer_node_name = port->peer_node_name;
+
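+    // A buffering port cannot yet talk to its new peer, so queue the
+    // forwarded event; BeginProxying flushes this queue once the port starts
+    // proxying.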
+ if (port->state == Port::kBuffering) {
+ port->control_message_queue.push_back({peer_node_name, std::move(event)});
+ }
+ }
+
+ if (try_remove_proxy) {
+ TryRemoveProxy(port_ref);
+ }
+
+ if (erase_port) {
+ ErasePort(port_ref.name());
+ }
+
+ if (event) {
+ delegate_->ForwardEvent(peer_node_name, std::move(event));
+ }
+
+ if (notify_delegate) {
+ delegate_->PortStatusChanged(port_ref);
+ }
+
+ return OK;
+}
+
+int Node::OnMergePort(const PortRef& port_ref,
+ mozilla::UniquePtr<MergePortEvent> event) {
+ DVLOG(1) << "MergePort at " << port_ref.name() << "@" << name_
+ << " merging with proxy " << event->new_port_name() << "@" << name_
+ << " pointing to " << event->new_port_descriptor().peer_port_name
+ << "@" << event->new_port_descriptor().peer_node_name
+ << " referred by "
+ << event->new_port_descriptor().referring_port_name << "@"
+ << event->new_port_descriptor().referring_node_name;
+
+ // Accept the new port. This is now the receiving end of the other port cycle
+ // to be merged with ours. Note that we always attempt to accept the new port
+ // first as otherwise its peer receiving port could be left stranded
+ // indefinitely.
+ if (AcceptPort(event->new_port_name(), event->new_port_descriptor()) != OK) {
+ if (port_ref.is_valid()) {
+ ClosePort(port_ref);
+ }
+ return ERROR_PORT_STATE_UNEXPECTED;
+ }
+
+ PortRef new_port_ref;
+ GetPort(event->new_port_name(), &new_port_ref);
+ if (!port_ref.is_valid() && new_port_ref.is_valid()) {
+ ClosePort(new_port_ref);
+ return ERROR_PORT_UNKNOWN;
+ }
+ if (port_ref.is_valid() && !new_port_ref.is_valid()) {
+ ClosePort(port_ref);
+ return ERROR_PORT_UNKNOWN;
+ }
+
+ bool peer_allowed = true;
+ {
+ SinglePortLocker locker(&port_ref);
+ auto* port = locker.port();
+ if (!port->pending_merge_peer) {
+ CHROMIUM_LOG(ERROR) << "MergePort called on unexpected port: "
+ << event->port_name();
+ peer_allowed = false;
+ } else {
+ port->pending_merge_peer = false;
+ }
+ }
+ if (!peer_allowed) {
+ ClosePort(port_ref);
+ return ERROR_PORT_STATE_UNEXPECTED;
+ }
+
+ return MergePortsInternal(port_ref, new_port_ref,
+ false /* allow_close_on_bad_state */);
+}
+
+int Node::OnUserMessageReadAckRequest(
+ const PortRef& port_ref,
+ mozilla::UniquePtr<UserMessageReadAckRequestEvent> event) {
+ DVLOG(1) << "AckRequest " << port_ref.name() << "@" << name_ << " sequence "
+ << event->sequence_num_to_acknowledge();
+
+ if (!port_ref.is_valid()) {
+ return ERROR_PORT_UNKNOWN;
+ }
+
+ NodeName peer_node_name;
+ mozilla::UniquePtr<Event> event_to_send;
+ {
+ SinglePortLocker locker(&port_ref);
+ auto* port = locker.port();
+
+ peer_node_name = port->peer_node_name;
+ if (port->state == Port::kProxying) {
+ // Proxies simply forward the ack request to their peer.
+ event->set_port_name(port->peer_port_name);
+ event->set_from_port(port_ref.name());
+ event->set_control_sequence_num(
+ port->next_control_sequence_num_to_send++);
+ event_to_send = std::move(event);
+ } else {
+ uint64_t current_sequence_num =
+ port->message_queue.next_sequence_num() - 1;
+ // Either this is requesting an ack for a sequence number already read, or
+ // else for a sequence number that is yet to be read.
+ if (current_sequence_num >= event->sequence_num_to_acknowledge()) {
+ // If the current sequence number to read already exceeds the ack
+ // request, send an ack immediately.
+ event_to_send = mozilla::MakeUnique<UserMessageReadAckEvent>(
+ port->peer_port_name, port_ref.name(),
+ port->next_control_sequence_num_to_send++, current_sequence_num);
+
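+        // A buffering port cannot send the ack yet; queue it so that
+        // BeginProxying can flush it once the port starts proxying.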
+ if (port->state == Port::kBuffering) {
+ port->control_message_queue.push_back(
+ {peer_node_name, std::move(event_to_send)});
+ }
+
+        // This might be a late or duplicate acknowledge request asking for an
+        // acknowledgment of an already-read message. There may already have
+        // been a request for future reads, so take care not to back up the
+        // requested acknowledge counter.
+ if (current_sequence_num > port->sequence_num_to_acknowledge) {
+ port->sequence_num_to_acknowledge = current_sequence_num;
+ }
+ } else {
+        // This is a request to ack a sequence number that hasn't been read
+        // yet.
+ // The state of the port can either be that it already has a
+ // future-requested ack, or not. Because ack requests aren't guaranteed
+ // to arrive in order, store the earlier of the current queued request
+ // and the new one, if one was already requested.
+ bool has_queued_ack_request =
+ port->sequence_num_to_acknowledge > current_sequence_num;
+ if (!has_queued_ack_request ||
+ port->sequence_num_to_acknowledge >
+ event->sequence_num_to_acknowledge()) {
+ port->sequence_num_to_acknowledge =
+ event->sequence_num_to_acknowledge();
+ }
+ return OK;
+ }
+ }
+ }
+
+ if (event_to_send) {
+ delegate_->ForwardEvent(peer_node_name, std::move(event_to_send));
+ }
+
+ return OK;
+}
+
+int Node::OnUserMessageReadAck(
+ const PortRef& port_ref,
+ mozilla::UniquePtr<UserMessageReadAckEvent> event) {
+ DVLOG(1) << "Acknowledge " << port_ref.name() << "@" << name_ << " sequence "
+ << event->sequence_num_acknowledged();
+
+ NodeName peer_node_name;
+ ScopedEvent ack_request_event;
+ if (port_ref.is_valid()) {
+ SinglePortLocker locker(&port_ref);
+ auto* port = locker.port();
+
+ if (event->sequence_num_acknowledged() >= port->next_sequence_num_to_send) {
+ // TODO(http://crbug.com/980952): This is a malformed event.
+ // This could return a new error "ERROR_MALFORMED_EVENT" which the
+ // delegate could use as a signal to drop the peer node.
+ return OK;
+ }
+
+ // Keep the largest acknowledge seen.
+ if (event->sequence_num_acknowledged() <=
+ port->last_sequence_num_acknowledged) {
+      // The acknowledge was late or a duplicate; it's safe to ignore it.
+ return OK;
+ }
+
+ port->last_sequence_num_acknowledged = event->sequence_num_acknowledged();
+ // Send another ack request if the interval is non-zero and the peer has
+ // not been closed.
+ if (port->sequence_num_acknowledge_interval && !port->peer_closed) {
+ peer_node_name = port->peer_node_name;
+ ack_request_event = mozilla::MakeUnique<UserMessageReadAckRequestEvent>(
+ port->peer_port_name, port_ref.name(),
+ port->next_control_sequence_num_to_send++,
+ port->last_sequence_num_acknowledged +
+ port->sequence_num_acknowledge_interval);
+ DCHECK_NE(port->state, Port::kBuffering);
+ }
+ }
+ if (ack_request_event) {
+ delegate_->ForwardEvent(peer_node_name, std::move(ack_request_event));
+ }
+
+ if (port_ref.is_valid()) {
+ delegate_->PortStatusChanged(port_ref);
+ }
+
+ return OK;
+}
+
+int Node::OnUpdatePreviousPeer(
+ const PortRef& port_ref,
+ mozilla::UniquePtr<UpdatePreviousPeerEvent> event) {
+ DVLOG(1) << "OnUpdatePreviousPeer port: " << event->port_name()
+ << " changing to " << event->new_node_name()
+ << ", port: " << event->from_port() << " => "
+ << event->new_port_name();
+
+ if (!port_ref.is_valid()) {
+ return ERROR_PORT_UNKNOWN;
+ }
+
+ const NodeName& new_node_name = event->new_node_name();
+ const PortName& new_port_name = event->new_port_name();
+ DCHECK_NE(new_node_name, kInvalidNodeName);
+ DCHECK_NE(new_port_name, kInvalidPortName);
+ if (new_node_name == kInvalidNodeName || new_port_name == kInvalidPortName) {
+ return ERROR_PORT_STATE_UNEXPECTED;
+ }
+
+ {
+ SinglePortLocker locker(&port_ref);
+ auto* port = locker.port();
+
+ port->prev_node_name = new_node_name;
+ port->prev_port_name = new_port_name;
+ // The sequence number will get incremented after this event has been
+ // handled.
+ port->next_control_sequence_num_to_receive = kInitialSequenceNum - 1;
+ }
+
+ return OK;
+}
+
+int Node::AddPortWithName(const PortName& port_name, RefPtr<Port> port) {
+ PortLocker::AssertNoPortsLockedOnCurrentThread();
+ mozilla::MutexAutoLock lock(ports_lock_);
+ if (port->peer_port_name != kInvalidPortName) {
+ DCHECK_NE(kInvalidNodeName, port->peer_node_name);
+ peer_port_maps_[port->peer_node_name][port->peer_port_name].emplace(
+ port_name, PortRef(port_name, port));
+ }
+ if (!ports_.emplace(port_name, std::move(port)).second) {
+ return OOPS(ERROR_PORT_EXISTS); // Suggests a bad UUID generator.
+ }
+ DVLOG(2) << "Created port " << port_name << "@" << name_;
+ return OK;
+}
+
+void Node::ErasePort(const PortName& port_name) {
+ PortLocker::AssertNoPortsLockedOnCurrentThread();
+ RefPtr<Port> port;
+ {
+ mozilla::MutexAutoLock lock(ports_lock_);
+ auto it = ports_.find(port_name);
+ if (it == ports_.end()) {
+ return;
+ }
+ port = std::move(it->second);
+ ports_.erase(it);
+
+ RemoveFromPeerPortMap(port_name, port.get());
+ }
+ // NOTE: We are careful not to release the port's messages while holding any
+ // locks, since they may run arbitrary user code upon destruction.
+ std::vector<mozilla::UniquePtr<UserMessageEvent>> messages;
+ {
+ PortRef port_ref(port_name, std::move(port));
+ SinglePortLocker locker(&port_ref);
+ locker.port()->message_queue.TakeAllMessages(&messages);
+ }
+ DVLOG(2) << "Deleted port " << port_name << "@" << name_;
+}
+
+int Node::SendUserMessageInternal(
+ const PortRef& port_ref, mozilla::UniquePtr<UserMessageEvent>* message) {
+ mozilla::UniquePtr<UserMessageEvent>& m = *message;
+
+ m->set_from_port(port_ref.name());
+
+ for (size_t i = 0; i < m->num_ports(); ++i) {
+ if (m->ports()[i] == port_ref.name()) {
+ return ERROR_PORT_CANNOT_SEND_SELF;
+ }
+ }
+
+ NodeName target_node;
+ int rv = PrepareToForwardUserMessage(port_ref, Port::kReceiving,
+ false /* ignore_closed_peer */, m.get(),
+ &target_node);
+ if (rv != OK) {
+ return rv;
+ }
+
+ // Beyond this point there's no sense in returning anything but OK. Even if
+ // message forwarding or acceptance fails, there's nothing the embedder can
+ // do to recover. Assume that failure beyond this point must be treated as a
+ // transport failure.
+
+ DCHECK_NE(kInvalidNodeName, target_node);
+ if (target_node != name_) {
+ delegate_->ForwardEvent(target_node, std::move(m));
+ return OK;
+ }
+
+ int accept_result = AcceptEvent(name_, std::move(m));
+ if (accept_result != OK) {
+ // See comment above for why we don't return an error in this case.
+ DVLOG(2) << "AcceptEvent failed: " << accept_result;
+ }
+
+ return OK;
+}
+
+int Node::MergePortsInternal(const PortRef& port0_ref, const PortRef& port1_ref,
+ bool allow_close_on_bad_state) {
+ const PortRef* port_refs[2] = {&port0_ref, &port1_ref};
+ PendingUpdatePreviousPeer pending_update_events[2];
+ uint64_t original_sequence_number[2];
+ {
+ // Needed to swap peer map entries below.
+ PortLocker::AssertNoPortsLockedOnCurrentThread();
+ mozilla::ReleasableMutexAutoLock ports_locker(ports_lock_);
+
+ mozilla::Maybe<PortLocker> locker(std::in_place, port_refs, size_t(2));
+ auto* port0 = locker->GetPort(port0_ref);
+ auto* port1 = locker->GetPort(port1_ref);
+
+ // There are several conditions which must be met before we'll consider
+ // merging two ports:
+ //
+ // - They must both be in the kReceiving state
+ // - They must not be each other's peer
+ // - They must have never sent a user message
+ //
+ // If any of these criteria are not met, we fail early.
+ if (port0->state != Port::kReceiving || port1->state != Port::kReceiving ||
+ (port0->peer_node_name == name_ &&
+ port0->peer_port_name == port1_ref.name()) ||
+ (port1->peer_node_name == name_ &&
+ port1->peer_port_name == port0_ref.name()) ||
+ port0->next_sequence_num_to_send != kInitialSequenceNum ||
+ port1->next_sequence_num_to_send != kInitialSequenceNum) {
+ // On failure, we only close a port if it was at least properly in the
+ // |kReceiving| state. This avoids getting the system in an inconsistent
+ // state by e.g. closing a proxy abruptly.
+ //
+ // Note that we must release the port locks before closing ports.
+ const bool close_port0 =
+ port0->state == Port::kReceiving || allow_close_on_bad_state;
+ const bool close_port1 =
+ port1->state == Port::kReceiving || allow_close_on_bad_state;
+ locker.reset();
+ ports_locker.Unlock();
+ if (close_port0) {
+ ClosePort(port0_ref);
+ }
+ if (close_port1) {
+ ClosePort(port1_ref);
+ }
+ return ERROR_PORT_STATE_UNEXPECTED;
+ }
+
+ pending_update_events[0] = {
+ .receiver = port0->peer_node_name,
+ .port = port0->peer_port_name,
+ .from_port = port0_ref.name(),
+ .sequence_num = port0->next_control_sequence_num_to_send++,
+ .new_prev_node = name_,
+ .new_prev_port = port1_ref.name()};
+ pending_update_events[1] = {
+ .receiver = port1->peer_node_name,
+ .port = port1->peer_port_name,
+ .from_port = port1_ref.name(),
+ .sequence_num = port1->next_control_sequence_num_to_send++,
+ .new_prev_node = name_,
+ .new_prev_port = port0_ref.name()};
+
+ // Swap the ports' peer information and switch them both to proxying mode.
+ SwapPortPeers(port0_ref.name(), port0, port1_ref.name(), port1);
+ port0->state = Port::kProxying;
+ port1->state = Port::kProxying;
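+    // Both ports now talk to new peers, so their control-message streams
+    // restart from kInitialSequenceNum. Remember the old counters so they can
+    // be restored if forwarding the queued user messages fails below.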
+ original_sequence_number[0] = port0->next_control_sequence_num_to_send;
+ original_sequence_number[1] = port1->next_control_sequence_num_to_send;
+ port0->next_control_sequence_num_to_send = kInitialSequenceNum;
+ port1->next_control_sequence_num_to_send = kInitialSequenceNum;
+ if (port0->peer_closed) {
+ port0->remove_proxy_on_last_message = true;
+ }
+ if (port1->peer_closed) {
+ port1->remove_proxy_on_last_message = true;
+ }
+ }
+
+ // Flush any queued messages from the new proxies and, if successful, complete
+ // the merge by initiating proxy removals.
+ if (ForwardUserMessagesFromProxy(port0_ref) == OK &&
+ ForwardUserMessagesFromProxy(port1_ref) == OK) {
+    // Send the prev peer updates out only after forwarding the user messages
+    // succeeded. Otherwise, we wouldn't be able to restore the previous state
+    // below.
+ for (const auto& pending_update_event : pending_update_events) {
+ delegate_->ForwardEvent(
+ pending_update_event.receiver,
+ mozilla::MakeUnique<UpdatePreviousPeerEvent>(
+ pending_update_event.port, pending_update_event.from_port,
+ pending_update_event.sequence_num,
+ pending_update_event.new_prev_node,
+ pending_update_event.new_prev_port));
+ }
+
+ for (const auto* const port_ref : port_refs) {
+ bool try_remove_proxy_immediately = false;
+ ScopedEvent closure_event;
+ NodeName closure_event_target_node;
+ {
+ SinglePortLocker locker(port_ref);
+ auto* port = locker.port();
+ DCHECK_EQ(port->state, Port::kProxying);
+ try_remove_proxy_immediately = port->remove_proxy_on_last_message;
+ if (try_remove_proxy_immediately || port->peer_closed) {
+ // If either end of the port cycle is closed, we propagate an
+ // ObserveClosure event.
+ closure_event_target_node = port->peer_node_name;
+ closure_event = mozilla::MakeUnique<ObserveClosureEvent>(
+ port->peer_port_name, port_ref->name(),
+ port->next_control_sequence_num_to_send++,
+ port->last_sequence_num_to_receive);
+ }
+ }
+ if (try_remove_proxy_immediately) {
+ TryRemoveProxy(*port_ref);
+ } else {
+ InitiateProxyRemoval(*port_ref);
+ }
+
+ if (closure_event) {
+ delegate_->ForwardEvent(closure_event_target_node,
+ std::move(closure_event));
+ }
+ }
+
+ return OK;
+ }
+
+ // If we failed to forward proxied messages, we keep the system in a
+ // consistent state by undoing the peer swap and closing the ports.
+ {
+ PortLocker::AssertNoPortsLockedOnCurrentThread();
+ mozilla::MutexAutoLock ports_locker(ports_lock_);
+ PortLocker locker(port_refs, 2);
+ auto* port0 = locker.GetPort(port0_ref);
+ auto* port1 = locker.GetPort(port1_ref);
+ SwapPortPeers(port0_ref.name(), port0, port1_ref.name(), port1);
+ port0->remove_proxy_on_last_message = false;
+ port1->remove_proxy_on_last_message = false;
+ DCHECK_EQ(Port::kProxying, port0->state);
+ DCHECK_EQ(Port::kProxying, port1->state);
+ port0->state = Port::kReceiving;
+ port1->state = Port::kReceiving;
+ port0->next_control_sequence_num_to_send = original_sequence_number[0];
+ port1->next_control_sequence_num_to_send = original_sequence_number[1];
+ }
+
+ ClosePort(port0_ref);
+ ClosePort(port1_ref);
+ return ERROR_PORT_STATE_UNEXPECTED;
+}
+
+void Node::ConvertToProxy(Port* port, const NodeName& to_node_name,
+ PortName* port_name,
+ Event::PortDescriptor* port_descriptor,
+ PendingUpdatePreviousPeer* pending_update) {
+ port->AssertLockAcquired();
+ PortName local_port_name = *port_name;
+
+ PortName new_port_name;
+ GenerateRandomPortName(&new_port_name);
+
+ pending_update->receiver = port->peer_node_name;
+ pending_update->port = port->peer_port_name;
+ pending_update->sequence_num = port->next_control_sequence_num_to_send++;
+ pending_update->new_prev_node = to_node_name;
+ pending_update->new_prev_port = new_port_name;
+
+ // Make sure we don't send messages to the new peer until after we know it
+ // exists. In the meantime, just buffer messages locally.
+ DCHECK_EQ(port->state, Port::kReceiving);
+ port->state = Port::kBuffering;
+
+ // If we already know our peer is closed, we already know this proxy can
+ // be removed once it receives and forwards its last expected message.
+ if (port->peer_closed) {
+ port->remove_proxy_on_last_message = true;
+ }
+
+ *port_name = new_port_name;
+
+ port_descriptor->peer_node_name = port->peer_node_name;
+ port_descriptor->peer_port_name = port->peer_port_name;
+ port_descriptor->referring_node_name = name_;
+ port_descriptor->referring_port_name = local_port_name;
+ port_descriptor->next_sequence_num_to_send = port->next_sequence_num_to_send;
+ port_descriptor->next_sequence_num_to_receive =
+ port->message_queue.next_sequence_num();
+ port_descriptor->last_sequence_num_to_receive =
+ port->last_sequence_num_to_receive;
+ port_descriptor->peer_closed = port->peer_closed;
+ memset(port_descriptor->padding, 0, sizeof(port_descriptor->padding));
+
+ // Configure the local port to point to the new port.
+ UpdatePortPeerAddress(local_port_name, port, to_node_name, new_port_name);
+}
+
+int Node::AcceptPort(const PortName& port_name,
+ const Event::PortDescriptor& port_descriptor) {
+ RefPtr<Port> port =
+ mozilla::MakeRefPtr<Port>(port_descriptor.next_sequence_num_to_send,
+ port_descriptor.next_sequence_num_to_receive);
+ port->state = Port::kReceiving;
+ port->peer_node_name = port_descriptor.peer_node_name;
+ port->peer_port_name = port_descriptor.peer_port_name;
+ port->next_control_sequence_num_to_send = kInitialSequenceNum;
+ port->next_control_sequence_num_to_receive = kInitialSequenceNum;
+ port->prev_node_name = port_descriptor.referring_node_name;
+ port->prev_port_name = port_descriptor.referring_port_name;
+ port->last_sequence_num_to_receive =
+ port_descriptor.last_sequence_num_to_receive;
+ port->peer_closed = port_descriptor.peer_closed;
+
+ DVLOG(2) << "Accepting port " << port_name
+ << " [peer_closed=" << port->peer_closed
+ << "; last_sequence_num_to_receive="
+ << port->last_sequence_num_to_receive << "]";
+
+ // A newly accepted port is not signalable until the message referencing the
+ // new port finds its way to the consumer (see GetMessage).
+ port->message_queue.set_signalable(false);
+
+ int rv = AddPortWithName(port_name, std::move(port));
+ if (rv != OK) {
+ return rv;
+ }
+
+ // Allow referring port to forward messages.
+ delegate_->ForwardEvent(port_descriptor.referring_node_name,
+ mozilla::MakeUnique<PortAcceptedEvent>(
+ port_descriptor.referring_port_name,
+ kInvalidPortName, kInvalidSequenceNum));
+ return OK;
+}
+
+int Node::PrepareToForwardUserMessage(const PortRef& forwarding_port_ref,
+ Port::State expected_port_state,
+ bool ignore_closed_peer,
+ UserMessageEvent* message,
+ NodeName* forward_to_node) {
+ bool target_is_remote = false;
+ std::vector<PendingUpdatePreviousPeer> peer_update_events;
+
+ for (;;) {
+ NodeName target_node_name;
+ {
+ SinglePortLocker locker(&forwarding_port_ref);
+ target_node_name = locker.port()->peer_node_name;
+ }
+
+ // NOTE: This may call out to arbitrary user code, so it's important to call
+ // it only while no port locks are held on the calling thread.
+ if (target_node_name != name_) {
+ if (!message->NotifyWillBeRoutedExternally()) {
+ CHROMIUM_LOG(ERROR)
+ << "NotifyWillBeRoutedExternally failed unexpectedly.";
+ return ERROR_PORT_STATE_UNEXPECTED;
+ }
+ }
+
+ // Must be held because ConvertToProxy needs to update |peer_port_maps_|.
+ PortLocker::AssertNoPortsLockedOnCurrentThread();
+ mozilla::MutexAutoLock ports_locker(ports_lock_);
+
+ // Simultaneously lock the forwarding port as well as all attached ports.
+ AutoTArray<PortRef, 4> attached_port_refs;
+ AutoTArray<const PortRef*, 5> ports_to_lock;
+ attached_port_refs.SetCapacity(message->num_ports());
+ ports_to_lock.SetCapacity(message->num_ports() + 1);
+ ports_to_lock.AppendElement(&forwarding_port_ref);
+ for (size_t i = 0; i < message->num_ports(); ++i) {
+ const PortName& attached_port_name = message->ports()[i];
+ auto iter = ports_.find(attached_port_name);
+ DCHECK(iter != ports_.end());
+ attached_port_refs.AppendElement(
+ PortRef(attached_port_name, iter->second));
+ ports_to_lock.AppendElement(&attached_port_refs[i]);
+ }
+ PortLocker locker(ports_to_lock.Elements(), ports_to_lock.Length());
+ auto* forwarding_port = locker.GetPort(forwarding_port_ref);
+
+ if (forwarding_port->peer_node_name != target_node_name) {
+ // The target node has already changed since we last held the lock.
+ if (target_node_name == name_) {
+ // If the target node was previously this local node, we need to restart
+ // the loop, since that means we may now route the message externally.
+ continue;
+ }
+
+ target_node_name = forwarding_port->peer_node_name;
+ }
+ target_is_remote = target_node_name != name_;
+
+ if (forwarding_port->state != expected_port_state) {
+ return ERROR_PORT_STATE_UNEXPECTED;
+ }
+ if (forwarding_port->peer_closed && !ignore_closed_peer) {
+ return ERROR_PORT_PEER_CLOSED;
+ }
+
+ // Messages may already have a sequence number if they're being forwarded by
+ // a proxy. Otherwise, use the next outgoing sequence number.
+ if (message->sequence_num() == 0) {
+ message->set_sequence_num(forwarding_port->next_sequence_num_to_send++);
+ }
+#ifdef DEBUG
+ std::ostringstream ports_buf;
+ for (size_t i = 0; i < message->num_ports(); ++i) {
+ if (i > 0) {
+ ports_buf << ",";
+ }
+ ports_buf << message->ports()[i];
+ }
+#endif
+
+ if (message->num_ports() > 0) {
+ // Sanity check to make sure we can actually send all the attached ports.
+ // They must all be in the |kReceiving| state and must not be the sender's
+ // own peer.
+ DCHECK_EQ(message->num_ports(), attached_port_refs.Length());
+ for (size_t i = 0; i < message->num_ports(); ++i) {
+ auto* attached_port = locker.GetPort(attached_port_refs[i]);
+ int error = OK;
+ if (attached_port->state != Port::kReceiving) {
+ error = ERROR_PORT_STATE_UNEXPECTED;
+ } else if (attached_port_refs[i].name() ==
+ forwarding_port->peer_port_name) {
+ error = ERROR_PORT_CANNOT_SEND_PEER;
+ }
+
+ if (error != OK) {
+ // Not going to send. Backpedal on the sequence number.
+ forwarding_port->next_sequence_num_to_send--;
+ return error;
+ }
+ }
+
+ if (target_is_remote) {
+ // We only bother to proxy and rewrite ports in the event if it's
+ // going to be routed to an external node. This substantially reduces
+ // the amount of port churn in the system, as many port-carrying
+ // events are routed at least 1 or 2 intra-node hops before (if ever)
+ // being routed externally.
+ Event::PortDescriptor* port_descriptors = message->port_descriptors();
+ for (size_t i = 0; i < message->num_ports(); ++i) {
+ auto* port = locker.GetPort(attached_port_refs[i]);
+ PendingUpdatePreviousPeer update_event = {
+ .from_port = attached_port_refs[i].name()};
+ ConvertToProxy(port, target_node_name, message->ports() + i,
+ port_descriptors + i, &update_event);
+ peer_update_events.push_back(update_event);
+ }
+ }
+ }
+
+#ifdef DEBUG
+ DVLOG(4) << "Sending message " << message->sequence_num()
+ << " [ports=" << ports_buf.str() << "]"
+ << " from " << forwarding_port_ref.name() << "@" << name_ << " to "
+ << forwarding_port->peer_port_name << "@" << target_node_name;
+#endif
+
+ *forward_to_node = target_node_name;
+ message->set_port_name(forwarding_port->peer_port_name);
+ message->set_from_port(forwarding_port_ref.name());
+ message->set_control_sequence_num(
+ forwarding_port->next_control_sequence_num_to_send++);
+ break;
+ }
+
+ for (auto& pending_update_event : peer_update_events) {
+ delegate_->ForwardEvent(
+ pending_update_event.receiver,
+ mozilla::MakeUnique<UpdatePreviousPeerEvent>(
+ pending_update_event.port, pending_update_event.from_port,
+ pending_update_event.sequence_num,
+ pending_update_event.new_prev_node,
+ pending_update_event.new_prev_port));
+ }
+
+ if (target_is_remote) {
+ for (size_t i = 0; i < message->num_ports(); ++i) {
+ // For any ports that were converted to proxies above, make sure their
+ // prior local peer (if applicable) receives a status update so it can be
+ // made aware of its peer's location.
+ const Event::PortDescriptor& descriptor = message->port_descriptors()[i];
+ if (descriptor.peer_node_name == name_) {
+ PortRef local_peer;
+ if (GetPort(descriptor.peer_port_name, &local_peer) == OK) {
+ delegate_->PortStatusChanged(local_peer);
+ }
+ }
+ }
+ }
+
+ return OK;
+}
+
+int Node::BeginProxying(const PortRef& port_ref) {
+ std::vector<std::pair<NodeName, ScopedEvent>> control_message_queue;
+ {
+ SinglePortLocker locker(&port_ref);
+ auto* port = locker.port();
+ if (port->state != Port::kBuffering) {
+ return OOPS(ERROR_PORT_STATE_UNEXPECTED);
+ }
+ port->state = Port::kProxying;
+ std::swap(port->control_message_queue, control_message_queue);
+ }
+
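+  // Flush any control messages that were queued while the port was still
+  // buffering and could not yet talk to its peer.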
+ for (auto& [control_message_node_name, control_message_event] :
+ control_message_queue) {
+ delegate_->ForwardEvent(control_message_node_name,
+ std::move(control_message_event));
+ }
+ control_message_queue.clear();
+
+ int rv = ForwardUserMessagesFromProxy(port_ref);
+ if (rv != OK) {
+ return rv;
+ }
+
+ // Forward any pending acknowledge request.
+ MaybeForwardAckRequest(port_ref);
+
+ bool try_remove_proxy_immediately;
+ {
+ SinglePortLocker locker(&port_ref);
+ auto* port = locker.port();
+ if (port->state != Port::kProxying) {
+ return OOPS(ERROR_PORT_STATE_UNEXPECTED);
+ }
+
+ try_remove_proxy_immediately = port->remove_proxy_on_last_message;
+ }
+
+ if (try_remove_proxy_immediately) {
+ TryRemoveProxy(port_ref);
+ } else {
+ InitiateProxyRemoval(port_ref);
+ }
+
+ return OK;
+}
+
+int Node::ForwardUserMessagesFromProxy(const PortRef& port_ref) {
+ for (;;) {
+ // NOTE: We forward messages in sequential order here so that we maintain
+ // the message queue's notion of next sequence number. That's useful for the
+ // proxy removal process as we can tell when this port has seen all of the
+ // messages it is expected to see.
+ mozilla::UniquePtr<UserMessageEvent> message;
+ {
+ SinglePortLocker locker(&port_ref);
+ locker.port()->message_queue.GetNextMessage(&message, nullptr);
+ if (!message) {
+ break;
+ }
+ }
+
+ NodeName target_node;
+ int rv = PrepareToForwardUserMessage(port_ref, Port::kProxying,
+ true /* ignore_closed_peer */,
+ message.get(), &target_node);
+ {
+ // Mark the message as processed after we ran PrepareToForwardUserMessage.
+ // This is important to prevent another thread from deleting the port
+ // before we grabbed a sequence number for the message.
+ SinglePortLocker locker(&port_ref);
+ locker.port()->message_queue.MessageProcessed();
+ }
+ if (rv != OK) {
+ return rv;
+ }
+
+ delegate_->ForwardEvent(target_node, std::move(message));
+ }
+ return OK;
+}
+
+void Node::InitiateProxyRemoval(const PortRef& port_ref) {
+ NodeName peer_node_name;
+ PortName peer_port_name;
+ uint64_t sequence_num;
+ {
+ SinglePortLocker locker(&port_ref);
+ auto* port = locker.port();
+ if (port->state == Port::kClosed) {
+ return;
+ }
+ peer_node_name = port->peer_node_name;
+ peer_port_name = port->peer_port_name;
+ sequence_num = port->next_control_sequence_num_to_send++;
+ DCHECK_EQ(port->state, Port::kProxying);
+ }
+
+ // To remove this node, we start by notifying the connected graph that we are
+ // a proxy. This allows whatever port is referencing this node to skip it.
+ // Eventually, this node will receive ObserveProxyAck (or ObserveClosure if
+ // the peer was closed in the meantime).
+ delegate_->ForwardEvent(
+ peer_node_name, mozilla::MakeUnique<ObserveProxyEvent>(
+ peer_port_name, port_ref.name(), sequence_num, name_,
+ port_ref.name(), peer_node_name, peer_port_name));
+}
+
+void Node::TryRemoveProxy(const PortRef& port_ref) {
+ bool should_erase = false;
+ NodeName removal_target_node;
+ ScopedEvent removal_event;
+ PendingUpdatePreviousPeer pending_update_event;
+
+ {
+ SinglePortLocker locker(&port_ref);
+ auto* port = locker.port();
+ if (port->state == Port::kClosed) {
+ return;
+ }
+ DCHECK_EQ(port->state, Port::kProxying);
+
+ // Make sure we have seen ObserveProxyAck before removing the port.
+ if (!port->remove_proxy_on_last_message) {
+ return;
+ }
+
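+    // Once the proxy can accept no more messages, it has received (and
+    // forwarded) everything up to last_sequence_num_to_receive and can be
+    // dropped.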
+ if (!CanAcceptMoreMessages(port)) {
+ DCHECK_EQ(port->message_queue.queued_message_count(), 0lu);
+ should_erase = true;
+ if (port->send_on_proxy_removal) {
+ removal_target_node = port->send_on_proxy_removal->first;
+ removal_event = std::move(port->send_on_proxy_removal->second);
+ if (removal_event) {
+ removal_event->set_control_sequence_num(
+ port->next_control_sequence_num_to_send++);
+ DCHECK_EQ(removal_target_node, port->peer_node_name);
+ DCHECK_EQ(removal_event->port_name(), port->peer_port_name);
+ }
+ }
+      // Tell the peer_node to accept messages from prev_node from now on.
+ pending_update_event = {
+ .receiver = port->peer_node_name,
+ .port = port->peer_port_name,
+ .from_port = port_ref.name(),
+ .sequence_num = port->next_control_sequence_num_to_send++,
+ .new_prev_node = port->prev_node_name,
+ .new_prev_port = port->prev_port_name};
+ } else {
+ DVLOG(2) << "Cannot remove port " << port_ref.name() << "@" << name_
+ << " now; waiting for more messages";
+ }
+ }
+
+ if (should_erase) {
+ delegate_->ForwardEvent(
+ pending_update_event.receiver,
+ mozilla::MakeUnique<UpdatePreviousPeerEvent>(
+ pending_update_event.port, pending_update_event.from_port,
+ pending_update_event.sequence_num,
+ pending_update_event.new_prev_node,
+ pending_update_event.new_prev_port));
+ ErasePort(port_ref.name());
+ }
+
+ if (removal_event) {
+ delegate_->ForwardEvent(removal_target_node, std::move(removal_event));
+ }
+}
+
+void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
+ const PortName& port_name) {
+ // Wipes out all ports whose peer node matches |node_name| and whose peer port
+ // matches |port_name|. If |port_name| is |kInvalidPortName|, only the peer
+ // node is matched.
+
+ std::vector<PortRef> ports_to_notify;
+ std::vector<PortName> dead_proxies_to_broadcast;
+ std::vector<mozilla::UniquePtr<UserMessageEvent>> undelivered_messages;
+ std::vector<std::pair<NodeName, ScopedEvent>> closure_events;
+
+ {
+ PortLocker::AssertNoPortsLockedOnCurrentThread();
+ mozilla::MutexAutoLock ports_lock(ports_lock_);
+
+ auto node_peer_port_map_iter = peer_port_maps_.find(node_name);
+ if (node_peer_port_map_iter == peer_port_maps_.end()) {
+ return;
+ }
+
+ auto& node_peer_port_map = node_peer_port_map_iter->second;
+ auto peer_ports_begin = node_peer_port_map.begin();
+ auto peer_ports_end = node_peer_port_map.end();
+ if (port_name != kInvalidPortName) {
+ // If |port_name| is given, we limit the set of local ports to the ones
+ // with that specific port as their peer.
+ peer_ports_begin = node_peer_port_map.find(port_name);
+ if (peer_ports_begin == node_peer_port_map.end()) {
+ return;
+ }
+
+ peer_ports_end = peer_ports_begin;
+ ++peer_ports_end;
+ }
+
+ for (auto peer_port_iter = peer_ports_begin;
+ peer_port_iter != peer_ports_end; ++peer_port_iter) {
+ auto& local_ports = peer_port_iter->second;
+ // NOTE: This inner loop almost always has only one element. There are
+ // relatively short-lived cases where more than one local port points to
+      // the same peer; this only happens when the extra ports are bypassed
+      // proxies waiting to be torn down.
+ for (auto& local_port : local_ports) {
+ auto& local_port_ref = local_port.second;
+
+ SinglePortLocker locker(&local_port_ref);
+ auto* port = locker.port();
+
+ if (port_name != kInvalidPortName) {
+ // If this is a targeted observe dead proxy event, send out an
+ // ObserveClosure to acknowledge it.
+ closure_events.push_back(
+ std::pair{port->peer_node_name,
+ mozilla::MakeUnique<ObserveClosureEvent>(
+ port->peer_port_name, local_port_ref.name(),
+ port->next_control_sequence_num_to_send++,
+ port->last_sequence_num_to_receive)});
+ }
+
+ if (!port->peer_closed) {
+ // Treat this as immediate peer closure. It's an exceptional
+ // condition akin to a broken pipe, so we don't care about losing
+ // messages.
+
+ port->peer_closed = true;
+ port->peer_lost_unexpectedly = true;
+ if (port->state == Port::kReceiving) {
+ ports_to_notify.push_back(local_port_ref);
+ }
+ }
+
+ // We don't expect to forward any further messages, and we don't
+ // expect to receive a Port{Accepted,Rejected} event. Because we're
+ // a proxy with no active peer, we cannot use the normal proxy removal
+ // procedure of forward-propagating an ObserveProxy. Instead we
+ // broadcast our own death so it can be back-propagated. This is
+ // inefficient but rare.
+ if (port->state == Port::kBuffering || port->state == Port::kProxying) {
+ port->state = Port::kClosed;
+ dead_proxies_to_broadcast.push_back(local_port_ref.name());
+ std::vector<mozilla::UniquePtr<UserMessageEvent>> messages;
+ port->message_queue.TakeAllMessages(&messages);
+ port->TakePendingMessages(messages);
+ for (auto& message : messages) {
+ undelivered_messages.emplace_back(std::move(message));
+ }
+ }
+ }
+ }
+ }
+
+ for (auto& [closure_event_target_node, closure_event] : closure_events) {
+ delegate_->ForwardEvent(closure_event_target_node,
+ std::move(closure_event));
+ }
+
+ // Wake up any receiving ports who have just observed simulated peer closure.
+ for (const auto& port : ports_to_notify) {
+ delegate_->PortStatusChanged(port);
+ }
+
+ for (const auto& proxy_name : dead_proxies_to_broadcast) {
+ // Broadcast an event signifying that this proxy is no longer functioning.
+ delegate_->BroadcastEvent(mozilla::MakeUnique<ObserveProxyEvent>(
+ kInvalidPortName, kInvalidPortName, kInvalidSequenceNum, name_,
+ proxy_name, kInvalidNodeName, kInvalidPortName));
+
+ // Also process death locally since the port that points this closed one
+ // could be on the current node.
+ // Note: Although this is recursive, only a single port is involved which
+ // limits the expected branching to 1.
+ DestroyAllPortsWithPeer(name_, proxy_name);
+ }
+
+ // Close any ports referenced by undelivered messages.
+ for (const auto& message : undelivered_messages) {
+ for (size_t i = 0; i < message->num_ports(); ++i) {
+ PortRef ref;
+ if (GetPort(message->ports()[i], &ref) == OK) {
+ ClosePort(ref);
+ }
+ }
+ }
+}
+
+void Node::UpdatePortPeerAddress(const PortName& local_port_name,
+ Port* local_port,
+ const NodeName& new_peer_node,
+ const PortName& new_peer_port) {
+ ports_lock_.AssertCurrentThreadOwns();
+ local_port->AssertLockAcquired();
+
+ RemoveFromPeerPortMap(local_port_name, local_port);
+ local_port->peer_node_name = new_peer_node;
+ local_port->peer_port_name = new_peer_port;
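+  // The control-message stream to the new peer starts over; it expects our
+  // next control message to carry kInitialSequenceNum.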
+ local_port->next_control_sequence_num_to_send = kInitialSequenceNum;
+ if (new_peer_port != kInvalidPortName) {
+ peer_port_maps_[new_peer_node][new_peer_port].emplace(
+ local_port_name, PortRef(local_port_name, RefPtr<Port>{local_port}));
+ }
+}
+
+void Node::RemoveFromPeerPortMap(const PortName& local_port_name,
+ Port* local_port) {
+ if (local_port->peer_port_name == kInvalidPortName) {
+ return;
+ }
+
+ auto node_iter = peer_port_maps_.find(local_port->peer_node_name);
+ if (node_iter == peer_port_maps_.end()) {
+ return;
+ }
+
+ auto& node_peer_port_map = node_iter->second;
+ auto ports_iter = node_peer_port_map.find(local_port->peer_port_name);
+ if (ports_iter == node_peer_port_map.end()) {
+ return;
+ }
+
+ auto& local_ports_with_this_peer = ports_iter->second;
+ local_ports_with_this_peer.erase(local_port_name);
+ if (local_ports_with_this_peer.empty()) {
+ node_peer_port_map.erase(ports_iter);
+ }
+ if (node_peer_port_map.empty()) {
+ peer_port_maps_.erase(node_iter);
+ }
+}
+
+void Node::SwapPortPeers(const PortName& port0_name, Port* port0,
+ const PortName& port1_name, Port* port1) {
+ ports_lock_.AssertCurrentThreadOwns();
+ port0->AssertLockAcquired();
+ port1->AssertLockAcquired();
+
+ auto& peer0_ports =
+ peer_port_maps_[port0->peer_node_name][port0->peer_port_name];
+ auto& peer1_ports =
+ peer_port_maps_[port1->peer_node_name][port1->peer_port_name];
+ peer0_ports.erase(port0_name);
+ peer1_ports.erase(port1_name);
+ peer0_ports.emplace(port1_name, PortRef(port1_name, RefPtr<Port>{port1}));
+ peer1_ports.emplace(port0_name, PortRef(port0_name, RefPtr<Port>{port0}));
+
+ std::swap(port0->peer_node_name, port1->peer_node_name);
+ std::swap(port0->peer_port_name, port1->peer_port_name);
+}
+
+void Node::MaybeResendAckRequest(const PortRef& port_ref) {
+ NodeName peer_node_name;
+ ScopedEvent ack_request_event;
+ {
+ SinglePortLocker locker(&port_ref);
+ auto* port = locker.port();
+ if (port->state != Port::kReceiving) {
+ return;
+ }
+
+ if (!port->sequence_num_acknowledge_interval) {
+ return;
+ }
+
+ peer_node_name = port->peer_node_name;
+ ack_request_event = mozilla::MakeUnique<UserMessageReadAckRequestEvent>(
+ port->peer_port_name, port_ref.name(),
+ port->next_control_sequence_num_to_send++,
+ port->last_sequence_num_acknowledged +
+ port->sequence_num_acknowledge_interval);
+ }
+
+ delegate_->ForwardEvent(peer_node_name, std::move(ack_request_event));
+}
+
+void Node::MaybeForwardAckRequest(const PortRef& port_ref) {
+ NodeName peer_node_name;
+ ScopedEvent ack_request_event;
+ {
+ SinglePortLocker locker(&port_ref);
+ auto* port = locker.port();
+ if (port->state != Port::kProxying) {
+ return;
+ }
+
+ if (!port->sequence_num_to_acknowledge) {
+ return;
+ }
+
+ peer_node_name = port->peer_node_name;
+ ack_request_event = mozilla::MakeUnique<UserMessageReadAckRequestEvent>(
+ port->peer_port_name, port_ref.name(),
+ port->next_control_sequence_num_to_send++,
+ port->sequence_num_to_acknowledge);
+
+ port->sequence_num_to_acknowledge = 0;
+ }
+
+ delegate_->ForwardEvent(peer_node_name, std::move(ack_request_event));
+}
+
+void Node::MaybeResendAck(const PortRef& port_ref) {
+ NodeName peer_node_name;
+ ScopedEvent ack_event;
+ {
+ SinglePortLocker locker(&port_ref);
+ auto* port = locker.port();
+ if (port->state != Port::kReceiving) {
+ return;
+ }
+
+ uint64_t last_sequence_num_read =
+ port->message_queue.next_sequence_num() - 1;
+ if (!port->sequence_num_to_acknowledge || !last_sequence_num_read) {
+ return;
+ }
+
+ peer_node_name = port->peer_node_name;
+ ack_event = mozilla::MakeUnique<UserMessageReadAckEvent>(
+ port->peer_port_name, port_ref.name(),
+ port->next_control_sequence_num_to_send++, last_sequence_num_read);
+ }
+
+ delegate_->ForwardEvent(peer_node_name, std::move(ack_event));
+}
+
+Node::DelegateHolder::DelegateHolder(Node* node, NodeDelegate* delegate)
+ : node_(node), delegate_(delegate) {
+ DCHECK(node_);
+}
+
+Node::DelegateHolder::~DelegateHolder() = default;
+
+#ifdef DEBUG
+void Node::DelegateHolder::EnsureSafeDelegateAccess() const {
+ PortLocker::AssertNoPortsLockedOnCurrentThread();
+ mozilla::MutexAutoLock lock(node_->ports_lock_);
+}
+#endif
+
+} // namespace ports
+} // namespace core
+} // namespace mojo
diff --git a/ipc/chromium/src/mojo/core/ports/node.h b/ipc/chromium/src/mojo/core/ports/node.h
new file mode 100644
index 0000000000..9dbbd2a337
--- /dev/null
+++ b/ipc/chromium/src/mojo/core/ports/node.h
@@ -0,0 +1,355 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_CORE_PORTS_NODE_H_
+#define MOJO_CORE_PORTS_NODE_H_
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <unordered_map>
+
+#include "mojo/core/ports/event.h"
+#include "mojo/core/ports/name.h"
+#include "mojo/core/ports/port.h"
+#include "mojo/core/ports/port_ref.h"
+#include "mojo/core/ports/user_data.h"
+#include "mozilla/Mutex.h"
+#include "mozilla/RefPtr.h"
+
+namespace mojo {
+namespace core {
+namespace ports {
+
+enum : int {
+ OK = 0,
+ ERROR_PORT_UNKNOWN = -10,
+ ERROR_PORT_EXISTS = -11,
+ ERROR_PORT_STATE_UNEXPECTED = -12,
+ ERROR_PORT_CANNOT_SEND_SELF = -13,
+ ERROR_PORT_PEER_CLOSED = -14,
+ ERROR_PORT_CANNOT_SEND_PEER = -15,
+ ERROR_NOT_IMPLEMENTED = -100,
+};
+
+struct PortStatus {
+ bool has_messages;
+ bool receiving_messages;
+ bool peer_closed;
+ bool peer_remote;
+ size_t queued_message_count;
+ size_t queued_num_bytes;
+ size_t unacknowledged_message_count;
+#ifdef FUZZING_SNAPSHOT
+ NodeName peer_node_name;
+#endif
+};
+
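+// Parameters for an UpdatePreviousPeer event that has been prepared but not
+// yet sent (filled in by Node::ConvertToProxy below).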
+struct PendingUpdatePreviousPeer {
+ NodeName receiver;
+ PortName port;
+ PortName from_port;
+ uint64_t sequence_num;
+ NodeName new_prev_node;
+ PortName new_prev_port;
+};
+
+class MessageFilter;
+class NodeDelegate;
+
+// A Node maintains a collection of Ports (see port.h) indexed by unique 128-bit
+// addresses (names), performing routing and processing of events among the
+// Ports within the Node and to or from other Nodes in the system. Typically
+// (and practically, in all uses today) there is a single Node per system
+// process. Thus a Node boundary effectively models a process boundary.
+//
+// New Ports can be created uninitialized using CreateUninitializedPort (and
+// later initialized using InitializePort), or created in a fully initialized
+// state using CreatePortPair(). Initialized ports have exactly one conjugate
+// port which is the ultimate receiver of any user messages sent by that port.
+// See SendUserMessage().
+//
+// In addition to routing user message events, various control events are used
+// by Nodes to coordinate Port behavior and lifetime within and across Nodes.
+// See Event documentation for description of different types of events used by
+// a Node to coordinate behavior.
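+//
+// As a rough usage sketch (MyDelegate stands in for an embedder-supplied
+// NodeDelegate implementation; all names are illustrative):
+//
+//   MyDelegate delegate;
+//   Node node(node_name, &delegate);
+//
+//   PortRef a, b;
+//   node.CreatePortPair(&a, &b);
+//   node.SendUserMessage(a, std::move(message));  // will arrive on |b|
+//
+//   mozilla::UniquePtr<UserMessageEvent> received;
+//   node.GetMessage(b, &received, nullptr);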
+class Node {
+ public:
+ enum class ShutdownPolicy {
+ DONT_ALLOW_LOCAL_PORTS,
+ ALLOW_LOCAL_PORTS,
+ };
+
+ // Does not take ownership of the delegate.
+ Node(const NodeName& name, NodeDelegate* delegate);
+ ~Node();
+
+ Node(const Node&) = delete;
+ void operator=(const Node&) = delete;
+
+ // Returns true iff there are no open ports referring to another node or ports
+ // in the process of being transferred from this node to another. If this
+ // returns false, then to ensure clean shutdown, it is necessary to keep the
+  // node alive and continue routing events to it via AcceptEvent. This
+  // method may be called again after AcceptEvent to check if the Node is now
+ // ready to be destroyed.
+ //
+ // If |policy| is set to |ShutdownPolicy::ALLOW_LOCAL_PORTS|, this will return
+ // |true| even if some ports remain alive, as long as none of them are proxies
+ // to another node.
+ bool CanShutdownCleanly(
+ ShutdownPolicy policy = ShutdownPolicy::DONT_ALLOW_LOCAL_PORTS);
+
+ // Lookup the named port.
+ int GetPort(const PortName& port_name, PortRef* port_ref);
+
+ // Creates a port on this node. Before the port can be used, it must be
+ // initialized using InitializePort. This method is useful for bootstrapping
+ // a connection between two nodes. Generally, ports are created using
+ // CreatePortPair instead.
+ int CreateUninitializedPort(PortRef* port_ref);
+
+ // Initializes a newly created port.
+ int InitializePort(const PortRef& port_ref, const NodeName& peer_node_name,
+ const PortName& peer_port_name,
+ const NodeName& prev_node_name,
+ const PortName& prev_port_name);
+
+ // Generates a new connected pair of ports bound to this node. These ports
+ // are initialized and ready to go.
+ int CreatePortPair(PortRef* port0_ref, PortRef* port1_ref);
+
+ // User data associated with the port.
+ int SetUserData(const PortRef& port_ref, RefPtr<UserData> user_data);
+ int GetUserData(const PortRef& port_ref, RefPtr<UserData>* user_data);
+
+ // Prevents further messages from being sent from this port or delivered to
+ // this port. The port is removed, and the port's peer is notified of the
+ // closure after it has consumed all pending messages.
+ int ClosePort(const PortRef& port_ref);
+
+ // Returns the current status of the port.
+ int GetStatus(const PortRef& port_ref, PortStatus* port_status);
+
+ // Returns the next available message on the specified port or returns a null
+ // message if there are none available. Returns ERROR_PORT_PEER_CLOSED to
+ // indicate that this port's peer has closed. In such cases GetMessage may
+ // be called until it yields a null message, indicating that no more messages
+ // may be read from the port.
+ //
+ // If |filter| is non-null, the next available message is returned only if it
+ // is matched by the filter. If the provided filter does not match the next
+ // available message, GetMessage() behaves as if there is no message
+ // available. Ownership of |filter| is not taken, and it must outlive the
+ // extent of this call.
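+  //
+  // For example, a typical drain loop might look like this (sketch; Consume()
+  // stands in for embedder code):
+  //
+  //   mozilla::UniquePtr<UserMessageEvent> message;
+  //   while (node.GetMessage(port, &message, nullptr) == OK && message) {
+  //     Consume(std::move(message));
+  //   }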
+ int GetMessage(const PortRef& port_ref,
+ mozilla::UniquePtr<UserMessageEvent>* message,
+ MessageFilter* filter);
+
+ // Sends a message from the specified port to its peer. Note that the message
+ // notification may arrive synchronously (via PortStatusChanged() on the
+ // delegate) if the peer is local to this Node.
+ int SendUserMessage(const PortRef& port_ref,
+ mozilla::UniquePtr<UserMessageEvent> message);
+
+ // Makes the port send acknowledge requests to its conjugate to acknowledge
+  // at least every |sequence_num_acknowledge_interval| messages as they're
+ // read from the conjugate. The number of unacknowledged messages is exposed
+ // in the |unacknowledged_message_count| field of PortStatus. This allows
+ // bounding the number of unread and/or in-transit messages from this port
+ // to its conjugate between zero and |unacknowledged_message_count|.
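+  //
+  // For example (sketch): request an acknowledge at least every 16 messages
+  // read by the conjugate, then observe the outstanding count.
+  //
+  //   node.SetAcknowledgeRequestInterval(port, 16);
+  //   PortStatus status;
+  //   node.GetStatus(port, &status);
+  //   // status.unacknowledged_message_count reports messages not yet
+  //   // acknowledged by the conjugate.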
+ int SetAcknowledgeRequestInterval(const PortRef& port_ref,
+ uint64_t sequence_num_acknowledge_interval);
+
+  // Corresponds to NodeDelegate::ForwardEvent.
+ int AcceptEvent(const NodeName& from_node, ScopedEvent event);
+
+ // Called to merge two ports with each other. If you have two independent
+ // port pairs A <=> B and C <=> D, the net result of merging B and C is a
+ // single connected port pair A <=> D.
+ //
+ // Note that the behavior of this operation is undefined if either port to be
+ // merged (B or C above) has ever been read from or written to directly, and
+ // this must ONLY be called on one side of the merge, though it doesn't matter
+ // which side.
+ //
+ // It is safe for the non-merged peers (A and D above) to be transferred,
+ // closed, and/or written to before, during, or after the merge.
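+  //
+  // For example (sketch), with A <=> B local to this node and C <=> D living
+  // on another node, B and C can be merged as follows (all names are
+  // illustrative):
+  //
+  //   node.MergePorts(b_ref, other_node_name, c_port_name);
+  //   // On success, A and D eventually behave as a single pair A <=> D.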
+ int MergePorts(const PortRef& port_ref, const NodeName& destination_node_name,
+ const PortName& destination_port_name);
+
+ // Like above but merges two ports local to this node. Because both ports are
+ // local this can also verify that neither port has been written to before the
+ // merge. If this fails for any reason, both ports are closed. Otherwise OK
+ // is returned and the ports' receiving peers are connected to each other.
+ int MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref);
+
+ // Called to inform this node that communication with another node is lost
+ // indefinitely. This triggers cleanup of ports bound to this node.
+ int LostConnectionToNode(const NodeName& node_name);
+
+ private:
+ // Helper to ensure that a Node always calls into its delegate safely, i.e.
+ // without holding any internal locks.
+ class DelegateHolder {
+ public:
+ DelegateHolder(Node* node, NodeDelegate* delegate);
+ ~DelegateHolder();
+
+ DelegateHolder(const DelegateHolder&) = delete;
+ void operator=(const DelegateHolder&) = delete;
+
+ NodeDelegate* operator->() const {
+ EnsureSafeDelegateAccess();
+ return delegate_;
+ }
+
+ private:
+#ifdef DEBUG
+ void EnsureSafeDelegateAccess() const;
+#else
+ void EnsureSafeDelegateAccess() const {}
+#endif
+
+ Node* const node_;
+ NodeDelegate* const delegate_;
+ };
+
+ int OnUserMessage(const PortRef& port_ref, const NodeName& from_node,
+ mozilla::UniquePtr<UserMessageEvent> message);
+ int OnPortAccepted(const PortRef& port_ref,
+ mozilla::UniquePtr<PortAcceptedEvent> event);
+ int OnObserveProxy(const PortRef& port_ref,
+ mozilla::UniquePtr<ObserveProxyEvent> event);
+ int OnObserveProxyAck(const PortRef& port_ref,
+ mozilla::UniquePtr<ObserveProxyAckEvent> event);
+ int OnObserveClosure(const PortRef& port_ref,
+ mozilla::UniquePtr<ObserveClosureEvent> event);
+ int OnMergePort(const PortRef& port_ref,
+ mozilla::UniquePtr<MergePortEvent> event);
+ int OnUserMessageReadAckRequest(
+ const PortRef& port_ref,
+ mozilla::UniquePtr<UserMessageReadAckRequestEvent> event);
+ int OnUserMessageReadAck(const PortRef& port_ref,
+ mozilla::UniquePtr<UserMessageReadAckEvent> event);
+ int OnUpdatePreviousPeer(const PortRef& port_ref,
+ mozilla::UniquePtr<UpdatePreviousPeerEvent> event);
+
+ int AddPortWithName(const PortName& port_name, RefPtr<Port> port);
+ void ErasePort(const PortName& port_name);
+
+  // Checks whether the event was sent by the port's previous peer, which
+  // determines whether we can validate its sequence number. This is not the
+  // case for, e.g., PortAccepted or broadcast events.
+ bool IsEventFromPreviousPeer(const Event& event);
+
+ int AcceptEventInternal(const PortRef& port_ref, const NodeName& from_node,
+ ScopedEvent event);
+
+ int SendUserMessageInternal(const PortRef& port_ref,
+ mozilla::UniquePtr<UserMessageEvent>* message);
+ int MergePortsInternal(const PortRef& port0_ref, const PortRef& port1_ref,
+ bool allow_close_on_bad_state);
+ void ConvertToProxy(Port* port, const NodeName& to_node_name,
+ PortName* port_name,
+ Event::PortDescriptor* port_descriptor,
+ PendingUpdatePreviousPeer* pending_update)
+ MOZ_REQUIRES(ports_lock_);
+ int AcceptPort(const PortName& port_name,
+ const Event::PortDescriptor& port_descriptor);
+
+ int PrepareToForwardUserMessage(const PortRef& forwarding_port_ref,
+ Port::State expected_port_state,
+ bool ignore_closed_peer,
+ UserMessageEvent* message,
+ NodeName* forward_to_node);
+ int BeginProxying(const PortRef& port_ref);
+ int ForwardUserMessagesFromProxy(const PortRef& port_ref);
+ void InitiateProxyRemoval(const PortRef& port_ref);
+ void TryRemoveProxy(const PortRef& port_ref);
+ void DestroyAllPortsWithPeer(const NodeName& node_name,
+ const PortName& port_name);
+
+  // Changes the peer node and port name referenced by |local_port|. Note that
+  // |ports_lock_| MUST be held through the extent of this method, and
+  // |local_port|'s lock must be held if and only if a reference to |local_port|
+  // exists in |ports_|.
+ void UpdatePortPeerAddress(const PortName& local_port_name, Port* local_port,
+ const NodeName& new_peer_node,
+ const PortName& new_peer_port)
+ MOZ_REQUIRES(ports_lock_);
+
+  // Removes an entry from |peer_port_maps_| corresponding to |local_port|'s
+  // peer address, if valid.
+ void RemoveFromPeerPortMap(const PortName& local_port_name, Port* local_port)
+ MOZ_REQUIRES(ports_lock_);
+
+ // Swaps the peer information for two local ports. Used during port merges.
+ // Note that |ports_lock_| must be held along with each of the two port's own
+ // locks, through the extent of this method.
+ void SwapPortPeers(const PortName& port0_name, Port* port0,
+ const PortName& port1_name, Port* port1)
+ MOZ_REQUIRES(ports_lock_);
+
+ // Sends an acknowledge request to the peer if the port has a non-zero
+ // |sequence_num_acknowledge_interval|. This needs to be done when the port's
+ // peer changes, as the previous peer proxy may not have forwarded any prior
+ // acknowledge request before deleting itself.
+ void MaybeResendAckRequest(const PortRef& port_ref);
+
+ // Forwards a stored acknowledge request to the peer if the proxy has a
+ // non-zero |sequence_num_acknowledge_interval|.
+ void MaybeForwardAckRequest(const PortRef& port_ref);
+
+ // Sends an acknowledge of the most recently read sequence number to the peer
+ // if any messages have been read, and the port has a non-zero
+ // |sequence_num_to_acknowledge|.
+ void MaybeResendAck(const PortRef& port_ref);
+
+ const NodeName name_;
+ const DelegateHolder delegate_;
+
+ // Just to clarify readability of the types below.
+ using LocalPortName = PortName;
+ using PeerPortName = PortName;
+
+ // Guards access to |ports_| and |peer_port_maps_| below.
+ //
+ // This must never be acquired while an individual port's lock is held on the
+ // same thread. Conversely, individual port locks may be acquired while this
+ // one is held.
+ //
+ // Because UserMessage events may execute arbitrary user code during
+ // destruction, it is also important to ensure that such events are never
+ // destroyed while this (or any individual Port) lock is held.
+ mozilla::Mutex ports_lock_{"Ports Lock"};
+ std::unordered_map<LocalPortName, RefPtr<Port>> ports_
+ MOZ_GUARDED_BY(ports_lock_);
+
+ // Maps a peer port name to a list of PortRefs for all local ports which have
+ // the port name key designated as their peer port. The set of local ports
+ // which have the same peer port is expected to always be relatively small and
+ // usually 1. Hence we just use a flat_map of local PortRefs keyed on each
+ // local port's name.
+ //
+ // FIXME(nika): We don't have `base::flat_map` or a super equivalent type with
+ // the same API, so just use a nested `std::unordered_map` for now. We should
+ // probably change all of this to instead use our types eventually.
+ using PeerPortMap =
+ std::unordered_map<PeerPortName,
+ std::unordered_map<LocalPortName, PortRef>>;
+
+ // A reverse mapping which can be used to find all local ports that reference
+ // a given peer node or a local port that references a specific given peer
+ // port on a peer node. The key to this map is the corresponding peer node
+ // name.
+ std::unordered_map<NodeName, PeerPortMap> peer_port_maps_
+ MOZ_GUARDED_BY(ports_lock_);
+};
+
+} // namespace ports
+} // namespace core
+} // namespace mojo
+
+#endif // MOJO_CORE_PORTS_NODE_H_
diff --git a/ipc/chromium/src/mojo/core/ports/node_delegate.h b/ipc/chromium/src/mojo/core/ports/node_delegate.h
new file mode 100644
index 0000000000..3172779c06
--- /dev/null
+++ b/ipc/chromium/src/mojo/core/ports/node_delegate.h
@@ -0,0 +1,38 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_CORE_PORTS_NODE_DELEGATE_H_
+#define MOJO_CORE_PORTS_NODE_DELEGATE_H_
+
+#include <stddef.h>
+
+#include "mojo/core/ports/event.h"
+#include "mojo/core/ports/name.h"
+#include "mojo/core/ports/port_ref.h"
+
+namespace mojo {
+namespace core {
+namespace ports {
+
+class NodeDelegate {
+ public:
+ virtual ~NodeDelegate() = default;
+
+ // Forward an event (possibly asynchronously) to the specified node.
+ virtual void ForwardEvent(const NodeName& node, ScopedEvent event) = 0;
+
+ // Broadcast an event to all nodes.
+ virtual void BroadcastEvent(ScopedEvent event) = 0;
+
+ // Indicates that the port's status has changed recently. Use Node::GetStatus
+ // to query the latest status of the port. Note, this event could be spurious
+ // if another thread is simultaneously modifying the status of the port.
+ virtual void PortStatusChanged(const PortRef& port_ref) = 0;
+};
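+
+// A minimal delegate sketch (illustrative only; Transport stands in for the
+// embedder's actual IPC mechanism):
+//
+//   class MyDelegate : public NodeDelegate {
+//    public:
+//     void ForwardEvent(const NodeName& node, ScopedEvent event) override {
+//       transport_.Send(node, std::move(event));
+//     }
+//     void BroadcastEvent(ScopedEvent event) override {
+//       transport_.SendToAll(std::move(event));
+//     }
+//     void PortStatusChanged(const PortRef& port_ref) override {
+//       // Typically schedules a task which re-queries Node::GetStatus().
+//     }
+//
+//    private:
+//     Transport transport_;
+//   };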
+
+} // namespace ports
+} // namespace core
+} // namespace mojo
+
+#endif // MOJO_CORE_PORTS_NODE_DELEGATE_H_
diff --git a/ipc/chromium/src/mojo/core/ports/port.cc b/ipc/chromium/src/mojo/core/ports/port.cc
new file mode 100644
index 0000000000..871ec8fca6
--- /dev/null
+++ b/ipc/chromium/src/mojo/core/ports/port.cc
@@ -0,0 +1,95 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/core/ports/port.h"
+
+namespace mojo {
+namespace core {
+namespace ports {
+
+// Used by std::{push,pop}_heap functions
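+// The comparison is intentionally inverted so that the event with the
+// smallest control sequence number ends up at the front of the heap.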
+inline bool operator<(const mozilla::UniquePtr<Event>& a,
+ const mozilla::UniquePtr<Event>& b) {
+ return a->control_sequence_num() > b->control_sequence_num();
+}
+
+Port::Port(uint64_t next_sequence_num_to_send,
+ uint64_t next_sequence_num_to_receive)
+ : state(kUninitialized),
+ pending_merge_peer(false),
+ next_control_sequence_num_to_send(kInitialSequenceNum),
+ next_control_sequence_num_to_receive(kInitialSequenceNum),
+ next_sequence_num_to_send(next_sequence_num_to_send),
+ last_sequence_num_acknowledged(next_sequence_num_to_send - 1),
+ sequence_num_acknowledge_interval(0),
+ last_sequence_num_to_receive(0),
+ sequence_num_to_acknowledge(0),
+ message_queue(next_sequence_num_to_receive),
+ remove_proxy_on_last_message(false),
+ peer_closed(false),
+ peer_lost_unexpectedly(false) {}
+
+Port::~Port() = default;
+
+bool Port::IsNextEvent(const NodeName& from_node, const Event& event) {
+ if (from_node != prev_node_name) {
+ return false;
+ }
+
+ if (event.from_port() != prev_port_name) {
+ return false;
+ }
+
+ DCHECK_GE(event.control_sequence_num(), next_control_sequence_num_to_receive);
+ return event.control_sequence_num() == next_control_sequence_num_to_receive;
+}
+
+void Port::NextEvent(NodeName* from_node, ScopedEvent* event) {
+ auto it = control_event_queues_.find({prev_node_name, prev_port_name});
+ if (it == control_event_queues_.end()) {
+ return;
+ }
+
+ auto& msg_queue = it->second;
+  // There must always be at least one entry, since we delete the queue after
+  // processing its last element.
+ DCHECK_GE(msg_queue.size(), 1lu);
+
+ if (msg_queue[0]->control_sequence_num() !=
+ next_control_sequence_num_to_receive) {
+ return;
+ }
+
+ std::pop_heap(msg_queue.begin(), msg_queue.end());
+ *from_node = prev_node_name;
+ *event = std::move(msg_queue.back());
+ msg_queue.pop_back();
+ if (msg_queue.size() == 0) {
+ control_event_queues_.erase(it);
+ }
+}
+
+void Port::BufferEvent(const NodeName& from_node, ScopedEvent event) {
+ DCHECK(!IsNextEvent(from_node, *event));
+
+ auto& event_heap = control_event_queues_[{from_node, event->from_port()}];
+ event_heap.emplace_back(std::move(event));
+ std::push_heap(event_heap.begin(), event_heap.end());
+}
+
+void Port::TakePendingMessages(
+ std::vector<mozilla::UniquePtr<UserMessageEvent>>& messages) {
+ for (auto& node_queue_pair : control_event_queues_) {
+ auto& events = node_queue_pair.second;
+ for (auto& event : events) {
+ if (event->type() != Event::Type::kUserMessage) continue;
+ messages.emplace_back(Event::Cast<UserMessageEvent>(&event));
+ }
+ }
+ control_event_queues_.clear();
+}
+
+} // namespace ports
+} // namespace core
+} // namespace mojo
diff --git a/ipc/chromium/src/mojo/core/ports/port.h b/ipc/chromium/src/mojo/core/ports/port.h
new file mode 100644
index 0000000000..44529ddb6f
--- /dev/null
+++ b/ipc/chromium/src/mojo/core/ports/port.h
@@ -0,0 +1,277 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_CORE_PORTS_PORT_H_
+#define MOJO_CORE_PORTS_PORT_H_
+
+#include <map>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "mojo/core/ports/event.h"
+#include "mojo/core/ports/message_queue.h"
+#include "mojo/core/ports/user_data.h"
+#include "mozilla/Atomics.h"
+#include "mozilla/PlatformMutex.h"
+#include "mozilla/RefPtr.h"
+#include "nsISupportsImpl.h"
+
+namespace mojo {
+namespace core {
+namespace ports {
+
+class PortLocker;
+
+namespace detail {
+
+// Ports cannot use mozilla::Mutex, as the acquires-before relationships handled
+// by PortLocker can overload the debug-only deadlock detector.
+class MOZ_CAPABILITY("mutex") PortMutex : private ::mozilla::detail::MutexImpl {
+ public:
+ void AssertCurrentThreadOwns() const MOZ_ASSERT_CAPABILITY(this) {
+#ifdef DEBUG
+ MOZ_ASSERT(mOwningThread == PR_GetCurrentThread());
+#endif
+ }
+
+ private:
+ // PortMutex should only be locked/unlocked via PortLocker
+ friend class ::mojo::core::ports::PortLocker;
+
+ void Lock() MOZ_CAPABILITY_ACQUIRE() {
+ ::mozilla::detail::MutexImpl::lock();
+#ifdef DEBUG
+ mOwningThread = PR_GetCurrentThread();
+#endif
+ }
+ void Unlock() MOZ_CAPABILITY_RELEASE() {
+#ifdef DEBUG
+ MOZ_ASSERT(mOwningThread == PR_GetCurrentThread());
+ mOwningThread = nullptr;
+#endif
+ ::mozilla::detail::MutexImpl::unlock();
+ }
+
+#ifdef DEBUG
+ mozilla::Atomic<PRThread*, mozilla::Relaxed> mOwningThread;
+#endif
+};
+
+} // namespace detail
+
+// A Port is essentially a node in a circular list of addresses. For the sake of
+// this documentation such a list will henceforth be referred to as a "route."
+// Routes are the fundamental medium upon which all Node event circulation takes
+// place and are thus the backbone of all Mojo message passing.
+//
+// Each Port is identified by a 128-bit address within a Node (see node.h). A
+// Port doesn't really *do* anything per se: it's a named collection of state,
+// and its owning Node manages all event production, transmission, routing, and
+// processing logic. See Node for more details on how Ports may be used to
+// transmit arbitrary user messages as well as other Ports.
+//
+// Ports may be in any of a handful of states (see State below) which dictate
+// how they react to system events targeting them. In the simplest and most
+// common case, Ports are initially created as an entangled pair (i.e. a simple
+// cycle consisting of two Ports) both in the |kReceiving| State. Consider Ports
+// we'll label |A| and |B| here, which may be created using
+// Node::CreatePortPair():
+//
+// +-----+ +-----+
+// | |--------->| |
+// | A | | B |
+// | |<---------| |
+// +-----+ +-----+
+//
+// |A| references |B| via |peer_node_name| and |peer_port_name|, while |B| in
+// turn references |A|. Note that a Node is NEVER aware of who is sending events
+// to a given Port; it is only aware of where it must route events FROM a given
+// Port.
+//
+// For the sake of documentation, we refer to one receiving port in a route as
+// the "conjugate" of the other. A receiving port's conjugate is also its peer
+// upon initial creation, but because of proxying this may not be the case at a
+// later time.
+//
+// ALL access to this data structure must be guarded by |lock_| acquisition,
+// which is only possible using a PortLocker. PortLocker ensures that
+// overlapping Port lock acquisitions on a single thread are always acquired in
+// a globally consistent order.
+class Port {
+ NS_INLINE_DECL_THREADSAFE_REFCOUNTING(Port)
+
+ public:
+ // The state of a given Port. A Port may only exist in one of these states at
+ // any given time.
+ enum State {
+ // The Port is not yet paired with a peer and is therefore unusable. See
+ // Node::CreateUninitializedPort and Node::InitializePort for motivation.
+ kUninitialized,
+
+ // The Port is publicly visible outside of its Node and may be used to send
+ // and receive user messages. There are always AT MOST two |kReceiving|
+ // Ports along any given route. A user message event sent from a receiving
+ // port is always circulated along the Port's route until it reaches either
+ // a dead-end -- in which case the route is broken -- or it reaches the
+ // other receiving Port in the route -- in which case it lands in that
+    // Port's incoming message queue which can be read by user code.
+ kReceiving,
+
+ // The Port has been taken out of the |kReceiving| state in preparation for
+ // proxying to a new destination. A Port enters this state immediately when
+ // it's attached to a user message and may only leave this state when
+ // transitioning to |kProxying|. See Node for more details.
+ kBuffering,
+
+ // The Port is forwarding all user messages (and most other events) to its
+ // peer without discretion. Ports in the |kProxying| state may never leave
+ // this state and only exist temporarily until their owning Node has
+ // established that no more events will target them. See Node for more
+ // details.
+ kProxying,
+
+ // The Port has been closed and is now permanently unusable. Only
+ // |kReceiving| ports can be closed.
+ kClosed
+ };
+
+ // The current State of the Port.
+ State state;
+
+ // The Node and Port address to which events should be routed FROM this Port.
+ // Note that this is NOT necessarily the address of the Port currently sending
+ // events TO this Port.
+ NodeName peer_node_name;
+ PortName peer_port_name;
+
+ // We keep track of the port that is currently sending messages to this port.
+ // This allows us to verify that the sender node is allowed to send messages
+ // to this port as a mitigation against info leak vulnerabilities.
+ // Tracking the previous port has the nice side effect of keeping received
+ // messages in order.
+ NodeName prev_node_name;
+ PortName prev_port_name;
+
+  // Whether this port has been marked to be merged.
+ bool pending_merge_peer;
+
+  // The next control-event sequence number to send to the peer, and the next
+  // control-event sequence number expected from the previous peer.
+ uint64_t next_control_sequence_num_to_send;
+ uint64_t next_control_sequence_num_to_receive;
+
+ // The next available sequence number to use for outgoing user message events
+ // originating from this port.
+ uint64_t next_sequence_num_to_send;
+
+ // The largest acknowledged user message event sequence number.
+ uint64_t last_sequence_num_acknowledged;
+
+ // The interval for which acknowledge requests will be sent. A value of N will
+ // cause an acknowledge request for |last_sequence_num_acknowledged| + N when
+ // initially set and on received acknowledge. This means that the lower bound
+ // for unread or in-transit messages is |next_sequence_num_to_send| -
+  // |last_sequence_num_acknowledged| + |sequence_num_acknowledge_interval|.
+ // If zero, no acknowledge requests are sent.
+ uint64_t sequence_num_acknowledge_interval;
+
+ // The sequence number of the last message this Port should ever expect to
+ // receive in its lifetime. May be used to determine that a proxying port is
+ // ready to be destroyed or that a receiving port's conjugate has been closed
+ // and we know the sequence number of the last message it sent.
+ uint64_t last_sequence_num_to_receive;
+
+ // The sequence number of the message for which this Port should send an
+ // acknowledge message. In the buffering state, holds the acknowledge request
+ // value that is forwarded to the peer on transition to proxying.
+ // This is zero in any port that's never received an acknowledge request, and
+ // in proxies that have forwarded a stored acknowledge.
+ uint64_t sequence_num_to_acknowledge;
+
+ // The queue of incoming user messages received by this Port. Only non-empty
+ // for buffering or receiving Ports. When a buffering port enters the proxying
+ // state, it flushes its queue and the proxy then bypasses the queue
+ // indefinitely.
+ //
+ // A receiving port's queue only has elements removed by user code reading
+ // messages from the port.
+ //
+ // Note that this is a priority queue which only exposes messages to consumers
+ // in strict sequential order.
+ MessageQueue message_queue;
+
+ // Buffer outgoing control messages while this port is in kBuffering state.
+ std::vector<std::pair<NodeName, ScopedEvent>> control_message_queue;
+
+ // In some edge cases, a Node may need to remember to route a single special
+ // event upon destruction of this (proxying) Port. That event is stashed here
+ // in the interim.
+ mozilla::UniquePtr<std::pair<NodeName, ScopedEvent>> send_on_proxy_removal;
+
+ // Arbitrary user data attached to the Port. In practice, Mojo uses this to
+ // stash an observer interface which can be notified about various Port state
+ // changes.
+ RefPtr<UserData> user_data;
+
+ // Indicates that this (proxying) Port has received acknowledgement that no
+ // new user messages will be routed to it. If |true|, the proxy will be
+ // removed once it has received and forwarded all sequenced messages up to and
+ // including the one numbered |last_sequence_num_to_receive|.
+ bool remove_proxy_on_last_message;
+
+ // Indicates that this Port is aware that its nearest (in terms of forward,
+ // non-zero cyclic routing distance) receiving Port has been closed.
+ bool peer_closed;
+
+ // Indicates that this Port lost its peer unexpectedly (e.g. via process death
+ // rather than receiving an ObserveClosure event). In this case
+ // |peer_closed| will be true but |last_sequence_num_to_receive| cannot be
+  // known. Such ports will continue to make messages available until their
+ // message queue is empty.
+ bool peer_lost_unexpectedly;
+
+ Port(uint64_t next_sequence_num_to_send,
+ uint64_t next_sequence_num_to_receive);
+
+ Port(const Port&) = delete;
+ void operator=(const Port&) = delete;
+
+ void AssertLockAcquired() { lock_.AssertCurrentThreadOwns(); }
+
+ // Check if the given event should be handled next based on the sequence
+ // number and sender peer.
+ bool IsNextEvent(const NodeName& from_node, const Event& event);
+
+ // Get the next buffered event to be processed. If none is available, |event|
+ // will not be modified.
+ void NextEvent(NodeName* from_node, ScopedEvent* event);
+
+ // Buffer the event for later processing.
+ void BufferEvent(const NodeName& from_node, ScopedEvent event);
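+
+  // For example (sketch), a caller delivering control events in order might
+  // use these as follows:
+  //
+  //   if (port->IsNextEvent(from_node, *event)) {
+  //     // ... handle |event|, then drain any buffered events that are now
+  //     // ready via NextEvent() ...
+  //     NodeName next_node;
+  //     ScopedEvent next;
+  //     port->NextEvent(&next_node, &next);
+  //   } else {
+  //     port->BufferEvent(from_node, std::move(event));
+  //   }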
+
+ // Flushes the queue of events pending peer verification and returns all user
+  // message events.
+ void TakePendingMessages(
+ std::vector<mozilla::UniquePtr<UserMessageEvent>>& messages);
+
+ private:
+ using NodePortPair = std::pair<NodeName, PortName>;
+ using EventQueue = std::vector<mozilla::UniquePtr<Event>>;
+ std::map<NodePortPair, EventQueue> control_event_queues_;
+
+ friend class PortLocker;
+
+ ~Port();
+
+ // This lock guards all fields in Port, but is locked in a unique way which is
+ // unfortunately somewhat difficult to get to work with the thread-safety
+ // analysis.
+ detail::PortMutex lock_ MOZ_ANNOTATED;
+};
+
+} // namespace ports
+} // namespace core
+} // namespace mojo
+
+#endif // MOJO_CORE_PORTS_PORT_H_
diff --git a/ipc/chromium/src/mojo/core/ports/port_locker.cc b/ipc/chromium/src/mojo/core/ports/port_locker.cc
new file mode 100644
index 0000000000..044a26f143
--- /dev/null
+++ b/ipc/chromium/src/mojo/core/ports/port_locker.cc
@@ -0,0 +1,75 @@
+// Copyright 2017 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/core/ports/port_locker.h"
+
+#include <algorithm>
+
+#include "mojo/core/ports/port.h"
+
+#ifdef DEBUG
+# include "base/thread_local.h"
+#endif
+
+namespace mojo {
+namespace core {
+namespace ports {
+
+namespace {
+
+#ifdef DEBUG
+void UpdateTLS(PortLocker* old_locker, PortLocker* new_locker) {
+  // Sanity check when DEBUG is on to make sure there is only ever one
+ // PortLocker extant on the current thread.
+ static auto* tls = new base::ThreadLocalPointer<PortLocker>();
+ DCHECK_EQ(old_locker, tls->Get());
+ tls->Set(new_locker);
+}
+#endif
+
+} // namespace
+
+PortLocker::PortLocker(const PortRef** port_refs, size_t num_ports)
+ : port_refs_(port_refs), num_ports_(num_ports) {
+#ifdef DEBUG
+ UpdateTLS(nullptr, this);
+#endif
+
+ // Sort the ports by address to lock them in a globally consistent order.
+ std::sort(
+ port_refs_, port_refs_ + num_ports_,
+ [](const PortRef* a, const PortRef* b) { return a->port() < b->port(); });
+ for (size_t i = 0; i < num_ports_; ++i) {
+ // TODO(crbug.com/725605): Remove this CHECK.
+ CHECK(port_refs_[i]->port());
+ port_refs_[i]->port()->lock_.Lock();
+ }
+}
+
+PortLocker::~PortLocker() {
+ for (size_t i = 0; i < num_ports_; ++i) {
+ port_refs_[i]->port()->lock_.Unlock();
+ }
+
+#ifdef DEBUG
+ UpdateTLS(this, nullptr);
+#endif
+}
+
+#ifdef DEBUG
+// static
+void PortLocker::AssertNoPortsLockedOnCurrentThread() {
+ // Forces a DCHECK if the TLS PortLocker is anything other than null.
+ UpdateTLS(nullptr, nullptr);
+}
+#endif
+
+SinglePortLocker::SinglePortLocker(const PortRef* port_ref)
+ : port_ref_(port_ref), locker_(&port_ref_, 1) {}
+
+SinglePortLocker::~SinglePortLocker() = default;
+
+} // namespace ports
+} // namespace core
+} // namespace mojo
diff --git a/ipc/chromium/src/mojo/core/ports/port_locker.h b/ipc/chromium/src/mojo/core/ports/port_locker.h
new file mode 100644
index 0000000000..6a88d14912
--- /dev/null
+++ b/ipc/chromium/src/mojo/core/ports/port_locker.h
@@ -0,0 +1,90 @@
+// Copyright 2017 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_CORE_PORTS_PORT_LOCKER_H_
+#define MOJO_CORE_PORTS_PORT_LOCKER_H_
+
+#include <stdint.h>
+
+#include "base/logging.h"
+#include "mojo/core/ports/port_ref.h"
+
+namespace mojo {
+namespace core {
+namespace ports {
+
+class Port;
+class PortRef;
+
+// A helper which must be used to acquire individual Port locks. Any given
+// thread may have at most one of these alive at any time. This ensures that
+// when multiple ports are locked, they're locked in globally consistent order.
+//
+// Port locks are acquired upon construction of this object and released upon
+// destruction.
+class PortLocker {
+ public:
+ // Constructs a PortLocker over a sequence of |num_ports| contiguous
+ // |PortRef*|s. The sequence may be reordered by this constructor, and upon
+ // return, all referenced ports' locks are held.
+ PortLocker(const PortRef** port_refs, size_t num_ports);
+ ~PortLocker();
+
+ PortLocker(const PortLocker&) = delete;
+ void operator=(const PortLocker&) = delete;
+
+ // Provides safe access to a PortRef's Port. Note that in release builds this
+ // doesn't do anything other than pass through to the private accessor on
+ // |port_ref|, but it does force callers to go through a PortLocker to get to
+ // the state, thus minimizing the likelihood that they'll go and do something
+ // stupid.
+ Port* GetPort(const PortRef& port_ref) const {
+#ifdef DEBUG
+ // Sanity check when DEBUG is on to ensure this is actually a port whose
+ // lock is held by this PortLocker.
+ bool is_port_locked = false;
+ for (size_t i = 0; i < num_ports_ && !is_port_locked; ++i) {
+ if (port_refs_[i]->port() == port_ref.port()) {
+ is_port_locked = true;
+ }
+ }
+ DCHECK(is_port_locked);
+#endif
+ return port_ref.port();
+ }
+
+// A helper which can be used to verify that no Port locks are held on the
+// current thread. In non-DEBUG builds this is a no-op.
+#ifdef DEBUG
+ static void AssertNoPortsLockedOnCurrentThread();
+#else
+ static void AssertNoPortsLockedOnCurrentThread() {}
+#endif
+
+ private:
+ const PortRef** const port_refs_;
+ const size_t num_ports_;
+};
+
+// Convenience wrapper for a PortLocker that locks a single port.
+class SinglePortLocker {
+ public:
+ explicit SinglePortLocker(const PortRef* port_ref);
+ ~SinglePortLocker();
+
+ SinglePortLocker(const SinglePortLocker&) = delete;
+ void operator=(const SinglePortLocker&) = delete;
+
+ Port* port() const { return locker_.GetPort(*port_ref_); }
+
+ private:
+ const PortRef* port_ref_;
+ PortLocker locker_;
+};
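+
+// Typical use (sketch): lock a single port and inspect its state.
+//
+//   {
+//     SinglePortLocker locker(&port_ref);
+//     bool peer_closed = locker.port()->peer_closed;
+//   }  // lock released when |locker| goes out of scope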
+
+} // namespace ports
+} // namespace core
+} // namespace mojo
+
+#endif // MOJO_CORE_PORTS_PORT_LOCKER_H_
diff --git a/ipc/chromium/src/mojo/core/ports/port_ref.cc b/ipc/chromium/src/mojo/core/ports/port_ref.cc
new file mode 100644
index 0000000000..b9267f8a6a
--- /dev/null
+++ b/ipc/chromium/src/mojo/core/ports/port_ref.cc
@@ -0,0 +1,30 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/core/ports/port_ref.h"
+
+#include "mojo/core/ports/port.h"
+
+namespace mojo {
+namespace core {
+namespace ports {
+
+PortRef::~PortRef() = default;
+
+PortRef::PortRef() = default;
+
+PortRef::PortRef(const PortName& name, RefPtr<Port> port)
+ : name_(name), port_(std::move(port)) {}
+
+PortRef::PortRef(const PortRef& other) = default;
+
+PortRef::PortRef(PortRef&& other) = default;
+
+PortRef& PortRef::operator=(const PortRef& other) = default;
+
+PortRef& PortRef::operator=(PortRef&& other) = default;
+
+} // namespace ports
+} // namespace core
+} // namespace mojo
diff --git a/ipc/chromium/src/mojo/core/ports/port_ref.h b/ipc/chromium/src/mojo/core/ports/port_ref.h
new file mode 100644
index 0000000000..0ff0d32fc7
--- /dev/null
+++ b/ipc/chromium/src/mojo/core/ports/port_ref.h
@@ -0,0 +1,47 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_CORE_PORTS_PORT_REF_H_
+#define MOJO_CORE_PORTS_PORT_REF_H_
+
+#include "mojo/core/ports/name.h"
+#include "mozilla/RefPtr.h"
+
+namespace mojo {
+namespace core {
+namespace ports {
+
+class Port;
+class PortLocker;
+
+class PortRef {
+ public:
+ ~PortRef();
+ PortRef();
+ PortRef(const PortName& name, RefPtr<Port> port);
+
+ PortRef(const PortRef& other);
+ PortRef(PortRef&& other);
+
+ PortRef& operator=(const PortRef& other);
+ PortRef& operator=(PortRef&& other);
+
+ const PortName& name() const { return name_; }
+
+ bool is_valid() const { return !!port_; }
+
+ private:
+ friend class PortLocker;
+
+ Port* port() const { return port_.get(); }
+
+ PortName name_;
+ RefPtr<Port> port_;
+};
+
+} // namespace ports
+} // namespace core
+} // namespace mojo
+
+#endif // MOJO_CORE_PORTS_PORT_REF_H_
diff --git a/ipc/chromium/src/mojo/core/ports/user_data.h b/ipc/chromium/src/mojo/core/ports/user_data.h
new file mode 100644
index 0000000000..f3d568c259
--- /dev/null
+++ b/ipc/chromium/src/mojo/core/ports/user_data.h
@@ -0,0 +1,25 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_CORE_PORTS_USER_DATA_H_
+#define MOJO_CORE_PORTS_USER_DATA_H_
+
+#include "nsISupportsImpl.h"
+
+namespace mojo {
+namespace core {
+namespace ports {
+
+class UserData {
+ NS_INLINE_DECL_THREADSAFE_REFCOUNTING(UserData);
+
+ protected:
+ virtual ~UserData() = default;
+};
+
+} // namespace ports
+} // namespace core
+} // namespace mojo
+
+#endif // MOJO_CORE_PORTS_USER_DATA_H_
diff --git a/ipc/chromium/src/mojo/core/ports/user_message.cc b/ipc/chromium/src/mojo/core/ports/user_message.cc
new file mode 100644
index 0000000000..1aadb21425
--- /dev/null
+++ b/ipc/chromium/src/mojo/core/ports/user_message.cc
@@ -0,0 +1,21 @@
+// Copyright 2017 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/core/ports/user_message.h"
+
+namespace mojo {
+namespace core {
+namespace ports {
+
+UserMessage::UserMessage(const TypeInfo* type_info) : type_info_(type_info) {}
+
+UserMessage::~UserMessage() = default;
+
+bool UserMessage::WillBeRoutedExternally(UserMessageEvent&) { return true; }
+
+size_t UserMessage::GetSizeIfSerialized() const { return 0; }
+
+} // namespace ports
+} // namespace core
+} // namespace mojo
diff --git a/ipc/chromium/src/mojo/core/ports/user_message.h b/ipc/chromium/src/mojo/core/ports/user_message.h
new file mode 100644
index 0000000000..85c03a9557
--- /dev/null
+++ b/ipc/chromium/src/mojo/core/ports/user_message.h
@@ -0,0 +1,62 @@
+// Copyright 2017 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_CORE_PORTS_USER_MESSAGE_H_
+#define MOJO_CORE_PORTS_USER_MESSAGE_H_
+
+#include <stddef.h>
+
+namespace mojo {
+namespace core {
+namespace ports {
+
+class UserMessageEvent;
+
+// Base type to use for any embedder-defined user message implementation. This
+// class is intentionally empty.
+//
+// Provides a bit of type-safety help to subclasses since by design downcasting
+// from this type is a common operation in embedders.
+//
+// Each subclass should define a static const instance of TypeInfo named
+// |kUserMessageTypeInfo| and pass its address down to the UserMessage
+// constructor. The type of a UserMessage can then be dynamically inspected by
+// comparing |type_info()| to any subclass's |&kUserMessageTypeInfo|.
+class UserMessage {
+ public:
+ struct TypeInfo {};
+
+ explicit UserMessage(const TypeInfo* type_info);
+ virtual ~UserMessage();
+
+ UserMessage(const UserMessage&) = delete;
+ void operator=(const UserMessage&) = delete;
+
+ const TypeInfo* type_info() const { return type_info_; }
+
+ // Invoked immediately before the system asks the embedder to forward this
+ // message to an external node.
+ //
+ // The UserMessageEvent is passed in to allow ports and other such values to
+ // be attached to the message before it is sent externally, in case late
+ // serialization is performed.
+ //
+ // Returns |true| if the message is OK to route externally, or |false|
+ // otherwise. Returning |false| implies an unrecoverable condition, and the
+ // message event will be destroyed without further routing.
+ virtual bool WillBeRoutedExternally(UserMessageEvent& event);
+
+ // Returns the size in bytes of this message iff it's serialized. Zero
+ // otherwise.
+ virtual size_t GetSizeIfSerialized() const;
+
+ private:
+ const TypeInfo* const type_info_;
+};
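+
+// For example (sketch), an embedder-defined message type following the
+// |kUserMessageTypeInfo| convention described above:
+//
+//   class MyMessage : public UserMessage {
+//    public:
+//     static const TypeInfo kUserMessageTypeInfo;
+//     MyMessage() : UserMessage(&kUserMessageTypeInfo) {}
+//   };
+//   const UserMessage::TypeInfo MyMessage::kUserMessageTypeInfo = {};
+//
+//   // Type checking before a downcast:
+//   if (message->type_info() == &MyMessage::kUserMessageTypeInfo) {
+//     auto* my_message = static_cast<MyMessage*>(message);
+//   }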
+
+} // namespace ports
+} // namespace core
+} // namespace mojo
+
+#endif // MOJO_CORE_PORTS_USER_MESSAGE_H_