summaryrefslogtreecommitdiffstats
path: root/ipc/glue/NodeController.cpp
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--ipc/glue/NodeController.cpp856
1 files changed, 856 insertions, 0 deletions
diff --git a/ipc/glue/NodeController.cpp b/ipc/glue/NodeController.cpp
new file mode 100644
index 0000000000..72e9b9a8fe
--- /dev/null
+++ b/ipc/glue/NodeController.cpp
@@ -0,0 +1,856 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=8 sts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this file,
+ * You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include "mozilla/ipc/NodeController.h"
+#include "MainThreadUtils.h"
+#include "base/process_util.h"
+#include "chrome/common/ipc_message.h"
+#include "mojo/core/ports/name.h"
+#include "mojo/core/ports/node.h"
+#include "mozilla/AlreadyAddRefed.h"
+#include "mozilla/RandomNum.h"
+#include "mozilla/StaticPtr.h"
+#include "mozilla/Assertions.h"
+#include "mozilla/ToString.h"
+#include "mozilla/ipc/BrowserProcessSubThread.h"
+#include "mozilla/ipc/ProtocolUtils.h"
+#include "mozilla/mozalloc.h"
+#include "nsISerialEventTarget.h"
+#include "nsTArray.h"
+#include "nsXULAppAPI.h"
+#include "nsPrintfCString.h"
+
+#define PORTS_ALWAYS_OK(expr) MOZ_ALWAYS_TRUE(mojo::core::ports::OK == (expr))
+
+namespace mozilla::ipc {
+
+static StaticRefPtr<NodeController> gNodeController;
+
+static LazyLogModule gNodeControllerLog{"NodeController"};
+
+// Helper logger macro which includes the name of the `this` NodeController in
+// the logged messages.
+#define NODECONTROLLER_LOG(level_, fmt_, ...) \
+ MOZ_LOG(gNodeControllerLog, level_, \
+ ("[%s]: " fmt_, ToString(mName).c_str(), ##__VA_ARGS__))
+
+// Helper warning macro which both does logger logging and emits NS_WARNING logs
+// under debug mode.
+#ifdef DEBUG
+# define NODECONTROLLER_WARNING(fmt_, ...) \
+ do { \
+ nsPrintfCString warning("[%s]: " fmt_, ToString(mName).c_str(), \
+ ##__VA_ARGS__); \
+ NS_WARNING(warning.get()); \
+ MOZ_LOG(gNodeControllerLog, LogLevel::Debug, ("%s", warning.get())); \
+ } while (0)
+#else
+# define NODECONTROLLER_WARNING(fmt_, ...) \
+ NODECONTROLLER_LOG(LogLevel::Warning, fmt_, ##__VA_ARGS__)
+#endif
+
+NodeController::NodeController(const NodeName& aName)
+ : mName(aName), mNode(MakeUnique<Node>(aName, this)) {}
+
+NodeController::~NodeController() {
+ auto state = mState.Lock();
+ MOZ_RELEASE_ASSERT(state->mPeers.IsEmpty(),
+ "Destroying NodeController before closing all peers");
+ MOZ_RELEASE_ASSERT(state->mInvites.IsEmpty(),
+ "Destroying NodeController before closing all invites");
+};
+
+// FIXME: Actually provide some way to create the thing.
+/* static */ NodeController* NodeController::GetSingleton() {
+ MOZ_ASSERT(gNodeController);
+ return gNodeController;
+}
+
+std::pair<ScopedPort, ScopedPort> NodeController::CreatePortPair() {
+ PortRef port0, port1;
+ PORTS_ALWAYS_OK(mNode->CreatePortPair(&port0, &port1));
+ return {ScopedPort{std::move(port0), this},
+ ScopedPort{std::move(port1), this}};
+}
+
+auto NodeController::GetPort(const PortName& aName) -> PortRef {
+ PortRef port;
+ int rv = mNode->GetPort(aName, &port);
+ if (NS_WARN_IF(rv != mojo::core::ports::OK)) {
+ NODECONTROLLER_WARNING("Call to GetPort(%s) Failed",
+ ToString(aName).c_str());
+ return {};
+ }
+ return port;
+}
+
+void NodeController::SetPortObserver(const PortRef& aPort,
+ PortObserver* aObserver) {
+ PORTS_ALWAYS_OK(mNode->SetUserData(aPort, aObserver));
+}
+
+auto NodeController::GetStatus(const PortRef& aPort) -> Maybe<PortStatus> {
+ PortStatus status{};
+ int rv = mNode->GetStatus(aPort, &status);
+ if (rv != mojo::core::ports::OK) {
+ return Nothing();
+ }
+ return Some(status);
+}
+
+void NodeController::ClosePort(const PortRef& aPort) {
+ PORTS_ALWAYS_OK(mNode->ClosePort(aPort));
+}
+
+bool NodeController::GetMessage(const PortRef& aPort,
+ UniquePtr<IPC::Message>* aMessage) {
+ UniquePtr<UserMessageEvent> messageEvent;
+ int rv = mNode->GetMessage(aPort, &messageEvent, nullptr);
+ if (rv != mojo::core::ports::OK) {
+ if (rv == mojo::core::ports::ERROR_PORT_PEER_CLOSED) {
+ return false;
+ }
+ MOZ_CRASH("GetMessage on port in invalid state");
+ }
+
+ if (messageEvent) {
+ UniquePtr<IPC::Message> message = messageEvent->TakeMessage<IPC::Message>();
+
+ // If our UserMessageEvent has any ports directly attached to it, fetch them
+ // from our node and attach them to the IPC::Message we extracted.
+ //
+ // It's important to only do this if we have nonempty set of ports on the
+ // event, as we may have never serialized our IPC::Message's ports onto the
+ // event if it was routed in-process.
+ if (messageEvent->num_ports() > 0) {
+ nsTArray<ScopedPort> attachedPorts(messageEvent->num_ports());
+ for (size_t i = 0; i < messageEvent->num_ports(); ++i) {
+ attachedPorts.AppendElement(
+ ScopedPort{GetPort(messageEvent->ports()[i]), this});
+ }
+ message->SetAttachedPorts(std::move(attachedPorts));
+ }
+
+ *aMessage = std::move(message);
+ } else {
+ *aMessage = nullptr;
+ }
+ return true;
+}
+
+bool NodeController::SendUserMessage(const PortRef& aPort,
+ UniquePtr<IPC::Message> aMessage) {
+ auto messageEvent = MakeUnique<UserMessageEvent>(0);
+ messageEvent->AttachMessage(std::move(aMessage));
+
+ int rv = mNode->SendUserMessage(aPort, std::move(messageEvent));
+ if (rv == mojo::core::ports::OK) {
+ return true;
+ }
+ if (rv == mojo::core::ports::ERROR_PORT_PEER_CLOSED) {
+ NODECONTROLLER_LOG(LogLevel::Debug,
+ "Ignoring message to port %s as peer was closed",
+ ToString(aPort.name()).c_str());
+ return true;
+ }
+ NODECONTROLLER_WARNING("Failed to send message to port %s",
+ ToString(aPort.name()).c_str());
+ return false;
+}
+
+auto NodeController::SerializeEventMessage(UniquePtr<Event> aEvent,
+ const NodeName* aRelayTarget,
+ uint32_t aType)
+ -> UniquePtr<IPC::Message> {
+ UniquePtr<IPC::Message> message;
+ if (aEvent->type() == Event::kUserMessage) {
+ MOZ_DIAGNOSTIC_ASSERT(
+ aType == EVENT_MESSAGE_TYPE,
+ "Can only send a UserMessage in an EVENT_MESSAGE_TYPE");
+ message = static_cast<UserMessageEvent*>(aEvent.get())
+ ->TakeMessage<IPC::Message>();
+ } else {
+ message = MakeUnique<IPC::Message>(MSG_ROUTING_CONTROL, aType);
+ }
+
+ message->set_relay(aRelayTarget != nullptr);
+
+ size_t length = aEvent->GetSerializedSize();
+ if (aRelayTarget) {
+ length += sizeof(NodeName);
+ }
+
+ // Use an intermediate buffer to serialize to avoid potential issues with the
+ // segmented `IPC::Message` bufferlist. This should be fairly cheap, as the
+ // majority of events are fairly small.
+ Vector<char, 256, InfallibleAllocPolicy> buffer;
+ (void)buffer.initLengthUninitialized(length);
+ if (aRelayTarget) {
+ memcpy(buffer.begin(), aRelayTarget, sizeof(NodeName));
+ aEvent->Serialize(buffer.begin() + sizeof(NodeName));
+ } else {
+ aEvent->Serialize(buffer.begin());
+ }
+
+ message->WriteFooter(buffer.begin(), buffer.length());
+ message->set_event_footer_size(buffer.length());
+
+#ifdef DEBUG
+ // Debug-assert that we can read the same data back out of the buffer.
+ MOZ_ASSERT(message->event_footer_size() == length);
+ Vector<char, 256, InfallibleAllocPolicy> buffer2;
+ (void)buffer2.initLengthUninitialized(message->event_footer_size());
+ MOZ_ASSERT(message->ReadFooter(buffer2.begin(), buffer2.length(),
+ /* truncate */ false));
+ MOZ_ASSERT(!memcmp(buffer2.begin(), buffer.begin(), buffer.length()));
+#endif
+
+ return message;
+}
+
+auto NodeController::DeserializeEventMessage(UniquePtr<IPC::Message> aMessage,
+ NodeName* aRelayTarget)
+ -> UniquePtr<Event> {
+ if (aMessage->is_relay() && !aRelayTarget) {
+ NODECONTROLLER_WARNING("Unexpected relay message '%s'", aMessage->name());
+ return nullptr;
+ }
+
+ Vector<char, 256, InfallibleAllocPolicy> buffer;
+ (void)buffer.initLengthUninitialized(aMessage->event_footer_size());
+ // Truncate the message when reading the footer, so that the extra footer data
+ // is no longer present in the message. This allows future code to eventually
+ // send the same `IPC::Message` to another process.
+ if (!aMessage->ReadFooter(buffer.begin(), buffer.length(),
+ /* truncate */ true)) {
+ NODECONTROLLER_WARNING("Call to ReadFooter for message '%s' Failed",
+ aMessage->name());
+ return nullptr;
+ }
+ aMessage->set_event_footer_size(0);
+
+ UniquePtr<Event> event;
+ if (aRelayTarget) {
+ MOZ_ASSERT(aMessage->is_relay());
+ if (buffer.length() < sizeof(NodeName)) {
+ NODECONTROLLER_WARNING(
+ "Insufficient space in message footer for message '%s'",
+ aMessage->name());
+ return nullptr;
+ }
+ memcpy(aRelayTarget, buffer.begin(), sizeof(NodeName));
+ event = Event::Deserialize(buffer.begin() + sizeof(NodeName),
+ buffer.length() - sizeof(NodeName));
+ } else {
+ event = Event::Deserialize(buffer.begin(), buffer.length());
+ }
+
+ if (!event) {
+ NODECONTROLLER_WARNING("Call to Event::Deserialize for message '%s' Failed",
+ aMessage->name());
+ return nullptr;
+ }
+
+ if (event->type() == Event::kUserMessage) {
+ static_cast<UserMessageEvent*>(event.get())
+ ->AttachMessage(std::move(aMessage));
+ }
+ return event;
+}
+
+already_AddRefed<NodeChannel> NodeController::GetNodeChannel(
+ const NodeName& aName) {
+ auto state = mState.Lock();
+ return do_AddRef(state->mPeers.Get(aName));
+}
+
+void NodeController::DropPeer(NodeName aNodeName) {
+ AssertIOThread();
+
+#ifdef FUZZING_SNAPSHOT
+ MOZ_FUZZING_IPC_DROP_PEER("NodeController::DropPeer");
+#endif
+
+ Invite invite;
+ RefPtr<NodeChannel> channel;
+ nsTArray<PortRef> pendingMerges;
+ {
+ auto state = mState.Lock();
+ state->mPeers.Remove(aNodeName, &channel);
+ state->mPendingMessages.Remove(aNodeName);
+ state->mInvites.Remove(aNodeName, &invite);
+ state->mPendingMerges.Remove(aNodeName, &pendingMerges);
+ }
+
+ NODECONTROLLER_LOG(LogLevel::Info, "Dropping Peer %s (pid: %" PRIPID ")",
+ ToString(aNodeName).c_str(),
+ channel ? channel->OtherPid() : base::kInvalidProcessId);
+
+ if (channel) {
+ channel->Close();
+ }
+ if (invite.mChannel) {
+ invite.mChannel->Close();
+ }
+ if (invite.mToMerge.is_valid()) {
+ // Ignore any possible errors here.
+ (void)mNode->ClosePort(invite.mToMerge);
+ }
+ for (auto& port : pendingMerges) {
+ // Ignore any possible errors here.
+ (void)mNode->ClosePort(port);
+ }
+ mNode->LostConnectionToNode(aNodeName);
+}
+
+void NodeController::ForwardEvent(const NodeName& aNode,
+ UniquePtr<Event> aEvent) {
+ if (aNode == mName) {
+ (void)mNode->AcceptEvent(std::move(aEvent));
+ } else {
+ // On Windows and macOS, messages holding HANDLEs or mach ports must be
+ // relayed via the broker process so it can transfer ownership.
+ bool needsRelay = false;
+#if defined(XP_WIN) || defined(XP_MACOSX)
+ if (!IsBroker() && aNode != kBrokerNodeName &&
+ aEvent->type() == Event::kUserMessage) {
+ auto* userEvent = static_cast<UserMessageEvent*>(aEvent.get());
+ needsRelay =
+ userEvent->HasMessage() &&
+ userEvent->GetMessage<IPC::Message>()->num_relayed_attachments() > 0;
+ }
+#endif
+
+ UniquePtr<IPC::Message> message =
+ SerializeEventMessage(std::move(aEvent), needsRelay ? &aNode : nullptr);
+ MOZ_ASSERT(message->is_relay() == needsRelay,
+ "Message relay status set incorrectly");
+
+ RefPtr<NodeChannel> peer;
+ RefPtr<NodeChannel> broker;
+ bool needsIntroduction = false;
+ {
+ auto state = mState.Lock();
+
+ // Check if we know this peer. If we don't, we'll need to request an
+ // introduction.
+ peer = state->mPeers.Get(aNode);
+ if (!peer || needsRelay) {
+ if (IsBroker()) {
+ NODECONTROLLER_WARNING("Ignoring message '%s' to unknown peer %s",
+ message->name(), ToString(aNode).c_str());
+ return;
+ }
+
+ broker = state->mPeers.Get(kBrokerNodeName);
+ if (!broker) {
+ NODECONTROLLER_WARNING(
+ "Ignoring message '%s' to peer %s due to a missing broker",
+ message->name(), ToString(aNode).c_str());
+ return;
+ }
+
+ if (!needsRelay) {
+ auto& queue =
+ state->mPendingMessages.LookupOrInsertWith(aNode, [&]() {
+ needsIntroduction = true;
+ return Queue<UniquePtr<IPC::Message>, 64>{};
+ });
+ queue.Push(std::move(message));
+ }
+ }
+ }
+
+ MOZ_ASSERT(!needsIntroduction || !needsRelay,
+ "Only one of the two should ever be set");
+
+ if (needsRelay) {
+ NODECONTROLLER_LOG(LogLevel::Info,
+ "Relaying message '%s' for peer %s due to %" PRIu32
+ " attachments",
+ message->name(), ToString(aNode).c_str(),
+ message->num_relayed_attachments());
+ MOZ_ASSERT(message->num_relayed_attachments() > 0 && broker);
+ broker->SendEventMessage(std::move(message));
+ } else if (needsIntroduction) {
+ MOZ_ASSERT(broker);
+ broker->RequestIntroduction(aNode);
+ } else if (peer) {
+ peer->SendEventMessage(std::move(message));
+ }
+ }
+}
+
+void NodeController::BroadcastEvent(UniquePtr<Event> aEvent) {
+ UniquePtr<IPC::Message> message =
+ SerializeEventMessage(std::move(aEvent), nullptr, BROADCAST_MESSAGE_TYPE);
+
+ if (IsBroker()) {
+ OnBroadcast(mName, std::move(message));
+ } else if (RefPtr<NodeChannel> broker = GetNodeChannel(kBrokerNodeName)) {
+ broker->Broadcast(std::move(message));
+ } else {
+ NODECONTROLLER_WARNING(
+ "Trying to broadcast event, but no connection to broker");
+ }
+}
+
+void NodeController::PortStatusChanged(const PortRef& aPortRef) {
+ RefPtr<UserData> userData;
+ int rv = mNode->GetUserData(aPortRef, &userData);
+ if (rv != mojo::core::ports::OK) {
+ NODECONTROLLER_WARNING("GetUserData call for port '%s' failed",
+ ToString(aPortRef.name()).c_str());
+ return;
+ }
+ if (userData) {
+ // All instances of `UserData` attached to ports in this node must be of
+ // type `PortObserver`, so we can call `OnPortStatusChanged` directly on
+ // them.
+ static_cast<PortObserver*>(userData.get())->OnPortStatusChanged();
+ }
+}
+
+void NodeController::OnEventMessage(const NodeName& aFromNode,
+ UniquePtr<IPC::Message> aMessage) {
+ AssertIOThread();
+
+ bool isRelay = aMessage->is_relay();
+ if (isRelay && aMessage->num_relayed_attachments() == 0) {
+ NODECONTROLLER_WARNING(
+ "Invalid relay message without relayed attachments from peer %s!",
+ ToString(aFromNode).c_str());
+ DropPeer(aFromNode);
+ return;
+ }
+
+ NodeName relayTarget;
+ UniquePtr<Event> event = DeserializeEventMessage(
+ std::move(aMessage), isRelay ? &relayTarget : nullptr);
+ if (!event) {
+ NODECONTROLLER_WARNING("Invalid EventMessage from peer %s!",
+ ToString(aFromNode).c_str());
+ DropPeer(aFromNode);
+ return;
+ }
+
+ NodeName fromNode = aFromNode;
+#if defined(XP_WIN) || defined(XP_MACOSX)
+ if (isRelay) {
+ if (event->type() != Event::kUserMessage) {
+ NODECONTROLLER_WARNING(
+ "Unexpected relay of non-UserMessage event from peer %s!",
+ ToString(aFromNode).c_str());
+ DropPeer(aFromNode);
+ return;
+ }
+
+ // If we're the broker, then we'll need to forward this message on to the
+ // true recipient. To do this, we re-serialize the message, passing along
+ // the original source node, and send it to the final node.
+ if (IsBroker()) {
+ UniquePtr<IPC::Message> message =
+ SerializeEventMessage(std::move(event), &aFromNode);
+ if (!message) {
+ NODECONTROLLER_WARNING(
+ "Relaying EventMessage from peer %s failed to re-serialize!",
+ ToString(aFromNode).c_str());
+ DropPeer(aFromNode);
+ return;
+ }
+ MOZ_ASSERT(message->is_relay(), "Message stopped being a relay message?");
+ MOZ_ASSERT(message->num_relayed_attachments() > 0,
+ "Message doesn't have relayed attachments?");
+
+ NODECONTROLLER_LOG(
+ LogLevel::Info,
+ "Relaying message '%s' from peer %s to peer %s (%" PRIu32
+ " attachments)",
+ message->name(), ToString(aFromNode).c_str(),
+ ToString(relayTarget).c_str(), message->num_relayed_attachments());
+
+ RefPtr<NodeChannel> peer;
+ {
+ auto state = mState.Lock();
+ peer = state->mPeers.Get(relayTarget);
+ }
+ if (!peer) {
+ NODECONTROLLER_WARNING(
+ "Dropping relayed message from %s to unknown peer %s",
+ ToString(aFromNode).c_str(), ToString(relayTarget).c_str());
+ return;
+ }
+
+ peer->SendEventMessage(std::move(message));
+ return;
+ }
+
+ // Otherwise, we're the final recipient, so we can continue & process the
+ // message as usual.
+ if (aFromNode != kBrokerNodeName) {
+ NODECONTROLLER_WARNING(
+ "Unexpected relayed EventMessage from non-broker peer %s!",
+ ToString(aFromNode).c_str());
+ DropPeer(aFromNode);
+ return;
+ }
+ fromNode = relayTarget;
+
+ NODECONTROLLER_LOG(LogLevel::Info, "Got relayed message from peer %s",
+ ToString(fromNode).c_str());
+ }
+#endif
+
+ // If we're getting a requested port merge from another process, check to make
+ // sure that we're expecting the request, and record that the merge has
+ // arrived so we don't try to close the port on error.
+ if (event->type() == Event::kMergePort) {
+ // Check that the target port for the merge actually exists.
+ auto targetPort = GetPort(event->port_name());
+ if (!targetPort.is_valid()) {
+ NODECONTROLLER_WARNING(
+ "Unexpected MergePortEvent from peer %s for unknown port %s",
+ ToString(fromNode).c_str(), ToString(event->port_name()).c_str());
+ DropPeer(fromNode);
+ return;
+ }
+
+ // Check if `targetPort` is in our pending merges entry for the given source
+ // node. If this makes the `mPendingMerges` entry empty, remove it.
+ bool expectingMerge = [&] {
+ auto state = mState.Lock();
+ auto pendingMerges = state->mPendingMerges.Lookup(aFromNode);
+ if (!pendingMerges) {
+ return false;
+ }
+ size_t removed = pendingMerges->RemoveElementsBy(
+ [&](auto& port) { return port.name() == targetPort.name(); });
+ if (removed != 0 && pendingMerges->IsEmpty()) {
+ pendingMerges.Remove();
+ }
+ return removed != 0;
+ }();
+
+ if (!expectingMerge) {
+ NODECONTROLLER_WARNING(
+ "Unexpected MergePortEvent from peer %s for port %s",
+ ToString(fromNode).c_str(), ToString(event->port_name()).c_str());
+ DropPeer(fromNode);
+ return;
+ }
+ }
+
+ (void)mNode->AcceptEvent(std::move(event));
+}
+
+void NodeController::OnBroadcast(const NodeName& aFromNode,
+ UniquePtr<IPC::Message> aMessage) {
+ MOZ_DIAGNOSTIC_ASSERT(aMessage->type() == BROADCAST_MESSAGE_TYPE);
+
+ // NOTE: This method may be called off of the IO thread by the
+ // `BroadcastEvent` node callback.
+ if (!IsBroker()) {
+ NODECONTROLLER_WARNING("Broadcast request received by non-broker node");
+ return;
+ }
+
+ UniquePtr<Event> event = DeserializeEventMessage(std::move(aMessage));
+ if (!event) {
+ NODECONTROLLER_WARNING("Invalid broadcast message from peer");
+ return;
+ }
+
+ nsTArray<RefPtr<NodeChannel>> peers;
+ {
+ auto state = mState.Lock();
+ peers.SetCapacity(state->mPeers.Count());
+ for (const auto& peer : state->mPeers.Values()) {
+ peers.AppendElement(peer);
+ }
+ }
+ for (const auto& peer : peers) {
+ // NOTE: This `clone` operation is only supported for a limited number of
+ // message types by the ports API, which provides some extra security by
+ // only allowing those specific types of messages to be broadcasted.
+ // Messages which don't support `Clone` cannot be broadcast, and the ports
+ // library will not attempt to broadcast them.
+ auto clone = event->Clone();
+ if (!clone) {
+ NODECONTROLLER_WARNING("Attempt to broadcast unsupported message");
+ break;
+ }
+
+ peer->SendEventMessage(SerializeEventMessage(std::move(clone)));
+ }
+}
+
+void NodeController::OnIntroduce(const NodeName& aFromNode,
+ NodeChannel::Introduction aIntroduction) {
+ AssertIOThread();
+
+ if (aFromNode != kBrokerNodeName) {
+ NODECONTROLLER_WARNING("Introduction received from non-broker node");
+ DropPeer(aFromNode);
+ return;
+ }
+
+ MOZ_ASSERT(aIntroduction.mMyPid == base::GetCurrentProcId(),
+ "We're the wrong process to receive this?");
+
+ if (!aIntroduction.mHandle) {
+ NODECONTROLLER_WARNING("Could not be introduced to peer %s",
+ ToString(aIntroduction.mName).c_str());
+ mNode->LostConnectionToNode(aIntroduction.mName);
+
+ auto state = mState.Lock();
+ state->mPendingMessages.Remove(aIntroduction.mName);
+ return;
+ }
+
+ auto channel = MakeUnique<IPC::Channel>(std::move(aIntroduction.mHandle),
+ aIntroduction.mMode, nullptr);
+ auto nodeChannel = MakeRefPtr<NodeChannel>(
+ aIntroduction.mName, std::move(channel), this, aIntroduction.mOtherPid);
+
+ {
+ auto state = mState.Lock();
+ bool isNew = false;
+ state->mPeers.LookupOrInsertWith(aIntroduction.mName, [&]() {
+ isNew = true;
+ return nodeChannel;
+ });
+ if (!isNew) {
+ // We got a duplicate introduction. This can happen during normal
+ // execution if both sides request an introduction at the same time. We
+ // can just ignore the second one, as they'll arrive in the same order in
+ // both processes.
+ nodeChannel->Close();
+ return;
+ }
+
+ // Deliver any pending messages, then remove the entry from our table. We do
+ // this while `mState` is still held to ensure that these messages are
+ // all sent before another thread can observe the newly created channel.
+ // As the channel hasn't been `Connect()`-ed yet, this will only queue the
+ // messages up to be sent, so is OK to do with the mutex held. These
+ // messages will be processed to be sent during `Start()` below, which is
+ // performed outside of the lock.
+ if (auto pending = state->mPendingMessages.Lookup(aIntroduction.mName)) {
+ while (!pending->IsEmpty()) {
+ nodeChannel->SendEventMessage(pending->Pop());
+ }
+ pending.Remove();
+ }
+ }
+
+ // NodeChannel::Start must be called with the lock not held, as it may lead to
+ // callbacks being made into `OnChannelError` or `OnMessageReceived`, which
+ // will attempt to re-acquire our lock.
+ nodeChannel->Start();
+}
+
+void NodeController::OnRequestIntroduction(const NodeName& aFromNode,
+ const NodeName& aName) {
+ AssertIOThread();
+ if (NS_WARN_IF(!IsBroker())) {
+ return;
+ }
+
+ RefPtr<NodeChannel> peerA = GetNodeChannel(aFromNode);
+ if (!peerA || aName == mojo::core::ports::kInvalidNodeName) {
+ NODECONTROLLER_WARNING("Invalid OnRequestIntroduction message from node %s",
+ ToString(aFromNode).c_str());
+ DropPeer(aFromNode);
+ return;
+ }
+
+ RefPtr<NodeChannel> peerB = GetNodeChannel(aName);
+ IPC::Channel::ChannelHandle handleA, handleB;
+ if (!peerB || !IPC::Channel::CreateRawPipe(&handleA, &handleB)) {
+ NODECONTROLLER_WARNING(
+ "Rejecting introduction request from '%s' for unknown peer '%s'",
+ ToString(aFromNode).c_str(), ToString(aName).c_str());
+
+ // We don't know this peer, or ran into issues creating the descriptor! Send
+ // an invalid introduction to content to clean up any pending outbound
+ // messages.
+ NodeChannel::Introduction intro{aName, nullptr, IPC::Channel::MODE_SERVER,
+ peerA->OtherPid(), base::kInvalidProcessId};
+ peerA->Introduce(std::move(intro));
+ return;
+ }
+
+ NodeChannel::Introduction introA{aName, std::move(handleA),
+ IPC::Channel::MODE_SERVER, peerA->OtherPid(),
+ peerB->OtherPid()};
+ NodeChannel::Introduction introB{aFromNode, std::move(handleB),
+ IPC::Channel::MODE_CLIENT, peerB->OtherPid(),
+ peerA->OtherPid()};
+ peerA->Introduce(std::move(introA));
+ peerB->Introduce(std::move(introB));
+}
+
+void NodeController::OnAcceptInvite(const NodeName& aFromNode,
+ const NodeName& aRealName,
+ const PortName& aInitialPort) {
+ AssertIOThread();
+ if (!IsBroker()) {
+ NODECONTROLLER_WARNING("Ignoring AcceptInvite message as non-broker");
+ return;
+ }
+
+ if (aRealName == mojo::core::ports::kInvalidNodeName ||
+ aInitialPort == mojo::core::ports::kInvalidPortName) {
+ NODECONTROLLER_WARNING("Invalid name in AcceptInvite message");
+ DropPeer(aFromNode);
+ return;
+ }
+
+ bool inserted = false;
+ Invite invite;
+ {
+ auto state = mState.Lock();
+ MOZ_ASSERT(state->mPendingMessages.IsEmpty(),
+ "Shouldn't have pending messages in broker");
+
+ // Try to remove the source node from our invites list and insert it into
+ // our peers map under the new name.
+ if (state->mInvites.Remove(aFromNode, &invite)) {
+ MOZ_ASSERT(invite.mChannel && invite.mToMerge.is_valid());
+ state->mPeers.LookupOrInsertWith(aRealName, [&]() {
+ inserted = true;
+ return invite.mChannel;
+ });
+ }
+ }
+ if (!inserted) {
+ NODECONTROLLER_WARNING("Invalid AcceptInvite message from node %s",
+ ToString(aFromNode).c_str());
+ DropPeer(aFromNode);
+ return;
+ }
+
+ // Update the name of the node. This field is only accessed from the IO
+ // thread, so it's safe to update it without a lock held.
+ invite.mChannel->SetName(aRealName);
+
+ // Start the port merge to allow our existing initial port to begin
+ // communicating with the remote port.
+ PORTS_ALWAYS_OK(mNode->MergePorts(invite.mToMerge, aRealName, aInitialPort));
+}
+
+void NodeController::OnChannelError(const NodeName& aFromNode) {
+ AssertIOThread();
+ DropPeer(aFromNode);
+}
+
+static mojo::core::ports::NodeName RandomNodeName() {
+ return {RandomUint64OrDie(), RandomUint64OrDie()};
+}
+
+std::tuple<ScopedPort, RefPtr<NodeChannel>> NodeController::InviteChildProcess(
+ UniquePtr<IPC::Channel> aChannel) {
+ MOZ_ASSERT(IsBroker());
+ AssertIOThread();
+
+ // Create the peer with a randomly generated name, and store it in `mInvites`.
+ // This channel and name will be used for communication with the node until it
+ // sends us its' real name in an `AcceptInvite` message.
+ auto ports = CreatePortPair();
+ auto inviteName = RandomNodeName();
+ auto nodeChannel =
+ MakeRefPtr<NodeChannel>(inviteName, std::move(aChannel), this);
+ {
+ auto state = mState.Lock();
+ MOZ_DIAGNOSTIC_ASSERT(!state->mPeers.Contains(inviteName),
+ "UUID conflict?");
+ MOZ_DIAGNOSTIC_ASSERT(!state->mInvites.Contains(inviteName),
+ "UUID conflict?");
+ state->mInvites.InsertOrUpdate(inviteName,
+ Invite{nodeChannel, ports.second.Release()});
+ }
+
+ nodeChannel->Start(/* aCallConnect */ false);
+ return std::tuple{std::move(ports.first), std::move(nodeChannel)};
+}
+
+void NodeController::InitBrokerProcess() {
+ AssertIOThread();
+ MOZ_ASSERT(!gNodeController);
+ gNodeController = new NodeController(kBrokerNodeName);
+}
+
+ScopedPort NodeController::InitChildProcess(UniquePtr<IPC::Channel> aChannel,
+ int32_t aParentPid) {
+ AssertIOThread();
+ MOZ_ASSERT(!gNodeController);
+
+ auto nodeName = RandomNodeName();
+ gNodeController = new NodeController(nodeName);
+
+ auto ports = gNodeController->CreatePortPair();
+ PortRef toMerge = ports.second.Release();
+
+ auto nodeChannel = MakeRefPtr<NodeChannel>(
+ kBrokerNodeName, std::move(aChannel), gNodeController, aParentPid);
+ {
+ auto state = gNodeController->mState.Lock();
+ MOZ_DIAGNOSTIC_ASSERT(!state->mPeers.Contains(kBrokerNodeName));
+ state->mPeers.InsertOrUpdate(kBrokerNodeName, nodeChannel);
+ MOZ_DIAGNOSTIC_ASSERT(!state->mPendingMerges.Contains(kBrokerNodeName));
+ state->mPendingMerges.LookupOrInsert(kBrokerNodeName)
+ .AppendElement(toMerge);
+ }
+
+ nodeChannel->Start(/* aCallConnect */ true);
+ nodeChannel->AcceptInvite(nodeName, toMerge.name());
+ return std::move(ports.first);
+}
+
+void NodeController::CleanUp() {
+ AssertIOThread();
+ MOZ_ASSERT(gNodeController);
+
+ RefPtr<NodeController> nodeController = gNodeController;
+ gNodeController = nullptr;
+
+ // Collect all objects from our state which need to be cleaned up.
+ nsTArray<NodeName> lostConnections;
+ nsTArray<RefPtr<NodeChannel>> channelsToClose;
+ nsTArray<PortRef> portsToClose;
+ {
+ auto state = nodeController->mState.Lock();
+ for (const auto& chan : state->mPeers) {
+ lostConnections.AppendElement(chan.GetKey());
+ channelsToClose.AppendElement(chan.GetData());
+ }
+ for (const auto& invite : state->mInvites.Values()) {
+ channelsToClose.AppendElement(invite.mChannel);
+ portsToClose.AppendElement(invite.mToMerge);
+ }
+ for (const auto& pendingPorts : state->mPendingMerges.Values()) {
+ portsToClose.AppendElements(pendingPorts);
+ }
+ state->mPeers.Clear();
+ state->mPendingMessages.Clear();
+ state->mInvites.Clear();
+ state->mPendingMerges.Clear();
+ }
+ for (auto& nodeChannel : channelsToClose) {
+ nodeChannel->Close();
+ }
+ for (auto& port : portsToClose) {
+ nodeController->mNode->ClosePort(port);
+ }
+ for (auto& name : lostConnections) {
+ nodeController->mNode->LostConnectionToNode(name);
+ }
+}
+
+#undef NODECONTROLLER_LOG
+#undef NODECONTROLLER_WARNING
+
+} // namespace mozilla::ipc