diff options
Diffstat (limited to '')
-rw-r--r-- | src/msg/xio/XioConnection.h | 380 |
1 files changed, 380 insertions, 0 deletions
diff --git a/src/msg/xio/XioConnection.h b/src/msg/xio/XioConnection.h new file mode 100644 index 00000000..00024ef3 --- /dev/null +++ b/src/msg/xio/XioConnection.h @@ -0,0 +1,380 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * Portions Copyright (C) 2013 CohortFS, LLC + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef XIO_CONNECTION_H +#define XIO_CONNECTION_H + +#include <atomic> + +#include <boost/intrusive/avl_set.hpp> +#include <boost/intrusive/list.hpp> + +extern "C" { +#include "libxio.h" +} + +#include "XioInSeq.h" +#include "XioSubmit.h" +#include "msg/Connection.h" +#include "msg/Messenger.h" +#include "auth/AuthSessionHandler.h" + +#define XIO_ALL_FEATURES (CEPH_FEATURES_ALL) + + +#define XIO_NOP_TAG_MARKDOWN 0x0001 + +namespace bi = boost::intrusive; + +class XioPortal; +class XioMessenger; +class XioSend; + +class XioConnection : public Connection +{ +public: + enum type { ACTIVE, PASSIVE }; + + enum class session_states : unsigned { + INIT = 0, + START, + UP, + FLOW_CONTROLLED, + DISCONNECTED, + DELETED, + BARRIER + }; + + enum class session_startup_states : unsigned { + IDLE = 0, + CONNECTING, + ACCEPTING, + READY, + FAIL + }; + +private: + XioConnection::type xio_conn_type; + XioPortal *portal; + std::atomic<bool> connected = { false }; + entity_inst_t peer; + struct xio_session *session; + struct xio_connection *conn; + ceph::util::spinlock sp; + std::atomic<int64_t> send = { 0 }; + std::atomic<int64_t> recv = { 0 }; + uint32_t n_reqs; // Accelio-initiated reqs in progress (!counting partials) + uint32_t magic; + uint32_t special_handling; + uint64_t scount; + uint32_t send_ctr; + int q_high_mark; + int q_low_mark; + + struct lifecycle { + // different from Pipe states? + enum lf_state { + INIT, + LOCAL_DISCON, + REMOTE_DISCON, + RECONNECTING, + UP, + DEAD } state; + + /* XXX */ + uint32_t reconnects; + uint32_t connect_seq, peer_global_seq; + uint64_t in_seq, out_seq_acked; // atomic<uint64_t>, got receipt + std::atomic<int64_t> out_seq = { 0 }; + + lifecycle() : state(lifecycle::INIT), reconnects(0), connect_seq(0), + peer_global_seq(0), in_seq(0), out_seq_acked(0) + {} + + void set_in_seq(uint64_t seq) { + in_seq = seq; + } + + uint64_t next_out_seq() { + return ++out_seq; + } + + } state; + + /* batching */ + XioInSeq in_seq; + + class CState + { + public: + static const int FLAG_NONE = 0x0000; + static const int FLAG_BAD_AUTH = 0x0001; + static const int FLAG_MAPPED = 0x0002; + static const int FLAG_RESET = 0x0004; + + static const int OP_FLAG_NONE = 0x0000; + static const int OP_FLAG_LOCKED = 0x0001; + static const int OP_FLAG_LRU = 0x0002; + + uint64_t features; + Messenger::Policy policy; + + CryptoKey session_key; + std::shared_ptr<AuthSessionHandler> session_security; + AuthAuthorizer *authorizer; + XioConnection *xcon; + uint32_t protocol_version; + + std::atomic<session_states> session_state = { 0 }; + std::atomic<session_startup_state> startup_state = { 0 }; + + uint32_t reconnects; + uint32_t connect_seq, global_seq, peer_global_seq; + uint64_t in_seq, out_seq_acked; // atomic<uint64_t>, got receipt + std::atomic<uint64_t> out_seq = { 0 }; + + uint32_t flags; + + explicit CState(XioConnection* _xcon) + : features(0), + authorizer(NULL), + xcon(_xcon), + protocol_version(0), + session_state(INIT), + startup_state(IDLE), + reconnects(0), + connect_seq(0), + global_seq(0), + peer_global_seq(0), + in_seq(0), + out_seq_acked(0), + flags(FLAG_NONE) {} + + uint64_t get_session_state() { + return session_state; + } + + uint64_t get_startup_state() { + return startup_state; + } + + void set_in_seq(uint64_t seq) { + in_seq = seq; + } + + uint64_t next_out_seq() { + return ++out_seq; + }; + + // state machine + int init_state(); + int next_state(Message* m); +#if 0 // future (session startup) + int msg_connect(MConnect *m); + int msg_connect_reply(MConnectReply *m); + int msg_connect_reply(MConnectAuthReply *m); + int msg_connect_auth(MConnectAuth *m); + int msg_connect_auth_reply(MConnectAuthReply *m); +#endif + int state_up_ready(uint32_t flags); + int state_flow_controlled(uint32_t flags); + int state_discon(); + int state_fail(Message* m, uint32_t flags); + + } cstate; /* CState */ + + // message submission queue + struct SendQ { + bool keepalive; + bool ack; + utime_t ack_time; + Message::Queue mqueue; // deferred + XioSubmit::Queue requeue; + + SendQ():keepalive(false), ack(false){} + } outgoing; + + // conns_entity_map comparison functor + struct EntityComp + { + // for internal ordering + bool operator()(const XioConnection &lhs, const XioConnection &rhs) const + { return lhs.get_peer() < rhs.get_peer(); } + + // for external search by entity_inst_t(peer) + bool operator()(const entity_inst_t &peer, const XioConnection &c) const + { return peer < c.get_peer(); } + + bool operator()(const XioConnection &c, const entity_inst_t &peer) const + { return c.get_peer() < peer; } + }; + + bi::list_member_hook<> conns_hook; + bi::avl_set_member_hook<> conns_entity_map_hook; + + typedef bi::list< XioConnection, + bi::member_hook<XioConnection, bi::list_member_hook<>, + &XioConnection::conns_hook > > ConnList; + + typedef bi::member_hook<XioConnection, bi::avl_set_member_hook<>, + &XioConnection::conns_entity_map_hook> EntityHook; + + typedef bi::avl_set< XioConnection, EntityHook, + bi::compare<EntityComp> > EntitySet; + + friend class XioPortal; + friend class XioMessenger; + friend class XioDispatchHook; + friend class XioMarkDownHook; + friend class XioSend; + + int on_disconnect_event() { + std::lock_guard<ceph::spinlock> lg(sp); + + connected = false; + discard_out_queues(CState::OP_FLAG_LOCKED); + + return 0; + } + + int on_teardown_event() { + + { + std::lock_guard<ceph::spinlock> lg(sp); + + if (conn) + xio_connection_destroy(conn); + conn = NULL; + } + + this->put(); + return 0; + } + + int xio_qdepth_high_mark() { + return q_high_mark; + } + + int xio_qdepth_low_mark() { + return q_low_mark; + } + +public: + XioConnection(XioMessenger *m, XioConnection::type _type, + const entity_inst_t& peer); + + ~XioConnection() { + if (conn) + xio_connection_destroy(conn); + } + ostream& conn_prefix(std::ostream *_dout); + + bool is_connected() override { return connected; } + + int send_message(Message *m) override; + void send_keepalive() override {send_keepalive_or_ack();} + void send_keepalive_or_ack(bool ack = false, const utime_t *tp = nullptr); + void mark_down() override; + int _mark_down(uint32_t flags); + void mark_disposable() override; + int _mark_disposable(uint32_t flags); + + const entity_inst_t& get_peer() const { return peer; } + + XioConnection* get() { +#if 0 + cout << "XioConnection::get " << this << " " << nref.load() << std::endl; +#endif + RefCountedObject::get(); + return this; + } + + void put() { + RefCountedObject::put(); +#if 0 + cout << "XioConnection::put " << this << " " << nref.load() << std::endl; +#endif + } + + void disconnect() { + if (is_connected()) { + connected = false; + xio_disconnect(conn); // normal teardown will clean up conn + } + } + + uint32_t get_magic() { return magic; } + void set_magic(int _magic) { magic = _magic; } + uint32_t get_special_handling() { return special_handling; } + void set_special_handling(int n) { special_handling = n; } + uint64_t get_scount() { return scount; } + + int passive_setup(); /* XXX */ + + int handle_data_msg(struct xio_session *session, struct xio_msg *msg, + int more_in_batch, void *cb_user_context); + int on_msg(struct xio_session *session, struct xio_msg *msg, + int more_in_batch, void *cb_user_context); + int on_ow_msg_send_complete(struct xio_session *session, struct xio_msg *msg, + void *conn_user_context); + int on_msg_error(struct xio_session *session, enum xio_status error, + struct xio_msg *msg, void *conn_user_context); + void msg_send_fail(XioSend *xsend, int code); + void msg_release_fail(struct xio_msg *msg, int code); +private: + void send_keepalive_or_ack_internal(bool ack = false, const utime_t *tp = nullptr); + int flush_out_queues(uint32_t flags); + int discard_out_queues(uint32_t flags); + int adjust_clru(uint32_t flags); +}; + +typedef boost::intrusive_ptr<XioConnection> XioConnectionRef; + +class XioLoopbackConnection : public Connection +{ +private: + std::atomic<uint64_t> seq = { 0 }; +public: + explicit XioLoopbackConnection(Messenger *m) : Connection(m->cct, m) + { + const entity_inst_t& m_inst = m->get_myinst(); + peer_addr = m_inst.addr; + peer_type = m_inst.name.type(); + set_features(XIO_ALL_FEATURES); /* XXXX set to ours */ + } + + XioLoopbackConnection* get() { + return static_cast<XioLoopbackConnection*>(RefCountedObject::get()); + } + + bool is_connected() override { return true; } + + int send_message(Message *m) override; + void send_keepalive() override; + void mark_down() override {} + void mark_disposable() override {} + + uint64_t get_seq() { + return seq; + } + + uint64_t next_seq() { + return ++seq; + } +}; + +typedef boost::intrusive_ptr<XioLoopbackConnection> XioLoopbackConnectionRef; + +#endif /* XIO_CONNECTION_H */ |