summaryrefslogtreecommitdiffstats
path: root/src/crimson/net/ProtocolV1.h
blob: ed6df895415fff08d5ac59c126d027d52015349b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#pragma once

#include "Protocol.h"

class AuthAuthorizer;
class AuthSessionHandler;

namespace crimson::net {

class ProtocolV1 final : public Protocol {
 public:
  ProtocolV1(ChainedDispatchers& dispatchers,
             SocketConnection& conn,
             SocketMessenger& messenger);
  ~ProtocolV1() override;
  void print(std::ostream&) const final;
 private:
  void on_closed() override;
  bool is_connected() const override;

  void start_connect(const entity_addr_t& peer_addr,
                     const entity_name_t& peer_name) override;

  void start_accept(SocketRef&& socket,
                    const entity_addr_t& peer_addr) override;

  void trigger_close() override;

  ceph::bufferlist do_sweep_messages(
      const std::deque<MessageRef>& msgs,
      size_t num_msgs,
      bool require_keepalive,
      std::optional<utime_t> keepalive_ack,
      bool require_ack) override;

 private:
  SocketMessenger &messenger;

  enum class state_t {
    none,
    accepting,
    connecting,
    open,
    standby,
    wait,
    closing
  };
  state_t state = state_t::none;

  // state for handshake
  struct Handshake {
    ceph_msg_connect connect;
    ceph_msg_connect_reply reply;
    ceph::bufferlist auth_payload;  // auth(orizer) payload read off the wire
    ceph::bufferlist auth_more;     // connect-side auth retry (we added challenge)
    std::chrono::milliseconds backoff;
    uint32_t connect_seq = 0;
    uint32_t peer_global_seq = 0;
    uint32_t global_seq;
  } h;

  std::unique_ptr<AuthSessionHandler> session_security;

  // state for an incoming message
  struct MessageReader {
    ceph_msg_header header;
    ceph_msg_footer footer;
    bufferlist front;
    bufferlist middle;
    bufferlist data;
  } m;

  struct Keepalive {
    struct {
      const char tag = CEPH_MSGR_TAG_KEEPALIVE2;
      ceph_timespec stamp;
    } __attribute__((packed)) req;
    struct {
      const char tag = CEPH_MSGR_TAG_KEEPALIVE2_ACK;
      ceph_timespec stamp;
    } __attribute__((packed)) ack;
    ceph_timespec ack_stamp;
  } k;

 private:
  // connecting
  void reset_session();
  seastar::future<stop_t> handle_connect_reply(crimson::net::msgr_tag_t tag);
  seastar::future<stop_t> repeat_connect();
  ceph::bufferlist get_auth_payload();

  // accepting
  seastar::future<stop_t> send_connect_reply(
      msgr_tag_t tag, bufferlist&& authorizer_reply = {});
  seastar::future<stop_t> send_connect_reply_ready(
      msgr_tag_t tag, bufferlist&& authorizer_reply);
  seastar::future<stop_t> replace_existing(
      SocketConnectionRef existing,
      bufferlist&& authorizer_reply,
      bool is_reset_from_peer = false);
  seastar::future<stop_t> handle_connect_with_existing(
      SocketConnectionRef existing, bufferlist&& authorizer_reply);
  bool require_auth_feature() const;
  bool require_cephx_v2_feature() const;
  seastar::future<stop_t> repeat_handle_connect();

  // open
  seastar::future<> handle_keepalive2_ack();
  seastar::future<> handle_keepalive2();
  seastar::future<> handle_ack();
  seastar::future<> maybe_throttle();
  seastar::future<> read_message();
  seastar::future<> handle_tags();

  enum class open_t {
    connected,
    accepted
  };
  void execute_open(open_t type);

  // replacing
  // the number of connections initiated in this session, increment when a
  // new connection is established
  uint32_t connect_seq() const { return h.connect_seq; }
  // the client side should connect us with a gseq. it will be reset with
  // the one of exsting connection if it's greater.
  uint32_t peer_global_seq() const { return h.peer_global_seq; }
  // current state of ProtocolV1
  state_t get_state() const { return state; }

  seastar::future<> fault();
};

} // namespace crimson::net