// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #ifndef _MSG_ASYNC_PROTOCOL_ #define _MSG_ASYNC_PROTOCOL_ #include #include #include "AsyncConnection.h" #include "include/buffer.h" #include "include/msgr.h" /* * Continuation Helper Classes */ #include #include template class Ct { public: virtual ~Ct() {} virtual Ct *call(C *foo) const = 0; }; template class CtFun : public Ct { private: using fn_t = Ct *(C::*)(Args...); fn_t _f; std::tuple _params; template inline Ct *_call(C *foo, std::index_sequence) const { return (foo->*_f)(std::get(_params)...); } public: CtFun(fn_t f) : _f(f) {} inline void setParams(Args... args) { _params = std::make_tuple(args...); } inline Ct *call(C *foo) const override { return _call(foo, std::index_sequence_for()); } }; using rx_buffer_t = std::unique_ptr; template class CtRxNode : public Ct { using fn_t = Ct *(C::*)(rx_buffer_t&&, int r); fn_t _f; public: mutable rx_buffer_t node; int r; CtRxNode(fn_t f) : _f(f) {} void setParams(rx_buffer_t &&node, int r) { this->node = std::move(node); this->r = r; } inline Ct *call(C *foo) const override { return (foo->*_f)(std::move(node), r); } }; template using CONTINUATION_TYPE = CtFun; template using CONTINUATION_TX_TYPE = CtFun; template using CONTINUATION_RX_TYPE = CtFun; template using CONTINUATION_RXBPTR_TYPE = CtRxNode; #define CONTINUATION_DECL(C, F, ...) \ CtFun F##_cont { (&C::F) }; #define CONTINUATION(F) F##_cont #define CONTINUE(F, ...) (F##_cont.setParams(__VA_ARGS__), &F##_cont) #define CONTINUATION_RUN(CT) \ { \ Ct::type> *_cont = &CT;\ do { \ _cont = _cont->call(this); \ } while (_cont); \ } #define READ_HANDLER_CONTINUATION_DECL(C, F) \ CONTINUATION_DECL(C, F, char *, int) #define READ_BPTR_HANDLER_CONTINUATION_DECL(C, F) \ CtRxNode F##_cont { (&C::F) }; #define WRITE_HANDLER_CONTINUATION_DECL(C, F) CONTINUATION_DECL(C, F, int) ////////////////////////////////////////////////////////////////////// class AsyncMessenger; class Protocol { public: const int proto_type; protected: AsyncConnection *connection; AsyncMessenger *messenger; CephContext *cct; public: std::shared_ptr auth_meta; public: Protocol(int type, AsyncConnection *connection); virtual ~Protocol(); // prepare protocol for connecting to peer virtual void connect() = 0; // prepare protocol for accepting peer connections virtual void accept() = 0; // true -> protocol is ready for sending messages virtual bool is_connected() = 0; // stop connection virtual void stop() = 0; // signal and handle connection failure virtual void fault() = 0; // send message virtual void send_message(Message *m) = 0; // send keepalive virtual void send_keepalive() = 0; virtual void read_event() = 0; virtual void write_event() = 0; virtual bool is_queued() = 0; int get_con_mode() const { return auth_meta->con_mode; } }; #endif /* _MSG_ASYNC_PROTOCOL_ */