From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- src/mon/Monitor.h | 1148 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1148 insertions(+) create mode 100644 src/mon/Monitor.h (limited to 'src/mon/Monitor.h') diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h new file mode 100644 index 000000000..1093649bb --- /dev/null +++ b/src/mon/Monitor.h @@ -0,0 +1,1148 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +/* + * This is the top level monitor. It runs on each machine in the Monitor + * Cluster. The election of a leader for the paxos algorithm only happens + * once per machine via the elector. There is a separate paxos instance (state) + * kept for each of the system components: Object Store Device (OSD) Monitor, + * Placement Group (PG) Monitor, Metadata Server (MDS) Monitor, and Client Monitor. 
+ */ + +#ifndef CEPH_MONITOR_H +#define CEPH_MONITOR_H + +#include +#include +#include +#include + +#include "include/types.h" +#include "include/health.h" +#include "msg/Messenger.h" + +#include "common/Timer.h" + +#include "health_check.h" +#include "MonMap.h" +#include "Elector.h" +#include "Paxos.h" +#include "Session.h" +#include "MonCommand.h" + + +#include "common/config_obs.h" +#include "common/LogClient.h" +#include "auth/AuthClient.h" +#include "auth/AuthServer.h" +#include "auth/cephx/CephxKeyServer.h" +#include "auth/AuthMethodList.h" +#include "auth/KeyRing.h" +#include "include/common_fwd.h" +#include "messages/MMonCommand.h" +#include "mon/MonitorDBStore.h" +#include "mgr/MgrClient.h" + +#include "mon/MonOpRequest.h" +#include "common/WorkQueue.h" + +using namespace TOPNSPC::common; + +#define CEPH_MON_PROTOCOL 13 /* cluster internal */ + + +enum { + l_cluster_first = 555000, + l_cluster_num_mon, + l_cluster_num_mon_quorum, + l_cluster_num_osd, + l_cluster_num_osd_up, + l_cluster_num_osd_in, + l_cluster_osd_epoch, + l_cluster_osd_bytes, + l_cluster_osd_bytes_used, + l_cluster_osd_bytes_avail, + l_cluster_num_pool, + l_cluster_num_pg, + l_cluster_num_pg_active_clean, + l_cluster_num_pg_active, + l_cluster_num_pg_peering, + l_cluster_num_object, + l_cluster_num_object_degraded, + l_cluster_num_object_misplaced, + l_cluster_num_object_unfound, + l_cluster_num_bytes, + l_cluster_last, +}; + +enum { + l_mon_first = 456000, + l_mon_num_sessions, + l_mon_session_add, + l_mon_session_rm, + l_mon_session_trim, + l_mon_num_elections, + l_mon_election_call, + l_mon_election_win, + l_mon_election_lose, + l_mon_last, +}; + +class PaxosService; + +class AdminSocketHook; + +#define COMPAT_SET_LOC "feature_set" + +class Monitor : public Dispatcher, + public AuthClient, + public AuthServer, + public md_config_obs_t { +public: + int orig_argc = 0; + const char **orig_argv = nullptr; + + // me + std::string name; + int rank; + Messenger *messenger; + ConnectionRef 
con_self; + ceph::mutex lock = ceph::make_mutex("Monitor::lock"); + SafeTimer timer; + Finisher finisher; + ThreadPool cpu_tp; ///< threadpool for CPU intensive work + + ceph::mutex auth_lock = ceph::make_mutex("Monitor::auth_lock"); + + /// true if we have ever joined a quorum. if false, we are either a + /// new cluster, a newly joining monitor, or a just-upgraded + /// monitor. + bool has_ever_joined; + + PerfCounters *logger, *cluster_logger; + bool cluster_logger_registered; + + void register_cluster_logger(); + void unregister_cluster_logger(); + + MonMap *monmap; + uuid_d fingerprint; + + std::set extra_probe_peers; + + LogClient log_client; + LogChannelRef clog; + LogChannelRef audit_clog; + KeyRing keyring; + KeyServer key_server; + + AuthMethodList auth_cluster_required; + AuthMethodList auth_service_required; + + CompatSet features; + + std::vector leader_mon_commands; // quorum leader's commands + std::vector local_mon_commands; // commands i support + ceph::buffer::list local_mon_commands_bl; // encoded version of above + + std::vector prenautilus_local_mon_commands; + ceph::buffer::list prenautilus_local_mon_commands_bl; + + Messenger *mgr_messenger; + MgrClient mgr_client; + uint64_t mgr_proxy_bytes = 0; // in-flight proxied mgr command message bytes + std::string gss_ktfile_client{}; + +private: + void new_tick(); + + // -- local storage -- +public: + MonitorDBStore *store; + static const std::string MONITOR_NAME; + static const std::string MONITOR_STORE_PREFIX; + + // -- monitor state -- +private: + enum { + STATE_INIT = 1, + STATE_PROBING, + STATE_SYNCHRONIZING, + STATE_ELECTING, + STATE_LEADER, + STATE_PEON, + STATE_SHUTDOWN + }; + int state = STATE_INIT; + +public: + static const char *get_state_name(int s) { + switch (s) { + case STATE_PROBING: return "probing"; + case STATE_SYNCHRONIZING: return "synchronizing"; + case STATE_ELECTING: return "electing"; + case STATE_LEADER: return "leader"; + case STATE_PEON: return "peon"; + case 
STATE_SHUTDOWN: return "shutdown"; + default: return "???"; + } + } + const char *get_state_name() const { + return get_state_name(state); + } + + bool is_init() const { return state == STATE_INIT; } + bool is_shutdown() const { return state == STATE_SHUTDOWN; } + bool is_probing() const { return state == STATE_PROBING; } + bool is_synchronizing() const { return state == STATE_SYNCHRONIZING; } + bool is_electing() const { return state == STATE_ELECTING; } + bool is_leader() const { return state == STATE_LEADER; } + bool is_peon() const { return state == STATE_PEON; } + + const utime_t &get_leader_since() const; + + void prepare_new_fingerprint(MonitorDBStore::TransactionRef t); + + std::vector get_health_metrics(); + + int quorum_age() const { + auto age = std::chrono::duration_cast( + ceph::mono_clock::now() - quorum_since); + return age.count(); + } + + bool is_mon_down() const { + int max = monmap->size(); + int actual = get_quorum().size(); + auto now = ceph::real_clock::now(); + return actual < max && now > monmap->created.to_real_time(); + } + + // -- elector -- +private: + std::unique_ptr paxos; + Elector elector; + friend class Elector; + + /// features we require of peers (based on on-disk compatset) + uint64_t required_features; + + int leader; // current leader (to best of knowledge) + std::set quorum; // current active set of monitors (if !starting) + ceph::mono_clock::time_point quorum_since; // when quorum formed + utime_t leader_since; // when this monitor became the leader, if it is the leader + utime_t exited_quorum; // time detected as not in quorum; 0 if in + + // map of counts of connected clients, by type and features, for + // each quorum mon + std::map quorum_feature_map; + + /** + * Intersection of quorum member's connection feature bits. 
+ */ + uint64_t quorum_con_features; + /** + * Intersection of quorum members mon-specific feature bits + */ + mon_feature_t quorum_mon_features; + + ceph_release_t quorum_min_mon_release{ceph_release_t::unknown}; + + std::set outside_quorum; + + bool stretch_mode_engaged{false}; + bool degraded_stretch_mode{false}; + bool recovering_stretch_mode{false}; + string stretch_bucket_divider; + map> dead_mon_buckets; // bucket->mon ranks, locations with no live mons + set up_mon_buckets; // locations with a live mon + void do_stretch_mode_election_work(); + + bool session_stretch_allowed(MonSession *s, MonOpRequestRef& op); + void disconnect_disallowed_stretch_sessions(); + void set_elector_disallowed_leaders(bool allow_election); + + map crush_loc; + bool need_set_crush_loc{false}; +public: + bool is_stretch_mode() { return stretch_mode_engaged; } + bool is_degraded_stretch_mode() { return degraded_stretch_mode; } + bool is_recovering_stretch_mode() { return recovering_stretch_mode; } + + /** + * This set of functions maintains the in-memory stretch state + * and sets up transitions of the map states by calling in to + * MonmapMonitor and OSDMonitor. + * + * The [maybe_]go_* functions are called on the leader to + * decide if transitions should happen; the trigger_* functions + * set up the map transitions; and the set_* functions actually + * change the memory state -- but these are only called + * via OSDMonitor::update_from_paxos, to guarantee consistent + * updates across the entire cluster. 
+ */ + void try_engage_stretch_mode(); + void maybe_go_degraded_stretch_mode(); + void trigger_degraded_stretch_mode(const set& dead_mons, + const set& dead_buckets); + void set_degraded_stretch_mode(); + void go_recovery_stretch_mode(); + void set_recovery_stretch_mode(); + void trigger_healthy_stretch_mode(); + void set_healthy_stretch_mode(); + void enable_stretch_mode(); + void set_mon_crush_location(const string& loc); + + +private: + + /** + * @defgroup Monitor_h_scrub + * @{ + */ + version_t scrub_version; ///< paxos version we are scrubbing + std::map scrub_result; ///< results so far + + /** + * trigger a cross-mon scrub + * + * Verify all mons are storing identical content + */ + int scrub_start(); + int scrub(); + void handle_scrub(MonOpRequestRef op); + bool _scrub(ScrubResult *r, + std::pair *start, + int *num_keys); + void scrub_check_results(); + void scrub_timeout(); + void scrub_finish(); + void scrub_reset(); + void scrub_update_interval(ceph::timespan interval); + + Context *scrub_event; ///< periodic event to trigger scrub (leader) + Context *scrub_timeout_event; ///< scrub round timeout (leader) + void scrub_event_start(); + void scrub_event_cancel(); + void scrub_reset_timeout(); + void scrub_cancel_timeout(); + + struct ScrubState { + std::pair last_key; ///< last scrubbed key + bool finished; + + ScrubState() : finished(false) { } + virtual ~ScrubState() { } + }; + std::shared_ptr scrub_state; ///< keeps track of current scrub + + /** + * @defgroup Monitor_h_sync Synchronization + * @{ + */ + /** + * @} // provider state + */ + struct SyncProvider { + entity_addrvec_t addrs; + uint64_t cookie; ///< unique cookie for this sync attempt + utime_t timeout; ///< when we give up and expire this attempt + version_t last_committed; ///< last paxos version on peer + std::pair last_key; ///< last key sent to (or on) peer + bool full; ///< full scan? 
+ MonitorDBStore::Synchronizer synchronizer; ///< iterator + + SyncProvider() : cookie(0), last_committed(0), full(false) {} + + void reset_timeout(CephContext *cct, int grace) { + timeout = ceph_clock_now(); + timeout += grace; + } + }; + + std::map sync_providers; ///< cookie -> SyncProvider for those syncing from us + uint64_t sync_provider_count; ///< counter for issued cookies to keep them unique + + /** + * @} // requester state + */ + entity_addrvec_t sync_provider; ///< who we are syncing from + uint64_t sync_cookie; ///< 0 if we are starting, non-zero otherwise + bool sync_full; ///< true if we are a full sync, false for recent catch-up + version_t sync_start_version; ///< last_committed at sync start + Context *sync_timeout_event; ///< timeout event + + /** + * floor for sync source + * + * When we sync we forget about our old last_committed value which + * can be dangerous. For example, if we have a cluster of: + * + * mon.a: lc 100 + * mon.b: lc 80 + * mon.c: lc 100 (us) + * + * If something forces us to sync (say, corruption, or manual + * intervention, or bug), we forget last_committed, and might abort. + * If mon.a happens to be down when we come back, we will see: + * + * mon.b: lc 80 + * mon.c: lc 0 (us) + * + * and sync from mon.b, at which point a+b will both have lc 80 and + * come online with a majority holding out of date commits. + * + * Avoid this by preserving our old last_committed value prior to + * sync and never going backwards. + */ + version_t sync_last_committed_floor; + + /** + * Obtain the synchronization target prefixes in set form. + * + * We consider a target prefix all those that are relevant when + * synchronizing two stores. That is, all those that hold paxos service's + * versions, as well as paxos versions, or any control keys such as the + * first or last committed version. + * + * Given the current design, this function should return the name of all and + * any available paxos service, plus the paxos name. 
+ * + * @returns a set of strings referring to the prefixes being synchronized + */ + std::set get_sync_targets_names(); + + /** + * Reset the monitor's sync-related data structures for syncing *from* a peer + */ + void sync_reset_requester(); + + /** + * Reset sync state related to allowing others to sync from us + */ + void sync_reset_provider(); + + /** + * Called when a sync attempt times out (requester-side) + */ + void sync_timeout(); + + /** + * Get the latest monmap for backup purposes during sync + */ + void sync_obtain_latest_monmap(ceph::buffer::list &bl); + + /** + * Start sync process + * + * Start pulling committed state from another monitor. + * + * @param addrs where to pull committed state from + * @param full whether to do a full sync or just catch up on recent paxos + */ + void sync_start(entity_addrvec_t &addrs, bool full); + +public: + /** + * force a sync on next mon restart + */ + void sync_force(ceph::Formatter *f); + +private: + /** + * store critical state for safekeeping during sync + * + * We store a few things on the side that we don't want to get clobbered by sync. This + * includes the latest monmap and a lower bound on last_committed. + */ + void sync_stash_critical_state(MonitorDBStore::TransactionRef tx); + + /** + * reset the sync timeout + * + * This is used on the client to restart if things aren't progressing + */ + void sync_reset_timeout(); + + /** + * trim stale sync provider state + * + * If someone is syncing from us and hasn't talked to us recently, expire their state. + */ + void sync_trim_providers(); + + /** + * Complete a sync + * + * Finish up a sync after we've gotten all of the chunks. 
+ * + * @param last_committed final last_committed value from provider + */ + void sync_finish(version_t last_committed); + + /** + * request the next chunk from the provider + */ + void sync_get_next_chunk(); + + /** + * handle sync message + * + * @param op Sync message with operation type MMonSync::OP_START_CHUNKS + */ + void handle_sync(MonOpRequestRef op); + + void _sync_reply_no_cookie(MonOpRequestRef op); + + void handle_sync_get_cookie(MonOpRequestRef op); + void handle_sync_get_chunk(MonOpRequestRef op); + void handle_sync_finish(MonOpRequestRef op); + + void handle_sync_cookie(MonOpRequestRef op); + void handle_sync_forward(MonOpRequestRef op); + void handle_sync_chunk(MonOpRequestRef op); + void handle_sync_no_cookie(MonOpRequestRef op); + + /** + * @} // Synchronization + */ + + std::list waitfor_quorum; + std::list maybe_wait_for_quorum; + + /** + * @defgroup Monitor_h_TimeCheck Monitor Clock Drift Early Warning System + * @{ + * + * We use time checks to keep track of any clock drifting going on in the + * cluster. This is accomplished by periodically pinging each monitor in the + * quorum and registering its response time in a map, assessing how much its + * clock has drifted. We also take this opportunity to assess the latency + * on response. + * + * This mechanism works as follows: + * + * - Leader sends out a 'PING' message to each other monitor in the quorum. + * The message is timestamped with the leader's current time. The leader's + * current time is recorded in a map, associated with each peon's + * instance. + * - The peon replies to the leader with a timestamped 'PONG' message. + * - The leader calculates a delta between the peon's timestamp and its + * current time and stashes it. + * - The leader also calculates the time it took to receive the 'PONG' + * since the 'PING' was sent, and stashes an approximate latency estimate. 
+ * - Once all the quorum members have pong'ed, the leader will share the + * clock skew and latency maps with all the monitors in the quorum. + */ + std::map timecheck_waiting; + std::map timecheck_skews; + std::map timecheck_latencies; + // odd value means we are mid-round; even value means the round has + // finished. + version_t timecheck_round; + unsigned int timecheck_acks; + utime_t timecheck_round_start; + friend class HealthMonitor; + /* When we hit a skew we will start a new round based off of + * 'mon_timecheck_skew_interval'. Each new round will be backed off + * until we hit 'mon_timecheck_interval' -- which is the typical + * interval when not in the presence of a skew. + * + * This variable tracks the number of rounds with skews since last clean + * so that we can report to the user and properly adjust the backoff. + */ + uint64_t timecheck_rounds_since_clean; + /** + * Time Check event. + */ + Context *timecheck_event; + + void timecheck_start(); + void timecheck_finish(); + void timecheck_start_round(); + void timecheck_finish_round(bool success = true); + void timecheck_cancel_round(); + void timecheck_cleanup(); + void timecheck_reset_event(); + void timecheck_check_skews(); + void timecheck_report(); + void timecheck(); + health_status_t timecheck_status(std::ostringstream &ss, + const double skew_bound, + const double latency); + void handle_timecheck_leader(MonOpRequestRef op); + void handle_timecheck_peon(MonOpRequestRef op); + void handle_timecheck(MonOpRequestRef op); + + /** + * Returns 'true' if this is considered to be a skew; 'false' otherwise. + */ + bool timecheck_has_skew(const double skew_bound, double *abs) const { + double abs_skew = std::fabs(skew_bound); + if (abs) + *abs = abs_skew; + return (abs_skew > g_conf()->mon_clock_drift_allowed); + } + + /** + * @} + */ + /** + * Handle ping messages from others. 
+ */ + void handle_ping(MonOpRequestRef op); + + Context *probe_timeout_event = nullptr; // for probing + + void reset_probe_timeout(); + void cancel_probe_timeout(); + void probe_timeout(int r); + + void _apply_compatset_features(CompatSet &new_features); + +public: + epoch_t get_epoch(); + int get_leader() const { return leader; } + std::string get_leader_name() { + return quorum.empty() ? std::string() : monmap->get_name(leader); + } + const std::set& get_quorum() const { return quorum; } + std::list get_quorum_names() { + std::list q; + for (auto p = quorum.begin(); p != quorum.end(); ++p) + q.push_back(monmap->get_name(*p)); + return q; + } + uint64_t get_quorum_con_features() const { + return quorum_con_features; + } + mon_feature_t get_quorum_mon_features() const { + return quorum_mon_features; + } + uint64_t get_required_features() const { + return required_features; + } + mon_feature_t get_required_mon_features() const { + return monmap->get_required_features(); + } + void apply_quorum_to_compatset_features(); + void apply_monmap_to_compatset_features(); + void calc_quorum_requirements(); + + void get_combined_feature_map(FeatureMap *fm); + +private: + void _reset(); ///< called from bootstrap, start_, or join_election + void wait_for_paxos_write(); + void _finish_svc_election(); ///< called by {win,lose}_election + void respawn(); +public: + void bootstrap(); + void join_election(); + void start_election(); + void win_standalone_election(); + // end election (called by Elector) + void win_election(epoch_t epoch, const std::set& q, + uint64_t features, + const mon_feature_t& mon_features, + ceph_release_t min_mon_release, + const std::map& metadata); + void lose_election(epoch_t epoch, std::set& q, int l, + uint64_t features, + const mon_feature_t& mon_features, + ceph_release_t min_mon_release); + // end election (called by Elector) + void finish_election(); + + void update_logger(); + + /** + * Vector holding the Services serviced by this Monitor. 
+ */ + std::array, PAXOS_NUM> paxos_service; + + class MDSMonitor *mdsmon() { + return (class MDSMonitor *)paxos_service[PAXOS_MDSMAP].get(); + } + + class MonmapMonitor *monmon() { + return (class MonmapMonitor *)paxos_service[PAXOS_MONMAP].get(); + } + + class OSDMonitor *osdmon() { + return (class OSDMonitor *)paxos_service[PAXOS_OSDMAP].get(); + } + + class AuthMonitor *authmon() { + return (class AuthMonitor *)paxos_service[PAXOS_AUTH].get(); + } + + class LogMonitor *logmon() { + return (class LogMonitor*) paxos_service[PAXOS_LOG].get(); + } + + class MgrMonitor *mgrmon() { + return (class MgrMonitor*) paxos_service[PAXOS_MGR].get(); + } + + class MgrStatMonitor *mgrstatmon() { + return (class MgrStatMonitor*) paxos_service[PAXOS_MGRSTAT].get(); + } + + class HealthMonitor *healthmon() { + return (class HealthMonitor*) paxos_service[PAXOS_HEALTH].get(); + } + + class ConfigMonitor *configmon() { + return (class ConfigMonitor*) paxos_service[PAXOS_CONFIG].get(); + } + + class KVMonitor *kvmon() { + return (class KVMonitor*) paxos_service[PAXOS_KV].get(); + } + + friend class Paxos; + friend class OSDMonitor; + friend class MDSMonitor; + friend class MonmapMonitor; + friend class LogMonitor; + friend class KVMonitor; + + // -- sessions -- + MonSessionMap session_map; + ceph::mutex session_map_lock = ceph::make_mutex("Monitor::session_map_lock"); + AdminSocketHook *admin_hook; + + /** + * Invoke func with session_map while holding session_map_lock. + * + * The lock is taken before the call and released when the call returns. + * + * @param func callable invoked as func(session_map) + */ + template<typename Func> + void with_session_map(Func&& func) { + std::lock_guard l(session_map_lock); + std::forward<Func>(func)(session_map); + } + void send_latest_monmap(Connection *con); + + // messages + void handle_get_version(MonOpRequestRef op); + void handle_subscribe(MonOpRequestRef op); + void handle_mon_get_map(MonOpRequestRef op); + + static void _generate_command_map(cmdmap_t& cmdmap, + std::map &param_str_map); + static const MonCommand *_get_moncommand( + const std::string &cmd_prefix, + const std::vector& cmds); + bool _allowed_command(MonSession *s, const std::string& module, + const 
std::string& prefix, + const cmdmap_t& cmdmap, + const std::map& param_str_map, + const MonCommand *this_cmd); + void get_mon_status(ceph::Formatter *f); + void _quorum_status(ceph::Formatter *f, std::ostream& ss); + bool _add_bootstrap_peer_hint(std::string_view cmd, const cmdmap_t& cmdmap, + std::ostream& ss); + void handle_tell_command(MonOpRequestRef op); + void handle_command(MonOpRequestRef op); + void handle_route(MonOpRequestRef op); + + int get_mon_metadata(int mon, ceph::Formatter *f, std::ostream& err); + int print_nodes(ceph::Formatter *f, std::ostream& err); + + // track metadata reported by win_election() + std::map mon_metadata; + std::map pending_metadata; + + /** + * + */ + struct health_cache_t { + health_status_t overall; + std::string summary; + + void reset() { + // health_status_t doesn't really have a NONE value and we're not + // okay with setting something else (say, HEALTH_ERR). so just + // leave it be. + summary.clear(); + } + } health_status_cache; + + Context *health_tick_event = nullptr; + Context *health_interval_event = nullptr; + + void health_tick_start(); + void health_tick_stop(); + ceph::real_clock::time_point health_interval_calc_next_update(); + void health_interval_start(); + void health_interval_stop(); + void health_events_cleanup(); + + void health_to_clog_update_conf(const std::set &changed); + + void do_health_to_clog_interval(); + void do_health_to_clog(bool force = false); + + void log_health( + const health_check_map_t& updated, + const health_check_map_t& previous, + MonitorDBStore::TransactionRef t); + + void update_pending_metadata(); + +protected: + + class HealthCheckLogStatus { + public: + health_status_t severity; + std::string last_message; + utime_t updated_at = 0; + HealthCheckLogStatus(health_status_t severity_, + const std::string &last_message_, + utime_t updated_at_) + : severity(severity_), + last_message(last_message_), + updated_at(updated_at_) + {} + }; + std::map health_check_log_times; + +public: 
+ + void get_cluster_status(std::stringstream &ss, ceph::Formatter *f, + MonSession *session); + + void reply_command(MonOpRequestRef op, int rc, const std::string &rs, version_t version); + void reply_command(MonOpRequestRef op, int rc, const std::string &rs, ceph::buffer::list& rdata, version_t version); + + void reply_tell_command(MonOpRequestRef op, int rc, const std::string &rs); + + + + void handle_probe(MonOpRequestRef op); + /** + * Handle a Probe Operation, replying with our name, quorum and known versions. + * + * We use the MMonProbe message class for anything and everything related to + * Monitor probing. One of the operations relates directly to the probing + * itself, in which we receive a probe request and to which we reply with + * our name, our quorum and the known versions for each Paxos service. Thus the + * redundant function name. This reply will obviously be sent to the one + * probing/requesting this information. + * + * @todo Add @pre and @post + * + * @param op A Probe message, with an operation of type Probe. 
+ */ + void handle_probe_probe(MonOpRequestRef op); + void handle_probe_reply(MonOpRequestRef op); + + // request routing + struct RoutedRequest { + uint64_t tid; + ceph::buffer::list request_bl; + MonSession *session; + ConnectionRef con; + uint64_t con_features; + MonOpRequestRef op; + + RoutedRequest() : tid(0), session(NULL), con_features(0) {} + ~RoutedRequest() { + if (session) + session->put(); + } + }; + uint64_t routed_request_tid; + std::map routed_requests; + + void forward_request_leader(MonOpRequestRef op); + void handle_forward(MonOpRequestRef op); + void send_reply(MonOpRequestRef op, Message *reply); + void no_reply(MonOpRequestRef op); + void resend_routed_requests(); + void remove_session(MonSession *s); + void remove_all_sessions(); + void waitlist_or_zap_client(MonOpRequestRef op); + + void send_mon_message(Message *m, int rank); + /** can_change_external_state if we can do things like + * call elections as a result of the new map. + */ + void notify_new_monmap(bool can_change_external_state=false, bool remove_rank_elector=true); + +public: + struct C_Command : public C_MonOp { + Monitor &mon; + int rc; + std::string rs; + ceph::buffer::list rdata; + version_t version; + C_Command(Monitor &_mm, MonOpRequestRef _op, int r, std::string s, version_t v) : + C_MonOp(_op), mon(_mm), rc(r), rs(s), version(v){} + C_Command(Monitor &_mm, MonOpRequestRef _op, int r, std::string s, ceph::buffer::list rd, version_t v) : + C_MonOp(_op), mon(_mm), rc(r), rs(s), rdata(rd), version(v){} + + void _finish(int r) override { + auto m = op->get_req(); + if (r >= 0) { + std::ostringstream ss; + if (!op->get_req()->get_connection()) { + ss << "connection dropped for command "; + } else { + MonSession *s = op->get_session(); + + // if client drops we may not have a session to draw information from. 
+ if (s) { + ss << "from='" << s->name << " " << s->addrs << "' " + << "entity='" << s->entity_name << "' "; + } else { + ss << "session dropped for command "; + } + } + cmdmap_t cmdmap; + std::ostringstream ds; + string prefix; + cmdmap_from_json(m->cmd, &cmdmap, ds); + cmd_getval(cmdmap, "prefix", prefix); + if (prefix != "config set" && prefix != "config-key set") + ss << "cmd='" << m->cmd << "': finished"; + + mon.audit_clog->info() << ss.str(); + mon.reply_command(op, rc, rs, rdata, version); + } + else if (r == -ECANCELED) + return; + else if (r == -EAGAIN) + mon.dispatch_op(op); + else + ceph_abort_msg("bad C_Command return value"); + } + }; + + private: + class C_RetryMessage : public C_MonOp { + Monitor *mon; + public: + C_RetryMessage(Monitor *m, MonOpRequestRef op) : + C_MonOp(op), mon(m) { } + + void _finish(int r) override { + if (r == -EAGAIN || r >= 0) + mon->dispatch_op(op); + else if (r == -ECANCELED) + return; + else + ceph_abort_msg("bad C_RetryMessage return value"); + } + }; + + //ms_dispatch handles a lot of logic and we want to reuse it + //on forwarded messages, so we create a non-locking version for this class + void _ms_dispatch(Message *m); + bool ms_dispatch(Message *m) override { + std::lock_guard l{lock}; + _ms_dispatch(m); + return true; + } + void dispatch_op(MonOpRequestRef op); + //mon_caps is used for un-connected messages from monitors + MonCap mon_caps; + bool get_authorizer(int dest_type, AuthAuthorizer **authorizer); +public: // for AuthMonitor msgr1: + int ms_handle_authentication(Connection *con) override; +private: + void ms_handle_accept(Connection *con) override; + bool ms_handle_reset(Connection *con) override; + void ms_handle_remote_reset(Connection *con) override {} + bool ms_handle_refused(Connection *con) override; + + // AuthClient + int get_auth_request( + Connection *con, + AuthConnectionMeta *auth_meta, + uint32_t *method, + std::vector *preferred_modes, + ceph::buffer::list *out) override; + int 
handle_auth_reply_more( + Connection *con, + AuthConnectionMeta *auth_meta, + const ceph::buffer::list& bl, + ceph::buffer::list *reply) override; + int handle_auth_done( + Connection *con, + AuthConnectionMeta *auth_meta, + uint64_t global_id, + uint32_t con_mode, + const ceph::buffer::list& bl, + CryptoKey *session_key, + std::string *connection_secret) override; + int handle_auth_bad_method( + Connection *con, + AuthConnectionMeta *auth_meta, + uint32_t old_auth_method, + int result, + const std::vector& allowed_methods, + const std::vector& allowed_modes) override; + // /AuthClient + // AuthServer + int handle_auth_request( + Connection *con, + AuthConnectionMeta *auth_meta, + bool more, + uint32_t auth_method, + const ceph::buffer::list& bl, + ceph::buffer::list *reply) override; + // /AuthServer + + int write_default_keyring(ceph::buffer::list& bl); + void extract_save_mon_key(KeyRing& keyring); + + void collect_metadata(Metadata *m); + int load_metadata(); + void count_metadata(const std::string& field, ceph::Formatter *f); + void count_metadata(const std::string& field, std::map *out); + // get_all_versions() gathers version information from daemons for health check + void get_all_versions(std::map> &versions); + void get_versions(std::map> &versions); + + // features + static CompatSet get_initial_supported_features(); + static CompatSet get_supported_features(); + static CompatSet get_legacy_features(); + /// read the ondisk features into the CompatSet pointed to by read_features + static void read_features_off_disk(MonitorDBStore *store, CompatSet *read_features); + void read_features(); + void write_features(MonitorDBStore::TransactionRef t); + + OpTracker op_tracker; + + public: + Monitor(CephContext *cct_, std::string nm, MonitorDBStore *s, + Messenger *m, Messenger *mgr_m, MonMap *map); + ~Monitor() override; + + static int check_features(MonitorDBStore *store); + + // config observer + const char** get_tracked_conf_keys() const override; + void 
handle_conf_change(const ConfigProxy& conf, + const std::set &changed) override; + + void update_log_clients(); + int sanitize_options(); + int preinit(); + int init(); + void init_paxos(); + void refresh_from_paxos(bool *need_bootstrap); + void shutdown(); + void tick(); + + void handle_signal(int sig); + + int mkfs(ceph::buffer::list& osdmapbl); + + /** + * check cluster_fsid file + * + * @return EEXIST if file exists and doesn't match, 0 on match, or negative error code + */ + int check_fsid(); + + /** + * write cluster_fsid file + * + * @return 0 on success, or negative error code + */ + int write_fsid(); + int write_fsid(MonitorDBStore::TransactionRef t); + + int do_admin_command(std::string_view command, const cmdmap_t& cmdmap, + ceph::Formatter *f, + std::ostream& err, + std::ostream& out); + +private: + // don't allow copying + Monitor(const Monitor& rhs); + Monitor& operator=(const Monitor &rhs); + +public: + static void format_command_descriptions(const std::vector &commands, + ceph::Formatter *f, + uint64_t features, + ceph::buffer::list *rdata); + + const std::vector &get_local_commands(mon_feature_t f) { + if (f.contains_all(ceph::features::mon::FEATURE_NAUTILUS)) { + return local_mon_commands; + } else { + return prenautilus_local_mon_commands; + } + } + const ceph::buffer::list& get_local_commands_bl(mon_feature_t f) { + if (f.contains_all(ceph::features::mon::FEATURE_NAUTILUS)) { + return local_mon_commands_bl; + } else { + return prenautilus_local_mon_commands_bl; + } + } + void set_leader_commands(const std::vector& cmds) { + leader_mon_commands = cmds; + } + + bool is_keyring_required(); +}; + +#define CEPH_MON_FEATURE_INCOMPAT_BASE CompatSet::Feature (1, "initial feature set (~v.18)") +#define CEPH_MON_FEATURE_INCOMPAT_GV CompatSet::Feature (2, "global version sequencing (v0.52)") +#define CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS CompatSet::Feature (3, "single paxos with k/v store (v0.\?)") +#define CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES 
CompatSet::Feature(4, "support erasure code pools") +#define CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC CompatSet::Feature(5, "new-style osdmap encoding") +#define CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2 CompatSet::Feature(6, "support isa/lrc erasure code") +#define CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3 CompatSet::Feature(7, "support shec erasure code") +#define CEPH_MON_FEATURE_INCOMPAT_KRAKEN CompatSet::Feature(8, "support monmap features") +#define CEPH_MON_FEATURE_INCOMPAT_LUMINOUS CompatSet::Feature(9, "luminous ondisk layout") +#define CEPH_MON_FEATURE_INCOMPAT_MIMIC CompatSet::Feature(10, "mimic ondisk layout") +#define CEPH_MON_FEATURE_INCOMPAT_NAUTILUS CompatSet::Feature(11, "nautilus ondisk layout") +#define CEPH_MON_FEATURE_INCOMPAT_OCTOPUS CompatSet::Feature(12, "octopus ondisk layout") +#define CEPH_MON_FEATURE_INCOMPAT_PACIFIC CompatSet::Feature(13, "pacific ondisk layout") +// make sure you add your feature to Monitor::get_supported_features + + +/* LambdaContext subclass bound to a Monitor. + * + * The constructor forwards f to the LambdaContext base and remembers the + * monitor. finish() returns immediately, without calling the base + * implementation, when mon->is_shutdown() is true; otherwise it delegates + * to LambdaContext::finish(r). + * + * Callers use: + * + * new C_MonContext{...} + * + * instead of + * + * new C_MonContext(...) + * + * because of gcc bug [1]. + * + * [1] https://gcc.gnu.org/bugzilla/show_bug.cgi?id=85883 + */ +template<typename T> +class C_MonContext : public LambdaContext<T> { +public: + C_MonContext(const Monitor* m, T&& f) : + LambdaContext<T>(std::forward<T>(f)), + mon(m) + {} + void finish(int r) override { + if (mon->is_shutdown()) + return; + LambdaContext<T>::finish(r); + } +private: + const Monitor* mon; +}; + +#endif -- cgit v1.2.3