// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #pragma once #include #include #include "common/ceph_time.h" #include "crimson/common/gated.h" #include "crimson/net/Dispatcher.h" #include "crimson/net/Fwd.h" class MOSDPing; namespace crimson::osd { class ShardServices; } namespace crimson::mon { class Client; } template using Ref = boost::intrusive_ptr; class Heartbeat : public crimson::net::Dispatcher { public: using osd_id_t = int; Heartbeat(osd_id_t whoami, crimson::osd::ShardServices& service, crimson::mon::Client& monc, crimson::net::Messenger &front_msgr, crimson::net::Messenger &back_msgr); seastar::future<> start(entity_addrvec_t front, entity_addrvec_t back); seastar::future<> stop(); using osds_t = std::vector; void add_peer(osd_id_t peer, epoch_t epoch); void update_peers(int whoami); void remove_peer(osd_id_t peer); osds_t get_peers() const; const entity_addrvec_t& get_front_addrs() const; const entity_addrvec_t& get_back_addrs() const; crimson::net::Messenger &get_front_msgr() const; crimson::net::Messenger &get_back_msgr() const; // Dispatcher methods std::optional> ms_dispatch( crimson::net::ConnectionRef conn, MessageRef m) override; void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override; void ms_handle_connect(crimson::net::ConnectionRef conn, seastar::shard_id) override; void ms_handle_accept(crimson::net::ConnectionRef conn, seastar::shard_id, bool is_replace) override; void print(std::ostream&) const; private: seastar::future<> handle_osd_ping(crimson::net::ConnectionRef conn, Ref m); seastar::future<> handle_ping(crimson::net::ConnectionRef conn, Ref m); seastar::future<> handle_reply(crimson::net::ConnectionRef conn, Ref m); seastar::future<> handle_you_died(); /// remove down OSDs /// @return peers not added in this epoch osds_t remove_down_peers(); /// add enough reporters for fast failure detection void add_reporter_peers(int whoami); seastar::future<> start_messenger(crimson::net::Messenger& msgr, const entity_addrvec_t& addrs); seastar::future<> maybe_share_osdmap(crimson::net::ConnectionRef, Ref m); private: const osd_id_t whoami; crimson::osd::ShardServices& service; crimson::mon::Client& monc; crimson::net::Messenger &front_msgr; crimson::net::Messenger &back_msgr; seastar::timer timer; // use real_clock so it can be converted to utime_t using clock = ceph::coarse_real_clock; class ConnectionListener; class Connection; class Session; class Peer; using peers_map_t = std::map; peers_map_t peers; // osds which are considered failed // osd_id => when was the last time that both front and back pings were acked // or sent. // use for calculating how long the OSD has been unresponsive using failure_queue_t = std::map; seastar::future<> send_failures(failure_queue_t&& failure_queue); seastar::future<> send_heartbeats(); void heartbeat_check(); // osds we've reported to monior as failed ones, but they are not marked down // yet crimson::common::Gated gate; class FailingPeers { public: FailingPeers(Heartbeat& heartbeat) : heartbeat(heartbeat) {} bool add_pending(osd_id_t peer, clock::time_point failed_since, clock::time_point now, std::vector>& futures); seastar::future<> cancel_one(osd_id_t peer); private: seastar::future<> send_still_alive(osd_id_t, const entity_addrvec_t&); Heartbeat& heartbeat; struct failure_info_t { clock::time_point failed_since; entity_addrvec_t addrs; }; std::map failure_pending; } failing_peers; }; inline std::ostream& operator<<(std::ostream& out, const Heartbeat& hb) { hb.print(out); return out; } /* * Event driven interface for Heartbeat::Peer to be notified when both hb_front * and hb_back are connected, or connection is lost. */ class Heartbeat::ConnectionListener { public: ConnectionListener(size_t connections) : connections{connections} {} void increase_connected() { assert(connected < connections); ++connected; if (connected == connections) { on_connected(); } } void decrease_connected() { assert(connected > 0); if (connected == connections) { on_disconnected(); } --connected; } enum class type_t { front, back }; virtual entity_addr_t get_peer_addr(type_t) = 0; protected: virtual void on_connected() = 0; virtual void on_disconnected() = 0; private: const size_t connections; size_t connected = 0; }; class Heartbeat::Connection { public: using type_t = ConnectionListener::type_t; Connection(osd_id_t peer, bool is_winner_side, type_t type, crimson::net::Messenger& msgr, ConnectionListener& listener) : peer{peer}, type{type}, msgr{msgr}, listener{listener}, is_winner_side{is_winner_side} { connect(); } Connection(const Connection&) = delete; Connection(Connection&&) = delete; Connection& operator=(const Connection&) = delete; Connection& operator=(Connection&&) = delete; ~Connection(); bool matches(crimson::net::ConnectionRef _conn) const; void connected() { set_connected(); } bool accepted(crimson::net::ConnectionRef, bool is_replace); void reset(bool is_replace=false); seastar::future<> send(MessageURef msg); void validate(); // retry connection if still pending void retry(); private: void set_connected(); void set_unconnected(); void connect(); const osd_id_t peer; const type_t type; crimson::net::Messenger& msgr; ConnectionListener& listener; /* * Resolve the following racing when both me and peer are trying to connect * each other symmetrically, under SocketPolicy::lossy_client: * * OSD.A OSD.B * - - * |-[1]----> <----[2]-| * \ / * \ / * delay.. X delay.. * / \ * |-[1]x> / \ | * |(reset#1) (reset#2)| * |(reconnectB) (reconnectA)| * |-[2]---> <---[1]-| * delay.. delay.. * (remote close populated) * |-[2]x> hb_back reset */ class Heartbeat::Session { public: Session(osd_id_t peer) : peer{peer} {} void set_epoch_added(epoch_t epoch_) { epoch = epoch_; } epoch_t get_epoch_added() const { return epoch; } void set_projected_epoch(epoch_t epoch_) { projected_epoch = epoch_; } epoch_t get_projected_epoch() const { return projected_epoch; } bool is_started() const { return connected; } bool pinged() const { if (clock::is_zero(first_tx)) { // i can never receive a pong without sending any ping message first. assert(clock::is_zero(last_rx_front) && clock::is_zero(last_rx_back)); return false; } else { return true; } } enum class health_state { UNKNOWN, UNHEALTHY, HEALTHY, }; health_state do_health_screen(clock::time_point now) const { if (!pinged()) { // we are not healty nor unhealty because we haven't sent anything yet return health_state::UNKNOWN; } else if (!ping_history.empty() && ping_history.begin()->second.deadline < now) { return health_state::UNHEALTHY; } else if (!clock::is_zero(last_rx_front) && !clock::is_zero(last_rx_back)) { // only declare to be healthy until we have received the first // replies from both front/back connections return health_state::HEALTHY; } else { return health_state::UNKNOWN; } } clock::time_point failed_since(clock::time_point now) const; void set_tx(clock::time_point now) { if (!pinged()) { first_tx = now; } last_tx = now; } void on_connected() { assert(!connected); connected = true; ping_history.clear(); } void on_ping(const utime_t& sent_stamp, const clock::time_point& deadline) { assert(connected); [[maybe_unused]] auto [reply, added] = ping_history.emplace(sent_stamp, reply_t{deadline, 2}); } bool on_pong(const utime_t& ping_stamp, Connection::type_t type, clock::time_point now) { assert(connected); auto ping = ping_history.find(ping_stamp); if (ping == ping_history.end()) { // old replies, deprecated by newly sent pings. return false; } auto& unacked = ping->second.unacknowledged; assert(unacked); if (type == Connection::type_t::front) { last_rx_front = now; unacked--; } else { last_rx_back = now; unacked--; } if (unacked == 0) { ping_history.erase(ping_history.begin(), ++ping); } return true; } void on_disconnected() { assert(connected); connected = false; if (!ping_history.empty()) { // we lost our ping_history of the last session, but still need to keep // the oldest deadline for unhealthy check. auto oldest = ping_history.begin(); auto sent_stamp = oldest->first; auto deadline = oldest->second.deadline; ping_history.clear(); ping_history.emplace(sent_stamp, reply_t{deadline, 0}); } } // maintain an entry in ping_history for unhealthy check void set_inactive_history(clock::time_point); private: const osd_id_t peer; bool connected = false; // time we sent our first ping request clock::time_point first_tx; // last time we sent a ping request clock::time_point last_tx; // last time we got a ping reply on the front side clock::time_point last_rx_front; // last time we got a ping reply on the back side clock::time_point last_rx_back; // most recent epoch we wanted this peer epoch_t epoch; // rename me to epoch_added // epoch we expect peer to be at once our sent incrementals are processed epoch_t projected_epoch = 0; struct reply_t { clock::time_point deadline; // one sent over front conn, another sent over back conn uint8_t unacknowledged = 0; }; // history of inflight pings, arranging by timestamp we sent std::map ping_history; }; class Heartbeat::Peer final : private Heartbeat::ConnectionListener { public: Peer(Heartbeat&, osd_id_t); ~Peer(); Peer(Peer&&) = delete; Peer(const Peer&) = delete; Peer& operator=(Peer&&) = delete; Peer& operator=(const Peer&) = delete; // set/get the epoch at which the peer was added void set_epoch_added(epoch_t epoch) { session.set_epoch_added(epoch); } epoch_t get_epoch_added() const { return session.get_epoch_added(); } void set_projected_epoch(epoch_t epoch) { session.set_projected_epoch(epoch); } epoch_t get_projected_epoch() const { return session.get_projected_epoch(); } // if failure, return time_point since last active // else, return clock::zero() clock::time_point failed_since(clock::time_point now) const { return session.failed_since(now); } void send_heartbeat( clock::time_point, ceph::signedspan, std::vector>&); seastar::future<> handle_reply(crimson::net::ConnectionRef, Ref); void handle_reset(crimson::net::ConnectionRef conn, bool is_replace); void handle_connect(crimson::net::ConnectionRef conn); void handle_accept(crimson::net::ConnectionRef conn, bool is_replace); private: entity_addr_t get_peer_addr(type_t type) override; void on_connected() override; void on_disconnected() override; void do_send_heartbeat( clock::time_point, ceph::signedspan, std::vector>*); template void for_each_conn(Func&& f) { f(con_front); f(con_back); } Heartbeat& heartbeat; const osd_id_t peer; Session session; // if need to send heartbeat when session connected bool pending_send = false; Connection con_front; Connection con_back; friend std::ostream& operator<<(std::ostream& os, const Peer& p) { return os << "peer(osd." << p.peer << ")"; } }; #if FMT_VERSION >= 90000 template <> struct fmt::formatter : fmt::ostream_formatter {}; template <> struct fmt::formatter : fmt::ostream_formatter {}; template <> struct fmt::formatter : fmt::ostream_formatter {}; #endif