summaryrefslogtreecommitdiffstats
path: root/src/msg/xio
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r-- src/msg/xio/XioConnection.cc 858
-rw-r--r-- src/msg/xio/XioConnection.h 380
-rw-r--r-- src/msg/xio/XioInSeq.h 84
-rw-r--r-- src/msg/xio/XioMessenger.cc 1136
-rw-r--r-- src/msg/xio/XioMessenger.h 176
-rw-r--r-- src/msg/xio/XioMsg.cc 51
-rw-r--r-- src/msg/xio/XioMsg.h 446
-rw-r--r-- src/msg/xio/XioPool.cc 41
-rw-r--r-- src/msg/xio/XioPool.h 218
-rw-r--r-- src/msg/xio/XioPortal.cc 98
-rw-r--r-- src/msg/xio/XioPortal.h 458
-rw-r--r-- src/msg/xio/XioSubmit.h 58
12 files changed, 4004 insertions, 0 deletions
diff --git a/src/msg/xio/XioConnection.cc b/src/msg/xio/XioConnection.cc
new file mode 100644
index 00000000..4bfab39b
--- /dev/null
+++ b/src/msg/xio/XioConnection.cc
@@ -0,0 +1,858 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Portions Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "XioMsg.h"
+#include "XioConnection.h"
+#include "XioMessenger.h"
+#include "messages/MDataPing.h"
+#include "msg/msg_types.h"
+#include "auth/none/AuthNoneProtocol.h" // XXX
+
+#include "include/ceph_assert.h"
+#include "common/dout.h"
+
+extern struct xio_mempool *xio_msgr_mpool;
+extern struct xio_mempool *xio_msgr_noreg_mpool;
+
+#define dout_subsys ceph_subsys_xio
+
+/*
+ * Debug helper: dump an Accelio message (when one is supplied) followed by
+ * the Ceph header and footer fields reachable through hdr, all at log
+ * level 4.
+ *
+ * cct  context used for logging
+ * tag  caller-chosen label placed at the start of every emitted line
+ * hdr  header wrapper; hdr.hdr and hdr.ftr are dereferenced when logging
+ * msg  optional Accelio message, skipped when NULL
+ */
+void print_xio_msg_hdr(CephContext *cct, const char *tag,
+                       const XioMsgHdr &hdr, const struct xio_msg *msg)
+{
+  /* the raw Accelio part is optional */
+  if (msg != NULL) {
+    ldout(cct,4) << tag
+      << " xio msg:"
+      << " sn: " << msg->sn
+      << " timestamp: " << msg->timestamp
+      << dendl;
+  }
+
+  /* ceph header, followed by the xio-specific message count */
+  ldout(cct,4) << tag
+    << " ceph header: "
+    << " front_len: " << hdr.hdr->front_len
+    << " seq: " << hdr.hdr->seq
+    << " tid: " << hdr.hdr->tid
+    << " type: " << hdr.hdr->type
+    << " prio: " << hdr.hdr->priority
+    << " name type: " << (int) hdr.hdr->src.type
+    << " name num: " << (int) hdr.hdr->src.num
+    << " version: " << hdr.hdr->version
+    << " compat_version: " << hdr.hdr->compat_version
+    << " front_len: " << hdr.hdr->front_len
+    << " middle_len: " << hdr.hdr->middle_len
+    << " data_len: " << hdr.hdr->data_len
+    << " xio header: "
+    << " msg_cnt: " << hdr.msg_cnt
+    << dendl;
+
+  /* ceph footer */
+  ldout(cct,4) << tag
+    << " ceph footer: "
+    << " front_crc: " << hdr.ftr->front_crc
+    << " middle_crc: " << hdr.ftr->middle_crc
+    << " data_crc: " << hdr.ftr->data_crc
+    << " sig: " << hdr.ftr->sig
+    << " flags: " << (uint32_t) hdr.ftr->flags
+    << dendl;
+}
+
+/*
+ * Log the version fields of m's header at level 4 when the message magic
+ * matches the trace mask below.
+ *
+ * NOTE(review): the mask is the bitwise AND of MSG_MAGIC_XIO and
+ * MSG_MAGIC_TRACE_DTOR. If those are distinct single-bit flags the mask is
+ * zero and nothing is ever printed -- confirm whether OR was intended.
+ */
+void print_ceph_msg(CephContext *cct, const char *tag, Message *m)
+{
+  if (!(m->get_magic() & (MSG_MAGIC_XIO & MSG_MAGIC_TRACE_DTOR)))
+    return;
+
+  ceph_msg_header& header = m->get_header();
+  ldout(cct,4) << tag
+    << " header version " << header.version
+    << " compat version " << header.compat_version
+    << dendl;
+}
+
+#undef dout_prefix
+#define dout_prefix conn_prefix(_dout)
+/*
+ * Write the per-connection log prefix (local address, peer address, peer
+ * type, connection and session pointers) to *_dout and return the stream.
+ */
+ostream& XioConnection::conn_prefix(std::ostream *_dout) {
+  std::ostream& out = *_dout;
+  out << "-- " << get_messenger()->get_myinst().addr;
+  out << " >> " << peer_addr;
+  out << " peer=" << peer.name.type_str();
+  out << " conn=" << conn << " sess=" << session << " ";
+  return out;
+}
+
+/*
+ * Build a connection object for peer _peer on messenger m.
+ *
+ * Besides initialising members, the constructor derives the send queue
+ * high/low marks from the configured xio_queue_depth (or the message
+ * throttle maximum, if larger) and pushes message-count and byte-count
+ * queue depths to Accelio via xio_set_opt().
+ *
+ * m      owning messenger; supplies cct, portal, policy and magic
+ * _type  ACTIVE or PASSIVE
+ * _peer  identity and address of the remote end
+ */
+XioConnection::XioConnection(XioMessenger *m, XioConnection::type _type,
+                             const entity_inst_t& _peer) :
+  Connection(m->cct, m),
+  xio_conn_type(_type),
+  portal(m->get_portal()),
+  connected(false),
+  peer(_peer),
+  session(NULL),
+  conn(NULL),
+  n_reqs(0),           /* was left indeterminate */
+  magic(m->get_magic()),
+  special_handling(0), /* was left indeterminate; read by get_special_handling() */
+  scount(0),
+  send_ctr(0),
+  in_seq(),
+  cstate(this)
+{
+  set_peer_type(peer.name.type());
+  set_peer_addr(peer.addr);
+
+  Messenger::Policy policy;
+  int64_t max_msgs = 0, max_bytes = 0, bytes_opt = 0;
+  int xopt;
+
+  policy = m->get_policy(peer_type);
+
+  if (policy.throttler_messages) {
+    max_msgs = policy.throttler_messages->get_max();
+    ldout(m->cct,4) << "XioMessenger throttle_msgs: " << max_msgs << dendl;
+  }
+
+  /* queue depth: the larger of the configured depth and the throttle */
+  xopt = m->cct->_conf->xio_queue_depth;
+  if (max_msgs > xopt)
+    xopt = max_msgs;
+
+  /* set high mark for send, reserved 20% for credits */
+  q_high_mark = xopt * 4 / 5;
+  q_low_mark = q_high_mark/2;
+
+  /* set send & receive msgs queue depth */
+  xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_SND_QUEUE_DEPTH_MSGS,
+              &xopt, sizeof(xopt));
+  xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_RCV_QUEUE_DEPTH_MSGS,
+              &xopt, sizeof(xopt));
+
+  if (policy.throttler_bytes) {
+    max_bytes = policy.throttler_bytes->get_max();
+    ldout(m->cct,4) << "XioMessenger throttle_bytes: " << max_bytes << dendl;
+  }
+
+  bytes_opt = (2 << 28); /* default: 512 MB */
+  if (max_bytes > bytes_opt)
+    bytes_opt = max_bytes;
+
+  /* set send & receive total bytes throttle */
+  xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_SND_QUEUE_DEPTH_BYTES,
+              &bytes_opt, sizeof(bytes_opt));
+  xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_RCV_QUEUE_DEPTH_BYTES,
+              &bytes_opt, sizeof(bytes_opt));
+
+  ldout(m->cct,4) << "throttle_msgs: " << xopt << " throttle_bytes: " << bytes_opt << dendl;
+
+  /* XXXX fake features, aieee! */
+  set_features(XIO_ALL_FEATURES);
+}
+
+/* Hand m to the owning XioMessenger for delivery over this connection. */
+int XioConnection::send_message(Message *m)
+{
+  return static_cast<XioMessenger*>(get_messenger())->_send_message(m, this);
+}
+
+/*
+ * Send a keepalive (ack == false) or a keepalive ack (ack == true).
+ *
+ * If the session is not UP the request is only recorded in the outgoing
+ * queue state; flush_out_queues() sends it later. Otherwise it is sent
+ * immediately.
+ *
+ * ack  true to send an ack, false to send a keepalive
+ * tp   timestamp to echo back; must be non-NULL when ack is true
+ */
+void XioConnection::send_keepalive_or_ack(bool ack, const utime_t *tp)
+{
+  /* If con is not in READY state, we need to queue the request */
+  if (cstate.session_state.load() != session_states::UP) {
+    std::lock_guard<decltype(sp)> lg(sp);
+    /* re-check now that the lock is held */
+    if (cstate.session_state.load() != session_states::UP) {
+      if (ack) {
+        ceph_assert(tp); /* same contract as the immediate-send path */
+        outgoing.ack = true;
+        outgoing.ack_time = *tp;
+      }
+      else {
+        outgoing.keepalive = true;
+      }
+      return;
+    }
+  }
+
+  send_keepalive_or_ack_internal(ack, tp);
+}
+
+/*
+ * Build a keepalive / keepalive-ack command and enqueue it on the portal.
+ *
+ * The command consists of a one-byte tag, followed by an encoded timestamp
+ * for the KEEPALIVE2 and KEEPALIVE2_ACK variants. It travels in the
+ * Accelio message header, so the encoded buffer must be one segment.
+ *
+ * ack  true to send KEEPALIVE2_ACK carrying *tp
+ * tp   timestamp to echo; asserted non-NULL when ack is true
+ */
+void XioConnection::send_keepalive_or_ack_internal(bool ack, const utime_t *tp)
+{
+  XioCommand *xcmd = pool_alloc_xio_command(this);
+  if (xcmd == NULL) {
+    /* could happen if Accelio has been shutdown */
+    return;
+  }
+
+  auto& cmd_bl = xcmd->get_bl_ref();
+  struct ceph_timespec ts;
+
+  if (ack) {
+    ceph_assert(tp);
+    tp->encode_timeval(&ts);
+    cmd_bl.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK);
+    cmd_bl.append((char*)&ts, sizeof(ts));
+  } else if (has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
+    utime_t now = ceph_clock_now();
+    now.encode_timeval(&ts);
+    cmd_bl.append(CEPH_MSGR_TAG_KEEPALIVE2);
+    cmd_bl.append((char*)&ts, sizeof(ts));
+  } else {
+    cmd_bl.append(CEPH_MSGR_TAG_KEEPALIVE);
+  }
+
+  const std::list<buffer::ptr>& segs = xcmd->get_bl_ref().buffers();
+  ceph_assert(segs.size() == 1); /* accelio header must be without scatter gather */
+  list<bufferptr>::const_iterator seg = segs.begin();
+  ceph_assert(seg->length() < XioMsgHdr::get_max_encoded_length());
+
+  /* point the outgoing Accelio header at the encoded command */
+  struct xio_msg * msg = xcmd->get_xio_msg();
+  msg->out.header.iov_base = (char*) seg->c_str();
+  msg->out.header.iov_len = seg->length();
+
+  ldout(msgr->cct,8) << __func__ << " sending command with tag " << (int)(*(char*)msg->out.header.iov_base)
+    << " len " << msg->out.header.iov_len << dendl;
+
+  portal->enqueue(this, xcmd);
+}
+
+
+/*
+ * Placeholder for feature and auth negotiation on the passive side.
+ *
+ * Builds a fake AuthNone authorizer for the peer, runs it through the
+ * messenger's verify hook, fires the accept hooks and finally tries to
+ * insert this connection into the messenger's entity map. Always
+ * returns 0; the verification result is not examined.
+ */
+int XioConnection::passive_setup()
+{
+  /* XXX passive setup is a placeholder for (potentially active-side
+     initiated) feature and auth* negotiation */
+  static bufferlist authorizer_reply; /* static because fake */
+  static CryptoKey session_key; /* ditto */
+  bool authorizer_valid;
+
+  XioMessenger *msgr = static_cast<XioMessenger*>(get_messenger());
+
+  // fake an auth buffer
+  EntityName ename;
+  ename.set_type(peer.name.type());
+
+  AuthNoneAuthorizer fake_auth;
+  fake_auth.build_authorizer(ename, peer.name.num());
+
+  /* XXX fake authorizer! */
+  msgr->ms_deliver_verify_authorizer(this, peer_type, CEPH_AUTH_NONE,
+                                     fake_auth.bl, 0,
+                                     authorizer_reply, authorizer_valid,
+                                     session_key);
+
+  /* notify hook */
+  msgr->ms_deliver_handle_accept(this);
+  msgr->ms_deliver_handle_fast_accept(this);
+
+  /* try to insert in conns_entity_map */
+  msgr->try_insert(this);
+
+  return 0;
+}
+
+/*
+ * Allocate storage for an XioDispatchHook from xio_msgr_noreg_mpool and
+ * construct the hook in place. Returns NULL when the pool allocation
+ * reports an error.
+ */
+static inline XioDispatchHook* pool_alloc_xio_dispatch_hook(
+  XioConnection *xcon, Message *m, XioInSeq& msg_seq)
+{
+  struct xio_reg_mem mp_mem;
+  if (xpool_alloc(xio_msgr_noreg_mpool, sizeof(XioDispatchHook), &mp_mem) != 0)
+    return NULL;
+
+  /* placement-construct in the pool memory; the hook keeps mp_mem */
+  return new (mp_mem.addr) XioDispatchHook(xcon, m, msg_seq, mp_mem);
+}
+
+/*
+ * Handle one inbound Accelio message that carries (part of) a Ceph data
+ * message.
+ *
+ * A Ceph message may span several xio_msg fragments. The first fragment
+ * carries a header whose message count seeds in_seq; later fragments must
+ * have an empty header. Fragments accumulate in in_seq until the count
+ * reaches zero. Then the front, middle and data segments are rebuilt from
+ * the fragments' scatter/gather lists, the message is decoded and
+ * dispatched.
+ *
+ * session          Accelio session the message arrived on
+ * msg              inbound fragment
+ * more_in_batch    unused here
+ * cb_user_context  unused here
+ *
+ * Always returns 0.
+ */
+int XioConnection::handle_data_msg(struct xio_session *session,
+                                   struct xio_msg *msg,
+                                   int more_in_batch,
+                                   void *cb_user_context)
+{
+  struct xio_msg *tmsg = msg;
+
+  /* XXX Accelio guarantees message ordering at
+   * xio_session */
+
+  if (! in_seq.p()) {
+    /* first fragment of a new sequence: it must carry a header */
+    if (!tmsg->in.header.iov_len) {
+      ldout(msgr->cct,0) << __func__ << " empty header: packet out of sequence?" << dendl;
+      xio_release_msg(msg);
+      return 0;
+    }
+    const size_t sizeof_tag = 1;
+    XioMsgCnt msg_cnt(
+      buffer::create_static(tmsg->in.header.iov_len-sizeof_tag,
+                            ((char*) tmsg->in.header.iov_base)+sizeof_tag));
+    ldout(msgr->cct,10) << __func__ << " receive msg " << "tmsg " << tmsg
+      << " msg_cnt " << msg_cnt.msg_cnt
+      << " iov_base " << tmsg->in.header.iov_base
+      << " iov_len " << (int) tmsg->in.header.iov_len
+      << " nents " << tmsg->in.pdata_iov.nents
+      << " sn " << tmsg->sn << dendl;
+    ceph_assert(session == this->session);
+    in_seq.set_count(msg_cnt.msg_cnt);
+  } else {
+    /* XXX major sequence error */
+    ceph_assert(! tmsg->in.header.iov_len);
+  }
+
+  in_seq.append(msg);
+  if (in_seq.count() > 0) {
+    /* more fragments outstanding */
+    return 0;
+  }
+
+  XioMessenger *msgr = static_cast<XioMessenger*>(get_messenger());
+  XioDispatchHook *m_hook =
+    pool_alloc_xio_dispatch_hook(this, NULL /* msg */, in_seq);
+  if (! m_hook) {
+    /* allocation failed: no hook can take ownership of the inbound
+     * fragments, so return each of them to Accelio and drop the batch
+     * instead of dereferencing a NULL hook */
+    ldout(msgr->cct,0) << __func__ << " failed to allocate dispatch hook, dropping message" << dendl;
+    struct xio_msg *rel = in_seq.begin();
+    while (rel != in_seq.end()) {
+      struct xio_msg *next = rel;
+      in_seq.next(&next); /* read the link before the msg is released */
+      xio_release_msg(rel);
+      rel = next;
+    }
+    in_seq.clear();
+    return 0;
+  }
+  XioInSeq& msg_seq = m_hook->msg_seq;
+  in_seq.clear();
+
+  ceph_msg_header header;
+  ceph_msg_footer footer;
+  buffer::list payload, middle, data;
+
+  const utime_t recv_stamp = ceph_clock_now();
+
+  ldout(msgr->cct,4) << __func__ << " " << "msg_seq.size()=" << msg_seq.size() <<
+    dendl;
+
+  struct xio_msg* msg_iter = msg_seq.begin();
+  tmsg = msg_iter;
+  XioMsgHdr hdr(header, footer,
+                buffer::create_static(tmsg->in.header.iov_len,
+                                      (char*) tmsg->in.header.iov_base));
+
+  if (magic & (MSG_MAGIC_TRACE_XCON)) {
+    if (hdr.hdr->type == 43) {
+      print_xio_msg_hdr(msgr->cct, "on_msg", hdr, NULL);
+    }
+  }
+
+  unsigned int ix, blen, iov_len;
+  struct xio_iovec_ex *msg_iov, *iovs;
+  uint32_t take_len, left_len = 0;
+  char *left_base = NULL;
+
+  /* ---- front segment ---- */
+  ix = 0;
+  blen = header.front_len;
+
+  while (blen && (msg_iter != msg_seq.end())) {
+    tmsg = msg_iter;
+    iov_len = vmsg_sglist_nents(&tmsg->in);
+    iovs = vmsg_sglist(&tmsg->in);
+    for (; blen && (ix < iov_len); ++ix) {
+      msg_iov = &iovs[ix];
+
+      /* XXX need to detect any buffer which needs to be
+       * split due to coalescing of a segment (front, middle,
+       * data) boundary */
+
+      take_len = std::min(blen, msg_iov->iov_len);
+      payload.append(
+        buffer::create_msg(
+          take_len, (char*) msg_iov->iov_base, m_hook));
+      blen -= take_len;
+      if (! blen) {
+        /* remember the unconsumed tail of this iovec for the next segment */
+        left_len = msg_iov->iov_len - take_len;
+        if (left_len) {
+          left_base = ((char*) msg_iov->iov_base) + take_len;
+        }
+      }
+    }
+    /* XXX as above, if a buffer is split, then we needed to track
+     * the new start (carry) and not advance */
+    if (ix == iov_len) {
+      msg_seq.next(&msg_iter);
+      ix = 0;
+    }
+  }
+
+  if (magic & (MSG_MAGIC_TRACE_XCON)) {
+    if (hdr.hdr->type == 43) {
+      ldout(msgr->cct,4) << "front (payload) dump:";
+      payload.hexdump( *_dout );
+      *_dout << dendl;
+    }
+  }
+
+  /* ---- middle segment ---- */
+  blen = header.middle_len;
+
+  if (blen && left_len) {
+    middle.append(
+      buffer::create_msg(left_len, left_base, m_hook));
+    left_len = 0;
+  }
+
+  while (blen && (msg_iter != msg_seq.end())) {
+    tmsg = msg_iter;
+    iov_len = vmsg_sglist_nents(&tmsg->in);
+    iovs = vmsg_sglist(&tmsg->in);
+    for (; blen && (ix < iov_len); ++ix) {
+      msg_iov = &iovs[ix];
+      take_len = std::min(blen, msg_iov->iov_len);
+      middle.append(
+        buffer::create_msg(
+          take_len, (char*) msg_iov->iov_base, m_hook));
+      blen -= take_len;
+      if (! blen) {
+        left_len = msg_iov->iov_len - take_len;
+        if (left_len) {
+          left_base = ((char*) msg_iov->iov_base) + take_len;
+        }
+      }
+    }
+    if (ix == iov_len) {
+      msg_seq.next(&msg_iter);
+      ix = 0;
+    }
+  }
+
+  /* ---- data segment ---- */
+  blen = header.data_len;
+
+  if (blen && left_len) {
+    data.append(
+      buffer::create_msg(left_len, left_base, m_hook));
+    left_len = 0;
+  }
+
+  while (blen && (msg_iter != msg_seq.end())) {
+    tmsg = msg_iter;
+    iov_len = vmsg_sglist_nents(&tmsg->in);
+    iovs = vmsg_sglist(&tmsg->in);
+    for (; blen && (ix < iov_len); ++ix) {
+      msg_iov = &iovs[ix];
+      data.append(
+        buffer::create_msg(
+          msg_iov->iov_len, (char*) msg_iov->iov_base, m_hook));
+      blen -= msg_iov->iov_len;
+    }
+    if (ix == iov_len) {
+      msg_seq.next(&msg_iter);
+      ix = 0;
+    }
+  }
+
+  /* update connection timestamp */
+  recv = tmsg->timestamp;
+
+  Message *m = decode_message(msgr->cct, msgr->crcflags, header, footer,
+                              payload, middle, data, this);
+
+  if (m) {
+    /* completion */
+    m->set_connection(this);
+
+    /* reply hook */
+    m_hook->set_message(m);
+    m->set_completion_hook(m_hook);
+
+    /* trace flag */
+    m->set_magic(magic);
+
+    /* update timestamps */
+    m->set_recv_stamp(recv_stamp);
+    m->set_recv_complete_stamp(ceph_clock_now());
+    m->set_seq(header.seq);
+
+    /* MP-SAFE */
+    state.set_in_seq(header.seq);
+
+    /* XXXX validate peer type */
+    if (peer_type != (int) hdr.peer_type) { /* XXX isn't peer_type -1? */
+      peer_type = hdr.peer_type;
+      peer_addr = hdr.addr;
+      peer.addr = peer_addr;
+      peer.name = entity_name_t(hdr.hdr->src);
+      if (xio_conn_type == XioConnection::PASSIVE) {
+        /* XXX kick off feature/authn/authz negotiation
+         * nb: very possibly the active side should initiate this, but
+         * for now, call a passive hook so OSD and friends can create
+         * sessions without actually negotiating
+         */
+        passive_setup();
+      }
+    }
+
+    if (magic & (MSG_MAGIC_TRACE_XCON)) {
+      ldout(msgr->cct,4) << "decode m is " << m->get_type() << dendl;
+    }
+
+    /* dispatch it */
+    msgr->ds_dispatch(m);
+  } else {
+    /* responds for undecoded messages and frees hook */
+    ldout(msgr->cct,4) << "decode m failed" << dendl;
+    m_hook->on_err_finalize(this);
+  }
+
+  return 0;
+}
+
+/*
+ * Entry point for every inbound Accelio message on this connection.
+ *
+ * The first header byte is a tag; a message with an empty header is
+ * treated as a data message. Data messages are forwarded to
+ * handle_data_msg(), which owns the xio_msg from then on. Keepalive
+ * variants are handled here and the xio_msg is released before returning.
+ */
+int XioConnection::on_msg(struct xio_session *session,
+                          struct xio_msg *msg,
+                          int more_in_batch,
+                          void *cb_user_context)
+{
+  const auto hdr_len = msg->in.header.iov_len;
+  char tag = hdr_len ? *(char*)msg->in.header.iov_base : CEPH_MSGR_TAG_MSG;
+
+  ldout(msgr->cct,8) << __func__ << " receive msg with iov_len "
+    << (int) hdr_len << " tag " << (int)tag << dendl;
+
+  //header_len_without_tag is only meaningful in case we have tag
+  size_t header_len_without_tag = hdr_len - sizeof(tag);
+
+  if (tag == CEPH_MSGR_TAG_MSG) {
+    ldout(msgr->cct, 20) << __func__ << " got data message" << dendl;
+    return handle_data_msg(session, msg, more_in_batch, cb_user_context);
+  }
+
+  switch(tag) {
+  case CEPH_MSGR_TAG_KEEPALIVE:
+    ldout(msgr->cct, 20) << __func__ << " got KEEPALIVE" << dendl;
+    set_last_keepalive(ceph_clock_now());
+    break;
+
+  case CEPH_MSGR_TAG_KEEPALIVE2:
+    if (header_len_without_tag >= sizeof(ceph_timespec)) {
+      /* the timestamp follows the tag byte; echo it back in the ack */
+      ceph_timespec *peer_ts = (ceph_timespec *) ((char*)msg->in.header.iov_base + sizeof(tag));
+      utime_t kp_t = utime_t(*peer_ts);
+      ldout(msgr->cct, 20) << __func__ << " got KEEPALIVE2 with timestamp" << kp_t << dendl;
+      send_keepalive_or_ack(true, &kp_t);
+      set_last_keepalive(ceph_clock_now());
+    } else {
+      lderr(msgr->cct) << __func__ << " too few data for KEEPALIVE2: got " << header_len_without_tag <<
+        " bytes instead of " << sizeof(ceph_timespec) << " bytes" << dendl;
+    }
+    break;
+
+  case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
+    if (header_len_without_tag >= sizeof(ceph_timespec)) {
+      ceph_timespec *peer_ts = (ceph_timespec *) ((char*)msg->in.header.iov_base + sizeof(tag));
+      utime_t kp_t(*peer_ts);
+      ldout(msgr->cct, 20) << __func__ << " got KEEPALIVE2_ACK with timestamp" << kp_t << dendl;
+      set_last_keepalive_ack(kp_t);
+    } else {
+      lderr(msgr->cct) << __func__ << " too few data for KEEPALIVE2_ACK: got " << header_len_without_tag <<
+        " bytes instead of " << sizeof(ceph_timespec) << " bytes" << dendl;
+    }
+    break;
+
+  default:
+    lderr(msgr->cct) << __func__ << " unsupported message tag " << (int) tag << dendl;
+    ceph_assert(! "unsupported message tag");
+  }
+
+  /* command messages are fully consumed here */
+  xio_release_msg(msg);
+  return 0;
+}
+
+
+/*
+ * Completion callback for a one-way send.
+ *
+ * Bumps the completed-send counter, decrements the in-flight counter,
+ * moves a FLOW_CONTROLLED connection back to UP once the in-flight count
+ * has fallen to the low mark, and drops one reference on the XioSend
+ * stored in req->user_context.
+ *
+ * Always returns 0.
+ */
+int XioConnection::on_ow_msg_send_complete(struct xio_session *session,
+                                           struct xio_msg *req,
+                                           void *conn_user_context)
+{
+  /* requester send complete (one-way) */
+  uint64_t rc = ++scount;
+
+  XioSend* xsend = static_cast<XioSend*>(req->user_context);
+  if (unlikely(magic & MSG_MAGIC_TRACE_CTR)) {
+    if (unlikely((rc % 1000000) == 0)) {
+      std::cout << "xio finished " << rc << " " << time(0) << std::endl;
+    }
+  } /* trace ctr */
+
+  ldout(msgr->cct,11) << "on_msg_delivered xcon: " << xsend->xcon <<
+    " msg: " << req << " sn: " << req->sn << dendl;
+
+  XioMsg *xmsg = dynamic_cast<XioMsg*>(xsend);
+  if (xmsg) {
+    ldout(msgr->cct,11) << "on_msg_delivered xcon: " <<
+      " type: " << xmsg->m->get_type() << " tid: " << xmsg->m->get_tid() <<
+      " seq: " << xmsg->m->get_seq() << dendl;
+  }
+
+  --send_ctr; /* atomic, because portal thread */
+
+  /* unblock flow-controlled connections, avoid oscillation */
+  /* session_state is a std::atomic<session_states>: read it with load()
+   * and compare against the scoped enumerator */
+  if (unlikely(cstate.session_state.load() ==
+               session_states::FLOW_CONTROLLED)) {
+    if ((send_ctr <= uint32_t(xio_qdepth_low_mark())) &&
+        (1 /* XXX memory <= memory low-water mark */)) {
+      cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
+      ldout(msgr->cct,2) << "on_msg_delivered xcon: " << xsend->xcon
+        << " up_ready from flow_controlled" << dendl;
+    }
+  }
+
+  xsend->put();
+
+  return 0;
+} /* on_msg_delivered */
+
+/*
+ * Report a failed xio send at log level 2 and give back the references
+ * that were taken for each xio_msg of xsend.
+ */
+void XioConnection::msg_send_fail(XioSend *xsend, int code)
+{
+  ldout(msgr->cct,2) << "xio_send_msg FAILED xcon: " << this
+    << " msg: " << xsend->get_xio_msg()
+    << " code=" << code
+    << " (" << xio_strerror(code) << ")"
+    << dendl;
+
+  /* return refs taken for each xio_msg */
+  xsend->put_msg_refs();
+} /* msg_send_fail */
+
+/*
+ * Report a failed xio_release_msg at log level 2.
+ *
+ * msg   the message whose release failed
+ * code  Accelio error code, also rendered through xio_strerror()
+ */
+void XioConnection::msg_release_fail(struct xio_msg *msg, int code)
+{
+  /* " code=" carries a leading space, as in msg_send_fail(), so the
+   * value does not run into the printed msg pointer */
+  ldout(msgr->cct,2) << "xio_release_msg FAILED xcon: " << this <<
+    " msg: " << msg << " code=" << code <<
+    " (" << xio_strerror(code) << ")" << dendl;
+} /* msg_release_fail */
+
+/*
+ * Send everything that was deferred while the connection was not ready:
+ * a pending keepalive, a pending keepalive ack, the requeue list and the
+ * deferred message queue, in that order.
+ *
+ * flags  when CState::OP_FLAG_LOCKED is set the caller already holds sp;
+ *        otherwise sp is taken for the duration of the call
+ *
+ * Always returns 0.
+ */
+int XioConnection::flush_out_queues(uint32_t flags) {
+  XioMessenger* msgr = static_cast<XioMessenger*>(get_messenger());
+  const bool lock_here = !(flags & CState::OP_FLAG_LOCKED);
+
+  if (lock_here)
+    sp.lock();
+
+  if (outgoing.keepalive) {
+    outgoing.keepalive = false;
+    send_keepalive_or_ack_internal();
+  }
+
+  if (outgoing.ack) {
+    outgoing.ack = false;
+    send_keepalive_or_ack_internal(true, &outgoing.ack_time);
+  }
+
+  // send deferred 1 (direct backpresssure)
+  if (outgoing.requeue.size() > 0)
+    portal->requeue(this, outgoing.requeue);
+
+  // send deferred 2 (sent while deferred)
+  // only the entries present right now are drained
+  int remaining = outgoing.mqueue.size();
+  while (remaining-- > 0) {
+    Message::Queue::iterator front = outgoing.mqueue.begin();
+    Message* m = &(*front);
+    outgoing.mqueue.erase(front);
+    msgr->_send_message_impl(m, this);
+  }
+
+  if (lock_here)
+    sp.unlock();
+  return 0;
+}
+
+/*
+ * Drop everything queued for sending on this connection.
+ *
+ * Both outgoing queues are spliced into local lists while sp is held
+ * (taken here unless CState::OP_FLAG_LOCKED is set) and the pending
+ * keepalive/ack flags are cleared. The local lists are then drained:
+ * Messages are put(), OUTGOING_MSG submissions are put() once per chained
+ * xio_msg, and INCOMING_MSG_RELEASE submissions go to the portal.
+ *
+ * Always returns 0.
+ */
+int XioConnection::discard_out_queues(uint32_t flags)
+{
+  Message::Queue disc_q;
+  XioSubmit::Queue deferred_q;
+
+  if (! (flags & CState::OP_FLAG_LOCKED))
+    sp.lock();
+
+  /* the two send queues contain different objects:
+   * - anything on the mqueue is a Message
+   * - anything on the requeue is an XioSend
+   */
+  Message::Queue::const_iterator i1 = disc_q.end();
+  disc_q.splice(i1, outgoing.mqueue);
+
+  XioSubmit::Queue::const_iterator i2 = deferred_q.end();
+  deferred_q.splice(i2, outgoing.requeue);
+
+  outgoing.keepalive = outgoing.ack = false;
+
+  if (! (flags & CState::OP_FLAG_LOCKED))
+    sp.unlock();
+
+  // mqueue
+  while (!disc_q.empty()) {
+    Message::Queue::iterator q_iter = disc_q.begin();
+    Message* m = &(*q_iter);
+    disc_q.erase(q_iter);
+    m->put();
+  }
+
+  // requeue
+  while (!deferred_q.empty()) {
+    XioSubmit::Queue::iterator q_iter = deferred_q.begin();
+    XioSubmit* xs = &(*q_iter);
+    XioSend* xsend;
+    /* unlink before dispatching on the type, so an unrecognised entry
+     * cannot keep the queue non-empty and spin this loop forever */
+    deferred_q.erase(q_iter);
+    switch (xs->type) {
+    case XioSubmit::OUTGOING_MSG:
+      xsend = static_cast<XioSend*>(xs);
+      // release once for each chained xio_msg
+      xsend->put(xsend->get_msg_count());
+      break;
+    case XioSubmit::INCOMING_MSG_RELEASE:
+      portal->release_xio_msg(static_cast<XioCompletion*>(xs));
+      break;
+    default:
+      ldout(msgr->cct,0) << __func__ << ": Unknown Msg type " << xs->type << dendl;
+      break;
+    }
+  }
+
+  return 0;
+}
+
+/*
+ * Move this connection to the front of the messenger's connection list
+ * when it is flagged as mapped.
+ *
+ * The messenger's conns_sp is taken before sp. A caller that already
+ * holds sp (CState::OP_FLAG_LOCKED) has it released first and still
+ * holds it on return; otherwise sp is released before returning.
+ *
+ * Always returns 0.
+ */
+int XioConnection::adjust_clru(uint32_t flags)
+{
+  const bool caller_holds_sp = (flags & CState::OP_FLAG_LOCKED);
+
+  /* give up sp so the locks can be taken in conns_sp -> sp order */
+  if (caller_holds_sp)
+    sp.unlock();
+
+  XioMessenger* msgr = static_cast<XioMessenger*>(get_messenger());
+  msgr->conns_sp.lock();
+  sp.lock();
+
+  if (cstate.flags & CState::FLAG_MAPPED) {
+    XioConnection::ConnList::iterator self =
+      XioConnection::ConnList::s_iterator_to(*this);
+    msgr->conns_list.erase(self);
+    msgr->conns_list.push_front(*this); // LRU
+  }
+
+  msgr->conns_sp.unlock();
+
+  if (!caller_holds_sp)
+    sp.unlock();
+
+  return 0;
+}
+
+/*
+ * Error callback for an outbound message: drops one reference on the
+ * XioSend stored in msg->user_context (if any) and decrements the
+ * in-flight send counter. Always returns 0.
+ */
+int XioConnection::on_msg_error(struct xio_session *session,
+                                enum xio_status error,
+                                struct xio_msg *msg,
+                                void *conn_user_context)
+{
+  XioSend *failed = static_cast<XioSend*>(msg->user_context);
+  if (failed != NULL) {
+    failed->put();
+  }
+
+  --send_ctr; /* atomic, because portal thread */
+
+  return 0;
+} /* on_msg_error */
+
+/* Public mark-down entry point: runs _mark_down() without the lock held. */
+void XioConnection::mark_down()
+{
+  const uint32_t no_flags = XioConnection::CState::OP_FLAG_NONE;
+  _mark_down(no_flags);
+}
+
+/*
+ * Tear the connection down: optionally stage a remote reset, disconnect,
+ * and discard everything queued for sending.
+ *
+ * flags  when CState::OP_FLAG_LOCKED is set the caller already holds sp
+ *
+ * Always returns 0.
+ */
+int XioConnection::_mark_down(uint32_t flags)
+{
+  const bool lock_here = !(flags & CState::OP_FLAG_LOCKED);
+
+  if (lock_here)
+    sp.lock();
+
+  // per interface comment, we only stage a remote reset if the
+  // current policy required it
+  if (cstate.policy.resetcheck) {
+    cstate.flags |= CState::FLAG_RESET;
+  }
+
+  disconnect();
+
+  /* XXX this will almost certainly be called again from
+   * on_disconnect_event() */
+  discard_out_queues(flags|CState::OP_FLAG_LOCKED);
+
+  if (lock_here)
+    sp.unlock();
+
+  return 0;
+}
+
+/* Public entry point: runs _mark_disposable() without the lock held. */
+void XioConnection::mark_disposable()
+{
+  const uint32_t no_flags = XioConnection::CState::OP_FLAG_NONE;
+  _mark_disposable(no_flags);
+}
+
+/*
+ * Switch this connection's policy to lossy.
+ *
+ * flags  when CState::OP_FLAG_LOCKED is set the caller already holds sp
+ *
+ * Always returns 0.
+ */
+int XioConnection::_mark_disposable(uint32_t flags)
+{
+  const bool lock_here = !(flags & CState::OP_FLAG_LOCKED);
+
+  if (lock_here)
+    sp.lock();
+
+  cstate.policy.lossy = true;
+
+  if (lock_here)
+    sp.unlock();
+
+  return 0;
+}
+
+/*
+ * Flush the connection's deferred output, then mark the session UP and
+ * the startup state READY.
+ *
+ * flags  when OP_FLAG_LOCKED is set the caller already holds xcon->sp
+ *
+ * Always returns 0.
+ */
+int XioConnection::CState::state_up_ready(uint32_t flags)
+{
+  const bool lock_here = !(flags & CState::OP_FLAG_LOCKED);
+
+  if (lock_here)
+    xcon->sp.lock();
+
+  /* the queues are flushed before the state changes */
+  xcon->flush_out_queues(flags|CState::OP_FLAG_LOCKED);
+
+  session_state = session_states::UP;
+  startup_state = session_startup_states::READY;
+
+  if (lock_here)
+    xcon->sp.unlock();
+
+  return 0;
+}
+
+/*
+ * Record a disconnect: session becomes DISCONNECTED, startup returns to
+ * IDLE. No lock is taken here. Always returns 0.
+ */
+int XioConnection::CState::state_discon()
+{
+  session_state = session_states::DISCONNECTED;
+  startup_state = session_startup_states::IDLE;
+  return (0);
+}
+
+/*
+ * Mark the session FLOW_CONTROLLED.
+ *
+ * flags  when OP_FLAG_LOCKED is set the caller already holds xcon->sp
+ *
+ * Always returns 0.
+ */
+int XioConnection::CState::state_flow_controlled(uint32_t flags)
+{
+  const bool lock_here = !(flags & OP_FLAG_LOCKED);
+
+  if (lock_here)
+    xcon->sp.lock();
+
+  session_state = session_states::FLOW_CONTROLLED;
+
+  if (lock_here)
+    xcon->sp.unlock();
+
+  return 0;
+}
+
+/*
+ * Move the connection to the failed state.
+ *
+ * Sets DISCONNECTED/FAIL, discards queued output, adjusts the
+ * messenger's connection list, disconnects, then notifies the upper
+ * layer of the reset and drops a reference on m.
+ *
+ * m      message whose reference is dropped at the end
+ * flags  when OP_FLAG_LOCKED is set the caller already holds xcon->sp
+ *
+ * Always returns 0.
+ */
+int XioConnection::CState::state_fail(Message* m, uint32_t flags)
+{
+  const bool lock_here = !(flags & OP_FLAG_LOCKED);
+
+  if (lock_here)
+    xcon->sp.lock();
+
+  // advance to state FAIL, drop queued, msgs, adjust LRU
+  session_state = session_states::DISCONNECTED;
+  startup_state = session_startup_states::FAIL;
+
+  xcon->discard_out_queues(flags|OP_FLAG_LOCKED);
+  xcon->adjust_clru(flags|OP_FLAG_LOCKED|OP_FLAG_LRU);
+
+  xcon->disconnect();
+
+  if (lock_here)
+    xcon->sp.unlock();
+
+  // notify ULP
+  XioMessenger* owner = static_cast<XioMessenger*>(xcon->get_messenger());
+  owner->ms_deliver_handle_reset(xcon);
+  m->put();
+
+  return 0;
+}
+
+
+/*
+ * Loopback send: stamp m with this connection, the next sequence number
+ * and the local entity name, then dispatch it straight back through the
+ * messenger. Always returns 0.
+ */
+int XioLoopbackConnection::send_message(Message *m)
+{
+  XioMessenger *xmsgr = static_cast<XioMessenger*>(get_messenger());
+
+  m->set_connection(this);
+  m->set_seq(next_seq());
+  m->set_src(xmsgr->get_myinst().name);
+
+  xmsgr->ds_dispatch(m);
+  return 0;
+}
+
+/*
+ * Loopback keepalive: nothing is sent; both the keepalive and the
+ * keepalive-ack timestamps are set to the current time.
+ */
+void XioLoopbackConnection::send_keepalive()
+{
+  const utime_t now = ceph_clock_now();
+  set_last_keepalive(now);
+  set_last_keepalive_ack(now);
+}
diff --git a/src/msg/xio/XioConnection.h b/src/msg/xio/XioConnection.h
new file mode 100644
index 00000000..00024ef3
--- /dev/null
+++ b/src/msg/xio/XioConnection.h
@@ -0,0 +1,380 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Portions Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef XIO_CONNECTION_H
+#define XIO_CONNECTION_H
+
+#include <atomic>
+
+#include <boost/intrusive/avl_set.hpp>
+#include <boost/intrusive/list.hpp>
+
+extern "C" {
+#include "libxio.h"
+}
+
+#include "XioInSeq.h"
+#include "XioSubmit.h"
+#include "msg/Connection.h"
+#include "msg/Messenger.h"
+#include "auth/AuthSessionHandler.h"
+
+#define XIO_ALL_FEATURES (CEPH_FEATURES_ALL)
+
+
+#define XIO_NOP_TAG_MARKDOWN 0x0001
+
+namespace bi = boost::intrusive;
+
+class XioPortal;
+class XioMessenger;
+class XioSend;
+
+/*
+ * Connection implementation on top of an Accelio (libxio) session.
+ *
+ * Holds the Accelio session/connection handles, the session state
+ * machine (CState), the deferred-send queues (SendQ) and the intrusive
+ * hooks that link the object into the messenger's connection list and
+ * entity map. The spinlock sp guards the queues and state transitions.
+ */
+class XioConnection : public Connection
+{
+public:
+  enum type { ACTIVE, PASSIVE };
+
+  enum class session_states : unsigned {
+    INIT = 0,
+    START,
+    UP,
+    FLOW_CONTROLLED,
+    DISCONNECTED,
+    DELETED,
+    BARRIER
+  };
+
+  enum class session_startup_states : unsigned {
+    IDLE = 0,
+    CONNECTING,
+    ACCEPTING,
+    READY,
+    FAIL
+  };
+
+private:
+  XioConnection::type xio_conn_type;
+  XioPortal *portal;
+  std::atomic<bool> connected = { false };
+  entity_inst_t peer;
+  struct xio_session *session;
+  struct xio_connection *conn;
+  ceph::util::spinlock sp;
+  std::atomic<int64_t> send = { 0 };
+  std::atomic<int64_t> recv = { 0 };
+  uint32_t n_reqs; // Accelio-initiated reqs in progress (!counting partials)
+  uint32_t magic;
+  uint32_t special_handling;
+  uint64_t scount;
+  uint32_t send_ctr;
+  int q_high_mark;
+  int q_low_mark;
+
+  struct lifecycle {
+    // different from Pipe states?
+    enum lf_state {
+      INIT,
+      LOCAL_DISCON,
+      REMOTE_DISCON,
+      RECONNECTING,
+      UP,
+      DEAD } state;
+
+    /* XXX */
+    uint32_t reconnects;
+    uint32_t connect_seq, peer_global_seq;
+    uint64_t in_seq, out_seq_acked; // atomic<uint64_t>, got receipt
+    std::atomic<int64_t> out_seq = { 0 };
+
+    lifecycle() : state(lifecycle::INIT), reconnects(0), connect_seq(0),
+      peer_global_seq(0), in_seq(0), out_seq_acked(0)
+    {}
+
+    void set_in_seq(uint64_t seq) {
+      in_seq = seq;
+    }
+
+    uint64_t next_out_seq() {
+      return ++out_seq;
+    }
+
+  } state;
+
+  /* batching */
+  XioInSeq in_seq;
+
+  /* Session state machine plus negotiated per-session attributes. */
+  class CState
+  {
+  public:
+    static const int FLAG_NONE = 0x0000;
+    static const int FLAG_BAD_AUTH = 0x0001;
+    static const int FLAG_MAPPED = 0x0002;
+    static const int FLAG_RESET = 0x0004;
+
+    static const int OP_FLAG_NONE = 0x0000;
+    static const int OP_FLAG_LOCKED = 0x0001;
+    static const int OP_FLAG_LRU = 0x0002;
+
+    uint64_t features;
+    Messenger::Policy policy;
+
+    CryptoKey session_key;
+    std::shared_ptr<AuthSessionHandler> session_security;
+    AuthAuthorizer *authorizer;
+    XioConnection *xcon;
+    uint32_t protocol_version;
+
+    /* scoped enums do not convert from int, so the initial values are
+     * spelled with their enumerators */
+    std::atomic<session_states> session_state = { session_states::INIT };
+    std::atomic<session_startup_states> startup_state = { session_startup_states::IDLE };
+
+    uint32_t reconnects;
+    uint32_t connect_seq, global_seq, peer_global_seq;
+    uint64_t in_seq, out_seq_acked; // atomic<uint64_t>, got receipt
+    std::atomic<uint64_t> out_seq = { 0 };
+
+    uint32_t flags;
+
+    explicit CState(XioConnection* _xcon)
+      : features(0),
+        authorizer(NULL),
+        xcon(_xcon),
+        protocol_version(0),
+        session_state(session_states::INIT),
+        startup_state(session_startup_states::IDLE),
+        reconnects(0),
+        connect_seq(0),
+        global_seq(0),
+        peer_global_seq(0),
+        in_seq(0),
+        out_seq_acked(0),
+        flags(FLAG_NONE) {}
+
+    /* numeric value of the current session state */
+    uint64_t get_session_state() {
+      return static_cast<uint64_t>(session_state.load());
+    }
+
+    /* numeric value of the current startup state */
+    uint64_t get_startup_state() {
+      return static_cast<uint64_t>(startup_state.load());
+    }
+
+    void set_in_seq(uint64_t seq) {
+      in_seq = seq;
+    }
+
+    uint64_t next_out_seq() {
+      return ++out_seq;
+    };
+
+    // state machine
+    int init_state();
+    int next_state(Message* m);
+#if 0 // future (session startup)
+    int msg_connect(MConnect *m);
+    int msg_connect_reply(MConnectReply *m);
+    int msg_connect_reply(MConnectAuthReply *m);
+    int msg_connect_auth(MConnectAuth *m);
+    int msg_connect_auth_reply(MConnectAuthReply *m);
+#endif
+    int state_up_ready(uint32_t flags);
+    int state_flow_controlled(uint32_t flags);
+    int state_discon();
+    int state_fail(Message* m, uint32_t flags);
+
+  } cstate; /* CState */
+
+  // message submission queue
+  struct SendQ {
+    bool keepalive;
+    bool ack;
+    utime_t ack_time;
+    Message::Queue mqueue; // deferred
+    XioSubmit::Queue requeue;
+
+    SendQ():keepalive(false), ack(false){}
+  } outgoing;
+
+  // conns_entity_map comparison functor
+  struct EntityComp
+  {
+    // for internal ordering
+    bool operator()(const XioConnection &lhs, const XioConnection &rhs) const
+    { return lhs.get_peer() < rhs.get_peer(); }
+
+    // for external search by entity_inst_t(peer)
+    bool operator()(const entity_inst_t &peer, const XioConnection &c) const
+    { return peer < c.get_peer(); }
+
+    bool operator()(const XioConnection &c, const entity_inst_t &peer) const
+    { return c.get_peer() < peer; }
+  };
+
+  bi::list_member_hook<> conns_hook;
+  bi::avl_set_member_hook<> conns_entity_map_hook;
+
+  typedef bi::list< XioConnection,
+                    bi::member_hook<XioConnection, bi::list_member_hook<>,
+                                    &XioConnection::conns_hook > > ConnList;
+
+  typedef bi::member_hook<XioConnection, bi::avl_set_member_hook<>,
+                          &XioConnection::conns_entity_map_hook> EntityHook;
+
+  typedef bi::avl_set< XioConnection, EntityHook,
+                       bi::compare<EntityComp> > EntitySet;
+
+  friend class XioPortal;
+  friend class XioMessenger;
+  friend class XioDispatchHook;
+  friend class XioMarkDownHook;
+  friend class XioSend;
+
+  /* Disconnect notification: clear connected and drop queued output. */
+  int on_disconnect_event() {
+    /* the guard type follows the declared type of sp */
+    std::lock_guard<decltype(sp)> lg(sp);
+
+    connected = false;
+    discard_out_queues(CState::OP_FLAG_LOCKED);
+
+    return 0;
+  }
+
+  /* Teardown notification: destroy the Accelio connection and drop a ref. */
+  int on_teardown_event() {
+
+    {
+      std::lock_guard<decltype(sp)> lg(sp);
+
+      if (conn)
+        xio_connection_destroy(conn);
+      conn = NULL;
+    }
+
+    this->put();
+    return 0;
+  }
+
+  int xio_qdepth_high_mark() {
+    return q_high_mark;
+  }
+
+  int xio_qdepth_low_mark() {
+    return q_low_mark;
+  }
+
+public:
+  XioConnection(XioMessenger *m, XioConnection::type _type,
+                const entity_inst_t& peer);
+
+  ~XioConnection() {
+    if (conn)
+      xio_connection_destroy(conn);
+  }
+  ostream& conn_prefix(std::ostream *_dout);
+
+  bool is_connected() override { return connected; }
+
+  int send_message(Message *m) override;
+  void send_keepalive() override {send_keepalive_or_ack();}
+  void send_keepalive_or_ack(bool ack = false, const utime_t *tp = nullptr);
+  void mark_down() override;
+  int _mark_down(uint32_t flags);
+  void mark_disposable() override;
+  int _mark_disposable(uint32_t flags);
+
+  const entity_inst_t& get_peer() const { return peer; }
+
+  XioConnection* get() {
+#if 0
+    cout << "XioConnection::get " << this << " " << nref.load() << std::endl;
+#endif
+    RefCountedObject::get();
+    return this;
+  }
+
+  void put() {
+    RefCountedObject::put();
+#if 0
+    cout << "XioConnection::put " << this << " " << nref.load() << std::endl;
+#endif
+  }
+
+  void disconnect() {
+    if (is_connected()) {
+      connected = false;
+      xio_disconnect(conn); // normal teardown will clean up conn
+    }
+  }
+
+  uint32_t get_magic() { return magic; }
+  void set_magic(int _magic) { magic = _magic; }
+  uint32_t get_special_handling() { return special_handling; }
+  void set_special_handling(int n) { special_handling = n; }
+  uint64_t get_scount() { return scount; }
+
+  int passive_setup(); /* XXX */
+
+  int handle_data_msg(struct xio_session *session, struct xio_msg *msg,
+                      int more_in_batch, void *cb_user_context);
+  int on_msg(struct xio_session *session, struct xio_msg *msg,
+             int more_in_batch, void *cb_user_context);
+  int on_ow_msg_send_complete(struct xio_session *session, struct xio_msg *msg,
+                              void *conn_user_context);
+  int on_msg_error(struct xio_session *session, enum xio_status error,
+                   struct xio_msg *msg, void *conn_user_context);
+  void msg_send_fail(XioSend *xsend, int code);
+  void msg_release_fail(struct xio_msg *msg, int code);
+private:
+  void send_keepalive_or_ack_internal(bool ack = false, const utime_t *tp = nullptr);
+  int flush_out_queues(uint32_t flags);
+  int discard_out_queues(uint32_t flags);
+  int adjust_clru(uint32_t flags);
+};
+
+typedef boost::intrusive_ptr<XioConnection> XioConnectionRef;
+
+/*
+ * Connection used for messages a messenger sends to itself. It is always
+ * reported as connected; mark_down() and mark_disposable() do nothing.
+ */
+class XioLoopbackConnection : public Connection
+{
+private:
+  /* last sequence number handed out by next_seq() */
+  std::atomic<uint64_t> seq = { 0 };
+
+public:
+  explicit XioLoopbackConnection(Messenger *m) : Connection(m->cct, m)
+  {
+    /* the peer of a loopback connection is the messenger itself */
+    const entity_inst_t& self = m->get_myinst();
+    peer_addr = self.addr;
+    peer_type = self.name.type();
+    set_features(XIO_ALL_FEATURES); /* XXXX set to ours */
+  }
+
+  XioLoopbackConnection* get() {
+    return static_cast<XioLoopbackConnection*>(RefCountedObject::get());
+  }
+
+  bool is_connected() override { return true; }
+
+  int send_message(Message *m) override;
+  void send_keepalive() override;
+  void mark_down() override {}
+  void mark_disposable() override {}
+
+  /* current sequence number, without advancing it */
+  uint64_t get_seq() {
+    return seq.load();
+  }
+
+  /* advance and return the sequence number */
+  uint64_t next_seq() {
+    return ++seq;
+  }
+};
+
+typedef boost::intrusive_ptr<XioLoopbackConnection> XioLoopbackConnectionRef;
+
+#endif /* XIO_CONNECTION_H */
diff --git a/src/msg/xio/XioInSeq.h b/src/msg/xio/XioInSeq.h
new file mode 100644
index 00000000..7863a8f6
--- /dev/null
+++ b/src/msg/xio/XioInSeq.h
@@ -0,0 +1,84 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Portions Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef XIO_IN_SEQ_H
+#define XIO_IN_SEQ_H
+
+#include <boost/intrusive/list.hpp>
+#include "msg/SimplePolicyMessenger.h"
+extern "C" {
+#include "libxio.h"
+}
+
+/* Chain of inbound (Accelio-owned) xio_msg frames. The frames are linked
+ * through their own user_context field, so this is for inbound messages
+ * ONLY. */
+class XioInSeq {
+  int cnt;			/* set by set_count(); append() decrements it */
+  int sz;			/* number of frames currently chained */
+  struct xio_msg* head;
+  struct xio_msg* tail;
+
+public:
+  XioInSeq() : cnt(0), sz(0), head(nullptr), tail(nullptr) {}
+  XioInSeq(const XioInSeq& seq)
+    : cnt(seq.cnt), sz(seq.sz), head(seq.head), tail(seq.tail) {}
+
+  int count() { return cnt; }
+
+  int size() { return sz; }
+
+  /* true when at least one frame is chained */
+  bool p() { return head != nullptr; }
+
+  void set_count(int _cnt) { cnt = _cnt; }
+
+  /* link msg at the tail; msg's user_context is overwritten */
+  void append(struct xio_msg* msg) {
+    msg->user_context = nullptr;
+    if (head) {
+      tail->user_context = msg;
+    } else {
+      head = msg;
+    }
+    tail = msg;
+    sz += 1;
+    cnt -= 1;
+  }
+
+  struct xio_msg* begin() { return head; }
+
+  struct xio_msg* end() { return nullptr; }
+
+  /* step *msg to the frame linked after it */
+  void next(struct xio_msg** msg) {
+    struct xio_msg* cur = *msg;
+    *msg = static_cast<struct xio_msg *>(cur->user_context);
+  }
+
+  /* hand back the whole chain and reset this object to empty */
+  struct xio_msg* dequeue() {
+    struct xio_msg* chain = head;
+    clear();
+    return chain;
+  }
+
+  void clear() {
+    head = nullptr;
+    tail = nullptr;
+    cnt = 0;
+    sz = 0;
+  }
+};
+
+#endif /* XIO_IN_SEQ_H */
diff --git a/src/msg/xio/XioMessenger.cc b/src/msg/xio/XioMessenger.cc
new file mode 100644
index 00000000..dec7d0c7
--- /dev/null
+++ b/src/msg/xio/XioMessenger.cc
@@ -0,0 +1,1136 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Portions Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <arpa/inet.h>
+#include <boost/lexical_cast.hpp>
+#include <set>
+#include <stdlib.h>
+#include <memory>
+
+#include "XioMsg.h"
+#include "XioMessenger.h"
+#include "common/address_helper.h"
+#include "common/code_environment.h"
+#include "messages/MNop.h"
+
+#define dout_subsys ceph_subsys_xio
+#undef dout_prefix
+#define dout_prefix *_dout << "xio."
+
+/* mtx and initialized together guard the one-time setup performed by
+ * XioInit::package_init(). */
+Mutex mtx("XioMessenger Package Lock");
+std::atomic<bool> initialized = { false };
+
+/* Count of live XioMessenger objects (bumped in the ctor, dropped in the
+ * dtor). NOTE(review): defined here as atomic<unsigned>, but the in-class
+ * declaration in XioMessenger.h reads atomic<uint64_t> -- confirm which
+ * one is intended. */
+std::atomic<unsigned> XioMessenger::nInstances = { 0 };
+
+/* mempool created by package_init() and used by the pool_alloc_* helpers */
+struct xio_mempool *xio_msgr_noreg_mpool;
+
+/* session callback table, filled in by package_init() */
+static struct xio_session_ops xio_msgr_ops;
+
+/* Accelio API callouts */
+
+namespace xio_log
+{
+typedef pair<const char*, int> level_pair;
+/* One entry per log level handed to log_dout(): the tag printed in the
+ * message and the Ceph debug level at which it is emitted. */
+static const level_pair LEVELS[] = {
+  make_pair("fatal", 0),
+  make_pair("error", 0),
+  make_pair("warn", 1),
+  make_pair("info", 1),
+  make_pair("debug", 2),
+  make_pair("trace", 20)
+};
+/* element count of LEVELS (sizeof(LEVELS) alone is a byte count) */
+static const size_t NUM_LEVELS = sizeof(LEVELS) / sizeof(LEVELS[0]);
+
+static CephContext *context;
+
+/* Returns how many leading LEVELS entries are enabled for this subsystem
+ * in `context`; package_init() passes the result to Accelio as its log
+ * level. */
+int get_level()
+{
+  int level = 0;
+  /* iterate over elements, not bytes: the previous bound of sizeof(LEVELS)
+   * walked far past the end of the table */
+  for (size_t i = 0; i < NUM_LEVELS; i++) {
+    if (!ldlog_p1(context, dout_subsys, LEVELS[i].second))
+      break;
+    level++;
+  }
+  return level;
+}
+
+/* printf-style log sink registered with Accelio (XIO_OPTNAME_LOG_FN).
+ * Formats the message into a fixed 2048-byte buffer (longer output is
+ * truncated by vsnprintf) and forwards it to ldout with the file's
+ * basename, line and function prepended. Empty or failed formats are
+ * dropped. */
+void log_dout(const char *file, unsigned line,
+	      const char *function, unsigned level,
+	      const char *fmt, ...)
+{
+  char buffer[2048];
+  va_list args;
+  va_start(args, fmt);
+  int n = vsnprintf(buffer, sizeof(buffer), fmt, args);
+  va_end(args);
+
+  if (n > 0) {
+    const char *short_file = strrchr(file, '/');
+    short_file = (short_file == NULL) ? file : short_file + 1;
+
+    /* clamp so an unexpected level value cannot index past the table */
+    const size_t idx = (level < NUM_LEVELS) ? level : NUM_LEVELS - 1;
+    const level_pair &lvl = LEVELS[idx];
+    ldout(context, lvl.second) << '[' << lvl.first << "] "
+      << short_file << ':' << line << ' '
+      << function << " - " << buffer << dendl;
+  }
+}
+}
+
+/* Session-event callout: logs the event and its reason, then forwards to
+ * the XioMessenger passed as cb_user_context. */
+static int on_session_event(struct xio_session *session,
+			    struct xio_session_event_data *event_data,
+			    void *cb_user_context)
+{
+  XioMessenger *messenger = static_cast<XioMessenger*>(cb_user_context);
+  CephContext *cct = messenger->cct;
+
+  const char *event_name = xio_session_event_str(event_data->event);
+  const char *reason = xio_strerror(event_data->reason);
+  ldout(cct,4) << "session event: " << event_name
+    << ". reason: " << reason << dendl;
+
+  return messenger->session_event(session, event_data, cb_user_context);
+}
+
+/* New-session callout: logs the session and forwards it to the
+ * XioMessenger passed as cb_user_context. */
+static int on_new_session(struct xio_session *session,
+			  struct xio_new_session_req *req,
+			  void *cb_user_context)
+{
+  XioMessenger *messenger = static_cast<XioMessenger*>(cb_user_context);
+  CephContext *cct = messenger->cct;
+
+  ldout(cct,4) << "new session " << session
+    << " user_context " << cb_user_context << dendl;
+
+  const int rc = messenger->new_session(session, req, cb_user_context);
+  return rc;
+}
+
+/* Inbound-message callout: cb_user_context is the XioConnection that
+ * receives the message. When mempool tracing is on, pool statistics are
+ * dumped once every 65536 calls. */
+static int on_msg(struct xio_session *session,
+		  struct xio_msg *req,
+		  int more_in_batch,
+		  void *cb_user_context)
+{
+  XioConnection* conn __attribute__((unused)) =
+    static_cast<XioConnection*>(cb_user_context);
+  CephContext *cct = conn->get_messenger()->cct;
+
+  ldout(cct,25) << "on_msg session " << session << " xcon " << conn << dendl;
+
+  if (unlikely(XioPool::trace_mempool)) {
+    static uint32_t nreqs;
+    ++nreqs;
+    if (unlikely((nreqs % 65536) == 0)) {
+      xp_stats.dump(__func__, nreqs);
+    }
+  }
+
+  return conn->on_msg(session, req, more_in_batch, cb_user_context);
+}
+
+/* Send-complete callout for one-way messages: logs, then forwards to the
+ * XioConnection passed as conn_user_context. */
+static int on_ow_msg_send_complete(struct xio_session *session,
+				   struct xio_msg *msg,
+				   void *conn_user_context)
+{
+  XioConnection *conn = static_cast<XioConnection*>(conn_user_context);
+  CephContext *cct = conn->get_messenger()->cct;
+
+  ldout(cct,25) << "msg delivered session: " << session
+    << " msg: " << msg << " conn_user_context "
+    << conn_user_context << dendl;
+
+  const int rc = conn->on_ow_msg_send_complete(session, msg, conn_user_context);
+  return rc;
+}
+
+static int on_msg_error(struct xio_session *session,
+ enum xio_status error,
+ enum xio_msg_direction dir,
+ struct xio_msg *msg,
+ void *conn_user_context)
+{
+ /* XIO promises to flush back undelivered messages */
+ XioConnection *xcon =
+ static_cast<XioConnection*>(conn_user_context);
+ CephContext *cct = xcon->get_messenger()->cct;
+
+ ldout(cct,4) << "msg error session: " << session
+ << " error: " << xio_strerror(error) << " msg: " << msg
+ << " conn_user_context " << conn_user_context << dendl;
+
+ return xcon->on_msg_error(session, error, msg, conn_user_context);
+}
+
+/* Cancel callout: only logs; always returns 0. */
+static int on_cancel(struct xio_session *session,
+		     struct xio_msg *msg,
+		     enum xio_status result,
+		     void *conn_user_context)
+{
+  /* the connection is needed only to reach the logging context */
+  XioConnection* conn __attribute__((unused)) =
+    static_cast<XioConnection*>(conn_user_context);
+  CephContext *cct = conn->get_messenger()->cct;
+
+  ldout(cct,25) << "on cancel: session: " << session << " msg: " << msg
+    << " conn_user_context " << conn_user_context << dendl;
+
+  return 0;
+}
+
+/* Cancel-request callout: only logs; always returns 0. */
+static int on_cancel_request(struct xio_session *session,
+			     struct xio_msg *msg,
+			     void *conn_user_context)
+{
+  /* the connection is needed only to reach the logging context */
+  XioConnection* conn __attribute__((unused)) =
+    static_cast<XioConnection*>(conn_user_context);
+  CephContext *cct = conn->get_messenger()->cct;
+
+  ldout(cct,25) << "on cancel request: session: " << session << " msg: " << msg
+    << " conn_user_context " << conn_user_context << dendl;
+
+  return 0;
+}
+
+/* free functions */
+/* Builds "<scheme>://<host>[:<port>]" for addr. The scheme is `type` when
+ * it is "rdma" or "tcp", and "rdma" otherwise. Aborts on an address family
+ * other than AF_INET / AF_INET6. */
+static string xio_uri_from_entity(const string &type,
+				  const entity_addr_t& addr, bool want_port)
+{
+  char addr_buf[129];
+  const char *host = NULL;
+
+  switch(addr.get_family()) {
+  case AF_INET:
+    host = inet_ntop(AF_INET, &addr.in4_addr().sin_addr, addr_buf,
+		     INET_ADDRSTRLEN);
+    break;
+  case AF_INET6:
+    host = inet_ntop(AF_INET6, &addr.in6_addr().sin6_addr, addr_buf,
+		     INET6_ADDRSTRLEN);
+    break;
+  default:
+    abort();
+    break;
+  };
+
+  const bool known_scheme = (type == "rdma" || type == "tcp");
+  string xio_uri = known_scheme ? type : string("rdma");
+  xio_uri += "://";
+
+  /* The following can only succeed if the host is rdma-capable */
+  xio_uri += host;
+  if (want_port) {
+    xio_uri += ":";
+    xio_uri += boost::lexical_cast<std::string>(addr.get_port());
+  }
+
+  return xio_uri;
+} /* xio_uri_from_entity */
+
+/*
+ * One-time, process-wide Accelio setup. The file-scope `initialized` flag is
+ * tested once without `mtx` and again after taking it, so only the first
+ * caller does the work; later calls return immediately.
+ *
+ * In order: initializes the library, installs the log level and log
+ * function, sets library options, configures the registered mempool,
+ * creates the unregistered mempool (xio_msgr_noreg_mpool) with its slabs,
+ * and fills in the xio_msgr_ops callback table.
+ */
+void XioInit::package_init(CephContext *cct) {
+ if (! initialized) {
+
+ mtx.Lock();
+ if (! initialized) {
+
+ xio_init();
+
+ // claim a reference to the first context we see
+ xio_log::context = cct->get();
+
+ /* xopt is reused as the value buffer for each integer option below */
+ int xopt;
+ xopt = xio_log::get_level();
+ xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_LOG_LEVEL,
+ &xopt, sizeof(xopt));
+ xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_LOG_FN,
+ (const void*)xio_log::log_dout, sizeof(xio_log_fn));
+
+ xopt = 1;
+ xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_DISABLE_HUGETBL,
+ &xopt, sizeof(xopt));
+
+ /* fork-init is only requested when running as a daemon */
+ if (g_code_env == CODE_ENVIRONMENT_DAEMON) {
+ xopt = 1;
+ xio_set_opt(NULL, XIO_OPTLEVEL_RDMA, XIO_OPTNAME_ENABLE_FORK_INIT,
+ &xopt, sizeof(xopt));
+ }
+
+ /* same iovec limit for both directions */
+ xopt = XIO_MSGR_IOVLEN;
+ xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_IN_IOVLEN,
+ &xopt, sizeof(xopt));
+ xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_OUT_IOVLEN,
+ &xopt, sizeof(xopt));
+
+ /* enable flow-control */
+ xopt = 1;
+ xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_ENABLE_FLOW_CONTROL,
+ &xopt, sizeof(xopt));
+
+ /* and set threshold for buffer callouts */
+ xopt = max(cct->_conf->xio_max_send_inline, 512);
+ xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_INLINE_XIO_DATA,
+ &xopt, sizeof(xopt));
+
+ xopt = XioMsgHdr::get_max_encoded_length();
+ ldout(cct,2) << "setting accelio max header size " << xopt << dendl;
+ xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_INLINE_XIO_HEADER,
+ &xopt, sizeof(xopt));
+
+ /* six slab descriptions; the three smallest take their third field
+ * from the xio_queue_depth config option.
+ * NOTE(review): the meaning of each field is defined by Accelio's
+ * struct xio_mempool_config -- confirm against libxio.h. */
+ size_t queue_depth = cct->_conf->xio_queue_depth;
+ struct xio_mempool_config mempool_config = {
+ 6,
+ {
+ {1024, 0, queue_depth, 262144},
+ {4096, 0, queue_depth, 262144},
+ {16384, 0, queue_depth, 262144},
+ {65536, 0, 128, 65536},
+ {262144, 0, 32, 16384},
+ {1048576, 0, 8, 8192}
+ }
+ };
+ xio_set_opt(NULL,
+ XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_CONFIG_MEMPOOL,
+ &mempool_config, sizeof(mempool_config));
+
+ /* and an unregistered one */
+ #define XMSG_MEMPOOL_QUANTUM 4096
+
+ xio_msgr_noreg_mpool =
+ xio_mempool_create(-1 /* nodeid */,
+ XIO_MEMPOOL_FLAG_REGULAR_PAGES_ALLOC);
+
+ /* slabs of 64, 256, 1024 bytes and one page; return codes are ignored */
+ (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 64,
+ cct->_conf->xio_mp_min,
+ cct->_conf->xio_mp_max_64,
+ XMSG_MEMPOOL_QUANTUM, 0);
+ (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 256,
+ cct->_conf->xio_mp_min,
+ cct->_conf->xio_mp_max_256,
+ XMSG_MEMPOOL_QUANTUM, 0);
+ (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 1024,
+ cct->_conf->xio_mp_min,
+ cct->_conf->xio_mp_max_1k,
+ XMSG_MEMPOOL_QUANTUM, 0);
+ (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, getpagesize(),
+ cct->_conf->xio_mp_min,
+ cct->_conf->xio_mp_max_page,
+ XMSG_MEMPOOL_QUANTUM, 0);
+
+ /* initialize ops singleton */
+ xio_msgr_ops.on_session_event = on_session_event;
+ xio_msgr_ops.on_new_session = on_new_session;
+ xio_msgr_ops.on_session_established = NULL;
+ xio_msgr_ops.on_msg = on_msg;
+ xio_msgr_ops.on_ow_msg_send_complete = on_ow_msg_send_complete;
+ xio_msgr_ops.on_msg_error = on_msg_error;
+ xio_msgr_ops.on_cancel = on_cancel;
+ xio_msgr_ops.on_cancel_request = on_cancel_request;
+
+ /* mark initialized */
+ initialized = true;
+ }
+ mtx.Unlock();
+ }
+ }
+
+/* XioMessenger */
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, this)
+/* Log-line prefix used by dout_prefix from here on: writes
+ * "-- <legacy address of msgr> " to the stream and returns it. */
+static ostream& _prefix(std::ostream *_dout, XioMessenger *msgr) {
+ return *_dout << "-- " << msgr->get_myaddr_legacy() << " ";
+}
+
+/*
+ * Constructs a messenger. The XioInit base runs the package-wide setup,
+ * the portal set is sized from cflags (get_nportals /
+ * get_nconns_per_portal), and a loopback connection is created. The body
+ * then applies the trace-related config options, attaches this messenger
+ * to the dispatch strategy `ds`, and bumps the instance counter.
+ */
+XioMessenger::XioMessenger(CephContext *cct, entity_name_t name,
+ string mname, uint64_t _nonce,
+ uint64_t cflags, DispatchStrategy *ds)
+ : SimplePolicyMessenger(cct, name, mname, _nonce),
+ XioInit(cct),
+ portals(this, get_nportals(cflags), get_nconns_per_portal(cflags)),
+ dispatch_strategy(ds),
+ loop_con(new XioLoopbackConnection(this)),
+ special_handling(0),
+ sh_mtx("XioMessenger session mutex"),
+ sh_cond(),
+ need_addr(true),
+ did_bind(false),
+ nonce(_nonce)
+{
+
+ /* connection tracing is requested through the message magic flags */
+ if (cct->_conf->xio_trace_xcon)
+ magic |= MSG_MAGIC_TRACE_XCON;
+
+ /* these two are static XioPool switches, so the last constructed
+ * messenger's config wins */
+ XioPool::trace_mempool = (cct->_conf->xio_trace_mempool);
+ XioPool::trace_msgcnt = (cct->_conf->xio_trace_msgcnt);
+
+ dispatch_strategy->set_messenger(this);
+
+ /* update class instance count */
+ nInstances++;
+
+ /* overrides the feature set chosen by the loopback connection's ctor */
+ loop_con->set_features(CEPH_FEATURES_ALL);
+
+ ldout(cct,2) << "Create msgr: " << this << " instance: "
+ << nInstances << " type: " << name.type_str()
+ << " subtype: " << mname << " nportals: " << get_nportals(cflags)
+ << " nconns_per_portal: " << get_nconns_per_portal(cflags)
+ << dendl;
+
+} /* ctor */
+
+/* Adds a slab of dsize-byte buffers to the unregistered mempool. Sizes
+ * above 1 MiB are ignored and report 0. */
+int XioMessenger::pool_hint(uint32_t dsize) {
+  const bool too_large = dsize > 1024*1024;
+  if (too_large) {
+    return 0;
+  }
+
+  /* if dsize is already present, returns -EEXIST */
+  const int rc = xio_mempool_add_slab(xio_msgr_noreg_mpool, dsize, 0,
+				      cct->_conf->xio_mp_max_hint,
+				      XMSG_MEMPOOL_QUANTUM, 0);
+  return rc;
+}
+
+/* Connections per portal for the given creation flags: at least 8, raised
+ * by config for HAS_MANY_CONNECTIONS or (failing that) HEARTBEAT
+ * messengers. */
+int XioMessenger::get_nconns_per_portal(uint64_t cflags)
+{
+  const int XIO_DEFAULT_NUM_CONNS_PER_PORTAL = 8;
+
+  if (cflags & Messenger::HAS_MANY_CONNECTIONS) {
+    int nconns = max(cct->_conf->xio_max_conns_per_portal, XIO_DEFAULT_NUM_CONNS_PER_PORTAL);
+    return nconns;
+  }
+  if (cflags & Messenger::HEARTBEAT) {
+    int nconns = max(cct->_conf->osd_heartbeat_min_peers * 4, XIO_DEFAULT_NUM_CONNS_PER_PORTAL);
+    return nconns;
+  }
+  return XIO_DEFAULT_NUM_CONNS_PER_PORTAL;
+}
+
+/* Number of portals for the given creation flags: one, unless the
+ * messenger is flagged HAS_HEAVY_TRAFFIC, in which case the
+ * xio_portal_threads config value is used (never less than one). */
+int XioMessenger::get_nportals(uint64_t cflags)
+{
+  if (!(cflags & Messenger::HAS_HEAVY_TRAFFIC))
+    return 1;
+
+  int nportals = max(cct->_conf->xio_portal_threads, 1);
+  return nportals;
+}
+
+/* Adopts peer_addr_for_me as this messenger's own address (keeping the
+ * current port) the first time it is called while need_addr is still set. */
+void XioMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
+{
+  // be careful here: multiple threads may block here, and readers of
+  // my_inst.addr do NOT hold any lock.
+
+  // this always goes from true -> false under the protection of the
+  // mutex.  if it is already false, we need not retake the mutex at
+  // all.
+  if (!need_addr)
+    return;
+
+  Mutex::Locker guard(sh_mtx);
+  if (!need_addr)
+    return;
+
+  entity_addr_t learned = peer_addr_for_me;
+  learned.set_port(my_inst.addr.get_port());
+  my_inst.addr.set_sockaddr(learned.get_sockaddr());
+  ldout(cct,2) << "learned my addr " << my_inst.addr << dendl;
+  need_addr = false;
+  // init_local_connection();
+}
+
+/* Handles an incoming session request: rejects it once shutdown has been
+ * called, otherwise lets the portals accept it and counts the session when
+ * that succeeds. */
+int XioMessenger::new_session(struct xio_session *session,
+			      struct xio_new_session_req *req,
+			      void *cb_user_context)
+{
+  if (shutdown_called) {
+    return xio_reject(
+      session, XIO_E_SESSION_REFUSED, NULL /* udata */, 0 /* udata len */);
+  }
+
+  int code = portals.accept(session, req, cb_user_context);
+  if (code == 0) {
+    nsessions++;
+  }
+  return code;
+} /* new_session */
+
+/*
+ * Dispatches one Accelio session event. Always returns 0.
+ *
+ * - CONNECTION_ESTABLISHED: learns the local address from the connection
+ *   and delivers the connect notifications for the XioConnection stored in
+ *   conn_user_context.
+ * - NEW_CONNECTION: creates a PASSIVE XioConnection, attaches it to the
+ *   connection and its portal, and links it on conns_list.
+ * - ERROR / CLOSED / DISCONNECTED / REFUSED: unregisters the connection and
+ *   runs its disconnect handler.
+ * - CONNECTION_TEARDOWN: unregisters the connection and runs its teardown
+ *   handler.
+ * - SESSION_TEARDOWN: destroys the session and wakes a waiter on sh_cond
+ *   when the session count reaches zero.
+ */
+int XioMessenger::session_event(struct xio_session *session,
+ struct xio_session_event_data *event_data,
+ void *cb_user_context)
+{
+ XioConnection *xcon;
+
+ switch (event_data->event) {
+ case XIO_SESSION_CONNECTION_ESTABLISHED_EVENT:
+ {
+ struct xio_connection *conn = event_data->conn;
+ struct xio_connection_attr xcona;
+ entity_addr_t peer_addr_for_me, paddr;
+
+ xcon = static_cast<XioConnection*>(event_data->conn_user_context);
+
+ ldout(cct,2) << "connection established " << event_data->conn
+ << " session " << session << " xcon " << xcon << dendl;
+
+ /* query result is not checked; xcona is used as filled in */
+ (void) xio_query_connection(conn, &xcona,
+ XIO_CONNECTION_ATTR_LOCAL_ADDR|
+ XIO_CONNECTION_ATTR_PEER_ADDR);
+ peer_addr_for_me.set_sockaddr((struct sockaddr *)&xcona.local_addr);
+ paddr.set_sockaddr((struct sockaddr *)&xcona.peer_addr);
+ //set_myaddr(peer_addr_for_me);
+ learned_addr(peer_addr_for_me);
+ ldout(cct,2) << "client: connected from " << peer_addr_for_me << " to " << paddr << dendl;
+
+ /* notify hook */
+ this->ms_deliver_handle_connect(xcon);
+ this->ms_deliver_handle_fast_connect(xcon);
+ }
+ break;
+
+ case XIO_SESSION_NEW_CONNECTION_EVENT:
+ {
+ struct xio_connection *conn = event_data->conn;
+ struct xio_connection_attr xcona;
+ entity_inst_t s_inst;
+ entity_addr_t peer_addr_for_me;
+
+ (void) xio_query_connection(conn, &xcona,
+ XIO_CONNECTION_ATTR_CTX|
+ XIO_CONNECTION_ATTR_PEER_ADDR|
+ XIO_CONNECTION_ATTR_LOCAL_ADDR);
+ /* XXX assumes RDMA */
+ s_inst.addr.set_sockaddr((struct sockaddr *)&xcona.peer_addr);
+ peer_addr_for_me.set_sockaddr((struct sockaddr *)&xcona.local_addr);
+
+ xcon = new XioConnection(this, XioConnection::PASSIVE, s_inst);
+ xcon->session = session;
+
+ /* the portal is recovered from the user context of the Accelio
+ * context that owns this connection */
+ struct xio_context_attr xctxa;
+ (void) xio_query_context(xcona.ctx, &xctxa, XIO_CONTEXT_ATTR_USER_CTX);
+
+ xcon->conn = conn;
+ xcon->portal = static_cast<XioPortal*>(xctxa.user_context);
+ ceph_assert(xcon->portal);
+
+ /* later callbacks for this connection receive xcon as user context */
+ xcona.user_context = xcon;
+ (void) xio_modify_connection(conn, &xcona, XIO_CONNECTION_ATTR_USER_CTX);
+
+ xcon->connected = true;
+
+ /* sentinel ref */
+ xcon->get(); /* xcon->nref == 1 */
+ conns_sp.lock();
+ conns_list.push_back(*xcon);
+ /* XXX we can't put xcon in conns_entity_map because we don't yet know
+ * its peer address */
+ conns_sp.unlock();
+
+ /* XXXX pre-merge of session startup negotiation ONLY! */
+ xcon->cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
+
+ ldout(cct,2) << "New connection session " << session
+ << " xcon " << xcon << " on msgr: " << this << " portal: " << xcon->portal << dendl;
+ ldout(cct,2) << "Server: connected from " << s_inst.addr << " to " << peer_addr_for_me << dendl;
+ }
+ break;
+ case XIO_SESSION_CONNECTION_ERROR_EVENT:
+ case XIO_SESSION_CONNECTION_CLOSED_EVENT: /* orderly discon */
+ case XIO_SESSION_CONNECTION_DISCONNECTED_EVENT: /* unexpected discon */
+ case XIO_SESSION_CONNECTION_REFUSED_EVENT:
+ xcon = static_cast<XioConnection*>(event_data->conn_user_context);
+ ldout(cct,2) << xio_session_event_str(event_data->event)
+ << " xcon " << xcon << " session " << session << dendl;
+ if (likely(!!xcon)) {
+ unregister_xcon(xcon);
+ xcon->on_disconnect_event();
+ }
+ break;
+ case XIO_SESSION_CONNECTION_TEARDOWN_EVENT:
+ xcon = static_cast<XioConnection*>(event_data->conn_user_context);
+ ldout(cct,2) << xio_session_event_str(event_data->event)
+ << " xcon " << xcon << " session " << session << dendl;
+ /*
+ * There are flows where Accelio sends teardown event without going
+ * through disconnect event. so we make sure we cleaned the connection.
+ */
+ /* NOTE(review): unlike the disconnect cases above, xcon is not checked
+ * for NULL here -- confirm it can never be NULL on teardown. */
+ unregister_xcon(xcon);
+ xcon->on_teardown_event();
+ break;
+ case XIO_SESSION_TEARDOWN_EVENT:
+ ldout(cct,2) << xio_session_event_str(event_data->event)
+ << " session " << session << dendl;
+ if (unlikely(XioPool::trace_mempool)) {
+ xp_stats.dump("xio session dtor", reinterpret_cast<uint64_t>(session));
+ }
+ xio_session_destroy(session);
+ /* last session gone: wake shutdown(), which waits on sh_cond */
+ if (--nsessions == 0) {
+ Mutex::Locker lck(sh_mtx);
+ if (nsessions == 0)
+ sh_cond.Signal();
+ }
+ break;
+ default:
+ break;
+ };
+
+ return 0;
+}
+
+/* Which part of a Message a buffer list passed to xio_place_buffers()
+ * belongs to. */
+enum bl_type
+{
+ BUFFER_PAYLOAD,
+ BUFFER_MIDDLE,
+ BUFFER_DATA
+};
+
+/* Byte limit per request used when splitting buffers:
+ * 1044480 = 1020 * 1024. */
+#define MAX_XIO_BUF_SIZE 1044480
+
+/*
+ * Counts the iovec entries needed to carry `bl`, splitting a buffer
+ * wherever a request would exceed MAX_XIO_BUF_SIZE bytes. Returns the
+ * number of entries counted for this list.
+ *
+ * req_size, msg_off and req_off are running state shared across successive
+ * calls: bytes and entries accumulated in the current request, and the
+ * number of requests already filled. A request is considered full when it
+ * reaches XIO_MSGR_IOVLEN entries or MAX_XIO_BUF_SIZE bytes.
+ */
+static inline int
+xio_count_buffers(const buffer::list& bl, int& req_size, int& msg_off, int& req_off)
+{
+
+ const std::list<buffer::ptr>& buffers = bl.buffers();
+ list<bufferptr>::const_iterator pb;
+ size_t size, off;
+ int result;
+ int first = 1;
+
+ off = size = 0;
+ result = 0;
+ for (;;) {
+ /* current buffer exhausted (or none yet): move to the next one */
+ if (off >= size) {
+ if (first) pb = buffers.begin(); else ++pb;
+ if (pb == buffers.end()) {
+ break;
+ }
+ off = 0;
+ size = pb->length();
+ first = 0;
+ }
+ size_t count = size - off;
+ /* zero-length buffer: nothing to count, advance on the next pass */
+ if (!count) continue;
+ /* trim the piece so the current request stays within the byte limit */
+ if (req_size + count > MAX_XIO_BUF_SIZE) {
+ count = MAX_XIO_BUF_SIZE - req_size;
+ }
+
+ ++result;
+
+ /* advance iov and perhaps request */
+
+ off += count;
+ req_size += count;
+ ++msg_off;
+ if (unlikely(msg_off >= XIO_MSGR_IOVLEN || req_size >= MAX_XIO_BUF_SIZE)) {
+ ++req_off;
+ msg_off = 0;
+ req_size = 0;
+ }
+ }
+
+ return result;
+}
+
+/*
+ * Fills the outgoing scatter/gather entries of xmsg's requests with the
+ * buffers of `bl`, using the same splitting rules as xio_count_buffers().
+ *
+ * req / msg_iov point at the request currently being filled and its sglist;
+ * req_size, msg_off and req_off are running state shared across successive
+ * calls. When a request fills up its nents is set and the cursor moves to
+ * xmsg->req_arr[req_off]; once req_off reaches ex_cnt, req and msg_iov are
+ * set to NULL. `type` currently has no effect: every value takes the same
+ * branch of the switch.
+ */
+static inline void
+xio_place_buffers(const buffer::list& bl, XioMsg *xmsg, struct xio_msg*& req,
+ struct xio_iovec_ex*& msg_iov, int& req_size,
+ int ex_cnt, int& msg_off, int& req_off, bl_type type)
+{
+
+ const std::list<buffer::ptr>& buffers = bl.buffers();
+ list<bufferptr>::const_iterator pb;
+ struct xio_iovec_ex* iov;
+ size_t size, off;
+ const char *data = NULL;
+ int first = 1;
+
+ off = size = 0;
+ for (;;) {
+ /* current buffer exhausted (or none yet): move to the next one */
+ if (off >= size) {
+ if (first) pb = buffers.begin(); else ++pb;
+ if (pb == buffers.end()) {
+ break;
+ }
+ off = 0;
+ size = pb->length();
+ data = pb->c_str(); // is c_str() efficient?
+ first = 0;
+ }
+ size_t count = size - off;
+ if (!count) continue;
+ /* trim the piece so the current request stays within the byte limit */
+ if (req_size + count > MAX_XIO_BUF_SIZE) {
+ count = MAX_XIO_BUF_SIZE - req_size;
+ }
+
+ /* assign buffer */
+ iov = &msg_iov[msg_off];
+ iov->iov_base = (void *) (&data[off]);
+ iov->iov_len = count;
+
+ switch (type) {
+ case BUFFER_DATA:
+ //break;
+ default:
+ {
+ /* use the buffer's memory region when get_xio_mp() finds one */
+ struct xio_reg_mem *mp = get_xio_mp(*pb);
+ iov->mr = (mp) ? mp->mr : NULL;
+ }
+ break;
+ }
+
+ /* advance iov(s) */
+
+ off += count;
+ req_size += count;
+ ++msg_off;
+
+ /* next request if necessary */
+
+ if (unlikely(msg_off >= XIO_MSGR_IOVLEN || req_size >= MAX_XIO_BUF_SIZE)) {
+ /* finish this request */
+ req->out.pdata_iov.nents = msg_off;
+ /* advance to next, and write in it if it's not the last one. */
+ if (++req_off >= ex_cnt) {
+ req = 0; /* poison. trap if we try to use it. */
+ msg_iov = NULL;
+ } else {
+ req = &xmsg->req_arr[req_off].msg;
+ msg_iov = req->out.pdata_iov.sglist;
+ }
+ msg_off = 0;
+ req_size = 0;
+ }
+ }
+}
+
+/* Binds the portals to addr. A blank IP is refused with -1. On success the
+ * port reported by the portals and this messenger's nonce are recorded in
+ * the messenger's own address. Returns the portals' bind result. */
+int XioMessenger::bind(const entity_addr_t& addr)
+{
+  if (addr.is_blank_ip()) {
+    lderr(cct) << "ERROR: need rdma ip for remote use! " << dendl;
+    cout << "Error: xio bind failed. public/cluster ip not specified" << std::endl;
+    return -1;
+  }
+
+  entity_addr_t bound_addr = addr;
+  string base_uri = xio_uri_from_entity(cct->_conf->xio_transport_type,
+					bound_addr, false /* want_port */);
+  ldout(cct,4) << "XioMessenger " << this << " bind: xio_uri "
+    << base_uri << ':' << bound_addr.get_port() << dendl;
+
+  uint16_t port0;
+  int r = portals.bind(&xio_msgr_ops, base_uri, bound_addr.get_port(), &port0);
+  if (r != 0)
+    return r;
+
+  bound_addr.set_port(port0);
+  bound_addr.nonce = nonce;
+  set_myaddr(bound_addr);
+  need_addr = false;
+  did_bind = true;
+  return r;
+} /* bind */
+
+/* Rebinding is not implemented: the attempt is logged, avoid_ports is
+ * ignored, and 0 is returned. */
+int XioMessenger::rebind(const set<int>& avoid_ports)
+{
+  ldout(cct,4) << "XioMessenger " << this << " rebind attempt" << dendl;
+
+  const int rc = 0;
+  return rc;
+} /* rebind */
+
+/* Starts the portals and the dispatch strategy and marks the messenger
+ * started. An unbound messenger gets its nonce stamped into its address
+ * here, since bind() did not do it. Always returns 0. */
+int XioMessenger::start()
+{
+  portals.start();
+  dispatch_strategy->start();
+
+  const bool unbound = !did_bind;
+  if (unbound)
+    my_inst.addr.nonce = nonce;
+
+  started = true;
+  return 0;
+}
+
+/* Joins the portals, then waits on the dispatch strategy. */
+void XioMessenger::wait()
+{
+  /* portals first, dispatch strategy second */
+  portals.join();
+  dispatch_strategy->wait();
+} /* wait */
+
+/* Sends m to dest over the connection returned by get_connection().
+ * Returns EINVAL (positive) when no connection is available. */
+int XioMessenger::_send_message(Message *m, const entity_inst_t& dest)
+{
+  ConnectionRef conn = get_connection(dest);
+  if (!conn)
+    return EINVAL;
+
+  Connection *raw = &(*conn);
+  return _send_message(m, raw);
+} /* send_message(Message *, const entity_inst_t&) */
+
+/* Allocates storage for an XioMsg from the unregistered mempool and
+ * constructs it in place. Returns NULL when the pool allocation fails. */
+static inline XioMsg* pool_alloc_xio_msg(Message *m, XioConnection *xcon,
+					 int ex_cnt)
+{
+  struct xio_reg_mem mp_mem;
+  const int err = xpool_alloc(xio_msgr_noreg_mpool, sizeof(XioMsg), &mp_mem);
+  if (err != 0)
+    return NULL;
+
+  XioMsg *frame = reinterpret_cast<XioMsg*>(mp_mem.addr);
+  ceph_assert(!!frame);
+  /* placement-new into the pool buffer */
+  new (frame) XioMsg(m, xcon, mp_mem, ex_cnt, CEPH_FEATURES_ALL);
+  return frame;
+}
+
+/* Allocates storage for an XioCommand from the unregistered mempool and
+ * constructs it in place. Returns NULL when the pool allocation fails. */
+XioCommand* pool_alloc_xio_command(XioConnection *xcon)
+{
+  struct xio_reg_mem mp_mem;
+  const int err = xpool_alloc(xio_msgr_noreg_mpool, sizeof(XioCommand), &mp_mem);
+  if (err != 0)
+    return NULL;
+
+  XioCommand *cmd = reinterpret_cast<XioCommand*>(mp_mem.addr);
+  ceph_assert(!!cmd);
+  /* placement-new into the pool buffer */
+  new (cmd) XioCommand(xcon, mp_mem);
+  return cmd;
+}
+
+/*
+ * Sends m on con.
+ *
+ * Messages for the loopback connection are stamped with this messenger as
+ * source and the next loopback sequence number, then dispatched locally.
+ * For any other connection: if its session is not UP the message is queued
+ * on the connection's outgoing queue (re-checked under the connection's
+ * spinlock) and 0 is returned; otherwise it goes to _send_message_impl().
+ */
+int XioMessenger::_send_message(Message *m, Connection *con)
+{
+  if (con == loop_con.get() /* intrusive_ptr get() */) {
+    m->set_connection(con);
+    m->set_src(get_myinst().name);
+    m->set_seq(loop_con->next_seq());
+    ds_dispatch(m);
+    return 0;
+  }
+
+  XioConnection *xcon = static_cast<XioConnection*>(con);
+
+  /* If con is not in READY state, we have to enforce policy */
+  if (xcon->cstate.session_state.read() != XioConnection::UP) {
+    /* the template argument list was missing its closing '>' here */
+    std::lock_guard<decltype(xcon->sp)> lg(xcon->sp);
+
+    /* state may have changed while we waited for the lock */
+    if (xcon->cstate.session_state.read() != XioConnection::UP) {
+      xcon->outgoing.mqueue.push_back(*m);
+      return 0;
+    }
+  }
+
+  return _send_message_impl(m, xcon);
+} /* send_message(Message* m, Connection *con) */
+
+/*
+ * Encodes m and queues it on xcon's portal for sending. Runs with
+ * xcon->lock held for the whole call.
+ *
+ * The payload, middle and data buffer lists are first counted to learn how
+ * many extra requests are needed (ex_cnt), then an XioMsg frame is taken
+ * from the pool and the buffers are placed into its request(s). Returns 0
+ * once the frame is enqueued, or ENOMEM (positive) when no frame could be
+ * allocated.
+ */
+int XioMessenger::_send_message_impl(Message* m, XioConnection* xcon)
+{
+ int code = 0;
+
+ Mutex::Locker l(xcon->lock);
+ /* with mempool tracing on, dump pool stats once every 65536 sends */
+ if (unlikely(XioPool::trace_mempool)) {
+ static uint32_t nreqs;
+ if (unlikely((++nreqs % 65536) == 0)) {
+ xp_stats.dump(__func__, nreqs);
+ }
+ }
+
+ m->set_seq(xcon->state.next_out_seq());
+ m->set_magic(magic); // trace flags and special handling
+
+ m->encode(xcon->get_features(), this->crcflags);
+
+ buffer::list &payload = m->get_payload();
+ buffer::list &middle = m->get_middle();
+ buffer::list &data = m->get_data();
+
+ /* first pass: count entries; req_off ends up as the number of requests
+ * that were filled */
+ int msg_off = 0;
+ int req_off = 0;
+ int req_size = 0;
+ int nbuffers =
+ xio_count_buffers(payload, req_size, msg_off, req_off) +
+ xio_count_buffers(middle, req_size, msg_off, req_off) +
+ xio_count_buffers(data, req_size, msg_off, req_off);
+
+ int ex_cnt = req_off;
+ if (msg_off == 0 && ex_cnt > 0) {
+ // no buffers for last msg
+ ldout(cct,10) << "msg_off 0, ex_cnt " << ex_cnt << " -> " << ex_cnt-1 << dendl;
+ ex_cnt--;
+ }
+
+ /* get an XioMsg frame */
+ XioMsg *xmsg = pool_alloc_xio_msg(m, xcon, ex_cnt);
+ if (! xmsg) {
+ /* could happen if Accelio has been shutdown */
+ return ENOMEM;
+ }
+
+ ldout(cct,4) << __func__ << " " << m << " new XioMsg " << xmsg
+ << " tag " << (int)xmsg->hdr.tag
+ << " req_0 " << xmsg->get_xio_msg() << " msg type " << m->get_type()
+ << " features: " << xcon->get_features()
+ << " conn " << xcon->conn << " sess " << xcon->session << dendl;
+
+ /* debugging aid: hexdump the payload of type-43 messages to stdout */
+ if (magic & (MSG_MAGIC_XIO)) {
+
+ /* XXXX verify */
+ switch (m->get_type()) {
+ case 43:
+ // case 15:
+ ldout(cct,4) << __func__ << " stop 43 " << m->get_type() << " " << *m << dendl;
+ buffer::list &payload = m->get_payload();
+ ldout(cct,4) << __func__ << " payload dump:" << dendl;
+ payload.hexdump(cout);
+ }
+ }
+
+ struct xio_msg *req = xmsg->get_xio_msg();
+ struct xio_iovec_ex *msg_iov = req->out.pdata_iov.sglist;
+
+ if (magic & (MSG_MAGIC_XIO)) {
+ ldout(cct,4) << "payload: " << payload.buffers().size() <<
+ " middle: " << middle.buffers().size() <<
+ " data: " << data.buffers().size() <<
+ dendl;
+ }
+
+ if (unlikely(ex_cnt > 0)) {
+ ldout(cct,4) << __func__ << " buffer cnt > XIO_MSGR_IOVLEN (" <<
+ ((XIO_MSGR_IOVLEN-1) + nbuffers) << ")" << dendl;
+ }
+
+ /* do the invariant part */
+ /* second pass: reset the running state and place the buffers */
+ msg_off = 0;
+ req_off = -1; /* most often, not used */
+ req_size = 0;
+
+ xio_place_buffers(payload, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
+ req_off, BUFFER_PAYLOAD);
+
+ xio_place_buffers(middle, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
+ req_off, BUFFER_MIDDLE);
+
+ xio_place_buffers(data, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
+ req_off, BUFFER_DATA);
+ ldout(cct,10) << "ex_cnt " << ex_cnt << ", req_off " << req_off
+ << ", msg_cnt " << xmsg->get_msg_count() << dendl;
+
+ /* finalize request */
+ if (msg_off)
+ req->out.pdata_iov.nents = msg_off;
+
+ /* fixup first msg */
+ req = xmsg->get_xio_msg();
+
+ /* the encoded header must be a single contiguous buffer */
+ const std::list<buffer::ptr>& header = xmsg->hdr.get_bl().buffers();
+ ceph_assert(header.size() == 1); /* XXX */
+ list<bufferptr>::const_iterator pb = header.begin();
+ req->out.header.iov_base = (char*) pb->c_str();
+ req->out.header.iov_len = pb->length();
+
+ /* deliver via xio, preserve ordering */
+ /* chain the first request and the extra requests through ->next */
+ if (xmsg->get_msg_count() > 1) {
+ struct xio_msg *head = xmsg->get_xio_msg();
+ struct xio_msg *tail = head;
+ for (req_off = 0; ((unsigned) req_off) < xmsg->get_msg_count()-1; ++req_off) {
+ req = &xmsg->req_arr[req_off].msg;
+assert(!req->in.pdata_iov.nents);
+assert(req->out.pdata_iov.nents || !nbuffers);
+ tail->next = req;
+ tail = req;
+ }
+ tail->next = NULL;
+ }
+ xmsg->trace = m->trace;
+ m->trace.event("xio portal enqueue for send");
+ m->trace.keyval("xio message segments", xmsg->hdr.msg_cnt);
+ xcon->portal->enqueue_for_send(xcon, xmsg);
+
+ return code;
+} /* send_message(Message *, Connection *) */
+
+/* Shuts the messenger down: disconnects every listed connection, waits
+ * until the session count has dropped to zero, then shuts down the portals
+ * and the dispatch strategy. Always returns 0. */
+int XioMessenger::shutdown()
+{
+  shutdown_called = true;
+
+  conns_sp.lock();
+  for (XioConnection::ConnList::iterator it = conns_list.begin();
+       it != conns_list.end(); ++it) {
+    (void) it->disconnect(); // XXX mark down?
+  }
+  conns_sp.unlock();
+
+  /* session_event() signals sh_cond when the last session is torn down */
+  while (nsessions > 0) {
+    Mutex::Locker lck(sh_mtx);
+    if (nsessions > 0) {
+      sh_cond.Wait(sh_mtx);
+    }
+  }
+
+  portals.shutdown();
+  dispatch_strategy->shutdown();
+  did_bind = false;
+  started = false;
+  return 0;
+} /* shutdown */
+
+/*
+ * Returns a connection to dest, creating one if necessary.
+ *
+ * Returns NULL after shutdown has been called, the loopback connection
+ * when dest is this messenger, or an existing entry from conns_entity_map.
+ * Otherwise an ACTIVE XioConnection is created, a client session is opened
+ * and connected, and the connection is registered in conns_list and
+ * conns_entity_map. Returns NULL when session creation or connect fails.
+ */
+ConnectionRef XioMessenger::get_connection(const entity_inst_t& dest)
+{
+ if (shutdown_called)
+ return NULL;
+
+ const entity_inst_t& self_inst = get_myinst();
+ if ((&dest == &self_inst) ||
+ (dest == self_inst)) {
+ return get_loopback_connection();
+ }
+
+ conns_sp.lock();
+ XioConnection::EntitySet::iterator conn_iter =
+ conns_entity_map.find(dest, XioConnection::EntityComp());
+ if (conn_iter != conns_entity_map.end()) {
+ ConnectionRef cref = &(*conn_iter);
+ conns_sp.unlock();
+ return cref;
+ }
+ else {
+ /* the lock is dropped while the new connection is being set up.
+ * NOTE(review): two concurrent callers for the same dest could both
+ * reach this branch -- confirm whether that matters. */
+ conns_sp.unlock();
+ string xio_uri = xio_uri_from_entity(cct->_conf->xio_transport_type,
+ dest.addr, true /* want_port */);
+
+ ldout(cct,4) << "XioMessenger " << this << " get_connection: xio_uri "
+ << xio_uri << dendl;
+
+ /* XXX client session creation parameters */
+ struct xio_session_params params = {};
+ params.type = XIO_SESSION_CLIENT;
+ params.ses_ops = &xio_msgr_ops;
+ params.user_context = this;
+ params.uri = xio_uri.c_str();
+
+ XioConnection *xcon = new XioConnection(this, XioConnection::ACTIVE,
+ dest);
+
+ xcon->session = xio_session_create(&params);
+ if (! xcon->session) {
+ delete xcon;
+ return NULL;
+ }
+
+ /* this should cause callbacks with user context of conn, but
+ * we can always set it explicitly */
+ struct xio_connection_params xcp = {};
+ xcp.session = xcon->session;
+ xcp.ctx = xcon->portal->ctx;
+ xcp.conn_user_context = xcon;
+
+ xcon->conn = xio_connect(&xcp);
+ if (!xcon->conn) {
+ xio_session_destroy(xcon->session);
+ delete xcon;
+ return NULL;
+ }
+
+ nsessions++;
+ xcon->connected = true;
+
+ /* sentinel ref */
+ xcon->get(); /* xcon->nref == 1 */
+ conns_sp.lock();
+ conns_list.push_back(*xcon);
+ conns_entity_map.insert(*xcon);
+ conns_sp.unlock();
+
+ /* XXXX pre-merge of session startup negotiation ONLY! */
+ xcon->cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
+
+ ldout(cct,2) << "New connection xcon: " << xcon <<
+ " up_ready on session " << xcon->session <<
+ " on msgr: " << this << " portal: " << xcon->portal << dendl;
+
+ return xcon->get(); /* nref +1 */
+ }
+} /* get_connection */
+
+/* Returns the messenger's loopback connection. */
+ConnectionRef XioMessenger::get_loopback_connection()
+{
+  /* intrusive_ptr get(): hand out the raw pointer, wrapped on return */
+  return loop_con.get();
+} /* get_loopback_connection */
+
+/* Removes xcon from conns_entity_map (only if the entry found for its peer
+ * is xcon itself) and from conns_list (only if it is linked), under
+ * conns_sp. */
+void XioMessenger::unregister_xcon(XioConnection *xcon)
+{
+  std::lock_guard<decltype(conns_sp)> lckr(conns_sp);
+
+  XioConnection::EntitySet::iterator pos =
+    conns_entity_map.find(xcon->peer, XioConnection::EntityComp());
+  if (pos != conns_entity_map.end() && &(*pos) == xcon) {
+    conns_entity_map.erase(pos);
+  }
+
+  /* nothing more to do unless xcon is still on conns_list */
+  if (!xcon->conns_hook.is_linked())
+    return;
+
+  /* now find xcon on conns_list and erase */
+  conns_list.erase(XioConnection::ConnList::s_iterator_to(*xcon));
+}
+
+/* Marks down the connection registered for addr, if there is one. The
+ * lookup key is built with a default entity name. */
+void XioMessenger::mark_down(const entity_addr_t& addr)
+{
+  entity_inst_t inst(entity_name_t(), addr);
+  std::lock_guard<decltype(conns_sp)> lckr(conns_sp);
+
+  XioConnection::EntitySet::iterator pos =
+    conns_entity_map.find(inst, XioConnection::EntityComp());
+  if (pos == conns_entity_map.end())
+    return;
+
+  pos->_mark_down(XioConnection::CState::OP_FLAG_NONE);
+} /* mark_down(const entity_addr_t& */
+
+/* Marks down con, which is treated as an XioConnection. */
+void XioMessenger::mark_down(Connection* con)
+{
+  static_cast<XioConnection*>(con)->_mark_down(
+    XioConnection::CState::OP_FLAG_NONE);
+} /* mark_down(Connection*) */
+
+/* Marks down every connection in conns_entity_map, under conns_sp. */
+void XioMessenger::mark_down_all()
+{
+  std::lock_guard<decltype(conns_sp)> lckr(conns_sp);
+  XioConnection::EntitySet::iterator conn_iter;
+  /* iterate up to end(): the loop used to compare against begin(), so it
+   * terminated immediately and never marked anything down */
+  for (conn_iter = conns_entity_map.begin(); conn_iter !=
+	 conns_entity_map.end(); ++conn_iter) {
+    (*conn_iter)._mark_down(XioConnection::CState::OP_FLAG_NONE);
+  }
+} /* mark_down_all */
+
+/* Allocates storage for an XioMarkDownHook from the unregistered mempool
+ * and constructs it in place. Returns NULL when the pool allocation
+ * fails. */
+static inline XioMarkDownHook* pool_alloc_markdown_hook(
+  XioConnection *xcon, Message *m)
+{
+  struct xio_reg_mem mp_mem;
+  const int err = xio_mempool_alloc(xio_msgr_noreg_mpool,
+				    sizeof(XioMarkDownHook), &mp_mem);
+  if (err != 0)
+    return NULL;
+
+  XioMarkDownHook *md_hook = static_cast<XioMarkDownHook*>(mp_mem.addr);
+  /* placement-new into the pool buffer */
+  new (md_hook) XioMarkDownHook(xcon, m, mp_mem);
+  return md_hook;
+}
+
+/* Sends a markdown-tagged MNop on con, with a markdown hook as its
+ * completion hook, after switching the connection's session state to
+ * BARRIER. The send result is ignored. */
+void XioMessenger::mark_down_on_empty(Connection* con)
+{
+  XioConnection *xcon = static_cast<XioConnection*>(con);
+
+  MNop* nop = new MNop();
+  nop->tag = XIO_NOP_TAG_MARKDOWN;
+  XioMarkDownHook *md_hook = pool_alloc_markdown_hook(xcon, nop);
+  nop->set_completion_hook(md_hook);
+
+  // stall new messages
+  xcon->cstate.session_state = XioConnection::session_states::BARRIER;
+  (void) _send_message_impl(nop, xcon);
+}
+
+/* Marks con, treated as an XioConnection, as disposable. */
+void XioMessenger::mark_disposable(Connection *con)
+{
+  static_cast<XioConnection*>(con)->_mark_disposable(
+    XioConnection::CState::OP_FLAG_NONE);
+}
+
+/* Inserts xcon into conns_entity_map under conns_sp. */
+void XioMessenger::try_insert(XioConnection *xcon)
+{
+  std::lock_guard<decltype(conns_sp)> guard(conns_sp);
+
+  /* already resident in conns_list */
+  conns_entity_map.insert(*xcon);
+}
+
+/* Deletes the dispatch strategy and drops the instance count. */
+XioMessenger::~XioMessenger()
+{
+  delete dispatch_strategy;
+  --nInstances;
+} /* dtor */
diff --git a/src/msg/xio/XioMessenger.h b/src/msg/xio/XioMessenger.h
new file mode 100644
index 00000000..6f8a67ba
--- /dev/null
+++ b/src/msg/xio/XioMessenger.h
@@ -0,0 +1,176 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Portions Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef XIO_MESSENGER_H
+#define XIO_MESSENGER_H
+
+#include "msg/SimplePolicyMessenger.h"
+
+#include <atomic>
+
+extern "C" {
+#include "libxio.h"
+}
+
+#include "XioConnection.h"
+#include "XioPortal.h"
+#include "QueueStrategy.h"
+#include "common/Thread.h"
+#include "common/Mutex.h"
+#include "include/spinlock.h"
+
+/* Base class whose constructor runs package_init() for the given context. */
+class XioInit {
+  /* safe to be called multiple times */
+  void package_init(CephContext *cct);
+
+protected:
+  explicit XioInit(CephContext *cct) {
+    package_init(cct);
+  }
+};
+
+/**
+ * Messenger implementation for the xio (libxio) transport.
+ *
+ * Holds the portal set, the connection containers, the dispatch strategy
+ * and a loopback connection.
+ */
+class XioMessenger : public SimplePolicyMessenger, XioInit
+{
+private:
+  /* NOTE(review): a non-const static data member normally cannot carry an
+   * in-class initializer unless it is declared `inline` (C++17); confirm
+   * against the definition in XioMessenger.cc before changing it. */
+  static std::atomic<uint64_t> nInstances = { 0 };
+  std::atomic<uint64_t> nsessions = { 0 };
+  std::atomic<bool> shutdown_called = { false };
+  ceph::spinlock conns_sp;
+  XioConnection::ConnList conns_list;
+  XioConnection::EntitySet conns_entity_map;
+  XioPortals portals;
+  DispatchStrategy* dispatch_strategy;
+  XioLoopbackConnectionRef loop_con;
+  uint32_t special_handling;
+  Mutex sh_mtx;
+  Cond sh_cond;
+  bool need_addr;
+  bool did_bind;
+
+  /// approximately unique ID set by the Constructor for use in entity_addr_t
+  uint64_t nonce;
+
+  friend class XioConnection;
+
+public:
+  XioMessenger(CephContext *cct, entity_name_t name,
+	       string mname, uint64_t nonce,
+	       uint64_t cflags = 0,
+	       DispatchStrategy* ds = new QueueStrategy(1));
+
+  virtual ~XioMessenger();
+
+  /* Hand out the next portal chosen by the portal set. */
+  XioPortal* get_portal() { return portals.get_next_portal(); }
+
+  /* Record the address on the base Messenger and on the loopback
+   * connection. */
+  virtual void set_myaddr(const entity_addr_t& a) {
+    Messenger::set_myaddr(a);
+    loop_con->set_peer_addr(a);
+  }
+
+  int _send_message(Message *m, const entity_inst_t &dest);
+  int _send_message(Message *m, Connection *con);
+  int _send_message_impl(Message *m, XioConnection *xcon);
+
+  uint32_t get_special_handling() { return special_handling; }
+  void set_special_handling(int n) { special_handling = n; }
+  int pool_hint(uint32_t size);
+  void try_insert(XioConnection *xcon);
+
+  /* xio hooks */
+  int new_session(struct xio_session *session,
+		  struct xio_new_session_req *req,
+		  void *cb_user_context);
+
+  int session_event(struct xio_session *session,
+		    struct xio_session_event_data *event_data,
+		    void *cb_user_context);
+
+  /* Messenger interface */
+
+  /* No-op for this messenger.  Returns false so the bool-returning
+   * function no longer falls off its end (undefined behaviour). */
+  virtual bool set_addr_unknowns(const entity_addrvec_t &addr) override
+    { return false; } /* XXX applicable? */
+  virtual void set_addr(const entity_addr_t &addr) override
+    { } /* XXX applicable? */
+
+  virtual int get_dispatch_queue_len()
+    { return 0; } /* XXX bogus? */
+
+  virtual double get_dispatch_queue_max_age(utime_t now)
+    { return 0; } /* XXX bogus? */
+
+  virtual void set_cluster_protocol(int p)
+    { }
+
+  virtual int bind(const entity_addr_t& addr);
+
+  virtual int rebind(const set<int>& avoid_ports);
+
+  virtual int start();
+
+  virtual void wait();
+
+  virtual int shutdown();
+
+  virtual int send_message(Message *m, const entity_inst_t &dest) {
+    return _send_message(m, dest);
+  }
+
+  /* lazy sends are not implemented: both overloads return EINVAL */
+  virtual int lazy_send_message(Message *m, const entity_inst_t& dest)
+    { return EINVAL; }
+
+  virtual int lazy_send_message(Message *m, Connection *con)
+    { return EINVAL; }
+
+  virtual ConnectionRef get_connection(const entity_inst_t& dest);
+
+  // compat hack
+  ConnectionRef connect_to(
+    int type, const entity_addrvec_t& dest) override {
+    return get_connection(entity_inst_t(entity_name_t(type, -1),
+					dest.legacy_addr()));
+  }
+
+  virtual ConnectionRef get_loopback_connection();
+
+  void unregister_xcon(XioConnection *xcon);
+  virtual void mark_down(const entity_addr_t& a);
+  virtual void mark_down(Connection *con);
+  virtual void mark_down_all();
+  virtual void mark_down_on_empty(Connection *con);
+  virtual void mark_disposable(Connection *con);
+
+  /* Forward a message to the configured dispatch strategy. */
+  void ds_dispatch(Message *m)
+    { dispatch_strategy->ds_dispatch(m); }
+
+  /**
+   * Tell the XioMessenger its full IP address.
+   *
+   * This is used by clients when connecting to other endpoints, and
+   * probably shouldn't be called by anybody else.
+   */
+  void learned_addr(const entity_addr_t& peer_addr_for_me);
+
+private:
+  int get_nconns_per_portal(uint64_t cflags);
+  int get_nportals(uint64_t cflags);
+
+protected:
+  virtual void ready()
+    { }
+};
+
+XioCommand* pool_alloc_xio_command(XioConnection *xcon);
+
+
+#endif /* XIO_MESSENGER_H */
diff --git a/src/msg/xio/XioMsg.cc b/src/msg/xio/XioMsg.cc
new file mode 100644
index 00000000..4b6a5d68
--- /dev/null
+++ b/src/msg/xio/XioMsg.cc
@@ -0,0 +1,51 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Portions Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "XioMessenger.h"
+#include "XioConnection.h"
+#include "XioMsg.h"
+
+
+/*
+ * Set cl_flag and queue an XioCompletion on the connection's portal so the
+ * portal can release the xio_msg sequence held by this hook.
+ *
+ * Returns the number of entries in msg_seq, which is asserted non-zero.
+ */
+int XioDispatchHook::release_msgs()
+{
+  const int nmsgs = msg_seq.size();
+  cl_flag = true;
+
+  /* queue for release */
+  void *raw = rsp_pool.alloc(sizeof(XioCompletion));
+  XioCompletion *xcmp = new (raw) XioCompletion(xcon, this);
+  xcmp->trace = m->trace;
+
+  /* merge with portal traffic */
+  xcon->portal->enqueue(xcon, xcmp);
+
+  ceph_assert(nmsgs);
+  return nmsgs;
+}
+
+/*
+ * Encode a header built over scratch ceph_msg_header/ceph_msg_footer
+ * structs and return the length of the single buffer that results.
+ */
+/*static*/ size_t XioMsgHdr::get_max_encoded_length() {
+  ceph_msg_header _ceph_msg_header;
+  ceph_msg_footer _ceph_msg_footer;
+  XioMsgHdr probe(_ceph_msg_header, _ceph_msg_footer, 0 /* features */);
+  const std::list<buffer::ptr>& bufs = probe.get_bl().buffers();
+  ceph_assert(bufs.size() == 1); /* accelio header is small without scatter gather */
+  return bufs.front().length();
+}
+
+/* Log the xio/ceph header of this message, then the ceph message itself. */
+void XioMsg::print_debug(CephContext *cct, const char *tag) const
+{
+  const struct xio_msg *xmsg = get_xio_msg();
+  print_xio_msg_hdr(cct, tag, hdr, xmsg);
+  print_ceph_msg(cct, tag, m);
+}
diff --git a/src/msg/xio/XioMsg.h b/src/msg/xio/XioMsg.h
new file mode 100644
index 00000000..2f0c8490
--- /dev/null
+++ b/src/msg/xio/XioMsg.h
@@ -0,0 +1,446 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Portions Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef XIO_MSG_H
+#define XIO_MSG_H
+
+#include <boost/intrusive/list.hpp>
+#include "msg/SimplePolicyMessenger.h"
+extern "C" {
+#include "libxio.h"
+}
+#include "XioConnection.h"
+#include "XioSubmit.h"
+#include "msg/msg_types.h"
+#include "XioPool.h"
+
+namespace bi = boost::intrusive;
+
+class XioMessenger;
+
+/* Decodes the leading ceph_le32 message count from a buffer::ptr. */
+class XioMsgCnt
+{
+public:
+  ceph_le32 msg_cnt;
+  buffer::list bl;
+
+  explicit XioMsgCnt(buffer::ptr p)
+  {
+    bl.append(p);
+    buffer::list::iterator cursor = bl.begin();
+    decode(msg_cnt, cursor);
+  }
+};
+
+/*
+ * Header that travels with each message: a tag, message count, peer type
+ * and address, followed by selected ceph_msg_header fields and the
+ * ceph_msg_footer fields.
+ *
+ * hdr and ftr point at structures supplied by the caller.  The field order
+ * in encode_hdr()/encode_ftr() must match decode_hdr()/decode_ftr().
+ */
+class XioMsgHdr
+{
+public:
+ char tag;
+ ceph_le32 msg_cnt;
+ ceph_le32 peer_type;
+ entity_addr_t addr; /* XXX hack! */
+ ceph_msg_header* hdr;
+ ceph_msg_footer* ftr;
+ uint64_t features;
+ buffer::list bl;
+public:
+ /* Outgoing form: tag is CEPH_MSGR_TAG_MSG and msg_cnt starts at 0;
+  * peer_type and addr are left for the caller to fill in. */
+ XioMsgHdr(ceph_msg_header& _hdr, ceph_msg_footer& _ftr, uint64_t _features)
+ : tag(CEPH_MSGR_TAG_MSG), msg_cnt(init_le32(0)), hdr(&_hdr), ftr(&_ftr),
+ features(_features)
+ { }
+
+ /* Incoming form: decode every field from p into this object and into
+  * the caller's header/footer.  `features` is not set by this
+  * constructor or by decode(). */
+ XioMsgHdr(ceph_msg_header& _hdr, ceph_msg_footer &_ftr, buffer::ptr p)
+ : hdr(&_hdr), ftr(&_ftr)
+ {
+ bl.append(p);
+ buffer::list::iterator bl_iter = bl.begin();
+ decode(bl_iter);
+ }
+
+ static size_t get_max_encoded_length();
+
+ /* Encode into the member bl and return it.
+  * NOTE(review): bl is not cleared first, so a second call presumably
+  * appends a second copy -- confirm callers invoke this once. */
+ const buffer::list& get_bl() { encode(bl); return bl; };
+
+ /* Write tag, msg_cnt, peer_type, addr and the listed hdr fields. */
+ inline void encode_hdr(ceph::buffer::list& bl) const {
+ using ceph::encode;
+ encode(tag, bl);
+ encode(msg_cnt, bl);
+ encode(peer_type, bl);
+ encode(addr, bl, features);
+ encode(hdr->seq, bl);
+ encode(hdr->tid, bl);
+ encode(hdr->type, bl);
+ encode(hdr->priority, bl);
+ encode(hdr->version, bl);
+ encode(hdr->front_len, bl);
+ encode(hdr->middle_len, bl);
+ encode(hdr->data_len, bl);
+ encode(hdr->data_off, bl);
+ encode(hdr->src.type, bl);
+ encode(hdr->src.num, bl);
+ encode(hdr->compat_version, bl);
+ encode(hdr->crc, bl);
+ }
+
+ /* Write the footer fields. */
+ inline void encode_ftr(buffer::list& bl) const {
+ using ceph::encode;
+ encode(ftr->front_crc, bl);
+ encode(ftr->middle_crc, bl);
+ encode(ftr->data_crc, bl);
+ encode(ftr->sig, bl);
+ encode(ftr->flags, bl);
+ }
+
+ /* Header fields first, footer fields second. */
+ inline void encode(buffer::list& bl) const {
+ encode_hdr(bl);
+ encode_ftr(bl);
+ }
+
+ /* Read back the fields in the same order encode_hdr() writes them. */
+ inline void decode_hdr(buffer::list::iterator& bl) {
+ using ceph::decode;
+ decode(tag, bl);
+ decode(msg_cnt, bl);
+ decode(peer_type, bl);
+ decode(addr, bl);
+ decode(hdr->seq, bl);
+ decode(hdr->tid, bl);
+ decode(hdr->type, bl);
+ decode(hdr->priority, bl);
+ decode(hdr->version, bl);
+ decode(hdr->front_len, bl);
+ decode(hdr->middle_len, bl);
+ decode(hdr->data_len, bl);
+ decode(hdr->data_off, bl);
+ decode(hdr->src.type, bl);
+ decode(hdr->src.num, bl);
+ decode(hdr->compat_version, bl);
+ decode(hdr->crc, bl);
+ }
+
+ /* Read back the fields in the same order encode_ftr() writes them. */
+ inline void decode_ftr(buffer::list::iterator& bl) {
+ using ceph::decode;
+ decode(ftr->front_crc, bl);
+ decode(ftr->middle_crc, bl);
+ decode(ftr->data_crc, bl);
+ decode(ftr->sig, bl);
+ decode(ftr->flags, bl);
+ }
+
+ /* Header fields first, footer fields second. */
+ inline void decode(buffer::list::iterator& bl) {
+ decode_hdr(bl);
+ decode_ftr(bl);
+ }
+
+ virtual ~XioMsgHdr()
+ {}
+};
+
+WRITE_CLASS_ENCODER(XioMsgHdr);
+
+extern struct xio_mempool *xio_msgr_noreg_mpool;
+
+#define XIO_MSGR_IOVLEN 16
+
+/*
+ * An xio_msg bundled with a fixed array of XIO_MSGR_IOVLEN iovecs.  Both
+ * the "in" and "out" scatter/gather lists are pointed at that array.
+ */
+struct xio_msg_ex
+{
+  struct xio_msg msg;
+  struct xio_iovec_ex iovs[XIO_MSGR_IOVLEN];
+
+  explicit xio_msg_ex(void* user_context) {
+    // message-level fields: a one-way message with no request chained
+    msg.request = NULL;
+    msg.type = XIO_MSG_TYPE_ONE_WAY;
+    // for now, we DO NEED receipts for every msg
+    msg.flags = 0;
+    msg.user_context = user_context;
+    msg.next = NULL;
+
+    // "in" side: empty header, empty pointer-style sg list over iovs
+    msg.in.header.iov_base = NULL; /* XXX Accelio requires this currently */
+    msg.in.header.iov_len = 0;
+    msg.in.sgl_type = XIO_SGL_TYPE_IOV_PTR;
+    msg.in.pdata_iov.sglist = iovs;
+    msg.in.pdata_iov.max_nents = XIO_MSGR_IOVLEN;
+    msg.in.pdata_iov.nents = 0;
+
+    // "out" side: same shape (some members adjusted later)
+    msg.out.header.iov_base = NULL; /* XXX Accelio requires this currently,
+				     * against spec */
+    msg.out.header.iov_len = 0;
+    msg.out.sgl_type = XIO_SGL_TYPE_IOV_PTR;
+    msg.out.pdata_iov.sglist = iovs;
+    msg.out.pdata_iov.max_nents = XIO_MSGR_IOVLEN;
+    msg.out.pdata_iov.nents = 0;
+  }
+};
+
+/*
+ * Base class for outgoing submissions.  It embeds the first xio_msg
+ * (req_0) and the pool descriptor of its own storage (mp_this), and it
+ * holds a reference on the connection for its whole lifetime.
+ *
+ * The object is reference counted: when put() drops nrefs to zero it is
+ * destroyed in place and its storage handed back via xpool_free().
+ */
+class XioSend : public XioSubmit
+{
+public:
+  virtual void print_debug(CephContext *cct, const char *tag) const {};
+  const struct xio_msg * get_xio_msg() const {return &req_0.msg;}
+  struct xio_msg * get_xio_msg() {return &req_0.msg;}
+  virtual size_t get_msg_count() const {return 1;}
+
+  /* _ex_cnt is the number of extra messages; nrefs starts at _ex_cnt+1. */
+  XioSend(XioConnection *_xcon, struct xio_reg_mem& _mp, int _ex_cnt=0) :
+    XioSubmit(XioSubmit::OUTGOING_MSG, _xcon),
+    req_0(this), mp_this(_mp), nrefs(_ex_cnt+1)
+  {
+    xpool_inc_msgcnt();
+    xcon->get();
+  }
+
+  XioSend* get() { nrefs++; return this; };
+
+  /* Drop n references; on reaching zero destroy and release the object. */
+  void put(int n) {
+    int refs = nrefs -= n;
+    if (refs == 0) {
+      /* copy the descriptor out first: mp_this lives inside the object
+       * that is about to be destroyed and whose storage is released */
+      struct xio_reg_mem mp = this->mp_this;
+      this->~XioSend();
+      xpool_free(sizeof(XioSend), &mp);
+    }
+  }
+
+  void put() {
+    put(1);
+  }
+
+  /* Drop one reference per message counted by get_msg_count(). */
+  void put_msg_refs() {
+    put(get_msg_count());
+  }
+
+  virtual ~XioSend() {
+    xpool_dec_msgcnt();
+    xcon->put();
+  }
+
+private:
+  xio_msg_ex req_0;
+  struct xio_reg_mem mp_this;
+  std::atomic<unsigned> nrefs = { 0 };
+};
+
+/* An XioSend that additionally carries a buffer::list payload. */
+class XioCommand : public XioSend
+{
+public:
+  XioCommand(XioConnection *_xcon, struct xio_reg_mem& _mp)
+    : XioSend(_xcon, _mp)
+  { }
+
+  /* Mutable access to the payload buffer. */
+  buffer::list& get_bl_ref() { return bl; }
+
+private:
+  buffer::list bl;
+};
+
+/*
+ * Outgoing Message wrapper: the Message, its XioMsgHdr, and (when more
+ * than one xio_msg is needed) a malloc'ed array of trailing xio_msg_ex.
+ */
+struct XioMsg : public XioSend
+{
+public:
+  Message* m;
+  XioMsgHdr hdr;
+  xio_msg_ex* req_arr;
+
+public:
+  /* _ex_cnt is the number of trailing xio_msg_ex beyond the embedded
+   * first one; hdr.msg_cnt is therefore _ex_cnt+1. */
+  XioMsg(Message *_m, XioConnection *_xcon, struct xio_reg_mem& _mp,
+	 int _ex_cnt, uint64_t _features) :
+    XioSend(_xcon, _mp, _ex_cnt),
+    m(_m), hdr(m->get_header(), m->get_footer(), _features),
+    req_arr(NULL)
+  {
+    const entity_inst_t &inst = xcon->get_messenger()->get_myinst();
+    hdr.peer_type = inst.name.type();
+    hdr.addr = xcon->get_messenger()->get_myaddr_legacy();
+    hdr.hdr->src.type = inst.name.type();
+    hdr.hdr->src.num = inst.name.num();
+    hdr.msg_cnt = _ex_cnt+1;
+
+    if (unlikely(_ex_cnt > 0)) {
+      alloc_trailers(_ex_cnt);
+    }
+  }
+
+  void print_debug(CephContext *cct, const char *tag) const override;
+  size_t get_msg_count() const override {
+    return hdr.msg_cnt;
+  }
+
+  /* Allocate and construct cnt trailing xio_msg_ex, each with this
+   * object as its user context. */
+  void alloc_trailers(int cnt) {
+    req_arr = static_cast<xio_msg_ex*>(malloc(cnt * sizeof(xio_msg_ex)));
+    /* fail loudly instead of constructing into a null pointer */
+    ceph_assert(req_arr);
+    for (int ix = 0; ix < cnt; ++ix) {
+      xio_msg_ex* xreq = &(req_arr[ix]);
+      new (xreq) xio_msg_ex(this);
+    }
+  }
+
+  Message *get_message() { return m; }
+
+  ~XioMsg()
+  {
+    if (unlikely(!!req_arr)) {
+      /* the trailer array holds get_msg_count()-1 entries */
+      for (unsigned int ix = 0; ix < get_msg_count()-1; ++ix) {
+	xio_msg_ex* xreq = &(req_arr[ix]);
+	xreq->~xio_msg_ex();
+      }
+      free(req_arr);
+    }
+
+    /* testing only! server's ready, resubmit request (not reached on
+     * PASSIVE/server side) */
+    if (unlikely(m->get_magic() & MSG_MAGIC_REDUPE)) {
+      if (likely(xcon->is_connected())) {
+	xcon->send_message(m);
+      } else {
+	/* dispose it */
+	m->put();
+      }
+    } else {
+      /* the normal case: done with message */
+      m->put();
+    }
+  }
+};
+
+/*
+ * Receive-side completion hook attached to a dispatched Message.  It keeps
+ * the incoming xio_msg sequence and a response pool, and holds a reference
+ * on the connection for its lifetime.
+ */
+class XioDispatchHook : public Message::CompletionHook
+{
+private:
+  XioConnection *xcon;
+  XioInSeq msg_seq;
+  XioPool rsp_pool;
+  std::atomic<unsigned> nrefs { 1 };
+  bool cl_flag;
+  friend class XioConnection;
+  friend class XioMessenger;
+public:
+  struct xio_reg_mem mp_this;
+
+  XioDispatchHook(XioConnection *_xcon, Message *_m, XioInSeq& _msg_seq,
+		  struct xio_reg_mem& _mp) :
+    CompletionHook(_m),
+    xcon(_xcon->get()),
+    msg_seq(_msg_seq),
+    rsp_pool(xio_msgr_noreg_mpool),
+    cl_flag(false),
+    mp_this(_mp)
+  {
+    ++xcon->n_reqs; // atomicity by portal thread
+    xpool_inc_hookcnt();
+  }
+
+  virtual void finish(int r) {
+    this->put();
+  }
+
+  virtual void complete(int r) {
+    finish(r);
+  }
+
+  int release_msgs();
+
+  XioDispatchHook* get() {
+    nrefs++; return this;
+  }
+
+  /* Drop n references.  The first time the count reaches zero the
+   * messages are queued for release (release_msgs takes a new reference
+   * through XioCompletion); the second time the hook is destroyed. */
+  void put(int n = 1) {
+    int refs = nrefs -= n;
+    if (refs == 0) {
+      /* in Marcus' new system, refs reaches 0 twice: once in
+       * Message lifecycle, and again after xio_release_msg.
+       */
+      if (!cl_flag && release_msgs())
+	return;
+      /* copy the descriptor out first: mp_this lives inside the object
+       * that is about to be destroyed and whose storage is released */
+      struct xio_reg_mem mp = this->mp_this;
+      this->~XioDispatchHook();
+      xpool_free(sizeof(XioDispatchHook), &mp);
+    }
+  }
+
+  XioInSeq& get_seq() { return msg_seq; }
+
+  XioPool& get_pool() { return rsp_pool; }
+
+  void on_err_finalize(XioConnection *xcon) {
+    /* can't decode message; even with one-way must free
+     * xio_msg structures, and then xiopool
+     */
+    this->finish(-1);
+  }
+
+  ~XioDispatchHook() {
+    --xcon->n_reqs; // atomicity by portal thread
+    xpool_dec_hookcnt();
+    xcon->put();
+  }
+};
+
+/* A sender-side CompletionHook that relies on the on_msg_delivered
+ * to complete a pending mark down.
+ *
+ * The hook holds a reference on the connection and remembers the pool
+ * descriptor (mp_this) of the storage it was constructed in. */
+class XioMarkDownHook : public Message::CompletionHook
+{
+private:
+  XioConnection* xcon;
+
+public:
+  struct xio_reg_mem mp_this;
+
+  XioMarkDownHook(
+    XioConnection* _xcon, Message *_m, struct xio_reg_mem& _mp) :
+    CompletionHook(_m), xcon(_xcon->get()), mp_this(_mp)
+  { }
+
+  virtual void claim(int r) {}
+
+  /* Drop the connection reference, destroy the hook in place and return
+   * its storage to the xio mempool. */
+  virtual void finish(int r) {
+    xcon->put();
+    /* copy the descriptor out first: mp_this lives inside the object
+     * that is about to be destroyed and whose storage is released */
+    struct xio_reg_mem mp = this->mp_this;
+    this->~XioMarkDownHook();
+    xio_mempool_free(&mp);
+  }
+
+  /* Mark the connection down, then finish(). */
+  virtual void complete(int r) {
+    xcon->_mark_down(XioConnection::CState::OP_FLAG_NONE);
+    finish(r);
+  }
+};
+
+/*
+ * Submit-queue entry of type INCOMING_MSG_RELEASE.  It holds a reference
+ * on a dispatch hook and on the connection until finalize() is called.
+ */
+struct XioCompletion : public XioSubmit
+{
+  XioDispatchHook *xhook;
+public:
+  XioCompletion(XioConnection *_xcon, XioDispatchHook *_xhook)
+    : XioSubmit(XioSubmit::INCOMING_MSG_RELEASE, _xcon /* not xcon! */),
+      xhook(_xhook->get())
+  {
+    // submit queue ref
+    xcon->get();
+  }
+
+  /* Next xio_msg from the hook's incoming sequence. */
+  struct xio_msg* dequeue() {
+    XioInSeq& seq = xhook->get_seq();
+    return seq.dequeue();
+  }
+
+  XioDispatchHook* get_xhook() { return xhook; }
+
+  /* Release the connection reference, then the hook reference. */
+  void finalize() {
+    xcon->put();
+    xhook->put();
+  }
+};
+
+void print_xio_msg_hdr(CephContext *cct, const char *tag,
+ const XioMsgHdr &hdr, const struct xio_msg *msg);
+void print_ceph_msg(CephContext *cct, const char *tag, Message *m);
+
+#endif /* XIO_MSG_H */
diff --git a/src/msg/xio/XioPool.cc b/src/msg/xio/XioPool.cc
new file mode 100644
index 00000000..5f0d77a2
--- /dev/null
+++ b/src/msg/xio/XioPool.cc
@@ -0,0 +1,41 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <iostream>
+#include "XioPool.h"
+
+/* process-wide pool statistics */
+XioPoolStats xp_stats;
+
+/* both tracing switches start disabled */
+bool XioPool::trace_mempool = false;
+bool XioPool::trace_msgcnt = false;
+
+/*
+ * Print the slab, overflow and message counters to std::cout.  Each of
+ * the two output lines is prefixed with `tag` and `serial`.
+ */
+void XioPoolStats::dump(const char* tag, uint64_t serial)
+{
+  /* the counters are std::atomic<unsigned>, which has load(), not read() */
+  std::cout
+    << tag << " #" << serial << ": "
+    << "pool objs: "
+    << "64: " << ctr_set[SLAB_64].load() << " "
+    << "256: " << ctr_set[SLAB_256].load() << " "
+    << "1024: " << ctr_set[SLAB_1024].load() << " "
+    << "page: " << ctr_set[SLAB_PAGE].load() << " "
+    << "max: " << ctr_set[SLAB_MAX].load() << " "
+    << "overflow: " << ctr_set[SLAB_OVERFLOW].load() << " "
+    << std::endl;
+  std::cout
+    << tag << " #" << serial << ": "
+    << " msg objs: "
+    << "in: " << hook_cnt.load() << " "
+    << "out: " << msg_cnt.load() << " "
+    << std::endl;
+}
diff --git a/src/msg/xio/XioPool.h b/src/msg/xio/XioPool.h
new file mode 100644
index 00000000..07fa7311
--- /dev/null
+++ b/src/msg/xio/XioPool.h
@@ -0,0 +1,218 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#ifndef XIO_POOL_H
+#define XIO_POOL_H
+
+#include <atomic>
+#include <vector>
+#include <cstdlib>
+#include <cstring>
+#include <cstdint>
+
+extern "C" {
+#include "libxio.h"
+}
+
+#include "common/likely.h"
+
+static inline int xpool_alloc(struct xio_mempool *pool, uint64_t size,
+ struct xio_reg_mem* mp);
+static inline void xpool_free(uint64_t size, struct xio_reg_mem* mp);
+
+/*
+ * A chain of pieces allocated through xpool_alloc() from one xio mempool
+ * handle.  Every piece handed out by alloc() is linked onto `first` and
+ * released when the XioPool is destroyed.
+ */
+class XioPool
+{
+private:
+  struct xio_mempool *handle;
+
+public:
+  static bool trace_mempool;
+  static bool trace_msgcnt;
+  static const int MB = 8;
+
+  /* Piece header followed by the payload; payload is declared with MB
+   * bytes and over-allocated to the requested size. */
+  struct xio_piece {
+    struct xio_reg_mem mp[1];
+    struct xio_piece *next;
+    int s;
+    char payload[MB];
+  } *first;
+
+  explicit XioPool(struct xio_mempool *_handle) :
+    handle(_handle), first(0)
+  {
+  }
+
+  ~XioPool()
+  {
+    struct xio_piece *p;
+    while ((p = first)) {
+      first = p->next;
+      if (unlikely(trace_mempool)) {
+	memset(p->payload, 0xcf, p->s); // guard bytes
+      }
+      /* copy size and descriptor out of the piece before releasing it:
+       * p->mp lives inside the storage being freed */
+      const uint64_t piece_size = sizeof(struct xio_piece)+(p->s)-MB;
+      struct xio_reg_mem mp = p->mp[0];
+      xpool_free(piece_size, &mp);
+    }
+  }
+
+  /* Allocate _s payload bytes; returns the payload pointer, or 0 if
+   * xpool_alloc reports an error. */
+  void *alloc(size_t _s)
+  {
+    void *r;
+    struct xio_reg_mem mp[1];
+    struct xio_piece *x;
+    int e = xpool_alloc(handle, (sizeof(struct xio_piece)-MB) + _s, mp);
+    if (e) {
+      r = 0;
+    } else {
+      x = reinterpret_cast<struct xio_piece *>(mp->addr);
+      *x->mp = *mp;
+      x->next = first;
+      x->s = _s;
+      first = x;
+      r = x->payload;
+    }
+    return r;
+  }
+};
+
+/*
+ * Counters for pool allocations, bucketed by size, plus send/receive
+ * message counters that only move when XioPool::trace_msgcnt is set.
+ */
+class XioPoolStats {
+private:
+  enum pool_sizes {
+    SLAB_64 = 0,
+    SLAB_256,
+    SLAB_1024,
+    SLAB_PAGE,
+    SLAB_MAX,
+    SLAB_OVERFLOW,
+    NUM_SLABS,
+  };
+
+  std::atomic<unsigned> ctr_set[NUM_SLABS] = {};
+  std::atomic<unsigned> msg_cnt = { 0 };  // send msgs
+  std::atomic<unsigned> hook_cnt = { 0 }; // recv msgs
+
+  /* Pick the size bucket for an allocation of `size` bytes. */
+  static pool_sizes slab_for(uint64_t size) {
+    if (size <= 64)
+      return SLAB_64;
+    if (size <= 256)
+      return SLAB_256;
+    if (size <= 1024)
+      return SLAB_1024;
+    if (size <= 8192)
+      return SLAB_PAGE;
+    return SLAB_MAX;
+  }
+
+public:
+  void dump(const char* tag, uint64_t serial);
+
+  /* Count one allocation of `size` bytes. */
+  void inc(uint64_t size) {
+    (ctr_set[slab_for(size)])++;
+  }
+
+  /* Count one release of `size` bytes. */
+  void dec(uint64_t size) {
+    (ctr_set[slab_for(size)])--;
+  }
+
+  void inc_overflow() { ctr_set[SLAB_OVERFLOW]++; }
+  void dec_overflow() { ctr_set[SLAB_OVERFLOW]--; }
+
+  void inc_msgcnt() {
+    if (unlikely(XioPool::trace_msgcnt))
+      msg_cnt++;
+  }
+
+  void dec_msgcnt() {
+    if (unlikely(XioPool::trace_msgcnt))
+      msg_cnt--;
+  }
+
+  void inc_hookcnt() {
+    if (unlikely(XioPool::trace_msgcnt))
+      hook_cnt++;
+  }
+
+  void dec_hookcnt() {
+    if (unlikely(XioPool::trace_msgcnt))
+      hook_cnt--;
+  }
+};
+
+extern XioPoolStats xp_stats;
+
+/*
+ * Allocate `size` bytes described by *mp.
+ *
+ * The xio mempool is tried first; if it fails the memory comes from
+ * malloc() and mp->length is set to 0, which is how xpool_free() tells the
+ * two cases apart.  Always returns 0 (a failed malloc asserts).
+ */
+static inline int xpool_alloc(struct xio_mempool *pool, uint64_t size,
+			      struct xio_reg_mem* mp)
+{
+  // try to allocate from the xio pool
+  int r = xio_mempool_alloc(pool, size, mp);
+  if (r == 0) {
+    /* XioPoolStats has no operator+=; inc() is its size-bucket counter */
+    if (unlikely(XioPool::trace_mempool))
+      xp_stats.inc(size);
+    return 0;
+  }
+  // fall back to malloc on errors
+  mp->addr = malloc(size);
+  ceph_assert(mp->addr);
+  mp->length = 0;
+  if (unlikely(XioPool::trace_mempool))
+    xp_stats.inc_overflow();
+  return 0;
+}
+
+/*
+ * Release memory described by *mp.  A non-zero mp->length means it came
+ * from the xio mempool; zero means it came from malloc().  `size` is only
+ * used for the statistics counters.
+ */
+static inline void xpool_free(uint64_t size, struct xio_reg_mem* mp)
+{
+  if (mp->length) {
+    /* XioPoolStats has no operator-=; dec() is its size-bucket counter */
+    if (unlikely(XioPool::trace_mempool))
+      xp_stats.dec(size);
+    xio_mempool_free(mp);
+  } else { // from malloc
+    if (unlikely(XioPool::trace_mempool))
+      xp_stats.dec_overflow();
+    free(mp->addr);
+  }
+}
+
+/* Statement-style wrappers around the xp_stats message/hook counters.
+ * The counters themselves only change when XioPool::trace_msgcnt is set. */
+#define xpool_inc_msgcnt() \
+ do { xp_stats.inc_msgcnt(); } while (0)
+
+#define xpool_dec_msgcnt() \
+ do { xp_stats.dec_msgcnt(); } while (0)
+
+#define xpool_inc_hookcnt() \
+ do { xp_stats.inc_hookcnt(); } while (0)
+
+#define xpool_dec_hookcnt() \
+ do { xp_stats.dec_hookcnt(); } while (0)
+
+#endif /* XIO_POOL_H */
diff --git a/src/msg/xio/XioPortal.cc b/src/msg/xio/XioPortal.cc
new file mode 100644
index 00000000..e2379fb3
--- /dev/null
+++ b/src/msg/xio/XioPortal.cc
@@ -0,0 +1,98 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "XioPortal.h"
+#include <stdio.h>
+
+#define dout_subsys ceph_subsys_xio
+
+/*
+ * Bind this portal to "<base_uri>:<port>".
+ *
+ * On success stores the server handle, rewrites xio_uri if a different
+ * port was assigned, reports the port through assigned_port (when
+ * non-NULL) and returns 0.  On failure returns xio_errno().
+ */
+int XioPortal::bind(struct xio_session_ops *ops, const string &base_uri,
+		    uint16_t port, uint16_t *assigned_port)
+{
+  // format uri
+  xio_uri = base_uri + ":" + std::to_string(port);
+
+  uint16_t assigned;
+  server = xio_bind(ctx, ops, xio_uri.c_str(), &assigned, 0, msgr);
+  if (!server)
+    return xio_errno();
+
+  // update uri if port changed
+  if (assigned != port)
+    xio_uri = base_uri + ":" + std::to_string(assigned);
+
+  portal_id = const_cast<char*>(xio_uri.c_str());
+  if (assigned_port)
+    *assigned_port = assigned;
+  ldout(msgr->cct,20) << "xio_bind: portal " << xio_uri
+    << " returned server " << server << dendl;
+  return 0;
+}
+
+/*
+ * Bind every portal.
+ *
+ * If `port` is non-zero the first portal binds to exactly that port; all
+ * other portals take the next free port in the configured
+ * ms_bind_port_min..ms_bind_port_max range.  The port of portal 0 is
+ * reported through port0 when it is non-NULL.
+ *
+ * Returns 0 on success, EINVAL when there are no portals, or the negated
+ * bind error otherwise.
+ */
+int XioPortals::bind(struct xio_session_ops *ops, const string& base_uri,
+		     uint16_t port, uint16_t *port0)
+{
+  /* a server needs at least 1 portal */
+  if (n < 1)
+    return EINVAL;
+  Messenger *msgr = portals[0]->msgr;
+  portals.resize(n);
+
+  const uint16_t port_min = msgr->cct->_conf->ms_bind_port_min;
+  const uint16_t port_max = msgr->cct->_conf->ms_bind_port_max;
+  /* scan cursor is wider than uint16_t: stepping past 65535 must end the
+   * scan rather than wrap to 0 (which looped forever or tried port 0) */
+  uint32_t next_port = port_min;
+
+  /* bind the portals */
+  for (size_t i = 0; i < portals.size(); i++) {
+    uint16_t result_port;
+    if (port != 0) {
+      // bind directly to the given port
+      int r = portals[i]->bind(ops, base_uri, port, &result_port);
+      if (r != 0)
+	return -r;
+    } else {
+      int r = EADDRINUSE;
+      // try ports within the configured range
+      for (; next_port <= port_max; next_port++) {
+	r = portals[i]->bind(ops, base_uri,
+			     static_cast<uint16_t>(next_port), &result_port);
+	if (r == 0) {
+	  // the following portal starts after the port just taken
+	  next_port++;
+	  break;
+	}
+      }
+      if (r != 0) {
+	lderr(msgr->cct) << "portal.bind unable to bind to " << base_uri
+	  << " on any port in range " << msgr->cct->_conf->ms_bind_port_min
+	  << "-" << port_max << ": " << xio_strerror(r) << dendl;
+	return -r;
+      }
+    }
+
+    ldout(msgr->cct,5) << "xp::bind: portal " << i << " bind OK: "
+      << portals[i]->xio_uri << dendl;
+
+    if (i == 0 && port0 != NULL)
+      *port0 = result_port;
+    port = 0; // use port 0 for all subsequent portals
+  }
+
+  return 0;
+}
diff --git a/src/msg/xio/XioPortal.h b/src/msg/xio/XioPortal.h
new file mode 100644
index 00000000..7a0afee4
--- /dev/null
+++ b/src/msg/xio/XioPortal.h
@@ -0,0 +1,458 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Portions Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef XIO_PORTAL_H
+#define XIO_PORTAL_H
+
+#include <string>
+
+extern "C" {
+#include "libxio.h"
+}
+#include "XioInSeq.h"
+#include <boost/lexical_cast.hpp>
+#include "msg/SimplePolicyMessenger.h"
+#include "XioConnection.h"
+#include "XioMsg.h"
+
+#include "include/spinlock.h"
+
+#include "include/ceph_assert.h"
+#include "common/dout.h"
+
+#ifndef CACHE_LINE_SIZE
+#define CACHE_LINE_SIZE 64 /* XXX arch-specific define */
+#endif
+#define CACHE_PAD(_n) char __pad ## _n [CACHE_LINE_SIZE]
+
+class XioPortal : public Thread
+{
+private:
+
+ /*
+  * Submit queue split into nlanes independently locked lanes.  Producers
+  * pick a lane from the connection pointer; the portal thread drains the
+  * lanes round-robin starting at ix.
+  */
+ struct SubmitQueue
+ {
+   const static int nlanes = 7;
+
+   struct Lane
+   {
+     uint32_t size;
+     XioSubmit::Queue q;
+     ceph::spinlock sp;
+     CACHE_PAD(0);
+   };
+
+   Lane qlane[nlanes];
+
+   int ix; /* atomicity by portal thread */
+
+   SubmitQueue() : ix(0)
+   {
+     for (int lane_ix = 0; lane_ix < nlanes; ++lane_ix) {
+       qlane[lane_ix].size = 0;
+     }
+   }
+
+   /* Lane for a connection, derived from its address. */
+   inline Lane* get_lane(XioConnection *xcon)
+   {
+     return &qlane[(((uint64_t) xcon) / 16) % nlanes];
+   }
+
+   /* Append one entry to the connection's lane. */
+   void enq(XioConnection *xcon, XioSubmit* xs)
+   {
+     Lane* lane = get_lane(xcon);
+     std::lock_guard<decltype(lane->sp)> lg(lane->sp);
+     lane->q.push_back(*xs);
+     ++(lane->size);
+   }
+
+   /* Move every entry of requeue_q to the end of the connection's lane. */
+   void enq(XioConnection *xcon, XioSubmit::Queue& requeue_q)
+   {
+     int size = requeue_q.size();
+     Lane* lane = get_lane(xcon);
+     std::lock_guard<decltype(lane->sp)> lg(lane->sp);
+     XioSubmit::Queue::const_iterator i1 = lane->q.end();
+     lane->q.splice(i1, requeue_q);
+     lane->size += size;
+   }
+
+   /* Move the contents of the first non-empty lane (searching from ix)
+    * to the end of send_q, then advance ix past that lane. */
+   void deq(XioSubmit::Queue& send_q)
+   {
+     for (int cnt = 0; cnt < nlanes; ++cnt, ++ix, ix = ix % nlanes) {
+       /* select the lane BEFORE locking it: the lock used to be taken
+        * through a not-yet-assigned lane pointer */
+       Lane* lane = &qlane[ix];
+       std::lock_guard<decltype(lane->sp)> lg(lane->sp);
+       if (lane->size > 0) {
+         XioSubmit::Queue::const_iterator i1 = send_q.end();
+         send_q.splice(i1, lane->q);
+         lane->size = 0;
+         ++ix, ix = ix % nlanes;
+         break;
+       }
+     }
+   }
+
+ }; /* SubmitQueue */
+
+ Messenger *msgr;
+ struct xio_context *ctx;
+ struct xio_server *server;
+ SubmitQueue submit_q;
+ ceph::spinlock sp;
+ void *ev_loop;
+ string xio_uri;
+ char *portal_id;
+ bool _shutdown;
+ bool drained;
+ uint32_t magic;
+ uint32_t special_handling;
+
+ friend class XioPortals;
+ friend class XioMessenger;
+
+public:
+ /* Create the portal's xio context; max_conns is passed to Accelio as
+  * the connections-per-context hint. */
+ explicit XioPortal(Messenger *_msgr, int max_conns) :
+   msgr(_msgr), ctx(NULL), server(NULL), submit_q(), xio_uri(""),
+   portal_id(NULL), _shutdown(false), drained(false),
+   magic(0),
+   special_handling(0)
+ {
+   struct xio_context_params params;
+   memset(&params, 0, sizeof(params));
+   params.user_context = this;
+   /*
+    * hint to Accelio the total number of connections that will share
+    * this context's resources: internal primary task pool...
+    */
+   params.max_conns_per_ctx = max_conns;
+
+   /* a portal is an xio_context and event loop */
+   ctx = xio_context_create(&params, 0 /* poll timeout */, -1 /* cpu hint */);
+   ceph_assert(ctx && "Whoops, failed to create portal/ctx");
+ }
+
+ int bind(struct xio_session_ops *ops, const string &base_uri,
+ uint16_t port, uint16_t *assigned_port);
+
+ /* Release every xio_msg chained on the completion (linked through
+  * user_context), then finalize the completion unconditionally. */
+ inline void release_xio_msg(XioCompletion* xcmp) {
+   struct xio_msg *msg = xcmp->dequeue();
+   if (unlikely(!xcmp->xcon->conn)) {
+     // NOTE: msg is not safe to dereference if the connection was torn down
+     xcmp->xcon->msg_release_fail(msg, ENOTCONN);
+   } else {
+     while (msg) {
+       struct xio_msg *following =
+         static_cast<struct xio_msg *>(msg->user_context);
+       const int rc = xio_release_msg(msg);
+       if (unlikely(rc)) /* very unlikely, so log it */
+         xcmp->xcon->msg_release_fail(msg, rc);
+       msg = following;
+     }
+   }
+   xcmp->trace.event("xio_release_msg");
+   xcmp->finalize(); /* unconditional finalize */
+ }
+
+ /* Queue xs for the portal thread and stop the event loop so it is
+  * picked up.  Once shutdown has been requested the entry is disposed
+  * of immediately instead. */
+ void enqueue(XioConnection *xcon, XioSubmit *xs)
+ {
+   if (! _shutdown) {
+     submit_q.enq(xcon, xs);
+     xio_context_stop_loop(ctx);
+     return;
+   }
+
+   /* dispose xs */
+   if (xs->type == XioSubmit::OUTGOING_MSG) {
+     /* it was an outgoing 1-way */
+     XioSend* xsend = static_cast<XioSend*>(xs);
+     xs->xcon->msg_send_fail(xsend, -EINVAL);
+   } else {
+     /* INCOMING_MSG_RELEASE */
+     release_xio_msg(static_cast<XioCompletion*>(xs));
+   }
+ }
+
+ /* Put a whole queue of entries back on the submit queue lane of xcon. */
+ void requeue(XioConnection* xcon, XioSubmit::Queue& send_q)
+ {
+   submit_q.enq(xcon, send_q);
+ }
+
+ /* Starting at q_iter, pull every entry belonging to xcon out of send_q,
+  * splice them (order preserved) onto the front of the connection's
+  * requeue list, and mark the connection flow-controlled.  On return
+  * q_iter equals send_q.end(). */
+ void requeue_all_xcon(XioConnection* xcon,
+                       XioSubmit::Queue::iterator& q_iter,
+                       XioSubmit::Queue& send_q) {
+   // XXX gather all already-dequeued outgoing messages for xcon
+   // and push them in FIFO order to front of the input queue,
+   // and mark the connection as flow-controlled
+   XioSubmit::Queue requeue_q;
+
+   while (q_iter != send_q.end()) {
+     XioSubmit &entry = *q_iter;
+     if (entry.xcon == xcon) {
+       q_iter = send_q.erase(q_iter);
+       requeue_q.push_back(entry);
+     } else {
+       // leave entries for other connections where they are
+       ++q_iter;
+     }
+   }
+   std::lock_guard<decltype(xcon->sp)> lg(xcon->sp);
+   XioSubmit::Queue::const_iterator front = xcon->outgoing.requeue.begin();
+   xcon->outgoing.requeue.splice(front, requeue_q);
+   xcon->cstate.state_flow_controlled(XioConnection::CState::OP_FLAG_LOCKED);
+ }
+
+ /*
+  * Portal thread body.
+  *
+  * Each pass drains one submit-queue lane into send_q, processes the
+  * entries (sending outgoing messages, releasing incoming ones), then runs
+  * the xio event loop with a timeout argument of 300.  The loop ends once
+  * _shutdown is set and a pass has marked the portal drained; the server
+  * (if bound) is then unbound and the context destroyed.
+  *
+  * Always returns NULL.
+  */
+ void *entry()
+ {
+ int size, code = 0;
+ uint32_t xio_qdepth_high;
+ XioSubmit::Queue send_q;
+ XioSubmit::Queue::iterator q_iter;
+ struct xio_msg *msg = NULL;
+ XioConnection *xcon;
+ XioSubmit *xs;
+ XioSend *xsend;
+
+ do {
+ submit_q.deq(send_q);
+
+ /* shutdown() barrier */
+ /* the guard lives until the end of this loop pass, so sp is also held
+  * while xio_context_run_loop() runs below */
+ std::lock_guard<decltype(sp)> lg(sp);
+
+ /* re-entered via goto after requeue_all_xcon() has removed one
+  * connection's entries from send_q */
+ restart:
+ size = send_q.size();
+
+ if (_shutdown) {
+ // XXX XioSend queues for flow-controlled connections may require
+ // cleanup
+ drained = true;
+ }
+
+ if (size > 0) {
+ q_iter = send_q.begin();
+ while (q_iter != send_q.end()) {
+ xs = &(*q_iter);
+ xcon = xs->xcon;
+
+ switch (xs->type) {
+ case XioSubmit::OUTGOING_MSG: /* it was an outgoing 1-way */
+ xsend = static_cast<XioSend*>(xs);
+ if (unlikely(!xcon->conn || !xcon->is_connected()))
+ code = ENOTCONN;
+ else {
+ /* XXX guard Accelio send queue (should be safe to rely
+ * on Accelio's check on below, but this assures that
+ * all chained xio_msg are accounted) */
+ xio_qdepth_high = xcon->xio_qdepth_high_mark();
+ if (unlikely((xcon->send_ctr + xsend->get_msg_count()) >
+ xio_qdepth_high)) {
+ requeue_all_xcon(xcon, q_iter, send_q);
+ goto restart;
+ }
+
+ xs->trace.event("xio_send_msg");
+ msg = xsend->get_xio_msg();
+ code = xio_send_msg(xcon->conn, msg);
+ /* header trace moved here to capture xio serial# */
+ if (ldlog_p1(msgr->cct, ceph_subsys_xio, 11)) {
+ xsend->print_debug(msgr->cct, "xio_send_msg");
+ }
+ /* get the right Accelio's errno code */
+ if (unlikely(code)) {
+ if ((code == -1) && (xio_errno() == -1)) {
+ /* In case XIO does not have any credits to send,
+ * it would still queue up the message(s) for transmission,
+ * but would return -1 and errno would also be set to -1.
+ * This needs to be treated as a success.
+ */
+ code = 0;
+ }
+ else {
+ code = xio_errno();
+ }
+ }
+ } /* !ENOTCONN */
+ if (unlikely(code)) {
+ switch (code) {
+ case XIO_E_TX_QUEUE_OVERFLOW:
+ {
+ requeue_all_xcon(xcon, q_iter, send_q);
+ goto restart;
+ }
+ break;
+ default:
+ /* any other error: drop the entry and report the failure;
+  * `continue` skips the erase at the bottom of the loop */
+ q_iter = send_q.erase(q_iter);
+ xcon->msg_send_fail(xsend, code);
+ continue;
+ break;
+ };
+ } else {
+ xcon->send.set(msg->timestamp); // need atomic?
+ xcon->send_ctr += xsend->get_msg_count(); // only inc if cb promised
+ }
+ break;
+ default:
+ /* INCOMING_MSG_RELEASE */
+ q_iter = send_q.erase(q_iter);
+ release_xio_msg(static_cast<XioCompletion*>(xs));
+ continue;
+ } /* switch (xs->type) */
+ /* successfully sent entry: remove it from send_q */
+ q_iter = send_q.erase(q_iter);
+ } /* while */
+ } /* size > 0 */
+
+ xio_context_run_loop(ctx, 300);
+
+ } while ((!_shutdown) || (!drained));
+
+ /* shutting down */
+ if (server) {
+ xio_unbind(server);
+ }
+ xio_context_destroy(ctx);
+ return NULL;
+ }
+
+ void shutdown()
+ {
+ std::lock_guard<decltype(sp)> lg(sp); // same lock entry() holds for the whole of each pass
+ _shutdown = true; // entry() reads this to set drained and end its loop
+ }
+};
+
+/* Owns the XioPortal objects of a messenger and hands them out in
+ * round-robin order.  Not copyable: portals and p_vec are raw owning
+ * pointers that the destructor releases. */
+class XioPortals
+{
+private:
+  vector<XioPortal*> portals;
+  char **p_vec;        // portal_id of every portal; NULL until start()
+  int n;               // portal count, never less than 1
+  int last_unused;     // index handed out by the next get_last_unused()
+
+  XioPortals(const XioPortals&) = delete;
+  XioPortals& operator=(const XioPortals&) = delete;
+
+public:
+  /* Create max(_n, 1) portals; msgr and nconns are passed straight to
+   * each XioPortal constructor. */
+  XioPortals(Messenger *msgr, int _n, int nconns) : p_vec(NULL), last_unused(0)
+  {
+    n = max(_n, 1);
+
+    // a failed plain new throws, so no null check is needed afterwards
+    portals.resize(n);
+    for (int i = 0; i < n; i++)
+      portals[i] = new XioPortal(msgr, nconns);
+  }
+
+  vector<XioPortal*>& get() { return portals; }
+
+  /* Portal id vector; NULL until start() has built it. */
+  const char **get_vec()
+  {
+    return (const char **) p_vec;
+  }
+
+  int get_portals_len()
+  {
+    return n;
+  }
+
+  /* Return the current round-robin index and advance it, wrapping to 0
+   * at get_portals_len(). */
+  int get_last_unused()
+  {
+    int pix = last_unused;
+    if (++last_unused >= get_portals_len())
+      last_unused = 0;
+    return pix;
+  }
+
+  /* Next portal in round-robin order. */
+  XioPortal* get_next_portal()
+  {
+    int pix = get_last_unused();
+    return portals[pix];
+  }
+
+  int bind(struct xio_session_ops *ops, const string& base_uri,
+           uint16_t port, uint16_t *port0);
+
+  /* Accept session for the next index in rotation.  Index 0, or any
+   * index while the id vector does not exist yet, accepts without a
+   * portal list; req and cb_user_context are not used. */
+  int accept(struct xio_session *session,
+             struct xio_new_session_req *req,
+             void *cb_user_context)
+  {
+    const char **portals_vec = get_vec();
+    int pix = get_last_unused();
+
+    // before start() p_vec is NULL and must not be indexed
+    if (pix == 0 || portals_vec == NULL)
+      return xio_accept(session, NULL, 0, NULL, 0);
+
+    return xio_accept(session, &(portals_vec[pix]), 1, NULL, 0);
+  }
+
+  /* Build the portal id vector, then call create() on every portal with
+   * the name "ms_xio_<index>". */
+  void start()
+  {
+    const int nportals = portals.size();
+
+    p_vec = new char*[nportals];
+    for (int p_ix = 0; p_ix < nportals; ++p_ix)
+      p_vec[p_ix] = (char*) portals[p_ix]->portal_id;
+
+    for (int p_ix = 0; p_ix < nportals; ++p_ix) {
+      string thread_name = "ms_xio_";
+      thread_name.append(std::to_string(p_ix));
+      portals[p_ix]->create(thread_name.c_str());
+    }
+  }
+
+  /* Call shutdown() on every portal; join() is a separate step. */
+  void shutdown()
+  {
+    for (XioPortal *portal : portals)
+      portal->shutdown();
+  }
+
+  void join()
+  {
+    for (XioPortal *portal : portals)
+      portal->join();
+  }
+
+  ~XioPortals()
+  {
+    for (XioPortal *portal : portals)
+      delete portal;
+    portals.clear();
+    delete[] p_vec;    // delete[] on a null pointer is a no-op
+  }
+};
+
+#endif /* XIO_PORTAL_H */
diff --git a/src/msg/xio/XioSubmit.h b/src/msg/xio/XioSubmit.h
new file mode 100644
index 00000000..9840ad4a
--- /dev/null
+++ b/src/msg/xio/XioSubmit.h
@@ -0,0 +1,58 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Portions Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef XIO_SUBMIT_H
+#define XIO_SUBMIT_H
+
+#include <boost/intrusive/list.hpp>
+#include "msg/SimplePolicyMessenger.h"
+extern "C" {
+#include "libxio.h"
+}
+#include "XioConnection.h"
+#include "msg/msg_types.h"
+#include "XioPool.h"
+
+namespace bi = boost::intrusive;
+
+class XioConnection;
+
+/* Unit of work queued for an XioConnection; entries link into an
+ * intrusive Queue through their submit_list hook. */
+struct XioSubmit
+{
+  enum submit_type { OUTGOING_MSG, INCOMING_MSG_RELEASE };
+
+  // type says whether this entry is an outgoing message or a release
+  submit_type type;
+  bi::list_member_hook<> submit_list;
+  XioConnection *xcon;
+  ZTracer::Trace trace;
+
+  using Queue = bi::list<
+    XioSubmit,
+    bi::member_hook<XioSubmit, bi::list_member_hook<>,
+                    &XioSubmit::submit_list>>;
+
+  // submit_list and trace are left default-constructed
+  XioSubmit(submit_type _type, XioConnection *_xcon)
+    : type(_type), xcon(_xcon)
+  {}
+
+  // virtual: subclasses are handled through XioSubmit pointers
+  virtual ~XioSubmit() {}
+};
+
+#endif /* XIO_SUBMIT_H */