Diffstat (limited to 'src/test/fio/fio_ceph_messenger.cc')
-rw-r--r--  src/test/fio/fio_ceph_messenger.cc  697
1 file changed, 697 insertions, 0 deletions
diff --git a/src/test/fio/fio_ceph_messenger.cc b/src/test/fio/fio_ceph_messenger.cc
new file mode 100644
index 000000000..4a4cf4fb5
--- /dev/null
+++ b/src/test/fio/fio_ceph_messenger.cc
@@ -0,0 +1,697 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * CEPH messenger engine
+ *
+ * FIO engine which uses the CEPH messenger as a transport. See the
+ * corresponding FIO client and server job files for details.
+ */
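+
+/*
+ * Rough job sketch (library path, addresses and sizes below are
+ * illustrative only; the shipped client/server job files are the
+ * reference):
+ *
+ *   ; server
+ *   [receiver]
+ *   ioengine=external:libfio_ceph_messenger.so
+ *   receiver=1
+ *   hostname=127.0.0.1
+ *   port=6800
+ *   rw=read
+ *   bs=4k
+ *   size=1g
+ *
+ *   ; client
+ *   [sender]
+ *   ioengine=external:libfio_ceph_messenger.so
+ *   hostname=127.0.0.1
+ *   port=6800
+ *   rw=write
+ *   bs=4k
+ *   size=1g
+ *   iodepth=32
+ */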
+
+#include "global/global_init.h"
+#include "msg/Messenger.h"
+#include "messages/MOSDOp.h"
+#include "messages/MOSDOpReply.h"
+#include "common/perf_counters.h"
+#include "auth/DummyAuth.h"
+#include "ring_buffer.h"
+
+#include <fio.h>
+#include <flist.h>
+#include <optgroup.h>
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_
+
+enum ceph_msgr_type {
+ CEPH_MSGR_TYPE_UNDEF,
+ CEPH_MSGR_TYPE_POSIX,
+ CEPH_MSGR_TYPE_DPDK,
+ CEPH_MSGR_TYPE_RDMA,
+};
+
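+/*
+ * String names indexed by enum ceph_msgr_type; "undef" means "fall back
+ * to the ms_type configuration option" (see create_messenger()).
+ */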
+const char *ceph_msgr_types[] = { "undef", "async+posix",
+ "async+dpdk", "async+rdma" };
+
+struct ceph_msgr_options {
+ struct thread_data *td__;
+ unsigned int is_receiver;
+ unsigned int is_single;
+ unsigned int port;
+ const char *hostname;
+ const char *conffile;
+ enum ceph_msgr_type ms_type;
+};
+
+class FioDispatcher;
+
+struct ceph_msgr_data {
+ ceph_msgr_data(struct ceph_msgr_options *o_, unsigned iodepth) :
+ o(o_) {
+ INIT_FLIST_HEAD(&io_inflight_list);
+ INIT_FLIST_HEAD(&io_pending_list);
+ ring_buffer_init(&io_completed_q, iodepth);
+ pthread_spin_init(&spin, PTHREAD_PROCESS_PRIVATE);
+ }
+
+ struct ceph_msgr_options *o;
+ Messenger *msgr = NULL;
+ FioDispatcher *disp = NULL;
+ pthread_spinlock_t spin;
+ struct ring_buffer io_completed_q;
+ struct flist_head io_inflight_list;
+ struct flist_head io_pending_list;
+ unsigned int io_inflight_nr = 0;
+ unsigned int io_pending_nr = 0;
+};
+
+struct ceph_msgr_io {
+ struct flist_head list;
+ struct ceph_msgr_data *data;
+ struct io_u *io_u;
+ MOSDOp *req_msg; /** Cached request, valid only for sender */
+};
+
+struct ceph_msgr_reply_io {
+ struct flist_head list;
+ MOSDOpReply *rep;
+};
+
+static void *str_to_ptr(const std::string &str)
+{
+ return (void *)strtoul(str.c_str(), NULL, 16);
+}
+
+static std::string ptr_to_str(void *ptr)
+{
+ char buf[32];
+
+ snprintf(buf, sizeof(buf), "%llx", (unsigned long long)ptr);
+ return std::string(buf);
+}
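+
+/*
+ * ptr_to_str()/str_to_ptr() round-trip a ceph_msgr_io pointer through the
+ * object name: the sender encodes the pointer into the oid on submission
+ * (see fio_ceph_msgr_io_u_init()) and decodes it back from the reply
+ * (see ceph_msgr_sender_on_reply()).
+ */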
+
+/*
+ * Used to know when to dump the perf counters on the last context put;
+ * this almost duplicates the global context refcounter, unfortunately.
+ */
+static std::atomic<int> ctx_ref(1);
+static DummyAuthClientServer *g_dummy_auth;
+
+static void create_or_get_ceph_context(struct ceph_msgr_options *o)
+{
+ if (g_ceph_context) {
+ g_ceph_context->get();
+ ctx_ref++;
+ return;
+ }
+
+ boost::intrusive_ptr<CephContext> cct;
+ vector<const char*> args;
+
+ if (o->conffile)
+ args = { "--conf", o->conffile };
+
+ cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
+ CODE_ENVIRONMENT_UTILITY,
+ CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
+ /* Will use g_ceph_context instead */
+ cct.detach();
+
+ common_init_finish(g_ceph_context);
+ g_ceph_context->_conf.apply_changes(NULL);
+ g_dummy_auth = new DummyAuthClientServer(g_ceph_context);
+ g_dummy_auth->auth_registry.refresh_config();
+}
+
+static void put_ceph_context(void)
+{
+ if (--ctx_ref == 0) {
+ ostringstream ostr;
+ Formatter* f;
+
+ f = Formatter::create("json-pretty");
+ g_ceph_context->get_perfcounters_collection()->dump_formatted(f, false);
+ ostr << ">>>>>>>>>>>>> PERFCOUNTERS BEGIN <<<<<<<<<<<<" << std::endl;
+ f->flush(ostr);
+ ostr << ">>>>>>>>>>>>> PERFCOUNTERS END <<<<<<<<<<<<" << std::endl;
+
+ delete f;
+ delete g_dummy_auth;
+ dout(0) << ostr.str() << dendl;
+ }
+
+ g_ceph_context->put();
+}
+
+static void ceph_msgr_sender_on_reply(const object_t &oid)
+{
+ struct ceph_msgr_data *data;
+ struct ceph_msgr_io *io;
+
+ /*
+ * Here we abuse the object name and use it as a raw pointer. Since this
+ * is only for benchmarking and testing we care about nothing but
+ * performance, so there is no need for a global structure to look up
+ * the reply: just send a pointer and get it back.
+ */
+
+ io = (decltype(io))str_to_ptr(oid.name);
+ data = io->data;
+ ring_buffer_enqueue(&data->io_completed_q, (void *)io);
+}
+
+
+class ReplyCompletion : public Message::CompletionHook {
+ struct ceph_msgr_io *m_io;
+
+public:
+ ReplyCompletion(MOSDOpReply *rep, struct ceph_msgr_io *io) :
+ Message::CompletionHook(rep),
+ m_io(io) {
+ }
+ void finish(int err) override {
+ struct ceph_msgr_data *data = m_io->data;
+
+ ring_buffer_enqueue(&data->io_completed_q, (void *)m_io);
+ }
+};
+
+static void ceph_msgr_receiver_on_request(struct ceph_msgr_data *data,
+ MOSDOp *req)
+{
+ MOSDOpReply *rep;
+
+ rep = new MOSDOpReply(req, 0, 0, 0, false);
+ rep->set_connection(req->get_connection());
+
+ pthread_spin_lock(&data->spin);
+ if (data->io_inflight_nr) {
+ struct ceph_msgr_io *io;
+
+ data->io_inflight_nr--;
+ io = flist_first_entry(&data->io_inflight_list,
+ struct ceph_msgr_io, list);
+ flist_del(&io->list);
+ pthread_spin_unlock(&data->spin);
+
+ rep->set_completion_hook(new ReplyCompletion(rep, io));
+ rep->get_connection()->send_message(rep);
+ } else {
+ struct ceph_msgr_reply_io *rep_io;
+
+ rep_io = (decltype(rep_io))malloc(sizeof(*rep_io));
+ rep_io->rep = rep;
+
+ data->io_pending_nr++;
+ flist_add_tail(&rep_io->list, &data->io_pending_list);
+ pthread_spin_unlock(&data->spin);
+ }
+}
+
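+/*
+ * Dispatcher used by both sides: the receiver fast-dispatches
+ * CEPH_MSG_OSD_OP requests, the sender fast-dispatches
+ * CEPH_MSG_OSD_OPREPLY replies.
+ */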
+class FioDispatcher : public Dispatcher {
+ struct ceph_msgr_data *m_data;
+
+public:
+ FioDispatcher(struct ceph_msgr_data *data):
+ Dispatcher(g_ceph_context),
+ m_data(data) {
+ }
+ bool ms_can_fast_dispatch_any() const override {
+ return true;
+ }
+ bool ms_can_fast_dispatch(const Message *m) const override {
+ switch (m->get_type()) {
+ case CEPH_MSG_OSD_OP:
+ return m_data->o->is_receiver;
+ case CEPH_MSG_OSD_OPREPLY:
+ return !m_data->o->is_receiver;
+ default:
+ return false;
+ }
+ }
+ void ms_handle_fast_connect(Connection *con) override {
+ }
+ void ms_handle_fast_accept(Connection *con) override {
+ }
+ bool ms_dispatch(Message *m) override {
+ return true;
+ }
+ void ms_fast_dispatch(Message *m) override {
+ if (m_data->o->is_receiver) {
+ MOSDOp *req;
+
+ /*
+ * Server side, handle request.
+ */
+
+ req = static_cast<MOSDOp*>(m);
+ req->finish_decode();
+
+ ceph_msgr_receiver_on_request(m_data, req);
+ } else {
+ MOSDOpReply *rep;
+
+ /*
+ * Client side, get reply, extract objid and mark
+ * IO as completed.
+ */
+
+ rep = static_cast<MOSDOpReply*>(m);
+ ceph_msgr_sender_on_reply(rep->get_oid());
+ }
+ m->put();
+ }
+ bool ms_handle_reset(Connection *con) override {
+ return true;
+ }
+ void ms_handle_remote_reset(Connection *con) override {
+ }
+ bool ms_handle_refused(Connection *con) override {
+ return false;
+ }
+ int ms_handle_authentication(Connection *con) override {
+ return 1;
+ }
+};
+
+static entity_addr_t hostname_to_addr(struct ceph_msgr_options *o)
+{
+ entity_addr_t addr;
+
+ addr.parse(o->hostname);
+ addr.set_port(o->port);
+ addr.set_nonce(0);
+
+ return addr;
+}
+
+static Messenger *create_messenger(struct ceph_msgr_options *o)
+{
+ entity_name_t ename = o->is_receiver ?
+ entity_name_t::OSD(0) : entity_name_t::CLIENT(0);
+ std::string lname = o->is_receiver ?
+ "receiver" : "sender";
+
+ std::string ms_type = o->ms_type != CEPH_MSGR_TYPE_UNDEF ?
+ ceph_msgr_types[o->ms_type] :
+ g_ceph_context->_conf.get_val<std::string>("ms_type");
+
+ /* o->td__->pid is not set at this point, so use getpid() instead */
+ auto nonce = o->is_receiver ? 0 : (getpid() + o->td__->thread_number);
+ Messenger *msgr = Messenger::create(g_ceph_context, ms_type.c_str(),
+ ename, lname, nonce);
+ if (o->is_receiver) {
+ msgr->set_default_policy(Messenger::Policy::stateless_server(0));
+ msgr->bind(hostname_to_addr(o));
+ } else {
+ msgr->set_default_policy(Messenger::Policy::lossless_client(0));
+ }
+ msgr->set_auth_client(g_dummy_auth);
+ msgr->set_auth_server(g_dummy_auth);
+ msgr->set_require_authorizer(false);
+ msgr->start();
+
+ return msgr;
+}
+
+static Messenger *single_msgr;
+static std::atomic<int> single_msgr_ref;
+static vector<FioDispatcher *> single_msgr_disps;
+
+static void init_messenger(struct ceph_msgr_data *data)
+{
+ struct ceph_msgr_options *o = data->o;
+ FioDispatcher *disp;
+ Messenger *msgr;
+
+ disp = new FioDispatcher(data);
+ if (o->is_single) {
+ /*
+ * Single messenger instance shared by the whole FIO run.
+ */
+
+ if (!single_msgr) {
+ msgr = create_messenger(o);
+ single_msgr = msgr;
+ } else {
+ msgr = single_msgr;
+ }
+ single_msgr_disps.push_back(disp);
+ single_msgr_ref++;
+ } else {
+ /*
+ * Messenger instance per FIO thread
+ */
+ msgr = create_messenger(o);
+ }
+ msgr->add_dispatcher_head(disp);
+
+ data->disp = disp;
+ data->msgr = msgr;
+}
+
+static void free_messenger(struct ceph_msgr_data *data)
+{
+ data->msgr->shutdown();
+ data->msgr->wait();
+ delete data->msgr;
+}
+
+static void put_messenger(struct ceph_msgr_data *data)
+{
+ struct ceph_msgr_options *o = data->o;
+
+ if (o->is_single) {
+ if (--single_msgr_ref == 0) {
+ free_messenger(data);
+ /*
+ * In case of a single messenger instance we have to
+ * free dispatchers after actual messenger destruction.
+ */
+ for (auto disp : single_msgr_disps)
+ delete disp;
+ single_msgr = NULL;
+ }
+ } else {
+ free_messenger(data);
+ delete data->disp;
+ }
+ data->disp = NULL;
+ data->msgr = NULL;
+}
+
+static int fio_ceph_msgr_setup(struct thread_data *td)
+{
+ struct ceph_msgr_options *o = (decltype(o))td->eo;
+ o->td__ = td;
+ ceph_msgr_data *data;
+
+ /* We have to manage global resources, so use threads rather than forked processes */
+ td->o.use_thread = 1;
+
+ create_or_get_ceph_context(o);
+
+ if (!td->io_ops_data) {
+ data = new ceph_msgr_data(o, td->o.iodepth);
+ init_messenger(data);
+ td->io_ops_data = (void *)data;
+ }
+
+ return 0;
+}
+
+static void fio_ceph_msgr_cleanup(struct thread_data *td)
+{
+ struct ceph_msgr_data *data;
+ unsigned nr;
+
+ data = (decltype(data))td->io_ops_data;
+ put_messenger(data);
+
+ nr = ring_buffer_used_size(&data->io_completed_q);
+ if (nr)
+ fprintf(stderr, "fio: io_completed_nr==%d, but should be zero\n",
+ nr);
+ if (data->io_inflight_nr)
+ fprintf(stderr, "fio: io_inflight_nr==%d, but should be zero\n",
+ data->io_inflight_nr);
+ if (data->io_pending_nr)
+ fprintf(stderr, "fio: io_pending_nr==%d, but should be zero\n",
+ data->io_pending_nr);
+ if (!flist_empty(&data->io_inflight_list))
+ fprintf(stderr, "fio: io_inflight_list is not empty\n");
+ if (!flist_empty(&data->io_pending_list))
+ fprintf(stderr, "fio: io_pending_list is not empty\n");
+
+ ring_buffer_deinit(&data->io_completed_q);
+ delete data;
+ put_ceph_context();
+}
+
+static int fio_ceph_msgr_io_u_init(struct thread_data *td, struct io_u *io_u)
+{
+ struct ceph_msgr_options *o = (decltype(o))td->eo;
+ struct ceph_msgr_io *io;
+ MOSDOp *req_msg = NULL;
+
+ io = (decltype(io))malloc(sizeof(*io));
+ io->io_u = io_u;
+ io->data = (decltype(io->data))td->io_ops_data;
+
+ if (!o->is_receiver) {
+ object_t oid(ptr_to_str(io));
+ pg_t pgid;
+ object_locator_t oloc;
+ hobject_t hobj(oid, oloc.key, CEPH_NOSNAP, pgid.ps(),
+ pgid.pool(), oloc.nspace);
+ spg_t spgid(pgid);
+ entity_inst_t dest(entity_name_t::OSD(0), hostname_to_addr(o));
+
+ Messenger *msgr = io->data->msgr;
+ ConnectionRef con = msgr->connect_to(dest.name.type(),
+ entity_addrvec_t(dest.addr));
+
+ req_msg = new MOSDOp(0, 0, hobj, spgid, 0, 0, 0);
+ req_msg->set_connection(con);
+ }
+
+ io->req_msg = req_msg;
+ io_u->engine_data = (void *)io;
+
+ return 0;
+}
+
+static void fio_ceph_msgr_io_u_free(struct thread_data *td, struct io_u *io_u)
+{
+ struct ceph_msgr_io *io;
+
+ io = (decltype(io))io_u->engine_data;
+ if (io) {
+ io_u->engine_data = NULL;
+ if (io->req_msg)
+ io->req_msg->put();
+ free(io);
+ }
+}
+
+static enum fio_q_status ceph_msgr_sender_queue(struct thread_data *td,
+ struct io_u *io_u)
+{
+ struct ceph_msgr_data *data;
+ struct ceph_msgr_io *io;
+
+ bufferlist buflist = bufferlist::static_from_mem(
+ (char *)io_u->buf, io_u->buflen);
+
+ io = (decltype(io))io_u->engine_data;
+ data = (decltype(data))td->io_ops_data;
+
+ /* No handy method to clear ops before reuse, so clear them manually */
+ io->req_msg->ops.clear();
+
+ /* Here we do not care about direction, always send as write */
+ io->req_msg->write(0, io_u->buflen, buflist);
+ /* Keep message alive */
+ io->req_msg->get();
+ io->req_msg->get_connection()->send_message(io->req_msg);
+
+ return FIO_Q_QUEUED;
+}
+
+static int fio_ceph_msgr_getevents(struct thread_data *td, unsigned int min,
+ unsigned int max, const struct timespec *ts)
+{
+ struct ceph_msgr_data *data;
+ unsigned int nr;
+
+ data = (decltype(data))td->io_ops_data;
+
+ /*
+ * See io_u.c: if min == 0 -> ts is valid and equal to zero,
+ * if min != 0 -> ts is NULL.
+ */
+ assert(!min ^ !ts);
+
+ nr = ring_buffer_used_size(&data->io_completed_q);
+ if (nr >= min)
+ /* We got something */
+ return min(nr, max);
+
+ /* We only get here if min != 0 and ts == NULL */
+ assert(min && !ts);
+
+ while ((nr = ring_buffer_used_size(&data->io_completed_q)) < min &&
+ !td->terminate) {
+ /* Poll, no disk IO, so we expect response immediately. */
+ usleep(10);
+ }
+
+ return min(nr, max);
+}
+
+static struct io_u *fio_ceph_msgr_event(struct thread_data *td, int event)
+{
+ struct ceph_msgr_data *data;
+ struct ceph_msgr_io *io;
+
+ data = (decltype(data))td->io_ops_data;
+ io = (decltype(io))ring_buffer_dequeue(&data->io_completed_q);
+
+ return io->io_u;
+}
+
+static enum fio_q_status ceph_msgr_receiver_queue(struct thread_data *td,
+ struct io_u *io_u)
+{
+ struct ceph_msgr_data *data;
+ struct ceph_msgr_io *io;
+
+ io = (decltype(io))io_u->engine_data;
+ data = io->data;
+ pthread_spin_lock(&data->spin);
+ if (data->io_pending_nr) {
+ struct ceph_msgr_reply_io *rep_io;
+ MOSDOpReply *rep;
+
+ data->io_pending_nr--;
+ rep_io = flist_first_entry(&data->io_pending_list,
+ struct ceph_msgr_reply_io,
+ list);
+ flist_del(&rep_io->list);
+ rep = rep_io->rep;
+ pthread_spin_unlock(&data->spin);
+ free(rep_io);
+
+ rep->set_completion_hook(new ReplyCompletion(rep, io));
+ rep->get_connection()->send_message(rep);
+ } else {
+ data->io_inflight_nr++;
+ flist_add_tail(&io->list, &data->io_inflight_list);
+ pthread_spin_unlock(&data->spin);
+ }
+
+ return FIO_Q_QUEUED;
+}
+
+static enum fio_q_status fio_ceph_msgr_queue(struct thread_data *td,
+ struct io_u *io_u)
+{
+ struct ceph_msgr_options *o = (decltype(o))td->eo;
+
+ if (o->is_receiver)
+ return ceph_msgr_receiver_queue(td, io_u);
+ else
+ return ceph_msgr_sender_queue(td, io_u);
+}
+
+static int fio_ceph_msgr_open_file(struct thread_data *td, struct fio_file *f)
+{
+ return 0;
+}
+
+static int fio_ceph_msgr_close_file(struct thread_data *, struct fio_file *)
+{
+ return 0;
+}
+
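+/*
+ * Helper for building the option table below: value-initializes a
+ * fio_option, sets the engine option category and lets the passed
+ * lambda fill in the rest.
+ */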
+template <class Func>
+fio_option make_option(Func&& func)
+{
+ auto o = fio_option{};
+ o.category = FIO_OPT_C_ENGINE;
+ func(std::ref(o));
+ return o;
+}
+
+static std::vector<fio_option> options {
+ make_option([] (fio_option& o) {
+ o.name = "receiver";
+ o.lname = "CEPH messenger is receiver";
+ o.type = FIO_OPT_BOOL;
+ o.off1 = offsetof(struct ceph_msgr_options, is_receiver);
+ o.help = "CEPH messenger is sender or receiver";
+ o.def = "0";
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "single_instance";
+ o.lname = "Single instance of CEPH messenger ";
+ o.type = FIO_OPT_BOOL;
+ o.off1 = offsetof(struct ceph_msgr_options, is_single);
+ o.help = "CEPH messenger is a created once for all threads";
+ o.def = "0";
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "hostname";
+ o.lname = "CEPH messenger hostname";
+ o.type = FIO_OPT_STR_STORE;
+ o.off1 = offsetof(struct ceph_msgr_options, hostname);
+ o.help = "Hostname for CEPH messenger engine";
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "port";
+ o.lname = "CEPH messenger engine port";
+ o.type = FIO_OPT_INT;
+ o.off1 = offsetof(struct ceph_msgr_options, port);
+ o.maxval = 65535;
+ o.minval = 1;
+ o.help = "Port to use for CEPH messenger";
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "ms_type";
+ o.lname = "CEPH messenger transport type: async+posix, async+dpdk, async+rdma";
+ o.type = FIO_OPT_STR;
+ o.off1 = offsetof(struct ceph_msgr_options, ms_type);
+ o.help = "Transport type for CEPH messenger, see 'ms async transport type' corresponding CEPH documentation page";
+ o.def = "undef";
+
+ o.posval[0].ival = "undef";
+ o.posval[0].oval = CEPH_MSGR_TYPE_UNDEF;
+
+ o.posval[1].ival = "async+posix";
+ o.posval[1].oval = CEPH_MSGR_TYPE_POSIX;
+ o.posval[1].help = "POSIX API";
+
+ o.posval[2].ival = "async+dpdk";
+ o.posval[2].oval = CEPH_MSGR_TYPE_DPDK;
+ o.posval[2].help = "DPDK";
+
+ o.posval[3].ival = "async+rdma";
+ o.posval[3].oval = CEPH_MSGR_TYPE_RDMA;
+ o.posval[3].help = "RDMA";
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "ceph_conf_file";
+ o.lname = "CEPH configuration file";
+ o.type = FIO_OPT_STR_STORE;
+ o.off1 = offsetof(struct ceph_msgr_options, conffile);
+ o.help = "Path to CEPH configuration file";
+ }),
+ {} /* Terminating empty option */
+};
+
+static struct ioengine_ops ioengine;
+
+extern "C" {
+
+void get_ioengine(struct ioengine_ops** ioengine_ptr)
+{
+ /*
+ * Main ioengine structure
+ */
+ ioengine.name = "ceph-msgr";
+ ioengine.version = FIO_IOOPS_VERSION;
+ ioengine.flags = FIO_DISKLESSIO | FIO_UNIDIR | FIO_PIPEIO;
+ ioengine.setup = fio_ceph_msgr_setup;
+ ioengine.queue = fio_ceph_msgr_queue;
+ ioengine.getevents = fio_ceph_msgr_getevents;
+ ioengine.event = fio_ceph_msgr_event;
+ ioengine.cleanup = fio_ceph_msgr_cleanup;
+ ioengine.open_file = fio_ceph_msgr_open_file;
+ ioengine.close_file = fio_ceph_msgr_close_file;
+ ioengine.io_u_init = fio_ceph_msgr_io_u_init;
+ ioengine.io_u_free = fio_ceph_msgr_io_u_free;
+ ioengine.option_struct_size = sizeof(struct ceph_msgr_options);
+ ioengine.options = options.data();
+
+ *ioengine_ptr = &ioengine;
+}
+} // extern "C"