diff options
Diffstat (limited to '')
-rw-r--r-- | src/test/msgr/CMakeLists.txt | 58 | ||||
-rw-r--r-- | src/test/msgr/perf_msgr_client.cc | 219 | ||||
-rw-r--r-- | src/test/msgr/perf_msgr_server.cc | 176 | ||||
-rw-r--r-- | src/test/msgr/test_async_driver.cc | 354 | ||||
-rw-r--r-- | src/test/msgr/test_async_networkstack.cc | 1072 | ||||
-rw-r--r-- | src/test/msgr/test_comp_registry.cc | 98 | ||||
-rw-r--r-- | src/test/msgr/test_frames_v2.cc | 483 | ||||
-rw-r--r-- | src/test/msgr/test_msgr.cc | 2424 | ||||
-rw-r--r-- | src/test/msgr/test_userspace_event.cc | 174 |
9 files changed, 5058 insertions, 0 deletions
diff --git a/src/test/msgr/CMakeLists.txt b/src/test/msgr/CMakeLists.txt new file mode 100644 index 000000000..beaa7133d --- /dev/null +++ b/src/test/msgr/CMakeLists.txt @@ -0,0 +1,58 @@ +# ceph_test_async_driver +add_executable(ceph_test_async_driver + test_async_driver.cc + $<TARGET_OBJECTS:unit-main> + ) +target_link_libraries(ceph_test_async_driver os global ${BLKID_LIBRARIES} ${CMAKE_DL_LIBS} ${UNITTEST_LIBS}) + +# ceph_test_msgr +add_executable(ceph_test_msgr + test_msgr.cc + ) +target_link_libraries(ceph_test_msgr os global ${BLKID_LIBRARIES} ${CMAKE_DL_LIBS} ${UNITTEST_LIBS}) + +# ceph_test_async_networkstack +add_executable(ceph_test_async_networkstack + test_async_networkstack.cc + $<TARGET_OBJECTS:unit-main> + ) +target_link_libraries(ceph_test_async_networkstack global ${CRYPTO_LIBS} ${BLKID_LIBRARIES} ${CMAKE_DL_LIBS} ${UNITTEST_LIBS}) + +#ceph_perf_msgr_server +add_executable(ceph_perf_msgr_server perf_msgr_server.cc) +target_link_libraries(ceph_perf_msgr_server os global ${UNITTEST_LIBS}) + +#ceph_perf_msgr_client +add_executable(ceph_perf_msgr_client perf_msgr_client.cc) +target_link_libraries(ceph_perf_msgr_client os global ${UNITTEST_LIBS}) + +# unitttest_frames_v2 +add_executable(unittest_frames_v2 test_frames_v2.cc) +add_ceph_unittest(unittest_frames_v2) +target_link_libraries(unittest_frames_v2 os global ${UNITTEST_LIBS}) + +add_executable(unittest_comp_registry + test_comp_registry.cc + $<TARGET_OBJECTS:unit-main> + ) +add_ceph_unittest(unittest_comp_registry) +target_link_libraries(unittest_comp_registry global) + +# test_userspace_event +if(HAVE_DPDK) + add_executable(ceph_test_userspace_event + test_userspace_event.cc + $<TARGET_OBJECTS:unit-main>) + target_link_libraries(ceph_test_userspace_event + global + ${CMAKE_DL_LIBS} + ${UNITTEST_LIBS}) +endif(HAVE_DPDK) + +install(TARGETS + ceph_test_async_driver + ceph_test_msgr + ceph_test_async_networkstack + ceph_perf_msgr_server + ceph_perf_msgr_client + DESTINATION ${CMAKE_INSTALL_BINDIR}) diff --git a/src/test/msgr/perf_msgr_client.cc b/src/test/msgr/perf_msgr_client.cc new file mode 100644 index 000000000..ffbfc1614 --- /dev/null +++ b/src/test/msgr/perf_msgr_client.cc @@ -0,0 +1,219 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Haomai Wang + * + * Author: Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include <stdlib.h> +#include <stdint.h> +#include <string> +#include <unistd.h> +#include <iostream> + +using namespace std; + +#include "common/ceph_argparse.h" +#include "common/debug.h" +#include "common/Cycles.h" +#include "global/global_init.h" +#include "msg/Messenger.h" +#include "messages/MOSDOp.h" +#include "auth/DummyAuth.h" + +#include <atomic> + +class MessengerClient { + class ClientThread; + class ClientDispatcher : public Dispatcher { + uint64_t think_time; + ClientThread *thread; + + public: + ClientDispatcher(uint64_t delay, ClientThread *t): Dispatcher(g_ceph_context), think_time(delay), thread(t) {} + bool ms_can_fast_dispatch_any() const override { return true; } + bool ms_can_fast_dispatch(const Message *m) const override { + switch (m->get_type()) { + case CEPH_MSG_OSD_OPREPLY: + return true; + default: + return false; + } + } + + void ms_handle_fast_connect(Connection *con) override {} + void ms_handle_fast_accept(Connection *con) override {} + bool ms_dispatch(Message *m) override { return true; } + void ms_fast_dispatch(Message *m) override; + bool ms_handle_reset(Connection *con) override { return true; } + void ms_handle_remote_reset(Connection *con) override {} + bool ms_handle_refused(Connection *con) override { return false; } + int ms_handle_fast_authentication(Connection *con) override { + return 1; + } + }; + + class ClientThread : public Thread { + Messenger *msgr; + int concurrent; + ConnectionRef conn; + std::atomic<unsigned> client_inc = { 0 }; + object_t oid; + object_locator_t oloc; + pg_t pgid; + int msg_len; + bufferlist data; + int ops; + ClientDispatcher dispatcher; + + public: + ceph::mutex lock = ceph::make_mutex("MessengerBenchmark::ClientThread::lock"); + ceph::condition_variable cond; + uint64_t inflight; + + ClientThread(Messenger *m, int c, ConnectionRef con, int len, int ops, int think_time_us): + msgr(m), concurrent(c), conn(con), oid("object-name"), oloc(1, 1), msg_len(len), ops(ops), + dispatcher(think_time_us, this), inflight(0) { + m->add_dispatcher_head(&dispatcher); + bufferptr ptr(msg_len); + memset(ptr.c_str(), 0, msg_len); + data.append(ptr); + } + void *entry() override { + std::unique_lock locker{lock}; + for (int i = 0; i < ops; ++i) { + if (inflight > uint64_t(concurrent)) { + cond.wait(locker); + } + hobject_t hobj(oid, oloc.key, CEPH_NOSNAP, pgid.ps(), pgid.pool(), + oloc.nspace); + spg_t spgid(pgid); + MOSDOp *m = new MOSDOp(client_inc, 0, hobj, spgid, 0, 0, 0); + bufferlist msg_data(data); + m->write(0, msg_len, msg_data); + inflight++; + conn->send_message(m); + //cerr << __func__ << " send m=" << m << std::endl; + } + locker.unlock(); + msgr->shutdown(); + return 0; + } + }; + + string type; + string serveraddr; + int think_time_us; + vector<Messenger*> msgrs; + vector<ClientThread*> clients; + DummyAuthClientServer dummy_auth; + + public: + MessengerClient(const string &t, const string &addr, int delay): + type(t), serveraddr(addr), think_time_us(delay), + dummy_auth(g_ceph_context) { + } + ~MessengerClient() { + for (uint64_t i = 0; i < clients.size(); ++i) + delete clients[i]; + for (uint64_t i = 0; i < msgrs.size(); ++i) { + msgrs[i]->shutdown(); + msgrs[i]->wait(); + } + } + void ready(int c, int jobs, int ops, int msg_len) { + entity_addr_t addr; + addr.parse(serveraddr.c_str()); + addr.set_nonce(0); + dummy_auth.auth_registry.refresh_config(); + for (int i = 0; i < jobs; ++i) { + Messenger *msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(0), "client", getpid()+i); + msgr->set_default_policy(Messenger::Policy::lossless_client(0)); + msgr->set_auth_client(&dummy_auth); + msgr->start(); + entity_addrvec_t addrs(addr); + ConnectionRef conn = msgr->connect_to_osd(addrs); + ClientThread *t = new ClientThread(msgr, c, conn, msg_len, ops, think_time_us); + msgrs.push_back(msgr); + clients.push_back(t); + } + usleep(1000*1000); + } + void start() { + for (uint64_t i = 0; i < clients.size(); ++i) + clients[i]->create("client"); + for (uint64_t i = 0; i < msgrs.size(); ++i) + msgrs[i]->wait(); + } +}; + +void MessengerClient::ClientDispatcher::ms_fast_dispatch(Message *m) { + usleep(think_time); + m->put(); + std::lock_guard l{thread->lock}; + thread->inflight--; + thread->cond.notify_all(); +} + + +void usage(const string &name) { + cout << "Usage: " << name << " [server ip:port] [numjobs] [concurrency] [ios] [thinktime us] [msg length]" << std::endl; + cout << " [server ip:port]: connect to the ip:port pair" << std::endl; + cout << " [numjobs]: how much client threads spawned and do benchmark" << std::endl; + cout << " [concurrency]: the max inflight messages(like iodepth in fio)" << std::endl; + cout << " [ios]: how much messages sent for each client" << std::endl; + cout << " [thinktime]: sleep time when do fast dispatching(match client logic)" << std::endl; + cout << " [msg length]: message data bytes" << std::endl; +} + +int main(int argc, char **argv) +{ + auto args = argv_to_vec(argc, argv); + + auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, + CODE_ENVIRONMENT_UTILITY, + CINIT_FLAG_NO_DEFAULT_CONFIG_FILE); + common_init_finish(g_ceph_context); + g_ceph_context->_conf.apply_changes(nullptr); + + if (args.size() < 6) { + usage(argv[0]); + return 1; + } + + int numjobs = atoi(args[1]); + int concurrent = atoi(args[2]); + int ios = atoi(args[3]); + int think_time = atoi(args[4]); + int len = atoi(args[5]); + + std::string public_msgr_type = g_ceph_context->_conf->ms_public_type.empty() ? g_ceph_context->_conf.get_val<std::string>("ms_type") : g_ceph_context->_conf->ms_public_type; + + cout << " using ms-public-type " << public_msgr_type << std::endl; + cout << " server ip:port " << args[0] << std::endl; + cout << " numjobs " << numjobs << std::endl; + cout << " concurrency " << concurrent << std::endl; + cout << " ios " << ios << std::endl; + cout << " thinktime(us) " << think_time << std::endl; + cout << " message data bytes " << len << std::endl; + + MessengerClient client(public_msgr_type, args[0], think_time); + + client.ready(concurrent, numjobs, ios, len); + Cycles::init(); + uint64_t start = Cycles::rdtsc(); + client.start(); + uint64_t stop = Cycles::rdtsc(); + cout << " Total op " << (ios * numjobs) << " run time " << Cycles::to_microseconds(stop - start) << "us." << std::endl; + + return 0; +} diff --git a/src/test/msgr/perf_msgr_server.cc b/src/test/msgr/perf_msgr_server.cc new file mode 100644 index 000000000..0c492ab17 --- /dev/null +++ b/src/test/msgr/perf_msgr_server.cc @@ -0,0 +1,176 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Haomai Wang + * + * Author: Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include <stdlib.h> +#include <stdint.h> +#include <string> +#include <unistd.h> +#include <iostream> + +using namespace std; + +#include "common/ceph_argparse.h" +#include "common/debug.h" +#include "common/WorkQueue.h" +#include "global/global_init.h" +#include "msg/Messenger.h" +#include "messages/MOSDOp.h" +#include "messages/MOSDOpReply.h" +#include "auth/DummyAuth.h" + +class ServerDispatcher : public Dispatcher { + uint64_t think_time; + ThreadPool op_tp; + class OpWQ : public ThreadPool::WorkQueue<Message> { + list<Message*> messages; + + public: + OpWQ(ceph::timespan timeout, ceph::timespan suicide_timeout, ThreadPool *tp) + : ThreadPool::WorkQueue<Message>("ServerDispatcher::OpWQ", timeout, suicide_timeout, tp) {} + + bool _enqueue(Message *m) override { + messages.push_back(m); + return true; + } + void _dequeue(Message *m) override { + ceph_abort(); + } + bool _empty() override { + return messages.empty(); + } + Message *_dequeue() override { + if (messages.empty()) + return NULL; + Message *m = messages.front(); + messages.pop_front(); + return m; + } + void _process(Message *m, ThreadPool::TPHandle &handle) override { + MOSDOp *osd_op = static_cast<MOSDOp*>(m); + MOSDOpReply *reply = new MOSDOpReply(osd_op, 0, 0, 0, false); + m->get_connection()->send_message(reply); + m->put(); + } + void _process_finish(Message *m) override { } + void _clear() override { + ceph_assert(messages.empty()); + } + } op_wq; + + public: + ServerDispatcher(int threads, uint64_t delay): Dispatcher(g_ceph_context), think_time(delay), + op_tp(g_ceph_context, "ServerDispatcher::op_tp", "tp_serv_disp", threads, "serverdispatcher_op_threads"), + op_wq(ceph::make_timespan(30), ceph::make_timespan(30), &op_tp) { + op_tp.start(); + } + ~ServerDispatcher() override { + op_tp.stop(); + } + bool ms_can_fast_dispatch_any() const override { return true; } + bool ms_can_fast_dispatch(const Message *m) const override { + switch (m->get_type()) { + case CEPH_MSG_OSD_OP: + return true; + default: + return false; + } + } + + void ms_handle_fast_connect(Connection *con) override {} + void ms_handle_fast_accept(Connection *con) override {} + bool ms_dispatch(Message *m) override { return true; } + bool ms_handle_reset(Connection *con) override { return true; } + void ms_handle_remote_reset(Connection *con) override {} + bool ms_handle_refused(Connection *con) override { return false; } + void ms_fast_dispatch(Message *m) override { + usleep(think_time); + //cerr << __func__ << " reply message=" << m << std::endl; + op_wq.queue(m); + } + int ms_handle_fast_authentication(Connection *con) override { + return 1; + } +}; + +class MessengerServer { + Messenger *msgr; + string type; + string bindaddr; + ServerDispatcher dispatcher; + DummyAuthClientServer dummy_auth; + + public: + MessengerServer(const string &t, const string &addr, int threads, int delay): + msgr(NULL), type(t), bindaddr(addr), dispatcher(threads, delay), + dummy_auth(g_ceph_context) { + msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0), "server", 0); + msgr->set_default_policy(Messenger::Policy::stateless_server(0)); + dummy_auth.auth_registry.refresh_config(); + msgr->set_auth_server(&dummy_auth); + } + ~MessengerServer() { + msgr->shutdown(); + msgr->wait(); + } + void start() { + entity_addr_t addr; + addr.parse(bindaddr.c_str()); + msgr->bind(addr); + msgr->add_dispatcher_head(&dispatcher); + msgr->start(); + msgr->wait(); + } +}; + +void usage(const string &name) { + cerr << "Usage: " << name << " [bind ip:port] [server worker threads] [thinktime us]" << std::endl; + cerr << " [bind ip:port]: The ip:port pair to bind, client need to specify this pair to connect" << std::endl; + cerr << " [server worker threads]: threads will process incoming messages and reply(matching pg threads)" << std::endl; + cerr << " [thinktime]: sleep time when do dispatching(match fast dispatch logic in OSD.cc)" << std::endl; +} + +int main(int argc, char **argv) +{ + auto args = argv_to_vec(argc, argv); + + auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, + CODE_ENVIRONMENT_UTILITY, + CINIT_FLAG_NO_DEFAULT_CONFIG_FILE); + common_init_finish(g_ceph_context); + g_ceph_context->_conf.apply_changes(nullptr); + + if (args.size() < 3) { + usage(argv[0]); + return 1; + } + + int worker_threads = atoi(args[1]); + int think_time = atoi(args[2]); + std::string public_msgr_type = g_ceph_context->_conf->ms_public_type.empty() ? g_ceph_context->_conf.get_val<std::string>("ms_type") : g_ceph_context->_conf->ms_public_type; + + cerr << " This tool won't handle connection error alike things, " << std::endl; + cerr << "please ensure the proper network environment to test." << std::endl; + cerr << " Or ctrl+c when meeting error and restart tests" << std::endl; + cerr << " using ms-public-type " << public_msgr_type << std::endl; + cerr << " bind ip:port " << args[0] << std::endl; + cerr << " worker threads " << worker_threads << std::endl; + cerr << " thinktime(us) " << think_time << std::endl; + + MessengerServer server(public_msgr_type, args[0], worker_threads, think_time); + server.start(); + + return 0; +} diff --git a/src/test/msgr/test_async_driver.cc b/src/test/msgr/test_async_driver.cc new file mode 100644 index 000000000..6cf4211b6 --- /dev/null +++ b/src/test/msgr/test_async_driver.cc @@ -0,0 +1,354 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com> + * + * Author: Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifdef __APPLE__ +#include <AvailabilityMacros.h> +#endif + +#include <fcntl.h> +#include <sys/socket.h> +#include <pthread.h> +#include <stdint.h> +#include <arpa/inet.h> +#include "include/Context.h" +#include "common/ceph_mutex.h" +#include "common/Cond.h" +#include "global/global_init.h" +#include "common/ceph_argparse.h" +#include "msg/async/Event.h" + +#include <atomic> + +// We use epoll, kqueue, evport, select in descending order by performance. +#if defined(__linux__) +#define HAVE_EPOLL 1 +#endif + +#if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__) +#define HAVE_KQUEUE 1 +#endif + +#ifdef __sun +#include <sys/feature_tests.h> +#ifdef _DTRACE_VERSION +#define HAVE_EVPORT 1 +#endif +#endif + +#ifdef HAVE_EPOLL +#include "msg/async/EventEpoll.h" +#endif +#ifdef HAVE_KQUEUE +#include "msg/async/EventKqueue.h" +#endif +#include "msg/async/EventSelect.h" + +#include <gtest/gtest.h> + +using namespace std; + +class EventDriverTest : public ::testing::TestWithParam<const char*> { + public: + EventDriver *driver; + + EventDriverTest(): driver(0) {} + void SetUp() override { + cerr << __func__ << " start set up " << GetParam() << std::endl; +#ifdef HAVE_EPOLL + if (strcmp(GetParam(), "epoll")) + driver = new EpollDriver(g_ceph_context); +#endif +#ifdef HAVE_KQUEUE + if (strcmp(GetParam(), "kqueue")) + driver = new KqueueDriver(g_ceph_context); +#endif + if (strcmp(GetParam(), "select")) + driver = new SelectDriver(g_ceph_context); + driver->init(NULL, 100); + } + void TearDown() override { + delete driver; + } +}; + +int set_nonblock(int sd) +{ + int flags; + + /* Set the socket nonblocking. + * Note that fcntl(2) for F_GETFL and F_SETFL can't be + * interrupted by a signal. */ + if ((flags = fcntl(sd, F_GETFL)) < 0 ) { + return -1; + } + if (fcntl(sd, F_SETFL, flags | O_NONBLOCK) < 0) { + return -1; + } + return 0; +} + + +TEST_P(EventDriverTest, PipeTest) { + int fds[2]; + vector<FiredFileEvent> fired_events; + int r; + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = 1; + + r = pipe(fds); + ASSERT_EQ(r, 0); + r = driver->add_event(fds[0], EVENT_NONE, EVENT_READABLE); + ASSERT_EQ(r, 0); + r = driver->event_wait(fired_events, &tv); + ASSERT_EQ(r, 0); + + char c = 'A'; + r = write(fds[1], &c, sizeof(c)); + ASSERT_EQ(r, 1); + r = driver->event_wait(fired_events, &tv); + ASSERT_EQ(r, 1); + ASSERT_EQ(fired_events[0].fd, fds[0]); + + + fired_events.clear(); + r = write(fds[1], &c, sizeof(c)); + ASSERT_EQ(r, 1); + r = driver->event_wait(fired_events, &tv); + ASSERT_EQ(r, 1); + ASSERT_EQ(fired_events[0].fd, fds[0]); + + fired_events.clear(); + driver->del_event(fds[0], EVENT_READABLE, EVENT_READABLE); + r = write(fds[1], &c, sizeof(c)); + ASSERT_EQ(r, 1); + r = driver->event_wait(fired_events, &tv); + ASSERT_EQ(r, 0); +} + +void* echoclient(void *arg) +{ + intptr_t port = (intptr_t)arg; + struct sockaddr_in sa; + memset(&sa, 0, sizeof(sa)); + sa.sin_family = AF_INET; + sa.sin_port = htons(port); + char addr[] = "127.0.0.1"; + int r = inet_pton(AF_INET, addr, &sa.sin_addr); + ceph_assert(r == 1); + + int connect_sd = ::socket(AF_INET, SOCK_STREAM, 0); + if (connect_sd >= 0) { + r = connect(connect_sd, (struct sockaddr*)&sa, sizeof(sa)); + ceph_assert(r == 0); + int t = 0; + + do { + char c[] = "banner"; + r = write(connect_sd, c, sizeof(c)); + char d[100]; + r = read(connect_sd, d, sizeof(d)); + if (r == 0) + break; + if (t++ == 30) + break; + } while (1); + ::close(connect_sd); + } + return 0; +} + +TEST_P(EventDriverTest, NetworkSocketTest) { + int listen_sd = ::socket(AF_INET, SOCK_STREAM, 0); + ASSERT_TRUE(listen_sd > 0); + int on = 1; + int r = ::setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); + ASSERT_EQ(r, 0); + r = set_nonblock(listen_sd); + ASSERT_EQ(r, 0); + struct sockaddr_in sa; + long port = 0; + for (port = 38788; port < 40000; port++) { + memset(&sa,0,sizeof(sa)); + sa.sin_family = AF_INET; + sa.sin_port = htons(port); + sa.sin_addr.s_addr = htonl(INADDR_ANY); + + r = ::bind(listen_sd, (struct sockaddr *)&sa, sizeof(sa)); + if (r == 0) { + break; + } + } + ASSERT_EQ(r, 0); + r = listen(listen_sd, 511); + ASSERT_EQ(r, 0); + + vector<FiredFileEvent> fired_events; + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = 1; + r = driver->add_event(listen_sd, EVENT_NONE, EVENT_READABLE); + ASSERT_EQ(r, 0); + r = driver->event_wait(fired_events, &tv); + ASSERT_EQ(r, 0); + + fired_events.clear(); + pthread_t thread1; + r = pthread_create(&thread1, NULL, echoclient, (void*)(intptr_t)port); + ASSERT_EQ(r, 0); + tv.tv_sec = 5; + tv.tv_usec = 0; + r = driver->event_wait(fired_events, &tv); + ASSERT_EQ(r, 1); + ASSERT_EQ(fired_events[0].fd, listen_sd); + + fired_events.clear(); + int client_sd = ::accept(listen_sd, NULL, NULL); + ASSERT_TRUE(client_sd > 0); + r = driver->add_event(client_sd, EVENT_NONE, EVENT_READABLE); + ASSERT_EQ(r, 0); + + do { + fired_events.clear(); + tv.tv_sec = 5; + tv.tv_usec = 0; + r = driver->event_wait(fired_events, &tv); + ASSERT_EQ(1, r); + ASSERT_EQ(EVENT_READABLE, fired_events[0].mask); + + fired_events.clear(); + char data[100]; + r = ::read(client_sd, data, sizeof(data)); + if (r == 0) + break; + ASSERT_GT(r, 0); + r = driver->add_event(client_sd, EVENT_READABLE, EVENT_WRITABLE); + ASSERT_EQ(0, r); + r = driver->event_wait(fired_events, &tv); + ASSERT_EQ(1, r); + ASSERT_EQ(fired_events[0].mask, EVENT_WRITABLE); + r = write(client_sd, data, strlen(data)); + ASSERT_EQ((int)strlen(data), r); + driver->del_event(client_sd, EVENT_READABLE|EVENT_WRITABLE, + EVENT_WRITABLE); + } while (1); + + ::close(client_sd); + ::close(listen_sd); +} + +class FakeEvent : public EventCallback { + + public: + void do_request(uint64_t fd_or_id) override {} +}; + +TEST(EventCenterTest, FileEventExpansion) { + vector<int> sds; + EventCenter center(g_ceph_context); + center.init(100, 0, "posix"); + center.set_owner(); + EventCallbackRef e(new FakeEvent()); + for (int i = 0; i < 300; i++) { + int sd = ::socket(AF_INET, SOCK_STREAM, 0); + center.create_file_event(sd, EVENT_READABLE, e); + sds.push_back(sd); + } + + for (vector<int>::iterator it = sds.begin(); it != sds.end(); ++it) + center.delete_file_event(*it, EVENT_READABLE); +} + + +class Worker : public Thread { + CephContext *cct; + bool done; + + public: + EventCenter center; + explicit Worker(CephContext *c, int idx): cct(c), done(false), center(c) { + center.init(100, idx, "posix"); + } + void stop() { + done = true; + center.wakeup(); + } + void* entry() override { + center.set_owner(); + while (!done) + center.process_events(1000000); + return 0; + } +}; + +class CountEvent: public EventCallback { + std::atomic<unsigned> *count; + ceph::mutex *lock; + ceph::condition_variable *cond; + + public: + CountEvent(std::atomic<unsigned> *atomic, + ceph::mutex *l, ceph::condition_variable *c) + : count(atomic), lock(l), cond(c) {} + void do_request(uint64_t id) override { + std::scoped_lock l{*lock}; + (*count)--; + cond->notify_all(); + } +}; + +TEST(EventCenterTest, DispatchTest) { + Worker worker1(g_ceph_context, 1), worker2(g_ceph_context, 2); + std::atomic<unsigned> count = { 0 }; + ceph::mutex lock = ceph::make_mutex("DispatchTest::lock"); + ceph::condition_variable cond; + worker1.create("worker_1"); + worker2.create("worker_2"); + for (int i = 0; i < 10000; ++i) { + count++; + worker1.center.dispatch_event_external(EventCallbackRef(new CountEvent(&count, &lock, &cond))); + count++; + worker2.center.dispatch_event_external(EventCallbackRef(new CountEvent(&count, &lock, &cond))); + std::unique_lock l{lock}; + cond.wait(l, [&] { return count == 0; }); + } + worker1.stop(); + worker2.stop(); + worker1.join(); + worker2.join(); +} + +INSTANTIATE_TEST_SUITE_P( + AsyncMessenger, + EventDriverTest, + ::testing::Values( +#ifdef HAVE_EPOLL + "epoll", +#endif +#ifdef HAVE_KQUEUE + "kqueue", +#endif + "select" + ) +); + +/* + * Local Variables: + * compile-command: "cd ../.. ; make ceph_test_async_driver && + * ./ceph_test_async_driver + * + * End: + */ diff --git a/src/test/msgr/test_async_networkstack.cc b/src/test/msgr/test_async_networkstack.cc new file mode 100644 index 000000000..14923fd7d --- /dev/null +++ b/src/test/msgr/test_async_networkstack.cc @@ -0,0 +1,1072 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSky <haomai@xsky.com> + * + * Author: Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include <algorithm> +#include <atomic> +#include <iostream> +#include <list> +#include <random> +#include <string> +#include <set> +#include <vector> +#include <gtest/gtest.h> + +#include "acconfig.h" +#include "common/config_obs.h" +#include "include/Context.h" +#include "msg/async/Event.h" +#include "msg/async/Stack.h" + +using namespace std; + +class NoopConfigObserver : public md_config_obs_t { + std::list<std::string> options; + const char **ptrs = 0; + +public: + NoopConfigObserver(std::list<std::string> l) : options(l) { + ptrs = new const char*[options.size() + 1]; + unsigned j = 0; + for (auto& i : options) { + ptrs[j++] = i.c_str(); + } + ptrs[j] = 0; + } + ~NoopConfigObserver() { + delete[] ptrs; + } + + const char** get_tracked_conf_keys() const override { + return ptrs; + } + void handle_conf_change(const ConfigProxy& conf, + const std::set <std::string> &changed) override { + } +}; + +class NetworkWorkerTest : public ::testing::TestWithParam<const char*> { + public: + std::shared_ptr<NetworkStack> stack; + string addr, port_addr; + + NoopConfigObserver fake_obs = {{"ms_type", + "ms_dpdk_coremask", + "ms_dpdk_host_ipv4_addr", + "ms_dpdk_gateway_ipv4_addr", + "ms_dpdk_netmask_ipv4_addr"}}; + + NetworkWorkerTest() {} + void SetUp() override { + cerr << __func__ << " start set up " << GetParam() << std::endl; + if (strncmp(GetParam(), "dpdk", 4)) { + g_ceph_context->_conf.set_val("ms_type", "async+posix"); + addr = "127.0.0.1:15000"; + port_addr = "127.0.0.1:15001"; + } else { + g_ceph_context->_conf.set_val_or_die("ms_dpdk_debug_allow_loopback", "true"); + g_ceph_context->_conf.set_val_or_die("ms_async_op_threads", "2"); + string ipv4_addr = g_ceph_context->_conf.get_val<std::string>("ms_dpdk_host_ipv4_addr"); + addr = ipv4_addr + std::string(":15000"); + port_addr = ipv4_addr + std::string(":15001"); + } + stack = NetworkStack::create(g_ceph_context, GetParam()); + stack->start(); + } + void TearDown() override { + stack->stop(); + } + string get_addr() const { + return addr; + } + string get_ip_different_port() const { + return port_addr; + } + string get_different_ip() const { + return "10.0.123.100:4323"; + } + EventCenter *get_center(unsigned i) { + return &stack->get_worker(i)->center; + } + Worker *get_worker(unsigned i) { + return stack->get_worker(i); + } + template<typename func> + class C_dispatch : public EventCallback { + Worker *worker; + func f; + std::atomic_bool done; + public: + C_dispatch(Worker *w, func &&_f): worker(w), f(std::move(_f)), done(false) {} + void do_request(uint64_t id) override { + f(worker); + done = true; + } + void wait() { + int us = 1000 * 1000 * 1000; + while (!done) { + ASSERT_TRUE(us > 0); + usleep(100); + us -= 100; + } + } + }; + template<typename func> + void exec_events(func &&f) { + std::vector<C_dispatch<func>*> dis; + for (unsigned i = 0; i < stack->get_num_worker(); ++i) { + Worker *w = stack->get_worker(i); + C_dispatch<func> *e = new C_dispatch<func>(w, std::move(f)); + stack->get_worker(i)->center.dispatch_event_external(e); + dis.push_back(e); + } + + for (auto &&e : dis) { + e->wait(); + delete e; + } + } +}; + +class C_poll : public EventCallback { + EventCenter *center; + std::atomic<bool> woken; + static const int sleepus = 500; + + public: + explicit C_poll(EventCenter *c): center(c), woken(false) {} + void do_request(uint64_t r) override { + woken = true; + } + bool poll(int milliseconds) { + auto start = ceph::coarse_real_clock::now(); + while (!woken) { + center->process_events(sleepus); + usleep(sleepus); + auto r = std::chrono::duration_cast<std::chrono::milliseconds>( + ceph::coarse_real_clock::now() - start); + if (r >= std::chrono::milliseconds(milliseconds)) + break; + } + return woken; + } + void reset() { + woken = false; + } +}; + +TEST_P(NetworkWorkerTest, SimpleTest) { + entity_addr_t bind_addr; + ASSERT_TRUE(bind_addr.parse(get_addr().c_str())); + std::atomic_bool accepted(false); + std::atomic_bool *accepted_p = &accepted; + + exec_events([this, accepted_p, bind_addr](Worker *worker) mutable { + entity_addr_t cli_addr; + SocketOptions options; + ServerSocket bind_socket; + EventCenter *center = &worker->center; + ssize_t r = 0; + if (stack->support_local_listen_table() || worker->id == 0) + r = worker->listen(bind_addr, 0, options, &bind_socket); + ASSERT_EQ(0, r); + + ConnectedSocket cli_socket, srv_socket; + if (worker->id == 0) { + r = worker->connect(bind_addr, options, &cli_socket); + ASSERT_EQ(0, r); + } + + bool is_my_accept = false; + if (bind_socket) { + C_poll cb(center); + center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb); + if (cb.poll(500)) { + *accepted_p = true; + is_my_accept = true; + } + ASSERT_TRUE(*accepted_p); + center->delete_file_event(bind_socket.fd(), EVENT_READABLE); + } + + if (is_my_accept) { + r = bind_socket.accept(&srv_socket, options, &cli_addr, worker); + ASSERT_EQ(0, r); + ASSERT_TRUE(srv_socket.fd() > 0); + } + + if (worker->id == 0) { + C_poll cb(center); + center->create_file_event(cli_socket.fd(), EVENT_READABLE, &cb); + r = cli_socket.is_connected(); + if (r == 0) { + ASSERT_EQ(true, cb.poll(500)); + r = cli_socket.is_connected(); + } + ASSERT_EQ(1, r); + center->delete_file_event(cli_socket.fd(), EVENT_READABLE); + } + + const char *message = "this is a new message"; + int len = strlen(message); + bufferlist bl; + bl.append(message, len); + if (worker->id == 0) { + r = cli_socket.send(bl, false); + ASSERT_EQ(len, r); + } + + char buf[1024]; + C_poll cb(center); + if (is_my_accept) { + center->create_file_event(srv_socket.fd(), EVENT_READABLE, &cb); + { + r = srv_socket.read(buf, sizeof(buf)); + while (r == -EAGAIN) { + ASSERT_TRUE(cb.poll(500)); + r = srv_socket.read(buf, sizeof(buf)); + cb.reset(); + } + ASSERT_EQ(len, r); + ASSERT_EQ(0, memcmp(buf, message, len)); + } + bind_socket.abort_accept(); + } + if (worker->id == 0) { + cli_socket.shutdown(); + // ack delay is 200 ms + } + + bl.clear(); + bl.append(message, len); + if (worker->id == 0) { + r = cli_socket.send(bl, false); + ASSERT_EQ(-EPIPE, r); + } + if (is_my_accept) { + cb.reset(); + ASSERT_TRUE(cb.poll(500)); + r = srv_socket.read(buf, sizeof(buf)); + if (r == -EAGAIN) { + cb.reset(); + ASSERT_TRUE(cb.poll(1000*500)); + r = srv_socket.read(buf, sizeof(buf)); + } + ASSERT_EQ(0, r); + center->delete_file_event(srv_socket.fd(), EVENT_READABLE); + srv_socket.close(); + } + }); +} + +TEST_P(NetworkWorkerTest, ConnectFailedTest) { + entity_addr_t bind_addr; + ASSERT_TRUE(bind_addr.parse(get_addr().c_str())); + + exec_events([this, bind_addr](Worker *worker) mutable { + EventCenter *center = &worker->center; + entity_addr_t cli_addr; + SocketOptions options; + ServerSocket bind_socket; + int r = 0; + if (stack->support_local_listen_table() || worker->id == 0) + r = worker->listen(bind_addr, 0, options, &bind_socket); + ASSERT_EQ(0, r); + + ConnectedSocket cli_socket1, cli_socket2; + if (worker->id == 0) { + ASSERT_TRUE(cli_addr.parse(get_ip_different_port().c_str())); + r = worker->connect(cli_addr, options, &cli_socket1); + ASSERT_EQ(0, r); + C_poll cb(center); + center->create_file_event(cli_socket1.fd(), EVENT_READABLE, &cb); + r = cli_socket1.is_connected(); + if (r == 0) { + ASSERT_TRUE(cb.poll(500)); + r = cli_socket1.is_connected(); + } + ASSERT_TRUE(r == -ECONNREFUSED || r == -ECONNRESET); + } + + if (worker->id == 1) { + ASSERT_TRUE(cli_addr.parse(get_different_ip().c_str())); + r = worker->connect(cli_addr, options, &cli_socket2); + ASSERT_EQ(0, r); + C_poll cb(center); + center->create_file_event(cli_socket2.fd(), EVENT_READABLE, &cb); + r = cli_socket2.is_connected(); + if (r == 0) { + cb.poll(500); + r = cli_socket2.is_connected(); + } + ASSERT_TRUE(r != 1); + center->delete_file_event(cli_socket2.fd(), EVENT_READABLE); + } + }); +} + +TEST_P(NetworkWorkerTest, ListenTest) { + Worker *worker = get_worker(0); + entity_addr_t bind_addr; + ASSERT_TRUE(bind_addr.parse(get_addr().c_str())); + SocketOptions options; + ServerSocket bind_socket1, bind_socket2; + int r = worker->listen(bind_addr, 0, options, &bind_socket1); + ASSERT_EQ(0, r); + + r = worker->listen(bind_addr, 0, options, &bind_socket2); + ASSERT_EQ(-EADDRINUSE, r); +} + +TEST_P(NetworkWorkerTest, AcceptAndCloseTest) { + entity_addr_t bind_addr; + ASSERT_TRUE(bind_addr.parse(get_addr().c_str())); + std::atomic_bool accepted(false); + std::atomic_bool *accepted_p = &accepted; + std::atomic_int unbind_count(stack->get_num_worker()); + std::atomic_int *count_p = &unbind_count; + exec_events([this, bind_addr, accepted_p, count_p](Worker *worker) mutable { + SocketOptions options; + EventCenter *center = &worker->center; + entity_addr_t cli_addr; + int r = 0; + { + ServerSocket bind_socket; + if (stack->support_local_listen_table() || worker->id == 0) + r = worker->listen(bind_addr, 0, options, &bind_socket); + ASSERT_EQ(0, r); + + ConnectedSocket srv_socket, cli_socket; + if (bind_socket) { + r = bind_socket.accept(&srv_socket, options, &cli_addr, worker); + ASSERT_EQ(-EAGAIN, r); + } + + C_poll cb(center); + if (worker->id == 0) { + center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb); + r = worker->connect(bind_addr, options, &cli_socket); + ASSERT_EQ(0, r); + ASSERT_TRUE(cb.poll(500)); + } + + if (bind_socket) { + cb.reset(); + cb.poll(500); + ConnectedSocket srv_socket2; + do { + r = bind_socket.accept(&srv_socket2, options, &cli_addr, worker); + usleep(100); + } while (r == -EAGAIN && !*accepted_p); + if (r == 0) + *accepted_p = true; + ASSERT_TRUE(*accepted_p); + // srv_socket2 closed + center->delete_file_event(bind_socket.fd(), EVENT_READABLE); + } + + if (worker->id == 0) { + char buf[100]; + cb.reset(); + center->create_file_event(cli_socket.fd(), EVENT_READABLE, &cb); + int i = 3; + while (!i--) { + ASSERT_TRUE(cb.poll(500)); + r = cli_socket.read(buf, sizeof(buf)); + if (r == 0) + break; + } + ASSERT_EQ(0, r); + center->delete_file_event(cli_socket.fd(), EVENT_READABLE); + } + + if (bind_socket) + center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb); + if (worker->id == 0) { + *accepted_p = false; + r = worker->connect(bind_addr, options, &cli_socket); + ASSERT_EQ(0, r); + cb.reset(); + ASSERT_TRUE(cb.poll(500)); + cli_socket.close(); + } + + if (bind_socket) { + do { + r = bind_socket.accept(&srv_socket, options, &cli_addr, worker); + usleep(100); + } while (r == -EAGAIN && !*accepted_p); + if (r == 0) + *accepted_p = true; + ASSERT_TRUE(*accepted_p); + center->delete_file_event(bind_socket.fd(), EVENT_READABLE); + } + // unbind + } + + --*count_p; + while (*count_p > 0) + usleep(100); + + ConnectedSocket cli_socket; + r = worker->connect(bind_addr, options, &cli_socket); + ASSERT_EQ(0, r); + { + C_poll cb(center); + center->create_file_event(cli_socket.fd(), EVENT_READABLE, &cb); + r = cli_socket.is_connected(); + if (r == 0) { + ASSERT_TRUE(cb.poll(500)); + r = cli_socket.is_connected(); + } + ASSERT_TRUE(r == -ECONNREFUSED || r == -ECONNRESET); + } + }); +} + +TEST_P(NetworkWorkerTest, ComplexTest) { + entity_addr_t bind_addr; + std::atomic_bool listen_done(false); + std::atomic_bool *listen_p = &listen_done; + std::atomic_bool accepted(false); + std::atomic_bool *accepted_p = &accepted; + std::atomic_bool done(false); + std::atomic_bool *done_p = &done; + ASSERT_TRUE(bind_addr.parse(get_addr().c_str())); + exec_events([this, bind_addr, listen_p, accepted_p, done_p](Worker *worker) mutable { + entity_addr_t cli_addr; + EventCenter *center = &worker->center; + SocketOptions options; + ServerSocket bind_socket; + int r = 0; + if (stack->support_local_listen_table() || worker->id == 0) { + r = worker->listen(bind_addr, 0, options, &bind_socket); + ASSERT_EQ(0, r); + *listen_p = true; + } + ConnectedSocket cli_socket, srv_socket; + if (worker->id == 1) { + while (!*listen_p || stack->support_local_listen_table()) { + usleep(50); + r = worker->connect(bind_addr, options, &cli_socket); + ASSERT_EQ(0, r); + if (stack->support_local_listen_table()) + break; + } + } + + if (bind_socket) { + C_poll cb(center); + center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb); + int count = 3; + while (count--) { + if (cb.poll(500)) { + r = bind_socket.accept(&srv_socket, options, &cli_addr, worker); + ASSERT_EQ(0, r); + *accepted_p = true; + break; + } + } + ASSERT_TRUE(*accepted_p); + center->delete_file_event(bind_socket.fd(), EVENT_READABLE); + } + + if (worker->id == 1) { + C_poll cb(center); + center->create_file_event(cli_socket.fd(), EVENT_WRITABLE, &cb); + r = cli_socket.is_connected(); + if (r == 0) { + ASSERT_TRUE(cb.poll(500)); + r = cli_socket.is_connected(); + } + ASSERT_EQ(1, r); + center->delete_file_event(cli_socket.fd(), EVENT_WRITABLE); + } + + const size_t message_size = 10240; + size_t count = 100; + string message(message_size, '!'); + for (size_t i = 0; i < message_size; i += 100) + message[i] = ','; + size_t len = message_size * count; + C_poll cb(center); + if (worker->id == 1) + center->create_file_event(cli_socket.fd(), EVENT_WRITABLE, &cb); + if (srv_socket) + center->create_file_event(srv_socket.fd(), EVENT_READABLE, &cb); + size_t left = len; + len *= 2; + string read_string; + int again_count = 0; + int c = 2; + bufferlist bl; + for (size_t i = 0; i < count; ++i) + bl.push_back(bufferptr((char*)message.data(), message_size)); + while (!*done_p) { + again_count = 0; + if (worker->id == 1) { + if (c > 0) { + ssize_t r = 0; + usleep(100); + if (left > 0) { + r = cli_socket.send(bl, false); + ASSERT_TRUE(r >= 0 || r == -EAGAIN); + if (r > 0) + left -= r; + if (r == -EAGAIN) + ++again_count; + } + if (left == 0) { + --c; + left = message_size * count; + ASSERT_EQ(0U, bl.length()); + for (size_t i = 0; i < count; ++i) + bl.push_back(bufferptr((char*)message.data(), message_size)); + } + } + } + + if (srv_socket) { + char buf[1000]; + if (len > 0) { + r = srv_socket.read(buf, sizeof(buf)); + ASSERT_TRUE(r > 0 || r == -EAGAIN); + if (r > 0) { + read_string.append(buf, r); + len -= r; + } else if (r == -EAGAIN) { + ++again_count; + } + } + if (len == 0) { + for (size_t i = 0; i < read_string.size(); i += message_size) + ASSERT_EQ(0, memcmp(read_string.c_str()+i, message.c_str(), message_size)); + *done_p = true; + } + } + if (again_count) { + cb.reset(); + cb.poll(500); + } + } + if (worker->id == 1) + center->delete_file_event(cli_socket.fd(), EVENT_WRITABLE); + if (srv_socket) + center->delete_file_event(srv_socket.fd(), EVENT_READABLE); + + if (bind_socket) + bind_socket.abort_accept(); + if (srv_socket) + srv_socket.close(); + if (worker->id == 1) + cli_socket.close(); + }); +} + +class StressFactory { + struct Client; + struct Server; + struct ThreadData { + Worker *worker; + std::set<Client*> clients; + std::set<Server*> servers; + ~ThreadData() { + for (auto && i : clients) + delete i; + for (auto && i : servers) + delete i; + } + }; + + struct RandomString { + size_t slen; + vector<std::string> strs; + std::random_device rd; + std::default_random_engine rng; + + explicit RandomString(size_t s): slen(s), rng(rd()) {} + void prepare(size_t n) { + static const char alphabet[] = + "abcdefghijklmnopqrstuvwxyz" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "0123456789"; + + std::uniform_int_distribution<> dist( + 0, sizeof(alphabet) / sizeof(*alphabet) - 2); + + strs.reserve(n); + std::generate_n( + std::back_inserter(strs), strs.capacity(), [&] { + std::string str; + str.reserve(slen); + std::generate_n(std::back_inserter(str), slen, [&]() { + return alphabet[dist(rng)]; + }); + return str; + } + ); + } + std::string &get_random_string() { + std::uniform_int_distribution<> dist( + 0, strs.size() - 1); + return strs[dist(rng)]; + } + }; + struct Message { + size_t idx; + size_t len; + std::string content; + + explicit Message(RandomString &rs, size_t i, size_t l): idx(i) { + size_t slen = rs.slen; + len = std::max(slen, l); + + std::vector<std::string> strs; + strs.reserve(len / slen); + std::generate_n( + std::back_inserter(strs), strs.capacity(), [&] { + return rs.get_random_string(); + } + ); + len = slen * strs.size(); + content.reserve(len); + for (auto &&s : strs) + content.append(s); + } + bool verify(const char *b, size_t len = 0) const { + return content.compare(0, len, b, 0, len) == 0; + } + }; + + template <typename T> + class C_delete : public EventCallback { + T *ctxt; + public: + explicit C_delete(T *c): ctxt(c) {} + void do_request(uint64_t id) override { + delete ctxt; + delete this; + } + }; + + class Client { + StressFactory *factory; + EventCenter *center; + ConnectedSocket socket; + std::deque<StressFactory::Message*> acking; + std::deque<StressFactory::Message*> writings; + std::string buffer; + size_t index = 0; + size_t left; + bool write_enabled = false; + size_t read_offset = 0, write_offset = 0; + bool first = true; + bool dead = false; + StressFactory::Message homeless_message; + + class Client_read_handle : public EventCallback { + Client *c; + public: + explicit Client_read_handle(Client *_c): c(_c) {} + void do_request(uint64_t id) override { + c->do_read_request(); + } + } read_ctxt; + + class Client_write_handle : public EventCallback { + Client *c; + public: + explicit Client_write_handle(Client *_c): c(_c) {} + void do_request(uint64_t id) override { + c->do_write_request(); + } + } write_ctxt; + + public: + Client(StressFactory *f, EventCenter *cen, ConnectedSocket s, size_t c) + : factory(f), center(cen), socket(std::move(s)), left(c), homeless_message(factory->rs, -1, 1024), + read_ctxt(this), write_ctxt(this) { + center->create_file_event( + socket.fd(), EVENT_READABLE, &read_ctxt); + center->dispatch_event_external(&read_ctxt); + } + void close() { + ASSERT_FALSE(write_enabled); + dead = true; + socket.shutdown(); + center->delete_file_event(socket.fd(), EVENT_READABLE); + center->dispatch_event_external(new C_delete<Client>(this)); + } + + void do_read_request() { + if (dead) + return ; + ASSERT_TRUE(socket.is_connected() >= 0); + if (!socket.is_connected()) + return ; + ASSERT_TRUE(!acking.empty() || first); + if (first) { + first = false; + center->dispatch_event_external(&write_ctxt); + if (acking.empty()) + return ; + } + StressFactory::Message *m = acking.front(); + int r = 0; + if (buffer.empty()) + buffer.resize(m->len); + bool must_no = false; + while (true) { + r = socket.read((char*)buffer.data() + read_offset, + m->len - read_offset); + ASSERT_TRUE(r == -EAGAIN || r > 0); + if (r == -EAGAIN) + break; + read_offset += r; + + std::cerr << " client " << this << " receive " << m->idx << " len " << r << " content: " << std::endl; + ASSERT_FALSE(must_no); + if ((m->len - read_offset) == 0) { + ASSERT_TRUE(m->verify(buffer.data(), 0)); + delete m; + acking.pop_front(); + read_offset = 0; + buffer.clear(); + if (acking.empty()) { + m = &homeless_message; + must_no = true; + } else { + m = acking.front(); + buffer.resize(m->len); + } + } + } + if (acking.empty()) { + center->dispatch_event_external(&write_ctxt); + return ; + } + } + + void do_write_request() { + if (dead) + return ; + ASSERT_TRUE(socket.is_connected() > 0); + + while (left > 0 && factory->queue_depth > writings.size() + acking.size()) { + StressFactory::Message *m = new StressFactory::Message( + factory->rs, ++index, + factory->rd() % factory->max_message_length); + std::cerr << " client " << this << " generate message " << m->idx << " length " << m->len << std::endl; + ASSERT_EQ(m->len, m->content.size()); + writings.push_back(m); + --left; + --factory->message_left; + } + + while (!writings.empty()) { + StressFactory::Message *m = writings.front(); + bufferlist bl; + bl.append(m->content.data() + write_offset, m->content.size() - write_offset); + ssize_t r = socket.send(bl, false); + if (r == 0) + break; + std::cerr << " client " << this << " send " << m->idx << " len " << r << " content: " << std::endl; + ASSERT_TRUE(r >= 0); + write_offset += r; + if (write_offset == m->content.size()) { + write_offset = 0; + writings.pop_front(); + acking.push_back(m); + } + } + if (writings.empty() && write_enabled) { + center->delete_file_event(socket.fd(), EVENT_WRITABLE); + write_enabled = false; + } else if (!writings.empty() && !write_enabled) { + ASSERT_EQ(0, center->create_file_event( + socket.fd(), EVENT_WRITABLE, &write_ctxt)); + write_enabled = true; + } + } + + bool finish() const { + return left == 0 && acking.empty() && writings.empty(); + } + }; + friend class Client; + + class Server { + StressFactory *factory; + EventCenter *center; + ConnectedSocket socket; + std::deque<std::string> buffers; + bool write_enabled = false; + bool dead = false; + + class Server_read_handle : public EventCallback { + Server *s; + public: + explicit Server_read_handle(Server *_s): s(_s) {} + void do_request(uint64_t id) override { + s->do_read_request(); + } + } read_ctxt; + + class Server_write_handle : public EventCallback { + Server *s; + public: + explicit Server_write_handle(Server *_s): s(_s) {} + void do_request(uint64_t id) override { + s->do_write_request(); + } + } write_ctxt; + + public: + Server(StressFactory *f, EventCenter *c, ConnectedSocket s): + factory(f), center(c), socket(std::move(s)), read_ctxt(this), write_ctxt(this) { + center->create_file_event(socket.fd(), EVENT_READABLE, &read_ctxt); + center->dispatch_event_external(&read_ctxt); + } + void close() { + ASSERT_FALSE(write_enabled); + socket.shutdown(); + center->delete_file_event(socket.fd(), EVENT_READABLE); + center->dispatch_event_external(new C_delete<Server>(this)); + } + void do_read_request() { + if (dead) + return ; + int r = 0; + while (true) { + char buf[4096]; + bufferptr data; + r = socket.read(buf, sizeof(buf)); + ASSERT_TRUE(r == -EAGAIN || (r >= 0 && (size_t)r <= sizeof(buf))); + if (r == 0) { + ASSERT_TRUE(buffers.empty()); + dead = true; + return ; + } else if (r == -EAGAIN) + break; + buffers.emplace_back(buf, 0, r); + std::cerr << " server " << this << " receive " << r << " content: " << std::endl; + } + if (!buffers.empty() && !write_enabled) + center->dispatch_event_external(&write_ctxt); + } + + void do_write_request() { + if (dead) + return ; + + while (!buffers.empty()) { + bufferlist bl; + auto it = buffers.begin(); + for (size_t i = 0; i < buffers.size(); ++i) { + bl.push_back(bufferptr((char*)it->data(), it->size())); + ++it; + } + + ssize_t r = socket.send(bl, false); + std::cerr << " server " << this << " send " << r << std::endl; + if (r == 0) + break; + ASSERT_TRUE(r >= 0); + while (r > 0) { + ASSERT_TRUE(!buffers.empty()); + string &buffer = buffers.front(); + if (r >= (int)buffer.size()) { + r -= (int)buffer.size(); + buffers.pop_front(); + } else { + std::cerr << " server " << this << " sent " << r << std::endl; + buffer = buffer.substr(r, buffer.size()); + break; + } + } + } + if (buffers.empty()) { + if (write_enabled) { + center->delete_file_event(socket.fd(), EVENT_WRITABLE); + write_enabled = false; + } + } else if (!write_enabled) { + ASSERT_EQ(0, center->create_file_event( + socket.fd(), EVENT_WRITABLE, &write_ctxt)); + write_enabled = true; + } + } + + bool finish() { + return dead; + } + }; + friend class Server; + + class C_accept : public EventCallback { + StressFactory *factory; + ServerSocket bind_socket; + ThreadData *t_data; + Worker *worker; + + public: + C_accept(StressFactory *f, ServerSocket s, ThreadData *data, Worker *w) + : factory(f), bind_socket(std::move(s)), t_data(data), worker(w) {} + void do_request(uint64_t id) override { + while (true) { + entity_addr_t cli_addr; + ConnectedSocket srv_socket; + SocketOptions options; + int r = bind_socket.accept(&srv_socket, options, &cli_addr, worker); + if (r == -EAGAIN) { + break; + } + ASSERT_EQ(0, r); + ASSERT_TRUE(srv_socket.fd() > 0); + Server *cb = new Server(factory, &t_data->worker->center, std::move(srv_socket)); + t_data->servers.insert(cb); + } + } + }; + friend class C_accept; + + public: + static const size_t min_client_send_messages = 100; + static const size_t max_client_send_messages = 1000; + std::shared_ptr<NetworkStack> stack; + RandomString rs; + std::random_device rd; + const size_t client_num, queue_depth, max_message_length; + atomic_int message_count, message_left; + entity_addr_t bind_addr; + std::atomic_bool already_bind = {false}; + SocketOptions options; + + explicit StressFactory(const std::shared_ptr<NetworkStack> &s, const string &addr, + size_t cli, size_t qd, size_t mc, size_t l) + : stack(s), rs(128), client_num(cli), queue_depth(qd), + max_message_length(l), message_count(mc), message_left(mc) { + bind_addr.parse(addr.c_str()); + rs.prepare(100); + } + ~StressFactory() { + } + + void add_client(ThreadData *t_data) { + static ceph::mutex lock = ceph::make_mutex("add_client_lock"); + std::lock_guard l{lock}; + ConnectedSocket sock; + int r = t_data->worker->connect(bind_addr, options, &sock); + std::default_random_engine rng(rd()); + std::uniform_int_distribution<> dist( + min_client_send_messages, max_client_send_messages); + ASSERT_EQ(0, r); + int c = dist(rng); + if (c > message_count.load()) + c = message_count.load(); + Client *cb = new Client(this, &t_data->worker->center, std::move(sock), c); + t_data->clients.insert(cb); + message_count -= c; + } + + void drop_client(ThreadData *t_data, Client *c) { + c->close(); + ASSERT_EQ(1U, t_data->clients.erase(c)); + } + + void drop_server(ThreadData *t_data, Server *s) { + s->close(); + ASSERT_EQ(1U, t_data->servers.erase(s)); + } + + void start(Worker *worker) { + int r = 0; + ThreadData t_data; + t_data.worker = worker; + ServerSocket bind_socket; + if (stack->support_local_listen_table() || worker->id == 0) { + r = worker->listen(bind_addr, 0, options, &bind_socket); + ASSERT_EQ(0, r); + already_bind = true; + } + while (!already_bind) + usleep(50); + C_accept *accept_handler = nullptr; + int bind_fd = 0; + if (bind_socket) { + bind_fd = bind_socket.fd(); + accept_handler = new C_accept(this, std::move(bind_socket), &t_data, worker); + ASSERT_EQ(0, worker->center.create_file_event( + bind_fd, EVENT_READABLE, accept_handler)); + } + + int echo_throttle = message_count; + while (message_count > 0 || !t_data.clients.empty() || !t_data.servers.empty()) { + if (message_count > 0 && t_data.clients.size() < client_num && t_data.servers.size() < client_num) + add_client(&t_data); + for (auto &&c : t_data.clients) { + if (c->finish()) { + drop_client(&t_data, c); + break; + } + } + for (auto &&s : t_data.servers) { + if (s->finish()) { + drop_server(&t_data, s); + break; + } + } + + worker->center.process_events(1); + if (echo_throttle > message_left) { + std::cerr << " clients " << t_data.clients.size() << " servers " << t_data.servers.size() + << " message count " << message_left << std::endl; + echo_throttle -= 100; + } + } + if (bind_fd) + worker->center.delete_file_event(bind_fd, EVENT_READABLE); + delete accept_handler; + } +}; + +TEST_P(NetworkWorkerTest, StressTest) { + StressFactory factory(stack, get_addr(), 16, 16, 10000, 1024); + StressFactory *f = &factory; + exec_events([f](Worker *worker) mutable { + f->start(worker); + }); + ASSERT_EQ(0, factory.message_left); +} + + +INSTANTIATE_TEST_SUITE_P( + NetworkStack, + NetworkWorkerTest, + ::testing::Values( +#ifdef HAVE_DPDK + "dpdk", +#endif + "posix" + ) +); + +/* + * Local Variables: + * compile-command: "cd ../.. ; make ceph_test_async_networkstack && + * ./ceph_test_async_networkstack + * + * End: + */ diff --git a/src/test/msgr/test_comp_registry.cc b/src/test/msgr/test_comp_registry.cc new file mode 100644 index 000000000..d5513e2e4 --- /dev/null +++ b/src/test/msgr/test_comp_registry.cc @@ -0,0 +1,98 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "include/types.h" +#include "include/stringify.h" +#include "compressor/Compressor.h" +#include "msg/compressor_registry.h" +#include "gtest/gtest.h" +#include "common/ceph_context.h" +#include "global/global_context.h" + +#include <sstream> + +TEST(CompressorRegistry, con_modes) +{ + auto cct = g_ceph_context; + CompressorRegistry reg(cct); + std::vector<uint32_t> methods; + uint32_t method; + uint32_t mode; + + const std::vector<uint32_t> snappy_method = { Compressor::COMP_ALG_SNAPPY }; + const std::vector<uint32_t> zlib_method = { Compressor::COMP_ALG_ZLIB }; + const std::vector<uint32_t> both_methods = { Compressor::COMP_ALG_ZLIB, Compressor::COMP_ALG_SNAPPY}; + const std::vector<uint32_t> no_method = { Compressor::COMP_ALG_NONE }; + + cct->_conf.set_val( + "enable_experimental_unrecoverable_data_corrupting_features", "*"); + + // baseline: compression for communication with osd is enabled + cct->_set_module_type(CEPH_ENTITY_TYPE_CLIENT); + cct->_conf.set_val("ms_osd_compress_mode", "force"); + cct->_conf.set_val("ms_osd_compression_algorithm", "snappy"); + cct->_conf.set_val("ms_compress_secure", "false"); + cct->_conf.apply_changes(NULL); + + ASSERT_EQ(reg.get_is_compress_secure(), false); + + methods = reg.get_methods(CEPH_ENTITY_TYPE_MON); + ASSERT_EQ(methods.size(), 0); + method = reg.pick_method(CEPH_ENTITY_TYPE_MON, both_methods); + ASSERT_EQ(method, Compressor::COMP_ALG_NONE); + mode = reg.get_mode(CEPH_ENTITY_TYPE_MON, false); + ASSERT_EQ(mode, Compressor::COMP_NONE); + + methods = reg.get_methods(CEPH_ENTITY_TYPE_OSD); + ASSERT_EQ(methods, snappy_method); + const std::vector<uint32_t> rev_both_methods (both_methods.rbegin(), both_methods.rend()); + method = reg.pick_method(CEPH_ENTITY_TYPE_OSD, rev_both_methods); + ASSERT_EQ(method, Compressor::COMP_ALG_SNAPPY); + mode = reg.get_mode(CEPH_ENTITY_TYPE_OSD, false); + ASSERT_EQ(mode, Compressor::COMP_FORCE); + mode = reg.get_mode(CEPH_ENTITY_TYPE_OSD, true); + ASSERT_EQ(mode, Compressor::COMP_NONE); + + method = reg.pick_method(CEPH_ENTITY_TYPE_OSD, zlib_method); + ASSERT_EQ(method, Compressor::COMP_ALG_NONE); + + // disable compression mode + cct->_set_module_type(CEPH_ENTITY_TYPE_CLIENT); + cct->_conf.set_val("ms_osd_compress_mode", "none"); + cct->_conf.apply_changes(NULL); + + mode = reg.get_mode(CEPH_ENTITY_TYPE_OSD, false); + ASSERT_EQ(mode, Compressor::COMP_NONE); + + // no compression methods + cct->_conf.set_val("ms_osd_compress_mode", "force"); + cct->_conf.set_val("ms_osd_compression_algorithm", "none"); + cct->_conf.apply_changes(NULL); + + method = reg.pick_method(CEPH_ENTITY_TYPE_OSD, both_methods); + ASSERT_EQ(method, Compressor::COMP_ALG_NONE); + + // min compression size + cct->_conf.set_val("ms_osd_compress_min_size", "1024"); + cct->_conf.apply_changes(NULL); + + uint32_t s = reg.get_min_compression_size(CEPH_ENTITY_TYPE_OSD); + ASSERT_EQ(s, 1024); + + // allow secure with compression + cct->_conf.set_val("ms_osd_compress_mode", "force"); + cct->_conf.set_val("ms_osd_compression_algorithm", "snappy"); + cct->_conf.set_val("ms_compress_secure", "true"); + cct->_conf.apply_changes(NULL); + + ASSERT_EQ(reg.get_is_compress_secure(), true); + + mode = reg.get_mode(CEPH_ENTITY_TYPE_OSD, true); + ASSERT_EQ(mode, Compressor::COMP_FORCE); + + mode = reg.get_mode(CEPH_ENTITY_TYPE_OSD, false); + ASSERT_EQ(mode, Compressor::COMP_FORCE); + + // back to normalish, for the benefit of the next test(s) + cct->_set_module_type(CEPH_ENTITY_TYPE_CLIENT); +} diff --git a/src/test/msgr/test_frames_v2.cc b/src/test/msgr/test_frames_v2.cc new file mode 100644 index 000000000..c5be32a9c --- /dev/null +++ b/src/test/msgr/test_frames_v2.cc @@ -0,0 +1,483 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2020 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "msg/async/frames_v2.h" + +#include <numeric> +#include <ostream> +#include <string> +#include <tuple> + +#include "msg/async/compression_meta.h" +#include "auth/Auth.h" +#include "common/ceph_argparse.h" +#include "global/global_init.h" +#include "global/global_context.h" +#include "include/Context.h" + +#include <gtest/gtest.h> + +#define COMP_THRESHOLD 1 << 10 +#define EXPECT_COMPRESSED(is_compressed, val1, val2) \ + if (is_compressed && val1 > COMP_THRESHOLD) { \ + EXPECT_GE(val1, val2); \ + } else { \ + EXPECT_EQ(val1, val2); \ + } + +using namespace std; + +namespace ceph::msgr::v2 { + +// MessageFrame with the first segment not fixed to ceph_msg_header2 +struct TestFrame : Frame<TestFrame, + /* four segments */ + segment_t::DEFAULT_ALIGNMENT, + segment_t::DEFAULT_ALIGNMENT, + segment_t::DEFAULT_ALIGNMENT, + segment_t::PAGE_SIZE_ALIGNMENT> { + static constexpr Tag tag = static_cast<Tag>(123); + + static TestFrame Encode(const bufferlist& header, + const bufferlist& front, + const bufferlist& middle, + const bufferlist& data) { + TestFrame f; + f.segments[SegmentIndex::Msg::HEADER] = header; + f.segments[SegmentIndex::Msg::FRONT] = front; + f.segments[SegmentIndex::Msg::MIDDLE] = middle; + f.segments[SegmentIndex::Msg::DATA] = data; + + // discard cached crcs for perf tests + f.segments[SegmentIndex::Msg::HEADER].invalidate_crc(); + f.segments[SegmentIndex::Msg::FRONT].invalidate_crc(); + f.segments[SegmentIndex::Msg::MIDDLE].invalidate_crc(); + f.segments[SegmentIndex::Msg::DATA].invalidate_crc(); + return f; + } + + static TestFrame Decode(segment_bls_t& segment_bls) { + TestFrame f; + // Transfer segments' bufferlists. If segment_bls contains + // less than SegmentsNumV segments, the missing ones will be + // seen as empty. + for (size_t i = 0; i < segment_bls.size(); i++) { + f.segments[i] = std::move(segment_bls[i]); + } + return f; + } + + bufferlist& header() { + return segments[SegmentIndex::Msg::HEADER]; + } + bufferlist& front() { + return segments[SegmentIndex::Msg::FRONT]; + } + bufferlist& middle() { + return segments[SegmentIndex::Msg::MIDDLE]; + } + bufferlist& data() { + return segments[SegmentIndex::Msg::DATA]; + } + +protected: + using Frame::Frame; +}; + +struct mode_t { + bool is_rev1; + bool is_secure; + bool is_compress; +}; + +static std::ostream& operator<<(std::ostream& os, const mode_t& m) { + os << "msgr2." << (m.is_rev1 ? "1" : "0") + << (m.is_secure ? "-secure" : "-crc") + << (m.is_compress ? "-compress": "-nocompress"); + return os; +} + +static const mode_t modes[] = { + {false, false, false}, + {false, true, false}, + {true, false, false}, + {true, true, false}, + {false, false, true}, + {false, true, true}, + {true, false, true}, + {true, true, true} +}; + +struct round_trip_instance_t { + uint32_t header_len; + uint32_t front_len; + uint32_t middle_len; + uint32_t data_len; + + // expected number of segments (same for each mode) + size_t num_segments; + // expected layout (different for each mode) + uint32_t onwire_lens[4][MAX_NUM_SEGMENTS + 2]; +}; + +static std::ostream& operator<<(std::ostream& os, + const round_trip_instance_t& rti) { + os << rti.header_len << "+" << rti.front_len << "+" + << rti.middle_len << "+" << rti.data_len; + return os; +} + +static bufferlist make_bufferlist(size_t len, char c) { + bufferlist bl; + if (len > 0) { + bl.reserve(len); + bl.append(std::string(len, c)); + } + return bl; +} + +bool disassemble_frame(FrameAssembler& frame_asm, bufferlist& frame_bl, + Tag& tag, segment_bls_t& segment_bls) { + bufferlist preamble_bl; + frame_bl.splice(0, frame_asm.get_preamble_onwire_len(), &preamble_bl); + tag = frame_asm.disassemble_preamble(preamble_bl); + + do { + size_t seg_idx = segment_bls.size(); + segment_bls.emplace_back(); + + uint32_t onwire_len = frame_asm.get_segment_onwire_len(seg_idx); + if (onwire_len > 0) { + frame_bl.splice(0, onwire_len, &segment_bls.back()); + } + } while (segment_bls.size() < frame_asm.get_num_segments()); + + bufferlist epilogue_bl; + uint32_t epilogue_onwire_len = frame_asm.get_epilogue_onwire_len(); + if (epilogue_onwire_len > 0) { + frame_bl.splice(0, epilogue_onwire_len, &epilogue_bl); + } + + return frame_asm.disassemble_segments(preamble_bl, segment_bls.data(), epilogue_bl); +} + +class RoundTripTestBase : public ::testing::TestWithParam< + std::tuple<round_trip_instance_t, mode_t>> { +protected: + RoundTripTestBase() + : m_tx_frame_asm(&m_tx_crypto, std::get<1>(GetParam()).is_rev1, true, + &m_tx_comp), + m_rx_frame_asm(&m_rx_crypto, std::get<1>(GetParam()).is_rev1, true, + &m_rx_comp), + m_header(make_bufferlist(std::get<0>(GetParam()).header_len, 'H')), + m_front(make_bufferlist(std::get<0>(GetParam()).front_len, 'F')), + m_middle(make_bufferlist(std::get<0>(GetParam()).middle_len, 'M')), + m_data(make_bufferlist(std::get<0>(GetParam()).data_len, 'D')) { + const auto& m = std::get<1>(GetParam()); + if (m.is_secure) { + AuthConnectionMeta auth_meta; + auth_meta.con_mode = CEPH_CON_MODE_SECURE; + // see AuthConnectionMeta::get_connection_secret_length() + auth_meta.connection_secret.resize(64); + g_ceph_context->random()->get_bytes(auth_meta.connection_secret.data(), + auth_meta.connection_secret.size()); + m_tx_crypto = ceph::crypto::onwire::rxtx_t::create_handler_pair( + g_ceph_context, auth_meta, /*new_nonce_format=*/m.is_rev1, + /*crossed=*/false); + m_rx_crypto = ceph::crypto::onwire::rxtx_t::create_handler_pair( + g_ceph_context, auth_meta, /*new_nonce_format=*/m.is_rev1, + /*crossed=*/true); + } + + if (m.is_compress) { + CompConnectionMeta comp_meta; + comp_meta.con_mode = Compressor::COMP_FORCE; + comp_meta.con_method = Compressor::COMP_ALG_SNAPPY; + m_tx_comp = ceph::compression::onwire::rxtx_t::create_handler_pair( + g_ceph_context, comp_meta, /*min_compress_size=*/COMP_THRESHOLD + ); + m_rx_comp = ceph::compression::onwire::rxtx_t::create_handler_pair( + g_ceph_context, comp_meta, /*min_compress_size=*/COMP_THRESHOLD + ); + } + } + + void check_frame_assembler(const FrameAssembler& frame_asm) { + const auto& [rti, m] = GetParam(); + const auto& onwire_lens = rti.onwire_lens[m.is_rev1 << 1 | m.is_secure]; + + EXPECT_COMPRESSED(m.is_compress, rti.header_len + rti.front_len + rti.middle_len + rti.data_len, + frame_asm.get_frame_logical_len()); + ASSERT_EQ(rti.num_segments, frame_asm.get_num_segments()); + EXPECT_COMPRESSED(m.is_compress, onwire_lens[0], frame_asm.get_preamble_onwire_len()); + for (size_t i = 0; i < rti.num_segments; i++) { + EXPECT_COMPRESSED(m.is_compress, onwire_lens[i + 1], frame_asm.get_segment_onwire_len(i)); + } + EXPECT_COMPRESSED(m.is_compress, onwire_lens[rti.num_segments + 1], + frame_asm.get_epilogue_onwire_len()); + EXPECT_COMPRESSED(m.is_compress, + std::accumulate(std::begin(onwire_lens), std::end(onwire_lens), + uint64_t(0)), + frame_asm.get_frame_onwire_len()); + } + + void test_round_trip() { + auto tx_frame = TestFrame::Encode(m_header, m_front, m_middle, m_data); + auto onwire_bl = tx_frame.get_buffer(m_tx_frame_asm); + check_frame_assembler(m_tx_frame_asm); + EXPECT_EQ(m_tx_frame_asm.get_frame_onwire_len(), onwire_bl.length()); + + Tag rx_tag; + segment_bls_t rx_segment_bls; + EXPECT_TRUE(disassemble_frame(m_rx_frame_asm, onwire_bl, rx_tag, + rx_segment_bls)); + check_frame_assembler(m_rx_frame_asm); + EXPECT_EQ(0, onwire_bl.length()); + EXPECT_EQ(TestFrame::tag, rx_tag); + EXPECT_EQ(m_rx_frame_asm.get_num_segments(), rx_segment_bls.size()); + + auto rx_frame = TestFrame::Decode(rx_segment_bls); + EXPECT_TRUE(m_header.contents_equal(rx_frame.header())); + EXPECT_TRUE(m_front.contents_equal(rx_frame.front())); + EXPECT_TRUE(m_middle.contents_equal(rx_frame.middle())); + EXPECT_TRUE(m_data.contents_equal(rx_frame.data())); + } + + ceph::crypto::onwire::rxtx_t m_tx_crypto; + ceph::crypto::onwire::rxtx_t m_rx_crypto; + ceph::compression::onwire::rxtx_t m_tx_comp; + ceph::compression::onwire::rxtx_t m_rx_comp; + FrameAssembler m_tx_frame_asm; + FrameAssembler m_rx_frame_asm; + + const bufferlist m_header; + const bufferlist m_front; + const bufferlist m_middle; + const bufferlist m_data; +}; + +class RoundTripTest : public RoundTripTestBase {}; + +TEST_P(RoundTripTest, Basic) { + test_round_trip(); +} + +TEST_P(RoundTripTest, Reuse) { + for (int i = 0; i < 3; i++) { + test_round_trip(); + } +} + +static const round_trip_instance_t round_trip_instances[] = { + // first segment is empty + { 0, 0, 0, 0, 1, {{32, 0, 17, 0, 0, 0}, + {32, 0, 32, 0, 0, 0}, + {32, 0, 0, 0, 0, 0}, + {96, 0, 0, 0, 0, 0}}}, + { 0, 0, 0, 303, 4, {{32, 0, 0, 0, 303, 17}, + {32, 0, 0, 0, 304, 32}, + {32, 0, 0, 0, 303, 13}, + {96, 0, 0, 0, 304, 32}}}, + { 0, 0, 202, 0, 3, {{32, 0, 0, 202, 17, 0}, + {32, 0, 0, 208, 32, 0}, + {32, 0, 0, 202, 13, 0}, + {96, 0, 0, 208, 32, 0}}}, + { 0, 0, 202, 303, 4, {{32, 0, 0, 202, 303, 17}, + {32, 0, 0, 208, 304, 32}, + {32, 0, 0, 202, 303, 13}, + {96, 0, 0, 208, 304, 32}}}, + { 0, 101, 0, 0, 2, {{32, 0, 101, 17, 0, 0}, + {32, 0, 112, 32, 0, 0}, + {32, 0, 101, 13, 0, 0}, + {96, 0, 112, 32, 0, 0}}}, + { 0, 101, 0, 303, 4, {{32, 0, 101, 0, 303, 17}, + {32, 0, 112, 0, 304, 32}, + {32, 0, 101, 0, 303, 13}, + {96, 0, 112, 0, 304, 32}}}, + { 0, 101, 202, 0, 3, {{32, 0, 101, 202, 17, 0}, + {32, 0, 112, 208, 32, 0}, + {32, 0, 101, 202, 13, 0}, + {96, 0, 112, 208, 32, 0}}}, + { 0, 101, 202, 303, 4, {{32, 0, 101, 202, 303, 17}, + {32, 0, 112, 208, 304, 32}, + {32, 0, 101, 202, 303, 13}, + {96, 0, 112, 208, 304, 32}}}, + + // first segment is fully inlined, inline buffer is not full + { 1, 0, 0, 0, 1, {{32, 1, 17, 0, 0, 0}, + {32, 16, 32, 0, 0, 0}, + {32, 5, 0, 0, 0, 0}, + {96, 0, 0, 0, 0, 0}}}, + { 1, 0, 0, 303, 4, {{32, 1, 0, 0, 303, 17}, + {32, 16, 0, 0, 304, 32}, + {32, 5, 0, 0, 303, 13}, + {96, 0, 0, 0, 304, 32}}}, + { 1, 0, 202, 0, 3, {{32, 1, 0, 202, 17, 0}, + {32, 16, 0, 208, 32, 0}, + {32, 5, 0, 202, 13, 0}, + {96, 0, 0, 208, 32, 0}}}, + { 1, 0, 202, 303, 4, {{32, 1, 0, 202, 303, 17}, + {32, 16, 0, 208, 304, 32}, + {32, 5, 0, 202, 303, 13}, + {96, 0, 0, 208, 304, 32}}}, + { 1, 101, 0, 0, 2, {{32, 1, 101, 17, 0, 0}, + {32, 16, 112, 32, 0, 0}, + {32, 5, 101, 13, 0, 0}, + {96, 0, 112, 32, 0, 0}}}, + { 1, 101, 0, 303, 4, {{32, 1, 101, 0, 303, 17}, + {32, 16, 112, 0, 304, 32}, + {32, 5, 101, 0, 303, 13}, + {96, 0, 112, 0, 304, 32}}}, + { 1, 101, 202, 0, 3, {{32, 1, 101, 202, 17, 0}, + {32, 16, 112, 208, 32, 0}, + {32, 5, 101, 202, 13, 0}, + {96, 0, 112, 208, 32, 0}}}, + { 1, 101, 202, 303, 4, {{32, 1, 101, 202, 303, 17}, + {32, 16, 112, 208, 304, 32}, + {32, 5, 101, 202, 303, 13}, + {96, 0, 112, 208, 304, 32}}}, + + // first segment is fully inlined, inline buffer is full + {48, 0, 0, 0, 1, {{32, 48, 17, 0, 0, 0}, + {32, 48, 32, 0, 0, 0}, + {32, 52, 0, 0, 0, 0}, + {96, 0, 0, 0, 0, 0}}}, + {48, 0, 0, 303, 4, {{32, 48, 0, 0, 303, 17}, + {32, 48, 0, 0, 304, 32}, + {32, 52, 0, 0, 303, 13}, + {96, 0, 0, 0, 304, 32}}}, + {48, 0, 202, 0, 3, {{32, 48, 0, 202, 17, 0}, + {32, 48, 0, 208, 32, 0}, + {32, 52, 0, 202, 13, 0}, + {96, 0, 0, 208, 32, 0}}}, + {48, 0, 202, 303, 4, {{32, 48, 0, 202, 303, 17}, + {32, 48, 0, 208, 304, 32}, + {32, 52, 0, 202, 303, 13}, + {96, 0, 0, 208, 304, 32}}}, + {48, 101, 0, 0, 2, {{32, 48, 101, 17, 0, 0}, + {32, 48, 112, 32, 0, 0}, + {32, 52, 101, 13, 0, 0}, + {96, 0, 112, 32, 0, 0}}}, + {48, 101, 0, 303, 4, {{32, 48, 101, 0, 303, 17}, + {32, 48, 112, 0, 304, 32}, + {32, 52, 101, 0, 303, 13}, + {96, 0, 112, 0, 304, 32}}}, + {48, 101, 202, 0, 3, {{32, 48, 101, 202, 17, 0}, + {32, 48, 112, 208, 32, 0}, + {32, 52, 101, 202, 13, 0}, + {96, 0, 112, 208, 32, 0}}}, + {48, 101, 202, 303, 4, {{32, 48, 101, 202, 303, 17}, + {32, 48, 112, 208, 304, 32}, + {32, 52, 101, 202, 303, 13}, + {96, 0, 112, 208, 304, 32}}}, + + // first segment is partially inlined + {49, 0, 0, 0, 1, {{32, 49, 17, 0, 0, 0}, + {32, 64, 32, 0, 0, 0}, + {32, 53, 0, 0, 0, 0}, + {96, 32, 0, 0, 0, 0}}}, + {49, 0, 0, 303, 4, {{32, 49, 0, 0, 303, 17}, + {32, 64, 0, 0, 304, 32}, + {32, 53, 0, 0, 303, 13}, + {96, 32, 0, 0, 304, 32}}}, + {49, 0, 202, 0, 3, {{32, 49, 0, 202, 17, 0}, + {32, 64, 0, 208, 32, 0}, + {32, 53, 0, 202, 13, 0}, + {96, 32, 0, 208, 32, 0}}}, + {49, 0, 202, 303, 4, {{32, 49, 0, 202, 303, 17}, + {32, 64, 0, 208, 304, 32}, + {32, 53, 0, 202, 303, 13}, + {96, 32, 0, 208, 304, 32}}}, + {49, 101, 0, 0, 2, {{32, 49, 101, 17, 0, 0}, + {32, 64, 112, 32, 0, 0}, + {32, 53, 101, 13, 0, 0}, + {96, 32, 112, 32, 0, 0}}}, + {49, 101, 0, 303, 4, {{32, 49, 101, 0, 303, 17}, + {32, 64, 112, 0, 304, 32}, + {32, 53, 101, 0, 303, 13}, + {96, 32, 112, 0, 304, 32}}}, + {49, 101, 202, 0, 3, {{32, 49, 101, 202, 17, 0}, + {32, 64, 112, 208, 32, 0}, + {32, 53, 101, 202, 13, 0}, + {96, 32, 112, 208, 32, 0}}}, + {49, 101, 202, 303, 4, {{32, 49, 101, 202, 303, 17}, + {32, 64, 112, 208, 304, 32}, + {32, 53, 101, 202, 303, 13}, + {96, 32, 112, 208, 304, 32}}}, +}; + +INSTANTIATE_TEST_SUITE_P( + RoundTripTests, RoundTripTest, ::testing::Combine( + ::testing::ValuesIn(round_trip_instances), + ::testing::ValuesIn(modes))); + +class RoundTripPerfTest : public RoundTripTestBase {}; + +TEST_P(RoundTripPerfTest, DISABLED_Basic) { + for (int i = 0; i < 100000; i++) { + auto tx_frame = TestFrame::Encode(m_header, m_front, m_middle, m_data); + auto onwire_bl = tx_frame.get_buffer(m_tx_frame_asm); + + Tag rx_tag; + segment_bls_t rx_segment_bls; + ASSERT_TRUE(disassemble_frame(m_rx_frame_asm, onwire_bl, rx_tag, + rx_segment_bls)); + } +} + +static const round_trip_instance_t round_trip_perf_instances[] = { + {41, 250, 0, 0, 2, {{32, 41, 250, 17, 0, 0}, + {32, 48, 256, 32, 0, 0}, + {32, 45, 250, 13, 0, 0}, + {96, 0, 256, 32, 0, 0}}}, + {41, 250, 0, 512, 4, {{32, 41, 250, 0, 512, 17}, + {32, 48, 256, 0, 512, 32}, + {32, 45, 250, 0, 512, 13}, + {96, 0, 256, 0, 512, 32}}}, + {41, 250, 0, 4096, 4, {{32, 41, 250, 0, 4096, 17}, + {32, 48, 256, 0, 4096, 32}, + {32, 45, 250, 0, 4096, 13}, + {96, 0, 256, 0, 4096, 32}}}, + {41, 250, 0, 32768, 4, {{32, 41, 250, 0, 32768, 17}, + {32, 48, 256, 0, 32768, 32}, + {32, 45, 250, 0, 32768, 13}, + {96, 0, 256, 0, 32768, 32}}}, + {41, 250, 0, 131072, 4, {{32, 41, 250, 0, 131072, 17}, + {32, 48, 256, 0, 131072, 32}, + {32, 45, 250, 0, 131072, 13}, + {96, 0, 256, 0, 131072, 32}}}, + {41, 250, 0, 4194304, 4, {{32, 41, 250, 0, 4194304, 17}, + {32, 48, 256, 0, 4194304, 32}, + {32, 45, 250, 0, 4194304, 13}, + {96, 0, 256, 0, 4194304, 32}}}, +}; + +INSTANTIATE_TEST_SUITE_P( + RoundTripPerfTests, RoundTripPerfTest, ::testing::Combine( + ::testing::ValuesIn(round_trip_perf_instances), + ::testing::ValuesIn(modes))); + +} // namespace ceph::msgr::v2 + +int main(int argc, char* argv[]) { + auto args = argv_to_vec(argc, argv); + + auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT, + CODE_ENVIRONMENT_UTILITY, + CINIT_FLAG_NO_DEFAULT_CONFIG_FILE); + common_init_finish(g_ceph_context); + + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc new file mode 100644 index 000000000..f702cc288 --- /dev/null +++ b/src/test/msgr/test_msgr.cc @@ -0,0 +1,2424 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com> + * + * Author: Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include <atomic> +#include <iostream> +#include <list> +#include <memory> +#include <set> +#include <stdlib.h> +#include <time.h> +#include <unistd.h> + +#include <boost/random/binomial_distribution.hpp> +#include <boost/random/mersenne_twister.hpp> +#include <boost/random/uniform_int.hpp> +#include <gtest/gtest.h> + +#define MSG_POLICY_UNIT_TESTING + +#include "common/ceph_argparse.h" +#include "common/ceph_mutex.h" +#include "global/global_init.h" +#include "messages/MCommand.h" +#include "messages/MPing.h" +#include "msg/Connection.h" +#include "msg/Dispatcher.h" +#include "msg/Message.h" +#include "msg/Messenger.h" +#include "msg/msg_types.h" + +typedef boost::mt11213b gen_type; + +#include "common/dout.h" +#include "include/ceph_assert.h" + +#include "auth/DummyAuth.h" + +#define dout_subsys ceph_subsys_ms +#undef dout_prefix +#define dout_prefix *_dout << " ceph_test_msgr " + + +#define CHECK_AND_WAIT_TRUE(expr) do { \ + int n = 1000; \ + while (--n) { \ + if (expr) \ + break; \ + usleep(1000); \ + } \ +} while(0); + +using namespace std; + +class MessengerTest : public ::testing::TestWithParam<const char*> { + public: + DummyAuthClientServer dummy_auth; + Messenger *server_msgr; + Messenger *client_msgr; + + MessengerTest() : dummy_auth(g_ceph_context), + server_msgr(NULL), client_msgr(NULL) { + dummy_auth.auth_registry.refresh_config(); + } + void SetUp() override { + lderr(g_ceph_context) << __func__ << " start set up " << GetParam() << dendl; + server_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid()); + client_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid()); + server_msgr->set_default_policy(Messenger::Policy::stateless_server(0)); + client_msgr->set_default_policy(Messenger::Policy::lossy_client(0)); + server_msgr->set_auth_client(&dummy_auth); + server_msgr->set_auth_server(&dummy_auth); + client_msgr->set_auth_client(&dummy_auth); + client_msgr->set_auth_server(&dummy_auth); + server_msgr->set_require_authorizer(false); + } + void TearDown() override { + ASSERT_EQ(server_msgr->get_dispatch_queue_len(), 0); + ASSERT_EQ(client_msgr->get_dispatch_queue_len(), 0); + delete server_msgr; + delete client_msgr; + } + +}; + + +class FakeDispatcher : public Dispatcher { + public: + struct Session : public RefCountedObject { + atomic<uint64_t> count; + ConnectionRef con; + + explicit Session(ConnectionRef c): RefCountedObject(g_ceph_context), count(0), con(c) { + } + uint64_t get_count() { return count; } + }; + + ceph::mutex lock = ceph::make_mutex("FakeDispatcher::lock"); + ceph::condition_variable cond; + bool is_server; + bool got_new; + bool got_remote_reset; + bool got_connect; + bool loopback; + entity_addrvec_t last_accept; + ConnectionRef *last_accept_con_ptr = nullptr; + + explicit FakeDispatcher(bool s): Dispatcher(g_ceph_context), + is_server(s), got_new(false), got_remote_reset(false), + got_connect(false), loopback(false) { + } + bool ms_can_fast_dispatch_any() const override { return true; } + bool ms_can_fast_dispatch(const Message *m) const override { + switch (m->get_type()) { + case CEPH_MSG_PING: + return true; + default: + return false; + } + } + + void ms_handle_fast_connect(Connection *con) override { + std::scoped_lock l{lock}; + lderr(g_ceph_context) << __func__ << " " << con << dendl; + auto s = con->get_priv(); + if (!s) { + auto session = new Session(con); + con->set_priv(RefCountedPtr{session, false}); + lderr(g_ceph_context) << __func__ << " con: " << con + << " count: " << session->count << dendl; + } + got_connect = true; + cond.notify_all(); + } + void ms_handle_fast_accept(Connection *con) override { + last_accept = con->get_peer_addrs(); + if (last_accept_con_ptr) { + *last_accept_con_ptr = con; + } + if (!con->get_priv()) { + con->set_priv(RefCountedPtr{new Session(con), false}); + } + } + bool ms_dispatch(Message *m) override { + auto priv = m->get_connection()->get_priv(); + auto s = static_cast<Session*>(priv.get()); + if (!s) { + s = new Session(m->get_connection()); + priv.reset(s, false); + m->get_connection()->set_priv(priv); + } + s->count++; + lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl; + if (is_server) { + reply_message(m); + } + std::lock_guard l{lock}; + got_new = true; + cond.notify_all(); + m->put(); + return true; + } + bool ms_handle_reset(Connection *con) override { + std::lock_guard l{lock}; + lderr(g_ceph_context) << __func__ << " " << con << dendl; + auto priv = con->get_priv(); + if (auto s = static_cast<Session*>(priv.get()); s) { + s->con.reset(); // break con <-> session ref cycle + con->set_priv(nullptr); // break ref <-> session cycle, if any + } + return true; + } + void ms_handle_remote_reset(Connection *con) override { + std::lock_guard l{lock}; + lderr(g_ceph_context) << __func__ << " " << con << dendl; + auto priv = con->get_priv(); + if (auto s = static_cast<Session*>(priv.get()); s) { + s->con.reset(); // break con <-> session ref cycle + con->set_priv(nullptr); // break ref <-> session cycle, if any + } + got_remote_reset = true; + cond.notify_all(); + } + bool ms_handle_refused(Connection *con) override { + return false; + } + void ms_fast_dispatch(Message *m) override { + auto priv = m->get_connection()->get_priv(); + auto s = static_cast<Session*>(priv.get()); + if (!s) { + s = new Session(m->get_connection()); + priv.reset(s, false); + m->get_connection()->set_priv(priv); + } + s->count++; + lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl; + if (is_server) { + if (loopback) + ceph_assert(m->get_source().is_osd()); + else + reply_message(m); + } else if (loopback) { + ceph_assert(m->get_source().is_client()); + } + m->put(); + std::lock_guard l{lock}; + got_new = true; + cond.notify_all(); + } + + int ms_handle_fast_authentication(Connection *con) override { + return 1; + } + + void reply_message(Message *m) { + MPing *rm = new MPing(); + m->get_connection()->send_message(rm); + } +}; + +typedef FakeDispatcher::Session Session; + +struct TestInterceptor : public Interceptor { + + bool step_waiting = false; + bool waiting = true; + std::map<Connection *, uint32_t> current_step; + std::map<Connection *, std::list<uint32_t>> step_history; + std::map<uint32_t, std::optional<ACTION>> decisions; + std::set<uint32_t> breakpoints; + + uint32_t count_step(Connection *conn, uint32_t step) { + uint32_t count = 0; + for (auto s : step_history[conn]) { + if (s == step) { + count++; + } + } + return count; + } + + void breakpoint(uint32_t step) { + breakpoints.insert(step); + } + + void remove_bp(uint32_t step) { + breakpoints.erase(step); + } + + Connection *wait(uint32_t step, Connection *conn=nullptr) { + std::unique_lock<std::mutex> l(lock); + while(true) { + if (conn) { + auto it = current_step.find(conn); + if (it != current_step.end()) { + if (it->second == step) { + break; + } + } + } else { + for (auto it : current_step) { + if (it.second == step) { + conn = it.first; + break; + } + } + if (conn) { + break; + } + } + step_waiting = true; + cond_var.wait(l); + } + step_waiting = false; + return conn; + } + + ACTION wait_for_decision(uint32_t step, std::unique_lock<std::mutex> &l) { + if (decisions[step]) { + return *(decisions[step]); + } + waiting = true; + cond_var.wait(l, [this] { return !waiting; }); + return *(decisions[step]); + } + + void proceed(uint32_t step, ACTION decision) { + std::unique_lock<std::mutex> l(lock); + decisions[step] = decision; + if (waiting) { + waiting = false; + cond_var.notify_one(); + } + } + + ACTION intercept(Connection *conn, uint32_t step) override { + lderr(g_ceph_context) << __func__ << " conn(" << conn + << ") intercept called on step=" << step << dendl; + + { + std::unique_lock<std::mutex> l(lock); + step_history[conn].push_back(step); + current_step[conn] = step; + if (step_waiting) { + cond_var.notify_one(); + } + } + + std::unique_lock<std::mutex> l(lock); + ACTION decision = ACTION::CONTINUE; + if (breakpoints.find(step) != breakpoints.end()) { + lderr(g_ceph_context) << __func__ << " conn(" << conn + << ") pausing on step=" << step << dendl; + decision = wait_for_decision(step, l); + } else { + if (decisions[step]) { + decision = *(decisions[step]); + } + } + lderr(g_ceph_context) << __func__ << " conn(" << conn + << ") resuming step=" << step << " with decision=" + << decision << dendl; + decisions[step].reset(); + return decision; + } + +}; + +/** + * Scenario: A connects to B, and B connects to A at the same time. + */ +TEST_P(MessengerTest, ConnectionRaceTest) { + FakeDispatcher cli_dispatcher(false), srv_dispatcher(false); + + TestInterceptor *cli_interceptor = new TestInterceptor(); + TestInterceptor *srv_interceptor = new TestInterceptor(); + + server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer_reuse(0)); + server_msgr->interceptor = srv_interceptor; + + client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer_reuse(0)); + client_msgr->interceptor = cli_interceptor; + + entity_addr_t bind_addr; + bind_addr.parse("v2:127.0.0.1:3300"); + server_msgr->bind(bind_addr); + server_msgr->add_dispatcher_head(&srv_dispatcher); + server_msgr->start(); + + bind_addr.parse("v2:127.0.0.1:3301"); + client_msgr->bind(bind_addr); + client_msgr->add_dispatcher_head(&cli_dispatcher); + client_msgr->start(); + + // pause before sending client_ident message + cli_interceptor->breakpoint(Interceptor::STEP::SEND_CLIENT_IDENTITY); + // pause before sending client_ident message + srv_interceptor->breakpoint(Interceptor::STEP::SEND_CLIENT_IDENTITY); + + ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); + MPing *m1 = new MPing(); + ASSERT_EQ(c2s->send_message(m1), 0); + + ConnectionRef s2c = server_msgr->connect_to(client_msgr->get_mytype(), + client_msgr->get_myaddrs()); + MPing *m2 = new MPing(); + ASSERT_EQ(s2c->send_message(m2), 0); + + cli_interceptor->wait(Interceptor::STEP::SEND_CLIENT_IDENTITY, c2s.get()); + srv_interceptor->wait(Interceptor::STEP::SEND_CLIENT_IDENTITY, s2c.get()); + + // at this point both connections (A->B, B->A) are paused just before sending + // the client_ident message. + + cli_interceptor->remove_bp(Interceptor::STEP::SEND_CLIENT_IDENTITY); + srv_interceptor->remove_bp(Interceptor::STEP::SEND_CLIENT_IDENTITY); + + cli_interceptor->proceed(Interceptor::STEP::SEND_CLIENT_IDENTITY, Interceptor::ACTION::CONTINUE); + srv_interceptor->proceed(Interceptor::STEP::SEND_CLIENT_IDENTITY, Interceptor::ACTION::CONTINUE); + + { + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + + { + std::unique_lock l{srv_dispatcher.lock}; + srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; }); + srv_dispatcher.got_new = false; + } + + ASSERT_TRUE(s2c->is_connected()); + ASSERT_EQ(1u, static_cast<Session*>(s2c->get_priv().get())->get_count()); + ASSERT_TRUE(s2c->peer_is_client()); + + ASSERT_TRUE(c2s->is_connected()); + ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count()); + ASSERT_TRUE(c2s->peer_is_osd()); + + client_msgr->shutdown(); + client_msgr->wait(); + server_msgr->shutdown(); + server_msgr->wait(); + + delete cli_interceptor; + delete srv_interceptor; +} + +/** + * Scenario: A connects to B, and B connects to A at the same time. + * The first (A -> B) connection gets to message flow handshake, the + * second (B -> A) connection is stuck waiting for a banner from A. + * After A sends client_ident to B, the first connection wins and B + * calls reuse_connection() to replace the second connection's socket + * while the second connection is still in BANNER_CONNECTING. + */ +TEST_P(MessengerTest, ConnectionRaceReuseBannerTest) { + FakeDispatcher cli_dispatcher(false), srv_dispatcher(false); + + auto cli_interceptor = std::make_unique<TestInterceptor>(); + auto srv_interceptor = std::make_unique<TestInterceptor>(); + + server_msgr->set_policy(entity_name_t::TYPE_CLIENT, + Messenger::Policy::lossless_peer_reuse(0)); + server_msgr->interceptor = srv_interceptor.get(); + + client_msgr->set_policy(entity_name_t::TYPE_OSD, + Messenger::Policy::lossless_peer_reuse(0)); + client_msgr->interceptor = cli_interceptor.get(); + + entity_addr_t bind_addr; + bind_addr.parse("v2:127.0.0.1:3300"); + server_msgr->bind(bind_addr); + server_msgr->add_dispatcher_head(&srv_dispatcher); + server_msgr->start(); + + bind_addr.parse("v2:127.0.0.1:3301"); + client_msgr->bind(bind_addr); + client_msgr->add_dispatcher_head(&cli_dispatcher); + client_msgr->start(); + + // pause before sending client_ident message + srv_interceptor->breakpoint(Interceptor::STEP::SEND_CLIENT_IDENTITY); + + ConnectionRef s2c = server_msgr->connect_to(client_msgr->get_mytype(), + client_msgr->get_myaddrs()); + MPing *m1 = new MPing(); + ASSERT_EQ(s2c->send_message(m1), 0); + + srv_interceptor->wait(Interceptor::STEP::SEND_CLIENT_IDENTITY); + srv_interceptor->remove_bp(Interceptor::STEP::SEND_CLIENT_IDENTITY); + + // pause before sending banner + cli_interceptor->breakpoint(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING); + + ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); + MPing *m2 = new MPing(); + ASSERT_EQ(c2s->send_message(m2), 0); + + cli_interceptor->wait(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING); + cli_interceptor->remove_bp(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING); + + // second connection is in BANNER_CONNECTING, ensure it stays so + // and send client_ident + srv_interceptor->breakpoint(Interceptor::STEP::BANNER_EXCHANGE); + srv_interceptor->proceed(Interceptor::STEP::SEND_CLIENT_IDENTITY, Interceptor::ACTION::CONTINUE); + + // handle client_ident -- triggers reuse_connection() with exproto + // in BANNER_CONNECTING + cli_interceptor->breakpoint(Interceptor::STEP::READY); + cli_interceptor->proceed(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING, Interceptor::ACTION::CONTINUE); + + cli_interceptor->wait(Interceptor::STEP::READY); + cli_interceptor->remove_bp(Interceptor::STEP::READY); + + // first connection is in READY + Connection *s2c_accepter = srv_interceptor->wait(Interceptor::STEP::BANNER_EXCHANGE); + srv_interceptor->remove_bp(Interceptor::STEP::BANNER_EXCHANGE); + + srv_interceptor->proceed(Interceptor::STEP::BANNER_EXCHANGE, Interceptor::ACTION::CONTINUE); + cli_interceptor->proceed(Interceptor::STEP::READY, Interceptor::ACTION::CONTINUE); + + { + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + + { + std::unique_lock l{srv_dispatcher.lock}; + srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; }); + srv_dispatcher.got_new = false; + } + + EXPECT_TRUE(s2c->is_connected()); + EXPECT_EQ(1u, static_cast<Session*>(s2c->get_priv().get())->get_count()); + EXPECT_TRUE(s2c->peer_is_client()); + + EXPECT_TRUE(c2s->is_connected()); + EXPECT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count()); + EXPECT_TRUE(c2s->peer_is_osd()); + + // closed in reuse_connection() -- EPIPE when writing banner/hello + EXPECT_FALSE(s2c_accepter->is_connected()); + + // established exactly once, never faulted and reconnected + EXPECT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::START_CLIENT_BANNER_EXCHANGE), 1u); + EXPECT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT), 0u); + EXPECT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::READY), 1u); + + client_msgr->shutdown(); + client_msgr->wait(); + server_msgr->shutdown(); + server_msgr->wait(); +} + +/** + * Scenario: + * - A connects to B + * - A sends client_ident to B + * - B fails before sending server_ident to A + * - A reconnects + */ +TEST_P(MessengerTest, MissingServerIdenTest) { + FakeDispatcher cli_dispatcher(false), srv_dispatcher(false); + + TestInterceptor *cli_interceptor = new TestInterceptor(); + TestInterceptor *srv_interceptor = new TestInterceptor(); + + server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::stateful_server(0)); + server_msgr->interceptor = srv_interceptor; + + client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossy_client(0)); + client_msgr->interceptor = cli_interceptor; + + entity_addr_t bind_addr; + bind_addr.parse("v2:127.0.0.1:3300"); + server_msgr->bind(bind_addr); + server_msgr->add_dispatcher_head(&srv_dispatcher); + server_msgr->start(); + + bind_addr.parse("v2:127.0.0.1:3301"); + client_msgr->bind(bind_addr); + client_msgr->add_dispatcher_head(&cli_dispatcher); + client_msgr->start(); + + // pause before sending server_ident message + srv_interceptor->breakpoint(Interceptor::STEP::SEND_SERVER_IDENTITY); + + ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); + MPing *m1 = new MPing(); + ASSERT_EQ(c2s->send_message(m1), 0); + + Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::SEND_SERVER_IDENTITY); + srv_interceptor->remove_bp(Interceptor::STEP::SEND_SERVER_IDENTITY); + + // We inject a message from this side of the connection to force it to be + // in standby when we inject the failure below + MPing *m2 = new MPing(); + ASSERT_EQ(c2s_accepter->send_message(m2), 0); + + srv_interceptor->proceed(Interceptor::STEP::SEND_SERVER_IDENTITY, Interceptor::ACTION::FAIL); + + { + std::unique_lock l{srv_dispatcher.lock}; + srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; }); + srv_dispatcher.got_new = false; + } + + { + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + + ASSERT_TRUE(c2s->is_connected()); + ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count()); + ASSERT_TRUE(c2s->peer_is_osd()); + + ASSERT_TRUE(c2s_accepter->is_connected()); + ASSERT_EQ(1u, static_cast<Session*>(c2s_accepter->get_priv().get())->get_count()); + ASSERT_TRUE(c2s_accepter->peer_is_client()); + + client_msgr->shutdown(); + client_msgr->wait(); + server_msgr->shutdown(); + server_msgr->wait(); + + delete cli_interceptor; + delete srv_interceptor; +} + +/** + * Scenario: + * - A connects to B + * - A sends client_ident to B + * - B fails before sending server_ident to A + * - A goes to standby + * - B reconnects to A + */ +TEST_P(MessengerTest, MissingServerIdenTest2) { + FakeDispatcher cli_dispatcher(false), srv_dispatcher(false); + + TestInterceptor *cli_interceptor = new TestInterceptor(); + TestInterceptor *srv_interceptor = new TestInterceptor(); + + server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer(0)); + server_msgr->interceptor = srv_interceptor; + + client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0)); + client_msgr->interceptor = cli_interceptor; + + entity_addr_t bind_addr; + bind_addr.parse("v2:127.0.0.1:3300"); + server_msgr->bind(bind_addr); + server_msgr->add_dispatcher_head(&srv_dispatcher); + server_msgr->start(); + + bind_addr.parse("v2:127.0.0.1:3301"); + client_msgr->bind(bind_addr); + client_msgr->add_dispatcher_head(&cli_dispatcher); + client_msgr->start(); + + // pause before sending server_ident message + srv_interceptor->breakpoint(Interceptor::STEP::SEND_SERVER_IDENTITY); + + ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); + + Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::SEND_SERVER_IDENTITY); + srv_interceptor->remove_bp(Interceptor::STEP::SEND_SERVER_IDENTITY); + + // We inject a message from this side of the connection to force it to be + // in standby when we inject the failure below + MPing *m2 = new MPing(); + ASSERT_EQ(c2s_accepter->send_message(m2), 0); + + srv_interceptor->proceed(Interceptor::STEP::SEND_SERVER_IDENTITY, Interceptor::ACTION::FAIL); + + { + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + + ASSERT_TRUE(c2s->is_connected()); + ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count()); + ASSERT_TRUE(c2s->peer_is_osd()); + + ASSERT_TRUE(c2s_accepter->is_connected()); + ASSERT_EQ(0u, static_cast<Session*>(c2s_accepter->get_priv().get())->get_count()); + ASSERT_TRUE(c2s_accepter->peer_is_client()); + + client_msgr->shutdown(); + client_msgr->wait(); + server_msgr->shutdown(); + server_msgr->wait(); + + delete cli_interceptor; + delete srv_interceptor; +} + +/** + * Scenario: + * - A connects to B + * - A and B exchange messages + * - A fails + * - B goes into standby + * - A reconnects + */ +TEST_P(MessengerTest, ReconnectTest) { + FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); + + TestInterceptor *cli_interceptor = new TestInterceptor(); + TestInterceptor *srv_interceptor = new TestInterceptor(); + + server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::stateful_server(0)); + server_msgr->interceptor = srv_interceptor; + + client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0)); + client_msgr->interceptor = cli_interceptor; + + entity_addr_t bind_addr; + bind_addr.parse("v2:127.0.0.1:3300"); + server_msgr->bind(bind_addr); + server_msgr->add_dispatcher_head(&srv_dispatcher); + server_msgr->start(); + + bind_addr.parse("v2:127.0.0.1:3301"); + client_msgr->bind(bind_addr); + client_msgr->add_dispatcher_head(&cli_dispatcher); + client_msgr->start(); + + ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); + + MPing *m1 = new MPing(); + ASSERT_EQ(c2s->send_message(m1), 0); + + { + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + + ASSERT_TRUE(c2s->is_connected()); + ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count()); + ASSERT_TRUE(c2s->peer_is_osd()); + + cli_interceptor->breakpoint(Interceptor::STEP::HANDLE_MESSAGE); + + MPing *m2 = new MPing(); + ASSERT_EQ(c2s->send_message(m2), 0); + + cli_interceptor->wait(Interceptor::STEP::HANDLE_MESSAGE, c2s.get()); + cli_interceptor->remove_bp(Interceptor::STEP::HANDLE_MESSAGE); + + // at this point client and server are connected together + + srv_interceptor->breakpoint(Interceptor::STEP::READY); + + // failing client + cli_interceptor->proceed(Interceptor::STEP::HANDLE_MESSAGE, Interceptor::ACTION::FAIL); + + MPing *m3 = new MPing(); + ASSERT_EQ(c2s->send_message(m3), 0); + + Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::READY); + // the srv end of theconnection is now paused at ready + // this means that the reconnect was successful + srv_interceptor->remove_bp(Interceptor::STEP::READY); + + ASSERT_TRUE(c2s_accepter->peer_is_client()); + // c2s_accepter sent 0 reconnect messages + ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT), 0u); + // c2s_accepter sent 1 reconnect_ok messages + ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT_OK), 1u); + // c2s sent 1 reconnect messages + ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT), 1u); + // c2s sent 0 reconnect_ok messages + ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT_OK), 0u); + + srv_interceptor->proceed(15, Interceptor::ACTION::CONTINUE); + + { + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + + client_msgr->shutdown(); + client_msgr->wait(); + server_msgr->shutdown(); + server_msgr->wait(); + + delete cli_interceptor; + delete srv_interceptor; +} + +/** + * Scenario: + * - A connects to B + * - A and B exchange messages + * - A fails + * - A reconnects // B reconnects + */ +TEST_P(MessengerTest, ReconnectRaceTest) { + FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); + + TestInterceptor *cli_interceptor = new TestInterceptor(); + TestInterceptor *srv_interceptor = new TestInterceptor(); + + server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer(0)); + server_msgr->interceptor = srv_interceptor; + + client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0)); + client_msgr->interceptor = cli_interceptor; + + entity_addr_t bind_addr; + bind_addr.parse("v2:127.0.0.1:3300"); + server_msgr->bind(bind_addr); + server_msgr->add_dispatcher_head(&srv_dispatcher); + server_msgr->start(); + + bind_addr.parse("v2:127.0.0.1:3301"); + client_msgr->bind(bind_addr); + client_msgr->add_dispatcher_head(&cli_dispatcher); + client_msgr->start(); + + ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); + + MPing *m1 = new MPing(); + ASSERT_EQ(c2s->send_message(m1), 0); + + { + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + + ASSERT_TRUE(c2s->is_connected()); + ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count()); + ASSERT_TRUE(c2s->peer_is_osd()); + + cli_interceptor->breakpoint(Interceptor::STEP::HANDLE_MESSAGE); + + MPing *m2 = new MPing(); + ASSERT_EQ(c2s->send_message(m2), 0); + + cli_interceptor->wait(Interceptor::STEP::HANDLE_MESSAGE, c2s.get()); + cli_interceptor->remove_bp(Interceptor::STEP::HANDLE_MESSAGE); + + // at this point client and server are connected together + + // force both client and server to race on reconnect + cli_interceptor->breakpoint(Interceptor::STEP::SEND_RECONNECT); + srv_interceptor->breakpoint(Interceptor::STEP::SEND_RECONNECT); + + // failing client + // this will cause both client and server to reconnect at the same time + cli_interceptor->proceed(Interceptor::STEP::HANDLE_MESSAGE, Interceptor::ACTION::FAIL); + + MPing *m3 = new MPing(); + ASSERT_EQ(c2s->send_message(m3), 0); + + cli_interceptor->wait(Interceptor::STEP::SEND_RECONNECT, c2s.get()); + srv_interceptor->wait(Interceptor::STEP::SEND_RECONNECT); + + cli_interceptor->remove_bp(Interceptor::STEP::SEND_RECONNECT); + srv_interceptor->remove_bp(Interceptor::STEP::SEND_RECONNECT); + + // pause on "ready" + srv_interceptor->breakpoint(Interceptor::STEP::READY); + + cli_interceptor->proceed(Interceptor::STEP::SEND_RECONNECT, Interceptor::ACTION::CONTINUE); + srv_interceptor->proceed(Interceptor::STEP::SEND_RECONNECT, Interceptor::ACTION::CONTINUE); + + Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::READY); + + // the server has reconnected and is "ready" + srv_interceptor->remove_bp(Interceptor::STEP::READY); + + ASSERT_TRUE(c2s_accepter->peer_is_client()); + ASSERT_TRUE(c2s->peer_is_osd()); + + // the server should win the reconnect race + + // c2s_accepter sent 1 or 2 reconnect messages + ASSERT_LT(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT), 3u); + ASSERT_GT(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT), 0u); + // c2s_accepter sent 0 reconnect_ok messages + ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT_OK), 0u); + // c2s sent 1 reconnect messages + ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT), 1u); + // c2s sent 1 reconnect_ok messages + ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT_OK), 1u); + + if (srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT) == 2) { + // if the server send the reconnect message two times then + // the client must have sent a session retry message to the server + ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SESSION_RETRY), 1u); + } else { + ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SESSION_RETRY), 0u); + } + + srv_interceptor->proceed(Interceptor::STEP::READY, Interceptor::ACTION::CONTINUE); + + { + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + + client_msgr->shutdown(); + client_msgr->wait(); + server_msgr->shutdown(); + server_msgr->wait(); + + delete cli_interceptor; + delete srv_interceptor; +} + +TEST_P(MessengerTest, SimpleTest) { + FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); + entity_addr_t bind_addr; + bind_addr.parse("v2:127.0.0.1"); + server_msgr->bind(bind_addr); + server_msgr->add_dispatcher_head(&srv_dispatcher); + server_msgr->start(); + + client_msgr->add_dispatcher_head(&cli_dispatcher); + client_msgr->start(); + + // 1. simple round trip + MPing *m = new MPing(); + ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); + { + ASSERT_EQ(conn->send_message(m), 0); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + ASSERT_TRUE(conn->is_connected()); + ASSERT_EQ(1u, static_cast<Session*>(conn->get_priv().get())->get_count()); + ASSERT_TRUE(conn->peer_is_osd()); + + // 2. test rebind port + set<int> avoid_ports; + for (int i = 0; i < 10 ; i++) { + for (auto a : server_msgr->get_myaddrs().v) { + avoid_ports.insert(a.get_port() + i); + } + } + server_msgr->rebind(avoid_ports); + for (auto a : server_msgr->get_myaddrs().v) { + ASSERT_TRUE(avoid_ports.count(a.get_port()) == 0); + } + + conn = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); + { + m = new MPing(); + ASSERT_EQ(conn->send_message(m), 0); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); + + // 3. test markdown connection + conn->mark_down(); + ASSERT_FALSE(conn->is_connected()); + + // 4. test failed connection + server_msgr->shutdown(); + server_msgr->wait(); + + m = new MPing(); + conn->send_message(m); + CHECK_AND_WAIT_TRUE(!conn->is_connected()); + ASSERT_FALSE(conn->is_connected()); + + // 5. loopback connection + srv_dispatcher.loopback = true; + conn = client_msgr->get_loopback_connection(); + { + m = new MPing(); + ASSERT_EQ(conn->send_message(m), 0); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + srv_dispatcher.loopback = false; + ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); + client_msgr->shutdown(); + client_msgr->wait(); + server_msgr->shutdown(); + server_msgr->wait(); +} + +TEST_P(MessengerTest, SimpleMsgr2Test) { + FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); + entity_addr_t legacy_addr; + legacy_addr.parse("v1:127.0.0.1"); + entity_addr_t msgr2_addr; + msgr2_addr.parse("v2:127.0.0.1"); + entity_addrvec_t bind_addrs; + bind_addrs.v.push_back(legacy_addr); + bind_addrs.v.push_back(msgr2_addr); + server_msgr->bindv(bind_addrs); + server_msgr->add_dispatcher_head(&srv_dispatcher); + server_msgr->start(); + + client_msgr->add_dispatcher_head(&cli_dispatcher); + client_msgr->start(); + + // 1. simple round trip + MPing *m = new MPing(); + ConnectionRef conn = client_msgr->connect_to( + server_msgr->get_mytype(), + server_msgr->get_myaddrs()); + { + ASSERT_EQ(conn->send_message(m), 0); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + ASSERT_TRUE(conn->is_connected()); + ASSERT_EQ(1u, static_cast<Session*>(conn->get_priv().get())->get_count()); + ASSERT_TRUE(conn->peer_is_osd()); + + // 2. test rebind port + set<int> avoid_ports; + for (int i = 0; i < 10 ; i++) { + for (auto a : server_msgr->get_myaddrs().v) { + avoid_ports.insert(a.get_port() + i); + } + } + server_msgr->rebind(avoid_ports); + for (auto a : server_msgr->get_myaddrs().v) { + ASSERT_TRUE(avoid_ports.count(a.get_port()) == 0); + } + + conn = client_msgr->connect_to( + server_msgr->get_mytype(), + server_msgr->get_myaddrs()); + { + m = new MPing(); + ASSERT_EQ(conn->send_message(m), 0); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); + + // 3. test markdown connection + conn->mark_down(); + ASSERT_FALSE(conn->is_connected()); + + // 4. test failed connection + server_msgr->shutdown(); + server_msgr->wait(); + + m = new MPing(); + conn->send_message(m); + CHECK_AND_WAIT_TRUE(!conn->is_connected()); + ASSERT_FALSE(conn->is_connected()); + + // 5. loopback connection + srv_dispatcher.loopback = true; + conn = client_msgr->get_loopback_connection(); + { + m = new MPing(); + ASSERT_EQ(conn->send_message(m), 0); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + srv_dispatcher.loopback = false; + ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); + client_msgr->shutdown(); + client_msgr->wait(); + server_msgr->shutdown(); + server_msgr->wait(); +} + +TEST_P(MessengerTest, FeatureTest) { + FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); + entity_addr_t bind_addr; + bind_addr.parse("v2:127.0.0.1"); + uint64_t all_feature_supported, feature_required, feature_supported = 0; + for (int i = 0; i < 10; i++) + feature_supported |= 1ULL << i; + feature_supported |= CEPH_FEATUREMASK_MSG_ADDR2; + feature_supported |= CEPH_FEATUREMASK_SERVER_NAUTILUS; + feature_required = feature_supported | 1ULL << 13; + all_feature_supported = feature_required | 1ULL << 14; + + Messenger::Policy p = server_msgr->get_policy(entity_name_t::TYPE_CLIENT); + p.features_required = feature_required; + server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); + server_msgr->bind(bind_addr); + server_msgr->add_dispatcher_head(&srv_dispatcher); + server_msgr->start(); + + // 1. Suppose if only support less than required + p = client_msgr->get_policy(entity_name_t::TYPE_OSD); + p.features_supported = feature_supported; + client_msgr->set_policy(entity_name_t::TYPE_OSD, p); + client_msgr->add_dispatcher_head(&cli_dispatcher); + client_msgr->start(); + + MPing *m = new MPing(); + ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); + conn->send_message(m); + CHECK_AND_WAIT_TRUE(!conn->is_connected()); + // should failed build a connection + ASSERT_FALSE(conn->is_connected()); + + client_msgr->shutdown(); + client_msgr->wait(); + + // 2. supported met required + p = client_msgr->get_policy(entity_name_t::TYPE_OSD); + p.features_supported = all_feature_supported; + client_msgr->set_policy(entity_name_t::TYPE_OSD, p); + client_msgr->start(); + + conn = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); + { + m = new MPing(); + ASSERT_EQ(conn->send_message(m), 0); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); + + server_msgr->shutdown(); + client_msgr->shutdown(); + server_msgr->wait(); + client_msgr->wait(); +} + +TEST_P(MessengerTest, TimeoutTest) { + g_ceph_context->_conf.set_val("ms_connection_idle_timeout", "1"); + FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); + entity_addr_t bind_addr; + bind_addr.parse("v2:127.0.0.1"); + server_msgr->bind(bind_addr); + server_msgr->add_dispatcher_head(&srv_dispatcher); + server_msgr->start(); + + client_msgr->add_dispatcher_head(&cli_dispatcher); + client_msgr->start(); + + // 1. build the connection + MPing *m = new MPing(); + ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); + { + ASSERT_EQ(conn->send_message(m), 0); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + ASSERT_TRUE(conn->is_connected()); + ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); + ASSERT_TRUE(conn->peer_is_osd()); + + // 2. wait for idle + usleep(2500*1000); + ASSERT_FALSE(conn->is_connected()); + + server_msgr->shutdown(); + server_msgr->wait(); + + client_msgr->shutdown(); + client_msgr->wait(); + g_ceph_context->_conf.set_val("ms_connection_idle_timeout", "900"); +} + +TEST_P(MessengerTest, StatefulTest) { + Message *m; + FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); + entity_addr_t bind_addr; + bind_addr.parse("v2:127.0.0.1"); + Messenger::Policy p = Messenger::Policy::stateful_server(0); + server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); + p = Messenger::Policy::lossless_client(0); + client_msgr->set_policy(entity_name_t::TYPE_OSD, p); + + server_msgr->bind(bind_addr); + server_msgr->add_dispatcher_head(&srv_dispatcher); + server_msgr->start(); + client_msgr->add_dispatcher_head(&cli_dispatcher); + client_msgr->start(); + + // 1. test for server standby + ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); + { + m = new MPing(); + ASSERT_EQ(conn->send_message(m), 0); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); + conn->mark_down(); + ASSERT_FALSE(conn->is_connected()); + ConnectionRef server_conn = server_msgr->connect_to( + client_msgr->get_mytype(), srv_dispatcher.last_accept); + // don't lose state + ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count()); + + srv_dispatcher.got_new = false; + conn = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); + { + m = new MPing(); + ASSERT_EQ(conn->send_message(m), 0); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); + server_conn = server_msgr->connect_to(client_msgr->get_mytype(), + srv_dispatcher.last_accept); + { + std::unique_lock l{srv_dispatcher.lock}; + srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_remote_reset; }); + } + + // 2. test for client reconnect + ASSERT_FALSE(cli_dispatcher.got_remote_reset); + cli_dispatcher.got_connect = false; + cli_dispatcher.got_new = false; + cli_dispatcher.got_remote_reset = false; + server_conn->mark_down(); + ASSERT_FALSE(server_conn->is_connected()); + // ensure client detect server socket closed + { + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_remote_reset; }); + cli_dispatcher.got_remote_reset = false; + } + { + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_connect; }); + cli_dispatcher.got_connect = false; + } + CHECK_AND_WAIT_TRUE(conn->is_connected()); + ASSERT_TRUE(conn->is_connected()); + + { + m = new MPing(); + ASSERT_EQ(conn->send_message(m), 0); + ASSERT_TRUE(conn->is_connected()); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + // resetcheck happen + ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); + server_conn = server_msgr->connect_to(client_msgr->get_mytype(), + srv_dispatcher.last_accept); + ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count()); + cli_dispatcher.got_remote_reset = false; + + server_msgr->shutdown(); + client_msgr->shutdown(); + server_msgr->wait(); + client_msgr->wait(); +} + +TEST_P(MessengerTest, StatelessTest) { + Message *m; + FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); + entity_addr_t bind_addr; + bind_addr.parse("v2:127.0.0.1"); + Messenger::Policy p = Messenger::Policy::stateless_server(0); + server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); + p = Messenger::Policy::lossy_client(0); + client_msgr->set_policy(entity_name_t::TYPE_OSD, p); + + server_msgr->bind(bind_addr); + server_msgr->add_dispatcher_head(&srv_dispatcher); + server_msgr->start(); + client_msgr->add_dispatcher_head(&cli_dispatcher); + client_msgr->start(); + + // 1. test for server lose state + ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); + { + m = new MPing(); + ASSERT_EQ(conn->send_message(m), 0); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); + conn->mark_down(); + ASSERT_FALSE(conn->is_connected()); + + srv_dispatcher.got_new = false; + ConnectionRef server_conn; + srv_dispatcher.last_accept_con_ptr = &server_conn; + conn = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); + { + m = new MPing(); + ASSERT_EQ(conn->send_message(m), 0); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); + ASSERT_TRUE(server_conn); + + // server lose state + { + std::unique_lock l{srv_dispatcher.lock}; + srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; }); + } + ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count()); + + // 2. test for client lossy + server_conn->mark_down(); + ASSERT_FALSE(server_conn->is_connected()); + conn->send_keepalive(); + CHECK_AND_WAIT_TRUE(!conn->is_connected()); + ASSERT_FALSE(conn->is_connected()); + conn = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); + { + m = new MPing(); + ASSERT_EQ(conn->send_message(m), 0); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); + + server_msgr->shutdown(); + client_msgr->shutdown(); + server_msgr->wait(); + client_msgr->wait(); +} + +TEST_P(MessengerTest, AnonTest) { + Message *m; + FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); + entity_addr_t bind_addr; + bind_addr.parse("v2:127.0.0.1"); + Messenger::Policy p = Messenger::Policy::stateless_server(0); + server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); + p = Messenger::Policy::lossy_client(0); + client_msgr->set_policy(entity_name_t::TYPE_OSD, p); + + server_msgr->bind(bind_addr); + server_msgr->add_dispatcher_head(&srv_dispatcher); + server_msgr->start(); + client_msgr->add_dispatcher_head(&cli_dispatcher); + client_msgr->start(); + + ConnectionRef server_con_a, server_con_b; + + // a + srv_dispatcher.last_accept_con_ptr = &server_con_a; + ConnectionRef con_a = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs(), + true); + { + m = new MPing(); + ASSERT_EQ(con_a->send_message(m), 0); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + ASSERT_EQ(1U, static_cast<Session*>(con_a->get_priv().get())->get_count()); + + // b + srv_dispatcher.last_accept_con_ptr = &server_con_b; + ConnectionRef con_b = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs(), + true); + { + m = new MPing(); + ASSERT_EQ(con_b->send_message(m), 0); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + ASSERT_EQ(1U, static_cast<Session*>(con_b->get_priv().get())->get_count()); + + // these should be distinct + ASSERT_NE(con_a, con_b); + ASSERT_NE(server_con_a, server_con_b); + + // and both connected + { + m = new MPing(); + ASSERT_EQ(con_a->send_message(m), 0); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + { + m = new MPing(); + ASSERT_EQ(con_b->send_message(m), 0); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + + // clean up + con_a->mark_down(); + ASSERT_FALSE(con_a->is_connected()); + con_b->mark_down(); + ASSERT_FALSE(con_b->is_connected()); + + server_msgr->shutdown(); + client_msgr->shutdown(); + server_msgr->wait(); + client_msgr->wait(); +} + +TEST_P(MessengerTest, ClientStandbyTest) { + Message *m; + FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); + entity_addr_t bind_addr; + bind_addr.parse("v2:127.0.0.1"); + Messenger::Policy p = Messenger::Policy::stateful_server(0); + server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); + p = Messenger::Policy::lossless_peer(0); + client_msgr->set_policy(entity_name_t::TYPE_OSD, p); + + server_msgr->bind(bind_addr); + server_msgr->add_dispatcher_head(&srv_dispatcher); + server_msgr->start(); + client_msgr->add_dispatcher_head(&cli_dispatcher); + client_msgr->start(); + + // 1. test for client standby, resetcheck + ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); + { + m = new MPing(); + ASSERT_EQ(conn->send_message(m), 0); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); + ConnectionRef server_conn = server_msgr->connect_to( + client_msgr->get_mytype(), + srv_dispatcher.last_accept); + ASSERT_FALSE(cli_dispatcher.got_remote_reset); + cli_dispatcher.got_connect = false; + server_conn->mark_down(); + ASSERT_FALSE(server_conn->is_connected()); + // client should be standby + usleep(300*1000); + // client should be standby, so we use original connection + { + // Try send message to verify got remote reset callback + m = new MPing(); + ASSERT_EQ(conn->send_message(m), 0); + { + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_remote_reset; }); + cli_dispatcher.got_remote_reset = false; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_connect; }); + cli_dispatcher.got_connect = false; + } + CHECK_AND_WAIT_TRUE(conn->is_connected()); + ASSERT_TRUE(conn->is_connected()); + m = new MPing(); + ASSERT_EQ(conn->send_message(m), 0); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); + server_conn = server_msgr->connect_to(client_msgr->get_mytype(), + srv_dispatcher.last_accept); + ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count()); + + server_msgr->shutdown(); + client_msgr->shutdown(); + server_msgr->wait(); + client_msgr->wait(); +} + +TEST_P(MessengerTest, AuthTest) { + g_ceph_context->_conf.set_val("auth_cluster_required", "cephx"); + g_ceph_context->_conf.set_val("auth_service_required", "cephx"); + g_ceph_context->_conf.set_val("auth_client_required", "cephx"); + FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); + entity_addr_t bind_addr; + bind_addr.parse("v2:127.0.0.1"); + server_msgr->bind(bind_addr); + server_msgr->add_dispatcher_head(&srv_dispatcher); + server_msgr->start(); + + client_msgr->add_dispatcher_head(&cli_dispatcher); + client_msgr->start(); + + // 1. simple auth round trip + MPing *m = new MPing(); + ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); + { + ASSERT_EQ(conn->send_message(m), 0); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + ASSERT_TRUE(conn->is_connected()); + ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); + + // 2. mix auth + g_ceph_context->_conf.set_val("auth_cluster_required", "none"); + g_ceph_context->_conf.set_val("auth_service_required", "none"); + g_ceph_context->_conf.set_val("auth_client_required", "none"); + conn->mark_down(); + ASSERT_FALSE(conn->is_connected()); + conn = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); + { + MPing *m = new MPing(); + ASSERT_EQ(conn->send_message(m), 0); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + ASSERT_TRUE(conn->is_connected()); + ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); + server_msgr->shutdown(); + client_msgr->shutdown(); + server_msgr->wait(); + client_msgr->wait(); +} + +TEST_P(MessengerTest, MessageTest) { + FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); + entity_addr_t bind_addr; + bind_addr.parse("v2:127.0.0.1"); + Messenger::Policy p = Messenger::Policy::stateful_server(0); + server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); + p = Messenger::Policy::lossless_peer(0); + client_msgr->set_policy(entity_name_t::TYPE_OSD, p); + + server_msgr->bind(bind_addr); + server_msgr->add_dispatcher_head(&srv_dispatcher); + server_msgr->start(); + client_msgr->add_dispatcher_head(&cli_dispatcher); + client_msgr->start(); + + + // 1. A very large "front"(as well as "payload") + // Because a external message need to invade Messenger::decode_message, + // here we only use existing message class(MCommand) + ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); + { + uuid_d uuid; + uuid.generate_random(); + vector<string> cmds; + string s("abcdefghijklmnopqrstuvwxyz"); + for (int i = 0; i < 1024*30; i++) + cmds.push_back(s); + MCommand *m = new MCommand(uuid); + m->cmd = cmds; + conn->send_message(m); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait_for(l, 500s, [&] { return cli_dispatcher.got_new; }); + ASSERT_TRUE(cli_dispatcher.got_new); + cli_dispatcher.got_new = false; + } + + // 2. A very large "data" + { + bufferlist bl; + string s("abcdefghijklmnopqrstuvwxyz"); + for (int i = 0; i < 1024*30; i++) + bl.append(s); + MPing *m = new MPing(); + m->set_data(bl); + conn->send_message(m); + utime_t t; + t += 1000*1000*500; + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + ASSERT_TRUE(cli_dispatcher.got_new); + cli_dispatcher.got_new = false; + } + server_msgr->shutdown(); + client_msgr->shutdown(); + server_msgr->wait(); + client_msgr->wait(); +} + + +class SyntheticWorkload; + +struct Payload { + enum Who : uint8_t { + PING = 0, + PONG = 1, + }; + uint8_t who = 0; + uint64_t seq = 0; + bufferlist data; + + Payload(Who who, uint64_t seq, const bufferlist& data) + : who(who), seq(seq), data(data) + {} + Payload() = default; + DENC(Payload, v, p) { + DENC_START(1, 1, p); + denc(v.who, p); + denc(v.seq, p); + denc(v.data, p); + DENC_FINISH(p); + } +}; +WRITE_CLASS_DENC(Payload) + +ostream& operator<<(ostream& out, const Payload &pl) +{ + return out << "reply=" << pl.who << " i = " << pl.seq; +} + +class SyntheticDispatcher : public Dispatcher { + public: + ceph::mutex lock = ceph::make_mutex("SyntheticDispatcher::lock"); + ceph::condition_variable cond; + bool is_server; + bool got_new; + bool got_remote_reset; + bool got_connect; + map<ConnectionRef, list<uint64_t> > conn_sent; + map<uint64_t, bufferlist> sent; + atomic<uint64_t> index; + SyntheticWorkload *workload; + + SyntheticDispatcher(bool s, SyntheticWorkload *wl): + Dispatcher(g_ceph_context), is_server(s), got_new(false), + got_remote_reset(false), got_connect(false), index(0), workload(wl) { + } + bool ms_can_fast_dispatch_any() const override { return true; } + bool ms_can_fast_dispatch(const Message *m) const override { + switch (m->get_type()) { + case CEPH_MSG_PING: + case MSG_COMMAND: + return true; + default: + return false; + } + } + + void ms_handle_fast_connect(Connection *con) override { + std::lock_guard l{lock}; + list<uint64_t> c = conn_sent[con]; + for (list<uint64_t>::iterator it = c.begin(); + it != c.end(); ++it) + sent.erase(*it); + conn_sent.erase(con); + got_connect = true; + cond.notify_all(); + } + void ms_handle_fast_accept(Connection *con) override { + std::lock_guard l{lock}; + list<uint64_t> c = conn_sent[con]; + for (list<uint64_t>::iterator it = c.begin(); + it != c.end(); ++it) + sent.erase(*it); + conn_sent.erase(con); + cond.notify_all(); + } + bool ms_dispatch(Message *m) override { + ceph_abort(); + } + bool ms_handle_reset(Connection *con) override; + void ms_handle_remote_reset(Connection *con) override { + std::lock_guard l{lock}; + list<uint64_t> c = conn_sent[con]; + for (list<uint64_t>::iterator it = c.begin(); + it != c.end(); ++it) + sent.erase(*it); + conn_sent.erase(con); + got_remote_reset = true; + } + bool ms_handle_refused(Connection *con) override { + return false; + } + void ms_fast_dispatch(Message *m) override { + // MSG_COMMAND is used to disorganize regular message flow + if (m->get_type() == MSG_COMMAND) { + m->put(); + return ; + } + + Payload pl; + auto p = m->get_data().cbegin(); + decode(pl, p); + if (pl.who == Payload::PING) { + lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl; + reply_message(m, pl); + m->put(); + std::lock_guard l{lock}; + got_new = true; + cond.notify_all(); + } else { + std::lock_guard l{lock}; + if (sent.count(pl.seq)) { + lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl; + ASSERT_EQ(conn_sent[m->get_connection()].front(), pl.seq); + ASSERT_TRUE(pl.data.contents_equal(sent[pl.seq])); + conn_sent[m->get_connection()].pop_front(); + sent.erase(pl.seq); + } + m->put(); + got_new = true; + cond.notify_all(); + } + } + + int ms_handle_fast_authentication(Connection *con) override { + return 1; + } + + void reply_message(const Message *m, Payload& pl) { + pl.who = Payload::PONG; + bufferlist bl; + encode(pl, bl); + MPing *rm = new MPing(); + rm->set_data(bl); + m->get_connection()->send_message(rm); + lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply m=" << m << " i=" << pl.seq << dendl; + } + + void send_message_wrap(ConnectionRef con, const bufferlist& data) { + Message *m = new MPing(); + Payload pl{Payload::PING, index++, data}; + bufferlist bl; + encode(pl, bl); + m->set_data(bl); + if (!con->get_messenger()->get_default_policy().lossy) { + std::lock_guard l{lock}; + sent[pl.seq] = pl.data; + conn_sent[con].push_back(pl.seq); + } + lderr(g_ceph_context) << __func__ << " conn=" << con.get() << " send m=" << m << " i=" << pl.seq << dendl; + ASSERT_EQ(0, con->send_message(m)); + } + + uint64_t get_pending() { + std::lock_guard l{lock}; + return sent.size(); + } + + void clear_pending(ConnectionRef con) { + std::lock_guard l{lock}; + + for (list<uint64_t>::iterator it = conn_sent[con].begin(); + it != conn_sent[con].end(); ++it) + sent.erase(*it); + conn_sent.erase(con); + } + + void print() { + for (auto && p : conn_sent) { + if (!p.second.empty()) { + lderr(g_ceph_context) << __func__ << " " << p.first << " wait " << p.second.size() << dendl; + } + } + } +}; + + +class SyntheticWorkload { + ceph::mutex lock = ceph::make_mutex("SyntheticWorkload::lock"); + ceph::condition_variable cond; + set<Messenger*> available_servers; + set<Messenger*> available_clients; + Messenger::Policy client_policy; + map<ConnectionRef, pair<Messenger*, Messenger*> > available_connections; + SyntheticDispatcher dispatcher; + gen_type rng; + vector<bufferlist> rand_data; + DummyAuthClientServer dummy_auth; + + public: + const unsigned max_in_flight = 0; + const unsigned max_connections = 0; + static const unsigned max_message_len = 1024 * 1024 * 4; + + SyntheticWorkload(int servers, int clients, string type, int random_num, + Messenger::Policy srv_policy, Messenger::Policy cli_policy, + int _max_in_flight = 64, int _max_connections = 128) + : client_policy(cli_policy), + dispatcher(false, this), + rng(time(NULL)), + dummy_auth(g_ceph_context), + max_in_flight(_max_in_flight), + max_connections(_max_connections) { + + dummy_auth.auth_registry.refresh_config(); + Messenger *msgr; + int base_port = 16800; + entity_addr_t bind_addr; + char addr[64]; + for (int i = 0; i < servers; ++i) { + msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0), + "server", getpid()+i); + snprintf(addr, sizeof(addr), "v2:127.0.0.1:%d", + base_port+i); + bind_addr.parse(addr); + msgr->bind(bind_addr); + msgr->add_dispatcher_head(&dispatcher); + msgr->set_auth_client(&dummy_auth); + msgr->set_auth_server(&dummy_auth); + + ceph_assert(msgr); + msgr->set_default_policy(srv_policy); + available_servers.insert(msgr); + msgr->start(); + } + + for (int i = 0; i < clients; ++i) { + msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(-1), + "client", getpid()+i+servers); + if (cli_policy.standby) { + snprintf(addr, sizeof(addr), "v2:127.0.0.1:%d", + base_port+i+servers); + bind_addr.parse(addr); + msgr->bind(bind_addr); + } + msgr->add_dispatcher_head(&dispatcher); + msgr->set_auth_client(&dummy_auth); + msgr->set_auth_server(&dummy_auth); + + ceph_assert(msgr); + msgr->set_default_policy(cli_policy); + available_clients.insert(msgr); + msgr->start(); + } + + for (int i = 0; i < random_num; i++) { + bufferlist bl; + boost::uniform_int<> u(32, max_message_len); + uint64_t value_len = u(rng); + bufferptr bp(value_len); + bp.zero(); + for (uint64_t j = 0; j < value_len-sizeof(i); ) { + memcpy(bp.c_str()+j, &i, sizeof(i)); + j += 4096; + } + + bl.append(bp); + rand_data.push_back(bl); + } + } + + ConnectionRef _get_random_connection() { + while (dispatcher.get_pending() > max_in_flight) { + lock.unlock(); + usleep(500); + lock.lock(); + } + ceph_assert(ceph_mutex_is_locked(lock)); + boost::uniform_int<> choose(0, available_connections.size() - 1); + int index = choose(rng); + map<ConnectionRef, pair<Messenger*, Messenger*> >::iterator i = available_connections.begin(); + for (; index > 0; --index, ++i) ; + return i->first; + } + + bool can_create_connection() { + return available_connections.size() < max_connections; + } + + void generate_connection() { + std::lock_guard l{lock}; + if (!can_create_connection()) + return ; + + Messenger *server, *client; + { + boost::uniform_int<> choose(0, available_servers.size() - 1); + int index = choose(rng); + set<Messenger*>::iterator i = available_servers.begin(); + for (; index > 0; --index, ++i) ; + server = *i; + } + { + boost::uniform_int<> choose(0, available_clients.size() - 1); + int index = choose(rng); + set<Messenger*>::iterator i = available_clients.begin(); + for (; index > 0; --index, ++i) ; + client = *i; + } + + pair<Messenger*, Messenger*> p; + { + boost::uniform_int<> choose(0, available_servers.size() - 1); + if (server->get_default_policy().server) { + p = make_pair(client, server); + ConnectionRef conn = client->connect_to(server->get_mytype(), + server->get_myaddrs()); + available_connections[conn] = p; + } else { + ConnectionRef conn = client->connect_to(server->get_mytype(), + server->get_myaddrs()); + p = make_pair(client, server); + available_connections[conn] = p; + } + } + } + + void send_message() { + std::lock_guard l{lock}; + ConnectionRef conn = _get_random_connection(); + boost::uniform_int<> true_false(0, 99); + int val = true_false(rng); + if (val >= 95) { + uuid_d uuid; + uuid.generate_random(); + MCommand *m = new MCommand(uuid); + vector<string> cmds; + cmds.push_back("command"); + m->cmd = cmds; + m->set_priority(200); + conn->send_message(m); + } else { + boost::uniform_int<> u(0, rand_data.size()-1); + dispatcher.send_message_wrap(conn, rand_data[u(rng)]); + } + } + + void send_large_message(bool inject_network_congestion=false) { + std::lock_guard l{lock}; + ConnectionRef conn = _get_random_connection(); + uuid_d uuid; + uuid.generate_random(); + MCommand *m = new MCommand(uuid); + vector<string> cmds; + cmds.push_back("command"); + // set the random data to make the large message + bufferlist bl; + string s("abcdefghijklmnopqrstuvwxyz"); + for (int i = 0; i < 1024*256; i++) + bl.append(s); + // bl is around 6M + m->set_data(bl); + m->cmd = cmds; + m->set_priority(200); + // setup after connection is ready + if (inject_network_congestion && conn->is_connected()) { + g_ceph_context->_conf.set_val("ms_inject_network_congestion", "100"); + } else { + g_ceph_context->_conf.set_val("ms_inject_network_congestion", "0"); + } + conn->send_message(m); + } + + void drop_connection() { + std::lock_guard l{lock}; + if (available_connections.size() < 10) + return; + ConnectionRef conn = _get_random_connection(); + dispatcher.clear_pending(conn); + conn->mark_down(); + if (!client_policy.server && + !client_policy.lossy && + client_policy.standby) { + // it's a lossless policy, so we need to mark down each side + pair<Messenger*, Messenger*> &p = available_connections[conn]; + if (!p.first->get_default_policy().server && !p.second->get_default_policy().server) { + ASSERT_EQ(conn->get_messenger(), p.first); + ConnectionRef peer = p.second->connect_to(p.first->get_mytype(), + p.first->get_myaddrs()); + peer->mark_down(); + dispatcher.clear_pending(peer); + available_connections.erase(peer); + } + } + ASSERT_EQ(available_connections.erase(conn), 1U); + } + + void print_internal_state(bool detail=false) { + std::lock_guard l{lock}; + lderr(g_ceph_context) << "available_connections: " << available_connections.size() + << " inflight messages: " << dispatcher.get_pending() << dendl; + if (detail && !available_connections.empty()) { + dispatcher.print(); + } + } + + void wait_for_done() { + int64_t tick_us = 1000 * 100; // 100ms + int64_t timeout_us = 5 * 60 * 1000 * 1000; // 5 mins + int i = 0; + while (dispatcher.get_pending()) { + usleep(tick_us); + timeout_us -= tick_us; + if (i++ % 50 == 0) + print_internal_state(true); + if (timeout_us < 0) + ceph_abort_msg(" loop time exceed 5 mins, it looks we stuck into some problems!"); + } + for (set<Messenger*>::iterator it = available_servers.begin(); + it != available_servers.end(); ++it) { + (*it)->shutdown(); + (*it)->wait(); + ASSERT_EQ((*it)->get_dispatch_queue_len(), 0); + delete (*it); + } + available_servers.clear(); + + for (set<Messenger*>::iterator it = available_clients.begin(); + it != available_clients.end(); ++it) { + (*it)->shutdown(); + (*it)->wait(); + ASSERT_EQ((*it)->get_dispatch_queue_len(), 0); + delete (*it); + } + available_clients.clear(); + } + + void handle_reset(Connection *con) { + std::lock_guard l{lock}; + available_connections.erase(con); + dispatcher.clear_pending(con); + } +}; + +bool SyntheticDispatcher::ms_handle_reset(Connection *con) { + workload->handle_reset(con); + return true; +} + +TEST_P(MessengerTest, SyntheticStressTest) { + SyntheticWorkload test_msg(8, 32, GetParam(), 100, + Messenger::Policy::stateful_server(0), + Messenger::Policy::lossless_client(0)); + for (int i = 0; i < 100; ++i) { + if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl; + test_msg.generate_connection(); + } + gen_type rng(time(NULL)); + for (int i = 0; i < 5000; ++i) { + if (!(i % 10)) { + lderr(g_ceph_context) << "Op " << i << ": " << dendl; + test_msg.print_internal_state(); + } + boost::uniform_int<> true_false(0, 99); + int val = true_false(rng); + if (val > 90) { + test_msg.generate_connection(); + } else if (val > 80) { + test_msg.drop_connection(); + } else if (val > 10) { + test_msg.send_message(); + } else { + usleep(rand() % 1000 + 500); + } + } + test_msg.wait_for_done(); +} + +TEST_P(MessengerTest, SyntheticStressTest1) { + SyntheticWorkload test_msg(16, 32, GetParam(), 100, + Messenger::Policy::lossless_peer_reuse(0), + Messenger::Policy::lossless_peer_reuse(0)); + for (int i = 0; i < 10; ++i) { + if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl; + test_msg.generate_connection(); + } + gen_type rng(time(NULL)); + for (int i = 0; i < 10000; ++i) { + if (!(i % 10)) { + lderr(g_ceph_context) << "Op " << i << ": " << dendl; + test_msg.print_internal_state(); + } + boost::uniform_int<> true_false(0, 99); + int val = true_false(rng); + if (val > 80) { + test_msg.generate_connection(); + } else if (val > 60) { + test_msg.drop_connection(); + } else if (val > 10) { + test_msg.send_message(); + } else { + usleep(rand() % 1000 + 500); + } + } + test_msg.wait_for_done(); +} + + +TEST_P(MessengerTest, SyntheticInjectTest) { + uint64_t dispatch_throttle_bytes = g_ceph_context->_conf->ms_dispatch_throttle_bytes; + g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30"); + g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1"); + g_ceph_context->_conf.set_val("ms_dispatch_throttle_bytes", "16777216"); + SyntheticWorkload test_msg(8, 32, GetParam(), 100, + Messenger::Policy::stateful_server(0), + Messenger::Policy::lossless_client(0)); + for (int i = 0; i < 100; ++i) { + if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl; + test_msg.generate_connection(); + } + gen_type rng(time(NULL)); + for (int i = 0; i < 1000; ++i) { + if (!(i % 10)) { + lderr(g_ceph_context) << "Op " << i << ": " << dendl; + test_msg.print_internal_state(); + } + boost::uniform_int<> true_false(0, 99); + int val = true_false(rng); + if (val > 90) { + test_msg.generate_connection(); + } else if (val > 80) { + test_msg.drop_connection(); + } else if (val > 10) { + test_msg.send_message(); + } else { + usleep(rand() % 500 + 100); + } + } + test_msg.wait_for_done(); + g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0"); + g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0"); + g_ceph_context->_conf.set_val( + "ms_dispatch_throttle_bytes", std::to_string(dispatch_throttle_bytes)); +} + +TEST_P(MessengerTest, SyntheticInjectTest2) { + g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30"); + g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1"); + SyntheticWorkload test_msg(8, 16, GetParam(), 100, + Messenger::Policy::lossless_peer_reuse(0), + Messenger::Policy::lossless_peer_reuse(0)); + for (int i = 0; i < 100; ++i) { + if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl; + test_msg.generate_connection(); + } + gen_type rng(time(NULL)); + for (int i = 0; i < 1000; ++i) { + if (!(i % 10)) { + lderr(g_ceph_context) << "Op " << i << ": " << dendl; + test_msg.print_internal_state(); + } + boost::uniform_int<> true_false(0, 99); + int val = true_false(rng); + if (val > 90) { + test_msg.generate_connection(); + } else if (val > 80) { + test_msg.drop_connection(); + } else if (val > 10) { + test_msg.send_message(); + } else { + usleep(rand() % 500 + 100); + } + } + test_msg.wait_for_done(); + g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0"); + g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0"); +} + +TEST_P(MessengerTest, SyntheticInjectTest3) { + g_ceph_context->_conf.set_val("ms_inject_socket_failures", "600"); + g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1"); + SyntheticWorkload test_msg(8, 16, GetParam(), 100, + Messenger::Policy::stateless_server(0), + Messenger::Policy::lossy_client(0)); + for (int i = 0; i < 100; ++i) { + if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl; + test_msg.generate_connection(); + } + gen_type rng(time(NULL)); + for (int i = 0; i < 1000; ++i) { + if (!(i % 10)) { + lderr(g_ceph_context) << "Op " << i << ": " << dendl; + test_msg.print_internal_state(); + } + boost::uniform_int<> true_false(0, 99); + int val = true_false(rng); + if (val > 90) { + test_msg.generate_connection(); + } else if (val > 80) { + test_msg.drop_connection(); + } else if (val > 10) { + test_msg.send_message(); + } else { + usleep(rand() % 500 + 100); + } + } + test_msg.wait_for_done(); + g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0"); + g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0"); +} + + +TEST_P(MessengerTest, SyntheticInjectTest4) { + g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30"); + g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1"); + g_ceph_context->_conf.set_val("ms_inject_delay_probability", "1"); + g_ceph_context->_conf.set_val("ms_inject_delay_type", "client osd"); + g_ceph_context->_conf.set_val("ms_inject_delay_max", "5"); + SyntheticWorkload test_msg(16, 32, GetParam(), 100, + Messenger::Policy::lossless_peer(0), + Messenger::Policy::lossless_peer(0)); + for (int i = 0; i < 100; ++i) { + if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl; + test_msg.generate_connection(); + } + gen_type rng(time(NULL)); + for (int i = 0; i < 1000; ++i) { + if (!(i % 10)) { + lderr(g_ceph_context) << "Op " << i << ": " << dendl; + test_msg.print_internal_state(); + } + boost::uniform_int<> true_false(0, 99); + int val = true_false(rng); + if (val > 95) { + test_msg.generate_connection(); + } else if (val > 80) { + // test_msg.drop_connection(); + } else if (val > 10) { + test_msg.send_message(); + } else { + usleep(rand() % 500 + 100); + } + } + test_msg.wait_for_done(); + g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0"); + g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0"); + g_ceph_context->_conf.set_val("ms_inject_delay_probability", "0"); + g_ceph_context->_conf.set_val("ms_inject_delay_type", ""); + g_ceph_context->_conf.set_val("ms_inject_delay_max", "0"); +} + +// This is test for network block, means ::send return EAGAIN +TEST_P(MessengerTest, SyntheticInjectTest5) { + SyntheticWorkload test_msg(1, 8, GetParam(), 100, + Messenger::Policy::stateful_server(0), + Messenger::Policy::lossless_client(0), + 64, 2); + bool simulate_network_congestion = true; + for (int i = 0; i < 2; ++i) + test_msg.generate_connection(); + for (int i = 0; i < 5000; ++i) { + if (!(i % 10)) { + ldout(g_ceph_context, 0) << "Op " << i << ": " << dendl; + test_msg.print_internal_state(); + } + if (i < 1600) { + // means that we would stuck 1600 * 6M (9.6G) around with 2 connections + test_msg.send_large_message(simulate_network_congestion); + } else { + simulate_network_congestion = false; + test_msg.send_large_message(simulate_network_congestion); + } + } + test_msg.wait_for_done(); +} + + +class MarkdownDispatcher : public Dispatcher { + ceph::mutex lock = ceph::make_mutex("MarkdownDispatcher::lock"); + set<ConnectionRef> conns; + bool last_mark; + public: + std::atomic<uint64_t> count = { 0 }; + explicit MarkdownDispatcher(bool s): Dispatcher(g_ceph_context), + last_mark(false) { + } + bool ms_can_fast_dispatch_any() const override { return false; } + bool ms_can_fast_dispatch(const Message *m) const override { + switch (m->get_type()) { + case CEPH_MSG_PING: + return true; + default: + return false; + } + } + + void ms_handle_fast_connect(Connection *con) override { + lderr(g_ceph_context) << __func__ << " " << con << dendl; + std::lock_guard l{lock}; + conns.insert(con); + } + void ms_handle_fast_accept(Connection *con) override { + std::lock_guard l{lock}; + conns.insert(con); + } + bool ms_dispatch(Message *m) override { + lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << dendl; + std::lock_guard l{lock}; + count++; + conns.insert(m->get_connection()); + if (conns.size() < 2 && !last_mark) { + m->put(); + return true; + } + + last_mark = true; + usleep(rand() % 500); + for (set<ConnectionRef>::iterator it = conns.begin(); it != conns.end(); ++it) { + if ((*it) != m->get_connection().get()) { + (*it)->mark_down(); + conns.erase(it); + break; + } + } + if (conns.empty()) + last_mark = false; + m->put(); + return true; + } + bool ms_handle_reset(Connection *con) override { + lderr(g_ceph_context) << __func__ << " " << con << dendl; + std::lock_guard l{lock}; + conns.erase(con); + usleep(rand() % 500); + return true; + } + void ms_handle_remote_reset(Connection *con) override { + std::lock_guard l{lock}; + conns.erase(con); + lderr(g_ceph_context) << __func__ << " " << con << dendl; + } + bool ms_handle_refused(Connection *con) override { + return false; + } + void ms_fast_dispatch(Message *m) override { + ceph_abort(); + } + int ms_handle_fast_authentication(Connection *con) override { + return 1; + } +}; + + +// Markdown with external lock +TEST_P(MessengerTest, MarkdownTest) { + Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid()); + MarkdownDispatcher cli_dispatcher(false), srv_dispatcher(true); + DummyAuthClientServer dummy_auth(g_ceph_context); + dummy_auth.auth_registry.refresh_config(); + entity_addr_t bind_addr; + bind_addr.parse("v2:127.0.0.1:16800"); + server_msgr->bind(bind_addr); + server_msgr->add_dispatcher_head(&srv_dispatcher); + server_msgr->set_auth_client(&dummy_auth); + server_msgr->set_auth_server(&dummy_auth); + server_msgr->start(); + bind_addr.parse("v2:127.0.0.1:16801"); + server_msgr2->bind(bind_addr); + server_msgr2->add_dispatcher_head(&srv_dispatcher); + server_msgr2->set_auth_client(&dummy_auth); + server_msgr2->set_auth_server(&dummy_auth); + server_msgr2->start(); + + client_msgr->add_dispatcher_head(&cli_dispatcher); + client_msgr->set_auth_client(&dummy_auth); + client_msgr->set_auth_server(&dummy_auth); + client_msgr->start(); + + int i = 1000; + uint64_t last = 0; + bool equal = false; + uint64_t equal_count = 0; + while (i--) { + ConnectionRef conn1 = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); + ConnectionRef conn2 = client_msgr->connect_to(server_msgr2->get_mytype(), + server_msgr2->get_myaddrs()); + MPing *m = new MPing(); + ASSERT_EQ(conn1->send_message(m), 0); + m = new MPing(); + ASSERT_EQ(conn2->send_message(m), 0); + CHECK_AND_WAIT_TRUE(srv_dispatcher.count > last + 1); + if (srv_dispatcher.count == last) { + lderr(g_ceph_context) << __func__ << " last is " << last << dendl; + equal = true; + equal_count++; + } else { + equal = false; + equal_count = 0; + } + last = srv_dispatcher.count; + if (equal_count) + usleep(1000*500); + ASSERT_FALSE(equal && equal_count > 3); + } + server_msgr->shutdown(); + client_msgr->shutdown(); + server_msgr2->shutdown(); + server_msgr->wait(); + client_msgr->wait(); + server_msgr2->wait(); + delete server_msgr2; +} + +INSTANTIATE_TEST_SUITE_P( + Messenger, + MessengerTest, + ::testing::Values( + "async+posix" + ) +); + +int main(int argc, char **argv) { + auto args = argv_to_vec(argc, argv); + + auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, + CODE_ENVIRONMENT_UTILITY, + CINIT_FLAG_NO_DEFAULT_CONFIG_FILE); + g_ceph_context->_conf.set_val("auth_cluster_required", "none"); + g_ceph_context->_conf.set_val("auth_service_required", "none"); + g_ceph_context->_conf.set_val("auth_client_required", "none"); + g_ceph_context->_conf.set_val("keyring", "/dev/null"); + g_ceph_context->_conf.set_val("enable_experimental_unrecoverable_data_corrupting_features", "ms-type-async"); + g_ceph_context->_conf.set_val("ms_die_on_bad_msg", "true"); + g_ceph_context->_conf.set_val("ms_die_on_old_message", "true"); + g_ceph_context->_conf.set_val("ms_max_backoff", "1"); + common_init_finish(g_ceph_context); + + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +/* + * Local Variables: + * compile-command: "cd ../.. ; make -j4 ceph_test_msgr && valgrind --tool=memcheck ./ceph_test_msgr" + * End: + */ diff --git a/src/test/msgr/test_userspace_event.cc b/src/test/msgr/test_userspace_event.cc new file mode 100644 index 000000000..067d81545 --- /dev/null +++ b/src/test/msgr/test_userspace_event.cc @@ -0,0 +1,174 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSky <haomai@xsky.com> + * + * Author: Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include <map> +#include <random> +#include <gtest/gtest.h> + +#include "msg/async/dpdk/UserspaceEvent.h" +#include "global/global_context.h" + +class UserspaceManagerTest : public ::testing::Test { + public: + UserspaceEventManager *manager; + + UserspaceManagerTest() {} + virtual void SetUp() { + manager = new UserspaceEventManager(g_ceph_context); + } + virtual void TearDown() { + delete manager; + } +}; + +TEST_F(UserspaceManagerTest, BasicTest) { + int events[10]; + int masks[10]; + int fd = manager->get_eventfd(); + ASSERT_EQ(fd, 1); + ASSERT_EQ(0, manager->listen(fd, 1)); + ASSERT_EQ(0, manager->notify(fd, 1)); + ASSERT_EQ(1, manager->poll(events, masks, 10, nullptr)); + ASSERT_EQ(fd, events[0]); + ASSERT_EQ(1, masks[0]); + ASSERT_EQ(0, manager->notify(fd, 2)); + ASSERT_EQ(0, manager->poll(events, masks, 10, nullptr)); + ASSERT_EQ(0, manager->unlisten(fd, 1)); + ASSERT_EQ(0, manager->notify(fd, 1)); + ASSERT_EQ(0, manager->poll(events, masks, 10, nullptr)); + manager->close(fd); + fd = manager->get_eventfd(); + ASSERT_EQ(fd, 1); + ASSERT_EQ(0, manager->poll(events, masks, 10, nullptr)); +} + +TEST_F(UserspaceManagerTest, FailTest) { + int events[10]; + int masks[10]; + int fd = manager->get_eventfd(); + ASSERT_EQ(fd, 1); + ASSERT_EQ(-ENOENT, manager->listen(fd+1, 1)); + ASSERT_EQ(-ENOENT, manager->notify(fd+1, 1)); + ASSERT_EQ(0, manager->poll(events, masks, 10, nullptr)); + ASSERT_EQ(-ENOENT, manager->unlisten(fd+1, 1)); + manager->close(fd); +} + +TEST_F(UserspaceManagerTest, StressTest) { + std::vector<std::pair<int, int> > mappings; + int events[10]; + int masks[10]; + std::random_device rd; + std::default_random_engine rng(rd()); + std::uniform_int_distribution<> dist(0, 100); + + mappings.resize(1001); + mappings[0] = std::make_pair(-1, -1); + for (int i = 0; i < 1000; ++i) { + int fd = manager->get_eventfd(); + ASSERT_TRUE(fd > 0); + mappings[fd] = std::make_pair(0, 0); + } + int r = 0; + int fd = manager->get_eventfd(); + auto get_activate_count = [](std::vector<std::pair<int, int> > &m) { + std::vector<int> fds; + int mask = 0; + size_t idx = 0; + for (auto &&p : m) { + mask = p.first & p.second; + if (p.first != -1 && mask) { + p.second &= (~mask); + fds.push_back(idx); + std::cerr << " activate " << idx << " mask " << mask << std::endl; + } + ++idx; + } + return fds; + }; + for (int i = 0; i < 10000; ++i) { + int value = dist(rng); + fd = dist(rng) % mappings.size(); + auto &p = mappings[fd]; + int mask = dist(rng) % 2 + 1; + if (value > 55) { + r = manager->notify(fd, mask); + if (p.first == -1) { + ASSERT_EQ(p.second, -1); + ASSERT_EQ(r, -ENOENT); + } else { + p.second |= mask; + ASSERT_EQ(r, 0); + } + std::cerr << " notify fd " << fd << " mask " << mask << " r " << r << std::endl; + } else if (value > 45) { + r = manager->listen(fd, mask); + std::cerr << " listen fd " << fd << " mask " << mask << " r " << r << std::endl; + if (p.first == -1) { + ASSERT_EQ(p.second, -1); + ASSERT_EQ(r, -ENOENT); + } else { + p.first |= mask; + ASSERT_EQ(r, 0); + } + } else if (value > 35) { + r = manager->unlisten(fd, mask); + std::cerr << " unlisten fd " << fd << " mask " << mask << " r " << r << std::endl; + if (p.first == -1) { + ASSERT_EQ(p.second, -1); + ASSERT_EQ(r, -ENOENT); + } else { + p.first &= ~mask; + ASSERT_EQ(r, 0); + } + } else if (value > 20) { + std::set<int> actual, expected; + do { + r = manager->poll(events, masks, 3, nullptr); + std::cerr << " poll " << r; + for (int k = 0; k < r; ++k) { + std::cerr << events[k] << " "; + actual.insert(events[k]); + } + } while (r == 3); + std::cerr << std::endl; + auto fds = get_activate_count(mappings); + for (auto &&d : fds) + expected.insert(d); + ASSERT_EQ(expected, actual); + } else if (value > 10) { + r = manager->get_eventfd(); + std::cerr << " open fd " << r << std::endl; + ASSERT_TRUE(r > 0); + if ((size_t)r >= mappings.size()) + mappings.resize(r+1); + mappings[r] = std::make_pair(0, 0); + } else { + manager->close(fd); + std::cerr << " close fd " << fd << std::endl; + mappings[fd] = std::make_pair(-1, -1); + } + ASSERT_TRUE(manager->check()); + } +} + +/* + * Local Variables: + * compile-command: "cd ../.. ; make ceph_test_userspace_event && + * ./ceph_test_userspace_event.cc + * + * End: + */ |