diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
commit | 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch) | |
tree | e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/mds/Beacon.cc | |
parent | Initial commit. (diff) | |
download | ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip |
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/mds/Beacon.cc')
-rw-r--r-- | src/mds/Beacon.cc | 484 |
1 files changed, 484 insertions, 0 deletions
diff --git a/src/mds/Beacon.cc b/src/mds/Beacon.cc new file mode 100644 index 00000000..b66550bd --- /dev/null +++ b/src/mds/Beacon.cc @@ -0,0 +1,484 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2012 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#include "common/dout.h" +#include "common/HeartbeatMap.h" + +#include "include/stringify.h" +#include "include/util.h" + +#include "mon/MonClient.h" +#include "mds/MDLog.h" +#include "mds/MDSRank.h" +#include "mds/MDSMap.h" +#include "mds/Locker.h" + +#include "Beacon.h" + +#include <chrono> + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_mds +#undef dout_prefix +#define dout_prefix *_dout << "mds.beacon." << name << ' ' + +using namespace std::chrono_literals; + +Beacon::Beacon(CephContext *cct, MonClient *monc, std::string_view name) + : + Dispatcher(cct), + beacon_interval(g_conf()->mds_beacon_interval), + monc(monc), + name(name) +{ +} + +Beacon::~Beacon() +{ + shutdown(); +} + +void Beacon::shutdown() +{ + std::unique_lock<std::mutex> lock(mutex); + if (!finished) { + finished = true; + lock.unlock(); + if (sender.joinable()) + sender.join(); + } +} + +void Beacon::init(const MDSMap &mdsmap) +{ + std::unique_lock lock(mutex); + + _notify_mdsmap(mdsmap); + + sender = std::thread([this]() { + std::unique_lock<std::mutex> lock(mutex); + std::condition_variable c; // no one wakes us + while (!finished) { + auto now = clock::now(); + auto since = std::chrono::duration<double>(now-last_send).count(); + auto interval = beacon_interval; + if (since >= interval*.90) { + if (!_send()) { + interval = 0.5; /* 500ms */ + } + } else { + interval -= since; + } + dout(20) << "sender thread waiting interval " << interval << "s" << dendl; + c.wait_for(lock, interval*1s); + } + }); +} + +bool Beacon::ms_can_fast_dispatch2(const Message::const_ref& m) const +{ + return m->get_type() == MSG_MDS_BEACON; +} + +void Beacon::ms_fast_dispatch2(const Message::ref& m) +{ + bool handled = ms_dispatch2(m); + ceph_assert(handled); +} + +bool Beacon::ms_dispatch2(const Message::ref& m) +{ + if (m->get_type() == MSG_MDS_BEACON) { + if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MON) { + handle_mds_beacon(MMDSBeacon::msgref_cast(m)); + } + return true; + } + + return false; +} + + +/** + * Update lagginess state based on response from remote MDSMonitor + * + * This function puts the passed message before returning + */ +void Beacon::handle_mds_beacon(const MMDSBeacon::const_ref &m) +{ + std::unique_lock lock(mutex); + + version_t seq = m->get_seq(); + + // update lab + auto it = seq_stamp.find(seq); + if (it != seq_stamp.end()) { + auto now = clock::now(); + + last_acked_stamp = it->second; + auto rtt = std::chrono::duration<double>(now - last_acked_stamp).count(); + + dout(5) << "received beacon reply " << ceph_mds_state_name(m->get_state()) << " seq " << m->get_seq() << " rtt " << rtt << dendl; + + if (laggy && rtt < g_conf()->mds_beacon_grace) { + dout(0) << " MDS is no longer laggy" << dendl; + laggy = false; + last_laggy = now; + } + + // clean up seq_stamp map + seq_stamp.erase(seq_stamp.begin(), ++it); + + // Wake a waiter up if present + cvar.notify_all(); + } else { + dout(1) << "discarding unexpected beacon reply " << ceph_mds_state_name(m->get_state()) + << " seq " << m->get_seq() << " dne" << dendl; + } +} + + +void Beacon::send() +{ + std::unique_lock lock(mutex); + _send(); +} + + +void Beacon::send_and_wait(const double duration) +{ + std::unique_lock lock(mutex); + _send(); + auto awaiting_seq = last_seq; + dout(20) << __func__ << ": awaiting " << awaiting_seq + << " for up to " << duration << "s" << dendl; + + auto start = clock::now(); + while (!seq_stamp.empty() && seq_stamp.begin()->first <= awaiting_seq) { + auto now = clock::now(); + auto s = duration*.95-std::chrono::duration<double>(now-start).count(); + if (s < 0) break; + cvar.wait_for(lock, s*1s); + } +} + + +/** + * Call periodically, or when you have updated the desired state + */ +bool Beacon::_send() +{ + auto now = clock::now(); + auto since = std::chrono::duration<double>(now-last_acked_stamp).count(); + + if (!cct->get_heartbeat_map()->is_healthy()) { + /* If anything isn't progressing, let avoid sending a beacon so that + * the MDS will consider us laggy */ + dout(0) << "Skipping beacon heartbeat to monitors (last acked " << since << "s ago); MDS internal heartbeat is not healthy!" << dendl; + return false; + } + + ++last_seq; + dout(5) << "Sending beacon " << ceph_mds_state_name(want_state) << " seq " << last_seq << dendl; + + seq_stamp[last_seq] = now; + + ceph_assert(want_state != MDSMap::STATE_NULL); + + auto beacon = MMDSBeacon::create( + monc->get_fsid(), mds_gid_t(monc->get_global_id()), + name, + epoch, + want_state, + last_seq, + CEPH_FEATURES_SUPPORTED_DEFAULT); + + beacon->set_health(health); + beacon->set_compat(compat); + // piggyback the sys info on beacon msg + if (want_state == MDSMap::STATE_BOOT) { + map<string, string> sys_info; + collect_sys_info(&sys_info, cct); + sys_info["addr"] = stringify(monc->get_myaddrs()); + beacon->set_sys_info(sys_info); + } + monc->send_mon_message(beacon.detach()); + last_send = now; + return true; +} + +/** + * Call this when there is a new MDSMap available + */ +void Beacon::notify_mdsmap(const MDSMap &mdsmap) +{ + std::unique_lock lock(mutex); + + _notify_mdsmap(mdsmap); +} + +void Beacon::_notify_mdsmap(const MDSMap &mdsmap) +{ + ceph_assert(mdsmap.get_epoch() >= epoch); + + if (mdsmap.get_epoch() != epoch) { + epoch = mdsmap.get_epoch(); + compat = MDSMap::get_compat_set_default(); + compat.merge(mdsmap.compat); + } +} + + +bool Beacon::is_laggy() +{ + std::unique_lock lock(mutex); + + auto now = clock::now(); + auto since = std::chrono::duration<double>(now-last_acked_stamp).count(); + if (since > g_conf()->mds_beacon_grace) { + if (!laggy) { + dout(1) << "MDS connection to Monitors appears to be laggy; " << since + << "s since last acked beacon" << dendl; + } + laggy = true; + return true; + } + return false; +} + +void Beacon::set_want_state(const MDSMap &mdsmap, MDSMap::DaemonState newstate) +{ + std::unique_lock lock(mutex); + + // Update mdsmap epoch atomically with updating want_state, so that when + // we send a beacon with the new want state it has the latest epoch, and + // once we have updated to the latest epoch, we are not sending out + // a stale want_state (i.e. one from before making it through MDSMap + // handling) + _notify_mdsmap(mdsmap); + + if (want_state != newstate) { + dout(5) << __func__ << ": " + << ceph_mds_state_name(want_state) << " -> " + << ceph_mds_state_name(newstate) << dendl; + want_state = newstate; + } +} + + +/** + * We are 'shown' an MDS briefly in order to update + * some health metrics that we will send in the next + * beacon. + */ +void Beacon::notify_health(MDSRank const *mds) +{ + std::unique_lock lock(mutex); + if (!mds) { + // No MDS rank held + return; + } + + // I'm going to touch this MDS, so it must be locked + ceph_assert(mds->mds_lock.is_locked_by_me()); + + health.metrics.clear(); + + // Detect presence of entries in DamageTable + if (!mds->damage_table.empty()) { + MDSHealthMetric m(MDS_HEALTH_DAMAGE, HEALTH_ERR, std::string( + "Metadata damage detected")); + health.metrics.push_back(m); + } + + // Detect MDS_HEALTH_TRIM condition + // Indicates MDS is not trimming promptly + { + if (mds->mdlog->get_num_segments() > (size_t)(g_conf()->mds_log_max_segments * g_conf().get_val<double>("mds_log_warn_factor"))) { + std::ostringstream oss; + oss << "Behind on trimming (" << mds->mdlog->get_num_segments() + << "/" << g_conf()->mds_log_max_segments << ")"; + + MDSHealthMetric m(MDS_HEALTH_TRIM, HEALTH_WARN, oss.str()); + m.metadata["num_segments"] = stringify(mds->mdlog->get_num_segments()); + m.metadata["max_segments"] = stringify(g_conf()->mds_log_max_segments); + health.metrics.push_back(m); + } + } + + // Detect clients failing to respond to modifications to capabilities in + // CLIENT_CAPS messages. + { + std::list<client_t> late_clients; + mds->locker->get_late_revoking_clients(&late_clients, + mds->mdsmap->get_session_timeout()); + std::list<MDSHealthMetric> late_cap_metrics; + + for (std::list<client_t>::iterator i = late_clients.begin(); i != late_clients.end(); ++i) { + + // client_t is equivalent to session.info.inst.name.num + // Construct an entity_name_t to lookup into SessionMap + entity_name_t ename(CEPH_ENTITY_TYPE_CLIENT, i->v); + Session const *s = mds->sessionmap.get_session(ename); + if (s == NULL) { + // Shouldn't happen, but not worth crashing if it does as this is + // just health-reporting code. + derr << "Client ID without session: " << i->v << dendl; + continue; + } + + std::ostringstream oss; + oss << "Client " << s->get_human_name() << " failing to respond to capability release"; + MDSHealthMetric m(MDS_HEALTH_CLIENT_LATE_RELEASE, HEALTH_WARN, oss.str()); + m.metadata["client_id"] = stringify(i->v); + late_cap_metrics.push_back(m); + } + + if (late_cap_metrics.size() <= (size_t)g_conf()->mds_health_summarize_threshold) { + health.metrics.splice(health.metrics.end(), late_cap_metrics); + } else { + std::ostringstream oss; + oss << "Many clients (" << late_cap_metrics.size() + << ") failing to respond to capability release"; + MDSHealthMetric m(MDS_HEALTH_CLIENT_LATE_RELEASE_MANY, HEALTH_WARN, oss.str()); + m.metadata["client_count"] = stringify(late_cap_metrics.size()); + health.metrics.push_back(m); + late_cap_metrics.clear(); + } + } + + // Detect clients failing to generate cap releases from CEPH_SESSION_RECALL_STATE + // messages. May be due to buggy client or resource-hogging application. + // + // Detect clients failing to advance their old_client_tid + { + set<Session*> sessions; + mds->sessionmap.get_client_session_set(sessions); + + const auto min_caps_working_set = g_conf().get_val<uint64_t>("mds_min_caps_working_set"); + const auto recall_warning_threshold = g_conf().get_val<Option::size_t>("mds_recall_warning_threshold"); + const auto max_completed_requests = g_conf()->mds_max_completed_requests; + const auto max_completed_flushes = g_conf()->mds_max_completed_flushes; + std::list<MDSHealthMetric> late_recall_metrics; + std::list<MDSHealthMetric> large_completed_requests_metrics; + for (auto& session : sessions) { + const uint64_t num_caps = session->get_num_caps(); + const uint64_t recall_caps = session->get_recall_caps(); + if (recall_caps > recall_warning_threshold && num_caps > min_caps_working_set) { + dout(2) << "Session " << *session << + " is not releasing caps fast enough. Recalled caps at " << recall_caps + << " > " << recall_warning_threshold << " (mds_recall_warning_threshold)." << dendl; + std::ostringstream oss; + oss << "Client " << session->get_human_name() << " failing to respond to cache pressure"; + MDSHealthMetric m(MDS_HEALTH_CLIENT_RECALL, HEALTH_WARN, oss.str()); + m.metadata["client_id"] = stringify(session->get_client()); + late_recall_metrics.push_back(m); + } + if ((session->get_num_trim_requests_warnings() > 0 && + session->get_num_completed_requests() >= max_completed_requests) || + (session->get_num_trim_flushes_warnings() > 0 && + session->get_num_completed_flushes() >= max_completed_flushes)) { + std::ostringstream oss; + oss << "Client " << session->get_human_name() << " failing to advance its oldest client/flush tid. "; + MDSHealthMetric m(MDS_HEALTH_CLIENT_OLDEST_TID, HEALTH_WARN, oss.str()); + m.metadata["client_id"] = stringify(session->get_client()); + large_completed_requests_metrics.push_back(m); + } + } + + if (late_recall_metrics.size() <= (size_t)g_conf()->mds_health_summarize_threshold) { + health.metrics.splice(health.metrics.end(), late_recall_metrics); + } else { + std::ostringstream oss; + oss << "Many clients (" << late_recall_metrics.size() + << ") failing to respond to cache pressure"; + MDSHealthMetric m(MDS_HEALTH_CLIENT_RECALL_MANY, HEALTH_WARN, oss.str()); + m.metadata["client_count"] = stringify(late_recall_metrics.size()); + health.metrics.push_back(m); + late_recall_metrics.clear(); + } + + if (large_completed_requests_metrics.size() <= (size_t)g_conf()->mds_health_summarize_threshold) { + health.metrics.splice(health.metrics.end(), large_completed_requests_metrics); + } else { + std::ostringstream oss; + oss << "Many clients (" << large_completed_requests_metrics.size() + << ") failing to advance their oldest client/flush tid"; + MDSHealthMetric m(MDS_HEALTH_CLIENT_OLDEST_TID_MANY, HEALTH_WARN, oss.str()); + m.metadata["client_count"] = stringify(large_completed_requests_metrics.size()); + health.metrics.push_back(m); + large_completed_requests_metrics.clear(); + } + } + + // Detect MDS_HEALTH_SLOW_REQUEST condition + { + int slow = mds->get_mds_slow_req_count(); + if (slow) { + dout(20) << slow << " slow request found" << dendl; + std::ostringstream oss; + oss << slow << " slow requests are blocked > " << g_conf()->mds_op_complaint_time << " secs"; + + MDSHealthMetric m(MDS_HEALTH_SLOW_REQUEST, HEALTH_WARN, oss.str()); + health.metrics.push_back(m); + } + } + + { + auto complaint_time = g_conf()->osd_op_complaint_time; + auto now = clock::now(); + auto cutoff = now - ceph::make_timespan(complaint_time); + + std::string count; + ceph::coarse_mono_time oldest; + if (MDSIOContextBase::check_ios_in_flight(cutoff, count, oldest)) { + dout(20) << count << " slow metadata IOs found" << dendl; + + auto oldest_secs = std::chrono::duration<double>(now - oldest).count(); + std::ostringstream oss; + oss << count << " slow metadata IOs are blocked > " << complaint_time + << " secs, oldest blocked for " << (int64_t)oldest_secs << " secs"; + + MDSHealthMetric m(MDS_HEALTH_SLOW_METADATA_IO, HEALTH_WARN, oss.str()); + health.metrics.push_back(m); + } + } + + // Report a health warning if we are readonly + if (mds->mdcache->is_readonly()) { + MDSHealthMetric m(MDS_HEALTH_READ_ONLY, HEALTH_WARN, + "MDS in read-only mode"); + health.metrics.push_back(m); + } + + // Report if we have significantly exceeded our cache size limit + if (mds->mdcache->cache_overfull()) { + std::ostringstream oss; + oss << "MDS cache is too large (" << bytes2str(mds->mdcache->cache_size()) + << "/" << bytes2str(mds->mdcache->cache_limit_memory()) << "); " + << mds->mdcache->num_inodes_with_caps << " inodes in use by clients, " + << mds->mdcache->get_num_strays() << " stray files"; + + MDSHealthMetric m(MDS_HEALTH_CACHE_OVERSIZED, HEALTH_WARN, oss.str()); + health.metrics.push_back(m); + } +} + +MDSMap::DaemonState Beacon::get_want_state() const +{ + std::unique_lock lock(mutex); + return want_state; +} + |