summaryrefslogtreecommitdiffstats
path: root/src/msg/async/rdma/RDMAServerSocketImpl.cc
blob: 98402cfd35469a2a852819ce3ab26ac7167e93d1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 XSKY <haomai@xsky.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#include "msg/async/net_handler.h"
#include "RDMAStack.h"

#include "include/compat.h"
#include "include/sock_compat.h"

#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix *_dout << " RDMAServerSocketImpl "

RDMAServerSocketImpl::RDMAServerSocketImpl(
  CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w,
  entity_addr_t& a, unsigned slot)
  : ServerSocketImpl(a.get_type(), slot),
    cct(cct), net(cct), server_setup_socket(-1), infiniband(i),
    dispatcher(s), worker(w), sa(a)
{
}

int RDMAServerSocketImpl::listen(entity_addr_t &sa, const SocketOptions &opt)
{
  int rc = 0;
  server_setup_socket = net.create_socket(sa.get_family(), true);
  if (server_setup_socket < 0) {
    rc = -errno;
    lderr(cct) << __func__ << " failed to create server socket: "
               << cpp_strerror(errno) << dendl;
    return rc;
  }

  rc = net.set_nonblock(server_setup_socket);
  if (rc < 0) {
    goto err;
  }

  rc = net.set_socket_options(server_setup_socket, opt.nodelay, opt.rcbuf_size);
  if (rc < 0) {
    goto err;
  }

  rc = ::bind(server_setup_socket, sa.get_sockaddr(), sa.get_sockaddr_len());
  if (rc < 0) {
    rc = -errno;
    ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr()
                   << " on port " << sa.get_port() << ": " << cpp_strerror(errno) << dendl;
    goto err;
  }

  rc = ::listen(server_setup_socket, cct->_conf->ms_tcp_listen_backlog);
  if (rc < 0) {
    rc = -errno;
    lderr(cct) << __func__ << " unable to listen on " << sa << ": " << cpp_strerror(errno) << dendl;
    goto err;
  }

  ldout(cct, 20) << __func__ << " bind to " << sa.get_sockaddr() << " on port " << sa.get_port()  << dendl;
  return 0;

err:
  ::close(server_setup_socket);
  server_setup_socket = -1;
  return rc;
}

int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w)
{
  ldout(cct, 15) << __func__ << dendl;

  ceph_assert(sock);

  sockaddr_storage ss;
  socklen_t slen = sizeof(ss);
  int sd = accept_cloexec(server_setup_socket, (sockaddr*)&ss, &slen);
  if (sd < 0) {
    return -errno;
  }

  int r = net.set_nonblock(sd);
  if (r < 0) {
    ::close(sd);
    return -errno;
  }

  r = net.set_socket_options(sd, opt.nodelay, opt.rcbuf_size);
  if (r < 0) {
    ::close(sd);
    return -errno;
  }

  ceph_assert(NULL != out); //out should not be NULL in accept connection

  out->set_type(addr_type);
  out->set_sockaddr((sockaddr*)&ss);
  net.set_priority(sd, opt.priority, out->get_family());

  RDMAConnectedSocketImpl* server;
  //Worker* w = dispatcher->get_stack()->get_worker();
  server = new RDMAConnectedSocketImpl(cct, infiniband, dispatcher, dynamic_cast<RDMAWorker*>(w));
  server->set_accept_fd(sd);
  ldout(cct, 20) << __func__ << " accepted a new QP, tcp_fd: " << sd << dendl;
  std::unique_ptr<RDMAConnectedSocketImpl> csi(server);
  *sock = ConnectedSocket(std::move(csi));

  return 0;
}

void RDMAServerSocketImpl::abort_accept()
{
  if (server_setup_socket >= 0)
    ::close(server_setup_socket);
}