summaryrefslogtreecommitdiffstats
path: root/src/test/direct_messenger
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/direct_messenger')
-rw-r--r--src/test/direct_messenger/CMakeLists.txt5
-rw-r--r--src/test/direct_messenger/DirectMessenger.cc252
-rw-r--r--src/test/direct_messenger/DirectMessenger.h98
-rw-r--r--src/test/direct_messenger/DispatchStrategy.h37
-rw-r--r--src/test/direct_messenger/FastStrategy.h35
-rw-r--r--src/test/direct_messenger/QueueStrategy.cc107
-rw-r--r--src/test/direct_messenger/QueueStrategy.h63
-rw-r--r--src/test/direct_messenger/test_direct_messenger.cc436
8 files changed, 1033 insertions, 0 deletions
diff --git a/src/test/direct_messenger/CMakeLists.txt b/src/test/direct_messenger/CMakeLists.txt
new file mode 100644
index 000000000..6ca56946e
--- /dev/null
+++ b/src/test/direct_messenger/CMakeLists.txt
@@ -0,0 +1,5 @@
+# unittest_direct_messenger
+#add_library(QueueStrategy OBJECT QueueStrategy.cc)
+#add_executable(unittest_direct_messenger $<TARGET_OBJECTS:QueueStrategy> test_direct_messenger.cc DirectMessenger.cc)
+#add_ceph_unittest(unittest_direct_messenger)
+#target_link_libraries(unittest_direct_messenger global)
diff --git a/src/test/direct_messenger/DirectMessenger.cc b/src/test/direct_messenger/DirectMessenger.cc
new file mode 100644
index 000000000..3aeff4fee
--- /dev/null
+++ b/src/test/direct_messenger/DirectMessenger.cc
@@ -0,0 +1,252 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "DirectMessenger.h"
+#include "DispatchStrategy.h"
+
+
+class DirectConnection : public Connection {
+ /// sent messages are dispatched here
+ DispatchStrategy *const dispatchers;
+
+ /// the connection that will be attached to outgoing messages, so that replies
+ /// can be dispatched back to the sender. the pointer is atomic for
+ /// thread-safety between mark_down() and send_message(). no reference is held
+ /// on this Connection to avoid cyclical refs. we don't need a reference
+ /// because its owning DirectMessenger will mark both connections down (and
+ /// clear this pointer) before dropping its own reference
+ std::atomic<Connection*> reply_connection{nullptr};
+
+ private:
+ FRIEND_MAKE_REF(DirectConnection);
+ DirectConnection(CephContext *cct, DirectMessenger *m,
+ DispatchStrategy *dispatchers)
+ : Connection(cct, m),
+ dispatchers(dispatchers)
+ {}
+
+ public:
+ /// sets the Connection that will receive replies to outgoing messages
+ void set_direct_reply_connection(ConnectionRef conn);
+
+ /// return true if a peer connection exists
+ bool is_connected() override;
+
+ /// pass the given message directly to our dispatchers
+ int send_message(Message *m) override;
+
+ /// release our pointer to the peer connection. later calls to is_connected()
+ /// will return false, and send_message() will fail with -ENOTCONN
+ void mark_down() override;
+
+ /// noop - keepalive messages are not needed within a process
+ void send_keepalive() override {}
+
+ /// noop - reconnect/recovery semantics are not needed within a process
+ void mark_disposable() override {}
+};
+
+void DirectConnection::set_direct_reply_connection(ConnectionRef conn)
+{
+ reply_connection.store(conn.get());
+}
+
+bool DirectConnection::is_connected()
+{
+ // true between calls to set_direct_reply_connection() and mark_down()
+ return reply_connection.load() != nullptr;
+}
+
+int DirectConnection::send_message(Message *m)
+{
+ // read reply_connection atomically and take a reference
+ ConnectionRef conn = reply_connection.load();
+ if (!conn) {
+ m->put();
+ return -ENOTCONN;
+ }
+ // attach reply_connection to the Message, so that calls to
+ // m->get_connection()->send_message() can be dispatched back to the sender
+ m->set_connection(conn);
+
+ dispatchers->ds_dispatch(m);
+ return 0;
+}
+
+void DirectConnection::mark_down()
+{
+ Connection *conn = reply_connection.load();
+ if (!conn) {
+ return; // already marked down
+ }
+ if (!reply_connection.compare_exchange_weak(conn, nullptr)) {
+ return; // lost the race to mark down
+ }
+ // called only once to avoid loops
+ conn->mark_down();
+}
+
+
+static ConnectionRef create_loopback(DirectMessenger *m,
+ entity_name_t name,
+ DispatchStrategy *dispatchers)
+{
+ auto loopback = ceph::make_ref<DirectConnection>(m->cct, m, dispatchers);
+ // loopback replies go to itself
+ loopback->set_direct_reply_connection(loopback);
+ loopback->set_peer_type(name.type());
+ loopback->set_features(CEPH_FEATURES_ALL);
+ return loopback;
+}
+
+DirectMessenger::DirectMessenger(CephContext *cct, entity_name_t name,
+ string mname, uint64_t nonce,
+ DispatchStrategy *dispatchers)
+ : SimplePolicyMessenger(cct, name, mname, nonce),
+ dispatchers(dispatchers),
+ loopback_connection(create_loopback(this, name, dispatchers))
+{
+ dispatchers->set_messenger(this);
+}
+
+DirectMessenger::~DirectMessenger()
+{
+}
+
+int DirectMessenger::set_direct_peer(DirectMessenger *peer)
+{
+ if (get_myinst() == peer->get_myinst()) {
+ return -EADDRINUSE; // must have a different entity instance
+ }
+ peer_inst = peer->get_myinst();
+
+ // allocate a Connection that dispatches to the peer messenger
+ auto direct_connection = ceph::make_ref<DirectConnection>(cct, peer, peer->dispatchers.get());
+
+ direct_connection->set_peer_addr(peer_inst.addr);
+ direct_connection->set_peer_type(peer_inst.name.type());
+ direct_connection->set_features(CEPH_FEATURES_ALL);
+
+ // if set_direct_peer() was already called on the peer messenger, we can
+ // finish by attaching their connections. if not, the later call to
+ // peer->set_direct_peer() will attach their connection to ours
+ auto connection = peer->get_connection(get_myinst());
+ if (connection) {
+ auto p = static_cast<DirectConnection*>(connection.get());
+
+ p->set_direct_reply_connection(direct_connection);
+ direct_connection->set_direct_reply_connection(p);
+ }
+
+ peer_connection = std::move(direct_connection);
+ return 0;
+}
+
+int DirectMessenger::bind(const entity_addr_t &bind_addr)
+{
+ if (peer_connection) {
+ return -EINVAL; // can't change address after sharing it with the peer
+ }
+ set_myaddr(bind_addr);
+ loopback_connection->set_peer_addr(bind_addr);
+ return 0;
+}
+
+int DirectMessenger::client_bind(const entity_addr_t &bind_addr)
+{
+ // same as bind
+ return bind(bind_addr);
+}
+
+int DirectMessenger::start()
+{
+ if (!peer_connection) {
+ return -EINVAL; // did not connect to a peer
+ }
+ if (started) {
+ return -EINVAL; // already started
+ }
+
+ dispatchers->start();
+ return SimplePolicyMessenger::start();
+}
+
+int DirectMessenger::shutdown()
+{
+ if (!started) {
+ return -EINVAL; // not started
+ }
+
+ mark_down_all();
+ peer_connection.reset();
+ loopback_connection.reset();
+
+ dispatchers->shutdown();
+ SimplePolicyMessenger::shutdown();
+ sem.Put(); // signal wait()
+ return 0;
+}
+
+void DirectMessenger::wait()
+{
+ sem.Get(); // wait on signal from shutdown()
+ dispatchers->wait();
+}
+
+ConnectionRef DirectMessenger::get_connection(const entity_inst_t& dst)
+{
+ if (dst == peer_inst) {
+ return peer_connection;
+ }
+ if (dst == get_myinst()) {
+ return loopback_connection;
+ }
+ return nullptr;
+}
+
+ConnectionRef DirectMessenger::get_loopback_connection()
+{
+ return loopback_connection;
+}
+
+int DirectMessenger::send_message(Message *m, const entity_inst_t& dst)
+{
+ auto conn = get_connection(dst);
+ if (!conn) {
+ m->put();
+ return -ENOTCONN;
+ }
+ return conn->send_message(m);
+}
+
+void DirectMessenger::mark_down(const entity_addr_t& addr)
+{
+ ConnectionRef conn;
+ if (addr == peer_inst.addr) {
+ conn = peer_connection;
+ } else if (addr == get_myaddr_legacy()) {
+ conn = loopback_connection;
+ }
+ if (conn) {
+ conn->mark_down();
+ }
+}
+
+void DirectMessenger::mark_down_all()
+{
+ if (peer_connection) {
+ peer_connection->mark_down();
+ }
+ loopback_connection->mark_down();
+}
diff --git a/src/test/direct_messenger/DirectMessenger.h b/src/test/direct_messenger/DirectMessenger.h
new file mode 100644
index 000000000..710fcfb37
--- /dev/null
+++ b/src/test/direct_messenger/DirectMessenger.h
@@ -0,0 +1,98 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_MSG_DIRECTMESSENGER_H
+#define CEPH_MSG_DIRECTMESSENGER_H
+
+#include "msg/SimplePolicyMessenger.h"
+#include "common/Semaphore.h"
+
+
+class DispatchStrategy;
+
+/**
+ * DirectMessenger provides a direct path between two messengers
+ * within a process. A pair of DirectMessengers share their
+ * DispatchStrategy with each other, and calls to send_message()
+ * forward the message directly to the other.
+ *
+ * This is for testing and i/o injection only, and cannot be used
+ * for normal messengers with ms_type.
+ */
+class DirectMessenger : public SimplePolicyMessenger {
+ private:
+ /// strategy for local dispatch
+ std::unique_ptr<DispatchStrategy> dispatchers;
+ /// peer instance for comparison in get_connection()
+ entity_inst_t peer_inst;
+ /// connection that sends to the peer's dispatchers
+ ConnectionRef peer_connection;
+ /// connection that sends to my own dispatchers
+ ConnectionRef loopback_connection;
+ /// semaphore for signalling wait() from shutdown()
+ Semaphore sem;
+
+ public:
+ DirectMessenger(CephContext *cct, entity_name_t name,
+ string mname, uint64_t nonce,
+ DispatchStrategy *dispatchers);
+ ~DirectMessenger();
+
+ /// attach to a peer messenger. must be called before start()
+ int set_direct_peer(DirectMessenger *peer);
+
+
+ // Messenger interface
+
+ /// sets the addr. must not be called after set_direct_peer() or start()
+ int bind(const entity_addr_t& bind_addr) override;
+
+ /// sets the addr. must not be called after set_direct_peer() or start()
+ int client_bind(const entity_addr_t& bind_addr) override;
+
+ /// starts dispatchers
+ int start() override;
+
+ /// breaks connections, stops dispatchers, and unblocks callers of wait()
+ int shutdown() override;
+
+ /// blocks until shutdown() completes
+ void wait() override;
+
+ /// returns a connection to the peer instance, a loopback connection to our
+ /// own instance, or null if not connected
+ ConnectionRef get_connection(const entity_inst_t& dst) override;
+
+ /// returns a loopback connection that dispatches to this messenger
+ ConnectionRef get_loopback_connection() override;
+
+ /// dispatches a message to the peer instance if connected
+ int send_message(Message *m, const entity_inst_t& dst) override;
+
+ /// mark down the connection for the given address
+ void mark_down(const entity_addr_t& a) override;
+
+ /// mark down all connections
+ void mark_down_all() override;
+
+
+ // unimplemented Messenger interface
+ void set_addr_unknowns(const entity_addr_t &addr) override {}
+ void set_addr(const entity_addr_t &addr) override {}
+ int get_dispatch_queue_len() override { return 0; }
+ double get_dispatch_queue_max_age(utime_t now) override { return 0; }
+ void set_cluster_protocol(int p) override {}
+};
+
+#endif
diff --git a/src/test/direct_messenger/DispatchStrategy.h b/src/test/direct_messenger/DispatchStrategy.h
new file mode 100644
index 000000000..4c9726ed6
--- /dev/null
+++ b/src/test/direct_messenger/DispatchStrategy.h
@@ -0,0 +1,37 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef DISPATCH_STRATEGY_H
+#define DISPATCH_STRATEGY_H
+
+#include "msg/Message.h"
+
+class Messenger;
+
+class DispatchStrategy
+{
+protected:
+ Messenger *msgr = nullptr;
+public:
+ DispatchStrategy() {}
+ Messenger *get_messenger() { return msgr; }
+ void set_messenger(Messenger *_msgr) { msgr = _msgr; }
+ virtual void ds_dispatch(Message *m) = 0;
+ virtual void shutdown() = 0;
+ virtual void start() = 0;
+ virtual void wait() = 0;
+ virtual ~DispatchStrategy() {}
+};
+
+#endif /* DISPATCH_STRATEGY_H */
diff --git a/src/test/direct_messenger/FastStrategy.h b/src/test/direct_messenger/FastStrategy.h
new file mode 100644
index 000000000..001ff4004
--- /dev/null
+++ b/src/test/direct_messenger/FastStrategy.h
@@ -0,0 +1,35 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#ifndef FAST_STRATEGY_H
+#define FAST_STRATEGY_H
+#include "DispatchStrategy.h"
+
+class FastStrategy : public DispatchStrategy {
+public:
+ FastStrategy() {}
+ void ds_dispatch(Message *m) override {
+ msgr->ms_fast_preprocess(m);
+ if (msgr->ms_can_fast_dispatch(m))
+ msgr->ms_fast_dispatch(m);
+ else
+ msgr->ms_deliver_dispatch(m);
+ }
+ void shutdown() override {}
+ void start() override {}
+ void wait() override {}
+ virtual ~FastStrategy() {}
+};
+#endif /* FAST_STRATEGY_H */
diff --git a/src/test/direct_messenger/QueueStrategy.cc b/src/test/direct_messenger/QueueStrategy.cc
new file mode 100644
index 000000000..342494c5a
--- /dev/null
+++ b/src/test/direct_messenger/QueueStrategy.cc
@@ -0,0 +1,107 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#include <string>
+#include "QueueStrategy.h"
+#define dout_subsys ceph_subsys_ms
+#include "common/debug.h"
+
+QueueStrategy::QueueStrategy(int _n_threads)
+ : n_threads(_n_threads),
+ stop(false),
+ mqueue(),
+ disp_threads()
+{
+}
+
+void QueueStrategy::ds_dispatch(Message *m) {
+ msgr->ms_fast_preprocess(m);
+ if (msgr->ms_can_fast_dispatch(m)) {
+ msgr->ms_fast_dispatch(m);
+ return;
+ }
+ std::lock_guard l{lock};
+ mqueue.push_back(*m);
+ if (disp_threads.size()) {
+ if (! disp_threads.empty()) {
+ QSThread *thrd = &disp_threads.front();
+ disp_threads.pop_front();
+ thrd->cond.notify_all();
+ }
+ }
+}
+
+void QueueStrategy::entry(QSThread *thrd)
+{
+ for (;;) {
+ ceph::ref_t<Message> m;
+ std::unique_lock l{lock};
+ for (;;) {
+ if (! mqueue.empty()) {
+ m = ceph::ref_t<Message>(&mqueue.front(), false);
+ mqueue.pop_front();
+ break;
+ }
+ if (stop)
+ break;
+ disp_threads.push_front(*thrd);
+ thrd->cond.wait(l);
+ }
+ l.unlock();
+ if (stop) {
+ if (!m) break;
+ continue;
+ }
+ get_messenger()->ms_deliver_dispatch(m);
+ }
+}
+
+void QueueStrategy::shutdown()
+{
+ QSThread *thrd;
+ std::lock_guard l{lock};
+ stop = true;
+ while (disp_threads.size()) {
+ thrd = &(disp_threads.front());
+ disp_threads.pop_front();
+ thrd->cond.notify_all();
+ }
+}
+
+void QueueStrategy::wait()
+{
+ std::unique_lock l{lock};
+ ceph_assert(stop);
+ for (auto& thread : threads) {
+ l.unlock();
+
+ // join outside of lock
+ thread->join();
+
+ l.lock();
+ }
+}
+
+void QueueStrategy::start()
+{
+ ceph_assert(!stop);
+ std::lock_guard l{lock};
+ threads.reserve(n_threads);
+ for (int ix = 0; ix < n_threads; ++ix) {
+ std::string thread_name = "ms_qs_";
+ thread_name.append(std::to_string(ix));
+ auto thrd = std::make_unique<QSThread>(this);
+ thrd->create(thread_name.c_str());
+ threads.emplace_back(std::move(thrd));
+ }
+}
diff --git a/src/test/direct_messenger/QueueStrategy.h b/src/test/direct_messenger/QueueStrategy.h
new file mode 100644
index 000000000..b7f6df85d
--- /dev/null
+++ b/src/test/direct_messenger/QueueStrategy.h
@@ -0,0 +1,63 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#ifndef QUEUE_STRATEGY_H
+#define QUEUE_STRATEGY_H
+
+#include <vector>
+#include <memory>
+#include <boost/intrusive/list.hpp>
+#include "DispatchStrategy.h"
+#include "msg/Messenger.h"
+
+namespace bi = boost::intrusive;
+
+class QueueStrategy : public DispatchStrategy {
+ ceph::mutex lock = ceph::make_mutex("QueueStrategy::lock");
+ const int n_threads;
+ bool stop;
+
+ Message::Queue mqueue;
+
+ class QSThread : public Thread {
+ public:
+ bi::list_member_hook<> thread_q;
+ QueueStrategy *dq;
+ ceph::condition_variable cond;
+ explicit QSThread(QueueStrategy *dq) : thread_q(), dq(dq) {}
+ void* entry() {
+ dq->entry(this);
+ return NULL;
+ }
+
+ typedef bi::list< QSThread,
+ bi::member_hook< QSThread,
+ bi::list_member_hook<>,
+ &QSThread::thread_q > > Queue;
+ };
+
+ std::vector<std::unique_ptr<QSThread>> threads; //< all threads
+ QSThread::Queue disp_threads; //< waiting threads
+
+public:
+ explicit QueueStrategy(int n_threads);
+ void ds_dispatch(Message *m) override;
+ void shutdown() override;
+ void start() override;
+ void wait() override;
+ void entry(QSThread *thrd);
+ virtual ~QueueStrategy() {}
+};
+#endif /* QUEUE_STRATEGY_H */
diff --git a/src/test/direct_messenger/test_direct_messenger.cc b/src/test/direct_messenger/test_direct_messenger.cc
new file mode 100644
index 000000000..311fce161
--- /dev/null
+++ b/src/test/direct_messenger/test_direct_messenger.cc
@@ -0,0 +1,436 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <condition_variable>
+#include <mutex>
+#include <thread>
+
+#include <gtest/gtest.h>
+
+#include "global/global_init.h"
+#include "common/ceph_argparse.h"
+
+#include "DirectMessenger.h"
+#include "FastStrategy.h"
+#include "QueueStrategy.h"
+#include "messages/MPing.h"
+
+
+/// mock dispatcher that calls the given callback
+class MockDispatcher : public Dispatcher {
+ std::function<void(Message*)> callback;
+ public:
+ MockDispatcher(CephContext *cct, std::function<void(Message*)> callback)
+ : Dispatcher(cct), callback(std::move(callback)) {}
+ bool ms_handle_reset(Connection *con) override { return false; }
+ void ms_handle_remote_reset(Connection *con) override {}
+ bool ms_handle_refused(Connection *con) override { return false; }
+ bool ms_dispatch(Message *m) override {
+ callback(m);
+ m->put();
+ return true;
+ }
+};
+
+/// test synchronous dispatch of messenger and connection interfaces
+TEST(DirectMessenger, SyncDispatch)
+{
+ auto cct = g_ceph_context;
+
+ // use FastStrategy for synchronous dispatch
+ DirectMessenger client(cct, entity_name_t::CLIENT(1),
+ "client", 0, new FastStrategy());
+ DirectMessenger server(cct, entity_name_t::CLIENT(2),
+ "server", 0, new FastStrategy());
+
+ ASSERT_EQ(0, client.set_direct_peer(&server));
+ ASSERT_EQ(0, server.set_direct_peer(&client));
+
+ bool got_request = false;
+ bool got_reply = false;
+
+ MockDispatcher client_dispatcher(cct, [&] (Message *m) {
+ got_reply = true;
+ });
+ client.add_dispatcher_head(&client_dispatcher);
+
+ MockDispatcher server_dispatcher(cct, [&] (Message *m) {
+ got_request = true;
+ ASSERT_EQ(0, m->get_connection()->send_message(new MPing()));
+ });
+ server.add_dispatcher_head(&server_dispatcher);
+
+ ASSERT_EQ(0, client.start());
+ ASSERT_EQ(0, server.start());
+
+ // test DirectMessenger::send_message()
+ ASSERT_EQ(0, client.send_message(new MPing(), server.get_myinst()));
+ ASSERT_TRUE(got_request);
+ ASSERT_TRUE(got_reply);
+
+ // test DirectConnection::send_message()
+ {
+ got_request = false;
+ got_reply = false;
+ auto conn = client.get_connection(server.get_myinst());
+ ASSERT_EQ(0, conn->send_message(new MPing()));
+ ASSERT_TRUE(got_request);
+ ASSERT_TRUE(got_reply);
+ }
+
+ // test DirectMessenger::send_message() with loopback address
+ got_request = false;
+ got_reply = false;
+ ASSERT_EQ(0, client.send_message(new MPing(), client.get_myinst()));
+ ASSERT_FALSE(got_request); // server should never see this
+ ASSERT_TRUE(got_reply);
+
+ // test DirectConnection::send_message() with loopback address
+ {
+ got_request = false;
+ got_reply = false;
+ auto conn = client.get_connection(client.get_myinst());
+ ASSERT_EQ(0, conn->send_message(new MPing()));
+ ASSERT_FALSE(got_request); // server should never see this
+ ASSERT_TRUE(got_reply);
+ }
+
+ // test DirectConnection::send_message() with loopback connection
+ {
+ got_request = false;
+ got_reply = false;
+ auto conn = client.get_loopback_connection();
+ ASSERT_EQ(0, conn->send_message(new MPing()));
+ ASSERT_FALSE(got_request); // server should never see this
+ ASSERT_TRUE(got_reply);
+ }
+
+ ASSERT_EQ(0, client.shutdown());
+ client.wait();
+
+ ASSERT_EQ(0, server.shutdown());
+ server.wait();
+}
+
+/// test asynchronous dispatch of messenger and connection interfaces
+TEST(DirectMessenger, AsyncDispatch)
+{
+ auto cct = g_ceph_context;
+
+ // use QueueStrategy for async replies
+ DirectMessenger client(cct, entity_name_t::CLIENT(1),
+ "client", 0, new QueueStrategy(1));
+ DirectMessenger server(cct, entity_name_t::CLIENT(2),
+ "server", 0, new FastStrategy());
+
+ ASSERT_EQ(0, client.set_direct_peer(&server));
+ ASSERT_EQ(0, server.set_direct_peer(&client));
+
+ // condition variable to wait on ping reply
+ std::mutex mutex;
+ std::condition_variable cond;
+ bool done = false;
+
+ auto wait_for_reply = [&] {
+ std::unique_lock<std::mutex> lock(mutex);
+ while (!done) {
+ cond.wait(lock);
+ }
+ done = false; // clear for reuse
+ };
+
+ // client dispatcher signals the condition variable on reply
+ MockDispatcher client_dispatcher(cct, [&] (Message *m) {
+ std::lock_guard<std::mutex> lock(mutex);
+ done = true;
+ cond.notify_one();
+ });
+ client.add_dispatcher_head(&client_dispatcher);
+
+ MockDispatcher server_dispatcher(cct, [&] (Message *m) {
+ // hold the lock over the call to send_message() to prove that the client's
+ // dispatch is asynchronous. if it isn't, it will deadlock
+ std::lock_guard<std::mutex> lock(mutex);
+ ASSERT_EQ(0, m->get_connection()->send_message(new MPing()));
+ });
+ server.add_dispatcher_head(&server_dispatcher);
+
+ ASSERT_EQ(0, client.start());
+ ASSERT_EQ(0, server.start());
+
+ // test DirectMessenger::send_message()
+ ASSERT_EQ(0, client.send_message(new MPing(), server.get_myinst()));
+ wait_for_reply();
+
+ // test DirectConnection::send_message()
+ {
+ auto conn = client.get_connection(server.get_myinst());
+ ASSERT_EQ(0, conn->send_message(new MPing()));
+ }
+ wait_for_reply();
+
+ // test DirectMessenger::send_message() with loopback address
+ {
+ // hold the lock to test that loopback dispatch is asynchronous
+ std::lock_guard<std::mutex> lock(mutex);
+ ASSERT_EQ(0, client.send_message(new MPing(), client.get_myinst()));
+ }
+ wait_for_reply();
+
+ // test DirectConnection::send_message() with loopback address
+ {
+ auto conn = client.get_connection(client.get_myinst());
+ // hold the lock to test that loopback dispatch is asynchronous
+ std::lock_guard<std::mutex> lock(mutex);
+ ASSERT_EQ(0, conn->send_message(new MPing()));
+ }
+ wait_for_reply();
+
+ // test DirectConnection::send_message() with loopback connection
+ {
+ auto conn = client.get_loopback_connection();
+ // hold the lock to test that loopback dispatch is asynchronous
+ std::lock_guard<std::mutex> lock(mutex);
+ ASSERT_EQ(0, conn->send_message(new MPing()));
+ }
+ wait_for_reply();
+
+ ASSERT_EQ(0, client.shutdown());
+ client.wait();
+
+ ASSERT_EQ(0, server.shutdown());
+ server.wait();
+}
+
+/// test that wait() blocks until shutdown()
+TEST(DirectMessenger, WaitShutdown)
+{
+ auto cct = g_ceph_context;
+
+ // test wait() with both Queue- and FastStrategy
+ DirectMessenger client(cct, entity_name_t::CLIENT(1),
+ "client", 0, new QueueStrategy(1));
+ DirectMessenger server(cct, entity_name_t::CLIENT(2),
+ "server", 0, new FastStrategy());
+
+ ASSERT_EQ(0, client.set_direct_peer(&server));
+ ASSERT_EQ(0, server.set_direct_peer(&client));
+
+ ASSERT_EQ(0, client.start());
+ ASSERT_EQ(0, server.start());
+
+ std::atomic<bool> client_waiting{false};
+ std::atomic<bool> server_waiting{false};
+
+ // spawn threads to wait() on each of the messengers
+ std::thread client_thread([&] {
+ client_waiting = true;
+ client.wait();
+ client_waiting = false;
+ });
+ std::thread server_thread([&] {
+ server_waiting = true;
+ server.wait();
+ server_waiting = false;
+ });
+
+ // give them time to start
+ std::this_thread::sleep_for(std::chrono::milliseconds(50));
+
+ ASSERT_TRUE(client_waiting);
+ ASSERT_TRUE(server_waiting);
+
+ // call shutdown to unblock the waiting threads
+ ASSERT_EQ(0, client.shutdown());
+ ASSERT_EQ(0, server.shutdown());
+
+ client_thread.join();
+ server_thread.join();
+
+ ASSERT_FALSE(client_waiting);
+ ASSERT_FALSE(server_waiting);
+}
+
+/// test connection and messenger interfaces after mark_down()
+TEST(DirectMessenger, MarkDown)
+{
+ auto cct = g_ceph_context;
+
+ DirectMessenger client(cct, entity_name_t::CLIENT(1),
+ "client", 0, new FastStrategy());
+ DirectMessenger server(cct, entity_name_t::CLIENT(2),
+ "server", 0, new FastStrategy());
+
+ ASSERT_EQ(0, client.set_direct_peer(&server));
+ ASSERT_EQ(0, server.set_direct_peer(&client));
+
+ ASSERT_EQ(0, client.start());
+ ASSERT_EQ(0, server.start());
+
+ auto client_to_server = client.get_connection(server.get_myinst());
+ auto server_to_client = server.get_connection(client.get_myinst());
+
+ ASSERT_TRUE(client_to_server->is_connected());
+ ASSERT_TRUE(server_to_client->is_connected());
+
+ // mark_down() breaks the connection on both sides
+ client_to_server->mark_down();
+
+ ASSERT_FALSE(client_to_server->is_connected());
+ ASSERT_EQ(-ENOTCONN, client_to_server->send_message(new MPing()));
+ ASSERT_EQ(-ENOTCONN, client.send_message(new MPing(), server.get_myinst()));
+
+ ASSERT_FALSE(server_to_client->is_connected());
+ ASSERT_EQ(-ENOTCONN, server_to_client->send_message(new MPing()));
+ ASSERT_EQ(-ENOTCONN, server.send_message(new MPing(), client.get_myinst()));
+
+ ASSERT_EQ(0, client.shutdown());
+ client.wait();
+
+ ASSERT_EQ(0, server.shutdown());
+ server.wait();
+}
+
+/// test connection and messenger interfaces after shutdown()
+TEST(DirectMessenger, SendShutdown)
+{
+ auto cct = g_ceph_context;
+
+ // put client on the heap so we can free it early
+ std::unique_ptr<DirectMessenger> client{
+ new DirectMessenger(cct, entity_name_t::CLIENT(1),
+ "client", 0, new FastStrategy())};
+ DirectMessenger server(cct, entity_name_t::CLIENT(2),
+ "server", 0, new FastStrategy());
+
+ ASSERT_EQ(0, client->set_direct_peer(&server));
+ ASSERT_EQ(0, server.set_direct_peer(client.get()));
+
+ ASSERT_EQ(0, client->start());
+ ASSERT_EQ(0, server.start());
+
+ const auto client_inst = client->get_myinst();
+ const auto server_inst = server.get_myinst();
+
+ auto client_to_server = client->get_connection(server_inst);
+ auto server_to_client = server.get_connection(client_inst);
+
+ ASSERT_TRUE(client_to_server->is_connected());
+ ASSERT_TRUE(server_to_client->is_connected());
+
+ // shut down the client to break connections
+ ASSERT_EQ(0, client->shutdown());
+ client->wait();
+
+ ASSERT_FALSE(client_to_server->is_connected());
+ ASSERT_EQ(-ENOTCONN, client_to_server->send_message(new MPing()));
+ ASSERT_EQ(-ENOTCONN, client->send_message(new MPing(), server_inst));
+
+ // free the client connection/messenger to test that calls to the server no
+ // longer try to dereference them
+ client_to_server.reset();
+ client.reset();
+
+ ASSERT_FALSE(server_to_client->is_connected());
+ ASSERT_EQ(-ENOTCONN, server_to_client->send_message(new MPing()));
+ ASSERT_EQ(-ENOTCONN, server.send_message(new MPing(), client_inst));
+
+ ASSERT_EQ(0, server.shutdown());
+ server.wait();
+}
+
+/// test connection and messenger interfaces after bind()
+TEST(DirectMessenger, Bind)
+{
+ auto cct = g_ceph_context;
+
+ DirectMessenger client(cct, entity_name_t::CLIENT(1),
+ "client", 0, new FastStrategy());
+ DirectMessenger server(cct, entity_name_t::CLIENT(2),
+ "server", 0, new FastStrategy());
+
+ entity_addr_t client_addr;
+ client_addr.set_family(AF_INET);
+ client_addr.set_port(1);
+
+ // client bind succeeds before set_direct_peer()
+ ASSERT_EQ(0, client.bind(client_addr));
+
+ ASSERT_EQ(0, client.set_direct_peer(&server));
+ ASSERT_EQ(0, server.set_direct_peer(&client));
+
+ // server bind fails after set_direct_peer()
+ entity_addr_t empty_addr;
+ ASSERT_EQ(-EINVAL, server.bind(empty_addr));
+
+ ASSERT_EQ(0, client.start());
+ ASSERT_EQ(0, server.start());
+
+ auto client_to_server = client.get_connection(server.get_myinst());
+ auto server_to_client = server.get_connection(client.get_myinst());
+
+ ASSERT_TRUE(client_to_server->is_connected());
+ ASSERT_TRUE(server_to_client->is_connected());
+
+ // no address in connection to server
+ ASSERT_EQ(empty_addr, client_to_server->get_peer_addr());
+ // bind address is reflected in connection to client
+ ASSERT_EQ(client_addr, server_to_client->get_peer_addr());
+
+ // mark_down() with bind address breaks the connection
+ server.mark_down(client_addr);
+
+ ASSERT_FALSE(client_to_server->is_connected());
+ ASSERT_FALSE(server_to_client->is_connected());
+
+ ASSERT_EQ(0, client.shutdown());
+ client.wait();
+
+ ASSERT_EQ(0, server.shutdown());
+ server.wait();
+}
+
+/// test connection and messenger interfaces before calls to set_direct_peer()
+TEST(DirectMessenger, StartWithoutPeer)
+{
+ auto cct = g_ceph_context;
+
+ DirectMessenger client(cct, entity_name_t::CLIENT(1),
+ "client", 0, new FastStrategy());
+ DirectMessenger server(cct, entity_name_t::CLIENT(2),
+ "server", 0, new FastStrategy());
+
+ // can't start until set_direct_peer()
+ ASSERT_EQ(-EINVAL, client.start());
+ ASSERT_EQ(-EINVAL, server.start());
+
+ ASSERT_EQ(0, client.set_direct_peer(&server));
+
+ // only client can start
+ ASSERT_EQ(0, client.start());
+ ASSERT_EQ(-EINVAL, server.start());
+
+ // client has a connection but can't send
+ auto conn = client.get_connection(server.get_myinst());
+ ASSERT_NE(nullptr, conn);
+ ASSERT_FALSE(conn->is_connected());
+ ASSERT_EQ(-ENOTCONN, conn->send_message(new MPing()));
+ ASSERT_EQ(-ENOTCONN, client.send_message(new MPing(), server.get_myinst()));
+
+ ASSERT_EQ(0, client.shutdown());
+ client.wait();
+}
+
+int main(int argc, char **argv)
+{
+ // command-line arguments
+ auto args = argv_to_vec(argc, argv);
+
+ auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_ANY,
+ CODE_ENVIRONMENT_DAEMON,
+ CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
+ common_init_finish(cct.get());
+
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}