diff options
Diffstat (limited to 'src/msg/xio/XioMsg.h')
-rw-r--r-- | src/msg/xio/XioMsg.h | 446 |
1 files changed, 446 insertions, 0 deletions
diff --git a/src/msg/xio/XioMsg.h b/src/msg/xio/XioMsg.h new file mode 100644 index 00000000..2f0c8490 --- /dev/null +++ b/src/msg/xio/XioMsg.h @@ -0,0 +1,446 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * Portions Copyright (C) 2013 CohortFS, LLC + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef XIO_MSG_H +#define XIO_MSG_H + +#include <boost/intrusive/list.hpp> +#include "msg/SimplePolicyMessenger.h" +extern "C" { +#include "libxio.h" +} +#include "XioConnection.h" +#include "XioSubmit.h" +#include "msg/msg_types.h" +#include "XioPool.h" + +namespace bi = boost::intrusive; + +class XioMessenger; + +class XioMsgCnt +{ +public: + ceph_le32 msg_cnt; + buffer::list bl; +public: + explicit XioMsgCnt(buffer::ptr p) + { + bl.append(p); + buffer::list::iterator bl_iter = bl.begin(); + decode(msg_cnt, bl_iter); + } +}; + +class XioMsgHdr +{ +public: + char tag; + ceph_le32 msg_cnt; + ceph_le32 peer_type; + entity_addr_t addr; /* XXX hack! */ + ceph_msg_header* hdr; + ceph_msg_footer* ftr; + uint64_t features; + buffer::list bl; +public: + XioMsgHdr(ceph_msg_header& _hdr, ceph_msg_footer& _ftr, uint64_t _features) + : tag(CEPH_MSGR_TAG_MSG), msg_cnt(init_le32(0)), hdr(&_hdr), ftr(&_ftr), + features(_features) + { } + + XioMsgHdr(ceph_msg_header& _hdr, ceph_msg_footer &_ftr, buffer::ptr p) + : hdr(&_hdr), ftr(&_ftr) + { + bl.append(p); + buffer::list::iterator bl_iter = bl.begin(); + decode(bl_iter); + } + + static size_t get_max_encoded_length(); + + const buffer::list& get_bl() { encode(bl); return bl; }; + + inline void encode_hdr(ceph::buffer::list& bl) const { + using ceph::encode; + encode(tag, bl); + encode(msg_cnt, bl); + encode(peer_type, bl); + encode(addr, bl, features); + encode(hdr->seq, bl); + encode(hdr->tid, bl); + encode(hdr->type, bl); + encode(hdr->priority, bl); + encode(hdr->version, bl); + encode(hdr->front_len, bl); + encode(hdr->middle_len, bl); + encode(hdr->data_len, bl); + encode(hdr->data_off, bl); + encode(hdr->src.type, bl); + encode(hdr->src.num, bl); + encode(hdr->compat_version, bl); + encode(hdr->crc, bl); + } + + inline void encode_ftr(buffer::list& bl) const { + using ceph::encode; + encode(ftr->front_crc, bl); + encode(ftr->middle_crc, bl); + encode(ftr->data_crc, bl); + encode(ftr->sig, bl); + encode(ftr->flags, bl); + } + + inline void encode(buffer::list& bl) const { + encode_hdr(bl); + encode_ftr(bl); + } + + inline void decode_hdr(buffer::list::iterator& bl) { + using ceph::decode; + decode(tag, bl); + decode(msg_cnt, bl); + decode(peer_type, bl); + decode(addr, bl); + decode(hdr->seq, bl); + decode(hdr->tid, bl); + decode(hdr->type, bl); + decode(hdr->priority, bl); + decode(hdr->version, bl); + decode(hdr->front_len, bl); + decode(hdr->middle_len, bl); + decode(hdr->data_len, bl); + decode(hdr->data_off, bl); + decode(hdr->src.type, bl); + decode(hdr->src.num, bl); + decode(hdr->compat_version, bl); + decode(hdr->crc, bl); + } + + inline void decode_ftr(buffer::list::iterator& bl) { + using ceph::decode; + decode(ftr->front_crc, bl); + decode(ftr->middle_crc, bl); + decode(ftr->data_crc, bl); + decode(ftr->sig, bl); + decode(ftr->flags, bl); + } + + inline void decode(buffer::list::iterator& bl) { + decode_hdr(bl); + decode_ftr(bl); + } + + virtual ~XioMsgHdr() + {} +}; + +WRITE_CLASS_ENCODER(XioMsgHdr); + +extern struct xio_mempool *xio_msgr_noreg_mpool; + +#define XIO_MSGR_IOVLEN 16 + +struct xio_msg_ex +{ + struct xio_msg msg; + struct xio_iovec_ex iovs[XIO_MSGR_IOVLEN]; + + explicit xio_msg_ex(void* user_context) { + // go in structure order + msg.in.header.iov_len = 0; + msg.in.header.iov_base = NULL; /* XXX Accelio requires this currently */ + + msg.in.sgl_type = XIO_SGL_TYPE_IOV_PTR; + msg.in.pdata_iov.max_nents = XIO_MSGR_IOVLEN; + msg.in.pdata_iov.nents = 0; + msg.in.pdata_iov.sglist = iovs; + + // minimal zero "out" side + msg.out.header.iov_len = 0; + msg.out.header.iov_base = NULL; /* XXX Accelio requires this currently, + * against spec */ + // out (some members adjusted later) + msg.out.sgl_type = XIO_SGL_TYPE_IOV_PTR; + msg.out.pdata_iov.max_nents = XIO_MSGR_IOVLEN; + msg.out.pdata_iov.nents = 0; + msg.out.pdata_iov.sglist = iovs; + + // minimal initialize an "out" msg + msg.request = NULL; + msg.type = XIO_MSG_TYPE_ONE_WAY; + // for now, we DO NEED receipts for every msg + msg.flags = 0; + msg.user_context = user_context; + msg.next = NULL; + // minimal zero "in" side + } +}; + +class XioSend : public XioSubmit +{ +public: + virtual void print_debug(CephContext *cct, const char *tag) const {}; + const struct xio_msg * get_xio_msg() const {return &req_0.msg;} + struct xio_msg * get_xio_msg() {return &req_0.msg;} + virtual size_t get_msg_count() const {return 1;} + + XioSend(XioConnection *_xcon, struct xio_reg_mem& _mp, int _ex_cnt=0) : + XioSubmit(XioSubmit::OUTGOING_MSG, _xcon), + req_0(this), mp_this(_mp), nrefs(_ex_cnt+1) + { + xpool_inc_msgcnt(); + xcon->get(); + } + + XioSend* get() { nrefs++; return this; }; + + void put(int n) { + int refs = nrefs -= n; + if (refs == 0) { + struct xio_reg_mem *mp = &this->mp_this; + this->~XioSend(); + xpool_free(sizeof(XioSend), mp); + } + } + + void put() { + put(1); + } + + void put_msg_refs() { + put(get_msg_count()); + } + + virtual ~XioSend() { + xpool_dec_msgcnt(); + xcon->put(); + } + +private: + xio_msg_ex req_0; + struct xio_reg_mem mp_this; + std::atomic<unsigned> nrefs = { 0 }; +}; + +class XioCommand : public XioSend +{ +public: + XioCommand(XioConnection *_xcon, struct xio_reg_mem& _mp):XioSend(_xcon, _mp) { + } + + buffer::list& get_bl_ref() { return bl; }; + +private: + buffer::list bl; +}; + +struct XioMsg : public XioSend +{ +public: + Message* m; + XioMsgHdr hdr; + xio_msg_ex* req_arr; + +public: + XioMsg(Message *_m, XioConnection *_xcon, struct xio_reg_mem& _mp, + int _ex_cnt, uint64_t _features) : + XioSend(_xcon, _mp, _ex_cnt), + m(_m), hdr(m->get_header(), m->get_footer(), _features), + req_arr(NULL) + { + const entity_inst_t &inst = xcon->get_messenger()->get_myinst(); + hdr.peer_type = inst.name.type(); + hdr.addr = xcon->get_messenger()->get_myaddr_legacy(); + hdr.hdr->src.type = inst.name.type(); + hdr.hdr->src.num = inst.name.num(); + hdr.msg_cnt = _ex_cnt+1; + + if (unlikely(_ex_cnt > 0)) { + alloc_trailers(_ex_cnt); + } + } + + void print_debug(CephContext *cct, const char *tag) const override; + size_t get_msg_count() const override { + return hdr.msg_cnt; + } + + void alloc_trailers(int cnt) { + req_arr = static_cast<xio_msg_ex*>(malloc(cnt * sizeof(xio_msg_ex))); + for (int ix = 0; ix < cnt; ++ix) { + xio_msg_ex* xreq = &(req_arr[ix]); + new (xreq) xio_msg_ex(this); + } + } + + Message *get_message() { return m; } + + ~XioMsg() + { + if (unlikely(!!req_arr)) { + for (unsigned int ix = 0; ix < get_msg_count()-1; ++ix) { + xio_msg_ex* xreq = &(req_arr[ix]); + xreq->~xio_msg_ex(); + } + free(req_arr); + } + + /* testing only! server's ready, resubmit request (not reached on + * PASSIVE/server side) */ + if (unlikely(m->get_magic() & MSG_MAGIC_REDUPE)) { + if (likely(xcon->is_connected())) { + xcon->send_message(m); + } else { + /* dispose it */ + m->put(); + } + } else { + /* the normal case: done with message */ + m->put(); + } + } +}; + +class XioDispatchHook : public Message::CompletionHook +{ +private: + XioConnection *xcon; + XioInSeq msg_seq; + XioPool rsp_pool; + std::atomic<unsigned> nrefs { 1 }; + bool cl_flag; + friend class XioConnection; + friend class XioMessenger; +public: + struct xio_reg_mem mp_this; + + XioDispatchHook(XioConnection *_xcon, Message *_m, XioInSeq& _msg_seq, + struct xio_reg_mem& _mp) : + CompletionHook(_m), + xcon(_xcon->get()), + msg_seq(_msg_seq), + rsp_pool(xio_msgr_noreg_mpool), + cl_flag(false), + mp_this(_mp) + { + ++xcon->n_reqs; // atomicity by portal thread + xpool_inc_hookcnt(); + } + + virtual void finish(int r) { + this->put(); + } + + virtual void complete(int r) { + finish(r); + } + + int release_msgs(); + + XioDispatchHook* get() { + nrefs++; return this; + } + + void put(int n = 1) { + int refs = nrefs -= n; + if (refs == 0) { + /* in Marcus' new system, refs reaches 0 twice: once in + * Message lifecycle, and again after xio_release_msg. + */ + if (!cl_flag && release_msgs()) + return; + struct xio_reg_mem *mp = &this->mp_this; + this->~XioDispatchHook(); + xpool_free(sizeof(XioDispatchHook), mp); + } + } + + XioInSeq& get_seq() { return msg_seq; } + + XioPool& get_pool() { return rsp_pool; } + + void on_err_finalize(XioConnection *xcon) { + /* can't decode message; even with one-way must free + * xio_msg structures, and then xiopool + */ + this->finish(-1); + } + + ~XioDispatchHook() { + --xcon->n_reqs; // atomicity by portal thread + xpool_dec_hookcnt(); + xcon->put(); + } +}; + +/* A sender-side CompletionHook that relies on the on_msg_delivered + * to complete a pending mark down. */ +class XioMarkDownHook : public Message::CompletionHook +{ +private: + XioConnection* xcon; + +public: + struct xio_reg_mem mp_this; + + XioMarkDownHook( + XioConnection* _xcon, Message *_m, struct xio_reg_mem& _mp) : + CompletionHook(_m), xcon(_xcon->get()), mp_this(_mp) + { } + + virtual void claim(int r) {} + + virtual void finish(int r) { + xcon->put(); + struct xio_reg_mem *mp = &this->mp_this; + this->~XioMarkDownHook(); + xio_mempool_free(mp); + } + + virtual void complete(int r) { + xcon->_mark_down(XioConnection::CState::OP_FLAG_NONE); + finish(r); + } +}; + +struct XioCompletion : public XioSubmit +{ + XioDispatchHook *xhook; +public: + XioCompletion(XioConnection *_xcon, XioDispatchHook *_xhook) + : XioSubmit(XioSubmit::INCOMING_MSG_RELEASE, _xcon /* not xcon! */), + xhook(_xhook->get()) { + // submit queue ref + xcon->get(); + }; + + struct xio_msg* dequeue() { + return xhook->get_seq().dequeue(); + } + + XioDispatchHook* get_xhook() { return xhook; } + + void finalize() { + xcon->put(); + xhook->put(); + } +}; + +void print_xio_msg_hdr(CephContext *cct, const char *tag, + const XioMsgHdr &hdr, const struct xio_msg *msg); +void print_ceph_msg(CephContext *cct, const char *tag, Message *m); + +#endif /* XIO_MSG_H */ |