From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001
From: Daniel Baumann
Date: Sun, 7 Apr 2024 20:45:59 +0200
Subject: Adding upstream version 16.2.11+ds.

Signed-off-by: Daniel Baumann
---
 src/msg/async/rdma/RDMAConnectedSocketImpl.cc | 602 ++++++++++++++++++++++++++
 1 file changed, 602 insertions(+)
 create mode 100644 src/msg/async/rdma/RDMAConnectedSocketImpl.cc

diff --git a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc
new file mode 100644
index 000000000..5ab6c9b2e
--- /dev/null
+++ b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc
@@ -0,0 +1,602 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSKY
+ *
+ * Author: Haomai Wang
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#include "RDMAStack.h"
+
+class C_handle_connection_established : public EventCallback {
+  RDMAConnectedSocketImpl *csi;
+  bool active = true;
+ public:
+  C_handle_connection_established(RDMAConnectedSocketImpl *w) : csi(w) {}
+  void do_request(uint64_t fd) final {
+    if (active)
+      csi->handle_connection_established();
+  }
+  void close() {
+    active = false;
+  }
+};
+
+class C_handle_connection_read : public EventCallback {
+  RDMAConnectedSocketImpl *csi;
+  bool active = true;
+ public:
+  explicit C_handle_connection_read(RDMAConnectedSocketImpl *w): csi(w) {}
+  void do_request(uint64_t fd) final {
+    if (active)
+      csi->handle_connection();
+  }
+  void close() {
+    active = false;
+  }
+};
+
+#define dout_subsys ceph_subsys_ms
+#undef dout_prefix
+#define dout_prefix *_dout << " RDMAConnectedSocketImpl "
+
+RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, std::shared_ptr<Infiniband>& ib,
+                                                 std::shared_ptr<RDMADispatcher>& rdma_dispatcher,
+                                                 RDMAWorker *w)
+  : cct(cct), connected(0), error(0), ib(ib),
+    dispatcher(rdma_dispatcher), worker(w),
+    is_server(false), read_handler(new C_handle_connection_read(this)),
+    established_handler(new C_handle_connection_established(this)),
+    active(false), pending(false)
+{
+  if (!cct->_conf->ms_async_rdma_cm) {
+    qp = ib->create_queue_pair(cct, dispatcher->get_tx_cq(), dispatcher->get_rx_cq(), IBV_QPT_RC, NULL);
+    if (!qp) {
+      lderr(cct) << __func__ << " queue pair create failed" << dendl;
+      return;
+    }
+    local_qpn = qp->get_local_qp_number();
+    notify_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
+    dispatcher->register_qp(qp, this);
+    dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
+    dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
+  }
+}
+
+RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
+{
+  ldout(cct, 20) << __func__ << " destruct." << dendl;
+  cleanup();
+  worker->remove_pending_conn(this);
+  dispatcher->schedule_qp_destroy(local_qpn);
+
+  for (unsigned i=0; i < wc.size(); ++i) {
+    dispatcher->post_chunk_to_pool(reinterpret_cast<Chunk*>(wc[i].wr_id));
+  }
+  for (unsigned i=0; i < buffers.size(); ++i) {
+    dispatcher->post_chunk_to_pool(buffers[i]);
+  }
+
+  std::lock_guard l{lock};
+  if (notify_fd >= 0)
+    ::close(notify_fd);
+  if (tcp_fd >= 0)
+    ::close(tcp_fd);
+  error = ECONNRESET;
+}
+
+void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v)
+{
+  std::lock_guard l{lock};
+  if (wc.empty())
+    wc = std::move(v);
+  else
+    wc.insert(wc.end(), v.begin(), v.end());
+  notify();
+}
+
+void RDMAConnectedSocketImpl::get_wc(std::vector<ibv_wc> &w)
+{
+  std::lock_guard l{lock};
+  if (wc.empty())
+    return ;
+  w.swap(wc);
+}
+
+int RDMAConnectedSocketImpl::activate()
+{
+  qp->get_local_cm_meta().peer_qpn = qp->get_peer_cm_meta().local_qpn;
+  if (qp->modify_qp_to_rtr() != 0)
+    return -1;
+
+  if (qp->modify_qp_to_rts() != 0)
+    return -1;
+
+  if (!is_server) {
+    connected = 1; //indicate successfully
+    ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << local_qpn << dendl;
+    submit(false);
+  }
+  active = true;
+  peer_qpn = qp->get_local_cm_meta().peer_qpn;
+
+  return 0;
+}
+
+int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) {
+  ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:"
+                 << opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl;
+  ceph::NetHandler net(cct);
+
+  // we construct a socket to transport ib sync message
+  // but we shouldn't block in tcp connecting
+  if (opts.nonblock) {
+    tcp_fd = net.nonblock_connect(peer_addr, opts.connect_bind_addr);
+  } else {
+    tcp_fd = net.connect(peer_addr, opts.connect_bind_addr);
+  }
+
+  if (tcp_fd < 0) {
+    return -errno;
+  }
+
+  int r = net.set_socket_options(tcp_fd, opts.nodelay, opts.rcbuf_size);
+  if (r < 0) {
+    ::close(tcp_fd);
+    tcp_fd = -1;
+    return -errno;
+  }
+
+  ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;
+  net.set_priority(tcp_fd, opts.priority, peer_addr.get_family());
+  r = 0;
+  if (opts.nonblock) {
+    worker->center.create_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE, established_handler);
+  } else {
+    r = handle_connection_established(false);
+  }
+  return r;
+}
+
+int RDMAConnectedSocketImpl::handle_connection_established(bool need_set_fault) {
+  ldout(cct, 20) << __func__ << " start " << dendl;
+  // delete read event
+  worker->center.delete_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE);
+  if (1 == connected) {
+    ldout(cct, 1) << __func__ << " warning: logic failed " << dendl;
+    if (need_set_fault) {
+      fault();
+    }
+    return -1;
+  }
+  // send handshake msg to server
+  qp->get_local_cm_meta().peer_qpn = 0;
+  int r = qp->send_cm_meta(cct, tcp_fd);
+  if (r < 0) {
+    ldout(cct, 1) << __func__ << " send handshake msg failed." << r << dendl;
+    if (need_set_fault) {
+      fault();
+    }
+    return r;
+  }
+  worker->center.create_file_event(tcp_fd, EVENT_READABLE, read_handler);
+  ldout(cct, 20) << __func__ << " finish " << dendl;
+  return 0;
+}
+
+void RDMAConnectedSocketImpl::handle_connection() {
+  ldout(cct, 20) << __func__ << " QP: " << local_qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl;
+  int r = qp->recv_cm_meta(cct, tcp_fd);
+  if (r <= 0) {
+    if (r != -EAGAIN) {
+      dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
+      ldout(cct, 1) << __func__ << " recv handshake msg failed." << dendl;
+      fault();
+    }
+    return;
+  }
+
+  if (1 == connected) {
+    ldout(cct, 1) << __func__ << " warning: logic failed: read len: " << r << dendl;
+    fault();
+    return;
+  }
+
+  if (!is_server) {// first time: cm meta sync + ack from server
+    if (!connected) {
+      r = activate();
+      ceph_assert(!r);
+    }
+    notify();
+    r = qp->send_cm_meta(cct, tcp_fd);
+    if (r < 0) {
+      ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
+      dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
+      fault();
+    }
+  } else {
+    if (qp->get_peer_cm_meta().peer_qpn == 0) {// first time: cm meta sync from client
+      if (active) {
+        ldout(cct, 10) << __func__ << " server is already active." << dendl;
+        return ;
+      }
+      r = activate();
+      ceph_assert(!r);
+      r = qp->send_cm_meta(cct, tcp_fd);
+      if (r < 0) {
+        ldout(cct, 1) << __func__ << " server ack failed." << dendl;
+        dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
+        fault();
+        return ;
+      }
+    } else { // second time: cm meta ack from client
+      connected = 1;
+      ldout(cct, 10) << __func__ << " handshake of rdma is done. server connected: " << connected << dendl;
+      //cleanup();
+      submit(false);
+      notify();
+    }
+  }
+}
+
+ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
+{
+  eventfd_t event_val = 0;
+  int r = eventfd_read(notify_fd, &event_val);
+  ldout(cct, 20) << __func__ << " notify_fd : " << event_val << " in " << local_qpn
+                 << " r = " << r << dendl;
+
+  if (!active) {
+    ldout(cct, 1) << __func__ << " when ib not active. len: " << len << dendl;
+    return -EAGAIN;
+  }
+
+  if (0 == connected) {
+    ldout(cct, 1) << __func__ << " when ib not connected. len: " << len << dendl;
+    return -EAGAIN;
+  }
+  ssize_t read = 0;
+  read = read_buffers(buf,len);
+
+  if (is_server && connected == 0) {
+    ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << local_qpn << " peer QP: " << peer_qpn << dendl;
+    connected = 1; //if so, we don't need the last handshake
+    cleanup();
+    submit(false);
+  }
+
+  if (!buffers.empty()) {
+    notify();
+  }
+
+  if (read == 0 && error)
+    return -error;
+  return read == 0 ? -EAGAIN : read;
+}
+
+void RDMAConnectedSocketImpl::buffer_prefetch(void)
+{
+  std::vector<ibv_wc> cqe;
+  get_wc(cqe);
+  if(cqe.empty())
+    return;
+
+  for(size_t i = 0; i < cqe.size(); ++i) {
+    ibv_wc* response = &cqe[i];
+    ceph_assert(response->status == IBV_WC_SUCCESS);
+    Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
+    chunk->prepare_read(response->byte_len);
+
+    if (chunk->get_size() == 0) {
+      chunk->reset_read_chunk();
+      dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin);
+      if (connected) {
+        error = ECONNRESET;
+        ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;
+      }
+      dispatcher->post_chunk_to_pool(chunk);
+      continue;
+    } else {
+      buffers.push_back(chunk);
+      ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;
+    }
+  }
+  worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size());
+}
+
+ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
+{
+  size_t read_size = 0, tmp = 0;
+  buffer_prefetch();
+  auto pchunk = buffers.begin();
+  while (pchunk != buffers.end()) {
+    tmp = (*pchunk)->read(buf + read_size, len - read_size);
+    read_size += tmp;
+    ldout(cct, 25) << __func__ << " read chunk " << *pchunk << " bytes length" << tmp << " offset: "
+                   << (*pchunk)->get_offset() << " ,bound: " << (*pchunk)->get_bound() << dendl;
+
+    if ((*pchunk)->get_size() == 0) {
+      (*pchunk)->reset_read_chunk();
+      dispatcher->post_chunk_to_pool(*pchunk);
+      update_post_backlog();
+      ldout(cct, 25) << __func__ << " read over one chunk " << dendl;
+      pchunk++;
+    }
+
+    if (read_size == len) {
+      break;
+    }
+  }
+
+  buffers.erase(buffers.begin(), pchunk);
+  ldout(cct, 25) << __func__ << " got " << read_size << " bytes, buffers size: " << buffers.size() << dendl;
+  worker->perf_logger->inc(l_msgr_rdma_rx_bytes, read_size);
+  return read_size;
+}
+
+ssize_t RDMAConnectedSocketImpl::send(ceph::buffer::list &bl, bool more)
+{
+  if (error) {
+    if (!active)
+      return -EPIPE;
+    return -error;
+  }
+  size_t bytes = bl.length();
+  if (!bytes)
+    return 0;
+  {
+    std::lock_guard l{lock};
+    pending_bl.claim_append(bl);
+    if (!connected) {
+      ldout(cct, 20) << __func__ << " fake send to upper, QP: " << local_qpn << dendl;
+      return bytes;
+    }
+  }
+  ldout(cct, 20) << __func__ << " QP: " << local_qpn << dendl;
+  ssize_t r = submit(more);
+  if (r < 0 && r != -EAGAIN)
+    return r;
+  return bytes;
+}
+
+size_t RDMAConnectedSocketImpl::tx_copy_chunk(std::vector<Chunk*> &tx_buffers,
+    size_t req_copy_len, decltype(std::cbegin(pending_bl.buffers()))& start,
+    const decltype(std::cbegin(pending_bl.buffers()))& end)
+{
+  ceph_assert(start != end);
+  auto chunk_idx = tx_buffers.size();
+  if (0 == worker->get_reged_mem(this, tx_buffers, req_copy_len)) {
+    ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl;
+    worker->perf_logger->inc(l_msgr_rdma_tx_no_mem);
+    return 0;
+  }
+
+  Chunk *current_chunk = tx_buffers[chunk_idx];
+  size_t write_len = 0;
+  while (start != end) {
+    const uintptr_t addr = reinterpret_cast<uintptr_t>(start->c_str());
+
+    size_t slice_write_len = 0;
+    while (slice_write_len < start->length()) {
+      size_t real_len = current_chunk->write((char*)addr + slice_write_len, start->length() - slice_write_len);
+
+      slice_write_len += real_len;
+      write_len += real_len;
+      req_copy_len -= real_len;
+
+      if (current_chunk->full()) {
+        if (++chunk_idx == tx_buffers.size())
+          return write_len;
+        current_chunk = tx_buffers[chunk_idx];
+      }
+    }
+
+    ++start;
+  }
+  ceph_assert(req_copy_len == 0);
+  return write_len;
+}
+
+ssize_t RDMAConnectedSocketImpl::submit(bool more)
+{
+  if (error)
+    return -error;
+  std::lock_guard l{lock};
+  size_t bytes = pending_bl.length();
+  ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: "
+                 << pending_bl.get_num_buffers() << dendl;
+  if (!bytes)
+    return 0;
+
+  std::vector<Chunk*> tx_buffers;
+  auto it = std::cbegin(pending_bl.buffers());
+  auto copy_start = it;
+  size_t total_copied = 0, wait_copy_len = 0;
+  while (it != pending_bl.buffers().end()) {
+    if (ib->is_tx_buffer(it->raw_c_str())) {
+      if (wait_copy_len) {
+        size_t copied = tx_copy_chunk(tx_buffers, wait_copy_len, copy_start, it);
+        total_copied += copied;
+        if (copied < wait_copy_len)
+          goto sending;
+        wait_copy_len = 0;
+      }
+      ceph_assert(copy_start == it);
+      tx_buffers.push_back(ib->get_tx_chunk_by_buffer(it->raw_c_str()));
+      total_copied += it->length();
+      ++copy_start;
+    } else {
+      wait_copy_len += it->length();
+    }
+    ++it;
+  }
+  if (wait_copy_len)
+    total_copied += tx_copy_chunk(tx_buffers, wait_copy_len, copy_start, it);
+
+ sending:
+  if (total_copied == 0)
+    return -EAGAIN;
+  ceph_assert(total_copied <= pending_bl.length());
+  ceph::buffer::list swapped;
+  if (total_copied < pending_bl.length()) {
+    worker->perf_logger->inc(l_msgr_rdma_tx_parital_mem);
+    pending_bl.splice(total_copied, pending_bl.length() - total_copied, &swapped);
+    pending_bl.swap(swapped);
+  } else {
+    pending_bl.clear();
+  }
+
+  ldout(cct, 20) << __func__ << " left bytes: " << pending_bl.length() << " in buffers "
+                 << pending_bl.get_num_buffers() << " tx chunks " << tx_buffers.size() << dendl;
+
+  int r = post_work_request(tx_buffers);
+  if (r < 0)
+    return r;
+
+  ldout(cct, 20) << __func__ << " finished sending " << total_copied << " bytes." << dendl;
+  return pending_bl.length() ? -EAGAIN : 0;
+}
+
+int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
+{
+  ldout(cct, 20) << __func__ << " QP: " << local_qpn << " " << tx_buffers[0] << dendl;
+  auto current_buffer = tx_buffers.begin();
+  ibv_sge isge[tx_buffers.size()];
+  uint32_t current_sge = 0;
+  ibv_send_wr iswr[tx_buffers.size()];
+  uint32_t current_swr = 0;
+  ibv_send_wr* pre_wr = NULL;
+  uint32_t num = 0;
+
+  // FIPS zeroization audit 20191115: these memsets are not security related.
+  memset(iswr, 0, sizeof(iswr));
+  memset(isge, 0, sizeof(isge));
+
+  while (current_buffer != tx_buffers.end()) {
+    isge[current_sge].addr = reinterpret_cast<uint64_t>((*current_buffer)->buffer);
+    isge[current_sge].length = (*current_buffer)->get_offset();
+    isge[current_sge].lkey = (*current_buffer)->mr->lkey;
+    ldout(cct, 25) << __func__ << " sending buffer: " << *current_buffer << " length: " << isge[current_sge].length << dendl;
+
+    iswr[current_swr].wr_id = reinterpret_cast<uint64_t>(*current_buffer);
+    iswr[current_swr].next = NULL;
+    iswr[current_swr].sg_list = &isge[current_sge];
+    iswr[current_swr].num_sge = 1;
+    iswr[current_swr].opcode = IBV_WR_SEND;
+    iswr[current_swr].send_flags = IBV_SEND_SIGNALED;
+
+    num++;
+    worker->perf_logger->inc(l_msgr_rdma_tx_bytes, isge[current_sge].length);
+    if (pre_wr)
+      pre_wr->next = &iswr[current_swr];
+    pre_wr = &iswr[current_swr];
+    ++current_sge;
+    ++current_swr;
+    ++current_buffer;
+  }
+
+  ibv_send_wr *bad_tx_work_request = nullptr;
+  if (ibv_post_send(qp->get_qp(), iswr, &bad_tx_work_request)) {
+    ldout(cct, 1) << __func__ << " failed to send data"
+                  << " (most probably should be peer not ready): "
+                  << cpp_strerror(errno) << dendl;
+    worker->perf_logger->inc(l_msgr_rdma_tx_failed);
+    return -errno;
+  }
+  worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size());
+  ldout(cct, 20) << __func__ << " qp state is " << get_qp_state() << dendl;
+  return 0;
+}
+
+void RDMAConnectedSocketImpl::fin() {
+  ibv_send_wr wr;
+  // FIPS zeroization audit 20191115: this memset is not security related.
+  memset(&wr, 0, sizeof(wr));
+
+  wr.wr_id = reinterpret_cast<uint64_t>(qp);
+  wr.num_sge = 0;
+  wr.opcode = IBV_WR_SEND;
+  wr.send_flags = IBV_SEND_SIGNALED;
+  ibv_send_wr* bad_tx_work_request = nullptr;
+  if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) {
+    ldout(cct, 1) << __func__ << " failed to send message="
+                  << " ibv_post_send failed(most probably should be peer not ready): "
+                  << cpp_strerror(errno) << dendl;
+    worker->perf_logger->inc(l_msgr_rdma_tx_failed);
+    return ;
+  }
+}
+
+void RDMAConnectedSocketImpl::cleanup() {
+  if (read_handler && tcp_fd >= 0) {
+    (static_cast<C_handle_connection_read*>(read_handler))->close();
+    worker->center.submit_to(worker->center.get_id(), [this]() {
+      worker->center.delete_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE);
+    }, false);
+    delete read_handler;
+    read_handler = nullptr;
+  }
+  if (established_handler) {
+    (static_cast<C_handle_connection_established*>(established_handler))->close();
+    delete established_handler;
+    established_handler = nullptr;
+  }
+}
+
+void RDMAConnectedSocketImpl::notify()
+{
+  eventfd_t event_val = 1;
+  int r = eventfd_write(notify_fd, event_val);
+  ceph_assert(r == 0);
+}
+
+void RDMAConnectedSocketImpl::shutdown()
+{
+  if (!error)
+    fin();
+  error = ECONNRESET;
+  active = false;
+}
+
+void RDMAConnectedSocketImpl::close()
+{
+  if (!error)
+    fin();
+  error = ECONNRESET;
+  active = false;
+}
+
+void RDMAConnectedSocketImpl::fault()
+{
+  ldout(cct, 1) << __func__ << " tcp fd " << tcp_fd << dendl;
+  error = ECONNRESET;
+  connected = 1;
+  notify();
+}
+
+void RDMAConnectedSocketImpl::set_accept_fd(int sd)
+{
+  tcp_fd = sd;
+  is_server = true;
+  worker->center.submit_to(worker->center.get_id(), [this]() {
+    worker->center.create_file_event(tcp_fd, EVENT_READABLE, read_handler);
+  }, true);
+}
+
+void RDMAConnectedSocketImpl::post_chunks_to_rq(int num)
+{
+  post_backlog += num - ib->post_chunks_to_rq(num, qp);
+}
+
+void RDMAConnectedSocketImpl::update_post_backlog()
+{
+  if (post_backlog)
+    post_backlog -= post_backlog - dispatcher->post_chunks_to_rq(post_backlog, qp);
+}