diff options
Diffstat (limited to 'ipc/glue/NodeController.cpp')
-rw-r--r-- | ipc/glue/NodeController.cpp | 132 |
1 files changed, 78 insertions, 54 deletions
diff --git a/ipc/glue/NodeController.cpp b/ipc/glue/NodeController.cpp index 7781771660..325eff1834 100644 --- a/ipc/glue/NodeController.cpp +++ b/ipc/glue/NodeController.cpp @@ -307,67 +307,78 @@ void NodeController::DropPeer(NodeName aNodeName) { mNode->LostConnectionToNode(aNodeName); } -void NodeController::ForwardEvent(const NodeName& aNode, - UniquePtr<Event> aEvent) { - if (aNode == mName) { - (void)mNode->AcceptEvent(mName, std::move(aEvent)); - } else { - // On Windows and macOS, messages holding HANDLEs or mach ports must be - // relayed via the broker process so it can transfer ownership. - bool needsRelay = false; +void NodeController::ContactRemotePeer(const NodeName& aNode, + UniquePtr<Event> aEvent) { + // On Windows and macOS, messages holding HANDLEs or mach ports must be + // relayed via the broker process so it can transfer ownership. + bool needsRelay = false; #if defined(XP_WIN) || defined(XP_DARWIN) - if (!IsBroker() && aNode != kBrokerNodeName && - aEvent->type() == Event::kUserMessage) { - auto* userEvent = static_cast<UserMessageEvent*>(aEvent.get()); - needsRelay = - userEvent->HasMessage() && - userEvent->GetMessage<IPC::Message>()->num_relayed_attachments() > 0; - } + if (aEvent && !IsBroker() && aNode != kBrokerNodeName && + aEvent->type() == Event::kUserMessage) { + auto* userEvent = static_cast<UserMessageEvent*>(aEvent.get()); + needsRelay = + userEvent->HasMessage() && + userEvent->GetMessage<IPC::Message>()->num_relayed_attachments() > 0; + } #endif - UniquePtr<IPC::Message> message = + UniquePtr<IPC::Message> message; + if (aEvent) { + message = SerializeEventMessage(std::move(aEvent), needsRelay ? &aNode : nullptr); MOZ_ASSERT(message->is_relay() == needsRelay, "Message relay status set incorrectly"); + } - RefPtr<NodeChannel> peer; - RefPtr<NodeChannel> broker; - bool needsIntroduction = false; - { - auto state = mState.Lock(); + RefPtr<NodeChannel> peer; + RefPtr<NodeChannel> broker; + bool needsIntroduction = false; + bool needsBroker = needsRelay; + { + auto state = mState.Lock(); - // Check if we know this peer. If we don't, we'll need to request an - // introduction. - peer = state->mPeers.Get(aNode); - if (!peer || needsRelay) { - if (IsBroker()) { - NODECONTROLLER_WARNING("Ignoring message '%s' to unknown peer %s", - message->name(), ToString(aNode).c_str()); - return; - } - - broker = state->mPeers.Get(kBrokerNodeName); - if (!broker) { - NODECONTROLLER_WARNING( - "Ignoring message '%s' to peer %s due to a missing broker", - message->name(), ToString(aNode).c_str()); - return; - } - - if (!needsRelay) { - auto& queue = - state->mPendingMessages.LookupOrInsertWith(aNode, [&]() { - needsIntroduction = true; - return Queue<UniquePtr<IPC::Message>, 64>{}; - }); - queue.Push(std::move(message)); - } + // Check if we know this peer. If we don't, we'll need to request an + // introduction. + peer = state->mPeers.Get(aNode); + if (!peer) { + // We don't know the peer, check if we've already requested an + // introduction, or if we need to request a new one. + auto& queue = state->mPendingMessages.LookupOrInsertWith(aNode, [&]() { + needsIntroduction = true; + needsBroker = true; + return Queue<UniquePtr<IPC::Message>, 64>{}; + }); + // If we aren't relaying, queue up the message to be sent. + if (message && !needsRelay) { + queue.Push(std::move(message)); } } - MOZ_ASSERT(!needsIntroduction || !needsRelay, - "Only one of the two should ever be set"); + if (needsBroker && !IsBroker()) { + broker = state->mPeers.Get(kBrokerNodeName); + } + } + if (needsBroker && !broker) { + NODECONTROLLER_WARNING( + "Dropping message '%s'; no connection to unknown peer %s", + message ? message->name() : "<null>", ToString(aNode).c_str()); + if (needsIntroduction) { + // We have no broker and will never be able to be introduced to this node. + // Queue a task to clean up any ports connected to it. + XRE_GetIOMessageLoop()->PostTask(NewRunnableMethod<NodeName>( + "NodeController::DropPeer", this, &NodeController::DropPeer, aNode)); + } + return; + } + + if (needsIntroduction) { + NODECONTROLLER_LOG(LogLevel::Info, "Requesting introduction to peer %s", + ToString(aNode).c_str()); + broker->RequestIntroduction(aNode); + } + + if (message) { if (needsRelay) { NODECONTROLLER_LOG(LogLevel::Info, "Relaying message '%s' for peer %s due to %" PRIu32 @@ -376,15 +387,22 @@ void NodeController::ForwardEvent(const NodeName& aNode, message->num_relayed_attachments()); MOZ_ASSERT(message->num_relayed_attachments() > 0 && broker); broker->SendEventMessage(std::move(message)); - } else if (needsIntroduction) { - MOZ_ASSERT(broker); - broker->RequestIntroduction(aNode); } else if (peer) { peer->SendEventMessage(std::move(message)); } } } +void NodeController::ForwardEvent(const NodeName& aNode, + UniquePtr<Event> aEvent) { + MOZ_ASSERT(aEvent, "cannot forward null event"); + if (aNode == mName) { + (void)mNode->AcceptEvent(mName, std::move(aEvent)); + } else { + ContactRemotePeer(aNode, std::move(aEvent)); + } +} + void NodeController::BroadcastEvent(UniquePtr<Event> aEvent) { UniquePtr<IPC::Message> message = SerializeEventMessage(std::move(aEvent), nullptr, BROADCAST_MESSAGE_TYPE); @@ -415,6 +433,11 @@ void NodeController::PortStatusChanged(const PortRef& aPortRef) { } } +void NodeController::ObserveRemoteNode(const NodeName& aNode) { + MOZ_ASSERT(aNode != mName); + ContactRemotePeer(aNode, nullptr); +} + void NodeController::OnEventMessage(const NodeName& aFromNode, UniquePtr<IPC::Message> aMessage) { AssertIOThread(); @@ -715,8 +738,6 @@ void NodeController::OnAcceptInvite(const NodeName& aFromNode, Invite invite; { auto state = mState.Lock(); - MOZ_ASSERT(state->mPendingMessages.IsEmpty(), - "Shouldn't have pending messages in broker"); // Try to remove the source node from our invites list and insert it into // our peers map under the new name. @@ -840,6 +861,9 @@ void NodeController::CleanUp() { lostConnections.AppendElement(chan.GetKey()); channelsToClose.AppendElement(chan.GetData()); } + for (const auto& pending : state->mPendingMessages.Keys()) { + lostConnections.AppendElement(pending); + } for (const auto& invite : state->mInvites.Values()) { channelsToClose.AppendElement(invite.mChannel); portsToClose.AppendElement(invite.mToMerge); |