path: root/src/osd/PG.cc
author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-07 18:45:59 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-07 18:45:59 +0000
commit     19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree       42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/osd/PG.cc
parent     Initial commit. (diff)
Adding upstream version 16.2.11+ds.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/osd/PG.cc')
-rw-r--r--  src/osd/PG.cc  2753
1 file changed, 2753 insertions(+), 0 deletions(-)
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
new file mode 100644
index 000000000..5b10f1466
--- /dev/null
+++ b/src/osd/PG.cc
@@ -0,0 +1,2753 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "PG.h"
+#include "messages/MOSDRepScrub.h"
+
+#include "common/errno.h"
+#include "common/ceph_releases.h"
+#include "common/config.h"
+#include "OSD.h"
+#include "OpRequest.h"
+#include "ScrubStore.h"
+#include "pg_scrubber.h"
+#include "Session.h"
+#include "osd/scheduler/OpSchedulerItem.h"
+
+#include "common/Timer.h"
+#include "common/perf_counters.h"
+
+#include "messages/MOSDOp.h"
+#include "messages/MOSDPGNotify.h"
+#include "messages/MOSDPGInfo.h"
+#include "messages/MOSDPGScan.h"
+#include "messages/MOSDPGBackfill.h"
+#include "messages/MOSDPGBackfillRemove.h"
+#include "messages/MBackfillReserve.h"
+#include "messages/MRecoveryReserve.h"
+#include "messages/MOSDPGPush.h"
+#include "messages/MOSDPGPushReply.h"
+#include "messages/MOSDPGPull.h"
+#include "messages/MOSDECSubOpWrite.h"
+#include "messages/MOSDECSubOpWriteReply.h"
+#include "messages/MOSDECSubOpRead.h"
+#include "messages/MOSDECSubOpReadReply.h"
+#include "messages/MOSDPGUpdateLogMissing.h"
+#include "messages/MOSDPGUpdateLogMissingReply.h"
+#include "messages/MOSDBackoff.h"
+#include "messages/MOSDScrubReserve.h"
+#include "messages/MOSDRepOp.h"
+#include "messages/MOSDRepOpReply.h"
+#include "messages/MOSDRepScrubMap.h"
+#include "messages/MOSDPGRecoveryDelete.h"
+#include "messages/MOSDPGRecoveryDeleteReply.h"
+
+#include "common/BackTrace.h"
+#include "common/EventTrace.h"
+
+#ifdef WITH_LTTNG
+#define TRACEPOINT_DEFINE
+#define TRACEPOINT_PROBE_DYNAMIC_LINKAGE
+#include "tracing/pg.h"
+#undef TRACEPOINT_PROBE_DYNAMIC_LINKAGE
+#undef TRACEPOINT_DEFINE
+#else
+#define tracepoint(...)
+#endif
+
+#include <sstream>
+
+#define dout_context cct
+#define dout_subsys ceph_subsys_osd
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, this)
+
+using std::list;
+using std::map;
+using std::ostringstream;
+using std::pair;
+using std::set;
+using std::string;
+using std::stringstream;
+using std::unique_ptr;
+using std::vector;
+
+using ceph::bufferlist;
+using ceph::bufferptr;
+using ceph::decode;
+using ceph::encode;
+using ceph::Formatter;
+
+using namespace ceph::osd::scheduler;
+
+template <class T>
+static ostream& _prefix(std::ostream *_dout, T *t)
+{
+ return t->gen_prefix(*_dout);
+}
+
+void PG::get(const char* tag)
+{
+ int after = ++ref;
+ lgeneric_subdout(cct, refs, 5) << "PG::get " << this << " "
+ << "tag " << (tag ? tag : "(none") << " "
+ << (after - 1) << " -> " << after << dendl;
+#ifdef PG_DEBUG_REFS
+ std::lock_guard l(_ref_id_lock);
+ _tag_counts[tag]++;
+#endif
+}
+
+void PG::put(const char* tag)
+{
+#ifdef PG_DEBUG_REFS
+ {
+ std::lock_guard l(_ref_id_lock);
+ auto tag_counts_entry = _tag_counts.find(tag);
+ ceph_assert(tag_counts_entry != _tag_counts.end());
+ --tag_counts_entry->second;
+ if (tag_counts_entry->second == 0) {
+ _tag_counts.erase(tag_counts_entry);
+ }
+ }
+#endif
+ auto local_cct = cct;
+ int after = --ref;
+ lgeneric_subdout(local_cct, refs, 5) << "PG::put " << this << " "
+ << "tag " << (tag ? tag : "(none") << " "
+ << (after + 1) << " -> " << after
+ << dendl;
+ if (after == 0)
+ delete this;
+}
+
+#ifdef PG_DEBUG_REFS
+uint64_t PG::get_with_id()
+{
+ ref++;
+ std::lock_guard l(_ref_id_lock);
+ uint64_t id = ++_ref_id;
+ BackTrace bt(0);
+ stringstream ss;
+ bt.print(ss);
+ lgeneric_subdout(cct, refs, 5) << "PG::get " << this << " " << info.pgid
+ << " got id " << id << " "
+ << (ref - 1) << " -> " << ref
+ << dendl;
+ ceph_assert(!_live_ids.count(id));
+ _live_ids.insert(make_pair(id, ss.str()));
+ return id;
+}
+
+void PG::put_with_id(uint64_t id)
+{
+ int newref = --ref;
+ lgeneric_subdout(cct, refs, 5) << "PG::put " << this << " " << info.pgid
+ << " put id " << id << " "
+ << (newref + 1) << " -> " << newref
+ << dendl;
+ {
+ std::lock_guard l(_ref_id_lock);
+ ceph_assert(_live_ids.count(id));
+ _live_ids.erase(id);
+ }
+ if (newref == 0)
+ delete this;
+}
+
+void PG::dump_live_ids()
+{
+ std::lock_guard l(_ref_id_lock);
+ dout(0) << "\t" << __func__ << ": " << info.pgid << " live ids:" << dendl;
+ for (map<uint64_t, string>::iterator i = _live_ids.begin();
+ i != _live_ids.end();
+ ++i) {
+ dout(0) << "\t\tid: " << *i << dendl;
+ }
+ dout(0) << "\t" << __func__ << ": " << info.pgid << " live tags:" << dendl;
+ for (map<string, uint64_t>::iterator i = _tag_counts.begin();
+ i != _tag_counts.end();
+ ++i) {
+ dout(0) << "\t\tid: " << *i << dendl;
+ }
+}
+#endif
+
+PG::PG(OSDService *o, OSDMapRef curmap,
+ const PGPool &_pool, spg_t p) :
+ pg_whoami(o->whoami, p.shard),
+ pg_id(p),
+ coll(p),
+ osd(o),
+ cct(o->cct),
+ osdriver(osd->store, coll_t(), OSD::make_snapmapper_oid()),
+ snap_mapper(
+ cct,
+ &osdriver,
+ p.ps(),
+ p.get_split_bits(_pool.info.get_pg_num()),
+ _pool.id,
+ p.shard),
+ trace_endpoint("0.0.0.0", 0, "PG"),
+ info_struct_v(0),
+ pgmeta_oid(p.make_pgmeta_oid()),
+ stat_queue_item(this),
+ recovery_queued(false),
+ recovery_ops_active(0),
+ backfill_reserving(false),
+ pg_stats_publish_valid(false),
+ finish_sync_event(NULL),
+ scrub_after_recovery(false),
+ active_pushes(0),
+ recovery_state(
+ o->cct,
+ pg_whoami,
+ p,
+ _pool,
+ curmap,
+ this,
+ this),
+ pool(recovery_state.get_pool()),
+ info(recovery_state.get_info())
+{
+#ifdef PG_DEBUG_REFS
+ osd->add_pgid(p, this);
+#endif
+#ifdef WITH_BLKIN
+ std::stringstream ss;
+ ss << "PG " << info.pgid;
+ trace_endpoint.copy_name(ss.str());
+#endif
+}
+
+PG::~PG()
+{
+#ifdef PG_DEBUG_REFS
+ osd->remove_pgid(info.pgid, this);
+#endif
+}
+
+void PG::lock(bool no_lockdep) const
+{
+#ifdef CEPH_DEBUG_MUTEX
+ _lock.lock(no_lockdep);
+#else
+ _lock.lock();
+ locked_by = std::this_thread::get_id();
+#endif
+ // if we have unrecorded dirty state with the lock dropped, there is a bug
+ ceph_assert(!recovery_state.debug_has_dirty_state());
+
+ dout(30) << "lock" << dendl;
+}
+
+bool PG::is_locked() const
+{
+ return ceph_mutex_is_locked(_lock);
+}
+
+void PG::unlock() const
+{
+ //generic_dout(0) << this << " " << info.pgid << " unlock" << dendl;
+ ceph_assert(!recovery_state.debug_has_dirty_state());
+#ifndef CEPH_DEBUG_MUTEX
+ locked_by = {};
+#endif
+ _lock.unlock();
+}
+
+std::ostream& PG::gen_prefix(std::ostream& out) const
+{
+ OSDMapRef mapref = recovery_state.get_osdmap();
+#ifdef CEPH_DEBUG_MUTEX
+ if (_lock.is_locked_by_me()) {
+#else
+ if (locked_by == std::this_thread::get_id()) {
+#endif
+ out << "osd." << osd->whoami
+ << " pg_epoch: " << (mapref ? mapref->get_epoch():0)
+ << " " << *this << " ";
+ } else {
+ out << "osd." << osd->whoami
+ << " pg_epoch: " << (mapref ? mapref->get_epoch():0)
+ << " pg[" << pg_id.pgid << "(unlocked)] ";
+ }
+ return out;
+}
+
+PerfCounters &PG::get_peering_perf() {
+ return *(osd->recoverystate_perf);
+}
+
+PerfCounters &PG::get_perf_logger() {
+ return *(osd->logger);
+}
+
+void PG::log_state_enter(const char *state) {
+ osd->pg_recovery_stats.log_enter(state);
+}
+
+void PG::log_state_exit(
+ const char *state_name, utime_t enter_time,
+ uint64_t events, utime_t event_dur) {
+ osd->pg_recovery_stats.log_exit(
+ state_name, ceph_clock_now() - enter_time, events, event_dur);
+}
+
+/********* PG **********/
+
+void PG::remove_snap_mapped_object(
+ ObjectStore::Transaction &t, const hobject_t &soid)
+{
+ t.remove(
+ coll,
+ ghobject_t(soid, ghobject_t::NO_GEN, pg_whoami.shard));
+ clear_object_snap_mapping(&t, soid);
+}
+
+void PG::clear_object_snap_mapping(
+ ObjectStore::Transaction *t, const hobject_t &soid)
+{
+ OSDriver::OSTransaction _t(osdriver.get_transaction(t));
+ if (soid.snap < CEPH_MAXSNAP) {
+ int r = snap_mapper.remove_oid(
+ soid,
+ &_t);
+ if (!(r == 0 || r == -ENOENT)) {
+ derr << __func__ << ": remove_oid returned " << cpp_strerror(r) << dendl;
+ ceph_abort();
+ }
+ }
+}
+
+void PG::update_object_snap_mapping(
+ ObjectStore::Transaction *t, const hobject_t &soid, const set<snapid_t> &snaps)
+{
+ OSDriver::OSTransaction _t(osdriver.get_transaction(t));
+ ceph_assert(soid.snap < CEPH_MAXSNAP);
+ int r = snap_mapper.remove_oid(
+ soid,
+ &_t);
+ if (!(r == 0 || r == -ENOENT)) {
+ derr << __func__ << ": remove_oid returned " << cpp_strerror(r) << dendl;
+ ceph_abort();
+ }
+ snap_mapper.add_oid(
+ soid,
+ snaps,
+ &_t);
+}
+
+/******* PG ***********/
+void PG::clear_primary_state()
+{
+ dout(20) << __func__ << dendl;
+
+ projected_log = PGLog::IndexedLog();
+
+ snap_trimq.clear();
+ snap_trimq_repeat.clear();
+ finish_sync_event = 0; // so that _finish_recovery doesn't go off in another thread
+ release_pg_backoffs();
+
+ if (m_scrubber) {
+ m_scrubber->discard_replica_reservations();
+ }
+ scrub_after_recovery = false;
+
+ agent_clear();
+}
+
+
+bool PG::op_has_sufficient_caps(OpRequestRef& op)
+{
+ // only check MOSDOp
+ if (op->get_req()->get_type() != CEPH_MSG_OSD_OP)
+ return true;
+
+ auto req = op->get_req<MOSDOp>();
+ auto priv = req->get_connection()->get_priv();
+ auto session = static_cast<Session*>(priv.get());
+ if (!session) {
+ dout(0) << "op_has_sufficient_caps: no session for op " << *req << dendl;
+ return false;
+ }
+ OSDCap& caps = session->caps;
+ priv.reset();
+
+ const string &key = req->get_hobj().get_key().empty() ?
+ req->get_oid().name :
+ req->get_hobj().get_key();
+
+ bool cap = caps.is_capable(pool.name, req->get_hobj().nspace,
+ pool.info.application_metadata,
+ key,
+ op->need_read_cap(),
+ op->need_write_cap(),
+ op->classes(),
+ session->get_peer_socket_addr());
+
+ dout(20) << "op_has_sufficient_caps "
+ << "session=" << session
+ << " pool=" << pool.id << " (" << pool.name
+ << " " << req->get_hobj().nspace
+ << ")"
+ << " pool_app_metadata=" << pool.info.application_metadata
+ << " need_read_cap=" << op->need_read_cap()
+ << " need_write_cap=" << op->need_write_cap()
+ << " classes=" << op->classes()
+ << " -> " << (cap ? "yes" : "NO")
+ << dendl;
+ return cap;
+}
+
+void PG::queue_recovery()
+{
+ if (!is_primary() || !is_peered()) {
+ dout(10) << "queue_recovery -- not primary or not peered " << dendl;
+ ceph_assert(!recovery_queued);
+ } else if (recovery_queued) {
+ dout(10) << "queue_recovery -- already queued" << dendl;
+ } else {
+ dout(10) << "queue_recovery -- queuing" << dendl;
+ recovery_queued = true;
+ osd->queue_for_recovery(this);
+ }
+}
+
+void PG::queue_scrub_after_repair()
+{
+ dout(10) << __func__ << dendl;
+ ceph_assert(ceph_mutex_is_locked(_lock));
+
+ m_planned_scrub.must_deep_scrub = true;
+ m_planned_scrub.check_repair = true;
+ m_planned_scrub.must_scrub = true;
+
+ if (is_scrub_queued_or_active()) {
+ dout(10) << __func__ << ": scrubbing already ("
+ << (is_scrubbing() ? "active)" : "queued)") << dendl;
+ return;
+ }
+
+ m_scrubber->set_op_parameters(m_planned_scrub);
+ dout(15) << __func__ << ": queueing" << dendl;
+
+ m_scrubber->set_queued_or_active();
+ osd->queue_scrub_after_repair(this, Scrub::scrub_prio_t::high_priority);
+}
+
+unsigned PG::get_scrub_priority()
+{
+ // a higher value -> a higher priority
+ int64_t pool_scrub_priority =
+ pool.info.opts.value_or(pool_opts_t::SCRUB_PRIORITY, (int64_t)0);
+ return pool_scrub_priority > 0 ? pool_scrub_priority : cct->_conf->osd_scrub_priority;
+}
+
+Context *PG::finish_recovery()
+{
+ dout(10) << "finish_recovery" << dendl;
+ ceph_assert(info.last_complete == info.last_update);
+
+ clear_recovery_state();
+
+ /*
+ * sync all this before purging strays. but don't block!
+ */
+ finish_sync_event = new C_PG_FinishRecovery(this);
+ return finish_sync_event;
+}
+
+void PG::_finish_recovery(Context* c)
+{
+ dout(15) << __func__ << " finish_sync_event? " << finish_sync_event << " clean? "
+ << is_clean() << dendl;
+
+ std::scoped_lock locker{*this};
+ if (recovery_state.is_deleting() || !is_clean()) {
+ dout(10) << __func__ << " raced with delete or repair" << dendl;
+ return;
+ }
+ // When recovery is initiated by a repair, that flag is left on
+ state_clear(PG_STATE_REPAIR);
+ if (c == finish_sync_event) {
+ dout(15) << __func__ << " scrub_after_recovery? " << scrub_after_recovery << dendl;
+ finish_sync_event = 0;
+ recovery_state.purge_strays();
+
+ publish_stats_to_osd();
+
+ if (scrub_after_recovery) {
+ dout(10) << "_finish_recovery requeueing for scrub" << dendl;
+ scrub_after_recovery = false;
+ queue_scrub_after_repair();
+ }
+ } else {
+ dout(10) << "_finish_recovery -- stale" << dendl;
+ }
+}
+
+void PG::start_recovery_op(const hobject_t& soid)
+{
+ dout(10) << "start_recovery_op " << soid
+#ifdef DEBUG_RECOVERY_OIDS
+ << " (" << recovering_oids << ")"
+#endif
+ << dendl;
+ ceph_assert(recovery_ops_active >= 0);
+ recovery_ops_active++;
+#ifdef DEBUG_RECOVERY_OIDS
+ recovering_oids.insert(soid);
+#endif
+ osd->start_recovery_op(this, soid);
+}
+
+void PG::finish_recovery_op(const hobject_t& soid, bool dequeue)
+{
+ dout(10) << "finish_recovery_op " << soid
+#ifdef DEBUG_RECOVERY_OIDS
+ << " (" << recovering_oids << ")"
+#endif
+ << dendl;
+ ceph_assert(recovery_ops_active > 0);
+ recovery_ops_active--;
+#ifdef DEBUG_RECOVERY_OIDS
+ ceph_assert(recovering_oids.count(soid));
+ recovering_oids.erase(recovering_oids.find(soid));
+#endif
+ osd->finish_recovery_op(this, soid, dequeue);
+
+ if (!dequeue) {
+ queue_recovery();
+ }
+}
+
+void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits)
+{
+ recovery_state.split_into(child_pgid, &child->recovery_state, split_bits);
+
+ child->update_snap_mapper_bits(split_bits);
+
+ child->snap_trimq = snap_trimq;
+ child->snap_trimq_repeat = snap_trimq_repeat;
+
+ _split_into(child_pgid, child, split_bits);
+
+ // release all backoffs for simplicity
+ release_backoffs(hobject_t(), hobject_t::get_max());
+}
+
+void PG::start_split_stats(const set<spg_t>& childpgs, vector<object_stat_sum_t> *out)
+{
+ recovery_state.start_split_stats(childpgs, out);
+}
+
+void PG::finish_split_stats(const object_stat_sum_t& stats, ObjectStore::Transaction &t)
+{
+ recovery_state.finish_split_stats(stats, t);
+}
+
+void PG::merge_from(map<spg_t,PGRef>& sources, PeeringCtx &rctx,
+ unsigned split_bits,
+ const pg_merge_meta_t& last_pg_merge_meta)
+{
+ dout(10) << __func__ << " from " << sources << " split_bits " << split_bits
+ << dendl;
+ map<spg_t, PeeringState*> source_ps;
+ for (auto &&source : sources) {
+ source_ps.emplace(source.first, &source.second->recovery_state);
+ }
+ recovery_state.merge_from(source_ps, rctx, split_bits, last_pg_merge_meta);
+
+ for (auto& i : sources) {
+ auto& source = i.second;
+ // wipe out source's pgmeta
+ rctx.transaction.remove(source->coll, source->pgmeta_oid);
+
+ // merge (and destroy source collection)
+ rctx.transaction.merge_collection(source->coll, coll, split_bits);
+ }
+
+ // merge_collection does this, but maybe all of our sources were missing.
+ rctx.transaction.collection_set_bits(coll, split_bits);
+
+ snap_mapper.update_bits(split_bits);
+}
+
+void PG::add_backoff(const ceph::ref_t<Session>& s, const hobject_t& begin, const hobject_t& end)
+{
+ auto con = s->con;
+ if (!con) // OSD::ms_handle_reset clears s->con without a lock
+ return;
+ auto b = s->have_backoff(info.pgid, begin);
+ if (b) {
+ derr << __func__ << " already have backoff for " << s << " begin " << begin
+ << " " << *b << dendl;
+ ceph_abort();
+ }
+ std::lock_guard l(backoff_lock);
+ b = ceph::make_ref<Backoff>(info.pgid, this, s, ++s->backoff_seq, begin, end);
+ backoffs[begin].insert(b);
+ s->add_backoff(b);
+ dout(10) << __func__ << " session " << s << " added " << *b << dendl;
+ con->send_message(
+ new MOSDBackoff(
+ info.pgid,
+ get_osdmap_epoch(),
+ CEPH_OSD_BACKOFF_OP_BLOCK,
+ b->id,
+ begin,
+ end));
+}
+
+void PG::release_backoffs(const hobject_t& begin, const hobject_t& end)
+{
+ dout(10) << __func__ << " [" << begin << "," << end << ")" << dendl;
+ vector<ceph::ref_t<Backoff>> bv;
+ {
+ std::lock_guard l(backoff_lock);
+ auto p = backoffs.lower_bound(begin);
+ while (p != backoffs.end()) {
+ int r = cmp(p->first, end);
+ dout(20) << __func__ << " ? " << r << " " << p->first
+ << " " << p->second << dendl;
+ // note: must still examine begin=end=p->first case
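+ // (when begin == end the caller is releasing a single-object backoff, so an
+ // entry keyed exactly at 'end' must still be examined rather than skipped)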
+ if (r > 0 || (r == 0 && begin < end)) {
+ break;
+ }
+ dout(20) << __func__ << " checking " << p->first
+ << " " << p->second << dendl;
+ auto q = p->second.begin();
+ while (q != p->second.end()) {
+ dout(20) << __func__ << " checking " << *q << dendl;
+ int r = cmp((*q)->begin, begin);
+ if (r == 0 || (r > 0 && (*q)->end < end)) {
+ bv.push_back(*q);
+ q = p->second.erase(q);
+ } else {
+ ++q;
+ }
+ }
+ if (p->second.empty()) {
+ p = backoffs.erase(p);
+ } else {
+ ++p;
+ }
+ }
+ }
+ for (auto b : bv) {
+ std::lock_guard l(b->lock);
+ dout(10) << __func__ << " " << *b << dendl;
+ if (b->session) {
+ ceph_assert(b->pg == this);
+ ConnectionRef con = b->session->con;
+ if (con) { // OSD::ms_handle_reset clears s->con without a lock
+ con->send_message(
+ new MOSDBackoff(
+ info.pgid,
+ get_osdmap_epoch(),
+ CEPH_OSD_BACKOFF_OP_UNBLOCK,
+ b->id,
+ b->begin,
+ b->end));
+ }
+ if (b->is_new()) {
+ b->state = Backoff::STATE_DELETING;
+ } else {
+ b->session->rm_backoff(b);
+ b->session.reset();
+ }
+ b->pg.reset();
+ }
+ }
+}
+
+void PG::clear_backoffs()
+{
+ dout(10) << __func__ << " " << dendl;
+ map<hobject_t,set<ceph::ref_t<Backoff>>> ls;
+ {
+ std::lock_guard l(backoff_lock);
+ ls.swap(backoffs);
+ }
+ for (auto& p : ls) {
+ for (auto& b : p.second) {
+ std::lock_guard l(b->lock);
+ dout(10) << __func__ << " " << *b << dendl;
+ if (b->session) {
+ ceph_assert(b->pg == this);
+ if (b->is_new()) {
+ b->state = Backoff::STATE_DELETING;
+ } else {
+ b->session->rm_backoff(b);
+ b->session.reset();
+ }
+ b->pg.reset();
+ }
+ }
+ }
+}
+
+// called by Session::clear_backoffs()
+void PG::rm_backoff(const ceph::ref_t<Backoff>& b)
+{
+ dout(10) << __func__ << " " << *b << dendl;
+ std::lock_guard l(backoff_lock);
+ ceph_assert(ceph_mutex_is_locked_by_me(b->lock));
+ ceph_assert(b->pg == this);
+ auto p = backoffs.find(b->begin);
+ // may race with release_backoffs()
+ if (p != backoffs.end()) {
+ auto q = p->second.find(b);
+ if (q != p->second.end()) {
+ p->second.erase(q);
+ if (p->second.empty()) {
+ backoffs.erase(p);
+ }
+ }
+ }
+}
+
+void PG::clear_recovery_state()
+{
+ dout(10) << "clear_recovery_state" << dendl;
+
+ finish_sync_event = 0;
+
+ hobject_t soid;
+ while (recovery_ops_active > 0) {
+#ifdef DEBUG_RECOVERY_OIDS
+ soid = *recovering_oids.begin();
+#endif
+ finish_recovery_op(soid, true);
+ }
+
+ backfill_info.clear();
+ peer_backfill_info.clear();
+ waiting_on_backfill.clear();
+ _clear_recovery_state(); // pg impl specific hook
+}
+
+void PG::cancel_recovery()
+{
+ dout(10) << "cancel_recovery" << dendl;
+ clear_recovery_state();
+}
+
+void PG::set_probe_targets(const set<pg_shard_t> &probe_set)
+{
+ std::lock_guard l(heartbeat_peer_lock);
+ probe_targets.clear();
+ for (set<pg_shard_t>::iterator i = probe_set.begin();
+ i != probe_set.end();
+ ++i) {
+ probe_targets.insert(i->osd);
+ }
+}
+
+void PG::send_cluster_message(
+ int target, MessageRef m,
+ epoch_t epoch, bool share_map_update=false)
+{
+ ConnectionRef con = osd->get_con_osd_cluster(
+ target, get_osdmap_epoch());
+ if (!con) {
+ return;
+ }
+
+ if (share_map_update) {
+ osd->maybe_share_map(con.get(), get_osdmap());
+ }
+ osd->send_message_osd_cluster(m, con.get());
+}
+
+void PG::clear_probe_targets()
+{
+ std::lock_guard l(heartbeat_peer_lock);
+ probe_targets.clear();
+}
+
+void PG::update_heartbeat_peers(set<int> new_peers)
+{
+ bool need_update = false;
+ heartbeat_peer_lock.lock();
+ if (new_peers == heartbeat_peers) {
+ dout(10) << "update_heartbeat_peers " << heartbeat_peers << " unchanged" << dendl;
+ } else {
+ dout(10) << "update_heartbeat_peers " << heartbeat_peers << " -> " << new_peers << dendl;
+ heartbeat_peers.swap(new_peers);
+ need_update = true;
+ }
+ heartbeat_peer_lock.unlock();
+
+ if (need_update)
+ osd->need_heartbeat_peer_update();
+}
+
+
+bool PG::check_in_progress_op(
+ const osd_reqid_t &r,
+ eversion_t *version,
+ version_t *user_version,
+ int *return_code,
+ vector<pg_log_op_return_item_t> *op_returns
+ ) const
+{
+ return (
+ projected_log.get_request(r, version, user_version, return_code,
+ op_returns) ||
+ recovery_state.get_pg_log().get_log().get_request(
+ r, version, user_version, return_code, op_returns));
+}
+
+void PG::publish_stats_to_osd()
+{
+ if (!is_primary())
+ return;
+
+ std::lock_guard l{pg_stats_publish_lock};
+ auto stats = recovery_state.prepare_stats_for_publish(
+ pg_stats_publish_valid,
+ pg_stats_publish,
+ unstable_stats);
+ if (stats) {
+ pg_stats_publish = stats.value();
+ pg_stats_publish_valid = true;
+ }
+}
+
+unsigned PG::get_target_pg_log_entries() const
+{
+ return osd->get_target_pg_log_entries();
+}
+
+void PG::clear_publish_stats()
+{
+ dout(15) << "clear_stats" << dendl;
+ std::lock_guard l{pg_stats_publish_lock};
+ pg_stats_publish_valid = false;
+}
+
+/**
+ * initialize a newly instantiated pg
+ *
+ * Initialize PG state, as when a PG is initially created, or when it
+ * is first instantiated on the current node.
+ *
+ * @param role our role/rank
+ * @param newup up set
+ * @param newacting acting set
+ * @param history pg history
+ * @param pi past_intervals
+ * @param backfill true if info should be marked as backfill
+ * @param t transaction to write out our new state in
+ */
+void PG::init(
+ int role,
+ const vector<int>& newup, int new_up_primary,
+ const vector<int>& newacting, int new_acting_primary,
+ const pg_history_t& history,
+ const PastIntervals& pi,
+ bool backfill,
+ ObjectStore::Transaction &t)
+{
+ recovery_state.init(
+ role, newup, new_up_primary, newacting,
+ new_acting_primary, history, pi, backfill, t);
+}
+
+void PG::shutdown()
+{
+ ch->flush();
+ std::scoped_lock l{*this};
+ recovery_state.shutdown();
+ on_shutdown();
+}
+
+#pragma GCC diagnostic ignored "-Wpragmas"
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
+
+void PG::upgrade(ObjectStore *store)
+{
+ dout(0) << __func__ << " " << info_struct_v << " -> " << pg_latest_struct_v
+ << dendl;
+ ceph_assert(info_struct_v <= 10);
+ ObjectStore::Transaction t;
+
+ // <do upgrade steps here>
+
+ // finished upgrade!
+ ceph_assert(info_struct_v == 10);
+
+ // update infover_key
+ if (info_struct_v < pg_latest_struct_v) {
+ map<string,bufferlist> v;
+ __u8 ver = pg_latest_struct_v;
+ encode(ver, v[string(infover_key)]);
+ t.omap_setkeys(coll, pgmeta_oid, v);
+ }
+
+ recovery_state.force_write_state(t);
+
+ ObjectStore::CollectionHandle ch = store->open_collection(coll);
+ int r = store->queue_transaction(ch, std::move(t));
+ if (r != 0) {
+ derr << __func__ << ": queue_transaction returned "
+ << cpp_strerror(r) << dendl;
+ ceph_abort();
+ }
+ ceph_assert(r == 0);
+
+ C_SaferCond waiter;
+ if (!ch->flush_commit(&waiter)) {
+ waiter.wait();
+ }
+}
+
+#pragma GCC diagnostic pop
+#pragma GCC diagnostic warning "-Wpragmas"
+
+void PG::prepare_write(
+ pg_info_t &info,
+ pg_info_t &last_written_info,
+ PastIntervals &past_intervals,
+ PGLog &pglog,
+ bool dirty_info,
+ bool dirty_big_info,
+ bool need_write_epoch,
+ ObjectStore::Transaction &t)
+{
+ info.stats.stats.add(unstable_stats);
+ unstable_stats.clear();
+ map<string,bufferlist> km;
+ string key_to_remove;
+ if (dirty_big_info || dirty_info) {
+ int ret = prepare_info_keymap(
+ cct,
+ &km,
+ &key_to_remove,
+ get_osdmap_epoch(),
+ info,
+ last_written_info,
+ past_intervals,
+ dirty_big_info,
+ need_write_epoch,
+ cct->_conf->osd_fast_info,
+ osd->logger,
+ this);
+ ceph_assert(ret == 0);
+ }
+ pglog.write_log_and_missing(
+ t, &km, coll, pgmeta_oid, pool.info.require_rollback());
+ if (!km.empty())
+ t.omap_setkeys(coll, pgmeta_oid, km);
+ if (!key_to_remove.empty())
+ t.omap_rmkey(coll, pgmeta_oid, key_to_remove);
+}
+
+#pragma GCC diagnostic ignored "-Wpragmas"
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
+
+bool PG::_has_removal_flag(ObjectStore *store,
+ spg_t pgid)
+{
+ coll_t coll(pgid);
+ ghobject_t pgmeta_oid(pgid.make_pgmeta_oid());
+
+ // first try new way
+ set<string> keys;
+ keys.insert("_remove");
+ map<string,bufferlist> values;
+ auto ch = store->open_collection(coll);
+ ceph_assert(ch);
+ if (store->omap_get_values(ch, pgmeta_oid, keys, &values) == 0 &&
+ values.size() == 1)
+ return true;
+
+ return false;
+}
+
+int PG::peek_map_epoch(ObjectStore *store,
+ spg_t pgid,
+ epoch_t *pepoch)
+{
+ coll_t coll(pgid);
+ ghobject_t legacy_infos_oid(OSD::make_infos_oid());
+ ghobject_t pgmeta_oid(pgid.make_pgmeta_oid());
+ epoch_t cur_epoch = 0;
+
+ // validate collection name
+ ceph_assert(coll.is_pg());
+
+ // try for v8
+ set<string> keys;
+ keys.insert(string(infover_key));
+ keys.insert(string(epoch_key));
+ map<string,bufferlist> values;
+ auto ch = store->open_collection(coll);
+ ceph_assert(ch);
+ int r = store->omap_get_values(ch, pgmeta_oid, keys, &values);
+ if (r == 0) {
+ ceph_assert(values.size() == 2);
+
+ // sanity check version
+ auto bp = values[string(infover_key)].cbegin();
+ __u8 struct_v = 0;
+ decode(struct_v, bp);
+ ceph_assert(struct_v >= 8);
+
+ // get epoch
+ bp = values[string(epoch_key)].begin();
+ decode(cur_epoch, bp);
+ } else {
+ // probably bug 10617; see OSD::load_pgs()
+ return -1;
+ }
+
+ *pepoch = cur_epoch;
+ return 0;
+}
+
+#pragma GCC diagnostic pop
+#pragma GCC diagnostic warning "-Wpragmas"
+
+bool PG::check_log_for_corruption(ObjectStore *store)
+{
+ /// TODO: this method needs to work with the omap log
+ return true;
+}
+
+//! Get the name we're going to save our corrupt pg log as
+std::string PG::get_corrupt_pg_log_name() const
+{
+ const int MAX_BUF = 512;
+ char buf[MAX_BUF];
+ struct tm tm_buf;
+ time_t my_time(time(NULL));
+ const struct tm *t = localtime_r(&my_time, &tm_buf);
+ int ret = strftime(buf, sizeof(buf), "corrupt_log_%Y-%m-%d_%k:%M_", t);
+ if (ret == 0) {
+ dout(0) << "strftime failed" << dendl;
+ return "corrupt_log_unknown_time";
+ }
+ string out(buf);
+ out += stringify(info.pgid);
+ return out;
+}
+
+int PG::read_info(
+ ObjectStore *store, spg_t pgid, const coll_t &coll,
+ pg_info_t &info, PastIntervals &past_intervals,
+ __u8 &struct_v)
+{
+ set<string> keys;
+ keys.insert(string(infover_key));
+ keys.insert(string(info_key));
+ keys.insert(string(biginfo_key));
+ keys.insert(string(fastinfo_key));
+ ghobject_t pgmeta_oid(pgid.make_pgmeta_oid());
+ map<string,bufferlist> values;
+ auto ch = store->open_collection(coll);
+ ceph_assert(ch);
+ int r = store->omap_get_values(ch, pgmeta_oid, keys, &values);
+ ceph_assert(r == 0);
+ ceph_assert(values.size() == 3 ||
+ values.size() == 4);
+
+ auto p = values[string(infover_key)].cbegin();
+ decode(struct_v, p);
+ ceph_assert(struct_v >= 10);
+
+ p = values[string(info_key)].begin();
+ decode(info, p);
+
+ p = values[string(biginfo_key)].begin();
+ decode(past_intervals, p);
+ decode(info.purged_snaps, p);
+
+ p = values[string(fastinfo_key)].begin();
+ if (!p.end()) {
+ pg_fast_info_t fast;
+ decode(fast, p);
+ fast.try_apply_to(&info);
+ }
+ return 0;
+}
+
+void PG::read_state(ObjectStore *store)
+{
+ PastIntervals past_intervals_from_disk;
+ pg_info_t info_from_disk;
+ int r = read_info(
+ store,
+ pg_id,
+ coll,
+ info_from_disk,
+ past_intervals_from_disk,
+ info_struct_v);
+ ceph_assert(r >= 0);
+
+ if (info_struct_v < pg_compat_struct_v) {
+ derr << "PG needs upgrade, but on-disk data is too old; upgrade to"
+ << " an older version first." << dendl;
+ ceph_abort_msg("PG too old to upgrade");
+ }
+
+ recovery_state.init_from_disk_state(
+ std::move(info_from_disk),
+ std::move(past_intervals_from_disk),
+ [this, store] (PGLog &pglog) {
+ ostringstream oss;
+ pglog.read_log_and_missing(
+ store,
+ ch,
+ pgmeta_oid,
+ info,
+ oss,
+ cct->_conf->osd_ignore_stale_divergent_priors,
+ cct->_conf->osd_debug_verify_missing_on_start);
+
+ if (oss.tellp())
+ osd->clog->error() << oss.str();
+ return 0;
+ });
+
+ if (info_struct_v < pg_latest_struct_v) {
+ upgrade(store);
+ }
+
+ // initialize current mapping
+ {
+ int primary, up_primary;
+ vector<int> acting, up;
+ get_osdmap()->pg_to_up_acting_osds(
+ pg_id.pgid, &up, &up_primary, &acting, &primary);
+ recovery_state.init_primary_up_acting(
+ up,
+ acting,
+ up_primary,
+ primary);
+ recovery_state.set_role(OSDMap::calc_pg_role(pg_whoami, acting));
+ }
+
+ // init pool options
+ store->set_collection_opts(ch, pool.info.opts);
+
+ PeeringCtx rctx(ceph_release_t::unknown);
+ handle_initialize(rctx);
+ // note: we don't activate here because we know the OSD will advance maps
+ // during boot.
+ write_if_dirty(rctx.transaction);
+ store->queue_transaction(ch, std::move(rctx.transaction));
+}
+
+void PG::update_snap_map(
+ const vector<pg_log_entry_t> &log_entries,
+ ObjectStore::Transaction &t)
+{
+ for (auto i = log_entries.cbegin(); i != log_entries.cend(); ++i) {
+ OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
+ if (i->soid.snap < CEPH_MAXSNAP) {
+ if (i->is_delete()) {
+ int r = snap_mapper.remove_oid(
+ i->soid,
+ &_t);
+ if (r)
+ derr << __func__ << " remove_oid " << i->soid << " failed with " << r << dendl;
+ // On removal tolerate missing key corruption
+ ceph_assert(r == 0 || r == -ENOENT);
+ } else if (i->is_update()) {
+ ceph_assert(i->snaps.length() > 0);
+ vector<snapid_t> snaps;
+ bufferlist snapbl = i->snaps;
+ auto p = snapbl.cbegin();
+ try {
+ decode(snaps, p);
+ } catch (...) {
+ derr << __func__ << " decode snaps failure on " << *i << dendl;
+ snaps.clear();
+ }
+ set<snapid_t> _snaps(snaps.begin(), snaps.end());
+
+ if (i->is_clone() || i->is_promote()) {
+ snap_mapper.add_oid(
+ i->soid,
+ _snaps,
+ &_t);
+ } else if (i->is_modify()) {
+ int r = snap_mapper.update_snaps(
+ i->soid,
+ _snaps,
+ 0,
+ &_t);
+ ceph_assert(r == 0);
+ } else {
+ ceph_assert(i->is_clean());
+ }
+ }
+ }
+ }
+}
+
+/**
+ * filter trimming|trimmed snaps out of snapcontext
+ */
+void PG::filter_snapc(vector<snapid_t> &snaps)
+{
+ // nothing needs to trim, we can return immediately
+ if (snap_trimq.empty() && info.purged_snaps.empty())
+ return;
+
+ bool filtering = false;
+ vector<snapid_t> newsnaps;
+ for (vector<snapid_t>::iterator p = snaps.begin();
+ p != snaps.end();
+ ++p) {
+ if (snap_trimq.contains(*p) || info.purged_snaps.contains(*p)) {
+ if (!filtering) {
+ // start building a new vector with what we've seen so far
+ dout(10) << "filter_snapc filtering " << snaps << dendl;
+ newsnaps.insert(newsnaps.begin(), snaps.begin(), p);
+ filtering = true;
+ }
+ dout(20) << "filter_snapc removing trimq|purged snap " << *p << dendl;
+ } else {
+ if (filtering)
+ newsnaps.push_back(*p); // continue building new vector
+ }
+ }
+ if (filtering) {
+ snaps.swap(newsnaps);
+ dout(10) << "filter_snapc result " << snaps << dendl;
+ }
+}
+
+void PG::requeue_object_waiters(map<hobject_t, list<OpRequestRef>>& m)
+{
+ for (auto it = m.begin(); it != m.end(); ++it)
+ requeue_ops(it->second);
+ m.clear();
+}
+
+void PG::requeue_op(OpRequestRef op)
+{
+ auto p = waiting_for_map.find(op->get_source());
+ if (p != waiting_for_map.end()) {
+ dout(20) << __func__ << " " << op << " (waiting_for_map " << p->first << ")"
+ << dendl;
+ p->second.push_front(op);
+ } else {
+ dout(20) << __func__ << " " << op << dendl;
+ osd->enqueue_front(
+ OpSchedulerItem(
+ unique_ptr<OpSchedulerItem::OpQueueable>(new PGOpItem(info.pgid, op)),
+ op->get_req()->get_cost(),
+ op->get_req()->get_priority(),
+ op->get_req()->get_recv_stamp(),
+ op->get_req()->get_source().num(),
+ get_osdmap_epoch()));
+ }
+}
+
+void PG::requeue_ops(list<OpRequestRef> &ls)
+{
+ for (list<OpRequestRef>::reverse_iterator i = ls.rbegin();
+ i != ls.rend();
+ ++i) {
+ requeue_op(*i);
+ }
+ ls.clear();
+}
+
+void PG::requeue_map_waiters()
+{
+ epoch_t epoch = get_osdmap_epoch();
+ auto p = waiting_for_map.begin();
+ while (p != waiting_for_map.end()) {
+ if (epoch < p->second.front()->min_epoch) {
+ dout(20) << __func__ << " " << p->first << " front op "
+ << p->second.front() << " must still wait, doing nothing"
+ << dendl;
+ ++p;
+ } else {
+ dout(20) << __func__ << " " << p->first << " " << p->second << dendl;
+ for (auto q = p->second.rbegin(); q != p->second.rend(); ++q) {
+ auto req = *q;
+ osd->enqueue_front(OpSchedulerItem(
+ unique_ptr<OpSchedulerItem::OpQueueable>(new PGOpItem(info.pgid, req)),
+ req->get_req()->get_cost(),
+ req->get_req()->get_priority(),
+ req->get_req()->get_recv_stamp(),
+ req->get_req()->get_source().num(),
+ epoch));
+ }
+ p = waiting_for_map.erase(p);
+ }
+ }
+}
+
+bool PG::get_must_scrub() const
+{
+ dout(20) << __func__ << " must_scrub? " << (m_planned_scrub.must_scrub ? "true" : "false") << dendl;
+ return m_planned_scrub.must_scrub;
+}
+
+unsigned int PG::scrub_requeue_priority(Scrub::scrub_prio_t with_priority) const
+{
+ return m_scrubber->scrub_requeue_priority(with_priority);
+}
+
+unsigned int PG::scrub_requeue_priority(Scrub::scrub_prio_t with_priority, unsigned int suggested_priority) const
+{
+ return m_scrubber->scrub_requeue_priority(with_priority, suggested_priority);
+}
+
+// ==========================================================================================
+// SCRUB
+
+/*
+ * implementation note:
+ * PG::sched_scrub() is called only once per scrub session.
+ * That call commits us to whatever choices are made (deep/shallow, etc.).
+ * Unless we fail to start scrubbing, the 'planned scrub' flag-set is 'frozen' into
+ * PgScrubber's m_flags, then cleared.
+ */
+bool PG::sched_scrub()
+{
+ dout(15) << __func__ << " pg(" << info.pgid
+ << (is_active() ? ") <active>" : ") <not-active>")
+ << (is_clean() ? " <clean>" : " <not-clean>") << dendl;
+ ceph_assert(ceph_mutex_is_locked(_lock));
+
+ if (!is_primary() || !is_active() || !is_clean()) {
+ return false;
+ }
+
+ if (is_scrub_queued_or_active()) {
+ return false;
+ }
+
+ // analyse the combination of the requested scrub flags, the osd/pool configuration
+ // and the PG status to determine whether we should scrub now, and what type of scrub
+ // should that be.
+ auto updated_flags = verify_scrub_mode();
+ if (!updated_flags) {
+ // the stars do not align for starting a scrub for this PG at this time
+ // (due to configuration or priority issues)
+ // The reason was already reported by the callee.
+ dout(10) << __func__ << ": failed to initiate a scrub" << dendl;
+ return false;
+ }
+
+ // try to reserve the local OSD resources. If failing: no harm. We will
+ // be retried by the OSD later on.
+ if (!m_scrubber->reserve_local()) {
+ dout(10) << __func__ << ": failed to reserve locally" << dendl;
+ return false;
+ }
+
+ // can commit to the updated flags now, as nothing will stop the scrub
+ m_planned_scrub = *updated_flags;
+
+ // An interrupted recovery repair could leave this set.
+ state_clear(PG_STATE_REPAIR);
+
+ // Pass control to the scrubber. It is the scrubber that handles the replicas'
+ // resources reservations.
+ m_scrubber->set_op_parameters(m_planned_scrub);
+
+ dout(10) << __func__ << ": queueing" << dendl;
+ m_scrubber->set_queued_or_active();
+ osd->queue_for_scrub(this, Scrub::scrub_prio_t::low_priority);
+ return true;
+}
+
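+// Next deep-scrub deadline: a positive per-pool deep_scrub_interval option
+// overrides the global osd_deep_scrub_interval; the interval is added to the
+// timestamp of the last deep scrub.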
+double PG::next_deepscrub_interval() const
+{
+ double deep_scrub_interval =
+ pool.info.opts.value_or(pool_opts_t::DEEP_SCRUB_INTERVAL, 0.0);
+ if (deep_scrub_interval <= 0.0)
+ deep_scrub_interval = cct->_conf->osd_deep_scrub_interval;
+ return info.history.last_deep_scrub_stamp + deep_scrub_interval;
+}
+
+bool PG::is_time_for_deep(bool allow_deep_scrub,
+ bool allow_scrub,
+ bool has_deep_errors,
+ const requested_scrub_t& planned) const
+{
+ dout(10) << __func__ << ": need_auto?" << planned.need_auto << " allow_deep_scrub? "
+ << allow_deep_scrub << dendl;
+
+ if (!allow_deep_scrub)
+ return false;
+
+ if (planned.need_auto) {
+ dout(10) << __func__ << ": need repair after scrub errors" << dendl;
+ return true;
+ }
+
+ if (ceph_clock_now() >= next_deepscrub_interval()) {
+ dout(20) << __func__ << ": now (" << ceph_clock_now() << ") >= time for deep ("
+ << next_deepscrub_interval() << ")" << dendl;
+ return true;
+ }
+
+ if (has_deep_errors) {
+ osd->clog->info() << "osd." << osd->whoami << " pg " << info.pgid
+ << " Deep scrub errors, upgrading scrub to deep-scrub";
+ return true;
+ }
+
+ // we only flip the coin if 'allow_scrub' is asserted. Otherwise - as this function
+ // is called often - we would end up deep-scrubbing most of the time.
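+ // (e.g. with osd_deep_scrub_randomize_ratio = 0.15, roughly 15% of eligible
+ // periodic shallow scrubs are upgraded to deep scrubs; illustrative value only)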
+ if (allow_scrub) {
+ bool deep_coin_flip =
+ (rand() % 100) < cct->_conf->osd_deep_scrub_randomize_ratio * 100;
+
+ dout(15) << __func__ << ": time_for_deep=" << planned.time_for_deep
+ << " deep_coin_flip=" << deep_coin_flip << dendl;
+
+ if (deep_coin_flip)
+ return true;
+ }
+
+ return false;
+}
+
+bool PG::verify_periodic_scrub_mode(bool allow_deep_scrub,
+ bool try_to_auto_repair,
+ bool allow_regular_scrub,
+ bool has_deep_errors,
+ requested_scrub_t& planned) const
+
+{
+ ceph_assert(!planned.must_deep_scrub && !planned.must_repair);
+
+ if (!allow_deep_scrub && has_deep_errors) {
+ osd->clog->error()
+ << "osd." << osd->whoami << " pg " << info.pgid
+ << " Regular scrub skipped due to deep-scrub errors and nodeep-scrub set";
+ return false;
+ }
+
+ if (allow_deep_scrub) {
+ // Initial entry and scheduled scrubs without nodeep_scrub set get here
+
+ planned.time_for_deep =
+ is_time_for_deep(allow_deep_scrub, allow_regular_scrub, has_deep_errors, planned);
+
+ if (try_to_auto_repair) {
+ if (planned.time_for_deep) {
+ dout(20) << __func__ << ": auto repair with deep scrubbing" << dendl;
+ planned.auto_repair = true;
+ } else if (allow_regular_scrub) {
+ dout(20) << __func__ << ": auto repair with scrubbing, rescrub if errors found"
+ << dendl;
+ planned.deep_scrub_on_error = true;
+ }
+ }
+ }
+
+ dout(20) << __func__ << " updated flags: " << planned
+ << " allow_regular_scrub: " << allow_regular_scrub << dendl;
+
+ // NOSCRUB so skip regular scrubs
+ if (!allow_regular_scrub && !planned.time_for_deep) {
+ return false;
+ }
+
+ return true;
+}
+
+std::optional<requested_scrub_t> PG::verify_scrub_mode() const
+{
+ dout(10) << __func__ << " processing pg " << info.pgid << dendl;
+
+ bool allow_deep_scrub = !(get_osdmap()->test_flag(CEPH_OSDMAP_NODEEP_SCRUB) ||
+ pool.info.has_flag(pg_pool_t::FLAG_NODEEP_SCRUB));
+ bool allow_regular_scrub = !(get_osdmap()->test_flag(CEPH_OSDMAP_NOSCRUB) ||
+ pool.info.has_flag(pg_pool_t::FLAG_NOSCRUB));
+ bool has_deep_errors = (info.stats.stats.sum.num_deep_scrub_errors > 0);
+ bool try_to_auto_repair =
+ (cct->_conf->osd_scrub_auto_repair && get_pgbackend()->auto_repair_supported());
+
+ auto upd_flags = m_planned_scrub;
+
+ upd_flags.time_for_deep = false;
+ // Clear these in case user issues the scrub/repair command during
+ // the scheduling of the scrub/repair (e.g. request reservation)
+ upd_flags.deep_scrub_on_error = false;
+ upd_flags.auto_repair = false;
+
+ if (upd_flags.must_scrub && !upd_flags.must_deep_scrub && has_deep_errors) {
+ osd->clog->error() << "osd." << osd->whoami << " pg " << info.pgid
+ << " Regular scrub request, deep-scrub details will be lost";
+ }
+
+ if (!upd_flags.must_scrub) {
+ // All periodic scrub handling goes here because must_scrub is
+ // always set for must_deep_scrub and must_repair.
+
+ bool can_start_periodic =
+ verify_periodic_scrub_mode(allow_deep_scrub, try_to_auto_repair,
+ allow_regular_scrub, has_deep_errors, upd_flags);
+ if (!can_start_periodic) {
+ return std::nullopt;
+ }
+ }
+
+ // scrubbing while recovering?
+
+ bool prevented_by_recovery =
+ osd->is_recovery_active() && !cct->_conf->osd_scrub_during_recovery &&
+ (!cct->_conf->osd_repair_during_recovery || !upd_flags.must_repair);
+
+ if (prevented_by_recovery) {
+ dout(20) << __func__ << ": scrubbing prevented during recovery" << dendl;
+ return std::nullopt;
+ }
+
+ upd_flags.need_auto = false;
+ return upd_flags;
+}
+
+void PG::reg_next_scrub()
+{
+ m_scrubber->reg_next_scrub(m_planned_scrub);
+}
+
+void PG::on_info_history_change()
+{
+ dout(20) << __func__ << dendl;
+ if (m_scrubber) {
+ m_scrubber->unreg_next_scrub();
+ m_scrubber->reg_next_scrub(m_planned_scrub);
+ }
+}
+
+void PG::scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type)
+{
+ if (m_scrubber) {
+ m_scrubber->scrub_requested(scrub_level, scrub_type, m_planned_scrub);
+ }
+}
+
+void PG::clear_ready_to_merge() {
+ osd->clear_ready_to_merge(this);
+}
+
+void PG::queue_want_pg_temp(const vector<int> &wanted) {
+ osd->queue_want_pg_temp(get_pgid().pgid, wanted);
+}
+
+void PG::clear_want_pg_temp() {
+ osd->remove_want_pg_temp(get_pgid().pgid);
+}
+
+void PG::on_role_change() {
+ requeue_ops(waiting_for_peered);
+ plpg_on_role_change();
+}
+
+void PG::on_new_interval()
+{
+ dout(20) << __func__ << ": scrub flags on new interval: " << m_planned_scrub
+ << dendl;
+ projected_last_update = eversion_t();
+ cancel_recovery();
+}
+
+epoch_t PG::oldest_stored_osdmap() {
+ return osd->get_superblock().oldest_map;
+}
+
+OstreamTemp PG::get_clog_info() {
+ return osd->clog->info();
+}
+
+OstreamTemp PG::get_clog_debug() {
+ return osd->clog->debug();
+}
+
+OstreamTemp PG::get_clog_error() {
+ return osd->clog->error();
+}
+
+void PG::schedule_event_after(
+ PGPeeringEventRef event,
+ float delay) {
+ std::lock_guard lock(osd->recovery_request_lock);
+ osd->recovery_request_timer.add_event_after(
+ delay,
+ new QueuePeeringEvt(
+ this,
+ std::move(event)));
+}
+
+void PG::request_local_background_io_reservation(
+ unsigned priority,
+ PGPeeringEventURef on_grant,
+ PGPeeringEventURef on_preempt) {
+ osd->local_reserver.request_reservation(
+ pg_id,
+ on_grant ? new QueuePeeringEvt(
+ this, std::move(on_grant)) : nullptr,
+ priority,
+ on_preempt ? new QueuePeeringEvt(
+ this, std::move(on_preempt)) : nullptr);
+}
+
+void PG::update_local_background_io_priority(
+ unsigned priority) {
+ osd->local_reserver.update_priority(
+ pg_id,
+ priority);
+}
+
+void PG::cancel_local_background_io_reservation() {
+ osd->local_reserver.cancel_reservation(
+ pg_id);
+}
+
+void PG::request_remote_recovery_reservation(
+ unsigned priority,
+ PGPeeringEventURef on_grant,
+ PGPeeringEventURef on_preempt) {
+ osd->remote_reserver.request_reservation(
+ pg_id,
+ on_grant ? new QueuePeeringEvt(
+ this, std::move(on_grant)) : nullptr,
+ priority,
+ on_preempt ? new QueuePeeringEvt(
+ this, std::move(on_preempt)) : nullptr);
+}
+
+void PG::cancel_remote_recovery_reservation() {
+ osd->remote_reserver.cancel_reservation(
+ pg_id);
+}
+
+void PG::schedule_event_on_commit(
+ ObjectStore::Transaction &t,
+ PGPeeringEventRef on_commit)
+{
+ t.register_on_commit(new QueuePeeringEvt(this, on_commit));
+}
+
+void PG::on_activate(interval_set<snapid_t> snaps)
+{
+ ceph_assert(!m_scrubber->are_callbacks_pending());
+ ceph_assert(callbacks_for_degraded_object.empty());
+ snap_trimq = snaps;
+ release_pg_backoffs();
+ projected_last_update = info.last_update;
+}
+
+void PG::on_active_exit()
+{
+ backfill_reserving = false;
+ agent_stop();
+}
+
+void PG::on_active_advmap(const OSDMapRef &osdmap)
+{
+ const auto& new_removed_snaps = osdmap->get_new_removed_snaps();
+ auto i = new_removed_snaps.find(get_pgid().pool());
+ if (i != new_removed_snaps.end()) {
+ bool bad = false;
+ for (auto j : i->second) {
+ if (snap_trimq.intersects(j.first, j.second)) {
+ decltype(snap_trimq) added, overlap;
+ added.insert(j.first, j.second);
+ overlap.intersection_of(snap_trimq, added);
+ derr << __func__ << " removed_snaps already contains "
+ << overlap << dendl;
+ bad = true;
+ snap_trimq.union_of(added);
+ } else {
+ snap_trimq.insert(j.first, j.second);
+ }
+ }
+ dout(10) << __func__ << " new removed_snaps " << i->second
+ << ", snap_trimq now " << snap_trimq << dendl;
+ ceph_assert(!bad || !cct->_conf->osd_debug_verify_cached_snaps);
+ }
+
+ const auto& new_purged_snaps = osdmap->get_new_purged_snaps();
+ auto j = new_purged_snaps.find(get_pgid().pgid.pool());
+ if (j != new_purged_snaps.end()) {
+ bool bad = false;
+ for (auto k : j->second) {
+ if (!recovery_state.get_info().purged_snaps.contains(k.first, k.second)) {
+ interval_set<snapid_t> rm, overlap;
+ rm.insert(k.first, k.second);
+ overlap.intersection_of(recovery_state.get_info().purged_snaps, rm);
+ derr << __func__ << " purged_snaps does not contain "
+ << rm << ", only " << overlap << dendl;
+ recovery_state.adjust_purged_snaps(
+ [&overlap](auto &purged_snaps) {
+ purged_snaps.subtract(overlap);
+ });
+ // This can currently happen in the normal (if unlikely) course of
+ // events. Because adding snaps to purged_snaps does not increase
+ // the pg version or add a pg log entry, we don't reliably propagate
+ // purged_snaps additions to other OSDs.
+ // One example:
+ // - purge S
+ // - primary and replicas update purged_snaps
+ // - no object updates
+ // - pg mapping changes, new primary on different node
+ // - new primary pg version == eversion_t(), so info is not
+ // propagated.
+ //bad = true;
+ } else {
+ recovery_state.adjust_purged_snaps(
+ [&k](auto &purged_snaps) {
+ purged_snaps.erase(k.first, k.second);
+ });
+ }
+ }
+ dout(10) << __func__ << " new purged_snaps " << j->second
+ << ", now " << recovery_state.get_info().purged_snaps << dendl;
+ ceph_assert(!bad || !cct->_conf->osd_debug_verify_cached_snaps);
+ }
+}
+
+void PG::queue_snap_retrim(snapid_t snap)
+{
+ if (!is_active() ||
+ !is_primary()) {
+ dout(10) << __func__ << " snap " << snap << " - not active and primary"
+ << dendl;
+ return;
+ }
+ if (!snap_trimq.contains(snap)) {
+ snap_trimq.insert(snap);
+ snap_trimq_repeat.insert(snap);
+ dout(20) << __func__ << " snap " << snap
+ << ", trimq now " << snap_trimq
+ << ", repeat " << snap_trimq_repeat << dendl;
+ kick_snap_trim();
+ } else {
+ dout(20) << __func__ << " snap " << snap
+ << " already in trimq " << snap_trimq << dendl;
+ }
+}
+
+void PG::on_active_actmap()
+{
+ if (cct->_conf->osd_check_for_log_corruption)
+ check_log_for_corruption(osd->store);
+
+
+ if (recovery_state.is_active()) {
+ dout(10) << "Active: kicking snap trim" << dendl;
+ kick_snap_trim();
+ }
+
+ if (recovery_state.is_peered() &&
+ !recovery_state.is_clean() &&
+ !recovery_state.get_osdmap()->test_flag(CEPH_OSDMAP_NOBACKFILL) &&
+ (!recovery_state.get_osdmap()->test_flag(CEPH_OSDMAP_NOREBALANCE) ||
+ recovery_state.is_degraded())) {
+ queue_recovery();
+ }
+}
+
+void PG::on_backfill_reserved()
+{
+ backfill_reserving = false;
+ queue_recovery();
+}
+
+void PG::on_backfill_canceled()
+{
+ if (!waiting_on_backfill.empty()) {
+ waiting_on_backfill.clear();
+ finish_recovery_op(hobject_t::get_max());
+ }
+}
+
+void PG::on_recovery_reserved()
+{
+ queue_recovery();
+}
+
+void PG::set_not_ready_to_merge_target(pg_t pgid, pg_t src)
+{
+ osd->set_not_ready_to_merge_target(pgid, src);
+}
+
+void PG::set_not_ready_to_merge_source(pg_t pgid)
+{
+ osd->set_not_ready_to_merge_source(pgid);
+}
+
+void PG::set_ready_to_merge_target(eversion_t lu, epoch_t les, epoch_t lec)
+{
+ osd->set_ready_to_merge_target(this, lu, les, lec);
+}
+
+void PG::set_ready_to_merge_source(eversion_t lu)
+{
+ osd->set_ready_to_merge_source(this, lu);
+}
+
+void PG::send_pg_created(pg_t pgid)
+{
+ osd->send_pg_created(pgid);
+}
+
+ceph::signedspan PG::get_mnow()
+{
+ return osd->get_mnow();
+}
+
+HeartbeatStampsRef PG::get_hb_stamps(int peer)
+{
+ return osd->get_hb_stamps(peer);
+}
+
+void PG::schedule_renew_lease(epoch_t lpr, ceph::timespan delay)
+{
+ auto spgid = info.pgid;
+ auto o = osd;
+ osd->mono_timer.add_event(
+ delay,
+ [o, lpr, spgid]() {
+ o->queue_renew_lease(lpr, spgid);
+ });
+}
+
+void PG::queue_check_readable(epoch_t lpr, ceph::timespan delay)
+{
+ osd->queue_check_readable(info.pgid, lpr, delay);
+}
+
+void PG::rebuild_missing_set_with_deletes(PGLog &pglog)
+{
+ pglog.rebuild_missing_set_with_deletes(
+ osd->store,
+ ch,
+ recovery_state.get_info());
+}
+
+void PG::on_activate_committed()
+{
+ if (!is_primary()) {
+ // waiters
+ if (recovery_state.needs_flush() == 0) {
+ requeue_ops(waiting_for_peered);
+ } else if (!waiting_for_peered.empty()) {
+ dout(10) << __func__ << " flushes in progress, moving "
+ << waiting_for_peered.size() << " items to waiting_for_flush"
+ << dendl;
+ ceph_assert(waiting_for_flush.empty());
+ waiting_for_flush.swap(waiting_for_peered);
+ }
+ }
+}
+
+// Compute pending backfill data
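+// (e.g. if the primary reports 100 MiB for this PG and 30 MiB already exist
+// locally, roughly 70 MiB of backfill data is still pending; numbers are
+// illustrative only)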
+static int64_t pending_backfill(CephContext *cct, int64_t bf_bytes, int64_t local_bytes)
+{
+ lgeneric_dout(cct, 20) << __func__ << " Adjust local usage "
+ << (local_bytes >> 10) << "KiB"
+ << " primary usage " << (bf_bytes >> 10)
+ << "KiB" << dendl;
+
+ return std::max((int64_t)0, bf_bytes - local_bytes);
+}
+
+
+// We can zero the value of primary_num_bytes with just an atomic store.
+// However, setting it above zero reserves space for backfill and requires
+// OSDService::stat_lock, which protects all OSD usage stats.
+bool PG::try_reserve_recovery_space(
+ int64_t primary_bytes, int64_t local_bytes) {
+ // Use tentative_backfill_full() to make sure enough
+ // space is available to handle the target bytes from the primary.
+
+ // TODO: If we passed num_objects from primary we could account for
+ // an estimate of the metadata overhead.
+
+ // TODO: If we had compressed_allocated and compressed_original from primary
+ // we could compute compression ratio and adjust accordingly.
+
+ // XXX: There is no way to get omap overhead, and it would only apply
+ // to the (possibly different) partition that is storing the database.
+
+ // update_osd_stat() from heartbeat will do this on a new
+ // statfs using ps->primary_bytes.
+ uint64_t pending_adjustment = 0;
+ if (primary_bytes) {
+ // For erasure-coded pools, overestimate by a full stripe per object
+ // because we don't know how each object rounds to the nearest stripe boundary
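+ // (e.g. with k=4 data chunks and a 64 KiB stripe chunk size, a shard's share is
+ // bytes/4 plus 64 KiB per object; chunk count and size are illustrative only)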
+ if (pool.info.is_erasure()) {
+ primary_bytes /= (int)get_pgbackend()->get_ec_data_chunk_count();
+ primary_bytes += get_pgbackend()->get_ec_stripe_chunk_size() *
+ info.stats.stats.sum.num_objects;
+ local_bytes /= (int)get_pgbackend()->get_ec_data_chunk_count();
+ local_bytes += get_pgbackend()->get_ec_stripe_chunk_size() *
+ info.stats.stats.sum.num_objects;
+ }
+ pending_adjustment = pending_backfill(
+ cct,
+ primary_bytes,
+ local_bytes);
+ dout(10) << __func__ << " primary_bytes " << (primary_bytes >> 10)
+ << "KiB"
+ << " local " << (local_bytes >> 10) << "KiB"
+ << " pending_adjustments " << (pending_adjustment >> 10) << "KiB"
+ << dendl;
+ }
+
+ // This lock protects not only the OSDService stats but also the setting of
+ // the PG's primary_bytes. That's why we don't unlock immediately.
+ std::lock_guard l{osd->stat_lock};
+ osd_stat_t cur_stat = osd->osd_stat;
+ if (cct->_conf->osd_debug_reject_backfill_probability > 0 &&
+ (rand()%1000 < (cct->_conf->osd_debug_reject_backfill_probability*1000.0))) {
+ dout(10) << "backfill reservation rejected: failure injection"
+ << dendl;
+ return false;
+ } else if (!cct->_conf->osd_debug_skip_full_check_in_backfill_reservation &&
+ osd->tentative_backfill_full(this, pending_adjustment, cur_stat)) {
+ dout(10) << "backfill reservation rejected: backfill full"
+ << dendl;
+ return false;
+ } else {
+ // Don't reserve space if the reservation check was skipped; this is used
+ // to test the other backfill-full check AND in case a corruption
+ // of num_bytes requires ignoring that value and trying the
+ // backfill anyway.
+ if (primary_bytes &&
+ !cct->_conf->osd_debug_skip_full_check_in_backfill_reservation) {
+ primary_num_bytes.store(primary_bytes);
+ local_num_bytes.store(local_bytes);
+ } else {
+ unreserve_recovery_space();
+ }
+ return true;
+ }
+}
+
+void PG::unreserve_recovery_space() {
+ primary_num_bytes.store(0);
+ local_num_bytes.store(0);
+}
+
+void PG::_scan_rollback_obs(const vector<ghobject_t> &rollback_obs)
+{
+ ObjectStore::Transaction t;
+ eversion_t trimmed_to = recovery_state.get_last_rollback_info_trimmed_to_applied();
+ for (vector<ghobject_t>::const_iterator i = rollback_obs.begin();
+ i != rollback_obs.end();
+ ++i) {
+ if (i->generation < trimmed_to.version) {
+ dout(10) << __func__ << "osd." << osd->whoami
+ << " pg " << info.pgid
+ << " found obsolete rollback obj "
+ << *i << " generation < trimmed_to "
+ << trimmed_to
+ << "...repaired" << dendl;
+ t.remove(coll, *i);
+ }
+ }
+ if (!t.empty()) {
+ derr << __func__ << ": queueing trans to clean up obsolete rollback objs"
+ << dendl;
+ osd->store->queue_transaction(ch, std::move(t), NULL);
+ }
+}
+
+
+void PG::_repair_oinfo_oid(ScrubMap &smap)
+{
+ for (map<hobject_t, ScrubMap::object>::reverse_iterator i = smap.objects.rbegin();
+ i != smap.objects.rend();
+ ++i) {
+ const hobject_t &hoid = i->first;
+ ScrubMap::object &o = i->second;
+
+ bufferlist bl;
+ if (o.attrs.find(OI_ATTR) == o.attrs.end()) {
+ continue;
+ }
+ bl.push_back(o.attrs[OI_ATTR]);
+ object_info_t oi;
+ try {
+ oi.decode(bl);
+ } catch(...) {
+ continue;
+ }
+ if (oi.soid != hoid) {
+ ObjectStore::Transaction t;
+ OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
+ osd->clog->error() << "osd." << osd->whoami
+ << " found object info error on pg "
+ << info.pgid
+ << " oid " << hoid << " oid in object info: "
+ << oi.soid
+ << "...repaired";
+ // Fix object info
+ oi.soid = hoid;
+ bl.clear();
+ encode(oi, bl, get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
+
+ bufferptr bp(bl.c_str(), bl.length());
+ o.attrs[OI_ATTR] = bp;
+
+ t.setattr(coll, ghobject_t(hoid), OI_ATTR, bl);
+ int r = osd->store->queue_transaction(ch, std::move(t));
+ if (r != 0) {
+ derr << __func__ << ": queue_transaction got " << cpp_strerror(r)
+ << dendl;
+ }
+ }
+ }
+}
+
+void PG::repair_object(
+ const hobject_t &soid,
+ const list<pair<ScrubMap::object, pg_shard_t> > &ok_peers,
+ const set<pg_shard_t> &bad_peers)
+{
+ set<pg_shard_t> ok_shards;
+ for (auto &&peer: ok_peers) ok_shards.insert(peer.second);
+
+ dout(10) << "repair_object " << soid
+ << " bad_peers osd.{" << bad_peers << "},"
+ << " ok_peers osd.{" << ok_shards << "}" << dendl;
+
+ const ScrubMap::object &po = ok_peers.back().first;
+ eversion_t v;
+ object_info_t oi;
+ try {
+ bufferlist bv;
+ if (po.attrs.count(OI_ATTR)) {
+ bv.push_back(po.attrs.find(OI_ATTR)->second);
+ }
+ auto bliter = bv.cbegin();
+ decode(oi, bliter);
+ } catch (...) {
+ dout(0) << __func__ << ": Need version of replica, bad object_info_t: "
+ << soid << dendl;
+ ceph_abort();
+ }
+
+ if (bad_peers.count(get_primary())) {
+ // We should only be scrubbing if the PG is clean.
+ ceph_assert(waiting_for_unreadable_object.empty());
+ dout(10) << __func__ << ": primary = " << get_primary() << dendl;
+ }
+
+ /* No need to pass ok_peers, they must not be missing the object, so
+ * force_object_missing will add them to missing_loc anyway */
+ recovery_state.force_object_missing(bad_peers, soid, oi.version);
+}
+
+void PG::forward_scrub_event(ScrubAPI fn, epoch_t epoch_queued, std::string_view desc)
+{
+ dout(20) << __func__ << ": " << desc << " queued at: " << epoch_queued << dendl;
+ ceph_assert(m_scrubber);
+ if (is_active()) {
+ ((*m_scrubber).*fn)(epoch_queued);
+ } else {
+ // pg might be in the process of being deleted
+ dout(5) << __func__ << " refusing to forward. " << (is_clean() ? "(clean) " : "(not clean) ") <<
+ (is_active() ? "(active) " : "(not active) ") << dendl;
+ }
+}
+
+void PG::forward_scrub_event(ScrubSafeAPI fn,
+ epoch_t epoch_queued,
+ Scrub::act_token_t act_token,
+ std::string_view desc)
+{
+ dout(20) << __func__ << ": " << desc << " queued: " << epoch_queued
+ << " token: " << act_token << dendl;
+ ceph_assert(m_scrubber);
+ if (is_active()) {
+ ((*m_scrubber).*fn)(epoch_queued, act_token);
+ } else {
+ // pg might be in the process of being deleted
+ dout(5) << __func__ << " refusing to forward. "
+ << (is_clean() ? "(clean) " : "(not clean) ")
+ << (is_active() ? "(active) " : "(not active) ") << dendl;
+ }
+}
+
+void PG::replica_scrub(OpRequestRef op, ThreadPool::TPHandle& handle)
+{
+ dout(10) << __func__ << " (op)" << dendl;
+ ceph_assert(m_scrubber);
+ m_scrubber->replica_scrub_op(op);
+}
+
+void PG::replica_scrub(epoch_t epoch_queued,
+ Scrub::act_token_t act_token,
+ [[maybe_unused]] ThreadPool::TPHandle& handle)
+{
+ dout(10) << __func__ << " queued at: " << epoch_queued
+ << (is_primary() ? " (primary)" : " (replica)") << dendl;
+ forward_scrub_event(&ScrubPgIF::send_start_replica, epoch_queued, act_token,
+ "StartReplica/nw");
+}
+
+bool PG::ops_blocked_by_scrub() const
+{
+ return !waiting_for_scrub.empty();
+}
+
+Scrub::scrub_prio_t PG::is_scrub_blocking_ops() const
+{
+ return waiting_for_scrub.empty() ? Scrub::scrub_prio_t::low_priority
+ : Scrub::scrub_prio_t::high_priority;
+}
+
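+// A peering message is considered stale if it was sent before our most
+// recent peering reset.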
+bool PG::old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch)
+{
+ if (auto last_reset = get_last_peering_reset();
+ last_reset > reply_epoch || last_reset > query_epoch) {
+ dout(10) << "old_peering_msg reply_epoch " << reply_epoch << " query_epoch "
+ << query_epoch << " last_peering_reset " << last_reset << dendl;
+ return true;
+ }
+ return false;
+}
+
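+// FlushState holds a PG reference until both the on-applied and on-commit
+// callbacks registered by start_flush_on_transaction() have been released;
+// its destructor then completes the flush, unless the PG was reset since.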
+struct FlushState {
+ PGRef pg;
+ epoch_t epoch;
+ FlushState(PG *pg, epoch_t epoch) : pg(pg), epoch(epoch) {}
+ ~FlushState() {
+ std::scoped_lock l{*pg};
+ if (!pg->pg_has_reset_since(epoch)) {
+ pg->recovery_state.complete_flush();
+ }
+ }
+};
+typedef std::shared_ptr<FlushState> FlushStateRef;
+
+void PG::start_flush_on_transaction(ObjectStore::Transaction &t)
+{
+ // flush in progress ops
+ FlushStateRef flush_trigger (std::make_shared<FlushState>(
+ this, get_osdmap_epoch()));
+ t.register_on_applied(new ContainerContext<FlushStateRef>(flush_trigger));
+ t.register_on_commit(new ContainerContext<FlushStateRef>(flush_trigger));
+}
+
+bool PG::try_flush_or_schedule_async()
+{
+ Context *c = new QueuePeeringEvt(
+ this, get_osdmap_epoch(), PeeringState::IntervalFlush());
+ if (!ch->flush_commit(c)) {
+ return false;
+ } else {
+ delete c;
+ return true;
+ }
+}
+
+ostream& operator<<(ostream& out, const PG& pg)
+{
+ out << pg.recovery_state;
+
+ // listing all scrub-related flags - both current and "planned next scrub"
+ if (pg.is_scrubbing()) {
+ out << *pg.m_scrubber;
+ }
+ out << pg.m_planned_scrub;
+
+ if (pg.recovery_ops_active)
+ out << " rops=" << pg.recovery_ops_active;
+
+ //out << " (" << pg.pg_log.get_tail() << "," << pg.pg_log.get_head() << "]";
+ if (pg.recovery_state.have_missing()) {
+ out << " m=" << pg.recovery_state.get_num_missing();
+ if (pg.is_primary()) {
+ uint64_t unfound = pg.recovery_state.get_num_unfound();
+ if (unfound)
+ out << " u=" << unfound;
+ }
+ }
+ if (!pg.is_clean()) {
+ out << " mbc=" << pg.recovery_state.get_missing_by_count();
+ }
+ if (!pg.snap_trimq.empty()) {
+ out << " trimq=";
+ // only show a count if the set is large
+ if (pg.snap_trimq.num_intervals() > 16) {
+ out << pg.snap_trimq.size();
+ if (!pg.snap_trimq_repeat.empty()) {
+ out << "(" << pg.snap_trimq_repeat.size() << ")";
+ }
+ } else {
+ out << pg.snap_trimq;
+ if (!pg.snap_trimq_repeat.empty()) {
+ out << "(" << pg.snap_trimq_repeat << ")";
+ }
+ }
+ }
+ if (!pg.recovery_state.get_info().purged_snaps.empty()) {
+ out << " ps="; // snap trim queue / purged snaps
+ if (pg.recovery_state.get_info().purged_snaps.num_intervals() > 16) {
+ out << pg.recovery_state.get_info().purged_snaps.size();
+ } else {
+ out << pg.recovery_state.get_info().purged_snaps;
+ }
+ }
+
+ out << "]";
+ return out;
+}
+
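+// Decide whether a client op can be dropped without processing: the client
+// disconnected, the op was addressed to an old primary, it is a stale
+// balanced/localized read, or it predates a forced-resend or split epoch
+// (in which case the client will resend it).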
+bool PG::can_discard_op(OpRequestRef& op)
+{
+ auto m = op->get_req<MOSDOp>();
+ if (cct->_conf->osd_discard_disconnected_ops && OSD::op_is_discardable(m)) {
+ dout(20) << " discard " << *m << dendl;
+ return true;
+ }
+
+ if (m->get_map_epoch() < info.history.same_primary_since) {
+ dout(7) << " changed after " << m->get_map_epoch()
+ << ", dropping " << *m << dendl;
+ return true;
+ }
+
+ if ((m->get_flags() & (CEPH_OSD_FLAG_BALANCE_READS |
+ CEPH_OSD_FLAG_LOCALIZE_READS)) &&
+ !is_primary() &&
+ m->get_map_epoch() < info.history.same_interval_since) {
+ // Note: the Objecter will resend on interval change without the primary
+ // changing only if it actually sent to a replica. If we are the primary
+ // and the primary has not changed since the send epoch, the client sent
+ // the op to us and will not have resent it, even if the interval did
+ // change.
+ return true;
+ }
+
+
+ if (m->get_connection()->has_feature(CEPH_FEATURE_RESEND_ON_SPLIT)) {
+ // >= luminous client
+ if (m->get_connection()->has_feature(CEPH_FEATURE_SERVER_NAUTILUS)) {
+ // >= nautilus client
+ if (m->get_map_epoch() < pool.info.get_last_force_op_resend()) {
+ dout(7) << __func__ << " sent before last_force_op_resend "
+ << pool.info.last_force_op_resend
+ << ", dropping" << *m << dendl;
+ return true;
+ }
+ } else {
+ // < nautilus client (luminous or mimic)
+ if (m->get_map_epoch() < pool.info.get_last_force_op_resend_prenautilus()) {
+ dout(7) << __func__ << " sent before last_force_op_resend_prenautilus "
+ << pool.info.last_force_op_resend_prenautilus
+ << ", dropping" << *m << dendl;
+ return true;
+ }
+ }
+ if (m->get_map_epoch() < info.history.last_epoch_split) {
+ dout(7) << __func__ << " pg split in "
+ << info.history.last_epoch_split << ", dropping" << dendl;
+ return true;
+ }
+ } else if (m->get_connection()->has_feature(CEPH_FEATURE_OSD_POOLRESEND)) {
+ // < luminous client
+ if (m->get_map_epoch() < pool.info.get_last_force_op_resend_preluminous()) {
+ dout(7) << __func__ << " sent before last_force_op_resend_preluminous "
+ << pool.info.last_force_op_resend_preluminous
+ << ", dropping" << *m << dendl;
+ return true;
+ }
+ }
+
+ return false;
+}
+
+template<typename T, int MSGTYPE>
+bool PG::can_discard_replica_op(OpRequestRef& op)
+{
+ auto m = op->get_req<T>();
+ ceph_assert(m->get_type() == MSGTYPE);
+
+ int from = m->get_source().num();
+
+ // If a repop reply arrives after the replica has gone down in a new osdmap,
+ // but before this pg has advanced to that osdmap, the reply should be
+ // discarded: the primary resets the connection to the replica when handling
+ // the new osdmap that marks it down, and also resets the messenger session
+ // when the replica reconnects. To avoid out-of-order replies, messages from
+ // that replica should be discarded.
+ OSDMapRef next_map = osd->get_next_osdmap();
+ if (next_map->is_down(from)) {
+ dout(20) << " " << __func__ << " dead for nextmap is down " << from << dendl;
+ return true;
+ }
+ /* Mostly, this overlaps with the old_peering_msg
+ * condition. An important exception is pushes
+ * sent by replicas not in the acting set, since
+ * if such a replica goes down it does not cause
+ * a new interval. */
+ if (next_map->get_down_at(from) >= m->map_epoch) {
+ dout(20) << " " << __func__ << " dead for 'get_down_at' " << from << dendl;
+ return true;
+ }
+
+ // same pg?
+ // if pg changes _at all_, we reset and repeer!
+ if (old_peering_msg(m->map_epoch, m->map_epoch)) {
+ dout(10) << "can_discard_replica_op pg changed " << info.history
+ << " after " << m->map_epoch
+ << ", dropping" << dendl;
+ return true;
+ }
+ return false;
+}
+
+bool PG::can_discard_scan(OpRequestRef op)
+{
+ auto m = op->get_req<MOSDPGScan>();
+ ceph_assert(m->get_type() == MSG_OSD_PG_SCAN);
+
+ if (old_peering_msg(m->map_epoch, m->query_epoch)) {
+ dout(10) << " got old scan, ignoring" << dendl;
+ return true;
+ }
+ return false;
+}
+
+bool PG::can_discard_backfill(OpRequestRef op)
+{
+ auto m = op->get_req<MOSDPGBackfill>();
+ ceph_assert(m->get_type() == MSG_OSD_PG_BACKFILL);
+
+ if (old_peering_msg(m->map_epoch, m->query_epoch)) {
+ dout(10) << " got old backfill, ignoring" << dendl;
+ return true;
+ }
+
+ return false;
+ }
+
+bool PG::can_discard_request(OpRequestRef& op)
+{
+ switch (op->get_req()->get_type()) {
+ case CEPH_MSG_OSD_OP:
+ return can_discard_op(op);
+ case CEPH_MSG_OSD_BACKOFF:
+ return false; // never discard
+ case MSG_OSD_REPOP:
+ return can_discard_replica_op<MOSDRepOp, MSG_OSD_REPOP>(op);
+ case MSG_OSD_PG_PUSH:
+ return can_discard_replica_op<MOSDPGPush, MSG_OSD_PG_PUSH>(op);
+ case MSG_OSD_PG_PULL:
+ return can_discard_replica_op<MOSDPGPull, MSG_OSD_PG_PULL>(op);
+ case MSG_OSD_PG_PUSH_REPLY:
+ return can_discard_replica_op<MOSDPGPushReply, MSG_OSD_PG_PUSH_REPLY>(op);
+ case MSG_OSD_REPOPREPLY:
+ return can_discard_replica_op<MOSDRepOpReply, MSG_OSD_REPOPREPLY>(op);
+ case MSG_OSD_PG_RECOVERY_DELETE:
+ return can_discard_replica_op<MOSDPGRecoveryDelete, MSG_OSD_PG_RECOVERY_DELETE>(op);
+
+ case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
+ return can_discard_replica_op<MOSDPGRecoveryDeleteReply, MSG_OSD_PG_RECOVERY_DELETE_REPLY>(op);
+
+ case MSG_OSD_EC_WRITE:
+ return can_discard_replica_op<MOSDECSubOpWrite, MSG_OSD_EC_WRITE>(op);
+ case MSG_OSD_EC_WRITE_REPLY:
+ return can_discard_replica_op<MOSDECSubOpWriteReply, MSG_OSD_EC_WRITE_REPLY>(op);
+ case MSG_OSD_EC_READ:
+ return can_discard_replica_op<MOSDECSubOpRead, MSG_OSD_EC_READ>(op);
+ case MSG_OSD_EC_READ_REPLY:
+ return can_discard_replica_op<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op);
+ case MSG_OSD_REP_SCRUB:
+ return can_discard_replica_op<MOSDRepScrub, MSG_OSD_REP_SCRUB>(op);
+ case MSG_OSD_SCRUB_RESERVE:
+ return can_discard_replica_op<MOSDScrubReserve, MSG_OSD_SCRUB_RESERVE>(op);
+ case MSG_OSD_REP_SCRUBMAP:
+ return can_discard_replica_op<MOSDRepScrubMap, MSG_OSD_REP_SCRUBMAP>(op);
+ case MSG_OSD_PG_UPDATE_LOG_MISSING:
+ return can_discard_replica_op<
+ MOSDPGUpdateLogMissing, MSG_OSD_PG_UPDATE_LOG_MISSING>(op);
+ case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
+ return can_discard_replica_op<
+ MOSDPGUpdateLogMissingReply, MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY>(op);
+
+ case MSG_OSD_PG_SCAN:
+ return can_discard_scan(op);
+ case MSG_OSD_PG_BACKFILL:
+ return can_discard_backfill(op);
+ case MSG_OSD_PG_BACKFILL_REMOVE:
+ return can_discard_replica_op<MOSDPGBackfillRemove,
+ MSG_OSD_PG_BACKFILL_REMOVE>(op);
+ }
+ return true;
+}
+
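+// Feed a peering event into the recovery state machine, ignoring events
+// that predate the last peering reset.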
+void PG::do_peering_event(PGPeeringEventRef evt, PeeringCtx &rctx)
+{
+ dout(10) << __func__ << ": " << evt->get_desc() << dendl;
+ ceph_assert(have_same_or_newer_map(evt->get_epoch_sent()));
+ if (old_peering_evt(evt)) {
+ dout(10) << "discard old " << evt->get_desc() << dendl;
+ } else {
+ recovery_state.handle_event(evt, &rctx);
+ }
+ // write_if_dirty regardless of path above to ensure we capture any work
+ // done by OSD::advance_pg().
+ write_if_dirty(rctx.transaction);
+}
+
+void PG::queue_peering_event(PGPeeringEventRef evt)
+{
+ if (old_peering_evt(evt))
+ return;
+ osd->osd->enqueue_peering_evt(info.pgid, evt);
+}
+
+void PG::queue_null(epoch_t msg_epoch,
+ epoch_t query_epoch)
+{
+ dout(10) << "null" << dendl;
+ queue_peering_event(
+ PGPeeringEventRef(std::make_shared<PGPeeringEvent>(msg_epoch, query_epoch,
+ NullEvt())));
+}
+
+void PG::find_unfound(epoch_t queued, PeeringCtx &rctx)
+{
+ /*
+ * if we couldn't start any recovery ops and things are still
+ * unfound, see if we can discover more missing object locations.
+ * It may be that our initial locations were bad and we errored
+ * out while trying to pull.
+ */
+ if (!recovery_state.discover_all_missing(rctx)) {
+ string action;
+ if (state_test(PG_STATE_BACKFILLING)) {
+ auto evt = PGPeeringEventRef(
+ new PGPeeringEvent(
+ queued,
+ queued,
+ PeeringState::UnfoundBackfill()));
+ queue_peering_event(evt);
+ action = "in backfill";
+ } else if (state_test(PG_STATE_RECOVERING)) {
+ auto evt = PGPeeringEventRef(
+ new PGPeeringEvent(
+ queued,
+ queued,
+ PeeringState::UnfoundRecovery()));
+ queue_peering_event(evt);
+ action = "in recovery";
+ } else {
+ action = "already out of recovery/backfill";
+ }
+ dout(10) << __func__ << ": no luck, giving up on this pg for now (" << action << ")" << dendl;
+ } else {
+ dout(10) << __func__ << ": no luck, giving up on this pg for now (queue_recovery)" << dendl;
+ queue_recovery();
+ }
+}
+
+void PG::handle_advance_map(
+ OSDMapRef osdmap, OSDMapRef lastmap,
+ vector<int>& newup, int up_primary,
+ vector<int>& newacting, int acting_primary,
+ PeeringCtx &rctx)
+{
+ dout(10) << __func__ << ": " << osdmap->get_epoch() << dendl;
+ osd_shard->update_pg_epoch(pg_slot, osdmap->get_epoch());
+ recovery_state.advance_map(
+ osdmap,
+ lastmap,
+ newup,
+ up_primary,
+ newacting,
+ acting_primary,
+ rctx);
+}
+
+void PG::handle_activate_map(PeeringCtx &rctx)
+{
+ dout(10) << __func__ << ": " << get_osdmap()->get_epoch()
+ << dendl;
+ recovery_state.activate_map(rctx);
+
+ requeue_map_waiters();
+}
+
+void PG::handle_initialize(PeeringCtx &rctx)
+{
+ dout(10) << __func__ << dendl;
+ PeeringState::Initialize evt;
+ recovery_state.handle_event(evt, &rctx);
+}
+
+
+void PG::handle_query_state(Formatter *f)
+{
+ dout(10) << "handle_query_state" << dendl;
+ PeeringState::QueryState q(f);
+ recovery_state.handle_event(q, 0);
+
+ // Scrub status is reported after the recovery_state output, since
+ // scrubbing is not part of the recovery state machine.
+ if (is_primary() && is_active() && m_scrubber && m_scrubber->is_scrub_active()) {
+ m_scrubber->handle_query_state(f);
+ }
+}
+
+void PG::init_collection_pool_opts()
+{
+ auto r = osd->store->set_collection_opts(ch, pool.info.opts);
+ if (r < 0 && r != -EOPNOTSUPP) {
+ derr << __func__ << " set_collection_opts returned error: " << r << dendl;
+ }
+}
+
+void PG::on_pool_change()
+{
+ init_collection_pool_opts();
+ plpg_on_pool_change();
+}
+
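+// Completion callback used during PG deletion: re-queue the next chunk of
+// delete work, unless the PG has been reset since this chunk was submitted.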
+void PG::C_DeleteMore::complete(int r) {
+ ceph_assert(r == 0);
+ pg->lock();
+ if (!pg->pg_has_reset_since(epoch)) {
+ pg->osd->queue_for_pg_delete(pg->get_pgid(), epoch);
+ }
+ pg->unlock();
+ delete this;
+}
+
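+// Delete the next batch of this PG's objects. Returns the object to resume
+// from and whether deletion is still in progress. If osd_delete_sleep is
+// set, the work is re-queued via a timer instead of running immediately.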
+std::pair<ghobject_t, bool> PG::do_delete_work(
+ ObjectStore::Transaction &t,
+ ghobject_t _next)
+{
+ dout(10) << __func__ << dendl;
+
+ {
+ float osd_delete_sleep = osd->osd->get_osd_delete_sleep();
+ if (osd_delete_sleep > 0 && delete_needs_sleep) {
+ epoch_t e = get_osdmap()->get_epoch();
+ PGRef pgref(this);
+ auto delete_requeue_callback = new LambdaContext([this, pgref, e](int r) {
+ dout(20) << "do_delete_work() [cb] wake up at "
+ << ceph_clock_now()
+ << ", re-queuing delete" << dendl;
+ std::scoped_lock locker{*this};
+ delete_needs_sleep = false;
+ if (!pg_has_reset_since(e)) {
+ osd->queue_for_pg_delete(get_pgid(), e);
+ }
+ });
+
+ auto delete_schedule_time = ceph::real_clock::now();
+ delete_schedule_time += ceph::make_timespan(osd_delete_sleep);
+ std::lock_guard l{osd->sleep_lock};
+ osd->sleep_timer.add_event_at(delete_schedule_time,
+ delete_requeue_callback);
+ dout(20) << __func__ << " Delete scheduled at " << delete_schedule_time << dendl;
+ return std::make_pair(_next, true);
+ }
+ }
+
+ delete_needs_sleep = true;
+
+ ghobject_t next;
+
+ vector<ghobject_t> olist;
+ int max = std::min(osd->store->get_ideal_list_max(),
+ (int)cct->_conf->osd_target_transaction_size);
+
+ osd->store->collection_list(
+ ch,
+ _next,
+ ghobject_t::get_max(),
+ max,
+ &olist,
+ &next);
+ dout(20) << __func__ << " " << olist << dendl;
+
+ // make sure we've removed everything
+ // by one more listing from the beginning
+ if (_next != ghobject_t() && olist.empty()) {
+ next = ghobject_t();
+ osd->store->collection_list(
+ ch,
+ next,
+ ghobject_t::get_max(),
+ max,
+ &olist,
+ &next);
+ if (!olist.empty()) {
+ for (auto& oid : olist) {
+ if (oid == pgmeta_oid) {
+ dout(20) << __func__ << " removing pgmeta object " << oid << dendl;
+ } else {
+ dout(0) << __func__ << " additional unexpected onode"
+ <<" new onode has appeared since PG removal started"
+ << oid << dendl;
+ }
+ }
+ }
+ }
+
+ OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
+ int64_t num = 0;
+ for (auto& oid : olist) {
+ if (oid == pgmeta_oid) {
+ continue;
+ }
+ if (oid.is_pgmeta()) {
+ osd->clog->warn() << info.pgid << " found stray pgmeta-like " << oid
+ << " during PG removal";
+ }
+ int r = snap_mapper.remove_oid(oid.hobj, &_t);
+ if (r != 0 && r != -ENOENT) {
+ ceph_abort();
+ }
+ t.remove(coll, oid);
+ ++num;
+ }
+ bool running = true;
+ if (num) {
+ dout(20) << __func__ << " deleting " << num << " objects" << dendl;
+ Context *fin = new C_DeleteMore(this, get_osdmap_epoch());
+ t.register_on_commit(fin);
+ } else {
+ if (cct->_conf->osd_inject_failure_on_pg_removal) {
+ _exit(1);
+ }
+
+ // final flush here to ensure completions drop refs. Of particular concern
+ // are the SnapMapper ContainerContexts.
+ {
+ PGRef pgref(this);
+ PGLog::clear_info_log(info.pgid, &t);
+ t.remove_collection(coll);
+ t.register_on_commit(new ContainerContext<PGRef>(pgref));
+ t.register_on_applied(new ContainerContext<PGRef>(pgref));
+ osd->store->queue_transaction(ch, std::move(t));
+ }
+ ch->flush();
+
+ if (!osd->try_finish_pg_delete(this, pool.info.get_pg_num())) {
+ dout(1) << __func__ << " raced with merge, reinstantiating" << dendl;
+ ch = osd->store->create_new_collection(coll);
+ create_pg_collection(t,
+ info.pgid,
+ info.pgid.get_split_bits(pool.info.get_pg_num()));
+ init_pg_ondisk(t, info.pgid, &pool.info);
+ recovery_state.reset_last_persisted();
+ } else {
+ recovery_state.set_delete_complete();
+
+ // cancel reserver here, since the PG is about to get deleted and the
+ // exit() methods don't run when that happens.
+ osd->local_reserver.cancel_reservation(info.pgid);
+
+ running = false;
+ }
+ }
+ return {next, running};
+}
+
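+// On non-primary OSDs, reduce the reported available space by the bytes
+// reserved for a pending backfill, so usage estimates stay conservative.
+// Returns 1 if an adjustment was made, 0 otherwise.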
+int PG::pg_stat_adjust(osd_stat_t *ns)
+{
+ osd_stat_t &new_stat = *ns;
+ if (is_primary()) {
+ return 0;
+ }
+ // Account for pending backfill data by subtracting the reserved bytes
+ // from the reported available space.
+ uint64_t reserved_num_bytes = get_reserved_num_bytes();
+
+ // For now we don't consider projected space gains here.
+ // One option would be a two-pass backfill that frees up space in a first
+ // pass; this could be triggered when at nearfull or near backfillfull.
+ if (reserved_num_bytes > 0) {
+ // TODO: Handle compression by adjusting by the PG's average
+ // compression percentage.
+ dout(20) << __func__ << " reserved_num_bytes " << (reserved_num_bytes >> 10) << "KiB"
+ << " Before kb_used " << new_stat.statfs.kb_used() << "KiB" << dendl;
+ if (new_stat.statfs.available > reserved_num_bytes)
+ new_stat.statfs.available -= reserved_num_bytes;
+ else
+ new_stat.statfs.available = 0;
+ dout(20) << __func__ << " After kb_used " << new_stat.statfs.kb_used() << "KiB" << dendl;
+ return 1;
+ }
+ return 0;
+}
+
+void PG::dump_pgstate_history(Formatter *f)
+{
+ std::scoped_lock l{*this};
+ recovery_state.dump_history(f);
+}
+
+void PG::dump_missing(Formatter *f)
+{
+ for (auto& i : recovery_state.get_pg_log().get_missing().get_items()) {
+ f->open_object_section("object");
+ f->dump_object("oid", i.first);
+ f->dump_object("missing_info", i.second);
+ if (recovery_state.get_missing_loc().needs_recovery(i.first)) {
+ f->dump_bool(
+ "unfound",
+ recovery_state.get_missing_loc().is_unfound(i.first));
+ f->open_array_section("locations");
+ for (auto l : recovery_state.get_missing_loc().get_locations(i.first)) {
+ f->dump_object("shard", l);
+ }
+ f->close_section();
+ }
+ f->close_section();
+ }
+}
+
+void PG::get_pg_stats(std::function<void(const pg_stat_t&, epoch_t lec)> f)
+{
+ std::lock_guard l{pg_stats_publish_lock};
+ if (pg_stats_publish_valid) {
+ f(pg_stats_publish, pg_stats_publish.get_effective_last_epoch_clean());
+ }
+}
+
+void PG::with_heartbeat_peers(std::function<void(int)> f)
+{
+ std::lock_guard l{heartbeat_peer_lock};
+ for (auto p : heartbeat_peers) {
+ f(p);
+ }
+ for (auto p : probe_targets) {
+ f(p);
+ }
+}
+
+uint64_t PG::get_min_alloc_size() const {
+ return osd->store->get_min_alloc_size();
+}