/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
 * You can obtain one at http://mozilla.org/MPL/2.0/. */

#include "mozilla/ipc/NodeController.h"
#include "MainThreadUtils.h"
#include "base/process_util.h"
#include "chrome/common/ipc_message.h"
#include "mojo/core/ports/name.h"
#include "mojo/core/ports/node.h"
#include "mozilla/AlreadyAddRefed.h"
#include "mozilla/RandomNum.h"
#include "mozilla/StaticPtr.h"
#include "mozilla/Assertions.h"
#include "mozilla/ToString.h"
#include "mozilla/ipc/BrowserProcessSubThread.h"
#include "mozilla/ipc/ProtocolUtils.h"
#include "mozilla/mozalloc.h"
#include "nsISerialEventTarget.h"
#include "nsTArray.h"
#include "nsXULAppAPI.h"
#include "nsPrintfCString.h"

// NOTE(review): many declarations in this file appear to have lost their
// template argument lists (e.g. `StaticRefPtr gNodeController`,
// `nsTArray> peers`, `std::tuple> ... InviteChildProcess`). The code below is
// left token-for-token as found; confirm against version control and restore
// the arguments before building.

// Asserts (via MOZ_ALWAYS_TRUE) that a ports API call returned
// `mojo::core::ports::OK`.
#define PORTS_ALWAYS_OK(expr) MOZ_ALWAYS_TRUE(mojo::core::ports::OK == (expr))

