diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
commit | 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch) | |
tree | e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/msg/async/rdma/RDMAConnectedSocketImpl.cc | |
parent | Initial commit. (diff) | |
download | ceph-upstream.tar.xz ceph-upstream.zip |
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/msg/async/rdma/RDMAConnectedSocketImpl.cc | 743 |
1 files changed, 743 insertions, 0 deletions
diff --git a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc new file mode 100644 index 00000000..89be7428 --- /dev/null +++ b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc @@ -0,0 +1,743 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSKY <haomai@xsky.com> + * + * Author: Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ +#include "RDMAStack.h" + +class C_handle_connection_established : public EventCallback { + RDMAConnectedSocketImpl *csi; + bool active = true; + public: + C_handle_connection_established(RDMAConnectedSocketImpl *w) : csi(w) {} + void do_request(uint64_t fd) final { + if (active) + csi->handle_connection_established(); + } + void close() { + active = false; + } +}; + +class C_handle_connection_read : public EventCallback { + RDMAConnectedSocketImpl *csi; + bool active = true; + public: + explicit C_handle_connection_read(RDMAConnectedSocketImpl *w): csi(w) {} + void do_request(uint64_t fd) final { + if (active) + csi->handle_connection(); + } + void close() { + active = false; + } +}; + +#define dout_subsys ceph_subsys_ms +#undef dout_prefix +#define dout_prefix *_dout << " RDMAConnectedSocketImpl " + +RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s, + RDMAWorker *w) + : cct(cct), connected(0), error(0), infiniband(ib), + dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock"), + is_server(false), read_handler(new C_handle_connection_read(this)), + established_handler(new C_handle_connection_established(this)), + active(false), pending(false) +{ + if (!cct->_conf->ms_async_rdma_cm) { + qp = infiniband->create_queue_pair(cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC, NULL); + my_msg.qpn = qp->get_local_qp_number(); + my_msg.psn = qp->get_initial_psn(); + my_msg.lid = infiniband->get_lid(); + my_msg.peer_qpn = 0; + my_msg.gid = infiniband->get_gid(); + notify_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK); + dispatcher->register_qp(qp, this); + dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair); + dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair); + } +} + +RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl() +{ + ldout(cct, 20) << __func__ << " destruct." << dendl; + cleanup(); + worker->remove_pending_conn(this); + dispatcher->erase_qpn(my_msg.qpn); + + for (unsigned i=0; i < wc.size(); ++i) { + dispatcher->post_chunk_to_pool(reinterpret_cast<Chunk*>(wc[i].wr_id)); + } + for (unsigned i=0; i < buffers.size(); ++i) { + dispatcher->post_chunk_to_pool(buffers[i]); + } + + Mutex::Locker l(lock); + if (notify_fd >= 0) + ::close(notify_fd); + if (tcp_fd >= 0) + ::close(tcp_fd); + error = ECONNRESET; +} + +void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v) +{ + Mutex::Locker l(lock); + if (wc.empty()) + wc = std::move(v); + else + wc.insert(wc.end(), v.begin(), v.end()); + notify(); +} + +void RDMAConnectedSocketImpl::get_wc(std::vector<ibv_wc> &w) +{ + Mutex::Locker l(lock); + if (wc.empty()) + return ; + w.swap(wc); +} + +int RDMAConnectedSocketImpl::activate() +{ + ibv_qp_attr qpa; + int r; + + // now connect up the qps and switch to RTR + memset(&qpa, 0, sizeof(qpa)); + qpa.qp_state = IBV_QPS_RTR; + qpa.path_mtu = IBV_MTU_1024; + qpa.dest_qp_num = peer_msg.qpn; + qpa.rq_psn = peer_msg.psn; + qpa.max_dest_rd_atomic = 1; + qpa.min_rnr_timer = 12; + //qpa.ah_attr.is_global = 0; + qpa.ah_attr.is_global = 1; + qpa.ah_attr.grh.hop_limit = 6; + qpa.ah_attr.grh.dgid = peer_msg.gid; + + qpa.ah_attr.grh.sgid_index = infiniband->get_device()->get_gid_idx(); + + qpa.ah_attr.dlid = peer_msg.lid; + qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl; + qpa.ah_attr.grh.traffic_class = cct->_conf->ms_async_rdma_dscp; + qpa.ah_attr.src_path_bits = 0; + qpa.ah_attr.port_num = (uint8_t)(infiniband->get_ib_physical_port()); + + ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl; + + r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE | + IBV_QP_AV | + IBV_QP_PATH_MTU | + IBV_QP_DEST_QPN | + IBV_QP_RQ_PSN | + IBV_QP_MIN_RNR_TIMER | + IBV_QP_MAX_DEST_RD_ATOMIC); + if (r) { + lderr(cct) << __func__ << " failed to transition to RTR state: " + << cpp_strerror(errno) << dendl; + return -1; + } + + ldout(cct, 20) << __func__ << " transition to RTR state successfully." << dendl; + + // now move to RTS + qpa.qp_state = IBV_QPS_RTS; + + // How long to wait before retrying if packet lost or server dead. + // Supposedly the timeout is 4.096us*2^timeout. However, the actual + // timeout appears to be 4.096us*2^(timeout+1), so the setting + // below creates a 135ms timeout. + qpa.timeout = 14; + + // How many times to retry after timeouts before giving up. + qpa.retry_cnt = 7; + + // How many times to retry after RNR (receiver not ready) condition + // before giving up. Occurs when the remote side has not yet posted + // a receive request. + qpa.rnr_retry = 7; // 7 is infinite retry. + qpa.sq_psn = my_msg.psn; + qpa.max_rd_atomic = 1; + + r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE | + IBV_QP_TIMEOUT | + IBV_QP_RETRY_CNT | + IBV_QP_RNR_RETRY | + IBV_QP_SQ_PSN | + IBV_QP_MAX_QP_RD_ATOMIC); + if (r) { + lderr(cct) << __func__ << " failed to transition to RTS state: " + << cpp_strerror(errno) << dendl; + return -1; + } + + // the queue pair should be ready to use once the client has finished + // setting up their end. + ldout(cct, 20) << __func__ << " transition to RTS state successfully." << dendl; + ldout(cct, 20) << __func__ << " QueuePair: " << qp << " with qp:" << qp->get_qp() << dendl; + + if (!is_server) { + connected = 1; //indicate successfully + ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << my_msg.qpn << dendl; + submit(false); + } + active = true; + + return 0; +} + +int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) { + ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:" + << opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl; + NetHandler net(cct); + + // we construct a socket to transport ib sync message + // but we shouldn't block in tcp connecting + if (opts.nonblock) { + tcp_fd = net.nonblock_connect(peer_addr, opts.connect_bind_addr); + } else { + tcp_fd = net.connect(peer_addr, opts.connect_bind_addr); + } + + if (tcp_fd < 0) { + return -errno; + } + + int r = net.set_socket_options(tcp_fd, opts.nodelay, opts.rcbuf_size); + if (r < 0) { + ::close(tcp_fd); + tcp_fd = -1; + return -errno; + } + + ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl; + net.set_priority(tcp_fd, opts.priority, peer_addr.get_family()); + r = 0; + if (opts.nonblock) { + worker->center.create_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE , established_handler); + } else { + r = handle_connection_established(false); + } + return r; +} + +int RDMAConnectedSocketImpl::handle_connection_established(bool need_set_fault) { + ldout(cct, 20) << __func__ << " start " << dendl; + // delete read event + worker->center.delete_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE); + if (1 == connected) { + ldout(cct, 1) << __func__ << " warnning: logic failed " << dendl; + if (need_set_fault) { + fault(); + } + return -1; + } + // send handshake msg to server + my_msg.peer_qpn = 0; + int r = infiniband->send_msg(cct, tcp_fd, my_msg); + if (r < 0) { + ldout(cct, 1) << __func__ << " send handshake msg failed." << r << dendl; + if (need_set_fault) { + fault(); + } + return r; + } + worker->center.create_file_event(tcp_fd, EVENT_READABLE, read_handler); + ldout(cct, 20) << __func__ << " finish " << dendl; + return 0; +} + +void RDMAConnectedSocketImpl::handle_connection() { + ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl; + int r = infiniband->recv_msg(cct, tcp_fd, peer_msg); + if (r <= 0) { + if (r != -EAGAIN) { + dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors); + ldout(cct, 1) << __func__ << " recv handshake msg failed." << dendl; + fault(); + } + return; + } + + if (1 == connected) { + ldout(cct, 1) << __func__ << " warnning: logic failed: read len: " << r << dendl; + fault(); + return; + } + + if (!is_server) {// syn + ack from server + my_msg.peer_qpn = peer_msg.qpn; + ldout(cct, 20) << __func__ << " peer msg : < " << peer_msg.qpn << ", " << peer_msg.psn + << ", " << peer_msg.lid << ", " << peer_msg.peer_qpn << "> " << dendl; + if (!connected) { + r = activate(); + ceph_assert(!r); + } + notify(); + r = infiniband->send_msg(cct, tcp_fd, my_msg); + if (r < 0) { + ldout(cct, 1) << __func__ << " send client ack failed." << dendl; + dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors); + fault(); + } + } else { + if (peer_msg.peer_qpn == 0) {// syn from client + if (active) { + ldout(cct, 10) << __func__ << " server is already active." << dendl; + return ; + } + r = activate(); + ceph_assert(!r); + r = infiniband->send_msg(cct, tcp_fd, my_msg); + if (r < 0) { + ldout(cct, 1) << __func__ << " server ack failed." << dendl; + dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors); + fault(); + return ; + } + } else { // ack from client + connected = 1; + ldout(cct, 10) << __func__ << " handshake of rdma is done. server connected: " << connected << dendl; + //cleanup(); + submit(false); + notify(); + } + } +} + +ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len) +{ + uint64_t i = 0; + int r = ::read(notify_fd, &i, sizeof(i)); + ldout(cct, 20) << __func__ << " notify_fd : " << i << " in " << my_msg.qpn << " r = " << r << dendl; + + if (!active) { + ldout(cct, 1) << __func__ << " when ib not active. len: " << len << dendl; + return -EAGAIN; + } + + if (0 == connected) { + ldout(cct, 1) << __func__ << " when ib not connected. len: " << len <<dendl; + return -EAGAIN; + } + ssize_t read = 0; + if (!buffers.empty()) + read = read_buffers(buf,len); + + std::vector<ibv_wc> cqe; + get_wc(cqe); + if (cqe.empty()) { + if (!buffers.empty()) { + notify(); + } + if (read > 0) { + return read; + } + if (error) { + return -error; + } else { + return -EAGAIN; + } + } + + ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << my_msg.qpn << dendl; + for (size_t i = 0; i < cqe.size(); ++i) { + ibv_wc* response = &cqe[i]; + ceph_assert(response->status == IBV_WC_SUCCESS); + Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id); + ldout(cct, 25) << __func__ << " chunk length: " << response->byte_len << " bytes." << chunk << dendl; + chunk->prepare_read(response->byte_len); + worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len); + if (response->byte_len == 0) { + dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin); + if (connected) { + error = ECONNRESET; + ldout(cct, 20) << __func__ << " got remote close msg..." << dendl; + } + dispatcher->post_chunk_to_pool(chunk); + } else { + if (read == (ssize_t)len) { + buffers.push_back(chunk); + ldout(cct, 25) << __func__ << " buffers add a chunk: " << response->byte_len << dendl; + } else if (read + response->byte_len > (ssize_t)len) { + read += chunk->read(buf+read, (ssize_t)len-read); + buffers.push_back(chunk); + ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl; + } else { + read += chunk->read(buf+read, response->byte_len); + dispatcher->post_chunk_to_pool(chunk); + update_post_backlog(); + } + } + } + + worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size()); + if (is_server && connected == 0) { + ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl; + connected = 1; //if so, we don't need the last handshake + cleanup(); + submit(false); + } + + if (!buffers.empty()) { + notify(); + } + + if (read == 0 && error) + return -error; + return read == 0 ? -EAGAIN : read; +} + +ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len) +{ + size_t read = 0, tmp = 0; + auto c = buffers.begin(); + for (; c != buffers.end() ; ++c) { + tmp = (*c)->read(buf+read, len-read); + read += tmp; + ldout(cct, 25) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound() << ". Chunk:" << *c << dendl; + if ((*c)->over()) { + dispatcher->post_chunk_to_pool(*c); + update_post_backlog(); + ldout(cct, 25) << __func__ << " one chunk over." << dendl; + } + if (read == len) { + break; + } + } + + if (c != buffers.end() && (*c)->over()) + ++c; + buffers.erase(buffers.begin(), c); + ldout(cct, 25) << __func__ << " got " << read << " bytes, buffers size: " << buffers.size() << dendl; + return read; +} + +ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data) +{ + if (error) + return -error; + static const int MAX_COMPLETIONS = 16; + ibv_wc wc[MAX_COMPLETIONS]; + ssize_t size = 0; + + ibv_wc* response; + Chunk* chunk; + bool loaded = false; + auto iter = buffers.begin(); + if (iter != buffers.end()) { + chunk = *iter; + // FIXME need to handle release + // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband); + buffers.erase(iter); + loaded = true; + size = chunk->bound; + } + + std::vector<ibv_wc> cqe; + get_wc(cqe); + if (cqe.empty()) + return size == 0 ? -EAGAIN : size; + + ldout(cct, 20) << __func__ << " pool completion queue got " << cqe.size() << " responses."<< dendl; + + for (size_t i = 0; i < cqe.size(); ++i) { + response = &wc[i]; + chunk = reinterpret_cast<Chunk*>(response->wr_id); + chunk->prepare_read(response->byte_len); + if (!loaded && i == 0) { + // FIXME need to handle release + // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband); + size = chunk->bound; + continue; + } + buffers.push_back(chunk); + iter++; + } + + if (size == 0) + return -EAGAIN; + return size; +} + +ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more) +{ + if (error) { + if (!active) + return -EPIPE; + return -error; + } + size_t bytes = bl.length(); + if (!bytes) + return 0; + { + Mutex::Locker l(lock); + pending_bl.claim_append(bl); + if (!connected) { + ldout(cct, 20) << __func__ << " fake send to upper, QP: " << my_msg.qpn << dendl; + return bytes; + } + } + ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << dendl; + ssize_t r = submit(more); + if (r < 0 && r != -EAGAIN) + return r; + return bytes; +} + +ssize_t RDMAConnectedSocketImpl::submit(bool more) +{ + if (error) + return -error; + Mutex::Locker l(lock); + size_t bytes = pending_bl.length(); + ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: " + << pending_bl.buffers().size() << dendl; + if (!bytes) + return 0; + + auto fill_tx_via_copy = [this](std::vector<Chunk*> &tx_buffers, + unsigned bytes, + auto& start, + const auto& end) -> unsigned { + ceph_assert(start != end); + auto chunk_idx = tx_buffers.size(); + int ret = worker->get_reged_mem(this, tx_buffers, bytes); + if (ret == 0) { + ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl; + worker->perf_logger->inc(l_msgr_rdma_tx_no_mem); + return 0; + } + + unsigned total_copied = 0; + Chunk *current_chunk = tx_buffers[chunk_idx]; + while (start != end) { + const uintptr_t addr = reinterpret_cast<uintptr_t>(start->c_str()); + unsigned copied = 0; + while (copied < start->length()) { + uint32_t r = current_chunk->write((char*)addr+copied, start->length() - copied); + copied += r; + total_copied += r; + bytes -= r; + if (current_chunk->full()){ + if (++chunk_idx == tx_buffers.size()) + return total_copied; + current_chunk = tx_buffers[chunk_idx]; + } + } + ++start; + } + ceph_assert(bytes == 0); + return total_copied; + }; + + std::vector<Chunk*> tx_buffers; + auto it = std::cbegin(pending_bl.buffers()); + auto copy_it = it; + unsigned total = 0; + unsigned need_reserve_bytes = 0; + while (it != pending_bl.buffers().end()) { + if (infiniband->is_tx_buffer(it->raw_c_str())) { + if (need_reserve_bytes) { + unsigned copied = fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it); + total += copied; + if (copied < need_reserve_bytes) + goto sending; + need_reserve_bytes = 0; + } + ceph_assert(copy_it == it); + tx_buffers.push_back(infiniband->get_tx_chunk_by_buffer(it->raw_c_str())); + total += it->length(); + ++copy_it; + } else { + need_reserve_bytes += it->length(); + } + ++it; + } + if (need_reserve_bytes) + total += fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it); + + sending: + if (total == 0) + return -EAGAIN; + ceph_assert(total <= pending_bl.length()); + bufferlist swapped; + if (total < pending_bl.length()) { + worker->perf_logger->inc(l_msgr_rdma_tx_parital_mem); + pending_bl.splice(total, pending_bl.length()-total, &swapped); + pending_bl.swap(swapped); + } else { + pending_bl.clear(); + } + + ldout(cct, 20) << __func__ << " left bytes: " << pending_bl.length() << " in buffers " + << pending_bl.buffers().size() << " tx chunks " << tx_buffers.size() << dendl; + + int r = post_work_request(tx_buffers); + if (r < 0) + return r; + + ldout(cct, 20) << __func__ << " finished sending " << bytes << " bytes." << dendl; + return pending_bl.length() ? -EAGAIN : 0; +} + +int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers) +{ + ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " " << tx_buffers[0] << dendl; + vector<Chunk*>::iterator current_buffer = tx_buffers.begin(); + ibv_sge isge[tx_buffers.size()]; + uint32_t current_sge = 0; + ibv_send_wr iswr[tx_buffers.size()]; + uint32_t current_swr = 0; + ibv_send_wr* pre_wr = NULL; + uint32_t num = 0; + + // FIPS zeroization audit 20191115: these memsets are not security related. + memset(iswr, 0, sizeof(iswr)); + memset(isge, 0, sizeof(isge)); + + while (current_buffer != tx_buffers.end()) { + isge[current_sge].addr = reinterpret_cast<uint64_t>((*current_buffer)->buffer); + isge[current_sge].length = (*current_buffer)->get_offset(); + isge[current_sge].lkey = (*current_buffer)->mr->lkey; + ldout(cct, 25) << __func__ << " sending buffer: " << *current_buffer << " length: " << isge[current_sge].length << dendl; + + iswr[current_swr].wr_id = reinterpret_cast<uint64_t>(*current_buffer); + iswr[current_swr].next = NULL; + iswr[current_swr].sg_list = &isge[current_sge]; + iswr[current_swr].num_sge = 1; + iswr[current_swr].opcode = IBV_WR_SEND; + iswr[current_swr].send_flags = IBV_SEND_SIGNALED; + /*if (isge[current_sge].length < infiniband->max_inline_data) { + iswr[current_swr].send_flags = IBV_SEND_INLINE; + ldout(cct, 20) << __func__ << " send_inline." << dendl; + }*/ + + num++; + worker->perf_logger->inc(l_msgr_rdma_tx_bytes, isge[current_sge].length); + if (pre_wr) + pre_wr->next = &iswr[current_swr]; + pre_wr = &iswr[current_swr]; + ++current_sge; + ++current_swr; + ++current_buffer; + } + + ibv_send_wr *bad_tx_work_request; + if (ibv_post_send(qp->get_qp(), iswr, &bad_tx_work_request)) { + ldout(cct, 1) << __func__ << " failed to send data" + << " (most probably should be peer not ready): " + << cpp_strerror(errno) << dendl; + worker->perf_logger->inc(l_msgr_rdma_tx_failed); + return -errno; + } + qp->add_tx_wr(num); + worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size()); + ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl; + return 0; +} + +void RDMAConnectedSocketImpl::fin() { + ibv_send_wr wr; + // FIPS zeroization audit 20191115: this memset is not security related. + memset(&wr, 0, sizeof(wr)); + + wr.wr_id = reinterpret_cast<uint64_t>(qp); + wr.num_sge = 0; + wr.opcode = IBV_WR_SEND; + wr.send_flags = IBV_SEND_SIGNALED; + ibv_send_wr* bad_tx_work_request; + if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) { + ldout(cct, 1) << __func__ << " failed to send message=" + << " ibv_post_send failed(most probably should be peer not ready): " + << cpp_strerror(errno) << dendl; + worker->perf_logger->inc(l_msgr_rdma_tx_failed); + return ; + } + qp->add_tx_wr(1); +} + +void RDMAConnectedSocketImpl::cleanup() { + if (read_handler && tcp_fd >= 0) { + (static_cast<C_handle_connection_read*>(read_handler))->close(); + worker->center.submit_to(worker->center.get_id(), [this]() { + worker->center.delete_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE); + }, false); + delete read_handler; + read_handler = nullptr; + } + if (established_handler) { + (static_cast<C_handle_connection_established*>(established_handler))->close(); + delete established_handler; + established_handler = nullptr; + } +} + +void RDMAConnectedSocketImpl::notify() +{ + // note: notify_fd is an event fd (man eventfd) + // write argument must be a 64bit integer + uint64_t i = 1; + + ceph_assert(sizeof(i) == write(notify_fd, &i, sizeof(i))); +} + +void RDMAConnectedSocketImpl::shutdown() +{ + if (!error) + fin(); + error = ECONNRESET; + active = false; +} + +void RDMAConnectedSocketImpl::close() +{ + if (!error) + fin(); + error = ECONNRESET; + active = false; +} + +void RDMAConnectedSocketImpl::fault() +{ + ldout(cct, 1) << __func__ << " tcp fd " << tcp_fd << dendl; + /*if (qp) { + qp->to_dead(); + qp = NULL; + }*/ + error = ECONNRESET; + connected = 1; + notify(); +} + +void RDMAConnectedSocketImpl::set_accept_fd(int sd) +{ + tcp_fd = sd; + is_server = true; + worker->center.submit_to(worker->center.get_id(), [this]() { + worker->center.create_file_event(tcp_fd, EVENT_READABLE, read_handler); + }, true); +} + +void RDMAConnectedSocketImpl::post_chunks_to_rq(int num) +{ + post_backlog += num - infiniband->post_chunks_to_rq(num, qp->get_qp()); +} + +void RDMAConnectedSocketImpl::update_post_backlog() +{ + if (post_backlog) + post_backlog -= post_backlog - dispatcher->post_chunks_to_rq(post_backlog, qp->get_qp()); +} |