// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2004-2006 Sage Weil * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. * */ #include #include #include #include #include #include #include #include #include #include "json_spirit/json_spirit_reader.h" #include "json_spirit/json_spirit_writer.h" #include "Monitor.h" #include "common/version.h" #include "common/blkdev.h" #include "common/cmdparse.h" #include "common/signal.h" #include "osd/OSDMap.h" #include "MonitorDBStore.h" #include "messages/PaxosServiceMessage.h" #include "messages/MMonMap.h" #include "messages/MMonGetMap.h" #include "messages/MMonGetVersion.h" #include "messages/MMonGetVersionReply.h" #include "messages/MGenericMessage.h" #include "messages/MMonCommand.h" #include "messages/MMonCommandAck.h" #include "messages/MMonMetadata.h" #include "messages/MMonSync.h" #include "messages/MMonScrub.h" #include "messages/MMonProbe.h" #include "messages/MMonJoin.h" #include "messages/MMonPaxos.h" #include "messages/MRoute.h" #include "messages/MForward.h" #include "messages/MMonSubscribe.h" #include "messages/MMonSubscribeAck.h" #include "messages/MAuthReply.h" #include "messages/MTimeCheck2.h" #include "messages/MPing.h" #include "common/strtol.h" #include "common/ceph_argparse.h" #include "common/Timer.h" #include "common/Clock.h" #include "common/errno.h" #include "common/perf_counters.h" #include "common/admin_socket.h" #include "global/signal_handler.h" #include "common/Formatter.h" #include "include/stringify.h" #include "include/color.h" #include "include/ceph_fs.h" #include "include/str_list.h" #include "OSDMonitor.h" #include "MDSMonitor.h" #include "MonmapMonitor.h" #include "LogMonitor.h" #include "AuthMonitor.h" #include "MgrMonitor.h" 
#include "MgrStatMonitor.h"
#include "ConfigMonitor.h"
#include "mon/QuorumService.h"
#include "mon/HealthMonitor.h"
#include "mon/ConfigKeyService.h"
#include "common/config.h"
#include "common/cmdparse.h"
#include "include/ceph_assert.h"
#include "include/compat.h"
#include "perfglue/heap_profiler.h"
#include "auth/none/AuthNoneClientHandler.h"

// NOTE(review): this copy of the file appears to have lost text that sat
// inside angle brackets: some `#include` lines have no target, and several
// template argument lists are missing (e.g. `boost::scoped_ptr f`,
// `std::make_shared()`, `map log_to_monitors`, `std::set &changed`,
// `get_val("...")`). Compare against upstream before building.

#define dout_subsys ceph_subsys_mon
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)
// Prefix for every dout() line in this file:
//   "mon.<name>@<rank>(<state name>) e<monmap epoch> "
static ostream& _prefix(std::ostream *_dout, const Monitor *mon) {
  return *_dout << "mon." << mon->name << "@" << mon->rank
                << "(" << mon->get_state_name() << ") e"
                << mon->monmap->get_epoch() << " ";
}

// Store prefix under which monitor-level records ("joined",
// "cluster_fingerprint", the CompatSet at COMPAT_SET_LOC) are kept below.
const string Monitor::MONITOR_NAME = "monitor";
const string Monitor::MONITOR_STORE_PREFIX = "monitor_store";

// Build the static command table: each COMMAND / COMMAND_WITH_FLAG entry in
// the included command list expands to one MonCommand initializer.
#undef FLAG
#undef COMMAND
#undef COMMAND_WITH_FLAG
#define FLAG(f) (MonCommand::FLAG_##f)
#define COMMAND(parsesig, helptext, modulename, req_perms) \
  {parsesig, helptext, modulename, req_perms, FLAG(NONE)},
#define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, flags) \
  {parsesig, helptext, modulename, req_perms, flags},
MonCommand mon_commands[] = {
// NOTE(review): the include target is missing in this copy; presumably the
// monitor command-list header. Verify against upstream.
#include 
};
#undef COMMAND
#undef COMMAND_WITH_FLAG

// Completion wrapper: silently drops the callback once the monitor has shut
// down; otherwise runs the wrapped function.
void C_MonContext::finish(int r) {
  if (mon->is_shutdown())
    return;
  FunctionContext::finish(r);
}

// Construct a monitor named `nm` backed by store `s`.
// - `m` is the main messenger and `mgr_m` the messenger used for the mgr
//   client.
// - `map` is the initial monmap.
// - auth_cluster_required / auth_service_required take `auth_supported` when
//   it is non-empty, and their specific option otherwise.
Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
                 Messenger *m, Messenger *mgr_m, MonMap *map) :
  Dispatcher(cct_),
  AuthServer(cct_),
  name(nm),
  rank(-1),
  messenger(m),
  con_self(m ? m->get_loopback_connection() : NULL),
  lock("Monitor::lock"),
  timer(cct_, lock),
  finisher(cct_, "mon_finisher", "fin"),
  cpu_tp(cct, "Monitor::cpu_tp", "cpu_tp", g_conf()->mon_cpu_threads),
  has_ever_joined(false),
  logger(NULL), cluster_logger(NULL), cluster_logger_registered(false),
  monmap(map),
  log_client(cct_, messenger, monmap, LogClient::FLAG_MON),
  key_server(cct, &keyring),
  auth_cluster_required(cct,
                        cct->_conf->auth_supported.empty() ?
                        cct->_conf->auth_cluster_required : cct->_conf->auth_supported),
  auth_service_required(cct,
                        cct->_conf->auth_supported.empty() ?
                        cct->_conf->auth_service_required : cct->_conf->auth_supported),
  mgr_messenger(mgr_m),
  mgr_client(cct_, mgr_m),
  gss_ktfile_client(cct->_conf.get_val("gss_ktab_client_file")),
  store(s),
  elector(this),
  required_features(0),
  leader(0),
  quorum_con_features(0),
  // scrub
  scrub_version(0),
  scrub_event(NULL),
  scrub_timeout_event(NULL),
  // sync state
  sync_provider_count(0),
  sync_cookie(0),
  sync_full(false),
  sync_start_version(0),
  sync_timeout_event(NULL),
  sync_last_committed_floor(0),
  timecheck_round(0),
  timecheck_acks(0),
  timecheck_rounds_since_clean(0),
  timecheck_event(NULL),
  paxos_service(PAXOS_NUM),
  admin_hook(NULL),
  routed_request_tid(0),
  op_tracker(cct, g_conf().get_val("mon_enable_op_tracker"), 1)
{
  clog = log_client.create_channel(CLOG_CHANNEL_CLUSTER);
  audit_clog = log_client.create_channel(CLOG_CHANNEL_AUDIT);

  update_log_clients();

  if (!gss_ktfile_client.empty()) {
    // Assert we can export environment variable
    /*
        The default client keytab is used, if it is present and readable,
        to automatically obtain initial credentials for GSSAPI client
        applications. The principal name of the first entry in the client
        keytab is used by default when obtaining initial credentials.
        1. The KRB5_CLIENT_KTNAME environment variable.
        2. The default_client_keytab_name profile variable in [libdefaults].
        3. The hardcoded default, DEFCKTNAME.
    */
    const int32_t set_result(setenv("KRB5_CLIENT_KTNAME",
                                    gss_ktfile_client.c_str(), 1));
    ceph_assert(set_result == 0);
  }

  // Configure the op tracker's slow-op complaint threshold and history
  // retention from the mon_op_* options.
  op_tracker.set_complaint_and_threshold(
      g_conf().get_val("mon_op_complaint_time").count(),
      g_conf().get_val("mon_op_log_threshold"));
  op_tracker.set_history_size_and_duration(
      g_conf().get_val("mon_op_history_size"),
      g_conf().get_val("mon_op_history_duration").count());
  op_tracker.set_history_slow_op_size_and_threshold(
      g_conf().get_val("mon_op_history_slow_op_size"),
      g_conf().get_val("mon_op_history_slow_op_threshold").count());

  // One Paxos instance shared by every PaxosService below; both are released
  // in ~Monitor().
  paxos = new Paxos(this, "paxos");

  paxos_service[PAXOS_MDSMAP].reset(new MDSMonitor(this, paxos, "mdsmap"));
  paxos_service[PAXOS_MONMAP].reset(new MonmapMonitor(this, paxos, "monmap"));
  paxos_service[PAXOS_OSDMAP].reset(new OSDMonitor(cct, this, paxos, "osdmap"));
  paxos_service[PAXOS_LOG].reset(new LogMonitor(this, paxos, "logm"));
  paxos_service[PAXOS_AUTH].reset(new AuthMonitor(this, paxos, "auth"));
  paxos_service[PAXOS_MGR].reset(new MgrMonitor(this, paxos, "mgr"));
  paxos_service[PAXOS_MGRSTAT].reset(new MgrStatMonitor(this, paxos, "mgrstat"));
  paxos_service[PAXOS_HEALTH].reset(new HealthMonitor(this, paxos, "health"));
  paxos_service[PAXOS_CONFIG].reset(new ConfigMonitor(this, paxos, "config"));

  config_key_service = new ConfigKeyService(this, paxos);

  // The monitor's own capability set is "allow *".
  bool r = mon_caps.parse("allow *", NULL);
  ceph_assert(r);

  exited_quorum = ceph_clock_now();

  // prepare local commands
  local_mon_commands.resize(std::size(mon_commands));
  for (unsigned i = 0; i < std::size(mon_commands); ++i) {
    local_mon_commands[i] = mon_commands[i];
  }
  MonCommand::encode_vector(local_mon_commands, local_mon_commands_bl);

  // Second copy of the table with each descriptor rewritten by
  // cmddesc_get_prenautilus_compat(); presumably served to pre-Nautilus peers.
  prenautilus_local_mon_commands = local_mon_commands;
  for (auto& i : prenautilus_local_mon_commands) {
    std::string n = cmddesc_get_prenautilus_compat(i.cmdstring);
    if (n != i.cmdstring) {
      dout(20) << " pre-nautilus cmd " << i.cmdstring << " -> " << n << dendl;
      i.cmdstring = n;
    }
  }
  MonCommand::encode_vector(prenautilus_local_mon_commands,
                            prenautilus_local_mon_commands_bl);

  // assume our commands until we have an election. this only means
  // we won't reply with EINVAL before the election; any command that
  // actually matters will wait until we have quorum etc and then
  // retry (and revalidate).
  leader_mon_commands = local_mon_commands;
}

// Destroys the services, config-key service and Paxos instance created in the
// constructor. All sessions must already have been removed (see shutdown()).
Monitor::~Monitor()
{
  op_tracker.on_shutdown();

  paxos_service.clear();
  delete config_key_service;
  delete paxos;
  ceph_assert(session_map.sessions.empty());
}


// Admin-socket adapter: forwards every registered command to
// Monitor::do_admin_command() and always reports success to the socket layer.
class AdminHook : public AdminSocketHook {
  Monitor *mon;
public:
  explicit AdminHook(Monitor *m) : mon(m) {}
  bool call(std::string_view command, const cmdmap_t& cmdmap,
            std::string_view format, bufferlist& out) override {
    stringstream ss;
    mon->do_admin_command(command, cmdmap, format, ss);
    out.append(ss);
    return true;
  }
};

// Execute one admin-socket command while holding the monitor lock. Output and
// error text are written to `ss`.
//
// The audit channel records the dispatch and then either "finished" or, for
// argument/validation failures that jump to `abort`, "aborted". Read-only
// commands are logged at debug level; all others at info.
void Monitor::do_admin_command(std::string_view command, const cmdmap_t& cmdmap,
                               std::string_view format, std::ostream& ss)
{
  std::lock_guard l(lock);

  // NOTE(review): the smart-pointer template argument is missing in this copy.
  boost::scoped_ptr f(Formatter::create(format));

  // Render every argument except "prefix" as "[a, b, ...]" for the audit log.
  string args;
  for (auto p = cmdmap.begin(); p != cmdmap.end(); ++p) {
    if (p->first == "prefix")
      continue;
    if (!args.empty())
      args += ", ";
    args += cmd_vartype_stringify(p->second);
  }
  args = "[" + args + "]";

  bool read_only = (command == "mon_status" ||
                    command == "mon metadata" ||
                    command == "quorum_status" ||
                    command == "ops" ||
                    command == "sessions");

  (read_only ? audit_clog->debug() : audit_clog->info())
    << "from='admin socket' entity='admin socket' "
    << "cmd='" << command << "' args=" << args << ": dispatch";

  if (command == "mon_status") {
    get_mon_status(f.get(), ss);
    if (f)
      f->flush(ss);
  } else if (command == "quorum_status") {
    _quorum_status(f.get(), ss);
  } else if (command == "sync_force") {
    // Destructive: require the explicit confirmation flag.
    string validate;
    if ((!cmd_getval(g_ceph_context, cmdmap, "validate", validate)) ||
        (validate != "--yes-i-really-mean-it")) {
      ss << "are you SURE? this will mean the monitor store will be erased "
            "the next time the monitor is restarted. pass "
            "'--yes-i-really-mean-it' if you really do.";
      goto abort;
    }
    sync_force(f.get(), ss);
  } else if (command.compare(0, 23, "add_bootstrap_peer_hint") == 0 ||
             command.compare(0, 24, "add_bootstrap_peer_hintv") == 0) {
    // NOTE(review): the second test is redundant. Any command starting with
    // "add_bootstrap_peer_hintv" also starts with "add_bootstrap_peer_hint".
    if (!_add_bootstrap_peer_hint(command, cmdmap, ss))
      goto abort;
  } else if (command == "quorum enter") {
    elector.start_participating();
    start_election();
    ss << "started responding to quorum, initiated new election";
  } else if (command == "quorum exit") {
    start_election();
    elector.stop_participating();
    ss << "stopped responding to quorum, initiated new election";
  } else if (command == "ops") {
    (void)op_tracker.dump_ops_in_flight(f.get());
    if (f) {
      f->flush(ss);
    }
  } else if (command == "sessions") {
    // Produces output only when a formatter is available.
    if (f) {
      f->open_array_section("sessions");
      for (auto p : session_map.sessions) {
        f->dump_object("session", *p);
      }
      f->close_section();
      f->flush(ss);
    }
  } else if (command == "dump_historic_ops") {
    // NOTE(review): the dump_historic_* branches call f->flush() without the
    // `if (f)` check used above. Confirm Formatter::create() cannot return
    // null here.
    // NOTE(review): "\ " in the messages below is not a valid C++ escape
    // sequence. It looks like a collapsed backslash-newline continuation.
    if (op_tracker.dump_historic_ops(f.get())) {
      f->flush(ss);
    } else {
      ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \ please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
    }
  } else if (command == "dump_historic_ops_by_duration" ) {
    if (op_tracker.dump_historic_ops(f.get(), true)) {
      f->flush(ss);
    } else {
      ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \ please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
    }
  } else if (command == "dump_historic_slow_ops") {
    if (op_tracker.dump_historic_slow_ops(f.get(), {})) {
      f->flush(ss);
    } else {
      ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \ please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
    }
  } else {
    // Every command registered in preinit() must be handled above.
    ceph_abort_msg("bad AdminSocket command binding");
  }

  (read_only ? audit_clog->debug() : audit_clog->info())
    << "from='admin socket' "
    << "entity='admin socket' "
    << "cmd=" << command << " "
    << "args=" << args << ": finished";
  return;

abort:
  (read_only ? audit_clog->debug() : audit_clog->info())
    << "from='admin socket' "
    << "entity='admin socket' "
    << "cmd=" << command << " "
    << "args=" << args << ": aborted";
}

// Signal handler entry point. Only SIGINT and SIGTERM are expected; logs the
// signal and runs a full shutdown().
void Monitor::handle_signal(int signum)
{
  ceph_assert(signum == SIGINT || signum == SIGTERM);
  derr << "*** Got Signal " << sig_str(signum) << " ***" << dendl;
  shutdown();
}

// Baseline incompat features: BASE plus SINGLE_PAXOS.
CompatSet Monitor::get_initial_supported_features()
{
  CompatSet::FeatureSet ceph_mon_feature_compat;
  CompatSet::FeatureSet ceph_mon_feature_ro_compat;
  CompatSet::FeatureSet ceph_mon_feature_incompat;
  ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
  ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS);
  return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
                   ceph_mon_feature_incompat);
}

// All incompat features this build understands: the initial set plus every
// later on-disk feature up to NAUTILUS.
CompatSet Monitor::get_supported_features()
{
  CompatSet compat = get_initial_supported_features();
  compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
  compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
  compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
  compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
  compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
  compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
  compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_MIMIC);
  compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS);
  return compat;
}

// Feature set assumed for stores that predate an on-disk CompatSet
// (BASE only). Used by read_features_off_disk().
CompatSet Monitor::get_legacy_features()
{
  CompatSet::FeatureSet ceph_mon_feature_compat;
  CompatSet::FeatureSet ceph_mon_feature_ro_compat;
  CompatSet::FeatureSet ceph_mon_feature_incompat;
  ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
  return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
                   ceph_mon_feature_incompat);
}

// Compare the store's on-disk CompatSet with what this build supports.
// Returns 0 if `required.writeable(ondisk)` holds. Otherwise logs the
// unsupported features and returns -EPERM.
// Side effect: may persist a legacy CompatSet (see read_features_off_disk()).
int Monitor::check_features(MonitorDBStore *store)
{
  CompatSet required = get_supported_features();
  CompatSet ondisk;

  read_features_off_disk(store, &ondisk);

  if (!required.writeable(ondisk)) {
    CompatSet diff = required.unsupported(ondisk);
    generic_derr << "ERROR: on disk data includes unsupported features: " << diff << dendl;
    return -EPERM;
  }

  return 0;
}

// Decode the CompatSet stored at (MONITOR_NAME, COMPAT_SET_LOC) into
// *features. If nothing is stored, assume the legacy feature set and write it
// back to the store immediately.
void Monitor::read_features_off_disk(MonitorDBStore *store, CompatSet *features)
{
  bufferlist featuresbl;
  store->get(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
  if (featuresbl.length() == 0) {
    generic_dout(0) << "WARNING: mon fs missing feature list.\n"
                    << "Assuming it is old-style and introducing one." << dendl;
    //we only want the baseline ~v.18 features assumed to be on disk.
    //If new features are introduced this code needs to disappear or
    //be made smarter.
    *features = get_legacy_features();

    features->encode(featuresbl);
    auto t(std::make_shared());
    t->put(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
    store->apply_transaction(t);
  } else {
    auto it = featuresbl.cbegin();
    features->decode(it);
  }
}

// Load `features` from the store and recompute required_features.
void Monitor::read_features()
{
  read_features_off_disk(store, &features);
  dout(10) << "features " << features << dendl;

  calc_quorum_requirements();
  dout(10) << "required_features " << required_features << dendl;
}

// Queue the current `features` into transaction `t` (applied by the caller).
void Monitor::write_features(MonitorDBStore::TransactionRef t)
{
  bufferlist bl;
  features.encode(bl);
  t->put(MONITOR_NAME, COMPAT_SET_LOC, bl);
}

// NULL-terminated list of config keys this monitor observes. preinit()
// registers the monitor as a config observer; changes are handled in
// handle_conf_change().
const char** Monitor::get_tracked_conf_keys() const
{
  static const char* KEYS[] = {
    "crushtool", // helpful for testing
    "mon_election_timeout",
    "mon_lease",
    "mon_lease_renew_interval_factor",
    "mon_lease_ack_timeout_factor",
    "mon_accept_timeout_factor",
    // clog & admin clog
    "clog_to_monitors",
    "clog_to_syslog",
    "clog_to_syslog_facility",
    "clog_to_syslog_level",
    "clog_to_graylog",
    "clog_to_graylog_host",
    "clog_to_graylog_port",
    "host",
    "fsid",
    // periodic health to clog
    "mon_health_to_clog",
    "mon_health_to_clog_interval",
    "mon_health_to_clog_tick_interval",
    // scrub interval
    "mon_scrub_interval",
    "mon_allow_pool_delete",
    // osdmap pruning - observed, not handled.
    "mon_osdmap_full_prune_enabled",
    "mon_osdmap_full_prune_min",
    "mon_osdmap_full_prune_interval",
    "mon_osdmap_full_prune_txsize",
    // debug options - observed, not handled
    "mon_debug_extra_checks",
    "mon_debug_block_osdmap_trim",
    NULL
  };
  return KEYS;
}

// Config-observer callback for the keys in get_tracked_conf_keys().
// - Always re-runs sanitize_options(); its result is ignored here and only
//   logged by that function.
// - Log-client settings are applied inline.
// - Health-to-clog and scrub-interval updates are queued on the finisher and
//   take the monitor lock when they run.
void Monitor::handle_conf_change(const ConfigProxy& conf,
                                 const std::set &changed)
{
  sanitize_options();

  dout(10) << __func__ << " " << changed << dendl;

  if (changed.count("clog_to_monitors") ||
      changed.count("clog_to_syslog") ||
      changed.count("clog_to_syslog_level") ||
      changed.count("clog_to_syslog_facility") ||
      changed.count("clog_to_graylog") ||
      changed.count("clog_to_graylog_host") ||
      changed.count("clog_to_graylog_port") ||
      changed.count("host") ||
      changed.count("fsid")) {
    update_log_clients();
  }

  if (changed.count("mon_health_to_clog") ||
      changed.count("mon_health_to_clog_interval") ||
      changed.count("mon_health_to_clog_tick_interval")) {
    // Copy the key set: the lambda runs later on the finisher thread.
    std::set c2(changed);
    finisher.queue(new C_MonContext(this, [this, c2](int) {
      Mutex::Locker l(lock);
      health_to_clog_update_conf(c2);
    }));
  }

  if (changed.count("mon_scrub_interval")) {
    int scrub_interval = conf->mon_scrub_interval;
    finisher.queue(new C_MonContext(this, [this, scrub_interval](int) {
      Mutex::Locker l(lock);
      scrub_update_interval(scrub_interval);
    }));
  }
}

// Re-parse the clog_* / host / fsid options and apply them to both the
// cluster and audit log channels. If parse_log_client_options() returns
// non-zero, neither channel is changed.
void Monitor::update_log_clients()
{
  map log_to_monitors;
  map log_to_syslog;
  map log_channel;
  map log_prio;
  map log_to_graylog;
  map log_to_graylog_host;
  map log_to_graylog_port;
  uuid_d fsid;
  string host;

  if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog,
                               log_channel, log_prio, log_to_graylog,
                               log_to_graylog_host, log_to_graylog_port,
                               fsid, host))
    return;

  clog->update_config(log_to_monitors, log_to_syslog,
                      log_channel, log_prio, log_to_graylog,
                      log_to_graylog_host, log_to_graylog_port,
                      fsid, host);

  audit_clog->update_config(log_to_monitors, log_to_syslog,
                            log_channel, log_prio, log_to_graylog,
                            log_to_graylog_host, log_to_graylog_port,
                            fsid, host);
}

// Validate the lease-related options. Both checks always run, so every
// violation is reported to the cluster log.
// Returns -EINVAL if any check fails, else 0.
int
Monitor::sanitize_options()
{
  int r = 0;

  // mon_lease must be greater than mon_lease_renewal; otherwise we
  // may end up with leases expiring before they are renewed.
  if (g_conf()->mon_lease_renew_interval_factor >= 1.0) {
    clog->error() << "mon_lease_renew_interval_factor ("
                  << g_conf()->mon_lease_renew_interval_factor
                  << ") must be less than 1.0";
    r = -EINVAL;
  }

  // mon_lease_ack_timeout must be greater than mon_lease to make sure we've
  // got time to renew the lease and get an ack for it. Having both options
  // with the same value, for a given small value, could mean timing out if
  // the monitors happened to be overloaded -- or even under normal load for
  // a small enough value.
  if (g_conf()->mon_lease_ack_timeout_factor <= 1.0) {
    clog->error() << "mon_lease_ack_timeout_factor ("
                  << g_conf()->mon_lease_ack_timeout_factor
                  << ") must be greater than 1.0";
    r = -EINVAL;
  }

  return r;
}

