summaryrefslogtreecommitdiffstats
path: root/src/crimson/net/Protocol.h
blob: dc4e4f2af8f33c188740fd10b6bae8a267191f8d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#pragma once

#include <seastar/core/gate.hh>
#include <seastar/core/shared_future.hh>

#include "crimson/common/gated.h"
#include "crimson/common/log.h"
#include "Fwd.h"
#include "SocketConnection.h"

namespace crimson::net {

/**
 * Abstract base class for a messenger protocol bound to one
 * SocketConnection.
 *
 * Two concerns live here:
 *  - the connection lifecycle (connect / accept / close), whose
 *    protocol-specific steps are the pure virtual hooks below;
 *  - the write state-machine (send / keepalive / ack bookkeeping), which
 *    is driven through set_write_state() and friends.
 *
 * Most member functions are only declared in this header; their exact
 * behaviour is defined in the corresponding source file.
 */
class Protocol {
 public:
  // Protocol flavour; fixed per instance, see proto_type.
  enum class proto_t {
    none,
    v1,
    v2
  };

  // Instances are pinned in place: the move constructor is deleted, which
  // also keeps the implicit copy operations from being generated.
  Protocol(Protocol&&) = delete;
  virtual ~Protocol();

  // Implemented by the concrete protocol.
  virtual bool is_connected() const = 0;

#ifdef UNIT_TESTS_BUILT
  // Test-only introspection, compiled in when UNIT_TESTS_BUILT is defined.
  bool is_closed_clean = false;
  bool is_closed() const { return closed; }
#endif

  // Reentrant closing.
  // f_accept_new is an optional callback; when it is invoked is decided by
  // the out-of-line definition (not visible from this header).
  void close(bool dispatch_reset, std::optional<std::function<void()>> f_accept_new=std::nullopt);

  // Close the protocol and hand back a future taken from close_ready.
  seastar::future<> close_clean(bool dispatch_reset) {
    close(dispatch_reset);
    // close_ready may be invalid here if close_clean() is called from
    // inside Dispatcher::ms_handle_reset(); proceeding in that situation
    // would otherwise end in a deadlock, hence the assertion.
    assert(close_ready.valid());
    return close_ready.get_future();
  }

  // Begin an outgoing connection; implemented by the concrete protocol.
  virtual void start_connect(const entity_addr_t& peer_addr,
                             const entity_name_t& peer_name) = 0;

  // Take over an accepted socket; implemented by the concrete protocol.
  virtual void start_accept(SocketRef&& socket,
                            const entity_addr_t& peer_addr) = 0;

  // Write a description of this protocol to the stream (used by operator<<).
  virtual void print(std::ostream&) const = 0;
 protected:
  // Only derived protocols may construct the base.
  Protocol(proto_t type,
           ChainedDispatchers& dispatchers,
           SocketConnection& conn);

  // Protocol-specific close hook.
  virtual void trigger_close() = 0;

  // Protocol-specific hook returning a buffer for the write path.
  // NOTE(review): presumably encodes up to num_msgs entries of msgs plus the
  // requested keepalive / ack items into one bufferlist -- confirm against
  // the implementations.
  virtual ceph::bufferlist do_sweep_messages(
      const std::deque<MessageRef>& msgs,
      size_t num_msgs,
      bool require_keepalive,
      std::optional<utime_t> keepalive_ack,
      bool require_ack) = 0;

  // Optional hook, no-op by default.
  virtual void notify_write() {}

  // Optional hook, no-op by default.
  virtual void on_closed() {}

 public:
  const proto_t proto_type;
  SocketRef socket;

 protected:
  ChainedDispatchers& dispatchers;
  SocketConnection &conn;

  AuthConnectionMetaRef auth_meta;

 private:
  bool closed = false;
  // only becomes valid once closed == true
  seastar::shared_future<> close_ready;

// the write state-machine
 public:
  seastar::future<> send(MessageRef msg);
  seastar::future<> keepalive();

// TODO: encapsulate a SessionedSender class
 protected:
  // write_state is switched together with the protocol state and tells how
  // writes are to be treated while the protocol is in that state.
  enum class write_state_t : uint8_t {
    none,
    delay,
    open,
    drop
  };

  // Map a write state to its printable name. The table order must match
  // the enumerator order of write_state_t.
  static const char* get_state_name(write_state_t state) {
    static const char *const names[] = {"none", "delay", "open", "drop"};
    const auto idx = static_cast<uint8_t>(state);
    assert(idx < std::size(names));
    return names[idx];
  }

  // Switch to a new write state and signal everybody waiting on
  // state_changed.
  void set_write_state(const write_state_t& state) {
    const bool leaving_open = (write_state == write_state_t::open) &&
                              (state != write_state_t::open);
    if (leaving_open && write_dispatching) {
      // A write dispatch is still running while we leave "open": arm
      // exit_open so wait_write_exit() has something to wait on.
      exit_open = seastar::shared_promise<>();
    }
    write_state = state;
    // Resolve the current promise, then replace it with a fresh one for
    // the next state change.
    state_changed.set_value();
    state_changed = seastar::shared_promise<>();
  }

  // Future that resolves with exit_open when it is armed; ready
  // immediately when there is nothing to wait for.
  seastar::future<> wait_write_exit() {
    if (!exit_open) {
      return seastar::now();
    }
    return exit_open->get_shared_future();
  }

  void notify_keepalive_ack(utime_t keepalive_ack);

  void notify_ack();

  void requeue_up_to(seq_num_t seq);

  void requeue_sent();

  void reset_write();

  // True when anything is pending on the write side: queued messages,
  // outstanding acks, a requested keepalive or a keepalive ack.
  bool is_queued() const {
    if (!conn.out_q.empty()) {
      return true;
    }
    if (ack_left > 0) {
      return true;
    }
    if (need_keepalive) {
      return true;
    }
    return keepalive_ack.has_value();
  }

  void ack_writes(seq_num_t seq);
  crimson::common::Gated gate;

 private:
  write_state_t write_state = write_state_t::none;
  // resolved (and replaced) each time the write state changes
  seastar::shared_promise<> state_changed;

  bool need_keepalive = false;
  std::optional<utime_t> keepalive_ack = std::nullopt;
  uint64_t ack_left = 0;
  bool write_dispatching = false;
  // When another continuation wants to close or replace the socket while
  // write_dispatching is true and write_state is open, it has to wait on
  // exit_open until the writing has stopped or failed.
  std::optional<seastar::shared_promise<>> exit_open;

  seastar::future<stop_t> try_exit_sweep();
  seastar::future<> do_write_dispatch_sweep();
  void write_event();
};

// Stream insertion for Protocol: delegates to the virtual print() so that
// each concrete protocol controls its own textual representation.
inline std::ostream& operator<<(std::ostream& out, const Protocol& proto)
{
  proto.print(out);
  return out;
}


} // namespace crimson::net