diff options
Diffstat (limited to 'ipc/chromium/gtest/ports_unittest.cc')
-rw-r--r-- | ipc/chromium/gtest/ports_unittest.cc | 1751 |
1 files changed, 1751 insertions, 0 deletions
diff --git a/ipc/chromium/gtest/ports_unittest.cc b/ipc/chromium/gtest/ports_unittest.cc new file mode 100644 index 0000000000..7469a1371f --- /dev/null +++ b/ipc/chromium/gtest/ports_unittest.cc @@ -0,0 +1,1751 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include <inttypes.h> +#include <stdio.h> + +#include <map> +#include <utility> + +#include "base/logging.h" +#include "base/waitable_event.h" +#include "base/thread.h" +#include "base/string_util.h" +#include "mojo/core/ports/event.h" +#include "mojo/core/ports/node.h" +#include "mojo/core/ports/node_delegate.h" +#include "mojo/core/ports/user_message.h" +#include "testing/gtest/include/gtest/gtest.h" + +#include "mozilla/Mutex.h" + +namespace mojo { +namespace core { +namespace ports { +namespace test { + +namespace { + +// TODO(rockot): Remove this unnecessary alias. +using ScopedMessage = mozilla::UniquePtr<UserMessageEvent>; + +class TestMessage : public UserMessage { + public: + static const TypeInfo kUserMessageTypeInfo; + + explicit TestMessage(const std::string& payload) + : UserMessage(&kUserMessageTypeInfo), payload_(payload) {} + ~TestMessage() override = default; + + const std::string& payload() const { return payload_; } + + private: + std::string payload_; +}; + +const UserMessage::TypeInfo TestMessage::kUserMessageTypeInfo = {}; + +ScopedMessage NewUserMessageEvent(const std::string& payload, + size_t num_ports) { + auto event = mozilla::MakeUnique<UserMessageEvent>(num_ports); + event->AttachMessage(mozilla::MakeUnique<TestMessage>(payload)); + return event; +} + +bool MessageEquals(const ScopedMessage& message, const std::string& s) { + return message->GetMessage<TestMessage>()->payload() == s; +} + +class TestNode; + +class MessageRouter { + public: + virtual ~MessageRouter() = default; + + virtual void ForwardEvent(TestNode* from_node, const NodeName& node_name, + ScopedEvent event) = 0; + virtual void BroadcastEvent(TestNode* from_node, ScopedEvent event) = 0; +}; + +class TestNode : public NodeDelegate { + public: + explicit TestNode(uint64_t id) + : node_name_(id, 1), + node_(node_name_, this), + node_thread_(StringPrintf("Node %" PRIu64 " thread", id).c_str()), + events_available_event_(/* manual_reset */ false, + /* initially_signaled */ false), + idle_event_(/* manual_reset */ true, /* initially_signaled */ true) {} + + ~TestNode() override { + StopWhenIdle(); + node_thread_.Stop(); + } + + const NodeName& name() const { return node_name_; } + + // NOTE: Node is thread-safe. + Node& node() { return node_; } + + base::WaitableEvent& idle_event() { return idle_event_; } + + bool IsIdle() { + mozilla::MutexAutoLock lock(lock_); + return started_ && !dispatching_ && + (incoming_events_.empty() || (block_on_event_ && blocked_)); + } + + void BlockOnEvent(Event::Type type) { + mozilla::MutexAutoLock lock(lock_); + blocked_event_type_ = type; + block_on_event_ = true; + } + + void Unblock() { + mozilla::MutexAutoLock lock(lock_); + block_on_event_ = false; + events_available_event_.Signal(); + } + + void Start(MessageRouter* router) { + router_ = router; + node_thread_.Start(); + node_thread_.message_loop()->PostTask(mozilla::NewNonOwningRunnableMethod( + "TestNode::ProcessEvents", this, &TestNode::ProcessEvents)); + } + + void StopWhenIdle() { + mozilla::MutexAutoLock lock(lock_); + should_quit_ = true; + events_available_event_.Signal(); + } + + void WakeUp() { events_available_event_.Signal(); } + + int SendStringMessage(const PortRef& port, const std::string& s) { + return node_.SendUserMessage(port, NewUserMessageEvent(s, 0)); + } + + int SendMultipleMessages(const PortRef& port, size_t num_messages) { + for (size_t i = 0; i < num_messages; ++i) { + int result = SendStringMessage(port, ""); + if (result != OK) { + return result; + } + } + return OK; + } + + int SendStringMessageWithPort(const PortRef& port, const std::string& s, + const PortName& sent_port_name) { + auto event = NewUserMessageEvent(s, 1); + event->ports()[0] = sent_port_name; + return node_.SendUserMessage(port, std::move(event)); + } + + int SendStringMessageWithPort(const PortRef& port, const std::string& s, + const PortRef& sent_port) { + return SendStringMessageWithPort(port, s, sent_port.name()); + } + + void set_drop_messages(bool value) { + mozilla::MutexAutoLock lock(lock_); + drop_messages_ = value; + } + + void set_save_messages(bool value) { + mozilla::MutexAutoLock lock(lock_); + save_messages_ = value; + } + + bool ReadMessage(const PortRef& port, ScopedMessage* message) { + return node_.GetMessage(port, message, nullptr) == OK && *message; + } + + bool ReadMultipleMessages(const PortRef& port, size_t num_messages) { + for (size_t i = 0; i < num_messages; ++i) { + ScopedMessage message; + if (!ReadMessage(port, &message)) { + return false; + } + } + return true; + } + + bool GetSavedMessage(ScopedMessage* message) { + mozilla::MutexAutoLock lock(lock_); + if (saved_messages_.empty()) { + message->reset(); + return false; + } + std::swap(*message, saved_messages_.front()); + saved_messages_.pop(); + return true; + } + + void EnqueueEvent(ScopedEvent event) { + idle_event_.Reset(); + + // NOTE: This may be called from ForwardMessage and thus must not reenter + // |node_|. + mozilla::MutexAutoLock lock(lock_); + incoming_events_.emplace(std::move(event)); + events_available_event_.Signal(); + } + + void ForwardEvent(const NodeName& node_name, ScopedEvent event) override { + { + mozilla::MutexAutoLock lock(lock_); + if (drop_messages_) { + DVLOG(1) << "Dropping ForwardMessage from node " << node_name_ << " to " + << node_name; + + mozilla::MutexAutoUnlock unlock(lock_); + ClosePortsInEvent(event.get()); + return; + } + } + + DCHECK(router_); + DVLOG(1) << "ForwardEvent from node " << node_name_ << " to " << node_name; + router_->ForwardEvent(this, node_name, std::move(event)); + } + + void BroadcastEvent(ScopedEvent event) override { + router_->BroadcastEvent(this, std::move(event)); + } + + void PortStatusChanged(const PortRef& port) override { + // The port may be closed, in which case we ignore the notification. + mozilla::MutexAutoLock lock(lock_); + if (!save_messages_) { + return; + } + + for (;;) { + ScopedMessage message; + { + mozilla::MutexAutoUnlock unlock(lock_); + if (!ReadMessage(port, &message)) { + break; + } + } + + saved_messages_.emplace(std::move(message)); + } + } + + void ClosePortsInEvent(Event* event) { + if (event->type() != Event::Type::kUserMessage) { + return; + } + + UserMessageEvent* message_event = static_cast<UserMessageEvent*>(event); + for (size_t i = 0; i < message_event->num_ports(); ++i) { + PortRef port; + ASSERT_EQ(OK, node_.GetPort(message_event->ports()[i], &port)); + EXPECT_EQ(OK, node_.ClosePort(port)); + } + } + + uint64_t GetUnacknowledgedMessageCount(const PortRef& port_ref) { + PortStatus status{}; + if (node_.GetStatus(port_ref, &status) != OK) { + return 0; + } + + return status.unacknowledged_message_count; + } + + private: + void ProcessEvents() { + for (;;) { + events_available_event_.Wait(); + mozilla::MutexAutoLock lock(lock_); + + if (should_quit_) { + return; + } + + dispatching_ = true; + while (!incoming_events_.empty()) { + if (block_on_event_ && + incoming_events_.front()->type() == blocked_event_type_) { + blocked_ = true; + // Go idle if we hit a blocked event type. + break; + } + blocked_ = false; + + ScopedEvent event = std::move(incoming_events_.front()); + incoming_events_.pop(); + + // NOTE: AcceptMessage() can re-enter this object to call any of the + // NodeDelegate interface methods. + mozilla::MutexAutoUnlock unlock(lock_); + node_.AcceptEvent(std::move(event)); + } + + dispatching_ = false; + started_ = true; + idle_event_.Signal(); + }; + } + + const NodeName node_name_; + Node node_; + MessageRouter* router_ = nullptr; + + base::Thread node_thread_; + base::WaitableEvent events_available_event_; + base::WaitableEvent idle_event_; + + // Guards fields below. + mozilla::Mutex lock_ MOZ_UNANNOTATED{"TestNode"}; + bool started_ = false; + bool dispatching_ = false; + bool should_quit_ = false; + bool drop_messages_ = false; + bool save_messages_ = false; + bool blocked_ = false; + bool block_on_event_ = false; + Event::Type blocked_event_type_{}; + std::queue<ScopedEvent> incoming_events_; + std::queue<ScopedMessage> saved_messages_; +}; + +class PortsTest : public testing::Test, public MessageRouter { + public: + void AddNode(TestNode* node) { + { + mozilla::MutexAutoLock lock(lock_); + nodes_[node->name()] = node; + } + node->Start(this); + } + + void RemoveNode(TestNode* node) { + { + mozilla::MutexAutoLock lock(lock_); + nodes_.erase(node->name()); + } + + for (const auto& entry : nodes_) { + entry.second->node().LostConnectionToNode(node->name()); + } + } + + // Waits until all known Nodes are idle. Message forwarding and processing + // is handled in such a way that idleness is a stable state: once all nodes in + // the system are idle, they will remain idle until the test explicitly + // initiates some further event (e.g. sending a message, closing a port, or + // removing a Node). + void WaitForIdle() { + for (;;) { + mozilla::MutexAutoLock global_lock(global_lock_); + bool all_nodes_idle = true; + for (const auto& entry : nodes_) { + if (!entry.second->IsIdle()) { + all_nodes_idle = false; + } + entry.second->WakeUp(); + } + if (all_nodes_idle) { + return; + } + + // Wait for any Node to signal that it's idle. + mozilla::MutexAutoUnlock global_unlock(global_lock_); + std::vector<base::WaitableEvent*> events; + for (const auto& entry : nodes_) { + events.push_back(&entry.second->idle_event()); + } + base::WaitableEvent::WaitMany(events.data(), events.size()); + } + } + + void CreatePortPair(TestNode* node0, PortRef* port0, TestNode* node1, + PortRef* port1) { + if (node0 == node1) { + EXPECT_EQ(OK, node0->node().CreatePortPair(port0, port1)); + } else { + EXPECT_EQ(OK, node0->node().CreateUninitializedPort(port0)); + EXPECT_EQ(OK, node1->node().CreateUninitializedPort(port1)); + EXPECT_EQ(OK, node0->node().InitializePort(*port0, node1->name(), + port1->name())); + EXPECT_EQ(OK, node1->node().InitializePort(*port1, node0->name(), + port0->name())); + } + } + + private: + // MessageRouter: + void ForwardEvent(TestNode* from_node, const NodeName& node_name, + ScopedEvent event) override { + mozilla::MutexAutoLock global_lock(global_lock_); + mozilla::MutexAutoLock lock(lock_); + // Drop messages from nodes that have been removed. + if (nodes_.find(from_node->name()) == nodes_.end()) { + from_node->ClosePortsInEvent(event.get()); + return; + } + + auto it = nodes_.find(node_name); + if (it == nodes_.end()) { + DVLOG(1) << "Node not found: " << node_name; + return; + } + + // Serialize and de-serialize all forwarded events. + size_t buf_size = event->GetSerializedSize(); + mozilla::UniquePtr<char[]> buf(new char[buf_size]); + event->Serialize(buf.get()); + ScopedEvent copy = Event::Deserialize(buf.get(), buf_size); + // This should always succeed unless serialization or deserialization + // is broken. In that case, the loss of events should cause a test failure. + ASSERT_TRUE(copy); + + // Also copy the payload for user messages. + if (event->type() == Event::Type::kUserMessage) { + UserMessageEvent* message_event = + static_cast<UserMessageEvent*>(event.get()); + UserMessageEvent* message_copy = + static_cast<UserMessageEvent*>(copy.get()); + + message_copy->AttachMessage(mozilla::MakeUnique<TestMessage>( + message_event->GetMessage<TestMessage>()->payload())); + } + + it->second->EnqueueEvent(std::move(event)); + } + + void BroadcastEvent(TestNode* from_node, ScopedEvent event) override { + mozilla::MutexAutoLock global_lock(global_lock_); + mozilla::MutexAutoLock lock(lock_); + + // Drop messages from nodes that have been removed. + if (nodes_.find(from_node->name()) == nodes_.end()) { + return; + } + + for (const auto& entry : nodes_) { + TestNode* node = entry.second; + // Broadcast doesn't deliver to the local node. + if (node == from_node) { + continue; + } + node->EnqueueEvent(event->Clone()); + } + } + + // Acquired before any operation which makes a Node busy, and before testing + // if all nodes are idle. + mozilla::Mutex global_lock_ MOZ_UNANNOTATED{"PortsTest Global Lock"}; + + mozilla::Mutex lock_ MOZ_UNANNOTATED{"PortsTest Lock"}; + std::map<NodeName, TestNode*> nodes_; +}; + +} // namespace + +TEST_F(PortsTest, Basic1) { + TestNode node0(0); + AddNode(&node0); + + TestNode node1(1); + AddNode(&node1); + + PortRef x0, x1; + CreatePortPair(&node0, &x0, &node1, &x1); + + PortRef a0, a1; + EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); + EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1)); + EXPECT_EQ(OK, node0.node().ClosePort(a0)); + + EXPECT_EQ(OK, node0.node().ClosePort(x0)); + EXPECT_EQ(OK, node1.node().ClosePort(x1)); + + WaitForIdle(); + + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); +} + +TEST_F(PortsTest, Basic2) { + TestNode node0(0); + AddNode(&node0); + + TestNode node1(1); + AddNode(&node1); + + PortRef x0, x1; + CreatePortPair(&node0, &x0, &node1, &x1); + + PortRef b0, b1; + EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1)); + EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", b1)); + EXPECT_EQ(OK, node0.SendStringMessage(b0, "hello again")); + + EXPECT_EQ(OK, node0.node().ClosePort(b0)); + + EXPECT_EQ(OK, node0.node().ClosePort(x0)); + EXPECT_EQ(OK, node1.node().ClosePort(x1)); + + WaitForIdle(); + + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); +} + +TEST_F(PortsTest, Basic3) { + TestNode node0(0); + AddNode(&node0); + + TestNode node1(1); + AddNode(&node1); + + PortRef x0, x1; + CreatePortPair(&node0, &x0, &node1, &x1); + + PortRef a0, a1; + EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); + + EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1)); + EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello again")); + + EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a0)); + + PortRef b0, b1; + EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1)); + EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "bar", b1)); + EXPECT_EQ(OK, node0.SendStringMessage(b0, "baz")); + + EXPECT_EQ(OK, node0.node().ClosePort(b0)); + + EXPECT_EQ(OK, node0.node().ClosePort(x0)); + EXPECT_EQ(OK, node1.node().ClosePort(x1)); + + WaitForIdle(); + + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); +} + +TEST_F(PortsTest, LostConnectionToNode1) { + TestNode node0(0); + AddNode(&node0); + + TestNode node1(1); + AddNode(&node1); + node1.set_drop_messages(true); + + PortRef x0, x1; + CreatePortPair(&node0, &x0, &node1, &x1); + + // Transfer a port to node1 and simulate a lost connection to node1. + + PortRef a0, a1; + EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); + EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1)); + + WaitForIdle(); + + RemoveNode(&node1); + + WaitForIdle(); + + EXPECT_EQ(OK, node0.node().ClosePort(a0)); + EXPECT_EQ(OK, node0.node().ClosePort(x0)); + EXPECT_EQ(OK, node1.node().ClosePort(x1)); + + WaitForIdle(); + + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); +} + +TEST_F(PortsTest, LostConnectionToNode2) { + TestNode node0(0); + AddNode(&node0); + + TestNode node1(1); + AddNode(&node1); + + PortRef x0, x1; + CreatePortPair(&node0, &x0, &node1, &x1); + + PortRef a0, a1; + EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); + EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "take a1", a1)); + + WaitForIdle(); + + node1.set_drop_messages(true); + + RemoveNode(&node1); + + WaitForIdle(); + + // a0 should have eventually detected peer closure after node loss. + ScopedMessage message; + EXPECT_EQ(ERROR_PORT_PEER_CLOSED, + node0.node().GetMessage(a0, &message, nullptr)); + EXPECT_FALSE(message); + + EXPECT_EQ(OK, node0.node().ClosePort(a0)); + + EXPECT_EQ(OK, node0.node().ClosePort(x0)); + + EXPECT_EQ(OK, node1.node().GetMessage(x1, &message, nullptr)); + EXPECT_TRUE(message); + node1.ClosePortsInEvent(message.get()); + + EXPECT_EQ(OK, node1.node().ClosePort(x1)); + + WaitForIdle(); + + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); +} + +TEST_F(PortsTest, LostConnectionToNodeWithSecondaryProxy) { + // Tests that a proxy gets cleaned up when its indirect peer lives on a lost + // node. + + TestNode node0(0); + AddNode(&node0); + + TestNode node1(1); + AddNode(&node1); + + TestNode node2(2); + AddNode(&node2); + + // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2. + PortRef A, B, C, D; + CreatePortPair(&node0, &A, &node1, &B); + CreatePortPair(&node1, &C, &node2, &D); + + // Create E-F and send F over A to node 1. + PortRef E, F; + EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F)); + EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", F)); + + WaitForIdle(); + + ScopedMessage message; + ASSERT_TRUE(node1.ReadMessage(B, &message)); + ASSERT_EQ(1u, message->num_ports()); + + EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &F)); + + // Send F over C to node 2 and then simulate node 2 loss from node 1. Node 1 + // will trivially become aware of the loss, and this test verifies that the + // port A on node 0 will eventually also become aware of it. + + // Make sure node2 stops processing events when it encounters an ObserveProxy. + node2.BlockOnEvent(Event::Type::kObserveProxy); + + EXPECT_EQ(OK, node1.SendStringMessageWithPort(C, ".", F)); + WaitForIdle(); + + // Simulate node 1 and 2 disconnecting. + EXPECT_EQ(OK, node1.node().LostConnectionToNode(node2.name())); + + // Let node2 continue processing events and wait for everyone to go idle. + node2.Unblock(); + WaitForIdle(); + + // Port F should be gone. + EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(F.name(), &F)); + + // Port E should have detected peer closure despite the fact that there is + // no longer a continuous route from F to E over which the event could travel. + PortStatus status{}; + EXPECT_EQ(OK, node0.node().GetStatus(E, &status)); + EXPECT_TRUE(status.peer_closed); + + EXPECT_EQ(OK, node0.node().ClosePort(A)); + EXPECT_EQ(OK, node1.node().ClosePort(B)); + EXPECT_EQ(OK, node1.node().ClosePort(C)); + EXPECT_EQ(OK, node0.node().ClosePort(E)); + + WaitForIdle(); + + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); +} + +TEST_F(PortsTest, LostConnectionToNodeWithLocalProxy) { + // Tests that a proxy gets cleaned up when its direct peer lives on a lost + // node and it's predecessor lives on the same node. + + TestNode node0(0); + AddNode(&node0); + + TestNode node1(1); + AddNode(&node1); + + PortRef A, B; + CreatePortPair(&node0, &A, &node1, &B); + + PortRef C, D; + EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D)); + + // Send D but block node0 on an ObserveProxy event. + node0.BlockOnEvent(Event::Type::kObserveProxy); + EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", D)); + + // node0 won't collapse the proxy but node1 will receive the message before + // going idle. + WaitForIdle(); + + ScopedMessage message; + ASSERT_TRUE(node1.ReadMessage(B, &message)); + ASSERT_EQ(1u, message->num_ports()); + PortRef E; + EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &E)); + + RemoveNode(&node1); + + node0.Unblock(); + WaitForIdle(); + + // Port C should have detected peer closure. + PortStatus status{}; + EXPECT_EQ(OK, node0.node().GetStatus(C, &status)); + EXPECT_TRUE(status.peer_closed); + + EXPECT_EQ(OK, node0.node().ClosePort(A)); + EXPECT_EQ(OK, node1.node().ClosePort(B)); + EXPECT_EQ(OK, node0.node().ClosePort(C)); + EXPECT_EQ(OK, node1.node().ClosePort(E)); + + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); +} + +TEST_F(PortsTest, GetMessage1) { + TestNode node(0); + AddNode(&node); + + PortRef a0, a1; + EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1)); + + ScopedMessage message; + EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr)); + EXPECT_FALSE(message); + + EXPECT_EQ(OK, node.node().ClosePort(a1)); + + WaitForIdle(); + + EXPECT_EQ(ERROR_PORT_PEER_CLOSED, + node.node().GetMessage(a0, &message, nullptr)); + EXPECT_FALSE(message); + + EXPECT_EQ(OK, node.node().ClosePort(a0)); + + WaitForIdle(); + + EXPECT_TRUE(node.node().CanShutdownCleanly()); +} + +TEST_F(PortsTest, GetMessage2) { + TestNode node(0); + AddNode(&node); + + PortRef a0, a1; + EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1)); + + EXPECT_EQ(OK, node.SendStringMessage(a1, "1")); + + ScopedMessage message; + EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr)); + + ASSERT_TRUE(message); + EXPECT_TRUE(MessageEquals(message, "1")); + + EXPECT_EQ(OK, node.node().ClosePort(a0)); + EXPECT_EQ(OK, node.node().ClosePort(a1)); + + EXPECT_TRUE(node.node().CanShutdownCleanly()); +} + +TEST_F(PortsTest, GetMessage3) { + TestNode node(0); + AddNode(&node); + + PortRef a0, a1; + EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1)); + + const char* kStrings[] = {"1", "2", "3"}; + + for (auto& kString : kStrings) { + EXPECT_EQ(OK, node.SendStringMessage(a1, kString)); + } + + ScopedMessage message; + for (auto& kString : kStrings) { + EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr)); + ASSERT_TRUE(message); + EXPECT_TRUE(MessageEquals(message, kString)); + } + + EXPECT_EQ(OK, node.node().ClosePort(a0)); + EXPECT_EQ(OK, node.node().ClosePort(a1)); + + EXPECT_TRUE(node.node().CanShutdownCleanly()); +} + +TEST_F(PortsTest, Delegation1) { + TestNode node0(0); + AddNode(&node0); + + TestNode node1(1); + AddNode(&node1); + + PortRef x0, x1; + CreatePortPair(&node0, &x0, &node1, &x1); + + // In this test, we send a message to a port that has been moved. + + PortRef a0, a1; + EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); + EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "a1", a1)); + WaitForIdle(); + + ScopedMessage message; + ASSERT_TRUE(node1.ReadMessage(x1, &message)); + ASSERT_EQ(1u, message->num_ports()); + EXPECT_TRUE(MessageEquals(message, "a1")); + + // This is "a1" from the point of view of node1. + PortName a2_name = message->ports()[0]; + EXPECT_EQ(OK, node1.SendStringMessageWithPort(x1, "a2", a2_name)); + EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello")); + + WaitForIdle(); + + ASSERT_TRUE(node0.ReadMessage(x0, &message)); + ASSERT_EQ(1u, message->num_ports()); + EXPECT_TRUE(MessageEquals(message, "a2")); + + // This is "a2" from the point of view of node1. + PortName a3_name = message->ports()[0]; + + PortRef a3; + EXPECT_EQ(OK, node0.node().GetPort(a3_name, &a3)); + + ASSERT_TRUE(node0.ReadMessage(a3, &message)); + EXPECT_EQ(0u, message->num_ports()); + EXPECT_TRUE(MessageEquals(message, "hello")); + + EXPECT_EQ(OK, node0.node().ClosePort(a0)); + EXPECT_EQ(OK, node0.node().ClosePort(a3)); + + EXPECT_EQ(OK, node0.node().ClosePort(x0)); + EXPECT_EQ(OK, node1.node().ClosePort(x1)); + + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); +} + +TEST_F(PortsTest, Delegation2) { + TestNode node0(0); + AddNode(&node0); + + TestNode node1(1); + AddNode(&node1); + + for (int i = 0; i < 100; ++i) { + // Setup pipe a<->b between node0 and node1. + PortRef A, B; + CreatePortPair(&node0, &A, &node1, &B); + + PortRef C, D; + EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D)); + + PortRef E, F; + EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F)); + + node1.set_save_messages(true); + + // Pass D over A to B. + EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, "1", D)); + + // Pass F over C to D. + EXPECT_EQ(OK, node0.SendStringMessageWithPort(C, "1", F)); + + // This message should find its way to node1. + EXPECT_EQ(OK, node0.SendStringMessage(E, "hello")); + + WaitForIdle(); + + EXPECT_EQ(OK, node0.node().ClosePort(C)); + EXPECT_EQ(OK, node0.node().ClosePort(E)); + + EXPECT_EQ(OK, node0.node().ClosePort(A)); + EXPECT_EQ(OK, node1.node().ClosePort(B)); + + bool got_hello = false; + ScopedMessage message; + while (node1.GetSavedMessage(&message)) { + node1.ClosePortsInEvent(message.get()); + if (MessageEquals(message, "hello")) { + got_hello = true; + break; + } + } + + EXPECT_TRUE(got_hello); + + WaitForIdle(); // Because closing ports may have generated tasks. + } + + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); +} + +TEST_F(PortsTest, SendUninitialized) { + TestNode node(0); + AddNode(&node); + + PortRef x0; + EXPECT_EQ(OK, node.node().CreateUninitializedPort(&x0)); + EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED, node.SendStringMessage(x0, "oops")); + EXPECT_EQ(OK, node.node().ClosePort(x0)); + EXPECT_TRUE(node.node().CanShutdownCleanly()); +} + +TEST_F(PortsTest, SendFailure) { + TestNode node(0); + AddNode(&node); + + node.set_save_messages(true); + + PortRef A, B; + EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); + + // Try to send A over itself. + + EXPECT_EQ(ERROR_PORT_CANNOT_SEND_SELF, + node.SendStringMessageWithPort(A, "oops", A)); + + // Try to send B over A. + + EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER, + node.SendStringMessageWithPort(A, "nope", B)); + + // B should be closed immediately. + EXPECT_EQ(ERROR_PORT_UNKNOWN, node.node().GetPort(B.name(), &B)); + + WaitForIdle(); + + // There should have been no messages accepted. + ScopedMessage message; + EXPECT_FALSE(node.GetSavedMessage(&message)); + + EXPECT_EQ(OK, node.node().ClosePort(A)); + + WaitForIdle(); + + EXPECT_TRUE(node.node().CanShutdownCleanly()); +} + +TEST_F(PortsTest, DontLeakUnreceivedPorts) { + TestNode node(0); + AddNode(&node); + + PortRef A, B, C, D; + EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); + EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D)); + + EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D)); + + EXPECT_EQ(OK, node.node().ClosePort(C)); + EXPECT_EQ(OK, node.node().ClosePort(A)); + EXPECT_EQ(OK, node.node().ClosePort(B)); + + WaitForIdle(); + + EXPECT_TRUE(node.node().CanShutdownCleanly()); +} + +TEST_F(PortsTest, AllowShutdownWithLocalPortsOpen) { + TestNode node(0); + AddNode(&node); + + PortRef A, B, C, D; + EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); + EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D)); + + EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D)); + + ScopedMessage message; + EXPECT_TRUE(node.ReadMessage(B, &message)); + ASSERT_EQ(1u, message->num_ports()); + EXPECT_TRUE(MessageEquals(message, "foo")); + PortRef E; + ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E)); + + EXPECT_TRUE( + node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); + + WaitForIdle(); + + EXPECT_TRUE( + node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); + EXPECT_FALSE(node.node().CanShutdownCleanly()); + + EXPECT_EQ(OK, node.node().ClosePort(A)); + EXPECT_EQ(OK, node.node().ClosePort(B)); + EXPECT_EQ(OK, node.node().ClosePort(C)); + EXPECT_EQ(OK, node.node().ClosePort(E)); + + WaitForIdle(); + + EXPECT_TRUE(node.node().CanShutdownCleanly()); +} + +TEST_F(PortsTest, ProxyCollapse1) { + TestNode node(0); + AddNode(&node); + + PortRef A, B; + EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); + + PortRef X, Y; + EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y)); + + ScopedMessage message; + + // Send B and receive it as C. + EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B)); + ASSERT_TRUE(node.ReadMessage(Y, &message)); + ASSERT_EQ(1u, message->num_ports()); + PortRef C; + ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C)); + + // Send C and receive it as D. + EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C)); + ASSERT_TRUE(node.ReadMessage(Y, &message)); + ASSERT_EQ(1u, message->num_ports()); + PortRef D; + ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D)); + + // Send D and receive it as E. + EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", D)); + ASSERT_TRUE(node.ReadMessage(Y, &message)); + ASSERT_EQ(1u, message->num_ports()); + PortRef E; + ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E)); + + EXPECT_EQ(OK, node.node().ClosePort(X)); + EXPECT_EQ(OK, node.node().ClosePort(Y)); + + EXPECT_EQ(OK, node.node().ClosePort(A)); + EXPECT_EQ(OK, node.node().ClosePort(E)); + + // The node should not idle until all proxies are collapsed. + WaitForIdle(); + + EXPECT_TRUE(node.node().CanShutdownCleanly()); +} + +TEST_F(PortsTest, ProxyCollapse2) { + TestNode node(0); + AddNode(&node); + + PortRef A, B; + EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); + + PortRef X, Y; + EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y)); + + ScopedMessage message; + + // Send B and A to create proxies in each direction. + EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B)); + EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A)); + + EXPECT_EQ(OK, node.node().ClosePort(X)); + EXPECT_EQ(OK, node.node().ClosePort(Y)); + + // At this point we have a scenario with: + // + // D -> [B] -> C -> [A] + // + // Ensure that the proxies can collapse. The sent ports will be closed + // eventually as a result of Y's closure. + + WaitForIdle(); + + EXPECT_TRUE(node.node().CanShutdownCleanly()); +} + +TEST_F(PortsTest, SendWithClosedPeer) { + // This tests that if a port is sent when its peer is already known to be + // closed, the newly created port will be aware of that peer closure, and the + // proxy will eventually collapse. + + TestNode node(0); + AddNode(&node); + + // Send a message from A to B, then close A. + PortRef A, B; + EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); + EXPECT_EQ(OK, node.SendStringMessage(A, "hey")); + EXPECT_EQ(OK, node.node().ClosePort(A)); + + // Now send B over X-Y as new port C. + PortRef X, Y; + EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y)); + EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B)); + ScopedMessage message; + ASSERT_TRUE(node.ReadMessage(Y, &message)); + ASSERT_EQ(1u, message->num_ports()); + PortRef C; + ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C)); + + EXPECT_EQ(OK, node.node().ClosePort(X)); + EXPECT_EQ(OK, node.node().ClosePort(Y)); + + WaitForIdle(); + + // C should have received the message originally sent to B, and it should also + // be aware of A's closure. + + ASSERT_TRUE(node.ReadMessage(C, &message)); + EXPECT_TRUE(MessageEquals(message, "hey")); + + PortStatus status{}; + EXPECT_EQ(OK, node.node().GetStatus(C, &status)); + EXPECT_FALSE(status.receiving_messages); + EXPECT_FALSE(status.has_messages); + EXPECT_TRUE(status.peer_closed); + + node.node().ClosePort(C); + + WaitForIdle(); + + EXPECT_TRUE(node.node().CanShutdownCleanly()); +} + +TEST_F(PortsTest, SendWithClosedPeerSent) { + // This tests that if a port is closed while some number of proxies are still + // routing messages (directly or indirectly) to it, that the peer port is + // eventually notified of the closure, and the dead-end proxies will + // eventually be removed. + + TestNode node(0); + AddNode(&node); + + PortRef X, Y; + EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y)); + + PortRef A, B; + EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); + + ScopedMessage message; + + // Send A as new port C. + EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A)); + + ASSERT_TRUE(node.ReadMessage(Y, &message)); + ASSERT_EQ(1u, message->num_ports()); + PortRef C; + ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C)); + + // Send C as new port D. + EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C)); + + ASSERT_TRUE(node.ReadMessage(Y, &message)); + ASSERT_EQ(1u, message->num_ports()); + PortRef D; + ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D)); + + // Send a message to B through D, then close D. + EXPECT_EQ(OK, node.SendStringMessage(D, "hey")); + EXPECT_EQ(OK, node.node().ClosePort(D)); + + // Now send B as new port E. + + EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B)); + EXPECT_EQ(OK, node.node().ClosePort(X)); + + ASSERT_TRUE(node.ReadMessage(Y, &message)); + ASSERT_EQ(1u, message->num_ports()); + PortRef E; + ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E)); + + EXPECT_EQ(OK, node.node().ClosePort(Y)); + + WaitForIdle(); + + // E should receive the message originally sent to B, and it should also be + // aware of D's closure. + + ASSERT_TRUE(node.ReadMessage(E, &message)); + EXPECT_TRUE(MessageEquals(message, "hey")); + + PortStatus status{}; + EXPECT_EQ(OK, node.node().GetStatus(E, &status)); + EXPECT_FALSE(status.receiving_messages); + EXPECT_FALSE(status.has_messages); + EXPECT_TRUE(status.peer_closed); + + EXPECT_EQ(OK, node.node().ClosePort(E)); + + WaitForIdle(); + + EXPECT_TRUE(node.node().CanShutdownCleanly()); +} + +TEST_F(PortsTest, MergePorts) { + TestNode node0(0); + AddNode(&node0); + + TestNode node1(1); + AddNode(&node1); + + // Setup two independent port pairs, A-B on node0 and C-D on node1. + PortRef A, B, C, D; + EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); + EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); + + // Write a message on A. + EXPECT_EQ(OK, node0.SendStringMessage(A, "hey")); + + // Initiate a merge between B and C. + EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); + + WaitForIdle(); + + // Expect all proxies to be gone once idle. + EXPECT_TRUE( + node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); + EXPECT_TRUE( + node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); + + // Expect D to have received the message sent on A. + ScopedMessage message; + ASSERT_TRUE(node1.ReadMessage(D, &message)); + EXPECT_TRUE(MessageEquals(message, "hey")); + + EXPECT_EQ(OK, node0.node().ClosePort(A)); + EXPECT_EQ(OK, node1.node().ClosePort(D)); + + // No more ports should be open. + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); +} + +TEST_F(PortsTest, MergePortWithClosedPeer1) { + // This tests that the right thing happens when initiating a merge on a port + // whose peer has already been closed. + + TestNode node0(0); + AddNode(&node0); + + TestNode node1(1); + AddNode(&node1); + + // Setup two independent port pairs, A-B on node0 and C-D on node1. + PortRef A, B, C, D; + EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); + EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); + + // Write a message on A. + EXPECT_EQ(OK, node0.SendStringMessage(A, "hey")); + + // Close A. + EXPECT_EQ(OK, node0.node().ClosePort(A)); + + // Initiate a merge between B and C. + EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); + + WaitForIdle(); + + // Expect all proxies to be gone once idle. node0 should have no ports since + // A was explicitly closed. + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE( + node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); + + // Expect D to have received the message sent on A. + ScopedMessage message; + ASSERT_TRUE(node1.ReadMessage(D, &message)); + EXPECT_TRUE(MessageEquals(message, "hey")); + + EXPECT_EQ(OK, node1.node().ClosePort(D)); + + // No more ports should be open. + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); +} + +TEST_F(PortsTest, MergePortWithClosedPeer2) { + // This tests that the right thing happens when merging into a port whose peer + // has already been closed. + + TestNode node0(0); + AddNode(&node0); + + TestNode node1(1); + AddNode(&node1); + + // Setup two independent port pairs, A-B on node0 and C-D on node1. + PortRef A, B, C, D; + EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); + EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); + + // Write a message on D and close it. + EXPECT_EQ(OK, node0.SendStringMessage(D, "hey")); + EXPECT_EQ(OK, node1.node().ClosePort(D)); + + // Initiate a merge between B and C. + EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); + + WaitForIdle(); + + // Expect all proxies to be gone once idle. node1 should have no ports since + // D was explicitly closed. + EXPECT_TRUE( + node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); + + // Expect A to have received the message sent on D. + ScopedMessage message; + ASSERT_TRUE(node0.ReadMessage(A, &message)); + EXPECT_TRUE(MessageEquals(message, "hey")); + + EXPECT_EQ(OK, node0.node().ClosePort(A)); + + // No more ports should be open. + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); +} + +TEST_F(PortsTest, MergePortsWithClosedPeers) { + // This tests that no residual ports are left behind if two ports are merged + // when both of their peers have been closed. + + TestNode node0(0); + AddNode(&node0); + + TestNode node1(1); + AddNode(&node1); + + // Setup two independent port pairs, A-B on node0 and C-D on node1. + PortRef A, B, C, D; + EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); + EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); + + // Close A and D. + EXPECT_EQ(OK, node0.node().ClosePort(A)); + EXPECT_EQ(OK, node1.node().ClosePort(D)); + + WaitForIdle(); + + // Initiate a merge between B and C. + EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); + + WaitForIdle(); + + // Expect everything to have gone away. + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); +} + +TEST_F(PortsTest, MergePortsWithMovedPeers) { + // This tests that ports can be merged successfully even if their peers are + // moved around. + + TestNode node0(0); + AddNode(&node0); + + TestNode node1(1); + AddNode(&node1); + + // Setup two independent port pairs, A-B on node0 and C-D on node1. + PortRef A, B, C, D; + EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); + EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); + + // Set up another pair X-Y for moving ports on node0. + PortRef X, Y; + EXPECT_EQ(OK, node0.node().CreatePortPair(&X, &Y)); + + ScopedMessage message; + + // Move A to new port E. + EXPECT_EQ(OK, node0.SendStringMessageWithPort(X, "foo", A)); + ASSERT_TRUE(node0.ReadMessage(Y, &message)); + ASSERT_EQ(1u, message->num_ports()); + PortRef E; + ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E)); + + EXPECT_EQ(OK, node0.node().ClosePort(X)); + EXPECT_EQ(OK, node0.node().ClosePort(Y)); + + // Write messages on E and D. + EXPECT_EQ(OK, node0.SendStringMessage(E, "hey")); + EXPECT_EQ(OK, node1.SendStringMessage(D, "hi")); + + // Initiate a merge between B and C. + EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); + + WaitForIdle(); + + // Expect to receive D's message on E and E's message on D. + ASSERT_TRUE(node0.ReadMessage(E, &message)); + EXPECT_TRUE(MessageEquals(message, "hi")); + ASSERT_TRUE(node1.ReadMessage(D, &message)); + EXPECT_TRUE(MessageEquals(message, "hey")); + + // Close E and D. + EXPECT_EQ(OK, node0.node().ClosePort(E)); + EXPECT_EQ(OK, node1.node().ClosePort(D)); + + WaitForIdle(); + + // Expect everything to have gone away. + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); +} + +TEST_F(PortsTest, MergePortsFailsGracefully) { + // This tests that the system remains in a well-defined state if something + // goes wrong during port merge. + + TestNode node0(0); + AddNode(&node0); + + TestNode node1(1); + AddNode(&node1); + + // Setup two independent port pairs, A-B on node0 and C-D on node1. + PortRef A, B, C, D; + EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); + EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); + + ScopedMessage message; + PortRef X, Y; + EXPECT_EQ(OK, node0.node().CreateUninitializedPort(&X)); + EXPECT_EQ(OK, node1.node().CreateUninitializedPort(&Y)); + EXPECT_EQ(OK, node0.node().InitializePort(X, node1.name(), Y.name())); + EXPECT_EQ(OK, node1.node().InitializePort(Y, node0.name(), X.name())); + + // Block the merge from proceeding until we can do something stupid with port + // C. This avoids the test logic racing with async merge logic. + node1.BlockOnEvent(Event::Type::kMergePort); + + // Initiate the merge between B and C. + EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); + + // Move C to a new port E. This is not a sane use of Node's public API but + // is still hypothetically possible. It allows us to force a merge failure + // because C will be in an invalid state by the time the merge is processed. + // As a result, B should be closed. + EXPECT_EQ(OK, node1.SendStringMessageWithPort(Y, "foo", C)); + + node1.Unblock(); + + WaitForIdle(); + + ASSERT_TRUE(node0.ReadMessage(X, &message)); + ASSERT_EQ(1u, message->num_ports()); + PortRef E; + ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E)); + + EXPECT_EQ(OK, node0.node().ClosePort(X)); + EXPECT_EQ(OK, node1.node().ClosePort(Y)); + + WaitForIdle(); + + // C goes away as a result of normal proxy removal. B should have been closed + // cleanly by the failed MergePorts. + EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(C.name(), &C)); + EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.node().GetPort(B.name(), &B)); + + // Close A, D, and E. + EXPECT_EQ(OK, node0.node().ClosePort(A)); + EXPECT_EQ(OK, node1.node().ClosePort(D)); + EXPECT_EQ(OK, node0.node().ClosePort(E)); + + WaitForIdle(); + + // Expect everything to have gone away. + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); +} + +TEST_F(PortsTest, RemotePeerStatus) { + TestNode node0(0); + AddNode(&node0); + + TestNode node1(1); + AddNode(&node1); + + // Create a local port pair. Neither port should appear to have a remote peer. + PortRef a, b; + PortStatus status{}; + node0.node().CreatePortPair(&a, &b); + ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); + EXPECT_FALSE(status.peer_remote); + ASSERT_EQ(OK, node0.node().GetStatus(b, &status)); + EXPECT_FALSE(status.peer_remote); + + // Create a port pair spanning the two nodes. Both spanning ports should + // immediately appear to have a remote peer. + PortRef x0, x1; + CreatePortPair(&node0, &x0, &node1, &x1); + + ASSERT_EQ(OK, node0.node().GetStatus(x0, &status)); + EXPECT_TRUE(status.peer_remote); + ASSERT_EQ(OK, node1.node().GetStatus(x1, &status)); + EXPECT_TRUE(status.peer_remote); + + PortRef x2, x3; + CreatePortPair(&node0, &x2, &node1, &x3); + + // Transfer |b| to |node1| and |x1| to |node0|. i.e., make the local peers + // remote and the remote peers local. + EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", b)); + EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", x1)); + WaitForIdle(); + + ScopedMessage message; + ASSERT_TRUE(node0.ReadMessage(x2, &message)); + ASSERT_EQ(1u, message->num_ports()); + ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &x1)); + + ASSERT_TRUE(node1.ReadMessage(x3, &message)); + ASSERT_EQ(1u, message->num_ports()); + ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &b)); + + // Now x0-x1 should be local to node0 and a-b should span the nodes. + ASSERT_EQ(OK, node0.node().GetStatus(x0, &status)); + EXPECT_FALSE(status.peer_remote); + ASSERT_EQ(OK, node0.node().GetStatus(x1, &status)); + EXPECT_FALSE(status.peer_remote); + ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); + EXPECT_TRUE(status.peer_remote); + ASSERT_EQ(OK, node1.node().GetStatus(b, &status)); + EXPECT_TRUE(status.peer_remote); + + // And swap them back one more time. + EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", x1)); + EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", b)); + WaitForIdle(); + + ASSERT_TRUE(node0.ReadMessage(x2, &message)); + ASSERT_EQ(1u, message->num_ports()); + ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &b)); + + ASSERT_TRUE(node1.ReadMessage(x3, &message)); + ASSERT_EQ(1u, message->num_ports()); + ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &x1)); + + ASSERT_EQ(OK, node0.node().GetStatus(x0, &status)); + EXPECT_TRUE(status.peer_remote); + ASSERT_EQ(OK, node1.node().GetStatus(x1, &status)); + EXPECT_TRUE(status.peer_remote); + ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); + EXPECT_FALSE(status.peer_remote); + ASSERT_EQ(OK, node0.node().GetStatus(b, &status)); + EXPECT_FALSE(status.peer_remote); + + EXPECT_EQ(OK, node0.node().ClosePort(x0)); + EXPECT_EQ(OK, node1.node().ClosePort(x1)); + EXPECT_EQ(OK, node0.node().ClosePort(x2)); + EXPECT_EQ(OK, node1.node().ClosePort(x3)); + EXPECT_EQ(OK, node0.node().ClosePort(a)); + EXPECT_EQ(OK, node0.node().ClosePort(b)); + + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); +} + +TEST_F(PortsTest, RemotePeerStatusAfterLocalPortMerge) { + TestNode node0(0); + AddNode(&node0); + + TestNode node1(1); + AddNode(&node1); + + // Set up a-b on node0 and c-d spanning node0-node1. + PortRef a, b, c, d; + node0.node().CreatePortPair(&a, &b); + CreatePortPair(&node0, &c, &node1, &d); + + PortStatus status{}; + ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); + EXPECT_FALSE(status.peer_remote); + ASSERT_EQ(OK, node0.node().GetStatus(b, &status)); + EXPECT_FALSE(status.peer_remote); + ASSERT_EQ(OK, node0.node().GetStatus(c, &status)); + EXPECT_TRUE(status.peer_remote); + ASSERT_EQ(OK, node1.node().GetStatus(d, &status)); + EXPECT_TRUE(status.peer_remote); + + EXPECT_EQ(OK, node0.node().MergeLocalPorts(b, c)); + WaitForIdle(); + + ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); + EXPECT_TRUE(status.peer_remote); + ASSERT_EQ(OK, node1.node().GetStatus(d, &status)); + EXPECT_TRUE(status.peer_remote); + + EXPECT_EQ(OK, node0.node().ClosePort(a)); + EXPECT_EQ(OK, node1.node().ClosePort(d)); + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); +} + +TEST_F(PortsTest, RemotePeerStatusAfterRemotePortMerge) { + TestNode node0(0); + AddNode(&node0); + + TestNode node1(1); + AddNode(&node1); + + // Set up a-b on node0 and c-d on node1. + PortRef a, b, c, d; + node0.node().CreatePortPair(&a, &b); + node1.node().CreatePortPair(&c, &d); + + PortStatus status{}; + ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); + EXPECT_FALSE(status.peer_remote); + ASSERT_EQ(OK, node0.node().GetStatus(b, &status)); + EXPECT_FALSE(status.peer_remote); + ASSERT_EQ(OK, node1.node().GetStatus(c, &status)); + EXPECT_FALSE(status.peer_remote); + ASSERT_EQ(OK, node1.node().GetStatus(d, &status)); + EXPECT_FALSE(status.peer_remote); + + EXPECT_EQ(OK, node0.node().MergePorts(b, node1.name(), c.name())); + WaitForIdle(); + + ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); + EXPECT_TRUE(status.peer_remote); + ASSERT_EQ(OK, node1.node().GetStatus(d, &status)); + EXPECT_TRUE(status.peer_remote); + + EXPECT_EQ(OK, node0.node().ClosePort(a)); + EXPECT_EQ(OK, node1.node().ClosePort(d)); + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); +} + +TEST_F(PortsTest, RetransmitUserMessageEvents) { + // Ensures that user message events can be retransmitted properly. + TestNode node0(0); + AddNode(&node0); + + PortRef a, b; + node0.node().CreatePortPair(&a, &b); + + // Ping. + const char* kMessage = "hey"; + ScopedMessage message; + EXPECT_EQ(OK, node0.SendStringMessage(a, kMessage)); + ASSERT_TRUE(node0.ReadMessage(b, &message)); + EXPECT_TRUE(MessageEquals(message, kMessage)); + + // Pong. + EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message))); + EXPECT_FALSE(message); + ASSERT_TRUE(node0.ReadMessage(a, &message)); + EXPECT_TRUE(MessageEquals(message, kMessage)); + + // Ping again. + EXPECT_EQ(OK, node0.node().SendUserMessage(a, std::move(message))); + EXPECT_FALSE(message); + ASSERT_TRUE(node0.ReadMessage(b, &message)); + EXPECT_TRUE(MessageEquals(message, kMessage)); + + // Pong again! + EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message))); + EXPECT_FALSE(message); + ASSERT_TRUE(node0.ReadMessage(a, &message)); + EXPECT_TRUE(MessageEquals(message, kMessage)); + + EXPECT_EQ(OK, node0.node().ClosePort(a)); + EXPECT_EQ(OK, node0.node().ClosePort(b)); +} + +TEST_F(PortsTest, SetAcknowledgeRequestInterval) { + TestNode node0(0); + AddNode(&node0); + + PortRef a0, a1; + EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); + EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0)); + + // Send a batch of messages. + EXPECT_EQ(OK, node0.SendMultipleMessages(a0, 15)); + EXPECT_EQ(15u, node0.GetUnacknowledgedMessageCount(a0)); + EXPECT_TRUE(node0.ReadMultipleMessages(a1, 5)); + WaitForIdle(); + EXPECT_EQ(15u, node0.GetUnacknowledgedMessageCount(a0)); + + // Set to acknowledge every read message, and validate that already-read + // messages are acknowledged. + EXPECT_EQ(OK, node0.node().SetAcknowledgeRequestInterval(a0, 1)); + WaitForIdle(); + EXPECT_EQ(10u, node0.GetUnacknowledgedMessageCount(a0)); + + // Read a third of the messages from the other end. + EXPECT_TRUE(node0.ReadMultipleMessages(a1, 5)); + WaitForIdle(); + + EXPECT_EQ(5u, node0.GetUnacknowledgedMessageCount(a0)); + + TestNode node1(1); + AddNode(&node1); + + // Transfer a1 across to node1. + PortRef x0, x1; + CreatePortPair(&node0, &x0, &node1, &x1); + EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1)); + WaitForIdle(); + + ScopedMessage message; + ASSERT_TRUE(node1.ReadMessage(x1, &message)); + ASSERT_EQ(1u, message->num_ports()); + ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &a1)); + + // Read the last third of the messages from the transferred node, and + // validate that the unacknowledge message count updates correctly. + EXPECT_TRUE(node1.ReadMultipleMessages(a1, 5)); + WaitForIdle(); + EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0)); + + // Turn the acknowledges down and validate that they don't go on indefinitely. + EXPECT_EQ(OK, node0.node().SetAcknowledgeRequestInterval(a0, 0)); + EXPECT_EQ(OK, node0.SendMultipleMessages(a0, 10)); + WaitForIdle(); + EXPECT_TRUE(node1.ReadMultipleMessages(a1, 10)); + WaitForIdle(); + EXPECT_NE(0u, node0.GetUnacknowledgedMessageCount(a0)); + + // Close the far port and validate that the closure updates the unacknowledged + // count. + EXPECT_EQ(OK, node1.node().ClosePort(a1)); + WaitForIdle(); + EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0)); + + EXPECT_EQ(OK, node0.node().ClosePort(a0)); + EXPECT_EQ(OK, node0.node().ClosePort(x0)); + EXPECT_EQ(OK, node1.node().ClosePort(x1)); +} + +} // namespace test +} // namespace ports +} // namespace core +} // namespace mojo |