summaryrefslogtreecommitdiffstats
path: root/src/msg/async/rdma
diff options
context:
space:
mode:
Diffstat (limited to 'src/msg/async/rdma')
-rw-r--r--src/msg/async/rdma/Infiniband.cc1321
-rw-r--r--src/msg/async/rdma/Infiniband.h591
-rw-r--r--src/msg/async/rdma/RDMAConnectedSocketImpl.cc607
-rw-r--r--src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc183
-rw-r--r--src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc119
-rw-r--r--src/msg/async/rdma/RDMAServerSocketImpl.cc135
-rw-r--r--src/msg/async/rdma/RDMAStack.cc812
-rw-r--r--src/msg/async/rdma/RDMAStack.h344
8 files changed, 4112 insertions, 0 deletions
diff --git a/src/msg/async/rdma/Infiniband.cc b/src/msg/async/rdma/Infiniband.cc
new file mode 100644
index 000000000..52323f948
--- /dev/null
+++ b/src/msg/async/rdma/Infiniband.cc
@@ -0,0 +1,1321 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "Infiniband.h"
+#include "common/errno.h"
+#include "common/debug.h"
+#include "RDMAStack.h"
+#include <sys/time.h>
+#include <sys/resource.h>
+
+#define dout_subsys ceph_subsys_ms
+#undef dout_prefix
+#define dout_prefix *_dout << "Infiniband "
+
+static const uint32_t MAX_SHARED_RX_SGE_COUNT = 1;
+static const uint32_t MAX_INLINE_DATA = 0;
+static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");
+static const uint32_t CQ_DEPTH = 30000;
+
+Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt), port_num(ipn),
+ gid_idx(cct->_conf.get_val<int64_t>("ms_async_rdma_gid_idx"))
+{
+ int r = ibv_query_port(ctxt, port_num, &port_attr);
+ if (r == -1) {
+ lderr(cct) << __func__ << " query port failed " << cpp_strerror(errno) << dendl;
+ ceph_abort();
+ }
+
+ lid = port_attr.lid;
+ ceph_assert(gid_idx < port_attr.gid_tbl_len);
+#ifdef HAVE_IBV_EXP
+ union ibv_gid cgid;
+ struct ibv_exp_gid_attr gid_attr;
+ bool malformed = false;
+
+ ldout(cct,1) << __func__ << " using experimental verbs for gid" << dendl;
+
+
+ // search for requested GID in GIDs table
+ ldout(cct, 1) << __func__ << " looking for local GID " << (cct->_conf->ms_async_rdma_local_gid)
+ << " of type " << (cct->_conf->ms_async_rdma_roce_ver) << dendl;
+ r = sscanf(cct->_conf->ms_async_rdma_local_gid.c_str(),
+ "%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx"
+ ":%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx",
+ &cgid.raw[ 0], &cgid.raw[ 1],
+ &cgid.raw[ 2], &cgid.raw[ 3],
+ &cgid.raw[ 4], &cgid.raw[ 5],
+ &cgid.raw[ 6], &cgid.raw[ 7],
+ &cgid.raw[ 8], &cgid.raw[ 9],
+ &cgid.raw[10], &cgid.raw[11],
+ &cgid.raw[12], &cgid.raw[13],
+ &cgid.raw[14], &cgid.raw[15]);
+
+ if (r != 16) {
+ ldout(cct, 1) << __func__ << " malformed or no GID supplied, using GID index 0" << dendl;
+ malformed = true;
+ }
+
+ gid_attr.comp_mask = IBV_EXP_QUERY_GID_ATTR_TYPE;
+
+ for (gid_idx = 0; gid_idx < port_attr.gid_tbl_len; gid_idx++) {
+ r = ibv_query_gid(ctxt, port_num, gid_idx, &gid);
+ if (r) {
+ lderr(cct) << __func__ << " query gid of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl;
+ ceph_abort();
+ }
+ r = ibv_exp_query_gid_attr(ctxt, port_num, gid_idx, &gid_attr);
+ if (r) {
+ lderr(cct) << __func__ << " query gid attributes of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl;
+ ceph_abort();
+ }
+
+ if (malformed) break; // stay with gid_idx=0
+ if ( (gid_attr.type == cct->_conf->ms_async_rdma_roce_ver) &&
+ (memcmp(&gid, &cgid, 16) == 0) ) {
+ ldout(cct, 1) << __func__ << " found at index " << gid_idx << dendl;
+ break;
+ }
+ }
+
+ if (gid_idx == port_attr.gid_tbl_len) {
+ lderr(cct) << __func__ << " Requested local GID was not found in GID table" << dendl;
+ ceph_abort();
+ }
+#else
+ r = ibv_query_gid(ctxt, port_num, gid_idx, &gid);
+ if (r) {
+ lderr(cct) << __func__ << " query gid failed " << cpp_strerror(errno) << dendl;
+ ceph_abort();
+ }
+#endif
+}
+
+Device::Device(CephContext *cct, ibv_device* ib_dev): device(ib_dev), active_port(nullptr)
+{
+ ceph_assert(device);
+ ctxt = ibv_open_device(device);
+ ceph_assert(ctxt);
+
+ name = ibv_get_device_name(device);
+
+ int r = ibv_query_device(ctxt, &device_attr);
+ if (r) {
+ lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl;
+ ceph_abort();
+ }
+}
+
+Device::Device(CephContext *cct, struct ibv_context *ib_ctx): device(ib_ctx->device),
+ active_port(nullptr)
+{
+ ceph_assert(device);
+ ctxt = ib_ctx;
+ ceph_assert(ctxt);
+
+ name = ibv_get_device_name(device);
+
+ int r = ibv_query_device(ctxt, &device_attr);
+ if (r) {
+ lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl;
+ ceph_abort();
+ }
+}
+
+void Device::binding_port(CephContext *cct, int port_num) {
+ port_cnt = device_attr.phys_port_cnt;
+ for (uint8_t port_id = 1; port_id <= port_cnt; ++port_id) {
+ Port *port = new Port(cct, ctxt, port_id);
+ if (port_id == port_num && port->get_port_attr()->state == IBV_PORT_ACTIVE) {
+ active_port = port;
+ ldout(cct, 1) << __func__ << " found active port " << static_cast<int>(port_id) << dendl;
+ break;
+ } else {
+ ldout(cct, 10) << __func__ << " port " << port_id << " is not what we want. state: "
+ << ibv_port_state_str(port->get_port_attr()->state) << dendl;
+ delete port;
+ }
+ }
+ if (nullptr == active_port) {
+ lderr(cct) << __func__ << " port not found" << dendl;
+ ceph_assert(active_port);
+ }
+}
+
+
+Infiniband::QueuePair::QueuePair(
+ CephContext *c, Infiniband& infiniband, ibv_qp_type type,
+ int port, ibv_srq *srq,
+ Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq,
+ uint32_t tx_queue_len, uint32_t rx_queue_len, struct rdma_cm_id *cid, uint32_t q_key)
+: cct(c), infiniband(infiniband),
+ type(type),
+ ctxt(infiniband.device->ctxt),
+ ib_physical_port(port),
+ pd(infiniband.pd->pd),
+ srq(srq),
+ qp(NULL),
+ cm_id(cid), peer_cm_meta{0}, local_cm_meta{0},
+ txcq(txcq),
+ rxcq(rxcq),
+ initial_psn(lrand48() & PSN_MSK),
+ // One extra WR for beacon
+ max_send_wr(tx_queue_len + 1),
+ max_recv_wr(rx_queue_len),
+ q_key(q_key),
+ dead(false)
+{
+ if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) {
+ lderr(cct) << __func__ << " invalid queue pair type" << cpp_strerror(errno) << dendl;
+ ceph_abort();
+ }
+}
+
+int Infiniband::QueuePair::modify_qp_to_error(void)
+{
+ ibv_qp_attr qpa;
+ // FIPS zeroization audit 20191115: this memset is not security related.
+ memset(&qpa, 0, sizeof(qpa));
+ qpa.qp_state = IBV_QPS_ERR;
+ if (ibv_modify_qp(qp, &qpa, IBV_QP_STATE)) {
+ lderr(cct) << __func__ << " failed to transition to ERROR state: " << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+ ldout(cct, 20) << __func__ << " transition to ERROR state successfully." << dendl;
+ return 0;
+}
+
+int Infiniband::QueuePair::modify_qp_to_rts(void)
+{
+ // move from RTR state RTS
+ ibv_qp_attr qpa;
+ // FIPS zeroization audit 20191115: this memset is not security related.
+ memset(&qpa, 0, sizeof(qpa));
+ qpa.qp_state = IBV_QPS_RTS;
+ /*
+ * How long to wait before retrying if packet lost or server dead.
+ * Supposedly the timeout is 4.096us*2^timeout. However, the actual
+ * timeout appears to be 4.096us*2^(timeout+1), so the setting
+ * below creates a 135ms timeout.
+ */
+ qpa.timeout = 0x12;
+ // How many times to retry after timeouts before giving up.
+ qpa.retry_cnt = 7;
+ /*
+ * How many times to retry after RNR (receiver not ready) condition
+ * before giving up. Occurs when the remote side has not yet posted
+ * a receive request.
+ */
+ qpa.rnr_retry = 7; // 7 is infinite retry.
+ qpa.sq_psn = local_cm_meta.psn;
+ qpa.max_rd_atomic = 1;
+
+ int attr_mask = IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC;
+ int r = ibv_modify_qp(qp, &qpa, attr_mask);
+ if (r) {
+ lderr(cct) << __func__ << " failed to transition to RTS state: " << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+ ldout(cct, 20) << __func__ << " transition to RTS state successfully." << dendl;
+ return 0;
+}
+
+int Infiniband::QueuePair::modify_qp_to_rtr(void)
+{
+ // move from INIT to RTR state
+ ibv_qp_attr qpa;
+ // FIPS zeroization audit 20191115: this memset is not security related.
+ memset(&qpa, 0, sizeof(qpa));
+ qpa.qp_state = IBV_QPS_RTR;
+ qpa.path_mtu = IBV_MTU_1024;
+ qpa.dest_qp_num = peer_cm_meta.local_qpn;
+ qpa.rq_psn = peer_cm_meta.psn;
+ qpa.max_dest_rd_atomic = 1;
+ qpa.min_rnr_timer = 0x12;
+ qpa.ah_attr.is_global = 1;
+ qpa.ah_attr.grh.hop_limit = 6;
+ qpa.ah_attr.grh.dgid = peer_cm_meta.gid;
+ qpa.ah_attr.grh.sgid_index = infiniband.get_device()->get_gid_idx();
+ qpa.ah_attr.grh.traffic_class = cct->_conf->ms_async_rdma_dscp;
+ //qpa.ah_attr.grh.flow_label = 0;
+
+ qpa.ah_attr.dlid = peer_cm_meta.lid;
+ qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;
+ qpa.ah_attr.src_path_bits = 0;
+ qpa.ah_attr.port_num = (uint8_t)(ib_physical_port);
+
+ ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl;
+
+ int attr_mask = IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MIN_RNR_TIMER | IBV_QP_MAX_DEST_RD_ATOMIC;
+
+ int r = ibv_modify_qp(qp, &qpa, attr_mask);
+ if (r) {
+ lderr(cct) << __func__ << " failed to transition to RTR state: " << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+ ldout(cct, 20) << __func__ << " transition to RTR state successfully." << dendl;
+ return 0;
+}
+
+int Infiniband::QueuePair::modify_qp_to_init(void)
+{
+ // move from RESET to INIT state
+ ibv_qp_attr qpa;
+ // FIPS zeroization audit 20191115: this memset is not security related.
+ memset(&qpa, 0, sizeof(qpa));
+ qpa.qp_state = IBV_QPS_INIT;
+ qpa.pkey_index = 0;
+ qpa.port_num = (uint8_t)(ib_physical_port);
+ qpa.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE;
+ qpa.qkey = q_key;
+
+ int mask = IBV_QP_STATE | IBV_QP_PORT;
+ switch (type) {
+ case IBV_QPT_RC:
+ mask |= IBV_QP_ACCESS_FLAGS;
+ mask |= IBV_QP_PKEY_INDEX;
+ break;
+ case IBV_QPT_UD:
+ mask |= IBV_QP_QKEY;
+ mask |= IBV_QP_PKEY_INDEX;
+ break;
+ case IBV_QPT_RAW_PACKET:
+ break;
+ default:
+ ceph_abort();
+ }
+
+ if (ibv_modify_qp(qp, &qpa, mask)) {
+ lderr(cct) << __func__ << " failed to switch to INIT state Queue Pair, qp number: " << qp->qp_num
+ << " Error: " << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+ ldout(cct, 20) << __func__ << " successfully switch to INIT state Queue Pair, qp number: " << qp->qp_num << dendl;
+ return 0;
+}
+
+int Infiniband::QueuePair::init()
+{
+ ldout(cct, 20) << __func__ << " started." << dendl;
+ ibv_qp_init_attr qpia;
+ // FIPS zeroization audit 20191115: this memset is not security related.
+ memset(&qpia, 0, sizeof(qpia));
+ qpia.send_cq = txcq->get_cq();
+ qpia.recv_cq = rxcq->get_cq();
+ if (srq) {
+ qpia.srq = srq; // use the same shared receive queue
+ } else {
+ qpia.cap.max_recv_wr = max_recv_wr;
+ qpia.cap.max_recv_sge = 1;
+ }
+ qpia.cap.max_send_wr = max_send_wr; // max outstanding send requests
+ qpia.cap.max_send_sge = 1; // max send scatter-gather elements
+ qpia.cap.max_inline_data = MAX_INLINE_DATA; // max bytes of immediate data on send q
+ qpia.qp_type = type; // RC, UC, UD, or XRC
+ qpia.sq_sig_all = 0; // only generate CQEs on requested WQEs
+
+ if (!cct->_conf->ms_async_rdma_cm) {
+ qp = ibv_create_qp(pd, &qpia);
+ if (qp == NULL) {
+ lderr(cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl;
+ if (errno == ENOMEM) {
+ lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_queue_length, "
+ " ms_async_rdma_send_buffers or"
+ " ms_async_rdma_buffer_size" << dendl;
+ }
+ return -1;
+ }
+ if (modify_qp_to_init() != 0) {
+ ibv_destroy_qp(qp);
+ return -1;
+ }
+ } else {
+ ceph_assert(cm_id->verbs == pd->context);
+ if (rdma_create_qp(cm_id, pd, &qpia)) {
+ lderr(cct) << __func__ << " failed to create queue pair with rdmacm library"
+ << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+ qp = cm_id->qp;
+ }
+ ldout(cct, 20) << __func__ << " successfully create queue pair: "
+ << "qp=" << qp << dendl;
+ local_cm_meta.local_qpn = get_local_qp_number();
+ local_cm_meta.psn = get_initial_psn();
+ local_cm_meta.lid = infiniband.get_lid();
+ local_cm_meta.peer_qpn = 0;
+ local_cm_meta.gid = infiniband.get_gid();
+ if (!srq) {
+ int rq_wrs = infiniband.post_chunks_to_rq(max_recv_wr, this);
+ if (rq_wrs == 0) {
+ lderr(cct) << __func__ << " intialize no SRQ Queue Pair, qp number: " << qp->qp_num
+ << " fatal error: can't post SQ WR " << dendl;
+ return -1;
+ }
+ ldout(cct, 20) << __func__ << " initialize no SRQ Queue Pair, qp number: "
+ << qp->qp_num << " post SQ WR " << rq_wrs << dendl;
+ }
+ return 0;
+}
+
+void Infiniband::QueuePair::wire_gid_to_gid(const char *wgid, ib_cm_meta_t* cm_meta_data)
+{
+ char tmp[9];
+ uint32_t v32;
+ int i;
+
+ for (tmp[8] = 0, i = 0; i < 4; ++i) {
+ memcpy(tmp, wgid + i * 8, 8);
+ sscanf(tmp, "%x", &v32);
+ *(uint32_t *)(&cm_meta_data->gid.raw[i * 4]) = ntohl(v32);
+ }
+}
+
+void Infiniband::QueuePair::gid_to_wire_gid(const ib_cm_meta_t& cm_meta_data, char wgid[])
+{
+ for (int i = 0; i < 4; ++i)
+ sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(cm_meta_data.gid.raw + i * 4)));
+}
+
+/*
+ * return value
+ * 1: means no valid buffer read
+ * 0: means got enough buffer
+ * < 0: means error
+ */
+int Infiniband::QueuePair::recv_cm_meta(CephContext *cct, int socket_fd)
+{
+ char msg[TCP_MSG_LEN];
+ char gid[33];
+ ssize_t r = ::read(socket_fd, &msg, sizeof(msg));
+ // Drop incoming qpt
+ if (cct->_conf->ms_inject_socket_failures && socket_fd >= 0) {
+ if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
+ ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
+ return -EINVAL;
+ }
+ }
+ if (r < 0) {
+ r = -errno;
+ lderr(cct) << __func__ << " got error " << r << ": "
+ << cpp_strerror(r) << dendl;
+ } else if (r == 0) { // valid disconnect message of length 0
+ ldout(cct, 10) << __func__ << " got disconnect message " << dendl;
+ } else if ((size_t)r != sizeof(msg)) { // invalid message
+ ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl;
+ r = -EINVAL;
+ } else { // valid message
+ sscanf(msg, "%hx:%x:%x:%x:%s", &(peer_cm_meta.lid), &(peer_cm_meta.local_qpn), &(peer_cm_meta.psn), &(peer_cm_meta.peer_qpn), gid);
+ wire_gid_to_gid(gid, &peer_cm_meta);
+ ldout(cct, 5) << __func__ << " recevd: " << peer_cm_meta.lid << ", " << peer_cm_meta.local_qpn
+ << ", " << peer_cm_meta.psn << ", " << peer_cm_meta.peer_qpn << ", " << gid << dendl;
+ }
+ return r;
+}
+
+int Infiniband::QueuePair::send_cm_meta(CephContext *cct, int socket_fd)
+{
+ int retry = 0;
+ ssize_t r;
+
+ char msg[TCP_MSG_LEN];
+ char gid[33];
+retry:
+ gid_to_wire_gid(local_cm_meta, gid);
+ sprintf(msg, "%04x:%08x:%08x:%08x:%s", local_cm_meta.lid, local_cm_meta.local_qpn, local_cm_meta.psn, local_cm_meta.peer_qpn, gid);
+ ldout(cct, 10) << __func__ << " sending: " << local_cm_meta.lid << ", " << local_cm_meta.local_qpn
+ << ", " << local_cm_meta.psn << ", " << local_cm_meta.peer_qpn << ", " << gid << dendl;
+ r = ::write(socket_fd, msg, sizeof(msg));
+ // Drop incoming qpt
+ if (cct->_conf->ms_inject_socket_failures && socket_fd >= 0) {
+ if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
+ ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
+ return -EINVAL;
+ }
+ }
+
+ if ((size_t)r != sizeof(msg)) {
+ // FIXME need to handle EAGAIN instead of retry
+ if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) {
+ retry++;
+ goto retry;
+ }
+ if (r < 0)
+ lderr(cct) << __func__ << " send returned error " << errno << ": "
+ << cpp_strerror(errno) << dendl;
+ else
+ lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl;
+ return -errno;
+ }
+ return 0;
+}
+
+/**
+ * Switch QP to ERROR state and then post a beacon to be able to drain all
+ * WCEs and then safely destroy QP. See RDMADispatcher::handle_tx_event()
+ * for details.
+ *
+ * \return
+ * -errno if the QueuePair can't switch to ERROR
+ * 0 for success.
+ */
+int Infiniband::QueuePair::to_dead()
+{
+ if (dead)
+ return 0;
+
+ if (modify_qp_to_error()) {
+ return -1;
+ }
+ ldout(cct, 20) << __func__ << " force trigger error state Queue Pair, qp number: " << local_cm_meta.local_qpn
+ << " bound remote QueuePair, qp number: " << local_cm_meta.peer_qpn << dendl;
+
+ struct ibv_send_wr *bad_wr = nullptr, beacon;
+ // FIPS zeroization audit 20191115: this memset is not security related.
+ memset(&beacon, 0, sizeof(beacon));
+ beacon.wr_id = BEACON_WRID;
+ beacon.opcode = IBV_WR_SEND;
+ beacon.send_flags = IBV_SEND_SIGNALED;
+ if (ibv_post_send(qp, &beacon, &bad_wr)) {
+ lderr(cct) << __func__ << " failed to send a beacon: " << cpp_strerror(errno) << dendl;
+ return -errno;
+ }
+ ldout(cct, 20) << __func__ << " trigger error state Queue Pair, qp number: " << local_cm_meta.local_qpn << " Beacon sent " << dendl;
+ dead = true;
+
+ return 0;
+}
+
+int Infiniband::QueuePair::get_remote_qp_number(uint32_t *rqp) const
+{
+ ibv_qp_attr qpa;
+ ibv_qp_init_attr qpia;
+
+ int r = ibv_query_qp(qp, &qpa, IBV_QP_DEST_QPN, &qpia);
+ if (r) {
+ lderr(cct) << __func__ << " failed to query qp: "
+ << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+
+ if (rqp)
+ *rqp = qpa.dest_qp_num;
+ return 0;
+}
+
+/**
+ * Get the remote infiniband address for this QueuePair, as set in #plumb().
+ * LIDs are "local IDs" in infiniband terminology. They are short, locally
+ * routable addresses.
+ */
+int Infiniband::QueuePair::get_remote_lid(uint16_t *lid) const
+{
+ ibv_qp_attr qpa;
+ ibv_qp_init_attr qpia;
+
+ int r = ibv_query_qp(qp, &qpa, IBV_QP_AV, &qpia);
+ if (r) {
+ lderr(cct) << __func__ << " failed to query qp: "
+ << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+
+ if (lid)
+ *lid = qpa.ah_attr.dlid;
+ return 0;
+}
+
+/**
+ * Get the state of a QueuePair.
+ */
+int Infiniband::QueuePair::get_state() const
+{
+ ibv_qp_attr qpa;
+ ibv_qp_init_attr qpia;
+
+ int r = ibv_query_qp(qp, &qpa, IBV_QP_STATE, &qpia);
+ if (r) {
+ lderr(cct) << __func__ << " failed to get state: "
+ << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+ return qpa.qp_state;
+}
+
+Infiniband::CompletionChannel::CompletionChannel(CephContext *c, Infiniband &ib)
+ : cct(c), infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0)
+{
+}
+
+Infiniband::CompletionChannel::~CompletionChannel()
+{
+ if (channel) {
+ int r = ibv_destroy_comp_channel(channel);
+ if (r < 0)
+ lderr(cct) << __func__ << " failed to destroy cc: " << cpp_strerror(errno) << dendl;
+ ceph_assert(r == 0);
+ }
+}
+
+int Infiniband::CompletionChannel::init()
+{
+ ldout(cct, 20) << __func__ << " started." << dendl;
+ channel = ibv_create_comp_channel(infiniband.device->ctxt);
+ if (!channel) {
+ lderr(cct) << __func__ << " failed to create receive completion channel: "
+ << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+ int rc = ceph::NetHandler(cct).set_nonblock(channel->fd);
+ if (rc < 0) {
+ ibv_destroy_comp_channel(channel);
+ return -1;
+ }
+ return 0;
+}
+
+void Infiniband::CompletionChannel::ack_events()
+{
+ ibv_ack_cq_events(cq, cq_events_that_need_ack);
+ cq_events_that_need_ack = 0;
+}
+
+bool Infiniband::CompletionChannel::get_cq_event()
+{
+ ibv_cq *cq = NULL;
+ void *ev_ctx;
+ if (ibv_get_cq_event(channel, &cq, &ev_ctx)) {
+ if (errno != EAGAIN && errno != EINTR)
+ lderr(cct) << __func__ << " failed to retrieve CQ event: "
+ << cpp_strerror(errno) << dendl;
+ return false;
+ }
+
+ /* accumulate number of cq events that need to
+ * * be acked, and periodically ack them
+ * */
+ if (++cq_events_that_need_ack == MAX_ACK_EVENT) {
+ ldout(cct, 20) << __func__ << " ack aq events." << dendl;
+ ibv_ack_cq_events(cq, MAX_ACK_EVENT);
+ cq_events_that_need_ack = 0;
+ }
+
+ return true;
+}
+
+
+Infiniband::CompletionQueue::~CompletionQueue()
+{
+ if (cq) {
+ int r = ibv_destroy_cq(cq);
+ if (r < 0)
+ lderr(cct) << __func__ << " failed to destroy cq: " << cpp_strerror(errno) << dendl;
+ ceph_assert(r == 0);
+ }
+}
+
+int Infiniband::CompletionQueue::init()
+{
+ cq = ibv_create_cq(infiniband.device->ctxt, queue_depth, this, channel->get_channel(), 0);
+ if (!cq) {
+ lderr(cct) << __func__ << " failed to create receive completion queue: "
+ << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+
+ if (ibv_req_notify_cq(cq, 0)) {
+ lderr(cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl;
+ ibv_destroy_cq(cq);
+ cq = nullptr;
+ return -1;
+ }
+
+ channel->bind_cq(cq);
+ ldout(cct, 20) << __func__ << " successfully create cq=" << cq << dendl;
+ return 0;
+}
+
+int Infiniband::CompletionQueue::rearm_notify(bool solicite_only)
+{
+ ldout(cct, 20) << __func__ << " started." << dendl;
+ int r = ibv_req_notify_cq(cq, 0);
+ if (r < 0)
+ lderr(cct) << __func__ << " failed to notify cq: " << cpp_strerror(errno) << dendl;
+ return r;
+}
+
+int Infiniband::CompletionQueue::poll_cq(int num_entries, ibv_wc *ret_wc_array) {
+ int r = ibv_poll_cq(cq, num_entries, ret_wc_array);
+ if (r < 0) {
+ lderr(cct) << __func__ << " poll_completion_queue occur met error: "
+ << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+ return r;
+}
+
+
+Infiniband::ProtectionDomain::ProtectionDomain(CephContext *cct, Device *device)
+ : pd(ibv_alloc_pd(device->ctxt))
+{
+ if (pd == NULL) {
+ lderr(cct) << __func__ << " failed to allocate infiniband protection domain: " << cpp_strerror(errno) << dendl;
+ ceph_abort();
+ }
+}
+
+Infiniband::ProtectionDomain::~ProtectionDomain()
+{
+ ibv_dealloc_pd(pd);
+}
+
+
+Infiniband::MemoryManager::Chunk::Chunk(ibv_mr* m, uint32_t bytes, char* buffer,
+ uint32_t offset, uint32_t bound, uint32_t lkey, QueuePair* qp)
+ : mr(m), qp(qp), lkey(lkey), bytes(bytes), offset(offset), bound(bound), buffer(buffer)
+{
+}
+
+Infiniband::MemoryManager::Chunk::~Chunk()
+{
+}
+
+uint32_t Infiniband::MemoryManager::Chunk::get_offset()
+{
+ return offset;
+}
+
+uint32_t Infiniband::MemoryManager::Chunk::get_size() const
+{
+ return bound - offset;
+}
+
+void Infiniband::MemoryManager::Chunk::prepare_read(uint32_t b)
+{
+ offset = 0;
+ bound = b;
+}
+
+uint32_t Infiniband::MemoryManager::Chunk::get_bound()
+{
+ return bound;
+}
+
+uint32_t Infiniband::MemoryManager::Chunk::read(char* buf, uint32_t len)
+{
+ uint32_t left = get_size();
+ uint32_t read_len = left <= len ? left : len;
+ memcpy(buf, buffer + offset, read_len);
+ offset += read_len;
+ return read_len;
+}
+
+uint32_t Infiniband::MemoryManager::Chunk::write(char* buf, uint32_t len)
+{
+ uint32_t write_len = (bytes - offset) <= len ? (bytes - offset) : len;
+ memcpy(buffer + offset, buf, write_len);
+ offset += write_len;
+ return write_len;
+}
+
+bool Infiniband::MemoryManager::Chunk::full()
+{
+ return offset == bytes;
+}
+
+void Infiniband::MemoryManager::Chunk::reset_read_chunk()
+{
+ offset = 0;
+ bound = 0;
+}
+
+void Infiniband::MemoryManager::Chunk::reset_write_chunk()
+{
+ offset = 0;
+ bound = bytes;
+}
+
+Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s)
+ : manager(m), buffer_size(s)
+{
+}
+
+Infiniband::MemoryManager::Cluster::~Cluster()
+{
+ int r = ibv_dereg_mr(chunk_base->mr);
+ ceph_assert(r == 0);
+ const auto chunk_end = chunk_base + num_chunk;
+ for (auto chunk = chunk_base; chunk != chunk_end; chunk++) {
+ chunk->~Chunk();
+ }
+
+ ::free(chunk_base);
+ manager.free(base);
+}
+
+int Infiniband::MemoryManager::Cluster::fill(uint32_t num)
+{
+ ceph_assert(!base);
+ num_chunk = num;
+ uint32_t bytes = buffer_size * num;
+
+ base = (char*)manager.malloc(bytes);
+ end = base + bytes;
+ ceph_assert(base);
+ chunk_base = static_cast<Chunk*>(::malloc(sizeof(Chunk) * num));
+ // FIPS zeroization audit 20191115: this memset is not security related.
+ memset(static_cast<void*>(chunk_base), 0, sizeof(Chunk) * num);
+ free_chunks.reserve(num);
+ ibv_mr* m = ibv_reg_mr(manager.pd->pd, base, bytes, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
+ ceph_assert(m);
+ Chunk* chunk = chunk_base;
+ for (uint32_t offset = 0; offset < bytes; offset += buffer_size){
+ new(chunk) Chunk(m, buffer_size, base + offset, 0, buffer_size, m->lkey);
+ free_chunks.push_back(chunk);
+ chunk++;
+ }
+ return 0;
+}
+
+void Infiniband::MemoryManager::Cluster::take_back(std::vector<Chunk*> &ck)
+{
+ std::lock_guard l{lock};
+ for (auto c : ck) {
+ c->reset_write_chunk();
+ free_chunks.push_back(c);
+ }
+}
+
+int Infiniband::MemoryManager::Cluster::get_buffers(std::vector<Chunk*> &chunks, size_t block_size)
+{
+ std::lock_guard l{lock};
+ uint32_t chunk_buffer_number = (block_size + buffer_size - 1) / buffer_size;
+ chunk_buffer_number = free_chunks.size() < chunk_buffer_number ? free_chunks.size(): chunk_buffer_number;
+ uint32_t r = 0;
+
+ for (r = 0; r < chunk_buffer_number; ++r) {
+ chunks.push_back(free_chunks.back());
+ free_chunks.pop_back();
+ }
+ return r;
+}
+
+bool Infiniband::MemoryManager::MemPoolContext::can_alloc(unsigned nbufs)
+{
+ /* unlimited */
+ if (manager->cct->_conf->ms_async_rdma_receive_buffers <= 0)
+ return true;
+
+ if (n_bufs_allocated + nbufs > (unsigned)manager->cct->_conf->ms_async_rdma_receive_buffers) {
+ lderr(manager->cct) << __func__ << " WARNING: OUT OF RX BUFFERS: allocated: " <<
+ n_bufs_allocated << " requested: " << nbufs <<
+ " limit: " << manager->cct->_conf->ms_async_rdma_receive_buffers << dendl;
+ return false;
+ }
+
+ return true;
+}
+
+void Infiniband::MemoryManager::MemPoolContext::set_stat_logger(PerfCounters *logger) {
+ perf_logger = logger;
+ if (perf_logger != nullptr)
+ perf_logger->set(l_msgr_rdma_rx_bufs_total, n_bufs_allocated);
+}
+
+void Infiniband::MemoryManager::MemPoolContext::update_stats(int nbufs)
+{
+ n_bufs_allocated += nbufs;
+
+ if (!perf_logger)
+ return;
+
+ if (nbufs > 0) {
+ perf_logger->inc(l_msgr_rdma_rx_bufs_total, nbufs);
+ } else {
+ perf_logger->dec(l_msgr_rdma_rx_bufs_total, -nbufs);
+ }
+}
+
+void *Infiniband::MemoryManager::mem_pool::slow_malloc()
+{
+ // this will trigger pool expansion via PoolAllocator::malloc()
+ return PoolAllocator::with_context(ctx, [this] {
+ return boost::pool<PoolAllocator>::malloc();
+ });
+}
+
+Infiniband::MemoryManager::MemPoolContext*
+Infiniband::MemoryManager::PoolAllocator::g_ctx = nullptr;
+
+// lock is taken by mem_pool::slow_malloc()
+ceph::mutex& Infiniband::MemoryManager::PoolAllocator::get_lock()
+{
+ static ceph::mutex lock = ceph::make_mutex("pool-alloc-lock");
+ return lock;
+}
+
+char *Infiniband::MemoryManager::PoolAllocator::malloc(const size_type block_size)
+{
+ ceph_assert(g_ctx);
+ MemoryManager *manager = g_ctx->manager;
+ CephContext *cct = manager->cct;
+ size_t chunk_buffer_size = sizeof(Chunk) + cct->_conf->ms_async_rdma_buffer_size;
+ size_t chunk_buffer_number = block_size / chunk_buffer_size;
+
+ if (!g_ctx->can_alloc(chunk_buffer_number))
+ return NULL;
+
+ mem_info *minfo= static_cast<mem_info *>(manager->malloc(block_size + sizeof(mem_info)));
+ if (!minfo) {
+ lderr(cct) << __func__ << " failed to allocate " << chunk_buffer_number << " buffers "
+ " Its block size is : " << block_size + sizeof(mem_info) << dendl;
+ return NULL;
+ }
+
+ minfo->mr = ibv_reg_mr(manager->pd->pd, minfo->chunks, block_size, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
+ if (minfo->mr == NULL) {
+ lderr(cct) << __func__ << " failed to do rdma memory registration " << block_size << " bytes. "
+ " relase allocated memory now." << dendl;
+ manager->free(minfo);
+ return NULL;
+ }
+
+ minfo->nbufs = chunk_buffer_number;
+ // save this chunk context
+ minfo->ctx = g_ctx;
+
+ // note that the memory can be allocated before perf logger is set
+ g_ctx->update_stats(chunk_buffer_number);
+
+ /* initialize chunks */
+ Chunk *chunk = minfo->chunks;
+ for (unsigned i = 0; i < chunk_buffer_number; i++) {
+ new(chunk) Chunk(minfo->mr, cct->_conf->ms_async_rdma_buffer_size, chunk->data, 0, 0, minfo->mr->lkey);
+ chunk = reinterpret_cast<Chunk *>(reinterpret_cast<char *>(chunk) + chunk_buffer_size);
+ }
+
+ return reinterpret_cast<char *>(minfo->chunks);
+}
+
+
+void Infiniband::MemoryManager::PoolAllocator::free(char * const block)
+{
+ mem_info *m;
+ std::lock_guard l{get_lock()};
+
+ Chunk *mem_info_chunk = reinterpret_cast<Chunk *>(block);
+ m = reinterpret_cast<mem_info *>(reinterpret_cast<char *>(mem_info_chunk) - offsetof(mem_info, chunks));
+ m->ctx->update_stats(-m->nbufs);
+ ibv_dereg_mr(m->mr);
+ m->ctx->manager->free(m);
+}
+
+Infiniband::MemoryManager::MemoryManager(CephContext *c, Device *d, ProtectionDomain *p)
+ : cct(c), device(d), pd(p),
+ rxbuf_pool_ctx(this),
+ rxbuf_pool(&rxbuf_pool_ctx, sizeof(Chunk) + c->_conf->ms_async_rdma_buffer_size,
+ c->_conf->ms_async_rdma_receive_buffers > 0 ?
+ // if possible make initial pool size 2 * receive_queue_len
+ // that way there will be no pool expansion upon receive of the
+ // first packet.
+ (c->_conf->ms_async_rdma_receive_buffers < 2 * c->_conf->ms_async_rdma_receive_queue_len ?
+ c->_conf->ms_async_rdma_receive_buffers : 2 * c->_conf->ms_async_rdma_receive_queue_len) :
+ // rx pool is infinite, we can set any initial size that we want
+ 2 * c->_conf->ms_async_rdma_receive_queue_len,
+ device->device_attr.max_mr_size / (sizeof(Chunk) + cct->_conf->ms_async_rdma_buffer_size))
+{
+}
+
+Infiniband::MemoryManager::~MemoryManager()
+{
+ if (send)
+ delete send;
+}
+
+void* Infiniband::MemoryManager::huge_pages_malloc(size_t size)
+{
+ size_t real_size = ALIGN_TO_PAGE_2MB(size) + HUGE_PAGE_SIZE_2MB;
+ char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS | MAP_POPULATE | MAP_HUGETLB, -1, 0);
+ if (ptr == MAP_FAILED) {
+ ptr = (char *)std::malloc(real_size);
+ if (ptr == NULL) return NULL;
+ real_size = 0;
+ }
+ *((size_t *)ptr) = real_size;
+ return ptr + HUGE_PAGE_SIZE_2MB;
+}
+
+void Infiniband::MemoryManager::huge_pages_free(void *ptr)
+{
+ if (ptr == NULL) return;
+ void *real_ptr = (char *)ptr - HUGE_PAGE_SIZE_2MB;
+ size_t real_size = *((size_t *)real_ptr);
+ ceph_assert(real_size % HUGE_PAGE_SIZE_2MB == 0);
+ if (real_size != 0)
+ munmap(real_ptr, real_size);
+ else
+ std::free(real_ptr);
+}
+
+
+void* Infiniband::MemoryManager::malloc(size_t size)
+{
+ if (cct->_conf->ms_async_rdma_enable_hugepage)
+ return huge_pages_malloc(size);
+ else
+ return std::malloc(size);
+}
+
+void Infiniband::MemoryManager::free(void *ptr)
+{
+ if (cct->_conf->ms_async_rdma_enable_hugepage)
+ huge_pages_free(ptr);
+ else
+ std::free(ptr);
+}
+
+void Infiniband::MemoryManager::create_tx_pool(uint32_t size, uint32_t tx_num)
+{
+ ceph_assert(device);
+ ceph_assert(pd);
+
+ send = new Cluster(*this, size);
+ send->fill(tx_num);
+}
+
+void Infiniband::MemoryManager::return_tx(std::vector<Chunk*> &chunks)
+{
+ send->take_back(chunks);
+}
+
+int Infiniband::MemoryManager::get_send_buffers(std::vector<Chunk*> &c, size_t bytes)
+{
+ return send->get_buffers(c, bytes);
+}
+
+static std::atomic<bool> init_prereq = {false};
+
+void Infiniband::verify_prereq(CephContext *cct) {
+ int rc = 0;
+ ldout(cct, 20) << __func__ << " ms_async_rdma_enable_hugepage value is: " << cct->_conf->ms_async_rdma_enable_hugepage << dendl;
+ if (cct->_conf->ms_async_rdma_enable_hugepage){
+ rc = setenv("RDMAV_HUGEPAGES_SAFE","1",1);
+ ldout(cct, 0) << __func__ << " RDMAV_HUGEPAGES_SAFE is set as: " << getenv("RDMAV_HUGEPAGES_SAFE") << dendl;
+ if (rc) {
+ lderr(cct) << __func__ << " failed to export RDMA_HUGEPAGES_SAFE. On RDMA must be exported before using huge pages. Application aborts." << dendl;
+ ceph_abort();
+ }
+ }
+
+ //On RDMA MUST be called before fork
+ rc = ibv_fork_init();
+ if (rc) {
+ lderr(cct) << __func__ << " failed to call ibv_for_init(). On RDMA must be called before fork. Application aborts." << dendl;
+ ceph_abort();
+ }
+
+ //Check ulimit
+ struct rlimit limit;
+ getrlimit(RLIMIT_MEMLOCK, &limit);
+ if (limit.rlim_cur != RLIM_INFINITY || limit.rlim_max != RLIM_INFINITY) {
+ lderr(cct) << __func__ << "!!! WARNING !!! For RDMA to work properly user memlock (ulimit -l) must be big enough to allow large amount of registered memory."
+ " We recommend setting this parameter to infinity" << dendl;
+ }
+ init_prereq = true;
+}
+
+Infiniband::Infiniband(CephContext *cct)
+ : cct(cct),
+ device_name(cct->_conf->ms_async_rdma_device_name),
+ port_num( cct->_conf->ms_async_rdma_port_num)
+{
+ if (!init_prereq)
+ verify_prereq(cct);
+ ldout(cct, 20) << __func__ << " constructing Infiniband..." << dendl;
+}
+
+void Infiniband::init()
+{
+ std::lock_guard l{lock};
+
+ if (initialized)
+ return;
+
+ device_list = new DeviceList(cct);
+ initialized = true;
+
+ device = device_list->get_device(device_name.c_str());
+ ceph_assert(device);
+ device->binding_port(cct, port_num);
+ ib_physical_port = device->active_port->get_port_num();
+ pd = new ProtectionDomain(cct, device);
+ ceph_assert(ceph::NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0);
+
+ support_srq = cct->_conf->ms_async_rdma_support_srq;
+ if (support_srq) {
+ ceph_assert(device->device_attr.max_srq);
+ rx_queue_len = device->device_attr.max_srq_wr;
+ }
+ else
+ rx_queue_len = device->device_attr.max_qp_wr;
+ if (rx_queue_len > cct->_conf->ms_async_rdma_receive_queue_len) {
+ rx_queue_len = cct->_conf->ms_async_rdma_receive_queue_len;
+ ldout(cct, 1) << __func__ << " assigning: " << rx_queue_len << " receive buffers" << dendl;
+ } else {
+ ldout(cct, 0) << __func__ << " using the max allowed receive buffers: " << rx_queue_len << dendl;
+ }
+
+ // check for the misconfiguration
+ if (cct->_conf->ms_async_rdma_receive_buffers > 0 &&
+ rx_queue_len > (unsigned)cct->_conf->ms_async_rdma_receive_buffers) {
+ lderr(cct) << __func__ << " rdma_receive_queue_len (" <<
+ rx_queue_len << ") > ms_async_rdma_receive_buffers(" <<
+ cct->_conf->ms_async_rdma_receive_buffers << ")." << dendl;
+ ceph_abort();
+ }
+
+ // Keep extra one WR for a beacon to indicate all WCEs were consumed
+ tx_queue_len = device->device_attr.max_qp_wr - 1;
+ if (tx_queue_len > cct->_conf->ms_async_rdma_send_buffers) {
+ tx_queue_len = cct->_conf->ms_async_rdma_send_buffers;
+ ldout(cct, 1) << __func__ << " assigning: " << tx_queue_len << " send buffers" << dendl;
+ } else {
+ ldout(cct, 0) << __func__ << " using the max allowed send buffers: " << tx_queue_len << dendl;
+ }
+
+ //check for the memory region size misconfiguration
+ if ((uint64_t)cct->_conf->ms_async_rdma_buffer_size * tx_queue_len > device->device_attr.max_mr_size) {
+ lderr(cct) << __func__ << " Out of max memory region size " << dendl;
+ ceph_abort();
+ }
+
+ ldout(cct, 1) << __func__ << " device allow " << device->device_attr.max_cqe
+ << " completion entries" << dendl;
+
+ memory_manager = new MemoryManager(cct, device, pd);
+ memory_manager->create_tx_pool(cct->_conf->ms_async_rdma_buffer_size, tx_queue_len);
+
+ if (support_srq) {
+ srq = create_shared_receive_queue(rx_queue_len, MAX_SHARED_RX_SGE_COUNT);
+ post_chunks_to_rq(rx_queue_len, NULL); //add to srq
+ }
+}
+
+Infiniband::~Infiniband()
+{
+ if (!initialized)
+ return;
+ if (support_srq)
+ ibv_destroy_srq(srq);
+ delete memory_manager;
+ delete pd;
+ delete device_list;
+}
+
+/**
+ * Create a shared receive queue. This basically wraps the verbs call.
+ *
+ * \param[in] max_wr
+ * The max number of outstanding work requests in the SRQ.
+ * \param[in] max_sge
+ * The max number of scatter elements per WR.
+ * \return
+ * A valid ibv_srq pointer, or NULL on error.
+ */
+ibv_srq* Infiniband::create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge)
+{
+ ibv_srq_init_attr sia;
+ // FIPS zeroization audit 20191115: this memset is not security related.
+ memset(&sia, 0, sizeof(sia));
+ sia.srq_context = device->ctxt;
+ sia.attr.max_wr = max_wr;
+ sia.attr.max_sge = max_sge;
+ return ibv_create_srq(pd->pd, &sia);
+}
+
+int Infiniband::get_tx_buffers(std::vector<Chunk*> &c, size_t bytes)
+{
+ return memory_manager->get_send_buffers(c, bytes);
+}
+
+/**
+ * Create a new QueuePair. This factory should be used in preference to
+ * the QueuePair constructor directly, since this lets derivatives of
+ * Infiniband, e.g. MockInfiniband (if it existed),
+ * return mocked out QueuePair derivatives.
+ *
+ * \return
+ * QueuePair on success or NULL if init fails
+ * See QueuePair::QueuePair for parameter documentation.
+ */
+Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, CompletionQueue *tx,
+ CompletionQueue* rx, ibv_qp_type type, struct rdma_cm_id *cm_id)
+{
+ Infiniband::QueuePair *qp = new QueuePair(
+ cct, *this, type, ib_physical_port, srq, tx, rx, tx_queue_len, rx_queue_len, cm_id);
+ if (qp->init()) {
+ delete qp;
+ return NULL;
+ }
+ return qp;
+}
+
+int Infiniband::post_chunks_to_rq(int rq_wr_num, QueuePair *qp)
+{
+ int ret = 0;
+ Chunk *chunk = nullptr;
+
+ ibv_recv_wr *rx_work_request = static_cast<ibv_recv_wr*>(::calloc(rq_wr_num, sizeof(ibv_recv_wr)));
+ ibv_sge *isge = static_cast<ibv_sge*>(::calloc(rq_wr_num, sizeof(ibv_sge)));
+ ceph_assert(rx_work_request);
+ ceph_assert(isge);
+
+ int i = 0;
+ while (i < rq_wr_num) {
+ chunk = get_memory_manager()->get_rx_buffer();
+ if (chunk == nullptr) {
+ lderr(cct) << __func__ << " WARNING: out of memory. Request " << rq_wr_num <<
+ " rx buffers. Only get " << i << " rx buffers." << dendl;
+ if (i == 0) {
+ ::free(rx_work_request);
+ ::free(isge);
+ return 0;
+ }
+ break; //get some buffers, so we need post them to recevie queue
+ }
+
+ isge[i].addr = reinterpret_cast<uint64_t>(chunk->data);
+ isge[i].length = chunk->bytes;
+ isge[i].lkey = chunk->lkey;
+
+ rx_work_request[i].wr_id = reinterpret_cast<uint64_t>(chunk);// assign chunk address as work request id
+
+ if (i != 0) {
+ rx_work_request[i - 1].next = &rx_work_request[i];
+ }
+ rx_work_request[i].sg_list = &isge[i];
+ rx_work_request[i].num_sge = 1;
+
+ if (qp && !qp->get_srq()) {
+ chunk->set_qp(qp);
+ qp->add_rq_wr(chunk);
+ }
+ i++;
+ }
+
+ ibv_recv_wr *badworkrequest = nullptr;
+ if (support_srq) {
+ ret = ibv_post_srq_recv(srq, rx_work_request, &badworkrequest);
+ } else {
+ ceph_assert(qp);
+ ret = ibv_post_recv(qp->get_qp(), rx_work_request, &badworkrequest);
+ }
+
+ ::free(rx_work_request);
+ ::free(isge);
+ ceph_assert(badworkrequest == nullptr && ret == 0);
+ return i;
+}
+
+Infiniband::CompletionChannel* Infiniband::create_comp_channel(CephContext *c)
+{
+ Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(c, *this);
+ if (cc->init()) {
+ delete cc;
+ return NULL;
+ }
+ return cc;
+}
+
+Infiniband::CompletionQueue* Infiniband::create_comp_queue(
+ CephContext *cct, CompletionChannel *cc)
+{
+ Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue(
+ cct, *this, CQ_DEPTH, cc);
+ if (cq->init()) {
+ delete cq;
+ return NULL;
+ }
+ return cq;
+}
+
+Infiniband::QueuePair::~QueuePair()
+{
+ ldout(cct, 20) << __func__ << " destroy Queue Pair, qp number: " << qp->qp_num << " left SQ WR " << recv_queue.size() << dendl;
+ if (qp) {
+ ldout(cct, 20) << __func__ << " destroy qp=" << qp << dendl;
+ ceph_assert(!ibv_destroy_qp(qp));
+ }
+
+ for (auto& chunk: recv_queue) {
+ infiniband.get_memory_manager()->release_rx_buffer(chunk);
+ }
+ recv_queue.clear();
+}
+
+/**
+ * Given a string representation of the `status' field from Verbs
+ * struct `ibv_wc'.
+ *
+ * \param[in] status
+ * The integer status obtained in ibv_wc.status.
+ * \return
+ * A string corresponding to the given status.
+ */
+const char* Infiniband::wc_status_to_string(int status)
+{
+ static const char *lookup[] = {
+ "SUCCESS",
+ "LOC_LEN_ERR",
+ "LOC_QP_OP_ERR",
+ "LOC_EEC_OP_ERR",
+ "LOC_PROT_ERR",
+ "WR_FLUSH_ERR",
+ "MW_BIND_ERR",
+ "BAD_RESP_ERR",
+ "LOC_ACCESS_ERR",
+ "REM_INV_REQ_ERR",
+ "REM_ACCESS_ERR",
+ "REM_OP_ERR",
+ "RETRY_EXC_ERR",
+ "RNR_RETRY_EXC_ERR",
+ "LOC_RDD_VIOL_ERR",
+ "REM_INV_RD_REQ_ERR",
+ "REM_ABORT_ERR",
+ "INV_EECN_ERR",
+ "INV_EEC_STATE_ERR",
+ "FATAL_ERR",
+ "RESP_TIMEOUT_ERR",
+ "GENERAL_ERR"
+ };
+
+ if (status < IBV_WC_SUCCESS || status > IBV_WC_GENERAL_ERR)
+ return "<status out of range!>";
+ return lookup[status];
+}
+
+const char* Infiniband::qp_state_string(int status) {
+ switch(status) {
+ case IBV_QPS_RESET : return "IBV_QPS_RESET";
+ case IBV_QPS_INIT : return "IBV_QPS_INIT";
+ case IBV_QPS_RTR : return "IBV_QPS_RTR";
+ case IBV_QPS_RTS : return "IBV_QPS_RTS";
+ case IBV_QPS_SQD : return "IBV_QPS_SQD";
+ case IBV_QPS_SQE : return "IBV_QPS_SQE";
+ case IBV_QPS_ERR : return "IBV_QPS_ERR";
+ default: return " out of range.";
+ }
+}
diff --git a/src/msg/async/rdma/Infiniband.h b/src/msg/async/rdma/Infiniband.h
new file mode 100644
index 000000000..f18442e4e
--- /dev/null
+++ b/src/msg/async/rdma/Infiniband.h
@@ -0,0 +1,591 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_INFINIBAND_H
+#define CEPH_INFINIBAND_H
+
+#include <boost/pool/pool.hpp>
+// need this because boost messes with ceph log/assert definitions
+#include "include/ceph_assert.h"
+
+#include <infiniband/verbs.h>
+#include <rdma/rdma_cma.h>
+
+#include <atomic>
+#include <functional>
+#include <string>
+#include <vector>
+
+#include "include/common_fwd.h"
+#include "include/int_types.h"
+#include "include/page.h"
+#include "include/scope_guard.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/ceph_mutex.h"
+#include "common/perf_counters.h"
+#include "msg/msg_types.h"
+#include "msg/async/net_handler.h"
+
+#define HUGE_PAGE_SIZE_2MB (2 * 1024 * 1024)
+#define ALIGN_TO_PAGE_2MB(x) \
+ (((x) + (HUGE_PAGE_SIZE_2MB - 1)) & ~(HUGE_PAGE_SIZE_2MB - 1))
+
+#define PSN_LEN 24
+#define PSN_MSK ((1 << PSN_LEN) - 1)
+
+#define BEACON_WRID 0xDEADBEEF
+
+struct ib_cm_meta_t {
+ uint16_t lid;
+ uint32_t local_qpn;
+ uint32_t psn;
+ uint32_t peer_qpn;
+ union ibv_gid gid;
+} __attribute__((packed));
+
+class RDMAStack;
+
+class Port {
+ struct ibv_context* ctxt;
+ int port_num;
+ struct ibv_port_attr port_attr;
+ uint16_t lid;
+ int gid_idx;
+ union ibv_gid gid;
+
+ public:
+ explicit Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn);
+ uint16_t get_lid() { return lid; }
+ ibv_gid get_gid() { return gid; }
+ int get_port_num() { return port_num; }
+ ibv_port_attr* get_port_attr() { return &port_attr; }
+ int get_gid_idx() { return gid_idx; }
+};
+
+
+class Device {
+ ibv_device *device;
+ const char* name;
+ uint8_t port_cnt = 0;
+ public:
+ explicit Device(CephContext *c, ibv_device* ib_dev);
+ explicit Device(CephContext *c, ibv_context *ib_ctx);
+ ~Device() {
+ if (active_port) {
+ delete active_port;
+ ceph_assert(ibv_close_device(ctxt) == 0);
+ }
+ }
+ const char* get_name() { return name;}
+ uint16_t get_lid() { return active_port->get_lid(); }
+ ibv_gid get_gid() { return active_port->get_gid(); }
+ int get_gid_idx() { return active_port->get_gid_idx(); }
+ void binding_port(CephContext *c, int port_num);
+ struct ibv_context *ctxt;
+ ibv_device_attr device_attr;
+ Port* active_port;
+};
+
+
+class DeviceList {
+ struct ibv_device ** device_list;
+ struct ibv_context ** device_context_list;
+ int num;
+ Device** devices;
+ public:
+ explicit DeviceList(CephContext *cct): device_list(nullptr), device_context_list(nullptr),
+ num(0), devices(nullptr) {
+ device_list = ibv_get_device_list(&num);
+ ceph_assert(device_list);
+ ceph_assert(num);
+ if (cct->_conf->ms_async_rdma_cm) {
+ device_context_list = rdma_get_devices(NULL);
+ ceph_assert(device_context_list);
+ }
+ devices = new Device*[num];
+
+ for (int i = 0;i < num; ++i) {
+ if (cct->_conf->ms_async_rdma_cm) {
+ devices[i] = new Device(cct, device_context_list[i]);
+ } else {
+ devices[i] = new Device(cct, device_list[i]);
+ }
+ }
+ }
+ ~DeviceList() {
+ for (int i=0; i < num; ++i) {
+ delete devices[i];
+ }
+ delete []devices;
+ ibv_free_device_list(device_list);
+ rdma_free_devices(device_context_list);
+ }
+
+ Device* get_device(const char* device_name) {
+ for (int i = 0; i < num; ++i) {
+ if (!strlen(device_name) || !strcmp(device_name, devices[i]->get_name())) {
+ return devices[i];
+ }
+ }
+ return NULL;
+ }
+};
+
+// stat counters
+enum {
+ l_msgr_rdma_dispatcher_first = 94000,
+
+ l_msgr_rdma_polling,
+ l_msgr_rdma_inflight_tx_chunks,
+ l_msgr_rdma_rx_bufs_in_use,
+ l_msgr_rdma_rx_bufs_total,
+
+ l_msgr_rdma_tx_total_wc,
+ l_msgr_rdma_tx_total_wc_errors,
+ l_msgr_rdma_tx_wc_retry_errors,
+ l_msgr_rdma_tx_wc_wr_flush_errors,
+
+ l_msgr_rdma_rx_total_wc,
+ l_msgr_rdma_rx_total_wc_errors,
+ l_msgr_rdma_rx_fin,
+
+ l_msgr_rdma_handshake_errors,
+
+ l_msgr_rdma_total_async_events,
+ l_msgr_rdma_async_last_wqe_events,
+
+ l_msgr_rdma_created_queue_pair,
+ l_msgr_rdma_active_queue_pair,
+
+ l_msgr_rdma_dispatcher_last,
+};
+
+enum {
+ l_msgr_rdma_first = 95000,
+
+ l_msgr_rdma_tx_no_mem,
+ l_msgr_rdma_tx_parital_mem,
+ l_msgr_rdma_tx_failed,
+
+ l_msgr_rdma_tx_chunks,
+ l_msgr_rdma_tx_bytes,
+ l_msgr_rdma_rx_chunks,
+ l_msgr_rdma_rx_bytes,
+ l_msgr_rdma_pending_sent_conns,
+
+ l_msgr_rdma_last,
+};
+
+class RDMADispatcher;
+
+class Infiniband {
+ public:
+ class ProtectionDomain {
+ public:
+ explicit ProtectionDomain(CephContext *cct, Device *device);
+ ~ProtectionDomain();
+
+ ibv_pd* const pd;
+ };
+
+ class QueuePair;
+ class MemoryManager {
+ public:
+ class Chunk {
+ public:
+ Chunk(ibv_mr* m, uint32_t bytes, char* buffer, uint32_t offset = 0, uint32_t bound = 0, uint32_t lkey = 0, QueuePair* qp = nullptr);
+ ~Chunk();
+
+ uint32_t get_offset();
+ uint32_t get_size() const;
+ void prepare_read(uint32_t b);
+ uint32_t get_bound();
+ uint32_t read(char* buf, uint32_t len);
+ uint32_t write(char* buf, uint32_t len);
+ bool full();
+ void reset_read_chunk();
+ void reset_write_chunk();
+ void set_qp(QueuePair *qp) { this->qp = qp; }
+ void clear_qp() { set_qp(nullptr); }
+ QueuePair* get_qp() { return qp; }
+
+ public:
+ ibv_mr* mr;
+ QueuePair *qp;
+ uint32_t lkey;
+ uint32_t bytes;
+ uint32_t offset;
+ uint32_t bound;
+ char* buffer; // TODO: remove buffer/refactor TX
+ char data[0];
+ };
+
+ class Cluster {
+ public:
+ Cluster(MemoryManager& m, uint32_t s);
+ ~Cluster();
+
+ int fill(uint32_t num);
+ void take_back(std::vector<Chunk*> &ck);
+ int get_buffers(std::vector<Chunk*> &chunks, size_t bytes);
+ Chunk *get_chunk_by_buffer(const char *c) {
+ uint32_t idx = (c - base) / buffer_size;
+ Chunk *chunk = chunk_base + idx;
+ return chunk;
+ }
+ bool is_my_buffer(const char *c) const {
+ return c >= base && c < end;
+ }
+
+ bool is_valid_chunk(const Chunk* c) const {
+ return c >= chunk_base && c < chunk_base + num_chunk;
+ }
+ MemoryManager& manager;
+ uint32_t buffer_size;
+ uint32_t num_chunk = 0;
+ ceph::mutex lock = ceph::make_mutex("cluster_lock");
+ std::vector<Chunk*> free_chunks;
+ char *base = nullptr;
+ char *end = nullptr;
+ Chunk* chunk_base = nullptr;
+ };
+
+ class MemPoolContext {
+ PerfCounters *perf_logger;
+
+ public:
+ MemoryManager *manager;
+ unsigned n_bufs_allocated;
+ // true if it is possible to alloc
+ // more memory for the pool
+ explicit MemPoolContext(MemoryManager *m) :
+ perf_logger(nullptr),
+ manager(m),
+ n_bufs_allocated(0) {}
+ bool can_alloc(unsigned nbufs);
+ void update_stats(int val);
+ void set_stat_logger(PerfCounters *logger);
+ };
+
+ class PoolAllocator {
+ struct mem_info {
+ ibv_mr *mr;
+ MemPoolContext *ctx;
+ unsigned nbufs;
+ Chunk chunks[0];
+ };
+ public:
+ typedef std::size_t size_type;
+ typedef std::ptrdiff_t difference_type;
+
+ static char * malloc(const size_type bytes);
+ static void free(char * const block);
+
+ template<typename Func>
+ static std::invoke_result_t<Func> with_context(MemPoolContext* ctx,
+ Func&& func) {
+ std::lock_guard l{get_lock()};
+ g_ctx = ctx;
+ scope_guard reset_ctx{[] { g_ctx = nullptr; }};
+ return std::move(func)();
+ }
+ private:
+ static ceph::mutex& get_lock();
+ static MemPoolContext* g_ctx;
+ };
+
+ /**
+ * modify boost pool so that it is possible to
+ * have a thread safe 'context' when allocating/freeing
+ * the memory. It is needed to allow a different pool
+ * configurations and bookkeeping per CephContext and
+ * also to be able to use same allocator to deal with
+ * RX and TX pool.
+ * TODO: use boost pool to allocate TX chunks too
+ */
+ class mem_pool : public boost::pool<PoolAllocator> {
+ private:
+ MemPoolContext *ctx;
+ void *slow_malloc();
+
+ public:
+ ceph::mutex lock = ceph::make_mutex("mem_pool_lock");
+ explicit mem_pool(MemPoolContext *ctx, const size_type nrequested_size,
+ const size_type nnext_size = 32,
+ const size_type nmax_size = 0) :
+ pool(nrequested_size, nnext_size, nmax_size),
+ ctx(ctx) { }
+
+ void *malloc() {
+ if (!store().empty())
+ return (store().malloc)();
+ // need to alloc more memory...
+ // slow path code
+ return slow_malloc();
+ }
+ };
+
+ MemoryManager(CephContext *c, Device *d, ProtectionDomain *p);
+ ~MemoryManager();
+
+ void* malloc(size_t size);
+ void free(void *ptr);
+
+ void create_tx_pool(uint32_t size, uint32_t tx_num);
+ void return_tx(std::vector<Chunk*> &chunks);
+ int get_send_buffers(std::vector<Chunk*> &c, size_t bytes);
+ bool is_tx_buffer(const char* c) { return send->is_my_buffer(c); }
+ bool is_valid_chunk(const Chunk* c) { return send->is_valid_chunk(c); }
+ Chunk *get_tx_chunk_by_buffer(const char *c) {
+ return send->get_chunk_by_buffer(c);
+ }
+ uint32_t get_tx_buffer_size() const {
+ return send->buffer_size;
+ }
+
+ Chunk *get_rx_buffer() {
+ std::lock_guard l{rxbuf_pool.lock};
+ return reinterpret_cast<Chunk *>(rxbuf_pool.malloc());
+ }
+
+ void release_rx_buffer(Chunk *chunk) {
+ std::lock_guard l{rxbuf_pool.lock};
+ chunk->clear_qp();
+ rxbuf_pool.free(chunk);
+ }
+
+ void set_rx_stat_logger(PerfCounters *logger) {
+ rxbuf_pool_ctx.set_stat_logger(logger);
+ }
+
+ CephContext *cct;
+ private:
+ // TODO: Cluster -> TxPool txbuf_pool
+ // chunk layout fix
+ //
+ Cluster* send = nullptr;// SEND
+ Device *device;
+ ProtectionDomain *pd;
+ MemPoolContext rxbuf_pool_ctx;
+ mem_pool rxbuf_pool;
+
+
+ void* huge_pages_malloc(size_t size);
+ void huge_pages_free(void *ptr);
+ };
+
+ private:
+ uint32_t tx_queue_len = 0;
+ uint32_t rx_queue_len = 0;
+ uint32_t max_sge = 0;
+ uint8_t ib_physical_port = 0;
+ MemoryManager* memory_manager = nullptr;
+ ibv_srq* srq = nullptr; // shared receive work queue
+ Device *device = NULL;
+ ProtectionDomain *pd = NULL;
+ DeviceList *device_list = nullptr;
+ CephContext *cct;
+ ceph::mutex lock = ceph::make_mutex("IB lock");
+ bool initialized = false;
+ const std::string &device_name;
+ uint8_t port_num;
+ bool support_srq = false;
+
+ public:
+ explicit Infiniband(CephContext *c);
+ ~Infiniband();
+ void init();
+ static void verify_prereq(CephContext *cct);
+
+ class CompletionChannel {
+ static const uint32_t MAX_ACK_EVENT = 5000;
+ CephContext *cct;
+ Infiniband& infiniband;
+ ibv_comp_channel *channel;
+ ibv_cq *cq;
+ uint32_t cq_events_that_need_ack;
+
+ public:
+ CompletionChannel(CephContext *c, Infiniband &ib);
+ ~CompletionChannel();
+ int init();
+ bool get_cq_event();
+ int get_fd() { return channel->fd; }
+ ibv_comp_channel* get_channel() { return channel; }
+ void bind_cq(ibv_cq *c) { cq = c; }
+ void ack_events();
+ };
+
+ // this class encapsulates the creation, use, and destruction of an RC
+ // completion queue.
+ //
+ // You need to call init and it will create a cq and associate to comp channel
+ class CompletionQueue {
+ public:
+ CompletionQueue(CephContext *c, Infiniband &ib,
+ const uint32_t qd, CompletionChannel *cc)
+ : cct(c), infiniband(ib), channel(cc), cq(NULL), queue_depth(qd) {}
+ ~CompletionQueue();
+ int init();
+ int poll_cq(int num_entries, ibv_wc *ret_wc_array);
+
+ ibv_cq* get_cq() const { return cq; }
+ int rearm_notify(bool solicited_only=true);
+ CompletionChannel* get_cc() const { return channel; }
+ private:
+ CephContext *cct;
+ Infiniband& infiniband; // Infiniband to which this QP belongs
+ CompletionChannel *channel;
+ ibv_cq *cq;
+ uint32_t queue_depth;
+ };
+
+ // this class encapsulates the creation, use, and destruction of an RC
+ // queue pair.
+ //
+ // you need call init and it will create a qp and bring it to the INIT state.
+ // after obtaining the lid, qpn, and psn of a remote queue pair, one
+ // must call plumb() to bring the queue pair to the RTS state.
+ class QueuePair {
+ public:
+ typedef MemoryManager::Chunk Chunk;
+ QueuePair(CephContext *c, Infiniband& infiniband, ibv_qp_type type,
+ int ib_physical_port, ibv_srq *srq,
+ Infiniband::CompletionQueue* txcq,
+ Infiniband::CompletionQueue* rxcq,
+ uint32_t tx_queue_len, uint32_t max_recv_wr, struct rdma_cm_id *cid, uint32_t q_key = 0);
+ ~QueuePair();
+
+ int modify_qp_to_error();
+ int modify_qp_to_rts();
+ int modify_qp_to_rtr();
+ int modify_qp_to_init();
+ int init();
+
+ /**
+ * Get the initial packet sequence number for this QueuePair.
+ * This is randomly generated on creation. It should not be confused
+ * with the remote side's PSN, which is set in #plumb().
+ */
+ uint32_t get_initial_psn() const { return initial_psn; };
+ /**
+ * Get the local queue pair number for this QueuePair.
+ * QPNs are analogous to UDP/TCP port numbers.
+ */
+ uint32_t get_local_qp_number() const { return qp->qp_num; };
+ /**
+ * Get the remote queue pair number for this QueuePair, as set in #plumb().
+ * QPNs are analogous to UDP/TCP port numbers.
+ */
+ int get_remote_qp_number(uint32_t *rqp) const;
+ /**
+ * Get the remote infiniband address for this QueuePair, as set in #plumb().
+ * LIDs are "local IDs" in infiniband terminology. They are short, locally
+ * routable addresses.
+ */
+ int get_remote_lid(uint16_t *lid) const;
+ /**
+ * Get the state of a QueuePair.
+ */
+ int get_state() const;
+ /*
+ * send/receive connection management meta data
+ */
+ int send_cm_meta(CephContext *cct, int socket_fd);
+ int recv_cm_meta(CephContext *cct, int socket_fd);
+ void wire_gid_to_gid(const char *wgid, ib_cm_meta_t* cm_meta_data);
+ void gid_to_wire_gid(const ib_cm_meta_t& cm_meta_data, char wgid[]);
+ ibv_qp* get_qp() const { return qp; }
+ Infiniband::CompletionQueue* get_tx_cq() const { return txcq; }
+ Infiniband::CompletionQueue* get_rx_cq() const { return rxcq; }
+ int to_dead();
+ bool is_dead() const { return dead; }
+ ib_cm_meta_t& get_peer_cm_meta() { return peer_cm_meta; }
+ ib_cm_meta_t& get_local_cm_meta() { return local_cm_meta; }
+ void add_rq_wr(Chunk* chunk)
+ {
+ if (srq) return;
+
+ std::lock_guard l{lock};
+ recv_queue.push_back(chunk);
+ }
+
+ void remove_rq_wr(Chunk* chunk) {
+ if (srq) return;
+
+ std::lock_guard l{lock};
+ auto it = std::find(recv_queue.begin(), recv_queue.end(), chunk);
+ ceph_assert(it != recv_queue.end());
+ recv_queue.erase(it);
+ }
+ ibv_srq* get_srq() const { return srq; }
+
+ private:
+ CephContext *cct;
+ Infiniband& infiniband; // Infiniband to which this QP belongs
+ ibv_qp_type type; // QP type (IBV_QPT_RC, etc.)
+ ibv_context* ctxt; // device context of the HCA to use
+ int ib_physical_port;
+ ibv_pd* pd; // protection domain
+ ibv_srq* srq; // shared receive queue
+ ibv_qp* qp; // infiniband verbs QP handle
+ struct rdma_cm_id *cm_id;
+ ib_cm_meta_t peer_cm_meta;
+ ib_cm_meta_t local_cm_meta;
+ Infiniband::CompletionQueue* txcq;
+ Infiniband::CompletionQueue* rxcq;
+ uint32_t initial_psn; // initial packet sequence number
+ uint32_t max_send_wr;
+ uint32_t max_recv_wr;
+ uint32_t q_key;
+ bool dead;
+ std::vector<Chunk*> recv_queue;
+ ceph::mutex lock = ceph::make_mutex("queue_pair_lock");
+ };
+
+ public:
+ typedef MemoryManager::Cluster Cluster;
+ typedef MemoryManager::Chunk Chunk;
+ QueuePair* create_queue_pair(CephContext *c, CompletionQueue*, CompletionQueue*,
+ ibv_qp_type type, struct rdma_cm_id *cm_id);
+ ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
+ // post rx buffers to srq, return number of buffers actually posted
+ int post_chunks_to_rq(int num, QueuePair *qp = nullptr);
+ void post_chunk_to_pool(Chunk* chunk) {
+ QueuePair *qp = chunk->get_qp();
+ if (qp != nullptr) {
+ qp->remove_rq_wr(chunk);
+ }
+ get_memory_manager()->release_rx_buffer(chunk);
+ }
+ int get_tx_buffers(std::vector<Chunk*> &c, size_t bytes);
+ CompletionChannel *create_comp_channel(CephContext *c);
+ CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL);
+ uint8_t get_ib_physical_port() { return ib_physical_port; }
+ uint16_t get_lid() { return device->get_lid(); }
+ ibv_gid get_gid() { return device->get_gid(); }
+ MemoryManager* get_memory_manager() { return memory_manager; }
+ Device* get_device() { return device; }
+ int get_async_fd() { return device->ctxt->async_fd; }
+ bool is_tx_buffer(const char* c) { return memory_manager->is_tx_buffer(c);}
+ Chunk *get_tx_chunk_by_buffer(const char *c) { return memory_manager->get_tx_chunk_by_buffer(c); }
+ static const char* wc_status_to_string(int status);
+ static const char* qp_state_string(int status);
+ uint32_t get_rx_queue_len() const { return rx_queue_len; }
+};
+
+#endif
diff --git a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc
new file mode 100644
index 000000000..6c79dc54f
--- /dev/null
+++ b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc
@@ -0,0 +1,607 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#include "RDMAStack.h"
+
+class C_handle_connection_established : public EventCallback {
+ RDMAConnectedSocketImpl *csi;
+ bool active = true;
+ public:
+ C_handle_connection_established(RDMAConnectedSocketImpl *w) : csi(w) {}
+ void do_request(uint64_t fd) final {
+ if (active)
+ csi->handle_connection_established();
+ }
+ void close() {
+ active = false;
+ }
+};
+
+class C_handle_connection_read : public EventCallback {
+ RDMAConnectedSocketImpl *csi;
+ bool active = true;
+ public:
+ explicit C_handle_connection_read(RDMAConnectedSocketImpl *w): csi(w) {}
+ void do_request(uint64_t fd) final {
+ if (active)
+ csi->handle_connection();
+ }
+ void close() {
+ active = false;
+ }
+};
+
+#define dout_subsys ceph_subsys_ms
+#undef dout_prefix
+#define dout_prefix *_dout << " RDMAConnectedSocketImpl "
+
+RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, std::shared_ptr<Infiniband> &ib,
+ std::shared_ptr<RDMADispatcher>& rdma_dispatcher,
+ RDMAWorker *w)
+ : cct(cct), connected(0), error(0), ib(ib),
+ dispatcher(rdma_dispatcher), worker(w),
+ is_server(false), read_handler(new C_handle_connection_read(this)),
+ established_handler(new C_handle_connection_established(this)),
+ active(false), pending(false)
+{
+ if (!cct->_conf->ms_async_rdma_cm) {
+ qp = ib->create_queue_pair(cct, dispatcher->get_tx_cq(), dispatcher->get_rx_cq(), IBV_QPT_RC, NULL);
+ if (!qp) {
+ lderr(cct) << __func__ << " queue pair create failed" << dendl;
+ return;
+ }
+ local_qpn = qp->get_local_qp_number();
+ notify_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
+ dispatcher->register_qp(qp, this);
+ dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
+ dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
+ }
+}
+
+RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
+{
+ ldout(cct, 20) << __func__ << " destruct." << dendl;
+ cleanup();
+ worker->remove_pending_conn(this);
+ dispatcher->schedule_qp_destroy(local_qpn);
+
+ for (unsigned i=0; i < wc.size(); ++i) {
+ dispatcher->post_chunk_to_pool(reinterpret_cast<Chunk*>(wc[i].wr_id));
+ }
+ for (unsigned i=0; i < buffers.size(); ++i) {
+ dispatcher->post_chunk_to_pool(buffers[i]);
+ }
+
+ std::lock_guard l{lock};
+ if (notify_fd >= 0)
+ ::close(notify_fd);
+ if (tcp_fd >= 0)
+ ::close(tcp_fd);
+ error = ECONNRESET;
+}
+
+void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v)
+{
+ std::lock_guard l{lock};
+ if (wc.empty())
+ wc = std::move(v);
+ else
+ wc.insert(wc.end(), v.begin(), v.end());
+ notify();
+}
+
+void RDMAConnectedSocketImpl::get_wc(std::vector<ibv_wc> &w)
+{
+ std::lock_guard l{lock};
+ if (wc.empty())
+ return ;
+ w.swap(wc);
+}
+
+int RDMAConnectedSocketImpl::activate()
+{
+ qp->get_local_cm_meta().peer_qpn = qp->get_peer_cm_meta().local_qpn;
+ if (qp->modify_qp_to_rtr() != 0)
+ return -1;
+
+ if (qp->modify_qp_to_rts() != 0)
+ return -1;
+
+ if (!is_server) {
+ connected = 1; //indicate successfully
+ ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << local_qpn << dendl;
+ submit(false);
+ }
+ active = true;
+ peer_qpn = qp->get_local_cm_meta().peer_qpn;
+
+ return 0;
+}
+
+int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) {
+ ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:"
+ << opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl;
+ ceph::NetHandler net(cct);
+
+ // we construct a socket to transport ib sync message
+ // but we shouldn't block in tcp connecting
+ if (opts.nonblock) {
+ tcp_fd = net.nonblock_connect(peer_addr, opts.connect_bind_addr);
+ } else {
+ tcp_fd = net.connect(peer_addr, opts.connect_bind_addr);
+ }
+
+ if (tcp_fd < 0) {
+ return -errno;
+ }
+
+ int r = net.set_socket_options(tcp_fd, opts.nodelay, opts.rcbuf_size);
+ if (r < 0) {
+ ::close(tcp_fd);
+ tcp_fd = -1;
+ return -errno;
+ }
+
+ ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;
+ net.set_priority(tcp_fd, opts.priority, peer_addr.get_family());
+ r = 0;
+ if (opts.nonblock) {
+ worker->center.create_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE , established_handler);
+ } else {
+ r = handle_connection_established(false);
+ }
+ return r;
+}
+
+int RDMAConnectedSocketImpl::handle_connection_established(bool need_set_fault) {
+ ldout(cct, 20) << __func__ << " start " << dendl;
+ // delete read event
+ worker->center.delete_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE);
+ if (1 == connected) {
+ ldout(cct, 1) << __func__ << " warnning: logic failed " << dendl;
+ if (need_set_fault) {
+ fault();
+ }
+ return -1;
+ }
+ // send handshake msg to server
+ qp->get_local_cm_meta().peer_qpn = 0;
+ int r = qp->send_cm_meta(cct, tcp_fd);
+ if (r < 0) {
+ ldout(cct, 1) << __func__ << " send handshake msg failed." << r << dendl;
+ if (need_set_fault) {
+ fault();
+ }
+ return r;
+ }
+ worker->center.create_file_event(tcp_fd, EVENT_READABLE, read_handler);
+ ldout(cct, 20) << __func__ << " finish " << dendl;
+ return 0;
+}
+
+void RDMAConnectedSocketImpl::handle_connection() {
+ ldout(cct, 20) << __func__ << " QP: " << local_qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl;
+ int r = qp->recv_cm_meta(cct, tcp_fd);
+ if (r <= 0) {
+ if (r != -EAGAIN) {
+ dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
+ ldout(cct, 1) << __func__ << " recv handshake msg failed." << dendl;
+ fault();
+ }
+ return;
+ }
+
+ if (1 == connected) {
+ ldout(cct, 1) << __func__ << " warnning: logic failed: read len: " << r << dendl;
+ fault();
+ return;
+ }
+
+ if (!is_server) {// first time: cm meta sync + ack from server
+ if (!connected) {
+ r = activate();
+ ceph_assert(!r);
+ }
+ notify();
+ r = qp->send_cm_meta(cct, tcp_fd);
+ if (r < 0) {
+ ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
+ dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
+ fault();
+ }
+ } else {
+ if (qp->get_peer_cm_meta().peer_qpn == 0) {// first time: cm meta sync from client
+ if (active) {
+ ldout(cct, 10) << __func__ << " server is already active." << dendl;
+ return ;
+ }
+ r = activate();
+ ceph_assert(!r);
+ r = qp->send_cm_meta(cct, tcp_fd);
+ if (r < 0) {
+ ldout(cct, 1) << __func__ << " server ack failed." << dendl;
+ dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
+ fault();
+ return ;
+ }
+ } else { // second time: cm meta ack from client
+ connected = 1;
+ ldout(cct, 10) << __func__ << " handshake of rdma is done. server connected: " << connected << dendl;
+ //cleanup();
+ submit(false);
+ notify();
+ }
+ }
+}
+
+ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
+{
+ eventfd_t event_val = 0;
+ int r = eventfd_read(notify_fd, &event_val);
+ ldout(cct, 20) << __func__ << " notify_fd : " << event_val << " in " << local_qpn
+ << " r = " << r << dendl;
+
+ if (!active) {
+ ldout(cct, 1) << __func__ << " when ib not active. len: " << len << dendl;
+ return -EAGAIN;
+ }
+
+ if (0 == connected) {
+ ldout(cct, 1) << __func__ << " when ib not connected. len: " << len <<dendl;
+ return -EAGAIN;
+ }
+ ssize_t read = 0;
+ read = read_buffers(buf,len);
+
+ if (is_server && connected == 0) {
+ ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << local_qpn << " peer QP: " << peer_qpn << dendl;
+ connected = 1; //if so, we don't need the last handshake
+ cleanup();
+ submit(false);
+ }
+
+ if (!buffers.empty()) {
+ notify();
+ }
+
+ if (read == 0 && error)
+ return -error;
+ return read == 0 ? -EAGAIN : read;
+}
+
+void RDMAConnectedSocketImpl::buffer_prefetch(void)
+{
+ std::vector<ibv_wc> cqe;
+ get_wc(cqe);
+ if(cqe.empty())
+ return;
+
+ for(size_t i = 0; i < cqe.size(); ++i) {
+ ibv_wc* response = &cqe[i];
+ ceph_assert(response->status == IBV_WC_SUCCESS);
+ Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
+ chunk->prepare_read(response->byte_len);
+
+ if (chunk->get_size() == 0) {
+ chunk->reset_read_chunk();
+ dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin);
+ if (connected) {
+ error = ECONNRESET;
+ ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;
+ }
+ dispatcher->post_chunk_to_pool(chunk);
+ continue;
+ } else {
+ buffers.push_back(chunk);
+ ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;
+ }
+ }
+ worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size());
+}
+
+ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
+{
+ size_t read_size = 0, tmp = 0;
+ buffer_prefetch();
+ auto pchunk = buffers.begin();
+ while (pchunk != buffers.end()) {
+ tmp = (*pchunk)->read(buf + read_size, len - read_size);
+ read_size += tmp;
+ ldout(cct, 25) << __func__ << " read chunk " << *pchunk << " bytes length" << tmp << " offset: "
+ << (*pchunk)->get_offset() << " ,bound: " << (*pchunk)->get_bound() << dendl;
+
+ if ((*pchunk)->get_size() == 0) {
+ (*pchunk)->reset_read_chunk();
+ dispatcher->post_chunk_to_pool(*pchunk);
+ update_post_backlog();
+ ldout(cct, 25) << __func__ << " read over one chunk " << dendl;
+ pchunk++;
+ }
+
+ if (read_size == len) {
+ break;
+ }
+ }
+
+ buffers.erase(buffers.begin(), pchunk);
+ ldout(cct, 25) << __func__ << " got " << read_size << " bytes, buffers size: " << buffers.size() << dendl;
+ worker->perf_logger->inc(l_msgr_rdma_rx_bytes, read_size);
+ return read_size;
+}
+
+ssize_t RDMAConnectedSocketImpl::send(ceph::buffer::list &bl, bool more)
+{
+ if (error) {
+ if (!active)
+ return -EPIPE;
+ return -error;
+ }
+ size_t bytes = bl.length();
+ if (!bytes)
+ return 0;
+ {
+ std::lock_guard l{lock};
+ pending_bl.claim_append(bl);
+ if (!connected) {
+ ldout(cct, 20) << __func__ << " fake send to upper, QP: " << local_qpn << dendl;
+ return bytes;
+ }
+ }
+ ldout(cct, 20) << __func__ << " QP: " << local_qpn << dendl;
+ ssize_t r = submit(more);
+ if (r < 0 && r != -EAGAIN)
+ return r;
+ return bytes;
+}
+
+size_t RDMAConnectedSocketImpl::tx_copy_chunk(std::vector<Chunk*> &tx_buffers,
+ size_t req_copy_len, decltype(std::cbegin(pending_bl.buffers()))& start,
+ const decltype(std::cbegin(pending_bl.buffers()))& end)
+{
+ ceph_assert(start != end);
+ auto chunk_idx = tx_buffers.size();
+ if (0 == worker->get_reged_mem(this, tx_buffers, req_copy_len)) {
+ ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl;
+ worker->perf_logger->inc(l_msgr_rdma_tx_no_mem);
+ return 0;
+ }
+
+ Chunk *current_chunk = tx_buffers[chunk_idx];
+ size_t write_len = 0;
+ while (start != end) {
+ const uintptr_t addr = reinterpret_cast<uintptr_t>(start->c_str());
+
+ size_t slice_write_len = 0;
+ while (slice_write_len < start->length()) {
+ size_t real_len = current_chunk->write((char*)addr + slice_write_len, start->length() - slice_write_len);
+
+ slice_write_len += real_len;
+ write_len += real_len;
+ req_copy_len -= real_len;
+
+ if (current_chunk->full()) {
+ if (++chunk_idx == tx_buffers.size())
+ return write_len;
+ current_chunk = tx_buffers[chunk_idx];
+ }
+ }
+
+ ++start;
+ }
+ ceph_assert(req_copy_len == 0);
+ return write_len;
+}
+
+ssize_t RDMAConnectedSocketImpl::submit(bool more)
+{
+ if (error)
+ return -error;
+ std::lock_guard l{lock};
+ size_t bytes = pending_bl.length();
+ ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: "
+ << pending_bl.get_num_buffers() << dendl;
+ if (!bytes)
+ return 0;
+
+ std::vector<Chunk*> tx_buffers;
+ auto it = std::cbegin(pending_bl.buffers());
+ auto copy_start = it;
+ size_t total_copied = 0, wait_copy_len = 0;
+ while (it != pending_bl.buffers().end()) {
+ if (ib->is_tx_buffer(it->raw_c_str())) {
+ if (wait_copy_len) {
+ size_t copied = tx_copy_chunk(tx_buffers, wait_copy_len, copy_start, it);
+ total_copied += copied;
+ if (copied < wait_copy_len)
+ goto sending;
+ wait_copy_len = 0;
+ }
+ ceph_assert(copy_start == it);
+ tx_buffers.push_back(ib->get_tx_chunk_by_buffer(it->raw_c_str()));
+ total_copied += it->length();
+ ++copy_start;
+ } else {
+ wait_copy_len += it->length();
+ }
+ ++it;
+ }
+ if (wait_copy_len)
+ total_copied += tx_copy_chunk(tx_buffers, wait_copy_len, copy_start, it);
+
+ sending:
+ if (total_copied == 0)
+ return -EAGAIN;
+ ceph_assert(total_copied <= pending_bl.length());
+ ceph::buffer::list swapped;
+ if (total_copied < pending_bl.length()) {
+ worker->perf_logger->inc(l_msgr_rdma_tx_parital_mem);
+ pending_bl.splice(total_copied, pending_bl.length() - total_copied, &swapped);
+ pending_bl.swap(swapped);
+ } else {
+ pending_bl.clear();
+ }
+
+ ldout(cct, 20) << __func__ << " left bytes: " << pending_bl.length() << " in buffers "
+ << pending_bl.get_num_buffers() << " tx chunks " << tx_buffers.size() << dendl;
+
+ int r = post_work_request(tx_buffers);
+ if (r < 0)
+ return r;
+
+ ldout(cct, 20) << __func__ << " finished sending " << total_copied << " bytes." << dendl;
+ return pending_bl.length() ? -EAGAIN : 0;
+}
+
+int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
+{
+ ldout(cct, 20) << __func__ << " QP: " << local_qpn << " " << tx_buffers[0] << dendl;
+ auto current_buffer = tx_buffers.begin();
+ ibv_sge isge[tx_buffers.size()];
+ uint32_t current_sge = 0;
+ ibv_send_wr iswr[tx_buffers.size()];
+ uint32_t current_swr = 0;
+ ibv_send_wr* pre_wr = NULL;
+ uint32_t num = 0;
+
+ // FIPS zeroization audit 20191115: these memsets are not security related.
+ memset(iswr, 0, sizeof(iswr));
+ memset(isge, 0, sizeof(isge));
+
+ while (current_buffer != tx_buffers.end()) {
+ isge[current_sge].addr = reinterpret_cast<uint64_t>((*current_buffer)->buffer);
+ isge[current_sge].length = (*current_buffer)->get_offset();
+ isge[current_sge].lkey = (*current_buffer)->mr->lkey;
+ ldout(cct, 25) << __func__ << " sending buffer: " << *current_buffer << " length: " << isge[current_sge].length << dendl;
+
+ iswr[current_swr].wr_id = reinterpret_cast<uint64_t>(*current_buffer);
+ iswr[current_swr].next = NULL;
+ iswr[current_swr].sg_list = &isge[current_sge];
+ iswr[current_swr].num_sge = 1;
+ iswr[current_swr].opcode = IBV_WR_SEND;
+ iswr[current_swr].send_flags = IBV_SEND_SIGNALED;
+
+ num++;
+ worker->perf_logger->inc(l_msgr_rdma_tx_bytes, isge[current_sge].length);
+ if (pre_wr)
+ pre_wr->next = &iswr[current_swr];
+ pre_wr = &iswr[current_swr];
+ ++current_sge;
+ ++current_swr;
+ ++current_buffer;
+ }
+
+ ibv_send_wr *bad_tx_work_request = nullptr;
+ if (ibv_post_send(qp->get_qp(), iswr, &bad_tx_work_request)) {
+ ldout(cct, 1) << __func__ << " failed to send data"
+ << " (most probably should be peer not ready): "
+ << cpp_strerror(errno) << dendl;
+ worker->perf_logger->inc(l_msgr_rdma_tx_failed);
+ return -errno;
+ }
+ worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size());
+ ldout(cct, 20) << __func__ << " qp state is " << get_qp_state() << dendl;
+ return 0;
+}
+
+void RDMAConnectedSocketImpl::fin() {
+ ibv_send_wr wr;
+ // FIPS zeroization audit 20191115: this memset is not security related.
+ memset(&wr, 0, sizeof(wr));
+
+ wr.wr_id = reinterpret_cast<uint64_t>(qp);
+ wr.num_sge = 0;
+ wr.opcode = IBV_WR_SEND;
+ wr.send_flags = IBV_SEND_SIGNALED;
+ ibv_send_wr* bad_tx_work_request = nullptr;
+ if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) {
+ ldout(cct, 1) << __func__ << " failed to send message="
+ << " ibv_post_send failed(most probably should be peer not ready): "
+ << cpp_strerror(errno) << dendl;
+ worker->perf_logger->inc(l_msgr_rdma_tx_failed);
+ return ;
+ }
+}
+
+void RDMAConnectedSocketImpl::cleanup() {
+ if (read_handler && tcp_fd >= 0) {
+ (static_cast<C_handle_connection_read*>(read_handler))->close();
+ worker->center.submit_to(worker->center.get_id(), [this]() {
+ worker->center.delete_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE);
+ }, false);
+ delete read_handler;
+ read_handler = nullptr;
+ }
+ if (established_handler) {
+ (static_cast<C_handle_connection_established*>(established_handler))->close();
+ delete established_handler;
+ established_handler = nullptr;
+ }
+}
+
+void RDMAConnectedSocketImpl::notify()
+{
+ eventfd_t event_val = 1;
+ int r = eventfd_write(notify_fd, event_val);
+ ceph_assert(r == 0);
+}
+
+void RDMAConnectedSocketImpl::shutdown()
+{
+ if (!error)
+ fin();
+ error = ECONNRESET;
+ active = false;
+}
+
+void RDMAConnectedSocketImpl::close()
+{
+ if (!error)
+ fin();
+ error = ECONNRESET;
+ active = false;
+}
+
+void RDMAConnectedSocketImpl::set_priority(int sd, int prio, int domain) {
+ ceph::NetHandler net(cct);
+ net.set_priority(sd, prio, domain);
+}
+
+void RDMAConnectedSocketImpl::fault()
+{
+ ldout(cct, 1) << __func__ << " tcp fd " << tcp_fd << dendl;
+ error = ECONNRESET;
+ connected = 1;
+ notify();
+}
+
+void RDMAConnectedSocketImpl::set_accept_fd(int sd)
+{
+ tcp_fd = sd;
+ is_server = true;
+ worker->center.submit_to(worker->center.get_id(), [this]() {
+ worker->center.create_file_event(tcp_fd, EVENT_READABLE, read_handler);
+ }, true);
+}
+
+void RDMAConnectedSocketImpl::post_chunks_to_rq(int num)
+{
+ post_backlog += num - ib->post_chunks_to_rq(num, qp);
+}
+
+void RDMAConnectedSocketImpl::update_post_backlog()
+{
+ if (post_backlog)
+ post_backlog -= post_backlog - dispatcher->post_chunks_to_rq(post_backlog, qp);
+}
diff --git a/src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc
new file mode 100644
index 000000000..606dbd281
--- /dev/null
+++ b/src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc
@@ -0,0 +1,183 @@
+#include "RDMAStack.h"
+
+#define dout_subsys ceph_subsys_ms
+#undef dout_prefix
+#define dout_prefix *_dout << " RDMAIWARPConnectedSocketImpl "
+
+#define TIMEOUT_MS 3000
+#define RETRY_COUNT 7
+
+RDMAIWARPConnectedSocketImpl::RDMAIWARPConnectedSocketImpl(CephContext *cct, std::shared_ptr<Infiniband>& ib,
+ std::shared_ptr<RDMADispatcher>& rdma_dispatcher,
+ RDMAWorker *w, RDMACMInfo *info)
+ : RDMAConnectedSocketImpl(cct, ib, rdma_dispatcher, w), cm_con_handler(new C_handle_cm_connection(this))
+{
+ status = IDLE;
+ notify_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
+ if (info) {
+ is_server = true;
+ cm_id = info->cm_id;
+ cm_channel = info->cm_channel;
+ status = RDMA_ID_CREATED;
+ peer_qpn = info->qp_num;
+ if (alloc_resource()) {
+ close_notify();
+ return;
+ }
+ worker->center.submit_to(worker->center.get_id(), [this]() {
+ worker->center.create_file_event(cm_channel->fd, EVENT_READABLE, cm_con_handler);
+ status = CHANNEL_FD_CREATED;
+ }, false);
+ status = RESOURCE_ALLOCATED;
+ qp->get_local_cm_meta().peer_qpn = peer_qpn;
+ qp->get_peer_cm_meta().local_qpn = peer_qpn;
+ } else {
+ is_server = false;
+ cm_channel = rdma_create_event_channel();
+ rdma_create_id(cm_channel, &cm_id, NULL, RDMA_PS_TCP);
+ status = RDMA_ID_CREATED;
+ ldout(cct, 20) << __func__ << " successfully created cm id: " << cm_id << dendl;
+ }
+}
+
+RDMAIWARPConnectedSocketImpl::~RDMAIWARPConnectedSocketImpl() {
+ ldout(cct, 20) << __func__ << " destruct." << dendl;
+ std::unique_lock l(close_mtx);
+ close_condition.wait(l, [&] { return closed; });
+ if (status >= RDMA_ID_CREATED) {
+ rdma_destroy_id(cm_id);
+ rdma_destroy_event_channel(cm_channel);
+ }
+}
+
+int RDMAIWARPConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) {
+ worker->center.create_file_event(cm_channel->fd, EVENT_READABLE, cm_con_handler);
+ status = CHANNEL_FD_CREATED;
+ if (rdma_resolve_addr(cm_id, NULL, const_cast<struct sockaddr*>(peer_addr.get_sockaddr()), TIMEOUT_MS)) {
+ lderr(cct) << __func__ << " failed to resolve addr" << dendl;
+ return -1;
+ }
+ return 0;
+}
+
+void RDMAIWARPConnectedSocketImpl::close() {
+ error = ECONNRESET;
+ active = false;
+ if (status >= CONNECTED) {
+ rdma_disconnect(cm_id);
+ }
+ close_notify();
+}
+
+void RDMAIWARPConnectedSocketImpl::shutdown() {
+ error = ECONNRESET;
+ active = false;
+}
+
+void RDMAIWARPConnectedSocketImpl::handle_cm_connection() {
+ struct rdma_cm_event *event;
+ rdma_get_cm_event(cm_channel, &event);
+ ldout(cct, 20) << __func__ << " event name: " << rdma_event_str(event->event)
+ << " (cm id: " << cm_id << ")" << dendl;
+ struct rdma_conn_param cm_params;
+ switch (event->event) {
+ case RDMA_CM_EVENT_ADDR_RESOLVED:
+ status = ADDR_RESOLVED;
+ if (rdma_resolve_route(cm_id, TIMEOUT_MS)) {
+ lderr(cct) << __func__ << " failed to resolve rdma addr" << dendl;
+ notify();
+ }
+ break;
+
+ case RDMA_CM_EVENT_ROUTE_RESOLVED:
+ status = ROUTE_RESOLVED;
+ if (alloc_resource()) {
+ lderr(cct) << __func__ << " failed to alloc resource while resolving the route" << dendl;
+ connected = -ECONNREFUSED;
+ notify();
+ break;
+ }
+
+ // FIPS zeroization audit 20191115: this memset is not security related.
+ memset(&cm_params, 0, sizeof(cm_params));
+ cm_params.retry_count = RETRY_COUNT;
+ cm_params.qp_num = local_qpn;
+ if (rdma_connect(cm_id, &cm_params)) {
+ lderr(cct) << __func__ << " failed to connect remote rdma port" << dendl;
+ connected = -ECONNREFUSED;
+ notify();
+ }
+ break;
+
+ case RDMA_CM_EVENT_ESTABLISHED:
+ ldout(cct, 20) << __func__ << " qp_num=" << cm_id->qp->qp_num << dendl;
+ status = CONNECTED;
+ if (!is_server) {
+ peer_qpn = event->param.conn.qp_num;
+ activate();
+ qp->get_local_cm_meta().peer_qpn = peer_qpn;
+ qp->get_peer_cm_meta().local_qpn = peer_qpn;
+ notify();
+ }
+ break;
+
+ case RDMA_CM_EVENT_ADDR_ERROR:
+ case RDMA_CM_EVENT_ROUTE_ERROR:
+ case RDMA_CM_EVENT_CONNECT_ERROR:
+ case RDMA_CM_EVENT_UNREACHABLE:
+ case RDMA_CM_EVENT_REJECTED:
+ lderr(cct) << __func__ << " rdma connection rejected" << dendl;
+ connected = -ECONNREFUSED;
+ notify();
+ break;
+
+ case RDMA_CM_EVENT_DISCONNECTED:
+ status = DISCONNECTED;
+ close_notify();
+ if (!error) {
+ error = ECONNRESET;
+ notify();
+ }
+ break;
+
+ case RDMA_CM_EVENT_DEVICE_REMOVAL:
+ break;
+
+ default:
+ ceph_abort_msg("unhandled event");
+ break;
+ }
+ rdma_ack_cm_event(event);
+}
+
+void RDMAIWARPConnectedSocketImpl::activate() {
+ ldout(cct, 30) << __func__ << dendl;
+ active = true;
+ connected = 1;
+}
+
+int RDMAIWARPConnectedSocketImpl::alloc_resource() {
+ ldout(cct, 30) << __func__ << dendl;
+ qp = ib->create_queue_pair(cct, dispatcher->get_tx_cq(),
+ dispatcher->get_rx_cq(), IBV_QPT_RC, cm_id);
+ if (!qp) {
+ return -1;
+ }
+ local_qpn = qp->get_local_qp_number();
+ dispatcher->register_qp(qp, this);
+ dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
+ dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
+ return 0;
+}
+
+void RDMAIWARPConnectedSocketImpl::close_notify() {
+ ldout(cct, 30) << __func__ << dendl;
+ if (status >= CHANNEL_FD_CREATED) {
+ worker->center.delete_file_event(cm_channel->fd, EVENT_READABLE);
+ }
+ std::unique_lock l(close_mtx);
+ if (!closed) {
+ closed = true;
+ close_condition.notify_all();
+ }
+}
diff --git a/src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc b/src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc
new file mode 100644
index 000000000..0500b4420
--- /dev/null
+++ b/src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc
@@ -0,0 +1,119 @@
+#include <poll.h>
+
+#include "msg/async/net_handler.h"
+#include "RDMAStack.h"
+
+#define dout_subsys ceph_subsys_ms
+#undef dout_prefix
+#define dout_prefix *_dout << " RDMAIWARPServerSocketImpl "
+
+RDMAIWARPServerSocketImpl::RDMAIWARPServerSocketImpl(
+ CephContext *cct, std::shared_ptr<Infiniband>& ib,
+ std::shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w,
+ entity_addr_t& a, unsigned addr_slot)
+ : RDMAServerSocketImpl(cct, ib, rdma_dispatcher, w, a, addr_slot)
+{
+}
+
+int RDMAIWARPServerSocketImpl::listen(entity_addr_t &sa,
+ const SocketOptions &opt)
+{
+ ldout(cct, 20) << __func__ << " bind to rdma point" << dendl;
+ cm_channel = rdma_create_event_channel();
+ rdma_create_id(cm_channel, &cm_id, NULL, RDMA_PS_TCP);
+ ldout(cct, 20) << __func__ << " successfully created cm id: " << cm_id << dendl;
+ int rc = rdma_bind_addr(cm_id, const_cast<struct sockaddr*>(sa.get_sockaddr()));
+ if (rc < 0) {
+ rc = -errno;
+ ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr()
+ << " on port " << sa.get_port() << ": " << cpp_strerror(errno) << dendl;
+ goto err;
+ }
+ rc = rdma_listen(cm_id, 128);
+ if (rc < 0) {
+ rc = -errno;
+ ldout(cct, 10) << __func__ << " unable to listen to " << sa.get_sockaddr()
+ << " on port " << sa.get_port() << ": " << cpp_strerror(errno) << dendl;
+ goto err;
+ }
+ server_setup_socket = cm_channel->fd;
+ rc = net.set_nonblock(server_setup_socket);
+ if (rc < 0) {
+ goto err;
+ }
+ ldout(cct, 20) << __func__ << " fd of cm_channel is " << server_setup_socket << dendl;
+ return 0;
+
+err:
+ server_setup_socket = -1;
+ rdma_destroy_id(cm_id);
+ rdma_destroy_event_channel(cm_channel);
+ return rc;
+}
+
+int RDMAIWARPServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt,
+ entity_addr_t *out, Worker *w)
+{
+ ldout(cct, 15) << __func__ << dendl;
+
+ ceph_assert(sock);
+ struct pollfd pfd = {
+ .fd = cm_channel->fd,
+ .events = POLLIN,
+ .revents = 0,
+ };
+ int ret = poll(&pfd, 1, 0);
+ ceph_assert(ret >= 0);
+ if (!ret)
+ return -EAGAIN;
+
+ struct rdma_cm_event *cm_event;
+ rdma_get_cm_event(cm_channel, &cm_event);
+ ldout(cct, 20) << __func__ << " event name: " << rdma_event_str(cm_event->event) << dendl;
+
+ struct rdma_cm_id *event_cm_id = cm_event->id;
+ struct rdma_event_channel *event_channel = rdma_create_event_channel();
+
+ if (net.set_nonblock(event_channel->fd) < 0) {
+ lderr(cct) << __func__ << " failed to switch event channel to non-block, close event channel " << dendl;
+ rdma_destroy_event_channel(event_channel);
+ rdma_ack_cm_event(cm_event);
+ return -errno;
+ }
+
+ rdma_migrate_id(event_cm_id, event_channel);
+
+ struct rdma_conn_param *remote_conn_param = &cm_event->param.conn;
+ struct rdma_conn_param local_conn_param;
+
+ RDMACMInfo info(event_cm_id, event_channel, remote_conn_param->qp_num);
+ RDMAIWARPConnectedSocketImpl* server =
+ new RDMAIWARPConnectedSocketImpl(cct, ib, dispatcher, dynamic_cast<RDMAWorker*>(w), &info);
+
+ // FIPS zeroization audit 20191115: this memset is not security related.
+ memset(&local_conn_param, 0, sizeof(local_conn_param));
+ local_conn_param.qp_num = server->get_local_qpn();
+
+ if (rdma_accept(event_cm_id, &local_conn_param)) {
+ return -EAGAIN;
+ }
+ server->activate();
+ ldout(cct, 20) << __func__ << " accepted a new QP" << dendl;
+
+ rdma_ack_cm_event(cm_event);
+
+ std::unique_ptr<RDMAConnectedSocketImpl> csi(server);
+ *sock = ConnectedSocket(std::move(csi));
+ struct sockaddr *addr = &event_cm_id->route.addr.dst_addr;
+ out->set_sockaddr(addr);
+
+ return 0;
+}
+
+void RDMAIWARPServerSocketImpl::abort_accept()
+{
+ if (server_setup_socket >= 0) {
+ rdma_destroy_id(cm_id);
+ rdma_destroy_event_channel(cm_channel);
+ }
+}
diff --git a/src/msg/async/rdma/RDMAServerSocketImpl.cc b/src/msg/async/rdma/RDMAServerSocketImpl.cc
new file mode 100644
index 000000000..665faa931
--- /dev/null
+++ b/src/msg/async/rdma/RDMAServerSocketImpl.cc
@@ -0,0 +1,135 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "msg/async/net_handler.h"
+#include "RDMAStack.h"
+
+#include "include/compat.h"
+#include "include/sock_compat.h"
+
+#define dout_subsys ceph_subsys_ms
+#undef dout_prefix
+#define dout_prefix *_dout << " RDMAServerSocketImpl "
+
+RDMAServerSocketImpl::RDMAServerSocketImpl(
+ CephContext *cct, std::shared_ptr<Infiniband>& ib,
+ std::shared_ptr<RDMADispatcher>& rdma_dispatcher,
+ RDMAWorker *w, entity_addr_t& a, unsigned slot)
+ : ServerSocketImpl(a.get_type(), slot),
+ cct(cct), net(cct), server_setup_socket(-1), ib(ib),
+ dispatcher(rdma_dispatcher), worker(w), sa(a)
+{
+}
+
+int RDMAServerSocketImpl::listen(entity_addr_t &sa, const SocketOptions &opt)
+{
+ int rc = 0;
+ server_setup_socket = net.create_socket(sa.get_family(), true);
+ if (server_setup_socket < 0) {
+ rc = -errno;
+ lderr(cct) << __func__ << " failed to create server socket: "
+ << cpp_strerror(errno) << dendl;
+ return rc;
+ }
+
+ rc = net.set_nonblock(server_setup_socket);
+ if (rc < 0) {
+ goto err;
+ }
+
+ rc = net.set_socket_options(server_setup_socket, opt.nodelay, opt.rcbuf_size);
+ if (rc < 0) {
+ goto err;
+ }
+
+ rc = ::bind(server_setup_socket, sa.get_sockaddr(), sa.get_sockaddr_len());
+ if (rc < 0) {
+ rc = -errno;
+ ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr()
+ << " on port " << sa.get_port() << ": " << cpp_strerror(errno) << dendl;
+ goto err;
+ }
+
+ rc = ::listen(server_setup_socket, cct->_conf->ms_tcp_listen_backlog);
+ if (rc < 0) {
+ rc = -errno;
+ lderr(cct) << __func__ << " unable to listen on " << sa << ": " << cpp_strerror(errno) << dendl;
+ goto err;
+ }
+
+ ldout(cct, 20) << __func__ << " bind to " << sa.get_sockaddr() << " on port " << sa.get_port() << dendl;
+ return 0;
+
+err:
+ ::close(server_setup_socket);
+ server_setup_socket = -1;
+ return rc;
+}
+
+int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w)
+{
+ ldout(cct, 15) << __func__ << dendl;
+
+ ceph_assert(sock);
+
+ sockaddr_storage ss;
+ socklen_t slen = sizeof(ss);
+ int sd = accept_cloexec(server_setup_socket, (sockaddr*)&ss, &slen);
+ if (sd < 0) {
+ return -errno;
+ }
+
+ int r = net.set_nonblock(sd);
+ if (r < 0) {
+ ::close(sd);
+ return -errno;
+ }
+
+ r = net.set_socket_options(sd, opt.nodelay, opt.rcbuf_size);
+ if (r < 0) {
+ ::close(sd);
+ return -errno;
+ }
+
+ ceph_assert(NULL != out); //out should not be NULL in accept connection
+
+ out->set_type(addr_type);
+ out->set_sockaddr((sockaddr*)&ss);
+ net.set_priority(sd, opt.priority, out->get_family());
+
+ RDMAConnectedSocketImpl* server;
+ //Worker* w = dispatcher->get_stack()->get_worker();
+ server = new RDMAConnectedSocketImpl(cct, ib, dispatcher, dynamic_cast<RDMAWorker*>(w));
+ if (!server->get_qp()) {
+ lderr(cct) << __func__ << " server->qp is null" << dendl;
+ // cann't use delete server here, destructor will fail.
+ server->cleanup();
+ ::close(sd);
+ return -1;
+ }
+ server->set_accept_fd(sd);
+ ldout(cct, 20) << __func__ << " accepted a new QP, tcp_fd: " << sd << dendl;
+ std::unique_ptr<RDMAConnectedSocketImpl> csi(server);
+ *sock = ConnectedSocket(std::move(csi));
+
+ return 0;
+}
+
+void RDMAServerSocketImpl::abort_accept()
+{
+ if (server_setup_socket >= 0)
+ ::close(server_setup_socket);
+}
diff --git a/src/msg/async/rdma/RDMAStack.cc b/src/msg/async/rdma/RDMAStack.cc
new file mode 100644
index 000000000..12db599d6
--- /dev/null
+++ b/src/msg/async/rdma/RDMAStack.cc
@@ -0,0 +1,812 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <poll.h>
+#include <errno.h>
+#include <sys/time.h>
+#include <sys/resource.h>
+
+#include "include/str_list.h"
+#include "include/compat.h"
+#include "common/Cycles.h"
+#include "common/deleter.h"
+#include "RDMAStack.h"
+
+#define dout_subsys ceph_subsys_ms
+#undef dout_prefix
+#define dout_prefix *_dout << "RDMAStack "
+
+RDMADispatcher::~RDMADispatcher()
+{
+ ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl;
+ polling_stop();
+
+ ceph_assert(qp_conns.empty());
+ ceph_assert(num_qp_conn == 0);
+ ceph_assert(dead_queue_pairs.empty());
+}
+
+RDMADispatcher::RDMADispatcher(CephContext* c, std::shared_ptr<Infiniband>& ib)
+ : cct(c), ib(ib)
+{
+ PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
+
+ plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether dispatcher thread is polling");
+ plb.add_u64_counter(l_msgr_rdma_inflight_tx_chunks, "inflight_tx_chunks", "The number of inflight tx chunks");
+ plb.add_u64_counter(l_msgr_rdma_rx_bufs_in_use, "rx_bufs_in_use", "The number of rx buffers that are holding data and being processed");
+ plb.add_u64_counter(l_msgr_rdma_rx_bufs_total, "rx_bufs_total", "The total number of rx buffers");
+
+ plb.add_u64_counter(l_msgr_rdma_tx_total_wc, "tx_total_wc", "The number of tx work comletions");
+ plb.add_u64_counter(l_msgr_rdma_tx_total_wc_errors, "tx_total_wc_errors", "The number of tx errors");
+ plb.add_u64_counter(l_msgr_rdma_tx_wc_retry_errors, "tx_retry_errors", "The number of tx retry errors");
+ plb.add_u64_counter(l_msgr_rdma_tx_wc_wr_flush_errors, "tx_wr_flush_errors", "The number of tx work request flush errors");
+
+ plb.add_u64_counter(l_msgr_rdma_rx_total_wc, "rx_total_wc", "The number of total rx work completion");
+ plb.add_u64_counter(l_msgr_rdma_rx_total_wc_errors, "rx_total_wc_errors", "The number of total rx error work completion");
+ plb.add_u64_counter(l_msgr_rdma_rx_fin, "rx_fin", "The number of rx finish work request");
+
+ plb.add_u64_counter(l_msgr_rdma_total_async_events, "total_async_events", "The number of async events");
+ plb.add_u64_counter(l_msgr_rdma_async_last_wqe_events, "async_last_wqe_events", "The number of last wqe events");
+
+ plb.add_u64_counter(l_msgr_rdma_handshake_errors, "handshake_errors", "The number of handshake errors");
+
+
+ plb.add_u64_counter(l_msgr_rdma_created_queue_pair, "created_queue_pair", "Active queue pair number");
+ plb.add_u64_counter(l_msgr_rdma_active_queue_pair, "active_queue_pair", "Created queue pair number");
+
+ perf_logger = plb.create_perf_counters();
+ cct->get_perfcounters_collection()->add(perf_logger);
+ Cycles::init();
+}
+
+void RDMADispatcher::polling_start()
+{
+ // take lock because listen/connect can happen from different worker threads
+ std::lock_guard l{lock};
+
+ if (t.joinable())
+ return; // dispatcher thread already running
+
+ ib->get_memory_manager()->set_rx_stat_logger(perf_logger);
+
+ tx_cc = ib->create_comp_channel(cct);
+ ceph_assert(tx_cc);
+ rx_cc = ib->create_comp_channel(cct);
+ ceph_assert(rx_cc);
+ tx_cq = ib->create_comp_queue(cct, tx_cc);
+ ceph_assert(tx_cq);
+ rx_cq = ib->create_comp_queue(cct, rx_cc);
+ ceph_assert(rx_cq);
+
+ t = std::thread(&RDMADispatcher::polling, this);
+ ceph_pthread_setname(t.native_handle(), "rdma-polling");
+}
+
+void RDMADispatcher::polling_stop()
+{
+ {
+ std::lock_guard l{lock};
+ done = true;
+ }
+
+ if (!t.joinable())
+ return;
+
+ t.join();
+
+ tx_cc->ack_events();
+ rx_cc->ack_events();
+ delete tx_cq;
+ delete rx_cq;
+ delete tx_cc;
+ delete rx_cc;
+}
+
+void RDMADispatcher::handle_async_event()
+{
+ ldout(cct, 30) << __func__ << dendl;
+ while (1) {
+ ibv_async_event async_event;
+ if (ibv_get_async_event(ib->get_device()->ctxt, &async_event)) {
+ if (errno != EAGAIN)
+ lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
+ << " " << cpp_strerror(errno) << ")" << dendl;
+ return;
+ }
+ perf_logger->inc(l_msgr_rdma_total_async_events);
+ ldout(cct, 1) << __func__ << "Event : " << ibv_event_type_str(async_event.event_type) << dendl;
+
+ switch (async_event.event_type) {
+ /***********************CQ events********************/
+ case IBV_EVENT_CQ_ERR:
+ lderr(cct) << __func__ << " Fatal Error, effect all QP bound with same CQ, "
+ << " CQ Overflow, dev = " << ib->get_device()->ctxt
+ << " Need destroy and recreate resource " << dendl;
+ break;
+ /***********************QP events********************/
+ case IBV_EVENT_QP_FATAL:
+ {
+ /* Error occurred on a QP and it transitioned to error state */
+ ibv_qp* ib_qp = async_event.element.qp;
+ uint32_t qpn = ib_qp->qp_num;
+ QueuePair* qp = get_qp(qpn);
+ lderr(cct) << __func__ << " Fatal Error, event associate qp number: " << qpn
+ << " Queue Pair status: " << Infiniband::qp_state_string(qp->get_state())
+ << " Event : " << ibv_event_type_str(async_event.event_type) << dendl;
+ }
+ break;
+ case IBV_EVENT_QP_LAST_WQE_REACHED:
+ {
+ /*
+ * 1. The QP bound with SRQ is in IBV_QPS_ERR state & no more WQE on the RQ of the QP
+ * Reason: QP is force switched into Error before posting Beacon WR.
+ * The QP's WRs will be flushed into CQ with IBV_WC_WR_FLUSH_ERR status
+ * For SRQ, only WRs on the QP which is switched into Error status will be flushed.
+ * Handle: Only confirm that qp enter into dead queue pairs
+ * 2. The CQE with error was generated for the last WQE
+ * Handle: output error log
+ */
+ perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
+ ibv_qp* ib_qp = async_event.element.qp;
+ uint32_t qpn = ib_qp->qp_num;
+ std::lock_guard l{lock};
+ RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
+ QueuePair* qp = get_qp_lockless(qpn);
+
+ if (qp && !qp->is_dead()) {
+ lderr(cct) << __func__ << " QP not dead, event associate qp number: " << qpn
+ << " Queue Pair status: " << Infiniband::qp_state_string(qp->get_state())
+ << " Event : " << ibv_event_type_str(async_event.event_type) << dendl;
+ }
+ if (!conn) {
+ ldout(cct, 20) << __func__ << " Connection's QP maybe entered into dead status. "
+ << " qp number: " << qpn << dendl;
+ } else {
+ conn->fault();
+ if (qp) {
+ if (!cct->_conf->ms_async_rdma_cm) {
+ enqueue_dead_qp_lockless(qpn);
+ }
+ }
+ }
+ }
+ break;
+ case IBV_EVENT_QP_REQ_ERR:
+ /* Invalid Request Local Work Queue Error */
+ [[fallthrough]];
+ case IBV_EVENT_QP_ACCESS_ERR:
+ /* Local access violation error */
+ [[fallthrough]];
+ case IBV_EVENT_COMM_EST:
+ /* Communication was established on a QP */
+ [[fallthrough]];
+ case IBV_EVENT_SQ_DRAINED:
+ /* Send Queue was drained of outstanding messages in progress */
+ [[fallthrough]];
+ case IBV_EVENT_PATH_MIG:
+ /* A connection has migrated to the alternate path */
+ [[fallthrough]];
+ case IBV_EVENT_PATH_MIG_ERR:
+ /* A connection failed to migrate to the alternate path */
+ break;
+ /***********************SRQ events*******************/
+ case IBV_EVENT_SRQ_ERR:
+ /* Error occurred on an SRQ */
+ [[fallthrough]];
+ case IBV_EVENT_SRQ_LIMIT_REACHED:
+ /* SRQ limit was reached */
+ break;
+ /***********************Port events******************/
+ case IBV_EVENT_PORT_ACTIVE:
+ /* Link became active on a port */
+ [[fallthrough]];
+ case IBV_EVENT_PORT_ERR:
+ /* Link became unavailable on a port */
+ [[fallthrough]];
+ case IBV_EVENT_LID_CHANGE:
+ /* LID was changed on a port */
+ [[fallthrough]];
+ case IBV_EVENT_PKEY_CHANGE:
+ /* P_Key table was changed on a port */
+ [[fallthrough]];
+ case IBV_EVENT_SM_CHANGE:
+ /* SM was changed on a port */
+ [[fallthrough]];
+ case IBV_EVENT_CLIENT_REREGISTER:
+ /* SM sent a CLIENT_REREGISTER request to a port */
+ [[fallthrough]];
+ case IBV_EVENT_GID_CHANGE:
+ /* GID table was changed on a port */
+ break;
+
+ /***********************CA events******************/
+ //CA events:
+ case IBV_EVENT_DEVICE_FATAL:
+ /* CA is in FATAL state */
+ lderr(cct) << __func__ << " ibv_get_async_event: dev = " << ib->get_device()->ctxt
+ << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
+ break;
+ default:
+ lderr(cct) << __func__ << " ibv_get_async_event: dev = " << ib->get_device()->ctxt
+ << " unknown event: " << async_event.event_type << dendl;
+ break;
+ }
+ ibv_ack_async_event(&async_event);
+ }
+}
+
+void RDMADispatcher::post_chunk_to_pool(Chunk* chunk)
+{
+ std::lock_guard l{lock};
+ ib->post_chunk_to_pool(chunk);
+ perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
+}
+
+int RDMADispatcher::post_chunks_to_rq(int num, QueuePair *qp)
+{
+ std::lock_guard l{lock};
+ return ib->post_chunks_to_rq(num, qp);
+}
+
+void RDMADispatcher::polling()
+{
+ static int MAX_COMPLETIONS = 32;
+ ibv_wc wc[MAX_COMPLETIONS];
+
+ std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
+ std::vector<ibv_wc> tx_cqe;
+ ldout(cct, 20) << __func__ << " going to poll tx cq: " << tx_cq << " rx cq: " << rx_cq << dendl;
+ uint64_t last_inactive = Cycles::rdtsc();
+ bool rearmed = false;
+ int r = 0;
+
+ while (true) {
+ int tx_ret = tx_cq->poll_cq(MAX_COMPLETIONS, wc);
+ if (tx_ret > 0) {
+ ldout(cct, 20) << __func__ << " tx completion queue got " << tx_ret
+ << " responses."<< dendl;
+ handle_tx_event(wc, tx_ret);
+ }
+
+ int rx_ret = rx_cq->poll_cq(MAX_COMPLETIONS, wc);
+ if (rx_ret > 0) {
+ ldout(cct, 20) << __func__ << " rx completion queue got " << rx_ret
+ << " responses."<< dendl;
+ handle_rx_event(wc, rx_ret);
+ }
+
+ if (!tx_ret && !rx_ret) {
+ perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight);
+ //
+ // Clean up dead QPs when rx/tx CQs are in idle. The thing is that
+ // we can destroy QPs even earlier, just when beacon has been received,
+ // but we have two CQs (rx & tx), thus beacon WC can be poped from tx
+ // CQ before other WCs are fully consumed from rx CQ. For safety, we
+ // wait for beacon and then "no-events" from CQs.
+ //
+ // Calling size() on vector without locks is totally fine, since we
+ // use it as a hint (accuracy is not important here)
+ //
+ if (!dead_queue_pairs.empty()) {
+ decltype(dead_queue_pairs) dead_qps;
+ {
+ std::lock_guard l{lock};
+ dead_queue_pairs.swap(dead_qps);
+ }
+
+ for (auto& qp: dead_qps) {
+ perf_logger->dec(l_msgr_rdma_active_queue_pair);
+ ldout(cct, 10) << __func__ << " finally delete qp = " << qp << dendl;
+ delete qp;
+ }
+ }
+
+ if (!num_qp_conn && done && dead_queue_pairs.empty())
+ break;
+
+ uint64_t now = Cycles::rdtsc();
+ if (Cycles::to_microseconds(now - last_inactive) > cct->_conf->ms_async_rdma_polling_us) {
+ handle_async_event();
+ if (!rearmed) {
+ // Clean up cq events after rearm notify ensure no new incoming event
+ // arrived between polling and rearm
+ tx_cq->rearm_notify();
+ rx_cq->rearm_notify();
+ rearmed = true;
+ continue;
+ }
+
+ struct pollfd channel_poll[2];
+ channel_poll[0].fd = tx_cc->get_fd();
+ channel_poll[0].events = POLLIN;
+ channel_poll[0].revents = 0;
+ channel_poll[1].fd = rx_cc->get_fd();
+ channel_poll[1].events = POLLIN;
+ channel_poll[1].revents = 0;
+ r = 0;
+ perf_logger->set(l_msgr_rdma_polling, 0);
+ while (!done && r == 0) {
+ r = TEMP_FAILURE_RETRY(poll(channel_poll, 2, 100));
+ if (r < 0) {
+ r = -errno;
+ lderr(cct) << __func__ << " poll failed " << r << dendl;
+ ceph_abort();
+ }
+ }
+ if (r > 0 && tx_cc->get_cq_event())
+ ldout(cct, 20) << __func__ << " got tx cq event." << dendl;
+ if (r > 0 && rx_cc->get_cq_event())
+ ldout(cct, 20) << __func__ << " got rx cq event." << dendl;
+ last_inactive = Cycles::rdtsc();
+ perf_logger->set(l_msgr_rdma_polling, 1);
+ rearmed = false;
+ }
+ }
+ }
+}
+
+void RDMADispatcher::notify_pending_workers() {
+ if (num_pending_workers) {
+ RDMAWorker *w = nullptr;
+ {
+ std::lock_guard l{w_lock};
+ if (!pending_workers.empty()) {
+ w = pending_workers.front();
+ pending_workers.pop_front();
+ --num_pending_workers;
+ }
+ }
+ if (w)
+ w->notify_worker();
+ }
+}
+
+void RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi)
+{
+ std::lock_guard l{lock};
+ ceph_assert(!qp_conns.count(qp->get_local_qp_number()));
+ qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi);
+ ++num_qp_conn;
+}
+
+RDMAConnectedSocketImpl* RDMADispatcher::get_conn_lockless(uint32_t qp)
+{
+ auto it = qp_conns.find(qp);
+ if (it == qp_conns.end())
+ return nullptr;
+ if (it->second.first->is_dead())
+ return nullptr;
+ return it->second.second;
+}
+
+Infiniband::QueuePair* RDMADispatcher::get_qp_lockless(uint32_t qp)
+{
+ // Try to find the QP in qp_conns firstly.
+ auto it = qp_conns.find(qp);
+ if (it != qp_conns.end())
+ return it->second.first;
+
+ // Try again in dead_queue_pairs.
+ for (auto &i: dead_queue_pairs)
+ if (i->get_local_qp_number() == qp)
+ return i;
+
+ return nullptr;
+}
+
+Infiniband::QueuePair* RDMADispatcher::get_qp(uint32_t qp)
+{
+ std::lock_guard l{lock};
+ return get_qp_lockless(qp);
+}
+
+void RDMADispatcher::enqueue_dead_qp_lockless(uint32_t qpn)
+{
+ auto it = qp_conns.find(qpn);
+ if (it == qp_conns.end()) {
+ lderr(cct) << __func__ << " QP [" << qpn << "] is not registered." << dendl;
+ return ;
+ }
+ QueuePair *qp = it->second.first;
+ dead_queue_pairs.push_back(qp);
+ qp_conns.erase(it);
+ --num_qp_conn;
+}
+
+void RDMADispatcher::enqueue_dead_qp(uint32_t qpn)
+{
+ std::lock_guard l{lock};
+ enqueue_dead_qp_lockless(qpn);
+}
+
+void RDMADispatcher::schedule_qp_destroy(uint32_t qpn)
+{
+ std::lock_guard l{lock};
+ auto it = qp_conns.find(qpn);
+ if (it == qp_conns.end()) {
+ lderr(cct) << __func__ << " QP [" << qpn << "] is not registered." << dendl;
+ return;
+ }
+ QueuePair *qp = it->second.first;
+ if (qp->to_dead()) {
+ //
+ // Failed to switch to dead. This is abnormal, but we can't
+ // do anything, so just destroy QP.
+ //
+ dead_queue_pairs.push_back(qp);
+ qp_conns.erase(it);
+ --num_qp_conn;
+ } else {
+ //
+ // Successfully switched to dead, thus keep entry in the map.
+ // But only zero out socked pointer in order to return null from
+ // get_conn_lockless();
+ it->second.second = nullptr;
+ }
+}
+
+void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
+{
+ std::vector<Chunk*> tx_chunks;
+
+ for (int i = 0; i < n; ++i) {
+ ibv_wc* response = &cqe[i];
+
+ // If it's beacon WR, enqueue the QP to be destroyed later
+ if (response->wr_id == BEACON_WRID) {
+ enqueue_dead_qp(response->qp_num);
+ continue;
+ }
+
+ ldout(cct, 20) << __func__ << " QP number: " << response->qp_num << " len: " << response->byte_len
+ << " status: " << ib->wc_status_to_string(response->status) << dendl;
+
+ if (response->status != IBV_WC_SUCCESS) {
+ switch(response->status) {
+ case IBV_WC_RETRY_EXC_ERR:
+ {
+ perf_logger->inc(l_msgr_rdma_tx_wc_retry_errors);
+
+ ldout(cct, 1) << __func__ << " Responder ACK timeout, possible disconnect, or Remote QP in bad state "
+ << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
+ << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
+ << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
+
+ std::lock_guard l{lock};
+ RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
+ if (conn) {
+ ldout(cct, 1) << __func__ << " SQ WR return error, remote Queue Pair, qp number: "
+ << conn->get_peer_qpn() << dendl;
+ }
+ }
+ break;
+ case IBV_WC_WR_FLUSH_ERR:
+ {
+ perf_logger->inc(l_msgr_rdma_tx_wc_wr_flush_errors);
+
+ std::lock_guard l{lock};
+ QueuePair *qp = get_qp_lockless(response->qp_num);
+ if (qp) {
+ ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
+ }
+ if (qp && qp->is_dead()) {
+ ldout(cct, 20) << __func__ << " outstanding SQ WR is flushed into CQ since QueuePair is dead " << dendl;
+ } else {
+ lderr(cct) << __func__ << " Invalid/Unsupported request to consume outstanding SQ WR,"
+ << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
+ << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
+ << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
+
+ RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
+ if (conn) {
+ ldout(cct, 1) << __func__ << " SQ WR return error, remote Queue Pair, qp number: "
+ << conn->get_peer_qpn() << dendl;
+ }
+ }
+ }
+ break;
+
+ default:
+ {
+ lderr(cct) << __func__ << " SQ WR return error,"
+ << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
+ << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
+ << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
+
+ std::lock_guard l{lock};
+ RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
+ if (conn && conn->is_connected()) {
+ ldout(cct, 20) << __func__ << " SQ WR return error Queue Pair error state is : " << conn->get_qp_state()
+ << " remote Queue Pair, qp number: " << conn->get_peer_qpn() << dendl;
+ conn->fault();
+ } else {
+ ldout(cct, 1) << __func__ << " Disconnected, qp_num = " << response->qp_num << " discard event" << dendl;
+ }
+ }
+ break;
+ }
+ }
+
+ auto chunk = reinterpret_cast<Chunk *>(response->wr_id);
+ //TX completion may come either from
+ // 1) regular send message, WCE wr_id points to chunk
+ // 2) 'fin' message, wr_id points to the QP
+ if (ib->get_memory_manager()->is_valid_chunk(chunk)) {
+ tx_chunks.push_back(chunk);
+ } else if (reinterpret_cast<QueuePair*>(response->wr_id)->get_local_qp_number() == response->qp_num ) {
+ ldout(cct, 1) << __func__ << " sending of the disconnect msg completed" << dendl;
+ } else {
+ ldout(cct, 1) << __func__ << " not tx buffer, chunk " << chunk << dendl;
+ ceph_abort();
+ }
+ }
+
+ perf_logger->inc(l_msgr_rdma_tx_total_wc, n);
+ post_tx_buffer(tx_chunks);
+}
+
+/**
+ * Add the given Chunks to the given free queue.
+ *
+ * \param[in] chunks
+ * The Chunks to enqueue.
+ * \return
+ * 0 if success or -1 for failure
+ */
+void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks)
+{
+ if (chunks.empty())
+ return ;
+
+ inflight -= chunks.size();
+ ib->get_memory_manager()->return_tx(chunks);
+ ldout(cct, 30) << __func__ << " release " << chunks.size()
+ << " chunks, inflight " << inflight << dendl;
+ notify_pending_workers();
+}
+
+void RDMADispatcher::handle_rx_event(ibv_wc *cqe, int rx_number)
+{
+ perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_number);
+ perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_number);
+
+ std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
+ std::lock_guard l{lock};//make sure connected socket alive when pass wc
+
+ for (int i = 0; i < rx_number; ++i) {
+ ibv_wc* response = &cqe[i];
+ Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
+ RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
+ QueuePair *qp = get_qp_lockless(response->qp_num);
+
+ switch (response->status) {
+ case IBV_WC_SUCCESS:
+ ceph_assert(response->opcode == IBV_WC_RECV);
+ if (!conn) {
+ ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk 0x"
+ << std::hex << chunk << " will be back." << std::dec << dendl;
+ ib->post_chunk_to_pool(chunk);
+ perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
+ } else {
+ conn->post_chunks_to_rq(1);
+ polled[conn].push_back(*response);
+
+ if (qp != nullptr && !qp->get_srq()) {
+ qp->remove_rq_wr(chunk);
+ chunk->clear_qp();
+ }
+ }
+ break;
+
+ case IBV_WC_WR_FLUSH_ERR:
+ perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
+
+ if (qp) {
+ ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
+ }
+ if (qp && qp->is_dead()) {
+ ldout(cct, 20) << __func__ << " outstanding RQ WR is flushed into CQ since QueuePair is dead " << dendl;
+ } else {
+ ldout(cct, 1) << __func__ << " RQ WR return error,"
+ << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
+ << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
+ << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
+ if (conn) {
+ ldout(cct, 1) << __func__ << " RQ WR return error, remote Queue Pair, qp number: "
+ << conn->get_peer_qpn() << dendl;
+ }
+ }
+
+ ib->post_chunk_to_pool(chunk);
+ perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
+ break;
+
+ default:
+ perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
+
+ ldout(cct, 1) << __func__ << " RQ WR return error,"
+ << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
+ << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
+ << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
+ if (conn && conn->is_connected())
+ conn->fault();
+
+ ib->post_chunk_to_pool(chunk);
+ perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
+ break;
+ }
+ }
+
+ for (auto &i : polled)
+ i.first->pass_wc(std::move(i.second));
+ polled.clear();
+}
+
+RDMAWorker::RDMAWorker(CephContext *c, unsigned worker_id)
+ : Worker(c, worker_id),
+ tx_handler(new C_handle_cq_tx(this))
+{
+ // initialize perf_logger
+ char name[128];
+ sprintf(name, "AsyncMessenger::RDMAWorker-%u", id);
+ PerfCountersBuilder plb(cct, name, l_msgr_rdma_first, l_msgr_rdma_last);
+
+ plb.add_u64_counter(l_msgr_rdma_tx_no_mem, "tx_no_mem", "The count of no tx buffer");
+ plb.add_u64_counter(l_msgr_rdma_tx_parital_mem, "tx_parital_mem", "The count of parital tx buffer");
+ plb.add_u64_counter(l_msgr_rdma_tx_failed, "tx_failed_post", "The number of tx failed posted");
+
+ plb.add_u64_counter(l_msgr_rdma_tx_chunks, "tx_chunks", "The number of tx chunks transmitted");
+ plb.add_u64_counter(l_msgr_rdma_tx_bytes, "tx_bytes", "The bytes of tx chunks transmitted", NULL, 0, unit_t(UNIT_BYTES));
+ plb.add_u64_counter(l_msgr_rdma_rx_chunks, "rx_chunks", "The number of rx chunks transmitted");
+ plb.add_u64_counter(l_msgr_rdma_rx_bytes, "rx_bytes", "The bytes of rx chunks transmitted", NULL, 0, unit_t(UNIT_BYTES));
+ plb.add_u64_counter(l_msgr_rdma_pending_sent_conns, "pending_sent_conns", "The count of pending sent conns");
+
+ perf_logger = plb.create_perf_counters();
+ cct->get_perfcounters_collection()->add(perf_logger);
+}
+
+RDMAWorker::~RDMAWorker()
+{
+ delete tx_handler;
+}
+
+void RDMAWorker::initialize()
+{
+ ceph_assert(dispatcher);
+}
+
+int RDMAWorker::listen(entity_addr_t &sa, unsigned addr_slot,
+ const SocketOptions &opt,ServerSocket *sock)
+{
+ ib->init();
+ dispatcher->polling_start();
+
+ RDMAServerSocketImpl *p;
+ if (cct->_conf->ms_async_rdma_type == "iwarp") {
+ p = new RDMAIWARPServerSocketImpl(cct, ib, dispatcher, this, sa, addr_slot);
+ } else {
+ p = new RDMAServerSocketImpl(cct, ib, dispatcher, this, sa, addr_slot);
+ }
+ int r = p->listen(sa, opt);
+ if (r < 0) {
+ delete p;
+ return r;
+ }
+
+ *sock = ServerSocket(std::unique_ptr<ServerSocketImpl>(p));
+ return 0;
+}
+
+int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
+{
+ ib->init();
+ dispatcher->polling_start();
+
+ RDMAConnectedSocketImpl* p;
+ if (cct->_conf->ms_async_rdma_type == "iwarp") {
+ p = new RDMAIWARPConnectedSocketImpl(cct, ib, dispatcher, this);
+ } else {
+ p = new RDMAConnectedSocketImpl(cct, ib, dispatcher, this);
+ }
+ int r = p->try_connect(addr, opts);
+
+ if (r < 0) {
+ ldout(cct, 1) << __func__ << " try connecting failed." << dendl;
+ delete p;
+ return r;
+ }
+ std::unique_ptr<RDMAConnectedSocketImpl> csi(p);
+ *socket = ConnectedSocket(std::move(csi));
+ return 0;
+}
+
+int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
+{
+ ceph_assert(center.in_thread());
+ int r = ib->get_tx_buffers(c, bytes);
+ size_t got = ib->get_memory_manager()->get_tx_buffer_size() * r;
+ ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered bytes, inflight " << dispatcher->inflight << dendl;
+ dispatcher->inflight += r;
+ if (got >= bytes)
+ return r;
+
+ if (o) {
+ if (!o->is_pending()) {
+ pending_sent_conns.push_back(o);
+ perf_logger->inc(l_msgr_rdma_pending_sent_conns, 1);
+ o->set_pending(1);
+ }
+ dispatcher->make_pending_worker(this);
+ }
+ return r;
+}
+
+
+void RDMAWorker::handle_pending_message()
+{
+ ldout(cct, 20) << __func__ << " pending conns " << pending_sent_conns.size() << dendl;
+ while (!pending_sent_conns.empty()) {
+ RDMAConnectedSocketImpl *o = pending_sent_conns.front();
+ pending_sent_conns.pop_front();
+ ssize_t r = o->submit(false);
+ ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl;
+ if (r < 0) {
+ if (r == -EAGAIN) {
+ pending_sent_conns.push_back(o);
+ dispatcher->make_pending_worker(this);
+ return ;
+ }
+ o->fault();
+ }
+ o->set_pending(0);
+ perf_logger->dec(l_msgr_rdma_pending_sent_conns, 1);
+ }
+ dispatcher->notify_pending_workers();
+}
+
+RDMAStack::RDMAStack(CephContext *cct)
+ : NetworkStack(cct), ib(std::make_shared<Infiniband>(cct)),
+ rdma_dispatcher(std::make_shared<RDMADispatcher>(cct, ib))
+{
+ ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
+ ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << rdma_dispatcher.get() << dendl;
+}
+
+RDMAStack::~RDMAStack()
+{
+ if (cct->_conf->ms_async_rdma_enable_hugepage) {
+ unsetenv("RDMAV_HUGEPAGES_SAFE"); //remove env variable on destruction
+ }
+}
+
+Worker* RDMAStack::create_worker(CephContext *c, unsigned worker_id)
+{
+ auto w = new RDMAWorker(c, worker_id);
+ w->set_dispatcher(rdma_dispatcher);
+ w->set_ib(ib);
+ return w;
+}
+
+void RDMAStack::spawn_worker(std::function<void ()> &&func)
+{
+ threads.emplace_back(std::move(func));
+}
+
+void RDMAStack::join_worker(unsigned i)
+{
+ ceph_assert(threads.size() > i && threads[i].joinable());
+ threads[i].join();
+}
diff --git a/src/msg/async/rdma/RDMAStack.h b/src/msg/async/rdma/RDMAStack.h
new file mode 100644
index 000000000..a36fd5fb7
--- /dev/null
+++ b/src/msg/async/rdma/RDMAStack.h
@@ -0,0 +1,344 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_MSG_RDMASTACK_H
+#define CEPH_MSG_RDMASTACK_H
+
+#include <sys/eventfd.h>
+
+#include <list>
+#include <vector>
+#include <thread>
+
+#include "common/ceph_context.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "msg/async/Stack.h"
+#include "Infiniband.h"
+
+class RDMAConnectedSocketImpl;
+class RDMAServerSocketImpl;
+class RDMAStack;
+class RDMAWorker;
+
+class RDMADispatcher {
+ typedef Infiniband::MemoryManager::Chunk Chunk;
+ typedef Infiniband::QueuePair QueuePair;
+
+ std::thread t;
+ CephContext *cct;
+ std::shared_ptr<Infiniband> ib;
+ Infiniband::CompletionQueue* tx_cq = nullptr;
+ Infiniband::CompletionQueue* rx_cq = nullptr;
+ Infiniband::CompletionChannel *tx_cc = nullptr, *rx_cc = nullptr;
+ bool done = false;
+ std::atomic<uint64_t> num_qp_conn = {0};
+ // protect `qp_conns`, `dead_queue_pairs`
+ ceph::mutex lock = ceph::make_mutex("RDMADispatcher::lock");
+ // qp_num -> InfRcConnection
+ // The main usage of `qp_conns` is looking up connection by qp_num,
+ // so the lifecycle of element in `qp_conns` is the lifecycle of qp.
+ //// make qp queue into dead state
+ /**
+ * 1. Connection call mark_down
+ * 2. Move the Queue Pair into the Error state(QueuePair::to_dead)
+ * 3. Post a beacon
+ * 4. Wait for beacon which indicates queues are drained
+ * 5. Destroy the QP by calling ibv_destroy_qp()
+ *
+ * @param qp The qp needed to dead
+ */
+ ceph::unordered_map<uint32_t, std::pair<QueuePair*, RDMAConnectedSocketImpl*> > qp_conns;
+
+ /// if a queue pair is closed when transmit buffers are active
+ /// on it, the transmit buffers never get returned via tx_cq. To
+ /// work around this problem, don't delete queue pairs immediately. Instead,
+ /// save them in this vector and delete them at a safe time, when there are
+ /// no outstanding transmit buffers to be lost.
+ std::vector<QueuePair*> dead_queue_pairs;
+
+ std::atomic<uint64_t> num_pending_workers = {0};
+ // protect pending workers
+ ceph::mutex w_lock =
+ ceph::make_mutex("RDMADispatcher::for worker pending list");
+ // fixme: lockfree
+ std::list<RDMAWorker*> pending_workers;
+ void enqueue_dead_qp_lockless(uint32_t qp);
+ void enqueue_dead_qp(uint32_t qpn);
+
+ public:
+ PerfCounters *perf_logger;
+
+ explicit RDMADispatcher(CephContext* c, std::shared_ptr<Infiniband>& ib);
+ virtual ~RDMADispatcher();
+ void handle_async_event();
+
+ void polling_start();
+ void polling_stop();
+ void polling();
+ void register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi);
+ void make_pending_worker(RDMAWorker* w) {
+ std::lock_guard l{w_lock};
+ auto it = std::find(pending_workers.begin(), pending_workers.end(), w);
+ if (it != pending_workers.end())
+ return;
+ pending_workers.push_back(w);
+ ++num_pending_workers;
+ }
+ RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp);
+ QueuePair* get_qp_lockless(uint32_t qp);
+ QueuePair* get_qp(uint32_t qp);
+ void schedule_qp_destroy(uint32_t qp);
+ Infiniband::CompletionQueue* get_tx_cq() const { return tx_cq; }
+ Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; }
+ void notify_pending_workers();
+ void handle_tx_event(ibv_wc *cqe, int n);
+ void post_tx_buffer(std::vector<Chunk*> &chunks);
+ void handle_rx_event(ibv_wc *cqe, int rx_number);
+
+ std::atomic<uint64_t> inflight = {0};
+
+ void post_chunk_to_pool(Chunk* chunk);
+ int post_chunks_to_rq(int num, QueuePair *qp = nullptr);
+};
+
+class RDMAWorker : public Worker {
+ typedef Infiniband::CompletionQueue CompletionQueue;
+ typedef Infiniband::CompletionChannel CompletionChannel;
+ typedef Infiniband::MemoryManager::Chunk Chunk;
+ typedef Infiniband::MemoryManager MemoryManager;
+ typedef std::vector<Chunk*>::iterator ChunkIter;
+ std::shared_ptr<Infiniband> ib;
+ EventCallbackRef tx_handler;
+ std::list<RDMAConnectedSocketImpl*> pending_sent_conns;
+ std::shared_ptr<RDMADispatcher> dispatcher;
+ ceph::mutex lock = ceph::make_mutex("RDMAWorker::lock");
+
+ class C_handle_cq_tx : public EventCallback {
+ RDMAWorker *worker;
+ public:
+ explicit C_handle_cq_tx(RDMAWorker *w): worker(w) {}
+ void do_request(uint64_t fd) {
+ worker->handle_pending_message();
+ }
+ };
+
+ public:
+ PerfCounters *perf_logger;
+ explicit RDMAWorker(CephContext *c, unsigned i);
+ virtual ~RDMAWorker();
+ virtual int listen(entity_addr_t &addr,
+ unsigned addr_slot,
+ const SocketOptions &opts, ServerSocket *) override;
+ virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override;
+ virtual void initialize() override;
+ int get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes);
+ void remove_pending_conn(RDMAConnectedSocketImpl *o) {
+ ceph_assert(center.in_thread());
+ pending_sent_conns.remove(o);
+ }
+ void handle_pending_message();
+ void set_dispatcher(std::shared_ptr<RDMADispatcher>& dispatcher) { this->dispatcher = dispatcher; }
+ void set_ib(std::shared_ptr<Infiniband> &ib) {this->ib = ib;}
+ void notify_worker() {
+ center.dispatch_event_external(tx_handler);
+ }
+};
+
+struct RDMACMInfo {
+ RDMACMInfo(rdma_cm_id *cid, rdma_event_channel *cm_channel_, uint32_t qp_num_)
+ : cm_id(cid), cm_channel(cm_channel_), qp_num(qp_num_) {}
+ rdma_cm_id *cm_id;
+ rdma_event_channel *cm_channel;
+ uint32_t qp_num;
+};
+
+class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
+ public:
+ typedef Infiniband::MemoryManager::Chunk Chunk;
+ typedef Infiniband::CompletionChannel CompletionChannel;
+ typedef Infiniband::CompletionQueue CompletionQueue;
+
+ protected:
+ CephContext *cct;
+ Infiniband::QueuePair *qp;
+ uint32_t peer_qpn = 0;
+ uint32_t local_qpn = 0;
+ int connected;
+ int error;
+ std::shared_ptr<Infiniband> ib;
+ std::shared_ptr<RDMADispatcher> dispatcher;
+ RDMAWorker* worker;
+ std::vector<Chunk*> buffers;
+ int notify_fd = -1;
+ ceph::buffer::list pending_bl;
+
+ ceph::mutex lock = ceph::make_mutex("RDMAConnectedSocketImpl::lock");
+ std::vector<ibv_wc> wc;
+ bool is_server;
+ EventCallbackRef read_handler;
+ EventCallbackRef established_handler;
+ int tcp_fd = -1;
+ bool active;// qp is active ?
+ bool pending;
+ int post_backlog = 0;
+
+ void notify();
+ void buffer_prefetch(void);
+ ssize_t read_buffers(char* buf, size_t len);
+ int post_work_request(std::vector<Chunk*>&);
+ size_t tx_copy_chunk(std::vector<Chunk*> &tx_buffers, size_t req_copy_len,
+ decltype(std::cbegin(pending_bl.buffers()))& start,
+ const decltype(std::cbegin(pending_bl.buffers()))& end);
+
+ public:
+ RDMAConnectedSocketImpl(CephContext *cct, std::shared_ptr<Infiniband>& ib,
+ std::shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w);
+ virtual ~RDMAConnectedSocketImpl();
+
+ void pass_wc(std::vector<ibv_wc> &&v);
+ void get_wc(std::vector<ibv_wc> &w);
+ virtual int is_connected() override { return connected; }
+
+ virtual ssize_t read(char* buf, size_t len) override;
+ virtual ssize_t send(ceph::buffer::list &bl, bool more) override;
+ virtual void shutdown() override;
+ virtual void close() override;
+ virtual int fd() const override { return notify_fd; }
+ virtual void set_priority(int sd, int prio, int domain) override;
+ void fault();
+ const char* get_qp_state() { return Infiniband::qp_state_string(qp->get_state()); }
+ uint32_t get_peer_qpn () const { return peer_qpn; }
+ uint32_t get_local_qpn () const { return local_qpn; }
+ Infiniband::QueuePair* get_qp () const { return qp; }
+ ssize_t submit(bool more);
+ int activate();
+ void fin();
+ void handle_connection();
+ int handle_connection_established(bool need_set_fault = true);
+ void cleanup();
+ void set_accept_fd(int sd);
+ virtual int try_connect(const entity_addr_t&, const SocketOptions &opt);
+ bool is_pending() {return pending;}
+ void set_pending(bool val) {pending = val;}
+ void post_chunks_to_rq(int num);
+ void update_post_backlog();
+};
+
+enum RDMA_CM_STATUS {
+ IDLE = 1,
+ RDMA_ID_CREATED,
+ CHANNEL_FD_CREATED,
+ RESOURCE_ALLOCATED,
+ ADDR_RESOLVED,
+ ROUTE_RESOLVED,
+ CONNECTED,
+ DISCONNECTED,
+ ERROR
+};
+
+class RDMAIWARPConnectedSocketImpl : public RDMAConnectedSocketImpl {
+ public:
+ RDMAIWARPConnectedSocketImpl(CephContext *cct, std::shared_ptr<Infiniband>& ib,
+ std::shared_ptr<RDMADispatcher>& rdma_dispatcher,
+ RDMAWorker *w, RDMACMInfo *info = nullptr);
+ ~RDMAIWARPConnectedSocketImpl();
+ virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) override;
+ virtual void close() override;
+ virtual void shutdown() override;
+ virtual void handle_cm_connection();
+ void activate();
+ int alloc_resource();
+ void close_notify();
+
+ private:
+ rdma_cm_id *cm_id = nullptr;
+ rdma_event_channel *cm_channel = nullptr;
+ EventCallbackRef cm_con_handler;
+ std::mutex close_mtx;
+ std::condition_variable close_condition;
+ bool closed = false;
+ RDMA_CM_STATUS status = IDLE;
+
+
+ class C_handle_cm_connection : public EventCallback {
+ RDMAIWARPConnectedSocketImpl *csi;
+ public:
+ C_handle_cm_connection(RDMAIWARPConnectedSocketImpl *w): csi(w) {}
+ void do_request(uint64_t fd) {
+ csi->handle_cm_connection();
+ }
+ };
+};
+
+class RDMAServerSocketImpl : public ServerSocketImpl {
+ protected:
+ CephContext *cct;
+ ceph::NetHandler net;
+ int server_setup_socket;
+ std::shared_ptr<Infiniband> ib;
+ std::shared_ptr<RDMADispatcher> dispatcher;
+ RDMAWorker *worker;
+ entity_addr_t sa;
+
+ public:
+ RDMAServerSocketImpl(CephContext *cct, std::shared_ptr<Infiniband>& ib,
+ std::shared_ptr<RDMADispatcher>& rdma_dispatcher,
+ RDMAWorker *w, entity_addr_t& a, unsigned slot);
+
+ virtual int listen(entity_addr_t &sa, const SocketOptions &opt);
+ virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
+ virtual void abort_accept() override;
+ virtual int fd() const override { return server_setup_socket; }
+};
+
+class RDMAIWARPServerSocketImpl : public RDMAServerSocketImpl {
+ public:
+ RDMAIWARPServerSocketImpl(
+ CephContext *cct, std::shared_ptr<Infiniband>& ib,
+ std::shared_ptr<RDMADispatcher>& rdma_dispatcher,
+ RDMAWorker* w, entity_addr_t& addr, unsigned addr_slot);
+ virtual int listen(entity_addr_t &sa, const SocketOptions &opt) override;
+ virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
+ virtual void abort_accept() override;
+ private:
+ rdma_cm_id *cm_id = nullptr;
+ rdma_event_channel *cm_channel = nullptr;
+};
+
+class RDMAStack : public NetworkStack {
+ std::vector<std::thread> threads;
+ PerfCounters *perf_counter;
+ std::shared_ptr<Infiniband> ib;
+ std::shared_ptr<RDMADispatcher> rdma_dispatcher;
+
+ std::atomic<bool> fork_finished = {false};
+
+ virtual Worker* create_worker(CephContext *c, unsigned worker_id) override;
+
+ public:
+ explicit RDMAStack(CephContext *cct);
+ virtual ~RDMAStack();
+ virtual bool nonblock_connect_need_writable_event() const override { return false; }
+
+ virtual void spawn_worker(std::function<void ()> &&func) override;
+ virtual void join_worker(unsigned i) override;
+ virtual bool is_ready() override { return fork_finished.load(); };
+ virtual void ready() override { fork_finished = true; };
+};
+
+
+#endif