// One-time initialization before init(). In order, it:
//   - validates options;
//   - registers perf counters;
//   - verifies or writes the fsid;
//   - loads the feature set;
//   - applies mon_initial_members to the seed monmap on first join;
//   - clears leftover sync state;
//   - initializes Paxos and its services;
//   - loads the keyring;
//   - registers the admin-socket commands.
// Returns 0 or a negative error code. Every return path releases `lock`.
int Monitor::preinit()
{
  lock.Lock();

  dout(1) << "preinit fsid " << monmap->fsid << dendl;

  int r = sanitize_options();
  if (r < 0) {
    derr << "option sanitization failed!" << dendl;
    lock.Unlock();
    return r;
  }

  // Per-monitor counters (registered with the perfcounter collection now).
  ceph_assert(!logger);
  {
    PerfCountersBuilder pcb(g_ceph_context, "mon", l_mon_first, l_mon_last);
    pcb.add_u64(l_mon_num_sessions, "num_sessions", "Open sessions", "sess",
        PerfCountersBuilder::PRIO_USEFUL);
    pcb.add_u64_counter(l_mon_session_add, "session_add", "Created sessions",
        "sadd", PerfCountersBuilder::PRIO_INTERESTING);
    pcb.add_u64_counter(l_mon_session_rm, "session_rm", "Removed sessions",
        "srm", PerfCountersBuilder::PRIO_INTERESTING);
    pcb.add_u64_counter(l_mon_session_trim, "session_trim", "Trimmed sessions",
        "strm", PerfCountersBuilder::PRIO_USEFUL);
    pcb.add_u64_counter(l_mon_num_elections, "num_elections", "Elections participated in",
        "ecnt", PerfCountersBuilder::PRIO_USEFUL);
    pcb.add_u64_counter(l_mon_election_call, "election_call", "Elections started",
        "estt", PerfCountersBuilder::PRIO_INTERESTING);
    pcb.add_u64_counter(l_mon_election_win, "election_win", "Elections won",
        "ewon", PerfCountersBuilder::PRIO_INTERESTING);
    pcb.add_u64_counter(l_mon_election_lose, "election_lose", "Elections lost",
        "elst", PerfCountersBuilder::PRIO_INTERESTING);
    logger = pcb.create_perf_counters();
    cct->get_perfcounters_collection()->add(logger);
  }

  // Cluster-wide counters: created here, but only added to the collection
  // by register_cluster_logger().
  ceph_assert(!cluster_logger);
  {
    PerfCountersBuilder pcb(g_ceph_context, "cluster", l_cluster_first, l_cluster_last);
    pcb.add_u64(l_cluster_num_mon, "num_mon", "Monitors");
    pcb.add_u64(l_cluster_num_mon_quorum, "num_mon_quorum", "Monitors in quorum");
    pcb.add_u64(l_cluster_num_osd, "num_osd", "OSDs");
    pcb.add_u64(l_cluster_num_osd_up, "num_osd_up", "OSDs that are up");
    pcb.add_u64(l_cluster_num_osd_in, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
    pcb.add_u64(l_cluster_osd_epoch, "osd_epoch", "Current epoch of OSD map");
    pcb.add_u64(l_cluster_osd_bytes, "osd_bytes", "Total capacity of cluster", NULL, 0, unit_t(UNIT_BYTES));
    pcb.add_u64(l_cluster_osd_bytes_used, "osd_bytes_used", "Used space", NULL, 0, unit_t(UNIT_BYTES));
    pcb.add_u64(l_cluster_osd_bytes_avail, "osd_bytes_avail",
                "Available space", NULL, 0, unit_t(UNIT_BYTES));
    pcb.add_u64(l_cluster_num_pool, "num_pool", "Pools");
    pcb.add_u64(l_cluster_num_pg, "num_pg", "Placement groups");
    pcb.add_u64(l_cluster_num_pg_active_clean, "num_pg_active_clean", "Placement groups in active+clean state");
    pcb.add_u64(l_cluster_num_pg_active, "num_pg_active", "Placement groups in active state");
    pcb.add_u64(l_cluster_num_pg_peering, "num_pg_peering", "Placement groups in peering state");
    pcb.add_u64(l_cluster_num_object, "num_object", "Objects");
    pcb.add_u64(l_cluster_num_object_degraded, "num_object_degraded", "Degraded (missing replicas) objects");
    pcb.add_u64(l_cluster_num_object_misplaced, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
    pcb.add_u64(l_cluster_num_object_unfound, "num_object_unfound", "Unfound objects");
    pcb.add_u64(l_cluster_num_bytes, "num_bytes", "Size of all objects", NULL, 0, unit_t(UNIT_BYTES));
    cluster_logger = pcb.create_perf_counters();
  }

  paxos->init_logger();

  // verify cluster_uuid
  {
    // NOTE(review): this `r` shadows the function-level `r` declared above.
    int r = check_fsid();
    if (r == -ENOENT)
      r = write_fsid();
    if (r < 0) {
      lock.Unlock();
      return r;
    }
  }

  // open compatset
  read_features();

  // have we ever joined a quorum?
  has_ever_joined = (store->get(MONITOR_NAME, "joined") != 0);
  dout(10) << "has_ever_joined = " << (int)has_ever_joined << dendl;

  if (!has_ever_joined) {
    // impose initial quorum restrictions?
    list initial_members;
    get_str_list(g_conf()->mon_initial_members, initial_members);

    if (!initial_members.empty()) {
      dout(1) << " initial_members " << initial_members << ", filtering seed monmap" << dendl;

      monmap->set_initial_members(
        g_ceph_context, initial_members, name, messenger->get_myaddrs(),
        &extra_probe_peers);

      dout(10) << " monmap is " << *monmap << dendl;
      dout(10) << " extra probe peers " << extra_probe_peers << dendl;
    }
  } else if (!monmap->contains(name)) {
    // Previously in quorum but no longer in the monmap. Refuse to start
    // unless mon_force_quorum_join overrides this.
    derr << "not in monmap and have been in a quorum before; "
         << "must have been removed" << dendl;
    if (g_conf()->mon_force_quorum_join) {
      dout(0) << "we should have died but "
              << "'mon_force_quorum_join' is set -- allowing boot" << dendl;
    } else {
      derr << "commit suicide!" << dendl;
      lock.Unlock();
      return -ENOENT;
    }
  }

  {
    // We may have a potentially inconsistent store state on our hands. Get
    // rid of it and start fresh.
    bool clear_store = false;
    if (store->exists("mon_sync", "in_sync")) {
      dout(1) << __func__ << " clean up potentially inconsistent store state"
              << dendl;
      clear_store = true;
    }

    if (store->get("mon_sync", "force_sync") > 0) {
      dout(1) << __func__ << " force sync by clearing store state" << dendl;
      clear_store = true;
    }

    if (clear_store) {
      set sync_prefixes = get_sync_targets_names();
      store->clear(sync_prefixes);
    }
  }

  sync_last_committed_floor = store->get("mon_sync", "last_committed_floor");
  dout(10) << "sync_last_committed_floor " << sync_last_committed_floor << dendl;

  init_paxos();

  if (is_keyring_required()) {
    // we need to bootstrap authentication keys so we can form an
    // initial quorum.
    if (authmon()->get_last_committed() == 0) {
      dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl;
      bufferlist bl;
      int err = store->get("mkfs", "keyring", bl);
      if (err == 0 && bl.length() > 0) {
        // Attempt to decode and extract keyring only if it is found.
        KeyRing keyring;
        auto p = bl.cbegin();
        decode(keyring, p);
        extract_save_mon_key(keyring);
      }
    }

    string keyring_loc = g_conf()->mon_data + "/keyring";

    r = keyring.load(cct, keyring_loc);
    if (r < 0) {
      // No external keyring: fall back to the mon. key held by key_server
      // and write it out. Otherwise fail.
      EntityName mon_name;
      mon_name.set_type(CEPH_ENTITY_TYPE_MON);
      EntityAuth mon_key;
      if (key_server.get_auth(mon_name, mon_key)) {
        dout(1) << "copying mon. key from old db to external keyring" << dendl;
        keyring.add(mon_name, mon_key);
        bufferlist bl;
        keyring.encode_plaintext(bl);
        write_default_keyring(bl);
      } else {
        derr << "unable to load initial keyring " << g_conf()->keyring << dendl;
        lock.Unlock();
        return r;
      }
    }
  }

  // Every command registered here must be handled in do_admin_command().
  admin_hook = new AdminHook(this);
  AdminSocket* admin_socket = cct->get_admin_socket();

  // unlock while registering to avoid mon_lock -> admin socket lock dependency.
  lock.Unlock();
  r = admin_socket->register_command("mon_status", "mon_status", admin_hook,
                                     "show current monitor status");
  ceph_assert(r == 0);
  r = admin_socket->register_command("quorum_status", "quorum_status",
                                     admin_hook, "show current quorum status");
  ceph_assert(r == 0);
  r = admin_socket->register_command("sync_force",
                                     "sync_force name=validate,"
                                     "type=CephChoices,"
                                     "strings=--yes-i-really-mean-it",
                                     admin_hook,
                                     "force sync of and clear monitor store");
  ceph_assert(r == 0);
  r = admin_socket->register_command("add_bootstrap_peer_hint",
                                     "add_bootstrap_peer_hint name=addr,"
                                     "type=CephIPAddr",
                                     admin_hook,
                                     "add peer address as potential bootstrap"
                                     " peer for cluster bringup");
  ceph_assert(r == 0);
  r = admin_socket->register_command("add_bootstrap_peer_hintv",
                                     "add_bootstrap_peer_hintv name=addrv,"
                                     "type=CephString",
                                     admin_hook,
                                     "add peer address vector as potential bootstrap"
                                     " peer for cluster bringup");
  ceph_assert(r == 0);
  r = admin_socket->register_command("quorum enter", "quorum enter",
                                     admin_hook,
                                     "force monitor back into quorum");
  ceph_assert(r == 0);
  r = admin_socket->register_command("quorum exit", "quorum exit",
                                     admin_hook,
                                     "force monitor out of the quorum");
  ceph_assert(r == 0);
  r = admin_socket->register_command("ops",
                                     "ops",
                                     admin_hook,
                                     "show the ops currently in flight");
  ceph_assert(r == 0);
  r = admin_socket->register_command("sessions",
                                     "sessions",
                                     admin_hook,
                                     "list existing sessions");
  ceph_assert(r == 0);
  r = admin_socket->register_command("dump_historic_ops",
                                     "dump_historic_ops",
                                     admin_hook,
                                     "show recent ops");
  ceph_assert(r == 0);
  r = admin_socket->register_command("dump_historic_ops_by_duration",
                                     "dump_historic_ops_by_duration",
                                     admin_hook,
                                     "show recent ops, sorted by duration");
  ceph_assert(r == 0);
  r = admin_socket->register_command("dump_historic_slow_ops",
                                     "dump_historic_slow_ops",
                                     admin_hook,
                                     "show recent slow ops");
  ceph_assert(r == 0);

  lock.Lock();

  // add ourselves as a conf observer
  g_conf().add_observer(this);

  messenger->set_auth_client(this);
  messenger->set_auth_server(this);
  mgr_messenger->set_auth_client(this);

  auth_registry.refresh_config();

  lock.Unlock();
  return 0;
}

// Start background machinery (finisher, timer tick, CPU thread pool), attach
// to the messengers, and begin probing via bootstrap(). Holds `lock`
// throughout. Always returns 0.
int Monitor::init()
{
  dout(2) << "init" << dendl;
  std::lock_guard l(lock);

  finisher.start();

  // start ticker
  timer.init();
  new_tick();

  cpu_tp.start();

  // i'm ready!
  messenger->add_dispatcher_tail(this);

  // kickstart pet mgrclient
  mgr_client.init();
  mgr_messenger->add_dispatcher_tail(&mgr_client);
  mgr_messenger->add_dispatcher_tail(this);  // for auth ms_* calls
  mgrmon()->prime_mgr_client();

  state = STATE_PROBING;

  bootstrap();
  // add features of myself into feature_map
  session_map.feature_map.add_mon(con_self->get_features());
  return 0;
}

// Initialize Paxos and every PaxosService, then load their state from the
// store.
void Monitor::init_paxos()
{
  dout(10) << __func__ << dendl;
  paxos->init();

  // init services
  for (auto& svc : paxos_service) {
    svc->init();
  }

  refresh_from_paxos(NULL);
}

// Reload state from the store:
//   - cluster_fingerprint (best effort: a missing key or decode error is only
//     logged);
//   - refresh() on every service, then post_refresh() on every service;
//   - load_metadata().
// `need_bootstrap` is passed through to each service's refresh().
void Monitor::refresh_from_paxos(bool *need_bootstrap)
{
  dout(10) << __func__ << dendl;

  bufferlist bl;
  int r = store->get(MONITOR_NAME, "cluster_fingerprint", bl);
  if (r >= 0) {
    try {
      auto p = bl.cbegin();
      decode(fingerprint, p);
    }
    catch (buffer::error& e) {
      dout(10) << __func__ << " failed to decode cluster_fingerprint" << dendl;
    }
  } else {
    dout(10) << __func__ << " no cluster_fingerprint" << dendl;
  }

  for (auto& svc : paxos_service) {
    svc->refresh(need_bootstrap);
  }
  for (auto& svc : paxos_service) {
    svc->post_refresh();
  }
  load_metadata();
}

// Add cluster_logger to the perfcounter collection. Safe to call repeatedly.
void Monitor::register_cluster_logger()
{
  if (!cluster_logger_registered) {
    dout(10) << "register_cluster_logger" << dendl;
    cluster_logger_registered = true;
    cct->get_perfcounters_collection()->add(cluster_logger);
  } else {
    dout(10) << "register_cluster_logger - already registered" << dendl;
  }
}

// Remove cluster_logger from the perfcounter collection. Safe to call
// repeatedly.
void Monitor::unregister_cluster_logger()
{
  if (cluster_logger_registered) {
    dout(10) << "unregister_cluster_logger" << dendl;
    cluster_logger_registered = false;
    cct->get_perfcounters_collection()->remove(cluster_logger);
  } else {
    dout(10) << "unregister_cluster_logger - not registered" << dendl;
  }
}

// Publish monmap size and quorum size to the cluster counters.
void Monitor::update_logger()
{
  cluster_logger->set(l_cluster_num_mon, monmap->size());
  cluster_logger->set(l_cluster_num_mon_quorum, quorum.size());
}

// Orderly teardown. It:
//   - flushes any pending Paxos write and marks the state SHUTDOWN;
//   - detaches the config observer and admin hook;
//   - drains the finisher and stops Paxos, services, timer and thread pool;
//   - removes sessions;
//   - shuts the messengers down before releasing the perf counters.
// `lock` is dropped around observer removal, finisher draining and messenger
// shutdown; presumably to avoid lock-order problems, confirm.
void Monitor::shutdown()
{
  dout(1) << "shutdown" << dendl;

  lock.Lock();

  wait_for_paxos_write();

  {
    std::lock_guard l(auth_lock);
    authmon()->_set_mon_num_rank(0, 0);
  }

  state = STATE_SHUTDOWN;

  lock.Unlock();
  g_conf().remove_observer(this);
  lock.Lock();

  if (admin_hook) {
    cct->get_admin_socket()->unregister_commands(admin_hook);
    delete admin_hook;
    admin_hook = NULL;
  }

  elector.shutdown();

  mgr_client.shutdown();

  lock.Unlock();
  finisher.wait_for_empty();
  finisher.stop();
  lock.Lock();

  // clean up
  paxos->shutdown();
  for (auto& svc : paxos_service) {
    svc->shutdown();
  }

  finish_contexts(g_ceph_context, waitfor_quorum, -ECANCELED);
  finish_contexts(g_ceph_context, maybe_wait_for_quorum, -ECANCELED);

  timer.shutdown();

  cpu_tp.stop();

  remove_all_sessions();

  log_client.shutdown();

  // unlock before msgr shutdown...
  lock.Unlock();

  // shutdown messenger before removing logger from perfcounter collection,
  // otherwise _ms_dispatch() will try to update deleted logger
  messenger->shutdown();
  mgr_messenger->shutdown();

  if (logger) {
    cct->get_perfcounters_collection()->remove(logger);
    delete logger;
    logger = NULL;
  }
  if (cluster_logger) {
    if (cluster_logger_registered)
      cct->get_perfcounters_collection()->remove(cluster_logger);
    delete cluster_logger;
    cluster_logger = NULL;
  }
}

// If Paxos has a write in flight (current or previous), flush the store.
// The caller must hold `lock`; it is released for the duration of the flush
// and re-acquired before returning.
void Monitor::wait_for_paxos_write()
{
  if (paxos->is_writing() || paxos->is_writing_previous()) {
    dout(10) << __func__ << " flushing pending write" << dendl;
    lock.Unlock();
    store->flush();
    lock.Lock();
    dout(10) << __func__ << " flushed pending write" << dendl;
  }
}

void Monitor::respawn()
{
  // --- WARNING TO FUTURE COPY/PASTERS ---
  // You must also add a call like
  //
  //   ceph_pthread_setname(pthread_self(), "ceph-mon");
  //
  // to main() so that /proc/$pid/stat field 2 contains "(ceph-mon)"
  // instead of "(exe)", so that killall (and log rotation) will work.

  dout(0) << __func__ << dendl;

  // NOTE(review): unless orig_argc is a compile-time constant, this is a
  // variable-length array (a GCC/Clang extension, not standard C++).
  char *new_argv[orig_argc+1];
  dout(1) << " e: '" << orig_argv[0] << "'" << dendl;
  // NOTE(review): the next line is garbled in this copy. Text between `i<`
  // and `monmap->` is missing: presumably the rest of respawn() and the
  // opening of the next function, whose body continues below (it logs
  // "bootstrap -- ..." messages). As written it is unlikely to compile;
  // restore from upstream.
  for (int i=0; iget_epoch() == 0) {
    dout(10) << "reverting to legacy ranks for seed monmap (epoch 0)" << dendl;
    monmap->calc_legacy_ranks();
  }
  dout(10) << "monmap " << *monmap << dendl;
  // Exit if the monmap's min_mon_release is more than two releases older
  // than this build.
  if (monmap->min_mon_release &&
      monmap->min_mon_release + 2 < (int)ceph_release()) {
    derr << "current monmap has min_mon_release "
         << (int)monmap->min_mon_release
         << " (" << ceph_release_name(monmap->min_mon_release)
         << ") which is >2 releases older than me " << ceph_release()
         << " (" << ceph_release_name(ceph_release()) << "), stopping."
         << dendl;
    exit(0);
  }

  // note my rank
  int newrank = monmap->get_rank(messenger->get_myaddrs());
  if (newrank < 0 && rank >= 0) {
    // was i ever part of the quorum?
    if (has_ever_joined) {
      dout(0) << " removed from monmap, suicide." << dendl;
      exit(0);
    }
  }
  // If the monmap lists different addresses for our rank, stash a non-seed
  // monmap in the store and respawn.
  if (newrank >= 0 &&
      monmap->get_addrs(newrank) != messenger->get_myaddrs()) {
    dout(0) << " monmap addrs for rank " << newrank << " changed, i am "
            << messenger->get_myaddrs()
            << ", monmap is " << monmap->get_addrs(newrank) << ", respawning"
            << dendl;

    if (monmap->get_epoch()) {
      // store this map in temp mon_sync location so that we use it on
      // our next startup
      derr << " stashing newest monmap " << monmap->get_epoch()
           << " for next startup" << dendl;
      bufferlist bl;
      monmap->encode(bl, -1);
      auto t(std::make_shared());
      t->put("mon_sync", "temp_newer_monmap", bl);
      store->apply_transaction(t);
    }

    respawn();
  }
  if (newrank != rank) {
    dout(0) << " my rank is now " << newrank << " (was " << rank << ")" << dendl;
    messenger->set_myname(entity_name_t::MON(newrank));
    rank = newrank;

    // reset all connections, or else our peers will think we are someone else.
    messenger->mark_down_all();
  }

  // reset
  state = STATE_PROBING;

  _reset();

  // sync store
  if (g_conf()->mon_compact_on_bootstrap) {
    dout(10) << "bootstrap -- triggering compaction" << dendl;
    store->compact();
    dout(10) << "bootstrap -- finished compaction" << dendl;
  }

  // singleton monitor?
  if (monmap->size() == 1 && rank == 0) {
    win_standalone_election();
    return;
  }

  reset_probe_timeout();

  // i'm outside the quorum
  if (monmap->contains(name))
    outside_quorum.insert(name);

  // probe monitors: every other rank in the monmap, plus any extra probe
  // peers whose address is not our own.
  dout(10) << "probing other monitors" << dendl;
  for (unsigned i = 0; i < monmap->size(); i++) {
    if ((int)i != rank)
      send_mon_message(
        new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined,
                      ceph_release()),
        i);
  }
  for (auto& av : extra_probe_peers) {
    if (av != messenger->get_myaddrs()) {
      messenger->send_to_mon(
        new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined,
                      ceph_release()),
        av);
    }
  }
}

