summaryrefslogtreecommitdiffstats
path: root/src/msg/async/ProtocolV1.h
blob: b23860e8a015cf52b9296ded19385e33a9b5c3a1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#ifndef _MSG_ASYNC_PROTOCOL_V1_
#define _MSG_ASYNC_PROTOCOL_V1_

#include "Protocol.h"

// Forward declaration so that the CtPtr alias below can name ProtocolV1
// before the class definition appears.
class ProtocolV1;
// Pointer to a continuation bound to ProtocolV1. The Ct<> template is not
// declared in this header (presumably it is provided by Protocol.h -- TODO
// confirm). The protocol step functions in this header return a CtPtr.
using CtPtr = Ct<ProtocolV1>*;

// ProtocolV1: Protocol subclass whose connection handshake (server and client
// side) and message receive loop follow the state machine sketched below.
// Each step is a member function returning a CtPtr; the *_CONTINUATION_DECL
// macros (defined outside this header) declare the matching continuation
// objects for those steps.
class ProtocolV1 : public Protocol {
/*
 *  ProtocolV1 State Machine
 *

    send_server_banner                             send_client_banner
            |                                              |
            v                                              v
    wait_client_banner                              wait_server_banner
            |                                              |
            |                                              v
            v                                 handle_server_banner_and_identify
    wait_connect_message <---------\                       |
      |     |                      |                       v
      |  wait_connect_message_auth |           send_connect_message <----------\
      |     |                      |                       |                   |
      v     v                      |                       |                   |
handle_connect_message_2           |                       v                   |
        |           |              |            wait_connect_reply             |
        v           v              |              |        |                   |
     replace -> send_connect_message_reply        |        V                   |
        |                                         |   wait_connect_reply_auth  |
        |                                         |        |                   |
        v                                         v        v                   |
      open ---\                                 handle_connect_reply_2 --------/
        |     |                                            |
        |     v                                            v
        |   wait_seq                                  wait_ack_seq
        |     |                                            |
        v     v                                            v
    server_ready                                      client_ready
            |                                              |
            \------------------> wait_message <------------/
                                 |  ^   |  ^
        /------------------------/  |   |  |
        |                           |   |  \----------------- ------------\
        v                /----------/   v                                 |
handle_keepalive2        |        handle_message_header      read_message_footer
handle_keepalive2_ack    |              |                                 ^
handle_tag_ack           |              v                                 |
        |                |        throttle_message             read_message_data
        \----------------/              |                                 ^
                                        v                                 |
                             read_message_front --> read_message_middle --/
*/

protected:

  // Connection states. The enumerators are consecutive integers starting at
  // NONE = 0; get_state_name() relies on that to index its name table, so the
  // two lists must be kept in the same order.
  enum State {
    NONE = 0,
    START_CONNECT,
    CONNECTING,
    CONNECTING_WAIT_BANNER_AND_IDENTIFY,
    CONNECTING_SEND_CONNECT_MSG,
    START_ACCEPT,
    ACCEPTING,
    ACCEPTING_WAIT_CONNECT_MSG_AUTH,
    ACCEPTING_HANDLED_CONNECT_MSG,
    OPENED,
    THROTTLE_MESSAGE,
    THROTTLE_BYTES,
    THROTTLE_DISPATCH_QUEUE,
    READ_MESSAGE_FRONT,
    READ_FOOTER_AND_DISPATCH,
    CLOSED,
    WAIT,
    STANDBY
  };

  // Returns the printable name of a State value.
  //
  // `state` is taken as a plain int, so it may hold a value that is not a
  // valid enumerator (corrupted or uninitialized state). Such values yield
  // "UNKNOWN" instead of indexing past the end of the table.
  static const char *get_state_name(int state) {
    // Static so the table is built once rather than on every call.
    static const char *const statenames[] = {"NONE",
                                             "START_CONNECT",
                                             "CONNECTING",
                                             "CONNECTING_WAIT_BANNER_AND_IDENTIFY",
                                             "CONNECTING_SEND_CONNECT_MSG",
                                             "START_ACCEPT",
                                             "ACCEPTING",
                                             "ACCEPTING_WAIT_CONNECT_MSG_AUTH",
                                             "ACCEPTING_HANDLED_CONNECT_MSG",
                                             "OPENED",
                                             "THROTTLE_MESSAGE",
                                             "THROTTLE_BYTES",
                                             "THROTTLE_DISPATCH_QUEUE",
                                             "READ_MESSAGE_FRONT",
                                             "READ_FOOTER_AND_DISPATCH",
                                             "CLOSED",
                                             "WAIT",
                                             "STANDBY"};
    constexpr int num_states =
        static_cast<int>(sizeof(statenames) / sizeof(statenames[0]));
    // Fails the build if a State is added or removed without updating the
    // table above.
    static_assert(num_states == STANDBY + 1,
                  "statenames must have exactly one entry per State");
    if (state < 0 || state >= num_states) {
      return "UNKNOWN";
    }
    return statenames[state];
  }

