summaryrefslogtreecommitdiffstats
path: root/src/msg/async/Stack.h
blob: 6739968f4e2b7667e81a1124fcfa6a9c7f942e65 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 XSKY <haomai@xsky.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#ifndef CEPH_MSG_ASYNC_STACK_H
#define CEPH_MSG_ASYNC_STACK_H

#include "common/perf_counters.h"
#include "common/perf_counters_key.h"
#include "include/spinlock.h"
#include "msg/async/Event.h"
#include "msg/msg_types.h"
#include <string>

class Worker;
class ConnectedSocketImpl {
 public:
  virtual ~ConnectedSocketImpl() {}
  virtual int is_connected() = 0;
  virtual ssize_t read(char*, size_t) = 0;
  virtual ssize_t send(ceph::buffer::list &bl, bool more) = 0;
  virtual void shutdown() = 0;
  virtual void close() = 0;
  virtual int fd() const = 0;
  virtual void set_priority(int sd, int prio, int domain) = 0;
};

class ConnectedSocket;
struct SocketOptions {
  bool nonblock = true;
  bool nodelay = true;
  int rcbuf_size = 0;
  int priority = -1;
  entity_addr_t connect_bind_addr;
};

/// \cond internal
class ServerSocketImpl {
 public:
  unsigned addr_type; ///< entity_addr_t::TYPE_*
  unsigned addr_slot; ///< position of our addr in myaddrs().v
  ServerSocketImpl(unsigned type, unsigned slot)
    : addr_type(type), addr_slot(slot) {}
  virtual ~ServerSocketImpl() {}
  virtual int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) = 0;
  virtual void abort_accept() = 0;
  /// Get file descriptor
  virtual int fd() const = 0;
};
/// \endcond

/// \addtogroup networking-module
/// @{

/// A TCP (or other stream-based protocol) connection.
///
/// A \c ConnectedSocket represents a full-duplex stream between
/// two endpoints, a local endpoint and a remote endpoint.
class ConnectedSocket {
  std::unique_ptr<ConnectedSocketImpl> _csi;

 public:
  /// Constructs a \c ConnectedSocket not corresponding to a connection
  ConnectedSocket() {};
  /// \cond internal
  explicit ConnectedSocket(std::unique_ptr<ConnectedSocketImpl> csi)
      : _csi(std::move(csi)) {}
  /// \endcond
   ~ConnectedSocket() {
    if (_csi)
      _csi->close();
  }
  /// Moves a \c ConnectedSocket object.
  ConnectedSocket(ConnectedSocket&& cs) = default;
  /// Move-assigns a \c ConnectedSocket object.
  ConnectedSocket& operator=(ConnectedSocket&& cs) = default;

  int is_connected() {
    return _csi->is_connected();
  }
  /// Read the input stream with copy.
  ///
  /// Copy an object returning data sent from the remote endpoint.
  ssize_t read(char* buf, size_t len) {
    return _csi->read(buf, len);
  }
  /// Gets the output stream.
  ///
  /// Gets an object that sends data to the remote endpoint.
  ssize_t send(ceph::buffer::list &bl, bool more) {
    return _csi->send(bl, more);
  }
  /// Disables output to the socket.
  ///
  /// Current or future writes that have not been successfully flushed
  /// will immediately fail with an error.  This is useful to abort
  /// operations on a socket that is not making progress due to a
  /// peer failure.
  void shutdown() {
    return _csi->shutdown();
  }
  /// Disables input from the socket.
  ///
  /// Current or future reads will immediately fail with an error.
  /// This is useful to abort operations on a socket that is not making
  /// progress due to a peer failure.
  void close() {
    _csi->close();
    _csi.reset();
  }

  /// Get file descriptor
  int fd() const {
    return _csi->fd();
  }

  void set_priority(int sd, int prio, int domain) {
    _csi->set_priority(sd, prio, domain);
  }

  explicit operator bool() const {
    return _csi.get();
  }
};
/// @}

/// \addtogroup networking-module
/// @{

