diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
commit | 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch) | |
tree | e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/msg/Message.h | |
parent | Initial commit. (diff) | |
download | ceph-upstream.tar.xz ceph-upstream.zip |
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/msg/Message.h | 577 |
1 files changed, 577 insertions, 0 deletions
diff --git a/src/msg/Message.h b/src/msg/Message.h new file mode 100644 index 00000000..42405ed3 --- /dev/null +++ b/src/msg/Message.h @@ -0,0 +1,577 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_MESSAGE_H +#define CEPH_MESSAGE_H + +#include <stdlib.h> +#include <ostream> +#include <string_view> + +#include <boost/intrusive/list.hpp> + +#include "include/Context.h" +#include "common/RefCountedObj.h" +#include "common/ThrottleInterface.h" +#include "common/config.h" +#include "common/debug.h" +#include "common/zipkin_trace.h" +#include "include/ceph_assert.h" // Because intrusive_ptr clobbers our assert... +#include "include/buffer.h" +#include "include/types.h" +#include "msg/Connection.h" +#include "msg/MessageRef.h" +#include "msg_types.h" + +// monitor internal +#define MSG_MON_SCRUB 64 +#define MSG_MON_ELECTION 65 +#define MSG_MON_PAXOS 66 +#define MSG_MON_PROBE 67 +#define MSG_MON_JOIN 68 +#define MSG_MON_SYNC 69 + +/* monitor <-> mon admin tool */ +#define MSG_MON_COMMAND 50 +#define MSG_MON_COMMAND_ACK 51 +#define MSG_LOG 52 +#define MSG_LOGACK 53 + +#define MSG_GETPOOLSTATS 58 +#define MSG_GETPOOLSTATSREPLY 59 + +#define MSG_MON_GLOBAL_ID 60 + +#define MSG_ROUTE 47 +#define MSG_FORWARD 46 + +#define MSG_PAXOS 40 + +#define MSG_CONFIG 62 +#define MSG_GET_CONFIG 63 + + +// osd internal +#define MSG_OSD_PING 70 +#define MSG_OSD_BOOT 71 +#define MSG_OSD_FAILURE 72 +#define MSG_OSD_ALIVE 73 +#define MSG_OSD_MARK_ME_DOWN 74 +#define MSG_OSD_FULL 75 + +// removed right after luminous +//#define MSG_OSD_SUBOP 76 +//#define MSG_OSD_SUBOPREPLY 77 + +#define MSG_OSD_PGTEMP 78 + +#define MSG_OSD_BEACON 79 + +#define MSG_OSD_PG_NOTIFY 80 +#define MSG_OSD_PG_QUERY 81 +#define MSG_OSD_PG_LOG 83 +#define MSG_OSD_PG_REMOVE 84 +#define MSG_OSD_PG_INFO 85 +#define MSG_OSD_PG_TRIM 86 + +#define MSG_PGSTATS 87 +#define MSG_PGSTATSACK 88 + +#define MSG_OSD_PG_CREATE 89 +#define MSG_REMOVE_SNAPS 90 + +#define MSG_OSD_SCRUB 91 +#define MSG_OSD_SCRUB_RESERVE 92 // previous PG_MISSING +#define MSG_OSD_REP_SCRUB 93 + +#define MSG_OSD_PG_SCAN 94 +#define MSG_OSD_PG_BACKFILL 95 +#define MSG_OSD_PG_BACKFILL_REMOVE 96 + +#define MSG_COMMAND 97 +#define MSG_COMMAND_REPLY 98 + +#define MSG_OSD_BACKFILL_RESERVE 99 +#define MSG_OSD_RECOVERY_RESERVE 150 +#define MSG_OSD_FORCE_RECOVERY 151 + +#define MSG_OSD_PG_PUSH 105 +#define MSG_OSD_PG_PULL 106 +#define MSG_OSD_PG_PUSH_REPLY 107 + +#define MSG_OSD_EC_WRITE 108 +#define MSG_OSD_EC_WRITE_REPLY 109 +#define MSG_OSD_EC_READ 110 +#define MSG_OSD_EC_READ_REPLY 111 + +#define MSG_OSD_REPOP 112 +#define MSG_OSD_REPOPREPLY 113 +#define MSG_OSD_PG_UPDATE_LOG_MISSING 114 +#define MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY 115 + +#define MSG_OSD_PG_CREATED 116 +#define MSG_OSD_REP_SCRUBMAP 117 +#define MSG_OSD_PG_RECOVERY_DELETE 118 +#define MSG_OSD_PG_RECOVERY_DELETE_REPLY 119 +#define MSG_OSD_PG_CREATE2 120 +#define MSG_OSD_SCRUB2 121 + +#define MSG_OSD_PG_READY_TO_MERGE 122 + +// *** MDS *** + +#define MSG_MDS_BEACON 100 // to monitor +#define MSG_MDS_SLAVE_REQUEST 101 +#define MSG_MDS_TABLE_REQUEST 102 + + // 150 already in use (MSG_OSD_RECOVERY_RESERVE) + +#define MSG_MDS_RESOLVE 0x200 +#define MSG_MDS_RESOLVEACK 0x201 +#define MSG_MDS_CACHEREJOIN 0x202 +#define MSG_MDS_DISCOVER 0x203 +#define MSG_MDS_DISCOVERREPLY 0x204 +#define MSG_MDS_INODEUPDATE 0x205 +#define MSG_MDS_DIRUPDATE 0x206 +#define MSG_MDS_CACHEEXPIRE 0x207 +#define MSG_MDS_DENTRYUNLINK 0x208 +#define MSG_MDS_FRAGMENTNOTIFY 0x209 +#define MSG_MDS_OFFLOAD_TARGETS 0x20a +#define MSG_MDS_DENTRYLINK 0x20c +#define MSG_MDS_FINDINO 0x20d +#define MSG_MDS_FINDINOREPLY 0x20e +#define MSG_MDS_OPENINO 0x20f +#define MSG_MDS_OPENINOREPLY 0x210 +#define MSG_MDS_SNAPUPDATE 0x211 +#define MSG_MDS_FRAGMENTNOTIFYACK 0x212 +#define MSG_MDS_LOCK 0x300 +#define MSG_MDS_INODEFILECAPS 0x301 + +#define MSG_MDS_EXPORTDIRDISCOVER 0x449 +#define MSG_MDS_EXPORTDIRDISCOVERACK 0x450 +#define MSG_MDS_EXPORTDIRCANCEL 0x451 +#define MSG_MDS_EXPORTDIRPREP 0x452 +#define MSG_MDS_EXPORTDIRPREPACK 0x453 +#define MSG_MDS_EXPORTDIRWARNING 0x454 +#define MSG_MDS_EXPORTDIRWARNINGACK 0x455 +#define MSG_MDS_EXPORTDIR 0x456 +#define MSG_MDS_EXPORTDIRACK 0x457 +#define MSG_MDS_EXPORTDIRNOTIFY 0x458 +#define MSG_MDS_EXPORTDIRNOTIFYACK 0x459 +#define MSG_MDS_EXPORTDIRFINISH 0x460 + +#define MSG_MDS_EXPORTCAPS 0x470 +#define MSG_MDS_EXPORTCAPSACK 0x471 +#define MSG_MDS_GATHERCAPS 0x472 + +#define MSG_MDS_HEARTBEAT 0x500 // for mds load balancer + +// *** generic *** +#define MSG_TIMECHECK 0x600 +#define MSG_MON_HEALTH 0x601 + +// *** Message::encode() crcflags bits *** +#define MSG_CRC_DATA (1 << 0) +#define MSG_CRC_HEADER (1 << 1) +#define MSG_CRC_ALL (MSG_CRC_DATA | MSG_CRC_HEADER) + +// Xio Testing +#define MSG_DATA_PING 0x602 + +// Xio intends to define messages 0x603..0x606 + +// Special +#define MSG_NOP 0x607 + +#define MSG_MON_HEALTH_CHECKS 0x608 +#define MSG_TIMECHECK2 0x609 + +// *** ceph-mgr <-> OSD/MDS daemons *** +#define MSG_MGR_OPEN 0x700 +#define MSG_MGR_CONFIGURE 0x701 +#define MSG_MGR_REPORT 0x702 + +// *** ceph-mgr <-> ceph-mon *** +#define MSG_MGR_BEACON 0x703 + +// *** ceph-mon(MgrMonitor) -> OSD/MDS daemons *** +#define MSG_MGR_MAP 0x704 + +// *** ceph-mon(MgrMonitor) -> ceph-mgr +#define MSG_MGR_DIGEST 0x705 +// *** cephmgr -> ceph-mon +#define MSG_MON_MGR_REPORT 0x706 +#define MSG_SERVICE_MAP 0x707 + +#define MSG_MGR_CLOSE 0x708 + +// ====================================================== + +// abstract Message class + +namespace bi = boost::intrusive; + +// XioMessenger conditional trace flags +#define MSG_MAGIC_XIO 0x0002 +#define MSG_MAGIC_TRACE_XCON 0x0004 +#define MSG_MAGIC_TRACE_DTOR 0x0008 +#define MSG_MAGIC_TRACE_HDR 0x0010 +#define MSG_MAGIC_TRACE_XIO 0x0020 +#define MSG_MAGIC_TRACE_XMSGR 0x0040 +#define MSG_MAGIC_TRACE_CTR 0x0080 + +// XioMessenger diagnostic "ping pong" flag (resend msg when send completes) +#define MSG_MAGIC_REDUPE 0x0100 + +class Message : public RefCountedObject { +protected: + ceph_msg_header header; // headerelope + ceph_msg_footer footer; + bufferlist payload; // "front" unaligned blob + bufferlist middle; // "middle" unaligned blob + bufferlist data; // data payload (page-alignment will be preserved where possible) + + /* recv_stamp is set when the Messenger starts reading the + * Message off the wire */ + utime_t recv_stamp; + /* dispatch_stamp is set when the Messenger starts calling dispatch() on + * its endpoints */ + utime_t dispatch_stamp; + /* throttle_stamp is the point at which we got throttle */ + utime_t throttle_stamp; + /* time at which message was fully read */ + utime_t recv_complete_stamp; + + ConnectionRef connection; + + uint32_t magic = 0; + + bi::list_member_hook<> dispatch_q; + +public: + using ref = MessageRef; + using const_ref = MessageConstRef; + + // zipkin tracing + ZTracer::Trace trace; + void encode_trace(bufferlist &bl, uint64_t features) const; + void decode_trace(bufferlist::const_iterator &p, bool create = false); + + class CompletionHook : public Context { + protected: + Message *m; + friend class Message; + public: + explicit CompletionHook(Message *_m) : m(_m) {} + virtual void set_message(Message *_m) { m = _m; } + }; + + typedef bi::list< Message, + bi::member_hook< Message, + bi::list_member_hook<>, + &Message::dispatch_q > > Queue; + +protected: + CompletionHook* completion_hook = nullptr; // owned by Messenger + + // release our size in bytes back to this throttler when our payload + // is adjusted or when we are destroyed. + ThrottleInterface *byte_throttler = nullptr; + + // release a count back to this throttler when we are destroyed + ThrottleInterface *msg_throttler = nullptr; + + // keep track of how big this message was when we reserved space in + // the msgr dispatch_throttler, so that we can properly release it + // later. this is necessary because messages can enter the dispatch + // queue locally (not via read_message()), and those are not + // currently throttled. + uint64_t dispatch_throttle_size = 0; + + friend class Messenger; + +public: + Message() { + memset(&header, 0, sizeof(header)); + memset(&footer, 0, sizeof(footer)); + } + Message(int t, int version=1, int compat_version=0) { + memset(&header, 0, sizeof(header)); + header.type = t; + header.version = version; + header.compat_version = compat_version; + header.priority = 0; // undef + header.data_off = 0; + memset(&footer, 0, sizeof(footer)); + } + + Message *get() { + return static_cast<Message *>(RefCountedObject::get()); + } + +protected: + ~Message() override { + if (byte_throttler) + byte_throttler->put(payload.length() + middle.length() + data.length()); + release_message_throttle(); + trace.event("message destructed"); + /* call completion hooks (if any) */ + if (completion_hook) + completion_hook->complete(0); + } +public: + const ConnectionRef& get_connection() const { return connection; } + void set_connection(const ConnectionRef& c) { + connection = c; + } + CompletionHook* get_completion_hook() { return completion_hook; } + void set_completion_hook(CompletionHook *hook) { completion_hook = hook; } + void set_byte_throttler(ThrottleInterface *t) { + byte_throttler = t; + } + void set_message_throttler(ThrottleInterface *t) { + msg_throttler = t; + } + + void set_dispatch_throttle_size(uint64_t s) { dispatch_throttle_size = s; } + uint64_t get_dispatch_throttle_size() const { return dispatch_throttle_size; } + + const ceph_msg_header &get_header() const { return header; } + ceph_msg_header &get_header() { return header; } + void set_header(const ceph_msg_header &e) { header = e; } + void set_footer(const ceph_msg_footer &e) { footer = e; } + const ceph_msg_footer &get_footer() const { return footer; } + ceph_msg_footer &get_footer() { return footer; } + void set_src(const entity_name_t& src) { header.src = src; } + + uint32_t get_magic() const { return magic; } + void set_magic(int _magic) { magic = _magic; } + + /* + * If you use get_[data, middle, payload] you shouldn't + * use it to change those bufferlists unless you KNOW + * there is no throttle being used. The other + * functions are throttling-aware as appropriate. + */ + + void clear_payload() { + if (byte_throttler) { + byte_throttler->put(payload.length() + middle.length()); + } + payload.clear(); + middle.clear(); + } + + virtual void clear_buffers() {} + void clear_data() { + if (byte_throttler) + byte_throttler->put(data.length()); + data.clear(); + clear_buffers(); // let subclass drop buffers as well + } + void release_message_throttle() { + if (msg_throttler) + msg_throttler->put(); + msg_throttler = nullptr; + } + + bool empty_payload() const { return payload.length() == 0; } + bufferlist& get_payload() { return payload; } + const bufferlist& get_payload() const { return payload; } + void set_payload(bufferlist& bl) { + if (byte_throttler) + byte_throttler->put(payload.length()); + payload.claim(bl, buffer::list::CLAIM_ALLOW_NONSHAREABLE); + if (byte_throttler) + byte_throttler->take(payload.length()); + } + + void set_middle(bufferlist& bl) { + if (byte_throttler) + byte_throttler->put(middle.length()); + middle.claim(bl, buffer::list::CLAIM_ALLOW_NONSHAREABLE); + if (byte_throttler) + byte_throttler->take(middle.length()); + } + bufferlist& get_middle() { return middle; } + + void set_data(const bufferlist &bl) { + if (byte_throttler) + byte_throttler->put(data.length()); + data.share(bl); + if (byte_throttler) + byte_throttler->take(data.length()); + } + + const bufferlist& get_data() const { return data; } + bufferlist& get_data() { return data; } + void claim_data(bufferlist& bl, + unsigned int flags = buffer::list::CLAIM_DEFAULT) { + if (byte_throttler) + byte_throttler->put(data.length()); + bl.claim(data, flags); + } + off_t get_data_len() const { return data.length(); } + + void set_recv_stamp(utime_t t) { recv_stamp = t; } + const utime_t& get_recv_stamp() const { return recv_stamp; } + void set_dispatch_stamp(utime_t t) { dispatch_stamp = t; } + const utime_t& get_dispatch_stamp() const { return dispatch_stamp; } + void set_throttle_stamp(utime_t t) { throttle_stamp = t; } + const utime_t& get_throttle_stamp() const { return throttle_stamp; } + void set_recv_complete_stamp(utime_t t) { recv_complete_stamp = t; } + const utime_t& get_recv_complete_stamp() const { return recv_complete_stamp; } + + void calc_header_crc() { + header.crc = ceph_crc32c(0, (unsigned char*)&header, + sizeof(header) - sizeof(header.crc)); + } + void calc_front_crc() { + footer.front_crc = payload.crc32c(0); + footer.middle_crc = middle.crc32c(0); + } + void calc_data_crc() { + footer.data_crc = data.crc32c(0); + } + + virtual int get_cost() const { + return data.length(); + } + + // type + int get_type() const { return header.type; } + void set_type(int t) { header.type = t; } + + uint64_t get_tid() const { return header.tid; } + void set_tid(uint64_t t) { header.tid = t; } + + uint64_t get_seq() const { return header.seq; } + void set_seq(uint64_t s) { header.seq = s; } + + unsigned get_priority() const { return header.priority; } + void set_priority(__s16 p) { header.priority = p; } + + // source/dest + entity_inst_t get_source_inst() const { + return entity_inst_t(get_source(), get_source_addr()); + } + entity_name_t get_source() const { + return entity_name_t(header.src); + } + entity_addr_t get_source_addr() const { + if (connection) + return connection->get_peer_addr(); + return entity_addr_t(); + } + entity_addrvec_t get_source_addrs() const { + if (connection) + return connection->get_peer_addrs(); + return entity_addrvec_t(); + } + + // forwarded? + entity_inst_t get_orig_source_inst() const { + return get_source_inst(); + } + entity_name_t get_orig_source() const { + return get_source(); + } + entity_addr_t get_orig_source_addr() const { + return get_source_addr(); + } + entity_addrvec_t get_orig_source_addrs() const { + return get_source_addrs(); + } + + // virtual bits + virtual void decode_payload() = 0; + virtual void encode_payload(uint64_t features) = 0; + virtual std::string_view get_type_name() const = 0; + virtual void print(ostream& out) const { + out << get_type_name() << " magic: " << magic; + } + + virtual void dump(Formatter *f) const; + + void encode(uint64_t features, int crcflags); +}; + +extern Message *decode_message(CephContext *cct, int crcflags, + ceph_msg_header &header, + ceph_msg_footer& footer, bufferlist& front, + bufferlist& middle, bufferlist& data, + Connection* conn); +inline ostream& operator<<(ostream& out, const Message& m) { + m.print(out); + if (m.get_header().version) + out << " v" << m.get_header().version; + return out; +} + +extern void encode_message(Message *m, uint64_t features, bufferlist& bl); +extern Message *decode_message(CephContext *cct, int crcflags, + bufferlist::const_iterator& bl); + +template <class MessageType> +class MessageFactory { +public: +template<typename... Args> + static typename MessageType::ref build(Args&&... args) { + return typename MessageType::ref(new MessageType(std::forward<Args>(args)...), false); + } +}; + +template<class T, class M = Message> +class MessageSubType : public M { +public: + typedef boost::intrusive_ptr<T> ref; + typedef boost::intrusive_ptr<T const> const_ref; + + static auto msgref_cast(typename M::ref const& m) { + return boost::static_pointer_cast<typename T::const_ref::element_type, typename std::remove_reference<decltype(m)>::type::element_type>(m); + } + static auto msgref_cast(typename M::const_ref const& m) { + return boost::static_pointer_cast<typename T::ref::element_type, typename std::remove_reference<decltype(m)>::type::element_type>(m); + } + +protected: +template<typename... Args> + MessageSubType(Args&&... args) : M(std::forward<Args>(args)...) {} + virtual ~MessageSubType() override {} +}; + + +template<class T, class M = Message> +class MessageInstance : public MessageSubType<T, M> { +public: + using factory = MessageFactory<T>; + + template<typename... Args> + static auto create(Args&&... args) { + return MessageFactory<T>::build(std::forward<Args>(args)...); + } + static auto msgref_cast(typename Message::ref const& m) { + return boost::static_pointer_cast<typename T::ref::element_type, typename std::remove_reference<decltype(m)>::type::element_type>(m); + } + static auto msgref_cast(typename Message::const_ref const& m) { + return boost::static_pointer_cast<typename T::const_ref::element_type, typename std::remove_reference<decltype(m)>::type::element_type>(m); + } + +protected: +template<typename... Args> + MessageInstance(Args&&... args) : MessageSubType<T,M>(std::forward<Args>(args)...) {} + virtual ~MessageInstance() override {} +}; + +#endif |