summaryrefslogtreecommitdiffstats
path: root/ipc/glue/NodeController.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ipc/glue/NodeController.cpp')
-rw-r--r--ipc/glue/NodeController.cpp132
1 files changed, 78 insertions, 54 deletions
diff --git a/ipc/glue/NodeController.cpp b/ipc/glue/NodeController.cpp
index 7781771660..325eff1834 100644
--- a/ipc/glue/NodeController.cpp
+++ b/ipc/glue/NodeController.cpp
@@ -307,67 +307,78 @@ void NodeController::DropPeer(NodeName aNodeName) {
mNode->LostConnectionToNode(aNodeName);
}
-void NodeController::ForwardEvent(const NodeName& aNode,
- UniquePtr<Event> aEvent) {
- if (aNode == mName) {
- (void)mNode->AcceptEvent(mName, std::move(aEvent));
- } else {
- // On Windows and macOS, messages holding HANDLEs or mach ports must be
- // relayed via the broker process so it can transfer ownership.
- bool needsRelay = false;
+void NodeController::ContactRemotePeer(const NodeName& aNode,
+ UniquePtr<Event> aEvent) {
+ // On Windows and macOS, messages holding HANDLEs or mach ports must be
+ // relayed via the broker process so it can transfer ownership.
+ bool needsRelay = false;
#if defined(XP_WIN) || defined(XP_DARWIN)
- if (!IsBroker() && aNode != kBrokerNodeName &&
- aEvent->type() == Event::kUserMessage) {
- auto* userEvent = static_cast<UserMessageEvent*>(aEvent.get());
- needsRelay =
- userEvent->HasMessage() &&
- userEvent->GetMessage<IPC::Message>()->num_relayed_attachments() > 0;
- }
+ if (aEvent && !IsBroker() && aNode != kBrokerNodeName &&
+ aEvent->type() == Event::kUserMessage) {
+ auto* userEvent = static_cast<UserMessageEvent*>(aEvent.get());
+ needsRelay =
+ userEvent->HasMessage() &&
+ userEvent->GetMessage<IPC::Message>()->num_relayed_attachments() > 0;
+ }
#endif
- UniquePtr<IPC::Message> message =
+ UniquePtr<IPC::Message> message;
+ if (aEvent) {
+ message =
SerializeEventMessage(std::move(aEvent), needsRelay ? &aNode : nullptr);
MOZ_ASSERT(message->is_relay() == needsRelay,
"Message relay status set incorrectly");
+ }
- RefPtr<NodeChannel> peer;
- RefPtr<NodeChannel> broker;
- bool needsIntroduction = false;
- {
- auto state = mState.Lock();
+ RefPtr<NodeChannel> peer;
+ RefPtr<NodeChannel> broker;
+ bool needsIntroduction = false;
+ bool needsBroker = needsRelay;
+ {
+ auto state = mState.Lock();
- // Check if we know this peer. If we don't, we'll need to request an
- // introduction.
- peer = state->mPeers.Get(aNode);
- if (!peer || needsRelay) {
- if (IsBroker()) {
- NODECONTROLLER_WARNING("Ignoring message '%s' to unknown peer %s",
- message->name(), ToString(aNode).c_str());
- return;
- }
-
- broker = state->mPeers.Get(kBrokerNodeName);
- if (!broker) {
- NODECONTROLLER_WARNING(
- "Ignoring message '%s' to peer %s due to a missing broker",
- message->name(), ToString(aNode).c_str());
- return;
- }
-
- if (!needsRelay) {
- auto& queue =
- state->mPendingMessages.LookupOrInsertWith(aNode, [&]() {
- needsIntroduction = true;
- return Queue<UniquePtr<IPC::Message>, 64>{};
- });
- queue.Push(std::move(message));
- }
+ // Check if we know this peer. If we don't, we'll need to request an
+ // introduction.
+ peer = state->mPeers.Get(aNode);
+ if (!peer) {
+ // We don't know the peer, check if we've already requested an
+ // introduction, or if we need to request a new one.
+ auto& queue = state->mPendingMessages.LookupOrInsertWith(aNode, [&]() {
+ needsIntroduction = true;
+ needsBroker = true;
+ return Queue<UniquePtr<IPC::Message>, 64>{};
+ });
+ // If we aren't relaying, queue up the message to be sent.
+ if (message && !needsRelay) {
+ queue.Push(std::move(message));
}
}
- MOZ_ASSERT(!needsIntroduction || !needsRelay,
- "Only one of the two should ever be set");
+ if (needsBroker && !IsBroker()) {
+ broker = state->mPeers.Get(kBrokerNodeName);
+ }
+ }
+ if (needsBroker && !broker) {
+ NODECONTROLLER_WARNING(
+ "Dropping message '%s'; no connection to unknown peer %s",
+ message ? message->name() : "<null>", ToString(aNode).c_str());
+ if (needsIntroduction) {
+ // We have no broker and will never be able to be introduced to this node.
+ // Queue a task to clean up any ports connected to it.
+ XRE_GetIOMessageLoop()->PostTask(NewRunnableMethod<NodeName>(
+ "NodeController::DropPeer", this, &NodeController::DropPeer, aNode));
+ }
+ return;
+ }
+
+ if (needsIntroduction) {
+ NODECONTROLLER_LOG(LogLevel::Info, "Requesting introduction to peer %s",
+ ToString(aNode).c_str());
+ broker->RequestIntroduction(aNode);
+ }
+
+ if (message) {
if (needsRelay) {
NODECONTROLLER_LOG(LogLevel::Info,
"Relaying message '%s' for peer %s due to %" PRIu32
@@ -376,15 +387,22 @@ void NodeController::ForwardEvent(const NodeName& aNode,
message->num_relayed_attachments());
MOZ_ASSERT(message->num_relayed_attachments() > 0 && broker);
broker->SendEventMessage(std::move(message));
- } else if (needsIntroduction) {
- MOZ_ASSERT(broker);
- broker->RequestIntroduction(aNode);
} else if (peer) {
peer->SendEventMessage(std::move(message));
}
}
}
+void NodeController::ForwardEvent(const NodeName& aNode,
+ UniquePtr<Event> aEvent) {
+ MOZ_ASSERT(aEvent, "cannot forward null event");
+ if (aNode == mName) {
+ (void)mNode->AcceptEvent(mName, std::move(aEvent));
+ } else {
+ ContactRemotePeer(aNode, std::move(aEvent));
+ }
+}
+
void NodeController::BroadcastEvent(UniquePtr<Event> aEvent) {
UniquePtr<IPC::Message> message =
SerializeEventMessage(std::move(aEvent), nullptr, BROADCAST_MESSAGE_TYPE);
@@ -415,6 +433,11 @@ void NodeController::PortStatusChanged(const PortRef& aPortRef) {
}
}
+void NodeController::ObserveRemoteNode(const NodeName& aNode) {
+ MOZ_ASSERT(aNode != mName);
+ ContactRemotePeer(aNode, nullptr);
+}
+
void NodeController::OnEventMessage(const NodeName& aFromNode,
UniquePtr<IPC::Message> aMessage) {
AssertIOThread();
@@ -715,8 +738,6 @@ void NodeController::OnAcceptInvite(const NodeName& aFromNode,
Invite invite;
{
auto state = mState.Lock();
- MOZ_ASSERT(state->mPendingMessages.IsEmpty(),
- "Shouldn't have pending messages in broker");
// Try to remove the source node from our invites list and insert it into
// our peers map under the new name.
@@ -840,6 +861,9 @@ void NodeController::CleanUp() {
lostConnections.AppendElement(chan.GetKey());
channelsToClose.AppendElement(chan.GetData());
}
+ for (const auto& pending : state->mPendingMessages.Keys()) {
+ lostConnections.AppendElement(pending);
+ }
for (const auto& invite : state->mInvites.Values()) {
channelsToClose.AppendElement(invite.mChannel);
portsToClose.AppendElement(invite.mToMerge);