/// A listening socket, waiting to accept incoming network connections.
class ServerSocket {
  std::unique_ptr<ServerSocketImpl> _ssi;
 public:
  /// Constructs a \c ServerSocket not corresponding to a connection
  ServerSocket() {}
  /// \cond internal
  explicit ServerSocket(std::unique_ptr<ServerSocketImpl> ssi)
      : _ssi(std::move(ssi)) {}
  ~ServerSocket() {
    if (_ssi)
      _ssi->abort_accept();
  }
  /// \endcond
  /// Moves a \c ServerSocket object.
  ServerSocket(ServerSocket&& ss) = default;
  /// Move-assigns a \c ServerSocket object.
  ServerSocket& operator=(ServerSocket&& cs) = default;

  /// Accepts the next connection to successfully connect to this socket.
  ///
  /// \Accepts a \ref ConnectedSocket representing the connection, and
  ///          a \ref entity_addr_t describing the remote endpoint.
  int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
    return _ssi->accept(sock, opt, out, w);
  }

  /// Stops any \ref accept() in progress.
  ///
  /// Current and future \ref accept() calls will terminate immediately
  /// with an error.
  void abort_accept() {
    _ssi->abort_accept();
    _ssi.reset();
  }

  /// Get file descriptor
  int fd() const {
    return _ssi->fd();
  }

  /// get listen/bind addr
  unsigned get_addr_slot() {
    return _ssi->addr_slot;
  }

  explicit operator bool() const {
    return _ssi.get();
  }
};
/// @}

class NetworkStack;

enum {
  l_msgr_first = 94000,
  l_msgr_recv_messages,
  l_msgr_send_messages,
  l_msgr_recv_bytes,
  l_msgr_send_bytes,
  l_msgr_created_connections,
  l_msgr_active_connections,

  l_msgr_running_total_time,
  l_msgr_running_send_time,
  l_msgr_running_recv_time,
  l_msgr_running_fast_dispatch_time,

  l_msgr_send_messages_queue_lat,
  l_msgr_handle_ack_lat,

  l_msgr_recv_encrypted_bytes,
  l_msgr_send_encrypted_bytes,

  l_msgr_last,
};

enum {
  l_msgr_labeled_first = l_msgr_last + 1,

  l_msgr_connection_ready_timeouts,
  l_msgr_connection_idle_timeouts,

  l_msgr_labeled_last,
};

class Worker {
  std::mutex init_lock;
  std::condition_variable init_cond;
  bool init = false;

 public:
  bool done = false;

  CephContext *cct;
  PerfCounters *perf_logger;
  PerfCounters *perf_labeled_logger;
  unsigned id;

  std::atomic_uint references;
  EventCenter center;

  Worker(const Worker&) = delete;
  Worker& operator=(const Worker&) = delete;

  Worker(CephContext *c, unsigned worker_id)
    : cct(c), id(worker_id), references(0), center(c) {
    char name[128];
    char name_prefix[] = "AsyncMessenger::Worker";
    sprintf(name, "%s-%u", name_prefix, id);

    // initialize perf_logger
    PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last);

    plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages");
    plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages");
    plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes", NULL, 0, unit_t(UNIT_BYTES));
    plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network sent bytes", NULL, 0, unit_t(UNIT_BYTES));
    plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number");
    plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number");

    plb.add_time(l_msgr_running_total_time, "msgr_running_total_time", "The total time of thread running");
    plb.add_time(l_msgr_running_send_time, "msgr_running_send_time", "The total time of message sending");
    plb.add_time(l_msgr_running_recv_time, "msgr_running_recv_time", "The total time of message receiving");
    plb.add_time(l_msgr_running_fast_dispatch_time, "msgr_running_fast_dispatch_time", "The total time of fast dispatch");

    plb.add_time_avg(l_msgr_send_messages_queue_lat, "msgr_send_messages_queue_lat", "Network sent messages lat");
    plb.add_time_avg(l_msgr_handle_ack_lat, "msgr_handle_ack_lat", "Connection handle ack lat");

    plb.add_u64_counter(l_msgr_recv_encrypted_bytes, "msgr_recv_encrypted_bytes", "Network received encrypted bytes", NULL, 0, unit_t(UNIT_BYTES));
    plb.add_u64_counter(l_msgr_send_encrypted_bytes, "msgr_send_encrypted_bytes", "Network sent encrypted bytes", NULL, 0, unit_t(UNIT_BYTES));

    perf_logger = plb.create_perf_counters();
    cct->get_perfcounters_collection()->add(perf_logger);

