From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/crimson/mon/MonClient.cc | 1162 ++++++++++++++++++++++++++++++++++++++++++ src/crimson/mon/MonClient.h | 218 ++++++++ 2 files changed, 1380 insertions(+) create mode 100644 src/crimson/mon/MonClient.cc create mode 100644 src/crimson/mon/MonClient.h (limited to 'src/crimson/mon') diff --git a/src/crimson/mon/MonClient.cc b/src/crimson/mon/MonClient.cc new file mode 100644 index 000000000..7be09915a --- /dev/null +++ b/src/crimson/mon/MonClient.cc @@ -0,0 +1,1162 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "MonClient.h" + +#include +#include +#include +#include +#include +#include + +#include "auth/AuthClientHandler.h" +#include "auth/RotatingKeyRing.h" + +#include "common/hostname.h" + +#include "crimson/auth/KeyRing.h" +#include "crimson/common/config_proxy.h" +#include "crimson/common/log.h" +#include "crimson/common/logclient.h" +#include "crimson/net/Connection.h" +#include "crimson/net/Errors.h" +#include "crimson/net/Messenger.h" + +#include "messages/MAuth.h" +#include "messages/MAuthReply.h" +#include "messages/MConfig.h" +#include "messages/MLogAck.h" +#include "messages/MMonCommand.h" +#include "messages/MMonCommandAck.h" +#include "messages/MMonGetMap.h" +#include "messages/MMonGetVersion.h" +#include "messages/MMonGetVersionReply.h" +#include "messages/MMonMap.h" +#include "messages/MMonSubscribe.h" +#include "messages/MMonSubscribeAck.h" + +using std::string; +using std::tuple; +using std::vector; + +namespace { + seastar::logger& logger() + { + return crimson::get_logger(ceph_subsys_monc); + } +} + +namespace crimson::mon { + +using crimson::common::local_conf; + +class Connection : public seastar::enable_shared_from_this { +public: + Connection(const AuthRegistry& auth_registry, + crimson::net::ConnectionRef conn, + KeyRing* keyring); + enum class auth_result_t { + success = 0, + failure, + canceled + }; + seastar::future<> handle_auth_reply(Ref m); + // v2 + seastar::future authenticate_v2(); + auth::AuthClient::auth_request_t + get_auth_request(const EntityName& name, + uint32_t want_keys); + using secret_t = string; + tuple + handle_auth_reply_more(const ceph::buffer::list& bl); + int handle_auth_bad_method(uint32_t old_auth_method, + int result, + const std::vector& allowed_methods, + const std::vector& allowed_modes); + tuple + handle_auth_done(uint64_t new_global_id, + const ceph::buffer::list& bl); + void close(); + bool is_my_peer(const entity_addr_t& addr) const; + AuthAuthorizer* get_authorizer(entity_type_t peer) const; + KeyStore& get_keys(); + seastar::future<> renew_tickets(); + seastar::future<> renew_rotating_keyring(); + + crimson::net::ConnectionRef get_conn(); + +private: + std::unique_ptr create_auth(crimson::auth::method_t, + uint64_t global_id, + const EntityName& name, + uint32_t want_keys); + enum class request_t { + rotating, + general, + }; + seastar::future> do_auth_single(request_t); + seastar::future do_auth(request_t); + +private: + bool closed = false; + seastar::shared_promise> auth_reply; + // v2 + using clock_t = seastar::lowres_system_clock; + clock_t::time_point auth_start; + crimson::auth::method_t auth_method = 0; + std::optional> auth_done; + const AuthRegistry& auth_registry; + crimson::net::ConnectionRef conn; + std::unique_ptr auth; + std::unique_ptr rotating_keyring; + uint64_t global_id = 0; + clock_t::time_point last_rotating_renew_sent; +}; + +Connection::Connection(const AuthRegistry& auth_registry, + crimson::net::ConnectionRef conn, + KeyRing* keyring) + : auth_registry{auth_registry}, + conn{conn}, + rotating_keyring{ + std::make_unique(nullptr, + CEPH_ENTITY_TYPE_OSD, + keyring)} +{} + +seastar::future<> Connection::handle_auth_reply(Ref m) +{ + logger().info("{}", __func__); + ceph_assert(m); + auth_reply.set_value(m); + auth_reply = {}; + return seastar::now(); +} + +seastar::future<> Connection::renew_tickets() +{ + if (auth->need_tickets()) { + logger().info("{}: retrieving new tickets", __func__); + return do_auth(request_t::general).then([](const auth_result_t r) { + if (r == auth_result_t::failure) { + logger().info("renew_tickets: ignoring failed auth reply"); + } + }); + } else { + logger().debug("{}: don't need new tickets", __func__); + return seastar::now(); + } +} + +seastar::future<> Connection::renew_rotating_keyring() +{ + auto now = clock_t::now(); + auto ttl = std::chrono::seconds{ + static_cast(crimson::common::local_conf()->auth_service_ticket_ttl)}; + auto cutoff = utime_t{now - std::min(std::chrono::seconds{30}, ttl / 4)}; + if (!rotating_keyring->need_new_secrets(cutoff)) { + logger().debug("renew_rotating_keyring secrets are up-to-date " + "(they expire after {})", cutoff); + return seastar::now(); + } else { + logger().info("renew_rotating_keyring renewing rotating keys " + " (they expired before {})", cutoff); + } + if ((now > last_rotating_renew_sent) && + (now - last_rotating_renew_sent < std::chrono::seconds{1})) { + logger().info("renew_rotating_keyring called too often (last: {})", + utime_t{last_rotating_renew_sent}); + return seastar::now(); + } + last_rotating_renew_sent = now; + return do_auth(request_t::rotating).then([](const auth_result_t r) { + if (r == auth_result_t::failure) { + logger().info("renew_rotating_keyring: ignoring failed auth reply"); + } + }); +} + +AuthAuthorizer* Connection::get_authorizer(entity_type_t peer) const +{ + if (auth) { + return auth->build_authorizer(peer); + } else { + return nullptr; + } +} + +KeyStore& Connection::get_keys() { + return *rotating_keyring; +} + +std::unique_ptr +Connection::create_auth(crimson::auth::method_t protocol, + uint64_t global_id, + const EntityName& name, + uint32_t want_keys) +{ + static crimson::common::CephContext cct; + std::unique_ptr auth; + auth.reset(AuthClientHandler::create(&cct, + protocol, + rotating_keyring.get())); + if (!auth) { + logger().error("no handler for protocol {}", protocol); + throw std::system_error(make_error_code( + crimson::net::error::negotiation_failure)); + } + auth->init(name); + auth->set_want_keys(want_keys); + auth->set_global_id(global_id); + return auth; +} + +seastar::future> +Connection::do_auth_single(Connection::request_t what) +{ + auto m = crimson::make_message(); + m->protocol = auth->get_protocol(); + auth->prepare_build_request(); + switch (what) { + case request_t::rotating: + auth->build_rotating_request(m->auth_payload); + break; + case request_t::general: + if (int ret = auth->build_request(m->auth_payload); ret) { + logger().error("missing/bad key for '{}'", local_conf()->name); + throw std::system_error(make_error_code( + crimson::net::error::negotiation_failure)); + } + break; + default: + assert(0); + } + logger().info("sending {}", *m); + return conn->send(std::move(m)).then([this] { + logger().info("waiting"); + return auth_reply.get_shared_future(); + }).then([this, life_extender=shared_from_this()] (Ref m) { + if (!m) { + ceph_assert(closed); + logger().info("do_auth_single: connection closed"); + return std::make_optional(auth_result_t::canceled); + } + logger().info("do_auth_single: {} returns {}: {}", + *conn, *m, m->result); + auto p = m->result_bl.cbegin(); + auto ret = auth->handle_response(m->result, p, + nullptr, nullptr); + std::optional auth_result; + switch (ret) { + case -EAGAIN: + auth_result = std::nullopt; + break; + case 0: + auth_result = auth_result_t::success; + break; + default: + auth_result = auth_result_t::failure; + logger().error( + "do_auth_single: got error {} on mon {}", + ret, conn->get_peer_addr()); + break; + } + return auth_result; + }); +} + +seastar::future +Connection::do_auth(Connection::request_t what) { + return seastar::repeat_until_value( + [this, life_extender=shared_from_this(), what]() { + return do_auth_single(what); + }); +} + +seastar::future Connection::authenticate_v2() +{ + auth_start = seastar::lowres_system_clock::now(); + return conn->send(crimson::make_message()).then([this] { + auth_done.emplace(); + return auth_done->get_future(); + }); +} + +auth::AuthClient::auth_request_t +Connection::get_auth_request(const EntityName& entity_name, + uint32_t want_keys) +{ + // choose method + auth_method = [&] { + std::vector methods; + auth_registry.get_supported_methods(conn->get_peer_type(), &methods); + if (methods.empty()) { + logger().info("get_auth_request no methods is supported"); + throw crimson::auth::error("no methods is supported"); + } + return methods.front(); + }(); + + std::vector modes; + auth_registry.get_supported_modes(conn->get_peer_type(), auth_method, + &modes); + logger().info("method {} preferred_modes {}", auth_method, modes); + if (modes.empty()) { + throw crimson::auth::error("no modes is supported"); + } + auth = create_auth(auth_method, global_id, entity_name, want_keys); + + using ceph::encode; + bufferlist bl; + // initial request includes some boilerplate... + encode((char)AUTH_MODE_MON, bl); + encode(entity_name, bl); + encode(global_id, bl); + // and (maybe) some method-specific initial payload + auth->build_initial_request(&bl); + return {auth_method, modes, bl}; +} + +tuple +Connection::handle_auth_reply_more(const ceph::buffer::list& payload) +{ + CryptoKey session_key; + secret_t connection_secret; + bufferlist reply; + auto p = payload.cbegin(); + int r = auth->handle_response(0, p, &session_key, &connection_secret); + if (r == -EAGAIN) { + auth->prepare_build_request(); + auth->build_request(reply); + logger().info(" responding with {} bytes", reply.length()); + return {session_key, connection_secret, reply}; + } else if (r < 0) { + logger().error(" handle_response returned {}", r); + throw crimson::auth::error("unable to build auth"); + } else { + logger().info("authenticated!"); + std::terminate(); + } +} + +tuple +Connection::handle_auth_done(uint64_t new_global_id, + const ceph::buffer::list& payload) +{ + global_id = new_global_id; + auth->set_global_id(global_id); + auto p = payload.begin(); + CryptoKey session_key; + secret_t connection_secret; + int r = auth->handle_response(0, p, &session_key, &connection_secret); + conn->set_last_keepalive_ack(auth_start); + if (auth_done) { + auth_done->set_value(auth_result_t::success); + auth_done.reset(); + } + return {session_key, connection_secret, r}; +} + +int Connection::handle_auth_bad_method(uint32_t old_auth_method, + int result, + const std::vector& allowed_methods, + const std::vector& allowed_modes) +{ + logger().info("old_auth_method {} result {} allowed_methods {}", + old_auth_method, cpp_strerror(result), allowed_methods); + std::vector auth_supported; + auth_registry.get_supported_methods(conn->get_peer_type(), &auth_supported); + auto p = std::find(auth_supported.begin(), auth_supported.end(), + old_auth_method); + assert(p != auth_supported.end()); + p = std::find_first_of(std::next(p), auth_supported.end(), + allowed_methods.begin(), allowed_methods.end()); + if (p == auth_supported.end()) { + logger().error("server allowed_methods {} but i only support {}", + allowed_methods, auth_supported); + assert(auth_done); + auth_done->set_exception(std::system_error(make_error_code( + crimson::net::error::negotiation_failure))); + return -EACCES; + } + auth_method = *p; + logger().info("will try {} next", auth_method); + return 0; +} + +void Connection::close() +{ + logger().info("{}", __func__); + auth_reply.set_value(Ref(nullptr)); + auth_reply = {}; + if (auth_done) { + auth_done->set_value(auth_result_t::canceled); + auth_done.reset(); + } + if (conn && !std::exchange(closed, true)) { + conn->mark_down(); + } +} + +bool Connection::is_my_peer(const entity_addr_t& addr) const +{ + ceph_assert(conn); + return conn->get_peer_addr() == addr; +} + +crimson::net::ConnectionRef Connection::get_conn() { + return conn; +} + +Client::mon_command_t::mon_command_t(MURef req) + : req(std::move(req)) +{} + +Client::Client(crimson::net::Messenger& messenger, + crimson::common::AuthHandler& auth_handler) + // currently, crimson is OSD-only + : want_keys{CEPH_ENTITY_TYPE_MON | + CEPH_ENTITY_TYPE_OSD | + CEPH_ENTITY_TYPE_MGR}, + timer{[this] { tick(); }}, + msgr{messenger}, + log_client{nullptr}, + auth_registry{&cct}, + auth_handler{auth_handler} +{} + +Client::Client(Client&&) = default; +Client::~Client() = default; + +seastar::future<> Client::start() { + entity_name = crimson::common::local_conf()->name; + auth_registry.refresh_config(); + return load_keyring().then([this] { + return monmap.build_initial(crimson::common::local_conf(), false); + }).then([this] { + return authenticate(); + }).then([this] { + auto interval = + std::chrono::duration_cast( + std::chrono::duration( + local_conf().get_val("mon_client_ping_interval"))); + timer.arm_periodic(interval); + }); +} + +seastar::future<> Client::load_keyring() +{ + if (!auth_registry.is_supported_method(msgr.get_mytype(), CEPH_AUTH_CEPHX)) { + return seastar::now(); + } else { + return crimson::auth::load_from_keyring(&keyring).then([](KeyRing* keyring) { + return crimson::auth::load_from_keyfile(keyring); + }).then([](KeyRing* keyring) { + return crimson::auth::load_from_key(keyring); + }).then([](KeyRing*) { + return seastar::now(); + }); + } +} + +void Client::tick() +{ + gate.dispatch_in_background(__func__, *this, [this] { + if (active_con) { + return seastar::when_all_succeed(wait_for_send_log(), + active_con->get_conn()->send_keepalive(), + active_con->renew_tickets(), + active_con->renew_rotating_keyring()).discard_result(); + } else { + assert(is_hunting()); + logger().info("{} continuing the hunt", __func__); + return authenticate(); + } + }); +} + +seastar::future<> Client::wait_for_send_log() { + utime_t now = ceph_clock_now(); + if (now > last_send_log + cct._conf->mon_client_log_interval) { + last_send_log = now; + return send_log(log_flushing_t::NO_FLUSH); + } + return seastar::now(); +} + +seastar::future<> Client::send_log(log_flushing_t flush_flag) { + if (log_client) { + if (auto lm = log_client->get_mon_log_message(flush_flag); lm) { + return send_message(std::move(lm)); + } + more_log_pending = log_client->are_pending(); + } + return seastar::now(); +} + +bool Client::is_hunting() const { + return !active_con; +} + +std::optional> +Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m) +{ + bool dispatched = true; + gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] { + // we only care about these message types + switch (m->get_type()) { + case CEPH_MSG_MON_MAP: + return handle_monmap(*conn, boost::static_pointer_cast(m)); + case CEPH_MSG_AUTH_REPLY: + return handle_auth_reply( + *conn, boost::static_pointer_cast(m)); + case CEPH_MSG_MON_SUBSCRIBE_ACK: + return handle_subscribe_ack( + boost::static_pointer_cast(m)); + case CEPH_MSG_MON_GET_VERSION_REPLY: + return handle_get_version_reply( + boost::static_pointer_cast(m)); + case MSG_MON_COMMAND_ACK: + return handle_mon_command_ack( + boost::static_pointer_cast(m)); + case MSG_LOGACK: + return handle_log_ack( + boost::static_pointer_cast(m)); + case MSG_CONFIG: + return handle_config( + boost::static_pointer_cast(m)); + default: + dispatched = false; + return seastar::now(); + } + }); + return (dispatched ? std::make_optional(seastar::now()) : std::nullopt); +} + +void Client::ms_handle_reset(crimson::net::ConnectionRef conn, bool /* is_replace */) +{ + gate.dispatch_in_background(__func__, *this, [this, conn] { + auto found = std::find_if(pending_conns.begin(), pending_conns.end(), + [peer_addr = conn->get_peer_addr()](auto& mc) { + return mc->is_my_peer(peer_addr); + }); + if (found != pending_conns.end()) { + logger().warn("pending conn reset by {}", conn->get_peer_addr()); + (*found)->close(); + pending_conns.erase(found); + return seastar::now(); + } else if (active_con && active_con->is_my_peer(conn->get_peer_addr())) { + logger().warn("active conn reset {}", conn->get_peer_addr()); + return reopen_session(-1).then([this](bool opened) { + if (opened) { + return on_session_opened(); + } else { + return seastar::now(); + } + }); + } else { + return seastar::now(); + } + }); +} + +std::pair, std::vector> +Client::get_supported_auth_methods(int peer_type) +{ + std::vector methods; + std::vector modes; + auth_registry.get_supported_methods(peer_type, &methods, &modes); + return {methods, modes}; +} + +uint32_t Client::pick_con_mode(int peer_type, + uint32_t auth_method, + const std::vector& preferred_modes) +{ + return auth_registry.pick_mode(peer_type, auth_method, preferred_modes); +} + +AuthAuthorizeHandler* Client::get_auth_authorize_handler(int peer_type, + int auth_method) +{ + return auth_registry.get_handler(peer_type, auth_method); +} + + +int Client::handle_auth_request(crimson::net::Connection &conn, + AuthConnectionMeta &auth_meta, + bool more, + uint32_t auth_method, + const ceph::bufferlist& payload, + uint64_t *p_peer_global_id, + ceph::bufferlist *reply) +{ + if (payload.length() == 0) { + return -EACCES; + } + auth_meta.auth_mode = payload[0]; + if (auth_meta.auth_mode < AUTH_MODE_AUTHORIZER || + auth_meta.auth_mode > AUTH_MODE_AUTHORIZER_MAX) { + return -EACCES; + } + AuthAuthorizeHandler* ah = get_auth_authorize_handler(conn.get_peer_type(), + auth_method); + if (!ah) { + logger().error("no AuthAuthorizeHandler found for auth method: {}", + auth_method); + return -EOPNOTSUPP; + } + auto authorizer_challenge = &auth_meta.authorizer_challenge; + if (auth_meta.skip_authorizer_challenge) { + logger().info("skipping challenge on {}", conn); + authorizer_challenge = nullptr; + } + if (!active_con) { + logger().info("auth request during inactivity period"); + // let's instruct the client to come back later + return -EBUSY; + } + bool was_challenge = (bool)auth_meta.authorizer_challenge; + EntityName name; + AuthCapsInfo caps_info; + bool is_valid = ah->verify_authorizer( + &cct, + active_con->get_keys(), + payload, + auth_meta.get_connection_secret_length(), + reply, + &name, + p_peer_global_id, + &caps_info, + &auth_meta.session_key, + &auth_meta.connection_secret, + authorizer_challenge); + if (is_valid) { + auth_handler.handle_authentication(name, caps_info); + return 1; + } + if (!more && !was_challenge && auth_meta.authorizer_challenge) { + logger().info("added challenge on {}", conn); + return 0; + } else { + logger().info("bad authorizer on {}", conn); + return -EACCES; + } +} + +auth::AuthClient::auth_request_t +Client::get_auth_request(crimson::net::Connection &conn, + AuthConnectionMeta &auth_meta) +{ + logger().info("get_auth_request(conn={}, auth_method={})", + conn, auth_meta.auth_method); + // connection to mon? + if (conn.get_peer_type() == CEPH_ENTITY_TYPE_MON) { + auto found = std::find_if(pending_conns.begin(), pending_conns.end(), + [peer_addr = conn.get_peer_addr()](auto& mc) { + return mc->is_my_peer(peer_addr); + }); + if (found == pending_conns.end()) { + throw crimson::auth::error{"unknown connection"}; + } + return (*found)->get_auth_request(entity_name, want_keys); + } else { + // generate authorizer + if (!active_con) { + logger().error(" but no auth handler is set up"); + throw crimson::auth::error("no auth available"); + } + auto authorizer = active_con->get_authorizer(conn.get_peer_type()); + if (!authorizer) { + logger().error("failed to build_authorizer for type {}", + ceph_entity_type_name(conn.get_peer_type())); + throw crimson::auth::error("unable to build auth"); + } + auth_meta.authorizer.reset(authorizer); + auth_meta.auth_method = authorizer->protocol; + vector modes; + auth_registry.get_supported_modes(conn.get_peer_type(), + auth_meta.auth_method, + &modes); + return {authorizer->protocol, modes, authorizer->bl}; + } +} + +ceph::bufferlist Client::handle_auth_reply_more(crimson::net::Connection &conn, + AuthConnectionMeta &auth_meta, + const bufferlist& bl) +{ + if (conn.get_peer_type() == CEPH_ENTITY_TYPE_MON) { + auto found = std::find_if(pending_conns.begin(), pending_conns.end(), + [peer_addr = conn.get_peer_addr()](auto& mc) { + return mc->is_my_peer(peer_addr); + }); + if (found == pending_conns.end()) { + throw crimson::auth::error{"unknown connection"}; + } + bufferlist reply; + tie(auth_meta.session_key, auth_meta.connection_secret, reply) = + (*found)->handle_auth_reply_more(bl); + return reply; + } else { + // authorizer challenges + if (!active_con || !auth_meta.authorizer) { + logger().error("no authorizer?"); + throw crimson::auth::error("no auth available"); + } + auth_meta.authorizer->add_challenge(&cct, bl); + return auth_meta.authorizer->bl; + } +} + +int Client::handle_auth_done(crimson::net::Connection &conn, + AuthConnectionMeta &auth_meta, + uint64_t global_id, + uint32_t /*con_mode*/, + const bufferlist& bl) +{ + if (conn.get_peer_type() == CEPH_ENTITY_TYPE_MON) { + auto found = std::find_if(pending_conns.begin(), pending_conns.end(), + [peer_addr = conn.get_peer_addr()](auto& mc) { + return mc->is_my_peer(peer_addr); + }); + if (found == pending_conns.end()) { + return -ENOENT; + } + int r = 0; + tie(auth_meta.session_key, auth_meta.connection_secret, r) = + (*found)->handle_auth_done(global_id, bl); + return r; + } else { + // verify authorizer reply + auto p = bl.begin(); + if (!auth_meta.authorizer->verify_reply(p, &auth_meta.connection_secret)) { + logger().error("failed verifying authorizer reply"); + return -EACCES; + } + auth_meta.session_key = auth_meta.authorizer->session_key; + return 0; + } +} + + // Handle server's indication that the previous auth attempt failed +int Client::handle_auth_bad_method(crimson::net::Connection &conn, + AuthConnectionMeta &auth_meta, + uint32_t old_auth_method, + int result, + const std::vector& allowed_methods, + const std::vector& allowed_modes) +{ + if (conn.get_peer_type() == CEPH_ENTITY_TYPE_MON) { + auto found = std::find_if(pending_conns.begin(), pending_conns.end(), + [peer_addr = conn.get_peer_addr()](auto& mc) { + return mc->is_my_peer(peer_addr); + }); + if (found != pending_conns.end()) { + return (*found)->handle_auth_bad_method( + old_auth_method, result, + allowed_methods, allowed_modes); + } else { + return -ENOENT; + } + } else { + // huh... + logger().info("hmm, they didn't like {} result {}", + old_auth_method, cpp_strerror(result)); + return -EACCES; + } +} + +seastar::future<> Client::handle_monmap(crimson::net::Connection &conn, + Ref m) +{ + monmap.decode(m->monmapbl); + const auto peer_addr = conn.get_peer_addr(); + auto cur_mon = monmap.get_name(peer_addr); + logger().info("got monmap {}, mon.{}, is now rank {}", + monmap.epoch, cur_mon, monmap.get_rank(cur_mon)); + sub.got("monmap", monmap.get_epoch()); + + if (monmap.get_addr_name(peer_addr, cur_mon)) { + if (active_con) { + logger().info("handle_monmap: renewing tickets"); + return seastar::when_all_succeed( + active_con->renew_tickets(), + active_con->renew_rotating_keyring()).then_unpack([] { + logger().info("handle_mon_map: renewed tickets"); + }); + } else { + return seastar::now(); + } + } else { + logger().warn("mon.{} went away", cur_mon); + return reopen_session(-1).then([this](bool opened) { + if (opened) { + return on_session_opened(); + } else { + return seastar::now(); + } + }); + } +} + +seastar::future<> Client::handle_auth_reply(crimson::net::Connection &conn, + Ref m) +{ + logger().info("handle_auth_reply {} returns {}: {}", + conn, *m, m->result); + auto found = std::find_if(pending_conns.begin(), pending_conns.end(), + [peer_addr = conn.get_peer_addr()](auto& mc) { + return mc->is_my_peer(peer_addr); + }); + if (found != pending_conns.end()) { + return (*found)->handle_auth_reply(m); + } else if (active_con) { + return active_con->handle_auth_reply(m).then([this] { + return seastar::when_all_succeed( + active_con->renew_rotating_keyring(), + active_con->renew_tickets()).discard_result(); + }); + } else { + logger().error("unknown auth reply from {}", conn.get_peer_addr()); + return seastar::now(); + } +} + +seastar::future<> Client::handle_subscribe_ack(Ref m) +{ + sub.acked(m->interval); + return seastar::now(); +} + +Client::get_version_t Client::get_version(const std::string& map) +{ + auto m = crimson::make_message(); + auto tid = ++last_version_req_id; + m->handle = tid; + m->what = map; + auto& req = version_reqs[tid]; + return send_message(std::move(m)).then([&req] { + return req.get_future(); + }); +} + +seastar::future<> +Client::handle_get_version_reply(Ref m) +{ + if (auto found = version_reqs.find(m->handle); + found != version_reqs.end()) { + auto& result = found->second; + logger().trace("{}: {} returns {}", + __func__, m->handle, m->version); + result.set_value(std::make_tuple(m->version, m->oldest_version)); + version_reqs.erase(found); + } else { + logger().warn("{}: version request with handle {} not found", + __func__, m->handle); + } + return seastar::now(); +} + +seastar::future<> Client::handle_mon_command_ack(Ref m) +{ + const auto tid = m->get_tid(); + if (auto found = std::find_if(mon_commands.begin(), + mon_commands.end(), + [tid](auto& cmd) { + return cmd.req->get_tid() == tid; + }); + found != mon_commands.end()) { + auto& command = *found; + logger().trace("{} {}", __func__, tid); + command.result.set_value(std::make_tuple(m->r, m->rs, std::move(m->get_data()))); + mon_commands.erase(found); + } else { + logger().warn("{} {} not found", __func__, tid); + } + return seastar::now(); +} + +seastar::future<> Client::handle_log_ack(Ref m) +{ + if (log_client) { + return log_client->handle_log_ack(m).then([this] { + if (more_log_pending) { + return send_log(log_flushing_t::NO_FLUSH); + } else { + return seastar::now(); + } + }); + } + return seastar::now(); +} + +seastar::future<> Client::handle_config(Ref m) +{ + return crimson::common::local_conf().set_mon_vals(m->config).then([this] { + if (config_updated) { + config_updated->set_value(); + } + }); +} + +std::vector Client::get_random_mons(unsigned n) const +{ + uint16_t min_priority = std::numeric_limits::max(); + for (const auto& m : monmap.mon_info) { + if (m.second.priority < min_priority) { + min_priority = m.second.priority; + } + } + vector ranks; + for (auto [name, info] : monmap.mon_info) { + if (info.priority == min_priority) { + ranks.push_back(monmap.get_rank(name)); + } + } + std::random_device rd; + std::default_random_engine rng{rd()}; + std::shuffle(ranks.begin(), ranks.end(), rng); + if (n == 0 || n > ranks.size()) { + return ranks; + } else { + return {ranks.begin(), ranks.begin() + n}; + } +} + +seastar::future<> Client::authenticate() +{ + return reopen_session(-1).then([this](bool opened) { + if (opened) { + return on_session_opened(); + } else { + return seastar::now(); + } + }); +} + +seastar::future<> Client::stop() +{ + logger().info("{}", __func__); + auto fut = gate.close(); + timer.cancel(); + ready_to_send = false; + for (auto& pending_con : pending_conns) { + pending_con->close(); + } + if (active_con) { + active_con->close(); + } + return fut; +} + +static entity_addr_t choose_client_addr( + const entity_addrvec_t& my_addrs, + const entity_addrvec_t& client_addrs) +{ + // here is where we decide which of the addrs to connect to. always prefer + // the first one, if we support it. + for (const auto& a : client_addrs.v) { + if (a.is_msgr2()) { + // FIXME: for ipv4 vs ipv6, check whether local host can handle ipv6 before + // trying it? for now, just pick whichever is listed first. + return a; + } + } + return entity_addr_t{}; +} + +seastar::future Client::reopen_session(int rank) +{ + logger().info("{} to mon.{}", __func__, rank); + ready_to_send = false; + if (active_con) { + active_con->close(); + active_con = nullptr; + ceph_assert(pending_conns.empty()); + } else { + for (auto& pending_con : pending_conns) { + pending_con->close(); + } + pending_conns.clear(); + } + vector mons; + if (rank >= 0) { + mons.push_back(rank); + } else { + const auto parallel = + crimson::common::local_conf().get_val("mon_client_hunt_parallel"); + mons = get_random_mons(parallel); + } + pending_conns.reserve(mons.size()); + return seastar::parallel_for_each(mons, [this](auto rank) { + auto peer = choose_client_addr(msgr.get_myaddrs(), + monmap.get_addrs(rank)); + if (peer == entity_addr_t{}) { + // crimson msgr only uses the first bound addr + logger().warn("mon.{} does not have an addr compatible with me", rank); + return seastar::now(); + } + logger().info("connecting to mon.{}", rank); + auto conn = msgr.connect(peer, CEPH_ENTITY_TYPE_MON); + auto& mc = pending_conns.emplace_back( + seastar::make_shared(auth_registry, conn, &keyring)); + assert(conn->get_peer_addr().is_msgr2()); + return mc->authenticate_v2().then([peer, this](auto result) { + if (result == Connection::auth_result_t::success) { + _finish_auth(peer); + } + logger().debug("reopen_session mon connection attempts complete"); + }).handle_exception([](auto ep) { + logger().error("mon connections failed with ep {}", ep); + return seastar::make_exception_future(ep); + }); + }).then([this] { + if (active_con) { + return true; + } else { + logger().warn("cannot establish the active_con with any mon"); + return false; + } + }); +} + +void Client::_finish_auth(const entity_addr_t& peer) +{ + if (!is_hunting()) { + return; + } + logger().info("found mon.{}", monmap.get_name(peer)); + + auto found = std::find_if( + pending_conns.begin(), pending_conns.end(), + [peer](auto& conn) { + return conn->is_my_peer(peer); + }); + if (found == pending_conns.end()) { + // Happens if another connection has won the race + ceph_assert(active_con && pending_conns.empty()); + logger().info("no pending connection for mon.{}, peer {}", + monmap.get_name(peer), peer); + return; + } + + ceph_assert(!active_con && !pending_conns.empty()); + // It's too early to toggle the `ready_to_send` flag. It will + // be set atfer finishing the MAuth exchange and draining out + // the `pending_messages` queue. + active_con = std::move(*found); + *found = nullptr; + for (auto& conn : pending_conns) { + if (conn) { + conn->close(); + } + } + pending_conns.clear(); +} + +Client::command_result_t +Client::run_command(std::string&& cmd, + bufferlist&& bl) +{ + auto m = crimson::make_message(monmap.fsid); + auto tid = ++last_mon_command_id; + m->set_tid(tid); + m->cmd = {std::move(cmd)}; + m->set_data(std::move(bl)); + auto& command = mon_commands.emplace_back(crimson::make_message(*m)); + return send_message(std::move(m)).then([&result=command.result] { + return result.get_future(); + }); +} + +seastar::future<> Client::send_message(MessageURef m) +{ + if (active_con && ready_to_send) { + assert(pending_messages.empty()); + return active_con->get_conn()->send(std::move(m)); + } else { + auto& delayed = pending_messages.emplace_back(std::move(m)); + return delayed.pr.get_future(); + } +} + +seastar::future<> Client::on_session_opened() +{ + return active_con->renew_rotating_keyring().then([this] { + if (!active_con) { + // the connection can be closed even in the middle of the opening sequence + logger().info("on_session_opened {}: connection closed", __LINE__); + return seastar::now(); + } + for (auto& m : pending_messages) { + (void) active_con->get_conn()->send(std::move(m.msg)); + m.pr.set_value(); + } + pending_messages.clear(); + ready_to_send = true; + return sub.reload() ? renew_subs() : seastar::now(); + }).then([this] { + if (!active_con) { + logger().info("on_session_opened {}: connection closed", __LINE__); + return seastar::now(); + } + return seastar::parallel_for_each(mon_commands, + [this](auto &command) { + return send_message(crimson::make_message(*command.req)); + }); + }); +} + +bool Client::sub_want(const std::string& what, version_t start, unsigned flags) +{ + return sub.want(what, start, flags); +} + +void Client::sub_got(const std::string& what, version_t have) +{ + sub.got(what, have); +} + +void Client::sub_unwant(const std::string& what) +{ + sub.unwant(what); +} + +bool Client::sub_want_increment(const std::string& what, + version_t start, + unsigned flags) +{ + return sub.inc_want(what, start, flags); +} + +seastar::future<> Client::renew_subs() +{ + if (!sub.have_new()) { + logger().warn("{} - empty", __func__); + return seastar::now(); + } + logger().trace("{}", __func__); + + auto m = crimson::make_message(); + m->what = sub.get_subs(); + m->hostname = ceph_get_short_hostname(); + return send_message(std::move(m)).then([this] { + sub.renewed(); + }); +} + +seastar::future<> Client::wait_for_config() +{ + assert(!config_updated); + config_updated = seastar::promise<>(); + return config_updated->get_future(); +} + +void Client::print(std::ostream& out) const +{ + out << "mon." << entity_name; +} + +} // namespace crimson::mon diff --git a/src/crimson/mon/MonClient.h b/src/crimson/mon/MonClient.h new file mode 100644 index 000000000..1228ecd0b --- /dev/null +++ b/src/crimson/mon/MonClient.h @@ -0,0 +1,218 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#include + +#include +#include +#include +#include +#include + +#include "auth/AuthRegistry.h" +#include "auth/KeyRing.h" +#include "common/ceph_context.h" + +#include "crimson/auth/AuthClient.h" +#include "crimson/auth/AuthServer.h" +#include "crimson/common/auth_handler.h" +#include "crimson/common/gated.h" +#include "crimson/net/Dispatcher.h" +#include "crimson/net/Fwd.h" + +#include "mon/MonMap.h" + +#include "mon/MonSub.h" + +template using Ref = boost::intrusive_ptr; +namespace crimson::net { + class Messenger; +} + +class LogClient; + +struct AuthAuthorizeHandler; +class MAuthReply; +struct MMonMap; +struct MMonSubscribeAck; +struct MMonGetVersionReply; +struct MMonCommand; +struct MMonCommandAck; +struct MLogAck; +struct MConfig; + +enum class log_flushing_t; + +namespace crimson::mon { + +class Connection; + +class Client : public crimson::net::Dispatcher, + public crimson::auth::AuthClient, + public crimson::auth::AuthServer +{ + EntityName entity_name; + KeyRing keyring; + const uint32_t want_keys; + + MonMap monmap; + bool ready_to_send = false; + seastar::shared_ptr active_con; + std::vector> pending_conns; + seastar::timer timer; + + crimson::net::Messenger& msgr; + + LogClient *log_client; + bool more_log_pending = false; + utime_t last_send_log; + + seastar::future<> send_log(log_flushing_t flush_flag); + seastar::future<> wait_for_send_log(); + + // commands + using get_version_t = seastar::future>; + + ceph_tid_t last_version_req_id = 0; + std::map version_reqs; + + ceph_tid_t last_mon_command_id = 0; + using command_result_t = + seastar::future>; + struct mon_command_t { + MURef req; + typename command_result_t::promise_type result; + mon_command_t(MURef req); + }; + std::vector mon_commands; + + MonSub sub; + +public: + Client(crimson::net::Messenger&, crimson::common::AuthHandler&); + Client(Client&&); + ~Client(); + seastar::future<> start(); + seastar::future<> stop(); + + void set_log_client(LogClient *clog) { + log_client = clog; + } + + const uuid_d& get_fsid() const { + return monmap.fsid; + } + get_version_t get_version(const std::string& map); + command_result_t run_command(std::string&& cmd, + bufferlist&& bl); + seastar::future<> send_message(MessageURef); + bool sub_want(const std::string& what, version_t start, unsigned flags); + void sub_got(const std::string& what, version_t have); + void sub_unwant(const std::string& what); + bool sub_want_increment(const std::string& what, version_t start, unsigned flags); + seastar::future<> renew_subs(); + seastar::future<> wait_for_config(); + + void print(std::ostream&) const; +private: + // AuthServer methods + std::pair, std::vector> + get_supported_auth_methods(int peer_type) final; + uint32_t pick_con_mode(int peer_type, + uint32_t auth_method, + const std::vector& preferred_modes) final; + AuthAuthorizeHandler* get_auth_authorize_handler(int peer_type, + int auth_method) final; + int handle_auth_request(crimson::net::Connection &conn, + AuthConnectionMeta &auth_meta, + bool more, + uint32_t auth_method, + const ceph::bufferlist& payload, + uint64_t *p_peer_global_id, + ceph::bufferlist *reply) final; + + crimson::common::CephContext cct; // for auth_registry + AuthRegistry auth_registry; + crimson::common::AuthHandler& auth_handler; + + // AuthClient methods + crimson::auth::AuthClient::auth_request_t + get_auth_request(crimson::net::Connection &conn, + AuthConnectionMeta &auth_meta) final; + + // Handle server's request to continue the handshake + ceph::bufferlist handle_auth_reply_more(crimson::net::Connection &conn, + AuthConnectionMeta &auth_meta, + const bufferlist& bl) final; + + // Handle server's indication that authentication succeeded + int handle_auth_done(crimson::net::Connection &conn, + AuthConnectionMeta &auth_meta, + uint64_t global_id, + uint32_t con_mode, + const bufferlist& bl) final; + + // Handle server's indication that the previous auth attempt failed + int handle_auth_bad_method(crimson::net::Connection &conn, + AuthConnectionMeta &auth_meta, + uint32_t old_auth_method, + int result, + const std::vector& allowed_methods, + const std::vector& allowed_modes) final; + +private: + void tick(); + + std::optional> ms_dispatch(crimson::net::ConnectionRef conn, + MessageRef m) override; + void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override; + + seastar::future<> handle_monmap(crimson::net::Connection &conn, + Ref m); + seastar::future<> handle_auth_reply(crimson::net::Connection &conn, + Ref m); + seastar::future<> handle_subscribe_ack(Ref m); + seastar::future<> handle_get_version_reply(Ref m); + seastar::future<> handle_mon_command_ack(Ref m); + seastar::future<> handle_log_ack(Ref m); + seastar::future<> handle_config(Ref m); + + seastar::future<> on_session_opened(); +private: + seastar::future<> load_keyring(); + seastar::future<> authenticate(); + + bool is_hunting() const; + // @param rank, rank of the monitor to be connected, if it is less than 0, + // try to connect to all monitors in monmap, until one of them + // is connected. + // @return true if a connection to monitor is established + seastar::future reopen_session(int rank); + std::vector get_random_mons(unsigned n) const; + seastar::future<> _add_conn(unsigned rank, uint64_t global_id); + void _finish_auth(const entity_addr_t& peer); + crimson::common::Gated gate; + + // messages that are waiting for the active_con to be available + struct pending_msg_t { + pending_msg_t(MessageURef m) : msg(std::move(m)) {} + MessageURef msg; + seastar::promise<> pr; + }; + std::deque pending_messages; + std::optional> config_updated; +}; + +inline std::ostream& operator<<(std::ostream& out, const Client& client) { + client.print(out); + return out; +} + +} // namespace crimson::mon + +#if FMT_VERSION >= 90000 +template <> struct fmt::formatter : fmt::ostream_formatter {}; +#endif -- cgit v1.2.3