diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/test/direct_messenger | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/test/direct_messenger')
-rw-r--r-- | src/test/direct_messenger/CMakeLists.txt | 5 | ||||
-rw-r--r-- | src/test/direct_messenger/DirectMessenger.cc | 252 | ||||
-rw-r--r-- | src/test/direct_messenger/DirectMessenger.h | 98 | ||||
-rw-r--r-- | src/test/direct_messenger/DispatchStrategy.h | 37 | ||||
-rw-r--r-- | src/test/direct_messenger/FastStrategy.h | 35 | ||||
-rw-r--r-- | src/test/direct_messenger/QueueStrategy.cc | 107 | ||||
-rw-r--r-- | src/test/direct_messenger/QueueStrategy.h | 63 | ||||
-rw-r--r-- | src/test/direct_messenger/test_direct_messenger.cc | 436 |
8 files changed, 1033 insertions, 0 deletions
diff --git a/src/test/direct_messenger/CMakeLists.txt b/src/test/direct_messenger/CMakeLists.txt new file mode 100644 index 000000000..6ca56946e --- /dev/null +++ b/src/test/direct_messenger/CMakeLists.txt @@ -0,0 +1,5 @@ +# unittest_direct_messenger +#add_library(QueueStrategy OBJECT QueueStrategy.cc) +#add_executable(unittest_direct_messenger $<TARGET_OBJECTS:QueueStrategy> test_direct_messenger.cc DirectMessenger.cc) +#add_ceph_unittest(unittest_direct_messenger) +#target_link_libraries(unittest_direct_messenger global) diff --git a/src/test/direct_messenger/DirectMessenger.cc b/src/test/direct_messenger/DirectMessenger.cc new file mode 100644 index 000000000..3aeff4fee --- /dev/null +++ b/src/test/direct_messenger/DirectMessenger.cc @@ -0,0 +1,252 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "DirectMessenger.h" +#include "DispatchStrategy.h" + + +class DirectConnection : public Connection { + /// sent messages are dispatched here + DispatchStrategy *const dispatchers; + + /// the connection that will be attached to outgoing messages, so that replies + /// can be dispatched back to the sender. the pointer is atomic for + /// thread-safety between mark_down() and send_message(). no reference is held + /// on this Connection to avoid cyclical refs. we don't need a reference + /// because its owning DirectMessenger will mark both connections down (and + /// clear this pointer) before dropping its own reference + std::atomic<Connection*> reply_connection{nullptr}; + + private: + FRIEND_MAKE_REF(DirectConnection); + DirectConnection(CephContext *cct, DirectMessenger *m, + DispatchStrategy *dispatchers) + : Connection(cct, m), + dispatchers(dispatchers) + {} + + public: + /// sets the Connection that will receive replies to outgoing messages + void set_direct_reply_connection(ConnectionRef conn); + + /// return true if a peer connection exists + bool is_connected() override; + + /// pass the given message directly to our dispatchers + int send_message(Message *m) override; + + /// release our pointer to the peer connection. later calls to is_connected() + /// will return false, and send_message() will fail with -ENOTCONN + void mark_down() override; + + /// noop - keepalive messages are not needed within a process + void send_keepalive() override {} + + /// noop - reconnect/recovery semantics are not needed within a process + void mark_disposable() override {} +}; + +void DirectConnection::set_direct_reply_connection(ConnectionRef conn) +{ + reply_connection.store(conn.get()); +} + +bool DirectConnection::is_connected() +{ + // true between calls to set_direct_reply_connection() and mark_down() + return reply_connection.load() != nullptr; +} + +int DirectConnection::send_message(Message *m) +{ + // read reply_connection atomically and take a reference + ConnectionRef conn = reply_connection.load(); + if (!conn) { + m->put(); + return -ENOTCONN; + } + // attach reply_connection to the Message, so that calls to + // m->get_connection()->send_message() can be dispatched back to the sender + m->set_connection(conn); + + dispatchers->ds_dispatch(m); + return 0; +} + +void DirectConnection::mark_down() +{ + Connection *conn = reply_connection.load(); + if (!conn) { + return; // already marked down + } + if (!reply_connection.compare_exchange_weak(conn, nullptr)) { + return; // lost the race to mark down + } + // called only once to avoid loops + conn->mark_down(); +} + + +static ConnectionRef create_loopback(DirectMessenger *m, + entity_name_t name, + DispatchStrategy *dispatchers) +{ + auto loopback = ceph::make_ref<DirectConnection>(m->cct, m, dispatchers); + // loopback replies go to itself + loopback->set_direct_reply_connection(loopback); + loopback->set_peer_type(name.type()); + loopback->set_features(CEPH_FEATURES_ALL); + return loopback; +} + +DirectMessenger::DirectMessenger(CephContext *cct, entity_name_t name, + string mname, uint64_t nonce, + DispatchStrategy *dispatchers) + : SimplePolicyMessenger(cct, name, mname, nonce), + dispatchers(dispatchers), + loopback_connection(create_loopback(this, name, dispatchers)) +{ + dispatchers->set_messenger(this); +} + +DirectMessenger::~DirectMessenger() +{ +} + +int DirectMessenger::set_direct_peer(DirectMessenger *peer) +{ + if (get_myinst() == peer->get_myinst()) { + return -EADDRINUSE; // must have a different entity instance + } + peer_inst = peer->get_myinst(); + + // allocate a Connection that dispatches to the peer messenger + auto direct_connection = ceph::make_ref<DirectConnection>(cct, peer, peer->dispatchers.get()); + + direct_connection->set_peer_addr(peer_inst.addr); + direct_connection->set_peer_type(peer_inst.name.type()); + direct_connection->set_features(CEPH_FEATURES_ALL); + + // if set_direct_peer() was already called on the peer messenger, we can + // finish by attaching their connections. if not, the later call to + // peer->set_direct_peer() will attach their connection to ours + auto connection = peer->get_connection(get_myinst()); + if (connection) { + auto p = static_cast<DirectConnection*>(connection.get()); + + p->set_direct_reply_connection(direct_connection); + direct_connection->set_direct_reply_connection(p); + } + + peer_connection = std::move(direct_connection); + return 0; +} + +int DirectMessenger::bind(const entity_addr_t &bind_addr) +{ + if (peer_connection) { + return -EINVAL; // can't change address after sharing it with the peer + } + set_myaddr(bind_addr); + loopback_connection->set_peer_addr(bind_addr); + return 0; +} + +int DirectMessenger::client_bind(const entity_addr_t &bind_addr) +{ + // same as bind + return bind(bind_addr); +} + +int DirectMessenger::start() +{ + if (!peer_connection) { + return -EINVAL; // did not connect to a peer + } + if (started) { + return -EINVAL; // already started + } + + dispatchers->start(); + return SimplePolicyMessenger::start(); +} + +int DirectMessenger::shutdown() +{ + if (!started) { + return -EINVAL; // not started + } + + mark_down_all(); + peer_connection.reset(); + loopback_connection.reset(); + + dispatchers->shutdown(); + SimplePolicyMessenger::shutdown(); + sem.Put(); // signal wait() + return 0; +} + +void DirectMessenger::wait() +{ + sem.Get(); // wait on signal from shutdown() + dispatchers->wait(); +} + +ConnectionRef DirectMessenger::get_connection(const entity_inst_t& dst) +{ + if (dst == peer_inst) { + return peer_connection; + } + if (dst == get_myinst()) { + return loopback_connection; + } + return nullptr; +} + +ConnectionRef DirectMessenger::get_loopback_connection() +{ + return loopback_connection; +} + +int DirectMessenger::send_message(Message *m, const entity_inst_t& dst) +{ + auto conn = get_connection(dst); + if (!conn) { + m->put(); + return -ENOTCONN; + } + return conn->send_message(m); +} + +void DirectMessenger::mark_down(const entity_addr_t& addr) +{ + ConnectionRef conn; + if (addr == peer_inst.addr) { + conn = peer_connection; + } else if (addr == get_myaddr_legacy()) { + conn = loopback_connection; + } + if (conn) { + conn->mark_down(); + } +} + +void DirectMessenger::mark_down_all() +{ + if (peer_connection) { + peer_connection->mark_down(); + } + loopback_connection->mark_down(); +} diff --git a/src/test/direct_messenger/DirectMessenger.h b/src/test/direct_messenger/DirectMessenger.h new file mode 100644 index 000000000..710fcfb37 --- /dev/null +++ b/src/test/direct_messenger/DirectMessenger.h @@ -0,0 +1,98 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_MSG_DIRECTMESSENGER_H +#define CEPH_MSG_DIRECTMESSENGER_H + +#include "msg/SimplePolicyMessenger.h" +#include "common/Semaphore.h" + + +class DispatchStrategy; + +/** + * DirectMessenger provides a direct path between two messengers + * within a process. A pair of DirectMessengers share their + * DispatchStrategy with each other, and calls to send_message() + * forward the message directly to the other. + * + * This is for testing and i/o injection only, and cannot be used + * for normal messengers with ms_type. + */ +class DirectMessenger : public SimplePolicyMessenger { + private: + /// strategy for local dispatch + std::unique_ptr<DispatchStrategy> dispatchers; + /// peer instance for comparison in get_connection() + entity_inst_t peer_inst; + /// connection that sends to the peer's dispatchers + ConnectionRef peer_connection; + /// connection that sends to my own dispatchers + ConnectionRef loopback_connection; + /// semaphore for signalling wait() from shutdown() + Semaphore sem; + + public: + DirectMessenger(CephContext *cct, entity_name_t name, + string mname, uint64_t nonce, + DispatchStrategy *dispatchers); + ~DirectMessenger(); + + /// attach to a peer messenger. must be called before start() + int set_direct_peer(DirectMessenger *peer); + + + // Messenger interface + + /// sets the addr. must not be called after set_direct_peer() or start() + int bind(const entity_addr_t& bind_addr) override; + + /// sets the addr. must not be called after set_direct_peer() or start() + int client_bind(const entity_addr_t& bind_addr) override; + + /// starts dispatchers + int start() override; + + /// breaks connections, stops dispatchers, and unblocks callers of wait() + int shutdown() override; + + /// blocks until shutdown() completes + void wait() override; + + /// returns a connection to the peer instance, a loopback connection to our + /// own instance, or null if not connected + ConnectionRef get_connection(const entity_inst_t& dst) override; + + /// returns a loopback connection that dispatches to this messenger + ConnectionRef get_loopback_connection() override; + + /// dispatches a message to the peer instance if connected + int send_message(Message *m, const entity_inst_t& dst) override; + + /// mark down the connection for the given address + void mark_down(const entity_addr_t& a) override; + + /// mark down all connections + void mark_down_all() override; + + + // unimplemented Messenger interface + void set_addr_unknowns(const entity_addr_t &addr) override {} + void set_addr(const entity_addr_t &addr) override {} + int get_dispatch_queue_len() override { return 0; } + double get_dispatch_queue_max_age(utime_t now) override { return 0; } + void set_cluster_protocol(int p) override {} +}; + +#endif diff --git a/src/test/direct_messenger/DispatchStrategy.h b/src/test/direct_messenger/DispatchStrategy.h new file mode 100644 index 000000000..4c9726ed6 --- /dev/null +++ b/src/test/direct_messenger/DispatchStrategy.h @@ -0,0 +1,37 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 CohortFS, LLC + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef DISPATCH_STRATEGY_H +#define DISPATCH_STRATEGY_H + +#include "msg/Message.h" + +class Messenger; + +class DispatchStrategy +{ +protected: + Messenger *msgr = nullptr; +public: + DispatchStrategy() {} + Messenger *get_messenger() { return msgr; } + void set_messenger(Messenger *_msgr) { msgr = _msgr; } + virtual void ds_dispatch(Message *m) = 0; + virtual void shutdown() = 0; + virtual void start() = 0; + virtual void wait() = 0; + virtual ~DispatchStrategy() {} +}; + +#endif /* DISPATCH_STRATEGY_H */ diff --git a/src/test/direct_messenger/FastStrategy.h b/src/test/direct_messenger/FastStrategy.h new file mode 100644 index 000000000..001ff4004 --- /dev/null +++ b/src/test/direct_messenger/FastStrategy.h @@ -0,0 +1,35 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 CohortFS, LLC + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#ifndef FAST_STRATEGY_H +#define FAST_STRATEGY_H +#include "DispatchStrategy.h" + +class FastStrategy : public DispatchStrategy { +public: + FastStrategy() {} + void ds_dispatch(Message *m) override { + msgr->ms_fast_preprocess(m); + if (msgr->ms_can_fast_dispatch(m)) + msgr->ms_fast_dispatch(m); + else + msgr->ms_deliver_dispatch(m); + } + void shutdown() override {} + void start() override {} + void wait() override {} + virtual ~FastStrategy() {} +}; +#endif /* FAST_STRATEGY_H */ diff --git a/src/test/direct_messenger/QueueStrategy.cc b/src/test/direct_messenger/QueueStrategy.cc new file mode 100644 index 000000000..342494c5a --- /dev/null +++ b/src/test/direct_messenger/QueueStrategy.cc @@ -0,0 +1,107 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 CohortFS, LLC + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ +#include <string> +#include "QueueStrategy.h" +#define dout_subsys ceph_subsys_ms +#include "common/debug.h" + +QueueStrategy::QueueStrategy(int _n_threads) + : n_threads(_n_threads), + stop(false), + mqueue(), + disp_threads() +{ +} + +void QueueStrategy::ds_dispatch(Message *m) { + msgr->ms_fast_preprocess(m); + if (msgr->ms_can_fast_dispatch(m)) { + msgr->ms_fast_dispatch(m); + return; + } + std::lock_guard l{lock}; + mqueue.push_back(*m); + if (disp_threads.size()) { + if (! disp_threads.empty()) { + QSThread *thrd = &disp_threads.front(); + disp_threads.pop_front(); + thrd->cond.notify_all(); + } + } +} + +void QueueStrategy::entry(QSThread *thrd) +{ + for (;;) { + ceph::ref_t<Message> m; + std::unique_lock l{lock}; + for (;;) { + if (! mqueue.empty()) { + m = ceph::ref_t<Message>(&mqueue.front(), false); + mqueue.pop_front(); + break; + } + if (stop) + break; + disp_threads.push_front(*thrd); + thrd->cond.wait(l); + } + l.unlock(); + if (stop) { + if (!m) break; + continue; + } + get_messenger()->ms_deliver_dispatch(m); + } +} + +void QueueStrategy::shutdown() +{ + QSThread *thrd; + std::lock_guard l{lock}; + stop = true; + while (disp_threads.size()) { + thrd = &(disp_threads.front()); + disp_threads.pop_front(); + thrd->cond.notify_all(); + } +} + +void QueueStrategy::wait() +{ + std::unique_lock l{lock}; + ceph_assert(stop); + for (auto& thread : threads) { + l.unlock(); + + // join outside of lock + thread->join(); + + l.lock(); + } +} + +void QueueStrategy::start() +{ + ceph_assert(!stop); + std::lock_guard l{lock}; + threads.reserve(n_threads); + for (int ix = 0; ix < n_threads; ++ix) { + std::string thread_name = "ms_qs_"; + thread_name.append(std::to_string(ix)); + auto thrd = std::make_unique<QSThread>(this); + thrd->create(thread_name.c_str()); + threads.emplace_back(std::move(thrd)); + } +} diff --git a/src/test/direct_messenger/QueueStrategy.h b/src/test/direct_messenger/QueueStrategy.h new file mode 100644 index 000000000..b7f6df85d --- /dev/null +++ b/src/test/direct_messenger/QueueStrategy.h @@ -0,0 +1,63 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 CohortFS, LLC + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#ifndef QUEUE_STRATEGY_H +#define QUEUE_STRATEGY_H + +#include <vector> +#include <memory> +#include <boost/intrusive/list.hpp> +#include "DispatchStrategy.h" +#include "msg/Messenger.h" + +namespace bi = boost::intrusive; + +class QueueStrategy : public DispatchStrategy { + ceph::mutex lock = ceph::make_mutex("QueueStrategy::lock"); + const int n_threads; + bool stop; + + Message::Queue mqueue; + + class QSThread : public Thread { + public: + bi::list_member_hook<> thread_q; + QueueStrategy *dq; + ceph::condition_variable cond; + explicit QSThread(QueueStrategy *dq) : thread_q(), dq(dq) {} + void* entry() { + dq->entry(this); + return NULL; + } + + typedef bi::list< QSThread, + bi::member_hook< QSThread, + bi::list_member_hook<>, + &QSThread::thread_q > > Queue; + }; + + std::vector<std::unique_ptr<QSThread>> threads; //< all threads + QSThread::Queue disp_threads; //< waiting threads + +public: + explicit QueueStrategy(int n_threads); + void ds_dispatch(Message *m) override; + void shutdown() override; + void start() override; + void wait() override; + void entry(QSThread *thrd); + virtual ~QueueStrategy() {} +}; +#endif /* QUEUE_STRATEGY_H */ diff --git a/src/test/direct_messenger/test_direct_messenger.cc b/src/test/direct_messenger/test_direct_messenger.cc new file mode 100644 index 000000000..311fce161 --- /dev/null +++ b/src/test/direct_messenger/test_direct_messenger.cc @@ -0,0 +1,436 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <condition_variable> +#include <mutex> +#include <thread> + +#include <gtest/gtest.h> + +#include "global/global_init.h" +#include "common/ceph_argparse.h" + +#include "DirectMessenger.h" +#include "FastStrategy.h" +#include "QueueStrategy.h" +#include "messages/MPing.h" + + +/// mock dispatcher that calls the given callback +class MockDispatcher : public Dispatcher { + std::function<void(Message*)> callback; + public: + MockDispatcher(CephContext *cct, std::function<void(Message*)> callback) + : Dispatcher(cct), callback(std::move(callback)) {} + bool ms_handle_reset(Connection *con) override { return false; } + void ms_handle_remote_reset(Connection *con) override {} + bool ms_handle_refused(Connection *con) override { return false; } + bool ms_dispatch(Message *m) override { + callback(m); + m->put(); + return true; + } +}; + +/// test synchronous dispatch of messenger and connection interfaces +TEST(DirectMessenger, SyncDispatch) +{ + auto cct = g_ceph_context; + + // use FastStrategy for synchronous dispatch + DirectMessenger client(cct, entity_name_t::CLIENT(1), + "client", 0, new FastStrategy()); + DirectMessenger server(cct, entity_name_t::CLIENT(2), + "server", 0, new FastStrategy()); + + ASSERT_EQ(0, client.set_direct_peer(&server)); + ASSERT_EQ(0, server.set_direct_peer(&client)); + + bool got_request = false; + bool got_reply = false; + + MockDispatcher client_dispatcher(cct, [&] (Message *m) { + got_reply = true; + }); + client.add_dispatcher_head(&client_dispatcher); + + MockDispatcher server_dispatcher(cct, [&] (Message *m) { + got_request = true; + ASSERT_EQ(0, m->get_connection()->send_message(new MPing())); + }); + server.add_dispatcher_head(&server_dispatcher); + + ASSERT_EQ(0, client.start()); + ASSERT_EQ(0, server.start()); + + // test DirectMessenger::send_message() + ASSERT_EQ(0, client.send_message(new MPing(), server.get_myinst())); + ASSERT_TRUE(got_request); + ASSERT_TRUE(got_reply); + + // test DirectConnection::send_message() + { + got_request = false; + got_reply = false; + auto conn = client.get_connection(server.get_myinst()); + ASSERT_EQ(0, conn->send_message(new MPing())); + ASSERT_TRUE(got_request); + ASSERT_TRUE(got_reply); + } + + // test DirectMessenger::send_message() with loopback address + got_request = false; + got_reply = false; + ASSERT_EQ(0, client.send_message(new MPing(), client.get_myinst())); + ASSERT_FALSE(got_request); // server should never see this + ASSERT_TRUE(got_reply); + + // test DirectConnection::send_message() with loopback address + { + got_request = false; + got_reply = false; + auto conn = client.get_connection(client.get_myinst()); + ASSERT_EQ(0, conn->send_message(new MPing())); + ASSERT_FALSE(got_request); // server should never see this + ASSERT_TRUE(got_reply); + } + + // test DirectConnection::send_message() with loopback connection + { + got_request = false; + got_reply = false; + auto conn = client.get_loopback_connection(); + ASSERT_EQ(0, conn->send_message(new MPing())); + ASSERT_FALSE(got_request); // server should never see this + ASSERT_TRUE(got_reply); + } + + ASSERT_EQ(0, client.shutdown()); + client.wait(); + + ASSERT_EQ(0, server.shutdown()); + server.wait(); +} + +/// test asynchronous dispatch of messenger and connection interfaces +TEST(DirectMessenger, AsyncDispatch) +{ + auto cct = g_ceph_context; + + // use QueueStrategy for async replies + DirectMessenger client(cct, entity_name_t::CLIENT(1), + "client", 0, new QueueStrategy(1)); + DirectMessenger server(cct, entity_name_t::CLIENT(2), + "server", 0, new FastStrategy()); + + ASSERT_EQ(0, client.set_direct_peer(&server)); + ASSERT_EQ(0, server.set_direct_peer(&client)); + + // condition variable to wait on ping reply + std::mutex mutex; + std::condition_variable cond; + bool done = false; + + auto wait_for_reply = [&] { + std::unique_lock<std::mutex> lock(mutex); + while (!done) { + cond.wait(lock); + } + done = false; // clear for reuse + }; + + // client dispatcher signals the condition variable on reply + MockDispatcher client_dispatcher(cct, [&] (Message *m) { + std::lock_guard<std::mutex> lock(mutex); + done = true; + cond.notify_one(); + }); + client.add_dispatcher_head(&client_dispatcher); + + MockDispatcher server_dispatcher(cct, [&] (Message *m) { + // hold the lock over the call to send_message() to prove that the client's + // dispatch is asynchronous. if it isn't, it will deadlock + std::lock_guard<std::mutex> lock(mutex); + ASSERT_EQ(0, m->get_connection()->send_message(new MPing())); + }); + server.add_dispatcher_head(&server_dispatcher); + + ASSERT_EQ(0, client.start()); + ASSERT_EQ(0, server.start()); + + // test DirectMessenger::send_message() + ASSERT_EQ(0, client.send_message(new MPing(), server.get_myinst())); + wait_for_reply(); + + // test DirectConnection::send_message() + { + auto conn = client.get_connection(server.get_myinst()); + ASSERT_EQ(0, conn->send_message(new MPing())); + } + wait_for_reply(); + + // test DirectMessenger::send_message() with loopback address + { + // hold the lock to test that loopback dispatch is asynchronous + std::lock_guard<std::mutex> lock(mutex); + ASSERT_EQ(0, client.send_message(new MPing(), client.get_myinst())); + } + wait_for_reply(); + + // test DirectConnection::send_message() with loopback address + { + auto conn = client.get_connection(client.get_myinst()); + // hold the lock to test that loopback dispatch is asynchronous + std::lock_guard<std::mutex> lock(mutex); + ASSERT_EQ(0, conn->send_message(new MPing())); + } + wait_for_reply(); + + // test DirectConnection::send_message() with loopback connection + { + auto conn = client.get_loopback_connection(); + // hold the lock to test that loopback dispatch is asynchronous + std::lock_guard<std::mutex> lock(mutex); + ASSERT_EQ(0, conn->send_message(new MPing())); + } + wait_for_reply(); + + ASSERT_EQ(0, client.shutdown()); + client.wait(); + + ASSERT_EQ(0, server.shutdown()); + server.wait(); +} + +/// test that wait() blocks until shutdown() +TEST(DirectMessenger, WaitShutdown) +{ + auto cct = g_ceph_context; + + // test wait() with both Queue- and FastStrategy + DirectMessenger client(cct, entity_name_t::CLIENT(1), + "client", 0, new QueueStrategy(1)); + DirectMessenger server(cct, entity_name_t::CLIENT(2), + "server", 0, new FastStrategy()); + + ASSERT_EQ(0, client.set_direct_peer(&server)); + ASSERT_EQ(0, server.set_direct_peer(&client)); + + ASSERT_EQ(0, client.start()); + ASSERT_EQ(0, server.start()); + + std::atomic<bool> client_waiting{false}; + std::atomic<bool> server_waiting{false}; + + // spawn threads to wait() on each of the messengers + std::thread client_thread([&] { + client_waiting = true; + client.wait(); + client_waiting = false; + }); + std::thread server_thread([&] { + server_waiting = true; + server.wait(); + server_waiting = false; + }); + + // give them time to start + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + ASSERT_TRUE(client_waiting); + ASSERT_TRUE(server_waiting); + + // call shutdown to unblock the waiting threads + ASSERT_EQ(0, client.shutdown()); + ASSERT_EQ(0, server.shutdown()); + + client_thread.join(); + server_thread.join(); + + ASSERT_FALSE(client_waiting); + ASSERT_FALSE(server_waiting); +} + +/// test connection and messenger interfaces after mark_down() +TEST(DirectMessenger, MarkDown) +{ + auto cct = g_ceph_context; + + DirectMessenger client(cct, entity_name_t::CLIENT(1), + "client", 0, new FastStrategy()); + DirectMessenger server(cct, entity_name_t::CLIENT(2), + "server", 0, new FastStrategy()); + + ASSERT_EQ(0, client.set_direct_peer(&server)); + ASSERT_EQ(0, server.set_direct_peer(&client)); + + ASSERT_EQ(0, client.start()); + ASSERT_EQ(0, server.start()); + + auto client_to_server = client.get_connection(server.get_myinst()); + auto server_to_client = server.get_connection(client.get_myinst()); + + ASSERT_TRUE(client_to_server->is_connected()); + ASSERT_TRUE(server_to_client->is_connected()); + + // mark_down() breaks the connection on both sides + client_to_server->mark_down(); + + ASSERT_FALSE(client_to_server->is_connected()); + ASSERT_EQ(-ENOTCONN, client_to_server->send_message(new MPing())); + ASSERT_EQ(-ENOTCONN, client.send_message(new MPing(), server.get_myinst())); + + ASSERT_FALSE(server_to_client->is_connected()); + ASSERT_EQ(-ENOTCONN, server_to_client->send_message(new MPing())); + ASSERT_EQ(-ENOTCONN, server.send_message(new MPing(), client.get_myinst())); + + ASSERT_EQ(0, client.shutdown()); + client.wait(); + + ASSERT_EQ(0, server.shutdown()); + server.wait(); +} + +/// test connection and messenger interfaces after shutdown() +TEST(DirectMessenger, SendShutdown) +{ + auto cct = g_ceph_context; + + // put client on the heap so we can free it early + std::unique_ptr<DirectMessenger> client{ + new DirectMessenger(cct, entity_name_t::CLIENT(1), + "client", 0, new FastStrategy())}; + DirectMessenger server(cct, entity_name_t::CLIENT(2), + "server", 0, new FastStrategy()); + + ASSERT_EQ(0, client->set_direct_peer(&server)); + ASSERT_EQ(0, server.set_direct_peer(client.get())); + + ASSERT_EQ(0, client->start()); + ASSERT_EQ(0, server.start()); + + const auto client_inst = client->get_myinst(); + const auto server_inst = server.get_myinst(); + + auto client_to_server = client->get_connection(server_inst); + auto server_to_client = server.get_connection(client_inst); + + ASSERT_TRUE(client_to_server->is_connected()); + ASSERT_TRUE(server_to_client->is_connected()); + + // shut down the client to break connections + ASSERT_EQ(0, client->shutdown()); + client->wait(); + + ASSERT_FALSE(client_to_server->is_connected()); + ASSERT_EQ(-ENOTCONN, client_to_server->send_message(new MPing())); + ASSERT_EQ(-ENOTCONN, client->send_message(new MPing(), server_inst)); + + // free the client connection/messenger to test that calls to the server no + // longer try to dereference them + client_to_server.reset(); + client.reset(); + + ASSERT_FALSE(server_to_client->is_connected()); + ASSERT_EQ(-ENOTCONN, server_to_client->send_message(new MPing())); + ASSERT_EQ(-ENOTCONN, server.send_message(new MPing(), client_inst)); + + ASSERT_EQ(0, server.shutdown()); + server.wait(); +} + +/// test connection and messenger interfaces after bind() +TEST(DirectMessenger, Bind) +{ + auto cct = g_ceph_context; + + DirectMessenger client(cct, entity_name_t::CLIENT(1), + "client", 0, new FastStrategy()); + DirectMessenger server(cct, entity_name_t::CLIENT(2), + "server", 0, new FastStrategy()); + + entity_addr_t client_addr; + client_addr.set_family(AF_INET); + client_addr.set_port(1); + + // client bind succeeds before set_direct_peer() + ASSERT_EQ(0, client.bind(client_addr)); + + ASSERT_EQ(0, client.set_direct_peer(&server)); + ASSERT_EQ(0, server.set_direct_peer(&client)); + + // server bind fails after set_direct_peer() + entity_addr_t empty_addr; + ASSERT_EQ(-EINVAL, server.bind(empty_addr)); + + ASSERT_EQ(0, client.start()); + ASSERT_EQ(0, server.start()); + + auto client_to_server = client.get_connection(server.get_myinst()); + auto server_to_client = server.get_connection(client.get_myinst()); + + ASSERT_TRUE(client_to_server->is_connected()); + ASSERT_TRUE(server_to_client->is_connected()); + + // no address in connection to server + ASSERT_EQ(empty_addr, client_to_server->get_peer_addr()); + // bind address is reflected in connection to client + ASSERT_EQ(client_addr, server_to_client->get_peer_addr()); + + // mark_down() with bind address breaks the connection + server.mark_down(client_addr); + + ASSERT_FALSE(client_to_server->is_connected()); + ASSERT_FALSE(server_to_client->is_connected()); + + ASSERT_EQ(0, client.shutdown()); + client.wait(); + + ASSERT_EQ(0, server.shutdown()); + server.wait(); +} + +/// test connection and messenger interfaces before calls to set_direct_peer() +TEST(DirectMessenger, StartWithoutPeer) +{ + auto cct = g_ceph_context; + + DirectMessenger client(cct, entity_name_t::CLIENT(1), + "client", 0, new FastStrategy()); + DirectMessenger server(cct, entity_name_t::CLIENT(2), + "server", 0, new FastStrategy()); + + // can't start until set_direct_peer() + ASSERT_EQ(-EINVAL, client.start()); + ASSERT_EQ(-EINVAL, server.start()); + + ASSERT_EQ(0, client.set_direct_peer(&server)); + + // only client can start + ASSERT_EQ(0, client.start()); + ASSERT_EQ(-EINVAL, server.start()); + + // client has a connection but can't send + auto conn = client.get_connection(server.get_myinst()); + ASSERT_NE(nullptr, conn); + ASSERT_FALSE(conn->is_connected()); + ASSERT_EQ(-ENOTCONN, conn->send_message(new MPing())); + ASSERT_EQ(-ENOTCONN, client.send_message(new MPing(), server.get_myinst())); + + ASSERT_EQ(0, client.shutdown()); + client.wait(); +} + +int main(int argc, char **argv) +{ + // command-line arguments + auto args = argv_to_vec(argc, argv); + + auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_ANY, + CODE_ENVIRONMENT_DAEMON, + CINIT_FLAG_NO_DEFAULT_CONFIG_FILE); + common_init_finish(cct.get()); + + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} |