namespace mozilla::ipc {

// Process-wide singleton. Assigned by `InitBrokerProcess` /
// `InitChildProcess` and cleared by `CleanUp`.
static StaticRefPtr gNodeController;

static LazyLogModule gNodeControllerLog{"NodeController"};

// Helper logger macro which includes the name of the `this` NodeController in
// the logged messages. It reads `mName`, so it is only usable from
// NodeController member functions.
#define NODECONTROLLER_LOG(level_, fmt_, ...) \
  MOZ_LOG(gNodeControllerLog, level_,         \
          ("[%s]: " fmt_, ToString(mName).c_str(), ##__VA_ARGS__))

// Helper warning macro which both does logger logging and emits NS_WARNING logs
// under debug mode. Note that the DEBUG variant logs to the log module at
// `LogLevel::Debug`, while the non-DEBUG variant logs at `LogLevel::Warning`.
#ifdef DEBUG
#  define NODECONTROLLER_WARNING(fmt_, ...)                                \
    do {                                                                   \
      nsPrintfCString warning("[%s]: " fmt_, ToString(mName).c_str(),      \
                              ##__VA_ARGS__);                              \
      NS_WARNING(warning.get());                                           \
      MOZ_LOG(gNodeControllerLog, LogLevel::Debug, ("%s", warning.get())); \
    } while (0)
#else
#  define NODECONTROLLER_WARNING(fmt_, ...) \
    NODECONTROLLER_LOG(LogLevel::Warning, fmt_, ##__VA_ARGS__)
#endif

// Creates a controller named `aName` and the ports node it drives. The node is
// constructed with `this` as its second argument.
NodeController::NodeController(const NodeName& aName)
    : mName(aName), mNode(MakeUnique(aName, this)) {}

// Release-asserts that every peer and invite has already been removed from
// `mState` before the controller is destroyed.
NodeController::~NodeController() {
  auto state = mState.Lock();
  MOZ_RELEASE_ASSERT(state->mPeers.IsEmpty(),
                     "Destroying NodeController before closing all peers");
  MOZ_RELEASE_ASSERT(state->mInvites.IsEmpty(),
                     "Destroying NodeController before closing all invites");
};

// Returns the process-wide controller; asserts that it has been created.
// FIXME: Actually provide some way to create the thing.
/* static */ NodeController* NodeController::GetSingleton() {
  MOZ_ASSERT(gNodeController);
  return gNodeController;
}

// Creates a new pair of ports on this node and returns both ends wrapped in
// `ScopedPort`s bound to this controller. Failure of the underlying call is
// asserted against via PORTS_ALWAYS_OK.
std::pair NodeController::CreatePortPair() {
  PortRef port0, port1;
  PORTS_ALWAYS_OK(mNode->CreatePortPair(&port0, &port1));
  return {ScopedPort{std::move(port0), this},
          ScopedPort{std::move(port1), this}};
}

// Looks up the port named `aName` on this node. On failure a warning is
// emitted and a default-constructed `PortRef` is returned.
auto NodeController::GetPort(const PortName& aName) -> PortRef {
  PortRef port;
  int rv = mNode->GetPort(aName, &port);
  if (NS_WARN_IF(rv != mojo::core::ports::OK)) {
    NODECONTROLLER_WARNING("Call to GetPort(%s) Failed",
                           ToString(aName).c_str());
    return {};
  }
  return port;
}

// Stores `aObserver` as the user data of `aPort`. `PortStatusChanged` later
// retrieves this user data and casts it back to notify the observer.
void NodeController::SetPortObserver(const PortRef& aPort,
                                     PortObserver* aObserver) {
  PORTS_ALWAYS_OK(mNode->SetUserData(aPort, aObserver));
}

// Returns the status of `aPort`, or `Nothing()` if the node could not provide
// one.
auto NodeController::GetStatus(const PortRef& aPort) -> Maybe {
  PortStatus status{};
  int rv = mNode->GetStatus(aPort, &status);
  if (rv != mojo::core::ports::OK) {
    return Nothing();
  }
  return Some(status);
}

// Closes `aPort` on this node; failure is asserted against via
// PORTS_ALWAYS_OK.
void NodeController::ClosePort(const PortRef& aPort) {
  PORTS_ALWAYS_OK(mNode->ClosePort(aPort));
}

// Fetches the next message event from `aPort`.
//
// Returns false if the node reports ERROR_PORT_PEER_CLOSED, and crashes on any
// other error. Otherwise returns true with `*aMessage` set to the extracted
// message, or to nullptr if the node produced no message event.
bool NodeController::GetMessage(const PortRef& aPort,
                                UniquePtr* aMessage) {
  UniquePtr messageEvent;
  int rv = mNode->GetMessage(aPort, &messageEvent, nullptr);
  if (rv != mojo::core::ports::OK) {
    if (rv == mojo::core::ports::ERROR_PORT_PEER_CLOSED) {
      return false;
    }
    MOZ_CRASH("GetMessage on port in invalid state");
  }

  if (messageEvent) {
    UniquePtr message = messageEvent->TakeMessage();

    // If our UserMessageEvent has any ports directly attached to it, fetch them
    // from our node and attach them to the IPC::Message we extracted.
    //
    // It's important to only do this if we have nonempty set of ports on the
    // event, as we may have never serialized our IPC::Message's ports onto the
    // event if it was routed in-process.
    if (messageEvent->num_ports() > 0) {
      nsTArray attachedPorts(messageEvent->num_ports());
      for (size_t i = 0; i < messageEvent->num_ports(); ++i) {
        attachedPorts.AppendElement(
            ScopedPort{GetPort(messageEvent->ports()[i]), this});
      }
      message->SetAttachedPorts(std::move(attachedPorts));
    }

    *aMessage = std::move(message);
  } else {
    *aMessage = nullptr;
  }
  return true;
}

// Wraps `aMessage` in a new user message event and sends it from `aPort`.
//
// Returns true on success, and also when the peer port was already closed (the
// message is dropped and this is logged at Debug level). Returns false, after
// a warning, for any other error.
bool NodeController::SendUserMessage(const PortRef& aPort,
                                     UniquePtr aMessage) {
  auto messageEvent = MakeUnique(0);
  messageEvent->AttachMessage(std::move(aMessage));

  int rv = mNode->SendUserMessage(aPort, std::move(messageEvent));
  if (rv == mojo::core::ports::OK) {
    return true;
  }
  if (rv == mojo::core::ports::ERROR_PORT_PEER_CLOSED) {
    NODECONTROLLER_LOG(LogLevel::Debug,
                       "Ignoring message to port %s as peer was closed",
                       ToString(aPort.name()).c_str());
    return true;
  }
  NODECONTROLLER_WARNING("Failed to send message to port %s",
                         ToString(aPort.name()).c_str());
  return false;
}

// Serializes `aEvent` into the footer of a message.
//
// For a kUserMessage event the message already attached to the event is taken
// and reused; for any other event a fresh MSG_ROUTING_CONTROL message of type
// `aType` is created. The footer consists of the `NodeName` pointed to by
// `aRelayTarget` (only when it is non-null) followed by the serialized event,
// and the message's relay flag is set to whether `aRelayTarget` was given.
auto NodeController::SerializeEventMessage(UniquePtr aEvent,
                                           const NodeName* aRelayTarget,
                                           uint32_t aType)
    -> UniquePtr {
  UniquePtr message;
  if (aEvent->type() == Event::kUserMessage) {
    MOZ_DIAGNOSTIC_ASSERT(
        aType == EVENT_MESSAGE_TYPE,
        "Can only send a UserMessage in an EVENT_MESSAGE_TYPE");
    message = static_cast(aEvent.get())
                  ->TakeMessage();
  } else {
    message = MakeUnique(MSG_ROUTING_CONTROL, aType);
  }

  message->set_relay(aRelayTarget != nullptr);

  // Total footer size: the serialized event, plus the relay target's name when
  // relaying.
  size_t length = aEvent->GetSerializedSize();
  if (aRelayTarget) {
    length += sizeof(NodeName);
  }

  // Use an intermediate buffer to serialize to avoid potential issues with the
  // segmented `IPC::Message` bufferlist. This should be fairly cheap, as the
  // majority of events are fairly small.
  Vector buffer;
  (void)buffer.initLengthUninitialized(length);
  if (aRelayTarget) {
    memcpy(buffer.begin(), aRelayTarget, sizeof(NodeName));
    aEvent->Serialize(buffer.begin() + sizeof(NodeName));
  } else {
    aEvent->Serialize(buffer.begin());
  }

  message->WriteFooter(buffer.begin(), buffer.length());
  message->set_event_footer_size(buffer.length());

#ifdef DEBUG
  // Debug-assert that we can read the same data back out of the buffer.
  MOZ_ASSERT(message->event_footer_size() == length);
  Vector buffer2;
  (void)buffer2.initLengthUninitialized(message->event_footer_size());
  MOZ_ASSERT(message->ReadFooter(buffer2.begin(), buffer2.length(),
                                 /* truncate */ false));
  MOZ_ASSERT(!memcmp(buffer2.begin(), buffer.begin(), buffer.length()));
#endif

  return message;
}

// Inverse of `SerializeEventMessage`: reads the footer of `aMessage` and
// reconstructs the event stored in it.
//
// When `aRelayTarget` is non-null, the leading `NodeName` of the footer is
// copied into it. Returns nullptr (after a warning) if a relay message arrives
// without `aRelayTarget`, if the footer cannot be read or is too short for a
// `NodeName`, or if the event fails to deserialize. For kUserMessage events
// the (now footer-less) message is attached to the returned event.
auto NodeController::DeserializeEventMessage(UniquePtr aMessage,
                                             NodeName* aRelayTarget)
    -> UniquePtr {
  if (aMessage->is_relay() && !aRelayTarget) {
    NODECONTROLLER_WARNING("Unexpected relay message '%s'", aMessage->name());
    return nullptr;
  }

  Vector buffer;
  (void)buffer.initLengthUninitialized(aMessage->event_footer_size());
  // Truncate the message when reading the footer, so that the extra footer data
  // is no longer present in the message. This allows future code to eventually
  // send the same `IPC::Message` to another process.
  if (!aMessage->ReadFooter(buffer.begin(), buffer.length(),
                            /* truncate */ true)) {
    NODECONTROLLER_WARNING("Call to ReadFooter for message '%s' Failed",
                           aMessage->name());
    return nullptr;
  }
  aMessage->set_event_footer_size(0);

  UniquePtr event;
  if (aRelayTarget) {
    MOZ_ASSERT(aMessage->is_relay());
    // The footer must at least hold the relay target's name.
    if (buffer.length() < sizeof(NodeName)) {
      NODECONTROLLER_WARNING(
          "Insufficient space in message footer for message '%s'",
          aMessage->name());
      return nullptr;
    }
    memcpy(aRelayTarget, buffer.begin(), sizeof(NodeName));
    event = Event::Deserialize(buffer.begin() + sizeof(NodeName),
                               buffer.length() - sizeof(NodeName));
  } else {
    event = Event::Deserialize(buffer.begin(), buffer.length());
  }

  if (!event) {
    NODECONTROLLER_WARNING("Call to Event::Deserialize for message '%s' Failed",
                           aMessage->name());
    return nullptr;
  }

  if (event->type() == Event::kUserMessage) {
    static_cast(event.get())
        ->AttachMessage(std::move(aMessage));
  }
  return event;
}

// Returns an owning reference to the `mPeers` entry for `aName`, looked up
// with `mState` locked.
already_AddRefed NodeController::GetNodeChannel(
    const NodeName& aName) {
  auto state = mState.Lock();
  return do_AddRef(state->mPeers.Get(aName));
}

// Forgets everything known about `aNodeName`: its peer channel, pending
// messages, invite and pending merges are removed from `mState` while the lock
// is held. After the lock is released the channels are closed, the associated
// ports are closed (errors ignored), and the node is told that the connection
// was lost.
void NodeController::DropPeer(NodeName aNodeName) {
  AssertIOThread();

#ifdef FUZZING_SNAPSHOT
  MOZ_FUZZING_IPC_DROP_PEER("NodeController::DropPeer");
#endif

  Invite invite;
  RefPtr channel;
  nsTArray pendingMerges;
  {
    auto state = mState.Lock();
    state->mPeers.Remove(aNodeName, &channel);
    state->mPendingMessages.Remove(aNodeName);
    state->mInvites.Remove(aNodeName, &invite);
    state->mPendingMerges.Remove(aNodeName, &pendingMerges);
  }

  NODECONTROLLER_LOG(LogLevel::Info, "Dropping Peer %s (pid: %" PRIPID ")",
                     ToString(aNodeName).c_str(),
                     channel ? channel->OtherPid() : base::kInvalidProcessId);

  if (channel) {
    channel->Close();
  }
  if (invite.mChannel) {
    invite.mChannel->Close();
  }
  if (invite.mToMerge.is_valid()) {
    // Ignore any possible errors here.
    (void)mNode->ClosePort(invite.mToMerge);
  }
  for (auto& port : pendingMerges) {
    // Ignore any possible errors here.
    (void)mNode->ClosePort(port);
  }
  mNode->LostConnectionToNode(aNodeName);
}

// Delivers `aEvent` to the node named `aNode`.
//
// Events addressed to this node are accepted directly. Otherwise the event is
// serialized and then either relayed through the broker, queued in
// `mPendingMessages` while an introduction to the unknown peer is requested
// from the broker, or sent straight to the known peer. The message is dropped
// with a warning if this node is the broker and does not know the peer, or if
// no broker channel is available when one is needed.
void NodeController::ForwardEvent(const NodeName& aNode,
                                  UniquePtr aEvent) {
  if (aNode == mName) {
    (void)mNode->AcceptEvent(std::move(aEvent));
  } else {
    // On Windows and macOS, messages holding HANDLEs or mach ports must be
    // relayed via the broker process so it can transfer ownership.
    bool needsRelay = false;
#if defined(XP_WIN) || defined(XP_MACOSX)
    if (!IsBroker() && aNode != kBrokerNodeName &&
        aEvent->type() == Event::kUserMessage) {
      auto* userEvent = static_cast(aEvent.get());
      needsRelay =
          userEvent->HasMessage() &&
          userEvent->GetMessage()->num_relayed_attachments() > 0;
    }
#endif

    UniquePtr message =
        SerializeEventMessage(std::move(aEvent), needsRelay ? &aNode : nullptr);
    MOZ_ASSERT(message->is_relay() == needsRelay,
               "Message relay status set incorrectly");

    RefPtr peer;
    RefPtr broker;
    bool needsIntroduction = false;
    {
      auto state = mState.Lock();

      // Check if we know this peer. If we don't, we'll need to request an
      // introduction.
      peer = state->mPeers.Get(aNode);
      if (!peer || needsRelay) {
        if (IsBroker()) {
          NODECONTROLLER_WARNING("Ignoring message '%s' to unknown peer %s",
                                 message->name(), ToString(aNode).c_str());
          return;
        }

        broker = state->mPeers.Get(kBrokerNodeName);
        if (!broker) {
          NODECONTROLLER_WARNING(
              "Ignoring message '%s' to peer %s due to a missing broker",
              message->name(), ToString(aNode).c_str());
          return;
        }

        if (!needsRelay) {
          // Queue the message for the unknown peer. An introduction is only
          // requested when this creates the queue, i.e. for the first message
          // pending for `aNode`.
          auto& queue =
              state->mPendingMessages.LookupOrInsertWith(aNode, [&]() {
                needsIntroduction = true;
                return Queue, 64>{};
              });
          queue.Push(std::move(message));
        }
      }
    }

    MOZ_ASSERT(!needsIntroduction || !needsRelay,
               "Only one of the two should ever be set");

    // The actual sends happen after `mState` has been released.
    if (needsRelay) {
      NODECONTROLLER_LOG(LogLevel::Info,
                         "Relaying message '%s' for peer %s due to %" PRIu32
                         " attachments",
                         message->name(), ToString(aNode).c_str(),
                         message->num_relayed_attachments());
      MOZ_ASSERT(message->num_relayed_attachments() > 0 && broker);
      broker->SendEventMessage(std::move(message));
    } else if (needsIntroduction) {
      MOZ_ASSERT(broker);
      broker->RequestIntroduction(aNode);
    } else if (peer) {
      peer->SendEventMessage(std::move(message));
    }
  }
}

// Serializes `aEvent` as a BROADCAST_MESSAGE_TYPE message. The broker handles
// it locally through `OnBroadcast`; other nodes hand it to their broker
// channel, or emit a warning if they have none.
void NodeController::BroadcastEvent(UniquePtr aEvent) {
  UniquePtr message =
      SerializeEventMessage(std::move(aEvent), nullptr, BROADCAST_MESSAGE_TYPE);

  if (IsBroker()) {
    OnBroadcast(mName, std::move(message));
  } else if (RefPtr broker = GetNodeChannel(kBrokerNodeName)) {
    broker->Broadcast(std::move(message));
  } else {
    NODECONTROLLER_WARNING(
        "Trying to broadcast event, but no connection to broker");
  }
}

// Notifies the observer stored as user data on `aPortRef`, if any, that the
// port's status changed. Emits a warning and returns if the user data cannot
// be fetched.
void NodeController::PortStatusChanged(const PortRef& aPortRef) {
  RefPtr userData;
  int rv = mNode->GetUserData(aPortRef, &userData);
  if (rv != mojo::core::ports::OK) {
    NODECONTROLLER_WARNING("GetUserData call for port '%s' failed",
                           ToString(aPortRef.name()).c_str());
    return;
  }
  if (userData) {
    // All instances of `UserData` attached to ports in this node must be of
    // type `PortObserver`, so we can call `OnPortStatusChanged` directly on
    // them.
    static_cast(userData.get())->OnPortStatusChanged();
  }
}

// Handles an event message received from the peer `aFromNode`.
//
// The message is validated and deserialized; malformed or unexpected messages
// cause the sending peer to be dropped. On Windows and macOS, relayed messages
// are either forwarded on to their real target (when we are the broker) or
// attributed to their original sender (when we are the final recipient).
// MergePort events are additionally checked against `mPendingMerges` before
// the event is handed to the node.
void NodeController::OnEventMessage(const NodeName& aFromNode,
                                    UniquePtr aMessage) {
  AssertIOThread();

  bool isRelay = aMessage->is_relay();
  if (isRelay && aMessage->num_relayed_attachments() == 0) {
    NODECONTROLLER_WARNING(
        "Invalid relay message without relayed attachments from peer %s!",
        ToString(aFromNode).c_str());
    DropPeer(aFromNode);
    return;
  }

  NodeName relayTarget;
  UniquePtr event = DeserializeEventMessage(
      std::move(aMessage), isRelay ? &relayTarget : nullptr);
  if (!event) {
    NODECONTROLLER_WARNING("Invalid EventMessage from peer %s!",
                           ToString(aFromNode).c_str());
    DropPeer(aFromNode);
    return;
  }

  NodeName fromNode = aFromNode;
#if defined(XP_WIN) || defined(XP_MACOSX)
  if (isRelay) {
    if (event->type() != Event::kUserMessage) {
      NODECONTROLLER_WARNING(
          "Unexpected relay of non-UserMessage event from peer %s!",
          ToString(aFromNode).c_str());
      DropPeer(aFromNode);
      return;
    }

    // If we're the broker, then we'll need to forward this message on to the
    // true recipient. To do this, we re-serialize the message, passing along
    // the original source node, and send it to the final node.
    if (IsBroker()) {
      UniquePtr message =
          SerializeEventMessage(std::move(event), &aFromNode);
      if (!message) {
        NODECONTROLLER_WARNING(
            "Relaying EventMessage from peer %s failed to re-serialize!",
            ToString(aFromNode).c_str());
        DropPeer(aFromNode);
        return;
      }
      MOZ_ASSERT(message->is_relay(), "Message stopped being a relay message?");
      MOZ_ASSERT(message->num_relayed_attachments() > 0,
                 "Message doesn't have relayed attachments?");

      NODECONTROLLER_LOG(
          LogLevel::Info,
          "Relaying message '%s' from peer %s to peer %s (%" PRIu32
          " attachments)",
          message->name(), ToString(aFromNode).c_str(),
          ToString(relayTarget).c_str(), message->num_relayed_attachments());

      RefPtr peer;
      {
        auto state = mState.Lock();
        peer = state->mPeers.Get(relayTarget);
      }
      if (!peer) {
        NODECONTROLLER_WARNING(
            "Dropping relayed message from %s to unknown peer %s",
            ToString(aFromNode).c_str(), ToString(relayTarget).c_str());
        return;
      }

      peer->SendEventMessage(std::move(message));
      return;
    }

    // Otherwise, we're the final recipient, so we can continue & process the
    // message as usual.
    if (aFromNode != kBrokerNodeName) {
      NODECONTROLLER_WARNING(
          "Unexpected relayed EventMessage from non-broker peer %s!",
          ToString(aFromNode).c_str());
      DropPeer(aFromNode);
      return;
    }

    // The broker stored the original sender's name in the relay slot of the
    // footer when it re-serialized the message (see the broker branch above).
    fromNode = relayTarget;

    NODECONTROLLER_LOG(LogLevel::Info, "Got relayed message from peer %s",
                       ToString(fromNode).c_str());
  }
#endif

  // If we're getting a requested port merge from another process, check to make
  // sure that we're expecting the request, and record that the merge has
  // arrived so we don't try to close the port on error.
  if (event->type() == Event::kMergePort) {
    // Check that the target port for the merge actually exists.
    auto targetPort = GetPort(event->port_name());
    if (!targetPort.is_valid()) {
      NODECONTROLLER_WARNING(
          "Unexpected MergePortEvent from peer %s for unknown port %s",
          ToString(fromNode).c_str(), ToString(event->port_name()).c_str());
      DropPeer(fromNode);
      return;
    }

    // Check if `targetPort` is in our pending merges entry for the given source
    // node. If this makes the `mPendingMerges` entry empty, remove it.
    //
    // The lookup below is keyed on `aFromNode` while the warnings use
    // `fromNode`. The two only differ for relayed messages, which the code
    // above restricts to kUserMessage events, so they are equal here.
    bool expectingMerge = [&] {
      auto state = mState.Lock();
      auto pendingMerges = state->mPendingMerges.Lookup(aFromNode);
      if (!pendingMerges) {
        return false;
      }
      size_t removed = pendingMerges->RemoveElementsBy(
          [&](auto& port) { return port.name() == targetPort.name(); });
      if (removed != 0 && pendingMerges->IsEmpty()) {
        pendingMerges.Remove();
      }
      return removed != 0;
    }();

    if (!expectingMerge) {
      NODECONTROLLER_WARNING(
          "Unexpected MergePortEvent from peer %s for port %s",
          ToString(fromNode).c_str(), ToString(event->port_name()).c_str());
      DropPeer(fromNode);
      return;
    }
  }

  (void)mNode->AcceptEvent(std::move(event));
}

// Broker-side handling of a broadcast request: deserializes the event and
// sends a freshly serialized clone of it to every peer currently in `mPeers`.
// Non-broker nodes ignore the request with a warning.
void NodeController::OnBroadcast(const NodeName& aFromNode,
                                 UniquePtr aMessage) {
  MOZ_DIAGNOSTIC_ASSERT(aMessage->type() == BROADCAST_MESSAGE_TYPE);

  // NOTE: This method may be called off of the IO thread by the
  // `BroadcastEvent` node callback.
  if (!IsBroker()) {
    NODECONTROLLER_WARNING("Broadcast request received by non-broker node");
    return;
  }

  UniquePtr event = DeserializeEventMessage(std::move(aMessage));
  if (!event) {
    NODECONTROLLER_WARNING("Invalid broadcast message from peer");
    return;
  }

  // Snapshot the peer list under the lock, then send with the lock released.
  nsTArray> peers;
  {
    auto state = mState.Lock();
    peers.SetCapacity(state->mPeers.Count());
    for (const auto& peer : state->mPeers.Values()) {
      peers.AppendElement(peer);
    }
  }
  for (const auto& peer : peers) {
    // NOTE: This `clone` operation is only supported for a limited number of
    // message types by the ports API, which provides some extra security by
    // only allowing those specific types of messages to be broadcasted.
    // Messages which don't support `Clone` cannot be broadcast, and the ports
    // library will not attempt to broadcast them.
    auto clone = event->Clone();
    if (!clone) {
      NODECONTROLLER_WARNING("Attempt to broadcast unsupported message");
      break;
    }
    peer->SendEventMessage(SerializeEventMessage(std::move(clone)));
  }
}

// Handles an introduction to another peer, which is only accepted from the
// broker (any other sender is dropped).
//
// An introduction without a handle means the introduction failed: the node is
// told the connection was lost and messages pending for that peer are
// discarded. Otherwise a channel to the new peer is created and registered,
// any pending messages are queued onto it, and it is started.
void NodeController::OnIntroduce(const NodeName& aFromNode,
                                 NodeChannel::Introduction aIntroduction) {
  AssertIOThread();

  if (aFromNode != kBrokerNodeName) {
    NODECONTROLLER_WARNING("Introduction received from non-broker node");
    DropPeer(aFromNode);
    return;
  }

  MOZ_ASSERT(aIntroduction.mMyPid == base::GetCurrentProcId(),
             "We're the wrong process to receive this?");

  if (!aIntroduction.mHandle) {
    NODECONTROLLER_WARNING("Could not be introduced to peer %s",
                           ToString(aIntroduction.mName).c_str());
    mNode->LostConnectionToNode(aIntroduction.mName);

    auto state = mState.Lock();
    state->mPendingMessages.Remove(aIntroduction.mName);
    return;
  }

  auto channel = MakeUnique(std::move(aIntroduction.mHandle),
                            aIntroduction.mMode, nullptr);
  auto nodeChannel = MakeRefPtr(
      aIntroduction.mName, std::move(channel), this, aIntroduction.mOtherPid);

  {
    auto state = mState.Lock();
    bool isNew = false;
    state->mPeers.LookupOrInsertWith(aIntroduction.mName, [&]() {
      isNew = true;
      return nodeChannel;
    });
    if (!isNew) {
      // We got a duplicate introduction. This can happen during normal
      // execution if both sides request an introduction at the same time. We
      // can just ignore the second one, as they'll arrive in the same order in
      // both processes.
      nodeChannel->Close();
      return;
    }

    // Deliver any pending messages, then remove the entry from our table. We do
    // this while `mState` is still held to ensure that these messages are
    // all sent before another thread can observe the newly created channel.
    // As the channel hasn't been `Connect()`-ed yet, this will only queue the
    // messages up to be sent, so is OK to do with the mutex held. These
    // messages will be processed to be sent during `Start()` below, which is
    // performed outside of the lock.
    if (auto pending = state->mPendingMessages.Lookup(aIntroduction.mName)) {
      while (!pending->IsEmpty()) {
        nodeChannel->SendEventMessage(pending->Pop());
      }
      pending.Remove();
    }
  }

  // NodeChannel::Start must be called with the lock not held, as it may lead to
  // callbacks being made into `OnChannelError` or `OnMessageReceived`, which
  // will attempt to re-acquire our lock.
  nodeChannel->Start();
}

// Broker-side handling of a request from `aFromNode` to be introduced to
// `aName`.
//
// An invalid request (unknown requester or invalid target name) drops the
// requester. If the target is unknown or the pipe cannot be created, the
// requester is sent an introduction without a handle. Otherwise a raw pipe is
// created and one end is sent to each peer, with the requester as
// MODE_SERVER and the target as MODE_CLIENT.
void NodeController::OnRequestIntroduction(const NodeName& aFromNode,
                                           const NodeName& aName) {
  AssertIOThread();
  if (NS_WARN_IF(!IsBroker())) {
    return;
  }

  RefPtr peerA = GetNodeChannel(aFromNode);
  if (!peerA || aName == mojo::core::ports::kInvalidNodeName) {
    NODECONTROLLER_WARNING("Invalid OnRequestIntroduction message from node %s",
                           ToString(aFromNode).c_str());
    DropPeer(aFromNode);
    return;
  }

  RefPtr peerB = GetNodeChannel(aName);
  IPC::Channel::ChannelHandle handleA, handleB;
  if (!peerB || !IPC::Channel::CreateRawPipe(&handleA, &handleB)) {
    NODECONTROLLER_WARNING(
        "Rejecting introduction request from '%s' for unknown peer '%s'",
        ToString(aFromNode).c_str(), ToString(aName).c_str());

    // We don't know this peer, or ran into issues creating the descriptor! Send
    // an invalid introduction to content to clean up any pending outbound
    // messages.
    NodeChannel::Introduction intro{aName, nullptr, IPC::Channel::MODE_SERVER,
                                    peerA->OtherPid(), base::kInvalidProcessId};
    peerA->Introduce(std::move(intro));
    return;
  }

  NodeChannel::Introduction introA{aName, std::move(handleA),
                                   IPC::Channel::MODE_SERVER, peerA->OtherPid(),
                                   peerB->OtherPid()};
  NodeChannel::Introduction introB{aFromNode, std::move(handleB),
                                   IPC::Channel::MODE_CLIENT, peerB->OtherPid(),
                                   peerA->OtherPid()};
  peerA->Introduce(std::move(introA));
  peerB->Introduce(std::move(introB));
}

// Broker-side handling of an invited node announcing its real name.
//
// The invite registered under the temporary name `aFromNode` is moved into
// `mPeers` under `aRealName`, the channel is renamed, and the port reserved
// for the invite is merged with the remote `aInitialPort`. Invalid names, an
// unknown invite, or an already-used real name cause the sender to be dropped.
void NodeController::OnAcceptInvite(const NodeName& aFromNode,
                                    const NodeName& aRealName,
                                    const PortName& aInitialPort) {
  AssertIOThread();
  if (!IsBroker()) {
    NODECONTROLLER_WARNING("Ignoring AcceptInvite message as non-broker");
    return;
  }

  if (aRealName == mojo::core::ports::kInvalidNodeName ||
      aInitialPort == mojo::core::ports::kInvalidPortName) {
    NODECONTROLLER_WARNING("Invalid name in AcceptInvite message");
    DropPeer(aFromNode);
    return;
  }

  bool inserted = false;
  Invite invite;
  {
    auto state = mState.Lock();
    MOZ_ASSERT(state->mPendingMessages.IsEmpty(),
               "Shouldn't have pending messages in broker");

    // Try to remove the source node from our invites list and insert it into
    // our peers map under the new name.
    if (state->mInvites.Remove(aFromNode, &invite)) {
      MOZ_ASSERT(invite.mChannel && invite.mToMerge.is_valid());
      state->mPeers.LookupOrInsertWith(aRealName, [&]() {
        inserted = true;
        return invite.mChannel;
      });
    }
  }
  if (!inserted) {
    NODECONTROLLER_WARNING("Invalid AcceptInvite message from node %s",
                           ToString(aFromNode).c_str());
    DropPeer(aFromNode);
    return;
  }

  // Update the name of the node. This field is only accessed from the IO
  // thread, so it's safe to update it without a lock held.
  invite.mChannel->SetName(aRealName);

  // Start the port merge to allow our existing initial port to begin
  // communicating with the remote port.
  PORTS_ALWAYS_OK(mNode->MergePorts(invite.mToMerge, aRealName, aInitialPort));
}

// A channel error is handled by dropping the peer it came from.
void NodeController::OnChannelError(const NodeName& aFromNode) {
  AssertIOThread();
  DropPeer(aFromNode);
}

// Builds a node name from two values returned by `RandomUint64OrDie()`.
static mojo::core::ports::NodeName RandomNodeName() {
  return {RandomUint64OrDie(), RandomUint64OrDie()};
}

// Broker-only: registers an invite for a new child process reachable over
// `aChannel`, and returns the local end of a fresh port pair together with the
// node channel created for the child. The other end of the pair is stored in
// the invite until the child answers (see `OnAcceptInvite`).
std::tuple> NodeController::InviteChildProcess(
    UniquePtr aChannel) {
  MOZ_ASSERT(IsBroker());
  AssertIOThread();

  // Create the peer with a randomly generated name, and store it in `mInvites`.
  // This channel and name will be used for communication with the node until it
  // sends us its real name in an `AcceptInvite` message.
  auto ports = CreatePortPair();
  auto inviteName = RandomNodeName();
  auto nodeChannel =
      MakeRefPtr(inviteName, std::move(aChannel), this);
  {
    auto state = mState.Lock();
    MOZ_DIAGNOSTIC_ASSERT(!state->mPeers.Contains(inviteName),
                          "UUID conflict?");
    MOZ_DIAGNOSTIC_ASSERT(!state->mInvites.Contains(inviteName),
                          "UUID conflict?");
    state->mInvites.InsertOrUpdate(inviteName,
                                   Invite{nodeChannel, ports.second.Release()});
  }

  nodeChannel->Start(/* aCallConnect */ false);
  return std::tuple{std::move(ports.first), std::move(nodeChannel)};
}

// Creates the singleton controller for the broker process, named
// `kBrokerNodeName`. Asserts that no controller exists yet.
void NodeController::InitBrokerProcess() {
  AssertIOThread();
  MOZ_ASSERT(!gNodeController);
  gNodeController = new NodeController(kBrokerNodeName);
}

// Creates the singleton controller for a child process under a random node
// name, registers `aChannel` as the channel to the broker, and sends the
// broker an AcceptInvite naming this node and the port it should merge with.
// That port is recorded in `mPendingMerges` for the broker. Returns the local
// end of the initial port pair.
ScopedPort NodeController::InitChildProcess(UniquePtr aChannel,
                                            int32_t aParentPid) {
  AssertIOThread();
  MOZ_ASSERT(!gNodeController);

  auto nodeName = RandomNodeName();
  gNodeController = new NodeController(nodeName);

  auto ports = gNodeController->CreatePortPair();
  PortRef toMerge = ports.second.Release();

  auto nodeChannel = MakeRefPtr(
      kBrokerNodeName, std::move(aChannel), gNodeController, aParentPid);
  {
    auto state = gNodeController->mState.Lock();
    MOZ_DIAGNOSTIC_ASSERT(!state->mPeers.Contains(kBrokerNodeName));
    state->mPeers.InsertOrUpdate(kBrokerNodeName, nodeChannel);
    MOZ_DIAGNOSTIC_ASSERT(!state->mPendingMerges.Contains(kBrokerNodeName));
    state->mPendingMerges.LookupOrInsert(kBrokerNodeName)
        .AppendElement(toMerge);
  }

  nodeChannel->Start(/* aCallConnect */ true);
  nodeChannel->AcceptInvite(nodeName, toMerge.name());
  return std::move(ports.first);
}

// Tears down the singleton controller: clears the global, empties all tables
// in `mState` under the lock, and then (with the lock released) closes every
// collected channel and port and reports each former peer as a lost
// connection.
void NodeController::CleanUp() {
  AssertIOThread();
  MOZ_ASSERT(gNodeController);

  // Keep the controller alive locally while the global is cleared.
  RefPtr nodeController = gNodeController;
  gNodeController = nullptr;

  // Collect all objects from our state which need to be cleaned up.
  nsTArray lostConnections;
  nsTArray> channelsToClose;
  nsTArray portsToClose;
  {
    auto state = nodeController->mState.Lock();
    for (const auto& chan : state->mPeers) {
      lostConnections.AppendElement(chan.GetKey());
      channelsToClose.AppendElement(chan.GetData());
    }
    for (const auto& invite : state->mInvites.Values()) {
      channelsToClose.AppendElement(invite.mChannel);
      portsToClose.AppendElement(invite.mToMerge);
    }
    for (const auto& pendingPorts : state->mPendingMerges.Values()) {
      portsToClose.AppendElements(pendingPorts);
    }
    state->mPeers.Clear();
    state->mPendingMessages.Clear();
    state->mInvites.Clear();
    state->mPendingMerges.Clear();
  }

  for (auto& nodeChannel : channelsToClose) {
    nodeChannel->Close();
  }
  for (auto& port : portsToClose) {
    nodeController->mNode->ClosePort(port);
  }
  for (auto& name : lostConnections) {
    nodeController->mNode->LostConnectionToNode(name);
  }
}

#undef NODECONTROLLER_LOG
#undef NODECONTROLLER_WARNING

}  // namespace mozilla::ipc