// Admin-socket handler for add_bootstrap_peer_hint / add_bootstrap_peer_hintv.
// - If this monitor is already leader or peon, the hint is ignored and true is
//   returned.
// - If the "addr" argument is present but fails to parse, an error is written
//   to `ss` and false is returned.
bool Monitor::_add_bootstrap_peer_hint(std::string_view cmd,
                                       const cmdmap_t& cmdmap,
                                       ostream& ss)
{
  if (is_leader() || is_peon()) {
    ss << "mon already active; ignoring bootstrap hint";
    return true;
  }

  entity_addrvec_t addrs;
  string addrstr;
  if (cmd_getval(g_ceph_context, cmdmap, "addr", addrstr)) {
    dout(10) << "_add_bootstrap_peer_hint '" << cmd << "' addr '"
             << addrstr << "'" << dendl;

    entity_addr_t addr;
    const char *end = 0;
    if (!addr.parse(addrstr.c_str(), &end, entity_addr_t::TYPE_ANY)) {
      ss << "failed to parse addrs '" << addrstr
         << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
      return false;
    }

    addrs.v.push_back(addr);
    if (addr.get_port() == 0) {
      // No port given: expand into two entries, msgr2 on CEPH_MON_PORT_IANA
      // and legacy on CEPH_MON_PORT_LEGACY.
      addrs.v[0].set_type(entity_addr_t::TYPE_MSGR2);
      addrs.v[0].set_port(CEPH_MON_PORT_IANA);
      addrs.v.push_back(addr);
      addrs.v[1].set_type(entity_addr_t::TYPE_LEGACY);
      addrs.v[1].set_port(CEPH_MON_PORT_LEGACY);
    } else if (addr.get_type() == entity_addr_t::TYPE_ANY) {
      // Port given without an explicit type: infer the type from the port.
      if (addr.get_port() == CEPH_MON_PORT_LEGACY) {
        addrs.v[0].set_type(entity_addr_t::TYPE_LEGACY);
      } else {
        addrs.v[0].set_type(entity_addr_t::TYPE_MSGR2);
      }
    }
  }
else if (cmd_getval(g_ceph_context, cmdmap, "addrv", addrstr)) { dout(10) << "_add_bootstrap_peer_hintv '" << cmd << "' addrv '" << addrstr << "'" << dendl; const char *end = 0; if (!addrs.parse(addrstr.c_str(), &end)) { ss << "failed to parse addrs '" << addrstr << "'; syntax is 'add_bootstrap_peer_hintv v2:ip:port[,v1:ip:port]'"; return false; } } else { ss << "no addr or addrv provided"; return false; } extra_probe_peers.insert(addrs); ss << "adding peer " << addrs << " to list: " << extra_probe_peers; return true; } // called by bootstrap(), or on leader|peon -> electing void Monitor::_reset() { dout(10) << __func__ << dendl; // disable authentication { std::lock_guard l(auth_lock); authmon()->_set_mon_num_rank(0, 0); } cancel_probe_timeout(); timecheck_finish(); health_events_cleanup(); health_check_log_times.clear(); scrub_event_cancel(); leader_since = utime_t(); quorum_since = {}; if (!quorum.empty()) { exited_quorum = ceph_clock_now(); } quorum.clear(); outside_quorum.clear(); quorum_feature_map.clear(); scrub_reset(); paxos->restart(); for (auto& svc : paxos_service) { svc->restart(); } } // ----------------------------------------------------------- // sync set Monitor::get_sync_targets_names() { set targets; targets.insert(paxos->get_name()); for (auto& svc : paxos_service) { svc->get_store_prefixes(targets); } ConfigKeyService *config_key_service_ptr = dynamic_cast(config_key_service); ceph_assert(config_key_service_ptr); config_key_service_ptr->get_store_prefixes(targets); return targets; } void Monitor::sync_timeout() { dout(10) << __func__ << dendl; ceph_assert(state == STATE_SYNCHRONIZING); bootstrap(); } void Monitor::sync_obtain_latest_monmap(bufferlist &bl) { dout(1) << __func__ << dendl; MonMap latest_monmap; // Grab latest monmap from MonmapMonitor bufferlist monmon_bl; int err = monmon()->get_monmap(monmon_bl); if (err < 0) { if (err != -ENOENT) { derr << __func__ << " something wrong happened while reading the store: " << cpp_strerror(err) 
<< dendl; ceph_abort_msg("error reading the store"); } } else { latest_monmap.decode(monmon_bl); } // Grab last backed up monmap (if any) and compare epochs if (store->exists("mon_sync", "latest_monmap")) { bufferlist backup_bl; int err = store->get("mon_sync", "latest_monmap", backup_bl); if (err < 0) { derr << __func__ << " something wrong happened while reading the store: " << cpp_strerror(err) << dendl; ceph_abort_msg("error reading the store"); } ceph_assert(backup_bl.length() > 0); MonMap backup_monmap; backup_monmap.decode(backup_bl); if (backup_monmap.epoch > latest_monmap.epoch) latest_monmap = backup_monmap; } // Check if our current monmap's epoch is greater than the one we've // got so far. if (monmap->epoch > latest_monmap.epoch) latest_monmap = *monmap; dout(1) << __func__ << " obtained monmap e" << latest_monmap.epoch << dendl; latest_monmap.encode(bl, CEPH_FEATURES_ALL); } void Monitor::sync_reset_requester() { dout(10) << __func__ << dendl; if (sync_timeout_event) { timer.cancel_event(sync_timeout_event); sync_timeout_event = NULL; } sync_provider = entity_addrvec_t(); sync_cookie = 0; sync_full = false; sync_start_version = 0; } void Monitor::sync_reset_provider() { dout(10) << __func__ << dendl; sync_providers.clear(); } void Monitor::sync_start(entity_addrvec_t &addrs, bool full) { dout(10) << __func__ << " " << addrs << (full ? " full" : " recent") << dendl; ceph_assert(state == STATE_PROBING || state == STATE_SYNCHRONIZING); state = STATE_SYNCHRONIZING; // make sure are not a provider for anyone! 
sync_reset_provider();

  sync_full = full;

  if (sync_full) {
    // stash key state, and mark that we are syncing
    auto t(std::make_shared<MonitorDBStore::Transaction>());
    sync_stash_critical_state(t);
    t->put("mon_sync", "in_sync", 1);

    // the stored floor only ever moves forward (std::max with the current
    // paxos version)
    sync_last_committed_floor = std::max(sync_last_committed_floor, paxos->get_version());
    dout(10) << __func__ << " marking sync in progress, storing sync_last_committed_floor "
	     << sync_last_committed_floor << dendl;
    t->put("mon_sync", "last_committed_floor", sync_last_committed_floor);

    store->apply_transaction(t);

    // numbered kill point: the assert fails (aborting the daemon) when
    // mon_sync_requester_kill_at == 1; presumably a fault-injection hook
    ceph_assert(g_conf()->mon_sync_requester_kill_at != 1);

    // clear the underlying store
    set<string> targets = get_sync_targets_names();
    dout(10) << __func__ << " clearing prefixes " << targets << dendl;
    store->clear(targets);

    // make sure paxos knows it has been reset.  this prevents a
    // bootstrap and then different probe reply order from possibly
    // deciding a partial or no sync is needed.
    paxos->init();

    ceph_assert(g_conf()->mon_sync_requester_kill_at != 2);
  }

  // assume 'other' as the leader. We will update the leader once we receive
  // a reply to the sync start.
  sync_provider = addrs;

  sync_reset_timeout();

  // ask the provider for a cookie; a "recent" sync also tells it how far
  // our paxos already goes
  MMonSync *m = new MMonSync(sync_full ?
			     MMonSync::OP_GET_COOKIE_FULL :
			     MMonSync::OP_GET_COOKIE_RECENT);
  if (!sync_full)
    m->last_committed = paxos->get_version();
  messenger->send_to_mon(m, sync_provider);
}

// Add the freshest monmap we know of (see sync_obtain_latest_monmap()) to
// transaction t under mon_sync/latest_monmap.
void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t)
{
  dout(10) << __func__ << dendl;
  bufferlist backup_monmap;
  sync_obtain_latest_monmap(backup_monmap);
  ceph_assert(backup_monmap.length() > 0);
  t->put("mon_sync", "latest_monmap", backup_monmap);
}

// (Re)arm the requester-side sync timer: any pending timeout is cancelled
// and sync_timeout() is scheduled mon_sync_timeout seconds from now.
void Monitor::sync_reset_timeout()
{
  dout(10) << __func__ << dendl;
  if (sync_timeout_event)
    timer.cancel_event(sync_timeout_event);
  sync_timeout_event = timer.add_event_after(
    g_conf()->mon_sync_timeout,
    new C_MonContext(this, [this](int) {
	sync_timeout();
      }));
}

// Requester side, reached on the last chunk (see handle_sync_chunk()).
// For a full sync, replay the paxos transactions from sync_start_version to
// last_committed and record last_committed.  In every case, clear the
// mon_sync in_sync / force_sync / last_committed_floor markers,
// re-initialise paxos, and bootstrap again.
void Monitor::sync_finish(version_t last_committed)
{
  dout(10) << __func__ << " lc " << last_committed << " from " << sync_provider << dendl;

  ceph_assert(g_conf()->mon_sync_requester_kill_at != 7);

  if (sync_full) {
    // finalize the paxos commits
    auto tx(std::make_shared<MonitorDBStore::Transaction>());
    paxos->read_and_prepare_transactions(tx, sync_start_version,
					 last_committed);
    tx->put(paxos->get_name(), "last_committed", last_committed);

    // NOTE: the statements up to dendl run inside the dout(30) scope, so
    // the JSON dump is only built when that debug level is enabled
    dout(30) << __func__ << " final tx dump:\n";
    JSONFormatter f(true);
    tx->dump(&f);
    f.flush(*_dout);
    *_dout << dendl;

    store->apply_transaction(tx);
  }

  ceph_assert(g_conf()->mon_sync_requester_kill_at != 8);

  // drop the "sync in progress" markers
  auto t(std::make_shared<MonitorDBStore::Transaction>());
  t->erase("mon_sync", "in_sync");
  t->erase("mon_sync", "force_sync");
  t->erase("mon_sync", "last_committed_floor");
  store->apply_transaction(t);

  ceph_assert(g_conf()->mon_sync_requester_kill_at != 9);

  init_paxos();

  ceph_assert(g_conf()->mon_sync_requester_kill_at != 10);

  bootstrap();
}

// Dispatch an MMonSync message to the provider- or requester-side handler
// for its op; an unknown op aborts the daemon.
void Monitor::handle_sync(MonOpRequestRef op)
{
  MMonSync *m = static_cast<MMonSync*>(op->get_req());
  dout(10) << __func__ << " " << *m << dendl;
  switch (m->op) {

    // provider ---------

  case MMonSync::OP_GET_COOKIE_FULL:
  case MMonSync::OP_GET_COOKIE_RECENT:
    handle_sync_get_cookie(op);
    break;
  case MMonSync::OP_GET_CHUNK:
    handle_sync_get_chunk(op);
    break;

    // client -----------

case MMonSync::OP_COOKIE:
    handle_sync_cookie(op);
    break;

  case MMonSync::OP_CHUNK:
  case MMonSync::OP_LAST_CHUNK:
    handle_sync_chunk(op);
    break;
  case MMonSync::OP_NO_COOKIE:
    handle_sync_no_cookie(op);
    break;

  default:
    dout(0) << __func__ << " unknown op " << m->op << dendl;
    ceph_abort_msg("unknown op");
  }
}

// leader

// Tell the sender that its cookie (echoed back) is not, or is no longer,
// being served, by replying with OP_NO_COOKIE.
void Monitor::_sync_reply_no_cookie(MonOpRequestRef op)
{
  MMonSync *m = static_cast<MMonSync*>(op->get_req());
  MMonSync *reply = new MMonSync(MMonSync::OP_NO_COOKIE, m->cookie);
  m->get_connection()->send_message(reply);
}

// Provider side: a peer wants to start syncing from us.
// - If we are synchronizing ourselves, reply OP_NO_COOKIE.
// - A peer lacking any of our required_features is silently ignored.
// - Otherwise register a SyncProvider under a fresh cookie and reply
//   OP_COOKIE with the version the sync will start from.
// A FULL request also sets up a store synchronizer over all sync target
// prefixes; a RECENT request only catches up paxos from the requester's
// last_committed.
void Monitor::handle_sync_get_cookie(MonOpRequestRef op)
{
  MMonSync *m = static_cast<MMonSync*>(op->get_req());
  if (is_synchronizing()) {
    _sync_reply_no_cookie(op);
    return;
  }

  ceph_assert(g_conf()->mon_sync_provider_kill_at != 1);

  // make sure they can understand us.
  // ((required ^ theirs) & required) is the set of required bits they lack.
  if ((required_features ^ m->get_connection()->get_features()) &
      required_features) {
    dout(5) << " ignoring peer mon." << m->get_source().num()
	    << " has features " << std::hex
	    << m->get_connection()->get_features()
	    << " but we require " << required_features << std::dec << dendl;
    return;
  }

  // make up a unique cookie.  include election epoch (which persists
  // across restarts for the whole cluster) and a counter for this
  // process instance.  there is no need to be unique *across*
  // monitors, though.
  uint64_t cookie = ((unsigned long long)elector.get_epoch() << 24) + ++sync_provider_count;
  ceph_assert(sync_providers.count(cookie) == 0);

  dout(10) << __func__ << " cookie " << cookie << " for " << m->get_source_inst() << dendl;

  SyncProvider& sp = sync_providers[cookie];
  sp.cookie = cookie;
  sp.addrs = m->get_source_addrs();
  // provider-side expiry is twice the requester's mon_sync_timeout
  sp.reset_timeout(g_ceph_context, g_conf()->mon_sync_timeout * 2);

  set<string> sync_targets;
  if (m->op == MMonSync::OP_GET_COOKIE_FULL) {
    // full scan
    sync_targets = get_sync_targets_names();
    sp.last_committed = paxos->get_version();
    sp.synchronizer = store->get_synchronizer(sp.last_key, sync_targets);
    sp.full = true;
    dout(10) << __func__ << " will sync prefixes " << sync_targets << dendl;
  } else {
    // just catch up paxos
    sp.last_committed = m->last_committed;
  }
  dout(10) << __func__ << " will sync from version " << sp.last_committed << dendl;

  MMonSync *reply = new MMonSync(MMonSync::OP_COOKIE, sp.cookie);
  reply->last_committed = sp.last_committed;
  m->get_connection()->send_message(reply);
}

// Provider side: build and send the next chunk for an existing cookie.
// Each chunk first carries paxos versions after sp.last_committed, then (for
// a full sync) store keys from the synchronizer, while staying within
// mon_sync_max_payload_size bytes and mon_sync_max_payload_keys keys.
// When nothing is left to send, the reply becomes OP_LAST_CHUNK and the
// provider state is dropped.  If the requester has fallen behind our first
// committed paxos version, the cookie is dropped and OP_NO_COOKIE is sent.
void Monitor::handle_sync_get_chunk(MonOpRequestRef op)
{
  MMonSync *m = static_cast<MMonSync*>(op->get_req());
  dout(10) << __func__ << " " << *m << dendl;

  if (sync_providers.count(m->cookie) == 0) {
    dout(10) << __func__ << " no cookie " << m->cookie << dendl;
    _sync_reply_no_cookie(op);
    return;
  }

  ceph_assert(g_conf()->mon_sync_provider_kill_at != 2);

  SyncProvider& sp = sync_providers[m->cookie];
  sp.reset_timeout(g_ceph_context, g_conf()->mon_sync_timeout * 2);

  if (sp.last_committed < paxos->get_first_committed() &&
      paxos->get_first_committed() > 1) {
    dout(10) << __func__
	     << " sync requester fell behind paxos, their lc " << sp.last_committed
	     << " < our fc " << paxos->get_first_committed() << dendl;
    sync_providers.erase(m->cookie);
    _sync_reply_no_cookie(op);
    return;
  }

  MMonSync *reply = new MMonSync(MMonSync::OP_CHUNK, sp.cookie);
  auto tx(std::make_shared<MonitorDBStore::Transaction>());

  int bytes_left = g_conf()->mon_sync_max_payload_size;
  int keys_left = g_conf()->mon_sync_max_payload_keys;
  // paxos versions go first, one key per version, until a budget runs out
  while (sp.last_committed < paxos->get_version() &&
	 bytes_left > 0 &&
	 keys_left > 0) {
    bufferlist bl;
    sp.last_committed++;

    int err = store->get(paxos->get_name(), sp.last_committed, bl);
    ceph_assert(err == 0);

    tx->put(paxos->get_name(), sp.last_committed, bl);
    bytes_left -= bl.length();
    --keys_left;
    dout(20) << __func__ << " including paxos state " << sp.last_committed
	     << dendl;
  }
  reply->last_committed = sp.last_committed;

  // any remaining budget is filled with store keys for a full sync
  if (sp.full && bytes_left > 0 && keys_left > 0) {
    sp.synchronizer->get_chunk_tx(tx, bytes_left, keys_left);
    sp.last_key = sp.synchronizer->get_last_key();
    reply->last_key = sp.last_key;
  }

  if ((sp.full && sp.synchronizer->has_next_chunk()) ||
      sp.last_committed < paxos->get_version()) {
    dout(10) << __func__ << " chunk, through version " << sp.last_committed
	     << " key " << sp.last_key << dendl;
  } else {
    dout(10) << __func__ << " last chunk, through version " << sp.last_committed
	     << " key " << sp.last_key << dendl;
    reply->op = MMonSync::OP_LAST_CHUNK;

    ceph_assert(g_conf()->mon_sync_provider_kill_at != 3);

    // clean up our local state
    // NOTE: sp refers into sync_providers and dangles after this erase;
    // nothing below may touch it.
    sync_providers.erase(sp.cookie);
  }

  encode(*tx, reply->chunk_bl);

  m->get_connection()->send_message(reply);
}

// requester

// Requester side: accept the provider's cookie, but only if we have none
// yet and it comes from the provider we asked.  Records the sync start
// version, re-arms the timeout and requests the first chunk.
void Monitor::handle_sync_cookie(MonOpRequestRef op)
{
  MMonSync *m = static_cast<MMonSync*>(op->get_req());
  dout(10) << __func__ << " " << *m << dendl;
  if (sync_cookie) {
    dout(10) << __func__ << " already have a cookie, ignoring" << dendl;
    return;
  }
  if (m->get_source_addrs() != sync_provider) {
    dout(10) << __func__ << " source does not match, discarding" << dendl;
    return;
  }
  sync_cookie = m->cookie;
  sync_start_version = m->last_committed;

  sync_reset_timeout();
  sync_get_next_chunk();

  ceph_assert(g_conf()->mon_sync_requester_kill_at != 3);
}

