summaryrefslogtreecommitdiffstats
path: root/src/mgr/DaemonServer.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/mgr/DaemonServer.cc
parentInitial commit. (diff)
downloadceph-upstream/18.2.2.tar.xz
ceph-upstream/18.2.2.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/mgr/DaemonServer.cc')
-rw-r--r--src/mgr/DaemonServer.cc3142
1 files changed, 3142 insertions, 0 deletions
diff --git a/src/mgr/DaemonServer.cc b/src/mgr/DaemonServer.cc
new file mode 100644
index 000000000..0e9e6be2a
--- /dev/null
+++ b/src/mgr/DaemonServer.cc
@@ -0,0 +1,3142 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#include "DaemonServer.h"
+#include <boost/algorithm/string.hpp>
+#include "mgr/Mgr.h"
+
+#include "include/stringify.h"
+#include "include/str_list.h"
+#include "auth/RotatingKeyRing.h"
+#include "json_spirit/json_spirit_writer.h"
+
+#include "mgr/mgr_commands.h"
+#include "mgr/DaemonHealthMetricCollector.h"
+#include "mgr/OSDPerfMetricCollector.h"
+#include "mgr/MDSPerfMetricCollector.h"
+#include "mon/MonCommand.h"
+
+#include "messages/MMgrOpen.h"
+#include "messages/MMgrUpdate.h"
+#include "messages/MMgrClose.h"
+#include "messages/MMgrConfigure.h"
+#include "messages/MMonMgrReport.h"
+#include "messages/MCommand.h"
+#include "messages/MCommandReply.h"
+#include "messages/MMgrCommand.h"
+#include "messages/MMgrCommandReply.h"
+#include "messages/MPGStats.h"
+#include "messages/MOSDScrub2.h"
+#include "messages/MOSDForceRecovery.h"
+#include "common/errno.h"
+#include "common/pick_address.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mgr
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr.server " << __func__ << " "
+
+using namespace TOPNSPC::common;
+
+using std::list;
+using std::ostringstream;
+using std::string;
+using std::stringstream;
+using std::vector;
+using std::unique_ptr;
+
+namespace {
+ template <typename Map>
+ bool map_compare(Map const &lhs, Map const &rhs) {
+ return lhs.size() == rhs.size()
+ && std::equal(lhs.begin(), lhs.end(), rhs.begin(),
+ [] (auto a, auto b) { return a.first == b.first && a.second == b.second; });
+ }
+}
+
+DaemonServer::DaemonServer(MonClient *monc_,
+ Finisher &finisher_,
+ DaemonStateIndex &daemon_state_,
+ ClusterState &cluster_state_,
+ PyModuleRegistry &py_modules_,
+ LogChannelRef clog_,
+ LogChannelRef audit_clog_)
+ : Dispatcher(g_ceph_context),
+ client_byte_throttler(new Throttle(g_ceph_context, "mgr_client_bytes",
+ g_conf().get_val<Option::size_t>("mgr_client_bytes"))),
+ client_msg_throttler(new Throttle(g_ceph_context, "mgr_client_messages",
+ g_conf().get_val<uint64_t>("mgr_client_messages"))),
+ osd_byte_throttler(new Throttle(g_ceph_context, "mgr_osd_bytes",
+ g_conf().get_val<Option::size_t>("mgr_osd_bytes"))),
+ osd_msg_throttler(new Throttle(g_ceph_context, "mgr_osd_messsages",
+ g_conf().get_val<uint64_t>("mgr_osd_messages"))),
+ mds_byte_throttler(new Throttle(g_ceph_context, "mgr_mds_bytes",
+ g_conf().get_val<Option::size_t>("mgr_mds_bytes"))),
+ mds_msg_throttler(new Throttle(g_ceph_context, "mgr_mds_messsages",
+ g_conf().get_val<uint64_t>("mgr_mds_messages"))),
+ mon_byte_throttler(new Throttle(g_ceph_context, "mgr_mon_bytes",
+ g_conf().get_val<Option::size_t>("mgr_mon_bytes"))),
+ mon_msg_throttler(new Throttle(g_ceph_context, "mgr_mon_messsages",
+ g_conf().get_val<uint64_t>("mgr_mon_messages"))),
+ msgr(nullptr),
+ monc(monc_),
+ finisher(finisher_),
+ daemon_state(daemon_state_),
+ cluster_state(cluster_state_),
+ py_modules(py_modules_),
+ clog(clog_),
+ audit_clog(audit_clog_),
+ pgmap_ready(false),
+ timer(g_ceph_context, lock),
+ shutting_down(false),
+ tick_event(nullptr),
+ osd_perf_metric_collector_listener(this),
+ osd_perf_metric_collector(osd_perf_metric_collector_listener),
+ mds_perf_metric_collector_listener(this),
+ mds_perf_metric_collector(mds_perf_metric_collector_listener)
+{
+ g_conf().add_observer(this);
+}
+
+DaemonServer::~DaemonServer() {
+ delete msgr;
+ g_conf().remove_observer(this);
+}
+
+int DaemonServer::init(uint64_t gid, entity_addrvec_t client_addrs)
+{
+ // Initialize Messenger
+ std::string public_msgr_type = g_conf()->ms_public_type.empty() ?
+ g_conf().get_val<std::string>("ms_type") : g_conf()->ms_public_type;
+ msgr = Messenger::create(g_ceph_context, public_msgr_type,
+ entity_name_t::MGR(gid),
+ "mgr",
+ Messenger::get_pid_nonce());
+ msgr->set_default_policy(Messenger::Policy::stateless_server(0));
+
+ msgr->set_auth_client(monc);
+
+ // throttle clients
+ msgr->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
+ client_byte_throttler.get(),
+ client_msg_throttler.get());
+
+ // servers
+ msgr->set_policy_throttlers(entity_name_t::TYPE_OSD,
+ osd_byte_throttler.get(),
+ osd_msg_throttler.get());
+ msgr->set_policy_throttlers(entity_name_t::TYPE_MDS,
+ mds_byte_throttler.get(),
+ mds_msg_throttler.get());
+ msgr->set_policy_throttlers(entity_name_t::TYPE_MON,
+ mon_byte_throttler.get(),
+ mon_msg_throttler.get());
+
+ entity_addrvec_t addrs;
+ int r = pick_addresses(cct, CEPH_PICK_ADDRESS_PUBLIC, &addrs);
+ if (r < 0) {
+ return r;
+ }
+ dout(20) << __func__ << " will bind to " << addrs << dendl;
+ r = msgr->bindv(addrs);
+ if (r < 0) {
+ derr << "unable to bind mgr to " << addrs << dendl;
+ return r;
+ }
+
+ msgr->set_myname(entity_name_t::MGR(gid));
+ msgr->set_addr_unknowns(client_addrs);
+
+ msgr->start();
+ msgr->add_dispatcher_tail(this);
+
+ msgr->set_auth_server(monc);
+ monc->set_handle_authentication_dispatcher(this);
+
+ started_at = ceph_clock_now();
+
+ std::lock_guard l(lock);
+ timer.init();
+
+ schedule_tick_locked(
+ g_conf().get_val<std::chrono::seconds>("mgr_tick_period").count());
+
+ return 0;
+}
+
+entity_addrvec_t DaemonServer::get_myaddrs() const
+{
+ return msgr->get_myaddrs();
+}
+
+int DaemonServer::ms_handle_fast_authentication(Connection *con)
+{
+ auto s = ceph::make_ref<MgrSession>(cct);
+ con->set_priv(s);
+ s->inst.addr = con->get_peer_addr();
+ s->entity_name = con->peer_name;
+ dout(10) << __func__ << " new session " << s << " con " << con
+ << " entity " << con->peer_name
+ << " addr " << con->get_peer_addrs()
+ << dendl;
+
+ AuthCapsInfo &caps_info = con->get_peer_caps_info();
+ if (caps_info.allow_all) {
+ dout(10) << " session " << s << " " << s->entity_name
+ << " allow_all" << dendl;
+ s->caps.set_allow_all();
+ } else if (caps_info.caps.length() > 0) {
+ auto p = caps_info.caps.cbegin();
+ string str;
+ try {
+ decode(str, p);
+ }
+ catch (buffer::error& e) {
+ dout(10) << " session " << s << " " << s->entity_name
+ << " failed to decode caps" << dendl;
+ return -EACCES;
+ }
+ if (!s->caps.parse(str)) {
+ dout(10) << " session " << s << " " << s->entity_name
+ << " failed to parse caps '" << str << "'" << dendl;
+ return -EACCES;
+ }
+ dout(10) << " session " << s << " " << s->entity_name
+ << " has caps " << s->caps << " '" << str << "'" << dendl;
+ }
+ return 1;
+}
+
+void DaemonServer::ms_handle_accept(Connection* con)
+{
+ if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
+ auto s = ceph::ref_cast<MgrSession>(con->get_priv());
+ std::lock_guard l(lock);
+ s->osd_id = atoi(s->entity_name.get_id().c_str());
+ dout(10) << "registering osd." << s->osd_id << " session "
+ << s << " con " << con << dendl;
+ osd_cons[s->osd_id].insert(con);
+ }
+}
+
+bool DaemonServer::ms_handle_reset(Connection *con)
+{
+ if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
+ auto priv = con->get_priv();
+ auto session = static_cast<MgrSession*>(priv.get());
+ if (!session) {
+ return false;
+ }
+ std::lock_guard l(lock);
+ dout(10) << "unregistering osd." << session->osd_id
+ << " session " << session << " con " << con << dendl;
+ osd_cons[session->osd_id].erase(con);
+
+ auto iter = daemon_connections.find(con);
+ if (iter != daemon_connections.end()) {
+ daemon_connections.erase(iter);
+ }
+ }
+ return false;
+}
+
+bool DaemonServer::ms_handle_refused(Connection *con)
+{
+ // do nothing for now
+ return false;
+}
+
+bool DaemonServer::ms_dispatch2(const ref_t<Message>& m)
+{
+ // Note that we do *not* take ::lock here, in order to avoid
+ // serializing all message handling. It's up to each handler
+ // to take whatever locks it needs.
+ switch (m->get_type()) {
+ case MSG_PGSTATS:
+ cluster_state.ingest_pgstats(ref_cast<MPGStats>(m));
+ maybe_ready(m->get_source().num());
+ return true;
+ case MSG_MGR_REPORT:
+ return handle_report(ref_cast<MMgrReport>(m));
+ case MSG_MGR_OPEN:
+ return handle_open(ref_cast<MMgrOpen>(m));
+ case MSG_MGR_UPDATE:
+ return handle_update(ref_cast<MMgrUpdate>(m));
+ case MSG_MGR_CLOSE:
+ return handle_close(ref_cast<MMgrClose>(m));
+ case MSG_COMMAND:
+ return handle_command(ref_cast<MCommand>(m));
+ case MSG_MGR_COMMAND:
+ return handle_command(ref_cast<MMgrCommand>(m));
+ default:
+ dout(1) << "Unhandled message type " << m->get_type() << dendl;
+ return false;
+ };
+}
+
+void DaemonServer::dump_pg_ready(ceph::Formatter *f)
+{
+ f->dump_bool("pg_ready", pgmap_ready.load());
+}
+
+void DaemonServer::maybe_ready(int32_t osd_id)
+{
+ if (pgmap_ready.load()) {
+ // Fast path: we don't need to take lock because pgmap_ready
+ // is already set
+ } else {
+ std::lock_guard l(lock);
+
+ if (reported_osds.find(osd_id) == reported_osds.end()) {
+ dout(4) << "initial report from osd " << osd_id << dendl;
+ reported_osds.insert(osd_id);
+ std::set<int32_t> up_osds;
+
+ cluster_state.with_osdmap([&](const OSDMap& osdmap) {
+ osdmap.get_up_osds(up_osds);
+ });
+
+ std::set<int32_t> unreported_osds;
+ std::set_difference(up_osds.begin(), up_osds.end(),
+ reported_osds.begin(), reported_osds.end(),
+ std::inserter(unreported_osds, unreported_osds.begin()));
+
+ if (unreported_osds.size() == 0) {
+ dout(4) << "all osds have reported, sending PG state to mon" << dendl;
+ pgmap_ready = true;
+ reported_osds.clear();
+ // Avoid waiting for next tick
+ send_report();
+ } else {
+ dout(4) << "still waiting for " << unreported_osds.size() << " osds"
+ " to report in before PGMap is ready" << dendl;
+ }
+ }
+ }
+}
+
+void DaemonServer::tick()
+{
+ dout(10) << dendl;
+ send_report();
+ adjust_pgs();
+
+ schedule_tick_locked(
+ g_conf().get_val<std::chrono::seconds>("mgr_tick_period").count());
+}
+
+// Currently modules do not set health checks in response to events delivered to
+// all modules (e.g. notify) so we do not risk a thundering hurd situation here.
+// if this pattern emerges in the future, this scheduler could be modified to
+// fire after all modules have had a chance to set their health checks.
+void DaemonServer::schedule_tick_locked(double delay_sec)
+{
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
+
+ if (tick_event) {
+ timer.cancel_event(tick_event);
+ tick_event = nullptr;
+ }
+
+ // on shutdown start rejecting explicit requests to send reports that may
+ // originate from python land which may still be running.
+ if (shutting_down)
+ return;
+
+ tick_event = timer.add_event_after(delay_sec,
+ new LambdaContext([this](int r) {
+ tick();
+ }));
+}
+
+void DaemonServer::schedule_tick(double delay_sec)
+{
+ std::lock_guard l(lock);
+ schedule_tick_locked(delay_sec);
+}
+
+void DaemonServer::handle_osd_perf_metric_query_updated()
+{
+ dout(10) << dendl;
+
+ // Send a fresh MMgrConfigure to all clients, so that they can follow
+ // the new policy for transmitting stats
+ finisher.queue(new LambdaContext([this](int r) {
+ std::lock_guard l(lock);
+ for (auto &c : daemon_connections) {
+ if (c->peer_is_osd()) {
+ _send_configure(c);
+ }
+ }
+ }));
+}
+
+void DaemonServer::handle_mds_perf_metric_query_updated()
+{
+ dout(10) << dendl;
+
+ // Send a fresh MMgrConfigure to all clients, so that they can follow
+ // the new policy for transmitting stats
+ finisher.queue(new LambdaContext([this](int r) {
+ std::lock_guard l(lock);
+ for (auto &c : daemon_connections) {
+ if (c->peer_is_mds()) {
+ _send_configure(c);
+ }
+ }
+ }));
+}
+
+void DaemonServer::shutdown()
+{
+ dout(10) << "begin" << dendl;
+ msgr->shutdown();
+ msgr->wait();
+ cluster_state.shutdown();
+ dout(10) << "done" << dendl;
+
+ std::lock_guard l(lock);
+ shutting_down = true;
+ timer.shutdown();
+}
+
+static DaemonKey key_from_service(
+ const std::string& service_name,
+ int peer_type,
+ const std::string& daemon_name)
+{
+ if (!service_name.empty()) {
+ return DaemonKey{service_name, daemon_name};
+ } else {
+ return DaemonKey{ceph_entity_type_name(peer_type), daemon_name};
+ }
+}
+
+void DaemonServer::fetch_missing_metadata(const DaemonKey& key,
+ const entity_addr_t& addr)
+{
+ if (!daemon_state.is_updating(key) &&
+ (key.type == "osd" || key.type == "mds" || key.type == "mon")) {
+ std::ostringstream oss;
+ auto c = new MetadataUpdate(daemon_state, key);
+ if (key.type == "osd") {
+ oss << "{\"prefix\": \"osd metadata\", \"id\": "
+ << key.name<< "}";
+ } else if (key.type == "mds") {
+ c->set_default("addr", stringify(addr));
+ oss << "{\"prefix\": \"mds metadata\", \"who\": \""
+ << key.name << "\"}";
+ } else if (key.type == "mon") {
+ oss << "{\"prefix\": \"mon metadata\", \"id\": \""
+ << key.name << "\"}";
+ } else {
+ ceph_abort();
+ }
+ monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
+ }
+}
+
+bool DaemonServer::handle_open(const ref_t<MMgrOpen>& m)
+{
+ std::unique_lock l(lock);
+
+ DaemonKey key = key_from_service(m->service_name,
+ m->get_connection()->get_peer_type(),
+ m->daemon_name);
+
+ auto con = m->get_connection();
+ dout(10) << "from " << key << " " << con->get_peer_addr() << dendl;
+
+ _send_configure(con);
+
+ DaemonStatePtr daemon;
+ if (daemon_state.exists(key)) {
+ dout(20) << "updating existing DaemonState for " << key << dendl;
+ daemon = daemon_state.get(key);
+ }
+ if (!daemon) {
+ if (m->service_daemon) {
+ dout(4) << "constructing new DaemonState for " << key << dendl;
+ daemon = std::make_shared<DaemonState>(daemon_state.types);
+ daemon->key = key;
+ daemon->service_daemon = true;
+ daemon_state.insert(daemon);
+ } else {
+ /* A normal Ceph daemon has connected but we are or should be waiting on
+ * metadata for it. Close the session so that it tries to reconnect.
+ */
+ dout(2) << "ignoring open from " << key << " " << con->get_peer_addr()
+ << "; not ready for session (expect reconnect)" << dendl;
+ con->mark_down();
+ l.unlock();
+ fetch_missing_metadata(key, m->get_source_addr());
+ return true;
+ }
+ }
+ if (daemon) {
+ if (m->service_daemon) {
+ // update the metadata through the daemon state index to
+ // ensure it's kept up-to-date
+ daemon_state.update_metadata(daemon, m->daemon_metadata);
+ }
+
+ std::lock_guard l(daemon->lock);
+ daemon->perf_counters.clear();
+
+ daemon->service_daemon = m->service_daemon;
+ if (m->service_daemon) {
+ daemon->service_status = m->daemon_status;
+
+ utime_t now = ceph_clock_now();
+ auto [d, added] = pending_service_map.get_daemon(m->service_name,
+ m->daemon_name);
+ if (added || d->gid != (uint64_t)m->get_source().num()) {
+ dout(10) << "registering " << key << " in pending_service_map" << dendl;
+ d->gid = m->get_source().num();
+ d->addr = m->get_source_addr();
+ d->start_epoch = pending_service_map.epoch;
+ d->start_stamp = now;
+ d->metadata = m->daemon_metadata;
+ pending_service_map_dirty = pending_service_map.epoch;
+ }
+ }
+
+ auto p = m->config_bl.cbegin();
+ if (p != m->config_bl.end()) {
+ decode(daemon->config, p);
+ decode(daemon->ignored_mon_config, p);
+ dout(20) << " got config " << daemon->config
+ << " ignored " << daemon->ignored_mon_config << dendl;
+ }
+ daemon->config_defaults_bl = m->config_defaults_bl;
+ daemon->config_defaults.clear();
+ dout(20) << " got config_defaults_bl " << daemon->config_defaults_bl.length()
+ << " bytes" << dendl;
+ }
+
+ if (con->get_peer_type() != entity_name_t::TYPE_CLIENT &&
+ m->service_name.empty())
+ {
+ // Store in set of the daemon/service connections, i.e. those
+ // connections that require an update in the event of stats
+ // configuration changes.
+ daemon_connections.insert(con);
+ }
+
+ return true;
+}
+
+bool DaemonServer::handle_update(const ref_t<MMgrUpdate>& m)
+{
+ DaemonKey key;
+ if (!m->service_name.empty()) {
+ key.type = m->service_name;
+ } else {
+ key.type = ceph_entity_type_name(m->get_connection()->get_peer_type());
+ }
+ key.name = m->daemon_name;
+
+ dout(10) << "from " << m->get_connection() << " " << key << dendl;
+
+ if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT &&
+ m->service_name.empty()) {
+ // Clients should not be sending us update request
+ dout(10) << "rejecting update request from non-daemon client " << m->daemon_name
+ << dendl;
+ clog->warn() << "rejecting report from non-daemon client " << m->daemon_name
+ << " at " << m->get_connection()->get_peer_addrs();
+ m->get_connection()->mark_down();
+ return true;
+ }
+
+
+ {
+ std::unique_lock locker(lock);
+
+ DaemonStatePtr daemon;
+ // Look up the DaemonState
+ if (daemon_state.exists(key)) {
+ dout(20) << "updating existing DaemonState for " << key << dendl;
+
+ daemon = daemon_state.get(key);
+ if (m->need_metadata_update &&
+ !m->daemon_metadata.empty()) {
+ daemon_state.update_metadata(daemon, m->daemon_metadata);
+ }
+ }
+ }
+
+ return true;
+}
+
+bool DaemonServer::handle_close(const ref_t<MMgrClose>& m)
+{
+ std::lock_guard l(lock);
+
+ DaemonKey key = key_from_service(m->service_name,
+ m->get_connection()->get_peer_type(),
+ m->daemon_name);
+ dout(4) << "from " << m->get_connection() << " " << key << dendl;
+
+ if (daemon_state.exists(key)) {
+ DaemonStatePtr daemon = daemon_state.get(key);
+ daemon_state.rm(key);
+ {
+ std::lock_guard l(daemon->lock);
+ if (daemon->service_daemon) {
+ pending_service_map.rm_daemon(m->service_name, m->daemon_name);
+ pending_service_map_dirty = pending_service_map.epoch;
+ }
+ }
+ }
+
+ // send same message back as a reply
+ m->get_connection()->send_message2(m);
+ return true;
+}
+
+void DaemonServer::update_task_status(
+ DaemonKey key,
+ const std::map<std::string,std::string>& task_status)
+{
+ dout(10) << "got task status from " << key << dendl;
+
+ [[maybe_unused]] auto [daemon, added] =
+ pending_service_map.get_daemon(key.type, key.name);
+ if (daemon->task_status != task_status) {
+ daemon->task_status = task_status;
+ pending_service_map_dirty = pending_service_map.epoch;
+ }
+}
+
+bool DaemonServer::handle_report(const ref_t<MMgrReport>& m)
+{
+ DaemonKey key;
+ if (!m->service_name.empty()) {
+ key.type = m->service_name;
+ } else {
+ key.type = ceph_entity_type_name(m->get_connection()->get_peer_type());
+ }
+ key.name = m->daemon_name;
+
+ dout(10) << "from " << m->get_connection() << " " << key << dendl;
+
+ if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT &&
+ m->service_name.empty()) {
+ // Clients should not be sending us stats unless they are declaring
+ // themselves to be a daemon for some service.
+ dout(10) << "rejecting report from non-daemon client " << m->daemon_name
+ << dendl;
+ clog->warn() << "rejecting report from non-daemon client " << m->daemon_name
+ << " at " << m->get_connection()->get_peer_addrs();
+ m->get_connection()->mark_down();
+ return true;
+ }
+
+
+ {
+ std::unique_lock locker(lock);
+
+ DaemonStatePtr daemon;
+ // Look up the DaemonState
+ if (daemon = daemon_state.get(key); daemon != nullptr) {
+ dout(20) << "updating existing DaemonState for " << key << dendl;
+ } else {
+ locker.unlock();
+
+ // we don't know the hostname at this stage, reject MMgrReport here.
+ dout(5) << "rejecting report from " << key << ", since we do not have its metadata now."
+ << dendl;
+ // issue metadata request in background
+ fetch_missing_metadata(key, m->get_source_addr());
+
+ locker.lock();
+
+ // kill session
+ auto priv = m->get_connection()->get_priv();
+ auto session = static_cast<MgrSession*>(priv.get());
+ if (!session) {
+ return false;
+ }
+ m->get_connection()->mark_down();
+
+ dout(10) << "unregistering osd." << session->osd_id
+ << " session " << session << " con " << m->get_connection() << dendl;
+
+ if (osd_cons.find(session->osd_id) != osd_cons.end()) {
+ osd_cons[session->osd_id].erase(m->get_connection());
+ }
+
+ auto iter = daemon_connections.find(m->get_connection());
+ if (iter != daemon_connections.end()) {
+ daemon_connections.erase(iter);
+ }
+
+ return false;
+ }
+
+ // Update the DaemonState
+ ceph_assert(daemon != nullptr);
+ {
+ std::lock_guard l(daemon->lock);
+ auto &daemon_counters = daemon->perf_counters;
+ daemon_counters.update(*m.get());
+
+ auto p = m->config_bl.cbegin();
+ if (p != m->config_bl.end()) {
+ decode(daemon->config, p);
+ decode(daemon->ignored_mon_config, p);
+ dout(20) << " got config " << daemon->config
+ << " ignored " << daemon->ignored_mon_config << dendl;
+ }
+
+ utime_t now = ceph_clock_now();
+ if (daemon->service_daemon) {
+ if (m->daemon_status) {
+ daemon->service_status_stamp = now;
+ daemon->service_status = *m->daemon_status;
+ }
+ daemon->last_service_beacon = now;
+ } else if (m->daemon_status) {
+ derr << "got status from non-daemon " << key << dendl;
+ }
+ // update task status
+ if (m->task_status) {
+ update_task_status(key, *m->task_status);
+ daemon->last_service_beacon = now;
+ }
+ if (m->get_connection()->peer_is_osd() || m->get_connection()->peer_is_mon()) {
+ // only OSD and MON send health_checks to me now
+ daemon->daemon_health_metrics = std::move(m->daemon_health_metrics);
+ dout(10) << "daemon_health_metrics " << daemon->daemon_health_metrics
+ << dendl;
+ }
+ }
+ }
+
+ // if there are any schema updates, notify the python modules
+ /* no users currently
+ if (!m->declare_types.empty() || !m->undeclare_types.empty()) {
+ py_modules.notify_all("perf_schema_update", ceph::to_string(key));
+ }
+ */
+
+ if (m->get_connection()->peer_is_osd()) {
+ osd_perf_metric_collector.process_reports(m->osd_perf_metric_reports);
+ }
+
+ if (m->metric_report_message) {
+ const MetricReportMessage &message = *m->metric_report_message;
+ boost::apply_visitor(HandlePayloadVisitor(this), message.payload);
+ }
+
+ return true;
+}
+
+
+void DaemonServer::_generate_command_map(
+ cmdmap_t& cmdmap,
+ map<string,string> &param_str_map)
+{
+ for (auto p = cmdmap.begin();
+ p != cmdmap.end(); ++p) {
+ if (p->first == "prefix")
+ continue;
+ if (p->first == "caps") {
+ vector<string> cv;
+ if (cmd_getval(cmdmap, "caps", cv) &&
+ cv.size() % 2 == 0) {
+ for (unsigned i = 0; i < cv.size(); i += 2) {
+ string k = string("caps_") + cv[i];
+ param_str_map[k] = cv[i + 1];
+ }
+ continue;
+ }
+ }
+ param_str_map[p->first] = cmd_vartype_stringify(p->second);
+ }
+}
+
+const MonCommand *DaemonServer::_get_mgrcommand(
+ const string &cmd_prefix,
+ const std::vector<MonCommand> &cmds)
+{
+ const MonCommand *this_cmd = nullptr;
+ for (const auto &cmd : cmds) {
+ if (cmd.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
+ this_cmd = &cmd;
+ break;
+ }
+ }
+ return this_cmd;
+}
+
+bool DaemonServer::_allowed_command(
+ MgrSession *s,
+ const string &service,
+ const string &module,
+ const string &prefix,
+ const cmdmap_t& cmdmap,
+ const map<string,string>& param_str_map,
+ const MonCommand *this_cmd) {
+
+ if (s->entity_name.is_mon()) {
+ // mon is all-powerful. even when it is forwarding commands on behalf of
+ // old clients; we expect the mon is validating commands before proxying!
+ return true;
+ }
+
+ bool cmd_r = this_cmd->requires_perm('r');
+ bool cmd_w = this_cmd->requires_perm('w');
+ bool cmd_x = this_cmd->requires_perm('x');
+
+ bool capable = s->caps.is_capable(
+ g_ceph_context,
+ s->entity_name,
+ service, module, prefix, param_str_map,
+ cmd_r, cmd_w, cmd_x,
+ s->get_peer_addr());
+
+ dout(10) << " " << s->entity_name << " "
+ << (capable ? "" : "not ") << "capable" << dendl;
+ return capable;
+}
+
+/**
+ * The working data for processing an MCommand. This lives in
+ * a class to enable passing it into other threads for processing
+ * outside of the thread/locks that called handle_command.
+ */
+class CommandContext {
+public:
+ ceph::ref_t<MCommand> m_tell;
+ ceph::ref_t<MMgrCommand> m_mgr;
+ const std::vector<std::string>& cmd; ///< ref into m_tell or m_mgr
+ const bufferlist& data; ///< ref into m_tell or m_mgr
+ bufferlist odata;
+ cmdmap_t cmdmap;
+
+ explicit CommandContext(ceph::ref_t<MCommand> m)
+ : m_tell{std::move(m)},
+ cmd(m_tell->cmd),
+ data(m_tell->get_data()) {
+ }
+ explicit CommandContext(ceph::ref_t<MMgrCommand> m)
+ : m_mgr{std::move(m)},
+ cmd(m_mgr->cmd),
+ data(m_mgr->get_data()) {
+ }
+
+ void reply(int r, const std::stringstream &ss) {
+ reply(r, ss.str());
+ }
+
+ void reply(int r, const std::string &rs) {
+ // Let the connection drop as soon as we've sent our response
+ ConnectionRef con = m_tell ? m_tell->get_connection()
+ : m_mgr->get_connection();
+ if (con) {
+ con->mark_disposable();
+ }
+
+ if (r == 0) {
+ dout(20) << "success" << dendl;
+ } else {
+ derr << __func__ << " " << cpp_strerror(r) << " " << rs << dendl;
+ }
+ if (con) {
+ if (m_tell) {
+ MCommandReply *reply = new MCommandReply(r, rs);
+ reply->set_tid(m_tell->get_tid());
+ reply->set_data(odata);
+ con->send_message(reply);
+ } else {
+ MMgrCommandReply *reply = new MMgrCommandReply(r, rs);
+ reply->set_tid(m_mgr->get_tid());
+ reply->set_data(odata);
+ con->send_message(reply);
+ }
+ }
+ }
+};
+
+/**
+ * A context for receiving a bufferlist/error string from a background
+ * function and then calling back to a CommandContext when it's done
+ */
+class ReplyOnFinish : public Context {
+ std::shared_ptr<CommandContext> cmdctx;
+
+public:
+ bufferlist from_mon;
+ string outs;
+
+ explicit ReplyOnFinish(const std::shared_ptr<CommandContext> &cmdctx_)
+ : cmdctx(cmdctx_)
+ {}
+ void finish(int r) override {
+ cmdctx->odata.claim_append(from_mon);
+ cmdctx->reply(r, outs);
+ }
+};
+
+bool DaemonServer::handle_command(const ref_t<MCommand>& m)
+{
+ std::lock_guard l(lock);
+ auto cmdctx = std::make_shared<CommandContext>(m);
+ try {
+ return _handle_command(cmdctx);
+ } catch (const bad_cmd_get& e) {
+ cmdctx->reply(-EINVAL, e.what());
+ return true;
+ }
+}
+
+bool DaemonServer::handle_command(const ref_t<MMgrCommand>& m)
+{
+ std::lock_guard l(lock);
+ auto cmdctx = std::make_shared<CommandContext>(m);
+ try {
+ return _handle_command(cmdctx);
+ } catch (const bad_cmd_get& e) {
+ cmdctx->reply(-EINVAL, e.what());
+ return true;
+ }
+}
+
+void DaemonServer::log_access_denied(
+ std::shared_ptr<CommandContext>& cmdctx,
+ MgrSession* session, std::stringstream& ss) {
+ dout(1) << " access denied" << dendl;
+ audit_clog->info() << "from='" << session->inst << "' "
+ << "entity='" << session->entity_name << "' "
+ << "cmd=" << cmdctx->cmd << ": access denied";
+ ss << "access denied: does your client key have mgr caps? "
+ "See http://docs.ceph.com/en/latest/mgr/administrator/"
+ "#client-authentication";
+}
+
+void DaemonServer::_check_offlines_pgs(
+ const set<int>& osds,
+ const OSDMap& osdmap,
+ const PGMap& pgmap,
+ offline_pg_report *report)
+{
+ // reset output
+ *report = offline_pg_report();
+ report->osds = osds;
+
+ for (const auto& q : pgmap.pg_stat) {
+ set<int32_t> pg_acting; // net acting sets (with no missing if degraded)
+ bool found = false;
+ if (q.second.state == 0) {
+ report->unknown.insert(q.first);
+ continue;
+ }
+ if (q.second.state & PG_STATE_DEGRADED) {
+ for (auto& anm : q.second.avail_no_missing) {
+ if (osds.count(anm.osd)) {
+ found = true;
+ continue;
+ }
+ if (anm.osd != CRUSH_ITEM_NONE) {
+ pg_acting.insert(anm.osd);
+ }
+ }
+ } else {
+ for (auto& a : q.second.acting) {
+ if (osds.count(a)) {
+ found = true;
+ continue;
+ }
+ if (a != CRUSH_ITEM_NONE) {
+ pg_acting.insert(a);
+ }
+ }
+ }
+ if (!found) {
+ continue;
+ }
+ const pg_pool_t *pi = osdmap.get_pg_pool(q.first.pool());
+ bool dangerous = false;
+ if (!pi) {
+ report->bad_no_pool.insert(q.first); // pool is creating or deleting
+ dangerous = true;
+ }
+ if (!(q.second.state & PG_STATE_ACTIVE)) {
+ report->bad_already_inactive.insert(q.first);
+ dangerous = true;
+ }
+ if (pg_acting.size() < pi->min_size) {
+ report->bad_become_inactive.insert(q.first);
+ dangerous = true;
+ }
+ if (dangerous) {
+ report->not_ok.insert(q.first);
+ } else {
+ report->ok.insert(q.first);
+ if (q.second.state & PG_STATE_DEGRADED) {
+ report->ok_become_more_degraded.insert(q.first);
+ } else {
+ report->ok_become_degraded.insert(q.first);
+ }
+ }
+ }
+ dout(20) << osds << " -> " << report->ok.size() << " ok, "
+ << report->not_ok.size() << " not ok, "
+ << report->unknown.size() << " unknown"
+ << dendl;
+}
+
+void DaemonServer::_maximize_ok_to_stop_set(
+ const set<int>& orig_osds,
+ unsigned max,
+ const OSDMap& osdmap,
+ const PGMap& pgmap,
+ offline_pg_report *out_report)
+{
+ dout(20) << "orig_osds " << orig_osds << " max " << max << dendl;
+ _check_offlines_pgs(orig_osds, osdmap, pgmap, out_report);
+ if (!out_report->ok_to_stop()) {
+ return;
+ }
+ if (orig_osds.size() >= max) {
+ // already at max
+ return;
+ }
+
+ // semi-arbitrarily start with the first osd in the set
+ offline_pg_report report;
+ set<int> osds = orig_osds;
+ int parent = *osds.begin();
+ set<int> children;
+
+ while (true) {
+ // identify the next parent
+ int r = osdmap.crush->get_immediate_parent_id(parent, &parent);
+ if (r < 0) {
+ return; // just go with what we have so far!
+ }
+
+ // get candidate additions that are beneath this point in the tree
+ children.clear();
+ r = osdmap.crush->get_all_children(parent, &children);
+ if (r < 0) {
+ return; // just go with what we have so far!
+ }
+ dout(20) << " parent " << parent << " children " << children << dendl;
+
+ // try adding in more osds
+ int failed = 0; // how many children we failed to add to our set
+ for (auto o : children) {
+ if (o >= 0 && osdmap.is_up(o) && osds.count(o) == 0) {
+ osds.insert(o);
+ _check_offlines_pgs(osds, osdmap, pgmap, &report);
+ if (!report.ok_to_stop()) {
+ osds.erase(o);
+ ++failed;
+ continue;
+ }
+ *out_report = report;
+ if (osds.size() == max) {
+ dout(20) << " hit max" << dendl;
+ return; // yay, we hit the max
+ }
+ }
+ }
+
+ if (failed) {
+ // we hit some failures; go with what we have
+ dout(20) << " hit some peer failures" << dendl;
+ return;
+ }
+ }
+}
+
+bool DaemonServer::_handle_command(
+ std::shared_ptr<CommandContext>& cmdctx)
+{
+ MessageRef m;
+ bool admin_socket_cmd = false;
+ if (cmdctx->m_tell) {
+ m = cmdctx->m_tell;
+ // a blank fsid in MCommand signals a legacy client sending a "mon-mgr" CLI
+ // command.
+ admin_socket_cmd = (cmdctx->m_tell->fsid != uuid_d());
+ } else {
+ m = cmdctx->m_mgr;
+ }
+ auto priv = m->get_connection()->get_priv();
+ auto session = static_cast<MgrSession*>(priv.get());
+ if (!session) {
+ return true;
+ }
+ if (session->inst.name == entity_name_t()) {
+ session->inst.name = m->get_source();
+ }
+
+ map<string,string> param_str_map;
+ std::stringstream ss;
+ int r = 0;
+
+ if (!cmdmap_from_json(cmdctx->cmd, &(cmdctx->cmdmap), ss)) {
+ cmdctx->reply(-EINVAL, ss);
+ return true;
+ }
+
+ string prefix;
+ cmd_getval(cmdctx->cmdmap, "prefix", prefix);
+ dout(10) << "decoded-size=" << cmdctx->cmdmap.size() << " prefix=" << prefix << dendl;
+
+ boost::scoped_ptr<Formatter> f;
+ {
+ std::string format;
+ if (boost::algorithm::ends_with(prefix, "_json")) {
+ format = "json";
+ } else {
+ format = cmd_getval_or<string>(cmdctx->cmdmap, "format", "plain");
+ }
+ f.reset(Formatter::create(format));
+ }
+
+ // this is just for mgr commands - admin socket commands will fall
+ // through and use the admin socket version of
+ // get_command_descriptions
+ if (prefix == "get_command_descriptions" && !admin_socket_cmd) {
+ dout(10) << "reading commands from python modules" << dendl;
+ const auto py_commands = py_modules.get_commands();
+
+ int cmdnum = 0;
+ JSONFormatter f;
+ f.open_object_section("command_descriptions");
+
+ auto dump_cmd = [&cmdnum, &f, m](const MonCommand &mc){
+ ostringstream secname;
+ secname << "cmd" << std::setfill('0') << std::setw(3) << cmdnum;
+ dump_cmddesc_to_json(&f, m->get_connection()->get_features(),
+ secname.str(), mc.cmdstring, mc.helpstring,
+ mc.module, mc.req_perms, 0);
+ cmdnum++;
+ };
+
+ for (const auto &pyc : py_commands) {
+ dump_cmd(pyc);
+ }
+
+ for (const auto &mgr_cmd : mgr_commands) {
+ dump_cmd(mgr_cmd);
+ }
+
+ f.close_section(); // command_descriptions
+ f.flush(cmdctx->odata);
+ cmdctx->reply(0, ss);
+ return true;
+ }
+
+ // lookup command
+ const MonCommand *mgr_cmd = _get_mgrcommand(prefix, mgr_commands);
+ _generate_command_map(cmdctx->cmdmap, param_str_map);
+
+ bool is_allowed = false;
+ ModuleCommand py_command;
+ if (admin_socket_cmd) {
+ // admin socket commands require all capabilities
+ is_allowed = session->caps.is_allow_all();
+ } else if (!mgr_cmd) {
+ // Resolve the command to the name of the module that will
+ // handle it (if the command exists)
+ auto py_commands = py_modules.get_py_commands();
+ for (const auto &pyc : py_commands) {
+ auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring);
+ if (pyc_prefix == prefix) {
+ py_command = pyc;
+ break;
+ }
+ }
+
+ MonCommand pyc = {"", "", "py", py_command.perm};
+ is_allowed = _allowed_command(session, "py", py_command.module_name,
+ prefix, cmdctx->cmdmap, param_str_map,
+ &pyc);
+ } else {
+ // validate user's permissions for requested command
+ is_allowed = _allowed_command(session, mgr_cmd->module, "",
+ prefix, cmdctx->cmdmap, param_str_map, mgr_cmd);
+ }
+
+ if (!is_allowed) {
+ log_access_denied(cmdctx, session, ss);
+ cmdctx->reply(-EACCES, ss);
+ return true;
+ }
+
+ audit_clog->debug()
+ << "from='" << session->inst << "' "
+ << "entity='" << session->entity_name << "' "
+ << "cmd=" << cmdctx->cmd << ": dispatch";
+
+ if (admin_socket_cmd) {
+ cct->get_admin_socket()->queue_tell_command(cmdctx->m_tell);
+ return true;
+ }
+
+ // ----------------
+ // service map commands
+ if (prefix == "service dump") {
+ if (!f)
+ f.reset(Formatter::create("json-pretty"));
+ cluster_state.with_servicemap([&](const ServiceMap &service_map) {
+ f->dump_object("service_map", service_map);
+ });
+ f->flush(cmdctx->odata);
+ cmdctx->reply(0, ss);
+ return true;
+ }
+ if (prefix == "service status") {
+ if (!f)
+ f.reset(Formatter::create("json-pretty"));
+ // only include state from services that are in the persisted service map
+ f->open_object_section("service_status");
+ for (auto& [type, service] : pending_service_map.services) {
+ if (ServiceMap::is_normal_ceph_entity(type)) {
+ continue;
+ }
+
+ f->open_object_section(type.c_str());
+ for (auto& q : service.daemons) {
+ f->open_object_section(q.first.c_str());
+ DaemonKey key{type, q.first};
+ ceph_assert(daemon_state.exists(key));
+ auto daemon = daemon_state.get(key);
+ std::lock_guard l(daemon->lock);
+ f->dump_stream("status_stamp") << daemon->service_status_stamp;
+ f->dump_stream("last_beacon") << daemon->last_service_beacon;
+ f->open_object_section("status");
+ for (auto& r : daemon->service_status) {
+ f->dump_string(r.first.c_str(), r.second);
+ }
+ f->close_section();
+ f->close_section();
+ }
+ f->close_section();
+ }
+ f->close_section();
+ f->flush(cmdctx->odata);
+ cmdctx->reply(0, ss);
+ return true;
+ }
+
+ if (prefix == "config set") {
+ std::string key;
+ std::string val;
+ cmd_getval(cmdctx->cmdmap, "key", key);
+ cmd_getval(cmdctx->cmdmap, "value", val);
+ r = cct->_conf.set_val(key, val, &ss);
+ if (r == 0) {
+ cct->_conf.apply_changes(nullptr);
+ }
+ cmdctx->reply(0, ss);
+ return true;
+ }
+
+ // -----------
+ // PG commands
+
+ if (prefix == "pg scrub" ||
+ prefix == "pg repair" ||
+ prefix == "pg deep-scrub") {
+ string scrubop = prefix.substr(3, string::npos);
+ pg_t pgid;
+ spg_t spgid;
+ string pgidstr;
+ cmd_getval(cmdctx->cmdmap, "pgid", pgidstr);
+ if (!pgid.parse(pgidstr.c_str())) {
+ ss << "invalid pgid '" << pgidstr << "'";
+ cmdctx->reply(-EINVAL, ss);
+ return true;
+ }
+ bool pg_exists = false;
+ cluster_state.with_osdmap([&](const OSDMap& osdmap) {
+ pg_exists = osdmap.pg_exists(pgid);
+ });
+ if (!pg_exists) {
+ ss << "pg " << pgid << " does not exist";
+ cmdctx->reply(-ENOENT, ss);
+ return true;
+ }
+ int acting_primary = -1;
+ epoch_t epoch;
+ cluster_state.with_osdmap([&](const OSDMap& osdmap) {
+ epoch = osdmap.get_epoch();
+ osdmap.get_primary_shard(pgid, &acting_primary, &spgid);
+ });
+ if (acting_primary == -1) {
+ ss << "pg " << pgid << " has no primary osd";
+ cmdctx->reply(-EAGAIN, ss);
+ return true;
+ }
+ auto p = osd_cons.find(acting_primary);
+ if (p == osd_cons.end()) {
+ ss << "pg " << pgid << " primary osd." << acting_primary
+ << " is not currently connected";
+ cmdctx->reply(-EAGAIN, ss);
+ return true;
+ }
+ for (auto& con : p->second) {
+ assert(HAVE_FEATURE(con->get_features(), SERVER_OCTOPUS));
+ vector<spg_t> pgs = { spgid };
+ con->send_message(new MOSDScrub2(monc->get_fsid(),
+ epoch,
+ pgs,
+ scrubop == "repair",
+ scrubop == "deep-scrub"));
+ }
+ ss << "instructing pg " << spgid << " on osd." << acting_primary
+ << " to " << scrubop;
+ cmdctx->reply(0, ss);
+ return true;
+ } else if (prefix == "osd scrub" ||
+ prefix == "osd deep-scrub" ||
+ prefix == "osd repair") {
+ string whostr;
+ cmd_getval(cmdctx->cmdmap, "who", whostr);
+ vector<string> pvec;
+ get_str_vec(prefix, pvec);
+
+ set<int> osds;
+ if (whostr == "*" || whostr == "all" || whostr == "any") {
+ cluster_state.with_osdmap([&](const OSDMap& osdmap) {
+ for (int i = 0; i < osdmap.get_max_osd(); i++)
+ if (osdmap.is_up(i)) {
+ osds.insert(i);
+ }
+ });
+ } else {
+ long osd = parse_osd_id(whostr.c_str(), &ss);
+ if (osd < 0) {
+ ss << "invalid osd '" << whostr << "'";
+ cmdctx->reply(-EINVAL, ss);
+ return true;
+ }
+ cluster_state.with_osdmap([&](const OSDMap& osdmap) {
+ if (osdmap.is_up(osd)) {
+ osds.insert(osd);
+ }
+ });
+ if (osds.empty()) {
+ ss << "osd." << osd << " is not up";
+ cmdctx->reply(-EAGAIN, ss);
+ return true;
+ }
+ }
+ set<int> sent_osds, failed_osds;
+ for (auto osd : osds) {
+ vector<spg_t> spgs;
+ epoch_t epoch;
+ cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pgmap) {
+ epoch = osdmap.get_epoch();
+ auto p = pgmap.pg_by_osd.find(osd);
+ if (p != pgmap.pg_by_osd.end()) {
+ for (auto pgid : p->second) {
+ int primary;
+ spg_t spg;
+ osdmap.get_primary_shard(pgid, &primary, &spg);
+ if (primary == osd) {
+ spgs.push_back(spg);
+ }
+ }
+ }
+ });
+ auto p = osd_cons.find(osd);
+ if (p == osd_cons.end()) {
+ failed_osds.insert(osd);
+ } else {
+ sent_osds.insert(osd);
+ for (auto& con : p->second) {
+ con->send_message(new MOSDScrub2(monc->get_fsid(),
+ epoch,
+ spgs,
+ pvec.back() == "repair",
+ pvec.back() == "deep-scrub"));
+ }
+ }
+ }
+ if (failed_osds.size() == osds.size()) {
+ ss << "failed to instruct osd(s) " << osds << " to " << pvec.back()
+ << " (not connected)";
+ r = -EAGAIN;
+ } else {
+ ss << "instructed osd(s) " << sent_osds << " to " << pvec.back();
+ if (!failed_osds.empty()) {
+ ss << "; osd(s) " << failed_osds << " were not connected";
+ }
+ r = 0;
+ }
+ cmdctx->reply(0, ss);
+ return true;
+ } else if (prefix == "osd pool scrub" ||
+ prefix == "osd pool deep-scrub" ||
+ prefix == "osd pool repair") {
+ vector<string> pool_names;
+ cmd_getval(cmdctx->cmdmap, "who", pool_names);
+ if (pool_names.empty()) {
+ ss << "must specify one or more pool names";
+ cmdctx->reply(-EINVAL, ss);
+ return true;
+ }
+ epoch_t epoch;
+ map<int32_t, vector<pg_t>> pgs_by_primary; // legacy
+ map<int32_t, vector<spg_t>> spgs_by_primary;
+ cluster_state.with_osdmap([&](const OSDMap& osdmap) {
+ epoch = osdmap.get_epoch();
+ for (auto& pool_name : pool_names) {
+ auto pool_id = osdmap.lookup_pg_pool_name(pool_name);
+ if (pool_id < 0) {
+ ss << "unrecognized pool '" << pool_name << "'";
+ r = -ENOENT;
+ return;
+ }
+ auto pool_pg_num = osdmap.get_pg_num(pool_id);
+ for (int i = 0; i < pool_pg_num; i++) {
+ pg_t pg(i, pool_id);
+ int primary;
+ spg_t spg;
+ auto got = osdmap.get_primary_shard(pg, &primary, &spg);
+ if (!got)
+ continue;
+ pgs_by_primary[primary].push_back(pg);
+ spgs_by_primary[primary].push_back(spg);
+ }
+ }
+ });
+ if (r < 0) {
+ cmdctx->reply(r, ss);
+ return true;
+ }
+ for (auto& it : spgs_by_primary) {
+ auto primary = it.first;
+ auto p = osd_cons.find(primary);
+ if (p == osd_cons.end()) {
+ ss << "osd." << primary << " is not currently connected";
+ cmdctx->reply(-EAGAIN, ss);
+ return true;
+ }
+ for (auto& con : p->second) {
+ con->send_message(new MOSDScrub2(monc->get_fsid(),
+ epoch,
+ it.second,
+ prefix == "osd pool repair",
+ prefix == "osd pool deep-scrub"));
+ }
+ }
+ cmdctx->reply(0, "");
+ return true;
+ } else if (prefix == "osd reweight-by-pg" ||
+ prefix == "osd reweight-by-utilization" ||
+ prefix == "osd test-reweight-by-pg" ||
+ prefix == "osd test-reweight-by-utilization") {
+ bool by_pg =
+ prefix == "osd reweight-by-pg" || prefix == "osd test-reweight-by-pg";
+ bool dry_run =
+ prefix == "osd test-reweight-by-pg" ||
+ prefix == "osd test-reweight-by-utilization";
+ int64_t oload = cmd_getval_or<int64_t>(cmdctx->cmdmap, "oload", 120);
+ set<int64_t> pools;
+ vector<string> poolnames;
+ cmd_getval(cmdctx->cmdmap, "pools", poolnames);
+ cluster_state.with_osdmap([&](const OSDMap& osdmap) {
+ for (const auto& poolname : poolnames) {
+ int64_t pool = osdmap.lookup_pg_pool_name(poolname);
+ if (pool < 0) {
+ ss << "pool '" << poolname << "' does not exist";
+ r = -ENOENT;
+ }
+ pools.insert(pool);
+ }
+ });
+ if (r) {
+ cmdctx->reply(r, ss);
+ return true;
+ }
+
+ double max_change = g_conf().get_val<double>("mon_reweight_max_change");
+ cmd_getval(cmdctx->cmdmap, "max_change", max_change);
+ if (max_change <= 0.0) {
+ ss << "max_change " << max_change << " must be positive";
+ cmdctx->reply(-EINVAL, ss);
+ return true;
+ }
+ int64_t max_osds = g_conf().get_val<int64_t>("mon_reweight_max_osds");
+ cmd_getval(cmdctx->cmdmap, "max_osds", max_osds);
+ if (max_osds <= 0) {
+ ss << "max_osds " << max_osds << " must be positive";
+ cmdctx->reply(-EINVAL, ss);
+ return true;
+ }
+ bool no_increasing = false;
+ cmd_getval_compat_cephbool(cmdctx->cmdmap, "no_increasing", no_increasing);
+ string out_str;
+ mempool::osdmap::map<int32_t, uint32_t> new_weights;
+ r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap &osdmap, const PGMap& pgmap) {
+ return reweight::by_utilization(osdmap, pgmap,
+ oload,
+ max_change,
+ max_osds,
+ by_pg,
+ pools.empty() ? NULL : &pools,
+ no_increasing,
+ &new_weights,
+ &ss, &out_str, f.get());
+ });
+ if (r >= 0) {
+ dout(10) << "reweight::by_utilization: finished with " << out_str << dendl;
+ }
+ if (f) {
+ f->flush(cmdctx->odata);
+ } else {
+ cmdctx->odata.append(out_str);
+ }
+ if (r < 0) {
+ ss << "FAILED reweight-by-pg";
+ cmdctx->reply(r, ss);
+ return true;
+ } else if (r == 0 || dry_run) {
+ ss << "no change";
+ cmdctx->reply(r, ss);
+ return true;
+ } else {
+ json_spirit::Object json_object;
+ for (const auto& osd_weight : new_weights) {
+ json_spirit::Config::add(json_object,
+ std::to_string(osd_weight.first),
+ std::to_string(osd_weight.second));
+ }
+ string s = json_spirit::write(json_object);
+ std::replace(begin(s), end(s), '\"', '\'');
+ const string cmd =
+ "{"
+ "\"prefix\": \"osd reweightn\", "
+ "\"weights\": \"" + s + "\""
+ "}";
+ auto on_finish = new ReplyOnFinish(cmdctx);
+ monc->start_mon_command({cmd}, {},
+ &on_finish->from_mon, &on_finish->outs, on_finish);
+ return true;
+ }
+ } else if (prefix == "osd df") {
+ string method, filter;
+ cmd_getval(cmdctx->cmdmap, "output_method", method);
+ cmd_getval(cmdctx->cmdmap, "filter", filter);
+ stringstream rs;
+ r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pgmap) {
+ // sanity check filter(s)
+ if (!filter.empty() &&
+ osdmap.lookup_pg_pool_name(filter) < 0 &&
+ !osdmap.crush->class_exists(filter) &&
+ !osdmap.crush->name_exists(filter)) {
+ rs << "'" << filter << "' not a pool, crush node or device class name";
+ return -EINVAL;
+ }
+ print_osd_utilization(osdmap, pgmap, ss,
+ f.get(), method == "tree", filter);
+ cmdctx->odata.append(ss);
+ return 0;
+ });
+ cmdctx->reply(r, rs);
+ return true;
+ } else if (prefix == "osd pool stats") {
+ string pool_name;
+ cmd_getval(cmdctx->cmdmap, "pool_name", pool_name);
+ int64_t poolid = -ENOENT;
+ bool one_pool = false;
+ r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
+ if (!pool_name.empty()) {
+ poolid = osdmap.lookup_pg_pool_name(pool_name);
+ if (poolid < 0) {
+ ceph_assert(poolid == -ENOENT);
+ ss << "unrecognized pool '" << pool_name << "'";
+ return -ENOENT;
+ }
+ one_pool = true;
+ }
+ stringstream rs;
+ if (f)
+ f->open_array_section("pool_stats");
+ else {
+ if (osdmap.get_pools().empty()) {
+ ss << "there are no pools!";
+ goto stats_out;
+ }
+ }
+ for (auto &p : osdmap.get_pools()) {
+ if (!one_pool) {
+ poolid = p.first;
+ }
+ pg_map.dump_pool_stats_and_io_rate(poolid, osdmap, f.get(), &rs);
+ if (one_pool) {
+ break;
+ }
+ }
+ stats_out:
+ if (f) {
+ f->close_section();
+ f->flush(cmdctx->odata);
+ } else {
+ cmdctx->odata.append(rs.str());
+ }
+ return 0;
+ });
+ if (r != -EOPNOTSUPP) {
+ cmdctx->reply(r, ss);
+ return true;
+ }
+ } else if (prefix == "osd safe-to-destroy" ||
+ prefix == "osd destroy" ||
+ prefix == "osd purge") {
+ set<int> osds;
+ int r = 0;
+ if (prefix == "osd safe-to-destroy") {
+ vector<string> ids;
+ cmd_getval(cmdctx->cmdmap, "ids", ids);
+ cluster_state.with_osdmap([&](const OSDMap& osdmap) {
+ r = osdmap.parse_osd_id_list(ids, &osds, &ss);
+ });
+ if (!r && osds.empty()) {
+ ss << "must specify one or more OSDs";
+ r = -EINVAL;
+ }
+ } else {
+ int64_t id;
+ if (!cmd_getval(cmdctx->cmdmap, "id", id)) {
+ r = -EINVAL;
+ ss << "must specify OSD id";
+ } else {
+ osds.insert(id);
+ }
+ }
+ if (r < 0) {
+ cmdctx->reply(r, ss);
+ return true;
+ }
+ set<int> active_osds, missing_stats, stored_pgs, safe_to_destroy;
+ int affected_pgs = 0;
+ cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
+ if (pg_map.num_pg_unknown > 0) {
+ ss << pg_map.num_pg_unknown << " pgs have unknown state; cannot draw"
+ << " any conclusions";
+ r = -EAGAIN;
+ return;
+ }
+ int num_active_clean = 0;
+ for (auto& p : pg_map.num_pg_by_state) {
+ unsigned want = PG_STATE_ACTIVE|PG_STATE_CLEAN;
+ if ((p.first & want) == want) {
+ num_active_clean += p.second;
+ }
+ }
+ for (auto osd : osds) {
+ if (!osdmap.exists(osd)) {
+ safe_to_destroy.insert(osd);
+ continue; // clearly safe to destroy
+ }
+ auto q = pg_map.num_pg_by_osd.find(osd);
+ if (q != pg_map.num_pg_by_osd.end()) {
+ if (q->second.acting > 0 || q->second.up_not_acting > 0) {
+ active_osds.insert(osd);
+ // XXX: For overlapping PGs, this counts them again
+ affected_pgs += q->second.acting + q->second.up_not_acting;
+ continue;
+ }
+ }
+ if (num_active_clean < pg_map.num_pg) {
+ // all pgs aren't active+clean; we need to be careful.
+ auto p = pg_map.osd_stat.find(osd);
+ if (p == pg_map.osd_stat.end() || !osdmap.is_up(osd)) {
+ missing_stats.insert(osd);
+ continue;
+ } else if (p->second.num_pgs > 0) {
+ stored_pgs.insert(osd);
+ continue;
+ }
+ }
+ safe_to_destroy.insert(osd);
+ }
+ });
+ if (r && prefix == "osd safe-to-destroy") {
+ cmdctx->reply(r, ss); // regardless of formatter
+ return true;
+ }
+ if (!r && (!active_osds.empty() ||
+ !missing_stats.empty() || !stored_pgs.empty())) {
+ if (!safe_to_destroy.empty()) {
+ ss << "OSD(s) " << safe_to_destroy
+ << " are safe to destroy without reducing data durability. ";
+ }
+ if (!active_osds.empty()) {
+ ss << "OSD(s) " << active_osds << " have " << affected_pgs
+ << " pgs currently mapped to them. ";
+ }
+ if (!missing_stats.empty()) {
+ ss << "OSD(s) " << missing_stats << " have no reported stats, and not all"
+ << " PGs are active+clean; we cannot draw any conclusions. ";
+ }
+ if (!stored_pgs.empty()) {
+ ss << "OSD(s) " << stored_pgs << " last reported they still store some PG"
+ << " data, and not all PGs are active+clean; we cannot be sure they"
+ << " aren't still needed.";
+ }
+ if (!active_osds.empty() || !stored_pgs.empty()) {
+ r = -EBUSY;
+ } else {
+ r = -EAGAIN;
+ }
+ }
+
+ if (prefix == "osd safe-to-destroy") {
+ if (!r) {
+ ss << "OSD(s) " << osds << " are safe to destroy without reducing data"
+ << " durability.";
+ }
+ if (f) {
+ f->open_object_section("osd_status");
+ f->open_array_section("safe_to_destroy");
+ for (auto i : safe_to_destroy)
+ f->dump_int("osd", i);
+ f->close_section();
+ f->open_array_section("active");
+ for (auto i : active_osds)
+ f->dump_int("osd", i);
+ f->close_section();
+ f->open_array_section("missing_stats");
+ for (auto i : missing_stats)
+ f->dump_int("osd", i);
+ f->close_section();
+ f->open_array_section("stored_pgs");
+ for (auto i : stored_pgs)
+ f->dump_int("osd", i);
+ f->close_section();
+ f->close_section(); // osd_status
+ f->flush(cmdctx->odata);
+ r = 0;
+ std::stringstream().swap(ss);
+ }
+ cmdctx->reply(r, ss);
+ return true;
+ }
+
+ if (r) {
+ bool force = false;
+ cmd_getval(cmdctx->cmdmap, "force", force);
+ if (!force) {
+ // Backward compat
+ cmd_getval(cmdctx->cmdmap, "yes_i_really_mean_it", force);
+ }
+ if (!force) {
+ ss << "\nYou can proceed by passing --force, but be warned that"
+ " this will likely mean real, permanent data loss.";
+ } else {
+ r = 0;
+ }
+ }
+ if (r) {
+ cmdctx->reply(r, ss);
+ return true;
+ }
+ const string cmd =
+ "{"
+ "\"prefix\": \"" + prefix + "-actual\", "
+ "\"id\": " + stringify(osds) + ", "
+ "\"yes_i_really_mean_it\": true"
+ "}";
+ auto on_finish = new ReplyOnFinish(cmdctx);
+ monc->start_mon_command({cmd}, {}, nullptr, &on_finish->outs, on_finish);
+ return true;
+ } else if (prefix == "osd ok-to-stop") {
+ vector<string> ids;
+ cmd_getval(cmdctx->cmdmap, "ids", ids);
+ set<int> osds;
+ int64_t max = 1;
+ cmd_getval(cmdctx->cmdmap, "max", max);
+ int r;
+ cluster_state.with_osdmap([&](const OSDMap& osdmap) {
+ r = osdmap.parse_osd_id_list(ids, &osds, &ss);
+ });
+ if (!r && osds.empty()) {
+ ss << "must specify one or more OSDs";
+ r = -EINVAL;
+ }
+ if (max < (int)osds.size()) {
+ max = osds.size();
+ }
+ if (r < 0) {
+ cmdctx->reply(r, ss);
+ return true;
+ }
+ offline_pg_report out_report;
+ cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
+ _maximize_ok_to_stop_set(
+ osds, max, osdmap, pg_map,
+ &out_report);
+ });
+ if (!f) {
+ f.reset(Formatter::create("json"));
+ }
+ f->dump_object("ok_to_stop", out_report);
+ f->flush(cmdctx->odata);
+ cmdctx->odata.append("\n");
+ if (!out_report.unknown.empty()) {
+ ss << out_report.unknown.size() << " pgs have unknown state; "
+ << "cannot draw any conclusions";
+ cmdctx->reply(-EAGAIN, ss);
+ }
+ if (!out_report.ok_to_stop()) {
+ ss << "unsafe to stop osd(s) at this time (" << out_report.not_ok.size() << " PGs are or would become offline)";
+ cmdctx->reply(-EBUSY, ss);
+ } else {
+ cmdctx->reply(0, ss);
+ }
+ return true;
+ } else if (prefix == "pg force-recovery" ||
+ prefix == "pg force-backfill" ||
+ prefix == "pg cancel-force-recovery" ||
+ prefix == "pg cancel-force-backfill" ||
+ prefix == "osd pool force-recovery" ||
+ prefix == "osd pool force-backfill" ||
+ prefix == "osd pool cancel-force-recovery" ||
+ prefix == "osd pool cancel-force-backfill") {
+ vector<string> vs;
+ get_str_vec(prefix, vs);
+ auto& granularity = vs.front();
+ auto& forceop = vs.back();
+ vector<pg_t> pgs;
+
+ // figure out actual op just once
+ int actual_op = 0;
+ if (forceop == "force-recovery") {
+ actual_op = OFR_RECOVERY;
+ } else if (forceop == "force-backfill") {
+ actual_op = OFR_BACKFILL;
+ } else if (forceop == "cancel-force-backfill") {
+ actual_op = OFR_BACKFILL | OFR_CANCEL;
+ } else if (forceop == "cancel-force-recovery") {
+ actual_op = OFR_RECOVERY | OFR_CANCEL;
+ }
+
+ set<pg_t> candidates; // deduped
+ if (granularity == "pg") {
+ // covnert pg names to pgs, discard any invalid ones while at it
+ vector<string> pgids;
+ cmd_getval(cmdctx->cmdmap, "pgid", pgids);
+ for (auto& i : pgids) {
+ pg_t pgid;
+ if (!pgid.parse(i.c_str())) {
+ ss << "invlaid pgid '" << i << "'; ";
+ r = -EINVAL;
+ continue;
+ }
+ candidates.insert(pgid);
+ }
+ } else {
+ // per pool
+ vector<string> pool_names;
+ cmd_getval(cmdctx->cmdmap, "who", pool_names);
+ if (pool_names.empty()) {
+ ss << "must specify one or more pool names";
+ cmdctx->reply(-EINVAL, ss);
+ return true;
+ }
+ cluster_state.with_osdmap([&](const OSDMap& osdmap) {
+ for (auto& pool_name : pool_names) {
+ auto pool_id = osdmap.lookup_pg_pool_name(pool_name);
+ if (pool_id < 0) {
+ ss << "unrecognized pool '" << pool_name << "'";
+ r = -ENOENT;
+ return;
+ }
+ auto pool_pg_num = osdmap.get_pg_num(pool_id);
+ for (int i = 0; i < pool_pg_num; i++)
+ candidates.insert({(unsigned int)i, (uint64_t)pool_id});
+ }
+ });
+ if (r < 0) {
+ cmdctx->reply(r, ss);
+ return true;
+ }
+ }
+
+ cluster_state.with_pgmap([&](const PGMap& pg_map) {
+ for (auto& i : candidates) {
+ auto it = pg_map.pg_stat.find(i);
+ if (it == pg_map.pg_stat.end()) {
+ ss << "pg " << i << " does not exist; ";
+ r = -ENOENT;
+ continue;
+ }
+ auto state = it->second.state;
+ // discard pgs for which user requests are pointless
+ switch (actual_op) {
+ case OFR_RECOVERY:
+ if ((state & (PG_STATE_DEGRADED |
+ PG_STATE_RECOVERY_WAIT |
+ PG_STATE_RECOVERING)) == 0) {
+ // don't return error, user script may be racing with cluster.
+ // not fatal.
+ ss << "pg " << i << " doesn't require recovery; ";
+ continue;
+ } else if (state & PG_STATE_FORCED_RECOVERY) {
+ ss << "pg " << i << " recovery already forced; ";
+ // return error, as it may be a bug in user script
+ r = -EINVAL;
+ continue;
+ }
+ break;
+ case OFR_BACKFILL:
+ if ((state & (PG_STATE_DEGRADED |
+ PG_STATE_BACKFILL_WAIT |
+ PG_STATE_BACKFILLING)) == 0) {
+ ss << "pg " << i << " doesn't require backfilling; ";
+ continue;
+ } else if (state & PG_STATE_FORCED_BACKFILL) {
+ ss << "pg " << i << " backfill already forced; ";
+ r = -EINVAL;
+ continue;
+ }
+ break;
+ case OFR_BACKFILL | OFR_CANCEL:
+ if ((state & PG_STATE_FORCED_BACKFILL) == 0) {
+ ss << "pg " << i << " backfill not forced; ";
+ continue;
+ }
+ break;
+ case OFR_RECOVERY | OFR_CANCEL:
+ if ((state & PG_STATE_FORCED_RECOVERY) == 0) {
+ ss << "pg " << i << " recovery not forced; ";
+ continue;
+ }
+ break;
+ default:
+ ceph_abort_msg("actual_op value is not supported");
+ }
+ pgs.push_back(i);
+ } // for
+ });
+
+ // respond with error only when no pgs are correct
+ // yes, in case of mixed errors, only the last one will be emitted,
+ // but the message presented will be fine
+ if (pgs.size() != 0) {
+ // clear error to not confuse users/scripts
+ r = 0;
+ }
+
+ // optimize the command -> messages conversion, use only one
+ // message per distinct OSD
+ cluster_state.with_osdmap([&](const OSDMap& osdmap) {
+ // group pgs to process by osd
+ map<int, vector<spg_t>> osdpgs;
+ for (auto& pgid : pgs) {
+ int primary;
+ spg_t spg;
+ if (osdmap.get_primary_shard(pgid, &primary, &spg)) {
+ osdpgs[primary].push_back(spg);
+ }
+ }
+ for (auto& i : osdpgs) {
+ if (osdmap.is_up(i.first)) {
+ auto p = osd_cons.find(i.first);
+ if (p == osd_cons.end()) {
+ ss << "osd." << i.first << " is not currently connected";
+ r = -EAGAIN;
+ continue;
+ }
+ for (auto& con : p->second) {
+ con->send_message(
+ new MOSDForceRecovery(monc->get_fsid(), i.second, actual_op));
+ }
+ ss << "instructing pg(s) " << i.second << " on osd." << i.first
+ << " to " << forceop << "; ";
+ }
+ }
+ });
+ ss << std::endl;
+ cmdctx->reply(r, ss);
+ return true;
+ } else if (prefix == "config show" ||
+ prefix == "config show-with-defaults") {
+ string who;
+ cmd_getval(cmdctx->cmdmap, "who", who);
+ auto [key, valid] = DaemonKey::parse(who);
+ if (!valid) {
+ ss << "invalid daemon name: use <type>.<id>";
+ cmdctx->reply(-EINVAL, ss);
+ return true;
+ }
+ DaemonStatePtr daemon = daemon_state.get(key);
+ if (!daemon) {
+ ss << "no config state for daemon " << who;
+ cmdctx->reply(-ENOENT, ss);
+ return true;
+ }
+
+ std::lock_guard l(daemon->lock);
+
+ int r = 0;
+ string name;
+ if (cmd_getval(cmdctx->cmdmap, "key", name)) {
+ // handle special options
+ if (name == "fsid") {
+ cmdctx->odata.append(stringify(monc->get_fsid()) + "\n");
+ cmdctx->reply(r, ss);
+ return true;
+ }
+ auto p = daemon->config.find(name);
+ if (p != daemon->config.end() &&
+ !p->second.empty()) {
+ cmdctx->odata.append(p->second.rbegin()->second + "\n");
+ } else {
+ auto& defaults = daemon->_get_config_defaults();
+ auto q = defaults.find(name);
+ if (q != defaults.end()) {
+ cmdctx->odata.append(q->second + "\n");
+ } else {
+ r = -ENOENT;
+ }
+ }
+ } else if (daemon->config_defaults_bl.length() > 0) {
+ TextTable tbl;
+ if (f) {
+ f->open_array_section("config");
+ } else {
+ tbl.define_column("NAME", TextTable::LEFT, TextTable::LEFT);
+ tbl.define_column("VALUE", TextTable::LEFT, TextTable::LEFT);
+ tbl.define_column("SOURCE", TextTable::LEFT, TextTable::LEFT);
+ tbl.define_column("OVERRIDES", TextTable::LEFT, TextTable::LEFT);
+ tbl.define_column("IGNORES", TextTable::LEFT, TextTable::LEFT);
+ }
+ if (prefix == "config show") {
+ // show
+ for (auto& i : daemon->config) {
+ dout(20) << " " << i.first << " -> " << i.second << dendl;
+ if (i.second.empty()) {
+ continue;
+ }
+ if (f) {
+ f->open_object_section("value");
+ f->dump_string("name", i.first);
+ f->dump_string("value", i.second.rbegin()->second);
+ f->dump_string("source", ceph_conf_level_name(
+ i.second.rbegin()->first));
+ if (i.second.size() > 1) {
+ f->open_array_section("overrides");
+ auto j = i.second.rend();
+ for (--j; j != i.second.rbegin(); --j) {
+ f->open_object_section("value");
+ f->dump_string("source", ceph_conf_level_name(j->first));
+ f->dump_string("value", j->second);
+ f->close_section();
+ }
+ f->close_section();
+ }
+ if (daemon->ignored_mon_config.count(i.first)) {
+ f->dump_string("ignores", "mon");
+ }
+ f->close_section();
+ } else {
+ tbl << i.first;
+ tbl << i.second.rbegin()->second;
+ tbl << ceph_conf_level_name(i.second.rbegin()->first);
+ if (i.second.size() > 1) {
+ list<string> ov;
+ auto j = i.second.rend();
+ for (--j; j != i.second.rbegin(); --j) {
+ if (j->second == i.second.rbegin()->second) {
+ ov.push_front(string("(") + ceph_conf_level_name(j->first) +
+ string("[") + j->second + string("]") +
+ string(")"));
+ } else {
+ ov.push_front(ceph_conf_level_name(j->first) +
+ string("[") + j->second + string("]"));
+
+ }
+ }
+ tbl << ov;
+ } else {
+ tbl << "";
+ }
+ tbl << (daemon->ignored_mon_config.count(i.first) ? "mon" : "");
+ tbl << TextTable::endrow;
+ }
+ }
+ } else {
+ // show-with-defaults
+ auto& defaults = daemon->_get_config_defaults();
+ for (auto& i : defaults) {
+ if (f) {
+ f->open_object_section("value");
+ f->dump_string("name", i.first);
+ } else {
+ tbl << i.first;
+ }
+ auto j = daemon->config.find(i.first);
+ if (j != daemon->config.end() && !j->second.empty()) {
+ // have config
+ if (f) {
+ f->dump_string("value", j->second.rbegin()->second);
+ f->dump_string("source", ceph_conf_level_name(
+ j->second.rbegin()->first));
+ if (j->second.size() > 1) {
+ f->open_array_section("overrides");
+ auto k = j->second.rend();
+ for (--k; k != j->second.rbegin(); --k) {
+ f->open_object_section("value");
+ f->dump_string("source", ceph_conf_level_name(k->first));
+ f->dump_string("value", k->second);
+ f->close_section();
+ }
+ f->close_section();
+ }
+ if (daemon->ignored_mon_config.count(i.first)) {
+ f->dump_string("ignores", "mon");
+ }
+ f->close_section();
+ } else {
+ tbl << j->second.rbegin()->second;
+ tbl << ceph_conf_level_name(j->second.rbegin()->first);
+ if (j->second.size() > 1) {
+ list<string> ov;
+ auto k = j->second.rend();
+ for (--k; k != j->second.rbegin(); --k) {
+ if (k->second == j->second.rbegin()->second) {
+ ov.push_front(string("(") + ceph_conf_level_name(k->first) +
+ string("[") + k->second + string("]") +
+ string(")"));
+ } else {
+ ov.push_front(ceph_conf_level_name(k->first) +
+ string("[") + k->second + string("]"));
+ }
+ }
+ tbl << ov;
+ } else {
+ tbl << "";
+ }
+ tbl << (daemon->ignored_mon_config.count(i.first) ? "mon" : "");
+ tbl << TextTable::endrow;
+ }
+ } else {
+ // only have default
+ if (f) {
+ f->dump_string("value", i.second);
+ f->dump_string("source", ceph_conf_level_name(CONF_DEFAULT));
+ f->close_section();
+ } else {
+ tbl << i.second;
+ tbl << ceph_conf_level_name(CONF_DEFAULT);
+ tbl << "";
+ tbl << "";
+ tbl << TextTable::endrow;
+ }
+ }
+ }
+ }
+ if (f) {
+ f->close_section();
+ f->flush(cmdctx->odata);
+ } else {
+ cmdctx->odata.append(stringify(tbl));
+ }
+ }
+ cmdctx->reply(r, ss);
+ return true;
+ } else if (prefix == "device ls") {
+ set<string> devids;
+ TextTable tbl;
+ if (f) {
+ f->open_array_section("devices");
+ daemon_state.with_devices([&f](const DeviceState& dev) {
+ f->dump_object("device", dev);
+ });
+ f->close_section();
+ f->flush(cmdctx->odata);
+ } else {
+ tbl.define_column("DEVICE", TextTable::LEFT, TextTable::LEFT);
+ tbl.define_column("HOST:DEV", TextTable::LEFT, TextTable::LEFT);
+ tbl.define_column("DAEMONS", TextTable::LEFT, TextTable::LEFT);
+ tbl.define_column("WEAR", TextTable::RIGHT, TextTable::RIGHT);
+ tbl.define_column("LIFE EXPECTANCY", TextTable::LEFT, TextTable::LEFT);
+ auto now = ceph_clock_now();
+ daemon_state.with_devices([&tbl, now](const DeviceState& dev) {
+ string h;
+ for (auto& i : dev.attachments) {
+ if (h.size()) {
+ h += " ";
+ }
+ h += std::get<0>(i) + ":" + std::get<1>(i);
+ }
+ string d;
+ for (auto& i : dev.daemons) {
+ if (d.size()) {
+ d += " ";
+ }
+ d += to_string(i);
+ }
+ char wear_level_str[16] = {0};
+ if (dev.wear_level >= 0) {
+ snprintf(wear_level_str, sizeof(wear_level_str)-1, "%d%%",
+ (int)(100.1 * dev.wear_level));
+ }
+ tbl << dev.devid
+ << h
+ << d
+ << wear_level_str
+ << dev.get_life_expectancy_str(now)
+ << TextTable::endrow;
+ });
+ cmdctx->odata.append(stringify(tbl));
+ }
+ cmdctx->reply(0, ss);
+ return true;
+ } else if (prefix == "device ls-by-daemon") {
+ string who;
+ cmd_getval(cmdctx->cmdmap, "who", who);
+ if (auto [k, valid] = DaemonKey::parse(who); !valid) {
+ ss << who << " is not a valid daemon name";
+ r = -EINVAL;
+ } else {
+ auto dm = daemon_state.get(k);
+ if (dm) {
+ if (f) {
+ f->open_array_section("devices");
+ for (auto& i : dm->devices) {
+ daemon_state.with_device(i.first, [&f] (const DeviceState& dev) {
+ f->dump_object("device", dev);
+ });
+ }
+ f->close_section();
+ f->flush(cmdctx->odata);
+ } else {
+ TextTable tbl;
+ tbl.define_column("DEVICE", TextTable::LEFT, TextTable::LEFT);
+ tbl.define_column("HOST:DEV", TextTable::LEFT, TextTable::LEFT);
+ tbl.define_column("EXPECTED FAILURE", TextTable::LEFT,
+ TextTable::LEFT);
+ auto now = ceph_clock_now();
+ for (auto& i : dm->devices) {
+ daemon_state.with_device(
+ i.first, [&tbl, now] (const DeviceState& dev) {
+ string h;
+ for (auto& i : dev.attachments) {
+ if (h.size()) {
+ h += " ";
+ }
+ h += std::get<0>(i) + ":" + std::get<1>(i);
+ }
+ tbl << dev.devid
+ << h
+ << dev.get_life_expectancy_str(now)
+ << TextTable::endrow;
+ });
+ }
+ cmdctx->odata.append(stringify(tbl));
+ }
+ } else {
+ r = -ENOENT;
+ ss << "daemon " << who << " not found";
+ }
+ cmdctx->reply(r, ss);
+ }
+ } else if (prefix == "device ls-by-host") {
+ string host;
+ cmd_getval(cmdctx->cmdmap, "host", host);
+ set<string> devids;
+ daemon_state.list_devids_by_server(host, &devids);
+ if (f) {
+ f->open_array_section("devices");
+ for (auto& devid : devids) {
+ daemon_state.with_device(
+ devid, [&f] (const DeviceState& dev) {
+ f->dump_object("device", dev);
+ });
+ }
+ f->close_section();
+ f->flush(cmdctx->odata);
+ } else {
+ TextTable tbl;
+ tbl.define_column("DEVICE", TextTable::LEFT, TextTable::LEFT);
+ tbl.define_column("DEV", TextTable::LEFT, TextTable::LEFT);
+ tbl.define_column("DAEMONS", TextTable::LEFT, TextTable::LEFT);
+ tbl.define_column("EXPECTED FAILURE", TextTable::LEFT, TextTable::LEFT);
+ auto now = ceph_clock_now();
+ for (auto& devid : devids) {
+ daemon_state.with_device(
+ devid, [&tbl, &host, now] (const DeviceState& dev) {
+ string n;
+ for (auto& j : dev.attachments) {
+ if (std::get<0>(j) == host) {
+ if (n.size()) {
+ n += " ";
+ }
+ n += std::get<1>(j);
+ }
+ }
+ string d;
+ for (auto& i : dev.daemons) {
+ if (d.size()) {
+ d += " ";
+ }
+ d += to_string(i);
+ }
+ tbl << dev.devid
+ << n
+ << d
+ << dev.get_life_expectancy_str(now)
+ << TextTable::endrow;
+ });
+ }
+ cmdctx->odata.append(stringify(tbl));
+ }
+ cmdctx->reply(0, ss);
+ return true;
+ } else if (prefix == "device info") {
+ string devid;
+ cmd_getval(cmdctx->cmdmap, "devid", devid);
+ int r = 0;
+ ostringstream rs;
+ if (!daemon_state.with_device(devid,
+ [&f, &rs] (const DeviceState& dev) {
+ if (f) {
+ f->dump_object("device", dev);
+ } else {
+ dev.print(rs);
+ }
+ })) {
+ ss << "device " << devid << " not found";
+ r = -ENOENT;
+ } else {
+ if (f) {
+ f->flush(cmdctx->odata);
+ } else {
+ cmdctx->odata.append(rs.str());
+ }
+ }
+ cmdctx->reply(r, ss);
+ return true;
+ } else if (prefix == "device set-life-expectancy") {
+ string devid;
+ cmd_getval(cmdctx->cmdmap, "devid", devid);
+ string from_str, to_str;
+ cmd_getval(cmdctx->cmdmap, "from", from_str);
+ cmd_getval(cmdctx->cmdmap, "to", to_str);
+ utime_t from, to;
+ if (!from.parse(from_str)) {
+ ss << "unable to parse datetime '" << from_str << "'";
+ r = -EINVAL;
+ cmdctx->reply(r, ss);
+ } else if (to_str.size() && !to.parse(to_str)) {
+ ss << "unable to parse datetime '" << to_str << "'";
+ r = -EINVAL;
+ cmdctx->reply(r, ss);
+ } else {
+ map<string,string> meta;
+ daemon_state.with_device_create(
+ devid,
+ [from, to, &meta] (DeviceState& dev) {
+ dev.set_life_expectancy(from, to, ceph_clock_now());
+ meta = dev.metadata;
+ });
+ json_spirit::Object json_object;
+ for (auto& i : meta) {
+ json_spirit::Config::add(json_object, i.first, i.second);
+ }
+ bufferlist json;
+ json.append(json_spirit::write(json_object));
+ const string cmd =
+ "{"
+ "\"prefix\": \"config-key set\", "
+ "\"key\": \"device/" + devid + "\""
+ "}";
+ auto on_finish = new ReplyOnFinish(cmdctx);
+ monc->start_mon_command({cmd}, json, nullptr, nullptr, on_finish);
+ }
+ return true;
+ } else if (prefix == "device rm-life-expectancy") {
+ string devid;
+ cmd_getval(cmdctx->cmdmap, "devid", devid);
+ map<string,string> meta;
+ if (daemon_state.with_device_write(devid, [&meta] (DeviceState& dev) {
+ dev.rm_life_expectancy();
+ meta = dev.metadata;
+ })) {
+ string cmd;
+ bufferlist json;
+ if (meta.empty()) {
+ cmd =
+ "{"
+ "\"prefix\": \"config-key rm\", "
+ "\"key\": \"device/" + devid + "\""
+ "}";
+ } else {
+ json_spirit::Object json_object;
+ for (auto& i : meta) {
+ json_spirit::Config::add(json_object, i.first, i.second);
+ }
+ json.append(json_spirit::write(json_object));
+ cmd =
+ "{"
+ "\"prefix\": \"config-key set\", "
+ "\"key\": \"device/" + devid + "\""
+ "}";
+ }
+ auto on_finish = new ReplyOnFinish(cmdctx);
+ monc->start_mon_command({cmd}, json, nullptr, nullptr, on_finish);
+ } else {
+ cmdctx->reply(0, ss);
+ }
+ return true;
+ } else {
+ if (!pgmap_ready) {
+ ss << "Warning: due to ceph-mgr restart, some PG states may not be up to date\n";
+ }
+ if (f) {
+ f->open_object_section("pg_info");
+ f->dump_bool("pg_ready", pgmap_ready);
+ }
+
+ // fall back to feeding command to PGMap
+ r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
+ return process_pg_map_command(prefix, cmdctx->cmdmap, pg_map, osdmap,
+ f.get(), &ss, &cmdctx->odata);
+ });
+
+ if (f) {
+ f->close_section();
+ }
+ if (r != -EOPNOTSUPP) {
+ if (f) {
+ f->flush(cmdctx->odata);
+ }
+ cmdctx->reply(r, ss);
+ return true;
+ }
+ }
+
+ // Was the command unfound?
+ if (py_command.cmdstring.empty()) {
+ ss << "No handler found for '" << prefix << "'";
+ dout(4) << "No handler found for '" << prefix << "'" << dendl;
+ cmdctx->reply(-EINVAL, ss);
+ return true;
+ }
+
+ // Validate that the module is active
+ auto& mod_name = py_command.module_name;
+ if (!py_modules.is_module_active(mod_name)) {
+ ss << "Module '" << mod_name << "' is not enabled/loaded (required by "
+ "command '" << prefix << "'): use `ceph mgr module enable "
+ << mod_name << "` to enable it";
+ dout(4) << ss.str() << dendl;
+ cmdctx->reply(-EOPNOTSUPP, ss);
+ return true;
+ }
+
+ dout(10) << "passing through command '" << prefix << "' size " << cmdctx->cmdmap.size() << dendl;
+ Finisher& mod_finisher = py_modules.get_active_module_finisher(mod_name);
+ mod_finisher.queue(new LambdaContext([this, cmdctx, session, py_command, prefix]
+ (int r_) mutable {
+ std::stringstream ss;
+
+ dout(10) << "dispatching command '" << prefix << "' size " << cmdctx->cmdmap.size() << dendl;
+
+ // Validate that the module is enabled
+ auto& py_handler_name = py_command.module_name;
+ PyModuleRef module = py_modules.get_module(py_handler_name);
+ ceph_assert(module);
+ if (!module->is_enabled()) {
+ ss << "Module '" << py_handler_name << "' is not enabled (required by "
+ "command '" << prefix << "'): use `ceph mgr module enable "
+ << py_handler_name << "` to enable it";
+ dout(4) << ss.str() << dendl;
+ cmdctx->reply(-EOPNOTSUPP, ss);
+ return;
+ }
+
+ // Hack: allow the self-test method to run on unhealthy modules.
+ // Fix this in future by creating a special path for self test rather
+ // than having the hook be a normal module command.
+ std::string self_test_prefix = py_handler_name + " " + "self-test";
+
+ // Validate that the module is healthy
+ bool accept_command;
+ if (module->is_loaded()) {
+ if (module->get_can_run() && !module->is_failed()) {
+ // Healthy module
+ accept_command = true;
+ } else if (self_test_prefix == prefix) {
+ // Unhealthy, but allow because it's a self test command
+ accept_command = true;
+ } else {
+ accept_command = false;
+ ss << "Module '" << py_handler_name << "' has experienced an error and "
+ "cannot handle commands: " << module->get_error_string();
+ }
+ } else {
+ // Module not loaded
+ accept_command = false;
+ ss << "Module '" << py_handler_name << "' failed to load and "
+ "cannot handle commands: " << module->get_error_string();
+ }
+
+ if (!accept_command) {
+ dout(4) << ss.str() << dendl;
+ cmdctx->reply(-EIO, ss);
+ return;
+ }
+
+ std::stringstream ds;
+ bufferlist inbl = cmdctx->data;
+ int r = py_modules.handle_command(py_command, *session, cmdctx->cmdmap,
+ inbl, &ds, &ss);
+ if (r == -EACCES) {
+ log_access_denied(cmdctx, session, ss);
+ }
+
+ cmdctx->odata.append(ds);
+ cmdctx->reply(r, ss);
+ dout(10) << " command returned " << r << dendl;
+ }));
+ return true;
+}
+
+void DaemonServer::_prune_pending_service_map()
+{
+ utime_t cutoff = ceph_clock_now();
+ cutoff -= g_conf().get_val<double>("mgr_service_beacon_grace");
+ auto p = pending_service_map.services.begin();
+ while (p != pending_service_map.services.end()) {
+ auto q = p->second.daemons.begin();
+ while (q != p->second.daemons.end()) {
+ DaemonKey key{p->first, q->first};
+ if (!daemon_state.exists(key)) {
+ if (ServiceMap::is_normal_ceph_entity(p->first)) {
+ dout(10) << "daemon " << key << " in service map but not in daemon state "
+ << "index -- force pruning" << dendl;
+ q = p->second.daemons.erase(q);
+ pending_service_map_dirty = pending_service_map.epoch;
+ } else {
+ derr << "missing key " << key << dendl;
+ ++q;
+ }
+
+ continue;
+ }
+
+ auto daemon = daemon_state.get(key);
+ std::lock_guard l(daemon->lock);
+ if (daemon->last_service_beacon == utime_t()) {
+ // we must have just restarted; assume they are alive now.
+ daemon->last_service_beacon = ceph_clock_now();
+ ++q;
+ continue;
+ }
+ if (daemon->last_service_beacon < cutoff) {
+ dout(10) << "pruning stale " << p->first << "." << q->first
+ << " last_beacon " << daemon->last_service_beacon << dendl;
+ q = p->second.daemons.erase(q);
+ pending_service_map_dirty = pending_service_map.epoch;
+ } else {
+ ++q;
+ }
+ }
+ if (p->second.daemons.empty()) {
+ p = pending_service_map.services.erase(p);
+ pending_service_map_dirty = pending_service_map.epoch;
+ } else {
+ ++p;
+ }
+ }
+}
+
+void DaemonServer::send_report()
+{
+ if (!pgmap_ready) {
+ if (ceph_clock_now() - started_at > g_conf().get_val<int64_t>("mgr_stats_period") * 4.0) {
+ pgmap_ready = true;
+ reported_osds.clear();
+ dout(1) << "Giving up on OSDs that haven't reported yet, sending "
+ << "potentially incomplete PG state to mon" << dendl;
+ } else {
+ dout(1) << "Not sending PG status to monitor yet, waiting for OSDs"
+ << dendl;
+ return;
+ }
+ }
+
+ auto m = ceph::make_message<MMonMgrReport>();
+ m->gid = monc->get_global_id();
+ py_modules.get_health_checks(&m->health_checks);
+ py_modules.get_progress_events(&m->progress_events);
+
+ cluster_state.with_mutable_pgmap([&](PGMap& pg_map) {
+ cluster_state.update_delta_stats();
+
+ if (pending_service_map.epoch) {
+ _prune_pending_service_map();
+ if (pending_service_map_dirty >= pending_service_map.epoch) {
+ pending_service_map.modified = ceph_clock_now();
+ encode(pending_service_map, m->service_map_bl, CEPH_FEATURES_ALL);
+ dout(10) << "sending service_map e" << pending_service_map.epoch
+ << dendl;
+ pending_service_map.epoch++;
+ }
+ }
+
+ cluster_state.with_osdmap([&](const OSDMap& osdmap) {
+ // FIXME: no easy way to get mon features here. this will do for
+ // now, though, as long as we don't make a backward-incompat change.
+ pg_map.encode_digest(osdmap, m->get_data(), CEPH_FEATURES_ALL);
+ dout(10) << pg_map << dendl;
+
+ pg_map.get_health_checks(g_ceph_context, osdmap,
+ &m->health_checks);
+
+ dout(10) << m->health_checks.checks.size() << " health checks"
+ << dendl;
+ dout(20) << "health checks:\n";
+ JSONFormatter jf(true);
+ jf.dump_object("health_checks", m->health_checks);
+ jf.flush(*_dout);
+ *_dout << dendl;
+ if (osdmap.require_osd_release >= ceph_release_t::luminous) {
+ clog->debug() << "pgmap v" << pg_map.version << ": " << pg_map;
+ }
+ });
+ });
+
+ map<daemon_metric, unique_ptr<DaemonHealthMetricCollector>> accumulated;
+ for (auto service : {"osd", "mon"} ) {
+ auto daemons = daemon_state.get_by_service(service);
+ for (const auto& [key,state] : daemons) {
+ std::lock_guard l{state->lock};
+ for (const auto& metric : state->daemon_health_metrics) {
+ auto acc = accumulated.find(metric.get_type());
+ if (acc == accumulated.end()) {
+ auto collector = DaemonHealthMetricCollector::create(metric.get_type());
+ if (!collector) {
+ derr << __func__ << " " << key
+ << " sent me an unknown health metric: "
+ << std::hex << static_cast<uint8_t>(metric.get_type())
+ << std::dec << dendl;
+ continue;
+ }
+ tie(acc, std::ignore) = accumulated.emplace(metric.get_type(),
+ std::move(collector));
+ }
+ acc->second->update(key, metric);
+ }
+ }
+ }
+ for (const auto& acc : accumulated) {
+ acc.second->summarize(m->health_checks);
+ }
+ // TODO? We currently do not notify the PyModules
+ // TODO: respect needs_send, so we send the report only if we are asked to do
+ // so, or the state is updated.
+ monc->send_mon_message(std::move(m));
+}
+
+void DaemonServer::adjust_pgs()
+{
+ dout(20) << dendl;
+ unsigned max = std::max<int64_t>(1, g_conf()->mon_osd_max_creating_pgs);
+ double max_misplaced = g_conf().get_val<double>("target_max_misplaced_ratio");
+ bool aggro = g_conf().get_val<bool>("mgr_debug_aggressive_pg_num_changes");
+
+ map<string,unsigned> pg_num_to_set;
+ map<string,unsigned> pgp_num_to_set;
+ set<pg_t> upmaps_to_clear;
+ cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
+ unsigned creating_or_unknown = 0;
+ for (auto& i : pg_map.num_pg_by_state) {
+ if ((i.first & (PG_STATE_CREATING)) ||
+ i.first == 0) {
+ creating_or_unknown += i.second;
+ }
+ }
+ unsigned left = max;
+ if (creating_or_unknown >= max) {
+ return;
+ }
+ left -= creating_or_unknown;
+ dout(10) << "creating_or_unknown " << creating_or_unknown
+ << " max_creating " << max
+ << " left " << left
+ << dendl;
+
+ // FIXME: These checks are fundamentally racy given that adjust_pgs()
+ // can run more frequently than we get updated pg stats from OSDs. We
+ // may make multiple adjustments with stale informaiton.
+ double misplaced_ratio, degraded_ratio;
+ double inactive_pgs_ratio, unknown_pgs_ratio;
+ pg_map.get_recovery_stats(&misplaced_ratio, &degraded_ratio,
+ &inactive_pgs_ratio, &unknown_pgs_ratio);
+ dout(20) << "misplaced_ratio " << misplaced_ratio
+ << " degraded_ratio " << degraded_ratio
+ << " inactive_pgs_ratio " << inactive_pgs_ratio
+ << " unknown_pgs_ratio " << unknown_pgs_ratio
+ << "; target_max_misplaced_ratio " << max_misplaced
+ << dendl;
+
+ for (auto& i : osdmap.get_pools()) {
+ const pg_pool_t& p = i.second;
+
+ // adjust pg_num?
+ if (p.get_pg_num_target() != p.get_pg_num()) {
+ dout(20) << "pool " << i.first
+ << " pg_num " << p.get_pg_num()
+ << " target " << p.get_pg_num_target()
+ << dendl;
+ if (p.has_flag(pg_pool_t::FLAG_CREATING)) {
+ dout(10) << "pool " << i.first
+ << " pg_num_target " << p.get_pg_num_target()
+ << " pg_num " << p.get_pg_num()
+ << " - still creating initial pgs"
+ << dendl;
+ } else if (p.get_pg_num_target() < p.get_pg_num()) {
+ // pg_num decrease (merge)
+ pg_t merge_source(p.get_pg_num() - 1, i.first);
+ pg_t merge_target = merge_source.get_parent();
+ bool ok = true;
+
+ if (p.get_pg_num() != p.get_pg_num_pending()) {
+ dout(10) << "pool " << i.first
+ << " pg_num_target " << p.get_pg_num_target()
+ << " pg_num " << p.get_pg_num()
+ << " - decrease and pg_num_pending != pg_num, waiting"
+ << dendl;
+ ok = false;
+ } else if (p.get_pg_num() == p.get_pgp_num()) {
+ dout(10) << "pool " << i.first
+ << " pg_num_target " << p.get_pg_num_target()
+ << " pg_num " << p.get_pg_num()
+ << " - decrease blocked by pgp_num "
+ << p.get_pgp_num()
+ << dendl;
+ ok = false;
+ }
+ vector<int32_t> source_acting;
+ for (auto &merge_participant : {merge_source, merge_target}) {
+ bool is_merge_source = merge_participant == merge_source;
+ if (osdmap.have_pg_upmaps(merge_participant)) {
+ dout(10) << "pool " << i.first
+ << " pg_num_target " << p.get_pg_num_target()
+ << " pg_num " << p.get_pg_num()
+ << (is_merge_source ? " - merge source " : " - merge target ")
+ << merge_participant
+ << " has upmap" << dendl;
+ upmaps_to_clear.insert(merge_participant);
+ ok = false;
+ }
+ auto q = pg_map.pg_stat.find(merge_participant);
+ if (q == pg_map.pg_stat.end()) {
+ dout(10) << "pool " << i.first
+ << " pg_num_target " << p.get_pg_num_target()
+ << " pg_num " << p.get_pg_num()
+ << " - no state for " << merge_participant
+ << (is_merge_source ? " (merge source)" : " (merge target)")
+ << dendl;
+ ok = false;
+ } else if ((q->second.state & (PG_STATE_ACTIVE | PG_STATE_CLEAN)) !=
+ (PG_STATE_ACTIVE | PG_STATE_CLEAN)) {
+ dout(10) << "pool " << i.first
+ << " pg_num_target " << p.get_pg_num_target()
+ << " pg_num " << p.get_pg_num()
+ << (is_merge_source ? " - merge source " : " - merge target ")
+ << merge_participant
+ << " not clean (" << pg_state_string(q->second.state)
+ << ")" << dendl;
+ ok = false;
+ }
+ if (is_merge_source) {
+ source_acting = q->second.acting;
+ } else if (ok && q->second.acting != source_acting) {
+ dout(10) << "pool " << i.first
+ << " pg_num_target " << p.get_pg_num_target()
+ << " pg_num " << p.get_pg_num()
+ << (is_merge_source ? " - merge source " : " - merge target ")
+ << merge_participant
+ << " acting does not match (source " << source_acting
+ << " != target " << q->second.acting
+ << ")" << dendl;
+ ok = false;
+ }
+ }
+
+ if (ok) {
+ unsigned target = p.get_pg_num() - 1;
+ dout(10) << "pool " << i.first
+ << " pg_num_target " << p.get_pg_num_target()
+ << " pg_num " << p.get_pg_num()
+ << " -> " << target
+ << " (merging " << merge_source
+ << " and " << merge_target
+ << ")" << dendl;
+ pg_num_to_set[osdmap.get_pool_name(i.first)] = target;
+ continue;
+ }
+ } else if (p.get_pg_num_target() > p.get_pg_num()) {
+ // pg_num increase (split)
+ bool active = true;
+ auto q = pg_map.num_pg_by_pool_state.find(i.first);
+ if (q != pg_map.num_pg_by_pool_state.end()) {
+ for (auto& j : q->second) {
+ if ((j.first & (PG_STATE_ACTIVE|PG_STATE_PEERED)) == 0) {
+ dout(20) << "pool " << i.first << " has " << j.second
+ << " pgs in " << pg_state_string(j.first)
+ << dendl;
+ active = false;
+ break;
+ }
+ }
+ } else {
+ active = false;
+ }
+ unsigned pg_gap = p.get_pg_num() - p.get_pgp_num();
+ unsigned max_jump = cct->_conf->mgr_max_pg_num_change;
+ if (!active) {
+ dout(10) << "pool " << i.first
+ << " pg_num_target " << p.get_pg_num_target()
+ << " pg_num " << p.get_pg_num()
+ << " - not all pgs active"
+ << dendl;
+ } else if (pg_gap >= max_jump) {
+ dout(10) << "pool " << i.first
+ << " pg_num " << p.get_pg_num()
+ << " - pgp_num " << p.get_pgp_num()
+ << " gap >= max_pg_num_change " << max_jump
+ << " - must scale pgp_num first"
+ << dendl;
+ } else {
+ unsigned add = std::min(
+ std::min(left, max_jump - pg_gap),
+ p.get_pg_num_target() - p.get_pg_num());
+ unsigned target = p.get_pg_num() + add;
+ left -= add;
+ dout(10) << "pool " << i.first
+ << " pg_num_target " << p.get_pg_num_target()
+ << " pg_num " << p.get_pg_num()
+ << " -> " << target << dendl;
+ pg_num_to_set[osdmap.get_pool_name(i.first)] = target;
+ }
+ }
+ }
+
+ // adjust pgp_num?
+ unsigned target = std::min(p.get_pg_num_pending(),
+ p.get_pgp_num_target());
+ if (target != p.get_pgp_num()) {
+ dout(20) << "pool " << i.first
+ << " pgp_num_target " << p.get_pgp_num_target()
+ << " pgp_num " << p.get_pgp_num()
+ << " -> " << target << dendl;
+ if (target > p.get_pgp_num() &&
+ p.get_pgp_num() == p.get_pg_num()) {
+ dout(10) << "pool " << i.first
+ << " pgp_num_target " << p.get_pgp_num_target()
+ << " pgp_num " << p.get_pgp_num()
+ << " - increase blocked by pg_num " << p.get_pg_num()
+ << dendl;
+ } else if (!aggro && (inactive_pgs_ratio > 0 ||
+ degraded_ratio > 0 ||
+ unknown_pgs_ratio > 0)) {
+ dout(10) << "pool " << i.first
+ << " pgp_num_target " << p.get_pgp_num_target()
+ << " pgp_num " << p.get_pgp_num()
+ << " - inactive|degraded|unknown pgs, deferring pgp_num"
+ << " update" << dendl;
+ } else if (!aggro && (misplaced_ratio > max_misplaced)) {
+ dout(10) << "pool " << i.first
+ << " pgp_num_target " << p.get_pgp_num_target()
+ << " pgp_num " << p.get_pgp_num()
+ << " - misplaced_ratio " << misplaced_ratio
+ << " > max " << max_misplaced
+ << ", deferring pgp_num update" << dendl;
+ } else {
+ // NOTE: this calculation assumes objects are
+ // basically uniformly distributed across all PGs
+ // (regardless of pool), which is probably not
+ // perfectly correct, but it's a start. make no
+ // single adjustment that's more than half of the
+ // max_misplaced, to somewhat limit the magnitude of
+ // our potential error here.
+ unsigned next;
+ static constexpr unsigned MAX_NUM_OBJECTS_PER_PG_FOR_LEAP = 1;
+ pool_stat_t s = pg_map.get_pg_pool_sum_stat(i.first);
+ if (aggro ||
+ // pool is (virtually) empty; just jump to final pgp_num?
+ (p.get_pgp_num_target() > p.get_pgp_num() &&
+ s.stats.sum.num_objects <= (MAX_NUM_OBJECTS_PER_PG_FOR_LEAP *
+ p.get_pgp_num_target()))) {
+ next = target;
+ } else {
+ double room =
+ std::min<double>(max_misplaced - misplaced_ratio,
+ max_misplaced / 2.0);
+ unsigned estmax = std::max<unsigned>(
+ (double)p.get_pg_num() * room, 1u);
+ unsigned next_min = 0;
+ if (p.get_pgp_num() > estmax) {
+ next_min = p.get_pgp_num() - estmax;
+ }
+ next = std::clamp(target,
+ next_min,
+ p.get_pgp_num() + estmax);
+ dout(20) << " room " << room << " estmax " << estmax
+ << " delta " << (target-p.get_pgp_num())
+ << " next " << next << dendl;
+ if (p.get_pgp_num_target() == p.get_pg_num_target() &&
+ p.get_pgp_num_target() < p.get_pg_num()) {
+ // since pgp_num is tracking pg_num, ceph is handling
+ // pgp_num. so, be responsible: don't let pgp_num get
+ // too far out ahead of merges (if we are merging).
+ // this avoids moving lots of unmerged pgs onto a
+ // small number of OSDs where we might blow out the
+ // per-osd pg max.
+ unsigned max_outpace_merges =
+ std::max<unsigned>(8, p.get_pg_num() * max_misplaced);
+ if (next + max_outpace_merges < p.get_pg_num()) {
+ next = p.get_pg_num() - max_outpace_merges;
+ dout(10) << " using next " << next
+ << " to avoid outpacing merges (max_outpace_merges "
+ << max_outpace_merges << ")" << dendl;
+ }
+ }
+ }
+ if (next != p.get_pgp_num()) {
+ dout(10) << "pool " << i.first
+ << " pgp_num_target " << p.get_pgp_num_target()
+ << " pgp_num " << p.get_pgp_num()
+ << " -> " << next << dendl;
+ pgp_num_to_set[osdmap.get_pool_name(i.first)] = next;
+ }
+ }
+ }
+ if (left == 0) {
+ return;
+ }
+ }
+ });
+ for (auto i : pg_num_to_set) {
+ const string cmd =
+ "{"
+ "\"prefix\": \"osd pool set\", "
+ "\"pool\": \"" + i.first + "\", "
+ "\"var\": \"pg_num_actual\", "
+ "\"val\": \"" + stringify(i.second) + "\""
+ "}";
+ monc->start_mon_command({cmd}, {}, nullptr, nullptr, nullptr);
+ }
+ for (auto i : pgp_num_to_set) {
+ const string cmd =
+ "{"
+ "\"prefix\": \"osd pool set\", "
+ "\"pool\": \"" + i.first + "\", "
+ "\"var\": \"pgp_num_actual\", "
+ "\"val\": \"" + stringify(i.second) + "\""
+ "}";
+ monc->start_mon_command({cmd}, {}, nullptr, nullptr, nullptr);
+ }
+ for (auto pg : upmaps_to_clear) {
+ const string cmd =
+ "{"
+ "\"prefix\": \"osd rm-pg-upmap\", "
+ "\"pgid\": \"" + stringify(pg) + "\""
+ "}";
+ monc->start_mon_command({cmd}, {}, nullptr, nullptr, nullptr);
+ const string cmd2 =
+ "{"
+ "\"prefix\": \"osd rm-pg-upmap-items\", "
+ "\"pgid\": \"" + stringify(pg) + "\"" +
+ "}";
+ monc->start_mon_command({cmd2}, {}, nullptr, nullptr, nullptr);
+ }
+}
+
+void DaemonServer::got_service_map()
+{
+ std::lock_guard l(lock);
+
+ cluster_state.with_servicemap([&](const ServiceMap& service_map) {
+ if (pending_service_map.epoch == 0) {
+ // we just started up
+ dout(10) << "got initial map e" << service_map.epoch << dendl;
+ ceph_assert(pending_service_map_dirty == 0);
+ pending_service_map = service_map;
+ pending_service_map.epoch = service_map.epoch + 1;
+ } else if (pending_service_map.epoch <= service_map.epoch) {
+ // we just started up but got one more not our own map
+ dout(10) << "got newer initial map e" << service_map.epoch << dendl;
+ ceph_assert(pending_service_map_dirty == 0);
+ pending_service_map = service_map;
+ pending_service_map.epoch = service_map.epoch + 1;
+ } else {
+ // we already active and therefore must have persisted it,
+ // which means ours is the same or newer.
+ dout(10) << "got updated map e" << service_map.epoch << dendl;
+ }
+ });
+
+ // cull missing daemons, populate new ones
+ std::set<std::string> types;
+ for (auto& [type, service] : pending_service_map.services) {
+ if (ServiceMap::is_normal_ceph_entity(type)) {
+ continue;
+ }
+
+ types.insert(type);
+
+ std::set<std::string> names;
+ for (auto& q : service.daemons) {
+ names.insert(q.first);
+ DaemonKey key{type, q.first};
+ if (!daemon_state.exists(key)) {
+ auto daemon = std::make_shared<DaemonState>(daemon_state.types);
+ daemon->key = key;
+ daemon->set_metadata(q.second.metadata);
+ daemon->service_daemon = true;
+ daemon_state.insert(daemon);
+ dout(10) << "added missing " << key << dendl;
+ }
+ }
+ daemon_state.cull(type, names);
+ }
+ daemon_state.cull_services(types);
+}
+
+void DaemonServer::got_mgr_map()
+{
+ std::lock_guard l(lock);
+ set<std::string> have;
+ cluster_state.with_mgrmap([&](const MgrMap& mgrmap) {
+ auto md_update = [&] (DaemonKey key) {
+ std::ostringstream oss;
+ auto c = new MetadataUpdate(daemon_state, key);
+ // FIXME remove post-nautilus: include 'id' for luminous mons
+ oss << "{\"prefix\": \"mgr metadata\", \"who\": \""
+ << key.name << "\", \"id\": \"" << key.name << "\"}";
+ monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
+ };
+ if (mgrmap.active_name.size()) {
+ DaemonKey key{"mgr", mgrmap.active_name};
+ have.insert(mgrmap.active_name);
+ if (!daemon_state.exists(key) && !daemon_state.is_updating(key)) {
+ md_update(key);
+ dout(10) << "triggered addition of " << key << " via metadata update" << dendl;
+ }
+ }
+ for (auto& i : mgrmap.standbys) {
+ DaemonKey key{"mgr", i.second.name};
+ have.insert(i.second.name);
+ if (!daemon_state.exists(key) && !daemon_state.is_updating(key)) {
+ md_update(key);
+ dout(10) << "triggered addition of " << key << " via metadata update" << dendl;
+ }
+ }
+ });
+ daemon_state.cull("mgr", have);
+}
+
+const char** DaemonServer::get_tracked_conf_keys() const
+{
+ static const char *KEYS[] = {
+ "mgr_stats_threshold",
+ "mgr_stats_period",
+ nullptr
+ };
+
+ return KEYS;
+}
+
+void DaemonServer::handle_conf_change(const ConfigProxy& conf,
+ const std::set <std::string> &changed)
+{
+
+ if (changed.count("mgr_stats_threshold") || changed.count("mgr_stats_period")) {
+ dout(4) << "Updating stats threshold/period on "
+ << daemon_connections.size() << " clients" << dendl;
+ // Send a fresh MMgrConfigure to all clients, so that they can follow
+ // the new policy for transmitting stats
+ finisher.queue(new LambdaContext([this](int r) {
+ std::lock_guard l(lock);
+ for (auto &c : daemon_connections) {
+ _send_configure(c);
+ }
+ }));
+ }
+}
+
+void DaemonServer::_send_configure(ConnectionRef c)
+{
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
+
+ auto configure = make_message<MMgrConfigure>();
+ configure->stats_period = g_conf().get_val<int64_t>("mgr_stats_period");
+ configure->stats_threshold = g_conf().get_val<int64_t>("mgr_stats_threshold");
+
+ if (c->peer_is_osd()) {
+ configure->osd_perf_metric_queries =
+ osd_perf_metric_collector.get_queries();
+ } else if (c->peer_is_mds()) {
+ configure->metric_config_message =
+ MetricConfigMessage(MDSConfigPayload(mds_perf_metric_collector.get_queries()));
+ }
+
+ c->send_message2(configure);
+}
+
+MetricQueryID DaemonServer::add_osd_perf_query(
+ const OSDPerfMetricQuery &query,
+ const std::optional<OSDPerfMetricLimit> &limit)
+{
+ return osd_perf_metric_collector.add_query(query, limit);
+}
+
+int DaemonServer::remove_osd_perf_query(MetricQueryID query_id)
+{
+ return osd_perf_metric_collector.remove_query(query_id);
+}
+
+int DaemonServer::get_osd_perf_counters(OSDPerfCollector *collector)
+{
+ return osd_perf_metric_collector.get_counters(collector);
+}
+
+MetricQueryID DaemonServer::add_mds_perf_query(
+ const MDSPerfMetricQuery &query,
+ const std::optional<MDSPerfMetricLimit> &limit)
+{
+ return mds_perf_metric_collector.add_query(query, limit);
+}
+
+int DaemonServer::remove_mds_perf_query(MetricQueryID query_id)
+{
+ return mds_perf_metric_collector.remove_query(query_id);
+}
+
+void DaemonServer::reregister_mds_perf_queries()
+{
+ mds_perf_metric_collector.reregister_queries();
+}
+
+int DaemonServer::get_mds_perf_counters(MDSPerfCollector *collector)
+{
+ return mds_perf_metric_collector.get_counters(collector);
+}