  char *temp_buffer;

  // Whether the connection may currently be written to.
  enum class WriteStatus { NOWRITE, REPLACING, CANWRITE, CLOSED };
  std::atomic<WriteStatus> can_write;
  std::list<Message *> sent;  // the first ceph::buffer::list need to inject seq
  // priority queue for outbound msgs
  std::map<int, std::list<std::pair<ceph::buffer::list, Message *>>> out_q;
  bool keepalive;
  bool write_in_progress = false;

  __u32 connect_seq, peer_global_seq;
  // Sequence/ack counters; atomic and zero-initialized.
  std::atomic<uint64_t> in_seq{0};
  std::atomic<uint64_t> out_seq{0};
  std::atomic<uint64_t> ack_left{0};

  std::shared_ptr<AuthSessionHandler> session_security;

  // Open state
  ceph_msg_connect connect_msg;
  ceph_msg_connect_reply connect_reply;
  ceph::buffer::list authorizer_buf;  // auth(orizer) payload read off the wire
  ceph::buffer::list authorizer_more;  // connect-side auth retry (we added challenge)

  utime_t backoff;  // backoff time
  utime_t recv_stamp;
  utime_t throttle_stamp;
  unsigned msg_left;
  uint64_t cur_msg_size;
  ceph_msg_header current_header;
  ceph::buffer::list data_buf;
  ceph::buffer::list::iterator data_blp;
  ceph::buffer::list front, middle, data;

  bool replacing;  // When a replace has happened, we reply to the connect
                   // side with the RETRY tag and the accept side clears the
                   // replaced connection. So when the connect side reissues
                   // connect_msg, there won't exist a conflicting connection;
                   // we use "replacing" to skip RESETSESSION and avoid
                   // detecting a wrong presentation.
  bool is_reset_from_peer;
  bool once_ready;

  State state;

  // Continuation plumbing: run a continuation, and issue a read/write whose
  // completion is delivered to `next`.
  void run_continuation(CtPtr pcontinuation);
  CtPtr read(CONTINUATION_RX_TYPE<ProtocolV1> &next, int len,
             char *buffer = nullptr);
  CtPtr write(CONTINUATION_TX_TYPE<ProtocolV1> &next,ceph::buffer::list &bl);
  inline CtPtr _fault() {  // helper fault method that stops continuation
    fault();
    return nullptr;
  }

  // Continuations for the message receive loop (see "wait_message" and below
  // in the state machine diagram).
  CONTINUATION_DECL(ProtocolV1, wait_message);
  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_message);
  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_keepalive2);
  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_keepalive2_ack);
  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_tag_ack);
  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_message_header);
  CONTINUATION_DECL(ProtocolV1, throttle_message);
  CONTINUATION_DECL(ProtocolV1, throttle_bytes);
  CONTINUATION_DECL(ProtocolV1, throttle_dispatch_queue);
  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_message_front);
  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_message_middle);
  CONTINUATION_DECL(ProtocolV1, read_message_data);
  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_message_data);
  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_message_footer);

  // Step functions for the message receive loop. Read handlers take the
  // buffer that was read into and the read result `r`.
  CtPtr ready();
  CtPtr wait_message();
  CtPtr handle_message(char *buffer, int r);

  CtPtr handle_keepalive2(char *buffer, int r);
  void append_keepalive_or_ack(bool ack = false, utime_t *t = nullptr);
  CtPtr handle_keepalive2_ack(char *buffer, int r);
  CtPtr handle_tag_ack(char *buffer, int r);

  CtPtr handle_message_header(char *buffer, int r);
  CtPtr throttle_message();
  CtPtr throttle_bytes();
  CtPtr throttle_dispatch_queue();
  CtPtr read_message_front();
  CtPtr handle_message_front(char *buffer, int r);
  CtPtr read_message_middle();
  CtPtr handle_message_middle(char *buffer, int r);
  CtPtr read_message_data_prepare();
  CtPtr read_message_data();
  CtPtr handle_message_data(char *buffer, int r);
  CtPtr read_message_footer();
  CtPtr handle_message_footer(char *buffer, int r);

  void session_reset();
  void randomize_out_seq();

  // Outbound message helpers.
  Message *_get_next_outgoing(ceph::buffer::list *bl);

  void prepare_send_message(uint64_t features, Message *m, ceph::buffer::list &bl);
  ssize_t write_message(Message *m, ceph::buffer::list &bl, bool more);

  void requeue_sent();
  uint64_t discard_requeued_up_to(uint64_t out_seq, uint64_t seq);
  void discard_out_queue();

  void reset_recv_state();
  void reset_security();

  std::ostream& _conn_prefix(std::ostream *_dout);