void Monitor::sync_get_next_chunk()
{
  dout(20) << __func__ << " cookie " << sync_cookie << " provider "
	   << sync_provider << dendl;
  if (g_conf()->mon_inject_sync_get_chunk_delay > 0) {
    dout(20) << __func__ << " injecting delay of "
	     << g_conf()->mon_inject_sync_get_chunk_delay << dendl;
usleep((long long)(g_conf()->mon_inject_sync_get_chunk_delay * 1000000.0)); } MMonSync *r = new MMonSync(MMonSync::OP_GET_CHUNK, sync_cookie); messenger->send_to_mon(r, sync_provider); ceph_assert(g_conf()->mon_sync_requester_kill_at != 4); } void Monitor::handle_sync_chunk(MonOpRequestRef op) { MMonSync *m = static_cast(op->get_req()); dout(10) << __func__ << " " << *m << dendl; if (m->cookie != sync_cookie) { dout(10) << __func__ << " cookie does not match, discarding" << dendl; return; } if (m->get_source_addrs() != sync_provider) { dout(10) << __func__ << " source does not match, discarding" << dendl; return; } ceph_assert(state == STATE_SYNCHRONIZING); ceph_assert(g_conf()->mon_sync_requester_kill_at != 5); auto tx(std::make_shared()); tx->append_from_encoded(m->chunk_bl); dout(30) << __func__ << " tx dump:\n"; JSONFormatter f(true); tx->dump(&f); f.flush(*_dout); *_dout << dendl; store->apply_transaction(tx); ceph_assert(g_conf()->mon_sync_requester_kill_at != 6); if (!sync_full) { dout(10) << __func__ << " applying recent paxos transactions as we go" << dendl; auto tx(std::make_shared()); paxos->read_and_prepare_transactions(tx, paxos->get_version() + 1, m->last_committed); tx->put(paxos->get_name(), "last_committed", m->last_committed); dout(30) << __func__ << " tx dump:\n"; JSONFormatter f(true); tx->dump(&f); f.flush(*_dout); *_dout << dendl; store->apply_transaction(tx); paxos->init(); // to refresh what we just wrote } if (m->op == MMonSync::OP_CHUNK) { sync_reset_timeout(); sync_get_next_chunk(); } else if (m->op == MMonSync::OP_LAST_CHUNK) { sync_finish(m->last_committed); } } void Monitor::handle_sync_no_cookie(MonOpRequestRef op) { dout(10) << __func__ << dendl; bootstrap(); } void Monitor::sync_trim_providers() { dout(20) << __func__ << dendl; utime_t now = ceph_clock_now(); map::iterator p = sync_providers.begin(); while (p != sync_providers.end()) { if (now > p->second.timeout) { dout(10) << __func__ << " expiring cookie " << p->second.cookie << 
" for " << p->second.addrs << dendl; sync_providers.erase(p++); } else { ++p; } } } // --------------------------------------------------- // probe void Monitor::cancel_probe_timeout() { if (probe_timeout_event) { dout(10) << "cancel_probe_timeout " << probe_timeout_event << dendl; timer.cancel_event(probe_timeout_event); probe_timeout_event = NULL; } else { dout(10) << "cancel_probe_timeout (none scheduled)" << dendl; } } void Monitor::reset_probe_timeout() { cancel_probe_timeout(); probe_timeout_event = new C_MonContext(this, [this](int r) { probe_timeout(r); }); double t = g_conf()->mon_probe_timeout; if (timer.add_event_after(t, probe_timeout_event)) { dout(10) << "reset_probe_timeout " << probe_timeout_event << " after " << t << " seconds" << dendl; } else { probe_timeout_event = nullptr; } } void Monitor::probe_timeout(int r) { dout(4) << "probe_timeout " << probe_timeout_event << dendl; ceph_assert(is_probing() || is_synchronizing()); ceph_assert(probe_timeout_event); probe_timeout_event = NULL; bootstrap(); } void Monitor::handle_probe(MonOpRequestRef op) { MMonProbe *m = static_cast(op->get_req()); dout(10) << "handle_probe " << *m << dendl; if (m->fsid != monmap->fsid) { dout(0) << "handle_probe ignoring fsid " << m->fsid << " != " << monmap->fsid << dendl; return; } switch (m->op) { case MMonProbe::OP_PROBE: handle_probe_probe(op); break; case MMonProbe::OP_REPLY: handle_probe_reply(op); break; case MMonProbe::OP_MISSING_FEATURES: derr << __func__ << " require release " << (int)m->mon_release << " > " << (int)ceph_release() << ", or missing features (have " << CEPH_FEATURES_ALL << ", required " << m->required_features << ", missing " << (m->required_features & ~CEPH_FEATURES_ALL) << ")" << dendl; break; } } void Monitor::handle_probe_probe(MonOpRequestRef op) { MMonProbe *m = static_cast(op->get_req()); dout(10) << "handle_probe_probe " << m->get_source_inst() << *m << " features " << m->get_connection()->get_features() << dendl; uint64_t missing = 
required_features & ~m->get_connection()->get_features(); if ((m->mon_release > 0 && m->mon_release < monmap->min_mon_release) || missing) { dout(1) << " peer " << m->get_source_addr() << " release " << (int)m->mon_release << " < min_mon_release " << (int)monmap->min_mon_release << ", or missing features " << missing << dendl; MMonProbe *r = new MMonProbe(monmap->fsid, MMonProbe::OP_MISSING_FEATURES, name, has_ever_joined, monmap->min_mon_release); m->required_features = required_features; m->get_connection()->send_message(r); goto out; } if (!is_probing() && !is_synchronizing()) { // If the probing mon is way ahead of us, we need to re-bootstrap. // Normally we capture this case when we initially bootstrap, but // it is possible we pass those checks (we overlap with // quorum-to-be) but fail to join a quorum before it moves past // us. We need to be kicked back to bootstrap so we can // synchonize, not keep calling elections. if (paxos->get_version() + 1 < m->paxos_first_version) { dout(1) << " peer " << m->get_source_addr() << " has first_committed " << "ahead of us, re-bootstrapping" << dendl; bootstrap(); goto out; } } MMonProbe *r; r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined, ceph_release()); r->name = name; r->quorum = quorum; monmap->encode(r->monmap_bl, m->get_connection()->get_features()); r->paxos_first_version = paxos->get_first_committed(); r->paxos_last_version = paxos->get_version(); m->get_connection()->send_message(r); // did we discover a peer here? 
if (!monmap->contains(m->get_source_addr())) { dout(1) << " adding peer " << m->get_source_addrs() << " to list of hints" << dendl; extra_probe_peers.insert(m->get_source_addrs()); } out: return; } void Monitor::handle_probe_reply(MonOpRequestRef op) { MMonProbe *m = static_cast(op->get_req()); dout(10) << "handle_probe_reply " << m->get_source_inst() << " " << *m << dendl; dout(10) << " monmap is " << *monmap << dendl; // discover name and addrs during probing or electing states. if (!is_probing() && !is_electing()) { return; } // newer map, or they've joined a quorum and we haven't? bufferlist mybl; monmap->encode(mybl, m->get_connection()->get_features()); // make sure it's actually different; the checks below err toward // taking the other guy's map, which could cause us to loop. if (!mybl.contents_equal(m->monmap_bl)) { MonMap *newmap = new MonMap; newmap->decode(m->monmap_bl); if (m->has_ever_joined && (newmap->get_epoch() > monmap->get_epoch() || !has_ever_joined)) { dout(10) << " got newer/committed monmap epoch " << newmap->get_epoch() << ", mine was " << monmap->get_epoch() << dendl; delete newmap; monmap->decode(m->monmap_bl); bootstrap(); return; } delete newmap; } // rename peer? string peer_name = monmap->get_name(m->get_source_addr()); if (monmap->get_epoch() == 0 && peer_name.compare(0, 7, "noname-") == 0) { dout(10) << " renaming peer " << m->get_source_addr() << " " << peer_name << " -> " << m->name << " in my monmap" << dendl; monmap->rename(peer_name, m->name); if (is_electing()) { bootstrap(); return; } } else if (peer_name.size()) { dout(10) << " peer name is " << peer_name << dendl; } else { dout(10) << " peer " << m->get_source_addr() << " not in map" << dendl; } // new initial peer? 
if (monmap->get_epoch() == 0 && monmap->contains(m->name) && monmap->get_addrs(m->name).front().is_blank_ip()) { dout(1) << " learned initial mon " << m->name << " addrs " << m->get_source_addrs() << dendl; monmap->set_addrvec(m->name, m->get_source_addrs()); bootstrap(); return; } // end discover phase if (!is_probing()) { return; } ceph_assert(paxos != NULL); if (is_synchronizing()) { dout(10) << " currently syncing" << dendl; return; } entity_addrvec_t other = m->get_source_addrs(); if (m->paxos_last_version < sync_last_committed_floor) { dout(10) << " peer paxos versions [" << m->paxos_first_version << "," << m->paxos_last_version << "] < my sync_last_committed_floor " << sync_last_committed_floor << ", ignoring" << dendl; } else { if (paxos->get_version() < m->paxos_first_version && m->paxos_first_version > 1) { // no need to sync if we're 0 and they start at 1. dout(10) << " peer paxos first versions [" << m->paxos_first_version << "," << m->paxos_last_version << "]" << " vs my version " << paxos->get_version() << " (too far ahead)" << dendl; cancel_probe_timeout(); sync_start(other, true); return; } if (paxos->get_version() + g_conf()->paxos_max_join_drift < m->paxos_last_version) { dout(10) << " peer paxos last version " << m->paxos_last_version << " vs my version " << paxos->get_version() << " (too far ahead)" << dendl; cancel_probe_timeout(); sync_start(other, false); return; } } // did the existing cluster complete upgrade to luminous? 
if (osdmon()->osdmap.get_epoch()) { if (osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) { derr << __func__ << " existing cluster has not completed upgrade to" << " luminous; 'ceph osd require_osd_release luminous' before" << " upgrading" << dendl; exit(0); } if (!osdmon()->osdmap.test_flag(CEPH_OSDMAP_PURGED_SNAPDIRS) || !osdmon()->osdmap.test_flag(CEPH_OSDMAP_RECOVERY_DELETES)) { derr << __func__ << " existing cluster has not completed a full luminous" << " scrub to purge legacy snapdir objects; please scrub before" << " upgrading beyond luminous." << dendl; exit(0); } } // is there an existing quorum? if (m->quorum.size()) { dout(10) << " existing quorum " << m->quorum << dendl; dout(10) << " peer paxos version " << m->paxos_last_version << " vs my version " << paxos->get_version() << " (ok)" << dendl; if (monmap->contains(name) && !monmap->get_addrs(name).front().is_blank_ip()) { // i'm part of the cluster; just initiate a new election start_election(); } else { dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl; send_mon_message( new MMonJoin(monmap->fsid, name, messenger->get_myaddrs()), *m->quorum.begin()); } } else { if (monmap->contains(m->name)) { dout(10) << " mon." << m->name << " is outside the quorum" << dendl; outside_quorum.insert(m->name); } else { dout(10) << " mostly ignoring mon." 
<< m->name << ", not part of monmap" << dendl; return; } unsigned need = monmap->min_quorum_size(); dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl; if (outside_quorum.size() >= need) { if (outside_quorum.count(name)) { dout(10) << " that's enough to form a new quorum, calling election" << dendl; start_election(); } else { dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl; } } else { dout(10) << " that's not yet enough for a new quorum, waiting" << dendl; } } } void Monitor::join_election() { dout(10) << __func__ << dendl; wait_for_paxos_write(); _reset(); state = STATE_ELECTING; logger->inc(l_mon_num_elections); } void Monitor::start_election() { dout(10) << "start_election" << dendl; wait_for_paxos_write(); _reset(); state = STATE_ELECTING; logger->inc(l_mon_num_elections); logger->inc(l_mon_election_call); clog->info() << "mon." << name << " calling monitor election"; elector.call_election(); } void Monitor::win_standalone_election() { dout(1) << "win_standalone_election" << dendl; // bump election epoch, in case the previous epoch included other // monitors; we need to be able to make the distinction. 
elector.init(); elector.advance_epoch(); rank = monmap->get_rank(name); ceph_assert(rank == 0); set q; q.insert(rank); map metadata; collect_metadata(&metadata[0]); win_election(elector.get_epoch(), q, CEPH_FEATURES_ALL, ceph::features::mon::get_supported(), ceph_release(), metadata); } const utime_t& Monitor::get_leader_since() const { ceph_assert(state == STATE_LEADER); return leader_since; } epoch_t Monitor::get_epoch() { return elector.get_epoch(); } void Monitor::_finish_svc_election() { ceph_assert(state == STATE_LEADER || state == STATE_PEON); for (auto& svc : paxos_service) { // we already called election_finished() on monmon(); avoid callig twice if (state == STATE_LEADER && svc.get() == monmon()) continue; svc->election_finished(); } } void Monitor::win_election(epoch_t epoch, set& active, uint64_t features, const mon_feature_t& mon_features, int min_mon_release, const map& metadata) { dout(10) << __func__ << " epoch " << epoch << " quorum " << active << " features " << features << " mon_features " << mon_features << " min_mon_release " << min_mon_release << dendl; ceph_assert(is_electing()); state = STATE_LEADER; leader_since = ceph_clock_now(); quorum_since = mono_clock::now(); leader = rank; quorum = active; quorum_con_features = features; quorum_mon_features = mon_features; quorum_min_mon_release = min_mon_release; pending_metadata = metadata; outside_quorum.clear(); clog->info() << "mon." << name << " is new leader, mons " << get_quorum_names() << " in quorum (ranks " << quorum << ")"; set_leader_commands(get_local_commands(mon_features)); paxos->leader_init(); // NOTE: tell monmap monitor first. This is important for the // bootstrap case to ensure that the very first paxos proposal // codifies the monmap. Otherwise any manner of chaos can ensue // when monitors are call elections or participating in a paxos // round without agreeing on who the participants are. 
monmon()->election_finished(); _finish_svc_election(); logger->inc(l_mon_election_win); // inject new metadata in first transaction. { // include previous metadata for missing mons (that aren't part of // the current quorum). map m = metadata; for (unsigned rank = 0; rank < monmap->size(); ++rank) { if (m.count(rank) == 0 && mon_metadata.count(rank)) { m[rank] = mon_metadata[rank]; } } // FIXME: This is a bit sloppy because we aren't guaranteed to submit // a new transaction immediately after the election finishes. We should // do that anyway for other reasons, though. MonitorDBStore::TransactionRef t = paxos->get_pending_transaction(); bufferlist bl; encode(m, bl); t->put(MONITOR_STORE_PREFIX, "last_metadata", bl); } finish_election(); if (monmap->size() > 1 && monmap->get_epoch() > 0) { timecheck_start(); health_tick_start(); // Freshen the health status before doing health_to_clog in case // our just-completed election changed the health healthmon()->wait_for_active_ctx(new FunctionContext([this](int r){ dout(20) << "healthmon now active" << dendl; healthmon()->tick(); if (healthmon()->is_proposing()) { dout(20) << __func__ << " healthmon proposing, waiting" << dendl; healthmon()->wait_for_finished_proposal(nullptr, new C_MonContext(this, [this](int r){ ceph_assert(lock.is_locked_by_me()); do_health_to_clog_interval(); })); } else { do_health_to_clog_interval(); } })); scrub_event_start(); } } void Monitor::lose_election(epoch_t epoch, set &q, int l, uint64_t features, const mon_feature_t& mon_features, int min_mon_release) { state = STATE_PEON; leader_since = utime_t(); quorum_since = mono_clock::now(); leader = l; quorum = q; outside_quorum.clear(); quorum_con_features = features; quorum_mon_features = mon_features; quorum_min_mon_release = min_mon_release; dout(10) << "lose_election, epoch " << epoch << " leader is mon" << leader << " quorum is " << quorum << " features are " << quorum_con_features << " mon_features are " << quorum_mon_features << " 
min_mon_release " << min_mon_release << dendl; paxos->peon_init(); _finish_svc_election(); logger->inc(l_mon_election_lose); finish_election(); } namespace { std::string collect_compression_algorithms() { ostringstream os; bool printed = false; for (auto [name, key] : Compressor::compression_algorithms) { if (printed) { os << ", "; } else { printed = true; } std::ignore = key; os << name; } return os.str(); } } void Monitor::collect_metadata(Metadata *m) { collect_sys_info(m, g_ceph_context); (*m)["addrs"] = stringify(messenger->get_myaddrs()); (*m)["compression_algorithms"] = collect_compression_algorithms(); // infer storage device string devname = store->get_devname(); set devnames; get_raw_devices(devname, &devnames); map errs; get_device_metadata(devnames, m, &errs); for (auto& i : errs) { dout(1) << __func__ << " " << i.first << ": " << i.second << dendl; } } void Monitor::finish_election() { apply_quorum_to_compatset_features(); apply_monmap_to_compatset_features(); timecheck_finish(); exited_quorum = utime_t(); finish_contexts(g_ceph_context, waitfor_quorum); finish_contexts(g_ceph_context, maybe_wait_for_quorum); resend_routed_requests(); update_logger(); register_cluster_logger(); // enable authentication { std::lock_guard l(auth_lock); authmon()->_set_mon_num_rank(monmap->size(), rank); } // am i named properly? 
string cur_name = monmap->get_name(messenger->get_myaddrs()); if (cur_name != name) { dout(10) << " renaming myself from " << cur_name << " -> " << name << dendl; send_mon_message( new MMonJoin(monmap->fsid, name, messenger->get_myaddrs()), *quorum.begin()); } } void Monitor::_apply_compatset_features(CompatSet &new_features) { if (new_features.compare(features) != 0) { CompatSet diff = features.unsupported(new_features); dout(1) << __func__ << " enabling new quorum features: " << diff << dendl; features = new_features; auto t = std::make_shared(); write_features(t); store->apply_transaction(t); calc_quorum_requirements(); } } void Monitor::apply_quorum_to_compatset_features() { CompatSet new_features(features); new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES); if (quorum_con_features & CEPH_FEATURE_OSDMAP_ENC) { new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC); } new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2); new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3); dout(5) << __func__ << dendl; _apply_compatset_features(new_features); } void Monitor::apply_monmap_to_compatset_features() { CompatSet new_features(features); mon_feature_t monmap_features = monmap->get_required_features(); /* persistent monmap features may go into the compatset. * optional monmap features may not - why? * because optional monmap features may be set/unset by the admin, * and possibly by other means that haven't yet been thought out, * so we can't make the monitor enforce them on start - because they * may go away. * this, of course, does not invalidate setting a compatset feature * for an optional feature - as long as you make sure to clean it up * once you unset it. 
*/ if (monmap_features.contains_all(ceph::features::mon::FEATURE_KRAKEN)) { ceph_assert(ceph::features::mon::get_persistent().contains_all( ceph::features::mon::FEATURE_KRAKEN)); // this feature should only ever be set if the quorum supports it. ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_KRAKEN)); new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN); } if (monmap_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) { ceph_assert(ceph::features::mon::get_persistent().contains_all( ceph::features::mon::FEATURE_LUMINOUS)); // this feature should only ever be set if the quorum supports it. ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS)); new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS); } if (monmap_features.contains_all(ceph::features::mon::FEATURE_MIMIC)) { ceph_assert(ceph::features::mon::get_persistent().contains_all( ceph::features::mon::FEATURE_MIMIC)); // this feature should only ever be set if the quorum supports it. ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_MIMIC)); new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_MIMIC); } if (monmap_features.contains_all(ceph::features::mon::FEATURE_NAUTILUS)) { ceph_assert(ceph::features::mon::get_persistent().contains_all( ceph::features::mon::FEATURE_NAUTILUS)); // this feature should only ever be set if the quorum supports it. 
ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_NAUTILUS)); new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS); } dout(5) << __func__ << dendl; _apply_compatset_features(new_features); } void Monitor::calc_quorum_requirements() { required_features = 0; // compatset if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC)) { required_features |= CEPH_FEATURE_OSDMAP_ENC; } if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN)) { required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN; } if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS)) { required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS; } if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_MIMIC)) { required_features |= CEPH_FEATUREMASK_SERVER_MIMIC; } if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS)) { required_features |= CEPH_FEATUREMASK_SERVER_NAUTILUS | CEPH_FEATUREMASK_CEPHX_V2; } // monmap if (monmap->get_required_features().contains_all( ceph::features::mon::FEATURE_KRAKEN)) { required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN; } if (monmap->get_required_features().contains_all( ceph::features::mon::FEATURE_LUMINOUS)) { required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS; } if (monmap->get_required_features().contains_all( ceph::features::mon::FEATURE_MIMIC)) { required_features |= CEPH_FEATUREMASK_SERVER_MIMIC; } if (monmap->get_required_features().contains_all( ceph::features::mon::FEATURE_NAUTILUS)) { required_features |= CEPH_FEATUREMASK_SERVER_NAUTILUS | CEPH_FEATUREMASK_CEPHX_V2; } dout(10) << __func__ << " required_features " << required_features << dendl; } void Monitor::get_combined_feature_map(FeatureMap *fm) { *fm += session_map.feature_map; for (auto id : quorum) { if (id != rank) { *fm += quorum_feature_map[id]; } } } void Monitor::sync_force(Formatter *f, ostream& ss) { bool free_formatter = false; if (!f) { // louzy/lazy hack: default to json if no formatter has been defined f = new JSONFormatter(); 
free_formatter = true; } auto tx(std::make_shared()); sync_stash_critical_state(tx); tx->put("mon_sync", "force_sync", 1); store->apply_transaction(tx); f->open_object_section("sync_force"); f->dump_int("ret", 0); f->dump_stream("msg") << "forcing store sync the next time the monitor starts"; f->close_section(); // sync_force f->flush(ss); if (free_formatter) delete f; } void Monitor::_quorum_status(Formatter *f, ostream& ss) { bool free_formatter = false; if (!f) { // louzy/lazy hack: default to json if no formatter has been defined f = new JSONFormatter(); free_formatter = true; } f->open_object_section("quorum_status"); f->dump_int("election_epoch", get_epoch()); f->open_array_section("quorum"); for (set::iterator p = quorum.begin(); p != quorum.end(); ++p) f->dump_int("mon", *p); f->close_section(); // quorum list quorum_names = get_quorum_names(); f->open_array_section("quorum_names"); for (list::iterator p = quorum_names.begin(); p != quorum_names.end(); ++p) f->dump_string("mon", *p); f->close_section(); // quorum_names f->dump_string("quorum_leader_name", quorum.empty() ? 
string() : monmap->get_name(*quorum.begin())); if (!quorum.empty()) { f->dump_int( "quorum_age", std::chrono::duration_cast( mono_clock::now() - quorum_since).count()); } f->open_object_section("monmap"); monmap->dump(f); f->close_section(); // monmap f->close_section(); // quorum_status f->flush(ss); if (free_formatter) delete f; } void Monitor::get_mon_status(Formatter *f, ostream& ss) { bool free_formatter = false; if (!f) { // louzy/lazy hack: default to json if no formatter has been defined f = new JSONFormatter(); free_formatter = true; } f->open_object_section("mon_status"); f->dump_string("name", name); f->dump_int("rank", rank); f->dump_string("state", get_state_name()); f->dump_int("election_epoch", get_epoch()); f->open_array_section("quorum"); for (set::iterator p = quorum.begin(); p != quorum.end(); ++p) { f->dump_int("mon", *p); } f->close_section(); // quorum if (!quorum.empty()) { f->dump_int( "quorum_age", std::chrono::duration_cast( mono_clock::now() - quorum_since).count()); } f->open_object_section("features"); f->dump_stream("required_con") << required_features; mon_feature_t req_mon_features = get_required_mon_features(); req_mon_features.dump(f, "required_mon"); f->dump_stream("quorum_con") << quorum_con_features; quorum_mon_features.dump(f, "quorum_mon"); f->close_section(); // features f->open_array_section("outside_quorum"); for (set::iterator p = outside_quorum.begin(); p != outside_quorum.end(); ++p) f->dump_string("mon", *p); f->close_section(); // outside_quorum f->open_array_section("extra_probe_peers"); for (set::iterator p = extra_probe_peers.begin(); p != extra_probe_peers.end(); ++p) { f->dump_object("peer", *p); } f->close_section(); // extra_probe_peers f->open_array_section("sync_provider"); for (map::const_iterator p = sync_providers.begin(); p != sync_providers.end(); ++p) { f->dump_unsigned("cookie", p->second.cookie); f->dump_object("addrs", p->second.addrs); f->dump_stream("timeout") << p->second.timeout; 
f->dump_unsigned("last_committed", p->second.last_committed); f->dump_stream("last_key") << p->second.last_key; } f->close_section(); if (is_synchronizing()) { f->open_object_section("sync"); f->dump_stream("sync_provider") << sync_provider; f->dump_unsigned("sync_cookie", sync_cookie); f->dump_unsigned("sync_start_version", sync_start_version); f->close_section(); } if (g_conf()->mon_sync_provider_kill_at > 0) f->dump_int("provider_kill_at", g_conf()->mon_sync_provider_kill_at); if (g_conf()->mon_sync_requester_kill_at > 0) f->dump_int("requester_kill_at", g_conf()->mon_sync_requester_kill_at); f->open_object_section("monmap"); monmap->dump(f); f->close_section(); f->dump_object("feature_map", session_map.feature_map); f->close_section(); // mon_status if (free_formatter) { // flush formatter to ss and delete it iff we created the formatter f->flush(ss); delete f; } } // health status to clog void Monitor::health_tick_start() { if (!cct->_conf->mon_health_to_clog || cct->_conf->mon_health_to_clog_tick_interval <= 0) return; dout(15) << __func__ << dendl; health_tick_stop(); health_tick_event = timer.add_event_after( cct->_conf->mon_health_to_clog_tick_interval, new C_MonContext(this, [this](int r) { if (r < 0) return; health_tick_start(); })); } void Monitor::health_tick_stop() { dout(15) << __func__ << dendl; if (health_tick_event) { timer.cancel_event(health_tick_event); health_tick_event = NULL; } } utime_t Monitor::health_interval_calc_next_update() { utime_t now = ceph_clock_now(); time_t secs = now.sec(); int remainder = secs % cct->_conf->mon_health_to_clog_interval; int adjustment = cct->_conf->mon_health_to_clog_interval - remainder; utime_t next = utime_t(secs + adjustment, 0); dout(20) << __func__ << " now: " << now << "," << " next: " << next << "," << " interval: " << cct->_conf->mon_health_to_clog_interval << dendl; return next; } void Monitor::health_interval_start() { dout(15) << __func__ << dendl; if (!cct->_conf->mon_health_to_clog || 
cct->_conf->mon_health_to_clog_interval <= 0) { return; } health_interval_stop(); utime_t next = health_interval_calc_next_update(); health_interval_event = new C_MonContext(this, [this](int r) { if (r < 0) return; do_health_to_clog_interval(); }); if (!timer.add_event_at(next, health_interval_event)) { health_interval_event = nullptr; } } void Monitor::health_interval_stop() { dout(15) << __func__ << dendl; if (health_interval_event) { timer.cancel_event(health_interval_event); } health_interval_event = NULL; } void Monitor::health_events_cleanup() { health_tick_stop(); health_interval_stop(); health_status_cache.reset(); } void Monitor::health_to_clog_update_conf(const std::set &changed) { dout(20) << __func__ << dendl; if (changed.count("mon_health_to_clog")) { if (!cct->_conf->mon_health_to_clog) { health_events_cleanup(); return; } else { if (!health_tick_event) { health_tick_start(); } if (!health_interval_event) { health_interval_start(); } } } if (changed.count("mon_health_to_clog_interval")) { if (cct->_conf->mon_health_to_clog_interval <= 0) { health_interval_stop(); } else { health_interval_start(); } } if (changed.count("mon_health_to_clog_tick_interval")) { if (cct->_conf->mon_health_to_clog_tick_interval <= 0) { health_tick_stop(); } else { health_tick_start(); } } } void Monitor::do_health_to_clog_interval() { // outputting to clog may have been disabled in the conf // since we were scheduled. if (!cct->_conf->mon_health_to_clog || cct->_conf->mon_health_to_clog_interval <= 0) return; dout(10) << __func__ << dendl; // do we have a cached value for next_clog_update? if not, // do we know when the last update was? do_health_to_clog(true); health_interval_start(); } void Monitor::do_health_to_clog(bool force) { // outputting to clog may have been disabled in the conf // since we were scheduled. if (!cct->_conf->mon_health_to_clog || cct->_conf->mon_health_to_clog_interval <= 0) return; dout(10) << __func__ << (force ? 
" (force)" : "") << dendl; string summary; health_status_t level = get_health_status(false, nullptr, &summary); if (!force && summary == health_status_cache.summary && level == health_status_cache.overall) return; if (g_conf()->mon_health_detail_to_clog && summary != health_status_cache.summary && level != HEALTH_OK) { string details; level = get_health_status(true, nullptr, &details); clog->health(level) << "Health detail: " << details; } else { clog->health(level) << "overall " << summary; } health_status_cache.summary = summary; health_status_cache.overall = level; } health_status_t Monitor::get_health_status( bool want_detail, Formatter *f, std::string *plain, const char *sep1, const char *sep2) { health_status_t r = HEALTH_OK; if (f) { f->open_object_section("health"); f->open_object_section("checks"); } string summary; string *psummary = f ? nullptr : &summary; for (auto& svc : paxos_service) { r = std::min(r, svc->get_health_checks().dump_summary( f, psummary, sep2, want_detail)); } if (f) { f->close_section(); f->dump_stream("status") << r; f->close_section(); } else { // one-liner: HEALTH_FOO[ thing1[; thing2 ...]] *plain = stringify(r); if (summary.size()) { *plain += sep1; *plain += summary; } *plain += "\n"; } if (want_detail && !f) { for (auto& svc : paxos_service) { svc->get_health_checks().dump_detail(plain); } } return r; } void Monitor::log_health( const health_check_map_t& updated, const health_check_map_t& previous, MonitorDBStore::TransactionRef t) { if (!g_conf()->mon_health_to_clog) { return; } const utime_t now = ceph_clock_now(); // FIXME: log atomically as part of @t instead of using clog. 
dout(10) << __func__ << " updated " << updated.checks.size() << " previous " << previous.checks.size() << dendl; const auto min_log_period = g_conf().get_val( "mon_health_log_update_period"); for (auto& p : updated.checks) { auto q = previous.checks.find(p.first); bool logged = false; if (q == previous.checks.end()) { // new ostringstream ss; ss << "Health check failed: " << p.second.summary << " (" << p.first << ")"; clog->health(p.second.severity) << ss.str(); logged = true; } else { if (p.second.summary != q->second.summary || p.second.severity != q->second.severity) { auto status_iter = health_check_log_times.find(p.first); if (status_iter != health_check_log_times.end()) { if (p.second.severity == q->second.severity && now - status_iter->second.updated_at < min_log_period) { // We already logged this recently and the severity is unchanged, // so skip emitting an update of the summary string. // We'll get an update out of tick() later if the check // is still failing. continue; } } // summary or severity changed (ignore detail changes at this level) ostringstream ss; ss << "Health check update: " << p.second.summary << " (" << p.first << ")"; clog->health(p.second.severity) << ss.str(); logged = true; } } // Record the time at which we last logged, so that we can check this // when considering whether/when to print update messages. 
if (logged) { auto iter = health_check_log_times.find(p.first); if (iter == health_check_log_times.end()) { health_check_log_times.emplace(p.first, HealthCheckLogStatus( p.second.severity, p.second.summary, now)); } else { iter->second = HealthCheckLogStatus( p.second.severity, p.second.summary, now); } } } for (auto& p : previous.checks) { if (!updated.checks.count(p.first)) { // cleared ostringstream ss; if (p.first == "DEGRADED_OBJECTS") { clog->info() << "All degraded objects recovered"; } else if (p.first == "OSD_FLAGS") { clog->info() << "OSD flags cleared"; } else { clog->info() << "Health check cleared: " << p.first << " (was: " << p.second.summary << ")"; } if (health_check_log_times.count(p.first)) { health_check_log_times.erase(p.first); } } } if (previous.checks.size() && updated.checks.size() == 0) { // We might be going into a fully healthy state, check // other subsystems bool any_checks = false; for (auto& svc : paxos_service) { if (&(svc->get_health_checks()) == &(previous)) { // Ignore the ones we're clearing right now continue; } if (svc->get_health_checks().checks.size() > 0) { any_checks = true; break; } } if (!any_checks) { clog->info() << "Cluster is now healthy"; } } } void Monitor::get_cluster_status(stringstream &ss, Formatter *f) { if (f) f->open_object_section("status"); mono_clock::time_point now = mono_clock::now(); if (f) { f->dump_stream("fsid") << monmap->get_fsid(); get_health_status(false, f, nullptr); f->dump_unsigned("election_epoch", get_epoch()); { f->open_array_section("quorum"); for (set::iterator p = quorum.begin(); p != quorum.end(); ++p) f->dump_int("rank", *p); f->close_section(); f->open_array_section("quorum_names"); for (set::iterator p = quorum.begin(); p != quorum.end(); ++p) f->dump_string("id", monmap->get_name(*p)); f->close_section(); f->dump_int( "quorum_age", std::chrono::duration_cast( mono_clock::now() - quorum_since).count()); } f->open_object_section("monmap"); monmap->dump_summary(f); f->close_section(); 
f->open_object_section("osdmap"); osdmon()->osdmap.print_summary(f, cout, string(12, ' ')); f->close_section(); f->open_object_section("pgmap"); mgrstatmon()->print_summary(f, NULL); f->close_section(); f->open_object_section("fsmap"); mdsmon()->get_fsmap().print_summary(f, NULL); f->close_section(); f->open_object_section("mgrmap"); mgrmon()->get_map().print_summary(f, nullptr); f->close_section(); f->dump_object("servicemap", mgrstatmon()->get_service_map()); f->open_object_section("progress_events"); for (auto& i : mgrstatmon()->get_progress_events()) { f->dump_object(i.first.c_str(), i.second); } f->close_section(); f->close_section(); } else { ss << " cluster:\n"; ss << " id: " << monmap->get_fsid() << "\n"; string health; get_health_status(false, nullptr, &health, "\n ", "\n "); ss << " health: " << health << "\n"; ss << "\n \n services:\n"; { size_t maxlen = 3; auto& service_map = mgrstatmon()->get_service_map(); for (auto& p : service_map.services) { maxlen = std::max(maxlen, p.first.size()); } string spacing(maxlen - 3, ' '); const auto quorum_names = get_quorum_names(); const auto mon_count = monmap->mon_info.size(); ss << " mon: " << spacing << mon_count << " daemons, quorum " << quorum_names << " (age " << timespan_str(now - quorum_since) << ")"; if (quorum_names.size() != mon_count) { std::list out_of_q; for (size_t i = 0; i < monmap->ranks.size(); ++i) { if (quorum.count(i) == 0) { out_of_q.push_back(monmap->ranks[i]); } } ss << ", out of quorum: " << joinify(out_of_q.begin(), out_of_q.end(), std::string(", ")); } ss << "\n"; if (mgrmon()->in_use()) { ss << " mgr: " << spacing; mgrmon()->get_map().print_summary(nullptr, &ss); ss << "\n"; } if (mdsmon()->should_print_status()) { ss << " mds: " << spacing << mdsmon()->get_fsmap() << "\n"; } ss << " osd: " << spacing; osdmon()->osdmap.print_summary(NULL, ss, string(maxlen + 6, ' ')); ss << "\n"; for (auto& p : service_map.services) { const std::string &service = p.first; // filter out normal ceph entity 
types if (ServiceMap::is_normal_ceph_entity(service)) { continue; } ss << " " << p.first << ": " << string(maxlen - p.first.size(), ' ') << p.second.get_summary() << "\n"; } } { auto& service_map = mgrstatmon()->get_service_map(); if (!service_map.services.empty()) { ss << "\n \n task status:\n"; { for (auto &p : service_map.services) { ss << p.second.get_task_summary(p.first); } } } } ss << "\n \n data:\n"; mgrstatmon()->print_summary(NULL, &ss); auto& pem = mgrstatmon()->get_progress_events(); if (!pem.empty()) { ss << "\n \n progress:\n"; for (auto& i : pem) { ss << " " << i.second.message << "\n"; ss << " ["; unsigned j; for (j = 0; j < (unsigned)(i.second.progress * 30.0); ++j) { ss << '='; } for (; j < 30; ++j) { ss << '.'; } ss << "]\n"; } } ss << "\n "; } } void Monitor::_generate_command_map(cmdmap_t& cmdmap, map ¶m_str_map) { for (auto p = cmdmap.begin(); p != cmdmap.end(); ++p) { if (p->first == "prefix") continue; if (p->first == "caps") { vector cv; if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) && cv.size() % 2 == 0) { for (unsigned i = 0; i < cv.size(); i += 2) { string k = string("caps_") + cv[i]; param_str_map[k] = cv[i + 1]; } continue; } } param_str_map[p->first] = cmd_vartype_stringify(p->second); } } const MonCommand *Monitor::_get_moncommand( const string &cmd_prefix, const vector& cmds) { for (auto& c : cmds) { if (c.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) { return &c; } } return nullptr; } bool Monitor::_allowed_command(MonSession *s, const string &module, const string &prefix, const cmdmap_t& cmdmap, const map& param_str_map, const MonCommand *this_cmd) { bool cmd_r = this_cmd->requires_perm('r'); bool cmd_w = this_cmd->requires_perm('w'); bool cmd_x = this_cmd->requires_perm('x'); bool capable = s->caps.is_capable( g_ceph_context, s->entity_name, module, prefix, param_str_map, cmd_r, cmd_w, cmd_x, s->get_peer_socket_addr()); dout(10) << __func__ << " " << (capable ? 
"" : "not ") << "capable" << dendl; return capable; } void Monitor::format_command_descriptions(const std::vector &commands, Formatter *f, uint64_t features, bufferlist *rdata) { int cmdnum = 0; f->open_object_section("command_descriptions"); for (const auto &cmd : commands) { unsigned flags = cmd.flags; ostringstream secname; secname << "cmd" << setfill('0') << std::setw(3) << cmdnum; dump_cmddesc_to_json(f, features, secname.str(), cmd.cmdstring, cmd.helpstring, cmd.module, cmd.req_perms, flags); cmdnum++; } f->close_section(); // command_descriptions f->flush(*rdata); } bool Monitor::is_keyring_required() { return auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX) || auth_service_required.is_supported_auth(CEPH_AUTH_CEPHX) || auth_cluster_required.is_supported_auth(CEPH_AUTH_GSS) || auth_service_required.is_supported_auth(CEPH_AUTH_GSS); } struct C_MgrProxyCommand : public Context { Monitor *mon; MonOpRequestRef op; uint64_t size; bufferlist outbl; string outs; C_MgrProxyCommand(Monitor *mon, MonOpRequestRef op, uint64_t s) : mon(mon), op(op), size(s) { } void finish(int r) { std::lock_guard l(mon->lock); mon->mgr_proxy_bytes -= size; mon->reply_command(op, r, outs, outbl, 0); } }; void Monitor::handle_command(MonOpRequestRef op) { ceph_assert(op->is_type_command()); MMonCommand *m = static_cast(op->get_req()); if (m->fsid != monmap->fsid) { dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid << dendl; reply_command(op, -EPERM, "wrong fsid", 0); return; } MonSession *session = op->get_session(); if (!session) { dout(5) << __func__ << " dropping stray message " << *m << dendl; return; } if (m->cmd.empty()) { string rs = "No command supplied"; reply_command(op, -EINVAL, rs, 0); return; } string prefix; vector fullcmd; cmdmap_t cmdmap; stringstream ss, ds; bufferlist rdata; string rs; int r = -EINVAL; rs = "unrecognized command"; if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) { // ss has reason for failure r = -EINVAL; rs = ss.str(); if 
(!m->get_source().is_mon()) // don't reply to mon->mon commands reply_command(op, r, rs, 0); return; } // check return value. If no prefix parameter provided, // return value will be false, then return error info. if (!cmd_getval(g_ceph_context, cmdmap, "prefix", prefix)) { reply_command(op, -EINVAL, "command prefix not found", 0); return; } // check prefix is empty if (prefix.empty()) { reply_command(op, -EINVAL, "command prefix must not be empty", 0); return; } if (prefix == "get_command_descriptions") { bufferlist rdata; Formatter *f = Formatter::create("json"); std::vector commands = static_cast( paxos_service[PAXOS_MGR].get())->get_command_descs(); for (auto& c : leader_mon_commands) { commands.push_back(c); } auto features = m->get_connection()->get_features(); format_command_descriptions(commands, f, features, &rdata); delete f; reply_command(op, 0, "", rdata, 0); return; } string module; string err; dout(0) << "handle_command " << *m << dendl; string format; cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain")); boost::scoped_ptr f(Formatter::create(format)); get_str_vec(prefix, fullcmd); // make sure fullcmd is not empty. // invalid prefix will cause empty vector fullcmd. 
// such as, prefix=";,,;" if (fullcmd.empty()) { reply_command(op, -EINVAL, "command requires a prefix to be valid", 0); return; } module = fullcmd[0]; // validate command is in leader map const MonCommand *leader_cmd; const auto& mgr_cmds = mgrmon()->get_command_descs(); const MonCommand *mgr_cmd = nullptr; if (!mgr_cmds.empty()) { mgr_cmd = _get_moncommand(prefix, mgr_cmds); } leader_cmd = _get_moncommand(prefix, leader_mon_commands); if (!leader_cmd) { leader_cmd = mgr_cmd; if (!leader_cmd) { reply_command(op, -EINVAL, "command not known", 0); return; } } // validate command is in our map & matches, or forward if it is allowed const MonCommand *mon_cmd = _get_moncommand( prefix, get_local_commands(quorum_mon_features)); if (!mon_cmd) { mon_cmd = mgr_cmd; } if (!is_leader()) { if (!mon_cmd) { if (leader_cmd->is_noforward()) { reply_command(op, -EINVAL, "command not locally supported and not allowed to forward", 0); return; } dout(10) << "Command not locally supported, forwarding request " << m << dendl; forward_request_leader(op); return; } else if (!mon_cmd->is_compat(leader_cmd)) { if (mon_cmd->is_noforward()) { reply_command(op, -EINVAL, "command not compatible with leader and not allowed to forward", 0); return; } dout(10) << "Command not compatible with leader, forwarding request " << m << dendl; forward_request_leader(op); return; } } if (mon_cmd->is_obsolete() || (cct->_conf->mon_debug_deprecated_as_obsolete && mon_cmd->is_deprecated())) { reply_command(op, -ENOTSUP, "command is obsolete; please check usage and/or man page", 0); return; } if (session->proxy_con && mon_cmd->is_noforward()) { dout(10) << "Got forward for noforward command " << m << dendl; reply_command(op, -EINVAL, "forward for noforward command", rdata, 0); return; } /* what we perceive as being the service the command falls under */ string service(mon_cmd->module); dout(25) << __func__ << " prefix='" << prefix << "' module='" << module << "' service='" << service << "'" << dendl; bool 
cmd_is_rw = (mon_cmd->requires_perm('w') || mon_cmd->requires_perm('x')); // validate user's permissions for requested command map param_str_map; _generate_command_map(cmdmap, param_str_map); if (!_allowed_command(session, service, prefix, cmdmap, param_str_map, mon_cmd)) { dout(1) << __func__ << " access denied" << dendl; if (prefix != "config set" && prefix != "config-key set") (cmd_is_rw ? audit_clog->info() : audit_clog->debug()) << "from='" << session->name << " " << session->addrs << "' " << "entity='" << session->entity_name << "' " << "cmd=" << m->cmd << ": access denied"; reply_command(op, -EACCES, "access denied", 0); return; } if (prefix != "config set" && prefix != "config-key set") (cmd_is_rw ? audit_clog->info() : audit_clog->debug()) << "from='" << session->name << " " << session->addrs << "' " << "entity='" << session->entity_name << "' " << "cmd=" << m->cmd << ": dispatch"; if (mon_cmd->is_mgr()) { const auto& hdr = m->get_header(); uint64_t size = hdr.front_len + hdr.middle_len + hdr.data_len; uint64_t max = g_conf().get_val("mon_client_bytes") * g_conf().get_val("mon_mgr_proxy_client_bytes_ratio"); if (mgr_proxy_bytes + size > max) { dout(10) << __func__ << " current mgr proxy bytes " << mgr_proxy_bytes << " + " << size << " > max " << max << dendl; reply_command(op, -EAGAIN, "hit limit on proxied mgr commands", rdata, 0); return; } mgr_proxy_bytes += size; dout(10) << __func__ << " proxying mgr command (+" << size << " -> " << mgr_proxy_bytes << ")" << dendl; C_MgrProxyCommand *fin = new C_MgrProxyCommand(this, op, size); mgr_client.start_command(m->cmd, m->get_data(), &fin->outbl, &fin->outs, new C_OnFinisher(fin, &finisher)); return; } if ((module == "mds" || module == "fs") && prefix != "fs authorize") { mdsmon()->dispatch(op); return; } if ((module == "osd" || prefix == "pg map" || prefix == "pg repeer") && prefix != "osd last-stat-seq") { osdmon()->dispatch(op); return; } if (module == "config") { configmon()->dispatch(op); return; } if 
(module == "mon" && /* Let the Monitor class handle the following commands: * 'mon compact' * 'mon scrub' * 'mon sync force' */ prefix != "mon compact" && prefix != "mon scrub" && prefix != "mon sync force" && prefix != "mon metadata" && prefix != "mon versions" && prefix != "mon count-metadata" && prefix != "mon ok-to-stop" && prefix != "mon ok-to-add-offline" && prefix != "mon ok-to-rm") { monmon()->dispatch(op); return; } if (module == "auth" || prefix == "fs authorize") { authmon()->dispatch(op); return; } if (module == "log") { logmon()->dispatch(op); return; } if (module == "config-key") { config_key_service->dispatch(op); return; } if (module == "mgr") { mgrmon()->dispatch(op); return; } if (prefix == "fsid") { if (f) { f->open_object_section("fsid"); f->dump_stream("fsid") << monmap->fsid; f->close_section(); f->flush(rdata); } else { ds << monmap->fsid; rdata.append(ds); } reply_command(op, 0, "", rdata, 0); return; } if (prefix == "scrub" || prefix == "mon scrub") { wait_for_paxos_write(); if (is_leader()) { int r = scrub_start(); reply_command(op, r, "", rdata, 0); } else if (is_peon()) { forward_request_leader(op); } else { reply_command(op, -EAGAIN, "no quorum", rdata, 0); } return; } if (prefix == "compact" || prefix == "mon compact") { dout(1) << "triggering manual compaction" << dendl; auto start = ceph::coarse_mono_clock::now(); store->compact_async(); auto end = ceph::coarse_mono_clock::now(); double duration = std::chrono::duration(end-start).count(); dout(1) << "finished manual compaction in " << duration << " seconds" << dendl; ostringstream oss; oss << "compacted " << g_conf().get_val("mon_keyvaluedb") << " in " << duration << " seconds"; rs = oss.str(); r = 0; } else if (prefix == "injectargs") { vector injected_args; cmd_getval(g_ceph_context, cmdmap, "injected_args", injected_args); if (!injected_args.empty()) { dout(0) << "parsing injected options '" << injected_args << "'" << dendl; ostringstream oss; r = 
g_conf().injectargs(str_join(injected_args, " "), &oss); ss << "injectargs:" << oss.str(); rs = ss.str(); goto out; } else { rs = "must supply options to be parsed in a single string"; r = -EINVAL; } } else if (prefix == "time-sync-status") { if (!f) f.reset(Formatter::create("json-pretty")); f->open_object_section("time_sync"); if (!timecheck_skews.empty()) { f->open_object_section("time_skew_status"); for (auto& i : timecheck_skews) { double skew = i.second; double latency = timecheck_latencies[i.first]; string name = monmap->get_name(i.first); ostringstream tcss; health_status_t tcstatus = timecheck_status(tcss, skew, latency); f->open_object_section(name.c_str()); f->dump_float("skew", skew); f->dump_float("latency", latency); f->dump_stream("health") << tcstatus; if (tcstatus != HEALTH_OK) { f->dump_stream("details") << tcss.str(); } f->close_section(); } f->close_section(); } f->open_object_section("timechecks"); f->dump_unsigned("epoch", get_epoch()); f->dump_int("round", timecheck_round); f->dump_stream("round_status") << ((timecheck_round%2) ? "on-going" : "finished"); f->close_section(); f->close_section(); f->flush(rdata); r = 0; rs = ""; } else if (prefix == "config set") { std::string key; cmd_getval(cct, cmdmap, "key", key); std::string val; cmd_getval(cct, cmdmap, "value", val); r = g_conf().set_val(key, val, &ss); if (r == 0) { g_conf().apply_changes(nullptr); } rs = ss.str(); goto out; } else if (prefix == "status" || prefix == "health" || prefix == "df") { string detail; cmd_getval(g_ceph_context, cmdmap, "detail", detail); if (prefix == "status") { // get_cluster_status handles f == NULL get_cluster_status(ds, f.get()); if (f) { f->flush(ds); ds << '\n'; } rdata.append(ds); } else if (prefix == "health") { string plain; get_health_status(detail == "detail", f.get(), f ? 
nullptr : &plain); if (f) { f->flush(rdata); } else { rdata.append(plain); } } else if (prefix == "df") { bool verbose = (detail == "detail"); if (f) f->open_object_section("stats"); mgrstatmon()->dump_cluster_stats(&ds, f.get(), verbose); if (!f) { ds << "\n \n"; } mgrstatmon()->dump_pool_stats(osdmon()->osdmap, &ds, f.get(), verbose); if (f) { f->close_section(); f->flush(ds); ds << '\n'; } } else { ceph_abort_msg("We should never get here!"); return; } rdata.append(ds); rs = ""; r = 0; } else if (prefix == "report") { // this must be formatted, in its current form if (!f) f.reset(Formatter::create("json-pretty")); f->open_object_section("report"); f->dump_stream("cluster_fingerprint") << fingerprint; f->dump_string("version", ceph_version_to_str()); f->dump_string("commit", git_version_to_str()); f->dump_stream("timestamp") << ceph_clock_now(); vector tagsvec; cmd_getval(g_ceph_context, cmdmap, "tags", tagsvec); string tagstr = str_join(tagsvec, " "); if (!tagstr.empty()) tagstr = tagstr.substr(0, tagstr.find_last_of(' ')); f->dump_string("tag", tagstr); get_health_status(true, f.get(), nullptr); monmon()->dump_info(f.get()); osdmon()->dump_info(f.get()); mdsmon()->dump_info(f.get()); authmon()->dump_info(f.get()); mgrstatmon()->dump_info(f.get()); paxos->dump_info(f.get()); f->close_section(); f->flush(rdata); ostringstream ss2; ss2 << "report " << rdata.crc32c(CEPH_MON_PORT_LEGACY); rs = ss2.str(); r = 0; } else if (prefix == "osd last-stat-seq") { int64_t osd; cmd_getval(g_ceph_context, cmdmap, "id", osd); uint64_t seq = mgrstatmon()->get_last_osd_stat_seq(osd); if (f) { f->dump_unsigned("seq", seq); f->flush(ds); } else { ds << seq; rdata.append(ds); } rs = ""; r = 0; } else if (prefix == "node ls") { string node_type("all"); cmd_getval(g_ceph_context, cmdmap, "type", node_type); if (!f) f.reset(Formatter::create("json-pretty")); if (node_type == "all") { f->open_object_section("nodes"); print_nodes(f.get(), ds); osdmon()->print_nodes(f.get()); 
mdsmon()->print_nodes(f.get()); mgrmon()->print_nodes(f.get()); f->close_section(); } else if (node_type == "mon") { print_nodes(f.get(), ds); } else if (node_type == "osd") { osdmon()->print_nodes(f.get()); } else if (node_type == "mds") { mdsmon()->print_nodes(f.get()); } else if (node_type == "mgr") { mgrmon()->print_nodes(f.get()); } f->flush(ds); rdata.append(ds); rs = ""; r = 0; } else if (prefix == "features") { if (!is_leader() && !is_peon()) { dout(10) << " waiting for quorum" << dendl; waitfor_quorum.push_back(new C_RetryMessage(this, op)); return; } if (!is_leader()) { forward_request_leader(op); return; } if (!f) f.reset(Formatter::create("json-pretty")); FeatureMap fm; get_combined_feature_map(&fm); f->dump_object("features", fm); f->flush(rdata); rs = ""; r = 0; } else if (prefix == "mon metadata") { if (!f) f.reset(Formatter::create("json-pretty")); string name; bool all = !cmd_getval(g_ceph_context, cmdmap, "id", name); if (!all) { // Dump a single mon's metadata int mon = monmap->get_rank(name); if (mon < 0) { rs = "requested mon not found"; r = -ENOENT; goto out; } f->open_object_section("mon_metadata"); r = get_mon_metadata(mon, f.get(), ds); f->close_section(); } else { // Dump all mons' metadata r = 0; f->open_array_section("mon_metadata"); for (unsigned int rank = 0; rank < monmap->size(); ++rank) { std::ostringstream get_err; f->open_object_section("mon"); f->dump_string("name", monmap->get_name(rank)); r = get_mon_metadata(rank, f.get(), get_err); f->close_section(); if (r == -ENOENT || r == -EINVAL) { dout(1) << get_err.str() << dendl; // Drop error, list what metadata we do have r = 0; } else if (r != 0) { derr << "Unexpected error from get_mon_metadata: " << cpp_strerror(r) << dendl; ds << get_err.str(); break; } } f->close_section(); } f->flush(ds); rdata.append(ds); rs = ""; } else if (prefix == "mon versions") { if (!f) f.reset(Formatter::create("json-pretty")); count_metadata("ceph_version", f.get()); f->flush(ds); rdata.append(ds); 
rs = ""; r = 0; } else if (prefix == "mon count-metadata") { if (!f) f.reset(Formatter::create("json-pretty")); string field; cmd_getval(g_ceph_context, cmdmap, "property", field); count_metadata(field, f.get()); f->flush(ds); rdata.append(ds); rs = ""; r = 0; } else if (prefix == "quorum_status") { // make sure our map is readable and up to date if (!is_leader() && !is_peon()) { dout(10) << " waiting for quorum" << dendl; waitfor_quorum.push_back(new C_RetryMessage(this, op)); return; } _quorum_status(f.get(), ds); rdata.append(ds); rs = ""; r = 0; } else if (prefix == "mon ok-to-stop") { vector ids; if (!cmd_getval(g_ceph_context, cmdmap, "ids", ids)) { r = -EINVAL; goto out; } set wouldbe; for (auto rank : quorum) { wouldbe.insert(monmap->get_name(rank)); } for (auto& n : ids) { if (monmap->contains(n)) { wouldbe.erase(n); } } if (wouldbe.size() < monmap->min_quorum_size()) { r = -EBUSY; rs = "not enough monitors would be available (" + stringify(wouldbe) + ") after stopping mons " + stringify(ids); goto out; } r = 0; rs = "quorum should be preserved (" + stringify(wouldbe) + ") after stopping " + stringify(ids); } else if (prefix == "mon ok-to-add-offline") { if (quorum.size() < monmap->min_quorum_size(monmap->size() + 1)) { rs = "adding a monitor may break quorum (until that monitor starts)"; r = -EBUSY; goto out; } rs = "adding another mon that is not yet online will not break quorum"; r = 0; } else if (prefix == "mon ok-to-rm") { string id; if (!cmd_getval(g_ceph_context, cmdmap, "id", id)) { r = -EINVAL; rs = "must specify a monitor id"; goto out; } if (!monmap->contains(id)) { r = 0; rs = "mon." + id + " does not exist"; goto out; } int rank = monmap->get_rank(id); if (quorum.count(rank) && quorum.size() - 1 < monmap->min_quorum_size(monmap->size() - 1)) { r = -EBUSY; rs = "removing mon." + id + " would break quorum"; goto out; } r = 0; rs = "safe to remove mon." 
+ id; } else if (prefix == "mon_status") { get_mon_status(f.get(), ds); if (f) f->flush(ds); rdata.append(ds); rs = ""; r = 0; } else if (prefix == "sync force" || prefix == "mon sync force") { bool validate1 = false; cmd_getval(g_ceph_context, cmdmap, "yes_i_really_mean_it", validate1); bool validate2 = false; cmd_getval(g_ceph_context, cmdmap, "i_know_what_i_am_doing", validate2); if (!validate1 || !validate2) { r = -EINVAL; rs = "are you SURE? this will mean the monitor store will be " "erased. pass '--yes-i-really-mean-it " "--i-know-what-i-am-doing' if you really do."; goto out; } sync_force(f.get(), ds); rs = ds.str(); r = 0; } else if (prefix == "heap") { if (!ceph_using_tcmalloc()) rs = "tcmalloc not enabled, can't use heap profiler commands\n"; else { string heapcmd; cmd_getval(g_ceph_context, cmdmap, "heapcmd", heapcmd); // XXX 1-element vector, change at callee or make vector here? vector heapcmd_vec; get_str_vec(heapcmd, heapcmd_vec); string value; if (cmd_getval(g_ceph_context, cmdmap, "value", value)) heapcmd_vec.push_back(value); ceph_heap_profiler_handle_command(heapcmd_vec, ds); rdata.append(ds); rs = ""; r = 0; } } else if (prefix == "quorum") { string quorumcmd; cmd_getval(g_ceph_context, cmdmap, "quorumcmd", quorumcmd); if (quorumcmd == "exit") { start_election(); elector.stop_participating(); rs = "stopped responding to quorum, initiated new election"; r = 0; } else if (quorumcmd == "enter") { elector.start_participating(); start_election(); rs = "started responding to quorum, initiated new election"; r = 0; } else { rs = "needs a valid 'quorum' command"; r = -EINVAL; } } else if (prefix == "version") { if (f) { f->open_object_section("version"); f->dump_string("version", pretty_version_to_str()); f->close_section(); f->flush(ds); } else { ds << pretty_version_to_str(); } rdata.append(ds); rs = ""; r = 0; } else if (prefix == "versions") { if (!f) f.reset(Formatter::create("json-pretty")); map overall; f->open_object_section("version"); map 
mon, mgr, osd, mds; count_metadata("ceph_version", &mon); f->open_object_section("mon"); for (auto& p : mon) { f->dump_int(p.first.c_str(), p.second); overall[p.first] += p.second; } f->close_section(); mgrmon()->count_metadata("ceph_version", &mgr); f->open_object_section("mgr"); for (auto& p : mgr) { f->dump_int(p.first.c_str(), p.second); overall[p.first] += p.second; } f->close_section(); osdmon()->count_metadata("ceph_version", &osd); f->open_object_section("osd"); for (auto& p : osd) { f->dump_int(p.first.c_str(), p.second); overall[p.first] += p.second; } f->close_section(); mdsmon()->count_metadata("ceph_version", &mds); f->open_object_section("mds"); for (auto& p : mds) { f->dump_int(p.first.c_str(), p.second); overall[p.first] += p.second; } f->close_section(); for (auto& p : mgrstatmon()->get_service_map().services) { auto &service = p.first; if (ServiceMap::is_normal_ceph_entity(service)) { continue; } f->open_object_section(service.c_str()); map m; p.second.count_metadata("ceph_version", &m); for (auto& q : m) { f->dump_int(q.first.c_str(), q.second); overall[q.first] += q.second; } f->close_section(); } f->open_object_section("overall"); for (auto& p : overall) { f->dump_int(p.first.c_str(), p.second); } f->close_section(); f->close_section(); f->flush(rdata); rs = ""; r = 0; } else if (prefix == "smart") { string want_devid; cmd_getval(cct, cmdmap, "devid", want_devid); string devname = store->get_devname(); set devnames; get_raw_devices(devname, &devnames); json_spirit::mObject json_map; uint64_t smart_timeout = cct->_conf.get_val( "mon_smart_report_timeout"); for (auto& devname : devnames) { string err; string devid = get_device_id(devname, &err); if (want_devid.size() && want_devid != devid) { derr << "get_device_id failed on " << devname << ": " << err << dendl; continue; } json_spirit::mValue smart_json; if (block_device_get_metrics(devname, smart_timeout, &smart_json)) { dout(10) << "block_device_get_metrics failed for /dev/" << devname << 
dendl; continue; } json_map[devid] = smart_json; } ostringstream ss; json_spirit::write(json_map, ss, json_spirit::pretty_print); rdata.append(ss.str()); r = 0; rs = ""; } out: if (!m->get_source().is_mon()) // don't reply to mon->mon commands reply_command(op, r, rs, rdata, 0); } void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs, version_t version) { bufferlist rdata; reply_command(op, rc, rs, rdata, version); } void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs, bufferlist& rdata, version_t version) { MMonCommand *m = static_cast(op->get_req()); ceph_assert(m->get_type() == MSG_MON_COMMAND); MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs, version); reply->set_tid(m->get_tid()); reply->set_data(rdata); send_reply(op, reply); } // ------------------------ // request/reply routing // // a client/mds/osd will connect to a random monitor. we need to forward any // messages requiring state updates to the leader, and then route any replies // back via the correct monitor and back to them. (the monitor will not // initiate any connections.) 
void Monitor::forward_request_leader(MonOpRequestRef op) { op->mark_event(__func__); int mon = get_leader(); MonSession *session = op->get_session(); PaxosServiceMessage *req = op->get_req(); if (req->get_source().is_mon() && req->get_source_addrs() != messenger->get_myaddrs()) { dout(10) << "forward_request won't forward (non-local) mon request " << *req << dendl; } else if (session->proxy_con) { dout(10) << "forward_request won't double fwd request " << *req << dendl; } else if (!session->closed) { RoutedRequest *rr = new RoutedRequest; rr->tid = ++routed_request_tid; rr->con = req->get_connection(); rr->con_features = rr->con->get_features(); encode_message(req, CEPH_FEATURES_ALL, rr->request_bl); // for my use only; use all features rr->session = static_cast(session->get()); rr->op = op; routed_requests[rr->tid] = rr; session->routed_request_tids.insert(rr->tid); dout(10) << "forward_request " << rr->tid << " request " << *req << " features " << rr->con_features << dendl; MForward *forward = new MForward(rr->tid, req, rr->con_features, rr->session->caps); forward->set_priority(req->get_priority()); if (session->auth_handler) { forward->entity_name = session->entity_name; } else if (req->get_source().is_mon()) { forward->entity_name.set_type(CEPH_ENTITY_TYPE_MON); } send_mon_message(forward, mon); op->mark_forwarded(); ceph_assert(op->get_req()->get_type() != 0); } else { dout(10) << "forward_request no session for request " << *req << dendl; } } // fake connection attached to forwarded messages struct AnonConnection : public Connection { entity_addr_t socket_addr; explicit AnonConnection(CephContext *cct, const entity_addr_t& sa) : Connection(cct, NULL), socket_addr(sa) {} int send_message(Message *m) override { ceph_assert(!"send_message on anonymous connection"); } void send_keepalive() override { ceph_assert(!"send_keepalive on anonymous connection"); } void mark_down() override { // silently ignore } void mark_disposable() override { // silengtly ignore } 
bool is_connected() override { return false; } entity_addr_t get_peer_socket_addr() const override { return socket_addr; } }; //extract the original message and put it into the regular dispatch function void Monitor::handle_forward(MonOpRequestRef op) { MForward *m = static_cast(op->get_req()); dout(10) << "received forwarded message from " << ceph_entity_type_name(m->client_type) << " " << m->client_addrs << " via " << m->get_source_inst() << dendl; MonSession *session = op->get_session(); ceph_assert(session); if (!session->is_capable("mon", MON_CAP_X)) { dout(0) << "forward from entity with insufficient caps! " << session->caps << dendl; } else { // see PaxosService::dispatch(); we rely on this being anon // (c->msgr == NULL) PaxosServiceMessage *req = m->claim_message(); ceph_assert(req != NULL); ConnectionRef c(new AnonConnection(cct, m->client_socket_addr)); MonSession *s = new MonSession(static_cast(c.get())); s->_ident(req->get_source(), req->get_source_addrs()); c->set_priv(RefCountedPtr{s, false}); c->set_peer_addrs(m->client_addrs); c->set_peer_type(m->client_type); c->set_features(m->con_features); s->authenticated = true; s->caps = m->client_caps; dout(10) << " caps are " << s->caps << dendl; s->entity_name = m->entity_name; dout(10) << " entity name '" << s->entity_name << "' type " << s->entity_name.get_type() << dendl; s->proxy_con = m->get_connection(); s->proxy_tid = m->tid; req->set_connection(c); // not super accurate, but better than nothing. req->set_recv_stamp(m->get_recv_stamp()); /* * note which election epoch this is; we will drop the message if * there is a future election since our peers will resend routed * requests in that case. */ req->rx_election_epoch = get_epoch(); dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl; _ms_dispatch(req); // break the session <-> con ref loop by removing the con->session // reference, which is no longer needed once the MonOpRequest is // set up. 
c->set_priv(NULL); } } void Monitor::send_reply(MonOpRequestRef op, Message *reply) { op->mark_event(__func__); MonSession *session = op->get_session(); ceph_assert(session); Message *req = op->get_req(); ConnectionRef con = op->get_connection(); reply->set_cct(g_ceph_context); dout(2) << __func__ << " " << op << " " << reply << " " << *reply << dendl; if (!con) { dout(2) << "send_reply no connection, dropping reply " << *reply << " to " << req << " " << *req << dendl; reply->put(); op->mark_event("reply: no connection"); return; } if (!session->con && !session->proxy_con) { dout(2) << "send_reply no connection, dropping reply " << *reply << " to " << req << " " << *req << dendl; reply->put(); op->mark_event("reply: no connection"); return; } if (session->proxy_con) { dout(15) << "send_reply routing reply to " << con->get_peer_addr() << " via " << session->proxy_con->get_peer_addr() << " for request " << *req << dendl; session->proxy_con->send_message(new MRoute(session->proxy_tid, reply)); op->mark_event("reply: send routed request"); } else { session->con->send_message(reply); op->mark_event("reply: send"); } } void Monitor::no_reply(MonOpRequestRef op) { MonSession *session = op->get_session(); Message *req = op->get_req(); if (session->proxy_con) { dout(10) << "no_reply to " << req->get_source_inst() << " via " << session->proxy_con->get_peer_addr() << " for request " << *req << dendl; session->proxy_con->send_message(new MRoute(session->proxy_tid, NULL)); op->mark_event("no_reply: send routed request"); } else { dout(10) << "no_reply to " << req->get_source_inst() << " " << *req << dendl; op->mark_event("no_reply"); } } void Monitor::handle_route(MonOpRequestRef op) { MRoute *m = static_cast(op->get_req()); MonSession *session = op->get_session(); //check privileges if (!session->is_capable("mon", MON_CAP_X)) { dout(0) << "MRoute received from entity without appropriate perms! 
" << dendl; return; } if (m->msg) dout(10) << "handle_route tid " << m->session_mon_tid << " " << *m->msg << dendl; else dout(10) << "handle_route tid " << m->session_mon_tid << " null" << dendl; // look it up if (m->session_mon_tid) { if (routed_requests.count(m->session_mon_tid)) { RoutedRequest *rr = routed_requests[m->session_mon_tid]; // reset payload, in case encoding is dependent on target features if (m->msg) { m->msg->clear_payload(); rr->con->send_message(m->msg); m->msg = NULL; } if (m->send_osdmap_first) { dout(10) << " sending osdmaps from " << m->send_osdmap_first << dendl; osdmon()->send_incremental(m->send_osdmap_first, rr->session, true, MonOpRequestRef()); } ceph_assert(rr->tid == m->session_mon_tid && rr->session->routed_request_tids.count(m->session_mon_tid)); routed_requests.erase(m->session_mon_tid); rr->session->routed_request_tids.erase(m->session_mon_tid); delete rr; } else { dout(10) << " don't have routed request tid " << m->session_mon_tid << dendl; } } else { dout(10) << " not a routed request, ignoring" << dendl; } } void Monitor::resend_routed_requests() { dout(10) << "resend_routed_requests" << dendl; int mon = get_leader(); list retry; for (map::iterator p = routed_requests.begin(); p != routed_requests.end(); ++p) { RoutedRequest *rr = p->second; if (mon == rank) { dout(10) << " requeue for self tid " << rr->tid << dendl; rr->op->mark_event("retry routed request"); retry.push_back(new C_RetryMessage(this, rr->op)); if (rr->session) { ceph_assert(rr->session->routed_request_tids.count(p->first)); rr->session->routed_request_tids.erase(p->first); } delete rr; } else { auto q = rr->request_bl.cbegin(); PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(cct, 0, q); rr->op->mark_event("resend forwarded message to leader"); dout(10) << " resend to mon." 
<< mon << " tid " << rr->tid << " " << *req << dendl; MForward *forward = new MForward(rr->tid, req, rr->con_features, rr->session->caps); req->put(); // forward takes its own ref; drop ours. forward->client_type = rr->con->get_peer_type(); forward->client_addrs = rr->con->get_peer_addrs(); forward->client_socket_addr = rr->con->get_peer_socket_addr(); forward->set_priority(req->get_priority()); send_mon_message(forward, mon); } } if (mon == rank) { routed_requests.clear(); finish_contexts(g_ceph_context, retry); } } void Monitor::remove_session(MonSession *s) { dout(10) << "remove_session " << s << " " << s->name << " " << s->addrs << " features 0x" << std::hex << s->con_features << std::dec << dendl; ceph_assert(s->con); ceph_assert(!s->closed); for (set::iterator p = s->routed_request_tids.begin(); p != s->routed_request_tids.end(); ++p) { ceph_assert(routed_requests.count(*p)); RoutedRequest *rr = routed_requests[*p]; dout(10) << " dropping routed request " << rr->tid << dendl; delete rr; routed_requests.erase(*p); } s->routed_request_tids.clear(); s->con->set_priv(nullptr); session_map.remove_session(s); logger->set(l_mon_num_sessions, session_map.get_size()); logger->inc(l_mon_session_rm); } void Monitor::remove_all_sessions() { std::lock_guard l(session_map_lock); while (!session_map.sessions.empty()) { MonSession *s = session_map.sessions.front(); remove_session(s); logger->inc(l_mon_session_rm); } if (logger) logger->set(l_mon_num_sessions, session_map.get_size()); } void Monitor::send_mon_message(Message *m, int rank) { messenger->send_to_mon(m, monmap->get_addrs(rank)); } void Monitor::waitlist_or_zap_client(MonOpRequestRef op) { /** * Wait list the new session until we're in the quorum, assuming it's * sufficiently new. * tick() will periodically send them back through so we can send * the client elsewhere if we don't think we're getting back in. * * But we whitelist a few sorts of messages: * 1) Monitors can talk to us at any time, of course. 
* 2) auth messages. It's unlikely to go through much faster, but * it's possible we've just lost our quorum status and we want to take... * 3) command messages. We want to accept these under all possible * circumstances. */ Message *m = op->get_req(); MonSession *s = op->get_session(); ConnectionRef con = op->get_connection(); utime_t too_old = ceph_clock_now(); too_old -= g_ceph_context->_conf->mon_lease; if (m->get_recv_stamp() > too_old && con->is_connected()) { dout(5) << "waitlisting message " << *m << dendl; maybe_wait_for_quorum.push_back(new C_RetryMessage(this, op)); op->mark_wait_for_quorum(); } else { dout(5) << "discarding message " << *m << " and sending client elsewhere" << dendl; con->mark_down(); // proxied sessions aren't registered and don't have a con; don't remove // those. if (!s->proxy_con) { std::lock_guard l(session_map_lock); remove_session(s); } op->mark_zap(); } } void Monitor::_ms_dispatch(Message *m) { if (is_shutdown()) { m->put(); return; } MonOpRequestRef op = op_tracker.create_request(m); bool src_is_mon = op->is_src_mon(); op->mark_event("mon:_ms_dispatch"); MonSession *s = op->get_session(); if (s && s->closed) { return; } if (src_is_mon && s) { ConnectionRef con = m->get_connection(); if (con->get_messenger() && con->get_features() != s->con_features) { // only update features if this is a non-anonymous connection dout(10) << __func__ << " feature change for " << m->get_source_inst() << " (was " << s->con_features << ", now " << con->get_features() << ")" << dendl; // connection features changed - recreate session. if (s->con && s->con != con) { dout(10) << __func__ << " connection for " << m->get_source_inst() << " changed from session; mark down and replace" << dendl; s->con->mark_down(); } if (s->item.is_on_list()) { // forwarded messages' sessions are not in the sessions map and // exist only while the op is being handled. 
std::lock_guard l(session_map_lock); remove_session(s); } s = nullptr; } } if (!s) { // if the sender is not a monitor, make sure their first message for a // session is an MAuth. If it is not, assume it's a stray message, // and considering that we are creating a new session it is safe to // assume that the sender hasn't authenticated yet, so we have no way // of assessing whether we should handle it or not. if (!src_is_mon && (m->get_type() != CEPH_MSG_AUTH && m->get_type() != CEPH_MSG_MON_GET_MAP && m->get_type() != CEPH_MSG_PING)) { dout(1) << __func__ << " dropping stray message " << *m << " from " << m->get_source_inst() << dendl; return; } ConnectionRef con = m->get_connection(); { std::lock_guard l(session_map_lock); s = session_map.new_session(m->get_source(), m->get_source_addrs(), con.get()); } ceph_assert(s); con->set_priv(RefCountedPtr{s, false}); dout(10) << __func__ << " new session " << s << " " << *s << " features 0x" << std::hex << s->con_features << std::dec << dendl; op->set_session(s); logger->set(l_mon_num_sessions, session_map.get_size()); logger->inc(l_mon_session_add); if (src_is_mon) { // give it monitor caps; the peer type has been authenticated dout(5) << __func__ << " setting monitor caps on this connection" << dendl; if (!s->caps.is_allow_all()) // but no need to repeatedly copy s->caps = mon_caps; s->authenticated = true; } } else { dout(20) << __func__ << " existing session " << s << " for " << s->name << dendl; } ceph_assert(s); s->session_timeout = ceph_clock_now(); s->session_timeout += g_conf()->mon_session_timeout; if (s->auth_handler) { s->entity_name = s->auth_handler->get_entity_name(); s->global_id = s->auth_handler->get_global_id(); s->global_id_status = s->auth_handler->get_global_id_status(); } dout(20) << " entity_name " << s->entity_name << " global_id " << s->global_id << " (" << s->global_id_status << ") caps " << s->caps.get_str() << dendl; if ((is_synchronizing() || (!s->authenticated && !exited_quorum.is_zero())) 
&& !src_is_mon && m->get_type() != CEPH_MSG_PING) { waitlist_or_zap_client(op); } else { dispatch_op(op); } return; } void Monitor::dispatch_op(MonOpRequestRef op) { op->mark_event("mon:dispatch_op"); MonSession *s = op->get_session(); ceph_assert(s); if (s->closed) { dout(10) << " session closed, dropping " << op->get_req() << dendl; return; } /* we will consider the default type as being 'monitor' until proven wrong */ op->set_type_monitor(); /* deal with all messages that do not necessarily need caps */ switch (op->get_req()->get_type()) { // auth case MSG_MON_GLOBAL_ID: case CEPH_MSG_AUTH: op->set_type_service(); /* no need to check caps here */ paxos_service[PAXOS_AUTH]->dispatch(op); return; case CEPH_MSG_PING: handle_ping(op); return; } if (!op->get_session()->authenticated) { dout(5) << __func__ << " " << op->get_req()->get_source_inst() << " is not authenticated, dropping " << *(op->get_req()) << dendl; return; } // global_id_status == NONE: all sessions for auth_none and krb, // mon <-> mon sessions (including proxied sessions) for cephx ceph_assert(s->global_id_status == global_id_status_t::NONE || s->global_id_status == global_id_status_t::NEW_OK || s->global_id_status == global_id_status_t::NEW_NOT_EXPOSED || s->global_id_status == global_id_status_t::RECLAIM_OK || s->global_id_status == global_id_status_t::RECLAIM_INSECURE); // let mon_getmap through for "ping" (which doesn't reconnect) // and "tell" (which reconnects but doesn't attempt to preserve // its global_id and stays in NEW_NOT_EXPOSED, retrying until // ->send_attempts reaches 0) if (cct->_conf->auth_expose_insecure_global_id_reclaim && s->global_id_status == global_id_status_t::NEW_NOT_EXPOSED && op->get_req()->get_type() != CEPH_MSG_MON_GET_MAP) { dout(5) << __func__ << " " << op->get_req()->get_source_inst() << " may omit old_ticket on reconnects, discarding " << *op->get_req() << " and forcing reconnect" << dendl; ceph_assert(s->con && !s->proxy_con); s->con->mark_down(); { 
std::lock_guard l(session_map_lock); remove_session(s); } op->mark_zap(); return; } switch (op->get_req()->get_type()) { case CEPH_MSG_MON_GET_MAP: handle_mon_get_map(op); return; case MSG_GET_CONFIG: configmon()->handle_get_config(op); return; case CEPH_MSG_MON_METADATA: return handle_mon_metadata(op); case CEPH_MSG_MON_SUBSCRIBE: /* FIXME: check what's being subscribed, filter accordingly */ handle_subscribe(op); return; } /* well, maybe the op belongs to a service... */ op->set_type_service(); /* deal with all messages which caps should be checked somewhere else */ switch (op->get_req()->get_type()) { // OSDs case CEPH_MSG_MON_GET_OSDMAP: case CEPH_MSG_POOLOP: case MSG_OSD_BEACON: case MSG_OSD_MARK_ME_DOWN: case MSG_OSD_FULL: case MSG_OSD_FAILURE: case MSG_OSD_BOOT: case MSG_OSD_ALIVE: case MSG_OSD_PGTEMP: case MSG_OSD_PG_CREATED: case MSG_REMOVE_SNAPS: case MSG_OSD_PG_READY_TO_MERGE: paxos_service[PAXOS_OSDMAP]->dispatch(op); return; // MDSs case MSG_MDS_BEACON: case MSG_MDS_OFFLOAD_TARGETS: paxos_service[PAXOS_MDSMAP]->dispatch(op); return; // Mgrs case MSG_MGR_BEACON: paxos_service[PAXOS_MGR]->dispatch(op); return; // MgrStat case MSG_MON_MGR_REPORT: case CEPH_MSG_STATFS: case MSG_GETPOOLSTATS: paxos_service[PAXOS_MGRSTAT]->dispatch(op); return; // log case MSG_LOG: paxos_service[PAXOS_LOG]->dispatch(op); return; // handle_command() does its own caps checking case MSG_MON_COMMAND: op->set_type_command(); handle_command(op); return; } /* nop, looks like it's not a service message; revert back to monitor */ op->set_type_monitor(); /* messages we, the Monitor class, need to deal with * but may be sent by clients. 
*/ if (!op->get_session()->is_capable("mon", MON_CAP_R)) { dout(5) << __func__ << " " << op->get_req()->get_source_inst() << " not enough caps for " << *(op->get_req()) << " -- dropping" << dendl; return; } switch (op->get_req()->get_type()) { // misc case CEPH_MSG_MON_GET_VERSION: handle_get_version(op); return; } if (!op->is_src_mon()) { dout(1) << __func__ << " unexpected monitor message from" << " non-monitor entity " << op->get_req()->get_source_inst() << " " << *(op->get_req()) << " -- dropping" << dendl; return; } /* messages that should only be sent by another monitor */ switch (op->get_req()->get_type()) { case MSG_ROUTE: handle_route(op); return; case MSG_MON_PROBE: handle_probe(op); return; // Sync (i.e., the new slurp, but on steroids) case MSG_MON_SYNC: handle_sync(op); return; case MSG_MON_SCRUB: handle_scrub(op); return; /* log acks are sent from a monitor we sent the MLog to, and are never sent by clients to us. */ case MSG_LOGACK: log_client.handle_log_ack((MLogAck*)op->get_req()); return; // monmap case MSG_MON_JOIN: op->set_type_service(); paxos_service[PAXOS_MONMAP]->dispatch(op); return; // paxos case MSG_MON_PAXOS: { op->set_type_paxos(); MMonPaxos *pm = static_cast(op->get_req()); if (!op->get_session()->is_capable("mon", MON_CAP_X)) { //can't send these! return; } if (state == STATE_SYNCHRONIZING) { // we are synchronizing. These messages would do us no // good, thus just drop them and ignore them. dout(10) << __func__ << " ignore paxos msg from " << pm->get_source_inst() << dendl; return; } // sanitize if (pm->epoch > get_epoch()) { bootstrap(); return; } if (pm->epoch != get_epoch()) { return; } paxos->dispatch(op); } return; // elector messages case MSG_MON_ELECTION: op->set_type_election(); //check privileges here for simplicity if (!op->get_session()->is_capable("mon", MON_CAP_X)) { dout(0) << "MMonElection received from entity without enough caps!" 
<< op->get_session()->caps << dendl; return;; } if (!is_probing() && !is_synchronizing()) { elector.dispatch(op); } return; case MSG_FORWARD: handle_forward(op); return; case MSG_TIMECHECK: dout(5) << __func__ << " ignoring " << op << dendl; return; case MSG_TIMECHECK2: handle_timecheck(op); return; case MSG_MON_HEALTH: dout(5) << __func__ << " dropping deprecated message: " << *op->get_req() << dendl; break; case MSG_MON_HEALTH_CHECKS: op->set_type_service(); paxos_service[PAXOS_HEALTH]->dispatch(op); return; } dout(1) << "dropping unexpected " << *(op->get_req()) << dendl; return; } void Monitor::handle_ping(MonOpRequestRef op) { MPing *m = static_cast(op->get_req()); dout(10) << __func__ << " " << *m << dendl; MPing *reply = new MPing; bufferlist payload; boost::scoped_ptr f(new JSONFormatter(true)); f->open_object_section("pong"); get_health_status(false, f.get(), nullptr); { stringstream ss; get_mon_status(f.get(), ss); } f->close_section(); stringstream ss; f->flush(ss); encode(ss.str(), payload); reply->set_payload(payload); dout(10) << __func__ << " reply payload len " << reply->get_payload().length() << dendl; m->get_connection()->send_message(reply); } void Monitor::timecheck_start() { dout(10) << __func__ << dendl; timecheck_cleanup(); if (get_quorum_mon_features().contains_all( ceph::features::mon::FEATURE_NAUTILUS)) { timecheck_start_round(); } } void Monitor::timecheck_finish() { dout(10) << __func__ << dendl; timecheck_cleanup(); } void Monitor::timecheck_start_round() { dout(10) << __func__ << " curr " << timecheck_round << dendl; ceph_assert(is_leader()); if (monmap->size() == 1) { ceph_abort_msg("We are alone; this shouldn't have been scheduled!"); return; } if (timecheck_round % 2) { dout(10) << __func__ << " there's a timecheck going on" << dendl; utime_t curr_time = ceph_clock_now(); double max = g_conf()->mon_timecheck_interval*3; if (curr_time - timecheck_round_start < max) { dout(10) << __func__ << " keep current round going" << dendl; goto 
out; } else { dout(10) << __func__ << " finish current timecheck and start new" << dendl; timecheck_cancel_round(); } } ceph_assert(timecheck_round % 2 == 0); timecheck_acks = 0; timecheck_round ++; timecheck_round_start = ceph_clock_now(); dout(10) << __func__ << " new " << timecheck_round << dendl; timecheck(); out: dout(10) << __func__ << " setting up next event" << dendl; timecheck_reset_event(); } void Monitor::timecheck_finish_round(bool success) { dout(10) << __func__ << " curr " << timecheck_round << dendl; ceph_assert(timecheck_round % 2); timecheck_round ++; timecheck_round_start = utime_t(); if (success) { ceph_assert(timecheck_waiting.empty()); ceph_assert(timecheck_acks == quorum.size()); timecheck_report(); timecheck_check_skews(); return; } dout(10) << __func__ << " " << timecheck_waiting.size() << " peers still waiting:"; for (auto& p : timecheck_waiting) { *_dout << " mon." << p.first; } *_dout << dendl; timecheck_waiting.clear(); dout(10) << __func__ << " finished to " << timecheck_round << dendl; } void Monitor::timecheck_cancel_round() { timecheck_finish_round(false); } void Monitor::timecheck_cleanup() { timecheck_round = 0; timecheck_acks = 0; timecheck_round_start = utime_t(); if (timecheck_event) { timer.cancel_event(timecheck_event); timecheck_event = NULL; } timecheck_waiting.clear(); timecheck_skews.clear(); timecheck_latencies.clear(); timecheck_rounds_since_clean = 0; } void Monitor::timecheck_reset_event() { if (timecheck_event) { timer.cancel_event(timecheck_event); timecheck_event = NULL; } double delay = cct->_conf->mon_timecheck_skew_interval * timecheck_rounds_since_clean; if (delay <= 0 || delay > cct->_conf->mon_timecheck_interval) { delay = cct->_conf->mon_timecheck_interval; } dout(10) << __func__ << " delay " << delay << " rounds_since_clean " << timecheck_rounds_since_clean << dendl; timecheck_event = timer.add_event_after( delay, new C_MonContext(this, [this](int) { timecheck_start_round(); })); } void 
Monitor::timecheck_check_skews() { dout(10) << __func__ << dendl; ceph_assert(is_leader()); ceph_assert((timecheck_round % 2) == 0); if (monmap->size() == 1) { ceph_abort_msg("We are alone; we shouldn't have gotten here!"); return; } ceph_assert(timecheck_latencies.size() == timecheck_skews.size()); bool found_skew = false; for (auto& p : timecheck_skews) { double abs_skew; if (timecheck_has_skew(p.second, &abs_skew)) { dout(10) << __func__ << " " << p.first << " skew " << abs_skew << dendl; found_skew = true; } } if (found_skew) { ++timecheck_rounds_since_clean; timecheck_reset_event(); } else if (timecheck_rounds_since_clean > 0) { dout(1) << __func__ << " no clock skews found after " << timecheck_rounds_since_clean << " rounds" << dendl; // make sure the skews are really gone and not just a transient success // this will run just once if not in the presence of skews again. timecheck_rounds_since_clean = 1; timecheck_reset_event(); timecheck_rounds_since_clean = 0; } } void Monitor::timecheck_report() { dout(10) << __func__ << dendl; ceph_assert(is_leader()); ceph_assert((timecheck_round % 2) == 0); if (monmap->size() == 1) { ceph_abort_msg("We are alone; we shouldn't have gotten here!"); return; } ceph_assert(timecheck_latencies.size() == timecheck_skews.size()); bool do_output = true; // only output report once for (set::iterator q = quorum.begin(); q != quorum.end(); ++q) { if (monmap->get_name(*q) == name) continue; MTimeCheck2 *m = new MTimeCheck2(MTimeCheck2::OP_REPORT); m->epoch = get_epoch(); m->round = timecheck_round; for (auto& it : timecheck_skews) { double skew = it.second; double latency = timecheck_latencies[it.first]; m->skews[it.first] = skew; m->latencies[it.first] = latency; if (do_output) { dout(25) << __func__ << " mon." << it.first << " latency " << latency << " skew " << skew << dendl; } } do_output = false; dout(10) << __func__ << " send report to mon." 
<< *q << dendl; send_mon_message(m, *q); } } void Monitor::timecheck() { dout(10) << __func__ << dendl; ceph_assert(is_leader()); if (monmap->size() == 1) { ceph_abort_msg("We are alone; we shouldn't have gotten here!"); return; } ceph_assert(timecheck_round % 2 != 0); timecheck_acks = 1; // we ack ourselves dout(10) << __func__ << " start timecheck epoch " << get_epoch() << " round " << timecheck_round << dendl; // we are at the eye of the storm; the point of reference timecheck_skews[rank] = 0.0; timecheck_latencies[rank] = 0.0; for (set::iterator it = quorum.begin(); it != quorum.end(); ++it) { if (monmap->get_name(*it) == name) continue; utime_t curr_time = ceph_clock_now(); timecheck_waiting[*it] = curr_time; MTimeCheck2 *m = new MTimeCheck2(MTimeCheck2::OP_PING); m->epoch = get_epoch(); m->round = timecheck_round; dout(10) << __func__ << " send " << *m << " to mon." << *it << dendl; send_mon_message(m, *it); } } health_status_t Monitor::timecheck_status(ostringstream &ss, const double skew_bound, const double latency) { health_status_t status = HEALTH_OK; ceph_assert(latency >= 0); double abs_skew; if (timecheck_has_skew(skew_bound, &abs_skew)) { status = HEALTH_WARN; ss << "clock skew " << abs_skew << "s" << " > max " << g_conf()->mon_clock_drift_allowed << "s"; } return status; } void Monitor::handle_timecheck_leader(MonOpRequestRef op) { MTimeCheck2 *m = static_cast(op->get_req()); dout(10) << __func__ << " " << *m << dendl; /* handles PONG's */ ceph_assert(m->op == MTimeCheck2::OP_PONG); int other = m->get_source().num(); if (m->epoch < get_epoch()) { dout(1) << __func__ << " got old timecheck epoch " << m->epoch << " from " << other << " curr " << get_epoch() << " -- severely lagged? 
discard" << dendl; return; } ceph_assert(m->epoch == get_epoch()); if (m->round < timecheck_round) { dout(1) << __func__ << " got old round " << m->round << " from " << other << " curr " << timecheck_round << " -- discard" << dendl; return; } utime_t curr_time = ceph_clock_now(); ceph_assert(timecheck_waiting.count(other) > 0); utime_t timecheck_sent = timecheck_waiting[other]; timecheck_waiting.erase(other); if (curr_time < timecheck_sent) { // our clock was readjusted -- drop everything until it all makes sense. dout(1) << __func__ << " our clock was readjusted --" << " bump round and drop current check" << dendl; timecheck_cancel_round(); return; } /* update peer latencies */ double latency = (double)(curr_time - timecheck_sent); if (timecheck_latencies.count(other) == 0) timecheck_latencies[other] = latency; else { double avg_latency = ((timecheck_latencies[other]*0.8)+(latency*0.2)); timecheck_latencies[other] = avg_latency; } /* * update skews * * some nasty thing goes on if we were to do 'a - b' between two utime_t, * and 'a' happens to be lower than 'b'; so we use double instead. * * latency is always expected to be >= 0. * * delta, the difference between theirs timestamp and ours, may either be * lower or higher than 0; will hardly ever be 0. * * The absolute skew is the absolute delta minus the latency, which is * taken as a whole instead of an rtt given that there is some queueing * and dispatch times involved and it's hard to assess how long exactly * it took for the message to travel to the other side and be handled. So * we call it a bounded skew, the worst case scenario. * * Now, to math! * * Given that the latency is always positive, we can establish that the * bounded skew will be: * * 1. positive if the absolute delta is higher than the latency and * delta is positive * 2. negative if the absolute delta is higher than the latency and * delta is negative. * 3. zero if the absolute delta is lower than the latency. * * On 3. 
we make a judgement call and treat the skew as non-existent. * This is because that, if the absolute delta is lower than the * latency, then the apparently existing skew is nothing more than a * side-effect of the high latency at work. * * This may not be entirely true though, as a severely skewed clock * may be masked by an even higher latency, but with high latencies * we probably have worse issues to deal with than just skewed clocks. */ ceph_assert(latency >= 0); double delta = ((double) m->timestamp) - ((double) curr_time); double abs_delta = (delta > 0 ? delta : -delta); double skew_bound = abs_delta - latency; if (skew_bound < 0) skew_bound = 0; else if (delta < 0) skew_bound = -skew_bound; ostringstream ss; health_status_t status = timecheck_status(ss, skew_bound, latency); if (status != HEALTH_OK) { clog->health(status) << other << " " << ss.str(); } dout(10) << __func__ << " from " << other << " ts " << m->timestamp << " delta " << delta << " skew_bound " << skew_bound << " latency " << latency << dendl; timecheck_skews[other] = skew_bound; timecheck_acks++; if (timecheck_acks == quorum.size()) { dout(10) << __func__ << " got pongs from everybody (" << timecheck_acks << " total)" << dendl; ceph_assert(timecheck_skews.size() == timecheck_acks); ceph_assert(timecheck_waiting.empty()); // everyone has acked, so bump the round to finish it. 
timecheck_finish_round(); } } void Monitor::handle_timecheck_peon(MonOpRequestRef op) { MTimeCheck2 *m = static_cast(op->get_req()); dout(10) << __func__ << " " << *m << dendl; ceph_assert(is_peon()); ceph_assert(m->op == MTimeCheck2::OP_PING || m->op == MTimeCheck2::OP_REPORT); if (m->epoch != get_epoch()) { dout(1) << __func__ << " got wrong epoch " << "(ours " << get_epoch() << " theirs: " << m->epoch << ") -- discarding" << dendl; return; } if (m->round < timecheck_round) { dout(1) << __func__ << " got old round " << m->round << " current " << timecheck_round << " (epoch " << get_epoch() << ") -- discarding" << dendl; return; } timecheck_round = m->round; if (m->op == MTimeCheck2::OP_REPORT) { ceph_assert((timecheck_round % 2) == 0); timecheck_latencies.swap(m->latencies); timecheck_skews.swap(m->skews); return; } ceph_assert((timecheck_round % 2) != 0); MTimeCheck2 *reply = new MTimeCheck2(MTimeCheck2::OP_PONG); utime_t curr_time = ceph_clock_now(); reply->timestamp = curr_time; reply->epoch = m->epoch; reply->round = m->round; dout(10) << __func__ << " send " << *m << " to " << m->get_source_inst() << dendl; m->get_connection()->send_message(reply); } void Monitor::handle_timecheck(MonOpRequestRef op) { MTimeCheck2 *m = static_cast(op->get_req()); dout(10) << __func__ << " " << *m << dendl; if (is_leader()) { if (m->op != MTimeCheck2::OP_PONG) { dout(1) << __func__ << " drop unexpected msg (not pong)" << dendl; } else { handle_timecheck_leader(op); } } else if (is_peon()) { if (m->op != MTimeCheck2::OP_PING && m->op != MTimeCheck2::OP_REPORT) { dout(1) << __func__ << " drop unexpected msg (not ping or report)" << dendl; } else { handle_timecheck_peon(op); } } else { dout(1) << __func__ << " drop unexpected msg" << dendl; } } void Monitor::handle_subscribe(MonOpRequestRef op) { MMonSubscribe *m = static_cast(op->get_req()); dout(10) << "handle_subscribe " << *m << dendl; bool reply = false; MonSession *s = op->get_session(); ceph_assert(s); if 
(m->hostname.size()) { s->remote_host = m->hostname; } for (map::iterator p = m->what.begin(); p != m->what.end(); ++p) { if (p->first == "monmap" || p->first == "config") { // these require no caps } else if (!s->is_capable("mon", MON_CAP_R)) { dout(5) << __func__ << " " << op->get_req()->get_source_inst() << " not enough caps for " << *(op->get_req()) << " -- dropping" << dendl; continue; } // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer if ((p->second.flags & CEPH_SUBSCRIBE_ONETIME) == 0) reply = true; // remove conflicting subscribes if (logmon()->sub_name_to_id(p->first) >= 0) { for (map::iterator it = s->sub_map.begin(); it != s->sub_map.end(); ) { if (it->first != p->first && logmon()->sub_name_to_id(it->first) >= 0) { std::lock_guard l(session_map_lock); session_map.remove_sub((it++)->second); } else { ++it; } } } { std::lock_guard l(session_map_lock); session_map.add_update_sub(s, p->first, p->second.start, p->second.flags & CEPH_SUBSCRIBE_ONETIME, m->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP)); } if (p->first.compare(0, 6, "mdsmap") == 0 || p->first.compare(0, 5, "fsmap") == 0) { dout(10) << __func__ << ": MDS sub '" << p->first << "'" << dendl; if ((int)s->is_capable("mds", MON_CAP_R)) { Subscription *sub = s->sub_map[p->first]; ceph_assert(sub != nullptr); mdsmon()->check_sub(sub); } } else if (p->first == "osdmap") { if ((int)s->is_capable("osd", MON_CAP_R)) { if (s->osd_epoch > p->second.start) { // client needs earlier osdmaps on purpose, so reset the sent epoch s->osd_epoch = 0; } osdmon()->check_osdmap_sub(s->sub_map["osdmap"]); } } else if (p->first == "osd_pg_creates") { if ((int)s->is_capable("osd", MON_CAP_W)) { osdmon()->check_pg_creates_sub(s->sub_map["osd_pg_creates"]); } } else if (p->first == "monmap") { monmon()->check_sub(s->sub_map[p->first]); } else if (logmon()->sub_name_to_id(p->first) >= 0) { logmon()->check_sub(s->sub_map[p->first]); } else if (p->first == "mgrmap" || 
p->first == "mgrdigest") { mgrmon()->check_sub(s->sub_map[p->first]); } else if (p->first == "servicemap") { mgrstatmon()->check_sub(s->sub_map[p->first]); } else if (p->first == "config") { configmon()->check_sub(s); } } if (reply) { // we only need to reply if the client is old enough to think it // has to send renewals. ConnectionRef con = m->get_connection(); if (!con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB)) m->get_connection()->send_message(new MMonSubscribeAck( monmap->get_fsid(), (int)g_conf()->mon_subscribe_interval)); } } void Monitor::handle_get_version(MonOpRequestRef op) { MMonGetVersion *m = static_cast(op->get_req()); dout(10) << "handle_get_version " << *m << dendl; PaxosService *svc = NULL; MonSession *s = op->get_session(); ceph_assert(s); if (!is_leader() && !is_peon()) { dout(10) << " waiting for quorum" << dendl; waitfor_quorum.push_back(new C_RetryMessage(this, op)); goto out; } if (m->what == "mdsmap") { svc = mdsmon(); } else if (m->what == "fsmap") { svc = mdsmon(); } else if (m->what == "osdmap") { svc = osdmon(); } else if (m->what == "monmap") { svc = monmon(); } else { derr << "invalid map type " << m->what << dendl; } if (svc) { if (!svc->is_readable()) { svc->wait_for_readable(op, new C_RetryMessage(this, op)); goto out; } MMonGetVersionReply *reply = new MMonGetVersionReply(); reply->handle = m->handle; reply->version = svc->get_last_committed(); reply->oldest_version = svc->get_first_committed(); reply->set_tid(m->get_tid()); m->get_connection()->send_message(reply); } out: return; } bool Monitor::ms_handle_reset(Connection *con) { dout(10) << "ms_handle_reset " << con << " " << con->get_peer_addr() << dendl; // ignore lossless monitor sessions if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) return false; auto priv = con->get_priv(); auto s = static_cast(priv.get()); if (!s) return false; // break any con <-> session ref cycle s->con->set_priv(nullptr); if (is_shutdown()) return false; std::lock_guard l(lock); dout(10) << 
"reset/close on session " << s->name << " " << s->addrs << dendl; if (!s->closed && s->item.is_on_list()) { std::lock_guard l(session_map_lock); remove_session(s); } return true; } bool Monitor::ms_handle_refused(Connection *con) { // just log for now... dout(10) << "ms_handle_refused " << con << " " << con->get_peer_addr() << dendl; return false; } // ----- void Monitor::send_latest_monmap(Connection *con) { bufferlist bl; monmap->encode(bl, con->get_features()); con->send_message(new MMonMap(bl)); } void Monitor::handle_mon_get_map(MonOpRequestRef op) { MMonGetMap *m = static_cast(op->get_req()); dout(10) << "handle_mon_get_map" << dendl; send_latest_monmap(m->get_connection().get()); } void Monitor::handle_mon_metadata(MonOpRequestRef op) { MMonMetadata *m = static_cast(op->get_req()); if (is_leader()) { dout(10) << __func__ << dendl; update_mon_metadata(m->get_source().num(), std::move(m->data)); } } void Monitor::update_mon_metadata(int from, Metadata&& m) { // NOTE: this is now for legacy (kraken or jewel) mons only. pending_metadata[from] = std::move(m); MonitorDBStore::TransactionRef t = paxos->get_pending_transaction(); bufferlist bl; encode(pending_metadata, bl); t->put(MONITOR_STORE_PREFIX, "last_metadata", bl); paxos->trigger_propose(); } int Monitor::load_metadata() { bufferlist bl; int r = store->get(MONITOR_STORE_PREFIX, "last_metadata", bl); if (r) return r; auto it = bl.cbegin(); decode(mon_metadata, it); pending_metadata = mon_metadata; return 0; } int Monitor::get_mon_metadata(int mon, Formatter *f, ostream& err) { ceph_assert(f); if (!mon_metadata.count(mon)) { err << "mon." 
<< mon << " not found"; return -EINVAL; } const Metadata& m = mon_metadata[mon]; for (Metadata::const_iterator p = m.begin(); p != m.end(); ++p) { f->dump_string(p->first.c_str(), p->second); } return 0; } void Monitor::count_metadata(const string& field, map *out) { for (auto& p : mon_metadata) { auto q = p.second.find(field); if (q == p.second.end()) { (*out)["unknown"]++; } else { (*out)[q->second]++; } } } void Monitor::count_metadata(const string& field, Formatter *f) { map by_val; count_metadata(field, &by_val); f->open_object_section(field.c_str()); for (auto& p : by_val) { f->dump_int(p.first.c_str(), p.second); } f->close_section(); } int Monitor::print_nodes(Formatter *f, ostream& err) { map > mons; // hostname => mon for (map::iterator it = mon_metadata.begin(); it != mon_metadata.end(); ++it) { const Metadata& m = it->second; Metadata::const_iterator hostname = m.find("hostname"); if (hostname == m.end()) { // not likely though continue; } mons[hostname->second].push_back(monmap->get_name(it->first)); } dump_services(f, mons, "mon"); return 0; } // ---------------------------------------------- // scrub int Monitor::scrub_start() { dout(10) << __func__ << dendl; ceph_assert(is_leader()); if (!scrub_result.empty()) { clog->info() << "scrub already in progress"; return -EBUSY; } scrub_event_cancel(); scrub_result.clear(); scrub_state.reset(new ScrubState); scrub(); return 0; } int Monitor::scrub() { ceph_assert(is_leader()); ceph_assert(scrub_state); scrub_cancel_timeout(); wait_for_paxos_write(); scrub_version = paxos->get_version(); // scrub all keys if we're the only monitor in the quorum int32_t num_keys = (quorum.size() == 1 ? 
-1 : cct->_conf->mon_scrub_max_keys); for (set::iterator p = quorum.begin(); p != quorum.end(); ++p) { if (*p == rank) continue; MMonScrub *r = new MMonScrub(MMonScrub::OP_SCRUB, scrub_version, num_keys); r->key = scrub_state->last_key; send_mon_message(r, *p); } // scrub my keys bool r = _scrub(&scrub_result[rank], &scrub_state->last_key, &num_keys); scrub_state->finished = !r; // only after we got our scrub results do we really care whether the // other monitors are late on their results. Also, this way we avoid // triggering the timeout if we end up getting stuck in _scrub() for // longer than the duration of the timeout. scrub_reset_timeout(); if (quorum.size() == 1) { ceph_assert(scrub_state->finished == true); scrub_finish(); } return 0; } void Monitor::handle_scrub(MonOpRequestRef op) { MMonScrub *m = static_cast(op->get_req()); dout(10) << __func__ << " " << *m << dendl; switch (m->op) { case MMonScrub::OP_SCRUB: { if (!is_peon()) break; wait_for_paxos_write(); if (m->version != paxos->get_version()) break; MMonScrub *reply = new MMonScrub(MMonScrub::OP_RESULT, m->version, m->num_keys); reply->key = m->key; _scrub(&reply->result, &reply->key, &reply->num_keys); m->get_connection()->send_message(reply); } break; case MMonScrub::OP_RESULT: { if (!is_leader()) break; if (m->version != scrub_version) break; // reset the timeout each time we get a result scrub_reset_timeout(); int from = m->get_source().num(); ceph_assert(scrub_result.count(from) == 0); scrub_result[from] = m->result; if (scrub_result.size() == quorum.size()) { scrub_check_results(); scrub_result.clear(); if (scrub_state->finished) scrub_finish(); else scrub(); } } break; } } bool Monitor::_scrub(ScrubResult *r, pair *start, int *num_keys) { ceph_assert(r != NULL); ceph_assert(start != NULL); ceph_assert(num_keys != NULL); set prefixes = get_sync_targets_names(); prefixes.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc. 
dout(10) << __func__ << " start (" << *start << ")" << " num_keys " << *num_keys << dendl; MonitorDBStore::Synchronizer it = store->get_synchronizer(*start, prefixes); int scrubbed_keys = 0; pair last_key; while (it->has_next_chunk()) { if (*num_keys > 0 && scrubbed_keys == *num_keys) break; pair k = it->get_next_key(); if (prefixes.count(k.first) == 0) continue; if (cct->_conf->mon_scrub_inject_missing_keys > 0.0 && (rand() % 10000 < cct->_conf->mon_scrub_inject_missing_keys*10000.0)) { dout(10) << __func__ << " inject missing key, skipping (" << k << ")" << dendl; continue; } bufferlist bl; int err = store->get(k.first, k.second, bl); ceph_assert(err == 0); uint32_t key_crc = bl.crc32c(0); dout(30) << __func__ << " " << k << " bl " << bl.length() << " bytes" << " crc " << key_crc << dendl; r->prefix_keys[k.first]++; if (r->prefix_crc.count(k.first) == 0) { r->prefix_crc[k.first] = 0; } r->prefix_crc[k.first] = bl.crc32c(r->prefix_crc[k.first]); if (cct->_conf->mon_scrub_inject_crc_mismatch > 0.0 && (rand() % 10000 < cct->_conf->mon_scrub_inject_crc_mismatch*10000.0)) { dout(10) << __func__ << " inject failure at (" << k << ")" << dendl; r->prefix_crc[k.first] += 1; } ++scrubbed_keys; last_key = k; } dout(20) << __func__ << " last_key (" << last_key << ")" << " scrubbed_keys " << scrubbed_keys << " has_next " << it->has_next_chunk() << dendl; *start = last_key; *num_keys = scrubbed_keys; return it->has_next_chunk(); } void Monitor::scrub_check_results() { dout(10) << __func__ << dendl; // compare int errors = 0; ScrubResult& mine = scrub_result[rank]; for (map::iterator p = scrub_result.begin(); p != scrub_result.end(); ++p) { if (p->first == rank) continue; if (p->second != mine) { ++errors; clog->error() << "scrub mismatch"; clog->error() << " mon." << rank << " " << mine; clog->error() << " mon." 
<< p->first << " " << p->second; } } if (!errors) clog->debug() << "scrub ok on " << quorum << ": " << mine; } inline void Monitor::scrub_timeout() { dout(1) << __func__ << " restarting scrub" << dendl; scrub_reset(); scrub_start(); } void Monitor::scrub_finish() { dout(10) << __func__ << dendl; scrub_reset(); scrub_event_start(); } void Monitor::scrub_reset() { dout(10) << __func__ << dendl; scrub_cancel_timeout(); scrub_version = 0; scrub_result.clear(); scrub_state.reset(); } inline void Monitor::scrub_update_interval(int secs) { // we don't care about changes if we are not the leader. // changes will be visible if we become the leader. if (!is_leader()) return; dout(1) << __func__ << " new interval = " << secs << dendl; // if scrub already in progress, all changes will already be visible during // the next round. Nothing to do. if (scrub_state != NULL) return; scrub_event_cancel(); scrub_event_start(); } void Monitor::scrub_event_start() { dout(10) << __func__ << dendl; if (scrub_event) scrub_event_cancel(); if (cct->_conf->mon_scrub_interval <= 0) { dout(1) << __func__ << " scrub event is disabled" << " (mon_scrub_interval = " << cct->_conf->mon_scrub_interval << ")" << dendl; return; } scrub_event = timer.add_event_after( cct->_conf->mon_scrub_interval, new C_MonContext(this, [this](int) { scrub_start(); })); } void Monitor::scrub_event_cancel() { dout(10) << __func__ << dendl; if (scrub_event) { timer.cancel_event(scrub_event); scrub_event = NULL; } } inline void Monitor::scrub_cancel_timeout() { if (scrub_timeout_event) { timer.cancel_event(scrub_timeout_event); scrub_timeout_event = NULL; } } void Monitor::scrub_reset_timeout() { dout(15) << __func__ << " reset timeout event" << dendl; scrub_cancel_timeout(); scrub_timeout_event = timer.add_event_after( g_conf()->mon_scrub_timeout, new C_MonContext(this, [this](int) { scrub_timeout(); })); } /************ TICK ***************/ void Monitor::new_tick() { timer.add_event_after(g_conf()->mon_tick_interval, 
new C_MonContext(this, [this](int) { tick(); })); } void Monitor::tick() { // ok go. dout(11) << "tick" << dendl; const utime_t now = ceph_clock_now(); // Check if we need to emit any delayed health check updated messages if (is_leader()) { const auto min_period = g_conf().get_val( "mon_health_log_update_period"); for (auto& svc : paxos_service) { auto health = svc->get_health_checks(); for (const auto &i : health.checks) { const std::string &code = i.first; const std::string &summary = i.second.summary; const health_status_t severity = i.second.severity; auto status_iter = health_check_log_times.find(code); if (status_iter == health_check_log_times.end()) { continue; } auto &log_status = status_iter->second; bool const changed = log_status.last_message != summary || log_status.severity != severity; if (changed && now - log_status.updated_at > min_period) { log_status.last_message = summary; log_status.updated_at = now; log_status.severity = severity; ostringstream ss; ss << "Health check update: " << summary << " (" << code << ")"; clog->health(severity) << ss.str(); } } } } for (auto& svc : paxos_service) { svc->tick(); svc->maybe_trim(); } // trim sessions { std::lock_guard l(session_map_lock); auto p = session_map.sessions.begin(); bool out_for_too_long = (!exited_quorum.is_zero() && now > (exited_quorum + 2*g_conf()->mon_lease)); while (!p.end()) { MonSession *s = *p; ++p; // don't trim monitors if (s->name.is_mon()) continue; if (s->session_timeout < now && s->con) { // check keepalive, too s->session_timeout = s->con->get_last_keepalive(); s->session_timeout += g_conf()->mon_session_timeout; } if (s->session_timeout < now) { dout(10) << " trimming session " << s->con << " " << s->name << " " << s->addrs << " (timeout " << s->session_timeout << " < now " << now << ")" << dendl; } else if (out_for_too_long) { // boot the client Session because we've taken too long getting back in dout(10) << " trimming session " << s->con << " " << s->name << " because we've 
been out of quorum too long" << dendl; } else { continue; } s->con->mark_down(); remove_session(s); logger->inc(l_mon_session_trim); } } sync_trim_providers(); if (!maybe_wait_for_quorum.empty()) { finish_contexts(g_ceph_context, maybe_wait_for_quorum); } if (is_leader() && paxos->is_active() && fingerprint.is_zero()) { // this is only necessary on upgraded clusters. MonitorDBStore::TransactionRef t = paxos->get_pending_transaction(); prepare_new_fingerprint(t); paxos->trigger_propose(); } mgr_client.update_daemon_health(get_health_metrics()); new_tick(); } vector Monitor::get_health_metrics() { vector metrics; utime_t oldest_secs; const utime_t now = ceph_clock_now(); auto too_old = now; too_old -= g_conf().get_val("mon_op_complaint_time").count(); int slow = 0; TrackedOpRef oldest_op; auto count_slow_ops = [&](TrackedOp& op) { if (op.get_initiated() < too_old) { slow++; if (!oldest_op || op.get_initiated() < oldest_op->get_initiated()) { oldest_op = &op; } return true; } else { return false; } }; if (op_tracker.visit_ops_in_flight(&oldest_secs, count_slow_ops)) { if (slow) { derr << __func__ << " reporting " << slow << " slow ops, oldest is " << oldest_op->get_desc() << dendl; } metrics.emplace_back(daemon_metric::SLOW_OPS, slow, oldest_secs); } else { metrics.emplace_back(daemon_metric::SLOW_OPS, 0, 0); } return metrics; } void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t) { uuid_d nf; nf.generate_random(); dout(10) << __func__ << " proposing cluster_fingerprint " << nf << dendl; bufferlist bl; encode(nf, bl); t->put(MONITOR_NAME, "cluster_fingerprint", bl); } int Monitor::check_fsid() { bufferlist ebl; int r = store->get(MONITOR_NAME, "cluster_uuid", ebl); if (r == -ENOENT) return r; ceph_assert(r == 0); string es(ebl.c_str(), ebl.length()); // only keep the first line size_t pos = es.find_first_of('\n'); if (pos != string::npos) es.resize(pos); dout(10) << "check_fsid cluster_uuid contains '" << es << "'" << dendl; uuid_d ondisk; if 
(!ondisk.parse(es.c_str())) { derr << "error: unable to parse uuid" << dendl; return -EINVAL; } if (monmap->get_fsid() != ondisk) { derr << "error: cluster_uuid file exists with value " << ondisk << ", != our uuid " << monmap->get_fsid() << dendl; return -EEXIST; } return 0; } int Monitor::write_fsid() { auto t(std::make_shared()); write_fsid(t); int r = store->apply_transaction(t); return r; } int Monitor::write_fsid(MonitorDBStore::TransactionRef t) { ostringstream ss; ss << monmap->get_fsid() << "\n"; string us = ss.str(); bufferlist b; b.append(us); t->put(MONITOR_NAME, "cluster_uuid", b); return 0; } /* * this is the closest thing to a traditional 'mkfs' for ceph. * initialize the monitor state machines to their initial values. */ int Monitor::mkfs(bufferlist& osdmapbl) { auto t(std::make_shared()); // verify cluster fsid int r = check_fsid(); if (r < 0 && r != -ENOENT) return r; bufferlist magicbl; magicbl.append(CEPH_MON_ONDISK_MAGIC); magicbl.append("\n"); t->put(MONITOR_NAME, "magic", magicbl); features = get_initial_supported_features(); write_features(t); // save monmap, osdmap, keyring. 
bufferlist monmapbl; monmap->encode(monmapbl, CEPH_FEATURES_ALL); monmap->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos() t->put("mkfs", "monmap", monmapbl); if (osdmapbl.length()) { // make sure it's a valid osdmap try { OSDMap om; om.decode(osdmapbl); } catch (buffer::error& e) { derr << "error decoding provided osdmap: " << e.what() << dendl; return -EINVAL; } t->put("mkfs", "osdmap", osdmapbl); } if (is_keyring_required()) { KeyRing keyring; string keyring_filename; r = ceph_resolve_file_search(g_conf()->keyring, keyring_filename); if (r) { derr << "unable to find a keyring file on " << g_conf()->keyring << ": " << cpp_strerror(r) << dendl; if (g_conf()->key != "") { string keyring_plaintext = "[mon.]\n\tkey = " + g_conf()->key + "\n\tcaps mon = \"allow *\"\n"; bufferlist bl; bl.append(keyring_plaintext); try { auto i = bl.cbegin(); keyring.decode_plaintext(i); } catch (const buffer::error& e) { derr << "error decoding keyring " << keyring_plaintext << ": " << e.what() << dendl; return -EINVAL; } } else { return -ENOENT; } } else { r = keyring.load(g_ceph_context, keyring_filename); if (r < 0) { derr << "unable to load initial keyring " << g_conf()->keyring << dendl; return r; } } // put mon. key in external keyring; seed with everything else. 
extract_save_mon_key(keyring); bufferlist keyringbl; keyring.encode_plaintext(keyringbl); t->put("mkfs", "keyring", keyringbl); } write_fsid(t); store->apply_transaction(t); return 0; } int Monitor::write_default_keyring(bufferlist& bl) { ostringstream os; os << g_conf()->mon_data << "/keyring"; int err = 0; int fd = ::open(os.str().c_str(), O_WRONLY|O_CREAT|O_CLOEXEC, 0600); if (fd < 0) { err = -errno; dout(0) << __func__ << " failed to open " << os.str() << ": " << cpp_strerror(err) << dendl; return err; } err = bl.write_fd(fd); if (!err) ::fsync(fd); VOID_TEMP_FAILURE_RETRY(::close(fd)); return err; } void Monitor::extract_save_mon_key(KeyRing& keyring) { EntityName mon_name; mon_name.set_type(CEPH_ENTITY_TYPE_MON); EntityAuth mon_key; if (keyring.get_auth(mon_name, mon_key)) { dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl; KeyRing pkey; pkey.add(mon_name, mon_key); bufferlist bl; pkey.encode_plaintext(bl); write_default_keyring(bl); keyring.remove(mon_name); } } // AuthClient methods -- for mon <-> mon communication int Monitor::get_auth_request( Connection *con, AuthConnectionMeta *auth_meta, uint32_t *method, vector *preferred_modes, bufferlist *out) { std::scoped_lock l(auth_lock); if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON && con->get_peer_type() != CEPH_ENTITY_TYPE_MGR) { return -EACCES; } AuthAuthorizer *auth; if (!ms_get_authorizer(con->get_peer_type(), &auth)) { return -EACCES; } auth_meta->authorizer.reset(auth); auth_registry.get_supported_modes(con->get_peer_type(), auth->protocol, preferred_modes); *method = auth->protocol; *out = auth->bl; return 0; } int Monitor::handle_auth_reply_more( Connection *con, AuthConnectionMeta *auth_meta, const bufferlist& bl, bufferlist *reply) { std::scoped_lock l(auth_lock); if (!auth_meta->authorizer) { derr << __func__ << " no authorizer?" 
<< dendl; return -EACCES; } auth_meta->authorizer->add_challenge(cct, bl); *reply = auth_meta->authorizer->bl; return 0; } int Monitor::handle_auth_done( Connection *con, AuthConnectionMeta *auth_meta, uint64_t global_id, uint32_t con_mode, const bufferlist& bl, CryptoKey *session_key, std::string *connection_secret) { std::scoped_lock l(auth_lock); // verify authorizer reply auto p = bl.begin(); if (!auth_meta->authorizer->verify_reply(p, connection_secret)) { dout(0) << __func__ << " failed verifying authorizer reply" << dendl; return -EACCES; } auth_meta->session_key = auth_meta->authorizer->session_key; return 0; } int Monitor::handle_auth_bad_method( Connection *con, AuthConnectionMeta *auth_meta, uint32_t old_auth_method, int result, const std::vector& allowed_methods, const std::vector& allowed_modes) { derr << __func__ << " hmm, they didn't like " << old_auth_method << " result " << cpp_strerror(result) << dendl; return -EACCES; } bool Monitor::ms_get_authorizer(int service_id, AuthAuthorizer **authorizer) { dout(10) << "ms_get_authorizer for " << ceph_entity_type_name(service_id) << dendl; if (is_shutdown()) return false; // we only connect to other monitors and mgr; every else connects to us. 
if (service_id != CEPH_ENTITY_TYPE_MON && service_id != CEPH_ENTITY_TYPE_MGR) return false; if (!auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) { // auth_none dout(20) << __func__ << " building auth_none authorizer" << dendl; AuthNoneClientHandler handler{g_ceph_context}; handler.set_global_id(0); *authorizer = handler.build_authorizer(service_id); return true; } CephXServiceTicketInfo auth_ticket_info; CephXSessionAuthInfo info; int ret; EntityName name; name.set_type(CEPH_ENTITY_TYPE_MON); auth_ticket_info.ticket.name = name; auth_ticket_info.ticket.global_id = 0; if (service_id == CEPH_ENTITY_TYPE_MON) { // mon to mon authentication uses the private monitor shared key and not the // rotating key CryptoKey secret; if (!keyring.get_secret(name, secret) && !key_server.get_secret(name, secret)) { dout(0) << " couldn't get secret for mon service from keyring or keyserver" << dendl; stringstream ss, ds; int err = key_server.list_secrets(ds); if (err < 0) ss << "no installed auth entries!"; else ss << "installed auth entries:"; dout(0) << ss.str() << "\n" << ds.str() << dendl; return false; } ret = key_server.build_session_auth_info( service_id, auth_ticket_info.ticket, secret, (uint64_t)-1, info); if (ret < 0) { dout(0) << __func__ << " failed to build mon session_auth_info " << cpp_strerror(ret) << dendl; return false; } } else if (service_id == CEPH_ENTITY_TYPE_MGR) { // mgr ret = key_server.build_session_auth_info( service_id, auth_ticket_info.ticket, info); if (ret < 0) { derr << __func__ << " failed to build mgr service session_auth_info " << cpp_strerror(ret) << dendl; return false; } } else { ceph_abort(); // see check at top of fn } CephXTicketBlob blob; if (!cephx_build_service_ticket_blob(cct, info, blob)) { dout(0) << "ms_get_authorizer failed to build service ticket" << dendl; return false; } bufferlist ticket_data; encode(blob, ticket_data); auto iter = ticket_data.cbegin(); CephXTicketHandler handler(g_ceph_context, service_id); 
decode(handler.ticket, iter); handler.session_key = info.session_key; *authorizer = handler.build_authorizer(0); return true; } KeyStore *Monitor::ms_get_auth1_authorizer_keystore() { return &keyring; } int Monitor::handle_auth_request( Connection *con, AuthConnectionMeta *auth_meta, bool more, uint32_t auth_method, const bufferlist &payload, bufferlist *reply) { std::scoped_lock l(auth_lock); // NOTE: be careful, the Connection hasn't fully negotiated yet, so // e.g., peer_features, peer_addrs, and others are still unknown. dout(10) << __func__ << " con " << con << (more ? " (more)":" (start)") << " method " << auth_method << " payload " << payload.length() << dendl; if (!more) { auth_meta->auth_mode = payload[0]; } if (auth_meta->auth_mode >= AUTH_MODE_AUTHORIZER && auth_meta->auth_mode <= AUTH_MODE_AUTHORIZER_MAX) { AuthAuthorizeHandler *ah = get_auth_authorize_handler(con->get_peer_type(), auth_method); if (!ah) { lderr(cct) << __func__ << " no AuthAuthorizeHandler found for auth method " << auth_method << dendl; return -EOPNOTSUPP; } bool was_challenge = (bool)auth_meta->authorizer_challenge; bool isvalid = ah->verify_authorizer( cct, &keyring, payload, auth_meta->get_connection_secret_length(), reply, &con->peer_name, &con->peer_global_id, &con->peer_caps_info, &auth_meta->session_key, &auth_meta->connection_secret, &auth_meta->authorizer_challenge); if (isvalid) { ms_handle_authentication(con); return 1; } if (!more && !was_challenge && auth_meta->authorizer_challenge) { return 0; } dout(10) << __func__ << " bad authorizer on " << con << dendl; return -EACCES; } else if (auth_meta->auth_mode < AUTH_MODE_MON || auth_meta->auth_mode > AUTH_MODE_MON_MAX) { derr << __func__ << " unrecognized auth mode " << auth_meta->auth_mode << dendl; return -EACCES; } // wait until we've formed an initial quorum on mkfs so that we have // the initial keys (e.g., client.admin). 
if (authmon()->get_last_committed() == 0) { dout(10) << __func__ << " haven't formed initial quorum, EBUSY" << dendl; return -EBUSY; } RefCountedPtr priv; MonSession *s; int32_t r = 0; auto p = payload.begin(); if (!more) { if (con->get_priv()) { return -EACCES; // wtf } // handler? unique_ptr auth_handler{get_auth_service_handler( auth_method, g_ceph_context, &key_server)}; if (!auth_handler) { dout(1) << __func__ << " auth_method " << auth_method << " not supported" << dendl; return -EOPNOTSUPP; } uint8_t mode; EntityName entity_name; try { decode(mode, p); if (mode < AUTH_MODE_MON || mode > AUTH_MODE_MON_MAX) { dout(1) << __func__ << " invalid mode " << (int)mode << dendl; return -EACCES; } assert(mode >= AUTH_MODE_MON && mode <= AUTH_MODE_MON_MAX); decode(entity_name, p); decode(con->peer_global_id, p); } catch (buffer::error& e) { dout(1) << __func__ << " failed to decode, " << e.what() << dendl; return -EACCES; } // supported method? if (entity_name.get_type() == CEPH_ENTITY_TYPE_MON || entity_name.get_type() == CEPH_ENTITY_TYPE_OSD || entity_name.get_type() == CEPH_ENTITY_TYPE_MDS || entity_name.get_type() == CEPH_ENTITY_TYPE_MGR) { if (!auth_cluster_required.is_supported_auth(auth_method)) { dout(10) << __func__ << " entity " << entity_name << " method " << auth_method << " not among supported " << auth_cluster_required.get_supported_set() << dendl; return -EOPNOTSUPP; } } else { if (!auth_service_required.is_supported_auth(auth_method)) { dout(10) << __func__ << " entity " << entity_name << " method " << auth_method << " not among supported " << auth_cluster_required.get_supported_set() << dendl; return -EOPNOTSUPP; } } // for msgr1 we would do some weirdness here to ensure signatures // are supported by the client if we require it. for msgr2 that // is not necessary. 
bool is_new_global_id = false; if (!con->peer_global_id) { con->peer_global_id = authmon()->_assign_global_id(); if (!con->peer_global_id) { dout(1) << __func__ << " failed to assign global_id" << dendl; return -EBUSY; } is_new_global_id = true; } // set up partial session s = new MonSession(con); s->auth_handler = auth_handler.release(); con->set_priv(RefCountedPtr{s, false}); r = s->auth_handler->start_session( entity_name, con->peer_global_id, is_new_global_id, reply, &con->peer_caps_info); } else { priv = con->get_priv(); if (!priv) { // this can happen if the async ms_handle_reset event races with // the unlocked call into handle_auth_request return -EACCES; } s = static_cast(priv.get()); r = s->auth_handler->handle_request( p, auth_meta->get_connection_secret_length(), reply, &con->peer_caps_info, &auth_meta->session_key, &auth_meta->connection_secret); } if (r > 0 && !s->authenticated) { ms_handle_authentication(con); } dout(30) << " r " << r << " reply:\n"; reply->hexdump(*_dout); *_dout << dendl; return r; } void Monitor::ms_handle_accept(Connection *con) { auto priv = con->get_priv(); MonSession *s = static_cast(priv.get()); if (!s) { // legacy protocol v1? dout(10) << __func__ << " con " << con << " no session" << dendl; return; } if (s->item.is_on_list()) { dout(10) << __func__ << " con " << con << " session " << s << " already on list" << dendl; } else { dout(10) << __func__ << " con " << con << " session " << s << " registering session for " << con->get_peer_addrs() << dendl; s->_ident(entity_name_t(con->get_peer_type(), con->get_peer_id()), con->get_peer_addrs()); std::lock_guard l(session_map_lock); session_map.add_session(s); } } int Monitor::ms_handle_authentication(Connection *con) { if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) { // mon <-> mon connections need no Session, and setting one up // creates an awkward ref cycle between Session and Connection. 
return 1; } auto priv = con->get_priv(); MonSession *s = static_cast(priv.get()); if (!s) { // must be msgr2, otherwise dispatch would have set up the session. s = session_map.new_session( entity_name_t(con->get_peer_type(), -1), // we don't know yet con->get_peer_addrs(), con); assert(s); dout(10) << __func__ << " adding session " << s << " to con " << con << dendl; con->set_priv(s); logger->set(l_mon_num_sessions, session_map.get_size()); logger->inc(l_mon_session_add); } dout(10) << __func__ << " session " << s << " con " << con << " addr " << s->con->get_peer_addr() << " " << *s << dendl; AuthCapsInfo &caps_info = con->get_peer_caps_info(); int ret = 0; if (caps_info.allow_all) { s->caps.set_allow_all(); s->authenticated = true; ret = 1; } if (caps_info.caps.length()) { bufferlist::const_iterator p = caps_info.caps.cbegin(); string str; try { decode(str, p); } catch (const buffer::error &err) { derr << __func__ << " corrupt cap data for " << con->get_peer_entity_name() << " in auth db" << dendl; str.clear(); ret = -EPERM; } if (ret >= 0) { if (s->caps.parse(str, NULL)) { s->authenticated = true; ret = 1; } else { derr << __func__ << " unparseable caps '" << str << "' for " << con->get_peer_entity_name() << dendl; ret = -EPERM; } } } return ret; }