summaryrefslogtreecommitdiffstats
path: root/src/msg/async/Protocol.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/msg/async/Protocol.h')
-rw-r--r--src/msg/async/Protocol.h140
1 files changed, 140 insertions, 0 deletions
diff --git a/src/msg/async/Protocol.h b/src/msg/async/Protocol.h
new file mode 100644
index 00000000..cccba183
--- /dev/null
+++ b/src/msg/async/Protocol.h
@@ -0,0 +1,140 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef _MSG_ASYNC_PROTOCOL_
+#define _MSG_ASYNC_PROTOCOL_
+
+#include <list>
+#include <map>
+
+#include "AsyncConnection.h"
+#include "include/buffer.h"
+#include "include/msgr.h"
+
+/*
+ * Continuation Helper Classes
+ */
+
+#include <memory>
+#include <tuple>
+
+template <class C>
+class Ct {
+public:
+ virtual ~Ct() {}
+ virtual Ct<C> *call(C *foo) const = 0;
+};
+
+template <class C, typename... Args>
+class CtFun : public Ct<C> {
+private:
+ using fn_t = Ct<C> *(C::*)(Args...);
+ fn_t _f;
+ std::tuple<Args...> _params;
+
+ template <std::size_t... Is>
+ inline Ct<C> *_call(C *foo, std::index_sequence<Is...>) const {
+ return (foo->*_f)(std::get<Is>(_params)...);
+ }
+
+public:
+ CtFun(fn_t f) : _f(f) {}
+
+ inline void setParams(Args... args) { _params = std::make_tuple(args...); }
+ inline Ct<C> *call(C *foo) const override {
+ return _call(foo, std::index_sequence_for<Args...>());
+ }
+};
+
+using rx_buffer_t =
+ std::unique_ptr<buffer::ptr_node, buffer::ptr_node::disposer>;
+
+template <class C>
+class CtRxNode : public Ct<C> {
+ using fn_t = Ct<C> *(C::*)(rx_buffer_t&&, int r);
+ fn_t _f;
+
+public:
+ mutable rx_buffer_t node;
+ int r;
+
+ CtRxNode(fn_t f) : _f(f) {}
+ void setParams(rx_buffer_t &&node, int r) {
+ this->node = std::move(node);
+ this->r = r;
+ }
+ inline Ct<C> *call(C *foo) const override {
+ return (foo->*_f)(std::move(node), r);
+ }
+};
+
+template <class C> using CONTINUATION_TYPE = CtFun<C>;
+template <class C> using CONTINUATION_TX_TYPE = CtFun<C, int>;
+template <class C> using CONTINUATION_RX_TYPE = CtFun<C, char*, int>;
+template <class C> using CONTINUATION_RXBPTR_TYPE = CtRxNode<C>;
+
+#define CONTINUATION_DECL(C, F, ...) \
+ CtFun<C, ##__VA_ARGS__> F##_cont { (&C::F) };
+
+#define CONTINUATION(F) F##_cont
+#define CONTINUE(F, ...) (F##_cont.setParams(__VA_ARGS__), &F##_cont)
+
+#define CONTINUATION_RUN(CT) \
+ { \
+ Ct<std::remove_reference<decltype(*this)>::type> *_cont = &CT;\
+ do { \
+ _cont = _cont->call(this); \
+ } while (_cont); \
+ }
+
+#define READ_HANDLER_CONTINUATION_DECL(C, F) \
+ CONTINUATION_DECL(C, F, char *, int)
+
+#define READ_BPTR_HANDLER_CONTINUATION_DECL(C, F) \
+ CtRxNode<C> F##_cont { (&C::F) };
+
+#define WRITE_HANDLER_CONTINUATION_DECL(C, F) CONTINUATION_DECL(C, F, int)
+
+//////////////////////////////////////////////////////////////////////
+
+class AsyncMessenger;
+
+class Protocol {
+public:
+ const int proto_type;
+protected:
+ AsyncConnection *connection;
+ AsyncMessenger *messenger;
+ CephContext *cct;
+public:
+ std::shared_ptr<AuthConnectionMeta> auth_meta;
+
+public:
+ Protocol(int type, AsyncConnection *connection);
+ virtual ~Protocol();
+
+ // prepare protocol for connecting to peer
+ virtual void connect() = 0;
+ // prepare protocol for accepting peer connections
+ virtual void accept() = 0;
+ // true -> protocol is ready for sending messages
+ virtual bool is_connected() = 0;
+ // stop connection
+ virtual void stop() = 0;
+ // signal and handle connection failure
+ virtual void fault() = 0;
+ // send message
+ virtual void send_message(Message *m) = 0;
+ // send keepalive
+ virtual void send_keepalive() = 0;
+
+ virtual void read_event() = 0;
+ virtual void write_event() = 0;
+ virtual bool is_queued() = 0;
+
+ int get_con_mode() const {
+ return auth_meta->con_mode;
+ }
+};
+
+#endif /* _MSG_ASYNC_PROTOCOL_ */