summaryrefslogtreecommitdiffstats
path: root/src/msg/xio/XioMessenger.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/msg/xio/XioMessenger.cc')
-rw-r--r--src/msg/xio/XioMessenger.cc1136
1 files changed, 1136 insertions, 0 deletions
diff --git a/src/msg/xio/XioMessenger.cc b/src/msg/xio/XioMessenger.cc
new file mode 100644
index 00000000..dec7d0c7
--- /dev/null
+++ b/src/msg/xio/XioMessenger.cc
@@ -0,0 +1,1136 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Portions Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <arpa/inet.h>
+#include <boost/lexical_cast.hpp>
+#include <set>
+#include <stdlib.h>
+#include <memory>
+
+#include "XioMsg.h"
+#include "XioMessenger.h"
+#include "common/address_helper.h"
+#include "common/code_environment.h"
+#include "messages/MNop.h"
+
+#define dout_subsys ceph_subsys_xio
+#undef dout_prefix
+#define dout_prefix *_dout << "xio."
+
+Mutex mtx("XioMessenger Package Lock");
+std::atomic<bool> initialized = { false };
+
+std::atomic<unsigned> XioMessenger::nInstances = { 0 };
+
+struct xio_mempool *xio_msgr_noreg_mpool;
+
+static struct xio_session_ops xio_msgr_ops;
+
+/* Accelio API callouts */
+
+namespace xio_log
+{
+typedef pair<const char*, int> level_pair;
+static const level_pair LEVELS[] = {
+ make_pair("fatal", 0),
+ make_pair("error", 0),
+ make_pair("warn", 1),
+ make_pair("info", 1),
+ make_pair("debug", 2),
+ make_pair("trace", 20)
+};
+
+static CephContext *context;
+
+int get_level()
+{
+ int level = 0;
+ for (size_t i = 0; i < sizeof(LEVELS); i++) {
+ if (!ldlog_p1(context, dout_subsys, LEVELS[i].second))
+ break;
+ level++;
+ }
+ return level;
+}
+
+void log_dout(const char *file, unsigned line,
+ const char *function, unsigned level,
+ const char *fmt, ...)
+{
+ char buffer[2048];
+ va_list args;
+ va_start(args, fmt);
+ int n = vsnprintf(buffer, sizeof(buffer), fmt, args);
+ va_end(args);
+
+ if (n > 0) {
+ const char *short_file = strrchr(file, '/');
+ short_file = (short_file == NULL) ? file : short_file + 1;
+
+ const level_pair &lvl = LEVELS[level];
+ ldout(context, lvl.second) << '[' << lvl.first << "] "
+ << short_file << ':' << line << ' '
+ << function << " - " << buffer << dendl;
+ }
+}
+}
+
+static int on_session_event(struct xio_session *session,
+ struct xio_session_event_data *event_data,
+ void *cb_user_context)
+{
+ XioMessenger *msgr = static_cast<XioMessenger*>(cb_user_context);
+ CephContext *cct = msgr->cct;
+
+ ldout(cct,4) << "session event: " << xio_session_event_str(event_data->event)
+ << ". reason: " << xio_strerror(event_data->reason) << dendl;
+
+ return msgr->session_event(session, event_data, cb_user_context);
+}
+
+static int on_new_session(struct xio_session *session,
+ struct xio_new_session_req *req,
+ void *cb_user_context)
+{
+ XioMessenger *msgr = static_cast<XioMessenger*>(cb_user_context);
+ CephContext *cct = msgr->cct;
+
+ ldout(cct,4) << "new session " << session
+ << " user_context " << cb_user_context << dendl;
+
+ return (msgr->new_session(session, req, cb_user_context));
+}
+
+static int on_msg(struct xio_session *session,
+ struct xio_msg *req,
+ int more_in_batch,
+ void *cb_user_context)
+{
+ XioConnection* xcon __attribute__((unused)) =
+ static_cast<XioConnection*>(cb_user_context);
+ CephContext *cct = xcon->get_messenger()->cct;
+
+ ldout(cct,25) << "on_msg session " << session << " xcon " << xcon << dendl;
+
+ if (unlikely(XioPool::trace_mempool)) {
+ static uint32_t nreqs;
+ if (unlikely((++nreqs % 65536) == 0)) {
+ xp_stats.dump(__func__, nreqs);
+ }
+ }
+
+ return xcon->on_msg(session, req, more_in_batch,
+ cb_user_context);
+}
+
+static int on_ow_msg_send_complete(struct xio_session *session,
+ struct xio_msg *msg,
+ void *conn_user_context)
+{
+ XioConnection *xcon =
+ static_cast<XioConnection*>(conn_user_context);
+ CephContext *cct = xcon->get_messenger()->cct;
+
+ ldout(cct,25) << "msg delivered session: " << session
+ << " msg: " << msg << " conn_user_context "
+ << conn_user_context << dendl;
+
+ return xcon->on_ow_msg_send_complete(session, msg, conn_user_context);
+}
+
+static int on_msg_error(struct xio_session *session,
+ enum xio_status error,
+ enum xio_msg_direction dir,
+ struct xio_msg *msg,
+ void *conn_user_context)
+{
+ /* XIO promises to flush back undelivered messages */
+ XioConnection *xcon =
+ static_cast<XioConnection*>(conn_user_context);
+ CephContext *cct = xcon->get_messenger()->cct;
+
+ ldout(cct,4) << "msg error session: " << session
+ << " error: " << xio_strerror(error) << " msg: " << msg
+ << " conn_user_context " << conn_user_context << dendl;
+
+ return xcon->on_msg_error(session, error, msg, conn_user_context);
+}
+
+static int on_cancel(struct xio_session *session,
+ struct xio_msg *msg,
+ enum xio_status result,
+ void *conn_user_context)
+{
+ XioConnection* xcon __attribute__((unused)) =
+ static_cast<XioConnection*>(conn_user_context);
+ CephContext *cct = xcon->get_messenger()->cct;
+
+ ldout(cct,25) << "on cancel: session: " << session << " msg: " << msg
+ << " conn_user_context " << conn_user_context << dendl;
+
+ return 0;
+}
+
+static int on_cancel_request(struct xio_session *session,
+ struct xio_msg *msg,
+ void *conn_user_context)
+{
+ XioConnection* xcon __attribute__((unused)) =
+ static_cast<XioConnection*>(conn_user_context);
+ CephContext *cct = xcon->get_messenger()->cct;
+
+ ldout(cct,25) << "on cancel request: session: " << session << " msg: " << msg
+ << " conn_user_context " << conn_user_context << dendl;
+
+ return 0;
+}
+
+/* free functions */
+static string xio_uri_from_entity(const string &type,
+ const entity_addr_t& addr, bool want_port)
+{
+ const char *host = NULL;
+ char addr_buf[129];
+ string xio_uri;
+
+ switch(addr.get_family()) {
+ case AF_INET:
+ host = inet_ntop(AF_INET, &addr.in4_addr().sin_addr, addr_buf,
+ INET_ADDRSTRLEN);
+ break;
+ case AF_INET6:
+ host = inet_ntop(AF_INET6, &addr.in6_addr().sin6_addr, addr_buf,
+ INET6_ADDRSTRLEN);
+ break;
+ default:
+ abort();
+ break;
+ };
+
+ if (type == "rdma" || type == "tcp")
+ xio_uri = type + "://";
+ else
+ xio_uri = "rdma://";
+
+ /* The following can only succeed if the host is rdma-capable */
+ xio_uri += host;
+ if (want_port) {
+ xio_uri += ":";
+ xio_uri += boost::lexical_cast<std::string>(addr.get_port());
+ }
+
+ return xio_uri;
+} /* xio_uri_from_entity */
+
+void XioInit::package_init(CephContext *cct) {
+ if (! initialized) {
+
+ mtx.Lock();
+ if (! initialized) {
+
+ xio_init();
+
+ // claim a reference to the first context we see
+ xio_log::context = cct->get();
+
+ int xopt;
+ xopt = xio_log::get_level();
+ xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_LOG_LEVEL,
+ &xopt, sizeof(xopt));
+ xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_LOG_FN,
+ (const void*)xio_log::log_dout, sizeof(xio_log_fn));
+
+ xopt = 1;
+ xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_DISABLE_HUGETBL,
+ &xopt, sizeof(xopt));
+
+ if (g_code_env == CODE_ENVIRONMENT_DAEMON) {
+ xopt = 1;
+ xio_set_opt(NULL, XIO_OPTLEVEL_RDMA, XIO_OPTNAME_ENABLE_FORK_INIT,
+ &xopt, sizeof(xopt));
+ }
+
+ xopt = XIO_MSGR_IOVLEN;
+ xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_IN_IOVLEN,
+ &xopt, sizeof(xopt));
+ xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_OUT_IOVLEN,
+ &xopt, sizeof(xopt));
+
+ /* enable flow-control */
+ xopt = 1;
+ xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_ENABLE_FLOW_CONTROL,
+ &xopt, sizeof(xopt));
+
+ /* and set threshold for buffer callouts */
+ xopt = max(cct->_conf->xio_max_send_inline, 512);
+ xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_INLINE_XIO_DATA,
+ &xopt, sizeof(xopt));
+
+ xopt = XioMsgHdr::get_max_encoded_length();
+ ldout(cct,2) << "setting accelio max header size " << xopt << dendl;
+ xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_INLINE_XIO_HEADER,
+ &xopt, sizeof(xopt));
+
+ size_t queue_depth = cct->_conf->xio_queue_depth;
+ struct xio_mempool_config mempool_config = {
+ 6,
+ {
+ {1024, 0, queue_depth, 262144},
+ {4096, 0, queue_depth, 262144},
+ {16384, 0, queue_depth, 262144},
+ {65536, 0, 128, 65536},
+ {262144, 0, 32, 16384},
+ {1048576, 0, 8, 8192}
+ }
+ };
+ xio_set_opt(NULL,
+ XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_CONFIG_MEMPOOL,
+ &mempool_config, sizeof(mempool_config));
+
+ /* and unregisterd one */
+ #define XMSG_MEMPOOL_QUANTUM 4096
+
+ xio_msgr_noreg_mpool =
+ xio_mempool_create(-1 /* nodeid */,
+ XIO_MEMPOOL_FLAG_REGULAR_PAGES_ALLOC);
+
+ (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 64,
+ cct->_conf->xio_mp_min,
+ cct->_conf->xio_mp_max_64,
+ XMSG_MEMPOOL_QUANTUM, 0);
+ (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 256,
+ cct->_conf->xio_mp_min,
+ cct->_conf->xio_mp_max_256,
+ XMSG_MEMPOOL_QUANTUM, 0);
+ (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 1024,
+ cct->_conf->xio_mp_min,
+ cct->_conf->xio_mp_max_1k,
+ XMSG_MEMPOOL_QUANTUM, 0);
+ (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, getpagesize(),
+ cct->_conf->xio_mp_min,
+ cct->_conf->xio_mp_max_page,
+ XMSG_MEMPOOL_QUANTUM, 0);
+
+ /* initialize ops singleton */
+ xio_msgr_ops.on_session_event = on_session_event;
+ xio_msgr_ops.on_new_session = on_new_session;
+ xio_msgr_ops.on_session_established = NULL;
+ xio_msgr_ops.on_msg = on_msg;
+ xio_msgr_ops.on_ow_msg_send_complete = on_ow_msg_send_complete;
+ xio_msgr_ops.on_msg_error = on_msg_error;
+ xio_msgr_ops.on_cancel = on_cancel;
+ xio_msgr_ops.on_cancel_request = on_cancel_request;
+
+ /* mark initialized */
+ initialized = true;
+ }
+ mtx.Unlock();
+ }
+ }
+
+/* XioMessenger */
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, this)
+static ostream& _prefix(std::ostream *_dout, XioMessenger *msgr) {
+ return *_dout << "-- " << msgr->get_myaddr_legacy() << " ";
+}
+
+XioMessenger::XioMessenger(CephContext *cct, entity_name_t name,
+ string mname, uint64_t _nonce,
+ uint64_t cflags, DispatchStrategy *ds)
+ : SimplePolicyMessenger(cct, name, mname, _nonce),
+ XioInit(cct),
+ portals(this, get_nportals(cflags), get_nconns_per_portal(cflags)),
+ dispatch_strategy(ds),
+ loop_con(new XioLoopbackConnection(this)),
+ special_handling(0),
+ sh_mtx("XioMessenger session mutex"),
+ sh_cond(),
+ need_addr(true),
+ did_bind(false),
+ nonce(_nonce)
+{
+
+ if (cct->_conf->xio_trace_xcon)
+ magic |= MSG_MAGIC_TRACE_XCON;
+
+ XioPool::trace_mempool = (cct->_conf->xio_trace_mempool);
+ XioPool::trace_msgcnt = (cct->_conf->xio_trace_msgcnt);
+
+ dispatch_strategy->set_messenger(this);
+
+ /* update class instance count */
+ nInstances++;
+
+ loop_con->set_features(CEPH_FEATURES_ALL);
+
+ ldout(cct,2) << "Create msgr: " << this << " instance: "
+ << nInstances << " type: " << name.type_str()
+ << " subtype: " << mname << " nportals: " << get_nportals(cflags)
+ << " nconns_per_portal: " << get_nconns_per_portal(cflags)
+ << dendl;
+
+} /* ctor */
+
+int XioMessenger::pool_hint(uint32_t dsize) {
+ if (dsize > 1024*1024)
+ return 0;
+
+ /* if dsize is already present, returns -EEXIST */
+ return xio_mempool_add_slab(xio_msgr_noreg_mpool, dsize, 0,
+ cct->_conf->xio_mp_max_hint,
+ XMSG_MEMPOOL_QUANTUM, 0);
+}
+
+int XioMessenger::get_nconns_per_portal(uint64_t cflags)
+{
+ const int XIO_DEFAULT_NUM_CONNS_PER_PORTAL = 8;
+ int nconns = XIO_DEFAULT_NUM_CONNS_PER_PORTAL;
+
+ if (cflags & Messenger::HAS_MANY_CONNECTIONS)
+ nconns = max(cct->_conf->xio_max_conns_per_portal, XIO_DEFAULT_NUM_CONNS_PER_PORTAL);
+ else if (cflags & Messenger::HEARTBEAT)
+ nconns = max(cct->_conf->osd_heartbeat_min_peers * 4, XIO_DEFAULT_NUM_CONNS_PER_PORTAL);
+
+ return nconns;
+}
+
+int XioMessenger::get_nportals(uint64_t cflags)
+{
+ int nportals = 1;
+
+ if (cflags & Messenger::HAS_HEAVY_TRAFFIC)
+ nportals = max(cct->_conf->xio_portal_threads, 1);
+
+ return nportals;
+}
+
+void XioMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
+{
+ // be careful here: multiple threads may block here, and readers of
+ // my_inst.addr do NOT hold any lock.
+
+ // this always goes from true -> false under the protection of the
+ // mutex. if it is already false, we need not retake the mutex at
+ // all.
+ if (!need_addr)
+ return;
+
+ sh_mtx.Lock();
+ if (need_addr) {
+ entity_addr_t t = peer_addr_for_me;
+ t.set_port(my_inst.addr.get_port());
+ my_inst.addr.set_sockaddr(t.get_sockaddr());
+ ldout(cct,2) << "learned my addr " << my_inst.addr << dendl;
+ need_addr = false;
+ // init_local_connection();
+ }
+ sh_mtx.Unlock();
+
+}
+
+int XioMessenger::new_session(struct xio_session *session,
+ struct xio_new_session_req *req,
+ void *cb_user_context)
+{
+ if (shutdown_called) {
+ return xio_reject(
+ session, XIO_E_SESSION_REFUSED, NULL /* udata */, 0 /* udata len */);
+ }
+ int code = portals.accept(session, req, cb_user_context);
+ if (! code)
+ nsessions++;
+ return code;
+} /* new_session */
+
+int XioMessenger::session_event(struct xio_session *session,
+ struct xio_session_event_data *event_data,
+ void *cb_user_context)
+{
+ XioConnection *xcon;
+
+ switch (event_data->event) {
+ case XIO_SESSION_CONNECTION_ESTABLISHED_EVENT:
+ {
+ struct xio_connection *conn = event_data->conn;
+ struct xio_connection_attr xcona;
+ entity_addr_t peer_addr_for_me, paddr;
+
+ xcon = static_cast<XioConnection*>(event_data->conn_user_context);
+
+ ldout(cct,2) << "connection established " << event_data->conn
+ << " session " << session << " xcon " << xcon << dendl;
+
+ (void) xio_query_connection(conn, &xcona,
+ XIO_CONNECTION_ATTR_LOCAL_ADDR|
+ XIO_CONNECTION_ATTR_PEER_ADDR);
+ peer_addr_for_me.set_sockaddr((struct sockaddr *)&xcona.local_addr);
+ paddr.set_sockaddr((struct sockaddr *)&xcona.peer_addr);
+ //set_myaddr(peer_addr_for_me);
+ learned_addr(peer_addr_for_me);
+ ldout(cct,2) << "client: connected from " << peer_addr_for_me << " to " << paddr << dendl;
+
+ /* notify hook */
+ this->ms_deliver_handle_connect(xcon);
+ this->ms_deliver_handle_fast_connect(xcon);
+ }
+ break;
+
+ case XIO_SESSION_NEW_CONNECTION_EVENT:
+ {
+ struct xio_connection *conn = event_data->conn;
+ struct xio_connection_attr xcona;
+ entity_inst_t s_inst;
+ entity_addr_t peer_addr_for_me;
+
+ (void) xio_query_connection(conn, &xcona,
+ XIO_CONNECTION_ATTR_CTX|
+ XIO_CONNECTION_ATTR_PEER_ADDR|
+ XIO_CONNECTION_ATTR_LOCAL_ADDR);
+ /* XXX assumes RDMA */
+ s_inst.addr.set_sockaddr((struct sockaddr *)&xcona.peer_addr);
+ peer_addr_for_me.set_sockaddr((struct sockaddr *)&xcona.local_addr);
+
+ xcon = new XioConnection(this, XioConnection::PASSIVE, s_inst);
+ xcon->session = session;
+
+ struct xio_context_attr xctxa;
+ (void) xio_query_context(xcona.ctx, &xctxa, XIO_CONTEXT_ATTR_USER_CTX);
+
+ xcon->conn = conn;
+ xcon->portal = static_cast<XioPortal*>(xctxa.user_context);
+ ceph_assert(xcon->portal);
+
+ xcona.user_context = xcon;
+ (void) xio_modify_connection(conn, &xcona, XIO_CONNECTION_ATTR_USER_CTX);
+
+ xcon->connected = true;
+
+ /* sentinel ref */
+ xcon->get(); /* xcon->nref == 1 */
+ conns_sp.lock();
+ conns_list.push_back(*xcon);
+ /* XXX we can't put xcon in conns_entity_map becase we don't yet know
+ * it's peer address */
+ conns_sp.unlock();
+
+ /* XXXX pre-merge of session startup negotiation ONLY! */
+ xcon->cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
+
+ ldout(cct,2) << "New connection session " << session
+ << " xcon " << xcon << " on msgr: " << this << " portal: " << xcon->portal << dendl;
+ ldout(cct,2) << "Server: connected from " << s_inst.addr << " to " << peer_addr_for_me << dendl;
+ }
+ break;
+ case XIO_SESSION_CONNECTION_ERROR_EVENT:
+ case XIO_SESSION_CONNECTION_CLOSED_EVENT: /* orderly discon */
+ case XIO_SESSION_CONNECTION_DISCONNECTED_EVENT: /* unexpected discon */
+ case XIO_SESSION_CONNECTION_REFUSED_EVENT:
+ xcon = static_cast<XioConnection*>(event_data->conn_user_context);
+ ldout(cct,2) << xio_session_event_str(event_data->event)
+ << " xcon " << xcon << " session " << session << dendl;
+ if (likely(!!xcon)) {
+ unregister_xcon(xcon);
+ xcon->on_disconnect_event();
+ }
+ break;
+ case XIO_SESSION_CONNECTION_TEARDOWN_EVENT:
+ xcon = static_cast<XioConnection*>(event_data->conn_user_context);
+ ldout(cct,2) << xio_session_event_str(event_data->event)
+ << " xcon " << xcon << " session " << session << dendl;
+ /*
+ * There are flows where Accelio sends teardown event without going
+ * through disconnect event. so we make sure we cleaned the connection.
+ */
+ unregister_xcon(xcon);
+ xcon->on_teardown_event();
+ break;
+ case XIO_SESSION_TEARDOWN_EVENT:
+ ldout(cct,2) << xio_session_event_str(event_data->event)
+ << " session " << session << dendl;
+ if (unlikely(XioPool::trace_mempool)) {
+ xp_stats.dump("xio session dtor", reinterpret_cast<uint64_t>(session));
+ }
+ xio_session_destroy(session);
+ if (--nsessions == 0) {
+ Mutex::Locker lck(sh_mtx);
+ if (nsessions == 0)
+ sh_cond.Signal();
+ }
+ break;
+ default:
+ break;
+ };
+
+ return 0;
+}
+
+enum bl_type
+{
+ BUFFER_PAYLOAD,
+ BUFFER_MIDDLE,
+ BUFFER_DATA
+};
+
+#define MAX_XIO_BUF_SIZE 1044480
+
+static inline int
+xio_count_buffers(const buffer::list& bl, int& req_size, int& msg_off, int& req_off)
+{
+
+ const std::list<buffer::ptr>& buffers = bl.buffers();
+ list<bufferptr>::const_iterator pb;
+ size_t size, off;
+ int result;
+ int first = 1;
+
+ off = size = 0;
+ result = 0;
+ for (;;) {
+ if (off >= size) {
+ if (first) pb = buffers.begin(); else ++pb;
+ if (pb == buffers.end()) {
+ break;
+ }
+ off = 0;
+ size = pb->length();
+ first = 0;
+ }
+ size_t count = size - off;
+ if (!count) continue;
+ if (req_size + count > MAX_XIO_BUF_SIZE) {
+ count = MAX_XIO_BUF_SIZE - req_size;
+ }
+
+ ++result;
+
+ /* advance iov and perhaps request */
+
+ off += count;
+ req_size += count;
+ ++msg_off;
+ if (unlikely(msg_off >= XIO_MSGR_IOVLEN || req_size >= MAX_XIO_BUF_SIZE)) {
+ ++req_off;
+ msg_off = 0;
+ req_size = 0;
+ }
+ }
+
+ return result;
+}
+
+static inline void
+xio_place_buffers(const buffer::list& bl, XioMsg *xmsg, struct xio_msg*& req,
+ struct xio_iovec_ex*& msg_iov, int& req_size,
+ int ex_cnt, int& msg_off, int& req_off, bl_type type)
+{
+
+ const std::list<buffer::ptr>& buffers = bl.buffers();
+ list<bufferptr>::const_iterator pb;
+ struct xio_iovec_ex* iov;
+ size_t size, off;
+ const char *data = NULL;
+ int first = 1;
+
+ off = size = 0;
+ for (;;) {
+ if (off >= size) {
+ if (first) pb = buffers.begin(); else ++pb;
+ if (pb == buffers.end()) {
+ break;
+ }
+ off = 0;
+ size = pb->length();
+ data = pb->c_str(); // is c_str() efficient?
+ first = 0;
+ }
+ size_t count = size - off;
+ if (!count) continue;
+ if (req_size + count > MAX_XIO_BUF_SIZE) {
+ count = MAX_XIO_BUF_SIZE - req_size;
+ }
+
+ /* assign buffer */
+ iov = &msg_iov[msg_off];
+ iov->iov_base = (void *) (&data[off]);
+ iov->iov_len = count;
+
+ switch (type) {
+ case BUFFER_DATA:
+ //break;
+ default:
+ {
+ struct xio_reg_mem *mp = get_xio_mp(*pb);
+ iov->mr = (mp) ? mp->mr : NULL;
+ }
+ break;
+ }
+
+ /* advance iov(s) */
+
+ off += count;
+ req_size += count;
+ ++msg_off;
+
+ /* next request if necessary */
+
+ if (unlikely(msg_off >= XIO_MSGR_IOVLEN || req_size >= MAX_XIO_BUF_SIZE)) {
+ /* finish this request */
+ req->out.pdata_iov.nents = msg_off;
+ /* advance to next, and write in it if it's not the last one. */
+ if (++req_off >= ex_cnt) {
+ req = 0; /* poison. trap if we try to use it. */
+ msg_iov = NULL;
+ } else {
+ req = &xmsg->req_arr[req_off].msg;
+ msg_iov = req->out.pdata_iov.sglist;
+ }
+ msg_off = 0;
+ req_size = 0;
+ }
+ }
+}
+
+int XioMessenger::bind(const entity_addr_t& addr)
+{
+ if (addr.is_blank_ip()) {
+ lderr(cct) << "ERROR: need rdma ip for remote use! " << dendl;
+ cout << "Error: xio bind failed. public/cluster ip not specified" << std::endl;
+ return -1;
+ }
+
+ entity_addr_t shift_addr = addr;
+ string base_uri = xio_uri_from_entity(cct->_conf->xio_transport_type,
+ shift_addr, false /* want_port */);
+ ldout(cct,4) << "XioMessenger " << this << " bind: xio_uri "
+ << base_uri << ':' << shift_addr.get_port() << dendl;
+
+ uint16_t port0;
+ int r = portals.bind(&xio_msgr_ops, base_uri, shift_addr.get_port(), &port0);
+ if (r == 0) {
+ shift_addr.set_port(port0);
+ shift_addr.nonce = nonce;
+ set_myaddr(shift_addr);
+ need_addr = false;
+ did_bind = true;
+ }
+ return r;
+} /* bind */
+
+int XioMessenger::rebind(const set<int>& avoid_ports)
+{
+ ldout(cct,4) << "XioMessenger " << this << " rebind attempt" << dendl;
+ return 0;
+} /* rebind */
+
+int XioMessenger::start()
+{
+ portals.start();
+ dispatch_strategy->start();
+ if (!did_bind) {
+ my_inst.addr.nonce = nonce;
+ }
+ started = true;
+ return 0;
+}
+
+void XioMessenger::wait()
+{
+ portals.join();
+ dispatch_strategy->wait();
+} /* wait */
+
+int XioMessenger::_send_message(Message *m, const entity_inst_t& dest)
+{
+ ConnectionRef conn = get_connection(dest);
+ if (conn)
+ return _send_message(m, &(*conn));
+ else
+ return EINVAL;
+} /* send_message(Message *, const entity_inst_t&) */
+
+static inline XioMsg* pool_alloc_xio_msg(Message *m, XioConnection *xcon,
+ int ex_cnt)
+{
+ struct xio_reg_mem mp_mem;
+ int e = xpool_alloc(xio_msgr_noreg_mpool, sizeof(XioMsg), &mp_mem);
+ if (!!e)
+ return NULL;
+ XioMsg *xmsg = reinterpret_cast<XioMsg*>(mp_mem.addr);
+ ceph_assert(!!xmsg);
+ new (xmsg) XioMsg(m, xcon, mp_mem, ex_cnt, CEPH_FEATURES_ALL);
+ return xmsg;
+}
+
+XioCommand* pool_alloc_xio_command(XioConnection *xcon)
+{
+ struct xio_reg_mem mp_mem;
+ int e = xpool_alloc(xio_msgr_noreg_mpool, sizeof(XioCommand), &mp_mem);
+ if (!!e)
+ return NULL;
+ XioCommand *xcmd = reinterpret_cast<XioCommand*>(mp_mem.addr);
+ ceph_assert(!!xcmd);
+ new (xcmd) XioCommand(xcon, mp_mem);
+ return xcmd;
+}
+
+int XioMessenger::_send_message(Message *m, Connection *con)
+{
+ if (con == loop_con.get() /* intrusive_ptr get() */) {
+ m->set_connection(con);
+ m->set_src(get_myinst().name);
+ m->set_seq(loop_con->next_seq());
+ ds_dispatch(m);
+ return 0;
+ }
+
+ XioConnection *xcon = static_cast<XioConnection*>(con);
+
+ /* If con is not in READY state, we have to enforce policy */
+ if (xcon->cstate.session_state.read() != XioConnection::UP) {
+ std::lock_guard<decltype(xcon->sp) lg(xcon->sp);
+
+ if (xcon->cstate.session_state.read() != XioConnection::UP) {
+ xcon->outgoing.mqueue.push_back(*m);
+ return 0;
+ }
+ }
+
+ return _send_message_impl(m, xcon);
+} /* send_message(Message* m, Connection *con) */
+
+int XioMessenger::_send_message_impl(Message* m, XioConnection* xcon)
+{
+ int code = 0;
+
+ Mutex::Locker l(xcon->lock);
+ if (unlikely(XioPool::trace_mempool)) {
+ static uint32_t nreqs;
+ if (unlikely((++nreqs % 65536) == 0)) {
+ xp_stats.dump(__func__, nreqs);
+ }
+ }
+
+ m->set_seq(xcon->state.next_out_seq());
+ m->set_magic(magic); // trace flags and special handling
+
+ m->encode(xcon->get_features(), this->crcflags);
+
+ buffer::list &payload = m->get_payload();
+ buffer::list &middle = m->get_middle();
+ buffer::list &data = m->get_data();
+
+ int msg_off = 0;
+ int req_off = 0;
+ int req_size = 0;
+ int nbuffers =
+ xio_count_buffers(payload, req_size, msg_off, req_off) +
+ xio_count_buffers(middle, req_size, msg_off, req_off) +
+ xio_count_buffers(data, req_size, msg_off, req_off);
+
+ int ex_cnt = req_off;
+ if (msg_off == 0 && ex_cnt > 0) {
+ // no buffers for last msg
+ ldout(cct,10) << "msg_off 0, ex_cnt " << ex_cnt << " -> " << ex_cnt-1 << dendl;
+ ex_cnt--;
+ }
+
+ /* get an XioMsg frame */
+ XioMsg *xmsg = pool_alloc_xio_msg(m, xcon, ex_cnt);
+ if (! xmsg) {
+ /* could happen if Accelio has been shutdown */
+ return ENOMEM;
+ }
+
+ ldout(cct,4) << __func__ << " " << m << " new XioMsg " << xmsg
+ << " tag " << (int)xmsg->hdr.tag
+ << " req_0 " << xmsg->get_xio_msg() << " msg type " << m->get_type()
+ << " features: " << xcon->get_features()
+ << " conn " << xcon->conn << " sess " << xcon->session << dendl;
+
+ if (magic & (MSG_MAGIC_XIO)) {
+
+ /* XXXX verify */
+ switch (m->get_type()) {
+ case 43:
+ // case 15:
+ ldout(cct,4) << __func__ << " stop 43 " << m->get_type() << " " << *m << dendl;
+ buffer::list &payload = m->get_payload();
+ ldout(cct,4) << __func__ << " payload dump:" << dendl;
+ payload.hexdump(cout);
+ }
+ }
+
+ struct xio_msg *req = xmsg->get_xio_msg();
+ struct xio_iovec_ex *msg_iov = req->out.pdata_iov.sglist;
+
+ if (magic & (MSG_MAGIC_XIO)) {
+ ldout(cct,4) << "payload: " << payload.buffers().size() <<
+ " middle: " << middle.buffers().size() <<
+ " data: " << data.buffers().size() <<
+ dendl;
+ }
+
+ if (unlikely(ex_cnt > 0)) {
+ ldout(cct,4) << __func__ << " buffer cnt > XIO_MSGR_IOVLEN (" <<
+ ((XIO_MSGR_IOVLEN-1) + nbuffers) << ")" << dendl;
+ }
+
+ /* do the invariant part */
+ msg_off = 0;
+ req_off = -1; /* most often, not used */
+ req_size = 0;
+
+ xio_place_buffers(payload, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
+ req_off, BUFFER_PAYLOAD);
+
+ xio_place_buffers(middle, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
+ req_off, BUFFER_MIDDLE);
+
+ xio_place_buffers(data, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
+ req_off, BUFFER_DATA);
+ ldout(cct,10) << "ex_cnt " << ex_cnt << ", req_off " << req_off
+ << ", msg_cnt " << xmsg->get_msg_count() << dendl;
+
+ /* finalize request */
+ if (msg_off)
+ req->out.pdata_iov.nents = msg_off;
+
+ /* fixup first msg */
+ req = xmsg->get_xio_msg();
+
+ const std::list<buffer::ptr>& header = xmsg->hdr.get_bl().buffers();
+ ceph_assert(header.size() == 1); /* XXX */
+ list<bufferptr>::const_iterator pb = header.begin();
+ req->out.header.iov_base = (char*) pb->c_str();
+ req->out.header.iov_len = pb->length();
+
+ /* deliver via xio, preserve ordering */
+ if (xmsg->get_msg_count() > 1) {
+ struct xio_msg *head = xmsg->get_xio_msg();
+ struct xio_msg *tail = head;
+ for (req_off = 0; ((unsigned) req_off) < xmsg->get_msg_count()-1; ++req_off) {
+ req = &xmsg->req_arr[req_off].msg;
+assert(!req->in.pdata_iov.nents);
+assert(req->out.pdata_iov.nents || !nbuffers);
+ tail->next = req;
+ tail = req;
+ }
+ tail->next = NULL;
+ }
+ xmsg->trace = m->trace;
+ m->trace.event("xio portal enqueue for send");
+ m->trace.keyval("xio message segments", xmsg->hdr.msg_cnt);
+ xcon->portal->enqueue_for_send(xcon, xmsg);
+
+ return code;
+} /* send_message(Message *, Connection *) */
+
+int XioMessenger::shutdown()
+{
+ shutdown_called = true;
+ conns_sp.lock();
+ XioConnection::ConnList::iterator iter;
+ iter = conns_list.begin();
+ for (iter = conns_list.begin(); iter != conns_list.end(); ++iter) {
+ (void) iter->disconnect(); // XXX mark down?
+ }
+ conns_sp.unlock();
+ while(nsessions > 0) {
+ Mutex::Locker lck(sh_mtx);
+ if (nsessions > 0)
+ sh_cond.Wait(sh_mtx);
+ }
+ portals.shutdown();
+ dispatch_strategy->shutdown();
+ did_bind = false;
+ started = false;
+ return 0;
+} /* shutdown */
+
+ConnectionRef XioMessenger::get_connection(const entity_inst_t& dest)
+{
+ if (shutdown_called)
+ return NULL;
+
+ const entity_inst_t& self_inst = get_myinst();
+ if ((&dest == &self_inst) ||
+ (dest == self_inst)) {
+ return get_loopback_connection();
+ }
+
+ conns_sp.lock();
+ XioConnection::EntitySet::iterator conn_iter =
+ conns_entity_map.find(dest, XioConnection::EntityComp());
+ if (conn_iter != conns_entity_map.end()) {
+ ConnectionRef cref = &(*conn_iter);
+ conns_sp.unlock();
+ return cref;
+ }
+ else {
+ conns_sp.unlock();
+ string xio_uri = xio_uri_from_entity(cct->_conf->xio_transport_type,
+ dest.addr, true /* want_port */);
+
+ ldout(cct,4) << "XioMessenger " << this << " get_connection: xio_uri "
+ << xio_uri << dendl;
+
+ /* XXX client session creation parameters */
+ struct xio_session_params params = {};
+ params.type = XIO_SESSION_CLIENT;
+ params.ses_ops = &xio_msgr_ops;
+ params.user_context = this;
+ params.uri = xio_uri.c_str();
+
+ XioConnection *xcon = new XioConnection(this, XioConnection::ACTIVE,
+ dest);
+
+ xcon->session = xio_session_create(&params);
+ if (! xcon->session) {
+ delete xcon;
+ return NULL;
+ }
+
+ /* this should cause callbacks with user context of conn, but
+ * we can always set it explicitly */
+ struct xio_connection_params xcp = {};
+ xcp.session = xcon->session;
+ xcp.ctx = xcon->portal->ctx;
+ xcp.conn_user_context = xcon;
+
+ xcon->conn = xio_connect(&xcp);
+ if (!xcon->conn) {
+ xio_session_destroy(xcon->session);
+ delete xcon;
+ return NULL;
+ }
+
+ nsessions++;
+ xcon->connected = true;
+
+ /* sentinel ref */
+ xcon->get(); /* xcon->nref == 1 */
+ conns_sp.lock();
+ conns_list.push_back(*xcon);
+ conns_entity_map.insert(*xcon);
+ conns_sp.unlock();
+
+ /* XXXX pre-merge of session startup negotiation ONLY! */
+ xcon->cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
+
+ ldout(cct,2) << "New connection xcon: " << xcon <<
+ " up_ready on session " << xcon->session <<
+ " on msgr: " << this << " portal: " << xcon->portal << dendl;
+
+ return xcon->get(); /* nref +1 */
+ }
+} /* get_connection */
+
+ConnectionRef XioMessenger::get_loopback_connection()
+{
+ return (loop_con.get());
+} /* get_loopback_connection */
+
+void XioMessenger::unregister_xcon(XioConnection *xcon)
+{
+ std::lock_guard<decltype(conns_sp)> lckr(conns_sp);
+
+ XioConnection::EntitySet::iterator conn_iter =
+ conns_entity_map.find(xcon->peer, XioConnection::EntityComp());
+ if (conn_iter != conns_entity_map.end()) {
+ XioConnection *xcon2 = &(*conn_iter);
+ if (xcon == xcon2) {
+ conns_entity_map.erase(conn_iter);
+ }
+ }
+
+ /* check if citer on conn_list */
+ if (xcon->conns_hook.is_linked()) {
+ /* now find xcon on conns_list and erase */
+ XioConnection::ConnList::iterator citer =
+ XioConnection::ConnList::s_iterator_to(*xcon);
+ conns_list.erase(citer);
+ }
+}
+
+void XioMessenger::mark_down(const entity_addr_t& addr)
+{
+ entity_inst_t inst(entity_name_t(), addr);
+ std::lock_guard<decltype(conns_sp)> lckr(conns_sp);
+ XioConnection::EntitySet::iterator conn_iter =
+ conns_entity_map.find(inst, XioConnection::EntityComp());
+ if (conn_iter != conns_entity_map.end()) {
+ (*conn_iter)._mark_down(XioConnection::CState::OP_FLAG_NONE);
+ }
+} /* mark_down(const entity_addr_t& */
+
+void XioMessenger::mark_down(Connection* con)
+{
+ XioConnection *xcon = static_cast<XioConnection*>(con);
+ xcon->_mark_down(XioConnection::CState::OP_FLAG_NONE);
+} /* mark_down(Connection*) */
+
+void XioMessenger::mark_down_all()
+{
+ std::lock_guard<decltype(conns_sp)> lckr(conns_sp);
+ XioConnection::EntitySet::iterator conn_iter;
+ for (conn_iter = conns_entity_map.begin(); conn_iter !=
+ conns_entity_map.begin(); ++conn_iter) {
+ (*conn_iter)._mark_down(XioConnection::CState::OP_FLAG_NONE);
+ }
+} /* mark_down_all */
+
+static inline XioMarkDownHook* pool_alloc_markdown_hook(
+ XioConnection *xcon, Message *m)
+{
+ struct xio_reg_mem mp_mem;
+ int e = xio_mempool_alloc(xio_msgr_noreg_mpool,
+ sizeof(XioMarkDownHook), &mp_mem);
+ if (!!e)
+ return NULL;
+ XioMarkDownHook *hook = static_cast<XioMarkDownHook*>(mp_mem.addr);
+ new (hook) XioMarkDownHook(xcon, m, mp_mem);
+ return hook;
+}
+
+void XioMessenger::mark_down_on_empty(Connection* con)
+{
+ XioConnection *xcon = static_cast<XioConnection*>(con);
+ MNop* m = new MNop();
+ m->tag = XIO_NOP_TAG_MARKDOWN;
+ m->set_completion_hook(pool_alloc_markdown_hook(xcon, m));
+ // stall new messages
+ xcon->cstate.session_state = XioConnection::session_states::BARRIER;
+ (void) _send_message_impl(m, xcon);
+}
+
+void XioMessenger::mark_disposable(Connection *con)
+{
+ XioConnection *xcon = static_cast<XioConnection*>(con);
+ xcon->_mark_disposable(XioConnection::CState::OP_FLAG_NONE);
+}
+
+void XioMessenger::try_insert(XioConnection *xcon)
+{
+ std::lock_guard<decltype(conns_sp)> lckr(conns_sp);
+ /* already resident in conns_list */
+ conns_entity_map.insert(*xcon);
+}
+
+XioMessenger::~XioMessenger()
+{
+ delete dispatch_strategy;
+ nInstances--;
+} /* dtor */