summaryrefslogtreecommitdiffstats
path: root/src/msg/async/rdma/Infiniband.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
commit483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
treee5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/msg/async/rdma/Infiniband.cc
parentInitial commit. (diff)
downloadceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz
ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/msg/async/rdma/Infiniband.cc')
-rw-r--r--src/msg/async/rdma/Infiniband.cc1234
1 files changed, 1234 insertions, 0 deletions
diff --git a/src/msg/async/rdma/Infiniband.cc b/src/msg/async/rdma/Infiniband.cc
new file mode 100644
index 00000000..34299975
--- /dev/null
+++ b/src/msg/async/rdma/Infiniband.cc
@@ -0,0 +1,1234 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "Infiniband.h"
+#include "common/errno.h"
+#include "common/debug.h"
+#include "RDMAStack.h"
+#include <sys/time.h>
+#include <sys/resource.h>
+
+#define dout_subsys ceph_subsys_ms
+#undef dout_prefix
+#define dout_prefix *_dout << "Infiniband "
+
+static const uint32_t MAX_SHARED_RX_SGE_COUNT = 1;
+static const uint32_t MAX_INLINE_DATA = 0;
+static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");
+static const uint32_t CQ_DEPTH = 30000;
+
+Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt), port_num(ipn), port_attr(new ibv_port_attr), gid_idx(0)
+{
+#ifdef HAVE_IBV_EXP
+ union ibv_gid cgid;
+ struct ibv_exp_gid_attr gid_attr;
+ bool malformed = false;
+
+ ldout(cct,1) << __func__ << " using experimental verbs for gid" << dendl;
+ int r = ibv_query_port(ctxt, port_num, port_attr);
+ if (r == -1) {
+ lderr(cct) << __func__ << " query port failed " << cpp_strerror(errno) << dendl;
+ ceph_abort();
+ }
+
+ lid = port_attr->lid;
+
+ // search for requested GID in GIDs table
+ ldout(cct, 1) << __func__ << " looking for local GID " << (cct->_conf->ms_async_rdma_local_gid)
+ << " of type " << (cct->_conf->ms_async_rdma_roce_ver) << dendl;
+ r = sscanf(cct->_conf->ms_async_rdma_local_gid.c_str(),
+ "%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx"
+ ":%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx",
+ &cgid.raw[ 0], &cgid.raw[ 1],
+ &cgid.raw[ 2], &cgid.raw[ 3],
+ &cgid.raw[ 4], &cgid.raw[ 5],
+ &cgid.raw[ 6], &cgid.raw[ 7],
+ &cgid.raw[ 8], &cgid.raw[ 9],
+ &cgid.raw[10], &cgid.raw[11],
+ &cgid.raw[12], &cgid.raw[13],
+ &cgid.raw[14], &cgid.raw[15]);
+
+ if (r != 16) {
+ ldout(cct, 1) << __func__ << " malformed or no GID supplied, using GID index 0" << dendl;
+ malformed = true;
+ }
+
+ gid_attr.comp_mask = IBV_EXP_QUERY_GID_ATTR_TYPE;
+
+ for (gid_idx = 0; gid_idx < port_attr->gid_tbl_len; gid_idx++) {
+ r = ibv_query_gid(ctxt, port_num, gid_idx, &gid);
+ if (r) {
+ lderr(cct) << __func__ << " query gid of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl;
+ ceph_abort();
+ }
+ r = ibv_exp_query_gid_attr(ctxt, port_num, gid_idx, &gid_attr);
+ if (r) {
+ lderr(cct) << __func__ << " query gid attributes of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl;
+ ceph_abort();
+ }
+
+ if (malformed) break; // stay with gid_idx=0
+ if ( (gid_attr.type == cct->_conf->ms_async_rdma_roce_ver) &&
+ (memcmp(&gid, &cgid, 16) == 0) ) {
+ ldout(cct, 1) << __func__ << " found at index " << gid_idx << dendl;
+ break;
+ }
+ }
+
+ if (gid_idx == port_attr->gid_tbl_len) {
+ lderr(cct) << __func__ << " Requested local GID was not found in GID table" << dendl;
+ ceph_abort();
+ }
+#else
+ int r = ibv_query_port(ctxt, port_num, port_attr);
+ if (r == -1) {
+ lderr(cct) << __func__ << " query port failed " << cpp_strerror(errno) << dendl;
+ ceph_abort();
+ }
+
+ lid = port_attr->lid;
+ r = ibv_query_gid(ctxt, port_num, 0, &gid);
+ if (r) {
+ lderr(cct) << __func__ << " query gid failed " << cpp_strerror(errno) << dendl;
+ ceph_abort();
+ }
+#endif
+}
+
+
+Device::Device(CephContext *cct, ibv_device* d, struct ibv_context *dc)
+ : device(d), device_attr(new ibv_device_attr), active_port(nullptr)
+{
+ if (device == NULL) {
+ lderr(cct) << __func__ << " device == NULL" << cpp_strerror(errno) << dendl;
+ ceph_abort();
+ }
+ name = ibv_get_device_name(device);
+ if (cct->_conf->ms_async_rdma_cm) {
+ ctxt = dc;
+ } else {
+ ctxt = ibv_open_device(device);
+ }
+ if (ctxt == NULL) {
+ lderr(cct) << __func__ << " open rdma device failed. " << cpp_strerror(errno) << dendl;
+ ceph_abort();
+ }
+ int r = ibv_query_device(ctxt, device_attr);
+ if (r == -1) {
+ lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl;
+ ceph_abort();
+ }
+}
+
+void Device::binding_port(CephContext *cct, int port_num) {
+ port_cnt = device_attr->phys_port_cnt;
+ for (uint8_t i = 0; i < port_cnt; ++i) {
+ Port *port = new Port(cct, ctxt, i+1);
+ if (i + 1 == port_num && port->get_port_attr()->state == IBV_PORT_ACTIVE) {
+ active_port = port;
+ ldout(cct, 1) << __func__ << " found active port " << i+1 << dendl;
+ break;
+ } else {
+ ldout(cct, 10) << __func__ << " port " << i+1 << " is not what we want. state: " << port->get_port_attr()->state << ")"<< dendl;
+ }
+ delete port;
+ }
+ if (nullptr == active_port) {
+ lderr(cct) << __func__ << " port not found" << dendl;
+ ceph_assert(active_port);
+ }
+}
+
+
+Infiniband::QueuePair::QueuePair(
+ CephContext *c, Infiniband& infiniband, ibv_qp_type type,
+ int port, ibv_srq *srq,
+ Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq,
+ uint32_t tx_queue_len, uint32_t rx_queue_len, struct rdma_cm_id *cid, uint32_t q_key)
+: cct(c), infiniband(infiniband),
+ type(type),
+ ctxt(infiniband.device->ctxt),
+ ib_physical_port(port),
+ pd(infiniband.pd->pd),
+ srq(srq),
+ qp(NULL),
+ cm_id(cid),
+ txcq(txcq),
+ rxcq(rxcq),
+ initial_psn(0),
+ max_send_wr(tx_queue_len),
+ max_recv_wr(rx_queue_len),
+ q_key(q_key),
+ dead(false)
+{
+ initial_psn = lrand48() & 0xffffff;
+ if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) {
+ lderr(cct) << __func__ << " invalid queue pair type" << cpp_strerror(errno) << dendl;
+ ceph_abort();
+ }
+ pd = infiniband.pd->pd;
+}
+
+int Infiniband::QueuePair::init()
+{
+ ldout(cct, 20) << __func__ << " started." << dendl;
+ ibv_qp_init_attr qpia;
+ // FIPS zeroization audit 20191115: this memset is not security related.
+ memset(&qpia, 0, sizeof(qpia));
+ qpia.send_cq = txcq->get_cq();
+ qpia.recv_cq = rxcq->get_cq();
+ if (srq) {
+ qpia.srq = srq; // use the same shared receive queue
+ } else {
+ qpia.cap.max_recv_wr = max_recv_wr;
+ qpia.cap.max_recv_sge = 1;
+ }
+ qpia.cap.max_send_wr = max_send_wr; // max outstanding send requests
+ qpia.cap.max_send_sge = 1; // max send scatter-gather elements
+ qpia.cap.max_inline_data = MAX_INLINE_DATA; // max bytes of immediate data on send q
+ qpia.qp_type = type; // RC, UC, UD, or XRC
+ qpia.sq_sig_all = 0; // only generate CQEs on requested WQEs
+
+ if (!cct->_conf->ms_async_rdma_cm) {
+ qp = ibv_create_qp(pd, &qpia);
+ if (qp == NULL) {
+ lderr(cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl;
+ if (errno == ENOMEM) {
+ lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_queue_length, "
+ " ms_async_rdma_send_buffers or"
+ " ms_async_rdma_buffer_size" << dendl;
+ }
+ return -1;
+ }
+ } else {
+ ceph_assert(cm_id->verbs == pd->context);
+ if (rdma_create_qp(cm_id, pd, &qpia)) {
+ lderr(cct) << __func__ << " failed to create queue pair with rdmacm library"
+ << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+ qp = cm_id->qp;
+ }
+ ldout(cct, 20) << __func__ << " successfully create queue pair: "
+ << "qp=" << qp << dendl;
+
+ if (cct->_conf->ms_async_rdma_cm)
+ return 0;
+
+ // move from RESET to INIT state
+ ibv_qp_attr qpa;
+ memset(&qpa, 0, sizeof(qpa));
+ qpa.qp_state = IBV_QPS_INIT;
+ qpa.pkey_index = 0;
+ qpa.port_num = (uint8_t)(ib_physical_port);
+ qpa.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE;
+ qpa.qkey = q_key;
+
+ int mask = IBV_QP_STATE | IBV_QP_PORT;
+ switch (type) {
+ case IBV_QPT_RC:
+ mask |= IBV_QP_ACCESS_FLAGS;
+ mask |= IBV_QP_PKEY_INDEX;
+ break;
+ case IBV_QPT_UD:
+ mask |= IBV_QP_QKEY;
+ mask |= IBV_QP_PKEY_INDEX;
+ break;
+ case IBV_QPT_RAW_PACKET:
+ break;
+ default:
+ ceph_abort();
+ }
+
+ int ret = ibv_modify_qp(qp, &qpa, mask);
+ if (ret) {
+ ibv_destroy_qp(qp);
+ lderr(cct) << __func__ << " failed to transition to INIT state: "
+ << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+ ldout(cct, 20) << __func__ << " successfully change queue pair to INIT:"
+ << " qp=" << qp << dendl;
+ return 0;
+}
+
+/**
+ * Change RC QueuePair into the ERROR state. This is necessary modify
+ * the Queue Pair into the Error state and poll all of the relevant
+ * Work Completions prior to destroying a Queue Pair.
+ * Since destroying a Queue Pair does not guarantee that its Work
+ * Completions are removed from the CQ upon destruction. Even if the
+ * Work Completions are already in the CQ, it might not be possible to
+ * retrieve them. If the Queue Pair is associated with an SRQ, it is
+ * recommended wait for the affiliated event IBV_EVENT_QP_LAST_WQE_REACHED
+ *
+ * \return
+ * -errno if the QueuePair can't switch to ERROR
+ * 0 for success.
+ */
+int Infiniband::QueuePair::to_dead()
+{
+ if (dead)
+ return 0;
+ ibv_qp_attr qpa;
+ memset(&qpa, 0, sizeof(qpa));
+ qpa.qp_state = IBV_QPS_ERR;
+
+ int mask = IBV_QP_STATE;
+ int ret = ibv_modify_qp(qp, &qpa, mask);
+ if (ret) {
+ lderr(cct) << __func__ << " failed to transition to ERROR state: "
+ << cpp_strerror(errno) << dendl;
+ return -errno;
+ }
+ dead = true;
+ return ret;
+}
+
+int Infiniband::QueuePair::get_remote_qp_number(uint32_t *rqp) const
+{
+ ibv_qp_attr qpa;
+ ibv_qp_init_attr qpia;
+
+ int r = ibv_query_qp(qp, &qpa, IBV_QP_DEST_QPN, &qpia);
+ if (r) {
+ lderr(cct) << __func__ << " failed to query qp: "
+ << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+
+ if (rqp)
+ *rqp = qpa.dest_qp_num;
+ return 0;
+}
+
+/**
+ * Get the remote infiniband address for this QueuePair, as set in #plumb().
+ * LIDs are "local IDs" in infiniband terminology. They are short, locally
+ * routable addresses.
+ */
+int Infiniband::QueuePair::get_remote_lid(uint16_t *lid) const
+{
+ ibv_qp_attr qpa;
+ ibv_qp_init_attr qpia;
+
+ int r = ibv_query_qp(qp, &qpa, IBV_QP_AV, &qpia);
+ if (r) {
+ lderr(cct) << __func__ << " failed to query qp: "
+ << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+
+ if (lid)
+ *lid = qpa.ah_attr.dlid;
+ return 0;
+}
+
+/**
+ * Get the state of a QueuePair.
+ */
+int Infiniband::QueuePair::get_state() const
+{
+ ibv_qp_attr qpa;
+ ibv_qp_init_attr qpia;
+
+ int r = ibv_query_qp(qp, &qpa, IBV_QP_STATE, &qpia);
+ if (r) {
+ lderr(cct) << __func__ << " failed to get state: "
+ << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+ return qpa.qp_state;
+}
+
+/**
+ * Return true if the queue pair is in an error state, false otherwise.
+ */
+bool Infiniband::QueuePair::is_error() const
+{
+ ibv_qp_attr qpa;
+ ibv_qp_init_attr qpia;
+
+ int r = ibv_query_qp(qp, &qpa, -1, &qpia);
+ if (r) {
+ lderr(cct) << __func__ << " failed to get state: "
+ << cpp_strerror(errno) << dendl;
+ return true;
+ }
+ return qpa.cur_qp_state == IBV_QPS_ERR;
+}
+
+
+Infiniband::CompletionChannel::CompletionChannel(CephContext *c, Infiniband &ib)
+ : cct(c), infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0)
+{
+}
+
+Infiniband::CompletionChannel::~CompletionChannel()
+{
+ if (channel) {
+ int r = ibv_destroy_comp_channel(channel);
+ if (r < 0)
+ lderr(cct) << __func__ << " failed to destroy cc: " << cpp_strerror(errno) << dendl;
+ ceph_assert(r == 0);
+ }
+}
+
+int Infiniband::CompletionChannel::init()
+{
+ ldout(cct, 20) << __func__ << " started." << dendl;
+ channel = ibv_create_comp_channel(infiniband.device->ctxt);
+ if (!channel) {
+ lderr(cct) << __func__ << " failed to create receive completion channel: "
+ << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+ int rc = NetHandler(cct).set_nonblock(channel->fd);
+ if (rc < 0) {
+ ibv_destroy_comp_channel(channel);
+ return -1;
+ }
+ return 0;
+}
+
+void Infiniband::CompletionChannel::ack_events()
+{
+ ibv_ack_cq_events(cq, cq_events_that_need_ack);
+ cq_events_that_need_ack = 0;
+}
+
+bool Infiniband::CompletionChannel::get_cq_event()
+{
+ ibv_cq *cq = NULL;
+ void *ev_ctx;
+ if (ibv_get_cq_event(channel, &cq, &ev_ctx)) {
+ if (errno != EAGAIN && errno != EINTR)
+ lderr(cct) << __func__ << " failed to retrieve CQ event: "
+ << cpp_strerror(errno) << dendl;
+ return false;
+ }
+
+ /* accumulate number of cq events that need to
+ * * be acked, and periodically ack them
+ * */
+ if (++cq_events_that_need_ack == MAX_ACK_EVENT) {
+ ldout(cct, 20) << __func__ << " ack aq events." << dendl;
+ ibv_ack_cq_events(cq, MAX_ACK_EVENT);
+ cq_events_that_need_ack = 0;
+ }
+
+ return true;
+}
+
+
+Infiniband::CompletionQueue::~CompletionQueue()
+{
+ if (cq) {
+ int r = ibv_destroy_cq(cq);
+ if (r < 0)
+ lderr(cct) << __func__ << " failed to destroy cq: " << cpp_strerror(errno) << dendl;
+ ceph_assert(r == 0);
+ }
+}
+
+int Infiniband::CompletionQueue::init()
+{
+ cq = ibv_create_cq(infiniband.device->ctxt, queue_depth, this, channel->get_channel(), 0);
+ if (!cq) {
+ lderr(cct) << __func__ << " failed to create receive completion queue: "
+ << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+
+ if (ibv_req_notify_cq(cq, 0)) {
+ lderr(cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl;
+ ibv_destroy_cq(cq);
+ cq = nullptr;
+ return -1;
+ }
+
+ channel->bind_cq(cq);
+ ldout(cct, 20) << __func__ << " successfully create cq=" << cq << dendl;
+ return 0;
+}
+
+int Infiniband::CompletionQueue::rearm_notify(bool solicite_only)
+{
+ ldout(cct, 20) << __func__ << " started." << dendl;
+ int r = ibv_req_notify_cq(cq, 0);
+ if (r < 0)
+ lderr(cct) << __func__ << " failed to notify cq: " << cpp_strerror(errno) << dendl;
+ return r;
+}
+
+int Infiniband::CompletionQueue::poll_cq(int num_entries, ibv_wc *ret_wc_array) {
+ int r = ibv_poll_cq(cq, num_entries, ret_wc_array);
+ if (r < 0) {
+ lderr(cct) << __func__ << " poll_completion_queue occur met error: "
+ << cpp_strerror(errno) << dendl;
+ return -1;
+ }
+ return r;
+}
+
+
+Infiniband::ProtectionDomain::ProtectionDomain(CephContext *cct, Device *device)
+ : pd(ibv_alloc_pd(device->ctxt))
+{
+ if (pd == NULL) {
+ lderr(cct) << __func__ << " failed to allocate infiniband protection domain: " << cpp_strerror(errno) << dendl;
+ ceph_abort();
+ }
+}
+
+Infiniband::ProtectionDomain::~ProtectionDomain()
+{
+ ibv_dealloc_pd(pd);
+}
+
+
+Infiniband::MemoryManager::Chunk::Chunk(ibv_mr* m, uint32_t len, char* b)
+ : mr(m), bytes(len), offset(0), buffer(b)
+{
+}
+
+Infiniband::MemoryManager::Chunk::~Chunk()
+{
+}
+
+void Infiniband::MemoryManager::Chunk::set_offset(uint32_t o)
+{
+ offset = o;
+}
+
+uint32_t Infiniband::MemoryManager::Chunk::get_offset()
+{
+ return offset;
+}
+
+void Infiniband::MemoryManager::Chunk::set_bound(uint32_t b)
+{
+ bound = b;
+}
+
+void Infiniband::MemoryManager::Chunk::prepare_read(uint32_t b)
+{
+ offset = 0;
+ bound = b;
+}
+
+uint32_t Infiniband::MemoryManager::Chunk::get_bound()
+{
+ return bound;
+}
+
+uint32_t Infiniband::MemoryManager::Chunk::read(char* buf, uint32_t len)
+{
+ uint32_t left = bound - offset;
+ if (left >= len) {
+ memcpy(buf, buffer+offset, len);
+ offset += len;
+ return len;
+ } else {
+ memcpy(buf, buffer+offset, left);
+ offset = 0;
+ bound = 0;
+ return left;
+ }
+}
+
+uint32_t Infiniband::MemoryManager::Chunk::write(char* buf, uint32_t len)
+{
+ uint32_t left = bytes - offset;
+ if (left >= len) {
+ memcpy(buffer+offset, buf, len);
+ offset += len;
+ return len;
+ } else {
+ memcpy(buffer+offset, buf, left);
+ offset = bytes;
+ return left;
+ }
+}
+
+bool Infiniband::MemoryManager::Chunk::full()
+{
+ return offset == bytes;
+}
+
+bool Infiniband::MemoryManager::Chunk::over()
+{
+ return Infiniband::MemoryManager::Chunk::offset == bound;
+}
+
+void Infiniband::MemoryManager::Chunk::clear()
+{
+ offset = 0;
+ bound = 0;
+}
+
+Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s)
+ : manager(m), buffer_size(s), lock("cluster_lock")
+{
+}
+
+Infiniband::MemoryManager::Cluster::~Cluster()
+{
+ int r = ibv_dereg_mr(chunk_base->mr);
+ ceph_assert(r == 0);
+ const auto chunk_end = chunk_base + num_chunk;
+ for (auto chunk = chunk_base; chunk != chunk_end; chunk++) {
+ chunk->~Chunk();
+ }
+
+ ::free(chunk_base);
+ manager.free(base);
+}
+
+int Infiniband::MemoryManager::Cluster::fill(uint32_t num)
+{
+ ceph_assert(!base);
+ num_chunk = num;
+ uint32_t bytes = buffer_size * num;
+
+ base = (char*)manager.malloc(bytes);
+ end = base + bytes;
+ ceph_assert(base);
+ chunk_base = static_cast<Chunk*>(::malloc(sizeof(Chunk) * num));
+ // FIPS zeroization audit 20191115: this memset is not security related.
+ memset(static_cast<void*>(chunk_base), 0, sizeof(Chunk) * num);
+ free_chunks.reserve(num);
+ ibv_mr* m = ibv_reg_mr(manager.pd->pd, base, bytes, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
+ ceph_assert(m);
+ Chunk* chunk = chunk_base;
+ for (uint32_t offset = 0; offset < bytes; offset += buffer_size){
+ new(chunk) Chunk(m, buffer_size, base+offset);
+ free_chunks.push_back(chunk);
+ chunk++;
+ }
+ return 0;
+}
+
+void Infiniband::MemoryManager::Cluster::take_back(std::vector<Chunk*> &ck)
+{
+ Mutex::Locker l(lock);
+ for (auto c : ck) {
+ c->clear();
+ free_chunks.push_back(c);
+ }
+}
+
+int Infiniband::MemoryManager::Cluster::get_buffers(std::vector<Chunk*> &chunks, size_t bytes)
+{
+ uint32_t num = bytes / buffer_size + 1;
+ if (bytes % buffer_size == 0)
+ --num;
+ int r = num;
+ Mutex::Locker l(lock);
+ if (free_chunks.empty())
+ return 0;
+ if (!bytes) {
+ r = free_chunks.size();
+ for (auto c : free_chunks)
+ chunks.push_back(c);
+ free_chunks.clear();
+ return r;
+ }
+ if (free_chunks.size() < num) {
+ num = free_chunks.size();
+ r = num;
+ }
+ for (uint32_t i = 0; i < num; ++i) {
+ chunks.push_back(free_chunks.back());
+ free_chunks.pop_back();
+ }
+ return r;
+}
+
+bool Infiniband::MemoryManager::MemPoolContext::can_alloc(unsigned nbufs)
+{
+ /* unlimited */
+ if (manager->cct->_conf->ms_async_rdma_receive_buffers <= 0)
+ return true;
+
+ if (n_bufs_allocated + nbufs > (unsigned)manager->cct->_conf->ms_async_rdma_receive_buffers) {
+ lderr(manager->cct) << __func__ << " WARNING: OUT OF RX BUFFERS: allocated: " <<
+ n_bufs_allocated << " requested: " << nbufs <<
+ " limit: " << manager->cct->_conf->ms_async_rdma_receive_buffers << dendl;
+ return false;
+ }
+
+ return true;
+}
+
+void Infiniband::MemoryManager::MemPoolContext::set_stat_logger(PerfCounters *logger) {
+ perf_logger = logger;
+ if (perf_logger != nullptr)
+ perf_logger->set(l_msgr_rdma_rx_bufs_total, n_bufs_allocated);
+}
+
+void Infiniband::MemoryManager::MemPoolContext::update_stats(int nbufs)
+{
+ n_bufs_allocated += nbufs;
+
+ if (!perf_logger)
+ return;
+
+ if (nbufs > 0) {
+ perf_logger->inc(l_msgr_rdma_rx_bufs_total, nbufs);
+ } else {
+ perf_logger->dec(l_msgr_rdma_rx_bufs_total, -nbufs);
+ }
+}
+
+void *Infiniband::MemoryManager::mem_pool::slow_malloc()
+{
+ void *p;
+
+ Mutex::Locker l(PoolAllocator::lock);
+ PoolAllocator::g_ctx = ctx;
+ // this will trigger pool expansion via PoolAllocator::malloc()
+ p = boost::pool<PoolAllocator>::malloc();
+ PoolAllocator::g_ctx = nullptr;
+ return p;
+}
+
+Infiniband::MemoryManager::MemPoolContext *Infiniband::MemoryManager::PoolAllocator::g_ctx = nullptr;
+Mutex Infiniband::MemoryManager::PoolAllocator::lock("pool-alloc-lock");
+
+// lock is taken by mem_pool::slow_malloc()
+char *Infiniband::MemoryManager::PoolAllocator::malloc(const size_type bytes)
+{
+ mem_info *m;
+ Chunk *ch;
+ size_t rx_buf_size;
+ unsigned nbufs;
+ MemoryManager *manager;
+ CephContext *cct;
+
+ ceph_assert(g_ctx);
+ manager = g_ctx->manager;
+ cct = manager->cct;
+ rx_buf_size = sizeof(Chunk) + cct->_conf->ms_async_rdma_buffer_size;
+ nbufs = bytes/rx_buf_size;
+
+ if (!g_ctx->can_alloc(nbufs))
+ return NULL;
+
+ m = static_cast<mem_info *>(manager->malloc(bytes + sizeof(*m)));
+ if (!m) {
+ lderr(cct) << __func__ << " failed to allocate " <<
+ bytes << " + " << sizeof(*m) << " bytes of memory for " << nbufs << dendl;
+ return NULL;
+ }
+
+ m->mr = ibv_reg_mr(manager->pd->pd, m->chunks, bytes, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
+ if (m->mr == NULL) {
+ lderr(cct) << __func__ << " failed to register " <<
+ bytes << " + " << sizeof(*m) << " bytes of memory for " << nbufs << dendl;
+ manager->free(m);
+ return NULL;
+ }
+
+ m->nbufs = nbufs;
+ // save this chunk context
+ m->ctx = g_ctx;
+
+ // note that the memory can be allocated before perf logger is set
+ g_ctx->update_stats(nbufs);
+
+ /* initialize chunks */
+ ch = m->chunks;
+ for (unsigned i = 0; i < nbufs; i++) {
+ ch->lkey = m->mr->lkey;
+ ch->bytes = cct->_conf->ms_async_rdma_buffer_size;
+ ch->offset = 0;
+ ch->buffer = ch->data; // TODO: refactor tx and remove buffer
+ ch = reinterpret_cast<Chunk *>(reinterpret_cast<char *>(ch) + rx_buf_size);
+ }
+
+ return reinterpret_cast<char *>(m->chunks);
+}
+
+
+void Infiniband::MemoryManager::PoolAllocator::free(char * const block)
+{
+ mem_info *m;
+ Mutex::Locker l(lock);
+
+ m = reinterpret_cast<mem_info *>(block) - 1;
+ m->ctx->update_stats(-m->nbufs);
+ ibv_dereg_mr(m->mr);
+ m->ctx->manager->free(m);
+}
+
+Infiniband::MemoryManager::MemoryManager(CephContext *c, Device *d, ProtectionDomain *p)
+ : cct(c), device(d), pd(p),
+ rxbuf_pool_ctx(this),
+ rxbuf_pool(&rxbuf_pool_ctx, sizeof(Chunk) + c->_conf->ms_async_rdma_buffer_size,
+ c->_conf->ms_async_rdma_receive_buffers > 0 ?
+ // if possible make initial pool size 2 * receive_queue_len
+ // that way there will be no pool expansion upon receive of the
+ // first packet.
+ (c->_conf->ms_async_rdma_receive_buffers < 2 * c->_conf->ms_async_rdma_receive_queue_len ?
+ c->_conf->ms_async_rdma_receive_buffers : 2 * c->_conf->ms_async_rdma_receive_queue_len) :
+ // rx pool is infinite, we can set any initial size that we want
+ 2 * c->_conf->ms_async_rdma_receive_queue_len)
+{
+}
+
+Infiniband::MemoryManager::~MemoryManager()
+{
+ if (send)
+ delete send;
+}
+
+void* Infiniband::MemoryManager::huge_pages_malloc(size_t size)
+{
+ size_t real_size = ALIGN_TO_PAGE_SIZE(size + HUGE_PAGE_SIZE);
+ char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE,MAP_PRIVATE | MAP_ANONYMOUS |MAP_POPULATE | MAP_HUGETLB,-1, 0);
+ if (ptr == MAP_FAILED) {
+ ptr = (char *)std::malloc(real_size);
+ if (ptr == NULL) return NULL;
+ real_size = 0;
+ }
+ *((size_t *)ptr) = real_size;
+ return ptr + HUGE_PAGE_SIZE;
+}
+
+void Infiniband::MemoryManager::huge_pages_free(void *ptr)
+{
+ if (ptr == NULL) return;
+ void *real_ptr = (char *)ptr -HUGE_PAGE_SIZE;
+ size_t real_size = *((size_t *)real_ptr);
+ ceph_assert(real_size % HUGE_PAGE_SIZE == 0);
+ if (real_size != 0)
+ munmap(real_ptr, real_size);
+ else
+ std::free(real_ptr);
+}
+
+
+void* Infiniband::MemoryManager::malloc(size_t size)
+{
+ if (cct->_conf->ms_async_rdma_enable_hugepage)
+ return huge_pages_malloc(size);
+ else
+ return std::malloc(size);
+}
+
+void Infiniband::MemoryManager::free(void *ptr)
+{
+ if (cct->_conf->ms_async_rdma_enable_hugepage)
+ huge_pages_free(ptr);
+ else
+ std::free(ptr);
+}
+
+void Infiniband::MemoryManager::create_tx_pool(uint32_t size, uint32_t tx_num)
+{
+ ceph_assert(device);
+ ceph_assert(pd);
+
+ send = new Cluster(*this, size);
+ send->fill(tx_num);
+}
+
+void Infiniband::MemoryManager::return_tx(std::vector<Chunk*> &chunks)
+{
+ send->take_back(chunks);
+}
+
+int Infiniband::MemoryManager::get_send_buffers(std::vector<Chunk*> &c, size_t bytes)
+{
+ return send->get_buffers(c, bytes);
+}
+
+static std::atomic<bool> init_prereq = {false};
+
+void Infiniband::verify_prereq(CephContext *cct) {
+
+ //On RDMA MUST be called before fork
+ int rc = ibv_fork_init();
+ if (rc) {
+ lderr(cct) << __func__ << " failed to call ibv_for_init(). On RDMA must be called before fork. Application aborts." << dendl;
+ ceph_abort();
+ }
+
+ ldout(cct, 20) << __func__ << " ms_async_rdma_enable_hugepage value is: " << cct->_conf->ms_async_rdma_enable_hugepage << dendl;
+ if (cct->_conf->ms_async_rdma_enable_hugepage){
+ rc = setenv("RDMAV_HUGEPAGES_SAFE","1",1);
+ ldout(cct, 0) << __func__ << " RDMAV_HUGEPAGES_SAFE is set as: " << getenv("RDMAV_HUGEPAGES_SAFE") << dendl;
+ if (rc) {
+ lderr(cct) << __func__ << " failed to export RDMA_HUGEPAGES_SAFE. On RDMA must be exported before using huge pages. Application aborts." << dendl;
+ ceph_abort();
+ }
+ }
+
+ //Check ulimit
+ struct rlimit limit;
+ getrlimit(RLIMIT_MEMLOCK, &limit);
+ if (limit.rlim_cur != RLIM_INFINITY || limit.rlim_max != RLIM_INFINITY) {
+ lderr(cct) << __func__ << "!!! WARNING !!! For RDMA to work properly user memlock (ulimit -l) must be big enough to allow large amount of registered memory."
+ " We recommend setting this parameter to infinity" << dendl;
+ }
+ init_prereq = true;
+}
+
+Infiniband::Infiniband(CephContext *cct)
+ : cct(cct), lock("IB lock"),
+ device_name(cct->_conf->ms_async_rdma_device_name),
+ port_num( cct->_conf->ms_async_rdma_port_num)
+{
+ if (!init_prereq)
+ verify_prereq(cct);
+ ldout(cct, 20) << __func__ << " constructing Infiniband..." << dendl;
+}
+
+void Infiniband::init()
+{
+ Mutex::Locker l(lock);
+
+ if (initialized)
+ return;
+
+ device_list = new DeviceList(cct);
+ initialized = true;
+
+ device = device_list->get_device(device_name.c_str());
+ ceph_assert(device);
+ device->binding_port(cct, port_num);
+ ib_physical_port = device->active_port->get_port_num();
+ pd = new ProtectionDomain(cct, device);
+ ceph_assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0);
+
+ support_srq = cct->_conf->ms_async_rdma_support_srq;
+ if (support_srq)
+ rx_queue_len = device->device_attr->max_srq_wr;
+ else
+ rx_queue_len = device->device_attr->max_qp_wr;
+ if (rx_queue_len > cct->_conf->ms_async_rdma_receive_queue_len) {
+ rx_queue_len = cct->_conf->ms_async_rdma_receive_queue_len;
+ ldout(cct, 1) << __func__ << " receive queue length is " << rx_queue_len << " receive buffers" << dendl;
+ } else {
+ ldout(cct, 0) << __func__ << " requested receive queue length " <<
+ cct->_conf->ms_async_rdma_receive_queue_len <<
+ " is too big. Setting " << rx_queue_len << dendl;
+ }
+
+ // check for the misconfiguration
+ if (cct->_conf->ms_async_rdma_receive_buffers > 0 &&
+ rx_queue_len > (unsigned)cct->_conf->ms_async_rdma_receive_buffers) {
+ lderr(cct) << __func__ << " rdma_receive_queue_len (" <<
+ rx_queue_len << ") > ms_async_rdma_receive_buffers(" <<
+ cct->_conf->ms_async_rdma_receive_buffers << ")." << dendl;
+ ceph_abort();
+ }
+
+ tx_queue_len = device->device_attr->max_qp_wr;
+ if (tx_queue_len > cct->_conf->ms_async_rdma_send_buffers) {
+ tx_queue_len = cct->_conf->ms_async_rdma_send_buffers;
+ ldout(cct, 1) << __func__ << " assigning: " << tx_queue_len << " send buffers" << dendl;
+ } else {
+ ldout(cct, 0) << __func__ << " using the max allowed send buffers: " << tx_queue_len << dendl;
+ }
+
+ ldout(cct, 1) << __func__ << " device allow " << device->device_attr->max_cqe
+ << " completion entries" << dendl;
+
+ memory_manager = new MemoryManager(cct, device, pd);
+ memory_manager->create_tx_pool(cct->_conf->ms_async_rdma_buffer_size, tx_queue_len);
+
+ if (support_srq) {
+ srq = create_shared_receive_queue(rx_queue_len, MAX_SHARED_RX_SGE_COUNT);
+ post_chunks_to_rq(rx_queue_len, NULL); //add to srq
+ }
+}
+
+Infiniband::~Infiniband()
+{
+ if (!initialized)
+ return;
+ if (support_srq)
+ ibv_destroy_srq(srq);
+ delete memory_manager;
+ delete pd;
+}
+
+/**
+ * Create a shared receive queue. This basically wraps the verbs call.
+ *
+ * \param[in] max_wr
+ * The max number of outstanding work requests in the SRQ.
+ * \param[in] max_sge
+ * The max number of scatter elements per WR.
+ * \return
+ * A valid ibv_srq pointer, or NULL on error.
+ */
+ibv_srq* Infiniband::create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge)
+{
+ ibv_srq_init_attr sia;
+ // FIPS zeroization audit 20191115: this memset is not security related.
+ memset(&sia, 0, sizeof(sia));
+ sia.srq_context = device->ctxt;
+ sia.attr.max_wr = max_wr;
+ sia.attr.max_sge = max_sge;
+ return ibv_create_srq(pd->pd, &sia);
+}
+
+int Infiniband::get_tx_buffers(std::vector<Chunk*> &c, size_t bytes)
+{
+ return memory_manager->get_send_buffers(c, bytes);
+}
+
+/**
+ * Create a new QueuePair. This factory should be used in preference to
+ * the QueuePair constructor directly, since this lets derivatives of
+ * Infiniband, e.g. MockInfiniband (if it existed),
+ * return mocked out QueuePair derivatives.
+ *
+ * \return
+ * QueuePair on success or NULL if init fails
+ * See QueuePair::QueuePair for parameter documentation.
+ */
+Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, CompletionQueue *tx,
+ CompletionQueue* rx, ibv_qp_type type, struct rdma_cm_id *cm_id)
+{
+ Infiniband::QueuePair *qp = new QueuePair(
+ cct, *this, type, ib_physical_port, srq, tx, rx, tx_queue_len, rx_queue_len, cm_id);
+ if (qp->init()) {
+ delete qp;
+ return NULL;
+ }
+ return qp;
+}
+
+int Infiniband::post_chunks_to_rq(int num, ibv_qp *qp)
+{
+ int ret, i = 0;
+ ibv_sge isge[num];
+ Chunk *chunk;
+ ibv_recv_wr rx_work_request[num];
+
+ while (i < num) {
+ chunk = get_memory_manager()->get_rx_buffer();
+ if (chunk == NULL) {
+ lderr(cct) << __func__ << " WARNING: out of memory. Requested " << num <<
+ " rx buffers. Got " << i << dendl;
+ if (i == 0)
+ return 0;
+ // if we got some buffers post them and hope for the best
+ rx_work_request[i-1].next = 0;
+ break;
+ }
+
+ isge[i].addr = reinterpret_cast<uint64_t>(chunk->data);
+ isge[i].length = chunk->bytes;
+ isge[i].lkey = chunk->lkey;
+
+ memset(&rx_work_request[i], 0, sizeof(rx_work_request[i]));
+ rx_work_request[i].wr_id = reinterpret_cast<uint64_t>(chunk);// stash descriptor ptr
+ if (i == num - 1) {
+ rx_work_request[i].next = 0;
+ } else {
+ rx_work_request[i].next = &rx_work_request[i+1];
+ }
+ rx_work_request[i].sg_list = &isge[i];
+ rx_work_request[i].num_sge = 1;
+ i++;
+ }
+ ibv_recv_wr *badworkrequest;
+ if (support_srq) {
+ ret = ibv_post_srq_recv(srq, &rx_work_request[0], &badworkrequest);
+ ceph_assert(ret == 0);
+ } else {
+ ceph_assert(qp);
+ ret = ibv_post_recv(qp, &rx_work_request[0], &badworkrequest);
+ ceph_assert(ret == 0);
+ }
+ return i;
+}
+
+Infiniband::CompletionChannel* Infiniband::create_comp_channel(CephContext *c)
+{
+ Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(c, *this);
+ if (cc->init()) {
+ delete cc;
+ return NULL;
+ }
+ return cc;
+}
+
+Infiniband::CompletionQueue* Infiniband::create_comp_queue(
+ CephContext *cct, CompletionChannel *cc)
+{
+ Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue(
+ cct, *this, CQ_DEPTH, cc);
+ if (cq->init()) {
+ delete cq;
+ return NULL;
+ }
+ return cq;
+}
+
+// 1 means no valid buffer read, 0 means got enough buffer
+// else return < 0 means error
+int Infiniband::recv_msg(CephContext *cct, int sd, IBSYNMsg& im)
+{
+ char msg[TCP_MSG_LEN];
+ char gid[33];
+ ssize_t r = ::read(sd, &msg, sizeof(msg));
+ // Drop incoming qpt
+ if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
+ if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
+ ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
+ return -EINVAL;
+ }
+ }
+ if (r < 0) {
+ r = -errno;
+ lderr(cct) << __func__ << " got error " << r << ": "
+ << cpp_strerror(r) << dendl;
+ } else if (r == 0) { // valid disconnect message of length 0
+ ldout(cct, 10) << __func__ << " got disconnect message " << dendl;
+ } else if ((size_t)r != sizeof(msg)) { // invalid message
+ ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl;
+ r = -EINVAL;
+ } else { // valid message
+ sscanf(msg, "%hx:%x:%x:%x:%s", &(im.lid), &(im.qpn), &(im.psn), &(im.peer_qpn),gid);
+ wire_gid_to_gid(gid, &(im.gid));
+ ldout(cct, 5) << __func__ << " recevd: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << im.peer_qpn << ", " << gid << dendl;
+ }
+ return r;
+}
+
+int Infiniband::send_msg(CephContext *cct, int sd, IBSYNMsg& im)
+{
+ int retry = 0;
+ ssize_t r;
+
+ char msg[TCP_MSG_LEN];
+ char gid[33];
+retry:
+ gid_to_wire_gid(&(im.gid), gid);
+ sprintf(msg, "%04x:%08x:%08x:%08x:%s", im.lid, im.qpn, im.psn, im.peer_qpn, gid);
+ ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn
+ << ", " << im.peer_qpn << ", " << gid << dendl;
+ r = ::write(sd, msg, sizeof(msg));
+ // Drop incoming qpt
+ if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
+ if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
+ ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
+ return -EINVAL;
+ }
+ }
+
+ if ((size_t)r != sizeof(msg)) {
+ // FIXME need to handle EAGAIN instead of retry
+ if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) {
+ retry++;
+ goto retry;
+ }
+ if (r < 0)
+ lderr(cct) << __func__ << " send returned error " << errno << ": "
+ << cpp_strerror(errno) << dendl;
+ else
+ lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl;
+ return -errno;
+ }
+ return 0;
+}
+
+void Infiniband::wire_gid_to_gid(const char *wgid, union ibv_gid *gid)
+{
+ char tmp[9];
+ uint32_t v32;
+ int i;
+
+ for (tmp[8] = 0, i = 0; i < 4; ++i) {
+ memcpy(tmp, wgid + i * 8, 8);
+ sscanf(tmp, "%x", &v32);
+ *(uint32_t *)(&gid->raw[i * 4]) = ntohl(v32);
+ }
+}
+
+void Infiniband::gid_to_wire_gid(const union ibv_gid *gid, char wgid[])
+{
+ for (int i = 0; i < 4; ++i)
+ sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(gid->raw + i * 4)));
+}
+
+Infiniband::QueuePair::~QueuePair()
+{
+ if (qp) {
+ ldout(cct, 20) << __func__ << " destroy qp=" << qp << dendl;
+ ceph_assert(!ibv_destroy_qp(qp));
+ }
+}
+
+/**
+ * Given a string representation of the `status' field from Verbs
+ * struct `ibv_wc'.
+ *
+ * \param[in] status
+ * The integer status obtained in ibv_wc.status.
+ * \return
+ * A string corresponding to the given status.
+ */
+const char* Infiniband::wc_status_to_string(int status)
+{
+ static const char *lookup[] = {
+ "SUCCESS",
+ "LOC_LEN_ERR",
+ "LOC_QP_OP_ERR",
+ "LOC_EEC_OP_ERR",
+ "LOC_PROT_ERR",
+ "WR_FLUSH_ERR",
+ "MW_BIND_ERR",
+ "BAD_RESP_ERR",
+ "LOC_ACCESS_ERR",
+ "REM_INV_REQ_ERR",
+ "REM_ACCESS_ERR",
+ "REM_OP_ERR",
+ "RETRY_EXC_ERR",
+ "RNR_RETRY_EXC_ERR",
+ "LOC_RDD_VIOL_ERR",
+ "REM_INV_RD_REQ_ERR",
+ "REM_ABORT_ERR",
+ "INV_EECN_ERR",
+ "INV_EEC_STATE_ERR",
+ "FATAL_ERR",
+ "RESP_TIMEOUT_ERR",
+ "GENERAL_ERR"
+ };
+
+ if (status < IBV_WC_SUCCESS || status > IBV_WC_GENERAL_ERR)
+ return "<status out of range!>";
+ return lookup[status];
+}
+
+const char* Infiniband::qp_state_string(int status) {
+ switch(status) {
+ case IBV_QPS_RESET : return "IBV_QPS_RESET";
+ case IBV_QPS_INIT : return "IBV_QPS_INIT";
+ case IBV_QPS_RTR : return "IBV_QPS_RTR";
+ case IBV_QPS_RTS : return "IBV_QPS_RTS";
+ case IBV_QPS_SQD : return "IBV_QPS_SQD";
+ case IBV_QPS_SQE : return "IBV_QPS_SQE";
+ case IBV_QPS_ERR : return "IBV_QPS_ERR";
+ default: return " out of range.";
+ }
+}