summaryrefslogtreecommitdiffstats
path: root/src/mon/ConnectionTracker.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/mon/ConnectionTracker.cc361
1 files changed, 361 insertions, 0 deletions
diff --git a/src/mon/ConnectionTracker.cc b/src/mon/ConnectionTracker.cc
new file mode 100644
index 000000000..272ad40c2
--- /dev/null
+++ b/src/mon/ConnectionTracker.cc
@@ -0,0 +1,361 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "ConnectionTracker.h"
+#include "common/Formatter.h"
+#include "common/dout.h"
+#include "include/ceph_assert.h"
+
+#define dout_subsys ceph_subsys_mon
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, rank, epoch, version)
+
+static std::ostream& _prefix(std::ostream *_dout, int rank, epoch_t epoch, uint64_t version) {
+ return *_dout << "rank: " << rank << " version: "<< version << " ConnectionTracker(" << epoch << ") ";
+}
+
+std::ostream& operator<<(std::ostream&o, const ConnectionReport& c) {
+ o << "rank=" << c.rank << ",epoch=" << c.epoch << ",version=" << c.epoch_version
+ << ", current links: " << c.current << ", history: " << c.history;
+ return o;
+}
+
+std::ostream& operator<<(std::ostream& o, const ConnectionTracker& c) {
+ o << "rank=" << c.rank << ", epoch=" << c.epoch << ", version=" << c.version
+ << ", half_life=" << c.half_life << ", reports: " << c.peer_reports;
+ return o;
+}
+
+ConnectionReport *ConnectionTracker::reports(int p)
+{
+ auto i = peer_reports.find(p);
+ if (i == peer_reports.end()) {
+ ceph_assert(p != rank);
+ auto[j,k] = peer_reports.insert(std::pair<int,ConnectionReport>(p,ConnectionReport()));
+ i = j;
+ }
+ return &i->second;
+}
+
+const ConnectionReport *ConnectionTracker::reports(int p) const
+{
+ auto i = peer_reports.find(p);
+ if (i == peer_reports.end()) {
+ return NULL;
+ }
+ return &i->second;
+}
+
+void ConnectionTracker::receive_peer_report(const ConnectionTracker& o)
+{
+ ldout(cct, 30) << __func__ << dendl;
+ for (auto& i : o.peer_reports) {
+ const ConnectionReport& report = i.second;
+ if (i.first == rank) continue;
+ ConnectionReport& existing = *reports(i.first);
+ if (report.epoch > existing.epoch ||
+ (report.epoch == existing.epoch &&
+ report.epoch_version > existing.epoch_version)) {
+ ldout(cct, 30) << " new peer_report is more updated" << dendl;
+ ldout(cct, 30) << "existing: " << existing << dendl;
+ ldout(cct, 30) << "new: " << report << dendl;
+ existing = report;
+ }
+ }
+ encoding.clear();
+}
+
+bool ConnectionTracker::increase_epoch(epoch_t e)
+{
+ ldout(cct, 30) << __func__ << " to " << e << dendl;
+ if (e > epoch) {
+ my_reports.epoch_version = version = 0;
+ my_reports.epoch = epoch = e;
+ peer_reports[rank] = my_reports;
+ encoding.clear();
+ return true;
+ }
+ return false;
+}
+
+void ConnectionTracker::increase_version()
+{
+ ldout(cct, 30) << __func__ << " to " << version+1 << dendl;
+ encoding.clear();
+ ++version;
+ my_reports.epoch_version = version;
+ peer_reports[rank] = my_reports;
+ if ((version % persist_interval) == 0 ) {
+ ldout(cct, 30) << version << " % " << persist_interval << " == 0" << dendl;
+ owner->persist_connectivity_scores();
+ }
+}
+
+void ConnectionTracker::report_live_connection(int peer_rank, double units_alive)
+{
+ ldout(cct, 30) << __func__ << " peer_rank: " << peer_rank << " units_alive: " << units_alive << dendl;
+ ldout(cct, 30) << "my_reports before: " << my_reports << dendl;
+ if (peer_rank == rank) {
+ lderr(cct) << "Got a report from my own rank, hopefully this is startup weirdness, dropping" << dendl;
+ return;
+ }
+ // we need to "auto-initialize" to 1, do shenanigans
+ auto i = my_reports.history.find(peer_rank);
+ if (i == my_reports.history.end()) {
+ ldout(cct, 30) << "couldn't find: " << peer_rank
+ << " in my_reports.history" << "... inserting: "
+ << "(" << peer_rank << ", 1" << dendl;
+ auto[j,k] = my_reports.history.insert(std::pair<int,double>(peer_rank,1.0));
+ i = j;
+ }
+ double& pscore = i->second;
+ ldout(cct, 30) << "adding new pscore to my_reports" << dendl;
+ pscore = pscore * (1 - units_alive / (2 * half_life)) +
+ (units_alive / (2 * half_life));
+ pscore = std::min(pscore, 1.0);
+ my_reports.current[peer_rank] = true;
+
+ increase_version();
+ ldout(cct, 30) << "my_reports after: " << my_reports << dendl;
+}
+
+void ConnectionTracker::report_dead_connection(int peer_rank, double units_dead)
+{
+ ldout(cct, 30) << __func__ << " peer_rank: " << peer_rank << " units_dead: " << units_dead << dendl;
+ ldout(cct, 30) << "my_reports before: " << my_reports << dendl;
+ if (peer_rank == rank) {
+ lderr(cct) << "Got a report from my own rank, hopefully this is startup weirdness, dropping" << dendl;
+ return;
+ }
+ // we need to "auto-initialize" to 1, do shenanigans
+ auto i = my_reports.history.find(peer_rank);
+ if (i == my_reports.history.end()) {
+ ldout(cct, 30) << "couldn't find: " << peer_rank
+ << " in my_reports.history" << "... inserting: "
+ << "(" << peer_rank << ", 1" << dendl;
+ auto[j,k] = my_reports.history.insert(std::pair<int,double>(peer_rank,1.0));
+ i = j;
+ }
+ double& pscore = i->second;
+ ldout(cct, 30) << "adding new pscore to my_reports" << dendl;
+ pscore = pscore * (1 - units_dead / (2 * half_life)) -
+ (units_dead / (2*half_life));
+ pscore = std::max(pscore, 0.0);
+ my_reports.current[peer_rank] = false;
+
+ increase_version();
+ ldout(cct, 30) << "my_reports after: " << my_reports << dendl;
+}
+
+void ConnectionTracker::get_total_connection_score(int peer_rank, double *rating,
+ int *live_count) const
+{
+ ldout(cct, 30) << __func__ << dendl;
+ *rating = 0;
+ *live_count = 0;
+ double rate = 0;
+ int live = 0;
+
+ for (const auto& i : peer_reports) { // loop through all the scores
+ if (i.first == peer_rank) { // ... except the ones it has for itself, of course!
+ continue;
+ }
+ const auto& report = i.second;
+ auto score_i = report.history.find(peer_rank);
+ auto live_i = report.current.find(peer_rank);
+ if (score_i != report.history.end()) {
+ if (live_i->second) {
+ rate += score_i->second;
+ ++live;
+ }
+ }
+ }
+ *rating = rate;
+ *live_count = live;
+}
+
+void ConnectionTracker::notify_rank_changed(int new_rank)
+{
+ ldout(cct, 20) << __func__ << " to " << new_rank << dendl;
+ if (new_rank == rank) return;
+ ldout(cct, 20) << "peer_reports before: " << peer_reports << dendl;
+ peer_reports.erase(rank);
+ peer_reports.erase(new_rank);
+ my_reports.rank = new_rank;
+ rank = new_rank;
+ encoding.clear();
+ ldout(cct, 20) << "peer_reports after: " << peer_reports << dendl;
+
+ increase_version();
+}
+
+void ConnectionTracker::notify_rank_removed(int rank_removed, int new_rank)
+{
+ ldout(cct, 20) << __func__ << " " << rank_removed
+ << " new_rank: " << new_rank << dendl;
+ ldout(cct, 20) << "my_reports before: " << my_reports << dendl;
+ ldout(cct, 20) << "peer_reports before: " << peer_reports << dendl;
+ ldout(cct, 20) << "my rank before: " << rank << dendl;
+
+ encoding.clear();
+ size_t starting_size_current = my_reports.current.size();
+ // Lets adjust everything in my report.
+ my_reports.current.erase(rank_removed);
+ my_reports.history.erase(rank_removed);
+ auto ci = my_reports.current.upper_bound(rank_removed);
+ auto hi = my_reports.history.upper_bound(rank_removed);
+ while (ci != my_reports.current.end()) {
+ ceph_assert(ci->first == hi->first);
+ my_reports.current[ci->first - 1] = ci->second;
+ my_reports.history[hi->first - 1] = hi->second;
+ my_reports.current.erase(ci++);
+ my_reports.history.erase(hi++);
+ }
+ ceph_assert((my_reports.current.size() == starting_size_current) ||
+ (my_reports.current.size() + 1 == starting_size_current));
+
+ size_t starting_size = peer_reports.size();
+ auto pi = peer_reports.upper_bound(rank_removed);
+ // Remove the target rank and adjust everything that comes after.
+ // Note that we don't adjust current and history for our peer_reports
+ // because it is better to rely on our peers on that information.
+ peer_reports.erase(rank_removed);
+ while (pi != peer_reports.end()) {
+ peer_reports[pi->first - 1] = pi->second; // copy content of next rank to ourself.
+ peer_reports.erase(pi++); // destroy our next rank and move on.
+ }
+
+ ceph_assert((peer_reports.size() == starting_size) ||
+ (peer_reports.size() + 1 == starting_size));
+
+ if (rank_removed < rank) { // if the rank removed is lower than us, we need to adjust.
+ --rank;
+ my_reports.rank = rank; // also adjust my_reports.rank.
+ }
+
+ ldout(cct, 20) << "my rank after: " << rank << dendl;
+ ldout(cct, 20) << "peer_reports after: " << peer_reports << dendl;
+ ldout(cct, 20) << "my_reports after: " << my_reports << dendl;
+
+ //check if the new_rank from monmap is equal to our adjusted rank.
+ ceph_assert(rank == new_rank);
+
+ increase_version();
+}
+
+bool ConnectionTracker::is_clean(int mon_rank, int monmap_size)
+{
+ ldout(cct, 30) << __func__ << dendl;
+ // check consistency between our rank according
+ // to monmap and our rank according to our report.
+ if (rank != mon_rank ||
+ my_reports.rank != mon_rank) {
+ return false;
+ } else if (!peer_reports.empty()){
+ // if peer_report max rank is greater than monmap max rank
+ // then there is a problem.
+ if (peer_reports.rbegin()->first > monmap_size - 1) return false;
+ }
+ return true;
+}
+
+void ConnectionTracker::encode(bufferlist &bl) const
+{
+ ENCODE_START(1, 1, bl);
+ encode(rank, bl);
+ encode(epoch, bl);
+ encode(version, bl);
+ encode(half_life, bl);
+ encode(peer_reports, bl);
+ ENCODE_FINISH(bl);
+}
+
+void ConnectionTracker::decode(bufferlist::const_iterator& bl) {
+ clear_peer_reports();
+ encoding.clear();
+
+ DECODE_START(1, bl);
+ decode(rank, bl);
+ decode(epoch, bl);
+ decode(version, bl);
+ decode(half_life, bl);
+ decode(peer_reports, bl);
+ DECODE_FINISH(bl);
+ if (rank >=0)
+ my_reports = peer_reports[rank];
+}
+
+const bufferlist& ConnectionTracker::get_encoded_bl()
+{
+ if (!encoding.length()) {
+ encode(encoding);
+ }
+ return encoding;
+}
+
+void ConnectionReport::dump(ceph::Formatter *f) const
+{
+ f->dump_int("rank", rank);
+ f->dump_int("epoch", epoch);
+ f->dump_int("version", epoch_version);
+ f->open_object_section("peer_scores");
+ for (auto i : history) {
+ f->open_object_section("peer");
+ f->dump_int("peer_rank", i.first);
+ f->dump_float("peer_score", i.second);
+ f->dump_bool("peer_alive", current.find(i.first)->second);
+ f->close_section();
+ }
+ f->close_section(); // peer scores
+}
+
+void ConnectionReport::generate_test_instances(std::list<ConnectionReport*>& o)
+{
+ o.push_back(new ConnectionReport);
+ o.push_back(new ConnectionReport);
+ o.back()->rank = 1;
+ o.back()->epoch = 2;
+ o.back()->epoch_version = 3;
+ o.back()->current[0] = true;
+ o.back()->history[0] = .4;
+}
+
+void ConnectionTracker::dump(ceph::Formatter *f) const
+{
+ f->dump_int("rank", rank);
+ f->dump_int("epoch", epoch);
+ f->dump_int("version", version);
+ f->dump_float("half_life", half_life);
+ f->dump_int("persist_interval", persist_interval);
+ f->open_object_section("reports");
+ for (const auto& i : peer_reports) {
+ f->open_object_section("report");
+ i.second.dump(f);
+ f->close_section();
+ }
+ f->close_section(); // reports
+}
+
+void ConnectionTracker::generate_test_instances(std::list<ConnectionTracker*>& o)
+{
+ o.push_back(new ConnectionTracker);
+ o.push_back(new ConnectionTracker);
+ ConnectionTracker *e = o.back();
+ e->rank = 2;
+ e->epoch = 3;
+ e->version = 4;
+ e->peer_reports[0];
+ e->peer_reports[1];
+ e->my_reports = e->peer_reports[2];
+}