summaryrefslogtreecommitdiffstats
path: root/src/mon/MonClient.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/mon/MonClient.cc')
-rw-r--r--src/mon/MonClient.cc2025
1 files changed, 2025 insertions, 0 deletions
diff --git a/src/mon/MonClient.cc b/src/mon/MonClient.cc
new file mode 100644
index 000000000..9c637bf8a
--- /dev/null
+++ b/src/mon/MonClient.cc
@@ -0,0 +1,2025 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <algorithm>
+#include <iterator>
+#include <random>
+#include <boost/range/adaptor/map.hpp>
+#include <boost/range/adaptor/filtered.hpp>
+#include <boost/range/algorithm/copy.hpp>
+#include <boost/range/algorithm_ext/copy_n.hpp>
+#include "common/weighted_shuffle.h"
+
+#include "include/random.h"
+#include "include/scope_guard.h"
+#include "include/stringify.h"
+
+#include "messages/MMonGetMap.h"
+#include "messages/MMonGetVersion.h"
+#include "messages/MMonGetMap.h"
+#include "messages/MMonGetVersionReply.h"
+#include "messages/MMonMap.h"
+#include "messages/MConfig.h"
+#include "messages/MGetConfig.h"
+#include "messages/MAuth.h"
+#include "messages/MLogAck.h"
+#include "messages/MAuthReply.h"
+#include "messages/MMonCommand.h"
+#include "messages/MMonCommandAck.h"
+#include "messages/MCommand.h"
+#include "messages/MCommandReply.h"
+#include "messages/MPing.h"
+
+#include "messages/MMonSubscribe.h"
+#include "messages/MMonSubscribeAck.h"
+#include "common/errno.h"
+#include "common/hostname.h"
+#include "common/LogClient.h"
+
+#include "MonClient.h"
+#include "error_code.h"
+#include "MonMap.h"
+
+#include "auth/Auth.h"
+#include "auth/KeyRing.h"
+#include "auth/AuthClientHandler.h"
+#include "auth/AuthRegistry.h"
+#include "auth/RotatingKeyRing.h"
+
+#define dout_subsys ceph_subsys_monc
+#undef dout_prefix
+#define dout_prefix *_dout << "monclient" << (_hunting() ? "(hunting)":"") << ": "
+
+namespace bs = boost::system;
+using std::string;
+using namespace std::literals;
+
+MonClient::MonClient(CephContext *cct_, boost::asio::io_context& service) :
+ Dispatcher(cct_),
+ AuthServer(cct_),
+ messenger(NULL),
+ timer(cct_, monc_lock),
+ service(service),
+ initialized(false),
+ log_client(NULL),
+ more_log_pending(false),
+ want_monmap(true),
+ had_a_connection(false),
+ reopen_interval_multiplier(
+ cct_->_conf.get_val<double>("mon_client_hunt_interval_min_multiple")),
+ last_mon_command_tid(0),
+ version_req_id(0)
+{}
+
+MonClient::~MonClient()
+{
+}
+
+int MonClient::build_initial_monmap()
+{
+ ldout(cct, 10) << __func__ << dendl;
+ int r = monmap.build_initial(cct, false, std::cerr);
+ ldout(cct,10) << "monmap:\n";
+ monmap.print(*_dout);
+ *_dout << dendl;
+ return r;
+}
+
+int MonClient::get_monmap()
+{
+ ldout(cct, 10) << __func__ << dendl;
+ std::unique_lock l(monc_lock);
+
+ sub.want("monmap", 0, 0);
+ if (!_opened())
+ _reopen_session();
+ map_cond.wait(l, [this] { return !want_monmap; });
+ ldout(cct, 10) << __func__ << " done" << dendl;
+ return 0;
+}
+
+int MonClient::get_monmap_and_config()
+{
+ ldout(cct, 10) << __func__ << dendl;
+ ceph_assert(!messenger);
+
+ int tries = 10;
+
+ cct->init_crypto();
+ auto shutdown_crypto = make_scope_guard([this] {
+ cct->shutdown_crypto();
+ });
+
+ int r = build_initial_monmap();
+ if (r < 0) {
+ lderr(cct) << __func__ << " cannot identify monitors to contact" << dendl;
+ return r;
+ }
+
+ messenger = Messenger::create_client_messenger(
+ cct, "temp_mon_client");
+ ceph_assert(messenger);
+ messenger->add_dispatcher_head(this);
+ messenger->start();
+ auto shutdown_msgr = make_scope_guard([this] {
+ messenger->shutdown();
+ messenger->wait();
+ delete messenger;
+ messenger = nullptr;
+ if (!monmap.fsid.is_zero()) {
+ cct->_conf.set_val("fsid", stringify(monmap.fsid));
+ }
+ });
+
+ want_bootstrap_config = true;
+ auto shutdown_config = make_scope_guard([this] {
+ std::unique_lock l(monc_lock);
+ want_bootstrap_config = false;
+ bootstrap_config.reset();
+ });
+
+ ceph::ref_t<MConfig> config;
+ while (tries-- > 0) {
+ r = init();
+ if (r < 0) {
+ return r;
+ }
+ r = authenticate(std::chrono::duration<double>(cct->_conf.get_val<std::chrono::seconds>("client_mount_timeout")).count());
+ if (r == -ETIMEDOUT) {
+ shutdown();
+ continue;
+ }
+ if (r < 0) {
+ break;
+ }
+ {
+ std::unique_lock l(monc_lock);
+ if (monmap.get_epoch() &&
+ !monmap.persistent_features.contains_all(
+ ceph::features::mon::FEATURE_MIMIC)) {
+ ldout(cct,10) << __func__ << " pre-mimic monitor, no config to fetch"
+ << dendl;
+ r = 0;
+ break;
+ }
+ while ((!bootstrap_config || monmap.get_epoch() == 0) && r == 0) {
+ ldout(cct,20) << __func__ << " waiting for monmap|config" << dendl;
+ auto status = map_cond.wait_for(l, ceph::make_timespan(
+ cct->_conf->mon_client_hunt_interval));
+ if (status == std::cv_status::timeout) {
+ r = -ETIMEDOUT;
+ }
+ }
+
+ if (bootstrap_config) {
+ ldout(cct,10) << __func__ << " success" << dendl;
+ config = std::move(bootstrap_config);
+ r = 0;
+ break;
+ }
+ }
+ lderr(cct) << __func__ << " failed to get config" << dendl;
+ shutdown();
+ continue;
+ }
+
+ if (config) {
+ // apply the bootstrap config to ensure its applied prior to completing
+ // the bootstrap
+ cct->_conf.set_mon_vals(cct, config->config, config_cb);
+ }
+
+ shutdown();
+ return r;
+}
+
+
+/**
+ * Ping the monitor with id @p mon_id and set the resulting reply in
+ * the provided @p result_reply, if this last parameter is not NULL.
+ *
+ * So that we don't rely on the MonClient's default messenger, set up
+ * during connect(), we create our own messenger to comunicate with the
+ * specified monitor. This is advantageous in the following ways:
+ *
+ * - Isolate the ping procedure from the rest of the MonClient's operations,
+ * allowing us to not acquire or manage the big monc_lock, thus not
+ * having to block waiting for some other operation to finish before we
+ * can proceed.
+ * * for instance, we can ping mon.FOO even if we are currently hunting
+ * or blocked waiting for auth to complete with mon.BAR.
+ *
+ * - Ping a monitor prior to establishing a connection (using connect())
+ * and properly establish the MonClient's messenger. This frees us
+ * from dealing with the complex foo that happens in connect().
+ *
+ * We also don't rely on MonClient as a dispatcher for this messenger,
+ * unlike what happens with the MonClient's default messenger. This allows
+ * us to sandbox the whole ping, having it much as a separate entity in
+ * the MonClient class, considerably simplifying the handling and dispatching
+ * of messages without needing to consider monc_lock.
+ *
+ * Current drawback is that we will establish a messenger for each ping
+ * we want to issue, instead of keeping a single messenger instance that
+ * would be used for all pings.
+ */
+int MonClient::ping_monitor(const string &mon_id, string *result_reply)
+{
+ ldout(cct, 10) << __func__ << dendl;
+
+ string new_mon_id;
+ if (monmap.contains("noname-"+mon_id)) {
+ new_mon_id = "noname-"+mon_id;
+ } else {
+ new_mon_id = mon_id;
+ }
+
+ if (new_mon_id.empty()) {
+ ldout(cct, 10) << __func__ << " specified mon id is empty!" << dendl;
+ return -EINVAL;
+ } else if (!monmap.contains(new_mon_id)) {
+ ldout(cct, 10) << __func__ << " no such monitor 'mon." << new_mon_id << "'"
+ << dendl;
+ return -ENOENT;
+ }
+
+ // N.B. monc isn't initialized
+
+ auth_registry.refresh_config();
+
+ KeyRing keyring;
+ keyring.from_ceph_context(cct);
+ RotatingKeyRing rkeyring(cct, cct->get_module_type(), &keyring);
+
+ MonClientPinger *pinger = new MonClientPinger(cct,
+ &rkeyring,
+ result_reply);
+
+ Messenger *smsgr = Messenger::create_client_messenger(cct, "temp_ping_client");
+ smsgr->add_dispatcher_head(pinger);
+ smsgr->set_auth_client(pinger);
+ smsgr->start();
+
+ ConnectionRef con = smsgr->connect_to_mon(monmap.get_addrs(new_mon_id));
+ ldout(cct, 10) << __func__ << " ping mon." << new_mon_id
+ << " " << con->get_peer_addr() << dendl;
+
+ pinger->mc.reset(new MonConnection(cct, con, 0, &auth_registry));
+ pinger->mc->start(monmap.get_epoch(), entity_name);
+ con->send_message(new MPing);
+
+ int ret = pinger->wait_for_reply(cct->_conf->mon_client_ping_timeout);
+ if (ret == 0) {
+ ldout(cct,10) << __func__ << " got ping reply" << dendl;
+ } else {
+ ret = -ret;
+ }
+
+ con->mark_down();
+ pinger->mc.reset();
+ smsgr->shutdown();
+ smsgr->wait();
+ delete smsgr;
+ delete pinger;
+ return ret;
+}
+
+bool MonClient::ms_dispatch(Message *m)
+{
+ // we only care about these message types
+ switch (m->get_type()) {
+ case CEPH_MSG_MON_MAP:
+ case CEPH_MSG_AUTH_REPLY:
+ case CEPH_MSG_MON_SUBSCRIBE_ACK:
+ case CEPH_MSG_MON_GET_VERSION_REPLY:
+ case MSG_MON_COMMAND_ACK:
+ case MSG_COMMAND_REPLY:
+ case MSG_LOGACK:
+ case MSG_CONFIG:
+ break;
+ case CEPH_MSG_PING:
+ m->put();
+ return true;
+ default:
+ return false;
+ }
+
+ std::lock_guard lock(monc_lock);
+
+ if (!m->get_connection()->is_anon() &&
+ m->get_source().type() == CEPH_ENTITY_TYPE_MON) {
+ if (_hunting()) {
+ auto p = _find_pending_con(m->get_connection());
+ if (p == pending_cons.end()) {
+ // ignore any messages outside hunting sessions
+ ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
+ m->put();
+ return true;
+ }
+ } else if (!active_con || active_con->get_con() != m->get_connection()) {
+ // ignore any messages outside our session(s)
+ ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
+ m->put();
+ return true;
+ }
+ }
+
+ switch (m->get_type()) {
+ case CEPH_MSG_MON_MAP:
+ handle_monmap(static_cast<MMonMap*>(m));
+ if (passthrough_monmap) {
+ return false;
+ } else {
+ m->put();
+ }
+ break;
+ case CEPH_MSG_AUTH_REPLY:
+ handle_auth(static_cast<MAuthReply*>(m));
+ break;
+ case CEPH_MSG_MON_SUBSCRIBE_ACK:
+ handle_subscribe_ack(static_cast<MMonSubscribeAck*>(m));
+ break;
+ case CEPH_MSG_MON_GET_VERSION_REPLY:
+ handle_get_version_reply(static_cast<MMonGetVersionReply*>(m));
+ break;
+ case MSG_MON_COMMAND_ACK:
+ handle_mon_command_ack(static_cast<MMonCommandAck*>(m));
+ break;
+ case MSG_COMMAND_REPLY:
+ if (m->get_connection()->is_anon() &&
+ m->get_source().type() == CEPH_ENTITY_TYPE_MON) {
+ // this connection is from 'tell'... ignore everything except our command
+ // reply. (we'll get misc other message because we authenticated, but we
+ // don't need them.)
+ handle_command_reply(static_cast<MCommandReply*>(m));
+ return true;
+ }
+ // leave the message for another dispatch handler (e.g., Objecter)
+ return false;
+ case MSG_LOGACK:
+ if (log_client) {
+ log_client->handle_log_ack(static_cast<MLogAck*>(m));
+ m->put();
+ if (more_log_pending) {
+ send_log();
+ }
+ } else {
+ m->put();
+ }
+ break;
+ case MSG_CONFIG:
+ handle_config(static_cast<MConfig*>(m));
+ break;
+ }
+ return true;
+}
+
+void MonClient::send_log(bool flush)
+{
+ if (log_client) {
+ auto lm = log_client->get_mon_log_message(flush);
+ if (lm)
+ _send_mon_message(std::move(lm));
+ more_log_pending = log_client->are_pending();
+ }
+}
+
+void MonClient::flush_log()
+{
+ std::lock_guard l(monc_lock);
+ send_log();
+}
+
+/* Unlike all the other message-handling functions, we don't put away a reference
+* because we want to support MMonMap passthrough to other Dispatchers. */
+void MonClient::handle_monmap(MMonMap *m)
+{
+ ldout(cct, 10) << __func__ << " " << *m << dendl;
+ auto con_addrs = m->get_source_addrs();
+ string old_name = monmap.get_name(con_addrs);
+ const auto old_epoch = monmap.get_epoch();
+
+ auto p = m->monmapbl.cbegin();
+ decode(monmap, p);
+
+ ldout(cct, 10) << " got monmap " << monmap.epoch
+ << " from mon." << old_name
+ << " (according to old e" << monmap.get_epoch() << ")"
+ << dendl;
+ ldout(cct, 10) << "dump:\n";
+ monmap.print(*_dout);
+ *_dout << dendl;
+
+ if (old_epoch != monmap.get_epoch()) {
+ tried.clear();
+ }
+ if (old_name.size() == 0) {
+ ldout(cct,10) << " can't identify which mon we were connected to" << dendl;
+ _reopen_session();
+ } else {
+ auto new_name = monmap.get_name(con_addrs);
+ if (new_name.empty()) {
+ ldout(cct, 10) << "mon." << old_name << " at " << con_addrs
+ << " went away" << dendl;
+ // can't find the mon we were talking to (above)
+ _reopen_session();
+ } else if (messenger->should_use_msgr2() &&
+ monmap.get_addrs(new_name).has_msgr2() &&
+ !con_addrs.has_msgr2()) {
+ ldout(cct,1) << " mon." << new_name << " has (v2) addrs "
+ << monmap.get_addrs(new_name) << " but i'm connected to "
+ << con_addrs << ", reconnecting" << dendl;
+ _reopen_session();
+ }
+ }
+
+ cct->set_mon_addrs(monmap);
+
+ sub.got("monmap", monmap.get_epoch());
+ map_cond.notify_all();
+ want_monmap = false;
+
+ if (authenticate_err == 1) {
+ _finish_auth(0);
+ }
+}
+
+void MonClient::handle_config(MConfig *m)
+{
+ ldout(cct,10) << __func__ << " " << *m << dendl;
+
+ if (want_bootstrap_config) {
+ // get_monmap_and_config is waiting for config which it will apply
+ // synchronously
+ bootstrap_config = ceph::ref_t<MConfig>(m, false);
+ map_cond.notify_all();
+ return;
+ }
+
+ // Take the sledgehammer approach to ensuring we don't depend on
+ // anything in MonClient.
+ boost::asio::post(finish_strand,
+ [m, cct = boost::intrusive_ptr<CephContext>(cct),
+ config_notify_cb = config_notify_cb,
+ config_cb = config_cb]() {
+ cct->_conf.set_mon_vals(cct.get(), m->config, config_cb);
+ if (config_notify_cb) {
+ config_notify_cb();
+ }
+ m->put();
+ });
+}
+
+// ----------------------
+
+int MonClient::init()
+{
+ ldout(cct, 10) << __func__ << dendl;
+
+ entity_name = cct->_conf->name;
+
+ auth_registry.refresh_config();
+
+ std::lock_guard l(monc_lock);
+ keyring.reset(new KeyRing);
+ if (auth_registry.is_supported_method(messenger->get_mytype(),
+ CEPH_AUTH_CEPHX)) {
+ // this should succeed, because auth_registry just checked!
+ int r = keyring->from_ceph_context(cct);
+ if (r != 0) {
+ // but be somewhat graceful in case there was a race condition
+ lderr(cct) << "keyring not found" << dendl;
+ return r;
+ }
+ }
+ if (!auth_registry.any_supported_methods(messenger->get_mytype())) {
+ return -ENOENT;
+ }
+
+ rotating_secrets.reset(
+ new RotatingKeyRing(cct, cct->get_module_type(), keyring.get()));
+
+ initialized = true;
+
+ messenger->set_auth_client(this);
+ messenger->add_dispatcher_head(this);
+
+ timer.init();
+ schedule_tick();
+
+ return 0;
+}
+
+void MonClient::shutdown()
+{
+ ldout(cct, 10) << __func__ << dendl;
+ monc_lock.lock();
+ stopping = true;
+ while (!version_requests.empty()) {
+ ceph::async::post(std::move(version_requests.begin()->second),
+ monc_errc::shutting_down, 0, 0);
+ ldout(cct, 20) << __func__ << " canceling and discarding version request "
+ << version_requests.begin()->first << dendl;
+ version_requests.erase(version_requests.begin());
+ }
+ while (!mon_commands.empty()) {
+ auto tid = mon_commands.begin()->first;
+ _cancel_mon_command(tid);
+ }
+ ldout(cct, 20) << __func__ << " discarding " << waiting_for_session.size()
+ << " pending message(s)" << dendl;
+ waiting_for_session.clear();
+
+ active_con.reset();
+ pending_cons.clear();
+
+ auth.reset();
+ global_id = 0;
+ authenticate_err = 0;
+ authenticated = false;
+
+ monc_lock.unlock();
+
+ if (initialized) {
+ initialized = false;
+ }
+ monc_lock.lock();
+ timer.shutdown();
+ stopping = false;
+ monc_lock.unlock();
+}
+
+int MonClient::authenticate(double timeout)
+{
+ std::unique_lock lock{monc_lock};
+
+ if (active_con) {
+ ldout(cct, 5) << "already authenticated" << dendl;
+ return 0;
+ }
+ sub.want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0);
+ sub.want("config", 0, 0);
+ if (!_opened())
+ _reopen_session();
+
+ auto until = ceph::real_clock::now();
+ until += ceph::make_timespan(timeout);
+ if (timeout > 0.0)
+ ldout(cct, 10) << "authenticate will time out at " << until << dendl;
+ while (!active_con && authenticate_err >= 0) {
+ if (timeout > 0.0) {
+ auto r = auth_cond.wait_until(lock, until);
+ if (r == std::cv_status::timeout && !active_con) {
+ ldout(cct, 0) << "authenticate timed out after " << timeout << dendl;
+ authenticate_err = -ETIMEDOUT;
+ }
+ } else {
+ auth_cond.wait(lock);
+ }
+ }
+
+ if (active_con) {
+ ldout(cct, 5) << __func__ << " success, global_id "
+ << active_con->get_global_id() << dendl;
+ // active_con should not have been set if there was an error
+ ceph_assert(authenticate_err >= 0);
+ authenticated = true;
+ }
+
+ if (authenticate_err < 0 && auth_registry.no_keyring_disabled_cephx()) {
+ lderr(cct) << __func__ << " NOTE: no keyring found; disabled cephx authentication" << dendl;
+ }
+
+ return authenticate_err;
+}
+
+void MonClient::handle_auth(MAuthReply *m)
+{
+ ceph_assert(ceph_mutex_is_locked(monc_lock));
+
+ if (m->get_connection()->is_anon()) {
+ // anon connection, used for mon tell commands
+ for (auto& p : mon_commands) {
+ if (p.second->target_con == m->get_connection()) {
+ auto& mc = p.second->target_session;
+ int ret = mc->handle_auth(m, entity_name,
+ CEPH_ENTITY_TYPE_MON,
+ rotating_secrets.get());
+ (void)ret; // we don't care
+ break;
+ }
+ }
+ m->put();
+ return;
+ }
+
+ if (!_hunting()) {
+ std::swap(active_con->get_auth(), auth);
+ int ret = active_con->authenticate(m);
+ m->put();
+ std::swap(auth, active_con->get_auth());
+ if (global_id != active_con->get_global_id()) {
+ lderr(cct) << __func__ << " peer assigned me a different global_id: "
+ << active_con->get_global_id() << dendl;
+ }
+ if (ret != -EAGAIN) {
+ _finish_auth(ret);
+ }
+ return;
+ }
+
+ // hunting
+ auto found = _find_pending_con(m->get_connection());
+ ceph_assert(found != pending_cons.end());
+ int auth_err = found->second.handle_auth(m, entity_name, want_keys,
+ rotating_secrets.get());
+ m->put();
+ if (auth_err == -EAGAIN) {
+ return;
+ }
+ if (auth_err) {
+ pending_cons.erase(found);
+ if (!pending_cons.empty()) {
+ // keep trying with pending connections
+ return;
+ }
+ // the last try just failed, give up.
+ } else {
+ auto& mc = found->second;
+ ceph_assert(mc.have_session());
+ active_con.reset(new MonConnection(std::move(mc)));
+ pending_cons.clear();
+ }
+
+ _finish_hunting(auth_err);
+ _finish_auth(auth_err);
+}
+
+void MonClient::_finish_auth(int auth_err)
+{
+ ldout(cct,10) << __func__ << " " << auth_err << dendl;
+ authenticate_err = auth_err;
+ // _resend_mon_commands() could _reopen_session() if the connected mon is not
+ // the one the MonCommand is targeting.
+ if (!auth_err && active_con) {
+ ceph_assert(auth);
+ _check_auth_tickets();
+ }
+ auth_cond.notify_all();
+}
+
+// ---------
+
+void MonClient::send_mon_message(MessageRef m)
+{
+ std::lock_guard l{monc_lock};
+ _send_mon_message(std::move(m));
+}
+
+void MonClient::_send_mon_message(MessageRef m)
+{
+ ceph_assert(ceph_mutex_is_locked(monc_lock));
+ if (active_con) {
+ auto cur_con = active_con->get_con();
+ ldout(cct, 10) << "_send_mon_message to mon."
+ << monmap.get_name(cur_con->get_peer_addr())
+ << " at " << cur_con->get_peer_addr() << dendl;
+ cur_con->send_message2(std::move(m));
+ } else {
+ waiting_for_session.push_back(std::move(m));
+ }
+}
+
+void MonClient::_reopen_session(int rank)
+{
+ ceph_assert(ceph_mutex_is_locked(monc_lock));
+ ldout(cct, 10) << __func__ << " rank " << rank << dendl;
+
+ active_con.reset();
+ pending_cons.clear();
+
+ authenticate_err = 1; // == in progress
+
+ _start_hunting();
+
+ if (rank >= 0) {
+ _add_conn(rank);
+ } else {
+ _add_conns();
+ }
+
+ // throw out old queued messages
+ waiting_for_session.clear();
+
+ // throw out version check requests
+ while (!version_requests.empty()) {
+ ceph::async::post(std::move(version_requests.begin()->second),
+ monc_errc::session_reset, 0, 0);
+ version_requests.erase(version_requests.begin());
+ }
+
+ for (auto& c : pending_cons) {
+ c.second.start(monmap.get_epoch(), entity_name);
+ }
+
+ if (sub.reload()) {
+ _renew_subs();
+ }
+}
+
+void MonClient::_add_conn(unsigned rank)
+{
+ auto peer = monmap.get_addrs(rank);
+ auto conn = messenger->connect_to_mon(peer);
+ MonConnection mc(cct, conn, global_id, &auth_registry);
+ if (auth) {
+ mc.get_auth().reset(auth->clone());
+ }
+ pending_cons.insert(std::make_pair(peer, std::move(mc)));
+ ldout(cct, 10) << "picked mon." << monmap.get_name(rank)
+ << " con " << conn
+ << " addr " << peer
+ << dendl;
+}
+
+void MonClient::_add_conns()
+{
+ // collect the next batch of candidates who are listed right next to the ones
+ // already tried
+ auto get_next_batch = [this]() -> std::vector<unsigned> {
+ std::multimap<uint16_t, unsigned> ranks_by_priority;
+ boost::copy(
+ monmap.mon_info | boost::adaptors::filtered(
+ [this](auto& info) {
+ auto rank = monmap.get_rank(info.first);
+ return tried.count(rank) == 0;
+ }) | boost::adaptors::transformed(
+ [this](auto& info) {
+ auto rank = monmap.get_rank(info.first);
+ return std::make_pair(info.second.priority, rank);
+ }), std::inserter(ranks_by_priority, end(ranks_by_priority)));
+ if (ranks_by_priority.empty()) {
+ return {};
+ }
+ // only choose the monitors with lowest priority
+ auto cands = boost::make_iterator_range(
+ ranks_by_priority.equal_range(ranks_by_priority.begin()->first));
+ std::vector<unsigned> ranks;
+ boost::range::copy(cands | boost::adaptors::map_values,
+ std::back_inserter(ranks));
+ return ranks;
+ };
+ auto ranks = get_next_batch();
+ if (ranks.empty()) {
+ tried.clear(); // start over
+ ranks = get_next_batch();
+ }
+ ceph_assert(!ranks.empty());
+ if (ranks.size() > 1) {
+ std::vector<uint16_t> weights;
+ for (auto i : ranks) {
+ auto rank_name = monmap.get_name(i);
+ weights.push_back(monmap.get_weight(rank_name));
+ }
+ random_device_t rd;
+ if (std::accumulate(begin(weights), end(weights), 0u) == 0) {
+ std::shuffle(begin(ranks), end(ranks), std::mt19937{rd()});
+ } else {
+ weighted_shuffle(begin(ranks), end(ranks), begin(weights), end(weights),
+ std::mt19937{rd()});
+ }
+ }
+ ldout(cct, 10) << __func__ << " ranks=" << ranks << dendl;
+ unsigned n = cct->_conf->mon_client_hunt_parallel;
+ if (n == 0 || n > ranks.size()) {
+ n = ranks.size();
+ }
+ for (unsigned i = 0; i < n; i++) {
+ _add_conn(ranks[i]);
+ tried.insert(ranks[i]);
+ }
+}
+
+bool MonClient::ms_handle_reset(Connection *con)
+{
+ std::lock_guard lock(monc_lock);
+
+ if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON)
+ return false;
+
+ if (con->is_anon()) {
+ auto p = mon_commands.begin();
+ while (p != mon_commands.end()) {
+ auto cmd = p->second;
+ ++p;
+ if (cmd->target_con == con) {
+ _send_command(cmd); // may retry or fail
+ break;
+ }
+ }
+ return true;
+ }
+
+ if (_hunting()) {
+ if (pending_cons.count(con->get_peer_addrs())) {
+ ldout(cct, 10) << __func__ << " hunted mon " << con->get_peer_addrs()
+ << dendl;
+ } else {
+ ldout(cct, 10) << __func__ << " stray mon " << con->get_peer_addrs()
+ << dendl;
+ }
+ return true;
+ } else {
+ if (active_con && con == active_con->get_con()) {
+ ldout(cct, 10) << __func__ << " current mon " << con->get_peer_addrs()
+ << dendl;
+ _reopen_session();
+ return false;
+ } else {
+ ldout(cct, 10) << "ms_handle_reset stray mon " << con->get_peer_addrs()
+ << dendl;
+ return true;
+ }
+ }
+}
+
+bool MonClient::_opened() const
+{
+ ceph_assert(ceph_mutex_is_locked(monc_lock));
+ return active_con || _hunting();
+}
+
+bool MonClient::_hunting() const
+{
+ return !pending_cons.empty();
+}
+
+void MonClient::_start_hunting()
+{
+ ceph_assert(!_hunting());
+ // adjust timeouts if necessary
+ if (!had_a_connection)
+ return;
+ reopen_interval_multiplier *= cct->_conf->mon_client_hunt_interval_backoff;
+ if (reopen_interval_multiplier >
+ cct->_conf->mon_client_hunt_interval_max_multiple) {
+ reopen_interval_multiplier =
+ cct->_conf->mon_client_hunt_interval_max_multiple;
+ }
+}
+
+void MonClient::_finish_hunting(int auth_err)
+{
+ ldout(cct,10) << __func__ << " " << auth_err << dendl;
+ ceph_assert(ceph_mutex_is_locked(monc_lock));
+ // the pending conns have been cleaned.
+ ceph_assert(!_hunting());
+ if (active_con) {
+ auto con = active_con->get_con();
+ ldout(cct, 1) << "found mon."
+ << monmap.get_name(con->get_peer_addr())
+ << dendl;
+ } else {
+ ldout(cct, 1) << "no mon sessions established" << dendl;
+ }
+
+ had_a_connection = true;
+ _un_backoff();
+
+ if (!auth_err) {
+ last_rotating_renew_sent = utime_t();
+ while (!waiting_for_session.empty()) {
+ _send_mon_message(std::move(waiting_for_session.front()));
+ waiting_for_session.pop_front();
+ }
+ _resend_mon_commands();
+ send_log(true);
+ if (active_con) {
+ auth = std::move(active_con->get_auth());
+ if (global_id && global_id != active_con->get_global_id()) {
+ lderr(cct) << __func__ << " global_id changed from " << global_id
+ << " to " << active_con->get_global_id() << dendl;
+ }
+ global_id = active_con->get_global_id();
+ }
+ }
+}
+
+void MonClient::tick()
+{
+ ldout(cct, 10) << __func__ << dendl;
+
+ utime_t now = ceph_clock_now();
+
+ auto reschedule_tick = make_scope_guard([this] {
+ schedule_tick();
+ });
+
+ _check_auth_tickets();
+ _check_tell_commands();
+
+ if (_hunting()) {
+ ldout(cct, 1) << "continuing hunt" << dendl;
+ return _reopen_session();
+ } else if (active_con) {
+ // just renew as needed
+ auto cur_con = active_con->get_con();
+ if (!cur_con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB)) {
+ const bool maybe_renew = sub.need_renew();
+ ldout(cct, 10) << "renew subs? -- " << (maybe_renew ? "yes" : "no")
+ << dendl;
+ if (maybe_renew) {
+ _renew_subs();
+ }
+ }
+
+ if (now > last_keepalive + cct->_conf->mon_client_ping_interval) {
+ cur_con->send_keepalive();
+ last_keepalive = now;
+
+ if (cct->_conf->mon_client_ping_timeout > 0 &&
+ cur_con->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
+ utime_t lk = cur_con->get_last_keepalive_ack();
+ utime_t interval = now - lk;
+ if (interval > cct->_conf->mon_client_ping_timeout) {
+ ldout(cct, 1) << "no keepalive since " << lk << " (" << interval
+ << " seconds), reconnecting" << dendl;
+ return _reopen_session();
+ }
+ }
+
+ _un_backoff();
+ }
+
+ if (now > last_send_log + cct->_conf->mon_client_log_interval) {
+ send_log();
+ last_send_log = now;
+ }
+ }
+}
+
+void MonClient::_un_backoff()
+{
+ // un-backoff our reconnect interval
+ reopen_interval_multiplier = std::max(
+ cct->_conf.get_val<double>("mon_client_hunt_interval_min_multiple"),
+ reopen_interval_multiplier /
+ cct->_conf.get_val<double>("mon_client_hunt_interval_backoff"));
+ ldout(cct, 20) << __func__ << " reopen_interval_multipler now "
+ << reopen_interval_multiplier << dendl;
+}
+
+void MonClient::schedule_tick()
+{
+ auto do_tick = make_lambda_context([this](int) { tick(); });
+ if (!is_connected()) {
+ // start another round of hunting
+ const auto hunt_interval = (cct->_conf->mon_client_hunt_interval *
+ reopen_interval_multiplier);
+ timer.add_event_after(hunt_interval, do_tick);
+ } else {
+ // keep in touch
+ timer.add_event_after(std::min(cct->_conf->mon_client_ping_interval,
+ cct->_conf->mon_client_log_interval),
+ do_tick);
+ }
+}
+
+// ---------
+
+void MonClient::_renew_subs()
+{
+ ceph_assert(ceph_mutex_is_locked(monc_lock));
+ if (!sub.have_new()) {
+ ldout(cct, 10) << __func__ << " - empty" << dendl;
+ return;
+ }
+
+ ldout(cct, 10) << __func__ << dendl;
+ if (!_opened())
+ _reopen_session();
+ else {
+ auto m = ceph::make_message<MMonSubscribe>();
+ m->what = sub.get_subs();
+ m->hostname = ceph_get_short_hostname();
+ _send_mon_message(std::move(m));
+ sub.renewed();
+ }
+}
+
+void MonClient::handle_subscribe_ack(MMonSubscribeAck *m)
+{
+ sub.acked(m->interval);
+ m->put();
+}
+
+int MonClient::_check_auth_tickets()
+{
+ ceph_assert(ceph_mutex_is_locked(monc_lock));
+ if (active_con && auth) {
+ if (auth->need_tickets()) {
+ ldout(cct, 10) << __func__ << " getting new tickets!" << dendl;
+ auto m = ceph::make_message<MAuth>();
+ m->protocol = auth->get_protocol();
+ auth->prepare_build_request();
+ auth->build_request(m->auth_payload);
+ _send_mon_message(m);
+ }
+
+ _check_auth_rotating();
+ }
+ return 0;
+}
+
+int MonClient::_check_auth_rotating()
+{
+ ceph_assert(ceph_mutex_is_locked(monc_lock));
+ if (!rotating_secrets ||
+ !auth_principal_needs_rotating_keys(entity_name)) {
+ ldout(cct, 20) << "_check_auth_rotating not needed by " << entity_name << dendl;
+ return 0;
+ }
+
+ if (!active_con || !auth) {
+ ldout(cct, 10) << "_check_auth_rotating waiting for auth session" << dendl;
+ return 0;
+ }
+
+ utime_t now = ceph_clock_now();
+ utime_t cutoff = now;
+ cutoff -= std::min(30.0, cct->_conf->auth_service_ticket_ttl / 4.0);
+ utime_t issued_at_lower_bound = now;
+ issued_at_lower_bound -= cct->_conf->auth_service_ticket_ttl;
+ if (!rotating_secrets->need_new_secrets(cutoff)) {
+ ldout(cct, 10) << "_check_auth_rotating have uptodate secrets (they expire after " << cutoff << ")" << dendl;
+ rotating_secrets->dump_rotating();
+ return 0;
+ }
+
+ ldout(cct, 10) << "_check_auth_rotating renewing rotating keys (they expired before " << cutoff << ")" << dendl;
+ if (!rotating_secrets->need_new_secrets() &&
+ rotating_secrets->need_new_secrets(issued_at_lower_bound)) {
+ // the key has expired before it has been issued?
+ lderr(cct) << __func__ << " possible clock skew, rotating keys expired way too early"
+ << " (before " << issued_at_lower_bound << ")" << dendl;
+ }
+ if ((now > last_rotating_renew_sent) &&
+ double(now - last_rotating_renew_sent) < 1) {
+ ldout(cct, 10) << __func__ << " called too often (last: "
+ << last_rotating_renew_sent << "), skipping refresh" << dendl;
+ return 0;
+ }
+ auto m = ceph::make_message<MAuth>();
+ m->protocol = auth->get_protocol();
+ if (auth->build_rotating_request(m->auth_payload)) {
+ last_rotating_renew_sent = now;
+ _send_mon_message(std::move(m));
+ }
+ return 0;
+}
+
+int MonClient::wait_auth_rotating(double timeout)
+{
+ std::unique_lock l(monc_lock);
+
+ // Must be initialized
+ ceph_assert(auth != nullptr);
+
+ if (auth->get_protocol() == CEPH_AUTH_NONE)
+ return 0;
+
+ if (!rotating_secrets)
+ return 0;
+
+ ldout(cct, 10) << __func__ << " waiting for " << timeout << dendl;
+ utime_t cutoff = ceph_clock_now();
+ cutoff -= std::min(30.0, cct->_conf->auth_service_ticket_ttl / 4.0);
+ if (auth_cond.wait_for(l, ceph::make_timespan(timeout), [this, cutoff] {
+ return (!auth_principal_needs_rotating_keys(entity_name) ||
+ !rotating_secrets->need_new_secrets(cutoff));
+ })) {
+ ldout(cct, 10) << __func__ << " done" << dendl;
+ return 0;
+ } else {
+ ldout(cct, 0) << __func__ << " timed out after " << timeout << dendl;
+ return -ETIMEDOUT;
+ }
+}
+
+// ---------
+
+void MonClient::_send_command(MonCommand *r)
+{
+ if (r->is_tell()) {
+ ++r->send_attempts;
+ if (r->send_attempts > cct->_conf->mon_client_directed_command_retry) {
+ _finish_command(r, monc_errc::mon_unavailable, "mon unavailable", {});
+ return;
+ }
+ // tell-style command
+ if (monmap.min_mon_release >= ceph_release_t::octopus) {
+ if (r->target_con) {
+ r->target_con->mark_down();
+ }
+ if (r->target_rank >= 0) {
+ if (r->target_rank >= (int)monmap.size()) {
+ ldout(cct, 10) << " target " << r->target_rank
+ << " >= max mon " << monmap.size() << dendl;
+ _finish_command(r, monc_errc::rank_dne, "mon rank dne"sv, {});
+ return;
+ }
+ r->target_con = messenger->connect_to_mon(
+ monmap.get_addrs(r->target_rank), true /* anon */);
+ } else {
+ if (!monmap.contains(r->target_name)) {
+ ldout(cct, 10) << " target " << r->target_name
+ << " not present in monmap" << dendl;
+ _finish_command(r, monc_errc::mon_dne, "mon dne"sv, {});
+ return;
+ }
+ r->target_con = messenger->connect_to_mon(
+ monmap.get_addrs(r->target_name), true /* anon */);
+ }
+
+ r->target_session.reset(new MonConnection(cct, r->target_con, 0,
+ &auth_registry));
+ r->target_session->start(monmap.get_epoch(), entity_name);
+ r->last_send_attempt = ceph_clock_now();
+
+ MCommand *m = new MCommand(monmap.fsid);
+ m->set_tid(r->tid);
+ m->cmd = r->cmd;
+ m->set_data(r->inbl);
+ r->target_session->queue_command(m);
+ return;
+ }
+
+ // ugly legacy handling of pre-octopus mons
+ entity_addr_t peer;
+ if (active_con) {
+ peer = active_con->get_con()->get_peer_addr();
+ }
+
+ if (r->target_rank >= 0 &&
+ r->target_rank != monmap.get_rank(peer)) {
+ ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
+ << " wants rank " << r->target_rank
+ << ", reopening session"
+ << dendl;
+ if (r->target_rank >= (int)monmap.size()) {
+ ldout(cct, 10) << " target " << r->target_rank
+ << " >= max mon " << monmap.size() << dendl;
+ _finish_command(r, monc_errc::rank_dne, "mon rank dne"sv, {});
+ return;
+ }
+ _reopen_session(r->target_rank);
+ return;
+ }
+ if (r->target_name.length() &&
+ r->target_name != monmap.get_name(peer)) {
+ ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
+ << " wants mon " << r->target_name
+ << ", reopening session"
+ << dendl;
+ if (!monmap.contains(r->target_name)) {
+ ldout(cct, 10) << " target " << r->target_name
+ << " not present in monmap" << dendl;
+ _finish_command(r, monc_errc::mon_dne, "mon dne"sv, {});
+ return;
+ }
+ _reopen_session(monmap.get_rank(r->target_name));
+ return;
+ }
+ // fall-thru to send 'normal' CLI command
+ }
+
+ // normal CLI command
+ ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
+ auto m = ceph::make_message<MMonCommand>(monmap.fsid);
+ m->set_tid(r->tid);
+ m->cmd = r->cmd;
+ m->set_data(r->inbl);
+ _send_mon_message(std::move(m));
+ return;
+}
+
+void MonClient::_check_tell_commands()
+{
+ // resend any requests
+ auto now = ceph_clock_now();
+ auto p = mon_commands.begin();
+ while (p != mon_commands.end()) {
+ auto cmd = p->second;
+ ++p;
+ if (cmd->is_tell() &&
+ cmd->last_send_attempt != utime_t() &&
+ now - cmd->last_send_attempt > cct->_conf->mon_client_hunt_interval) {
+ ldout(cct,5) << __func__ << " timeout tell command " << cmd->tid << dendl;
+ _send_command(cmd); // might remove cmd from mon_commands
+ }
+ }
+}
+
+void MonClient::_resend_mon_commands()
+{
+ // resend any requests
+ auto p = mon_commands.begin();
+ while (p != mon_commands.end()) {
+ auto cmd = p->second;
+ ++p;
+ if (cmd->is_tell() && monmap.min_mon_release >= ceph_release_t::octopus) {
+ // starting with octopus, tell commands use their own connetion and need no
+ // special resend when we finish hunting.
+ } else {
+ _send_command(cmd); // might remove cmd from mon_commands
+ }
+ }
+}
+
+void MonClient::handle_mon_command_ack(MMonCommandAck *ack)
+{
+ MonCommand *r = NULL;
+ uint64_t tid = ack->get_tid();
+
+ if (tid == 0 && !mon_commands.empty()) {
+ r = mon_commands.begin()->second;
+ ldout(cct, 10) << __func__ << " has tid 0, assuming it is " << r->tid << dendl;
+ } else {
+ auto p = mon_commands.find(tid);
+ if (p == mon_commands.end()) {
+ ldout(cct, 10) << __func__ << " " << ack->get_tid() << " not found" << dendl;
+ ack->put();
+ return;
+ }
+ r = p->second;
+ }
+
+ ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
+ auto ec = ack->r < 0 ? bs::error_code(-ack->r, mon_category())
+ : bs::error_code();
+ _finish_command(r, ec, ack->rs,
+ std::move(ack->get_data()));
+ ack->put();
+}
+
+void MonClient::handle_command_reply(MCommandReply *reply)
+{
+ MonCommand *r = NULL;
+ uint64_t tid = reply->get_tid();
+
+ if (tid == 0 && !mon_commands.empty()) {
+ r = mon_commands.begin()->second;
+ ldout(cct, 10) << __func__ << " has tid 0, assuming it is " << r->tid
+ << dendl;
+ } else {
+ auto p = mon_commands.find(tid);
+ if (p == mon_commands.end()) {
+ ldout(cct, 10) << __func__ << " " << reply->get_tid() << " not found"
+ << dendl;
+ reply->put();
+ return;
+ }
+ r = p->second;
+ }
+
+ ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
+ auto ec = reply->r < 0 ? bs::error_code(-reply->r, mon_category())
+ : bs::error_code();
+ _finish_command(r, ec, reply->rs, std::move(reply->get_data()));
+ reply->put();
+}
+
+int MonClient::_cancel_mon_command(uint64_t tid)
+{
+ ceph_assert(ceph_mutex_is_locked(monc_lock));
+
+ auto it = mon_commands.find(tid);
+ if (it == mon_commands.end()) {
+ ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
+ return -ENOENT;
+ }
+
+ ldout(cct, 10) << __func__ << " tid " << tid << dendl;
+
+ MonCommand *cmd = it->second;
+ _finish_command(cmd, monc_errc::timed_out, "timed out"sv, {});
+ return 0;
+}
+
+void MonClient::_finish_command(MonCommand *r, bs::error_code ret,
+ std::string_view rs, ceph::buffer::list&& bl)
+{
+ ldout(cct, 10) << __func__ << " " << r->tid << " = " << ret << " " << rs
+ << dendl;
+ ceph::async::post(std::move(r->onfinish), ret, std::string(rs),
+ std::move(bl));
+ if (r->target_con) {
+ r->target_con->mark_down();
+ }
+ mon_commands.erase(r->tid);
+ delete r;
+}
+
+// ---------
+
+void MonClient::handle_get_version_reply(MMonGetVersionReply* m)
+{
+ ceph_assert(ceph_mutex_is_locked(monc_lock));
+ auto iter = version_requests.find(m->handle);
+ if (iter == version_requests.end()) {
+ ldout(cct, 0) << __func__ << " version request with handle " << m->handle
+ << " not found" << dendl;
+ } else {
+ auto req = std::move(iter->second);
+ ldout(cct, 10) << __func__ << " finishing " << iter->first << " version "
+ << m->version << dendl;
+ version_requests.erase(iter);
+ ceph::async::post(std::move(req), bs::error_code(),
+ m->version, m->oldest_version);
+ }
+ m->put();
+}
+
+int MonClient::get_auth_request(
+ Connection *con,
+ AuthConnectionMeta *auth_meta,
+ uint32_t *auth_method,
+ std::vector<uint32_t> *preferred_modes,
+ ceph::buffer::list *bl)
+{
+ std::lock_guard l(monc_lock);
+ ldout(cct,10) << __func__ << " con " << con << " auth_method " << *auth_method
+ << dendl;
+
+ // connection to mon?
+ if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
+ ceph_assert(!auth_meta->authorizer);
+ if (con->is_anon()) {
+ for (auto& i : mon_commands) {
+ if (i.second->target_con == con) {
+ return i.second->target_session->get_auth_request(
+ auth_method, preferred_modes, bl,
+ entity_name, want_keys, rotating_secrets.get());
+ }
+ }
+ }
+ for (auto& i : pending_cons) {
+ if (i.second.is_con(con)) {
+ return i.second.get_auth_request(
+ auth_method, preferred_modes, bl,
+ entity_name, want_keys, rotating_secrets.get());
+ }
+ }
+ return -ENOENT;
+ }
+
+ // generate authorizer
+ if (!auth) {
+ lderr(cct) << __func__ << " but no auth handler is set up" << dendl;
+ return -EACCES;
+ }
+ auth_meta->authorizer.reset(auth->build_authorizer(con->get_peer_type()));
+ if (!auth_meta->authorizer) {
+ lderr(cct) << __func__ << " failed to build_authorizer for type "
+ << ceph_entity_type_name(con->get_peer_type()) << dendl;
+ return -EACCES;
+ }
+ auth_meta->auth_method = auth_meta->authorizer->protocol;
+ auth_registry.get_supported_modes(con->get_peer_type(),
+ auth_meta->auth_method,
+ preferred_modes);
+ *bl = auth_meta->authorizer->bl;
+ return 0;
+}
+
+int MonClient::handle_auth_reply_more(
+ Connection *con,
+ AuthConnectionMeta *auth_meta,
+ const ceph::buffer::list& bl,
+ ceph::buffer::list *reply)
+{
+ std::lock_guard l(monc_lock);
+
+ if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
+ if (con->is_anon()) {
+ for (auto& i : mon_commands) {
+ if (i.second->target_con == con) {
+ return i.second->target_session->handle_auth_reply_more(
+ auth_meta, bl, reply);
+ }
+ }
+ }
+ for (auto& i : pending_cons) {
+ if (i.second.is_con(con)) {
+ return i.second.handle_auth_reply_more(auth_meta, bl, reply);
+ }
+ }
+ return -ENOENT;
+ }
+
+ // authorizer challenges
+ if (!auth || !auth_meta->authorizer) {
+ lderr(cct) << __func__ << " no authorizer?" << dendl;
+ return -1;
+ }
+ auth_meta->authorizer->add_challenge(cct, bl);
+ *reply = auth_meta->authorizer->bl;
+ return 0;
+}
+
+int MonClient::handle_auth_done(
+ Connection *con,
+ AuthConnectionMeta *auth_meta,
+ uint64_t global_id,
+ uint32_t con_mode,
+ const ceph::buffer::list& bl,
+ CryptoKey *session_key,
+ std::string *connection_secret)
+{
+ if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
+ std::lock_guard l(monc_lock);
+ if (con->is_anon()) {
+ for (auto& i : mon_commands) {
+ if (i.second->target_con == con) {
+ return i.second->target_session->handle_auth_done(
+ auth_meta, global_id, bl,
+ session_key, connection_secret);
+ }
+ }
+ }
+ for (auto& i : pending_cons) {
+ if (i.second.is_con(con)) {
+ int r = i.second.handle_auth_done(
+ auth_meta, global_id, bl,
+ session_key, connection_secret);
+ if (r) {
+ pending_cons.erase(i.first);
+ if (!pending_cons.empty()) {
+ return r;
+ }
+ } else {
+ active_con.reset(new MonConnection(std::move(i.second)));
+ pending_cons.clear();
+ ceph_assert(active_con->have_session());
+ }
+
+ _finish_hunting(r);
+ if (r || monmap.get_epoch() > 0) {
+ _finish_auth(r);
+ }
+ return r;
+ }
+ }
+ return -ENOENT;
+ } else {
+ // verify authorizer reply
+ auto p = bl.begin();
+ if (!auth_meta->authorizer->verify_reply(p, &auth_meta->connection_secret)) {
+ ldout(cct, 0) << __func__ << " failed verifying authorizer reply"
+ << dendl;
+ return -EACCES;
+ }
+ auth_meta->session_key = auth_meta->authorizer->session_key;
+ return 0;
+ }
+}
+
+int MonClient::handle_auth_bad_method(
+ Connection *con,
+ AuthConnectionMeta *auth_meta,
+ uint32_t old_auth_method,
+ int result,
+ const std::vector<uint32_t>& allowed_methods,
+ const std::vector<uint32_t>& allowed_modes)
+{
+ auth_meta->allowed_methods = allowed_methods;
+
+ std::lock_guard l(monc_lock);
+ if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
+ if (con->is_anon()) {
+ for (auto& i : mon_commands) {
+ if (i.second->target_con == con) {
+ int r = i.second->target_session->handle_auth_bad_method(
+ old_auth_method,
+ result,
+ allowed_methods,
+ allowed_modes);
+ if (r < 0) {
+ auto ec = bs::error_code(-r, mon_category());
+ _finish_command(i.second, ec, "auth failed"sv, {});
+ }
+ return r;
+ }
+ }
+ }
+ for (auto& i : pending_cons) {
+ if (i.second.is_con(con)) {
+ int r = i.second.handle_auth_bad_method(old_auth_method,
+ result,
+ allowed_methods,
+ allowed_modes);
+ if (r == 0) {
+ return r; // try another method on this con
+ }
+ pending_cons.erase(i.first);
+ if (!pending_cons.empty()) {
+ return r; // fail this con, maybe another con will succeed
+ }
+ // fail hunt
+ _finish_hunting(r);
+ _finish_auth(r);
+ return r;
+ }
+ }
+ return -ENOENT;
+ } else {
+ // huh...
+ ldout(cct,10) << __func__ << " hmm, they didn't like " << old_auth_method
+ << " result " << cpp_strerror(result)
+ << " and auth is " << (auth ? auth->get_protocol() : 0)
+ << dendl;
+ return -EACCES;
+ }
+}
+
+int MonClient::handle_auth_request(
+ Connection *con,
+ AuthConnectionMeta *auth_meta,
+ bool more,
+ uint32_t auth_method,
+ const ceph::buffer::list& payload,
+ ceph::buffer::list *reply)
+{
+ if (payload.length() == 0) {
+ // for some channels prior to nautilus (osd heartbeat), we
+ // tolerate the lack of an authorizer.
+ if (!con->get_messenger()->require_authorizer) {
+ handle_authentication_dispatcher->ms_handle_authentication(con);
+ return 1;
+ }
+ return -EACCES;
+ }
+ auth_meta->auth_mode = payload[0];
+ if (auth_meta->auth_mode < AUTH_MODE_AUTHORIZER ||
+ auth_meta->auth_mode > AUTH_MODE_AUTHORIZER_MAX) {
+ return -EACCES;
+ }
+ AuthAuthorizeHandler *ah = get_auth_authorize_handler(con->get_peer_type(),
+ auth_method);
+ if (!ah) {
+ lderr(cct) << __func__ << " no AuthAuthorizeHandler found for auth method "
+ << auth_method << dendl;
+ return -EOPNOTSUPP;
+ }
+
+ auto ac = &auth_meta->authorizer_challenge;
+ if (auth_meta->skip_authorizer_challenge) {
+ ldout(cct, 10) << __func__ << " skipping challenge on " << con << dendl;
+ ac = nullptr;
+ }
+
+ bool was_challenge = (bool)auth_meta->authorizer_challenge;
+ bool isvalid = ah->verify_authorizer(
+ cct,
+ *rotating_secrets,
+ payload,
+ auth_meta->get_connection_secret_length(),
+ reply,
+ &con->peer_name,
+ &con->peer_global_id,
+ &con->peer_caps_info,
+ &auth_meta->session_key,
+ &auth_meta->connection_secret,
+ ac);
+ if (isvalid) {
+ handle_authentication_dispatcher->ms_handle_authentication(con);
+ return 1;
+ }
+ if (!more && !was_challenge && auth_meta->authorizer_challenge) {
+ ldout(cct,10) << __func__ << " added challenge on " << con << dendl;
+ return 0;
+ }
+ ldout(cct,10) << __func__ << " bad authorizer on " << con << dendl;
+ // discard old challenge
+ auth_meta->authorizer_challenge.reset();
+ return -EACCES;
+}
+
+AuthAuthorizer* MonClient::build_authorizer(int service_id) const {
+ std::lock_guard l(monc_lock);
+ if (auth) {
+ return auth->build_authorizer(service_id);
+ } else {
+ ldout(cct, 0) << __func__ << " for " << ceph_entity_type_name(service_id)
+ << ", but no auth is available now" << dendl;
+ return nullptr;
+ }
+}
+
+#define dout_subsys ceph_subsys_monc
+#undef dout_prefix
+#define dout_prefix *_dout << "monclient" << (have_session() ? ": " : "(hunting): ")
+
+MonConnection::MonConnection(
+ CephContext *cct, ConnectionRef con, uint64_t global_id,
+ AuthRegistry *ar)
+ : cct(cct), con(con), global_id(global_id), auth_registry(ar)
+{}
+
+MonConnection::~MonConnection()
+{
+ if (con) {
+ con->mark_down();
+ con.reset();
+ }
+}
+
+bool MonConnection::have_session() const
+{
+ return state == State::HAVE_SESSION;
+}
+
+void MonConnection::start(epoch_t epoch,
+ const EntityName& entity_name)
+{
+ using ceph::encode;
+ auth_start = ceph_clock_now();
+
+ if (con->get_peer_addr().is_msgr2()) {
+ ldout(cct, 10) << __func__ << " opening mon connection" << dendl;
+ state = State::AUTHENTICATING;
+ con->send_message(new MMonGetMap());
+ return;
+ }
+
+ // restart authentication handshake
+ state = State::NEGOTIATING;
+
+ // send an initial keepalive to ensure our timestamp is valid by the
+ // time we are in an OPENED state (by sequencing this before
+ // authentication).
+ con->send_keepalive();
+
+ auto m = new MAuth;
+ m->protocol = CEPH_AUTH_UNKNOWN;
+ m->monmap_epoch = epoch;
+ __u8 struct_v = 1;
+ encode(struct_v, m->auth_payload);
+ std::vector<uint32_t> auth_supported;
+ auth_registry->get_supported_methods(con->get_peer_type(), &auth_supported);
+ encode(auth_supported, m->auth_payload);
+ encode(entity_name, m->auth_payload);
+ encode(global_id, m->auth_payload);
+ con->send_message(m);
+}
+
+int MonConnection::get_auth_request(
+ uint32_t *method,
+ std::vector<uint32_t> *preferred_modes,
+ ceph::buffer::list *bl,
+ const EntityName& entity_name,
+ uint32_t want_keys,
+ RotatingKeyRing* keyring)
+{
+ using ceph::encode;
+ // choose method
+ if (auth_method < 0) {
+ std::vector<uint32_t> as;
+ auth_registry->get_supported_methods(con->get_peer_type(), &as);
+ if (as.empty()) {
+ return -EACCES;
+ }
+ auth_method = as.front();
+ }
+ *method = auth_method;
+ auth_registry->get_supported_modes(con->get_peer_type(), auth_method,
+ preferred_modes);
+ ldout(cct,10) << __func__ << " method " << *method
+ << " preferred_modes " << *preferred_modes << dendl;
+ if (preferred_modes->empty()) {
+ return -EACCES;
+ }
+
+ int r = _init_auth(*method, entity_name, want_keys, keyring, true);
+ ceph_assert(r == 0);
+
+ // initial requset includes some boilerplate...
+ encode((char)AUTH_MODE_MON, *bl);
+ encode(entity_name, *bl);
+ encode(global_id, *bl);
+
+ // and (maybe) some method-specific initial payload
+ auth->build_initial_request(bl);
+
+ return 0;
+}
+
+int MonConnection::handle_auth_reply_more(
+ AuthConnectionMeta *auth_meta,
+ const ceph::buffer::list& bl,
+ ceph::buffer::list *reply)
+{
+ ldout(cct, 10) << __func__ << " payload " << bl.length() << dendl;
+ ldout(cct, 30) << __func__ << " got\n";
+ bl.hexdump(*_dout);
+ *_dout << dendl;
+
+ auto p = bl.cbegin();
+ ldout(cct, 10) << __func__ << " payload_len " << bl.length() << dendl;
+ int r = auth->handle_response(0, p, &auth_meta->session_key,
+ &auth_meta->connection_secret);
+ if (r == -EAGAIN) {
+ auth->prepare_build_request();
+ auth->build_request(*reply);
+ ldout(cct, 10) << __func__ << " responding with " << reply->length()
+ << " bytes" << dendl;
+ r = 0;
+ } else if (r < 0) {
+ lderr(cct) << __func__ << " handle_response returned " << r << dendl;
+ } else {
+ ldout(cct, 10) << __func__ << " authenticated!" << dendl;
+ // FIXME
+ ceph_abort(cct, "write me");
+ }
+ return r;
+}
+
+int MonConnection::handle_auth_done(
+ AuthConnectionMeta *auth_meta,
+ uint64_t new_global_id,
+ const ceph::buffer::list& bl,
+ CryptoKey *session_key,
+ std::string *connection_secret)
+{
+ ldout(cct,10) << __func__ << " global_id " << new_global_id
+ << " payload " << bl.length()
+ << dendl;
+ global_id = new_global_id;
+ auth->set_global_id(global_id);
+ auto p = bl.begin();
+ int auth_err = auth->handle_response(0, p, &auth_meta->session_key,
+ &auth_meta->connection_secret);
+ if (auth_err >= 0) {
+ state = State::HAVE_SESSION;
+ }
+ con->set_last_keepalive_ack(auth_start);
+
+ if (pending_tell_command) {
+ con->send_message2(std::move(pending_tell_command));
+ }
+ return auth_err;
+}
+
+int MonConnection::handle_auth_bad_method(
+ uint32_t old_auth_method,
+ int result,
+ const std::vector<uint32_t>& allowed_methods,
+ const std::vector<uint32_t>& allowed_modes)
+{
+ ldout(cct,10) << __func__ << " old_auth_method " << old_auth_method
+ << " result " << cpp_strerror(result)
+ << " allowed_methods " << allowed_methods << dendl;
+ std::vector<uint32_t> auth_supported;
+ auth_registry->get_supported_methods(con->get_peer_type(), &auth_supported);
+ auto p = std::find(auth_supported.begin(), auth_supported.end(),
+ old_auth_method);
+ assert(p != auth_supported.end());
+ p = std::find_first_of(std::next(p), auth_supported.end(),
+ allowed_methods.begin(), allowed_methods.end());
+ if (p == auth_supported.end()) {
+ lderr(cct) << __func__ << " server allowed_methods " << allowed_methods
+ << " but i only support " << auth_supported << dendl;
+ return -EACCES;
+ }
+ auth_method = *p;
+ ldout(cct,10) << __func__ << " will try " << auth_method << " next" << dendl;
+ return 0;
+}
+
+int MonConnection::handle_auth(MAuthReply* m,
+ const EntityName& entity_name,
+ uint32_t want_keys,
+ RotatingKeyRing* keyring)
+{
+ if (state == State::NEGOTIATING) {
+ int r = _negotiate(m, entity_name, want_keys, keyring);
+ if (r) {
+ return r;
+ }
+ state = State::AUTHENTICATING;
+ }
+ int r = authenticate(m);
+ if (!r) {
+ state = State::HAVE_SESSION;
+ }
+ return r;
+}
+
+int MonConnection::_negotiate(MAuthReply *m,
+ const EntityName& entity_name,
+ uint32_t want_keys,
+ RotatingKeyRing* keyring)
+{
+ int r = _init_auth(m->protocol, entity_name, want_keys, keyring, false);
+ if (r == -ENOTSUP) {
+ if (m->result == -ENOTSUP) {
+ ldout(cct, 10) << "none of our auth protocols are supported by the server"
+ << dendl;
+ }
+ return m->result;
+ }
+ return r;
+}
+
+int MonConnection::_init_auth(
+ uint32_t method,
+ const EntityName& entity_name,
+ uint32_t want_keys,
+ RotatingKeyRing* keyring,
+ bool msgr2)
+{
+ ldout(cct, 10) << __func__ << " method " << method << dendl;
+ if (auth && auth->get_protocol() == (int)method) {
+ ldout(cct, 10) << __func__ << " already have auth, reseting" << dendl;
+ auth->reset();
+ return 0;
+ }
+
+ ldout(cct, 10) << __func__ << " creating new auth" << dendl;
+ auth.reset(AuthClientHandler::create(cct, method, keyring));
+ if (!auth) {
+ ldout(cct, 10) << " no handler for protocol " << method << dendl;
+ return -ENOTSUP;
+ }
+
+ // do not request MGR key unless the mon has the SERVER_KRAKEN
+ // feature. otherwise it will give us an auth error. note that
+ // we have to use the FEATUREMASK because pre-jewel the kraken
+ // feature bit was used for something else.
+ if (!msgr2 &&
+ (want_keys & CEPH_ENTITY_TYPE_MGR) &&
+ !(con->has_features(CEPH_FEATUREMASK_SERVER_KRAKEN))) {
+ ldout(cct, 1) << __func__
+ << " not requesting MGR keys from pre-kraken monitor"
+ << dendl;
+ want_keys &= ~CEPH_ENTITY_TYPE_MGR;
+ }
+ auth->set_want_keys(want_keys);
+ auth->init(entity_name);
+ auth->set_global_id(global_id);
+ return 0;
+}
+
+int MonConnection::authenticate(MAuthReply *m)
+{
+ ceph_assert(auth);
+ if (!m->global_id) {
+ ldout(cct, 1) << "peer sent an invalid global_id" << dendl;
+ }
+ if (m->global_id != global_id) {
+ // it's a new session
+ auth->reset();
+ global_id = m->global_id;
+ auth->set_global_id(global_id);
+ ldout(cct, 10) << "my global_id is " << m->global_id << dendl;
+ }
+ auto p = m->result_bl.cbegin();
+ int ret = auth->handle_response(m->result, p, nullptr, nullptr);
+ if (ret == -EAGAIN) {
+ auto ma = new MAuth;
+ ma->protocol = auth->get_protocol();
+ auth->prepare_build_request();
+ auth->build_request(ma->auth_payload);
+ con->send_message(ma);
+ }
+ if (ret == 0 && pending_tell_command) {
+ con->send_message2(std::move(pending_tell_command));
+ }
+
+ return ret;
+}
+
+void MonClient::register_config_callback(md_config_t::config_callback fn) {
+ ceph_assert(!config_cb);
+ config_cb = fn;
+}
+
+md_config_t::config_callback MonClient::get_config_callback() {
+ return config_cb;
+}
+
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wnon-virtual-dtor"
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wnon-virtual-dtor"
+class monc_error_category : public ceph::converting_category {
+public:
+ monc_error_category(){}
+ const char* name() const noexcept override;
+ const char* message(int ev, char*, std::size_t) const noexcept override;
+ std::string message(int ev) const override;
+ bs::error_condition default_error_condition(int ev) const noexcept
+ override;
+ bool equivalent(int ev, const bs::error_condition& c) const
+ noexcept override;
+ using ceph::converting_category::equivalent;
+ int from_code(int ev) const noexcept override;
+};
+#pragma GCC diagnostic pop
+#pragma clang diagnostic pop
+
+const char* monc_error_category::name() const noexcept {
+ return "monc";
+}
+
+const char* monc_error_category::message(int ev, char*, std::size_t) const noexcept {
+ if (ev == 0)
+ return "No error";
+
+ switch (static_cast<monc_errc>(ev)) {
+ case monc_errc::shutting_down: // Command failed due to MonClient shutting down
+ return "Command failed due to MonClient shutting down";
+ case monc_errc::session_reset:
+ return "Monitor session was reset";
+ case monc_errc::rank_dne:
+ return "Requested monitor rank does not exist";
+ case monc_errc::mon_dne:
+ return "Requested monitor does not exist";
+ case monc_errc::timed_out:
+ return "Monitor operation timed out";
+ case monc_errc::mon_unavailable:
+ return "Monitor unavailable";
+ }
+
+ return "Unknown error";
+}
+
+std::string monc_error_category::message(int ev) const {
+ return message(ev, nullptr, 0);
+}
+
+bs::error_condition monc_error_category::default_error_condition(int ev) const noexcept {
+ switch (static_cast<monc_errc>(ev)) {
+ case monc_errc::shutting_down:
+ return bs::errc::operation_canceled;
+ case monc_errc::session_reset:
+ return bs::errc::resource_unavailable_try_again;
+ case monc_errc::rank_dne:
+ [[fallthrough]];
+ case monc_errc::mon_dne:
+ return ceph::errc::not_in_map;
+ case monc_errc::timed_out:
+ return bs::errc::timed_out;
+ case monc_errc::mon_unavailable:
+ return bs::errc::no_such_device;
+ }
+ return { ev, *this };
+}
+
+bool monc_error_category::equivalent(int ev, const bs::error_condition& c) const noexcept {
+ switch (static_cast<monc_errc>(ev)) {
+ case monc_errc::rank_dne:
+ [[fallthrough]];
+ case monc_errc::mon_dne:
+ return c == bs::errc::no_such_file_or_directory;
+ default:
+ return default_error_condition(ev) == c;
+ }
+}
+
+int monc_error_category::from_code(int ev) const noexcept {
+ if (ev == 0)
+ return 0;
+
+ switch (static_cast<monc_errc>(ev)) {
+ case monc_errc::shutting_down:
+ return -ECANCELED;
+ case monc_errc::session_reset:
+ return -EAGAIN;
+ case monc_errc::rank_dne:
+ [[fallthrough]];
+ case monc_errc::mon_dne:
+ return -ENOENT;
+ case monc_errc::timed_out:
+ return -ETIMEDOUT;
+ case monc_errc::mon_unavailable:
+ return -ENXIO;
+ }
+ return -EDOM;
+}
+
+const bs::error_category& monc_category() noexcept {
+ static const monc_error_category c;
+ return c;
+}