summaryrefslogtreecommitdiffstats
path: root/src/mon/Elector.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/mon/Elector.cc
parentInitial commit. (diff)
downloadceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz
ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/mon/Elector.cc')
-rw-r--r--src/mon/Elector.cc807
1 files changed, 807 insertions, 0 deletions
diff --git a/src/mon/Elector.cc b/src/mon/Elector.cc
new file mode 100644
index 000000000..671c08d85
--- /dev/null
+++ b/src/mon/Elector.cc
@@ -0,0 +1,807 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "Elector.h"
+#include "Monitor.h"
+
+#include "common/Timer.h"
+#include "MonitorDBStore.h"
+#include "messages/MMonElection.h"
+#include "messages/MMonPing.h"
+
+#include "common/config.h"
+#include "include/ceph_assert.h"
+
+#define dout_subsys ceph_subsys_mon
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, mon, get_epoch())
+using std::cerr;
+using std::cout;
+using std::dec;
+using std::hex;
+using std::list;
+using std::map;
+using std::make_pair;
+using std::ostream;
+using std::ostringstream;
+using std::pair;
+using std::set;
+using std::setfill;
+using std::string;
+using std::stringstream;
+using std::to_string;
+using std::vector;
+using std::unique_ptr;
+
+using ceph::bufferlist;
+using ceph::decode;
+using ceph::encode;
+using ceph::Formatter;
+using ceph::JSONFormatter;
+using ceph::mono_clock;
+using ceph::mono_time;
+using ceph::timespan_str;
+static ostream& _prefix(std::ostream *_dout, Monitor *mon, epoch_t epoch) {
+ return *_dout << "mon." << mon->name << "@" << mon->rank
+ << "(" << mon->get_state_name()
+ << ").elector(" << epoch << ") ";
+}
+
+Elector::Elector(Monitor *m, int strategy) : logic(this, static_cast<ElectionLogic::election_strategy>(strategy),
+ &peer_tracker,
+ m->cct->_conf.get_val<double>("mon_elector_ignore_propose_margin"),
+ m->cct),
+ peer_tracker(this, m->rank,
+ m->cct->_conf.get_val<uint64_t>("mon_con_tracker_score_halflife"),
+ m->cct->_conf.get_val<uint64_t>("mon_con_tracker_persist_interval"), m->cct),
+ ping_timeout(m->cct->_conf.get_val<double>("mon_elector_ping_timeout")),
+ PING_DIVISOR(m->cct->_conf.get_val<uint64_t>("mon_elector_ping_divisor")),
+ mon(m), elector(this) {
+ bufferlist bl;
+ mon->store->get(Monitor::MONITOR_NAME, "connectivity_scores", bl);
+ if (bl.length()) {
+ bufferlist::const_iterator bi = bl.begin();
+ peer_tracker.decode(bi);
+ }
+}
+
+
+void Elector::persist_epoch(epoch_t e)
+{
+ auto t(std::make_shared<MonitorDBStore::Transaction>());
+ t->put(Monitor::MONITOR_NAME, "election_epoch", e);
+ t->put(Monitor::MONITOR_NAME, "connectivity_scores", peer_tracker.get_encoded_bl());
+ mon->store->apply_transaction(t);
+}
+
+void Elector::persist_connectivity_scores()
+{
+ dout(20) << __func__ << dendl;
+ auto t(std::make_shared<MonitorDBStore::Transaction>());
+ t->put(Monitor::MONITOR_NAME, "connectivity_scores", peer_tracker.get_encoded_bl());
+ mon->store->apply_transaction(t);
+}
+
+epoch_t Elector::read_persisted_epoch() const
+{
+ return mon->store->get(Monitor::MONITOR_NAME, "election_epoch");
+}
+
+void Elector::validate_store()
+{
+ auto t(std::make_shared<MonitorDBStore::Transaction>());
+ t->put(Monitor::MONITOR_NAME, "election_writeable_test", rand());
+ int r = mon->store->apply_transaction(t);
+ ceph_assert(r >= 0);
+}
+
+bool Elector::is_current_member(int rank) const
+{
+ return mon->quorum.count(rank);
+}
+
+void Elector::trigger_new_election()
+{
+ mon->start_election();
+}
+
+int Elector::get_my_rank() const
+{
+ return mon->rank;
+}
+
+void Elector::reset_election()
+{
+ mon->bootstrap();
+}
+
+bool Elector::ever_participated() const
+{
+ return mon->has_ever_joined;
+}
+
+unsigned Elector::paxos_size() const
+{
+ return (unsigned)mon->monmap->size();
+}
+
+void Elector::shutdown()
+{
+ cancel_timer();
+}
+
+void Elector::notify_bump_epoch()
+{
+ mon->join_election();
+}
+
+void Elector::propose_to_peers(epoch_t e, bufferlist& logic_bl)
+{
+ // bcast to everyone else
+ for (unsigned i=0; i<mon->monmap->size(); ++i) {
+ if ((int)i == mon->rank) continue;
+ MMonElection *m =
+ new MMonElection(MMonElection::OP_PROPOSE, e,
+ peer_tracker.get_encoded_bl(),
+ logic.strategy, mon->monmap);
+ m->sharing_bl = logic_bl;
+ m->mon_features = ceph::features::mon::get_supported();
+ m->mon_release = ceph_release();
+ mon->send_mon_message(m, i);
+ }
+}
+
+void Elector::_start()
+{
+ peer_info.clear();
+ peer_info[mon->rank].cluster_features = CEPH_FEATURES_ALL;
+ peer_info[mon->rank].mon_release = ceph_release();
+ peer_info[mon->rank].mon_features = ceph::features::mon::get_supported();
+ mon->collect_metadata(&peer_info[mon->rank].metadata);
+ reset_timer();
+}
+
+void Elector::_defer_to(int who)
+{
+ MMonElection *m = new MMonElection(MMonElection::OP_ACK, get_epoch(),
+ peer_tracker.get_encoded_bl(),
+ logic.strategy, mon->monmap);
+ m->mon_features = ceph::features::mon::get_supported();
+ m->mon_release = ceph_release();
+ mon->collect_metadata(&m->metadata);
+
+ mon->send_mon_message(m, who);
+
+ // set a timer
+ reset_timer(1.0); // give the leader some extra time to declare victory
+}
+
+
+void Elector::reset_timer(double plus)
+{
+ // set the timer
+ cancel_timer();
+ /**
+ * This class is used as the callback when the expire_event timer fires up.
+ *
+ * If the expire_event is fired, then it means that we had an election going,
+ * either started by us or by some other participant, but it took too long,
+ * thus expiring.
+ *
+ * When the election expires, we will check if we were the ones who won, and
+ * if so we will declare victory. If that is not the case, then we assume
+ * that the one we defered to didn't declare victory quickly enough (in fact,
+ * as far as we know, we may even be dead); so, just propose ourselves as the
+ * Leader.
+ */
+ expire_event = mon->timer.add_event_after(
+ g_conf()->mon_election_timeout + plus,
+ new C_MonContext{mon, [this](int) {
+ logic.end_election_period();
+ }});
+}
+
+
+void Elector::cancel_timer()
+{
+ if (expire_event) {
+ mon->timer.cancel_event(expire_event);
+ expire_event = 0;
+ }
+}
+
+void Elector::assimilate_connection_reports(const bufferlist& tbl)
+{
+ dout(10) << __func__ << dendl;
+ ConnectionTracker pct(tbl, mon->cct);
+ peer_tracker.receive_peer_report(pct);
+}
+
+void Elector::message_victory(const std::set<int>& quorum)
+{
+ uint64_t cluster_features = CEPH_FEATURES_ALL;
+ mon_feature_t mon_features = ceph::features::mon::get_supported();
+ map<int,Metadata> metadata;
+ ceph_release_t min_mon_release{ceph_release_t::unknown};
+ for (auto id : quorum) {
+ auto i = peer_info.find(id);
+ ceph_assert(i != peer_info.end());
+ auto& info = i->second;
+ cluster_features &= info.cluster_features;
+ mon_features &= info.mon_features;
+ metadata[id] = info.metadata;
+ if (min_mon_release == ceph_release_t::unknown ||
+ info.mon_release < min_mon_release) {
+ min_mon_release = info.mon_release;
+ }
+ }
+
+ cancel_timer();
+
+
+ // tell everyone!
+ for (set<int>::iterator p = quorum.begin();
+ p != quorum.end();
+ ++p) {
+ if (*p == mon->rank) continue;
+ MMonElection *m = new MMonElection(MMonElection::OP_VICTORY, get_epoch(),
+ peer_tracker.get_encoded_bl(),
+ logic.strategy, mon->monmap);
+ m->quorum = quorum;
+ m->quorum_features = cluster_features;
+ m->mon_features = mon_features;
+ m->sharing_bl = mon->get_local_commands_bl(mon_features);
+ m->mon_release = min_mon_release;
+ mon->send_mon_message(m, *p);
+ }
+
+ // tell monitor
+ mon->win_election(get_epoch(), quorum,
+ cluster_features, mon_features, min_mon_release,
+ metadata);
+}
+
+
+void Elector::handle_propose(MonOpRequestRef op)
+{
+ op->mark_event("elector:handle_propose");
+ auto m = op->get_req<MMonElection>();
+ dout(5) << "handle_propose from " << m->get_source() << dendl;
+ int from = m->get_source().num();
+
+ ceph_assert(m->epoch % 2 == 1); // election
+ uint64_t required_features = mon->get_required_features();
+ mon_feature_t required_mon_features = mon->get_required_mon_features();
+
+ dout(10) << __func__ << " required features " << required_features
+ << " " << required_mon_features
+ << ", peer features " << m->get_connection()->get_features()
+ << " " << m->mon_features
+ << dendl;
+
+ if ((required_features ^ m->get_connection()->get_features()) &
+ required_features) {
+ dout(5) << " ignoring propose from mon" << from
+ << " without required features" << dendl;
+ nak_old_peer(op);
+ return;
+ } else if (mon->monmap->min_mon_release > m->mon_release) {
+ dout(5) << " ignoring propose from mon" << from
+ << " release " << (int)m->mon_release
+ << " < min_mon_release " << (int)mon->monmap->min_mon_release
+ << dendl;
+ nak_old_peer(op);
+ return;
+ } else if (!m->mon_features.contains_all(required_mon_features)) {
+ // all the features in 'required_mon_features' not in 'm->mon_features'
+ mon_feature_t missing = required_mon_features.diff(m->mon_features);
+ dout(5) << " ignoring propose from mon." << from
+ << " without required mon_features " << missing
+ << dendl;
+ nak_old_peer(op);
+ }
+ ConnectionTracker *oct = NULL;
+ if (m->sharing_bl.length()) {
+ oct = new ConnectionTracker(m->sharing_bl, mon->cct);
+ }
+ logic.receive_propose(from, m->epoch, oct);
+ delete oct;
+}
+
+void Elector::handle_ack(MonOpRequestRef op)
+{
+ op->mark_event("elector:handle_ack");
+ auto m = op->get_req<MMonElection>();
+ dout(5) << "handle_ack from " << m->get_source() << dendl;
+ int from = m->get_source().num();
+
+ ceph_assert(m->epoch == get_epoch());
+ uint64_t required_features = mon->get_required_features();
+ if ((required_features ^ m->get_connection()->get_features()) &
+ required_features) {
+ dout(5) << " ignoring ack from mon" << from
+ << " without required features" << dendl;
+ return;
+ }
+
+ mon_feature_t required_mon_features = mon->get_required_mon_features();
+ if (!m->mon_features.contains_all(required_mon_features)) {
+ mon_feature_t missing = required_mon_features.diff(m->mon_features);
+ dout(5) << " ignoring ack from mon." << from
+ << " without required mon_features " << missing
+ << dendl;
+ return;
+ }
+
+ if (logic.electing_me) {
+ // thanks
+ peer_info[from].cluster_features = m->get_connection()->get_features();
+ peer_info[from].mon_features = m->mon_features;
+ peer_info[from].mon_release = m->mon_release;
+ peer_info[from].metadata = m->metadata;
+ dout(5) << " so far i have {";
+ for (auto q = logic.acked_me.begin();
+ q != logic.acked_me.end();
+ ++q) {
+ auto p = peer_info.find(*q);
+ ceph_assert(p != peer_info.end());
+ if (q != logic.acked_me.begin())
+ *_dout << ",";
+ *_dout << " mon." << p->first << ":"
+ << " features " << p->second.cluster_features
+ << " " << p->second.mon_features;
+ }
+ *_dout << " }" << dendl;
+ }
+
+ logic.receive_ack(from, m->epoch);
+}
+
+void Elector::handle_victory(MonOpRequestRef op)
+{
+ op->mark_event("elector:handle_victory");
+ auto m = op->get_req<MMonElection>();
+ dout(5) << "handle_victory from " << m->get_source()
+ << " quorum_features " << m->quorum_features
+ << " " << m->mon_features
+ << dendl;
+ int from = m->get_source().num();
+
+ bool accept_victory = logic.receive_victory_claim(from, m->epoch);
+
+ if (!accept_victory) {
+ return;
+ }
+
+ mon->lose_election(get_epoch(), m->quorum, from,
+ m->quorum_features, m->mon_features, m->mon_release);
+
+ // cancel my timer
+ cancel_timer();
+
+ // stash leader's commands
+ ceph_assert(m->sharing_bl.length());
+ vector<MonCommand> new_cmds;
+ auto bi = m->sharing_bl.cbegin();
+ MonCommand::decode_vector(new_cmds, bi);
+ mon->set_leader_commands(new_cmds);
+}
+
+void Elector::nak_old_peer(MonOpRequestRef op)
+{
+ op->mark_event("elector:nak_old_peer");
+ auto m = op->get_req<MMonElection>();
+ uint64_t supported_features = m->get_connection()->get_features();
+ uint64_t required_features = mon->get_required_features();
+ mon_feature_t required_mon_features = mon->get_required_mon_features();
+ dout(10) << "sending nak to peer " << m->get_source()
+ << " supports " << supported_features << " " << m->mon_features
+ << ", required " << required_features << " " << required_mon_features
+ << ", release " << (int)m->mon_release
+ << " vs required " << (int)mon->monmap->min_mon_release
+ << dendl;
+ MMonElection *reply = new MMonElection(MMonElection::OP_NAK, m->epoch,
+ peer_tracker.get_encoded_bl(),
+ logic.strategy, mon->monmap);
+ reply->quorum_features = required_features;
+ reply->mon_features = required_mon_features;
+ reply->mon_release = mon->monmap->min_mon_release;
+ mon->features.encode(reply->sharing_bl);
+ m->get_connection()->send_message(reply);
+}
+
+void Elector::handle_nak(MonOpRequestRef op)
+{
+ op->mark_event("elector:handle_nak");
+ auto m = op->get_req<MMonElection>();
+ dout(1) << "handle_nak from " << m->get_source()
+ << " quorum_features " << m->quorum_features
+ << " " << m->mon_features
+ << " min_mon_release " << (int)m->mon_release
+ << dendl;
+
+ if (m->mon_release > ceph_release()) {
+ derr << "Shutting down because I am release " << (int)ceph_release()
+ << " < min_mon_release " << (int)m->mon_release << dendl;
+ } else {
+ CompatSet other;
+ auto bi = m->sharing_bl.cbegin();
+ other.decode(bi);
+ CompatSet diff = Monitor::get_supported_features().unsupported(other);
+
+ mon_feature_t mon_supported = ceph::features::mon::get_supported();
+ // all features in 'm->mon_features' not in 'mon_supported'
+ mon_feature_t mon_diff = m->mon_features.diff(mon_supported);
+
+ derr << "Shutting down because I lack required monitor features: { "
+ << diff << " } " << mon_diff << dendl;
+ }
+ exit(0);
+ // the end!
+}
+
+void Elector::begin_peer_ping(int peer)
+{
+ dout(20) << __func__ << " against " << peer << dendl;
+ if (live_pinging.count(peer)) {
+ dout(20) << peer << " already in live_pinging ... return " << dendl;
+ return;
+ }
+
+ if (!mon->get_quorum_mon_features().contains_all(
+ ceph::features::mon::FEATURE_PINGING)) {
+ return;
+ }
+
+ peer_tracker.report_live_connection(peer, 0); // init this peer as existing
+ live_pinging.insert(peer);
+ dead_pinging.erase(peer);
+ peer_acked_ping[peer] = ceph_clock_now();
+ if (!send_peer_ping(peer)) return;
+ mon->timer.add_event_after(ping_timeout / PING_DIVISOR,
+ new C_MonContext{mon, [this, peer](int) {
+ ping_check(peer);
+ }});
+}
+
+bool Elector::send_peer_ping(int peer, const utime_t *n)
+{
+ dout(10) << __func__ << " to peer " << peer << dendl;
+ if (peer >= mon->monmap->ranks.size()) {
+ // Monitor no longer exists in the monmap,
+ // therefore, we shouldn't ping this monitor
+ // since we cannot lookup the address!
+ dout(5) << "peer: " << peer << " >= ranks_size: "
+ << mon->monmap->ranks.size() << " ... dropping to prevent "
+ << "https://tracker.ceph.com/issues/50089" << dendl;
+ live_pinging.erase(peer);
+ return false;
+ }
+ utime_t now;
+ if (n != NULL) {
+ now = *n;
+ } else {
+ now = ceph_clock_now();
+ }
+ MMonPing *ping = new MMonPing(MMonPing::PING, now, peer_tracker.get_encoded_bl());
+ mon->messenger->send_to_mon(ping, mon->monmap->get_addrs(peer));
+ peer_sent_ping[peer] = now;
+ return true;
+}
+
+void Elector::ping_check(int peer)
+{
+ dout(20) << __func__ << " to peer " << peer << dendl;
+
+ if (!live_pinging.count(peer) &&
+ !dead_pinging.count(peer)) {
+ dout(20) << __func__ << peer << " is no longer marked for pinging" << dendl;
+ return;
+ }
+ utime_t now = ceph_clock_now();
+ utime_t& acked_ping = peer_acked_ping[peer];
+ utime_t& newest_ping = peer_sent_ping[peer];
+ if (!acked_ping.is_zero() && acked_ping < now - ping_timeout) {
+ peer_tracker.report_dead_connection(peer, now - acked_ping);
+ acked_ping = now;
+ begin_dead_ping(peer);
+ return;
+ }
+
+ if (acked_ping == newest_ping) {
+ if (!send_peer_ping(peer, &now)) return;
+ }
+
+ mon->timer.add_event_after(ping_timeout / PING_DIVISOR,
+ new C_MonContext{mon, [this, peer](int) {
+ ping_check(peer);
+ }});
+}
+
+void Elector::begin_dead_ping(int peer)
+{
+ dout(20) << __func__ << " to peer " << peer << dendl;
+ if (dead_pinging.count(peer)) {
+ return;
+ }
+
+ live_pinging.erase(peer);
+ dead_pinging.insert(peer);
+ mon->timer.add_event_after(ping_timeout,
+ new C_MonContext{mon, [this, peer](int) {
+ dead_ping(peer);
+ }});
+}
+
+void Elector::dead_ping(int peer)
+{
+ dout(20) << __func__ << " to peer " << peer << dendl;
+ if (!dead_pinging.count(peer)) {
+ dout(20) << __func__ << peer << " is no longer marked for dead pinging" << dendl;
+ return;
+ }
+ ceph_assert(!live_pinging.count(peer));
+
+ utime_t now = ceph_clock_now();
+ utime_t& acked_ping = peer_acked_ping[peer];
+
+ peer_tracker.report_dead_connection(peer, now - acked_ping);
+ acked_ping = now;
+ mon->timer.add_event_after(ping_timeout,
+ new C_MonContext{mon, [this, peer](int) {
+ dead_ping(peer);
+ }});
+}
+
+void Elector::handle_ping(MonOpRequestRef op)
+{
+ MMonPing *m = static_cast<MMonPing*>(op->get_req());
+ int prank = mon->monmap->get_rank(m->get_source_addr());
+ dout(20) << __func__ << " from: " << prank << dendl;
+ begin_peer_ping(prank);
+ assimilate_connection_reports(m->tracker_bl);
+ switch(m->op) {
+ case MMonPing::PING:
+ {
+ MMonPing *reply = new MMonPing(MMonPing::PING_REPLY, m->stamp, peer_tracker.get_encoded_bl());
+ m->get_connection()->send_message(reply);
+ }
+ break;
+
+ case MMonPing::PING_REPLY:
+
+ const utime_t& previous_acked = peer_acked_ping[prank];
+ const utime_t& newest = peer_sent_ping[prank];
+
+ if (m->stamp > newest && !newest.is_zero()) {
+ derr << "dropping PING_REPLY stamp " << m->stamp
+ << " as it is newer than newest sent " << newest << dendl;
+ return;
+ }
+
+ if (m->stamp > previous_acked) {
+ dout(20) << "m->stamp > previous_acked" << dendl;
+ peer_tracker.report_live_connection(prank, m->stamp - previous_acked);
+ peer_acked_ping[prank] = m->stamp;
+ } else{
+ dout(20) << "m->stamp <= previous_acked .. we don't report_live_connection" << dendl;
+ }
+ utime_t now = ceph_clock_now();
+ dout(30) << "now: " << now << " m->stamp: " << m->stamp << " ping_timeout: "
+ << ping_timeout << " PING_DIVISOR: " << PING_DIVISOR << dendl;
+ if (now - m->stamp > ping_timeout / PING_DIVISOR) {
+ if (!send_peer_ping(prank, &now)) return;
+ }
+ break;
+ }
+}
+
+void Elector::dispatch(MonOpRequestRef op)
+{
+ op->mark_event("elector:dispatch");
+ ceph_assert(op->is_type_election_or_ping());
+
+ switch (op->get_req()->get_type()) {
+
+ case MSG_MON_ELECTION:
+ {
+ if (!logic.participating) {
+ return;
+ }
+ if (op->get_req()->get_source().num() >= mon->monmap->size()) {
+ dout(5) << " ignoring bogus election message with bad mon rank "
+ << op->get_req()->get_source() << dendl;
+ return;
+ }
+
+ auto em = op->get_req<MMonElection>();
+ dout(20) << __func__ << " from: " << mon->monmap->get_rank(em->get_source_addr()) << dendl;
+ // assume an old message encoding would have matched
+ if (em->fsid != mon->monmap->fsid) {
+ dout(0) << " ignoring election msg fsid "
+ << em->fsid << " != " << mon->monmap->fsid << dendl;
+ return;
+ }
+
+ if (!mon->monmap->contains(em->get_source_addr())) {
+ dout(1) << "discarding election message: " << em->get_source_addr()
+ << " not in my monmap " << *mon->monmap << dendl;
+ return;
+ }
+
+ MonMap peermap;
+ peermap.decode(em->monmap_bl);
+ if (peermap.epoch > mon->monmap->epoch) {
+ dout(0) << em->get_source_inst() << " has newer monmap epoch " << peermap.epoch
+ << " > my epoch " << mon->monmap->epoch
+ << ", taking it"
+ << dendl;
+ mon->monmap->decode(em->monmap_bl);
+ auto t(std::make_shared<MonitorDBStore::Transaction>());
+ t->put("monmap", mon->monmap->epoch, em->monmap_bl);
+ t->put("monmap", "last_committed", mon->monmap->epoch);
+ mon->store->apply_transaction(t);
+ //mon->monmon()->paxos->stash_latest(mon->monmap->epoch, em->monmap_bl);
+ cancel_timer();
+ mon->notify_new_monmap(false);
+ mon->bootstrap();
+ return;
+ }
+ if (peermap.epoch < mon->monmap->epoch) {
+ dout(0) << em->get_source_inst() << " has older monmap epoch " << peermap.epoch
+ << " < my epoch " << mon->monmap->epoch
+ << dendl;
+ }
+
+ if (em->strategy != logic.strategy) {
+ dout(5) << __func__ << " somehow got an Election message with different strategy "
+ << em->strategy << " from local " << logic.strategy
+ << "; dropping for now to let race resolve" << dendl;
+ return;
+ }
+
+ if (em->scoring_bl.length()) {
+ assimilate_connection_reports(em->scoring_bl);
+ }
+
+ begin_peer_ping(mon->monmap->get_rank(em->get_source_addr()));
+ switch (em->op) {
+ case MMonElection::OP_PROPOSE:
+ handle_propose(op);
+ return;
+ }
+
+ if (em->epoch < get_epoch()) {
+ dout(5) << "old epoch, dropping" << dendl;
+ break;
+ }
+
+ switch (em->op) {
+ case MMonElection::OP_ACK:
+ handle_ack(op);
+ return;
+ case MMonElection::OP_VICTORY:
+ handle_victory(op);
+ return;
+ case MMonElection::OP_NAK:
+ handle_nak(op);
+ return;
+ default:
+ ceph_abort();
+ }
+ }
+ break;
+
+ case MSG_MON_PING:
+ handle_ping(op);
+ break;
+
+ default:
+ ceph_abort();
+ }
+}
+
+void Elector::start_participating()
+{
+ logic.participating = true;
+}
+
+bool Elector::peer_tracker_is_clean()
+{
+ return peer_tracker.is_clean(mon->rank, paxos_size());
+}
+
+void Elector::notify_clear_peer_state()
+{
+ dout(10) << __func__ << dendl;
+ dout(20) << " peer_tracker before: " << peer_tracker << dendl;
+ peer_tracker.notify_reset();
+ peer_tracker.set_rank(mon->rank);
+ dout(20) << " peer_tracker after: " << peer_tracker << dendl;
+}
+
+void Elector::notify_rank_changed(int new_rank)
+{
+ dout(10) << __func__ << " to " << new_rank << dendl;
+ peer_tracker.notify_rank_changed(new_rank);
+ live_pinging.erase(new_rank);
+ dead_pinging.erase(new_rank);
+}
+
+void Elector::notify_rank_removed(int rank_removed, int new_rank)
+{
+ dout(10) << __func__ << ": " << rank_removed << dendl;
+ peer_tracker.notify_rank_removed(rank_removed, new_rank);
+ /* we have to clean up the pinging state, which is annoying
+ because it's not indexed anywhere (and adding indexing
+ would also be annoying).
+ In the case where we are removing any rank that is not the
+ higest, we start with the removed rank and examine the state
+ of the surrounding ranks.
+ Everybody who remains with larger rank gets a new rank one lower
+ than before, and we have to figure out the remaining scheduled
+ ping contexts. So, starting one past with the removed rank, we:
+ * check if the current rank is alive or dead
+ * examine our new rank (one less than before, initially the removed
+ rank)
+ * * erase it if it's in the wrong set
+ * * start pinging it if we're not already
+ * check if the next rank is in the same pinging set, and delete
+ * ourselves if not.
+ In the case where we are removing the highest rank,
+ we erase the removed rank from all sets.
+ */
+ if (rank_removed < paxos_size()) {
+ for (unsigned i = rank_removed + 1; i <= paxos_size() ; ++i) {
+ if (live_pinging.count(i)) {
+ dead_pinging.erase(i-1);
+ if (!live_pinging.count(i-1)) {
+ begin_peer_ping(i-1);
+ }
+ if (!live_pinging.count(i+1)) {
+ live_pinging.erase(i);
+ }
+ }
+ else if (dead_pinging.count(i)) {
+ live_pinging.erase(i-1);
+ if (!dead_pinging.count(i-1)) {
+ begin_dead_ping(i-1);
+ }
+ if (!dead_pinging.count(i+1)) {
+ dead_pinging.erase(i);
+ }
+ } else {
+ // we aren't pinging rank i at all
+ if (i-1 == (unsigned)rank_removed) {
+ // so we special case to make sure we
+ // actually nuke the removed rank
+ dead_pinging.erase(rank_removed);
+ live_pinging.erase(rank_removed);
+ }
+ }
+ }
+ } else {
+ if (live_pinging.count(rank_removed)) {
+ live_pinging.erase(rank_removed);
+ }
+ if (dead_pinging.count(rank_removed)) {
+ dead_pinging.erase(rank_removed);
+ }
+ }
+}
+
+void Elector::notify_strategy_maybe_changed(int strategy)
+{
+ logic.set_election_strategy(static_cast<ElectionLogic::election_strategy>(strategy));
+}