author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/osd/PeeringState.cc | |
parent | Initial commit. (diff) | |
download | ceph-b26c4052f3542036551aa9dec9caa4226e456195.tar.xz, ceph-b26c4052f3542036551aa9dec9caa4226e456195.zip | |
Adding upstream version 18.2.2. (upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/osd/PeeringState.cc')
-rw-r--r-- | src/osd/PeeringState.cc | 7535 |
1 file changed, 7535 insertions, 0 deletions
diff --git a/src/osd/PeeringState.cc b/src/osd/PeeringState.cc
new file mode 100644
index 000000000..c86fbdc6f
--- /dev/null
+++ b/src/osd/PeeringState.cc
@@ -0,0 +1,7535 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "PGPeeringEvent.h"
+#include "common/ceph_releases.h"
+#include "common/dout.h"
+#include "PeeringState.h"
+
+#include "messages/MOSDPGRemove.h"
+#include "messages/MBackfillReserve.h"
+#include "messages/MRecoveryReserve.h"
+#include "messages/MOSDScrubReserve.h"
+#include "messages/MOSDPGInfo2.h"
+#include "messages/MOSDPGTrim.h"
+#include "messages/MOSDPGLog.h"
+#include "messages/MOSDPGNotify2.h"
+#include "messages/MOSDPGQuery2.h"
+#include "messages/MOSDPGLease.h"
+#include "messages/MOSDPGLeaseAck.h"
+
+#define dout_context cct
+#define dout_subsys ceph_subsys_osd
+
+using std::dec;
+using std::hex;
+using std::make_pair;
+using std::map;
+using std::ostream;
+using std::pair;
+using std::set;
+using std::string;
+using std::stringstream;
+using std::vector;
+
+using ceph::Formatter;
+using ceph::make_message;
+
+BufferedRecoveryMessages::BufferedRecoveryMessages(PeeringCtx &ctx)
+  // steal messages from ctx
+  : message_map{std::move(ctx.message_map)}
+{}
+
+void BufferedRecoveryMessages::send_notify(int to, const pg_notify_t &n)
+{
+  spg_t pgid(n.info.pgid.pgid, n.to);
+  send_osd_message(to, TOPNSPC::make_message<MOSDPGNotify2>(pgid, n));
+}
+
+void BufferedRecoveryMessages::send_query(
+  int to,
+  spg_t to_spgid,
+  const pg_query_t &q)
+{
+  send_osd_message(to, TOPNSPC::make_message<MOSDPGQuery2>(to_spgid, q));
+}
+
+void BufferedRecoveryMessages::send_info(
+  int to,
+  spg_t to_spgid,
+  epoch_t min_epoch,
+  epoch_t cur_epoch,
+  const pg_info_t &info,
+  std::optional<pg_lease_t> lease,
+  std::optional<pg_lease_ack_t> lease_ack)
+{
+  send_osd_message(
+    to,
+    TOPNSPC::make_message<MOSDPGInfo2>(
+      to_spgid,
+      info,
+      cur_epoch,
+      min_epoch,
+      lease,
+      lease_ack)
+    );
+}
+
+void PGPool::update(OSDMapRef map)
+{
+  const pg_pool_t *pi = map->get_pg_pool(id);
+  if (!pi) {
+    return; // pool has been deleted
+  }
+  info = *pi;
+  name = map->get_pool_name(id);
+
+  bool updated = false;
+  if ((map->get_epoch() != cached_epoch + 1) ||
+      (pi->get_snap_epoch() == map->get_epoch())) {
+    updated = true;
+  }
+
+  if (info.is_pool_snaps_mode() && updated) {
+    snapc = pi->get_snap_context();
+  }
+  cached_epoch = map->get_epoch();
+}
+
+/*-------------Peering State Helpers----------------*/
+#undef dout_prefix
+#define dout_prefix (dpp->gen_prefix(*_dout))
+#undef psdout
+#define psdout(x) ldout(cct, x)
+
+PeeringState::PeeringState(
+  CephContext *cct,
+  pg_shard_t pg_whoami,
+  spg_t spgid,
+  const PGPool &_pool,
+  OSDMapRef curmap,
+  DoutPrefixProvider *dpp,
+  PeeringListener *pl)
+  : state_history(*pl),
+    cct(cct),
+    spgid(spgid),
+    dpp(dpp),
+    pl(pl),
+    orig_ctx(0),
+    osdmap_ref(curmap),
+    pool(_pool),
+    pg_whoami(pg_whoami),
+    info(spgid),
+    pg_log(cct),
+    last_require_osd_release(curmap->require_osd_release),
+    missing_loc(spgid, this, dpp, cct),
+    machine(this, cct, spgid, dpp, pl, &state_history)
+{
+  machine.initiate();
+}
+
+void PeeringState::start_handle(PeeringCtx *new_ctx) {
+  ceph_assert(!rctx);
+  ceph_assert(!orig_ctx);
+  orig_ctx = new_ctx;
+  if (new_ctx) {
+    if (messages_pending_flush) {
+      rctx.emplace(*messages_pending_flush, *new_ctx);
+    } else {
+      rctx.emplace(*new_ctx);
+    }
+    rctx->start_time = ceph_clock_now();
+  }
+}
+
+void PeeringState::begin_block_outgoing() {
ceph_assert(!messages_pending_flush); + ceph_assert(orig_ctx); + ceph_assert(rctx); + messages_pending_flush.emplace(); + rctx.emplace(*messages_pending_flush, *orig_ctx); +} + +void PeeringState::clear_blocked_outgoing() { + ceph_assert(orig_ctx); + ceph_assert(rctx); + messages_pending_flush = std::optional<BufferedRecoveryMessages>(); +} + +void PeeringState::end_block_outgoing() { + ceph_assert(messages_pending_flush); + ceph_assert(orig_ctx); + ceph_assert(rctx); + + orig_ctx->accept_buffered_messages(*messages_pending_flush); + rctx.emplace(*orig_ctx); + messages_pending_flush = std::optional<BufferedRecoveryMessages>(); +} + +void PeeringState::end_handle() { + if (rctx) { + utime_t dur = ceph_clock_now() - rctx->start_time; + machine.event_time += dur; + } + + machine.event_count++; + rctx = std::nullopt; + orig_ctx = NULL; +} + +void PeeringState::check_recovery_sources(const OSDMapRef& osdmap) +{ + /* + * check that any peers we are planning to (or currently) pulling + * objects from are dealt with. + */ + missing_loc.check_recovery_sources(osdmap); + pl->check_recovery_sources(osdmap); + + for (auto i = peer_log_requested.begin(); i != peer_log_requested.end();) { + if (!osdmap->is_up(i->osd)) { + psdout(10) << "peer_log_requested removing " << *i << dendl; + peer_log_requested.erase(i++); + } else { + ++i; + } + } + + for (auto i = peer_missing_requested.begin(); + i != peer_missing_requested.end();) { + if (!osdmap->is_up(i->osd)) { + psdout(10) << "peer_missing_requested removing " << *i << dendl; + peer_missing_requested.erase(i++); + } else { + ++i; + } + } +} + +void PeeringState::update_history(const pg_history_t& new_history) +{ + auto mnow = pl->get_mnow(); + info.history.refresh_prior_readable_until_ub(mnow, prior_readable_until_ub); + if (info.history.merge(new_history)) { + psdout(20) << __func__ << " advanced history from " << new_history << dendl; + dirty_info = true; + if (info.history.last_epoch_clean >= info.history.same_interval_since) { + psdout(20) << __func__ << " clearing past_intervals" << dendl; + past_intervals.clear(); + dirty_big_info = true; + } + prior_readable_until_ub = info.history.get_prior_readable_until_ub(mnow); + if (prior_readable_until_ub != ceph::signedspan::zero()) { + dout(20) << __func__ + << " prior_readable_until_ub " << prior_readable_until_ub + << " (mnow " << mnow << " + " + << info.history.prior_readable_until_ub << ")" << dendl; + } + } + pl->on_info_history_change(); +} + +hobject_t PeeringState::earliest_backfill() const +{ + hobject_t e = hobject_t::get_max(); + for (const pg_shard_t& bt : get_backfill_targets()) { + const pg_info_t &pi = get_peer_info(bt); + e = std::min(pi.last_backfill, e); + } + return e; +} + +void PeeringState::purge_strays() +{ + if (is_premerge()) { + psdout(10) << "purge_strays " << stray_set << " but premerge, doing nothing" + << dendl; + return; + } + if (cct->_conf.get_val<bool>("osd_debug_no_purge_strays")) { + return; + } + psdout(10) << "purge_strays " << stray_set << dendl; + + bool removed = false; + for (auto p = stray_set.begin(); p != stray_set.end(); ++p) { + ceph_assert(!is_acting_recovery_backfill(*p)); + if (get_osdmap()->is_up(p->osd)) { + psdout(10) << "sending PGRemove to osd." 
<< *p << dendl; + vector<spg_t> to_remove; + to_remove.push_back(spg_t(info.pgid.pgid, p->shard)); + auto m = TOPNSPC::make_message<MOSDPGRemove>( + get_osdmap_epoch(), + to_remove); + pl->send_cluster_message(p->osd, std::move(m), get_osdmap_epoch()); + } else { + psdout(10) << "not sending PGRemove to down osd." << *p << dendl; + } + peer_missing.erase(*p); + peer_info.erase(*p); + missing_loc.remove_stray_recovery_sources(*p); + peer_purged.insert(*p); + removed = true; + } + + // if we removed anyone, update peers (which include peer_info) + if (removed) + update_heartbeat_peers(); + + stray_set.clear(); + + // clear _requested maps; we may have to peer() again if we discover + // (more) stray content + peer_log_requested.clear(); + peer_missing_requested.clear(); +} + +void PeeringState::query_unfound(Formatter *f, string state) +{ + psdout(20) << "Enter PeeringState common QueryUnfound" << dendl; + { + f->dump_string("state", state); + f->dump_bool("available_might_have_unfound", true); + f->open_array_section("might_have_unfound"); + for (auto p = might_have_unfound.begin(); + p != might_have_unfound.end(); + ++p) { + if (peer_missing.count(*p)) { + ; // Ignore already probed OSDs + } else { + f->open_object_section("osd"); + f->dump_stream("osd") << *p; + if (peer_missing_requested.count(*p)) { + f->dump_string("status", "querying"); + } else if (!get_osdmap()->is_up(p->osd)) { + f->dump_string("status", "osd is down"); + } else { + f->dump_string("status", "not queried"); + } + f->close_section(); + } + } + f->close_section(); + } + psdout(20) << "Exit PeeringState common QueryUnfound" << dendl; + return; +} + +bool PeeringState::proc_replica_info( + pg_shard_t from, const pg_info_t &oinfo, epoch_t send_epoch) +{ + auto p = peer_info.find(from); + if (p != peer_info.end() && p->second.last_update == oinfo.last_update) { + psdout(10) << " got dup osd." << from << " info " + << oinfo << ", identical to ours" << dendl; + return false; + } + + if (!get_osdmap()->has_been_up_since(from.osd, send_epoch)) { + psdout(10) << " got info " << oinfo << " from down osd." << from + << " discarding" << dendl; + return false; + } + + psdout(10) << " got osd." << from << " " << oinfo << dendl; + ceph_assert(is_primary()); + peer_info[from] = oinfo; + might_have_unfound.insert(from); + + update_history(oinfo.history); + + // stray? + if (!is_up(from) && !is_acting(from)) { + psdout(10) << " osd." << from << " has stray content: " << oinfo << dendl; + stray_set.insert(from); + if (is_clean()) { + purge_strays(); + } + } + + // was this a new info? if so, update peers! + if (p == peer_info.end()) + update_heartbeat_peers(); + + return true; +} + + +void PeeringState::remove_down_peer_info(const OSDMapRef &osdmap) +{ + // Remove any downed osds from peer_info + bool removed = false; + auto p = peer_info.begin(); + while (p != peer_info.end()) { + if (!osdmap->is_up(p->first.osd)) { + psdout(10) << " dropping down osd." << p->first << " info " << p->second << dendl; + peer_missing.erase(p->first); + peer_log_requested.erase(p->first); + peer_missing_requested.erase(p->first); + peer_info.erase(p++); + removed = true; + } else + ++p; + } + + // Remove any downed osds from peer_purged so we can re-purge if necessary + auto it = peer_purged.begin(); + while (it != peer_purged.end()) { + if (!osdmap->is_up(it->osd)) { + psdout(10) << " dropping down osd." 
<< *it << " from peer_purged" << dendl; + peer_purged.erase(it++); + } else { + ++it; + } + } + + // if we removed anyone, update peers (which include peer_info) + if (removed) + update_heartbeat_peers(); + + check_recovery_sources(osdmap); +} + +void PeeringState::update_heartbeat_peers() +{ + if (!is_primary()) + return; + + set<int> new_peers; + for (unsigned i=0; i<acting.size(); i++) { + if (acting[i] != CRUSH_ITEM_NONE) + new_peers.insert(acting[i]); + } + for (unsigned i=0; i<up.size(); i++) { + if (up[i] != CRUSH_ITEM_NONE) + new_peers.insert(up[i]); + } + for (auto p = peer_info.begin(); p != peer_info.end(); ++p) { + new_peers.insert(p->first.osd); + } + pl->update_heartbeat_peers(std::move(new_peers)); +} + +void PeeringState::write_if_dirty(ObjectStore::Transaction& t) +{ + pl->prepare_write( + info, + last_written_info, + past_intervals, + pg_log, + dirty_info, + dirty_big_info, + last_persisted_osdmap < get_osdmap_epoch(), + t); + if (dirty_info || dirty_big_info) { + last_persisted_osdmap = get_osdmap_epoch(); + last_written_info = info; + dirty_info = false; + dirty_big_info = false; + } +} + +void PeeringState::advance_map( + OSDMapRef osdmap, OSDMapRef lastmap, + vector<int>& newup, int up_primary, + vector<int>& newacting, int acting_primary, + PeeringCtx &rctx) +{ + ceph_assert(lastmap == osdmap_ref); + psdout(10) << "handle_advance_map " + << newup << "/" << newacting + << " -- " << up_primary << "/" << acting_primary + << dendl; + + update_osdmap_ref(osdmap); + pool.update(osdmap); + + AdvMap evt( + osdmap, lastmap, newup, up_primary, + newacting, acting_primary); + handle_event(evt, &rctx); + if (pool.info.last_change == osdmap_ref->get_epoch()) { + pl->on_pool_change(); + } + readable_interval = pool.get_readable_interval(cct->_conf); + last_require_osd_release = osdmap->require_osd_release; +} + +void PeeringState::activate_map(PeeringCtx &rctx) +{ + psdout(10) << __func__ << dendl; + ActMap evt; + handle_event(evt, &rctx); + if (osdmap_ref->get_epoch() - last_persisted_osdmap > + cct->_conf->osd_pg_epoch_persisted_max_stale) { + psdout(20) << __func__ << ": Dirtying info: last_persisted is " + << last_persisted_osdmap + << " while current is " << osdmap_ref->get_epoch() << dendl; + dirty_info = true; + } else { + psdout(20) << __func__ << ": Not dirtying info: last_persisted is " + << last_persisted_osdmap + << " while current is " << osdmap_ref->get_epoch() << dendl; + } + write_if_dirty(rctx.transaction); + + if (get_osdmap()->check_new_blocklist_entries()) { + pl->check_blocklisted_watchers(); + } +} + +void PeeringState::set_last_peering_reset() +{ + psdout(20) << "set_last_peering_reset " << get_osdmap_epoch() << dendl; + if (last_peering_reset != get_osdmap_epoch()) { + last_peering_reset = get_osdmap_epoch(); + psdout(10) << "Clearing blocked outgoing recovery messages" << dendl; + clear_blocked_outgoing(); + if (!pl->try_flush_or_schedule_async()) { + psdout(10) << "Beginning to block outgoing recovery messages" << dendl; + begin_block_outgoing(); + } else { + psdout(10) << "Not blocking outgoing recovery messages" << dendl; + } + } +} + +void PeeringState::complete_flush() +{ + flushes_in_progress--; + if (flushes_in_progress == 0) { + pl->on_flushed(); + } +} + +void PeeringState::check_full_transition(OSDMapRef lastmap, OSDMapRef osdmap) +{ + const pg_pool_t *pi = osdmap->get_pg_pool(info.pgid.pool()); + if (!pi) { + return; // pool deleted + } + bool changed = false; + if (pi->has_flag(pg_pool_t::FLAG_FULL)) { + const pg_pool_t *opi = 
lastmap->get_pg_pool(info.pgid.pool()); + if (!opi || !opi->has_flag(pg_pool_t::FLAG_FULL)) { + psdout(10) << " pool was marked full in " << osdmap->get_epoch() << dendl; + changed = true; + } + } + if (changed) { + info.history.last_epoch_marked_full = osdmap->get_epoch(); + dirty_info = true; + } +} + +bool PeeringState::should_restart_peering( + int newupprimary, + int newactingprimary, + const vector<int>& newup, + const vector<int>& newacting, + OSDMapRef lastmap, + OSDMapRef osdmap) +{ + if (PastIntervals::is_new_interval( + primary.osd, + newactingprimary, + acting, + newacting, + up_primary.osd, + newupprimary, + up, + newup, + osdmap.get(), + lastmap.get(), + info.pgid.pgid)) { + psdout(20) << "new interval newup " << newup + << " newacting " << newacting << dendl; + return true; + } + if (!lastmap->is_up(pg_whoami.osd) && osdmap->is_up(pg_whoami.osd)) { + psdout(10) << __func__ << " osd transitioned from down -> up" + << dendl; + return true; + } + return false; +} + +/* Called before initializing peering during advance_map */ +void PeeringState::start_peering_interval( + const OSDMapRef lastmap, + const vector<int>& newup, int new_up_primary, + const vector<int>& newacting, int new_acting_primary, + ObjectStore::Transaction &t) +{ + const OSDMapRef osdmap = get_osdmap(); + + set_last_peering_reset(); + + vector<int> oldacting, oldup; + int oldrole = get_role(); + + if (is_primary()) { + pl->clear_ready_to_merge(); + } + + + pg_shard_t old_acting_primary = get_primary(); + pg_shard_t old_up_primary = up_primary; + bool was_old_primary = is_primary(); + bool was_old_nonprimary = is_nonprimary(); + + acting.swap(oldacting); + up.swap(oldup); + init_primary_up_acting( + newup, + newacting, + new_up_primary, + new_acting_primary); + + if (info.stats.up != up || + info.stats.acting != acting || + info.stats.up_primary != new_up_primary || + info.stats.acting_primary != new_acting_primary) { + info.stats.up = up; + info.stats.up_primary = new_up_primary; + info.stats.acting = acting; + info.stats.acting_primary = new_acting_primary; + info.stats.mapping_epoch = osdmap->get_epoch(); + } + + pl->clear_publish_stats(); + + // This will now be remapped during a backfill in cases + // that it would not have been before. + if (up != acting) + state_set(PG_STATE_REMAPPED); + else + state_clear(PG_STATE_REMAPPED); + + int role = osdmap->calc_pg_role(pg_whoami, acting); + set_role(role); + + // did acting, up, primary|acker change? + if (!lastmap) { + psdout(10) << " no lastmap" << dendl; + dirty_info = true; + dirty_big_info = true; + info.history.same_interval_since = osdmap->get_epoch(); + } else { + std::stringstream debug; + ceph_assert(info.history.same_interval_since != 0); + bool new_interval = PastIntervals::check_new_interval( + old_acting_primary.osd, + new_acting_primary, + oldacting, newacting, + old_up_primary.osd, + new_up_primary, + oldup, newup, + info.history.same_interval_since, + info.history.last_epoch_clean, + osdmap.get(), + lastmap.get(), + info.pgid.pgid, + missing_loc.get_recoverable_predicate(), + &past_intervals, + &debug); + psdout(10) << __func__ << ": check_new_interval output: " + << debug.str() << dendl; + if (new_interval) { + if (osdmap->get_epoch() == pl->cluster_osdmap_trim_lower_bound() && + info.history.last_epoch_clean < osdmap->get_epoch()) { + psdout(10) << " map gap, clearing past_intervals and faking" << dendl; + // our information is incomplete and useless; someone else was clean + // after everything we know if osdmaps were trimmed. 
+ past_intervals.clear(); + } else { + psdout(10) << " noting past " << past_intervals << dendl; + } + dirty_info = true; + dirty_big_info = true; + info.history.same_interval_since = osdmap->get_epoch(); + if (osdmap->have_pg_pool(info.pgid.pgid.pool()) && + info.pgid.pgid.is_split(lastmap->get_pg_num(info.pgid.pgid.pool()), + osdmap->get_pg_num(info.pgid.pgid.pool()), + nullptr)) { + info.history.last_epoch_split = osdmap->get_epoch(); + } + } + } + + if (old_up_primary != up_primary || + oldup != up) { + info.history.same_up_since = osdmap->get_epoch(); + } + // this comparison includes primary rank via pg_shard_t + if (old_acting_primary != get_primary()) { + info.history.same_primary_since = osdmap->get_epoch(); + } + + on_new_interval(); + pl->on_info_history_change(); + + psdout(1) << __func__ << " up " << oldup << " -> " << up + << ", acting " << oldacting << " -> " << acting + << ", acting_primary " << old_acting_primary << " -> " + << new_acting_primary + << ", up_primary " << old_up_primary << " -> " << new_up_primary + << ", role " << oldrole << " -> " << role + << ", features acting " << acting_features + << " upacting " << upacting_features + << dendl; + + // deactivate. + state_clear(PG_STATE_ACTIVE); + state_clear(PG_STATE_PEERED); + state_clear(PG_STATE_PREMERGE); + state_clear(PG_STATE_DOWN); + state_clear(PG_STATE_RECOVERY_WAIT); + state_clear(PG_STATE_RECOVERY_TOOFULL); + state_clear(PG_STATE_RECOVERING); + + peer_purged.clear(); + acting_recovery_backfill.clear(); + + // reset primary/replica state? + if (was_old_primary || is_primary()) { + pl->clear_want_pg_temp(); + } else if (was_old_nonprimary || is_nonprimary()) { + pl->clear_want_pg_temp(); + } + clear_primary_state(); + + pl->on_change(t); + + ceph_assert(!deleting); + + // should we tell the primary we are here? + send_notify = !is_primary(); + + if (role != oldrole || + was_old_primary != is_primary()) { + // did primary change? + if (was_old_primary != is_primary()) { + state_clear(PG_STATE_CLEAN); + // queue/dequeue the scrubber + pl->on_primary_status_change(was_old_primary, is_primary()); + } + + pl->on_role_change(); + } else { + // no role change. + // did primary change? + if (get_primary() != old_acting_primary) { + psdout(10) << oldacting << " -> " << acting + << ", acting primary " + << old_acting_primary << " -> " << get_primary() + << dendl; + } else { + // primary is the same. + if (is_primary()) { + // i am (still) primary. but my replica set changed. 
+ state_clear(PG_STATE_CLEAN); + + psdout(10) << oldacting << " -> " << acting + << ", replicas changed" << dendl; + } + } + } + + if (is_primary() && was_old_primary) { + pl->reschedule_scrub(); + } + + if (acting.empty() && !up.empty() && up_primary == pg_whoami) { + psdout(10) << " acting empty, but i am up[0], clearing pg_temp" << dendl; + pl->queue_want_pg_temp(acting); + } +} + +void PeeringState::on_new_interval() +{ + dout(20) << __func__ << dendl; + const OSDMapRef osdmap = get_osdmap(); + + // initialize features + acting_features = CEPH_FEATURES_SUPPORTED_DEFAULT; + upacting_features = CEPH_FEATURES_SUPPORTED_DEFAULT; + for (auto p = acting.begin(); p != acting.end(); ++p) { + if (*p == CRUSH_ITEM_NONE) + continue; + uint64_t f = osdmap->get_xinfo(*p).features; + acting_features &= f; + upacting_features &= f; + } + for (auto p = up.begin(); p != up.end(); ++p) { + if (*p == CRUSH_ITEM_NONE) + continue; + upacting_features &= osdmap->get_xinfo(*p).features; + } + psdout(20) << __func__ << " upacting_features 0x" << std::hex + << upacting_features << std::dec + << " from " << acting << "+" << up << dendl; + + psdout(20) << __func__ << " checking missing set deletes flag. missing = " + << get_pg_log().get_missing() << dendl; + + if (!pg_log.get_missing().may_include_deletes && + !perform_deletes_during_peering()) { + pl->rebuild_missing_set_with_deletes(pg_log); + } + ceph_assert( + pg_log.get_missing().may_include_deletes == + !perform_deletes_during_peering()); + + init_hb_stamps(); + + // update lease bounds for a new interval + auto mnow = pl->get_mnow(); + prior_readable_until_ub = std::max(prior_readable_until_ub, + readable_until_ub); + prior_readable_until_ub = info.history.refresh_prior_readable_until_ub( + mnow, prior_readable_until_ub); + psdout(10) << __func__ << " prior_readable_until_ub " + << prior_readable_until_ub << " (mnow " << mnow << " + " + << info.history.prior_readable_until_ub << ")" << dendl; + prior_readable_down_osds.clear(); // we populate this when we build the priorset + + readable_until = + readable_until_ub = + readable_until_ub_sent = + readable_until_ub_from_primary = ceph::signedspan::zero(); + + acting_readable_until_ub.clear(); + if (is_primary()) { + acting_readable_until_ub.resize(acting.size(), ceph::signedspan::zero()); + } + + pl->on_new_interval(); +} + +void PeeringState::init_primary_up_acting( + const vector<int> &newup, + const vector<int> &newacting, + int new_up_primary, + int new_acting_primary) +{ + actingset.clear(); + acting = newacting; + for (uint8_t i = 0; i < acting.size(); ++i) { + if (acting[i] != CRUSH_ITEM_NONE) + actingset.insert( + pg_shard_t( + acting[i], + pool.info.is_erasure() ? shard_id_t(i) : shard_id_t::NO_SHARD)); + } + upset.clear(); + up = newup; + for (uint8_t i = 0; i < up.size(); ++i) { + if (up[i] != CRUSH_ITEM_NONE) + upset.insert( + pg_shard_t( + up[i], + pool.info.is_erasure() ? 
shard_id_t(i) : shard_id_t::NO_SHARD)); + } + if (!pool.info.is_erasure()) { + // replicated + up_primary = pg_shard_t(new_up_primary, shard_id_t::NO_SHARD); + primary = pg_shard_t(new_acting_primary, shard_id_t::NO_SHARD); + } else { + // erasure + up_primary = pg_shard_t(); + primary = pg_shard_t(); + for (uint8_t i = 0; i < up.size(); ++i) { + if (up[i] == new_up_primary) { + up_primary = pg_shard_t(up[i], shard_id_t(i)); + break; + } + } + for (uint8_t i = 0; i < acting.size(); ++i) { + if (acting[i] == new_acting_primary) { + primary = pg_shard_t(acting[i], shard_id_t(i)); + break; + } + } + ceph_assert(up_primary.osd == new_up_primary); + ceph_assert(primary.osd == new_acting_primary); + } +} + +void PeeringState::init_hb_stamps() +{ + if (is_primary()) { + // we care about all other osds in the acting set + hb_stamps.resize(acting.size() - 1); + unsigned i = 0; + for (auto p : acting) { + if (p == CRUSH_ITEM_NONE || p == get_primary().osd) { + continue; + } + hb_stamps[i++] = pl->get_hb_stamps(p); + } + hb_stamps.resize(i); + } else if (is_nonprimary()) { + // we care about just the primary + hb_stamps.resize(1); + hb_stamps[0] = pl->get_hb_stamps(get_primary().osd); + } else { + hb_stamps.clear(); + } + dout(10) << __func__ << " now " << hb_stamps << dendl; +} + + +void PeeringState::clear_recovery_state() +{ + async_recovery_targets.clear(); + backfill_targets.clear(); +} + +void PeeringState::clear_primary_state() +{ + psdout(10) << "clear_primary_state" << dendl; + + // clear peering state + stray_set.clear(); + peer_log_requested.clear(); + peer_missing_requested.clear(); + peer_info.clear(); + peer_bytes.clear(); + peer_missing.clear(); + peer_last_complete_ondisk.clear(); + peer_activated.clear(); + min_last_complete_ondisk = eversion_t(); + pg_trim_to = eversion_t(); + might_have_unfound.clear(); + need_up_thru = false; + missing_loc.clear(); + pg_log.reset_recovery_pointers(); + + clear_recovery_state(); + + last_update_ondisk = eversion_t(); + missing_loc.clear(); + pl->clear_primary_state(); +} + +/// return [start,end) bounds for required past_intervals +static pair<epoch_t, epoch_t> get_required_past_interval_bounds( + const pg_info_t &info, + epoch_t oldest_map) { + epoch_t start = std::max( + info.history.last_epoch_clean ? info.history.last_epoch_clean : + info.history.epoch_pool_created, + oldest_map); + epoch_t end = std::max( + info.history.same_interval_since, + info.history.epoch_pool_created); + return make_pair(start, end); +} + + +void PeeringState::check_past_interval_bounds() const +{ + // cluster_osdmap_trim_lower_bound gives us a bound on needed + // intervals, see doc/dev/osd_internals/past_intervals.rst + auto oldest_epoch = pl->cluster_osdmap_trim_lower_bound(); + auto rpib = get_required_past_interval_bounds( + info, + oldest_epoch); + if (rpib.first >= rpib.second) { + // do not warn if the start bound is dictated by oldest_map; the + // past intervals are presumably appropriate given the pg info. 
+ if (!past_intervals.empty() && + rpib.first > oldest_epoch) { + pl->get_clog_error() << info.pgid << " required past_interval bounds are" + << " empty [" << rpib << ") but past_intervals is not: " + << past_intervals; + derr << info.pgid << " required past_interval bounds are" + << " empty [" << rpib << ") but past_intervals is not: " + << past_intervals << dendl; + } + } else { + if (past_intervals.empty()) { + pl->get_clog_error() << info.pgid << " required past_interval bounds are" + << " not empty [" << rpib << ") but past_intervals " + << past_intervals << " is empty"; + derr << info.pgid << " required past_interval bounds are" + << " not empty [" << rpib << ") but past_intervals " + << past_intervals << " is empty" << dendl; + ceph_assert(!past_intervals.empty()); + } + + auto apib = past_intervals.get_bounds(); + if (apib.first > rpib.first) { + pl->get_clog_error() << info.pgid << " past_intervals [" << apib + << ") start interval does not contain the required" + << " bound [" << rpib << ") start"; + derr << info.pgid << " past_intervals [" << apib + << ") start interval does not contain the required" + << " bound [" << rpib << ") start" << dendl; + ceph_abort_msg("past_interval start interval mismatch"); + } + if (apib.second != rpib.second) { + pl->get_clog_error() << info.pgid << " past_interal bound [" << apib + << ") end does not match required [" << rpib + << ") end"; + derr << info.pgid << " past_interal bound [" << apib + << ") end does not match required [" << rpib + << ") end" << dendl; + ceph_abort_msg("past_interval end mismatch"); + } + } +} + +int PeeringState::clamp_recovery_priority(int priority, int pool_recovery_priority, int max) +{ + static_assert(OSD_RECOVERY_PRIORITY_MIN < OSD_RECOVERY_PRIORITY_MAX, "Invalid priority range"); + static_assert(OSD_RECOVERY_PRIORITY_MIN >= 0, "Priority range must match unsigned type"); + + ceph_assert(max <= OSD_RECOVERY_PRIORITY_MAX); + + // User can't set this too high anymore, but might be a legacy value + if (pool_recovery_priority > OSD_POOL_PRIORITY_MAX) + pool_recovery_priority = OSD_POOL_PRIORITY_MAX; + if (pool_recovery_priority < OSD_POOL_PRIORITY_MIN) + pool_recovery_priority = OSD_POOL_PRIORITY_MIN; + // Shift range from min to max to 0 to max - min + pool_recovery_priority += (0 - OSD_POOL_PRIORITY_MIN); + ceph_assert(pool_recovery_priority >= 0 && pool_recovery_priority <= (OSD_POOL_PRIORITY_MAX - OSD_POOL_PRIORITY_MIN)); + + priority += pool_recovery_priority; + + // Clamp to valid range + return std::clamp<int>(priority, OSD_RECOVERY_PRIORITY_MIN, max); +} + +unsigned PeeringState::get_recovery_priority() +{ + // a higher value -> a higher priority + int ret = OSD_RECOVERY_PRIORITY_BASE; + int base = ret; + + if (state & PG_STATE_FORCED_RECOVERY) { + ret = OSD_RECOVERY_PRIORITY_FORCED; + } else { + // XXX: This priority boost isn't so much about inactive, but about data-at-risk + if (is_degraded() && info.stats.avail_no_missing.size() < pool.info.min_size) { + base = OSD_RECOVERY_INACTIVE_PRIORITY_BASE; + // inactive: no. 
of replicas < min_size, highest priority since it blocks IO + ret = base + (pool.info.min_size - info.stats.avail_no_missing.size()); + } + + int64_t pool_recovery_priority = 0; + pool.info.opts.get(pool_opts_t::RECOVERY_PRIORITY, &pool_recovery_priority); + + ret = clamp_recovery_priority(ret, pool_recovery_priority, max_prio_map[base]); + } + psdout(20) << __func__ << " recovery priority is " << ret << dendl; + return static_cast<unsigned>(ret); +} + +unsigned PeeringState::get_backfill_priority() +{ + // a higher value -> a higher priority + int ret = OSD_BACKFILL_PRIORITY_BASE; + int base = ret; + + if (state & PG_STATE_FORCED_BACKFILL) { + ret = OSD_BACKFILL_PRIORITY_FORCED; + } else { + if (actingset.size() < pool.info.min_size) { + base = OSD_BACKFILL_INACTIVE_PRIORITY_BASE; + // inactive: no. of replicas < min_size, highest priority since it blocks IO + ret = base + (pool.info.min_size - actingset.size()); + + } else if (is_undersized()) { + // undersized: OSD_BACKFILL_DEGRADED_PRIORITY_BASE + num missing replicas + ceph_assert(pool.info.size > actingset.size()); + base = OSD_BACKFILL_DEGRADED_PRIORITY_BASE; + ret = base + (pool.info.size - actingset.size()); + + } else if (is_degraded()) { + // degraded: baseline degraded + base = ret = OSD_BACKFILL_DEGRADED_PRIORITY_BASE; + } + + // Adjust with pool's recovery priority + int64_t pool_recovery_priority = 0; + pool.info.opts.get(pool_opts_t::RECOVERY_PRIORITY, &pool_recovery_priority); + + ret = clamp_recovery_priority(ret, pool_recovery_priority, max_prio_map[base]); + } + + psdout(20) << __func__ << " backfill priority is " << ret << dendl; + return static_cast<unsigned>(ret); +} + +unsigned PeeringState::get_delete_priority() +{ + auto state = get_osdmap()->get_state(pg_whoami.osd); + if (state & (CEPH_OSD_BACKFILLFULL | + CEPH_OSD_FULL)) { + return OSD_DELETE_PRIORITY_FULL; + } else if (state & CEPH_OSD_NEARFULL) { + return OSD_DELETE_PRIORITY_FULLISH; + } else { + return OSD_DELETE_PRIORITY_NORMAL; + } +} + +bool PeeringState::set_force_recovery(bool b) +{ + bool did = false; + if (b) { + if (!(state & PG_STATE_FORCED_RECOVERY) && + (state & (PG_STATE_DEGRADED | + PG_STATE_RECOVERY_WAIT | + PG_STATE_RECOVERING))) { + psdout(20) << __func__ << " set" << dendl; + state_set(PG_STATE_FORCED_RECOVERY); + pl->publish_stats_to_osd(); + did = true; + } + } else if (state & PG_STATE_FORCED_RECOVERY) { + psdout(20) << __func__ << " clear" << dendl; + state_clear(PG_STATE_FORCED_RECOVERY); + pl->publish_stats_to_osd(); + did = true; + } + if (did) { + psdout(20) << __func__ << " state " << get_current_state() + << dendl; + pl->update_local_background_io_priority(get_recovery_priority()); + } + return did; +} + +bool PeeringState::set_force_backfill(bool b) +{ + bool did = false; + if (b) { + if (!(state & PG_STATE_FORCED_BACKFILL) && + (state & (PG_STATE_DEGRADED | + PG_STATE_BACKFILL_WAIT | + PG_STATE_BACKFILLING))) { + psdout(10) << __func__ << " set" << dendl; + state_set(PG_STATE_FORCED_BACKFILL); + pl->publish_stats_to_osd(); + did = true; + } + } else if (state & PG_STATE_FORCED_BACKFILL) { + psdout(10) << __func__ << " clear" << dendl; + state_clear(PG_STATE_FORCED_BACKFILL); + pl->publish_stats_to_osd(); + did = true; + } + if (did) { + psdout(20) << __func__ << " state " << get_current_state() + << dendl; + pl->update_local_background_io_priority(get_backfill_priority()); + } + return did; +} + +void PeeringState::schedule_renew_lease() +{ + pl->schedule_renew_lease( + last_peering_reset, + readable_interval / 2); +} + +void 
PeeringState::send_lease() +{ + epoch_t epoch = pl->get_osdmap_epoch(); + for (auto peer : actingset) { + if (peer == pg_whoami) { + continue; + } + pl->send_cluster_message( + peer.osd, + TOPNSPC::make_message<MOSDPGLease>(epoch, + spg_t(spgid.pgid, peer.shard), + get_lease()), + epoch); + } +} + +void PeeringState::proc_lease(const pg_lease_t& l) +{ + assert(HAVE_FEATURE(upacting_features, SERVER_OCTOPUS)); + if (!is_nonprimary()) { + psdout(20) << __func__ << " no-op, !nonprimary" << dendl; + return; + } + psdout(10) << __func__ << " " << l << dendl; + if (l.readable_until_ub > readable_until_ub_from_primary) { + readable_until_ub_from_primary = l.readable_until_ub; + } + + ceph::signedspan ru = ceph::signedspan::zero(); + if (l.readable_until != ceph::signedspan::zero() && + hb_stamps[0]->peer_clock_delta_ub) { + ru = l.readable_until - *hb_stamps[0]->peer_clock_delta_ub; + psdout(20) << " peer_clock_delta_ub " << *hb_stamps[0]->peer_clock_delta_ub + << " -> ru " << ru << dendl; + } + if (ru > readable_until) { + readable_until = ru; + psdout(20) << __func__ << " readable_until now " << readable_until << dendl; + // NOTE: if we ever decide to block/queue ops on the replica, + // we'll need to wake them up here. + } + + ceph::signedspan ruub; + if (hb_stamps[0]->peer_clock_delta_lb) { + ruub = l.readable_until_ub - *hb_stamps[0]->peer_clock_delta_lb; + psdout(20) << " peer_clock_delta_lb " << *hb_stamps[0]->peer_clock_delta_lb + << " -> ruub " << ruub << dendl; + } else { + ruub = pl->get_mnow() + l.interval; + psdout(20) << " no peer_clock_delta_lb -> ruub " << ruub << dendl; + } + if (ruub > readable_until_ub) { + readable_until_ub = ruub; + psdout(20) << __func__ << " readable_until_ub now " << readable_until_ub + << dendl; + } +} + +void PeeringState::proc_lease_ack(int from, const pg_lease_ack_t& a) +{ + assert(HAVE_FEATURE(upacting_features, SERVER_OCTOPUS)); + auto now = pl->get_mnow(); + bool was_min = false; + for (unsigned i = 0; i < acting.size(); ++i) { + if (from == acting[i]) { + // the lease_ack value is based on the primary's clock + if (a.readable_until_ub > acting_readable_until_ub[i]) { + if (acting_readable_until_ub[i] == readable_until) { + was_min = true; + } + acting_readable_until_ub[i] = a.readable_until_ub; + break; + } + } + } + if (was_min) { + auto old_ru = readable_until; + recalc_readable_until(); + if (now >= old_ru) { + pl->recheck_readable(); + } + } +} + +void PeeringState::proc_renew_lease() +{ + assert(HAVE_FEATURE(upacting_features, SERVER_OCTOPUS)); + renew_lease(pl->get_mnow()); + send_lease(); + schedule_renew_lease(); +} + +void PeeringState::recalc_readable_until() +{ + assert(is_primary()); + ceph::signedspan min = readable_until_ub_sent; + for (unsigned i = 0; i < acting.size(); ++i) { + if (acting[i] == pg_whoami.osd || acting[i] == CRUSH_ITEM_NONE) { + continue; + } + dout(20) << __func__ << " peer osd." 
<< acting[i] + << " ruub " << acting_readable_until_ub[i] << dendl; + if (acting_readable_until_ub[i] < min) { + min = acting_readable_until_ub[i]; + } + } + readable_until = min; + readable_until_ub = min; + dout(20) << __func__ << " readable_until[_ub] " << readable_until + << " (sent " << readable_until_ub_sent << ")" << dendl; +} + +bool PeeringState::check_prior_readable_down_osds(const OSDMapRef& map) +{ + assert(HAVE_FEATURE(upacting_features, SERVER_OCTOPUS)); + bool changed = false; + auto p = prior_readable_down_osds.begin(); + while (p != prior_readable_down_osds.end()) { + if (map->is_dead(*p)) { + dout(10) << __func__ << " prior_readable_down_osds osd." << *p + << " is dead as of epoch " << map->get_epoch() + << dendl; + p = prior_readable_down_osds.erase(p); + changed = true; + } else { + ++p; + } + } + if (changed && prior_readable_down_osds.empty()) { + psdout(10) << " empty prior_readable_down_osds, clearing ub" << dendl; + clear_prior_readable_until_ub(); + return true; + } + return false; +} + +bool PeeringState::adjust_need_up_thru(const OSDMapRef osdmap) +{ + epoch_t up_thru = osdmap->get_up_thru(pg_whoami.osd); + if (need_up_thru && + up_thru >= info.history.same_interval_since) { + psdout(10) << "adjust_need_up_thru now " + << up_thru << ", need_up_thru now false" << dendl; + need_up_thru = false; + return true; + } + return false; +} + +PastIntervals::PriorSet PeeringState::build_prior() +{ + if (1) { + // sanity check + for (auto it = peer_info.begin(); it != peer_info.end(); ++it) { + ceph_assert(info.history.last_epoch_started >= + it->second.history.last_epoch_started); + } + } + + const OSDMap &osdmap = *get_osdmap(); + PastIntervals::PriorSet prior = past_intervals.get_prior_set( + pool.info.is_erasure(), + info.history.last_epoch_started, + &missing_loc.get_recoverable_predicate(), + [&](epoch_t start, int osd, epoch_t *lost_at) { + const osd_info_t *pinfo = 0; + if (osdmap.exists(osd)) { + pinfo = &osdmap.get_info(osd); + if (lost_at) + *lost_at = pinfo->lost_at; + } + + if (osdmap.is_up(osd)) { + return PastIntervals::UP; + } else if (!pinfo) { + return PastIntervals::DNE; + } else if (pinfo->lost_at > start) { + return PastIntervals::LOST; + } else { + return PastIntervals::DOWN; + } + }, + up, + acting, + dpp); + + if (prior.pg_down) { + state_set(PG_STATE_DOWN); + } + + if (get_osdmap()->get_up_thru(pg_whoami.osd) < + info.history.same_interval_since) { + psdout(10) << "up_thru " << get_osdmap()->get_up_thru(pg_whoami.osd) + << " < same_since " << info.history.same_interval_since + << ", must notify monitor" << dendl; + need_up_thru = true; + } else { + psdout(10) << "up_thru " << get_osdmap()->get_up_thru(pg_whoami.osd) + << " >= same_since " << info.history.same_interval_since + << ", all is well" << dendl; + need_up_thru = false; + } + pl->set_probe_targets(prior.probe); + return prior; +} + +bool PeeringState::needs_recovery() const +{ + ceph_assert(is_primary()); + + auto &missing = pg_log.get_missing(); + + if (missing.num_missing()) { + psdout(10) << __func__ << " primary has " << missing.num_missing() + << " missing" << dendl; + return true; + } + + ceph_assert(!acting_recovery_backfill.empty()); + for (const pg_shard_t& peer : acting_recovery_backfill) { + if (peer == get_primary()) { + continue; + } + auto pm = peer_missing.find(peer); + if (pm == peer_missing.end()) { + psdout(10) << __func__ << " osd." << peer << " doesn't have missing set" + << dendl; + continue; + } + if (pm->second.num_missing()) { + psdout(10) << __func__ << " osd." 
<< peer << " has " + << pm->second.num_missing() << " missing" << dendl; + return true; + } + } + + psdout(10) << __func__ << " is recovered" << dendl; + return false; +} + +bool PeeringState::needs_backfill() const +{ + ceph_assert(is_primary()); + + // We can assume that only possible osds that need backfill + // are on the backfill_targets vector nodes. + for (const pg_shard_t& peer : backfill_targets) { + auto pi = peer_info.find(peer); + ceph_assert(pi != peer_info.end()); + if (!pi->second.last_backfill.is_max()) { + psdout(10) << __func__ << " osd." << peer + << " has last_backfill " << pi->second.last_backfill << dendl; + return true; + } + } + + psdout(10) << __func__ << " does not need backfill" << dendl; + return false; +} + +/* + * Returns true unless there is a non-lost OSD in might_have_unfound. + */ +bool PeeringState::all_unfound_are_queried_or_lost( + const OSDMapRef osdmap) const +{ + ceph_assert(is_primary()); + + auto peer = might_have_unfound.begin(); + auto mend = might_have_unfound.end(); + for (; peer != mend; ++peer) { + if (peer_missing.count(*peer)) + continue; + auto iter = peer_info.find(*peer); + if (iter != peer_info.end() && + (iter->second.is_empty() || iter->second.dne())) + continue; + if (!osdmap->exists(peer->osd)) + continue; + const osd_info_t &osd_info(osdmap->get_info(peer->osd)); + if (osd_info.lost_at <= osd_info.up_from) { + // If there is even one OSD in might_have_unfound that isn't lost, we + // still might retrieve our unfound. + return false; + } + } + psdout(10) << "all_unfound_are_queried_or_lost all of might_have_unfound " + << might_have_unfound + << " have been queried or are marked lost" << dendl; + return true; +} + + +void PeeringState::reject_reservation() +{ + pl->unreserve_recovery_space(); + pl->send_cluster_message( + primary.osd, + TOPNSPC::make_message<MBackfillReserve>( + MBackfillReserve::REJECT_TOOFULL, + spg_t(info.pgid.pgid, primary.shard), + get_osdmap_epoch()), + get_osdmap_epoch()); +} + +/** + * find_best_info + * + * Returns an iterator to the best info in infos sorted by: + * 1) Prefer newer last_update + * 2) Prefer longer tail if it brings another info into contiguity + * 3) Prefer current primary + */ +map<pg_shard_t, pg_info_t>::const_iterator PeeringState::find_best_info( + const map<pg_shard_t, pg_info_t> &infos, + bool restrict_to_up_acting, + bool *history_les_bound) const +{ + ceph_assert(history_les_bound); + /* See doc/dev/osd_internals/last_epoch_started.rst before attempting + * to make changes to this process. Also, make sure to update it + * when you find bugs! 
*/ + epoch_t max_last_epoch_started_found = 0; + for (auto i = infos.begin(); i != infos.end(); ++i) { + if (!cct->_conf->osd_find_best_info_ignore_history_les && + max_last_epoch_started_found < i->second.history.last_epoch_started) { + *history_les_bound = true; + max_last_epoch_started_found = i->second.history.last_epoch_started; + } + if (!i->second.is_incomplete() && + max_last_epoch_started_found < i->second.last_epoch_started) { + *history_les_bound = false; + max_last_epoch_started_found = i->second.last_epoch_started; + } + } + eversion_t min_last_update_acceptable = eversion_t::max(); + for (auto i = infos.begin(); i != infos.end(); ++i) { + if (max_last_epoch_started_found <= i->second.last_epoch_started) { + if (min_last_update_acceptable > i->second.last_update) + min_last_update_acceptable = i->second.last_update; + } + } + if (min_last_update_acceptable == eversion_t::max()) + return infos.end(); + + auto best = infos.end(); + // find osd with newest last_update (oldest for ec_pool). + // if there are multiples, prefer + // - a longer tail, if it brings another peer into log contiguity + // - the current primary + for (auto p = infos.begin(); p != infos.end(); ++p) { + if (restrict_to_up_acting && !is_up(p->first) && + !is_acting(p->first)) + continue; + // Only consider peers with last_update >= min_last_update_acceptable + if (p->second.last_update < min_last_update_acceptable) + continue; + // Disqualify anyone with a too old last_epoch_started + if (p->second.last_epoch_started < max_last_epoch_started_found) + continue; + // Disqualify anyone who is incomplete (not fully backfilled) + if (p->second.is_incomplete()) + continue; + if (best == infos.end()) { + best = p; + continue; + } + // Prefer newer last_update + if (pool.info.require_rollback()) { + if (p->second.last_update > best->second.last_update) + continue; + if (p->second.last_update < best->second.last_update) { + best = p; + continue; + } + } else { + if (p->second.last_update < best->second.last_update) + continue; + if (p->second.last_update > best->second.last_update) { + best = p; + continue; + } + } + + // Prefer longer tail + if (p->second.log_tail > best->second.log_tail) { + continue; + } else if (p->second.log_tail < best->second.log_tail) { + best = p; + continue; + } + + if (!p->second.has_missing() && best->second.has_missing()) { + psdout(10) << __func__ << " prefer osd." << p->first + << " because it is complete while best has missing" + << dendl; + best = p; + continue; + } else if (p->second.has_missing() && !best->second.has_missing()) { + psdout(10) << __func__ << " skipping osd." << p->first + << " because it has missing while best is complete" + << dendl; + continue; + } else { + // both are complete or have missing + // fall through + } + + // prefer current primary (usually the caller), all things being equal + if (p->first == pg_whoami) { + psdout(10) << "calc_acting prefer osd." 
<< p->first + << " because it is current primary" << dendl; + best = p; + continue; + } + } + return best; +} + +void PeeringState::calc_ec_acting( + map<pg_shard_t, pg_info_t>::const_iterator auth_log_shard, + unsigned size, + const vector<int> &acting, + const vector<int> &up, + const map<pg_shard_t, pg_info_t> &all_info, + bool restrict_to_up_acting, + vector<int> *_want, + set<pg_shard_t> *backfill, + set<pg_shard_t> *acting_backfill, + ostream &ss) +{ + vector<int> want(size, CRUSH_ITEM_NONE); + map<shard_id_t, set<pg_shard_t> > all_info_by_shard; + for (auto i = all_info.begin(); + i != all_info.end(); + ++i) { + all_info_by_shard[i->first.shard].insert(i->first); + } + for (uint8_t i = 0; i < want.size(); ++i) { + ss << "For position " << (unsigned)i << ": "; + if (up.size() > (unsigned)i && up[i] != CRUSH_ITEM_NONE && + !all_info.find(pg_shard_t(up[i], shard_id_t(i)))->second.is_incomplete() && + all_info.find(pg_shard_t(up[i], shard_id_t(i)))->second.last_update >= + auth_log_shard->second.log_tail) { + ss << " selecting up[i]: " << pg_shard_t(up[i], shard_id_t(i)) << std::endl; + want[i] = up[i]; + continue; + } + if (up.size() > (unsigned)i && up[i] != CRUSH_ITEM_NONE) { + ss << " backfilling up[i]: " << pg_shard_t(up[i], shard_id_t(i)) + << " and "; + backfill->insert(pg_shard_t(up[i], shard_id_t(i))); + } + + if (acting.size() > (unsigned)i && acting[i] != CRUSH_ITEM_NONE && + !all_info.find(pg_shard_t(acting[i], shard_id_t(i)))->second.is_incomplete() && + all_info.find(pg_shard_t(acting[i], shard_id_t(i)))->second.last_update >= + auth_log_shard->second.log_tail) { + ss << " selecting acting[i]: " << pg_shard_t(acting[i], shard_id_t(i)) << std::endl; + want[i] = acting[i]; + } else if (!restrict_to_up_acting) { + for (auto j = all_info_by_shard[shard_id_t(i)].begin(); + j != all_info_by_shard[shard_id_t(i)].end(); + ++j) { + ceph_assert(j->shard == i); + if (!all_info.find(*j)->second.is_incomplete() && + all_info.find(*j)->second.last_update >= + auth_log_shard->second.log_tail) { + ss << " selecting stray: " << *j << std::endl; + want[i] = j->osd; + break; + } + } + if (want[i] == CRUSH_ITEM_NONE) + ss << " failed to fill position " << (int)i << std::endl; + } + } + + for (uint8_t i = 0; i < want.size(); ++i) { + if (want[i] != CRUSH_ITEM_NONE) { + acting_backfill->insert(pg_shard_t(want[i], shard_id_t(i))); + } + } + acting_backfill->insert(backfill->begin(), backfill->end()); + _want->swap(want); +} + +std::pair<map<pg_shard_t, pg_info_t>::const_iterator, eversion_t> +PeeringState::select_replicated_primary( + map<pg_shard_t, pg_info_t>::const_iterator auth_log_shard, + uint64_t force_auth_primary_missing_objects, + const std::vector<int> &up, + pg_shard_t up_primary, + const map<pg_shard_t, pg_info_t> &all_info, + const OSDMapRef osdmap, + ostream &ss) +{ + pg_shard_t auth_log_shard_id = auth_log_shard->first; + + ss << __func__ << " newest update on osd." 
<< auth_log_shard_id + << " with " << auth_log_shard->second << std::endl; + + // select primary + auto primary = all_info.find(up_primary); + if (up.size() && + !primary->second.is_incomplete() && + primary->second.last_update >= + auth_log_shard->second.log_tail) { + assert(HAVE_FEATURE(osdmap->get_up_osd_features(), SERVER_NAUTILUS)); + auto approx_missing_objects = + primary->second.stats.stats.sum.num_objects_missing; + auto auth_version = auth_log_shard->second.last_update.version; + auto primary_version = primary->second.last_update.version; + if (auth_version > primary_version) { + approx_missing_objects += auth_version - primary_version; + } else { + approx_missing_objects += primary_version - auth_version; + } + if ((uint64_t)approx_missing_objects > + force_auth_primary_missing_objects) { + primary = auth_log_shard; + ss << "up_primary: " << up_primary << ") has approximate " + << approx_missing_objects + << "(>" << force_auth_primary_missing_objects <<") " + << "missing objects, osd." << auth_log_shard_id + << " selected as primary instead" + << std::endl; + } else { + ss << "up_primary: " << up_primary << ") selected as primary" + << std::endl; + } + } else { + ceph_assert(!auth_log_shard->second.is_incomplete()); + ss << "up[0] needs backfill, osd." << auth_log_shard_id + << " selected as primary instead" << std::endl; + primary = auth_log_shard; + } + + ss << __func__ << " primary is osd." << primary->first + << " with " << primary->second << std::endl; + + /* We include auth_log_shard->second.log_tail because in GetLog, + * we will request logs back to the min last_update over our + * acting_backfill set, which will result in our log being extended + * as far backwards as necessary to pick up any peers which can + * be log recovered by auth_log_shard's log */ + eversion_t oldest_auth_log_entry = + std::min(primary->second.log_tail, auth_log_shard->second.log_tail); + + return std::make_pair(primary, oldest_auth_log_entry); +} + + +/** + * calculate the desired acting set. + * + * Choose an appropriate acting set. Prefer up[0], unless it is + * incomplete, or another osd has a longer tail that allows us to + * bring other up nodes up to date. + */ +void PeeringState::calc_replicated_acting( + map<pg_shard_t, pg_info_t>::const_iterator primary, + eversion_t oldest_auth_log_entry, + unsigned size, + const vector<int> &acting, + const vector<int> &up, + pg_shard_t up_primary, + const map<pg_shard_t, pg_info_t> &all_info, + bool restrict_to_up_acting, + vector<int> *want, + set<pg_shard_t> *backfill, + set<pg_shard_t> *acting_backfill, + const OSDMapRef osdmap, + const PGPool& pool, + ostream &ss) +{ + ss << __func__ << (restrict_to_up_acting ? " restrict_to_up_acting" : "") + << std::endl; + + want->push_back(primary->first.osd); + acting_backfill->insert(primary->first); + + // select replicas that have log contiguity with primary. + // prefer up, then acting, then any peer_info osds + for (auto i : up) { + pg_shard_t up_cand = pg_shard_t(i, shard_id_t::NO_SHARD); + if (up_cand == primary->first) + continue; + const pg_info_t &cur_info = all_info.find(up_cand)->second; + if (cur_info.is_incomplete() || + cur_info.last_update < oldest_auth_log_entry) { + ss << " shard " << up_cand << " (up) backfill " << cur_info << std::endl; + backfill->insert(up_cand); + acting_backfill->insert(up_cand); + } else { + want->push_back(i); + acting_backfill->insert(up_cand); + ss << " osd." 
<< i << " (up) accepted " << cur_info << std::endl; + } + } + + if (want->size() >= size) { + return; + } + + std::vector<std::pair<eversion_t, int>> candidate_by_last_update; + candidate_by_last_update.reserve(acting.size()); + // This no longer has backfill OSDs, but they are covered above. + for (auto i : acting) { + pg_shard_t acting_cand(i, shard_id_t::NO_SHARD); + // skip up osds we already considered above + if (acting_cand == primary->first) + continue; + auto up_it = find(up.begin(), up.end(), i); + if (up_it != up.end()) + continue; + + const pg_info_t &cur_info = all_info.find(acting_cand)->second; + if (cur_info.is_incomplete() || + cur_info.last_update < oldest_auth_log_entry) { + ss << " shard " << acting_cand << " (acting) REJECTED " + << cur_info << std::endl; + } else { + candidate_by_last_update.emplace_back(cur_info.last_update, i); + } + } + + auto sort_by_eversion =[](const std::pair<eversion_t, int> &lhs, + const std::pair<eversion_t, int> &rhs) { + return lhs.first > rhs.first; + }; + // sort by last_update, in descending order. + std::sort(candidate_by_last_update.begin(), + candidate_by_last_update.end(), sort_by_eversion); + for (auto &p: candidate_by_last_update) { + ceph_assert(want->size() < size); + want->push_back(p.second); + pg_shard_t s = pg_shard_t(p.second, shard_id_t::NO_SHARD); + acting_backfill->insert(s); + ss << " shard " << s << " (acting) accepted " + << all_info.find(s)->second << std::endl; + if (want->size() >= size) { + return; + } + } + + if (restrict_to_up_acting) { + return; + } + candidate_by_last_update.clear(); + candidate_by_last_update.reserve(all_info.size()); // overestimate but fine + // continue to search stray to find more suitable peers + for (auto &i : all_info) { + // skip up osds we already considered above + if (i.first == primary->first) + continue; + auto up_it = find(up.begin(), up.end(), i.first.osd); + if (up_it != up.end()) + continue; + auto acting_it = find( + acting.begin(), acting.end(), i.first.osd); + if (acting_it != acting.end()) + continue; + + if (i.second.is_incomplete() || + i.second.last_update < oldest_auth_log_entry) { + ss << " shard " << i.first << " (stray) REJECTED " << i.second + << std::endl; + } else { + candidate_by_last_update.emplace_back( + i.second.last_update, i.first.osd); + } + } + + if (candidate_by_last_update.empty()) { + // save us some effort + return; + } + + // sort by last_update, in descending order. 
+ std::sort(candidate_by_last_update.begin(), + candidate_by_last_update.end(), sort_by_eversion); + + for (auto &p: candidate_by_last_update) { + ceph_assert(want->size() < size); + want->push_back(p.second); + pg_shard_t s = pg_shard_t(p.second, shard_id_t::NO_SHARD); + acting_backfill->insert(s); + ss << " shard " << s << " (stray) accepted " + << all_info.find(s)->second << std::endl; + if (want->size() >= size) { + return; + } + } +} + +// Defines osd preference order: acting set, then larger last_update +using osd_ord_t = std::tuple<bool, eversion_t>; // <acting, last_update> +using osd_id_t = int; + +class bucket_candidates_t { + std::deque<std::pair<osd_ord_t, osd_id_t>> osds; + int selected = 0; + +public: + void add_osd(osd_ord_t ord, osd_id_t osd) { + // osds will be added in smallest to largest order + assert(osds.empty() || osds.back().first <= ord); + osds.push_back(std::make_pair(ord, osd)); + } + osd_id_t pop_osd() { + ceph_assert(!is_empty()); + auto ret = osds.front(); + osds.pop_front(); + return ret.second; + } + + void inc_selected() { selected++; } + unsigned get_num_selected() const { return selected; } + + osd_ord_t get_ord() const { + return osds.empty() ? std::make_tuple(false, eversion_t()) + : osds.front().first; + } + + bool is_empty() const { return osds.empty(); } + + bool operator<(const bucket_candidates_t &rhs) const { + return std::make_tuple(-selected, get_ord()) < + std::make_tuple(-rhs.selected, rhs.get_ord()); + } + + friend std::ostream &operator<<(std::ostream &, const bucket_candidates_t &); +}; + +std::ostream &operator<<(std::ostream &lhs, const bucket_candidates_t &cand) +{ + return lhs << "candidates[" << cand.osds << "]"; +} + +class bucket_heap_t { + using elem_t = std::reference_wrapper<bucket_candidates_t>; + std::vector<elem_t> heap; + + // Max heap -- should emit buckets in order of preference + struct comp { + bool operator()(const elem_t &lhs, const elem_t &rhs) { + return lhs.get() < rhs.get(); + } + }; +public: + void push_if_nonempty(elem_t e) { + if (!e.get().is_empty()) { + heap.push_back(e); + std::push_heap(heap.begin(), heap.end(), comp()); + } + } + elem_t pop() { + std::pop_heap(heap.begin(), heap.end(), comp()); + auto ret = heap.back(); + heap.pop_back(); + return ret; + } + + bool is_empty() const { return heap.empty(); } +}; + +/** + * calc_replicated_acting_stretch + * + * Choose an acting set using as much of the up set as possible; filling + * in the remaining slots so as to maximize the number of crush buckets at + * level pool.info.peering_crush_bucket_barrier represented. + * + * Stretch clusters are a bit special: while they have a "size" the + * same way as normal pools, if we happen to lose a data center + * (we call it a "stretch bucket", but really it'll be a data center or + * a cloud availability zone), we don't actually want to shove + * 2 DC's worth of replication into a single site -- it won't fit! + * So we locally calculate a bucket_max, based + * on the targeted number of stretch buckets for the pool and + * its size. Then we won't pull more than bucket_max from any + * given ancestor even if it leaves us undersized. 
+ + * There are two distinct phases: (commented below) + */ +void PeeringState::calc_replicated_acting_stretch( + map<pg_shard_t, pg_info_t>::const_iterator primary, + eversion_t oldest_auth_log_entry, + unsigned size, + const vector<int> &acting, + const vector<int> &up, + pg_shard_t up_primary, + const map<pg_shard_t, pg_info_t> &all_info, + bool restrict_to_up_acting, + vector<int> *want, + set<pg_shard_t> *backfill, + set<pg_shard_t> *acting_backfill, + const OSDMapRef osdmap, + const PGPool& pool, + ostream &ss) +{ + ceph_assert(want); + ceph_assert(acting_backfill); + ceph_assert(backfill); + ss << __func__ << (restrict_to_up_acting ? " restrict_to_up_acting" : "") + << std::endl; + + auto used = [want](int osd) { + return std::find(want->begin(), want->end(), osd) != want->end(); + }; + + auto usable_info = [&](const auto &cur_info) mutable { + return !(cur_info.is_incomplete() || + cur_info.last_update < oldest_auth_log_entry); + }; + + auto osd_info = [&](int osd) mutable -> const pg_info_t & { + pg_shard_t cand = pg_shard_t(osd, shard_id_t::NO_SHARD); + const pg_info_t &cur_info = all_info.find(cand)->second; + return cur_info; + }; + + auto usable_osd = [&](int osd) mutable { + return usable_info(osd_info(osd)); + }; + + std::map<int, bucket_candidates_t> ancestors; + auto get_ancestor = [&](int osd) mutable { + int ancestor = osdmap->crush->get_parent_of_type( + osd, + pool.info.peering_crush_bucket_barrier, + pool.info.crush_rule); + return &ancestors[ancestor]; + }; + + unsigned bucket_max = pool.info.size / pool.info.peering_crush_bucket_target; + if (bucket_max * pool.info.peering_crush_bucket_target < pool.info.size) { + ++bucket_max; + } + + /* 1) Select all usable osds from the up set as well as the primary + * + * We also stash any unusable osds from up into backfill. + */ + auto add_required = [&](int osd) { + if (!used(osd)) { + want->push_back(osd); + acting_backfill->insert( + pg_shard_t(osd, shard_id_t::NO_SHARD)); + get_ancestor(osd)->inc_selected(); + } + }; + add_required(primary->first.osd); + ss << " osd " << primary->first.osd << " primary accepted " + << osd_info(primary->first.osd) << std::endl; + for (auto upcand: up) { + auto upshard = pg_shard_t(upcand, shard_id_t::NO_SHARD); + auto &curinfo = osd_info(upcand); + if (usable_osd(upcand)) { + ss << " osd " << upcand << " (up) accepted " << curinfo << std::endl; + add_required(upcand); + } else { + ss << " osd " << upcand << " (up) backfill " << curinfo << std::endl; + backfill->insert(upshard); + acting_backfill->insert(upshard); + } + } + + if (want->size() >= pool.info.size) { // non-failed CRUSH mappings are valid + ss << " up set sufficient" << std::endl; + return; + } + ss << " up set insufficient, considering remaining osds" << std::endl; + + /* 2) Fill out remaining slots from usable osds in all_info + * while maximizing the number of ancestor nodes at the + * barrier_id crush level. 
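+ * For instance (hypothetical layout): if phase 1 selected two OSDs that both
+ * live under one bucket (say dc1) while a usable candidate exists under an
+ * unrepresented bucket (dc2), the heap below hands out the dc2 candidate
+ * first, because buckets with fewer already-selected OSDs sort ahead.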
+ */ + { + std::vector<std::pair<osd_ord_t, osd_id_t>> candidates; + /* To do this, we first filter the set of usable osd into an ordered + * list of usable osds + */ + auto get_osd_ord = [&](bool is_acting, const pg_info_t &info) -> osd_ord_t { + return std::make_tuple( + !is_acting /* acting should sort first */, + info.last_update); + }; + for (auto &cand : acting) { + auto &cand_info = osd_info(cand); + if (!used(cand) && usable_info(cand_info)) { + ss << " acting candidate " << cand << " " << cand_info << std::endl; + candidates.push_back(std::make_pair(get_osd_ord(true, cand_info), cand)); + } + } + if (!restrict_to_up_acting) { + for (auto &[cand, info] : all_info) { + if (!used(cand.osd) && usable_info(info) && + (std::find(acting.begin(), acting.end(), cand.osd) + == acting.end())) { + ss << " other candidate " << cand << " " << info << std::endl; + candidates.push_back( + std::make_pair(get_osd_ord(false, info), cand.osd)); + } + } + } + std::sort(candidates.begin(), candidates.end()); + + // We then filter these candidates by ancestor + std::for_each(candidates.begin(), candidates.end(), [&](auto cand) { + get_ancestor(cand.second)->add_osd(cand.first, cand.second); + }); + } + + auto pop_ancestor = [&](auto &ancestor) { + ceph_assert(!ancestor.is_empty()); + auto osd = ancestor.pop_osd(); + + ss << " accepting candidate " << osd << std::endl; + + ceph_assert(!used(osd)); + ceph_assert(usable_osd(osd)); + + want->push_back(osd); + acting_backfill->insert( + pg_shard_t(osd, shard_id_t::NO_SHARD)); + ancestor.inc_selected(); + }; + + /* Next, we use the ancestors map to grab a descendant of the + * peering_crush_mandatory_member if not already represented. + * + * TODO: using 0 here to match other users. Prior to merge, I + * expect that this and other users should instead check against + * CRUSH_ITEM_NONE. + */ + if (pool.info.peering_crush_mandatory_member != CRUSH_ITEM_NONE) { + auto aiter = ancestors.find(pool.info.peering_crush_mandatory_member); + if (aiter != ancestors.end() && + !aiter->second.get_num_selected()) { + ss << " adding required ancestor " << aiter->first << std::endl; + ceph_assert(!aiter->second.is_empty()); // wouldn't exist otherwise + pop_ancestor(aiter->second); + } + } + + /* We then place the ancestors in a heap ordered by fewest selected + * and then by the ordering token of the next osd */ + bucket_heap_t aheap; + std::for_each(ancestors.begin(), ancestors.end(), [&](auto &anc) { + aheap.push_if_nonempty(anc.second); + }); + + /* and pull from this heap until it's empty or we have enough. + * "We have enough" is a sufficient check here for + * stretch_set_can_peer() because our heap sorting always + * pulls from ancestors with the least number of included OSDs, + * so if it is possible to satisfy the bucket_count constraints we + * will do so. + */ + while (!aheap.is_empty() && want->size() < pool.info.size) { + auto next = aheap.pop(); + pop_ancestor(next.get()); + if (next.get().get_num_selected() < bucket_max) { + aheap.push_if_nonempty(next); + } + } + + /* The end result is that we should have as many buckets covered as + * possible while respecting up, the primary selection, + * the pool size (given bucket count constraints), + * and the mandatory member. 
+ */ +} + + +bool PeeringState::recoverable(const vector<int> &want) const +{ + unsigned num_want_acting = 0; + set<pg_shard_t> have; + for (int i = 0; i < (int)want.size(); ++i) { + if (want[i] != CRUSH_ITEM_NONE) { + ++num_want_acting; + have.insert( + pg_shard_t( + want[i], + pool.info.is_erasure() ? shard_id_t(i) : shard_id_t::NO_SHARD)); + } + } + + if (num_want_acting < pool.info.min_size) { + if (!cct->_conf.get_val<bool>("osd_allow_recovery_below_min_size")) { + psdout(10) << __func__ << " failed, recovery below min size not enabled" << dendl; + return false; + } + } + if (missing_loc.get_recoverable_predicate()(have)) { + return true; + } else { + psdout(10) << __func__ << " failed, not recoverable " << dendl; + return false; + } +} + +void PeeringState::choose_async_recovery_ec( + const map<pg_shard_t, pg_info_t> &all_info, + const pg_info_t &auth_info, + vector<int> *want, + set<pg_shard_t> *async_recovery, + const OSDMapRef osdmap) const +{ + set<pair<int, pg_shard_t> > candidates_by_cost; + for (uint8_t i = 0; i < want->size(); ++i) { + if ((*want)[i] == CRUSH_ITEM_NONE) + continue; + + // Considering log entries to recover is accurate enough for + // now. We could use minimum_to_decode_with_cost() later if + // necessary. + pg_shard_t shard_i((*want)[i], shard_id_t(i)); + // do not include strays + if (stray_set.find(shard_i) != stray_set.end()) + continue; + // Do not include an osd that is not up, since choosing it as + // an async_recovery_target will move it out of the acting set. + // This results in it being identified as a stray during peering, + // because it is no longer in the up or acting set. + if (!is_up(shard_i)) + continue; + auto shard_info = all_info.find(shard_i)->second; + // for ec pools we rollback all entries past the authoritative + // last_update *before* activation. This is relatively inexpensive + // compared to recovery, since it is purely local, so treat shards + // past the authoritative last_update the same as those equal to it. 
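+ // The async-recovery "cost" computed below is the shard's historical
+ // missing-object count plus how far its log version trails the
+ // authoritative shard. For example (hypothetical numbers): a shard at
+ // 120'380 with 5 missing objects, against an authoritative last_update of
+ // 120'400, costs roughly 5 + 20 = 25; it only becomes an async recovery
+ // target if that exceeds osd_async_recovery_min_cost.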
+ version_t auth_version = auth_info.last_update.version; + version_t candidate_version = shard_info.last_update.version; + assert(HAVE_FEATURE(osdmap->get_up_osd_features(), SERVER_NAUTILUS)); + auto approx_missing_objects = + shard_info.stats.stats.sum.num_objects_missing; + if (auth_version > candidate_version) { + approx_missing_objects += auth_version - candidate_version; + } + if (static_cast<uint64_t>(approx_missing_objects) > + cct->_conf.get_val<uint64_t>("osd_async_recovery_min_cost")) { + candidates_by_cost.emplace(approx_missing_objects, shard_i); + } + } + + psdout(20) << __func__ << " candidates by cost are: " << candidates_by_cost + << dendl; + + // take out as many osds as we can for async recovery, in order of cost + for (auto rit = candidates_by_cost.rbegin(); + rit != candidates_by_cost.rend(); ++rit) { + pg_shard_t cur_shard = rit->second; + vector<int> candidate_want(*want); + candidate_want[cur_shard.shard.id] = CRUSH_ITEM_NONE; + if (recoverable(candidate_want)) { + want->swap(candidate_want); + async_recovery->insert(cur_shard); + } + } + psdout(20) << __func__ << " result want=" << *want + << " async_recovery=" << *async_recovery << dendl; +} + +void PeeringState::choose_async_recovery_replicated( + const map<pg_shard_t, pg_info_t> &all_info, + const pg_info_t &auth_info, + vector<int> *want, + set<pg_shard_t> *async_recovery, + const OSDMapRef osdmap) const +{ + set<pair<int, pg_shard_t> > candidates_by_cost; + for (auto osd_num : *want) { + pg_shard_t shard_i(osd_num, shard_id_t::NO_SHARD); + // do not include strays + if (stray_set.find(shard_i) != stray_set.end()) + continue; + // Do not include an osd that is not up, since choosing it as + // an async_recovery_target will move it out of the acting set. + // This results in it being identified as a stray during peering, + // because it is no longer in the up or acting set. + if (!is_up(shard_i)) + continue; + auto shard_info = all_info.find(shard_i)->second; + // use the approximate magnitude of the difference in length of + // logs plus historical missing objects as the cost of recovery + version_t auth_version = auth_info.last_update.version; + version_t candidate_version = shard_info.last_update.version; + assert(HAVE_FEATURE(osdmap->get_up_osd_features(), SERVER_NAUTILUS)); + auto approx_missing_objects = + shard_info.stats.stats.sum.num_objects_missing; + if (auth_version > candidate_version) { + approx_missing_objects += auth_version - candidate_version; + } else { + approx_missing_objects += candidate_version - auth_version; + } + if (static_cast<uint64_t>(approx_missing_objects) > + cct->_conf.get_val<uint64_t>("osd_async_recovery_min_cost")) { + candidates_by_cost.emplace(approx_missing_objects, shard_i); + } + } + + psdout(20) << __func__ << " candidates by cost are: " << candidates_by_cost + << dendl; + // take out as many osds as we can for async recovery, in order of cost + for (auto rit = candidates_by_cost.rbegin(); + rit != candidates_by_cost.rend(); ++rit) { + if (want->size() <= pool.info.min_size) { + break; + } + pg_shard_t cur_shard = rit->second; + vector<int> candidate_want(*want); + for (auto it = candidate_want.begin(); it != candidate_want.end(); ++it) { + if (*it == cur_shard.osd) { + candidate_want.erase(it); + if (pool.info.stretch_set_can_peer(candidate_want, *osdmap, NULL)) { + // if we're in stretch mode, we can only remove the osd if it doesn't + // break peering limits. 
+ want->swap(candidate_want); + async_recovery->insert(cur_shard); + } + break; + } + } + } + + psdout(20) << __func__ << " result want=" << *want + << " async_recovery=" << *async_recovery << dendl; +} + +/** + * choose acting + * + * calculate the desired acting, and request a change with the monitor + * if it differs from the current acting. + * + * if restrict_to_up_acting=true, we filter out anything that's not in + * up/acting. in order to lift this restriction, we need to + * 1) check whether it's worth switching the acting set any time we get + * a new pg info (not just here, when recovery finishes) + * 2) check whether anything in want_acting went down on each new map + * (and, if so, calculate a new want_acting) + * 3) remove the assertion in PG::PeeringState::Active::react(const AdvMap) + * TODO! + */ +bool PeeringState::choose_acting(pg_shard_t &auth_log_shard_id, + bool restrict_to_up_acting, + bool *history_les_bound, + bool request_pg_temp_change_only) +{ + map<pg_shard_t, pg_info_t> all_info(peer_info.begin(), peer_info.end()); + all_info[pg_whoami] = info; + + if (cct->_conf->subsys.should_gather<dout_subsys, 10>()) { + for (auto p = all_info.begin(); p != all_info.end(); ++p) { + psdout(10) << __func__ << " all_info osd." << p->first << " " + << p->second << dendl; + } + } + + auto auth_log_shard = find_best_info(all_info, restrict_to_up_acting, + history_les_bound); + + if (auth_log_shard == all_info.end()) { + if (up != acting) { + psdout(10) << __func__ << " no suitable info found (incomplete backfills?)," + << " reverting to up" << dendl; + want_acting = up; + vector<int> empty; + pl->queue_want_pg_temp(empty); + } else { + psdout(10) << __func__ << " failed" << dendl; + ceph_assert(want_acting.empty()); + } + return false; + } + + ceph_assert(!auth_log_shard->second.is_incomplete()); + auth_log_shard_id = auth_log_shard->first; + + set<pg_shard_t> want_backfill, want_acting_backfill; + vector<int> want; + stringstream ss; + if (pool.info.is_replicated()) { + auto [primary_shard, oldest_log] = select_replicated_primary( + auth_log_shard, + cct->_conf.get_val<uint64_t>( + "osd_force_auth_primary_missing_objects"), + up, + up_primary, + all_info, + get_osdmap(), + ss); + if (pool.info.is_stretch_pool()) { + calc_replicated_acting_stretch( + primary_shard, + oldest_log, + get_osdmap()->get_pg_size(info.pgid.pgid), + acting, + up, + up_primary, + all_info, + restrict_to_up_acting, + &want, + &want_backfill, + &want_acting_backfill, + get_osdmap(), + pool, + ss); + } else { + calc_replicated_acting( + primary_shard, + oldest_log, + get_osdmap()->get_pg_size(info.pgid.pgid), + acting, + up, + up_primary, + all_info, + restrict_to_up_acting, + &want, + &want_backfill, + &want_acting_backfill, + get_osdmap(), + pool, + ss); + } + } else { + calc_ec_acting( + auth_log_shard, + get_osdmap()->get_pg_size(info.pgid.pgid), + acting, + up, + all_info, + restrict_to_up_acting, + &want, + &want_backfill, + &want_acting_backfill, + ss); + } + psdout(10) << ss.str() << dendl; + + if (!recoverable(want)) { + want_acting.clear(); + return false; + } + + set<pg_shard_t> want_async_recovery; + if (HAVE_FEATURE(get_osdmap()->get_up_osd_features(), SERVER_MIMIC)) { + if (pool.info.is_erasure()) { + choose_async_recovery_ec( + all_info, auth_log_shard->second, &want, &want_async_recovery, + get_osdmap()); + } else { + choose_async_recovery_replicated( + all_info, auth_log_shard->second, &want, &want_async_recovery, + get_osdmap()); + } + } + while (want.size() > pool.info.size) { + // async 
recovery should have taken out as many osds as it can. + // if not, then always evict the last peer + // (will get synchronously recovered later) + psdout(10) << __func__ << " evicting osd." << want.back() + << " from oversized want " << want << dendl; + want.pop_back(); + } + if (want != acting) { + psdout(10) << __func__ << " want " << want << " != acting " << acting + << ", requesting pg_temp change" << dendl; + want_acting = want; + + if (!cct->_conf->osd_debug_no_acting_change) { + if (want_acting == up) { + // There can't be any pending backfill if + // want is the same as crush map up OSDs. + ceph_assert(want_backfill.empty()); + vector<int> empty; + pl->queue_want_pg_temp(empty); + } else + pl->queue_want_pg_temp(want); + } + return false; + } + + if (request_pg_temp_change_only) + return true; + want_acting.clear(); + acting_recovery_backfill = want_acting_backfill; + psdout(10) << "acting_recovery_backfill is " + << acting_recovery_backfill << dendl; + ceph_assert( + backfill_targets.empty() || + backfill_targets == want_backfill); + if (backfill_targets.empty()) { + // Caller is GetInfo + backfill_targets = want_backfill; + } + // Adding !needs_recovery() to let the async_recovery_targets reset after recovery is complete + ceph_assert( + async_recovery_targets.empty() || + async_recovery_targets == want_async_recovery || + !needs_recovery()); + if (async_recovery_targets.empty() || !needs_recovery()) { + async_recovery_targets = want_async_recovery; + } + // Will not change if already set because up would have had to change + // Verify that nothing in backfill is in stray_set + for (auto i = want_backfill.begin(); i != want_backfill.end(); ++i) { + ceph_assert(stray_set.find(*i) == stray_set.end()); + } + psdout(10) << "choose_acting want=" << want << " backfill_targets=" + << want_backfill << " async_recovery_targets=" + << async_recovery_targets << dendl; + return true; +} + +void PeeringState::log_weirdness() +{ + if (pg_log.get_tail() != info.log_tail) + pl->get_clog_error() << info.pgid + << " info mismatch, log.tail " << pg_log.get_tail() + << " != info.log_tail " << info.log_tail; + if (pg_log.get_head() != info.last_update) + pl->get_clog_error() << info.pgid + << " info mismatch, log.head " << pg_log.get_head() + << " != info.last_update " << info.last_update; + + if (!pg_log.get_log().empty()) { + // sloppy check + if ((pg_log.get_log().log.begin()->version <= pg_log.get_tail())) + pl->get_clog_error() << info.pgid + << " log bound mismatch, info (tail,head] (" + << pg_log.get_tail() << "," + << pg_log.get_head() << "]" + << " actual [" + << pg_log.get_log().log.begin()->version << "," + << pg_log.get_log().log.rbegin()->version << "]"; + } + + if (pg_log.get_log().caller_ops.size() > pg_log.get_log().log.size()) { + pl->get_clog_error() << info.pgid + << " caller_ops.size " + << pg_log.get_log().caller_ops.size() + << " > log size " << pg_log.get_log().log.size(); + } +} + +/* + * Process information from a replica to determine if it could have any + * objects that i need. + * + * TODO: if the missing set becomes very large, this could get expensive. + * Instead, we probably want to just iterate over our unfound set. 
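+ * Below, add_source_info() checks whether this peer can supply any of the
+ * objects we still list as missing; if it found anything we republish stats,
+ * and (for a peer with a non-empty log) reply with an info message.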
+ */ +bool PeeringState::search_for_missing( + const pg_info_t &oinfo, const pg_missing_t &omissing, + pg_shard_t from, + PeeringCtxWrapper &ctx) +{ + uint64_t num_unfound_before = missing_loc.num_unfound(); + bool found_missing = missing_loc.add_source_info( + from, oinfo, omissing, ctx.handle); + if (found_missing && num_unfound_before != missing_loc.num_unfound()) + pl->publish_stats_to_osd(); + // avoid doing this if the peer is empty. This is abit of paranoia + // to avoid doing something rash if add_source_info() above + // incorrectly decided we found something new. (if the peer has + // last_update=0'0 that's impossible.) + if (found_missing && + oinfo.last_update != eversion_t()) { + pg_info_t tinfo(oinfo); + tinfo.pgid.shard = pg_whoami.shard; + ctx.send_info( + from.osd, + spg_t(info.pgid.pgid, from.shard), + get_osdmap_epoch(), // fixme: use lower epoch? + get_osdmap_epoch(), + tinfo); + } + return found_missing; +} + +bool PeeringState::discover_all_missing( + BufferedRecoveryMessages &rctx) +{ + auto &missing = pg_log.get_missing(); + uint64_t unfound = get_num_unfound(); + bool any = false; // did we start any queries + + psdout(10) << __func__ << " " + << missing.num_missing() << " missing, " + << unfound << " unfound" + << dendl; + + auto m = might_have_unfound.begin(); + auto mend = might_have_unfound.end(); + for (; m != mend; ++m) { + pg_shard_t peer(*m); + + if (!get_osdmap()->is_up(peer.osd)) { + psdout(20) << __func__ << " skipping down osd." << peer << dendl; + continue; + } + + if (peer_purged.count(peer)) { + psdout(20) << __func__ << " skipping purged osd." << peer << dendl; + continue; + } + + auto iter = peer_info.find(peer); + if (iter != peer_info.end() && + (iter->second.is_empty() || iter->second.dne())) { + // ignore empty peers + continue; + } + + // If we've requested any of this stuff, the pg_missing_t information + // should be on its way. + // TODO: coalsce requested_* into a single data structure + if (peer_missing.find(peer) != peer_missing.end()) { + psdout(20) << __func__ << ": osd." << peer + << ": we already have pg_missing_t" << dendl; + continue; + } + if (peer_log_requested.find(peer) != peer_log_requested.end()) { + psdout(20) << __func__ << ": osd." << peer + << ": in peer_log_requested" << dendl; + continue; + } + if (peer_missing_requested.find(peer) != peer_missing_requested.end()) { + psdout(20) << __func__ << ": osd." << peer + << ": in peer_missing_requested" << dendl; + continue; + } + + // Request missing + psdout(10) << __func__ << ": osd." << peer << ": requesting pg_missing_t" + << dendl; + peer_missing_requested.insert(peer); + rctx.send_query( + peer.osd, + spg_t(info.pgid.pgid, peer.shard), + pg_query_t( + pg_query_t::FULLLOG, + peer.shard, pg_whoami.shard, + info.history, get_osdmap_epoch())); + any = true; + } + return any; +} + +/* Build the might_have_unfound set. + * + * This is used by the primary OSD during recovery. + * + * This set tracks the OSDs which might have unfound objects that the primary + * OSD needs. As we receive pg_missing_t from each OSD in might_have_unfound, we + * will remove the OSD from the set. 
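+ * The set is seeded from past_intervals (any OSD that might have seen a
+ * write in a prior interval) plus every peer we currently hold info for,
+ * including strays; see build_might_have_unfound() below.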
+ */ +void PeeringState::build_might_have_unfound() +{ + ceph_assert(might_have_unfound.empty()); + ceph_assert(is_primary()); + + psdout(10) << __func__ << dendl; + + check_past_interval_bounds(); + + might_have_unfound = past_intervals.get_might_have_unfound( + pg_whoami, + pool.info.is_erasure()); + + // include any (stray) peers + for (auto p = peer_info.begin(); p != peer_info.end(); ++p) + might_have_unfound.insert(p->first); + + psdout(15) << __func__ << ": built " << might_have_unfound << dendl; +} + +void PeeringState::activate( + ObjectStore::Transaction& t, + epoch_t activation_epoch, + PeeringCtxWrapper &ctx) +{ + ceph_assert(!is_peered()); + + // twiddle pg state + state_clear(PG_STATE_DOWN); + + send_notify = false; + + if (is_primary()) { + // only update primary last_epoch_started if we will go active + if (acting_set_writeable()) { + ceph_assert(cct->_conf->osd_find_best_info_ignore_history_les || + info.last_epoch_started <= activation_epoch); + info.last_epoch_started = activation_epoch; + info.last_interval_started = info.history.same_interval_since; + } + } else if (is_acting(pg_whoami)) { + /* update last_epoch_started on acting replica to whatever the primary sent + * unless it's smaller (could happen if we are going peered rather than + * active, see doc/dev/osd_internals/last_epoch_started.rst) */ + if (info.last_epoch_started < activation_epoch) { + info.last_epoch_started = activation_epoch; + info.last_interval_started = info.history.same_interval_since; + } + } + + auto &missing = pg_log.get_missing(); + + min_last_complete_ondisk = eversion_t(0,0); // we don't know (yet)! + if (is_primary()) { + last_update_ondisk = info.last_update; + } + last_update_applied = info.last_update; + last_rollback_info_trimmed_to_applied = pg_log.get_can_rollback_to(); + + need_up_thru = false; + + // write pg info, log + dirty_info = true; + dirty_big_info = true; // maybe + + pl->schedule_event_on_commit( + t, + std::make_shared<PGPeeringEvent>( + get_osdmap_epoch(), + get_osdmap_epoch(), + ActivateCommitted( + get_osdmap_epoch(), + activation_epoch))); + + // init complete pointer + if (missing.num_missing() == 0) { + psdout(10) << "activate - no missing, moving last_complete " << info.last_complete + << " -> " << info.last_update << dendl; + info.last_complete = info.last_update; + info.stats.stats.sum.num_objects_missing = 0; + pg_log.reset_recovery_pointers(); + } else { + psdout(10) << "activate - not complete, " << missing << dendl; + info.stats.stats.sum.num_objects_missing = missing.num_missing(); + pg_log.activate_not_complete(info); + } + + log_weirdness(); + + if (is_primary()) { + // initialize snap_trimq + interval_set<snapid_t> to_trim; + auto& removed_snaps_queue = get_osdmap()->get_removed_snaps_queue(); + auto p = removed_snaps_queue.find(info.pgid.pgid.pool()); + if (p != removed_snaps_queue.end()) { + dout(20) << "activate - purged_snaps " << info.purged_snaps + << " removed_snaps " << p->second + << dendl; + for (auto q : p->second) { + to_trim.insert(q.first, q.second); + } + } + interval_set<snapid_t> purged; + purged.intersection_of(to_trim, info.purged_snaps); + to_trim.subtract(purged); + + assert(HAVE_FEATURE(upacting_features, SERVER_OCTOPUS)); + renew_lease(pl->get_mnow()); + // do not schedule until we are actually activated + + // adjust purged_snaps: PG may have been inactive while snaps were pruned + // from the removed_snaps_queue in the osdmap. 
update local purged_snaps + // reflect only those snaps that we thought were pruned and were still in + // the queue. + info.purged_snaps.swap(purged); + + // start up replicas + if (prior_readable_down_osds.empty()) { + dout(10) << __func__ << " no prior_readable_down_osds to wait on, clearing ub" + << dendl; + clear_prior_readable_until_ub(); + } + info.history.refresh_prior_readable_until_ub(pl->get_mnow(), + prior_readable_until_ub); + + ceph_assert(!acting_recovery_backfill.empty()); + for (auto i = acting_recovery_backfill.begin(); + i != acting_recovery_backfill.end(); + ++i) { + if (*i == pg_whoami) continue; + pg_shard_t peer = *i; + ceph_assert(peer_info.count(peer)); + pg_info_t& pi = peer_info[peer]; + + psdout(10) << "activate peer osd." << peer << " " << pi << dendl; + + #if defined(WITH_SEASTAR) + MURef<MOSDPGLog> m; + #else + MRef<MOSDPGLog> m; + #endif + ceph_assert(peer_missing.count(peer)); + pg_missing_t& pm = peer_missing[peer]; + + bool needs_past_intervals = pi.dne(); + + // Save num_bytes for backfill reservation request, can't be negative + peer_bytes[peer] = std::max<int64_t>(0, pi.stats.stats.sum.num_bytes); + + if (pi.last_update == info.last_update) { + // empty log + if (!pi.last_backfill.is_max()) + pl->get_clog_info() << info.pgid << " continuing backfill to osd." + << peer + << " from (" << pi.log_tail << "," << pi.last_update + << "] " << pi.last_backfill + << " to " << info.last_update; + if (!pi.is_empty()) { + psdout(10) << "activate peer osd." << peer + << " is up to date, queueing in pending_activators" << dendl; + ctx.send_info( + peer.osd, + spg_t(info.pgid.pgid, peer.shard), + get_osdmap_epoch(), // fixme: use lower epoch? + get_osdmap_epoch(), + info, + get_lease()); + } else { + psdout(10) << "activate peer osd." << peer + << " is up to date, but sending pg_log anyway" << dendl; + m = TOPNSPC::make_message<MOSDPGLog>( + i->shard, pg_whoami.shard, + get_osdmap_epoch(), info, + last_peering_reset); + } + } else if ( + pg_log.get_tail() > pi.last_update || + pi.last_backfill == hobject_t() || + (backfill_targets.count(*i) && pi.last_backfill.is_max())) { + /* ^ This last case covers a situation where a replica is not contiguous + * with the auth_log, but is contiguous with this replica. Reshuffling + * the active set to handle this would be tricky, so instead we just go + * ahead and backfill it anyway. This is probably preferrable in any + * case since the replica in question would have to be significantly + * behind. + */ + // backfill + pl->get_clog_debug() << info.pgid << " starting backfill to osd." << peer + << " from (" << pi.log_tail << "," << pi.last_update + << "] " << pi.last_backfill + << " to " << info.last_update; + + pi.last_update = info.last_update; + pi.last_complete = info.last_update; + pi.set_last_backfill(hobject_t()); + pi.last_epoch_started = info.last_epoch_started; + pi.last_interval_started = info.last_interval_started; + pi.history = info.history; + pi.hit_set = info.hit_set; + pi.stats.stats.clear(); + pi.stats.stats.sum.num_bytes = peer_bytes[peer]; + + // initialize peer with our purged_snaps. + pi.purged_snaps = info.purged_snaps; + + m = TOPNSPC::make_message<MOSDPGLog>( + i->shard, pg_whoami.shard, + get_osdmap_epoch(), pi, + last_peering_reset /* epoch to create pg at */); + + // send some recent log, so that op dup detection works well. + m->log.copy_up_to(cct, pg_log.get_log(), + cct->_conf->osd_max_pg_log_entries); + m->info.log_tail = m->log.tail; + pi.log_tail = m->log.tail; // sigh... 
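+ // The backfill target's missing set is cleared below: with last_backfill
+ // reset to the beginning, per-object missing tracking no longer applies to
+ // this peer; backfill will copy the objects wholesale instead.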
+ + pm.clear(); + } else { + // catch up + ceph_assert(pg_log.get_tail() <= pi.last_update); + m = TOPNSPC::make_message<MOSDPGLog>( + i->shard, pg_whoami.shard, + get_osdmap_epoch(), info, + last_peering_reset /* epoch to create pg at */); + // send new stuff to append to replicas log + m->log.copy_after(cct, pg_log.get_log(), pi.last_update); + } + + // share past_intervals if we are creating the pg on the replica + // based on whether our info for that peer was dne() *before* + // updating pi.history in the backfill block above. + if (m && needs_past_intervals) + m->past_intervals = past_intervals; + + // update local version of peer's missing list! + if (m && pi.last_backfill != hobject_t()) { + for (auto p = m->log.log.begin(); p != m->log.log.end(); ++p) { + if (p->soid <= pi.last_backfill && + !p->is_error()) { + if (perform_deletes_during_peering() && p->is_delete()) { + pm.rm(p->soid, p->version); + } else { + pm.add_next_event(*p); + } + } + } + } + + if (m) { + dout(10) << "activate peer osd." << peer << " sending " << m->log + << dendl; + m->lease = get_lease(); + pl->send_cluster_message(peer.osd, std::move(m), get_osdmap_epoch()); + } + + // peer now has + pi.last_update = info.last_update; + + // update our missing + if (pm.num_missing() == 0) { + pi.last_complete = pi.last_update; + psdout(10) << "activate peer osd." << peer << " " << pi + << " uptodate" << dendl; + } else { + psdout(10) << "activate peer osd." << peer << " " << pi + << " missing " << pm << dendl; + } + } + + // Set up missing_loc + set<pg_shard_t> complete_shards; + for (auto i = acting_recovery_backfill.begin(); + i != acting_recovery_backfill.end(); + ++i) { + psdout(20) << __func__ << " setting up missing_loc from shard " << *i + << " " << dendl; + if (*i == get_primary()) { + missing_loc.add_active_missing(missing); + if (!missing.have_missing()) + complete_shards.insert(*i); + } else { + auto peer_missing_entry = peer_missing.find(*i); + ceph_assert(peer_missing_entry != peer_missing.end()); + missing_loc.add_active_missing(peer_missing_entry->second); + if (!peer_missing_entry->second.have_missing() && + peer_info[*i].last_backfill.is_max()) + complete_shards.insert(*i); + } + } + + // If necessary, create might_have_unfound to help us find our unfound objects. + // NOTE: It's important that we build might_have_unfound before trimming the + // past intervals. 
+ might_have_unfound.clear(); + if (needs_recovery()) { + // If only one shard has missing, we do a trick to add all others as recovery + // source, this is considered safe since the PGLogs have been merged locally, + // and covers vast majority of the use cases, like one OSD/host is down for + // a while for hardware repairing + if (complete_shards.size() + 1 == acting_recovery_backfill.size()) { + missing_loc.add_batch_sources_info(complete_shards, ctx.handle); + } else { + missing_loc.add_source_info(pg_whoami, info, pg_log.get_missing(), + ctx.handle); + for (auto i = acting_recovery_backfill.begin(); + i != acting_recovery_backfill.end(); + ++i) { + if (*i == pg_whoami) continue; + psdout(10) << __func__ << ": adding " << *i << " as a source" << dendl; + ceph_assert(peer_missing.count(*i)); + ceph_assert(peer_info.count(*i)); + missing_loc.add_source_info( + *i, + peer_info[*i], + peer_missing[*i], + ctx.handle); + } + } + for (auto i = peer_missing.begin(); i != peer_missing.end(); ++i) { + if (is_acting_recovery_backfill(i->first)) + continue; + ceph_assert(peer_info.count(i->first)); + search_for_missing( + peer_info[i->first], + i->second, + i->first, + ctx); + } + + build_might_have_unfound(); + + // Always call now so update_calc_stats() will be accurate + discover_all_missing(ctx.msgs); + + } + + // num_objects_degraded if calculated should reflect this too, unless no + // missing and we are about to go clean. + if (get_osdmap()->get_pg_size(info.pgid.pgid) > actingset.size()) { + state_set(PG_STATE_UNDERSIZED); + } + + state_set(PG_STATE_ACTIVATING); + pl->on_activate(std::move(to_trim)); + } + if (acting_set_writeable()) { + PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(t)}; + pg_log.roll_forward(rollbacker.get()); + } +} + +void PeeringState::share_pg_info() +{ + psdout(10) << "share_pg_info" << dendl; + + info.history.refresh_prior_readable_until_ub(pl->get_mnow(), + prior_readable_until_ub); + + // share new pg_info_t with replicas + ceph_assert(!acting_recovery_backfill.empty()); + for (auto pg_shard : acting_recovery_backfill) { + if (pg_shard == pg_whoami) continue; + if (auto peer = peer_info.find(pg_shard); peer != peer_info.end()) { + peer->second.last_epoch_started = info.last_epoch_started; + peer->second.last_interval_started = info.last_interval_started; + peer->second.history.merge(info.history); + } + auto m = TOPNSPC::make_message<MOSDPGInfo2>(spg_t{info.pgid.pgid, pg_shard.shard}, + info, + get_osdmap_epoch(), + get_osdmap_epoch(), + std::optional<pg_lease_t>{get_lease()}, + std::nullopt); + pl->send_cluster_message(pg_shard.osd, std::move(m), get_osdmap_epoch()); + } +} + +void PeeringState::merge_log( + ObjectStore::Transaction& t, pg_info_t &oinfo, pg_log_t&& olog, + pg_shard_t from) +{ + PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(t)}; + pg_log.merge_log( + oinfo, std::move(olog), from, info, rollbacker.get(), + dirty_info, dirty_big_info); +} + +void PeeringState::rewind_divergent_log( + ObjectStore::Transaction& t, eversion_t newhead) +{ + PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(t)}; + pg_log.rewind_divergent_log( + newhead, info, rollbacker.get(), dirty_info, dirty_big_info); +} + + +void PeeringState::proc_primary_info( + ObjectStore::Transaction &t, const pg_info_t &oinfo) +{ + ceph_assert(!is_primary()); + + update_history(oinfo.history); + if (!info.stats.stats_invalid && info.stats.stats.sum.num_scrub_errors) { + info.stats.stats.sum.num_scrub_errors = 0; + info.stats.stats.sum.num_shallow_scrub_errors = 0; 
+ info.stats.stats.sum.num_deep_scrub_errors = 0; + dirty_info = true; + } + + if (!(info.purged_snaps == oinfo.purged_snaps)) { + psdout(10) << __func__ << " updating purged_snaps to " + << oinfo.purged_snaps + << dendl; + info.purged_snaps = oinfo.purged_snaps; + dirty_info = true; + dirty_big_info = true; + } +} + +void PeeringState::proc_master_log( + ObjectStore::Transaction& t, pg_info_t &oinfo, + pg_log_t&& olog, pg_missing_t&& omissing, pg_shard_t from) +{ + psdout(10) << "proc_master_log for osd." << from << ": " + << olog << " " << omissing << dendl; + ceph_assert(!is_peered() && is_primary()); + + // merge log into our own log to build master log. no need to + // make any adjustments to their missing map; we are taking their + // log to be authoritative (i.e., their entries are by definitely + // non-divergent). + merge_log(t, oinfo, std::move(olog), from); + peer_info[from] = oinfo; + psdout(10) << " peer osd." << from << " now " << oinfo + << " " << omissing << dendl; + might_have_unfound.insert(from); + + // See doc/dev/osd_internals/last_epoch_started + if (oinfo.last_epoch_started > info.last_epoch_started) { + info.last_epoch_started = oinfo.last_epoch_started; + dirty_info = true; + } + if (oinfo.last_interval_started > info.last_interval_started) { + info.last_interval_started = oinfo.last_interval_started; + dirty_info = true; + } + update_history(oinfo.history); + ceph_assert(cct->_conf->osd_find_best_info_ignore_history_les || + info.last_epoch_started >= info.history.last_epoch_started); + + peer_missing[from].claim(std::move(omissing)); +} + +void PeeringState::proc_replica_log( + pg_info_t &oinfo, + const pg_log_t &olog, + pg_missing_t&& omissing, + pg_shard_t from) +{ + psdout(10) << "proc_replica_log for osd." << from << ": " + << oinfo << " " << olog << " " << omissing << dendl; + + pg_log.proc_replica_log(oinfo, olog, omissing, from); + + peer_info[from] = oinfo; + psdout(10) << " peer osd." << from << " now " + << oinfo << " " << omissing << dendl; + might_have_unfound.insert(from); + + for (auto i = omissing.get_items().begin(); + i != omissing.get_items().end(); + ++i) { + psdout(20) << " after missing " << i->first + << " need " << i->second.need + << " have " << i->second.have << dendl; + } + peer_missing[from].claim(std::move(omissing)); +} + +void PeeringState::fulfill_info( + pg_shard_t from, const pg_query_t &query, + pair<pg_shard_t, pg_info_t> ¬ify_info) +{ + ceph_assert(from == primary); + ceph_assert(query.type == pg_query_t::INFO); + + // info + psdout(10) << "sending info" << dendl; + notify_info = make_pair(from, info); +} + +void PeeringState::fulfill_log( + pg_shard_t from, const pg_query_t &query, epoch_t query_epoch) +{ + psdout(10) << "log request from " << from << dendl; + ceph_assert(from == primary); + ceph_assert(query.type != pg_query_t::INFO); + + auto mlog = TOPNSPC::make_message<MOSDPGLog>( + from.shard, pg_whoami.shard, + get_osdmap_epoch(), + info, query_epoch); + mlog->missing = pg_log.get_missing(); + + // primary -> other, when building master log + if (query.type == pg_query_t::LOG) { + psdout(10) << " sending info+missing+log since " << query.since + << dendl; + if (query.since != eversion_t() && query.since < pg_log.get_tail()) { + pl->get_clog_error() << info.pgid << " got broken pg_query_t::LOG since " + << query.since + << " when my log.tail is " << pg_log.get_tail() + << ", sending full log instead"; + mlog->log = pg_log.get_log(); // primary should not have requested this!! 
+ } else + mlog->log.copy_after(cct, pg_log.get_log(), query.since); + } + else if (query.type == pg_query_t::FULLLOG) { + psdout(10) << " sending info+missing+full log" << dendl; + mlog->log = pg_log.get_log(); + } + + psdout(10) << " sending " << mlog->log << " " << mlog->missing << dendl; + + pl->send_cluster_message(from.osd, std::move(mlog), get_osdmap_epoch(), true); +} + +void PeeringState::fulfill_query(const MQuery& query, PeeringCtxWrapper &rctx) +{ + if (query.query.type == pg_query_t::INFO) { + pair<pg_shard_t, pg_info_t> notify_info; + // note this refreshes our prior_readable_until_ub value + update_history(query.query.history); + fulfill_info(query.from, query.query, notify_info); + rctx.send_notify( + notify_info.first.osd, + pg_notify_t( + notify_info.first.shard, pg_whoami.shard, + query.query_epoch, + get_osdmap_epoch(), + notify_info.second, + past_intervals)); + } else { + update_history(query.query.history); + fulfill_log(query.from, query.query, query.query_epoch); + } +} + +void PeeringState::try_mark_clean() +{ + if (actingset.size() == get_osdmap()->get_pg_size(info.pgid.pgid)) { + state_clear(PG_STATE_FORCED_BACKFILL | PG_STATE_FORCED_RECOVERY); + state_set(PG_STATE_CLEAN); + info.history.last_epoch_clean = get_osdmap_epoch(); + info.history.last_interval_clean = info.history.same_interval_since; + past_intervals.clear(); + dirty_big_info = true; + dirty_info = true; + } + + if (!is_active() && is_peered()) { + if (is_clean()) { + bool target; + if (pool.info.is_pending_merge(info.pgid.pgid, &target)) { + if (target) { + psdout(10) << "ready to merge (target)" << dendl; + pl->set_ready_to_merge_target( + info.last_update, + info.history.last_epoch_started, + info.history.last_epoch_clean); + } else { + psdout(10) << "ready to merge (source)" << dendl; + pl->set_ready_to_merge_source(info.last_update); + } + } + } else { + psdout(10) << "not clean, not ready to merge" << dendl; + // we should have notified OSD in Active state entry point + } + } + + state_clear(PG_STATE_FORCED_RECOVERY | PG_STATE_FORCED_BACKFILL); + + share_pg_info(); + pl->publish_stats_to_osd(); + clear_recovery_state(); +} + +void PeeringState::split_into( + pg_t child_pgid, PeeringState *child, unsigned split_bits) +{ + child->update_osdmap_ref(get_osdmap()); + child->pool = pool; + + // Log + pg_log.split_into(child_pgid, split_bits, &(child->pg_log)); + child->info.last_complete = info.last_complete; + + info.last_update = pg_log.get_head(); + child->info.last_update = child->pg_log.get_head(); + + child->info.last_user_version = info.last_user_version; + + info.log_tail = pg_log.get_tail(); + child->info.log_tail = child->pg_log.get_tail(); + + // reset last_complete, we might have modified pg_log & missing above + pg_log.reset_complete_to(&info); + child->pg_log.reset_complete_to(&child->info); + + // Info + child->info.history = info.history; + child->info.history.epoch_created = get_osdmap_epoch(); + child->info.purged_snaps = info.purged_snaps; + + if (info.last_backfill.is_max()) { + child->info.set_last_backfill(hobject_t::get_max()); + } else { + // restart backfill on parent and child to be safe. we could + // probably do better in the bitwise sort case, but it's more + // fragile (there may be special work to do on backfill completion + // in the future). 
+ info.set_last_backfill(hobject_t()); + child->info.set_last_backfill(hobject_t()); + // restarting backfill implies that the missing set is empty, + // since it is only used for objects prior to last_backfill + pg_log.reset_backfill(); + child->pg_log.reset_backfill(); + } + + child->info.stats = info.stats; + child->info.stats.parent_split_bits = split_bits; + info.stats.stats_invalid = true; + child->info.stats.stats_invalid = true; + child->info.stats.objects_trimmed = 0; + child->info.stats.snaptrim_duration = 0.0; + child->info.last_epoch_started = info.last_epoch_started; + child->info.last_interval_started = info.last_interval_started; + + // There can't be recovery/backfill going on now + int primary, up_primary; + vector<int> newup, newacting; + get_osdmap()->pg_to_up_acting_osds( + child->info.pgid.pgid, &newup, &up_primary, &newacting, &primary); + child->init_primary_up_acting( + newup, + newacting, + up_primary, + primary); + child->role = OSDMap::calc_pg_role(pg_whoami, child->acting); + + // this comparison includes primary rank via pg_shard_t + if (get_primary() != child->get_primary()) + child->info.history.same_primary_since = get_osdmap_epoch(); + + child->info.stats.up = newup; + child->info.stats.up_primary = up_primary; + child->info.stats.acting = newacting; + child->info.stats.acting_primary = primary; + child->info.stats.mapping_epoch = get_osdmap_epoch(); + + // History + child->past_intervals = past_intervals; + + child->on_new_interval(); + + child->send_notify = !child->is_primary(); + + child->dirty_info = true; + child->dirty_big_info = true; + dirty_info = true; + dirty_big_info = true; +} + +void PeeringState::merge_from( + map<spg_t,PeeringState *>& sources, + PeeringCtx &rctx, + unsigned split_bits, + const pg_merge_meta_t& last_pg_merge_meta) +{ + bool incomplete = false; + if (info.last_complete != info.last_update || + info.is_incomplete() || + info.dne()) { + psdout(10) << __func__ << " target incomplete" << dendl; + incomplete = true; + } + if (last_pg_merge_meta.source_pgid != pg_t()) { + if (info.pgid.pgid != last_pg_merge_meta.source_pgid.get_parent()) { + psdout(10) << __func__ << " target doesn't match expected parent " + << last_pg_merge_meta.source_pgid.get_parent() + << " of source_pgid " << last_pg_merge_meta.source_pgid + << dendl; + incomplete = true; + } + if (info.last_update != last_pg_merge_meta.target_version) { + psdout(10) << __func__ << " target version doesn't match expected " + << last_pg_merge_meta.target_version << dendl; + incomplete = true; + } + } + + PGLog::LogEntryHandlerRef handler{pl->get_log_handler(rctx.transaction)}; + pg_log.roll_forward(handler.get()); + + info.last_complete = info.last_update; // to fake out trim() + pg_log.reset_recovery_pointers(); + pg_log.trim(info.last_update, info); + + vector<PGLog*> log_from; + for (auto& i : sources) { + auto& source = i.second; + if (!source) { + psdout(10) << __func__ << " source " << i.first << " missing" << dendl; + incomplete = true; + continue; + } + if (source->info.last_complete != source->info.last_update || + source->info.is_incomplete() || + source->info.dne()) { + psdout(10) << __func__ << " source " << source->pg_whoami + << " incomplete" + << dendl; + incomplete = true; + } + if (last_pg_merge_meta.source_pgid != pg_t()) { + if (source->info.pgid.pgid != last_pg_merge_meta.source_pgid) { + dout(10) << __func__ << " source " << source->info.pgid.pgid + << " doesn't match expected source pgid " + << last_pg_merge_meta.source_pgid << dendl; + incomplete = 
true; + } + if (source->info.last_update != last_pg_merge_meta.source_version) { + dout(10) << __func__ << " source version doesn't match expected " + << last_pg_merge_meta.target_version << dendl; + incomplete = true; + } + } + + // prepare log + PGLog::LogEntryHandlerRef handler{ + source->pl->get_log_handler(rctx.transaction)}; + source->pg_log.roll_forward(handler.get()); + source->info.last_complete = source->info.last_update; // to fake out trim() + source->pg_log.reset_recovery_pointers(); + source->pg_log.trim(source->info.last_update, source->info); + log_from.push_back(&source->pg_log); + + // combine stats + info.stats.add(source->info.stats); + + // pull up last_update + info.last_update = std::max(info.last_update, source->info.last_update); + + // adopt source's PastIntervals if target has none. we can do this since + // pgp_num has been reduced prior to the merge, so the OSD mappings for + // the PGs are identical. + if (past_intervals.empty() && !source->past_intervals.empty()) { + psdout(10) << __func__ << " taking source's past_intervals" << dendl; + past_intervals = source->past_intervals; + } + } + + info.last_complete = info.last_update; + info.log_tail = info.last_update; + if (incomplete) { + info.last_backfill = hobject_t(); + } + + // merge logs + pg_log.merge_from(log_from, info.last_update); + + // make sure we have a meaningful last_epoch_started/clean (if we were a + // placeholder) + if (info.history.epoch_created == 0) { + // start with (a) source's history, since these PGs *should* have been + // remapped in concert with each other... + info.history = sources.begin()->second->info.history; + + // we use the last_epoch_{started,clean} we got from + // the caller, which are the epochs that were reported by the PGs were + // found to be ready for merge. + info.history.last_epoch_clean = last_pg_merge_meta.last_epoch_clean; + info.history.last_epoch_started = last_pg_merge_meta.last_epoch_started; + info.last_epoch_started = last_pg_merge_meta.last_epoch_started; + psdout(10) << __func__ + << " set les/c to " << last_pg_merge_meta.last_epoch_started << "/" + << last_pg_merge_meta.last_epoch_clean + << " from pool last_dec_*, source pg history was " + << sources.begin()->second->info.history + << dendl; + + // above we have pulled down source's history and we need to check + // history.epoch_created again to confirm that source is not a placeholder + // too. (peering requires a sane history.same_interval_since value for any + // non-newly created pg and below here we know we are basically iterating + // back a series of past maps to fake a merge process, hence we need to + // fix history.same_interval_since first so that start_peering_interval() + // will not complain) + if (info.history.epoch_created == 0) { + dout(10) << __func__ << " both merge target and source are placeholders," + << " set sis to lec " << info.history.last_epoch_clean + << dendl; + info.history.same_interval_since = info.history.last_epoch_clean; + } + + // if the past_intervals start is later than last_epoch_clean, it + // implies the source repeered again but the target didn't, or + // that the source became clean in a later epoch than the target. + // avoid the discrepancy but adjusting the interval start + // backwards to match so that check_past_interval_bounds() will + // not complain. 
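+ // For example (hypothetical epochs): if the adopted past_intervals start at
+ // epoch 150 while the merged last_epoch_clean is 140, the interval start is
+ // pulled back to 140 below.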
+ auto pib = past_intervals.get_bounds(); + if (info.history.last_epoch_clean < pib.first) { + psdout(10) << __func__ << " last_epoch_clean " + << info.history.last_epoch_clean << " < past_interval start " + << pib.first << ", adjusting start backwards" << dendl; + past_intervals.adjust_start_backwards(info.history.last_epoch_clean); + } + + // Similarly, if the same_interval_since value is later than + // last_epoch_clean, the next interval change will result in a + // past_interval start that is later than last_epoch_clean. This + // can happen if we use the pg_history values from the merge + // source. Adjust the same_interval_since value backwards if that + // happens. (We trust the les and lec values more because they came from + // the real target, whereas the history value we stole from the source.) + if (info.history.last_epoch_started < info.history.same_interval_since) { + psdout(10) << __func__ << " last_epoch_started " + << info.history.last_epoch_started << " < same_interval_since " + << info.history.same_interval_since + << ", adjusting pg_history backwards" << dendl; + info.history.same_interval_since = info.history.last_epoch_clean; + // make sure same_{up,primary}_since are <= same_interval_since + info.history.same_up_since = std::min( + info.history.same_up_since, info.history.same_interval_since); + info.history.same_primary_since = std::min( + info.history.same_primary_since, info.history.same_interval_since); + } + } + + dirty_info = true; + dirty_big_info = true; +} + +void PeeringState::start_split_stats( + const set<spg_t>& childpgs, vector<object_stat_sum_t> *out) +{ + out->resize(childpgs.size() + 1); + info.stats.stats.sum.split(*out); +} + +void PeeringState::finish_split_stats( + const object_stat_sum_t& stats, ObjectStore::Transaction &t) +{ + info.stats.stats.sum = stats; + write_if_dirty(t); +} + +void PeeringState::update_blocked_by() +{ + // set a max on the number of blocking peers we report. if we go + // over, report a random subset. keep the result sorted. 
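+ // Each remaining entry is dropped with probability skip/(skip+keep) and
+ // kept with probability keep/(skip+keep) (classic selection sampling), so
+ // the reported subset is a uniformly random, order-preserving sample.
+ // A minimal standalone sketch of the same idea (hypothetical helper, not
+ // part of this file):
+ //
+ //   std::vector<int> sample_keep(const std::vector<int>& in, unsigned keep) {
+ //     keep = std::min<unsigned>(keep, in.size());
+ //     unsigned skip = in.size() - keep;
+ //     std::vector<int> out;
+ //     for (int v : in) {
+ //       if (keep == 0)
+ //         break;
+ //       if (skip > 0 && (rand() % (skip + keep) < skip)) {
+ //         --skip;            // drop this element
+ //       } else {
+ //         out.push_back(v);  // keep this element
+ //         --keep;
+ //       }
+ //     }
+ //     return out;
+ //   }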
+ unsigned keep = std::min<unsigned>( + blocked_by.size(), cct->_conf->osd_max_pg_blocked_by); + unsigned skip = blocked_by.size() - keep; + info.stats.blocked_by.clear(); + info.stats.blocked_by.resize(keep); + unsigned pos = 0; + for (auto p = blocked_by.begin(); p != blocked_by.end() && keep > 0; ++p) { + if (skip > 0 && (rand() % (skip + keep) < skip)) { + --skip; + } else { + info.stats.blocked_by[pos++] = *p; + --keep; + } + } +} + +static bool find_shard(const set<pg_shard_t> & pgs, shard_id_t shard) +{ + for (auto&p : pgs) + if (p.shard == shard) + return true; + return false; +} + +static pg_shard_t get_another_shard(const set<pg_shard_t> & pgs, pg_shard_t skip, shard_id_t shard) +{ + for (auto&p : pgs) { + if (p == skip) + continue; + if (p.shard == shard) + return p; + } + return pg_shard_t(); +} + +void PeeringState::update_calc_stats() +{ + info.stats.version = info.last_update; + info.stats.created = info.history.epoch_created; + info.stats.last_scrub = info.history.last_scrub; + info.stats.last_scrub_stamp = info.history.last_scrub_stamp; + info.stats.last_deep_scrub = info.history.last_deep_scrub; + info.stats.last_deep_scrub_stamp = info.history.last_deep_scrub_stamp; + info.stats.last_clean_scrub_stamp = info.history.last_clean_scrub_stamp; + info.stats.last_epoch_clean = info.history.last_epoch_clean; + + info.stats.log_size = pg_log.get_head().version - pg_log.get_tail().version; + info.stats.log_dups_size = pg_log.get_log().dups.size(); + info.stats.ondisk_log_size = info.stats.log_size; + info.stats.log_start = pg_log.get_tail(); + info.stats.ondisk_log_start = pg_log.get_tail(); + info.stats.snaptrimq_len = pl->get_snap_trimq_size(); + + unsigned num_shards = get_osdmap()->get_pg_size(info.pgid.pgid); + + // In rare case that upset is too large (usually transient), use as target + // for calculations below. + unsigned target = std::max(num_shards, (unsigned)upset.size()); + // For undersized actingset may be larger with OSDs out + unsigned nrep = std::max(actingset.size(), upset.size()); + // calc num_object_copies + info.stats.stats.calc_copies(std::max(target, nrep)); + info.stats.stats.sum.num_objects_degraded = 0; + info.stats.stats.sum.num_objects_unfound = 0; + info.stats.stats.sum.num_objects_misplaced = 0; + info.stats.avail_no_missing.clear(); + info.stats.object_location_counts.clear(); + + // We should never hit this condition, but if end up hitting it, + // make sure to update num_objects and set PG_STATE_INCONSISTENT. + if (info.stats.stats.sum.num_objects < 0) { + psdout(0) << __func__ << " negative num_objects = " + << info.stats.stats.sum.num_objects << " setting it to 0 " + << dendl; + info.stats.stats.sum.num_objects = 0; + state_set(PG_STATE_INCONSISTENT); + } + + if ((is_remapped() || is_undersized() || !is_clean()) && + (is_peered()|| is_activating())) { + psdout(20) << __func__ << " actingset " << actingset << " upset " + << upset << " acting_recovery_backfill " << acting_recovery_backfill << dendl; + + ceph_assert(!acting_recovery_backfill.empty()); + + bool estimate = false; + + // NOTE: we only generate degraded, misplaced and unfound + // values for the summation, not individual stat categories. + int64_t num_objects = info.stats.stats.sum.num_objects; + + // Objects missing from up nodes, sorted by # objects. 
+ boost::container::flat_set<pair<int64_t,pg_shard_t>> missing_target_objects; + // Objects missing from nodes not in up, sort by # objects + boost::container::flat_set<pair<int64_t,pg_shard_t>> acting_source_objects; + + // Fill missing_target_objects/acting_source_objects + + { + int64_t missing; + + // Primary first + missing = pg_log.get_missing().num_missing(); + ceph_assert(acting_recovery_backfill.count(pg_whoami)); + if (upset.count(pg_whoami)) { + missing_target_objects.emplace(missing, pg_whoami); + } else { + acting_source_objects.emplace(missing, pg_whoami); + } + info.stats.stats.sum.num_objects_missing_on_primary = missing; + if (missing == 0) + info.stats.avail_no_missing.push_back(pg_whoami); + psdout(20) << __func__ << " shard " << pg_whoami + << " primary objects " << num_objects + << " missing " << missing + << dendl; + } + + // All other peers + for (auto& peer : peer_info) { + // Primary should not be in the peer_info, skip if it is. + if (peer.first == pg_whoami) continue; + int64_t missing = 0; + int64_t peer_num_objects = + std::max((int64_t)0, peer.second.stats.stats.sum.num_objects); + // Backfill targets always track num_objects accurately + // all other peers track missing accurately. + if (is_backfill_target(peer.first)) { + missing = std::max((int64_t)0, num_objects - peer_num_objects); + } else { + if (peer_missing.count(peer.first)) { + missing = peer_missing[peer.first].num_missing(); + } else { + psdout(20) << __func__ << " no peer_missing found for " + << peer.first << dendl; + if (is_recovering()) { + estimate = true; + } + missing = std::max((int64_t)0, num_objects - peer_num_objects); + } + } + if (upset.count(peer.first)) { + missing_target_objects.emplace(missing, peer.first); + } else if (actingset.count(peer.first)) { + acting_source_objects.emplace(missing, peer.first); + } + peer.second.stats.stats.sum.num_objects_missing = missing; + if (missing == 0) + info.stats.avail_no_missing.push_back(peer.first); + psdout(20) << __func__ << " shard " << peer.first + << " objects " << peer_num_objects + << " missing " << missing + << dendl; + } + + // Compute object_location_counts + for (auto& ml: missing_loc.get_missing_locs()) { + info.stats.object_location_counts[ml.second]++; + psdout(30) << __func__ << " " << ml.first << " object_location_counts[" + << ml.second << "]=" << info.stats.object_location_counts[ml.second] + << dendl; + } + int64_t not_missing = num_objects - missing_loc.get_missing_locs().size(); + if (not_missing) { + // During recovery we know upset == actingset and is being populated + // During backfill we know that all non-missing objects are in the actingset + info.stats.object_location_counts[actingset] = not_missing; + } + psdout(30) << __func__ << " object_location_counts[" + << upset << "]=" << info.stats.object_location_counts[upset] + << dendl; + psdout(20) << __func__ << " object_location_counts " + << info.stats.object_location_counts << dendl; + + // A misplaced object is not stored on the correct OSD + int64_t misplaced = 0; + // a degraded objects has fewer replicas or EC shards than the pool specifies. 
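+ // For example (hypothetical size=3 replicated pool): an object whose third
+ // copy exists only on an OSD outside the up set is misplaced but not
+ // degraded, while an object with only two copies anywhere is degraded.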
+ int64_t degraded = 0; + + if (is_recovering()) { + for (auto& sml: missing_loc.get_missing_by_count()) { + for (auto& ml: sml.second) { + int missing_shards; + if (sml.first == shard_id_t::NO_SHARD) { + psdout(20) << __func__ << " ml " << ml.second + << " upset size " << upset.size() + << " up " << ml.first.up << dendl; + missing_shards = (int)upset.size() - ml.first.up; + } else { + // Handle shards not even in upset below + if (!find_shard(upset, sml.first)) + continue; + missing_shards = std::max(0, 1 - ml.first.up); + psdout(20) << __func__ + << " shard " << sml.first + << " ml " << ml.second + << " missing shards " << missing_shards << dendl; + } + int odegraded = ml.second * missing_shards; + // Copies on other osds but limited to the possible degraded + int more_osds = std::min(missing_shards, ml.first.other); + int omisplaced = ml.second * more_osds; + ceph_assert(omisplaced <= odegraded); + odegraded -= omisplaced; + + misplaced += omisplaced; + degraded += odegraded; + } + } + + psdout(20) << __func__ << " missing based degraded " + << degraded << dendl; + psdout(20) << __func__ << " missing based misplaced " + << misplaced << dendl; + + // Handle undersized case + if (pool.info.is_replicated()) { + // Add degraded for missing targets (num_objects missing) + ceph_assert(target >= upset.size()); + unsigned needed = target - upset.size(); + degraded += num_objects * needed; + } else { + for (unsigned i = 0 ; i < num_shards; ++i) { + shard_id_t shard(i); + + if (!find_shard(upset, shard)) { + pg_shard_t pgs = get_another_shard(actingset, pg_shard_t(), shard); + + if (pgs != pg_shard_t()) { + int64_t missing; + + if (pgs == pg_whoami) + missing = info.stats.stats.sum.num_objects_missing_on_primary; + else + missing = peer_info[pgs].stats.stats.sum.num_objects_missing; + + degraded += missing; + misplaced += std::max((int64_t)0, num_objects - missing); + } else { + // No shard anywhere + degraded += num_objects; + } + } + } + } + goto out; + } + + // Handle undersized case + if (pool.info.is_replicated()) { + // Add to missing_target_objects + ceph_assert(target >= missing_target_objects.size()); + unsigned needed = target - missing_target_objects.size(); + if (needed) + missing_target_objects.emplace(num_objects * needed, pg_shard_t(pg_shard_t::NO_OSD)); + } else { + for (unsigned i = 0 ; i < num_shards; ++i) { + shard_id_t shard(i); + bool found = false; + for (const auto& t : missing_target_objects) { + if (std::get<1>(t).shard == shard) { + found = true; + break; + } + } + if (!found) + missing_target_objects.emplace(num_objects, pg_shard_t(pg_shard_t::NO_OSD,shard)); + } + } + + for (const auto& item : missing_target_objects) + psdout(20) << __func__ << " missing shard " << std::get<1>(item) + << " missing= " << std::get<0>(item) << dendl; + for (const auto& item : acting_source_objects) + psdout(20) << __func__ << " acting shard " << std::get<1>(item) + << " missing= " << std::get<0>(item) << dendl; + + // Handle all objects not in missing for remapped + // or backfill + for (auto m = missing_target_objects.rbegin(); + m != missing_target_objects.rend(); ++m) { + + int64_t extra_missing = -1; + + if (pool.info.is_replicated()) { + if (!acting_source_objects.empty()) { + auto extra_copy = acting_source_objects.begin(); + extra_missing = std::get<0>(*extra_copy); + acting_source_objects.erase(extra_copy); + } + } else { // Erasure coded + // Use corresponding shard + for (const auto& a : acting_source_objects) { + if (std::get<1>(a).shard == std::get<1>(*m).shard) { + 
extra_missing = std::get<0>(a); + acting_source_objects.erase(a); + break; + } + } + } + + if (extra_missing >= 0 && std::get<0>(*m) >= extra_missing) { + // We don't know which of the objects on the target + // are part of extra_missing so assume are all degraded. + misplaced += std::get<0>(*m) - extra_missing; + degraded += extra_missing; + } else { + // 1. extra_missing == -1, more targets than sources so degraded + // 2. extra_missing > std::get<0>(m), so that we know that some extra_missing + // previously degraded are now present on the target. + degraded += std::get<0>(*m); + } + } + // If there are still acting that haven't been accounted for + // then they are misplaced + for (const auto& a : acting_source_objects) { + int64_t extra_misplaced = std::max((int64_t)0, num_objects - std::get<0>(a)); + psdout(20) << __func__ << " extra acting misplaced " << extra_misplaced + << dendl; + misplaced += extra_misplaced; + } +out: + // NOTE: Tests use these messages to verify this code + psdout(20) << __func__ << " degraded " << degraded + << (estimate ? " (est)": "") << dendl; + psdout(20) << __func__ << " misplaced " << misplaced + << (estimate ? " (est)": "")<< dendl; + + info.stats.stats.sum.num_objects_degraded = degraded; + info.stats.stats.sum.num_objects_unfound = get_num_unfound(); + info.stats.stats.sum.num_objects_misplaced = misplaced; + } +} + +std::optional<pg_stat_t> PeeringState::prepare_stats_for_publish( + const std::optional<pg_stat_t> &pg_stats_publish, + const object_stat_collection_t &unstable_stats) +{ + if (info.stats.stats.sum.num_scrub_errors) { + psdout(10) << __func__ << " inconsistent due to " << + info.stats.stats.sum.num_scrub_errors << " scrub errors" << dendl; + state_set(PG_STATE_INCONSISTENT); + } else { + state_clear(PG_STATE_INCONSISTENT); + state_clear(PG_STATE_FAILED_REPAIR); + } + + utime_t now = ceph_clock_now(); + if (info.stats.state != state) { + info.stats.last_change = now; + // Optimistic estimation, if we just find out an inactive PG, + // assume it is active till now. + if (!(state & PG_STATE_ACTIVE) && + (info.stats.state & PG_STATE_ACTIVE)) + info.stats.last_active = now; + + if ((state & PG_STATE_ACTIVE) && + !(info.stats.state & PG_STATE_ACTIVE)) + info.stats.last_became_active = now; + if ((state & (PG_STATE_ACTIVE|PG_STATE_PEERED)) && + !(info.stats.state & (PG_STATE_ACTIVE|PG_STATE_PEERED))) + info.stats.last_became_peered = now; + info.stats.state = state; + } + + update_calc_stats(); + if (info.stats.stats.sum.num_objects_degraded) { + state_set(PG_STATE_DEGRADED); + } else { + state_clear(PG_STATE_DEGRADED); + } + update_blocked_by(); + + pg_stat_t pre_publish = info.stats; + pre_publish.stats.add(unstable_stats); + utime_t cutoff = now; + cutoff -= cct->_conf->osd_pg_stat_report_interval_max; + + // share (some of) our purged_snaps via the pg_stats. limit # of intervals + // because we don't want to make the pg_stat_t structures too expensive. 
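+ // Only the first osd_max_snap_prune_intervals_per_epoch intervals are copied
+ // into this report; any intervals beyond that cap are simply left out here.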
+ unsigned max = cct->_conf->osd_max_snap_prune_intervals_per_epoch; + unsigned num = 0; + auto i = info.purged_snaps.begin(); + while (num < max && i != info.purged_snaps.end()) { + pre_publish.purged_snaps.insert(i.get_start(), i.get_len()); + ++num; + ++i; + } + psdout(20) << __func__ << " reporting purged_snaps " + << pre_publish.purged_snaps << dendl; + + if (pg_stats_publish && pre_publish == *pg_stats_publish && + info.stats.last_fresh > cutoff) { + psdout(15) << "publish_stats_to_osd " << pg_stats_publish->reported_epoch + << ": no change since " << info.stats.last_fresh << dendl; + return std::nullopt; + } else { + // update our stat summary and timestamps + info.stats.reported_epoch = get_osdmap_epoch(); + ++info.stats.reported_seq; + + info.stats.last_fresh = now; + + if (info.stats.state & PG_STATE_CLEAN) + info.stats.last_clean = now; + if (info.stats.state & PG_STATE_ACTIVE) + info.stats.last_active = now; + if (info.stats.state & (PG_STATE_ACTIVE|PG_STATE_PEERED)) + info.stats.last_peered = now; + info.stats.last_unstale = now; + if ((info.stats.state & PG_STATE_DEGRADED) == 0) + info.stats.last_undegraded = now; + if ((info.stats.state & PG_STATE_UNDERSIZED) == 0) + info.stats.last_fullsized = now; + + psdout(15) << "publish_stats_to_osd " << pre_publish.reported_epoch + << ":" << pre_publish.reported_seq << dendl; + return std::make_optional(std::move(pre_publish)); + } +} + +void PeeringState::init( + int role, + const vector<int>& newup, int new_up_primary, + const vector<int>& newacting, int new_acting_primary, + const pg_history_t& history, + const PastIntervals& pi, + ObjectStore::Transaction &t) +{ + psdout(10) << "init role " << role << " up " + << newup << " acting " << newacting + << " history " << history + << " past_intervals " << pi + << dendl; + + set_role(role); + init_primary_up_acting( + newup, + newacting, + new_up_primary, + new_acting_primary); + + info.history = history; + past_intervals = pi; + + info.stats.up = up; + info.stats.up_primary = new_up_primary; + info.stats.acting = acting; + info.stats.acting_primary = new_acting_primary; + info.stats.mapping_epoch = info.history.same_interval_since; + + if (!perform_deletes_during_peering()) { + pg_log.set_missing_may_contain_deletes(); + } + + on_new_interval(); + + dirty_info = true; + dirty_big_info = true; + write_if_dirty(t); +} + +void PeeringState::dump_peering_state(Formatter *f) +{ + f->dump_string("state", get_pg_state_string()); + f->dump_unsigned("epoch", get_osdmap_epoch()); + f->open_array_section("up"); + for (auto p = up.begin(); p != up.end(); ++p) + f->dump_unsigned("osd", *p); + f->close_section(); + f->open_array_section("acting"); + for (auto p = acting.begin(); p != acting.end(); ++p) + f->dump_unsigned("osd", *p); + f->close_section(); + if (!backfill_targets.empty()) { + f->open_array_section("backfill_targets"); + for (auto p = backfill_targets.begin(); p != backfill_targets.end(); ++p) + f->dump_stream("shard") << *p; + f->close_section(); + } + if (!async_recovery_targets.empty()) { + f->open_array_section("async_recovery_targets"); + for (auto p = async_recovery_targets.begin(); + p != async_recovery_targets.end(); + ++p) + f->dump_stream("shard") << *p; + f->close_section(); + } + if (!acting_recovery_backfill.empty()) { + f->open_array_section("acting_recovery_backfill"); + for (auto p = acting_recovery_backfill.begin(); + p != acting_recovery_backfill.end(); + ++p) + f->dump_stream("shard") << *p; + f->close_section(); + } + f->open_object_section("info"); + 
update_calc_stats(); + info.dump(f); + f->close_section(); + + f->open_array_section("peer_info"); + for (auto p = peer_info.begin(); p != peer_info.end(); ++p) { + f->open_object_section("info"); + f->dump_stream("peer") << p->first; + p->second.dump(f); + f->close_section(); + } + f->close_section(); +} + +void PeeringState::update_stats( + std::function<bool(pg_history_t &, pg_stat_t &)> f, + ObjectStore::Transaction *t) { + if (f(info.history, info.stats)) { + pl->publish_stats_to_osd(); + } + pl->reschedule_scrub(); + + if (t) { + dirty_info = true; + write_if_dirty(*t); + } +} + +void PeeringState::update_stats_wo_resched( + std::function<void(pg_history_t &, pg_stat_t &)> f) +{ + f(info.history, info.stats); +} + +bool PeeringState::append_log_entries_update_missing( + const mempool::osd_pglog::list<pg_log_entry_t> &entries, + ObjectStore::Transaction &t, std::optional<eversion_t> trim_to, + std::optional<eversion_t> roll_forward_to) +{ + ceph_assert(!entries.empty()); + ceph_assert(entries.begin()->version > info.last_update); + + PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(t)}; + bool invalidate_stats = + pg_log.append_new_log_entries( + info.last_backfill, + entries, + rollbacker.get()); + + if (roll_forward_to && entries.rbegin()->soid > info.last_backfill) { + pg_log.roll_forward(rollbacker.get()); + } + if (roll_forward_to && *roll_forward_to > pg_log.get_can_rollback_to()) { + pg_log.roll_forward_to(*roll_forward_to, rollbacker.get()); + last_rollback_info_trimmed_to_applied = *roll_forward_to; + } + + info.last_update = pg_log.get_head(); + + if (pg_log.get_missing().num_missing() == 0) { + // advance last_complete since nothing else is missing! + info.last_complete = info.last_update; + } + info.stats.stats_invalid = info.stats.stats_invalid || invalidate_stats; + + psdout(20) << __func__ << " trim_to bool = " << bool(trim_to) + << " trim_to = " << (trim_to ? 
*trim_to : eversion_t()) << dendl; + if (trim_to) + pg_log.trim(*trim_to, info); + dirty_info = true; + write_if_dirty(t); + return invalidate_stats; +} + +void PeeringState::merge_new_log_entries( + const mempool::osd_pglog::list<pg_log_entry_t> &entries, + ObjectStore::Transaction &t, + std::optional<eversion_t> trim_to, + std::optional<eversion_t> roll_forward_to) +{ + psdout(10) << __func__ << " " << entries << dendl; + ceph_assert(is_primary()); + + bool rebuild_missing = append_log_entries_update_missing(entries, t, trim_to, roll_forward_to); + for (auto i = acting_recovery_backfill.begin(); + i != acting_recovery_backfill.end(); + ++i) { + pg_shard_t peer(*i); + if (peer == pg_whoami) continue; + ceph_assert(peer_missing.count(peer)); + ceph_assert(peer_info.count(peer)); + pg_missing_t& pmissing(peer_missing[peer]); + psdout(20) << __func__ << " peer_missing for " << peer + << " = " << pmissing << dendl; + pg_info_t& pinfo(peer_info[peer]); + bool invalidate_stats = PGLog::append_log_entries_update_missing( + pinfo.last_backfill, + entries, + true, + NULL, + pmissing, + NULL, + dpp); + pinfo.last_update = info.last_update; + pinfo.stats.stats_invalid = pinfo.stats.stats_invalid || invalidate_stats; + rebuild_missing = rebuild_missing || invalidate_stats; + } + + if (!rebuild_missing) { + return; + } + + for (auto &&i: entries) { + missing_loc.rebuild( + i.soid, + pg_whoami, + acting_recovery_backfill, + info, + pg_log.get_missing(), + peer_missing, + peer_info); + } +} + +void PeeringState::add_log_entry(const pg_log_entry_t& e, bool applied) +{ + // raise last_complete only if we were previously up to date + if (info.last_complete == info.last_update) + info.last_complete = e.version; + + // raise last_update. + ceph_assert(e.version > info.last_update); + info.last_update = e.version; + + // raise user_version, if it increased (it may have not get bumped + // by all logged updates) + if (e.user_version > info.last_user_version) + info.last_user_version = e.user_version; + + // log mutation + pg_log.add(e, applied); + psdout(10) << "add_log_entry " << e << dendl; +} + + +void PeeringState::append_log( + vector<pg_log_entry_t>&& logv, + eversion_t trim_to, + eversion_t roll_forward_to, + eversion_t mlcod, + ObjectStore::Transaction &t, + bool transaction_applied, + bool async) +{ + /* The primary has sent an info updating the history, but it may not + * have arrived yet. We want to make sure that we cannot remember this + * write without remembering that it happened in an interval which went + * active in epoch history.last_epoch_started. + */ + if (info.last_epoch_started != info.history.last_epoch_started) { + info.history.last_epoch_started = info.last_epoch_started; + } + if (info.last_interval_started != info.history.last_interval_started) { + info.history.last_interval_started = info.last_interval_started; + } + psdout(10) << "append_log " << pg_log.get_log() << " " << logv << dendl; + + PGLog::LogEntryHandlerRef handler{pl->get_log_handler(t)}; + if (!transaction_applied) { + /* We must be a backfill or async recovery peer, so it's ok if we apply + * out-of-turn since we won't be considered when + * determining a min possible last_update. + * + * We skip_rollforward() here, which advances the crt, without + * doing an actual rollforward. This avoids cleaning up entries + * from the backend and we do not end up in a situation, where the + * object is deleted before we can _merge_object_divergent_entries(). 
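+ * (crt is the log's can_rollback_to marker; skip_rollforward() only bumps
+ * it forward and leaves the on-disk objects untouched.)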
+ */ + pg_log.skip_rollforward(); + } + + for (auto p = logv.begin(); p != logv.end(); ++p) { + add_log_entry(*p, transaction_applied); + + /* We don't want to leave the rollforward artifacts around + * here past last_backfill. It's ok for the same reason as + * above */ + if (transaction_applied && + p->soid > info.last_backfill) { + pg_log.roll_forward(handler.get()); + } + } + if (transaction_applied && roll_forward_to > pg_log.get_can_rollback_to()) { + pg_log.roll_forward_to( + roll_forward_to, + handler.get()); + last_rollback_info_trimmed_to_applied = roll_forward_to; + } + + psdout(10) << __func__ << " approx pg log length = " + << pg_log.get_log().approx_size() << dendl; + psdout(10) << __func__ << " dups pg log length = " + << pg_log.get_log().dups.size() << dendl; + psdout(10) << __func__ << " transaction_applied = " + << transaction_applied << dendl; + if (!transaction_applied || async) + psdout(10) << __func__ << " " << pg_whoami + << " is async_recovery or backfill target" << dendl; + pg_log.trim(trim_to, info, transaction_applied, async); + + // update the local pg, pg log + dirty_info = true; + write_if_dirty(t); + + if (!is_primary()) + min_last_complete_ondisk = mlcod; +} + +void PeeringState::recover_got( + const hobject_t &oid, eversion_t v, + bool is_delete, + ObjectStore::Transaction &t) +{ + if (v > pg_log.get_can_rollback_to()) { + /* This can only happen during a repair, and even then, it would + * be one heck of a race. If we are repairing the object, the + * write in question must be fully committed, so it's not valid + * to roll it back anyway (and we'll be rolled forward shortly + * anyway) */ + PGLog::LogEntryHandlerRef handler{pl->get_log_handler(t)}; + pg_log.roll_forward_to(v, handler.get()); + } + + psdout(10) << "got missing " << oid << " v " << v << dendl; + pg_log.recover_got(oid, v, info); + if (pg_log.get_log().log.empty()) { + psdout(10) << "last_complete now " << info.last_complete + << " while log is empty" << dendl; + } else if (pg_log.get_log().complete_to != pg_log.get_log().log.end()) { + psdout(10) << "last_complete now " << info.last_complete + << " log.complete_to " << pg_log.get_log().complete_to->version + << dendl; + } else { + psdout(10) << "last_complete now " << info.last_complete + << " log.complete_to at end" << dendl; + //below is not true in the repair case. + //assert(missing.num_missing() == 0); // otherwise, complete_to was wrong. 
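+ // with nothing left before complete_to, last_complete should have caught
+ // up with the log head (last_update).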
+ ceph_assert(info.last_complete == info.last_update); + } + + if (is_primary()) { + ceph_assert(missing_loc.needs_recovery(oid)); + if (!is_delete) + missing_loc.add_location(oid, pg_whoami); + } + + // update pg + dirty_info = true; + write_if_dirty(t); +} + +void PeeringState::update_backfill_progress( + const hobject_t &updated_backfill, + const pg_stat_t &updated_stats, + bool preserve_local_num_bytes, + ObjectStore::Transaction &t) { + info.set_last_backfill(updated_backfill); + if (preserve_local_num_bytes) { + psdout(25) << __func__ << " primary " << updated_stats.stats.sum.num_bytes + << " local " << info.stats.stats.sum.num_bytes << dendl; + int64_t bytes = info.stats.stats.sum.num_bytes; + info.stats = updated_stats; + info.stats.stats.sum.num_bytes = bytes; + } else { + psdout(20) << __func__ << " final " << updated_stats.stats.sum.num_bytes + << " replaces local " << info.stats.stats.sum.num_bytes << dendl; + info.stats = updated_stats; + } + + dirty_info = true; + write_if_dirty(t); +} + +void PeeringState::adjust_purged_snaps( + std::function<void(interval_set<snapid_t> &snaps)> f) { + f(info.purged_snaps); + dirty_info = true; + dirty_big_info = true; +} + +void PeeringState::on_peer_recover( + pg_shard_t peer, + const hobject_t &soid, + const eversion_t &version) +{ + pl->publish_stats_to_osd(); + // done! + peer_missing[peer].got(soid, version); + missing_loc.add_location(soid, peer); +} + +void PeeringState::begin_peer_recover( + pg_shard_t peer, + const hobject_t soid) +{ + peer_missing[peer].revise_have(soid, eversion_t()); +} + +void PeeringState::force_object_missing( + const set<pg_shard_t> &peers, + const hobject_t &soid, + eversion_t version) +{ + for (auto &&peer : peers) { + if (peer != primary) { + peer_missing[peer].add(soid, version, eversion_t(), false); + } else { + pg_log.missing_add(soid, version, eversion_t()); + pg_log.reset_complete_to(&info); + pg_log.set_last_requested(0); + } + } + + missing_loc.rebuild( + soid, + pg_whoami, + acting_recovery_backfill, + info, + pg_log.get_missing(), + peer_missing, + peer_info); +} + +void PeeringState::pre_submit_op( + const hobject_t &hoid, + const vector<pg_log_entry_t>& logv, + eversion_t at_version) +{ + if (at_version > eversion_t()) { + for (auto &&i : get_acting_recovery_backfill()) { + if (i == primary) continue; + pg_info_t &pinfo = peer_info[i]; + // keep peer_info up to date + if (pinfo.last_complete == pinfo.last_update) + pinfo.last_complete = at_version; + pinfo.last_update = at_version; + } + } + + bool requires_missing_loc = false; + for (auto &&i : get_async_recovery_targets()) { + if (i == primary || !get_peer_missing(i).is_missing(hoid)) + continue; + requires_missing_loc = true; + for (auto &&entry: logv) { + peer_missing[i].add_next_event(entry); + } + } + + if (requires_missing_loc) { + for (auto &&entry: logv) { + psdout(30) << __func__ << " missing_loc before: " + << missing_loc.get_locations(entry.soid) << dendl; + missing_loc.add_missing(entry.soid, entry.version, + eversion_t(), entry.is_delete()); + // clear out missing_loc + missing_loc.clear_location(entry.soid); + for (auto &i: get_actingset()) { + if (!get_peer_missing(i).is_missing(entry.soid)) + missing_loc.add_location(entry.soid, i); + } + psdout(30) << __func__ << " missing_loc after: " + << missing_loc.get_locations(entry.soid) << dendl; + } + } +} + +void PeeringState::recovery_committed_to(eversion_t version) +{ + psdout(10) << __func__ << " version " << version + << " now ondisk" << dendl; + last_complete_ondisk = version; 
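+ // Once last_complete_ondisk catches up with last_update, a replica reports
+ // it back to the primary (which uses it to recompute
+ // min_last_complete_ondisk); on the primary we just recompute it directly.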
+ + if (last_complete_ondisk == info.last_update) { + if (!is_primary()) { + // Either we are a replica or backfill target. + // we are fully up to date. tell the primary! + pl->send_cluster_message( + get_primary().osd, + TOPNSPC::make_message<MOSDPGTrim>( + get_osdmap_epoch(), + spg_t(info.pgid.pgid, primary.shard), + last_complete_ondisk), + get_osdmap_epoch()); + } else { + calc_min_last_complete_ondisk(); + } + } +} + +void PeeringState::complete_write(eversion_t v, eversion_t lc) +{ + last_update_ondisk = v; + last_complete_ondisk = lc; + calc_min_last_complete_ondisk(); +} + +void PeeringState::calc_trim_to() +{ + size_t target = pl->get_target_pg_log_entries(); + + eversion_t limit = std::min( + min_last_complete_ondisk, + pg_log.get_can_rollback_to()); + if (limit != eversion_t() && + limit != pg_trim_to && + pg_log.get_log().approx_size() > target) { + size_t num_to_trim = std::min(pg_log.get_log().approx_size() - target, + cct->_conf->osd_pg_log_trim_max); + if (num_to_trim < cct->_conf->osd_pg_log_trim_min && + cct->_conf->osd_pg_log_trim_max >= cct->_conf->osd_pg_log_trim_min) { + return; + } + auto it = pg_log.get_log().log.begin(); + eversion_t new_trim_to; + for (size_t i = 0; i < num_to_trim; ++i) { + new_trim_to = it->version; + ++it; + if (new_trim_to > limit) { + new_trim_to = limit; + psdout(10) << "calc_trim_to trimming to min_last_complete_ondisk" << dendl; + break; + } + } + psdout(10) << "calc_trim_to " << pg_trim_to << " -> " << new_trim_to << dendl; + pg_trim_to = new_trim_to; + assert(pg_trim_to <= pg_log.get_head()); + assert(pg_trim_to <= min_last_complete_ondisk); + } +} + +void PeeringState::calc_trim_to_aggressive() +{ + size_t target = pl->get_target_pg_log_entries(); + + // limit pg log trimming up to the can_rollback_to value + eversion_t limit = std::min({ + pg_log.get_head(), + pg_log.get_can_rollback_to(), + last_update_ondisk}); + psdout(10) << __func__ << " limit = " << limit << dendl; + + if (limit != eversion_t() && + limit != pg_trim_to && + pg_log.get_log().approx_size() > target) { + psdout(10) << __func__ << " approx pg log length = " + << pg_log.get_log().approx_size() << dendl; + uint64_t num_to_trim = std::min<uint64_t>(pg_log.get_log().approx_size() - target, + cct->_conf->osd_pg_log_trim_max); + psdout(10) << __func__ << " num_to_trim = " << num_to_trim << dendl; + if (num_to_trim < cct->_conf->osd_pg_log_trim_min && + cct->_conf->osd_pg_log_trim_max >= cct->_conf->osd_pg_log_trim_min) { + return; + } + auto it = pg_log.get_log().log.begin(); // oldest log entry + auto rit = pg_log.get_log().log.rbegin(); + eversion_t by_n_to_keep; // start from tail + eversion_t by_n_to_trim = eversion_t::max(); // start from head + for (size_t i = 0; it != pg_log.get_log().log.end(); ++it, ++rit) { + i++; + if (i > target && by_n_to_keep == eversion_t()) { + by_n_to_keep = rit->version; + } + if (i >= num_to_trim && by_n_to_trim == eversion_t::max()) { + by_n_to_trim = it->version; + } + if (by_n_to_keep != eversion_t() && + by_n_to_trim != eversion_t::max()) { + break; + } + } + + if (by_n_to_keep == eversion_t()) { + return; + } + + pg_trim_to = std::min({by_n_to_keep, by_n_to_trim, limit}); + psdout(10) << __func__ << " pg_trim_to now " << pg_trim_to << dendl; + ceph_assert(pg_trim_to <= pg_log.get_head()); + } +} + +void PeeringState::apply_op_stats( + const hobject_t &soid, + const object_stat_sum_t &delta_stats) +{ + info.stats.stats.add(delta_stats); + info.stats.stats.floor(0); + + for (auto i = get_backfill_targets().begin(); + i != 
get_backfill_targets().end(); + ++i) { + pg_shard_t bt = *i; + pg_info_t& pinfo = peer_info[bt]; + if (soid <= pinfo.last_backfill) + pinfo.stats.stats.add(delta_stats); + } +} + +void PeeringState::update_complete_backfill_object_stats( + const hobject_t &hoid, + const pg_stat_t &stats) +{ + for (auto &&bt: get_backfill_targets()) { + pg_info_t& pinfo = peer_info[bt]; + //Add stats to all peers that were missing object + if (hoid > pinfo.last_backfill) + pinfo.stats.add(stats); + } +} + +void PeeringState::update_peer_last_backfill( + pg_shard_t peer, + const hobject_t &new_last_backfill) +{ + pg_info_t &pinfo = peer_info[peer]; + pinfo.last_backfill = new_last_backfill; + if (new_last_backfill.is_max()) { + /* pinfo.stats might be wrong if we did log-based recovery on the + * backfilled portion in addition to continuing backfill. + */ + pinfo.stats = info.stats; + } +} + +void PeeringState::set_revert_with_targets( + const hobject_t &soid, + const set<pg_shard_t> &good_peers) +{ + for (auto &&peer: good_peers) { + missing_loc.add_location(soid, peer); + } +} + +void PeeringState::prepare_backfill_for_missing( + const hobject_t &soid, + const eversion_t &version, + const vector<pg_shard_t> &targets) { + for (auto &&peer: targets) { + peer_missing[peer].add(soid, version, eversion_t(), false); + } +} + +void PeeringState::update_hset(const pg_hit_set_history_t &hset_history) +{ + info.hit_set = hset_history; +} + +/*------------ Peering State Machine----------------*/ +#undef dout_prefix +#define dout_prefix (context< PeeringMachine >().dpp->gen_prefix(*_dout) \ + << "state<" << get_state_name() << ">: ") +#undef psdout +#define psdout(x) ldout(context< PeeringMachine >().cct, x) + +#define DECLARE_LOCALS \ + PeeringState *ps = context< PeeringMachine >().state; \ + std::ignore = ps; \ + PeeringListener *pl = context< PeeringMachine >().pl; \ + std::ignore = pl + + +/*------Crashed-------*/ +PeeringState::Crashed::Crashed(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Crashed") +{ + context< PeeringMachine >().log_enter(state_name); + ceph_abort_msg("we got a bad state machine event"); +} + + +/*------Initial-------*/ +PeeringState::Initial::Initial(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Initial") +{ + context< PeeringMachine >().log_enter(state_name); +} + +boost::statechart::result PeeringState::Initial::react(const MNotifyRec& notify) +{ + DECLARE_LOCALS; + ps->proc_replica_info( + notify.from, notify.notify.info, notify.notify.epoch_sent); + ps->set_last_peering_reset(); + return transit< Primary >(); +} + +boost::statechart::result PeeringState::Initial::react(const MInfoRec& i) +{ + DECLARE_LOCALS; + ceph_assert(!ps->is_primary()); + post_event(i); + return transit< Stray >(); +} + +boost::statechart::result PeeringState::Initial::react(const MLogRec& i) +{ + DECLARE_LOCALS; + ceph_assert(!ps->is_primary()); + post_event(i); + return transit< Stray >(); +} + +void PeeringState::Initial::exit() +{ + context< PeeringMachine >().log_exit(state_name, enter_time); + DECLARE_LOCALS; + utime_t dur = ceph_clock_now() - enter_time; + pl->get_peering_perf().tinc(rs_initial_latency, dur); +} + +/*------Started-------*/ +PeeringState::Started::Started(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Started") +{ + context< PeeringMachine >().log_enter(state_name); +} + +boost::statechart::result +PeeringState::Started::react(const IntervalFlush&) +{ + 
psdout(10) << "Ending blocked outgoing recovery messages" << dendl; + context< PeeringMachine >().state->end_block_outgoing(); + return discard_event(); +} + +boost::statechart::result PeeringState::Started::react(const AdvMap& advmap) +{ + DECLARE_LOCALS; + psdout(10) << "Started advmap" << dendl; + ps->check_full_transition(advmap.lastmap, advmap.osdmap); + if (ps->should_restart_peering( + advmap.up_primary, + advmap.acting_primary, + advmap.newup, + advmap.newacting, + advmap.lastmap, + advmap.osdmap)) { + psdout(10) << "should_restart_peering, transitioning to Reset" + << dendl; + post_event(advmap); + return transit< Reset >(); + } + ps->remove_down_peer_info(advmap.osdmap); + return discard_event(); +} + +boost::statechart::result PeeringState::Started::react(const QueryState& q) +{ + q.f->open_object_section("state"); + q.f->dump_string("name", state_name); + q.f->dump_stream("enter_time") << enter_time; + q.f->close_section(); + return discard_event(); +} + +boost::statechart::result PeeringState::Started::react(const QueryUnfound& q) +{ + q.f->dump_string("state", "Started"); + q.f->dump_bool("available_might_have_unfound", false); + return discard_event(); +} + +void PeeringState::Started::exit() +{ + context< PeeringMachine >().log_exit(state_name, enter_time); + DECLARE_LOCALS; + utime_t dur = ceph_clock_now() - enter_time; + pl->get_peering_perf().tinc(rs_started_latency, dur); + ps->state_clear(PG_STATE_WAIT | PG_STATE_LAGGY); +} + +/*--------Reset---------*/ +PeeringState::Reset::Reset(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Reset") +{ + context< PeeringMachine >().log_enter(state_name); + DECLARE_LOCALS; + + ps->flushes_in_progress = 0; + ps->set_last_peering_reset(); + ps->log_weirdness(); +} + +boost::statechart::result +PeeringState::Reset::react(const IntervalFlush&) +{ + psdout(10) << "Ending blocked outgoing recovery messages" << dendl; + context< PeeringMachine >().state->end_block_outgoing(); + return discard_event(); +} + +boost::statechart::result PeeringState::Reset::react(const AdvMap& advmap) +{ + DECLARE_LOCALS; + psdout(10) << "Reset advmap" << dendl; + + ps->check_full_transition(advmap.lastmap, advmap.osdmap); + + if (ps->should_restart_peering( + advmap.up_primary, + advmap.acting_primary, + advmap.newup, + advmap.newacting, + advmap.lastmap, + advmap.osdmap)) { + psdout(10) << "should restart peering, calling start_peering_interval again" + << dendl; + ps->start_peering_interval( + advmap.lastmap, + advmap.newup, advmap.up_primary, + advmap.newacting, advmap.acting_primary, + context< PeeringMachine >().get_cur_transaction()); + } + ps->remove_down_peer_info(advmap.osdmap); + ps->check_past_interval_bounds(); + return discard_event(); +} + +boost::statechart::result PeeringState::Reset::react(const ActMap&) +{ + DECLARE_LOCALS; + if (ps->should_send_notify() && ps->get_primary().osd >= 0) { + ps->info.history.refresh_prior_readable_until_ub( + pl->get_mnow(), + ps->prior_readable_until_ub); + context< PeeringMachine >().send_notify( + ps->get_primary().osd, + pg_notify_t( + ps->get_primary().shard, ps->pg_whoami.shard, + ps->get_osdmap_epoch(), + ps->get_osdmap_epoch(), + ps->info, + ps->past_intervals)); + } + + ps->update_heartbeat_peers(); + + return transit< Started >(); +} + +boost::statechart::result PeeringState::Reset::react(const QueryState& q) +{ + q.f->open_object_section("state"); + q.f->dump_string("name", state_name); + q.f->dump_stream("enter_time") << enter_time; + q.f->close_section(); + 
return discard_event(); +} + +boost::statechart::result PeeringState::Reset::react(const QueryUnfound& q) +{ + q.f->dump_string("state", "Reset"); + q.f->dump_bool("available_might_have_unfound", false); + return discard_event(); +} + +void PeeringState::Reset::exit() +{ + context< PeeringMachine >().log_exit(state_name, enter_time); + DECLARE_LOCALS; + utime_t dur = ceph_clock_now() - enter_time; + pl->get_peering_perf().tinc(rs_reset_latency, dur); +} + +/*-------Start---------*/ +PeeringState::Start::Start(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Start") +{ + context< PeeringMachine >().log_enter(state_name); + + DECLARE_LOCALS; + if (ps->is_primary()) { + psdout(1) << "transitioning to Primary" << dendl; + post_event(MakePrimary()); + } else { //is_stray + psdout(1) << "transitioning to Stray" << dendl; + post_event(MakeStray()); + } +} + +void PeeringState::Start::exit() +{ + context< PeeringMachine >().log_exit(state_name, enter_time); + DECLARE_LOCALS; + utime_t dur = ceph_clock_now() - enter_time; + pl->get_peering_perf().tinc(rs_start_latency, dur); +} + +/*---------Primary--------*/ +PeeringState::Primary::Primary(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Started/Primary") +{ + context< PeeringMachine >().log_enter(state_name); + DECLARE_LOCALS; + ceph_assert(ps->want_acting.empty()); + + // set CREATING bit until we have peered for the first time. + if (ps->info.history.last_epoch_started == 0) { + ps->state_set(PG_STATE_CREATING); + // use the history timestamp, which ultimately comes from the + // monitor in the create case. + utime_t t = ps->info.history.last_scrub_stamp; + ps->info.stats.last_fresh = t; + ps->info.stats.last_active = t; + ps->info.stats.last_change = t; + ps->info.stats.last_peered = t; + ps->info.stats.last_clean = t; + ps->info.stats.last_unstale = t; + ps->info.stats.last_undegraded = t; + ps->info.stats.last_fullsized = t; + ps->info.stats.last_scrub_stamp = t; + ps->info.stats.last_deep_scrub_stamp = t; + ps->info.stats.last_clean_scrub_stamp = t; + } +} + +boost::statechart::result PeeringState::Primary::react(const MNotifyRec& notevt) +{ + DECLARE_LOCALS; + psdout(7) << "handle_pg_notify from osd." 
<< notevt.from << dendl; + ps->proc_replica_info( + notevt.from, notevt.notify.info, notevt.notify.epoch_sent); + return discard_event(); +} + +boost::statechart::result PeeringState::Primary::react(const ActMap&) +{ + DECLARE_LOCALS; + psdout(7) << "handle ActMap primary" << dendl; + pl->publish_stats_to_osd(); + return discard_event(); +} + +boost::statechart::result PeeringState::Primary::react( + const SetForceRecovery&) +{ + DECLARE_LOCALS; + ps->set_force_recovery(true); + return discard_event(); +} + +boost::statechart::result PeeringState::Primary::react( + const UnsetForceRecovery&) +{ + DECLARE_LOCALS; + ps->set_force_recovery(false); + return discard_event(); +} + +boost::statechart::result PeeringState::Primary::react( + const RequestScrub& evt) +{ + DECLARE_LOCALS; + if (ps->is_primary()) { + pl->scrub_requested(evt.deep, evt.repair); + psdout(10) << "marking for scrub" << dendl; + } + return discard_event(); +} + +boost::statechart::result PeeringState::Primary::react( + const SetForceBackfill&) +{ + DECLARE_LOCALS; + ps->set_force_backfill(true); + return discard_event(); +} + +boost::statechart::result PeeringState::Primary::react( + const UnsetForceBackfill&) +{ + DECLARE_LOCALS; + ps->set_force_backfill(false); + return discard_event(); +} + +void PeeringState::Primary::exit() +{ + context< PeeringMachine >().log_exit(state_name, enter_time); + DECLARE_LOCALS; + ps->want_acting.clear(); + utime_t dur = ceph_clock_now() - enter_time; + pl->get_peering_perf().tinc(rs_primary_latency, dur); + pl->clear_primary_state(); + ps->state_clear(PG_STATE_CREATING); +} + +/*---------Peering--------*/ +PeeringState::Peering::Peering(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Started/Primary/Peering"), + history_les_bound(false) +{ + context< PeeringMachine >().log_enter(state_name); + DECLARE_LOCALS; + + ceph_assert(!ps->is_peered()); + ceph_assert(!ps->is_peering()); + ceph_assert(ps->is_primary()); + ps->state_set(PG_STATE_PEERING); +} + +boost::statechart::result PeeringState::Peering::react(const AdvMap& advmap) +{ + DECLARE_LOCALS; + psdout(10) << "Peering advmap" << dendl; + if (prior_set.affected_by_map(*(advmap.osdmap), ps->dpp)) { + psdout(1) << "Peering, affected_by_map, going to Reset" << dendl; + post_event(advmap); + return transit< Reset >(); + } + + ps->adjust_need_up_thru(advmap.osdmap); + ps->check_prior_readable_down_osds(advmap.osdmap); + + return forward_event(); +} + +boost::statechart::result PeeringState::Peering::react(const QueryState& q) +{ + DECLARE_LOCALS; + + q.f->open_object_section("state"); + q.f->dump_string("name", state_name); + q.f->dump_stream("enter_time") << enter_time; + + q.f->open_array_section("past_intervals"); + ps->past_intervals.dump(q.f); + q.f->close_section(); + + q.f->open_array_section("probing_osds"); + for (auto p = prior_set.probe.begin(); p != prior_set.probe.end(); ++p) + q.f->dump_stream("osd") << *p; + q.f->close_section(); + + if (prior_set.pg_down) + q.f->dump_string("blocked", "peering is blocked due to down osds"); + + q.f->open_array_section("down_osds_we_would_probe"); + for (auto p = prior_set.down.begin(); p != prior_set.down.end(); ++p) + q.f->dump_int("osd", *p); + q.f->close_section(); + + q.f->open_array_section("peering_blocked_by"); + for (auto p = prior_set.blocked_by.begin(); + p != prior_set.blocked_by.end(); + ++p) { + q.f->open_object_section("osd"); + q.f->dump_int("osd", p->first); + q.f->dump_int("current_lost_at", p->second); + q.f->dump_string("comment", 
"starting or marking this osd lost may let us proceed"); + q.f->close_section(); + } + q.f->close_section(); + + if (history_les_bound) { + q.f->open_array_section("peering_blocked_by_detail"); + q.f->open_object_section("item"); + q.f->dump_string("detail","peering_blocked_by_history_les_bound"); + q.f->close_section(); + q.f->close_section(); + } + + q.f->close_section(); + return forward_event(); +} + +boost::statechart::result PeeringState::Peering::react(const QueryUnfound& q) +{ + q.f->dump_string("state", "Peering"); + q.f->dump_bool("available_might_have_unfound", false); + return discard_event(); +} + +void PeeringState::Peering::exit() +{ + + DECLARE_LOCALS; + psdout(10) << "Leaving Peering" << dendl; + context< PeeringMachine >().log_exit(state_name, enter_time); + ps->state_clear(PG_STATE_PEERING); + pl->clear_probe_targets(); + + utime_t dur = ceph_clock_now() - enter_time; + pl->get_peering_perf().tinc(rs_peering_latency, dur); +} + + +/*------Backfilling-------*/ +PeeringState::Backfilling::Backfilling(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Started/Primary/Active/Backfilling") +{ + context< PeeringMachine >().log_enter(state_name); + + + DECLARE_LOCALS; + ps->backfill_reserved = true; + pl->on_backfill_reserved(); + ps->state_clear(PG_STATE_BACKFILL_TOOFULL); + ps->state_clear(PG_STATE_BACKFILL_WAIT); + ps->state_set(PG_STATE_BACKFILLING); + pl->publish_stats_to_osd(); +} + +void PeeringState::Backfilling::backfill_release_reservations() +{ + DECLARE_LOCALS; + pl->cancel_local_background_io_reservation(); + for (auto it = ps->backfill_targets.begin(); + it != ps->backfill_targets.end(); + ++it) { + ceph_assert(*it != ps->pg_whoami); + pl->send_cluster_message( + it->osd, + TOPNSPC::make_message<MBackfillReserve>( + MBackfillReserve::RELEASE, + spg_t(ps->info.pgid.pgid, it->shard), + ps->get_osdmap_epoch()), + ps->get_osdmap_epoch()); + } +} + +void PeeringState::Backfilling::cancel_backfill() +{ + DECLARE_LOCALS; + backfill_release_reservations(); + pl->on_backfill_canceled(); +} + +boost::statechart::result +PeeringState::Backfilling::react(const Backfilled &c) +{ + backfill_release_reservations(); + return transit<Recovered>(); +} + +boost::statechart::result +PeeringState::Backfilling::react(const DeferBackfill &c) +{ + DECLARE_LOCALS; + + psdout(10) << "defer backfill, retry delay " << c.delay << dendl; + ps->state_set(PG_STATE_BACKFILL_WAIT); + ps->state_clear(PG_STATE_BACKFILLING); + cancel_backfill(); + + pl->schedule_event_after( + std::make_shared<PGPeeringEvent>( + ps->get_osdmap_epoch(), + ps->get_osdmap_epoch(), + RequestBackfill()), + c.delay); + return transit<NotBackfilling>(); +} + +boost::statechart::result +PeeringState::Backfilling::react(const UnfoundBackfill &c) +{ + DECLARE_LOCALS; + psdout(10) << "backfill has unfound, can't continue" << dendl; + ps->state_set(PG_STATE_BACKFILL_UNFOUND); + ps->state_clear(PG_STATE_BACKFILLING); + cancel_backfill(); + return transit<NotBackfilling>(); +} + +boost::statechart::result +PeeringState::Backfilling::react(const RemoteReservationRevokedTooFull &) +{ + DECLARE_LOCALS; + + ps->state_set(PG_STATE_BACKFILL_TOOFULL); + ps->state_clear(PG_STATE_BACKFILLING); + cancel_backfill(); + + pl->schedule_event_after( + std::make_shared<PGPeeringEvent>( + ps->get_osdmap_epoch(), + ps->get_osdmap_epoch(), + RequestBackfill()), + ps->cct->_conf->osd_backfill_retry_interval); + + return transit<NotBackfilling>(); +} + +boost::statechart::result 
+PeeringState::Backfilling::react(const RemoteReservationRevoked &) +{ + DECLARE_LOCALS; + ps->state_set(PG_STATE_BACKFILL_WAIT); + cancel_backfill(); + if (ps->needs_backfill()) { + return transit<WaitLocalBackfillReserved>(); + } else { + // raced with MOSDPGBackfill::OP_BACKFILL_FINISH, ignore + return discard_event(); + } +} + +void PeeringState::Backfilling::exit() +{ + context< PeeringMachine >().log_exit(state_name, enter_time); + DECLARE_LOCALS; + ps->backfill_reserved = false; + ps->state_clear(PG_STATE_BACKFILLING); + ps->state_clear(PG_STATE_FORCED_BACKFILL | PG_STATE_FORCED_RECOVERY); + utime_t dur = ceph_clock_now() - enter_time; + pl->get_peering_perf().tinc(rs_backfilling_latency, dur); +} + +/*--WaitRemoteBackfillReserved--*/ + +PeeringState::WaitRemoteBackfillReserved::WaitRemoteBackfillReserved(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Started/Primary/Active/WaitRemoteBackfillReserved"), + backfill_osd_it(context< Active >().remote_shards_to_reserve_backfill.begin()) +{ + context< PeeringMachine >().log_enter(state_name); + DECLARE_LOCALS; + + ps->state_set(PG_STATE_BACKFILL_WAIT); + pl->publish_stats_to_osd(); + post_event(RemoteBackfillReserved()); +} + +boost::statechart::result +PeeringState::WaitRemoteBackfillReserved::react(const RemoteBackfillReserved &evt) +{ + DECLARE_LOCALS; + + int64_t num_bytes = ps->info.stats.stats.sum.num_bytes; + psdout(10) << __func__ << " num_bytes " << num_bytes << dendl; + if (backfill_osd_it != + context< Active >().remote_shards_to_reserve_backfill.end()) { + // The primary never backfills itself + ceph_assert(*backfill_osd_it != ps->pg_whoami); + pl->send_cluster_message( + backfill_osd_it->osd, + TOPNSPC::make_message<MBackfillReserve>( + MBackfillReserve::REQUEST, + spg_t(context< PeeringMachine >().spgid.pgid, backfill_osd_it->shard), + ps->get_osdmap_epoch(), + ps->get_backfill_priority(), + num_bytes, + ps->peer_bytes[*backfill_osd_it]), + ps->get_osdmap_epoch()); + ++backfill_osd_it; + } else { + ps->peer_bytes.clear(); + post_event(AllBackfillsReserved()); + } + return discard_event(); +} + +void PeeringState::WaitRemoteBackfillReserved::exit() +{ + context< PeeringMachine >().log_exit(state_name, enter_time); + DECLARE_LOCALS; + + utime_t dur = ceph_clock_now() - enter_time; + pl->get_peering_perf().tinc(rs_waitremotebackfillreserved_latency, dur); +} + +void PeeringState::WaitRemoteBackfillReserved::retry() +{ + DECLARE_LOCALS; + pl->cancel_local_background_io_reservation(); + + // Send CANCEL to all previously acquired reservations + set<pg_shard_t>::const_iterator it, begin, end; + begin = context< Active >().remote_shards_to_reserve_backfill.begin(); + end = context< Active >().remote_shards_to_reserve_backfill.end(); + ceph_assert(begin != end); + for (it = begin; it != backfill_osd_it; ++it) { + // The primary never backfills itself + ceph_assert(*it != ps->pg_whoami); + pl->send_cluster_message( + it->osd, + TOPNSPC::make_message<MBackfillReserve>( + MBackfillReserve::RELEASE, + spg_t(context< PeeringMachine >().spgid.pgid, it->shard), + ps->get_osdmap_epoch()), + ps->get_osdmap_epoch()); + } + + ps->state_clear(PG_STATE_BACKFILL_WAIT); + pl->publish_stats_to_osd(); + + pl->schedule_event_after( + std::make_shared<PGPeeringEvent>( + ps->get_osdmap_epoch(), + ps->get_osdmap_epoch(), + RequestBackfill()), + ps->cct->_conf->osd_backfill_retry_interval); +} + +boost::statechart::result +PeeringState::WaitRemoteBackfillReserved::react(const RemoteReservationRejectedTooFull 
&evt) +{ + DECLARE_LOCALS; + ps->state_set(PG_STATE_BACKFILL_TOOFULL); + retry(); + return transit<NotBackfilling>(); +} + +boost::statechart::result +PeeringState::WaitRemoteBackfillReserved::react(const RemoteReservationRevoked &evt) +{ + retry(); + return transit<NotBackfilling>(); +} + +/*--WaitLocalBackfillReserved--*/ +PeeringState::WaitLocalBackfillReserved::WaitLocalBackfillReserved(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Started/Primary/Active/WaitLocalBackfillReserved") +{ + context< PeeringMachine >().log_enter(state_name); + DECLARE_LOCALS; + + ps->state_set(PG_STATE_BACKFILL_WAIT); + pl->request_local_background_io_reservation( + ps->get_backfill_priority(), + std::make_unique<PGPeeringEvent>( + ps->get_osdmap_epoch(), + ps->get_osdmap_epoch(), + LocalBackfillReserved()), + std::make_unique<PGPeeringEvent>( + ps->get_osdmap_epoch(), + ps->get_osdmap_epoch(), + DeferBackfill(0.0))); + pl->publish_stats_to_osd(); +} + +void PeeringState::WaitLocalBackfillReserved::exit() +{ + context< PeeringMachine >().log_exit(state_name, enter_time); + DECLARE_LOCALS; + utime_t dur = ceph_clock_now() - enter_time; + pl->get_peering_perf().tinc(rs_waitlocalbackfillreserved_latency, dur); +} + +/*----NotBackfilling------*/ +PeeringState::NotBackfilling::NotBackfilling(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Started/Primary/Active/NotBackfilling") +{ + context< PeeringMachine >().log_enter(state_name); + DECLARE_LOCALS; + ps->state_clear(PG_STATE_REPAIR); + pl->publish_stats_to_osd(); +} + +boost::statechart::result PeeringState::NotBackfilling::react(const QueryUnfound& q) +{ + DECLARE_LOCALS; + + ps->query_unfound(q.f, "NotBackfilling"); + return discard_event(); +} + +boost::statechart::result +PeeringState::NotBackfilling::react(const RemoteBackfillReserved &evt) +{ + return discard_event(); +} + +boost::statechart::result +PeeringState::NotBackfilling::react(const RemoteReservationRejectedTooFull &evt) +{ + return discard_event(); +} + +void PeeringState::NotBackfilling::exit() +{ + context< PeeringMachine >().log_exit(state_name, enter_time); + + DECLARE_LOCALS; + ps->state_clear(PG_STATE_BACKFILL_UNFOUND); + utime_t dur = ceph_clock_now() - enter_time; + pl->get_peering_perf().tinc(rs_notbackfilling_latency, dur); +} + +/*----NotRecovering------*/ +PeeringState::NotRecovering::NotRecovering(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Started/Primary/Active/NotRecovering") +{ + context< PeeringMachine >().log_enter(state_name); + DECLARE_LOCALS; + ps->state_clear(PG_STATE_REPAIR); + pl->publish_stats_to_osd(); +} + +boost::statechart::result PeeringState::NotRecovering::react(const QueryUnfound& q) +{ + DECLARE_LOCALS; + + ps->query_unfound(q.f, "NotRecovering"); + return discard_event(); +} + +void PeeringState::NotRecovering::exit() +{ + context< PeeringMachine >().log_exit(state_name, enter_time); + + DECLARE_LOCALS; + ps->state_clear(PG_STATE_RECOVERY_UNFOUND); + utime_t dur = ceph_clock_now() - enter_time; + pl->get_peering_perf().tinc(rs_notrecovering_latency, dur); +} + +/*---RepNotRecovering----*/ +PeeringState::RepNotRecovering::RepNotRecovering(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Started/ReplicaActive/RepNotRecovering") +{ + context< PeeringMachine >().log_enter(state_name); +} + +boost::statechart::result +PeeringState::RepNotRecovering::react(const 
RejectTooFullRemoteReservation &evt) +{ + DECLARE_LOCALS; + ps->reject_reservation(); + post_event(RemoteReservationRejectedTooFull()); + return discard_event(); +} + +void PeeringState::RepNotRecovering::exit() +{ + context< PeeringMachine >().log_exit(state_name, enter_time); + DECLARE_LOCALS; + utime_t dur = ceph_clock_now() - enter_time; + pl->get_peering_perf().tinc(rs_repnotrecovering_latency, dur); +} + +/*---RepWaitRecoveryReserved--*/ +PeeringState::RepWaitRecoveryReserved::RepWaitRecoveryReserved(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Started/ReplicaActive/RepWaitRecoveryReserved") +{ + context< PeeringMachine >().log_enter(state_name); +} + +boost::statechart::result +PeeringState::RepWaitRecoveryReserved::react(const RemoteRecoveryReserved &evt) +{ + DECLARE_LOCALS; + pl->send_cluster_message( + ps->primary.osd, + TOPNSPC::make_message<MRecoveryReserve>( + MRecoveryReserve::GRANT, + spg_t(ps->info.pgid.pgid, ps->primary.shard), + ps->get_osdmap_epoch()), + ps->get_osdmap_epoch()); + return transit<RepRecovering>(); +} + +boost::statechart::result +PeeringState::RepWaitRecoveryReserved::react( + const RemoteReservationCanceled &evt) +{ + DECLARE_LOCALS; + pl->unreserve_recovery_space(); + + pl->cancel_remote_recovery_reservation(); + return transit<RepNotRecovering>(); +} + +void PeeringState::RepWaitRecoveryReserved::exit() +{ + context< PeeringMachine >().log_exit(state_name, enter_time); + DECLARE_LOCALS; + utime_t dur = ceph_clock_now() - enter_time; + pl->get_peering_perf().tinc(rs_repwaitrecoveryreserved_latency, dur); +} + +/*-RepWaitBackfillReserved*/ +PeeringState::RepWaitBackfillReserved::RepWaitBackfillReserved(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Started/ReplicaActive/RepWaitBackfillReserved") +{ + context< PeeringMachine >().log_enter(state_name); +} + +boost::statechart::result +PeeringState::RepNotRecovering::react(const RequestBackfillPrio &evt) +{ + + DECLARE_LOCALS; + + if (!pl->try_reserve_recovery_space( + evt.primary_num_bytes, evt.local_num_bytes)) { + post_event(RejectTooFullRemoteReservation()); + } else { + PGPeeringEventURef preempt; + if (HAVE_FEATURE(ps->upacting_features, RECOVERY_RESERVATION_2)) { + // older peers will interpret preemption as TOOFULL + preempt = std::make_unique<PGPeeringEvent>( + pl->get_osdmap_epoch(), + pl->get_osdmap_epoch(), + RemoteBackfillPreempted()); + } + pl->request_remote_recovery_reservation( + evt.priority, + std::make_unique<PGPeeringEvent>( + pl->get_osdmap_epoch(), + pl->get_osdmap_epoch(), + RemoteBackfillReserved()), + std::move(preempt)); + } + return transit<RepWaitBackfillReserved>(); +} + +boost::statechart::result +PeeringState::RepNotRecovering::react(const RequestRecoveryPrio &evt) +{ + DECLARE_LOCALS; + + // fall back to a local reckoning of priority of primary doesn't pass one + // (pre-mimic compat) + int prio = evt.priority ? 
evt.priority : ps->get_recovery_priority(); + + PGPeeringEventURef preempt; + if (HAVE_FEATURE(ps->upacting_features, RECOVERY_RESERVATION_2)) { + // older peers can't handle this + preempt = std::make_unique<PGPeeringEvent>( + ps->get_osdmap_epoch(), + ps->get_osdmap_epoch(), + RemoteRecoveryPreempted()); + } + + pl->request_remote_recovery_reservation( + prio, + std::make_unique<PGPeeringEvent>( + ps->get_osdmap_epoch(), + ps->get_osdmap_epoch(), + RemoteRecoveryReserved()), + std::move(preempt)); + return transit<RepWaitRecoveryReserved>(); +} + +void PeeringState::RepWaitBackfillReserved::exit() +{ + context< PeeringMachine >().log_exit(state_name, enter_time); + DECLARE_LOCALS; + utime_t dur = ceph_clock_now() - enter_time; + pl->get_peering_perf().tinc(rs_repwaitbackfillreserved_latency, dur); +} + +boost::statechart::result +PeeringState::RepWaitBackfillReserved::react(const RemoteBackfillReserved &evt) +{ + DECLARE_LOCALS; + + + pl->send_cluster_message( + ps->primary.osd, + TOPNSPC::make_message<MBackfillReserve>( + MBackfillReserve::GRANT, + spg_t(ps->info.pgid.pgid, ps->primary.shard), + ps->get_osdmap_epoch()), + ps->get_osdmap_epoch()); + return transit<RepRecovering>(); +} + +boost::statechart::result +PeeringState::RepWaitBackfillReserved::react( + const RejectTooFullRemoteReservation &evt) +{ + DECLARE_LOCALS; + ps->reject_reservation(); + post_event(RemoteReservationRejectedTooFull()); + return discard_event(); +} + +boost::statechart::result +PeeringState::RepWaitBackfillReserved::react( + const RemoteReservationRejectedTooFull &evt) +{ + DECLARE_LOCALS; + pl->unreserve_recovery_space(); + + pl->cancel_remote_recovery_reservation(); + return transit<RepNotRecovering>(); +} + +boost::statechart::result +PeeringState::RepWaitBackfillReserved::react( + const RemoteReservationCanceled &evt) +{ + DECLARE_LOCALS; + pl->unreserve_recovery_space(); + + pl->cancel_remote_recovery_reservation(); + return transit<RepNotRecovering>(); +} + +/*---RepRecovering-------*/ +PeeringState::RepRecovering::RepRecovering(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Started/ReplicaActive/RepRecovering") +{ + context< PeeringMachine >().log_enter(state_name); +} + +boost::statechart::result +PeeringState::RepRecovering::react(const RemoteRecoveryPreempted &) +{ + DECLARE_LOCALS; + + + pl->unreserve_recovery_space(); + pl->send_cluster_message( + ps->primary.osd, + TOPNSPC::make_message<MRecoveryReserve>( + MRecoveryReserve::REVOKE, + spg_t(ps->info.pgid.pgid, ps->primary.shard), + ps->get_osdmap_epoch()), + ps->get_osdmap_epoch()); + return discard_event(); +} + +boost::statechart::result +PeeringState::RepRecovering::react(const BackfillTooFull &) +{ + DECLARE_LOCALS; + + + pl->unreserve_recovery_space(); + pl->send_cluster_message( + ps->primary.osd, + TOPNSPC::make_message<MBackfillReserve>( + MBackfillReserve::REVOKE_TOOFULL, + spg_t(ps->info.pgid.pgid, ps->primary.shard), + ps->get_osdmap_epoch()), + ps->get_osdmap_epoch()); + return discard_event(); +} + +boost::statechart::result +PeeringState::RepRecovering::react(const RemoteBackfillPreempted &) +{ + DECLARE_LOCALS; + + + pl->unreserve_recovery_space(); + pl->send_cluster_message( + ps->primary.osd, + TOPNSPC::make_message<MBackfillReserve>( + MBackfillReserve::REVOKE, + spg_t(ps->info.pgid.pgid, ps->primary.shard), + ps->get_osdmap_epoch()), + ps->get_osdmap_epoch()); + return discard_event(); +} + +void PeeringState::RepRecovering::exit() +{ + context< PeeringMachine 
>().log_exit(state_name, enter_time); + DECLARE_LOCALS; + pl->unreserve_recovery_space(); + + pl->cancel_remote_recovery_reservation(); + utime_t dur = ceph_clock_now() - enter_time; + pl->get_peering_perf().tinc(rs_reprecovering_latency, dur); +} + +/*------Activating--------*/ +PeeringState::Activating::Activating(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Started/Primary/Active/Activating") +{ + context< PeeringMachine >().log_enter(state_name); +} + +void PeeringState::Activating::exit() +{ + context< PeeringMachine >().log_exit(state_name, enter_time); + DECLARE_LOCALS; + utime_t dur = ceph_clock_now() - enter_time; + pl->get_peering_perf().tinc(rs_activating_latency, dur); +} + +PeeringState::WaitLocalRecoveryReserved::WaitLocalRecoveryReserved(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Started/Primary/Active/WaitLocalRecoveryReserved") +{ + context< PeeringMachine >().log_enter(state_name); + DECLARE_LOCALS; + + // Make sure all nodes that part of the recovery aren't full + if (!ps->cct->_conf->osd_debug_skip_full_check_in_recovery && + ps->get_osdmap()->check_full(ps->acting_recovery_backfill)) { + post_event(RecoveryTooFull()); + return; + } + + ps->state_clear(PG_STATE_RECOVERY_TOOFULL); + ps->state_set(PG_STATE_RECOVERY_WAIT); + pl->request_local_background_io_reservation( + ps->get_recovery_priority(), + std::make_unique<PGPeeringEvent>( + ps->get_osdmap_epoch(), + ps->get_osdmap_epoch(), + LocalRecoveryReserved()), + std::make_unique<PGPeeringEvent>( + ps->get_osdmap_epoch(), + ps->get_osdmap_epoch(), + DeferRecovery(0.0))); + pl->publish_stats_to_osd(); +} + +boost::statechart::result +PeeringState::WaitLocalRecoveryReserved::react(const RecoveryTooFull &evt) +{ + DECLARE_LOCALS; + ps->state_set(PG_STATE_RECOVERY_TOOFULL); + pl->schedule_event_after( + std::make_shared<PGPeeringEvent>( + ps->get_osdmap_epoch(), + ps->get_osdmap_epoch(), + DoRecovery()), + ps->cct->_conf->osd_recovery_retry_interval); + return transit<NotRecovering>(); +} + +void PeeringState::WaitLocalRecoveryReserved::exit() +{ + context< PeeringMachine >().log_exit(state_name, enter_time); + DECLARE_LOCALS; + utime_t dur = ceph_clock_now() - enter_time; + pl->get_peering_perf().tinc(rs_waitlocalrecoveryreserved_latency, dur); +} + +PeeringState::WaitRemoteRecoveryReserved::WaitRemoteRecoveryReserved(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Started/Primary/Active/WaitRemoteRecoveryReserved"), + remote_recovery_reservation_it(context< Active >().remote_shards_to_reserve_recovery.begin()) +{ + context< PeeringMachine >().log_enter(state_name); + post_event(RemoteRecoveryReserved()); +} + +boost::statechart::result +PeeringState::WaitRemoteRecoveryReserved::react(const RemoteRecoveryReserved &evt) { + DECLARE_LOCALS; + + if (remote_recovery_reservation_it != + context< Active >().remote_shards_to_reserve_recovery.end()) { + ceph_assert(*remote_recovery_reservation_it != ps->pg_whoami); + pl->send_cluster_message( + remote_recovery_reservation_it->osd, + TOPNSPC::make_message<MRecoveryReserve>( + MRecoveryReserve::REQUEST, + spg_t(context< PeeringMachine >().spgid.pgid, + remote_recovery_reservation_it->shard), + ps->get_osdmap_epoch(), + ps->get_recovery_priority()), + ps->get_osdmap_epoch()); + ++remote_recovery_reservation_it; + } else { + post_event(AllRemotesReserved()); + } + return discard_event(); +} + +void PeeringState::WaitRemoteRecoveryReserved::exit() +{ + 
context< PeeringMachine >().log_exit(state_name, enter_time); + DECLARE_LOCALS; + utime_t dur = ceph_clock_now() - enter_time; + pl->get_peering_perf().tinc(rs_waitremoterecoveryreserved_latency, dur); +} + +PeeringState::Recovering::Recovering(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Started/Primary/Active/Recovering") +{ + context< PeeringMachine >().log_enter(state_name); + + DECLARE_LOCALS; + ps->state_clear(PG_STATE_RECOVERY_WAIT); + ps->state_clear(PG_STATE_RECOVERY_TOOFULL); + ps->state_set(PG_STATE_RECOVERING); + pl->on_recovery_reserved(); + ceph_assert(!ps->state_test(PG_STATE_ACTIVATING)); + pl->publish_stats_to_osd(); +} + +void PeeringState::Recovering::release_reservations(bool cancel) +{ + DECLARE_LOCALS; + ceph_assert(cancel || !ps->pg_log.get_missing().have_missing()); + + // release remote reservations + for (auto i = context< Active >().remote_shards_to_reserve_recovery.begin(); + i != context< Active >().remote_shards_to_reserve_recovery.end(); + ++i) { + if (*i == ps->pg_whoami) // skip myself + continue; + pl->send_cluster_message( + i->osd, + TOPNSPC::make_message<MRecoveryReserve>( + MRecoveryReserve::RELEASE, + spg_t(ps->info.pgid.pgid, i->shard), + ps->get_osdmap_epoch()), + ps->get_osdmap_epoch()); + } +} + +boost::statechart::result +PeeringState::Recovering::react(const AllReplicasRecovered &evt) +{ + DECLARE_LOCALS; + ps->state_clear(PG_STATE_FORCED_RECOVERY); + release_reservations(); + pl->cancel_local_background_io_reservation(); + return transit<Recovered>(); +} + +boost::statechart::result +PeeringState::Recovering::react(const RequestBackfill &evt) +{ + DECLARE_LOCALS; + + release_reservations(); + + ps->state_clear(PG_STATE_FORCED_RECOVERY); + pl->cancel_local_background_io_reservation(); + pl->publish_stats_to_osd(); + // transit any async_recovery_targets back into acting + // so pg won't have to stay undersized for long + // as backfill might take a long time to complete.. + if (!ps->async_recovery_targets.empty()) { + pg_shard_t auth_log_shard; + bool history_les_bound = false; + // FIXME: Uh-oh we have to check this return value; choose_acting can fail! + ps->choose_acting(auth_log_shard, true, &history_les_bound); + } + return transit<WaitLocalBackfillReserved>(); +} + +boost::statechart::result +PeeringState::Recovering::react(const DeferRecovery &evt) +{ + DECLARE_LOCALS; + if (!ps->state_test(PG_STATE_RECOVERING)) { + // we may have finished recovery and have an AllReplicasRecovered + // event queued to move us to the next state. 
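+ // so it is safe to simply drop this stale DeferRecovery event.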
+ psdout(10) << "got defer recovery but not recovering" << dendl; + return discard_event(); + } + psdout(10) << "defer recovery, retry delay " << evt.delay << dendl; + ps->state_set(PG_STATE_RECOVERY_WAIT); + pl->cancel_local_background_io_reservation(); + release_reservations(true); + pl->schedule_event_after( + std::make_shared<PGPeeringEvent>( + ps->get_osdmap_epoch(), + ps->get_osdmap_epoch(), + DoRecovery()), + evt.delay); + return transit<NotRecovering>(); +} + +boost::statechart::result +PeeringState::Recovering::react(const UnfoundRecovery &evt) +{ + DECLARE_LOCALS; + psdout(10) << "recovery has unfound, can't continue" << dendl; + ps->state_set(PG_STATE_RECOVERY_UNFOUND); + pl->cancel_local_background_io_reservation(); + release_reservations(true); + return transit<NotRecovering>(); +} + +void PeeringState::Recovering::exit() +{ + context< PeeringMachine >().log_exit(state_name, enter_time); + + DECLARE_LOCALS; + utime_t dur = ceph_clock_now() - enter_time; + ps->state_clear(PG_STATE_RECOVERING); + pl->get_peering_perf().tinc(rs_recovering_latency, dur); +} + +PeeringState::Recovered::Recovered(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Started/Primary/Active/Recovered") +{ + pg_shard_t auth_log_shard; + + context< PeeringMachine >().log_enter(state_name); + + DECLARE_LOCALS; + + ceph_assert(!ps->needs_recovery()); + + // if we finished backfill, all acting are active; recheck if + // DEGRADED | UNDERSIZED is appropriate. + ceph_assert(!ps->acting_recovery_backfill.empty()); + if (ps->get_osdmap()->get_pg_size(context< PeeringMachine >().spgid.pgid) <= + ps->acting_recovery_backfill.size()) { + ps->state_clear(PG_STATE_FORCED_BACKFILL | PG_STATE_FORCED_RECOVERY); + pl->publish_stats_to_osd(); + } + + // adjust acting set? (e.g. because backfill completed...) + bool history_les_bound = false; + if (ps->acting != ps->up && !ps->choose_acting(auth_log_shard, + true, &history_les_bound)) { + ceph_assert(ps->want_acting.size()); + } else if (!ps->async_recovery_targets.empty()) { + // FIXME: Uh-oh we have to check this return value; choose_acting can fail! 
+ ps->choose_acting(auth_log_shard, true, &history_les_bound); + } + + if (context< Active >().all_replicas_activated && + ps->async_recovery_targets.empty()) + post_event(GoClean()); +} + +void PeeringState::Recovered::exit() +{ + context< PeeringMachine >().log_exit(state_name, enter_time); + DECLARE_LOCALS; + + utime_t dur = ceph_clock_now() - enter_time; + pl->get_peering_perf().tinc(rs_recovered_latency, dur); +} + +PeeringState::Clean::Clean(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Started/Primary/Active/Clean") +{ + context< PeeringMachine >().log_enter(state_name); + + DECLARE_LOCALS; + + if (ps->info.last_complete != ps->info.last_update) { + ceph_abort(); + } + + + ps->try_mark_clean(); + + context< PeeringMachine >().get_cur_transaction().register_on_commit( + pl->on_clean()); +} + +void PeeringState::Clean::exit() +{ + context< PeeringMachine >().log_exit(state_name, enter_time); + + DECLARE_LOCALS; + ps->state_clear(PG_STATE_CLEAN); + utime_t dur = ceph_clock_now() - enter_time; + pl->get_peering_perf().tinc(rs_clean_latency, dur); +} + +template <typename T> +set<pg_shard_t> unique_osd_shard_set(const pg_shard_t & skip, const T &in) +{ + set<int> osds_found; + set<pg_shard_t> out; + for (auto i = in.begin(); i != in.end(); ++i) { + if (*i != skip && !osds_found.count(i->osd)) { + osds_found.insert(i->osd); + out.insert(*i); + } + } + return out; +} + +/*---------Active---------*/ +PeeringState::Active::Active(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Started/Primary/Active"), + remote_shards_to_reserve_recovery( + unique_osd_shard_set( + context< PeeringMachine >().state->pg_whoami, + context< PeeringMachine >().state->acting_recovery_backfill)), + remote_shards_to_reserve_backfill( + unique_osd_shard_set( + context< PeeringMachine >().state->pg_whoami, + context< PeeringMachine >().state->backfill_targets)), + all_replicas_activated(false) +{ + context< PeeringMachine >().log_enter(state_name); + + + DECLARE_LOCALS; + + ceph_assert(!ps->backfill_reserved); + ceph_assert(ps->is_primary()); + psdout(10) << "In Active, about to call activate" << dendl; + ps->start_flush(context< PeeringMachine >().get_cur_transaction()); + ps->activate(context< PeeringMachine >().get_cur_transaction(), + ps->get_osdmap_epoch(), + context< PeeringMachine >().get_recovery_ctx()); + + // everyone has to commit/ack before we are truly active + ps->blocked_by.clear(); + for (auto p = ps->acting_recovery_backfill.begin(); + p != ps->acting_recovery_backfill.end(); + ++p) { + if (p->shard != ps->pg_whoami.shard) { + ps->blocked_by.insert(p->shard); + } + } + pl->publish_stats_to_osd(); + psdout(10) << "Activate Finished" << dendl; +} + +boost::statechart::result PeeringState::Active::react(const AdvMap& advmap) +{ + DECLARE_LOCALS; + + if (ps->should_restart_peering( + advmap.up_primary, + advmap.acting_primary, + advmap.newup, + advmap.newacting, + advmap.lastmap, + advmap.osdmap)) { + psdout(10) << "Active advmap interval change, fast return" << dendl; + return forward_event(); + } + psdout(10) << "Active advmap" << dendl; + bool need_publish = false; + + pl->on_active_advmap(advmap.osdmap); + if (ps->dirty_big_info) { + // share updated purged_snaps to mgr/mon so that we (a) stop reporting + // purged snaps and (b) perhaps share more snaps that we have purged + // but didn't fit in pg_stat_t. 
+ need_publish = true; + ps->share_pg_info(); + } + + bool need_acting_change = false; + for (size_t i = 0; i < ps->want_acting.size(); i++) { + int osd = ps->want_acting[i]; + if (!advmap.osdmap->is_up(osd)) { + pg_shard_t osd_with_shard(osd, shard_id_t(i)); + if (!ps->is_acting(osd_with_shard) && !ps->is_up(osd_with_shard)) { + psdout(10) << "Active stray osd." << osd << " in want_acting is down" + << dendl; + need_acting_change = true; + } + } + } + if (need_acting_change) { + psdout(10) << "Active need acting change, call choose_acting again" + << dendl; + // possibly because we re-add some strays into the acting set and + // some of them then go down in a subsequent map before we could see + // the map changing the pg temp. + // call choose_acting again to clear them out. + // note that we leave restrict_to_up_acting to false in order to + // not overkill any chosen stray that is still alive. + pg_shard_t auth_log_shard; + bool history_les_bound = false; + ps->remove_down_peer_info(advmap.osdmap); + ps->choose_acting(auth_log_shard, false, &history_les_bound, true); + } + + /* Check for changes in pool size (if the acting set changed as a result, + * this does not matter) */ + if (advmap.lastmap->get_pg_size(ps->info.pgid.pgid) != + ps->get_osdmap()->get_pg_size(ps->info.pgid.pgid)) { + if (ps->get_osdmap()->get_pg_size(ps->info.pgid.pgid) <= + ps->actingset.size()) { + ps->state_clear(PG_STATE_UNDERSIZED); + } else { + ps->state_set(PG_STATE_UNDERSIZED); + } + // degraded changes will be detected by call from publish_stats_to_osd() + need_publish = true; + } + + // if we haven't reported our PG stats in a long time, do so now. + if (ps->info.stats.reported_epoch + ps->cct->_conf->osd_pg_stat_report_interval_max < advmap.osdmap->get_epoch()) { + psdout(20) << "reporting stats to osd after " << (advmap.osdmap->get_epoch() - ps->info.stats.reported_epoch) + << " epochs" << dendl; + need_publish = true; + } + + if (need_publish) + pl->publish_stats_to_osd(); + + if (ps->check_prior_readable_down_osds(advmap.osdmap)) { + pl->recheck_readable(); + } + + return forward_event(); +} + +boost::statechart::result PeeringState::Active::react(const ActMap&) +{ + DECLARE_LOCALS; + psdout(10) << "Active: handling ActMap" << dendl; + ceph_assert(ps->is_primary()); + + pl->on_active_actmap(); + + if (ps->have_unfound()) { + // object may have become unfound + ps->discover_all_missing(context<PeeringMachine>().get_recovery_ctx().msgs); + } + + uint64_t unfound = ps->missing_loc.num_unfound(); + if (unfound > 0 && + ps->all_unfound_are_queried_or_lost(ps->get_osdmap())) { + if (ps->cct->_conf->osd_auto_mark_unfound_lost) { + pl->get_clog_error() << context< PeeringMachine >().spgid.pgid << " has " << unfound + << " objects unfound and apparently lost, would automatically " + << "mark these objects lost but this feature is not yet implemented " + << "(osd_auto_mark_unfound_lost)"; + } else + pl->get_clog_error() << context< PeeringMachine >().spgid.pgid << " has " + << unfound << " objects unfound and apparently lost"; + } + + return forward_event(); +} + +boost::statechart::result PeeringState::Active::react(const MNotifyRec& notevt) +{ + + DECLARE_LOCALS; + ceph_assert(ps->is_primary()); + if (ps->peer_info.count(notevt.from)) { + psdout(10) << "Active: got notify from " << notevt.from + << ", already have info from that osd, ignoring" + << dendl; + } else if (ps->peer_purged.count(notevt.from)) { + psdout(10) << "Active: got notify from " << notevt.from + << ", already purged that peer, ignoring" + 
<< dendl; + } else { + psdout(10) << "Active: got notify from " << notevt.from + << ", calling proc_replica_info and discover_all_missing" + << dendl; + ps->proc_replica_info( + notevt.from, notevt.notify.info, notevt.notify.epoch_sent); + if (ps->have_unfound() || (ps->is_degraded() && ps->might_have_unfound.count(notevt.from))) { + ps->discover_all_missing( + context<PeeringMachine>().get_recovery_ctx().msgs); + } + // check if it is a previous down acting member that's coming back. + // if so, request pg_temp change to trigger a new interval transition + pg_shard_t auth_log_shard; + bool history_les_bound = false; + // FIXME: Uh-oh we have to check this return value; choose_acting can fail! + ps->choose_acting(auth_log_shard, false, &history_les_bound, true); + if (!ps->want_acting.empty() && ps->want_acting != ps->acting) { + psdout(10) << "Active: got notify from previous acting member " + << notevt.from << ", requesting pg_temp change" + << dendl; + } + } + return discard_event(); +} + +boost::statechart::result PeeringState::Active::react(const MTrim& trim) +{ + DECLARE_LOCALS; + ceph_assert(ps->is_primary()); + + // peer is informing us of their last_complete_ondisk + ldout(ps->cct,10) << " replica osd." << trim.from << " lcod " << trim.trim_to << dendl; + ps->update_peer_last_complete_ondisk(pg_shard_t{trim.from, trim.shard}, + trim.trim_to); + // trim log when the pg is recovered + ps->calc_min_last_complete_ondisk(); + return discard_event(); +} + +boost::statechart::result PeeringState::Active::react(const MInfoRec& infoevt) +{ + DECLARE_LOCALS; + ceph_assert(ps->is_primary()); + + ceph_assert(!ps->acting_recovery_backfill.empty()); + if (infoevt.lease_ack) { + ps->proc_lease_ack(infoevt.from.osd, *infoevt.lease_ack); + } + // don't update history (yet) if we are active and primary; the replica + // may be telling us they have activated (and committed) but we can't + // share that until _everyone_ does the same. + if (ps->is_acting_recovery_backfill(infoevt.from) && + ps->peer_activated.count(infoevt.from) == 0) { + psdout(10) << " peer osd." << infoevt.from + << " activated and committed" << dendl; + ps->peer_activated.insert(infoevt.from); + ps->blocked_by.erase(infoevt.from.shard); + pl->publish_stats_to_osd(); + if (ps->peer_activated.size() == ps->acting_recovery_backfill.size()) { + all_activated_and_committed(); + } + } + return discard_event(); +} + +boost::statechart::result PeeringState::Active::react(const MLogRec& logevt) +{ + DECLARE_LOCALS; + psdout(10) << "searching osd." 
<< logevt.from + << " log for unfound items" << dendl; + ps->proc_replica_log( + logevt.msg->info, logevt.msg->log, std::move(logevt.msg->missing), logevt.from); + bool got_missing = ps->search_for_missing( + ps->peer_info[logevt.from], + ps->peer_missing[logevt.from], + logevt.from, + context< PeeringMachine >().get_recovery_ctx()); + // If there are missing AND we are "fully" active then start recovery now + if (got_missing && ps->state_test(PG_STATE_ACTIVE)) { + post_event(DoRecovery()); + } + return discard_event(); +} + +boost::statechart::result PeeringState::Active::react(const QueryState& q) +{ + DECLARE_LOCALS; + + q.f->open_object_section("state"); + q.f->dump_string("name", state_name); + q.f->dump_stream("enter_time") << enter_time; + + { + q.f->open_array_section("might_have_unfound"); + for (auto p = ps->might_have_unfound.begin(); + p != ps->might_have_unfound.end(); + ++p) { + q.f->open_object_section("osd"); + q.f->dump_stream("osd") << *p; + if (ps->peer_missing.count(*p)) { + q.f->dump_string("status", "already probed"); + } else if (ps->peer_missing_requested.count(*p)) { + q.f->dump_string("status", "querying"); + } else if (!ps->get_osdmap()->is_up(p->osd)) { + q.f->dump_string("status", "osd is down"); + } else { + q.f->dump_string("status", "not queried"); + } + q.f->close_section(); + } + q.f->close_section(); + } + { + q.f->open_object_section("recovery_progress"); + q.f->open_array_section("backfill_targets"); + for (auto p = ps->backfill_targets.begin(); + p != ps->backfill_targets.end(); ++p) + q.f->dump_stream("replica") << *p; + q.f->close_section(); + pl->dump_recovery_info(q.f); + q.f->close_section(); + } + + q.f->close_section(); + return forward_event(); +} + +boost::statechart::result PeeringState::Active::react(const QueryUnfound& q) +{ + DECLARE_LOCALS; + + ps->query_unfound(q.f, "Active"); + return discard_event(); +} + +boost::statechart::result PeeringState::Active::react( + const ActivateCommitted &evt) +{ + DECLARE_LOCALS; + ceph_assert(!ps->peer_activated.count(ps->pg_whoami)); + ps->peer_activated.insert(ps->pg_whoami); + psdout(10) << "_activate_committed " << evt.epoch + << " peer_activated now " << ps->peer_activated + << " last_interval_started " + << ps->info.history.last_interval_started + << " last_epoch_started " + << ps->info.history.last_epoch_started + << " same_interval_since " + << ps->info.history.same_interval_since + << dendl; + ceph_assert(!ps->acting_recovery_backfill.empty()); + if (ps->peer_activated.size() == ps->acting_recovery_backfill.size()) + all_activated_and_committed(); + return discard_event(); +} + +boost::statechart::result PeeringState::Active::react(const AllReplicasActivated &evt) +{ + + DECLARE_LOCALS; + pg_t pgid = context< PeeringMachine >().spgid.pgid; + + all_replicas_activated = true; + + ps->state_clear(PG_STATE_ACTIVATING); + ps->state_clear(PG_STATE_CREATING); + ps->state_clear(PG_STATE_PREMERGE); + + bool merge_target; + if (ps->pool.info.is_pending_merge(pgid, &merge_target)) { + ps->state_set(PG_STATE_PEERED); + ps->state_set(PG_STATE_PREMERGE); + + if (ps->actingset.size() != ps->get_osdmap()->get_pg_size(pgid)) { + if (merge_target) { + pg_t src = pgid; + src.set_ps(ps->pool.info.get_pg_num_pending()); + assert(src.get_parent() == pgid); + pl->set_not_ready_to_merge_target(pgid, src); + } else { + pl->set_not_ready_to_merge_source(pgid); + } + } + } else if (!ps->acting_set_writeable()) { + ps->state_set(PG_STATE_PEERED); + } else { + ps->state_set(PG_STATE_ACTIVE); + } + + auto mnow = 
pl->get_mnow(); + if (ps->prior_readable_until_ub > mnow) { + psdout(10) << " waiting for prior_readable_until_ub " + << ps->prior_readable_until_ub << " > mnow " << mnow << dendl; + ps->state_set(PG_STATE_WAIT); + pl->queue_check_readable( + ps->last_peering_reset, + ps->prior_readable_until_ub - mnow); + } else { + psdout(10) << " mnow " << mnow << " >= prior_readable_until_ub " + << ps->prior_readable_until_ub << dendl; + } + + if (ps->pool.info.has_flag(pg_pool_t::FLAG_CREATING)) { + pl->send_pg_created(pgid); + } + + psdout(1) << __func__ << " AllReplicasActivated Activating complete" << dendl; + + ps->info.history.last_epoch_started = ps->info.last_epoch_started; + ps->info.history.last_interval_started = ps->info.last_interval_started; + ps->dirty_info = true; + + ps->share_pg_info(); + pl->publish_stats_to_osd(); + + pl->on_activate_complete(); + + return discard_event(); +} + +boost::statechart::result PeeringState::Active::react(const RenewLease& rl) +{ + DECLARE_LOCALS; + ps->proc_renew_lease(); + return discard_event(); +} + +boost::statechart::result PeeringState::Active::react(const MLeaseAck& la) +{ + DECLARE_LOCALS; + ps->proc_lease_ack(la.from, la.lease_ack); + return discard_event(); +} + + +boost::statechart::result PeeringState::Active::react(const CheckReadable &evt) +{ + DECLARE_LOCALS; + pl->recheck_readable(); + return discard_event(); +} + +/* + * update info.history.last_epoch_started ONLY after we and all + * replicas have activated AND committed the activate transaction + * (i.e. the peering results are stable on disk). + */ +void PeeringState::Active::all_activated_and_committed() +{ + DECLARE_LOCALS; + psdout(10) << "all_activated_and_committed" << dendl; + ceph_assert(ps->is_primary()); + ceph_assert(ps->peer_activated.size() == ps->acting_recovery_backfill.size()); + ceph_assert(!ps->acting_recovery_backfill.empty()); + ceph_assert(ps->blocked_by.empty()); + + assert(HAVE_FEATURE(ps->upacting_features, SERVER_OCTOPUS)); + // this is overkill when the activation is quick, but when it is slow it + // is important, because the lease was renewed by the activate itself but we + // don't know how long ago that was, and simply scheduling now may leave + // a gap in lease coverage. keep it simple and aggressively renew. + ps->renew_lease(pl->get_mnow()); + ps->send_lease(); + ps->schedule_renew_lease(); + + // Degraded? 
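+  // recompute stats so the DEGRADED flag set just below reflects the state
+  // after activation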
+ ps->update_calc_stats(); + if (ps->info.stats.stats.sum.num_objects_degraded) { + ps->state_set(PG_STATE_DEGRADED); + } else { + ps->state_clear(PG_STATE_DEGRADED); + } + + post_event(PeeringState::AllReplicasActivated()); +} + + +void PeeringState::Active::exit() +{ + context< PeeringMachine >().log_exit(state_name, enter_time); + + + DECLARE_LOCALS; + pl->cancel_local_background_io_reservation(); + + ps->blocked_by.clear(); + ps->backfill_reserved = false; + ps->state_clear(PG_STATE_ACTIVATING); + ps->state_clear(PG_STATE_DEGRADED); + ps->state_clear(PG_STATE_UNDERSIZED); + ps->state_clear(PG_STATE_BACKFILL_TOOFULL); + ps->state_clear(PG_STATE_BACKFILL_WAIT); + ps->state_clear(PG_STATE_RECOVERY_WAIT); + ps->state_clear(PG_STATE_RECOVERY_TOOFULL); + utime_t dur = ceph_clock_now() - enter_time; + pl->get_peering_perf().tinc(rs_active_latency, dur); + pl->on_active_exit(); +} + +/*------ReplicaActive-----*/ +PeeringState::ReplicaActive::ReplicaActive(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Started/ReplicaActive") +{ + context< PeeringMachine >().log_enter(state_name); + + DECLARE_LOCALS; + ps->start_flush(context< PeeringMachine >().get_cur_transaction()); +} + + +boost::statechart::result PeeringState::ReplicaActive::react( + const Activate& actevt) { + DECLARE_LOCALS; + psdout(10) << "In ReplicaActive, about to call activate" << dendl; + ps->activate( + context< PeeringMachine >().get_cur_transaction(), + actevt.activation_epoch, + context< PeeringMachine >().get_recovery_ctx()); + psdout(10) << "Activate Finished" << dendl; + return discard_event(); +} + +boost::statechart::result PeeringState::ReplicaActive::react( + const ActivateCommitted &evt) +{ + DECLARE_LOCALS; + psdout(10) << __func__ << " " << evt.epoch << " telling primary" << dendl; + + auto &rctx = context<PeeringMachine>().get_recovery_ctx(); + auto epoch = ps->get_osdmap_epoch(); + pg_info_t i = ps->info; + i.history.last_epoch_started = evt.activation_epoch; + i.history.last_interval_started = i.history.same_interval_since; + rctx.send_info( + ps->get_primary().osd, + spg_t(ps->info.pgid.pgid, ps->get_primary().shard), + epoch, + epoch, + i, + {}, /* lease */ + ps->get_lease_ack()); + + if (ps->acting_set_writeable()) { + ps->state_set(PG_STATE_ACTIVE); + } else { + ps->state_set(PG_STATE_PEERED); + } + pl->on_activate_committed(); + + return discard_event(); +} + +boost::statechart::result PeeringState::ReplicaActive::react(const MLease& l) +{ + DECLARE_LOCALS; + spg_t spgid = context< PeeringMachine >().spgid; + epoch_t epoch = pl->get_osdmap_epoch(); + + ps->proc_lease(l.lease); + pl->send_cluster_message( + ps->get_primary().osd, + TOPNSPC::make_message<MOSDPGLeaseAck>(epoch, + spg_t(spgid.pgid, ps->get_primary().shard), + ps->get_lease_ack()), + epoch); + return discard_event(); +} + +boost::statechart::result PeeringState::ReplicaActive::react(const MInfoRec& infoevt) +{ + DECLARE_LOCALS; + ps->proc_primary_info(context<PeeringMachine>().get_cur_transaction(), + infoevt.info); + return discard_event(); +} + +boost::statechart::result PeeringState::ReplicaActive::react(const MLogRec& logevt) +{ + DECLARE_LOCALS; + psdout(10) << "received log from " << logevt.from << dendl; + ObjectStore::Transaction &t = context<PeeringMachine>().get_cur_transaction(); + ps->merge_log(t, logevt.msg->info, std::move(logevt.msg->log), logevt.from); + ceph_assert(ps->pg_log.get_head() == ps->info.last_update); + if (logevt.msg->lease) { + ps->proc_lease(*logevt.msg->lease); + } + + 
return discard_event(); +} + +boost::statechart::result PeeringState::ReplicaActive::react(const MTrim& trim) +{ + DECLARE_LOCALS; + // primary is instructing us to trim + ps->pg_log.trim(trim.trim_to, ps->info); + ps->dirty_info = true; + return discard_event(); +} + +boost::statechart::result PeeringState::ReplicaActive::react(const ActMap&) +{ + DECLARE_LOCALS; + if (ps->should_send_notify() && ps->get_primary().osd >= 0) { + ps->info.history.refresh_prior_readable_until_ub( + pl->get_mnow(), ps->prior_readable_until_ub); + context< PeeringMachine >().send_notify( + ps->get_primary().osd, + pg_notify_t( + ps->get_primary().shard, ps->pg_whoami.shard, + ps->get_osdmap_epoch(), + ps->get_osdmap_epoch(), + ps->info, + ps->past_intervals)); + } + return discard_event(); +} + +boost::statechart::result PeeringState::ReplicaActive::react( + const MQuery& query) +{ + DECLARE_LOCALS; + ps->fulfill_query(query, context<PeeringMachine>().get_recovery_ctx()); + return discard_event(); +} + +boost::statechart::result PeeringState::ReplicaActive::react(const QueryState& q) +{ + q.f->open_object_section("state"); + q.f->dump_string("name", state_name); + q.f->dump_stream("enter_time") << enter_time; + q.f->close_section(); + return forward_event(); +} + +boost::statechart::result PeeringState::ReplicaActive::react(const QueryUnfound& q) +{ + q.f->dump_string("state", "ReplicaActive"); + q.f->dump_bool("available_might_have_unfound", false); + return discard_event(); +} + +void PeeringState::ReplicaActive::exit() +{ + context< PeeringMachine >().log_exit(state_name, enter_time); + DECLARE_LOCALS; + pl->unreserve_recovery_space(); + + pl->cancel_remote_recovery_reservation(); + utime_t dur = ceph_clock_now() - enter_time; + pl->get_peering_perf().tinc(rs_replicaactive_latency, dur); + + ps->min_last_complete_ondisk = eversion_t(); +} + +/*-------Stray---*/ +PeeringState::Stray::Stray(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Started/Stray") +{ + context< PeeringMachine >().log_enter(state_name); + + + DECLARE_LOCALS; + ceph_assert(!ps->is_peered()); + ceph_assert(!ps->is_peering()); + ceph_assert(!ps->is_primary()); + + if (!ps->get_osdmap()->have_pg_pool(ps->info.pgid.pgid.pool())) { + ldout(ps->cct,10) << __func__ << " pool is deleted" << dendl; + post_event(DeleteStart()); + } else { + ps->start_flush(context< PeeringMachine >().get_cur_transaction()); + } +} + +boost::statechart::result PeeringState::Stray::react(const MLogRec& logevt) +{ + DECLARE_LOCALS; + MOSDPGLog *msg = logevt.msg.get(); + psdout(10) << "got info+log from osd." << logevt.from << " " << msg->info << " " << msg->log << dendl; + + ObjectStore::Transaction &t = context<PeeringMachine>().get_cur_transaction(); + if (msg->info.last_backfill == hobject_t()) { + // restart backfill + ps->info = msg->info; + pl->on_info_history_change(); + ps->dirty_info = true; + ps->dirty_big_info = true; // maybe. 
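+    // an empty last_backfill means the primary wants a full backfill: discard
+    // our divergent log entries via the rollbacker and claim the primary's log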
+ + PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(t)}; + ps->pg_log.reset_backfill_claim_log(msg->log, rollbacker.get()); + + ps->pg_log.reset_backfill(); + } else { + ps->merge_log(t, msg->info, std::move(msg->log), logevt.from); + } + if (logevt.msg->lease) { + ps->proc_lease(*logevt.msg->lease); + } + + ceph_assert(ps->pg_log.get_head() == ps->info.last_update); + + post_event(Activate(logevt.msg->info.last_epoch_started)); + return transit<ReplicaActive>(); +} + +boost::statechart::result PeeringState::Stray::react(const MInfoRec& infoevt) +{ + DECLARE_LOCALS; + psdout(10) << "got info from osd." << infoevt.from << " " << infoevt.info << dendl; + + if (ps->info.last_update > infoevt.info.last_update) { + // rewind divergent log entries + ObjectStore::Transaction &t = context<PeeringMachine>().get_cur_transaction(); + ps->rewind_divergent_log(t, infoevt.info.last_update); + ps->info.stats = infoevt.info.stats; + ps->info.hit_set = infoevt.info.hit_set; + } + + if (infoevt.lease) { + ps->proc_lease(*infoevt.lease); + } + + ceph_assert(infoevt.info.last_update == ps->info.last_update); + ceph_assert(ps->pg_log.get_head() == ps->info.last_update); + + post_event(Activate(infoevt.info.last_epoch_started)); + return transit<ReplicaActive>(); +} + +boost::statechart::result PeeringState::Stray::react(const MQuery& query) +{ + DECLARE_LOCALS; + ps->fulfill_query(query, context<PeeringMachine>().get_recovery_ctx()); + return discard_event(); +} + +boost::statechart::result PeeringState::Stray::react(const ActMap&) +{ + DECLARE_LOCALS; + if (ps->should_send_notify() && ps->get_primary().osd >= 0) { + ps->info.history.refresh_prior_readable_until_ub( + pl->get_mnow(), ps->prior_readable_until_ub); + context< PeeringMachine >().send_notify( + ps->get_primary().osd, + pg_notify_t( + ps->get_primary().shard, ps->pg_whoami.shard, + ps->get_osdmap_epoch(), + ps->get_osdmap_epoch(), + ps->info, + ps->past_intervals)); + } + return discard_event(); +} + +void PeeringState::Stray::exit() +{ + context< PeeringMachine >().log_exit(state_name, enter_time); + DECLARE_LOCALS; + utime_t dur = ceph_clock_now() - enter_time; + pl->get_peering_perf().tinc(rs_stray_latency, dur); +} + + +/*--------ToDelete----------*/ +PeeringState::ToDelete::ToDelete(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Started/ToDelete") +{ + context< PeeringMachine >().log_enter(state_name); + DECLARE_LOCALS; + pl->get_perf_logger().inc(l_osd_pg_removing); +} + +void PeeringState::ToDelete::exit() +{ + context< PeeringMachine >().log_exit(state_name, enter_time); + DECLARE_LOCALS; + // note: on a successful removal, this path doesn't execute. see + // do_delete_work(). 
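+  // reaching this exit therefore means the removal was interrupted (e.g. a map
+  // change revived the PG); undo the counter and return any reservation still held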
+ pl->get_perf_logger().dec(l_osd_pg_removing); + + pl->cancel_local_background_io_reservation(); +} + +/*----WaitDeleteReserved----*/ +PeeringState::WaitDeleteReserved::WaitDeleteReserved(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, + "Started/ToDelete/WaitDeleteReseved") +{ + context< PeeringMachine >().log_enter(state_name); + DECLARE_LOCALS; + context< ToDelete >().priority = ps->get_delete_priority(); + + pl->cancel_local_background_io_reservation(); + pl->request_local_background_io_reservation( + context<ToDelete>().priority, + std::make_unique<PGPeeringEvent>( + ps->get_osdmap_epoch(), + ps->get_osdmap_epoch(), + DeleteReserved()), + std::make_unique<PGPeeringEvent>( + ps->get_osdmap_epoch(), + ps->get_osdmap_epoch(), + DeleteInterrupted())); +} + +boost::statechart::result PeeringState::ToDelete::react( + const ActMap& evt) +{ + DECLARE_LOCALS; + if (ps->get_delete_priority() != priority) { + psdout(10) << __func__ << " delete priority changed, resetting" + << dendl; + return transit<ToDelete>(); + } + return discard_event(); +} + +void PeeringState::WaitDeleteReserved::exit() +{ + context< PeeringMachine >().log_exit(state_name, enter_time); +} + +/*----Deleting-----*/ +PeeringState::Deleting::Deleting(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Started/ToDelete/Deleting") +{ + context< PeeringMachine >().log_enter(state_name); + + DECLARE_LOCALS; + ps->deleting = true; + ObjectStore::Transaction &t = context<PeeringMachine>().get_cur_transaction(); + + // clear log + PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(t)}; + ps->pg_log.roll_forward(rollbacker.get()); + + // adjust info to backfill + ps->info.set_last_backfill(hobject_t()); + ps->pg_log.reset_backfill(); + ps->dirty_info = true; + + pl->on_removal(t); +} + +boost::statechart::result PeeringState::Deleting::react( + const DeleteSome& evt) +{ + DECLARE_LOCALS; + std::pair<ghobject_t, bool> p; + p = pl->do_delete_work(context<PeeringMachine>().get_cur_transaction(), + next); + next = p.first; + return p.second ? 
discard_event() : terminate(); +} + +void PeeringState::Deleting::exit() +{ + context< PeeringMachine >().log_exit(state_name, enter_time); + DECLARE_LOCALS; + ps->deleting = false; + pl->cancel_local_background_io_reservation(); +} + +/*--------GetInfo---------*/ +PeeringState::GetInfo::GetInfo(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Started/Primary/Peering/GetInfo") +{ + context< PeeringMachine >().log_enter(state_name); + + + DECLARE_LOCALS; + ps->check_past_interval_bounds(); + ps->log_weirdness(); + PastIntervals::PriorSet &prior_set = context< Peering >().prior_set; + + ceph_assert(ps->blocked_by.empty()); + + prior_set = ps->build_prior(); + ps->prior_readable_down_osds = prior_set.down; + + if (ps->prior_readable_down_osds.empty()) { + psdout(10) << " no prior_set down osds, will clear prior_readable_until_ub before activating" + << dendl; + } + + ps->reset_min_peer_features(); + get_infos(); + if (prior_set.pg_down) { + post_event(IsDown()); + } else if (peer_info_requested.empty()) { + post_event(GotInfo()); + } +} + +void PeeringState::GetInfo::get_infos() +{ + DECLARE_LOCALS; + PastIntervals::PriorSet &prior_set = context< Peering >().prior_set; + + ps->blocked_by.clear(); + for (auto it = prior_set.probe.begin(); it != prior_set.probe.end(); ++it) { + pg_shard_t peer = *it; + if (peer == ps->pg_whoami) { + continue; + } + if (ps->peer_info.count(peer)) { + psdout(10) << " have osd." << peer << " info " << ps->peer_info[peer] << dendl; + continue; + } + if (peer_info_requested.count(peer)) { + psdout(10) << " already requested info from osd." << peer << dendl; + ps->blocked_by.insert(peer.osd); + } else if (!ps->get_osdmap()->is_up(peer.osd)) { + psdout(10) << " not querying info from down osd." << peer << dendl; + } else { + psdout(10) << " querying info from osd." << peer << dendl; + context< PeeringMachine >().send_query( + peer.osd, + pg_query_t(pg_query_t::INFO, + it->shard, ps->pg_whoami.shard, + ps->info.history, + ps->get_osdmap_epoch())); + peer_info_requested.insert(peer); + ps->blocked_by.insert(peer.osd); + } + } + + ps->check_prior_readable_down_osds(ps->get_osdmap()); + + pl->publish_stats_to_osd(); +} + +boost::statechart::result PeeringState::GetInfo::react(const MNotifyRec& infoevt) +{ + + DECLARE_LOCALS; + + auto p = peer_info_requested.find(infoevt.from); + if (p != peer_info_requested.end()) { + peer_info_requested.erase(p); + ps->blocked_by.erase(infoevt.from.osd); + } + + epoch_t old_start = ps->info.history.last_epoch_started; + if (ps->proc_replica_info( + infoevt.from, infoevt.notify.info, infoevt.notify.epoch_sent)) { + // we got something new ... + PastIntervals::PriorSet &prior_set = context< Peering >().prior_set; + if (old_start < ps->info.history.last_epoch_started) { + psdout(10) << " last_epoch_started moved forward, rebuilding prior" << dendl; + prior_set = ps->build_prior(); + ps->prior_readable_down_osds = prior_set.down; + + // filter out any osds that got dropped from the probe set from + // peer_info_requested. this is less expensive than restarting + // peering (which would re-probe everyone). + auto p = peer_info_requested.begin(); + while (p != peer_info_requested.end()) { + if (prior_set.probe.count(*p) == 0) { + psdout(20) << " dropping osd." 
<< *p << " from info_requested, no longer in probe set" << dendl; + peer_info_requested.erase(p++); + } else { + ++p; + } + } + get_infos(); + } + psdout(20) << "Adding osd: " << infoevt.from.osd << " peer features: " + << hex << infoevt.features << dec << dendl; + ps->apply_peer_features(infoevt.features); + + // are we done getting everything? + if (peer_info_requested.empty() && !prior_set.pg_down) { + psdout(20) << "Common peer features: " << hex << ps->get_min_peer_features() << dec << dendl; + psdout(20) << "Common acting features: " << hex << ps->get_min_acting_features() << dec << dendl; + psdout(20) << "Common upacting features: " << hex << ps->get_min_upacting_features() << dec << dendl; + post_event(GotInfo()); + } + } + return discard_event(); +} + +boost::statechart::result PeeringState::GetInfo::react(const QueryState& q) +{ + DECLARE_LOCALS; + q.f->open_object_section("state"); + q.f->dump_string("name", state_name); + q.f->dump_stream("enter_time") << enter_time; + + q.f->open_array_section("requested_info_from"); + for (auto p = peer_info_requested.begin(); + p != peer_info_requested.end(); + ++p) { + q.f->open_object_section("osd"); + q.f->dump_stream("osd") << *p; + if (ps->peer_info.count(*p)) { + q.f->open_object_section("got_info"); + ps->peer_info[*p].dump(q.f); + q.f->close_section(); + } + q.f->close_section(); + } + q.f->close_section(); + + q.f->close_section(); + return forward_event(); +} + +boost::statechart::result PeeringState::GetInfo::react(const QueryUnfound& q) +{ + q.f->dump_string("state", "GetInfo"); + q.f->dump_bool("available_might_have_unfound", false); + return discard_event(); +} + +void PeeringState::GetInfo::exit() +{ + context< PeeringMachine >().log_exit(state_name, enter_time); + + DECLARE_LOCALS; + utime_t dur = ceph_clock_now() - enter_time; + pl->get_peering_perf().tinc(rs_getinfo_latency, dur); + ps->blocked_by.clear(); +} + +/*------GetLog------------*/ +PeeringState::GetLog::GetLog(my_context ctx) + : my_base(ctx), + NamedState( + context< PeeringMachine >().state_history, + "Started/Primary/Peering/GetLog"), + msg(0) +{ + context< PeeringMachine >().log_enter(state_name); + + DECLARE_LOCALS; + + ps->log_weirdness(); + + // adjust acting? + if (!ps->choose_acting(auth_log_shard, false, + &context< Peering >().history_les_bound)) { + if (!ps->want_acting.empty()) { + post_event(NeedActingChange()); + } else { + post_event(IsIncomplete()); + } + return; + } + + // am i the best? + if (auth_log_shard == ps->pg_whoami) { + post_event(GotLog()); + return; + } + + const pg_info_t& best = ps->peer_info[auth_log_shard]; + + // am i broken? + if (ps->info.last_update < best.log_tail) { + psdout(10) << " not contiguous with osd." << auth_log_shard << ", down" << dendl; + post_event(IsIncomplete()); + return; + } + + // how much log to request? + eversion_t request_log_from = ps->info.last_update; + ceph_assert(!ps->acting_recovery_backfill.empty()); + for (auto p = ps->acting_recovery_backfill.begin(); + p != ps->acting_recovery_backfill.end(); + ++p) { + if (*p == ps->pg_whoami) continue; + pg_info_t& ri = ps->peer_info[*p]; + if (ri.last_update < ps->info.log_tail && ri.last_update >= best.log_tail && + ri.last_update < request_log_from) + request_log_from = ri.last_update; + } + + // how much? + psdout(10) << " requesting log from osd." 
<< auth_log_shard << dendl; + context<PeeringMachine>().send_query( + auth_log_shard.osd, + pg_query_t( + pg_query_t::LOG, + auth_log_shard.shard, ps->pg_whoami.shard, + request_log_from, ps->info.history, + ps->get_osdmap_epoch())); + + ceph_assert(ps->blocked_by.empty()); + ps->blocked_by.insert(auth_log_shard.osd); + pl->publish_stats_to_osd(); +} + +boost::statechart::result PeeringState::GetLog::react(const AdvMap& advmap) +{ + // make sure our log source didn't go down. we need to check + // explicitly because it may not be part of the prior set, which + // means the Peering state check won't catch it going down. + if (!advmap.osdmap->is_up(auth_log_shard.osd)) { + psdout(10) << "GetLog: auth_log_shard osd." + << auth_log_shard.osd << " went down" << dendl; + post_event(advmap); + return transit< Reset >(); + } + + // let the Peering state do its checks. + return forward_event(); +} + +boost::statechart::result PeeringState::GetLog::react(const MLogRec& logevt) +{ + ceph_assert(!msg); + if (logevt.from != auth_log_shard) { + psdout(10) << "GetLog: discarding log from " + << "non-auth_log_shard osd." << logevt.from << dendl; + return discard_event(); + } + psdout(10) << "GetLog: received master log from osd." + << logevt.from << dendl; + msg = logevt.msg; + post_event(GotLog()); + return discard_event(); +} + +boost::statechart::result PeeringState::GetLog::react(const GotLog&) +{ + + DECLARE_LOCALS; + psdout(10) << "leaving GetLog" << dendl; + if (msg) { + psdout(10) << "processing master log" << dendl; + ps->proc_master_log(context<PeeringMachine>().get_cur_transaction(), + msg->info, std::move(msg->log), std::move(msg->missing), + auth_log_shard); + } + ps->start_flush(context< PeeringMachine >().get_cur_transaction()); + return transit< GetMissing >(); +} + +boost::statechart::result PeeringState::GetLog::react(const QueryState& q) +{ + q.f->open_object_section("state"); + q.f->dump_string("name", state_name); + q.f->dump_stream("enter_time") << enter_time; + q.f->dump_stream("auth_log_shard") << auth_log_shard; + q.f->close_section(); + return forward_event(); +} + +boost::statechart::result PeeringState::GetLog::react(const QueryUnfound& q) +{ + q.f->dump_string("state", "GetLog"); + q.f->dump_bool("available_might_have_unfound", false); + return discard_event(); +} + +void PeeringState::GetLog::exit() +{ + context< PeeringMachine >().log_exit(state_name, enter_time); + + DECLARE_LOCALS; + utime_t dur = ceph_clock_now() - enter_time; + pl->get_peering_perf().tinc(rs_getlog_latency, dur); + ps->blocked_by.clear(); +} + +/*------WaitActingChange--------*/ +PeeringState::WaitActingChange::WaitActingChange(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Started/Primary/WaitActingChange") +{ + context< PeeringMachine >().log_enter(state_name); +} + +boost::statechart::result PeeringState::WaitActingChange::react(const AdvMap& advmap) +{ + DECLARE_LOCALS; + OSDMapRef osdmap = advmap.osdmap; + + psdout(10) << "verifying no want_acting " << ps->want_acting << " targets didn't go down" << dendl; + for (auto p = ps->want_acting.begin(); p != ps->want_acting.end(); ++p) { + if (!osdmap->is_up(*p)) { + psdout(10) << " want_acting target osd." 
<< *p << " went down, resetting" << dendl; + post_event(advmap); + return transit< Reset >(); + } + } + return forward_event(); +} + +boost::statechart::result PeeringState::WaitActingChange::react(const MLogRec& logevt) +{ + psdout(10) << "In WaitActingChange, ignoring MLocRec" << dendl; + return discard_event(); +} + +boost::statechart::result PeeringState::WaitActingChange::react(const MInfoRec& evt) +{ + psdout(10) << "In WaitActingChange, ignoring MInfoRec" << dendl; + return discard_event(); +} + +boost::statechart::result PeeringState::WaitActingChange::react(const MNotifyRec& evt) +{ + psdout(10) << "In WaitActingChange, ignoring MNotifyRec" << dendl; + return discard_event(); +} + +boost::statechart::result PeeringState::WaitActingChange::react(const QueryState& q) +{ + q.f->open_object_section("state"); + q.f->dump_string("name", state_name); + q.f->dump_stream("enter_time") << enter_time; + q.f->dump_string("comment", "waiting for pg acting set to change"); + q.f->close_section(); + return forward_event(); +} + +boost::statechart::result PeeringState::WaitActingChange::react(const QueryUnfound& q) +{ + q.f->dump_string("state", "WaitActingChange"); + q.f->dump_bool("available_might_have_unfound", false); + return discard_event(); +} + +void PeeringState::WaitActingChange::exit() +{ + context< PeeringMachine >().log_exit(state_name, enter_time); + DECLARE_LOCALS; + utime_t dur = ceph_clock_now() - enter_time; + pl->get_peering_perf().tinc(rs_waitactingchange_latency, dur); +} + +/*------Down--------*/ +PeeringState::Down::Down(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Started/Primary/Peering/Down") +{ + context< PeeringMachine >().log_enter(state_name); + DECLARE_LOCALS; + + ps->state_clear(PG_STATE_PEERING); + ps->state_set(PG_STATE_DOWN); + + auto &prior_set = context< Peering >().prior_set; + ceph_assert(ps->blocked_by.empty()); + ps->blocked_by.insert(prior_set.down.begin(), prior_set.down.end()); + pl->publish_stats_to_osd(); +} + +void PeeringState::Down::exit() +{ + context< PeeringMachine >().log_exit(state_name, enter_time); + + DECLARE_LOCALS; + + ps->state_clear(PG_STATE_DOWN); + utime_t dur = ceph_clock_now() - enter_time; + pl->get_peering_perf().tinc(rs_down_latency, dur); + + ps->blocked_by.clear(); +} + +boost::statechart::result PeeringState::Down::react(const QueryState& q) +{ + q.f->open_object_section("state"); + q.f->dump_string("name", state_name); + q.f->dump_stream("enter_time") << enter_time; + q.f->dump_string("comment", + "not enough up instances of this PG to go active"); + q.f->close_section(); + return forward_event(); +} + +boost::statechart::result PeeringState::Down::react(const QueryUnfound& q) +{ + q.f->dump_string("state", "Down"); + q.f->dump_bool("available_might_have_unfound", false); + return discard_event(); +} + +boost::statechart::result PeeringState::Down::react(const MNotifyRec& infoevt) +{ + DECLARE_LOCALS; + + ceph_assert(ps->is_primary()); + epoch_t old_start = ps->info.history.last_epoch_started; + if (!ps->peer_info.count(infoevt.from) && + ps->get_osdmap()->has_been_up_since(infoevt.from.osd, infoevt.notify.epoch_sent)) { + ps->update_history(infoevt.notify.info.history); + } + // if we got something new to make pg escape down state + if (ps->info.history.last_epoch_started > old_start) { + psdout(10) << " last_epoch_started moved forward, re-enter getinfo" << dendl; + ps->state_clear(PG_STATE_DOWN); + ps->state_set(PG_STATE_PEERING); + return transit< GetInfo >(); + } + + return 
discard_event(); +} + + +/*------Incomplete--------*/ +PeeringState::Incomplete::Incomplete(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Started/Primary/Peering/Incomplete") +{ + context< PeeringMachine >().log_enter(state_name); + DECLARE_LOCALS; + + ps->state_clear(PG_STATE_PEERING); + ps->state_set(PG_STATE_INCOMPLETE); + + PastIntervals::PriorSet &prior_set = context< Peering >().prior_set; + ceph_assert(ps->blocked_by.empty()); + ps->blocked_by.insert(prior_set.down.begin(), prior_set.down.end()); + pl->publish_stats_to_osd(); +} + +boost::statechart::result PeeringState::Incomplete::react(const AdvMap &advmap) { + DECLARE_LOCALS; + int64_t poolnum = ps->info.pgid.pool(); + + // Reset if min_size turn smaller than previous value, pg might now be able to go active + if (!advmap.osdmap->have_pg_pool(poolnum) || + advmap.lastmap->get_pools().find(poolnum)->second.min_size > + advmap.osdmap->get_pools().find(poolnum)->second.min_size) { + post_event(advmap); + return transit< Reset >(); + } + + return forward_event(); +} + +boost::statechart::result PeeringState::Incomplete::react(const MNotifyRec& notevt) { + DECLARE_LOCALS; + psdout(7) << "handle_pg_notify from osd." << notevt.from << dendl; + if (ps->proc_replica_info( + notevt.from, notevt.notify.info, notevt.notify.epoch_sent)) { + // We got something new, try again! + return transit< GetLog >(); + } else { + return discard_event(); + } +} + +boost::statechart::result PeeringState::Incomplete::react( + const QueryState& q) +{ + q.f->open_object_section("state"); + q.f->dump_string("name", state_name); + q.f->dump_stream("enter_time") << enter_time; + q.f->dump_string("comment", "not enough complete instances of this PG"); + q.f->close_section(); + return forward_event(); +} + +boost::statechart::result PeeringState::Incomplete::react(const QueryUnfound& q) +{ + q.f->dump_string("state", "Incomplete"); + q.f->dump_bool("available_might_have_unfound", false); + return discard_event(); +} + +void PeeringState::Incomplete::exit() +{ + context< PeeringMachine >().log_exit(state_name, enter_time); + + DECLARE_LOCALS; + + ps->state_clear(PG_STATE_INCOMPLETE); + utime_t dur = ceph_clock_now() - enter_time; + pl->get_peering_perf().tinc(rs_incomplete_latency, dur); + + ps->blocked_by.clear(); +} + +/*------GetMissing--------*/ +PeeringState::GetMissing::GetMissing(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Started/Primary/Peering/GetMissing") +{ + context< PeeringMachine >().log_enter(state_name); + + DECLARE_LOCALS; + ps->log_weirdness(); + ceph_assert(!ps->acting_recovery_backfill.empty()); + eversion_t since; + for (auto i = ps->acting_recovery_backfill.begin(); + i != ps->acting_recovery_backfill.end(); + ++i) { + if (*i == ps->get_primary()) continue; + const pg_info_t& pi = ps->peer_info[*i]; + // reset this so to make sure the pg_missing_t is initialized and + // has the correct semantics even if we don't need to get a + // missing set from a shard. This way later additions due to + // lost+unfound delete work properly. + ps->peer_missing[*i].may_include_deletes = !ps->perform_deletes_during_peering(); + + if (pi.is_empty()) + continue; // no pg data, nothing divergent + + if (pi.last_update < ps->pg_log.get_tail()) { + psdout(10) << " osd." << *i << " is not contiguous, will restart backfill" << dendl; + ps->peer_missing[*i].clear(); + continue; + } + if (pi.last_backfill == hobject_t()) { + psdout(10) << " osd." 
<< *i << " will fully backfill; can infer empty missing set" << dendl; + ps->peer_missing[*i].clear(); + continue; + } + + if (pi.last_update == pi.last_complete && // peer has no missing + pi.last_update == ps->info.last_update) { // peer is up to date + // replica has no missing and identical log as us. no need to + // pull anything. + // FIXME: we can do better here. if last_update==last_complete we + // can infer the rest! + psdout(10) << " osd." << *i << " has no missing, identical log" << dendl; + ps->peer_missing[*i].clear(); + continue; + } + + // We pull the log from the peer's last_epoch_started to ensure we + // get enough log to detect divergent updates. + since.epoch = pi.last_epoch_started; + ceph_assert(pi.last_update >= ps->info.log_tail); // or else choose_acting() did a bad thing + if (pi.log_tail <= since) { + psdout(10) << " requesting log+missing since " << since << " from osd." << *i << dendl; + context< PeeringMachine >().send_query( + i->osd, + pg_query_t( + pg_query_t::LOG, + i->shard, ps->pg_whoami.shard, + since, ps->info.history, + ps->get_osdmap_epoch())); + } else { + psdout(10) << " requesting fulllog+missing from osd." << *i + << " (want since " << since << " < log.tail " + << pi.log_tail << ")" << dendl; + context< PeeringMachine >().send_query( + i->osd, pg_query_t( + pg_query_t::FULLLOG, + i->shard, ps->pg_whoami.shard, + ps->info.history, ps->get_osdmap_epoch())); + } + peer_missing_requested.insert(*i); + ps->blocked_by.insert(i->osd); + } + + if (peer_missing_requested.empty()) { + if (ps->need_up_thru) { + psdout(10) << " still need up_thru update before going active" + << dendl; + post_event(NeedUpThru()); + return; + } + + // all good! + post_event(Activate(ps->get_osdmap_epoch())); + } else { + pl->publish_stats_to_osd(); + } +} + +boost::statechart::result PeeringState::GetMissing::react(const MLogRec& logevt) +{ + DECLARE_LOCALS; + + peer_missing_requested.erase(logevt.from); + ps->proc_replica_log(logevt.msg->info, + logevt.msg->log, + std::move(logevt.msg->missing), + logevt.from); + + if (peer_missing_requested.empty()) { + if (ps->need_up_thru) { + psdout(10) << " still need up_thru update before going active" + << dendl; + post_event(NeedUpThru()); + } else { + psdout(10) << "Got last missing, don't need missing " + << "posting Activate" << dendl; + post_event(Activate(ps->get_osdmap_epoch())); + } + } + return discard_event(); +} + +boost::statechart::result PeeringState::GetMissing::react(const QueryState& q) +{ + DECLARE_LOCALS; + q.f->open_object_section("state"); + q.f->dump_string("name", state_name); + q.f->dump_stream("enter_time") << enter_time; + + q.f->open_array_section("peer_missing_requested"); + for (auto p = peer_missing_requested.begin(); + p != peer_missing_requested.end(); + ++p) { + q.f->open_object_section("osd"); + q.f->dump_stream("osd") << *p; + if (ps->peer_missing.count(*p)) { + q.f->open_object_section("got_missing"); + ps->peer_missing[*p].dump(q.f); + q.f->close_section(); + } + q.f->close_section(); + } + q.f->close_section(); + + q.f->close_section(); + return forward_event(); +} + +boost::statechart::result PeeringState::GetMissing::react(const QueryUnfound& q) +{ + q.f->dump_string("state", "GetMising"); + q.f->dump_bool("available_might_have_unfound", false); + return discard_event(); +} + +void PeeringState::GetMissing::exit() +{ + context< PeeringMachine >().log_exit(state_name, enter_time); + + DECLARE_LOCALS; + utime_t dur = ceph_clock_now() - enter_time; + 
pl->get_peering_perf().tinc(rs_getmissing_latency, dur); + ps->blocked_by.clear(); +} + +/*------WaitUpThru--------*/ +PeeringState::WaitUpThru::WaitUpThru(my_context ctx) + : my_base(ctx), + NamedState(context< PeeringMachine >().state_history, "Started/Primary/Peering/WaitUpThru") +{ + context< PeeringMachine >().log_enter(state_name); +} + +boost::statechart::result PeeringState::WaitUpThru::react(const ActMap& am) +{ + DECLARE_LOCALS; + if (!ps->need_up_thru) { + post_event(Activate(ps->get_osdmap_epoch())); + } + return forward_event(); +} + +boost::statechart::result PeeringState::WaitUpThru::react(const MLogRec& logevt) +{ + DECLARE_LOCALS; + psdout(10) << "Noting missing from osd." << logevt.from << dendl; + ps->peer_missing[logevt.from].claim(std::move(logevt.msg->missing)); + ps->peer_info[logevt.from] = logevt.msg->info; + return discard_event(); +} + +boost::statechart::result PeeringState::WaitUpThru::react(const QueryState& q) +{ + q.f->open_object_section("state"); + q.f->dump_string("name", state_name); + q.f->dump_stream("enter_time") << enter_time; + q.f->dump_string("comment", "waiting for osdmap to reflect a new up_thru for this osd"); + q.f->close_section(); + return forward_event(); +} + +boost::statechart::result PeeringState::WaitUpThru::react(const QueryUnfound& q) +{ + q.f->dump_string("state", "WaitUpThru"); + q.f->dump_bool("available_might_have_unfound", false); + return discard_event(); +} + +void PeeringState::WaitUpThru::exit() +{ + context< PeeringMachine >().log_exit(state_name, enter_time); + DECLARE_LOCALS; + utime_t dur = ceph_clock_now() - enter_time; + pl->get_peering_perf().tinc(rs_waitupthru_latency, dur); +} + +/*----PeeringState::PeeringMachine Methods-----*/ +#undef dout_prefix +#define dout_prefix dpp->gen_prefix(*_dout) + +void PeeringState::PeeringMachine::log_enter(const char *state_name) +{ + DECLARE_LOCALS; + psdout(5) << "enter " << state_name << dendl; + pl->log_state_enter(state_name); +} + +void PeeringState::PeeringMachine::log_exit(const char *state_name, utime_t enter_time) +{ + DECLARE_LOCALS; + utime_t dur = ceph_clock_now() - enter_time; + psdout(5) << "exit " << state_name << " " << dur << " " << event_count << " " << event_time << dendl; + pl->log_state_exit(state_name, enter_time, event_count, event_time); + event_count = 0; + event_time = utime_t(); +} + +ostream &operator<<(ostream &out, const PeeringState &ps) { + out << "pg[" << ps.info + << " " << pg_vector_string(ps.up); + if (ps.acting != ps.up) + out << "/" << pg_vector_string(ps.acting); + if (ps.is_ec_pg()) + out << "p" << ps.get_primary(); + if (!ps.async_recovery_targets.empty()) + out << " async=[" << ps.async_recovery_targets << "]"; + if (!ps.backfill_targets.empty()) + out << " backfill=[" << ps.backfill_targets << "]"; + out << " r=" << ps.get_role(); + out << " lpr=" << ps.get_last_peering_reset(); + + if (ps.deleting) + out << " DELETING"; + + if (!ps.past_intervals.empty()) { + out << " pi=[" << ps.past_intervals.get_bounds() + << ")/" << ps.past_intervals.size(); + } + + if (ps.is_peered()) { + if (ps.last_update_ondisk != ps.info.last_update) + out << " luod=" << ps.last_update_ondisk; + if (ps.last_update_applied != ps.info.last_update) + out << " lua=" << ps.last_update_applied; + } + + if (ps.pg_log.get_tail() != ps.info.log_tail || + ps.pg_log.get_head() != ps.info.last_update) + out << " (info mismatch, " << ps.pg_log.get_log() << ")"; + + if (!ps.pg_log.get_log().empty()) { + if ((ps.pg_log.get_log().log.begin()->version <= ps.pg_log.get_tail())) { + 
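+      // the oldest in-memory entry should be strictly newer than the recorded
+      // tail; if not, print the actual bounds so the inconsistency is visible
+      // in the status line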
out << " (log bound mismatch, actual=[" + << ps.pg_log.get_log().log.begin()->version << "," + << ps.pg_log.get_log().log.rbegin()->version << "]"; + out << ")"; + } + } + + out << " crt=" << ps.pg_log.get_can_rollback_to(); + + if (ps.last_complete_ondisk != ps.info.last_complete) + out << " lcod " << ps.last_complete_ondisk; + + out << " mlcod " << ps.min_last_complete_ondisk; + + out << " " << pg_state_string(ps.get_state()); + if (ps.should_send_notify()) + out << " NOTIFY"; + + if (ps.prior_readable_until_ub != ceph::signedspan::zero()) { + out << " pruub " << ps.prior_readable_until_ub + << "@" << ps.get_prior_readable_down_osds(); + } + return out; +} + +std::vector<pg_shard_t> PeeringState::get_replica_recovery_order() const +{ + std::vector<std::pair<unsigned int, pg_shard_t>> replicas_by_num_missing, + async_by_num_missing; + replicas_by_num_missing.reserve(get_acting_recovery_backfill().size() - 1); + for (auto &p : get_acting_recovery_backfill()) { + if (p == get_primary()) { + continue; + } + auto pm = get_peer_missing().find(p); + assert(pm != get_peer_missing().end()); + auto nm = pm->second.num_missing(); + if (nm != 0) { + if (is_async_recovery_target(p)) { + async_by_num_missing.push_back(make_pair(nm, p)); + } else { + replicas_by_num_missing.push_back(make_pair(nm, p)); + } + } + } + // sort by number of missing objects, in ascending order. + auto func = [](const std::pair<unsigned int, pg_shard_t> &lhs, + const std::pair<unsigned int, pg_shard_t> &rhs) { + return lhs.first < rhs.first; + }; + // acting goes first + std::sort(replicas_by_num_missing.begin(), replicas_by_num_missing.end(), func); + // then async_recovery_targets + std::sort(async_by_num_missing.begin(), async_by_num_missing.end(), func); + replicas_by_num_missing.insert(replicas_by_num_missing.end(), + async_by_num_missing.begin(), async_by_num_missing.end()); + + std::vector<pg_shard_t> ret; + ret.reserve(replicas_by_num_missing.size()); + for (auto p : replicas_by_num_missing) { + ret.push_back(p.second); + } + return ret; +} + + |