summaryrefslogtreecommitdiffstats
path: root/src/crimson/mon
diff options
context:
space:
mode:
Diffstat (limited to 'src/crimson/mon')
-rw-r--r--src/crimson/mon/MonClient.cc606
-rw-r--r--src/crimson/mon/MonClient.h114
2 files changed, 720 insertions, 0 deletions
diff --git a/src/crimson/mon/MonClient.cc b/src/crimson/mon/MonClient.cc
new file mode 100644
index 00000000..4b61270c
--- /dev/null
+++ b/src/crimson/mon/MonClient.cc
@@ -0,0 +1,606 @@
+#include "MonClient.h"
+
+#include <random>
+
+#include <seastar/core/future-util.hh>
+#include <seastar/core/lowres_clock.hh>
+#include <seastar/util/log.hh>
+
+#include "auth/AuthClientHandler.h"
+#include "auth/AuthMethodList.h"
+#include "auth/RotatingKeyRing.h"
+
+#include "common/hostname.h"
+
+#include "crimson/auth/KeyRing.h"
+#include "crimson/common/config_proxy.h"
+#include "crimson/common/log.h"
+#include "crimson/net/Connection.h"
+#include "crimson/net/Errors.h"
+#include "crimson/net/Messenger.h"
+
+#include "messages/MAuth.h"
+#include "messages/MAuthReply.h"
+#include "messages/MConfig.h"
+#include "messages/MLogAck.h"
+#include "messages/MMonCommand.h"
+#include "messages/MMonCommandAck.h"
+#include "messages/MMonGetVersion.h"
+#include "messages/MMonGetVersionReply.h"
+#include "messages/MMonMap.h"
+#include "messages/MMonSubscribe.h"
+#include "messages/MMonSubscribeAck.h"
+
+namespace {
+ seastar::logger& logger()
+ {
+ return ceph::get_logger(ceph_subsys_monc);
+ }
+
+ template<typename Message, typename... Args>
+ Ref<Message> make_message(Args&&... args)
+ {
+ return {new Message{std::forward<Args>(args)...}, false};
+ }
+}
+
+namespace ceph::mon {
+
+
+class Connection {
+public:
+ Connection(ceph::net::ConnectionRef conn,
+ KeyRing* keyring);
+ seastar::future<> handle_auth_reply(Ref<MAuthReply> m);
+ seastar::future<> authenticate(epoch_t epoch,
+ const EntityName& name,
+ const AuthMethodList& auth_methods,
+ uint32_t want_keys);
+ seastar::future<> close();
+ bool is_my_peer(const entity_addr_t& addr) const;
+
+ seastar::future<> renew_tickets();
+ ceph::net::ConnectionRef get_conn();
+
+private:
+ seastar::future<> setup_session(epoch_t epoch,
+ const AuthMethodList& auth_methods,
+ const EntityName& name);
+ std::unique_ptr<AuthClientHandler> create_auth(Ref<MAuthReply> m,
+ const EntityName& name,
+ uint32_t want_keys);
+ seastar::future<bool> do_auth();
+
+private:
+ bool closed = false;
+ seastar::promise<Ref<MAuthReply>> reply;
+ ceph::net::ConnectionRef conn;
+ std::unique_ptr<AuthClientHandler> auth;
+ RotatingKeyRing rotating_keyring;
+ uint64_t global_id;
+};
+
+Connection::Connection(ceph::net::ConnectionRef conn,
+ KeyRing* keyring)
+ : conn{conn},
+ rotating_keyring{nullptr, CEPH_ENTITY_TYPE_OSD, keyring}
+{}
+
+seastar::future<> Connection::handle_auth_reply(Ref<MAuthReply> m)
+{
+ reply.set_value(m);
+ return seastar::now();
+}
+
+seastar::future<> Connection::renew_tickets()
+{
+ if (auth->need_tickets()) {
+ return do_auth().then([](bool success) {
+ if (!success) {
+ throw std::system_error(make_error_code(
+ ceph::net::error::negotiation_failure));
+ }
+ });
+ }
+ return seastar::now();
+}
+
+std::unique_ptr<AuthClientHandler>
+Connection::create_auth(Ref<MAuthReply> m,
+ const EntityName& name,
+ uint32_t want_keys)
+{
+ static CephContext cct;
+ std::unique_ptr<AuthClientHandler> auth;
+ auth.reset(AuthClientHandler::create(&cct,
+ m->protocol,
+ &rotating_keyring));
+ if (!auth) {
+ logger().error("no handler for protocol {}", m->protocol);
+ throw std::system_error(make_error_code(
+ ceph::net::error::negotiation_failure));
+ }
+ auth->init(name);
+ auth->set_want_keys(want_keys);
+ auth->set_global_id(global_id);
+
+ if (m->global_id != global_id) {
+ // it's a new session
+ auth->set_global_id(global_id);
+ auth->reset();
+ }
+ return auth;
+}
+
+seastar::future<>
+Connection::setup_session(epoch_t epoch,
+ const AuthMethodList& auth_methods,
+ const EntityName& name)
+{
+ auto m = make_message<MAuth>();
+ m->protocol = 0;
+ m->monmap_epoch = epoch;
+ __u8 struct_v = 1;
+ encode(struct_v, m->auth_payload);
+ encode(auth_methods.get_supported_set(), m->auth_payload);
+ encode(name, m->auth_payload);
+ encode(global_id, m->auth_payload);
+ return conn->send(m);
+}
+
+seastar::future<bool> Connection::do_auth()
+{
+ auto m = make_message<MAuth>();
+ m->protocol = auth->get_protocol();
+ auth->prepare_build_request();
+ if (int ret = auth->build_request(m->auth_payload); ret) {
+ logger().error("missing/bad key for '{}'",
+ ceph::common::local_conf()->name);
+ throw std::system_error(make_error_code(
+ ceph::net::error::negotiation_failure));
+ }
+ logger().info("sending {}", *m);
+ return conn->send(m).then([this] {
+ logger().info("waiting");
+ return reply.get_future();
+ }).then([this] (Ref<MAuthReply> m) {
+ logger().info("mon {} => {} returns {}: {}",
+ conn->get_messenger()->get_myaddr(),
+ conn->get_peer_addr(), *m, m->result);
+ reply = decltype(reply){};
+ auto p = m->result_bl.cbegin();
+ auto ret = auth->handle_response(m->result, p,
+#warning fix crimson: session_key, connection_secret
+ nullptr, nullptr);
+ if (ret != 0 && ret != -EAGAIN) {
+ throw std::system_error(make_error_code(
+ ceph::net::error::negotiation_failure));
+ }
+ return seastar::make_ready_future<bool>(ret == 0);
+ });
+}
+
+seastar::future<>
+Connection::authenticate(epoch_t epoch,
+ const EntityName& name,
+ const AuthMethodList& auth_methods,
+ uint32_t want_keys)
+{
+ return conn->keepalive().then([epoch, auth_methods, name, this] {
+ return setup_session(epoch, auth_methods, name);
+ }).then([this] {
+ return reply.get_future();
+ }).then([name, want_keys, this](Ref<MAuthReply> m) {
+ reply = decltype(reply){};
+ auth = create_auth(m, name, want_keys);
+ global_id = m->global_id;
+ switch (auto p = m->result_bl.cbegin();
+ auth->handle_response(m->result, p,
+#warning fix crimson: session_key, connection_secret
+ nullptr, nullptr)) {
+ case 0:
+ // none
+ return seastar::now();
+ case -EAGAIN:
+ // cephx
+ return seastar::repeat([this] {
+ return do_auth().then([](bool success) {
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ success ?
+ seastar::stop_iteration::yes:
+ seastar::stop_iteration::no);
+ });
+ });
+ default:
+ ceph_assert_always(0);
+ }
+ });
+}
+
+seastar::future<> Connection::close()
+{
+ if (conn && !std::exchange(closed, true)) {
+ return conn->close();
+ } else {
+ return seastar::now();
+ }
+}
+
+bool Connection::is_my_peer(const entity_addr_t& addr) const
+{
+ return conn->get_peer_addr() == addr;
+}
+
+ceph::net::ConnectionRef Connection::get_conn() {
+ return conn;
+}
+
+namespace {
+auto create_auth_methods(uint32_t entity_type)
+{
+ auto& conf = ceph::common::local_conf();
+ std::string method;
+ const auto auth_supported = conf.get_val<std::string>("auth_supported");
+ if (!auth_supported.empty()) {
+ method = auth_supported;
+ } else if (entity_type & (CEPH_ENTITY_TYPE_OSD |
+ CEPH_ENTITY_TYPE_MDS |
+ CEPH_ENTITY_TYPE_MON |
+ CEPH_ENTITY_TYPE_MGR)) {
+ method = conf.get_val<std::string>("auth_cluster_required");
+ } else {
+ method = conf.get_val<std::string>("auth_client_required");
+ }
+ return std::make_unique<AuthMethodList>(nullptr, method);
+}
+}
+
+Client::Client(ceph::net::Messenger& messenger)
+ // currently, crimson is OSD-only
+ : want_keys{CEPH_ENTITY_TYPE_MON |
+ CEPH_ENTITY_TYPE_OSD |
+ CEPH_ENTITY_TYPE_MGR},
+ timer{[this] { tick(); }},
+ msgr{messenger}
+{}
+
+Client::Client(Client&&) = default;
+Client::~Client() = default;
+
+seastar::future<> Client::start() {
+ entity_name = ceph::common::local_conf()->name;
+ // should always be OSD, though
+ auth_methods = create_auth_methods(entity_name.get_type());
+ return load_keyring().then([this] {
+ return monmap.build_initial(ceph::common::local_conf(), false);
+ }).then([this] {
+ return authenticate();
+ });
+}
+
+seastar::future<> Client::load_keyring()
+{
+ if (!auth_methods->is_supported_auth(CEPH_AUTH_CEPHX)) {
+ return seastar::now();
+ } else {
+ return ceph::auth::load_from_keyring(&keyring).then([](KeyRing* keyring) {
+ return ceph::auth::load_from_keyfile(keyring);
+ }).then([](KeyRing* keyring) {
+ return ceph::auth::load_from_key(keyring);
+ }).then([](KeyRing*) {
+ return seastar::now();
+ });
+ }
+}
+
+void Client::tick()
+{
+ seastar::with_gate(tick_gate, [this] {
+ return active_con->renew_tickets();
+ });
+}
+
+bool Client::is_hunting() const {
+ return !active_con;
+}
+
+seastar::future<>
+Client::ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m)
+{
+ logger().info("ms_dispatch {}", *m);
+ // we only care about these message types
+ switch (m->get_type()) {
+ case CEPH_MSG_MON_MAP:
+ return handle_monmap(conn, boost::static_pointer_cast<MMonMap>(m));
+ case CEPH_MSG_AUTH_REPLY:
+ return handle_auth_reply(
+ conn, boost::static_pointer_cast<MAuthReply>(m));
+ case CEPH_MSG_MON_SUBSCRIBE_ACK:
+ return handle_subscribe_ack(
+ boost::static_pointer_cast<MMonSubscribeAck>(m));
+ case CEPH_MSG_MON_GET_VERSION_REPLY:
+ return handle_get_version_reply(
+ boost::static_pointer_cast<MMonGetVersionReply>(m));
+ case MSG_MON_COMMAND_ACK:
+ return handle_mon_command_ack(
+ boost::static_pointer_cast<MMonCommandAck>(m));
+ case MSG_LOGACK:
+ return handle_log_ack(
+ boost::static_pointer_cast<MLogAck>(m));
+ case MSG_CONFIG:
+ return handle_config(
+ boost::static_pointer_cast<MConfig>(m));
+ default:
+ return seastar::now();
+ }
+}
+
+seastar::future<> Client::ms_handle_reset(ceph::net::ConnectionRef conn)
+{
+ auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
+ [peer_addr = conn->get_peer_addr()](auto& mc) {
+ return mc.is_my_peer(peer_addr);
+ });
+ if (found != pending_conns.end()) {
+ logger().warn("pending conn reset by {}", conn->get_peer_addr());
+ return found->close();
+ } else if (active_con && active_con->is_my_peer(conn->get_peer_addr())) {
+ logger().warn("active conn reset {}", conn->get_peer_addr());
+ active_con.reset();
+ return reopen_session(-1);
+ } else {
+ logger().error("unknown reset from {}", conn->get_peer_addr());
+ return seastar::now();
+ }
+}
+
+seastar::future<> Client::handle_monmap(ceph::net::ConnectionRef conn,
+ Ref<MMonMap> m)
+{
+ monmap.decode(m->monmapbl);
+ const auto peer_addr = conn->get_peer_addr();
+ auto cur_mon = monmap.get_name(peer_addr);
+ logger().info("got monmap {}, mon.{}, is now rank {}",
+ monmap.epoch, cur_mon, monmap.get_rank(cur_mon));
+ sub.got("monmap", monmap.get_epoch());
+
+ if (monmap.get_addr_name(peer_addr, cur_mon)) {
+ return seastar::now();
+ } else {
+ logger().warn("mon.{} went away", cur_mon);
+ return reopen_session(-1);
+ }
+}
+
+seastar::future<> Client::handle_auth_reply(ceph::net::ConnectionRef conn,
+ Ref<MAuthReply> m)
+{
+ logger().info("mon {} => {} returns {}: {}",
+ conn->get_messenger()->get_myaddr(),
+ conn->get_peer_addr(), *m, m->result);
+ auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
+ [peer_addr = conn->get_peer_addr()](auto& mc) {
+ return mc.is_my_peer(peer_addr);
+ });
+ if (found != pending_conns.end()) {
+ return found->handle_auth_reply(m);
+ } else if (active_con) {
+ return active_con->handle_auth_reply(m);
+ } else {
+ logger().error("unknown auth reply from {}", conn->get_peer_addr());
+ return seastar::now();
+ }
+}
+
+seastar::future<> Client::handle_subscribe_ack(Ref<MMonSubscribeAck> m)
+{
+ sub.acked(m->interval);
+ return seastar::now();
+}
+
+Client::get_version_t Client::get_version(const std::string& map)
+{
+ auto m = make_message<MMonGetVersion>();
+ auto tid = ++last_version_req_id;
+ m->handle = tid;
+ m->what = map;
+ auto& req = version_reqs[tid];
+ return active_con->get_conn()->send(m).then([&req] {
+ return req.get_future();
+ });
+}
+
+seastar::future<>
+Client::handle_get_version_reply(Ref<MMonGetVersionReply> m)
+{
+ if (auto found = version_reqs.find(m->handle);
+ found != version_reqs.end()) {
+ auto& result = found->second;
+ logger().trace("{}: {} returns {}",
+ __func__, m->handle, m->version);
+ result.set_value(m->version, m->oldest_version);
+ version_reqs.erase(found);
+ } else {
+ logger().warn("{}: version request with handle {} not found",
+ __func__, m->handle);
+ }
+ return seastar::now();
+}
+
+seastar::future<> Client::handle_mon_command_ack(Ref<MMonCommandAck> m)
+{
+ const auto tid = m->get_tid();
+ if (auto found = mon_commands.find(tid);
+ found != mon_commands.end()) {
+ auto& result = found->second;
+ logger().trace("{} {}", __func__, tid);
+ result.set_value(m->r, m->rs, std::move(m->get_data()));
+ mon_commands.erase(found);
+ } else {
+ logger().warn("{} {} not found", __func__, tid);
+ }
+ return seastar::now();
+}
+
+seastar::future<> Client::handle_log_ack(Ref<MLogAck> m)
+{
+ // XXX
+ return seastar::now();
+}
+
+seastar::future<> Client::handle_config(Ref<MConfig> m)
+{
+ return ceph::common::local_conf().set_mon_vals(m->config);
+}
+
+std::vector<unsigned> Client::get_random_mons(unsigned n) const
+{
+ uint16_t min_priority = std::numeric_limits<uint16_t>::max();
+ for (const auto& m : monmap.mon_info) {
+ if (m.second.priority < min_priority) {
+ min_priority = m.second.priority;
+ }
+ }
+ vector<unsigned> ranks;
+ for (auto [name, info] : monmap.mon_info) {
+ // TODO: #msgr-v2
+ if (info.public_addrs.legacy_addr().is_blank_ip()) {
+ continue;
+ }
+ if (info.priority == min_priority) {
+ ranks.push_back(monmap.get_rank(name));
+ }
+ }
+ std::random_device rd;
+ std::default_random_engine rng{rd()};
+ std::shuffle(ranks.begin(), ranks.end(), rng);
+ if (n == 0 || n > ranks.size()) {
+ return ranks;
+ } else {
+ return {ranks.begin(), ranks.begin() + n};
+ }
+}
+
+seastar::future<> Client::authenticate()
+{
+ return reopen_session(-1);
+}
+
+seastar::future<> Client::stop()
+{
+ return tick_gate.close().then([this] {
+ if (active_con) {
+ return active_con->close();
+ } else {
+ return seastar::now();
+ }
+ });
+}
+
+seastar::future<> Client::reopen_session(int rank)
+{
+ vector<unsigned> mons;
+ if (rank >= 0) {
+ mons.push_back(rank);
+ } else {
+ const auto parallel =
+ ceph::common::local_conf().get_val<uint64_t>("mon_client_hunt_parallel");
+ mons = get_random_mons(parallel);
+ }
+ pending_conns.reserve(mons.size());
+ return seastar::parallel_for_each(mons, [this](auto rank) {
+#warning fixme
+ auto peer = monmap.get_addrs(rank).legacy_addr();
+ logger().info("connecting to mon.{}", rank);
+ return msgr.connect(peer, CEPH_ENTITY_TYPE_MON)
+ .then([this] (auto xconn) {
+ // sharded-messenger compatible mode assumes all connections running
+ // in one shard.
+ ceph_assert((*xconn)->shard_id() == seastar::engine().cpu_id());
+ ceph::net::ConnectionRef conn = xconn->release();
+ auto& mc = pending_conns.emplace_back(conn, &keyring);
+ return mc.authenticate(
+ monmap.get_epoch(), entity_name,
+ *auth_methods, want_keys).handle_exception([conn](auto ep) {
+ return conn->close().then([ep = std::move(ep)] {
+ std::rethrow_exception(ep);
+ });
+ });
+ }).then([peer, this] {
+ if (!is_hunting()) {
+ return seastar::now();
+ }
+ auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
+ [peer](auto& mc) {
+ return mc.is_my_peer(peer);
+ });
+ ceph_assert_always(found != pending_conns.end());
+ active_con.reset(new Connection{std::move(*found)});
+ return seastar::parallel_for_each(pending_conns, [] (auto& conn) {
+ return conn.close();
+ });
+ });
+ }).then([this] {
+ pending_conns.clear();
+ });
+}
+
+Client::command_result_t
+Client::run_command(const std::vector<std::string>& cmd,
+ const bufferlist& bl)
+{
+ auto m = make_message<MMonCommand>(monmap.fsid);
+ auto tid = ++last_mon_command_id;
+ m->set_tid(tid);
+ m->cmd = cmd;
+ m->set_data(bl);
+ auto& req = mon_commands[tid];
+ return active_con->get_conn()->send(m).then([&req] {
+ return req.get_future();
+ });
+}
+
+seastar::future<> Client::send_message(MessageRef m)
+{
+ return active_con->get_conn()->send(m);
+}
+
+bool Client::sub_want(const std::string& what, version_t start, unsigned flags)
+{
+ return sub.want(what, start, flags);
+}
+
+void Client::sub_got(const std::string& what, version_t have)
+{
+ sub.got(what, have);
+}
+
+void Client::sub_unwant(const std::string& what)
+{
+ sub.unwant(what);
+}
+
+bool Client::sub_want_increment(const std::string& what,
+ version_t start,
+ unsigned flags)
+{
+ return sub.inc_want(what, start, flags);
+}
+
+seastar::future<> Client::renew_subs()
+{
+ if (!sub.have_new()) {
+ logger().warn("{} - empty", __func__);
+ return seastar::now();
+ }
+ logger().trace("{}", __func__);
+
+ auto m = make_message<MMonSubscribe>();
+ m->what = sub.get_subs();
+ m->hostname = ceph_get_short_hostname();
+ return active_con->get_conn()->send(m).then([this] {
+ sub.renewed();
+ });
+}
+
+} // namespace ceph::mon
diff --git a/src/crimson/mon/MonClient.h b/src/crimson/mon/MonClient.h
new file mode 100644
index 00000000..ef9bcb69
--- /dev/null
+++ b/src/crimson/mon/MonClient.h
@@ -0,0 +1,114 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#pragma once
+
+#include <memory>
+
+#include <seastar/core/future.hh>
+#include <seastar/core/gate.hh>
+#include <seastar/core/lowres_clock.hh>
+#include <seastar/core/timer.hh>
+
+#include "auth/KeyRing.h"
+
+#include "crimson/net/Dispatcher.h"
+#include "crimson/net/Fwd.h"
+
+#include "mon/MonMap.h"
+
+#include "mon/MonSub.h"
+
+template<typename Message> using Ref = boost::intrusive_ptr<Message>;
+namespace ceph::net {
+ class Messenger;
+}
+
+class AuthMethodList;
+class MAuthReply;
+struct MMonMap;
+struct MMonSubscribeAck;
+struct MMonGetVersionReply;
+struct MMonCommandAck;
+struct MLogAck;
+struct MConfig;
+
+namespace ceph::mon {
+
+class Connection;
+
+class Client : public ceph::net::Dispatcher {
+ EntityName entity_name;
+ KeyRing keyring;
+ std::unique_ptr<AuthMethodList> auth_methods;
+ const uint32_t want_keys;
+
+ MonMap monmap;
+ seastar::promise<MessageRef> reply;
+ std::unique_ptr<Connection> active_con;
+ std::vector<Connection> pending_conns;
+ seastar::timer<seastar::lowres_clock> timer;
+ seastar::gate tick_gate;
+
+ ceph::net::Messenger& msgr;
+
+ // commands
+ using get_version_t = seastar::future<version_t, version_t>;
+
+ ceph_tid_t last_version_req_id = 0;
+ std::map<ceph_tid_t, typename get_version_t::promise_type> version_reqs;
+
+ ceph_tid_t last_mon_command_id = 0;
+ using command_result_t =
+ seastar::future<std::int32_t, string, ceph::bufferlist>;
+ std::map<ceph_tid_t, typename command_result_t::promise_type> mon_commands;
+
+ MonSub sub;
+
+public:
+ Client(ceph::net::Messenger& messenger);
+ Client(Client&&);
+ ~Client();
+ seastar::future<> start();
+ seastar::future<> stop();
+
+ const uuid_d& get_fsid() const {
+ return monmap.fsid;
+ }
+ get_version_t get_version(const std::string& map);
+ command_result_t run_command(const std::vector<std::string>& cmd,
+ const bufferlist& bl);
+ seastar::future<> send_message(MessageRef);
+ bool sub_want(const std::string& what, version_t start, unsigned flags);
+ void sub_got(const std::string& what, version_t have);
+ void sub_unwant(const std::string& what);
+ bool sub_want_increment(const std::string& what, version_t start, unsigned flags);
+ seastar::future<> renew_subs();
+
+private:
+ void tick();
+
+ seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn,
+ MessageRef m) override;
+ seastar::future<> ms_handle_reset(ceph::net::ConnectionRef conn) override;
+
+ seastar::future<> handle_monmap(ceph::net::ConnectionRef conn,
+ Ref<MMonMap> m);
+ seastar::future<> handle_auth_reply(ceph::net::ConnectionRef conn,
+ Ref<MAuthReply> m);
+ seastar::future<> handle_subscribe_ack(Ref<MMonSubscribeAck> m);
+ seastar::future<> handle_get_version_reply(Ref<MMonGetVersionReply> m);
+ seastar::future<> handle_mon_command_ack(Ref<MMonCommandAck> m);
+ seastar::future<> handle_log_ack(Ref<MLogAck> m);
+ seastar::future<> handle_config(Ref<MConfig> m);
+
+private:
+ seastar::future<> load_keyring();
+ seastar::future<> authenticate();
+
+ bool is_hunting() const;
+ seastar::future<> reopen_session(int rank);
+ std::vector<unsigned> get_random_mons(unsigned n) const;
+ seastar::future<> _add_conn(unsigned rank, uint64_t global_id);
+};
+
+} // namespace ceph::mon