diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/crimson/net/ProtocolV2.h | |
parent | Initial commit. (diff) | |
download | ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/crimson/net/ProtocolV2.h')
-rw-r--r-- | src/crimson/net/ProtocolV2.h | 225 |
1 files changed, 225 insertions, 0 deletions
diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h new file mode 100644 index 000000000..be9a22816 --- /dev/null +++ b/src/crimson/net/ProtocolV2.h @@ -0,0 +1,225 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <seastar/core/sleep.hh> + +#include "Protocol.h" +#include "msg/async/frames_v2.h" +#include "msg/async/crypto_onwire.h" + +namespace crimson::net { + +class ProtocolV2 final : public Protocol { + public: + ProtocolV2(ChainedDispatchers& dispatchers, + SocketConnection& conn, + SocketMessenger& messenger); + ~ProtocolV2() override; + void print(std::ostream&) const final; + private: + void on_closed() override; + bool is_connected() const override; + + void start_connect(const entity_addr_t& peer_addr, + const entity_name_t& peer_name) override; + + void start_accept(SocketRef&& socket, + const entity_addr_t& peer_addr) override; + + void trigger_close() override; + + ceph::bufferlist do_sweep_messages( + const std::deque<MessageRef>& msgs, + size_t num_msgs, + bool require_keepalive, + std::optional<utime_t> keepalive_ack, + bool require_ack) override; + + void notify_write() override; + + private: + SocketMessenger &messenger; + + enum class state_t { + NONE = 0, + ACCEPTING, + SERVER_WAIT, + ESTABLISHING, + CONNECTING, + READY, + STANDBY, + WAIT, + REPLACING, + CLOSING + }; + state_t state = state_t::NONE; + + static const char *get_state_name(state_t state) { + const char *const statenames[] = {"NONE", + "ACCEPTING", + "SERVER_WAIT", + "ESTABLISHING", + "CONNECTING", + "READY", + "STANDBY", + "WAIT", + "REPLACING", + "CLOSING"}; + return statenames[static_cast<int>(state)]; + } + + void trigger_state(state_t state, write_state_t write_state, bool reentrant); + + uint64_t connection_features = 0; + uint64_t peer_required_features = 0; + + uint64_t client_cookie = 0; + uint64_t server_cookie = 0; + uint64_t global_seq = 0; + uint64_t peer_global_seq = 0; + uint64_t connect_seq = 0; + + seastar::shared_future<> execution_done = seastar::now(); + + template <typename Func> + void gated_execute(const char* what, Func&& func) { + gate.dispatch_in_background(what, *this, [this, &func] { + execution_done = seastar::futurize_invoke(std::forward<Func>(func)); + return execution_done.get_future(); + }); + } + + class Timer { + double last_dur_ = 0.0; + const SocketConnection& conn; + std::optional<seastar::abort_source> as; + public: + Timer(SocketConnection& conn) : conn(conn) {} + double last_dur() const { return last_dur_; } + seastar::future<> backoff(double seconds); + void cancel() { + last_dur_ = 0.0; + if (as) { + as->request_abort(); + as = std::nullopt; + } + } + }; + Timer protocol_timer; + + // TODO: Frame related implementations, probably to a separate class. + private: + bool record_io = false; + ceph::bufferlist rxbuf; + ceph::bufferlist txbuf; + + void enable_recording(); + seastar::future<Socket::tmp_buf> read_exactly(size_t bytes); + seastar::future<bufferlist> read(size_t bytes); + seastar::future<> write(bufferlist&& buf); + seastar::future<> write_flush(bufferlist&& buf); + + ceph::crypto::onwire::rxtx_t session_stream_handlers; + ceph::msgr::v2::FrameAssembler tx_frame_asm{&session_stream_handlers, false}; + ceph::msgr::v2::FrameAssembler rx_frame_asm{&session_stream_handlers, false}; + ceph::bufferlist rx_preamble; + ceph::msgr::v2::segment_bls_t rx_segments_data; + + size_t get_current_msg_size() const; + seastar::future<ceph::msgr::v2::Tag> read_main_preamble(); + seastar::future<> read_frame_payload(); + template <class F> + seastar::future<> write_frame(F &frame, bool flush=true); + + private: + void fault(bool backoff, const char* func_name, std::exception_ptr eptr); + void reset_session(bool full); + seastar::future<std::tuple<entity_type_t, entity_addr_t>> + banner_exchange(bool is_connect); + + enum class next_step_t { + ready, + wait, + none, // protocol should have been aborted or failed + }; + + // CONNECTING (client) + seastar::future<> handle_auth_reply(); + inline seastar::future<> client_auth() { + std::vector<uint32_t> empty; + return client_auth(empty); + } + seastar::future<> client_auth(std::vector<uint32_t> &allowed_methods); + + seastar::future<next_step_t> process_wait(); + seastar::future<next_step_t> client_connect(); + seastar::future<next_step_t> client_reconnect(); + void execute_connecting(); + + // ACCEPTING (server) + seastar::future<> _auth_bad_method(int r); + seastar::future<> _handle_auth_request(bufferlist& auth_payload, bool more); + seastar::future<> server_auth(); + + bool validate_peer_name(const entity_name_t& peer_name) const; + seastar::future<next_step_t> send_wait(); + seastar::future<next_step_t> reuse_connection(ProtocolV2* existing_proto, + bool do_reset=false, + bool reconnect=false, + uint64_t conn_seq=0, + uint64_t msg_seq=0); + + seastar::future<next_step_t> handle_existing_connection(SocketConnectionRef existing_conn); + seastar::future<next_step_t> server_connect(); + + seastar::future<next_step_t> read_reconnect(); + seastar::future<next_step_t> send_retry(uint64_t connect_seq); + seastar::future<next_step_t> send_retry_global(uint64_t global_seq); + seastar::future<next_step_t> send_reset(bool full); + seastar::future<next_step_t> server_reconnect(); + + void execute_accepting(); + + // CONNECTING/ACCEPTING + seastar::future<> finish_auth(); + + // ESTABLISHING + void execute_establishing(SocketConnectionRef existing_conn, bool dispatch_reset); + + // ESTABLISHING/REPLACING (server) + seastar::future<> send_server_ident(); + + // REPLACING (server) + void trigger_replacing(bool reconnect, + bool do_reset, + SocketRef&& new_socket, + AuthConnectionMetaRef&& new_auth_meta, + ceph::crypto::onwire::rxtx_t new_rxtx, + uint64_t new_peer_global_seq, + // !reconnect + uint64_t new_client_cookie, + entity_name_t new_peer_name, + uint64_t new_conn_features, + bool tx_is_rev1, + bool rx_is_rev1, + // reconnect + uint64_t new_connect_seq, + uint64_t new_msg_seq); + + // READY + seastar::future<> read_message(utime_t throttle_stamp); + void execute_ready(bool dispatch_connect); + + // STANDBY + void execute_standby(); + + // WAIT + void execute_wait(bool max_backoff); + + // SERVER_WAIT + void execute_server_wait(); +}; + +} // namespace crimson::net |