summaryrefslogtreecommitdiffstats
path: root/src/mgr/ClusterState.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/mgr/ClusterState.cc
parentInitial commit. (diff)
downloadceph-upstream.tar.xz
ceph-upstream.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--src/mgr/ClusterState.cc384
1 files changed, 384 insertions, 0 deletions
diff --git a/src/mgr/ClusterState.cc b/src/mgr/ClusterState.cc
new file mode 100644
index 000000000..28340d56d
--- /dev/null
+++ b/src/mgr/ClusterState.cc
@@ -0,0 +1,384 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 John Spray <john.spray@inktank.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#include "messages/MMgrDigest.h"
+#include "messages/MMonMgrReport.h"
+#include "messages/MPGStats.h"
+
+#include "mgr/ClusterState.h"
+#include <time.h>
+#include <boost/range/adaptor/reversed.hpp>
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mgr
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr " << __func__ << " "
+
+ClusterState::ClusterState(
+ MonClient *monc_,
+ Objecter *objecter_,
+ const MgrMap& mgrmap)
+ : monc(monc_),
+ objecter(objecter_),
+ mgr_map(mgrmap),
+ asok_hook(NULL)
+{}
+
+void ClusterState::set_objecter(Objecter *objecter_)
+{
+ std::lock_guard l(lock);
+
+ objecter = objecter_;
+}
+
+void ClusterState::set_fsmap(FSMap const &new_fsmap)
+{
+ std::lock_guard l(lock);
+
+ fsmap = new_fsmap;
+}
+
+void ClusterState::set_mgr_map(MgrMap const &new_mgrmap)
+{
+ std::lock_guard l(lock);
+ mgr_map = new_mgrmap;
+}
+
+void ClusterState::set_service_map(ServiceMap const &new_service_map)
+{
+ std::lock_guard l(lock);
+ servicemap = new_service_map;
+}
+
+void ClusterState::load_digest(MMgrDigest *m)
+{
+ std::lock_guard l(lock);
+ health_json = std::move(m->health_json);
+ mon_status_json = std::move(m->mon_status_json);
+}
+
+void ClusterState::ingest_pgstats(ref_t<MPGStats> stats)
+{
+ std::lock_guard l(lock);
+
+ const int from = stats->get_orig_source().num();
+ bool is_in = with_osdmap([from](const OSDMap& osdmap) {
+ return osdmap.is_in(from);
+ });
+
+ if (is_in) {
+ pending_inc.update_stat(from, std::move(stats->osd_stat));
+ } else {
+ osd_stat_t empty_stat;
+ empty_stat.seq = stats->osd_stat.seq;
+ pending_inc.update_stat(from, std::move(empty_stat));
+ }
+
+ for (auto p : stats->pg_stat) {
+ pg_t pgid = p.first;
+ const auto &pg_stats = p.second;
+
+ // In case we're hearing about a PG that according to last
+ // OSDMap update should not exist
+ auto r = existing_pools.find(pgid.pool());
+ if (r == existing_pools.end()) {
+ dout(15) << " got " << pgid
+ << " reported at " << pg_stats.reported_epoch << ":"
+ << pg_stats.reported_seq
+ << " state " << pg_state_string(pg_stats.state)
+ << " but pool not in " << existing_pools
+ << dendl;
+ continue;
+ }
+ if (pgid.ps() >= r->second) {
+ dout(15) << " got " << pgid
+ << " reported at " << pg_stats.reported_epoch << ":"
+ << pg_stats.reported_seq
+ << " state " << pg_state_string(pg_stats.state)
+ << " but > pg_num " << r->second
+ << dendl;
+ continue;
+ }
+ // In case we already heard about more recent stats from this PG
+ // from another OSD
+ const auto q = pg_map.pg_stat.find(pgid);
+ if (q != pg_map.pg_stat.end() &&
+ q->second.get_version_pair() > pg_stats.get_version_pair()) {
+ dout(15) << " had " << pgid << " from "
+ << q->second.reported_epoch << ":"
+ << q->second.reported_seq << dendl;
+ continue;
+ }
+
+ pending_inc.pg_stat_updates[pgid] = pg_stats;
+ }
+ for (auto p : stats->pool_stat) {
+ pending_inc.pool_statfs_updates[std::make_pair(p.first, from)] = p.second;
+ }
+}
+
+void ClusterState::update_delta_stats()
+{
+ pending_inc.stamp = ceph_clock_now();
+ pending_inc.version = pg_map.version + 1; // to make apply_incremental happy
+ dout(10) << " v" << pending_inc.version << dendl;
+
+ dout(30) << " pg_map before:\n";
+ JSONFormatter jf(true);
+ jf.dump_object("pg_map", pg_map);
+ jf.flush(*_dout);
+ *_dout << dendl;
+ dout(30) << " incremental:\n";
+ JSONFormatter jf(true);
+ jf.dump_object("pending_inc", pending_inc);
+ jf.flush(*_dout);
+ *_dout << dendl;
+ pg_map.apply_incremental(g_ceph_context, pending_inc);
+ pending_inc = PGMap::Incremental();
+}
+
+void ClusterState::notify_osdmap(const OSDMap &osd_map)
+{
+ assert(ceph_mutex_is_locked(lock));
+
+ pending_inc.stamp = ceph_clock_now();
+ pending_inc.version = pg_map.version + 1; // to make apply_incremental happy
+ dout(10) << " v" << pending_inc.version << dendl;
+
+ PGMapUpdater::check_osd_map(g_ceph_context, osd_map, pg_map, &pending_inc);
+
+ // update our list of pools that exist, so that we can filter pg_map updates
+ // in synchrony with this OSDMap.
+ existing_pools.clear();
+ for (auto& p : osd_map.get_pools()) {
+ existing_pools[p.first] = p.second.get_pg_num();
+ }
+
+ // brute force this for now (don't bother being clever by only
+ // checking osds that went up/down)
+ set<int> need_check_down_pg_osds;
+ PGMapUpdater::check_down_pgs(osd_map, pg_map, true,
+ need_check_down_pg_osds, &pending_inc);
+
+ dout(30) << " pg_map before:\n";
+ JSONFormatter jf(true);
+ jf.dump_object("pg_map", pg_map);
+ jf.flush(*_dout);
+ *_dout << dendl;
+ dout(30) << " incremental:\n";
+ JSONFormatter jf(true);
+ jf.dump_object("pending_inc", pending_inc);
+ jf.flush(*_dout);
+ *_dout << dendl;
+
+ pg_map.apply_incremental(g_ceph_context, pending_inc);
+ pending_inc = PGMap::Incremental();
+ // TODO: Complete the separation of PG state handling so
+ // that a cut-down set of functionality remains in PGMonitor
+ // while the full-blown PGMap lives only here.
+}
+
+class ClusterSocketHook : public AdminSocketHook {
+ ClusterState *cluster_state;
+public:
+ explicit ClusterSocketHook(ClusterState *o) : cluster_state(o) {}
+ int call(std::string_view admin_command, const cmdmap_t& cmdmap,
+ Formatter *f,
+ std::ostream& errss,
+ bufferlist& out) override {
+ stringstream outss;
+ int r = 0;
+ try {
+ r = cluster_state->asok_command(admin_command, cmdmap, f, outss);
+ out.append(outss);
+ } catch (const TOPNSPC::common::bad_cmd_get& e) {
+ errss << e.what();
+ r = -EINVAL;
+ }
+ return r;
+ }
+};
+
+void ClusterState::final_init()
+{
+ AdminSocket *admin_socket = g_ceph_context->get_admin_socket();
+ asok_hook = new ClusterSocketHook(this);
+ int r = admin_socket->register_command(
+ "dump_osd_network name=value,type=CephInt,req=false", asok_hook,
+ "Dump osd heartbeat network ping times");
+ ceph_assert(r == 0);
+}
+
+void ClusterState::shutdown()
+{
+ // unregister commands
+ g_ceph_context->get_admin_socket()->unregister_commands(asok_hook);
+ delete asok_hook;
+ asok_hook = NULL;
+}
+
+bool ClusterState::asok_command(
+ std::string_view admin_command,
+ const cmdmap_t& cmdmap,
+ Formatter *f,
+ ostream& ss)
+{
+ std::lock_guard l(lock);
+ if (admin_command == "dump_osd_network") {
+ int64_t value = 0;
+ // Default to health warning level if nothing specified
+ if (!(TOPNSPC::common::cmd_getval(cmdmap, "value", value))) {
+ // Convert milliseconds to microseconds
+ value = static_cast<int64_t>(g_ceph_context->_conf.get_val<double>("mon_warn_on_slow_ping_time")) * 1000;
+ if (value == 0) {
+ double ratio = g_conf().get_val<double>("mon_warn_on_slow_ping_ratio");
+ value = g_conf().get_val<int64_t>("osd_heartbeat_grace");
+ value *= 1000000 * ratio; // Seconds of grace to microseconds at ratio
+ }
+ } else {
+ // Convert user input to microseconds
+ value *= 1000;
+ }
+ if (value < 0)
+ value = 0;
+
+ struct mgr_ping_time_t {
+ uint32_t pingtime;
+ int from;
+ int to;
+ bool back;
+ std::array<uint32_t,3> times;
+ std::array<uint32_t,3> min;
+ std::array<uint32_t,3> max;
+ uint32_t last;
+ uint32_t last_update;
+
+ bool operator<(const mgr_ping_time_t& rhs) const {
+ if (pingtime < rhs.pingtime)
+ return true;
+ if (pingtime > rhs.pingtime)
+ return false;
+ if (from < rhs.from)
+ return true;
+ if (from > rhs.from)
+ return false;
+ if (to < rhs.to)
+ return true;
+ if (to > rhs.to)
+ return false;
+ return back;
+ }
+ };
+
+ set<mgr_ping_time_t> sorted;
+ utime_t now = ceph_clock_now();
+ for (auto i : pg_map.osd_stat) {
+ for (auto j : i.second.hb_pingtime) {
+
+ if (j.second.last_update == 0)
+ continue;
+ auto stale_time = g_ceph_context->_conf.get_val<int64_t>("osd_mon_heartbeat_stat_stale");
+ if (now.sec() - j.second.last_update > stale_time) {
+ dout(20) << __func__ << " time out heartbeat for osd " << i.first
+ << " last_update " << j.second.last_update << dendl;
+ continue;
+ }
+ mgr_ping_time_t item;
+ item.pingtime = std::max(j.second.back_pingtime[0], j.second.back_pingtime[1]);
+ item.pingtime = std::max(item.pingtime, j.second.back_pingtime[2]);
+ if (!value || item.pingtime >= value) {
+ item.from = i.first;
+ item.to = j.first;
+ item.times[0] = j.second.back_pingtime[0];
+ item.times[1] = j.second.back_pingtime[1];
+ item.times[2] = j.second.back_pingtime[2];
+ item.min[0] = j.second.back_min[0];
+ item.min[1] = j.second.back_min[1];
+ item.min[2] = j.second.back_min[2];
+ item.max[0] = j.second.back_max[0];
+ item.max[1] = j.second.back_max[1];
+ item.max[2] = j.second.back_max[2];
+ item.last = j.second.back_last;
+ item.back = true;
+ item.last_update = j.second.last_update;
+ sorted.emplace(item);
+ }
+
+ if (j.second.front_last == 0)
+ continue;
+ item.pingtime = std::max(j.second.front_pingtime[0], j.second.front_pingtime[1]);
+ item.pingtime = std::max(item.pingtime, j.second.front_pingtime[2]);
+ if (!value || item.pingtime >= value) {
+ item.from = i.first;
+ item.to = j.first;
+ item.times[0] = j.second.front_pingtime[0];
+ item.times[1] = j.second.front_pingtime[1];
+ item.times[2] = j.second.front_pingtime[2];
+ item.min[0] = j.second.front_min[0];
+ item.min[1] = j.second.front_min[1];
+ item.min[2] = j.second.front_min[2];
+ item.max[0] = j.second.front_max[0];
+ item.max[1] = j.second.front_max[1];
+ item.max[2] = j.second.front_max[2];
+ item.last = j.second.front_last;
+ item.back = false;
+ item.last_update = j.second.last_update;
+ sorted.emplace(item);
+ }
+ }
+ }
+
+ // Network ping times (1min 5min 15min)
+ f->open_object_section("network_ping_times");
+ f->dump_int("threshold", value / 1000);
+ f->open_array_section("entries");
+ for (auto &sitem : boost::adaptors::reverse(sorted)) {
+ ceph_assert(!value || sitem.pingtime >= value);
+
+ f->open_object_section("entry");
+
+ const time_t lu(sitem.last_update);
+ char buffer[26];
+ string lustr(ctime_r(&lu, buffer));
+ lustr.pop_back(); // Remove trailing \n
+ auto stale = g_ceph_context->_conf.get_val<int64_t>("osd_heartbeat_stale");
+ f->dump_string("last update", lustr);
+ f->dump_bool("stale", ceph_clock_now().sec() - sitem.last_update > stale);
+ f->dump_int("from osd", sitem.from);
+ f->dump_int("to osd", sitem.to);
+ f->dump_string("interface", (sitem.back ? "back" : "front"));
+ f->open_object_section("average");
+ f->dump_format_unquoted("1min", "%s", fixed_u_to_string(sitem.times[0],3).c_str());
+ f->dump_format_unquoted("5min", "%s", fixed_u_to_string(sitem.times[1],3).c_str());
+ f->dump_format_unquoted("15min", "%s", fixed_u_to_string(sitem.times[2],3).c_str());
+ f->close_section(); // average
+ f->open_object_section("min");
+ f->dump_format_unquoted("1min", "%s", fixed_u_to_string(sitem.min[0],3).c_str());
+ f->dump_format_unquoted("5min", "%s", fixed_u_to_string(sitem.min[1],3).c_str());
+ f->dump_format_unquoted("15min", "%s", fixed_u_to_string(sitem.min[2],3).c_str());
+ f->close_section(); // min
+ f->open_object_section("max");
+ f->dump_format_unquoted("1min", "%s", fixed_u_to_string(sitem.max[0],3).c_str());
+ f->dump_format_unquoted("5min", "%s", fixed_u_to_string(sitem.max[1],3).c_str());
+ f->dump_format_unquoted("15min", "%s", fixed_u_to_string(sitem.max[2],3).c_str());
+ f->close_section(); // max
+ f->dump_format_unquoted("last", "%s", fixed_u_to_string(sitem.last,3).c_str());
+ f->close_section(); // entry
+ }
+ f->close_section(); // entries
+ f->close_section(); // network_ping_times
+ } else {
+ ceph_abort_msg("broken asok registration");
+ }
+ return true;
+}