public:
  ProtocolV1(AsyncConnection *connection);
  virtual ~ProtocolV1();

  // Overrides of the Protocol interface.
  virtual void connect() override;
  virtual void accept() override;
  virtual bool is_connected() override;
  virtual void stop() override;
  virtual void fault() override;
  virtual void send_message(Message *m) override;
  virtual void send_keepalive() override;

  virtual void read_event() override;
  virtual void write_event() override;
  virtual bool is_queued() override;

  // Client Protocol
  // (right-hand column of the state machine diagram, send_client_banner
  // through client_ready)
private:
  int global_seq;

  CONTINUATION_DECL(ProtocolV1, send_client_banner);
  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_client_banner_write);
  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_server_banner_and_identify);
  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_my_addr_write);
  CONTINUATION_DECL(ProtocolV1, send_connect_message);
  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_connect_message_write);
  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_connect_reply_1);
  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_connect_reply_auth);
  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_ack_seq);
  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_in_seq_write);

  CtPtr send_client_banner();
  CtPtr handle_client_banner_write(int r);
  CtPtr wait_server_banner();
  CtPtr handle_server_banner_and_identify(char *buffer, int r);
  CtPtr handle_my_addr_write(int r);
  CtPtr send_connect_message();
  CtPtr handle_connect_message_write(int r);
  CtPtr wait_connect_reply();
  CtPtr handle_connect_reply_1(char *buffer, int r);
  CtPtr wait_connect_reply_auth();
  CtPtr handle_connect_reply_auth(char *buffer, int r);
  CtPtr handle_connect_reply_2();
  CtPtr wait_ack_seq();
  CtPtr handle_ack_seq(char *buffer, int r);
  CtPtr handle_in_seq_write(int r);
  CtPtr client_ready();

  // Server Protocol
  // (left-hand column of the state machine diagram, send_server_banner
  // through server_ready)
protected:
  bool wait_for_seq;

  CONTINUATION_DECL(ProtocolV1, send_server_banner);
  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_server_banner_write);
  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_client_banner);
  CONTINUATION_DECL(ProtocolV1, wait_connect_message);
  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_connect_message_1);
  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_connect_message_auth);
  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1,
                                  handle_connect_message_reply_write);
  WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1,
                                  handle_ready_connect_message_reply_write);
  READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_seq);

  CtPtr send_server_banner();
  CtPtr handle_server_banner_write(int r);
  CtPtr wait_client_banner();
  CtPtr handle_client_banner(char *buffer, int r);
  CtPtr wait_connect_message();
  CtPtr handle_connect_message_1(char *buffer, int r);
  CtPtr wait_connect_message_auth();
  CtPtr handle_connect_message_auth(char *buffer, int r);
  CtPtr handle_connect_message_2();
  CtPtr send_connect_message_reply(char tag, ceph_msg_connect_reply &reply,
                                   ceph::buffer::list &authorizer_reply);
  CtPtr handle_connect_message_reply_write(int r);
  CtPtr replace(const AsyncConnectionRef& existing, ceph_msg_connect_reply &reply,
                ceph::buffer::list &authorizer_reply);
  CtPtr open(ceph_msg_connect_reply &reply, ceph::buffer::list &authorizer_reply);
  CtPtr handle_ready_connect_message_reply_write(int r);
  CtPtr wait_seq();
  CtPtr handle_seq(char *buffer, int r);
  CtPtr server_ready();
};

// ProtocolV1 variant whose constructor marks the connection as writable
// (WriteStatus::CANWRITE) immediately after the base class is constructed.
class LoopbackProtocolV1 : public ProtocolV1 {
public:
  LoopbackProtocolV1(AsyncConnection *connection)
    : ProtocolV1(connection)
  {
    // Same effect as assigning to the atomic: a sequentially consistent store.
    can_write.store(WriteStatus::CANWRITE);
  }
};

#endif /* _MSG_ASYNC_PROTOCOL_V1_ */