summaryrefslogtreecommitdiffstats
path: root/ipc/chromium/gtest/ports_unittest.cc
diff options
context:
space:
mode:
Diffstat (limited to 'ipc/chromium/gtest/ports_unittest.cc')
-rw-r--r--ipc/chromium/gtest/ports_unittest.cc1751
1 files changed, 1751 insertions, 0 deletions
diff --git a/ipc/chromium/gtest/ports_unittest.cc b/ipc/chromium/gtest/ports_unittest.cc
new file mode 100644
index 0000000000..7469a1371f
--- /dev/null
+++ b/ipc/chromium/gtest/ports_unittest.cc
@@ -0,0 +1,1751 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <inttypes.h>
+#include <stdio.h>
+
+#include <map>
+#include <utility>
+
+#include "base/logging.h"
+#include "base/waitable_event.h"
+#include "base/thread.h"
+#include "base/string_util.h"
+#include "mojo/core/ports/event.h"
+#include "mojo/core/ports/node.h"
+#include "mojo/core/ports/node_delegate.h"
+#include "mojo/core/ports/user_message.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+#include "mozilla/Mutex.h"
+
+namespace mojo {
+namespace core {
+namespace ports {
+namespace test {
+
+namespace {
+
+// TODO(rockot): Remove this unnecessary alias.
+using ScopedMessage = mozilla::UniquePtr<UserMessageEvent>;
+
+class TestMessage : public UserMessage {
+ public:
+ static const TypeInfo kUserMessageTypeInfo;
+
+ explicit TestMessage(const std::string& payload)
+ : UserMessage(&kUserMessageTypeInfo), payload_(payload) {}
+ ~TestMessage() override = default;
+
+ const std::string& payload() const { return payload_; }
+
+ private:
+ std::string payload_;
+};
+
+const UserMessage::TypeInfo TestMessage::kUserMessageTypeInfo = {};
+
+ScopedMessage NewUserMessageEvent(const std::string& payload,
+ size_t num_ports) {
+ auto event = mozilla::MakeUnique<UserMessageEvent>(num_ports);
+ event->AttachMessage(mozilla::MakeUnique<TestMessage>(payload));
+ return event;
+}
+
+bool MessageEquals(const ScopedMessage& message, const std::string& s) {
+ return message->GetMessage<TestMessage>()->payload() == s;
+}
+
+class TestNode;
+
+class MessageRouter {
+ public:
+ virtual ~MessageRouter() = default;
+
+ virtual void ForwardEvent(TestNode* from_node, const NodeName& node_name,
+ ScopedEvent event) = 0;
+ virtual void BroadcastEvent(TestNode* from_node, ScopedEvent event) = 0;
+};
+
+class TestNode : public NodeDelegate {
+ public:
+ explicit TestNode(uint64_t id)
+ : node_name_(id, 1),
+ node_(node_name_, this),
+ node_thread_(StringPrintf("Node %" PRIu64 " thread", id).c_str()),
+ events_available_event_(/* manual_reset */ false,
+ /* initially_signaled */ false),
+ idle_event_(/* manual_reset */ true, /* initially_signaled */ true) {}
+
+ ~TestNode() override {
+ StopWhenIdle();
+ node_thread_.Stop();
+ }
+
+ const NodeName& name() const { return node_name_; }
+
+ // NOTE: Node is thread-safe.
+ Node& node() { return node_; }
+
+ base::WaitableEvent& idle_event() { return idle_event_; }
+
+ bool IsIdle() {
+ mozilla::MutexAutoLock lock(lock_);
+ return started_ && !dispatching_ &&
+ (incoming_events_.empty() || (block_on_event_ && blocked_));
+ }
+
+ void BlockOnEvent(Event::Type type) {
+ mozilla::MutexAutoLock lock(lock_);
+ blocked_event_type_ = type;
+ block_on_event_ = true;
+ }
+
+ void Unblock() {
+ mozilla::MutexAutoLock lock(lock_);
+ block_on_event_ = false;
+ events_available_event_.Signal();
+ }
+
+ void Start(MessageRouter* router) {
+ router_ = router;
+ node_thread_.Start();
+ node_thread_.message_loop()->PostTask(mozilla::NewNonOwningRunnableMethod(
+ "TestNode::ProcessEvents", this, &TestNode::ProcessEvents));
+ }
+
+ void StopWhenIdle() {
+ mozilla::MutexAutoLock lock(lock_);
+ should_quit_ = true;
+ events_available_event_.Signal();
+ }
+
+ void WakeUp() { events_available_event_.Signal(); }
+
+ int SendStringMessage(const PortRef& port, const std::string& s) {
+ return node_.SendUserMessage(port, NewUserMessageEvent(s, 0));
+ }
+
+ int SendMultipleMessages(const PortRef& port, size_t num_messages) {
+ for (size_t i = 0; i < num_messages; ++i) {
+ int result = SendStringMessage(port, "");
+ if (result != OK) {
+ return result;
+ }
+ }
+ return OK;
+ }
+
+ int SendStringMessageWithPort(const PortRef& port, const std::string& s,
+ const PortName& sent_port_name) {
+ auto event = NewUserMessageEvent(s, 1);
+ event->ports()[0] = sent_port_name;
+ return node_.SendUserMessage(port, std::move(event));
+ }
+
+ int SendStringMessageWithPort(const PortRef& port, const std::string& s,
+ const PortRef& sent_port) {
+ return SendStringMessageWithPort(port, s, sent_port.name());
+ }
+
+ void set_drop_messages(bool value) {
+ mozilla::MutexAutoLock lock(lock_);
+ drop_messages_ = value;
+ }
+
+ void set_save_messages(bool value) {
+ mozilla::MutexAutoLock lock(lock_);
+ save_messages_ = value;
+ }
+
+ bool ReadMessage(const PortRef& port, ScopedMessage* message) {
+ return node_.GetMessage(port, message, nullptr) == OK && *message;
+ }
+
+ bool ReadMultipleMessages(const PortRef& port, size_t num_messages) {
+ for (size_t i = 0; i < num_messages; ++i) {
+ ScopedMessage message;
+ if (!ReadMessage(port, &message)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ bool GetSavedMessage(ScopedMessage* message) {
+ mozilla::MutexAutoLock lock(lock_);
+ if (saved_messages_.empty()) {
+ message->reset();
+ return false;
+ }
+ std::swap(*message, saved_messages_.front());
+ saved_messages_.pop();
+ return true;
+ }
+
+ void EnqueueEvent(ScopedEvent event) {
+ idle_event_.Reset();
+
+ // NOTE: This may be called from ForwardMessage and thus must not reenter
+ // |node_|.
+ mozilla::MutexAutoLock lock(lock_);
+ incoming_events_.emplace(std::move(event));
+ events_available_event_.Signal();
+ }
+
+ void ForwardEvent(const NodeName& node_name, ScopedEvent event) override {
+ {
+ mozilla::MutexAutoLock lock(lock_);
+ if (drop_messages_) {
+ DVLOG(1) << "Dropping ForwardMessage from node " << node_name_ << " to "
+ << node_name;
+
+ mozilla::MutexAutoUnlock unlock(lock_);
+ ClosePortsInEvent(event.get());
+ return;
+ }
+ }
+
+ DCHECK(router_);
+ DVLOG(1) << "ForwardEvent from node " << node_name_ << " to " << node_name;
+ router_->ForwardEvent(this, node_name, std::move(event));
+ }
+
+ void BroadcastEvent(ScopedEvent event) override {
+ router_->BroadcastEvent(this, std::move(event));
+ }
+
+ void PortStatusChanged(const PortRef& port) override {
+ // The port may be closed, in which case we ignore the notification.
+ mozilla::MutexAutoLock lock(lock_);
+ if (!save_messages_) {
+ return;
+ }
+
+ for (;;) {
+ ScopedMessage message;
+ {
+ mozilla::MutexAutoUnlock unlock(lock_);
+ if (!ReadMessage(port, &message)) {
+ break;
+ }
+ }
+
+ saved_messages_.emplace(std::move(message));
+ }
+ }
+
+ void ClosePortsInEvent(Event* event) {
+ if (event->type() != Event::Type::kUserMessage) {
+ return;
+ }
+
+ UserMessageEvent* message_event = static_cast<UserMessageEvent*>(event);
+ for (size_t i = 0; i < message_event->num_ports(); ++i) {
+ PortRef port;
+ ASSERT_EQ(OK, node_.GetPort(message_event->ports()[i], &port));
+ EXPECT_EQ(OK, node_.ClosePort(port));
+ }
+ }
+
+ uint64_t GetUnacknowledgedMessageCount(const PortRef& port_ref) {
+ PortStatus status{};
+ if (node_.GetStatus(port_ref, &status) != OK) {
+ return 0;
+ }
+
+ return status.unacknowledged_message_count;
+ }
+
+ private:
+ void ProcessEvents() {
+ for (;;) {
+ events_available_event_.Wait();
+ mozilla::MutexAutoLock lock(lock_);
+
+ if (should_quit_) {
+ return;
+ }
+
+ dispatching_ = true;
+ while (!incoming_events_.empty()) {
+ if (block_on_event_ &&
+ incoming_events_.front()->type() == blocked_event_type_) {
+ blocked_ = true;
+ // Go idle if we hit a blocked event type.
+ break;
+ }
+ blocked_ = false;
+
+ ScopedEvent event = std::move(incoming_events_.front());
+ incoming_events_.pop();
+
+ // NOTE: AcceptMessage() can re-enter this object to call any of the
+ // NodeDelegate interface methods.
+ mozilla::MutexAutoUnlock unlock(lock_);
+ node_.AcceptEvent(std::move(event));
+ }
+
+ dispatching_ = false;
+ started_ = true;
+ idle_event_.Signal();
+ };
+ }
+
+ const NodeName node_name_;
+ Node node_;
+ MessageRouter* router_ = nullptr;
+
+ base::Thread node_thread_;
+ base::WaitableEvent events_available_event_;
+ base::WaitableEvent idle_event_;
+
+ // Guards fields below.
+ mozilla::Mutex lock_ MOZ_UNANNOTATED{"TestNode"};
+ bool started_ = false;
+ bool dispatching_ = false;
+ bool should_quit_ = false;
+ bool drop_messages_ = false;
+ bool save_messages_ = false;
+ bool blocked_ = false;
+ bool block_on_event_ = false;
+ Event::Type blocked_event_type_{};
+ std::queue<ScopedEvent> incoming_events_;
+ std::queue<ScopedMessage> saved_messages_;
+};
+
+class PortsTest : public testing::Test, public MessageRouter {
+ public:
+ void AddNode(TestNode* node) {
+ {
+ mozilla::MutexAutoLock lock(lock_);
+ nodes_[node->name()] = node;
+ }
+ node->Start(this);
+ }
+
+ void RemoveNode(TestNode* node) {
+ {
+ mozilla::MutexAutoLock lock(lock_);
+ nodes_.erase(node->name());
+ }
+
+ for (const auto& entry : nodes_) {
+ entry.second->node().LostConnectionToNode(node->name());
+ }
+ }
+
+ // Waits until all known Nodes are idle. Message forwarding and processing
+ // is handled in such a way that idleness is a stable state: once all nodes in
+ // the system are idle, they will remain idle until the test explicitly
+ // initiates some further event (e.g. sending a message, closing a port, or
+ // removing a Node).
+ void WaitForIdle() {
+ for (;;) {
+ mozilla::MutexAutoLock global_lock(global_lock_);
+ bool all_nodes_idle = true;
+ for (const auto& entry : nodes_) {
+ if (!entry.second->IsIdle()) {
+ all_nodes_idle = false;
+ }
+ entry.second->WakeUp();
+ }
+ if (all_nodes_idle) {
+ return;
+ }
+
+ // Wait for any Node to signal that it's idle.
+ mozilla::MutexAutoUnlock global_unlock(global_lock_);
+ std::vector<base::WaitableEvent*> events;
+ for (const auto& entry : nodes_) {
+ events.push_back(&entry.second->idle_event());
+ }
+ base::WaitableEvent::WaitMany(events.data(), events.size());
+ }
+ }
+
+ void CreatePortPair(TestNode* node0, PortRef* port0, TestNode* node1,
+ PortRef* port1) {
+ if (node0 == node1) {
+ EXPECT_EQ(OK, node0->node().CreatePortPair(port0, port1));
+ } else {
+ EXPECT_EQ(OK, node0->node().CreateUninitializedPort(port0));
+ EXPECT_EQ(OK, node1->node().CreateUninitializedPort(port1));
+ EXPECT_EQ(OK, node0->node().InitializePort(*port0, node1->name(),
+ port1->name()));
+ EXPECT_EQ(OK, node1->node().InitializePort(*port1, node0->name(),
+ port0->name()));
+ }
+ }
+
+ private:
+ // MessageRouter:
+ void ForwardEvent(TestNode* from_node, const NodeName& node_name,
+ ScopedEvent event) override {
+ mozilla::MutexAutoLock global_lock(global_lock_);
+ mozilla::MutexAutoLock lock(lock_);
+ // Drop messages from nodes that have been removed.
+ if (nodes_.find(from_node->name()) == nodes_.end()) {
+ from_node->ClosePortsInEvent(event.get());
+ return;
+ }
+
+ auto it = nodes_.find(node_name);
+ if (it == nodes_.end()) {
+ DVLOG(1) << "Node not found: " << node_name;
+ return;
+ }
+
+ // Serialize and de-serialize all forwarded events.
+ size_t buf_size = event->GetSerializedSize();
+ mozilla::UniquePtr<char[]> buf(new char[buf_size]);
+ event->Serialize(buf.get());
+ ScopedEvent copy = Event::Deserialize(buf.get(), buf_size);
+ // This should always succeed unless serialization or deserialization
+ // is broken. In that case, the loss of events should cause a test failure.
+ ASSERT_TRUE(copy);
+
+ // Also copy the payload for user messages.
+ if (event->type() == Event::Type::kUserMessage) {
+ UserMessageEvent* message_event =
+ static_cast<UserMessageEvent*>(event.get());
+ UserMessageEvent* message_copy =
+ static_cast<UserMessageEvent*>(copy.get());
+
+ message_copy->AttachMessage(mozilla::MakeUnique<TestMessage>(
+ message_event->GetMessage<TestMessage>()->payload()));
+ }
+
+ it->second->EnqueueEvent(std::move(event));
+ }
+
+ void BroadcastEvent(TestNode* from_node, ScopedEvent event) override {
+ mozilla::MutexAutoLock global_lock(global_lock_);
+ mozilla::MutexAutoLock lock(lock_);
+
+ // Drop messages from nodes that have been removed.
+ if (nodes_.find(from_node->name()) == nodes_.end()) {
+ return;
+ }
+
+ for (const auto& entry : nodes_) {
+ TestNode* node = entry.second;
+ // Broadcast doesn't deliver to the local node.
+ if (node == from_node) {
+ continue;
+ }
+ node->EnqueueEvent(event->Clone());
+ }
+ }
+
+ // Acquired before any operation which makes a Node busy, and before testing
+ // if all nodes are idle.
+ mozilla::Mutex global_lock_ MOZ_UNANNOTATED{"PortsTest Global Lock"};
+
+ mozilla::Mutex lock_ MOZ_UNANNOTATED{"PortsTest Lock"};
+ std::map<NodeName, TestNode*> nodes_;
+};
+
+} // namespace
+
+TEST_F(PortsTest, Basic1) {
+ TestNode node0(0);
+ AddNode(&node0);
+
+ TestNode node1(1);
+ AddNode(&node1);
+
+ PortRef x0, x1;
+ CreatePortPair(&node0, &x0, &node1, &x1);
+
+ PortRef a0, a1;
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
+ EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1));
+ EXPECT_EQ(OK, node0.node().ClosePort(a0));
+
+ EXPECT_EQ(OK, node0.node().ClosePort(x0));
+ EXPECT_EQ(OK, node1.node().ClosePort(x1));
+
+ WaitForIdle();
+
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
+}
+
+TEST_F(PortsTest, Basic2) {
+ TestNode node0(0);
+ AddNode(&node0);
+
+ TestNode node1(1);
+ AddNode(&node1);
+
+ PortRef x0, x1;
+ CreatePortPair(&node0, &x0, &node1, &x1);
+
+ PortRef b0, b1;
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1));
+ EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", b1));
+ EXPECT_EQ(OK, node0.SendStringMessage(b0, "hello again"));
+
+ EXPECT_EQ(OK, node0.node().ClosePort(b0));
+
+ EXPECT_EQ(OK, node0.node().ClosePort(x0));
+ EXPECT_EQ(OK, node1.node().ClosePort(x1));
+
+ WaitForIdle();
+
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
+}
+
+TEST_F(PortsTest, Basic3) {
+ TestNode node0(0);
+ AddNode(&node0);
+
+ TestNode node1(1);
+ AddNode(&node1);
+
+ PortRef x0, x1;
+ CreatePortPair(&node0, &x0, &node1, &x1);
+
+ PortRef a0, a1;
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
+
+ EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1));
+ EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello again"));
+
+ EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a0));
+
+ PortRef b0, b1;
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1));
+ EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "bar", b1));
+ EXPECT_EQ(OK, node0.SendStringMessage(b0, "baz"));
+
+ EXPECT_EQ(OK, node0.node().ClosePort(b0));
+
+ EXPECT_EQ(OK, node0.node().ClosePort(x0));
+ EXPECT_EQ(OK, node1.node().ClosePort(x1));
+
+ WaitForIdle();
+
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
+}
+
+TEST_F(PortsTest, LostConnectionToNode1) {
+ TestNode node0(0);
+ AddNode(&node0);
+
+ TestNode node1(1);
+ AddNode(&node1);
+ node1.set_drop_messages(true);
+
+ PortRef x0, x1;
+ CreatePortPair(&node0, &x0, &node1, &x1);
+
+ // Transfer a port to node1 and simulate a lost connection to node1.
+
+ PortRef a0, a1;
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
+ EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1));
+
+ WaitForIdle();
+
+ RemoveNode(&node1);
+
+ WaitForIdle();
+
+ EXPECT_EQ(OK, node0.node().ClosePort(a0));
+ EXPECT_EQ(OK, node0.node().ClosePort(x0));
+ EXPECT_EQ(OK, node1.node().ClosePort(x1));
+
+ WaitForIdle();
+
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
+}
+
+TEST_F(PortsTest, LostConnectionToNode2) {
+ TestNode node0(0);
+ AddNode(&node0);
+
+ TestNode node1(1);
+ AddNode(&node1);
+
+ PortRef x0, x1;
+ CreatePortPair(&node0, &x0, &node1, &x1);
+
+ PortRef a0, a1;
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
+ EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "take a1", a1));
+
+ WaitForIdle();
+
+ node1.set_drop_messages(true);
+
+ RemoveNode(&node1);
+
+ WaitForIdle();
+
+ // a0 should have eventually detected peer closure after node loss.
+ ScopedMessage message;
+ EXPECT_EQ(ERROR_PORT_PEER_CLOSED,
+ node0.node().GetMessage(a0, &message, nullptr));
+ EXPECT_FALSE(message);
+
+ EXPECT_EQ(OK, node0.node().ClosePort(a0));
+
+ EXPECT_EQ(OK, node0.node().ClosePort(x0));
+
+ EXPECT_EQ(OK, node1.node().GetMessage(x1, &message, nullptr));
+ EXPECT_TRUE(message);
+ node1.ClosePortsInEvent(message.get());
+
+ EXPECT_EQ(OK, node1.node().ClosePort(x1));
+
+ WaitForIdle();
+
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
+}
+
+TEST_F(PortsTest, LostConnectionToNodeWithSecondaryProxy) {
+ // Tests that a proxy gets cleaned up when its indirect peer lives on a lost
+ // node.
+
+ TestNode node0(0);
+ AddNode(&node0);
+
+ TestNode node1(1);
+ AddNode(&node1);
+
+ TestNode node2(2);
+ AddNode(&node2);
+
+ // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2.
+ PortRef A, B, C, D;
+ CreatePortPair(&node0, &A, &node1, &B);
+ CreatePortPair(&node1, &C, &node2, &D);
+
+ // Create E-F and send F over A to node 1.
+ PortRef E, F;
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));
+ EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", F));
+
+ WaitForIdle();
+
+ ScopedMessage message;
+ ASSERT_TRUE(node1.ReadMessage(B, &message));
+ ASSERT_EQ(1u, message->num_ports());
+
+ EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &F));
+
+ // Send F over C to node 2 and then simulate node 2 loss from node 1. Node 1
+ // will trivially become aware of the loss, and this test verifies that the
+ // port A on node 0 will eventually also become aware of it.
+
+ // Make sure node2 stops processing events when it encounters an ObserveProxy.
+ node2.BlockOnEvent(Event::Type::kObserveProxy);
+
+ EXPECT_EQ(OK, node1.SendStringMessageWithPort(C, ".", F));
+ WaitForIdle();
+
+ // Simulate node 1 and 2 disconnecting.
+ EXPECT_EQ(OK, node1.node().LostConnectionToNode(node2.name()));
+
+ // Let node2 continue processing events and wait for everyone to go idle.
+ node2.Unblock();
+ WaitForIdle();
+
+ // Port F should be gone.
+ EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(F.name(), &F));
+
+ // Port E should have detected peer closure despite the fact that there is
+ // no longer a continuous route from F to E over which the event could travel.
+ PortStatus status{};
+ EXPECT_EQ(OK, node0.node().GetStatus(E, &status));
+ EXPECT_TRUE(status.peer_closed);
+
+ EXPECT_EQ(OK, node0.node().ClosePort(A));
+ EXPECT_EQ(OK, node1.node().ClosePort(B));
+ EXPECT_EQ(OK, node1.node().ClosePort(C));
+ EXPECT_EQ(OK, node0.node().ClosePort(E));
+
+ WaitForIdle();
+
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
+}
+
+TEST_F(PortsTest, LostConnectionToNodeWithLocalProxy) {
+ // Tests that a proxy gets cleaned up when its direct peer lives on a lost
+ // node and it's predecessor lives on the same node.
+
+ TestNode node0(0);
+ AddNode(&node0);
+
+ TestNode node1(1);
+ AddNode(&node1);
+
+ PortRef A, B;
+ CreatePortPair(&node0, &A, &node1, &B);
+
+ PortRef C, D;
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));
+
+ // Send D but block node0 on an ObserveProxy event.
+ node0.BlockOnEvent(Event::Type::kObserveProxy);
+ EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", D));
+
+ // node0 won't collapse the proxy but node1 will receive the message before
+ // going idle.
+ WaitForIdle();
+
+ ScopedMessage message;
+ ASSERT_TRUE(node1.ReadMessage(B, &message));
+ ASSERT_EQ(1u, message->num_ports());
+ PortRef E;
+ EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &E));
+
+ RemoveNode(&node1);
+
+ node0.Unblock();
+ WaitForIdle();
+
+ // Port C should have detected peer closure.
+ PortStatus status{};
+ EXPECT_EQ(OK, node0.node().GetStatus(C, &status));
+ EXPECT_TRUE(status.peer_closed);
+
+ EXPECT_EQ(OK, node0.node().ClosePort(A));
+ EXPECT_EQ(OK, node1.node().ClosePort(B));
+ EXPECT_EQ(OK, node0.node().ClosePort(C));
+ EXPECT_EQ(OK, node1.node().ClosePort(E));
+
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
+}
+
+TEST_F(PortsTest, GetMessage1) {
+ TestNode node(0);
+ AddNode(&node);
+
+ PortRef a0, a1;
+ EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));
+
+ ScopedMessage message;
+ EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));
+ EXPECT_FALSE(message);
+
+ EXPECT_EQ(OK, node.node().ClosePort(a1));
+
+ WaitForIdle();
+
+ EXPECT_EQ(ERROR_PORT_PEER_CLOSED,
+ node.node().GetMessage(a0, &message, nullptr));
+ EXPECT_FALSE(message);
+
+ EXPECT_EQ(OK, node.node().ClosePort(a0));
+
+ WaitForIdle();
+
+ EXPECT_TRUE(node.node().CanShutdownCleanly());
+}
+
+TEST_F(PortsTest, GetMessage2) {
+ TestNode node(0);
+ AddNode(&node);
+
+ PortRef a0, a1;
+ EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));
+
+ EXPECT_EQ(OK, node.SendStringMessage(a1, "1"));
+
+ ScopedMessage message;
+ EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));
+
+ ASSERT_TRUE(message);
+ EXPECT_TRUE(MessageEquals(message, "1"));
+
+ EXPECT_EQ(OK, node.node().ClosePort(a0));
+ EXPECT_EQ(OK, node.node().ClosePort(a1));
+
+ EXPECT_TRUE(node.node().CanShutdownCleanly());
+}
+
+TEST_F(PortsTest, GetMessage3) {
+ TestNode node(0);
+ AddNode(&node);
+
+ PortRef a0, a1;
+ EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));
+
+ const char* kStrings[] = {"1", "2", "3"};
+
+ for (auto& kString : kStrings) {
+ EXPECT_EQ(OK, node.SendStringMessage(a1, kString));
+ }
+
+ ScopedMessage message;
+ for (auto& kString : kStrings) {
+ EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));
+ ASSERT_TRUE(message);
+ EXPECT_TRUE(MessageEquals(message, kString));
+ }
+
+ EXPECT_EQ(OK, node.node().ClosePort(a0));
+ EXPECT_EQ(OK, node.node().ClosePort(a1));
+
+ EXPECT_TRUE(node.node().CanShutdownCleanly());
+}
+
+TEST_F(PortsTest, Delegation1) {
+ TestNode node0(0);
+ AddNode(&node0);
+
+ TestNode node1(1);
+ AddNode(&node1);
+
+ PortRef x0, x1;
+ CreatePortPair(&node0, &x0, &node1, &x1);
+
+ // In this test, we send a message to a port that has been moved.
+
+ PortRef a0, a1;
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
+ EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "a1", a1));
+ WaitForIdle();
+
+ ScopedMessage message;
+ ASSERT_TRUE(node1.ReadMessage(x1, &message));
+ ASSERT_EQ(1u, message->num_ports());
+ EXPECT_TRUE(MessageEquals(message, "a1"));
+
+ // This is "a1" from the point of view of node1.
+ PortName a2_name = message->ports()[0];
+ EXPECT_EQ(OK, node1.SendStringMessageWithPort(x1, "a2", a2_name));
+ EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello"));
+
+ WaitForIdle();
+
+ ASSERT_TRUE(node0.ReadMessage(x0, &message));
+ ASSERT_EQ(1u, message->num_ports());
+ EXPECT_TRUE(MessageEquals(message, "a2"));
+
+ // This is "a2" from the point of view of node1.
+ PortName a3_name = message->ports()[0];
+
+ PortRef a3;
+ EXPECT_EQ(OK, node0.node().GetPort(a3_name, &a3));
+
+ ASSERT_TRUE(node0.ReadMessage(a3, &message));
+ EXPECT_EQ(0u, message->num_ports());
+ EXPECT_TRUE(MessageEquals(message, "hello"));
+
+ EXPECT_EQ(OK, node0.node().ClosePort(a0));
+ EXPECT_EQ(OK, node0.node().ClosePort(a3));
+
+ EXPECT_EQ(OK, node0.node().ClosePort(x0));
+ EXPECT_EQ(OK, node1.node().ClosePort(x1));
+
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
+}
+
+TEST_F(PortsTest, Delegation2) {
+ TestNode node0(0);
+ AddNode(&node0);
+
+ TestNode node1(1);
+ AddNode(&node1);
+
+ for (int i = 0; i < 100; ++i) {
+ // Setup pipe a<->b between node0 and node1.
+ PortRef A, B;
+ CreatePortPair(&node0, &A, &node1, &B);
+
+ PortRef C, D;
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));
+
+ PortRef E, F;
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));
+
+ node1.set_save_messages(true);
+
+ // Pass D over A to B.
+ EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, "1", D));
+
+ // Pass F over C to D.
+ EXPECT_EQ(OK, node0.SendStringMessageWithPort(C, "1", F));
+
+ // This message should find its way to node1.
+ EXPECT_EQ(OK, node0.SendStringMessage(E, "hello"));
+
+ WaitForIdle();
+
+ EXPECT_EQ(OK, node0.node().ClosePort(C));
+ EXPECT_EQ(OK, node0.node().ClosePort(E));
+
+ EXPECT_EQ(OK, node0.node().ClosePort(A));
+ EXPECT_EQ(OK, node1.node().ClosePort(B));
+
+ bool got_hello = false;
+ ScopedMessage message;
+ while (node1.GetSavedMessage(&message)) {
+ node1.ClosePortsInEvent(message.get());
+ if (MessageEquals(message, "hello")) {
+ got_hello = true;
+ break;
+ }
+ }
+
+ EXPECT_TRUE(got_hello);
+
+ WaitForIdle(); // Because closing ports may have generated tasks.
+ }
+
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
+}
+
+TEST_F(PortsTest, SendUninitialized) {
+ TestNode node(0);
+ AddNode(&node);
+
+ PortRef x0;
+ EXPECT_EQ(OK, node.node().CreateUninitializedPort(&x0));
+ EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED, node.SendStringMessage(x0, "oops"));
+ EXPECT_EQ(OK, node.node().ClosePort(x0));
+ EXPECT_TRUE(node.node().CanShutdownCleanly());
+}
+
+TEST_F(PortsTest, SendFailure) {
+ TestNode node(0);
+ AddNode(&node);
+
+ node.set_save_messages(true);
+
+ PortRef A, B;
+ EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
+
+ // Try to send A over itself.
+
+ EXPECT_EQ(ERROR_PORT_CANNOT_SEND_SELF,
+ node.SendStringMessageWithPort(A, "oops", A));
+
+ // Try to send B over A.
+
+ EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER,
+ node.SendStringMessageWithPort(A, "nope", B));
+
+ // B should be closed immediately.
+ EXPECT_EQ(ERROR_PORT_UNKNOWN, node.node().GetPort(B.name(), &B));
+
+ WaitForIdle();
+
+ // There should have been no messages accepted.
+ ScopedMessage message;
+ EXPECT_FALSE(node.GetSavedMessage(&message));
+
+ EXPECT_EQ(OK, node.node().ClosePort(A));
+
+ WaitForIdle();
+
+ EXPECT_TRUE(node.node().CanShutdownCleanly());
+}
+
+TEST_F(PortsTest, DontLeakUnreceivedPorts) {
+ TestNode node(0);
+ AddNode(&node);
+
+ PortRef A, B, C, D;
+ EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
+ EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D));
+
+ EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D));
+
+ EXPECT_EQ(OK, node.node().ClosePort(C));
+ EXPECT_EQ(OK, node.node().ClosePort(A));
+ EXPECT_EQ(OK, node.node().ClosePort(B));
+
+ WaitForIdle();
+
+ EXPECT_TRUE(node.node().CanShutdownCleanly());
+}
+
+TEST_F(PortsTest, AllowShutdownWithLocalPortsOpen) {
+ TestNode node(0);
+ AddNode(&node);
+
+ PortRef A, B, C, D;
+ EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
+ EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D));
+
+ EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D));
+
+ ScopedMessage message;
+ EXPECT_TRUE(node.ReadMessage(B, &message));
+ ASSERT_EQ(1u, message->num_ports());
+ EXPECT_TRUE(MessageEquals(message, "foo"));
+ PortRef E;
+ ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
+
+ EXPECT_TRUE(
+ node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
+
+ WaitForIdle();
+
+ EXPECT_TRUE(
+ node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
+ EXPECT_FALSE(node.node().CanShutdownCleanly());
+
+ EXPECT_EQ(OK, node.node().ClosePort(A));
+ EXPECT_EQ(OK, node.node().ClosePort(B));
+ EXPECT_EQ(OK, node.node().ClosePort(C));
+ EXPECT_EQ(OK, node.node().ClosePort(E));
+
+ WaitForIdle();
+
+ EXPECT_TRUE(node.node().CanShutdownCleanly());
+}
+
+TEST_F(PortsTest, ProxyCollapse1) {
+ TestNode node(0);
+ AddNode(&node);
+
+ PortRef A, B;
+ EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
+
+ PortRef X, Y;
+ EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
+
+ ScopedMessage message;
+
+ // Send B and receive it as C.
+ EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
+ ASSERT_TRUE(node.ReadMessage(Y, &message));
+ ASSERT_EQ(1u, message->num_ports());
+ PortRef C;
+ ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
+
+ // Send C and receive it as D.
+ EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C));
+ ASSERT_TRUE(node.ReadMessage(Y, &message));
+ ASSERT_EQ(1u, message->num_ports());
+ PortRef D;
+ ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D));
+
+ // Send D and receive it as E.
+ EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", D));
+ ASSERT_TRUE(node.ReadMessage(Y, &message));
+ ASSERT_EQ(1u, message->num_ports());
+ PortRef E;
+ ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
+
+ EXPECT_EQ(OK, node.node().ClosePort(X));
+ EXPECT_EQ(OK, node.node().ClosePort(Y));
+
+ EXPECT_EQ(OK, node.node().ClosePort(A));
+ EXPECT_EQ(OK, node.node().ClosePort(E));
+
+ // The node should not idle until all proxies are collapsed.
+ WaitForIdle();
+
+ EXPECT_TRUE(node.node().CanShutdownCleanly());
+}
+
+TEST_F(PortsTest, ProxyCollapse2) {
+ TestNode node(0);
+ AddNode(&node);
+
+ PortRef A, B;
+ EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
+
+ PortRef X, Y;
+ EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
+
+ ScopedMessage message;
+
+ // Send B and A to create proxies in each direction.
+ EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
+ EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));
+
+ EXPECT_EQ(OK, node.node().ClosePort(X));
+ EXPECT_EQ(OK, node.node().ClosePort(Y));
+
+ // At this point we have a scenario with:
+ //
+ // D -> [B] -> C -> [A]
+ //
+ // Ensure that the proxies can collapse. The sent ports will be closed
+ // eventually as a result of Y's closure.
+
+ WaitForIdle();
+
+ EXPECT_TRUE(node.node().CanShutdownCleanly());
+}
+
+TEST_F(PortsTest, SendWithClosedPeer) {
+ // This tests that if a port is sent when its peer is already known to be
+ // closed, the newly created port will be aware of that peer closure, and the
+ // proxy will eventually collapse.
+
+ TestNode node(0);
+ AddNode(&node);
+
+ // Send a message from A to B, then close A.
+ PortRef A, B;
+ EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
+ EXPECT_EQ(OK, node.SendStringMessage(A, "hey"));
+ EXPECT_EQ(OK, node.node().ClosePort(A));
+
+ // Now send B over X-Y as new port C.
+ PortRef X, Y;
+ EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
+ EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
+ ScopedMessage message;
+ ASSERT_TRUE(node.ReadMessage(Y, &message));
+ ASSERT_EQ(1u, message->num_ports());
+ PortRef C;
+ ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
+
+ EXPECT_EQ(OK, node.node().ClosePort(X));
+ EXPECT_EQ(OK, node.node().ClosePort(Y));
+
+ WaitForIdle();
+
+ // C should have received the message originally sent to B, and it should also
+ // be aware of A's closure.
+
+ ASSERT_TRUE(node.ReadMessage(C, &message));
+ EXPECT_TRUE(MessageEquals(message, "hey"));
+
+ PortStatus status{};
+ EXPECT_EQ(OK, node.node().GetStatus(C, &status));
+ EXPECT_FALSE(status.receiving_messages);
+ EXPECT_FALSE(status.has_messages);
+ EXPECT_TRUE(status.peer_closed);
+
+ node.node().ClosePort(C);
+
+ WaitForIdle();
+
+ EXPECT_TRUE(node.node().CanShutdownCleanly());
+}
+
+TEST_F(PortsTest, SendWithClosedPeerSent) {
+ // This tests that if a port is closed while some number of proxies are still
+ // routing messages (directly or indirectly) to it, that the peer port is
+ // eventually notified of the closure, and the dead-end proxies will
+ // eventually be removed.
+
+ TestNode node(0);
+ AddNode(&node);
+
+ PortRef X, Y;
+ EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
+
+ PortRef A, B;
+ EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
+
+ ScopedMessage message;
+
+ // Send A as new port C.
+ EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));
+
+ ASSERT_TRUE(node.ReadMessage(Y, &message));
+ ASSERT_EQ(1u, message->num_ports());
+ PortRef C;
+ ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
+
+ // Send C as new port D.
+ EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C));
+
+ ASSERT_TRUE(node.ReadMessage(Y, &message));
+ ASSERT_EQ(1u, message->num_ports());
+ PortRef D;
+ ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D));
+
+ // Send a message to B through D, then close D.
+ EXPECT_EQ(OK, node.SendStringMessage(D, "hey"));
+ EXPECT_EQ(OK, node.node().ClosePort(D));
+
+ // Now send B as new port E.
+
+ EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
+ EXPECT_EQ(OK, node.node().ClosePort(X));
+
+ ASSERT_TRUE(node.ReadMessage(Y, &message));
+ ASSERT_EQ(1u, message->num_ports());
+ PortRef E;
+ ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
+
+ EXPECT_EQ(OK, node.node().ClosePort(Y));
+
+ WaitForIdle();
+
+ // E should receive the message originally sent to B, and it should also be
+ // aware of D's closure.
+
+ ASSERT_TRUE(node.ReadMessage(E, &message));
+ EXPECT_TRUE(MessageEquals(message, "hey"));
+
+ PortStatus status{};
+ EXPECT_EQ(OK, node.node().GetStatus(E, &status));
+ EXPECT_FALSE(status.receiving_messages);
+ EXPECT_FALSE(status.has_messages);
+ EXPECT_TRUE(status.peer_closed);
+
+ EXPECT_EQ(OK, node.node().ClosePort(E));
+
+ WaitForIdle();
+
+ EXPECT_TRUE(node.node().CanShutdownCleanly());
+}
+
+TEST_F(PortsTest, MergePorts) {
+ TestNode node0(0);
+ AddNode(&node0);
+
+ TestNode node1(1);
+ AddNode(&node1);
+
+ // Setup two independent port pairs, A-B on node0 and C-D on node1.
+ PortRef A, B, C, D;
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
+ EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
+
+ // Write a message on A.
+ EXPECT_EQ(OK, node0.SendStringMessage(A, "hey"));
+
+ // Initiate a merge between B and C.
+ EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
+
+ WaitForIdle();
+
+ // Expect all proxies to be gone once idle.
+ EXPECT_TRUE(
+ node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
+ EXPECT_TRUE(
+ node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
+
+ // Expect D to have received the message sent on A.
+ ScopedMessage message;
+ ASSERT_TRUE(node1.ReadMessage(D, &message));
+ EXPECT_TRUE(MessageEquals(message, "hey"));
+
+ EXPECT_EQ(OK, node0.node().ClosePort(A));
+ EXPECT_EQ(OK, node1.node().ClosePort(D));
+
+ // No more ports should be open.
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
+}
+
+TEST_F(PortsTest, MergePortWithClosedPeer1) {
+ // This tests that the right thing happens when initiating a merge on a port
+ // whose peer has already been closed.
+
+ TestNode node0(0);
+ AddNode(&node0);
+
+ TestNode node1(1);
+ AddNode(&node1);
+
+ // Setup two independent port pairs, A-B on node0 and C-D on node1.
+ PortRef A, B, C, D;
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
+ EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
+
+ // Write a message on A.
+ EXPECT_EQ(OK, node0.SendStringMessage(A, "hey"));
+
+ // Close A.
+ EXPECT_EQ(OK, node0.node().ClosePort(A));
+
+ // Initiate a merge between B and C.
+ EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
+
+ WaitForIdle();
+
+ // Expect all proxies to be gone once idle. node0 should have no ports since
+ // A was explicitly closed.
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(
+ node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
+
+ // Expect D to have received the message sent on A.
+ ScopedMessage message;
+ ASSERT_TRUE(node1.ReadMessage(D, &message));
+ EXPECT_TRUE(MessageEquals(message, "hey"));
+
+ EXPECT_EQ(OK, node1.node().ClosePort(D));
+
+ // No more ports should be open.
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
+}
+
+TEST_F(PortsTest, MergePortWithClosedPeer2) {
+ // This tests that the right thing happens when merging into a port whose peer
+ // has already been closed.
+
+ TestNode node0(0);
+ AddNode(&node0);
+
+ TestNode node1(1);
+ AddNode(&node1);
+
+ // Setup two independent port pairs, A-B on node0 and C-D on node1.
+ PortRef A, B, C, D;
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
+ EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
+
+ // Write a message on D and close it.
+ EXPECT_EQ(OK, node0.SendStringMessage(D, "hey"));
+ EXPECT_EQ(OK, node1.node().ClosePort(D));
+
+ // Initiate a merge between B and C.
+ EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
+
+ WaitForIdle();
+
+ // Expect all proxies to be gone once idle. node1 should have no ports since
+ // D was explicitly closed.
+ EXPECT_TRUE(
+ node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
+
+ // Expect A to have received the message sent on D.
+ ScopedMessage message;
+ ASSERT_TRUE(node0.ReadMessage(A, &message));
+ EXPECT_TRUE(MessageEquals(message, "hey"));
+
+ EXPECT_EQ(OK, node0.node().ClosePort(A));
+
+ // No more ports should be open.
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
+}
+
+TEST_F(PortsTest, MergePortsWithClosedPeers) {
+ // This tests that no residual ports are left behind if two ports are merged
+ // when both of their peers have been closed.
+
+ TestNode node0(0);
+ AddNode(&node0);
+
+ TestNode node1(1);
+ AddNode(&node1);
+
+ // Setup two independent port pairs, A-B on node0 and C-D on node1.
+ PortRef A, B, C, D;
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
+ EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
+
+ // Close A and D.
+ EXPECT_EQ(OK, node0.node().ClosePort(A));
+ EXPECT_EQ(OK, node1.node().ClosePort(D));
+
+ WaitForIdle();
+
+ // Initiate a merge between B and C.
+ EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
+
+ WaitForIdle();
+
+ // Expect everything to have gone away.
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
+}
+
+TEST_F(PortsTest, MergePortsWithMovedPeers) {
+ // This tests that ports can be merged successfully even if their peers are
+ // moved around.
+
+ TestNode node0(0);
+ AddNode(&node0);
+
+ TestNode node1(1);
+ AddNode(&node1);
+
+ // Setup two independent port pairs, A-B on node0 and C-D on node1.
+ PortRef A, B, C, D;
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
+ EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
+
+ // Set up another pair X-Y for moving ports on node0.
+ PortRef X, Y;
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&X, &Y));
+
+ ScopedMessage message;
+
+ // Move A to new port E.
+ EXPECT_EQ(OK, node0.SendStringMessageWithPort(X, "foo", A));
+ ASSERT_TRUE(node0.ReadMessage(Y, &message));
+ ASSERT_EQ(1u, message->num_ports());
+ PortRef E;
+ ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E));
+
+ EXPECT_EQ(OK, node0.node().ClosePort(X));
+ EXPECT_EQ(OK, node0.node().ClosePort(Y));
+
+ // Write messages on E and D.
+ EXPECT_EQ(OK, node0.SendStringMessage(E, "hey"));
+ EXPECT_EQ(OK, node1.SendStringMessage(D, "hi"));
+
+ // Initiate a merge between B and C.
+ EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
+
+ WaitForIdle();
+
+ // Expect to receive D's message on E and E's message on D.
+ ASSERT_TRUE(node0.ReadMessage(E, &message));
+ EXPECT_TRUE(MessageEquals(message, "hi"));
+ ASSERT_TRUE(node1.ReadMessage(D, &message));
+ EXPECT_TRUE(MessageEquals(message, "hey"));
+
+ // Close E and D.
+ EXPECT_EQ(OK, node0.node().ClosePort(E));
+ EXPECT_EQ(OK, node1.node().ClosePort(D));
+
+ WaitForIdle();
+
+ // Expect everything to have gone away.
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
+}
+
+TEST_F(PortsTest, MergePortsFailsGracefully) {
+ // This tests that the system remains in a well-defined state if something
+ // goes wrong during port merge.
+
+ TestNode node0(0);
+ AddNode(&node0);
+
+ TestNode node1(1);
+ AddNode(&node1);
+
+ // Setup two independent port pairs, A-B on node0 and C-D on node1.
+ PortRef A, B, C, D;
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
+ EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
+
+ ScopedMessage message;
+ PortRef X, Y;
+ EXPECT_EQ(OK, node0.node().CreateUninitializedPort(&X));
+ EXPECT_EQ(OK, node1.node().CreateUninitializedPort(&Y));
+ EXPECT_EQ(OK, node0.node().InitializePort(X, node1.name(), Y.name()));
+ EXPECT_EQ(OK, node1.node().InitializePort(Y, node0.name(), X.name()));
+
+ // Block the merge from proceeding until we can do something stupid with port
+ // C. This avoids the test logic racing with async merge logic.
+ node1.BlockOnEvent(Event::Type::kMergePort);
+
+ // Initiate the merge between B and C.
+ EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
+
+ // Move C to a new port E. This is not a sane use of Node's public API but
+ // is still hypothetically possible. It allows us to force a merge failure
+ // because C will be in an invalid state by the time the merge is processed.
+ // As a result, B should be closed.
+ EXPECT_EQ(OK, node1.SendStringMessageWithPort(Y, "foo", C));
+
+ node1.Unblock();
+
+ WaitForIdle();
+
+ ASSERT_TRUE(node0.ReadMessage(X, &message));
+ ASSERT_EQ(1u, message->num_ports());
+ PortRef E;
+ ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E));
+
+ EXPECT_EQ(OK, node0.node().ClosePort(X));
+ EXPECT_EQ(OK, node1.node().ClosePort(Y));
+
+ WaitForIdle();
+
+ // C goes away as a result of normal proxy removal. B should have been closed
+ // cleanly by the failed MergePorts.
+ EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(C.name(), &C));
+ EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.node().GetPort(B.name(), &B));
+
+ // Close A, D, and E.
+ EXPECT_EQ(OK, node0.node().ClosePort(A));
+ EXPECT_EQ(OK, node1.node().ClosePort(D));
+ EXPECT_EQ(OK, node0.node().ClosePort(E));
+
+ WaitForIdle();
+
+ // Expect everything to have gone away.
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
+}
+
+TEST_F(PortsTest, RemotePeerStatus) {
+ TestNode node0(0);
+ AddNode(&node0);
+
+ TestNode node1(1);
+ AddNode(&node1);
+
+ // Create a local port pair. Neither port should appear to have a remote peer.
+ PortRef a, b;
+ PortStatus status{};
+ node0.node().CreatePortPair(&a, &b);
+ ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
+ EXPECT_FALSE(status.peer_remote);
+ ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
+ EXPECT_FALSE(status.peer_remote);
+
+ // Create a port pair spanning the two nodes. Both spanning ports should
+ // immediately appear to have a remote peer.
+ PortRef x0, x1;
+ CreatePortPair(&node0, &x0, &node1, &x1);
+
+ ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
+ EXPECT_TRUE(status.peer_remote);
+ ASSERT_EQ(OK, node1.node().GetStatus(x1, &status));
+ EXPECT_TRUE(status.peer_remote);
+
+ PortRef x2, x3;
+ CreatePortPair(&node0, &x2, &node1, &x3);
+
+ // Transfer |b| to |node1| and |x1| to |node0|. i.e., make the local peers
+ // remote and the remote peers local.
+ EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", b));
+ EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", x1));
+ WaitForIdle();
+
+ ScopedMessage message;
+ ASSERT_TRUE(node0.ReadMessage(x2, &message));
+ ASSERT_EQ(1u, message->num_ports());
+ ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &x1));
+
+ ASSERT_TRUE(node1.ReadMessage(x3, &message));
+ ASSERT_EQ(1u, message->num_ports());
+ ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &b));
+
+ // Now x0-x1 should be local to node0 and a-b should span the nodes.
+ ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
+ EXPECT_FALSE(status.peer_remote);
+ ASSERT_EQ(OK, node0.node().GetStatus(x1, &status));
+ EXPECT_FALSE(status.peer_remote);
+ ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
+ EXPECT_TRUE(status.peer_remote);
+ ASSERT_EQ(OK, node1.node().GetStatus(b, &status));
+ EXPECT_TRUE(status.peer_remote);
+
+ // And swap them back one more time.
+ EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", x1));
+ EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", b));
+ WaitForIdle();
+
+ ASSERT_TRUE(node0.ReadMessage(x2, &message));
+ ASSERT_EQ(1u, message->num_ports());
+ ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &b));
+
+ ASSERT_TRUE(node1.ReadMessage(x3, &message));
+ ASSERT_EQ(1u, message->num_ports());
+ ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &x1));
+
+ ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
+ EXPECT_TRUE(status.peer_remote);
+ ASSERT_EQ(OK, node1.node().GetStatus(x1, &status));
+ EXPECT_TRUE(status.peer_remote);
+ ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
+ EXPECT_FALSE(status.peer_remote);
+ ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
+ EXPECT_FALSE(status.peer_remote);
+
+ EXPECT_EQ(OK, node0.node().ClosePort(x0));
+ EXPECT_EQ(OK, node1.node().ClosePort(x1));
+ EXPECT_EQ(OK, node0.node().ClosePort(x2));
+ EXPECT_EQ(OK, node1.node().ClosePort(x3));
+ EXPECT_EQ(OK, node0.node().ClosePort(a));
+ EXPECT_EQ(OK, node0.node().ClosePort(b));
+
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
+}
+
+TEST_F(PortsTest, RemotePeerStatusAfterLocalPortMerge) {
+ TestNode node0(0);
+ AddNode(&node0);
+
+ TestNode node1(1);
+ AddNode(&node1);
+
+ // Set up a-b on node0 and c-d spanning node0-node1.
+ PortRef a, b, c, d;
+ node0.node().CreatePortPair(&a, &b);
+ CreatePortPair(&node0, &c, &node1, &d);
+
+ PortStatus status{};
+ ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
+ EXPECT_FALSE(status.peer_remote);
+ ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
+ EXPECT_FALSE(status.peer_remote);
+ ASSERT_EQ(OK, node0.node().GetStatus(c, &status));
+ EXPECT_TRUE(status.peer_remote);
+ ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
+ EXPECT_TRUE(status.peer_remote);
+
+ EXPECT_EQ(OK, node0.node().MergeLocalPorts(b, c));
+ WaitForIdle();
+
+ ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
+ EXPECT_TRUE(status.peer_remote);
+ ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
+ EXPECT_TRUE(status.peer_remote);
+
+ EXPECT_EQ(OK, node0.node().ClosePort(a));
+ EXPECT_EQ(OK, node1.node().ClosePort(d));
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
+}
+
+TEST_F(PortsTest, RemotePeerStatusAfterRemotePortMerge) {
+ TestNode node0(0);
+ AddNode(&node0);
+
+ TestNode node1(1);
+ AddNode(&node1);
+
+ // Set up a-b on node0 and c-d on node1.
+ PortRef a, b, c, d;
+ node0.node().CreatePortPair(&a, &b);
+ node1.node().CreatePortPair(&c, &d);
+
+ PortStatus status{};
+ ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
+ EXPECT_FALSE(status.peer_remote);
+ ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
+ EXPECT_FALSE(status.peer_remote);
+ ASSERT_EQ(OK, node1.node().GetStatus(c, &status));
+ EXPECT_FALSE(status.peer_remote);
+ ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
+ EXPECT_FALSE(status.peer_remote);
+
+ EXPECT_EQ(OK, node0.node().MergePorts(b, node1.name(), c.name()));
+ WaitForIdle();
+
+ ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
+ EXPECT_TRUE(status.peer_remote);
+ ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
+ EXPECT_TRUE(status.peer_remote);
+
+ EXPECT_EQ(OK, node0.node().ClosePort(a));
+ EXPECT_EQ(OK, node1.node().ClosePort(d));
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
+}
+
+TEST_F(PortsTest, RetransmitUserMessageEvents) {
+ // Ensures that user message events can be retransmitted properly.
+ TestNode node0(0);
+ AddNode(&node0);
+
+ PortRef a, b;
+ node0.node().CreatePortPair(&a, &b);
+
+ // Ping.
+ const char* kMessage = "hey";
+ ScopedMessage message;
+ EXPECT_EQ(OK, node0.SendStringMessage(a, kMessage));
+ ASSERT_TRUE(node0.ReadMessage(b, &message));
+ EXPECT_TRUE(MessageEquals(message, kMessage));
+
+ // Pong.
+ EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message)));
+ EXPECT_FALSE(message);
+ ASSERT_TRUE(node0.ReadMessage(a, &message));
+ EXPECT_TRUE(MessageEquals(message, kMessage));
+
+ // Ping again.
+ EXPECT_EQ(OK, node0.node().SendUserMessage(a, std::move(message)));
+ EXPECT_FALSE(message);
+ ASSERT_TRUE(node0.ReadMessage(b, &message));
+ EXPECT_TRUE(MessageEquals(message, kMessage));
+
+ // Pong again!
+ EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message)));
+ EXPECT_FALSE(message);
+ ASSERT_TRUE(node0.ReadMessage(a, &message));
+ EXPECT_TRUE(MessageEquals(message, kMessage));
+
+ EXPECT_EQ(OK, node0.node().ClosePort(a));
+ EXPECT_EQ(OK, node0.node().ClosePort(b));
+}
+
+TEST_F(PortsTest, SetAcknowledgeRequestInterval) {
+ TestNode node0(0);
+ AddNode(&node0);
+
+ PortRef a0, a1;
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
+ EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0));
+
+ // Send a batch of messages.
+ EXPECT_EQ(OK, node0.SendMultipleMessages(a0, 15));
+ EXPECT_EQ(15u, node0.GetUnacknowledgedMessageCount(a0));
+ EXPECT_TRUE(node0.ReadMultipleMessages(a1, 5));
+ WaitForIdle();
+ EXPECT_EQ(15u, node0.GetUnacknowledgedMessageCount(a0));
+
+ // Set to acknowledge every read message, and validate that already-read
+ // messages are acknowledged.
+ EXPECT_EQ(OK, node0.node().SetAcknowledgeRequestInterval(a0, 1));
+ WaitForIdle();
+ EXPECT_EQ(10u, node0.GetUnacknowledgedMessageCount(a0));
+
+ // Read a third of the messages from the other end.
+ EXPECT_TRUE(node0.ReadMultipleMessages(a1, 5));
+ WaitForIdle();
+
+ EXPECT_EQ(5u, node0.GetUnacknowledgedMessageCount(a0));
+
+ TestNode node1(1);
+ AddNode(&node1);
+
+ // Transfer a1 across to node1.
+ PortRef x0, x1;
+ CreatePortPair(&node0, &x0, &node1, &x1);
+ EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1));
+ WaitForIdle();
+
+ ScopedMessage message;
+ ASSERT_TRUE(node1.ReadMessage(x1, &message));
+ ASSERT_EQ(1u, message->num_ports());
+ ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &a1));
+
+ // Read the last third of the messages from the transferred node, and
+ // validate that the unacknowledge message count updates correctly.
+ EXPECT_TRUE(node1.ReadMultipleMessages(a1, 5));
+ WaitForIdle();
+ EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0));
+
+ // Turn the acknowledges down and validate that they don't go on indefinitely.
+ EXPECT_EQ(OK, node0.node().SetAcknowledgeRequestInterval(a0, 0));
+ EXPECT_EQ(OK, node0.SendMultipleMessages(a0, 10));
+ WaitForIdle();
+ EXPECT_TRUE(node1.ReadMultipleMessages(a1, 10));
+ WaitForIdle();
+ EXPECT_NE(0u, node0.GetUnacknowledgedMessageCount(a0));
+
+ // Close the far port and validate that the closure updates the unacknowledged
+ // count.
+ EXPECT_EQ(OK, node1.node().ClosePort(a1));
+ WaitForIdle();
+ EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0));
+
+ EXPECT_EQ(OK, node0.node().ClosePort(a0));
+ EXPECT_EQ(OK, node0.node().ClosePort(x0));
+ EXPECT_EQ(OK, node1.node().ClosePort(x1));
+}
+
+} // namespace test
+} // namespace ports
+} // namespace core
+} // namespace mojo