Diffstat (limited to 'src/msg/xio/XioMessenger.cc')
-rw-r--r--   src/msg/xio/XioMessenger.cc | 1136
1 file changed, 1136 insertions, 0 deletions
diff --git a/src/msg/xio/XioMessenger.cc b/src/msg/xio/XioMessenger.cc
new file mode 100644
index 00000000..dec7d0c7
--- /dev/null
+++ b/src/msg/xio/XioMessenger.cc
@@ -0,0 +1,1136 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Portions Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include <arpa/inet.h>
+#include <boost/lexical_cast.hpp>
+#include <set>
+#include <stdlib.h>
+#include <memory>
+
+#include "XioMsg.h"
+#include "XioMessenger.h"
+#include "common/address_helper.h"
+#include "common/code_environment.h"
+#include "messages/MNop.h"
+
+#define dout_subsys ceph_subsys_xio
+#undef dout_prefix
+#define dout_prefix *_dout << "xio."
+
+Mutex mtx("XioMessenger Package Lock");
+std::atomic<bool> initialized = { false };
+
+std::atomic<unsigned> XioMessenger::nInstances = { 0 };
+
+struct xio_mempool *xio_msgr_noreg_mpool;
+
+static struct xio_session_ops xio_msgr_ops;
+
+/* Accelio API callouts */
+
+namespace xio_log
+{
+typedef pair<const char*, int> level_pair;
+static const level_pair LEVELS[] = {
+  make_pair("fatal", 0),
+  make_pair("error", 0),
+  make_pair("warn", 1),
+  make_pair("info", 1),
+  make_pair("debug", 2),
+  make_pair("trace", 20)
+};
+
+static CephContext *context;
+
+int get_level()
+{
+  int level = 0;
+  for (size_t i = 0; i < sizeof(LEVELS); i++) {
+    if (!ldlog_p1(context, dout_subsys, LEVELS[i].second))
+      break;
+    level++;
+  }
+  return level;
+}
+
+void log_dout(const char *file, unsigned line,
+              const char *function, unsigned level,
+              const char *fmt, ...)
+{
+  char buffer[2048];
+  va_list args;
+  va_start(args, fmt);
+  int n = vsnprintf(buffer, sizeof(buffer), fmt, args);
+  va_end(args);
+
+  if (n > 0) {
+    const char *short_file = strrchr(file, '/');
+    short_file = (short_file == NULL) ? file : short_file + 1;
+
+    const level_pair &lvl = LEVELS[level];
+    ldout(context, lvl.second) << '[' << lvl.first << "] "
+      << short_file << ':' << line << ' '
+      << function << " - " << buffer << dendl;
+  }
+}
+}
+
+static int on_session_event(struct xio_session *session,
+                            struct xio_session_event_data *event_data,
+                            void *cb_user_context)
+{
+  XioMessenger *msgr = static_cast<XioMessenger*>(cb_user_context);
+  CephContext *cct = msgr->cct;
+
+  ldout(cct,4) << "session event: " << xio_session_event_str(event_data->event)
+    << ". reason: " << xio_strerror(event_data->reason) << dendl;
+
+  return msgr->session_event(session, event_data, cb_user_context);
+}
+
+static int on_new_session(struct xio_session *session,
+                          struct xio_new_session_req *req,
+                          void *cb_user_context)
+{
+  XioMessenger *msgr = static_cast<XioMessenger*>(cb_user_context);
+  CephContext *cct = msgr->cct;
+
+  ldout(cct,4) << "new session " << session
+    << " user_context " << cb_user_context << dendl;
+
+  return (msgr->new_session(session, req, cb_user_context));
+}
+
+static int on_msg(struct xio_session *session,
+                  struct xio_msg *req,
+                  int more_in_batch,
+                  void *cb_user_context)
+{
+  XioConnection* xcon __attribute__((unused)) =
+    static_cast<XioConnection*>(cb_user_context);
+  CephContext *cct = xcon->get_messenger()->cct;
+
+  ldout(cct,25) << "on_msg session " << session << " xcon " << xcon << dendl;
+
+  if (unlikely(XioPool::trace_mempool)) {
+    static uint32_t nreqs;
+    if (unlikely((++nreqs % 65536) == 0)) {
+      xp_stats.dump(__func__, nreqs);
+    }
+  }
+
+  return xcon->on_msg(session, req, more_in_batch,
+                      cb_user_context);
+}
+
+static int on_ow_msg_send_complete(struct xio_session *session,
+                                   struct xio_msg *msg,
+                                   void *conn_user_context)
+{
+  XioConnection *xcon =
+    static_cast<XioConnection*>(conn_user_context);
+  CephContext *cct = xcon->get_messenger()->cct;
+
+  ldout(cct,25) << "msg delivered session: " << session
+    << " msg: " << msg << " conn_user_context "
+    << conn_user_context << dendl;
+
+  return xcon->on_ow_msg_send_complete(session, msg, conn_user_context);
+}
+
+static int on_msg_error(struct xio_session *session,
+                        enum xio_status error,
+                        enum xio_msg_direction dir,
+                        struct xio_msg *msg,
+                        void *conn_user_context)
+{
+  /* XIO promises to flush back undelivered messages */
+  XioConnection *xcon =
+    static_cast<XioConnection*>(conn_user_context);
+  CephContext *cct = xcon->get_messenger()->cct;
+
+  ldout(cct,4) << "msg error session: " << session
+    << " error: " << xio_strerror(error) << " msg: " << msg
+    << " conn_user_context " << conn_user_context << dendl;
+
+  return xcon->on_msg_error(session, error, msg, conn_user_context);
+}
+
+static int on_cancel(struct xio_session *session,
+                     struct xio_msg *msg,
+                     enum xio_status result,
+                     void *conn_user_context)
+{
+  XioConnection* xcon __attribute__((unused)) =
+    static_cast<XioConnection*>(conn_user_context);
+  CephContext *cct = xcon->get_messenger()->cct;
+
+  ldout(cct,25) << "on cancel: session: " << session << " msg: " << msg
+    << " conn_user_context " << conn_user_context << dendl;
+
+  return 0;
+}
+
+static int on_cancel_request(struct xio_session *session,
+                             struct xio_msg *msg,
+                             void *conn_user_context)
+{
+  XioConnection* xcon __attribute__((unused)) =
+    static_cast<XioConnection*>(conn_user_context);
+  CephContext *cct = xcon->get_messenger()->cct;
+
+  ldout(cct,25) << "on cancel request: session: " << session << " msg: " << msg
+    << " conn_user_context " << conn_user_context << dendl;
+
+  return 0;
+}
+
+/* free functions */
+static string xio_uri_from_entity(const string &type,
+                                  const entity_addr_t& addr, bool want_port)
+{
+  const char *host = NULL;
+  char addr_buf[129];
+  string xio_uri;
+
+  switch(addr.get_family()) {
+  case AF_INET:
+    host = inet_ntop(AF_INET, &addr.in4_addr().sin_addr, addr_buf,
+                     INET_ADDRSTRLEN);
+    break;
+  case AF_INET6:
+    host = inet_ntop(AF_INET6, &addr.in6_addr().sin6_addr, addr_buf,
+                     INET6_ADDRSTRLEN);
+    break;
+  default:
+    abort();
+    break;
+  };
+
+  if (type == "rdma" || type == "tcp")
+    xio_uri = type + "://";
+  else
+    xio_uri = "rdma://";
+
+  /* The following can only succeed if the host is rdma-capable */
+  xio_uri += host;
+  if (want_port) {
+    xio_uri += ":";
+    xio_uri += boost::lexical_cast<std::string>(addr.get_port());
+  }
+
+  return xio_uri;
+} /* xio_uri_from_entity */
+
+void XioInit::package_init(CephContext *cct) {
+  if (! initialized) {
+
+    mtx.Lock();
+    if (! initialized) {
+
+      xio_init();
+
+      // claim a reference to the first context we see
+      xio_log::context = cct->get();
+
+      int xopt;
+      xopt = xio_log::get_level();
+      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_LOG_LEVEL,
+                  &xopt, sizeof(xopt));
+      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_LOG_FN,
+                  (const void*)xio_log::log_dout, sizeof(xio_log_fn));
+
+      xopt = 1;
+      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_DISABLE_HUGETBL,
+                  &xopt, sizeof(xopt));
+
+      if (g_code_env == CODE_ENVIRONMENT_DAEMON) {
+        xopt = 1;
+        xio_set_opt(NULL, XIO_OPTLEVEL_RDMA, XIO_OPTNAME_ENABLE_FORK_INIT,
+                    &xopt, sizeof(xopt));
+      }
+
+      xopt = XIO_MSGR_IOVLEN;
+      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_IN_IOVLEN,
+                  &xopt, sizeof(xopt));
+      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_OUT_IOVLEN,
+                  &xopt, sizeof(xopt));
+
+      /* enable flow-control */
+      xopt = 1;
+      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_ENABLE_FLOW_CONTROL,
+                  &xopt, sizeof(xopt));
+
+      /* and set threshold for buffer callouts */
+      xopt = max(cct->_conf->xio_max_send_inline, 512);
+      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_INLINE_XIO_DATA,
+                  &xopt, sizeof(xopt));
+
+      xopt = XioMsgHdr::get_max_encoded_length();
+      ldout(cct,2) << "setting accelio max header size " << xopt << dendl;
+      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_INLINE_XIO_HEADER,
+                  &xopt, sizeof(xopt));
+
+      size_t queue_depth = cct->_conf->xio_queue_depth;
+      struct xio_mempool_config mempool_config = {
+        6,
+        {
+          {1024,    0, queue_depth, 262144},
+          {4096,    0, queue_depth, 262144},
+          {16384,   0, queue_depth, 262144},
+          {65536,   0, 128,         65536},
+          {262144,  0, 32,          16384},
+          {1048576, 0, 8,           8192}
+        }
+      };
+      xio_set_opt(NULL,
+                  XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_CONFIG_MEMPOOL,
+                  &mempool_config, sizeof(mempool_config));
+
+      /* and the unregistered one */
+#define XMSG_MEMPOOL_QUANTUM 4096
+
+      xio_msgr_noreg_mpool =
+        xio_mempool_create(-1 /* nodeid */,
+                           XIO_MEMPOOL_FLAG_REGULAR_PAGES_ALLOC);
+
+      (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 64,
+                                  cct->_conf->xio_mp_min,
+                                  cct->_conf->xio_mp_max_64,
+                                  XMSG_MEMPOOL_QUANTUM, 0);
+      (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 256,
+                                  cct->_conf->xio_mp_min,
+                                  cct->_conf->xio_mp_max_256,
+                                  XMSG_MEMPOOL_QUANTUM, 0);
+      (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 1024,
+                                  cct->_conf->xio_mp_min,
+                                  cct->_conf->xio_mp_max_1k,
+                                  XMSG_MEMPOOL_QUANTUM, 0);
+      (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, getpagesize(),
+                                  cct->_conf->xio_mp_min,
+                                  cct->_conf->xio_mp_max_page,
+                                  XMSG_MEMPOOL_QUANTUM, 0);
+
+      /* initialize ops singleton */
+      xio_msgr_ops.on_session_event = on_session_event;
+      xio_msgr_ops.on_new_session = on_new_session;
+      xio_msgr_ops.on_session_established = NULL;
+      xio_msgr_ops.on_msg = on_msg;
+      xio_msgr_ops.on_ow_msg_send_complete = on_ow_msg_send_complete;
+      xio_msgr_ops.on_msg_error = on_msg_error;
+      xio_msgr_ops.on_cancel = on_cancel;
+      xio_msgr_ops.on_cancel_request = on_cancel_request;
+
+      /* mark initialized */
+      initialized = true;
+    }
+    mtx.Unlock();
+  }
+}
+
+/* XioMessenger */
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, this)
+static ostream& _prefix(std::ostream *_dout, XioMessenger *msgr) {
+  return *_dout << "-- " << msgr->get_myaddr_legacy() << " ";
+}
+
+XioMessenger::XioMessenger(CephContext *cct, entity_name_t name,
+                           string mname, uint64_t _nonce,
+                           uint64_t cflags, DispatchStrategy *ds)
+  : SimplePolicyMessenger(cct, name, mname, _nonce),
+    XioInit(cct),
+    portals(this, get_nportals(cflags), get_nconns_per_portal(cflags)),
+    dispatch_strategy(ds),
+    loop_con(new XioLoopbackConnection(this)),
+    special_handling(0),
+    sh_mtx("XioMessenger session mutex"),
+    sh_cond(),
+    need_addr(true),
+    did_bind(false),
+    nonce(_nonce)
+{
+
+  if (cct->_conf->xio_trace_xcon)
+    magic |= MSG_MAGIC_TRACE_XCON;
+
+  XioPool::trace_mempool = (cct->_conf->xio_trace_mempool);
+  XioPool::trace_msgcnt = (cct->_conf->xio_trace_msgcnt);
+
+  dispatch_strategy->set_messenger(this);
+
+  /* update class instance count */
+  nInstances++;
+
+  loop_con->set_features(CEPH_FEATURES_ALL);
+
+  ldout(cct,2) << "Create msgr: " << this << " instance: "
+    << nInstances << " type: " << name.type_str()
+    << " subtype: " << mname << " nportals: " << get_nportals(cflags)
+    << " nconns_per_portal: " << get_nconns_per_portal(cflags)
+    << dendl;
+
+} /* ctor */
+
+int XioMessenger::pool_hint(uint32_t dsize) {
+  if (dsize > 1024*1024)
+    return 0;
+
+  /* if dsize is already present, returns -EEXIST */
+  return xio_mempool_add_slab(xio_msgr_noreg_mpool, dsize, 0,
+                              cct->_conf->xio_mp_max_hint,
+                              XMSG_MEMPOOL_QUANTUM, 0);
+}
+
+int XioMessenger::get_nconns_per_portal(uint64_t cflags)
+{
+  const int XIO_DEFAULT_NUM_CONNS_PER_PORTAL = 8;
+  int nconns = XIO_DEFAULT_NUM_CONNS_PER_PORTAL;
+
+  if (cflags & Messenger::HAS_MANY_CONNECTIONS)
+    nconns = max(cct->_conf->xio_max_conns_per_portal,
+                 XIO_DEFAULT_NUM_CONNS_PER_PORTAL);
+  else if (cflags & Messenger::HEARTBEAT)
+    nconns = max(cct->_conf->osd_heartbeat_min_peers * 4,
+                 XIO_DEFAULT_NUM_CONNS_PER_PORTAL);
+
+  return nconns;
+}
+
+int XioMessenger::get_nportals(uint64_t cflags)
+{
+  int nportals = 1;
+
+  if (cflags & Messenger::HAS_HEAVY_TRAFFIC)
+    nportals = max(cct->_conf->xio_portal_threads, 1);
+
+  return nportals;
+}
+
+void XioMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
+{
+  // be careful here: multiple threads may block here, and readers of
+  // my_inst.addr do NOT hold any lock.
+
+  // this always goes from true -> false under the protection of the
+  // mutex.  if it is already false, we need not retake the mutex at
+  // all.
+  if (!need_addr)
+    return;
+
+  sh_mtx.Lock();
+  if (need_addr) {
+    entity_addr_t t = peer_addr_for_me;
+    t.set_port(my_inst.addr.get_port());
+    my_inst.addr.set_sockaddr(t.get_sockaddr());
+    ldout(cct,2) << "learned my addr " << my_inst.addr << dendl;
+    need_addr = false;
+    // init_local_connection();
+  }
+  sh_mtx.Unlock();
+
+}
+
+int XioMessenger::new_session(struct xio_session *session,
+                              struct xio_new_session_req *req,
+                              void *cb_user_context)
+{
+  if (shutdown_called) {
+    return xio_reject(
+      session, XIO_E_SESSION_REFUSED, NULL /* udata */, 0 /* udata len */);
+  }
+  int code = portals.accept(session, req, cb_user_context);
+  if (! code)
+    nsessions++;
+  return code;
+} /* new_session */
+
+int XioMessenger::session_event(struct xio_session *session,
+                                struct xio_session_event_data *event_data,
+                                void *cb_user_context)
+{
+  XioConnection *xcon;
+
+  switch (event_data->event) {
+  case XIO_SESSION_CONNECTION_ESTABLISHED_EVENT:
+  {
+    struct xio_connection *conn = event_data->conn;
+    struct xio_connection_attr xcona;
+    entity_addr_t peer_addr_for_me, paddr;
+
+    xcon = static_cast<XioConnection*>(event_data->conn_user_context);
+
+    ldout(cct,2) << "connection established " << event_data->conn
+      << " session " << session << " xcon " << xcon << dendl;
+
+    (void) xio_query_connection(conn, &xcona,
+                                XIO_CONNECTION_ATTR_LOCAL_ADDR|
+                                XIO_CONNECTION_ATTR_PEER_ADDR);
+    peer_addr_for_me.set_sockaddr((struct sockaddr *)&xcona.local_addr);
+    paddr.set_sockaddr((struct sockaddr *)&xcona.peer_addr);
+    //set_myaddr(peer_addr_for_me);
+    learned_addr(peer_addr_for_me);
+    ldout(cct,2) << "client: connected from " << peer_addr_for_me
+      << " to " << paddr << dendl;
+
+    /* notify hook */
+    this->ms_deliver_handle_connect(xcon);
+    this->ms_deliver_handle_fast_connect(xcon);
+  }
+  break;
+
+  case XIO_SESSION_NEW_CONNECTION_EVENT:
+  {
+    struct xio_connection *conn = event_data->conn;
+    struct xio_connection_attr xcona;
+    entity_inst_t s_inst;
+    entity_addr_t peer_addr_for_me;
+
+    (void) xio_query_connection(conn, &xcona,
+                                XIO_CONNECTION_ATTR_CTX|
+                                XIO_CONNECTION_ATTR_PEER_ADDR|
+                                XIO_CONNECTION_ATTR_LOCAL_ADDR);
+    /* XXX assumes RDMA */
+    s_inst.addr.set_sockaddr((struct sockaddr *)&xcona.peer_addr);
+    peer_addr_for_me.set_sockaddr((struct sockaddr *)&xcona.local_addr);
+
+    xcon = new XioConnection(this, XioConnection::PASSIVE, s_inst);
+    xcon->session = session;
+
+    struct xio_context_attr xctxa;
+    (void) xio_query_context(xcona.ctx, &xctxa, XIO_CONTEXT_ATTR_USER_CTX);
+
+    xcon->conn = conn;
+    xcon->portal = static_cast<XioPortal*>(xctxa.user_context);
+    ceph_assert(xcon->portal);
+
+    xcona.user_context = xcon;
+    (void) xio_modify_connection(conn, &xcona, XIO_CONNECTION_ATTR_USER_CTX);
+
+    xcon->connected = true;
+
+    /* sentinel ref */
+    xcon->get(); /* xcon->nref == 1 */
+    conns_sp.lock();
+    conns_list.push_back(*xcon);
+    /* XXX we can't put xcon in conns_entity_map because we don't yet know
+     * its peer address */
+    conns_sp.unlock();
+
+    /* XXXX pre-merge of session startup negotiation ONLY! */
+    xcon->cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
+
+    ldout(cct,2) << "New connection session " << session
+      << " xcon " << xcon << " on msgr: " << this
+      << " portal: " << xcon->portal << dendl;
+    ldout(cct,2) << "Server: connected from " << s_inst.addr
+      << " to " << peer_addr_for_me << dendl;
+  }
+  break;
+  case XIO_SESSION_CONNECTION_ERROR_EVENT:
+  case XIO_SESSION_CONNECTION_CLOSED_EVENT: /* orderly discon */
+  case XIO_SESSION_CONNECTION_DISCONNECTED_EVENT: /* unexpected discon */
+  case XIO_SESSION_CONNECTION_REFUSED_EVENT:
+    xcon = static_cast<XioConnection*>(event_data->conn_user_context);
+    ldout(cct,2) << xio_session_event_str(event_data->event)
+      << " xcon " << xcon << " session " << session << dendl;
+    if (likely(!!xcon)) {
+      unregister_xcon(xcon);
+      xcon->on_disconnect_event();
+    }
+    break;
+  case XIO_SESSION_CONNECTION_TEARDOWN_EVENT:
+    xcon = static_cast<XioConnection*>(event_data->conn_user_context);
+    ldout(cct,2) << xio_session_event_str(event_data->event)
+      << " xcon " << xcon << " session " << session << dendl;
+    /*
+     * There are flows where Accelio sends a teardown event without going
+     * through a disconnect event, so make sure the connection has been
+     * cleaned up.
+     */
+    unregister_xcon(xcon);
+    xcon->on_teardown_event();
+    break;
+  case XIO_SESSION_TEARDOWN_EVENT:
+    ldout(cct,2) << xio_session_event_str(event_data->event)
+      << " session " << session << dendl;
+    if (unlikely(XioPool::trace_mempool)) {
+      xp_stats.dump("xio session dtor", reinterpret_cast<uint64_t>(session));
+    }
+    xio_session_destroy(session);
+    if (--nsessions == 0) {
+      Mutex::Locker lck(sh_mtx);
+      if (nsessions == 0)
+        sh_cond.Signal();
+    }
+    break;
+  default:
+    break;
+  };
+
+  return 0;
+}
+
+enum bl_type
+{
+  BUFFER_PAYLOAD,
+  BUFFER_MIDDLE,
+  BUFFER_DATA
+};
+
+#define MAX_XIO_BUF_SIZE 1044480
+
+static inline int
+xio_count_buffers(const buffer::list& bl, int& req_size, int& msg_off,
+                  int& req_off)
+{
+
+  const std::list<buffer::ptr>& buffers = bl.buffers();
+  list<bufferptr>::const_iterator pb;
+  size_t size, off;
+  int result;
+  int first = 1;
+
+  off = size = 0;
+  result = 0;
+  for (;;) {
+    if (off >= size) {
+      if (first) pb = buffers.begin(); else ++pb;
+      if (pb == buffers.end()) {
+        break;
+      }
+      off = 0;
+      size = pb->length();
+      first = 0;
+    }
+    size_t count = size - off;
+    if (!count) continue;
+    if (req_size + count > MAX_XIO_BUF_SIZE) {
+      count = MAX_XIO_BUF_SIZE - req_size;
+    }
+
+    ++result;
+
+    /* advance iov and perhaps request */
+
+    off += count;
+    req_size += count;
+    ++msg_off;
+    if (unlikely(msg_off >= XIO_MSGR_IOVLEN || req_size >= MAX_XIO_BUF_SIZE)) {
+      ++req_off;
+      msg_off = 0;
+      req_size = 0;
+    }
+  }
+
+  return result;
+}
+
+static inline void
+xio_place_buffers(const buffer::list& bl, XioMsg *xmsg, struct xio_msg*& req,
+                  struct xio_iovec_ex*& msg_iov, int& req_size,
+                  int ex_cnt, int& msg_off, int& req_off, bl_type type)
+{
+
+  const std::list<buffer::ptr>& buffers = bl.buffers();
+  list<bufferptr>::const_iterator pb;
+  struct xio_iovec_ex* iov;
+  size_t size, off;
+  const char *data = NULL;
+  int first = 1;
+
+  off = size = 0;
+  for (;;) {
+    if (off >= size) {
+      if (first) pb = buffers.begin(); else ++pb;
+      if (pb == buffers.end()) {
+        break;
+      }
+      off = 0;
+      size = pb->length();
+      data = pb->c_str(); // is c_str() efficient?
+      first = 0;
+    }
+    size_t count = size - off;
+    if (!count) continue;
+    if (req_size + count > MAX_XIO_BUF_SIZE) {
+      count = MAX_XIO_BUF_SIZE - req_size;
+    }
+
+    /* assign buffer */
+    iov = &msg_iov[msg_off];
+    iov->iov_base = (void *) (&data[off]);
+    iov->iov_len = count;
+
+    switch (type) {
+    case BUFFER_DATA:
+      //break;
+    default:
+    {
+      struct xio_reg_mem *mp = get_xio_mp(*pb);
+      iov->mr = (mp) ? mp->mr : NULL;
+    }
+      break;
+    }
+
+    /* advance iov(s) */
+
+    off += count;
+    req_size += count;
+    ++msg_off;
+
+    /* next request if necessary */
+
+    if (unlikely(msg_off >= XIO_MSGR_IOVLEN || req_size >= MAX_XIO_BUF_SIZE)) {
+      /* finish this request */
+      req->out.pdata_iov.nents = msg_off;
+      /* advance to next, and write in it if it's not the last one. */
+      if (++req_off >= ex_cnt) {
+        req = 0; /* poison.  trap if we try to use it. */
+        msg_iov = NULL;
+      } else {
+        req = &xmsg->req_arr[req_off].msg;
+        msg_iov = req->out.pdata_iov.sglist;
+      }
+      msg_off = 0;
+      req_size = 0;
+    }
+  }
+}
+
+int XioMessenger::bind(const entity_addr_t& addr)
+{
+  if (addr.is_blank_ip()) {
+    lderr(cct) << "ERROR: need rdma ip for remote use! " << dendl;
+    cout << "Error: xio bind failed. public/cluster ip not specified"
+         << std::endl;
+    return -1;
+  }
+
+  entity_addr_t shift_addr = addr;
+  string base_uri = xio_uri_from_entity(cct->_conf->xio_transport_type,
+                                        shift_addr, false /* want_port */);
+  ldout(cct,4) << "XioMessenger " << this << " bind: xio_uri "
+    << base_uri << ':' << shift_addr.get_port() << dendl;
+
+  uint16_t port0;
+  int r = portals.bind(&xio_msgr_ops, base_uri, shift_addr.get_port(), &port0);
+  if (r == 0) {
+    shift_addr.set_port(port0);
+    shift_addr.nonce = nonce;
+    set_myaddr(shift_addr);
+    need_addr = false;
+    did_bind = true;
+  }
+  return r;
+} /* bind */
+
+int XioMessenger::rebind(const set<int>& avoid_ports)
+{
+  ldout(cct,4) << "XioMessenger " << this << " rebind attempt" << dendl;
+  return 0;
+} /* rebind */
+
+int XioMessenger::start()
+{
+  portals.start();
+  dispatch_strategy->start();
+  if (!did_bind) {
+    my_inst.addr.nonce = nonce;
+  }
+  started = true;
+  return 0;
+}
+
+void XioMessenger::wait()
+{
+  portals.join();
+  dispatch_strategy->wait();
+} /* wait */
+
+int XioMessenger::_send_message(Message *m, const entity_inst_t& dest)
+{
+  ConnectionRef conn = get_connection(dest);
+  if (conn)
+    return _send_message(m, &(*conn));
+  else
+    return EINVAL;
+} /* send_message(Message *, const entity_inst_t&) */
+
+static inline XioMsg* pool_alloc_xio_msg(Message *m, XioConnection *xcon,
+                                         int ex_cnt)
+{
+  struct xio_reg_mem mp_mem;
+  int e = xpool_alloc(xio_msgr_noreg_mpool, sizeof(XioMsg), &mp_mem);
+  if (!!e)
+    return NULL;
+  XioMsg *xmsg = reinterpret_cast<XioMsg*>(mp_mem.addr);
+  ceph_assert(!!xmsg);
+  new (xmsg) XioMsg(m, xcon, mp_mem, ex_cnt, CEPH_FEATURES_ALL);
+  return xmsg;
+}
+
+XioCommand* pool_alloc_xio_command(XioConnection *xcon)
+{
+  struct xio_reg_mem mp_mem;
+  int e = xpool_alloc(xio_msgr_noreg_mpool, sizeof(XioCommand), &mp_mem);
+  if (!!e)
+    return NULL;
+  XioCommand *xcmd = reinterpret_cast<XioCommand*>(mp_mem.addr);
+  ceph_assert(!!xcmd);
+  new (xcmd) XioCommand(xcon, mp_mem);
+  return xcmd;
+}
+
+int XioMessenger::_send_message(Message *m, Connection *con)
+{
+  if (con == loop_con.get() /* intrusive_ptr get() */) {
+    m->set_connection(con);
+    m->set_src(get_myinst().name);
+    m->set_seq(loop_con->next_seq());
+    ds_dispatch(m);
+    return 0;
+  }
+
+  XioConnection *xcon = static_cast<XioConnection*>(con);
+
+  /* If con is not in READY state, we have to enforce policy */
+  if (xcon->cstate.session_state.read() != XioConnection::UP) {
+    std::lock_guard<decltype(xcon->sp)> lg(xcon->sp);
+
+    if (xcon->cstate.session_state.read() != XioConnection::UP) {
+      xcon->outgoing.mqueue.push_back(*m);
+      return 0;
+    }
+  }
+
+  return _send_message_impl(m, xcon);
+} /* send_message(Message* m, Connection *con) */
+
+int XioMessenger::_send_message_impl(Message* m, XioConnection* xcon)
+{
+  int code = 0;
+
+  Mutex::Locker l(xcon->lock);
+  if (unlikely(XioPool::trace_mempool)) {
+    static uint32_t nreqs;
+    if (unlikely((++nreqs % 65536) == 0)) {
+      xp_stats.dump(__func__, nreqs);
+    }
+  }
+
+  m->set_seq(xcon->state.next_out_seq());
+  m->set_magic(magic); // trace flags and special handling
+
+  m->encode(xcon->get_features(), this->crcflags);
+
+  buffer::list &payload = m->get_payload();
+  buffer::list &middle = m->get_middle();
+  buffer::list &data = m->get_data();
+
+  int msg_off = 0;
+  int req_off = 0;
+  int req_size = 0;
+  int nbuffers =
+    xio_count_buffers(payload, req_size, msg_off, req_off) +
+    xio_count_buffers(middle, req_size, msg_off, req_off) +
+    xio_count_buffers(data, req_size, msg_off, req_off);
+
+  int ex_cnt = req_off;
+  if (msg_off == 0 && ex_cnt > 0) {
+    // no buffers for last msg
+    ldout(cct,10) << "msg_off 0, ex_cnt " << ex_cnt << " -> "
+      << ex_cnt-1 << dendl;
+    ex_cnt--;
+  }
+
+  /* get an XioMsg frame */
+  XioMsg *xmsg = pool_alloc_xio_msg(m, xcon, ex_cnt);
+  if (! xmsg) {
+    /* could happen if Accelio has been shutdown */
+    return ENOMEM;
+  }
+
+  ldout(cct,4) << __func__ << " " << m << " new XioMsg " << xmsg
+    << " tag " << (int)xmsg->hdr.tag
+    << " req_0 " << xmsg->get_xio_msg() << " msg type " << m->get_type()
+    << " features: " << xcon->get_features()
+    << " conn " << xcon->conn << " sess " << xcon->session << dendl;
+
+  if (magic & (MSG_MAGIC_XIO)) {
+
+    /* XXXX verify */
+    switch (m->get_type()) {
+    case 43:
+    // case 15:
+      ldout(cct,4) << __func__ << " stop 43 " << m->get_type() << " "
+        << *m << dendl;
+      buffer::list &payload = m->get_payload();
+      ldout(cct,4) << __func__ << " payload dump:" << dendl;
+      payload.hexdump(cout);
+    }
+  }
+
+  struct xio_msg *req = xmsg->get_xio_msg();
+  struct xio_iovec_ex *msg_iov = req->out.pdata_iov.sglist;
+
+  if (magic & (MSG_MAGIC_XIO)) {
+    ldout(cct,4) << "payload: " << payload.buffers().size() <<
+      " middle: " << middle.buffers().size() <<
+      " data: " << data.buffers().size() <<
+      dendl;
+  }
+
+  if (unlikely(ex_cnt > 0)) {
+    ldout(cct,4) << __func__ << " buffer cnt > XIO_MSGR_IOVLEN (" <<
+      ((XIO_MSGR_IOVLEN-1) + nbuffers) << ")" << dendl;
+  }
+
+  /* do the invariant part */
+  msg_off = 0;
+  req_off = -1; /* most often, not used */
+  req_size = 0;
+
+  xio_place_buffers(payload, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
+                    req_off, BUFFER_PAYLOAD);
+
+  xio_place_buffers(middle, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
+                    req_off, BUFFER_MIDDLE);
+
+  xio_place_buffers(data, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
+                    req_off, BUFFER_DATA);
+  ldout(cct,10) << "ex_cnt " << ex_cnt << ", req_off " << req_off
+    << ", msg_cnt " << xmsg->get_msg_count() << dendl;
+
+  /* finalize request */
+  if (msg_off)
+    req->out.pdata_iov.nents = msg_off;
+
+  /* fixup first msg */
+  req = xmsg->get_xio_msg();
+
+  const std::list<buffer::ptr>& header = xmsg->hdr.get_bl().buffers();
+  ceph_assert(header.size() == 1); /* XXX */
+  list<bufferptr>::const_iterator pb = header.begin();
+  req->out.header.iov_base = (char*) pb->c_str();
+  req->out.header.iov_len = pb->length();
+
+  /* deliver via xio, preserve ordering */
+  if (xmsg->get_msg_count() > 1) {
+    struct xio_msg *head = xmsg->get_xio_msg();
+    struct xio_msg *tail = head;
+    for (req_off = 0; ((unsigned) req_off) < xmsg->get_msg_count()-1;
+         ++req_off) {
+      req = &xmsg->req_arr[req_off].msg;
+      assert(!req->in.pdata_iov.nents);
+      assert(req->out.pdata_iov.nents || !nbuffers);
+      tail->next = req;
+      tail = req;
+    }
+    tail->next = NULL;
+  }
+  xmsg->trace = m->trace;
+  m->trace.event("xio portal enqueue for send");
+  m->trace.keyval("xio message segments", xmsg->hdr.msg_cnt);
+  xcon->portal->enqueue_for_send(xcon, xmsg);
+
+  return code;
+} /* send_message(Message *, Connection *) */
+
+int XioMessenger::shutdown()
+{
+  shutdown_called = true;
+  conns_sp.lock();
+  XioConnection::ConnList::iterator iter;
+  iter = conns_list.begin();
+  for (iter = conns_list.begin(); iter != conns_list.end(); ++iter) {
+    (void) iter->disconnect(); // XXX mark down?
+  }
+  conns_sp.unlock();
+  while(nsessions > 0) {
+    Mutex::Locker lck(sh_mtx);
+    if (nsessions > 0)
+      sh_cond.Wait(sh_mtx);
+  }
+  portals.shutdown();
+  dispatch_strategy->shutdown();
+  did_bind = false;
+  started = false;
+  return 0;
+} /* shutdown */
+
+ConnectionRef XioMessenger::get_connection(const entity_inst_t& dest)
+{
+  if (shutdown_called)
+    return NULL;
+
+  const entity_inst_t& self_inst = get_myinst();
+  if ((&dest == &self_inst) ||
+      (dest == self_inst)) {
+    return get_loopback_connection();
+  }
+
+  conns_sp.lock();
+  XioConnection::EntitySet::iterator conn_iter =
+    conns_entity_map.find(dest, XioConnection::EntityComp());
+  if (conn_iter != conns_entity_map.end()) {
+    ConnectionRef cref = &(*conn_iter);
+    conns_sp.unlock();
+    return cref;
+  }
+  else {
+    conns_sp.unlock();
+    string xio_uri = xio_uri_from_entity(cct->_conf->xio_transport_type,
+                                         dest.addr, true /* want_port */);
+
+    ldout(cct,4) << "XioMessenger " << this << " get_connection: xio_uri "
+      << xio_uri << dendl;
+
+    /* XXX client session creation parameters */
+    struct xio_session_params params = {};
+    params.type = XIO_SESSION_CLIENT;
+    params.ses_ops = &xio_msgr_ops;
+    params.user_context = this;
+    params.uri = xio_uri.c_str();
+
+    XioConnection *xcon = new XioConnection(this, XioConnection::ACTIVE,
+                                            dest);
+
+    xcon->session = xio_session_create(&params);
+    if (! xcon->session) {
+      delete xcon;
+      return NULL;
+    }
+
+    /* this should cause callbacks with user context of conn, but
+     * we can always set it explicitly */
+    struct xio_connection_params xcp = {};
+    xcp.session = xcon->session;
+    xcp.ctx = xcon->portal->ctx;
+    xcp.conn_user_context = xcon;
+
+    xcon->conn = xio_connect(&xcp);
+    if (!xcon->conn) {
+      xio_session_destroy(xcon->session);
+      delete xcon;
+      return NULL;
+    }
+
+    nsessions++;
+    xcon->connected = true;
+
+    /* sentinel ref */
+    xcon->get(); /* xcon->nref == 1 */
+    conns_sp.lock();
+    conns_list.push_back(*xcon);
+    conns_entity_map.insert(*xcon);
+    conns_sp.unlock();
+
+    /* XXXX pre-merge of session startup negotiation ONLY! */
+    xcon->cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
+
+    ldout(cct,2) << "New connection xcon: " << xcon <<
+      " up_ready on session " << xcon->session <<
+      " on msgr: " << this << " portal: " << xcon->portal << dendl;
+
+    return xcon->get(); /* nref +1 */
+  }
+} /* get_connection */
+
+ConnectionRef XioMessenger::get_loopback_connection()
+{
+  return (loop_con.get());
+} /* get_loopback_connection */
+
+void XioMessenger::unregister_xcon(XioConnection *xcon)
+{
+  std::lock_guard<decltype(conns_sp)> lckr(conns_sp);
+
+  XioConnection::EntitySet::iterator conn_iter =
+    conns_entity_map.find(xcon->peer, XioConnection::EntityComp());
+  if (conn_iter != conns_entity_map.end()) {
+    XioConnection *xcon2 = &(*conn_iter);
+    if (xcon == xcon2) {
+      conns_entity_map.erase(conn_iter);
+    }
+  }
+
+  /* check if citer on conn_list */
+  if (xcon->conns_hook.is_linked()) {
+    /* now find xcon on conns_list and erase */
+    XioConnection::ConnList::iterator citer =
+      XioConnection::ConnList::s_iterator_to(*xcon);
+    conns_list.erase(citer);
+  }
+}
+
+void XioMessenger::mark_down(const entity_addr_t& addr)
+{
+  entity_inst_t inst(entity_name_t(), addr);
+  std::lock_guard<decltype(conns_sp)> lckr(conns_sp);
+  XioConnection::EntitySet::iterator conn_iter =
+    conns_entity_map.find(inst, XioConnection::EntityComp());
+  if (conn_iter != conns_entity_map.end()) {
+    (*conn_iter)._mark_down(XioConnection::CState::OP_FLAG_NONE);
+  }
+} /* mark_down(const entity_addr_t& */
+
+void XioMessenger::mark_down(Connection* con)
+{
+  XioConnection *xcon = static_cast<XioConnection*>(con);
+  xcon->_mark_down(XioConnection::CState::OP_FLAG_NONE);
+} /* mark_down(Connection*) */
+
+void XioMessenger::mark_down_all()
+{
+  std::lock_guard<decltype(conns_sp)> lckr(conns_sp);
+  XioConnection::EntitySet::iterator conn_iter;
+  for (conn_iter = conns_entity_map.begin(); conn_iter !=
+         conns_entity_map.end(); ++conn_iter) {
+    (*conn_iter)._mark_down(XioConnection::CState::OP_FLAG_NONE);
+  }
+} /* mark_down_all */
+
+static inline XioMarkDownHook* pool_alloc_markdown_hook(
+  XioConnection *xcon, Message *m)
+{
+  struct xio_reg_mem mp_mem;
+  int e = xio_mempool_alloc(xio_msgr_noreg_mpool,
+                            sizeof(XioMarkDownHook), &mp_mem);
+  if (!!e)
+    return NULL;
+  XioMarkDownHook *hook = static_cast<XioMarkDownHook*>(mp_mem.addr);
+  new (hook) XioMarkDownHook(xcon, m, mp_mem);
+  return hook;
+}
+
+void XioMessenger::mark_down_on_empty(Connection* con)
+{
+  XioConnection *xcon = static_cast<XioConnection*>(con);
+  MNop* m = new MNop();
+  m->tag = XIO_NOP_TAG_MARKDOWN;
+  m->set_completion_hook(pool_alloc_markdown_hook(xcon, m));
+  // stall new messages
+  xcon->cstate.session_state = XioConnection::session_states::BARRIER;
+  (void) _send_message_impl(m, xcon);
+}
+
+void XioMessenger::mark_disposable(Connection *con)
+{
+  XioConnection *xcon = static_cast<XioConnection*>(con);
+  xcon->_mark_disposable(XioConnection::CState::OP_FLAG_NONE);
+}
+
+void XioMessenger::try_insert(XioConnection *xcon)
+{
+  std::lock_guard<decltype(conns_sp)> lckr(conns_sp);
+  /* already resident in conns_list */
+  conns_entity_map.insert(*xcon);
+}
+
+XioMessenger::~XioMessenger()
+{
+  delete dispatch_strategy;
+  nInstances--;
+} /* dtor */
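For context, the messenger added in this diff follows the usual Ceph Messenger lifecycle (construct, bind, start, send, shutdown, wait). Below is a minimal usage sketch, not part of the change itself: it assumes the surrounding Ceph tree is available, and a concrete DispatchStrategy such as QueueStrategy from src/msg; the helper name and address setup are illustrative only.

```cpp
// Sketch only: construct an XioMessenger with the ctor added in this diff,
// bind it to an RDMA-capable address, and start its portal/dispatch threads.
// CephContext creation, the entity address, and QueueStrategy come from the
// wider Ceph tree and are assumptions here, not part of this commit.
#include "msg/xio/XioMessenger.h"
#include "msg/QueueStrategy.h"

static Messenger* make_xio_messenger(CephContext* cct,
                                     const entity_addr_t& bind_addr,
                                     uint64_t nonce)
{
  // One dispatch queue; XioMessenger takes ownership and deletes it in its dtor.
  DispatchStrategy* ds = new QueueStrategy(1);
  XioMessenger* msgr = new XioMessenger(cct, entity_name_t::CLIENT(-1),
                                        "xio client", nonce,
                                        0 /* cflags */, ds);
  if (msgr->bind(bind_addr) != 0) {  // bind() requires a non-blank, rdma-capable IP
    delete msgr;
    return nullptr;
  }
  msgr->start();                     // starts portals and the dispatch strategy
  return msgr;                       // caller later calls shutdown(), wait(), delete
}
```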