// Copyright 2016 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include #include #include #include #include "base/logging.h" #include "base/waitable_event.h" #include "base/thread.h" #include "base/string_util.h" #include "mojo/core/ports/event.h" #include "mojo/core/ports/node.h" #include "mojo/core/ports/node_delegate.h" #include "mojo/core/ports/port_locker.h" #include "mojo/core/ports/user_message.h" #include "testing/gtest/include/gtest/gtest.h" #include "mozilla/Mutex.h" namespace mojo { namespace core { namespace ports { namespace test { namespace { // TODO(rockot): Remove this unnecessary alias. using ScopedMessage = mozilla::UniquePtr; class TestMessage : public UserMessage { public: static const TypeInfo kUserMessageTypeInfo; explicit TestMessage(const std::string& payload) : UserMessage(&kUserMessageTypeInfo), payload_(payload) {} ~TestMessage() override = default; const std::string& payload() const { return payload_; } private: std::string payload_; }; const UserMessage::TypeInfo TestMessage::kUserMessageTypeInfo = {}; ScopedMessage NewUserMessageEvent(const std::string& payload, size_t num_ports) { auto event = mozilla::MakeUnique(num_ports); event->AttachMessage(mozilla::MakeUnique(payload)); return event; } bool MessageEquals(const ScopedMessage& message, const std::string& s) { return message->GetMessage()->payload() == s; } class TestNode; class MessageRouter { public: virtual ~MessageRouter() = default; virtual void ForwardEvent(TestNode* from_node, const NodeName& node_name, ScopedEvent event) = 0; virtual void BroadcastEvent(TestNode* from_node, ScopedEvent event) = 0; }; class TestNode : public NodeDelegate { public: explicit TestNode(uint64_t id) : node_name_(id, 1), node_(node_name_, this), node_thread_(StringPrintf("Node %" PRIu64 " thread", id).c_str()), events_available_event_(/* manual_reset */ false, /* initially_signaled */ false), idle_event_(/* manual_reset */ true, /* initially_signaled */ true) {} ~TestNode() override { StopWhenIdle(); node_thread_.Stop(); } const NodeName& name() const { return node_name_; } // NOTE: Node is thread-safe. Node& node() { return node_; } base::WaitableEvent& idle_event() { return idle_event_; } bool IsIdle() { mozilla::MutexAutoLock lock(lock_); return started_ && !dispatching_ && (incoming_events_.empty() || (block_on_event_ && blocked_)); } void BlockOnEvent(Event::Type type) { mozilla::MutexAutoLock lock(lock_); blocked_event_type_ = type; block_on_event_ = true; } void Unblock() { mozilla::MutexAutoLock lock(lock_); block_on_event_ = false; events_available_event_.Signal(); } void Start(MessageRouter* router) { router_ = router; node_thread_.Start(); node_thread_.message_loop()->PostTask(mozilla::NewNonOwningRunnableMethod( "TestNode::ProcessEvents", this, &TestNode::ProcessEvents)); } void StopWhenIdle() { mozilla::MutexAutoLock lock(lock_); should_quit_ = true; events_available_event_.Signal(); } void WakeUp() { events_available_event_.Signal(); } int SendStringMessage(const PortRef& port, const std::string& s) { return node_.SendUserMessage(port, NewUserMessageEvent(s, 0)); } int SendMultipleMessages(const PortRef& port, size_t num_messages) { for (size_t i = 0; i < num_messages; ++i) { int result = SendStringMessage(port, ""); if (result != OK) { return result; } } return OK; } int SendStringMessageWithPort(const PortRef& port, const std::string& s, const PortName& sent_port_name) { auto event = NewUserMessageEvent(s, 1); event->ports()[0] = sent_port_name; return node_.SendUserMessage(port, std::move(event)); } int SendStringMessageWithPort(const PortRef& port, const std::string& s, const PortRef& sent_port) { return SendStringMessageWithPort(port, s, sent_port.name()); } void set_drop_messages(bool value) { mozilla::MutexAutoLock lock(lock_); drop_messages_ = value; } void set_save_messages(bool value) { mozilla::MutexAutoLock lock(lock_); save_messages_ = value; } bool ReadMessage(const PortRef& port, ScopedMessage* message) { return node_.GetMessage(port, message, nullptr) == OK && *message; } bool ReadMultipleMessages(const PortRef& port, size_t num_messages) { for (size_t i = 0; i < num_messages; ++i) { ScopedMessage message; if (!ReadMessage(port, &message)) { return false; } } return true; } bool GetSavedMessage(ScopedMessage* message) { mozilla::MutexAutoLock lock(lock_); if (saved_messages_.empty()) { message->reset(); return false; } std::swap(*message, saved_messages_.front()); saved_messages_.pop(); return true; } void EnqueueEvent(const NodeName& from_node, ScopedEvent event) { idle_event_.Reset(); // NOTE: This may be called from ForwardMessage and thus must not reenter // |node_|. mozilla::MutexAutoLock lock(lock_); incoming_events_.push({from_node, std::move(event)}); events_available_event_.Signal(); } void ForwardEvent(const NodeName& node_name, ScopedEvent event) override { { mozilla::MutexAutoLock lock(lock_); if (drop_messages_) { DVLOG(1) << "Dropping ForwardMessage from node " << node_name_ << " to " << node_name; mozilla::MutexAutoUnlock unlock(lock_); ClosePortsInEvent(event.get()); return; } } DCHECK(router_); DVLOG(1) << "ForwardEvent from node " << node_name_ << " to " << node_name; router_->ForwardEvent(this, node_name, std::move(event)); } void BroadcastEvent(ScopedEvent event) override { router_->BroadcastEvent(this, std::move(event)); } void PortStatusChanged(const PortRef& port) override { // The port may be closed, in which case we ignore the notification. mozilla::MutexAutoLock lock(lock_); if (!save_messages_) { return; } for (;;) { ScopedMessage message; { mozilla::MutexAutoUnlock unlock(lock_); if (!ReadMessage(port, &message)) { break; } } saved_messages_.emplace(std::move(message)); } } void ObserveRemoteNode(const NodeName& node) override { DCHECK(node != node_name_); } void ClosePortsInEvent(Event* event) { if (event->type() != Event::Type::kUserMessage) { return; } UserMessageEvent* message_event = static_cast(event); for (size_t i = 0; i < message_event->num_ports(); ++i) { PortRef port; ASSERT_EQ(OK, node_.GetPort(message_event->ports()[i], &port)); EXPECT_EQ(OK, node_.ClosePort(port)); } } uint64_t GetUnacknowledgedMessageCount(const PortRef& port_ref) { PortStatus status{}; if (node_.GetStatus(port_ref, &status) != OK) { return 0; } return status.unacknowledged_message_count; } void AllowPortMerge(const PortRef& port_ref) { SinglePortLocker locker(&port_ref); locker.port()->pending_merge_peer = true; } private: void ProcessEvents() { for (;;) { events_available_event_.Wait(); mozilla::MutexAutoLock lock(lock_); if (should_quit_) { return; } dispatching_ = true; while (!incoming_events_.empty()) { if (block_on_event_ && incoming_events_.front().second->type() == blocked_event_type_) { blocked_ = true; // Go idle if we hit a blocked event type. break; } blocked_ = false; auto node_event_pair = std::move(incoming_events_.front()); incoming_events_.pop(); // NOTE: AcceptMessage() can re-enter this object to call any of the // NodeDelegate interface methods. mozilla::MutexAutoUnlock unlock(lock_); node_.AcceptEvent(node_event_pair.first, std::move(node_event_pair.second)); } dispatching_ = false; started_ = true; idle_event_.Signal(); }; } const NodeName node_name_; Node node_; MessageRouter* router_ = nullptr; base::Thread node_thread_; base::WaitableEvent events_available_event_; base::WaitableEvent idle_event_; // Guards fields below. mozilla::Mutex lock_ MOZ_UNANNOTATED{"TestNode"}; bool started_ = false; bool dispatching_ = false; bool should_quit_ = false; bool drop_messages_ = false; bool save_messages_ = false; bool blocked_ = false; bool block_on_event_ = false; Event::Type blocked_event_type_{}; std::queue> incoming_events_; std::queue saved_messages_; }; class PortsTest : public testing::Test, public MessageRouter { public: void AddNode(TestNode* node) { { mozilla::MutexAutoLock lock(lock_); nodes_[node->name()] = node; } node->Start(this); } void RemoveNode(TestNode* node) { { mozilla::MutexAutoLock lock(lock_); nodes_.erase(node->name()); } for (const auto& entry : nodes_) { entry.second->node().LostConnectionToNode(node->name()); } } // Waits until all known Nodes are idle. Message forwarding and processing // is handled in such a way that idleness is a stable state: once all nodes in // the system are idle, they will remain idle until the test explicitly // initiates some further event (e.g. sending a message, closing a port, or // removing a Node). void WaitForIdle() { for (;;) { mozilla::MutexAutoLock global_lock(global_lock_); bool all_nodes_idle = true; for (const auto& entry : nodes_) { if (!entry.second->IsIdle()) { all_nodes_idle = false; } entry.second->WakeUp(); } if (all_nodes_idle) { return; } // Wait for any Node to signal that it's idle. mozilla::MutexAutoUnlock global_unlock(global_lock_); std::vector events; for (const auto& entry : nodes_) { events.push_back(&entry.second->idle_event()); } base::WaitableEvent::WaitMany(events.data(), events.size()); } } void CreatePortPair(TestNode* node0, PortRef* port0, TestNode* node1, PortRef* port1) { if (node0 == node1) { EXPECT_EQ(OK, node0->node().CreatePortPair(port0, port1)); } else { EXPECT_EQ(OK, node0->node().CreateUninitializedPort(port0)); EXPECT_EQ(OK, node1->node().CreateUninitializedPort(port1)); EXPECT_EQ( OK, node0->node().InitializePort(*port0, node1->name(), port1->name(), node1->name(), port1->name())); EXPECT_EQ( OK, node1->node().InitializePort(*port1, node0->name(), port0->name(), node0->name(), port0->name())); } } private: // MessageRouter: void ForwardEvent(TestNode* from_node, const NodeName& node_name, ScopedEvent event) override { mozilla::MutexAutoLock global_lock(global_lock_); mozilla::MutexAutoLock lock(lock_); // Drop messages from nodes that have been removed. if (nodes_.find(from_node->name()) == nodes_.end()) { from_node->ClosePortsInEvent(event.get()); return; } auto it = nodes_.find(node_name); if (it == nodes_.end()) { DVLOG(1) << "Node not found: " << node_name; return; } // Serialize and de-serialize all forwarded events. size_t buf_size = event->GetSerializedSize(); mozilla::UniquePtr buf(new char[buf_size]); event->Serialize(buf.get()); ScopedEvent copy = Event::Deserialize(buf.get(), buf_size); // This should always succeed unless serialization or deserialization // is broken. In that case, the loss of events should cause a test failure. ASSERT_TRUE(copy); // Also copy the payload for user messages. if (event->type() == Event::Type::kUserMessage) { UserMessageEvent* message_event = static_cast(event.get()); UserMessageEvent* message_copy = static_cast(copy.get()); message_copy->AttachMessage(mozilla::MakeUnique( message_event->GetMessage()->payload())); } it->second->EnqueueEvent(from_node->name(), std::move(event)); } void BroadcastEvent(TestNode* from_node, ScopedEvent event) override { mozilla::MutexAutoLock global_lock(global_lock_); mozilla::MutexAutoLock lock(lock_); // Drop messages from nodes that have been removed. if (nodes_.find(from_node->name()) == nodes_.end()) { return; } for (const auto& entry : nodes_) { TestNode* node = entry.second; // Broadcast doesn't deliver to the local node. if (node == from_node) { continue; } node->EnqueueEvent(from_node->name(), event->CloneForBroadcast()); } } // Acquired before any operation which makes a Node busy, and before testing // if all nodes are idle. mozilla::Mutex global_lock_ MOZ_UNANNOTATED{"PortsTest Global Lock"}; mozilla::Mutex lock_ MOZ_UNANNOTATED{"PortsTest Lock"}; std::map nodes_; }; } // namespace TEST_F(PortsTest, Basic1) { TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); PortRef x0, x1; CreatePortPair(&node0, &x0, &node1, &x1); PortRef a0, a1; EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1)); EXPECT_EQ(OK, node0.node().ClosePort(a0)); EXPECT_EQ(OK, node0.node().ClosePort(x0)); EXPECT_EQ(OK, node1.node().ClosePort(x1)); WaitForIdle(); EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, Basic2) { TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); PortRef x0, x1; CreatePortPair(&node0, &x0, &node1, &x1); PortRef b0, b1; EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1)); EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", b1)); EXPECT_EQ(OK, node0.SendStringMessage(b0, "hello again")); EXPECT_EQ(OK, node0.node().ClosePort(b0)); EXPECT_EQ(OK, node0.node().ClosePort(x0)); EXPECT_EQ(OK, node1.node().ClosePort(x1)); WaitForIdle(); EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, Basic3) { TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); PortRef x0, x1; CreatePortPair(&node0, &x0, &node1, &x1); PortRef a0, a1; EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1)); EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello again")); EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a0)); PortRef b0, b1; EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1)); EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "bar", b1)); EXPECT_EQ(OK, node0.SendStringMessage(b0, "baz")); EXPECT_EQ(OK, node0.node().ClosePort(b0)); EXPECT_EQ(OK, node0.node().ClosePort(x0)); EXPECT_EQ(OK, node1.node().ClosePort(x1)); WaitForIdle(); EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, LostConnectionToNode1) { TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); node1.set_drop_messages(true); PortRef x0, x1; CreatePortPair(&node0, &x0, &node1, &x1); // Transfer a port to node1 and simulate a lost connection to node1. PortRef a0, a1; EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1)); WaitForIdle(); RemoveNode(&node1); WaitForIdle(); EXPECT_EQ(OK, node0.node().ClosePort(a0)); EXPECT_EQ(OK, node0.node().ClosePort(x0)); EXPECT_EQ(OK, node1.node().ClosePort(x1)); WaitForIdle(); EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, LostConnectionToNode2) { TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); PortRef x0, x1; CreatePortPair(&node0, &x0, &node1, &x1); PortRef a0, a1; EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "take a1", a1)); WaitForIdle(); node1.set_drop_messages(true); RemoveNode(&node1); WaitForIdle(); // a0 should have eventually detected peer closure after node loss. ScopedMessage message; EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node0.node().GetMessage(a0, &message, nullptr)); EXPECT_FALSE(message); EXPECT_EQ(OK, node0.node().ClosePort(a0)); EXPECT_EQ(OK, node0.node().ClosePort(x0)); EXPECT_EQ(OK, node1.node().GetMessage(x1, &message, nullptr)); EXPECT_TRUE(message); node1.ClosePortsInEvent(message.get()); EXPECT_EQ(OK, node1.node().ClosePort(x1)); WaitForIdle(); EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, LostConnectionToNodeWithSecondaryProxy) { // Tests that a proxy gets cleaned up when its indirect peer lives on a lost // node. TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); TestNode node2(2); AddNode(&node2); // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2. PortRef A, B, C, D; CreatePortPair(&node0, &A, &node1, &B); CreatePortPair(&node1, &C, &node2, &D); // Create E-F and send F over A to node 1. PortRef E, F; EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F)); EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", F)); WaitForIdle(); ScopedMessage message; ASSERT_TRUE(node1.ReadMessage(B, &message)); ASSERT_EQ(1u, message->num_ports()); EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &F)); // Send F over C to node 2 and then simulate node 2 loss from node 1. Node 1 // will trivially become aware of the loss, and this test verifies that the // port A on node 0 will eventually also become aware of it. // Make sure node2 stops processing events when it encounters an ObserveProxy. node2.BlockOnEvent(Event::Type::kObserveProxy); EXPECT_EQ(OK, node1.SendStringMessageWithPort(C, ".", F)); WaitForIdle(); // Simulate node 1 and 2 disconnecting. EXPECT_EQ(OK, node1.node().LostConnectionToNode(node2.name())); // Let node2 continue processing events and wait for everyone to go idle. node2.Unblock(); WaitForIdle(); // Port F should be gone. EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(F.name(), &F)); // Port E should have detected peer closure despite the fact that there is // no longer a continuous route from F to E over which the event could travel. PortStatus status{}; EXPECT_EQ(OK, node0.node().GetStatus(E, &status)); EXPECT_TRUE(status.peer_closed); EXPECT_EQ(OK, node0.node().ClosePort(A)); EXPECT_EQ(OK, node1.node().ClosePort(B)); EXPECT_EQ(OK, node1.node().ClosePort(C)); EXPECT_EQ(OK, node0.node().ClosePort(E)); WaitForIdle(); EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, LostConnectionToNodeAfterSendingObserveProxy) { // Tests that a proxy gets cleaned up after a node disconnect if the // previous port already received the ObserveProxy event. TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); TestNode node2(2); AddNode(&node2); // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2. PortRef A, B, C, D; CreatePortPair(&node0, &A, &node1, &B); CreatePortPair(&node1, &C, &node2, &D); // Create E-F and send F over A to node 1. PortRef E, F; EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F)); EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", F)); WaitForIdle(); ScopedMessage message; ASSERT_TRUE(node1.ReadMessage(B, &message)); ASSERT_EQ(1u, message->num_ports()); EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &F)); // Send F over C to node 2 and then simulate node 2 loss from node 1 after // node 0 received the ObserveProxy event. Node 1 needs to clean up the // closed proxy while the node 0 to node 2 connection is still intact. node0.BlockOnEvent(Event::Type::kObserveProxy); EXPECT_EQ(OK, node1.SendStringMessageWithPort(C, ".", F)); WaitForIdle(); // Simulate node 1 and 2 disconnecting. EXPECT_EQ(OK, node1.node().LostConnectionToNode(node2.name())); // Let node2 continue processing events and wait for everyone to go idle. node0.Unblock(); WaitForIdle(); // Port F should be gone. EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(F.name(), &F)); EXPECT_EQ(OK, node0.node().ClosePort(A)); EXPECT_EQ(OK, node1.node().ClosePort(B)); EXPECT_EQ(OK, node1.node().ClosePort(C)); EXPECT_EQ(OK, node0.node().ClosePort(E)); WaitForIdle(); EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, LostConnectionToNodeWithLocalProxy) { // Tests that a proxy gets cleaned up when its direct peer lives on a lost // node and it's predecessor lives on the same node. TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); PortRef A, B; CreatePortPair(&node0, &A, &node1, &B); PortRef C, D; EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D)); // Send D but block node0 on an ObserveProxy event. node0.BlockOnEvent(Event::Type::kObserveProxy); EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", D)); // node0 won't collapse the proxy but node1 will receive the message before // going idle. WaitForIdle(); ScopedMessage message; ASSERT_TRUE(node1.ReadMessage(B, &message)); ASSERT_EQ(1u, message->num_ports()); PortRef E; EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &E)); RemoveNode(&node1); node0.Unblock(); WaitForIdle(); // Port C should have detected peer closure. PortStatus status{}; EXPECT_EQ(OK, node0.node().GetStatus(C, &status)); EXPECT_TRUE(status.peer_closed); EXPECT_EQ(OK, node0.node().ClosePort(A)); EXPECT_EQ(OK, node1.node().ClosePort(B)); EXPECT_EQ(OK, node0.node().ClosePort(C)); EXPECT_EQ(OK, node1.node().ClosePort(E)); EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, GetMessage1) { TestNode node(0); AddNode(&node); PortRef a0, a1; EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1)); ScopedMessage message; EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr)); EXPECT_FALSE(message); EXPECT_EQ(OK, node.node().ClosePort(a1)); WaitForIdle(); EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node.node().GetMessage(a0, &message, nullptr)); EXPECT_FALSE(message); EXPECT_EQ(OK, node.node().ClosePort(a0)); WaitForIdle(); EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, GetMessage2) { TestNode node(0); AddNode(&node); PortRef a0, a1; EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1)); EXPECT_EQ(OK, node.SendStringMessage(a1, "1")); ScopedMessage message; EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr)); ASSERT_TRUE(message); EXPECT_TRUE(MessageEquals(message, "1")); EXPECT_EQ(OK, node.node().ClosePort(a0)); EXPECT_EQ(OK, node.node().ClosePort(a1)); EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, GetMessage3) { TestNode node(0); AddNode(&node); PortRef a0, a1; EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1)); const char* kStrings[] = {"1", "2", "3"}; for (auto& kString : kStrings) { EXPECT_EQ(OK, node.SendStringMessage(a1, kString)); } ScopedMessage message; for (auto& kString : kStrings) { EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr)); ASSERT_TRUE(message); EXPECT_TRUE(MessageEquals(message, kString)); } EXPECT_EQ(OK, node.node().ClosePort(a0)); EXPECT_EQ(OK, node.node().ClosePort(a1)); EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, Delegation1) { TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); PortRef x0, x1; CreatePortPair(&node0, &x0, &node1, &x1); // In this test, we send a message to a port that has been moved. PortRef a0, a1; EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "a1", a1)); WaitForIdle(); ScopedMessage message; ASSERT_TRUE(node1.ReadMessage(x1, &message)); ASSERT_EQ(1u, message->num_ports()); EXPECT_TRUE(MessageEquals(message, "a1")); // This is "a1" from the point of view of node1. PortName a2_name = message->ports()[0]; EXPECT_EQ(OK, node1.SendStringMessageWithPort(x1, "a2", a2_name)); EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello")); WaitForIdle(); ASSERT_TRUE(node0.ReadMessage(x0, &message)); ASSERT_EQ(1u, message->num_ports()); EXPECT_TRUE(MessageEquals(message, "a2")); // This is "a2" from the point of view of node1. PortName a3_name = message->ports()[0]; PortRef a3; EXPECT_EQ(OK, node0.node().GetPort(a3_name, &a3)); ASSERT_TRUE(node0.ReadMessage(a3, &message)); EXPECT_EQ(0u, message->num_ports()); EXPECT_TRUE(MessageEquals(message, "hello")); EXPECT_EQ(OK, node0.node().ClosePort(a0)); EXPECT_EQ(OK, node0.node().ClosePort(a3)); EXPECT_EQ(OK, node0.node().ClosePort(x0)); EXPECT_EQ(OK, node1.node().ClosePort(x1)); EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, Delegation2) { TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); for (int i = 0; i < 100; ++i) { // Setup pipe a<->b between node0 and node1. PortRef A, B; CreatePortPair(&node0, &A, &node1, &B); PortRef C, D; EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D)); PortRef E, F; EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F)); node1.set_save_messages(true); // Pass D over A to B. EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, "1", D)); // Pass F over C to D. EXPECT_EQ(OK, node0.SendStringMessageWithPort(C, "1", F)); // This message should find its way to node1. EXPECT_EQ(OK, node0.SendStringMessage(E, "hello")); WaitForIdle(); EXPECT_EQ(OK, node0.node().ClosePort(C)); EXPECT_EQ(OK, node0.node().ClosePort(E)); EXPECT_EQ(OK, node0.node().ClosePort(A)); EXPECT_EQ(OK, node1.node().ClosePort(B)); bool got_hello = false; ScopedMessage message; while (node1.GetSavedMessage(&message)) { node1.ClosePortsInEvent(message.get()); if (MessageEquals(message, "hello")) { got_hello = true; break; } } EXPECT_TRUE(got_hello); WaitForIdle(); // Because closing ports may have generated tasks. } EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, SendUninitialized) { TestNode node(0); AddNode(&node); PortRef x0; EXPECT_EQ(OK, node.node().CreateUninitializedPort(&x0)); EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED, node.SendStringMessage(x0, "oops")); EXPECT_EQ(OK, node.node().ClosePort(x0)); EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, SendFailure) { TestNode node(0); AddNode(&node); node.set_save_messages(true); PortRef A, B; EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); // Try to send A over itself. EXPECT_EQ(ERROR_PORT_CANNOT_SEND_SELF, node.SendStringMessageWithPort(A, "oops", A)); // Try to send B over A. EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER, node.SendStringMessageWithPort(A, "nope", B)); // B should be closed immediately. EXPECT_EQ(ERROR_PORT_UNKNOWN, node.node().GetPort(B.name(), &B)); WaitForIdle(); // There should have been no messages accepted. ScopedMessage message; EXPECT_FALSE(node.GetSavedMessage(&message)); EXPECT_EQ(OK, node.node().ClosePort(A)); WaitForIdle(); EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, DontLeakUnreceivedPorts) { TestNode node(0); AddNode(&node); PortRef A, B, C, D; EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D)); EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D)); EXPECT_EQ(OK, node.node().ClosePort(C)); EXPECT_EQ(OK, node.node().ClosePort(A)); EXPECT_EQ(OK, node.node().ClosePort(B)); WaitForIdle(); EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, AllowShutdownWithLocalPortsOpen) { TestNode node(0); AddNode(&node); PortRef A, B, C, D; EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D)); EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D)); ScopedMessage message; EXPECT_TRUE(node.ReadMessage(B, &message)); ASSERT_EQ(1u, message->num_ports()); EXPECT_TRUE(MessageEquals(message, "foo")); PortRef E; ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E)); EXPECT_TRUE( node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); WaitForIdle(); EXPECT_TRUE( node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); EXPECT_FALSE(node.node().CanShutdownCleanly()); EXPECT_EQ(OK, node.node().ClosePort(A)); EXPECT_EQ(OK, node.node().ClosePort(B)); EXPECT_EQ(OK, node.node().ClosePort(C)); EXPECT_EQ(OK, node.node().ClosePort(E)); WaitForIdle(); EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, ProxyCollapse1) { TestNode node(0); AddNode(&node); PortRef A, B; EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); PortRef X, Y; EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y)); ScopedMessage message; // Send B and receive it as C. EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B)); ASSERT_TRUE(node.ReadMessage(Y, &message)); ASSERT_EQ(1u, message->num_ports()); PortRef C; ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C)); // Send C and receive it as D. EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C)); ASSERT_TRUE(node.ReadMessage(Y, &message)); ASSERT_EQ(1u, message->num_ports()); PortRef D; ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D)); // Send D and receive it as E. EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", D)); ASSERT_TRUE(node.ReadMessage(Y, &message)); ASSERT_EQ(1u, message->num_ports()); PortRef E; ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E)); EXPECT_EQ(OK, node.node().ClosePort(X)); EXPECT_EQ(OK, node.node().ClosePort(Y)); EXPECT_EQ(OK, node.node().ClosePort(A)); EXPECT_EQ(OK, node.node().ClosePort(E)); // The node should not idle until all proxies are collapsed. WaitForIdle(); EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, ProxyCollapse2) { TestNode node(0); AddNode(&node); PortRef A, B; EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); PortRef X, Y; EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y)); ScopedMessage message; // Send B and A to create proxies in each direction. EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B)); EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A)); EXPECT_EQ(OK, node.node().ClosePort(X)); EXPECT_EQ(OK, node.node().ClosePort(Y)); // At this point we have a scenario with: // // D -> [B] -> C -> [A] // // Ensure that the proxies can collapse. The sent ports will be closed // eventually as a result of Y's closure. WaitForIdle(); EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, SendWithClosedPeer) { // This tests that if a port is sent when its peer is already known to be // closed, the newly created port will be aware of that peer closure, and the // proxy will eventually collapse. TestNode node(0); AddNode(&node); // Send a message from A to B, then close A. PortRef A, B; EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); EXPECT_EQ(OK, node.SendStringMessage(A, "hey")); EXPECT_EQ(OK, node.node().ClosePort(A)); // Now send B over X-Y as new port C. PortRef X, Y; EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y)); EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B)); ScopedMessage message; ASSERT_TRUE(node.ReadMessage(Y, &message)); ASSERT_EQ(1u, message->num_ports()); PortRef C; ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C)); EXPECT_EQ(OK, node.node().ClosePort(X)); EXPECT_EQ(OK, node.node().ClosePort(Y)); WaitForIdle(); // C should have received the message originally sent to B, and it should also // be aware of A's closure. ASSERT_TRUE(node.ReadMessage(C, &message)); EXPECT_TRUE(MessageEquals(message, "hey")); PortStatus status{}; EXPECT_EQ(OK, node.node().GetStatus(C, &status)); EXPECT_FALSE(status.receiving_messages); EXPECT_FALSE(status.has_messages); EXPECT_TRUE(status.peer_closed); node.node().ClosePort(C); WaitForIdle(); EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, SendWithClosedPeerSent) { // This tests that if a port is closed while some number of proxies are still // routing messages (directly or indirectly) to it, that the peer port is // eventually notified of the closure, and the dead-end proxies will // eventually be removed. TestNode node(0); AddNode(&node); PortRef X, Y; EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y)); PortRef A, B; EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); ScopedMessage message; // Send A as new port C. EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A)); ASSERT_TRUE(node.ReadMessage(Y, &message)); ASSERT_EQ(1u, message->num_ports()); PortRef C; ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C)); // Send C as new port D. EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C)); ASSERT_TRUE(node.ReadMessage(Y, &message)); ASSERT_EQ(1u, message->num_ports()); PortRef D; ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D)); // Send a message to B through D, then close D. EXPECT_EQ(OK, node.SendStringMessage(D, "hey")); EXPECT_EQ(OK, node.node().ClosePort(D)); // Now send B as new port E. EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B)); EXPECT_EQ(OK, node.node().ClosePort(X)); ASSERT_TRUE(node.ReadMessage(Y, &message)); ASSERT_EQ(1u, message->num_ports()); PortRef E; ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E)); EXPECT_EQ(OK, node.node().ClosePort(Y)); WaitForIdle(); // E should receive the message originally sent to B, and it should also be // aware of D's closure. ASSERT_TRUE(node.ReadMessage(E, &message)); EXPECT_TRUE(MessageEquals(message, "hey")); PortStatus status{}; EXPECT_EQ(OK, node.node().GetStatus(E, &status)); EXPECT_FALSE(status.receiving_messages); EXPECT_FALSE(status.has_messages); EXPECT_TRUE(status.peer_closed); EXPECT_EQ(OK, node.node().ClosePort(E)); WaitForIdle(); EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, MergePorts) { TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); // Setup two independent port pairs, A-B on node0 and C-D on node1. PortRef A, B, C, D; EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); // Write a message on A. EXPECT_EQ(OK, node0.SendStringMessage(A, "hey")); // Initiate a merge between B and C. node1.AllowPortMerge(C); EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); WaitForIdle(); // Expect all proxies to be gone once idle. EXPECT_TRUE( node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); EXPECT_TRUE( node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); // Expect D to have received the message sent on A. ScopedMessage message; ASSERT_TRUE(node1.ReadMessage(D, &message)); EXPECT_TRUE(MessageEquals(message, "hey")); EXPECT_EQ(OK, node0.node().ClosePort(A)); EXPECT_EQ(OK, node1.node().ClosePort(D)); // No more ports should be open. EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, MergePortWithClosedPeer1) { // This tests that the right thing happens when initiating a merge on a port // whose peer has already been closed. TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); // Setup two independent port pairs, A-B on node0 and C-D on node1. PortRef A, B, C, D; EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); // Write a message on A. EXPECT_EQ(OK, node0.SendStringMessage(A, "hey")); // Close A. EXPECT_EQ(OK, node0.node().ClosePort(A)); // Initiate a merge between B and C. node1.AllowPortMerge(C); EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); WaitForIdle(); // Expect all proxies to be gone once idle. node0 should have no ports since // A was explicitly closed. EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE( node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); // Expect D to have received the message sent on A. ScopedMessage message; ASSERT_TRUE(node1.ReadMessage(D, &message)); EXPECT_TRUE(MessageEquals(message, "hey")); EXPECT_EQ(OK, node1.node().ClosePort(D)); // No more ports should be open. EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, MergePortWithClosedPeer2) { // This tests that the right thing happens when merging into a port whose peer // has already been closed. TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); // Setup two independent port pairs, A-B on node0 and C-D on node1. PortRef A, B, C, D; EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); // Write a message on D and close it. EXPECT_EQ(OK, node1.SendStringMessage(D, "hey")); EXPECT_EQ(OK, node1.node().ClosePort(D)); // Initiate a merge between B and C. node1.AllowPortMerge(C); EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); WaitForIdle(); // Expect all proxies to be gone once idle. node1 should have no ports since // D was explicitly closed. EXPECT_TRUE( node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); EXPECT_TRUE(node1.node().CanShutdownCleanly()); // Expect A to have received the message sent on D. ScopedMessage message; ASSERT_TRUE(node0.ReadMessage(A, &message)); EXPECT_TRUE(MessageEquals(message, "hey")); EXPECT_EQ(OK, node0.node().ClosePort(A)); // No more ports should be open. EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, MergePortsWithClosedPeers) { // This tests that no residual ports are left behind if two ports are merged // when both of their peers have been closed. TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); // Setup two independent port pairs, A-B on node0 and C-D on node1. PortRef A, B, C, D; EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); // Close A and D. EXPECT_EQ(OK, node0.node().ClosePort(A)); EXPECT_EQ(OK, node1.node().ClosePort(D)); WaitForIdle(); // Initiate a merge between B and C. node1.AllowPortMerge(C); EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); WaitForIdle(); // Expect everything to have gone away. EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, MergePortsWithMovedPeers) { // This tests that ports can be merged successfully even if their peers are // moved around. TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); // Setup two independent port pairs, A-B on node0 and C-D on node1. PortRef A, B, C, D; EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); // Set up another pair X-Y for moving ports on node0. PortRef X, Y; EXPECT_EQ(OK, node0.node().CreatePortPair(&X, &Y)); ScopedMessage message; // Move A to new port E. EXPECT_EQ(OK, node0.SendStringMessageWithPort(X, "foo", A)); ASSERT_TRUE(node0.ReadMessage(Y, &message)); ASSERT_EQ(1u, message->num_ports()); PortRef E; ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E)); EXPECT_EQ(OK, node0.node().ClosePort(X)); EXPECT_EQ(OK, node0.node().ClosePort(Y)); // Write messages on E and D. EXPECT_EQ(OK, node0.SendStringMessage(E, "hey")); EXPECT_EQ(OK, node1.SendStringMessage(D, "hi")); // Initiate a merge between B and C. node1.AllowPortMerge(C); EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); WaitForIdle(); // Expect to receive D's message on E and E's message on D. ASSERT_TRUE(node0.ReadMessage(E, &message)); EXPECT_TRUE(MessageEquals(message, "hi")); ASSERT_TRUE(node1.ReadMessage(D, &message)); EXPECT_TRUE(MessageEquals(message, "hey")); // Close E and D. EXPECT_EQ(OK, node0.node().ClosePort(E)); EXPECT_EQ(OK, node1.node().ClosePort(D)); WaitForIdle(); // Expect everything to have gone away. EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, MergePortsFailsGracefully) { // This tests that the system remains in a well-defined state if something // goes wrong during port merge. TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); // Setup two independent port pairs, A-B on node0 and C-D on node1. PortRef A, B, C, D; EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); ScopedMessage message; PortRef X, Y; EXPECT_EQ(OK, node0.node().CreateUninitializedPort(&X)); EXPECT_EQ(OK, node1.node().CreateUninitializedPort(&Y)); EXPECT_EQ(OK, node0.node().InitializePort(X, node1.name(), Y.name(), node1.name(), Y.name())); EXPECT_EQ(OK, node1.node().InitializePort(Y, node0.name(), X.name(), node0.name(), X.name())); // Block the merge from proceeding until we can do something stupid with port // C. This avoids the test logic racing with async merge logic. node1.BlockOnEvent(Event::Type::kMergePort); // Initiate the merge between B and C. node1.AllowPortMerge(C); EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); // Move C to a new port E. This is not a sane use of Node's public API but // is still hypothetically possible. It allows us to force a merge failure // because C will be in an invalid state by the time the merge is processed. // As a result, B should be closed. EXPECT_EQ(OK, node1.SendStringMessageWithPort(Y, "foo", C)); node1.Unblock(); WaitForIdle(); ASSERT_TRUE(node0.ReadMessage(X, &message)); ASSERT_EQ(1u, message->num_ports()); PortRef E; ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E)); EXPECT_EQ(OK, node0.node().ClosePort(X)); EXPECT_EQ(OK, node1.node().ClosePort(Y)); WaitForIdle(); // C goes away as a result of normal proxy removal. B should have been closed // cleanly by the failed MergePorts. EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(C.name(), &C)); EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.node().GetPort(B.name(), &B)); // Close A, D, and E. EXPECT_EQ(OK, node0.node().ClosePort(A)); EXPECT_EQ(OK, node1.node().ClosePort(D)); EXPECT_EQ(OK, node0.node().ClosePort(E)); WaitForIdle(); // Expect everything to have gone away. EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, RemotePeerStatus) { TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); // Create a local port pair. Neither port should appear to have a remote peer. PortRef a, b; PortStatus status{}; node0.node().CreatePortPair(&a, &b); ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); EXPECT_FALSE(status.peer_remote); ASSERT_EQ(OK, node0.node().GetStatus(b, &status)); EXPECT_FALSE(status.peer_remote); // Create a port pair spanning the two nodes. Both spanning ports should // immediately appear to have a remote peer. PortRef x0, x1; CreatePortPair(&node0, &x0, &node1, &x1); ASSERT_EQ(OK, node0.node().GetStatus(x0, &status)); EXPECT_TRUE(status.peer_remote); ASSERT_EQ(OK, node1.node().GetStatus(x1, &status)); EXPECT_TRUE(status.peer_remote); PortRef x2, x3; CreatePortPair(&node0, &x2, &node1, &x3); // Transfer |b| to |node1| and |x1| to |node0|. i.e., make the local peers // remote and the remote peers local. EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", b)); EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", x1)); WaitForIdle(); ScopedMessage message; ASSERT_TRUE(node0.ReadMessage(x2, &message)); ASSERT_EQ(1u, message->num_ports()); ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &x1)); ASSERT_TRUE(node1.ReadMessage(x3, &message)); ASSERT_EQ(1u, message->num_ports()); ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &b)); // Now x0-x1 should be local to node0 and a-b should span the nodes. ASSERT_EQ(OK, node0.node().GetStatus(x0, &status)); EXPECT_FALSE(status.peer_remote); ASSERT_EQ(OK, node0.node().GetStatus(x1, &status)); EXPECT_FALSE(status.peer_remote); ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); EXPECT_TRUE(status.peer_remote); ASSERT_EQ(OK, node1.node().GetStatus(b, &status)); EXPECT_TRUE(status.peer_remote); // And swap them back one more time. EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", x1)); EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", b)); WaitForIdle(); ASSERT_TRUE(node0.ReadMessage(x2, &message)); ASSERT_EQ(1u, message->num_ports()); ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &b)); ASSERT_TRUE(node1.ReadMessage(x3, &message)); ASSERT_EQ(1u, message->num_ports()); ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &x1)); ASSERT_EQ(OK, node0.node().GetStatus(x0, &status)); EXPECT_TRUE(status.peer_remote); ASSERT_EQ(OK, node1.node().GetStatus(x1, &status)); EXPECT_TRUE(status.peer_remote); ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); EXPECT_FALSE(status.peer_remote); ASSERT_EQ(OK, node0.node().GetStatus(b, &status)); EXPECT_FALSE(status.peer_remote); EXPECT_EQ(OK, node0.node().ClosePort(x0)); EXPECT_EQ(OK, node1.node().ClosePort(x1)); EXPECT_EQ(OK, node0.node().ClosePort(x2)); EXPECT_EQ(OK, node1.node().ClosePort(x3)); EXPECT_EQ(OK, node0.node().ClosePort(a)); EXPECT_EQ(OK, node0.node().ClosePort(b)); EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, RemotePeerStatusAfterLocalPortMerge) { TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); // Set up a-b on node0 and c-d spanning node0-node1. PortRef a, b, c, d; node0.node().CreatePortPair(&a, &b); CreatePortPair(&node0, &c, &node1, &d); PortStatus status{}; ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); EXPECT_FALSE(status.peer_remote); ASSERT_EQ(OK, node0.node().GetStatus(b, &status)); EXPECT_FALSE(status.peer_remote); ASSERT_EQ(OK, node0.node().GetStatus(c, &status)); EXPECT_TRUE(status.peer_remote); ASSERT_EQ(OK, node1.node().GetStatus(d, &status)); EXPECT_TRUE(status.peer_remote); EXPECT_EQ(OK, node0.node().MergeLocalPorts(b, c)); WaitForIdle(); ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); EXPECT_TRUE(status.peer_remote); ASSERT_EQ(OK, node1.node().GetStatus(d, &status)); EXPECT_TRUE(status.peer_remote); EXPECT_EQ(OK, node0.node().ClosePort(a)); EXPECT_EQ(OK, node1.node().ClosePort(d)); EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, RemotePeerStatusAfterRemotePortMerge) { TestNode node0(0); AddNode(&node0); TestNode node1(1); AddNode(&node1); // Set up a-b on node0 and c-d on node1. PortRef a, b, c, d; node0.node().CreatePortPair(&a, &b); node1.node().CreatePortPair(&c, &d); PortStatus status{}; ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); EXPECT_FALSE(status.peer_remote); ASSERT_EQ(OK, node0.node().GetStatus(b, &status)); EXPECT_FALSE(status.peer_remote); ASSERT_EQ(OK, node1.node().GetStatus(c, &status)); EXPECT_FALSE(status.peer_remote); ASSERT_EQ(OK, node1.node().GetStatus(d, &status)); EXPECT_FALSE(status.peer_remote); node1.AllowPortMerge(c); EXPECT_EQ(OK, node0.node().MergePorts(b, node1.name(), c.name())); WaitForIdle(); ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); EXPECT_TRUE(status.peer_remote); ASSERT_EQ(OK, node1.node().GetStatus(d, &status)); EXPECT_TRUE(status.peer_remote); EXPECT_EQ(OK, node0.node().ClosePort(a)); EXPECT_EQ(OK, node1.node().ClosePort(d)); EXPECT_TRUE(node0.node().CanShutdownCleanly()); EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, RetransmitUserMessageEvents) { // Ensures that user message events can be retransmitted properly. TestNode node0(0); AddNode(&node0); PortRef a, b; node0.node().CreatePortPair(&a, &b); // Ping. const char* kMessage = "hey"; ScopedMessage message; EXPECT_EQ(OK, node0.SendStringMessage(a, kMessage)); ASSERT_TRUE(node0.ReadMessage(b, &message)); EXPECT_TRUE(MessageEquals(message, kMessage)); // Pong. EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message))); EXPECT_FALSE(message); ASSERT_TRUE(node0.ReadMessage(a, &message)); EXPECT_TRUE(MessageEquals(message, kMessage)); // Ping again. EXPECT_EQ(OK, node0.node().SendUserMessage(a, std::move(message))); EXPECT_FALSE(message); ASSERT_TRUE(node0.ReadMessage(b, &message)); EXPECT_TRUE(MessageEquals(message, kMessage)); // Pong again! EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message))); EXPECT_FALSE(message); ASSERT_TRUE(node0.ReadMessage(a, &message)); EXPECT_TRUE(MessageEquals(message, kMessage)); EXPECT_EQ(OK, node0.node().ClosePort(a)); EXPECT_EQ(OK, node0.node().ClosePort(b)); } TEST_F(PortsTest, SetAcknowledgeRequestInterval) { TestNode node0(0); AddNode(&node0); PortRef a0, a1; EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0)); // Send a batch of messages. EXPECT_EQ(OK, node0.SendMultipleMessages(a0, 15)); EXPECT_EQ(15u, node0.GetUnacknowledgedMessageCount(a0)); EXPECT_TRUE(node0.ReadMultipleMessages(a1, 5)); WaitForIdle(); EXPECT_EQ(15u, node0.GetUnacknowledgedMessageCount(a0)); // Set to acknowledge every read message, and validate that already-read // messages are acknowledged. EXPECT_EQ(OK, node0.node().SetAcknowledgeRequestInterval(a0, 1)); WaitForIdle(); EXPECT_EQ(10u, node0.GetUnacknowledgedMessageCount(a0)); // Read a third of the messages from the other end. EXPECT_TRUE(node0.ReadMultipleMessages(a1, 5)); WaitForIdle(); EXPECT_EQ(5u, node0.GetUnacknowledgedMessageCount(a0)); TestNode node1(1); AddNode(&node1); // Transfer a1 across to node1. PortRef x0, x1; CreatePortPair(&node0, &x0, &node1, &x1); EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1)); WaitForIdle(); ScopedMessage message; ASSERT_TRUE(node1.ReadMessage(x1, &message)); ASSERT_EQ(1u, message->num_ports()); ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &a1)); // Read the last third of the messages from the transferred node, and // validate that the unacknowledge message count updates correctly. EXPECT_TRUE(node1.ReadMultipleMessages(a1, 5)); WaitForIdle(); EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0)); // Turn the acknowledges down and validate that they don't go on indefinitely. EXPECT_EQ(OK, node0.node().SetAcknowledgeRequestInterval(a0, 0)); EXPECT_EQ(OK, node0.SendMultipleMessages(a0, 10)); WaitForIdle(); EXPECT_TRUE(node1.ReadMultipleMessages(a1, 10)); WaitForIdle(); EXPECT_NE(0u, node0.GetUnacknowledgedMessageCount(a0)); // Close the far port and validate that the closure updates the unacknowledged // count. EXPECT_EQ(OK, node1.node().ClosePort(a1)); WaitForIdle(); EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0)); EXPECT_EQ(OK, node0.node().ClosePort(a0)); EXPECT_EQ(OK, node0.node().ClosePort(x0)); EXPECT_EQ(OK, node1.node().ClosePort(x1)); } } // namespace test } // namespace ports } // namespace core } // namespace mojo