summaryrefslogtreecommitdiffstats
path: root/src/crimson/mon
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/crimson/mon
parentInitial commit. (diff)
downloadceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz
ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/crimson/mon')
-rw-r--r--src/crimson/mon/MonClient.cc1111
-rw-r--r--src/crimson/mon/MonClient.h183
2 files changed, 1294 insertions, 0 deletions
diff --git a/src/crimson/mon/MonClient.cc b/src/crimson/mon/MonClient.cc
new file mode 100644
index 000000000..9dfbb103a
--- /dev/null
+++ b/src/crimson/mon/MonClient.cc
@@ -0,0 +1,1111 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "MonClient.h"
+
+#include <random>
+
+#include <seastar/core/future-util.hh>
+#include <seastar/core/lowres_clock.hh>
+#include <seastar/core/shared_future.hh>
+#include <seastar/util/log.hh>
+
+#include "auth/AuthClientHandler.h"
+#include "auth/RotatingKeyRing.h"
+
+#include "common/hostname.h"
+
+#include "crimson/auth/KeyRing.h"
+#include "crimson/common/config_proxy.h"
+#include "crimson/common/log.h"
+#include "crimson/net/Connection.h"
+#include "crimson/net/Errors.h"
+#include "crimson/net/Messenger.h"
+
+#include "messages/MAuth.h"
+#include "messages/MAuthReply.h"
+#include "messages/MConfig.h"
+#include "messages/MLogAck.h"
+#include "messages/MMonCommand.h"
+#include "messages/MMonCommandAck.h"
+#include "messages/MMonGetMap.h"
+#include "messages/MMonGetVersion.h"
+#include "messages/MMonGetVersionReply.h"
+#include "messages/MMonMap.h"
+#include "messages/MMonSubscribe.h"
+#include "messages/MMonSubscribeAck.h"
+
+namespace {
+ seastar::logger& logger()
+ {
+ return crimson::get_logger(ceph_subsys_monc);
+ }
+}
+
+namespace crimson::mon {
+
+using crimson::common::local_conf;
+
+class Connection {
+public:
+ Connection(const AuthRegistry& auth_registry,
+ crimson::net::ConnectionRef conn,
+ KeyRing* keyring);
+ enum class auth_result_t {
+ success = 0,
+ failure,
+ canceled
+ };
+ seastar::future<> handle_auth_reply(Ref<MAuthReply> m);
+ // v1
+ seastar::future<auth_result_t> authenticate_v1(
+ epoch_t epoch,
+ const EntityName& name,
+ uint32_t want_keys);
+ // v2
+ seastar::future<auth_result_t> authenticate_v2();
+ auth::AuthClient::auth_request_t
+ get_auth_request(const EntityName& name,
+ uint32_t want_keys);
+ using secret_t = string;
+ tuple<CryptoKey, secret_t, bufferlist>
+ handle_auth_reply_more(const ceph::buffer::list& bl);
+ int handle_auth_bad_method(uint32_t old_auth_method,
+ int result,
+ const std::vector<uint32_t>& allowed_methods,
+ const std::vector<uint32_t>& allowed_modes);
+
+ // v1 and v2
+ tuple<CryptoKey, secret_t, int>
+ handle_auth_done(uint64_t new_global_id,
+ const ceph::buffer::list& bl);
+ void close();
+ bool is_my_peer(const entity_addr_t& addr) const;
+ AuthAuthorizer* get_authorizer(entity_type_t peer) const;
+ KeyStore& get_keys();
+ seastar::future<> renew_tickets();
+ seastar::future<> renew_rotating_keyring();
+
+ crimson::net::ConnectionRef get_conn();
+
+private:
+ seastar::future<> setup_session(epoch_t epoch,
+ const EntityName& name);
+ std::unique_ptr<AuthClientHandler> create_auth(crimson::auth::method_t,
+ uint64_t global_id,
+ const EntityName& name,
+ uint32_t want_keys);
+ enum class request_t {
+ rotating,
+ general,
+ };
+ seastar::future<std::optional<auth_result_t>> do_auth_single(request_t);
+ seastar::future<auth_result_t> do_auth(request_t);
+
+private:
+ bool closed = false;
+ // v1
+ seastar::shared_promise<Ref<MAuthReply>> reply;
+ // v2
+ using clock_t = seastar::lowres_system_clock;
+ clock_t::time_point auth_start;
+ crimson::auth::method_t auth_method = 0;
+ std::optional<seastar::promise<auth_result_t>> auth_done;
+ // v1 and v2
+ const AuthRegistry& auth_registry;
+ crimson::net::ConnectionRef conn;
+ std::unique_ptr<AuthClientHandler> auth;
+ std::unique_ptr<RotatingKeyRing> rotating_keyring;
+ uint64_t global_id = 0;
+ clock_t::time_point last_rotating_renew_sent;
+};
+
+Connection::Connection(const AuthRegistry& auth_registry,
+ crimson::net::ConnectionRef conn,
+ KeyRing* keyring)
+ : auth_registry{auth_registry},
+ conn{conn},
+ rotating_keyring{
+ std::make_unique<RotatingKeyRing>(nullptr,
+ CEPH_ENTITY_TYPE_OSD,
+ keyring)}
+{}
+
+seastar::future<> Connection::handle_auth_reply(Ref<MAuthReply> m)
+{
+ reply.set_value(m);
+ reply = {};
+ return seastar::now();
+}
+
+seastar::future<> Connection::renew_tickets()
+{
+ if (auth->need_tickets()) {
+ return do_auth(request_t::general).then([](auth_result_t r) {
+ if (r != auth_result_t::success) {
+ throw std::system_error(
+ make_error_code(
+ crimson::net::error::negotiation_failure));
+ }
+ });
+ }
+ return seastar::now();
+}
+
+seastar::future<> Connection::renew_rotating_keyring()
+{
+ auto now = clock_t::now();
+ auto ttl = std::chrono::seconds{
+ static_cast<long>(crimson::common::local_conf()->auth_service_ticket_ttl)};
+ auto cutoff = now - ttl / 4;
+ if (!rotating_keyring->need_new_secrets(utime_t(cutoff))) {
+ return seastar::now();
+ }
+ if (now - last_rotating_renew_sent < std::chrono::seconds{1}) {
+ logger().info("renew_rotating_keyring called too often");
+ return seastar::now();
+ }
+ last_rotating_renew_sent = now;
+ return do_auth(request_t::rotating).then([](auth_result_t r) {
+ if (r != auth_result_t::success) {
+ throw std::system_error(make_error_code(
+ crimson::net::error::negotiation_failure));
+ }
+ });
+}
+
+AuthAuthorizer* Connection::get_authorizer(entity_type_t peer) const
+{
+ if (auth) {
+ return auth->build_authorizer(peer);
+ } else {
+ return nullptr;
+ }
+}
+
+KeyStore& Connection::get_keys() {
+ return *rotating_keyring;
+}
+
+std::unique_ptr<AuthClientHandler>
+Connection::create_auth(crimson::auth::method_t protocol,
+ uint64_t global_id,
+ const EntityName& name,
+ uint32_t want_keys)
+{
+ static crimson::common::CephContext cct;
+ std::unique_ptr<AuthClientHandler> auth;
+ auth.reset(AuthClientHandler::create(&cct,
+ protocol,
+ rotating_keyring.get()));
+ if (!auth) {
+ logger().error("no handler for protocol {}", protocol);
+ throw std::system_error(make_error_code(
+ crimson::net::error::negotiation_failure));
+ }
+ auth->init(name);
+ auth->set_want_keys(want_keys);
+ auth->set_global_id(global_id);
+ return auth;
+}
+
+seastar::future<>
+Connection::setup_session(epoch_t epoch,
+ const EntityName& name)
+{
+ auto m = ceph::make_message<MAuth>();
+ m->protocol = CEPH_AUTH_UNKNOWN;
+ m->monmap_epoch = epoch;
+ __u8 struct_v = 1;
+ encode(struct_v, m->auth_payload);
+ std::vector<crimson::auth::method_t> auth_methods;
+ auth_registry.get_supported_methods(conn->get_peer_type(), &auth_methods);
+ encode(auth_methods, m->auth_payload);
+ encode(name, m->auth_payload);
+ encode(global_id, m->auth_payload);
+ return conn->send(m);
+}
+
+seastar::future<std::optional<Connection::auth_result_t>>
+Connection::do_auth_single(Connection::request_t what)
+{
+ auto m = make_message<MAuth>();
+ m->protocol = auth->get_protocol();
+ auth->prepare_build_request();
+ switch (what) {
+ case request_t::rotating:
+ auth->build_rotating_request(m->auth_payload);
+ break;
+ case request_t::general:
+ if (int ret = auth->build_request(m->auth_payload); ret) {
+ logger().error("missing/bad key for '{}'", local_conf()->name);
+ throw std::system_error(make_error_code(
+ crimson::net::error::negotiation_failure));
+ }
+ break;
+ default:
+ assert(0);
+ }
+ logger().info("sending {}", *m);
+ return conn->send(m).then([this] {
+ logger().info("waiting");
+ return reply.get_shared_future();
+ }).then([this] (Ref<MAuthReply> m) {
+ if (!m) {
+ ceph_assert(closed);
+ logger().info("do_auth: connection closed");
+ return seastar::make_ready_future<std::optional<Connection::auth_result_t>>(
+ std::make_optional(auth_result_t::canceled));
+ }
+ logger().info(
+ "do_auth: mon {} => {} returns {}: {}",
+ conn->get_messenger()->get_myaddr(),
+ conn->get_peer_addr(), *m, m->result);
+ auto p = m->result_bl.cbegin();
+ auto ret = auth->handle_response(m->result, p,
+ nullptr, nullptr);
+ if (ret != 0 && ret != -EAGAIN) {
+ logger().error(
+ "do_auth: got error {} on mon {}",
+ ret,
+ conn->get_peer_addr());
+ }
+ return seastar::make_ready_future<std::optional<Connection::auth_result_t>>(
+ ret == -EAGAIN
+ ? std::nullopt
+ : std::make_optional(ret == 0
+ ? auth_result_t::success
+ : auth_result_t::failure
+ ));
+ });
+}
+
+seastar::future<Connection::auth_result_t>
+Connection::do_auth(Connection::request_t what) {
+ return seastar::repeat_until_value([this, what]() {
+ return do_auth_single(what);
+ });
+}
+
+seastar::future<Connection::auth_result_t>
+Connection::authenticate_v1(epoch_t epoch,
+ const EntityName& name,
+ uint32_t want_keys)
+{
+ return conn->keepalive().then([epoch, name, this] {
+ return setup_session(epoch, name);
+ }).then([this] {
+ return reply.get_shared_future();
+ }).then([name, want_keys, this](Ref<MAuthReply> m) {
+ if (!m) {
+ logger().error("authenticate_v1 canceled on {}", name);
+ return seastar::make_ready_future<auth_result_t>(auth_result_t::canceled);
+ }
+ global_id = m->global_id;
+ auth = create_auth(m->protocol, m->global_id, name, want_keys);
+ switch (auto p = m->result_bl.cbegin();
+ auth->handle_response(m->result, p,
+ nullptr, nullptr)) {
+ case 0:
+ // none
+ return seastar::make_ready_future<auth_result_t>(auth_result_t::success);
+ case -EAGAIN:
+ // cephx
+ return do_auth(request_t::general);
+ default:
+ ceph_assert_always(0);
+ }
+ }).handle_exception([](auto ep) {
+ logger().error("authenticate_v1 failed with {}", ep);
+ return seastar::make_ready_future<auth_result_t>(auth_result_t::canceled);
+ });
+}
+
+seastar::future<Connection::auth_result_t> Connection::authenticate_v2()
+{
+ auth_start = seastar::lowres_system_clock::now();
+ return conn->send(make_message<MMonGetMap>()).then([this] {
+ auth_done.emplace();
+ return auth_done->get_future();
+ });
+}
+
+auth::AuthClient::auth_request_t
+Connection::get_auth_request(const EntityName& entity_name,
+ uint32_t want_keys)
+{
+ // choose method
+ auth_method = [&] {
+ std::vector<crimson::auth::method_t> methods;
+ auth_registry.get_supported_methods(conn->get_peer_type(), &methods);
+ if (methods.empty()) {
+ logger().info("get_auth_request no methods is supported");
+ throw crimson::auth::error("no methods is supported");
+ }
+ return methods.front();
+ }();
+
+ std::vector<uint32_t> modes;
+ auth_registry.get_supported_modes(conn->get_peer_type(), auth_method,
+ &modes);
+ logger().info("method {} preferred_modes {}", auth_method, modes);
+ if (modes.empty()) {
+ throw crimson::auth::error("no modes is supported");
+ }
+ auth = create_auth(auth_method, global_id, entity_name, want_keys);
+
+ using ceph::encode;
+ bufferlist bl;
+ // initial request includes some boilerplate...
+ encode((char)AUTH_MODE_MON, bl);
+ encode(entity_name, bl);
+ encode(global_id, bl);
+ // and (maybe) some method-specific initial payload
+ auth->build_initial_request(&bl);
+ return {auth_method, modes, bl};
+}
+
+tuple<CryptoKey, Connection::secret_t, bufferlist>
+Connection::handle_auth_reply_more(const ceph::buffer::list& payload)
+{
+ CryptoKey session_key;
+ secret_t connection_secret;
+ bufferlist reply;
+ auto p = payload.cbegin();
+ int r = auth->handle_response(0, p, &session_key, &connection_secret);
+ if (r == -EAGAIN) {
+ auth->prepare_build_request();
+ auth->build_request(reply);
+ logger().info(" responding with {} bytes", reply.length());
+ return {session_key, connection_secret, reply};
+ } else if (r < 0) {
+ logger().error(" handle_response returned {}", r);
+ throw crimson::auth::error("unable to build auth");
+ } else {
+ logger().info("authenticated!");
+ std::terminate();
+ }
+}
+
+tuple<CryptoKey, Connection::secret_t, int>
+Connection::handle_auth_done(uint64_t new_global_id,
+ const ceph::buffer::list& payload)
+{
+ global_id = new_global_id;
+ auth->set_global_id(global_id);
+ auto p = payload.begin();
+ CryptoKey session_key;
+ secret_t connection_secret;
+ int r = auth->handle_response(0, p, &session_key, &connection_secret);
+ conn->set_last_keepalive_ack(auth_start);
+ if (auth_done) {
+ auth_done->set_value(auth_result_t::success);
+ auth_done.reset();
+ }
+ return {session_key, connection_secret, r};
+}
+
+int Connection::handle_auth_bad_method(uint32_t old_auth_method,
+ int result,
+ const std::vector<uint32_t>& allowed_methods,
+ const std::vector<uint32_t>& allowed_modes)
+{
+ logger().info("old_auth_method {} result {} allowed_methods {}",
+ old_auth_method, cpp_strerror(result), allowed_methods);
+ std::vector<uint32_t> auth_supported;
+ auth_registry.get_supported_methods(conn->get_peer_type(), &auth_supported);
+ auto p = std::find(auth_supported.begin(), auth_supported.end(),
+ old_auth_method);
+ assert(p != auth_supported.end());
+ p = std::find_first_of(std::next(p), auth_supported.end(),
+ allowed_methods.begin(), allowed_methods.end());
+ if (p == auth_supported.end()) {
+ logger().error("server allowed_methods {} but i only support {}",
+ allowed_methods, auth_supported);
+ assert(auth_done);
+ auth_done->set_exception(std::system_error(make_error_code(
+ crimson::net::error::negotiation_failure)));
+ return -EACCES;
+ }
+ auth_method = *p;
+ logger().info("will try {} next", auth_method);
+ return 0;
+}
+
+void Connection::close()
+{
+ reply.set_value(Ref<MAuthReply>(nullptr));
+ reply = {};
+ if (auth_done) {
+ auth_done->set_value(auth_result_t::canceled);
+ auth_done.reset();
+ }
+ if (conn && !std::exchange(closed, true)) {
+ conn->mark_down();
+ }
+}
+
+bool Connection::is_my_peer(const entity_addr_t& addr) const
+{
+ ceph_assert(conn);
+ return conn->get_peer_addr() == addr;
+}
+
+crimson::net::ConnectionRef Connection::get_conn() {
+ return conn;
+}
+
+Client::Client(crimson::net::Messenger& messenger,
+ crimson::common::AuthHandler& auth_handler)
+ // currently, crimson is OSD-only
+ : want_keys{CEPH_ENTITY_TYPE_MON |
+ CEPH_ENTITY_TYPE_OSD |
+ CEPH_ENTITY_TYPE_MGR},
+ timer{[this] { tick(); }},
+ msgr{messenger},
+ auth_registry{&cct},
+ auth_handler{auth_handler}
+{}
+
+Client::Client(Client&&) = default;
+Client::~Client() = default;
+
+seastar::future<> Client::start() {
+ entity_name = crimson::common::local_conf()->name;
+ auth_registry.refresh_config();
+ return load_keyring().then([this] {
+ return monmap.build_initial(crimson::common::local_conf(), false);
+ }).then([this] {
+ return authenticate();
+ }).then([this] {
+ auto interval =
+ std::chrono::duration_cast<seastar::lowres_clock::duration>(
+ std::chrono::duration<double>(
+ local_conf().get_val<double>("mon_client_ping_interval")));
+ timer.arm_periodic(interval);
+ });
+}
+
+seastar::future<> Client::load_keyring()
+{
+ if (!auth_registry.is_supported_method(msgr.get_mytype(), CEPH_AUTH_CEPHX)) {
+ return seastar::now();
+ } else {
+ return crimson::auth::load_from_keyring(&keyring).then([](KeyRing* keyring) {
+ return crimson::auth::load_from_keyfile(keyring);
+ }).then([](KeyRing* keyring) {
+ return crimson::auth::load_from_key(keyring);
+ }).then([](KeyRing*) {
+ return seastar::now();
+ });
+ }
+}
+
+void Client::tick()
+{
+ gate.dispatch_in_background(__func__, *this, [this] {
+ if (active_con) {
+ return seastar::when_all_succeed(active_con->get_conn()->keepalive(),
+ active_con->renew_tickets(),
+ active_con->renew_rotating_keyring()).then_unpack([] {});
+ } else {
+ return seastar::now();
+ }
+ });
+}
+
+bool Client::is_hunting() const {
+ return !active_con;
+}
+
+std::optional<seastar::future<>>
+Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
+{
+ bool dispatched = true;
+ gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] {
+ // we only care about these message types
+ switch (m->get_type()) {
+ case CEPH_MSG_MON_MAP:
+ return handle_monmap(conn, boost::static_pointer_cast<MMonMap>(m));
+ case CEPH_MSG_AUTH_REPLY:
+ return handle_auth_reply(
+ conn, boost::static_pointer_cast<MAuthReply>(m));
+ case CEPH_MSG_MON_SUBSCRIBE_ACK:
+ return handle_subscribe_ack(
+ boost::static_pointer_cast<MMonSubscribeAck>(m));
+ case CEPH_MSG_MON_GET_VERSION_REPLY:
+ return handle_get_version_reply(
+ boost::static_pointer_cast<MMonGetVersionReply>(m));
+ case MSG_MON_COMMAND_ACK:
+ return handle_mon_command_ack(
+ boost::static_pointer_cast<MMonCommandAck>(m));
+ case MSG_LOGACK:
+ return handle_log_ack(
+ boost::static_pointer_cast<MLogAck>(m));
+ case MSG_CONFIG:
+ return handle_config(
+ boost::static_pointer_cast<MConfig>(m));
+ default:
+ dispatched = false;
+ return seastar::now();
+ }
+ });
+ return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
+}
+
+void Client::ms_handle_reset(crimson::net::ConnectionRef conn, bool /* is_replace */)
+{
+ gate.dispatch_in_background(__func__, *this, [this, conn] {
+ auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
+ [peer_addr = conn->get_peer_addr()](auto& mc) {
+ return mc->is_my_peer(peer_addr);
+ });
+ if (found != pending_conns.end()) {
+ logger().warn("pending conn reset by {}", conn->get_peer_addr());
+ (*found)->close();
+ return seastar::now();
+ } else if (active_con && active_con->is_my_peer(conn->get_peer_addr())) {
+ logger().warn("active conn reset {}", conn->get_peer_addr());
+ active_con.reset();
+ return reopen_session(-1).then([this] {
+ send_pendings();
+ return seastar::now();
+ });
+ } else {
+ return seastar::now();
+ }
+ });
+}
+
+std::pair<std::vector<uint32_t>, std::vector<uint32_t>>
+Client::get_supported_auth_methods(int peer_type)
+{
+ std::vector<uint32_t> methods;
+ std::vector<uint32_t> modes;
+ auth_registry.get_supported_methods(peer_type, &methods, &modes);
+ return {methods, modes};
+}
+
+uint32_t Client::pick_con_mode(int peer_type,
+ uint32_t auth_method,
+ const std::vector<uint32_t>& preferred_modes)
+{
+ return auth_registry.pick_mode(peer_type, auth_method, preferred_modes);
+}
+
+AuthAuthorizeHandler* Client::get_auth_authorize_handler(int peer_type,
+ int auth_method)
+{
+ return auth_registry.get_handler(peer_type, auth_method);
+}
+
+
+int Client::handle_auth_request(crimson::net::ConnectionRef con,
+ AuthConnectionMetaRef auth_meta,
+ bool more,
+ uint32_t auth_method,
+ const ceph::bufferlist& payload,
+ ceph::bufferlist *reply)
+{
+ // for some channels prior to nautilus (osd heartbeat), we tolerate the lack of
+ // an authorizer.
+ if (payload.length() == 0) {
+ if (con->get_messenger()->get_require_authorizer()) {
+ return -EACCES;
+ } else {
+ auth_handler.handle_authentication({}, {});
+ return 1;
+ }
+ }
+ auth_meta->auth_mode = payload[0];
+ if (auth_meta->auth_mode < AUTH_MODE_AUTHORIZER ||
+ auth_meta->auth_mode > AUTH_MODE_AUTHORIZER_MAX) {
+ return -EACCES;
+ }
+ AuthAuthorizeHandler* ah = get_auth_authorize_handler(con->get_peer_type(),
+ auth_method);
+ if (!ah) {
+ logger().error("no AuthAuthorizeHandler found for auth method: {}",
+ auth_method);
+ return -EOPNOTSUPP;
+ }
+ auto authorizer_challenge = &auth_meta->authorizer_challenge;
+ if (auth_meta->skip_authorizer_challenge) {
+ logger().info("skipping challenge on {}", con);
+ authorizer_challenge = nullptr;
+ }
+ bool was_challenge = (bool)auth_meta->authorizer_challenge;
+ EntityName name;
+ AuthCapsInfo caps_info;
+ bool is_valid = ah->verify_authorizer(
+ &cct,
+ active_con->get_keys(),
+ payload,
+ auth_meta->get_connection_secret_length(),
+ reply,
+ &name,
+ &active_con->get_conn()->peer_global_id,
+ &caps_info,
+ &auth_meta->session_key,
+ &auth_meta->connection_secret,
+ authorizer_challenge);
+ if (is_valid) {
+ auth_handler.handle_authentication(name, caps_info);
+ return 1;
+ }
+ if (!more && !was_challenge && auth_meta->authorizer_challenge) {
+ logger().info("added challenge on {}", con);
+ return 0;
+ } else {
+ logger().info("bad authorizer on {}", con);
+ return -EACCES;
+ }
+}
+
+auth::AuthClient::auth_request_t
+Client::get_auth_request(crimson::net::ConnectionRef con,
+ AuthConnectionMetaRef auth_meta)
+{
+ logger().info("get_auth_request(con={}, auth_method={})",
+ con, auth_meta->auth_method);
+ // connection to mon?
+ if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
+ auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
+ [peer_addr = con->get_peer_addr()](auto& mc) {
+ return mc->is_my_peer(peer_addr);
+ });
+ if (found == pending_conns.end()) {
+ throw crimson::auth::error{"unknown connection"};
+ }
+ return (*found)->get_auth_request(entity_name, want_keys);
+ } else {
+ // generate authorizer
+ if (!active_con) {
+ logger().error(" but no auth handler is set up");
+ throw crimson::auth::error("no auth available");
+ }
+ auto authorizer = active_con->get_authorizer(con->get_peer_type());
+ if (!authorizer) {
+ logger().error("failed to build_authorizer for type {}",
+ ceph_entity_type_name(con->get_peer_type()));
+ throw crimson::auth::error("unable to build auth");
+ }
+ auth_meta->authorizer.reset(authorizer);
+ auth_meta->auth_method = authorizer->protocol;
+ vector<uint32_t> modes;
+ auth_registry.get_supported_modes(con->get_peer_type(),
+ auth_meta->auth_method,
+ &modes);
+ return {authorizer->protocol, modes, authorizer->bl};
+ }
+}
+
+ceph::bufferlist Client::handle_auth_reply_more(crimson::net::ConnectionRef conn,
+ AuthConnectionMetaRef auth_meta,
+ const bufferlist& bl)
+{
+ if (conn->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
+ auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
+ [peer_addr = conn->get_peer_addr()](auto& mc) {
+ return mc->is_my_peer(peer_addr);
+ });
+ if (found == pending_conns.end()) {
+ throw crimson::auth::error{"unknown connection"};
+ }
+ bufferlist reply;
+ tie(auth_meta->session_key, auth_meta->connection_secret, reply) =
+ (*found)->handle_auth_reply_more(bl);
+ return reply;
+ } else {
+ // authorizer challenges
+ if (!active_con || !auth_meta->authorizer) {
+ logger().error("no authorizer?");
+ throw crimson::auth::error("no auth available");
+ }
+ auth_meta->authorizer->add_challenge(&cct, bl);
+ return auth_meta->authorizer->bl;
+ }
+}
+
+int Client::handle_auth_done(crimson::net::ConnectionRef conn,
+ AuthConnectionMetaRef auth_meta,
+ uint64_t global_id,
+ uint32_t con_mode,
+ const bufferlist& bl)
+{
+ if (conn->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
+ auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
+ [peer_addr = conn->get_peer_addr()](auto& mc) {
+ return mc->is_my_peer(peer_addr);
+ });
+ if (found == pending_conns.end()) {
+ return -ENOENT;
+ }
+ int r = 0;
+ tie(auth_meta->session_key, auth_meta->connection_secret, r) =
+ (*found)->handle_auth_done(global_id, bl);
+ return r;
+ } else {
+ // verify authorizer reply
+ auto p = bl.begin();
+ if (!auth_meta->authorizer->verify_reply(p, &auth_meta->connection_secret)) {
+ logger().error("failed verifying authorizer reply");
+ return -EACCES;
+ }
+ auth_meta->session_key = auth_meta->authorizer->session_key;
+ return 0;
+ }
+}
+
+ // Handle server's indication that the previous auth attempt failed
+int Client::handle_auth_bad_method(crimson::net::ConnectionRef conn,
+ AuthConnectionMetaRef auth_meta,
+ uint32_t old_auth_method,
+ int result,
+ const std::vector<uint32_t>& allowed_methods,
+ const std::vector<uint32_t>& allowed_modes)
+{
+ if (conn->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
+ auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
+ [peer_addr = conn->get_peer_addr()](auto& mc) {
+ return mc->is_my_peer(peer_addr);
+ });
+ if (found != pending_conns.end()) {
+ return (*found)->handle_auth_bad_method(
+ old_auth_method, result,
+ allowed_methods, allowed_modes);
+ } else {
+ return -ENOENT;
+ }
+ } else {
+ // huh...
+ logger().info("hmm, they didn't like {} result {}",
+ old_auth_method, cpp_strerror(result));
+ return -EACCES;
+ }
+}
+
+seastar::future<> Client::handle_monmap(crimson::net::ConnectionRef conn,
+ Ref<MMonMap> m)
+{
+ monmap.decode(m->monmapbl);
+ const auto peer_addr = conn->get_peer_addr();
+ auto cur_mon = monmap.get_name(peer_addr);
+ logger().info("got monmap {}, mon.{}, is now rank {}",
+ monmap.epoch, cur_mon, monmap.get_rank(cur_mon));
+ sub.got("monmap", monmap.get_epoch());
+
+ if (monmap.get_addr_name(peer_addr, cur_mon)) {
+ if (active_con) {
+ logger().info("handle_monmap: renewing tickets");
+ return seastar::when_all_succeed(
+ active_con->renew_tickets(),
+ active_con->renew_rotating_keyring()).then_unpack([](){
+ logger().info("handle_mon_map: renewed tickets");
+ });
+ } else {
+ return seastar::now();
+ }
+ } else {
+ logger().warn("mon.{} went away", cur_mon);
+ return reopen_session(-1).then([this] {
+ send_pendings();
+ return seastar::now();
+ });
+ }
+}
+
+seastar::future<> Client::handle_auth_reply(crimson::net::ConnectionRef conn,
+ Ref<MAuthReply> m)
+{
+ logger().info(
+ "handle_auth_reply mon {} => {} returns {}: {}",
+ conn->get_messenger()->get_myaddr(),
+ conn->get_peer_addr(), *m, m->result);
+ auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
+ [peer_addr = conn->get_peer_addr()](auto& mc) {
+ return mc->is_my_peer(peer_addr);
+ });
+ if (found != pending_conns.end()) {
+ return (*found)->handle_auth_reply(m);
+ } else if (active_con) {
+ return active_con->handle_auth_reply(m);
+ } else {
+ logger().error("unknown auth reply from {}", conn->get_peer_addr());
+ return seastar::now();
+ }
+}
+
+seastar::future<> Client::handle_subscribe_ack(Ref<MMonSubscribeAck> m)
+{
+ sub.acked(m->interval);
+ return seastar::now();
+}
+
+Client::get_version_t Client::get_version(const std::string& map)
+{
+ auto m = make_message<MMonGetVersion>();
+ auto tid = ++last_version_req_id;
+ m->handle = tid;
+ m->what = map;
+ auto& req = version_reqs[tid];
+ return send_message(m).then([&req] {
+ return req.get_future();
+ });
+}
+
+seastar::future<>
+Client::handle_get_version_reply(Ref<MMonGetVersionReply> m)
+{
+ if (auto found = version_reqs.find(m->handle);
+ found != version_reqs.end()) {
+ auto& result = found->second;
+ logger().trace("{}: {} returns {}",
+ __func__, m->handle, m->version);
+ result.set_value(std::make_tuple(m->version, m->oldest_version));
+ version_reqs.erase(found);
+ } else {
+ logger().warn("{}: version request with handle {} not found",
+ __func__, m->handle);
+ }
+ return seastar::now();
+}
+
+seastar::future<> Client::handle_mon_command_ack(Ref<MMonCommandAck> m)
+{
+ const auto tid = m->get_tid();
+ if (auto found = mon_commands.find(tid);
+ found != mon_commands.end()) {
+ auto& result = found->second;
+ logger().trace("{} {}", __func__, tid);
+ result.set_value(std::make_tuple(m->r, m->rs, std::move(m->get_data())));
+ mon_commands.erase(found);
+ } else {
+ logger().warn("{} {} not found", __func__, tid);
+ }
+ return seastar::now();
+}
+
+seastar::future<> Client::handle_log_ack(Ref<MLogAck> m)
+{
+ // XXX
+ return seastar::now();
+}
+
+seastar::future<> Client::handle_config(Ref<MConfig> m)
+{
+ return crimson::common::local_conf().set_mon_vals(m->config);
+}
+
+std::vector<unsigned> Client::get_random_mons(unsigned n) const
+{
+ uint16_t min_priority = std::numeric_limits<uint16_t>::max();
+ for (const auto& m : monmap.mon_info) {
+ if (m.second.priority < min_priority) {
+ min_priority = m.second.priority;
+ }
+ }
+ vector<unsigned> ranks;
+ for (auto [name, info] : monmap.mon_info) {
+ if (info.priority == min_priority) {
+ ranks.push_back(monmap.get_rank(name));
+ }
+ }
+ std::random_device rd;
+ std::default_random_engine rng{rd()};
+ std::shuffle(ranks.begin(), ranks.end(), rng);
+ if (n == 0 || n > ranks.size()) {
+ return ranks;
+ } else {
+ return {ranks.begin(), ranks.begin() + n};
+ }
+}
+
+seastar::future<> Client::authenticate()
+{
+ return reopen_session(-1).then([this] {
+ send_pendings();
+ return seastar::now();
+ });
+}
+
+seastar::future<> Client::stop()
+{
+ logger().info("{}", __func__);
+ auto fut = gate.close();
+ timer.cancel();
+ for (auto& pending_con : pending_conns) {
+ pending_con->close();
+ }
+ if (active_con) {
+ active_con->close();
+ }
+ return fut;
+}
+
+seastar::future<> Client::reopen_session(int rank)
+{
+ logger().info("{} to mon.{}", __func__, rank);
+ vector<unsigned> mons;
+ if (rank >= 0) {
+ mons.push_back(rank);
+ } else {
+ const auto parallel =
+ crimson::common::local_conf().get_val<uint64_t>("mon_client_hunt_parallel");
+ mons = get_random_mons(parallel);
+ }
+ pending_conns.reserve(mons.size());
+ return seastar::parallel_for_each(mons, [this](auto rank) {
+ // TODO: connect to multiple addrs
+ auto peer = monmap.get_addrs(rank).pick_addr(msgr.get_myaddr().get_type());
+ if (peer == entity_addr_t{}) {
+ // crimson msgr only uses the first bound addr
+ logger().warn("mon.{} does not have an addr compatible with me", rank);
+ return seastar::now();
+ }
+ logger().info("connecting to mon.{}", rank);
+ return seastar::futurize_invoke(
+ [peer, this] () -> seastar::future<Connection::auth_result_t> {
+ auto conn = msgr.connect(peer, CEPH_ENTITY_TYPE_MON);
+ auto& mc = pending_conns.emplace_back(
+ std::make_unique<Connection>(auth_registry, conn, &keyring));
+ if (conn->get_peer_addr().is_msgr2()) {
+ return mc->authenticate_v2();
+ } else {
+ return mc->authenticate_v1(monmap.get_epoch(), entity_name, want_keys)
+ .handle_exception([conn](auto ep) {
+ conn->mark_down();
+ return seastar::make_exception_future<Connection::auth_result_t>(ep);
+ });
+ }
+ }).then([peer, this](auto result) {
+ if (result == Connection::auth_result_t::success) {
+ _finish_auth(peer);
+ }
+ logger().debug("reopen_session mon connection attempts complete");
+ }).handle_exception([](auto ep) {
+ logger().error("mon connections failed with ep {}", ep);
+ return seastar::make_exception_future(ep);
+ });
+ }).then([this] {
+ if (!active_con) {
+ return seastar::make_exception_future(
+ crimson::common::system_shutdown_exception());
+ }
+ return active_con->renew_rotating_keyring();
+ });
+}
+
+void Client::_finish_auth(const entity_addr_t& peer)
+{
+ if (!is_hunting()) {
+ return;
+ }
+ logger().info("found mon.{}", monmap.get_name(peer));
+
+ auto found = std::find_if(
+ pending_conns.begin(), pending_conns.end(),
+ [peer](auto& conn) {
+ return conn->is_my_peer(peer);
+ });
+ if (found == pending_conns.end()) {
+ // Happens if another connection has won the race
+ ceph_assert(active_con && pending_conns.empty());
+ logger().info("no pending connection for mon.{}, peer {}",
+ monmap.get_name(peer), peer);
+ return;
+ }
+
+ ceph_assert(!active_con && !pending_conns.empty());
+ active_con = std::move(*found);
+ found->reset();
+ for (auto& conn : pending_conns) {
+ if (conn) {
+ conn->close();
+ }
+ }
+ pending_conns.clear();
+}
+
+Client::command_result_t
+Client::run_command(const std::vector<std::string>& cmd,
+ const bufferlist& bl)
+{
+ auto m = make_message<MMonCommand>(monmap.fsid);
+ auto tid = ++last_mon_command_id;
+ m->set_tid(tid);
+ m->cmd = cmd;
+ m->set_data(bl);
+ auto& req = mon_commands[tid];
+ return send_message(m).then([&req] {
+ return req.get_future();
+ });
+}
+
+seastar::future<> Client::send_message(MessageRef m)
+{
+ if (active_con) {
+ if (!pending_messages.empty()) {
+ send_pendings();
+ }
+ return active_con->get_conn()->send(m);
+ }
+ auto& delayed = pending_messages.emplace_back(m);
+ return delayed.pr.get_future();
+}
+
+void Client::send_pendings()
+{
+ if (active_con) {
+ for (auto& m : pending_messages) {
+ (void) active_con->get_conn()->send(m.msg);
+ m.pr.set_value();
+ }
+ pending_messages.clear();
+ }
+}
+
+bool Client::sub_want(const std::string& what, version_t start, unsigned flags)
+{
+ return sub.want(what, start, flags);
+}
+
+void Client::sub_got(const std::string& what, version_t have)
+{
+ sub.got(what, have);
+}
+
+void Client::sub_unwant(const std::string& what)
+{
+ sub.unwant(what);
+}
+
+bool Client::sub_want_increment(const std::string& what,
+ version_t start,
+ unsigned flags)
+{
+ return sub.inc_want(what, start, flags);
+}
+
+seastar::future<> Client::renew_subs()
+{
+ if (!sub.have_new()) {
+ logger().warn("{} - empty", __func__);
+ return seastar::now();
+ }
+ logger().trace("{}", __func__);
+
+ auto m = make_message<MMonSubscribe>();
+ m->what = sub.get_subs();
+ m->hostname = ceph_get_short_hostname();
+ return send_message(m).then([this] {
+ sub.renewed();
+ });
+}
+
+void Client::print(std::ostream& out) const
+{
+ out << "mon." << entity_name;
+}
+
+} // namespace crimson::mon
diff --git a/src/crimson/mon/MonClient.h b/src/crimson/mon/MonClient.h
new file mode 100644
index 000000000..e7d2df863
--- /dev/null
+++ b/src/crimson/mon/MonClient.h
@@ -0,0 +1,183 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <memory>
+
+#include <seastar/core/future.hh>
+#include <seastar/core/gate.hh>
+#include <seastar/core/lowres_clock.hh>
+#include <seastar/core/timer.hh>
+
+#include "auth/AuthRegistry.h"
+#include "auth/KeyRing.h"
+#include "common/ceph_context.h"
+
+#include "crimson/auth/AuthClient.h"
+#include "crimson/auth/AuthServer.h"
+#include "crimson/common/auth_handler.h"
+#include "crimson/common/gated.h"
+#include "crimson/net/Dispatcher.h"
+#include "crimson/net/Fwd.h"
+
+#include "mon/MonMap.h"
+
+#include "mon/MonSub.h"
+
+template<typename Message> using Ref = boost::intrusive_ptr<Message>;
+namespace crimson::net {
+ class Messenger;
+}
+
+struct AuthAuthorizeHandler;
+class MAuthReply;
+struct MMonMap;
+struct MMonSubscribeAck;
+struct MMonGetVersionReply;
+struct MMonCommandAck;
+struct MLogAck;
+struct MConfig;
+
+namespace crimson::mon {
+
+class Connection;
+
+class Client : public crimson::net::Dispatcher,
+ public crimson::auth::AuthClient,
+ public crimson::auth::AuthServer
+{
+ EntityName entity_name;
+ KeyRing keyring;
+ const uint32_t want_keys;
+
+ MonMap monmap;
+ std::unique_ptr<Connection> active_con;
+ std::vector<std::unique_ptr<Connection>> pending_conns;
+ seastar::timer<seastar::lowres_clock> timer;
+
+ crimson::net::Messenger& msgr;
+
+ // commands
+ using get_version_t = seastar::future<std::tuple<version_t, version_t>>;
+
+ ceph_tid_t last_version_req_id = 0;
+ std::map<ceph_tid_t, typename get_version_t::promise_type> version_reqs;
+
+ ceph_tid_t last_mon_command_id = 0;
+ using command_result_t =
+ seastar::future<std::tuple<std::int32_t, string, ceph::bufferlist>>;
+ std::map<ceph_tid_t, typename command_result_t::promise_type> mon_commands;
+
+ MonSub sub;
+
+public:
+ Client(crimson::net::Messenger&, crimson::common::AuthHandler&);
+ Client(Client&&);
+ ~Client();
+ seastar::future<> start();
+ seastar::future<> stop();
+
+ const uuid_d& get_fsid() const {
+ return monmap.fsid;
+ }
+ get_version_t get_version(const std::string& map);
+ command_result_t run_command(const std::vector<std::string>& cmd,
+ const bufferlist& bl);
+ seastar::future<> send_message(MessageRef);
+ bool sub_want(const std::string& what, version_t start, unsigned flags);
+ void sub_got(const std::string& what, version_t have);
+ void sub_unwant(const std::string& what);
+ bool sub_want_increment(const std::string& what, version_t start, unsigned flags);
+ seastar::future<> renew_subs();
+
+ void print(std::ostream&) const;
+private:
+ // AuthServer methods
+ std::pair<std::vector<uint32_t>, std::vector<uint32_t>>
+ get_supported_auth_methods(int peer_type) final;
+ uint32_t pick_con_mode(int peer_type,
+ uint32_t auth_method,
+ const std::vector<uint32_t>& preferred_modes) final;
+ AuthAuthorizeHandler* get_auth_authorize_handler(int peer_type,
+ int auth_method) final;
+ int handle_auth_request(crimson::net::ConnectionRef conn,
+ AuthConnectionMetaRef auth_meta,
+ bool more,
+ uint32_t auth_method,
+ const ceph::bufferlist& payload,
+ ceph::bufferlist *reply) final;
+
+ crimson::common::CephContext cct; // for auth_registry
+ AuthRegistry auth_registry;
+ crimson::common::AuthHandler& auth_handler;
+
+ // AuthClient methods
+ crimson::auth::AuthClient::auth_request_t
+ get_auth_request(crimson::net::ConnectionRef conn,
+ AuthConnectionMetaRef auth_meta) final;
+
+ // Handle server's request to continue the handshake
+ ceph::bufferlist handle_auth_reply_more(crimson::net::ConnectionRef conn,
+ AuthConnectionMetaRef auth_meta,
+ const bufferlist& bl) final;
+
+ // Handle server's indication that authentication succeeded
+ int handle_auth_done(crimson::net::ConnectionRef conn,
+ AuthConnectionMetaRef auth_meta,
+ uint64_t global_id,
+ uint32_t con_mode,
+ const bufferlist& bl) final;
+
+ // Handle server's indication that the previous auth attempt failed
+ int handle_auth_bad_method(crimson::net::ConnectionRef conn,
+ AuthConnectionMetaRef auth_meta,
+ uint32_t old_auth_method,
+ int result,
+ const std::vector<uint32_t>& allowed_methods,
+ const std::vector<uint32_t>& allowed_modes) final;
+
+private:
+ void tick();
+
+ std::optional<seastar::future<>> ms_dispatch(crimson::net::ConnectionRef conn,
+ MessageRef m) override;
+ void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
+
+ seastar::future<> handle_monmap(crimson::net::ConnectionRef conn,
+ Ref<MMonMap> m);
+ seastar::future<> handle_auth_reply(crimson::net::ConnectionRef conn,
+ Ref<MAuthReply> m);
+ seastar::future<> handle_subscribe_ack(Ref<MMonSubscribeAck> m);
+ seastar::future<> handle_get_version_reply(Ref<MMonGetVersionReply> m);
+ seastar::future<> handle_mon_command_ack(Ref<MMonCommandAck> m);
+ seastar::future<> handle_log_ack(Ref<MLogAck> m);
+ seastar::future<> handle_config(Ref<MConfig> m);
+
+ void send_pendings();
+private:
+ seastar::future<> load_keyring();
+ seastar::future<> authenticate();
+
+ bool is_hunting() const;
+ seastar::future<> reopen_session(int rank);
+ std::vector<unsigned> get_random_mons(unsigned n) const;
+ seastar::future<> _add_conn(unsigned rank, uint64_t global_id);
+ void _finish_auth(const entity_addr_t& peer);
+ crimson::common::Gated gate;
+
+ // messages that are waiting for the active_con to be available
+ struct pending_msg_t {
+ pending_msg_t(MessageRef& m) : msg(m) {}
+ MessageRef msg;
+ seastar::promise<> pr;
+ };
+ std::deque<pending_msg_t> pending_messages;
+};
+
+inline std::ostream& operator<<(std::ostream& out, const Client& client) {
+ client.print(out);
+ return out;
+}
+
+} // namespace crimson::mon