diff options
Diffstat (limited to 'ipc/chromium/src/mojo')
19 files changed, 3945 insertions, 0 deletions
diff --git a/ipc/chromium/src/mojo/core/ports/event.cc b/ipc/chromium/src/mojo/core/ports/event.cc new file mode 100644 index 0000000000..32e0bbbaa8 --- /dev/null +++ b/ipc/chromium/src/mojo/core/ports/event.cc @@ -0,0 +1,471 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/core/ports/event.h" + +#include <stdint.h> +#include <string.h> + +#include "base/logging.h" +#include "mojo/core/ports/user_message.h" +#include "mozilla/Assertions.h" +#include "mozilla/CheckedInt.h" + +namespace mojo { +namespace core { +namespace ports { + +namespace { + +const size_t kPortsMessageAlignment = 8; + +#pragma pack(push, 1) + +struct SerializedHeader { + Event::Type type; + uint32_t padding; + PortName port_name; +}; + +struct UserMessageEventData { + uint64_t sequence_num; + uint32_t num_ports; + uint32_t padding; +}; + +struct ObserveProxyEventData { + NodeName proxy_node_name; + PortName proxy_port_name; + NodeName proxy_target_node_name; + PortName proxy_target_port_name; +}; + +struct ObserveProxyAckEventData { + uint64_t last_sequence_num; +}; + +struct ObserveClosureEventData { + uint64_t last_sequence_num; +}; + +struct MergePortEventData { + PortName new_port_name; + Event::PortDescriptor new_port_descriptor; +}; + +struct UserMessageReadAckRequestEventData { + uint64_t sequence_num_to_acknowledge; +}; + +struct UserMessageReadAckEventData { + uint64_t sequence_num_acknowledged; +}; + +#pragma pack(pop) + +static_assert(sizeof(Event::PortDescriptor) % kPortsMessageAlignment == 0, + "Invalid PortDescriptor size."); + +static_assert(sizeof(SerializedHeader) % kPortsMessageAlignment == 0, + "Invalid SerializedHeader size."); + +static_assert(sizeof(UserMessageEventData) % kPortsMessageAlignment == 0, + "Invalid UserEventData size."); + +static_assert(sizeof(ObserveProxyEventData) % kPortsMessageAlignment == 0, + "Invalid ObserveProxyEventData size."); + +static_assert(sizeof(ObserveProxyAckEventData) % kPortsMessageAlignment == 0, + "Invalid ObserveProxyAckEventData size."); + +static_assert(sizeof(ObserveClosureEventData) % kPortsMessageAlignment == 0, + "Invalid ObserveClosureEventData size."); + +static_assert(sizeof(MergePortEventData) % kPortsMessageAlignment == 0, + "Invalid MergePortEventData size."); + +static_assert(sizeof(UserMessageReadAckRequestEventData) % + kPortsMessageAlignment == + 0, + "Invalid UserMessageReadAckRequestEventData size."); + +static_assert(sizeof(UserMessageReadAckEventData) % kPortsMessageAlignment == 0, + "Invalid UserMessageReadAckEventData size."); + +} // namespace + +Event::PortDescriptor::PortDescriptor() { memset(padding, 0, sizeof(padding)); } + +Event::~Event() = default; + +// static +ScopedEvent Event::Deserialize(const void* buffer, size_t num_bytes) { + if (num_bytes < sizeof(SerializedHeader)) { + return nullptr; + } + + const auto* header = static_cast<const SerializedHeader*>(buffer); + const PortName& port_name = header->port_name; + const size_t data_size = num_bytes - sizeof(*header); + switch (header->type) { + case Type::kUserMessage: + return UserMessageEvent::Deserialize(port_name, header + 1, data_size); + case Type::kPortAccepted: + return PortAcceptedEvent::Deserialize(port_name, header + 1, data_size); + case Type::kObserveProxy: + return ObserveProxyEvent::Deserialize(port_name, header + 1, data_size); + case Type::kObserveProxyAck: + return ObserveProxyAckEvent::Deserialize(port_name, header + 1, + data_size); + case Type::kObserveClosure: + return ObserveClosureEvent::Deserialize(port_name, header + 1, data_size); + case Type::kMergePort: + return MergePortEvent::Deserialize(port_name, header + 1, data_size); + case Type::kUserMessageReadAckRequest: + return UserMessageReadAckRequestEvent::Deserialize(port_name, header + 1, + data_size); + case Type::kUserMessageReadAck: + return UserMessageReadAckEvent::Deserialize(port_name, header + 1, + data_size); + default: + DVLOG(2) << "Ingoring unknown port event type: " + << static_cast<uint32_t>(header->type); + return nullptr; + } +} + +Event::Event(Type type, const PortName& port_name) + : type_(type), port_name_(port_name) {} + +size_t Event::GetSerializedSize() const { + return sizeof(SerializedHeader) + GetSerializedDataSize(); +} + +void Event::Serialize(void* buffer) const { + auto* header = static_cast<SerializedHeader*>(buffer); + header->type = type_; + header->padding = 0; + header->port_name = port_name_; + SerializeData(header + 1); +} + +ScopedEvent Event::Clone() const { return nullptr; } + +UserMessageEvent::~UserMessageEvent() = default; + +UserMessageEvent::UserMessageEvent(size_t num_ports) + : Event(Type::kUserMessage, kInvalidPortName) { + ReservePorts(num_ports); +} + +void UserMessageEvent::AttachMessage(mozilla::UniquePtr<UserMessage> message) { + DCHECK(!message_); + message_ = std::move(message); +} + +void UserMessageEvent::ReservePorts(size_t num_ports) { + port_descriptors_.resize(num_ports); + ports_.resize(num_ports); +} + +bool UserMessageEvent::NotifyWillBeRoutedExternally() { + DCHECK(message_); + return message_->WillBeRoutedExternally(*this); +} + +// static +ScopedEvent UserMessageEvent::Deserialize(const PortName& port_name, + const void* buffer, + size_t num_bytes) { + if (num_bytes < sizeof(UserMessageEventData)) { + return nullptr; + } + + const auto* data = static_cast<const UserMessageEventData*>(buffer); + mozilla::CheckedInt<size_t> port_data_size = data->num_ports; + port_data_size *= sizeof(PortDescriptor) + sizeof(PortName); + if (!port_data_size.isValid()) { + return nullptr; + } + + mozilla::CheckedInt<size_t> total_size = port_data_size.value(); + total_size += sizeof(UserMessageEventData); + if (!total_size.isValid() || num_bytes < total_size.value()) { + return nullptr; + } + + auto event = + mozilla::WrapUnique(new UserMessageEvent(port_name, data->sequence_num)); + event->ReservePorts(data->num_ports); + const auto* in_descriptors = + reinterpret_cast<const PortDescriptor*>(data + 1); + std::copy(in_descriptors, in_descriptors + data->num_ports, + event->port_descriptors()); + + const auto* in_names = + reinterpret_cast<const PortName*>(in_descriptors + data->num_ports); + std::copy(in_names, in_names + data->num_ports, event->ports()); + return event; +} + +UserMessageEvent::UserMessageEvent(const PortName& port_name, + uint64_t sequence_num) + : Event(Type::kUserMessage, port_name), sequence_num_(sequence_num) {} + +size_t UserMessageEvent::GetSizeIfSerialized() const { + if (!message_) { + return 0; + } + return message_->GetSizeIfSerialized(); +} + +size_t UserMessageEvent::GetSerializedDataSize() const { + DCHECK_EQ(ports_.size(), port_descriptors_.size()); + mozilla::CheckedInt<size_t> size = sizeof(UserMessageEventData); + mozilla::CheckedInt<size_t> ports_size = + sizeof(PortDescriptor) + sizeof(PortName); + ports_size *= ports_.size(); + mozilla::CheckedInt<size_t> combined = size + ports_size; + MOZ_RELEASE_ASSERT(combined.isValid()); + return combined.value(); +} + +void UserMessageEvent::SerializeData(void* buffer) const { + DCHECK_EQ(ports_.size(), port_descriptors_.size()); + auto* data = static_cast<UserMessageEventData*>(buffer); + data->sequence_num = sequence_num_; + mozilla::CheckedInt<uint32_t> num_ports{ports_.size()}; + DCHECK(num_ports.isValid()); + data->num_ports = num_ports.value(); + data->padding = 0; + + auto* ports_data = reinterpret_cast<PortDescriptor*>(data + 1); + std::copy(port_descriptors_.begin(), port_descriptors_.end(), ports_data); + + auto* port_names_data = + reinterpret_cast<PortName*>(ports_data + ports_.size()); + std::copy(ports_.begin(), ports_.end(), port_names_data); +} + +PortAcceptedEvent::PortAcceptedEvent(const PortName& port_name) + : Event(Type::kPortAccepted, port_name) {} + +PortAcceptedEvent::~PortAcceptedEvent() = default; + +// static +ScopedEvent PortAcceptedEvent::Deserialize(const PortName& port_name, + const void* buffer, + size_t num_bytes) { + return mozilla::MakeUnique<PortAcceptedEvent>(port_name); +} + +size_t PortAcceptedEvent::GetSerializedDataSize() const { return 0; } + +void PortAcceptedEvent::SerializeData(void* buffer) const {} + +ObserveProxyEvent::ObserveProxyEvent(const PortName& port_name, + const NodeName& proxy_node_name, + const PortName& proxy_port_name, + const NodeName& proxy_target_node_name, + const PortName& proxy_target_port_name) + : Event(Type::kObserveProxy, port_name), + proxy_node_name_(proxy_node_name), + proxy_port_name_(proxy_port_name), + proxy_target_node_name_(proxy_target_node_name), + proxy_target_port_name_(proxy_target_port_name) {} + +ObserveProxyEvent::~ObserveProxyEvent() = default; + +// static +ScopedEvent ObserveProxyEvent::Deserialize(const PortName& port_name, + const void* buffer, + size_t num_bytes) { + if (num_bytes < sizeof(ObserveProxyEventData)) { + return nullptr; + } + + const auto* data = static_cast<const ObserveProxyEventData*>(buffer); + return mozilla::MakeUnique<ObserveProxyEvent>( + port_name, data->proxy_node_name, data->proxy_port_name, + data->proxy_target_node_name, data->proxy_target_port_name); +} + +size_t ObserveProxyEvent::GetSerializedDataSize() const { + return sizeof(ObserveProxyEventData); +} + +void ObserveProxyEvent::SerializeData(void* buffer) const { + auto* data = static_cast<ObserveProxyEventData*>(buffer); + data->proxy_node_name = proxy_node_name_; + data->proxy_port_name = proxy_port_name_; + data->proxy_target_node_name = proxy_target_node_name_; + data->proxy_target_port_name = proxy_target_port_name_; +} + +ScopedEvent ObserveProxyEvent::Clone() const { + return mozilla::MakeUnique<ObserveProxyEvent>( + port_name(), proxy_node_name_, proxy_port_name_, proxy_target_node_name_, + proxy_target_port_name_); +} + +ObserveProxyAckEvent::ObserveProxyAckEvent(const PortName& port_name, + uint64_t last_sequence_num) + : Event(Type::kObserveProxyAck, port_name), + last_sequence_num_(last_sequence_num) {} + +ObserveProxyAckEvent::~ObserveProxyAckEvent() = default; + +// static +ScopedEvent ObserveProxyAckEvent::Deserialize(const PortName& port_name, + const void* buffer, + size_t num_bytes) { + if (num_bytes < sizeof(ObserveProxyAckEventData)) { + return nullptr; + } + + const auto* data = static_cast<const ObserveProxyAckEventData*>(buffer); + return mozilla::MakeUnique<ObserveProxyAckEvent>(port_name, + data->last_sequence_num); +} + +size_t ObserveProxyAckEvent::GetSerializedDataSize() const { + return sizeof(ObserveProxyAckEventData); +} + +void ObserveProxyAckEvent::SerializeData(void* buffer) const { + auto* data = static_cast<ObserveProxyAckEventData*>(buffer); + data->last_sequence_num = last_sequence_num_; +} + +ScopedEvent ObserveProxyAckEvent::Clone() const { + return mozilla::MakeUnique<ObserveProxyAckEvent>(port_name(), + last_sequence_num_); +} + +ObserveClosureEvent::ObserveClosureEvent(const PortName& port_name, + uint64_t last_sequence_num) + : Event(Type::kObserveClosure, port_name), + last_sequence_num_(last_sequence_num) {} + +ObserveClosureEvent::~ObserveClosureEvent() = default; + +// static +ScopedEvent ObserveClosureEvent::Deserialize(const PortName& port_name, + const void* buffer, + size_t num_bytes) { + if (num_bytes < sizeof(ObserveClosureEventData)) { + return nullptr; + } + + const auto* data = static_cast<const ObserveClosureEventData*>(buffer); + return mozilla::MakeUnique<ObserveClosureEvent>(port_name, + data->last_sequence_num); +} + +size_t ObserveClosureEvent::GetSerializedDataSize() const { + return sizeof(ObserveClosureEventData); +} + +void ObserveClosureEvent::SerializeData(void* buffer) const { + auto* data = static_cast<ObserveClosureEventData*>(buffer); + data->last_sequence_num = last_sequence_num_; +} + +ScopedEvent ObserveClosureEvent::Clone() const { + return mozilla::MakeUnique<ObserveClosureEvent>(port_name(), + last_sequence_num_); +} + +MergePortEvent::MergePortEvent(const PortName& port_name, + const PortName& new_port_name, + const PortDescriptor& new_port_descriptor) + : Event(Type::kMergePort, port_name), + new_port_name_(new_port_name), + new_port_descriptor_(new_port_descriptor) {} + +MergePortEvent::~MergePortEvent() = default; + +// static +ScopedEvent MergePortEvent::Deserialize(const PortName& port_name, + const void* buffer, size_t num_bytes) { + if (num_bytes < sizeof(MergePortEventData)) { + return nullptr; + } + + const auto* data = static_cast<const MergePortEventData*>(buffer); + return mozilla::MakeUnique<MergePortEvent>(port_name, data->new_port_name, + data->new_port_descriptor); +} + +size_t MergePortEvent::GetSerializedDataSize() const { + return sizeof(MergePortEventData); +} + +void MergePortEvent::SerializeData(void* buffer) const { + auto* data = static_cast<MergePortEventData*>(buffer); + data->new_port_name = new_port_name_; + data->new_port_descriptor = new_port_descriptor_; +} + +UserMessageReadAckRequestEvent::UserMessageReadAckRequestEvent( + const PortName& port_name, uint64_t sequence_num_to_acknowledge) + : Event(Type::kUserMessageReadAckRequest, port_name), + sequence_num_to_acknowledge_(sequence_num_to_acknowledge) {} + +UserMessageReadAckRequestEvent::~UserMessageReadAckRequestEvent() = default; + +// static +ScopedEvent UserMessageReadAckRequestEvent::Deserialize( + const PortName& port_name, const void* buffer, size_t num_bytes) { + if (num_bytes < sizeof(UserMessageReadAckRequestEventData)) { + return nullptr; + } + + const auto* data = + static_cast<const UserMessageReadAckRequestEventData*>(buffer); + return mozilla::MakeUnique<UserMessageReadAckRequestEvent>( + port_name, data->sequence_num_to_acknowledge); +} + +size_t UserMessageReadAckRequestEvent::GetSerializedDataSize() const { + return sizeof(UserMessageReadAckRequestEventData); +} + +void UserMessageReadAckRequestEvent::SerializeData(void* buffer) const { + auto* data = static_cast<UserMessageReadAckRequestEventData*>(buffer); + data->sequence_num_to_acknowledge = sequence_num_to_acknowledge_; +} + +UserMessageReadAckEvent::UserMessageReadAckEvent( + const PortName& port_name, uint64_t sequence_num_acknowledged) + : Event(Type::kUserMessageReadAck, port_name), + sequence_num_acknowledged_(sequence_num_acknowledged) {} + +UserMessageReadAckEvent::~UserMessageReadAckEvent() = default; + +// static +ScopedEvent UserMessageReadAckEvent::Deserialize(const PortName& port_name, + const void* buffer, + size_t num_bytes) { + if (num_bytes < sizeof(UserMessageReadAckEventData)) { + return nullptr; + } + + const auto* data = static_cast<const UserMessageReadAckEventData*>(buffer); + return mozilla::MakeUnique<UserMessageReadAckEvent>( + port_name, data->sequence_num_acknowledged); +} + +size_t UserMessageReadAckEvent::GetSerializedDataSize() const { + return sizeof(UserMessageReadAckEventData); +} + +void UserMessageReadAckEvent::SerializeData(void* buffer) const { + auto* data = static_cast<UserMessageReadAckEventData*>(buffer); + data->sequence_num_acknowledged = sequence_num_acknowledged_; +} + +} // namespace ports +} // namespace core +} // namespace mojo diff --git a/ipc/chromium/src/mojo/core/ports/event.h b/ipc/chromium/src/mojo/core/ports/event.h new file mode 100644 index 0000000000..17b330c27f --- /dev/null +++ b/ipc/chromium/src/mojo/core/ports/event.h @@ -0,0 +1,328 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_CORE_PORTS_EVENT_H_ +#define MOJO_CORE_PORTS_EVENT_H_ + +#include <stdint.h> + +#include <memory> +#include <vector> + +#include "mojo/core/ports/name.h" +#include "mojo/core/ports/user_message.h" + +namespace mojo { +namespace core { +namespace ports { + +class Event; + +using ScopedEvent = mozilla::UniquePtr<Event>; + +// A Event is the fundamental unit of operation and communication within and +// between Nodes. +class Event { + public: + enum Type : uint32_t { + // A user message event contains arbitrary user-specified payload data + // which may include any number of ports and/or system handles (e.g. FDs). + kUserMessage, + + // When a Node receives a user message with one or more ports attached, it + // sends back an instance of this event for every attached port to indicate + // that the port has been accepted by its destination node. + kPortAccepted, + + // This event begins circulation any time a port enters a proxying state. It + // may be re-circulated in certain edge cases, but the ultimate purpose of + // the event is to ensure that every port along a route is (if necessary) + // aware that the proxying port is indeed proxying (and to where) so that it + // can begin to be bypassed along the route. + kObserveProxy, + + // An event used to acknowledge to a proxy that all concerned nodes and + // ports are aware of its proxying state and that no more user messages will + // be routed to it beyond a given final sequence number. + kObserveProxyAck, + + // Indicates that a port has been closed. This event fully circulates a + // route to ensure that all ports are aware of closure. + kObserveClosure, + + // Used to request the merging of two routes via two sacrificial receiving + // ports, one from each route. + kMergePort, + + // Used to request that the conjugate port acknowledges read messages by + // sending back a UserMessageReadAck. + kUserMessageReadAckRequest, + + // Used to acknowledge read messages to the conjugate. + kUserMessageReadAck, + }; + +#pragma pack(push, 1) + struct PortDescriptor { + PortDescriptor(); + + NodeName peer_node_name; + PortName peer_port_name; + NodeName referring_node_name; + PortName referring_port_name; + uint64_t next_sequence_num_to_send; + uint64_t next_sequence_num_to_receive; + uint64_t last_sequence_num_to_receive; + bool peer_closed; + char padding[7]; + }; +#pragma pack(pop) + virtual ~Event(); + + static ScopedEvent Deserialize(const void* buffer, size_t num_bytes); + + template <typename T> + static mozilla::UniquePtr<T> Cast(ScopedEvent* event) { + return mozilla::WrapUnique(static_cast<T*>(event->release())); + } + + Type type() const { return type_; } + const PortName& port_name() const { return port_name_; } + void set_port_name(const PortName& port_name) { port_name_ = port_name; } + + size_t GetSerializedSize() const; + void Serialize(void* buffer) const; + virtual ScopedEvent Clone() const; + + protected: + Event(Type type, const PortName& port_name); + + virtual size_t GetSerializedDataSize() const = 0; + virtual void SerializeData(void* buffer) const = 0; + + private: + const Type type_; + PortName port_name_; + + DISALLOW_COPY_AND_ASSIGN(Event); +}; + +class UserMessageEvent : public Event { + public: + explicit UserMessageEvent(size_t num_ports); + ~UserMessageEvent() override; + + bool HasMessage() const { return !!message_; } + void AttachMessage(mozilla::UniquePtr<UserMessage> message); + + template <typename T> + T* GetMessage() { + DCHECK(HasMessage()); + DCHECK_EQ(&T::kUserMessageTypeInfo, message_->type_info()); + return static_cast<T*>(message_.get()); + } + + template <typename T> + const T* GetMessage() const { + DCHECK(HasMessage()); + DCHECK_EQ(&T::kUserMessageTypeInfo, message_->type_info()); + return static_cast<const T*>(message_.get()); + } + + template <typename T> + mozilla::UniquePtr<T> TakeMessage() { + DCHECK(HasMessage()); + DCHECK_EQ(&T::kUserMessageTypeInfo, message_->type_info()); + return mozilla::UniquePtr<T>{static_cast<T*>(message_.release())}; + } + + void ReservePorts(size_t num_ports); + bool NotifyWillBeRoutedExternally(); + + uint64_t sequence_num() const { return sequence_num_; } + void set_sequence_num(uint64_t sequence_num) { sequence_num_ = sequence_num; } + + size_t num_ports() const { return ports_.size(); } + PortDescriptor* port_descriptors() { return port_descriptors_.data(); } + PortName* ports() { return ports_.data(); } + + static ScopedEvent Deserialize(const PortName& port_name, const void* buffer, + size_t num_bytes); + + size_t GetSizeIfSerialized() const; + + private: + UserMessageEvent(const PortName& port_name, uint64_t sequence_num); + + size_t GetSerializedDataSize() const override; + void SerializeData(void* buffer) const override; + + uint64_t sequence_num_ = 0; + std::vector<PortDescriptor> port_descriptors_; + std::vector<PortName> ports_; + mozilla::UniquePtr<UserMessage> message_; + + DISALLOW_COPY_AND_ASSIGN(UserMessageEvent); +}; + +class PortAcceptedEvent : public Event { + public: + explicit PortAcceptedEvent(const PortName& port_name); + ~PortAcceptedEvent() override; + + static ScopedEvent Deserialize(const PortName& port_name, const void* buffer, + size_t num_bytes); + + private: + size_t GetSerializedDataSize() const override; + void SerializeData(void* buffer) const override; + + DISALLOW_COPY_AND_ASSIGN(PortAcceptedEvent); +}; + +class ObserveProxyEvent : public Event { + public: + ObserveProxyEvent(const PortName& port_name, const NodeName& proxy_node_name, + const PortName& proxy_port_name, + const NodeName& proxy_target_node_name, + const PortName& proxy_target_port_name); + ~ObserveProxyEvent() override; + + const NodeName& proxy_node_name() const { return proxy_node_name_; } + const PortName& proxy_port_name() const { return proxy_port_name_; } + const NodeName& proxy_target_node_name() const { + return proxy_target_node_name_; + } + const PortName& proxy_target_port_name() const { + return proxy_target_port_name_; + } + + static ScopedEvent Deserialize(const PortName& port_name, const void* buffer, + size_t num_bytes); + + private: + size_t GetSerializedDataSize() const override; + void SerializeData(void* buffer) const override; + ScopedEvent Clone() const override; + + const NodeName proxy_node_name_; + const PortName proxy_port_name_; + const NodeName proxy_target_node_name_; + const PortName proxy_target_port_name_; + + DISALLOW_COPY_AND_ASSIGN(ObserveProxyEvent); +}; + +class ObserveProxyAckEvent : public Event { + public: + ObserveProxyAckEvent(const PortName& port_name, uint64_t last_sequence_num); + ~ObserveProxyAckEvent() override; + + uint64_t last_sequence_num() const { return last_sequence_num_; } + + static ScopedEvent Deserialize(const PortName& port_name, const void* buffer, + size_t num_bytes); + + private: + size_t GetSerializedDataSize() const override; + void SerializeData(void* buffer) const override; + ScopedEvent Clone() const override; + + const uint64_t last_sequence_num_; + + DISALLOW_COPY_AND_ASSIGN(ObserveProxyAckEvent); +}; + +class ObserveClosureEvent : public Event { + public: + ObserveClosureEvent(const PortName& port_name, uint64_t last_sequence_num); + ~ObserveClosureEvent() override; + + uint64_t last_sequence_num() const { return last_sequence_num_; } + void set_last_sequence_num(uint64_t last_sequence_num) { + last_sequence_num_ = last_sequence_num; + } + + static ScopedEvent Deserialize(const PortName& port_name, const void* buffer, + size_t num_bytes); + + private: + size_t GetSerializedDataSize() const override; + void SerializeData(void* buffer) const override; + ScopedEvent Clone() const override; + + uint64_t last_sequence_num_; + + DISALLOW_COPY_AND_ASSIGN(ObserveClosureEvent); +}; + +class MergePortEvent : public Event { + public: + MergePortEvent(const PortName& port_name, const PortName& new_port_name, + const PortDescriptor& new_port_descriptor); + ~MergePortEvent() override; + + const PortName& new_port_name() const { return new_port_name_; } + const PortDescriptor& new_port_descriptor() const { + return new_port_descriptor_; + } + + static ScopedEvent Deserialize(const PortName& port_name, const void* buffer, + size_t num_bytes); + + private: + size_t GetSerializedDataSize() const override; + void SerializeData(void* buffer) const override; + + const PortName new_port_name_; + const PortDescriptor new_port_descriptor_; + + DISALLOW_COPY_AND_ASSIGN(MergePortEvent); +}; + +class UserMessageReadAckRequestEvent : public Event { + public: + UserMessageReadAckRequestEvent(const PortName& port_name, + uint64_t sequence_num_to_acknowledge); + ~UserMessageReadAckRequestEvent() override; + + uint64_t sequence_num_to_acknowledge() const { + return sequence_num_to_acknowledge_; + } + + static ScopedEvent Deserialize(const PortName& port_name, const void* buffer, + size_t num_bytes); + + private: + size_t GetSerializedDataSize() const override; + void SerializeData(void* buffer) const override; + + uint64_t sequence_num_to_acknowledge_; +}; + +class UserMessageReadAckEvent : public Event { + public: + UserMessageReadAckEvent(const PortName& port_name, + uint64_t sequence_num_acknowledged); + ~UserMessageReadAckEvent() override; + + uint64_t sequence_num_acknowledged() const { + return sequence_num_acknowledged_; + } + + static ScopedEvent Deserialize(const PortName& port_name, const void* buffer, + size_t num_bytes); + + private: + size_t GetSerializedDataSize() const override; + void SerializeData(void* buffer) const override; + + uint64_t sequence_num_acknowledged_; +}; + +} // namespace ports +} // namespace core +} // namespace mojo + +#endif // MOJO_CORE_PORTS_EVENT_H_ diff --git a/ipc/chromium/src/mojo/core/ports/message_filter.h b/ipc/chromium/src/mojo/core/ports/message_filter.h new file mode 100644 index 0000000000..21f8c1c869 --- /dev/null +++ b/ipc/chromium/src/mojo/core/ports/message_filter.h @@ -0,0 +1,29 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_CORE_PORTS_MESSAGE_FILTER_H_ +#define MOJO_CORE_PORTS_MESSAGE_FILTER_H_ + +namespace mojo { +namespace core { +namespace ports { + +class UserMessageEvent; + +// An interface which can be implemented to user message events according to +// arbitrary policy. +class MessageFilter { + public: + virtual ~MessageFilter() = default; + + // Returns true if |message| should be accepted by whomever is applying this + // filter. See MessageQueue::GetNextMessage(), for example. + virtual bool Match(const UserMessageEvent& message) = 0; +}; + +} // namespace ports +} // namespace core +} // namespace mojo + +#endif // MOJO_CORE_PORTS_MESSAGE_FILTER_H_ diff --git a/ipc/chromium/src/mojo/core/ports/message_queue.cc b/ipc/chromium/src/mojo/core/ports/message_queue.cc new file mode 100644 index 0000000000..0e785bec2b --- /dev/null +++ b/ipc/chromium/src/mojo/core/ports/message_queue.cc @@ -0,0 +1,97 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/core/ports/message_queue.h" + +#include <algorithm> + +#include "base/compiler_specific.h" +#include "base/logging.h" +#include "mojo/core/ports/message_filter.h" +#include "mozilla/Likely.h" + +namespace mojo { +namespace core { +namespace ports { + +// Used by std::{push,pop}_heap functions +inline bool operator<(const mozilla::UniquePtr<UserMessageEvent>& a, + const mozilla::UniquePtr<UserMessageEvent>& b) { + return a->sequence_num() > b->sequence_num(); +} + +MessageQueue::MessageQueue() : MessageQueue(kInitialSequenceNum) {} + +MessageQueue::MessageQueue(uint64_t next_sequence_num) + : next_sequence_num_(next_sequence_num) { + // The message queue is blocked waiting for a message with sequence number + // equal to |next_sequence_num|. +} + +MessageQueue::~MessageQueue() { +#ifdef DEBUG + size_t num_leaked_ports = 0; + for (const auto& message : heap_) { + num_leaked_ports += message->num_ports(); + } + if (num_leaked_ports > 0) { + DVLOG(1) << "Leaking " << num_leaked_ports + << " ports in unreceived messages"; + } +#endif +} + +bool MessageQueue::HasNextMessage() const { + return !heap_.empty() && heap_[0]->sequence_num() == next_sequence_num_; +} + +void MessageQueue::GetNextMessage(mozilla::UniquePtr<UserMessageEvent>* message, + MessageFilter* filter) { + if (!HasNextMessage() || (filter && !filter->Match(*heap_[0]))) { + message->reset(); + return; + } + + std::pop_heap(heap_.begin(), heap_.end()); + *message = std::move(heap_.back()); + total_queued_bytes_ -= (*message)->GetSizeIfSerialized(); + heap_.pop_back(); + + // We keep the capacity of |heap_| in check so that a large batch of incoming + // messages doesn't permanently wreck available memory. The choice of interval + // here is somewhat arbitrary. + constexpr size_t kHeapMinimumShrinkSize = 16; + constexpr size_t kHeapShrinkInterval = 512; + if (MOZ_UNLIKELY(heap_.size() > kHeapMinimumShrinkSize && + heap_.size() % kHeapShrinkInterval == 0)) { + heap_.shrink_to_fit(); + } + + next_sequence_num_++; +} + +void MessageQueue::AcceptMessage(mozilla::UniquePtr<UserMessageEvent> message, + bool* has_next_message) { + // TODO: Handle sequence number roll-over. + + total_queued_bytes_ += message->GetSizeIfSerialized(); + heap_.emplace_back(std::move(message)); + std::push_heap(heap_.begin(), heap_.end()); + + if (!signalable_) { + *has_next_message = false; + } else { + *has_next_message = (heap_[0]->sequence_num() == next_sequence_num_); + } +} + +void MessageQueue::TakeAllMessages( + std::vector<mozilla::UniquePtr<UserMessageEvent>>* messages) { + *messages = std::move(heap_); + total_queued_bytes_ = 0; +} + +} // namespace ports +} // namespace core +} // namespace mojo diff --git a/ipc/chromium/src/mojo/core/ports/message_queue.h b/ipc/chromium/src/mojo/core/ports/message_queue.h new file mode 100644 index 0000000000..a9e08c761a --- /dev/null +++ b/ipc/chromium/src/mojo/core/ports/message_queue.h @@ -0,0 +1,85 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_CORE_PORTS_MESSAGE_QUEUE_H_ +#define MOJO_CORE_PORTS_MESSAGE_QUEUE_H_ + +#include <stdint.h> + +#include <limits> +#include <memory> +#include <vector> + +#include "mojo/core/ports/event.h" + +namespace mojo { +namespace core { +namespace ports { + +constexpr uint64_t kInitialSequenceNum = 1; +constexpr uint64_t kInvalidSequenceNum = std::numeric_limits<uint64_t>::max(); + +class MessageFilter; + +// An incoming message queue for a port. MessageQueue keeps track of the highest +// known sequence number and can indicate whether the next sequential message is +// available. Thus the queue enforces message ordering for the consumer without +// enforcing it for the producer (see AcceptMessage() below.) +class MessageQueue { + public: + explicit MessageQueue(); + explicit MessageQueue(uint64_t next_sequence_num); + ~MessageQueue(); + + MessageQueue(const MessageQueue&) = delete; + void operator=(const MessageQueue&) = delete; + + void set_signalable(bool value) { signalable_ = value; } + + uint64_t next_sequence_num() const { return next_sequence_num_; } + + bool HasNextMessage() const; + + // Gives ownership of the message. If |filter| is non-null, the next message + // will only be retrieved if the filter successfully matches it. + void GetNextMessage(mozilla::UniquePtr<UserMessageEvent>* message, + MessageFilter* filter); + + // Takes ownership of the message. Note: Messages are ordered, so while we + // have added a message to the queue, we may still be waiting on a message + // ahead of this one before we can let any of the messages be returned by + // GetNextMessage. + // + // Furthermore, once has_next_message is set to true, it will remain false + // until GetNextMessage is called enough times to return a null message. + // In other words, has_next_message acts like an edge trigger. + // + void AcceptMessage(mozilla::UniquePtr<UserMessageEvent> message, + bool* has_next_message); + + // Takes all messages from this queue. Used to safely destroy queued messages + // without holding any Port lock. + void TakeAllMessages( + std::vector<mozilla::UniquePtr<UserMessageEvent>>* messages); + + // The number of messages queued here, regardless of whether the next expected + // message has arrived yet. + size_t queued_message_count() const { return heap_.size(); } + + // The aggregate memory size in bytes of all messages queued here, regardless + // of whether the next expected message has arrived yet. + size_t queued_num_bytes() const { return total_queued_bytes_; } + + private: + std::vector<mozilla::UniquePtr<UserMessageEvent>> heap_; + uint64_t next_sequence_num_; + bool signalable_ = true; + size_t total_queued_bytes_ = 0; +}; + +} // namespace ports +} // namespace core +} // namespace mojo + +#endif // MOJO_CORE_PORTS_MESSAGE_QUEUE_H_ diff --git a/ipc/chromium/src/mojo/core/ports/name.cc b/ipc/chromium/src/mojo/core/ports/name.cc new file mode 100644 index 0000000000..17a787d933 --- /dev/null +++ b/ipc/chromium/src/mojo/core/ports/name.cc @@ -0,0 +1,54 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/core/ports/name.h" +#include "chrome/common/ipc_message_utils.h" + +namespace mojo { +namespace core { +namespace ports { + +std::ostream& operator<<(std::ostream& stream, const Name& name) { + std::ios::fmtflags flags(stream.flags()); + stream << std::hex << std::uppercase << name.v1; + if (name.v2 != 0) { + stream << '.' << name.v2; + } + stream.flags(flags); + return stream; +} + +mozilla::Logger& operator<<(mozilla::Logger& log, const Name& name) { + log.printf("%" PRIX64, name.v1); + if (name.v2 != 0) { + log.printf(".%" PRIX64, name.v2); + } + return log; +} + +} // namespace ports +} // namespace core +} // namespace mojo + +void IPC::ParamTraits<mojo::core::ports::PortName>::Write( + MessageWriter* aWriter, const paramType& aParam) { + WriteParam(aWriter, aParam.v1); + WriteParam(aWriter, aParam.v2); +} + +bool IPC::ParamTraits<mojo::core::ports::PortName>::Read(MessageReader* aReader, + paramType* aResult) { + return ReadParam(aReader, &aResult->v1) && ReadParam(aReader, &aResult->v2); +} + +void IPC::ParamTraits<mojo::core::ports::NodeName>::Write( + MessageWriter* aWriter, const paramType& aParam) { + WriteParam(aWriter, aParam.v1); + WriteParam(aWriter, aParam.v2); +} + +bool IPC::ParamTraits<mojo::core::ports::NodeName>::Read(MessageReader* aReader, + paramType* aResult) { + return ReadParam(aReader, &aResult->v1) && ReadParam(aReader, &aResult->v2); +} diff --git a/ipc/chromium/src/mojo/core/ports/name.h b/ipc/chromium/src/mojo/core/ports/name.h new file mode 100644 index 0000000000..0e668ebc40 --- /dev/null +++ b/ipc/chromium/src/mojo/core/ports/name.h @@ -0,0 +1,122 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_CORE_PORTS_NAME_H_ +#define MOJO_CORE_PORTS_NAME_H_ + +#include <stdint.h> + +#include <ostream> +#include <tuple> + +#include "base/logging.h" +#include "mozilla/HashFunctions.h" +#include "nsHashKeys.h" + +namespace mojo { +namespace core { +namespace ports { + +struct Name { + constexpr Name(uint64_t v1, uint64_t v2) : v1(v1), v2(v2) {} + uint64_t v1, v2; +}; + +inline bool operator==(const Name& a, const Name& b) { + return a.v1 == b.v1 && a.v2 == b.v2; +} + +inline bool operator!=(const Name& a, const Name& b) { return !(a == b); } + +inline bool operator<(const Name& a, const Name& b) { + return std::tie(a.v1, a.v2) < std::tie(b.v1, b.v2); +} + +std::ostream& operator<<(std::ostream& stream, const Name& name); +mozilla::Logger& operator<<(mozilla::Logger& log, const Name& name); + +struct PortName : Name { + constexpr PortName() : Name(0, 0) {} + constexpr PortName(uint64_t v1, uint64_t v2) : Name(v1, v2) {} +}; + +constexpr PortName kInvalidPortName{0, 0}; + +struct NodeName : Name { + constexpr NodeName() : Name(0, 0) {} + constexpr NodeName(uint64_t v1, uint64_t v2) : Name(v1, v2) {} +}; + +constexpr NodeName kInvalidNodeName{0, 0}; + +} // namespace ports +} // namespace core +} // namespace mojo + +namespace mozilla { + +template <> +inline PLDHashNumber Hash<mojo::core::ports::PortName>( + const mojo::core::ports::PortName& aValue) { + return mozilla::HashGeneric(aValue.v1, aValue.v2); +} + +template <> +inline PLDHashNumber Hash<mojo::core::ports::NodeName>( + const mojo::core::ports::NodeName& aValue) { + return mozilla::HashGeneric(aValue.v1, aValue.v2); +} + +using PortNameHashKey = nsGenericHashKey<mojo::core::ports::PortName>; +using NodeNameHashKey = nsGenericHashKey<mojo::core::ports::NodeName>; + +} // namespace mozilla + +namespace std { + +template <> +struct hash<mojo::core::ports::PortName> { + std::size_t operator()(const mojo::core::ports::PortName& name) const { + // FIXME: PLDHashNumber is only 32-bits + return mozilla::Hash(name); + } +}; + +template <> +struct hash<mojo::core::ports::NodeName> { + std::size_t operator()(const mojo::core::ports::NodeName& name) const { + // FIXME: PLDHashNumber is only 32-bits + return mozilla::Hash(name); + } +}; + +} // namespace std + +class PickleIterator; + +namespace IPC { + +template <typename T> +struct ParamTraits; +class Message; +class MessageReader; +class MessageWriter; + +template <> +struct ParamTraits<mojo::core::ports::PortName> { + using paramType = mojo::core::ports::PortName; + static void Write(MessageWriter* aWriter, const paramType& aParam); + static bool Read(MessageReader* aReader, paramType* aResult); +}; + +template <> +struct ParamTraits<mojo::core::ports::NodeName> { + using paramType = mojo::core::ports::NodeName; + static void Write(MessageWriter* aWriter, const paramType& aParam); + static bool Read(MessageReader* aReader, paramType* aResult); +}; + +} // namespace IPC + +#endif // MOJO_CORE_PORTS_NAME_H_ diff --git a/ipc/chromium/src/mojo/core/ports/node.cc b/ipc/chromium/src/mojo/core/ports/node.cc new file mode 100644 index 0000000000..455679ae4e --- /dev/null +++ b/ipc/chromium/src/mojo/core/ports/node.cc @@ -0,0 +1,1778 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/core/ports/node.h" + +#include <string.h> + +#include <algorithm> +#include <atomic> +#include <memory> +#include <utility> +#include <vector> + +#include "mozilla/Mutex.h" +#include "mozilla/RandomNum.h" +#include "nsTArray.h" + +#include "base/logging.h" +#include "mojo/core/ports/event.h" +#include "mojo/core/ports/node_delegate.h" +#include "mojo/core/ports/port_locker.h" + +namespace mojo { +namespace core { +namespace ports { + +namespace { + +int DebugError(const char* message, int error_code) { + NOTREACHED() << "Oops: " << message; + return error_code; +} + +#define OOPS(x) DebugError(#x, x) + +bool CanAcceptMoreMessages(const Port* port) { + // Have we already doled out the last message (i.e., do we expect to NOT + // receive further messages)? + uint64_t next_sequence_num = port->message_queue.next_sequence_num(); + if (port->state == Port::kClosed) { + return false; + } + if (port->peer_closed || port->remove_proxy_on_last_message) { + if (port->peer_lost_unexpectedly) { + return port->message_queue.HasNextMessage(); + } + if (port->last_sequence_num_to_receive == next_sequence_num - 1) { + return false; + } + } + return true; +} + +void GenerateRandomPortName(PortName* name) { + // FIXME: Chrome uses a cache to avoid extra calls to the system RNG when + // generating port names to keep this overhead down. If this method starts + // showing up on profiles we should consider doing the same. + *name = PortName{mozilla::RandomUint64OrDie(), mozilla::RandomUint64OrDie()}; +} + +} // namespace + +Node::Node(const NodeName& name, NodeDelegate* delegate) + : name_(name), delegate_(this, delegate) {} + +Node::~Node() { + if (!ports_.empty()) { + DLOG(WARNING) << "Unclean shutdown for node " << name_; + } +} + +bool Node::CanShutdownCleanly(ShutdownPolicy policy) { + PortLocker::AssertNoPortsLockedOnCurrentThread(); + mozilla::MutexAutoLock ports_lock(ports_lock_); + + if (policy == ShutdownPolicy::DONT_ALLOW_LOCAL_PORTS) { +#ifdef DEBUG + for (auto& entry : ports_) { + DVLOG(2) << "Port " << entry.first << " referencing node " + << entry.second->peer_node_name << " is blocking shutdown of " + << "node " << name_ << " (state=" << entry.second->state << ")"; + } +#endif + return ports_.empty(); + } + + DCHECK_EQ(policy, ShutdownPolicy::ALLOW_LOCAL_PORTS); + + // NOTE: This is not efficient, though it probably doesn't need to be since + // relatively few ports should be open during shutdown and shutdown doesn't + // need to be blazingly fast. + bool can_shutdown = true; + for (auto& entry : ports_) { + PortRef port_ref(entry.first, entry.second); + SinglePortLocker locker(&port_ref); + auto* port = locker.port(); + if (port->peer_node_name != name_ && port->state != Port::kReceiving) { + can_shutdown = false; +#ifdef DEBUG + DVLOG(2) << "Port " << entry.first << " referencing node " + << port->peer_node_name << " is blocking shutdown of " + << "node " << name_ << " (state=" << port->state << ")"; +#else + // Exit early when not debugging. + break; +#endif + } + } + + return can_shutdown; +} + +int Node::GetPort(const PortName& port_name, PortRef* port_ref) { + PortLocker::AssertNoPortsLockedOnCurrentThread(); + mozilla::MutexAutoLock lock(ports_lock_); + auto iter = ports_.find(port_name); + if (iter == ports_.end()) { + return ERROR_PORT_UNKNOWN; + } + +#if defined(OS_ANDROID) && defined(ARCH_CPU_ARM64) + // Workaround for https://crbug.com/665869. + std::atomic_thread_fence(std::memory_order_seq_cst); +#endif + + *port_ref = PortRef(port_name, iter->second); + return OK; +} + +int Node::CreateUninitializedPort(PortRef* port_ref) { + PortName port_name; + GenerateRandomPortName(&port_name); + + RefPtr<Port> port(new Port(kInitialSequenceNum, kInitialSequenceNum)); + int rv = AddPortWithName(port_name, port); + if (rv != OK) { + return rv; + } + + *port_ref = PortRef(port_name, std::move(port)); + return OK; +} + +int Node::InitializePort(const PortRef& port_ref, + const NodeName& peer_node_name, + const PortName& peer_port_name) { + { + // Must be acquired for UpdatePortPeerAddress below. + PortLocker::AssertNoPortsLockedOnCurrentThread(); + mozilla::MutexAutoLock ports_lock(ports_lock_); + + SinglePortLocker locker(&port_ref); + auto* port = locker.port(); + if (port->state != Port::kUninitialized) { + return ERROR_PORT_STATE_UNEXPECTED; + } + + port->state = Port::kReceiving; + UpdatePortPeerAddress(port_ref.name(), port, peer_node_name, + peer_port_name); + } + + delegate_->PortStatusChanged(port_ref); + + return OK; +} + +int Node::CreatePortPair(PortRef* port0_ref, PortRef* port1_ref) { + int rv; + + rv = CreateUninitializedPort(port0_ref); + if (rv != OK) { + return rv; + } + + rv = CreateUninitializedPort(port1_ref); + if (rv != OK) { + return rv; + } + + rv = InitializePort(*port0_ref, name_, port1_ref->name()); + if (rv != OK) { + return rv; + } + + rv = InitializePort(*port1_ref, name_, port0_ref->name()); + if (rv != OK) { + return rv; + } + + return OK; +} + +int Node::SetUserData(const PortRef& port_ref, RefPtr<UserData> user_data) { + SinglePortLocker locker(&port_ref); + auto* port = locker.port(); + if (port->state == Port::kClosed) { + return ERROR_PORT_STATE_UNEXPECTED; + } + + port->user_data = std::move(user_data); + + return OK; +} + +int Node::GetUserData(const PortRef& port_ref, RefPtr<UserData>* user_data) { + SinglePortLocker locker(&port_ref); + auto* port = locker.port(); + if (port->state == Port::kClosed) { + return ERROR_PORT_STATE_UNEXPECTED; + } + + *user_data = port->user_data; + + return OK; +} + +int Node::ClosePort(const PortRef& port_ref) { + std::vector<mozilla::UniquePtr<UserMessageEvent>> undelivered_messages; + NodeName peer_node_name; + PortName peer_port_name; + uint64_t last_sequence_num = 0; + bool was_initialized = false; + { + SinglePortLocker locker(&port_ref); + auto* port = locker.port(); + switch (port->state) { + case Port::kUninitialized: + break; + + case Port::kReceiving: + was_initialized = true; + port->state = Port::kClosed; + + // We pass along the sequence number of the last message sent from this + // port to allow the peer to have the opportunity to consume all inbound + // messages before notifying the embedder that this port is closed. + last_sequence_num = port->next_sequence_num_to_send - 1; + + peer_node_name = port->peer_node_name; + peer_port_name = port->peer_port_name; + + // If the port being closed still has unread messages, then we need to + // take care to close those ports so as to avoid leaking memory. + port->message_queue.TakeAllMessages(&undelivered_messages); + break; + + default: + return ERROR_PORT_STATE_UNEXPECTED; + } + } + + ErasePort(port_ref.name()); + + if (was_initialized) { + DVLOG(2) << "Sending ObserveClosure from " << port_ref.name() << "@" + << name_ << " to " << peer_port_name << "@" << peer_node_name; + delegate_->ForwardEvent(peer_node_name, + mozilla::MakeUnique<ObserveClosureEvent>( + peer_port_name, last_sequence_num)); + for (const auto& message : undelivered_messages) { + for (size_t i = 0; i < message->num_ports(); ++i) { + PortRef ref; + if (GetPort(message->ports()[i], &ref) == OK) { + ClosePort(ref); + } + } + } + } + return OK; +} + +int Node::GetStatus(const PortRef& port_ref, PortStatus* port_status) { + SinglePortLocker locker(&port_ref); + auto* port = locker.port(); + if (port->state != Port::kReceiving) { + return ERROR_PORT_STATE_UNEXPECTED; + } + + port_status->has_messages = port->message_queue.HasNextMessage(); + port_status->receiving_messages = CanAcceptMoreMessages(port); + port_status->peer_closed = port->peer_closed; + port_status->peer_remote = port->peer_node_name != name_; + port_status->queued_message_count = + port->message_queue.queued_message_count(); + port_status->queued_num_bytes = port->message_queue.queued_num_bytes(); + port_status->unacknowledged_message_count = + port->next_sequence_num_to_send - port->last_sequence_num_acknowledged - + 1; + +#ifdef FUZZING_SNAPSHOT + port_status->peer_node_name = port->peer_node_name; +#endif + + return OK; +} + +int Node::GetMessage(const PortRef& port_ref, + mozilla::UniquePtr<UserMessageEvent>* message, + MessageFilter* filter) { + *message = nullptr; + + DVLOG(4) << "GetMessage for " << port_ref.name() << "@" << name_; + + NodeName peer_node_name; + ScopedEvent ack_event; + { + SinglePortLocker locker(&port_ref); + auto* port = locker.port(); + + // This could also be treated like the port being unknown since the + // embedder should no longer be referring to a port that has been sent. + if (port->state != Port::kReceiving) { + return ERROR_PORT_STATE_UNEXPECTED; + } + + // Let the embedder get messages until there are no more before reporting + // that the peer closed its end. + if (!CanAcceptMoreMessages(port)) { + return ERROR_PORT_PEER_CLOSED; + } + + port->message_queue.GetNextMessage(message, filter); + if (*message && + (*message)->sequence_num() == port->sequence_num_to_acknowledge) { + peer_node_name = port->peer_node_name; + ack_event = mozilla::MakeUnique<UserMessageReadAckEvent>( + port->peer_port_name, port->sequence_num_to_acknowledge); + } + } + + if (ack_event) { + delegate_->ForwardEvent(peer_node_name, std::move(ack_event)); + } + + // Allow referenced ports to trigger PortStatusChanged calls. + if (*message) { + for (size_t i = 0; i < (*message)->num_ports(); ++i) { + PortRef new_port_ref; + int rv = GetPort((*message)->ports()[i], &new_port_ref); + + DCHECK_EQ(OK, rv) << "Port " << new_port_ref.name() << "@" << name_ + << " does not exist!"; + + SinglePortLocker locker(&new_port_ref); + DCHECK(locker.port()->state == Port::kReceiving); + locker.port()->message_queue.set_signalable(true); + } + + // The user may retransmit this message from another port. We reset the + // sequence number so that the message will get a new one if that happens. + (*message)->set_sequence_num(0); + } + + return OK; +} + +int Node::SendUserMessage(const PortRef& port_ref, + mozilla::UniquePtr<UserMessageEvent> message) { + int rv = SendUserMessageInternal(port_ref, &message); + if (rv != OK) { + // If send failed, close all carried ports. Note that we're careful not to + // close the sending port itself if it happened to be one of the encoded + // ports (an invalid but possible condition.) + for (size_t i = 0; i < message->num_ports(); ++i) { + if (message->ports()[i] == port_ref.name()) { + continue; + } + + PortRef port; + if (GetPort(message->ports()[i], &port) == OK) { + ClosePort(port); + } + } + } + return rv; +} + +int Node::SetAcknowledgeRequestInterval( + const PortRef& port_ref, uint64_t sequence_num_acknowledge_interval) { + NodeName peer_node_name; + PortName peer_port_name; + uint64_t sequence_num_to_request_ack = 0; + { + SinglePortLocker locker(&port_ref); + auto* port = locker.port(); + if (port->state != Port::kReceiving) { + return ERROR_PORT_STATE_UNEXPECTED; + } + + port->sequence_num_acknowledge_interval = sequence_num_acknowledge_interval; + if (!sequence_num_acknowledge_interval) { + return OK; + } + + peer_node_name = port->peer_node_name; + peer_port_name = port->peer_port_name; + + sequence_num_to_request_ack = port->last_sequence_num_acknowledged + + sequence_num_acknowledge_interval; + } + + delegate_->ForwardEvent(peer_node_name, + mozilla::MakeUnique<UserMessageReadAckRequestEvent>( + peer_port_name, sequence_num_to_request_ack)); + return OK; +} + +int Node::AcceptEvent(ScopedEvent event) { + switch (event->type()) { + case Event::Type::kUserMessage: + return OnUserMessage(Event::Cast<UserMessageEvent>(&event)); + case Event::Type::kPortAccepted: + return OnPortAccepted(Event::Cast<PortAcceptedEvent>(&event)); + case Event::Type::kObserveProxy: + return OnObserveProxy(Event::Cast<ObserveProxyEvent>(&event)); + case Event::Type::kObserveProxyAck: + return OnObserveProxyAck(Event::Cast<ObserveProxyAckEvent>(&event)); + case Event::Type::kObserveClosure: + return OnObserveClosure(Event::Cast<ObserveClosureEvent>(&event)); + case Event::Type::kMergePort: + return OnMergePort(Event::Cast<MergePortEvent>(&event)); + case Event::Type::kUserMessageReadAckRequest: + return OnUserMessageReadAckRequest( + Event::Cast<UserMessageReadAckRequestEvent>(&event)); + case Event::Type::kUserMessageReadAck: + return OnUserMessageReadAck(Event::Cast<UserMessageReadAckEvent>(&event)); + } + return OOPS(ERROR_NOT_IMPLEMENTED); +} + +int Node::MergePorts(const PortRef& port_ref, + const NodeName& destination_node_name, + const PortName& destination_port_name) { + PortName new_port_name; + Event::PortDescriptor new_port_descriptor; + { + // Must be held for ConvertToProxy. + PortLocker::AssertNoPortsLockedOnCurrentThread(); + mozilla::MutexAutoLock ports_locker(ports_lock_); + + SinglePortLocker locker(&port_ref); + + DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_ + << " to " << destination_port_name << "@" << destination_node_name; + + // Send the port-to-merge over to the destination node so it can be merged + // into the port cycle atomically there. + new_port_name = port_ref.name(); + ConvertToProxy(locker.port(), destination_node_name, &new_port_name, + &new_port_descriptor); + } + + if (new_port_descriptor.peer_node_name == name_ && + destination_node_name != name_) { + // Ensure that the locally retained peer of the new proxy gets a status + // update so it notices that its peer is now remote. + PortRef local_peer; + if (GetPort(new_port_descriptor.peer_port_name, &local_peer) == OK) { + delegate_->PortStatusChanged(local_peer); + } + } + + delegate_->ForwardEvent( + destination_node_name, + mozilla::MakeUnique<MergePortEvent>(destination_port_name, new_port_name, + new_port_descriptor)); + return OK; +} + +int Node::MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref) { + DVLOG(1) << "Merging local ports " << port0_ref.name() << "@" << name_ + << " and " << port1_ref.name() << "@" << name_; + return MergePortsInternal(port0_ref, port1_ref, + true /* allow_close_on_bad_state */); +} + +int Node::LostConnectionToNode(const NodeName& node_name) { + // We can no longer send events to the given node. We also can't expect any + // PortAccepted events. + + DVLOG(1) << "Observing lost connection from node " << name_ << " to node " + << node_name; + + DestroyAllPortsWithPeer(node_name, kInvalidPortName); + return OK; +} + +int Node::OnUserMessage(mozilla::UniquePtr<UserMessageEvent> message) { + PortName port_name = message->port_name(); + +#ifdef DEBUG + std::ostringstream ports_buf; + for (size_t i = 0; i < message->num_ports(); ++i) { + if (i > 0) { + ports_buf << ","; + } + ports_buf << message->ports()[i]; + } + + DVLOG(4) << "OnUserMessage " << message->sequence_num() + << " [ports=" << ports_buf.str() << "] at " << port_name << "@" + << name_; +#endif + + // Even if this port does not exist, cannot receive anymore messages or is + // buffering or proxying messages, we still need these ports to be bound to + // this node. When the message is forwarded, these ports will get transferred + // following the usual method. If the message cannot be accepted, then the + // newly bound ports will simply be closed. + for (size_t i = 0; i < message->num_ports(); ++i) { + Event::PortDescriptor& descriptor = message->port_descriptors()[i]; + if (descriptor.referring_node_name == kInvalidNodeName) { + // If the referring node name is invalid, this descriptor can be ignored + // and the port should already exist locally. + PortRef port_ref; + if (GetPort(message->ports()[i], &port_ref) != OK) { + return ERROR_PORT_UNKNOWN; + } + } else { + int rv = AcceptPort(message->ports()[i], descriptor); + if (rv != OK) { + return rv; + } + + // Ensure that the referring node is wiped out of this descriptor. This + // allows the event to be forwarded across multiple local hops without + // attempting to accept the port more than once. + descriptor.referring_node_name = kInvalidNodeName; + } + } + + PortRef port_ref; + GetPort(port_name, &port_ref); + bool has_next_message = false; + bool message_accepted = false; + bool should_forward_messages = false; + if (port_ref.is_valid()) { + SinglePortLocker locker(&port_ref); + auto* port = locker.port(); + + // Reject spurious messages if we've already received the last expected + // message. + if (CanAcceptMoreMessages(port)) { + message_accepted = true; + port->message_queue.AcceptMessage(std::move(message), &has_next_message); + + if (port->state == Port::kBuffering) { + has_next_message = false; + } else if (port->state == Port::kProxying) { + has_next_message = false; + should_forward_messages = true; + } + } + } + + if (should_forward_messages) { + int rv = ForwardUserMessagesFromProxy(port_ref); + if (rv != OK) { + return rv; + } + TryRemoveProxy(port_ref); + } + + if (!message_accepted) { + DVLOG(2) << "Message not accepted!\n"; + // Close all newly accepted ports as they are effectively orphaned. + for (size_t i = 0; i < message->num_ports(); ++i) { + PortRef attached_port_ref; + if (GetPort(message->ports()[i], &attached_port_ref) == OK) { + ClosePort(attached_port_ref); + } else { + DLOG(WARNING) << "Cannot close non-existent port!\n"; + } + } + } else if (has_next_message) { + delegate_->PortStatusChanged(port_ref); + } + + return OK; +} + +int Node::OnPortAccepted(mozilla::UniquePtr<PortAcceptedEvent> event) { + PortRef port_ref; + if (GetPort(event->port_name(), &port_ref) != OK) { + return ERROR_PORT_UNKNOWN; + } + +#ifdef DEBUG + { + SinglePortLocker locker(&port_ref); + DVLOG(2) << "PortAccepted at " << port_ref.name() << "@" << name_ + << " pointing to " << locker.port()->peer_port_name << "@" + << locker.port()->peer_node_name; + } +#endif + + return BeginProxying(port_ref); +} + +int Node::OnObserveProxy(mozilla::UniquePtr<ObserveProxyEvent> event) { + if (event->port_name() == kInvalidPortName) { + // An ObserveProxy with an invalid target port name is a broadcast used to + // inform ports when their peer (which was itself a proxy) has become + // defunct due to unexpected node disconnection. + // + // Receiving ports affected by this treat it as equivalent to peer closure. + // Proxies affected by this can be removed and will in turn broadcast their + // own death with a similar message. + DCHECK_EQ(event->proxy_target_node_name(), kInvalidNodeName); + DCHECK_EQ(event->proxy_target_port_name(), kInvalidPortName); + DestroyAllPortsWithPeer(event->proxy_node_name(), event->proxy_port_name()); + return OK; + } + + // The port may have already been closed locally, in which case the + // ObserveClosure message will contain the last_sequence_num field. + // We can then silently ignore this message. + PortRef port_ref; + if (GetPort(event->port_name(), &port_ref) != OK) { + DVLOG(1) << "ObserveProxy: " << event->port_name() << "@" << name_ + << " not found"; + return OK; + } + + DVLOG(2) << "ObserveProxy at " << port_ref.name() << "@" << name_ + << ", proxy at " << event->proxy_port_name() << "@" + << event->proxy_node_name() << " pointing to " + << event->proxy_target_port_name() << "@" + << event->proxy_target_node_name(); + + bool peer_changed = false; + ScopedEvent event_to_forward; + NodeName event_target_node; + { + // Must be acquired for UpdatePortPeerAddress below. + PortLocker::AssertNoPortsLockedOnCurrentThread(); + mozilla::MutexAutoLock ports_locker(ports_lock_); + + SinglePortLocker locker(&port_ref); + auto* port = locker.port(); + + if (port->peer_node_name == event->proxy_node_name() && + port->peer_port_name == event->proxy_port_name()) { + if (port->state == Port::kReceiving) { + UpdatePortPeerAddress(port_ref.name(), port, + event->proxy_target_node_name(), + event->proxy_target_port_name()); + event_target_node = event->proxy_node_name(); + event_to_forward = mozilla::MakeUnique<ObserveProxyAckEvent>( + event->proxy_port_name(), port->next_sequence_num_to_send - 1); + peer_changed = true; + DVLOG(2) << "Forwarding ObserveProxyAck from " << event->port_name() + << "@" << name_ << " to " << event->proxy_port_name() << "@" + << event_target_node; + } else { + // As a proxy ourselves, we don't know how to honor the ObserveProxy + // event or to populate the last_sequence_num field of ObserveProxyAck. + // Afterall, another port could be sending messages to our peer now + // that we've sent out our own ObserveProxy event. Instead, we will + // send an ObserveProxyAck indicating that the ObserveProxy event + // should be re-sent (last_sequence_num set to kInvalidSequenceNum). + // However, this has to be done after we are removed as a proxy. + // Otherwise, we might just find ourselves back here again, which + // would be akin to a busy loop. + + DVLOG(2) << "Delaying ObserveProxyAck to " << event->proxy_port_name() + << "@" << event->proxy_node_name(); + + port->send_on_proxy_removal = + mozilla::MakeUnique<std::pair<NodeName, ScopedEvent>>( + event->proxy_node_name(), + mozilla::MakeUnique<ObserveProxyAckEvent>( + event->proxy_port_name(), kInvalidSequenceNum)); + } + } else { + // Forward this event along to our peer. Eventually, it should find the + // port referring to the proxy. + event_target_node = port->peer_node_name; + event->set_port_name(port->peer_port_name); + event_to_forward = std::move(event); + } + } + + if (event_to_forward) { + delegate_->ForwardEvent(event_target_node, std::move(event_to_forward)); + } + + if (peer_changed) { + // Re-send ack and/or ack requests, as the previous peer proxy may not have + // forwarded the previous request before it died. + MaybeResendAck(port_ref); + MaybeResendAckRequest(port_ref); + + delegate_->PortStatusChanged(port_ref); + } + + return OK; +} + +int Node::OnObserveProxyAck(mozilla::UniquePtr<ObserveProxyAckEvent> event) { + DVLOG(2) << "ObserveProxyAck at " << event->port_name() << "@" << name_ + << " (last_sequence_num=" << event->last_sequence_num() << ")"; + + PortRef port_ref; + if (GetPort(event->port_name(), &port_ref) != OK) { + return ERROR_PORT_UNKNOWN; // The port may have observed closure first. + } + + bool try_remove_proxy_immediately; + { + SinglePortLocker locker(&port_ref); + auto* port = locker.port(); + if (port->state != Port::kProxying) { + return OOPS(ERROR_PORT_STATE_UNEXPECTED); + } + + // If the last sequence number is invalid, this is a signal that we need to + // retransmit the ObserveProxy event for this port rather than flagging the + // the proxy for removal ASAP. + try_remove_proxy_immediately = + event->last_sequence_num() != kInvalidSequenceNum; + if (try_remove_proxy_immediately) { + // We can now remove this port once we have received and forwarded the + // last message addressed to this port. + port->remove_proxy_on_last_message = true; + port->last_sequence_num_to_receive = event->last_sequence_num(); + } + } + + if (try_remove_proxy_immediately) { + TryRemoveProxy(port_ref); + } else { + InitiateProxyRemoval(port_ref); + } + + return OK; +} + +int Node::OnObserveClosure(mozilla::UniquePtr<ObserveClosureEvent> event) { + // OK if the port doesn't exist, as it may have been closed already. + PortRef port_ref; + if (GetPort(event->port_name(), &port_ref) != OK) { + return OK; + } + + // This message tells the port that it should no longer expect more messages + // beyond last_sequence_num. This message is forwarded along until we reach + // the receiving end, and this message serves as an equivalent to + // ObserveProxyAck. + + bool notify_delegate = false; + NodeName peer_node_name; + PortName peer_port_name; + bool try_remove_proxy = false; + { + SinglePortLocker locker(&port_ref); + auto* port = locker.port(); + + port->peer_closed = true; + port->last_sequence_num_to_receive = event->last_sequence_num(); + + DVLOG(2) << "ObserveClosure at " << port_ref.name() << "@" << name_ + << " (state=" << port->state << ") pointing to " + << port->peer_port_name << "@" << port->peer_node_name + << " (last_sequence_num=" << event->last_sequence_num() << ")"; + + // We always forward ObserveClosure, even beyond the receiving port which + // cares about it. This ensures that any dead-end proxies beyond that port + // are notified to remove themselves. + + if (port->state == Port::kReceiving) { + notify_delegate = true; + + // When forwarding along the other half of the port cycle, this will only + // reach dead-end proxies. Tell them we've sent our last message so they + // can go away. + // + // TODO: Repurposing ObserveClosure for this has the desired result but + // may be semantically confusing since the forwarding port is not actually + // closed. Consider replacing this with a new event type. + event->set_last_sequence_num(port->next_sequence_num_to_send - 1); + + // Treat the closure as an acknowledge that all sent messages have been + // read from the other end. + port->last_sequence_num_acknowledged = + port->next_sequence_num_to_send - 1; + } else { + // We haven't yet reached the receiving peer of the closed port, so we'll + // forward the message along as-is. + // See about removing the port if it is a proxy as our peer won't be able + // to participate in proxy removal. + port->remove_proxy_on_last_message = true; + if (port->state == Port::kProxying) { + try_remove_proxy = true; + } + } + + DVLOG(2) << "Forwarding ObserveClosure from " << port_ref.name() << "@" + << name_ << " to peer " << port->peer_port_name << "@" + << port->peer_node_name + << " (last_sequence_num=" << event->last_sequence_num() << ")"; + + peer_node_name = port->peer_node_name; + peer_port_name = port->peer_port_name; + } + + if (try_remove_proxy) { + TryRemoveProxy(port_ref); + } + + event->set_port_name(peer_port_name); + delegate_->ForwardEvent(peer_node_name, std::move(event)); + + if (notify_delegate) { + delegate_->PortStatusChanged(port_ref); + } + + return OK; +} + +int Node::OnMergePort(mozilla::UniquePtr<MergePortEvent> event) { + PortRef port_ref; + GetPort(event->port_name(), &port_ref); + + DVLOG(1) << "MergePort at " << port_ref.name() << "@" << name_ + << " merging with proxy " << event->new_port_name() << "@" << name_ + << " pointing to " << event->new_port_descriptor().peer_port_name + << "@" << event->new_port_descriptor().peer_node_name + << " referred by " + << event->new_port_descriptor().referring_port_name << "@" + << event->new_port_descriptor().referring_node_name; + + // Accept the new port. This is now the receiving end of the other port cycle + // to be merged with ours. Note that we always attempt to accept the new port + // first as otherwise its peer receiving port could be left stranded + // indefinitely. + if (AcceptPort(event->new_port_name(), event->new_port_descriptor()) != OK) { + if (port_ref.is_valid()) { + ClosePort(port_ref); + } + return ERROR_PORT_STATE_UNEXPECTED; + } + + PortRef new_port_ref; + GetPort(event->new_port_name(), &new_port_ref); + if (!port_ref.is_valid() && new_port_ref.is_valid()) { + ClosePort(new_port_ref); + return ERROR_PORT_UNKNOWN; + } + if (port_ref.is_valid() && !new_port_ref.is_valid()) { + ClosePort(port_ref); + return ERROR_PORT_UNKNOWN; + } + + return MergePortsInternal(port_ref, new_port_ref, + false /* allow_close_on_bad_state */); +} + +int Node::OnUserMessageReadAckRequest( + mozilla::UniquePtr<UserMessageReadAckRequestEvent> event) { + PortRef port_ref; + GetPort(event->port_name(), &port_ref); + + DVLOG(1) << "AckRequest " << port_ref.name() << "@" << name_ << " sequence " + << event->sequence_num_to_acknowledge(); + + if (!port_ref.is_valid()) { + return ERROR_PORT_UNKNOWN; + } + + NodeName peer_node_name; + mozilla::UniquePtr<Event> event_to_send; + { + SinglePortLocker locker(&port_ref); + auto* port = locker.port(); + + peer_node_name = port->peer_node_name; + if (port->state == Port::kProxying) { + // Proxies simply forward the ack request to their peer. + event->set_port_name(port->peer_port_name); + event_to_send = std::move(event); + } else { + uint64_t current_sequence_num = + port->message_queue.next_sequence_num() - 1; + // Either this is requesting an ack for a sequence number already read, or + // else for a sequence number that is yet to be read. + if (current_sequence_num >= event->sequence_num_to_acknowledge()) { + // If the current sequence number to read already exceeds the ack + // request, send an ack immediately. + event_to_send = mozilla::MakeUnique<UserMessageReadAckEvent>( + port->peer_port_name, current_sequence_num); + + // This might be a late or duplicate acknowledge request, that's + // requesting acknowledge for an already read message. There may already + // have been a request for future reads, so take care not to back up + // the requested acknowledge counter. + if (current_sequence_num > port->sequence_num_to_acknowledge) { + port->sequence_num_to_acknowledge = current_sequence_num; + } + } else { + // This is request to ack a sequence number that hasn't been read yet. + // The state of the port can either be that it already has a + // future-requested ack, or not. Because ack requests aren't guaranteed + // to arrive in order, store the earlier of the current queued request + // and the new one, if one was already requested. + bool has_queued_ack_request = + port->sequence_num_to_acknowledge > current_sequence_num; + if (!has_queued_ack_request || + port->sequence_num_to_acknowledge > + event->sequence_num_to_acknowledge()) { + port->sequence_num_to_acknowledge = + event->sequence_num_to_acknowledge(); + } + return OK; + } + } + } + + if (event_to_send) { + delegate_->ForwardEvent(peer_node_name, std::move(event_to_send)); + } + + return OK; +} + +int Node::OnUserMessageReadAck( + mozilla::UniquePtr<UserMessageReadAckEvent> event) { + PortRef port_ref; + GetPort(event->port_name(), &port_ref); + + DVLOG(1) << "Acknowledge " << port_ref.name() << "@" << name_ << " sequence " + << event->sequence_num_acknowledged(); + + NodeName peer_node_name; + ScopedEvent ack_request_event; + if (port_ref.is_valid()) { + SinglePortLocker locker(&port_ref); + auto* port = locker.port(); + + if (event->sequence_num_acknowledged() >= port->next_sequence_num_to_send) { + // TODO(http://crbug.com/980952): This is a malformed event. + // This could return a new error "ERROR_MALFORMED_EVENT" which the + // delegate could use as a signal to drop the peer node. + return OK; + } + + // Keep the largest acknowledge seen. + if (event->sequence_num_acknowledged() <= + port->last_sequence_num_acknowledged) { + // The acknowledge was late or a duplicate, it's safe to ignore it. + return OK; + } + + port->last_sequence_num_acknowledged = event->sequence_num_acknowledged(); + // Send another ack request if the interval is non-zero and the peer has + // not been closed. + if (port->sequence_num_acknowledge_interval && !port->peer_closed) { + peer_node_name = port->peer_node_name; + ack_request_event = mozilla::MakeUnique<UserMessageReadAckRequestEvent>( + port->peer_port_name, port->last_sequence_num_acknowledged + + port->sequence_num_acknowledge_interval); + } + } + if (ack_request_event) { + delegate_->ForwardEvent(peer_node_name, std::move(ack_request_event)); + } + + if (port_ref.is_valid()) { + delegate_->PortStatusChanged(port_ref); + } + + return OK; +} + +int Node::AddPortWithName(const PortName& port_name, RefPtr<Port> port) { + PortLocker::AssertNoPortsLockedOnCurrentThread(); + mozilla::MutexAutoLock lock(ports_lock_); + if (port->peer_port_name != kInvalidPortName) { + DCHECK_NE(kInvalidNodeName, port->peer_node_name); + peer_port_maps_[port->peer_node_name][port->peer_port_name].emplace( + port_name, PortRef(port_name, port)); + } + if (!ports_.emplace(port_name, std::move(port)).second) { + return OOPS(ERROR_PORT_EXISTS); // Suggests a bad UUID generator. + } + DVLOG(2) << "Created port " << port_name << "@" << name_; + return OK; +} + +void Node::ErasePort(const PortName& port_name) { + PortLocker::AssertNoPortsLockedOnCurrentThread(); + RefPtr<Port> port; + { + mozilla::MutexAutoLock lock(ports_lock_); + auto it = ports_.find(port_name); + if (it == ports_.end()) { + return; + } + port = std::move(it->second); + ports_.erase(it); + + RemoveFromPeerPortMap(port_name, port.get()); + } + // NOTE: We are careful not to release the port's messages while holding any + // locks, since they may run arbitrary user code upon destruction. + std::vector<mozilla::UniquePtr<UserMessageEvent>> messages; + { + PortRef port_ref(port_name, std::move(port)); + SinglePortLocker locker(&port_ref); + locker.port()->message_queue.TakeAllMessages(&messages); + } + DVLOG(2) << "Deleted port " << port_name << "@" << name_; +} + +int Node::SendUserMessageInternal( + const PortRef& port_ref, mozilla::UniquePtr<UserMessageEvent>* message) { + mozilla::UniquePtr<UserMessageEvent>& m = *message; + for (size_t i = 0; i < m->num_ports(); ++i) { + if (m->ports()[i] == port_ref.name()) { + return ERROR_PORT_CANNOT_SEND_SELF; + } + } + + NodeName target_node; + int rv = PrepareToForwardUserMessage(port_ref, Port::kReceiving, + false /* ignore_closed_peer */, m.get(), + &target_node); + if (rv != OK) { + return rv; + } + + // Beyond this point there's no sense in returning anything but OK. Even if + // message forwarding or acceptance fails, there's nothing the embedder can + // do to recover. Assume that failure beyond this point must be treated as a + // transport failure. + + DCHECK_NE(kInvalidNodeName, target_node); + if (target_node != name_) { + delegate_->ForwardEvent(target_node, std::move(m)); + return OK; + } + + int accept_result = AcceptEvent(std::move(m)); + if (accept_result != OK) { + // See comment above for why we don't return an error in this case. + DVLOG(2) << "AcceptEvent failed: " << accept_result; + } + + return OK; +} + +int Node::MergePortsInternal(const PortRef& port0_ref, const PortRef& port1_ref, + bool allow_close_on_bad_state) { + const PortRef* port_refs[2] = {&port0_ref, &port1_ref}; + { + // Needed to swap peer map entries below. + PortLocker::AssertNoPortsLockedOnCurrentThread(); + mozilla::ReleasableMutexAutoLock ports_locker(ports_lock_); + + mozilla::Maybe<PortLocker> locker(std::in_place, port_refs, size_t(2)); + auto* port0 = locker->GetPort(port0_ref); + auto* port1 = locker->GetPort(port1_ref); + + // There are several conditions which must be met before we'll consider + // merging two ports: + // + // - They must both be in the kReceiving state + // - They must not be each other's peer + // - They must have never sent a user message + // + // If any of these criteria are not met, we fail early. + if (port0->state != Port::kReceiving || port1->state != Port::kReceiving || + (port0->peer_node_name == name_ && + port0->peer_port_name == port1_ref.name()) || + (port1->peer_node_name == name_ && + port1->peer_port_name == port0_ref.name()) || + port0->next_sequence_num_to_send != kInitialSequenceNum || + port1->next_sequence_num_to_send != kInitialSequenceNum) { + // On failure, we only close a port if it was at least properly in the + // |kReceiving| state. This avoids getting the system in an inconsistent + // state by e.g. closing a proxy abruptly. + // + // Note that we must release the port locks before closing ports. + const bool close_port0 = + port0->state == Port::kReceiving || allow_close_on_bad_state; + const bool close_port1 = + port1->state == Port::kReceiving || allow_close_on_bad_state; + locker.reset(); + ports_locker.Unlock(); + if (close_port0) { + ClosePort(port0_ref); + } + if (close_port1) { + ClosePort(port1_ref); + } + return ERROR_PORT_STATE_UNEXPECTED; + } + + // Swap the ports' peer information and switch them both to proxying mode. + SwapPortPeers(port0_ref.name(), port0, port1_ref.name(), port1); + port0->state = Port::kProxying; + port1->state = Port::kProxying; + if (port0->peer_closed) { + port0->remove_proxy_on_last_message = true; + } + if (port1->peer_closed) { + port1->remove_proxy_on_last_message = true; + } + } + + // Flush any queued messages from the new proxies and, if successful, complete + // the merge by initiating proxy removals. + if (ForwardUserMessagesFromProxy(port0_ref) == OK && + ForwardUserMessagesFromProxy(port1_ref) == OK) { + for (auto& port_ref : port_refs) { + bool try_remove_proxy_immediately = false; + ScopedEvent closure_event; + NodeName closure_event_target_node; + { + SinglePortLocker locker(port_ref); + auto* port = locker.port(); + DCHECK(port->state == Port::kProxying); + try_remove_proxy_immediately = port->remove_proxy_on_last_message; + if (try_remove_proxy_immediately || port->peer_closed) { + // If either end of the port cycle is closed, we propagate an + // ObserveClosure event. + closure_event_target_node = port->peer_node_name; + closure_event = mozilla::MakeUnique<ObserveClosureEvent>( + port->peer_port_name, port->last_sequence_num_to_receive); + } + } + if (try_remove_proxy_immediately) { + TryRemoveProxy(*port_ref); + } else { + InitiateProxyRemoval(*port_ref); + } + + if (closure_event) { + delegate_->ForwardEvent(closure_event_target_node, + std::move(closure_event)); + } + } + + return OK; + } + + // If we failed to forward proxied messages, we keep the system in a + // consistent state by undoing the peer swap and closing the ports. + { + PortLocker::AssertNoPortsLockedOnCurrentThread(); + mozilla::MutexAutoLock ports_locker(ports_lock_); + PortLocker locker(port_refs, 2); + auto* port0 = locker.GetPort(port0_ref); + auto* port1 = locker.GetPort(port1_ref); + SwapPortPeers(port0_ref.name(), port0, port1_ref.name(), port1); + port0->remove_proxy_on_last_message = false; + port1->remove_proxy_on_last_message = false; + DCHECK_EQ(Port::kProxying, port0->state); + DCHECK_EQ(Port::kProxying, port1->state); + port0->state = Port::kReceiving; + port1->state = Port::kReceiving; + } + + ClosePort(port0_ref); + ClosePort(port1_ref); + return ERROR_PORT_STATE_UNEXPECTED; +} + +void Node::ConvertToProxy(Port* port, const NodeName& to_node_name, + PortName* port_name, + Event::PortDescriptor* port_descriptor) { + port->AssertLockAcquired(); + PortName local_port_name = *port_name; + + PortName new_port_name; + GenerateRandomPortName(&new_port_name); + + // Make sure we don't send messages to the new peer until after we know it + // exists. In the meantime, just buffer messages locally. + DCHECK(port->state == Port::kReceiving); + port->state = Port::kBuffering; + + // If we already know our peer is closed, we already know this proxy can + // be removed once it receives and forwards its last expected message. + if (port->peer_closed) { + port->remove_proxy_on_last_message = true; + } + + *port_name = new_port_name; + + port_descriptor->peer_node_name = port->peer_node_name; + port_descriptor->peer_port_name = port->peer_port_name; + port_descriptor->referring_node_name = name_; + port_descriptor->referring_port_name = local_port_name; + port_descriptor->next_sequence_num_to_send = port->next_sequence_num_to_send; + port_descriptor->next_sequence_num_to_receive = + port->message_queue.next_sequence_num(); + port_descriptor->last_sequence_num_to_receive = + port->last_sequence_num_to_receive; + port_descriptor->peer_closed = port->peer_closed; + memset(port_descriptor->padding, 0, sizeof(port_descriptor->padding)); + + // Configure the local port to point to the new port. + UpdatePortPeerAddress(local_port_name, port, to_node_name, new_port_name); +} + +int Node::AcceptPort(const PortName& port_name, + const Event::PortDescriptor& port_descriptor) { + RefPtr<Port> port = + mozilla::MakeRefPtr<Port>(port_descriptor.next_sequence_num_to_send, + port_descriptor.next_sequence_num_to_receive); + port->state = Port::kReceiving; + port->peer_node_name = port_descriptor.peer_node_name; + port->peer_port_name = port_descriptor.peer_port_name; + port->last_sequence_num_to_receive = + port_descriptor.last_sequence_num_to_receive; + port->peer_closed = port_descriptor.peer_closed; + + DVLOG(2) << "Accepting port " << port_name + << " [peer_closed=" << port->peer_closed + << "; last_sequence_num_to_receive=" + << port->last_sequence_num_to_receive << "]"; + + // A newly accepted port is not signalable until the message referencing the + // new port finds its way to the consumer (see GetMessage). + port->message_queue.set_signalable(false); + + int rv = AddPortWithName(port_name, std::move(port)); + if (rv != OK) { + return rv; + } + + // Allow referring port to forward messages. + delegate_->ForwardEvent(port_descriptor.referring_node_name, + mozilla::MakeUnique<PortAcceptedEvent>( + port_descriptor.referring_port_name)); + return OK; +} + +int Node::PrepareToForwardUserMessage(const PortRef& forwarding_port_ref, + Port::State expected_port_state, + bool ignore_closed_peer, + UserMessageEvent* message, + NodeName* forward_to_node) { + bool target_is_remote = false; + for (;;) { + NodeName target_node_name; + { + SinglePortLocker locker(&forwarding_port_ref); + target_node_name = locker.port()->peer_node_name; + } + + // NOTE: This may call out to arbitrary user code, so it's important to call + // it only while no port locks are held on the calling thread. + if (target_node_name != name_) { + if (!message->NotifyWillBeRoutedExternally()) { + CHROMIUM_LOG(ERROR) + << "NotifyWillBeRoutedExternally failed unexpectedly."; + return ERROR_PORT_STATE_UNEXPECTED; + } + } + + // Must be held because ConvertToProxy needs to update |peer_port_maps_|. + PortLocker::AssertNoPortsLockedOnCurrentThread(); + mozilla::MutexAutoLock ports_locker(ports_lock_); + + // Simultaneously lock the forwarding port as well as all attached ports. + AutoTArray<PortRef, 4> attached_port_refs; + AutoTArray<const PortRef*, 5> ports_to_lock; + attached_port_refs.SetCapacity(message->num_ports()); + ports_to_lock.SetCapacity(message->num_ports() + 1); + ports_to_lock.AppendElement(&forwarding_port_ref); + for (size_t i = 0; i < message->num_ports(); ++i) { + const PortName& attached_port_name = message->ports()[i]; + auto iter = ports_.find(attached_port_name); + DCHECK(iter != ports_.end()); + attached_port_refs.AppendElement( + PortRef(attached_port_name, iter->second)); + ports_to_lock.AppendElement(&attached_port_refs[i]); + } + PortLocker locker(ports_to_lock.Elements(), ports_to_lock.Length()); + auto* forwarding_port = locker.GetPort(forwarding_port_ref); + + if (forwarding_port->peer_node_name != target_node_name) { + // The target node has already changed since we last held the lock. + if (target_node_name == name_) { + // If the target node was previously this local node, we need to restart + // the loop, since that means we may now route the message externally. + continue; + } + + target_node_name = forwarding_port->peer_node_name; + } + target_is_remote = target_node_name != name_; + + if (forwarding_port->state != expected_port_state) { + return ERROR_PORT_STATE_UNEXPECTED; + } + if (forwarding_port->peer_closed && !ignore_closed_peer) { + return ERROR_PORT_PEER_CLOSED; + } + + // Messages may already have a sequence number if they're being forwarded by + // a proxy. Otherwise, use the next outgoing sequence number. + if (message->sequence_num() == 0) { + message->set_sequence_num(forwarding_port->next_sequence_num_to_send++); + } +#ifdef DEBUG + std::ostringstream ports_buf; + for (size_t i = 0; i < message->num_ports(); ++i) { + if (i > 0) { + ports_buf << ","; + } + ports_buf << message->ports()[i]; + } +#endif + + if (message->num_ports() > 0) { + // Sanity check to make sure we can actually send all the attached ports. + // They must all be in the |kReceiving| state and must not be the sender's + // own peer. + DCHECK_EQ(message->num_ports(), attached_port_refs.Length()); + for (size_t i = 0; i < message->num_ports(); ++i) { + auto* attached_port = locker.GetPort(attached_port_refs[i]); + int error = OK; + if (attached_port->state != Port::kReceiving) { + error = ERROR_PORT_STATE_UNEXPECTED; + } else if (attached_port_refs[i].name() == + forwarding_port->peer_port_name) { + error = ERROR_PORT_CANNOT_SEND_PEER; + } + + if (error != OK) { + // Not going to send. Backpedal on the sequence number. + forwarding_port->next_sequence_num_to_send--; + return error; + } + } + + if (target_is_remote) { + // We only bother to proxy and rewrite ports in the event if it's + // going to be routed to an external node. This substantially reduces + // the amount of port churn in the system, as many port-carrying + // events are routed at least 1 or 2 intra-node hops before (if ever) + // being routed externally. + Event::PortDescriptor* port_descriptors = message->port_descriptors(); + for (size_t i = 0; i < message->num_ports(); ++i) { + ConvertToProxy(locker.GetPort(attached_port_refs[i]), + target_node_name, message->ports() + i, + port_descriptors + i); + } + } + } + +#ifdef DEBUG + DVLOG(4) << "Sending message " << message->sequence_num() + << " [ports=" << ports_buf.str() << "]" + << " from " << forwarding_port_ref.name() << "@" << name_ << " to " + << forwarding_port->peer_port_name << "@" << target_node_name; +#endif + + *forward_to_node = target_node_name; + message->set_port_name(forwarding_port->peer_port_name); + break; + } + + if (target_is_remote) { + for (size_t i = 0; i < message->num_ports(); ++i) { + // For any ports that were converted to proxies above, make sure their + // prior local peer (if applicable) receives a status update so it can be + // made aware of its peer's location. + const Event::PortDescriptor& descriptor = message->port_descriptors()[i]; + if (descriptor.peer_node_name == name_) { + PortRef local_peer; + if (GetPort(descriptor.peer_port_name, &local_peer) == OK) { + delegate_->PortStatusChanged(local_peer); + } + } + } + } + + return OK; +} + +int Node::BeginProxying(const PortRef& port_ref) { + { + SinglePortLocker locker(&port_ref); + auto* port = locker.port(); + if (port->state != Port::kBuffering) { + return OOPS(ERROR_PORT_STATE_UNEXPECTED); + } + port->state = Port::kProxying; + } + + int rv = ForwardUserMessagesFromProxy(port_ref); + if (rv != OK) { + return rv; + } + + // Forward any pending acknowledge request. + MaybeForwardAckRequest(port_ref); + + bool try_remove_proxy_immediately; + ScopedEvent closure_event; + NodeName closure_target_node; + { + SinglePortLocker locker(&port_ref); + auto* port = locker.port(); + if (port->state != Port::kProxying) { + return OOPS(ERROR_PORT_STATE_UNEXPECTED); + } + + try_remove_proxy_immediately = port->remove_proxy_on_last_message; + if (try_remove_proxy_immediately) { + // Make sure we propagate closure to our current peer. + closure_target_node = port->peer_node_name; + closure_event = mozilla::MakeUnique<ObserveClosureEvent>( + port->peer_port_name, port->last_sequence_num_to_receive); + } + } + + if (try_remove_proxy_immediately) { + TryRemoveProxy(port_ref); + delegate_->ForwardEvent(closure_target_node, std::move(closure_event)); + } else { + InitiateProxyRemoval(port_ref); + } + + return OK; +} + +int Node::ForwardUserMessagesFromProxy(const PortRef& port_ref) { + for (;;) { + // NOTE: We forward messages in sequential order here so that we maintain + // the message queue's notion of next sequence number. That's useful for the + // proxy removal process as we can tell when this port has seen all of the + // messages it is expected to see. + mozilla::UniquePtr<UserMessageEvent> message; + { + SinglePortLocker locker(&port_ref); + locker.port()->message_queue.GetNextMessage(&message, nullptr); + if (!message) { + break; + } + } + + NodeName target_node; + int rv = PrepareToForwardUserMessage(port_ref, Port::kProxying, + true /* ignore_closed_peer */, + message.get(), &target_node); + if (rv != OK) { + return rv; + } + + delegate_->ForwardEvent(target_node, std::move(message)); + } + return OK; +} + +void Node::InitiateProxyRemoval(const PortRef& port_ref) { + NodeName peer_node_name; + PortName peer_port_name; + { + SinglePortLocker locker(&port_ref); + auto* port = locker.port(); + peer_node_name = port->peer_node_name; + peer_port_name = port->peer_port_name; + } + + // To remove this node, we start by notifying the connected graph that we are + // a proxy. This allows whatever port is referencing this node to skip it. + // Eventually, this node will receive ObserveProxyAck (or ObserveClosure if + // the peer was closed in the meantime). + delegate_->ForwardEvent(peer_node_name, + mozilla::MakeUnique<ObserveProxyEvent>( + peer_port_name, name_, port_ref.name(), + peer_node_name, peer_port_name)); +} + +void Node::TryRemoveProxy(const PortRef& port_ref) { + bool should_erase = false; + NodeName removal_target_node; + ScopedEvent removal_event; + + { + SinglePortLocker locker(&port_ref); + auto* port = locker.port(); + DCHECK(port->state == Port::kProxying); + + // Make sure we have seen ObserveProxyAck before removing the port. + if (!port->remove_proxy_on_last_message) { + return; + } + + if (!CanAcceptMoreMessages(port)) { + should_erase = true; + if (port->send_on_proxy_removal) { + removal_target_node = port->send_on_proxy_removal->first; + removal_event = std::move(port->send_on_proxy_removal->second); + } + } else { + DVLOG(2) << "Cannot remove port " << port_ref.name() << "@" << name_ + << " now; waiting for more messages"; + } + } + + if (should_erase) { + ErasePort(port_ref.name()); + } + + if (removal_event) { + delegate_->ForwardEvent(removal_target_node, std::move(removal_event)); + } +} + +void Node::DestroyAllPortsWithPeer(const NodeName& node_name, + const PortName& port_name) { + // Wipes out all ports whose peer node matches |node_name| and whose peer port + // matches |port_name|. If |port_name| is |kInvalidPortName|, only the peer + // node is matched. + + std::vector<PortRef> ports_to_notify; + std::vector<PortName> dead_proxies_to_broadcast; + std::vector<mozilla::UniquePtr<UserMessageEvent>> undelivered_messages; + + { + PortLocker::AssertNoPortsLockedOnCurrentThread(); + mozilla::MutexAutoLock ports_lock(ports_lock_); + + auto node_peer_port_map_iter = peer_port_maps_.find(node_name); + if (node_peer_port_map_iter == peer_port_maps_.end()) { + return; + } + + auto& node_peer_port_map = node_peer_port_map_iter->second; + auto peer_ports_begin = node_peer_port_map.begin(); + auto peer_ports_end = node_peer_port_map.end(); + if (port_name != kInvalidPortName) { + // If |port_name| is given, we limit the set of local ports to the ones + // with that specific port as their peer. + peer_ports_begin = node_peer_port_map.find(port_name); + if (peer_ports_begin == node_peer_port_map.end()) { + return; + } + + peer_ports_end = peer_ports_begin; + ++peer_ports_end; + } + + for (auto peer_port_iter = peer_ports_begin; + peer_port_iter != peer_ports_end; ++peer_port_iter) { + auto& local_ports = peer_port_iter->second; + // NOTE: This inner loop almost always has only one element. There are + // relatively short-lived cases where more than one local port points to + // the same peer, and this only happens when extra ports are bypassed + // proxies waiting to be torn down. + for (auto& local_port : local_ports) { + auto& local_port_ref = local_port.second; + + SinglePortLocker locker(&local_port_ref); + auto* port = locker.port(); + + if (!port->peer_closed) { + // Treat this as immediate peer closure. It's an exceptional + // condition akin to a broken pipe, so we don't care about losing + // messages. + + port->peer_closed = true; + port->peer_lost_unexpectedly = true; + if (port->state == Port::kReceiving) { + ports_to_notify.push_back(local_port_ref); + } + } + + // We don't expect to forward any further messages, and we don't + // expect to receive a Port{Accepted,Rejected} event. Because we're + // a proxy with no active peer, we cannot use the normal proxy removal + // procedure of forward-propagating an ObserveProxy. Instead we + // broadcast our own death so it can be back-propagated. This is + // inefficient but rare. + if (port->state != Port::kReceiving) { + dead_proxies_to_broadcast.push_back(local_port_ref.name()); + std::vector<mozilla::UniquePtr<UserMessageEvent>> messages; + port->message_queue.TakeAllMessages(&messages); + for (auto& message : messages) { + undelivered_messages.emplace_back(std::move(message)); + } + } + } + } + } + + for (const auto& proxy_name : dead_proxies_to_broadcast) { + ErasePort(proxy_name); + DVLOG(2) << "Forcibly deleted port " << proxy_name << "@" << name_; + } + + // Wake up any receiving ports who have just observed simulated peer closure. + for (const auto& port : ports_to_notify) { + delegate_->PortStatusChanged(port); + } + + for (const auto& proxy_name : dead_proxies_to_broadcast) { + // Broadcast an event signifying that this proxy is no longer functioning. + delegate_->BroadcastEvent(mozilla::MakeUnique<ObserveProxyEvent>( + kInvalidPortName, name_, proxy_name, kInvalidNodeName, + kInvalidPortName)); + + // Also process death locally since the port that points this closed one + // could be on the current node. + // Note: Although this is recursive, only a single port is involved which + // limits the expected branching to 1. + DestroyAllPortsWithPeer(name_, proxy_name); + } + + // Close any ports referenced by undelivered messages. + for (const auto& message : undelivered_messages) { + for (size_t i = 0; i < message->num_ports(); ++i) { + PortRef ref; + if (GetPort(message->ports()[i], &ref) == OK) { + ClosePort(ref); + } + } + } +} + +void Node::UpdatePortPeerAddress(const PortName& local_port_name, + Port* local_port, + const NodeName& new_peer_node, + const PortName& new_peer_port) { + ports_lock_.AssertCurrentThreadOwns(); + local_port->AssertLockAcquired(); + + RemoveFromPeerPortMap(local_port_name, local_port); + local_port->peer_node_name = new_peer_node; + local_port->peer_port_name = new_peer_port; + if (new_peer_port != kInvalidPortName) { + peer_port_maps_[new_peer_node][new_peer_port].emplace( + local_port_name, PortRef(local_port_name, RefPtr<Port>{local_port})); + } +} + +void Node::RemoveFromPeerPortMap(const PortName& local_port_name, + Port* local_port) { + if (local_port->peer_port_name == kInvalidPortName) { + return; + } + + auto node_iter = peer_port_maps_.find(local_port->peer_node_name); + if (node_iter == peer_port_maps_.end()) { + return; + } + + auto& node_peer_port_map = node_iter->second; + auto ports_iter = node_peer_port_map.find(local_port->peer_port_name); + if (ports_iter == node_peer_port_map.end()) { + return; + } + + auto& local_ports_with_this_peer = ports_iter->second; + local_ports_with_this_peer.erase(local_port_name); + if (local_ports_with_this_peer.empty()) { + node_peer_port_map.erase(ports_iter); + } + if (node_peer_port_map.empty()) { + peer_port_maps_.erase(node_iter); + } +} + +void Node::SwapPortPeers(const PortName& port0_name, Port* port0, + const PortName& port1_name, Port* port1) { + ports_lock_.AssertCurrentThreadOwns(); + port0->AssertLockAcquired(); + port1->AssertLockAcquired(); + + auto& peer0_ports = + peer_port_maps_[port0->peer_node_name][port0->peer_port_name]; + auto& peer1_ports = + peer_port_maps_[port1->peer_node_name][port1->peer_port_name]; + peer0_ports.erase(port0_name); + peer1_ports.erase(port1_name); + peer0_ports.emplace(port1_name, PortRef(port1_name, RefPtr<Port>{port1})); + peer1_ports.emplace(port0_name, PortRef(port0_name, RefPtr<Port>{port0})); + + std::swap(port0->peer_node_name, port1->peer_node_name); + std::swap(port0->peer_port_name, port1->peer_port_name); +} + +void Node::MaybeResendAckRequest(const PortRef& port_ref) { + NodeName peer_node_name; + ScopedEvent ack_request_event; + { + SinglePortLocker locker(&port_ref); + auto* port = locker.port(); + if (port->state != Port::kReceiving) { + return; + } + + if (!port->sequence_num_acknowledge_interval) { + return; + } + + peer_node_name = port->peer_node_name; + ack_request_event = mozilla::MakeUnique<UserMessageReadAckRequestEvent>( + port->peer_port_name, port->last_sequence_num_acknowledged + + port->sequence_num_acknowledge_interval); + } + + delegate_->ForwardEvent(peer_node_name, std::move(ack_request_event)); +} + +void Node::MaybeForwardAckRequest(const PortRef& port_ref) { + NodeName peer_node_name; + ScopedEvent ack_request_event; + { + SinglePortLocker locker(&port_ref); + auto* port = locker.port(); + if (port->state != Port::kProxying) { + return; + } + + if (!port->sequence_num_to_acknowledge) { + return; + } + + peer_node_name = port->peer_node_name; + ack_request_event = mozilla::MakeUnique<UserMessageReadAckRequestEvent>( + port->peer_port_name, port->sequence_num_to_acknowledge); + + port->sequence_num_to_acknowledge = 0; + } + + delegate_->ForwardEvent(peer_node_name, std::move(ack_request_event)); +} + +void Node::MaybeResendAck(const PortRef& port_ref) { + NodeName peer_node_name; + ScopedEvent ack_event; + { + SinglePortLocker locker(&port_ref); + auto* port = locker.port(); + if (port->state != Port::kReceiving) { + return; + } + + uint64_t last_sequence_num_read = + port->message_queue.next_sequence_num() - 1; + if (!port->sequence_num_to_acknowledge || !last_sequence_num_read) { + return; + } + + peer_node_name = port->peer_node_name; + ack_event = mozilla::MakeUnique<UserMessageReadAckEvent>( + port->peer_port_name, last_sequence_num_read); + } + + delegate_->ForwardEvent(peer_node_name, std::move(ack_event)); +} + +Node::DelegateHolder::DelegateHolder(Node* node, NodeDelegate* delegate) + : node_(node), delegate_(delegate) { + DCHECK(node_); +} + +Node::DelegateHolder::~DelegateHolder() = default; + +#ifdef DEBUG +void Node::DelegateHolder::EnsureSafeDelegateAccess() const { + PortLocker::AssertNoPortsLockedOnCurrentThread(); + mozilla::MutexAutoLock lock(node_->ports_lock_); +} +#endif + +} // namespace ports +} // namespace core +} // namespace mojo diff --git a/ipc/chromium/src/mojo/core/ports/node.h b/ipc/chromium/src/mojo/core/ports/node.h new file mode 100644 index 0000000000..6754008177 --- /dev/null +++ b/ipc/chromium/src/mojo/core/ports/node.h @@ -0,0 +1,326 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_CORE_PORTS_NODE_H_ +#define MOJO_CORE_PORTS_NODE_H_ + +#include <stddef.h> +#include <stdint.h> + +#include <queue> +#include <unordered_map> + +#include "mojo/core/ports/event.h" +#include "mojo/core/ports/name.h" +#include "mojo/core/ports/port.h" +#include "mojo/core/ports/port_ref.h" +#include "mojo/core/ports/user_data.h" +#include "mozilla/Mutex.h" +#include "mozilla/RefPtr.h" + +namespace mojo { +namespace core { +namespace ports { + +enum : int { + OK = 0, + ERROR_PORT_UNKNOWN = -10, + ERROR_PORT_EXISTS = -11, + ERROR_PORT_STATE_UNEXPECTED = -12, + ERROR_PORT_CANNOT_SEND_SELF = -13, + ERROR_PORT_PEER_CLOSED = -14, + ERROR_PORT_CANNOT_SEND_PEER = -15, + ERROR_NOT_IMPLEMENTED = -100, +}; + +struct PortStatus { + bool has_messages; + bool receiving_messages; + bool peer_closed; + bool peer_remote; + size_t queued_message_count; + size_t queued_num_bytes; + size_t unacknowledged_message_count; +#ifdef FUZZING_SNAPSHOT + NodeName peer_node_name; +#endif +}; + +class MessageFilter; +class NodeDelegate; + +// A Node maintains a collection of Ports (see port.h) indexed by unique 128-bit +// addresses (names), performing routing and processing of events among the +// Ports within the Node and to or from other Nodes in the system. Typically +// (and practically, in all uses today) there is a single Node per system +// process. Thus a Node boundary effectively models a process boundary. +// +// New Ports can be created uninitialized using CreateUninitializedPort (and +// later initialized using InitializePort), or created in a fully initialized +// state using CreatePortPair(). Initialized ports have exactly one conjugate +// port which is the ultimate receiver of any user messages sent by that port. +// See SendUserMessage(). +// +// In addition to routing user message events, various control events are used +// by Nodes to coordinate Port behavior and lifetime within and across Nodes. +// See Event documentation for description of different types of events used by +// a Node to coordinate behavior. +class Node { + public: + enum class ShutdownPolicy { + DONT_ALLOW_LOCAL_PORTS, + ALLOW_LOCAL_PORTS, + }; + + // Does not take ownership of the delegate. + Node(const NodeName& name, NodeDelegate* delegate); + ~Node(); + + Node(const Node&) = delete; + void operator=(const Node&) = delete; + + // Returns true iff there are no open ports referring to another node or ports + // in the process of being transferred from this node to another. If this + // returns false, then to ensure clean shutdown, it is necessary to keep the + // node alive and continue routing messages to it via AcceptMessage. This + // method may be called again after AcceptMessage to check if the Node is now + // ready to be destroyed. + // + // If |policy| is set to |ShutdownPolicy::ALLOW_LOCAL_PORTS|, this will return + // |true| even if some ports remain alive, as long as none of them are proxies + // to another node. + bool CanShutdownCleanly( + ShutdownPolicy policy = ShutdownPolicy::DONT_ALLOW_LOCAL_PORTS); + + // Lookup the named port. + int GetPort(const PortName& port_name, PortRef* port_ref); + + // Creates a port on this node. Before the port can be used, it must be + // initialized using InitializePort. This method is useful for bootstrapping + // a connection between two nodes. Generally, ports are created using + // CreatePortPair instead. + int CreateUninitializedPort(PortRef* port_ref); + + // Initializes a newly created port. + int InitializePort(const PortRef& port_ref, const NodeName& peer_node_name, + const PortName& peer_port_name); + + // Generates a new connected pair of ports bound to this node. These ports + // are initialized and ready to go. + int CreatePortPair(PortRef* port0_ref, PortRef* port1_ref); + + // User data associated with the port. + int SetUserData(const PortRef& port_ref, RefPtr<UserData> user_data); + int GetUserData(const PortRef& port_ref, RefPtr<UserData>* user_data); + + // Prevents further messages from being sent from this port or delivered to + // this port. The port is removed, and the port's peer is notified of the + // closure after it has consumed all pending messages. + int ClosePort(const PortRef& port_ref); + + // Returns the current status of the port. + int GetStatus(const PortRef& port_ref, PortStatus* port_status); + + // Returns the next available message on the specified port or returns a null + // message if there are none available. Returns ERROR_PORT_PEER_CLOSED to + // indicate that this port's peer has closed. In such cases GetMessage may + // be called until it yields a null message, indicating that no more messages + // may be read from the port. + // + // If |filter| is non-null, the next available message is returned only if it + // is matched by the filter. If the provided filter does not match the next + // available message, GetMessage() behaves as if there is no message + // available. Ownership of |filter| is not taken, and it must outlive the + // extent of this call. + int GetMessage(const PortRef& port_ref, + mozilla::UniquePtr<UserMessageEvent>* message, + MessageFilter* filter); + + // Sends a message from the specified port to its peer. Note that the message + // notification may arrive synchronously (via PortStatusChanged() on the + // delegate) if the peer is local to this Node. + int SendUserMessage(const PortRef& port_ref, + mozilla::UniquePtr<UserMessageEvent> message); + + // Makes the port send acknowledge requests to its conjugate to acknowledge + // at least every |sequence_number_acknowledge_interval| messages as they're + // read from the conjugate. The number of unacknowledged messages is exposed + // in the |unacknowledged_message_count| field of PortStatus. This allows + // bounding the number of unread and/or in-transit messages from this port + // to its conjugate between zero and |unacknowledged_message_count|. + int SetAcknowledgeRequestInterval(const PortRef& port_ref, + uint64_t sequence_num_acknowledge_interval); + + // Corresponding to NodeDelegate::ForwardEvent. + int AcceptEvent(ScopedEvent event); + + // Called to merge two ports with each other. If you have two independent + // port pairs A <=> B and C <=> D, the net result of merging B and C is a + // single connected port pair A <=> D. + // + // Note that the behavior of this operation is undefined if either port to be + // merged (B or C above) has ever been read from or written to directly, and + // this must ONLY be called on one side of the merge, though it doesn't matter + // which side. + // + // It is safe for the non-merged peers (A and D above) to be transferred, + // closed, and/or written to before, during, or after the merge. + int MergePorts(const PortRef& port_ref, const NodeName& destination_node_name, + const PortName& destination_port_name); + + // Like above but merges two ports local to this node. Because both ports are + // local this can also verify that neither port has been written to before the + // merge. If this fails for any reason, both ports are closed. Otherwise OK + // is returned and the ports' receiving peers are connected to each other. + int MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref); + + // Called to inform this node that communication with another node is lost + // indefinitely. This triggers cleanup of ports bound to this node. + int LostConnectionToNode(const NodeName& node_name); + + private: + // Helper to ensure that a Node always calls into its delegate safely, i.e. + // without holding any internal locks. + class DelegateHolder { + public: + DelegateHolder(Node* node, NodeDelegate* delegate); + ~DelegateHolder(); + + DelegateHolder(const DelegateHolder&) = delete; + void operator=(const DelegateHolder&) = delete; + + NodeDelegate* operator->() const { + EnsureSafeDelegateAccess(); + return delegate_; + } + + private: +#ifdef DEBUG + void EnsureSafeDelegateAccess() const; +#else + void EnsureSafeDelegateAccess() const {} +#endif + + Node* const node_; + NodeDelegate* const delegate_; + }; + + int OnUserMessage(mozilla::UniquePtr<UserMessageEvent> message); + int OnPortAccepted(mozilla::UniquePtr<PortAcceptedEvent> event); + int OnObserveProxy(mozilla::UniquePtr<ObserveProxyEvent> event); + int OnObserveProxyAck(mozilla::UniquePtr<ObserveProxyAckEvent> event); + int OnObserveClosure(mozilla::UniquePtr<ObserveClosureEvent> event); + int OnMergePort(mozilla::UniquePtr<MergePortEvent> event); + int OnUserMessageReadAckRequest( + mozilla::UniquePtr<UserMessageReadAckRequestEvent> event); + int OnUserMessageReadAck(mozilla::UniquePtr<UserMessageReadAckEvent> event); + + int AddPortWithName(const PortName& port_name, RefPtr<Port> port); + void ErasePort(const PortName& port_name); + + int SendUserMessageInternal(const PortRef& port_ref, + mozilla::UniquePtr<UserMessageEvent>* message); + int MergePortsInternal(const PortRef& port0_ref, const PortRef& port1_ref, + bool allow_close_on_bad_state); + void ConvertToProxy(Port* port, const NodeName& to_node_name, + PortName* port_name, + Event::PortDescriptor* port_descriptor) + MOZ_REQUIRES(ports_lock_); + int AcceptPort(const PortName& port_name, + const Event::PortDescriptor& port_descriptor); + + int PrepareToForwardUserMessage(const PortRef& forwarding_port_ref, + Port::State expected_port_state, + bool ignore_closed_peer, + UserMessageEvent* message, + NodeName* forward_to_node); + int BeginProxying(const PortRef& port_ref); + int ForwardUserMessagesFromProxy(const PortRef& port_ref); + void InitiateProxyRemoval(const PortRef& port_ref); + void TryRemoveProxy(const PortRef& port_ref); + void DestroyAllPortsWithPeer(const NodeName& node_name, + const PortName& port_name); + + // Changes the peer node and port name referenced by |port|. Note that both + // |ports_lock_| MUST be held through the extent of this method. + // |local_port|'s lock must be held if and only if a reference to |local_port| + // exist in |ports_|. + void UpdatePortPeerAddress(const PortName& local_port_name, Port* local_port, + const NodeName& new_peer_node, + const PortName& new_peer_port) + MOZ_REQUIRES(ports_lock_); + + // Removes an entry from |peer_port_map_| corresponding to |local_port|'s peer + // address, if valid. + void RemoveFromPeerPortMap(const PortName& local_port_name, Port* local_port) + MOZ_REQUIRES(ports_lock_); + + // Swaps the peer information for two local ports. Used during port merges. + // Note that |ports_lock_| must be held along with each of the two port's own + // locks, through the extent of this method. + void SwapPortPeers(const PortName& port0_name, Port* port0, + const PortName& port1_name, Port* port1) + MOZ_REQUIRES(ports_lock_); + + // Sends an acknowledge request to the peer if the port has a non-zero + // |sequence_num_acknowledge_interval|. This needs to be done when the port's + // peer changes, as the previous peer proxy may not have forwarded any prior + // acknowledge request before deleting itself. + void MaybeResendAckRequest(const PortRef& port_ref); + + // Forwards a stored acknowledge request to the peer if the proxy has a + // non-zero |sequence_num_acknowledge_interval|. + void MaybeForwardAckRequest(const PortRef& port_ref); + + // Sends an acknowledge of the most recently read sequence number to the peer + // if any messages have been read, and the port has a non-zero + // |sequence_num_to_acknowledge|. + void MaybeResendAck(const PortRef& port_ref); + + const NodeName name_; + const DelegateHolder delegate_; + + // Just to clarify readability of the types below. + using LocalPortName = PortName; + using PeerPortName = PortName; + + // Guards access to |ports_| and |peer_port_maps_| below. + // + // This must never be acquired while an individual port's lock is held on the + // same thread. Conversely, individual port locks may be acquired while this + // one is held. + // + // Because UserMessage events may execute arbitrary user code during + // destruction, it is also important to ensure that such events are never + // destroyed while this (or any individual Port) lock is held. + mozilla::Mutex ports_lock_{"Ports Lock"}; + std::unordered_map<LocalPortName, RefPtr<Port>> ports_ + MOZ_GUARDED_BY(ports_lock_); + + // Maps a peer port name to a list of PortRefs for all local ports which have + // the port name key designated as their peer port. The set of local ports + // which have the same peer port is expected to always be relatively small and + // usually 1. Hence we just use a flat_map of local PortRefs keyed on each + // local port's name. + // + // FIXME(nika): We don't have `base::flat_map` or a super equivalent type with + // the same API, so just use a nested `std::unordered_map` for now. We should + // probably change all of this to instead use our types eventually. + using PeerPortMap = + std::unordered_map<PeerPortName, + std::unordered_map<LocalPortName, PortRef>>; + + // A reverse mapping which can be used to find all local ports that reference + // a given peer node or a local port that references a specific given peer + // port on a peer node. The key to this map is the corresponding peer node + // name. + std::unordered_map<NodeName, PeerPortMap> peer_port_maps_ + MOZ_GUARDED_BY(ports_lock_); +}; + +} // namespace ports +} // namespace core +} // namespace mojo + +#endif // MOJO_CORE_PORTS_NODE_H_ diff --git a/ipc/chromium/src/mojo/core/ports/node_delegate.h b/ipc/chromium/src/mojo/core/ports/node_delegate.h new file mode 100644 index 0000000000..3172779c06 --- /dev/null +++ b/ipc/chromium/src/mojo/core/ports/node_delegate.h @@ -0,0 +1,38 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_CORE_PORTS_NODE_DELEGATE_H_ +#define MOJO_CORE_PORTS_NODE_DELEGATE_H_ + +#include <stddef.h> + +#include "mojo/core/ports/event.h" +#include "mojo/core/ports/name.h" +#include "mojo/core/ports/port_ref.h" + +namespace mojo { +namespace core { +namespace ports { + +class NodeDelegate { + public: + virtual ~NodeDelegate() = default; + + // Forward an event (possibly asynchronously) to the specified node. + virtual void ForwardEvent(const NodeName& node, ScopedEvent event) = 0; + + // Broadcast an event to all nodes. + virtual void BroadcastEvent(ScopedEvent event) = 0; + + // Indicates that the port's status has changed recently. Use Node::GetStatus + // to query the latest status of the port. Note, this event could be spurious + // if another thread is simultaneously modifying the status of the port. + virtual void PortStatusChanged(const PortRef& port_ref) = 0; +}; + +} // namespace ports +} // namespace core +} // namespace mojo + +#endif // MOJO_CORE_PORTS_NODE_DELEGATE_H_ diff --git a/ipc/chromium/src/mojo/core/ports/port.cc b/ipc/chromium/src/mojo/core/ports/port.cc new file mode 100644 index 0000000000..c46dc9ed25 --- /dev/null +++ b/ipc/chromium/src/mojo/core/ports/port.cc @@ -0,0 +1,28 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/core/ports/port.h" + +namespace mojo { +namespace core { +namespace ports { + +Port::Port(uint64_t next_sequence_num_to_send, + uint64_t next_sequence_num_to_receive) + : state(kUninitialized), + next_sequence_num_to_send(next_sequence_num_to_send), + last_sequence_num_acknowledged(next_sequence_num_to_send - 1), + sequence_num_acknowledge_interval(0), + last_sequence_num_to_receive(0), + sequence_num_to_acknowledge(0), + message_queue(next_sequence_num_to_receive), + remove_proxy_on_last_message(false), + peer_closed(false), + peer_lost_unexpectedly(false) {} + +Port::~Port() = default; + +} // namespace ports +} // namespace core +} // namespace mojo diff --git a/ipc/chromium/src/mojo/core/ports/port.h b/ipc/chromium/src/mojo/core/ports/port.h new file mode 100644 index 0000000000..4fdffe7064 --- /dev/null +++ b/ipc/chromium/src/mojo/core/ports/port.h @@ -0,0 +1,239 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_CORE_PORTS_PORT_H_ +#define MOJO_CORE_PORTS_PORT_H_ + +#include <memory> +#include <queue> +#include <utility> +#include <vector> + +#include "mojo/core/ports/event.h" +#include "mojo/core/ports/message_queue.h" +#include "mojo/core/ports/user_data.h" +#include "mozilla/Atomics.h" +#include "mozilla/PlatformMutex.h" +#include "mozilla/RefPtr.h" +#include "nsISupportsImpl.h" + +namespace mojo { +namespace core { +namespace ports { + +class PortLocker; + +namespace detail { + +// Ports cannot use mozilla::Mutex, as the acquires-before relationships handled +// by PortLocker can overload the debug-only deadlock detector. +class MOZ_CAPABILITY("mutex") PortMutex : private ::mozilla::detail::MutexImpl { + public: + void AssertCurrentThreadOwns() const MOZ_ASSERT_CAPABILITY(this) { +#ifdef DEBUG + MOZ_ASSERT(mOwningThread == PR_GetCurrentThread()); +#endif + } + + private: + // PortMutex should only be locked/unlocked via PortLocker + friend class ::mojo::core::ports::PortLocker; + + void Lock() MOZ_CAPABILITY_ACQUIRE() { + ::mozilla::detail::MutexImpl::lock(); +#ifdef DEBUG + mOwningThread = PR_GetCurrentThread(); +#endif + } + void Unlock() MOZ_CAPABILITY_RELEASE() { +#ifdef DEBUG + MOZ_ASSERT(mOwningThread == PR_GetCurrentThread()); + mOwningThread = nullptr; +#endif + ::mozilla::detail::MutexImpl::unlock(); + } + +#ifdef DEBUG + mozilla::Atomic<PRThread*, mozilla::Relaxed> mOwningThread; +#endif +}; + +} // namespace detail + +// A Port is essentially a node in a circular list of addresses. For the sake of +// this documentation such a list will henceforth be referred to as a "route." +// Routes are the fundamental medium upon which all Node event circulation takes +// place and are thus the backbone of all Mojo message passing. +// +// Each Port is identified by a 128-bit address within a Node (see node.h). A +// Port doesn't really *do* anything per se: it's a named collection of state, +// and its owning Node manages all event production, transmission, routing, and +// processing logic. See Node for more details on how Ports may be used to +// transmit arbitrary user messages as well as other Ports. +// +// Ports may be in any of a handful of states (see State below) which dictate +// how they react to system events targeting them. In the simplest and most +// common case, Ports are initially created as an entangled pair (i.e. a simple +// cycle consisting of two Ports) both in the |kReceiving| State. Consider Ports +// we'll label |A| and |B| here, which may be created using +// Node::CreatePortPair(): +// +// +-----+ +-----+ +// | |--------->| | +// | A | | B | +// | |<---------| | +// +-----+ +-----+ +// +// |A| references |B| via |peer_node_name| and |peer_port_name|, while |B| in +// turn references |A|. Note that a Node is NEVER aware of who is sending events +// to a given Port; it is only aware of where it must route events FROM a given +// Port. +// +// For the sake of documentation, we refer to one receiving port in a route as +// the "conjugate" of the other. A receiving port's conjugate is also its peer +// upon initial creation, but because of proxying this may not be the case at a +// later time. +// +// ALL access to this data structure must be guarded by |lock_| acquisition, +// which is only possible using a PortLocker. PortLocker ensures that +// overlapping Port lock acquisitions on a single thread are always acquired in +// a globally consistent order. +class Port { + NS_INLINE_DECL_THREADSAFE_REFCOUNTING(Port) + + public: + // The state of a given Port. A Port may only exist in one of these states at + // any given time. + enum State { + // The Port is not yet paired with a peer and is therefore unusable. See + // Node::CreateUninitializedPort and Node::InitializePort for motivation. + kUninitialized, + + // The Port is publicly visible outside of its Node and may be used to send + // and receive user messages. There are always AT MOST two |kReceiving| + // Ports along any given route. A user message event sent from a receiving + // port is always circulated along the Port's route until it reaches either + // a dead-end -- in which case the route is broken -- or it reaches the + // other receiving Port in the route -- in which case it lands in that + // Port's incoming message queue which can by read by user code. + kReceiving, + + // The Port has been taken out of the |kReceiving| state in preparation for + // proxying to a new destination. A Port enters this state immediately when + // it's attached to a user message and may only leave this state when + // transitioning to |kProxying|. See Node for more details. + kBuffering, + + // The Port is forwarding all user messages (and most other events) to its + // peer without discretion. Ports in the |kProxying| state may never leave + // this state and only exist temporarily until their owning Node has + // established that no more events will target them. See Node for more + // details. + kProxying, + + // The Port has been closed and is now permanently unusable. Only + // |kReceiving| ports can be closed. + kClosed + }; + + // The current State of the Port. + State state; + + // The Node and Port address to which events should be routed FROM this Port. + // Note that this is NOT necessarily the address of the Port currently sending + // events TO this Port. + NodeName peer_node_name; + PortName peer_port_name; + + // The next available sequence number to use for outgoing user message events + // originating from this port. + uint64_t next_sequence_num_to_send; + + // The largest acknowledged user message event sequence number. + uint64_t last_sequence_num_acknowledged; + + // The interval for which acknowledge requests will be sent. A value of N will + // cause an acknowledge request for |last_sequence_num_acknowledged| + N when + // initially set and on received acknowledge. This means that the lower bound + // for unread or in-transit messages is |next_sequence_num_to_send| - + // |last_sequence_num_acknowledged| + |sequence_number_acknowledge_interval|. + // If zero, no acknowledge requests are sent. + uint64_t sequence_num_acknowledge_interval; + + // The sequence number of the last message this Port should ever expect to + // receive in its lifetime. May be used to determine that a proxying port is + // ready to be destroyed or that a receiving port's conjugate has been closed + // and we know the sequence number of the last message it sent. + uint64_t last_sequence_num_to_receive; + + // The sequence number of the message for which this Port should send an + // acknowledge message. In the buffering state, holds the acknowledge request + // value that is forwarded to the peer on transition to proxying. + // This is zero in any port that's never received an acknowledge request, and + // in proxies that have forwarded a stored acknowledge. + uint64_t sequence_num_to_acknowledge; + + // The queue of incoming user messages received by this Port. Only non-empty + // for buffering or receiving Ports. When a buffering port enters the proxying + // state, it flushes its queue and the proxy then bypasses the queue + // indefinitely. + // + // A receiving port's queue only has elements removed by user code reading + // messages from the port. + // + // Note that this is a priority queue which only exposes messages to consumers + // in strict sequential order. + MessageQueue message_queue; + + // In some edge cases, a Node may need to remember to route a single special + // event upon destruction of this (proxying) Port. That event is stashed here + // in the interim. + mozilla::UniquePtr<std::pair<NodeName, ScopedEvent>> send_on_proxy_removal; + + // Arbitrary user data attached to the Port. In practice, Mojo uses this to + // stash an observer interface which can be notified about various Port state + // changes. + RefPtr<UserData> user_data; + + // Indicates that this (proxying) Port has received acknowledgement that no + // new user messages will be routed to it. If |true|, the proxy will be + // removed once it has received and forwarded all sequenced messages up to and + // including the one numbered |last_sequence_num_to_receive|. + bool remove_proxy_on_last_message; + + // Indicates that this Port is aware that its nearest (in terms of forward, + // non-zero cyclic routing distance) receiving Port has been closed. + bool peer_closed; + + // Indicates that this Port lost its peer unexpectedly (e.g. via process death + // rather than receiving an ObserveClosure event). In this case + // |peer_closed| will be true but |last_sequence_num_to_receive| cannot be + // known. Such ports will continue to make message available until their + // message queue is empty. + bool peer_lost_unexpectedly; + + Port(uint64_t next_sequence_num_to_send, + uint64_t next_sequence_num_to_receive); + + Port(const Port&) = delete; + void operator=(const Port&) = delete; + + void AssertLockAcquired() { lock_.AssertCurrentThreadOwns(); } + + private: + friend class PortLocker; + + ~Port(); + + // This lock guards all fields in Port, but is locked in a unique way which is + // unfortunately somewhat difficult to get to work with the thread-safety + // analysis. + detail::PortMutex lock_ MOZ_ANNOTATED; +}; + +} // namespace ports +} // namespace core +} // namespace mojo + +#endif // MOJO_CORE_PORTS_PORT_H_ diff --git a/ipc/chromium/src/mojo/core/ports/port_locker.cc b/ipc/chromium/src/mojo/core/ports/port_locker.cc new file mode 100644 index 0000000000..044a26f143 --- /dev/null +++ b/ipc/chromium/src/mojo/core/ports/port_locker.cc @@ -0,0 +1,75 @@ +// Copyright 2017 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/core/ports/port_locker.h" + +#include <algorithm> + +#include "mojo/core/ports/port.h" + +#ifdef DEBUG +# include "base/thread_local.h" +#endif + +namespace mojo { +namespace core { +namespace ports { + +namespace { + +#ifdef DEBUG +void UpdateTLS(PortLocker* old_locker, PortLocker* new_locker) { + // Sanity check when DCHECK is on to make sure there is only ever one + // PortLocker extant on the current thread. + static auto* tls = new base::ThreadLocalPointer<PortLocker>(); + DCHECK_EQ(old_locker, tls->Get()); + tls->Set(new_locker); +} +#endif + +} // namespace + +PortLocker::PortLocker(const PortRef** port_refs, size_t num_ports) + : port_refs_(port_refs), num_ports_(num_ports) { +#ifdef DEBUG + UpdateTLS(nullptr, this); +#endif + + // Sort the ports by address to lock them in a globally consistent order. + std::sort( + port_refs_, port_refs_ + num_ports_, + [](const PortRef* a, const PortRef* b) { return a->port() < b->port(); }); + for (size_t i = 0; i < num_ports_; ++i) { + // TODO(crbug.com/725605): Remove this CHECK. + CHECK(port_refs_[i]->port()); + port_refs_[i]->port()->lock_.Lock(); + } +} + +PortLocker::~PortLocker() { + for (size_t i = 0; i < num_ports_; ++i) { + port_refs_[i]->port()->lock_.Unlock(); + } + +#ifdef DEBUG + UpdateTLS(this, nullptr); +#endif +} + +#ifdef DEBUG +// static +void PortLocker::AssertNoPortsLockedOnCurrentThread() { + // Forces a DCHECK if the TLS PortLocker is anything other than null. + UpdateTLS(nullptr, nullptr); +} +#endif + +SinglePortLocker::SinglePortLocker(const PortRef* port_ref) + : port_ref_(port_ref), locker_(&port_ref_, 1) {} + +SinglePortLocker::~SinglePortLocker() = default; + +} // namespace ports +} // namespace core +} // namespace mojo diff --git a/ipc/chromium/src/mojo/core/ports/port_locker.h b/ipc/chromium/src/mojo/core/ports/port_locker.h new file mode 100644 index 0000000000..6a88d14912 --- /dev/null +++ b/ipc/chromium/src/mojo/core/ports/port_locker.h @@ -0,0 +1,90 @@ +// Copyright 2017 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_CORE_PORTS_PORT_LOCKER_H_ +#define MOJO_CORE_PORTS_PORT_LOCKER_H_ + +#include <stdint.h> + +#include "base/logging.h" +#include "mojo/core/ports/port_ref.h" + +namespace mojo { +namespace core { +namespace ports { + +class Port; +class PortRef; + +// A helper which must be used to acquire individual Port locks. Any given +// thread may have at most one of these alive at any time. This ensures that +// when multiple ports are locked, they're locked in globally consistent order. +// +// Port locks are acquired upon construction of this object and released upon +// destruction. +class PortLocker { + public: + // Constructs a PortLocker over a sequence of |num_ports| contiguous + // |PortRef*|s. The sequence may be reordered by this constructor, and upon + // return, all referenced ports' locks are held. + PortLocker(const PortRef** port_refs, size_t num_ports); + ~PortLocker(); + + PortLocker(const PortLocker&) = delete; + void operator=(const PortLocker&) = delete; + + // Provides safe access to a PortRef's Port. Note that in release builds this + // doesn't do anything other than pass through to the private accessor on + // |port_ref|, but it does force callers to go through a PortLocker to get to + // the state, thus minimizing the likelihood that they'll go and do something + // stupid. + Port* GetPort(const PortRef& port_ref) const { +#ifdef DEBUG + // Sanity check when DEBUG is on to ensure this is actually a port whose + // lock is held by this PortLocker. + bool is_port_locked = false; + for (size_t i = 0; i < num_ports_ && !is_port_locked; ++i) { + if (port_refs_[i]->port() == port_ref.port()) { + is_port_locked = true; + } + } + DCHECK(is_port_locked); +#endif + return port_ref.port(); + } + +// A helper which can be used to verify that no Port locks are held on the +// current thread. In non-DCHECK builds this is a no-op. +#ifdef DEBUG + static void AssertNoPortsLockedOnCurrentThread(); +#else + static void AssertNoPortsLockedOnCurrentThread() {} +#endif + + private: + const PortRef** const port_refs_; + const size_t num_ports_; +}; + +// Convenience wrapper for a PortLocker that locks a single port. +class SinglePortLocker { + public: + explicit SinglePortLocker(const PortRef* port_ref); + ~SinglePortLocker(); + + SinglePortLocker(const SinglePortLocker&) = delete; + void operator=(const SinglePortLocker&) = delete; + + Port* port() const { return locker_.GetPort(*port_ref_); } + + private: + const PortRef* port_ref_; + PortLocker locker_; +}; + +} // namespace ports +} // namespace core +} // namespace mojo + +#endif // MOJO_CORE_PORTS_PORT_LOCKER_H_ diff --git a/ipc/chromium/src/mojo/core/ports/port_ref.cc b/ipc/chromium/src/mojo/core/ports/port_ref.cc new file mode 100644 index 0000000000..b9267f8a6a --- /dev/null +++ b/ipc/chromium/src/mojo/core/ports/port_ref.cc @@ -0,0 +1,30 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/core/ports/port_ref.h" + +#include "mojo/core/ports/port.h" + +namespace mojo { +namespace core { +namespace ports { + +PortRef::~PortRef() = default; + +PortRef::PortRef() = default; + +PortRef::PortRef(const PortName& name, RefPtr<Port> port) + : name_(name), port_(std::move(port)) {} + +PortRef::PortRef(const PortRef& other) = default; + +PortRef::PortRef(PortRef&& other) = default; + +PortRef& PortRef::operator=(const PortRef& other) = default; + +PortRef& PortRef::operator=(PortRef&& other) = default; + +} // namespace ports +} // namespace core +} // namespace mojo diff --git a/ipc/chromium/src/mojo/core/ports/port_ref.h b/ipc/chromium/src/mojo/core/ports/port_ref.h new file mode 100644 index 0000000000..0ff0d32fc7 --- /dev/null +++ b/ipc/chromium/src/mojo/core/ports/port_ref.h @@ -0,0 +1,47 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_CORE_PORTS_PORT_REF_H_ +#define MOJO_CORE_PORTS_PORT_REF_H_ + +#include "mojo/core/ports/name.h" +#include "mozilla/RefPtr.h" + +namespace mojo { +namespace core { +namespace ports { + +class Port; +class PortLocker; + +class PortRef { + public: + ~PortRef(); + PortRef(); + PortRef(const PortName& name, RefPtr<Port> port); + + PortRef(const PortRef& other); + PortRef(PortRef&& other); + + PortRef& operator=(const PortRef& other); + PortRef& operator=(PortRef&& other); + + const PortName& name() const { return name_; } + + bool is_valid() const { return !!port_; } + + private: + friend class PortLocker; + + Port* port() const { return port_.get(); } + + PortName name_; + RefPtr<Port> port_; +}; + +} // namespace ports +} // namespace core +} // namespace mojo + +#endif // MOJO_CORE_PORTS_PORT_REF_H_ diff --git a/ipc/chromium/src/mojo/core/ports/user_data.h b/ipc/chromium/src/mojo/core/ports/user_data.h new file mode 100644 index 0000000000..f3d568c259 --- /dev/null +++ b/ipc/chromium/src/mojo/core/ports/user_data.h @@ -0,0 +1,25 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_CORE_PORTS_USER_DATA_H_ +#define MOJO_CORE_PORTS_USER_DATA_H_ + +#include "nsISupportsImpl.h" + +namespace mojo { +namespace core { +namespace ports { + +class UserData { + NS_INLINE_DECL_THREADSAFE_REFCOUNTING(UserData); + + protected: + virtual ~UserData() = default; +}; + +} // namespace ports +} // namespace core +} // namespace mojo + +#endif // MOJO_CORE_PORTS_USER_DATA_H_ diff --git a/ipc/chromium/src/mojo/core/ports/user_message.cc b/ipc/chromium/src/mojo/core/ports/user_message.cc new file mode 100644 index 0000000000..1aadb21425 --- /dev/null +++ b/ipc/chromium/src/mojo/core/ports/user_message.cc @@ -0,0 +1,21 @@ +// Copyright 2017 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/core/ports/user_message.h" + +namespace mojo { +namespace core { +namespace ports { + +UserMessage::UserMessage(const TypeInfo* type_info) : type_info_(type_info) {} + +UserMessage::~UserMessage() = default; + +bool UserMessage::WillBeRoutedExternally(UserMessageEvent&) { return true; } + +size_t UserMessage::GetSizeIfSerialized() const { return 0; } + +} // namespace ports +} // namespace core +} // namespace mojo diff --git a/ipc/chromium/src/mojo/core/ports/user_message.h b/ipc/chromium/src/mojo/core/ports/user_message.h new file mode 100644 index 0000000000..85c03a9557 --- /dev/null +++ b/ipc/chromium/src/mojo/core/ports/user_message.h @@ -0,0 +1,62 @@ +// Copyright 2017 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_CORE_PORTS_USER_MESSAGE_H_ +#define MOJO_CORE_PORTS_USER_MESSAGE_H_ + +#include <stddef.h> + +namespace mojo { +namespace core { +namespace ports { + +class UserMessageEvent; + +// Base type to use for any embedder-defined user message implementation. This +// class is intentionally empty. +// +// Provides a bit of type-safety help to subclasses since by design downcasting +// from this type is a common operation in embedders. +// +// Each subclass should define a static const instance of TypeInfo named +// |kUserMessageTypeInfo| and pass its address down to the UserMessage +// constructor. The type of a UserMessage can then be dynamically inspected by +// comparing |type_info()| to any subclass's |&kUserMessageTypeInfo|. +class UserMessage { + public: + struct TypeInfo {}; + + explicit UserMessage(const TypeInfo* type_info); + virtual ~UserMessage(); + + UserMessage(const UserMessage&) = delete; + void operator=(const UserMessage&) = delete; + + const TypeInfo* type_info() const { return type_info_; } + + // Invoked immediately before the system asks the embedder to forward this + // message to an external node. + // + // The UserMessageEvent is passed in to allow ports and other such values to + // be attached to the message before it is sent externally, in case late + // serialization is performed. + // + // Returns |true| if the message is OK to route externally, or |false| + // otherwise. Returning |false| implies an unrecoverable condition, and the + // message event will be destroyed without further routing. + virtual bool WillBeRoutedExternally(UserMessageEvent& event); + + // Returns the size in bytes of this message iff it's serialized. Zero + // otherwise. + virtual size_t GetSizeIfSerialized() const; + + private: + const TypeInfo* const type_info_; +}; + +} // namespace ports +} // namespace core +} // namespace mojo + +#endif // MOJO_CORE_PORTS_USER_MESSAGE_H_ |