diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/crimson/osd/osd.cc | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/crimson/osd/osd.cc')
-rw-r--r-- | src/crimson/osd/osd.cc | 1357 |
1 files changed, 1357 insertions, 0 deletions
diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc new file mode 100644 index 000000000..cfe4f54ab --- /dev/null +++ b/src/crimson/osd/osd.cc @@ -0,0 +1,1357 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "osd.h" + +#include <sys/utsname.h> + +#include <boost/iterator/counting_iterator.hpp> +#include <boost/range/join.hpp> +#include <fmt/format.h> +#include <fmt/os.h> +#include <fmt/ostream.h> +#include <seastar/core/timer.hh> + +#include "common/pick_address.h" +#include "include/util.h" + +#include "messages/MCommand.h" +#include "messages/MOSDBeacon.h" +#include "messages/MOSDBoot.h" +#include "messages/MOSDMap.h" +#include "messages/MOSDMarkMeDown.h" +#include "messages/MOSDOp.h" +#include "messages/MOSDPeeringOp.h" +#include "messages/MOSDPGCreate2.h" +#include "messages/MOSDPGUpdateLogMissing.h" +#include "messages/MOSDPGUpdateLogMissingReply.h" +#include "messages/MOSDRepOpReply.h" +#include "messages/MOSDScrub2.h" +#include "messages/MPGStats.h" + +#include "os/Transaction.h" +#include "osd/ClassHandler.h" +#include "osd/OSDCap.h" +#include "osd/PGPeeringEvent.h" +#include "osd/PeeringState.h" + +#include "crimson/admin/osd_admin.h" +#include "crimson/admin/pg_commands.h" +#include "crimson/common/buffer_io.h" +#include "crimson/common/exception.h" +#include "crimson/mon/MonClient.h" +#include "crimson/net/Connection.h" +#include "crimson/net/Messenger.h" +#include "crimson/os/futurized_collection.h" +#include "crimson/os/futurized_store.h" +#include "crimson/osd/heartbeat.h" +#include "crimson/osd/osd_meta.h" +#include "crimson/osd/pg.h" +#include "crimson/osd/pg_backend.h" +#include "crimson/osd/pg_meta.h" +#include "crimson/osd/osd_operations/client_request.h" +#include "crimson/osd/osd_operations/peering_event.h" +#include "crimson/osd/osd_operations/pg_advance_map.h" +#include "crimson/osd/osd_operations/recovery_subrequest.h" +#include "crimson/osd/osd_operations/replicated_request.h" +#include "crimson/osd/osd_operation_external_tracking.h" +#include "crimson/crush/CrushLocation.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_osd); + } + static constexpr int TICK_INTERVAL = 1; +} + +using std::make_unique; +using std::map; +using std::pair; +using std::string; +using std::unique_ptr; +using std::vector; + +using crimson::common::local_conf; +using crimson::os::FuturizedStore; + +namespace crimson::osd { + +OSD::OSD(int id, uint32_t nonce, + seastar::abort_source& abort_source, + crimson::os::FuturizedStore& store, + crimson::net::MessengerRef cluster_msgr, + crimson::net::MessengerRef public_msgr, + crimson::net::MessengerRef hb_front_msgr, + crimson::net::MessengerRef hb_back_msgr) + : whoami{id}, + nonce{nonce}, + abort_source{abort_source}, + // do this in background + beacon_timer{[this] { (void)send_beacon(); }}, + cluster_msgr{cluster_msgr}, + public_msgr{public_msgr}, + hb_front_msgr{hb_front_msgr}, + hb_back_msgr{hb_back_msgr}, + monc{new crimson::mon::Client{*public_msgr, *this}}, + mgrc{new crimson::mgr::Client{*public_msgr, *this}}, + store{store}, + pg_shard_manager{osd_singleton_state, + shard_services, + pg_to_shard_mappings}, + // do this in background -- continuation rearms timer when complete + tick_timer{[this] { + std::ignore = update_heartbeat_peers( + ).then([this] { + update_stats(); + tick_timer.arm( + std::chrono::seconds(TICK_INTERVAL)); + }); + }}, + asok{seastar::make_lw_shared<crimson::admin::AdminSocket>()}, + log_client(cluster_msgr.get(), LogClient::NO_FLAGS), + clog(log_client.create_channel()) +{ + ceph_assert(seastar::this_shard_id() == PRIMARY_CORE); + for (auto msgr : {std::ref(cluster_msgr), std::ref(public_msgr), + std::ref(hb_front_msgr), std::ref(hb_back_msgr)}) { + msgr.get()->set_auth_server(monc.get()); + msgr.get()->set_auth_client(monc.get()); + } + + if (local_conf()->osd_open_classes_on_start) { + const int r = ClassHandler::get_instance().open_all_classes(); + if (r) { + logger().warn("{} warning: got an error loading one or more classes: {}", + __func__, cpp_strerror(r)); + } + } + logger().info("{}: nonce is {}", __func__, nonce); + monc->set_log_client(&log_client); + clog->set_log_to_monitors(true); +} + +OSD::~OSD() = default; + +namespace { +// Initial features in new superblock. +// Features here are also automatically upgraded +CompatSet get_osd_initial_compat_set() +{ + CompatSet::FeatureSet ceph_osd_feature_compat; + CompatSet::FeatureSet ceph_osd_feature_ro_compat; + CompatSet::FeatureSet ceph_osd_feature_incompat; + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BASE); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_PGINFO); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_OLOC); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEC); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_CATEGORIES); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_HOBJECTPOOL); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BIGINFO); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEVELDBINFO); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEVELDBLOG); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_SNAPMAPPER); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_HINTS); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_PGMETA); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_MISSING); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_FASTINFO); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_RECOVERY_DELETES); + return CompatSet(ceph_osd_feature_compat, + ceph_osd_feature_ro_compat, + ceph_osd_feature_incompat); +} +} + +seastar::future<> OSD::open_meta_coll() +{ + ceph_assert(seastar::this_shard_id() == PRIMARY_CORE); + return store.get_sharded_store().open_collection( + coll_t::meta() + ).then([this](auto ch) { + pg_shard_manager.init_meta_coll(ch, store.get_sharded_store()); + return seastar::now(); + }); +} + +seastar::future<OSDMeta> OSD::open_or_create_meta_coll(FuturizedStore &store) +{ + return store.get_sharded_store().open_collection(coll_t::meta()).then([&store](auto ch) { + if (!ch) { + return store.get_sharded_store().create_new_collection( + coll_t::meta() + ).then([&store](auto ch) { + return OSDMeta(ch, store.get_sharded_store()); + }); + } else { + return seastar::make_ready_future<OSDMeta>(ch, store.get_sharded_store()); + } + }); +} + +seastar::future<> OSD::mkfs( + FuturizedStore &store, + unsigned whoami, + uuid_d osd_uuid, + uuid_d cluster_fsid, + std::string osdspec_affinity) +{ + return store.start().then([&store, osd_uuid] { + return store.mkfs(osd_uuid).handle_error( + crimson::stateful_ec::handle([] (const auto& ec) { + logger().error("error creating empty object store in {}: ({}) {}", + local_conf().get_val<std::string>("osd_data"), + ec.value(), ec.message()); + std::exit(EXIT_FAILURE); + })); + }).then([&store] { + return store.mount().handle_error( + crimson::stateful_ec::handle([](const auto& ec) { + logger().error("error mounting object store in {}: ({}) {}", + local_conf().get_val<std::string>("osd_data"), + ec.value(), ec.message()); + std::exit(EXIT_FAILURE); + })); + }).then([&store] { + return open_or_create_meta_coll(store); + }).then([&store, whoami, cluster_fsid](auto meta_coll) { + OSDSuperblock superblock; + superblock.cluster_fsid = cluster_fsid; + superblock.osd_fsid = store.get_fsid(); + superblock.whoami = whoami; + superblock.compat_features = get_osd_initial_compat_set(); + return _write_superblock( + store, std::move(meta_coll), std::move(superblock)); + }).then([&store, cluster_fsid] { + return store.write_meta("ceph_fsid", cluster_fsid.to_string()); + }).then([&store] { + return store.write_meta("magic", CEPH_OSD_ONDISK_MAGIC); + }).then([&store, whoami] { + return store.write_meta("whoami", std::to_string(whoami)); + }).then([&store] { + return _write_key_meta(store); + }).then([&store, osdspec_affinity=std::move(osdspec_affinity)] { + return store.write_meta("osdspec_affinity", osdspec_affinity); + }).then([&store] { + return store.write_meta("ready", "ready"); + }).then([&store, whoami, cluster_fsid] { + fmt::print("created object store {} for osd.{} fsid {}\n", + local_conf().get_val<std::string>("osd_data"), + whoami, cluster_fsid); + return store.umount(); + }).then([&store] { + return store.stop(); + }); +} + +seastar::future<> OSD::_write_superblock( + FuturizedStore &store, + OSDMeta meta_coll, + OSDSuperblock superblock) +{ + return seastar::do_with( + std::move(meta_coll), + std::move(superblock), + [&store](auto &meta_coll, auto &superblock) { + return meta_coll.load_superblock( + ).safe_then([&superblock](OSDSuperblock&& sb) { + if (sb.cluster_fsid != superblock.cluster_fsid) { + logger().error("provided cluster fsid {} != superblock's {}", + sb.cluster_fsid, superblock.cluster_fsid); + throw std::invalid_argument("mismatched fsid"); + } + if (sb.whoami != superblock.whoami) { + logger().error("provided osd id {} != superblock's {}", + sb.whoami, superblock.whoami); + throw std::invalid_argument("mismatched osd id"); + } + }).handle_error( + crimson::ct_error::enoent::handle([&store, &meta_coll, &superblock] { + // meta collection does not yet, create superblock + logger().info( + "{} writing superblock cluster_fsid {} osd_fsid {}", + "_write_superblock", + superblock.cluster_fsid, + superblock.osd_fsid); + ceph::os::Transaction t; + meta_coll.create(t); + meta_coll.store_superblock(t, superblock); + logger().debug("OSD::_write_superblock: do_transaction..."); + return store.get_sharded_store().do_transaction( + meta_coll.collection(), + std::move(t)); + }), + crimson::ct_error::assert_all("_write_superbock error") + ); + }); +} + +// this `to_string` sits in the `crimson::osd` namespace, so we don't brake +// the language rule on not overloading in `std::`. +static std::string to_string(const seastar::temporary_buffer<char>& temp_buf) +{ + return {temp_buf.get(), temp_buf.size()}; +} + +seastar::future<> OSD::_write_key_meta(FuturizedStore &store) +{ + + if (auto key = local_conf().get_val<std::string>("key"); !std::empty(key)) { + return store.write_meta("osd_key", key); + } else if (auto keyfile = local_conf().get_val<std::string>("keyfile"); + !std::empty(keyfile)) { + return read_file(keyfile).then([&store](const auto& temp_buf) { + // it's on a truly cold path, so don't worry about memcpy. + return store.write_meta("osd_key", to_string(temp_buf)); + }).handle_exception([keyfile] (auto ep) { + logger().error("_write_key_meta: failed to handle keyfile {}: {}", + keyfile, ep); + ceph_abort(); + }); + } else { + return seastar::now(); + } +} + +namespace { + entity_addrvec_t pick_addresses(int what) { + entity_addrvec_t addrs; + crimson::common::CephContext cct; + // we're interested solely in v2; crimson doesn't do v1 + const auto flags = what | CEPH_PICK_ADDRESS_MSGR2; + if (int r = ::pick_addresses(&cct, flags, &addrs, -1); r < 0) { + throw std::runtime_error("failed to pick address"); + } + for (auto addr : addrs.v) { + logger().info("picked address {}", addr); + } + return addrs; + } + std::pair<entity_addrvec_t, bool> + replace_unknown_addrs(entity_addrvec_t maybe_unknowns, + const entity_addrvec_t& knowns) { + bool changed = false; + auto maybe_replace = [&](entity_addr_t addr) { + if (!addr.is_blank_ip()) { + return addr; + } + for (auto& b : knowns.v) { + if (addr.get_family() == b.get_family()) { + auto a = b; + a.set_nonce(addr.get_nonce()); + a.set_type(addr.get_type()); + a.set_port(addr.get_port()); + changed = true; + return a; + } + } + throw std::runtime_error("failed to replace unknown address"); + }; + entity_addrvec_t replaced; + std::transform(maybe_unknowns.v.begin(), + maybe_unknowns.v.end(), + std::back_inserter(replaced.v), + maybe_replace); + return {replaced, changed}; + } +} + +seastar::future<> OSD::start() +{ + logger().info("start"); + + startup_time = ceph::mono_clock::now(); + ceph_assert(seastar::this_shard_id() == PRIMARY_CORE); + return store.start().then([this] { + return pg_to_shard_mappings.start(0, seastar::smp::count + ).then([this] { + return osd_singleton_state.start_single( + whoami, std::ref(*cluster_msgr), std::ref(*public_msgr), + std::ref(*monc), std::ref(*mgrc)); + }).then([this] { + return osd_states.start(); + }).then([this] { + ceph::mono_time startup_time = ceph::mono_clock::now(); + return shard_services.start( + std::ref(osd_singleton_state), + std::ref(pg_to_shard_mappings), + whoami, + startup_time, + osd_singleton_state.local().perf, + osd_singleton_state.local().recoverystate_perf, + std::ref(store), + std::ref(osd_states)); + }); + }).then([this] { + heartbeat.reset(new Heartbeat{ + whoami, get_shard_services(), + *monc, *hb_front_msgr, *hb_back_msgr}); + return store.mount().handle_error( + crimson::stateful_ec::handle([] (const auto& ec) { + logger().error("error mounting object store in {}: ({}) {}", + local_conf().get_val<std::string>("osd_data"), + ec.value(), ec.message()); + std::exit(EXIT_FAILURE); + })); + }).then([this] { + return open_meta_coll(); + }).then([this] { + return pg_shard_manager.get_meta_coll().load_superblock( + ).handle_error( + crimson::ct_error::assert_all("open_meta_coll error") + ); + }).then([this](OSDSuperblock&& sb) { + superblock = std::move(sb); + pg_shard_manager.set_superblock(superblock); + return pg_shard_manager.get_local_map(superblock.current_epoch); + }).then([this](OSDMapService::local_cached_map_t&& map) { + osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(map)); + return pg_shard_manager.update_map(std::move(map)); + }).then([this] { + return shard_services.invoke_on_all([this](auto &local_service) { + local_service.local_state.osdmap_gate.got_map(osdmap->get_epoch()); + }); + }).then([this] { + bind_epoch = osdmap->get_epoch(); + return pg_shard_manager.load_pgs(store); + }).then([this] { + uint64_t osd_required = + CEPH_FEATURE_UID | + CEPH_FEATURE_PGID64 | + CEPH_FEATURE_OSDENC; + using crimson::net::SocketPolicy; + + public_msgr->set_default_policy(SocketPolicy::stateless_server(0)); + public_msgr->set_policy(entity_name_t::TYPE_MON, + SocketPolicy::lossy_client(osd_required)); + public_msgr->set_policy(entity_name_t::TYPE_MGR, + SocketPolicy::lossy_client(osd_required)); + public_msgr->set_policy(entity_name_t::TYPE_OSD, + SocketPolicy::stateless_server(0)); + + cluster_msgr->set_default_policy(SocketPolicy::stateless_server(0)); + cluster_msgr->set_policy(entity_name_t::TYPE_MON, + SocketPolicy::lossy_client(0)); + cluster_msgr->set_policy(entity_name_t::TYPE_OSD, + SocketPolicy::lossless_peer(osd_required)); + cluster_msgr->set_policy(entity_name_t::TYPE_CLIENT, + SocketPolicy::stateless_server(0)); + + crimson::net::dispatchers_t dispatchers{this, monc.get(), mgrc.get()}; + return seastar::when_all_succeed( + cluster_msgr->bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER)) + .safe_then([this, dispatchers]() mutable { + return cluster_msgr->start(dispatchers); + }, crimson::net::Messenger::bind_ertr::all_same_way( + [] (const std::error_code& e) { + logger().error("cluster messenger bind(): {}", e); + ceph_abort(); + })), + public_msgr->bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC)) + .safe_then([this, dispatchers]() mutable { + return public_msgr->start(dispatchers); + }, crimson::net::Messenger::bind_ertr::all_same_way( + [] (const std::error_code& e) { + logger().error("public messenger bind(): {}", e); + ceph_abort(); + }))); + }).then_unpack([this] { + return seastar::when_all_succeed(monc->start(), + mgrc->start()); + }).then_unpack([this] { + return _add_me_to_crush(); + }).then([this] { + monc->sub_want("osd_pg_creates", last_pg_create_epoch, 0); + monc->sub_want("mgrmap", 0, 0); + monc->sub_want("osdmap", 0, 0); + return monc->renew_subs(); + }).then([this] { + if (auto [addrs, changed] = + replace_unknown_addrs(cluster_msgr->get_myaddrs(), + public_msgr->get_myaddrs()); changed) { + logger().debug("replacing unkwnown addrs of cluster messenger"); + cluster_msgr->set_myaddrs(addrs); + } + return heartbeat->start(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC), + pick_addresses(CEPH_PICK_ADDRESS_CLUSTER)); + }).then([this] { + // create the admin-socket server, and the objects that register + // to handle incoming commands + return start_asok_admin(); + }).then([this] { + return log_client.set_fsid(monc->get_fsid()); + }).then([this] { + return start_boot(); + }); +} + +seastar::future<> OSD::start_boot() +{ + pg_shard_manager.set_preboot(); + return monc->get_version("osdmap").then([this](auto&& ret) { + auto [newest, oldest] = ret; + return _preboot(oldest, newest); + }); +} + +seastar::future<> OSD::_preboot(version_t oldest, version_t newest) +{ + logger().info("osd.{}: _preboot", whoami); + if (osdmap->get_epoch() == 0) { + logger().info("waiting for initial osdmap"); + } else if (osdmap->is_destroyed(whoami)) { + logger().warn("osdmap says I am destroyed"); + // provide a small margin so we don't livelock seeing if we + // un-destroyed ourselves. + if (osdmap->get_epoch() > newest - 1) { + throw std::runtime_error("i am destroyed"); + } + } else if (osdmap->is_noup(whoami)) { + logger().warn("osdmap NOUP flag is set, waiting for it to clear"); + } else if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) { + logger().error("osdmap SORTBITWISE OSDMap flag is NOT set; please set it"); + } else if (osdmap->require_osd_release < ceph_release_t::octopus) { + logger().error("osdmap require_osd_release < octopus; please upgrade to octopus"); + } else if (false) { + // TODO: update mon if current fullness state is different from osdmap + } else if (version_t n = local_conf()->osd_map_message_max; + osdmap->get_epoch() >= oldest - 1 && + osdmap->get_epoch() + n > newest) { + return _send_boot(); + } + // get all the latest maps + if (osdmap->get_epoch() + 1 >= oldest) { + return get_shard_services().osdmap_subscribe(osdmap->get_epoch() + 1, false); + } else { + return get_shard_services().osdmap_subscribe(oldest - 1, true); + } +} + +seastar::future<> OSD::_send_boot() +{ + pg_shard_manager.set_booting(); + + entity_addrvec_t public_addrs = public_msgr->get_myaddrs(); + entity_addrvec_t cluster_addrs = cluster_msgr->get_myaddrs(); + entity_addrvec_t hb_back_addrs = heartbeat->get_back_addrs(); + entity_addrvec_t hb_front_addrs = heartbeat->get_front_addrs(); + if (cluster_msgr->set_addr_unknowns(public_addrs)) { + cluster_addrs = cluster_msgr->get_myaddrs(); + } + if (heartbeat->get_back_msgr().set_addr_unknowns(cluster_addrs)) { + hb_back_addrs = heartbeat->get_back_addrs(); + } + if (heartbeat->get_front_msgr().set_addr_unknowns(public_addrs)) { + hb_front_addrs = heartbeat->get_front_addrs(); + } + logger().info("hb_back_msgr: {}", hb_back_addrs); + logger().info("hb_front_msgr: {}", hb_front_addrs); + logger().info("cluster_msgr: {}", cluster_addrs); + + auto m = crimson::make_message<MOSDBoot>(superblock, + osdmap->get_epoch(), + boot_epoch, + hb_back_addrs, + hb_front_addrs, + cluster_addrs, + CEPH_FEATURES_ALL); + collect_sys_info(&m->metadata, NULL); + + // See OSDMonitor::preprocess_boot, prevents boot without allow_crimson + // OSDMap flag + m->metadata["osd_type"] = "crimson"; + return monc->send_message(std::move(m)); +} + +seastar::future<> OSD::_add_me_to_crush() +{ + if (!local_conf().get_val<bool>("osd_crush_update_on_start")) { + return seastar::now(); + } + auto get_weight = [this] { + if (auto w = local_conf().get_val<double>("osd_crush_initial_weight"); + w >= 0) { + return seastar::make_ready_future<double>(w); + } else { + return store.stat().then([](auto st) { + auto total = st.total; + return seastar::make_ready_future<double>( + std::max(.00001, + double(total) / double(1ull << 40))); // TB + }); + } + }; + return get_weight().then([this](auto weight) { + const crimson::crush::CrushLocation loc; + return seastar::do_with( + std::move(loc), + [this, weight] (crimson::crush::CrushLocation& loc) { + return loc.init_on_startup().then([this, weight, &loc]() { + logger().info("crush location is {}", loc); + string cmd = fmt::format(R"({{ + "prefix": "osd crush create-or-move", + "id": {}, + "weight": {:.4f}, + "args": [{}] + }})", whoami, weight, loc); + return monc->run_command(std::move(cmd), {}); + }); + }); + }).then([](auto&& command_result) { + [[maybe_unused]] auto [code, message, out] = std::move(command_result); + if (code) { + logger().warn("fail to add to crush: {} ({})", message, code); + throw std::runtime_error("fail to add to crush"); + } else { + logger().info("added to crush: {}", message); + } + return seastar::now(); + }); +} + +seastar::future<> OSD::handle_command( + crimson::net::ConnectionRef conn, + Ref<MCommand> m) +{ + ceph_assert(seastar::this_shard_id() == PRIMARY_CORE); + return asok->handle_command(conn, std::move(m)); +} + +/* + The OSD's Admin Socket object created here has two servers (i.e. - blocks of commands + to handle) registered to it: + - OSD's specific commands are handled by the OSD object; + - there are some common commands registered to be directly handled by the AdminSocket object + itself. +*/ +seastar::future<> OSD::start_asok_admin() +{ + auto asok_path = local_conf().get_val<std::string>("admin_socket"); + using namespace crimson::admin; + return asok->start(asok_path).then([this] { + asok->register_admin_commands(); + asok->register_command(make_asok_hook<OsdStatusHook>(std::as_const(*this))); + asok->register_command(make_asok_hook<SendBeaconHook>(*this)); + asok->register_command(make_asok_hook<FlushPgStatsHook>(*this)); + asok->register_command( + make_asok_hook<DumpPGStateHistory>(std::as_const(pg_shard_manager))); + asok->register_command(make_asok_hook<DumpMetricsHook>()); + asok->register_command(make_asok_hook<DumpPerfCountersHook>()); + asok->register_command(make_asok_hook<InjectDataErrorHook>(get_shard_services())); + asok->register_command(make_asok_hook<InjectMDataErrorHook>(get_shard_services())); + // PG commands + asok->register_command(make_asok_hook<pg::QueryCommand>(*this)); + asok->register_command(make_asok_hook<pg::MarkUnfoundLostCommand>(*this)); + // ops commands + asok->register_command( + make_asok_hook<DumpInFlightOpsHook>( + std::as_const(pg_shard_manager))); + asok->register_command( + make_asok_hook<DumpHistoricOpsHook>( + std::as_const(get_shard_services().get_registry()))); + asok->register_command( + make_asok_hook<DumpSlowestHistoricOpsHook>( + std::as_const(get_shard_services().get_registry()))); + asok->register_command( + make_asok_hook<DumpRecoveryReservationsHook>(get_shard_services())); + }); +} + +seastar::future<> OSD::stop() +{ + logger().info("stop"); + beacon_timer.cancel(); + tick_timer.cancel(); + // see also OSD::shutdown() + return prepare_to_stop().then([this] { + return pg_shard_manager.set_stopping(); + }).then([this] { + logger().debug("prepared to stop"); + public_msgr->stop(); + cluster_msgr->stop(); + auto gate_close_fut = gate.close(); + return asok->stop().then([this] { + return heartbeat->stop(); + }).then([this] { + return pg_shard_manager.stop_registries(); + }).then([this] { + return store.umount(); + }).then([this] { + return store.stop(); + }).then([this] { + return pg_shard_manager.stop_pgs(); + }).then([this] { + return monc->stop(); + }).then([this] { + return mgrc->stop(); + }).then([this] { + return shard_services.stop(); + }).then([this] { + return osd_states.stop(); + }).then([this] { + return osd_singleton_state.stop(); + }).then([this] { + return pg_to_shard_mappings.stop(); + }).then([fut=std::move(gate_close_fut)]() mutable { + return std::move(fut); + }).then([this] { + return when_all_succeed( + public_msgr->shutdown(), + cluster_msgr->shutdown()).discard_result(); + }).handle_exception([](auto ep) { + logger().error("error while stopping osd: {}", ep); + }); + }); +} + +void OSD::dump_status(Formatter* f) const +{ + f->dump_stream("cluster_fsid") << superblock.cluster_fsid; + f->dump_stream("osd_fsid") << superblock.osd_fsid; + f->dump_unsigned("whoami", superblock.whoami); + f->dump_string("state", pg_shard_manager.get_osd_state_string()); + f->dump_unsigned("oldest_map", superblock.oldest_map); + f->dump_unsigned("cluster_osdmap_trim_lower_bound", + superblock.cluster_osdmap_trim_lower_bound); + f->dump_unsigned("newest_map", superblock.newest_map); + f->dump_unsigned("num_pgs", pg_shard_manager.get_num_pgs()); +} + +void OSD::print(std::ostream& out) const +{ + out << "{osd." << superblock.whoami << " " + << superblock.osd_fsid << " [" << superblock.oldest_map + << "," << superblock.newest_map << "] " + << "tlb:" << superblock.cluster_osdmap_trim_lower_bound + << " pgs:" << pg_shard_manager.get_num_pgs() + << "}"; +} + +std::optional<seastar::future<>> +OSD::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m) +{ + if (pg_shard_manager.is_stopping()) { + return seastar::now(); + } + auto maybe_ret = do_ms_dispatch(conn, std::move(m)); + if (!maybe_ret.has_value()) { + return std::nullopt; + } + + gate.dispatch_in_background( + __func__, *this, [ret=std::move(maybe_ret.value())]() mutable { + return std::move(ret); + }); + return seastar::now(); +} + +std::optional<seastar::future<>> +OSD::do_ms_dispatch( + crimson::net::ConnectionRef conn, + MessageRef m) +{ + if (seastar::this_shard_id() != PRIMARY_CORE) { + switch (m->get_type()) { + case CEPH_MSG_OSD_MAP: + case MSG_COMMAND: + case MSG_OSD_MARK_ME_DOWN: + // FIXME: order is not guaranteed in this path + return conn.get_foreign( + ).then([this, m=std::move(m)](auto f_conn) { + return seastar::smp::submit_to(PRIMARY_CORE, + [f_conn=std::move(f_conn), m=std::move(m), this]() mutable { + auto conn = make_local_shared_foreign(std::move(f_conn)); + auto ret = do_ms_dispatch(conn, std::move(m)); + assert(ret.has_value()); + return std::move(ret.value()); + }); + }); + } + } + + switch (m->get_type()) { + case CEPH_MSG_OSD_MAP: + return handle_osd_map(boost::static_pointer_cast<MOSDMap>(m)); + case CEPH_MSG_OSD_OP: + return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m)); + case MSG_OSD_PG_CREATE2: + return handle_pg_create( + conn, boost::static_pointer_cast<MOSDPGCreate2>(m)); + return seastar::now(); + case MSG_COMMAND: + return handle_command(conn, boost::static_pointer_cast<MCommand>(m)); + case MSG_OSD_MARK_ME_DOWN: + return handle_mark_me_down(conn, boost::static_pointer_cast<MOSDMarkMeDown>(m)); + case MSG_OSD_PG_PULL: + [[fallthrough]]; + case MSG_OSD_PG_PUSH: + [[fallthrough]]; + case MSG_OSD_PG_PUSH_REPLY: + [[fallthrough]]; + case MSG_OSD_PG_RECOVERY_DELETE: + [[fallthrough]]; + case MSG_OSD_PG_RECOVERY_DELETE_REPLY: + [[fallthrough]]; + case MSG_OSD_PG_SCAN: + [[fallthrough]]; + case MSG_OSD_PG_BACKFILL: + [[fallthrough]]; + case MSG_OSD_PG_BACKFILL_REMOVE: + return handle_recovery_subreq(conn, boost::static_pointer_cast<MOSDFastDispatchOp>(m)); + case MSG_OSD_PG_LEASE: + [[fallthrough]]; + case MSG_OSD_PG_LEASE_ACK: + [[fallthrough]]; + case MSG_OSD_PG_NOTIFY2: + [[fallthrough]]; + case MSG_OSD_PG_INFO2: + [[fallthrough]]; + case MSG_OSD_PG_QUERY2: + [[fallthrough]]; + case MSG_OSD_BACKFILL_RESERVE: + [[fallthrough]]; + case MSG_OSD_RECOVERY_RESERVE: + [[fallthrough]]; + case MSG_OSD_PG_LOG: + return handle_peering_op(conn, boost::static_pointer_cast<MOSDPeeringOp>(m)); + case MSG_OSD_REPOP: + return handle_rep_op(conn, boost::static_pointer_cast<MOSDRepOp>(m)); + case MSG_OSD_REPOPREPLY: + return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m)); + case MSG_OSD_SCRUB2: + return handle_scrub(conn, boost::static_pointer_cast<MOSDScrub2>(m)); + case MSG_OSD_PG_UPDATE_LOG_MISSING: + return handle_update_log_missing(conn, boost::static_pointer_cast< + MOSDPGUpdateLogMissing>(m)); + case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY: + return handle_update_log_missing_reply(conn, boost::static_pointer_cast< + MOSDPGUpdateLogMissingReply>(m)); + default: + return std::nullopt; + } +} + +void OSD::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) +{ + // TODO: cleanup the session attached to this connection + logger().warn("ms_handle_reset"); +} + +void OSD::ms_handle_remote_reset(crimson::net::ConnectionRef conn) +{ + logger().warn("ms_handle_remote_reset"); +} + +void OSD::handle_authentication(const EntityName& name, + const AuthCapsInfo& caps_info) +{ + // TODO: store the parsed cap and associate it with the connection + if (caps_info.allow_all) { + logger().debug("{} {} has all caps", __func__, name); + return; + } + if (caps_info.caps.length() > 0) { + auto p = caps_info.caps.cbegin(); + string str; + try { + decode(str, p); + } catch (ceph::buffer::error& e) { + logger().warn("{} {} failed to decode caps string", __func__, name); + return; + } + OSDCap caps; + if (caps.parse(str)) { + logger().debug("{} {} has caps {}", __func__, name, str); + } else { + logger().warn("{} {} failed to parse caps {}", __func__, name, str); + } + } +} + +void OSD::update_stats() +{ + osd_stat_seq++; + osd_stat.up_from = get_shard_services().get_up_epoch(); + osd_stat.hb_peers = heartbeat->get_peers(); + osd_stat.seq = ( + static_cast<uint64_t>(get_shard_services().get_up_epoch()) << 32 + ) | osd_stat_seq; + gate.dispatch_in_background("statfs", *this, [this] { + (void) store.stat().then([this](store_statfs_t&& st) { + osd_stat.statfs = st; + }); + }); +} + +seastar::future<MessageURef> OSD::get_stats() const +{ + // MPGStats::had_map_for is not used since PGMonitor was removed + auto m = crimson::make_message<MPGStats>(monc->get_fsid(), osdmap->get_epoch()); + m->osd_stat = osd_stat; + return pg_shard_manager.get_pg_stats( + ).then([m=std::move(m)](auto &&stats) mutable { + m->pg_stat = std::move(stats); + return seastar::make_ready_future<MessageURef>(std::move(m)); + }); +} + +uint64_t OSD::send_pg_stats() +{ + // mgr client sends the report message in background + mgrc->report(); + return osd_stat.seq; +} + +seastar::future<> OSD::handle_osd_map(Ref<MOSDMap> m) +{ + /* Ensure that only one MOSDMap is processed at a time. Allowing concurrent + * processing may eventually be worthwhile, but such an implementation would + * need to ensure (among other things) + * 1. any particular map is only processed once + * 2. PGAdvanceMap operations are processed in order for each PG + * As map handling is not presently a bottleneck, we stick to this + * simpler invariant for now. + * See https://tracker.ceph.com/issues/59165 + */ + ceph_assert(seastar::this_shard_id() == PRIMARY_CORE); + return handle_osd_map_lock.lock().then([this, m] { + return _handle_osd_map(m); + }).finally([this] { + return handle_osd_map_lock.unlock(); + }); +} + +seastar::future<> OSD::_handle_osd_map(Ref<MOSDMap> m) +{ + logger().info("handle_osd_map {}", *m); + if (m->fsid != superblock.cluster_fsid) { + logger().warn("fsid mismatched"); + return seastar::now(); + } + if (pg_shard_manager.is_initializing()) { + logger().warn("i am still initializing"); + return seastar::now(); + } + + const auto first = m->get_first(); + const auto last = m->get_last(); + logger().info("handle_osd_map epochs [{}..{}], i have {}, src has [{}..{}]", + first, last, superblock.newest_map, + m->cluster_osdmap_trim_lower_bound, m->newest_map); + // make sure there is something new, here, before we bother flushing + // the queues and such + if (last <= superblock.newest_map) { + return seastar::now(); + } + // missing some? + bool skip_maps = false; + epoch_t start = superblock.newest_map + 1; + if (first > start) { + logger().info("handle_osd_map message skips epochs {}..{}", + start, first - 1); + if (m->cluster_osdmap_trim_lower_bound <= start) { + return get_shard_services().osdmap_subscribe(start, false); + } + // always try to get the full range of maps--as many as we can. this + // 1- is good to have + // 2- is at present the only way to ensure that we get a *full* map as + // the first map! + if (m->cluster_osdmap_trim_lower_bound < first) { + return get_shard_services().osdmap_subscribe( + m->cluster_osdmap_trim_lower_bound - 1, true); + } + skip_maps = true; + start = first; + } + + return seastar::do_with(ceph::os::Transaction{}, + [=, this](auto& t) { + return pg_shard_manager.store_maps(t, start, m).then([=, this, &t] { + // even if this map isn't from a mon, we may have satisfied our subscription + monc->sub_got("osdmap", last); + if (!superblock.oldest_map || skip_maps) { + superblock.oldest_map = first; + } + superblock.newest_map = last; + superblock.current_epoch = last; + + // note in the superblock that we were clean thru the prior epoch + if (boot_epoch && boot_epoch >= superblock.mounted) { + superblock.mounted = boot_epoch; + superblock.clean_thru = last; + } + pg_shard_manager.get_meta_coll().store_superblock(t, superblock); + pg_shard_manager.set_superblock(superblock); + logger().debug("OSD::handle_osd_map: do_transaction..."); + return store.get_sharded_store().do_transaction( + pg_shard_manager.get_meta_coll().collection(), + std::move(t)); + }); + }).then([=, this] { + // TODO: write to superblock and commit the transaction + return committed_osd_maps(start, last, m); + }); +} + +seastar::future<> OSD::committed_osd_maps( + version_t first, + version_t last, + Ref<MOSDMap> m) +{ + ceph_assert(seastar::this_shard_id() == PRIMARY_CORE); + logger().info("osd.{}: committed_osd_maps({}, {})", whoami, first, last); + // advance through the new maps + return seastar::do_for_each(boost::make_counting_iterator(first), + boost::make_counting_iterator(last + 1), + [this](epoch_t cur) { + return pg_shard_manager.get_local_map( + cur + ).then([this](OSDMapService::local_cached_map_t&& o) { + osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(o)); + return pg_shard_manager.update_map(std::move(o)); + }).then([this] { + if (get_shard_services().get_up_epoch() == 0 && + osdmap->is_up(whoami) && + osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) { + return pg_shard_manager.set_up_epoch( + osdmap->get_epoch() + ).then([this] { + if (!boot_epoch) { + boot_epoch = osdmap->get_epoch(); + } + }); + } else { + return seastar::now(); + } + }); + }).then([m, this] { + auto fut = seastar::now(); + if (osdmap->is_up(whoami)) { + const auto up_from = osdmap->get_up_from(whoami); + logger().info("osd.{}: map e {} marked me up: up_from {}, bind_epoch {}, state {}", + whoami, osdmap->get_epoch(), up_from, bind_epoch, + pg_shard_manager.get_osd_state_string()); + if (bind_epoch < up_from && + osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() && + pg_shard_manager.is_booting()) { + logger().info("osd.{}: activating...", whoami); + fut = pg_shard_manager.set_active().then([this] { + beacon_timer.arm_periodic( + std::chrono::seconds(local_conf()->osd_beacon_report_interval)); + // timer continuation rearms when complete + tick_timer.arm( + std::chrono::seconds(TICK_INTERVAL)); + }); + } + } else { + if (pg_shard_manager.is_prestop()) { + got_stop_ack(); + return seastar::now(); + } + } + return fut.then([this] { + return check_osdmap_features().then([this] { + // yay! + logger().info("osd.{}: committed_osd_maps: broadcasting osdmaps up" + " to {} epoch to pgs", whoami, osdmap->get_epoch()); + return pg_shard_manager.broadcast_map_to_pgs(osdmap->get_epoch()); + }); + }); + }).then([m, this] { + if (pg_shard_manager.is_active()) { + logger().info("osd.{}: now active", whoami); + if (!osdmap->exists(whoami) || + osdmap->is_stop(whoami)) { + return shutdown(); + } + if (should_restart()) { + return restart(); + } else { + return seastar::now(); + } + } else if (pg_shard_manager.is_preboot()) { + logger().info("osd.{}: now preboot", whoami); + + if (m->get_source().is_mon()) { + return _preboot( + m->cluster_osdmap_trim_lower_bound, m->newest_map); + } else { + logger().info("osd.{}: start_boot", whoami); + return start_boot(); + } + } else { + logger().info("osd.{}: now {}", whoami, + pg_shard_manager.get_osd_state_string()); + // XXX + return seastar::now(); + } + }); +} + +seastar::future<> OSD::handle_osd_op( + crimson::net::ConnectionRef conn, + Ref<MOSDOp> m) +{ + return pg_shard_manager.start_pg_operation<ClientRequest>( + get_shard_services(), + conn, + std::move(m)).second; +} + +seastar::future<> OSD::handle_pg_create( + crimson::net::ConnectionRef conn, + Ref<MOSDPGCreate2> m) +{ + return seastar::do_for_each(m->pgs, [this, conn, m](auto& pg) { + auto& [pgid, when] = pg; + const auto &[created, created_stamp] = when; + auto q = m->pg_extra.find(pgid); + ceph_assert(q != m->pg_extra.end()); + auto& [history, pi] = q->second; + logger().debug( + "{}: {} e{} @{} " + "history {} pi {}", + __func__, pgid, created, created_stamp, + history, pi); + if (!pi.empty() && + m->epoch < pi.get_bounds().second) { + logger().error( + "got pg_create on {} epoch {} " + "unmatched past_intervals {} (history {})", + pgid, m->epoch, + pi, history); + return seastar::now(); + } else { + return pg_shard_manager.start_pg_operation<RemotePeeringEvent>( + conn, + pg_shard_t(), + pgid, + m->epoch, + m->epoch, + NullEvt(), + true, + new PGCreateInfo(pgid, m->epoch, history, pi, true)).second; + } + }); +} + +seastar::future<> OSD::handle_update_log_missing( + crimson::net::ConnectionRef conn, + Ref<MOSDPGUpdateLogMissing> m) +{ + m->decode_payload(); + return pg_shard_manager.start_pg_operation<LogMissingRequest>( + std::move(conn), + std::move(m)).second; +} + +seastar::future<> OSD::handle_update_log_missing_reply( + crimson::net::ConnectionRef conn, + Ref<MOSDPGUpdateLogMissingReply> m) +{ + m->decode_payload(); + return pg_shard_manager.start_pg_operation<LogMissingRequestReply>( + std::move(conn), + std::move(m)).second; +} + +seastar::future<> OSD::handle_rep_op( + crimson::net::ConnectionRef conn, + Ref<MOSDRepOp> m) +{ + m->finish_decode(); + return pg_shard_manager.start_pg_operation<RepRequest>( + std::move(conn), + std::move(m)).second; +} + +seastar::future<> OSD::handle_rep_op_reply( + crimson::net::ConnectionRef conn, + Ref<MOSDRepOpReply> m) +{ + spg_t pgid = m->get_spg(); + return pg_shard_manager.with_pg( + pgid, + [m=std::move(m)](auto &&pg) { + if (pg) { + m->finish_decode(); + pg->handle_rep_op_reply(*m); + } else { + logger().warn("stale reply: {}", *m); + } + return seastar::now(); + }); +} + +seastar::future<> OSD::handle_scrub( + crimson::net::ConnectionRef conn, + Ref<MOSDScrub2> m) +{ + if (m->fsid != superblock.cluster_fsid) { + logger().warn("fsid mismatched"); + return seastar::now(); + } + return seastar::parallel_for_each(std::move(m->scrub_pgs), + [m, conn, this](spg_t pgid) { + pg_shard_t from_shard{static_cast<int>(m->get_source().num()), + pgid.shard}; + PeeringState::RequestScrub scrub_request{m->deep, m->repair}; + return pg_shard_manager.start_pg_operation<RemotePeeringEvent>( + conn, + from_shard, + pgid, + PGPeeringEvent{m->epoch, m->epoch, scrub_request}).second; + }); +} + +seastar::future<> OSD::handle_mark_me_down( + crimson::net::ConnectionRef conn, + Ref<MOSDMarkMeDown> m) +{ + ceph_assert(seastar::this_shard_id() == PRIMARY_CORE); + if (pg_shard_manager.is_prestop()) { + got_stop_ack(); + } + return seastar::now(); +} + +seastar::future<> OSD::handle_recovery_subreq( + crimson::net::ConnectionRef conn, + Ref<MOSDFastDispatchOp> m) +{ + return pg_shard_manager.start_pg_operation<RecoverySubRequest>( + conn, std::move(m)).second; +} + +bool OSD::should_restart() const +{ + if (!osdmap->is_up(whoami)) { + logger().info("map e {} marked osd.{} down", + osdmap->get_epoch(), whoami); + return true; + } else if (osdmap->get_addrs(whoami) != public_msgr->get_myaddrs()) { + logger().error("map e {} had wrong client addr ({} != my {})", + osdmap->get_epoch(), + osdmap->get_addrs(whoami), + public_msgr->get_myaddrs()); + return true; + } else if (osdmap->get_cluster_addrs(whoami) != cluster_msgr->get_myaddrs()) { + logger().error("map e {} had wrong cluster addr ({} != my {})", + osdmap->get_epoch(), + osdmap->get_cluster_addrs(whoami), + cluster_msgr->get_myaddrs()); + return true; + } else { + return false; + } +} + +seastar::future<> OSD::restart() +{ + beacon_timer.cancel(); + tick_timer.cancel(); + return pg_shard_manager.set_up_epoch( + 0 + ).then([this] { + bind_epoch = osdmap->get_epoch(); + // TODO: promote to shutdown if being marked down for multiple times + // rebind messengers + return start_boot(); + }); +} + +seastar::future<> OSD::shutdown() +{ + logger().info("shutting down per osdmap"); + abort_source.request_abort(); + return seastar::now(); +} + +seastar::future<> OSD::send_beacon() +{ + if (!pg_shard_manager.is_active()) { + return seastar::now(); + } + // FIXME: min lec should be calculated from pg_stat + // and should set m->pgs + epoch_t min_last_epoch_clean = osdmap->get_epoch(); + auto m = crimson::make_message<MOSDBeacon>(osdmap->get_epoch(), + min_last_epoch_clean, + superblock.last_purged_snaps_scrub, + local_conf()->osd_beacon_report_interval); + return monc->send_message(std::move(m)); +} + +seastar::future<> OSD::update_heartbeat_peers() +{ + if (!pg_shard_manager.is_active()) { + return seastar::now();; + } + + pg_shard_manager.for_each_pgid([this](auto &pgid) { + vector<int> up, acting; + osdmap->pg_to_up_acting_osds(pgid.pgid, + &up, nullptr, + &acting, nullptr); + for (int osd : boost::join(up, acting)) { + if (osd == CRUSH_ITEM_NONE || osd == whoami) { + continue; + } else { + heartbeat->add_peer(osd, osdmap->get_epoch()); + } + } + }); + heartbeat->update_peers(whoami); + return seastar::now(); +} + +seastar::future<> OSD::handle_peering_op( + crimson::net::ConnectionRef conn, + Ref<MOSDPeeringOp> m) +{ + const int from = m->get_source().num(); + logger().debug("handle_peering_op on {} from {}", m->get_spg(), from); + m->set_features(conn->get_features()); + std::unique_ptr<PGPeeringEvent> evt(m->get_event()); + return pg_shard_manager.start_pg_operation<RemotePeeringEvent>( + conn, + pg_shard_t{from, m->get_spg().shard}, + m->get_spg(), + std::move(*evt)).second; +} + +seastar::future<> OSD::check_osdmap_features() +{ + assert(seastar::this_shard_id() == PRIMARY_CORE); + return store.write_meta( + "require_osd_release", + stringify((int)osdmap->require_osd_release)); +} + +seastar::future<> OSD::prepare_to_stop() +{ + if (osdmap && osdmap->is_up(whoami)) { + pg_shard_manager.set_prestop(); + const auto timeout = + std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::duration<double>( + local_conf().get_val<double>("osd_mon_shutdown_timeout"))); + + return seastar::with_timeout( + seastar::timer<>::clock::now() + timeout, + monc->send_message( + crimson::make_message<MOSDMarkMeDown>( + monc->get_fsid(), + whoami, + osdmap->get_addrs(whoami), + osdmap->get_epoch(), + true)).then([this] { + return stop_acked.get_future(); + }) + ).handle_exception_type( + [](seastar::timed_out_error&) { + return seastar::now(); + }); + } + return seastar::now(); +} + +} |