summaryrefslogtreecommitdiffstats
path: root/src/mon/MgrStatMonitor.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/mon/MgrStatMonitor.cc
parentInitial commit. (diff)
downloadceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz
ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/mon/MgrStatMonitor.cc')
-rw-r--r--src/mon/MgrStatMonitor.cc367
1 files changed, 367 insertions, 0 deletions
diff --git a/src/mon/MgrStatMonitor.cc b/src/mon/MgrStatMonitor.cc
new file mode 100644
index 000000000..9da4c50da
--- /dev/null
+++ b/src/mon/MgrStatMonitor.cc
@@ -0,0 +1,367 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "MgrStatMonitor.h"
+#include "mon/OSDMonitor.h"
+#include "mon/MgrMonitor.h"
+#include "mon/PGMap.h"
+#include "messages/MGetPoolStats.h"
+#include "messages/MGetPoolStatsReply.h"
+#include "messages/MMonMgrReport.h"
+#include "messages/MStatfs.h"
+#include "messages/MStatfsReply.h"
+#include "messages/MServiceMap.h"
+
+#include "include/ceph_assert.h" // re-clobber assert
+
+#define dout_subsys ceph_subsys_mon
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, mon)
+
+using std::dec;
+using std::hex;
+using std::list;
+using std::map;
+using std::make_pair;
+using std::ostream;
+using std::ostringstream;
+using std::pair;
+using std::set;
+using std::string;
+using std::stringstream;
+using std::to_string;
+using std::vector;
+
+using ceph::bufferlist;
+using ceph::decode;
+using ceph::encode;
+using ceph::ErasureCodeInterfaceRef;
+using ceph::ErasureCodeProfile;
+using ceph::Formatter;
+using ceph::JSONFormatter;
+using ceph::make_message;
+using ceph::mono_clock;
+using ceph::mono_time;
+
+static ostream& _prefix(std::ostream *_dout, Monitor &mon) {
+ return *_dout << "mon." << mon.name << "@" << mon.rank
+ << "(" << mon.get_state_name()
+ << ").mgrstat ";
+}
+
+MgrStatMonitor::MgrStatMonitor(Monitor &mn, Paxos &p, const string& service_name)
+ : PaxosService(mn, p, service_name)
+{
+}
+
+MgrStatMonitor::~MgrStatMonitor() = default;
+
+void MgrStatMonitor::create_initial()
+{
+ dout(10) << __func__ << dendl;
+ version = 0;
+ service_map.epoch = 1;
+ service_map.modified = ceph_clock_now();
+ pending_service_map_bl.clear();
+ encode(service_map, pending_service_map_bl, CEPH_FEATURES_ALL);
+}
+
+void MgrStatMonitor::update_from_paxos(bool *need_bootstrap)
+{
+ version = get_last_committed();
+ dout(10) << " " << version << dendl;
+ load_health();
+ bufferlist bl;
+ get_version(version, bl);
+ if (version) {
+ ceph_assert(bl.length());
+ try {
+ auto p = bl.cbegin();
+ decode(digest, p);
+ decode(service_map, p);
+ if (!p.end()) {
+ decode(progress_events, p);
+ }
+ dout(10) << __func__ << " v" << version
+ << " service_map e" << service_map.epoch
+ << " " << progress_events.size() << " progress events"
+ << dendl;
+ }
+ catch (ceph::buffer::error& e) {
+ derr << "failed to decode mgrstat state; luminous dev version? "
+ << e.what() << dendl;
+ }
+ }
+ check_subs();
+ update_logger();
+ mon.osdmon()->notify_new_pg_digest();
+}
+
+void MgrStatMonitor::update_logger()
+{
+ dout(20) << __func__ << dendl;
+
+ mon.cluster_logger->set(l_cluster_osd_bytes, digest.osd_sum.statfs.total);
+ mon.cluster_logger->set(l_cluster_osd_bytes_used,
+ digest.osd_sum.statfs.get_used_raw());
+ mon.cluster_logger->set(l_cluster_osd_bytes_avail,
+ digest.osd_sum.statfs.available);
+
+ mon.cluster_logger->set(l_cluster_num_pool, digest.pg_pool_sum.size());
+ uint64_t num_pg = 0;
+ for (auto i : digest.num_pg_by_pool) {
+ num_pg += i.second;
+ }
+ mon.cluster_logger->set(l_cluster_num_pg, num_pg);
+
+ unsigned active = 0, active_clean = 0, peering = 0;
+ for (auto p = digest.num_pg_by_state.begin();
+ p != digest.num_pg_by_state.end();
+ ++p) {
+ if (p->first & PG_STATE_ACTIVE) {
+ active += p->second;
+ if (p->first & PG_STATE_CLEAN)
+ active_clean += p->second;
+ }
+ if (p->first & PG_STATE_PEERING)
+ peering += p->second;
+ }
+ mon.cluster_logger->set(l_cluster_num_pg_active_clean, active_clean);
+ mon.cluster_logger->set(l_cluster_num_pg_active, active);
+ mon.cluster_logger->set(l_cluster_num_pg_peering, peering);
+
+ mon.cluster_logger->set(l_cluster_num_object, digest.pg_sum.stats.sum.num_objects);
+ mon.cluster_logger->set(l_cluster_num_object_degraded, digest.pg_sum.stats.sum.num_objects_degraded);
+ mon.cluster_logger->set(l_cluster_num_object_misplaced, digest.pg_sum.stats.sum.num_objects_misplaced);
+ mon.cluster_logger->set(l_cluster_num_object_unfound, digest.pg_sum.stats.sum.num_objects_unfound);
+ mon.cluster_logger->set(l_cluster_num_bytes, digest.pg_sum.stats.sum.num_bytes);
+
+}
+
+void MgrStatMonitor::create_pending()
+{
+ dout(10) << " " << version << dendl;
+ pending_digest = digest;
+ pending_health_checks = get_health_checks();
+ pending_service_map_bl.clear();
+ encode(service_map, pending_service_map_bl, mon.get_quorum_con_features());
+}
+
+void MgrStatMonitor::encode_pending(MonitorDBStore::TransactionRef t)
+{
+ ++version;
+ dout(10) << " " << version << dendl;
+ bufferlist bl;
+ encode(pending_digest, bl, mon.get_quorum_con_features());
+ ceph_assert(pending_service_map_bl.length());
+ bl.append(pending_service_map_bl);
+ encode(pending_progress_events, bl);
+ put_version(t, version, bl);
+ put_last_committed(t, version);
+
+ encode_health(pending_health_checks, t);
+}
+
+version_t MgrStatMonitor::get_trim_to() const
+{
+ // we don't actually need *any* old states, but keep a few.
+ if (version > 5) {
+ return version - 5;
+ }
+ return 0;
+}
+
+void MgrStatMonitor::on_active()
+{
+ update_logger();
+}
+
+void MgrStatMonitor::tick()
+{
+}
+
+bool MgrStatMonitor::preprocess_query(MonOpRequestRef op)
+{
+ auto m = op->get_req<PaxosServiceMessage>();
+ switch (m->get_type()) {
+ case CEPH_MSG_STATFS:
+ return preprocess_statfs(op);
+ case MSG_MON_MGR_REPORT:
+ return preprocess_report(op);
+ case MSG_GETPOOLSTATS:
+ return preprocess_getpoolstats(op);
+ default:
+ mon.no_reply(op);
+ derr << "Unhandled message type " << m->get_type() << dendl;
+ return true;
+ }
+}
+
+bool MgrStatMonitor::prepare_update(MonOpRequestRef op)
+{
+ auto m = op->get_req<PaxosServiceMessage>();
+ switch (m->get_type()) {
+ case MSG_MON_MGR_REPORT:
+ return prepare_report(op);
+ default:
+ mon.no_reply(op);
+ derr << "Unhandled message type " << m->get_type() << dendl;
+ return true;
+ }
+}
+
+bool MgrStatMonitor::preprocess_report(MonOpRequestRef op)
+{
+ auto m = op->get_req<MMonMgrReport>();
+ mon.no_reply(op);
+ if (m->gid &&
+ m->gid != mon.mgrmon()->get_map().get_active_gid()) {
+ dout(10) << "ignoring report from non-active mgr " << m->gid
+ << dendl;
+ return true;
+ }
+ return false;
+}
+
+bool MgrStatMonitor::prepare_report(MonOpRequestRef op)
+{
+ auto m = op->get_req<MMonMgrReport>();
+ bufferlist bl = m->get_data();
+ auto p = bl.cbegin();
+ decode(pending_digest, p);
+ pending_health_checks.swap(m->health_checks);
+ if (m->service_map_bl.length()) {
+ pending_service_map_bl.swap(m->service_map_bl);
+ }
+ pending_progress_events.swap(m->progress_events);
+ dout(10) << __func__ << " " << pending_digest << ", "
+ << pending_health_checks.checks.size() << " health checks, "
+ << progress_events.size() << " progress events" << dendl;
+ dout(20) << "pending_digest:\n";
+ JSONFormatter jf(true);
+ jf.open_object_section("pending_digest");
+ pending_digest.dump(&jf);
+ jf.close_section();
+ jf.flush(*_dout);
+ *_dout << dendl;
+ dout(20) << "health checks:\n";
+ JSONFormatter jf(true);
+ jf.open_object_section("health_checks");
+ pending_health_checks.dump(&jf);
+ jf.close_section();
+ jf.flush(*_dout);
+ *_dout << dendl;
+ dout(20) << "progress events:\n";
+ JSONFormatter jf(true);
+ jf.open_object_section("progress_events");
+ for (auto& i : pending_progress_events) {
+ jf.dump_object(i.first.c_str(), i.second);
+ }
+ jf.close_section();
+ jf.flush(*_dout);
+ *_dout << dendl;
+ return true;
+}
+
+bool MgrStatMonitor::preprocess_getpoolstats(MonOpRequestRef op)
+{
+ op->mark_pgmon_event(__func__);
+ auto m = op->get_req<MGetPoolStats>();
+ auto session = op->get_session();
+ if (!session)
+ return true;
+ if (!session->is_capable("pg", MON_CAP_R)) {
+ dout(0) << "MGetPoolStats received from entity with insufficient caps "
+ << session->caps << dendl;
+ return true;
+ }
+ if (m->fsid != mon.monmap->fsid) {
+ dout(0) << __func__ << " on fsid "
+ << m->fsid << " != " << mon.monmap->fsid << dendl;
+ return true;
+ }
+ epoch_t ver = get_last_committed();
+ auto reply = new MGetPoolStatsReply(m->fsid, m->get_tid(), ver);
+ reply->per_pool = digest.use_per_pool_stats();
+ for (const auto& pool_name : m->pools) {
+ const auto pool_id = mon.osdmon()->osdmap.lookup_pg_pool_name(pool_name);
+ if (pool_id == -ENOENT)
+ continue;
+ auto pool_stat = get_pool_stat(pool_id);
+ if (!pool_stat)
+ continue;
+ reply->pool_stats[pool_name] = *pool_stat;
+ }
+ mon.send_reply(op, reply);
+ return true;
+}
+
+bool MgrStatMonitor::preprocess_statfs(MonOpRequestRef op)
+{
+ op->mark_pgmon_event(__func__);
+ auto statfs = op->get_req<MStatfs>();
+ auto session = op->get_session();
+
+ if (!session)
+ return true;
+ if (!session->is_capable("pg", MON_CAP_R)) {
+ dout(0) << "MStatfs received from entity with insufficient privileges "
+ << session->caps << dendl;
+ return true;
+ }
+ if (statfs->fsid != mon.monmap->fsid) {
+ dout(0) << __func__ << " on fsid " << statfs->fsid
+ << " != " << mon.monmap->fsid << dendl;
+ return true;
+ }
+ const auto& pool = statfs->data_pool;
+ if (pool && !mon.osdmon()->osdmap.have_pg_pool(*pool)) {
+ // There's no error field for MStatfsReply so just ignore the request.
+ // This is known to happen when a client is still accessing a removed fs.
+ dout(1) << __func__ << " on removed pool " << *pool << dendl;
+ return true;
+ }
+ dout(10) << __func__ << " " << *statfs
+ << " from " << statfs->get_orig_source() << dendl;
+ epoch_t ver = get_last_committed();
+ auto reply = new MStatfsReply(statfs->fsid, statfs->get_tid(), ver);
+ reply->h.st = get_statfs(mon.osdmon()->osdmap, pool);
+ mon.send_reply(op, reply);
+ return true;
+}
+
+void MgrStatMonitor::check_sub(Subscription *sub)
+{
+ dout(10) << __func__
+ << " next " << sub->next
+ << " vs service_map.epoch " << service_map.epoch << dendl;
+ if (sub->next <= service_map.epoch) {
+ auto m = new MServiceMap(service_map);
+ sub->session->con->send_message(m);
+ if (sub->onetime) {
+ mon.with_session_map([sub](MonSessionMap& session_map) {
+ session_map.remove_sub(sub);
+ });
+ } else {
+ sub->next = service_map.epoch + 1;
+ }
+ }
+}
+
+void MgrStatMonitor::check_subs()
+{
+ dout(10) << __func__ << dendl;
+ if (!service_map.epoch) {
+ return;
+ }
+ auto subs = mon.session_map.subs.find("servicemap");
+ if (subs == mon.session_map.subs.end()) {
+ return;
+ }
+ auto p = subs->second->begin();
+ while (!p.end()) {
+ auto sub = *p;
+ ++p;
+ check_sub(sub);
+ }
+}