diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/msg/async/rdma | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/msg/async/rdma')
-rw-r--r-- | src/msg/async/rdma/Infiniband.cc | 1321 | ||||
-rw-r--r-- | src/msg/async/rdma/Infiniband.h | 591 | ||||
-rw-r--r-- | src/msg/async/rdma/RDMAConnectedSocketImpl.cc | 607 | ||||
-rw-r--r-- | src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc | 183 | ||||
-rw-r--r-- | src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc | 119 | ||||
-rw-r--r-- | src/msg/async/rdma/RDMAServerSocketImpl.cc | 135 | ||||
-rw-r--r-- | src/msg/async/rdma/RDMAStack.cc | 812 | ||||
-rw-r--r-- | src/msg/async/rdma/RDMAStack.h | 344 |
8 files changed, 4112 insertions, 0 deletions
diff --git a/src/msg/async/rdma/Infiniband.cc b/src/msg/async/rdma/Infiniband.cc new file mode 100644 index 000000000..52323f948 --- /dev/null +++ b/src/msg/async/rdma/Infiniband.cc @@ -0,0 +1,1321 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSKY <haomai@xsky.com> + * + * Author: Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "Infiniband.h" +#include "common/errno.h" +#include "common/debug.h" +#include "RDMAStack.h" +#include <sys/time.h> +#include <sys/resource.h> + +#define dout_subsys ceph_subsys_ms +#undef dout_prefix +#define dout_prefix *_dout << "Infiniband " + +static const uint32_t MAX_SHARED_RX_SGE_COUNT = 1; +static const uint32_t MAX_INLINE_DATA = 0; +static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000"); +static const uint32_t CQ_DEPTH = 30000; + +Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt), port_num(ipn), + gid_idx(cct->_conf.get_val<int64_t>("ms_async_rdma_gid_idx")) +{ + int r = ibv_query_port(ctxt, port_num, &port_attr); + if (r == -1) { + lderr(cct) << __func__ << " query port failed " << cpp_strerror(errno) << dendl; + ceph_abort(); + } + + lid = port_attr.lid; + ceph_assert(gid_idx < port_attr.gid_tbl_len); +#ifdef HAVE_IBV_EXP + union ibv_gid cgid; + struct ibv_exp_gid_attr gid_attr; + bool malformed = false; + + ldout(cct,1) << __func__ << " using experimental verbs for gid" << dendl; + + + // search for requested GID in GIDs table + ldout(cct, 1) << __func__ << " looking for local GID " << (cct->_conf->ms_async_rdma_local_gid) + << " of type " << (cct->_conf->ms_async_rdma_roce_ver) << dendl; + r = sscanf(cct->_conf->ms_async_rdma_local_gid.c_str(), + "%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx" + ":%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx", + &cgid.raw[ 0], &cgid.raw[ 1], + &cgid.raw[ 2], &cgid.raw[ 3], + &cgid.raw[ 4], &cgid.raw[ 5], + &cgid.raw[ 6], &cgid.raw[ 7], + &cgid.raw[ 8], &cgid.raw[ 9], + &cgid.raw[10], &cgid.raw[11], + &cgid.raw[12], &cgid.raw[13], + &cgid.raw[14], &cgid.raw[15]); + + if (r != 16) { + ldout(cct, 1) << __func__ << " malformed or no GID supplied, using GID index 0" << dendl; + malformed = true; + } + + gid_attr.comp_mask = IBV_EXP_QUERY_GID_ATTR_TYPE; + + for (gid_idx = 0; gid_idx < port_attr.gid_tbl_len; gid_idx++) { + r = ibv_query_gid(ctxt, port_num, gid_idx, &gid); + if (r) { + lderr(cct) << __func__ << " query gid of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl; + ceph_abort(); + } + r = ibv_exp_query_gid_attr(ctxt, port_num, gid_idx, &gid_attr); + if (r) { + lderr(cct) << __func__ << " query gid attributes of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl; + ceph_abort(); + } + + if (malformed) break; // stay with gid_idx=0 + if ( (gid_attr.type == cct->_conf->ms_async_rdma_roce_ver) && + (memcmp(&gid, &cgid, 16) == 0) ) { + ldout(cct, 1) << __func__ << " found at index " << gid_idx << dendl; + break; + } + } + + if (gid_idx == port_attr.gid_tbl_len) { + lderr(cct) << __func__ << " Requested local GID was not found in GID table" << dendl; + ceph_abort(); + } +#else + r = ibv_query_gid(ctxt, port_num, gid_idx, &gid); + if (r) { + lderr(cct) << __func__ << " query gid failed " << cpp_strerror(errno) << dendl; + ceph_abort(); + } +#endif +} + +Device::Device(CephContext *cct, ibv_device* ib_dev): device(ib_dev), active_port(nullptr) +{ + ceph_assert(device); + ctxt = ibv_open_device(device); + ceph_assert(ctxt); + + name = ibv_get_device_name(device); + + int r = ibv_query_device(ctxt, &device_attr); + if (r) { + lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl; + ceph_abort(); + } +} + +Device::Device(CephContext *cct, struct ibv_context *ib_ctx): device(ib_ctx->device), + active_port(nullptr) +{ + ceph_assert(device); + ctxt = ib_ctx; + ceph_assert(ctxt); + + name = ibv_get_device_name(device); + + int r = ibv_query_device(ctxt, &device_attr); + if (r) { + lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl; + ceph_abort(); + } +} + +void Device::binding_port(CephContext *cct, int port_num) { + port_cnt = device_attr.phys_port_cnt; + for (uint8_t port_id = 1; port_id <= port_cnt; ++port_id) { + Port *port = new Port(cct, ctxt, port_id); + if (port_id == port_num && port->get_port_attr()->state == IBV_PORT_ACTIVE) { + active_port = port; + ldout(cct, 1) << __func__ << " found active port " << static_cast<int>(port_id) << dendl; + break; + } else { + ldout(cct, 10) << __func__ << " port " << port_id << " is not what we want. state: " + << ibv_port_state_str(port->get_port_attr()->state) << dendl; + delete port; + } + } + if (nullptr == active_port) { + lderr(cct) << __func__ << " port not found" << dendl; + ceph_assert(active_port); + } +} + + +Infiniband::QueuePair::QueuePair( + CephContext *c, Infiniband& infiniband, ibv_qp_type type, + int port, ibv_srq *srq, + Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq, + uint32_t tx_queue_len, uint32_t rx_queue_len, struct rdma_cm_id *cid, uint32_t q_key) +: cct(c), infiniband(infiniband), + type(type), + ctxt(infiniband.device->ctxt), + ib_physical_port(port), + pd(infiniband.pd->pd), + srq(srq), + qp(NULL), + cm_id(cid), peer_cm_meta{0}, local_cm_meta{0}, + txcq(txcq), + rxcq(rxcq), + initial_psn(lrand48() & PSN_MSK), + // One extra WR for beacon + max_send_wr(tx_queue_len + 1), + max_recv_wr(rx_queue_len), + q_key(q_key), + dead(false) +{ + if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) { + lderr(cct) << __func__ << " invalid queue pair type" << cpp_strerror(errno) << dendl; + ceph_abort(); + } +} + +int Infiniband::QueuePair::modify_qp_to_error(void) +{ + ibv_qp_attr qpa; + // FIPS zeroization audit 20191115: this memset is not security related. + memset(&qpa, 0, sizeof(qpa)); + qpa.qp_state = IBV_QPS_ERR; + if (ibv_modify_qp(qp, &qpa, IBV_QP_STATE)) { + lderr(cct) << __func__ << " failed to transition to ERROR state: " << cpp_strerror(errno) << dendl; + return -1; + } + ldout(cct, 20) << __func__ << " transition to ERROR state successfully." << dendl; + return 0; +} + +int Infiniband::QueuePair::modify_qp_to_rts(void) +{ + // move from RTR state RTS + ibv_qp_attr qpa; + // FIPS zeroization audit 20191115: this memset is not security related. + memset(&qpa, 0, sizeof(qpa)); + qpa.qp_state = IBV_QPS_RTS; + /* + * How long to wait before retrying if packet lost or server dead. + * Supposedly the timeout is 4.096us*2^timeout. However, the actual + * timeout appears to be 4.096us*2^(timeout+1), so the setting + * below creates a 135ms timeout. + */ + qpa.timeout = 0x12; + // How many times to retry after timeouts before giving up. + qpa.retry_cnt = 7; + /* + * How many times to retry after RNR (receiver not ready) condition + * before giving up. Occurs when the remote side has not yet posted + * a receive request. + */ + qpa.rnr_retry = 7; // 7 is infinite retry. + qpa.sq_psn = local_cm_meta.psn; + qpa.max_rd_atomic = 1; + + int attr_mask = IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC; + int r = ibv_modify_qp(qp, &qpa, attr_mask); + if (r) { + lderr(cct) << __func__ << " failed to transition to RTS state: " << cpp_strerror(errno) << dendl; + return -1; + } + ldout(cct, 20) << __func__ << " transition to RTS state successfully." << dendl; + return 0; +} + +int Infiniband::QueuePair::modify_qp_to_rtr(void) +{ + // move from INIT to RTR state + ibv_qp_attr qpa; + // FIPS zeroization audit 20191115: this memset is not security related. + memset(&qpa, 0, sizeof(qpa)); + qpa.qp_state = IBV_QPS_RTR; + qpa.path_mtu = IBV_MTU_1024; + qpa.dest_qp_num = peer_cm_meta.local_qpn; + qpa.rq_psn = peer_cm_meta.psn; + qpa.max_dest_rd_atomic = 1; + qpa.min_rnr_timer = 0x12; + qpa.ah_attr.is_global = 1; + qpa.ah_attr.grh.hop_limit = 6; + qpa.ah_attr.grh.dgid = peer_cm_meta.gid; + qpa.ah_attr.grh.sgid_index = infiniband.get_device()->get_gid_idx(); + qpa.ah_attr.grh.traffic_class = cct->_conf->ms_async_rdma_dscp; + //qpa.ah_attr.grh.flow_label = 0; + + qpa.ah_attr.dlid = peer_cm_meta.lid; + qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl; + qpa.ah_attr.src_path_bits = 0; + qpa.ah_attr.port_num = (uint8_t)(ib_physical_port); + + ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl; + + int attr_mask = IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MIN_RNR_TIMER | IBV_QP_MAX_DEST_RD_ATOMIC; + + int r = ibv_modify_qp(qp, &qpa, attr_mask); + if (r) { + lderr(cct) << __func__ << " failed to transition to RTR state: " << cpp_strerror(errno) << dendl; + return -1; + } + ldout(cct, 20) << __func__ << " transition to RTR state successfully." << dendl; + return 0; +} + +int Infiniband::QueuePair::modify_qp_to_init(void) +{ + // move from RESET to INIT state + ibv_qp_attr qpa; + // FIPS zeroization audit 20191115: this memset is not security related. + memset(&qpa, 0, sizeof(qpa)); + qpa.qp_state = IBV_QPS_INIT; + qpa.pkey_index = 0; + qpa.port_num = (uint8_t)(ib_physical_port); + qpa.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE; + qpa.qkey = q_key; + + int mask = IBV_QP_STATE | IBV_QP_PORT; + switch (type) { + case IBV_QPT_RC: + mask |= IBV_QP_ACCESS_FLAGS; + mask |= IBV_QP_PKEY_INDEX; + break; + case IBV_QPT_UD: + mask |= IBV_QP_QKEY; + mask |= IBV_QP_PKEY_INDEX; + break; + case IBV_QPT_RAW_PACKET: + break; + default: + ceph_abort(); + } + + if (ibv_modify_qp(qp, &qpa, mask)) { + lderr(cct) << __func__ << " failed to switch to INIT state Queue Pair, qp number: " << qp->qp_num + << " Error: " << cpp_strerror(errno) << dendl; + return -1; + } + ldout(cct, 20) << __func__ << " successfully switch to INIT state Queue Pair, qp number: " << qp->qp_num << dendl; + return 0; +} + +int Infiniband::QueuePair::init() +{ + ldout(cct, 20) << __func__ << " started." << dendl; + ibv_qp_init_attr qpia; + // FIPS zeroization audit 20191115: this memset is not security related. + memset(&qpia, 0, sizeof(qpia)); + qpia.send_cq = txcq->get_cq(); + qpia.recv_cq = rxcq->get_cq(); + if (srq) { + qpia.srq = srq; // use the same shared receive queue + } else { + qpia.cap.max_recv_wr = max_recv_wr; + qpia.cap.max_recv_sge = 1; + } + qpia.cap.max_send_wr = max_send_wr; // max outstanding send requests + qpia.cap.max_send_sge = 1; // max send scatter-gather elements + qpia.cap.max_inline_data = MAX_INLINE_DATA; // max bytes of immediate data on send q + qpia.qp_type = type; // RC, UC, UD, or XRC + qpia.sq_sig_all = 0; // only generate CQEs on requested WQEs + + if (!cct->_conf->ms_async_rdma_cm) { + qp = ibv_create_qp(pd, &qpia); + if (qp == NULL) { + lderr(cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl; + if (errno == ENOMEM) { + lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_queue_length, " + " ms_async_rdma_send_buffers or" + " ms_async_rdma_buffer_size" << dendl; + } + return -1; + } + if (modify_qp_to_init() != 0) { + ibv_destroy_qp(qp); + return -1; + } + } else { + ceph_assert(cm_id->verbs == pd->context); + if (rdma_create_qp(cm_id, pd, &qpia)) { + lderr(cct) << __func__ << " failed to create queue pair with rdmacm library" + << cpp_strerror(errno) << dendl; + return -1; + } + qp = cm_id->qp; + } + ldout(cct, 20) << __func__ << " successfully create queue pair: " + << "qp=" << qp << dendl; + local_cm_meta.local_qpn = get_local_qp_number(); + local_cm_meta.psn = get_initial_psn(); + local_cm_meta.lid = infiniband.get_lid(); + local_cm_meta.peer_qpn = 0; + local_cm_meta.gid = infiniband.get_gid(); + if (!srq) { + int rq_wrs = infiniband.post_chunks_to_rq(max_recv_wr, this); + if (rq_wrs == 0) { + lderr(cct) << __func__ << " intialize no SRQ Queue Pair, qp number: " << qp->qp_num + << " fatal error: can't post SQ WR " << dendl; + return -1; + } + ldout(cct, 20) << __func__ << " initialize no SRQ Queue Pair, qp number: " + << qp->qp_num << " post SQ WR " << rq_wrs << dendl; + } + return 0; +} + +void Infiniband::QueuePair::wire_gid_to_gid(const char *wgid, ib_cm_meta_t* cm_meta_data) +{ + char tmp[9]; + uint32_t v32; + int i; + + for (tmp[8] = 0, i = 0; i < 4; ++i) { + memcpy(tmp, wgid + i * 8, 8); + sscanf(tmp, "%x", &v32); + *(uint32_t *)(&cm_meta_data->gid.raw[i * 4]) = ntohl(v32); + } +} + +void Infiniband::QueuePair::gid_to_wire_gid(const ib_cm_meta_t& cm_meta_data, char wgid[]) +{ + for (int i = 0; i < 4; ++i) + sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(cm_meta_data.gid.raw + i * 4))); +} + +/* + * return value + * 1: means no valid buffer read + * 0: means got enough buffer + * < 0: means error + */ +int Infiniband::QueuePair::recv_cm_meta(CephContext *cct, int socket_fd) +{ + char msg[TCP_MSG_LEN]; + char gid[33]; + ssize_t r = ::read(socket_fd, &msg, sizeof(msg)); + // Drop incoming qpt + if (cct->_conf->ms_inject_socket_failures && socket_fd >= 0) { + if (rand() % cct->_conf->ms_inject_socket_failures == 0) { + ldout(cct, 0) << __func__ << " injecting socket failure" << dendl; + return -EINVAL; + } + } + if (r < 0) { + r = -errno; + lderr(cct) << __func__ << " got error " << r << ": " + << cpp_strerror(r) << dendl; + } else if (r == 0) { // valid disconnect message of length 0 + ldout(cct, 10) << __func__ << " got disconnect message " << dendl; + } else if ((size_t)r != sizeof(msg)) { // invalid message + ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl; + r = -EINVAL; + } else { // valid message + sscanf(msg, "%hx:%x:%x:%x:%s", &(peer_cm_meta.lid), &(peer_cm_meta.local_qpn), &(peer_cm_meta.psn), &(peer_cm_meta.peer_qpn), gid); + wire_gid_to_gid(gid, &peer_cm_meta); + ldout(cct, 5) << __func__ << " recevd: " << peer_cm_meta.lid << ", " << peer_cm_meta.local_qpn + << ", " << peer_cm_meta.psn << ", " << peer_cm_meta.peer_qpn << ", " << gid << dendl; + } + return r; +} + +int Infiniband::QueuePair::send_cm_meta(CephContext *cct, int socket_fd) +{ + int retry = 0; + ssize_t r; + + char msg[TCP_MSG_LEN]; + char gid[33]; +retry: + gid_to_wire_gid(local_cm_meta, gid); + sprintf(msg, "%04x:%08x:%08x:%08x:%s", local_cm_meta.lid, local_cm_meta.local_qpn, local_cm_meta.psn, local_cm_meta.peer_qpn, gid); + ldout(cct, 10) << __func__ << " sending: " << local_cm_meta.lid << ", " << local_cm_meta.local_qpn + << ", " << local_cm_meta.psn << ", " << local_cm_meta.peer_qpn << ", " << gid << dendl; + r = ::write(socket_fd, msg, sizeof(msg)); + // Drop incoming qpt + if (cct->_conf->ms_inject_socket_failures && socket_fd >= 0) { + if (rand() % cct->_conf->ms_inject_socket_failures == 0) { + ldout(cct, 0) << __func__ << " injecting socket failure" << dendl; + return -EINVAL; + } + } + + if ((size_t)r != sizeof(msg)) { + // FIXME need to handle EAGAIN instead of retry + if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) { + retry++; + goto retry; + } + if (r < 0) + lderr(cct) << __func__ << " send returned error " << errno << ": " + << cpp_strerror(errno) << dendl; + else + lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl; + return -errno; + } + return 0; +} + +/** + * Switch QP to ERROR state and then post a beacon to be able to drain all + * WCEs and then safely destroy QP. See RDMADispatcher::handle_tx_event() + * for details. + * + * \return + * -errno if the QueuePair can't switch to ERROR + * 0 for success. + */ +int Infiniband::QueuePair::to_dead() +{ + if (dead) + return 0; + + if (modify_qp_to_error()) { + return -1; + } + ldout(cct, 20) << __func__ << " force trigger error state Queue Pair, qp number: " << local_cm_meta.local_qpn + << " bound remote QueuePair, qp number: " << local_cm_meta.peer_qpn << dendl; + + struct ibv_send_wr *bad_wr = nullptr, beacon; + // FIPS zeroization audit 20191115: this memset is not security related. + memset(&beacon, 0, sizeof(beacon)); + beacon.wr_id = BEACON_WRID; + beacon.opcode = IBV_WR_SEND; + beacon.send_flags = IBV_SEND_SIGNALED; + if (ibv_post_send(qp, &beacon, &bad_wr)) { + lderr(cct) << __func__ << " failed to send a beacon: " << cpp_strerror(errno) << dendl; + return -errno; + } + ldout(cct, 20) << __func__ << " trigger error state Queue Pair, qp number: " << local_cm_meta.local_qpn << " Beacon sent " << dendl; + dead = true; + + return 0; +} + +int Infiniband::QueuePair::get_remote_qp_number(uint32_t *rqp) const +{ + ibv_qp_attr qpa; + ibv_qp_init_attr qpia; + + int r = ibv_query_qp(qp, &qpa, IBV_QP_DEST_QPN, &qpia); + if (r) { + lderr(cct) << __func__ << " failed to query qp: " + << cpp_strerror(errno) << dendl; + return -1; + } + + if (rqp) + *rqp = qpa.dest_qp_num; + return 0; +} + +/** + * Get the remote infiniband address for this QueuePair, as set in #plumb(). + * LIDs are "local IDs" in infiniband terminology. They are short, locally + * routable addresses. + */ +int Infiniband::QueuePair::get_remote_lid(uint16_t *lid) const +{ + ibv_qp_attr qpa; + ibv_qp_init_attr qpia; + + int r = ibv_query_qp(qp, &qpa, IBV_QP_AV, &qpia); + if (r) { + lderr(cct) << __func__ << " failed to query qp: " + << cpp_strerror(errno) << dendl; + return -1; + } + + if (lid) + *lid = qpa.ah_attr.dlid; + return 0; +} + +/** + * Get the state of a QueuePair. + */ +int Infiniband::QueuePair::get_state() const +{ + ibv_qp_attr qpa; + ibv_qp_init_attr qpia; + + int r = ibv_query_qp(qp, &qpa, IBV_QP_STATE, &qpia); + if (r) { + lderr(cct) << __func__ << " failed to get state: " + << cpp_strerror(errno) << dendl; + return -1; + } + return qpa.qp_state; +} + +Infiniband::CompletionChannel::CompletionChannel(CephContext *c, Infiniband &ib) + : cct(c), infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0) +{ +} + +Infiniband::CompletionChannel::~CompletionChannel() +{ + if (channel) { + int r = ibv_destroy_comp_channel(channel); + if (r < 0) + lderr(cct) << __func__ << " failed to destroy cc: " << cpp_strerror(errno) << dendl; + ceph_assert(r == 0); + } +} + +int Infiniband::CompletionChannel::init() +{ + ldout(cct, 20) << __func__ << " started." << dendl; + channel = ibv_create_comp_channel(infiniband.device->ctxt); + if (!channel) { + lderr(cct) << __func__ << " failed to create receive completion channel: " + << cpp_strerror(errno) << dendl; + return -1; + } + int rc = ceph::NetHandler(cct).set_nonblock(channel->fd); + if (rc < 0) { + ibv_destroy_comp_channel(channel); + return -1; + } + return 0; +} + +void Infiniband::CompletionChannel::ack_events() +{ + ibv_ack_cq_events(cq, cq_events_that_need_ack); + cq_events_that_need_ack = 0; +} + +bool Infiniband::CompletionChannel::get_cq_event() +{ + ibv_cq *cq = NULL; + void *ev_ctx; + if (ibv_get_cq_event(channel, &cq, &ev_ctx)) { + if (errno != EAGAIN && errno != EINTR) + lderr(cct) << __func__ << " failed to retrieve CQ event: " + << cpp_strerror(errno) << dendl; + return false; + } + + /* accumulate number of cq events that need to + * * be acked, and periodically ack them + * */ + if (++cq_events_that_need_ack == MAX_ACK_EVENT) { + ldout(cct, 20) << __func__ << " ack aq events." << dendl; + ibv_ack_cq_events(cq, MAX_ACK_EVENT); + cq_events_that_need_ack = 0; + } + + return true; +} + + +Infiniband::CompletionQueue::~CompletionQueue() +{ + if (cq) { + int r = ibv_destroy_cq(cq); + if (r < 0) + lderr(cct) << __func__ << " failed to destroy cq: " << cpp_strerror(errno) << dendl; + ceph_assert(r == 0); + } +} + +int Infiniband::CompletionQueue::init() +{ + cq = ibv_create_cq(infiniband.device->ctxt, queue_depth, this, channel->get_channel(), 0); + if (!cq) { + lderr(cct) << __func__ << " failed to create receive completion queue: " + << cpp_strerror(errno) << dendl; + return -1; + } + + if (ibv_req_notify_cq(cq, 0)) { + lderr(cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl; + ibv_destroy_cq(cq); + cq = nullptr; + return -1; + } + + channel->bind_cq(cq); + ldout(cct, 20) << __func__ << " successfully create cq=" << cq << dendl; + return 0; +} + +int Infiniband::CompletionQueue::rearm_notify(bool solicite_only) +{ + ldout(cct, 20) << __func__ << " started." << dendl; + int r = ibv_req_notify_cq(cq, 0); + if (r < 0) + lderr(cct) << __func__ << " failed to notify cq: " << cpp_strerror(errno) << dendl; + return r; +} + +int Infiniband::CompletionQueue::poll_cq(int num_entries, ibv_wc *ret_wc_array) { + int r = ibv_poll_cq(cq, num_entries, ret_wc_array); + if (r < 0) { + lderr(cct) << __func__ << " poll_completion_queue occur met error: " + << cpp_strerror(errno) << dendl; + return -1; + } + return r; +} + + +Infiniband::ProtectionDomain::ProtectionDomain(CephContext *cct, Device *device) + : pd(ibv_alloc_pd(device->ctxt)) +{ + if (pd == NULL) { + lderr(cct) << __func__ << " failed to allocate infiniband protection domain: " << cpp_strerror(errno) << dendl; + ceph_abort(); + } +} + +Infiniband::ProtectionDomain::~ProtectionDomain() +{ + ibv_dealloc_pd(pd); +} + + +Infiniband::MemoryManager::Chunk::Chunk(ibv_mr* m, uint32_t bytes, char* buffer, + uint32_t offset, uint32_t bound, uint32_t lkey, QueuePair* qp) + : mr(m), qp(qp), lkey(lkey), bytes(bytes), offset(offset), bound(bound), buffer(buffer) +{ +} + +Infiniband::MemoryManager::Chunk::~Chunk() +{ +} + +uint32_t Infiniband::MemoryManager::Chunk::get_offset() +{ + return offset; +} + +uint32_t Infiniband::MemoryManager::Chunk::get_size() const +{ + return bound - offset; +} + +void Infiniband::MemoryManager::Chunk::prepare_read(uint32_t b) +{ + offset = 0; + bound = b; +} + +uint32_t Infiniband::MemoryManager::Chunk::get_bound() +{ + return bound; +} + +uint32_t Infiniband::MemoryManager::Chunk::read(char* buf, uint32_t len) +{ + uint32_t left = get_size(); + uint32_t read_len = left <= len ? left : len; + memcpy(buf, buffer + offset, read_len); + offset += read_len; + return read_len; +} + +uint32_t Infiniband::MemoryManager::Chunk::write(char* buf, uint32_t len) +{ + uint32_t write_len = (bytes - offset) <= len ? (bytes - offset) : len; + memcpy(buffer + offset, buf, write_len); + offset += write_len; + return write_len; +} + +bool Infiniband::MemoryManager::Chunk::full() +{ + return offset == bytes; +} + +void Infiniband::MemoryManager::Chunk::reset_read_chunk() +{ + offset = 0; + bound = 0; +} + +void Infiniband::MemoryManager::Chunk::reset_write_chunk() +{ + offset = 0; + bound = bytes; +} + +Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s) + : manager(m), buffer_size(s) +{ +} + +Infiniband::MemoryManager::Cluster::~Cluster() +{ + int r = ibv_dereg_mr(chunk_base->mr); + ceph_assert(r == 0); + const auto chunk_end = chunk_base + num_chunk; + for (auto chunk = chunk_base; chunk != chunk_end; chunk++) { + chunk->~Chunk(); + } + + ::free(chunk_base); + manager.free(base); +} + +int Infiniband::MemoryManager::Cluster::fill(uint32_t num) +{ + ceph_assert(!base); + num_chunk = num; + uint32_t bytes = buffer_size * num; + + base = (char*)manager.malloc(bytes); + end = base + bytes; + ceph_assert(base); + chunk_base = static_cast<Chunk*>(::malloc(sizeof(Chunk) * num)); + // FIPS zeroization audit 20191115: this memset is not security related. + memset(static_cast<void*>(chunk_base), 0, sizeof(Chunk) * num); + free_chunks.reserve(num); + ibv_mr* m = ibv_reg_mr(manager.pd->pd, base, bytes, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE); + ceph_assert(m); + Chunk* chunk = chunk_base; + for (uint32_t offset = 0; offset < bytes; offset += buffer_size){ + new(chunk) Chunk(m, buffer_size, base + offset, 0, buffer_size, m->lkey); + free_chunks.push_back(chunk); + chunk++; + } + return 0; +} + +void Infiniband::MemoryManager::Cluster::take_back(std::vector<Chunk*> &ck) +{ + std::lock_guard l{lock}; + for (auto c : ck) { + c->reset_write_chunk(); + free_chunks.push_back(c); + } +} + +int Infiniband::MemoryManager::Cluster::get_buffers(std::vector<Chunk*> &chunks, size_t block_size) +{ + std::lock_guard l{lock}; + uint32_t chunk_buffer_number = (block_size + buffer_size - 1) / buffer_size; + chunk_buffer_number = free_chunks.size() < chunk_buffer_number ? free_chunks.size(): chunk_buffer_number; + uint32_t r = 0; + + for (r = 0; r < chunk_buffer_number; ++r) { + chunks.push_back(free_chunks.back()); + free_chunks.pop_back(); + } + return r; +} + +bool Infiniband::MemoryManager::MemPoolContext::can_alloc(unsigned nbufs) +{ + /* unlimited */ + if (manager->cct->_conf->ms_async_rdma_receive_buffers <= 0) + return true; + + if (n_bufs_allocated + nbufs > (unsigned)manager->cct->_conf->ms_async_rdma_receive_buffers) { + lderr(manager->cct) << __func__ << " WARNING: OUT OF RX BUFFERS: allocated: " << + n_bufs_allocated << " requested: " << nbufs << + " limit: " << manager->cct->_conf->ms_async_rdma_receive_buffers << dendl; + return false; + } + + return true; +} + +void Infiniband::MemoryManager::MemPoolContext::set_stat_logger(PerfCounters *logger) { + perf_logger = logger; + if (perf_logger != nullptr) + perf_logger->set(l_msgr_rdma_rx_bufs_total, n_bufs_allocated); +} + +void Infiniband::MemoryManager::MemPoolContext::update_stats(int nbufs) +{ + n_bufs_allocated += nbufs; + + if (!perf_logger) + return; + + if (nbufs > 0) { + perf_logger->inc(l_msgr_rdma_rx_bufs_total, nbufs); + } else { + perf_logger->dec(l_msgr_rdma_rx_bufs_total, -nbufs); + } +} + +void *Infiniband::MemoryManager::mem_pool::slow_malloc() +{ + // this will trigger pool expansion via PoolAllocator::malloc() + return PoolAllocator::with_context(ctx, [this] { + return boost::pool<PoolAllocator>::malloc(); + }); +} + +Infiniband::MemoryManager::MemPoolContext* +Infiniband::MemoryManager::PoolAllocator::g_ctx = nullptr; + +// lock is taken by mem_pool::slow_malloc() +ceph::mutex& Infiniband::MemoryManager::PoolAllocator::get_lock() +{ + static ceph::mutex lock = ceph::make_mutex("pool-alloc-lock"); + return lock; +} + +char *Infiniband::MemoryManager::PoolAllocator::malloc(const size_type block_size) +{ + ceph_assert(g_ctx); + MemoryManager *manager = g_ctx->manager; + CephContext *cct = manager->cct; + size_t chunk_buffer_size = sizeof(Chunk) + cct->_conf->ms_async_rdma_buffer_size; + size_t chunk_buffer_number = block_size / chunk_buffer_size; + + if (!g_ctx->can_alloc(chunk_buffer_number)) + return NULL; + + mem_info *minfo= static_cast<mem_info *>(manager->malloc(block_size + sizeof(mem_info))); + if (!minfo) { + lderr(cct) << __func__ << " failed to allocate " << chunk_buffer_number << " buffers " + " Its block size is : " << block_size + sizeof(mem_info) << dendl; + return NULL; + } + + minfo->mr = ibv_reg_mr(manager->pd->pd, minfo->chunks, block_size, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE); + if (minfo->mr == NULL) { + lderr(cct) << __func__ << " failed to do rdma memory registration " << block_size << " bytes. " + " relase allocated memory now." << dendl; + manager->free(minfo); + return NULL; + } + + minfo->nbufs = chunk_buffer_number; + // save this chunk context + minfo->ctx = g_ctx; + + // note that the memory can be allocated before perf logger is set + g_ctx->update_stats(chunk_buffer_number); + + /* initialize chunks */ + Chunk *chunk = minfo->chunks; + for (unsigned i = 0; i < chunk_buffer_number; i++) { + new(chunk) Chunk(minfo->mr, cct->_conf->ms_async_rdma_buffer_size, chunk->data, 0, 0, minfo->mr->lkey); + chunk = reinterpret_cast<Chunk *>(reinterpret_cast<char *>(chunk) + chunk_buffer_size); + } + + return reinterpret_cast<char *>(minfo->chunks); +} + + +void Infiniband::MemoryManager::PoolAllocator::free(char * const block) +{ + mem_info *m; + std::lock_guard l{get_lock()}; + + Chunk *mem_info_chunk = reinterpret_cast<Chunk *>(block); + m = reinterpret_cast<mem_info *>(reinterpret_cast<char *>(mem_info_chunk) - offsetof(mem_info, chunks)); + m->ctx->update_stats(-m->nbufs); + ibv_dereg_mr(m->mr); + m->ctx->manager->free(m); +} + +Infiniband::MemoryManager::MemoryManager(CephContext *c, Device *d, ProtectionDomain *p) + : cct(c), device(d), pd(p), + rxbuf_pool_ctx(this), + rxbuf_pool(&rxbuf_pool_ctx, sizeof(Chunk) + c->_conf->ms_async_rdma_buffer_size, + c->_conf->ms_async_rdma_receive_buffers > 0 ? + // if possible make initial pool size 2 * receive_queue_len + // that way there will be no pool expansion upon receive of the + // first packet. + (c->_conf->ms_async_rdma_receive_buffers < 2 * c->_conf->ms_async_rdma_receive_queue_len ? + c->_conf->ms_async_rdma_receive_buffers : 2 * c->_conf->ms_async_rdma_receive_queue_len) : + // rx pool is infinite, we can set any initial size that we want + 2 * c->_conf->ms_async_rdma_receive_queue_len, + device->device_attr.max_mr_size / (sizeof(Chunk) + cct->_conf->ms_async_rdma_buffer_size)) +{ +} + +Infiniband::MemoryManager::~MemoryManager() +{ + if (send) + delete send; +} + +void* Infiniband::MemoryManager::huge_pages_malloc(size_t size) +{ + size_t real_size = ALIGN_TO_PAGE_2MB(size) + HUGE_PAGE_SIZE_2MB; + char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS | MAP_POPULATE | MAP_HUGETLB, -1, 0); + if (ptr == MAP_FAILED) { + ptr = (char *)std::malloc(real_size); + if (ptr == NULL) return NULL; + real_size = 0; + } + *((size_t *)ptr) = real_size; + return ptr + HUGE_PAGE_SIZE_2MB; +} + +void Infiniband::MemoryManager::huge_pages_free(void *ptr) +{ + if (ptr == NULL) return; + void *real_ptr = (char *)ptr - HUGE_PAGE_SIZE_2MB; + size_t real_size = *((size_t *)real_ptr); + ceph_assert(real_size % HUGE_PAGE_SIZE_2MB == 0); + if (real_size != 0) + munmap(real_ptr, real_size); + else + std::free(real_ptr); +} + + +void* Infiniband::MemoryManager::malloc(size_t size) +{ + if (cct->_conf->ms_async_rdma_enable_hugepage) + return huge_pages_malloc(size); + else + return std::malloc(size); +} + +void Infiniband::MemoryManager::free(void *ptr) +{ + if (cct->_conf->ms_async_rdma_enable_hugepage) + huge_pages_free(ptr); + else + std::free(ptr); +} + +void Infiniband::MemoryManager::create_tx_pool(uint32_t size, uint32_t tx_num) +{ + ceph_assert(device); + ceph_assert(pd); + + send = new Cluster(*this, size); + send->fill(tx_num); +} + +void Infiniband::MemoryManager::return_tx(std::vector<Chunk*> &chunks) +{ + send->take_back(chunks); +} + +int Infiniband::MemoryManager::get_send_buffers(std::vector<Chunk*> &c, size_t bytes) +{ + return send->get_buffers(c, bytes); +} + +static std::atomic<bool> init_prereq = {false}; + +void Infiniband::verify_prereq(CephContext *cct) { + int rc = 0; + ldout(cct, 20) << __func__ << " ms_async_rdma_enable_hugepage value is: " << cct->_conf->ms_async_rdma_enable_hugepage << dendl; + if (cct->_conf->ms_async_rdma_enable_hugepage){ + rc = setenv("RDMAV_HUGEPAGES_SAFE","1",1); + ldout(cct, 0) << __func__ << " RDMAV_HUGEPAGES_SAFE is set as: " << getenv("RDMAV_HUGEPAGES_SAFE") << dendl; + if (rc) { + lderr(cct) << __func__ << " failed to export RDMA_HUGEPAGES_SAFE. On RDMA must be exported before using huge pages. Application aborts." << dendl; + ceph_abort(); + } + } + + //On RDMA MUST be called before fork + rc = ibv_fork_init(); + if (rc) { + lderr(cct) << __func__ << " failed to call ibv_for_init(). On RDMA must be called before fork. Application aborts." << dendl; + ceph_abort(); + } + + //Check ulimit + struct rlimit limit; + getrlimit(RLIMIT_MEMLOCK, &limit); + if (limit.rlim_cur != RLIM_INFINITY || limit.rlim_max != RLIM_INFINITY) { + lderr(cct) << __func__ << "!!! WARNING !!! For RDMA to work properly user memlock (ulimit -l) must be big enough to allow large amount of registered memory." + " We recommend setting this parameter to infinity" << dendl; + } + init_prereq = true; +} + +Infiniband::Infiniband(CephContext *cct) + : cct(cct), + device_name(cct->_conf->ms_async_rdma_device_name), + port_num( cct->_conf->ms_async_rdma_port_num) +{ + if (!init_prereq) + verify_prereq(cct); + ldout(cct, 20) << __func__ << " constructing Infiniband..." << dendl; +} + +void Infiniband::init() +{ + std::lock_guard l{lock}; + + if (initialized) + return; + + device_list = new DeviceList(cct); + initialized = true; + + device = device_list->get_device(device_name.c_str()); + ceph_assert(device); + device->binding_port(cct, port_num); + ib_physical_port = device->active_port->get_port_num(); + pd = new ProtectionDomain(cct, device); + ceph_assert(ceph::NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0); + + support_srq = cct->_conf->ms_async_rdma_support_srq; + if (support_srq) { + ceph_assert(device->device_attr.max_srq); + rx_queue_len = device->device_attr.max_srq_wr; + } + else + rx_queue_len = device->device_attr.max_qp_wr; + if (rx_queue_len > cct->_conf->ms_async_rdma_receive_queue_len) { + rx_queue_len = cct->_conf->ms_async_rdma_receive_queue_len; + ldout(cct, 1) << __func__ << " assigning: " << rx_queue_len << " receive buffers" << dendl; + } else { + ldout(cct, 0) << __func__ << " using the max allowed receive buffers: " << rx_queue_len << dendl; + } + + // check for the misconfiguration + if (cct->_conf->ms_async_rdma_receive_buffers > 0 && + rx_queue_len > (unsigned)cct->_conf->ms_async_rdma_receive_buffers) { + lderr(cct) << __func__ << " rdma_receive_queue_len (" << + rx_queue_len << ") > ms_async_rdma_receive_buffers(" << + cct->_conf->ms_async_rdma_receive_buffers << ")." << dendl; + ceph_abort(); + } + + // Keep extra one WR for a beacon to indicate all WCEs were consumed + tx_queue_len = device->device_attr.max_qp_wr - 1; + if (tx_queue_len > cct->_conf->ms_async_rdma_send_buffers) { + tx_queue_len = cct->_conf->ms_async_rdma_send_buffers; + ldout(cct, 1) << __func__ << " assigning: " << tx_queue_len << " send buffers" << dendl; + } else { + ldout(cct, 0) << __func__ << " using the max allowed send buffers: " << tx_queue_len << dendl; + } + + //check for the memory region size misconfiguration + if ((uint64_t)cct->_conf->ms_async_rdma_buffer_size * tx_queue_len > device->device_attr.max_mr_size) { + lderr(cct) << __func__ << " Out of max memory region size " << dendl; + ceph_abort(); + } + + ldout(cct, 1) << __func__ << " device allow " << device->device_attr.max_cqe + << " completion entries" << dendl; + + memory_manager = new MemoryManager(cct, device, pd); + memory_manager->create_tx_pool(cct->_conf->ms_async_rdma_buffer_size, tx_queue_len); + + if (support_srq) { + srq = create_shared_receive_queue(rx_queue_len, MAX_SHARED_RX_SGE_COUNT); + post_chunks_to_rq(rx_queue_len, NULL); //add to srq + } +} + +Infiniband::~Infiniband() +{ + if (!initialized) + return; + if (support_srq) + ibv_destroy_srq(srq); + delete memory_manager; + delete pd; + delete device_list; +} + +/** + * Create a shared receive queue. This basically wraps the verbs call. + * + * \param[in] max_wr + * The max number of outstanding work requests in the SRQ. + * \param[in] max_sge + * The max number of scatter elements per WR. + * \return + * A valid ibv_srq pointer, or NULL on error. + */ +ibv_srq* Infiniband::create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge) +{ + ibv_srq_init_attr sia; + // FIPS zeroization audit 20191115: this memset is not security related. + memset(&sia, 0, sizeof(sia)); + sia.srq_context = device->ctxt; + sia.attr.max_wr = max_wr; + sia.attr.max_sge = max_sge; + return ibv_create_srq(pd->pd, &sia); +} + +int Infiniband::get_tx_buffers(std::vector<Chunk*> &c, size_t bytes) +{ + return memory_manager->get_send_buffers(c, bytes); +} + +/** + * Create a new QueuePair. This factory should be used in preference to + * the QueuePair constructor directly, since this lets derivatives of + * Infiniband, e.g. MockInfiniband (if it existed), + * return mocked out QueuePair derivatives. + * + * \return + * QueuePair on success or NULL if init fails + * See QueuePair::QueuePair for parameter documentation. + */ +Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, CompletionQueue *tx, + CompletionQueue* rx, ibv_qp_type type, struct rdma_cm_id *cm_id) +{ + Infiniband::QueuePair *qp = new QueuePair( + cct, *this, type, ib_physical_port, srq, tx, rx, tx_queue_len, rx_queue_len, cm_id); + if (qp->init()) { + delete qp; + return NULL; + } + return qp; +} + +int Infiniband::post_chunks_to_rq(int rq_wr_num, QueuePair *qp) +{ + int ret = 0; + Chunk *chunk = nullptr; + + ibv_recv_wr *rx_work_request = static_cast<ibv_recv_wr*>(::calloc(rq_wr_num, sizeof(ibv_recv_wr))); + ibv_sge *isge = static_cast<ibv_sge*>(::calloc(rq_wr_num, sizeof(ibv_sge))); + ceph_assert(rx_work_request); + ceph_assert(isge); + + int i = 0; + while (i < rq_wr_num) { + chunk = get_memory_manager()->get_rx_buffer(); + if (chunk == nullptr) { + lderr(cct) << __func__ << " WARNING: out of memory. Request " << rq_wr_num << + " rx buffers. Only get " << i << " rx buffers." << dendl; + if (i == 0) { + ::free(rx_work_request); + ::free(isge); + return 0; + } + break; //get some buffers, so we need post them to recevie queue + } + + isge[i].addr = reinterpret_cast<uint64_t>(chunk->data); + isge[i].length = chunk->bytes; + isge[i].lkey = chunk->lkey; + + rx_work_request[i].wr_id = reinterpret_cast<uint64_t>(chunk);// assign chunk address as work request id + + if (i != 0) { + rx_work_request[i - 1].next = &rx_work_request[i]; + } + rx_work_request[i].sg_list = &isge[i]; + rx_work_request[i].num_sge = 1; + + if (qp && !qp->get_srq()) { + chunk->set_qp(qp); + qp->add_rq_wr(chunk); + } + i++; + } + + ibv_recv_wr *badworkrequest = nullptr; + if (support_srq) { + ret = ibv_post_srq_recv(srq, rx_work_request, &badworkrequest); + } else { + ceph_assert(qp); + ret = ibv_post_recv(qp->get_qp(), rx_work_request, &badworkrequest); + } + + ::free(rx_work_request); + ::free(isge); + ceph_assert(badworkrequest == nullptr && ret == 0); + return i; +} + +Infiniband::CompletionChannel* Infiniband::create_comp_channel(CephContext *c) +{ + Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(c, *this); + if (cc->init()) { + delete cc; + return NULL; + } + return cc; +} + +Infiniband::CompletionQueue* Infiniband::create_comp_queue( + CephContext *cct, CompletionChannel *cc) +{ + Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue( + cct, *this, CQ_DEPTH, cc); + if (cq->init()) { + delete cq; + return NULL; + } + return cq; +} + +Infiniband::QueuePair::~QueuePair() +{ + ldout(cct, 20) << __func__ << " destroy Queue Pair, qp number: " << qp->qp_num << " left SQ WR " << recv_queue.size() << dendl; + if (qp) { + ldout(cct, 20) << __func__ << " destroy qp=" << qp << dendl; + ceph_assert(!ibv_destroy_qp(qp)); + } + + for (auto& chunk: recv_queue) { + infiniband.get_memory_manager()->release_rx_buffer(chunk); + } + recv_queue.clear(); +} + +/** + * Given a string representation of the `status' field from Verbs + * struct `ibv_wc'. + * + * \param[in] status + * The integer status obtained in ibv_wc.status. + * \return + * A string corresponding to the given status. + */ +const char* Infiniband::wc_status_to_string(int status) +{ + static const char *lookup[] = { + "SUCCESS", + "LOC_LEN_ERR", + "LOC_QP_OP_ERR", + "LOC_EEC_OP_ERR", + "LOC_PROT_ERR", + "WR_FLUSH_ERR", + "MW_BIND_ERR", + "BAD_RESP_ERR", + "LOC_ACCESS_ERR", + "REM_INV_REQ_ERR", + "REM_ACCESS_ERR", + "REM_OP_ERR", + "RETRY_EXC_ERR", + "RNR_RETRY_EXC_ERR", + "LOC_RDD_VIOL_ERR", + "REM_INV_RD_REQ_ERR", + "REM_ABORT_ERR", + "INV_EECN_ERR", + "INV_EEC_STATE_ERR", + "FATAL_ERR", + "RESP_TIMEOUT_ERR", + "GENERAL_ERR" + }; + + if (status < IBV_WC_SUCCESS || status > IBV_WC_GENERAL_ERR) + return "<status out of range!>"; + return lookup[status]; +} + +const char* Infiniband::qp_state_string(int status) { + switch(status) { + case IBV_QPS_RESET : return "IBV_QPS_RESET"; + case IBV_QPS_INIT : return "IBV_QPS_INIT"; + case IBV_QPS_RTR : return "IBV_QPS_RTR"; + case IBV_QPS_RTS : return "IBV_QPS_RTS"; + case IBV_QPS_SQD : return "IBV_QPS_SQD"; + case IBV_QPS_SQE : return "IBV_QPS_SQE"; + case IBV_QPS_ERR : return "IBV_QPS_ERR"; + default: return " out of range."; + } +} diff --git a/src/msg/async/rdma/Infiniband.h b/src/msg/async/rdma/Infiniband.h new file mode 100644 index 000000000..f18442e4e --- /dev/null +++ b/src/msg/async/rdma/Infiniband.h @@ -0,0 +1,591 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSKY <haomai@xsky.com> + * + * Author: Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_INFINIBAND_H +#define CEPH_INFINIBAND_H + +#include <boost/pool/pool.hpp> +// need this because boost messes with ceph log/assert definitions +#include "include/ceph_assert.h" + +#include <infiniband/verbs.h> +#include <rdma/rdma_cma.h> + +#include <atomic> +#include <functional> +#include <string> +#include <vector> + +#include "include/common_fwd.h" +#include "include/int_types.h" +#include "include/page.h" +#include "include/scope_guard.h" +#include "common/debug.h" +#include "common/errno.h" +#include "common/ceph_mutex.h" +#include "common/perf_counters.h" +#include "msg/msg_types.h" +#include "msg/async/net_handler.h" + +#define HUGE_PAGE_SIZE_2MB (2 * 1024 * 1024) +#define ALIGN_TO_PAGE_2MB(x) \ + (((x) + (HUGE_PAGE_SIZE_2MB - 1)) & ~(HUGE_PAGE_SIZE_2MB - 1)) + +#define PSN_LEN 24 +#define PSN_MSK ((1 << PSN_LEN) - 1) + +#define BEACON_WRID 0xDEADBEEF + +struct ib_cm_meta_t { + uint16_t lid; + uint32_t local_qpn; + uint32_t psn; + uint32_t peer_qpn; + union ibv_gid gid; +} __attribute__((packed)); + +class RDMAStack; + +class Port { + struct ibv_context* ctxt; + int port_num; + struct ibv_port_attr port_attr; + uint16_t lid; + int gid_idx; + union ibv_gid gid; + + public: + explicit Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn); + uint16_t get_lid() { return lid; } + ibv_gid get_gid() { return gid; } + int get_port_num() { return port_num; } + ibv_port_attr* get_port_attr() { return &port_attr; } + int get_gid_idx() { return gid_idx; } +}; + + +class Device { + ibv_device *device; + const char* name; + uint8_t port_cnt = 0; + public: + explicit Device(CephContext *c, ibv_device* ib_dev); + explicit Device(CephContext *c, ibv_context *ib_ctx); + ~Device() { + if (active_port) { + delete active_port; + ceph_assert(ibv_close_device(ctxt) == 0); + } + } + const char* get_name() { return name;} + uint16_t get_lid() { return active_port->get_lid(); } + ibv_gid get_gid() { return active_port->get_gid(); } + int get_gid_idx() { return active_port->get_gid_idx(); } + void binding_port(CephContext *c, int port_num); + struct ibv_context *ctxt; + ibv_device_attr device_attr; + Port* active_port; +}; + + +class DeviceList { + struct ibv_device ** device_list; + struct ibv_context ** device_context_list; + int num; + Device** devices; + public: + explicit DeviceList(CephContext *cct): device_list(nullptr), device_context_list(nullptr), + num(0), devices(nullptr) { + device_list = ibv_get_device_list(&num); + ceph_assert(device_list); + ceph_assert(num); + if (cct->_conf->ms_async_rdma_cm) { + device_context_list = rdma_get_devices(NULL); + ceph_assert(device_context_list); + } + devices = new Device*[num]; + + for (int i = 0;i < num; ++i) { + if (cct->_conf->ms_async_rdma_cm) { + devices[i] = new Device(cct, device_context_list[i]); + } else { + devices[i] = new Device(cct, device_list[i]); + } + } + } + ~DeviceList() { + for (int i=0; i < num; ++i) { + delete devices[i]; + } + delete []devices; + ibv_free_device_list(device_list); + rdma_free_devices(device_context_list); + } + + Device* get_device(const char* device_name) { + for (int i = 0; i < num; ++i) { + if (!strlen(device_name) || !strcmp(device_name, devices[i]->get_name())) { + return devices[i]; + } + } + return NULL; + } +}; + +// stat counters +enum { + l_msgr_rdma_dispatcher_first = 94000, + + l_msgr_rdma_polling, + l_msgr_rdma_inflight_tx_chunks, + l_msgr_rdma_rx_bufs_in_use, + l_msgr_rdma_rx_bufs_total, + + l_msgr_rdma_tx_total_wc, + l_msgr_rdma_tx_total_wc_errors, + l_msgr_rdma_tx_wc_retry_errors, + l_msgr_rdma_tx_wc_wr_flush_errors, + + l_msgr_rdma_rx_total_wc, + l_msgr_rdma_rx_total_wc_errors, + l_msgr_rdma_rx_fin, + + l_msgr_rdma_handshake_errors, + + l_msgr_rdma_total_async_events, + l_msgr_rdma_async_last_wqe_events, + + l_msgr_rdma_created_queue_pair, + l_msgr_rdma_active_queue_pair, + + l_msgr_rdma_dispatcher_last, +}; + +enum { + l_msgr_rdma_first = 95000, + + l_msgr_rdma_tx_no_mem, + l_msgr_rdma_tx_parital_mem, + l_msgr_rdma_tx_failed, + + l_msgr_rdma_tx_chunks, + l_msgr_rdma_tx_bytes, + l_msgr_rdma_rx_chunks, + l_msgr_rdma_rx_bytes, + l_msgr_rdma_pending_sent_conns, + + l_msgr_rdma_last, +}; + +class RDMADispatcher; + +class Infiniband { + public: + class ProtectionDomain { + public: + explicit ProtectionDomain(CephContext *cct, Device *device); + ~ProtectionDomain(); + + ibv_pd* const pd; + }; + + class QueuePair; + class MemoryManager { + public: + class Chunk { + public: + Chunk(ibv_mr* m, uint32_t bytes, char* buffer, uint32_t offset = 0, uint32_t bound = 0, uint32_t lkey = 0, QueuePair* qp = nullptr); + ~Chunk(); + + uint32_t get_offset(); + uint32_t get_size() const; + void prepare_read(uint32_t b); + uint32_t get_bound(); + uint32_t read(char* buf, uint32_t len); + uint32_t write(char* buf, uint32_t len); + bool full(); + void reset_read_chunk(); + void reset_write_chunk(); + void set_qp(QueuePair *qp) { this->qp = qp; } + void clear_qp() { set_qp(nullptr); } + QueuePair* get_qp() { return qp; } + + public: + ibv_mr* mr; + QueuePair *qp; + uint32_t lkey; + uint32_t bytes; + uint32_t offset; + uint32_t bound; + char* buffer; // TODO: remove buffer/refactor TX + char data[0]; + }; + + class Cluster { + public: + Cluster(MemoryManager& m, uint32_t s); + ~Cluster(); + + int fill(uint32_t num); + void take_back(std::vector<Chunk*> &ck); + int get_buffers(std::vector<Chunk*> &chunks, size_t bytes); + Chunk *get_chunk_by_buffer(const char *c) { + uint32_t idx = (c - base) / buffer_size; + Chunk *chunk = chunk_base + idx; + return chunk; + } + bool is_my_buffer(const char *c) const { + return c >= base && c < end; + } + + bool is_valid_chunk(const Chunk* c) const { + return c >= chunk_base && c < chunk_base + num_chunk; + } + MemoryManager& manager; + uint32_t buffer_size; + uint32_t num_chunk = 0; + ceph::mutex lock = ceph::make_mutex("cluster_lock"); + std::vector<Chunk*> free_chunks; + char *base = nullptr; + char *end = nullptr; + Chunk* chunk_base = nullptr; + }; + + class MemPoolContext { + PerfCounters *perf_logger; + + public: + MemoryManager *manager; + unsigned n_bufs_allocated; + // true if it is possible to alloc + // more memory for the pool + explicit MemPoolContext(MemoryManager *m) : + perf_logger(nullptr), + manager(m), + n_bufs_allocated(0) {} + bool can_alloc(unsigned nbufs); + void update_stats(int val); + void set_stat_logger(PerfCounters *logger); + }; + + class PoolAllocator { + struct mem_info { + ibv_mr *mr; + MemPoolContext *ctx; + unsigned nbufs; + Chunk chunks[0]; + }; + public: + typedef std::size_t size_type; + typedef std::ptrdiff_t difference_type; + + static char * malloc(const size_type bytes); + static void free(char * const block); + + template<typename Func> + static std::invoke_result_t<Func> with_context(MemPoolContext* ctx, + Func&& func) { + std::lock_guard l{get_lock()}; + g_ctx = ctx; + scope_guard reset_ctx{[] { g_ctx = nullptr; }}; + return std::move(func)(); + } + private: + static ceph::mutex& get_lock(); + static MemPoolContext* g_ctx; + }; + + /** + * modify boost pool so that it is possible to + * have a thread safe 'context' when allocating/freeing + * the memory. It is needed to allow a different pool + * configurations and bookkeeping per CephContext and + * also to be able to use same allocator to deal with + * RX and TX pool. + * TODO: use boost pool to allocate TX chunks too + */ + class mem_pool : public boost::pool<PoolAllocator> { + private: + MemPoolContext *ctx; + void *slow_malloc(); + + public: + ceph::mutex lock = ceph::make_mutex("mem_pool_lock"); + explicit mem_pool(MemPoolContext *ctx, const size_type nrequested_size, + const size_type nnext_size = 32, + const size_type nmax_size = 0) : + pool(nrequested_size, nnext_size, nmax_size), + ctx(ctx) { } + + void *malloc() { + if (!store().empty()) + return (store().malloc)(); + // need to alloc more memory... + // slow path code + return slow_malloc(); + } + }; + + MemoryManager(CephContext *c, Device *d, ProtectionDomain *p); + ~MemoryManager(); + + void* malloc(size_t size); + void free(void *ptr); + + void create_tx_pool(uint32_t size, uint32_t tx_num); + void return_tx(std::vector<Chunk*> &chunks); + int get_send_buffers(std::vector<Chunk*> &c, size_t bytes); + bool is_tx_buffer(const char* c) { return send->is_my_buffer(c); } + bool is_valid_chunk(const Chunk* c) { return send->is_valid_chunk(c); } + Chunk *get_tx_chunk_by_buffer(const char *c) { + return send->get_chunk_by_buffer(c); + } + uint32_t get_tx_buffer_size() const { + return send->buffer_size; + } + + Chunk *get_rx_buffer() { + std::lock_guard l{rxbuf_pool.lock}; + return reinterpret_cast<Chunk *>(rxbuf_pool.malloc()); + } + + void release_rx_buffer(Chunk *chunk) { + std::lock_guard l{rxbuf_pool.lock}; + chunk->clear_qp(); + rxbuf_pool.free(chunk); + } + + void set_rx_stat_logger(PerfCounters *logger) { + rxbuf_pool_ctx.set_stat_logger(logger); + } + + CephContext *cct; + private: + // TODO: Cluster -> TxPool txbuf_pool + // chunk layout fix + // + Cluster* send = nullptr;// SEND + Device *device; + ProtectionDomain *pd; + MemPoolContext rxbuf_pool_ctx; + mem_pool rxbuf_pool; + + + void* huge_pages_malloc(size_t size); + void huge_pages_free(void *ptr); + }; + + private: + uint32_t tx_queue_len = 0; + uint32_t rx_queue_len = 0; + uint32_t max_sge = 0; + uint8_t ib_physical_port = 0; + MemoryManager* memory_manager = nullptr; + ibv_srq* srq = nullptr; // shared receive work queue + Device *device = NULL; + ProtectionDomain *pd = NULL; + DeviceList *device_list = nullptr; + CephContext *cct; + ceph::mutex lock = ceph::make_mutex("IB lock"); + bool initialized = false; + const std::string &device_name; + uint8_t port_num; + bool support_srq = false; + + public: + explicit Infiniband(CephContext *c); + ~Infiniband(); + void init(); + static void verify_prereq(CephContext *cct); + + class CompletionChannel { + static const uint32_t MAX_ACK_EVENT = 5000; + CephContext *cct; + Infiniband& infiniband; + ibv_comp_channel *channel; + ibv_cq *cq; + uint32_t cq_events_that_need_ack; + + public: + CompletionChannel(CephContext *c, Infiniband &ib); + ~CompletionChannel(); + int init(); + bool get_cq_event(); + int get_fd() { return channel->fd; } + ibv_comp_channel* get_channel() { return channel; } + void bind_cq(ibv_cq *c) { cq = c; } + void ack_events(); + }; + + // this class encapsulates the creation, use, and destruction of an RC + // completion queue. + // + // You need to call init and it will create a cq and associate to comp channel + class CompletionQueue { + public: + CompletionQueue(CephContext *c, Infiniband &ib, + const uint32_t qd, CompletionChannel *cc) + : cct(c), infiniband(ib), channel(cc), cq(NULL), queue_depth(qd) {} + ~CompletionQueue(); + int init(); + int poll_cq(int num_entries, ibv_wc *ret_wc_array); + + ibv_cq* get_cq() const { return cq; } + int rearm_notify(bool solicited_only=true); + CompletionChannel* get_cc() const { return channel; } + private: + CephContext *cct; + Infiniband& infiniband; // Infiniband to which this QP belongs + CompletionChannel *channel; + ibv_cq *cq; + uint32_t queue_depth; + }; + + // this class encapsulates the creation, use, and destruction of an RC + // queue pair. + // + // you need call init and it will create a qp and bring it to the INIT state. + // after obtaining the lid, qpn, and psn of a remote queue pair, one + // must call plumb() to bring the queue pair to the RTS state. + class QueuePair { + public: + typedef MemoryManager::Chunk Chunk; + QueuePair(CephContext *c, Infiniband& infiniband, ibv_qp_type type, + int ib_physical_port, ibv_srq *srq, + Infiniband::CompletionQueue* txcq, + Infiniband::CompletionQueue* rxcq, + uint32_t tx_queue_len, uint32_t max_recv_wr, struct rdma_cm_id *cid, uint32_t q_key = 0); + ~QueuePair(); + + int modify_qp_to_error(); + int modify_qp_to_rts(); + int modify_qp_to_rtr(); + int modify_qp_to_init(); + int init(); + + /** + * Get the initial packet sequence number for this QueuePair. + * This is randomly generated on creation. It should not be confused + * with the remote side's PSN, which is set in #plumb(). + */ + uint32_t get_initial_psn() const { return initial_psn; }; + /** + * Get the local queue pair number for this QueuePair. + * QPNs are analogous to UDP/TCP port numbers. + */ + uint32_t get_local_qp_number() const { return qp->qp_num; }; + /** + * Get the remote queue pair number for this QueuePair, as set in #plumb(). + * QPNs are analogous to UDP/TCP port numbers. + */ + int get_remote_qp_number(uint32_t *rqp) const; + /** + * Get the remote infiniband address for this QueuePair, as set in #plumb(). + * LIDs are "local IDs" in infiniband terminology. They are short, locally + * routable addresses. + */ + int get_remote_lid(uint16_t *lid) const; + /** + * Get the state of a QueuePair. + */ + int get_state() const; + /* + * send/receive connection management meta data + */ + int send_cm_meta(CephContext *cct, int socket_fd); + int recv_cm_meta(CephContext *cct, int socket_fd); + void wire_gid_to_gid(const char *wgid, ib_cm_meta_t* cm_meta_data); + void gid_to_wire_gid(const ib_cm_meta_t& cm_meta_data, char wgid[]); + ibv_qp* get_qp() const { return qp; } + Infiniband::CompletionQueue* get_tx_cq() const { return txcq; } + Infiniband::CompletionQueue* get_rx_cq() const { return rxcq; } + int to_dead(); + bool is_dead() const { return dead; } + ib_cm_meta_t& get_peer_cm_meta() { return peer_cm_meta; } + ib_cm_meta_t& get_local_cm_meta() { return local_cm_meta; } + void add_rq_wr(Chunk* chunk) + { + if (srq) return; + + std::lock_guard l{lock}; + recv_queue.push_back(chunk); + } + + void remove_rq_wr(Chunk* chunk) { + if (srq) return; + + std::lock_guard l{lock}; + auto it = std::find(recv_queue.begin(), recv_queue.end(), chunk); + ceph_assert(it != recv_queue.end()); + recv_queue.erase(it); + } + ibv_srq* get_srq() const { return srq; } + + private: + CephContext *cct; + Infiniband& infiniband; // Infiniband to which this QP belongs + ibv_qp_type type; // QP type (IBV_QPT_RC, etc.) + ibv_context* ctxt; // device context of the HCA to use + int ib_physical_port; + ibv_pd* pd; // protection domain + ibv_srq* srq; // shared receive queue + ibv_qp* qp; // infiniband verbs QP handle + struct rdma_cm_id *cm_id; + ib_cm_meta_t peer_cm_meta; + ib_cm_meta_t local_cm_meta; + Infiniband::CompletionQueue* txcq; + Infiniband::CompletionQueue* rxcq; + uint32_t initial_psn; // initial packet sequence number + uint32_t max_send_wr; + uint32_t max_recv_wr; + uint32_t q_key; + bool dead; + std::vector<Chunk*> recv_queue; + ceph::mutex lock = ceph::make_mutex("queue_pair_lock"); + }; + + public: + typedef MemoryManager::Cluster Cluster; + typedef MemoryManager::Chunk Chunk; + QueuePair* create_queue_pair(CephContext *c, CompletionQueue*, CompletionQueue*, + ibv_qp_type type, struct rdma_cm_id *cm_id); + ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge); + // post rx buffers to srq, return number of buffers actually posted + int post_chunks_to_rq(int num, QueuePair *qp = nullptr); + void post_chunk_to_pool(Chunk* chunk) { + QueuePair *qp = chunk->get_qp(); + if (qp != nullptr) { + qp->remove_rq_wr(chunk); + } + get_memory_manager()->release_rx_buffer(chunk); + } + int get_tx_buffers(std::vector<Chunk*> &c, size_t bytes); + CompletionChannel *create_comp_channel(CephContext *c); + CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL); + uint8_t get_ib_physical_port() { return ib_physical_port; } + uint16_t get_lid() { return device->get_lid(); } + ibv_gid get_gid() { return device->get_gid(); } + MemoryManager* get_memory_manager() { return memory_manager; } + Device* get_device() { return device; } + int get_async_fd() { return device->ctxt->async_fd; } + bool is_tx_buffer(const char* c) { return memory_manager->is_tx_buffer(c);} + Chunk *get_tx_chunk_by_buffer(const char *c) { return memory_manager->get_tx_chunk_by_buffer(c); } + static const char* wc_status_to_string(int status); + static const char* qp_state_string(int status); + uint32_t get_rx_queue_len() const { return rx_queue_len; } +}; + +#endif diff --git a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc new file mode 100644 index 000000000..6c79dc54f --- /dev/null +++ b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc @@ -0,0 +1,607 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSKY <haomai@xsky.com> + * + * Author: Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ +#include "RDMAStack.h" + +class C_handle_connection_established : public EventCallback { + RDMAConnectedSocketImpl *csi; + bool active = true; + public: + C_handle_connection_established(RDMAConnectedSocketImpl *w) : csi(w) {} + void do_request(uint64_t fd) final { + if (active) + csi->handle_connection_established(); + } + void close() { + active = false; + } +}; + +class C_handle_connection_read : public EventCallback { + RDMAConnectedSocketImpl *csi; + bool active = true; + public: + explicit C_handle_connection_read(RDMAConnectedSocketImpl *w): csi(w) {} + void do_request(uint64_t fd) final { + if (active) + csi->handle_connection(); + } + void close() { + active = false; + } +}; + +#define dout_subsys ceph_subsys_ms +#undef dout_prefix +#define dout_prefix *_dout << " RDMAConnectedSocketImpl " + +RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, std::shared_ptr<Infiniband> &ib, + std::shared_ptr<RDMADispatcher>& rdma_dispatcher, + RDMAWorker *w) + : cct(cct), connected(0), error(0), ib(ib), + dispatcher(rdma_dispatcher), worker(w), + is_server(false), read_handler(new C_handle_connection_read(this)), + established_handler(new C_handle_connection_established(this)), + active(false), pending(false) +{ + if (!cct->_conf->ms_async_rdma_cm) { + qp = ib->create_queue_pair(cct, dispatcher->get_tx_cq(), dispatcher->get_rx_cq(), IBV_QPT_RC, NULL); + if (!qp) { + lderr(cct) << __func__ << " queue pair create failed" << dendl; + return; + } + local_qpn = qp->get_local_qp_number(); + notify_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK); + dispatcher->register_qp(qp, this); + dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair); + dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair); + } +} + +RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl() +{ + ldout(cct, 20) << __func__ << " destruct." << dendl; + cleanup(); + worker->remove_pending_conn(this); + dispatcher->schedule_qp_destroy(local_qpn); + + for (unsigned i=0; i < wc.size(); ++i) { + dispatcher->post_chunk_to_pool(reinterpret_cast<Chunk*>(wc[i].wr_id)); + } + for (unsigned i=0; i < buffers.size(); ++i) { + dispatcher->post_chunk_to_pool(buffers[i]); + } + + std::lock_guard l{lock}; + if (notify_fd >= 0) + ::close(notify_fd); + if (tcp_fd >= 0) + ::close(tcp_fd); + error = ECONNRESET; +} + +void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v) +{ + std::lock_guard l{lock}; + if (wc.empty()) + wc = std::move(v); + else + wc.insert(wc.end(), v.begin(), v.end()); + notify(); +} + +void RDMAConnectedSocketImpl::get_wc(std::vector<ibv_wc> &w) +{ + std::lock_guard l{lock}; + if (wc.empty()) + return ; + w.swap(wc); +} + +int RDMAConnectedSocketImpl::activate() +{ + qp->get_local_cm_meta().peer_qpn = qp->get_peer_cm_meta().local_qpn; + if (qp->modify_qp_to_rtr() != 0) + return -1; + + if (qp->modify_qp_to_rts() != 0) + return -1; + + if (!is_server) { + connected = 1; //indicate successfully + ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << local_qpn << dendl; + submit(false); + } + active = true; + peer_qpn = qp->get_local_cm_meta().peer_qpn; + + return 0; +} + +int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) { + ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:" + << opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl; + ceph::NetHandler net(cct); + + // we construct a socket to transport ib sync message + // but we shouldn't block in tcp connecting + if (opts.nonblock) { + tcp_fd = net.nonblock_connect(peer_addr, opts.connect_bind_addr); + } else { + tcp_fd = net.connect(peer_addr, opts.connect_bind_addr); + } + + if (tcp_fd < 0) { + return -errno; + } + + int r = net.set_socket_options(tcp_fd, opts.nodelay, opts.rcbuf_size); + if (r < 0) { + ::close(tcp_fd); + tcp_fd = -1; + return -errno; + } + + ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl; + net.set_priority(tcp_fd, opts.priority, peer_addr.get_family()); + r = 0; + if (opts.nonblock) { + worker->center.create_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE , established_handler); + } else { + r = handle_connection_established(false); + } + return r; +} + +int RDMAConnectedSocketImpl::handle_connection_established(bool need_set_fault) { + ldout(cct, 20) << __func__ << " start " << dendl; + // delete read event + worker->center.delete_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE); + if (1 == connected) { + ldout(cct, 1) << __func__ << " warnning: logic failed " << dendl; + if (need_set_fault) { + fault(); + } + return -1; + } + // send handshake msg to server + qp->get_local_cm_meta().peer_qpn = 0; + int r = qp->send_cm_meta(cct, tcp_fd); + if (r < 0) { + ldout(cct, 1) << __func__ << " send handshake msg failed." << r << dendl; + if (need_set_fault) { + fault(); + } + return r; + } + worker->center.create_file_event(tcp_fd, EVENT_READABLE, read_handler); + ldout(cct, 20) << __func__ << " finish " << dendl; + return 0; +} + +void RDMAConnectedSocketImpl::handle_connection() { + ldout(cct, 20) << __func__ << " QP: " << local_qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl; + int r = qp->recv_cm_meta(cct, tcp_fd); + if (r <= 0) { + if (r != -EAGAIN) { + dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors); + ldout(cct, 1) << __func__ << " recv handshake msg failed." << dendl; + fault(); + } + return; + } + + if (1 == connected) { + ldout(cct, 1) << __func__ << " warnning: logic failed: read len: " << r << dendl; + fault(); + return; + } + + if (!is_server) {// first time: cm meta sync + ack from server + if (!connected) { + r = activate(); + ceph_assert(!r); + } + notify(); + r = qp->send_cm_meta(cct, tcp_fd); + if (r < 0) { + ldout(cct, 1) << __func__ << " send client ack failed." << dendl; + dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors); + fault(); + } + } else { + if (qp->get_peer_cm_meta().peer_qpn == 0) {// first time: cm meta sync from client + if (active) { + ldout(cct, 10) << __func__ << " server is already active." << dendl; + return ; + } + r = activate(); + ceph_assert(!r); + r = qp->send_cm_meta(cct, tcp_fd); + if (r < 0) { + ldout(cct, 1) << __func__ << " server ack failed." << dendl; + dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors); + fault(); + return ; + } + } else { // second time: cm meta ack from client + connected = 1; + ldout(cct, 10) << __func__ << " handshake of rdma is done. server connected: " << connected << dendl; + //cleanup(); + submit(false); + notify(); + } + } +} + +ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len) +{ + eventfd_t event_val = 0; + int r = eventfd_read(notify_fd, &event_val); + ldout(cct, 20) << __func__ << " notify_fd : " << event_val << " in " << local_qpn + << " r = " << r << dendl; + + if (!active) { + ldout(cct, 1) << __func__ << " when ib not active. len: " << len << dendl; + return -EAGAIN; + } + + if (0 == connected) { + ldout(cct, 1) << __func__ << " when ib not connected. len: " << len <<dendl; + return -EAGAIN; + } + ssize_t read = 0; + read = read_buffers(buf,len); + + if (is_server && connected == 0) { + ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << local_qpn << " peer QP: " << peer_qpn << dendl; + connected = 1; //if so, we don't need the last handshake + cleanup(); + submit(false); + } + + if (!buffers.empty()) { + notify(); + } + + if (read == 0 && error) + return -error; + return read == 0 ? -EAGAIN : read; +} + +void RDMAConnectedSocketImpl::buffer_prefetch(void) +{ + std::vector<ibv_wc> cqe; + get_wc(cqe); + if(cqe.empty()) + return; + + for(size_t i = 0; i < cqe.size(); ++i) { + ibv_wc* response = &cqe[i]; + ceph_assert(response->status == IBV_WC_SUCCESS); + Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id); + chunk->prepare_read(response->byte_len); + + if (chunk->get_size() == 0) { + chunk->reset_read_chunk(); + dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin); + if (connected) { + error = ECONNRESET; + ldout(cct, 20) << __func__ << " got remote close msg..." << dendl; + } + dispatcher->post_chunk_to_pool(chunk); + continue; + } else { + buffers.push_back(chunk); + ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl; + } + } + worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size()); +} + +ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len) +{ + size_t read_size = 0, tmp = 0; + buffer_prefetch(); + auto pchunk = buffers.begin(); + while (pchunk != buffers.end()) { + tmp = (*pchunk)->read(buf + read_size, len - read_size); + read_size += tmp; + ldout(cct, 25) << __func__ << " read chunk " << *pchunk << " bytes length" << tmp << " offset: " + << (*pchunk)->get_offset() << " ,bound: " << (*pchunk)->get_bound() << dendl; + + if ((*pchunk)->get_size() == 0) { + (*pchunk)->reset_read_chunk(); + dispatcher->post_chunk_to_pool(*pchunk); + update_post_backlog(); + ldout(cct, 25) << __func__ << " read over one chunk " << dendl; + pchunk++; + } + + if (read_size == len) { + break; + } + } + + buffers.erase(buffers.begin(), pchunk); + ldout(cct, 25) << __func__ << " got " << read_size << " bytes, buffers size: " << buffers.size() << dendl; + worker->perf_logger->inc(l_msgr_rdma_rx_bytes, read_size); + return read_size; +} + +ssize_t RDMAConnectedSocketImpl::send(ceph::buffer::list &bl, bool more) +{ + if (error) { + if (!active) + return -EPIPE; + return -error; + } + size_t bytes = bl.length(); + if (!bytes) + return 0; + { + std::lock_guard l{lock}; + pending_bl.claim_append(bl); + if (!connected) { + ldout(cct, 20) << __func__ << " fake send to upper, QP: " << local_qpn << dendl; + return bytes; + } + } + ldout(cct, 20) << __func__ << " QP: " << local_qpn << dendl; + ssize_t r = submit(more); + if (r < 0 && r != -EAGAIN) + return r; + return bytes; +} + +size_t RDMAConnectedSocketImpl::tx_copy_chunk(std::vector<Chunk*> &tx_buffers, + size_t req_copy_len, decltype(std::cbegin(pending_bl.buffers()))& start, + const decltype(std::cbegin(pending_bl.buffers()))& end) +{ + ceph_assert(start != end); + auto chunk_idx = tx_buffers.size(); + if (0 == worker->get_reged_mem(this, tx_buffers, req_copy_len)) { + ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl; + worker->perf_logger->inc(l_msgr_rdma_tx_no_mem); + return 0; + } + + Chunk *current_chunk = tx_buffers[chunk_idx]; + size_t write_len = 0; + while (start != end) { + const uintptr_t addr = reinterpret_cast<uintptr_t>(start->c_str()); + + size_t slice_write_len = 0; + while (slice_write_len < start->length()) { + size_t real_len = current_chunk->write((char*)addr + slice_write_len, start->length() - slice_write_len); + + slice_write_len += real_len; + write_len += real_len; + req_copy_len -= real_len; + + if (current_chunk->full()) { + if (++chunk_idx == tx_buffers.size()) + return write_len; + current_chunk = tx_buffers[chunk_idx]; + } + } + + ++start; + } + ceph_assert(req_copy_len == 0); + return write_len; +} + +ssize_t RDMAConnectedSocketImpl::submit(bool more) +{ + if (error) + return -error; + std::lock_guard l{lock}; + size_t bytes = pending_bl.length(); + ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: " + << pending_bl.get_num_buffers() << dendl; + if (!bytes) + return 0; + + std::vector<Chunk*> tx_buffers; + auto it = std::cbegin(pending_bl.buffers()); + auto copy_start = it; + size_t total_copied = 0, wait_copy_len = 0; + while (it != pending_bl.buffers().end()) { + if (ib->is_tx_buffer(it->raw_c_str())) { + if (wait_copy_len) { + size_t copied = tx_copy_chunk(tx_buffers, wait_copy_len, copy_start, it); + total_copied += copied; + if (copied < wait_copy_len) + goto sending; + wait_copy_len = 0; + } + ceph_assert(copy_start == it); + tx_buffers.push_back(ib->get_tx_chunk_by_buffer(it->raw_c_str())); + total_copied += it->length(); + ++copy_start; + } else { + wait_copy_len += it->length(); + } + ++it; + } + if (wait_copy_len) + total_copied += tx_copy_chunk(tx_buffers, wait_copy_len, copy_start, it); + + sending: + if (total_copied == 0) + return -EAGAIN; + ceph_assert(total_copied <= pending_bl.length()); + ceph::buffer::list swapped; + if (total_copied < pending_bl.length()) { + worker->perf_logger->inc(l_msgr_rdma_tx_parital_mem); + pending_bl.splice(total_copied, pending_bl.length() - total_copied, &swapped); + pending_bl.swap(swapped); + } else { + pending_bl.clear(); + } + + ldout(cct, 20) << __func__ << " left bytes: " << pending_bl.length() << " in buffers " + << pending_bl.get_num_buffers() << " tx chunks " << tx_buffers.size() << dendl; + + int r = post_work_request(tx_buffers); + if (r < 0) + return r; + + ldout(cct, 20) << __func__ << " finished sending " << total_copied << " bytes." << dendl; + return pending_bl.length() ? -EAGAIN : 0; +} + +int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers) +{ + ldout(cct, 20) << __func__ << " QP: " << local_qpn << " " << tx_buffers[0] << dendl; + auto current_buffer = tx_buffers.begin(); + ibv_sge isge[tx_buffers.size()]; + uint32_t current_sge = 0; + ibv_send_wr iswr[tx_buffers.size()]; + uint32_t current_swr = 0; + ibv_send_wr* pre_wr = NULL; + uint32_t num = 0; + + // FIPS zeroization audit 20191115: these memsets are not security related. + memset(iswr, 0, sizeof(iswr)); + memset(isge, 0, sizeof(isge)); + + while (current_buffer != tx_buffers.end()) { + isge[current_sge].addr = reinterpret_cast<uint64_t>((*current_buffer)->buffer); + isge[current_sge].length = (*current_buffer)->get_offset(); + isge[current_sge].lkey = (*current_buffer)->mr->lkey; + ldout(cct, 25) << __func__ << " sending buffer: " << *current_buffer << " length: " << isge[current_sge].length << dendl; + + iswr[current_swr].wr_id = reinterpret_cast<uint64_t>(*current_buffer); + iswr[current_swr].next = NULL; + iswr[current_swr].sg_list = &isge[current_sge]; + iswr[current_swr].num_sge = 1; + iswr[current_swr].opcode = IBV_WR_SEND; + iswr[current_swr].send_flags = IBV_SEND_SIGNALED; + + num++; + worker->perf_logger->inc(l_msgr_rdma_tx_bytes, isge[current_sge].length); + if (pre_wr) + pre_wr->next = &iswr[current_swr]; + pre_wr = &iswr[current_swr]; + ++current_sge; + ++current_swr; + ++current_buffer; + } + + ibv_send_wr *bad_tx_work_request = nullptr; + if (ibv_post_send(qp->get_qp(), iswr, &bad_tx_work_request)) { + ldout(cct, 1) << __func__ << " failed to send data" + << " (most probably should be peer not ready): " + << cpp_strerror(errno) << dendl; + worker->perf_logger->inc(l_msgr_rdma_tx_failed); + return -errno; + } + worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size()); + ldout(cct, 20) << __func__ << " qp state is " << get_qp_state() << dendl; + return 0; +} + +void RDMAConnectedSocketImpl::fin() { + ibv_send_wr wr; + // FIPS zeroization audit 20191115: this memset is not security related. + memset(&wr, 0, sizeof(wr)); + + wr.wr_id = reinterpret_cast<uint64_t>(qp); + wr.num_sge = 0; + wr.opcode = IBV_WR_SEND; + wr.send_flags = IBV_SEND_SIGNALED; + ibv_send_wr* bad_tx_work_request = nullptr; + if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) { + ldout(cct, 1) << __func__ << " failed to send message=" + << " ibv_post_send failed(most probably should be peer not ready): " + << cpp_strerror(errno) << dendl; + worker->perf_logger->inc(l_msgr_rdma_tx_failed); + return ; + } +} + +void RDMAConnectedSocketImpl::cleanup() { + if (read_handler && tcp_fd >= 0) { + (static_cast<C_handle_connection_read*>(read_handler))->close(); + worker->center.submit_to(worker->center.get_id(), [this]() { + worker->center.delete_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE); + }, false); + delete read_handler; + read_handler = nullptr; + } + if (established_handler) { + (static_cast<C_handle_connection_established*>(established_handler))->close(); + delete established_handler; + established_handler = nullptr; + } +} + +void RDMAConnectedSocketImpl::notify() +{ + eventfd_t event_val = 1; + int r = eventfd_write(notify_fd, event_val); + ceph_assert(r == 0); +} + +void RDMAConnectedSocketImpl::shutdown() +{ + if (!error) + fin(); + error = ECONNRESET; + active = false; +} + +void RDMAConnectedSocketImpl::close() +{ + if (!error) + fin(); + error = ECONNRESET; + active = false; +} + +void RDMAConnectedSocketImpl::set_priority(int sd, int prio, int domain) { + ceph::NetHandler net(cct); + net.set_priority(sd, prio, domain); +} + +void RDMAConnectedSocketImpl::fault() +{ + ldout(cct, 1) << __func__ << " tcp fd " << tcp_fd << dendl; + error = ECONNRESET; + connected = 1; + notify(); +} + +void RDMAConnectedSocketImpl::set_accept_fd(int sd) +{ + tcp_fd = sd; + is_server = true; + worker->center.submit_to(worker->center.get_id(), [this]() { + worker->center.create_file_event(tcp_fd, EVENT_READABLE, read_handler); + }, true); +} + +void RDMAConnectedSocketImpl::post_chunks_to_rq(int num) +{ + post_backlog += num - ib->post_chunks_to_rq(num, qp); +} + +void RDMAConnectedSocketImpl::update_post_backlog() +{ + if (post_backlog) + post_backlog -= post_backlog - dispatcher->post_chunks_to_rq(post_backlog, qp); +} diff --git a/src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc new file mode 100644 index 000000000..606dbd281 --- /dev/null +++ b/src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc @@ -0,0 +1,183 @@ +#include "RDMAStack.h" + +#define dout_subsys ceph_subsys_ms +#undef dout_prefix +#define dout_prefix *_dout << " RDMAIWARPConnectedSocketImpl " + +#define TIMEOUT_MS 3000 +#define RETRY_COUNT 7 + +RDMAIWARPConnectedSocketImpl::RDMAIWARPConnectedSocketImpl(CephContext *cct, std::shared_ptr<Infiniband>& ib, + std::shared_ptr<RDMADispatcher>& rdma_dispatcher, + RDMAWorker *w, RDMACMInfo *info) + : RDMAConnectedSocketImpl(cct, ib, rdma_dispatcher, w), cm_con_handler(new C_handle_cm_connection(this)) +{ + status = IDLE; + notify_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK); + if (info) { + is_server = true; + cm_id = info->cm_id; + cm_channel = info->cm_channel; + status = RDMA_ID_CREATED; + peer_qpn = info->qp_num; + if (alloc_resource()) { + close_notify(); + return; + } + worker->center.submit_to(worker->center.get_id(), [this]() { + worker->center.create_file_event(cm_channel->fd, EVENT_READABLE, cm_con_handler); + status = CHANNEL_FD_CREATED; + }, false); + status = RESOURCE_ALLOCATED; + qp->get_local_cm_meta().peer_qpn = peer_qpn; + qp->get_peer_cm_meta().local_qpn = peer_qpn; + } else { + is_server = false; + cm_channel = rdma_create_event_channel(); + rdma_create_id(cm_channel, &cm_id, NULL, RDMA_PS_TCP); + status = RDMA_ID_CREATED; + ldout(cct, 20) << __func__ << " successfully created cm id: " << cm_id << dendl; + } +} + +RDMAIWARPConnectedSocketImpl::~RDMAIWARPConnectedSocketImpl() { + ldout(cct, 20) << __func__ << " destruct." << dendl; + std::unique_lock l(close_mtx); + close_condition.wait(l, [&] { return closed; }); + if (status >= RDMA_ID_CREATED) { + rdma_destroy_id(cm_id); + rdma_destroy_event_channel(cm_channel); + } +} + +int RDMAIWARPConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) { + worker->center.create_file_event(cm_channel->fd, EVENT_READABLE, cm_con_handler); + status = CHANNEL_FD_CREATED; + if (rdma_resolve_addr(cm_id, NULL, const_cast<struct sockaddr*>(peer_addr.get_sockaddr()), TIMEOUT_MS)) { + lderr(cct) << __func__ << " failed to resolve addr" << dendl; + return -1; + } + return 0; +} + +void RDMAIWARPConnectedSocketImpl::close() { + error = ECONNRESET; + active = false; + if (status >= CONNECTED) { + rdma_disconnect(cm_id); + } + close_notify(); +} + +void RDMAIWARPConnectedSocketImpl::shutdown() { + error = ECONNRESET; + active = false; +} + +void RDMAIWARPConnectedSocketImpl::handle_cm_connection() { + struct rdma_cm_event *event; + rdma_get_cm_event(cm_channel, &event); + ldout(cct, 20) << __func__ << " event name: " << rdma_event_str(event->event) + << " (cm id: " << cm_id << ")" << dendl; + struct rdma_conn_param cm_params; + switch (event->event) { + case RDMA_CM_EVENT_ADDR_RESOLVED: + status = ADDR_RESOLVED; + if (rdma_resolve_route(cm_id, TIMEOUT_MS)) { + lderr(cct) << __func__ << " failed to resolve rdma addr" << dendl; + notify(); + } + break; + + case RDMA_CM_EVENT_ROUTE_RESOLVED: + status = ROUTE_RESOLVED; + if (alloc_resource()) { + lderr(cct) << __func__ << " failed to alloc resource while resolving the route" << dendl; + connected = -ECONNREFUSED; + notify(); + break; + } + + // FIPS zeroization audit 20191115: this memset is not security related. + memset(&cm_params, 0, sizeof(cm_params)); + cm_params.retry_count = RETRY_COUNT; + cm_params.qp_num = local_qpn; + if (rdma_connect(cm_id, &cm_params)) { + lderr(cct) << __func__ << " failed to connect remote rdma port" << dendl; + connected = -ECONNREFUSED; + notify(); + } + break; + + case RDMA_CM_EVENT_ESTABLISHED: + ldout(cct, 20) << __func__ << " qp_num=" << cm_id->qp->qp_num << dendl; + status = CONNECTED; + if (!is_server) { + peer_qpn = event->param.conn.qp_num; + activate(); + qp->get_local_cm_meta().peer_qpn = peer_qpn; + qp->get_peer_cm_meta().local_qpn = peer_qpn; + notify(); + } + break; + + case RDMA_CM_EVENT_ADDR_ERROR: + case RDMA_CM_EVENT_ROUTE_ERROR: + case RDMA_CM_EVENT_CONNECT_ERROR: + case RDMA_CM_EVENT_UNREACHABLE: + case RDMA_CM_EVENT_REJECTED: + lderr(cct) << __func__ << " rdma connection rejected" << dendl; + connected = -ECONNREFUSED; + notify(); + break; + + case RDMA_CM_EVENT_DISCONNECTED: + status = DISCONNECTED; + close_notify(); + if (!error) { + error = ECONNRESET; + notify(); + } + break; + + case RDMA_CM_EVENT_DEVICE_REMOVAL: + break; + + default: + ceph_abort_msg("unhandled event"); + break; + } + rdma_ack_cm_event(event); +} + +void RDMAIWARPConnectedSocketImpl::activate() { + ldout(cct, 30) << __func__ << dendl; + active = true; + connected = 1; +} + +int RDMAIWARPConnectedSocketImpl::alloc_resource() { + ldout(cct, 30) << __func__ << dendl; + qp = ib->create_queue_pair(cct, dispatcher->get_tx_cq(), + dispatcher->get_rx_cq(), IBV_QPT_RC, cm_id); + if (!qp) { + return -1; + } + local_qpn = qp->get_local_qp_number(); + dispatcher->register_qp(qp, this); + dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair); + dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair); + return 0; +} + +void RDMAIWARPConnectedSocketImpl::close_notify() { + ldout(cct, 30) << __func__ << dendl; + if (status >= CHANNEL_FD_CREATED) { + worker->center.delete_file_event(cm_channel->fd, EVENT_READABLE); + } + std::unique_lock l(close_mtx); + if (!closed) { + closed = true; + close_condition.notify_all(); + } +} diff --git a/src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc b/src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc new file mode 100644 index 000000000..0500b4420 --- /dev/null +++ b/src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc @@ -0,0 +1,119 @@ +#include <poll.h> + +#include "msg/async/net_handler.h" +#include "RDMAStack.h" + +#define dout_subsys ceph_subsys_ms +#undef dout_prefix +#define dout_prefix *_dout << " RDMAIWARPServerSocketImpl " + +RDMAIWARPServerSocketImpl::RDMAIWARPServerSocketImpl( + CephContext *cct, std::shared_ptr<Infiniband>& ib, + std::shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w, + entity_addr_t& a, unsigned addr_slot) + : RDMAServerSocketImpl(cct, ib, rdma_dispatcher, w, a, addr_slot) +{ +} + +int RDMAIWARPServerSocketImpl::listen(entity_addr_t &sa, + const SocketOptions &opt) +{ + ldout(cct, 20) << __func__ << " bind to rdma point" << dendl; + cm_channel = rdma_create_event_channel(); + rdma_create_id(cm_channel, &cm_id, NULL, RDMA_PS_TCP); + ldout(cct, 20) << __func__ << " successfully created cm id: " << cm_id << dendl; + int rc = rdma_bind_addr(cm_id, const_cast<struct sockaddr*>(sa.get_sockaddr())); + if (rc < 0) { + rc = -errno; + ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr() + << " on port " << sa.get_port() << ": " << cpp_strerror(errno) << dendl; + goto err; + } + rc = rdma_listen(cm_id, 128); + if (rc < 0) { + rc = -errno; + ldout(cct, 10) << __func__ << " unable to listen to " << sa.get_sockaddr() + << " on port " << sa.get_port() << ": " << cpp_strerror(errno) << dendl; + goto err; + } + server_setup_socket = cm_channel->fd; + rc = net.set_nonblock(server_setup_socket); + if (rc < 0) { + goto err; + } + ldout(cct, 20) << __func__ << " fd of cm_channel is " << server_setup_socket << dendl; + return 0; + +err: + server_setup_socket = -1; + rdma_destroy_id(cm_id); + rdma_destroy_event_channel(cm_channel); + return rc; +} + +int RDMAIWARPServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, + entity_addr_t *out, Worker *w) +{ + ldout(cct, 15) << __func__ << dendl; + + ceph_assert(sock); + struct pollfd pfd = { + .fd = cm_channel->fd, + .events = POLLIN, + .revents = 0, + }; + int ret = poll(&pfd, 1, 0); + ceph_assert(ret >= 0); + if (!ret) + return -EAGAIN; + + struct rdma_cm_event *cm_event; + rdma_get_cm_event(cm_channel, &cm_event); + ldout(cct, 20) << __func__ << " event name: " << rdma_event_str(cm_event->event) << dendl; + + struct rdma_cm_id *event_cm_id = cm_event->id; + struct rdma_event_channel *event_channel = rdma_create_event_channel(); + + if (net.set_nonblock(event_channel->fd) < 0) { + lderr(cct) << __func__ << " failed to switch event channel to non-block, close event channel " << dendl; + rdma_destroy_event_channel(event_channel); + rdma_ack_cm_event(cm_event); + return -errno; + } + + rdma_migrate_id(event_cm_id, event_channel); + + struct rdma_conn_param *remote_conn_param = &cm_event->param.conn; + struct rdma_conn_param local_conn_param; + + RDMACMInfo info(event_cm_id, event_channel, remote_conn_param->qp_num); + RDMAIWARPConnectedSocketImpl* server = + new RDMAIWARPConnectedSocketImpl(cct, ib, dispatcher, dynamic_cast<RDMAWorker*>(w), &info); + + // FIPS zeroization audit 20191115: this memset is not security related. + memset(&local_conn_param, 0, sizeof(local_conn_param)); + local_conn_param.qp_num = server->get_local_qpn(); + + if (rdma_accept(event_cm_id, &local_conn_param)) { + return -EAGAIN; + } + server->activate(); + ldout(cct, 20) << __func__ << " accepted a new QP" << dendl; + + rdma_ack_cm_event(cm_event); + + std::unique_ptr<RDMAConnectedSocketImpl> csi(server); + *sock = ConnectedSocket(std::move(csi)); + struct sockaddr *addr = &event_cm_id->route.addr.dst_addr; + out->set_sockaddr(addr); + + return 0; +} + +void RDMAIWARPServerSocketImpl::abort_accept() +{ + if (server_setup_socket >= 0) { + rdma_destroy_id(cm_id); + rdma_destroy_event_channel(cm_channel); + } +} diff --git a/src/msg/async/rdma/RDMAServerSocketImpl.cc b/src/msg/async/rdma/RDMAServerSocketImpl.cc new file mode 100644 index 000000000..665faa931 --- /dev/null +++ b/src/msg/async/rdma/RDMAServerSocketImpl.cc @@ -0,0 +1,135 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSKY <haomai@xsky.com> + * + * Author: Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "msg/async/net_handler.h" +#include "RDMAStack.h" + +#include "include/compat.h" +#include "include/sock_compat.h" + +#define dout_subsys ceph_subsys_ms +#undef dout_prefix +#define dout_prefix *_dout << " RDMAServerSocketImpl " + +RDMAServerSocketImpl::RDMAServerSocketImpl( + CephContext *cct, std::shared_ptr<Infiniband>& ib, + std::shared_ptr<RDMADispatcher>& rdma_dispatcher, + RDMAWorker *w, entity_addr_t& a, unsigned slot) + : ServerSocketImpl(a.get_type(), slot), + cct(cct), net(cct), server_setup_socket(-1), ib(ib), + dispatcher(rdma_dispatcher), worker(w), sa(a) +{ +} + +int RDMAServerSocketImpl::listen(entity_addr_t &sa, const SocketOptions &opt) +{ + int rc = 0; + server_setup_socket = net.create_socket(sa.get_family(), true); + if (server_setup_socket < 0) { + rc = -errno; + lderr(cct) << __func__ << " failed to create server socket: " + << cpp_strerror(errno) << dendl; + return rc; + } + + rc = net.set_nonblock(server_setup_socket); + if (rc < 0) { + goto err; + } + + rc = net.set_socket_options(server_setup_socket, opt.nodelay, opt.rcbuf_size); + if (rc < 0) { + goto err; + } + + rc = ::bind(server_setup_socket, sa.get_sockaddr(), sa.get_sockaddr_len()); + if (rc < 0) { + rc = -errno; + ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr() + << " on port " << sa.get_port() << ": " << cpp_strerror(errno) << dendl; + goto err; + } + + rc = ::listen(server_setup_socket, cct->_conf->ms_tcp_listen_backlog); + if (rc < 0) { + rc = -errno; + lderr(cct) << __func__ << " unable to listen on " << sa << ": " << cpp_strerror(errno) << dendl; + goto err; + } + + ldout(cct, 20) << __func__ << " bind to " << sa.get_sockaddr() << " on port " << sa.get_port() << dendl; + return 0; + +err: + ::close(server_setup_socket); + server_setup_socket = -1; + return rc; +} + +int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) +{ + ldout(cct, 15) << __func__ << dendl; + + ceph_assert(sock); + + sockaddr_storage ss; + socklen_t slen = sizeof(ss); + int sd = accept_cloexec(server_setup_socket, (sockaddr*)&ss, &slen); + if (sd < 0) { + return -errno; + } + + int r = net.set_nonblock(sd); + if (r < 0) { + ::close(sd); + return -errno; + } + + r = net.set_socket_options(sd, opt.nodelay, opt.rcbuf_size); + if (r < 0) { + ::close(sd); + return -errno; + } + + ceph_assert(NULL != out); //out should not be NULL in accept connection + + out->set_type(addr_type); + out->set_sockaddr((sockaddr*)&ss); + net.set_priority(sd, opt.priority, out->get_family()); + + RDMAConnectedSocketImpl* server; + //Worker* w = dispatcher->get_stack()->get_worker(); + server = new RDMAConnectedSocketImpl(cct, ib, dispatcher, dynamic_cast<RDMAWorker*>(w)); + if (!server->get_qp()) { + lderr(cct) << __func__ << " server->qp is null" << dendl; + // cann't use delete server here, destructor will fail. + server->cleanup(); + ::close(sd); + return -1; + } + server->set_accept_fd(sd); + ldout(cct, 20) << __func__ << " accepted a new QP, tcp_fd: " << sd << dendl; + std::unique_ptr<RDMAConnectedSocketImpl> csi(server); + *sock = ConnectedSocket(std::move(csi)); + + return 0; +} + +void RDMAServerSocketImpl::abort_accept() +{ + if (server_setup_socket >= 0) + ::close(server_setup_socket); +} diff --git a/src/msg/async/rdma/RDMAStack.cc b/src/msg/async/rdma/RDMAStack.cc new file mode 100644 index 000000000..12db599d6 --- /dev/null +++ b/src/msg/async/rdma/RDMAStack.cc @@ -0,0 +1,812 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSKY <haomai@xsky.com> + * + * Author: Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include <poll.h> +#include <errno.h> +#include <sys/time.h> +#include <sys/resource.h> + +#include "include/str_list.h" +#include "include/compat.h" +#include "common/Cycles.h" +#include "common/deleter.h" +#include "RDMAStack.h" + +#define dout_subsys ceph_subsys_ms +#undef dout_prefix +#define dout_prefix *_dout << "RDMAStack " + +RDMADispatcher::~RDMADispatcher() +{ + ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl; + polling_stop(); + + ceph_assert(qp_conns.empty()); + ceph_assert(num_qp_conn == 0); + ceph_assert(dead_queue_pairs.empty()); +} + +RDMADispatcher::RDMADispatcher(CephContext* c, std::shared_ptr<Infiniband>& ib) + : cct(c), ib(ib) +{ + PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last); + + plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether dispatcher thread is polling"); + plb.add_u64_counter(l_msgr_rdma_inflight_tx_chunks, "inflight_tx_chunks", "The number of inflight tx chunks"); + plb.add_u64_counter(l_msgr_rdma_rx_bufs_in_use, "rx_bufs_in_use", "The number of rx buffers that are holding data and being processed"); + plb.add_u64_counter(l_msgr_rdma_rx_bufs_total, "rx_bufs_total", "The total number of rx buffers"); + + plb.add_u64_counter(l_msgr_rdma_tx_total_wc, "tx_total_wc", "The number of tx work comletions"); + plb.add_u64_counter(l_msgr_rdma_tx_total_wc_errors, "tx_total_wc_errors", "The number of tx errors"); + plb.add_u64_counter(l_msgr_rdma_tx_wc_retry_errors, "tx_retry_errors", "The number of tx retry errors"); + plb.add_u64_counter(l_msgr_rdma_tx_wc_wr_flush_errors, "tx_wr_flush_errors", "The number of tx work request flush errors"); + + plb.add_u64_counter(l_msgr_rdma_rx_total_wc, "rx_total_wc", "The number of total rx work completion"); + plb.add_u64_counter(l_msgr_rdma_rx_total_wc_errors, "rx_total_wc_errors", "The number of total rx error work completion"); + plb.add_u64_counter(l_msgr_rdma_rx_fin, "rx_fin", "The number of rx finish work request"); + + plb.add_u64_counter(l_msgr_rdma_total_async_events, "total_async_events", "The number of async events"); + plb.add_u64_counter(l_msgr_rdma_async_last_wqe_events, "async_last_wqe_events", "The number of last wqe events"); + + plb.add_u64_counter(l_msgr_rdma_handshake_errors, "handshake_errors", "The number of handshake errors"); + + + plb.add_u64_counter(l_msgr_rdma_created_queue_pair, "created_queue_pair", "Active queue pair number"); + plb.add_u64_counter(l_msgr_rdma_active_queue_pair, "active_queue_pair", "Created queue pair number"); + + perf_logger = plb.create_perf_counters(); + cct->get_perfcounters_collection()->add(perf_logger); + Cycles::init(); +} + +void RDMADispatcher::polling_start() +{ + // take lock because listen/connect can happen from different worker threads + std::lock_guard l{lock}; + + if (t.joinable()) + return; // dispatcher thread already running + + ib->get_memory_manager()->set_rx_stat_logger(perf_logger); + + tx_cc = ib->create_comp_channel(cct); + ceph_assert(tx_cc); + rx_cc = ib->create_comp_channel(cct); + ceph_assert(rx_cc); + tx_cq = ib->create_comp_queue(cct, tx_cc); + ceph_assert(tx_cq); + rx_cq = ib->create_comp_queue(cct, rx_cc); + ceph_assert(rx_cq); + + t = std::thread(&RDMADispatcher::polling, this); + ceph_pthread_setname(t.native_handle(), "rdma-polling"); +} + +void RDMADispatcher::polling_stop() +{ + { + std::lock_guard l{lock}; + done = true; + } + + if (!t.joinable()) + return; + + t.join(); + + tx_cc->ack_events(); + rx_cc->ack_events(); + delete tx_cq; + delete rx_cq; + delete tx_cc; + delete rx_cc; +} + +void RDMADispatcher::handle_async_event() +{ + ldout(cct, 30) << __func__ << dendl; + while (1) { + ibv_async_event async_event; + if (ibv_get_async_event(ib->get_device()->ctxt, &async_event)) { + if (errno != EAGAIN) + lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno + << " " << cpp_strerror(errno) << ")" << dendl; + return; + } + perf_logger->inc(l_msgr_rdma_total_async_events); + ldout(cct, 1) << __func__ << "Event : " << ibv_event_type_str(async_event.event_type) << dendl; + + switch (async_event.event_type) { + /***********************CQ events********************/ + case IBV_EVENT_CQ_ERR: + lderr(cct) << __func__ << " Fatal Error, effect all QP bound with same CQ, " + << " CQ Overflow, dev = " << ib->get_device()->ctxt + << " Need destroy and recreate resource " << dendl; + break; + /***********************QP events********************/ + case IBV_EVENT_QP_FATAL: + { + /* Error occurred on a QP and it transitioned to error state */ + ibv_qp* ib_qp = async_event.element.qp; + uint32_t qpn = ib_qp->qp_num; + QueuePair* qp = get_qp(qpn); + lderr(cct) << __func__ << " Fatal Error, event associate qp number: " << qpn + << " Queue Pair status: " << Infiniband::qp_state_string(qp->get_state()) + << " Event : " << ibv_event_type_str(async_event.event_type) << dendl; + } + break; + case IBV_EVENT_QP_LAST_WQE_REACHED: + { + /* + * 1. The QP bound with SRQ is in IBV_QPS_ERR state & no more WQE on the RQ of the QP + * Reason: QP is force switched into Error before posting Beacon WR. + * The QP's WRs will be flushed into CQ with IBV_WC_WR_FLUSH_ERR status + * For SRQ, only WRs on the QP which is switched into Error status will be flushed. + * Handle: Only confirm that qp enter into dead queue pairs + * 2. The CQE with error was generated for the last WQE + * Handle: output error log + */ + perf_logger->inc(l_msgr_rdma_async_last_wqe_events); + ibv_qp* ib_qp = async_event.element.qp; + uint32_t qpn = ib_qp->qp_num; + std::lock_guard l{lock}; + RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn); + QueuePair* qp = get_qp_lockless(qpn); + + if (qp && !qp->is_dead()) { + lderr(cct) << __func__ << " QP not dead, event associate qp number: " << qpn + << " Queue Pair status: " << Infiniband::qp_state_string(qp->get_state()) + << " Event : " << ibv_event_type_str(async_event.event_type) << dendl; + } + if (!conn) { + ldout(cct, 20) << __func__ << " Connection's QP maybe entered into dead status. " + << " qp number: " << qpn << dendl; + } else { + conn->fault(); + if (qp) { + if (!cct->_conf->ms_async_rdma_cm) { + enqueue_dead_qp_lockless(qpn); + } + } + } + } + break; + case IBV_EVENT_QP_REQ_ERR: + /* Invalid Request Local Work Queue Error */ + [[fallthrough]]; + case IBV_EVENT_QP_ACCESS_ERR: + /* Local access violation error */ + [[fallthrough]]; + case IBV_EVENT_COMM_EST: + /* Communication was established on a QP */ + [[fallthrough]]; + case IBV_EVENT_SQ_DRAINED: + /* Send Queue was drained of outstanding messages in progress */ + [[fallthrough]]; + case IBV_EVENT_PATH_MIG: + /* A connection has migrated to the alternate path */ + [[fallthrough]]; + case IBV_EVENT_PATH_MIG_ERR: + /* A connection failed to migrate to the alternate path */ + break; + /***********************SRQ events*******************/ + case IBV_EVENT_SRQ_ERR: + /* Error occurred on an SRQ */ + [[fallthrough]]; + case IBV_EVENT_SRQ_LIMIT_REACHED: + /* SRQ limit was reached */ + break; + /***********************Port events******************/ + case IBV_EVENT_PORT_ACTIVE: + /* Link became active on a port */ + [[fallthrough]]; + case IBV_EVENT_PORT_ERR: + /* Link became unavailable on a port */ + [[fallthrough]]; + case IBV_EVENT_LID_CHANGE: + /* LID was changed on a port */ + [[fallthrough]]; + case IBV_EVENT_PKEY_CHANGE: + /* P_Key table was changed on a port */ + [[fallthrough]]; + case IBV_EVENT_SM_CHANGE: + /* SM was changed on a port */ + [[fallthrough]]; + case IBV_EVENT_CLIENT_REREGISTER: + /* SM sent a CLIENT_REREGISTER request to a port */ + [[fallthrough]]; + case IBV_EVENT_GID_CHANGE: + /* GID table was changed on a port */ + break; + + /***********************CA events******************/ + //CA events: + case IBV_EVENT_DEVICE_FATAL: + /* CA is in FATAL state */ + lderr(cct) << __func__ << " ibv_get_async_event: dev = " << ib->get_device()->ctxt + << " evt: " << ibv_event_type_str(async_event.event_type) << dendl; + break; + default: + lderr(cct) << __func__ << " ibv_get_async_event: dev = " << ib->get_device()->ctxt + << " unknown event: " << async_event.event_type << dendl; + break; + } + ibv_ack_async_event(&async_event); + } +} + +void RDMADispatcher::post_chunk_to_pool(Chunk* chunk) +{ + std::lock_guard l{lock}; + ib->post_chunk_to_pool(chunk); + perf_logger->dec(l_msgr_rdma_rx_bufs_in_use); +} + +int RDMADispatcher::post_chunks_to_rq(int num, QueuePair *qp) +{ + std::lock_guard l{lock}; + return ib->post_chunks_to_rq(num, qp); +} + +void RDMADispatcher::polling() +{ + static int MAX_COMPLETIONS = 32; + ibv_wc wc[MAX_COMPLETIONS]; + + std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled; + std::vector<ibv_wc> tx_cqe; + ldout(cct, 20) << __func__ << " going to poll tx cq: " << tx_cq << " rx cq: " << rx_cq << dendl; + uint64_t last_inactive = Cycles::rdtsc(); + bool rearmed = false; + int r = 0; + + while (true) { + int tx_ret = tx_cq->poll_cq(MAX_COMPLETIONS, wc); + if (tx_ret > 0) { + ldout(cct, 20) << __func__ << " tx completion queue got " << tx_ret + << " responses."<< dendl; + handle_tx_event(wc, tx_ret); + } + + int rx_ret = rx_cq->poll_cq(MAX_COMPLETIONS, wc); + if (rx_ret > 0) { + ldout(cct, 20) << __func__ << " rx completion queue got " << rx_ret + << " responses."<< dendl; + handle_rx_event(wc, rx_ret); + } + + if (!tx_ret && !rx_ret) { + perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight); + // + // Clean up dead QPs when rx/tx CQs are in idle. The thing is that + // we can destroy QPs even earlier, just when beacon has been received, + // but we have two CQs (rx & tx), thus beacon WC can be poped from tx + // CQ before other WCs are fully consumed from rx CQ. For safety, we + // wait for beacon and then "no-events" from CQs. + // + // Calling size() on vector without locks is totally fine, since we + // use it as a hint (accuracy is not important here) + // + if (!dead_queue_pairs.empty()) { + decltype(dead_queue_pairs) dead_qps; + { + std::lock_guard l{lock}; + dead_queue_pairs.swap(dead_qps); + } + + for (auto& qp: dead_qps) { + perf_logger->dec(l_msgr_rdma_active_queue_pair); + ldout(cct, 10) << __func__ << " finally delete qp = " << qp << dendl; + delete qp; + } + } + + if (!num_qp_conn && done && dead_queue_pairs.empty()) + break; + + uint64_t now = Cycles::rdtsc(); + if (Cycles::to_microseconds(now - last_inactive) > cct->_conf->ms_async_rdma_polling_us) { + handle_async_event(); + if (!rearmed) { + // Clean up cq events after rearm notify ensure no new incoming event + // arrived between polling and rearm + tx_cq->rearm_notify(); + rx_cq->rearm_notify(); + rearmed = true; + continue; + } + + struct pollfd channel_poll[2]; + channel_poll[0].fd = tx_cc->get_fd(); + channel_poll[0].events = POLLIN; + channel_poll[0].revents = 0; + channel_poll[1].fd = rx_cc->get_fd(); + channel_poll[1].events = POLLIN; + channel_poll[1].revents = 0; + r = 0; + perf_logger->set(l_msgr_rdma_polling, 0); + while (!done && r == 0) { + r = TEMP_FAILURE_RETRY(poll(channel_poll, 2, 100)); + if (r < 0) { + r = -errno; + lderr(cct) << __func__ << " poll failed " << r << dendl; + ceph_abort(); + } + } + if (r > 0 && tx_cc->get_cq_event()) + ldout(cct, 20) << __func__ << " got tx cq event." << dendl; + if (r > 0 && rx_cc->get_cq_event()) + ldout(cct, 20) << __func__ << " got rx cq event." << dendl; + last_inactive = Cycles::rdtsc(); + perf_logger->set(l_msgr_rdma_polling, 1); + rearmed = false; + } + } + } +} + +void RDMADispatcher::notify_pending_workers() { + if (num_pending_workers) { + RDMAWorker *w = nullptr; + { + std::lock_guard l{w_lock}; + if (!pending_workers.empty()) { + w = pending_workers.front(); + pending_workers.pop_front(); + --num_pending_workers; + } + } + if (w) + w->notify_worker(); + } +} + +void RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi) +{ + std::lock_guard l{lock}; + ceph_assert(!qp_conns.count(qp->get_local_qp_number())); + qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi); + ++num_qp_conn; +} + +RDMAConnectedSocketImpl* RDMADispatcher::get_conn_lockless(uint32_t qp) +{ + auto it = qp_conns.find(qp); + if (it == qp_conns.end()) + return nullptr; + if (it->second.first->is_dead()) + return nullptr; + return it->second.second; +} + +Infiniband::QueuePair* RDMADispatcher::get_qp_lockless(uint32_t qp) +{ + // Try to find the QP in qp_conns firstly. + auto it = qp_conns.find(qp); + if (it != qp_conns.end()) + return it->second.first; + + // Try again in dead_queue_pairs. + for (auto &i: dead_queue_pairs) + if (i->get_local_qp_number() == qp) + return i; + + return nullptr; +} + +Infiniband::QueuePair* RDMADispatcher::get_qp(uint32_t qp) +{ + std::lock_guard l{lock}; + return get_qp_lockless(qp); +} + +void RDMADispatcher::enqueue_dead_qp_lockless(uint32_t qpn) +{ + auto it = qp_conns.find(qpn); + if (it == qp_conns.end()) { + lderr(cct) << __func__ << " QP [" << qpn << "] is not registered." << dendl; + return ; + } + QueuePair *qp = it->second.first; + dead_queue_pairs.push_back(qp); + qp_conns.erase(it); + --num_qp_conn; +} + +void RDMADispatcher::enqueue_dead_qp(uint32_t qpn) +{ + std::lock_guard l{lock}; + enqueue_dead_qp_lockless(qpn); +} + +void RDMADispatcher::schedule_qp_destroy(uint32_t qpn) +{ + std::lock_guard l{lock}; + auto it = qp_conns.find(qpn); + if (it == qp_conns.end()) { + lderr(cct) << __func__ << " QP [" << qpn << "] is not registered." << dendl; + return; + } + QueuePair *qp = it->second.first; + if (qp->to_dead()) { + // + // Failed to switch to dead. This is abnormal, but we can't + // do anything, so just destroy QP. + // + dead_queue_pairs.push_back(qp); + qp_conns.erase(it); + --num_qp_conn; + } else { + // + // Successfully switched to dead, thus keep entry in the map. + // But only zero out socked pointer in order to return null from + // get_conn_lockless(); + it->second.second = nullptr; + } +} + +void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n) +{ + std::vector<Chunk*> tx_chunks; + + for (int i = 0; i < n; ++i) { + ibv_wc* response = &cqe[i]; + + // If it's beacon WR, enqueue the QP to be destroyed later + if (response->wr_id == BEACON_WRID) { + enqueue_dead_qp(response->qp_num); + continue; + } + + ldout(cct, 20) << __func__ << " QP number: " << response->qp_num << " len: " << response->byte_len + << " status: " << ib->wc_status_to_string(response->status) << dendl; + + if (response->status != IBV_WC_SUCCESS) { + switch(response->status) { + case IBV_WC_RETRY_EXC_ERR: + { + perf_logger->inc(l_msgr_rdma_tx_wc_retry_errors); + + ldout(cct, 1) << __func__ << " Responder ACK timeout, possible disconnect, or Remote QP in bad state " + << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status) + << " WCE QP number " << response->qp_num << " Opcode " << response->opcode + << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl; + + std::lock_guard l{lock}; + RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num); + if (conn) { + ldout(cct, 1) << __func__ << " SQ WR return error, remote Queue Pair, qp number: " + << conn->get_peer_qpn() << dendl; + } + } + break; + case IBV_WC_WR_FLUSH_ERR: + { + perf_logger->inc(l_msgr_rdma_tx_wc_wr_flush_errors); + + std::lock_guard l{lock}; + QueuePair *qp = get_qp_lockless(response->qp_num); + if (qp) { + ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl; + } + if (qp && qp->is_dead()) { + ldout(cct, 20) << __func__ << " outstanding SQ WR is flushed into CQ since QueuePair is dead " << dendl; + } else { + lderr(cct) << __func__ << " Invalid/Unsupported request to consume outstanding SQ WR," + << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status) + << " WCE QP number " << response->qp_num << " Opcode " << response->opcode + << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl; + + RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num); + if (conn) { + ldout(cct, 1) << __func__ << " SQ WR return error, remote Queue Pair, qp number: " + << conn->get_peer_qpn() << dendl; + } + } + } + break; + + default: + { + lderr(cct) << __func__ << " SQ WR return error," + << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status) + << " WCE QP number " << response->qp_num << " Opcode " << response->opcode + << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl; + + std::lock_guard l{lock}; + RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num); + if (conn && conn->is_connected()) { + ldout(cct, 20) << __func__ << " SQ WR return error Queue Pair error state is : " << conn->get_qp_state() + << " remote Queue Pair, qp number: " << conn->get_peer_qpn() << dendl; + conn->fault(); + } else { + ldout(cct, 1) << __func__ << " Disconnected, qp_num = " << response->qp_num << " discard event" << dendl; + } + } + break; + } + } + + auto chunk = reinterpret_cast<Chunk *>(response->wr_id); + //TX completion may come either from + // 1) regular send message, WCE wr_id points to chunk + // 2) 'fin' message, wr_id points to the QP + if (ib->get_memory_manager()->is_valid_chunk(chunk)) { + tx_chunks.push_back(chunk); + } else if (reinterpret_cast<QueuePair*>(response->wr_id)->get_local_qp_number() == response->qp_num ) { + ldout(cct, 1) << __func__ << " sending of the disconnect msg completed" << dendl; + } else { + ldout(cct, 1) << __func__ << " not tx buffer, chunk " << chunk << dendl; + ceph_abort(); + } + } + + perf_logger->inc(l_msgr_rdma_tx_total_wc, n); + post_tx_buffer(tx_chunks); +} + +/** + * Add the given Chunks to the given free queue. + * + * \param[in] chunks + * The Chunks to enqueue. + * \return + * 0 if success or -1 for failure + */ +void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks) +{ + if (chunks.empty()) + return ; + + inflight -= chunks.size(); + ib->get_memory_manager()->return_tx(chunks); + ldout(cct, 30) << __func__ << " release " << chunks.size() + << " chunks, inflight " << inflight << dendl; + notify_pending_workers(); +} + +void RDMADispatcher::handle_rx_event(ibv_wc *cqe, int rx_number) +{ + perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_number); + perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_number); + + std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled; + std::lock_guard l{lock};//make sure connected socket alive when pass wc + + for (int i = 0; i < rx_number; ++i) { + ibv_wc* response = &cqe[i]; + Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id); + RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num); + QueuePair *qp = get_qp_lockless(response->qp_num); + + switch (response->status) { + case IBV_WC_SUCCESS: + ceph_assert(response->opcode == IBV_WC_RECV); + if (!conn) { + ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk 0x" + << std::hex << chunk << " will be back." << std::dec << dendl; + ib->post_chunk_to_pool(chunk); + perf_logger->dec(l_msgr_rdma_rx_bufs_in_use); + } else { + conn->post_chunks_to_rq(1); + polled[conn].push_back(*response); + + if (qp != nullptr && !qp->get_srq()) { + qp->remove_rq_wr(chunk); + chunk->clear_qp(); + } + } + break; + + case IBV_WC_WR_FLUSH_ERR: + perf_logger->inc(l_msgr_rdma_rx_total_wc_errors); + + if (qp) { + ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl; + } + if (qp && qp->is_dead()) { + ldout(cct, 20) << __func__ << " outstanding RQ WR is flushed into CQ since QueuePair is dead " << dendl; + } else { + ldout(cct, 1) << __func__ << " RQ WR return error," + << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status) + << " WCE QP number " << response->qp_num << " Opcode " << response->opcode + << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl; + if (conn) { + ldout(cct, 1) << __func__ << " RQ WR return error, remote Queue Pair, qp number: " + << conn->get_peer_qpn() << dendl; + } + } + + ib->post_chunk_to_pool(chunk); + perf_logger->dec(l_msgr_rdma_rx_bufs_in_use); + break; + + default: + perf_logger->inc(l_msgr_rdma_rx_total_wc_errors); + + ldout(cct, 1) << __func__ << " RQ WR return error," + << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status) + << " WCE QP number " << response->qp_num << " Opcode " << response->opcode + << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl; + if (conn && conn->is_connected()) + conn->fault(); + + ib->post_chunk_to_pool(chunk); + perf_logger->dec(l_msgr_rdma_rx_bufs_in_use); + break; + } + } + + for (auto &i : polled) + i.first->pass_wc(std::move(i.second)); + polled.clear(); +} + +RDMAWorker::RDMAWorker(CephContext *c, unsigned worker_id) + : Worker(c, worker_id), + tx_handler(new C_handle_cq_tx(this)) +{ + // initialize perf_logger + char name[128]; + sprintf(name, "AsyncMessenger::RDMAWorker-%u", id); + PerfCountersBuilder plb(cct, name, l_msgr_rdma_first, l_msgr_rdma_last); + + plb.add_u64_counter(l_msgr_rdma_tx_no_mem, "tx_no_mem", "The count of no tx buffer"); + plb.add_u64_counter(l_msgr_rdma_tx_parital_mem, "tx_parital_mem", "The count of parital tx buffer"); + plb.add_u64_counter(l_msgr_rdma_tx_failed, "tx_failed_post", "The number of tx failed posted"); + + plb.add_u64_counter(l_msgr_rdma_tx_chunks, "tx_chunks", "The number of tx chunks transmitted"); + plb.add_u64_counter(l_msgr_rdma_tx_bytes, "tx_bytes", "The bytes of tx chunks transmitted", NULL, 0, unit_t(UNIT_BYTES)); + plb.add_u64_counter(l_msgr_rdma_rx_chunks, "rx_chunks", "The number of rx chunks transmitted"); + plb.add_u64_counter(l_msgr_rdma_rx_bytes, "rx_bytes", "The bytes of rx chunks transmitted", NULL, 0, unit_t(UNIT_BYTES)); + plb.add_u64_counter(l_msgr_rdma_pending_sent_conns, "pending_sent_conns", "The count of pending sent conns"); + + perf_logger = plb.create_perf_counters(); + cct->get_perfcounters_collection()->add(perf_logger); +} + +RDMAWorker::~RDMAWorker() +{ + delete tx_handler; +} + +void RDMAWorker::initialize() +{ + ceph_assert(dispatcher); +} + +int RDMAWorker::listen(entity_addr_t &sa, unsigned addr_slot, + const SocketOptions &opt,ServerSocket *sock) +{ + ib->init(); + dispatcher->polling_start(); + + RDMAServerSocketImpl *p; + if (cct->_conf->ms_async_rdma_type == "iwarp") { + p = new RDMAIWARPServerSocketImpl(cct, ib, dispatcher, this, sa, addr_slot); + } else { + p = new RDMAServerSocketImpl(cct, ib, dispatcher, this, sa, addr_slot); + } + int r = p->listen(sa, opt); + if (r < 0) { + delete p; + return r; + } + + *sock = ServerSocket(std::unique_ptr<ServerSocketImpl>(p)); + return 0; +} + +int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) +{ + ib->init(); + dispatcher->polling_start(); + + RDMAConnectedSocketImpl* p; + if (cct->_conf->ms_async_rdma_type == "iwarp") { + p = new RDMAIWARPConnectedSocketImpl(cct, ib, dispatcher, this); + } else { + p = new RDMAConnectedSocketImpl(cct, ib, dispatcher, this); + } + int r = p->try_connect(addr, opts); + + if (r < 0) { + ldout(cct, 1) << __func__ << " try connecting failed." << dendl; + delete p; + return r; + } + std::unique_ptr<RDMAConnectedSocketImpl> csi(p); + *socket = ConnectedSocket(std::move(csi)); + return 0; +} + +int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes) +{ + ceph_assert(center.in_thread()); + int r = ib->get_tx_buffers(c, bytes); + size_t got = ib->get_memory_manager()->get_tx_buffer_size() * r; + ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered bytes, inflight " << dispatcher->inflight << dendl; + dispatcher->inflight += r; + if (got >= bytes) + return r; + + if (o) { + if (!o->is_pending()) { + pending_sent_conns.push_back(o); + perf_logger->inc(l_msgr_rdma_pending_sent_conns, 1); + o->set_pending(1); + } + dispatcher->make_pending_worker(this); + } + return r; +} + + +void RDMAWorker::handle_pending_message() +{ + ldout(cct, 20) << __func__ << " pending conns " << pending_sent_conns.size() << dendl; + while (!pending_sent_conns.empty()) { + RDMAConnectedSocketImpl *o = pending_sent_conns.front(); + pending_sent_conns.pop_front(); + ssize_t r = o->submit(false); + ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl; + if (r < 0) { + if (r == -EAGAIN) { + pending_sent_conns.push_back(o); + dispatcher->make_pending_worker(this); + return ; + } + o->fault(); + } + o->set_pending(0); + perf_logger->dec(l_msgr_rdma_pending_sent_conns, 1); + } + dispatcher->notify_pending_workers(); +} + +RDMAStack::RDMAStack(CephContext *cct) + : NetworkStack(cct), ib(std::make_shared<Infiniband>(cct)), + rdma_dispatcher(std::make_shared<RDMADispatcher>(cct, ib)) +{ + ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl; + ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << rdma_dispatcher.get() << dendl; +} + +RDMAStack::~RDMAStack() +{ + if (cct->_conf->ms_async_rdma_enable_hugepage) { + unsetenv("RDMAV_HUGEPAGES_SAFE"); //remove env variable on destruction + } +} + +Worker* RDMAStack::create_worker(CephContext *c, unsigned worker_id) +{ + auto w = new RDMAWorker(c, worker_id); + w->set_dispatcher(rdma_dispatcher); + w->set_ib(ib); + return w; +} + +void RDMAStack::spawn_worker(std::function<void ()> &&func) +{ + threads.emplace_back(std::move(func)); +} + +void RDMAStack::join_worker(unsigned i) +{ + ceph_assert(threads.size() > i && threads[i].joinable()); + threads[i].join(); +} diff --git a/src/msg/async/rdma/RDMAStack.h b/src/msg/async/rdma/RDMAStack.h new file mode 100644 index 000000000..a36fd5fb7 --- /dev/null +++ b/src/msg/async/rdma/RDMAStack.h @@ -0,0 +1,344 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSKY <haomai@xsky.com> + * + * Author: Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_MSG_RDMASTACK_H +#define CEPH_MSG_RDMASTACK_H + +#include <sys/eventfd.h> + +#include <list> +#include <vector> +#include <thread> + +#include "common/ceph_context.h" +#include "common/debug.h" +#include "common/errno.h" +#include "msg/async/Stack.h" +#include "Infiniband.h" + +class RDMAConnectedSocketImpl; +class RDMAServerSocketImpl; +class RDMAStack; +class RDMAWorker; + +class RDMADispatcher { + typedef Infiniband::MemoryManager::Chunk Chunk; + typedef Infiniband::QueuePair QueuePair; + + std::thread t; + CephContext *cct; + std::shared_ptr<Infiniband> ib; + Infiniband::CompletionQueue* tx_cq = nullptr; + Infiniband::CompletionQueue* rx_cq = nullptr; + Infiniband::CompletionChannel *tx_cc = nullptr, *rx_cc = nullptr; + bool done = false; + std::atomic<uint64_t> num_qp_conn = {0}; + // protect `qp_conns`, `dead_queue_pairs` + ceph::mutex lock = ceph::make_mutex("RDMADispatcher::lock"); + // qp_num -> InfRcConnection + // The main usage of `qp_conns` is looking up connection by qp_num, + // so the lifecycle of element in `qp_conns` is the lifecycle of qp. + //// make qp queue into dead state + /** + * 1. Connection call mark_down + * 2. Move the Queue Pair into the Error state(QueuePair::to_dead) + * 3. Post a beacon + * 4. Wait for beacon which indicates queues are drained + * 5. Destroy the QP by calling ibv_destroy_qp() + * + * @param qp The qp needed to dead + */ + ceph::unordered_map<uint32_t, std::pair<QueuePair*, RDMAConnectedSocketImpl*> > qp_conns; + + /// if a queue pair is closed when transmit buffers are active + /// on it, the transmit buffers never get returned via tx_cq. To + /// work around this problem, don't delete queue pairs immediately. Instead, + /// save them in this vector and delete them at a safe time, when there are + /// no outstanding transmit buffers to be lost. + std::vector<QueuePair*> dead_queue_pairs; + + std::atomic<uint64_t> num_pending_workers = {0}; + // protect pending workers + ceph::mutex w_lock = + ceph::make_mutex("RDMADispatcher::for worker pending list"); + // fixme: lockfree + std::list<RDMAWorker*> pending_workers; + void enqueue_dead_qp_lockless(uint32_t qp); + void enqueue_dead_qp(uint32_t qpn); + + public: + PerfCounters *perf_logger; + + explicit RDMADispatcher(CephContext* c, std::shared_ptr<Infiniband>& ib); + virtual ~RDMADispatcher(); + void handle_async_event(); + + void polling_start(); + void polling_stop(); + void polling(); + void register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi); + void make_pending_worker(RDMAWorker* w) { + std::lock_guard l{w_lock}; + auto it = std::find(pending_workers.begin(), pending_workers.end(), w); + if (it != pending_workers.end()) + return; + pending_workers.push_back(w); + ++num_pending_workers; + } + RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp); + QueuePair* get_qp_lockless(uint32_t qp); + QueuePair* get_qp(uint32_t qp); + void schedule_qp_destroy(uint32_t qp); + Infiniband::CompletionQueue* get_tx_cq() const { return tx_cq; } + Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; } + void notify_pending_workers(); + void handle_tx_event(ibv_wc *cqe, int n); + void post_tx_buffer(std::vector<Chunk*> &chunks); + void handle_rx_event(ibv_wc *cqe, int rx_number); + + std::atomic<uint64_t> inflight = {0}; + + void post_chunk_to_pool(Chunk* chunk); + int post_chunks_to_rq(int num, QueuePair *qp = nullptr); +}; + +class RDMAWorker : public Worker { + typedef Infiniband::CompletionQueue CompletionQueue; + typedef Infiniband::CompletionChannel CompletionChannel; + typedef Infiniband::MemoryManager::Chunk Chunk; + typedef Infiniband::MemoryManager MemoryManager; + typedef std::vector<Chunk*>::iterator ChunkIter; + std::shared_ptr<Infiniband> ib; + EventCallbackRef tx_handler; + std::list<RDMAConnectedSocketImpl*> pending_sent_conns; + std::shared_ptr<RDMADispatcher> dispatcher; + ceph::mutex lock = ceph::make_mutex("RDMAWorker::lock"); + + class C_handle_cq_tx : public EventCallback { + RDMAWorker *worker; + public: + explicit C_handle_cq_tx(RDMAWorker *w): worker(w) {} + void do_request(uint64_t fd) { + worker->handle_pending_message(); + } + }; + + public: + PerfCounters *perf_logger; + explicit RDMAWorker(CephContext *c, unsigned i); + virtual ~RDMAWorker(); + virtual int listen(entity_addr_t &addr, + unsigned addr_slot, + const SocketOptions &opts, ServerSocket *) override; + virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override; + virtual void initialize() override; + int get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes); + void remove_pending_conn(RDMAConnectedSocketImpl *o) { + ceph_assert(center.in_thread()); + pending_sent_conns.remove(o); + } + void handle_pending_message(); + void set_dispatcher(std::shared_ptr<RDMADispatcher>& dispatcher) { this->dispatcher = dispatcher; } + void set_ib(std::shared_ptr<Infiniband> &ib) {this->ib = ib;} + void notify_worker() { + center.dispatch_event_external(tx_handler); + } +}; + +struct RDMACMInfo { + RDMACMInfo(rdma_cm_id *cid, rdma_event_channel *cm_channel_, uint32_t qp_num_) + : cm_id(cid), cm_channel(cm_channel_), qp_num(qp_num_) {} + rdma_cm_id *cm_id; + rdma_event_channel *cm_channel; + uint32_t qp_num; +}; + +class RDMAConnectedSocketImpl : public ConnectedSocketImpl { + public: + typedef Infiniband::MemoryManager::Chunk Chunk; + typedef Infiniband::CompletionChannel CompletionChannel; + typedef Infiniband::CompletionQueue CompletionQueue; + + protected: + CephContext *cct; + Infiniband::QueuePair *qp; + uint32_t peer_qpn = 0; + uint32_t local_qpn = 0; + int connected; + int error; + std::shared_ptr<Infiniband> ib; + std::shared_ptr<RDMADispatcher> dispatcher; + RDMAWorker* worker; + std::vector<Chunk*> buffers; + int notify_fd = -1; + ceph::buffer::list pending_bl; + + ceph::mutex lock = ceph::make_mutex("RDMAConnectedSocketImpl::lock"); + std::vector<ibv_wc> wc; + bool is_server; + EventCallbackRef read_handler; + EventCallbackRef established_handler; + int tcp_fd = -1; + bool active;// qp is active ? + bool pending; + int post_backlog = 0; + + void notify(); + void buffer_prefetch(void); + ssize_t read_buffers(char* buf, size_t len); + int post_work_request(std::vector<Chunk*>&); + size_t tx_copy_chunk(std::vector<Chunk*> &tx_buffers, size_t req_copy_len, + decltype(std::cbegin(pending_bl.buffers()))& start, + const decltype(std::cbegin(pending_bl.buffers()))& end); + + public: + RDMAConnectedSocketImpl(CephContext *cct, std::shared_ptr<Infiniband>& ib, + std::shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w); + virtual ~RDMAConnectedSocketImpl(); + + void pass_wc(std::vector<ibv_wc> &&v); + void get_wc(std::vector<ibv_wc> &w); + virtual int is_connected() override { return connected; } + + virtual ssize_t read(char* buf, size_t len) override; + virtual ssize_t send(ceph::buffer::list &bl, bool more) override; + virtual void shutdown() override; + virtual void close() override; + virtual int fd() const override { return notify_fd; } + virtual void set_priority(int sd, int prio, int domain) override; + void fault(); + const char* get_qp_state() { return Infiniband::qp_state_string(qp->get_state()); } + uint32_t get_peer_qpn () const { return peer_qpn; } + uint32_t get_local_qpn () const { return local_qpn; } + Infiniband::QueuePair* get_qp () const { return qp; } + ssize_t submit(bool more); + int activate(); + void fin(); + void handle_connection(); + int handle_connection_established(bool need_set_fault = true); + void cleanup(); + void set_accept_fd(int sd); + virtual int try_connect(const entity_addr_t&, const SocketOptions &opt); + bool is_pending() {return pending;} + void set_pending(bool val) {pending = val;} + void post_chunks_to_rq(int num); + void update_post_backlog(); +}; + +enum RDMA_CM_STATUS { + IDLE = 1, + RDMA_ID_CREATED, + CHANNEL_FD_CREATED, + RESOURCE_ALLOCATED, + ADDR_RESOLVED, + ROUTE_RESOLVED, + CONNECTED, + DISCONNECTED, + ERROR +}; + +class RDMAIWARPConnectedSocketImpl : public RDMAConnectedSocketImpl { + public: + RDMAIWARPConnectedSocketImpl(CephContext *cct, std::shared_ptr<Infiniband>& ib, + std::shared_ptr<RDMADispatcher>& rdma_dispatcher, + RDMAWorker *w, RDMACMInfo *info = nullptr); + ~RDMAIWARPConnectedSocketImpl(); + virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) override; + virtual void close() override; + virtual void shutdown() override; + virtual void handle_cm_connection(); + void activate(); + int alloc_resource(); + void close_notify(); + + private: + rdma_cm_id *cm_id = nullptr; + rdma_event_channel *cm_channel = nullptr; + EventCallbackRef cm_con_handler; + std::mutex close_mtx; + std::condition_variable close_condition; + bool closed = false; + RDMA_CM_STATUS status = IDLE; + + + class C_handle_cm_connection : public EventCallback { + RDMAIWARPConnectedSocketImpl *csi; + public: + C_handle_cm_connection(RDMAIWARPConnectedSocketImpl *w): csi(w) {} + void do_request(uint64_t fd) { + csi->handle_cm_connection(); + } + }; +}; + +class RDMAServerSocketImpl : public ServerSocketImpl { + protected: + CephContext *cct; + ceph::NetHandler net; + int server_setup_socket; + std::shared_ptr<Infiniband> ib; + std::shared_ptr<RDMADispatcher> dispatcher; + RDMAWorker *worker; + entity_addr_t sa; + + public: + RDMAServerSocketImpl(CephContext *cct, std::shared_ptr<Infiniband>& ib, + std::shared_ptr<RDMADispatcher>& rdma_dispatcher, + RDMAWorker *w, entity_addr_t& a, unsigned slot); + + virtual int listen(entity_addr_t &sa, const SocketOptions &opt); + virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override; + virtual void abort_accept() override; + virtual int fd() const override { return server_setup_socket; } +}; + +class RDMAIWARPServerSocketImpl : public RDMAServerSocketImpl { + public: + RDMAIWARPServerSocketImpl( + CephContext *cct, std::shared_ptr<Infiniband>& ib, + std::shared_ptr<RDMADispatcher>& rdma_dispatcher, + RDMAWorker* w, entity_addr_t& addr, unsigned addr_slot); + virtual int listen(entity_addr_t &sa, const SocketOptions &opt) override; + virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override; + virtual void abort_accept() override; + private: + rdma_cm_id *cm_id = nullptr; + rdma_event_channel *cm_channel = nullptr; +}; + +class RDMAStack : public NetworkStack { + std::vector<std::thread> threads; + PerfCounters *perf_counter; + std::shared_ptr<Infiniband> ib; + std::shared_ptr<RDMADispatcher> rdma_dispatcher; + + std::atomic<bool> fork_finished = {false}; + + virtual Worker* create_worker(CephContext *c, unsigned worker_id) override; + + public: + explicit RDMAStack(CephContext *cct); + virtual ~RDMAStack(); + virtual bool nonblock_connect_need_writable_event() const override { return false; } + + virtual void spawn_worker(std::function<void ()> &&func) override; + virtual void join_worker(unsigned i) override; + virtual bool is_ready() override { return fork_finished.load(); }; + virtual void ready() override { fork_finished = true; }; +}; + + +#endif |