diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
commit | 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch) | |
tree | e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/msg/xio | |
parent | Initial commit. (diff) | |
download | ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip |
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/msg/xio')
-rw-r--r-- | src/msg/xio/XioConnection.cc | 858 | ||||
-rw-r--r-- | src/msg/xio/XioConnection.h | 380 | ||||
-rw-r--r-- | src/msg/xio/XioInSeq.h | 84 | ||||
-rw-r--r-- | src/msg/xio/XioMessenger.cc | 1136 | ||||
-rw-r--r-- | src/msg/xio/XioMessenger.h | 176 | ||||
-rw-r--r-- | src/msg/xio/XioMsg.cc | 51 | ||||
-rw-r--r-- | src/msg/xio/XioMsg.h | 446 | ||||
-rw-r--r-- | src/msg/xio/XioPool.cc | 41 | ||||
-rw-r--r-- | src/msg/xio/XioPool.h | 218 | ||||
-rw-r--r-- | src/msg/xio/XioPortal.cc | 98 | ||||
-rw-r--r-- | src/msg/xio/XioPortal.h | 458 | ||||
-rw-r--r-- | src/msg/xio/XioSubmit.h | 58 |
12 files changed, 4004 insertions, 0 deletions
diff --git a/src/msg/xio/XioConnection.cc b/src/msg/xio/XioConnection.cc new file mode 100644 index 00000000..4bfab39b --- /dev/null +++ b/src/msg/xio/XioConnection.cc @@ -0,0 +1,858 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * Portions Copyright (C) 2013 CohortFS, LLC + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "XioMsg.h" +#include "XioConnection.h" +#include "XioMessenger.h" +#include "messages/MDataPing.h" +#include "msg/msg_types.h" +#include "auth/none/AuthNoneProtocol.h" // XXX + +#include "include/ceph_assert.h" +#include "common/dout.h" + +extern struct xio_mempool *xio_msgr_mpool; +extern struct xio_mempool *xio_msgr_noreg_mpool; + +#define dout_subsys ceph_subsys_xio + +void print_xio_msg_hdr(CephContext *cct, const char *tag, + const XioMsgHdr &hdr, const struct xio_msg *msg) +{ + if (msg) { + ldout(cct,4) << tag << + " xio msg:" << + " sn: " << msg->sn << + " timestamp: " << msg->timestamp << + dendl; + } + + ldout(cct,4) << tag << + " ceph header: " << + " front_len: " << hdr.hdr->front_len << + " seq: " << hdr.hdr->seq << + " tid: " << hdr.hdr->tid << + " type: " << hdr.hdr->type << + " prio: " << hdr.hdr->priority << + " name type: " << (int) hdr.hdr->src.type << + " name num: " << (int) hdr.hdr->src.num << + " version: " << hdr.hdr->version << + " compat_version: " << hdr.hdr->compat_version << + " front_len: " << hdr.hdr->front_len << + " middle_len: " << hdr.hdr->middle_len << + " data_len: " << hdr.hdr->data_len << + " xio header: " << + " msg_cnt: " << hdr.msg_cnt << + dendl; + + ldout(cct,4) << tag << + " ceph footer: " << + " front_crc: " << hdr.ftr->front_crc << + " middle_crc: " << hdr.ftr->middle_crc << + " data_crc: " << hdr.ftr->data_crc << + " sig: " << hdr.ftr->sig << + " flags: " << (uint32_t) hdr.ftr->flags << + dendl; +} + +void print_ceph_msg(CephContext *cct, const char *tag, Message *m) +{ + if (m->get_magic() & (MSG_MAGIC_XIO & MSG_MAGIC_TRACE_DTOR)) { + ceph_msg_header& header = m->get_header(); + ldout(cct,4) << tag << " header version " << header.version << + " compat version " << header.compat_version << + dendl; + } +} + +#undef dout_prefix +#define dout_prefix conn_prefix(_dout) +ostream& XioConnection::conn_prefix(std::ostream *_dout) { + return *_dout << "-- " << get_messenger()->get_myinst().addr << " >> " << peer_addr + << " peer=" << peer.name.type_str() + << " conn=" << conn << " sess=" << session << " "; +} + +XioConnection::XioConnection(XioMessenger *m, XioConnection::type _type, + const entity_inst_t& _peer) : + Connection(m->cct, m), + xio_conn_type(_type), + portal(m->get_portal()), + connected(false), + peer(_peer), + session(NULL), + conn(NULL), + magic(m->get_magic()), + scount(0), + send_ctr(0), + in_seq(), + cstate(this) +{ + set_peer_type(peer.name.type()); + set_peer_addr(peer.addr); + + Messenger::Policy policy; + int64_t max_msgs = 0, max_bytes = 0, bytes_opt = 0; + int xopt; + + policy = m->get_policy(peer_type); + + if (policy.throttler_messages) { + max_msgs = policy.throttler_messages->get_max(); + ldout(m->cct,4) << "XioMessenger throttle_msgs: " << max_msgs << dendl; + } + + xopt = m->cct->_conf->xio_queue_depth; + if (max_msgs > xopt) + xopt = max_msgs; + + /* set high mark for send, reserved 20% for credits */ + q_high_mark = xopt * 4 / 5; + q_low_mark = q_high_mark/2; + + /* set send & receive msgs queue depth */ + xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_SND_QUEUE_DEPTH_MSGS, + &xopt, sizeof(xopt)); + xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_RCV_QUEUE_DEPTH_MSGS, + &xopt, sizeof(xopt)); + + if (policy.throttler_bytes) { + max_bytes = policy.throttler_bytes->get_max(); + ldout(m->cct,4) << "XioMessenger throttle_bytes: " << max_bytes << dendl; + } + + bytes_opt = (2 << 28); /* default: 512 MB */ + if (max_bytes > bytes_opt) + bytes_opt = max_bytes; + + /* set send & receive total bytes throttle */ + xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_SND_QUEUE_DEPTH_BYTES, + &bytes_opt, sizeof(bytes_opt)); + xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_RCV_QUEUE_DEPTH_BYTES, + &bytes_opt, sizeof(bytes_opt)); + + ldout(m->cct,4) << "throttle_msgs: " << xopt << " throttle_bytes: " << bytes_opt << dendl; + + /* XXXX fake features, aieee! */ + set_features(XIO_ALL_FEATURES); +} + +int XioConnection::send_message(Message *m) +{ + XioMessenger *ms = static_cast<XioMessenger*>(get_messenger()); + return ms->_send_message(m, this); +} + +void XioConnection::send_keepalive_or_ack(bool ack, const utime_t *tp) +{ + /* If con is not in READY state, we need to queue the request */ + if (cstate.session_state.read() != XioConnection::UP) { + std::lock_guad<ceph::util::spinlock> lg(sp); + if (cstate.session_state.read() != XioConnection::UP) { + if (ack) { + outgoing.ack = true; + outgoing.ack_time = *tp; + } + else { + outgoing.keepalive = true; + } + return; + } + } + + send_keepalive_or_ack_internal(ack, tp); +} + +void XioConnection::send_keepalive_or_ack_internal(bool ack, const utime_t *tp) +{ + XioCommand *xcmd = pool_alloc_xio_command(this); + if (! xcmd) { + /* could happen if Accelio has been shutdown */ + return; + } + + struct ceph_timespec ts; + if (ack) { + ceph_assert(tp); + tp->encode_timeval(&ts); + xcmd->get_bl_ref().append(CEPH_MSGR_TAG_KEEPALIVE2_ACK); + xcmd->get_bl_ref().append((char*)&ts, sizeof(ts)); + } else if (has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) { + utime_t t = ceph_clock_now(); + t.encode_timeval(&ts); + xcmd->get_bl_ref().append(CEPH_MSGR_TAG_KEEPALIVE2); + xcmd->get_bl_ref().append((char*)&ts, sizeof(ts)); + } else { + xcmd->get_bl_ref().append(CEPH_MSGR_TAG_KEEPALIVE); + } + + const std::list<buffer::ptr>& header = xcmd->get_bl_ref().buffers(); + ceph_assert(header.size() == 1); /* accelio header must be without scatter gather */ + list<bufferptr>::const_iterator pb = header.begin(); + ceph_assert(pb->length() < XioMsgHdr::get_max_encoded_length()); + struct xio_msg * msg = xcmd->get_xio_msg(); + msg->out.header.iov_base = (char*) pb->c_str(); + msg->out.header.iov_len = pb->length(); + + ldout(msgr->cct,8) << __func__ << " sending command with tag " << (int)(*(char*)msg->out.header.iov_base) + << " len " << msg->out.header.iov_len << dendl; + + portal->enqueue(this, xcmd); +} + + +int XioConnection::passive_setup() +{ + /* XXX passive setup is a placeholder for (potentially active-side + initiated) feature and auth* negotiation */ + static bufferlist authorizer_reply; /* static because fake */ + static CryptoKey session_key; /* ditto */ + bool authorizer_valid; + + XioMessenger *msgr = static_cast<XioMessenger*>(get_messenger()); + + // fake an auth buffer + EntityName name; + name.set_type(peer.name.type()); + + AuthNoneAuthorizer auth; + auth.build_authorizer(name, peer.name.num()); + + /* XXX fake authorizer! */ + msgr->ms_deliver_verify_authorizer( + this, peer_type, CEPH_AUTH_NONE, + auth.bl, + 0, + authorizer_reply, + authorizer_valid, + session_key); + + /* notify hook */ + msgr->ms_deliver_handle_accept(this); + msgr->ms_deliver_handle_fast_accept(this); + + /* try to insert in conns_entity_map */ + msgr->try_insert(this); + return (0); +} + +static inline XioDispatchHook* pool_alloc_xio_dispatch_hook( + XioConnection *xcon, Message *m, XioInSeq& msg_seq) +{ + struct xio_reg_mem mp_mem; + int e = xpool_alloc(xio_msgr_noreg_mpool, + sizeof(XioDispatchHook), &mp_mem); + if (!!e) + return NULL; + XioDispatchHook *xhook = static_cast<XioDispatchHook*>(mp_mem.addr); + new (xhook) XioDispatchHook(xcon, m, msg_seq, mp_mem); + return xhook; +} + +int XioConnection::handle_data_msg(struct xio_session *session, + struct xio_msg *msg, + int more_in_batch, + void *cb_user_context) +{ + struct xio_msg *tmsg = msg; + + /* XXX Accelio guarantees message ordering at + * xio_session */ + + if (! in_seq.p()) { + if (!tmsg->in.header.iov_len) { + ldout(msgr->cct,0) << __func__ << " empty header: packet out of sequence?" << dendl; + xio_release_msg(msg); + return 0; + } + const size_t sizeof_tag = 1; + XioMsgCnt msg_cnt( + buffer::create_static(tmsg->in.header.iov_len-sizeof_tag, + ((char*) tmsg->in.header.iov_base)+sizeof_tag)); + ldout(msgr->cct,10) << __func__ << " receive msg " << "tmsg " << tmsg + << " msg_cnt " << msg_cnt.msg_cnt + << " iov_base " << tmsg->in.header.iov_base + << " iov_len " << (int) tmsg->in.header.iov_len + << " nents " << tmsg->in.pdata_iov.nents + << " sn " << tmsg->sn << dendl; + ceph_assert(session == this->session); + in_seq.set_count(msg_cnt.msg_cnt); + } else { + /* XXX major sequence error */ + ceph_assert(! tmsg->in.header.iov_len); + } + + in_seq.append(msg); + if (in_seq.count() > 0) { + return 0; + } + + XioMessenger *msgr = static_cast<XioMessenger*>(get_messenger()); + XioDispatchHook *m_hook = + pool_alloc_xio_dispatch_hook(this, NULL /* msg */, in_seq); + XioInSeq& msg_seq = m_hook->msg_seq; + in_seq.clear(); + + ceph_msg_header header; + ceph_msg_footer footer; + buffer::list payload, middle, data; + + const utime_t recv_stamp = ceph_clock_now(); + + ldout(msgr->cct,4) << __func__ << " " << "msg_seq.size()=" << msg_seq.size() << + dendl; + + struct xio_msg* msg_iter = msg_seq.begin(); + tmsg = msg_iter; + XioMsgHdr hdr(header, footer, + buffer::create_static(tmsg->in.header.iov_len, + (char*) tmsg->in.header.iov_base)); + + if (magic & (MSG_MAGIC_TRACE_XCON)) { + if (hdr.hdr->type == 43) { + print_xio_msg_hdr(msgr->cct, "on_msg", hdr, NULL); + } + } + + unsigned int ix, blen, iov_len; + struct xio_iovec_ex *msg_iov, *iovs; + uint32_t take_len, left_len = 0; + char *left_base = NULL; + + ix = 0; + blen = header.front_len; + + while (blen && (msg_iter != msg_seq.end())) { + tmsg = msg_iter; + iov_len = vmsg_sglist_nents(&tmsg->in); + iovs = vmsg_sglist(&tmsg->in); + for (; blen && (ix < iov_len); ++ix) { + msg_iov = &iovs[ix]; + + /* XXX need to detect any buffer which needs to be + * split due to coalescing of a segment (front, middle, + * data) boundary */ + + take_len = std::min(blen, msg_iov->iov_len); + payload.append( + buffer::create_msg( + take_len, (char*) msg_iov->iov_base, m_hook)); + blen -= take_len; + if (! blen) { + left_len = msg_iov->iov_len - take_len; + if (left_len) { + left_base = ((char*) msg_iov->iov_base) + take_len; + } + } + } + /* XXX as above, if a buffer is split, then we needed to track + * the new start (carry) and not advance */ + if (ix == iov_len) { + msg_seq.next(&msg_iter); + ix = 0; + } + } + + if (magic & (MSG_MAGIC_TRACE_XCON)) { + if (hdr.hdr->type == 43) { + ldout(msgr->cct,4) << "front (payload) dump:"; + payload.hexdump( *_dout ); + *_dout << dendl; + } + } + + blen = header.middle_len; + + if (blen && left_len) { + middle.append( + buffer::create_msg(left_len, left_base, m_hook)); + left_len = 0; + } + + while (blen && (msg_iter != msg_seq.end())) { + tmsg = msg_iter; + iov_len = vmsg_sglist_nents(&tmsg->in); + iovs = vmsg_sglist(&tmsg->in); + for (; blen && (ix < iov_len); ++ix) { + msg_iov = &iovs[ix]; + take_len = std::min(blen, msg_iov->iov_len); + middle.append( + buffer::create_msg( + take_len, (char*) msg_iov->iov_base, m_hook)); + blen -= take_len; + if (! blen) { + left_len = msg_iov->iov_len - take_len; + if (left_len) { + left_base = ((char*) msg_iov->iov_base) + take_len; + } + } + } + if (ix == iov_len) { + msg_seq.next(&msg_iter); + ix = 0; + } + } + + blen = header.data_len; + + if (blen && left_len) { + data.append( + buffer::create_msg(left_len, left_base, m_hook)); + left_len = 0; + } + + while (blen && (msg_iter != msg_seq.end())) { + tmsg = msg_iter; + iov_len = vmsg_sglist_nents(&tmsg->in); + iovs = vmsg_sglist(&tmsg->in); + for (; blen && (ix < iov_len); ++ix) { + msg_iov = &iovs[ix]; + data.append( + buffer::create_msg( + msg_iov->iov_len, (char*) msg_iov->iov_base, m_hook)); + blen -= msg_iov->iov_len; + } + if (ix == iov_len) { + msg_seq.next(&msg_iter); + ix = 0; + } + } + + /* update connection timestamp */ + recv = tmsg->timestamp; + + Message *m = decode_message(msgr->cct, msgr->crcflags, header, footer, + payload, middle, data, this); + + if (m) { + /* completion */ + m->set_connection(this); + + /* reply hook */ + m_hook->set_message(m); + m->set_completion_hook(m_hook); + + /* trace flag */ + m->set_magic(magic); + + /* update timestamps */ + m->set_recv_stamp(recv_stamp); + m->set_recv_complete_stamp(ceph_clock_now()); + m->set_seq(header.seq); + + /* MP-SAFE */ + state.set_in_seq(header.seq); + + /* XXXX validate peer type */ + if (peer_type != (int) hdr.peer_type) { /* XXX isn't peer_type -1? */ + peer_type = hdr.peer_type; + peer_addr = hdr.addr; + peer.addr = peer_addr; + peer.name = entity_name_t(hdr.hdr->src); + if (xio_conn_type == XioConnection::PASSIVE) { + /* XXX kick off feature/authn/authz negotiation + * nb: very possibly the active side should initiate this, but + * for now, call a passive hook so OSD and friends can create + * sessions without actually negotiating + */ + passive_setup(); + } + } + + if (magic & (MSG_MAGIC_TRACE_XCON)) { + ldout(msgr->cct,4) << "decode m is " << m->get_type() << dendl; + } + + /* dispatch it */ + msgr->ds_dispatch(m); + } else { + /* responds for undecoded messages and frees hook */ + ldout(msgr->cct,4) << "decode m failed" << dendl; + m_hook->on_err_finalize(this); + } + + return 0; +} + +int XioConnection::on_msg(struct xio_session *session, + struct xio_msg *msg, + int more_in_batch, + void *cb_user_context) +{ + char tag = CEPH_MSGR_TAG_MSG; + if (msg->in.header.iov_len) + tag = *(char*)msg->in.header.iov_base; + + ldout(msgr->cct,8) << __func__ << " receive msg with iov_len " + << (int) msg->in.header.iov_len << " tag " << (int)tag << dendl; + + //header_len_without_tag is only meaningful in case we have tag + size_t header_len_without_tag = msg->in.header.iov_len - sizeof(tag); + + switch(tag) { + case CEPH_MSGR_TAG_MSG: + ldout(msgr->cct, 20) << __func__ << " got data message" << dendl; + return handle_data_msg(session, msg, more_in_batch, cb_user_context); + + case CEPH_MSGR_TAG_KEEPALIVE: + ldout(msgr->cct, 20) << __func__ << " got KEEPALIVE" << dendl; + set_last_keepalive(ceph_clock_now()); + break; + + case CEPH_MSGR_TAG_KEEPALIVE2: + if (header_len_without_tag < sizeof(ceph_timespec)) { + lderr(msgr->cct) << __func__ << " too few data for KEEPALIVE2: got " << header_len_without_tag << + " bytes instead of " << sizeof(ceph_timespec) << " bytes" << dendl; + } + else { + ceph_timespec *t = (ceph_timespec *) ((char*)msg->in.header.iov_base + sizeof(tag)); + utime_t kp_t = utime_t(*t); + ldout(msgr->cct, 20) << __func__ << " got KEEPALIVE2 with timestamp" << kp_t << dendl; + send_keepalive_or_ack(true, &kp_t); + set_last_keepalive(ceph_clock_now()); + } + + break; + + case CEPH_MSGR_TAG_KEEPALIVE2_ACK: + if (header_len_without_tag < sizeof(ceph_timespec)) { + lderr(msgr->cct) << __func__ << " too few data for KEEPALIVE2_ACK: got " << header_len_without_tag << + " bytes instead of " << sizeof(ceph_timespec) << " bytes" << dendl; + } + else { + ceph_timespec *t = (ceph_timespec *) ((char*)msg->in.header.iov_base + sizeof(tag)); + utime_t kp_t(*t); + ldout(msgr->cct, 20) << __func__ << " got KEEPALIVE2_ACK with timestamp" << kp_t << dendl; + set_last_keepalive_ack(kp_t); + } + break; + + default: + lderr(msgr->cct) << __func__ << " unsupported message tag " << (int) tag << dendl; + ceph_assert(! "unsupported message tag"); + } + + xio_release_msg(msg); + return 0; +} + + +int XioConnection::on_ow_msg_send_complete(struct xio_session *session, + struct xio_msg *req, + void *conn_user_context) +{ + /* requester send complete (one-way) */ + uint64_t rc = ++scount; + + XioSend* xsend = static_cast<XioSend*>(req->user_context); + if (unlikely(magic & MSG_MAGIC_TRACE_CTR)) { + if (unlikely((rc % 1000000) == 0)) { + std::cout << "xio finished " << rc << " " << time(0) << std::endl; + } + } /* trace ctr */ + + ldout(msgr->cct,11) << "on_msg_delivered xcon: " << xsend->xcon << + " msg: " << req << " sn: " << req->sn << dendl; + + XioMsg *xmsg = dynamic_cast<XioMsg*>(xsend); + if (xmsg) { + ldout(msgr->cct,11) << "on_msg_delivered xcon: " << + " type: " << xmsg->m->get_type() << " tid: " << xmsg->m->get_tid() << + " seq: " << xmsg->m->get_seq() << dendl; + } + + --send_ctr; /* atomic, because portal thread */ + + /* unblock flow-controlled connections, avoid oscillation */ + if (unlikely(cstate.session_state.read() == + XioConnection::FLOW_CONTROLLED)) { + if ((send_ctr <= uint32_t(xio_qdepth_low_mark())) && + (1 /* XXX memory <= memory low-water mark */)) { + cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE); + ldout(msgr->cct,2) << "on_msg_delivered xcon: " << xsend->xcon + << " up_ready from flow_controlled" << dendl; + } + } + + xsend->put(); + + return 0; +} /* on_msg_delivered */ + +void XioConnection::msg_send_fail(XioSend *xsend, int code) +{ + ldout(msgr->cct,2) << "xio_send_msg FAILED xcon: " << this << + " msg: " << xsend->get_xio_msg() << " code=" << code << + " (" << xio_strerror(code) << ")" << dendl; + /* return refs taken for each xio_msg */ + xsend->put_msg_refs(); +} /* msg_send_fail */ + +void XioConnection::msg_release_fail(struct xio_msg *msg, int code) +{ + ldout(msgr->cct,2) << "xio_release_msg FAILED xcon: " << this << + " msg: " << msg << "code=" << code << + " (" << xio_strerror(code) << ")" << dendl; +} /* msg_release_fail */ + +int XioConnection::flush_out_queues(uint32_t flags) { + XioMessenger* msgr = static_cast<XioMessenger*>(get_messenger()); + if (! (flags & CState::OP_FLAG_LOCKED)) + sp.lock(); + + if (outgoing.keepalive) { + outgoing.keepalive = false; + send_keepalive_or_ack_internal(); + } + + if (outgoing.ack) { + outgoing.ack = false; + send_keepalive_or_ack_internal(true, &outgoing.ack_time); + } + + // send deferred 1 (direct backpresssure) + if (outgoing.requeue.size() > 0) + portal->requeue(this, outgoing.requeue); + + // send deferred 2 (sent while deferred) + int ix, q_size = outgoing.mqueue.size(); + for (ix = 0; ix < q_size; ++ix) { + Message::Queue::iterator q_iter = outgoing.mqueue.begin(); + Message* m = &(*q_iter); + outgoing.mqueue.erase(q_iter); + msgr->_send_message_impl(m, this); + } + if (! (flags & CState::OP_FLAG_LOCKED)) + sp.unlock(); + return 0; +} + +int XioConnection::discard_out_queues(uint32_t flags) +{ + Message::Queue disc_q; + XioSubmit::Queue deferred_q; + + if (! (flags & CState::OP_FLAG_LOCKED)) + sp.lock(); + + /* the two send queues contain different objects: + * - anything on the mqueue is a Message + * - anything on the requeue is an XioSend + */ + Message::Queue::const_iterator i1 = disc_q.end(); + disc_q.splice(i1, outgoing.mqueue); + + XioSubmit::Queue::const_iterator i2 = deferred_q.end(); + deferred_q.splice(i2, outgoing.requeue); + + outgoing.keepalive = outgoing.ack = false; + + if (! (flags & CState::OP_FLAG_LOCKED)) + sp.unlock(); + + // mqueue + while (!disc_q.empty()) { + Message::Queue::iterator q_iter = disc_q.begin(); + Message* m = &(*q_iter); + disc_q.erase(q_iter); + m->put(); + } + + // requeue + while (!deferred_q.empty()) { + XioSubmit::Queue::iterator q_iter = deferred_q.begin(); + XioSubmit* xs = &(*q_iter); + XioSend* xsend; + switch (xs->type) { + case XioSubmit::OUTGOING_MSG: + xsend = static_cast<XioSend*>(xs); + deferred_q.erase(q_iter); + // release once for each chained xio_msg + xsend->put(xsend->get_msg_count()); + break; + case XioSubmit::INCOMING_MSG_RELEASE: + deferred_q.erase(q_iter); + portal->release_xio_msg(static_cast<XioCompletion*>(xs)); + break; + default: + ldout(msgr->cct,0) << __func__ << ": Unknown Msg type " << xs->type << dendl; + break; + } + } + + return 0; +} + +int XioConnection::adjust_clru(uint32_t flags) +{ + if (flags & CState::OP_FLAG_LOCKED) + sp.unlock(); + + XioMessenger* msgr = static_cast<XioMessenger*>(get_messenger()); + msgr->conns_sp.lock(); + sp.lock(); + + if (cstate.flags & CState::FLAG_MAPPED) { + XioConnection::ConnList::iterator citer = + XioConnection::ConnList::s_iterator_to(*this); + msgr->conns_list.erase(citer); + msgr->conns_list.push_front(*this); // LRU + } + + msgr->conns_sp.unlock(); + + if (! (flags & CState::OP_FLAG_LOCKED)) + sp.unlock(); + + return 0; +} + +int XioConnection::on_msg_error(struct xio_session *session, + enum xio_status error, + struct xio_msg *msg, + void *conn_user_context) +{ + XioSend *xsend = static_cast<XioSend*>(msg->user_context); + if (xsend) + xsend->put(); + + --send_ctr; /* atomic, because portal thread */ + return 0; +} /* on_msg_error */ + +void XioConnection::mark_down() +{ + _mark_down(XioConnection::CState::OP_FLAG_NONE); +} + +int XioConnection::_mark_down(uint32_t flags) +{ + if (! (flags & CState::OP_FLAG_LOCKED)) + sp.lock(); + + // per interface comment, we only stage a remote reset if the + // current policy required it + if (cstate.policy.resetcheck) + cstate.flags |= CState::FLAG_RESET; + + disconnect(); + + /* XXX this will almost certainly be called again from + * on_disconnect_event() */ + discard_out_queues(flags|CState::OP_FLAG_LOCKED); + + if (! (flags & CState::OP_FLAG_LOCKED)) + sp.unlock(); + + return 0; +} + +void XioConnection::mark_disposable() +{ + _mark_disposable(XioConnection::CState::OP_FLAG_NONE); +} + +int XioConnection::_mark_disposable(uint32_t flags) +{ + if (! (flags & CState::OP_FLAG_LOCKED)) + sp.lock(); + + cstate.policy.lossy = true; + + if (! (flags & CState::OP_FLAG_LOCKED)) + sp.unlock(); + + return 0; +} + +int XioConnection::CState::state_up_ready(uint32_t flags) +{ + if (! (flags & CState::OP_FLAG_LOCKED)) + xcon->sp.lock(); + + xcon->flush_out_queues(flags|CState::OP_FLAG_LOCKED); + + session_state = session_states::UP; + startup_state = session_startup_states::READY; + + if (! (flags & CState::OP_FLAG_LOCKED)) + xcon->sp.unlock(); + + return (0); +} + +int XioConnection::CState::state_discon() +{ + session_state = session_states::DISCONNECTED; + startup_state = session_startup_states::IDLE; + + return 0; +} + +int XioConnection::CState::state_flow_controlled(uint32_t flags) +{ + if (! (flags & OP_FLAG_LOCKED)) + xcon->sp.lock(); + + session_state = session_states::FLOW_CONTROLLED; + + if (! (flags & OP_FLAG_LOCKED)) + xcon->sp.unlock(); + + return (0); +} + +int XioConnection::CState::state_fail(Message* m, uint32_t flags) +{ + if (! (flags & OP_FLAG_LOCKED)) + xcon->sp.lock(); + + // advance to state FAIL, drop queued, msgs, adjust LRU + session_state = session_states::DISCONNECTED; + startup_state = session_startup_states::FAIL; + + xcon->discard_out_queues(flags|OP_FLAG_LOCKED); + xcon->adjust_clru(flags|OP_FLAG_LOCKED|OP_FLAG_LRU); + + xcon->disconnect(); + + if (! (flags & OP_FLAG_LOCKED)) + xcon->sp.unlock(); + + // notify ULP + XioMessenger* msgr = static_cast<XioMessenger*>(xcon->get_messenger()); + msgr->ms_deliver_handle_reset(xcon); + m->put(); + + return 0; +} + + +int XioLoopbackConnection::send_message(Message *m) +{ + XioMessenger *ms = static_cast<XioMessenger*>(get_messenger()); + m->set_connection(this); + m->set_seq(next_seq()); + m->set_src(ms->get_myinst().name); + ms->ds_dispatch(m); + return 0; +} + +void XioLoopbackConnection::send_keepalive() +{ + utime_t t = ceph_clock_now(); + set_last_keepalive(t); + set_last_keepalive_ack(t); +} diff --git a/src/msg/xio/XioConnection.h b/src/msg/xio/XioConnection.h new file mode 100644 index 00000000..00024ef3 --- /dev/null +++ b/src/msg/xio/XioConnection.h @@ -0,0 +1,380 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * Portions Copyright (C) 2013 CohortFS, LLC + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef XIO_CONNECTION_H +#define XIO_CONNECTION_H + +#include <atomic> + +#include <boost/intrusive/avl_set.hpp> +#include <boost/intrusive/list.hpp> + +extern "C" { +#include "libxio.h" +} + +#include "XioInSeq.h" +#include "XioSubmit.h" +#include "msg/Connection.h" +#include "msg/Messenger.h" +#include "auth/AuthSessionHandler.h" + +#define XIO_ALL_FEATURES (CEPH_FEATURES_ALL) + + +#define XIO_NOP_TAG_MARKDOWN 0x0001 + +namespace bi = boost::intrusive; + +class XioPortal; +class XioMessenger; +class XioSend; + +class XioConnection : public Connection +{ +public: + enum type { ACTIVE, PASSIVE }; + + enum class session_states : unsigned { + INIT = 0, + START, + UP, + FLOW_CONTROLLED, + DISCONNECTED, + DELETED, + BARRIER + }; + + enum class session_startup_states : unsigned { + IDLE = 0, + CONNECTING, + ACCEPTING, + READY, + FAIL + }; + +private: + XioConnection::type xio_conn_type; + XioPortal *portal; + std::atomic<bool> connected = { false }; + entity_inst_t peer; + struct xio_session *session; + struct xio_connection *conn; + ceph::util::spinlock sp; + std::atomic<int64_t> send = { 0 }; + std::atomic<int64_t> recv = { 0 }; + uint32_t n_reqs; // Accelio-initiated reqs in progress (!counting partials) + uint32_t magic; + uint32_t special_handling; + uint64_t scount; + uint32_t send_ctr; + int q_high_mark; + int q_low_mark; + + struct lifecycle { + // different from Pipe states? + enum lf_state { + INIT, + LOCAL_DISCON, + REMOTE_DISCON, + RECONNECTING, + UP, + DEAD } state; + + /* XXX */ + uint32_t reconnects; + uint32_t connect_seq, peer_global_seq; + uint64_t in_seq, out_seq_acked; // atomic<uint64_t>, got receipt + std::atomic<int64_t> out_seq = { 0 }; + + lifecycle() : state(lifecycle::INIT), reconnects(0), connect_seq(0), + peer_global_seq(0), in_seq(0), out_seq_acked(0) + {} + + void set_in_seq(uint64_t seq) { + in_seq = seq; + } + + uint64_t next_out_seq() { + return ++out_seq; + } + + } state; + + /* batching */ + XioInSeq in_seq; + + class CState + { + public: + static const int FLAG_NONE = 0x0000; + static const int FLAG_BAD_AUTH = 0x0001; + static const int FLAG_MAPPED = 0x0002; + static const int FLAG_RESET = 0x0004; + + static const int OP_FLAG_NONE = 0x0000; + static const int OP_FLAG_LOCKED = 0x0001; + static const int OP_FLAG_LRU = 0x0002; + + uint64_t features; + Messenger::Policy policy; + + CryptoKey session_key; + std::shared_ptr<AuthSessionHandler> session_security; + AuthAuthorizer *authorizer; + XioConnection *xcon; + uint32_t protocol_version; + + std::atomic<session_states> session_state = { 0 }; + std::atomic<session_startup_state> startup_state = { 0 }; + + uint32_t reconnects; + uint32_t connect_seq, global_seq, peer_global_seq; + uint64_t in_seq, out_seq_acked; // atomic<uint64_t>, got receipt + std::atomic<uint64_t> out_seq = { 0 }; + + uint32_t flags; + + explicit CState(XioConnection* _xcon) + : features(0), + authorizer(NULL), + xcon(_xcon), + protocol_version(0), + session_state(INIT), + startup_state(IDLE), + reconnects(0), + connect_seq(0), + global_seq(0), + peer_global_seq(0), + in_seq(0), + out_seq_acked(0), + flags(FLAG_NONE) {} + + uint64_t get_session_state() { + return session_state; + } + + uint64_t get_startup_state() { + return startup_state; + } + + void set_in_seq(uint64_t seq) { + in_seq = seq; + } + + uint64_t next_out_seq() { + return ++out_seq; + }; + + // state machine + int init_state(); + int next_state(Message* m); +#if 0 // future (session startup) + int msg_connect(MConnect *m); + int msg_connect_reply(MConnectReply *m); + int msg_connect_reply(MConnectAuthReply *m); + int msg_connect_auth(MConnectAuth *m); + int msg_connect_auth_reply(MConnectAuthReply *m); +#endif + int state_up_ready(uint32_t flags); + int state_flow_controlled(uint32_t flags); + int state_discon(); + int state_fail(Message* m, uint32_t flags); + + } cstate; /* CState */ + + // message submission queue + struct SendQ { + bool keepalive; + bool ack; + utime_t ack_time; + Message::Queue mqueue; // deferred + XioSubmit::Queue requeue; + + SendQ():keepalive(false), ack(false){} + } outgoing; + + // conns_entity_map comparison functor + struct EntityComp + { + // for internal ordering + bool operator()(const XioConnection &lhs, const XioConnection &rhs) const + { return lhs.get_peer() < rhs.get_peer(); } + + // for external search by entity_inst_t(peer) + bool operator()(const entity_inst_t &peer, const XioConnection &c) const + { return peer < c.get_peer(); } + + bool operator()(const XioConnection &c, const entity_inst_t &peer) const + { return c.get_peer() < peer; } + }; + + bi::list_member_hook<> conns_hook; + bi::avl_set_member_hook<> conns_entity_map_hook; + + typedef bi::list< XioConnection, + bi::member_hook<XioConnection, bi::list_member_hook<>, + &XioConnection::conns_hook > > ConnList; + + typedef bi::member_hook<XioConnection, bi::avl_set_member_hook<>, + &XioConnection::conns_entity_map_hook> EntityHook; + + typedef bi::avl_set< XioConnection, EntityHook, + bi::compare<EntityComp> > EntitySet; + + friend class XioPortal; + friend class XioMessenger; + friend class XioDispatchHook; + friend class XioMarkDownHook; + friend class XioSend; + + int on_disconnect_event() { + std::lock_guard<ceph::spinlock> lg(sp); + + connected = false; + discard_out_queues(CState::OP_FLAG_LOCKED); + + return 0; + } + + int on_teardown_event() { + + { + std::lock_guard<ceph::spinlock> lg(sp); + + if (conn) + xio_connection_destroy(conn); + conn = NULL; + } + + this->put(); + return 0; + } + + int xio_qdepth_high_mark() { + return q_high_mark; + } + + int xio_qdepth_low_mark() { + return q_low_mark; + } + +public: + XioConnection(XioMessenger *m, XioConnection::type _type, + const entity_inst_t& peer); + + ~XioConnection() { + if (conn) + xio_connection_destroy(conn); + } + ostream& conn_prefix(std::ostream *_dout); + + bool is_connected() override { return connected; } + + int send_message(Message *m) override; + void send_keepalive() override {send_keepalive_or_ack();} + void send_keepalive_or_ack(bool ack = false, const utime_t *tp = nullptr); + void mark_down() override; + int _mark_down(uint32_t flags); + void mark_disposable() override; + int _mark_disposable(uint32_t flags); + + const entity_inst_t& get_peer() const { return peer; } + + XioConnection* get() { +#if 0 + cout << "XioConnection::get " << this << " " << nref.load() << std::endl; +#endif + RefCountedObject::get(); + return this; + } + + void put() { + RefCountedObject::put(); +#if 0 + cout << "XioConnection::put " << this << " " << nref.load() << std::endl; +#endif + } + + void disconnect() { + if (is_connected()) { + connected = false; + xio_disconnect(conn); // normal teardown will clean up conn + } + } + + uint32_t get_magic() { return magic; } + void set_magic(int _magic) { magic = _magic; } + uint32_t get_special_handling() { return special_handling; } + void set_special_handling(int n) { special_handling = n; } + uint64_t get_scount() { return scount; } + + int passive_setup(); /* XXX */ + + int handle_data_msg(struct xio_session *session, struct xio_msg *msg, + int more_in_batch, void *cb_user_context); + int on_msg(struct xio_session *session, struct xio_msg *msg, + int more_in_batch, void *cb_user_context); + int on_ow_msg_send_complete(struct xio_session *session, struct xio_msg *msg, + void *conn_user_context); + int on_msg_error(struct xio_session *session, enum xio_status error, + struct xio_msg *msg, void *conn_user_context); + void msg_send_fail(XioSend *xsend, int code); + void msg_release_fail(struct xio_msg *msg, int code); +private: + void send_keepalive_or_ack_internal(bool ack = false, const utime_t *tp = nullptr); + int flush_out_queues(uint32_t flags); + int discard_out_queues(uint32_t flags); + int adjust_clru(uint32_t flags); +}; + +typedef boost::intrusive_ptr<XioConnection> XioConnectionRef; + +class XioLoopbackConnection : public Connection +{ +private: + std::atomic<uint64_t> seq = { 0 }; +public: + explicit XioLoopbackConnection(Messenger *m) : Connection(m->cct, m) + { + const entity_inst_t& m_inst = m->get_myinst(); + peer_addr = m_inst.addr; + peer_type = m_inst.name.type(); + set_features(XIO_ALL_FEATURES); /* XXXX set to ours */ + } + + XioLoopbackConnection* get() { + return static_cast<XioLoopbackConnection*>(RefCountedObject::get()); + } + + bool is_connected() override { return true; } + + int send_message(Message *m) override; + void send_keepalive() override; + void mark_down() override {} + void mark_disposable() override {} + + uint64_t get_seq() { + return seq; + } + + uint64_t next_seq() { + return ++seq; + } +}; + +typedef boost::intrusive_ptr<XioLoopbackConnection> XioLoopbackConnectionRef; + +#endif /* XIO_CONNECTION_H */ diff --git a/src/msg/xio/XioInSeq.h b/src/msg/xio/XioInSeq.h new file mode 100644 index 00000000..7863a8f6 --- /dev/null +++ b/src/msg/xio/XioInSeq.h @@ -0,0 +1,84 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * Portions Copyright (C) 2013 CohortFS, LLC + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef XIO_IN_SEQ_H +#define XIO_IN_SEQ_H + +#include <boost/intrusive/list.hpp> +#include "msg/SimplePolicyMessenger.h" +extern "C" { +#include "libxio.h" +} + +/* For inbound messages (Accelio-owned) ONLY, use the message's + * user_context as an SLIST */ +class XioInSeq { +private: + int cnt; + int sz; + struct xio_msg* head; + struct xio_msg* tail; + +public: + XioInSeq() : cnt(0), sz(0), head(NULL), tail(NULL) {} + XioInSeq(const XioInSeq& seq) { + cnt = seq.cnt; + sz = seq.sz; + head = seq.head; + tail = seq.tail; + } + + int count() { return cnt; } + + int size() { return sz; } + + bool p() { return !!head; } + + void set_count(int _cnt) { cnt = _cnt; } + + void append(struct xio_msg* msg) { + msg->user_context = NULL; + if (!head) { + head = tail = msg; + } else { + tail->user_context = msg; + tail = msg; + } + ++sz; + --cnt; + } + + struct xio_msg* begin() { return head; } + + struct xio_msg* end() { return NULL; } + + void next(struct xio_msg** msg) { + *msg = static_cast<struct xio_msg *>((*msg)->user_context); + } + + struct xio_msg* dequeue() { + struct xio_msg* msgs = head; + clear(); + return msgs; + } + + void clear() { + head = tail = NULL; + cnt = 0; + sz = 0; + } +}; + +#endif /* XIO_IN_SEQ_H */ diff --git a/src/msg/xio/XioMessenger.cc b/src/msg/xio/XioMessenger.cc new file mode 100644 index 00000000..dec7d0c7 --- /dev/null +++ b/src/msg/xio/XioMessenger.cc @@ -0,0 +1,1136 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * Portions Copyright (C) 2013 CohortFS, LLC + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include <arpa/inet.h> +#include <boost/lexical_cast.hpp> +#include <set> +#include <stdlib.h> +#include <memory> + +#include "XioMsg.h" +#include "XioMessenger.h" +#include "common/address_helper.h" +#include "common/code_environment.h" +#include "messages/MNop.h" + +#define dout_subsys ceph_subsys_xio +#undef dout_prefix +#define dout_prefix *_dout << "xio." + +Mutex mtx("XioMessenger Package Lock"); +std::atomic<bool> initialized = { false }; + +std::atomic<unsigned> XioMessenger::nInstances = { 0 }; + +struct xio_mempool *xio_msgr_noreg_mpool; + +static struct xio_session_ops xio_msgr_ops; + +/* Accelio API callouts */ + +namespace xio_log +{ +typedef pair<const char*, int> level_pair; +static const level_pair LEVELS[] = { + make_pair("fatal", 0), + make_pair("error", 0), + make_pair("warn", 1), + make_pair("info", 1), + make_pair("debug", 2), + make_pair("trace", 20) +}; + +static CephContext *context; + +int get_level() +{ + int level = 0; + for (size_t i = 0; i < sizeof(LEVELS); i++) { + if (!ldlog_p1(context, dout_subsys, LEVELS[i].second)) + break; + level++; + } + return level; +} + +void log_dout(const char *file, unsigned line, + const char *function, unsigned level, + const char *fmt, ...) +{ + char buffer[2048]; + va_list args; + va_start(args, fmt); + int n = vsnprintf(buffer, sizeof(buffer), fmt, args); + va_end(args); + + if (n > 0) { + const char *short_file = strrchr(file, '/'); + short_file = (short_file == NULL) ? file : short_file + 1; + + const level_pair &lvl = LEVELS[level]; + ldout(context, lvl.second) << '[' << lvl.first << "] " + << short_file << ':' << line << ' ' + << function << " - " << buffer << dendl; + } +} +} + +static int on_session_event(struct xio_session *session, + struct xio_session_event_data *event_data, + void *cb_user_context) +{ + XioMessenger *msgr = static_cast<XioMessenger*>(cb_user_context); + CephContext *cct = msgr->cct; + + ldout(cct,4) << "session event: " << xio_session_event_str(event_data->event) + << ". reason: " << xio_strerror(event_data->reason) << dendl; + + return msgr->session_event(session, event_data, cb_user_context); +} + +static int on_new_session(struct xio_session *session, + struct xio_new_session_req *req, + void *cb_user_context) +{ + XioMessenger *msgr = static_cast<XioMessenger*>(cb_user_context); + CephContext *cct = msgr->cct; + + ldout(cct,4) << "new session " << session + << " user_context " << cb_user_context << dendl; + + return (msgr->new_session(session, req, cb_user_context)); +} + +static int on_msg(struct xio_session *session, + struct xio_msg *req, + int more_in_batch, + void *cb_user_context) +{ + XioConnection* xcon __attribute__((unused)) = + static_cast<XioConnection*>(cb_user_context); + CephContext *cct = xcon->get_messenger()->cct; + + ldout(cct,25) << "on_msg session " << session << " xcon " << xcon << dendl; + + if (unlikely(XioPool::trace_mempool)) { + static uint32_t nreqs; + if (unlikely((++nreqs % 65536) == 0)) { + xp_stats.dump(__func__, nreqs); + } + } + + return xcon->on_msg(session, req, more_in_batch, + cb_user_context); +} + +static int on_ow_msg_send_complete(struct xio_session *session, + struct xio_msg *msg, + void *conn_user_context) +{ + XioConnection *xcon = + static_cast<XioConnection*>(conn_user_context); + CephContext *cct = xcon->get_messenger()->cct; + + ldout(cct,25) << "msg delivered session: " << session + << " msg: " << msg << " conn_user_context " + << conn_user_context << dendl; + + return xcon->on_ow_msg_send_complete(session, msg, conn_user_context); +} + +static int on_msg_error(struct xio_session *session, + enum xio_status error, + enum xio_msg_direction dir, + struct xio_msg *msg, + void *conn_user_context) +{ + /* XIO promises to flush back undelivered messages */ + XioConnection *xcon = + static_cast<XioConnection*>(conn_user_context); + CephContext *cct = xcon->get_messenger()->cct; + + ldout(cct,4) << "msg error session: " << session + << " error: " << xio_strerror(error) << " msg: " << msg + << " conn_user_context " << conn_user_context << dendl; + + return xcon->on_msg_error(session, error, msg, conn_user_context); +} + +static int on_cancel(struct xio_session *session, + struct xio_msg *msg, + enum xio_status result, + void *conn_user_context) +{ + XioConnection* xcon __attribute__((unused)) = + static_cast<XioConnection*>(conn_user_context); + CephContext *cct = xcon->get_messenger()->cct; + + ldout(cct,25) << "on cancel: session: " << session << " msg: " << msg + << " conn_user_context " << conn_user_context << dendl; + + return 0; +} + +static int on_cancel_request(struct xio_session *session, + struct xio_msg *msg, + void *conn_user_context) +{ + XioConnection* xcon __attribute__((unused)) = + static_cast<XioConnection*>(conn_user_context); + CephContext *cct = xcon->get_messenger()->cct; + + ldout(cct,25) << "on cancel request: session: " << session << " msg: " << msg + << " conn_user_context " << conn_user_context << dendl; + + return 0; +} + +/* free functions */ +static string xio_uri_from_entity(const string &type, + const entity_addr_t& addr, bool want_port) +{ + const char *host = NULL; + char addr_buf[129]; + string xio_uri; + + switch(addr.get_family()) { + case AF_INET: + host = inet_ntop(AF_INET, &addr.in4_addr().sin_addr, addr_buf, + INET_ADDRSTRLEN); + break; + case AF_INET6: + host = inet_ntop(AF_INET6, &addr.in6_addr().sin6_addr, addr_buf, + INET6_ADDRSTRLEN); + break; + default: + abort(); + break; + }; + + if (type == "rdma" || type == "tcp") + xio_uri = type + "://"; + else + xio_uri = "rdma://"; + + /* The following can only succeed if the host is rdma-capable */ + xio_uri += host; + if (want_port) { + xio_uri += ":"; + xio_uri += boost::lexical_cast<std::string>(addr.get_port()); + } + + return xio_uri; +} /* xio_uri_from_entity */ + +void XioInit::package_init(CephContext *cct) { + if (! initialized) { + + mtx.Lock(); + if (! initialized) { + + xio_init(); + + // claim a reference to the first context we see + xio_log::context = cct->get(); + + int xopt; + xopt = xio_log::get_level(); + xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_LOG_LEVEL, + &xopt, sizeof(xopt)); + xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_LOG_FN, + (const void*)xio_log::log_dout, sizeof(xio_log_fn)); + + xopt = 1; + xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_DISABLE_HUGETBL, + &xopt, sizeof(xopt)); + + if (g_code_env == CODE_ENVIRONMENT_DAEMON) { + xopt = 1; + xio_set_opt(NULL, XIO_OPTLEVEL_RDMA, XIO_OPTNAME_ENABLE_FORK_INIT, + &xopt, sizeof(xopt)); + } + + xopt = XIO_MSGR_IOVLEN; + xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_IN_IOVLEN, + &xopt, sizeof(xopt)); + xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_OUT_IOVLEN, + &xopt, sizeof(xopt)); + + /* enable flow-control */ + xopt = 1; + xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_ENABLE_FLOW_CONTROL, + &xopt, sizeof(xopt)); + + /* and set threshold for buffer callouts */ + xopt = max(cct->_conf->xio_max_send_inline, 512); + xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_INLINE_XIO_DATA, + &xopt, sizeof(xopt)); + + xopt = XioMsgHdr::get_max_encoded_length(); + ldout(cct,2) << "setting accelio max header size " << xopt << dendl; + xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_INLINE_XIO_HEADER, + &xopt, sizeof(xopt)); + + size_t queue_depth = cct->_conf->xio_queue_depth; + struct xio_mempool_config mempool_config = { + 6, + { + {1024, 0, queue_depth, 262144}, + {4096, 0, queue_depth, 262144}, + {16384, 0, queue_depth, 262144}, + {65536, 0, 128, 65536}, + {262144, 0, 32, 16384}, + {1048576, 0, 8, 8192} + } + }; + xio_set_opt(NULL, + XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_CONFIG_MEMPOOL, + &mempool_config, sizeof(mempool_config)); + + /* and unregisterd one */ + #define XMSG_MEMPOOL_QUANTUM 4096 + + xio_msgr_noreg_mpool = + xio_mempool_create(-1 /* nodeid */, + XIO_MEMPOOL_FLAG_REGULAR_PAGES_ALLOC); + + (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 64, + cct->_conf->xio_mp_min, + cct->_conf->xio_mp_max_64, + XMSG_MEMPOOL_QUANTUM, 0); + (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 256, + cct->_conf->xio_mp_min, + cct->_conf->xio_mp_max_256, + XMSG_MEMPOOL_QUANTUM, 0); + (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 1024, + cct->_conf->xio_mp_min, + cct->_conf->xio_mp_max_1k, + XMSG_MEMPOOL_QUANTUM, 0); + (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, getpagesize(), + cct->_conf->xio_mp_min, + cct->_conf->xio_mp_max_page, + XMSG_MEMPOOL_QUANTUM, 0); + + /* initialize ops singleton */ + xio_msgr_ops.on_session_event = on_session_event; + xio_msgr_ops.on_new_session = on_new_session; + xio_msgr_ops.on_session_established = NULL; + xio_msgr_ops.on_msg = on_msg; + xio_msgr_ops.on_ow_msg_send_complete = on_ow_msg_send_complete; + xio_msgr_ops.on_msg_error = on_msg_error; + xio_msgr_ops.on_cancel = on_cancel; + xio_msgr_ops.on_cancel_request = on_cancel_request; + + /* mark initialized */ + initialized = true; + } + mtx.Unlock(); + } + } + +/* XioMessenger */ +#undef dout_prefix +#define dout_prefix _prefix(_dout, this) +static ostream& _prefix(std::ostream *_dout, XioMessenger *msgr) { + return *_dout << "-- " << msgr->get_myaddr_legacy() << " "; +} + +XioMessenger::XioMessenger(CephContext *cct, entity_name_t name, + string mname, uint64_t _nonce, + uint64_t cflags, DispatchStrategy *ds) + : SimplePolicyMessenger(cct, name, mname, _nonce), + XioInit(cct), + portals(this, get_nportals(cflags), get_nconns_per_portal(cflags)), + dispatch_strategy(ds), + loop_con(new XioLoopbackConnection(this)), + special_handling(0), + sh_mtx("XioMessenger session mutex"), + sh_cond(), + need_addr(true), + did_bind(false), + nonce(_nonce) +{ + + if (cct->_conf->xio_trace_xcon) + magic |= MSG_MAGIC_TRACE_XCON; + + XioPool::trace_mempool = (cct->_conf->xio_trace_mempool); + XioPool::trace_msgcnt = (cct->_conf->xio_trace_msgcnt); + + dispatch_strategy->set_messenger(this); + + /* update class instance count */ + nInstances++; + + loop_con->set_features(CEPH_FEATURES_ALL); + + ldout(cct,2) << "Create msgr: " << this << " instance: " + << nInstances << " type: " << name.type_str() + << " subtype: " << mname << " nportals: " << get_nportals(cflags) + << " nconns_per_portal: " << get_nconns_per_portal(cflags) + << dendl; + +} /* ctor */ + +int XioMessenger::pool_hint(uint32_t dsize) { + if (dsize > 1024*1024) + return 0; + + /* if dsize is already present, returns -EEXIST */ + return xio_mempool_add_slab(xio_msgr_noreg_mpool, dsize, 0, + cct->_conf->xio_mp_max_hint, + XMSG_MEMPOOL_QUANTUM, 0); +} + +int XioMessenger::get_nconns_per_portal(uint64_t cflags) +{ + const int XIO_DEFAULT_NUM_CONNS_PER_PORTAL = 8; + int nconns = XIO_DEFAULT_NUM_CONNS_PER_PORTAL; + + if (cflags & Messenger::HAS_MANY_CONNECTIONS) + nconns = max(cct->_conf->xio_max_conns_per_portal, XIO_DEFAULT_NUM_CONNS_PER_PORTAL); + else if (cflags & Messenger::HEARTBEAT) + nconns = max(cct->_conf->osd_heartbeat_min_peers * 4, XIO_DEFAULT_NUM_CONNS_PER_PORTAL); + + return nconns; +} + +int XioMessenger::get_nportals(uint64_t cflags) +{ + int nportals = 1; + + if (cflags & Messenger::HAS_HEAVY_TRAFFIC) + nportals = max(cct->_conf->xio_portal_threads, 1); + + return nportals; +} + +void XioMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) +{ + // be careful here: multiple threads may block here, and readers of + // my_inst.addr do NOT hold any lock. + + // this always goes from true -> false under the protection of the + // mutex. if it is already false, we need not retake the mutex at + // all. + if (!need_addr) + return; + + sh_mtx.Lock(); + if (need_addr) { + entity_addr_t t = peer_addr_for_me; + t.set_port(my_inst.addr.get_port()); + my_inst.addr.set_sockaddr(t.get_sockaddr()); + ldout(cct,2) << "learned my addr " << my_inst.addr << dendl; + need_addr = false; + // init_local_connection(); + } + sh_mtx.Unlock(); + +} + +int XioMessenger::new_session(struct xio_session *session, + struct xio_new_session_req *req, + void *cb_user_context) +{ + if (shutdown_called) { + return xio_reject( + session, XIO_E_SESSION_REFUSED, NULL /* udata */, 0 /* udata len */); + } + int code = portals.accept(session, req, cb_user_context); + if (! code) + nsessions++; + return code; +} /* new_session */ + +int XioMessenger::session_event(struct xio_session *session, + struct xio_session_event_data *event_data, + void *cb_user_context) +{ + XioConnection *xcon; + + switch (event_data->event) { + case XIO_SESSION_CONNECTION_ESTABLISHED_EVENT: + { + struct xio_connection *conn = event_data->conn; + struct xio_connection_attr xcona; + entity_addr_t peer_addr_for_me, paddr; + + xcon = static_cast<XioConnection*>(event_data->conn_user_context); + + ldout(cct,2) << "connection established " << event_data->conn + << " session " << session << " xcon " << xcon << dendl; + + (void) xio_query_connection(conn, &xcona, + XIO_CONNECTION_ATTR_LOCAL_ADDR| + XIO_CONNECTION_ATTR_PEER_ADDR); + peer_addr_for_me.set_sockaddr((struct sockaddr *)&xcona.local_addr); + paddr.set_sockaddr((struct sockaddr *)&xcona.peer_addr); + //set_myaddr(peer_addr_for_me); + learned_addr(peer_addr_for_me); + ldout(cct,2) << "client: connected from " << peer_addr_for_me << " to " << paddr << dendl; + + /* notify hook */ + this->ms_deliver_handle_connect(xcon); + this->ms_deliver_handle_fast_connect(xcon); + } + break; + + case XIO_SESSION_NEW_CONNECTION_EVENT: + { + struct xio_connection *conn = event_data->conn; + struct xio_connection_attr xcona; + entity_inst_t s_inst; + entity_addr_t peer_addr_for_me; + + (void) xio_query_connection(conn, &xcona, + XIO_CONNECTION_ATTR_CTX| + XIO_CONNECTION_ATTR_PEER_ADDR| + XIO_CONNECTION_ATTR_LOCAL_ADDR); + /* XXX assumes RDMA */ + s_inst.addr.set_sockaddr((struct sockaddr *)&xcona.peer_addr); + peer_addr_for_me.set_sockaddr((struct sockaddr *)&xcona.local_addr); + + xcon = new XioConnection(this, XioConnection::PASSIVE, s_inst); + xcon->session = session; + + struct xio_context_attr xctxa; + (void) xio_query_context(xcona.ctx, &xctxa, XIO_CONTEXT_ATTR_USER_CTX); + + xcon->conn = conn; + xcon->portal = static_cast<XioPortal*>(xctxa.user_context); + ceph_assert(xcon->portal); + + xcona.user_context = xcon; + (void) xio_modify_connection(conn, &xcona, XIO_CONNECTION_ATTR_USER_CTX); + + xcon->connected = true; + + /* sentinel ref */ + xcon->get(); /* xcon->nref == 1 */ + conns_sp.lock(); + conns_list.push_back(*xcon); + /* XXX we can't put xcon in conns_entity_map becase we don't yet know + * it's peer address */ + conns_sp.unlock(); + + /* XXXX pre-merge of session startup negotiation ONLY! */ + xcon->cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE); + + ldout(cct,2) << "New connection session " << session + << " xcon " << xcon << " on msgr: " << this << " portal: " << xcon->portal << dendl; + ldout(cct,2) << "Server: connected from " << s_inst.addr << " to " << peer_addr_for_me << dendl; + } + break; + case XIO_SESSION_CONNECTION_ERROR_EVENT: + case XIO_SESSION_CONNECTION_CLOSED_EVENT: /* orderly discon */ + case XIO_SESSION_CONNECTION_DISCONNECTED_EVENT: /* unexpected discon */ + case XIO_SESSION_CONNECTION_REFUSED_EVENT: + xcon = static_cast<XioConnection*>(event_data->conn_user_context); + ldout(cct,2) << xio_session_event_str(event_data->event) + << " xcon " << xcon << " session " << session << dendl; + if (likely(!!xcon)) { + unregister_xcon(xcon); + xcon->on_disconnect_event(); + } + break; + case XIO_SESSION_CONNECTION_TEARDOWN_EVENT: + xcon = static_cast<XioConnection*>(event_data->conn_user_context); + ldout(cct,2) << xio_session_event_str(event_data->event) + << " xcon " << xcon << " session " << session << dendl; + /* + * There are flows where Accelio sends teardown event without going + * through disconnect event. so we make sure we cleaned the connection. + */ + unregister_xcon(xcon); + xcon->on_teardown_event(); + break; + case XIO_SESSION_TEARDOWN_EVENT: + ldout(cct,2) << xio_session_event_str(event_data->event) + << " session " << session << dendl; + if (unlikely(XioPool::trace_mempool)) { + xp_stats.dump("xio session dtor", reinterpret_cast<uint64_t>(session)); + } + xio_session_destroy(session); + if (--nsessions == 0) { + Mutex::Locker lck(sh_mtx); + if (nsessions == 0) + sh_cond.Signal(); + } + break; + default: + break; + }; + + return 0; +} + +enum bl_type +{ + BUFFER_PAYLOAD, + BUFFER_MIDDLE, + BUFFER_DATA +}; + +#define MAX_XIO_BUF_SIZE 1044480 + +static inline int +xio_count_buffers(const buffer::list& bl, int& req_size, int& msg_off, int& req_off) +{ + + const std::list<buffer::ptr>& buffers = bl.buffers(); + list<bufferptr>::const_iterator pb; + size_t size, off; + int result; + int first = 1; + + off = size = 0; + result = 0; + for (;;) { + if (off >= size) { + if (first) pb = buffers.begin(); else ++pb; + if (pb == buffers.end()) { + break; + } + off = 0; + size = pb->length(); + first = 0; + } + size_t count = size - off; + if (!count) continue; + if (req_size + count > MAX_XIO_BUF_SIZE) { + count = MAX_XIO_BUF_SIZE - req_size; + } + + ++result; + + /* advance iov and perhaps request */ + + off += count; + req_size += count; + ++msg_off; + if (unlikely(msg_off >= XIO_MSGR_IOVLEN || req_size >= MAX_XIO_BUF_SIZE)) { + ++req_off; + msg_off = 0; + req_size = 0; + } + } + + return result; +} + +static inline void +xio_place_buffers(const buffer::list& bl, XioMsg *xmsg, struct xio_msg*& req, + struct xio_iovec_ex*& msg_iov, int& req_size, + int ex_cnt, int& msg_off, int& req_off, bl_type type) +{ + + const std::list<buffer::ptr>& buffers = bl.buffers(); + list<bufferptr>::const_iterator pb; + struct xio_iovec_ex* iov; + size_t size, off; + const char *data = NULL; + int first = 1; + + off = size = 0; + for (;;) { + if (off >= size) { + if (first) pb = buffers.begin(); else ++pb; + if (pb == buffers.end()) { + break; + } + off = 0; + size = pb->length(); + data = pb->c_str(); // is c_str() efficient? + first = 0; + } + size_t count = size - off; + if (!count) continue; + if (req_size + count > MAX_XIO_BUF_SIZE) { + count = MAX_XIO_BUF_SIZE - req_size; + } + + /* assign buffer */ + iov = &msg_iov[msg_off]; + iov->iov_base = (void *) (&data[off]); + iov->iov_len = count; + + switch (type) { + case BUFFER_DATA: + //break; + default: + { + struct xio_reg_mem *mp = get_xio_mp(*pb); + iov->mr = (mp) ? mp->mr : NULL; + } + break; + } + + /* advance iov(s) */ + + off += count; + req_size += count; + ++msg_off; + + /* next request if necessary */ + + if (unlikely(msg_off >= XIO_MSGR_IOVLEN || req_size >= MAX_XIO_BUF_SIZE)) { + /* finish this request */ + req->out.pdata_iov.nents = msg_off; + /* advance to next, and write in it if it's not the last one. */ + if (++req_off >= ex_cnt) { + req = 0; /* poison. trap if we try to use it. */ + msg_iov = NULL; + } else { + req = &xmsg->req_arr[req_off].msg; + msg_iov = req->out.pdata_iov.sglist; + } + msg_off = 0; + req_size = 0; + } + } +} + +int XioMessenger::bind(const entity_addr_t& addr) +{ + if (addr.is_blank_ip()) { + lderr(cct) << "ERROR: need rdma ip for remote use! " << dendl; + cout << "Error: xio bind failed. public/cluster ip not specified" << std::endl; + return -1; + } + + entity_addr_t shift_addr = addr; + string base_uri = xio_uri_from_entity(cct->_conf->xio_transport_type, + shift_addr, false /* want_port */); + ldout(cct,4) << "XioMessenger " << this << " bind: xio_uri " + << base_uri << ':' << shift_addr.get_port() << dendl; + + uint16_t port0; + int r = portals.bind(&xio_msgr_ops, base_uri, shift_addr.get_port(), &port0); + if (r == 0) { + shift_addr.set_port(port0); + shift_addr.nonce = nonce; + set_myaddr(shift_addr); + need_addr = false; + did_bind = true; + } + return r; +} /* bind */ + +int XioMessenger::rebind(const set<int>& avoid_ports) +{ + ldout(cct,4) << "XioMessenger " << this << " rebind attempt" << dendl; + return 0; +} /* rebind */ + +int XioMessenger::start() +{ + portals.start(); + dispatch_strategy->start(); + if (!did_bind) { + my_inst.addr.nonce = nonce; + } + started = true; + return 0; +} + +void XioMessenger::wait() +{ + portals.join(); + dispatch_strategy->wait(); +} /* wait */ + +int XioMessenger::_send_message(Message *m, const entity_inst_t& dest) +{ + ConnectionRef conn = get_connection(dest); + if (conn) + return _send_message(m, &(*conn)); + else + return EINVAL; +} /* send_message(Message *, const entity_inst_t&) */ + +static inline XioMsg* pool_alloc_xio_msg(Message *m, XioConnection *xcon, + int ex_cnt) +{ + struct xio_reg_mem mp_mem; + int e = xpool_alloc(xio_msgr_noreg_mpool, sizeof(XioMsg), &mp_mem); + if (!!e) + return NULL; + XioMsg *xmsg = reinterpret_cast<XioMsg*>(mp_mem.addr); + ceph_assert(!!xmsg); + new (xmsg) XioMsg(m, xcon, mp_mem, ex_cnt, CEPH_FEATURES_ALL); + return xmsg; +} + +XioCommand* pool_alloc_xio_command(XioConnection *xcon) +{ + struct xio_reg_mem mp_mem; + int e = xpool_alloc(xio_msgr_noreg_mpool, sizeof(XioCommand), &mp_mem); + if (!!e) + return NULL; + XioCommand *xcmd = reinterpret_cast<XioCommand*>(mp_mem.addr); + ceph_assert(!!xcmd); + new (xcmd) XioCommand(xcon, mp_mem); + return xcmd; +} + +int XioMessenger::_send_message(Message *m, Connection *con) +{ + if (con == loop_con.get() /* intrusive_ptr get() */) { + m->set_connection(con); + m->set_src(get_myinst().name); + m->set_seq(loop_con->next_seq()); + ds_dispatch(m); + return 0; + } + + XioConnection *xcon = static_cast<XioConnection*>(con); + + /* If con is not in READY state, we have to enforce policy */ + if (xcon->cstate.session_state.read() != XioConnection::UP) { + std::lock_guard<decltype(xcon->sp) lg(xcon->sp); + + if (xcon->cstate.session_state.read() != XioConnection::UP) { + xcon->outgoing.mqueue.push_back(*m); + return 0; + } + } + + return _send_message_impl(m, xcon); +} /* send_message(Message* m, Connection *con) */ + +int XioMessenger::_send_message_impl(Message* m, XioConnection* xcon) +{ + int code = 0; + + Mutex::Locker l(xcon->lock); + if (unlikely(XioPool::trace_mempool)) { + static uint32_t nreqs; + if (unlikely((++nreqs % 65536) == 0)) { + xp_stats.dump(__func__, nreqs); + } + } + + m->set_seq(xcon->state.next_out_seq()); + m->set_magic(magic); // trace flags and special handling + + m->encode(xcon->get_features(), this->crcflags); + + buffer::list &payload = m->get_payload(); + buffer::list &middle = m->get_middle(); + buffer::list &data = m->get_data(); + + int msg_off = 0; + int req_off = 0; + int req_size = 0; + int nbuffers = + xio_count_buffers(payload, req_size, msg_off, req_off) + + xio_count_buffers(middle, req_size, msg_off, req_off) + + xio_count_buffers(data, req_size, msg_off, req_off); + + int ex_cnt = req_off; + if (msg_off == 0 && ex_cnt > 0) { + // no buffers for last msg + ldout(cct,10) << "msg_off 0, ex_cnt " << ex_cnt << " -> " << ex_cnt-1 << dendl; + ex_cnt--; + } + + /* get an XioMsg frame */ + XioMsg *xmsg = pool_alloc_xio_msg(m, xcon, ex_cnt); + if (! xmsg) { + /* could happen if Accelio has been shutdown */ + return ENOMEM; + } + + ldout(cct,4) << __func__ << " " << m << " new XioMsg " << xmsg + << " tag " << (int)xmsg->hdr.tag + << " req_0 " << xmsg->get_xio_msg() << " msg type " << m->get_type() + << " features: " << xcon->get_features() + << " conn " << xcon->conn << " sess " << xcon->session << dendl; + + if (magic & (MSG_MAGIC_XIO)) { + + /* XXXX verify */ + switch (m->get_type()) { + case 43: + // case 15: + ldout(cct,4) << __func__ << " stop 43 " << m->get_type() << " " << *m << dendl; + buffer::list &payload = m->get_payload(); + ldout(cct,4) << __func__ << " payload dump:" << dendl; + payload.hexdump(cout); + } + } + + struct xio_msg *req = xmsg->get_xio_msg(); + struct xio_iovec_ex *msg_iov = req->out.pdata_iov.sglist; + + if (magic & (MSG_MAGIC_XIO)) { + ldout(cct,4) << "payload: " << payload.buffers().size() << + " middle: " << middle.buffers().size() << + " data: " << data.buffers().size() << + dendl; + } + + if (unlikely(ex_cnt > 0)) { + ldout(cct,4) << __func__ << " buffer cnt > XIO_MSGR_IOVLEN (" << + ((XIO_MSGR_IOVLEN-1) + nbuffers) << ")" << dendl; + } + + /* do the invariant part */ + msg_off = 0; + req_off = -1; /* most often, not used */ + req_size = 0; + + xio_place_buffers(payload, xmsg, req, msg_iov, req_size, ex_cnt, msg_off, + req_off, BUFFER_PAYLOAD); + + xio_place_buffers(middle, xmsg, req, msg_iov, req_size, ex_cnt, msg_off, + req_off, BUFFER_MIDDLE); + + xio_place_buffers(data, xmsg, req, msg_iov, req_size, ex_cnt, msg_off, + req_off, BUFFER_DATA); + ldout(cct,10) << "ex_cnt " << ex_cnt << ", req_off " << req_off + << ", msg_cnt " << xmsg->get_msg_count() << dendl; + + /* finalize request */ + if (msg_off) + req->out.pdata_iov.nents = msg_off; + + /* fixup first msg */ + req = xmsg->get_xio_msg(); + + const std::list<buffer::ptr>& header = xmsg->hdr.get_bl().buffers(); + ceph_assert(header.size() == 1); /* XXX */ + list<bufferptr>::const_iterator pb = header.begin(); + req->out.header.iov_base = (char*) pb->c_str(); + req->out.header.iov_len = pb->length(); + + /* deliver via xio, preserve ordering */ + if (xmsg->get_msg_count() > 1) { + struct xio_msg *head = xmsg->get_xio_msg(); + struct xio_msg *tail = head; + for (req_off = 0; ((unsigned) req_off) < xmsg->get_msg_count()-1; ++req_off) { + req = &xmsg->req_arr[req_off].msg; +assert(!req->in.pdata_iov.nents); +assert(req->out.pdata_iov.nents || !nbuffers); + tail->next = req; + tail = req; + } + tail->next = NULL; + } + xmsg->trace = m->trace; + m->trace.event("xio portal enqueue for send"); + m->trace.keyval("xio message segments", xmsg->hdr.msg_cnt); + xcon->portal->enqueue_for_send(xcon, xmsg); + + return code; +} /* send_message(Message *, Connection *) */ + +int XioMessenger::shutdown() +{ + shutdown_called = true; + conns_sp.lock(); + XioConnection::ConnList::iterator iter; + iter = conns_list.begin(); + for (iter = conns_list.begin(); iter != conns_list.end(); ++iter) { + (void) iter->disconnect(); // XXX mark down? + } + conns_sp.unlock(); + while(nsessions > 0) { + Mutex::Locker lck(sh_mtx); + if (nsessions > 0) + sh_cond.Wait(sh_mtx); + } + portals.shutdown(); + dispatch_strategy->shutdown(); + did_bind = false; + started = false; + return 0; +} /* shutdown */ + +ConnectionRef XioMessenger::get_connection(const entity_inst_t& dest) +{ + if (shutdown_called) + return NULL; + + const entity_inst_t& self_inst = get_myinst(); + if ((&dest == &self_inst) || + (dest == self_inst)) { + return get_loopback_connection(); + } + + conns_sp.lock(); + XioConnection::EntitySet::iterator conn_iter = + conns_entity_map.find(dest, XioConnection::EntityComp()); + if (conn_iter != conns_entity_map.end()) { + ConnectionRef cref = &(*conn_iter); + conns_sp.unlock(); + return cref; + } + else { + conns_sp.unlock(); + string xio_uri = xio_uri_from_entity(cct->_conf->xio_transport_type, + dest.addr, true /* want_port */); + + ldout(cct,4) << "XioMessenger " << this << " get_connection: xio_uri " + << xio_uri << dendl; + + /* XXX client session creation parameters */ + struct xio_session_params params = {}; + params.type = XIO_SESSION_CLIENT; + params.ses_ops = &xio_msgr_ops; + params.user_context = this; + params.uri = xio_uri.c_str(); + + XioConnection *xcon = new XioConnection(this, XioConnection::ACTIVE, + dest); + + xcon->session = xio_session_create(¶ms); + if (! xcon->session) { + delete xcon; + return NULL; + } + + /* this should cause callbacks with user context of conn, but + * we can always set it explicitly */ + struct xio_connection_params xcp = {}; + xcp.session = xcon->session; + xcp.ctx = xcon->portal->ctx; + xcp.conn_user_context = xcon; + + xcon->conn = xio_connect(&xcp); + if (!xcon->conn) { + xio_session_destroy(xcon->session); + delete xcon; + return NULL; + } + + nsessions++; + xcon->connected = true; + + /* sentinel ref */ + xcon->get(); /* xcon->nref == 1 */ + conns_sp.lock(); + conns_list.push_back(*xcon); + conns_entity_map.insert(*xcon); + conns_sp.unlock(); + + /* XXXX pre-merge of session startup negotiation ONLY! */ + xcon->cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE); + + ldout(cct,2) << "New connection xcon: " << xcon << + " up_ready on session " << xcon->session << + " on msgr: " << this << " portal: " << xcon->portal << dendl; + + return xcon->get(); /* nref +1 */ + } +} /* get_connection */ + +ConnectionRef XioMessenger::get_loopback_connection() +{ + return (loop_con.get()); +} /* get_loopback_connection */ + +void XioMessenger::unregister_xcon(XioConnection *xcon) +{ + std::lock_guard<decltype(conns_sp)> lckr(conns_sp); + + XioConnection::EntitySet::iterator conn_iter = + conns_entity_map.find(xcon->peer, XioConnection::EntityComp()); + if (conn_iter != conns_entity_map.end()) { + XioConnection *xcon2 = &(*conn_iter); + if (xcon == xcon2) { + conns_entity_map.erase(conn_iter); + } + } + + /* check if citer on conn_list */ + if (xcon->conns_hook.is_linked()) { + /* now find xcon on conns_list and erase */ + XioConnection::ConnList::iterator citer = + XioConnection::ConnList::s_iterator_to(*xcon); + conns_list.erase(citer); + } +} + +void XioMessenger::mark_down(const entity_addr_t& addr) +{ + entity_inst_t inst(entity_name_t(), addr); + std::lock_guard<decltype(conns_sp)> lckr(conns_sp); + XioConnection::EntitySet::iterator conn_iter = + conns_entity_map.find(inst, XioConnection::EntityComp()); + if (conn_iter != conns_entity_map.end()) { + (*conn_iter)._mark_down(XioConnection::CState::OP_FLAG_NONE); + } +} /* mark_down(const entity_addr_t& */ + +void XioMessenger::mark_down(Connection* con) +{ + XioConnection *xcon = static_cast<XioConnection*>(con); + xcon->_mark_down(XioConnection::CState::OP_FLAG_NONE); +} /* mark_down(Connection*) */ + +void XioMessenger::mark_down_all() +{ + std::lock_guard<decltype(conns_sp)> lckr(conns_sp); + XioConnection::EntitySet::iterator conn_iter; + for (conn_iter = conns_entity_map.begin(); conn_iter != + conns_entity_map.begin(); ++conn_iter) { + (*conn_iter)._mark_down(XioConnection::CState::OP_FLAG_NONE); + } +} /* mark_down_all */ + +static inline XioMarkDownHook* pool_alloc_markdown_hook( + XioConnection *xcon, Message *m) +{ + struct xio_reg_mem mp_mem; + int e = xio_mempool_alloc(xio_msgr_noreg_mpool, + sizeof(XioMarkDownHook), &mp_mem); + if (!!e) + return NULL; + XioMarkDownHook *hook = static_cast<XioMarkDownHook*>(mp_mem.addr); + new (hook) XioMarkDownHook(xcon, m, mp_mem); + return hook; +} + +void XioMessenger::mark_down_on_empty(Connection* con) +{ + XioConnection *xcon = static_cast<XioConnection*>(con); + MNop* m = new MNop(); + m->tag = XIO_NOP_TAG_MARKDOWN; + m->set_completion_hook(pool_alloc_markdown_hook(xcon, m)); + // stall new messages + xcon->cstate.session_state = XioConnection::session_states::BARRIER; + (void) _send_message_impl(m, xcon); +} + +void XioMessenger::mark_disposable(Connection *con) +{ + XioConnection *xcon = static_cast<XioConnection*>(con); + xcon->_mark_disposable(XioConnection::CState::OP_FLAG_NONE); +} + +void XioMessenger::try_insert(XioConnection *xcon) +{ + std::lock_guard<decltype(conns_sp)> lckr(conns_sp); + /* already resident in conns_list */ + conns_entity_map.insert(*xcon); +} + +XioMessenger::~XioMessenger() +{ + delete dispatch_strategy; + nInstances--; +} /* dtor */ diff --git a/src/msg/xio/XioMessenger.h b/src/msg/xio/XioMessenger.h new file mode 100644 index 00000000..6f8a67ba --- /dev/null +++ b/src/msg/xio/XioMessenger.h @@ -0,0 +1,176 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * Portions Copyright (C) 2013 CohortFS, LLC + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef XIO_MESSENGER_H +#define XIO_MESSENGER_H + +#include "msg/SimplePolicyMessenger.h" + +#include <atomic> + +extern "C" { +#include "libxio.h" +} + +#include "XioConnection.h" +#include "XioPortal.h" +#include "QueueStrategy.h" +#include "common/Thread.h" +#include "common/Mutex.h" +#include "include/spinlock.h" + +class XioInit { + /* safe to be called multiple times */ + void package_init(CephContext *cct); + +protected: + explicit XioInit(CephContext *cct) { + this->package_init(cct); + } +}; + +class XioMessenger : public SimplePolicyMessenger, XioInit +{ +private: + static std::atomic<uint64_t> nInstances = { 0 }; + std::atomic<uint64_t> nsessions = { 0 }; + std::atomic<bool> shutdown_called = { false }; + ceph::spinlock conns_sp; + XioConnection::ConnList conns_list; + XioConnection::EntitySet conns_entity_map; + XioPortals portals; + DispatchStrategy* dispatch_strategy; + XioLoopbackConnectionRef loop_con; + uint32_t special_handling; + Mutex sh_mtx; + Cond sh_cond; + bool need_addr; + bool did_bind; + + /// approximately unique ID set by the Constructor for use in entity_addr_t + uint64_t nonce; + + friend class XioConnection; + +public: + XioMessenger(CephContext *cct, entity_name_t name, + string mname, uint64_t nonce, + uint64_t cflags = 0, + DispatchStrategy* ds = new QueueStrategy(1)); + + virtual ~XioMessenger(); + + XioPortal* get_portal() { return portals.get_next_portal(); } + + virtual void set_myaddr(const entity_addr_t& a) { + Messenger::set_myaddr(a); + loop_con->set_peer_addr(a); + } + + int _send_message(Message *m, const entity_inst_t &dest); + int _send_message(Message *m, Connection *con); + int _send_message_impl(Message *m, XioConnection *xcon); + + uint32_t get_special_handling() { return special_handling; } + void set_special_handling(int n) { special_handling = n; } + int pool_hint(uint32_t size); + void try_insert(XioConnection *xcon); + + /* xio hooks */ + int new_session(struct xio_session *session, + struct xio_new_session_req *req, + void *cb_user_context); + + int session_event(struct xio_session *session, + struct xio_session_event_data *event_data, + void *cb_user_context); + + /* Messenger interface */ + virtual bool set_addr_unknowns(const entity_addrvec_t &addr) override + { } /* XXX applicable? */ + virtual void set_addr(const entity_addr_t &addr) override + { } /* XXX applicable? */ + + virtual int get_dispatch_queue_len() + { return 0; } /* XXX bogus? */ + + virtual double get_dispatch_queue_max_age(utime_t now) + { return 0; } /* XXX bogus? */ + + virtual void set_cluster_protocol(int p) + { } + + virtual int bind(const entity_addr_t& addr); + + virtual int rebind(const set<int>& avoid_ports); + + virtual int start(); + + virtual void wait(); + + virtual int shutdown(); + + virtual int send_message(Message *m, const entity_inst_t &dest) { + return _send_message(m, dest); + } + + virtual int lazy_send_message(Message *m, const entity_inst_t& dest) + { return EINVAL; } + + virtual int lazy_send_message(Message *m, Connection *con) + { return EINVAL; } + + virtual ConnectionRef get_connection(const entity_inst_t& dest); + + // compat hack + ConnectionRef connect_to( + int type, const entity_addrvec_t& dest) override { + return get_connection(entity_inst_t(entity_name_t(type, -1), + dest.legacy_addr())); + } + + virtual ConnectionRef get_loopback_connection(); + + void unregister_xcon(XioConnection *xcon); + virtual void mark_down(const entity_addr_t& a); + virtual void mark_down(Connection *con); + virtual void mark_down_all(); + virtual void mark_down_on_empty(Connection *con); + virtual void mark_disposable(Connection *con); + + void ds_dispatch(Message *m) + { dispatch_strategy->ds_dispatch(m); } + + /** + * Tell the XioMessenger its full IP address. + * + * This is used by clients when connecting to other endpoints, and + * probably shouldn't be called by anybody else. + */ + void learned_addr(const entity_addr_t& peer_addr_for_me); + +private: + int get_nconns_per_portal(uint64_t cflags); + int get_nportals(uint64_t cflags); + +protected: + virtual void ready() + { } +}; + +XioCommand* pool_alloc_xio_command(XioConnection *xcon); + + +#endif /* XIO_MESSENGER_H */ diff --git a/src/msg/xio/XioMsg.cc b/src/msg/xio/XioMsg.cc new file mode 100644 index 00000000..4b6a5d68 --- /dev/null +++ b/src/msg/xio/XioMsg.cc @@ -0,0 +1,51 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * Portions Copyright (C) 2013 CohortFS, LLC + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "XioMessenger.h" +#include "XioConnection.h" +#include "XioMsg.h" + + +int XioDispatchHook::release_msgs() +{ + XioCompletion *xcmp; + int r = msg_seq.size(); + cl_flag = true; + + /* queue for release */ + xcmp = static_cast<XioCompletion *>(rsp_pool.alloc(sizeof(XioCompletion))); + new (xcmp) XioCompletion(xcon, this); + xcmp->trace = m->trace; + + /* merge with portal traffic */ + xcon->portal->enqueue(xcon, xcmp); + + ceph_assert(r); + return r; +} + +/*static*/ size_t XioMsgHdr::get_max_encoded_length() { + ceph_msg_header _ceph_msg_header; + ceph_msg_footer _ceph_msg_footer; + XioMsgHdr hdr (_ceph_msg_header, _ceph_msg_footer, 0 /* features */); + const std::list<buffer::ptr>& hdr_buffers = hdr.get_bl().buffers(); + ceph_assert(hdr_buffers.size() == 1); /* accelio header is small without scatter gather */ + return hdr_buffers.begin()->length(); +} + +void XioMsg::print_debug(CephContext *cct, const char *tag) const { + print_xio_msg_hdr(cct, tag, hdr, get_xio_msg()); + print_ceph_msg(cct, tag, m); +} diff --git a/src/msg/xio/XioMsg.h b/src/msg/xio/XioMsg.h new file mode 100644 index 00000000..2f0c8490 --- /dev/null +++ b/src/msg/xio/XioMsg.h @@ -0,0 +1,446 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * Portions Copyright (C) 2013 CohortFS, LLC + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef XIO_MSG_H +#define XIO_MSG_H + +#include <boost/intrusive/list.hpp> +#include "msg/SimplePolicyMessenger.h" +extern "C" { +#include "libxio.h" +} +#include "XioConnection.h" +#include "XioSubmit.h" +#include "msg/msg_types.h" +#include "XioPool.h" + +namespace bi = boost::intrusive; + +class XioMessenger; + +class XioMsgCnt +{ +public: + ceph_le32 msg_cnt; + buffer::list bl; +public: + explicit XioMsgCnt(buffer::ptr p) + { + bl.append(p); + buffer::list::iterator bl_iter = bl.begin(); + decode(msg_cnt, bl_iter); + } +}; + +class XioMsgHdr +{ +public: + char tag; + ceph_le32 msg_cnt; + ceph_le32 peer_type; + entity_addr_t addr; /* XXX hack! */ + ceph_msg_header* hdr; + ceph_msg_footer* ftr; + uint64_t features; + buffer::list bl; +public: + XioMsgHdr(ceph_msg_header& _hdr, ceph_msg_footer& _ftr, uint64_t _features) + : tag(CEPH_MSGR_TAG_MSG), msg_cnt(init_le32(0)), hdr(&_hdr), ftr(&_ftr), + features(_features) + { } + + XioMsgHdr(ceph_msg_header& _hdr, ceph_msg_footer &_ftr, buffer::ptr p) + : hdr(&_hdr), ftr(&_ftr) + { + bl.append(p); + buffer::list::iterator bl_iter = bl.begin(); + decode(bl_iter); + } + + static size_t get_max_encoded_length(); + + const buffer::list& get_bl() { encode(bl); return bl; }; + + inline void encode_hdr(ceph::buffer::list& bl) const { + using ceph::encode; + encode(tag, bl); + encode(msg_cnt, bl); + encode(peer_type, bl); + encode(addr, bl, features); + encode(hdr->seq, bl); + encode(hdr->tid, bl); + encode(hdr->type, bl); + encode(hdr->priority, bl); + encode(hdr->version, bl); + encode(hdr->front_len, bl); + encode(hdr->middle_len, bl); + encode(hdr->data_len, bl); + encode(hdr->data_off, bl); + encode(hdr->src.type, bl); + encode(hdr->src.num, bl); + encode(hdr->compat_version, bl); + encode(hdr->crc, bl); + } + + inline void encode_ftr(buffer::list& bl) const { + using ceph::encode; + encode(ftr->front_crc, bl); + encode(ftr->middle_crc, bl); + encode(ftr->data_crc, bl); + encode(ftr->sig, bl); + encode(ftr->flags, bl); + } + + inline void encode(buffer::list& bl) const { + encode_hdr(bl); + encode_ftr(bl); + } + + inline void decode_hdr(buffer::list::iterator& bl) { + using ceph::decode; + decode(tag, bl); + decode(msg_cnt, bl); + decode(peer_type, bl); + decode(addr, bl); + decode(hdr->seq, bl); + decode(hdr->tid, bl); + decode(hdr->type, bl); + decode(hdr->priority, bl); + decode(hdr->version, bl); + decode(hdr->front_len, bl); + decode(hdr->middle_len, bl); + decode(hdr->data_len, bl); + decode(hdr->data_off, bl); + decode(hdr->src.type, bl); + decode(hdr->src.num, bl); + decode(hdr->compat_version, bl); + decode(hdr->crc, bl); + } + + inline void decode_ftr(buffer::list::iterator& bl) { + using ceph::decode; + decode(ftr->front_crc, bl); + decode(ftr->middle_crc, bl); + decode(ftr->data_crc, bl); + decode(ftr->sig, bl); + decode(ftr->flags, bl); + } + + inline void decode(buffer::list::iterator& bl) { + decode_hdr(bl); + decode_ftr(bl); + } + + virtual ~XioMsgHdr() + {} +}; + +WRITE_CLASS_ENCODER(XioMsgHdr); + +extern struct xio_mempool *xio_msgr_noreg_mpool; + +#define XIO_MSGR_IOVLEN 16 + +struct xio_msg_ex +{ + struct xio_msg msg; + struct xio_iovec_ex iovs[XIO_MSGR_IOVLEN]; + + explicit xio_msg_ex(void* user_context) { + // go in structure order + msg.in.header.iov_len = 0; + msg.in.header.iov_base = NULL; /* XXX Accelio requires this currently */ + + msg.in.sgl_type = XIO_SGL_TYPE_IOV_PTR; + msg.in.pdata_iov.max_nents = XIO_MSGR_IOVLEN; + msg.in.pdata_iov.nents = 0; + msg.in.pdata_iov.sglist = iovs; + + // minimal zero "out" side + msg.out.header.iov_len = 0; + msg.out.header.iov_base = NULL; /* XXX Accelio requires this currently, + * against spec */ + // out (some members adjusted later) + msg.out.sgl_type = XIO_SGL_TYPE_IOV_PTR; + msg.out.pdata_iov.max_nents = XIO_MSGR_IOVLEN; + msg.out.pdata_iov.nents = 0; + msg.out.pdata_iov.sglist = iovs; + + // minimal initialize an "out" msg + msg.request = NULL; + msg.type = XIO_MSG_TYPE_ONE_WAY; + // for now, we DO NEED receipts for every msg + msg.flags = 0; + msg.user_context = user_context; + msg.next = NULL; + // minimal zero "in" side + } +}; + +class XioSend : public XioSubmit +{ +public: + virtual void print_debug(CephContext *cct, const char *tag) const {}; + const struct xio_msg * get_xio_msg() const {return &req_0.msg;} + struct xio_msg * get_xio_msg() {return &req_0.msg;} + virtual size_t get_msg_count() const {return 1;} + + XioSend(XioConnection *_xcon, struct xio_reg_mem& _mp, int _ex_cnt=0) : + XioSubmit(XioSubmit::OUTGOING_MSG, _xcon), + req_0(this), mp_this(_mp), nrefs(_ex_cnt+1) + { + xpool_inc_msgcnt(); + xcon->get(); + } + + XioSend* get() { nrefs++; return this; }; + + void put(int n) { + int refs = nrefs -= n; + if (refs == 0) { + struct xio_reg_mem *mp = &this->mp_this; + this->~XioSend(); + xpool_free(sizeof(XioSend), mp); + } + } + + void put() { + put(1); + } + + void put_msg_refs() { + put(get_msg_count()); + } + + virtual ~XioSend() { + xpool_dec_msgcnt(); + xcon->put(); + } + +private: + xio_msg_ex req_0; + struct xio_reg_mem mp_this; + std::atomic<unsigned> nrefs = { 0 }; +}; + +class XioCommand : public XioSend +{ +public: + XioCommand(XioConnection *_xcon, struct xio_reg_mem& _mp):XioSend(_xcon, _mp) { + } + + buffer::list& get_bl_ref() { return bl; }; + +private: + buffer::list bl; +}; + +struct XioMsg : public XioSend +{ +public: + Message* m; + XioMsgHdr hdr; + xio_msg_ex* req_arr; + +public: + XioMsg(Message *_m, XioConnection *_xcon, struct xio_reg_mem& _mp, + int _ex_cnt, uint64_t _features) : + XioSend(_xcon, _mp, _ex_cnt), + m(_m), hdr(m->get_header(), m->get_footer(), _features), + req_arr(NULL) + { + const entity_inst_t &inst = xcon->get_messenger()->get_myinst(); + hdr.peer_type = inst.name.type(); + hdr.addr = xcon->get_messenger()->get_myaddr_legacy(); + hdr.hdr->src.type = inst.name.type(); + hdr.hdr->src.num = inst.name.num(); + hdr.msg_cnt = _ex_cnt+1; + + if (unlikely(_ex_cnt > 0)) { + alloc_trailers(_ex_cnt); + } + } + + void print_debug(CephContext *cct, const char *tag) const override; + size_t get_msg_count() const override { + return hdr.msg_cnt; + } + + void alloc_trailers(int cnt) { + req_arr = static_cast<xio_msg_ex*>(malloc(cnt * sizeof(xio_msg_ex))); + for (int ix = 0; ix < cnt; ++ix) { + xio_msg_ex* xreq = &(req_arr[ix]); + new (xreq) xio_msg_ex(this); + } + } + + Message *get_message() { return m; } + + ~XioMsg() + { + if (unlikely(!!req_arr)) { + for (unsigned int ix = 0; ix < get_msg_count()-1; ++ix) { + xio_msg_ex* xreq = &(req_arr[ix]); + xreq->~xio_msg_ex(); + } + free(req_arr); + } + + /* testing only! server's ready, resubmit request (not reached on + * PASSIVE/server side) */ + if (unlikely(m->get_magic() & MSG_MAGIC_REDUPE)) { + if (likely(xcon->is_connected())) { + xcon->send_message(m); + } else { + /* dispose it */ + m->put(); + } + } else { + /* the normal case: done with message */ + m->put(); + } + } +}; + +class XioDispatchHook : public Message::CompletionHook +{ +private: + XioConnection *xcon; + XioInSeq msg_seq; + XioPool rsp_pool; + std::atomic<unsigned> nrefs { 1 }; + bool cl_flag; + friend class XioConnection; + friend class XioMessenger; +public: + struct xio_reg_mem mp_this; + + XioDispatchHook(XioConnection *_xcon, Message *_m, XioInSeq& _msg_seq, + struct xio_reg_mem& _mp) : + CompletionHook(_m), + xcon(_xcon->get()), + msg_seq(_msg_seq), + rsp_pool(xio_msgr_noreg_mpool), + cl_flag(false), + mp_this(_mp) + { + ++xcon->n_reqs; // atomicity by portal thread + xpool_inc_hookcnt(); + } + + virtual void finish(int r) { + this->put(); + } + + virtual void complete(int r) { + finish(r); + } + + int release_msgs(); + + XioDispatchHook* get() { + nrefs++; return this; + } + + void put(int n = 1) { + int refs = nrefs -= n; + if (refs == 0) { + /* in Marcus' new system, refs reaches 0 twice: once in + * Message lifecycle, and again after xio_release_msg. + */ + if (!cl_flag && release_msgs()) + return; + struct xio_reg_mem *mp = &this->mp_this; + this->~XioDispatchHook(); + xpool_free(sizeof(XioDispatchHook), mp); + } + } + + XioInSeq& get_seq() { return msg_seq; } + + XioPool& get_pool() { return rsp_pool; } + + void on_err_finalize(XioConnection *xcon) { + /* can't decode message; even with one-way must free + * xio_msg structures, and then xiopool + */ + this->finish(-1); + } + + ~XioDispatchHook() { + --xcon->n_reqs; // atomicity by portal thread + xpool_dec_hookcnt(); + xcon->put(); + } +}; + +/* A sender-side CompletionHook that relies on the on_msg_delivered + * to complete a pending mark down. */ +class XioMarkDownHook : public Message::CompletionHook +{ +private: + XioConnection* xcon; + +public: + struct xio_reg_mem mp_this; + + XioMarkDownHook( + XioConnection* _xcon, Message *_m, struct xio_reg_mem& _mp) : + CompletionHook(_m), xcon(_xcon->get()), mp_this(_mp) + { } + + virtual void claim(int r) {} + + virtual void finish(int r) { + xcon->put(); + struct xio_reg_mem *mp = &this->mp_this; + this->~XioMarkDownHook(); + xio_mempool_free(mp); + } + + virtual void complete(int r) { + xcon->_mark_down(XioConnection::CState::OP_FLAG_NONE); + finish(r); + } +}; + +struct XioCompletion : public XioSubmit +{ + XioDispatchHook *xhook; +public: + XioCompletion(XioConnection *_xcon, XioDispatchHook *_xhook) + : XioSubmit(XioSubmit::INCOMING_MSG_RELEASE, _xcon /* not xcon! */), + xhook(_xhook->get()) { + // submit queue ref + xcon->get(); + }; + + struct xio_msg* dequeue() { + return xhook->get_seq().dequeue(); + } + + XioDispatchHook* get_xhook() { return xhook; } + + void finalize() { + xcon->put(); + xhook->put(); + } +}; + +void print_xio_msg_hdr(CephContext *cct, const char *tag, + const XioMsgHdr &hdr, const struct xio_msg *msg); +void print_ceph_msg(CephContext *cct, const char *tag, Message *m); + +#endif /* XIO_MSG_H */ diff --git a/src/msg/xio/XioPool.cc b/src/msg/xio/XioPool.cc new file mode 100644 index 00000000..5f0d77a2 --- /dev/null +++ b/src/msg/xio/XioPool.cc @@ -0,0 +1,41 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 CohortFS, LLC + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include <iostream> +#include "XioPool.h" + +XioPoolStats xp_stats; + +bool XioPool::trace_mempool = 0; +bool XioPool::trace_msgcnt = 0; + +void XioPoolStats::dump(const char* tag, uint64_t serial) +{ + std::cout + << tag << " #" << serial << ": " + << "pool objs: " + << "64: " << ctr_set[SLAB_64].read() << " " + << "256: " << ctr_set[SLAB_256].read() << " " + << "1024: " << ctr_set[SLAB_1024].read() << " " + << "page: " << ctr_set[SLAB_PAGE].read() << " " + << "max: " << ctr_set[SLAB_MAX].read() << " " + << "overflow: " << ctr_set[SLAB_OVERFLOW].read() << " " + << std::endl; + std::cout + << tag << " #" << serial << ": " + << " msg objs: " + << "in: " << hook_cnt.read() << " " + << "out: " << msg_cnt.read() << " " + << std::endl; +} diff --git a/src/msg/xio/XioPool.h b/src/msg/xio/XioPool.h new file mode 100644 index 00000000..07fa7311 --- /dev/null +++ b/src/msg/xio/XioPool.h @@ -0,0 +1,218 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 CohortFS, LLC + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ +#ifndef XIO_POOL_H +#define XIO_POOL_H + +#include <atomic> +#include <vector> +#include <cstdlib> +#include <cstring> +#include <cstdint> + +extern "C" { +#include "libxio.h" +} + +#include "common/likely.h" + +static inline int xpool_alloc(struct xio_mempool *pool, uint64_t size, + struct xio_reg_mem* mp); +static inline void xpool_free(uint64_t size, struct xio_reg_mem* mp); + +class XioPool +{ +private: + struct xio_mempool *handle; + +public: + static bool trace_mempool; + static bool trace_msgcnt; + static const int MB = 8; + + struct xio_piece { + struct xio_reg_mem mp[1]; + struct xio_piece *next; + int s; + char payload[MB]; + } *first; + + explicit XioPool(struct xio_mempool *_handle) : + handle(_handle), first(0) + { + } + ~XioPool() + { + struct xio_piece *p; + while ((p = first)) { + first = p->next; + if (unlikely(trace_mempool)) { + memset(p->payload, 0xcf, p->s); // guard bytes + } + xpool_free(sizeof(struct xio_piece)+(p->s)-MB, p->mp); + } + } + void *alloc(size_t _s) + { + void *r; + struct xio_reg_mem mp[1]; + struct xio_piece *x; + int e = xpool_alloc(handle, (sizeof(struct xio_piece)-MB) + _s, mp); + if (e) { + r = 0; + } else { + x = reinterpret_cast<struct xio_piece *>(mp->addr); + *x->mp = *mp; + x->next = first; + x->s = _s; + first = x; + r = x->payload; + } + return r; + } +}; + +class XioPoolStats { +private: + enum pool_sizes { + SLAB_64 = 0, + SLAB_256, + SLAB_1024, + SLAB_PAGE, + SLAB_MAX, + SLAB_OVERFLOW, + NUM_SLABS, + }; + + std::atomic<unsigned> ctr_set[NUM_SLABS] = {}; + std::atomic<unsigned> msg_cnt = { 0 }; // send msgs + std::atomic<unsigned> hook_cnt = { 0 }; // recv msgs + +public: + void dump(const char* tag, uint64_t serial); + + void inc(uint64_t size) { + if (size <= 64) { + (ctr_set[SLAB_64])++; + return; + } + if (size <= 256) { + (ctr_set[SLAB_256])++; + return; + } + if (size <= 1024) { + (ctr_set[SLAB_1024])++; + return; + } + if (size <= 8192) { + (ctr_set[SLAB_PAGE])++; + return; + } + (ctr_set[SLAB_MAX])++; + } + + void dec(uint64_t size) { + if (size <= 64) { + (ctr_set[SLAB_64])--; + return; + } + if (size <= 256) { + (ctr_set[SLAB_256])--; + return; + } + if (size <= 1024) { + (ctr_set[SLAB_1024])--; + return; + } + if (size <= 8192) { + (ctr_set[SLAB_PAGE])--; + return; + } + (ctr_set[SLAB_MAX])--; + } + + void inc_overflow() { ctr_set[SLAB_OVERFLOW]++; } + void dec_overflow() { ctr_set[SLAB_OVERFLOW]--; } + + void inc_msgcnt() { + if (unlikely(XioPool::trace_msgcnt)) { + msg_cnt++; + } + } + + void dec_msgcnt() { + if (unlikely(XioPool::trace_msgcnt)) { + msg_cnt--; + } + } + + void inc_hookcnt() { + if (unlikely(XioPool::trace_msgcnt)) { + hook_cnt++; + } + } + + void dec_hookcnt() { + if (unlikely(XioPool::trace_msgcnt)) { + hook_cnt--; + } + } +}; + +extern XioPoolStats xp_stats; + +static inline int xpool_alloc(struct xio_mempool *pool, uint64_t size, + struct xio_reg_mem* mp) +{ + // try to allocate from the xio pool + int r = xio_mempool_alloc(pool, size, mp); + if (r == 0) { + if (unlikely(XioPool::trace_mempool)) + xp_stats += size; + return 0; + } + // fall back to malloc on errors + mp->addr = malloc(size); + ceph_assert(mp->addr); + mp->length = 0; + if (unlikely(XioPool::trace_mempool)) + xp_stats.inc_overflow(); + return 0; +} + +static inline void xpool_free(uint64_t size, struct xio_reg_mem* mp) +{ + if (mp->length) { + if (unlikely(XioPool::trace_mempool)) + xp_stats -= size; + xio_mempool_free(mp); + } else { // from malloc + if (unlikely(XioPool::trace_mempool)) + xp_stats.dec_overflow(); + free(mp->addr); + } +} + +#define xpool_inc_msgcnt() \ + do { xp_stats.inc_msgcnt(); } while (0) + +#define xpool_dec_msgcnt() \ + do { xp_stats.dec_msgcnt(); } while (0) + +#define xpool_inc_hookcnt() \ + do { xp_stats.inc_hookcnt(); } while (0) + +#define xpool_dec_hookcnt() \ + do { xp_stats.dec_hookcnt(); } while (0) + +#endif /* XIO_POOL_H */ diff --git a/src/msg/xio/XioPortal.cc b/src/msg/xio/XioPortal.cc new file mode 100644 index 00000000..e2379fb3 --- /dev/null +++ b/src/msg/xio/XioPortal.cc @@ -0,0 +1,98 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013 CohortFS, LLC + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "XioPortal.h" +#include <stdio.h> + +#define dout_subsys ceph_subsys_xio + +int XioPortal::bind(struct xio_session_ops *ops, const string &base_uri, + uint16_t port, uint16_t *assigned_port) +{ + // format uri + char buf[40]; + xio_uri = base_uri; + xio_uri += ":"; + sprintf(buf, "%d", port); + xio_uri += buf; + + uint16_t assigned; + server = xio_bind(ctx, ops, xio_uri.c_str(), &assigned, 0, msgr); + if (server == NULL) + return xio_errno(); + + // update uri if port changed + if (port != assigned) { + xio_uri = base_uri; + xio_uri += ":"; + sprintf(buf, "%d", assigned); + xio_uri += buf; + } + + portal_id = const_cast<char*>(xio_uri.c_str()); + if (assigned_port) + *assigned_port = assigned; + ldout(msgr->cct,20) << "xio_bind: portal " << xio_uri + << " returned server " << server << dendl; + return 0; +} + +int XioPortals::bind(struct xio_session_ops *ops, const string& base_uri, + uint16_t port, uint16_t *port0) +{ + /* a server needs at least 1 portal */ + if (n < 1) + return EINVAL; + Messenger *msgr = portals[0]->msgr; + portals.resize(n); + + uint16_t port_min = msgr->cct->_conf->ms_bind_port_min; + const uint16_t port_max = msgr->cct->_conf->ms_bind_port_max; + + /* bind the portals */ + for (size_t i = 0; i < portals.size(); i++) { + uint16_t result_port; + if (port != 0) { + // bind directly to the given port + int r = portals[i]->bind(ops, base_uri, port, &result_port); + if (r != 0) + return -r; + } else { + int r = EADDRINUSE; + // try ports within the configured range + for (; port_min <= port_max; port_min++) { + r = portals[i]->bind(ops, base_uri, port_min, &result_port); + if (r == 0) { + port_min++; + break; + } + } + if (r != 0) { + lderr(msgr->cct) << "portal.bind unable to bind to " << base_uri + << " on any port in range " << msgr->cct->_conf->ms_bind_port_min + << "-" << port_max << ": " << xio_strerror(r) << dendl; + return -r; + } + } + + ldout(msgr->cct,5) << "xp::bind: portal " << i << " bind OK: " + << portals[i]->xio_uri << dendl; + + if (i == 0 && port0 != NULL) + *port0 = result_port; + port = 0; // use port 0 for all subsequent portals + } + + return 0; +} diff --git a/src/msg/xio/XioPortal.h b/src/msg/xio/XioPortal.h new file mode 100644 index 00000000..7a0afee4 --- /dev/null +++ b/src/msg/xio/XioPortal.h @@ -0,0 +1,458 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * Portions Copyright (C) 2013 CohortFS, LLC + *s + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef XIO_PORTAL_H +#define XIO_PORTAL_H + +#include <string> + +extern "C" { +#include "libxio.h" +} +#include "XioInSeq.h" +#include <boost/lexical_cast.hpp> +#include "msg/SimplePolicyMessenger.h" +#include "XioConnection.h" +#include "XioMsg.h" + +#include "include/spinlock.h" + +#include "include/ceph_assert.h" +#include "common/dout.h" + +#ifndef CACHE_LINE_SIZE +#define CACHE_LINE_SIZE 64 /* XXX arch-specific define */ +#endif +#define CACHE_PAD(_n) char __pad ## _n [CACHE_LINE_SIZE] + +class XioPortal : public Thread +{ +private: + + struct SubmitQueue + { + const static int nlanes = 7; + + struct Lane + { + uint32_t size; + XioSubmit::Queue q; + ceph::spinlock sp; + CACHE_PAD(0); + }; + + Lane qlane[nlanes]; + + int ix; /* atomicity by portal thread */ + + SubmitQueue() : ix(0) + { + int ix; + Lane* lane; + + for (ix = 0; ix < nlanes; ++ix) { + lane = &qlane[ix]; + lane->size = 0; + } + } + + inline Lane* get_lane(XioConnection *xcon) + { + return &qlane[(((uint64_t) xcon) / 16) % nlanes]; + } + + void enq(XioConnection *xcon, XioSubmit* xs) + { + Lane* lane = get_lane(xcon); + std::lock_guard<decltype(lane->sp)> lg(lane->sp); + lane->q.push_back(*xs); + ++(lane->size); + } + + void enq(XioConnection *xcon, XioSubmit::Queue& requeue_q) + { + int size = requeue_q.size(); + Lane* lane = get_lane(xcon); + std::lock_guard<decltype(lane->sp)> lg(lane->sp); + XioSubmit::Queue::const_iterator i1 = lane->q.end(); + lane->q.splice(i1, requeue_q); + lane->size += size; + } + + void deq(XioSubmit::Queue& send_q) + { + Lane* lane; + int cnt; + + for (cnt = 0; cnt < nlanes; ++cnt, ++ix, ix = ix % nlanes) { + std::lock_guard<decltype(lane->sp)> lg(lane->sp); + lane = &qlane[ix]; + if (lane->size > 0) { + XioSubmit::Queue::const_iterator i1 = send_q.end(); + send_q.splice(i1, lane->q); + lane->size = 0; + ++ix, ix = ix % nlanes; + break; + } + } + } + + }; /* SubmitQueue */ + + Messenger *msgr; + struct xio_context *ctx; + struct xio_server *server; + SubmitQueue submit_q; + ceph::spinlock sp; + void *ev_loop; + string xio_uri; + char *portal_id; + bool _shutdown; + bool drained; + uint32_t magic; + uint32_t special_handling; + + friend class XioPortals; + friend class XioMessenger; + +public: + explicit XioPortal(Messenger *_msgr, int max_conns) : + msgr(_msgr), ctx(NULL), server(NULL), submit_q(), xio_uri(""), + portal_id(NULL), _shutdown(false), drained(false), + magic(0), + special_handling(0) + { + struct xio_context_params ctx_params; + memset(&ctx_params, 0, sizeof(ctx_params)); + ctx_params.user_context = this; + /* + * hint to Accelio the total number of connections that will share + * this context's resources: internal primary task pool... + */ + ctx_params.max_conns_per_ctx = max_conns; + + /* a portal is an xio_context and event loop */ + ctx = xio_context_create(&ctx_params, 0 /* poll timeout */, -1 /* cpu hint */); + ceph_assert(ctx && "Whoops, failed to create portal/ctx"); + } + + int bind(struct xio_session_ops *ops, const string &base_uri, + uint16_t port, uint16_t *assigned_port); + + inline void release_xio_msg(XioCompletion* xcmp) { + struct xio_msg *msg = xcmp->dequeue(); + struct xio_msg *next_msg = NULL; + int code; + if (unlikely(!xcmp->xcon->conn)) { + // NOTE: msg is not safe to dereference if the connection was torn down + xcmp->xcon->msg_release_fail(msg, ENOTCONN); + } + else while (msg) { + next_msg = static_cast<struct xio_msg *>(msg->user_context); + code = xio_release_msg(msg); + if (unlikely(code)) /* very unlikely, so log it */ + xcmp->xcon->msg_release_fail(msg, code); + msg = next_msg; + } + xcmp->trace.event("xio_release_msg"); + xcmp->finalize(); /* unconditional finalize */ + } + + void enqueue(XioConnection *xcon, XioSubmit *xs) + { + if (! _shutdown) { + submit_q.enq(xcon, xs); + xio_context_stop_loop(ctx); + return; + } + + /* dispose xs */ + switch(xs->type) { + case XioSubmit::OUTGOING_MSG: /* it was an outgoing 1-way */ + { + XioSend* xsend = static_cast<XioSend*>(xs); + xs->xcon->msg_send_fail(xsend, -EINVAL); + } + break; + default: + /* INCOMING_MSG_RELEASE */ + release_xio_msg(static_cast<XioCompletion*>(xs)); + break; + }; + } + + void requeue(XioConnection* xcon, XioSubmit::Queue& send_q) { + submit_q.enq(xcon, send_q); + } + + void requeue_all_xcon(XioConnection* xcon, + XioSubmit::Queue::iterator& q_iter, + XioSubmit::Queue& send_q) { + // XXX gather all already-dequeued outgoing messages for xcon + // and push them in FIFO order to front of the input queue, + // and mark the connection as flow-controlled + XioSubmit::Queue requeue_q; + + while (q_iter != send_q.end()) { + XioSubmit *xs = &(*q_iter); + // skip retires and anything for other connections + if (xs->xcon != xcon) { + q_iter++; + continue; + } + q_iter = send_q.erase(q_iter); + requeue_q.push_back(*xs); + } + std::lock_guard<decltype(xcon->sp)> lg(xcon->sp); + XioSubmit::Queue::const_iterator i1 = xcon->outgoing.requeue.begin(); + xcon->outgoing.requeue.splice(i1, requeue_q); + xcon->cstate.state_flow_controlled(XioConnection::CState::OP_FLAG_LOCKED); + } + + void *entry() + { + int size, code = 0; + uint32_t xio_qdepth_high; + XioSubmit::Queue send_q; + XioSubmit::Queue::iterator q_iter; + struct xio_msg *msg = NULL; + XioConnection *xcon; + XioSubmit *xs; + XioSend *xsend; + + do { + submit_q.deq(send_q); + + /* shutdown() barrier */ + std::lock_guard<decltype(sp)> lg(sp); + + restart: + size = send_q.size(); + + if (_shutdown) { + // XXX XioSend queues for flow-controlled connections may require + // cleanup + drained = true; + } + + if (size > 0) { + q_iter = send_q.begin(); + while (q_iter != send_q.end()) { + xs = &(*q_iter); + xcon = xs->xcon; + + switch (xs->type) { + case XioSubmit::OUTGOING_MSG: /* it was an outgoing 1-way */ + xsend = static_cast<XioSend*>(xs); + if (unlikely(!xcon->conn || !xcon->is_connected())) + code = ENOTCONN; + else { + /* XXX guard Accelio send queue (should be safe to rely + * on Accelio's check on below, but this assures that + * all chained xio_msg are accounted) */ + xio_qdepth_high = xcon->xio_qdepth_high_mark(); + if (unlikely((xcon->send_ctr + xsend->get_msg_count()) > + xio_qdepth_high)) { + requeue_all_xcon(xcon, q_iter, send_q); + goto restart; + } + + xs->trace.event("xio_send_msg"); + msg = xsend->get_xio_msg(); + code = xio_send_msg(xcon->conn, msg); + /* header trace moved here to capture xio serial# */ + if (ldlog_p1(msgr->cct, ceph_subsys_xio, 11)) { + xsend->print_debug(msgr->cct, "xio_send_msg"); + } + /* get the right Accelio's errno code */ + if (unlikely(code)) { + if ((code == -1) && (xio_errno() == -1)) { + /* In case XIO does not have any credits to send, + * it would still queue up the message(s) for transmission, + * but would return -1 and errno would also be set to -1. + * This needs to be treated as a success. + */ + code = 0; + } + else { + code = xio_errno(); + } + } + } /* !ENOTCONN */ + if (unlikely(code)) { + switch (code) { + case XIO_E_TX_QUEUE_OVERFLOW: + { + requeue_all_xcon(xcon, q_iter, send_q); + goto restart; + } + break; + default: + q_iter = send_q.erase(q_iter); + xcon->msg_send_fail(xsend, code); + continue; + break; + }; + } else { + xcon->send.set(msg->timestamp); // need atomic? + xcon->send_ctr += xsend->get_msg_count(); // only inc if cb promised + } + break; + default: + /* INCOMING_MSG_RELEASE */ + q_iter = send_q.erase(q_iter); + release_xio_msg(static_cast<XioCompletion*>(xs)); + continue; + } /* switch (xs->type) */ + q_iter = send_q.erase(q_iter); + } /* while */ + } /* size > 0 */ + + xio_context_run_loop(ctx, 300); + + } while ((!_shutdown) || (!drained)); + + /* shutting down */ + if (server) { + xio_unbind(server); + } + xio_context_destroy(ctx); + return NULL; + } + + void shutdown() + { + std::lock_guard<decltype(sp)> lg(sp); + _shutdown = true; + } +}; + +class XioPortals +{ +private: + vector<XioPortal*> portals; + char **p_vec; + int n; + int last_unused; + +public: + XioPortals(Messenger *msgr, int _n, int nconns) : p_vec(NULL), last_unused(0) + { + n = max(_n, 1); + + portals.resize(n); + for (int i = 0; i < n; i++) { + if (!portals[i]) { + portals[i] = new XioPortal(msgr, nconns); + ceph_assert(portals[i] != nullptr); + } + } + } + + vector<XioPortal*>& get() { return portals; } + + const char **get_vec() + { + return (const char **) p_vec; + } + + int get_portals_len() + { + return n; + } + + int get_last_unused() + { + int pix = last_unused; + if (++last_unused >= get_portals_len()) + last_unused = 0; + return pix; + } + + XioPortal* get_next_portal() + { + int pix = get_last_unused(); + return portals[pix]; + } + + int bind(struct xio_session_ops *ops, const string& base_uri, + uint16_t port, uint16_t *port0); + + int accept(struct xio_session *session, + struct xio_new_session_req *req, + void *cb_user_context) + { + const char **portals_vec = get_vec(); + int pix = get_last_unused(); + + if (pix == 0) { + return xio_accept(session, NULL, 0, NULL, 0); + } else { + return xio_accept(session, + (const char **)&(portals_vec[pix]), + 1, NULL, 0); + } + } + + void start() + { + XioPortal *portal; + int p_ix, nportals = portals.size(); + + p_vec = new char*[nportals]; + for (p_ix = 0; p_ix < nportals; ++p_ix) { + portal = portals[p_ix]; + p_vec[p_ix] = (char*) /* portal->xio_uri.c_str() */ + portal->portal_id; + } + + for (p_ix = 0; p_ix < nportals; ++p_ix) { + string thread_name = "ms_xio_"; + thread_name.append(std::to_string(p_ix)); + portal = portals[p_ix]; + portal->create(thread_name.c_str()); + } + } + + void shutdown() + { + int nportals = portals.size(); + for (int p_ix = 0; p_ix < nportals; ++p_ix) { + XioPortal *portal = portals[p_ix]; + portal->shutdown(); + } + } + + void join() + { + int nportals = portals.size(); + for (int p_ix = 0; p_ix < nportals; ++p_ix) { + XioPortal *portal = portals[p_ix]; + portal->join(); + } + } + + ~XioPortals() + { + int nportals = portals.size(); + for (int ix = 0; ix < nportals; ++ix) + delete(portals[ix]); + portals.clear(); + if (p_vec) + delete[] p_vec; + } +}; + +#endif /* XIO_PORTAL_H */ diff --git a/src/msg/xio/XioSubmit.h b/src/msg/xio/XioSubmit.h new file mode 100644 index 00000000..9840ad4a --- /dev/null +++ b/src/msg/xio/XioSubmit.h @@ -0,0 +1,58 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * Portions Copyright (C) 2013 CohortFS, LLC + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef XIO_SUBMIT_H +#define XIO_SUBMIT_H + +#include <boost/intrusive/list.hpp> +#include "msg/SimplePolicyMessenger.h" +extern "C" { +#include "libxio.h" +} +#include "XioConnection.h" +#include "msg/msg_types.h" +#include "XioPool.h" + +namespace bi = boost::intrusive; + +class XioConnection; + +struct XioSubmit +{ +public: + enum submit_type + { + OUTGOING_MSG, + INCOMING_MSG_RELEASE + }; + enum submit_type type; + bi::list_member_hook<> submit_list; + XioConnection *xcon; + ZTracer::Trace trace; + + XioSubmit(enum submit_type _type, XioConnection *_xcon) : + type(_type), xcon(_xcon) + {} + + typedef bi::list< XioSubmit, + bi::member_hook< XioSubmit, + bi::list_member_hook<>, + &XioSubmit::submit_list > + > Queue; + virtual ~XioSubmit(){ + } +}; + +#endif /* XIO_SUBMIT_H */ |