summaryrefslogtreecommitdiffstats
path: root/src/test/messenger
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/messenger')
-rw-r--r--src/test/messenger/CMakeLists.txt43
-rw-r--r--src/test/messenger/message_helper.h135
-rw-r--r--src/test/messenger/simple_client.cc160
-rw-r--r--src/test/messenger/simple_dispatcher.cc85
-rw-r--r--src/test/messenger/simple_dispatcher.h104
-rw-r--r--src/test/messenger/simple_server.cc107
-rw-r--r--src/test/messenger/xio_client.cc189
-rw-r--r--src/test/messenger/xio_dispatcher.cc76
-rw-r--r--src/test/messenger/xio_dispatcher.h102
-rw-r--r--src/test/messenger/xio_server.cc122
10 files changed, 1123 insertions, 0 deletions
diff --git a/src/test/messenger/CMakeLists.txt b/src/test/messenger/CMakeLists.txt
new file mode 100644
index 00000000..7981fa6d
--- /dev/null
+++ b/src/test/messenger/CMakeLists.txt
@@ -0,0 +1,43 @@
+add_executable(simple_server
+ simple_server.cc
+ simple_dispatcher.cc
+ )
+target_link_libraries(simple_server
+ global ceph-common
+ ${EXTRALIBS}
+ ${CMAKE_DL_LIBS}
+ )
+
+add_executable(simple_client
+ simple_client.cc
+ simple_dispatcher.cc
+ )
+target_link_libraries(simple_client
+ global ceph-common
+ ${EXTRALIBS}
+ ${CMAKE_DL_LIBS}
+ )
+
+if(HAVE_XIO)
+ add_executable(xio_server
+ xio_server.cc
+ xio_dispatcher.cc
+ )
+ target_link_libraries(xio_server
+ global ceph-common
+ ${XIO_LIBRARY} pthread rt
+ ${EXTRALIBS}
+ ${CMAKE_DL_LIBS}
+ )
+
+ add_executable(xio_client
+ xio_client.cc
+ xio_dispatcher.cc
+ )
+ target_link_libraries(xio_client
+ global ceph-common
+ ${XIO_LIBRARY} pthread rt
+ ${EXTRALIBS}
+ ${CMAKE_DL_LIBS}
+ )
+endif(HAVE_XIO)
diff --git a/src/test/messenger/message_helper.h b/src/test/messenger/message_helper.h
new file mode 100644
index 00000000..45b11c87
--- /dev/null
+++ b/src/test/messenger/message_helper.h
@@ -0,0 +1,135 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef MESSAGE_HELPER_H
+#define MESSAGE_HELPER_H
+
+#include "msg/msg_types.h"
+#include "messages/MDataPing.h"
+#if defined(HAVE_XIO)
+#include "msg/xio/XioMessenger.h"
+#endif
+
+static inline Message* new_ping_monstyle(const char *tag, int mult)
+{
+ Message *m = new MPing();
+ Formatter *f = new JSONFormatter(true);
+
+ string str = "one giant step for ";
+
+ f->open_object_section(tag);
+ for (int ix = 0; ix < mult; ++ix) {
+ f->dump_string(tag, str);
+ }
+ f->close_section();
+
+ bufferlist bl;
+ stringstream ss;
+
+ f->flush(ss);
+ encode(ss.str(), bl);
+ m->set_payload(bl);
+
+ return m;
+}
+
+#if defined(HAVE_XIO)
+extern struct xio_mempool *xio_msgr_mpool;
+
+void xio_hook_func(struct xio_reg_mem *mp)
+{
+ xio_mempool_free(mp);
+}
+
+static inline Message* new_ping_with_data(const char *tag, uint32_t size)
+{
+ static uint32_t counter;
+
+ MDataPing *m = new MDataPing();
+ m->counter = counter++;
+ m->tag = tag;
+
+ bufferlist bl;
+ void *p;
+
+ struct xio_reg_mem *mp = m->get_mp();
+ int e = xio_mempool_alloc(xio_msgr_mpool, size, mp);
+ ceph_assert(e == 0);
+ p = mp->addr;
+ m->set_rdma_hook(xio_hook_func);
+
+ strcpy((char*) p, tag);
+ uint32_t* t = (uint32_t* ) (((char*) p) + size - 32);
+ *t = counter;
+
+ bl.append(buffer::create_static(size, (char*) p));
+ m->set_data(bl);
+
+ return static_cast<Message*>(m);
+}
+#endif
+
+static inline Message* new_simple_ping_with_data(const char *tag,
+ uint32_t size,
+ uint32_t nfrags)
+{
+ static size_t pagesize = sysconf(_SC_PAGESIZE);
+ static uint32_t counter;
+ uint32_t segsize;
+ int do_page_alignment;
+
+ MDataPing *m = new MDataPing();
+ m->counter = counter++;
+ m->tag = tag;
+
+ bufferlist bl;
+ void *p;
+
+ segsize = (size+nfrags-1)/nfrags;
+ segsize = (segsize + 7) & ~7;
+ if (segsize < 32) segsize = 32;
+
+ do_page_alignment = segsize >= 1024;
+ if (do_page_alignment)
+ segsize = (segsize + pagesize - 1) & ~(pagesize - 1);
+ m->free_data = true;
+ for (uint32_t i = 0; i < nfrags; ++i) {
+ if (do_page_alignment) {
+ if (posix_memalign(&p, pagesize, segsize))
+ p = nullptr;
+ } else {
+ p = malloc(segsize);
+ }
+ if (!p)
+ throw std::bad_alloc();
+ strcpy((char*) p, tag);
+ uint32_t* t = (uint32_t* ) (((char*) p) + segsize - 32);
+ *t = counter;
+ t[1] = i;
+
+ bl.append(buffer::create_static(segsize, (char*) p));
+ }
+ m->set_data(bl);
+
+ return static_cast<Message*>(m);
+}
+
+static inline Message* new_simple_ping_with_data(const char *tag,
+ uint32_t size)
+{
+ return new_simple_ping_with_data(tag, size, 1);
+}
+
+
+#endif /* MESSAGE_HELPER_H */
diff --git a/src/test/messenger/simple_client.cc b/src/test/messenger/simple_client.cc
new file mode 100644
index 00000000..ba7ed2b0
--- /dev/null
+++ b/src/test/messenger/simple_client.cc
@@ -0,0 +1,160 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <sys/types.h>
+
+#include <iostream>
+#include <string>
+
+using namespace std;
+
+#include "common/config.h"
+#include "msg/msg_types.h"
+#include "msg/Messenger.h"
+#include "messages/MPing.h"
+#include "common/Timer.h"
+#include "common/ceph_argparse.h"
+#include "global/global_init.h"
+#include "perfglue/heap_profiler.h"
+#include "common/address_helper.h"
+#include "message_helper.h"
+#include "simple_dispatcher.h"
+
+#define dout_subsys ceph_subsys_simple_client
+
+void usage(ostream& out)
+{
+ out << "usage: simple_client [options]\n"
+"options:\n"
+" --addr X\n"
+" --port X\n"
+" --msgs X\n"
+" --dsize X\n"
+ ;
+}
+
+
+int main(int argc, const char **argv)
+{
+ vector<const char*> args;
+ Messenger* messenger;
+ SimpleDispatcher *dispatcher;
+ std::vector<const char*>::iterator arg_iter;
+ std::string val;
+ entity_addr_t dest_addr;
+ ConnectionRef conn;
+ int r = 0;
+
+ std::string addr = "localhost";
+ std::string port = "1234";
+
+ int n_msgs = 50;
+ int n_dsize = 0;
+
+ struct timespec ts;
+ ts.tv_sec = 1;
+ ts.tv_nsec = 0;
+
+ argv_to_vec(argc, argv, args);
+
+ auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_ANY,
+ CODE_ENVIRONMENT_UTILITY,
+ CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
+
+ for (arg_iter = args.begin(); arg_iter != args.end();) {
+ if (ceph_argparse_witharg(args, arg_iter, &val, "--addr",
+ (char*) NULL)) {
+ addr = val;
+ } else if (ceph_argparse_witharg(args, arg_iter, &val, "--port",
+ (char*) NULL)) {
+ port = val;
+ } else if (ceph_argparse_witharg(args, arg_iter, &val, "--msgs",
+ (char*) NULL)) {
+ n_msgs = atoi(val.c_str());;
+ } else if (ceph_argparse_witharg(args, arg_iter, &val, "--dsize",
+ (char*) NULL)) {
+ n_dsize = atoi(val.c_str());;
+ } else {
+ ++arg_iter;
+ }
+ };
+
+ if (!args.empty()) {
+ cerr << "What is this? -- " << args[0] << std::endl;
+ usage(cerr);
+ exit(1);
+ }
+
+ cout << "simple_client starting " <<
+ "dest addr " << addr << " " <<
+ "dest port " << port << " " <<
+ "initial msgs (pipe depth) " << n_msgs << " " <<
+ "data buffer size " << n_dsize << std::endl;
+
+ messenger = Messenger::create(g_ceph_context, g_conf().get_val<std::string>("ms_type"),
+ entity_name_t::MON(-1),
+ "client",
+ getpid(), 0);
+
+ // enable timing prints
+ messenger->set_magic(MSG_MAGIC_TRACE_CTR);
+ messenger->set_default_policy(Messenger::Policy::lossy_client(0));
+
+ string dest_str = "tcp://";
+ dest_str += addr;
+ dest_str += ":";
+ dest_str += port;
+ entity_addr_from_url(&dest_addr, dest_str.c_str());
+ entity_addrvec_t dest_addrs(dest_addr);
+
+ dispatcher = new SimpleDispatcher(messenger);
+ messenger->add_dispatcher_head(dispatcher);
+
+ dispatcher->set_active(); // this side is the pinger
+
+ r = messenger->start();
+ if (r < 0)
+ goto out;
+
+ conn = messenger->connect_to_mon(dest_addrs);
+
+ // do stuff
+ time_t t1, t2;
+
+ t1 = time(NULL);
+
+ int msg_ix;
+ Message *m;
+ for (msg_ix = 0; msg_ix < n_msgs; ++msg_ix) {
+ /* add a data payload if asked */
+ if (! n_dsize) {
+ m = new MPing();
+ } else {
+ m = new_simple_ping_with_data("simple_client", n_dsize);
+ }
+ conn->send_message(m);
+ }
+
+ // do stuff
+ while (conn->is_connected()) {
+ nanosleep(&ts, NULL);
+ }
+
+ t2 = time(NULL);
+ cout << "Processed " << dispatcher->get_dcount() + n_msgs
+ << " round-trip messages in " << t2-t1 << "s"
+ << std::endl;
+out:
+ return r;
+}
diff --git a/src/test/messenger/simple_dispatcher.cc b/src/test/messenger/simple_dispatcher.cc
new file mode 100644
index 00000000..b13958d3
--- /dev/null
+++ b/src/test/messenger/simple_dispatcher.cc
@@ -0,0 +1,85 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "include/compat.h"
+
+#include "simple_dispatcher.h"
+#include "messages/MPing.h"
+#include "messages/MDataPing.h"
+
+SimpleDispatcher::SimpleDispatcher(Messenger *msgr) :
+ Dispatcher(msgr->cct),
+ active(false),
+ messenger(msgr),
+ dcount(0)
+{
+ // nothing
+}
+
+SimpleDispatcher::~SimpleDispatcher() {
+ // nothing
+}
+
+bool SimpleDispatcher::ms_dispatch(Message *m)
+{
+ uint64_t dc = 0;
+
+ dc = dcount++;
+
+ ConnectionRef con = m->get_connection();
+ Messenger* msgr = con->get_messenger();
+
+ switch (m->get_type()) {
+ case CEPH_MSG_PING:
+ break;
+ case MSG_DATA_PING:
+ {
+ MDataPing* mdp __attribute__((unused)) = static_cast<MDataPing*>(m);
+ //cout << "MDataPing " << mdp->tag << " " << mdp->counter << std::endl;
+ //mdp->get_data().hexdump(cout);
+ ConnectionRef con = m->get_connection();
+ con->send_message(m);
+ }
+ break;
+ default:
+ ceph_abort();
+ }
+
+ if (unlikely(msgr->get_magic() & MSG_MAGIC_TRACE_CTR)) {
+ if (unlikely(dc % 65536) == 0) {
+ struct timespec ts;
+ clock_gettime(CLOCK_REALTIME_COARSE, &ts);
+ std::cout << "ping " << dc << " nanos: " <<
+ ts.tv_nsec + (ts.tv_sec * 1000000000) << std::endl;
+ }
+ } /* trace ctr */
+
+
+ con->send_message(m);
+
+ //m->put();
+
+ return true;
+}
+
+bool SimpleDispatcher::ms_handle_reset(Connection *con)
+{
+ return true;
+}
+
+void SimpleDispatcher::ms_handle_remote_reset(Connection *con)
+{
+ // nothing
+}
+
diff --git a/src/test/messenger/simple_dispatcher.h b/src/test/messenger/simple_dispatcher.h
new file mode 100644
index 00000000..a2d3bd46
--- /dev/null
+++ b/src/test/messenger/simple_dispatcher.h
@@ -0,0 +1,104 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef SIMPLEDISPATCHER_H_
+#define SIMPLEDISPATCHER_H_
+
+#include "msg/Dispatcher.h"
+#include "msg/Messenger.h"
+
+class SimpleDispatcher: public Dispatcher {
+private:
+ bool active;
+ Messenger *messenger;
+ uint64_t dcount;
+public:
+ explicit SimpleDispatcher(Messenger *msgr);
+ ~SimpleDispatcher() override;
+
+ uint64_t get_dcount() { return dcount; }
+
+ void set_active() {
+ active = true;
+ };
+
+ // how i receive messages
+ bool ms_dispatch(Message *m) override;
+
+ /**
+ * This function will be called whenever a new Connection is made to the
+ * Messenger.
+ *
+ * @param con The new Connection which has been established. You are not
+ * granted a reference to it -- take one if you need one!
+ */
+ void ms_handle_connect(Connection *con) override { };
+
+ /**
+ * Callback indicating we have accepted an incoming connection.
+ *
+ * @param con The (new or existing) Connection associated with the session
+ */
+ void ms_handle_accept(Connection *con) override { };
+
+ /*
+ * this indicates that the ordered+reliable delivery semantics have
+ * been violated. Messages may have been lost due to a fault
+ * in the network connection.
+ * Only called on lossy Connections or those you've
+ * designated mark_down_on_empty().
+ *
+ * @param con The Connection which broke. You are not granted
+ * a reference to it.
+ */
+ bool ms_handle_reset(Connection *con) override;
+
+ /**
+ * This indicates that the ordered+reliable delivery semantics
+ * have been violated because the remote somehow reset.
+ * It implies that incoming messages were dropped, and
+ * probably some of our previous outgoing messages were too.
+ *
+ * @param con The Connection which broke. You are not granted
+ * a reference to it.
+ */
+ void ms_handle_remote_reset(Connection *con) override;
+
+ bool ms_handle_refused(Connection *con) override { return false; }
+
+ /**
+ * @defgroup Authentication
+ * @{
+ */
+ /**
+ * Retrieve the AuthAuthorizer for the given peer type. It might not
+ * provide one if it knows there is no AuthAuthorizer for that type.
+ *
+ * @param dest_type The peer type we want the authorizer for.
+ * @param a Double pointer to an AuthAuthorizer. The Dispatcher will fill
+ * in *a with the correct AuthAuthorizer, if it can. Make sure that you have
+ * set *a to NULL before calling in.
+ *
+ * @return True if this function call properly filled in *a, false otherwise.
+ */
+ bool ms_get_authorizer(int dest_type, AuthAuthorizer **a) override {
+ return false;
+ };
+
+ int ms_handle_authentication(Connection *con) override {
+ return 1;
+ }
+};
+
+#endif /* SIMPLEDISPATCHER_H_ */
diff --git a/src/test/messenger/simple_server.cc b/src/test/messenger/simple_server.cc
new file mode 100644
index 00000000..8b85f3af
--- /dev/null
+++ b/src/test/messenger/simple_server.cc
@@ -0,0 +1,107 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <sys/types.h>
+
+#include <iostream>
+#include <string>
+
+using namespace std;
+
+#include "common/config.h"
+#include "msg/Messenger.h"
+#include "common/Timer.h"
+#include "common/ceph_argparse.h"
+#include "global/global_init.h"
+#include "global/signal_handler.h"
+#include "perfglue/heap_profiler.h"
+#include "common/address_helper.h"
+#include "simple_dispatcher.h"
+
+#define dout_subsys ceph_subsys_simple_server
+
+
+int main(int argc, const char **argv)
+{
+ vector<const char*> args;
+ Messenger *messenger;
+ Dispatcher *dispatcher;
+ std::vector<const char*>::iterator arg_iter;
+ std::string val;
+ entity_addr_t bind_addr;
+ int r = 0;
+
+ using std::endl;
+
+ std::string addr = "localhost";
+ std::string port = "1234";
+
+ cout << "Simple Server starting..." << endl;
+
+ argv_to_vec(argc, argv, args);
+
+ auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_ANY,
+ CODE_ENVIRONMENT_DAEMON,
+ CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
+
+ for (arg_iter = args.begin(); arg_iter != args.end();) {
+ if (ceph_argparse_witharg(args, arg_iter, &val, "--addr",
+ (char*) NULL)) {
+ addr = val;
+ } else if (ceph_argparse_witharg(args, arg_iter, &val, "--port",
+ (char*) NULL)) {
+ port = val;
+ } else {
+ ++arg_iter;
+ }
+ };
+
+ string dest_str = "tcp://";
+ dest_str += addr;
+ dest_str += ":";
+ dest_str += port;
+ entity_addr_from_url(&bind_addr, dest_str.c_str());
+
+ messenger = Messenger::create(g_ceph_context, g_conf().get_val<std::string>("ms_type"),
+ entity_name_t::MON(-1),
+ "simple_server",
+ 0 /* nonce */,
+ 0 /* flags */);
+ // enable timing prints
+ messenger->set_magic(MSG_MAGIC_TRACE_CTR);
+ messenger->set_default_policy(
+ Messenger::Policy::stateless_server(0));
+
+ r = messenger->bind(bind_addr);
+ if (r < 0)
+ goto out;
+
+ // Set up crypto, daemonize, etc.
+ //global_init_daemonize(g_ceph_context, 0);
+ common_init_finish(g_ceph_context);
+
+ dispatcher = new SimpleDispatcher(messenger);
+
+ messenger->add_dispatcher_head(dispatcher); // should reach ready()
+ messenger->start();
+ messenger->wait(); // can't be called until ready()
+
+ // done
+ delete messenger;
+
+out:
+ cout << "Simple Server exit" << endl;
+ return r;
+}
+
diff --git a/src/test/messenger/xio_client.cc b/src/test/messenger/xio_client.cc
new file mode 100644
index 00000000..c916d491
--- /dev/null
+++ b/src/test/messenger/xio_client.cc
@@ -0,0 +1,189 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <sys/types.h>
+
+#include <iostream>
+#include <string>
+
+using namespace std;
+
+#include "common/config.h"
+#include "msg/msg_types.h"
+#include "msg/xio/XioMessenger.h"
+#include "msg/xio/FastStrategy.h"
+#include "msg/xio/QueueStrategy.h"
+#include "msg/xio/XioMsg.h"
+#include "messages/MPing.h"
+#include "common/Timer.h"
+#include "common/ceph_argparse.h"
+#include "global/global_init.h"
+#include "perfglue/heap_profiler.h"
+#include "common/address_helper.h"
+#include "message_helper.h"
+#include "xio_dispatcher.h"
+#include "msg/xio/XioConnection.h"
+
+#define dout_subsys ceph_subsys_xio_client
+
+void usage(ostream& out)
+{
+ out << "usage: xio_client [options]\n"
+"options:\n"
+" --addr X\n"
+" --port X\n"
+" --msgs X\n"
+" --dsize X\n"
+" --nfrags X\n"
+" --dfast\n"
+ ;
+}
+
+int main(int argc, const char **argv)
+{
+ vector<const char*> args;
+ Messenger* messenger;
+ XioDispatcher *dispatcher;
+ std::vector<const char*>::iterator arg_iter;
+ std::string val;
+ entity_addr_t dest_addr;
+ ConnectionRef conn;
+ int r = 0;
+
+ std::string addr = "localhost";
+ std::string port = "1234";
+ int n_msgs = 50;
+ int n_dsize = 0;
+ int n_nfrags = 1;
+ bool dfast = false;
+
+ struct timespec ts;
+ ts.tv_sec = 5;
+ ts.tv_nsec = 0;
+
+ argv_to_vec(argc, argv, args);
+
+ auto cct = global_init(NULL, args,
+ CEPH_ENTITY_TYPE_ANY,
+ CODE_ENVIRONMENT_UTILITY,
+ CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
+
+ for (arg_iter = args.begin(); arg_iter != args.end();) {
+ if (ceph_argparse_witharg(args, arg_iter, &val, "--addr",
+ (char*) NULL)) {
+ addr = val;
+ } else if (ceph_argparse_witharg(args, arg_iter, &val, "--port",
+ (char*) NULL)) {
+ port = val;
+ } else if (ceph_argparse_witharg(args, arg_iter, &val, "--msgs",
+ (char*) NULL)) {
+ n_msgs = atoi(val.c_str());
+ } else if (ceph_argparse_witharg(args, arg_iter, &val, "--dsize",
+ (char*) NULL)) {
+ n_dsize = atoi(val.c_str());
+ } else if (ceph_argparse_witharg(args, arg_iter, &val, "--nfrags",
+ (char*) NULL)) {
+ n_nfrags = atoi(val.c_str());
+ } else if (ceph_argparse_flag(args, arg_iter, "--dfast",
+ (char*) NULL)) {
+ dfast = true;
+ } else {
+ ++arg_iter;
+ }
+ };
+
+ if (!args.empty()) {
+ cerr << "What is this? -- " << args[0] << std::endl;
+ usage(cerr);
+ exit(1);
+ }
+
+ DispatchStrategy* dstrategy;
+ if (dfast)
+ dstrategy = new FastStrategy();
+ else
+ dstrategy = new QueueStrategy(2);
+
+ messenger = new XioMessenger(g_ceph_context,
+ entity_name_t::MON(-1),
+ "xio_client",
+ 0 /* nonce */,
+ 0 /* cflags */,
+ dstrategy);
+
+ // enable timing prints
+ static_cast<XioMessenger*>(messenger)->set_magic(
+ MSG_MAGIC_REDUPE /* resubmit messages on delivery (REQUIRED) */ |
+ MSG_MAGIC_TRACE_CTR /* timing prints */);
+
+ // ensure we have a pool of sizeof(payload data)
+ if (n_dsize)
+ (void) static_cast<XioMessenger*>(messenger)->pool_hint(n_dsize);
+
+ messenger->set_default_policy(Messenger::Policy::lossy_client(0));
+
+ string dest_str = "tcp://";
+ dest_str += addr;
+ dest_str += ":";
+ dest_str += port;
+ entity_addr_from_url(&dest_addr, dest_str.c_str());
+ entity_inst_t dest_server(entity_name_t::MON(-1), dest_addr);
+
+ dispatcher = new XioDispatcher(messenger);
+ messenger->add_dispatcher_head(dispatcher);
+
+ dispatcher->set_active(); // this side is the pinger
+
+ r = messenger->start();
+ if (r < 0)
+ goto out;
+
+ conn = messenger->get_connection(dest_server);
+
+ // do stuff
+ time_t t1, t2;
+ t1 = time(NULL);
+
+ int msg_ix;
+ for (msg_ix = 0; msg_ix < n_msgs; ++msg_ix) {
+ /* add a data payload if asked */
+ if (! n_dsize) {
+ conn->send_message(new MPing());
+ } else {
+ conn->send_message(new_simple_ping_with_data("xio_client", n_dsize, n_nfrags));
+ }
+ }
+
+ // do stuff
+ while (conn->is_connected()) {
+ nanosleep(&ts, NULL);
+ }
+
+ t2 = time(NULL);
+ cout << "Processed "
+ << static_cast<XioConnection*>(conn->get())->get_scount()
+ << " one-way messages in " << t2-t1 << "s"
+ << std::endl;
+
+ conn->put();
+
+ // wait a bit for cleanup to finalize
+ ts.tv_sec = 5;
+ nanosleep(&ts, NULL);
+
+ messenger->shutdown();
+
+out:
+ return r;
+}
diff --git a/src/test/messenger/xio_dispatcher.cc b/src/test/messenger/xio_dispatcher.cc
new file mode 100644
index 00000000..ccd26daf
--- /dev/null
+++ b/src/test/messenger/xio_dispatcher.cc
@@ -0,0 +1,76 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "xio_dispatcher.h"
+#include "messages/MPing.h"
+#include "messages/MDataPing.h"
+
+XioDispatcher::XioDispatcher(Messenger *msgr) :
+ Dispatcher(msgr->cct),
+ active(false),
+ messenger(msgr),
+ dcount(0)
+{
+ // nothing
+}
+
+XioDispatcher::~XioDispatcher() {
+ // nothing
+}
+
+bool XioDispatcher::ms_dispatch(Message *m)
+{
+ ConnectionRef conn;
+ uint64_t dc = 0;
+
+ dc = dcount++;
+
+ switch (m->get_type()) {
+ case CEPH_MSG_PING:
+ break;
+ case MSG_DATA_PING:
+ {
+ MDataPing* mdp __attribute__((unused)) = static_cast<MDataPing*>(m);
+ //cout << "MDataPing " << mdp->tag << " " << mdp->counter << std::endl;
+ //mdp->get_data().hexdump(cout);
+ }
+ break;
+ default:
+ ceph_abort();
+ }
+
+ if (unlikely(m->get_magic() & MSG_MAGIC_TRACE_CTR)) {
+ if (unlikely(dc % 65536) == 0) {
+ struct timespec ts;
+ clock_gettime(CLOCK_REALTIME_COARSE, &ts);
+ std::cout << "ping " << dc << " nanos: " <<
+ ts.tv_nsec + (ts.tv_sec * 1000000000) << std::endl;
+ }
+ } /* trace ctr */
+
+ m->put();
+
+ return true;
+}
+
+bool XioDispatcher::ms_handle_reset(Connection *con)
+{
+ return true;
+}
+
+void XioDispatcher::ms_handle_remote_reset(Connection *con)
+{
+ // nothing
+}
+
diff --git a/src/test/messenger/xio_dispatcher.h b/src/test/messenger/xio_dispatcher.h
new file mode 100644
index 00000000..51916861
--- /dev/null
+++ b/src/test/messenger/xio_dispatcher.h
@@ -0,0 +1,102 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef XIODISPATCHER_H_
+#define XIODISPATCHER_H_
+
+#include "msg/Dispatcher.h"
+#include "msg/Messenger.h"
+
+class XioDispatcher: public Dispatcher {
+private:
+ bool active;
+ Messenger *messenger;
+ uint64_t dcount;
+public:
+ explicit XioDispatcher(Messenger *msgr);
+ virtual ~XioDispatcher();
+
+ uint64_t get_dcount() { return dcount; }
+
+ void set_active() {
+ active = true;
+ };
+
+ // how i receive messages
+ virtual bool ms_dispatch(Message *m);
+
+ /**
+ * This function will be called whenever a new Connection is made to the
+ * Messenger.
+ *
+ * @param con The new Connection which has been established. You are not
+ * granted a reference to it -- take one if you need one!
+ */
+ virtual void ms_handle_connect(Connection *con) { };
+
+ /**
+ * Callback indicating we have accepted an incoming connection.
+ *
+ * @param con The (new or existing) Connection associated with the session
+ */
+ virtual void ms_handle_accept(Connection *con) { };
+
+ /*
+ * this indicates that the ordered+reliable delivery semantics have
+ * been violated. Messages may have been lost due to a fault
+ * in the network connection.
+ * Only called on lossy Connections or those you've
+ * designated mark_down_on_empty().
+ *
+ * @param con The Connection which broke. You are not granted
+ * a reference to it.
+ */
+ virtual bool ms_handle_reset(Connection *con);
+
+ /**
+ * This indicates that the ordered+reliable delivery semantics
+ * have been violated because the remote somehow reset.
+ * It implies that incoming messages were dropped, and
+ * probably some of our previous outgoing messages were too.
+ *
+ * @param con The Connection which broke. You are not granted
+ * a reference to it.
+ */
+ virtual void ms_handle_remote_reset(Connection *con);
+
+ virtual bool ms_handle_refused(Connection *con) { return false; }
+
+ /**
+ * @defgroup test_xio_dispatcher_h_auth Authentication
+ * @{
+ */
+ /**
+ * Retrieve the AuthAuthorizer for the given peer type. It might not
+ * provide one if it knows there is no AuthAuthorizer for that type.
+ *
+ * @param dest_type The peer type we want the authorizer for.
+ * @param a Double pointer to an AuthAuthorizer. The Dispatcher will fill
+ * in *a with the correct AuthAuthorizer, if it can. Make sure that you have
+ * set *a to NULL before calling in.
+ *
+ * @return True if this function call properly filled in *a, false otherwise.
+ */
+ virtual bool ms_get_authorizer(int dest_type, AuthAuthorizer **a) {
+ return false;
+ };
+
+
+};
+
+#endif /* XIODISPATCHER_H_ */
diff --git a/src/test/messenger/xio_server.cc b/src/test/messenger/xio_server.cc
new file mode 100644
index 00000000..0e274ebc
--- /dev/null
+++ b/src/test/messenger/xio_server.cc
@@ -0,0 +1,122 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <sys/types.h>
+
+#include <iostream>
+#include <string>
+
+using namespace std;
+
+#include "common/config.h"
+#include "msg/xio/XioMessenger.h"
+#include "msg/xio/FastStrategy.h"
+#include "msg/xio/QueueStrategy.h"
+#include "common/Timer.h"
+#include "common/ceph_argparse.h"
+#include "global/global_init.h"
+#include "global/signal_handler.h"
+#include "perfglue/heap_profiler.h"
+#include "common/address_helper.h"
+#include "xio_dispatcher.h"
+
+#define dout_subsys ceph_subsys_simple_server
+
+
+int main(int argc, const char **argv)
+{
+ vector<const char*> args;
+ Messenger *messenger;
+ Dispatcher *dispatcher;
+ std::vector<const char*>::iterator arg_iter;
+ std::string val;
+ entity_addr_t bind_addr;
+ int r = 0;
+
+ using std::endl;
+
+ std::string addr = "localhost";
+ std::string port = "1234";
+ bool dfast = false;
+
+ cout << "Xio Server starting..." << endl;
+
+ argv_to_vec(argc, argv, args);
+
+ global_init(NULL, args, CEPH_ENTITY_TYPE_ANY, CODE_ENVIRONMENT_DAEMON,
+ CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
+
+ for (arg_iter = args.begin(); arg_iter != args.end();) {
+ if (ceph_argparse_witharg(args, arg_iter, &val, "--addr",
+ (char*) NULL)) {
+ addr = val;
+ } else if (ceph_argparse_witharg(args, arg_iter, &val, "--port",
+ (char*) NULL)) {
+ port = val;
+ } else if (ceph_argparse_flag(args, arg_iter, "--dfast",
+ (char*) NULL)) {
+ dfast = true;
+ } else {
+ ++arg_iter;
+ }
+ };
+
+ string dest_str = "tcp://";
+ dest_str += addr;
+ dest_str += ":";
+ dest_str += port;
+ entity_addr_from_url(&bind_addr, dest_str.c_str());
+
+ DispatchStrategy* dstrategy;
+ if (dfast)
+ dstrategy = new FastStrategy();
+ else
+ dstrategy = new QueueStrategy(2);
+
+ messenger = new XioMessenger(g_ceph_context,
+ entity_name_t::MON(-1),
+ "xio_server",
+ 0 /* nonce */,
+ 0 /* cflags */,
+ dstrategy);
+
+ static_cast<XioMessenger*>(messenger)->set_magic(
+ MSG_MAGIC_REDUPE /* resubmit messages on delivery (REQUIRED) */ |
+ MSG_MAGIC_TRACE_CTR /* timing prints */);
+
+ messenger->set_default_policy(
+ Messenger::Policy::stateless_server(0));
+
+ r = messenger->bind(bind_addr);
+ if (r < 0)
+ goto out;
+
+ // Set up crypto, daemonize, etc.
+ //global_init_daemonize(g_ceph_context, 0);
+ common_init_finish(g_ceph_context);
+
+ dispatcher = new XioDispatcher(messenger);
+
+ messenger->add_dispatcher_head(dispatcher); // should reach ready()
+ messenger->start();
+ messenger->wait(); // can't be called until ready()
+
+ // done
+ delete messenger;
+
+out:
+ cout << "Simple Server exit" << endl;
+ return r;
+}
+