// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "PGPeeringEvent.h"
#include "common/ceph_releases.h"
#include "common/dout.h"
#include "PeeringState.h"

#include "messages/MOSDPGRemove.h"
#include "messages/MBackfillReserve.h"
#include "messages/MRecoveryReserve.h"
#include "messages/MOSDScrubReserve.h"
#include "messages/MOSDPGInfo2.h"
#include "messages/MOSDPGTrim.h"
#include "messages/MOSDPGLog.h"
#include "messages/MOSDPGNotify2.h"
#include "messages/MOSDPGQuery2.h"
#include "messages/MOSDPGLease.h"
#include "messages/MOSDPGLeaseAck.h"

#define dout_context cct
#define dout_subsys ceph_subsys_osd

using std::dec;
using std::hex;
using std::make_pair;
using std::map;
using std::ostream;
using std::pair;
using std::set;
using std::string;
using std::stringstream;
using std::vector;

using ceph::Formatter;
using ceph::make_message;

BufferedRecoveryMessages::BufferedRecoveryMessages(PeeringCtx &ctx)
  // steal messages from ctx
  : message_map{std::move(ctx.message_map)}
{}

void BufferedRecoveryMessages::send_notify(int to, const pg_notify_t &n)
{
  spg_t pgid(n.info.pgid.pgid, n.to);
  send_osd_message(to, TOPNSPC::make_message<MOSDPGNotify2>(pgid, n));
}

void BufferedRecoveryMessages::send_query(
  int to,
  spg_t to_spgid,
  const pg_query_t &q)
{
  send_osd_message(to, TOPNSPC::make_message<MOSDPGQuery2>(to_spgid, q));
}

void BufferedRecoveryMessages::send_info(
  int to,
  spg_t to_spgid,
  epoch_t min_epoch,
  epoch_t cur_epoch,
  const pg_info_t &info,
  std::optional<pg_lease_t> lease,
  std::optional<pg_lease_ack_t> lease_ack)
{
  send_osd_message(
    to,
    TOPNSPC::make_message<MOSDPGInfo2>(
      to_spgid,
      info,
      cur_epoch,
      min_epoch,
      lease,
      lease_ack)
    );
}

void PGPool::update(OSDMapRef map)
{
  const pg_pool_t *pi = map->get_pg_pool(id);
  if (!pi) {
    return; // pool has been deleted
  }
  info = *pi;
  name = map->get_pool_name(id);

  bool updated = false;
  if ((map->get_epoch() != cached_epoch + 1) ||
      (pi->get_snap_epoch() == map->get_epoch())) {
    updated = true;
  }

  if (info.is_pool_snaps_mode() && updated) {
    snapc = pi->get_snap_context();
  }
  cached_epoch = map->get_epoch();
}

/*-------------Peering State Helpers----------------*/
#undef dout_prefix
#define dout_prefix (dpp->gen_prefix(*_dout))
#undef psdout
#define psdout(x) ldout(cct, x)

PeeringState::PeeringState(
  CephContext *cct,
  pg_shard_t pg_whoami,
  spg_t spgid,
  const PGPool &_pool,
  OSDMapRef curmap,
  DoutPrefixProvider *dpp,
  PeeringListener *pl)
  : state_history(*pl),
    cct(cct),
    spgid(spgid),
    dpp(dpp),
    pl(pl),
    orig_ctx(0),
    osdmap_ref(curmap),
    pool(_pool),
    pg_whoami(pg_whoami),
    info(spgid),
    pg_log(cct),
    last_require_osd_release(curmap->require_osd_release),
    missing_loc(spgid, this, dpp, cct),
    machine(this, cct, spgid, dpp, pl, &state_history)
{
  machine.initiate();
}

void PeeringState::start_handle(PeeringCtx *new_ctx)
{
  ceph_assert(!rctx);
  ceph_assert(!orig_ctx);
  orig_ctx = new_ctx;
  if (new_ctx) {
    if (messages_pending_flush) {
      rctx.emplace(*messages_pending_flush, *new_ctx);
    } else {
      rctx.emplace(*new_ctx);
    }
    rctx->start_time = ceph_clock_now();
  }
}

void PeeringState::begin_block_outgoing()
{
  ceph_assert(!messages_pending_flush);
  ceph_assert(orig_ctx);
  ceph_assert(rctx);
  messages_pending_flush.emplace();
  rctx.emplace(*messages_pending_flush, *orig_ctx);
}

void PeeringState::clear_blocked_outgoing()
{
  ceph_assert(orig_ctx);
  ceph_assert(rctx);
  messages_pending_flush = std::optional<BufferedRecoveryMessages>();
}

void PeeringState::end_block_outgoing()
{
  ceph_assert(messages_pending_flush);
  ceph_assert(orig_ctx);
  ceph_assert(rctx);

  orig_ctx->accept_buffered_messages(*messages_pending_flush);
  rctx.emplace(*orig_ctx);
  messages_pending_flush = std::optional<BufferedRecoveryMessages>();
}

void PeeringState::end_handle()
{
  if (rctx) {
    utime_t dur = ceph_clock_now() - rctx->start_time;
    machine.event_time += dur;
  }

  machine.event_count++;
  rctx = std::nullopt;
  orig_ctx = NULL;
}
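// Illustrative note (not authoritative): the expected call sequence for the
// buffering machinery above is roughly
//
//   start_handle(&ctx);       // wrap the caller's PeeringCtx
//   begin_block_outgoing();   // divert sends into messages_pending_flush
//   ...                       // handlers queue messages as usual
//   end_block_outgoing();     // fold buffered messages back into ctx
//   end_handle();             // account event time, drop the wrapper
//
// clear_blocked_outgoing() is the abort path: it discards anything buffered
// (e.g. on a peering reset, see set_last_peering_reset() below) instead of
// folding it back into the caller's context.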
void PeeringState::check_recovery_sources(const OSDMapRef& osdmap)
{
  /*
   * check that any peers we are planning to (or currently) pull
   * objects from are dealt with.
   */
  missing_loc.check_recovery_sources(osdmap);
  pl->check_recovery_sources(osdmap);

  for (auto i = peer_log_requested.begin(); i != peer_log_requested.end();) {
    if (!osdmap->is_up(i->osd)) {
      psdout(10) << "peer_log_requested removing " << *i << dendl;
      peer_log_requested.erase(i++);
    } else {
      ++i;
    }
  }

  for (auto i = peer_missing_requested.begin();
       i != peer_missing_requested.end();) {
    if (!osdmap->is_up(i->osd)) {
      psdout(10) << "peer_missing_requested removing " << *i << dendl;
      peer_missing_requested.erase(i++);
    } else {
      ++i;
    }
  }
}

void PeeringState::update_history(const pg_history_t& new_history)
{
  auto mnow = pl->get_mnow();
  info.history.refresh_prior_readable_until_ub(mnow, prior_readable_until_ub);
  if (info.history.merge(new_history)) {
    psdout(20) << __func__ << " advanced history from " << new_history << dendl;
    dirty_info = true;
    if (info.history.last_epoch_clean >= info.history.same_interval_since) {
      psdout(20) << __func__ << " clearing past_intervals" << dendl;
      past_intervals.clear();
      dirty_big_info = true;
    }
    prior_readable_until_ub = info.history.get_prior_readable_until_ub(mnow);
    if (prior_readable_until_ub != ceph::signedspan::zero()) {
      dout(20) << __func__
	       << " prior_readable_until_ub " << prior_readable_until_ub
	       << " (mnow " << mnow << " + "
	       << info.history.prior_readable_until_ub << ")" << dendl;
    }
  }
  pl->on_info_history_change();
}
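// Worked example (illustrative values): the history stores
// prior_readable_until_ub as an offset, and the debug line above prints the
// absolute bound as "mnow + offset".  So with mnow = 1000s and a stored
// offset of 12s, the prior interval may still be readable until t = 1012s on
// the monotonic clock; a zero span means there is no outstanding bound.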
hobject_t PeeringState::earliest_backfill() const
{
  hobject_t e = hobject_t::get_max();
  for (const pg_shard_t& bt : get_backfill_targets()) {
    const pg_info_t &pi = get_peer_info(bt);
    e = std::min(pi.last_backfill, e);
  }
  return e;
}

void PeeringState::purge_strays()
{
  if (is_premerge()) {
    psdout(10) << "purge_strays " << stray_set << " but premerge, doing nothing"
	       << dendl;
    return;
  }
  if (cct->_conf.get_val<bool>("osd_debug_no_purge_strays")) {
    return;
  }
  psdout(10) << "purge_strays " << stray_set << dendl;

  bool removed = false;
  for (auto p = stray_set.begin(); p != stray_set.end(); ++p) {
    ceph_assert(!is_acting_recovery_backfill(*p));
    if (get_osdmap()->is_up(p->osd)) {
      psdout(10) << "sending PGRemove to osd." << *p << dendl;
      vector<spg_t> to_remove;
      to_remove.push_back(spg_t(info.pgid.pgid, p->shard));
      auto m = TOPNSPC::make_message<MOSDPGRemove>(
	get_osdmap_epoch(),
	to_remove);
      pl->send_cluster_message(p->osd, std::move(m), get_osdmap_epoch());
    } else {
      psdout(10) << "not sending PGRemove to down osd." << *p << dendl;
    }
    peer_missing.erase(*p);
    peer_info.erase(*p);
    missing_loc.remove_stray_recovery_sources(*p);
    peer_purged.insert(*p);
    removed = true;
  }

  // if we removed anyone, update peers (which include peer_info)
  if (removed)
    update_heartbeat_peers();

  stray_set.clear();

  // clear _requested maps; we may have to peer() again if we discover
  // (more) stray content
  peer_log_requested.clear();
  peer_missing_requested.clear();
}

void PeeringState::query_unfound(Formatter *f, string state)
{
  psdout(20) << "Enter PeeringState common QueryUnfound" << dendl;
  {
    f->dump_string("state", state);
    f->dump_bool("available_might_have_unfound", true);
    f->open_array_section("might_have_unfound");
    for (auto p = might_have_unfound.begin();
	 p != might_have_unfound.end();
	 ++p) {
      if (peer_missing.count(*p)) {
	; // Ignore already probed OSDs
      } else {
	f->open_object_section("osd");
	f->dump_stream("osd") << *p;
	if (peer_missing_requested.count(*p)) {
	  f->dump_string("status", "querying");
	} else if (!get_osdmap()->is_up(p->osd)) {
	  f->dump_string("status", "osd is down");
	} else {
	  f->dump_string("status", "not queried");
	}
	f->close_section();
      }
    }
    f->close_section();
  }
  psdout(20) << "Exit PeeringState common QueryUnfound" << dendl;
  return;
}

bool PeeringState::proc_replica_info(
  pg_shard_t from, const pg_info_t &oinfo, epoch_t send_epoch)
{
  auto p = peer_info.find(from);
  if (p != peer_info.end() && p->second.last_update == oinfo.last_update) {
    psdout(10) << " got dup osd." << from << " info "
	       << oinfo << ", identical to ours" << dendl;
    return false;
  }

  if (!get_osdmap()->has_been_up_since(from.osd, send_epoch)) {
    psdout(10) << " got info " << oinfo << " from down osd." << from
	       << " discarding" << dendl;
    return false;
  }

  psdout(10) << " got osd." << from << " " << oinfo << dendl;
  ceph_assert(is_primary());
  peer_info[from] = oinfo;
  might_have_unfound.insert(from);

  update_history(oinfo.history);

  // stray?
  if (!is_up(from) && !is_acting(from)) {
    psdout(10) << " osd." << from << " has stray content: " << oinfo << dendl;
    stray_set.insert(from);
    if (is_clean()) {
      purge_strays();
    }
  }

  // was this a new info?  if so, update peers!
  if (p == peer_info.end())
    update_heartbeat_peers();

  return true;
}
void PeeringState::remove_down_peer_info(const OSDMapRef &osdmap)
{
  // Remove any downed osds from peer_info
  bool removed = false;
  auto p = peer_info.begin();
  while (p != peer_info.end()) {
    if (!osdmap->is_up(p->first.osd)) {
      psdout(10) << " dropping down osd." << p->first << " info " << p->second
		 << dendl;
      peer_missing.erase(p->first);
      peer_log_requested.erase(p->first);
      peer_missing_requested.erase(p->first);
      peer_info.erase(p++);
      removed = true;
    } else
      ++p;
  }

  // Remove any downed osds from peer_purged so we can re-purge if necessary
  auto it = peer_purged.begin();
  while (it != peer_purged.end()) {
    if (!osdmap->is_up(it->osd)) {
      psdout(10) << " dropping down osd." << *it << " from peer_purged" << dendl;
      peer_purged.erase(it++);
    } else {
      ++it;
    }
  }

  // if we removed anyone, update peers (which include peer_info)
  if (removed)
    update_heartbeat_peers();

  check_recovery_sources(osdmap);
}

void PeeringState::update_heartbeat_peers()
{
  if (!is_primary())
    return;

  set<int> new_peers;
  for (unsigned i = 0; i < acting.size(); i++) {
    if (acting[i] != CRUSH_ITEM_NONE)
      new_peers.insert(acting[i]);
  }
  for (unsigned i = 0; i < up.size(); i++) {
    if (up[i] != CRUSH_ITEM_NONE)
      new_peers.insert(up[i]);
  }
  for (auto p = peer_info.begin(); p != peer_info.end(); ++p) {
    new_peers.insert(p->first.osd);
  }
  pl->update_heartbeat_peers(std::move(new_peers));
}

void PeeringState::write_if_dirty(ObjectStore::Transaction& t)
{
  pl->prepare_write(
    info,
    last_written_info,
    past_intervals,
    pg_log,
    dirty_info,
    dirty_big_info,
    last_persisted_osdmap < get_osdmap_epoch(),
    t);
  if (dirty_info || dirty_big_info) {
    last_persisted_osdmap = get_osdmap_epoch();
    last_written_info = info;
    dirty_info = false;
    dirty_big_info = false;
  }
}

void PeeringState::advance_map(
  OSDMapRef osdmap, OSDMapRef lastmap,
  vector<int>& newup, int up_primary,
  vector<int>& newacting, int acting_primary,
  PeeringCtx &rctx)
{
  ceph_assert(lastmap == osdmap_ref);
  psdout(10) << "handle_advance_map "
	     << newup << "/" << newacting
	     << " -- " << up_primary << "/" << acting_primary
	     << dendl;

  update_osdmap_ref(osdmap);
  pool.update(osdmap);

  AdvMap evt(
    osdmap, lastmap, newup, up_primary,
    newacting, acting_primary);
  handle_event(evt, &rctx);
  if (pool.info.last_change == osdmap_ref->get_epoch()) {
    pl->on_pool_change();
  }
  readable_interval = pool.get_readable_interval(cct->_conf);
  last_require_osd_release = osdmap->require_osd_release;
}

void PeeringState::activate_map(PeeringCtx &rctx)
{
  psdout(10) << __func__ << dendl;
  ActMap evt;
  handle_event(evt, &rctx);
  if (osdmap_ref->get_epoch() - last_persisted_osdmap >
      cct->_conf->osd_pg_epoch_persisted_max_stale) {
    psdout(20) << __func__ << ": Dirtying info: last_persisted is "
	       << last_persisted_osdmap
	       << " while current is " << osdmap_ref->get_epoch() << dendl;
    dirty_info = true;
  } else {
    psdout(20) << __func__ << ": Not dirtying info: last_persisted is "
	       << last_persisted_osdmap
	       << " while current is " << osdmap_ref->get_epoch() << dendl;
  }
  write_if_dirty(rctx.transaction);

  if (get_osdmap()->check_new_blocklist_entries()) {
    pl->check_blocklisted_watchers();
  }
}
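// Worked example (illustrative numbers): with
// osd_pg_epoch_persisted_max_stale = 40, last_persisted_osdmap = 100 and a
// current map epoch of 150, the delta of 50 exceeds the threshold, so
// activate_map() above sets dirty_info and the next write_if_dirty() call
// persists the newer epoch even though nothing else changed.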
void PeeringState::set_last_peering_reset()
{
  psdout(20) << "set_last_peering_reset " << get_osdmap_epoch() << dendl;
  if (last_peering_reset != get_osdmap_epoch()) {
    last_peering_reset = get_osdmap_epoch();
    psdout(10) << "Clearing blocked outgoing recovery messages" << dendl;
    clear_blocked_outgoing();
    if (!pl->try_flush_or_schedule_async()) {
      psdout(10) << "Beginning to block outgoing recovery messages" << dendl;
      begin_block_outgoing();
    } else {
      psdout(10) << "Not blocking outgoing recovery messages" << dendl;
    }
  }
}

void PeeringState::complete_flush()
{
  flushes_in_progress--;
  if (flushes_in_progress == 0) {
    pl->on_flushed();
  }
}

void PeeringState::check_full_transition(OSDMapRef lastmap, OSDMapRef osdmap)
{
  const pg_pool_t *pi = osdmap->get_pg_pool(info.pgid.pool());
  if (!pi) {
    return; // pool deleted
  }
  bool changed = false;
  if (pi->has_flag(pg_pool_t::FLAG_FULL)) {
    const pg_pool_t *opi = lastmap->get_pg_pool(info.pgid.pool());
    if (!opi || !opi->has_flag(pg_pool_t::FLAG_FULL)) {
      psdout(10) << " pool was marked full in " << osdmap->get_epoch() << dendl;
      changed = true;
    }
  }
  if (changed) {
    info.history.last_epoch_marked_full = osdmap->get_epoch();
    dirty_info = true;
  }
}

bool PeeringState::should_restart_peering(
  int newupprimary,
  int newactingprimary,
  const vector<int>& newup,
  const vector<int>& newacting,
  OSDMapRef lastmap,
  OSDMapRef osdmap)
{
  if (PastIntervals::is_new_interval(
	primary.osd,
	newactingprimary,
	acting,
	newacting,
	up_primary.osd,
	newupprimary,
	up,
	newup,
	osdmap.get(),
	lastmap.get(),
	info.pgid.pgid)) {
    psdout(20) << "new interval newup " << newup
	       << " newacting " << newacting << dendl;
    return true;
  }
  if (!lastmap->is_up(pg_whoami.osd) && osdmap->is_up(pg_whoami.osd)) {
    psdout(10) << __func__ << " osd transitioned from down -> up"
	       << dendl;
    return true;
  }
  return false;
}
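// Illustrative note: PastIntervals::is_new_interval() (defined elsewhere)
// compares the old and new mappings, so e.g. acting [0,1,2] -> [0,1,3], a
// changed acting or up primary, or relevant pool/map changes start a new
// interval; the explicit down -> up check above additionally forces a
// restart when this OSD itself just came back, since it may have missed
// events while it was down.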
/* Called before initializing peering during advance_map */
void PeeringState::start_peering_interval(
  const OSDMapRef lastmap,
  const vector<int>& newup, int new_up_primary,
  const vector<int>& newacting, int new_acting_primary,
  ObjectStore::Transaction &t)
{
  const OSDMapRef osdmap = get_osdmap();

  set_last_peering_reset();

  vector<int> oldacting, oldup;
  int oldrole = get_role();

  if (is_primary()) {
    pl->clear_ready_to_merge();
  }

  pg_shard_t old_acting_primary = get_primary();
  pg_shard_t old_up_primary = up_primary;
  bool was_old_primary = is_primary();
  bool was_old_nonprimary = is_nonprimary();

  acting.swap(oldacting);
  up.swap(oldup);
  init_primary_up_acting(
    newup,
    newacting,
    new_up_primary,
    new_acting_primary);

  if (info.stats.up != up ||
      info.stats.acting != acting ||
      info.stats.up_primary != new_up_primary ||
      info.stats.acting_primary != new_acting_primary) {
    info.stats.up = up;
    info.stats.up_primary = new_up_primary;
    info.stats.acting = acting;
    info.stats.acting_primary = new_acting_primary;
    info.stats.mapping_epoch = osdmap->get_epoch();
  }

  pl->clear_publish_stats();

  // This will now be remapped during a backfill in cases
  // that it would not have been before.
  if (up != acting)
    state_set(PG_STATE_REMAPPED);
  else
    state_clear(PG_STATE_REMAPPED);

  int role = osdmap->calc_pg_role(pg_whoami, acting);
  set_role(role);

  // did acting, up, primary|acker change?
  if (!lastmap) {
    psdout(10) << " no lastmap" << dendl;
    dirty_info = true;
    dirty_big_info = true;
    info.history.same_interval_since = osdmap->get_epoch();
  } else {
    std::stringstream debug;
    ceph_assert(info.history.same_interval_since != 0);
    bool new_interval = PastIntervals::check_new_interval(
      old_acting_primary.osd,
      new_acting_primary,
      oldacting, newacting,
      old_up_primary.osd,
      new_up_primary,
      oldup, newup,
      info.history.same_interval_since,
      info.history.last_epoch_clean,
      osdmap.get(),
      lastmap.get(),
      info.pgid.pgid,
      missing_loc.get_recoverable_predicate(),
      &past_intervals,
      &debug);
    psdout(10) << __func__ << ": check_new_interval output: "
	       << debug.str() << dendl;
    if (new_interval) {
      if (osdmap->get_epoch() == pl->cluster_osdmap_trim_lower_bound() &&
	  info.history.last_epoch_clean < osdmap->get_epoch()) {
	psdout(10) << " map gap, clearing past_intervals and faking" << dendl;
	// our information is incomplete and useless; someone else was clean
	// after everything we know if osdmaps were trimmed.
	past_intervals.clear();
      } else {
	psdout(10) << " noting past " << past_intervals << dendl;
      }
      dirty_info = true;
      dirty_big_info = true;
      info.history.same_interval_since = osdmap->get_epoch();
      if (osdmap->have_pg_pool(info.pgid.pgid.pool()) &&
	  info.pgid.pgid.is_split(lastmap->get_pg_num(info.pgid.pgid.pool()),
				  osdmap->get_pg_num(info.pgid.pgid.pool()),
				  nullptr)) {
	info.history.last_epoch_split = osdmap->get_epoch();
      }
    }
  }

  if (old_up_primary != up_primary || oldup != up) {
    info.history.same_up_since = osdmap->get_epoch();
  }
  // this comparison includes primary rank via pg_shard_t
  if (old_acting_primary != get_primary()) {
    info.history.same_primary_since = osdmap->get_epoch();
  }

  on_new_interval();
  pl->on_info_history_change();

  psdout(1) << __func__ << " up " << oldup << " -> " << up
	    << ", acting " << oldacting << " -> " << acting
	    << ", acting_primary " << old_acting_primary << " -> "
	    << new_acting_primary
	    << ", up_primary " << old_up_primary << " -> " << new_up_primary
	    << ", role " << oldrole << " -> " << role
	    << ", features acting " << acting_features
	    << " upacting " << upacting_features
	    << dendl;

  // deactivate.
  state_clear(PG_STATE_ACTIVE);
  state_clear(PG_STATE_PEERED);
  state_clear(PG_STATE_PREMERGE);
  state_clear(PG_STATE_DOWN);
  state_clear(PG_STATE_RECOVERY_WAIT);
  state_clear(PG_STATE_RECOVERY_TOOFULL);
  state_clear(PG_STATE_RECOVERING);

  peer_purged.clear();
  acting_recovery_backfill.clear();

  // reset primary/replica state?
  if (was_old_primary || is_primary()) {
    pl->clear_want_pg_temp();
  } else if (was_old_nonprimary || is_nonprimary()) {
    pl->clear_want_pg_temp();
  }
  clear_primary_state();

  pl->on_change(t);

  ceph_assert(!deleting);

  // should we tell the primary we are here?
  send_notify = !is_primary();

  if (role != oldrole ||
      was_old_primary != is_primary()) {
    // did primary change?
    if (was_old_primary != is_primary()) {
      state_clear(PG_STATE_CLEAN);

      // queue/dequeue the scrubber
      pl->on_primary_status_change(was_old_primary, is_primary());
    }

    pl->on_role_change();
  } else {
    // no role change.
    // did primary change?
    if (get_primary() != old_acting_primary) {
      psdout(10) << oldacting << " -> " << acting
		 << ", acting primary "
		 << old_acting_primary << " -> " << get_primary()
		 << dendl;
    } else {
      // primary is the same.
      if (is_primary()) {
	// i am (still) primary. but my replica set changed.
	state_clear(PG_STATE_CLEAN);

	psdout(10) << oldacting << " -> " << acting
		   << ", replicas changed" << dendl;
      }
    }
  }

  if (is_primary() && was_old_primary) {
    pl->reschedule_scrub();
  }

  if (acting.empty() && !up.empty() && up_primary == pg_whoami) {
    psdout(10) << " acting empty, but i am up[0], clearing pg_temp" << dendl;
    pl->queue_want_pg_temp(acting);
  }
}
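// Illustrative example: if up = [0,1,2] but acting = [3,1,2] (say, because a
// pg_temp mapping is steering I/O away from a backfilling osd.0), then
// up != acting and the PG is flagged PG_STATE_REMAPPED above; once acting
// converges back to up, the flag is cleared on the next interval change.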
void PeeringState::on_new_interval()
{
  dout(20) << __func__ << dendl;
  const OSDMapRef osdmap = get_osdmap();

  // initialize features
  acting_features = CEPH_FEATURES_SUPPORTED_DEFAULT;
  upacting_features = CEPH_FEATURES_SUPPORTED_DEFAULT;
  for (auto p = acting.begin(); p != acting.end(); ++p) {
    if (*p == CRUSH_ITEM_NONE)
      continue;
    uint64_t f = osdmap->get_xinfo(*p).features;
    acting_features &= f;
    upacting_features &= f;
  }
  for (auto p = up.begin(); p != up.end(); ++p) {
    if (*p == CRUSH_ITEM_NONE)
      continue;
    upacting_features &= osdmap->get_xinfo(*p).features;
  }
  psdout(20) << __func__ << " upacting_features 0x" << std::hex
	     << upacting_features << std::dec
	     << " from " << acting << "+" << up << dendl;

  psdout(20) << __func__ << " checking missing set deletes flag. missing = "
	     << get_pg_log().get_missing() << dendl;

  if (!pg_log.get_missing().may_include_deletes &&
      !perform_deletes_during_peering()) {
    pl->rebuild_missing_set_with_deletes(pg_log);
  }
  ceph_assert(
    pg_log.get_missing().may_include_deletes ==
    !perform_deletes_during_peering());

  init_hb_stamps();

  // update lease bounds for a new interval
  auto mnow = pl->get_mnow();
  prior_readable_until_ub = std::max(prior_readable_until_ub,
				     readable_until_ub);
  prior_readable_until_ub = info.history.refresh_prior_readable_until_ub(
    mnow, prior_readable_until_ub);
  psdout(10) << __func__ << " prior_readable_until_ub "
	     << prior_readable_until_ub << " (mnow " << mnow << " + "
	     << info.history.prior_readable_until_ub << ")" << dendl;
  prior_readable_down_osds.clear(); // we populate this when we build the priorset

  readable_until =
    readable_until_ub =
    readable_until_ub_sent =
    readable_until_ub_from_primary = ceph::signedspan::zero();

  acting_readable_until_ub.clear();
  if (is_primary()) {
    acting_readable_until_ub.resize(acting.size(), ceph::signedspan::zero());
  }

  pl->on_new_interval();
}

void PeeringState::init_primary_up_acting(
  const vector<int> &newup,
  const vector<int> &newacting,
  int new_up_primary,
  int new_acting_primary)
{
  actingset.clear();
  acting = newacting;
  for (uint8_t i = 0; i < acting.size(); ++i) {
    if (acting[i] != CRUSH_ITEM_NONE)
      actingset.insert(
	pg_shard_t(
	  acting[i],
	  pool.info.is_erasure() ? shard_id_t(i) : shard_id_t::NO_SHARD));
  }
  upset.clear();
  up = newup;
  for (uint8_t i = 0; i < up.size(); ++i) {
    if (up[i] != CRUSH_ITEM_NONE)
      upset.insert(
	pg_shard_t(
	  up[i],
	  pool.info.is_erasure() ? shard_id_t(i) : shard_id_t::NO_SHARD));
  }
  if (!pool.info.is_erasure()) {
    // replicated
    up_primary = pg_shard_t(new_up_primary, shard_id_t::NO_SHARD);
    primary = pg_shard_t(new_acting_primary, shard_id_t::NO_SHARD);
  } else {
    // erasure
    up_primary = pg_shard_t();
    primary = pg_shard_t();
    for (uint8_t i = 0; i < up.size(); ++i) {
      if (up[i] == new_up_primary) {
	up_primary = pg_shard_t(up[i], shard_id_t(i));
	break;
      }
    }
    for (uint8_t i = 0; i < acting.size(); ++i) {
      if (acting[i] == new_acting_primary) {
	primary = pg_shard_t(acting[i], shard_id_t(i));
	break;
      }
    }
    ceph_assert(up_primary.osd == new_up_primary);
    ceph_assert(primary.osd == new_acting_primary);
  }
}
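// Illustrative example: for an erasure-coded PG with acting = [4, 7, 2],
// init_primary_up_acting() builds actingset = { 4(0), 7(1), 2(2) }, i.e.
// each osd is tagged with the EC shard index it serves, because shard
// placement matters for EC; for a replicated PG the same vector yields
// shard-less entries { osd.4, osd.7, osd.2 } (shard_id_t::NO_SHARD).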
void PeeringState::init_hb_stamps()
{
  if (is_primary()) {
    // we care about all other osds in the acting set
    hb_stamps.resize(acting.size() - 1);
    unsigned i = 0;
    for (auto p : acting) {
      if (p == CRUSH_ITEM_NONE || p == get_primary().osd) {
	continue;
      }
      hb_stamps[i++] = pl->get_hb_stamps(p);
    }
    hb_stamps.resize(i);
  } else if (is_nonprimary()) {
    // we care about just the primary
    hb_stamps.resize(1);
    hb_stamps[0] = pl->get_hb_stamps(get_primary().osd);
  } else {
    hb_stamps.clear();
  }
  dout(10) << __func__ << " now " << hb_stamps << dendl;
}

void PeeringState::clear_recovery_state()
{
  async_recovery_targets.clear();
  backfill_targets.clear();
}

void PeeringState::clear_primary_state()
{
  psdout(10) << "clear_primary_state" << dendl;

  // clear peering state
  stray_set.clear();
  peer_log_requested.clear();
  peer_missing_requested.clear();
  peer_info.clear();
  peer_bytes.clear();
  peer_missing.clear();
  peer_last_complete_ondisk.clear();
  peer_activated.clear();
  min_last_complete_ondisk = eversion_t();
  pg_trim_to = eversion_t();
  might_have_unfound.clear();
  need_up_thru = false;
  missing_loc.clear();

  pg_log.reset_recovery_pointers();

  clear_recovery_state();

  last_update_ondisk = eversion_t();
  missing_loc.clear();
  pl->clear_primary_state();
}

/// return [start,end) bounds for required past_intervals
static pair<epoch_t, epoch_t> get_required_past_interval_bounds(
  const pg_info_t &info,
  epoch_t oldest_map)
{
  epoch_t start = std::max(
    info.history.last_epoch_clean ? info.history.last_epoch_clean :
    info.history.epoch_pool_created,
    oldest_map);
  epoch_t end = std::max(
    info.history.same_interval_since,
    info.history.epoch_pool_created);
  return make_pair(start, end);
}
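// Worked example (illustrative epochs): with last_epoch_clean = 880,
// epoch_pool_created = 500, oldest_map = 900 and same_interval_since = 987,
// start = max(880, 900) = 900 and end = max(987, 500) = 987, so past
// intervals must cover [900, 987); if last_epoch_clean were 0 (never clean),
// epoch_pool_created would stand in for it.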
void PeeringState::check_past_interval_bounds() const
{
  // cluster_osdmap_trim_lower_bound gives us a bound on needed
  // intervals, see doc/dev/osd_internals/past_intervals.rst
  auto oldest_epoch = pl->cluster_osdmap_trim_lower_bound();
  auto rpib = get_required_past_interval_bounds(
    info,
    oldest_epoch);
  if (rpib.first >= rpib.second) {
    // do not warn if the start bound is dictated by oldest_map; the
    // past intervals are presumably appropriate given the pg info.
    if (!past_intervals.empty() &&
	rpib.first > oldest_epoch) {
      pl->get_clog_error() << info.pgid << " required past_interval bounds are"
			   << " empty [" << rpib << ") but past_intervals is not: "
			   << past_intervals;
      derr << info.pgid << " required past_interval bounds are"
	   << " empty [" << rpib << ") but past_intervals is not: "
	   << past_intervals << dendl;
    }
  } else {
    if (past_intervals.empty()) {
      pl->get_clog_error() << info.pgid << " required past_interval bounds are"
			   << " not empty [" << rpib << ") but past_intervals "
			   << past_intervals << " is empty";
      derr << info.pgid << " required past_interval bounds are"
	   << " not empty [" << rpib << ") but past_intervals "
	   << past_intervals << " is empty" << dendl;
      ceph_assert(!past_intervals.empty());
    }

    auto apib = past_intervals.get_bounds();
    if (apib.first > rpib.first) {
      pl->get_clog_error() << info.pgid << " past_intervals [" << apib
			   << ") start interval does not contain the required"
			   << " bound [" << rpib << ") start";
      derr << info.pgid << " past_intervals [" << apib
	   << ") start interval does not contain the required"
	   << " bound [" << rpib << ") start" << dendl;
      ceph_abort_msg("past_interval start interval mismatch");
    }
    if (apib.second != rpib.second) {
      pl->get_clog_error() << info.pgid << " past_interval bound [" << apib
			   << ") end does not match required [" << rpib
			   << ") end";
      derr << info.pgid << " past_interval bound [" << apib
	   << ") end does not match required [" << rpib
	   << ") end" << dendl;
      ceph_abort_msg("past_interval end mismatch");
    }
  }
}

int PeeringState::clamp_recovery_priority(int priority,
					  int pool_recovery_priority,
					  int max)
{
  static_assert(OSD_RECOVERY_PRIORITY_MIN < OSD_RECOVERY_PRIORITY_MAX,
		"Invalid priority range");
  static_assert(OSD_RECOVERY_PRIORITY_MIN >= 0,
		"Priority range must match unsigned type");
  ceph_assert(max <= OSD_RECOVERY_PRIORITY_MAX);

  // User can't set this too high anymore, but might be a legacy value
  if (pool_recovery_priority > OSD_POOL_PRIORITY_MAX)
    pool_recovery_priority = OSD_POOL_PRIORITY_MAX;
  if (pool_recovery_priority < OSD_POOL_PRIORITY_MIN)
    pool_recovery_priority = OSD_POOL_PRIORITY_MIN;
  // Shift range from min..max to 0..(max - min)
  pool_recovery_priority += (0 - OSD_POOL_PRIORITY_MIN);
  ceph_assert(pool_recovery_priority >= 0 &&
	      pool_recovery_priority <= (OSD_POOL_PRIORITY_MAX -
					 OSD_POOL_PRIORITY_MIN));

  priority += pool_recovery_priority;

  // Clamp to valid range
  return std::clamp(priority, OSD_RECOVERY_PRIORITY_MIN, max);
}
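// Worked example (illustrative values): if OSD_POOL_PRIORITY_MIN = -10 and
// OSD_POOL_PRIORITY_MAX = 10, a pool priority of -3 is first shifted by
// (0 - OSD_POOL_PRIORITY_MIN) to 7, added to the incoming priority, and the
// sum is clamped to [OSD_RECOVERY_PRIORITY_MIN, max]; the shift keeps the
// intermediate value non-negative, as the static_asserts above require.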
unsigned PeeringState::get_recovery_priority()
{
  // a higher value -> a higher priority
  int ret = OSD_RECOVERY_PRIORITY_BASE;
  int base = ret;

  if (state & PG_STATE_FORCED_RECOVERY) {
    ret = OSD_RECOVERY_PRIORITY_FORCED;
  } else {
    // XXX: This priority boost isn't so much about inactive, but about
    // data-at-risk
    if (is_degraded() && info.stats.avail_no_missing.size() < pool.info.min_size) {
      base = OSD_RECOVERY_INACTIVE_PRIORITY_BASE;
      // inactive: no. of replicas < min_size, highest priority since it
      // blocks IO
      ret = base + (pool.info.min_size - info.stats.avail_no_missing.size());
    }

    int64_t pool_recovery_priority = 0;
    pool.info.opts.get(pool_opts_t::RECOVERY_PRIORITY, &pool_recovery_priority);

    ret = clamp_recovery_priority(ret, pool_recovery_priority, max_prio_map[base]);
  }
  psdout(20) << __func__ << " recovery priority is " << ret << dendl;
  return static_cast<unsigned>(ret);
}

unsigned PeeringState::get_backfill_priority()
{
  // a higher value -> a higher priority
  int ret = OSD_BACKFILL_PRIORITY_BASE;
  int base = ret;

  if (state & PG_STATE_FORCED_BACKFILL) {
    ret = OSD_BACKFILL_PRIORITY_FORCED;
  } else {
    if (actingset.size() < pool.info.min_size) {
      base = OSD_BACKFILL_INACTIVE_PRIORITY_BASE;
      // inactive: no. of replicas < min_size, highest priority since it
      // blocks IO
      ret = base + (pool.info.min_size - actingset.size());
    } else if (is_undersized()) {
      // undersized: OSD_BACKFILL_DEGRADED_PRIORITY_BASE + num missing replicas
      ceph_assert(pool.info.size > actingset.size());
      base = OSD_BACKFILL_DEGRADED_PRIORITY_BASE;
      ret = base + (pool.info.size - actingset.size());
    } else if (is_degraded()) {
      // degraded: baseline degraded
      base = ret = OSD_BACKFILL_DEGRADED_PRIORITY_BASE;
    }

    // Adjust with pool's recovery priority
    int64_t pool_recovery_priority = 0;
    pool.info.opts.get(pool_opts_t::RECOVERY_PRIORITY, &pool_recovery_priority);

    ret = clamp_recovery_priority(ret, pool_recovery_priority, max_prio_map[base]);
  }

  psdout(20) << __func__ << " backfill priority is " << ret << dendl;
  return static_cast<unsigned>(ret);
}

unsigned PeeringState::get_delete_priority()
{
  auto state = get_osdmap()->get_state(pg_whoami.osd);
  if (state & (CEPH_OSD_BACKFILLFULL |
	       CEPH_OSD_FULL)) {
    return OSD_DELETE_PRIORITY_FULL;
  } else if (state & CEPH_OSD_NEARFULL) {
    return OSD_DELETE_PRIORITY_FULLISH;
  } else {
    return OSD_DELETE_PRIORITY_NORMAL;
  }
}

bool PeeringState::set_force_recovery(bool b)
{
  bool did = false;
  if (b) {
    if (!(state & PG_STATE_FORCED_RECOVERY) &&
	(state & (PG_STATE_DEGRADED |
		  PG_STATE_RECOVERY_WAIT |
		  PG_STATE_RECOVERING))) {
      psdout(20) << __func__ << " set" << dendl;
      state_set(PG_STATE_FORCED_RECOVERY);
      pl->publish_stats_to_osd();
      did = true;
    }
  } else if (state & PG_STATE_FORCED_RECOVERY) {
    psdout(20) << __func__ << " clear" << dendl;
    state_clear(PG_STATE_FORCED_RECOVERY);
    pl->publish_stats_to_osd();
    did = true;
  }
  if (did) {
    psdout(20) << __func__ << " state " << get_current_state()
	       << dendl;
    pl->update_local_background_io_priority(get_recovery_priority());
  }
  return did;
}

bool PeeringState::set_force_backfill(bool b)
{
  bool did = false;
  if (b) {
    if (!(state & PG_STATE_FORCED_BACKFILL) &&
	(state & (PG_STATE_DEGRADED |
		  PG_STATE_BACKFILL_WAIT |
		  PG_STATE_BACKFILLING))) {
      psdout(10) << __func__ << " set" << dendl;
      state_set(PG_STATE_FORCED_BACKFILL);
      pl->publish_stats_to_osd();
      did = true;
    }
  } else if (state & PG_STATE_FORCED_BACKFILL) {
    psdout(10) << __func__ << " clear" << dendl;
    state_clear(PG_STATE_FORCED_BACKFILL);
    pl->publish_stats_to_osd();
    did = true;
  }
  if (did) {
    psdout(20) << __func__ << " state " << get_current_state()
	       << dendl;
    pl->update_local_background_io_priority(get_backfill_priority());
  }
  return did;
}

void PeeringState::schedule_renew_lease()
{
  pl->schedule_renew_lease(
    last_peering_reset,
    readable_interval / 2);
}

void PeeringState::send_lease()
{
  epoch_t epoch = pl->get_osdmap_epoch();
  for (auto peer : actingset) {
    if (peer == pg_whoami) {
      continue;
    }
    pl->send_cluster_message(
      peer.osd,
      TOPNSPC::make_message<MOSDPGLease>(epoch,
					 spg_t(spgid.pgid, peer.shard),
					 get_lease()),
      epoch);
  }
}
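// Illustrative sketch of the lease protocol implemented below: the primary
// periodically (every readable_interval / 2, via schedule_renew_lease())
// sends MOSDPGLease to every other shard in actingset; each replica handles
// it in proc_lease() and replies (elsewhere in the state machine) with
// MOSDPGLeaseAck, which the primary folds into acting_readable_until_ub in
// proc_lease_ack().  The primary's readable_until is then the minimum ack
// across the acting set, computed in recalc_readable_until().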
void PeeringState::proc_lease(const pg_lease_t& l)
{
  assert(HAVE_FEATURE(upacting_features, SERVER_OCTOPUS));
  if (!is_nonprimary()) {
    psdout(20) << __func__ << " no-op, !nonprimary" << dendl;
    return;
  }
  psdout(10) << __func__ << " " << l << dendl;
  if (l.readable_until_ub > readable_until_ub_from_primary) {
    readable_until_ub_from_primary = l.readable_until_ub;
  }

  ceph::signedspan ru = ceph::signedspan::zero();
  if (l.readable_until != ceph::signedspan::zero() &&
      hb_stamps[0]->peer_clock_delta_ub) {
    ru = l.readable_until - *hb_stamps[0]->peer_clock_delta_ub;
    psdout(20) << " peer_clock_delta_ub " << *hb_stamps[0]->peer_clock_delta_ub
	       << " -> ru " << ru << dendl;
  }
  if (ru > readable_until) {
    readable_until = ru;
    psdout(20) << __func__ << " readable_until now " << readable_until << dendl;
    // NOTE: if we ever decide to block/queue ops on the replica,
    // we'll need to wake them up here.
  }

  ceph::signedspan ruub;
  if (hb_stamps[0]->peer_clock_delta_lb) {
    ruub = l.readable_until_ub - *hb_stamps[0]->peer_clock_delta_lb;
    psdout(20) << " peer_clock_delta_lb " << *hb_stamps[0]->peer_clock_delta_lb
	       << " -> ruub " << ruub << dendl;
  } else {
    ruub = pl->get_mnow() + l.interval;
    psdout(20) << " no peer_clock_delta_lb -> ruub " << ruub << dendl;
  }
  if (ruub > readable_until_ub) {
    readable_until_ub = ruub;
    psdout(20) << __func__ << " readable_until_ub now " << readable_until_ub
	       << dendl;
  }
}
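// Worked example (illustrative numbers): if the primary's lease carries
// readable_until = 105s on the primary's clock and the heartbeat machinery
// bounds the clock offset to this peer by peer_clock_delta_ub = 2s, the
// replica converts the lease into its own timeline as 105s - 2s = 103s
// ("ru" above); subtracting the upper bound keeps the local expiry
// conservative.  Without a lower-bound offset estimate, the code falls back
// to mnow + l.interval for the upper bound.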
void PeeringState::proc_lease_ack(int from, const pg_lease_ack_t& a)
{
  assert(HAVE_FEATURE(upacting_features, SERVER_OCTOPUS));
  auto now = pl->get_mnow();
  bool was_min = false;
  for (unsigned i = 0; i < acting.size(); ++i) {
    if (from == acting[i]) {
      // the lease_ack value is based on the primary's clock
      if (a.readable_until_ub > acting_readable_until_ub[i]) {
	if (acting_readable_until_ub[i] == readable_until) {
	  was_min = true;
	}
	acting_readable_until_ub[i] = a.readable_until_ub;
	break;
      }
    }
  }
  if (was_min) {
    auto old_ru = readable_until;
    recalc_readable_until();
    if (now >= old_ru) {
      pl->recheck_readable();
    }
  }
}

void PeeringState::proc_renew_lease()
{
  assert(HAVE_FEATURE(upacting_features, SERVER_OCTOPUS));
  renew_lease(pl->get_mnow());
  send_lease();
  schedule_renew_lease();
}

void PeeringState::recalc_readable_until()
{
  assert(is_primary());
  ceph::signedspan min = readable_until_ub_sent;
  for (unsigned i = 0; i < acting.size(); ++i) {
    if (acting[i] == pg_whoami.osd || acting[i] == CRUSH_ITEM_NONE) {
      continue;
    }
    dout(20) << __func__ << " peer osd." << acting[i]
	     << " ruub " << acting_readable_until_ub[i] << dendl;
    if (acting_readable_until_ub[i] < min) {
      min = acting_readable_until_ub[i];
    }
  }
  readable_until = min;
  readable_until_ub = min;
  dout(20) << __func__ << " readable_until[_ub] " << readable_until
	   << " (sent " << readable_until_ub_sent << ")" << dendl;
}

bool PeeringState::check_prior_readable_down_osds(const OSDMapRef& map)
{
  assert(HAVE_FEATURE(upacting_features, SERVER_OCTOPUS));
  bool changed = false;
  auto p = prior_readable_down_osds.begin();
  while (p != prior_readable_down_osds.end()) {
    if (map->is_dead(*p)) {
      dout(10) << __func__ << " prior_readable_down_osds osd." << *p
	       << " is dead as of epoch " << map->get_epoch()
	       << dendl;
      p = prior_readable_down_osds.erase(p);
      changed = true;
    } else {
      ++p;
    }
  }
  if (changed && prior_readable_down_osds.empty()) {
    psdout(10) << " empty prior_readable_down_osds, clearing ub" << dendl;
    clear_prior_readable_until_ub();
    return true;
  }
  return false;
}

bool PeeringState::adjust_need_up_thru(const OSDMapRef osdmap)
{
  epoch_t up_thru = osdmap->get_up_thru(pg_whoami.osd);
  if (need_up_thru &&
      up_thru >= info.history.same_interval_since) {
    psdout(10) << "adjust_need_up_thru now " << up_thru
	       << ", need_up_thru now false" << dendl;
    need_up_thru = false;
    return true;
  }
  return false;
}
PastIntervals::PriorSet PeeringState::build_prior()
{
  if (1) {
    // sanity check
    for (auto it = peer_info.begin(); it != peer_info.end(); ++it) {
      ceph_assert(info.history.last_epoch_started >=
		  it->second.history.last_epoch_started);
    }
  }

  const OSDMap &osdmap = *get_osdmap();
  PastIntervals::PriorSet prior = past_intervals.get_prior_set(
    pool.info.is_erasure(),
    info.history.last_epoch_started,
    &missing_loc.get_recoverable_predicate(),
    [&](epoch_t start, int osd, epoch_t *lost_at) {
      const osd_info_t *pinfo = 0;
      if (osdmap.exists(osd)) {
	pinfo = &osdmap.get_info(osd);
	if (lost_at)
	  *lost_at = pinfo->lost_at;
      }

      if (osdmap.is_up(osd)) {
	return PastIntervals::UP;
      } else if (!pinfo) {
	return PastIntervals::DNE;
      } else if (pinfo->lost_at > start) {
	return PastIntervals::LOST;
      } else {
	return PastIntervals::DOWN;
      }
    },
    up,
    acting,
    dpp);

  if (prior.pg_down) {
    state_set(PG_STATE_DOWN);
  }

  if (get_osdmap()->get_up_thru(pg_whoami.osd) <
      info.history.same_interval_since) {
    psdout(10) << "up_thru " << get_osdmap()->get_up_thru(pg_whoami.osd)
	       << " < same_since " << info.history.same_interval_since
	       << ", must notify monitor" << dendl;
    need_up_thru = true;
  } else {
    psdout(10) << "up_thru " << get_osdmap()->get_up_thru(pg_whoami.osd)
	       << " >= same_since " << info.history.same_interval_since
	       << ", all is well" << dendl;
    need_up_thru = false;
  }

  pl->set_probe_targets(prior.probe);
  return prior;
}
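// Illustrative summary of the classification lambda above: for each osd in a
// past interval it returns UP (still up, can be probed), DNE (no longer
// exists in the map), LOST (marked lost after the interval started, so its
// data may be written off), or DOWN (down but not lost, so the interval may
// block peering).  E.g. an osd that died and was marked lost at epoch 130 is
// LOST for an interval starting at 125 (130 > 125) but DOWN for one starting
// at 135.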
bool PeeringState::needs_recovery() const
{
  ceph_assert(is_primary());

  auto &missing = pg_log.get_missing();

  if (missing.num_missing()) {
    psdout(10) << __func__ << " primary has " << missing.num_missing()
	       << " missing" << dendl;
    return true;
  }

  ceph_assert(!acting_recovery_backfill.empty());
  for (const pg_shard_t& peer : acting_recovery_backfill) {
    if (peer == get_primary()) {
      continue;
    }
    auto pm = peer_missing.find(peer);
    if (pm == peer_missing.end()) {
      psdout(10) << __func__ << " osd." << peer << " doesn't have missing set"
		 << dendl;
      continue;
    }
    if (pm->second.num_missing()) {
      psdout(10) << __func__ << " osd." << peer << " has "
		 << pm->second.num_missing() << " missing" << dendl;
      return true;
    }
  }

  psdout(10) << __func__ << " is recovered" << dendl;
  return false;
}

bool PeeringState::needs_backfill() const
{
  ceph_assert(is_primary());

  // We can assume that only possible osds that need backfill
  // are on the backfill_targets vector nodes.
  for (const pg_shard_t& peer : backfill_targets) {
    auto pi = peer_info.find(peer);
    ceph_assert(pi != peer_info.end());
    if (!pi->second.last_backfill.is_max()) {
      psdout(10) << __func__ << " osd." << peer
		 << " has last_backfill " << pi->second.last_backfill << dendl;
      return true;
    }
  }

  psdout(10) << __func__ << " does not need backfill" << dendl;
  return false;
}

/*
 * Returns true unless there is a non-lost OSD in might_have_unfound.
 */
bool PeeringState::all_unfound_are_queried_or_lost(
  const OSDMapRef osdmap) const
{
  ceph_assert(is_primary());

  auto peer = might_have_unfound.begin();
  auto mend = might_have_unfound.end();
  for (; peer != mend; ++peer) {
    if (peer_missing.count(*peer))
      continue;
    auto iter = peer_info.find(*peer);
    if (iter != peer_info.end() &&
	(iter->second.is_empty() || iter->second.dne()))
      continue;
    if (!osdmap->exists(peer->osd))
      continue;
    const osd_info_t &osd_info(osdmap->get_info(peer->osd));
    if (osd_info.lost_at <= osd_info.up_from) {
      // If there is even one OSD in might_have_unfound that isn't lost, we
      // still might retrieve our unfound.
      return false;
    }
  }
  psdout(10) << "all_unfound_are_queried_or_lost all of might_have_unfound "
	     << might_have_unfound
	     << " have been queried or are marked lost" << dendl;
  return true;
}

void PeeringState::reject_reservation()
{
  pl->unreserve_recovery_space();
  pl->send_cluster_message(
    primary.osd,
    TOPNSPC::make_message<MBackfillReserve>(
      MBackfillReserve::REJECT_TOOFULL,
      spg_t(info.pgid.pgid, primary.shard),
      get_osdmap_epoch()),
    get_osdmap_epoch());
}
/**
 * find_best_info
 *
 * Returns an iterator to the best info in infos sorted by:
 *  1) Prefer newer last_update
 *  2) Prefer longer tail if it brings another info into contiguity
 *  3) Prefer current primary
 */
map<pg_shard_t, pg_info_t>::const_iterator PeeringState::find_best_info(
  const map<pg_shard_t, pg_info_t> &infos,
  bool restrict_to_up_acting,
  bool *history_les_bound) const
{
  ceph_assert(history_les_bound);
  /* See doc/dev/osd_internals/last_epoch_started.rst before attempting
   * to make changes to this process.  Also, make sure to update it
   * when you find bugs! */
  epoch_t max_last_epoch_started_found = 0;
  for (auto i = infos.begin(); i != infos.end(); ++i) {
    if (!cct->_conf->osd_find_best_info_ignore_history_les &&
	max_last_epoch_started_found < i->second.history.last_epoch_started) {
      *history_les_bound = true;
      max_last_epoch_started_found = i->second.history.last_epoch_started;
    }
    if (!i->second.is_incomplete() &&
	max_last_epoch_started_found < i->second.last_epoch_started) {
      *history_les_bound = false;
      max_last_epoch_started_found = i->second.last_epoch_started;
    }
  }
  eversion_t min_last_update_acceptable = eversion_t::max();
  for (auto i = infos.begin(); i != infos.end(); ++i) {
    if (max_last_epoch_started_found <= i->second.last_epoch_started) {
      if (min_last_update_acceptable > i->second.last_update)
	min_last_update_acceptable = i->second.last_update;
    }
  }
  if (min_last_update_acceptable == eversion_t::max())
    return infos.end();

  auto best = infos.end();
  // find osd with newest last_update (oldest for ec_pool).
  // if there are multiples, prefer
  //  - a longer tail, if it brings another peer into log contiguity
  //  - the current primary
  for (auto p = infos.begin(); p != infos.end(); ++p) {
    if (restrict_to_up_acting && !is_up(p->first) &&
	!is_acting(p->first))
      continue;
    // Only consider peers with last_update >= min_last_update_acceptable
    if (p->second.last_update < min_last_update_acceptable)
      continue;
    // Disqualify anyone with a too old last_epoch_started
    if (p->second.last_epoch_started < max_last_epoch_started_found)
      continue;
    // Disqualify anyone who is incomplete (not fully backfilled)
    if (p->second.is_incomplete())
      continue;
    if (best == infos.end()) {
      best = p;
      continue;
    }
    // Prefer newer last_update
    if (pool.info.require_rollback()) {
      if (p->second.last_update > best->second.last_update)
	continue;
      if (p->second.last_update < best->second.last_update) {
	best = p;
	continue;
      }
    } else {
      if (p->second.last_update < best->second.last_update)
	continue;
      if (p->second.last_update > best->second.last_update) {
	best = p;
	continue;
      }
    }

    // Prefer longer tail
    if (p->second.log_tail > best->second.log_tail) {
      continue;
    } else if (p->second.log_tail < best->second.log_tail) {
      best = p;
      continue;
    }

    if (!p->second.has_missing() && best->second.has_missing()) {
      psdout(10) << __func__ << " prefer osd." << p->first
		 << " because it is complete while best has missing" << dendl;
      best = p;
      continue;
    } else if (p->second.has_missing() && !best->second.has_missing()) {
      psdout(10) << __func__ << " skipping osd." << p->first
		 << " because it has missing while best is complete" << dendl;
      continue;
    } else {
      // both are complete or both have missing
      // fall through
    }

    // prefer current primary (usually the caller), all things being equal
    if (p->first == pg_whoami) {
      psdout(10) << "calc_acting prefer osd." << p->first
		 << " because it is current primary" << dendl;
      best = p;
      continue;
    }
  }
  return best;
}
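// Illustrative tie-breaking example: with two complete candidates at
// last_update 50'123, the one whose log_tail reaches further back (say
// 40'900 vs 45'100) wins, because a longer log lets more peers recover by
// log replay instead of backfill.  Note the comparison direction flips for
// pools that require_rollback() (EC), where an *older* last_update is
// preferred, since divergent EC log entries are rolled back rather than
// replayed.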
void PeeringState::calc_ec_acting(
  map<pg_shard_t, pg_info_t>::const_iterator auth_log_shard,
  unsigned size,
  const vector<int> &acting,
  const vector<int> &up,
  const map<pg_shard_t, pg_info_t> &all_info,
  bool restrict_to_up_acting,
  vector<int> *_want,
  set<pg_shard_t> *backfill,
  set<pg_shard_t> *acting_backfill,
  ostream &ss)
{
  vector<int> want(size, CRUSH_ITEM_NONE);
  map<shard_id_t, set<pg_shard_t> > all_info_by_shard;
  for (auto i = all_info.begin();
       i != all_info.end();
       ++i) {
    all_info_by_shard[i->first.shard].insert(i->first);
  }
  for (uint8_t i = 0; i < want.size(); ++i) {
    ss << "For position " << (unsigned)i << ": ";
    if (up.size() > (unsigned)i && up[i] != CRUSH_ITEM_NONE &&
	!all_info.find(pg_shard_t(up[i], shard_id_t(i)))->second.is_incomplete() &&
	all_info.find(pg_shard_t(up[i], shard_id_t(i)))->second.last_update >=
	auth_log_shard->second.log_tail) {
      ss << " selecting up[i]: " << pg_shard_t(up[i], shard_id_t(i)) << std::endl;
      want[i] = up[i];
      continue;
    }
    if (up.size() > (unsigned)i && up[i] != CRUSH_ITEM_NONE) {
      ss << " backfilling up[i]: " << pg_shard_t(up[i], shard_id_t(i))
	 << " and ";
      backfill->insert(pg_shard_t(up[i], shard_id_t(i)));
    }

    if (acting.size() > (unsigned)i && acting[i] != CRUSH_ITEM_NONE &&
	!all_info.find(pg_shard_t(acting[i], shard_id_t(i)))->second.is_incomplete() &&
	all_info.find(pg_shard_t(acting[i], shard_id_t(i)))->second.last_update >=
	auth_log_shard->second.log_tail) {
      ss << " selecting acting[i]: " << pg_shard_t(acting[i], shard_id_t(i))
	 << std::endl;
      want[i] = acting[i];
    } else if (!restrict_to_up_acting) {
      for (auto j = all_info_by_shard[shard_id_t(i)].begin();
	   j != all_info_by_shard[shard_id_t(i)].end();
	   ++j) {
	ceph_assert(j->shard == i);
	if (!all_info.find(*j)->second.is_incomplete() &&
	    all_info.find(*j)->second.last_update >=
	    auth_log_shard->second.log_tail) {
	  ss << " selecting stray: " << *j << std::endl;
	  want[i] = j->osd;
	  break;
	}
      }
      if (want[i] == CRUSH_ITEM_NONE)
	ss << " failed to fill position " << (int)i << std::endl;
    }
  }

  for (uint8_t i = 0; i < want.size(); ++i) {
    if (want[i] != CRUSH_ITEM_NONE) {
      acting_backfill->insert(pg_shard_t(want[i], shard_id_t(i)));
    }
  }
  acting_backfill->insert(backfill->begin(), backfill->end());
  _want->swap(want);
}

std::pair<map<pg_shard_t, pg_info_t>::const_iterator, eversion_t>
PeeringState::select_replicated_primary(
  map<pg_shard_t, pg_info_t>::const_iterator auth_log_shard,
  uint64_t force_auth_primary_missing_objects,
  const std::vector<int> &up,
  pg_shard_t up_primary,
  const map<pg_shard_t, pg_info_t> &all_info,
  const OSDMapRef osdmap,
  ostream &ss)
{
  pg_shard_t auth_log_shard_id = auth_log_shard->first;

  ss << __func__ << " newest update on osd." << auth_log_shard_id
     << " with " << auth_log_shard->second << std::endl;

  // select primary
  auto primary = all_info.find(up_primary);
  if (up.size() &&
      !primary->second.is_incomplete() &&
      primary->second.last_update >= auth_log_shard->second.log_tail) {
    assert(HAVE_FEATURE(osdmap->get_up_osd_features(), SERVER_NAUTILUS));
    auto approx_missing_objects =
      primary->second.stats.stats.sum.num_objects_missing;
    auto auth_version = auth_log_shard->second.last_update.version;
    auto primary_version = primary->second.last_update.version;
    if (auth_version > primary_version) {
      approx_missing_objects += auth_version - primary_version;
    } else {
      approx_missing_objects += primary_version - auth_version;
    }
    if ((uint64_t)approx_missing_objects >
	force_auth_primary_missing_objects) {
      primary = auth_log_shard;
      ss << "up_primary: " << up_primary << " has approximately "
	 << approx_missing_objects
	 << " (>" << force_auth_primary_missing_objects << ") "
	 << "missing objects, osd." << auth_log_shard_id
	 << " selected as primary instead" << std::endl;
    } else {
      ss << "up_primary: " << up_primary << " selected as primary"
	 << std::endl;
    }
  } else {
    ceph_assert(!auth_log_shard->second.is_incomplete());
    ss << "up[0] needs backfill, osd." << auth_log_shard_id
       << " selected as primary instead" << std::endl;
    primary = auth_log_shard;
  }

  ss << __func__ << " primary is osd." << primary->first
     << " with " << primary->second << std::endl;

  /* We include auth_log_shard->second.log_tail because in GetLog,
   * we will request logs back to the min last_update over our
   * acting_backfill set, which will result in our log being extended
   * as far backwards as necessary to pick up any peers which can
   * be log recovered by auth_log_shard's log */
  eversion_t oldest_auth_log_entry =
    std::min(primary->second.log_tail, auth_log_shard->second.log_tail);

  return std::make_pair(primary, oldest_auth_log_entry);
}
/**
 * calculate the desired acting set.
 *
 * Choose an appropriate acting set.  Prefer up[0], unless it is
 * incomplete, or another osd has a longer tail that allows us to
 * bring other up nodes up to date.
 */
void PeeringState::calc_replicated_acting(
  map<pg_shard_t, pg_info_t>::const_iterator primary,
  eversion_t oldest_auth_log_entry,
  unsigned size,
  const vector<int> &acting,
  const vector<int> &up,
  pg_shard_t up_primary,
  const map<pg_shard_t, pg_info_t> &all_info,
  bool restrict_to_up_acting,
  vector<int> *want,
  set<pg_shard_t> *backfill,
  set<pg_shard_t> *acting_backfill,
  const OSDMapRef osdmap,
  const PGPool& pool,
  ostream &ss)
{
  ss << __func__ << (restrict_to_up_acting ? " restrict_to_up_acting" : "")
     << std::endl;

  want->push_back(primary->first.osd);
  acting_backfill->insert(primary->first);

  // select replicas that have log contiguity with primary.
  // prefer up, then acting, then any peer_info osds
  for (auto i : up) {
    pg_shard_t up_cand = pg_shard_t(i, shard_id_t::NO_SHARD);
    if (up_cand == primary->first)
      continue;
    const pg_info_t &cur_info = all_info.find(up_cand)->second;
    if (cur_info.is_incomplete() ||
	cur_info.last_update < oldest_auth_log_entry) {
      ss << " shard " << up_cand << " (up) backfill " << cur_info << std::endl;
      backfill->insert(up_cand);
      acting_backfill->insert(up_cand);
    } else {
      want->push_back(i);
      acting_backfill->insert(up_cand);
      ss << " osd." << i << " (up) accepted " << cur_info << std::endl;
    }
  }

  if (want->size() >= size) {
    return;
  }

  std::vector<std::pair<eversion_t, int>> candidate_by_last_update;
  candidate_by_last_update.reserve(acting.size());
  // This no longer has backfill OSDs, but they are covered above.
  for (auto i : acting) {
    pg_shard_t acting_cand(i, shard_id_t::NO_SHARD);
    // skip up osds we already considered above
    if (acting_cand == primary->first)
      continue;
    auto up_it = find(up.begin(), up.end(), i);
    if (up_it != up.end())
      continue;

    const pg_info_t &cur_info = all_info.find(acting_cand)->second;
    if (cur_info.is_incomplete() ||
	cur_info.last_update < oldest_auth_log_entry) {
      ss << " shard " << acting_cand << " (acting) REJECTED "
	 << cur_info << std::endl;
    } else {
      candidate_by_last_update.emplace_back(cur_info.last_update, i);
    }
  }

  auto sort_by_eversion = [](const std::pair<eversion_t, int> &lhs,
			     const std::pair<eversion_t, int> &rhs) {
    return lhs.first > rhs.first;
  };
  // sort by last_update, in descending order.
  std::sort(candidate_by_last_update.begin(),
	    candidate_by_last_update.end(), sort_by_eversion);
  for (auto &p: candidate_by_last_update) {
    ceph_assert(want->size() < size);
    want->push_back(p.second);
    pg_shard_t s = pg_shard_t(p.second, shard_id_t::NO_SHARD);
    acting_backfill->insert(s);
    ss << " shard " << s << " (acting) accepted "
       << all_info.find(s)->second << std::endl;
    if (want->size() >= size) {
      return;
    }
  }

  if (restrict_to_up_acting) {
    return;
  }
  candidate_by_last_update.clear();
  candidate_by_last_update.reserve(all_info.size()); // overestimate but fine
  // continue to search stray to find more suitable peers
  for (auto &i : all_info) {
    // skip up osds we already considered above
    if (i.first == primary->first)
      continue;
    auto up_it = find(up.begin(), up.end(), i.first.osd);
    if (up_it != up.end())
      continue;
    auto acting_it = find(
      acting.begin(), acting.end(), i.first.osd);
    if (acting_it != acting.end())
      continue;

    if (i.second.is_incomplete() ||
	i.second.last_update < oldest_auth_log_entry) {
      ss << " shard " << i.first << " (stray) REJECTED " << i.second
	 << std::endl;
    } else {
      candidate_by_last_update.emplace_back(
	i.second.last_update, i.first.osd);
    }
  }

  if (candidate_by_last_update.empty()) {
    // save us some effort
    return;
  }

  // sort by last_update, in descending order.
  std::sort(candidate_by_last_update.begin(),
	    candidate_by_last_update.end(), sort_by_eversion);

  for (auto &p: candidate_by_last_update) {
    ceph_assert(want->size() < size);
    want->push_back(p.second);
    pg_shard_t s = pg_shard_t(p.second, shard_id_t::NO_SHARD);
    acting_backfill->insert(s);
    ss << " shard " << s << " (stray) accepted "
       << all_info.find(s)->second << std::endl;
    if (want->size() >= size) {
      return;
    }
  }
}
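// Illustrative walk-through: for a size-3 replicated PG with up = [5,6,7]
// and osd.5 chosen as primary, if osd.6 is incomplete and osd.7 is
// log-contiguous, the up loop above yields want = [5,7] and backfill = {6};
// the remaining slot is then filled from non-up acting members (and, unless
// restrict_to_up_acting, strays) in descending last_update order, so the
// freshest usable peer is drafted before older ones.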
// Defines osd preference order: acting set, then larger last_update
using osd_ord_t = std::tuple<bool, eversion_t>; // <acting, last_update>
using osd_id_t = int;

class bucket_candidates_t {
  std::deque<std::pair<osd_ord_t, osd_id_t>> osds;
  int selected = 0;

public:
  void add_osd(osd_ord_t ord, osd_id_t osd) {
    // osds will be added in smallest to largest order
    assert(osds.empty() || osds.back().first <= ord);
    osds.push_back(std::make_pair(ord, osd));
  }
  osd_id_t pop_osd() {
    ceph_assert(!is_empty());
    auto ret = osds.front();
    osds.pop_front();
    return ret.second;
  }

  void inc_selected() { selected++; }
  unsigned get_num_selected() const { return selected; }

  osd_ord_t get_ord() const {
    return osds.empty() ? std::make_tuple(false, eversion_t())
      : osds.front().first;
  }

  bool is_empty() const { return osds.empty(); }

  bool operator<(const bucket_candidates_t &rhs) const {
    return std::make_tuple(-selected, get_ord()) <
      std::make_tuple(-rhs.selected, rhs.get_ord());
  }

  friend std::ostream &operator<<(std::ostream &, const bucket_candidates_t &);
};

std::ostream &operator<<(std::ostream &lhs, const bucket_candidates_t &cand)
{
  return lhs << "candidates[" << cand.osds << "]";
}

class bucket_heap_t {
  using elem_t = std::reference_wrapper<bucket_candidates_t>;
  std::vector<elem_t> heap;

  // Max heap -- should emit buckets in order of preference
  struct comp {
    bool operator()(const elem_t &lhs, const elem_t &rhs) {
      return lhs.get() < rhs.get();
    }
  };

public:
  void push_if_nonempty(elem_t e) {
    if (!e.get().is_empty()) {
      heap.push_back(e);
      std::push_heap(heap.begin(), heap.end(), comp());
    }
  }
  elem_t pop() {
    std::pop_heap(heap.begin(), heap.end(), comp());
    auto ret = heap.back();
    heap.pop_back();
    return ret;
  }

  bool is_empty() const { return heap.empty(); }
};

/**
 * calc_replicated_acting_stretch
 *
 * Choose an acting set using as much of the up set as possible; filling
 * in the remaining slots so as to maximize the number of crush buckets at
 * level pool.info.peering_crush_bucket_barrier represented.
 *
 * Stretch clusters are a bit special: while they have a "size" the
 * same way as normal pools, if we happen to lose a data center
 * (we call it a "stretch bucket", but really it'll be a data center or
 * a cloud availability zone), we don't actually want to shove
 * two DCs' worth of replication into a single site -- it won't fit!
 * So we locally calculate a bucket_max, based
 * on the targeted number of stretch buckets for the pool and
 * its size. Then we won't pull more than bucket_max from any
 * given ancestor even if it leaves us undersized.
 *
 * There are two distinct phases: (commented below)
 */
void PeeringState::calc_replicated_acting_stretch(
  map<pg_shard_t, pg_info_t>::const_iterator primary,
  eversion_t oldest_auth_log_entry,
  unsigned size,
  const vector<int> &acting,
  const vector<int> &up,
  pg_shard_t up_primary,
  const map<pg_shard_t, pg_info_t> &all_info,
  bool restrict_to_up_acting,
  vector<int> *want,
  set<pg_shard_t> *backfill,
  set<pg_shard_t> *acting_backfill,
  const OSDMapRef osdmap,
  const PGPool& pool,
  ostream &ss)
{
  ceph_assert(want);
  ceph_assert(acting_backfill);
  ceph_assert(backfill);
  ss << __func__ << (restrict_to_up_acting ? " restrict_to_up_acting" : "")
     << std::endl;

  auto used = [want](int osd) {
    return std::find(want->begin(), want->end(), osd) != want->end();
  };

  auto usable_info = [&](const auto &cur_info) mutable {
    return !(cur_info.is_incomplete() ||
	     cur_info.last_update < oldest_auth_log_entry);
  };

  auto osd_info = [&](int osd) mutable -> const pg_info_t & {
    pg_shard_t cand = pg_shard_t(osd, shard_id_t::NO_SHARD);
    const pg_info_t &cur_info = all_info.find(cand)->second;
    return cur_info;
  };

  auto usable_osd = [&](int osd) mutable {
    return usable_info(osd_info(osd));
  };

  std::map<int, bucket_candidates_t> ancestors;
  auto get_ancestor = [&](int osd) mutable {
    int ancestor = osdmap->crush->get_parent_of_type(
      osd,
      pool.info.peering_crush_bucket_barrier,
      pool.info.crush_rule);
    return &ancestors[ancestor];
  };

  unsigned bucket_max = pool.info.size / pool.info.peering_crush_bucket_target;
  if (bucket_max * pool.info.peering_crush_bucket_target < pool.info.size) {
    ++bucket_max;
  }
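  // Worked example (illustrative numbers): with pool size = 4 and
  // peering_crush_bucket_target = 2 (two data centers), bucket_max is
  // 4/2 = 2, so at most two acting-set members may share one DC; with
  // size = 5 and target = 2, the ceiling logic above bumps bucket_max
  // to 3 (since 2*2 < 5).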
  /* 1) Select all usable osds from the up set as well as the primary
   *
   * We also stash any unusable osds from up into backfill.
   */
  auto add_required = [&](int osd) {
    if (!used(osd)) {
      want->push_back(osd);
      acting_backfill->insert(
	pg_shard_t(osd, shard_id_t::NO_SHARD));
      get_ancestor(osd)->inc_selected();
    }
  };
  add_required(primary->first.osd);
  ss << " osd " << primary->first.osd << " primary accepted "
     << osd_info(primary->first.osd) << std::endl;
  for (auto upcand: up) {
    auto upshard = pg_shard_t(upcand, shard_id_t::NO_SHARD);
    auto &curinfo = osd_info(upcand);
    if (usable_osd(upcand)) {
      ss << " osd " << upcand << " (up) accepted " << curinfo << std::endl;
      add_required(upcand);
    } else {
      ss << " osd " << upcand << " (up) backfill " << curinfo << std::endl;
      backfill->insert(upshard);
      acting_backfill->insert(upshard);
    }
  }

  if (want->size() >= pool.info.size) { // non-failed CRUSH mappings are valid
    ss << " up set sufficient" << std::endl;
    return;
  }
  ss << " up set insufficient, considering remaining osds" << std::endl;

  /* 2) Fill out remaining slots from usable osds in all_info
   *    while maximizing the number of ancestor nodes at the
   *    barrier_id crush level.
   */
  {
    std::vector<std::pair<osd_ord_t, osd_id_t>> candidates;
    /* To do this, we first filter the set of usable osds into an ordered
     * list of usable osds
     */
    auto get_osd_ord = [&](bool is_acting, const pg_info_t &info) -> osd_ord_t {
      return std::make_tuple(
	!is_acting /* acting should sort first */,
	info.last_update);
    };
    for (auto &cand : acting) {
      auto &cand_info = osd_info(cand);
      if (!used(cand) && usable_info(cand_info)) {
	ss << " acting candidate " << cand << " " << cand_info << std::endl;
	candidates.push_back(std::make_pair(get_osd_ord(true, cand_info), cand));
      }
    }
    if (!restrict_to_up_acting) {
      for (auto &[cand, info] : all_info) {
	if (!used(cand.osd) && usable_info(info) &&
	    (std::find(acting.begin(), acting.end(), cand.osd)
	     == acting.end())) {
	  ss << " other candidate " << cand << " " << info << std::endl;
	  candidates.push_back(
	    std::make_pair(get_osd_ord(false, info), cand.osd));
	}
      }
    }
    std::sort(candidates.begin(), candidates.end());

    // We then bucket these candidates by ancestor
    std::for_each(candidates.begin(), candidates.end(), [&](auto cand) {
      get_ancestor(cand.second)->add_osd(cand.first, cand.second);
    });
  }

  auto pop_ancestor = [&](auto &ancestor) {
    ceph_assert(!ancestor.is_empty());
    auto osd = ancestor.pop_osd();

    ss << " accepting candidate " << osd << std::endl;

    ceph_assert(!used(osd));
    ceph_assert(usable_osd(osd));

    want->push_back(osd);
    acting_backfill->insert(
      pg_shard_t(osd, shard_id_t::NO_SHARD));
    ancestor.inc_selected();
  };

  /* Next, we use the ancestors map to grab a descendant of the
   * peering_crush_mandatory_member if not already represented.
   *
   * TODO: using 0 here to match other users.  Prior to merge, I
   * expect that this and other users should instead check against
   * CRUSH_ITEM_NONE.
   */
  if (pool.info.peering_crush_mandatory_member != CRUSH_ITEM_NONE) {
    auto aiter = ancestors.find(pool.info.peering_crush_mandatory_member);
    if (aiter != ancestors.end() &&
	!aiter->second.get_num_selected()) {
      ss << " adding required ancestor " << aiter->first << std::endl;
      ceph_assert(!aiter->second.is_empty()); // wouldn't exist otherwise
      pop_ancestor(aiter->second);
    }
  }

  /* We then place the ancestors in a heap ordered by fewest selected
   * and then by the ordering token of the next osd */
  bucket_heap_t aheap;
  std::for_each(ancestors.begin(), ancestors.end(), [&](auto &anc) {
    aheap.push_if_nonempty(anc.second);
  });

  /* and pull from this heap until it's empty or we have enough.
   * "We have enough" is a sufficient check here for
   * stretch_set_can_peer() because our heap sorting always
   * pulls from ancestors with the least number of included OSDs,
   * so if it is possible to satisfy the bucket_count constraints we
   * will do so.
   */
  while (!aheap.is_empty() && want->size() < pool.info.size) {
    auto next = aheap.pop();
    pop_ancestor(next.get());
    if (next.get().get_num_selected() < bucket_max) {
      aheap.push_if_nonempty(next);
    }
  }

  /* The end result is that we should have as many buckets covered as
   * possible while respecting up, the primary selection,
   * the pool size (given bucket count constraints),
   * and the mandatory member.
   */
}
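// Illustrative trace: suppose DC A has remaining candidates {a1,a2} and DC B
// has {b1}, with one slot already taken from A in phase 1.  The max-heap
// orders buckets by fewest selections first, so B is popped before A; after
// b1 is taken both DCs have one selection and the loop alternates between
// them until pool.info.size members are chosen or a bucket hits bucket_max
// (at which point it is not re-pushed).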
version_t auth_version = auth_info.last_update.version; version_t candidate_version = shard_info.last_update.version; assert(HAVE_FEATURE(osdmap->get_up_osd_features(), SERVER_NAUTILUS)); auto approx_missing_objects = shard_info.stats.stats.sum.num_objects_missing; if (auth_version > candidate_version) { approx_missing_objects += auth_version - candidate_version; } if (static_cast<uint64_t>(approx_missing_objects) > cct->_conf.get_val<uint64_t>("osd_async_recovery_min_cost")) { candidates_by_cost.emplace(approx_missing_objects, shard_i); } } psdout(20) << __func__ << " candidates by cost are: " << candidates_by_cost << dendl; // take out as many osds as we can for async recovery, in order of cost for (auto rit = candidates_by_cost.rbegin(); rit != candidates_by_cost.rend(); ++rit) { pg_shard_t cur_shard = rit->second; vector<int> candidate_want(*want); candidate_want[cur_shard.shard.id] = CRUSH_ITEM_NONE; if (recoverable(candidate_want)) { want->swap(candidate_want); async_recovery->insert(cur_shard); } } psdout(20) << __func__ << " result want=" << *want << " async_recovery=" << *async_recovery << dendl; } void PeeringState::choose_async_recovery_replicated( const map<pg_shard_t, pg_info_t> &all_info, const pg_info_t &auth_info, vector<int> *want, set<pg_shard_t> *async_recovery, const OSDMapRef osdmap) const { set<pair<int, pg_shard_t>> candidates_by_cost; for (auto osd_num : *want) { pg_shard_t shard_i(osd_num, shard_id_t::NO_SHARD); // do not include strays if (stray_set.find(shard_i) != stray_set.end()) continue; // Do not include an osd that is not up, since choosing it as // an async_recovery_target will move it out of the acting set. // This results in it being identified as a stray during peering, // because it is no longer in the up or acting set. if (!is_up(shard_i)) continue; auto shard_info = all_info.find(shard_i)->second; // use the approximate magnitude of the difference in length of // logs plus historical missing objects as the cost of recovery version_t auth_version = auth_info.last_update.version; version_t candidate_version = shard_info.last_update.version; assert(HAVE_FEATURE(osdmap->get_up_osd_features(), SERVER_NAUTILUS)); auto approx_missing_objects = shard_info.stats.stats.sum.num_objects_missing; if (auth_version > candidate_version) { approx_missing_objects += auth_version - candidate_version; } else { approx_missing_objects += candidate_version - auth_version; } if (static_cast<uint64_t>(approx_missing_objects) > cct->_conf.get_val<uint64_t>("osd_async_recovery_min_cost")) { candidates_by_cost.emplace(approx_missing_objects, shard_i); } } psdout(20) << __func__ << " candidates by cost are: " << candidates_by_cost << dendl; // take out as many osds as we can for async recovery, in order of cost for (auto rit = candidates_by_cost.rbegin(); rit != candidates_by_cost.rend(); ++rit) { if (want->size() <= pool.info.min_size) { break; } pg_shard_t cur_shard = rit->second; vector<int> candidate_want(*want); for (auto it = candidate_want.begin(); it != candidate_want.end(); ++it) { if (*it == cur_shard.osd) { candidate_want.erase(it); if (pool.info.stretch_set_can_peer(candidate_want, *osdmap, NULL)) { // if we're in stretch mode, we can only remove the osd if it doesn't // break peering limits. want->swap(candidate_want); async_recovery->insert(cur_shard); } break; } } } psdout(20) << __func__ << " result want=" << *want << " async_recovery=" << *async_recovery << dendl; } /** * choose acting * * calculate the desired acting, and request a change with the monitor * if it differs from the current acting.
* * if restrict_to_up_acting=true, we filter out anything that's not in * up/acting. in order to lift this restriction, we need to * 1) check whether it's worth switching the acting set any time we get * a new pg info (not just here, when recovery finishes) * 2) check whether anything in want_acting went down on each new map * (and, if so, calculate a new want_acting) * 3) remove the assertion in PG::PeeringState::Active::react(const AdvMap) * TODO! */ bool PeeringState::choose_acting(pg_shard_t &auth_log_shard_id, bool restrict_to_up_acting, bool *history_les_bound, bool request_pg_temp_change_only) { map<pg_shard_t, pg_info_t> all_info(peer_info.begin(), peer_info.end()); all_info[pg_whoami] = info; if (cct->_conf->subsys.should_gather<dout_subsys, 10>()) { for (auto p = all_info.begin(); p != all_info.end(); ++p) { psdout(10) << __func__ << " all_info osd." << p->first << " " << p->second << dendl; } } auto auth_log_shard = find_best_info(all_info, restrict_to_up_acting, history_les_bound); if (auth_log_shard == all_info.end()) { if (up != acting) { psdout(10) << __func__ << " no suitable info found (incomplete backfills?)," << " reverting to up" << dendl; want_acting = up; vector<int> empty; pl->queue_want_pg_temp(empty); } else { psdout(10) << __func__ << " failed" << dendl; ceph_assert(want_acting.empty()); } return false; } ceph_assert(!auth_log_shard->second.is_incomplete()); auth_log_shard_id = auth_log_shard->first; set<pg_shard_t> want_backfill, want_acting_backfill; vector<int> want; stringstream ss; if (pool.info.is_replicated()) { auto [primary_shard, oldest_log] = select_replicated_primary( auth_log_shard, cct->_conf.get_val<uint64_t>( "osd_force_auth_primary_missing_objects"), up, up_primary, all_info, get_osdmap(), ss); if (pool.info.is_stretch_pool()) { calc_replicated_acting_stretch( primary_shard, oldest_log, get_osdmap()->get_pg_size(info.pgid.pgid), acting, up, up_primary, all_info, restrict_to_up_acting, &want, &want_backfill, &want_acting_backfill, get_osdmap(), pool, ss); } else { calc_replicated_acting( primary_shard, oldest_log, get_osdmap()->get_pg_size(info.pgid.pgid), acting, up, up_primary, all_info, restrict_to_up_acting, &want, &want_backfill, &want_acting_backfill, get_osdmap(), pool, ss); } } else { calc_ec_acting( auth_log_shard, get_osdmap()->get_pg_size(info.pgid.pgid), acting, up, all_info, restrict_to_up_acting, &want, &want_backfill, &want_acting_backfill, ss); } psdout(10) << ss.str() << dendl; if (!recoverable(want)) { want_acting.clear(); return false; } set<pg_shard_t> want_async_recovery; if (HAVE_FEATURE(get_osdmap()->get_up_osd_features(), SERVER_MIMIC)) { if (pool.info.is_erasure()) { choose_async_recovery_ec( all_info, auth_log_shard->second, &want, &want_async_recovery, get_osdmap()); } else { choose_async_recovery_replicated( all_info, auth_log_shard->second, &want, &want_async_recovery, get_osdmap()); } } while (want.size() > pool.info.size) { // async recovery should have taken out as many osds as it can. // if not, then always evict the last peer // (will get synchronously recovered later) psdout(10) << __func__ << " evicting osd." << want.back() << " from oversized want " << want << dendl; want.pop_back(); } if (want != acting) { psdout(10) << __func__ << " want " << want << " != acting " << acting << ", requesting pg_temp change" << dendl; want_acting = want; if (!cct->_conf->osd_debug_no_acting_change) { if (want_acting == up) { // There can't be any pending backfill if // want is the same as crush map up OSDs.
ceph_assert(want_backfill.empty()); vector<int> empty; pl->queue_want_pg_temp(empty); } else pl->queue_want_pg_temp(want); } return false; } if (request_pg_temp_change_only) return true; want_acting.clear(); acting_recovery_backfill = want_acting_backfill; psdout(10) << "acting_recovery_backfill is " << acting_recovery_backfill << dendl; ceph_assert( backfill_targets.empty() || backfill_targets == want_backfill); if (backfill_targets.empty()) { // Caller is GetInfo backfill_targets = want_backfill; } // Adding !needs_recovery() to let the async_recovery_targets reset after recovery is complete ceph_assert( async_recovery_targets.empty() || async_recovery_targets == want_async_recovery || !needs_recovery()); if (async_recovery_targets.empty() || !needs_recovery()) { async_recovery_targets = want_async_recovery; } // Will not change if already set because up would have had to change // Verify that nothing in backfill is in stray_set for (auto i = want_backfill.begin(); i != want_backfill.end(); ++i) { ceph_assert(stray_set.find(*i) == stray_set.end()); } psdout(10) << "choose_acting want=" << want << " backfill_targets=" << want_backfill << " async_recovery_targets=" << async_recovery_targets << dendl; return true; } void PeeringState::log_weirdness() { if (pg_log.get_tail() != info.log_tail) pl->get_clog_error() << info.pgid << " info mismatch, log.tail " << pg_log.get_tail() << " != info.log_tail " << info.log_tail; if (pg_log.get_head() != info.last_update) pl->get_clog_error() << info.pgid << " info mismatch, log.head " << pg_log.get_head() << " != info.last_update " << info.last_update; if (!pg_log.get_log().empty()) { // sloppy check if ((pg_log.get_log().log.begin()->version <= pg_log.get_tail())) pl->get_clog_error() << info.pgid << " log bound mismatch, info (tail,head] (" << pg_log.get_tail() << "," << pg_log.get_head() << "]" << " actual [" << pg_log.get_log().log.begin()->version << "," << pg_log.get_log().log.rbegin()->version << "]"; } if (pg_log.get_log().caller_ops.size() > pg_log.get_log().log.size()) { pl->get_clog_error() << info.pgid << " caller_ops.size " << pg_log.get_log().caller_ops.size() << " > log size " << pg_log.get_log().log.size(); } } /* * Process information from a replica to determine if it could have any * objects that I need. * * TODO: if the missing set becomes very large, this could get expensive. * Instead, we probably want to just iterate over our unfound set. */ bool PeeringState::search_for_missing( const pg_info_t &oinfo, const pg_missing_t &omissing, pg_shard_t from, PeeringCtxWrapper &ctx) { uint64_t num_unfound_before = missing_loc.num_unfound(); bool found_missing = missing_loc.add_source_info( from, oinfo, omissing, ctx.handle); if (found_missing && num_unfound_before != missing_loc.num_unfound()) pl->publish_stats_to_osd(); // avoid doing this if the peer is empty. This is a bit of paranoia // to avoid doing something rash if add_source_info() above // incorrectly decided we found something new. (if the peer has // last_update=0'0 that's impossible.) if (found_missing && oinfo.last_update != eversion_t()) { pg_info_t tinfo(oinfo); tinfo.pgid.shard = pg_whoami.shard; ctx.send_info( from.osd, spg_t(info.pgid.pgid, from.shard), get_osdmap_epoch(), // fixme: use lower epoch?
get_osdmap_epoch(), tinfo); } return found_missing; } bool PeeringState::discover_all_missing( BufferedRecoveryMessages &rctx) { auto &missing = pg_log.get_missing(); uint64_t unfound = get_num_unfound(); bool any = false; // did we start any queries psdout(10) << __func__ << " " << missing.num_missing() << " missing, " << unfound << " unfound" << dendl; auto m = might_have_unfound.begin(); auto mend = might_have_unfound.end(); for (; m != mend; ++m) { pg_shard_t peer(*m); if (!get_osdmap()->is_up(peer.osd)) { psdout(20) << __func__ << " skipping down osd." << peer << dendl; continue; } if (peer_purged.count(peer)) { psdout(20) << __func__ << " skipping purged osd." << peer << dendl; continue; } auto iter = peer_info.find(peer); if (iter != peer_info.end() && (iter->second.is_empty() || iter->second.dne())) { // ignore empty peers continue; } // If we've requested any of this stuff, the pg_missing_t information // should be on its way. // TODO: coalesce requested_* into a single data structure if (peer_missing.find(peer) != peer_missing.end()) { psdout(20) << __func__ << ": osd." << peer << ": we already have pg_missing_t" << dendl; continue; } if (peer_log_requested.find(peer) != peer_log_requested.end()) { psdout(20) << __func__ << ": osd." << peer << ": in peer_log_requested" << dendl; continue; } if (peer_missing_requested.find(peer) != peer_missing_requested.end()) { psdout(20) << __func__ << ": osd." << peer << ": in peer_missing_requested" << dendl; continue; } // Request missing psdout(10) << __func__ << ": osd." << peer << ": requesting pg_missing_t" << dendl; peer_missing_requested.insert(peer); rctx.send_query( peer.osd, spg_t(info.pgid.pgid, peer.shard), pg_query_t( pg_query_t::FULLLOG, peer.shard, pg_whoami.shard, info.history, get_osdmap_epoch())); any = true; } return any; } /* Build the might_have_unfound set. * * This is used by the primary OSD during recovery. * * This set tracks the OSDs which might have unfound objects that the primary * OSD needs. As we receive pg_missing_t from each OSD in might_have_unfound, we * will remove the OSD from the set.
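* * For example (illustrative only, not derived from this code): if past_intervals indicates that osds 1 and 4 may have written in a prior * interval and we also know of a stray peer on osd.7, might_have_unfound starts out as {1, 4, 7}; each entry is dropped once that OSD's * pg_missing_t arrives.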
*/ void PeeringState::build_might_have_unfound() { ceph_assert(might_have_unfound.empty()); ceph_assert(is_primary()); psdout(10) << __func__ << dendl; check_past_interval_bounds(); might_have_unfound = past_intervals.get_might_have_unfound( pg_whoami, pool.info.is_erasure()); // include any (stray) peers for (auto p = peer_info.begin(); p != peer_info.end(); ++p) might_have_unfound.insert(p->first); psdout(15) << __func__ << ": built " << might_have_unfound << dendl; } void PeeringState::activate( ObjectStore::Transaction& t, epoch_t activation_epoch, PeeringCtxWrapper &ctx) { ceph_assert(!is_peered()); // twiddle pg state state_clear(PG_STATE_DOWN); send_notify = false; if (is_primary()) { // only update primary last_epoch_started if we will go active if (acting_set_writeable()) { ceph_assert(cct->_conf->osd_find_best_info_ignore_history_les || info.last_epoch_started <= activation_epoch); info.last_epoch_started = activation_epoch; info.last_interval_started = info.history.same_interval_since; } } else if (is_acting(pg_whoami)) { /* update last_epoch_started on acting replica to whatever the primary sent * unless it's smaller (could happen if we are going peered rather than * active, see doc/dev/osd_internals/last_epoch_started.rst) */ if (info.last_epoch_started < activation_epoch) { info.last_epoch_started = activation_epoch; info.last_interval_started = info.history.same_interval_since; } } auto &missing = pg_log.get_missing(); min_last_complete_ondisk = eversion_t(0,0); // we don't know (yet)! if (is_primary()) { last_update_ondisk = info.last_update; } last_update_applied = info.last_update; last_rollback_info_trimmed_to_applied = pg_log.get_can_rollback_to(); need_up_thru = false; // write pg info, log dirty_info = true; dirty_big_info = true; // maybe pl->schedule_event_on_commit( t, std::make_shared<PGPeeringEvent>( get_osdmap_epoch(), get_osdmap_epoch(), ActivateCommitted( get_osdmap_epoch(), activation_epoch))); // init complete pointer if (missing.num_missing() == 0) { psdout(10) << "activate - no missing, moving last_complete " << info.last_complete << " -> " << info.last_update << dendl; info.last_complete = info.last_update; info.stats.stats.sum.num_objects_missing = 0; pg_log.reset_recovery_pointers(); } else { psdout(10) << "activate - not complete, " << missing << dendl; info.stats.stats.sum.num_objects_missing = missing.num_missing(); pg_log.activate_not_complete(info); } log_weirdness(); if (is_primary()) { // initialize snap_trimq interval_set<snapid_t> to_trim; auto& removed_snaps_queue = get_osdmap()->get_removed_snaps_queue(); auto p = removed_snaps_queue.find(info.pgid.pgid.pool()); if (p != removed_snaps_queue.end()) { dout(20) << "activate - purged_snaps " << info.purged_snaps << " removed_snaps " << p->second << dendl; for (auto q : p->second) { to_trim.insert(q.first, q.second); } } interval_set<snapid_t> purged; purged.intersection_of(to_trim, info.purged_snaps); to_trim.subtract(purged); assert(HAVE_FEATURE(upacting_features, SERVER_OCTOPUS)); renew_lease(pl->get_mnow()); // do not schedule until we are actually activated // adjust purged_snaps: PG may have been inactive while snaps were pruned // from the removed_snaps_queue in the osdmap. update local purged_snaps to // reflect only those snaps that we thought were pruned and were still in // the queue.
info.purged_snaps.swap(purged); // start up replicas if (prior_readable_down_osds.empty()) { dout(10) << __func__ << " no prior_readable_down_osds to wait on, clearing ub" << dendl; clear_prior_readable_until_ub(); } info.history.refresh_prior_readable_until_ub(pl->get_mnow(), prior_readable_until_ub); ceph_assert(!acting_recovery_backfill.empty()); for (auto i = acting_recovery_backfill.begin(); i != acting_recovery_backfill.end(); ++i) { if (*i == pg_whoami) continue; pg_shard_t peer = *i; ceph_assert(peer_info.count(peer)); pg_info_t& pi = peer_info[peer]; psdout(10) << "activate peer osd." << peer << " " << pi << dendl; #if defined(WITH_SEASTAR) MURef<MOSDPGLog> m; #else MRef<MOSDPGLog> m; #endif ceph_assert(peer_missing.count(peer)); pg_missing_t& pm = peer_missing[peer]; bool needs_past_intervals = pi.dne(); // Save num_bytes for backfill reservation request, can't be negative peer_bytes[peer] = std::max<int64_t>(0, pi.stats.stats.sum.num_bytes); if (pi.last_update == info.last_update) { // empty log if (!pi.last_backfill.is_max()) pl->get_clog_info() << info.pgid << " continuing backfill to osd." << peer << " from (" << pi.log_tail << "," << pi.last_update << "] " << pi.last_backfill << " to " << info.last_update; if (!pi.is_empty()) { psdout(10) << "activate peer osd." << peer << " is up to date, queueing in pending_activators" << dendl; ctx.send_info( peer.osd, spg_t(info.pgid.pgid, peer.shard), get_osdmap_epoch(), // fixme: use lower epoch? get_osdmap_epoch(), info, get_lease()); } else { psdout(10) << "activate peer osd." << peer << " is up to date, but sending pg_log anyway" << dendl; m = TOPNSPC::make_message<MOSDPGLog>( i->shard, pg_whoami.shard, get_osdmap_epoch(), info, last_peering_reset); } } else if ( pg_log.get_tail() > pi.last_update || pi.last_backfill == hobject_t() || (backfill_targets.count(*i) && pi.last_backfill.is_max())) { /* ^ This last case covers a situation where a replica is not contiguous * with the auth_log, but is contiguous with this replica. Reshuffling * the active set to handle this would be tricky, so instead we just go * ahead and backfill it anyway. This is probably preferable in any * case since the replica in question would have to be significantly * behind. */ // backfill pl->get_clog_debug() << info.pgid << " starting backfill to osd." << peer << " from (" << pi.log_tail << "," << pi.last_update << "] " << pi.last_backfill << " to " << info.last_update; pi.last_update = info.last_update; pi.last_complete = info.last_update; pi.set_last_backfill(hobject_t()); pi.last_epoch_started = info.last_epoch_started; pi.last_interval_started = info.last_interval_started; pi.history = info.history; pi.hit_set = info.hit_set; pi.stats.stats.clear(); pi.stats.stats.sum.num_bytes = peer_bytes[peer]; // initialize peer with our purged_snaps. pi.purged_snaps = info.purged_snaps; m = TOPNSPC::make_message<MOSDPGLog>( i->shard, pg_whoami.shard, get_osdmap_epoch(), pi, last_peering_reset /* epoch to create pg at */); // send some recent log, so that op dup detection works well. m->log.copy_up_to(cct, pg_log.get_log(), cct->_conf->osd_max_pg_log_entries); m->info.log_tail = m->log.tail; pi.log_tail = m->log.tail; // sigh...
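// Inferred rationale for the following clear(): last_backfill was reset above, so everything prior to it will be re-copied by backfill and // our local view of this peer's missing set no longer carries useful information.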
pm.clear(); } else { // catch up ceph_assert(pg_log.get_tail() <= pi.last_update); m = TOPNSPC::make_message<MOSDPGLog>( i->shard, pg_whoami.shard, get_osdmap_epoch(), info, last_peering_reset /* epoch to create pg at */); // send new stuff to append to the replica's log m->log.copy_after(cct, pg_log.get_log(), pi.last_update); } // share past_intervals if we are creating the pg on the replica // based on whether our info for that peer was dne() *before* // updating pi.history in the backfill block above. if (m && needs_past_intervals) m->past_intervals = past_intervals; // update local version of peer's missing list! if (m && pi.last_backfill != hobject_t()) { for (auto p = m->log.log.begin(); p != m->log.log.end(); ++p) { if (p->soid <= pi.last_backfill && !p->is_error()) { if (perform_deletes_during_peering() && p->is_delete()) { pm.rm(p->soid, p->version); } else { pm.add_next_event(*p); } } } } if (m) { dout(10) << "activate peer osd." << peer << " sending " << m->log << dendl; m->lease = get_lease(); pl->send_cluster_message(peer.osd, std::move(m), get_osdmap_epoch()); } // peer now has pi.last_update = info.last_update; // update our missing if (pm.num_missing() == 0) { pi.last_complete = pi.last_update; psdout(10) << "activate peer osd." << peer << " " << pi << " uptodate" << dendl; } else { psdout(10) << "activate peer osd." << peer << " " << pi << " missing " << pm << dendl; } } // Set up missing_loc set<pg_shard_t> complete_shards; for (auto i = acting_recovery_backfill.begin(); i != acting_recovery_backfill.end(); ++i) { psdout(20) << __func__ << " setting up missing_loc from shard " << *i << " " << dendl; if (*i == get_primary()) { missing_loc.add_active_missing(missing); if (!missing.have_missing()) complete_shards.insert(*i); } else { auto peer_missing_entry = peer_missing.find(*i); ceph_assert(peer_missing_entry != peer_missing.end()); missing_loc.add_active_missing(peer_missing_entry->second); if (!peer_missing_entry->second.have_missing() && peer_info[*i].last_backfill.is_max()) complete_shards.insert(*i); } } // If necessary, create might_have_unfound to help us find our unfound objects. // NOTE: It's important that we build might_have_unfound before trimming the // past intervals.
might_have_unfound.clear(); if (needs_recovery()) { // If only one shard has missing, we do a trick to add all others as recovery // sources; this is considered safe since the PGLogs have been merged locally, // and covers the vast majority of use cases, like one OSD/host being down for // a while for hardware repair if (complete_shards.size() + 1 == acting_recovery_backfill.size()) { missing_loc.add_batch_sources_info(complete_shards, ctx.handle); } else { missing_loc.add_source_info(pg_whoami, info, pg_log.get_missing(), ctx.handle); for (auto i = acting_recovery_backfill.begin(); i != acting_recovery_backfill.end(); ++i) { if (*i == pg_whoami) continue; psdout(10) << __func__ << ": adding " << *i << " as a source" << dendl; ceph_assert(peer_missing.count(*i)); ceph_assert(peer_info.count(*i)); missing_loc.add_source_info( *i, peer_info[*i], peer_missing[*i], ctx.handle); } } for (auto i = peer_missing.begin(); i != peer_missing.end(); ++i) { if (is_acting_recovery_backfill(i->first)) continue; ceph_assert(peer_info.count(i->first)); search_for_missing( peer_info[i->first], i->second, i->first, ctx); } build_might_have_unfound(); // Always call now so update_calc_stats() will be accurate discover_all_missing(ctx.msgs); } // num_objects_degraded if calculated should reflect this too, unless no // missing and we are about to go clean. if (get_osdmap()->get_pg_size(info.pgid.pgid) > actingset.size()) { state_set(PG_STATE_UNDERSIZED); } state_set(PG_STATE_ACTIVATING); pl->on_activate(std::move(to_trim)); } if (acting_set_writeable()) { PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(t)}; pg_log.roll_forward(rollbacker.get()); } } void PeeringState::share_pg_info() { psdout(10) << "share_pg_info" << dendl; info.history.refresh_prior_readable_until_ub(pl->get_mnow(), prior_readable_until_ub); // share new pg_info_t with replicas ceph_assert(!acting_recovery_backfill.empty()); for (auto pg_shard : acting_recovery_backfill) { if (pg_shard == pg_whoami) continue; if (auto peer = peer_info.find(pg_shard); peer != peer_info.end()) { peer->second.last_epoch_started = info.last_epoch_started; peer->second.last_interval_started = info.last_interval_started; peer->second.history.merge(info.history); } auto m = TOPNSPC::make_message<MOSDPGInfo2>(spg_t{info.pgid.pgid, pg_shard.shard}, info, get_osdmap_epoch(), get_osdmap_epoch(), std::optional<pg_lease_t>{get_lease()}, std::nullopt); pl->send_cluster_message(pg_shard.osd, std::move(m), get_osdmap_epoch()); } } void PeeringState::merge_log( ObjectStore::Transaction& t, pg_info_t &oinfo, pg_log_t&& olog, pg_shard_t from) { PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(t)}; pg_log.merge_log( oinfo, std::move(olog), from, info, rollbacker.get(), dirty_info, dirty_big_info); } void PeeringState::rewind_divergent_log( ObjectStore::Transaction& t, eversion_t newhead) { PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(t)}; pg_log.rewind_divergent_log( newhead, info, rollbacker.get(), dirty_info, dirty_big_info); } void PeeringState::proc_primary_info( ObjectStore::Transaction &t, const pg_info_t &oinfo) { ceph_assert(!is_primary()); update_history(oinfo.history); if (!info.stats.stats_invalid && info.stats.stats.sum.num_scrub_errors) { info.stats.stats.sum.num_scrub_errors = 0; info.stats.stats.sum.num_shallow_scrub_errors = 0; info.stats.stats.sum.num_deep_scrub_errors = 0; dirty_info = true; } if (!(info.purged_snaps == oinfo.purged_snaps)) { psdout(10) << __func__ << " updating purged_snaps to " << oinfo.purged_snaps << dendl; info.purged_snaps =
oinfo.purged_snaps; dirty_info = true; dirty_big_info = true; } } void PeeringState::proc_master_log( ObjectStore::Transaction& t, pg_info_t &oinfo, pg_log_t&& olog, pg_missing_t&& omissing, pg_shard_t from) { psdout(10) << "proc_master_log for osd." << from << ": " << olog << " " << omissing << dendl; ceph_assert(!is_peered() && is_primary()); // merge log into our own log to build master log. no need to // make any adjustments to their missing map; we are taking their // log to be authoritative (i.e., their entries are by definition // non-divergent). merge_log(t, oinfo, std::move(olog), from); peer_info[from] = oinfo; psdout(10) << " peer osd." << from << " now " << oinfo << " " << omissing << dendl; might_have_unfound.insert(from); // See doc/dev/osd_internals/last_epoch_started if (oinfo.last_epoch_started > info.last_epoch_started) { info.last_epoch_started = oinfo.last_epoch_started; dirty_info = true; } if (oinfo.last_interval_started > info.last_interval_started) { info.last_interval_started = oinfo.last_interval_started; dirty_info = true; } update_history(oinfo.history); ceph_assert(cct->_conf->osd_find_best_info_ignore_history_les || info.last_epoch_started >= info.history.last_epoch_started); peer_missing[from].claim(std::move(omissing)); } void PeeringState::proc_replica_log( pg_info_t &oinfo, const pg_log_t &olog, pg_missing_t&& omissing, pg_shard_t from) { psdout(10) << "proc_replica_log for osd." << from << ": " << oinfo << " " << olog << " " << omissing << dendl; pg_log.proc_replica_log(oinfo, olog, omissing, from); peer_info[from] = oinfo; psdout(10) << " peer osd." << from << " now " << oinfo << " " << omissing << dendl; might_have_unfound.insert(from); for (auto i = omissing.get_items().begin(); i != omissing.get_items().end(); ++i) { psdout(20) << " after missing " << i->first << " need " << i->second.need << " have " << i->second.have << dendl; } peer_missing[from].claim(std::move(omissing)); } void PeeringState::fulfill_info( pg_shard_t from, const pg_query_t &query, pair<pg_shard_t, pg_info_t> &notify_info) { ceph_assert(from == primary); ceph_assert(query.type == pg_query_t::INFO); // info psdout(10) << "sending info" << dendl; notify_info = make_pair(from, info); } void PeeringState::fulfill_log( pg_shard_t from, const pg_query_t &query, epoch_t query_epoch) { psdout(10) << "log request from " << from << dendl; ceph_assert(from == primary); ceph_assert(query.type != pg_query_t::INFO); auto mlog = TOPNSPC::make_message<MOSDPGLog>( from.shard, pg_whoami.shard, get_osdmap_epoch(), info, query_epoch); mlog->missing = pg_log.get_missing(); // primary -> other, when building master log if (query.type == pg_query_t::LOG) { psdout(10) << " sending info+missing+log since " << query.since << dendl; if (query.since != eversion_t() && query.since < pg_log.get_tail()) { pl->get_clog_error() << info.pgid << " got broken pg_query_t::LOG since " << query.since << " when my log.tail is " << pg_log.get_tail() << ", sending full log instead"; mlog->log = pg_log.get_log(); // primary should not have requested this!!
} else mlog->log.copy_after(cct, pg_log.get_log(), query.since); } else if (query.type == pg_query_t::FULLLOG) { psdout(10) << " sending info+missing+full log" << dendl; mlog->log = pg_log.get_log(); } psdout(10) << " sending " << mlog->log << " " << mlog->missing << dendl; pl->send_cluster_message(from.osd, std::move(mlog), get_osdmap_epoch(), true); } void PeeringState::fulfill_query(const MQuery& query, PeeringCtxWrapper &rctx) { if (query.query.type == pg_query_t::INFO) { pair<pg_shard_t, pg_info_t> notify_info; // note this refreshes our prior_readable_until_ub value update_history(query.query.history); fulfill_info(query.from, query.query, notify_info); rctx.send_notify( notify_info.first.osd, pg_notify_t( notify_info.first.shard, pg_whoami.shard, query.query_epoch, get_osdmap_epoch(), notify_info.second, past_intervals)); } else { update_history(query.query.history); fulfill_log(query.from, query.query, query.query_epoch); } } void PeeringState::try_mark_clean() { if (actingset.size() == get_osdmap()->get_pg_size(info.pgid.pgid)) { state_clear(PG_STATE_FORCED_BACKFILL | PG_STATE_FORCED_RECOVERY); state_set(PG_STATE_CLEAN); info.history.last_epoch_clean = get_osdmap_epoch(); info.history.last_interval_clean = info.history.same_interval_since; past_intervals.clear(); dirty_big_info = true; dirty_info = true; } if (!is_active() && is_peered()) { if (is_clean()) { bool target; if (pool.info.is_pending_merge(info.pgid.pgid, &target)) { if (target) { psdout(10) << "ready to merge (target)" << dendl; pl->set_ready_to_merge_target( info.last_update, info.history.last_epoch_started, info.history.last_epoch_clean); } else { psdout(10) << "ready to merge (source)" << dendl; pl->set_ready_to_merge_source(info.last_update); } } } else { psdout(10) << "not clean, not ready to merge" << dendl; // we should have notified OSD in Active state entry point } } state_clear(PG_STATE_FORCED_RECOVERY | PG_STATE_FORCED_BACKFILL); share_pg_info(); pl->publish_stats_to_osd(); clear_recovery_state(); } void PeeringState::split_into( pg_t child_pgid, PeeringState *child, unsigned split_bits) { child->update_osdmap_ref(get_osdmap()); child->pool = pool; // Log pg_log.split_into(child_pgid, split_bits, &(child->pg_log)); child->info.last_complete = info.last_complete; info.last_update = pg_log.get_head(); child->info.last_update = child->pg_log.get_head(); child->info.last_user_version = info.last_user_version; info.log_tail = pg_log.get_tail(); child->info.log_tail = child->pg_log.get_tail(); // reset last_complete, we might have modified pg_log & missing above pg_log.reset_complete_to(&info); child->pg_log.reset_complete_to(&child->info); // Info child->info.history = info.history; child->info.history.epoch_created = get_osdmap_epoch(); child->info.purged_snaps = info.purged_snaps; if (info.last_backfill.is_max()) { child->info.set_last_backfill(hobject_t::get_max()); } else { // restart backfill on parent and child to be safe. we could // probably do better in the bitwise sort case, but it's more // fragile (there may be special work to do on backfill completion // in the future).
info.set_last_backfill(hobject_t()); child->info.set_last_backfill(hobject_t()); // restarting backfill implies that the missing set is empty, // since it is only used for objects prior to last_backfill pg_log.reset_backfill(); child->pg_log.reset_backfill(); } child->info.stats = info.stats; child->info.stats.parent_split_bits = split_bits; info.stats.stats_invalid = true; child->info.stats.stats_invalid = true; child->info.stats.objects_trimmed = 0; child->info.stats.snaptrim_duration = 0.0; child->info.last_epoch_started = info.last_epoch_started; child->info.last_interval_started = info.last_interval_started; // There can't be recovery/backfill going on now int primary, up_primary; vector<int> newup, newacting; get_osdmap()->pg_to_up_acting_osds( child->info.pgid.pgid, &newup, &up_primary, &newacting, &primary); child->init_primary_up_acting( newup, newacting, up_primary, primary); child->role = OSDMap::calc_pg_role(pg_whoami, child->acting); // this comparison includes primary rank via pg_shard_t if (get_primary() != child->get_primary()) child->info.history.same_primary_since = get_osdmap_epoch(); child->info.stats.up = newup; child->info.stats.up_primary = up_primary; child->info.stats.acting = newacting; child->info.stats.acting_primary = primary; child->info.stats.mapping_epoch = get_osdmap_epoch(); // History child->past_intervals = past_intervals; child->on_new_interval(); child->send_notify = !child->is_primary(); child->dirty_info = true; child->dirty_big_info = true; dirty_info = true; dirty_big_info = true; } void PeeringState::merge_from( map<spg_t, PeeringState*>& sources, PeeringCtx &rctx, unsigned split_bits, const pg_merge_meta_t& last_pg_merge_meta) { bool incomplete = false; if (info.last_complete != info.last_update || info.is_incomplete() || info.dne()) { psdout(10) << __func__ << " target incomplete" << dendl; incomplete = true; } if (last_pg_merge_meta.source_pgid != pg_t()) { if (info.pgid.pgid != last_pg_merge_meta.source_pgid.get_parent()) { psdout(10) << __func__ << " target doesn't match expected parent " << last_pg_merge_meta.source_pgid.get_parent() << " of source_pgid " << last_pg_merge_meta.source_pgid << dendl; incomplete = true; } if (info.last_update != last_pg_merge_meta.target_version) { psdout(10) << __func__ << " target version doesn't match expected " << last_pg_merge_meta.target_version << dendl; incomplete = true; } } PGLog::LogEntryHandlerRef handler{pl->get_log_handler(rctx.transaction)}; pg_log.roll_forward(handler.get()); info.last_complete = info.last_update; // to fake out trim() pg_log.reset_recovery_pointers(); pg_log.trim(info.last_update, info); vector<PGLog*> log_from; for (auto& i : sources) { auto& source = i.second; if (!source) { psdout(10) << __func__ << " source " << i.first << " missing" << dendl; incomplete = true; continue; } if (source->info.last_complete != source->info.last_update || source->info.is_incomplete() || source->info.dne()) { psdout(10) << __func__ << " source " << source->pg_whoami << " incomplete" << dendl; incomplete = true; } if (last_pg_merge_meta.source_pgid != pg_t()) { if (source->info.pgid.pgid != last_pg_merge_meta.source_pgid) { dout(10) << __func__ << " source " << source->info.pgid.pgid << " doesn't match expected source pgid " << last_pg_merge_meta.source_pgid << dendl; incomplete = true; } if (source->info.last_update != last_pg_merge_meta.source_version) { dout(10) << __func__ << " source version doesn't match expected " << last_pg_merge_meta.source_version << dendl; incomplete = true; } } // prepare log PGLog::LogEntryHandlerRef
handler{ source->pl->get_log_handler(rctx.transaction)}; source->pg_log.roll_forward(handler.get()); source->info.last_complete = source->info.last_update; // to fake out trim() source->pg_log.reset_recovery_pointers(); source->pg_log.trim(source->info.last_update, source->info); log_from.push_back(&source->pg_log); // combine stats info.stats.add(source->info.stats); // pull up last_update info.last_update = std::max(info.last_update, source->info.last_update); // adopt source's PastIntervals if target has none. we can do this since // pgp_num has been reduced prior to the merge, so the OSD mappings for // the PGs are identical. if (past_intervals.empty() && !source->past_intervals.empty()) { psdout(10) << __func__ << " taking source's past_intervals" << dendl; past_intervals = source->past_intervals; } } info.last_complete = info.last_update; info.log_tail = info.last_update; if (incomplete) { info.last_backfill = hobject_t(); } // merge logs pg_log.merge_from(log_from, info.last_update); // make sure we have a meaningful last_epoch_started/clean (if we were a // placeholder) if (info.history.epoch_created == 0) { // start with (a) source's history, since these PGs *should* have been // remapped in concert with each other... info.history = sources.begin()->second->info.history; // we use the last_epoch_{started,clean} we got from // the caller, which are the epochs that were reported when the PGs were // found to be ready for merge. info.history.last_epoch_clean = last_pg_merge_meta.last_epoch_clean; info.history.last_epoch_started = last_pg_merge_meta.last_epoch_started; info.last_epoch_started = last_pg_merge_meta.last_epoch_started; psdout(10) << __func__ << " set les/c to " << last_pg_merge_meta.last_epoch_started << "/" << last_pg_merge_meta.last_epoch_clean << " from pool last_dec_*, source pg history was " << sources.begin()->second->info.history << dendl; // above we have pulled down source's history and we need to check // history.epoch_created again to confirm that source is not a placeholder // too. (peering requires a sane history.same_interval_since value for any // non-newly created pg and below here we know we are basically iterating // back a series of past maps to fake a merge process, hence we need to // fix history.same_interval_since first so that start_peering_interval() // will not complain) if (info.history.epoch_created == 0) { dout(10) << __func__ << " both merge target and source are placeholders," << " set sis to lec " << info.history.last_epoch_clean << dendl; info.history.same_interval_since = info.history.last_epoch_clean; } // if the past_intervals start is later than last_epoch_clean, it // implies the source repeered again but the target didn't, or // that the source became clean in a later epoch than the target. // avoid the discrepancy by adjusting the interval start // backwards to match so that check_past_interval_bounds() will // not complain. auto pib = past_intervals.get_bounds(); if (info.history.last_epoch_clean < pib.first) { psdout(10) << __func__ << " last_epoch_clean " << info.history.last_epoch_clean << " < past_interval start " << pib.first << ", adjusting start backwards" << dendl; past_intervals.adjust_start_backwards(info.history.last_epoch_clean); } // Similarly, if the same_interval_since value is later than // last_epoch_clean, the next interval change will result in a // past_interval start that is later than last_epoch_clean. This // can happen if we use the pg_history values from the merge // source.
Adjust the same_interval_since value backwards if that // happens. (We trust the les and lec values more because they came from // the real target, whereas the history value we stole from the source.) if (info.history.last_epoch_started < info.history.same_interval_since) { psdout(10) << __func__ << " last_epoch_started " << info.history.last_epoch_started << " < same_interval_since " << info.history.same_interval_since << ", adjusting pg_history backwards" << dendl; info.history.same_interval_since = info.history.last_epoch_clean; // make sure same_{up,primary}_since are <= same_interval_since info.history.same_up_since = std::min( info.history.same_up_since, info.history.same_interval_since); info.history.same_primary_since = std::min( info.history.same_primary_since, info.history.same_interval_since); } } dirty_info = true; dirty_big_info = true; } void PeeringState::start_split_stats( const set<spg_t>& childpgs, vector<object_stat_sum_t> *out) { out->resize(childpgs.size() + 1); info.stats.stats.sum.split(*out); } void PeeringState::finish_split_stats( const object_stat_sum_t& stats, ObjectStore::Transaction &t) { info.stats.stats.sum = stats; write_if_dirty(t); } void PeeringState::update_blocked_by() { // set a max on the number of blocking peers we report. if we go // over, report a random subset. keep the result sorted. unsigned keep = std::min<unsigned>( blocked_by.size(), cct->_conf->osd_max_pg_blocked_by); unsigned skip = blocked_by.size() - keep; info.stats.blocked_by.clear(); info.stats.blocked_by.resize(keep); unsigned pos = 0; for (auto p = blocked_by.begin(); p != blocked_by.end() && keep > 0; ++p) { if (skip > 0 && (rand() % (skip + keep) < skip)) { --skip; } else { info.stats.blocked_by[pos++] = *p; --keep; } } } static bool find_shard(const set<pg_shard_t> & pgs, shard_id_t shard) { for (auto&p : pgs) if (p.shard == shard) return true; return false; } static pg_shard_t get_another_shard(const set<pg_shard_t> & pgs, pg_shard_t skip, shard_id_t shard) { for (auto&p : pgs) { if (p == skip) continue; if (p.shard == shard) return p; } return pg_shard_t(); } void PeeringState::update_calc_stats() { info.stats.version = info.last_update; info.stats.created = info.history.epoch_created; info.stats.last_scrub = info.history.last_scrub; info.stats.last_scrub_stamp = info.history.last_scrub_stamp; info.stats.last_deep_scrub = info.history.last_deep_scrub; info.stats.last_deep_scrub_stamp = info.history.last_deep_scrub_stamp; info.stats.last_clean_scrub_stamp = info.history.last_clean_scrub_stamp; info.stats.last_epoch_clean = info.history.last_epoch_clean; info.stats.log_size = pg_log.get_head().version - pg_log.get_tail().version; info.stats.log_dups_size = pg_log.get_log().dups.size(); info.stats.ondisk_log_size = info.stats.log_size; info.stats.log_start = pg_log.get_tail(); info.stats.ondisk_log_start = pg_log.get_tail(); info.stats.snaptrimq_len = pl->get_snap_trimq_size(); unsigned num_shards = get_osdmap()->get_pg_size(info.pgid.pgid); // In the rare case that upset is too large (usually transient), use it as the // target for the calculations below.
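// Illustrative example (assumed values): with pg_size num_shards = 3 but a transient upset of {0, 1, 2, 5}, target becomes max(3, 4) = 4 // below, so copy counts are computed against the larger, temporarily observed set.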
unsigned target = std::max(num_shards, (unsigned)upset.size()); // For undersized actingset may be larger with OSDs out unsigned nrep = std::max(actingset.size(), upset.size()); // calc num_object_copies info.stats.stats.calc_copies(std::max(target, nrep)); info.stats.stats.sum.num_objects_degraded = 0; info.stats.stats.sum.num_objects_unfound = 0; info.stats.stats.sum.num_objects_misplaced = 0; info.stats.avail_no_missing.clear(); info.stats.object_location_counts.clear(); // We should never hit this condition, but if end up hitting it, // make sure to update num_objects and set PG_STATE_INCONSISTENT. if (info.stats.stats.sum.num_objects < 0) { psdout(0) << __func__ << " negative num_objects = " << info.stats.stats.sum.num_objects << " setting it to 0 " << dendl; info.stats.stats.sum.num_objects = 0; state_set(PG_STATE_INCONSISTENT); } if ((is_remapped() || is_undersized() || !is_clean()) && (is_peered() || is_activating())) { psdout(20) << __func__ << " actingset " << actingset << " upset " << upset << " acting_recovery_backfill " << acting_recovery_backfill << dendl; ceph_assert(!acting_recovery_backfill.empty()); bool estimate = false; // NOTE: we only generate degraded, misplaced and unfound // values for the summation, not individual stat categories. int64_t num_objects = info.stats.stats.sum.num_objects; // Objects missing from up nodes, sorted by # objects. boost::container::flat_set<std::pair<int64_t, pg_shard_t>> missing_target_objects; // Objects missing from nodes not in up, sort by # objects boost::container::flat_set<std::pair<int64_t, pg_shard_t>> acting_source_objects; // Fill missing_target_objects/acting_source_objects { int64_t missing; // Primary first missing = pg_log.get_missing().num_missing(); ceph_assert(acting_recovery_backfill.count(pg_whoami)); if (upset.count(pg_whoami)) { missing_target_objects.emplace(missing, pg_whoami); } else { acting_source_objects.emplace(missing, pg_whoami); } info.stats.stats.sum.num_objects_missing_on_primary = missing; if (missing == 0) info.stats.avail_no_missing.push_back(pg_whoami); psdout(20) << __func__ << " shard " << pg_whoami << " primary objects " << num_objects << " missing " << missing << dendl; } // All other peers for (auto& peer : peer_info) { // Primary should not be in the peer_info, skip if it is. if (peer.first == pg_whoami) continue; int64_t missing = 0; int64_t peer_num_objects = std::max((int64_t)0, peer.second.stats.stats.sum.num_objects); // Backfill targets always track num_objects accurately; // all other peers track missing accurately.
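// Illustrative worked example (assumed numbers): with num_objects = 100 and a backfill target reporting peer_num_objects = 40, the estimate // below is missing = max(0, 100 - 40) = 60 for that shard.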
if (is_backfill_target(peer.first)) { missing = std::max((int64_t)0, num_objects - peer_num_objects); } else { if (peer_missing.count(peer.first)) { missing = peer_missing[peer.first].num_missing(); } else { psdout(20) << __func__ << " no peer_missing found for " << peer.first << dendl; if (is_recovering()) { estimate = true; } missing = std::max((int64_t)0, num_objects - peer_num_objects); } } if (upset.count(peer.first)) { missing_target_objects.emplace(missing, peer.first); } else if (actingset.count(peer.first)) { acting_source_objects.emplace(missing, peer.first); } peer.second.stats.stats.sum.num_objects_missing = missing; if (missing == 0) info.stats.avail_no_missing.push_back(peer.first); psdout(20) << __func__ << " shard " << peer.first << " objects " << peer_num_objects << " missing " << missing << dendl; } // Compute object_location_counts for (auto& ml: missing_loc.get_missing_locs()) { info.stats.object_location_counts[ml.second]++; psdout(30) << __func__ << " " << ml.first << " object_location_counts[" << ml.second << "]=" << info.stats.object_location_counts[ml.second] << dendl; } int64_t not_missing = num_objects - missing_loc.get_missing_locs().size(); if (not_missing) { // During recovery we know upset == actingset and is being populated // During backfill we know that all non-missing objects are in the actingset info.stats.object_location_counts[actingset] = not_missing; } psdout(30) << __func__ << " object_location_counts[" << upset << "]=" << info.stats.object_location_counts[upset] << dendl; psdout(20) << __func__ << " object_location_counts " << info.stats.object_location_counts << dendl; // A misplaced object is not stored on the correct OSD. int64_t misplaced = 0; // a degraded object has fewer replicas or EC shards than the pool specifies.
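// Illustrative worked example for the recovering branch below (assumed numbers): in a replicated pg with upset.size() = 3, a bucket of // ml.second = 10 missing objects present on up = 1 of the up osds and on other = 1 osd outside up gives missing_shards = 3 - 1 = 2, // odegraded = 10 * 2 = 20, omisplaced = 10 * min(2, 1) = 10, so the bucket contributes 10 to misplaced and 20 - 10 = 10 to degraded.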
int64_t degraded = 0; if (is_recovering()) { for (auto& sml: missing_loc.get_missing_by_count()) { for (auto& ml: sml.second) { int missing_shards; if (sml.first == shard_id_t::NO_SHARD) { psdout(20) << __func__ << " ml " << ml.second << " upset size " << upset.size() << " up " << ml.first.up << dendl; missing_shards = (int)upset.size() - ml.first.up; } else { // Handle shards not even in upset below if (!find_shard(upset, sml.first)) continue; missing_shards = std::max(0, 1 - ml.first.up); psdout(20) << __func__ << " shard " << sml.first << " ml " << ml.second << " missing shards " << missing_shards << dendl; } int odegraded = ml.second * missing_shards; // Copies on other osds but limited to the possible degraded int more_osds = std::min(missing_shards, ml.first.other); int omisplaced = ml.second * more_osds; ceph_assert(omisplaced <= odegraded); odegraded -= omisplaced; misplaced += omisplaced; degraded += odegraded; } } psdout(20) << __func__ << " missing based degraded " << degraded << dendl; psdout(20) << __func__ << " missing based misplaced " << misplaced << dendl; // Handle undersized case if (pool.info.is_replicated()) { // Add degraded for missing targets (num_objects missing) ceph_assert(target >= upset.size()); unsigned needed = target - upset.size(); degraded += num_objects * needed; } else { for (unsigned i = 0; i < num_shards; ++i) { shard_id_t shard(i); if (!find_shard(upset, shard)) { pg_shard_t pgs = get_another_shard(actingset, pg_shard_t(), shard); if (pgs != pg_shard_t()) { int64_t missing; if (pgs == pg_whoami) missing = info.stats.stats.sum.num_objects_missing_on_primary; else missing = peer_info[pgs].stats.stats.sum.num_objects_missing; degraded += missing; misplaced += std::max((int64_t)0, num_objects - missing); } else { // No shard anywhere degraded += num_objects; } } } } goto out; } // Handle undersized case if (pool.info.is_replicated()) { // Add to missing_target_objects ceph_assert(target >= missing_target_objects.size()); unsigned needed = target - missing_target_objects.size(); if (needed) missing_target_objects.emplace(num_objects * needed, pg_shard_t(pg_shard_t::NO_OSD)); } else { for (unsigned i = 0; i < num_shards; ++i) { shard_id_t shard(i); bool found = false; for (const auto& t : missing_target_objects) { if (std::get<1>(t).shard == shard) { found = true; break; } } if (!found) missing_target_objects.emplace(num_objects, pg_shard_t(pg_shard_t::NO_OSD, shard)); } } for (const auto& item : missing_target_objects) psdout(20) << __func__ << " missing shard " << std::get<1>(item) << " missing= " << std::get<0>(item) << dendl; for (const auto& item : acting_source_objects) psdout(20) << __func__ << " acting shard " << std::get<1>(item) << " missing= " << std::get<0>(item) << dendl; // Handle all objects not in missing for remapped // or backfill for (auto m = missing_target_objects.rbegin(); m != missing_target_objects.rend(); ++m) { int64_t extra_missing = -1; if (pool.info.is_replicated()) { if (!acting_source_objects.empty()) { auto extra_copy = acting_source_objects.begin(); extra_missing = std::get<0>(*extra_copy); acting_source_objects.erase(extra_copy); } } else { // Erasure coded // Use corresponding shard for (const auto& a : acting_source_objects) { if (std::get<1>(a).shard == std::get<1>(*m).shard) { extra_missing = std::get<0>(a); acting_source_objects.erase(a); break; } } } if (extra_missing >= 0 && std::get<0>(*m) >= extra_missing) { // We don't know which of the objects on the target // are part of extra_missing so assume they are all
degraded. misplaced += std::get<0>(*m) - extra_missing; degraded += extra_missing; } else { // 1. extra_missing == -1, more targets than sources so degraded // 2. extra_missing > std::get<0>(*m), so we know that some of the // previously degraded extra_missing objects are now present on the target. degraded += std::get<0>(*m); } } // If there are still acting that haven't been accounted for // then they are misplaced for (const auto& a : acting_source_objects) { int64_t extra_misplaced = std::max((int64_t)0, num_objects - std::get<0>(a)); psdout(20) << __func__ << " extra acting misplaced " << extra_misplaced << dendl; misplaced += extra_misplaced; } out: // NOTE: Tests use these messages to verify this code psdout(20) << __func__ << " degraded " << degraded << (estimate ? " (est)": "") << dendl; psdout(20) << __func__ << " misplaced " << misplaced << (estimate ? " (est)": "") << dendl; info.stats.stats.sum.num_objects_degraded = degraded; info.stats.stats.sum.num_objects_unfound = get_num_unfound(); info.stats.stats.sum.num_objects_misplaced = misplaced; } } std::optional<pg_stat_t> PeeringState::prepare_stats_for_publish( const std::optional<pg_stat_t> &pg_stats_publish, const object_stat_collection_t &unstable_stats) { if (info.stats.stats.sum.num_scrub_errors) { psdout(10) << __func__ << " inconsistent due to " << info.stats.stats.sum.num_scrub_errors << " scrub errors" << dendl; state_set(PG_STATE_INCONSISTENT); } else { state_clear(PG_STATE_INCONSISTENT); state_clear(PG_STATE_FAILED_REPAIR); } utime_t now = ceph_clock_now(); if (info.stats.state != state) { info.stats.last_change = now; // Optimistic estimation: if we just found out a PG is inactive, // assume it was active till now. if (!(state & PG_STATE_ACTIVE) && (info.stats.state & PG_STATE_ACTIVE)) info.stats.last_active = now; if ((state & PG_STATE_ACTIVE) && !(info.stats.state & PG_STATE_ACTIVE)) info.stats.last_became_active = now; if ((state & (PG_STATE_ACTIVE|PG_STATE_PEERED)) && !(info.stats.state & (PG_STATE_ACTIVE|PG_STATE_PEERED))) info.stats.last_became_peered = now; info.stats.state = state; } update_calc_stats(); if (info.stats.stats.sum.num_objects_degraded) { state_set(PG_STATE_DEGRADED); } else { state_clear(PG_STATE_DEGRADED); } update_blocked_by(); pg_stat_t pre_publish = info.stats; pre_publish.stats.add(unstable_stats); utime_t cutoff = now; cutoff -= cct->_conf->osd_pg_stat_report_interval_max; // share (some of) our purged_snaps via the pg_stats. limit # of intervals // because we don't want to make the pg_stat_t structures too expensive.
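// Illustrative example (assumed values): with osd_max_snap_prune_intervals_per_epoch = 2 and purged_snaps holding intervals [1~5], [10~3], // [20~11], only the first two intervals are copied into this report; the remainder are advertised by later stats publications.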
unsigned max = cct->_conf->osd_max_snap_prune_intervals_per_epoch; unsigned num = 0; auto i = info.purged_snaps.begin(); while (num < max && i != info.purged_snaps.end()) { pre_publish.purged_snaps.insert(i.get_start(), i.get_len()); ++num; ++i; } psdout(20) << __func__ << " reporting purged_snaps " << pre_publish.purged_snaps << dendl; if (pg_stats_publish && pre_publish == *pg_stats_publish && info.stats.last_fresh > cutoff) { psdout(15) << "publish_stats_to_osd " << pg_stats_publish->reported_epoch << ": no change since " << info.stats.last_fresh << dendl; return std::nullopt; } else { // update our stat summary and timestamps info.stats.reported_epoch = get_osdmap_epoch(); ++info.stats.reported_seq; info.stats.last_fresh = now; if (info.stats.state & PG_STATE_CLEAN) info.stats.last_clean = now; if (info.stats.state & PG_STATE_ACTIVE) info.stats.last_active = now; if (info.stats.state & (PG_STATE_ACTIVE|PG_STATE_PEERED)) info.stats.last_peered = now; info.stats.last_unstale = now; if ((info.stats.state & PG_STATE_DEGRADED) == 0) info.stats.last_undegraded = now; if ((info.stats.state & PG_STATE_UNDERSIZED) == 0) info.stats.last_fullsized = now; psdout(15) << "publish_stats_to_osd " << pre_publish.reported_epoch << ":" << pre_publish.reported_seq << dendl; return std::make_optional(std::move(pre_publish)); } } void PeeringState::init( int role, const vector<int>& newup, int new_up_primary, const vector<int>& newacting, int new_acting_primary, const pg_history_t& history, const PastIntervals& pi, ObjectStore::Transaction &t) { psdout(10) << "init role " << role << " up " << newup << " acting " << newacting << " history " << history << " past_intervals " << pi << dendl; set_role(role); init_primary_up_acting( newup, newacting, new_up_primary, new_acting_primary); info.history = history; past_intervals = pi; info.stats.up = up; info.stats.up_primary = new_up_primary; info.stats.acting = acting; info.stats.acting_primary = new_acting_primary; info.stats.mapping_epoch = info.history.same_interval_since; if (!perform_deletes_during_peering()) { pg_log.set_missing_may_contain_deletes(); } on_new_interval(); dirty_info = true; dirty_big_info = true; write_if_dirty(t); } void PeeringState::dump_peering_state(Formatter *f) { f->dump_string("state", get_pg_state_string()); f->dump_unsigned("epoch", get_osdmap_epoch()); f->open_array_section("up"); for (auto p = up.begin(); p != up.end(); ++p) f->dump_unsigned("osd", *p); f->close_section(); f->open_array_section("acting"); for (auto p = acting.begin(); p != acting.end(); ++p) f->dump_unsigned("osd", *p); f->close_section(); if (!backfill_targets.empty()) { f->open_array_section("backfill_targets"); for (auto p = backfill_targets.begin(); p != backfill_targets.end(); ++p) f->dump_stream("shard") << *p; f->close_section(); } if (!async_recovery_targets.empty()) { f->open_array_section("async_recovery_targets"); for (auto p = async_recovery_targets.begin(); p != async_recovery_targets.end(); ++p) f->dump_stream("shard") << *p; f->close_section(); } if (!acting_recovery_backfill.empty()) { f->open_array_section("acting_recovery_backfill"); for (auto p = acting_recovery_backfill.begin(); p != acting_recovery_backfill.end(); ++p) f->dump_stream("shard") << *p; f->close_section(); } f->open_object_section("info"); update_calc_stats(); info.dump(f); f->close_section(); f->open_array_section("peer_info"); for (auto p = peer_info.begin(); p != peer_info.end(); ++p) { f->open_object_section("info"); f->dump_stream("peer") << p->first; p->second.dump(f);
f->close_section(); } f->close_section(); } void PeeringState::update_stats( std::function<bool(pg_history_t &, pg_stat_t &)> f, ObjectStore::Transaction *t) { if (f(info.history, info.stats)) { pl->publish_stats_to_osd(); } pl->reschedule_scrub(); if (t) { dirty_info = true; write_if_dirty(*t); } } void PeeringState::update_stats_wo_resched( std::function<void(pg_history_t &, pg_stat_t &)> f) { f(info.history, info.stats); } bool PeeringState::append_log_entries_update_missing( const mempool::osd_pglog::list<pg_log_entry_t> &entries, ObjectStore::Transaction &t, std::optional<eversion_t> trim_to, std::optional<eversion_t> roll_forward_to) { ceph_assert(!entries.empty()); ceph_assert(entries.begin()->version > info.last_update); PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(t)}; bool invalidate_stats = pg_log.append_new_log_entries( info.last_backfill, entries, rollbacker.get()); if (roll_forward_to && entries.rbegin()->soid > info.last_backfill) { pg_log.roll_forward(rollbacker.get()); } if (roll_forward_to && *roll_forward_to > pg_log.get_can_rollback_to()) { pg_log.roll_forward_to(*roll_forward_to, rollbacker.get()); last_rollback_info_trimmed_to_applied = *roll_forward_to; } info.last_update = pg_log.get_head(); if (pg_log.get_missing().num_missing() == 0) { // advance last_complete since nothing else is missing! info.last_complete = info.last_update; } info.stats.stats_invalid = info.stats.stats_invalid || invalidate_stats; psdout(20) << __func__ << " trim_to bool = " << bool(trim_to) << " trim_to = " << (trim_to ? *trim_to : eversion_t()) << dendl; if (trim_to) pg_log.trim(*trim_to, info); dirty_info = true; write_if_dirty(t); return invalidate_stats; } void PeeringState::merge_new_log_entries( const mempool::osd_pglog::list<pg_log_entry_t> &entries, ObjectStore::Transaction &t, std::optional<eversion_t> trim_to, std::optional<eversion_t> roll_forward_to) { psdout(10) << __func__ << " " << entries << dendl; ceph_assert(is_primary()); bool rebuild_missing = append_log_entries_update_missing(entries, t, trim_to, roll_forward_to); for (auto i = acting_recovery_backfill.begin(); i != acting_recovery_backfill.end(); ++i) { pg_shard_t peer(*i); if (peer == pg_whoami) continue; ceph_assert(peer_missing.count(peer)); ceph_assert(peer_info.count(peer)); pg_missing_t& pmissing(peer_missing[peer]); psdout(20) << __func__ << " peer_missing for " << peer << " = " << pmissing << dendl; pg_info_t& pinfo(peer_info[peer]); bool invalidate_stats = PGLog::append_log_entries_update_missing( pinfo.last_backfill, entries, true, NULL, pmissing, NULL, dpp); pinfo.last_update = info.last_update; pinfo.stats.stats_invalid = pinfo.stats.stats_invalid || invalidate_stats; rebuild_missing = rebuild_missing || invalidate_stats; } if (!rebuild_missing) { return; } for (auto &&i: entries) { missing_loc.rebuild( i.soid, pg_whoami, acting_recovery_backfill, info, pg_log.get_missing(), peer_missing, peer_info); } } void PeeringState::add_log_entry(const pg_log_entry_t& e, bool applied) { // raise last_complete only if we were previously up to date if (info.last_complete == info.last_update) info.last_complete = e.version; // raise last_update.
  ceph_assert(e.version > info.last_update);
  info.last_update = e.version;

  // raise user_version, if it increased (it may not have been bumped
  // by all logged updates)
  if (e.user_version > info.last_user_version)
    info.last_user_version = e.user_version;

  // log mutation
  pg_log.add(e, applied);
  psdout(10) << "add_log_entry " << e << dendl;
}

void PeeringState::append_log(
  vector<pg_log_entry_t>&& logv,
  eversion_t trim_to,
  eversion_t roll_forward_to,
  eversion_t mlcod,
  ObjectStore::Transaction &t,
  bool transaction_applied,
  bool async)
{
  /* The primary has sent an info updating the history, but it may not
   * have arrived yet.  We want to make sure that we cannot remember this
   * write without remembering that it happened in an interval which went
   * active in epoch history.last_epoch_started.
   */
  if (info.last_epoch_started != info.history.last_epoch_started) {
    info.history.last_epoch_started = info.last_epoch_started;
  }
  if (info.last_interval_started != info.history.last_interval_started) {
    info.history.last_interval_started = info.last_interval_started;
  }
  psdout(10) << "append_log " << pg_log.get_log() << " " << logv << dendl;

  PGLog::LogEntryHandlerRef handler{pl->get_log_handler(t)};
  if (!transaction_applied) {
    /* We must be a backfill or async recovery peer, so it's ok if we apply
     * out-of-turn since we won't be considered when
     * determining a min possible last_update.
     *
     * We skip_rollforward() here, which advances the crt, without
     * doing an actual rollforward. This avoids cleaning up entries
     * from the backend and we do not end up in a situation, where the
     * object is deleted before we can _merge_object_divergent_entries().
     */
    pg_log.skip_rollforward();
  }
  for (auto p = logv.begin(); p != logv.end(); ++p) {
    add_log_entry(*p, transaction_applied);

    /* We don't want to leave the rollforward artifacts around
     * here past last_backfill.  It's ok for the same reason as
     * above */
    if (transaction_applied &&
        p->soid > info.last_backfill) {
      pg_log.roll_forward(handler.get());
    }
  }
  if (transaction_applied && roll_forward_to > pg_log.get_can_rollback_to()) {
    pg_log.roll_forward_to(
      roll_forward_to,
      handler.get());
    last_rollback_info_trimmed_to_applied = roll_forward_to;
  }

  psdout(10) << __func__ << " approx pg log length = "
             << pg_log.get_log().approx_size() << dendl;
  psdout(10) << __func__ << " dups pg log length = "
             << pg_log.get_log().dups.size() << dendl;
  psdout(10) << __func__ << " transaction_applied = "
             << transaction_applied << dendl;
  if (!transaction_applied || async)
    psdout(10) << __func__ << " " << pg_whoami
               << " is async_recovery or backfill target" << dendl;
  pg_log.trim(trim_to, info, transaction_applied, async);

  // update the local pg, pg log
  dirty_info = true;
  write_if_dirty(t);

  if (!is_primary())
    min_last_complete_ondisk = mlcod;
}

void PeeringState::recover_got(
  const hobject_t &oid, eversion_t v,
  bool is_delete,
  ObjectStore::Transaction &t)
{
  if (v > pg_log.get_can_rollback_to()) {
    /* This can only happen during a repair, and even then, it would
     * be one heck of a race.
     * If we are repairing the object, the write in question must be
     * fully committed, so it's not valid to roll it back anyway (and
     * we'll be rolled forward shortly anyway) */
    PGLog::LogEntryHandlerRef handler{pl->get_log_handler(t)};
    pg_log.roll_forward_to(v, handler.get());
  }

  psdout(10) << "got missing " << oid << " v " << v << dendl;
  pg_log.recover_got(oid, v, info);
  if (pg_log.get_log().log.empty()) {
    psdout(10) << "last_complete now " << info.last_complete
               << " while log is empty" << dendl;
  } else if (pg_log.get_log().complete_to != pg_log.get_log().log.end()) {
    psdout(10) << "last_complete now " << info.last_complete
               << " log.complete_to " << pg_log.get_log().complete_to->version
               << dendl;
  } else {
    psdout(10) << "last_complete now " << info.last_complete
               << " log.complete_to at end" << dendl;
    // below is not true in the repair case.
    //assert(missing.num_missing() == 0);  // otherwise, complete_to was wrong.
    ceph_assert(info.last_complete == info.last_update);
  }

  if (is_primary()) {
    ceph_assert(missing_loc.needs_recovery(oid));
    if (!is_delete)
      missing_loc.add_location(oid, pg_whoami);
  }

  // update pg
  dirty_info = true;
  write_if_dirty(t);
}

void PeeringState::update_backfill_progress(
  const hobject_t &updated_backfill,
  const pg_stat_t &updated_stats,
  bool preserve_local_num_bytes,
  ObjectStore::Transaction &t)
{
  info.set_last_backfill(updated_backfill);
  if (preserve_local_num_bytes) {
    psdout(25) << __func__ << " primary " << updated_stats.stats.sum.num_bytes
               << " local " << info.stats.stats.sum.num_bytes << dendl;
    int64_t bytes = info.stats.stats.sum.num_bytes;
    info.stats = updated_stats;
    info.stats.stats.sum.num_bytes = bytes;
  } else {
    psdout(20) << __func__ << " final " << updated_stats.stats.sum.num_bytes
               << " replaces local " << info.stats.stats.sum.num_bytes << dendl;
    info.stats = updated_stats;
  }

  dirty_info = true;
  write_if_dirty(t);
}

void PeeringState::adjust_purged_snaps(
  std::function<void(interval_set<snapid_t> &snaps)> f)
{
  f(info.purged_snaps);
  dirty_info = true;
  dirty_big_info = true;
}

void PeeringState::on_peer_recover(
  pg_shard_t peer,
  const hobject_t &soid,
  const eversion_t &version)
{
  pl->publish_stats_to_osd();
  // done!
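  // note: with got() below, the peer's missing entry for this object is
  // cleared and the peer becomes a usable location in missing_loc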
  peer_missing[peer].got(soid, version);
  missing_loc.add_location(soid, peer);
}

void PeeringState::begin_peer_recover(
  pg_shard_t peer,
  const hobject_t soid)
{
  peer_missing[peer].revise_have(soid, eversion_t());
}

void PeeringState::force_object_missing(
  const set<pg_shard_t> &peers,
  const hobject_t &soid,
  eversion_t version)
{
  for (auto &&peer : peers) {
    if (peer != primary) {
      peer_missing[peer].add(soid, version, eversion_t(), false);
    } else {
      pg_log.missing_add(soid, version, eversion_t());
      pg_log.reset_complete_to(&info);
      pg_log.set_last_requested(0);
    }
  }

  missing_loc.rebuild(
    soid,
    pg_whoami,
    acting_recovery_backfill,
    info,
    pg_log.get_missing(),
    peer_missing,
    peer_info);
}

void PeeringState::pre_submit_op(
  const hobject_t &hoid,
  const vector<pg_log_entry_t>& logv,
  eversion_t at_version)
{
  if (at_version > eversion_t()) {
    for (auto &&i : get_acting_recovery_backfill()) {
      if (i == primary) continue;
      pg_info_t &pinfo = peer_info[i];
      // keep peer_info up to date
      if (pinfo.last_complete == pinfo.last_update)
        pinfo.last_complete = at_version;
      pinfo.last_update = at_version;
    }
  }

  bool requires_missing_loc = false;
  for (auto &&i : get_async_recovery_targets()) {
    if (i == primary || !get_peer_missing(i).is_missing(hoid))
      continue;
    requires_missing_loc = true;
    for (auto &&entry: logv) {
      peer_missing[i].add_next_event(entry);
    }
  }

  if (requires_missing_loc) {
    for (auto &&entry: logv) {
      psdout(30) << __func__ << " missing_loc before: "
                 << missing_loc.get_locations(entry.soid) << dendl;
      missing_loc.add_missing(entry.soid, entry.version,
                              eversion_t(), entry.is_delete());
      // clear out missing_loc
      missing_loc.clear_location(entry.soid);
      for (auto &i: get_actingset()) {
        if (!get_peer_missing(i).is_missing(entry.soid))
          missing_loc.add_location(entry.soid, i);
      }
      psdout(30) << __func__ << " missing_loc after: "
                 << missing_loc.get_locations(entry.soid) << dendl;
    }
  }
}

void PeeringState::recovery_committed_to(eversion_t version)
{
  psdout(10) << __func__ << " version " << version
             << " now ondisk" << dendl;
  last_complete_ondisk = version;

  if (last_complete_ondisk == info.last_update) {
    if (!is_primary()) {
      // Either we are a replica or backfill target.
      // we are fully up to date.  tell the primary!
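      // (the MOSDPGTrim message below doubles as the lcod report; the
      // primary handles it in Active::react(const MTrim&))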
      pl->send_cluster_message(
        get_primary().osd,
        TOPNSPC::make_message<MOSDPGTrim>(
          get_osdmap_epoch(),
          spg_t(info.pgid.pgid, primary.shard),
          last_complete_ondisk),
        get_osdmap_epoch());
    } else {
      calc_min_last_complete_ondisk();
    }
  }
}

void PeeringState::complete_write(eversion_t v, eversion_t lc)
{
  last_update_ondisk = v;
  last_complete_ondisk = lc;
  calc_min_last_complete_ondisk();
}

void PeeringState::calc_trim_to()
{
  size_t target = pl->get_target_pg_log_entries();

  eversion_t limit = std::min(
    min_last_complete_ondisk,
    pg_log.get_can_rollback_to());
  if (limit != eversion_t() &&
      limit != pg_trim_to &&
      pg_log.get_log().approx_size() > target) {
    size_t num_to_trim = std::min(pg_log.get_log().approx_size() - target,
                                  cct->_conf->osd_pg_log_trim_max);
    if (num_to_trim < cct->_conf->osd_pg_log_trim_min &&
        cct->_conf->osd_pg_log_trim_max >= cct->_conf->osd_pg_log_trim_min) {
      return;
    }
    auto it = pg_log.get_log().log.begin();
    eversion_t new_trim_to;
    for (size_t i = 0; i < num_to_trim; ++i) {
      new_trim_to = it->version;
      ++it;
      if (new_trim_to > limit) {
        new_trim_to = limit;
        psdout(10) << "calc_trim_to trimming to min_last_complete_ondisk" << dendl;
        break;
      }
    }
    psdout(10) << "calc_trim_to " << pg_trim_to << " -> " << new_trim_to << dendl;
    pg_trim_to = new_trim_to;
    assert(pg_trim_to <= pg_log.get_head());
    assert(pg_trim_to <= min_last_complete_ondisk);
  }
}

void PeeringState::calc_trim_to_aggressive()
{
  size_t target = pl->get_target_pg_log_entries();

  // limit pg log trimming up to the can_rollback_to value
  eversion_t limit = std::min({
    pg_log.get_head(),
    pg_log.get_can_rollback_to(),
    last_update_ondisk});
  psdout(10) << __func__ << " limit = " << limit << dendl;

  if (limit != eversion_t() &&
      limit != pg_trim_to &&
      pg_log.get_log().approx_size() > target) {
    psdout(10) << __func__ << " approx pg log length = "
               << pg_log.get_log().approx_size() << dendl;
    uint64_t num_to_trim = std::min(pg_log.get_log().approx_size() - target,
                                    cct->_conf->osd_pg_log_trim_max);
    psdout(10) << __func__ << " num_to_trim = " << num_to_trim << dendl;
    if (num_to_trim < cct->_conf->osd_pg_log_trim_min &&
        cct->_conf->osd_pg_log_trim_max >= cct->_conf->osd_pg_log_trim_min) {
      return;
    }
    auto it = pg_log.get_log().log.begin(); // oldest log entry
    auto rit = pg_log.get_log().log.rbegin();
    eversion_t by_n_to_keep; // start from tail
    eversion_t by_n_to_trim = eversion_t::max(); // start from head
    for (size_t i = 0; it != pg_log.get_log().log.end(); ++it, ++rit) {
      i++;
      if (i > target && by_n_to_keep == eversion_t()) {
        by_n_to_keep = rit->version;
      }
      if (i >= num_to_trim && by_n_to_trim == eversion_t::max()) {
        by_n_to_trim = it->version;
      }
      if (by_n_to_keep != eversion_t() &&
          by_n_to_trim != eversion_t::max()) {
        break;
      }
    }

    if (by_n_to_keep == eversion_t()) {
      return;
    }

    pg_trim_to = std::min({by_n_to_keep, by_n_to_trim, limit});
    psdout(10) << __func__ << " pg_trim_to now " << pg_trim_to << dendl;
    ceph_assert(pg_trim_to <= pg_log.get_head());
  }
}

void PeeringState::apply_op_stats(
  const hobject_t &soid,
  const object_stat_sum_t &delta_stats)
{
  info.stats.stats.add(delta_stats);
  info.stats.stats.floor(0);

  for (auto i = get_backfill_targets().begin();
       i != get_backfill_targets().end();
       ++i) {
    pg_shard_t bt = *i;
    pg_info_t& pinfo = peer_info[bt];
    if (soid <= pinfo.last_backfill)
      pinfo.stats.stats.add(delta_stats);
  }
}

void PeeringState::update_complete_backfill_object_stats(
  const hobject_t &hoid,
  const pg_stat_t &stats)
{
  for (auto &&bt: get_backfill_targets()) {
    pg_info_t& pinfo = peer_info[bt];
    // add stats to all peers that were missing the object
    if (hoid >
        pinfo.last_backfill)
      pinfo.stats.add(stats);
  }
}

void PeeringState::update_peer_last_backfill(
  pg_shard_t peer,
  const hobject_t &new_last_backfill)
{
  pg_info_t &pinfo = peer_info[peer];
  pinfo.last_backfill = new_last_backfill;
  if (new_last_backfill.is_max()) {
    /* pinfo.stats might be wrong if we did log-based recovery on the
     * backfilled portion in addition to continuing backfill.
     */
    pinfo.stats = info.stats;
  }
}

void PeeringState::set_revert_with_targets(
  const hobject_t &soid,
  const set<pg_shard_t> &good_peers)
{
  for (auto &&peer: good_peers) {
    missing_loc.add_location(soid, peer);
  }
}

void PeeringState::prepare_backfill_for_missing(
  const hobject_t &soid,
  const eversion_t &version,
  const vector<pg_shard_t> &targets)
{
  for (auto &&peer: targets) {
    peer_missing[peer].add(soid, version, eversion_t(), false);
  }
}

void PeeringState::update_hset(const pg_hit_set_history_t &hset_history)
{
  info.hit_set = hset_history;
}

/*------------ Peering State Machine----------------*/
#undef dout_prefix
#define dout_prefix (context< PeeringMachine >().dpp->gen_prefix(*_dout) \
                     << "state<" << get_state_name() << ">: ")
#undef psdout
#define psdout(x) ldout(context< PeeringMachine >().cct, x)

#define DECLARE_LOCALS                                  \
  PeeringState *ps = context< PeeringMachine >().state; \
  std::ignore = ps;                                     \
  PeeringListener *pl = context< PeeringMachine >().pl; \
  std::ignore = pl

/*------Crashed-------*/
PeeringState::Crashed::Crashed(my_context ctx)
  : my_base(ctx),
    NamedState(context< PeeringMachine >().state_history, "Crashed")
{
  context< PeeringMachine >().log_enter(state_name);
  ceph_abort_msg("we got a bad state machine event");
}

/*------Initial-------*/
PeeringState::Initial::Initial(my_context ctx)
  : my_base(ctx),
    NamedState(context< PeeringMachine >().state_history, "Initial")
{
  context< PeeringMachine >().log_enter(state_name);
}

boost::statechart::result PeeringState::Initial::react(const MNotifyRec& notify)
{
  DECLARE_LOCALS;
  ps->proc_replica_info(
    notify.from, notify.notify.info, notify.notify.epoch_sent);
  ps->set_last_peering_reset();
  return transit< Primary >();
}

boost::statechart::result PeeringState::Initial::react(const MInfoRec& i)
{
  DECLARE_LOCALS;
  ceph_assert(!ps->is_primary());
  post_event(i);
  return transit< Stray >();
}

boost::statechart::result PeeringState::Initial::react(const MLogRec& i)
{
  DECLARE_LOCALS;
  ceph_assert(!ps->is_primary());
  post_event(i);
  return transit< Stray >();
}

void PeeringState::Initial::exit()
{
  context< PeeringMachine >().log_exit(state_name, enter_time);
  DECLARE_LOCALS;
  utime_t dur = ceph_clock_now() - enter_time;
  pl->get_peering_perf().tinc(rs_initial_latency, dur);
}

/*------Started-------*/
PeeringState::Started::Started(my_context ctx)
  : my_base(ctx),
    NamedState(context< PeeringMachine >().state_history, "Started")
{
  context< PeeringMachine >().log_enter(state_name);
}

boost::statechart::result
PeeringState::Started::react(const IntervalFlush&)
{
  psdout(10) << "Ending blocked outgoing recovery messages" << dendl;
  context< PeeringMachine >().state->end_block_outgoing();
  return discard_event();
}

boost::statechart::result PeeringState::Started::react(const AdvMap& advmap)
{
  DECLARE_LOCALS;
  psdout(10) << "Started advmap" << dendl;
  ps->check_full_transition(advmap.lastmap, advmap.osdmap);
  if (ps->should_restart_peering(
        advmap.up_primary,
        advmap.acting_primary,
        advmap.newup,
        advmap.newacting,
        advmap.lastmap,
        advmap.osdmap)) {
    psdout(10) << "should_restart_peering, transitioning to Reset" << dendl;
    post_event(advmap);
    return transit< Reset >();
  }
  ps->remove_down_peer_info(advmap.osdmap);
  return
    discard_event();
}

boost::statechart::result PeeringState::Started::react(const QueryState& q)
{
  q.f->open_object_section("state");
  q.f->dump_string("name", state_name);
  q.f->dump_stream("enter_time") << enter_time;
  q.f->close_section();
  return discard_event();
}

boost::statechart::result PeeringState::Started::react(const QueryUnfound& q)
{
  q.f->dump_string("state", "Started");
  q.f->dump_bool("available_might_have_unfound", false);
  return discard_event();
}

void PeeringState::Started::exit()
{
  context< PeeringMachine >().log_exit(state_name, enter_time);
  DECLARE_LOCALS;
  utime_t dur = ceph_clock_now() - enter_time;
  pl->get_peering_perf().tinc(rs_started_latency, dur);
  ps->state_clear(PG_STATE_WAIT | PG_STATE_LAGGY);
}

/*--------Reset---------*/
PeeringState::Reset::Reset(my_context ctx)
  : my_base(ctx),
    NamedState(context< PeeringMachine >().state_history, "Reset")
{
  context< PeeringMachine >().log_enter(state_name);
  DECLARE_LOCALS;

  ps->flushes_in_progress = 0;
  ps->set_last_peering_reset();
  ps->log_weirdness();
}

boost::statechart::result
PeeringState::Reset::react(const IntervalFlush&)
{
  psdout(10) << "Ending blocked outgoing recovery messages" << dendl;
  context< PeeringMachine >().state->end_block_outgoing();
  return discard_event();
}

boost::statechart::result PeeringState::Reset::react(const AdvMap& advmap)
{
  DECLARE_LOCALS;
  psdout(10) << "Reset advmap" << dendl;

  ps->check_full_transition(advmap.lastmap, advmap.osdmap);

  if (ps->should_restart_peering(
        advmap.up_primary,
        advmap.acting_primary,
        advmap.newup,
        advmap.newacting,
        advmap.lastmap,
        advmap.osdmap)) {
    psdout(10) << "should restart peering, calling start_peering_interval again"
               << dendl;
    ps->start_peering_interval(
      advmap.lastmap,
      advmap.newup, advmap.up_primary,
      advmap.newacting, advmap.acting_primary,
      context< PeeringMachine >().get_cur_transaction());
  }
  ps->remove_down_peer_info(advmap.osdmap);
  ps->check_past_interval_bounds();
  return discard_event();
}

boost::statechart::result PeeringState::Reset::react(const ActMap&)
{
  DECLARE_LOCALS;
  if (ps->should_send_notify() && ps->get_primary().osd >= 0) {
    ps->info.history.refresh_prior_readable_until_ub(
      pl->get_mnow(),
      ps->prior_readable_until_ub);
    context< PeeringMachine >().send_notify(
      ps->get_primary().osd,
      pg_notify_t(
        ps->get_primary().shard, ps->pg_whoami.shard,
        ps->get_osdmap_epoch(),
        ps->get_osdmap_epoch(),
        ps->info,
        ps->past_intervals));
  }

  ps->update_heartbeat_peers();

  return transit< Started >();
}

boost::statechart::result PeeringState::Reset::react(const QueryState& q)
{
  q.f->open_object_section("state");
  q.f->dump_string("name", state_name);
  q.f->dump_stream("enter_time") << enter_time;
  q.f->close_section();
  return discard_event();
}

boost::statechart::result PeeringState::Reset::react(const QueryUnfound& q)
{
  q.f->dump_string("state", "Reset");
  q.f->dump_bool("available_might_have_unfound", false);
  return discard_event();
}

void PeeringState::Reset::exit()
{
  context< PeeringMachine >().log_exit(state_name, enter_time);
  DECLARE_LOCALS;
  utime_t dur = ceph_clock_now() - enter_time;
  pl->get_peering_perf().tinc(rs_reset_latency, dur);
}

/*-------Start---------*/
PeeringState::Start::Start(my_context ctx)
  : my_base(ctx),
    NamedState(context< PeeringMachine >().state_history, "Start")
{
  context< PeeringMachine >().log_enter(state_name);

  DECLARE_LOCALS;
  if (ps->is_primary()) {
    psdout(1) << "transitioning to Primary" << dendl;
    post_event(MakePrimary());
  } else { //is_stray
    psdout(1) << "transitioning to Stray" << dendl;
    post_event(MakeStray());
  }
}

void PeeringState::Start::exit()
{
  context< PeeringMachine >().log_exit(state_name, enter_time);
  DECLARE_LOCALS;
  utime_t dur = ceph_clock_now() - enter_time;
  pl->get_peering_perf().tinc(rs_start_latency, dur);
}

/*---------Primary--------*/
PeeringState::Primary::Primary(my_context ctx)
  : my_base(ctx),
    NamedState(context< PeeringMachine >().state_history, "Started/Primary")
{
  context< PeeringMachine >().log_enter(state_name);
  DECLARE_LOCALS;
  ceph_assert(ps->want_acting.empty());

  // set CREATING bit until we have peered for the first time.
  if (ps->info.history.last_epoch_started == 0) {
    ps->state_set(PG_STATE_CREATING);
    // use the history timestamp, which ultimately comes from the
    // monitor in the create case.
    utime_t t = ps->info.history.last_scrub_stamp;
    ps->info.stats.last_fresh = t;
    ps->info.stats.last_active = t;
    ps->info.stats.last_change = t;
    ps->info.stats.last_peered = t;
    ps->info.stats.last_clean = t;
    ps->info.stats.last_unstale = t;
    ps->info.stats.last_undegraded = t;
    ps->info.stats.last_fullsized = t;
    ps->info.stats.last_scrub_stamp = t;
    ps->info.stats.last_deep_scrub_stamp = t;
    ps->info.stats.last_clean_scrub_stamp = t;
  }
}

boost::statechart::result PeeringState::Primary::react(const MNotifyRec& notevt)
{
  DECLARE_LOCALS;
  psdout(7) << "handle_pg_notify from osd." << notevt.from << dendl;
  ps->proc_replica_info(
    notevt.from, notevt.notify.info, notevt.notify.epoch_sent);
  return discard_event();
}

boost::statechart::result PeeringState::Primary::react(const ActMap&)
{
  DECLARE_LOCALS;
  psdout(7) << "handle ActMap primary" << dendl;
  pl->publish_stats_to_osd();
  return discard_event();
}

boost::statechart::result PeeringState::Primary::react(
  const SetForceRecovery&)
{
  DECLARE_LOCALS;
  ps->set_force_recovery(true);
  return discard_event();
}

boost::statechart::result PeeringState::Primary::react(
  const UnsetForceRecovery&)
{
  DECLARE_LOCALS;
  ps->set_force_recovery(false);
  return discard_event();
}

boost::statechart::result PeeringState::Primary::react(
  const RequestScrub& evt)
{
  DECLARE_LOCALS;
  if (ps->is_primary()) {
    pl->scrub_requested(evt.deep, evt.repair);
    psdout(10) << "marking for scrub" << dendl;
  }
  return discard_event();
}

boost::statechart::result PeeringState::Primary::react(
  const SetForceBackfill&)
{
  DECLARE_LOCALS;
  ps->set_force_backfill(true);
  return discard_event();
}

boost::statechart::result PeeringState::Primary::react(
  const UnsetForceBackfill&)
{
  DECLARE_LOCALS;
  ps->set_force_backfill(false);
  return discard_event();
}

void PeeringState::Primary::exit()
{
  context< PeeringMachine >().log_exit(state_name, enter_time);
  DECLARE_LOCALS;
  ps->want_acting.clear();
  utime_t dur = ceph_clock_now() - enter_time;
  pl->get_peering_perf().tinc(rs_primary_latency, dur);
  pl->clear_primary_state();
  ps->state_clear(PG_STATE_CREATING);
}

/*---------Peering--------*/
PeeringState::Peering::Peering(my_context ctx)
  : my_base(ctx),
    NamedState(context< PeeringMachine >().state_history,
               "Started/Primary/Peering"),
    history_les_bound(false)
{
  context< PeeringMachine >().log_enter(state_name);
  DECLARE_LOCALS;

  ceph_assert(!ps->is_peered());
  ceph_assert(!ps->is_peering());
  ceph_assert(ps->is_primary());
  ps->state_set(PG_STATE_PEERING);
}

boost::statechart::result PeeringState::Peering::react(const AdvMap& advmap)
{
  DECLARE_LOCALS;
  psdout(10) << "Peering advmap" << dendl;
  if (prior_set.affected_by_map(*(advmap.osdmap), ps->dpp)) {
    psdout(1) << "Peering, affected_by_map, going to Reset" << dendl;
    post_event(advmap);
    return transit< Reset >();
  }

  ps->adjust_need_up_thru(advmap.osdmap);
  ps->check_prior_readable_down_osds(advmap.osdmap);
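  // the map change does not force a reset here; forward the event so the
  // enclosing Started state can run its should_restart_peering check too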
  return forward_event();
}

boost::statechart::result PeeringState::Peering::react(const QueryState& q)
{
  DECLARE_LOCALS;

  q.f->open_object_section("state");
  q.f->dump_string("name", state_name);
  q.f->dump_stream("enter_time") << enter_time;

  q.f->open_array_section("past_intervals");
  ps->past_intervals.dump(q.f);
  q.f->close_section();

  q.f->open_array_section("probing_osds");
  for (auto p = prior_set.probe.begin(); p != prior_set.probe.end(); ++p)
    q.f->dump_stream("osd") << *p;
  q.f->close_section();

  if (prior_set.pg_down)
    q.f->dump_string("blocked", "peering is blocked due to down osds");

  q.f->open_array_section("down_osds_we_would_probe");
  for (auto p = prior_set.down.begin(); p != prior_set.down.end(); ++p)
    q.f->dump_int("osd", *p);
  q.f->close_section();

  q.f->open_array_section("peering_blocked_by");
  for (auto p = prior_set.blocked_by.begin();
       p != prior_set.blocked_by.end();
       ++p) {
    q.f->open_object_section("osd");
    q.f->dump_int("osd", p->first);
    q.f->dump_int("current_lost_at", p->second);
    q.f->dump_string("comment", "starting or marking this osd lost may let us proceed");
    q.f->close_section();
  }
  q.f->close_section();

  if (history_les_bound) {
    q.f->open_array_section("peering_blocked_by_detail");
    q.f->open_object_section("item");
    q.f->dump_string("detail","peering_blocked_by_history_les_bound");
    q.f->close_section();
    q.f->close_section();
  }

  q.f->close_section();
  return forward_event();
}

boost::statechart::result PeeringState::Peering::react(const QueryUnfound& q)
{
  q.f->dump_string("state", "Peering");
  q.f->dump_bool("available_might_have_unfound", false);
  return discard_event();
}

void PeeringState::Peering::exit()
{
  DECLARE_LOCALS;
  psdout(10) << "Leaving Peering" << dendl;
  context< PeeringMachine >().log_exit(state_name, enter_time);
  ps->state_clear(PG_STATE_PEERING);
  pl->clear_probe_targets();

  utime_t dur = ceph_clock_now() - enter_time;
  pl->get_peering_perf().tinc(rs_peering_latency, dur);
}

/*------Backfilling-------*/
PeeringState::Backfilling::Backfilling(my_context ctx)
  : my_base(ctx),
    NamedState(context< PeeringMachine >().state_history,
               "Started/Primary/Active/Backfilling")
{
  context< PeeringMachine >().log_enter(state_name);

  DECLARE_LOCALS;
  ps->backfill_reserved = true;
  pl->on_backfill_reserved();
  ps->state_clear(PG_STATE_BACKFILL_TOOFULL);
  ps->state_clear(PG_STATE_BACKFILL_WAIT);
  ps->state_set(PG_STATE_BACKFILLING);
  pl->publish_stats_to_osd();
}

void PeeringState::Backfilling::backfill_release_reservations()
{
  DECLARE_LOCALS;
  pl->cancel_local_background_io_reservation();
  for (auto it = ps->backfill_targets.begin();
       it != ps->backfill_targets.end();
       ++it) {
    ceph_assert(*it != ps->pg_whoami);
    pl->send_cluster_message(
      it->osd,
      TOPNSPC::make_message<MBackfillReserve>(
        MBackfillReserve::RELEASE,
        spg_t(ps->info.pgid.pgid, it->shard),
        ps->get_osdmap_epoch()),
      ps->get_osdmap_epoch());
  }
}

void PeeringState::Backfilling::cancel_backfill()
{
  DECLARE_LOCALS;
  backfill_release_reservations();
  pl->on_backfill_canceled();
}

boost::statechart::result
PeeringState::Backfilling::react(const Backfilled &c)
{
  backfill_release_reservations();
  return transit<Recovered>();
}

boost::statechart::result
PeeringState::Backfilling::react(const DeferBackfill &c)
{
  DECLARE_LOCALS;

  psdout(10) << "defer backfill, retry delay " << c.delay << dendl;
  ps->state_set(PG_STATE_BACKFILL_WAIT);
  ps->state_clear(PG_STATE_BACKFILLING);
  cancel_backfill();

  pl->schedule_event_after(
    std::make_shared<PGPeeringEvent>(
      ps->get_osdmap_epoch(),
      ps->get_osdmap_epoch(),
      RequestBackfill()),
    c.delay);
  return transit<NotBackfilling>();
}

boost::statechart::result
PeeringState::Backfilling::react(const UnfoundBackfill &c)
{
  DECLARE_LOCALS;
  psdout(10) << "backfill has unfound, can't continue" << dendl;
  ps->state_set(PG_STATE_BACKFILL_UNFOUND);
  ps->state_clear(PG_STATE_BACKFILLING);
  cancel_backfill();
  return transit<NotBackfilling>();
}

boost::statechart::result
PeeringState::Backfilling::react(const RemoteReservationRevokedTooFull &)
{
  DECLARE_LOCALS;

  ps->state_set(PG_STATE_BACKFILL_TOOFULL);
  ps->state_clear(PG_STATE_BACKFILLING);
  cancel_backfill();

  pl->schedule_event_after(
    std::make_shared<PGPeeringEvent>(
      ps->get_osdmap_epoch(),
      ps->get_osdmap_epoch(),
      RequestBackfill()),
    ps->cct->_conf->osd_backfill_retry_interval);

  return transit<NotBackfilling>();
}

boost::statechart::result
PeeringState::Backfilling::react(const RemoteReservationRevoked &)
{
  DECLARE_LOCALS;
  ps->state_set(PG_STATE_BACKFILL_WAIT);
  cancel_backfill();
  if (ps->needs_backfill()) {
    return transit<WaitLocalBackfillReserved>();
  } else {
    // raced with MOSDPGBackfill::OP_BACKFILL_FINISH, ignore
    return discard_event();
  }
}

void PeeringState::Backfilling::exit()
{
  context< PeeringMachine >().log_exit(state_name, enter_time);
  DECLARE_LOCALS;
  ps->backfill_reserved = false;
  ps->state_clear(PG_STATE_BACKFILLING);
  ps->state_clear(PG_STATE_FORCED_BACKFILL | PG_STATE_FORCED_RECOVERY);
  utime_t dur = ceph_clock_now() - enter_time;
  pl->get_peering_perf().tinc(rs_backfilling_latency, dur);
}

/*--WaitRemoteBackfillReserved--*/
PeeringState::WaitRemoteBackfillReserved::WaitRemoteBackfillReserved(my_context ctx)
  : my_base(ctx),
    NamedState(context< PeeringMachine >().state_history,
               "Started/Primary/Active/WaitRemoteBackfillReserved"),
    backfill_osd_it(
      context< Active >().remote_shards_to_reserve_backfill.begin())
{
  context< PeeringMachine >().log_enter(state_name);
  DECLARE_LOCALS;

  ps->state_set(PG_STATE_BACKFILL_WAIT);
  pl->publish_stats_to_osd();
  post_event(RemoteBackfillReserved());
}

boost::statechart::result
PeeringState::WaitRemoteBackfillReserved::react(const RemoteBackfillReserved &evt)
{
  DECLARE_LOCALS;

  int64_t num_bytes = ps->info.stats.stats.sum.num_bytes;
  psdout(10) << __func__ << " num_bytes " << num_bytes << dendl;
  if (backfill_osd_it !=
      context< Active >().remote_shards_to_reserve_backfill.end()) {
    // The primary never backfills itself
    ceph_assert(*backfill_osd_it != ps->pg_whoami);
    pl->send_cluster_message(
      backfill_osd_it->osd,
      TOPNSPC::make_message<MBackfillReserve>(
        MBackfillReserve::REQUEST,
        spg_t(context< PeeringMachine >().spgid.pgid, backfill_osd_it->shard),
        ps->get_osdmap_epoch(),
        ps->get_backfill_priority(),
        num_bytes,
        ps->peer_bytes[*backfill_osd_it]),
      ps->get_osdmap_epoch());
    ++backfill_osd_it;
  } else {
    ps->peer_bytes.clear();
    post_event(AllBackfillsReserved());
  }
  return discard_event();
}

void PeeringState::WaitRemoteBackfillReserved::exit()
{
  context< PeeringMachine >().log_exit(state_name, enter_time);
  DECLARE_LOCALS;
  utime_t dur = ceph_clock_now() - enter_time;
  pl->get_peering_perf().tinc(rs_waitremotebackfillreserved_latency, dur);
}

void PeeringState::WaitRemoteBackfillReserved::retry()
{
  DECLARE_LOCALS;
  pl->cancel_local_background_io_reservation();

  // Send CANCEL to all previously acquired reservations
  set<pg_shard_t>::const_iterator it, begin, end;
  begin = context< Active >().remote_shards_to_reserve_backfill.begin();
  end = context< Active >().remote_shards_to_reserve_backfill.end();
  ceph_assert(begin != end);
  for (it = begin; it != backfill_osd_it; ++it) {
    // The primary never backfills itself
    ceph_assert(*it != ps->pg_whoami);
    pl->send_cluster_message(
      it->osd,
      TOPNSPC::make_message<MBackfillReserve>(
        MBackfillReserve::RELEASE,
        spg_t(context< PeeringMachine >().spgid.pgid, it->shard),
        ps->get_osdmap_epoch()),
      ps->get_osdmap_epoch());
  }

  ps->state_clear(PG_STATE_BACKFILL_WAIT);
  pl->publish_stats_to_osd();

  pl->schedule_event_after(
    std::make_shared<PGPeeringEvent>(
      ps->get_osdmap_epoch(),
      ps->get_osdmap_epoch(),
      RequestBackfill()),
    ps->cct->_conf->osd_backfill_retry_interval);
}

boost::statechart::result
PeeringState::WaitRemoteBackfillReserved::react(const RemoteReservationRejectedTooFull &evt)
{
  DECLARE_LOCALS;
  ps->state_set(PG_STATE_BACKFILL_TOOFULL);
  retry();
  return transit<NotBackfilling>();
}

boost::statechart::result
PeeringState::WaitRemoteBackfillReserved::react(const RemoteReservationRevoked &evt)
{
  retry();
  return transit<NotBackfilling>();
}

/*--WaitLocalBackfillReserved--*/
PeeringState::WaitLocalBackfillReserved::WaitLocalBackfillReserved(my_context ctx)
  : my_base(ctx),
    NamedState(context< PeeringMachine >().state_history,
               "Started/Primary/Active/WaitLocalBackfillReserved")
{
  context< PeeringMachine >().log_enter(state_name);
  DECLARE_LOCALS;

  ps->state_set(PG_STATE_BACKFILL_WAIT);
  pl->request_local_background_io_reservation(
    ps->get_backfill_priority(),
    std::make_unique<PGPeeringEvent>(
      ps->get_osdmap_epoch(),
      ps->get_osdmap_epoch(),
      LocalBackfillReserved()),
    std::make_unique<PGPeeringEvent>(
      ps->get_osdmap_epoch(),
      ps->get_osdmap_epoch(),
      DeferBackfill(0.0)));
  pl->publish_stats_to_osd();
}

void PeeringState::WaitLocalBackfillReserved::exit()
{
  context< PeeringMachine >().log_exit(state_name, enter_time);
  DECLARE_LOCALS;
  utime_t dur = ceph_clock_now() - enter_time;
  pl->get_peering_perf().tinc(rs_waitlocalbackfillreserved_latency, dur);
}

/*----NotBackfilling------*/
PeeringState::NotBackfilling::NotBackfilling(my_context ctx)
  : my_base(ctx),
    NamedState(context< PeeringMachine >().state_history,
               "Started/Primary/Active/NotBackfilling")
{
  context< PeeringMachine >().log_enter(state_name);
  DECLARE_LOCALS;
  ps->state_clear(PG_STATE_REPAIR);
  pl->publish_stats_to_osd();
}

boost::statechart::result PeeringState::NotBackfilling::react(const QueryUnfound& q)
{
  DECLARE_LOCALS;
  ps->query_unfound(q.f, "NotBackfilling");
  return discard_event();
}

boost::statechart::result
PeeringState::NotBackfilling::react(const RemoteBackfillReserved &evt)
{
  return discard_event();
}

boost::statechart::result
PeeringState::NotBackfilling::react(const RemoteReservationRejectedTooFull &evt)
{
  return discard_event();
}

void PeeringState::NotBackfilling::exit()
{
  context< PeeringMachine >().log_exit(state_name, enter_time);

  DECLARE_LOCALS;
  ps->state_clear(PG_STATE_BACKFILL_UNFOUND);
  utime_t dur = ceph_clock_now() - enter_time;
  pl->get_peering_perf().tinc(rs_notbackfilling_latency, dur);
}

/*----NotRecovering------*/
PeeringState::NotRecovering::NotRecovering(my_context ctx)
  : my_base(ctx),
    NamedState(context< PeeringMachine >().state_history,
               "Started/Primary/Active/NotRecovering")
{
  context< PeeringMachine >().log_enter(state_name);
  DECLARE_LOCALS;
  ps->state_clear(PG_STATE_REPAIR);
  pl->publish_stats_to_osd();
}

boost::statechart::result PeeringState::NotRecovering::react(const QueryUnfound& q)
{
  DECLARE_LOCALS;
  ps->query_unfound(q.f, "NotRecovering");
  return discard_event();
}

void PeeringState::NotRecovering::exit()
{
  context< PeeringMachine >().log_exit(state_name, enter_time);

  DECLARE_LOCALS;
  ps->state_clear(PG_STATE_RECOVERY_UNFOUND);
  utime_t dur = ceph_clock_now() - enter_time;
  pl->get_peering_perf().tinc(rs_notrecovering_latency, dur);
}

/*---RepNotRecovering----*/
PeeringState::RepNotRecovering::RepNotRecovering(my_context ctx)
  : my_base(ctx),
    NamedState(context< PeeringMachine >().state_history,
               "Started/ReplicaActive/RepNotRecovering")
{
  context< PeeringMachine
           >().log_enter(state_name);
}

boost::statechart::result
PeeringState::RepNotRecovering::react(const RejectTooFullRemoteReservation &evt)
{
  DECLARE_LOCALS;
  ps->reject_reservation();
  post_event(RemoteReservationRejectedTooFull());
  return discard_event();
}

void PeeringState::RepNotRecovering::exit()
{
  context< PeeringMachine >().log_exit(state_name, enter_time);
  DECLARE_LOCALS;
  utime_t dur = ceph_clock_now() - enter_time;
  pl->get_peering_perf().tinc(rs_repnotrecovering_latency, dur);
}

/*---RepWaitRecoveryReserved--*/
PeeringState::RepWaitRecoveryReserved::RepWaitRecoveryReserved(my_context ctx)
  : my_base(ctx),
    NamedState(context< PeeringMachine >().state_history,
               "Started/ReplicaActive/RepWaitRecoveryReserved")
{
  context< PeeringMachine >().log_enter(state_name);
}

boost::statechart::result
PeeringState::RepWaitRecoveryReserved::react(const RemoteRecoveryReserved &evt)
{
  DECLARE_LOCALS;
  pl->send_cluster_message(
    ps->primary.osd,
    TOPNSPC::make_message<MRecoveryReserve>(
      MRecoveryReserve::GRANT,
      spg_t(ps->info.pgid.pgid, ps->primary.shard),
      ps->get_osdmap_epoch()),
    ps->get_osdmap_epoch());
  return transit<RepRecovering>();
}

boost::statechart::result
PeeringState::RepWaitRecoveryReserved::react(
  const RemoteReservationCanceled &evt)
{
  DECLARE_LOCALS;
  pl->unreserve_recovery_space();
  pl->cancel_remote_recovery_reservation();
  return transit<RepNotRecovering>();
}

void PeeringState::RepWaitRecoveryReserved::exit()
{
  context< PeeringMachine >().log_exit(state_name, enter_time);
  DECLARE_LOCALS;
  utime_t dur = ceph_clock_now() - enter_time;
  pl->get_peering_perf().tinc(rs_repwaitrecoveryreserved_latency, dur);
}

/*-RepWaitBackfillReserved*/
PeeringState::RepWaitBackfillReserved::RepWaitBackfillReserved(my_context ctx)
  : my_base(ctx),
    NamedState(context< PeeringMachine >().state_history,
               "Started/ReplicaActive/RepWaitBackfillReserved")
{
  context< PeeringMachine >().log_enter(state_name);
}

boost::statechart::result
PeeringState::RepNotRecovering::react(const RequestBackfillPrio &evt)
{
  DECLARE_LOCALS;

  if (!pl->try_reserve_recovery_space(
        evt.primary_num_bytes, evt.local_num_bytes)) {
    post_event(RejectTooFullRemoteReservation());
  } else {
    PGPeeringEventURef preempt;
    if (HAVE_FEATURE(ps->upacting_features, RECOVERY_RESERVATION_2)) {
      // older peers will interpret preemption as TOOFULL
      preempt = std::make_unique<PGPeeringEvent>(
        pl->get_osdmap_epoch(),
        pl->get_osdmap_epoch(),
        RemoteBackfillPreempted());
    }
    pl->request_remote_recovery_reservation(
      evt.priority,
      std::make_unique<PGPeeringEvent>(
        pl->get_osdmap_epoch(),
        pl->get_osdmap_epoch(),
        RemoteBackfillReserved()),
      std::move(preempt));
  }
  return transit<RepWaitBackfillReserved>();
}

boost::statechart::result
PeeringState::RepNotRecovering::react(const RequestRecoveryPrio &evt)
{
  DECLARE_LOCALS;

  // fall back to a local reckoning of priority if the primary doesn't
  // pass one (pre-mimic compat)
  int prio = evt.priority ?
    evt.priority : ps->get_recovery_priority();

  PGPeeringEventURef preempt;
  if (HAVE_FEATURE(ps->upacting_features, RECOVERY_RESERVATION_2)) {
    // older peers can't handle this
    preempt = std::make_unique<PGPeeringEvent>(
      ps->get_osdmap_epoch(),
      ps->get_osdmap_epoch(),
      RemoteRecoveryPreempted());
  }

  pl->request_remote_recovery_reservation(
    prio,
    std::make_unique<PGPeeringEvent>(
      ps->get_osdmap_epoch(),
      ps->get_osdmap_epoch(),
      RemoteRecoveryReserved()),
    std::move(preempt));
  return transit<RepWaitRecoveryReserved>();
}

void PeeringState::RepWaitBackfillReserved::exit()
{
  context< PeeringMachine >().log_exit(state_name, enter_time);
  DECLARE_LOCALS;
  utime_t dur = ceph_clock_now() - enter_time;
  pl->get_peering_perf().tinc(rs_repwaitbackfillreserved_latency, dur);
}

boost::statechart::result
PeeringState::RepWaitBackfillReserved::react(const RemoteBackfillReserved &evt)
{
  DECLARE_LOCALS;

  pl->send_cluster_message(
    ps->primary.osd,
    TOPNSPC::make_message<MBackfillReserve>(
      MBackfillReserve::GRANT,
      spg_t(ps->info.pgid.pgid, ps->primary.shard),
      ps->get_osdmap_epoch()),
    ps->get_osdmap_epoch());
  return transit<RepRecovering>();
}

boost::statechart::result
PeeringState::RepWaitBackfillReserved::react(
  const RejectTooFullRemoteReservation &evt)
{
  DECLARE_LOCALS;
  ps->reject_reservation();
  post_event(RemoteReservationRejectedTooFull());
  return discard_event();
}

boost::statechart::result
PeeringState::RepWaitBackfillReserved::react(
  const RemoteReservationRejectedTooFull &evt)
{
  DECLARE_LOCALS;
  pl->unreserve_recovery_space();
  pl->cancel_remote_recovery_reservation();
  return transit<RepNotRecovering>();
}

boost::statechart::result
PeeringState::RepWaitBackfillReserved::react(
  const RemoteReservationCanceled &evt)
{
  DECLARE_LOCALS;
  pl->unreserve_recovery_space();
  pl->cancel_remote_recovery_reservation();
  return transit<RepNotRecovering>();
}

/*---RepRecovering-------*/
PeeringState::RepRecovering::RepRecovering(my_context ctx)
  : my_base(ctx),
    NamedState(context< PeeringMachine >().state_history,
               "Started/ReplicaActive/RepRecovering")
{
  context< PeeringMachine >().log_enter(state_name);
}

boost::statechart::result
PeeringState::RepRecovering::react(const RemoteRecoveryPreempted &)
{
  DECLARE_LOCALS;

  pl->unreserve_recovery_space();
  pl->send_cluster_message(
    ps->primary.osd,
    TOPNSPC::make_message<MRecoveryReserve>(
      MRecoveryReserve::REVOKE,
      spg_t(ps->info.pgid.pgid, ps->primary.shard),
      ps->get_osdmap_epoch()),
    ps->get_osdmap_epoch());
  return discard_event();
}

boost::statechart::result
PeeringState::RepRecovering::react(const BackfillTooFull &)
{
  DECLARE_LOCALS;

  pl->unreserve_recovery_space();
  pl->send_cluster_message(
    ps->primary.osd,
    TOPNSPC::make_message<MBackfillReserve>(
      MBackfillReserve::REVOKE_TOOFULL,
      spg_t(ps->info.pgid.pgid, ps->primary.shard),
      ps->get_osdmap_epoch()),
    ps->get_osdmap_epoch());
  return discard_event();
}

boost::statechart::result
PeeringState::RepRecovering::react(const RemoteBackfillPreempted &)
{
  DECLARE_LOCALS;

  pl->unreserve_recovery_space();
  pl->send_cluster_message(
    ps->primary.osd,
    TOPNSPC::make_message<MBackfillReserve>(
      MBackfillReserve::REVOKE,
      spg_t(ps->info.pgid.pgid, ps->primary.shard),
      ps->get_osdmap_epoch()),
    ps->get_osdmap_epoch());
  return discard_event();
}

void PeeringState::RepRecovering::exit()
{
  context< PeeringMachine >().log_exit(state_name, enter_time);
  DECLARE_LOCALS;
  pl->unreserve_recovery_space();

  pl->cancel_remote_recovery_reservation();
  utime_t dur = ceph_clock_now() - enter_time;
  pl->get_peering_perf().tinc(rs_reprecovering_latency, dur);
}

/*------Activating--------*/
PeeringState::Activating::Activating(my_context ctx)
  : my_base(ctx),
    NamedState(context< PeeringMachine >().state_history,
               "Started/Primary/Active/Activating")
{
  context<
 PeeringMachine >().log_enter(state_name);
}

void PeeringState::Activating::exit()
{
  context< PeeringMachine >().log_exit(state_name, enter_time);
  DECLARE_LOCALS;
  utime_t dur = ceph_clock_now() - enter_time;
  pl->get_peering_perf().tinc(rs_activating_latency, dur);
}

PeeringState::WaitLocalRecoveryReserved::WaitLocalRecoveryReserved(my_context ctx)
  : my_base(ctx),
    NamedState(context< PeeringMachine >().state_history,
               "Started/Primary/Active/WaitLocalRecoveryReserved")
{
  context< PeeringMachine >().log_enter(state_name);
  DECLARE_LOCALS;

  // Make sure all nodes that are part of the recovery aren't full
  if (!ps->cct->_conf->osd_debug_skip_full_check_in_recovery &&
      ps->get_osdmap()->check_full(ps->acting_recovery_backfill)) {
    post_event(RecoveryTooFull());
    return;
  }

  ps->state_clear(PG_STATE_RECOVERY_TOOFULL);
  ps->state_set(PG_STATE_RECOVERY_WAIT);
  pl->request_local_background_io_reservation(
    ps->get_recovery_priority(),
    std::make_unique<PGPeeringEvent>(
      ps->get_osdmap_epoch(),
      ps->get_osdmap_epoch(),
      LocalRecoveryReserved()),
    std::make_unique<PGPeeringEvent>(
      ps->get_osdmap_epoch(),
      ps->get_osdmap_epoch(),
      DeferRecovery(0.0)));
  pl->publish_stats_to_osd();
}

boost::statechart::result
PeeringState::WaitLocalRecoveryReserved::react(const RecoveryTooFull &evt)
{
  DECLARE_LOCALS;
  ps->state_set(PG_STATE_RECOVERY_TOOFULL);
  pl->schedule_event_after(
    std::make_shared<PGPeeringEvent>(
      ps->get_osdmap_epoch(),
      ps->get_osdmap_epoch(),
      DoRecovery()),
    ps->cct->_conf->osd_recovery_retry_interval);
  return transit<NotRecovering>();
}

void PeeringState::WaitLocalRecoveryReserved::exit()
{
  context< PeeringMachine >().log_exit(state_name, enter_time);
  DECLARE_LOCALS;
  utime_t dur = ceph_clock_now() - enter_time;
  pl->get_peering_perf().tinc(rs_waitlocalrecoveryreserved_latency, dur);
}

PeeringState::WaitRemoteRecoveryReserved::WaitRemoteRecoveryReserved(my_context ctx)
  : my_base(ctx),
    NamedState(context< PeeringMachine >().state_history,
               "Started/Primary/Active/WaitRemoteRecoveryReserved"),
    remote_recovery_reservation_it(
      context< Active >().remote_shards_to_reserve_recovery.begin())
{
  context< PeeringMachine >().log_enter(state_name);
  post_event(RemoteRecoveryReserved());
}

boost::statechart::result
PeeringState::WaitRemoteRecoveryReserved::react(const RemoteRecoveryReserved &evt)
{
  DECLARE_LOCALS;
  if (remote_recovery_reservation_it !=
      context< Active >().remote_shards_to_reserve_recovery.end()) {
    ceph_assert(*remote_recovery_reservation_it != ps->pg_whoami);
    pl->send_cluster_message(
      remote_recovery_reservation_it->osd,
      TOPNSPC::make_message<MRecoveryReserve>(
        MRecoveryReserve::REQUEST,
        spg_t(context< PeeringMachine >().spgid.pgid,
              remote_recovery_reservation_it->shard),
        ps->get_osdmap_epoch(),
        ps->get_recovery_priority()),
      ps->get_osdmap_epoch());
    ++remote_recovery_reservation_it;
  } else {
    post_event(AllRemotesReserved());
  }
  return discard_event();
}

void PeeringState::WaitRemoteRecoveryReserved::exit()
{
  context< PeeringMachine >().log_exit(state_name, enter_time);
  DECLARE_LOCALS;
  utime_t dur = ceph_clock_now() - enter_time;
  pl->get_peering_perf().tinc(rs_waitremoterecoveryreserved_latency, dur);
}

PeeringState::Recovering::Recovering(my_context ctx)
  : my_base(ctx),
    NamedState(context< PeeringMachine >().state_history,
               "Started/Primary/Active/Recovering")
{
  context< PeeringMachine >().log_enter(state_name);

  DECLARE_LOCALS;
  ps->state_clear(PG_STATE_RECOVERY_WAIT);
  ps->state_clear(PG_STATE_RECOVERY_TOOFULL);
  ps->state_set(PG_STATE_RECOVERING);
  pl->on_recovery_reserved();
  ceph_assert(!ps->state_test(PG_STATE_ACTIVATING));
  pl->publish_stats_to_osd();
}

void
PeeringState::Recovering::release_reservations(bool cancel)
{
  DECLARE_LOCALS;
  ceph_assert(cancel || !ps->pg_log.get_missing().have_missing());

  // release remote reservations
  for (auto i = context< Active >().remote_shards_to_reserve_recovery.begin();
       i != context< Active >().remote_shards_to_reserve_recovery.end();
       ++i) {
    if (*i == ps->pg_whoami) // skip myself
      continue;
    pl->send_cluster_message(
      i->osd,
      TOPNSPC::make_message<MRecoveryReserve>(
        MRecoveryReserve::RELEASE,
        spg_t(ps->info.pgid.pgid, i->shard),
        ps->get_osdmap_epoch()),
      ps->get_osdmap_epoch());
  }
}

boost::statechart::result
PeeringState::Recovering::react(const AllReplicasRecovered &evt)
{
  DECLARE_LOCALS;
  ps->state_clear(PG_STATE_FORCED_RECOVERY);
  release_reservations();
  pl->cancel_local_background_io_reservation();
  return transit<Recovered>();
}

boost::statechart::result
PeeringState::Recovering::react(const RequestBackfill &evt)
{
  DECLARE_LOCALS;

  release_reservations();

  ps->state_clear(PG_STATE_FORCED_RECOVERY);
  pl->cancel_local_background_io_reservation();
  pl->publish_stats_to_osd();
  // transit any async_recovery_targets back into acting
  // so pg won't have to stay undersized for long
  // as backfill might take a long time to complete..
  if (!ps->async_recovery_targets.empty()) {
    pg_shard_t auth_log_shard;
    bool history_les_bound = false;
    // FIXME: Uh-oh we have to check this return value; choose_acting can fail!
    ps->choose_acting(auth_log_shard, true, &history_les_bound);
  }
  return transit<WaitLocalBackfillReserved>();
}

boost::statechart::result
PeeringState::Recovering::react(const DeferRecovery &evt)
{
  DECLARE_LOCALS;
  if (!ps->state_test(PG_STATE_RECOVERING)) {
    // we may have finished recovery and have an AllReplicasRecovered
    // event queued to move us to the next state.
    psdout(10) << "got defer recovery but not recovering" << dendl;
    return discard_event();
  }
  psdout(10) << "defer recovery, retry delay " << evt.delay << dendl;
  ps->state_set(PG_STATE_RECOVERY_WAIT);
  pl->cancel_local_background_io_reservation();
  release_reservations(true);
  pl->schedule_event_after(
    std::make_shared<PGPeeringEvent>(
      ps->get_osdmap_epoch(),
      ps->get_osdmap_epoch(),
      DoRecovery()),
    evt.delay);
  return transit<NotRecovering>();
}

boost::statechart::result
PeeringState::Recovering::react(const UnfoundRecovery &evt)
{
  DECLARE_LOCALS;
  psdout(10) << "recovery has unfound, can't continue" << dendl;
  ps->state_set(PG_STATE_RECOVERY_UNFOUND);
  pl->cancel_local_background_io_reservation();
  release_reservations(true);
  return transit<NotRecovering>();
}

void PeeringState::Recovering::exit()
{
  context< PeeringMachine >().log_exit(state_name, enter_time);

  DECLARE_LOCALS;
  utime_t dur = ceph_clock_now() - enter_time;
  ps->state_clear(PG_STATE_RECOVERING);
  pl->get_peering_perf().tinc(rs_recovering_latency, dur);
}

PeeringState::Recovered::Recovered(my_context ctx)
  : my_base(ctx),
    NamedState(context< PeeringMachine >().state_history,
               "Started/Primary/Active/Recovered")
{
  pg_shard_t auth_log_shard;

  context< PeeringMachine >().log_enter(state_name);

  DECLARE_LOCALS;
  ceph_assert(!ps->needs_recovery());

  // if we finished backfill, all acting are active; recheck if
  // DEGRADED | UNDERSIZED is appropriate.
  ceph_assert(!ps->acting_recovery_backfill.empty());
  if (ps->get_osdmap()->get_pg_size(context< PeeringMachine >().spgid.pgid) <=
      ps->acting_recovery_backfill.size()) {
    ps->state_clear(PG_STATE_FORCED_BACKFILL | PG_STATE_FORCED_RECOVERY);
    pl->publish_stats_to_osd();
  }

  // adjust acting set?  (e.g. because backfill completed...)
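  // if acting != up, see whether we can return to the up set now;
  // choose_acting requests a pg_temp change when the sets should differ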
  bool history_les_bound = false;
  if (ps->acting != ps->up &&
      !ps->choose_acting(auth_log_shard, true, &history_les_bound)) {
    ceph_assert(ps->want_acting.size());
  } else if (!ps->async_recovery_targets.empty()) {
    // FIXME: Uh-oh we have to check this return value; choose_acting can fail!
    ps->choose_acting(auth_log_shard, true, &history_les_bound);
  }

  if (context< Active >().all_replicas_activated &&
      ps->async_recovery_targets.empty())
    post_event(GoClean());
}

void PeeringState::Recovered::exit()
{
  context< PeeringMachine >().log_exit(state_name, enter_time);
  DECLARE_LOCALS;

  utime_t dur = ceph_clock_now() - enter_time;
  pl->get_peering_perf().tinc(rs_recovered_latency, dur);
}

PeeringState::Clean::Clean(my_context ctx)
  : my_base(ctx),
    NamedState(context< PeeringMachine >().state_history,
               "Started/Primary/Active/Clean")
{
  context< PeeringMachine >().log_enter(state_name);

  DECLARE_LOCALS;

  if (ps->info.last_complete != ps->info.last_update) {
    ceph_abort();
  }

  ps->try_mark_clean();

  context< PeeringMachine >().get_cur_transaction().register_on_commit(
    pl->on_clean());
}

void PeeringState::Clean::exit()
{
  context< PeeringMachine >().log_exit(state_name, enter_time);

  DECLARE_LOCALS;
  ps->state_clear(PG_STATE_CLEAN);
  utime_t dur = ceph_clock_now() - enter_time;
  pl->get_peering_perf().tinc(rs_clean_latency, dur);
}

template <typename T>
set<pg_shard_t> unique_osd_shard_set(const pg_shard_t & skip, const T &in)
{
  set<int> osds_found;
  set<pg_shard_t> out;
  for (auto i = in.begin(); i != in.end(); ++i) {
    if (*i != skip && !osds_found.count(i->osd)) {
      osds_found.insert(i->osd);
      out.insert(*i);
    }
  }
  return out;
}

/*---------Active---------*/
PeeringState::Active::Active(my_context ctx)
  : my_base(ctx),
    NamedState(context< PeeringMachine >().state_history,
               "Started/Primary/Active"),
    remote_shards_to_reserve_recovery(
      unique_osd_shard_set(
        context< PeeringMachine >().state->pg_whoami,
        context< PeeringMachine >().state->acting_recovery_backfill)),
    remote_shards_to_reserve_backfill(
      unique_osd_shard_set(
        context< PeeringMachine >().state->pg_whoami,
        context< PeeringMachine >().state->backfill_targets)),
    all_replicas_activated(false)
{
  context< PeeringMachine >().log_enter(state_name);

  DECLARE_LOCALS;

  ceph_assert(!ps->backfill_reserved);
  ceph_assert(ps->is_primary());
  psdout(10) << "In Active, about to call activate" << dendl;
  ps->start_flush(context< PeeringMachine >().get_cur_transaction());
  ps->activate(context< PeeringMachine >().get_cur_transaction(),
               ps->get_osdmap_epoch(),
               context< PeeringMachine >().get_recovery_ctx());

  // everyone has to commit/ack before we are truly active
  ps->blocked_by.clear();
  for (auto p = ps->acting_recovery_backfill.begin();
       p != ps->acting_recovery_backfill.end();
       ++p) {
    if (p->shard != ps->pg_whoami.shard) {
      ps->blocked_by.insert(p->shard);
    }
  }
  pl->publish_stats_to_osd();
  psdout(10) << "Activate Finished" << dendl;
}

boost::statechart::result PeeringState::Active::react(const AdvMap& advmap)
{
  DECLARE_LOCALS;

  if (ps->should_restart_peering(
        advmap.up_primary,
        advmap.acting_primary,
        advmap.newup,
        advmap.newacting,
        advmap.lastmap,
        advmap.osdmap)) {
    psdout(10) << "Active advmap interval change, fast return" << dendl;
    return forward_event();
  }
  psdout(10) << "Active advmap" << dendl;
  bool need_publish = false;

  pl->on_active_advmap(advmap.osdmap);
  if (ps->dirty_big_info) {
    // share updated purged_snaps to mgr/mon so that we (a) stop reporting
    // purged snaps and (b) perhaps share more snaps that we have purged
    // but didn't fit in pg_stat_t.
    need_publish = true;
    ps->share_pg_info();
  }

  bool need_acting_change = false;
  for (size_t i = 0; i < ps->want_acting.size(); i++) {
    int osd = ps->want_acting[i];
    if (!advmap.osdmap->is_up(osd)) {
      pg_shard_t osd_with_shard(osd, shard_id_t(i));
      if (!ps->is_acting(osd_with_shard) && !ps->is_up(osd_with_shard)) {
        psdout(10) << "Active stray osd." << osd << " in want_acting is down"
                   << dendl;
        need_acting_change = true;
      }
    }
  }
  if (need_acting_change) {
    psdout(10) << "Active need acting change, call choose_acting again"
               << dendl;
    // possibly because we re-add some strays into the acting set and
    // some of them then go down in a subsequent map before we could see
    // the map changing the pg temp.
    // call choose_acting again to clear them out.
    // note that we leave restrict_to_up_acting to false in order to
    // not overkill any chosen stray that is still alive.
    pg_shard_t auth_log_shard;
    bool history_les_bound = false;
    ps->remove_down_peer_info(advmap.osdmap);
    ps->choose_acting(auth_log_shard, false, &history_les_bound, true);
  }

  /* Check for changes in pool size (if the acting set changed as a result,
   * this does not matter) */
  if (advmap.lastmap->get_pg_size(ps->info.pgid.pgid) !=
      ps->get_osdmap()->get_pg_size(ps->info.pgid.pgid)) {
    if (ps->get_osdmap()->get_pg_size(ps->info.pgid.pgid) <=
        ps->actingset.size()) {
      ps->state_clear(PG_STATE_UNDERSIZED);
    } else {
      ps->state_set(PG_STATE_UNDERSIZED);
    }
    // degraded changes will be detected by call from publish_stats_to_osd()
    need_publish = true;
  }

  // if we haven't reported our PG stats in a long time, do so now.
  if (ps->info.stats.reported_epoch + ps->cct->_conf->osd_pg_stat_report_interval_max
      < advmap.osdmap->get_epoch()) {
    psdout(20) << "reporting stats to osd after "
               << (advmap.osdmap->get_epoch() - ps->info.stats.reported_epoch)
               << " epochs" << dendl;
    need_publish = true;
  }

  if (need_publish)
    pl->publish_stats_to_osd();

  if (ps->check_prior_readable_down_osds(advmap.osdmap)) {
    pl->recheck_readable();
  }

  return forward_event();
}

boost::statechart::result PeeringState::Active::react(const ActMap&)
{
  DECLARE_LOCALS;
  psdout(10) << "Active: handling ActMap" << dendl;
  ceph_assert(ps->is_primary());

  pl->on_active_actmap();

  if (ps->have_unfound()) {
    // object may have become unfound
    ps->discover_all_missing(
      context< PeeringMachine >().get_recovery_ctx().msgs);
  }

  uint64_t unfound = ps->missing_loc.num_unfound();
  if (unfound > 0 &&
      ps->all_unfound_are_queried_or_lost(ps->get_osdmap())) {
    if (ps->cct->_conf->osd_auto_mark_unfound_lost) {
      pl->get_clog_error() << context< PeeringMachine >().spgid.pgid
                           << " has " << unfound
                           << " objects unfound and apparently lost, would automatically "
                           << "mark these objects lost but this feature is not yet implemented "
                           << "(osd_auto_mark_unfound_lost)";
    } else
      pl->get_clog_error() << context< PeeringMachine >().spgid.pgid
                           << " has " << unfound
                           << " objects unfound and apparently lost";
  }

  return forward_event();
}

boost::statechart::result PeeringState::Active::react(const MNotifyRec& notevt)
{
  DECLARE_LOCALS;
  ceph_assert(ps->is_primary());
  if (ps->peer_info.count(notevt.from)) {
    psdout(10) << "Active: got notify from " << notevt.from
               << ", already have info from that osd, ignoring"
               << dendl;
  } else if (ps->peer_purged.count(notevt.from)) {
    psdout(10) << "Active: got notify from " << notevt.from
               << ", already purged that peer, ignoring"
               << dendl;
  } else {
    psdout(10) << "Active: got notify from " << notevt.from
               << ", calling proc_replica_info and discover_all_missing"
               << dendl;
    ps->proc_replica_info(
      notevt.from, notevt.notify.info,
      notevt.notify.epoch_sent);
    if (ps->have_unfound() || (ps->is_degraded() &&
                               ps->might_have_unfound.count(notevt.from))) {
      ps->discover_all_missing(
        context< PeeringMachine >().get_recovery_ctx().msgs);
    }
    // check if it is a previous down acting member that's coming back.
    // if so, request pg_temp change to trigger a new interval transition
    pg_shard_t auth_log_shard;
    bool history_les_bound = false;
    // FIXME: Uh-oh we have to check this return value; choose_acting can fail!
    ps->choose_acting(auth_log_shard, false, &history_les_bound, true);
    if (!ps->want_acting.empty() && ps->want_acting != ps->acting) {
      psdout(10) << "Active: got notify from previous acting member "
                 << notevt.from << ", requesting pg_temp change"
                 << dendl;
    }
  }
  return discard_event();
}

boost::statechart::result PeeringState::Active::react(const MTrim& trim)
{
  DECLARE_LOCALS;
  ceph_assert(ps->is_primary());

  // peer is informing us of their last_complete_ondisk
  ldout(ps->cct,10) << " replica osd." << trim.from << " lcod " << trim.trim_to << dendl;
  ps->update_peer_last_complete_ondisk(pg_shard_t{trim.from, trim.shard},
                                       trim.trim_to);

  // trim log when the pg is recovered
  ps->calc_min_last_complete_ondisk();
  return discard_event();
}

boost::statechart::result PeeringState::Active::react(const MInfoRec& infoevt)
{
  DECLARE_LOCALS;
  ceph_assert(ps->is_primary());
  ceph_assert(!ps->acting_recovery_backfill.empty());

  if (infoevt.lease_ack) {
    ps->proc_lease_ack(infoevt.from.osd, *infoevt.lease_ack);
  }
  // don't update history (yet) if we are active and primary; the replica
  // may be telling us they have activated (and committed) but we can't
  // share that until _everyone_ does the same.
  if (ps->is_acting_recovery_backfill(infoevt.from) &&
      ps->peer_activated.count(infoevt.from) == 0) {
    psdout(10) << " peer osd." << infoevt.from
               << " activated and committed" << dendl;
    ps->peer_activated.insert(infoevt.from);
    ps->blocked_by.erase(infoevt.from.shard);
    pl->publish_stats_to_osd();
    if (ps->peer_activated.size() == ps->acting_recovery_backfill.size()) {
      all_activated_and_committed();
    }
  }
  return discard_event();
}

boost::statechart::result PeeringState::Active::react(const MLogRec& logevt)
{
  DECLARE_LOCALS;
  psdout(10) << "searching osd."
             << logevt.from << " log for unfound items" << dendl;
  ps->proc_replica_log(
    logevt.msg->info, logevt.msg->log, std::move(logevt.msg->missing), logevt.from);
  bool got_missing = ps->search_for_missing(
    ps->peer_info[logevt.from],
    ps->peer_missing[logevt.from],
    logevt.from,
    context< PeeringMachine >().get_recovery_ctx());
  // If there are missing AND we are "fully" active then start recovery now
  if (got_missing && ps->state_test(PG_STATE_ACTIVE)) {
    post_event(DoRecovery());
  }
  return discard_event();
}

boost::statechart::result PeeringState::Active::react(const QueryState& q)
{
  DECLARE_LOCALS;

  q.f->open_object_section("state");
  q.f->dump_string("name", state_name);
  q.f->dump_stream("enter_time") << enter_time;

  {
    q.f->open_array_section("might_have_unfound");
    for (auto p = ps->might_have_unfound.begin();
         p != ps->might_have_unfound.end();
         ++p) {
      q.f->open_object_section("osd");
      q.f->dump_stream("osd") << *p;
      if (ps->peer_missing.count(*p)) {
        q.f->dump_string("status", "already probed");
      } else if (ps->peer_missing_requested.count(*p)) {
        q.f->dump_string("status", "querying");
      } else if (!ps->get_osdmap()->is_up(p->osd)) {
        q.f->dump_string("status", "osd is down");
      } else {
        q.f->dump_string("status", "not queried");
      }
      q.f->close_section();
    }
    q.f->close_section();
  }
  {
    q.f->open_object_section("recovery_progress");
    q.f->open_array_section("backfill_targets");
    for (auto p = ps->backfill_targets.begin();
         p != ps->backfill_targets.end(); ++p)
      q.f->dump_stream("replica") << *p;
    q.f->close_section();
    pl->dump_recovery_info(q.f);
    q.f->close_section();
  }

  q.f->close_section();
  return forward_event();
}

boost::statechart::result PeeringState::Active::react(const QueryUnfound& q)
{
  DECLARE_LOCALS;
  ps->query_unfound(q.f, "Active");
  return discard_event();
}

boost::statechart::result PeeringState::Active::react(
  const ActivateCommitted &evt)
{
  DECLARE_LOCALS;
  ceph_assert(!ps->peer_activated.count(ps->pg_whoami));
  ps->peer_activated.insert(ps->pg_whoami);
  psdout(10) << "_activate_committed " << evt.epoch
             << " peer_activated now " << ps->peer_activated
             << " last_interval_started "
             << ps->info.history.last_interval_started
             << " last_epoch_started "
             << ps->info.history.last_epoch_started
             << " same_interval_since "
             << ps->info.history.same_interval_since
             << dendl;
  ceph_assert(!ps->acting_recovery_backfill.empty());
  if (ps->peer_activated.size() == ps->acting_recovery_backfill.size())
    all_activated_and_committed();
  return discard_event();
}

boost::statechart::result PeeringState::Active::react(const AllReplicasActivated &evt)
{
  DECLARE_LOCALS;
  pg_t pgid = context< PeeringMachine >().spgid.pgid;

  all_replicas_activated = true;

  ps->state_clear(PG_STATE_ACTIVATING);
  ps->state_clear(PG_STATE_CREATING);
  ps->state_clear(PG_STATE_PREMERGE);

  bool merge_target;
  if (ps->pool.info.is_pending_merge(pgid, &merge_target)) {
    ps->state_set(PG_STATE_PEERED);
    ps->state_set(PG_STATE_PREMERGE);

    if (ps->actingset.size() != ps->get_osdmap()->get_pg_size(pgid)) {
      if (merge_target) {
        pg_t src = pgid;
        src.set_ps(ps->pool.info.get_pg_num_pending());
        assert(src.get_parent() == pgid);
        pl->set_not_ready_to_merge_target(pgid, src);
      } else {
        pl->set_not_ready_to_merge_source(pgid);
      }
    }
  } else if (!ps->acting_set_writeable()) {
    ps->state_set(PG_STATE_PEERED);
  } else {
    ps->state_set(PG_STATE_ACTIVE);
  }

  auto mnow = pl->get_mnow();
  if (ps->prior_readable_until_ub > mnow) {
    psdout(10) << " waiting for prior_readable_until_ub "
               << ps->prior_readable_until_ub << " > mnow " << mnow << dendl;
    ps->state_set(PG_STATE_WAIT);
    pl->queue_check_readable(
      ps->last_peering_reset,
      ps->prior_readable_until_ub - mnow);
  } else {
    psdout(10) << " mnow " << mnow << " >= prior_readable_until_ub "
               << ps->prior_readable_until_ub << dendl;
  }

  if (ps->pool.info.has_flag(pg_pool_t::FLAG_CREATING)) {
    pl->send_pg_created(pgid);
  }

  psdout(1) << __func__ << " AllReplicasActivated Activating complete" << dendl;

  ps->info.history.last_epoch_started = ps->info.last_epoch_started;
  ps->info.history.last_interval_started = ps->info.last_interval_started;
  ps->dirty_info = true;

  ps->share_pg_info();
  pl->publish_stats_to_osd();
  pl->on_activate_complete();

  return discard_event();
}

boost::statechart::result PeeringState::Active::react(const RenewLease& rl)
{
  DECLARE_LOCALS;
  ps->proc_renew_lease();
  return discard_event();
}

boost::statechart::result PeeringState::Active::react(const MLeaseAck& la)
{
  DECLARE_LOCALS;
  ps->proc_lease_ack(la.from, la.lease_ack);
  return discard_event();
}

boost::statechart::result PeeringState::Active::react(const CheckReadable &evt)
{
  DECLARE_LOCALS;
  pl->recheck_readable();
  return discard_event();
}
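// Read-lease plumbing on the primary: RenewLease (scheduled via
// schedule_renew_lease(), see below) renews our lease and re-sends
// MOSDPGLease to the replicas; each replica answers with MOSDPGLeaseAck,
// folded in above via proc_lease_ack().  CheckReadable re-evaluates whether
// the PG is readable, e.g. once prior_readable_until_ub (the upper bound on
// how long a prior interval could still be serving reads) has expired.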
/*
 * update info.history.last_epoch_started ONLY after we and all
 * replicas have activated AND committed the activate transaction
 * (i.e. the peering results are stable on disk).
 */
void PeeringState::Active::all_activated_and_committed()
{
  DECLARE_LOCALS;
  psdout(10) << "all_activated_and_committed" << dendl;
  ceph_assert(ps->is_primary());
  ceph_assert(ps->peer_activated.size() == ps->acting_recovery_backfill.size());
  ceph_assert(!ps->acting_recovery_backfill.empty());
  ceph_assert(ps->blocked_by.empty());

  assert(HAVE_FEATURE(ps->upacting_features, SERVER_OCTOPUS));
  // this is overkill when the activation is quick, but when it is slow it
  // is important, because the lease was renewed by the activate itself but we
  // don't know how long ago that was, and simply scheduling now may leave
  // a gap in lease coverage.  keep it simple and aggressively renew.
  ps->renew_lease(pl->get_mnow());
  ps->send_lease();
  ps->schedule_renew_lease();

  // Degraded?
  ps->update_calc_stats();
  if (ps->info.stats.stats.sum.num_objects_degraded) {
    ps->state_set(PG_STATE_DEGRADED);
  } else {
    ps->state_clear(PG_STATE_DEGRADED);
  }

  post_event(PeeringState::AllReplicasActivated());
}

void PeeringState::Active::exit()
{
  context< PeeringMachine >().log_exit(state_name, enter_time);
  DECLARE_LOCALS;
  pl->cancel_local_background_io_reservation();

  ps->blocked_by.clear();
  ps->backfill_reserved = false;
  ps->state_clear(PG_STATE_ACTIVATING);
  ps->state_clear(PG_STATE_DEGRADED);
  ps->state_clear(PG_STATE_UNDERSIZED);
  ps->state_clear(PG_STATE_BACKFILL_TOOFULL);
  ps->state_clear(PG_STATE_BACKFILL_WAIT);
  ps->state_clear(PG_STATE_RECOVERY_WAIT);
  ps->state_clear(PG_STATE_RECOVERY_TOOFULL);
  utime_t dur = ceph_clock_now() - enter_time;
  pl->get_peering_perf().tinc(rs_active_latency, dur);
  pl->on_active_exit();
}

/*------ReplicaActive-----*/
PeeringState::ReplicaActive::ReplicaActive(my_context ctx)
  : my_base(ctx),
    NamedState(context< PeeringMachine >().state_history, "Started/ReplicaActive")
{
  context< PeeringMachine >().log_enter(state_name);

  DECLARE_LOCALS;
  ps->start_flush(context< PeeringMachine >().get_cur_transaction());
}

boost::statechart::result PeeringState::ReplicaActive::react(
  const Activate& actevt)
{
  DECLARE_LOCALS;
  psdout(10) << "In ReplicaActive, about to call activate" << dendl;
  ps->activate(
    context< PeeringMachine >().get_cur_transaction(),
    actevt.activation_epoch,
    context< PeeringMachine >().get_recovery_ctx());
  psdout(10) << "Activate Finished" << dendl;
  return discard_event();
}

boost::statechart::result PeeringState::ReplicaActive::react(
  const ActivateCommitted &evt)
{
  DECLARE_LOCALS;
  psdout(10) << __func__ << " " << evt.epoch << " telling primary" << dendl;

  auto &rctx = context< PeeringMachine >().get_recovery_ctx();
  auto epoch = ps->get_osdmap_epoch();
  pg_info_t i = ps->info;
  i.history.last_epoch_started = evt.activation_epoch;
  i.history.last_interval_started = i.history.same_interval_since;
  rctx.send_info(
    ps->get_primary().osd,
    spg_t(ps->info.pgid.pgid, ps->get_primary().shard),
    epoch,
    epoch,
    i,
    {}, /* lease */
    ps->get_lease_ack());

  if (ps->acting_set_writeable()) {
    ps->state_set(PG_STATE_ACTIVE);
  } else {
    ps->state_set(PG_STATE_PEERED);
  }
  pl->on_activate_committed();

  return discard_event();
}

boost::statechart::result PeeringState::ReplicaActive::react(const MLease& l)
{
  DECLARE_LOCALS;
  spg_t spgid = context< PeeringMachine >().spgid;
  epoch_t epoch = pl->get_osdmap_epoch();

  ps->proc_lease(l.lease);
  pl->send_cluster_message(
    ps->get_primary().osd,
    TOPNSPC::make_message<MOSDPGLeaseAck>(
      epoch,
      spg_t(spgid.pgid, ps->get_primary().shard),
      ps->get_lease_ack()),
    epoch);
  return discard_event();
}

boost::statechart::result PeeringState::ReplicaActive::react(const MInfoRec& infoevt)
{
  DECLARE_LOCALS;
  ps->proc_primary_info(context< PeeringMachine >().get_cur_transaction(),
                        infoevt.info);
  return discard_event();
}

boost::statechart::result PeeringState::ReplicaActive::react(const MLogRec& logevt)
{
  DECLARE_LOCALS;
  psdout(10) << "received log from " << logevt.from << dendl;
  ObjectStore::Transaction &t = context< PeeringMachine >().get_cur_transaction();
  ps->merge_log(t, logevt.msg->info, std::move(logevt.msg->log), logevt.from);
  ceph_assert(ps->pg_log.get_head() == ps->info.last_update);
  if (logevt.msg->lease) {
    ps->proc_lease(*logevt.msg->lease);
  }

  return discard_event();
}

boost::statechart::result PeeringState::ReplicaActive::react(const MTrim& trim)
{
  DECLARE_LOCALS;
  // primary is instructing us to trim
  ps->pg_log.trim(trim.trim_to, ps->info);
  ps->dirty_info = true;
  return discard_event();
}
boost::statechart::result
PeeringState::ReplicaActive::react(const ActMap&)
{
  DECLARE_LOCALS;
  if (ps->should_send_notify() && ps->get_primary().osd >= 0) {
    ps->info.history.refresh_prior_readable_until_ub(
      pl->get_mnow(), ps->prior_readable_until_ub);
    context< PeeringMachine >().send_notify(
      ps->get_primary().osd,
      pg_notify_t(
        ps->get_primary().shard, ps->pg_whoami.shard,
        ps->get_osdmap_epoch(),
        ps->get_osdmap_epoch(),
        ps->info,
        ps->past_intervals));
  }
  return discard_event();
}

boost::statechart::result PeeringState::ReplicaActive::react(
  const MQuery& query)
{
  DECLARE_LOCALS;
  ps->fulfill_query(query, context< PeeringMachine >().get_recovery_ctx());
  return discard_event();
}

boost::statechart::result PeeringState::ReplicaActive::react(const QueryState& q)
{
  q.f->open_object_section("state");
  q.f->dump_string("name", state_name);
  q.f->dump_stream("enter_time") << enter_time;
  q.f->close_section();
  return forward_event();
}

boost::statechart::result PeeringState::ReplicaActive::react(const QueryUnfound& q)
{
  q.f->dump_string("state", "ReplicaActive");
  q.f->dump_bool("available_might_have_unfound", false);
  return discard_event();
}

void PeeringState::ReplicaActive::exit()
{
  context< PeeringMachine >().log_exit(state_name, enter_time);
  DECLARE_LOCALS;
  pl->unreserve_recovery_space();

  pl->cancel_remote_recovery_reservation();
  utime_t dur = ceph_clock_now() - enter_time;
  pl->get_peering_perf().tinc(rs_replicaactive_latency, dur);

  ps->min_last_complete_ondisk = eversion_t();
}

/*-------Stray---*/
PeeringState::Stray::Stray(my_context ctx)
  : my_base(ctx),
    NamedState(context< PeeringMachine >().state_history, "Started/Stray")
{
  context< PeeringMachine >().log_enter(state_name);

  DECLARE_LOCALS;
  ceph_assert(!ps->is_peered());
  ceph_assert(!ps->is_peering());
  ceph_assert(!ps->is_primary());

  if (!ps->get_osdmap()->have_pg_pool(ps->info.pgid.pgid.pool())) {
    ldout(ps->cct,10) << __func__ << " pool is deleted" << dendl;
    post_event(DeleteStart());
  } else {
    ps->start_flush(context< PeeringMachine >().get_cur_transaction());
  }
}

boost::statechart::result PeeringState::Stray::react(const MLogRec& logevt)
{
  DECLARE_LOCALS;
  MOSDPGLog *msg = logevt.msg.get();
  psdout(10) << "got info+log from osd." << logevt.from
             << " " << msg->info << " " << msg->log << dendl;

  ObjectStore::Transaction &t = context< PeeringMachine >().get_cur_transaction();
  if (msg->info.last_backfill == hobject_t()) {
    // restart backfill
    ps->info = msg->info;
    pl->on_info_history_change();
    ps->dirty_info = true;
    ps->dirty_big_info = true;  // maybe.

    PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(t)};
    ps->pg_log.reset_backfill_claim_log(msg->log, rollbacker.get());

    ps->pg_log.reset_backfill();
  } else {
    ps->merge_log(t, msg->info, std::move(msg->log), logevt.from);
  }
  if (logevt.msg->lease) {
    ps->proc_lease(*logevt.msg->lease);
  }

  ceph_assert(ps->pg_log.get_head() == ps->info.last_update);

  post_event(Activate(logevt.msg->info.last_epoch_started));
  return transit< ReplicaActive >();
}
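// A stray can be activated along two paths: the primary sends a full log
// (MLogRec, handled above) when the stray's log must be merged or backfill
// restarted, or just an info (MInfoRec, handled below) when the stray's log
// is already contiguous and at most needs divergent entries rewound.  Both
// paths post Activate and transit to ReplicaActive.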
boost::statechart::result PeeringState::Stray::react(const MInfoRec& infoevt)
{
  DECLARE_LOCALS;
  psdout(10) << "got info from osd." << infoevt.from
             << " " << infoevt.info << dendl;

  if (ps->info.last_update > infoevt.info.last_update) {
    // rewind divergent log entries
    ObjectStore::Transaction &t = context< PeeringMachine >().get_cur_transaction();
    ps->rewind_divergent_log(t, infoevt.info.last_update);
    ps->info.stats = infoevt.info.stats;
    ps->info.hit_set = infoevt.info.hit_set;
  }

  if (infoevt.lease) {
    ps->proc_lease(*infoevt.lease);
  }

  ceph_assert(infoevt.info.last_update == ps->info.last_update);
  ceph_assert(ps->pg_log.get_head() == ps->info.last_update);

  post_event(Activate(infoevt.info.last_epoch_started));
  return transit< ReplicaActive >();
}

boost::statechart::result PeeringState::Stray::react(const MQuery& query)
{
  DECLARE_LOCALS;
  ps->fulfill_query(query, context< PeeringMachine >().get_recovery_ctx());
  return discard_event();
}

boost::statechart::result PeeringState::Stray::react(const ActMap&)
{
  DECLARE_LOCALS;
  if (ps->should_send_notify() && ps->get_primary().osd >= 0) {
    ps->info.history.refresh_prior_readable_until_ub(
      pl->get_mnow(), ps->prior_readable_until_ub);
    context< PeeringMachine >().send_notify(
      ps->get_primary().osd,
      pg_notify_t(
        ps->get_primary().shard, ps->pg_whoami.shard,
        ps->get_osdmap_epoch(),
        ps->get_osdmap_epoch(),
        ps->info,
        ps->past_intervals));
  }
  return discard_event();
}

void PeeringState::Stray::exit()
{
  context< PeeringMachine >().log_exit(state_name, enter_time);
  DECLARE_LOCALS;
  utime_t dur = ceph_clock_now() - enter_time;
  pl->get_peering_perf().tinc(rs_stray_latency, dur);
}
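// PG deletion runs through three nested states: ToDelete tracks the
// l_osd_pg_removing counter, WaitDeleteReserved obtains a local background
// io reservation at the current delete priority, and Deleting repeatedly
// invokes do_delete_work() (re-driven by DeleteSome events) until the PG's
// data is gone.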
/*--------ToDelete----------*/
PeeringState::ToDelete::ToDelete(my_context ctx)
  : my_base(ctx),
    NamedState(context< PeeringMachine >().state_history, "Started/ToDelete")
{
  context< PeeringMachine >().log_enter(state_name);
  DECLARE_LOCALS;
  pl->get_perf_logger().inc(l_osd_pg_removing);
}

void PeeringState::ToDelete::exit()
{
  context< PeeringMachine >().log_exit(state_name, enter_time);
  DECLARE_LOCALS;
  // note: on a successful removal, this path doesn't execute. see
  // do_delete_work().
  pl->get_perf_logger().dec(l_osd_pg_removing);

  pl->cancel_local_background_io_reservation();
}

/*----WaitDeleteReserved----*/
PeeringState::WaitDeleteReserved::WaitDeleteReserved(my_context ctx)
  : my_base(ctx),
    NamedState(context< PeeringMachine >().state_history,
               "Started/ToDelete/WaitDeleteReserved")
{
  context< PeeringMachine >().log_enter(state_name);
  DECLARE_LOCALS;
  context< ToDelete >().priority = ps->get_delete_priority();

  pl->cancel_local_background_io_reservation();
  pl->request_local_background_io_reservation(
    context< ToDelete >().priority,
    std::make_unique<PGPeeringEvent>(
      ps->get_osdmap_epoch(),
      ps->get_osdmap_epoch(),
      DeleteReserved()),
    std::make_unique<PGPeeringEvent>(
      ps->get_osdmap_epoch(),
      ps->get_osdmap_epoch(),
      DeleteInterrupted()));
}

boost::statechart::result PeeringState::ToDelete::react(
  const ActMap& evt)
{
  DECLARE_LOCALS;
  if (ps->get_delete_priority() != priority) {
    psdout(10) << __func__ << " delete priority changed, resetting"
               << dendl;
    return transit< ToDelete >();
  }
  return discard_event();
}

void PeeringState::WaitDeleteReserved::exit()
{
  context< PeeringMachine >().log_exit(state_name, enter_time);
}

/*----Deleting-----*/
PeeringState::Deleting::Deleting(my_context ctx)
  : my_base(ctx),
    NamedState(context< PeeringMachine >().state_history, "Started/ToDelete/Deleting")
{
  context< PeeringMachine >().log_enter(state_name);
  DECLARE_LOCALS;
  ps->deleting = true;
  ObjectStore::Transaction &t = context< PeeringMachine >().get_cur_transaction();

  // clear log
  PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(t)};
  ps->pg_log.roll_forward(rollbacker.get());

  // adjust info to backfill
  ps->info.set_last_backfill(hobject_t());
  ps->pg_log.reset_backfill();
  ps->dirty_info = true;

  pl->on_removal(t);
}

boost::statechart::result PeeringState::Deleting::react(
  const DeleteSome& evt)
{
  DECLARE_LOCALS;
  std::pair<ghobject_t, bool> p;
  p = pl->do_delete_work(context< PeeringMachine >().get_cur_transaction(),
                         next);
  next = p.first;
  return p.second ? discard_event() : terminate();
}

void PeeringState::Deleting::exit()
{
  context< PeeringMachine >().log_exit(state_name, enter_time);
  DECLARE_LOCALS;
  ps->deleting = false;
  pl->cancel_local_background_io_reservation();
}
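// The Peering sub-states below form a pipeline: GetInfo collects pg_info_t
// from every OSD in the prior set, GetLog pulls the authoritative log from
// the best peer, GetMissing fills in the remaining peers' missing sets, and
// WaitUpThru (if needed) holds activation until the osdmap records our
// up_thru.  Only then is Activate posted.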
/*--------GetInfo---------*/
PeeringState::GetInfo::GetInfo(my_context ctx)
  : my_base(ctx),
    NamedState(context< PeeringMachine >().state_history,
               "Started/Primary/Peering/GetInfo")
{
  context< PeeringMachine >().log_enter(state_name);

  DECLARE_LOCALS;
  ps->check_past_interval_bounds();
  ps->log_weirdness();
  PastIntervals::PriorSet &prior_set = context< Peering >().prior_set;

  ceph_assert(ps->blocked_by.empty());

  prior_set = ps->build_prior();
  ps->prior_readable_down_osds = prior_set.down;
  if (ps->prior_readable_down_osds.empty()) {
    psdout(10) << " no prior_set down osds, will clear prior_readable_until_ub before activating"
               << dendl;
  }

  ps->reset_min_peer_features();
  get_infos();
  if (prior_set.pg_down) {
    post_event(IsDown());
  } else if (peer_info_requested.empty()) {
    post_event(GotInfo());
  }
}

void PeeringState::GetInfo::get_infos()
{
  DECLARE_LOCALS;
  PastIntervals::PriorSet &prior_set = context< Peering >().prior_set;

  ps->blocked_by.clear();
  for (auto it = prior_set.probe.begin(); it != prior_set.probe.end(); ++it) {
    pg_shard_t peer = *it;
    if (peer == ps->pg_whoami) {
      continue;
    }
    if (ps->peer_info.count(peer)) {
      psdout(10) << " have osd." << peer << " info "
                 << ps->peer_info[peer] << dendl;
      continue;
    }
    if (peer_info_requested.count(peer)) {
      psdout(10) << " already requested info from osd." << peer << dendl;
      ps->blocked_by.insert(peer.osd);
    } else if (!ps->get_osdmap()->is_up(peer.osd)) {
      psdout(10) << " not querying info from down osd." << peer << dendl;
    } else {
      psdout(10) << " querying info from osd." << peer << dendl;
      context< PeeringMachine >().send_query(
        peer.osd,
        pg_query_t(pg_query_t::INFO,
                   it->shard, ps->pg_whoami.shard,
                   ps->info.history,
                   ps->get_osdmap_epoch()));
      peer_info_requested.insert(peer);
      ps->blocked_by.insert(peer.osd);
    }
  }

  ps->check_prior_readable_down_osds(ps->get_osdmap());

  pl->publish_stats_to_osd();
}

boost::statechart::result PeeringState::GetInfo::react(const MNotifyRec& infoevt)
{
  DECLARE_LOCALS;

  auto p = peer_info_requested.find(infoevt.from);
  if (p != peer_info_requested.end()) {
    peer_info_requested.erase(p);
    ps->blocked_by.erase(infoevt.from.osd);
  }

  epoch_t old_start = ps->info.history.last_epoch_started;
  if (ps->proc_replica_info(
        infoevt.from, infoevt.notify.info, infoevt.notify.epoch_sent)) {
    // we got something new ...
    PastIntervals::PriorSet &prior_set = context< Peering >().prior_set;
    if (old_start < ps->info.history.last_epoch_started) {
      psdout(10) << " last_epoch_started moved forward, rebuilding prior" << dendl;
      prior_set = ps->build_prior();
      ps->prior_readable_down_osds = prior_set.down;

      // filter out any osds that got dropped from the probe set from
      // peer_info_requested.  this is less expensive than restarting
      // peering (which would re-probe everyone).
      auto p = peer_info_requested.begin();
      while (p != peer_info_requested.end()) {
        if (prior_set.probe.count(*p) == 0) {
          psdout(20) << " dropping osd." << *p
                     << " from info_requested, no longer in probe set"
                     << dendl;
          peer_info_requested.erase(p++);
        } else {
          ++p;
        }
      }
      get_infos();
    }
    psdout(20) << "Adding osd: " << infoevt.from.osd << " peer features: "
               << hex << infoevt.features << dec << dendl;
    ps->apply_peer_features(infoevt.features);

    // are we done getting everything?
    if (peer_info_requested.empty() && !prior_set.pg_down) {
      psdout(20) << "Common peer features: "
                 << hex << ps->get_min_peer_features() << dec << dendl;
      psdout(20) << "Common acting features: "
                 << hex << ps->get_min_acting_features() << dec << dendl;
      psdout(20) << "Common upacting features: "
                 << hex << ps->get_min_upacting_features() << dec << dendl;
      post_event(GotInfo());
    }
  }
  return discard_event();
}

boost::statechart::result PeeringState::GetInfo::react(const QueryState& q)
{
  DECLARE_LOCALS;
  q.f->open_object_section("state");
  q.f->dump_string("name", state_name);
  q.f->dump_stream("enter_time") << enter_time;

  q.f->open_array_section("requested_info_from");
  for (auto p = peer_info_requested.begin();
       p != peer_info_requested.end(); ++p) {
    q.f->open_object_section("osd");
    q.f->dump_stream("osd") << *p;
    if (ps->peer_info.count(*p)) {
      q.f->open_object_section("got_info");
      ps->peer_info[*p].dump(q.f);
      q.f->close_section();
    }
    q.f->close_section();
  }
  q.f->close_section();

  q.f->close_section();
  return forward_event();
}

boost::statechart::result PeeringState::GetInfo::react(const QueryUnfound& q)
{
  q.f->dump_string("state", "GetInfo");
  q.f->dump_bool("available_might_have_unfound", false);
  return discard_event();
}

void PeeringState::GetInfo::exit()
{
  context< PeeringMachine >().log_exit(state_name, enter_time);
  DECLARE_LOCALS;
  utime_t dur = ceph_clock_now() - enter_time;
  pl->get_peering_perf().tinc(rs_getinfo_latency, dur);
  ps->blocked_by.clear();
}
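// GetLog: choose_acting() (below) picks auth_log_shard, the peer whose log
// can serve as the authoritative history.  If our own log does not overlap
// that log we are incomplete; otherwise request_log_from is lowered to the
// oldest usable peer last_update, presumably so the primary fetches enough
// log in one round trip to later bring those peers up to date as well.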
/*------GetLog------------*/
PeeringState::GetLog::GetLog(my_context ctx)
  : my_base(ctx),
    NamedState(
      context< PeeringMachine >().state_history,
      "Started/Primary/Peering/GetLog"),
    msg(0)
{
  context< PeeringMachine >().log_enter(state_name);

  DECLARE_LOCALS;

  ps->log_weirdness();

  // adjust acting?
  if (!ps->choose_acting(auth_log_shard, false,
                         &context< Peering >().history_les_bound)) {
    if (!ps->want_acting.empty()) {
      post_event(NeedActingChange());
    } else {
      post_event(IsIncomplete());
    }
    return;
  }

  // am i the best?
  if (auth_log_shard == ps->pg_whoami) {
    post_event(GotLog());
    return;
  }

  const pg_info_t& best = ps->peer_info[auth_log_shard];

  // am i broken?
  if (ps->info.last_update < best.log_tail) {
    psdout(10) << " not contiguous with osd." << auth_log_shard << ", down" << dendl;
    post_event(IsIncomplete());
    return;
  }

  // how much log to request?
  eversion_t request_log_from = ps->info.last_update;
  ceph_assert(!ps->acting_recovery_backfill.empty());
  for (auto p = ps->acting_recovery_backfill.begin();
       p != ps->acting_recovery_backfill.end();
       ++p) {
    if (*p == ps->pg_whoami) continue;
    pg_info_t& ri = ps->peer_info[*p];
    if (ri.last_update < ps->info.log_tail &&
        ri.last_update >= best.log_tail &&
        ri.last_update < request_log_from)
      request_log_from = ri.last_update;
  }

  // how much?
  psdout(10) << " requesting log from osd." << auth_log_shard << dendl;
  context< PeeringMachine >().send_query(
    auth_log_shard.osd,
    pg_query_t(
      pg_query_t::LOG,
      auth_log_shard.shard, ps->pg_whoami.shard,
      request_log_from, ps->info.history,
      ps->get_osdmap_epoch()));

  ceph_assert(ps->blocked_by.empty());
  ps->blocked_by.insert(auth_log_shard.osd);
  pl->publish_stats_to_osd();
}

boost::statechart::result PeeringState::GetLog::react(const AdvMap& advmap)
{
  // make sure our log source didn't go down.  we need to check
  // explicitly because it may not be part of the prior set, which
  // means the Peering state check won't catch it going down.
  if (!advmap.osdmap->is_up(auth_log_shard.osd)) {
    psdout(10) << "GetLog: auth_log_shard osd." << auth_log_shard.osd
               << " went down" << dendl;
    post_event(advmap);
    return transit< Reset >();
  }

  // let the Peering state do its checks.
  return forward_event();
}
boost::statechart::result PeeringState::GetLog::react(const MLogRec& logevt)
{
  ceph_assert(!msg);
  if (logevt.from != auth_log_shard) {
    psdout(10) << "GetLog: discarding log from "
               << "non-auth_log_shard osd." << logevt.from << dendl;
    return discard_event();
  }
  psdout(10) << "GetLog: received master log from osd."
             << logevt.from << dendl;
  msg = logevt.msg;
  post_event(GotLog());
  return discard_event();
}

boost::statechart::result PeeringState::GetLog::react(const GotLog&)
{
  DECLARE_LOCALS;
  psdout(10) << "leaving GetLog" << dendl;
  if (msg) {
    psdout(10) << "processing master log" << dendl;
    ps->proc_master_log(context< PeeringMachine >().get_cur_transaction(),
                        msg->info, std::move(msg->log),
                        std::move(msg->missing),
                        auth_log_shard);
  }
  ps->start_flush(context< PeeringMachine >().get_cur_transaction());
  return transit< GetMissing >();
}

boost::statechart::result PeeringState::GetLog::react(const QueryState& q)
{
  q.f->open_object_section("state");
  q.f->dump_string("name", state_name);
  q.f->dump_stream("enter_time") << enter_time;
  q.f->dump_stream("auth_log_shard") << auth_log_shard;
  q.f->close_section();
  return forward_event();
}

boost::statechart::result PeeringState::GetLog::react(const QueryUnfound& q)
{
  q.f->dump_string("state", "GetLog");
  q.f->dump_bool("available_might_have_unfound", false);
  return discard_event();
}

void PeeringState::GetLog::exit()
{
  context< PeeringMachine >().log_exit(state_name, enter_time);
  DECLARE_LOCALS;
  utime_t dur = ceph_clock_now() - enter_time;
  pl->get_peering_perf().tinc(rs_getlog_latency, dur);
  ps->blocked_by.clear();
}

/*------WaitActingChange--------*/
PeeringState::WaitActingChange::WaitActingChange(my_context ctx)
  : my_base(ctx),
    NamedState(context< PeeringMachine >().state_history,
               "Started/Primary/WaitActingChange")
{
  context< PeeringMachine >().log_enter(state_name);
}
<< *p << " went down, resetting" << dendl; post_event(advmap); return transit< Reset >(); } } return forward_event(); } boost::statechart::result PeeringState::WaitActingChange::react(const MLogRec& logevt) { psdout(10) << "In WaitActingChange, ignoring MLocRec" << dendl; return discard_event(); } boost::statechart::result PeeringState::WaitActingChange::react(const MInfoRec& evt) { psdout(10) << "In WaitActingChange, ignoring MInfoRec" << dendl; return discard_event(); } boost::statechart::result PeeringState::WaitActingChange::react(const MNotifyRec& evt) { psdout(10) << "In WaitActingChange, ignoring MNotifyRec" << dendl; return discard_event(); } boost::statechart::result PeeringState::WaitActingChange::react(const QueryState& q) { q.f->open_object_section("state"); q.f->dump_string("name", state_name); q.f->dump_stream("enter_time") << enter_time; q.f->dump_string("comment", "waiting for pg acting set to change"); q.f->close_section(); return forward_event(); } boost::statechart::result PeeringState::WaitActingChange::react(const QueryUnfound& q) { q.f->dump_string("state", "WaitActingChange"); q.f->dump_bool("available_might_have_unfound", false); return discard_event(); } void PeeringState::WaitActingChange::exit() { context< PeeringMachine >().log_exit(state_name, enter_time); DECLARE_LOCALS; utime_t dur = ceph_clock_now() - enter_time; pl->get_peering_perf().tinc(rs_waitactingchange_latency, dur); } /*------Down--------*/ PeeringState::Down::Down(my_context ctx) : my_base(ctx), NamedState(context< PeeringMachine >().state_history, "Started/Primary/Peering/Down") { context< PeeringMachine >().log_enter(state_name); DECLARE_LOCALS; ps->state_clear(PG_STATE_PEERING); ps->state_set(PG_STATE_DOWN); auto &prior_set = context< Peering >().prior_set; ceph_assert(ps->blocked_by.empty()); ps->blocked_by.insert(prior_set.down.begin(), prior_set.down.end()); pl->publish_stats_to_osd(); } void PeeringState::Down::exit() { context< PeeringMachine >().log_exit(state_name, enter_time); DECLARE_LOCALS; ps->state_clear(PG_STATE_DOWN); utime_t dur = ceph_clock_now() - enter_time; pl->get_peering_perf().tinc(rs_down_latency, dur); ps->blocked_by.clear(); } boost::statechart::result PeeringState::Down::react(const QueryState& q) { q.f->open_object_section("state"); q.f->dump_string("name", state_name); q.f->dump_stream("enter_time") << enter_time; q.f->dump_string("comment", "not enough up instances of this PG to go active"); q.f->close_section(); return forward_event(); } boost::statechart::result PeeringState::Down::react(const QueryUnfound& q) { q.f->dump_string("state", "Down"); q.f->dump_bool("available_might_have_unfound", false); return discard_event(); } boost::statechart::result PeeringState::Down::react(const MNotifyRec& infoevt) { DECLARE_LOCALS; ceph_assert(ps->is_primary()); epoch_t old_start = ps->info.history.last_epoch_started; if (!ps->peer_info.count(infoevt.from) && ps->get_osdmap()->has_been_up_since(infoevt.from.osd, infoevt.notify.epoch_sent)) { ps->update_history(infoevt.notify.info.history); } // if we got something new to make pg escape down state if (ps->info.history.last_epoch_started > old_start) { psdout(10) << " last_epoch_started moved forward, re-enter getinfo" << dendl; ps->state_clear(PG_STATE_DOWN); ps->state_set(PG_STATE_PEERING); return transit< GetInfo >(); } return discard_event(); } /*------Incomplete--------*/ PeeringState::Incomplete::Incomplete(my_context ctx) : my_base(ctx), NamedState(context< PeeringMachine >().state_history, 
"Started/Primary/Peering/Incomplete") { context< PeeringMachine >().log_enter(state_name); DECLARE_LOCALS; ps->state_clear(PG_STATE_PEERING); ps->state_set(PG_STATE_INCOMPLETE); PastIntervals::PriorSet &prior_set = context< Peering >().prior_set; ceph_assert(ps->blocked_by.empty()); ps->blocked_by.insert(prior_set.down.begin(), prior_set.down.end()); pl->publish_stats_to_osd(); } boost::statechart::result PeeringState::Incomplete::react(const AdvMap &advmap) { DECLARE_LOCALS; int64_t poolnum = ps->info.pgid.pool(); // Reset if min_size turn smaller than previous value, pg might now be able to go active if (!advmap.osdmap->have_pg_pool(poolnum) || advmap.lastmap->get_pools().find(poolnum)->second.min_size > advmap.osdmap->get_pools().find(poolnum)->second.min_size) { post_event(advmap); return transit< Reset >(); } return forward_event(); } boost::statechart::result PeeringState::Incomplete::react(const MNotifyRec& notevt) { DECLARE_LOCALS; psdout(7) << "handle_pg_notify from osd." << notevt.from << dendl; if (ps->proc_replica_info( notevt.from, notevt.notify.info, notevt.notify.epoch_sent)) { // We got something new, try again! return transit< GetLog >(); } else { return discard_event(); } } boost::statechart::result PeeringState::Incomplete::react( const QueryState& q) { q.f->open_object_section("state"); q.f->dump_string("name", state_name); q.f->dump_stream("enter_time") << enter_time; q.f->dump_string("comment", "not enough complete instances of this PG"); q.f->close_section(); return forward_event(); } boost::statechart::result PeeringState::Incomplete::react(const QueryUnfound& q) { q.f->dump_string("state", "Incomplete"); q.f->dump_bool("available_might_have_unfound", false); return discard_event(); } void PeeringState::Incomplete::exit() { context< PeeringMachine >().log_exit(state_name, enter_time); DECLARE_LOCALS; ps->state_clear(PG_STATE_INCOMPLETE); utime_t dur = ceph_clock_now() - enter_time; pl->get_peering_perf().tinc(rs_incomplete_latency, dur); ps->blocked_by.clear(); } /*------GetMissing--------*/ PeeringState::GetMissing::GetMissing(my_context ctx) : my_base(ctx), NamedState(context< PeeringMachine >().state_history, "Started/Primary/Peering/GetMissing") { context< PeeringMachine >().log_enter(state_name); DECLARE_LOCALS; ps->log_weirdness(); ceph_assert(!ps->acting_recovery_backfill.empty()); eversion_t since; for (auto i = ps->acting_recovery_backfill.begin(); i != ps->acting_recovery_backfill.end(); ++i) { if (*i == ps->get_primary()) continue; const pg_info_t& pi = ps->peer_info[*i]; // reset this so to make sure the pg_missing_t is initialized and // has the correct semantics even if we don't need to get a // missing set from a shard. This way later additions due to // lost+unfound delete work properly. ps->peer_missing[*i].may_include_deletes = !ps->perform_deletes_during_peering(); if (pi.is_empty()) continue; // no pg data, nothing divergent if (pi.last_update < ps->pg_log.get_tail()) { psdout(10) << " osd." << *i << " is not contiguous, will restart backfill" << dendl; ps->peer_missing[*i].clear(); continue; } if (pi.last_backfill == hobject_t()) { psdout(10) << " osd." << *i << " will fully backfill; can infer empty missing set" << dendl; ps->peer_missing[*i].clear(); continue; } if (pi.last_update == pi.last_complete && // peer has no missing pi.last_update == ps->info.last_update) { // peer is up to date // replica has no missing and identical log as us. no need to // pull anything. // FIXME: we can do better here. 
/*------GetMissing--------*/
PeeringState::GetMissing::GetMissing(my_context ctx)
  : my_base(ctx),
    NamedState(context< PeeringMachine >().state_history,
               "Started/Primary/Peering/GetMissing")
{
  context< PeeringMachine >().log_enter(state_name);

  DECLARE_LOCALS;
  ps->log_weirdness();
  ceph_assert(!ps->acting_recovery_backfill.empty());
  eversion_t since;
  for (auto i = ps->acting_recovery_backfill.begin();
       i != ps->acting_recovery_backfill.end();
       ++i) {
    if (*i == ps->get_primary()) continue;
    const pg_info_t& pi = ps->peer_info[*i];
    // reset this to make sure the pg_missing_t is initialized and
    // has the correct semantics even if we don't need to get a
    // missing set from a shard. This way later additions due to
    // lost+unfound delete work properly.
    ps->peer_missing[*i].may_include_deletes =
      !ps->perform_deletes_during_peering();

    if (pi.is_empty())
      continue;                                // no pg data, nothing divergent

    if (pi.last_update < ps->pg_log.get_tail()) {
      psdout(10) << " osd." << *i
                 << " is not contiguous, will restart backfill" << dendl;
      ps->peer_missing[*i].clear();
      continue;
    }
    if (pi.last_backfill == hobject_t()) {
      psdout(10) << " osd." << *i
                 << " will fully backfill; can infer empty missing set"
                 << dendl;
      ps->peer_missing[*i].clear();
      continue;
    }

    if (pi.last_update == pi.last_complete &&      // peer has no missing
        pi.last_update == ps->info.last_update) {  // peer is up to date
      // replica has no missing and identical log as us.  no need to
      // pull anything.
      // FIXME: we can do better here.  if last_update==last_complete we
      //        can infer the rest!
      psdout(10) << " osd." << *i
                 << " has no missing, identical log" << dendl;
      ps->peer_missing[*i].clear();
      continue;
    }

    // We pull the log from the peer's last_epoch_started to ensure we
    // get enough log to detect divergent updates.
    since.epoch = pi.last_epoch_started;
    ceph_assert(pi.last_update >= ps->info.log_tail);  // or else choose_acting() did a bad thing
    if (pi.log_tail <= since) {
      psdout(10) << " requesting log+missing since " << since
                 << " from osd." << *i << dendl;
      context< PeeringMachine >().send_query(
        i->osd,
        pg_query_t(
          pg_query_t::LOG,
          i->shard, ps->pg_whoami.shard,
          since, ps->info.history,
          ps->get_osdmap_epoch()));
    } else {
      psdout(10) << " requesting fulllog+missing from osd." << *i
                 << " (want since " << since << " < log.tail "
                 << pi.log_tail << ")" << dendl;
      context< PeeringMachine >().send_query(
        i->osd, pg_query_t(
          pg_query_t::FULLLOG,
          i->shard, ps->pg_whoami.shard,
          ps->info.history, ps->get_osdmap_epoch()));
    }
    peer_missing_requested.insert(*i);
    ps->blocked_by.insert(i->osd);
  }

  if (peer_missing_requested.empty()) {
    if (ps->need_up_thru) {
      psdout(10) << " still need up_thru update before going active"
                 << dendl;
      post_event(NeedUpThru());
      return;
    }

    // all good!
    post_event(Activate(ps->get_osdmap_epoch()));
  } else {
    pl->publish_stats_to_osd();
  }
}

boost::statechart::result PeeringState::GetMissing::react(const MLogRec& logevt)
{
  DECLARE_LOCALS;

  peer_missing_requested.erase(logevt.from);
  ps->proc_replica_log(logevt.msg->info,
                       logevt.msg->log,
                       std::move(logevt.msg->missing),
                       logevt.from);

  if (peer_missing_requested.empty()) {
    if (ps->need_up_thru) {
      psdout(10) << " still need up_thru update before going active"
                 << dendl;
      post_event(NeedUpThru());
    } else {
      psdout(10) << "Got last missing, don't need missing "
                 << "posting Activate" << dendl;
      post_event(Activate(ps->get_osdmap_epoch()));
    }
  }
  return discard_event();
}

boost::statechart::result PeeringState::GetMissing::react(const QueryState& q)
{
  DECLARE_LOCALS;
  q.f->open_object_section("state");
  q.f->dump_string("name", state_name);
  q.f->dump_stream("enter_time") << enter_time;

  q.f->open_array_section("peer_missing_requested");
  for (auto p = peer_missing_requested.begin();
       p != peer_missing_requested.end(); ++p) {
    q.f->open_object_section("osd");
    q.f->dump_stream("osd") << *p;
    if (ps->peer_missing.count(*p)) {
      q.f->open_object_section("got_missing");
      ps->peer_missing[*p].dump(q.f);
      q.f->close_section();
    }
    q.f->close_section();
  }
  q.f->close_section();

  q.f->close_section();
  return forward_event();
}

boost::statechart::result PeeringState::GetMissing::react(const QueryUnfound& q)
{
  q.f->dump_string("state", "GetMissing");
  q.f->dump_bool("available_might_have_unfound", false);
  return discard_event();
}

void PeeringState::GetMissing::exit()
{
  context< PeeringMachine >().log_exit(state_name, enter_time);
  DECLARE_LOCALS;
  utime_t dur = ceph_clock_now() - enter_time;
  pl->get_peering_perf().tinc(rs_getmissing_latency, dur);
  ps->blocked_by.clear();
}
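// WaitUpThru: before going active the primary needs the osdmap to record an
// up_thru for this OSD at least as new as the current interval.  That record
// is what lets a later peering round prove whether this interval could ever
// have accepted writes; activating without it could silently lose updates.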
<< "Noting missing from osd." << logevt.from << dendl; ps->peer_missing[logevt.from].claim(std::move(logevt.msg->missing)); ps->peer_info[logevt.from] = logevt.msg->info; return discard_event(); } boost::statechart::result PeeringState::WaitUpThru::react(const QueryState& q) { q.f->open_object_section("state"); q.f->dump_string("name", state_name); q.f->dump_stream("enter_time") << enter_time; q.f->dump_string("comment", "waiting for osdmap to reflect a new up_thru for this osd"); q.f->close_section(); return forward_event(); } boost::statechart::result PeeringState::WaitUpThru::react(const QueryUnfound& q) { q.f->dump_string("state", "WaitUpThru"); q.f->dump_bool("available_might_have_unfound", false); return discard_event(); } void PeeringState::WaitUpThru::exit() { context< PeeringMachine >().log_exit(state_name, enter_time); DECLARE_LOCALS; utime_t dur = ceph_clock_now() - enter_time; pl->get_peering_perf().tinc(rs_waitupthru_latency, dur); } /*----PeeringState::PeeringMachine Methods-----*/ #undef dout_prefix #define dout_prefix dpp->gen_prefix(*_dout) void PeeringState::PeeringMachine::log_enter(const char *state_name) { DECLARE_LOCALS; psdout(5) << "enter " << state_name << dendl; pl->log_state_enter(state_name); } void PeeringState::PeeringMachine::log_exit(const char *state_name, utime_t enter_time) { DECLARE_LOCALS; utime_t dur = ceph_clock_now() - enter_time; psdout(5) << "exit " << state_name << " " << dur << " " << event_count << " " << event_time << dendl; pl->log_state_exit(state_name, enter_time, event_count, event_time); event_count = 0; event_time = utime_t(); } ostream &operator<<(ostream &out, const PeeringState &ps) { out << "pg[" << ps.info << " " << pg_vector_string(ps.up); if (ps.acting != ps.up) out << "/" << pg_vector_string(ps.acting); if (ps.is_ec_pg()) out << "p" << ps.get_primary(); if (!ps.async_recovery_targets.empty()) out << " async=[" << ps.async_recovery_targets << "]"; if (!ps.backfill_targets.empty()) out << " backfill=[" << ps.backfill_targets << "]"; out << " r=" << ps.get_role(); out << " lpr=" << ps.get_last_peering_reset(); if (ps.deleting) out << " DELETING"; if (!ps.past_intervals.empty()) { out << " pi=[" << ps.past_intervals.get_bounds() << ")/" << ps.past_intervals.size(); } if (ps.is_peered()) { if (ps.last_update_ondisk != ps.info.last_update) out << " luod=" << ps.last_update_ondisk; if (ps.last_update_applied != ps.info.last_update) out << " lua=" << ps.last_update_applied; } if (ps.pg_log.get_tail() != ps.info.log_tail || ps.pg_log.get_head() != ps.info.last_update) out << " (info mismatch, " << ps.pg_log.get_log() << ")"; if (!ps.pg_log.get_log().empty()) { if ((ps.pg_log.get_log().log.begin()->version <= ps.pg_log.get_tail())) { out << " (log bound mismatch, actual=[" << ps.pg_log.get_log().log.begin()->version << "," << ps.pg_log.get_log().log.rbegin()->version << "]"; out << ")"; } } out << " crt=" << ps.pg_log.get_can_rollback_to(); if (ps.last_complete_ondisk != ps.info.last_complete) out << " lcod " << ps.last_complete_ondisk; out << " mlcod " << ps.min_last_complete_ondisk; out << " " << pg_state_string(ps.get_state()); if (ps.should_send_notify()) out << " NOTIFY"; if (ps.prior_readable_until_ub != ceph::signedspan::zero()) { out << " pruub " << ps.prior_readable_until_ub << "@" << ps.get_prior_readable_down_osds(); } return out; } std::vector PeeringState::get_replica_recovery_order() const { std::vector> replicas_by_num_missing, async_by_num_missing; replicas_by_num_missing.reserve(get_acting_recovery_backfill().size() - 1); 
std::vector<pg_shard_t> PeeringState::get_replica_recovery_order() const
{
  std::vector<std::pair<unsigned int, pg_shard_t>> replicas_by_num_missing,
    async_by_num_missing;
  replicas_by_num_missing.reserve(get_acting_recovery_backfill().size() - 1);
  for (auto &p : get_acting_recovery_backfill()) {
    if (p == get_primary()) {
      continue;
    }
    auto pm = get_peer_missing().find(p);
    assert(pm != get_peer_missing().end());
    auto nm = pm->second.num_missing();
    if (nm != 0) {
      if (is_async_recovery_target(p)) {
        async_by_num_missing.push_back(make_pair(nm, p));
      } else {
        replicas_by_num_missing.push_back(make_pair(nm, p));
      }
    }
  }
  // sort by number of missing objects, in ascending order.
  auto func = [](const std::pair<unsigned int, pg_shard_t> &lhs,
                 const std::pair<unsigned int, pg_shard_t> &rhs) {
    return lhs.first < rhs.first;
  };
  // acting goes first
  std::sort(replicas_by_num_missing.begin(), replicas_by_num_missing.end(), func);
  // then async_recovery_targets
  std::sort(async_by_num_missing.begin(), async_by_num_missing.end(), func);
  replicas_by_num_missing.insert(replicas_by_num_missing.end(),
                                 async_by_num_missing.begin(),
                                 async_by_num_missing.end());

  std::vector<pg_shard_t> ret;
  ret.reserve(replicas_by_num_missing.size());
  for (auto p : replicas_by_num_missing) {
    ret.push_back(p.second);
  }
  return ret;
}
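// Illustrative example (hypothetical missing counts): with peers a=3 missing
// (an async recovery target), b=1, c=2, and d=0, d is omitted entirely
// (nothing to recover) and the resulting order is [b, c, a] -- acting-set
// peers sorted ascending, then async targets sorted ascending.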