summaryrefslogtreecommitdiffstats
path: root/src/test/msgr
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/test/msgr/CMakeLists.txt58
-rw-r--r--src/test/msgr/perf_msgr_client.cc219
-rw-r--r--src/test/msgr/perf_msgr_server.cc176
-rw-r--r--src/test/msgr/test_async_driver.cc354
-rw-r--r--src/test/msgr/test_async_networkstack.cc1072
-rw-r--r--src/test/msgr/test_comp_registry.cc98
-rw-r--r--src/test/msgr/test_frames_v2.cc483
-rw-r--r--src/test/msgr/test_msgr.cc2424
-rw-r--r--src/test/msgr/test_userspace_event.cc174
9 files changed, 5058 insertions, 0 deletions
diff --git a/src/test/msgr/CMakeLists.txt b/src/test/msgr/CMakeLists.txt
new file mode 100644
index 000000000..beaa7133d
--- /dev/null
+++ b/src/test/msgr/CMakeLists.txt
@@ -0,0 +1,58 @@
+# ceph_test_async_driver
+add_executable(ceph_test_async_driver
+ test_async_driver.cc
+ $<TARGET_OBJECTS:unit-main>
+ )
+target_link_libraries(ceph_test_async_driver os global ${BLKID_LIBRARIES} ${CMAKE_DL_LIBS} ${UNITTEST_LIBS})
+
+# ceph_test_msgr
+add_executable(ceph_test_msgr
+ test_msgr.cc
+ )
+target_link_libraries(ceph_test_msgr os global ${BLKID_LIBRARIES} ${CMAKE_DL_LIBS} ${UNITTEST_LIBS})
+
+# ceph_test_async_networkstack
+add_executable(ceph_test_async_networkstack
+ test_async_networkstack.cc
+ $<TARGET_OBJECTS:unit-main>
+ )
+target_link_libraries(ceph_test_async_networkstack global ${CRYPTO_LIBS} ${BLKID_LIBRARIES} ${CMAKE_DL_LIBS} ${UNITTEST_LIBS})
+
+#ceph_perf_msgr_server
+add_executable(ceph_perf_msgr_server perf_msgr_server.cc)
+target_link_libraries(ceph_perf_msgr_server os global ${UNITTEST_LIBS})
+
+#ceph_perf_msgr_client
+add_executable(ceph_perf_msgr_client perf_msgr_client.cc)
+target_link_libraries(ceph_perf_msgr_client os global ${UNITTEST_LIBS})
+
+# unitttest_frames_v2
+add_executable(unittest_frames_v2 test_frames_v2.cc)
+add_ceph_unittest(unittest_frames_v2)
+target_link_libraries(unittest_frames_v2 os global ${UNITTEST_LIBS})
+
+add_executable(unittest_comp_registry
+ test_comp_registry.cc
+ $<TARGET_OBJECTS:unit-main>
+ )
+add_ceph_unittest(unittest_comp_registry)
+target_link_libraries(unittest_comp_registry global)
+
+# test_userspace_event
+if(HAVE_DPDK)
+ add_executable(ceph_test_userspace_event
+ test_userspace_event.cc
+ $<TARGET_OBJECTS:unit-main>)
+ target_link_libraries(ceph_test_userspace_event
+ global
+ ${CMAKE_DL_LIBS}
+ ${UNITTEST_LIBS})
+endif(HAVE_DPDK)
+
+install(TARGETS
+ ceph_test_async_driver
+ ceph_test_msgr
+ ceph_test_async_networkstack
+ ceph_perf_msgr_server
+ ceph_perf_msgr_client
+ DESTINATION ${CMAKE_INSTALL_BINDIR})
diff --git a/src/test/msgr/perf_msgr_client.cc b/src/test/msgr/perf_msgr_client.cc
new file mode 100644
index 000000000..ffbfc1614
--- /dev/null
+++ b/src/test/msgr/perf_msgr_client.cc
@@ -0,0 +1,219 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Haomai Wang
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <string>
+#include <unistd.h>
+#include <iostream>
+
+using namespace std;
+
+#include "common/ceph_argparse.h"
+#include "common/debug.h"
+#include "common/Cycles.h"
+#include "global/global_init.h"
+#include "msg/Messenger.h"
+#include "messages/MOSDOp.h"
+#include "auth/DummyAuth.h"
+
+#include <atomic>
+
+class MessengerClient {
+ class ClientThread;
+ class ClientDispatcher : public Dispatcher {
+ uint64_t think_time;
+ ClientThread *thread;
+
+ public:
+ ClientDispatcher(uint64_t delay, ClientThread *t): Dispatcher(g_ceph_context), think_time(delay), thread(t) {}
+ bool ms_can_fast_dispatch_any() const override { return true; }
+ bool ms_can_fast_dispatch(const Message *m) const override {
+ switch (m->get_type()) {
+ case CEPH_MSG_OSD_OPREPLY:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ void ms_handle_fast_connect(Connection *con) override {}
+ void ms_handle_fast_accept(Connection *con) override {}
+ bool ms_dispatch(Message *m) override { return true; }
+ void ms_fast_dispatch(Message *m) override;
+ bool ms_handle_reset(Connection *con) override { return true; }
+ void ms_handle_remote_reset(Connection *con) override {}
+ bool ms_handle_refused(Connection *con) override { return false; }
+ int ms_handle_fast_authentication(Connection *con) override {
+ return 1;
+ }
+ };
+
+ class ClientThread : public Thread {
+ Messenger *msgr;
+ int concurrent;
+ ConnectionRef conn;
+ std::atomic<unsigned> client_inc = { 0 };
+ object_t oid;
+ object_locator_t oloc;
+ pg_t pgid;
+ int msg_len;
+ bufferlist data;
+ int ops;
+ ClientDispatcher dispatcher;
+
+ public:
+ ceph::mutex lock = ceph::make_mutex("MessengerBenchmark::ClientThread::lock");
+ ceph::condition_variable cond;
+ uint64_t inflight;
+
+ ClientThread(Messenger *m, int c, ConnectionRef con, int len, int ops, int think_time_us):
+ msgr(m), concurrent(c), conn(con), oid("object-name"), oloc(1, 1), msg_len(len), ops(ops),
+ dispatcher(think_time_us, this), inflight(0) {
+ m->add_dispatcher_head(&dispatcher);
+ bufferptr ptr(msg_len);
+ memset(ptr.c_str(), 0, msg_len);
+ data.append(ptr);
+ }
+ void *entry() override {
+ std::unique_lock locker{lock};
+ for (int i = 0; i < ops; ++i) {
+ if (inflight > uint64_t(concurrent)) {
+ cond.wait(locker);
+ }
+ hobject_t hobj(oid, oloc.key, CEPH_NOSNAP, pgid.ps(), pgid.pool(),
+ oloc.nspace);
+ spg_t spgid(pgid);
+ MOSDOp *m = new MOSDOp(client_inc, 0, hobj, spgid, 0, 0, 0);
+ bufferlist msg_data(data);
+ m->write(0, msg_len, msg_data);
+ inflight++;
+ conn->send_message(m);
+ //cerr << __func__ << " send m=" << m << std::endl;
+ }
+ locker.unlock();
+ msgr->shutdown();
+ return 0;
+ }
+ };
+
+ string type;
+ string serveraddr;
+ int think_time_us;
+ vector<Messenger*> msgrs;
+ vector<ClientThread*> clients;
+ DummyAuthClientServer dummy_auth;
+
+ public:
+ MessengerClient(const string &t, const string &addr, int delay):
+ type(t), serveraddr(addr), think_time_us(delay),
+ dummy_auth(g_ceph_context) {
+ }
+ ~MessengerClient() {
+ for (uint64_t i = 0; i < clients.size(); ++i)
+ delete clients[i];
+ for (uint64_t i = 0; i < msgrs.size(); ++i) {
+ msgrs[i]->shutdown();
+ msgrs[i]->wait();
+ }
+ }
+ void ready(int c, int jobs, int ops, int msg_len) {
+ entity_addr_t addr;
+ addr.parse(serveraddr.c_str());
+ addr.set_nonce(0);
+ dummy_auth.auth_registry.refresh_config();
+ for (int i = 0; i < jobs; ++i) {
+ Messenger *msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(0), "client", getpid()+i);
+ msgr->set_default_policy(Messenger::Policy::lossless_client(0));
+ msgr->set_auth_client(&dummy_auth);
+ msgr->start();
+ entity_addrvec_t addrs(addr);
+ ConnectionRef conn = msgr->connect_to_osd(addrs);
+ ClientThread *t = new ClientThread(msgr, c, conn, msg_len, ops, think_time_us);
+ msgrs.push_back(msgr);
+ clients.push_back(t);
+ }
+ usleep(1000*1000);
+ }
+ void start() {
+ for (uint64_t i = 0; i < clients.size(); ++i)
+ clients[i]->create("client");
+ for (uint64_t i = 0; i < msgrs.size(); ++i)
+ msgrs[i]->wait();
+ }
+};
+
+void MessengerClient::ClientDispatcher::ms_fast_dispatch(Message *m) {
+ usleep(think_time);
+ m->put();
+ std::lock_guard l{thread->lock};
+ thread->inflight--;
+ thread->cond.notify_all();
+}
+
+
+void usage(const string &name) {
+ cout << "Usage: " << name << " [server ip:port] [numjobs] [concurrency] [ios] [thinktime us] [msg length]" << std::endl;
+ cout << " [server ip:port]: connect to the ip:port pair" << std::endl;
+ cout << " [numjobs]: how much client threads spawned and do benchmark" << std::endl;
+ cout << " [concurrency]: the max inflight messages(like iodepth in fio)" << std::endl;
+ cout << " [ios]: how much messages sent for each client" << std::endl;
+ cout << " [thinktime]: sleep time when do fast dispatching(match client logic)" << std::endl;
+ cout << " [msg length]: message data bytes" << std::endl;
+}
+
+int main(int argc, char **argv)
+{
+ auto args = argv_to_vec(argc, argv);
+
+ auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
+ CODE_ENVIRONMENT_UTILITY,
+ CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
+ common_init_finish(g_ceph_context);
+ g_ceph_context->_conf.apply_changes(nullptr);
+
+ if (args.size() < 6) {
+ usage(argv[0]);
+ return 1;
+ }
+
+ int numjobs = atoi(args[1]);
+ int concurrent = atoi(args[2]);
+ int ios = atoi(args[3]);
+ int think_time = atoi(args[4]);
+ int len = atoi(args[5]);
+
+ std::string public_msgr_type = g_ceph_context->_conf->ms_public_type.empty() ? g_ceph_context->_conf.get_val<std::string>("ms_type") : g_ceph_context->_conf->ms_public_type;
+
+ cout << " using ms-public-type " << public_msgr_type << std::endl;
+ cout << " server ip:port " << args[0] << std::endl;
+ cout << " numjobs " << numjobs << std::endl;
+ cout << " concurrency " << concurrent << std::endl;
+ cout << " ios " << ios << std::endl;
+ cout << " thinktime(us) " << think_time << std::endl;
+ cout << " message data bytes " << len << std::endl;
+
+ MessengerClient client(public_msgr_type, args[0], think_time);
+
+ client.ready(concurrent, numjobs, ios, len);
+ Cycles::init();
+ uint64_t start = Cycles::rdtsc();
+ client.start();
+ uint64_t stop = Cycles::rdtsc();
+ cout << " Total op " << (ios * numjobs) << " run time " << Cycles::to_microseconds(stop - start) << "us." << std::endl;
+
+ return 0;
+}
diff --git a/src/test/msgr/perf_msgr_server.cc b/src/test/msgr/perf_msgr_server.cc
new file mode 100644
index 000000000..0c492ab17
--- /dev/null
+++ b/src/test/msgr/perf_msgr_server.cc
@@ -0,0 +1,176 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Haomai Wang
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <string>
+#include <unistd.h>
+#include <iostream>
+
+using namespace std;
+
+#include "common/ceph_argparse.h"
+#include "common/debug.h"
+#include "common/WorkQueue.h"
+#include "global/global_init.h"
+#include "msg/Messenger.h"
+#include "messages/MOSDOp.h"
+#include "messages/MOSDOpReply.h"
+#include "auth/DummyAuth.h"
+
+class ServerDispatcher : public Dispatcher {
+ uint64_t think_time;
+ ThreadPool op_tp;
+ class OpWQ : public ThreadPool::WorkQueue<Message> {
+ list<Message*> messages;
+
+ public:
+ OpWQ(ceph::timespan timeout, ceph::timespan suicide_timeout, ThreadPool *tp)
+ : ThreadPool::WorkQueue<Message>("ServerDispatcher::OpWQ", timeout, suicide_timeout, tp) {}
+
+ bool _enqueue(Message *m) override {
+ messages.push_back(m);
+ return true;
+ }
+ void _dequeue(Message *m) override {
+ ceph_abort();
+ }
+ bool _empty() override {
+ return messages.empty();
+ }
+ Message *_dequeue() override {
+ if (messages.empty())
+ return NULL;
+ Message *m = messages.front();
+ messages.pop_front();
+ return m;
+ }
+ void _process(Message *m, ThreadPool::TPHandle &handle) override {
+ MOSDOp *osd_op = static_cast<MOSDOp*>(m);
+ MOSDOpReply *reply = new MOSDOpReply(osd_op, 0, 0, 0, false);
+ m->get_connection()->send_message(reply);
+ m->put();
+ }
+ void _process_finish(Message *m) override { }
+ void _clear() override {
+ ceph_assert(messages.empty());
+ }
+ } op_wq;
+
+ public:
+ ServerDispatcher(int threads, uint64_t delay): Dispatcher(g_ceph_context), think_time(delay),
+ op_tp(g_ceph_context, "ServerDispatcher::op_tp", "tp_serv_disp", threads, "serverdispatcher_op_threads"),
+ op_wq(ceph::make_timespan(30), ceph::make_timespan(30), &op_tp) {
+ op_tp.start();
+ }
+ ~ServerDispatcher() override {
+ op_tp.stop();
+ }
+ bool ms_can_fast_dispatch_any() const override { return true; }
+ bool ms_can_fast_dispatch(const Message *m) const override {
+ switch (m->get_type()) {
+ case CEPH_MSG_OSD_OP:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ void ms_handle_fast_connect(Connection *con) override {}
+ void ms_handle_fast_accept(Connection *con) override {}
+ bool ms_dispatch(Message *m) override { return true; }
+ bool ms_handle_reset(Connection *con) override { return true; }
+ void ms_handle_remote_reset(Connection *con) override {}
+ bool ms_handle_refused(Connection *con) override { return false; }
+ void ms_fast_dispatch(Message *m) override {
+ usleep(think_time);
+ //cerr << __func__ << " reply message=" << m << std::endl;
+ op_wq.queue(m);
+ }
+ int ms_handle_fast_authentication(Connection *con) override {
+ return 1;
+ }
+};
+
+class MessengerServer {
+ Messenger *msgr;
+ string type;
+ string bindaddr;
+ ServerDispatcher dispatcher;
+ DummyAuthClientServer dummy_auth;
+
+ public:
+ MessengerServer(const string &t, const string &addr, int threads, int delay):
+ msgr(NULL), type(t), bindaddr(addr), dispatcher(threads, delay),
+ dummy_auth(g_ceph_context) {
+ msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0), "server", 0);
+ msgr->set_default_policy(Messenger::Policy::stateless_server(0));
+ dummy_auth.auth_registry.refresh_config();
+ msgr->set_auth_server(&dummy_auth);
+ }
+ ~MessengerServer() {
+ msgr->shutdown();
+ msgr->wait();
+ }
+ void start() {
+ entity_addr_t addr;
+ addr.parse(bindaddr.c_str());
+ msgr->bind(addr);
+ msgr->add_dispatcher_head(&dispatcher);
+ msgr->start();
+ msgr->wait();
+ }
+};
+
+void usage(const string &name) {
+ cerr << "Usage: " << name << " [bind ip:port] [server worker threads] [thinktime us]" << std::endl;
+ cerr << " [bind ip:port]: The ip:port pair to bind, client need to specify this pair to connect" << std::endl;
+ cerr << " [server worker threads]: threads will process incoming messages and reply(matching pg threads)" << std::endl;
+ cerr << " [thinktime]: sleep time when do dispatching(match fast dispatch logic in OSD.cc)" << std::endl;
+}
+
+int main(int argc, char **argv)
+{
+ auto args = argv_to_vec(argc, argv);
+
+ auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
+ CODE_ENVIRONMENT_UTILITY,
+ CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
+ common_init_finish(g_ceph_context);
+ g_ceph_context->_conf.apply_changes(nullptr);
+
+ if (args.size() < 3) {
+ usage(argv[0]);
+ return 1;
+ }
+
+ int worker_threads = atoi(args[1]);
+ int think_time = atoi(args[2]);
+ std::string public_msgr_type = g_ceph_context->_conf->ms_public_type.empty() ? g_ceph_context->_conf.get_val<std::string>("ms_type") : g_ceph_context->_conf->ms_public_type;
+
+ cerr << " This tool won't handle connection error alike things, " << std::endl;
+ cerr << "please ensure the proper network environment to test." << std::endl;
+ cerr << " Or ctrl+c when meeting error and restart tests" << std::endl;
+ cerr << " using ms-public-type " << public_msgr_type << std::endl;
+ cerr << " bind ip:port " << args[0] << std::endl;
+ cerr << " worker threads " << worker_threads << std::endl;
+ cerr << " thinktime(us) " << think_time << std::endl;
+
+ MessengerServer server(public_msgr_type, args[0], worker_threads, think_time);
+ server.start();
+
+ return 0;
+}
diff --git a/src/test/msgr/test_async_driver.cc b/src/test/msgr/test_async_driver.cc
new file mode 100644
index 000000000..6cf4211b6
--- /dev/null
+++ b/src/test/msgr/test_async_driver.cc
@@ -0,0 +1,354 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifdef __APPLE__
+#include <AvailabilityMacros.h>
+#endif
+
+#include <fcntl.h>
+#include <sys/socket.h>
+#include <pthread.h>
+#include <stdint.h>
+#include <arpa/inet.h>
+#include "include/Context.h"
+#include "common/ceph_mutex.h"
+#include "common/Cond.h"
+#include "global/global_init.h"
+#include "common/ceph_argparse.h"
+#include "msg/async/Event.h"
+
+#include <atomic>
+
+// We use epoll, kqueue, evport, select in descending order by performance.
+#if defined(__linux__)
+#define HAVE_EPOLL 1
+#endif
+
+#if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
+#define HAVE_KQUEUE 1
+#endif
+
+#ifdef __sun
+#include <sys/feature_tests.h>
+#ifdef _DTRACE_VERSION
+#define HAVE_EVPORT 1
+#endif
+#endif
+
+#ifdef HAVE_EPOLL
+#include "msg/async/EventEpoll.h"
+#endif
+#ifdef HAVE_KQUEUE
+#include "msg/async/EventKqueue.h"
+#endif
+#include "msg/async/EventSelect.h"
+
+#include <gtest/gtest.h>
+
+using namespace std;
+
+class EventDriverTest : public ::testing::TestWithParam<const char*> {
+ public:
+ EventDriver *driver;
+
+ EventDriverTest(): driver(0) {}
+ void SetUp() override {
+ cerr << __func__ << " start set up " << GetParam() << std::endl;
+#ifdef HAVE_EPOLL
+ if (strcmp(GetParam(), "epoll"))
+ driver = new EpollDriver(g_ceph_context);
+#endif
+#ifdef HAVE_KQUEUE
+ if (strcmp(GetParam(), "kqueue"))
+ driver = new KqueueDriver(g_ceph_context);
+#endif
+ if (strcmp(GetParam(), "select"))
+ driver = new SelectDriver(g_ceph_context);
+ driver->init(NULL, 100);
+ }
+ void TearDown() override {
+ delete driver;
+ }
+};
+
+int set_nonblock(int sd)
+{
+ int flags;
+
+ /* Set the socket nonblocking.
+ * Note that fcntl(2) for F_GETFL and F_SETFL can't be
+ * interrupted by a signal. */
+ if ((flags = fcntl(sd, F_GETFL)) < 0 ) {
+ return -1;
+ }
+ if (fcntl(sd, F_SETFL, flags | O_NONBLOCK) < 0) {
+ return -1;
+ }
+ return 0;
+}
+
+
+TEST_P(EventDriverTest, PipeTest) {
+ int fds[2];
+ vector<FiredFileEvent> fired_events;
+ int r;
+ struct timeval tv;
+ tv.tv_sec = 0;
+ tv.tv_usec = 1;
+
+ r = pipe(fds);
+ ASSERT_EQ(r, 0);
+ r = driver->add_event(fds[0], EVENT_NONE, EVENT_READABLE);
+ ASSERT_EQ(r, 0);
+ r = driver->event_wait(fired_events, &tv);
+ ASSERT_EQ(r, 0);
+
+ char c = 'A';
+ r = write(fds[1], &c, sizeof(c));
+ ASSERT_EQ(r, 1);
+ r = driver->event_wait(fired_events, &tv);
+ ASSERT_EQ(r, 1);
+ ASSERT_EQ(fired_events[0].fd, fds[0]);
+
+
+ fired_events.clear();
+ r = write(fds[1], &c, sizeof(c));
+ ASSERT_EQ(r, 1);
+ r = driver->event_wait(fired_events, &tv);
+ ASSERT_EQ(r, 1);
+ ASSERT_EQ(fired_events[0].fd, fds[0]);
+
+ fired_events.clear();
+ driver->del_event(fds[0], EVENT_READABLE, EVENT_READABLE);
+ r = write(fds[1], &c, sizeof(c));
+ ASSERT_EQ(r, 1);
+ r = driver->event_wait(fired_events, &tv);
+ ASSERT_EQ(r, 0);
+}
+
+void* echoclient(void *arg)
+{
+ intptr_t port = (intptr_t)arg;
+ struct sockaddr_in sa;
+ memset(&sa, 0, sizeof(sa));
+ sa.sin_family = AF_INET;
+ sa.sin_port = htons(port);
+ char addr[] = "127.0.0.1";
+ int r = inet_pton(AF_INET, addr, &sa.sin_addr);
+ ceph_assert(r == 1);
+
+ int connect_sd = ::socket(AF_INET, SOCK_STREAM, 0);
+ if (connect_sd >= 0) {
+ r = connect(connect_sd, (struct sockaddr*)&sa, sizeof(sa));
+ ceph_assert(r == 0);
+ int t = 0;
+
+ do {
+ char c[] = "banner";
+ r = write(connect_sd, c, sizeof(c));
+ char d[100];
+ r = read(connect_sd, d, sizeof(d));
+ if (r == 0)
+ break;
+ if (t++ == 30)
+ break;
+ } while (1);
+ ::close(connect_sd);
+ }
+ return 0;
+}
+
+TEST_P(EventDriverTest, NetworkSocketTest) {
+ int listen_sd = ::socket(AF_INET, SOCK_STREAM, 0);
+ ASSERT_TRUE(listen_sd > 0);
+ int on = 1;
+ int r = ::setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+ ASSERT_EQ(r, 0);
+ r = set_nonblock(listen_sd);
+ ASSERT_EQ(r, 0);
+ struct sockaddr_in sa;
+ long port = 0;
+ for (port = 38788; port < 40000; port++) {
+ memset(&sa,0,sizeof(sa));
+ sa.sin_family = AF_INET;
+ sa.sin_port = htons(port);
+ sa.sin_addr.s_addr = htonl(INADDR_ANY);
+
+ r = ::bind(listen_sd, (struct sockaddr *)&sa, sizeof(sa));
+ if (r == 0) {
+ break;
+ }
+ }
+ ASSERT_EQ(r, 0);
+ r = listen(listen_sd, 511);
+ ASSERT_EQ(r, 0);
+
+ vector<FiredFileEvent> fired_events;
+ struct timeval tv;
+ tv.tv_sec = 0;
+ tv.tv_usec = 1;
+ r = driver->add_event(listen_sd, EVENT_NONE, EVENT_READABLE);
+ ASSERT_EQ(r, 0);
+ r = driver->event_wait(fired_events, &tv);
+ ASSERT_EQ(r, 0);
+
+ fired_events.clear();
+ pthread_t thread1;
+ r = pthread_create(&thread1, NULL, echoclient, (void*)(intptr_t)port);
+ ASSERT_EQ(r, 0);
+ tv.tv_sec = 5;
+ tv.tv_usec = 0;
+ r = driver->event_wait(fired_events, &tv);
+ ASSERT_EQ(r, 1);
+ ASSERT_EQ(fired_events[0].fd, listen_sd);
+
+ fired_events.clear();
+ int client_sd = ::accept(listen_sd, NULL, NULL);
+ ASSERT_TRUE(client_sd > 0);
+ r = driver->add_event(client_sd, EVENT_NONE, EVENT_READABLE);
+ ASSERT_EQ(r, 0);
+
+ do {
+ fired_events.clear();
+ tv.tv_sec = 5;
+ tv.tv_usec = 0;
+ r = driver->event_wait(fired_events, &tv);
+ ASSERT_EQ(1, r);
+ ASSERT_EQ(EVENT_READABLE, fired_events[0].mask);
+
+ fired_events.clear();
+ char data[100];
+ r = ::read(client_sd, data, sizeof(data));
+ if (r == 0)
+ break;
+ ASSERT_GT(r, 0);
+ r = driver->add_event(client_sd, EVENT_READABLE, EVENT_WRITABLE);
+ ASSERT_EQ(0, r);
+ r = driver->event_wait(fired_events, &tv);
+ ASSERT_EQ(1, r);
+ ASSERT_EQ(fired_events[0].mask, EVENT_WRITABLE);
+ r = write(client_sd, data, strlen(data));
+ ASSERT_EQ((int)strlen(data), r);
+ driver->del_event(client_sd, EVENT_READABLE|EVENT_WRITABLE,
+ EVENT_WRITABLE);
+ } while (1);
+
+ ::close(client_sd);
+ ::close(listen_sd);
+}
+
+class FakeEvent : public EventCallback {
+
+ public:
+ void do_request(uint64_t fd_or_id) override {}
+};
+
+TEST(EventCenterTest, FileEventExpansion) {
+ vector<int> sds;
+ EventCenter center(g_ceph_context);
+ center.init(100, 0, "posix");
+ center.set_owner();
+ EventCallbackRef e(new FakeEvent());
+ for (int i = 0; i < 300; i++) {
+ int sd = ::socket(AF_INET, SOCK_STREAM, 0);
+ center.create_file_event(sd, EVENT_READABLE, e);
+ sds.push_back(sd);
+ }
+
+ for (vector<int>::iterator it = sds.begin(); it != sds.end(); ++it)
+ center.delete_file_event(*it, EVENT_READABLE);
+}
+
+
+class Worker : public Thread {
+ CephContext *cct;
+ bool done;
+
+ public:
+ EventCenter center;
+ explicit Worker(CephContext *c, int idx): cct(c), done(false), center(c) {
+ center.init(100, idx, "posix");
+ }
+ void stop() {
+ done = true;
+ center.wakeup();
+ }
+ void* entry() override {
+ center.set_owner();
+ while (!done)
+ center.process_events(1000000);
+ return 0;
+ }
+};
+
+class CountEvent: public EventCallback {
+ std::atomic<unsigned> *count;
+ ceph::mutex *lock;
+ ceph::condition_variable *cond;
+
+ public:
+ CountEvent(std::atomic<unsigned> *atomic,
+ ceph::mutex *l, ceph::condition_variable *c)
+ : count(atomic), lock(l), cond(c) {}
+ void do_request(uint64_t id) override {
+ std::scoped_lock l{*lock};
+ (*count)--;
+ cond->notify_all();
+ }
+};
+
+TEST(EventCenterTest, DispatchTest) {
+ Worker worker1(g_ceph_context, 1), worker2(g_ceph_context, 2);
+ std::atomic<unsigned> count = { 0 };
+ ceph::mutex lock = ceph::make_mutex("DispatchTest::lock");
+ ceph::condition_variable cond;
+ worker1.create("worker_1");
+ worker2.create("worker_2");
+ for (int i = 0; i < 10000; ++i) {
+ count++;
+ worker1.center.dispatch_event_external(EventCallbackRef(new CountEvent(&count, &lock, &cond)));
+ count++;
+ worker2.center.dispatch_event_external(EventCallbackRef(new CountEvent(&count, &lock, &cond)));
+ std::unique_lock l{lock};
+ cond.wait(l, [&] { return count == 0; });
+ }
+ worker1.stop();
+ worker2.stop();
+ worker1.join();
+ worker2.join();
+}
+
+INSTANTIATE_TEST_SUITE_P(
+ AsyncMessenger,
+ EventDriverTest,
+ ::testing::Values(
+#ifdef HAVE_EPOLL
+ "epoll",
+#endif
+#ifdef HAVE_KQUEUE
+ "kqueue",
+#endif
+ "select"
+ )
+);
+
+/*
+ * Local Variables:
+ * compile-command: "cd ../.. ; make ceph_test_async_driver &&
+ * ./ceph_test_async_driver
+ *
+ * End:
+ */
diff --git a/src/test/msgr/test_async_networkstack.cc b/src/test/msgr/test_async_networkstack.cc
new file mode 100644
index 000000000..14923fd7d
--- /dev/null
+++ b/src/test/msgr/test_async_networkstack.cc
@@ -0,0 +1,1072 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSky <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <algorithm>
+#include <atomic>
+#include <iostream>
+#include <list>
+#include <random>
+#include <string>
+#include <set>
+#include <vector>
+#include <gtest/gtest.h>
+
+#include "acconfig.h"
+#include "common/config_obs.h"
+#include "include/Context.h"
+#include "msg/async/Event.h"
+#include "msg/async/Stack.h"
+
+using namespace std;
+
+class NoopConfigObserver : public md_config_obs_t {
+ std::list<std::string> options;
+ const char **ptrs = 0;
+
+public:
+ NoopConfigObserver(std::list<std::string> l) : options(l) {
+ ptrs = new const char*[options.size() + 1];
+ unsigned j = 0;
+ for (auto& i : options) {
+ ptrs[j++] = i.c_str();
+ }
+ ptrs[j] = 0;
+ }
+ ~NoopConfigObserver() {
+ delete[] ptrs;
+ }
+
+ const char** get_tracked_conf_keys() const override {
+ return ptrs;
+ }
+ void handle_conf_change(const ConfigProxy& conf,
+ const std::set <std::string> &changed) override {
+ }
+};
+
+class NetworkWorkerTest : public ::testing::TestWithParam<const char*> {
+ public:
+ std::shared_ptr<NetworkStack> stack;
+ string addr, port_addr;
+
+ NoopConfigObserver fake_obs = {{"ms_type",
+ "ms_dpdk_coremask",
+ "ms_dpdk_host_ipv4_addr",
+ "ms_dpdk_gateway_ipv4_addr",
+ "ms_dpdk_netmask_ipv4_addr"}};
+
+ NetworkWorkerTest() {}
+ void SetUp() override {
+ cerr << __func__ << " start set up " << GetParam() << std::endl;
+ if (strncmp(GetParam(), "dpdk", 4)) {
+ g_ceph_context->_conf.set_val("ms_type", "async+posix");
+ addr = "127.0.0.1:15000";
+ port_addr = "127.0.0.1:15001";
+ } else {
+ g_ceph_context->_conf.set_val_or_die("ms_dpdk_debug_allow_loopback", "true");
+ g_ceph_context->_conf.set_val_or_die("ms_async_op_threads", "2");
+ string ipv4_addr = g_ceph_context->_conf.get_val<std::string>("ms_dpdk_host_ipv4_addr");
+ addr = ipv4_addr + std::string(":15000");
+ port_addr = ipv4_addr + std::string(":15001");
+ }
+ stack = NetworkStack::create(g_ceph_context, GetParam());
+ stack->start();
+ }
+ void TearDown() override {
+ stack->stop();
+ }
+ string get_addr() const {
+ return addr;
+ }
+ string get_ip_different_port() const {
+ return port_addr;
+ }
+ string get_different_ip() const {
+ return "10.0.123.100:4323";
+ }
+ EventCenter *get_center(unsigned i) {
+ return &stack->get_worker(i)->center;
+ }
+ Worker *get_worker(unsigned i) {
+ return stack->get_worker(i);
+ }
+ template<typename func>
+ class C_dispatch : public EventCallback {
+ Worker *worker;
+ func f;
+ std::atomic_bool done;
+ public:
+ C_dispatch(Worker *w, func &&_f): worker(w), f(std::move(_f)), done(false) {}
+ void do_request(uint64_t id) override {
+ f(worker);
+ done = true;
+ }
+ void wait() {
+ int us = 1000 * 1000 * 1000;
+ while (!done) {
+ ASSERT_TRUE(us > 0);
+ usleep(100);
+ us -= 100;
+ }
+ }
+ };
+ template<typename func>
+ void exec_events(func &&f) {
+ std::vector<C_dispatch<func>*> dis;
+ for (unsigned i = 0; i < stack->get_num_worker(); ++i) {
+ Worker *w = stack->get_worker(i);
+ C_dispatch<func> *e = new C_dispatch<func>(w, std::move(f));
+ stack->get_worker(i)->center.dispatch_event_external(e);
+ dis.push_back(e);
+ }
+
+ for (auto &&e : dis) {
+ e->wait();
+ delete e;
+ }
+ }
+};
+
+class C_poll : public EventCallback {
+ EventCenter *center;
+ std::atomic<bool> woken;
+ static const int sleepus = 500;
+
+ public:
+ explicit C_poll(EventCenter *c): center(c), woken(false) {}
+ void do_request(uint64_t r) override {
+ woken = true;
+ }
+ bool poll(int milliseconds) {
+ auto start = ceph::coarse_real_clock::now();
+ while (!woken) {
+ center->process_events(sleepus);
+ usleep(sleepus);
+ auto r = std::chrono::duration_cast<std::chrono::milliseconds>(
+ ceph::coarse_real_clock::now() - start);
+ if (r >= std::chrono::milliseconds(milliseconds))
+ break;
+ }
+ return woken;
+ }
+ void reset() {
+ woken = false;
+ }
+};
+
+TEST_P(NetworkWorkerTest, SimpleTest) {
+ entity_addr_t bind_addr;
+ ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
+ std::atomic_bool accepted(false);
+ std::atomic_bool *accepted_p = &accepted;
+
+ exec_events([this, accepted_p, bind_addr](Worker *worker) mutable {
+ entity_addr_t cli_addr;
+ SocketOptions options;
+ ServerSocket bind_socket;
+ EventCenter *center = &worker->center;
+ ssize_t r = 0;
+ if (stack->support_local_listen_table() || worker->id == 0)
+ r = worker->listen(bind_addr, 0, options, &bind_socket);
+ ASSERT_EQ(0, r);
+
+ ConnectedSocket cli_socket, srv_socket;
+ if (worker->id == 0) {
+ r = worker->connect(bind_addr, options, &cli_socket);
+ ASSERT_EQ(0, r);
+ }
+
+ bool is_my_accept = false;
+ if (bind_socket) {
+ C_poll cb(center);
+ center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb);
+ if (cb.poll(500)) {
+ *accepted_p = true;
+ is_my_accept = true;
+ }
+ ASSERT_TRUE(*accepted_p);
+ center->delete_file_event(bind_socket.fd(), EVENT_READABLE);
+ }
+
+ if (is_my_accept) {
+ r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
+ ASSERT_EQ(0, r);
+ ASSERT_TRUE(srv_socket.fd() > 0);
+ }
+
+ if (worker->id == 0) {
+ C_poll cb(center);
+ center->create_file_event(cli_socket.fd(), EVENT_READABLE, &cb);
+ r = cli_socket.is_connected();
+ if (r == 0) {
+ ASSERT_EQ(true, cb.poll(500));
+ r = cli_socket.is_connected();
+ }
+ ASSERT_EQ(1, r);
+ center->delete_file_event(cli_socket.fd(), EVENT_READABLE);
+ }
+
+ const char *message = "this is a new message";
+ int len = strlen(message);
+ bufferlist bl;
+ bl.append(message, len);
+ if (worker->id == 0) {
+ r = cli_socket.send(bl, false);
+ ASSERT_EQ(len, r);
+ }
+
+ char buf[1024];
+ C_poll cb(center);
+ if (is_my_accept) {
+ center->create_file_event(srv_socket.fd(), EVENT_READABLE, &cb);
+ {
+ r = srv_socket.read(buf, sizeof(buf));
+ while (r == -EAGAIN) {
+ ASSERT_TRUE(cb.poll(500));
+ r = srv_socket.read(buf, sizeof(buf));
+ cb.reset();
+ }
+ ASSERT_EQ(len, r);
+ ASSERT_EQ(0, memcmp(buf, message, len));
+ }
+ bind_socket.abort_accept();
+ }
+ if (worker->id == 0) {
+ cli_socket.shutdown();
+ // ack delay is 200 ms
+ }
+
+ bl.clear();
+ bl.append(message, len);
+ if (worker->id == 0) {
+ r = cli_socket.send(bl, false);
+ ASSERT_EQ(-EPIPE, r);
+ }
+ if (is_my_accept) {
+ cb.reset();
+ ASSERT_TRUE(cb.poll(500));
+ r = srv_socket.read(buf, sizeof(buf));
+ if (r == -EAGAIN) {
+ cb.reset();
+ ASSERT_TRUE(cb.poll(1000*500));
+ r = srv_socket.read(buf, sizeof(buf));
+ }
+ ASSERT_EQ(0, r);
+ center->delete_file_event(srv_socket.fd(), EVENT_READABLE);
+ srv_socket.close();
+ }
+ });
+}
+
+TEST_P(NetworkWorkerTest, ConnectFailedTest) {
+ entity_addr_t bind_addr;
+ ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
+
+ exec_events([this, bind_addr](Worker *worker) mutable {
+ EventCenter *center = &worker->center;
+ entity_addr_t cli_addr;
+ SocketOptions options;
+ ServerSocket bind_socket;
+ int r = 0;
+ if (stack->support_local_listen_table() || worker->id == 0)
+ r = worker->listen(bind_addr, 0, options, &bind_socket);
+ ASSERT_EQ(0, r);
+
+ ConnectedSocket cli_socket1, cli_socket2;
+ if (worker->id == 0) {
+ ASSERT_TRUE(cli_addr.parse(get_ip_different_port().c_str()));
+ r = worker->connect(cli_addr, options, &cli_socket1);
+ ASSERT_EQ(0, r);
+ C_poll cb(center);
+ center->create_file_event(cli_socket1.fd(), EVENT_READABLE, &cb);
+ r = cli_socket1.is_connected();
+ if (r == 0) {
+ ASSERT_TRUE(cb.poll(500));
+ r = cli_socket1.is_connected();
+ }
+ ASSERT_TRUE(r == -ECONNREFUSED || r == -ECONNRESET);
+ }
+
+ if (worker->id == 1) {
+ ASSERT_TRUE(cli_addr.parse(get_different_ip().c_str()));
+ r = worker->connect(cli_addr, options, &cli_socket2);
+ ASSERT_EQ(0, r);
+ C_poll cb(center);
+ center->create_file_event(cli_socket2.fd(), EVENT_READABLE, &cb);
+ r = cli_socket2.is_connected();
+ if (r == 0) {
+ cb.poll(500);
+ r = cli_socket2.is_connected();
+ }
+ ASSERT_TRUE(r != 1);
+ center->delete_file_event(cli_socket2.fd(), EVENT_READABLE);
+ }
+ });
+}
+
+TEST_P(NetworkWorkerTest, ListenTest) {
+ Worker *worker = get_worker(0);
+ entity_addr_t bind_addr;
+ ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
+ SocketOptions options;
+ ServerSocket bind_socket1, bind_socket2;
+ int r = worker->listen(bind_addr, 0, options, &bind_socket1);
+ ASSERT_EQ(0, r);
+
+ r = worker->listen(bind_addr, 0, options, &bind_socket2);
+ ASSERT_EQ(-EADDRINUSE, r);
+}
+
+TEST_P(NetworkWorkerTest, AcceptAndCloseTest) {
+ entity_addr_t bind_addr;
+ ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
+ std::atomic_bool accepted(false);
+ std::atomic_bool *accepted_p = &accepted;
+ std::atomic_int unbind_count(stack->get_num_worker());
+ std::atomic_int *count_p = &unbind_count;
+ exec_events([this, bind_addr, accepted_p, count_p](Worker *worker) mutable {
+ SocketOptions options;
+ EventCenter *center = &worker->center;
+ entity_addr_t cli_addr;
+ int r = 0;
+ {
+ ServerSocket bind_socket;
+ if (stack->support_local_listen_table() || worker->id == 0)
+ r = worker->listen(bind_addr, 0, options, &bind_socket);
+ ASSERT_EQ(0, r);
+
+ ConnectedSocket srv_socket, cli_socket;
+ if (bind_socket) {
+ r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
+ ASSERT_EQ(-EAGAIN, r);
+ }
+
+ C_poll cb(center);
+ if (worker->id == 0) {
+ center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb);
+ r = worker->connect(bind_addr, options, &cli_socket);
+ ASSERT_EQ(0, r);
+ ASSERT_TRUE(cb.poll(500));
+ }
+
+ if (bind_socket) {
+ cb.reset();
+ cb.poll(500);
+ ConnectedSocket srv_socket2;
+ do {
+ r = bind_socket.accept(&srv_socket2, options, &cli_addr, worker);
+ usleep(100);
+ } while (r == -EAGAIN && !*accepted_p);
+ if (r == 0)
+ *accepted_p = true;
+ ASSERT_TRUE(*accepted_p);
+ // srv_socket2 closed
+ center->delete_file_event(bind_socket.fd(), EVENT_READABLE);
+ }
+
+ if (worker->id == 0) {
+ char buf[100];
+ cb.reset();
+ center->create_file_event(cli_socket.fd(), EVENT_READABLE, &cb);
+ int i = 3;
+ while (!i--) {
+ ASSERT_TRUE(cb.poll(500));
+ r = cli_socket.read(buf, sizeof(buf));
+ if (r == 0)
+ break;
+ }
+ ASSERT_EQ(0, r);
+ center->delete_file_event(cli_socket.fd(), EVENT_READABLE);
+ }
+
+ if (bind_socket)
+ center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb);
+ if (worker->id == 0) {
+ *accepted_p = false;
+ r = worker->connect(bind_addr, options, &cli_socket);
+ ASSERT_EQ(0, r);
+ cb.reset();
+ ASSERT_TRUE(cb.poll(500));
+ cli_socket.close();
+ }
+
+ if (bind_socket) {
+ do {
+ r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
+ usleep(100);
+ } while (r == -EAGAIN && !*accepted_p);
+ if (r == 0)
+ *accepted_p = true;
+ ASSERT_TRUE(*accepted_p);
+ center->delete_file_event(bind_socket.fd(), EVENT_READABLE);
+ }
+ // unbind
+ }
+
+ --*count_p;
+ while (*count_p > 0)
+ usleep(100);
+
+ ConnectedSocket cli_socket;
+ r = worker->connect(bind_addr, options, &cli_socket);
+ ASSERT_EQ(0, r);
+ {
+ C_poll cb(center);
+ center->create_file_event(cli_socket.fd(), EVENT_READABLE, &cb);
+ r = cli_socket.is_connected();
+ if (r == 0) {
+ ASSERT_TRUE(cb.poll(500));
+ r = cli_socket.is_connected();
+ }
+ ASSERT_TRUE(r == -ECONNREFUSED || r == -ECONNRESET);
+ }
+ });
+}
+
+TEST_P(NetworkWorkerTest, ComplexTest) {
+ entity_addr_t bind_addr;
+ std::atomic_bool listen_done(false);
+ std::atomic_bool *listen_p = &listen_done;
+ std::atomic_bool accepted(false);
+ std::atomic_bool *accepted_p = &accepted;
+ std::atomic_bool done(false);
+ std::atomic_bool *done_p = &done;
+ ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
+ exec_events([this, bind_addr, listen_p, accepted_p, done_p](Worker *worker) mutable {
+ entity_addr_t cli_addr;
+ EventCenter *center = &worker->center;
+ SocketOptions options;
+ ServerSocket bind_socket;
+ int r = 0;
+ if (stack->support_local_listen_table() || worker->id == 0) {
+ r = worker->listen(bind_addr, 0, options, &bind_socket);
+ ASSERT_EQ(0, r);
+ *listen_p = true;
+ }
+ ConnectedSocket cli_socket, srv_socket;
+ if (worker->id == 1) {
+ while (!*listen_p || stack->support_local_listen_table()) {
+ usleep(50);
+ r = worker->connect(bind_addr, options, &cli_socket);
+ ASSERT_EQ(0, r);
+ if (stack->support_local_listen_table())
+ break;
+ }
+ }
+
+ if (bind_socket) {
+ C_poll cb(center);
+ center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb);
+ int count = 3;
+ while (count--) {
+ if (cb.poll(500)) {
+ r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
+ ASSERT_EQ(0, r);
+ *accepted_p = true;
+ break;
+ }
+ }
+ ASSERT_TRUE(*accepted_p);
+ center->delete_file_event(bind_socket.fd(), EVENT_READABLE);
+ }
+
+ if (worker->id == 1) {
+ C_poll cb(center);
+ center->create_file_event(cli_socket.fd(), EVENT_WRITABLE, &cb);
+ r = cli_socket.is_connected();
+ if (r == 0) {
+ ASSERT_TRUE(cb.poll(500));
+ r = cli_socket.is_connected();
+ }
+ ASSERT_EQ(1, r);
+ center->delete_file_event(cli_socket.fd(), EVENT_WRITABLE);
+ }
+
+ const size_t message_size = 10240;
+ size_t count = 100;
+ string message(message_size, '!');
+ for (size_t i = 0; i < message_size; i += 100)
+ message[i] = ',';
+ size_t len = message_size * count;
+ C_poll cb(center);
+ if (worker->id == 1)
+ center->create_file_event(cli_socket.fd(), EVENT_WRITABLE, &cb);
+ if (srv_socket)
+ center->create_file_event(srv_socket.fd(), EVENT_READABLE, &cb);
+ size_t left = len;
+ len *= 2;
+ string read_string;
+ int again_count = 0;
+ int c = 2;
+ bufferlist bl;
+ for (size_t i = 0; i < count; ++i)
+ bl.push_back(bufferptr((char*)message.data(), message_size));
+ while (!*done_p) {
+ again_count = 0;
+ if (worker->id == 1) {
+ if (c > 0) {
+ ssize_t r = 0;
+ usleep(100);
+ if (left > 0) {
+ r = cli_socket.send(bl, false);
+ ASSERT_TRUE(r >= 0 || r == -EAGAIN);
+ if (r > 0)
+ left -= r;
+ if (r == -EAGAIN)
+ ++again_count;
+ }
+ if (left == 0) {
+ --c;
+ left = message_size * count;
+ ASSERT_EQ(0U, bl.length());
+ for (size_t i = 0; i < count; ++i)
+ bl.push_back(bufferptr((char*)message.data(), message_size));
+ }
+ }
+ }
+
+ if (srv_socket) {
+ char buf[1000];
+ if (len > 0) {
+ r = srv_socket.read(buf, sizeof(buf));
+ ASSERT_TRUE(r > 0 || r == -EAGAIN);
+ if (r > 0) {
+ read_string.append(buf, r);
+ len -= r;
+ } else if (r == -EAGAIN) {
+ ++again_count;
+ }
+ }
+ if (len == 0) {
+ for (size_t i = 0; i < read_string.size(); i += message_size)
+ ASSERT_EQ(0, memcmp(read_string.c_str()+i, message.c_str(), message_size));
+ *done_p = true;
+ }
+ }
+ if (again_count) {
+ cb.reset();
+ cb.poll(500);
+ }
+ }
+ if (worker->id == 1)
+ center->delete_file_event(cli_socket.fd(), EVENT_WRITABLE);
+ if (srv_socket)
+ center->delete_file_event(srv_socket.fd(), EVENT_READABLE);
+
+ if (bind_socket)
+ bind_socket.abort_accept();
+ if (srv_socket)
+ srv_socket.close();
+ if (worker->id == 1)
+ cli_socket.close();
+ });
+}
+
+class StressFactory {
+ struct Client;
+ struct Server;
+ struct ThreadData {
+ Worker *worker;
+ std::set<Client*> clients;
+ std::set<Server*> servers;
+ ~ThreadData() {
+ for (auto && i : clients)
+ delete i;
+ for (auto && i : servers)
+ delete i;
+ }
+ };
+
+ struct RandomString {
+ size_t slen;
+ vector<std::string> strs;
+ std::random_device rd;
+ std::default_random_engine rng;
+
+ explicit RandomString(size_t s): slen(s), rng(rd()) {}
+ void prepare(size_t n) {
+ static const char alphabet[] =
+ "abcdefghijklmnopqrstuvwxyz"
+ "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+ "0123456789";
+
+ std::uniform_int_distribution<> dist(
+ 0, sizeof(alphabet) / sizeof(*alphabet) - 2);
+
+ strs.reserve(n);
+ std::generate_n(
+ std::back_inserter(strs), strs.capacity(), [&] {
+ std::string str;
+ str.reserve(slen);
+ std::generate_n(std::back_inserter(str), slen, [&]() {
+ return alphabet[dist(rng)];
+ });
+ return str;
+ }
+ );
+ }
+ std::string &get_random_string() {
+ std::uniform_int_distribution<> dist(
+ 0, strs.size() - 1);
+ return strs[dist(rng)];
+ }
+ };
+ struct Message {
+ size_t idx;
+ size_t len;
+ std::string content;
+
+ explicit Message(RandomString &rs, size_t i, size_t l): idx(i) {
+ size_t slen = rs.slen;
+ len = std::max(slen, l);
+
+ std::vector<std::string> strs;
+ strs.reserve(len / slen);
+ std::generate_n(
+ std::back_inserter(strs), strs.capacity(), [&] {
+ return rs.get_random_string();
+ }
+ );
+ len = slen * strs.size();
+ content.reserve(len);
+ for (auto &&s : strs)
+ content.append(s);
+ }
+ bool verify(const char *b, size_t len = 0) const {
+ return content.compare(0, len, b, 0, len) == 0;
+ }
+ };
+
+ template <typename T>
+ class C_delete : public EventCallback {
+ T *ctxt;
+ public:
+ explicit C_delete(T *c): ctxt(c) {}
+ void do_request(uint64_t id) override {
+ delete ctxt;
+ delete this;
+ }
+ };
+
+ class Client {
+ StressFactory *factory;
+ EventCenter *center;
+ ConnectedSocket socket;
+ std::deque<StressFactory::Message*> acking;
+ std::deque<StressFactory::Message*> writings;
+ std::string buffer;
+ size_t index = 0;
+ size_t left;
+ bool write_enabled = false;
+ size_t read_offset = 0, write_offset = 0;
+ bool first = true;
+ bool dead = false;
+ StressFactory::Message homeless_message;
+
+ class Client_read_handle : public EventCallback {
+ Client *c;
+ public:
+ explicit Client_read_handle(Client *_c): c(_c) {}
+ void do_request(uint64_t id) override {
+ c->do_read_request();
+ }
+ } read_ctxt;
+
+ class Client_write_handle : public EventCallback {
+ Client *c;
+ public:
+ explicit Client_write_handle(Client *_c): c(_c) {}
+ void do_request(uint64_t id) override {
+ c->do_write_request();
+ }
+ } write_ctxt;
+
+ public:
+ Client(StressFactory *f, EventCenter *cen, ConnectedSocket s, size_t c)
+ : factory(f), center(cen), socket(std::move(s)), left(c), homeless_message(factory->rs, -1, 1024),
+ read_ctxt(this), write_ctxt(this) {
+ center->create_file_event(
+ socket.fd(), EVENT_READABLE, &read_ctxt);
+ center->dispatch_event_external(&read_ctxt);
+ }
+ void close() {
+ ASSERT_FALSE(write_enabled);
+ dead = true;
+ socket.shutdown();
+ center->delete_file_event(socket.fd(), EVENT_READABLE);
+ center->dispatch_event_external(new C_delete<Client>(this));
+ }
+
+ void do_read_request() {
+ if (dead)
+ return ;
+ ASSERT_TRUE(socket.is_connected() >= 0);
+ if (!socket.is_connected())
+ return ;
+ ASSERT_TRUE(!acking.empty() || first);
+ if (first) {
+ first = false;
+ center->dispatch_event_external(&write_ctxt);
+ if (acking.empty())
+ return ;
+ }
+ StressFactory::Message *m = acking.front();
+ int r = 0;
+ if (buffer.empty())
+ buffer.resize(m->len);
+ bool must_no = false;
+ while (true) {
+ r = socket.read((char*)buffer.data() + read_offset,
+ m->len - read_offset);
+ ASSERT_TRUE(r == -EAGAIN || r > 0);
+ if (r == -EAGAIN)
+ break;
+ read_offset += r;
+
+ std::cerr << " client " << this << " receive " << m->idx << " len " << r << " content: " << std::endl;
+ ASSERT_FALSE(must_no);
+ if ((m->len - read_offset) == 0) {
+ ASSERT_TRUE(m->verify(buffer.data(), 0));
+ delete m;
+ acking.pop_front();
+ read_offset = 0;
+ buffer.clear();
+ if (acking.empty()) {
+ m = &homeless_message;
+ must_no = true;
+ } else {
+ m = acking.front();
+ buffer.resize(m->len);
+ }
+ }
+ }
+ if (acking.empty()) {
+ center->dispatch_event_external(&write_ctxt);
+ return ;
+ }
+ }
+
+ void do_write_request() {
+ if (dead)
+ return ;
+ ASSERT_TRUE(socket.is_connected() > 0);
+
+ while (left > 0 && factory->queue_depth > writings.size() + acking.size()) {
+ StressFactory::Message *m = new StressFactory::Message(
+ factory->rs, ++index,
+ factory->rd() % factory->max_message_length);
+ std::cerr << " client " << this << " generate message " << m->idx << " length " << m->len << std::endl;
+ ASSERT_EQ(m->len, m->content.size());
+ writings.push_back(m);
+ --left;
+ --factory->message_left;
+ }
+
+ while (!writings.empty()) {
+ StressFactory::Message *m = writings.front();
+ bufferlist bl;
+ bl.append(m->content.data() + write_offset, m->content.size() - write_offset);
+ ssize_t r = socket.send(bl, false);
+ if (r == 0)
+ break;
+ std::cerr << " client " << this << " send " << m->idx << " len " << r << " content: " << std::endl;
+ ASSERT_TRUE(r >= 0);
+ write_offset += r;
+ if (write_offset == m->content.size()) {
+ write_offset = 0;
+ writings.pop_front();
+ acking.push_back(m);
+ }
+ }
+ if (writings.empty() && write_enabled) {
+ center->delete_file_event(socket.fd(), EVENT_WRITABLE);
+ write_enabled = false;
+ } else if (!writings.empty() && !write_enabled) {
+ ASSERT_EQ(0, center->create_file_event(
+ socket.fd(), EVENT_WRITABLE, &write_ctxt));
+ write_enabled = true;
+ }
+ }
+
+ bool finish() const {
+ return left == 0 && acking.empty() && writings.empty();
+ }
+ };
+ friend class Client;
+
+ class Server {
+ StressFactory *factory;
+ EventCenter *center;
+ ConnectedSocket socket;
+ std::deque<std::string> buffers;
+ bool write_enabled = false;
+ bool dead = false;
+
+ class Server_read_handle : public EventCallback {
+ Server *s;
+ public:
+ explicit Server_read_handle(Server *_s): s(_s) {}
+ void do_request(uint64_t id) override {
+ s->do_read_request();
+ }
+ } read_ctxt;
+
+ class Server_write_handle : public EventCallback {
+ Server *s;
+ public:
+ explicit Server_write_handle(Server *_s): s(_s) {}
+ void do_request(uint64_t id) override {
+ s->do_write_request();
+ }
+ } write_ctxt;
+
+ public:
+ Server(StressFactory *f, EventCenter *c, ConnectedSocket s):
+ factory(f), center(c), socket(std::move(s)), read_ctxt(this), write_ctxt(this) {
+ center->create_file_event(socket.fd(), EVENT_READABLE, &read_ctxt);
+ center->dispatch_event_external(&read_ctxt);
+ }
+ void close() {
+ ASSERT_FALSE(write_enabled);
+ socket.shutdown();
+ center->delete_file_event(socket.fd(), EVENT_READABLE);
+ center->dispatch_event_external(new C_delete<Server>(this));
+ }
+ void do_read_request() {
+ if (dead)
+ return ;
+ int r = 0;
+ while (true) {
+ char buf[4096];
+ bufferptr data;
+ r = socket.read(buf, sizeof(buf));
+ ASSERT_TRUE(r == -EAGAIN || (r >= 0 && (size_t)r <= sizeof(buf)));
+ if (r == 0) {
+ ASSERT_TRUE(buffers.empty());
+ dead = true;
+ return ;
+ } else if (r == -EAGAIN)
+ break;
+ buffers.emplace_back(buf, 0, r);
+ std::cerr << " server " << this << " receive " << r << " content: " << std::endl;
+ }
+ if (!buffers.empty() && !write_enabled)
+ center->dispatch_event_external(&write_ctxt);
+ }
+
+ void do_write_request() {
+ if (dead)
+ return ;
+
+ while (!buffers.empty()) {
+ bufferlist bl;
+ auto it = buffers.begin();
+ for (size_t i = 0; i < buffers.size(); ++i) {
+ bl.push_back(bufferptr((char*)it->data(), it->size()));
+ ++it;
+ }
+
+ ssize_t r = socket.send(bl, false);
+ std::cerr << " server " << this << " send " << r << std::endl;
+ if (r == 0)
+ break;
+ ASSERT_TRUE(r >= 0);
+ while (r > 0) {
+ ASSERT_TRUE(!buffers.empty());
+ string &buffer = buffers.front();
+ if (r >= (int)buffer.size()) {
+ r -= (int)buffer.size();
+ buffers.pop_front();
+ } else {
+ std::cerr << " server " << this << " sent " << r << std::endl;
+ buffer = buffer.substr(r, buffer.size());
+ break;
+ }
+ }
+ }
+ if (buffers.empty()) {
+ if (write_enabled) {
+ center->delete_file_event(socket.fd(), EVENT_WRITABLE);
+ write_enabled = false;
+ }
+ } else if (!write_enabled) {
+ ASSERT_EQ(0, center->create_file_event(
+ socket.fd(), EVENT_WRITABLE, &write_ctxt));
+ write_enabled = true;
+ }
+ }
+
+ bool finish() {
+ return dead;
+ }
+ };
+ friend class Server;
+
+ class C_accept : public EventCallback {
+ StressFactory *factory;
+ ServerSocket bind_socket;
+ ThreadData *t_data;
+ Worker *worker;
+
+ public:
+ C_accept(StressFactory *f, ServerSocket s, ThreadData *data, Worker *w)
+ : factory(f), bind_socket(std::move(s)), t_data(data), worker(w) {}
+ void do_request(uint64_t id) override {
+ while (true) {
+ entity_addr_t cli_addr;
+ ConnectedSocket srv_socket;
+ SocketOptions options;
+ int r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
+ if (r == -EAGAIN) {
+ break;
+ }
+ ASSERT_EQ(0, r);
+ ASSERT_TRUE(srv_socket.fd() > 0);
+ Server *cb = new Server(factory, &t_data->worker->center, std::move(srv_socket));
+ t_data->servers.insert(cb);
+ }
+ }
+ };
+ friend class C_accept;
+
+ public:
+ static const size_t min_client_send_messages = 100;
+ static const size_t max_client_send_messages = 1000;
+ std::shared_ptr<NetworkStack> stack;
+ RandomString rs;
+ std::random_device rd;
+ const size_t client_num, queue_depth, max_message_length;
+ atomic_int message_count, message_left;
+ entity_addr_t bind_addr;
+ std::atomic_bool already_bind = {false};
+ SocketOptions options;
+
+ explicit StressFactory(const std::shared_ptr<NetworkStack> &s, const string &addr,
+ size_t cli, size_t qd, size_t mc, size_t l)
+ : stack(s), rs(128), client_num(cli), queue_depth(qd),
+ max_message_length(l), message_count(mc), message_left(mc) {
+ bind_addr.parse(addr.c_str());
+ rs.prepare(100);
+ }
+ ~StressFactory() {
+ }
+
+ void add_client(ThreadData *t_data) {
+ static ceph::mutex lock = ceph::make_mutex("add_client_lock");
+ std::lock_guard l{lock};
+ ConnectedSocket sock;
+ int r = t_data->worker->connect(bind_addr, options, &sock);
+ std::default_random_engine rng(rd());
+ std::uniform_int_distribution<> dist(
+ min_client_send_messages, max_client_send_messages);
+ ASSERT_EQ(0, r);
+ int c = dist(rng);
+ if (c > message_count.load())
+ c = message_count.load();
+ Client *cb = new Client(this, &t_data->worker->center, std::move(sock), c);
+ t_data->clients.insert(cb);
+ message_count -= c;
+ }
+
+ void drop_client(ThreadData *t_data, Client *c) {
+ c->close();
+ ASSERT_EQ(1U, t_data->clients.erase(c));
+ }
+
+ void drop_server(ThreadData *t_data, Server *s) {
+ s->close();
+ ASSERT_EQ(1U, t_data->servers.erase(s));
+ }
+
+ void start(Worker *worker) {
+ int r = 0;
+ ThreadData t_data;
+ t_data.worker = worker;
+ ServerSocket bind_socket;
+ if (stack->support_local_listen_table() || worker->id == 0) {
+ r = worker->listen(bind_addr, 0, options, &bind_socket);
+ ASSERT_EQ(0, r);
+ already_bind = true;
+ }
+ while (!already_bind)
+ usleep(50);
+ C_accept *accept_handler = nullptr;
+ int bind_fd = 0;
+ if (bind_socket) {
+ bind_fd = bind_socket.fd();
+ accept_handler = new C_accept(this, std::move(bind_socket), &t_data, worker);
+ ASSERT_EQ(0, worker->center.create_file_event(
+ bind_fd, EVENT_READABLE, accept_handler));
+ }
+
+ int echo_throttle = message_count;
+ while (message_count > 0 || !t_data.clients.empty() || !t_data.servers.empty()) {
+ if (message_count > 0 && t_data.clients.size() < client_num && t_data.servers.size() < client_num)
+ add_client(&t_data);
+ for (auto &&c : t_data.clients) {
+ if (c->finish()) {
+ drop_client(&t_data, c);
+ break;
+ }
+ }
+ for (auto &&s : t_data.servers) {
+ if (s->finish()) {
+ drop_server(&t_data, s);
+ break;
+ }
+ }
+
+ worker->center.process_events(1);
+ if (echo_throttle > message_left) {
+ std::cerr << " clients " << t_data.clients.size() << " servers " << t_data.servers.size()
+ << " message count " << message_left << std::endl;
+ echo_throttle -= 100;
+ }
+ }
+ if (bind_fd)
+ worker->center.delete_file_event(bind_fd, EVENT_READABLE);
+ delete accept_handler;
+ }
+};
+
+TEST_P(NetworkWorkerTest, StressTest) {
+ StressFactory factory(stack, get_addr(), 16, 16, 10000, 1024);
+ StressFactory *f = &factory;
+ exec_events([f](Worker *worker) mutable {
+ f->start(worker);
+ });
+ ASSERT_EQ(0, factory.message_left);
+}
+
+
+INSTANTIATE_TEST_SUITE_P(
+ NetworkStack,
+ NetworkWorkerTest,
+ ::testing::Values(
+#ifdef HAVE_DPDK
+ "dpdk",
+#endif
+ "posix"
+ )
+);
+
+/*
+ * Local Variables:
+ * compile-command: "cd ../.. ; make ceph_test_async_networkstack &&
+ * ./ceph_test_async_networkstack
+ *
+ * End:
+ */
diff --git a/src/test/msgr/test_comp_registry.cc b/src/test/msgr/test_comp_registry.cc
new file mode 100644
index 000000000..d5513e2e4
--- /dev/null
+++ b/src/test/msgr/test_comp_registry.cc
@@ -0,0 +1,98 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "include/types.h"
+#include "include/stringify.h"
+#include "compressor/Compressor.h"
+#include "msg/compressor_registry.h"
+#include "gtest/gtest.h"
+#include "common/ceph_context.h"
+#include "global/global_context.h"
+
+#include <sstream>
+
+TEST(CompressorRegistry, con_modes)
+{
+ auto cct = g_ceph_context;
+ CompressorRegistry reg(cct);
+ std::vector<uint32_t> methods;
+ uint32_t method;
+ uint32_t mode;
+
+ const std::vector<uint32_t> snappy_method = { Compressor::COMP_ALG_SNAPPY };
+ const std::vector<uint32_t> zlib_method = { Compressor::COMP_ALG_ZLIB };
+ const std::vector<uint32_t> both_methods = { Compressor::COMP_ALG_ZLIB, Compressor::COMP_ALG_SNAPPY};
+ const std::vector<uint32_t> no_method = { Compressor::COMP_ALG_NONE };
+
+ cct->_conf.set_val(
+ "enable_experimental_unrecoverable_data_corrupting_features", "*");
+
+ // baseline: compression for communication with osd is enabled
+ cct->_set_module_type(CEPH_ENTITY_TYPE_CLIENT);
+ cct->_conf.set_val("ms_osd_compress_mode", "force");
+ cct->_conf.set_val("ms_osd_compression_algorithm", "snappy");
+ cct->_conf.set_val("ms_compress_secure", "false");
+ cct->_conf.apply_changes(NULL);
+
+ ASSERT_EQ(reg.get_is_compress_secure(), false);
+
+ methods = reg.get_methods(CEPH_ENTITY_TYPE_MON);
+ ASSERT_EQ(methods.size(), 0);
+ method = reg.pick_method(CEPH_ENTITY_TYPE_MON, both_methods);
+ ASSERT_EQ(method, Compressor::COMP_ALG_NONE);
+ mode = reg.get_mode(CEPH_ENTITY_TYPE_MON, false);
+ ASSERT_EQ(mode, Compressor::COMP_NONE);
+
+ methods = reg.get_methods(CEPH_ENTITY_TYPE_OSD);
+ ASSERT_EQ(methods, snappy_method);
+ const std::vector<uint32_t> rev_both_methods (both_methods.rbegin(), both_methods.rend());
+ method = reg.pick_method(CEPH_ENTITY_TYPE_OSD, rev_both_methods);
+ ASSERT_EQ(method, Compressor::COMP_ALG_SNAPPY);
+ mode = reg.get_mode(CEPH_ENTITY_TYPE_OSD, false);
+ ASSERT_EQ(mode, Compressor::COMP_FORCE);
+ mode = reg.get_mode(CEPH_ENTITY_TYPE_OSD, true);
+ ASSERT_EQ(mode, Compressor::COMP_NONE);
+
+ method = reg.pick_method(CEPH_ENTITY_TYPE_OSD, zlib_method);
+ ASSERT_EQ(method, Compressor::COMP_ALG_NONE);
+
+ // disable compression mode
+ cct->_set_module_type(CEPH_ENTITY_TYPE_CLIENT);
+ cct->_conf.set_val("ms_osd_compress_mode", "none");
+ cct->_conf.apply_changes(NULL);
+
+ mode = reg.get_mode(CEPH_ENTITY_TYPE_OSD, false);
+ ASSERT_EQ(mode, Compressor::COMP_NONE);
+
+ // no compression methods
+ cct->_conf.set_val("ms_osd_compress_mode", "force");
+ cct->_conf.set_val("ms_osd_compression_algorithm", "none");
+ cct->_conf.apply_changes(NULL);
+
+ method = reg.pick_method(CEPH_ENTITY_TYPE_OSD, both_methods);
+ ASSERT_EQ(method, Compressor::COMP_ALG_NONE);
+
+ // min compression size
+ cct->_conf.set_val("ms_osd_compress_min_size", "1024");
+ cct->_conf.apply_changes(NULL);
+
+ uint32_t s = reg.get_min_compression_size(CEPH_ENTITY_TYPE_OSD);
+ ASSERT_EQ(s, 1024);
+
+ // allow secure with compression
+ cct->_conf.set_val("ms_osd_compress_mode", "force");
+ cct->_conf.set_val("ms_osd_compression_algorithm", "snappy");
+ cct->_conf.set_val("ms_compress_secure", "true");
+ cct->_conf.apply_changes(NULL);
+
+ ASSERT_EQ(reg.get_is_compress_secure(), true);
+
+ mode = reg.get_mode(CEPH_ENTITY_TYPE_OSD, true);
+ ASSERT_EQ(mode, Compressor::COMP_FORCE);
+
+ mode = reg.get_mode(CEPH_ENTITY_TYPE_OSD, false);
+ ASSERT_EQ(mode, Compressor::COMP_FORCE);
+
+ // back to normalish, for the benefit of the next test(s)
+ cct->_set_module_type(CEPH_ENTITY_TYPE_CLIENT);
+}
diff --git a/src/test/msgr/test_frames_v2.cc b/src/test/msgr/test_frames_v2.cc
new file mode 100644
index 000000000..c5be32a9c
--- /dev/null
+++ b/src/test/msgr/test_frames_v2.cc
@@ -0,0 +1,483 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2020 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "msg/async/frames_v2.h"
+
+#include <numeric>
+#include <ostream>
+#include <string>
+#include <tuple>
+
+#include "msg/async/compression_meta.h"
+#include "auth/Auth.h"
+#include "common/ceph_argparse.h"
+#include "global/global_init.h"
+#include "global/global_context.h"
+#include "include/Context.h"
+
+#include <gtest/gtest.h>
+
+#define COMP_THRESHOLD 1 << 10
+#define EXPECT_COMPRESSED(is_compressed, val1, val2) \
+ if (is_compressed && val1 > COMP_THRESHOLD) { \
+ EXPECT_GE(val1, val2); \
+ } else { \
+ EXPECT_EQ(val1, val2); \
+ }
+
+using namespace std;
+
+namespace ceph::msgr::v2 {
+
+// MessageFrame with the first segment not fixed to ceph_msg_header2
+struct TestFrame : Frame<TestFrame,
+ /* four segments */
+ segment_t::DEFAULT_ALIGNMENT,
+ segment_t::DEFAULT_ALIGNMENT,
+ segment_t::DEFAULT_ALIGNMENT,
+ segment_t::PAGE_SIZE_ALIGNMENT> {
+ static constexpr Tag tag = static_cast<Tag>(123);
+
+ static TestFrame Encode(const bufferlist& header,
+ const bufferlist& front,
+ const bufferlist& middle,
+ const bufferlist& data) {
+ TestFrame f;
+ f.segments[SegmentIndex::Msg::HEADER] = header;
+ f.segments[SegmentIndex::Msg::FRONT] = front;
+ f.segments[SegmentIndex::Msg::MIDDLE] = middle;
+ f.segments[SegmentIndex::Msg::DATA] = data;
+
+ // discard cached crcs for perf tests
+ f.segments[SegmentIndex::Msg::HEADER].invalidate_crc();
+ f.segments[SegmentIndex::Msg::FRONT].invalidate_crc();
+ f.segments[SegmentIndex::Msg::MIDDLE].invalidate_crc();
+ f.segments[SegmentIndex::Msg::DATA].invalidate_crc();
+ return f;
+ }
+
+ static TestFrame Decode(segment_bls_t& segment_bls) {
+ TestFrame f;
+ // Transfer segments' bufferlists. If segment_bls contains
+ // less than SegmentsNumV segments, the missing ones will be
+ // seen as empty.
+ for (size_t i = 0; i < segment_bls.size(); i++) {
+ f.segments[i] = std::move(segment_bls[i]);
+ }
+ return f;
+ }
+
+ bufferlist& header() {
+ return segments[SegmentIndex::Msg::HEADER];
+ }
+ bufferlist& front() {
+ return segments[SegmentIndex::Msg::FRONT];
+ }
+ bufferlist& middle() {
+ return segments[SegmentIndex::Msg::MIDDLE];
+ }
+ bufferlist& data() {
+ return segments[SegmentIndex::Msg::DATA];
+ }
+
+protected:
+ using Frame::Frame;
+};
+
+struct mode_t {
+ bool is_rev1;
+ bool is_secure;
+ bool is_compress;
+};
+
+static std::ostream& operator<<(std::ostream& os, const mode_t& m) {
+ os << "msgr2." << (m.is_rev1 ? "1" : "0")
+ << (m.is_secure ? "-secure" : "-crc")
+ << (m.is_compress ? "-compress": "-nocompress");
+ return os;
+}
+
+static const mode_t modes[] = {
+ {false, false, false},
+ {false, true, false},
+ {true, false, false},
+ {true, true, false},
+ {false, false, true},
+ {false, true, true},
+ {true, false, true},
+ {true, true, true}
+};
+
+struct round_trip_instance_t {
+ uint32_t header_len;
+ uint32_t front_len;
+ uint32_t middle_len;
+ uint32_t data_len;
+
+ // expected number of segments (same for each mode)
+ size_t num_segments;
+ // expected layout (different for each mode)
+ uint32_t onwire_lens[4][MAX_NUM_SEGMENTS + 2];
+};
+
+static std::ostream& operator<<(std::ostream& os,
+ const round_trip_instance_t& rti) {
+ os << rti.header_len << "+" << rti.front_len << "+"
+ << rti.middle_len << "+" << rti.data_len;
+ return os;
+}
+
+static bufferlist make_bufferlist(size_t len, char c) {
+ bufferlist bl;
+ if (len > 0) {
+ bl.reserve(len);
+ bl.append(std::string(len, c));
+ }
+ return bl;
+}
+
+bool disassemble_frame(FrameAssembler& frame_asm, bufferlist& frame_bl,
+ Tag& tag, segment_bls_t& segment_bls) {
+ bufferlist preamble_bl;
+ frame_bl.splice(0, frame_asm.get_preamble_onwire_len(), &preamble_bl);
+ tag = frame_asm.disassemble_preamble(preamble_bl);
+
+ do {
+ size_t seg_idx = segment_bls.size();
+ segment_bls.emplace_back();
+
+ uint32_t onwire_len = frame_asm.get_segment_onwire_len(seg_idx);
+ if (onwire_len > 0) {
+ frame_bl.splice(0, onwire_len, &segment_bls.back());
+ }
+ } while (segment_bls.size() < frame_asm.get_num_segments());
+
+ bufferlist epilogue_bl;
+ uint32_t epilogue_onwire_len = frame_asm.get_epilogue_onwire_len();
+ if (epilogue_onwire_len > 0) {
+ frame_bl.splice(0, epilogue_onwire_len, &epilogue_bl);
+ }
+
+ return frame_asm.disassemble_segments(preamble_bl, segment_bls.data(), epilogue_bl);
+}
+
+class RoundTripTestBase : public ::testing::TestWithParam<
+ std::tuple<round_trip_instance_t, mode_t>> {
+protected:
+ RoundTripTestBase()
+ : m_tx_frame_asm(&m_tx_crypto, std::get<1>(GetParam()).is_rev1, true,
+ &m_tx_comp),
+ m_rx_frame_asm(&m_rx_crypto, std::get<1>(GetParam()).is_rev1, true,
+ &m_rx_comp),
+ m_header(make_bufferlist(std::get<0>(GetParam()).header_len, 'H')),
+ m_front(make_bufferlist(std::get<0>(GetParam()).front_len, 'F')),
+ m_middle(make_bufferlist(std::get<0>(GetParam()).middle_len, 'M')),
+ m_data(make_bufferlist(std::get<0>(GetParam()).data_len, 'D')) {
+ const auto& m = std::get<1>(GetParam());
+ if (m.is_secure) {
+ AuthConnectionMeta auth_meta;
+ auth_meta.con_mode = CEPH_CON_MODE_SECURE;
+ // see AuthConnectionMeta::get_connection_secret_length()
+ auth_meta.connection_secret.resize(64);
+ g_ceph_context->random()->get_bytes(auth_meta.connection_secret.data(),
+ auth_meta.connection_secret.size());
+ m_tx_crypto = ceph::crypto::onwire::rxtx_t::create_handler_pair(
+ g_ceph_context, auth_meta, /*new_nonce_format=*/m.is_rev1,
+ /*crossed=*/false);
+ m_rx_crypto = ceph::crypto::onwire::rxtx_t::create_handler_pair(
+ g_ceph_context, auth_meta, /*new_nonce_format=*/m.is_rev1,
+ /*crossed=*/true);
+ }
+
+ if (m.is_compress) {
+ CompConnectionMeta comp_meta;
+ comp_meta.con_mode = Compressor::COMP_FORCE;
+ comp_meta.con_method = Compressor::COMP_ALG_SNAPPY;
+ m_tx_comp = ceph::compression::onwire::rxtx_t::create_handler_pair(
+ g_ceph_context, comp_meta, /*min_compress_size=*/COMP_THRESHOLD
+ );
+ m_rx_comp = ceph::compression::onwire::rxtx_t::create_handler_pair(
+ g_ceph_context, comp_meta, /*min_compress_size=*/COMP_THRESHOLD
+ );
+ }
+ }
+
+ void check_frame_assembler(const FrameAssembler& frame_asm) {
+ const auto& [rti, m] = GetParam();
+ const auto& onwire_lens = rti.onwire_lens[m.is_rev1 << 1 | m.is_secure];
+
+ EXPECT_COMPRESSED(m.is_compress, rti.header_len + rti.front_len + rti.middle_len + rti.data_len,
+ frame_asm.get_frame_logical_len());
+ ASSERT_EQ(rti.num_segments, frame_asm.get_num_segments());
+ EXPECT_COMPRESSED(m.is_compress, onwire_lens[0], frame_asm.get_preamble_onwire_len());
+ for (size_t i = 0; i < rti.num_segments; i++) {
+ EXPECT_COMPRESSED(m.is_compress, onwire_lens[i + 1], frame_asm.get_segment_onwire_len(i));
+ }
+ EXPECT_COMPRESSED(m.is_compress, onwire_lens[rti.num_segments + 1],
+ frame_asm.get_epilogue_onwire_len());
+ EXPECT_COMPRESSED(m.is_compress,
+ std::accumulate(std::begin(onwire_lens), std::end(onwire_lens),
+ uint64_t(0)),
+ frame_asm.get_frame_onwire_len());
+ }
+
+ void test_round_trip() {
+ auto tx_frame = TestFrame::Encode(m_header, m_front, m_middle, m_data);
+ auto onwire_bl = tx_frame.get_buffer(m_tx_frame_asm);
+ check_frame_assembler(m_tx_frame_asm);
+ EXPECT_EQ(m_tx_frame_asm.get_frame_onwire_len(), onwire_bl.length());
+
+ Tag rx_tag;
+ segment_bls_t rx_segment_bls;
+ EXPECT_TRUE(disassemble_frame(m_rx_frame_asm, onwire_bl, rx_tag,
+ rx_segment_bls));
+ check_frame_assembler(m_rx_frame_asm);
+ EXPECT_EQ(0, onwire_bl.length());
+ EXPECT_EQ(TestFrame::tag, rx_tag);
+ EXPECT_EQ(m_rx_frame_asm.get_num_segments(), rx_segment_bls.size());
+
+ auto rx_frame = TestFrame::Decode(rx_segment_bls);
+ EXPECT_TRUE(m_header.contents_equal(rx_frame.header()));
+ EXPECT_TRUE(m_front.contents_equal(rx_frame.front()));
+ EXPECT_TRUE(m_middle.contents_equal(rx_frame.middle()));
+ EXPECT_TRUE(m_data.contents_equal(rx_frame.data()));
+ }
+
+ ceph::crypto::onwire::rxtx_t m_tx_crypto;
+ ceph::crypto::onwire::rxtx_t m_rx_crypto;
+ ceph::compression::onwire::rxtx_t m_tx_comp;
+ ceph::compression::onwire::rxtx_t m_rx_comp;
+ FrameAssembler m_tx_frame_asm;
+ FrameAssembler m_rx_frame_asm;
+
+ const bufferlist m_header;
+ const bufferlist m_front;
+ const bufferlist m_middle;
+ const bufferlist m_data;
+};
+
+class RoundTripTest : public RoundTripTestBase {};
+
+TEST_P(RoundTripTest, Basic) {
+ test_round_trip();
+}
+
+TEST_P(RoundTripTest, Reuse) {
+ for (int i = 0; i < 3; i++) {
+ test_round_trip();
+ }
+}
+
+static const round_trip_instance_t round_trip_instances[] = {
+ // first segment is empty
+ { 0, 0, 0, 0, 1, {{32, 0, 17, 0, 0, 0},
+ {32, 0, 32, 0, 0, 0},
+ {32, 0, 0, 0, 0, 0},
+ {96, 0, 0, 0, 0, 0}}},
+ { 0, 0, 0, 303, 4, {{32, 0, 0, 0, 303, 17},
+ {32, 0, 0, 0, 304, 32},
+ {32, 0, 0, 0, 303, 13},
+ {96, 0, 0, 0, 304, 32}}},
+ { 0, 0, 202, 0, 3, {{32, 0, 0, 202, 17, 0},
+ {32, 0, 0, 208, 32, 0},
+ {32, 0, 0, 202, 13, 0},
+ {96, 0, 0, 208, 32, 0}}},
+ { 0, 0, 202, 303, 4, {{32, 0, 0, 202, 303, 17},
+ {32, 0, 0, 208, 304, 32},
+ {32, 0, 0, 202, 303, 13},
+ {96, 0, 0, 208, 304, 32}}},
+ { 0, 101, 0, 0, 2, {{32, 0, 101, 17, 0, 0},
+ {32, 0, 112, 32, 0, 0},
+ {32, 0, 101, 13, 0, 0},
+ {96, 0, 112, 32, 0, 0}}},
+ { 0, 101, 0, 303, 4, {{32, 0, 101, 0, 303, 17},
+ {32, 0, 112, 0, 304, 32},
+ {32, 0, 101, 0, 303, 13},
+ {96, 0, 112, 0, 304, 32}}},
+ { 0, 101, 202, 0, 3, {{32, 0, 101, 202, 17, 0},
+ {32, 0, 112, 208, 32, 0},
+ {32, 0, 101, 202, 13, 0},
+ {96, 0, 112, 208, 32, 0}}},
+ { 0, 101, 202, 303, 4, {{32, 0, 101, 202, 303, 17},
+ {32, 0, 112, 208, 304, 32},
+ {32, 0, 101, 202, 303, 13},
+ {96, 0, 112, 208, 304, 32}}},
+
+ // first segment is fully inlined, inline buffer is not full
+ { 1, 0, 0, 0, 1, {{32, 1, 17, 0, 0, 0},
+ {32, 16, 32, 0, 0, 0},
+ {32, 5, 0, 0, 0, 0},
+ {96, 0, 0, 0, 0, 0}}},
+ { 1, 0, 0, 303, 4, {{32, 1, 0, 0, 303, 17},
+ {32, 16, 0, 0, 304, 32},
+ {32, 5, 0, 0, 303, 13},
+ {96, 0, 0, 0, 304, 32}}},
+ { 1, 0, 202, 0, 3, {{32, 1, 0, 202, 17, 0},
+ {32, 16, 0, 208, 32, 0},
+ {32, 5, 0, 202, 13, 0},
+ {96, 0, 0, 208, 32, 0}}},
+ { 1, 0, 202, 303, 4, {{32, 1, 0, 202, 303, 17},
+ {32, 16, 0, 208, 304, 32},
+ {32, 5, 0, 202, 303, 13},
+ {96, 0, 0, 208, 304, 32}}},
+ { 1, 101, 0, 0, 2, {{32, 1, 101, 17, 0, 0},
+ {32, 16, 112, 32, 0, 0},
+ {32, 5, 101, 13, 0, 0},
+ {96, 0, 112, 32, 0, 0}}},
+ { 1, 101, 0, 303, 4, {{32, 1, 101, 0, 303, 17},
+ {32, 16, 112, 0, 304, 32},
+ {32, 5, 101, 0, 303, 13},
+ {96, 0, 112, 0, 304, 32}}},
+ { 1, 101, 202, 0, 3, {{32, 1, 101, 202, 17, 0},
+ {32, 16, 112, 208, 32, 0},
+ {32, 5, 101, 202, 13, 0},
+ {96, 0, 112, 208, 32, 0}}},
+ { 1, 101, 202, 303, 4, {{32, 1, 101, 202, 303, 17},
+ {32, 16, 112, 208, 304, 32},
+ {32, 5, 101, 202, 303, 13},
+ {96, 0, 112, 208, 304, 32}}},
+
+ // first segment is fully inlined, inline buffer is full
+ {48, 0, 0, 0, 1, {{32, 48, 17, 0, 0, 0},
+ {32, 48, 32, 0, 0, 0},
+ {32, 52, 0, 0, 0, 0},
+ {96, 0, 0, 0, 0, 0}}},
+ {48, 0, 0, 303, 4, {{32, 48, 0, 0, 303, 17},
+ {32, 48, 0, 0, 304, 32},
+ {32, 52, 0, 0, 303, 13},
+ {96, 0, 0, 0, 304, 32}}},
+ {48, 0, 202, 0, 3, {{32, 48, 0, 202, 17, 0},
+ {32, 48, 0, 208, 32, 0},
+ {32, 52, 0, 202, 13, 0},
+ {96, 0, 0, 208, 32, 0}}},
+ {48, 0, 202, 303, 4, {{32, 48, 0, 202, 303, 17},
+ {32, 48, 0, 208, 304, 32},
+ {32, 52, 0, 202, 303, 13},
+ {96, 0, 0, 208, 304, 32}}},
+ {48, 101, 0, 0, 2, {{32, 48, 101, 17, 0, 0},
+ {32, 48, 112, 32, 0, 0},
+ {32, 52, 101, 13, 0, 0},
+ {96, 0, 112, 32, 0, 0}}},
+ {48, 101, 0, 303, 4, {{32, 48, 101, 0, 303, 17},
+ {32, 48, 112, 0, 304, 32},
+ {32, 52, 101, 0, 303, 13},
+ {96, 0, 112, 0, 304, 32}}},
+ {48, 101, 202, 0, 3, {{32, 48, 101, 202, 17, 0},
+ {32, 48, 112, 208, 32, 0},
+ {32, 52, 101, 202, 13, 0},
+ {96, 0, 112, 208, 32, 0}}},
+ {48, 101, 202, 303, 4, {{32, 48, 101, 202, 303, 17},
+ {32, 48, 112, 208, 304, 32},
+ {32, 52, 101, 202, 303, 13},
+ {96, 0, 112, 208, 304, 32}}},
+
+ // first segment is partially inlined
+ {49, 0, 0, 0, 1, {{32, 49, 17, 0, 0, 0},
+ {32, 64, 32, 0, 0, 0},
+ {32, 53, 0, 0, 0, 0},
+ {96, 32, 0, 0, 0, 0}}},
+ {49, 0, 0, 303, 4, {{32, 49, 0, 0, 303, 17},
+ {32, 64, 0, 0, 304, 32},
+ {32, 53, 0, 0, 303, 13},
+ {96, 32, 0, 0, 304, 32}}},
+ {49, 0, 202, 0, 3, {{32, 49, 0, 202, 17, 0},
+ {32, 64, 0, 208, 32, 0},
+ {32, 53, 0, 202, 13, 0},
+ {96, 32, 0, 208, 32, 0}}},
+ {49, 0, 202, 303, 4, {{32, 49, 0, 202, 303, 17},
+ {32, 64, 0, 208, 304, 32},
+ {32, 53, 0, 202, 303, 13},
+ {96, 32, 0, 208, 304, 32}}},
+ {49, 101, 0, 0, 2, {{32, 49, 101, 17, 0, 0},
+ {32, 64, 112, 32, 0, 0},
+ {32, 53, 101, 13, 0, 0},
+ {96, 32, 112, 32, 0, 0}}},
+ {49, 101, 0, 303, 4, {{32, 49, 101, 0, 303, 17},
+ {32, 64, 112, 0, 304, 32},
+ {32, 53, 101, 0, 303, 13},
+ {96, 32, 112, 0, 304, 32}}},
+ {49, 101, 202, 0, 3, {{32, 49, 101, 202, 17, 0},
+ {32, 64, 112, 208, 32, 0},
+ {32, 53, 101, 202, 13, 0},
+ {96, 32, 112, 208, 32, 0}}},
+ {49, 101, 202, 303, 4, {{32, 49, 101, 202, 303, 17},
+ {32, 64, 112, 208, 304, 32},
+ {32, 53, 101, 202, 303, 13},
+ {96, 32, 112, 208, 304, 32}}},
+};
+
+INSTANTIATE_TEST_SUITE_P(
+ RoundTripTests, RoundTripTest, ::testing::Combine(
+ ::testing::ValuesIn(round_trip_instances),
+ ::testing::ValuesIn(modes)));
+
+class RoundTripPerfTest : public RoundTripTestBase {};
+
+TEST_P(RoundTripPerfTest, DISABLED_Basic) {
+ for (int i = 0; i < 100000; i++) {
+ auto tx_frame = TestFrame::Encode(m_header, m_front, m_middle, m_data);
+ auto onwire_bl = tx_frame.get_buffer(m_tx_frame_asm);
+
+ Tag rx_tag;
+ segment_bls_t rx_segment_bls;
+ ASSERT_TRUE(disassemble_frame(m_rx_frame_asm, onwire_bl, rx_tag,
+ rx_segment_bls));
+ }
+}
+
+static const round_trip_instance_t round_trip_perf_instances[] = {
+ {41, 250, 0, 0, 2, {{32, 41, 250, 17, 0, 0},
+ {32, 48, 256, 32, 0, 0},
+ {32, 45, 250, 13, 0, 0},
+ {96, 0, 256, 32, 0, 0}}},
+ {41, 250, 0, 512, 4, {{32, 41, 250, 0, 512, 17},
+ {32, 48, 256, 0, 512, 32},
+ {32, 45, 250, 0, 512, 13},
+ {96, 0, 256, 0, 512, 32}}},
+ {41, 250, 0, 4096, 4, {{32, 41, 250, 0, 4096, 17},
+ {32, 48, 256, 0, 4096, 32},
+ {32, 45, 250, 0, 4096, 13},
+ {96, 0, 256, 0, 4096, 32}}},
+ {41, 250, 0, 32768, 4, {{32, 41, 250, 0, 32768, 17},
+ {32, 48, 256, 0, 32768, 32},
+ {32, 45, 250, 0, 32768, 13},
+ {96, 0, 256, 0, 32768, 32}}},
+ {41, 250, 0, 131072, 4, {{32, 41, 250, 0, 131072, 17},
+ {32, 48, 256, 0, 131072, 32},
+ {32, 45, 250, 0, 131072, 13},
+ {96, 0, 256, 0, 131072, 32}}},
+ {41, 250, 0, 4194304, 4, {{32, 41, 250, 0, 4194304, 17},
+ {32, 48, 256, 0, 4194304, 32},
+ {32, 45, 250, 0, 4194304, 13},
+ {96, 0, 256, 0, 4194304, 32}}},
+};
+
+INSTANTIATE_TEST_SUITE_P(
+ RoundTripPerfTests, RoundTripPerfTest, ::testing::Combine(
+ ::testing::ValuesIn(round_trip_perf_instances),
+ ::testing::ValuesIn(modes)));
+
+} // namespace ceph::msgr::v2
+
+int main(int argc, char* argv[]) {
+ auto args = argv_to_vec(argc, argv);
+
+ auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT,
+ CODE_ENVIRONMENT_UTILITY,
+ CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
+ common_init_finish(g_ceph_context);
+
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc
new file mode 100644
index 000000000..f702cc288
--- /dev/null
+++ b/src/test/msgr/test_msgr.cc
@@ -0,0 +1,2424 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <atomic>
+#include <iostream>
+#include <list>
+#include <memory>
+#include <set>
+#include <stdlib.h>
+#include <time.h>
+#include <unistd.h>
+
+#include <boost/random/binomial_distribution.hpp>
+#include <boost/random/mersenne_twister.hpp>
+#include <boost/random/uniform_int.hpp>
+#include <gtest/gtest.h>
+
+#define MSG_POLICY_UNIT_TESTING
+
+#include "common/ceph_argparse.h"
+#include "common/ceph_mutex.h"
+#include "global/global_init.h"
+#include "messages/MCommand.h"
+#include "messages/MPing.h"
+#include "msg/Connection.h"
+#include "msg/Dispatcher.h"
+#include "msg/Message.h"
+#include "msg/Messenger.h"
+#include "msg/msg_types.h"
+
+typedef boost::mt11213b gen_type;
+
+#include "common/dout.h"
+#include "include/ceph_assert.h"
+
+#include "auth/DummyAuth.h"
+
+#define dout_subsys ceph_subsys_ms
+#undef dout_prefix
+#define dout_prefix *_dout << " ceph_test_msgr "
+
+
+#define CHECK_AND_WAIT_TRUE(expr) do { \
+ int n = 1000; \
+ while (--n) { \
+ if (expr) \
+ break; \
+ usleep(1000); \
+ } \
+} while(0);
+
+using namespace std;
+
+class MessengerTest : public ::testing::TestWithParam<const char*> {
+ public:
+ DummyAuthClientServer dummy_auth;
+ Messenger *server_msgr;
+ Messenger *client_msgr;
+
+ MessengerTest() : dummy_auth(g_ceph_context),
+ server_msgr(NULL), client_msgr(NULL) {
+ dummy_auth.auth_registry.refresh_config();
+ }
+ void SetUp() override {
+ lderr(g_ceph_context) << __func__ << " start set up " << GetParam() << dendl;
+ server_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid());
+ client_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid());
+ server_msgr->set_default_policy(Messenger::Policy::stateless_server(0));
+ client_msgr->set_default_policy(Messenger::Policy::lossy_client(0));
+ server_msgr->set_auth_client(&dummy_auth);
+ server_msgr->set_auth_server(&dummy_auth);
+ client_msgr->set_auth_client(&dummy_auth);
+ client_msgr->set_auth_server(&dummy_auth);
+ server_msgr->set_require_authorizer(false);
+ }
+ void TearDown() override {
+ ASSERT_EQ(server_msgr->get_dispatch_queue_len(), 0);
+ ASSERT_EQ(client_msgr->get_dispatch_queue_len(), 0);
+ delete server_msgr;
+ delete client_msgr;
+ }
+
+};
+
+
+class FakeDispatcher : public Dispatcher {
+ public:
+ struct Session : public RefCountedObject {
+ atomic<uint64_t> count;
+ ConnectionRef con;
+
+ explicit Session(ConnectionRef c): RefCountedObject(g_ceph_context), count(0), con(c) {
+ }
+ uint64_t get_count() { return count; }
+ };
+
+ ceph::mutex lock = ceph::make_mutex("FakeDispatcher::lock");
+ ceph::condition_variable cond;
+ bool is_server;
+ bool got_new;
+ bool got_remote_reset;
+ bool got_connect;
+ bool loopback;
+ entity_addrvec_t last_accept;
+ ConnectionRef *last_accept_con_ptr = nullptr;
+
+ explicit FakeDispatcher(bool s): Dispatcher(g_ceph_context),
+ is_server(s), got_new(false), got_remote_reset(false),
+ got_connect(false), loopback(false) {
+ }
+ bool ms_can_fast_dispatch_any() const override { return true; }
+ bool ms_can_fast_dispatch(const Message *m) const override {
+ switch (m->get_type()) {
+ case CEPH_MSG_PING:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ void ms_handle_fast_connect(Connection *con) override {
+ std::scoped_lock l{lock};
+ lderr(g_ceph_context) << __func__ << " " << con << dendl;
+ auto s = con->get_priv();
+ if (!s) {
+ auto session = new Session(con);
+ con->set_priv(RefCountedPtr{session, false});
+ lderr(g_ceph_context) << __func__ << " con: " << con
+ << " count: " << session->count << dendl;
+ }
+ got_connect = true;
+ cond.notify_all();
+ }
+ void ms_handle_fast_accept(Connection *con) override {
+ last_accept = con->get_peer_addrs();
+ if (last_accept_con_ptr) {
+ *last_accept_con_ptr = con;
+ }
+ if (!con->get_priv()) {
+ con->set_priv(RefCountedPtr{new Session(con), false});
+ }
+ }
+ bool ms_dispatch(Message *m) override {
+ auto priv = m->get_connection()->get_priv();
+ auto s = static_cast<Session*>(priv.get());
+ if (!s) {
+ s = new Session(m->get_connection());
+ priv.reset(s, false);
+ m->get_connection()->set_priv(priv);
+ }
+ s->count++;
+ lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
+ if (is_server) {
+ reply_message(m);
+ }
+ std::lock_guard l{lock};
+ got_new = true;
+ cond.notify_all();
+ m->put();
+ return true;
+ }
+ bool ms_handle_reset(Connection *con) override {
+ std::lock_guard l{lock};
+ lderr(g_ceph_context) << __func__ << " " << con << dendl;
+ auto priv = con->get_priv();
+ if (auto s = static_cast<Session*>(priv.get()); s) {
+ s->con.reset(); // break con <-> session ref cycle
+ con->set_priv(nullptr); // break ref <-> session cycle, if any
+ }
+ return true;
+ }
+ void ms_handle_remote_reset(Connection *con) override {
+ std::lock_guard l{lock};
+ lderr(g_ceph_context) << __func__ << " " << con << dendl;
+ auto priv = con->get_priv();
+ if (auto s = static_cast<Session*>(priv.get()); s) {
+ s->con.reset(); // break con <-> session ref cycle
+ con->set_priv(nullptr); // break ref <-> session cycle, if any
+ }
+ got_remote_reset = true;
+ cond.notify_all();
+ }
+ bool ms_handle_refused(Connection *con) override {
+ return false;
+ }
+ void ms_fast_dispatch(Message *m) override {
+ auto priv = m->get_connection()->get_priv();
+ auto s = static_cast<Session*>(priv.get());
+ if (!s) {
+ s = new Session(m->get_connection());
+ priv.reset(s, false);
+ m->get_connection()->set_priv(priv);
+ }
+ s->count++;
+ lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
+ if (is_server) {
+ if (loopback)
+ ceph_assert(m->get_source().is_osd());
+ else
+ reply_message(m);
+ } else if (loopback) {
+ ceph_assert(m->get_source().is_client());
+ }
+ m->put();
+ std::lock_guard l{lock};
+ got_new = true;
+ cond.notify_all();
+ }
+
+ int ms_handle_fast_authentication(Connection *con) override {
+ return 1;
+ }
+
+ void reply_message(Message *m) {
+ MPing *rm = new MPing();
+ m->get_connection()->send_message(rm);
+ }
+};
+
+typedef FakeDispatcher::Session Session;
+
+struct TestInterceptor : public Interceptor {
+
+ bool step_waiting = false;
+ bool waiting = true;
+ std::map<Connection *, uint32_t> current_step;
+ std::map<Connection *, std::list<uint32_t>> step_history;
+ std::map<uint32_t, std::optional<ACTION>> decisions;
+ std::set<uint32_t> breakpoints;
+
+ uint32_t count_step(Connection *conn, uint32_t step) {
+ uint32_t count = 0;
+ for (auto s : step_history[conn]) {
+ if (s == step) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ void breakpoint(uint32_t step) {
+ breakpoints.insert(step);
+ }
+
+ void remove_bp(uint32_t step) {
+ breakpoints.erase(step);
+ }
+
+ Connection *wait(uint32_t step, Connection *conn=nullptr) {
+ std::unique_lock<std::mutex> l(lock);
+ while(true) {
+ if (conn) {
+ auto it = current_step.find(conn);
+ if (it != current_step.end()) {
+ if (it->second == step) {
+ break;
+ }
+ }
+ } else {
+ for (auto it : current_step) {
+ if (it.second == step) {
+ conn = it.first;
+ break;
+ }
+ }
+ if (conn) {
+ break;
+ }
+ }
+ step_waiting = true;
+ cond_var.wait(l);
+ }
+ step_waiting = false;
+ return conn;
+ }
+
+ ACTION wait_for_decision(uint32_t step, std::unique_lock<std::mutex> &l) {
+ if (decisions[step]) {
+ return *(decisions[step]);
+ }
+ waiting = true;
+ cond_var.wait(l, [this] { return !waiting; });
+ return *(decisions[step]);
+ }
+
+ void proceed(uint32_t step, ACTION decision) {
+ std::unique_lock<std::mutex> l(lock);
+ decisions[step] = decision;
+ if (waiting) {
+ waiting = false;
+ cond_var.notify_one();
+ }
+ }
+
+ ACTION intercept(Connection *conn, uint32_t step) override {
+ lderr(g_ceph_context) << __func__ << " conn(" << conn
+ << ") intercept called on step=" << step << dendl;
+
+ {
+ std::unique_lock<std::mutex> l(lock);
+ step_history[conn].push_back(step);
+ current_step[conn] = step;
+ if (step_waiting) {
+ cond_var.notify_one();
+ }
+ }
+
+ std::unique_lock<std::mutex> l(lock);
+ ACTION decision = ACTION::CONTINUE;
+ if (breakpoints.find(step) != breakpoints.end()) {
+ lderr(g_ceph_context) << __func__ << " conn(" << conn
+ << ") pausing on step=" << step << dendl;
+ decision = wait_for_decision(step, l);
+ } else {
+ if (decisions[step]) {
+ decision = *(decisions[step]);
+ }
+ }
+ lderr(g_ceph_context) << __func__ << " conn(" << conn
+ << ") resuming step=" << step << " with decision="
+ << decision << dendl;
+ decisions[step].reset();
+ return decision;
+ }
+
+};
+
+/**
+ * Scenario: A connects to B, and B connects to A at the same time.
+ */
+TEST_P(MessengerTest, ConnectionRaceTest) {
+ FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
+
+ TestInterceptor *cli_interceptor = new TestInterceptor();
+ TestInterceptor *srv_interceptor = new TestInterceptor();
+
+ server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer_reuse(0));
+ server_msgr->interceptor = srv_interceptor;
+
+ client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer_reuse(0));
+ client_msgr->interceptor = cli_interceptor;
+
+ entity_addr_t bind_addr;
+ bind_addr.parse("v2:127.0.0.1:3300");
+ server_msgr->bind(bind_addr);
+ server_msgr->add_dispatcher_head(&srv_dispatcher);
+ server_msgr->start();
+
+ bind_addr.parse("v2:127.0.0.1:3301");
+ client_msgr->bind(bind_addr);
+ client_msgr->add_dispatcher_head(&cli_dispatcher);
+ client_msgr->start();
+
+ // pause before sending client_ident message
+ cli_interceptor->breakpoint(Interceptor::STEP::SEND_CLIENT_IDENTITY);
+ // pause before sending client_ident message
+ srv_interceptor->breakpoint(Interceptor::STEP::SEND_CLIENT_IDENTITY);
+
+ ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
+ MPing *m1 = new MPing();
+ ASSERT_EQ(c2s->send_message(m1), 0);
+
+ ConnectionRef s2c = server_msgr->connect_to(client_msgr->get_mytype(),
+ client_msgr->get_myaddrs());
+ MPing *m2 = new MPing();
+ ASSERT_EQ(s2c->send_message(m2), 0);
+
+ cli_interceptor->wait(Interceptor::STEP::SEND_CLIENT_IDENTITY, c2s.get());
+ srv_interceptor->wait(Interceptor::STEP::SEND_CLIENT_IDENTITY, s2c.get());
+
+ // at this point both connections (A->B, B->A) are paused just before sending
+ // the client_ident message.
+
+ cli_interceptor->remove_bp(Interceptor::STEP::SEND_CLIENT_IDENTITY);
+ srv_interceptor->remove_bp(Interceptor::STEP::SEND_CLIENT_IDENTITY);
+
+ cli_interceptor->proceed(Interceptor::STEP::SEND_CLIENT_IDENTITY, Interceptor::ACTION::CONTINUE);
+ srv_interceptor->proceed(Interceptor::STEP::SEND_CLIENT_IDENTITY, Interceptor::ACTION::CONTINUE);
+
+ {
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+
+ {
+ std::unique_lock l{srv_dispatcher.lock};
+ srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; });
+ srv_dispatcher.got_new = false;
+ }
+
+ ASSERT_TRUE(s2c->is_connected());
+ ASSERT_EQ(1u, static_cast<Session*>(s2c->get_priv().get())->get_count());
+ ASSERT_TRUE(s2c->peer_is_client());
+
+ ASSERT_TRUE(c2s->is_connected());
+ ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
+ ASSERT_TRUE(c2s->peer_is_osd());
+
+ client_msgr->shutdown();
+ client_msgr->wait();
+ server_msgr->shutdown();
+ server_msgr->wait();
+
+ delete cli_interceptor;
+ delete srv_interceptor;
+}
+
+/**
+ * Scenario: A connects to B, and B connects to A at the same time.
+ * The first (A -> B) connection gets to message flow handshake, the
+ * second (B -> A) connection is stuck waiting for a banner from A.
+ * After A sends client_ident to B, the first connection wins and B
+ * calls reuse_connection() to replace the second connection's socket
+ * while the second connection is still in BANNER_CONNECTING.
+ */
+TEST_P(MessengerTest, ConnectionRaceReuseBannerTest) {
+ FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
+
+ auto cli_interceptor = std::make_unique<TestInterceptor>();
+ auto srv_interceptor = std::make_unique<TestInterceptor>();
+
+ server_msgr->set_policy(entity_name_t::TYPE_CLIENT,
+ Messenger::Policy::lossless_peer_reuse(0));
+ server_msgr->interceptor = srv_interceptor.get();
+
+ client_msgr->set_policy(entity_name_t::TYPE_OSD,
+ Messenger::Policy::lossless_peer_reuse(0));
+ client_msgr->interceptor = cli_interceptor.get();
+
+ entity_addr_t bind_addr;
+ bind_addr.parse("v2:127.0.0.1:3300");
+ server_msgr->bind(bind_addr);
+ server_msgr->add_dispatcher_head(&srv_dispatcher);
+ server_msgr->start();
+
+ bind_addr.parse("v2:127.0.0.1:3301");
+ client_msgr->bind(bind_addr);
+ client_msgr->add_dispatcher_head(&cli_dispatcher);
+ client_msgr->start();
+
+ // pause before sending client_ident message
+ srv_interceptor->breakpoint(Interceptor::STEP::SEND_CLIENT_IDENTITY);
+
+ ConnectionRef s2c = server_msgr->connect_to(client_msgr->get_mytype(),
+ client_msgr->get_myaddrs());
+ MPing *m1 = new MPing();
+ ASSERT_EQ(s2c->send_message(m1), 0);
+
+ srv_interceptor->wait(Interceptor::STEP::SEND_CLIENT_IDENTITY);
+ srv_interceptor->remove_bp(Interceptor::STEP::SEND_CLIENT_IDENTITY);
+
+ // pause before sending banner
+ cli_interceptor->breakpoint(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING);
+
+ ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
+ MPing *m2 = new MPing();
+ ASSERT_EQ(c2s->send_message(m2), 0);
+
+ cli_interceptor->wait(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING);
+ cli_interceptor->remove_bp(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING);
+
+ // second connection is in BANNER_CONNECTING, ensure it stays so
+ // and send client_ident
+ srv_interceptor->breakpoint(Interceptor::STEP::BANNER_EXCHANGE);
+ srv_interceptor->proceed(Interceptor::STEP::SEND_CLIENT_IDENTITY, Interceptor::ACTION::CONTINUE);
+
+ // handle client_ident -- triggers reuse_connection() with exproto
+ // in BANNER_CONNECTING
+ cli_interceptor->breakpoint(Interceptor::STEP::READY);
+ cli_interceptor->proceed(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING, Interceptor::ACTION::CONTINUE);
+
+ cli_interceptor->wait(Interceptor::STEP::READY);
+ cli_interceptor->remove_bp(Interceptor::STEP::READY);
+
+ // first connection is in READY
+ Connection *s2c_accepter = srv_interceptor->wait(Interceptor::STEP::BANNER_EXCHANGE);
+ srv_interceptor->remove_bp(Interceptor::STEP::BANNER_EXCHANGE);
+
+ srv_interceptor->proceed(Interceptor::STEP::BANNER_EXCHANGE, Interceptor::ACTION::CONTINUE);
+ cli_interceptor->proceed(Interceptor::STEP::READY, Interceptor::ACTION::CONTINUE);
+
+ {
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+
+ {
+ std::unique_lock l{srv_dispatcher.lock};
+ srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; });
+ srv_dispatcher.got_new = false;
+ }
+
+ EXPECT_TRUE(s2c->is_connected());
+ EXPECT_EQ(1u, static_cast<Session*>(s2c->get_priv().get())->get_count());
+ EXPECT_TRUE(s2c->peer_is_client());
+
+ EXPECT_TRUE(c2s->is_connected());
+ EXPECT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
+ EXPECT_TRUE(c2s->peer_is_osd());
+
+ // closed in reuse_connection() -- EPIPE when writing banner/hello
+ EXPECT_FALSE(s2c_accepter->is_connected());
+
+ // established exactly once, never faulted and reconnected
+ EXPECT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::START_CLIENT_BANNER_EXCHANGE), 1u);
+ EXPECT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT), 0u);
+ EXPECT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::READY), 1u);
+
+ client_msgr->shutdown();
+ client_msgr->wait();
+ server_msgr->shutdown();
+ server_msgr->wait();
+}
+
+/**
+ * Scenario:
+ * - A connects to B
+ * - A sends client_ident to B
+ * - B fails before sending server_ident to A
+ * - A reconnects
+ */
+TEST_P(MessengerTest, MissingServerIdenTest) {
+ FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
+
+ TestInterceptor *cli_interceptor = new TestInterceptor();
+ TestInterceptor *srv_interceptor = new TestInterceptor();
+
+ server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::stateful_server(0));
+ server_msgr->interceptor = srv_interceptor;
+
+ client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossy_client(0));
+ client_msgr->interceptor = cli_interceptor;
+
+ entity_addr_t bind_addr;
+ bind_addr.parse("v2:127.0.0.1:3300");
+ server_msgr->bind(bind_addr);
+ server_msgr->add_dispatcher_head(&srv_dispatcher);
+ server_msgr->start();
+
+ bind_addr.parse("v2:127.0.0.1:3301");
+ client_msgr->bind(bind_addr);
+ client_msgr->add_dispatcher_head(&cli_dispatcher);
+ client_msgr->start();
+
+ // pause before sending server_ident message
+ srv_interceptor->breakpoint(Interceptor::STEP::SEND_SERVER_IDENTITY);
+
+ ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
+ MPing *m1 = new MPing();
+ ASSERT_EQ(c2s->send_message(m1), 0);
+
+ Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::SEND_SERVER_IDENTITY);
+ srv_interceptor->remove_bp(Interceptor::STEP::SEND_SERVER_IDENTITY);
+
+ // We inject a message from this side of the connection to force it to be
+ // in standby when we inject the failure below
+ MPing *m2 = new MPing();
+ ASSERT_EQ(c2s_accepter->send_message(m2), 0);
+
+ srv_interceptor->proceed(Interceptor::STEP::SEND_SERVER_IDENTITY, Interceptor::ACTION::FAIL);
+
+ {
+ std::unique_lock l{srv_dispatcher.lock};
+ srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; });
+ srv_dispatcher.got_new = false;
+ }
+
+ {
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+
+ ASSERT_TRUE(c2s->is_connected());
+ ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
+ ASSERT_TRUE(c2s->peer_is_osd());
+
+ ASSERT_TRUE(c2s_accepter->is_connected());
+ ASSERT_EQ(1u, static_cast<Session*>(c2s_accepter->get_priv().get())->get_count());
+ ASSERT_TRUE(c2s_accepter->peer_is_client());
+
+ client_msgr->shutdown();
+ client_msgr->wait();
+ server_msgr->shutdown();
+ server_msgr->wait();
+
+ delete cli_interceptor;
+ delete srv_interceptor;
+}
+
+/**
+ * Scenario:
+ * - A connects to B
+ * - A sends client_ident to B
+ * - B fails before sending server_ident to A
+ * - A goes to standby
+ * - B reconnects to A
+ */
+TEST_P(MessengerTest, MissingServerIdenTest2) {
+ FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
+
+ TestInterceptor *cli_interceptor = new TestInterceptor();
+ TestInterceptor *srv_interceptor = new TestInterceptor();
+
+ server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer(0));
+ server_msgr->interceptor = srv_interceptor;
+
+ client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0));
+ client_msgr->interceptor = cli_interceptor;
+
+ entity_addr_t bind_addr;
+ bind_addr.parse("v2:127.0.0.1:3300");
+ server_msgr->bind(bind_addr);
+ server_msgr->add_dispatcher_head(&srv_dispatcher);
+ server_msgr->start();
+
+ bind_addr.parse("v2:127.0.0.1:3301");
+ client_msgr->bind(bind_addr);
+ client_msgr->add_dispatcher_head(&cli_dispatcher);
+ client_msgr->start();
+
+ // pause before sending server_ident message
+ srv_interceptor->breakpoint(Interceptor::STEP::SEND_SERVER_IDENTITY);
+
+ ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
+
+ Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::SEND_SERVER_IDENTITY);
+ srv_interceptor->remove_bp(Interceptor::STEP::SEND_SERVER_IDENTITY);
+
+ // We inject a message from this side of the connection to force it to be
+ // in standby when we inject the failure below
+ MPing *m2 = new MPing();
+ ASSERT_EQ(c2s_accepter->send_message(m2), 0);
+
+ srv_interceptor->proceed(Interceptor::STEP::SEND_SERVER_IDENTITY, Interceptor::ACTION::FAIL);
+
+ {
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+
+ ASSERT_TRUE(c2s->is_connected());
+ ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
+ ASSERT_TRUE(c2s->peer_is_osd());
+
+ ASSERT_TRUE(c2s_accepter->is_connected());
+ ASSERT_EQ(0u, static_cast<Session*>(c2s_accepter->get_priv().get())->get_count());
+ ASSERT_TRUE(c2s_accepter->peer_is_client());
+
+ client_msgr->shutdown();
+ client_msgr->wait();
+ server_msgr->shutdown();
+ server_msgr->wait();
+
+ delete cli_interceptor;
+ delete srv_interceptor;
+}
+
+/**
+ * Scenario:
+ * - A connects to B
+ * - A and B exchange messages
+ * - A fails
+ * - B goes into standby
+ * - A reconnects
+ */
+TEST_P(MessengerTest, ReconnectTest) {
+ FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
+
+ TestInterceptor *cli_interceptor = new TestInterceptor();
+ TestInterceptor *srv_interceptor = new TestInterceptor();
+
+ server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::stateful_server(0));
+ server_msgr->interceptor = srv_interceptor;
+
+ client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0));
+ client_msgr->interceptor = cli_interceptor;
+
+ entity_addr_t bind_addr;
+ bind_addr.parse("v2:127.0.0.1:3300");
+ server_msgr->bind(bind_addr);
+ server_msgr->add_dispatcher_head(&srv_dispatcher);
+ server_msgr->start();
+
+ bind_addr.parse("v2:127.0.0.1:3301");
+ client_msgr->bind(bind_addr);
+ client_msgr->add_dispatcher_head(&cli_dispatcher);
+ client_msgr->start();
+
+ ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
+
+ MPing *m1 = new MPing();
+ ASSERT_EQ(c2s->send_message(m1), 0);
+
+ {
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+
+ ASSERT_TRUE(c2s->is_connected());
+ ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
+ ASSERT_TRUE(c2s->peer_is_osd());
+
+ cli_interceptor->breakpoint(Interceptor::STEP::HANDLE_MESSAGE);
+
+ MPing *m2 = new MPing();
+ ASSERT_EQ(c2s->send_message(m2), 0);
+
+ cli_interceptor->wait(Interceptor::STEP::HANDLE_MESSAGE, c2s.get());
+ cli_interceptor->remove_bp(Interceptor::STEP::HANDLE_MESSAGE);
+
+ // at this point client and server are connected together
+
+ srv_interceptor->breakpoint(Interceptor::STEP::READY);
+
+ // failing client
+ cli_interceptor->proceed(Interceptor::STEP::HANDLE_MESSAGE, Interceptor::ACTION::FAIL);
+
+ MPing *m3 = new MPing();
+ ASSERT_EQ(c2s->send_message(m3), 0);
+
+ Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::READY);
+ // the srv end of theconnection is now paused at ready
+ // this means that the reconnect was successful
+ srv_interceptor->remove_bp(Interceptor::STEP::READY);
+
+ ASSERT_TRUE(c2s_accepter->peer_is_client());
+ // c2s_accepter sent 0 reconnect messages
+ ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT), 0u);
+ // c2s_accepter sent 1 reconnect_ok messages
+ ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT_OK), 1u);
+ // c2s sent 1 reconnect messages
+ ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT), 1u);
+ // c2s sent 0 reconnect_ok messages
+ ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT_OK), 0u);
+
+ srv_interceptor->proceed(15, Interceptor::ACTION::CONTINUE);
+
+ {
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+
+ client_msgr->shutdown();
+ client_msgr->wait();
+ server_msgr->shutdown();
+ server_msgr->wait();
+
+ delete cli_interceptor;
+ delete srv_interceptor;
+}
+
+/**
+ * Scenario:
+ * - A connects to B
+ * - A and B exchange messages
+ * - A fails
+ * - A reconnects // B reconnects
+ */
+TEST_P(MessengerTest, ReconnectRaceTest) {
+ FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
+
+ TestInterceptor *cli_interceptor = new TestInterceptor();
+ TestInterceptor *srv_interceptor = new TestInterceptor();
+
+ server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer(0));
+ server_msgr->interceptor = srv_interceptor;
+
+ client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0));
+ client_msgr->interceptor = cli_interceptor;
+
+ entity_addr_t bind_addr;
+ bind_addr.parse("v2:127.0.0.1:3300");
+ server_msgr->bind(bind_addr);
+ server_msgr->add_dispatcher_head(&srv_dispatcher);
+ server_msgr->start();
+
+ bind_addr.parse("v2:127.0.0.1:3301");
+ client_msgr->bind(bind_addr);
+ client_msgr->add_dispatcher_head(&cli_dispatcher);
+ client_msgr->start();
+
+ ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
+
+ MPing *m1 = new MPing();
+ ASSERT_EQ(c2s->send_message(m1), 0);
+
+ {
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+
+ ASSERT_TRUE(c2s->is_connected());
+ ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
+ ASSERT_TRUE(c2s->peer_is_osd());
+
+ cli_interceptor->breakpoint(Interceptor::STEP::HANDLE_MESSAGE);
+
+ MPing *m2 = new MPing();
+ ASSERT_EQ(c2s->send_message(m2), 0);
+
+ cli_interceptor->wait(Interceptor::STEP::HANDLE_MESSAGE, c2s.get());
+ cli_interceptor->remove_bp(Interceptor::STEP::HANDLE_MESSAGE);
+
+ // at this point client and server are connected together
+
+ // force both client and server to race on reconnect
+ cli_interceptor->breakpoint(Interceptor::STEP::SEND_RECONNECT);
+ srv_interceptor->breakpoint(Interceptor::STEP::SEND_RECONNECT);
+
+ // failing client
+ // this will cause both client and server to reconnect at the same time
+ cli_interceptor->proceed(Interceptor::STEP::HANDLE_MESSAGE, Interceptor::ACTION::FAIL);
+
+ MPing *m3 = new MPing();
+ ASSERT_EQ(c2s->send_message(m3), 0);
+
+ cli_interceptor->wait(Interceptor::STEP::SEND_RECONNECT, c2s.get());
+ srv_interceptor->wait(Interceptor::STEP::SEND_RECONNECT);
+
+ cli_interceptor->remove_bp(Interceptor::STEP::SEND_RECONNECT);
+ srv_interceptor->remove_bp(Interceptor::STEP::SEND_RECONNECT);
+
+ // pause on "ready"
+ srv_interceptor->breakpoint(Interceptor::STEP::READY);
+
+ cli_interceptor->proceed(Interceptor::STEP::SEND_RECONNECT, Interceptor::ACTION::CONTINUE);
+ srv_interceptor->proceed(Interceptor::STEP::SEND_RECONNECT, Interceptor::ACTION::CONTINUE);
+
+ Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::READY);
+
+ // the server has reconnected and is "ready"
+ srv_interceptor->remove_bp(Interceptor::STEP::READY);
+
+ ASSERT_TRUE(c2s_accepter->peer_is_client());
+ ASSERT_TRUE(c2s->peer_is_osd());
+
+ // the server should win the reconnect race
+
+ // c2s_accepter sent 1 or 2 reconnect messages
+ ASSERT_LT(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT), 3u);
+ ASSERT_GT(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT), 0u);
+ // c2s_accepter sent 0 reconnect_ok messages
+ ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT_OK), 0u);
+ // c2s sent 1 reconnect messages
+ ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT), 1u);
+ // c2s sent 1 reconnect_ok messages
+ ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT_OK), 1u);
+
+ if (srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT) == 2) {
+ // if the server send the reconnect message two times then
+ // the client must have sent a session retry message to the server
+ ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SESSION_RETRY), 1u);
+ } else {
+ ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SESSION_RETRY), 0u);
+ }
+
+ srv_interceptor->proceed(Interceptor::STEP::READY, Interceptor::ACTION::CONTINUE);
+
+ {
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+
+ client_msgr->shutdown();
+ client_msgr->wait();
+ server_msgr->shutdown();
+ server_msgr->wait();
+
+ delete cli_interceptor;
+ delete srv_interceptor;
+}
+
+TEST_P(MessengerTest, SimpleTest) {
+ FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
+ entity_addr_t bind_addr;
+ bind_addr.parse("v2:127.0.0.1");
+ server_msgr->bind(bind_addr);
+ server_msgr->add_dispatcher_head(&srv_dispatcher);
+ server_msgr->start();
+
+ client_msgr->add_dispatcher_head(&cli_dispatcher);
+ client_msgr->start();
+
+ // 1. simple round trip
+ MPing *m = new MPing();
+ ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
+ {
+ ASSERT_EQ(conn->send_message(m), 0);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+ ASSERT_TRUE(conn->is_connected());
+ ASSERT_EQ(1u, static_cast<Session*>(conn->get_priv().get())->get_count());
+ ASSERT_TRUE(conn->peer_is_osd());
+
+ // 2. test rebind port
+ set<int> avoid_ports;
+ for (int i = 0; i < 10 ; i++) {
+ for (auto a : server_msgr->get_myaddrs().v) {
+ avoid_ports.insert(a.get_port() + i);
+ }
+ }
+ server_msgr->rebind(avoid_ports);
+ for (auto a : server_msgr->get_myaddrs().v) {
+ ASSERT_TRUE(avoid_ports.count(a.get_port()) == 0);
+ }
+
+ conn = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
+ {
+ m = new MPing();
+ ASSERT_EQ(conn->send_message(m), 0);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
+
+ // 3. test markdown connection
+ conn->mark_down();
+ ASSERT_FALSE(conn->is_connected());
+
+ // 4. test failed connection
+ server_msgr->shutdown();
+ server_msgr->wait();
+
+ m = new MPing();
+ conn->send_message(m);
+ CHECK_AND_WAIT_TRUE(!conn->is_connected());
+ ASSERT_FALSE(conn->is_connected());
+
+ // 5. loopback connection
+ srv_dispatcher.loopback = true;
+ conn = client_msgr->get_loopback_connection();
+ {
+ m = new MPing();
+ ASSERT_EQ(conn->send_message(m), 0);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+ srv_dispatcher.loopback = false;
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
+ client_msgr->shutdown();
+ client_msgr->wait();
+ server_msgr->shutdown();
+ server_msgr->wait();
+}
+
+TEST_P(MessengerTest, SimpleMsgr2Test) {
+ FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
+ entity_addr_t legacy_addr;
+ legacy_addr.parse("v1:127.0.0.1");
+ entity_addr_t msgr2_addr;
+ msgr2_addr.parse("v2:127.0.0.1");
+ entity_addrvec_t bind_addrs;
+ bind_addrs.v.push_back(legacy_addr);
+ bind_addrs.v.push_back(msgr2_addr);
+ server_msgr->bindv(bind_addrs);
+ server_msgr->add_dispatcher_head(&srv_dispatcher);
+ server_msgr->start();
+
+ client_msgr->add_dispatcher_head(&cli_dispatcher);
+ client_msgr->start();
+
+ // 1. simple round trip
+ MPing *m = new MPing();
+ ConnectionRef conn = client_msgr->connect_to(
+ server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
+ {
+ ASSERT_EQ(conn->send_message(m), 0);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+ ASSERT_TRUE(conn->is_connected());
+ ASSERT_EQ(1u, static_cast<Session*>(conn->get_priv().get())->get_count());
+ ASSERT_TRUE(conn->peer_is_osd());
+
+ // 2. test rebind port
+ set<int> avoid_ports;
+ for (int i = 0; i < 10 ; i++) {
+ for (auto a : server_msgr->get_myaddrs().v) {
+ avoid_ports.insert(a.get_port() + i);
+ }
+ }
+ server_msgr->rebind(avoid_ports);
+ for (auto a : server_msgr->get_myaddrs().v) {
+ ASSERT_TRUE(avoid_ports.count(a.get_port()) == 0);
+ }
+
+ conn = client_msgr->connect_to(
+ server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
+ {
+ m = new MPing();
+ ASSERT_EQ(conn->send_message(m), 0);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
+
+ // 3. test markdown connection
+ conn->mark_down();
+ ASSERT_FALSE(conn->is_connected());
+
+ // 4. test failed connection
+ server_msgr->shutdown();
+ server_msgr->wait();
+
+ m = new MPing();
+ conn->send_message(m);
+ CHECK_AND_WAIT_TRUE(!conn->is_connected());
+ ASSERT_FALSE(conn->is_connected());
+
+ // 5. loopback connection
+ srv_dispatcher.loopback = true;
+ conn = client_msgr->get_loopback_connection();
+ {
+ m = new MPing();
+ ASSERT_EQ(conn->send_message(m), 0);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+ srv_dispatcher.loopback = false;
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
+ client_msgr->shutdown();
+ client_msgr->wait();
+ server_msgr->shutdown();
+ server_msgr->wait();
+}
+
+TEST_P(MessengerTest, FeatureTest) {
+ FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
+ entity_addr_t bind_addr;
+ bind_addr.parse("v2:127.0.0.1");
+ uint64_t all_feature_supported, feature_required, feature_supported = 0;
+ for (int i = 0; i < 10; i++)
+ feature_supported |= 1ULL << i;
+ feature_supported |= CEPH_FEATUREMASK_MSG_ADDR2;
+ feature_supported |= CEPH_FEATUREMASK_SERVER_NAUTILUS;
+ feature_required = feature_supported | 1ULL << 13;
+ all_feature_supported = feature_required | 1ULL << 14;
+
+ Messenger::Policy p = server_msgr->get_policy(entity_name_t::TYPE_CLIENT);
+ p.features_required = feature_required;
+ server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
+ server_msgr->bind(bind_addr);
+ server_msgr->add_dispatcher_head(&srv_dispatcher);
+ server_msgr->start();
+
+ // 1. Suppose if only support less than required
+ p = client_msgr->get_policy(entity_name_t::TYPE_OSD);
+ p.features_supported = feature_supported;
+ client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
+ client_msgr->add_dispatcher_head(&cli_dispatcher);
+ client_msgr->start();
+
+ MPing *m = new MPing();
+ ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
+ conn->send_message(m);
+ CHECK_AND_WAIT_TRUE(!conn->is_connected());
+ // should failed build a connection
+ ASSERT_FALSE(conn->is_connected());
+
+ client_msgr->shutdown();
+ client_msgr->wait();
+
+ // 2. supported met required
+ p = client_msgr->get_policy(entity_name_t::TYPE_OSD);
+ p.features_supported = all_feature_supported;
+ client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
+ client_msgr->start();
+
+ conn = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
+ {
+ m = new MPing();
+ ASSERT_EQ(conn->send_message(m), 0);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
+
+ server_msgr->shutdown();
+ client_msgr->shutdown();
+ server_msgr->wait();
+ client_msgr->wait();
+}
+
+TEST_P(MessengerTest, TimeoutTest) {
+ g_ceph_context->_conf.set_val("ms_connection_idle_timeout", "1");
+ FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
+ entity_addr_t bind_addr;
+ bind_addr.parse("v2:127.0.0.1");
+ server_msgr->bind(bind_addr);
+ server_msgr->add_dispatcher_head(&srv_dispatcher);
+ server_msgr->start();
+
+ client_msgr->add_dispatcher_head(&cli_dispatcher);
+ client_msgr->start();
+
+ // 1. build the connection
+ MPing *m = new MPing();
+ ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
+ {
+ ASSERT_EQ(conn->send_message(m), 0);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+ ASSERT_TRUE(conn->is_connected());
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
+ ASSERT_TRUE(conn->peer_is_osd());
+
+ // 2. wait for idle
+ usleep(2500*1000);
+ ASSERT_FALSE(conn->is_connected());
+
+ server_msgr->shutdown();
+ server_msgr->wait();
+
+ client_msgr->shutdown();
+ client_msgr->wait();
+ g_ceph_context->_conf.set_val("ms_connection_idle_timeout", "900");
+}
+
+TEST_P(MessengerTest, StatefulTest) {
+ Message *m;
+ FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
+ entity_addr_t bind_addr;
+ bind_addr.parse("v2:127.0.0.1");
+ Messenger::Policy p = Messenger::Policy::stateful_server(0);
+ server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
+ p = Messenger::Policy::lossless_client(0);
+ client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
+
+ server_msgr->bind(bind_addr);
+ server_msgr->add_dispatcher_head(&srv_dispatcher);
+ server_msgr->start();
+ client_msgr->add_dispatcher_head(&cli_dispatcher);
+ client_msgr->start();
+
+ // 1. test for server standby
+ ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
+ {
+ m = new MPing();
+ ASSERT_EQ(conn->send_message(m), 0);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
+ conn->mark_down();
+ ASSERT_FALSE(conn->is_connected());
+ ConnectionRef server_conn = server_msgr->connect_to(
+ client_msgr->get_mytype(), srv_dispatcher.last_accept);
+ // don't lose state
+ ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
+
+ srv_dispatcher.got_new = false;
+ conn = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
+ {
+ m = new MPing();
+ ASSERT_EQ(conn->send_message(m), 0);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
+ server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
+ srv_dispatcher.last_accept);
+ {
+ std::unique_lock l{srv_dispatcher.lock};
+ srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_remote_reset; });
+ }
+
+ // 2. test for client reconnect
+ ASSERT_FALSE(cli_dispatcher.got_remote_reset);
+ cli_dispatcher.got_connect = false;
+ cli_dispatcher.got_new = false;
+ cli_dispatcher.got_remote_reset = false;
+ server_conn->mark_down();
+ ASSERT_FALSE(server_conn->is_connected());
+ // ensure client detect server socket closed
+ {
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_remote_reset; });
+ cli_dispatcher.got_remote_reset = false;
+ }
+ {
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_connect; });
+ cli_dispatcher.got_connect = false;
+ }
+ CHECK_AND_WAIT_TRUE(conn->is_connected());
+ ASSERT_TRUE(conn->is_connected());
+
+ {
+ m = new MPing();
+ ASSERT_EQ(conn->send_message(m), 0);
+ ASSERT_TRUE(conn->is_connected());
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+ // resetcheck happen
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
+ server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
+ srv_dispatcher.last_accept);
+ ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
+ cli_dispatcher.got_remote_reset = false;
+
+ server_msgr->shutdown();
+ client_msgr->shutdown();
+ server_msgr->wait();
+ client_msgr->wait();
+}
+
+TEST_P(MessengerTest, StatelessTest) {
+ Message *m;
+ FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
+ entity_addr_t bind_addr;
+ bind_addr.parse("v2:127.0.0.1");
+ Messenger::Policy p = Messenger::Policy::stateless_server(0);
+ server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
+ p = Messenger::Policy::lossy_client(0);
+ client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
+
+ server_msgr->bind(bind_addr);
+ server_msgr->add_dispatcher_head(&srv_dispatcher);
+ server_msgr->start();
+ client_msgr->add_dispatcher_head(&cli_dispatcher);
+ client_msgr->start();
+
+ // 1. test for server lose state
+ ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
+ {
+ m = new MPing();
+ ASSERT_EQ(conn->send_message(m), 0);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
+ conn->mark_down();
+ ASSERT_FALSE(conn->is_connected());
+
+ srv_dispatcher.got_new = false;
+ ConnectionRef server_conn;
+ srv_dispatcher.last_accept_con_ptr = &server_conn;
+ conn = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
+ {
+ m = new MPing();
+ ASSERT_EQ(conn->send_message(m), 0);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
+ ASSERT_TRUE(server_conn);
+
+ // server lose state
+ {
+ std::unique_lock l{srv_dispatcher.lock};
+ srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; });
+ }
+ ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
+
+ // 2. test for client lossy
+ server_conn->mark_down();
+ ASSERT_FALSE(server_conn->is_connected());
+ conn->send_keepalive();
+ CHECK_AND_WAIT_TRUE(!conn->is_connected());
+ ASSERT_FALSE(conn->is_connected());
+ conn = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
+ {
+ m = new MPing();
+ ASSERT_EQ(conn->send_message(m), 0);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
+
+ server_msgr->shutdown();
+ client_msgr->shutdown();
+ server_msgr->wait();
+ client_msgr->wait();
+}
+
+TEST_P(MessengerTest, AnonTest) {
+ Message *m;
+ FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
+ entity_addr_t bind_addr;
+ bind_addr.parse("v2:127.0.0.1");
+ Messenger::Policy p = Messenger::Policy::stateless_server(0);
+ server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
+ p = Messenger::Policy::lossy_client(0);
+ client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
+
+ server_msgr->bind(bind_addr);
+ server_msgr->add_dispatcher_head(&srv_dispatcher);
+ server_msgr->start();
+ client_msgr->add_dispatcher_head(&cli_dispatcher);
+ client_msgr->start();
+
+ ConnectionRef server_con_a, server_con_b;
+
+ // a
+ srv_dispatcher.last_accept_con_ptr = &server_con_a;
+ ConnectionRef con_a = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs(),
+ true);
+ {
+ m = new MPing();
+ ASSERT_EQ(con_a->send_message(m), 0);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+ ASSERT_EQ(1U, static_cast<Session*>(con_a->get_priv().get())->get_count());
+
+ // b
+ srv_dispatcher.last_accept_con_ptr = &server_con_b;
+ ConnectionRef con_b = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs(),
+ true);
+ {
+ m = new MPing();
+ ASSERT_EQ(con_b->send_message(m), 0);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+ ASSERT_EQ(1U, static_cast<Session*>(con_b->get_priv().get())->get_count());
+
+ // these should be distinct
+ ASSERT_NE(con_a, con_b);
+ ASSERT_NE(server_con_a, server_con_b);
+
+ // and both connected
+ {
+ m = new MPing();
+ ASSERT_EQ(con_a->send_message(m), 0);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+ {
+ m = new MPing();
+ ASSERT_EQ(con_b->send_message(m), 0);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+
+ // clean up
+ con_a->mark_down();
+ ASSERT_FALSE(con_a->is_connected());
+ con_b->mark_down();
+ ASSERT_FALSE(con_b->is_connected());
+
+ server_msgr->shutdown();
+ client_msgr->shutdown();
+ server_msgr->wait();
+ client_msgr->wait();
+}
+
+TEST_P(MessengerTest, ClientStandbyTest) {
+ Message *m;
+ FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
+ entity_addr_t bind_addr;
+ bind_addr.parse("v2:127.0.0.1");
+ Messenger::Policy p = Messenger::Policy::stateful_server(0);
+ server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
+ p = Messenger::Policy::lossless_peer(0);
+ client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
+
+ server_msgr->bind(bind_addr);
+ server_msgr->add_dispatcher_head(&srv_dispatcher);
+ server_msgr->start();
+ client_msgr->add_dispatcher_head(&cli_dispatcher);
+ client_msgr->start();
+
+ // 1. test for client standby, resetcheck
+ ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
+ {
+ m = new MPing();
+ ASSERT_EQ(conn->send_message(m), 0);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
+ ConnectionRef server_conn = server_msgr->connect_to(
+ client_msgr->get_mytype(),
+ srv_dispatcher.last_accept);
+ ASSERT_FALSE(cli_dispatcher.got_remote_reset);
+ cli_dispatcher.got_connect = false;
+ server_conn->mark_down();
+ ASSERT_FALSE(server_conn->is_connected());
+ // client should be standby
+ usleep(300*1000);
+ // client should be standby, so we use original connection
+ {
+ // Try send message to verify got remote reset callback
+ m = new MPing();
+ ASSERT_EQ(conn->send_message(m), 0);
+ {
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_remote_reset; });
+ cli_dispatcher.got_remote_reset = false;
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_connect; });
+ cli_dispatcher.got_connect = false;
+ }
+ CHECK_AND_WAIT_TRUE(conn->is_connected());
+ ASSERT_TRUE(conn->is_connected());
+ m = new MPing();
+ ASSERT_EQ(conn->send_message(m), 0);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
+ server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
+ srv_dispatcher.last_accept);
+ ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
+
+ server_msgr->shutdown();
+ client_msgr->shutdown();
+ server_msgr->wait();
+ client_msgr->wait();
+}
+
+TEST_P(MessengerTest, AuthTest) {
+ g_ceph_context->_conf.set_val("auth_cluster_required", "cephx");
+ g_ceph_context->_conf.set_val("auth_service_required", "cephx");
+ g_ceph_context->_conf.set_val("auth_client_required", "cephx");
+ FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
+ entity_addr_t bind_addr;
+ bind_addr.parse("v2:127.0.0.1");
+ server_msgr->bind(bind_addr);
+ server_msgr->add_dispatcher_head(&srv_dispatcher);
+ server_msgr->start();
+
+ client_msgr->add_dispatcher_head(&cli_dispatcher);
+ client_msgr->start();
+
+ // 1. simple auth round trip
+ MPing *m = new MPing();
+ ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
+ {
+ ASSERT_EQ(conn->send_message(m), 0);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+ ASSERT_TRUE(conn->is_connected());
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
+
+ // 2. mix auth
+ g_ceph_context->_conf.set_val("auth_cluster_required", "none");
+ g_ceph_context->_conf.set_val("auth_service_required", "none");
+ g_ceph_context->_conf.set_val("auth_client_required", "none");
+ conn->mark_down();
+ ASSERT_FALSE(conn->is_connected());
+ conn = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
+ {
+ MPing *m = new MPing();
+ ASSERT_EQ(conn->send_message(m), 0);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+ ASSERT_TRUE(conn->is_connected());
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
+ server_msgr->shutdown();
+ client_msgr->shutdown();
+ server_msgr->wait();
+ client_msgr->wait();
+}
+
+TEST_P(MessengerTest, MessageTest) {
+ FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
+ entity_addr_t bind_addr;
+ bind_addr.parse("v2:127.0.0.1");
+ Messenger::Policy p = Messenger::Policy::stateful_server(0);
+ server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
+ p = Messenger::Policy::lossless_peer(0);
+ client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
+
+ server_msgr->bind(bind_addr);
+ server_msgr->add_dispatcher_head(&srv_dispatcher);
+ server_msgr->start();
+ client_msgr->add_dispatcher_head(&cli_dispatcher);
+ client_msgr->start();
+
+
+ // 1. A very large "front"(as well as "payload")
+ // Because a external message need to invade Messenger::decode_message,
+ // here we only use existing message class(MCommand)
+ ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
+ {
+ uuid_d uuid;
+ uuid.generate_random();
+ vector<string> cmds;
+ string s("abcdefghijklmnopqrstuvwxyz");
+ for (int i = 0; i < 1024*30; i++)
+ cmds.push_back(s);
+ MCommand *m = new MCommand(uuid);
+ m->cmd = cmds;
+ conn->send_message(m);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait_for(l, 500s, [&] { return cli_dispatcher.got_new; });
+ ASSERT_TRUE(cli_dispatcher.got_new);
+ cli_dispatcher.got_new = false;
+ }
+
+ // 2. A very large "data"
+ {
+ bufferlist bl;
+ string s("abcdefghijklmnopqrstuvwxyz");
+ for (int i = 0; i < 1024*30; i++)
+ bl.append(s);
+ MPing *m = new MPing();
+ m->set_data(bl);
+ conn->send_message(m);
+ utime_t t;
+ t += 1000*1000*500;
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ ASSERT_TRUE(cli_dispatcher.got_new);
+ cli_dispatcher.got_new = false;
+ }
+ server_msgr->shutdown();
+ client_msgr->shutdown();
+ server_msgr->wait();
+ client_msgr->wait();
+}
+
+
+class SyntheticWorkload;
+
+struct Payload {
+ enum Who : uint8_t {
+ PING = 0,
+ PONG = 1,
+ };
+ uint8_t who = 0;
+ uint64_t seq = 0;
+ bufferlist data;
+
+ Payload(Who who, uint64_t seq, const bufferlist& data)
+ : who(who), seq(seq), data(data)
+ {}
+ Payload() = default;
+ DENC(Payload, v, p) {
+ DENC_START(1, 1, p);
+ denc(v.who, p);
+ denc(v.seq, p);
+ denc(v.data, p);
+ DENC_FINISH(p);
+ }
+};
+WRITE_CLASS_DENC(Payload)
+
+ostream& operator<<(ostream& out, const Payload &pl)
+{
+ return out << "reply=" << pl.who << " i = " << pl.seq;
+}
+
+class SyntheticDispatcher : public Dispatcher {
+ public:
+ ceph::mutex lock = ceph::make_mutex("SyntheticDispatcher::lock");
+ ceph::condition_variable cond;
+ bool is_server;
+ bool got_new;
+ bool got_remote_reset;
+ bool got_connect;
+ map<ConnectionRef, list<uint64_t> > conn_sent;
+ map<uint64_t, bufferlist> sent;
+ atomic<uint64_t> index;
+ SyntheticWorkload *workload;
+
+ SyntheticDispatcher(bool s, SyntheticWorkload *wl):
+ Dispatcher(g_ceph_context), is_server(s), got_new(false),
+ got_remote_reset(false), got_connect(false), index(0), workload(wl) {
+ }
+ bool ms_can_fast_dispatch_any() const override { return true; }
+ bool ms_can_fast_dispatch(const Message *m) const override {
+ switch (m->get_type()) {
+ case CEPH_MSG_PING:
+ case MSG_COMMAND:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ void ms_handle_fast_connect(Connection *con) override {
+ std::lock_guard l{lock};
+ list<uint64_t> c = conn_sent[con];
+ for (list<uint64_t>::iterator it = c.begin();
+ it != c.end(); ++it)
+ sent.erase(*it);
+ conn_sent.erase(con);
+ got_connect = true;
+ cond.notify_all();
+ }
+ void ms_handle_fast_accept(Connection *con) override {
+ std::lock_guard l{lock};
+ list<uint64_t> c = conn_sent[con];
+ for (list<uint64_t>::iterator it = c.begin();
+ it != c.end(); ++it)
+ sent.erase(*it);
+ conn_sent.erase(con);
+ cond.notify_all();
+ }
+ bool ms_dispatch(Message *m) override {
+ ceph_abort();
+ }
+ bool ms_handle_reset(Connection *con) override;
+ void ms_handle_remote_reset(Connection *con) override {
+ std::lock_guard l{lock};
+ list<uint64_t> c = conn_sent[con];
+ for (list<uint64_t>::iterator it = c.begin();
+ it != c.end(); ++it)
+ sent.erase(*it);
+ conn_sent.erase(con);
+ got_remote_reset = true;
+ }
+ bool ms_handle_refused(Connection *con) override {
+ return false;
+ }
+ void ms_fast_dispatch(Message *m) override {
+ // MSG_COMMAND is used to disorganize regular message flow
+ if (m->get_type() == MSG_COMMAND) {
+ m->put();
+ return ;
+ }
+
+ Payload pl;
+ auto p = m->get_data().cbegin();
+ decode(pl, p);
+ if (pl.who == Payload::PING) {
+ lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
+ reply_message(m, pl);
+ m->put();
+ std::lock_guard l{lock};
+ got_new = true;
+ cond.notify_all();
+ } else {
+ std::lock_guard l{lock};
+ if (sent.count(pl.seq)) {
+ lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
+ ASSERT_EQ(conn_sent[m->get_connection()].front(), pl.seq);
+ ASSERT_TRUE(pl.data.contents_equal(sent[pl.seq]));
+ conn_sent[m->get_connection()].pop_front();
+ sent.erase(pl.seq);
+ }
+ m->put();
+ got_new = true;
+ cond.notify_all();
+ }
+ }
+
+ int ms_handle_fast_authentication(Connection *con) override {
+ return 1;
+ }
+
+ void reply_message(const Message *m, Payload& pl) {
+ pl.who = Payload::PONG;
+ bufferlist bl;
+ encode(pl, bl);
+ MPing *rm = new MPing();
+ rm->set_data(bl);
+ m->get_connection()->send_message(rm);
+ lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply m=" << m << " i=" << pl.seq << dendl;
+ }
+
+ void send_message_wrap(ConnectionRef con, const bufferlist& data) {
+ Message *m = new MPing();
+ Payload pl{Payload::PING, index++, data};
+ bufferlist bl;
+ encode(pl, bl);
+ m->set_data(bl);
+ if (!con->get_messenger()->get_default_policy().lossy) {
+ std::lock_guard l{lock};
+ sent[pl.seq] = pl.data;
+ conn_sent[con].push_back(pl.seq);
+ }
+ lderr(g_ceph_context) << __func__ << " conn=" << con.get() << " send m=" << m << " i=" << pl.seq << dendl;
+ ASSERT_EQ(0, con->send_message(m));
+ }
+
+ uint64_t get_pending() {
+ std::lock_guard l{lock};
+ return sent.size();
+ }
+
+ void clear_pending(ConnectionRef con) {
+ std::lock_guard l{lock};
+
+ for (list<uint64_t>::iterator it = conn_sent[con].begin();
+ it != conn_sent[con].end(); ++it)
+ sent.erase(*it);
+ conn_sent.erase(con);
+ }
+
+ void print() {
+ for (auto && p : conn_sent) {
+ if (!p.second.empty()) {
+ lderr(g_ceph_context) << __func__ << " " << p.first << " wait " << p.second.size() << dendl;
+ }
+ }
+ }
+};
+
+
+class SyntheticWorkload {
+ ceph::mutex lock = ceph::make_mutex("SyntheticWorkload::lock");
+ ceph::condition_variable cond;
+ set<Messenger*> available_servers;
+ set<Messenger*> available_clients;
+ Messenger::Policy client_policy;
+ map<ConnectionRef, pair<Messenger*, Messenger*> > available_connections;
+ SyntheticDispatcher dispatcher;
+ gen_type rng;
+ vector<bufferlist> rand_data;
+ DummyAuthClientServer dummy_auth;
+
+ public:
+ const unsigned max_in_flight = 0;
+ const unsigned max_connections = 0;
+ static const unsigned max_message_len = 1024 * 1024 * 4;
+
+ SyntheticWorkload(int servers, int clients, string type, int random_num,
+ Messenger::Policy srv_policy, Messenger::Policy cli_policy,
+ int _max_in_flight = 64, int _max_connections = 128)
+ : client_policy(cli_policy),
+ dispatcher(false, this),
+ rng(time(NULL)),
+ dummy_auth(g_ceph_context),
+ max_in_flight(_max_in_flight),
+ max_connections(_max_connections) {
+
+ dummy_auth.auth_registry.refresh_config();
+ Messenger *msgr;
+ int base_port = 16800;
+ entity_addr_t bind_addr;
+ char addr[64];
+ for (int i = 0; i < servers; ++i) {
+ msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0),
+ "server", getpid()+i);
+ snprintf(addr, sizeof(addr), "v2:127.0.0.1:%d",
+ base_port+i);
+ bind_addr.parse(addr);
+ msgr->bind(bind_addr);
+ msgr->add_dispatcher_head(&dispatcher);
+ msgr->set_auth_client(&dummy_auth);
+ msgr->set_auth_server(&dummy_auth);
+
+ ceph_assert(msgr);
+ msgr->set_default_policy(srv_policy);
+ available_servers.insert(msgr);
+ msgr->start();
+ }
+
+ for (int i = 0; i < clients; ++i) {
+ msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(-1),
+ "client", getpid()+i+servers);
+ if (cli_policy.standby) {
+ snprintf(addr, sizeof(addr), "v2:127.0.0.1:%d",
+ base_port+i+servers);
+ bind_addr.parse(addr);
+ msgr->bind(bind_addr);
+ }
+ msgr->add_dispatcher_head(&dispatcher);
+ msgr->set_auth_client(&dummy_auth);
+ msgr->set_auth_server(&dummy_auth);
+
+ ceph_assert(msgr);
+ msgr->set_default_policy(cli_policy);
+ available_clients.insert(msgr);
+ msgr->start();
+ }
+
+ for (int i = 0; i < random_num; i++) {
+ bufferlist bl;
+ boost::uniform_int<> u(32, max_message_len);
+ uint64_t value_len = u(rng);
+ bufferptr bp(value_len);
+ bp.zero();
+ for (uint64_t j = 0; j < value_len-sizeof(i); ) {
+ memcpy(bp.c_str()+j, &i, sizeof(i));
+ j += 4096;
+ }
+
+ bl.append(bp);
+ rand_data.push_back(bl);
+ }
+ }
+
+ ConnectionRef _get_random_connection() {
+ while (dispatcher.get_pending() > max_in_flight) {
+ lock.unlock();
+ usleep(500);
+ lock.lock();
+ }
+ ceph_assert(ceph_mutex_is_locked(lock));
+ boost::uniform_int<> choose(0, available_connections.size() - 1);
+ int index = choose(rng);
+ map<ConnectionRef, pair<Messenger*, Messenger*> >::iterator i = available_connections.begin();
+ for (; index > 0; --index, ++i) ;
+ return i->first;
+ }
+
+ bool can_create_connection() {
+ return available_connections.size() < max_connections;
+ }
+
+ void generate_connection() {
+ std::lock_guard l{lock};
+ if (!can_create_connection())
+ return ;
+
+ Messenger *server, *client;
+ {
+ boost::uniform_int<> choose(0, available_servers.size() - 1);
+ int index = choose(rng);
+ set<Messenger*>::iterator i = available_servers.begin();
+ for (; index > 0; --index, ++i) ;
+ server = *i;
+ }
+ {
+ boost::uniform_int<> choose(0, available_clients.size() - 1);
+ int index = choose(rng);
+ set<Messenger*>::iterator i = available_clients.begin();
+ for (; index > 0; --index, ++i) ;
+ client = *i;
+ }
+
+ pair<Messenger*, Messenger*> p;
+ {
+ boost::uniform_int<> choose(0, available_servers.size() - 1);
+ if (server->get_default_policy().server) {
+ p = make_pair(client, server);
+ ConnectionRef conn = client->connect_to(server->get_mytype(),
+ server->get_myaddrs());
+ available_connections[conn] = p;
+ } else {
+ ConnectionRef conn = client->connect_to(server->get_mytype(),
+ server->get_myaddrs());
+ p = make_pair(client, server);
+ available_connections[conn] = p;
+ }
+ }
+ }
+
+ void send_message() {
+ std::lock_guard l{lock};
+ ConnectionRef conn = _get_random_connection();
+ boost::uniform_int<> true_false(0, 99);
+ int val = true_false(rng);
+ if (val >= 95) {
+ uuid_d uuid;
+ uuid.generate_random();
+ MCommand *m = new MCommand(uuid);
+ vector<string> cmds;
+ cmds.push_back("command");
+ m->cmd = cmds;
+ m->set_priority(200);
+ conn->send_message(m);
+ } else {
+ boost::uniform_int<> u(0, rand_data.size()-1);
+ dispatcher.send_message_wrap(conn, rand_data[u(rng)]);
+ }
+ }
+
+ void send_large_message(bool inject_network_congestion=false) {
+ std::lock_guard l{lock};
+ ConnectionRef conn = _get_random_connection();
+ uuid_d uuid;
+ uuid.generate_random();
+ MCommand *m = new MCommand(uuid);
+ vector<string> cmds;
+ cmds.push_back("command");
+ // set the random data to make the large message
+ bufferlist bl;
+ string s("abcdefghijklmnopqrstuvwxyz");
+ for (int i = 0; i < 1024*256; i++)
+ bl.append(s);
+ // bl is around 6M
+ m->set_data(bl);
+ m->cmd = cmds;
+ m->set_priority(200);
+ // setup after connection is ready
+ if (inject_network_congestion && conn->is_connected()) {
+ g_ceph_context->_conf.set_val("ms_inject_network_congestion", "100");
+ } else {
+ g_ceph_context->_conf.set_val("ms_inject_network_congestion", "0");
+ }
+ conn->send_message(m);
+ }
+
+ void drop_connection() {
+ std::lock_guard l{lock};
+ if (available_connections.size() < 10)
+ return;
+ ConnectionRef conn = _get_random_connection();
+ dispatcher.clear_pending(conn);
+ conn->mark_down();
+ if (!client_policy.server &&
+ !client_policy.lossy &&
+ client_policy.standby) {
+ // it's a lossless policy, so we need to mark down each side
+ pair<Messenger*, Messenger*> &p = available_connections[conn];
+ if (!p.first->get_default_policy().server && !p.second->get_default_policy().server) {
+ ASSERT_EQ(conn->get_messenger(), p.first);
+ ConnectionRef peer = p.second->connect_to(p.first->get_mytype(),
+ p.first->get_myaddrs());
+ peer->mark_down();
+ dispatcher.clear_pending(peer);
+ available_connections.erase(peer);
+ }
+ }
+ ASSERT_EQ(available_connections.erase(conn), 1U);
+ }
+
+ void print_internal_state(bool detail=false) {
+ std::lock_guard l{lock};
+ lderr(g_ceph_context) << "available_connections: " << available_connections.size()
+ << " inflight messages: " << dispatcher.get_pending() << dendl;
+ if (detail && !available_connections.empty()) {
+ dispatcher.print();
+ }
+ }
+
+ void wait_for_done() {
+ int64_t tick_us = 1000 * 100; // 100ms
+ int64_t timeout_us = 5 * 60 * 1000 * 1000; // 5 mins
+ int i = 0;
+ while (dispatcher.get_pending()) {
+ usleep(tick_us);
+ timeout_us -= tick_us;
+ if (i++ % 50 == 0)
+ print_internal_state(true);
+ if (timeout_us < 0)
+ ceph_abort_msg(" loop time exceed 5 mins, it looks we stuck into some problems!");
+ }
+ for (set<Messenger*>::iterator it = available_servers.begin();
+ it != available_servers.end(); ++it) {
+ (*it)->shutdown();
+ (*it)->wait();
+ ASSERT_EQ((*it)->get_dispatch_queue_len(), 0);
+ delete (*it);
+ }
+ available_servers.clear();
+
+ for (set<Messenger*>::iterator it = available_clients.begin();
+ it != available_clients.end(); ++it) {
+ (*it)->shutdown();
+ (*it)->wait();
+ ASSERT_EQ((*it)->get_dispatch_queue_len(), 0);
+ delete (*it);
+ }
+ available_clients.clear();
+ }
+
+ void handle_reset(Connection *con) {
+ std::lock_guard l{lock};
+ available_connections.erase(con);
+ dispatcher.clear_pending(con);
+ }
+};
+
+bool SyntheticDispatcher::ms_handle_reset(Connection *con) {
+ workload->handle_reset(con);
+ return true;
+}
+
+TEST_P(MessengerTest, SyntheticStressTest) {
+ SyntheticWorkload test_msg(8, 32, GetParam(), 100,
+ Messenger::Policy::stateful_server(0),
+ Messenger::Policy::lossless_client(0));
+ for (int i = 0; i < 100; ++i) {
+ if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
+ test_msg.generate_connection();
+ }
+ gen_type rng(time(NULL));
+ for (int i = 0; i < 5000; ++i) {
+ if (!(i % 10)) {
+ lderr(g_ceph_context) << "Op " << i << ": " << dendl;
+ test_msg.print_internal_state();
+ }
+ boost::uniform_int<> true_false(0, 99);
+ int val = true_false(rng);
+ if (val > 90) {
+ test_msg.generate_connection();
+ } else if (val > 80) {
+ test_msg.drop_connection();
+ } else if (val > 10) {
+ test_msg.send_message();
+ } else {
+ usleep(rand() % 1000 + 500);
+ }
+ }
+ test_msg.wait_for_done();
+}
+
+TEST_P(MessengerTest, SyntheticStressTest1) {
+ SyntheticWorkload test_msg(16, 32, GetParam(), 100,
+ Messenger::Policy::lossless_peer_reuse(0),
+ Messenger::Policy::lossless_peer_reuse(0));
+ for (int i = 0; i < 10; ++i) {
+ if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
+ test_msg.generate_connection();
+ }
+ gen_type rng(time(NULL));
+ for (int i = 0; i < 10000; ++i) {
+ if (!(i % 10)) {
+ lderr(g_ceph_context) << "Op " << i << ": " << dendl;
+ test_msg.print_internal_state();
+ }
+ boost::uniform_int<> true_false(0, 99);
+ int val = true_false(rng);
+ if (val > 80) {
+ test_msg.generate_connection();
+ } else if (val > 60) {
+ test_msg.drop_connection();
+ } else if (val > 10) {
+ test_msg.send_message();
+ } else {
+ usleep(rand() % 1000 + 500);
+ }
+ }
+ test_msg.wait_for_done();
+}
+
+
+TEST_P(MessengerTest, SyntheticInjectTest) {
+ uint64_t dispatch_throttle_bytes = g_ceph_context->_conf->ms_dispatch_throttle_bytes;
+ g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30");
+ g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
+ g_ceph_context->_conf.set_val("ms_dispatch_throttle_bytes", "16777216");
+ SyntheticWorkload test_msg(8, 32, GetParam(), 100,
+ Messenger::Policy::stateful_server(0),
+ Messenger::Policy::lossless_client(0));
+ for (int i = 0; i < 100; ++i) {
+ if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
+ test_msg.generate_connection();
+ }
+ gen_type rng(time(NULL));
+ for (int i = 0; i < 1000; ++i) {
+ if (!(i % 10)) {
+ lderr(g_ceph_context) << "Op " << i << ": " << dendl;
+ test_msg.print_internal_state();
+ }
+ boost::uniform_int<> true_false(0, 99);
+ int val = true_false(rng);
+ if (val > 90) {
+ test_msg.generate_connection();
+ } else if (val > 80) {
+ test_msg.drop_connection();
+ } else if (val > 10) {
+ test_msg.send_message();
+ } else {
+ usleep(rand() % 500 + 100);
+ }
+ }
+ test_msg.wait_for_done();
+ g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
+ g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
+ g_ceph_context->_conf.set_val(
+ "ms_dispatch_throttle_bytes", std::to_string(dispatch_throttle_bytes));
+}
+
+TEST_P(MessengerTest, SyntheticInjectTest2) {
+ g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30");
+ g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
+ SyntheticWorkload test_msg(8, 16, GetParam(), 100,
+ Messenger::Policy::lossless_peer_reuse(0),
+ Messenger::Policy::lossless_peer_reuse(0));
+ for (int i = 0; i < 100; ++i) {
+ if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
+ test_msg.generate_connection();
+ }
+ gen_type rng(time(NULL));
+ for (int i = 0; i < 1000; ++i) {
+ if (!(i % 10)) {
+ lderr(g_ceph_context) << "Op " << i << ": " << dendl;
+ test_msg.print_internal_state();
+ }
+ boost::uniform_int<> true_false(0, 99);
+ int val = true_false(rng);
+ if (val > 90) {
+ test_msg.generate_connection();
+ } else if (val > 80) {
+ test_msg.drop_connection();
+ } else if (val > 10) {
+ test_msg.send_message();
+ } else {
+ usleep(rand() % 500 + 100);
+ }
+ }
+ test_msg.wait_for_done();
+ g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
+ g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
+}
+
+TEST_P(MessengerTest, SyntheticInjectTest3) {
+ g_ceph_context->_conf.set_val("ms_inject_socket_failures", "600");
+ g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
+ SyntheticWorkload test_msg(8, 16, GetParam(), 100,
+ Messenger::Policy::stateless_server(0),
+ Messenger::Policy::lossy_client(0));
+ for (int i = 0; i < 100; ++i) {
+ if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
+ test_msg.generate_connection();
+ }
+ gen_type rng(time(NULL));
+ for (int i = 0; i < 1000; ++i) {
+ if (!(i % 10)) {
+ lderr(g_ceph_context) << "Op " << i << ": " << dendl;
+ test_msg.print_internal_state();
+ }
+ boost::uniform_int<> true_false(0, 99);
+ int val = true_false(rng);
+ if (val > 90) {
+ test_msg.generate_connection();
+ } else if (val > 80) {
+ test_msg.drop_connection();
+ } else if (val > 10) {
+ test_msg.send_message();
+ } else {
+ usleep(rand() % 500 + 100);
+ }
+ }
+ test_msg.wait_for_done();
+ g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
+ g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
+}
+
+
+TEST_P(MessengerTest, SyntheticInjectTest4) {
+ g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30");
+ g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
+ g_ceph_context->_conf.set_val("ms_inject_delay_probability", "1");
+ g_ceph_context->_conf.set_val("ms_inject_delay_type", "client osd");
+ g_ceph_context->_conf.set_val("ms_inject_delay_max", "5");
+ SyntheticWorkload test_msg(16, 32, GetParam(), 100,
+ Messenger::Policy::lossless_peer(0),
+ Messenger::Policy::lossless_peer(0));
+ for (int i = 0; i < 100; ++i) {
+ if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
+ test_msg.generate_connection();
+ }
+ gen_type rng(time(NULL));
+ for (int i = 0; i < 1000; ++i) {
+ if (!(i % 10)) {
+ lderr(g_ceph_context) << "Op " << i << ": " << dendl;
+ test_msg.print_internal_state();
+ }
+ boost::uniform_int<> true_false(0, 99);
+ int val = true_false(rng);
+ if (val > 95) {
+ test_msg.generate_connection();
+ } else if (val > 80) {
+ // test_msg.drop_connection();
+ } else if (val > 10) {
+ test_msg.send_message();
+ } else {
+ usleep(rand() % 500 + 100);
+ }
+ }
+ test_msg.wait_for_done();
+ g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
+ g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
+ g_ceph_context->_conf.set_val("ms_inject_delay_probability", "0");
+ g_ceph_context->_conf.set_val("ms_inject_delay_type", "");
+ g_ceph_context->_conf.set_val("ms_inject_delay_max", "0");
+}
+
+// This is test for network block, means ::send return EAGAIN
+TEST_P(MessengerTest, SyntheticInjectTest5) {
+ SyntheticWorkload test_msg(1, 8, GetParam(), 100,
+ Messenger::Policy::stateful_server(0),
+ Messenger::Policy::lossless_client(0),
+ 64, 2);
+ bool simulate_network_congestion = true;
+ for (int i = 0; i < 2; ++i)
+ test_msg.generate_connection();
+ for (int i = 0; i < 5000; ++i) {
+ if (!(i % 10)) {
+ ldout(g_ceph_context, 0) << "Op " << i << ": " << dendl;
+ test_msg.print_internal_state();
+ }
+ if (i < 1600) {
+ // means that we would stuck 1600 * 6M (9.6G) around with 2 connections
+ test_msg.send_large_message(simulate_network_congestion);
+ } else {
+ simulate_network_congestion = false;
+ test_msg.send_large_message(simulate_network_congestion);
+ }
+ }
+ test_msg.wait_for_done();
+}
+
+
+class MarkdownDispatcher : public Dispatcher {
+ ceph::mutex lock = ceph::make_mutex("MarkdownDispatcher::lock");
+ set<ConnectionRef> conns;
+ bool last_mark;
+ public:
+ std::atomic<uint64_t> count = { 0 };
+ explicit MarkdownDispatcher(bool s): Dispatcher(g_ceph_context),
+ last_mark(false) {
+ }
+ bool ms_can_fast_dispatch_any() const override { return false; }
+ bool ms_can_fast_dispatch(const Message *m) const override {
+ switch (m->get_type()) {
+ case CEPH_MSG_PING:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ void ms_handle_fast_connect(Connection *con) override {
+ lderr(g_ceph_context) << __func__ << " " << con << dendl;
+ std::lock_guard l{lock};
+ conns.insert(con);
+ }
+ void ms_handle_fast_accept(Connection *con) override {
+ std::lock_guard l{lock};
+ conns.insert(con);
+ }
+ bool ms_dispatch(Message *m) override {
+ lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << dendl;
+ std::lock_guard l{lock};
+ count++;
+ conns.insert(m->get_connection());
+ if (conns.size() < 2 && !last_mark) {
+ m->put();
+ return true;
+ }
+
+ last_mark = true;
+ usleep(rand() % 500);
+ for (set<ConnectionRef>::iterator it = conns.begin(); it != conns.end(); ++it) {
+ if ((*it) != m->get_connection().get()) {
+ (*it)->mark_down();
+ conns.erase(it);
+ break;
+ }
+ }
+ if (conns.empty())
+ last_mark = false;
+ m->put();
+ return true;
+ }
+ bool ms_handle_reset(Connection *con) override {
+ lderr(g_ceph_context) << __func__ << " " << con << dendl;
+ std::lock_guard l{lock};
+ conns.erase(con);
+ usleep(rand() % 500);
+ return true;
+ }
+ void ms_handle_remote_reset(Connection *con) override {
+ std::lock_guard l{lock};
+ conns.erase(con);
+ lderr(g_ceph_context) << __func__ << " " << con << dendl;
+ }
+ bool ms_handle_refused(Connection *con) override {
+ return false;
+ }
+ void ms_fast_dispatch(Message *m) override {
+ ceph_abort();
+ }
+ int ms_handle_fast_authentication(Connection *con) override {
+ return 1;
+ }
+};
+
+
+// Markdown with external lock
+TEST_P(MessengerTest, MarkdownTest) {
+ Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid());
+ MarkdownDispatcher cli_dispatcher(false), srv_dispatcher(true);
+ DummyAuthClientServer dummy_auth(g_ceph_context);
+ dummy_auth.auth_registry.refresh_config();
+ entity_addr_t bind_addr;
+ bind_addr.parse("v2:127.0.0.1:16800");
+ server_msgr->bind(bind_addr);
+ server_msgr->add_dispatcher_head(&srv_dispatcher);
+ server_msgr->set_auth_client(&dummy_auth);
+ server_msgr->set_auth_server(&dummy_auth);
+ server_msgr->start();
+ bind_addr.parse("v2:127.0.0.1:16801");
+ server_msgr2->bind(bind_addr);
+ server_msgr2->add_dispatcher_head(&srv_dispatcher);
+ server_msgr2->set_auth_client(&dummy_auth);
+ server_msgr2->set_auth_server(&dummy_auth);
+ server_msgr2->start();
+
+ client_msgr->add_dispatcher_head(&cli_dispatcher);
+ client_msgr->set_auth_client(&dummy_auth);
+ client_msgr->set_auth_server(&dummy_auth);
+ client_msgr->start();
+
+ int i = 1000;
+ uint64_t last = 0;
+ bool equal = false;
+ uint64_t equal_count = 0;
+ while (i--) {
+ ConnectionRef conn1 = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
+ ConnectionRef conn2 = client_msgr->connect_to(server_msgr2->get_mytype(),
+ server_msgr2->get_myaddrs());
+ MPing *m = new MPing();
+ ASSERT_EQ(conn1->send_message(m), 0);
+ m = new MPing();
+ ASSERT_EQ(conn2->send_message(m), 0);
+ CHECK_AND_WAIT_TRUE(srv_dispatcher.count > last + 1);
+ if (srv_dispatcher.count == last) {
+ lderr(g_ceph_context) << __func__ << " last is " << last << dendl;
+ equal = true;
+ equal_count++;
+ } else {
+ equal = false;
+ equal_count = 0;
+ }
+ last = srv_dispatcher.count;
+ if (equal_count)
+ usleep(1000*500);
+ ASSERT_FALSE(equal && equal_count > 3);
+ }
+ server_msgr->shutdown();
+ client_msgr->shutdown();
+ server_msgr2->shutdown();
+ server_msgr->wait();
+ client_msgr->wait();
+ server_msgr2->wait();
+ delete server_msgr2;
+}
+
+INSTANTIATE_TEST_SUITE_P(
+ Messenger,
+ MessengerTest,
+ ::testing::Values(
+ "async+posix"
+ )
+);
+
+int main(int argc, char **argv) {
+ auto args = argv_to_vec(argc, argv);
+
+ auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
+ CODE_ENVIRONMENT_UTILITY,
+ CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
+ g_ceph_context->_conf.set_val("auth_cluster_required", "none");
+ g_ceph_context->_conf.set_val("auth_service_required", "none");
+ g_ceph_context->_conf.set_val("auth_client_required", "none");
+ g_ceph_context->_conf.set_val("keyring", "/dev/null");
+ g_ceph_context->_conf.set_val("enable_experimental_unrecoverable_data_corrupting_features", "ms-type-async");
+ g_ceph_context->_conf.set_val("ms_die_on_bad_msg", "true");
+ g_ceph_context->_conf.set_val("ms_die_on_old_message", "true");
+ g_ceph_context->_conf.set_val("ms_max_backoff", "1");
+ common_init_finish(g_ceph_context);
+
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
+
+/*
+ * Local Variables:
+ * compile-command: "cd ../.. ; make -j4 ceph_test_msgr && valgrind --tool=memcheck ./ceph_test_msgr"
+ * End:
+ */
diff --git a/src/test/msgr/test_userspace_event.cc b/src/test/msgr/test_userspace_event.cc
new file mode 100644
index 000000000..067d81545
--- /dev/null
+++ b/src/test/msgr/test_userspace_event.cc
@@ -0,0 +1,174 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSky <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <map>
+#include <random>
+#include <gtest/gtest.h>
+
+#include "msg/async/dpdk/UserspaceEvent.h"
+#include "global/global_context.h"
+
+class UserspaceManagerTest : public ::testing::Test {
+ public:
+ UserspaceEventManager *manager;
+
+ UserspaceManagerTest() {}
+ virtual void SetUp() {
+ manager = new UserspaceEventManager(g_ceph_context);
+ }
+ virtual void TearDown() {
+ delete manager;
+ }
+};
+
+TEST_F(UserspaceManagerTest, BasicTest) {
+ int events[10];
+ int masks[10];
+ int fd = manager->get_eventfd();
+ ASSERT_EQ(fd, 1);
+ ASSERT_EQ(0, manager->listen(fd, 1));
+ ASSERT_EQ(0, manager->notify(fd, 1));
+ ASSERT_EQ(1, manager->poll(events, masks, 10, nullptr));
+ ASSERT_EQ(fd, events[0]);
+ ASSERT_EQ(1, masks[0]);
+ ASSERT_EQ(0, manager->notify(fd, 2));
+ ASSERT_EQ(0, manager->poll(events, masks, 10, nullptr));
+ ASSERT_EQ(0, manager->unlisten(fd, 1));
+ ASSERT_EQ(0, manager->notify(fd, 1));
+ ASSERT_EQ(0, manager->poll(events, masks, 10, nullptr));
+ manager->close(fd);
+ fd = manager->get_eventfd();
+ ASSERT_EQ(fd, 1);
+ ASSERT_EQ(0, manager->poll(events, masks, 10, nullptr));
+}
+
+TEST_F(UserspaceManagerTest, FailTest) {
+ int events[10];
+ int masks[10];
+ int fd = manager->get_eventfd();
+ ASSERT_EQ(fd, 1);
+ ASSERT_EQ(-ENOENT, manager->listen(fd+1, 1));
+ ASSERT_EQ(-ENOENT, manager->notify(fd+1, 1));
+ ASSERT_EQ(0, manager->poll(events, masks, 10, nullptr));
+ ASSERT_EQ(-ENOENT, manager->unlisten(fd+1, 1));
+ manager->close(fd);
+}
+
+TEST_F(UserspaceManagerTest, StressTest) {
+ std::vector<std::pair<int, int> > mappings;
+ int events[10];
+ int masks[10];
+ std::random_device rd;
+ std::default_random_engine rng(rd());
+ std::uniform_int_distribution<> dist(0, 100);
+
+ mappings.resize(1001);
+ mappings[0] = std::make_pair(-1, -1);
+ for (int i = 0; i < 1000; ++i) {
+ int fd = manager->get_eventfd();
+ ASSERT_TRUE(fd > 0);
+ mappings[fd] = std::make_pair(0, 0);
+ }
+ int r = 0;
+ int fd = manager->get_eventfd();
+ auto get_activate_count = [](std::vector<std::pair<int, int> > &m) {
+ std::vector<int> fds;
+ int mask = 0;
+ size_t idx = 0;
+ for (auto &&p : m) {
+ mask = p.first & p.second;
+ if (p.first != -1 && mask) {
+ p.second &= (~mask);
+ fds.push_back(idx);
+ std::cerr << " activate " << idx << " mask " << mask << std::endl;
+ }
+ ++idx;
+ }
+ return fds;
+ };
+ for (int i = 0; i < 10000; ++i) {
+ int value = dist(rng);
+ fd = dist(rng) % mappings.size();
+ auto &p = mappings[fd];
+ int mask = dist(rng) % 2 + 1;
+ if (value > 55) {
+ r = manager->notify(fd, mask);
+ if (p.first == -1) {
+ ASSERT_EQ(p.second, -1);
+ ASSERT_EQ(r, -ENOENT);
+ } else {
+ p.second |= mask;
+ ASSERT_EQ(r, 0);
+ }
+ std::cerr << " notify fd " << fd << " mask " << mask << " r " << r << std::endl;
+ } else if (value > 45) {
+ r = manager->listen(fd, mask);
+ std::cerr << " listen fd " << fd << " mask " << mask << " r " << r << std::endl;
+ if (p.first == -1) {
+ ASSERT_EQ(p.second, -1);
+ ASSERT_EQ(r, -ENOENT);
+ } else {
+ p.first |= mask;
+ ASSERT_EQ(r, 0);
+ }
+ } else if (value > 35) {
+ r = manager->unlisten(fd, mask);
+ std::cerr << " unlisten fd " << fd << " mask " << mask << " r " << r << std::endl;
+ if (p.first == -1) {
+ ASSERT_EQ(p.second, -1);
+ ASSERT_EQ(r, -ENOENT);
+ } else {
+ p.first &= ~mask;
+ ASSERT_EQ(r, 0);
+ }
+ } else if (value > 20) {
+ std::set<int> actual, expected;
+ do {
+ r = manager->poll(events, masks, 3, nullptr);
+ std::cerr << " poll " << r;
+ for (int k = 0; k < r; ++k) {
+ std::cerr << events[k] << " ";
+ actual.insert(events[k]);
+ }
+ } while (r == 3);
+ std::cerr << std::endl;
+ auto fds = get_activate_count(mappings);
+ for (auto &&d : fds)
+ expected.insert(d);
+ ASSERT_EQ(expected, actual);
+ } else if (value > 10) {
+ r = manager->get_eventfd();
+ std::cerr << " open fd " << r << std::endl;
+ ASSERT_TRUE(r > 0);
+ if ((size_t)r >= mappings.size())
+ mappings.resize(r+1);
+ mappings[r] = std::make_pair(0, 0);
+ } else {
+ manager->close(fd);
+ std::cerr << " close fd " << fd << std::endl;
+ mappings[fd] = std::make_pair(-1, -1);
+ }
+ ASSERT_TRUE(manager->check());
+ }
+}
+
+/*
+ * Local Variables:
+ * compile-command: "cd ../.. ; make ceph_test_userspace_event &&
+ * ./ceph_test_userspace_event.cc
+ *
+ * End:
+ */