    // Add labeled perfcounters
    std::string labels = ceph::perf_counters::key_create(
        name_prefix, {{"id", std::to_string(id)}});
    PerfCountersBuilder plb_labeled(
        cct, labels, l_msgr_labeled_first,
        l_msgr_labeled_last);

    plb_labeled.add_u64_counter(
        l_msgr_connection_ready_timeouts, "msgr_connection_ready_timeouts",
        "Number of not yet ready connections declared as dead", NULL,
        PerfCountersBuilder::PRIO_USEFUL);
    plb_labeled.add_u64_counter(
        l_msgr_connection_idle_timeouts, "msgr_connection_idle_timeouts",
        "Number of connections closed due to idleness", NULL,
        PerfCountersBuilder::PRIO_USEFUL);

    perf_labeled_logger = plb_labeled.create_perf_counters();
    cct->get_perfcounters_collection()->add(perf_labeled_logger);
  }
  virtual ~Worker() {
    if (perf_logger) {
      cct->get_perfcounters_collection()->remove(perf_logger);
      delete perf_logger;
    }
    if (perf_labeled_logger) {
      cct->get_perfcounters_collection()->remove(perf_labeled_logger);
      delete perf_labeled_logger;
    }
  }

  virtual int listen(entity_addr_t &addr, unsigned addr_slot,
                     const SocketOptions &opts, ServerSocket *) = 0;
  virtual int connect(const entity_addr_t &addr,
                      const SocketOptions &opts, ConnectedSocket *socket) = 0;
  virtual void destroy() {}

  virtual void initialize() {}
  PerfCounters *get_perf_counter() { return perf_logger; }
  PerfCounters *get_labeled_perf_counter() { return perf_labeled_logger; }
  void release_worker() {
    int oldref = references.fetch_sub(1);
    ceph_assert(oldref > 0);
  }
  void init_done() {
    init_lock.lock();
    init = true;
    init_cond.notify_all();
    init_lock.unlock();
  }
  bool is_init() {
    std::lock_guard<std::mutex> l(init_lock);
    return init;
  }
  void wait_for_init() {
    std::unique_lock<std::mutex> l(init_lock);
    while (!init)
      init_cond.wait(l);
  }
  void reset() {
    init_lock.lock();
    init = false;
    init_cond.notify_all();
    init_lock.unlock();
    done = false;
  }
};

class NetworkStack {
  ceph::spinlock pool_spin;
  bool started = false;

  std::function<void ()> add_thread(Worker* w);

  virtual Worker* create_worker(CephContext *c, unsigned i) = 0;
  virtual void rename_thread(unsigned id) {
    static constexpr int TASK_COMM_LEN = 16;
    char tp_name[TASK_COMM_LEN];
    sprintf(tp_name, "msgr-worker-%u", id);
    ceph_pthread_setname(pthread_self(), tp_name);
  }

 protected:
  CephContext *cct;
  std::vector<Worker*> workers;

  explicit NetworkStack(CephContext *c);
 public:
  NetworkStack(const NetworkStack &) = delete;
  NetworkStack& operator=(const NetworkStack &) = delete;
  virtual ~NetworkStack() {
    for (auto &&w : workers)
      delete w;
  }

  static std::shared_ptr<NetworkStack> create(
    CephContext *c, const std::string &type);

  // backend need to override this method if backend doesn't support shared
  // listen table.
  // For example, posix backend has in kernel global listen table. If one
  // thread bind a port, other threads also aware this.
  // But for dpdk backend, we maintain listen table in each thread. So we
  // need to let each thread do binding port.
  virtual bool support_local_listen_table() const { return false; }
  virtual bool nonblock_connect_need_writable_event() const { return true; }

  void start();
  void stop();
  virtual Worker *get_worker();
  Worker *get_worker(unsigned worker_id) {
    return workers[worker_id];
  }
  void drain();
  unsigned get_num_worker() const {
    return workers.size();
  }

  // direct is used in tests only
  virtual void spawn_worker(std::function<void ()> &&) = 0;
  virtual void join_worker(unsigned i) = 0;

  virtual bool is_ready() { return true; };
  virtual void ready() { };
};

#endif //CEPH_MSG_ASYNC_STACK_H