summaryrefslogtreecommitdiffstats
path: root/src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc
blob: 210eaf003388b716e9c7c49fe72c9d0a7e5d5ed1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#include <poll.h>

#include "msg/async/net_handler.h"
#include "RDMAStack.h"

#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix *_dout << " RDMAIWARPServerSocketImpl "

/**
 * Construct an iWARP server socket.
 *
 * All arguments are passed through unchanged to the RDMAServerSocketImpl
 * base-class constructor; this class adds no construction-time work of its own.
 */
RDMAIWARPServerSocketImpl::RDMAIWARPServerSocketImpl(CephContext *cct,
                                                     Infiniband* i,
                                                     RDMADispatcher *s,
                                                     RDMAWorker *w,
                                                     entity_addr_t& a,
                                                     unsigned addr_slot)
  : RDMAServerSocketImpl(cct, i, s, w, a, addr_slot)
{}

/**
 * Create an RDMA CM event channel and id, bind the id to @p sa and start
 * listening on it.
 *
 * On success the event channel's file descriptor is stored in
 * server_setup_socket. On any failure every CM object created here is
 * destroyed again, the cm_id / cm_channel members are cleared and
 * server_setup_socket is set to -1.
 *
 * @param sa   address to bind the listening id to
 * @param opt  socket options (not used by this implementation)
 * @return 0 on success, a negative errno value on failure
 */
int RDMAIWARPServerSocketImpl::listen(entity_addr_t &sa,
				      const SocketOptions &opt)
{
  ldout(cct, 20) << __func__ << " bind to rdma point" << dendl;

  int rc = 0;
  server_setup_socket = -1;

  cm_channel = rdma_create_event_channel();
  if (!cm_channel) {
    // Capture errno immediately so later calls cannot overwrite it.
    const int err = errno;
    ldout(cct, 10) << __func__ << " unable to create cm event channel: "
                   << cpp_strerror(err) << dendl;
    cm_id = nullptr;
    return -err;
  }

  if (rdma_create_id(cm_channel, &cm_id, NULL, RDMA_PS_TCP) < 0) {
    const int err = errno;
    ldout(cct, 10) << __func__ << " unable to create cm id: "
                   << cpp_strerror(err) << dendl;
    // No id exists yet, so only the channel has to be released.
    rdma_destroy_event_channel(cm_channel);
    cm_channel = nullptr;
    cm_id = nullptr;
    return -err;
  }
  ldout(cct, 20) << __func__ << " successfully created cm id: " << cm_id << dendl;

  rc = rdma_bind_addr(cm_id, const_cast<struct sockaddr*>(sa.get_sockaddr()));
  if (rc < 0) {
    const int err = errno;
    rc = -err;
    ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr()
                   << " on port " << sa.get_port() << ": " << cpp_strerror(err) << dendl;
    goto err;
  }

  // 128 is the backlog of pending connection requests.
  rc = rdma_listen(cm_id, 128);
  if (rc < 0) {
    const int err = errno;
    rc = -err;
    ldout(cct, 10) << __func__ << " unable to listen to " << sa.get_sockaddr()
                   << " on port " << sa.get_port() << ": " << cpp_strerror(err) << dendl;
    goto err;
  }

  server_setup_socket = cm_channel->fd;
  ldout(cct, 20) << __func__ << " fd of cm_channel is " << server_setup_socket << dendl;
  return 0;

err:
  // The id must be destroyed before the channel it is attached to.
  server_setup_socket = -1;
  rdma_destroy_id(cm_id);
  rdma_destroy_event_channel(cm_channel);
  // Do not leave dangling handles behind.
  cm_id = nullptr;
  cm_channel = nullptr;
  return rc;
}

/**
 * Accept one pending connection request from the listening CM channel.
 *
 * The call never blocks waiting for a request: the channel fd is polled with
 * a zero timeout and -EAGAIN is returned when nothing is pending. For a
 * connection request, the new cm id is migrated to its own freshly created
 * event channel, wrapped in an RDMAIWARPConnectedSocketImpl, accepted and
 * activated.
 *
 * @param sock  receives the connected socket on success (must not be null)
 * @param opt   socket options (not used by this implementation)
 * @param out   receives the peer address taken from the new cm id's route
 * @param w     worker handed to the new connected socket (dynamic_cast to
 *              RDMAWorker*)
 * @return 0 on success; -EAGAIN when no request is pending, when an event
 *         other than a connection request was consumed, or when rdma_accept
 *         fails; another negative errno value when a CM call fails.
 */
int RDMAIWARPServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt,
    entity_addr_t *out, Worker *w)
{
  ldout(cct, 15) << __func__ << dendl;

  ceph_assert(sock);

  // Value-initialise and assign fields explicitly: designated initialisers
  // are not standard C++ before C++20, and this also zeroes revents.
  struct pollfd pfd = {};
  pfd.fd = cm_channel->fd;
  pfd.events = POLLIN;
  int ret = poll(&pfd, 1, 0);
  ceph_assert(ret >= 0);
  if (!ret)
    return -EAGAIN;

  struct rdma_cm_event *cm_event = nullptr;
  if (rdma_get_cm_event(cm_channel, &cm_event) < 0) {
    const int err = errno;
    ldout(cct, 10) << __func__ << " unable to get cm event: "
                   << cpp_strerror(err) << dendl;
    return -err;
  }
  ldout(cct, 20) << __func__ << " event name: " << rdma_event_str(cm_event->event) << dendl;

  // Only a connection request carries a new cm id and connection parameters.
  // For any other event, cm_event->id is not a fresh connection id and must
  // not be migrated or accepted.
  if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
    ldout(cct, 10) << __func__ << " ignoring unexpected event on listen channel" << dendl;
    rdma_ack_cm_event(cm_event);
    return -EAGAIN;
  }

  struct rdma_cm_id *new_cm_id = cm_event->id;

  struct rdma_event_channel *event_channel = rdma_create_event_channel();
  if (!event_channel) {
    const int err = errno;
    ldout(cct, 10) << __func__ << " unable to create event channel: "
                   << cpp_strerror(err) << dendl;
    rdma_ack_cm_event(cm_event);
    // The request cannot be served; release the id that was created for it.
    rdma_destroy_id(new_cm_id);
    return -err;
  }

  // Give the new connection its own channel so its events are delivered
  // separately from the listener's.
  if (rdma_migrate_id(new_cm_id, event_channel) < 0) {
    const int err = errno;
    ldout(cct, 10) << __func__ << " unable to migrate cm id: "
                   << cpp_strerror(err) << dendl;
    rdma_ack_cm_event(cm_event);
    rdma_destroy_id(new_cm_id);
    rdma_destroy_event_channel(event_channel);
    return -err;
  }

  RDMACMInfo info(new_cm_id, event_channel, cm_event->param.conn.qp_num);
  RDMAIWARPConnectedSocketImpl* server =
    new RDMAIWARPConnectedSocketImpl(cct, infiniband, dispatcher, dynamic_cast<RDMAWorker*>(w), &info);

  struct rdma_conn_param local_conn_param;
  // FIPS zeroization audit 20191115: this memset is not security related.
  memset(&local_conn_param, 0, sizeof(local_conn_param));
  local_conn_param.qp_num = server->get_local_qpn();

  if (rdma_accept(new_cm_id, &local_conn_param)) {
    // Every event obtained from rdma_get_cm_event must be acknowledged.
    rdma_ack_cm_event(cm_event);
    // NOTE(review): `server` (and the cm id / event channel handed to it via
    // `info`) is not released on this path. Its teardown requirements are not
    // visible from this file -- confirm and add the appropriate cleanup.
    return -EAGAIN;
  }
  server->activate();
  ldout(cct, 20) << __func__ << " accepted a new QP" << dendl;

  rdma_ack_cm_event(cm_event);

  std::unique_ptr<RDMAConnectedSocketImpl> csi(server);
  *sock = ConnectedSocket(std::move(csi));
  struct sockaddr *addr = &new_cm_id->route.addr.dst_addr;
  out->set_sockaddr(addr);

  return 0;
}

/**
 * Stop listening: destroy the listening cm id and its event channel.
 *
 * Does nothing when server_setup_socket is negative (listen() never
 * succeeded, or the listener was already torn down). After tearing down,
 * the members are reset the same way listen()'s error path resets them, so a
 * repeated call cannot destroy the same CM objects twice.
 */
void RDMAIWARPServerSocketImpl::abort_accept()
{
  if (server_setup_socket < 0)
    return;

  // The id must be destroyed before the channel it is attached to.
  rdma_destroy_id(cm_id);
  rdma_destroy_event_channel(cm_channel);

  cm_id = nullptr;
  cm_channel = nullptr;
  // The fd belonged to the channel that was just destroyed.
  server_setup_socket = -1;
}