summaryrefslogtreecommitdiffstats
path: root/src/crimson/osd/osd.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/crimson/osd/osd.cc')
-rw-r--r--src/crimson/osd/osd.cc658
1 files changed, 658 insertions, 0 deletions
diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc
new file mode 100644
index 00000000..bdb1642e
--- /dev/null
+++ b/src/crimson/osd/osd.cc
@@ -0,0 +1,658 @@
+#include "osd.h"
+
+#include <boost/range/join.hpp>
+#include <boost/smart_ptr/make_local_shared.hpp>
+
+#include "common/pick_address.h"
+#include "messages/MOSDBeacon.h"
+#include "messages/MOSDBoot.h"
+#include "messages/MOSDMap.h"
+#include "crimson/net/Connection.h"
+#include "crimson/net/Messenger.h"
+#include "crimson/os/cyan_collection.h"
+#include "crimson/os/cyan_object.h"
+#include "crimson/os/cyan_store.h"
+#include "crimson/os/Transaction.h"
+#include "crimson/osd/heartbeat.h"
+#include "crimson/osd/osd_meta.h"
+#include "crimson/osd/pg.h"
+#include "crimson/osd/pg_meta.h"
+
+namespace {
+ seastar::logger& logger() {
+ return ceph::get_logger(ceph_subsys_osd);
+ }
+
+ template<typename Message, typename... Args>
+ Ref<Message> make_message(Args&&... args)
+ {
+ return {new Message{std::forward<Args>(args)...}, false};
+ }
+ static constexpr int TICK_INTERVAL = 1;
+}
+
+using ceph::common::local_conf;
+using ceph::os::CyanStore;
+
+OSD::OSD(int id, uint32_t nonce)
+ : whoami{id},
+ nonce{nonce},
+ beacon_timer{[this] { send_beacon(); }},
+ heartbeat_timer{[this] { update_heartbeat_peers(); }}
+{
+ osdmaps[0] = boost::make_local_shared<OSDMap>();
+}
+
+OSD::~OSD() = default;
+
+namespace {
+// Initial features in new superblock.
+// Features here are also automatically upgraded
+CompatSet get_osd_initial_compat_set()
+{
+ CompatSet::FeatureSet ceph_osd_feature_compat;
+ CompatSet::FeatureSet ceph_osd_feature_ro_compat;
+ CompatSet::FeatureSet ceph_osd_feature_incompat;
+ ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BASE);
+ ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_PGINFO);
+ ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_OLOC);
+ ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEC);
+ ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_CATEGORIES);
+ ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_HOBJECTPOOL);
+ ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BIGINFO);
+ ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEVELDBINFO);
+ ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEVELDBLOG);
+ ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_SNAPMAPPER);
+ ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_HINTS);
+ ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_PGMETA);
+ ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_MISSING);
+ ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_FASTINFO);
+ ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_RECOVERY_DELETES);
+ return CompatSet(ceph_osd_feature_compat,
+ ceph_osd_feature_ro_compat,
+ ceph_osd_feature_incompat);
+}
+}
+
+seastar::future<> OSD::mkfs(uuid_d cluster_fsid)
+{
+ const auto data_path = local_conf().get_val<std::string>("osd_data");
+ store = std::make_unique<ceph::os::CyanStore>(data_path);
+ uuid_d osd_fsid;
+ osd_fsid.generate_random();
+ return store->mkfs(osd_fsid).then([this] {
+ return store->mount();
+ }).then([cluster_fsid, osd_fsid, this] {
+ superblock.cluster_fsid = cluster_fsid;
+ superblock.osd_fsid = osd_fsid;
+ superblock.whoami = whoami;
+ superblock.compat_features = get_osd_initial_compat_set();
+
+ meta_coll = make_unique<OSDMeta>(
+ store->create_new_collection(coll_t::meta()), store.get());
+ ceph::os::Transaction t;
+ meta_coll->create(t);
+ meta_coll->store_superblock(t, superblock);
+ return store->do_transaction(meta_coll->collection(), std::move(t));
+ }).then([cluster_fsid, this] {
+ store->write_meta("ceph_fsid", cluster_fsid.to_string());
+ store->write_meta("whoami", std::to_string(whoami));
+ return seastar::now();
+ });
+}
+
+namespace {
+ entity_addrvec_t pick_addresses(int what) {
+ entity_addrvec_t addrs;
+ CephContext cct;
+ if (int r = ::pick_addresses(&cct, what, &addrs, -1); r < 0) {
+ throw std::runtime_error("failed to pick address");
+ }
+ // TODO: v2: ::pick_addresses() returns v2 addresses, but crimson-msgr does
+ // not support v2 yet. remove following set_type() once v2 support is ready.
+ for (auto addr : addrs.v) {
+ addr.set_type(addr.TYPE_LEGACY);
+ logger().info("picked address {}", addr);
+ }
+ return addrs;
+ }
+ std::pair<entity_addrvec_t, bool>
+ replace_unknown_addrs(entity_addrvec_t maybe_unknowns,
+ const entity_addrvec_t& knowns) {
+ bool changed = false;
+ auto maybe_replace = [&](entity_addr_t addr) {
+ if (!addr.is_blank_ip()) {
+ return addr;
+ }
+ for (auto& b : knowns.v) {
+ if (addr.get_family() == b.get_family()) {
+ auto a = b;
+ a.set_nonce(addr.get_nonce());
+ a.set_type(addr.get_type());
+ a.set_port(addr.get_port());
+ changed = true;
+ return a;
+ }
+ }
+ throw std::runtime_error("failed to replace unknown address");
+ };
+ entity_addrvec_t replaced;
+ std::transform(maybe_unknowns.v.begin(),
+ maybe_unknowns.v.end(),
+ std::back_inserter(replaced.v),
+ maybe_replace);
+ return {replaced, changed};
+ }
+}
+
+seastar::future<> OSD::start()
+{
+ logger().info("start");
+
+ return seastar::when_all_succeed(
+ ceph::net::Messenger::create(entity_name_t::OSD(whoami),
+ "cluster",
+ nonce,
+ seastar::engine().cpu_id())
+ .then([this] (auto msgr) { cluster_msgr = msgr; }),
+ ceph::net::Messenger::create(entity_name_t::OSD(whoami),
+ "client",
+ nonce,
+ seastar::engine().cpu_id())
+ .then([this] (auto msgr) { public_msgr = msgr; }))
+ .then([this] {
+ monc.reset(new ceph::mon::Client{*public_msgr});
+ heartbeat.reset(new Heartbeat{whoami, nonce, *this, *monc});
+
+ for (auto msgr : {cluster_msgr, public_msgr}) {
+ if (local_conf()->ms_crc_data) {
+ msgr->set_crc_data();
+ }
+ if (local_conf()->ms_crc_header) {
+ msgr->set_crc_header();
+ }
+ }
+ dispatchers.push_front(this);
+ dispatchers.push_front(monc.get());
+
+ const auto data_path = local_conf().get_val<std::string>("osd_data");
+ store = std::make_unique<ceph::os::CyanStore>(data_path);
+ return store->mount();
+ }).then([this] {
+ meta_coll = make_unique<OSDMeta>(store->open_collection(coll_t::meta()),
+ store.get());
+ return meta_coll->load_superblock();
+ }).then([this](OSDSuperblock&& sb) {
+ superblock = std::move(sb);
+ return get_map(superblock.current_epoch);
+ }).then([this](cached_map_t&& map) {
+ osdmap = std::move(map);
+ return load_pgs();
+ }).then([this] {
+ return seastar::when_all_succeed(
+ cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
+ local_conf()->ms_bind_port_min,
+ local_conf()->ms_bind_port_max)
+ .then([this] { return cluster_msgr->start(&dispatchers); }),
+ public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
+ local_conf()->ms_bind_port_min,
+ local_conf()->ms_bind_port_max)
+ .then([this] { return public_msgr->start(&dispatchers); }));
+ }).then([this] {
+ return monc->start();
+ }).then([this] {
+ monc->sub_want("osd_pg_creates", last_pg_create_epoch, 0);
+ monc->sub_want("mgrmap", 0, 0);
+ monc->sub_want("osdmap", 0, 0);
+ return monc->renew_subs();
+ }).then([this] {
+ if (auto [addrs, changed] =
+ replace_unknown_addrs(cluster_msgr->get_myaddrs(),
+ public_msgr->get_myaddrs()); changed) {
+ cluster_msgr->set_myaddrs(addrs);
+ }
+ return heartbeat->start(public_msgr->get_myaddrs(),
+ cluster_msgr->get_myaddrs());
+ }).then([this] {
+ return start_boot();
+ });
+}
+
+seastar::future<> OSD::start_boot()
+{
+ state.set_preboot();
+ return monc->get_version("osdmap").then([this](version_t newest, version_t oldest) {
+ return _preboot(oldest, newest);
+ });
+}
+
+seastar::future<> OSD::_preboot(version_t oldest, version_t newest)
+{
+ logger().info("osd.{}: _preboot", whoami);
+ if (osdmap->get_epoch() == 0) {
+ logger().warn("waiting for initial osdmap");
+ } else if (osdmap->is_destroyed(whoami)) {
+ logger().warn("osdmap says I am destroyed");
+ // provide a small margin so we don't livelock seeing if we
+ // un-destroyed ourselves.
+ if (osdmap->get_epoch() > newest - 1) {
+ throw std::runtime_error("i am destroyed");
+ }
+ } else if (osdmap->is_noup(whoami)) {
+ logger().warn("osdmap NOUP flag is set, waiting for it to clear");
+ } else if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
+ logger().error("osdmap SORTBITWISE OSDMap flag is NOT set; please set it");
+ } else if (osdmap->require_osd_release < CEPH_RELEASE_LUMINOUS) {
+ logger().error("osdmap require_osd_release < luminous; please upgrade to luminous");
+ } else if (false) {
+ // TODO: update mon if current fullness state is different from osdmap
+ } else if (version_t n = local_conf()->osd_map_message_max;
+ osdmap->get_epoch() >= oldest - 1 &&
+ osdmap->get_epoch() + n > newest) {
+ return _send_boot();
+ }
+ // get all the latest maps
+ if (osdmap->get_epoch() + 1 >= oldest) {
+ return osdmap_subscribe(osdmap->get_epoch() + 1, false);
+ } else {
+ return osdmap_subscribe(oldest - 1, true);
+ }
+}
+
+seastar::future<> OSD::_send_boot()
+{
+ state.set_booting();
+
+ logger().info("hb_back_msgr: {}", heartbeat->get_back_addrs());
+ logger().info("hb_front_msgr: {}", heartbeat->get_front_addrs());
+ logger().info("cluster_msgr: {}", cluster_msgr->get_myaddr());
+ auto m = make_message<MOSDBoot>(superblock,
+ osdmap->get_epoch(),
+ osdmap->get_epoch(),
+ heartbeat->get_back_addrs(),
+ heartbeat->get_front_addrs(),
+ cluster_msgr->get_myaddrs(),
+ CEPH_FEATURES_ALL);
+ return monc->send_message(m);
+}
+
+seastar::future<> OSD::stop()
+{
+ // see also OSD::shutdown()
+ state.set_stopping();
+ return gate.close().then([this] {
+ return heartbeat->stop();
+ }).then([this] {
+ return monc->stop();
+ }).then([this] {
+ return public_msgr->shutdown();
+ }).then([this] {
+ return cluster_msgr->shutdown();
+ });
+}
+
+seastar::future<> OSD::load_pgs()
+{
+ return seastar::parallel_for_each(store->list_collections(),
+ [this](auto coll) {
+ spg_t pgid;
+ if (coll.is_pg(&pgid)) {
+ return load_pg(pgid).then([pgid, this](auto&& pg) {
+ logger().info("load_pgs: loaded {}", pgid);
+ pgs.emplace(pgid, std::move(pg));
+ return seastar::now();
+ });
+ } else if (coll.is_temp(&pgid)) {
+ // TODO: remove the collection
+ return seastar::now();
+ } else {
+ logger().warn("ignoring unrecognized collection: {}", coll);
+ return seastar::now();
+ }
+ });
+}
+
+seastar::future<Ref<PG>> OSD::load_pg(spg_t pgid)
+{
+ using ec_profile_t = map<string,string>;
+ return PGMeta{store.get(), pgid}.get_epoch().then([this](epoch_t e) {
+ return get_map(e);
+ }).then([pgid, this] (auto&& create_map) {
+ if (create_map->have_pg_pool(pgid.pool())) {
+ pg_pool_t pi = *create_map->get_pg_pool(pgid.pool());
+ string name = create_map->get_pool_name(pgid.pool());
+ ec_profile_t ec_profile;
+ if (pi.is_erasure()) {
+ ec_profile = create_map->get_erasure_code_profile(pi.erasure_code_profile);
+ }
+ return seastar::make_ready_future<pg_pool_t,
+ string,
+ ec_profile_t>(std::move(pi),
+ std::move(name),
+ std::move(ec_profile));
+ } else {
+ // pool was deleted; grab final pg_pool_t off disk.
+ return meta_coll->load_final_pool_info(pgid.pool());
+ }
+ }).then([this](pg_pool_t&& pool, string&& name, ec_profile_t&& ec_profile) {
+ Ref<PG> pg{new PG{std::move(pool),
+ std::move(name),
+ std::move(ec_profile)}};
+ return seastar::make_ready_future<Ref<PG>>(std::move(pg));
+ });
+}
+
+seastar::future<> OSD::ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m)
+{
+ logger().info("ms_dispatch {}", *m);
+ if (state.is_stopping()) {
+ return seastar::now();
+ }
+
+ switch (m->get_type()) {
+ case CEPH_MSG_OSD_MAP:
+ return handle_osd_map(conn, boost::static_pointer_cast<MOSDMap>(m));
+ default:
+ return seastar::now();
+ }
+}
+
+seastar::future<> OSD::ms_handle_connect(ceph::net::ConnectionRef conn)
+{
+ if (conn->get_peer_type() != CEPH_ENTITY_TYPE_MON) {
+ return seastar::now();
+ } else {
+ return seastar::now();
+ }
+}
+
+seastar::future<> OSD::ms_handle_reset(ceph::net::ConnectionRef conn)
+{
+ // TODO: cleanup the session attached to this connection
+ logger().warn("ms_handle_reset");
+ return seastar::now();
+}
+
+seastar::future<> OSD::ms_handle_remote_reset(ceph::net::ConnectionRef conn)
+{
+ logger().warn("ms_handle_remote_reset");
+ return seastar::now();
+}
+
+OSD::cached_map_t OSD::get_map() const
+{
+ return osdmap;
+}
+
+seastar::future<OSD::cached_map_t> OSD::get_map(epoch_t e)
+{
+ // TODO: use LRU cache for managing osdmap, fallback to disk if we have to
+ if (auto found = osdmaps.find(e); found) {
+ return seastar::make_ready_future<cached_map_t>(std::move(found));
+ } else {
+ return load_map_bl(e).then([e, this](bufferlist bl) {
+ auto osdmap = std::make_unique<OSDMap>();
+ osdmap->decode(bl);
+ return seastar::make_ready_future<cached_map_t>(
+ osdmaps.insert(e, std::move(osdmap)));
+ });
+ }
+}
+
+void OSD::store_map_bl(ceph::os::Transaction& t,
+ epoch_t e, bufferlist&& bl)
+{
+ meta_coll->store_map(t, e, bl);
+ map_bl_cache.insert(e, std::move(bl));
+}
+
+seastar::future<bufferlist> OSD::load_map_bl(epoch_t e)
+{
+ if (std::optional<bufferlist> found = map_bl_cache.find(e); found) {
+ return seastar::make_ready_future<bufferlist>(*found);
+ } else {
+ return meta_coll->load_map(e);
+ }
+}
+
+seastar::future<> OSD::store_maps(ceph::os::Transaction& t,
+ epoch_t start, Ref<MOSDMap> m)
+{
+ return seastar::do_for_each(boost::counting_iterator<epoch_t>(start),
+ boost::counting_iterator<epoch_t>(m->get_last() + 1),
+ [&t, m, this](epoch_t e) {
+ if (auto p = m->maps.find(e); p != m->maps.end()) {
+ auto o = std::make_unique<OSDMap>();
+ o->decode(p->second);
+ logger().info("store_maps osdmap.{}", e);
+ store_map_bl(t, e, std::move(std::move(p->second)));
+ osdmaps.insert(e, std::move(o));
+ return seastar::now();
+ } else if (auto p = m->incremental_maps.find(e);
+ p != m->incremental_maps.end()) {
+ OSDMap::Incremental inc;
+ auto i = p->second.cbegin();
+ inc.decode(i);
+ return load_map_bl(e - 1)
+ .then([&t, e, inc=std::move(inc), this](bufferlist bl) {
+ auto o = std::make_unique<OSDMap>();
+ o->decode(bl);
+ o->apply_incremental(inc);
+ bufferlist fbl;
+ o->encode(fbl, inc.encode_features | CEPH_FEATURE_RESERVED);
+ store_map_bl(t, e, std::move(fbl));
+ osdmaps.insert(e, std::move(o));
+ return seastar::now();
+ });
+ } else {
+ logger().error("MOSDMap lied about what maps it had?");
+ return seastar::now();
+ }
+ });
+}
+
+seastar::future<> OSD::osdmap_subscribe(version_t epoch, bool force_request)
+{
+ logger().info("{}({})", __func__, epoch);
+ if (monc->sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) ||
+ force_request) {
+ return monc->renew_subs();
+ } else {
+ return seastar::now();
+ }
+}
+
+seastar::future<> OSD::handle_osd_map(ceph::net::ConnectionRef conn,
+ Ref<MOSDMap> m)
+{
+ logger().info("handle_osd_map {}", *m);
+ if (m->fsid != superblock.cluster_fsid) {
+ logger().warn("fsid mismatched");
+ return seastar::now();
+ }
+ if (state.is_initializing()) {
+ logger().warn("i am still initializing");
+ return seastar::now();
+ }
+
+ const auto first = m->get_first();
+ const auto last = m->get_last();
+
+ // make sure there is something new, here, before we bother flushing
+ // the queues and such
+ if (last <= superblock.newest_map) {
+ return seastar::now();
+ }
+ // missing some?
+ bool skip_maps = false;
+ epoch_t start = superblock.newest_map + 1;
+ if (first > start) {
+ logger().info("handle_osd_map message skips epochs {}..{}",
+ start, first - 1);
+ if (m->oldest_map <= start) {
+ return osdmap_subscribe(start, false);
+ }
+ // always try to get the full range of maps--as many as we can. this
+ // 1- is good to have
+ // 2- is at present the only way to ensure that we get a *full* map as
+ // the first map!
+ if (m->oldest_map < first) {
+ return osdmap_subscribe(m->oldest_map - 1, true);
+ }
+ skip_maps = true;
+ start = first;
+ }
+
+ return seastar::do_with(ceph::os::Transaction{},
+ [=](auto& t) {
+ return store_maps(t, start, m).then([=, &t] {
+ // even if this map isn't from a mon, we may have satisfied our subscription
+ monc->sub_got("osdmap", last);
+ if (!superblock.oldest_map || skip_maps) {
+ superblock.oldest_map = first;
+ }
+ superblock.newest_map = last;
+ superblock.current_epoch = last;
+
+ // note in the superblock that we were clean thru the prior epoch
+ if (boot_epoch && boot_epoch >= superblock.mounted) {
+ superblock.mounted = boot_epoch;
+ superblock.clean_thru = last;
+ }
+ meta_coll->store_superblock(t, superblock);
+ return store->do_transaction(meta_coll->collection(), std::move(t));
+ });
+ }).then([=] {
+ // TODO: write to superblock and commit the transaction
+ return committed_osd_maps(start, last, m);
+ });
+}
+
+seastar::future<> OSD::committed_osd_maps(version_t first,
+ version_t last,
+ Ref<MOSDMap> m)
+{
+ logger().info("osd.{}: committed_osd_maps({}, {})", whoami, first, last);
+ // advance through the new maps
+ return seastar::parallel_for_each(boost::irange(first, last + 1),
+ [this](epoch_t cur) {
+ return get_map(cur).then([this](cached_map_t&& o) {
+ osdmap = std::move(o);
+ if (up_epoch != 0 &&
+ osdmap->is_up(whoami) &&
+ osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) {
+ up_epoch = osdmap->get_epoch();
+ if (!boot_epoch) {
+ boot_epoch = osdmap->get_epoch();
+ }
+ }
+ });
+ }).then([m, this] {
+ if (osdmap->is_up(whoami) &&
+ osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() &&
+ bind_epoch < osdmap->get_up_from(whoami)) {
+ if (state.is_booting()) {
+ logger().info("osd.{}: activating...", whoami);
+ state.set_active();
+ beacon_timer.arm_periodic(
+ std::chrono::seconds(local_conf()->osd_beacon_report_interval));
+ heartbeat_timer.arm_periodic(
+ std::chrono::seconds(TICK_INTERVAL));
+ }
+ }
+
+ if (state.is_active()) {
+ logger().info("osd.{}: now active", whoami);
+ if (!osdmap->exists(whoami)) {
+ return shutdown();
+ }
+ if (should_restart()) {
+ return restart();
+ } else {
+ return seastar::now();
+ }
+ } else if (state.is_preboot()) {
+ logger().info("osd.{}: now preboot", whoami);
+
+ if (m->get_source().is_mon()) {
+ return _preboot(m->oldest_map, m->newest_map);
+ } else {
+ logger().info("osd.{}: start_boot", whoami);
+ return start_boot();
+ }
+ } else {
+ logger().info("osd.{}: now {}", whoami, state);
+ // XXX
+ return seastar::now();
+ }
+ });
+}
+
+bool OSD::should_restart() const
+{
+ if (!osdmap->is_up(whoami)) {
+ logger().info("map e {} marked osd.{} down",
+ osdmap->get_epoch(), whoami);
+ return true;
+ } else if (osdmap->get_addrs(whoami) != public_msgr->get_myaddrs()) {
+ logger().error("map e {} had wrong client addr ({} != my {})",
+ osdmap->get_epoch(),
+ osdmap->get_addrs(whoami),
+ public_msgr->get_myaddrs());
+ return true;
+ } else if (osdmap->get_cluster_addrs(whoami) != cluster_msgr->get_myaddrs()) {
+ logger().error("map e {} had wrong cluster addr ({} != my {})",
+ osdmap->get_epoch(),
+ osdmap->get_cluster_addrs(whoami),
+ cluster_msgr->get_myaddrs());
+ return true;
+ } else {
+ return false;
+ }
+}
+
+seastar::future<> OSD::restart()
+{
+ up_epoch = 0;
+ bind_epoch = osdmap->get_epoch();
+ // TODO: promote to shutdown if being marked down for multiple times
+ // rebind messengers
+ return start_boot();
+}
+
+seastar::future<> OSD::shutdown()
+{
+ // TODO
+ superblock.mounted = boot_epoch;
+ superblock.clean_thru = osdmap->get_epoch();
+ return seastar::now();
+}
+
+seastar::future<> OSD::send_beacon()
+{
+ // FIXME: min lec should be calculated from pg_stat
+ // and should set m->pgs
+ epoch_t min_last_epoch_clean = osdmap->get_epoch();
+ auto m = make_message<MOSDBeacon>(osdmap->get_epoch(),
+ min_last_epoch_clean);
+ return monc->send_message(m);
+}
+
+void OSD::update_heartbeat_peers()
+{
+ if (!state.is_active()) {
+ return;
+ }
+ for (auto& pg : pgs) {
+ vector<int> up, acting;
+ osdmap->pg_to_up_acting_osds(pg.first.pgid,
+ &up, nullptr,
+ &acting, nullptr);
+ for (auto osd : boost::join(up, acting)) {
+ if (osd != CRUSH_ITEM_NONE) {
+ heartbeat->add_peer(osd, osdmap->get_epoch());
+ }
+ }
+ }
+ heartbeat->update_peers(whoami);
+}