summaryrefslogtreecommitdiffstats
path: root/src/crimson/net/SocketConnection.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/crimson/net/SocketConnection.h')
-rw-r--r--src/crimson/net/SocketConnection.h235
1 files changed, 235 insertions, 0 deletions
diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h
new file mode 100644
index 00000000..62cc77d5
--- /dev/null
+++ b/src/crimson/net/SocketConnection.h
@@ -0,0 +1,235 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <seastar/core/gate.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/shared_future.hh>
+#include <seastar/core/sharded.hh>
+
+#include "msg/Policy.h"
+#include "Connection.h"
+#include "Socket.h"
+#include "crimson/thread/Throttle.h"
+
+class AuthAuthorizer;
+class AuthSessionHandler;
+
+namespace ceph::net {
+
+using stop_t = seastar::stop_iteration;
+
+class SocketMessenger;
+class SocketConnection;
+using SocketConnectionRef = seastar::shared_ptr<SocketConnection>;
+
+class SocketConnection : public Connection {
+ SocketMessenger& messenger;
+ seastar::foreign_ptr<std::unique_ptr<Socket>> socket;
+ Dispatcher& dispatcher;
+ seastar::gate pending_dispatch;
+
+ // if acceptor side, socket_port is different from peer_addr.get_port();
+ // if connector side, socket_port is different from my_addr.get_port().
+ enum class side_t {
+ none,
+ acceptor,
+ connector
+ };
+ side_t side = side_t::none;
+ uint16_t socket_port = 0;
+
+ enum class state_t {
+ none,
+ accepting,
+ connecting,
+ open,
+ standby,
+ wait,
+ closing
+ };
+ state_t state = state_t::none;
+
+ /// become valid only when state is state_t::closing
+ seastar::shared_future<> close_ready;
+
+ /// state for handshake
+ struct Handshake {
+ ceph_msg_connect connect;
+ ceph_msg_connect_reply reply;
+ std::unique_ptr<AuthAuthorizer> authorizer;
+ std::chrono::milliseconds backoff;
+ uint32_t connect_seq = 0;
+ uint32_t peer_global_seq = 0;
+ uint32_t global_seq;
+ seastar::promise<> promise;
+ } h;
+
+ /// server side of handshake negotiation
+ seastar::future<stop_t> repeat_handle_connect();
+ seastar::future<stop_t> handle_connect_with_existing(SocketConnectionRef existing,
+ bufferlist&& authorizer_reply);
+ seastar::future<stop_t> replace_existing(SocketConnectionRef existing,
+ bufferlist&& authorizer_reply,
+ bool is_reset_from_peer = false);
+ seastar::future<stop_t> send_connect_reply(ceph::net::msgr_tag_t tag,
+ bufferlist&& authorizer_reply = {});
+ seastar::future<stop_t> send_connect_reply_ready(ceph::net::msgr_tag_t tag,
+ bufferlist&& authorizer_reply);
+
+ seastar::future<> handle_keepalive2();
+ seastar::future<> handle_keepalive2_ack();
+
+ bool require_auth_feature() const;
+ uint32_t get_proto_version(entity_type_t peer_type, bool connec) const;
+ /// client side of handshake negotiation
+ seastar::future<stop_t> repeat_connect();
+ seastar::future<stop_t> handle_connect_reply(ceph::net::msgr_tag_t tag);
+ void reset_session();
+
+ /// state for an incoming message
+ struct MessageReader {
+ ceph_msg_header header;
+ ceph_msg_footer footer;
+ bufferlist front;
+ bufferlist middle;
+ bufferlist data;
+ } m;
+
+ seastar::future<> maybe_throttle();
+ seastar::future<> handle_tags();
+ seastar::future<> handle_ack();
+
+ /// becomes available when handshake completes, and when all previous messages
+ /// have been sent to the output stream. send() chains new messages as
+ /// continuations to this future to act as a queue
+ seastar::future<> send_ready;
+
+ /// encode/write a message
+ seastar::future<> write_message(MessageRef msg);
+
+ ceph::net::Policy<ceph::thread::Throttle> policy;
+ uint64_t features;
+ void set_features(uint64_t new_features) {
+ features = new_features;
+ }
+
+ /// the seq num of the last transmitted message
+ seq_num_t out_seq = 0;
+ /// the seq num of the last received message
+ seq_num_t in_seq = 0;
+ /// update the seq num of last received message
+ /// @returns true if the @c seq is valid, and @c in_seq is updated,
+ /// false otherwise.
+ bool update_rx_seq(seq_num_t seq);
+
+ seastar::future<> read_message();
+
+ std::unique_ptr<AuthSessionHandler> session_security;
+
+ // messages to be resent after connection gets reset
+ std::queue<MessageRef> out_q;
+ // messages sent, but not yet acked by peer
+ std::queue<MessageRef> sent;
+ static void discard_up_to(std::queue<MessageRef>*, seq_num_t);
+
+ struct Keepalive {
+ struct {
+ const char tag = CEPH_MSGR_TAG_KEEPALIVE2;
+ ceph_timespec stamp;
+ } __attribute__((packed)) req;
+ struct {
+ const char tag = CEPH_MSGR_TAG_KEEPALIVE2_ACK;
+ ceph_timespec stamp;
+ } __attribute__((packed)) ack;
+ ceph_timespec ack_stamp;
+ } k;
+
+ seastar::future<> fault();
+
+ void execute_open();
+
+ seastar::future<> do_send(MessageRef msg);
+ seastar::future<> do_keepalive();
+ seastar::future<> do_close();
+
+ public:
+ SocketConnection(SocketMessenger& messenger,
+ Dispatcher& dispatcher);
+ ~SocketConnection();
+
+ Messenger* get_messenger() const override;
+
+ int get_peer_type() const override {
+ return peer_type;
+ }
+
+ seastar::future<bool> is_connected() override;
+
+ seastar::future<> send(MessageRef msg) override;
+
+ seastar::future<> keepalive() override;
+
+ seastar::future<> close() override;
+
+ seastar::shard_id shard_id() const override;
+
+ void print(ostream& out) const override;
+
+ public:
+ /// start a handshake from the client's perspective,
+ /// only call when SocketConnection first construct
+ void start_connect(const entity_addr_t& peer_addr,
+ const entity_type_t& peer_type);
+ /// start a handshake from the server's perspective,
+ /// only call when SocketConnection first construct
+ void start_accept(seastar::foreign_ptr<std::unique_ptr<Socket>>&& socket,
+ const entity_addr_t& peer_addr);
+
+ /// the number of connections initiated in this session, increment when a
+ /// new connection is established
+ uint32_t connect_seq() const {
+ return h.connect_seq;
+ }
+
+ /// the client side should connect us with a gseq. it will be reset with
+ /// the one of exsting connection if it's greater.
+ uint32_t peer_global_seq() const {
+ return h.peer_global_seq;
+ }
+ seq_num_t rx_seq_num() const {
+ return in_seq;
+ }
+
+ /// current state of connection
+ state_t get_state() const {
+ return state;
+ }
+ bool is_server_side() const {
+ return policy.server;
+ }
+ bool is_lossy() const {
+ return policy.lossy;
+ }
+
+ /// move all messages in the sent list back into the queue
+ void requeue_sent();
+
+ std::tuple<seq_num_t, std::queue<MessageRef>> get_out_queue() {
+ return {out_seq, std::move(out_q)};
+ }
+};
+
+} // namespace ceph::net