From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/osd/scrubber/PrimaryLogScrub.cc | 260 +++ src/osd/scrubber/PrimaryLogScrub.h | 51 + src/osd/scrubber/ScrubStore.cc | 208 +++ src/osd/scrubber/ScrubStore.h | 63 + src/osd/scrubber/osd_scrub_sched.cc | 817 +++++++++ src/osd/scrubber/osd_scrub_sched.h | 553 ++++++ src/osd/scrubber/pg_scrubber.cc | 2979 ++++++++++++++++++++++++++++++++ src/osd/scrubber/pg_scrubber.h | 1047 +++++++++++ src/osd/scrubber/scrub_backend.cc | 1954 +++++++++++++++++++++ src/osd/scrubber/scrub_backend.h | 554 ++++++ src/osd/scrubber/scrub_machine.cc | 602 +++++++ src/osd/scrubber/scrub_machine.h | 384 ++++ src/osd/scrubber/scrub_machine_lstnr.h | 223 +++ 13 files changed, 9695 insertions(+) create mode 100644 src/osd/scrubber/PrimaryLogScrub.cc create mode 100644 src/osd/scrubber/PrimaryLogScrub.h create mode 100644 src/osd/scrubber/ScrubStore.cc create mode 100644 src/osd/scrubber/ScrubStore.h create mode 100644 src/osd/scrubber/osd_scrub_sched.cc create mode 100644 src/osd/scrubber/osd_scrub_sched.h create mode 100644 src/osd/scrubber/pg_scrubber.cc create mode 100644 src/osd/scrubber/pg_scrubber.h create mode 100644 src/osd/scrubber/scrub_backend.cc create mode 100644 src/osd/scrubber/scrub_backend.h create mode 100644 src/osd/scrubber/scrub_machine.cc create mode 100644 src/osd/scrubber/scrub_machine.h create mode 100644 src/osd/scrubber/scrub_machine_lstnr.h (limited to 'src/osd/scrubber') diff --git a/src/osd/scrubber/PrimaryLogScrub.cc b/src/osd/scrubber/PrimaryLogScrub.cc new file mode 100644 index 000000000..74661ab12 --- /dev/null +++ b/src/osd/scrubber/PrimaryLogScrub.cc @@ -0,0 +1,260 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "./PrimaryLogScrub.h" + +#include + +#include "common/scrub_types.h" +#include "osd/PeeringState.h" +#include "osd/PrimaryLogPG.h" +#include "osd/osd_types_fmt.h" + +#include "scrub_machine.h" + +#define dout_context (m_osds->cct) +#define dout_subsys ceph_subsys_osd +#undef dout_prefix +#define dout_prefix _prefix(_dout, this) + +template +static ostream& _prefix(std::ostream* _dout, T* t) +{ + return t->gen_prefix(*_dout); +} + +using namespace Scrub; + +bool PrimaryLogScrub::get_store_errors(const scrub_ls_arg_t& arg, + scrub_ls_result_t& res_inout) const +{ + if (!m_store) { + return false; + } + + if (arg.get_snapsets) { + res_inout.vals = m_store->get_snap_errors(m_pg->get_pgid().pool(), + arg.start_after, + arg.max_return); + } else { + res_inout.vals = m_store->get_object_errors(m_pg->get_pgid().pool(), + arg.start_after, + arg.max_return); + } + return true; +} + +/// \todo combine the multiple transactions into a single one +void PrimaryLogScrub::submit_digest_fixes(const digests_fixes_t& fixes) +{ + // note: the following line was modified from '+=' to '=', as we should not + // encounter previous-chunk digest updates after starting a new chunk + num_digest_updates_pending = fixes.size(); + dout(10) << __func__ + << ": num_digest_updates_pending: " << num_digest_updates_pending + << dendl; + + for (auto& [obj, dgs] : fixes) { + + ObjectContextRef obc = m_pl_pg->get_object_context(obj, false); + if (!obc) { + m_osds->clog->error() << m_pg_id << " " << m_mode_desc + << " cannot get object context for object " << obj; + num_digest_updates_pending--; + continue; + } + dout(15) << fmt::format( + "{}: 
{}, pg[{}] {}/{}", __func__, num_digest_updates_pending, + m_pg_id, obj, dgs) + << dendl; + if (obc->obs.oi.soid != obj) { + m_osds->clog->error() + << m_pg_id << " " << m_mode_desc << " " << obj + << " : object has a valid oi attr with a mismatched name, " + << " obc->obs.oi.soid: " << obc->obs.oi.soid; + num_digest_updates_pending--; + continue; + } + + PrimaryLogPG::OpContextUPtr ctx = m_pl_pg->simple_opc_create(obc); + ctx->at_version = m_pl_pg->get_next_version(); + ctx->mtime = utime_t(); // do not update mtime + if (dgs.first) { + ctx->new_obs.oi.set_data_digest(*dgs.first); + } else { + ctx->new_obs.oi.clear_data_digest(); + } + if (dgs.second) { + ctx->new_obs.oi.set_omap_digest(*dgs.second); + } else { + ctx->new_obs.oi.clear_omap_digest(); + } + m_pl_pg->finish_ctx(ctx.get(), pg_log_entry_t::MODIFY); + + + ctx->register_on_success([this]() { + if ((num_digest_updates_pending >= 1) && + (--num_digest_updates_pending == 0)) { + m_osds->queue_scrub_digest_update(m_pl_pg, + m_pl_pg->is_scrub_blocking_ops()); + } + }); + + m_pl_pg->simple_opc_submit(std::move(ctx)); + } +} + +// a forwarder used by the scrubber backend + +void PrimaryLogScrub::add_to_stats(const object_stat_sum_t& stat) +{ + m_scrub_cstat.add(stat); +} + + +void PrimaryLogScrub::_scrub_finish() +{ + auto& info = m_pg->get_pg_info(ScrubberPasskey{}); ///< a temporary alias + + dout(10) << __func__ << " info stats: " + << (info.stats.stats_invalid ? "invalid" : "valid") + << " m_is_repair: " << m_is_repair << dendl; + + if (info.stats.stats_invalid) { + m_pl_pg->recovery_state.update_stats([=, this](auto& history, auto& stats) { + stats.stats = m_scrub_cstat; + stats.stats_invalid = false; + return false; + }); + + if (m_pl_pg->agent_state) { + m_pl_pg->agent_choose_mode(); + } + } + + dout(10) << m_mode_desc << " got " << m_scrub_cstat.sum.num_objects << "/" + << info.stats.stats.sum.num_objects << " objects, " + << m_scrub_cstat.sum.num_object_clones << "/" + << info.stats.stats.sum.num_object_clones << " clones, " + << m_scrub_cstat.sum.num_objects_dirty << "/" + << info.stats.stats.sum.num_objects_dirty << " dirty, " + << m_scrub_cstat.sum.num_objects_omap << "/" + << info.stats.stats.sum.num_objects_omap << " omap, " + << m_scrub_cstat.sum.num_objects_pinned << "/" + << info.stats.stats.sum.num_objects_pinned << " pinned, " + << m_scrub_cstat.sum.num_objects_hit_set_archive << "/" + << info.stats.stats.sum.num_objects_hit_set_archive + << " hit_set_archive, " << m_scrub_cstat.sum.num_bytes << "/" + << info.stats.stats.sum.num_bytes << " bytes, " + << m_scrub_cstat.sum.num_objects_manifest << "/" + << info.stats.stats.sum.num_objects_manifest << " manifest objects, " + << m_scrub_cstat.sum.num_bytes_hit_set_archive << "/" + << info.stats.stats.sum.num_bytes_hit_set_archive + << " hit_set_archive bytes." 
<< dendl; + + if (m_scrub_cstat.sum.num_objects != info.stats.stats.sum.num_objects || + m_scrub_cstat.sum.num_object_clones != + info.stats.stats.sum.num_object_clones || + (m_scrub_cstat.sum.num_objects_dirty != + info.stats.stats.sum.num_objects_dirty && + !info.stats.dirty_stats_invalid) || + (m_scrub_cstat.sum.num_objects_omap != + info.stats.stats.sum.num_objects_omap && + !info.stats.omap_stats_invalid) || + (m_scrub_cstat.sum.num_objects_pinned != + info.stats.stats.sum.num_objects_pinned && + !info.stats.pin_stats_invalid) || + (m_scrub_cstat.sum.num_objects_hit_set_archive != + info.stats.stats.sum.num_objects_hit_set_archive && + !info.stats.hitset_stats_invalid) || + (m_scrub_cstat.sum.num_bytes_hit_set_archive != + info.stats.stats.sum.num_bytes_hit_set_archive && + !info.stats.hitset_bytes_stats_invalid) || + (m_scrub_cstat.sum.num_objects_manifest != + info.stats.stats.sum.num_objects_manifest && + !info.stats.manifest_stats_invalid) || + m_scrub_cstat.sum.num_whiteouts != info.stats.stats.sum.num_whiteouts || + m_scrub_cstat.sum.num_bytes != info.stats.stats.sum.num_bytes) { + + m_osds->clog->error() << info.pgid << " " << m_mode_desc + << " : stat mismatch, got " + << m_scrub_cstat.sum.num_objects << "/" + << info.stats.stats.sum.num_objects << " objects, " + << m_scrub_cstat.sum.num_object_clones << "/" + << info.stats.stats.sum.num_object_clones + << " clones, " << m_scrub_cstat.sum.num_objects_dirty + << "/" << info.stats.stats.sum.num_objects_dirty + << " dirty, " << m_scrub_cstat.sum.num_objects_omap + << "/" << info.stats.stats.sum.num_objects_omap + << " omap, " << m_scrub_cstat.sum.num_objects_pinned + << "/" << info.stats.stats.sum.num_objects_pinned + << " pinned, " + << m_scrub_cstat.sum.num_objects_hit_set_archive + << "/" + << info.stats.stats.sum.num_objects_hit_set_archive + << " hit_set_archive, " + << m_scrub_cstat.sum.num_whiteouts << "/" + << info.stats.stats.sum.num_whiteouts + << " whiteouts, " << m_scrub_cstat.sum.num_bytes + << "/" << info.stats.stats.sum.num_bytes << " bytes, " + << m_scrub_cstat.sum.num_objects_manifest << "/" + << info.stats.stats.sum.num_objects_manifest + << " manifest objects, " + << m_scrub_cstat.sum.num_bytes_hit_set_archive << "/" + << info.stats.stats.sum.num_bytes_hit_set_archive + << " hit_set_archive bytes."; + ++m_shallow_errors; + + if (m_is_repair) { + ++m_fixed_count; + m_pl_pg->recovery_state.update_stats([this](auto& history, auto& stats) { + stats.stats = m_scrub_cstat; + stats.dirty_stats_invalid = false; + stats.omap_stats_invalid = false; + stats.hitset_stats_invalid = false; + stats.hitset_bytes_stats_invalid = false; + stats.pin_stats_invalid = false; + stats.manifest_stats_invalid = false; + return false; + }); + m_pl_pg->publish_stats_to_osd(); + m_pl_pg->recovery_state.share_pg_info(); + } + } + // Clear object context cache to get repair information + if (m_is_repair) + m_pl_pg->object_contexts.clear(); +} + +PrimaryLogScrub::PrimaryLogScrub(PrimaryLogPG* pg) : PgScrubber{pg}, m_pl_pg{pg} +{} + +void PrimaryLogScrub::_scrub_clear_state() +{ + m_scrub_cstat = object_stat_collection_t(); +} + +void PrimaryLogScrub::stats_of_handled_objects( + const object_stat_sum_t& delta_stats, + const hobject_t& soid) +{ + // We scrub objects in hobject_t order, so objects before m_start have already + // been scrubbed and their stats have already been added to the scrubber. 
+ // Objects after that point haven't been included in the scrubber's stats + // accounting yet, so they will be included when the scrubber gets to that + // object. + if (is_primary() && is_scrub_active()) { + if (soid < m_start) { + + dout(20) << fmt::format("{} {} < [{},{})", __func__, soid, m_start, m_end) + << dendl; + m_scrub_cstat.add(delta_stats); + + } else { + + dout(25) + << fmt::format("{} {} >= [{},{})", __func__, soid, m_start, m_end) + << dendl; + } + } +} diff --git a/src/osd/scrubber/PrimaryLogScrub.h b/src/osd/scrubber/PrimaryLogScrub.h new file mode 100644 index 000000000..58e66223e --- /dev/null +++ b/src/osd/scrubber/PrimaryLogScrub.h @@ -0,0 +1,51 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#pragma once + +// the './' includes are marked this way to affect clang-format +#include "./pg_scrubber.h" + +#include "debug.h" + +#include "common/errno.h" +#include "common/scrub_types.h" +#include "messages/MOSDOp.h" +#include "messages/MOSDRepScrub.h" +#include "messages/MOSDRepScrubMap.h" +#include "messages/MOSDScrubReserve.h" +#include "osd/OSD.h" + +#include "scrub_machine.h" + +class PrimaryLogPG; + +/** + * The derivative of PgScrubber that is used by PrimaryLogPG. + */ +class PrimaryLogScrub : public PgScrubber { + public: + explicit PrimaryLogScrub(PrimaryLogPG* pg); + + void _scrub_finish() final; + + bool get_store_errors(const scrub_ls_arg_t& arg, + scrub_ls_result_t& res_inout) const final; + + void stats_of_handled_objects(const object_stat_sum_t& delta_stats, + const hobject_t& soid) final; + + // the interface used by the scrubber-backend: + + void add_to_stats(const object_stat_sum_t& stat) final; + + void submit_digest_fixes(const digests_fixes_t& fixes) final; + + private: + // we know our PG is actually a PrimaryLogPG. 
Let's alias the pointer to that + // object: + PrimaryLogPG* const m_pl_pg; + + // handle our part in stats collection + object_stat_collection_t m_scrub_cstat; + void _scrub_clear_state() final; // which just clears the stats +}; diff --git a/src/osd/scrubber/ScrubStore.cc b/src/osd/scrubber/ScrubStore.cc new file mode 100644 index 000000000..a00ab2cae --- /dev/null +++ b/src/osd/scrubber/ScrubStore.cc @@ -0,0 +1,208 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "ScrubStore.h" +#include "osd/osd_types.h" +#include "common/scrub_types.h" +#include "include/rados/rados_types.hpp" + +using std::ostringstream; +using std::string; +using std::vector; + +using ceph::bufferlist; + +namespace { +ghobject_t make_scrub_object(const spg_t& pgid) +{ + ostringstream ss; + ss << "scrub_" << pgid; + return pgid.make_temp_ghobject(ss.str()); +} + +string first_object_key(int64_t pool) +{ + auto hoid = hobject_t(object_t(), + "", + 0, + 0x00000000, + pool, + ""); + hoid.build_hash_cache(); + return "SCRUB_OBJ_" + hoid.to_str(); +} + +// the object_key should be unique across pools +string to_object_key(int64_t pool, const librados::object_id_t& oid) +{ + auto hoid = hobject_t(object_t(oid.name), + oid.locator, // key + oid.snap, + 0, // hash + pool, + oid.nspace); + hoid.build_hash_cache(); + return "SCRUB_OBJ_" + hoid.to_str(); +} + +string last_object_key(int64_t pool) +{ + auto hoid = hobject_t(object_t(), + "", + 0, + 0xffffffff, + pool, + ""); + hoid.build_hash_cache(); + return "SCRUB_OBJ_" + hoid.to_str(); +} + +string first_snap_key(int64_t pool) +{ + // scrub object is per spg_t object, so we can misuse the hash (pg.seed) for + // the representing the minimal and maximum keys. and this relies on how + // hobject_t::to_str() works: hex(pool).hex(revhash). 
+ auto hoid = hobject_t(object_t(), + "", + 0, + 0x00000000, + pool, + ""); + hoid.build_hash_cache(); + return "SCRUB_SS_" + hoid.to_str(); +} + +string to_snap_key(int64_t pool, const librados::object_id_t& oid) +{ + auto hoid = hobject_t(object_t(oid.name), + oid.locator, // key + oid.snap, + 0x77777777, // hash + pool, + oid.nspace); + hoid.build_hash_cache(); + return "SCRUB_SS_" + hoid.to_str(); +} + +string last_snap_key(int64_t pool) +{ + auto hoid = hobject_t(object_t(), + "", + 0, + 0xffffffff, + pool, + ""); + hoid.build_hash_cache(); + return "SCRUB_SS_" + hoid.to_str(); +} +} + +namespace Scrub { + +Store* +Store::create(ObjectStore* store, + ObjectStore::Transaction* t, + const spg_t& pgid, + const coll_t& coll) +{ + ceph_assert(store); + ceph_assert(t); + ghobject_t oid = make_scrub_object(pgid); + t->touch(coll, oid); + return new Store{coll, oid, store}; +} + +Store::Store(const coll_t& coll, const ghobject_t& oid, ObjectStore* store) + : coll(coll), + hoid(oid), + driver(store, coll, hoid), + backend(&driver) +{} + +Store::~Store() +{ + ceph_assert(results.empty()); +} + +void Store::add_error(int64_t pool, const inconsistent_obj_wrapper& e) +{ + add_object_error(pool, e); +} + +void Store::add_object_error(int64_t pool, const inconsistent_obj_wrapper& e) +{ + bufferlist bl; + e.encode(bl); + results[to_object_key(pool, e.object)] = bl; +} + +void Store::add_error(int64_t pool, const inconsistent_snapset_wrapper& e) +{ + add_snap_error(pool, e); +} + +void Store::add_snap_error(int64_t pool, const inconsistent_snapset_wrapper& e) +{ + bufferlist bl; + e.encode(bl); + results[to_snap_key(pool, e.object)] = bl; +} + +bool Store::empty() const +{ + return results.empty(); +} + +void Store::flush(ObjectStore::Transaction* t) +{ + if (t) { + OSDriver::OSTransaction txn = driver.get_transaction(t); + backend.set_keys(results, &txn); + } + results.clear(); +} + +void Store::cleanup(ObjectStore::Transaction* t) +{ + t->remove(coll, hoid); +} + +std::vector +Store::get_snap_errors(int64_t pool, + const librados::object_id_t& start, + uint64_t max_return) const +{ + const string begin = (start.name.empty() ? + first_snap_key(pool) : to_snap_key(pool, start)); + const string end = last_snap_key(pool); + return get_errors(begin, end, max_return); +} + +std::vector +Store::get_object_errors(int64_t pool, + const librados::object_id_t& start, + uint64_t max_return) const +{ + const string begin = (start.name.empty() ? 
+ first_object_key(pool) : to_object_key(pool, start)); + const string end = last_object_key(pool); + return get_errors(begin, end, max_return); +} + +std::vector +Store::get_errors(const string& begin, + const string& end, + uint64_t max_return) const +{ + vector errors; + auto next = std::make_pair(begin, bufferlist{}); + while (max_return && !backend.get_next(next.first, &next)) { + if (next.first >= end) + break; + errors.push_back(next.second); + max_return--; + } + return errors; +} + +} // namespace Scrub diff --git a/src/osd/scrubber/ScrubStore.h b/src/osd/scrubber/ScrubStore.h new file mode 100644 index 000000000..567badf60 --- /dev/null +++ b/src/osd/scrubber/ScrubStore.h @@ -0,0 +1,63 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_SCRUB_RESULT_H +#define CEPH_SCRUB_RESULT_H + +#include "common/map_cacher.hpp" +#include "osd/SnapMapper.h" // for OSDriver + +namespace librados { +struct object_id_t; +} + +struct inconsistent_obj_wrapper; +struct inconsistent_snapset_wrapper; + +namespace Scrub { + +class Store { + public: + ~Store(); + static Store* create(ObjectStore* store, + ObjectStore::Transaction* t, + const spg_t& pgid, + const coll_t& coll); + void add_object_error(int64_t pool, const inconsistent_obj_wrapper& e); + void add_snap_error(int64_t pool, const inconsistent_snapset_wrapper& e); + + // and a variant-friendly interface: + void add_error(int64_t pool, const inconsistent_obj_wrapper& e); + void add_error(int64_t pool, const inconsistent_snapset_wrapper& e); + + bool empty() const; + void flush(ObjectStore::Transaction*); + void cleanup(ObjectStore::Transaction*); + + std::vector get_snap_errors( + int64_t pool, + const librados::object_id_t& start, + uint64_t max_return) const; + + std::vector get_object_errors( + int64_t pool, + const librados::object_id_t& start, + uint64_t max_return) const; + + private: + Store(const coll_t& coll, const ghobject_t& oid, ObjectStore* store); + std::vector get_errors(const std::string& start, + const std::string& end, + uint64_t max_return) const; + private: + const coll_t coll; + const ghobject_t hoid; + // a temp object holding mappings from seq-id to inconsistencies found in + // scrubbing + OSDriver driver; + mutable MapCacher::MapCacher backend; + std::map results; +}; +} // namespace Scrub + +#endif // CEPH_SCRUB_RESULT_H diff --git a/src/osd/scrubber/osd_scrub_sched.cc b/src/osd/scrubber/osd_scrub_sched.cc new file mode 100644 index 000000000..82b7c689d --- /dev/null +++ b/src/osd/scrubber/osd_scrub_sched.cc @@ -0,0 +1,817 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#include "./osd_scrub_sched.h" + +#include "osd/OSD.h" + +#include "pg_scrubber.h" + +using namespace ::std::literals; + +// ////////////////////////////////////////////////////////////////////////// // +// ScrubJob + +#define dout_context (cct) +#define dout_subsys ceph_subsys_osd +#undef dout_prefix +#define dout_prefix *_dout << "osd." << whoami << " " + +ScrubQueue::ScrubJob::ScrubJob(CephContext* cct, const spg_t& pg, int node_id) + : RefCountedObject{cct} + , pgid{pg} + , whoami{node_id} + , cct{cct} +{} + +// debug usage only +ostream& operator<<(ostream& out, const ScrubQueue::ScrubJob& sjob) +{ + out << sjob.pgid << ", " << sjob.schedule.scheduled_at + << " dead: " << sjob.schedule.deadline << " - " + << sjob.registration_state() << " / failure: " << sjob.resources_failure + << " / pen. 
t.o.: " << sjob.penalty_timeout + << " / queue state: " << ScrubQueue::qu_state_text(sjob.state); + + return out; +} + +void ScrubQueue::ScrubJob::update_schedule( + const ScrubQueue::scrub_schedule_t& adjusted) +{ + schedule = adjusted; + penalty_timeout = utime_t(0, 0); // helps with debugging + + // 'updated' is changed here while not holding jobs_lock. That's OK, as + // the (atomic) flag will only be cleared by select_pg_and_scrub() after + // scan_penalized() is called and the job was moved to the to_scrub queue. + updated = true; + dout(10) << fmt::format("{}: pg[{}] adjusted: {:s} ({})", __func__, pgid, + schedule.scheduled_at, registration_state()) << dendl; +} + +std::string ScrubQueue::ScrubJob::scheduling_state(utime_t now_is, + bool is_deep_expected) const +{ + // if not in the OSD scheduling queues, not a candidate for scrubbing + if (state != qu_state_t::registered) { + return "no scrub is scheduled"; + } + + // if the time has passed, we are surely in the queue + // (note that for now we do not tell client if 'penalized') + if (now_is > schedule.scheduled_at) { + // we are never sure that the next scrub will indeed be shallow: + return fmt::format("queued for {}scrub", (is_deep_expected ? "deep " : "")); + } + + return fmt::format("{}scrub scheduled @ {:s}", + (is_deep_expected ? "deep " : ""), + schedule.scheduled_at); +} + + +// ////////////////////////////////////////////////////////////////////////// // +// ScrubQueue + +#undef dout_context +#define dout_context (cct) +#undef dout_prefix +#define dout_prefix \ + *_dout << "osd." << osd_service.get_nodeid() << " scrub-queue::" << __func__ \ + << " " + + +ScrubQueue::ScrubQueue(CephContext* cct, Scrub::ScrubSchedListener& osds) + : cct{cct} + , osd_service{osds} +{ + // initialize the daily loadavg with current 15min loadavg + if (double loadavgs[3]; getloadavg(loadavgs, 3) == 3) { + daily_loadavg = loadavgs[2]; + } else { + derr << "OSD::init() : couldn't read loadavgs\n" << dendl; + daily_loadavg = 1.0; + } +} + +std::optional ScrubQueue::update_load_average() +{ + int hb_interval = conf()->osd_heartbeat_interval; + int n_samples = 60 * 24 * 24; + if (hb_interval > 1) { + n_samples /= hb_interval; + if (n_samples < 1) + n_samples = 1; + } + + // get CPU load avg + double loadavg; + if (getloadavg(&loadavg, 1) == 1) { + daily_loadavg = (daily_loadavg * (n_samples - 1) + loadavg) / n_samples; + dout(17) << "heartbeat: daily_loadavg " << daily_loadavg << dendl; + return 100 * loadavg; + } + + return std::nullopt; +} + +/* + * Modify the scrub job state: + * - if 'registered' (as expected): mark as 'unregistering'. The job will be + * dequeued the next time sched_scrub() is called. + * - if already 'not_registered': shouldn't really happen, but not a problem. + * The state will not be modified. + * - same for 'unregistering'. + * + * Note: not holding the jobs lock + */ +void ScrubQueue::remove_from_osd_queue(ScrubJobRef scrub_job) +{ + dout(15) << "removing pg[" << scrub_job->pgid << "] from OSD scrub queue" + << dendl; + + qu_state_t expected_state{qu_state_t::registered}; + auto ret = + scrub_job->state.compare_exchange_strong(expected_state, + qu_state_t::unregistering); + + if (ret) { + + dout(10) << "pg[" << scrub_job->pgid << "] sched-state changed from " + << qu_state_text(expected_state) << " to " + << qu_state_text(scrub_job->state) << dendl; + + } else { + + // job wasn't in state 'registered' coming in + dout(5) << "removing pg[" << scrub_job->pgid + << "] failed. 
State was: " << qu_state_text(expected_state) + << dendl; + } +} + +void ScrubQueue::register_with_osd( + ScrubJobRef scrub_job, + const ScrubQueue::sched_params_t& suggested) +{ + qu_state_t state_at_entry = scrub_job->state.load(); + dout(20) << fmt::format( + "pg[{}] state at entry: <{:.14}>", scrub_job->pgid, + state_at_entry) + << dendl; + + switch (state_at_entry) { + case qu_state_t::registered: + // just updating the schedule? + update_job(scrub_job, suggested); + break; + + case qu_state_t::not_registered: + // insertion under lock + { + std::unique_lock lck{jobs_lock}; + + if (state_at_entry != scrub_job->state) { + lck.unlock(); + dout(5) << " scrub job state changed. Retrying." << dendl; + // retry + register_with_osd(scrub_job, suggested); + break; + } + + update_job(scrub_job, suggested); + to_scrub.push_back(scrub_job); + scrub_job->in_queues = true; + scrub_job->state = qu_state_t::registered; + } + break; + + case qu_state_t::unregistering: + // restore to the to_sched queue + { + // must be under lock, as the job might be removed from the queue + // at any minute + std::lock_guard lck{jobs_lock}; + + update_job(scrub_job, suggested); + if (scrub_job->state == qu_state_t::not_registered) { + dout(5) << " scrub job state changed to 'not registered'" << dendl; + to_scrub.push_back(scrub_job); + } + scrub_job->in_queues = true; + scrub_job->state = qu_state_t::registered; + } + break; + } + + dout(10) << fmt::format( + "pg[{}] sched-state changed from <{:.14}> to <{:.14}> (@{:s})", + scrub_job->pgid, state_at_entry, scrub_job->state.load(), + scrub_job->schedule.scheduled_at) + << dendl; +} + +// look mommy - no locks! +void ScrubQueue::update_job(ScrubJobRef scrub_job, + const ScrubQueue::sched_params_t& suggested) +{ + // adjust the suggested scrub time according to OSD-wide status + auto adjusted = adjust_target_time(suggested); + scrub_job->update_schedule(adjusted); +} + +ScrubQueue::sched_params_t ScrubQueue::determine_scrub_time( + const requested_scrub_t& request_flags, + const pg_info_t& pg_info, + const pool_opts_t& pool_conf) const +{ + ScrubQueue::sched_params_t res; + + if (request_flags.must_scrub || request_flags.need_auto) { + + // Set the smallest time that isn't utime_t() + res.proposed_time = PgScrubber::scrub_must_stamp(); + res.is_must = ScrubQueue::must_scrub_t::mandatory; + // we do not need the interval data in this case + + } else if (pg_info.stats.stats_invalid && conf()->osd_scrub_invalid_stats) { + res.proposed_time = time_now(); + res.is_must = ScrubQueue::must_scrub_t::mandatory; + + } else { + res.proposed_time = pg_info.history.last_scrub_stamp; + res.min_interval = pool_conf.value_or(pool_opts_t::SCRUB_MIN_INTERVAL, 0.0); + res.max_interval = pool_conf.value_or(pool_opts_t::SCRUB_MAX_INTERVAL, 0.0); + } + + dout(15) << fmt::format( + "suggested: {:s} hist: {:s} v:{}/{} must:{} pool-min:{} {}", + res.proposed_time, pg_info.history.last_scrub_stamp, + (bool)pg_info.stats.stats_invalid, + conf()->osd_scrub_invalid_stats, + (res.is_must == must_scrub_t::mandatory ? "y" : "n"), + res.min_interval, request_flags) + << dendl; + return res; +} + + +// used under jobs_lock +void ScrubQueue::move_failed_pgs(utime_t now_is) +{ + int punished_cnt{0}; // for log/debug only + + for (auto job = to_scrub.begin(); job != to_scrub.end();) { + if ((*job)->resources_failure) { + auto sjob = *job; + + // last time it was scheduled for a scrub, this PG failed in securing + // remote resources. Move it to the secondary scrub queue. 
+ + dout(15) << "moving " << sjob->pgid + << " state: " << ScrubQueue::qu_state_text(sjob->state) << dendl; + + // determine the penalty time, after which the job should be reinstated + utime_t after = now_is; + after += conf()->osd_scrub_sleep * 2 + utime_t{300'000ms}; + + // note: currently - not taking 'deadline' into account when determining + // 'penalty_timeout'. + sjob->penalty_timeout = after; + sjob->resources_failure = false; + sjob->updated = false; // as otherwise will be pardoned immediately + + // place in the penalty list, and remove from the to-scrub group + penalized.push_back(sjob); + job = to_scrub.erase(job); + punished_cnt++; + } else { + job++; + } + } + + if (punished_cnt) { + dout(15) << "# of jobs penalized: " << punished_cnt << dendl; + } +} + +// clang-format off +/* + * Implementation note: + * Clang (10 & 11) produces here efficient table-based code, comparable to using + * a direct index into an array of strings. + * Gcc (11, trunk) is almost as efficient. + */ +std::string_view ScrubQueue::attempt_res_text(Scrub::schedule_result_t v) +{ + switch (v) { + case Scrub::schedule_result_t::scrub_initiated: return "scrubbing"sv; + case Scrub::schedule_result_t::none_ready: return "no ready job"sv; + case Scrub::schedule_result_t::no_local_resources: return "local resources shortage"sv; + case Scrub::schedule_result_t::already_started: return "denied as already started"sv; + case Scrub::schedule_result_t::no_such_pg: return "pg not found"sv; + case Scrub::schedule_result_t::bad_pg_state: return "prevented by pg state"sv; + case Scrub::schedule_result_t::preconditions: return "preconditions not met"sv; + } + // g++ (unlike CLANG), requires an extra 'return' here + return "(unknown)"sv; +} + +std::string_view ScrubQueue::qu_state_text(qu_state_t st) +{ + switch (st) { + case qu_state_t::not_registered: return "not registered w/ OSD"sv; + case qu_state_t::registered: return "registered"sv; + case qu_state_t::unregistering: return "unregistering"sv; + } + // g++ (unlike CLANG), requires an extra 'return' here + return "(unknown)"sv; +} +// clang-format on + +/** + * a note regarding 'to_scrub_copy': + * 'to_scrub_copy' is a sorted set of all the ripe jobs from to_copy. + * As we usually expect to refer to only the first job in this set, we could + * consider an alternative implementation: + * - have collect_ripe_jobs() return the copied set without sorting it; + * - loop, performing: + * - use std::min_element() to find a candidate; + * - try that one. If not suitable, discard from 'to_scrub_copy' + */ +Scrub::schedule_result_t ScrubQueue::select_pg_and_scrub( + Scrub::ScrubPreconds& preconds) +{ + dout(10) << " reg./pen. 
sizes: " << to_scrub.size() << " / " + << penalized.size() << dendl; + + utime_t now_is = time_now(); + + preconds.time_permit = scrub_time_permit(now_is); + preconds.load_is_low = scrub_load_below_threshold(); + preconds.only_deadlined = !preconds.time_permit || !preconds.load_is_low; + + // create a list of candidates (copying, as otherwise creating a deadlock): + // - possibly restore penalized + // - (if we didn't handle directly) remove invalid jobs + // - create a copy of the to_scrub (possibly up to first not-ripe) + // - same for the penalized (although that usually be a waste) + // unlock, then try the lists + + std::unique_lock lck{jobs_lock}; + + // pardon all penalized jobs that have deadlined (or were updated) + scan_penalized(restore_penalized, now_is); + restore_penalized = false; + + // remove the 'updated' flag from all entries + std::for_each(to_scrub.begin(), + to_scrub.end(), + [](const auto& jobref) -> void { jobref->updated = false; }); + + // add failed scrub attempts to the penalized list + move_failed_pgs(now_is); + + // collect all valid & ripe jobs from the two lists. Note that we must copy, + // as when we use the lists we will not be holding jobs_lock (see + // explanation above) + + auto to_scrub_copy = collect_ripe_jobs(to_scrub, now_is); + auto penalized_copy = collect_ripe_jobs(penalized, now_is); + lck.unlock(); + + // try the regular queue first + auto res = select_from_group(to_scrub_copy, preconds, now_is); + + // in the sole scenario in which we've gone over all ripe jobs without success + // - we will try the penalized + if (res == Scrub::schedule_result_t::none_ready && !penalized_copy.empty()) { + res = select_from_group(penalized_copy, preconds, now_is); + dout(10) << "tried the penalized. Res: " + << ScrubQueue::attempt_res_text(res) << dendl; + restore_penalized = true; + } + + dout(15) << dendl; + return res; +} + +// must be called under lock +void ScrubQueue::rm_unregistered_jobs(ScrubQContainer& group) +{ + std::for_each(group.begin(), group.end(), [](auto& job) { + if (job->state == qu_state_t::unregistering) { + job->in_queues = false; + job->state = qu_state_t::not_registered; + } else if (job->state == qu_state_t::not_registered) { + job->in_queues = false; + } + }); + + group.erase(std::remove_if(group.begin(), group.end(), invalid_state), + group.end()); +} + +namespace { +struct cmp_sched_time_t { + bool operator()(const ScrubQueue::ScrubJobRef& lhs, + const ScrubQueue::ScrubJobRef& rhs) const + { + return lhs->schedule.scheduled_at < rhs->schedule.scheduled_at; + } +}; +} // namespace + +// called under lock +ScrubQueue::ScrubQContainer ScrubQueue::collect_ripe_jobs( + ScrubQContainer& group, + utime_t time_now) +{ + rm_unregistered_jobs(group); + + // copy ripe jobs + ScrubQueue::ScrubQContainer ripes; + ripes.reserve(group.size()); + + std::copy_if(group.begin(), + group.end(), + std::back_inserter(ripes), + [time_now](const auto& jobref) -> bool { + return jobref->schedule.scheduled_at <= time_now; + }); + std::sort(ripes.begin(), ripes.end(), cmp_sched_time_t{}); + + if (g_conf()->subsys.should_gather()) { + for (const auto& jobref : group) { + if (jobref->schedule.scheduled_at > time_now) { + dout(20) << " not ripe: " << jobref->pgid << " @ " + << jobref->schedule.scheduled_at << dendl; + } + } + } + + return ripes; +} + +// not holding jobs_lock. 'group' is a copy of the actual list. 
+Scrub::schedule_result_t ScrubQueue::select_from_group( + ScrubQContainer& group, + const Scrub::ScrubPreconds& preconds, + utime_t now_is) +{ + dout(15) << "jobs #: " << group.size() << dendl; + + for (auto& candidate : group) { + + // we expect the first job in the list to be a good candidate (if any) + + dout(20) << "try initiating scrub for " << candidate->pgid << dendl; + + if (preconds.only_deadlined && (candidate->schedule.deadline.is_zero() || + candidate->schedule.deadline >= now_is)) { + dout(15) << " not scheduling scrub for " << candidate->pgid << " due to " + << (preconds.time_permit ? "high load" : "time not permitting") + << dendl; + continue; + } + + // we have a candidate to scrub. We turn to the OSD to verify that the PG + // configuration allows the specified type of scrub, and to initiate the + // scrub. + switch ( + osd_service.initiate_a_scrub(candidate->pgid, + preconds.allow_requested_repair_only)) { + + case Scrub::schedule_result_t::scrub_initiated: + // the happy path. We are done + dout(20) << " initiated for " << candidate->pgid << dendl; + return Scrub::schedule_result_t::scrub_initiated; + + case Scrub::schedule_result_t::already_started: + case Scrub::schedule_result_t::preconditions: + case Scrub::schedule_result_t::bad_pg_state: + // continue with the next job + dout(20) << "failed (state/cond/started) " << candidate->pgid << dendl; + break; + + case Scrub::schedule_result_t::no_such_pg: + // The pg is no longer there + dout(20) << "failed (no pg) " << candidate->pgid << dendl; + break; + + case Scrub::schedule_result_t::no_local_resources: + // failure to secure local resources. No point in trying the other + // PGs at this time. Note that this is not the same as replica resources + // failure! + dout(20) << "failed (local) " << candidate->pgid << dendl; + return Scrub::schedule_result_t::no_local_resources; + + case Scrub::schedule_result_t::none_ready: + // can't happen. Just for the compiler. + dout(5) << "failed !!! " << candidate->pgid << dendl; + return Scrub::schedule_result_t::none_ready; + } + } + + dout(20) << " returning 'none ready'" << dendl; + return Scrub::schedule_result_t::none_ready; +} + +ScrubQueue::scrub_schedule_t ScrubQueue::adjust_target_time( + const sched_params_t& times) const +{ + ScrubQueue::scrub_schedule_t sched_n_dead{ + times.proposed_time, times.proposed_time}; + + if (times.is_must == ScrubQueue::must_scrub_t::not_mandatory) { + // unless explicitly requested, postpone the scrub with a random delay + double scrub_min_interval = times.min_interval > 0 + ? times.min_interval + : conf()->osd_scrub_min_interval; + double scrub_max_interval = times.max_interval > 0 + ? times.max_interval + : conf()->osd_scrub_max_interval; + + sched_n_dead.scheduled_at += scrub_min_interval; + double r = rand() / (double)RAND_MAX; + sched_n_dead.scheduled_at += + scrub_min_interval * conf()->osd_scrub_interval_randomize_ratio * r; + + if (scrub_max_interval <= 0) { + sched_n_dead.deadline = utime_t{}; + } else { + sched_n_dead.deadline += scrub_max_interval; + } + // note: no specific job can be named in the log message + dout(20) << fmt::format( + "not-must. 
Was:{:s} {{min:{}/{} max:{}/{} ratio:{}}} " + "Adjusted:{:s} ({:s})", + times.proposed_time, fmt::group_digits(times.min_interval), + fmt::group_digits(conf()->osd_scrub_min_interval), + fmt::group_digits(times.max_interval), + fmt::group_digits(conf()->osd_scrub_max_interval), + conf()->osd_scrub_interval_randomize_ratio, + sched_n_dead.scheduled_at, sched_n_dead.deadline) + << dendl; + } + // else - no log needed. All relevant data will be logged by the caller + return sched_n_dead; +} + +double ScrubQueue::scrub_sleep_time(bool must_scrub) const +{ + double regular_sleep_period = conf()->osd_scrub_sleep; + + if (must_scrub || scrub_time_permit(time_now())) { + return regular_sleep_period; + } + + // relevant if scrubbing started during allowed time, but continued into + // forbidden hours + double extended_sleep = conf()->osd_scrub_extended_sleep; + dout(20) << "w/ extended sleep (" << extended_sleep << ")" << dendl; + return std::max(extended_sleep, regular_sleep_period); +} + +bool ScrubQueue::scrub_load_below_threshold() const +{ + double loadavgs[3]; + if (getloadavg(loadavgs, 3) != 3) { + dout(10) << __func__ << " couldn't read loadavgs\n" << dendl; + return false; + } + + // allow scrub if below configured threshold + long cpus = sysconf(_SC_NPROCESSORS_ONLN); + double loadavg_per_cpu = cpus > 0 ? loadavgs[0] / cpus : loadavgs[0]; + if (loadavg_per_cpu < conf()->osd_scrub_load_threshold) { + dout(20) << "loadavg per cpu " << loadavg_per_cpu << " < max " + << conf()->osd_scrub_load_threshold << " = yes" << dendl; + return true; + } + + // allow scrub if below daily avg and currently decreasing + if (loadavgs[0] < daily_loadavg && loadavgs[0] < loadavgs[2]) { + dout(20) << "loadavg " << loadavgs[0] << " < daily_loadavg " + << daily_loadavg << " and < 15m avg " << loadavgs[2] << " = yes" + << dendl; + return true; + } + + dout(20) << "loadavg " << loadavgs[0] << " >= max " + << conf()->osd_scrub_load_threshold << " and ( >= daily_loadavg " + << daily_loadavg << " or >= 15m avg " << loadavgs[2] << ") = no" + << dendl; + return false; +} + + +// note: called with jobs_lock held +void ScrubQueue::scan_penalized(bool forgive_all, utime_t time_now) +{ + dout(20) << time_now << (forgive_all ? " all " : " - ") << penalized.size() + << dendl; + + // clear dead entries (deleted PGs, or those PGs we are no longer their + // primary) + rm_unregistered_jobs(penalized); + + if (forgive_all) { + + std::copy(penalized.begin(), penalized.end(), std::back_inserter(to_scrub)); + penalized.clear(); + + } else { + + auto forgiven_last = std::partition( + penalized.begin(), + penalized.end(), + [time_now](const auto& e) { + return (*e).updated || ((*e).penalty_timeout <= time_now); + }); + + std::copy(penalized.begin(), forgiven_last, std::back_inserter(to_scrub)); + penalized.erase(penalized.begin(), forgiven_last); + dout(20) << "penalized after screening: " << penalized.size() << dendl; + } +} + +// checks for half-closed ranges. 
Modify the (p +#include +#include +#include +#include + +#include "common/RefCountedObj.h" +#include "common/ceph_atomic.h" +#include "osd/osd_types.h" +#include "osd/scrubber_common.h" +#include "include/utime_fmt.h" +#include "osd/osd_types_fmt.h" +#include "utime.h" + +class PG; + +namespace Scrub { + +using namespace ::std::literals; + +// possible outcome when trying to select a PG and scrub it +enum class schedule_result_t { + scrub_initiated, // successfully started a scrub + none_ready, // no pg to scrub + no_local_resources, // failure to secure local OSD scrub resource + already_started, // failed, as already started scrubbing this pg + no_such_pg, // can't find this pg + bad_pg_state, // pg state (clean, active, etc.) + preconditions // time, configuration, etc. +}; + +// the OSD services provided to the scrub scheduler +class ScrubSchedListener { + public: + virtual int get_nodeid() const = 0; // returns the OSD number ('whoami') + + /** + * A callback used by the ScrubQueue object to initiate a scrub on a specific + * PG. + * + * The request might fail for multiple reasons, as ScrubQueue cannot by its + * own check some of the PG-specific preconditions and those are checked here. + * See attempt_t definition. + * + * @return a Scrub::attempt_t detailing either a success, or the failure + * reason. + */ + virtual schedule_result_t initiate_a_scrub( + spg_t pgid, + bool allow_requested_repair_only) = 0; + + virtual ~ScrubSchedListener() {} +}; + +} // namespace Scrub + +/** + * the queue of PGs waiting to be scrubbed. + * Main operations are scheduling/unscheduling a PG to be scrubbed at a certain + * time. + * + * A "penalty" queue maintains those PGs that have failed to reserve the + * resources of their replicas. The PGs in this list will be reinstated into the + * scrub queue when all eligible PGs were already handled, or after a timeout + * (or if their deadline has passed [[disabled at this time]]). + */ +class ScrubQueue { + public: + enum class must_scrub_t { not_mandatory, mandatory }; + + enum class qu_state_t { + not_registered, // not a primary, thus not considered for scrubbing by this + // OSD (also the temporary state when just created) + registered, // in either of the two queues ('to_scrub' or 'penalized') + unregistering // in the process of being unregistered. Will be finalized + // under lock + }; + + ScrubQueue(CephContext* cct, Scrub::ScrubSchedListener& osds); + virtual ~ScrubQueue() = default; + + struct scrub_schedule_t { + utime_t scheduled_at{}; + utime_t deadline{0, 0}; + }; + + struct sched_params_t { + utime_t proposed_time{}; + double min_interval{0.0}; + double max_interval{0.0}; + must_scrub_t is_must{ScrubQueue::must_scrub_t::not_mandatory}; + }; + + struct ScrubJob final : public RefCountedObject { + + /** + * a time scheduled for scrub, and a deadline: The scrub could be delayed + * if system load is too high (but not if after the deadline),or if trying + * to scrub out of scrub hours. + */ + scrub_schedule_t schedule; + + /// pg to be scrubbed + const spg_t pgid; + + /// the OSD id (for the log) + const int whoami; + + ceph::atomic state{qu_state_t::not_registered}; + + /** + * the old 'is_registered'. Set whenever the job is registered with the OSD, + * i.e. is in either the 'to_scrub' or the 'penalized' vectors. 
+ */ + std::atomic_bool in_queues{false}; + + /// last scrub attempt failed to secure replica resources + bool resources_failure{false}; + + /** + * 'updated' is a temporary flag, used to create a barrier after + * 'sched_time' and 'deadline' (or any other job entry) were modified by + * different task. + * 'updated' also signals the need to move a job back from the penalized + * queue to the regular one. + */ + std::atomic_bool updated{false}; + + /** + * the scrubber is waiting for locked objects to be unlocked. + * Set after a grace period has passed. + */ + bool blocked{false}; + utime_t blocked_since{}; + + utime_t penalty_timeout{0, 0}; + + CephContext* cct; + + ScrubJob(CephContext* cct, const spg_t& pg, int node_id); + + utime_t get_sched_time() const { return schedule.scheduled_at; } + + /** + * relatively low-cost(*) access to the scrub job's state, to be used in + * logging. + * (*) not a low-cost access on x64 architecture + */ + std::string_view state_desc() const + { + return ScrubQueue::qu_state_text(state.load(std::memory_order_relaxed)); + } + + void update_schedule(const ScrubQueue::scrub_schedule_t& adjusted); + + void dump(ceph::Formatter* f) const; + + /* + * as the atomic 'in_queues' appears in many log prints, accessing it for + * display-only should be made less expensive (on ARM. On x86 the _relaxed + * produces the same code as '_cs') + */ + std::string_view registration_state() const + { + return in_queues.load(std::memory_order_relaxed) ? "in-queue" + : "not-queued"; + } + + /** + * a text description of the "scheduling intentions" of this PG: + * are we already scheduled for a scrub/deep scrub? when? + */ + std::string scheduling_state(utime_t now_is, bool is_deep_expected) const; + + friend std::ostream& operator<<(std::ostream& out, const ScrubJob& pg); + }; + + friend class TestOSDScrub; + friend class ScrubSchedTestWrapper; ///< unit-tests structure + + using ScrubJobRef = ceph::ref_t; + using ScrubQContainer = std::vector; + + static std::string_view qu_state_text(qu_state_t st); + + /** + * called periodically by the OSD to select the first scrub-eligible PG + * and scrub it. + * + * Selection is affected by: + * - time of day: scheduled scrubbing might be configured to only happen + * during certain hours; + * - same for days of the week, and for the system load; + * + * @param preconds: what types of scrub are allowed, given system status & + * config. Some of the preconditions are calculated here. + * @return Scrub::attempt_t::scrubbing if a scrub session was successfully + * initiated. Otherwise - the failure cause. + * + * locking: locks jobs_lock + */ + Scrub::schedule_result_t select_pg_and_scrub(Scrub::ScrubPreconds& preconds); + + /** + * Translate attempt_ values into readable text + */ + static std::string_view attempt_res_text(Scrub::schedule_result_t v); + + /** + * remove the pg from set of PGs to be scanned for scrubbing. + * To be used if we are no longer the PG's primary, or if the PG is removed. + */ + void remove_from_osd_queue(ScrubJobRef sjob); + + /** + * @return the list (not std::set!) of all scrub jobs registered + * (apart from PGs in the process of being removed) + */ + ScrubQContainer list_registered_jobs() const; + + /** + * Add the scrub job to the list of jobs (i.e. list of PGs) to be periodically + * scrubbed by the OSD. + * The registration is active as long as the PG exists and the OSD is its + * primary. + * + * See update_job() for the handling of the 'suggested' parameter. 
+ * + * locking: might lock jobs_lock + */ + void register_with_osd(ScrubJobRef sjob, const sched_params_t& suggested); + + /** + * modify a scrub-job's scheduled time and deadline + * + * There are 3 argument combinations to consider: + * - 'must' is asserted, and the suggested time is 'scrub_must_stamp': + * the registration will be with "beginning of time" target, making the + * scrub-job eligible to immediate scrub (given that external conditions + * do not prevent scrubbing) + * + * - 'must' is asserted, and the suggested time is 'now': + * This happens if our stats are unknown. The results are similar to the + * previous scenario. + * + * - not a 'must': we take the suggested time as a basis, and add to it some + * configuration / random delays. + * + * ('must' is sched_params_t.is_must) + * + * locking: not using the jobs_lock + */ + void update_job(ScrubJobRef sjob, const sched_params_t& suggested); + + sched_params_t determine_scrub_time(const requested_scrub_t& request_flags, + const pg_info_t& pg_info, + const pool_opts_t& pool_conf) const; + + public: + void dump_scrubs(ceph::Formatter* f) const; + + /** + * No new scrub session will start while a scrub was initiated on a PG, + * and that PG is trying to acquire replica resources. + */ + void set_reserving_now() { a_pg_is_reserving = true; } + void clear_reserving_now() { a_pg_is_reserving = false; } + bool is_reserving_now() const { return a_pg_is_reserving; } + + bool can_inc_scrubs() const; + bool inc_scrubs_local(); + void dec_scrubs_local(); + bool inc_scrubs_remote(); + void dec_scrubs_remote(); + void dump_scrub_reservations(ceph::Formatter* f) const; + + /// counting the number of PGs stuck while scrubbing, waiting for objects + void mark_pg_scrub_blocked(spg_t blocked_pg); + void clear_pg_scrub_blocked(spg_t blocked_pg); + int get_blocked_pgs_count() const; + + /** + * Pacing the scrub operation by inserting delays (mostly between chunks) + * + * Special handling for regular scrubs that continued into "no scrub" times. + * Scrubbing will continue, but the delays will be controlled by a separate + * (read - with higher value) configuration element + * (osd_scrub_extended_sleep). + */ + double scrub_sleep_time(bool must_scrub) const; /// \todo (future) return + /// milliseconds + + /** + * called every heartbeat to update the "daily" load average + * + * @returns a load value for the logger + */ + [[nodiscard]] std::optional update_load_average(); + + private: + CephContext* cct; + Scrub::ScrubSchedListener& osd_service; + +#ifdef WITH_SEASTAR + auto& conf() const { return local_conf(); } +#else + auto& conf() const { return cct->_conf; } +#endif + + /** + * jobs_lock protects the job containers and the relevant scrub-jobs state + * variables. Specifically, the following are guaranteed: + * - 'in_queues' is asserted only if the job is in one of the queues; + * - a job will only be in state 'registered' if in one of the queues; + * - no job will be in the two queues simultaneously; + * + * Note that PG locks should not be acquired while holding jobs_lock. + */ + mutable ceph::mutex jobs_lock = ceph::make_mutex("ScrubQueue::jobs_lock"); + + ScrubQContainer to_scrub; ///< scrub jobs (i.e. 
PGs) to scrub + ScrubQContainer penalized; ///< those that failed to reserve remote resources + bool restore_penalized{false}; + + double daily_loadavg{0.0}; + + static inline constexpr auto registered_job = [](const auto& jobref) -> bool { + return jobref->state == qu_state_t::registered; + }; + + static inline constexpr auto invalid_state = [](const auto& jobref) -> bool { + return jobref->state == qu_state_t::not_registered; + }; + + /** + * Are there scrub jobs that should be reinstated? + */ + void scan_penalized(bool forgive_all, utime_t time_now); + + /** + * clear dead entries (unregistered, or belonging to removed PGs) from a + * queue. Job state is changed to match new status. + */ + void rm_unregistered_jobs(ScrubQContainer& group); + + /** + * the set of all scrub jobs in 'group' which are ready to be scrubbed + * (ready = their scheduled time has passed). + * The scrub jobs in the new collection are sorted according to + * their scheduled time. + * + * Note that the returned container holds independent refs to the + * scrub jobs. + */ + ScrubQContainer collect_ripe_jobs(ScrubQContainer& group, utime_t time_now); + + + /// scrub resources management lock (guarding scrubs_local & scrubs_remote) + mutable ceph::mutex resource_lock = + ceph::make_mutex("ScrubQueue::resource_lock"); + + /// the counters used to manage scrub activity parallelism: + int scrubs_local{0}; + int scrubs_remote{0}; + + /** + * The scrubbing of PGs might be delayed if the scrubbed chunk of objects is + * locked by some other operation. A bug might cause this to be an infinite + * delay. If that happens, the OSDs "scrub resources" (i.e. the + * counters that limit the number of concurrent scrub operations) might + * be exhausted. + * We do issue a cluster-log warning in such occasions, but that message is + * easy to miss. The 'some pg is blocked' global flag is used to note the + * existence of such a situation in the scrub-queue log messages. + */ + std::atomic_int_fast16_t blocked_scrubs_cnt{0}; + + std::atomic_bool a_pg_is_reserving{false}; + + [[nodiscard]] bool scrub_load_below_threshold() const; + [[nodiscard]] bool scrub_time_permit(utime_t now) const; + + /** + * If the scrub job was not explicitly requested, we postpone it by some + * random length of time. + * And if delaying the scrub - we calculate, based on pool parameters, a + * deadline we should scrub before. + * + * @return a pair of values: the determined scrub time, and the deadline + */ + scrub_schedule_t adjust_target_time( + const sched_params_t& recomputed_params) const; + + /** + * Look for scrub jobs that have their 'resources_failure' set. These jobs + * have failed to acquire remote resources last time we've initiated a scrub + * session on them. They are now moved from the 'to_scrub' queue to the + * 'penalized' set. 
+ * + * locking: called with job_lock held + */ + void move_failed_pgs(utime_t now_is); + + Scrub::schedule_result_t select_from_group( + ScrubQContainer& group, + const Scrub::ScrubPreconds& preconds, + utime_t now_is); + +protected: // used by the unit-tests + /** + * unit-tests will override this function to return a mock time + */ + virtual utime_t time_now() const { return ceph_clock_now(); } +}; + +template <> +struct fmt::formatter + : fmt::formatter { + template + auto format(const ScrubQueue::qu_state_t& s, FormatContext& ctx) + { + auto out = ctx.out(); + out = fmt::formatter::format( + std::string{ScrubQueue::qu_state_text(s)}, ctx); + return out; + } +}; + +template <> +struct fmt::formatter { + constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); } + + template + auto format(const ScrubQueue::ScrubJob& sjob, FormatContext& ctx) + { + return fmt::format_to( + ctx.out(), + "pg[{}] @ {:s} (dl:{:s}) - <{}> / failure: {} / pen. t.o.: {:s} / queue " + "state: {:.7}", + sjob.pgid, sjob.schedule.scheduled_at, sjob.schedule.deadline, + sjob.registration_state(), sjob.resources_failure, sjob.penalty_timeout, + sjob.state.load(std::memory_order_relaxed)); + } +}; diff --git a/src/osd/scrubber/pg_scrubber.cc b/src/osd/scrubber/pg_scrubber.cc new file mode 100644 index 000000000..4cb4ef8da --- /dev/null +++ b/src/osd/scrubber/pg_scrubber.cc @@ -0,0 +1,2979 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=2 sw=2 smarttab + +#include "./pg_scrubber.h" // '.' notation used to affect clang-format order + +#include +#include +#include + +#include + +#include "debug.h" + +#include "common/ceph_time.h" +#include "common/errno.h" +#include "messages/MOSDOp.h" +#include "messages/MOSDRepScrub.h" +#include "messages/MOSDRepScrubMap.h" +#include "messages/MOSDScrubReserve.h" +#include "osd/OSD.h" +#include "osd/PG.h" +#include "include/utime_fmt.h" +#include "osd/osd_types_fmt.h" + +#include "ScrubStore.h" +#include "scrub_backend.h" +#include "scrub_machine.h" + +using std::list; +using std::pair; +using std::stringstream; +using std::vector; +using namespace Scrub; +using namespace std::chrono; +using namespace std::chrono_literals; +using namespace std::literals; + +#define dout_context (m_osds->cct) +#define dout_subsys ceph_subsys_osd +#undef dout_prefix +#define dout_prefix _prefix(_dout, this) + +template +static ostream& _prefix(std::ostream* _dout, T* t) +{ + return t->gen_prefix(*_dout); +} + +ostream& operator<<(ostream& out, const scrub_flags_t& sf) +{ + if (sf.auto_repair) + out << " AUTO_REPAIR"; + if (sf.check_repair) + out << " CHECK_REPAIR"; + if (sf.deep_scrub_on_error) + out << " DEEP_SCRUB_ON_ERROR"; + if (sf.required) + out << " REQ_SCRUB"; + + return out; +} + +ostream& operator<<(ostream& out, const requested_scrub_t& sf) +{ + if (sf.must_repair) + out << " MUST_REPAIR"; + if (sf.auto_repair) + out << " planned AUTO_REPAIR"; + if (sf.check_repair) + out << " planned CHECK_REPAIR"; + if (sf.deep_scrub_on_error) + out << " planned DEEP_SCRUB_ON_ERROR"; + if (sf.must_deep_scrub) + out << " MUST_DEEP_SCRUB"; + if (sf.must_scrub) + out << " MUST_SCRUB"; + if (sf.time_for_deep) + out << " TIME_FOR_DEEP"; + if (sf.need_auto) + out << " NEED_AUTO"; + if (sf.req_scrub) + out << " planned REQ_SCRUB"; + + return out; +} + +/* + * if the incoming message is from a previous interval, it must mean + * PrimaryLogPG::on_change() was called when that interval ended. We can safely + * discard the stale message. 
+ */ +bool PgScrubber::check_interval(epoch_t epoch_to_verify) +{ + return epoch_to_verify >= m_pg->get_same_interval_since(); +} + +bool PgScrubber::is_message_relevant(epoch_t epoch_to_verify) +{ + if (!m_active) { + // not scrubbing. We can assume that the scrub was already terminated, and + // we can silently discard the incoming event. + return false; + } + + // is this a message from before we started this scrub? + if (epoch_to_verify < m_epoch_start) { + return false; + } + + // has a new interval started? + if (!check_interval(epoch_to_verify)) { + // if this is a new interval, on_change() has already terminated that + // old scrub. + return false; + } + + ceph_assert(is_primary()); + + // were we instructed to abort? + return verify_against_abort(epoch_to_verify); +} + +bool PgScrubber::verify_against_abort(epoch_t epoch_to_verify) +{ + if (!should_abort()) { + return true; + } + + dout(10) << __func__ << " aborting. incoming epoch: " << epoch_to_verify + << " vs last-aborted: " << m_last_aborted << dendl; + + // if we were not aware of the abort before - kill the scrub. + if (epoch_to_verify >= m_last_aborted) { + scrub_clear_state(); + m_last_aborted = std::max(epoch_to_verify, m_epoch_start); + } + return false; +} + +bool PgScrubber::should_abort() const +{ + // note that set_op_parameters() guarantees that we would never have + // must_scrub set (i.e. possibly have started a scrub even though noscrub + // was set), without having 'required' also set. + if (m_flags.required) { + return false; // not stopping 'required' scrubs for configuration changes + } + + // note: deep scrubs are allowed even if 'no-scrub' is set (but not + // 'no-deepscrub') + if (m_is_deep) { + if (get_osdmap()->test_flag(CEPH_OSDMAP_NODEEP_SCRUB) || + m_pg->pool.info.has_flag(pg_pool_t::FLAG_NODEEP_SCRUB)) { + dout(10) << "nodeep_scrub set, aborting" << dendl; + return true; + } + } else if (get_osdmap()->test_flag(CEPH_OSDMAP_NOSCRUB) || + m_pg->pool.info.has_flag(pg_pool_t::FLAG_NOSCRUB)) { + dout(10) << "noscrub set, aborting" << dendl; + return true; + } + + return false; +} + +// initiating state-machine events -------------------------------- + +/* + * a note re the checks performed before sending scrub-initiating messages: + * + * For those ('StartScrub', 'AfterRepairScrub') scrub-initiation messages that + * possibly were in the queue while the PG changed state and became unavailable + * for scrubbing: + * + * The check_interval() catches all major changes to the PG. As for the other + * conditions we may check (and see is_message_relevant() above): + * + * - we are not 'active' yet, so must not check against is_active(), and: + * + * - the 'abort' flags were just verified (when the triggering message was + * queued). As those are only modified in human speeds - they need not be + * queried again. + * + * Some of the considerations above are also relevant to the replica-side + * initiation + * ('StartReplica' & 'StartReplicaNoWait'). 
+ */ + +void PgScrubber::initiate_regular_scrub(epoch_t epoch_queued) +{ + dout(15) << __func__ << " epoch: " << epoch_queued << dendl; + // we may have lost our Primary status while the message languished in the + // queue + if (check_interval(epoch_queued)) { + dout(10) << "scrubber event -->> StartScrub epoch: " << epoch_queued + << dendl; + reset_epoch(epoch_queued); + m_fsm->process_event(StartScrub{}); + dout(10) << "scrubber event --<< StartScrub" << dendl; + } else { + clear_queued_or_active(); // also restarts snap trimming + } +} + +void PgScrubber::initiate_scrub_after_repair(epoch_t epoch_queued) +{ + dout(15) << __func__ << " epoch: " << epoch_queued << dendl; + // we may have lost our Primary status while the message languished in the + // queue + if (check_interval(epoch_queued)) { + dout(10) << "scrubber event -->> AfterRepairScrub epoch: " << epoch_queued + << dendl; + reset_epoch(epoch_queued); + m_fsm->process_event(AfterRepairScrub{}); + dout(10) << "scrubber event --<< AfterRepairScrub" << dendl; + } else { + clear_queued_or_active(); // also restarts snap trimming + } +} + +void PgScrubber::send_scrub_unblock(epoch_t epoch_queued) +{ + dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued + << dendl; + if (is_message_relevant(epoch_queued)) { + m_fsm->process_event(Unblocked{}); + } + dout(10) << "scrubber event --<< " << __func__ << dendl; +} + +void PgScrubber::send_scrub_resched(epoch_t epoch_queued) +{ + dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued + << dendl; + if (is_message_relevant(epoch_queued)) { + m_fsm->process_event(InternalSchedScrub{}); + } + dout(10) << "scrubber event --<< " << __func__ << dendl; +} + +void PgScrubber::send_start_replica(epoch_t epoch_queued, + Scrub::act_token_t token) +{ + dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued + << " token: " << token << dendl; + if (is_primary()) { + // shouldn't happen. Ignore + dout(1) << "got a replica scrub request while Primary!" << dendl; + return; + } + + if (check_interval(epoch_queued) && is_token_current(token)) { + // save us some time by not waiting for updates if there are none + // to wait for. Affects the transition from NotActive into either + // ReplicaWaitUpdates or ActiveReplica. 
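+  // (pending pushes -> go through ReplicaWaitUpdates; none -> straight to
+  // ActiveReplica)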
+ if (pending_active_pushes()) + m_fsm->process_event(StartReplica{}); + else + m_fsm->process_event(StartReplicaNoWait{}); + } + dout(10) << "scrubber event --<< " << __func__ << dendl; +} + +void PgScrubber::send_sched_replica(epoch_t epoch_queued, + Scrub::act_token_t token) +{ + dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued + << " token: " << token << dendl; + if (check_interval(epoch_queued) && is_token_current(token)) { + m_fsm->process_event(SchedReplica{}); // retest for map availability + } + dout(10) << "scrubber event --<< " << __func__ << dendl; +} + +void PgScrubber::active_pushes_notification(epoch_t epoch_queued) +{ + // note: Primary only + dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued + << dendl; + if (is_message_relevant(epoch_queued)) { + m_fsm->process_event(ActivePushesUpd{}); + } + dout(10) << "scrubber event --<< " << __func__ << dendl; +} + +void PgScrubber::update_applied_notification(epoch_t epoch_queued) +{ + // note: Primary only + dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued + << dendl; + if (is_message_relevant(epoch_queued)) { + m_fsm->process_event(UpdatesApplied{}); + } + dout(10) << "scrubber event --<< " << __func__ << dendl; +} + +void PgScrubber::digest_update_notification(epoch_t epoch_queued) +{ + // note: Primary only + dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued + << dendl; + if (is_message_relevant(epoch_queued)) { + m_fsm->process_event(DigestUpdate{}); + } + dout(10) << "scrubber event --<< " << __func__ << dendl; +} + +void PgScrubber::send_local_map_done(epoch_t epoch_queued) +{ + dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued + << dendl; + if (is_message_relevant(epoch_queued)) { + m_fsm->process_event(Scrub::IntLocalMapDone{}); + } + dout(10) << "scrubber event --<< " << __func__ << dendl; +} + +void PgScrubber::send_replica_maps_ready(epoch_t epoch_queued) +{ + dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued + << dendl; + if (is_message_relevant(epoch_queued)) { + m_fsm->process_event(GotReplicas{}); + } + dout(10) << "scrubber event --<< " << __func__ << dendl; +} + +void PgScrubber::send_replica_pushes_upd(epoch_t epoch_queued) +{ + dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued + << dendl; + if (check_interval(epoch_queued)) { + m_fsm->process_event(ReplicaPushesUpd{}); + } + dout(10) << "scrubber event --<< " << __func__ << dendl; +} + +void PgScrubber::send_remotes_reserved(epoch_t epoch_queued) +{ + dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued + << dendl; + // note: scrub is not active yet + if (check_interval(epoch_queued)) { + m_fsm->process_event(RemotesReserved{}); + } + dout(10) << "scrubber event --<< " << __func__ << dendl; +} + +void PgScrubber::send_reservation_failure(epoch_t epoch_queued) +{ + dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued + << dendl; + if (check_interval(epoch_queued)) { // do not check for 'active'! 
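+    // (as with RemotesReserved above, reservation outcomes arrive while the
+    // scrub is not yet 'active')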
+ m_fsm->process_event(ReservationFailure{}); + } + dout(10) << "scrubber event --<< " << __func__ << dendl; +} + +void PgScrubber::send_full_reset(epoch_t epoch_queued) +{ + dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued + << dendl; + + m_fsm->process_event(Scrub::FullReset{}); + + dout(10) << "scrubber event --<< " << __func__ << dendl; +} + +void PgScrubber::send_chunk_free(epoch_t epoch_queued) +{ + dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued + << dendl; + if (check_interval(epoch_queued)) { + m_fsm->process_event(Scrub::SelectedChunkFree{}); + } + dout(10) << "scrubber event --<< " << __func__ << dendl; +} + +void PgScrubber::send_chunk_busy(epoch_t epoch_queued) +{ + dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued + << dendl; + if (check_interval(epoch_queued)) { + m_fsm->process_event(Scrub::ChunkIsBusy{}); + } + dout(10) << "scrubber event --<< " << __func__ << dendl; +} + +void PgScrubber::send_get_next_chunk(epoch_t epoch_queued) +{ + dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued + << dendl; + if (is_message_relevant(epoch_queued)) { + m_fsm->process_event(Scrub::NextChunk{}); + } + dout(10) << "scrubber event --<< " << __func__ << dendl; +} + +void PgScrubber::send_scrub_is_finished(epoch_t epoch_queued) +{ + dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued + << dendl; + + // can't check for "active" + + m_fsm->process_event(Scrub::ScrubFinished{}); + + dout(10) << "scrubber event --<< " << __func__ << dendl; +} + +// ----------------- + +bool PgScrubber::is_reserving() const +{ + return m_fsm->is_reserving(); +} + +void PgScrubber::reset_epoch(epoch_t epoch_queued) +{ + dout(10) << __func__ << " state deep? " << state_test(PG_STATE_DEEP_SCRUB) + << dendl; + m_fsm->assert_not_active(); + + m_epoch_start = epoch_queued; + m_needs_sleep = true; + ceph_assert(m_is_deep == state_test(PG_STATE_DEEP_SCRUB)); + update_op_mode_text(); +} + +unsigned int PgScrubber::scrub_requeue_priority( + Scrub::scrub_prio_t with_priority) const +{ + unsigned int qu_priority = m_flags.priority; + + if (with_priority == Scrub::scrub_prio_t::high_priority) { + qu_priority = + std::max(qu_priority, + (unsigned int)m_pg->get_cct()->_conf->osd_client_op_priority); + } + return qu_priority; +} + +unsigned int PgScrubber::scrub_requeue_priority( + Scrub::scrub_prio_t with_priority, + unsigned int suggested_priority) const +{ + if (with_priority == Scrub::scrub_prio_t::high_priority) { + suggested_priority = + std::max(suggested_priority, + (unsigned int)m_pg->get_cct()->_conf->osd_client_op_priority); + } + return suggested_priority; +} + +// ///////////////////////////////////////////////////////////////////// // +// scrub-op registration handling + +void PgScrubber::unregister_from_osd() +{ + if (m_scrub_job) { + dout(15) << __func__ << " prev. 
state: " << registration_state() << dendl; + m_osds->get_scrub_services().remove_from_osd_queue(m_scrub_job); + } +} + +bool PgScrubber::is_scrub_registered() const +{ + return m_scrub_job && m_scrub_job->in_queues; +} + +std::string_view PgScrubber::registration_state() const +{ + if (m_scrub_job) { + return m_scrub_job->registration_state(); + } + return "(no sched job)"sv; +} + +void PgScrubber::rm_from_osd_scrubbing() +{ + // make sure the OSD won't try to scrub this one just now + unregister_from_osd(); +} + +void PgScrubber::on_primary_change( + std::string_view caller, + const requested_scrub_t& request_flags) +{ + if (!m_scrub_job) { + // we won't have a chance to see more logs from this function, thus: + dout(10) << fmt::format( + "{}: (from {}& w/{}) {}.Reg-state:{:.7}. No scrub-job", + __func__, caller, request_flags, + (is_primary() ? "Primary" : "Replica/other"), + registration_state()) + << dendl; + return; + } + + auto pre_state = m_scrub_job->state_desc(); + auto pre_reg = registration_state(); + if (is_primary()) { + auto suggested = m_osds->get_scrub_services().determine_scrub_time( + request_flags, m_pg->info, m_pg->get_pgpool().info.opts); + m_osds->get_scrub_services().register_with_osd(m_scrub_job, suggested); + } else { + m_osds->get_scrub_services().remove_from_osd_queue(m_scrub_job); + } + + // is there an interval change we should respond to? + if (is_primary() && is_scrub_active()) { + if (m_interval_start < m_pg->get_same_interval_since()) { + dout(10) << fmt::format( + "{}: interval changed ({} -> {}). Aborting active scrub.", + __func__, m_interval_start, m_pg->get_same_interval_since()) + << dendl; + scrub_clear_state(); + } + } + + dout(10) + << fmt::format( + "{} (from {} {}): {}. <{:.5}>&<{:.10}> --> <{:.5}>&<{:.14}>", + __func__, caller, request_flags, + (is_primary() ? "Primary" : "Replica/other"), pre_reg, pre_state, + registration_state(), m_scrub_job->state_desc()) + << dendl; +} + +/* + * A note re the call to publish_stats_to_osd() below: + * - we are called from either request_rescrubbing() or scrub_requested(). + * - in both cases - the schedule was modified, and needs to be published; + * - we are a Primary. + * - in the 1st case - the call is made as part of scrub_finish(), which + * guarantees that the PG is locked and the interval is still the same. + * - in the 2nd case - we know the PG state and we know we are only called + * for a Primary. +*/ +void PgScrubber::update_scrub_job(const requested_scrub_t& request_flags) +{ + dout(10) << fmt::format("{}: flags:<{}>", __func__, request_flags) << dendl; + // verify that the 'in_q' status matches our "Primariority" + if (m_scrub_job && is_primary() && !m_scrub_job->in_queues) { + dout(1) << __func__ << " !!! primary but not scheduled! " << dendl; + } + + if (is_primary() && m_scrub_job) { + ceph_assert(m_pg->is_locked()); + auto suggested = m_osds->get_scrub_services().determine_scrub_time( + request_flags, m_pg->info, m_pg->get_pgpool().info.opts); + m_osds->get_scrub_services().update_job(m_scrub_job, suggested); + m_pg->publish_stats_to_osd(); + } + + dout(15) << __func__ << ": done " << registration_state() << dendl; +} + +void PgScrubber::scrub_requested(scrub_level_t scrub_level, + scrub_type_t scrub_type, + requested_scrub_t& req_flags) +{ + dout(10) << __func__ + << (scrub_level == scrub_level_t::deep ? " deep " : " shallow ") + << (scrub_type == scrub_type_t::do_repair ? " repair-scrub " + : " not-repair ") + << " prev stamp: " << m_scrub_job->get_sched_time() + << " registered? 
" << registration_state() << dendl; + + req_flags.must_scrub = true; + req_flags.must_deep_scrub = (scrub_level == scrub_level_t::deep) || + (scrub_type == scrub_type_t::do_repair); + req_flags.must_repair = (scrub_type == scrub_type_t::do_repair); + // User might intervene, so clear this + req_flags.need_auto = false; + req_flags.req_scrub = true; + + dout(20) << __func__ << " pg(" << m_pg_id << ") planned:" << req_flags + << dendl; + + update_scrub_job(req_flags); +} + + +void PgScrubber::request_rescrubbing(requested_scrub_t& request_flags) +{ + dout(10) << __func__ << " flags: " << request_flags << dendl; + + request_flags.need_auto = true; + update_scrub_job(request_flags); +} + +bool PgScrubber::reserve_local() +{ + // try to create the reservation object (which translates into asking the + // OSD for the local scrub resource). If failing - undo it immediately + + m_local_osd_resource.emplace(m_osds); + if (m_local_osd_resource->is_reserved()) { + dout(15) << __func__ << ": local resources reserved" << dendl; + return true; + } + + dout(10) << __func__ << ": failed to reserve local scrub resources" << dendl; + m_local_osd_resource.reset(); + return false; +} + +// ---------------------------------------------------------------------------- + +bool PgScrubber::has_pg_marked_new_updates() const +{ + auto last_applied = m_pg->recovery_state.get_last_update_applied(); + dout(10) << __func__ << " recovery last: " << last_applied + << " vs. scrub's: " << m_subset_last_update << dendl; + + return last_applied >= m_subset_last_update; +} + +void PgScrubber::set_subset_last_update(eversion_t e) +{ + m_subset_last_update = e; + dout(15) << __func__ << " last-update: " << e << dendl; +} + +void PgScrubber::on_applied_when_primary(const eversion_t& applied_version) +{ + // we are only interested in updates if we are the Primary, and in state + // WaitLastUpdate + if (m_fsm->is_accepting_updates() && + (applied_version >= m_subset_last_update)) { + m_osds->queue_scrub_applied_update(m_pg, m_pg->is_scrub_blocking_ops()); + dout(15) << __func__ << " update: " << applied_version + << " vs. required: " << m_subset_last_update << dendl; + } +} + + +namespace { + +/** + * an aux function to be used in select_range() below, to + * select the correct chunk size based on the type of scrub + */ +int size_from_conf( + bool is_deep, + const ceph::common::ConfigProxy& conf, + std::string_view deep_opt, + std::string_view shallow_opt) +{ + if (!is_deep) { + auto sz = conf.get_val(shallow_opt); + if (sz != 0) { + // assuming '0' means that no distinction was yet configured between + // deep and shallow scrubbing + return static_cast(sz); + } + } + return static_cast(conf.get_val(deep_opt)); +} +} // anonymous namespace + +/* + * The selected range is set directly into 'm_start' and 'm_end' + * setting: + * - m_subset_last_update + * - m_max_end + * - end + * - start + */ +bool PgScrubber::select_range() +{ + m_be->new_chunk(); + + /* get the start and end of our scrub chunk + * + * Our scrub chunk has an important restriction we're going to need to + * respect. We can't let head be start or end. + * Using a half-open interval means that if end == head, + * we'd scrub/lock head and the clone right next to head in different + * chunks which would allow us to miss clones created between + * scrubbing that chunk and scrubbing the chunk including head. + * This isn't true for any of the other clones since clones can + * only be created "just to the left of" head. 
There is one exception + * to this: promotion of clones which always happens to the left of the + * left-most clone, but promote_object checks the scrubber in that + * case, so it should be ok. Also, it's ok to "miss" clones at the + * left end of the range if we are a tier because they may legitimately + * not exist (see _scrub). + */ + + const auto& conf = m_pg->get_cct()->_conf; + dout(20) << fmt::format( + "{} {} mins: {}d {}s, max: {}d {}s", __func__, + (m_is_deep ? "D" : "S"), + conf.get_val("osd_scrub_chunk_min"), + conf.get_val("osd_shallow_scrub_chunk_min"), + conf.get_val("osd_scrub_chunk_max"), + conf.get_val("osd_shallow_scrub_chunk_max")) + << dendl; + + const int min_from_conf = size_from_conf( + m_is_deep, conf, "osd_scrub_chunk_min", "osd_shallow_scrub_chunk_min"); + const int max_from_conf = size_from_conf( + m_is_deep, conf, "osd_scrub_chunk_max", "osd_shallow_scrub_chunk_max"); + + const int divisor = static_cast(preemption_data.chunk_divisor()); + const int min_chunk_sz = std::max(3, min_from_conf / divisor); + const int max_chunk_sz = std::max(min_chunk_sz, max_from_conf / divisor); + + dout(10) << fmt::format( + "{}: Min: {} Max: {} Div: {}", __func__, min_chunk_sz, + max_chunk_sz, divisor) + << dendl; + + hobject_t start = m_start; + hobject_t candidate_end; + std::vector objects; + int ret = m_pg->get_pgbackend()->objects_list_partial( + start, min_chunk_sz, max_chunk_sz, &objects, &candidate_end); + ceph_assert(ret >= 0); + + if (!objects.empty()) { + + hobject_t back = objects.back(); + while (candidate_end.is_head() && candidate_end == back.get_head()) { + candidate_end = back; + objects.pop_back(); + if (objects.empty()) { + ceph_assert(0 == + "Somehow we got more than 2 objects which" + "have the same head but are not clones"); + } + back = objects.back(); + } + + if (candidate_end.is_head()) { + ceph_assert(candidate_end != back.get_head()); + candidate_end = candidate_end.get_object_boundary(); + } + + } else { + ceph_assert(candidate_end.is_max()); + } + + // is that range free for us? if not - we will be rescheduled later by whoever + // triggered us this time + + if (!m_pg->_range_available_for_scrub(m_start, candidate_end)) { + // we'll be requeued by whatever made us unavailable for scrub + dout(10) << __func__ << ": scrub blocked somewhere in range " + << "[" << m_start << ", " << candidate_end << ")" << dendl; + return false; + } + + m_end = candidate_end; + if (m_end > m_max_end) + m_max_end = m_end; + + dout(15) << __func__ << " range selected: " << m_start << " //// " << m_end + << " //// " << m_max_end << dendl; + + // debug: be 'blocked' if told so by the 'pg scrub_debug block' asok command + if (m_debug_blockrange > 0) { + m_debug_blockrange--; + return false; + } + return true; +} + +void PgScrubber::select_range_n_notify() +{ + if (select_range()) { + // the next chunk to handle is not blocked + dout(20) << __func__ << ": selection OK" << dendl; + m_osds->queue_scrub_chunk_free(m_pg, Scrub::scrub_prio_t::low_priority); + + } else { + // we will wait for the objects range to become available for scrubbing + dout(10) << __func__ << ": selected chunk is busy" << dendl; + m_osds->queue_scrub_chunk_busy(m_pg, Scrub::scrub_prio_t::low_priority); + } +} + +bool PgScrubber::write_blocked_by_scrub(const hobject_t& soid) +{ + if (soid < m_start || soid >= m_end) { + return false; + } + + dout(20) << __func__ << " " << soid << " can preempt? " + << preemption_data.is_preemptable() << " already preempted? 
" + << preemption_data.was_preempted() << dendl; + + if (preemption_data.was_preempted()) { + // otherwise - write requests arriving while 'already preempted' is set + // but 'preemptable' is not - will not be allowed to continue, and will + // not be requeued on time. + return false; + } + + if (preemption_data.is_preemptable()) { + + dout(10) << __func__ << " " << soid << " preempted" << dendl; + + // signal the preemption + preemption_data.do_preempt(); + m_end = m_start; // free the range we were scrubbing + + return false; + } + return true; +} + +bool PgScrubber::range_intersects_scrub(const hobject_t& start, + const hobject_t& end) +{ + // does [start, end] intersect [scrubber.start, scrubber.m_max_end) + return (start < m_max_end && end >= m_start); +} + +Scrub::BlockedRangeWarning PgScrubber::acquire_blocked_alarm() +{ + int grace = get_pg_cct()->_conf->osd_blocked_scrub_grace_period; + if (grace == 0) { + // we will not be sending any alarms re the blocked object + dout(10) + << __func__ + << ": blocked-alarm disabled ('osd_blocked_scrub_grace_period' set to 0)" + << dendl; + return nullptr; + } + ceph::timespan grace_period{m_debug_blockrange ? 4s : seconds{grace}}; + dout(20) << fmt::format(": timeout:{}", + std::chrono::duration_cast(grace_period)) + << dendl; + return std::make_unique(m_osds, + grace_period, + *this, + m_pg_id); +} + +/** + * if we are required to sleep: + * arrange a callback sometimes later. + * be sure to be able to identify a stale callback. + * Otherwise: perform a requeue (i.e. - rescheduling thru the OSD queue) + * anyway. + */ +void PgScrubber::add_delayed_scheduling() +{ + m_end = m_start; // not blocking any range now + + milliseconds sleep_time{0ms}; + if (m_needs_sleep) { + double scrub_sleep = + 1000.0 * m_osds->get_scrub_services().scrub_sleep_time(m_flags.required); + sleep_time = milliseconds{int64_t(scrub_sleep)}; + } + dout(15) << __func__ << " sleep: " << sleep_time.count() << "ms. needed? " + << m_needs_sleep << dendl; + + if (sleep_time.count()) { + // schedule a transition for some 'sleep_time' ms in the future + + m_needs_sleep = false; + m_sleep_started_at = ceph_clock_now(); + + // the following log line is used by osd-scrub-test.sh + dout(20) << __func__ << " scrub state is PendingTimer, sleeping" << dendl; + + // the 'delayer' for crimson is different. Will be factored out. 
+ + spg_t pgid = m_pg->get_pgid(); + auto callbk = new LambdaContext([osds = m_osds, pgid, scrbr = this]( + [[maybe_unused]] int r) mutable { + PGRef pg = osds->osd->lookup_lock_pg(pgid); + if (!pg) { + lgeneric_subdout(g_ceph_context, osd, 10) + << "scrub_requeue_callback: Could not find " + << "PG " << pgid << " can't complete scrub requeue after sleep" + << dendl; + return; + } + scrbr->m_needs_sleep = true; + lgeneric_dout(scrbr->get_pg_cct(), 7) + << "scrub_requeue_callback: slept for " + << ceph_clock_now() - scrbr->m_sleep_started_at << ", re-queuing scrub" + << dendl; + + scrbr->m_sleep_started_at = utime_t{}; + osds->queue_for_scrub_resched(&(*pg), Scrub::scrub_prio_t::low_priority); + pg->unlock(); + }); + + std::lock_guard l(m_osds->sleep_lock); + m_osds->sleep_timer.add_event_after(sleep_time.count() / 1000.0f, callbk); + + } else { + // just a requeue + m_osds->queue_for_scrub_resched(m_pg, Scrub::scrub_prio_t::high_priority); + } +} + +eversion_t PgScrubber::search_log_for_updates() const +{ + auto& projected = m_pg->projected_log.log; + auto pi = find_if(projected.crbegin(), + projected.crend(), + [this](const auto& e) -> bool { + return e.soid >= m_start && e.soid < m_end; + }); + + if (pi != projected.crend()) + return pi->version; + + // there was no relevant update entry in the log + + auto& log = m_pg->recovery_state.get_pg_log().get_log().log; + auto p = find_if(log.crbegin(), log.crend(), [this](const auto& e) -> bool { + return e.soid >= m_start && e.soid < m_end; + }); + + if (p == log.crend()) + return eversion_t{}; + else + return p->version; +} + +void PgScrubber::get_replicas_maps(bool replica_can_preempt) +{ + dout(10) << __func__ << " started in epoch/interval: " << m_epoch_start << "/" + << m_interval_start << " pg same_interval_since: " + << m_pg->info.history.same_interval_since << dendl; + + m_primary_scrubmap_pos.reset(); + + // ask replicas to scan and send maps + for (const auto& i : m_pg->get_actingset()) { + + if (i == m_pg_whoami) + continue; + + m_maps_status.mark_replica_map_request(i); + _request_scrub_map(i, + m_subset_last_update, + m_start, + m_end, + m_is_deep, + replica_can_preempt); + } + + dout(10) << __func__ << " awaiting" << m_maps_status << dendl; +} + +bool PgScrubber::was_epoch_changed() const +{ + // for crimson we have m_pg->get_info().history.same_interval_since + dout(10) << __func__ << " epoch_start: " << m_interval_start + << " from pg: " << m_pg->get_history().same_interval_since << dendl; + + return m_interval_start < m_pg->get_history().same_interval_since; +} + +void PgScrubber::mark_local_map_ready() +{ + m_maps_status.mark_local_map_ready(); +} + +bool PgScrubber::are_all_maps_available() const +{ + return m_maps_status.are_all_maps_available(); +} + +std::string PgScrubber::dump_awaited_maps() const +{ + return m_maps_status.dump(); +} + +void PgScrubber::update_op_mode_text() +{ + auto visible_repair = state_test(PG_STATE_REPAIR); + m_mode_desc = + (visible_repair ? "repair" : (m_is_deep ? "deep-scrub" : "scrub")); + + dout(10) << __func__ + << ": repair: visible: " << (visible_repair ? "true" : "false") + << ", internal: " << (m_is_repair ? "true" : "false") + << ". Displayed: " << m_mode_desc << dendl; +} + +void PgScrubber::_request_scrub_map(pg_shard_t replica, + eversion_t version, + hobject_t start, + hobject_t end, + bool deep, + bool allow_preemption) +{ + ceph_assert(replica != m_pg_whoami); + dout(10) << __func__ << " scrubmap from osd." << replica + << (deep ? 
" deep" : " shallow") << dendl; + + auto repscrubop = new MOSDRepScrub(spg_t(m_pg->info.pgid.pgid, replica.shard), + version, + get_osdmap_epoch(), + m_pg->get_last_peering_reset(), + start, + end, + deep, + allow_preemption, + m_flags.priority, + m_pg->ops_blocked_by_scrub()); + + // default priority. We want the replica-scrub processed prior to any recovery + // or client io messages (we are holding a lock!) + m_osds->send_message_osd_cluster(replica.osd, repscrubop, get_osdmap_epoch()); +} + +void PgScrubber::cleanup_store(ObjectStore::Transaction* t) +{ + if (!m_store) + return; + + struct OnComplete : Context { + std::unique_ptr store; + explicit OnComplete(std::unique_ptr&& store) + : store(std::move(store)) + {} + void finish(int) override {} + }; + m_store->cleanup(t); + t->register_on_complete(new OnComplete(std::move(m_store))); + ceph_assert(!m_store); +} + +void PgScrubber::on_init() +{ + // going upwards from 'inactive' + ceph_assert(!is_scrub_active()); + m_pg->reset_objects_scrubbed(); + preemption_data.reset(); + m_interval_start = m_pg->get_history().same_interval_since; + dout(10) << __func__ << " start same_interval:" << m_interval_start << dendl; + + m_be = std::make_unique( + *this, + *m_pg, + m_pg_whoami, + m_is_repair, + m_is_deep ? scrub_level_t::deep : scrub_level_t::shallow, + m_pg->get_actingset()); + + // create a new store + { + ObjectStore::Transaction t; + cleanup_store(&t); + m_store.reset( + Scrub::Store::create(m_pg->osd->store, &t, m_pg->info.pgid, m_pg->coll)); + m_pg->osd->store->queue_transaction(m_pg->ch, std::move(t), nullptr); + } + + m_start = m_pg->info.pgid.pgid.get_hobj_start(); + m_active = true; + ++m_sessions_counter; + // publish the session counter and the fact the we are scrubbing. + m_pg->publish_stats_to_osd(); +} + +/* + * Note: as on_replica_init() is likely to be called twice (entering + * both ReplicaWaitUpdates & ActiveReplica), its operations should be + * idempotent. + * Now that it includes some state-changing operations, we need to check + * m_active against double-activation. + */ +void PgScrubber::on_replica_init() +{ + dout(10) << __func__ << " called with 'active' " + << (m_active ? "set" : "cleared") << dendl; + if (!m_active) { + m_be = std::make_unique( + *this, *m_pg, m_pg_whoami, m_is_repair, + m_is_deep ? scrub_level_t::deep : scrub_level_t::shallow); + m_active = true; + ++m_sessions_counter; + } +} + +int PgScrubber::build_primary_map_chunk() +{ + epoch_t map_building_since = m_pg->get_osdmap_epoch(); + dout(20) << __func__ << ": initiated at epoch " << map_building_since + << dendl; + + auto ret = build_scrub_map_chunk(m_be->get_primary_scrubmap(), + m_primary_scrubmap_pos, + m_start, + m_end, + m_is_deep); + + if (ret == -EINPROGRESS) { + // reschedule another round of asking the backend to collect the scrub data + m_osds->queue_for_scrub_resched(m_pg, Scrub::scrub_prio_t::low_priority); + } + return ret; +} + + +int PgScrubber::build_replica_map_chunk() +{ + dout(10) << __func__ << " interval start: " << m_interval_start + << " current token: " << m_current_token + << " epoch: " << m_epoch_start << " deep: " << m_is_deep << dendl; + + ceph_assert(m_be); + + auto ret = build_scrub_map_chunk(replica_scrubmap, + replica_scrubmap_pos, + m_start, + m_end, + m_is_deep); + + switch (ret) { + + case -EINPROGRESS: + // must wait for the backend to finish. No external event source. + // (note: previous version used low priority here. 
Now switched to using + // the priority of the original message) + m_osds->queue_for_rep_scrub_resched(m_pg, + m_replica_request_priority, + m_flags.priority, + m_current_token); + break; + + case 0: { + // finished! + + auto required_fixes = m_be->replica_clean_meta(replica_scrubmap, + m_end.is_max(), + m_start, + get_snap_mapper_accessor()); + // actuate snap-mapper changes: + apply_snap_mapper_fixes(required_fixes); + + // the local map has been created. Send it to the primary. + // Note: once the message reaches the Primary, it may ask us for another + // chunk - and we better be done with the current scrub. Thus - the + // preparation of the reply message is separate, and we clear the scrub + // state before actually sending it. + + auto reply = prep_replica_map_msg(PreemptionNoted::no_preemption); + replica_handling_done(); + dout(15) << __func__ << " chunk map sent " << dendl; + send_replica_map(reply); + } break; + + default: + // negative retval: build_scrub_map_chunk() signalled an error + // Pre-Pacific code ignored this option, treating it as a success. + // \todo Add an error flag in the returning message. + dout(1) << "Error! Aborting. ActiveReplica::react(SchedReplica) Ret: " + << ret << dendl; + replica_handling_done(); + // only in debug mode for now: + assert(false && "backend error"); + break; + }; + + return ret; +} + +int PgScrubber::build_scrub_map_chunk(ScrubMap& map, + ScrubMapBuilder& pos, + hobject_t start, + hobject_t end, + bool deep) +{ + dout(10) << __func__ << " [" << start << "," << end << ") " + << " pos " << pos << " Deep: " << deep << dendl; + + // start + while (pos.empty()) { + + pos.deep = deep; + map.valid_through = m_pg->info.last_update; + + // objects + vector rollback_obs; + pos.ret = m_pg->get_pgbackend()->objects_list_range(start, + end, + &pos.ls, + &rollback_obs); + dout(10) << __func__ << " while pos empty " << pos.ret << dendl; + if (pos.ret < 0) { + dout(5) << "objects_list_range error: " << pos.ret << dendl; + return pos.ret; + } + dout(10) << __func__ << " pos.ls.empty()? " << (pos.ls.empty() ? "+" : "-") + << dendl; + if (pos.ls.empty()) { + break; + } + m_pg->_scan_rollback_obs(rollback_obs); + pos.pos = 0; + return -EINPROGRESS; + } + + // scan objects + while (!pos.done()) { + + int r = m_pg->get_pgbackend()->be_scan_list(map, pos); + dout(30) << __func__ << " BE returned " << r << dendl; + if (r == -EINPROGRESS) { + dout(20) << __func__ << " in progress" << dendl; + return r; + } + } + + // finish + dout(20) << __func__ << " finishing" << dendl; + ceph_assert(pos.done()); + repair_oinfo_oid(map); + + dout(20) << __func__ << " done, got " << map.objects.size() << " items" + << dendl; + return 0; +} + +/// \todo consider moving repair_oinfo_oid() back to the backend +void PgScrubber::repair_oinfo_oid(ScrubMap& smap) +{ + for (auto i = smap.objects.rbegin(); i != smap.objects.rend(); ++i) { + + const hobject_t& hoid = i->first; + ScrubMap::object& o = i->second; + + if (o.attrs.find(OI_ATTR) == o.attrs.end()) { + continue; + } + bufferlist bl; + bl.push_back(o.attrs[OI_ATTR]); + object_info_t oi; + try { + oi.decode(bl); + } catch (...) { + continue; + } + + if (oi.soid != hoid) { + ObjectStore::Transaction t; + OSDriver::OSTransaction _t(m_pg->osdriver.get_transaction(&t)); + + m_osds->clog->error() + << "osd." 
<< m_pg_whoami << " found object info error on pg " << m_pg_id + << " oid " << hoid << " oid in object info: " << oi.soid + << "...repaired"; + // Fix object info + oi.soid = hoid; + bl.clear(); + encode(oi, + bl, + m_pg->get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr)); + + bufferptr bp(bl.c_str(), bl.length()); + o.attrs[OI_ATTR] = bp; + + t.setattr(m_pg->coll, ghobject_t(hoid), OI_ATTR, bl); + int r = m_pg->osd->store->queue_transaction(m_pg->ch, std::move(t)); + if (r != 0) { + derr << __func__ << ": queue_transaction got " << cpp_strerror(r) + << dendl; + } + } + } +} + + +void PgScrubber::run_callbacks() +{ + std::list to_run; + to_run.swap(m_callbacks); + + for (auto& tr : to_run) { + tr->complete(0); + } +} + +void PgScrubber::persist_scrub_results(inconsistent_objs_t&& all_errors) +{ + dout(10) << __func__ << " " << all_errors.size() << " errors" << dendl; + + for (auto& e : all_errors) { + std::visit([this](auto& e) { m_store->add_error(m_pg->pool.id, e); }, e); + } + + ObjectStore::Transaction t; + m_store->flush(&t); + m_osds->store->queue_transaction(m_pg->ch, std::move(t), nullptr); +} + +void PgScrubber::apply_snap_mapper_fixes( + const std::vector& fix_list) +{ + dout(15) << __func__ << " " << fix_list.size() << " fixes" << dendl; + + if (fix_list.empty()) { + return; + } + + ObjectStore::Transaction t; + OSDriver::OSTransaction t_drv(m_pg->osdriver.get_transaction(&t)); + + for (auto& [fix_op, hoid, snaps, bogus_snaps] : fix_list) { + + if (fix_op != snap_mapper_op_t::add) { + + // must remove the existing snap-set before inserting the correct one + if (auto r = m_pg->snap_mapper.remove_oid(hoid, &t_drv); r < 0) { + + derr << __func__ << ": remove_oid returned " << cpp_strerror(r) + << dendl; + if (fix_op == snap_mapper_op_t::update) { + // for inconsistent snapmapper objects (i.e. for + // snap_mapper_op_t::inconsistent), we don't fret if we can't remove + // the old entries + ceph_abort(); + } + } + + m_osds->clog->error() << fmt::format( + "osd.{} found snap mapper error on pg {} oid {} snaps in mapper: {}, " + "oi: " + "{} ...repaired", + m_pg_whoami, + m_pg_id, + hoid, + bogus_snaps, + snaps); + + } else { + + m_osds->clog->error() << fmt::format( + "osd.{} found snap mapper error on pg {} oid {} snaps missing in " + "mapper, should be: {} ...repaired", + m_pg_whoami, + m_pg_id, + hoid, + snaps); + } + + // now - insert the correct snap-set + m_pg->snap_mapper.add_oid(hoid, snaps, &t_drv); + } + + // wait for repair to apply to avoid confusing other bits of the system. + { + dout(15) << __func__ << " wait on repair!" << dendl; + + ceph::condition_variable my_cond; + ceph::mutex my_lock = ceph::make_mutex("PG::_scan_snaps my_lock"); + int e = 0; + bool done{false}; + + t.register_on_applied_sync(new C_SafeCond(my_lock, my_cond, &done, &e)); + + if (e = m_pg->osd->store->queue_transaction(m_pg->ch, std::move(t)); + e != 0) { + derr << __func__ << ": queue_transaction got " << cpp_strerror(e) + << dendl; + } else { + std::unique_lock l{my_lock}; + my_cond.wait(l, [&done] { return done; }); + ceph_assert(m_pg->osd->store); // RRR why? 
+ } + dout(15) << __func__ << " wait on repair - done" << dendl; + } +} + +void PgScrubber::maps_compare_n_cleanup() +{ + m_pg->add_objects_scrubbed_count(m_be->get_primary_scrubmap().objects.size()); + + auto required_fixes = + m_be->scrub_compare_maps(m_end.is_max(), get_snap_mapper_accessor()); + if (!required_fixes.inconsistent_objs.empty()) { + if (state_test(PG_STATE_REPAIR)) { + dout(10) << __func__ << ": discarding scrub results (repairing)" << dendl; + } else { + // perform the ordered scrub-store I/O: + persist_scrub_results(std::move(required_fixes.inconsistent_objs)); + } + } + + // actuate snap-mapper changes: + apply_snap_mapper_fixes(required_fixes.snap_fix_list); + + auto chunk_err_counts = m_be->get_error_counts(); + m_shallow_errors += chunk_err_counts.shallow_errors; + m_deep_errors += chunk_err_counts.deep_errors; + + m_start = m_end; + run_callbacks(); + + // requeue the writes from the chunk that just finished + requeue_waiting(); +} + +Scrub::preemption_t& PgScrubber::get_preemptor() +{ + return preemption_data; +} + +/* + * Process note: called for the arriving "give me your map, replica!" request. + * Unlike the original implementation, we do not requeue the Op waiting for + * updates. Instead - we trigger the FSM. + */ +void PgScrubber::replica_scrub_op(OpRequestRef op) +{ + op->mark_started(); + auto msg = op->get_req(); + dout(10) << __func__ << " pg:" << m_pg->pg_id + << " Msg: map_epoch:" << msg->map_epoch + << " min_epoch:" << msg->min_epoch << " deep?" << msg->deep << dendl; + + // are we still processing a previous scrub-map request without noticing that + // the interval changed? won't see it here, but rather at the reservation + // stage. + + if (msg->map_epoch < m_pg->info.history.same_interval_since) { + dout(10) << "replica_scrub_op discarding old replica_scrub from " + << msg->map_epoch << " < " + << m_pg->info.history.same_interval_since << dendl; + + // is there a general sync issue? are we holding a stale reservation? + // not checking now - assuming we will actively react to interval change. + + return; + } + + if (is_queued_or_active()) { + // this is bug! + // Somehow, we have received a new scrub request from our Primary, before + // having finished with the previous one. Did we go through an interval + // change without reseting the FSM? Possible responses: + // - crashing (the original assert_not_active() implemented that one), or + // - trying to recover: + // - (logging enough information to debug this scenario) + // - reset the FSM. + m_osds->clog->warn() << fmt::format( + "{}: error: a second scrub-op received while handling the previous one", + __func__); + + scrub_clear_state(); + m_osds->clog->warn() << fmt::format( + "{}: after a reset. Now handling the new OP", + __func__); + } + // make sure the FSM is at NotActive + m_fsm->assert_not_active(); + + replica_scrubmap = ScrubMap{}; + replica_scrubmap_pos = ScrubMapBuilder{}; + + m_replica_min_epoch = msg->min_epoch; + m_start = msg->start; + m_end = msg->end; + m_max_end = msg->end; + m_is_deep = msg->deep; + m_interval_start = m_pg->info.history.same_interval_since; + m_replica_request_priority = msg->high_priority + ? Scrub::scrub_prio_t::high_priority + : Scrub::scrub_prio_t::low_priority; + m_flags.priority = msg->priority ? msg->priority : m_pg->get_scrub_priority(); + + preemption_data.reset(); + preemption_data.force_preemptability(msg->allow_preemption); + + replica_scrubmap_pos.reset(); // needed? 
RRR + + set_queued_or_active(); + m_osds->queue_for_rep_scrub(m_pg, + m_replica_request_priority, + m_flags.priority, + m_current_token); +} + +void PgScrubber::set_op_parameters(const requested_scrub_t& request) +{ + dout(10) << fmt::format("{}: @ input: {}", __func__, request) << dendl; + + set_queued_or_active(); // we are fully committed now. + + // write down the epoch of starting a new scrub. Will be used + // to discard stale messages from previous aborted scrubs. + m_epoch_start = m_pg->get_osdmap_epoch(); + + m_flags.check_repair = request.check_repair; + m_flags.auto_repair = request.auto_repair || request.need_auto; + m_flags.required = request.req_scrub || request.must_scrub; + + m_flags.priority = (request.must_scrub || request.need_auto) + ? get_pg_cct()->_conf->osd_requested_scrub_priority + : m_pg->get_scrub_priority(); + + state_set(PG_STATE_SCRUBBING); + + // will we be deep-scrubbing? + if (request.calculated_to_deep) { + state_set(PG_STATE_DEEP_SCRUB); + m_is_deep = true; + } else { + m_is_deep = false; + + // make sure we got the 'calculated_to_deep' flag right + ceph_assert(!request.must_deep_scrub); + ceph_assert(!request.need_auto); + } + + // m_is_repair is set for either 'must_repair' or 'repair-on-the-go' (i.e. + // deep-scrub with the auto_repair configuration flag set). m_is_repair value + // determines the scrubber behavior. + // + // PG_STATE_REPAIR, on the other hand, is only used for status reports (inc. + // the PG status as appearing in the logs). + m_is_repair = request.must_repair || m_flags.auto_repair; + if (request.must_repair) { + state_set(PG_STATE_REPAIR); + update_op_mode_text(); + } + + // The publishing here is required for tests synchronization. + // The PG state flags were modified. + m_pg->publish_stats_to_osd(); + m_flags.deep_scrub_on_error = request.deep_scrub_on_error; +} + + +ScrubMachineListener::MsgAndEpoch PgScrubber::prep_replica_map_msg( + PreemptionNoted was_preempted) +{ + dout(10) << __func__ << " min epoch:" << m_replica_min_epoch << dendl; + + auto reply = make_message( + spg_t(m_pg->info.pgid.pgid, m_pg->get_primary().shard), + m_replica_min_epoch, + m_pg_whoami); + + reply->preempted = (was_preempted == PreemptionNoted::preempted); + ::encode(replica_scrubmap, reply->get_data()); + + return ScrubMachineListener::MsgAndEpoch{reply, m_replica_min_epoch}; +} + +void PgScrubber::send_replica_map(const MsgAndEpoch& preprepared) +{ + m_pg->send_cluster_message(m_pg->get_primary().osd, + preprepared.m_msg, + preprepared.m_epoch, + false); +} + +void PgScrubber::send_preempted_replica() +{ + auto reply = make_message( + spg_t{m_pg->info.pgid.pgid, m_pg->get_primary().shard}, + m_replica_min_epoch, + m_pg_whoami); + + reply->preempted = true; + ::encode(replica_scrubmap, + reply->get_data()); // skipping this crashes the scrubber + m_pg->send_cluster_message(m_pg->get_primary().osd, + reply, + m_replica_min_epoch, + false); +} + +/* + * - if the replica lets us know it was interrupted, we mark the chunk as + * interrupted. The state-machine will react to that when all replica maps are + * received. + * - when all maps are received, we signal the FSM with the GotReplicas event + * (see scrub_send_replmaps_ready()). Note that due to the no-reentrancy + * limitations of the FSM, we do not 'process' the event directly. Instead - it + * is queued for the OSD to handle. 
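+ * (see the call to queue_scrub_got_repl_maps() at the end of
+ * map_from_replica() below)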
+ */ +void PgScrubber::map_from_replica(OpRequestRef op) +{ + auto m = op->get_req(); + dout(15) << __func__ << " " << *m << dendl; + + if (m->map_epoch < m_pg->info.history.same_interval_since) { + dout(10) << __func__ << " discarding old from " << m->map_epoch << " < " + << m_pg->info.history.same_interval_since << dendl; + return; + } + + // note: we check for active() before map_from_replica() is called. Thus, we + // know m_be is initialized + m_be->decode_received_map(m->from, *m); + + auto [is_ok, err_txt] = m_maps_status.mark_arriving_map(m->from); + if (!is_ok) { + // previously an unexpected map was triggering an assert. Now, as scrubs can + // be aborted at any time, the chances of this happening have increased, and + // aborting is not justified + dout(1) << __func__ << err_txt << " from OSD " << m->from << dendl; + return; + } + + if (m->preempted) { + dout(10) << __func__ << " replica was preempted, setting flag" << dendl; + preemption_data.do_preempt(); + } + + if (m_maps_status.are_all_maps_available()) { + dout(15) << __func__ << " all repl-maps available" << dendl; + m_osds->queue_scrub_got_repl_maps(m_pg, m_pg->is_scrub_blocking_ops()); + } +} + +void PgScrubber::handle_scrub_reserve_request(OpRequestRef op) +{ + dout(10) << __func__ << " " << *op->get_req() << dendl; + op->mark_started(); + auto request_ep = op->get_req()->get_map_epoch(); + dout(20) << fmt::format("{}: request_ep:{} recovery:{}", + __func__, + request_ep, + m_osds->is_recovery_active()) + << dendl; + + /* + * if we are currently holding a reservation, then: + * either (1) we, the scrubber, did not yet notice an interval change. The + * remembered reservation epoch is from before our interval, and we can + * silently discard the reservation (no message is required). + * or: + * + * (2) the interval hasn't changed, but the same Primary that (we think) + * holds the lock just sent us a new request. Note that we know it's the + * same Primary, as otherwise the interval would have changed. + * + * Ostensibly we can discard & redo the reservation. But then we + * will be temporarily releasing the OSD resource - and might not be able + * to grab it again. Thus, we simply treat this as a successful new request + * (but mark the fact that if there is a previous request from the primary + * to scrub a specific chunk - that request is now defunct). + */ + + if (m_remote_osd_resource.has_value() && m_remote_osd_resource->is_stale()) { + // we are holding a stale reservation from a past epoch + m_remote_osd_resource.reset(); + dout(10) << __func__ << " cleared existing stale reservation" << dendl; + } + + if (request_ep < m_pg->get_same_interval_since()) { + // will not ack stale requests + dout(10) << fmt::format("{}: stale reservation (request ep{} < {}) denied", + __func__, + request_ep, + m_pg->get_same_interval_since()) + << dendl; + return; + } + + bool granted{false}; + if (m_remote_osd_resource.has_value()) { + + dout(10) << __func__ << " already reserved. Reassigned." << dendl; + + /* + * it might well be that we did not yet finish handling the latest scrub-op + * from our primary. This happens, for example, if 'noscrub' was set via a + * command, then reset. The primary in this scenario will remain in the + * same interval, but we do need to reset our internal state (otherwise - + * the first renewed 'give me your scrub map' from the primary will see us + * in active state, crashing the OSD). 
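+     * advance_token() below invalidates any chunk-request still queued on
+     * behalf of the previous reservation.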
+ */ + advance_token(); + granted = true; + + } else if (m_pg->cct->_conf->osd_scrub_during_recovery || + !m_osds->is_recovery_active()) { + m_remote_osd_resource.emplace(this, m_pg, m_osds, request_ep); + // OSD resources allocated? + granted = m_remote_osd_resource->is_reserved(); + if (!granted) { + // just forget it + m_remote_osd_resource.reset(); + dout(20) << __func__ << ": failed to reserve remotely" << dendl; + } + } else { + dout(10) << __func__ << ": recovery is active; not granting" << dendl; + } + + dout(10) << __func__ << " reserved? " << (granted ? "yes" : "no") << dendl; + + Message* reply = new MOSDScrubReserve( + spg_t(m_pg->info.pgid.pgid, m_pg->get_primary().shard), + request_ep, + granted ? MOSDScrubReserve::GRANT : MOSDScrubReserve::REJECT, + m_pg_whoami); + + m_osds->send_message_osd_cluster(reply, op->get_req()->get_connection()); +} + +void PgScrubber::handle_scrub_reserve_grant(OpRequestRef op, pg_shard_t from) +{ + dout(10) << __func__ << " " << *op->get_req() << dendl; + op->mark_started(); + + if (m_reservations.has_value()) { + m_reservations->handle_reserve_grant(op, from); + } else { + dout(20) << __func__ << ": late/unsolicited reservation grant from osd " + << from << " (" << op << ")" << dendl; + } +} + +void PgScrubber::handle_scrub_reserve_reject(OpRequestRef op, pg_shard_t from) +{ + dout(10) << __func__ << " " << *op->get_req() << dendl; + op->mark_started(); + + if (m_reservations.has_value()) { + // there is an active reservation process. No action is required otherwise. + m_reservations->handle_reserve_reject(op, from); + } +} + +void PgScrubber::handle_scrub_reserve_release(OpRequestRef op) +{ + dout(10) << __func__ << " " << *op->get_req() << dendl; + op->mark_started(); + + /* + * this specific scrub session has terminated. All incoming events carrying + * the old tag will be discarded. + */ + advance_token(); + m_remote_osd_resource.reset(); +} + +void PgScrubber::discard_replica_reservations() +{ + dout(10) << __func__ << dendl; + if (m_reservations.has_value()) { + m_reservations->discard_all(); + } +} + +void PgScrubber::clear_scrub_reservations() +{ + dout(10) << __func__ << dendl; + m_reservations.reset(); // the remote reservations + m_local_osd_resource.reset(); // the local reservation + m_remote_osd_resource.reset(); // we as replica reserved for a Primary +} + +void PgScrubber::message_all_replicas(int32_t opcode, std::string_view op_text) +{ + ceph_assert(m_pg->recovery_state.get_backfill_targets().empty()); + + std::vector> messages; + messages.reserve(m_pg->get_actingset().size()); + + epoch_t epch = get_osdmap_epoch(); + + for (auto& p : m_pg->get_actingset()) { + + if (p == m_pg_whoami) + continue; + + dout(10) << "scrub requesting " << op_text << " from osd." 
<< p + << " Epoch: " << epch << dendl; + Message* m = new MOSDScrubReserve(spg_t(m_pg->info.pgid.pgid, p.shard), + epch, + opcode, + m_pg_whoami); + messages.push_back(std::make_pair(p.osd, m)); + } + + if (!messages.empty()) { + m_osds->send_message_osd_cluster(messages, epch); + } +} + +void PgScrubber::unreserve_replicas() +{ + dout(10) << __func__ << dendl; + m_reservations.reset(); +} + +void PgScrubber::set_reserving_now() +{ + m_osds->get_scrub_services().set_reserving_now(); +} + +void PgScrubber::clear_reserving_now() +{ + m_osds->get_scrub_services().clear_reserving_now(); +} + +void PgScrubber::set_queued_or_active() +{ + m_queued_or_active = true; +} + +void PgScrubber::clear_queued_or_active() +{ + if (m_queued_or_active) { + m_queued_or_active = false; + // and just in case snap trimming was blocked by the aborted scrub + m_pg->snap_trimmer_scrub_complete(); + } +} + +bool PgScrubber::is_queued_or_active() const +{ + return m_queued_or_active; +} + +void PgScrubber::set_scrub_blocked(utime_t since) +{ + ceph_assert(!m_scrub_job->blocked); + // we are called from a time-triggered lambda, + // thus - not under PG-lock + PGRef pg = m_osds->osd->lookup_lock_pg(m_pg_id); + ceph_assert(pg); // 'this' here should not exist if the PG was removed + m_osds->get_scrub_services().mark_pg_scrub_blocked(m_pg_id); + m_scrub_job->blocked_since = since; + m_scrub_job->blocked = true; + m_pg->publish_stats_to_osd(); + pg->unlock(); +} + +void PgScrubber::clear_scrub_blocked() +{ + ceph_assert(m_scrub_job->blocked); + m_osds->get_scrub_services().clear_pg_scrub_blocked(m_pg_id); + m_scrub_job->blocked = false; + m_pg->publish_stats_to_osd(); +} + +/* + * note: only called for the Primary. + */ +void PgScrubber::scrub_finish() +{ + dout(10) << __func__ << " before flags: " << m_flags << ". repair state: " + << (state_test(PG_STATE_REPAIR) ? "repair" : "no-repair") + << ". 
deep_scrub_on_error: " << m_flags.deep_scrub_on_error << dendl; + + ceph_assert(m_pg->is_locked()); + ceph_assert(is_queued_or_active()); + + m_planned_scrub = requested_scrub_t{}; + + // if the repair request comes from auto-repair and large number of errors, + // we would like to cancel auto-repair + if (m_is_repair && m_flags.auto_repair && + m_be->authoritative_peers_count() > + static_cast(m_pg->cct->_conf->osd_scrub_auto_repair_num_errors)) { + + dout(10) << __func__ << " undoing the repair" << dendl; + state_clear(PG_STATE_REPAIR); // not expected to be set, anyway + m_is_repair = false; + update_op_mode_text(); + } + + m_be->update_repair_status(m_is_repair); + + // if a regular scrub had errors within the limit, do a deep scrub to auto + // repair + bool do_auto_scrub = false; + if (m_flags.deep_scrub_on_error && m_be->authoritative_peers_count() && + m_be->authoritative_peers_count() <= + static_cast(m_pg->cct->_conf->osd_scrub_auto_repair_num_errors)) { + ceph_assert(!m_is_deep); + do_auto_scrub = true; + dout(15) << __func__ << " Try to auto repair after scrub errors" << dendl; + } + + m_flags.deep_scrub_on_error = false; + + // type-specific finish (can tally more errors) + _scrub_finish(); + + /// \todo fix the relevant scrub test so that we would not need the extra log + /// line here (even if the following 'if' is false) + + if (m_be->authoritative_peers_count()) { + + auto err_msg = fmt::format("{} {} {} missing, {} inconsistent objects", + m_pg->info.pgid, + m_mode_desc, + m_be->m_missing.size(), + m_be->m_inconsistent.size()); + + dout(2) << err_msg << dendl; + m_osds->clog->error() << fmt::to_string(err_msg); + } + + // note that the PG_STATE_REPAIR might have changed above + if (m_be->authoritative_peers_count() && m_is_repair) { + + state_clear(PG_STATE_CLEAN); + // we know we have a problem, so it's OK to set the user-visible flag + // even if we only reached here via auto-repair + state_set(PG_STATE_REPAIR); + update_op_mode_text(); + m_be->update_repair_status(true); + m_fixed_count += m_be->scrub_process_inconsistent(); + } + + bool has_error = (m_be->authoritative_peers_count() > 0) && m_is_repair; + + { + stringstream oss; + oss << m_pg->info.pgid.pgid << " " << m_mode_desc << " "; + int total_errors = m_shallow_errors + m_deep_errors; + if (total_errors) + oss << total_errors << " errors"; + else + oss << "ok"; + if (!m_is_deep && m_pg->info.stats.stats.sum.num_deep_scrub_errors) + oss << " ( " << m_pg->info.stats.stats.sum.num_deep_scrub_errors + << " remaining deep scrub error details lost)"; + if (m_is_repair) + oss << ", " << m_fixed_count << " fixed"; + if (total_errors) + m_osds->clog->error(oss); + else + m_osds->clog->debug(oss); + } + + // Since we don't know which errors were fixed, we can only clear them + // when every one has been fixed. + if (m_is_repair) { + dout(15) << fmt::format("{}: {} errors. 
{} errors fixed", + __func__, + m_shallow_errors + m_deep_errors, + m_fixed_count) + << dendl; + if (m_fixed_count == m_shallow_errors + m_deep_errors) { + + ceph_assert(m_is_deep); + m_shallow_errors = 0; + m_deep_errors = 0; + dout(20) << __func__ << " All may be fixed" << dendl; + + } else if (has_error) { + + // Deep scrub in order to get corrected error counts + m_pg->scrub_after_recovery = true; + m_planned_scrub.req_scrub = m_planned_scrub.req_scrub || m_flags.required; + + dout(20) << __func__ << " Current 'required': " << m_flags.required + << " Planned 'req_scrub': " << m_planned_scrub.req_scrub + << dendl; + + } else if (m_shallow_errors || m_deep_errors) { + + // We have errors but nothing can be fixed, so there is no repair + // possible. + state_set(PG_STATE_FAILED_REPAIR); + dout(10) << __func__ << " " << (m_shallow_errors + m_deep_errors) + << " error(s) present with no repair possible" << dendl; + } + } + + { + // finish up + ObjectStore::Transaction t; + m_pg->recovery_state.update_stats( + [this](auto& history, auto& stats) { + dout(10) << "m_pg->recovery_state.update_stats() errors:" + << m_shallow_errors << "/" << m_deep_errors << " deep? " + << m_is_deep << dendl; + utime_t now = ceph_clock_now(); + history.last_scrub = m_pg->recovery_state.get_info().last_update; + history.last_scrub_stamp = now; + if (m_is_deep) { + history.last_deep_scrub = m_pg->recovery_state.get_info().last_update; + history.last_deep_scrub_stamp = now; + } + + if (m_is_deep) { + if ((m_shallow_errors == 0) && (m_deep_errors == 0)) { + history.last_clean_scrub_stamp = now; + } + stats.stats.sum.num_shallow_scrub_errors = m_shallow_errors; + stats.stats.sum.num_deep_scrub_errors = m_deep_errors; + auto omap_stats = m_be->this_scrub_omapstats(); + stats.stats.sum.num_large_omap_objects = + omap_stats.large_omap_objects; + stats.stats.sum.num_omap_bytes = omap_stats.omap_bytes; + stats.stats.sum.num_omap_keys = omap_stats.omap_keys; + dout(19) << "scrub_finish shard " << m_pg_whoami + << " num_omap_bytes = " << stats.stats.sum.num_omap_bytes + << " num_omap_keys = " << stats.stats.sum.num_omap_keys + << dendl; + } else { + stats.stats.sum.num_shallow_scrub_errors = m_shallow_errors; + // XXX: last_clean_scrub_stamp doesn't mean the pg is not inconsistent + // because of deep-scrub errors + if (m_shallow_errors == 0) { + history.last_clean_scrub_stamp = now; + } + } + + stats.stats.sum.num_scrub_errors = + stats.stats.sum.num_shallow_scrub_errors + + stats.stats.sum.num_deep_scrub_errors; + + if (m_flags.check_repair) { + m_flags.check_repair = false; + if (m_pg->info.stats.stats.sum.num_scrub_errors) { + state_set(PG_STATE_FAILED_REPAIR); + dout(10) << "scrub_finish " + << m_pg->info.stats.stats.sum.num_scrub_errors + << " error(s) still present after re-scrub" << dendl; + } + } + return true; + }, + &t); + int tr = m_osds->store->queue_transaction(m_pg->ch, std::move(t), nullptr); + ceph_assert(tr == 0); + } + + if (has_error) { + m_pg->queue_peering_event(PGPeeringEventRef( + std::make_shared(get_osdmap_epoch(), + get_osdmap_epoch(), + PeeringState::DoRecovery()))); + } else { + m_is_repair = false; + state_clear(PG_STATE_REPAIR); + update_op_mode_text(); + } + + cleanup_on_finish(); + if (do_auto_scrub) { + request_rescrubbing(m_planned_scrub); + } + + if (m_pg->is_active() && m_pg->is_primary()) { + m_pg->recovery_state.share_pg_info(); + } +} + +void PgScrubber::on_digest_updates() +{ + dout(10) << __func__ << " #pending: " << num_digest_updates_pending << " " + << (m_end.is_max() ? 
" " : " ") + << (is_queued_or_active() ? "" : " ** not marked as scrubbing **") + << dendl; + + if (num_digest_updates_pending > 0) { + // do nothing for now. We will be called again when new updates arrive + return; + } + + // got all updates, and finished with this chunk. Any more? + if (m_end.is_max()) { + m_osds->queue_scrub_is_finished(m_pg); + } else { + // go get a new chunk (via "requeue") + preemption_data.reset(); + m_osds->queue_scrub_next_chunk(m_pg, m_pg->is_scrub_blocking_ops()); + } +} + +/* + * note that the flags-set fetched from the PG (m_pg->m_planned_scrub) + * is cleared once scrubbing starts; Some of the values dumped here are + * thus transitory. + */ +void PgScrubber::dump_scrubber(ceph::Formatter* f, + const requested_scrub_t& request_flags) const +{ + f->open_object_section("scrubber"); + + if (m_active) { // TBD replace with PR#42780's test + f->dump_bool("active", true); + dump_active_scrubber(f, state_test(PG_STATE_DEEP_SCRUB)); + } else { + f->dump_bool("active", false); + f->dump_bool("must_scrub", + (m_planned_scrub.must_scrub || m_flags.required)); + f->dump_bool("must_deep_scrub", request_flags.must_deep_scrub); + f->dump_bool("must_repair", request_flags.must_repair); + f->dump_bool("need_auto", request_flags.need_auto); + + f->dump_stream("scrub_reg_stamp") << m_scrub_job->get_sched_time(); + + // note that we are repeating logic that is coded elsewhere (currently + // PG.cc). This is not optimal. + bool deep_expected = + (ceph_clock_now() >= m_pg->next_deepscrub_interval()) || + request_flags.must_deep_scrub || request_flags.need_auto; + auto sched_state = + m_scrub_job->scheduling_state(ceph_clock_now(), deep_expected); + f->dump_string("schedule", sched_state); + } + + if (m_publish_sessions) { + f->dump_int("test_sequence", + m_sessions_counter); // an ever-increasing number used by tests + } + + f->close_section(); +} + +void PgScrubber::dump_active_scrubber(ceph::Formatter* f, bool is_deep) const +{ + f->dump_stream("epoch_start") << m_interval_start; + f->dump_stream("start") << m_start; + f->dump_stream("end") << m_end; + f->dump_stream("max_end") << m_max_end; + f->dump_stream("subset_last_update") << m_subset_last_update; + // note that m_is_deep will be set some time after PG_STATE_DEEP_SCRUB is + // asserted. Thus, using the latter. + f->dump_bool("deep", is_deep); + + // dump the scrub-type flags + f->dump_bool("req_scrub", m_flags.required); + f->dump_bool("auto_repair", m_flags.auto_repair); + f->dump_bool("check_repair", m_flags.check_repair); + f->dump_bool("deep_scrub_on_error", m_flags.deep_scrub_on_error); + f->dump_unsigned("priority", m_flags.priority); + + f->dump_int("shallow_errors", m_shallow_errors); + f->dump_int("deep_errors", m_deep_errors); + f->dump_int("fixed", m_fixed_count); + { + f->open_array_section("waiting_on_whom"); + for (const auto& p : m_maps_status.get_awaited()) { + f->dump_stream("shard") << p; + } + f->close_section(); + } + if (m_scrub_job->blocked) { + f->dump_string("schedule", "blocked"); + } else { + f->dump_string("schedule", "scrubbing"); + } +} + +pg_scrubbing_status_t PgScrubber::get_schedule() const +{ + if (!m_scrub_job) { + return pg_scrubbing_status_t{}; + } + + dout(25) << fmt::format("{}: active:{} blocked:{}", + __func__, + m_active, + m_scrub_job->blocked) + << dendl; + + auto now_is = ceph_clock_now(); + + if (m_active) { + // report current scrub info, including updated duration + + if (m_scrub_job->blocked) { + // a bug. An object is held locked. 
+ int32_t blocked_for = + (utime_t{now_is} - m_scrub_job->blocked_since).sec(); + return pg_scrubbing_status_t{ + utime_t{}, + blocked_for, + pg_scrub_sched_status_t::blocked, + true, // active + (m_is_deep ? scrub_level_t::deep : scrub_level_t::shallow), + false}; + + } else { + int32_t duration = (utime_t{now_is} - scrub_begin_stamp).sec(); + return pg_scrubbing_status_t{ + utime_t{}, + duration, + pg_scrub_sched_status_t::active, + true, // active + (m_is_deep ? scrub_level_t::deep : scrub_level_t::shallow), + false /* is periodic? unknown, actually */}; + } + } + if (m_scrub_job->state != ScrubQueue::qu_state_t::registered) { + return pg_scrubbing_status_t{utime_t{}, + 0, + pg_scrub_sched_status_t::not_queued, + false, + scrub_level_t::shallow, + false}; + } + + // Will next scrub surely be a deep one? note that deep-scrub might be + // selected even if we report a regular scrub here. + bool deep_expected = (now_is >= m_pg->next_deepscrub_interval()) || + m_planned_scrub.must_deep_scrub || + m_planned_scrub.need_auto; + scrub_level_t expected_level = + deep_expected ? scrub_level_t::deep : scrub_level_t::shallow; + bool periodic = !m_planned_scrub.must_scrub && !m_planned_scrub.need_auto && + !m_planned_scrub.must_deep_scrub; + + // are we ripe for scrubbing? + if (now_is > m_scrub_job->schedule.scheduled_at) { + // we are waiting for our turn at the OSD. + return pg_scrubbing_status_t{m_scrub_job->schedule.scheduled_at, + 0, + pg_scrub_sched_status_t::queued, + false, + expected_level, + periodic}; + } + + return pg_scrubbing_status_t{m_scrub_job->schedule.scheduled_at, + 0, + pg_scrub_sched_status_t::scheduled, + false, + expected_level, + periodic}; +} + +void PgScrubber::handle_query_state(ceph::Formatter* f) +{ + dout(15) << __func__ << dendl; + + f->open_object_section("scrub"); + f->dump_stream("scrubber.epoch_start") << m_interval_start; + f->dump_bool("scrubber.active", m_active); + f->dump_stream("scrubber.start") << m_start; + f->dump_stream("scrubber.end") << m_end; + f->dump_stream("scrubber.max_end") << m_max_end; + f->dump_stream("scrubber.subset_last_update") << m_subset_last_update; + f->dump_bool("scrubber.deep", m_is_deep); + { + f->open_array_section("scrubber.waiting_on_whom"); + for (const auto& p : m_maps_status.get_awaited()) { + f->dump_stream("shard") << p; + } + f->close_section(); + } + + f->dump_string("comment", "DEPRECATED - may be removed in the next release"); + + f->close_section(); +} + +PgScrubber::~PgScrubber() +{ + if (m_scrub_job) { + // make sure the OSD won't try to scrub this one just now + rm_from_osd_scrubbing(); + m_scrub_job.reset(); + } +} + +PgScrubber::PgScrubber(PG* pg) + : m_pg{pg} + , m_pg_id{pg->pg_id} + , m_osds{m_pg->osd} + , m_pg_whoami{pg->pg_whoami} + , m_planned_scrub{pg->get_planned_scrub(ScrubberPasskey{})} + , preemption_data{pg} +{ + m_fsm = std::make_unique(m_pg, this); + m_fsm->initiate(); + + m_scrub_job = ceph::make_ref(m_osds->cct, + m_pg->pg_id, + m_osds->get_nodeid()); +} + +void PgScrubber::set_scrub_begin_time() +{ + scrub_begin_stamp = ceph_clock_now(); + m_osds->clog->debug() << fmt::format( + "{} {} starts", + m_pg->info.pgid.pgid, + m_mode_desc); +} + +void PgScrubber::set_scrub_duration() +{ + utime_t stamp = ceph_clock_now(); + utime_t duration = stamp - scrub_begin_stamp; + m_pg->recovery_state.update_stats([=](auto& history, auto& stats) { + stats.last_scrub_duration = ceill(duration.to_msec() / 1000.0); + stats.scrub_duration = double(duration); + return true; + }); +} + +void 
PgScrubber::reserve_replicas() +{ + dout(10) << __func__ << dendl; + m_reservations.emplace( + m_pg, m_pg_whoami, m_scrub_job, m_pg->get_cct()->_conf); +} + +void PgScrubber::cleanup_on_finish() +{ + dout(10) << __func__ << dendl; + ceph_assert(m_pg->is_locked()); + + state_clear(PG_STATE_SCRUBBING); + state_clear(PG_STATE_DEEP_SCRUB); + + clear_scrub_reservations(); + requeue_waiting(); + + reset_internal_state(); + m_flags = scrub_flags_t{}; + + // type-specific state clear + _scrub_clear_state(); + // PG state flags changed: + m_pg->publish_stats_to_osd(); +} + +// uses process_event(), so must be invoked externally +void PgScrubber::scrub_clear_state() +{ + dout(10) << __func__ << dendl; + + clear_pgscrub_state(); + m_fsm->process_event(FullReset{}); +} + +/* + * note: does not access the state-machine + */ +void PgScrubber::clear_pgscrub_state() +{ + dout(10) << __func__ << dendl; + ceph_assert(m_pg->is_locked()); + + state_clear(PG_STATE_SCRUBBING); + state_clear(PG_STATE_DEEP_SCRUB); + + state_clear(PG_STATE_REPAIR); + + clear_scrub_reservations(); + requeue_waiting(); + + reset_internal_state(); + m_flags = scrub_flags_t{}; + + // type-specific state clear + _scrub_clear_state(); +} + +void PgScrubber::replica_handling_done() +{ + dout(10) << __func__ << dendl; + + state_clear(PG_STATE_SCRUBBING); + state_clear(PG_STATE_DEEP_SCRUB); + + reset_internal_state(); +} + +/* + * note: performs run_callbacks() + * note: reservations-related variables are not reset here + */ +void PgScrubber::reset_internal_state() +{ + dout(10) << __func__ << dendl; + + preemption_data.reset(); + m_maps_status.reset(); + + m_start = hobject_t{}; + m_end = hobject_t{}; + m_max_end = hobject_t{}; + m_subset_last_update = eversion_t{}; + m_shallow_errors = 0; + m_deep_errors = 0; + m_fixed_count = 0; + + run_callbacks(); + + num_digest_updates_pending = 0; + m_primary_scrubmap_pos.reset(); + replica_scrubmap = ScrubMap{}; + replica_scrubmap_pos.reset(); + m_needs_sleep = true; + m_sleep_started_at = utime_t{}; + + m_active = false; + clear_queued_or_active(); + ++m_sessions_counter; + m_be.reset(); +} + +// note that only applicable to the Replica: +void PgScrubber::advance_token() +{ + dout(10) << __func__ << " was: " << m_current_token << dendl; + m_current_token++; + + // when advance_token() is called, it is assumed that no scrubbing takes + // place. We will, though, verify that. And if we are actually still handling + // a stale request - both our internal state and the FSM state will be + // cleared. 
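+  //
+  // A self-contained sketch of the token idea (illustrative only; the names
+  // are hypothetical, while the real token type is Scrub::act_token_t and
+  // '0' means "event not stamped"):
+  //
+  //   struct session_token_t {
+  //     uint64_t current{1};
+  //     void advance() { ++current; }              // invalidate older events
+  //     bool is_current(uint64_t seen) const {
+  //       return seen == 0 || seen == current;
+  //     }
+  //   };
+  //
+  // Events stamped with an out-of-date token are recognized by
+  // is_token_current() below and are silently dropped.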
+ replica_handling_done(); + m_fsm->process_event(FullReset{}); +} + +bool PgScrubber::is_token_current(Scrub::act_token_t received_token) +{ + if (received_token == 0 || received_token == m_current_token) { + return true; + } + dout(5) << __func__ << " obsolete token (" << received_token << " vs current " + << m_current_token << dendl; + + return false; +} + +const OSDMapRef& PgScrubber::get_osdmap() const +{ + return m_pg->get_osdmap(); +} + +LoggerSinkSet& PgScrubber::get_logger() const { return *m_osds->clog.get(); } + +ostream &operator<<(ostream &out, const PgScrubber &scrubber) { + return out << scrubber.m_flags; +} + +std::ostream& PgScrubber::gen_prefix(std::ostream& out) const +{ + if (m_pg) { + return m_pg->gen_prefix(out) << "scrubber<" << m_fsm_state_name << ">: "; + } else { + return out << " scrubber [" << m_pg_id << "]: "; + } +} + +void PgScrubber::log_cluster_warning(const std::string& warning) const +{ + m_osds->clog->do_log(CLOG_WARN, warning); +} + +ostream& PgScrubber::show(ostream& out) const +{ + return out << " [ " << m_pg_id << ": " << m_flags << " ] "; +} + +int PgScrubber::asok_debug(std::string_view cmd, + std::string param, + Formatter* f, + stringstream& ss) +{ + dout(10) << __func__ << " cmd: " << cmd << " param: " << param << dendl; + + if (cmd == "block") { + // 'm_debug_blockrange' causes the next 'select_range' to report a blocked + // object + m_debug_blockrange = 10; // >1, so that will trigger fast state reports + + } else if (cmd == "unblock") { + // send an 'unblock' event, as if a blocked range was freed + m_debug_blockrange = 0; + m_fsm->process_event(Unblocked{}); + + } else if ((cmd == "set") || (cmd == "unset")) { + + if (param == "sessions") { + // set/reset the inclusion of the scrub sessions counter in 'query' output + m_publish_sessions = (cmd == "set"); + + } else if (param == "block") { + if (cmd == "set") { + // set a flag that will cause the next 'select_range' to report a + // blocked object + m_debug_blockrange = 10; // >1, so that will trigger fast state reports + } else { + // send an 'unblock' event, as if a blocked range was freed + m_debug_blockrange = 0; + m_fsm->process_event(Unblocked{}); + } + } + } + + return 0; +} + +/* + * Note: under PG lock + */ +void PgScrubber::update_scrub_stats(ceph::coarse_real_clock::time_point now_is) +{ + using clock = ceph::coarse_real_clock; + using namespace std::chrono; + + const seconds period_active = seconds(m_pg->get_cct()->_conf.get_val( + "osd_stats_update_period_scrubbing")); + if (!period_active.count()) { + // a way for the operator to disable these stats updates + return; + } + const seconds period_inactive = + seconds(m_pg->get_cct()->_conf.get_val( + "osd_stats_update_period_not_scrubbing") + + m_pg_id.pgid.m_seed % 30); + + // determine the required update period, based on our current state + auto period{period_inactive}; + if (m_active) { + period = m_debug_blockrange ? 2s : period_active; + } + + /// \todo use the date library (either the one included in Arrow or directly) + /// to get the formatting of the time_points. 
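+  //
+  // One possible shape for that TODO (illustrative only, and assuming fmt's
+  // <fmt/chrono.h> support is available in the build): the manual strftime()
+  // buffer used below could be replaced with
+  //
+  //   auto when = fmt::localtime(clock::to_time_t(m_last_stat_upd));
+  //   auto txt  = fmt::format("{:%Y-%m-%dT%T}", when);
+  //
+  // producing the same "YYYY-MM-DDThh:mm:ss"-style string.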
+ + if (g_conf()->subsys.should_gather()) { + // will only create the debug strings if required + char buf[50]; + auto printable_last = fmt::localtime(clock::to_time_t(m_last_stat_upd)); + strftime(buf, sizeof(buf), "%Y-%m-%dT%T", &printable_last); + dout(20) << fmt::format("{}: period: {}/{}-> {} last:{}", + __func__, + period_active, + period_inactive, + period, + buf) + << dendl; + } + + if (now_is - m_last_stat_upd > period) { + m_pg->publish_stats_to_osd(); + m_last_stat_upd = now_is; + } +} + + +// ///////////////////// preemption_data_t ////////////////////////////////// + +PgScrubber::preemption_data_t::preemption_data_t(PG* pg) : m_pg{pg} +{ + m_left = static_cast( + m_pg->get_cct()->_conf.get_val("osd_scrub_max_preemptions")); +} + +void PgScrubber::preemption_data_t::reset() +{ + std::lock_guard lk{m_preemption_lock}; + + m_preemptable = false; + m_preempted = false; + m_left = static_cast( + m_pg->cct->_conf.get_val("osd_scrub_max_preemptions")); + m_size_divisor = 1; +} + + +// ///////////////////// ReplicaReservations ////////////////////////////////// +namespace Scrub { + +void ReplicaReservations::release_replica(pg_shard_t peer, epoch_t epoch) +{ + auto m = new MOSDScrubReserve(spg_t(m_pg_info.pgid.pgid, peer.shard), + epoch, + MOSDScrubReserve::RELEASE, + m_pg->pg_whoami); + m_osds->send_message_osd_cluster(peer.osd, m, epoch); +} + +ReplicaReservations::ReplicaReservations( + PG* pg, + pg_shard_t whoami, + ScrubQueue::ScrubJobRef scrubjob, + const ConfigProxy& conf) + : m_pg{pg} + , m_acting_set{pg->get_actingset()} + , m_osds{m_pg->get_pg_osd(ScrubberPasskey())} + , m_pending{static_cast(m_acting_set.size()) - 1} + , m_pg_info{m_pg->get_pg_info(ScrubberPasskey())} + , m_scrub_job{scrubjob} + , m_conf{conf} +{ + epoch_t epoch = m_pg->get_osdmap_epoch(); + m_timeout = conf.get_val( + "osd_scrub_slow_reservation_response"); + m_log_msg_prefix = fmt::format( + "osd.{} ep: {} scrubber::ReplicaReservations pg[{}]: ", m_osds->whoami, + epoch, pg->pg_id); + + if (m_pending <= 0) { + // A special case of no replicas. + // just signal the scrub state-machine to continue + send_all_done(); + + } else { + // start a timer to handle the case of no replies + m_no_reply = make_unique( + m_osds, m_conf, *this, m_log_msg_prefix); + + // send the reservation requests + for (auto p : m_acting_set) { + if (p == whoami) + continue; + auto m = new MOSDScrubReserve( + spg_t(m_pg_info.pgid.pgid, p.shard), epoch, MOSDScrubReserve::REQUEST, + m_pg->pg_whoami); + m_osds->send_message_osd_cluster(p.osd, m, epoch); + m_waited_for_peers.push_back(p); + dout(10) << __func__ << ": reserve " << p.osd << dendl; + } + } +} + +void ReplicaReservations::send_all_done() +{ + // stop any pending timeout timer + m_no_reply.reset(); + m_osds->queue_for_scrub_granted(m_pg, scrub_prio_t::low_priority); +} + +void ReplicaReservations::send_reject() +{ + // stop any pending timeout timer + m_no_reply.reset(); + m_scrub_job->resources_failure = true; + m_osds->queue_for_scrub_denied(m_pg, scrub_prio_t::low_priority); +} + +void ReplicaReservations::discard_all() +{ + dout(10) << __func__ << ": " << m_reserved_peers << dendl; + + m_no_reply.reset(); + m_had_rejections = true; // preventing late-coming responses from triggering + // events + m_reserved_peers.clear(); + m_waited_for_peers.clear(); +} + +/* + * The following holds when update_latecomers() is called: + * - we are still waiting for replies from some of the replicas; + * - we might have already set a timer. If so, we should restart it. 
+ * - we might have received responses from 50% of the replicas. + */ +std::optional +ReplicaReservations::update_latecomers(tpoint_t now_is) +{ + if (m_reserved_peers.size() > m_waited_for_peers.size()) { + // at least half of the replicas have already responded. Time we flag + // latecomers. + return now_is + m_timeout; + } else { + return std::nullopt; + } +} + +ReplicaReservations::~ReplicaReservations() +{ + m_had_rejections = true; // preventing late-coming responses from triggering + // events + + // stop any pending timeout timer + m_no_reply.reset(); + + // send un-reserve messages to all reserved replicas. We do not wait for + // answer (there wouldn't be one). Other incoming messages will be discarded + // on the way, by our owner. + epoch_t epoch = m_pg->get_osdmap_epoch(); + + for (auto& p : m_reserved_peers) { + release_replica(p, epoch); + } + m_reserved_peers.clear(); + + // note: the release will follow on the heels of the request. When tried + // otherwise, grants that followed a reject arrived after the whole scrub + // machine-state was reset, causing leaked reservations. + for (auto& p : m_waited_for_peers) { + release_replica(p, epoch); + } + m_waited_for_peers.clear(); +} + +/** + * @ATTN we would not reach here if the ReplicaReservation object managed by + * the scrubber was reset. + */ +void ReplicaReservations::handle_reserve_grant(OpRequestRef op, pg_shard_t from) +{ + dout(10) << __func__ << ": granted by " << from << dendl; + op->mark_started(); + + { + // reduce the amount of extra release messages. Not a must, but the log is + // cleaner + auto w = find(m_waited_for_peers.begin(), m_waited_for_peers.end(), from); + if (w != m_waited_for_peers.end()) + m_waited_for_peers.erase(w); + } + + // are we forced to reject the reservation? + if (m_had_rejections) { + + dout(10) << __func__ << ": rejecting late-coming reservation from " << from + << dendl; + release_replica(from, m_pg->get_osdmap_epoch()); + + } else if (std::find(m_reserved_peers.begin(), + m_reserved_peers.end(), + from) != m_reserved_peers.end()) { + + dout(10) << __func__ << ": already had osd." << from << " reserved" + << dendl; + + } else { + + dout(10) << __func__ << ": osd." << from << " scrub reserve = success" + << dendl; + m_reserved_peers.push_back(from); + + // was this response late? + auto now_is = clock::now(); + if (m_timeout_point && (now_is > *m_timeout_point)) { + m_osds->clog->warn() << fmt::format( + "osd.{} scrubber pg[{}]: late reservation from osd.{}", + m_osds->whoami, + m_pg->pg_id, + from); + m_timeout_point.reset(); + } else { + // possibly set a timer to warn about late-coming reservations + m_timeout_point = update_latecomers(now_is); + } + + if (--m_pending == 0) { + send_all_done(); + } + } +} + +void ReplicaReservations::handle_reserve_reject(OpRequestRef op, + pg_shard_t from) +{ + dout(10) << __func__ << ": rejected by " << from << dendl; + dout(15) << __func__ << ": " << *op->get_req() << dendl; + op->mark_started(); + + { + // reduce the amount of extra release messages. 
Not a must, but the log is + // cleaner + auto w = find(m_waited_for_peers.begin(), m_waited_for_peers.end(), from); + if (w != m_waited_for_peers.end()) + m_waited_for_peers.erase(w); + } + + if (m_had_rejections) { + + // our failure was already handled when the first rejection arrived + dout(15) << __func__ << ": ignoring late-coming rejection from " << from + << dendl; + + } else if (std::find(m_reserved_peers.begin(), + m_reserved_peers.end(), + from) != m_reserved_peers.end()) { + + dout(10) << __func__ << ": already had osd." << from << " reserved" + << dendl; + + } else { + + dout(10) << __func__ << ": osd." << from << " scrub reserve = fail" + << dendl; + m_had_rejections = true; // preventing any additional notifications + send_reject(); + } +} + +void ReplicaReservations::handle_no_reply_timeout() +{ + dout(1) << fmt::format( + "{}: timeout! no reply from {}", __func__, m_waited_for_peers) + << dendl; + + m_had_rejections = true; // preventing any additional notifications + send_reject(); +} + +std::ostream& ReplicaReservations::gen_prefix(std::ostream& out) const +{ + return out << m_log_msg_prefix; +} + +ReplicaReservations::no_reply_t::no_reply_t( + OSDService* osds, + const ConfigProxy& conf, + ReplicaReservations& parent, + std::string_view log_prfx) + : m_osds{osds} + , m_conf{conf} + , m_parent{parent} + , m_log_prfx{log_prfx} +{ + using namespace std::chrono; + auto now_is = clock::now(); + auto timeout = + conf.get_val("osd_scrub_reservation_timeout"); + + m_abort_callback = new LambdaContext([this, now_is]([[maybe_unused]] int r) { + // behave as if a REJECT was received + m_osds->clog->warn() << fmt::format( + "{} timeout on replica reservations (since {})", m_log_prfx, now_is); + m_parent.handle_no_reply_timeout(); + }); + + std::lock_guard l(m_osds->sleep_lock); + m_osds->sleep_timer.add_event_after(timeout, m_abort_callback); +} + +ReplicaReservations::no_reply_t::~no_reply_t() +{ + std::lock_guard l(m_osds->sleep_lock); + if (m_abort_callback) { + m_osds->sleep_timer.cancel_event(m_abort_callback); + } +} + +// ///////////////////// LocalReservation ////////////////////////////////// + +// note: no dout()s in LocalReservation functions. Client logs interactions. 
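+//
+// The reservation guards below are plain RAII wrappers around the OSD-wide
+// scrub budget. A minimal std-only sketch of the same idiom (hypothetical
+// names - not the actual OSDService/ScrubQueue API):
+//
+//   struct scrub_slots_t {
+//     int in_use{0};
+//     int limit{1};
+//     bool inc() { return (in_use < limit) ? (++in_use, true) : false; }
+//     void dec() { --in_use; }
+//   };
+//
+//   class local_reservation_t {
+//     scrub_slots_t& m_slots;
+//     bool m_held;
+//    public:
+//     explicit local_reservation_t(scrub_slots_t& s)
+//         : m_slots{s}, m_held{s.inc()} {}
+//     ~local_reservation_t() { if (m_held) m_slots.dec(); }
+//     bool is_reserved() const { return m_held; }
+//   };
+//
+// A failed reservation is detected via is_reserved(), just as
+// LocalReservation signals failure by leaving its internal flag unset.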
+LocalReservation::LocalReservation(OSDService* osds) : m_osds{osds} +{ + if (m_osds->get_scrub_services().inc_scrubs_local()) { + // a failure is signalled by not having m_holding_local_reservation set + m_holding_local_reservation = true; + } +} + +LocalReservation::~LocalReservation() +{ + if (m_holding_local_reservation) { + m_holding_local_reservation = false; + m_osds->get_scrub_services().dec_scrubs_local(); + } +} + +// ///////////////////// ReservedByRemotePrimary /////////////////////////////// + +ReservedByRemotePrimary::ReservedByRemotePrimary(const PgScrubber* scrubber, + PG* pg, + OSDService* osds, + epoch_t epoch) + : m_scrubber{scrubber} + , m_pg{pg} + , m_osds{osds} + , m_reserved_at{epoch} +{ + if (!m_osds->get_scrub_services().inc_scrubs_remote()) { + dout(10) << __func__ << ": failed to reserve at Primary request" << dendl; + // the failure is signalled by not having m_reserved_by_remote_primary set + return; + } + + dout(20) << __func__ << ": scrub resources reserved at Primary request" + << dendl; + m_reserved_by_remote_primary = true; +} + +bool ReservedByRemotePrimary::is_stale() const +{ + return m_reserved_at < m_pg->get_same_interval_since(); +} + +ReservedByRemotePrimary::~ReservedByRemotePrimary() +{ + if (m_reserved_by_remote_primary) { + m_reserved_by_remote_primary = false; + m_osds->get_scrub_services().dec_scrubs_remote(); + } +} + +std::ostream& ReservedByRemotePrimary::gen_prefix(std::ostream& out) const +{ + return m_scrubber->gen_prefix(out); +} + +// ///////////////////// MapsCollectionStatus //////////////////////////////// + +auto MapsCollectionStatus::mark_arriving_map(pg_shard_t from) + -> std::tuple +{ + auto fe = + std::find(m_maps_awaited_for.begin(), m_maps_awaited_for.end(), from); + if (fe != m_maps_awaited_for.end()) { + // we are indeed waiting for a map from this replica + m_maps_awaited_for.erase(fe); + return std::tuple{true, ""sv}; + } else { + return std::tuple{false, " unsolicited scrub-map"sv}; + } +} + +void MapsCollectionStatus::reset() +{ + *this = MapsCollectionStatus{}; +} + +std::string MapsCollectionStatus::dump() const +{ + std::string all; + for (const auto& rp : m_maps_awaited_for) { + all.append(rp.get_osd() + " "s); + } + return all; +} + +ostream& operator<<(ostream& out, const MapsCollectionStatus& sf) +{ + out << " [ "; + for (const auto& rp : sf.m_maps_awaited_for) { + out << rp.get_osd() << " "; + } + if (!sf.m_local_map_ready) { + out << " local "; + } + return out << " ] "; +} + +// ///////////////////// blocked_range_t /////////////////////////////// + +blocked_range_t::blocked_range_t(OSDService* osds, + ceph::timespan waittime, + ScrubMachineListener& scrubber, + spg_t pg_id) + : m_osds{osds} + , m_scrubber{scrubber} + , m_pgid{pg_id} +{ + auto now_is = std::chrono::system_clock::now(); + m_callbk = new LambdaContext([this, now_is]([[maybe_unused]] int r) { + std::time_t now_c = std::chrono::system_clock::to_time_t(now_is); + char buf[50]; + strftime(buf, sizeof(buf), "%Y-%m-%dT%H:%M:%S", std::localtime(&now_c)); + lgeneric_subdout(g_ceph_context, osd, 10) + << "PgScrubber: " << m_pgid + << " blocked on an object for too long (since " << buf << ")" << dendl; + m_osds->clog->warn() << "osd." 
<< m_osds->whoami + << " PgScrubber: " << m_pgid + << " blocked on an object for too long (since " << buf + << ")"; + + m_warning_issued = true; + m_scrubber.set_scrub_blocked(utime_t{now_c,0}); + return; + }); + + std::lock_guard l(m_osds->sleep_lock); + m_osds->sleep_timer.add_event_after(waittime, m_callbk); +} + +blocked_range_t::~blocked_range_t() +{ + if (m_warning_issued) { + m_scrubber.clear_scrub_blocked(); + } + std::lock_guard l(m_osds->sleep_lock); + m_osds->sleep_timer.cancel_event(m_callbk); +} + +} // namespace Scrub diff --git a/src/osd/scrubber/pg_scrubber.h b/src/osd/scrubber/pg_scrubber.h new file mode 100644 index 000000000..10e08b72e --- /dev/null +++ b/src/osd/scrubber/pg_scrubber.h @@ -0,0 +1,1047 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +// clang-format off +/* + +Main Scrubber interfaces: + +┌──────────────────────────────────────────────┬────┐ +│ │ │ +│ │ │ +│ PG │ │ +│ │ │ +│ │ │ +├──────────────────────────────────────────────┘ │ +│ │ +│ PrimaryLogPG │ +└────────────────────────────────┬──────────────────┘ + │ + │ + │ ownes & uses + │ + │ + │ +┌────────────────────────────────▼──────────────────┐ +│ <> │ +└───────────────────────────▲───────────────────────┘ + │ + │ + │implements + │ + │ + │ +┌───────────────────────────┴───────────────┬───────┐ +│ │ │ +│ PgScrubber │ │ +│ │ │ +│ │ ├───────┐ +├───────────────────────────────────────────┘ │ │ +│ │ │ +│ PrimaryLogScrub │ │ +└─────┬───────────────────┬─────────────────────────┘ │ + │ │ implements + │ ownes & uses │ │ + │ │ ┌─────────────────────────▼──────┐ + │ │ │ <> │ + │ │ └─────────▲──────────────────────┘ + │ │ │ + │ │ │ + │ ▼ │ + │ ┌────────────────────────────────┴───────┐ + │ │ │ + │ │ ScrubMachine │ + │ │ │ + │ └────────────────────────────────────────┘ + │ + ┌───▼─────────────────────────────────┐ + │ │ + │ ScrubStore │ + │ │ + └─────────────────────────────────────┘ + +*/ +// clang-format on + + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "osd/PG.h" +#include "osd/scrubber_common.h" + +#include "ScrubStore.h" +#include "osd_scrub_sched.h" +#include "scrub_backend.h" +#include "scrub_machine_lstnr.h" + +namespace Scrub { +class ScrubMachine; +struct BuildMap; + +/** + * Reserving/freeing scrub resources at the replicas. + * + * When constructed - sends reservation requests to the acting_set. + * A rejection triggers a "couldn't acquire the replicas' scrub resources" + * event. All previous requests, whether already granted or not, are explicitly + * released. + * + * Timeouts: + * + * Slow-Secondary Warning: + * Once at least half of the replicas have accepted the reservation, we start + * reporting any secondary that takes too long (more than milliseconds + * after the previous response received) to respond to the reservation request. + * (Why? because we have encountered real-life situations where a specific OSD + * was systematically very slow (e.g. 5 seconds) to respond to the reservation + * requests, slowing the scrub process to a crawl). + * + * Reservation Timeout: + * We limit the total time we wait for the replicas to respond to the + * reservation request. If we don't get all the responses (either Grant or + * Reject) within milliseconds, we give up and release all the + * reservations we have acquired so far. + * (Why? because we have encountered instances where a reservation request was + * lost - either due to a bug or due to a network issue.) 
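+ *
+ * An illustrative sketch of the slow-secondary bookkeeping described above
+ * (std::chrono, hypothetical names; the real members are m_timeout_point and
+ * m_timeout):
+ *
+ *   std::optional<tpoint_t> deadline;
+ *   // once the granted peers outnumber the ones still waited for
+ *   // (i.e. at least half have answered):
+ *   if (granted_peers.size() > still_waited_for.size())
+ *     deadline = clock::now() + grace_period;
+ *   // then, on every later grant:
+ *   if (deadline && clock::now() > *deadline)
+ *     clog_warn("late reservation from osd.{}", from);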
+ * + * A note re performance: I've measured a few container alternatives for + * m_reserved_peers, with its specific usage pattern. Std::set is extremely + * slow, as expected. flat_set is only slightly better. Surprisingly - + * std::vector (with no sorting) is better than boost::small_vec. And for + * std::vector: no need to pre-reserve. + */ +class ReplicaReservations { + using clock = std::chrono::system_clock; + using tpoint_t = std::chrono::time_point; + + /// a no-reply timeout handler + struct no_reply_t { + explicit no_reply_t( + OSDService* osds, + const ConfigProxy& conf, + ReplicaReservations& parent, + std::string_view log_prfx); + + ~no_reply_t(); + OSDService* m_osds; + const ConfigProxy& m_conf; + ReplicaReservations& m_parent; + std::string m_log_prfx; + Context* m_abort_callback{nullptr}; + }; + + PG* m_pg; + std::set m_acting_set; + OSDService* m_osds; + std::vector m_waited_for_peers; + std::vector m_reserved_peers; + bool m_had_rejections{false}; + int m_pending{-1}; + const pg_info_t& m_pg_info; + ScrubQueue::ScrubJobRef m_scrub_job; ///< a ref to this PG's scrub job + const ConfigProxy& m_conf; + + // detecting slow peers (see 'slow-secondary' above) + std::chrono::milliseconds m_timeout; + std::optional m_timeout_point; + + // detecting & handling a "no show" of a replica + std::unique_ptr m_no_reply; + + void release_replica(pg_shard_t peer, epoch_t epoch); + + void send_all_done(); ///< all reservations are granted + + /// notify the scrubber that we have failed to reserve replicas' resources + void send_reject(); + + std::optional update_latecomers(tpoint_t now_is); + + public: + std::string m_log_msg_prefix; + + /** + * quietly discard all knowledge about existing reservations. No messages + * are sent to peers. + * To be used upon interval change, as we know the the running scrub is no + * longer relevant, and that the replicas had reset the reservations on + * their side. + */ + void discard_all(); + + ReplicaReservations(PG* pg, + pg_shard_t whoami, + ScrubQueue::ScrubJobRef scrubjob, + const ConfigProxy& conf); + + ~ReplicaReservations(); + + void handle_reserve_grant(OpRequestRef op, pg_shard_t from); + + void handle_reserve_reject(OpRequestRef op, pg_shard_t from); + + // if timing out on receiving replies from our replicas: + void handle_no_reply_timeout(); + + std::ostream& gen_prefix(std::ostream& out) const; +}; + +/** + * wraps the local OSD scrub resource reservation in an RAII wrapper + */ +class LocalReservation { + OSDService* m_osds; + bool m_holding_local_reservation{false}; + + public: + explicit LocalReservation(OSDService* osds); + ~LocalReservation(); + bool is_reserved() const { return m_holding_local_reservation; } +}; + +/** + * wraps the OSD resource we are using when reserved as a replica by a + * scrubbing primary. + */ +class ReservedByRemotePrimary { + const PgScrubber* m_scrubber; ///< we will be using its gen_prefix() + PG* m_pg; + OSDService* m_osds; + bool m_reserved_by_remote_primary{false}; + const epoch_t m_reserved_at; + + public: + ReservedByRemotePrimary(const PgScrubber* scrubber, + PG* pg, + OSDService* osds, + epoch_t epoch); + ~ReservedByRemotePrimary(); + [[nodiscard]] bool is_reserved() const + { + return m_reserved_by_remote_primary; + } + + /// compare the remembered reserved-at epoch to the current interval + [[nodiscard]] bool is_stale() const; + + std::ostream& gen_prefix(std::ostream& out) const; +}; + +/** + * Once all replicas' scrub maps are received, we go on to compare the maps. 
+ * That is - unless we we have not yet completed building our own scrub map. + * MapsCollectionStatus combines the status of waiting for both the local map + * and the replicas, without resorting to adding dummy entries into a list. + */ +class MapsCollectionStatus { + + bool m_local_map_ready{false}; + std::vector m_maps_awaited_for; + + public: + [[nodiscard]] bool are_all_maps_available() const + { + return m_local_map_ready && m_maps_awaited_for.empty(); + } + + void mark_local_map_ready() { m_local_map_ready = true; } + + void mark_replica_map_request(pg_shard_t from_whom) + { + m_maps_awaited_for.push_back(from_whom); + } + + /// @returns true if indeed waiting for this one. Otherwise: an error string + auto mark_arriving_map(pg_shard_t from) -> std::tuple; + + [[nodiscard]] std::vector get_awaited() const + { + return m_maps_awaited_for; + } + + void reset(); + + std::string dump() const; + + friend ostream& operator<<(ostream& out, const MapsCollectionStatus& sf); +}; + + +} // namespace Scrub + + +/** + * the scrub operation flags. Primary only. + * Set at scrub start. Checked in multiple locations - mostly + * at finish. + */ +struct scrub_flags_t { + + unsigned int priority{0}; + + /** + * set by queue_scrub() if either planned_scrub.auto_repair or + * need_auto were set. + * Tested at scrub end. + */ + bool auto_repair{false}; + + /// this flag indicates that we are scrubbing post repair to verify everything + /// is fixed + bool check_repair{false}; + + /// checked at the end of the scrub, to possibly initiate a deep-scrub + bool deep_scrub_on_error{false}; + + /** + * scrub must not be aborted. + * Set for explicitly requested scrubs, and for scrubs originated by the + * pairing process with the 'repair' flag set (in the RequestScrub event). + */ + bool required{false}; +}; + +ostream& operator<<(ostream& out, const scrub_flags_t& sf); + + +/** + * The part of PG-scrubbing code that isn't state-machine wiring. + * + * Why the separation? I wish to move to a different FSM implementation. Thus I + * am forced to strongly decouple the state-machine implementation details from + * the actual scrubbing code. + */ +class PgScrubber : public ScrubPgIF, + public ScrubMachineListener, + public ScrubBeListener { + public: + explicit PgScrubber(PG* pg); + + friend class ScrubBackend; // will be replaced by a limited interface + + // ------------------ the I/F exposed to the PG (ScrubPgIF) ------------- + + /// are we waiting for resource reservation grants form our replicas? + [[nodiscard]] bool is_reserving() const final; + + void initiate_regular_scrub(epoch_t epoch_queued) final; + + void initiate_scrub_after_repair(epoch_t epoch_queued) final; + + void send_scrub_resched(epoch_t epoch_queued) final; + + void active_pushes_notification(epoch_t epoch_queued) final; + + void update_applied_notification(epoch_t epoch_queued) final; + + void send_scrub_unblock(epoch_t epoch_queued) final; + + void digest_update_notification(epoch_t epoch_queued) final; + + void send_replica_maps_ready(epoch_t epoch_queued) final; + + void send_start_replica(epoch_t epoch_queued, Scrub::act_token_t token) final; + + void send_sched_replica(epoch_t epoch_queued, Scrub::act_token_t token) final; + + void send_replica_pushes_upd(epoch_t epoch_queued) final; + /** + * The PG has updated its 'applied version'. It might be that we are waiting + * for this information: after selecting a range of objects to scrub, we've + * marked the latest version of these objects in m_subset_last_update. 
We will + * not start the map building before we know that the PG has reached this + * version. + */ + void on_applied_when_primary(const eversion_t& applied_version) final; + + void send_full_reset(epoch_t epoch_queued) final; + + void send_chunk_free(epoch_t epoch_queued) final; + + void send_chunk_busy(epoch_t epoch_queued) final; + + void send_local_map_done(epoch_t epoch_queued) final; + + void send_get_next_chunk(epoch_t epoch_queued) final; + + void send_scrub_is_finished(epoch_t epoch_queued) final; + + /** + * we allow some number of preemptions of the scrub, which mean we do + * not block. Then we start to block. Once we start blocking, we do + * not stop until the scrub range is completed. + */ + bool write_blocked_by_scrub(const hobject_t& soid) final; + + /// true if the given range intersects the scrub interval in any way + bool range_intersects_scrub(const hobject_t& start, + const hobject_t& end) final; + + /** + * we are a replica being asked by the Primary to reserve OSD resources for + * scrubbing + */ + void handle_scrub_reserve_request(OpRequestRef op) final; + + void handle_scrub_reserve_grant(OpRequestRef op, pg_shard_t from) final; + void handle_scrub_reserve_reject(OpRequestRef op, pg_shard_t from) final; + void handle_scrub_reserve_release(OpRequestRef op) final; + void discard_replica_reservations() final; + void clear_scrub_reservations() final; // PG::clear... fwds to here + void unreserve_replicas() final; + + // managing scrub op registration + + void update_scrub_job(const requested_scrub_t& request_flags) final; + + void rm_from_osd_scrubbing() final; + + void on_primary_change( + std::string_view caller, + const requested_scrub_t& request_flags) final; + + void scrub_requested(scrub_level_t scrub_level, + scrub_type_t scrub_type, + requested_scrub_t& req_flags) final; + + /** + * Reserve local scrub resources (managed by the OSD) + * + * Fails if OSD's local-scrubs budget was exhausted + * \returns were local resources reserved? 
+ */ + bool reserve_local() final; + + void handle_query_state(ceph::Formatter* f) final; + + pg_scrubbing_status_t get_schedule() const final; + + void dump_scrubber(ceph::Formatter* f, + const requested_scrub_t& request_flags) const final; + + // used if we are a replica + + void replica_scrub_op(OpRequestRef op) final; + + /// the op priority, taken from the primary's request message + Scrub::scrub_prio_t replica_op_priority() const final + { + return m_replica_request_priority; + }; + + unsigned int scrub_requeue_priority( + Scrub::scrub_prio_t with_priority, + unsigned int suggested_priority) const final; + /// the version that refers to m_flags.priority + unsigned int scrub_requeue_priority( + Scrub::scrub_prio_t with_priority) const final; + + void add_callback(Context* context) final { m_callbacks.push_back(context); } + + [[nodiscard]] bool are_callbacks_pending() const final // used for an assert + // in PG.cc + { + return !m_callbacks.empty(); + } + + /// handle a message carrying a replica map + void map_from_replica(OpRequestRef op) final; + + void scrub_clear_state() final; + + bool is_queued_or_active() const final; + + /** + * add to scrub statistics, but only if the soid is below the scrub start + */ + void stats_of_handled_objects(const object_stat_sum_t& delta_stats, + const hobject_t& soid) override + { + ceph_assert(false); + } + + /** + * finalize the parameters of the initiated scrubbing session: + * + * The "current scrub" flags (m_flags) are set from the 'planned_scrub' + * flag-set; PG_STATE_SCRUBBING, and possibly PG_STATE_DEEP_SCRUB & + * PG_STATE_REPAIR are set. + */ + void set_op_parameters(const requested_scrub_t& request) final; + + void cleanup_store(ObjectStore::Transaction* t) final; + + bool get_store_errors(const scrub_ls_arg_t& arg, + scrub_ls_result_t& res_inout) const override + { + return false; + } + + void update_scrub_stats(ceph::coarse_real_clock::time_point now_is) final; + + int asok_debug(std::string_view cmd, + std::string param, + Formatter* f, + std::stringstream& ss) override; + int m_debug_blockrange{0}; + + // -------------------------------------------------------------------------- + // the I/F used by the state-machine (i.e. the implementation of + // ScrubMachineListener) + + [[nodiscard]] bool is_primary() const final + { + return m_pg->recovery_state.is_primary(); + } + + void set_state_name(const char* name) final + { + m_fsm_state_name = name; + } + + void select_range_n_notify() final; + + Scrub::BlockedRangeWarning acquire_blocked_alarm() final; + void set_scrub_blocked(utime_t since) final; + void clear_scrub_blocked() final; + + + /// walk the log to find the latest update that affects our chunk + eversion_t search_log_for_updates() const final; + + eversion_t get_last_update_applied() const final + { + return m_pg->recovery_state.get_last_update_applied(); + } + + int pending_active_pushes() const final { return m_pg->active_pushes; } + + void on_init() final; + void on_replica_init() final; + void replica_handling_done() final; + + /// the version of 'scrub_clear_state()' that does not try to invoke FSM + /// services (thus can be called from FSM reactions) + void clear_pgscrub_state() final; + + /* + * Send an 'InternalSchedScrub' FSM event either immediately, or - if + * 'm_need_sleep' is asserted - after a configuration-dependent timeout. 
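+   *
+   * An illustrative shape of that decision (hypothetical names; the real
+   * implementation goes through the OSD's sleep timer and scrub queue):
+   *
+   *   if (!m_needs_sleep) {
+   *     queue_event(InternalSchedScrub{});       // resume immediately
+   *   } else {
+   *     timer.add_event_after(scrub_sleep_interval, [fsm] {
+   *       fsm->process_event(InternalSchedScrub{});
+   *     });
+   *   }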
+ */ + void add_delayed_scheduling() final; + + void get_replicas_maps(bool replica_can_preempt) final; + + void on_digest_updates() final; + + void scrub_finish() final; + + ScrubMachineListener::MsgAndEpoch prep_replica_map_msg( + Scrub::PreemptionNoted was_preempted) final; + + void send_replica_map( + const ScrubMachineListener::MsgAndEpoch& preprepared) final; + + void send_preempted_replica() final; + + void send_remotes_reserved(epoch_t epoch_queued) final; + void send_reservation_failure(epoch_t epoch_queued) final; + + /** + * does the PG have newer updates than what we (the scrubber) know? + */ + [[nodiscard]] bool has_pg_marked_new_updates() const final; + + void set_subset_last_update(eversion_t e) final; + + void maps_compare_n_cleanup() final; + + Scrub::preemption_t& get_preemptor() final; + + int build_primary_map_chunk() final; + + int build_replica_map_chunk() final; + + void reserve_replicas() final; + + void set_reserving_now() final; + void clear_reserving_now() final; + + [[nodiscard]] bool was_epoch_changed() const final; + + void set_queued_or_active() final; + /// Clears `m_queued_or_active` and restarts snaptrimming + void clear_queued_or_active() final; + + void mark_local_map_ready() final; + + [[nodiscard]] bool are_all_maps_available() const final; + + std::string dump_awaited_maps() const final; + + void set_scrub_begin_time() final; + + void set_scrub_duration() final; + + utime_t scrub_begin_stamp; + std::ostream& gen_prefix(std::ostream& out) const final; + + /// facilitate scrub-backend access to SnapMapper mappings + Scrub::SnapMapReaderI& get_snap_mapper_accessor() + { + return m_pg->snap_mapper; + } + + void log_cluster_warning(const std::string& warning) const final; + + protected: + bool state_test(uint64_t m) const { return m_pg->state_test(m); } + void state_set(uint64_t m) { m_pg->state_set(m); } + void state_clear(uint64_t m) { m_pg->state_clear(m); } + + [[nodiscard]] bool is_scrub_registered() const; + + /// the 'is-in-scheduling-queue' status, using relaxed-semantics access to the + /// status + std::string_view registration_state() const; + + virtual void _scrub_clear_state() {} + + utime_t m_scrub_reg_stamp; ///< stamp we registered for + ScrubQueue::ScrubJobRef m_scrub_job; ///< the scrub-job used by the OSD to + ///< schedule us + + ostream& show(ostream& out) const override; + + public: + // ------------------ the I/F used by the ScrubBackend (ScrubBeListener) + + // note: the reason we must have these forwarders, is because of the + // artificial PG vs. PrimaryLogPG distinction. Some of the services used + // by the scrubber backend are PrimaryLog-specific. 
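+  //
+  // The pattern used by the next few members, in an illustrative std-only
+  // form (hypothetical names): the generic layer provides a stub that is
+  // never supposed to be reached, and only the PrimaryLog-specific
+  // derivative supplies a real implementation.
+  //
+  //   struct backend_listener_t {
+  //     virtual ~backend_listener_t() = default;
+  //     virtual void add_to_stats(int delta) { assert(!"PrimaryLog only"); }
+  //   };
+  //   struct primary_log_listener_t : backend_listener_t {
+  //     int stats = 0;
+  //     void add_to_stats(int delta) override { stats += delta; }
+  //   };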
+ + void add_to_stats(const object_stat_sum_t& stat) override + { + ceph_assert(0 && "expecting a PrimaryLogScrub object"); + } + + void submit_digest_fixes(const digests_fixes_t& fixes) override + { + ceph_assert(0 && "expecting a PrimaryLogScrub object"); + } + + CephContext* get_pg_cct() const final { return m_pg->cct; } + + LoggerSinkSet& get_logger() const final; + + spg_t get_pgid() const final { return m_pg->get_pgid(); } + + /// Returns reference to current osdmap + const OSDMapRef& get_osdmap() const final; + + + // --------------------------------------------------------------------------- + + friend ostream& operator<<(ostream& out, const PgScrubber& scrubber); + + static utime_t scrub_must_stamp() { return utime_t(1, 1); } + + virtual ~PgScrubber(); // must be defined separately, in the .cc file + + [[nodiscard]] bool is_scrub_active() const final { return m_active; } + + private: + void reset_internal_state(); + + /** + * the current scrubbing operation is done. We should mark that fact, so that + * all events related to the previous operation can be discarded. + */ + void advance_token(); + + bool is_token_current(Scrub::act_token_t received_token); + + void requeue_waiting() const { m_pg->requeue_ops(m_pg->waiting_for_scrub); } + + /** + * mark down some parameters of the initiated scrub: + * - the epoch when started; + * - the depth of the scrub requested (from the PG_STATE variable) + */ + void reset_epoch(epoch_t epoch_queued); + + void run_callbacks(); + + // 'query' command data for an active scrub + void dump_active_scrubber(ceph::Formatter* f, bool is_deep) const; + + // ----- methods used to verify the relevance of incoming events: + + /** + * is the incoming event still relevant and should be forwarded to the FSM? + * + * It isn't if: + * - (1) we are no longer 'actively scrubbing'; or + * - (2) the message is from an epoch prior to when we started the current + * scrub session; or + * - (3) the message epoch is from a previous interval; or + * - (4) the 'abort' configuration flags were set. + * + * For (1) & (2) - the incoming message is discarded, w/o further action. + * + * For (3): (see check_interval() for a full description) if we have not + * reacted yet to this specific new interval, we do now: + * - replica reservations are silently discarded (we count on the replicas to + * notice the interval change and un-reserve themselves); + * - the scrubbing is halted. + * + * For (4): the message will be discarded, but also: + * if this is the first time we've noticed the 'abort' request, we perform + * the abort. + * + * \returns should the incoming event be processed? + */ + bool is_message_relevant(epoch_t epoch_to_verify); + + /** + * check the 'no scrub' configuration options. + */ + [[nodiscard]] bool should_abort() const; + + /** + * Check the 'no scrub' configuration flags. + * + * Reset everything if the abort was not handled before. + * @returns false if the message was discarded due to abort flag. + */ + [[nodiscard]] bool verify_against_abort(epoch_t epoch_to_verify); + + [[nodiscard]] bool check_interval(epoch_t epoch_to_verify); + + epoch_t m_last_aborted{}; // last time we've noticed a request to abort + + bool m_needs_sleep{true}; ///< should we sleep before being rescheduled? + ///< always 'true', unless we just got out of a + ///< sleep period + + utime_t m_sleep_started_at; + + + // 'optional', as 'ReplicaReservations' & 'LocalReservation' are + // 'RAII-designed' to guarantee un-reserving when deleted. 
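+  //
+  // Illustrative sketch of the 'std::optional as an on/off RAII slot' idiom
+  // used for the members below (std-only, hypothetical type name):
+  //
+  //   struct reservation_t {
+  //     reservation_t()  { /* acquire the resource */ }
+  //     ~reservation_t() { /* guaranteed release   */ }
+  //   };
+  //   std::optional<reservation_t> slot;
+  //   slot.emplace();   // take the reservation
+  //   slot.reset();     // release it; also released when 'slot' dies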
+ std::optional m_reservations; + std::optional m_local_osd_resource; + + /// the 'remote' resource we, as a replica, grant our Primary when it is + /// scrubbing + std::optional m_remote_osd_resource; + + void cleanup_on_finish(); // scrub_clear_state() as called for a Primary when + // Active->NotActive + + protected: + PG* const m_pg; + + /** + * the derivative-specific scrub-finishing touches: + */ + virtual void _scrub_finish() {} + + // common code used by build_primary_map_chunk() and + // build_replica_map_chunk(): + int build_scrub_map_chunk(ScrubMap& map, // primary or replica? + ScrubMapBuilder& pos, + hobject_t start, + hobject_t end, + bool deep); + + std::unique_ptr m_fsm; + /// the FSM state, as a string for logging + const char* m_fsm_state_name{nullptr}; + const spg_t m_pg_id; ///< a local copy of m_pg->pg_id + OSDService* const m_osds; + const pg_shard_t m_pg_whoami; ///< a local copy of m_pg->pg_whoami; + + epoch_t m_interval_start{0}; ///< interval's 'from' of when scrubbing was + ///< first scheduled + + void repair_oinfo_oid(ScrubMap& smap); + + /* + * the exact epoch when the scrubbing actually started (started here - cleared + * checks for no-scrub conf). Incoming events are verified against this, with + * stale events discarded. + */ + epoch_t m_epoch_start{0}; ///< the actual epoch when scrubbing started + + /** + * (replica) a tag identifying a specific scrub "session". Incremented + * whenever the Primary releases the replica scrub resources. When the scrub + * session is terminated (even if the interval remains unchanged, as might + * happen following an asok no-scrub command), stale scrub-resched messages + * triggered by the backend will be discarded. + */ + Scrub::act_token_t m_current_token{1}; + + /** + * (primary/replica) a test aid. A counter that is incremented whenever a + * scrub starts, and again when it terminates. Exposed as part of the 'pg + * query' command, to be used by test scripts. + * + * @ATTN: not guaranteed to be accurate. To be only used for tests. This is + * why it is initialized to a meaningless number; + */ + int32_t m_sessions_counter{ + (int32_t)((int64_t)(this) & 0x0000'0000'00ff'fff0)}; + bool m_publish_sessions{false}; //< will the counter be part of 'query' + //output? + + scrub_flags_t m_flags; + + /// a reference to the details of the next scrub (as requested and managed by + /// the PG) + requested_scrub_t& m_planned_scrub; + + bool m_active{false}; + + /** + * a flag designed to prevent the initiation of a second scrub on a PG for + * which scrubbing has been initiated. + * + * set once scrubbing was initiated (i.e. - even before the FSM event that + * will trigger a state-change out of Inactive was handled), and only reset + * once the FSM is back in Inactive. + * In other words - its ON period encompasses: + * - the time period covered today by 'queued', and + * - the time when m_active is set, and + * - all the time from scrub_finish() calling update_stats() till the + * FSM handles the 'finished' event + * + * Compared with 'm_active', this flag is asserted earlier and remains ON for + * longer. 
+ */ + bool m_queued_or_active{false}; + + eversion_t m_subset_last_update{}; + + std::unique_ptr m_store; + + int num_digest_updates_pending{0}; + hobject_t m_start, m_end; ///< note: half-closed: [start,end) + + /// Returns epoch of current osdmap + epoch_t get_osdmap_epoch() const { return get_osdmap()->get_epoch(); } + + // collected statistics + int m_shallow_errors{0}; + int m_deep_errors{0}; + int m_fixed_count{0}; + + protected: + /** + * 'm_is_deep' - is the running scrub a deep one? + * + * Note that most of the code directly checks PG_STATE_DEEP_SCRUB, which is + * primary-only (and is set earlier - when scheduling the scrub). 'm_is_deep' + * is meaningful both for the primary and the replicas, and is used as a + * parameter when building the scrub maps. + */ + bool m_is_deep{false}; + + /** + * If set: affects the backend & scrubber-backend functions called after all + * scrub maps are available. + * + * Replaces code that directly checks PG_STATE_REPAIR (which was meant to be + * a "user facing" status display only). + */ + bool m_is_repair{false}; + + /** + * User-readable summary of the scrubber's current mode of operation. Used for + * both osd.*.log and the cluster log. + * One of: + * "repair" + * "deep-scrub", + * "scrub + * + * Note: based on PG_STATE_REPAIR, and not on m_is_repair. I.e. for + * auto_repair will show as "deep-scrub" and not as "repair" (until the first + * error is detected). + */ + std::string_view m_mode_desc; + + void update_op_mode_text(); + + private: + /** + * initiate a deep-scrub after the current scrub ended with errors. + */ + void request_rescrubbing(requested_scrub_t& req_flags); + + void unregister_from_osd(); + + /* + * Select a range of objects to scrub. + * + * By: + * - setting tentative range based on conf and divisor + * - requesting a partial list of elements from the backend; + * - handling some head/clones issues + * + * The selected range is set directly into 'm_start' and 'm_end' + */ + bool select_range(); + + std::list m_callbacks; + + /** + * send a replica (un)reservation request to the acting set + * + * @param opcode - one of MOSDScrubReserve::REQUEST + * or MOSDScrubReserve::RELEASE + */ + void message_all_replicas(int32_t opcode, std::string_view op_text); + + hobject_t m_max_end; ///< Largest end that may have been sent to replicas + ScrubMapBuilder m_primary_scrubmap_pos; + + void _request_scrub_map(pg_shard_t replica, + eversion_t version, + hobject_t start, + hobject_t end, + bool deep, + bool allow_preemption); + + + Scrub::MapsCollectionStatus m_maps_status; + + void persist_scrub_results(inconsistent_objs_t&& all_errors); + void apply_snap_mapper_fixes( + const std::vector& fix_list); + + // our latest periodic 'publish_stats_to_osd()'. Required frequency depends on + // scrub state. + ceph::coarse_real_clock::time_point m_last_stat_upd{}; + + // ------------ members used if we are a replica + + epoch_t m_replica_min_epoch; ///< the min epoch needed to handle this message + + ScrubMapBuilder replica_scrubmap_pos; + ScrubMap replica_scrubmap; + + // the backend, handling the details of comparing maps & fixing objects + std::unique_ptr m_be; + + /** + * we mark the request priority as it arrived. It influences the queuing + * priority when we wait for local updates + */ + Scrub::scrub_prio_t m_replica_request_priority; + + /** + * the 'preemption' "state-machine". 
+ * Note: I was considering an orthogonal sub-machine implementation, but as + * the state diagram is extremely simple, the added complexity wasn't + * justified. + */ + class preemption_data_t : public Scrub::preemption_t { + public: + explicit preemption_data_t(PG* pg); // the PG access is used for conf + // access (and logs) + + [[nodiscard]] bool is_preemptable() const final { return m_preemptable; } + + preemption_data_t(const preemption_data_t&) = delete; + preemption_data_t(preemption_data_t&&) = delete; + + bool do_preempt() final + { + if (m_preempted || !m_preemptable) + return false; + + std::lock_guard lk{m_preemption_lock}; + if (!m_preemptable) + return false; + + m_preempted = true; + return true; + } + + /// same as 'do_preempt()' but w/o checks (as once a replica + /// was preempted, we cannot continue) + void replica_preempted() { m_preempted = true; } + + void enable_preemption() + { + std::lock_guard lk{m_preemption_lock}; + if (are_preemptions_left() && !m_preempted) { + m_preemptable = true; + } + } + + /// used by a replica to set preemptability state according to the Primary's + /// request + void force_preemptability(bool is_allowed) + { + // note: no need to lock for a replica + m_preempted = false; + m_preemptable = is_allowed; + } + + bool disable_and_test() final + { + std::lock_guard lk{m_preemption_lock}; + m_preemptable = false; + return m_preempted; + } + + [[nodiscard]] bool was_preempted() const { return m_preempted; } + + [[nodiscard]] size_t chunk_divisor() const { return m_size_divisor; } + + void reset(); + + void adjust_parameters() final + { + std::lock_guard lk{m_preemption_lock}; + + if (m_preempted) { + m_preempted = false; + m_preemptable = adjust_left(); + } else { + m_preemptable = are_preemptions_left(); + } + } + + private: + PG* m_pg; + mutable ceph::mutex m_preemption_lock = ceph::make_mutex("preemption_lock"); + bool m_preemptable{false}; + bool m_preempted{false}; + int m_left; + size_t m_size_divisor{1}; + bool are_preemptions_left() const { return m_left > 0; } + + bool adjust_left() + { + if (m_left > 0) { + --m_left; + m_size_divisor *= 2; + } + return m_left > 0; + } + }; + + preemption_data_t preemption_data; +}; diff --git a/src/osd/scrubber/scrub_backend.cc b/src/osd/scrubber/scrub_backend.cc new file mode 100644 index 000000000..e25c5b99d --- /dev/null +++ b/src/osd/scrubber/scrub_backend.cc @@ -0,0 +1,1954 @@ +// -*- m_mode_desc:C++; tab-width:8; c-basic-offset:2; indent-tabs-m_mode_desc:t +// -*- vim: ts=2 sw=2 smarttab + +#include "./scrub_backend.h" + +#include + +#include + +#include "common/debug.h" + +#include "include/utime_fmt.h" +#include "messages/MOSDRepScrubMap.h" +#include "osd/ECUtil.h" +#include "osd/OSD.h" +#include "osd/PG.h" +#include "osd/PrimaryLogPG.h" +#include "osd/osd_types_fmt.h" + +#include "pg_scrubber.h" + +using std::set; +using std::stringstream; +using std::vector; +using namespace Scrub; +using namespace std::chrono; +using namespace std::chrono_literals; +using namespace std::literals; + +#define dout_context (m_scrubber.get_pg_cct()) +#define dout_subsys ceph_subsys_osd +#undef dout_prefix + +#define dout_prefix ScrubBackend::logger_prefix(_dout, this) + +std::ostream& ScrubBackend::logger_prefix(std::ostream* out, + const ScrubBackend* t) +{ + return t->m_scrubber.gen_prefix(*out) << " b.e.: "; +} + +// ////////////////////////////////////////////////////////////////////////// // + +// for a Primary +ScrubBackend::ScrubBackend(ScrubBeListener& scrubber, + PgScrubBeListener& pg, + pg_shard_t 
i_am, + bool repair, + scrub_level_t shallow_or_deep, + const std::set& acting) + : m_scrubber{scrubber} + , m_pg{pg} + , m_pg_whoami{i_am} + , m_repair{repair} + , m_depth{shallow_or_deep} + , m_pg_id{scrubber.get_pgid()} + , m_pool{m_pg.get_pgpool()} + , m_incomplete_clones_allowed{m_pool.info.allow_incomplete_clones()} + , m_conf{m_scrubber.get_pg_cct()->_conf} + , clog{m_scrubber.get_logger()} +{ + m_formatted_id = m_pg_id.calc_name_sring(); + + m_acting_but_me.reserve(acting.size()); + std::copy_if(acting.begin(), + acting.end(), + std::back_inserter(m_acting_but_me), + [i_am](const pg_shard_t& shard) { return shard != i_am; }); + + m_is_replicated = m_pool.info.is_replicated(); + m_mode_desc = + (m_repair ? "repair"sv + : (m_depth == scrub_level_t::deep ? "deep-scrub"sv : "scrub"sv)); +} + +// for a Replica +ScrubBackend::ScrubBackend(ScrubBeListener& scrubber, + PgScrubBeListener& pg, + pg_shard_t i_am, + bool repair, + scrub_level_t shallow_or_deep) + : m_scrubber{scrubber} + , m_pg{pg} + , m_pg_whoami{i_am} + , m_repair{repair} + , m_depth{shallow_or_deep} + , m_pg_id{scrubber.get_pgid()} + , m_pool{m_pg.get_pgpool()} + , m_conf{m_scrubber.get_pg_cct()->_conf} + , clog{m_scrubber.get_logger()} +{ + m_formatted_id = m_pg_id.calc_name_sring(); + m_is_replicated = m_pool.info.is_replicated(); + m_mode_desc = + (m_repair ? "repair"sv + : (m_depth == scrub_level_t::deep ? "deep-scrub"sv : "scrub"sv)); +} + +uint64_t ScrubBackend::logical_to_ondisk_size(uint64_t logical_size) const +{ + return m_pg.logical_to_ondisk_size(logical_size); +} + +void ScrubBackend::update_repair_status(bool should_repair) +{ + dout(15) << __func__ + << ": repair state set to :" << (should_repair ? "true" : "false") + << dendl; + m_repair = should_repair; + m_mode_desc = + (m_repair ? "repair"sv + : (m_depth == scrub_level_t::deep ? 
"deep-scrub"sv : "scrub"sv)); +} + +void ScrubBackend::new_chunk() +{ + dout(15) << __func__ << dendl; + this_chunk.emplace(m_pg_whoami); +} + +ScrubMap& ScrubBackend::get_primary_scrubmap() +{ + return this_chunk->received_maps[m_pg_whoami]; +} + +void ScrubBackend::merge_to_authoritative_set() +{ + dout(15) << __func__ << dendl; + ceph_assert(m_scrubber.is_primary()); + ceph_assert(this_chunk->authoritative_set.empty() && + "the scrubber-backend should be empty"); + + if (g_conf()->subsys.should_gather()) { + for (const auto& rpl : m_acting_but_me) { + dout(15) << fmt::format("{}: replica {} has {} items", + __func__, + rpl, + this_chunk->received_maps[rpl].objects.size()) + << dendl; + } + } + + // Construct the authoritative set of objects + for (const auto& map : this_chunk->received_maps) { + std::transform(map.second.objects.begin(), + map.second.objects.end(), + std::inserter(this_chunk->authoritative_set, + this_chunk->authoritative_set.end()), + [](const auto& i) { return i.first; }); + } +} + +ScrubMap& ScrubBackend::my_map() +{ + return this_chunk->received_maps[m_pg_whoami]; +} + +void ScrubBackend::decode_received_map(pg_shard_t from, + const MOSDRepScrubMap& msg) +{ + auto p = const_cast(msg.get_data()).cbegin(); + this_chunk->received_maps[from].decode(p, m_pool.id); + + dout(15) << __func__ << ": decoded map from : " << from + << ": versions: " << this_chunk->received_maps[from].valid_through + << " / " << msg.get_map_epoch() << dendl; +} + + +std::vector ScrubBackend::replica_clean_meta( + ScrubMap& repl_map, + bool max_reached, + const hobject_t& start, + SnapMapReaderI& snaps_getter) +{ + dout(15) << __func__ << ": REPL META # " << m_cleaned_meta_map.objects.size() + << " objects" << dendl; + ceph_assert(!m_cleaned_meta_map.objects.size()); + m_cleaned_meta_map.clear_from(start); // RRR how can this be required? 
+ m_cleaned_meta_map.insert(repl_map); + auto for_meta_scrub = clean_meta_map(m_cleaned_meta_map, max_reached); + return scan_snaps(for_meta_scrub, snaps_getter); +} + + +// ///////////////////////////////////////////////////////////////////////////// +// +// comparing the maps +// +// ///////////////////////////////////////////////////////////////////////////// + +objs_fix_list_t ScrubBackend::scrub_compare_maps( + bool max_reached, + SnapMapReaderI& snaps_getter) +{ + dout(10) << __func__ << " has maps, analyzing" << dendl; + ceph_assert(m_scrubber.is_primary()); + + // construct authoritative scrub map for type-specific scrubbing + + m_cleaned_meta_map.insert(my_map()); + merge_to_authoritative_set(); + + // collect some omap statistics into m_omap_stats + omap_checks(); + + update_authoritative(); + auto for_meta_scrub = clean_meta_map(m_cleaned_meta_map, max_reached); + + // ok, do the pg-type specific scrubbing + + // (Validates consistency of the object info and snap sets) + scrub_snapshot_metadata(for_meta_scrub); + + return objs_fix_list_t{std::move(this_chunk->m_inconsistent_objs), + scan_snaps(for_meta_scrub, snaps_getter)}; +} + +void ScrubBackend::omap_checks() +{ + const bool needs_omap_check = std::any_of( + this_chunk->received_maps.begin(), + this_chunk->received_maps.end(), + [](const auto& m) -> bool { + return m.second.has_large_omap_object_errors || m.second.has_omap_keys; + }); + + if (!needs_omap_check) { + return; // Nothing to do + } + + stringstream wss; + + // Iterate through objects and update omap stats + for (const auto& ho : this_chunk->authoritative_set) { + + for (const auto& [srd, smap] : this_chunk->received_maps) { + if (srd != m_pg_whoami) { + // Only set omap stats for the primary + continue; + } + + auto it = smap.objects.find(ho); + if (it == smap.objects.end()) { + continue; + } + + const ScrubMap::object& smap_obj = it->second; + m_omap_stats.omap_bytes += smap_obj.object_omap_bytes; + m_omap_stats.omap_keys += smap_obj.object_omap_keys; + if (smap_obj.large_omap_object_found) { + auto osdmap = m_scrubber.get_osdmap(); + pg_t pg; + osdmap->map_to_pg(ho.pool, ho.oid.name, ho.get_key(), ho.nspace, &pg); + pg_t mpg = osdmap->raw_pg_to_pg(pg); + m_omap_stats.large_omap_objects++; + wss << "Large omap object found. Object: " << ho << " PG: " << pg + << " (" << mpg << ")" + << " Key count: " << smap_obj.large_omap_object_key_count + << " Size (bytes): " << smap_obj.large_omap_object_value_size + << '\n'; + break; + } + } + } + + if (!wss.str().empty()) { + dout(5) << __func__ << ": " << wss.str() << dendl; + clog.warn(wss); + } +} + +/* + * update_authoritative() updates: + * + * - m_auth_peers: adds obj-> list of pairs < scrub-map, shard> + * + * - m_cleaned_meta_map: replaces [obj] entry with: + * the relevant object in the scrub-map of the "selected" (back-most) peer + */ +void ScrubBackend::update_authoritative() +{ + dout(10) << __func__ << dendl; + + if (m_acting_but_me.empty()) { + return; + } + + compare_smaps(); // note: might cluster-log errors + + // update the session-wide m_auth_peers with the list of good + // peers for each object (i.e. 
the ones that are in this_chunks's auth list) + for (auto& [obj, peers] : this_chunk->authoritative) { + + auth_peers_t good_peers; + + for (auto& peer : peers) { + good_peers.emplace_back(this_chunk->received_maps[peer].objects[obj], + peer); + } + + m_auth_peers.emplace(obj, std::move(good_peers)); + } + + for (const auto& [obj, peers] : this_chunk->authoritative) { + m_cleaned_meta_map.objects.erase(obj); + m_cleaned_meta_map.objects.insert( + *(this_chunk->received_maps[peers.back()].objects.find(obj))); + } +} + +int ScrubBackend::scrub_process_inconsistent() +{ + dout(20) << fmt::format("{}: {} (m_repair:{}) good peers tbl #: {}", + __func__, + m_mode_desc, + m_repair, + m_auth_peers.size()) + << dendl; + + ceph_assert(!m_auth_peers.empty()); + // authoritative only store objects which are missing or inconsistent. + + // some tests expect an error message that does not contain the __func__ and + // PG: + auto err_msg = fmt::format("{} {} {} missing, {} inconsistent objects", + m_formatted_id, + m_mode_desc, + m_missing.size(), + m_inconsistent.size()); + + dout(4) << err_msg << dendl; + clog.error() << err_msg; + + ceph_assert(m_repair); + int fixed_cnt{0}; + + for (const auto& [hobj, shrd_list] : m_auth_peers) { + + auto missing_entry = m_missing.find(hobj); + + if (missing_entry != m_missing.end()) { + repair_object(hobj, shrd_list, missing_entry->second); + fixed_cnt += missing_entry->second.size(); + } + + if (m_inconsistent.count(hobj)) { + repair_object(hobj, shrd_list, m_inconsistent[hobj]); + fixed_cnt += m_inconsistent[hobj].size(); + } + } + return fixed_cnt; +} + +void ScrubBackend::repair_object(const hobject_t& soid, + const auth_peers_t& ok_peers, + const set& bad_peers) +{ + if (g_conf()->subsys.should_gather()) { + // log the good peers + set ok_shards; // the shards from the ok_peers list + for (const auto& peer : ok_peers) { + ok_shards.insert(peer.second); + } + dout(10) << fmt::format( + "repair_object {} bad_peers osd.{{{}}}, ok_peers osd.{{{}}}", + soid, + bad_peers, + ok_shards) + << dendl; + } + + const ScrubMap::object& po = ok_peers.back().first; + + object_info_t oi; + try { + bufferlist bv; + if (po.attrs.count(OI_ATTR)) { + bv.push_back(po.attrs.find(OI_ATTR)->second); + } + auto bliter = bv.cbegin(); + decode(oi, bliter); + } catch (...) { + dout(0) << __func__ + << ": Need version of replica, bad object_info_t: " << soid + << dendl; + ceph_abort(); + } + + if (bad_peers.count(m_pg.get_primary())) { + // We should only be scrubbing if the PG is clean. + ceph_assert(!m_pg.is_waiting_for_unreadable_object()); + dout(10) << __func__ << ": primary = " << m_pg.get_primary() << dendl; + } + + // No need to pass ok_peers, they must not be missing the object, so + // force_object_missing will add them to missing_loc anyway + m_pg.force_object_missing(ScrubberPasskey{}, bad_peers, soid, oi.version); +} + + +// ///////////////////////////////////////////////////////////////////////////// +// +// components formerly of PGBackend::be_compare_scrubmaps() +// +// ///////////////////////////////////////////////////////////////////////////// + +using usable_t = shard_as_auth_t::usable_t; + + +static inline int dcount(const object_info_t& oi) +{ + return (oi.is_data_digest() ? 1 : 0) + (oi.is_omap_digest() ? 
1 : 0); +} + +auth_selection_t ScrubBackend::select_auth_object(const hobject_t& ho, + stringstream& errstream) +{ + // Create a list of shards (with the Primary first, so that it will be + // auth-copy, all other things being equal) + + /// \todo: consider sorting the candidate shards by the conditions for + /// selecting best auth source below. Then - stopping on the first one + /// that is auth eligible. + /// This creates an issue with 'digest_match' that should be handled. + std::list shards; + for (const auto& [srd, smap] : this_chunk->received_maps) { + if (srd != m_pg_whoami) { + shards.push_back(srd); + } + } + shards.push_front(m_pg_whoami); + + auth_selection_t ret_auth; + ret_auth.auth = this_chunk->received_maps.end(); + eversion_t auth_version; + + for (auto& l : shards) { + + auto shard_ret = possible_auth_shard(ho, l, ret_auth.shard_map); + + // digest_match will only be true if computed digests are the same + if (auth_version != eversion_t() && + ret_auth.auth->second.objects[ho].digest_present && + shard_ret.digest.has_value() && + ret_auth.auth->second.objects[ho].digest != *shard_ret.digest) { + + ret_auth.digest_match = false; + dout(10) << fmt::format( + "{}: digest_match = false, {} data_digest 0x{:x} != " + "data_digest 0x{:x}", + __func__, + ho, + ret_auth.auth->second.objects[ho].digest, + *shard_ret.digest) + << dendl; + } + + dout(20) + << fmt::format("{}: {} shard {} got:{:D}", __func__, ho, l, shard_ret) + << dendl; + + if (shard_ret.possible_auth == shard_as_auth_t::usable_t::not_usable) { + + // Don't use this particular shard due to previous errors + // XXX: For now we can't pick one shard for repair and another's object + // info or snapset + + ceph_assert(shard_ret.error_text.length()); + errstream << m_pg_id.pgid << " shard " << l << " soid " << ho << " : " + << shard_ret.error_text << "\n"; + + } else if (shard_ret.possible_auth == + shard_as_auth_t::usable_t::not_found) { + + // do not emit the returned error message to the log + dout(15) << fmt::format("{}: {} not found on shard {}", __func__, ho, l) + << dendl; + } else { + + dout(30) << fmt::format("{}: consider using {} srv: {} oi soid: {}", + __func__, + l, + shard_ret.oi.version, + shard_ret.oi.soid) + << dendl; + + // consider using this shard as authoritative. Is it more recent? 
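  // Illustrative sketch: the selection rule applied just below prefers a shard
  // whose object_info version is newer, and on a version tie prefers the shard
  // recording more digests (that is what dcount() above counts). A minimal,
  // self-contained rendering of that comparison; 'Candidate' is a simplified
  // stand-in, not the Ceph object_info_t:
  //
  //   struct Candidate { unsigned version; bool data_digest; bool omap_digest; };
  //   int dcount(const Candidate& c) {
  //     return (c.data_digest ? 1 : 0) + (c.omap_digest ? 1 : 0);
  //   }
  //   bool prefer(const Candidate& challenger, const Candidate& current) {
  //     return challenger.version > current.version ||
  //            (challenger.version == current.version &&
  //             dcount(challenger) > dcount(current));
  //   }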
+ + if (auth_version == eversion_t() || shard_ret.oi.version > auth_version || + (shard_ret.oi.version == auth_version && + dcount(shard_ret.oi) > dcount(ret_auth.auth_oi))) { + + dout(20) << fmt::format("{}: using {} moved auth oi {:p} <-> {:p}", + __func__, + l, + (void*)&ret_auth.auth_oi, + (void*)&shard_ret.oi) + << dendl; + + ret_auth.auth = shard_ret.auth_iter; + ret_auth.auth_shard = ret_auth.auth->first; + ret_auth.auth_oi = shard_ret.oi; + auth_version = shard_ret.oi.version; + ret_auth.is_auth_available = true; + } + } + } + + dout(10) << fmt::format("{}: selecting osd {} for obj {} with oi {}", + __func__, + ret_auth.auth_shard, + ho, + ret_auth.auth_oi) + << dendl; + + return ret_auth; +} + +using set_sinfo_err_t = void (shard_info_wrapper::*)(); + +inline static const char* sep(bool& prev_err) +{ + if (prev_err) { + return ", "; + } else { + prev_err = true; + return ""; + } +} + +// retval: should we continue with the tests +static inline bool dup_error_cond(bool& prev_err, + bool continue_on_err, + bool pred, + shard_info_wrapper& si, + set_sinfo_err_t sete, + std::string_view msg, + stringstream& errstream) +{ + if (pred) { + (si.*sete)(); + errstream << sep(prev_err) << msg; + return continue_on_err; + } + return true; +} + +/** + * calls a shard_info_wrapper function, but only if the error predicate is + * true. + * Returns a copy of the error status. + */ +static inline bool test_error_cond(bool error_pred, + shard_info_wrapper& si, + set_sinfo_err_t sete) +{ + if (error_pred) { + (si.*sete)(); + } + return error_pred; +} + +shard_as_auth_t ScrubBackend::possible_auth_shard(const hobject_t& obj, + const pg_shard_t& srd, + shard_info_map_t& shard_map) +{ + // 'maps' (originally called with this_chunk->maps): this_chunk->maps + // 'auth_oi' (called with 'auth_oi', which wasn't initialized at call site) + // - create and return + // 'shard_map' - the one created in select_auth_object() + // - used to access the 'shard_info' + + const auto j = this_chunk->received_maps.find(srd); + const auto& j_shard = j->first; + const auto& j_smap = j->second; + auto i = j_smap.objects.find(obj); + if (i == j_smap.objects.end()) { + return shard_as_auth_t{}; + } + const auto& smap_obj = i->second; + + auto& shard_info = shard_map[j_shard]; + if (j_shard == m_pg_whoami) { + shard_info.primary = true; + } + + stringstream errstream; // for this shard + + bool err{false}; + dup_error_cond(err, + true, + smap_obj.read_error, + shard_info, + &shard_info_wrapper::set_read_error, + "candidate had a read error"sv, + errstream); + dup_error_cond(err, + true, + smap_obj.ec_hash_mismatch, + shard_info, + &shard_info_wrapper::set_ec_hash_mismatch, + "candidate had an ec hash mismatch"sv, + errstream); + dup_error_cond(err, + true, + smap_obj.ec_size_mismatch, + shard_info, + &shard_info_wrapper::set_ec_size_mismatch, + "candidate had an ec size mismatch"sv, + errstream); + + if (!dup_error_cond(err, + false, + smap_obj.stat_error, + shard_info, + &shard_info_wrapper::set_stat_error, + "candidate had a stat error"sv, + errstream)) { + // With stat_error no further checking + // We don't need to also see a missing_object_info_attr + return shard_as_auth_t{errstream.str()}; + } + + // We won't pick an auth copy if the snapset is missing or won't decode. 
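  // Illustrative sketch: dup_error_cond() (used for the checks above and below)
  // folds three steps into one call - set the error flag on the shard, append a
  // separator-aware message, and tell the caller whether to keep checking. Its
  // return value reads as "safe to continue": true when the predicate did not
  // fire, and 'continue_on_err' when it did. A condensed stand-in of the same
  // control flow, with a plain std::string in place of the stream and wrapper:
  //
  //   #include <string>
  //   bool check(bool& prev_err, bool continue_on_err, bool bad,
  //              std::string& errs, const char* msg) {
  //     if (bad) {
  //       errs += (prev_err ? ", " : "");   // same job as sep(prev_err)
  //       prev_err = true;
  //       errs += msg;
  //       return continue_on_err;           // false tells the caller to bail out
  //     }
  //     return true;                        // no error: always keep going
  //   }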
+ ceph_assert(!obj.is_snapdir()); + + if (obj.is_head()) { + auto k = smap_obj.attrs.find(SS_ATTR); + if (dup_error_cond(err, + false, + (k == smap_obj.attrs.end()), + shard_info, + &shard_info_wrapper::set_snapset_missing, + "candidate had a missing snapset key"sv, + errstream)) { + bufferlist ss_bl; + SnapSet snapset; + ss_bl.push_back(k->second); + try { + auto bliter = ss_bl.cbegin(); + decode(snapset, bliter); + } catch (...) { + // invalid snapset, probably corrupt + dup_error_cond(err, + false, + true, + shard_info, + &shard_info_wrapper::set_snapset_corrupted, + "candidate had a corrupt snapset"sv, + errstream); + } + } else { + // debug@dev only + dout(30) << fmt::format( + "{} missing snap addr: {:p} shard_info: {:p} er: {:x}", + __func__, + (void*)&smap_obj, + (void*)&shard_info, + shard_info.errors) + << dendl; + } + } + + if (!m_is_replicated) { + auto k = smap_obj.attrs.find(ECUtil::get_hinfo_key()); + if (dup_error_cond(err, + false, + (k == smap_obj.attrs.end()), + shard_info, + &shard_info_wrapper::set_hinfo_missing, + "candidate had a missing hinfo key"sv, + errstream)) { + bufferlist hk_bl; + ECUtil::HashInfo hi; + hk_bl.push_back(k->second); + try { + auto bliter = hk_bl.cbegin(); + decode(hi, bliter); + } catch (...) { + dup_error_cond(err, + false, + true, + shard_info, + &shard_info_wrapper::set_hinfo_corrupted, + "candidate had a corrupt hinfo"sv, + errstream); + } + } + } + + object_info_t oi; + + { + auto k = smap_obj.attrs.find(OI_ATTR); + if (!dup_error_cond(err, + false, + (k == smap_obj.attrs.end()), + shard_info, + &shard_info_wrapper::set_info_missing, + "candidate had a missing info key"sv, + errstream)) { + // no object info on object, probably corrupt + return shard_as_auth_t{errstream.str()}; + } + + bufferlist bl; + bl.push_back(k->second); + try { + auto bliter = bl.cbegin(); + decode(oi, bliter); + } catch (...) 
{ + // invalid object info, probably corrupt + if (!dup_error_cond(err, + false, + true, + shard_info, + &shard_info_wrapper::set_info_corrupted, + "candidate had a corrupt info"sv, + errstream)) { + return shard_as_auth_t{errstream.str()}; + } + } + } + + // This is automatically corrected in repair_oinfo_oid() + ceph_assert(oi.soid == obj); + + if (test_error_cond(smap_obj.size != logical_to_ondisk_size(oi.size), + shard_info, + &shard_info_wrapper::set_obj_size_info_mismatch)) { + + errstream << sep(err) << "candidate size " << smap_obj.size << " info size " + << logical_to_ondisk_size(oi.size) << " mismatch"; + } + + std::optional digest; + if (smap_obj.digest_present) { + digest = smap_obj.digest; + } + + if (shard_info.errors) { + ceph_assert(err); + return shard_as_auth_t{errstream.str(), digest}; + } + + ceph_assert(!err); + // note that the error text is made available to the caller, even + // for a successful shard selection + return shard_as_auth_t{oi, j, errstream.str(), digest}; +} + +// re-implementation of PGBackend::be_compare_scrubmaps() +void ScrubBackend::compare_smaps() +{ + dout(10) << __func__ + << ": authoritative-set #: " << this_chunk->authoritative_set.size() + << dendl; + + std::for_each(this_chunk->authoritative_set.begin(), + this_chunk->authoritative_set.end(), + [this](const auto& ho) { + if (auto maybe_clust_err = compare_obj_in_maps(ho); + maybe_clust_err) { + clog.error() << *maybe_clust_err; + } + }); +} + +std::optional ScrubBackend::compare_obj_in_maps( + const hobject_t& ho) +{ + // clear per-object data: + this_chunk->cur_inconsistent.clear(); + this_chunk->cur_missing.clear(); + this_chunk->fix_digest = false; + + stringstream candidates_errors; + auto auth_res = select_auth_object(ho, candidates_errors); + if (candidates_errors.str().size()) { + // a collection of shard-specific errors detected while + // finding the best shard to serve as authoritative + clog.error() << candidates_errors.str(); + } + + inconsistent_obj_wrapper object_error{ho}; + if (!auth_res.is_auth_available) { + // no auth selected + object_error.set_version(0); + object_error.set_auth_missing(ho, + this_chunk->received_maps, + auth_res.shard_map, + this_chunk->m_error_counts.shallow_errors, + this_chunk->m_error_counts.deep_errors, + m_pg_whoami); + + if (object_error.has_deep_errors()) { + this_chunk->m_error_counts.deep_errors++; + } else if (object_error.has_shallow_errors()) { + this_chunk->m_error_counts.shallow_errors++; + } + + this_chunk->m_inconsistent_objs.push_back(std::move(object_error)); + return fmt::format("{} soid {} : failed to pick suitable object info\n", + m_scrubber.get_pgid().pgid, + ho); + } + + stringstream errstream; + auto& auth = auth_res.auth; + + // an auth source was selected + + object_error.set_version(auth_res.auth_oi.user_version); + ScrubMap::object& auth_object = auth->second.objects[ho]; + ceph_assert(!this_chunk->fix_digest); + + auto [auths, objerrs] = + match_in_shards(ho, auth_res, object_error, errstream); + + auto opt_ers = + for_empty_auth_list(std::move(auths), + std::move(objerrs), + auth, + ho, + errstream); + + if (opt_ers.has_value()) { + + // At this point auth_list is populated, so we add the object error + // shards as inconsistent. 
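  // Illustrative sketch: for_empty_auth_list() (defined below) backstops the
  // case where no shard made it onto the clean auth list but some shards carry
  // only object-level errors: one of those shards - preferring the already
  // selected auth shard - is promoted so that a repair source still exists.
  // A reduced stand-in of that promotion, with int shard ids instead of
  // pg_shard_t:
  //
  //   #include <list>
  //   #include <set>
  //   void promote_if_empty(std::list<int>& auths, std::set<int>& errs,
  //                         int auth_shard) {
  //     if (!auths.empty() || errs.empty()) {
  //       return;                                  // nothing to do
  //     }
  //     int pick = errs.count(auth_shard) ? auth_shard : *errs.begin();
  //     auths.push_back(pick);                     // best-effort auth source
  //     errs.erase(pick);
  //   }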
+ inconsistents(ho, + auth_object, + auth_res.auth_oi, + std::move(*opt_ers), + errstream); + } else { + + // both the auth & errs containers are empty + errstream << m_pg_id << " soid " << ho << " : empty auth list\n"; + } + + if (object_error.has_deep_errors()) { + this_chunk->m_error_counts.deep_errors++; + } else if (object_error.has_shallow_errors()) { + this_chunk->m_error_counts.shallow_errors++; + } + + if (object_error.errors || object_error.union_shards.errors) { + this_chunk->m_inconsistent_objs.push_back(std::move(object_error)); + } + + if (errstream.str().empty()) { + return std::nullopt; + } else { + return errstream.str(); + } +} + + +std::optional +ScrubBackend::for_empty_auth_list(std::list&& auths, + std::set&& obj_errors, + shard_to_scrubmap_t::iterator auth, + const hobject_t& ho, + stringstream& errstream) +{ + if (auths.empty()) { + if (obj_errors.empty()) { + errstream << m_pg_id << " soid " << ho + << " : failed to pick suitable auth object\n"; + return std::nullopt; + } + // Object errors exist and nothing in auth_list + // Prefer the auth shard, otherwise take first from list. + pg_shard_t shard; + if (obj_errors.count(auth->first)) { + shard = auth->first; + } else { + shard = *(obj_errors.begin()); + } + + auths.push_back(shard); + obj_errors.erase(shard); + } + + return ScrubBackend::auth_and_obj_errs_t{std::move(auths), + std::move(obj_errors)}; +} + + +/// \todo replace the errstream with a member of this_chunk. Better be a +/// fmt::buffer. Then - we can use it directly in should_fix_digest() +void ScrubBackend::inconsistents(const hobject_t& ho, + ScrubMap::object& auth_object, + object_info_t& auth_oi, + auth_and_obj_errs_t&& auth_n_errs, + stringstream& errstream) +{ + auto& object_errors = auth_n_errs.object_errors; + auto& auth_list = auth_n_errs.auth_list; + + this_chunk->cur_inconsistent.insert(object_errors.begin(), + object_errors.end()); // merge? 
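  // Illustrative sketch: the per-object sets filled here (cur_missing and
  // cur_inconsistent) are folded just below into the session-wide per-object
  // maps that scrub_process_inconsistent() walks later. A reduced stand-in of
  // that fold, with std::string object names and int shard ids:
  //
  //   #include <map>
  //   #include <set>
  //   #include <string>
  //   std::map<std::string, std::set<int>> missing, inconsistent;
  //   void fold(const std::string& obj, const std::set<int>& cur_missing,
  //             const std::set<int>& cur_inconsistent) {
  //     if (!cur_missing.empty())      missing[obj] = cur_missing;
  //     if (!cur_inconsistent.empty()) inconsistent[obj] = cur_inconsistent;
  //   }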
+ + dout(15) << fmt::format( + "{}: object errors #: {} auth list #: {} cur_missing #: {} " + "cur_incon #: {}", + __func__, + object_errors.size(), + auth_list.size(), + this_chunk->cur_missing.size(), + this_chunk->cur_inconsistent.size()) + << dendl; + + + if (!this_chunk->cur_missing.empty()) { + m_missing[ho] = this_chunk->cur_missing; + } + if (!this_chunk->cur_inconsistent.empty()) { + m_inconsistent[ho] = this_chunk->cur_inconsistent; + } + + if (this_chunk->fix_digest) { + + ceph_assert(auth_object.digest_present); + std::optional data_digest{auth_object.digest}; + + std::optional omap_digest; + if (auth_object.omap_digest_present) { + omap_digest = auth_object.omap_digest; + } + this_chunk->missing_digest.push_back( + make_pair(ho, make_pair(data_digest, omap_digest))); + } + + if (!this_chunk->cur_inconsistent.empty() || + !this_chunk->cur_missing.empty()) { + + this_chunk->authoritative[ho] = auth_list; + + } else if (!this_chunk->fix_digest && m_is_replicated) { + + auto is_to_fix = + should_fix_digest(ho, auth_object, auth_oi, m_repair, errstream); + + switch (is_to_fix) { + + case digest_fixing_t::no: + break; + + case digest_fixing_t::if_aged: { + utime_t age = this_chunk->started - auth_oi.local_mtime; + + // \todo find out 'age_limit' only once + const auto age_limit = m_conf->osd_deep_scrub_update_digest_min_age; + + if (age <= age_limit) { + dout(20) << __func__ << ": missing digest but age (" << age + << ") < conf (" << age_limit << ") on " << ho << dendl; + break; + } + } + + [[fallthrough]]; + + case digest_fixing_t::force: + + std::optional data_digest; + if (auth_object.digest_present) { + data_digest = auth_object.digest; + dout(20) << __func__ << ": will update data digest on " << ho + << dendl; + } + + std::optional omap_digest; + if (auth_object.omap_digest_present) { + omap_digest = auth_object.omap_digest; + dout(20) << __func__ << ": will update omap digest on " << ho + << dendl; + } + this_chunk->missing_digest.push_back( + make_pair(ho, make_pair(data_digest, omap_digest))); + break; + } + } +} + +/// \todo consider changing to use format() and to return the strings +ScrubBackend::digest_fixing_t ScrubBackend::should_fix_digest( + const hobject_t& ho, + const ScrubMap::object& auth_object, + const object_info_t& auth_oi, + bool repair_flag, + stringstream& errstream) +{ + digest_fixing_t update{digest_fixing_t::no}; + + if (auth_object.digest_present && !auth_oi.is_data_digest()) { + dout(15) << __func__ << " missing data digest on " << ho << dendl; + update = digest_fixing_t::if_aged; + } + + if (auth_object.omap_digest_present && !auth_oi.is_omap_digest()) { + dout(15) << __func__ << " missing omap digest on " << ho << dendl; + update = digest_fixing_t::if_aged; + } + + // recorded digest != actual digest? 
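  // Illustrative sketch: the policy this function implements, condensed - a
  // digest missing from the recorded object_info asks for an age-gated update
  // ('if_aged'), while a recorded digest contradicting the on-disk one is only
  // rewritten on a repair pass ('force'). A stand-alone rendering of the same
  // decision; the plain unsigned values are stand-ins for the digests:
  //
  //   enum class fix_t { no, if_aged, force };
  //   fix_t decide(bool recorded, bool on_disk, unsigned recorded_val,
  //                unsigned on_disk_val, bool repair) {
  //     fix_t out = fix_t::no;
  //     if (on_disk && !recorded) out = fix_t::if_aged;    // nothing recorded yet
  //     if (recorded && on_disk && recorded_val != on_disk_val && repair)
  //       out = fix_t::force;                              // recorded value is wrong
  //     return out;
  //   }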
+ if (auth_oi.is_data_digest() && auth_object.digest_present && + auth_oi.data_digest != auth_object.digest) { + errstream << m_pg_id << " recorded data digest 0x" << std::hex + << auth_oi.data_digest << " != on disk 0x" << auth_object.digest + << std::dec << " on " << auth_oi.soid << "\n"; + if (repair_flag) + update = digest_fixing_t::force; + } + + if (auth_oi.is_omap_digest() && auth_object.omap_digest_present && + auth_oi.omap_digest != auth_object.omap_digest) { + errstream << m_pg_id << " recorded omap digest 0x" << std::hex + << auth_oi.omap_digest << " != on disk 0x" + << auth_object.omap_digest << std::dec << " on " << auth_oi.soid + << "\n"; + if (repair_flag) + update = digest_fixing_t::force; + } + + return update; +} + +ScrubBackend::auth_and_obj_errs_t ScrubBackend::match_in_shards( + const hobject_t& ho, + auth_selection_t& auth_sel, + inconsistent_obj_wrapper& obj_result, + stringstream& errstream) +{ + std::list auth_list; // out "param" to + std::set object_errors; // be returned + + for (auto& [srd, smap] : this_chunk->received_maps) { + + if (srd == auth_sel.auth_shard) { + auth_sel.shard_map[auth_sel.auth_shard].selected_oi = true; + } + + if (smap.objects.count(ho)) { + + // the scrub-map has our object + auth_sel.shard_map[srd].set_object(smap.objects[ho]); + + // Compare + stringstream ss; + const auto& auth_object = auth_sel.auth->second.objects[ho]; + const bool discrep_found = compare_obj_details(auth_sel.auth_shard, + auth_object, + auth_sel.auth_oi, + smap.objects[ho], + auth_sel.shard_map[srd], + obj_result, + ss, + ho.has_snapset()); + + dout(20) << fmt::format( + "{}: {}{} <{}:{}> shards: {} {} {}", __func__, + (m_repair ? "repair " : ""), + (m_is_replicated ? "replicated " : ""), srd, + (srd == auth_sel.auth_shard ? "auth" : "-"), + auth_sel.shard_map.size(), + (auth_sel.digest_match ? " digest_match " : " "), + (auth_sel.shard_map[srd].only_data_digest_mismatch_info() + ? "'info mismatch info'" + : "")) + << dendl; + if (discrep_found) { + dout(10) << fmt::format( + "{}: <{}> auth:{} ({}/{}) vs {} ({}/{}) {}", __func__, ho, + auth_sel.auth_shard, auth_object.omap_digest_present, + auth_object.omap_digest, srd, + smap.objects[ho].omap_digest_present ? 
true : false, + smap.objects[ho].omap_digest, ss.str()) + << dendl; + } + + // If all replicas match, but they don't match object_info we can + // repair it by using missing_digest mechanism + if (m_repair && m_is_replicated && (srd == auth_sel.auth_shard) && + auth_sel.shard_map.size() > 1 && auth_sel.digest_match && + auth_sel.shard_map[srd].only_data_digest_mismatch_info() && + auth_object.digest_present) { + // Set in missing_digests + this_chunk->fix_digest = true; + // Clear the error + auth_sel.shard_map[srd].clear_data_digest_mismatch_info(); + errstream << m_pg_id << " soid " << ho + << " : repairing object info data_digest" + << "\n"; + } + + // Some errors might have already been set in select_auth_object() + if (auth_sel.shard_map[srd].errors != 0) { + + this_chunk->cur_inconsistent.insert(srd); + if (auth_sel.shard_map[srd].has_deep_errors()) { + this_chunk->m_error_counts.deep_errors++; + } else { + this_chunk->m_error_counts.shallow_errors++; + } + + if (discrep_found) { + // Only true if compare_obj_details() found errors and put something + // in ss + errstream << m_pg_id << " shard " << srd << " soid " << ho << " : " + << ss.str() << "\n"; + } + + } else if (discrep_found) { + + // Track possible shards to use as authoritative, if needed + + // There are errors, without identifying the shard + object_errors.insert(srd); + errstream << m_pg_id << " soid " << ho << " : " << ss.str() << "\n"; + + } else { + + // XXX: The auth shard might get here that we don't know + // that it has the "correct" data. + auth_list.push_back(srd); + } + + } else { + + this_chunk->cur_missing.insert(srd); + auth_sel.shard_map[srd].set_missing(); + auth_sel.shard_map[srd].primary = (srd == m_pg_whoami); + + // Can't have any other errors if there is no information available + this_chunk->m_error_counts.shallow_errors++; + errstream << m_pg_id << " shard " << srd << " " << ho << " : missing\n"; + } + obj_result.add_shard(srd, auth_sel.shard_map[srd]); + + dout(20) << __func__ << ": (debug) soid " << ho << " : " << errstream.str() + << dendl; + } + + dout(15) << fmt::format("{}: auth_list: {} #: {}; obj-errs#: {}", + __func__, + auth_list, + auth_list.size(), + object_errors.size()) + << dendl; + return {auth_list, object_errors}; +} + +// == PGBackend::be_compare_scrub_objects() +bool ScrubBackend::compare_obj_details(pg_shard_t auth_shard, + const ScrubMap::object& auth, + const object_info_t& auth_oi, + const ScrubMap::object& candidate, + shard_info_wrapper& shard_result, + inconsistent_obj_wrapper& obj_result, + stringstream& errstream, + bool has_snapset) +{ + fmt::memory_buffer out; + bool error{false}; + + // ------------------------------------------------------------------------ + + if (auth.digest_present && candidate.digest_present && + auth.digest != candidate.digest) { + fmt::format_to(std::back_inserter(out), + "data_digest {:#x} != data_digest {:#x} from shard {}", + candidate.digest, + auth.digest, + auth_shard); + error = true; + obj_result.set_data_digest_mismatch(); + } + + if (auth.omap_digest_present && candidate.omap_digest_present && + auth.omap_digest != candidate.omap_digest) { + fmt::format_to(std::back_inserter(out), + "{}omap_digest {:#x} != omap_digest {:#x} from shard {}", + sep(error), + candidate.omap_digest, + auth.omap_digest, + auth_shard); + obj_result.set_omap_digest_mismatch(); + } + + // for replicated: + if (m_is_replicated) { + if (auth_oi.is_data_digest() && candidate.digest_present && + auth_oi.data_digest != candidate.digest) { + 
fmt::format_to(std::back_inserter(out), + "{}data_digest {:#x} != data_digest {:#x} from auth oi {}", + sep(error), + candidate.digest, + auth_oi.data_digest, + auth_oi); + shard_result.set_data_digest_mismatch_info(); + } + + // for replicated: + if (auth_oi.is_omap_digest() && candidate.omap_digest_present && + auth_oi.omap_digest != candidate.omap_digest) { + fmt::format_to(std::back_inserter(out), + "{}omap_digest {:#x} != omap_digest {:#x} from auth oi {}", + sep(error), + candidate.omap_digest, + auth_oi.omap_digest, + auth_oi); + shard_result.set_omap_digest_mismatch_info(); + } + } + + // ------------------------------------------------------------------------ + + if (candidate.stat_error) { + if (error) { + errstream << fmt::to_string(out); + } + return error; + } + + // ------------------------------------------------------------------------ + + if (!shard_result.has_info_missing() && !shard_result.has_info_corrupted()) { + + auto can_attr = candidate.attrs.find(OI_ATTR); + ceph_assert(can_attr != candidate.attrs.end()); + bufferlist can_bl; + can_bl.push_back(can_attr->second); + + auto auth_attr = auth.attrs.find(OI_ATTR); + ceph_assert(auth_attr != auth.attrs.end()); + bufferlist auth_bl; + auth_bl.push_back(auth_attr->second); + + if (!can_bl.contents_equal(auth_bl)) { + fmt::format_to(std::back_inserter(out), + "{}object info inconsistent ", + sep(error)); + obj_result.set_object_info_inconsistency(); + } + } + + if (has_snapset) { + if (!shard_result.has_snapset_missing() && + !shard_result.has_snapset_corrupted()) { + + auto can_attr = candidate.attrs.find(SS_ATTR); + ceph_assert(can_attr != candidate.attrs.end()); + bufferlist can_bl; + can_bl.push_back(can_attr->second); + + auto auth_attr = auth.attrs.find(SS_ATTR); + ceph_assert(auth_attr != auth.attrs.end()); + bufferlist auth_bl; + auth_bl.push_back(auth_attr->second); + + if (!can_bl.contents_equal(auth_bl)) { + fmt::format_to(std::back_inserter(out), + "{}snapset inconsistent ", + sep(error)); + obj_result.set_snapset_inconsistency(); + } + } + } + + // ------------------------------------------------------------------------ + + if (!m_is_replicated) { + if (!shard_result.has_hinfo_missing() && + !shard_result.has_hinfo_corrupted()) { + + auto can_hi = candidate.attrs.find(ECUtil::get_hinfo_key()); + ceph_assert(can_hi != candidate.attrs.end()); + bufferlist can_bl; + can_bl.push_back(can_hi->second); + + auto auth_hi = auth.attrs.find(ECUtil::get_hinfo_key()); + ceph_assert(auth_hi != auth.attrs.end()); + bufferlist auth_bl; + auth_bl.push_back(auth_hi->second); + + if (!can_bl.contents_equal(auth_bl)) { + fmt::format_to(std::back_inserter(out), + "{}hinfo inconsistent ", + sep(error)); + obj_result.set_hinfo_inconsistency(); + } + } + } + + // ------------------------------------------------------------------------ + + // sizes: + + uint64_t oi_size = logical_to_ondisk_size(auth_oi.size); + if (oi_size != candidate.size) { + fmt::format_to(std::back_inserter(out), + "{}size {} != size {} from auth oi {}", + sep(error), + candidate.size, + oi_size, + auth_oi); + shard_result.set_size_mismatch_info(); + } + + if (auth.size != candidate.size) { + fmt::format_to(std::back_inserter(out), + "{}size {} != size {} from shard {}", + sep(error), + candidate.size, + auth.size, + auth_shard); + obj_result.set_size_mismatch(); + } + + // If the replica is too large and we didn't already count it for this object + + if (candidate.size > m_conf->osd_max_object_size && + !obj_result.has_size_too_large()) { + + 
fmt::format_to(std::back_inserter(out), + "{}size {} > {} is too large", + sep(error), + candidate.size, + m_conf->osd_max_object_size); + obj_result.set_size_too_large(); + } + + // ------------------------------------------------------------------------ + + // comparing the attributes: + + for (const auto& [k, v] : auth.attrs) { + if (k == OI_ATTR || k[0] != '_') { + // We check system keys separately + continue; + } + + auto cand = candidate.attrs.find(k); + if (cand == candidate.attrs.end()) { + fmt::format_to(std::back_inserter(out), + "{}attr name mismatch '{}'", + sep(error), + k); + obj_result.set_attr_name_mismatch(); + } else if (cand->second.cmp(v)) { + fmt::format_to(std::back_inserter(out), + "{}attr value mismatch '{}'", + sep(error), + k); + obj_result.set_attr_value_mismatch(); + } + } + + for (const auto& [k, v] : candidate.attrs) { + if (k == OI_ATTR || k[0] != '_') { + // We check system keys separately + continue; + } + + auto in_auth = auth.attrs.find(k); + if (in_auth == auth.attrs.end()) { + fmt::format_to(std::back_inserter(out), + "{}attr name mismatch '{}'", + sep(error), + k); + obj_result.set_attr_name_mismatch(); + } + } + + if (error) { + errstream << fmt::to_string(out); + } + return error; +} + +static inline bool doing_clones( + const std::optional& snapset, + const vector::reverse_iterator& curclone) +{ + return snapset && curclone != snapset->clones.rend(); +} + +// ///////////////////////////////////////////////////////////////////////////// +// +// final checking & fixing - scrub_snapshot_metadata() +// +// ///////////////////////////////////////////////////////////////////////////// + +/* + * Validate consistency of the object info and snap sets. + * + * We are sort of comparing 2 lists. The main loop is on objmap.objects. But + * the comparison of the objects is against multiple snapset.clones. There are + * multiple clone lists and in between lists we expect head. + * + * Example + * + * objects expected + * ======= ======= + * obj1 snap 1 head, unexpected obj1 snap 1 + * obj2 head head, match + * [SnapSet clones 6 4 2 1] + * obj2 snap 7 obj2 snap 6, unexpected obj2 snap 7 + * obj2 snap 6 obj2 snap 6, match + * obj2 snap 4 obj2 snap 4, match + * obj3 head obj2 snap 2 (expected), obj2 snap 1 (expected), match + * [Snapset clones 3 1] + * obj3 snap 3 obj3 snap 3 match + * obj3 snap 1 obj3 snap 1 match + * obj4 head head, match + * [Snapset clones 4] + * EOL obj4 snap 4, (expected) + */ +void ScrubBackend::scrub_snapshot_metadata(ScrubMap& map) +{ + dout(10) << __func__ << " num stat obj " + << m_pg.get_pg_info(ScrubberPasskey{}).stats.stats.sum.num_objects + << dendl; + + std::optional all_clones; // Unspecified snapid_t or std::nullopt + + // traverse in reverse order. + std::optional head; + std::optional snapset; // If initialized so will head (above) + vector::reverse_iterator + curclone; // Defined only if snapset initialized + int missing = 0; + inconsistent_snapset_wrapper soid_error, head_error; + int soid_error_count = 0; + + for (auto p = map.objects.rbegin(); p != map.objects.rend(); ++p) { + + const hobject_t& soid = p->first; + ceph_assert(!soid.is_snapdir()); + soid_error = inconsistent_snapset_wrapper{soid}; + object_stat_sum_t stat; + + stat.num_objects++; + + if (soid.nspace == m_conf->osd_hit_set_namespace) + stat.num_objects_hit_set_archive++; + + if (soid.is_snap()) { + // it's a clone + stat.num_object_clones++; + } + + // basic checks. 
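  // Illustrative sketch: the attribute checks below decode into std::optional,
  // leave it empty on a missing or corrupt attribute, count a shallow error,
  // and let later checks simply skip whatever needs the decoded value. A
  // reduced stand-in; parsing via std::stoul replaces the bufferlist decode:
  //
  //   #include <optional>
  //   #include <string>
  //   struct Info { unsigned long size; };
  //   std::optional<Info> try_decode(const std::string* raw, int& shallow_errors) {
  //     if (!raw) { ++shallow_errors; return std::nullopt; }   // attr missing
  //     try {
  //       return Info{std::stoul(*raw)};
  //     } catch (...) {
  //       ++shallow_errors;                                    // attr corrupt
  //       return std::nullopt;
  //     }
  //   }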
+ std::optional oi; + if (!p->second.attrs.count(OI_ATTR)) { + oi = std::nullopt; + clog.error() << m_mode_desc << " " << m_pg_id << " " << soid + << " : no '" << OI_ATTR << "' attr"; + this_chunk->m_error_counts.shallow_errors++; + soid_error.set_info_missing(); + } else { + bufferlist bv; + bv.push_back(p->second.attrs[OI_ATTR]); + try { + oi = object_info_t(bv); + } catch (ceph::buffer::error& e) { + oi = std::nullopt; + clog.error() << m_mode_desc << " " << m_pg_id << " " << soid + << " : can't decode '" << OI_ATTR << "' attr " + << e.what(); + this_chunk->m_error_counts.shallow_errors++; + soid_error.set_info_corrupted(); + soid_error.set_info_missing(); // Not available too + } + } + + if (oi) { + if (logical_to_ondisk_size(oi->size) != p->second.size) { + clog.error() << m_mode_desc << " " << m_pg_id << " " << soid + << " : on disk size (" << p->second.size + << ") does not match object info size (" << oi->size + << ") adjusted for ondisk to (" + << logical_to_ondisk_size(oi->size) << ")"; + soid_error.set_size_mismatch(); + this_chunk->m_error_counts.shallow_errors++; + } + + dout(20) << m_mode_desc << " " << soid << " " << *oi << dendl; + + // A clone num_bytes will be added later when we have snapset + if (!soid.is_snap()) { + stat.num_bytes += oi->size; + } + if (soid.nspace == m_conf->osd_hit_set_namespace) + stat.num_bytes_hit_set_archive += oi->size; + + if (oi->is_dirty()) + ++stat.num_objects_dirty; + if (oi->is_whiteout()) + ++stat.num_whiteouts; + if (oi->is_omap()) + ++stat.num_objects_omap; + if (oi->is_cache_pinned()) + ++stat.num_objects_pinned; + if (oi->has_manifest()) + ++stat.num_objects_manifest; + } + + // Check for any problems while processing clones + if (doing_clones(snapset, curclone)) { + std::optional target; + // Expecting an object with snap for current head + if (soid.has_snapset() || soid.get_head() != head->get_head()) { + + dout(10) << __func__ << " " << m_mode_desc << " " << m_pg_id + << " new object " << soid << " while processing " << *head + << dendl; + + target = all_clones; + } else { + ceph_assert(soid.is_snap()); + target = soid.snap; + } + + // Log any clones we were expecting to be there up to target + // This will set missing, but will be a no-op if snap.soid == *curclone. + missing += + process_clones_to(head, snapset, target, &curclone, head_error); + } + + bool expected; + // Check doing_clones() again in case we ran process_clones_to() + if (doing_clones(snapset, curclone)) { + // A head would have processed all clones above + // or all greater than *curclone. + ceph_assert(soid.is_snap() && *curclone <= soid.snap); + + // After processing above clone snap should match the expected curclone + expected = (*curclone == soid.snap); + } else { + // If we aren't doing clones any longer, then expecting head + expected = soid.has_snapset(); + } + if (!expected) { + // If we couldn't read the head's snapset, just ignore clones + if (head && !snapset) { + clog.error() << m_mode_desc << " " << m_pg_id + << " " << soid + << " : clone ignored due to missing snapset"; + } else { + clog.error() << m_mode_desc << " " << m_pg_id << " " << soid + << " : is an unexpected clone"; + } + this_chunk->m_error_counts.shallow_errors++; + soid_error.set_headless(); + this_chunk->m_inconsistent_objs.push_back(std::move(soid_error)); + ++soid_error_count; + if (head && soid.get_head() == head->get_head()) + head_error.set_clone(soid.snap); + continue; + } + + // new snapset? 
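  // Illustrative sketch: the branch below relies on the reverse traversal, so a
  // head is visited before the objects that should be its clones; reaching a
  // head resets 'curclone' to the newest expected clone, and each clone object
  // must then match '*curclone' in descending snap order, with skipped entries
  // counted as missing. A minimal rendering of that expectation, with int snap
  // ids standing in for snapid_t:
  //
  //   #include <iterator>
  //   #include <vector>
  //   // 'clones' as stored in a SnapSet: ascending snap ids, e.g. {1, 2, 4, 6}.
  //   // Returns the number of expected clones never seen, or -1 on an
  //   // unexpected clone.
  //   int match_clones(const std::vector<int>& clones,
  //                    const std::vector<int>& seen_descending) {
  //     int missing = 0;
  //     auto expect = clones.rbegin();              // newest expected clone first
  //     for (int snap : seen_descending) {
  //       while (expect != clones.rend() && *expect > snap) {
  //         ++missing;                              // expected clone never showed up
  //         ++expect;
  //       }
  //       if (expect == clones.rend() || *expect != snap) return -1;
  //       ++expect;
  //     }
  //     missing += static_cast<int>(std::distance(expect, clones.rend()));
  //     return missing;
  //   }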
+ if (soid.has_snapset()) { + + if (missing) { + log_missing(missing, head, __func__); + } + + // Save previous head error information + if (head && (head_error.errors || soid_error_count)) { + this_chunk->m_inconsistent_objs.push_back(std::move(head_error)); + } + + // Set this as a new head object + head = soid; + missing = 0; + head_error = soid_error; + soid_error_count = 0; + + dout(20) << __func__ << " " << m_mode_desc << " new head " << head + << dendl; + + if (p->second.attrs.count(SS_ATTR) == 0) { + clog.error() << m_mode_desc << " " << m_pg_id << " " << soid + << " : no '" << SS_ATTR << "' attr"; + this_chunk->m_error_counts.shallow_errors++; + snapset = std::nullopt; + head_error.set_snapset_missing(); + } else { + bufferlist bl; + bl.push_back(p->second.attrs[SS_ATTR]); + auto blp = bl.cbegin(); + try { + snapset = SnapSet(); // Initialize optional<> before decoding into it + decode(*snapset, blp); + head_error.ss_bl.push_back(p->second.attrs[SS_ATTR]); + } catch (ceph::buffer::error& e) { + snapset = std::nullopt; + clog.error() << m_mode_desc << " " << m_pg_id << " " << soid + << " : can't decode '" << SS_ATTR << "' attr " + << e.what(); + this_chunk->m_error_counts.shallow_errors++; + head_error.set_snapset_corrupted(); + } + } + + if (snapset) { + // what will be next? + curclone = snapset->clones.rbegin(); + + if (!snapset->clones.empty()) { + dout(20) << " snapset " << *snapset << dendl; + if (snapset->seq == 0) { + clog.error() << m_mode_desc << " " << m_pg_id << " " << soid + << " : snaps.seq not set"; + this_chunk->m_error_counts.shallow_errors++; + head_error.set_snapset_error(); + } + } + } + } else { + ceph_assert(soid.is_snap()); + ceph_assert(head); + ceph_assert(snapset); + ceph_assert(soid.snap == *curclone); + + dout(20) << __func__ << " " << m_mode_desc << " matched clone " << soid + << dendl; + + if (snapset->clone_size.count(soid.snap) == 0) { + clog.error() << m_mode_desc << " " << m_pg_id << " " << soid + << " : is missing in clone_size"; + this_chunk->m_error_counts.shallow_errors++; + soid_error.set_size_mismatch(); + } else { + if (oi && oi->size != snapset->clone_size[soid.snap]) { + clog.error() << m_mode_desc << " " << m_pg_id << " " << soid + << " : size " << oi->size << " != clone_size " + << snapset->clone_size[*curclone]; + this_chunk->m_error_counts.shallow_errors++; + soid_error.set_size_mismatch(); + } + + if (snapset->clone_overlap.count(soid.snap) == 0) { + clog.error() << m_mode_desc << " " << m_pg_id << " " << soid + << " : is missing in clone_overlap"; + this_chunk->m_error_counts.shallow_errors++; + soid_error.set_size_mismatch(); + } else { + // This checking is based on get_clone_bytes(). The first 2 asserts + // can't happen because we know we have a clone_size and + // a clone_overlap. Now we check that the interval_set won't + // cause the last assert. + uint64_t size = snapset->clone_size.find(soid.snap)->second; + const interval_set& overlap = + snapset->clone_overlap.find(soid.snap)->second; + bool bad_interval_set = false; + for (interval_set::const_iterator i = overlap.begin(); + i != overlap.end(); + ++i) { + if (size < i.get_len()) { + bad_interval_set = true; + break; + } + size -= i.get_len(); + } + + if (bad_interval_set) { + clog.error() << m_mode_desc << " " << m_pg_id << " " << soid + << " : bad interval_set in clone_overlap"; + this_chunk->m_error_counts.shallow_errors++; + soid_error.set_size_mismatch(); + } else { + stat.num_bytes += snapset->get_clone_bytes(soid.snap); + } + } + } + + // what's next? 
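  // Illustrative sketch: the clone_overlap check just above protects
  // get_clone_bytes() - the summed lengths of the overlap intervals must not
  // exceed the recorded clone size, or the byte accounting would underflow.
  // The same test over a plain interval list (offset/length pairs):
  //
  //   #include <cstdint>
  //   #include <utility>
  //   #include <vector>
  //   bool overlap_fits(uint64_t clone_size,
  //                     const std::vector<std::pair<uint64_t, uint64_t>>& overlap) {
  //     for (const auto& [off, len] : overlap) {
  //       (void)off;                    // only the lengths matter for this check
  //       if (clone_size < len) return false;
  //       clone_size -= len;            // mirrors the running subtraction above
  //     }
  //     return true;
  //   }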
+ ++curclone; + if (soid_error.errors) { + this_chunk->m_inconsistent_objs.push_back(std::move(soid_error)); + ++soid_error_count; + } + } + m_scrubber.add_to_stats(stat); + } + + if (doing_clones(snapset, curclone)) { + dout(10) << __func__ << " " << m_mode_desc << " " << m_pg_id + << " No more objects while processing " << *head << dendl; + + missing += + process_clones_to(head, snapset, all_clones, &curclone, head_error); + } + + // There could be missing found by the test above or even + // before dropping out of the loop for the last head. + + if (missing) { + log_missing(missing, head, __func__); + } + if (head && (head_error.errors || soid_error_count)) { + this_chunk->m_inconsistent_objs.push_back(std::move(head_error)); + } + + // fix data/omap digests + m_scrubber.submit_digest_fixes(this_chunk->missing_digest); + + dout(10) << __func__ << " (" << m_mode_desc << ") finish" << dendl; +} + +int ScrubBackend::process_clones_to( + const std::optional& head, + const std::optional& snapset, + std::optional target, + vector::reverse_iterator* curclone, + inconsistent_snapset_wrapper& e) +{ + ceph_assert(head); + ceph_assert(snapset); + int missing_count = 0; + + // NOTE: clones are in descending order, thus **curclone > target test here + hobject_t next_clone(*head); + while (doing_clones(snapset, *curclone) && + (!target || **curclone > *target)) { + + ++missing_count; + // it is okay to be missing one or more clones in a cache tier. + // skip higher-numbered clones in the list. + if (!m_incomplete_clones_allowed) { + next_clone.snap = **curclone; + clog.error() << m_mode_desc << " " << m_pg_id << " " << *head + << " : expected clone " << next_clone << " " << m_missing + << " missing"; + this_chunk->m_error_counts.shallow_errors++; + e.set_clone_missing(next_clone.snap); + } + // Clones are descending + ++(*curclone); + } + return missing_count; +} + +void ScrubBackend::log_missing(int missing, + const std::optional& head, + const char* logged_func_name) +{ + ceph_assert(head); + if (m_incomplete_clones_allowed) { + dout(20) << logged_func_name << " " << m_mode_desc << " " << m_pg_id << " " + << *head << " skipped " << missing << " clone(s) in cache tier" + << dendl; + } else { + clog.info() << m_mode_desc << " " << m_pg_id << " " << *head << " : " + << missing << " missing clone(s)"; + } +} + + +// //////////////////////////////////////////////////////////////////////////////// + +std::vector ScrubBackend::scan_snaps( + ScrubMap& smap, + SnapMapReaderI& snaps_getter) +{ + std::vector out_orders; + hobject_t head; + SnapSet snapset; + + // Test qa/standalone/scrub/osd-scrub-snaps.sh greps for the strings + // in this function + dout(15) << "_scan_snaps starts" << dendl; + + for (auto i = smap.objects.rbegin(); i != smap.objects.rend(); ++i) { + + const hobject_t& hoid = i->first; + ScrubMap::object& o = i->second; + + dout(20) << __func__ << " " << hoid << dendl; + + ceph_assert(!hoid.is_snapdir()); + + if (hoid.is_head()) { + // parse the SnapSet + bufferlist bl; + if (o.attrs.find(SS_ATTR) == o.attrs.end()) { + // no snaps for this head + continue; + } + bl.push_back(o.attrs[SS_ATTR]); + auto p = bl.cbegin(); + try { + decode(snapset, p); + } catch (...) 
{ + dout(20) << fmt::format("{}: failed to decode the snapset ({})", + __func__, + hoid) + << dendl; + continue; + } + head = hoid.get_head(); + continue; + } + + /// \todo document why guaranteed to have initialized 'head' at this point + + if (hoid.snap < CEPH_MAXSNAP) { + + if (hoid.get_head() != head) { + derr << __func__ << " no head for " << hoid << " (have " << head << ")" + << dendl; + continue; + } + + // the 'hoid' is a clone hoid at this point. The 'snapset' below was taken + // from the corresponding head hoid. + auto maybe_fix_order = scan_object_snaps(hoid, snapset, snaps_getter); + if (maybe_fix_order) { + out_orders.push_back(std::move(*maybe_fix_order)); + } + } + } + + dout(15) << __func__ << " " << out_orders.size() << " fix orders" << dendl; + return out_orders; +} + +std::optional ScrubBackend::scan_object_snaps( + const hobject_t& hoid, + const SnapSet& snapset, + SnapMapReaderI& snaps_getter) +{ + using result_t = Scrub::SnapMapReaderI::result_t; + dout(15) << fmt::format("{}: obj:{} snapset:{}", __func__, hoid, snapset) + << dendl; + + auto p = snapset.clone_snaps.find(hoid.snap); + if (p == snapset.clone_snaps.end()) { + derr << __func__ << " no clone_snaps for " << hoid << " in " << snapset + << dendl; + return std::nullopt; + } + set obj_snaps{p->second.begin(), p->second.end()}; + + // clang-format off + + // validate both that the mapper contains the correct snaps for the object + // and that it is internally consistent. + // possible outcomes: + // + // Error scenarios: + // - SnapMapper index of object snaps does not match that stored in head + // object snapset attribute: + // we should delete the snapmapper entry and re-add it. + // - no mapping found for the object's snaps: + // we should add the missing mapper entries. + // - the snapmapper set for this object is internally inconsistent (e.g. + // the OBJ_ entries do not match the SNA_ entries). We remove + // whatever entries are there, and redo the DB content for this object. + // + // And + // There is the "happy path": cur_snaps == obj_snaps. Nothing to do there. + + // clang-format on + + auto cur_snaps = snaps_getter.get_snaps_check_consistency(hoid); + if (!cur_snaps) { + switch (auto e = cur_snaps.error(); e.code) { + case result_t::code_t::backend_error: + derr << __func__ << ": get_snaps returned " + << cpp_strerror(e.backend_error) << " for " << hoid << dendl; + ceph_abort(); + case result_t::code_t::not_found: + dout(10) << __func__ << ": no snaps for " << hoid << ". Adding." + << dendl; + return snap_mapper_fix_t{snap_mapper_op_t::add, hoid, obj_snaps, {}}; + case result_t::code_t::inconsistent: + dout(10) << __func__ << ": inconsistent snapmapper data for " << hoid + << ". Recreating." << dendl; + return snap_mapper_fix_t{ + snap_mapper_op_t::overwrite, hoid, obj_snaps, {}}; + default: + dout(10) << __func__ << ": error (" << cpp_strerror(e.backend_error) + << ") fetching snapmapper data for " << hoid << ". Recreating." + << dendl; + return snap_mapper_fix_t{ + snap_mapper_op_t::overwrite, hoid, obj_snaps, {}}; + } + __builtin_unreachable(); + } + + if (*cur_snaps == obj_snaps) { + dout(20) << fmt::format( + "{}: {}: snapset match SnapMapper's ({})", __func__, hoid, + obj_snaps) + << dendl; + return std::nullopt; + } + + // add this object to the list of snapsets that needs fixing. 
Note + // that we also collect the existing (bogus) list, for logging purposes + dout(20) << fmt::format( + "{}: obj {}: was: {} updating to: {}", __func__, hoid, + *cur_snaps, obj_snaps) + << dendl; + return snap_mapper_fix_t{ + snap_mapper_op_t::update, hoid, obj_snaps, *cur_snaps}; +} + +/* + * Building a map of objects suitable for snapshot validation. + * + * We are moving all "full" clone sets, i.e. the head and (preceding it, as + * snapshots precede the head entry) the clone entries, into 'for_meta_scrub'. + * That collection, not containing partial items, will be scrubbed by + * scrub_snapshot_metadata(). + * + * What's left in m_cleaned_meta_map is the leftover partial items that need to + * be completed before they can be processed. + */ +ScrubMap ScrubBackend::clean_meta_map(ScrubMap& cleaned, bool max_reached) +{ + ScrubMap for_meta_scrub; + + if (max_reached || cleaned.objects.empty()) { + cleaned.swap(for_meta_scrub); + } else { + auto iter = cleaned.objects.end(); + --iter; // not empty, see 'if' clause + auto begin = cleaned.objects.begin(); + if (iter->first.has_snapset()) { + ++iter; + } else { + while (iter != begin) { + auto next = iter--; + if (next->first.get_head() != iter->first.get_head()) { + ++iter; + break; + } + } + } + for_meta_scrub.objects.insert(begin, iter); + cleaned.objects.erase(begin, iter); + } + + return for_meta_scrub; +} diff --git a/src/osd/scrubber/scrub_backend.h b/src/osd/scrubber/scrub_backend.h new file mode 100644 index 000000000..ffb41c27e --- /dev/null +++ b/src/osd/scrubber/scrub_backend.h @@ -0,0 +1,554 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#pragma once + +// clang-format off +/* + +------------------------+ + | | + | PgScrubber | + | |-----------------------------+ + | | | + +------------------------+ | ownes & uses + | PrimaryLogScrub | | + +------------------------+ | + | + | + v + +-------------------------------------------+ + |ScrubBackend | + +----------------+ |============ | + | this_chunk | | | + | (scrub_chunk_t)|<-------| + decode_received_map() | + +----------------+ | + scrub_compare_maps() | + | + scan_snaps() | + | ..... 
| + | | + | | + +--------------------/-------------\--------+ + --/ / \ + --/ | | + --/ / \ + -/ uses | uses | + uses --/ / \ + --/ / | + --/ | \ + v v v + PgBackend PG/PrimaryLogPG OSD Services + + +*/ +// clang-format on + +#include +#include + +#include + +#include "common/LogClient.h" +#include "osd/OSDMap.h" +#include "osd/osd_types_fmt.h" +#include "osd/scrubber_common.h" +#include "osd/SnapMapReaderI.h" + +struct ScrubMap; + +class PG; +class PgScrubber; +struct PGPool; +using Scrub::PgScrubBeListener; + +using data_omap_digests_t = + std::pair, std::optional>; + +/// a list of fixes to be performed on objects' digests +using digests_fixes_t = std::vector>; + +using shard_info_map_t = std::map; +using shard_to_scrubmap_t = std::map; + +using auth_peers_t = std::vector>; + +using wrapped_err_t = + std::variant; +using inconsistent_objs_t = std::vector; + +/// omap-specific stats +struct omap_stat_t { + int large_omap_objects{0}; + int64_t omap_bytes{0}; + int64_t omap_keys{0}; +}; + +struct error_counters_t { + int shallow_errors{0}; + int deep_errors{0}; +}; + +// the PgScrubber services used by the backend +struct ScrubBeListener { + virtual std::ostream& gen_prefix(std::ostream& out) const = 0; + virtual CephContext* get_pg_cct() const = 0; + virtual LoggerSinkSet& get_logger() const = 0; + virtual bool is_primary() const = 0; + virtual spg_t get_pgid() const = 0; + virtual const OSDMapRef& get_osdmap() const = 0; + virtual void add_to_stats(const object_stat_sum_t& stat) = 0; + virtual void submit_digest_fixes(const digests_fixes_t& fixes) = 0; + virtual ~ScrubBeListener() = default; +}; + +// As the main scrub-backend entry point - scrub_compare_maps() - must +// be able to return both a list of snap fixes and a list of inconsistent +// objects: +struct objs_fix_list_t { + inconsistent_objs_t inconsistent_objs; + std::vector snap_fix_list; +}; + +/** + * A structure used internally by select_auth_object() + * + * Conveys the usability of a specific shard as an auth source. + */ +struct shard_as_auth_t { + // note: 'not_found' differs from 'not_usable' in that 'not_found' + // does not carry an error message to be cluster-logged. 
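  // Illustrative sketch of how the constructors below map onto the three
  // states (the argument values here are placeholders only):
  //
  //   shard_as_auth_t{}                          -> usable_t::not_found  (object absent; nothing logged)
  //   shard_as_auth_t{"read error"}              -> usable_t::not_usable (error text gets cluster-logged)
  //   shard_as_auth_t{"stat error", data_digest} -> usable_t::not_usable (also carries a computed digest)
  //   shard_as_auth_t{oi, map_iter, "", digest}  -> usable_t::usable     (a real auth candidate)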
+ enum class usable_t : uint8_t { not_usable, not_found, usable }; + + // the ctor used when the shard should not be considered as auth + explicit shard_as_auth_t(std::string err_msg) + : possible_auth{usable_t::not_usable} + , error_text{err_msg} + , oi{} + , auth_iter{} + , digest{std::nullopt} + {} + + // the object cannot be found on the shard + explicit shard_as_auth_t() + : possible_auth{usable_t::not_found} + , error_text{} + , oi{} + , auth_iter{} + , digest{std::nullopt} + {} + + shard_as_auth_t(std::string err_msg, std::optional data_digest) + : possible_auth{usable_t::not_usable} + , error_text{err_msg} + , oi{} + , auth_iter{} + , digest{data_digest} + {} + + // possible auth candidate + shard_as_auth_t(const object_info_t& anoi, + shard_to_scrubmap_t::iterator it, + std::string err_msg, + std::optional data_digest) + : possible_auth{usable_t::usable} + , error_text{err_msg} + , oi{anoi} + , auth_iter{it} + , digest{data_digest} + {} + + + usable_t possible_auth; + std::string error_text; + object_info_t oi; + shard_to_scrubmap_t::iterator auth_iter; + std::optional digest; + // when used for Crimson, we'll probably want to return 'digest_match' (and + // other in/out arguments) via this struct +}; + +// the format specifier {D} is used to request debug output +template <> +struct fmt::formatter { + template + constexpr auto parse(ParseContext& ctx) + { + auto it = ctx.begin(); + if (it != ctx.end()) { + debug_log = (*it++) == 'D'; + } + return it; + } + template + auto format(shard_as_auth_t const& as_auth, FormatContext& ctx) + { + if (debug_log) { + // note: 'if' chain, as hard to consistently (on all compilers) avoid some + // warnings for a switch plus multiple return paths + if (as_auth.possible_auth == shard_as_auth_t::usable_t::not_usable) { + return fmt::format_to(ctx.out(), + "{{shard-not-usable:{}}}", + as_auth.error_text); + } + if (as_auth.possible_auth == shard_as_auth_t::usable_t::not_found) { + return fmt::format_to(ctx.out(), "{{shard-not-found}}"); + } + return fmt::format_to(ctx.out(), + "{{shard-usable: soid:{} {{txt:{}}} }}", + as_auth.oi.soid, + as_auth.error_text); + + } else { + return fmt::format_to( + ctx.out(), + "usable:{} soid:{} {{txt:{}}}", + (as_auth.possible_auth == shard_as_auth_t::usable_t::usable) ? "yes" + : "no", + as_auth.oi.soid, + as_auth.error_text); + } + } + + bool debug_log{false}; +}; + +struct auth_selection_t { + shard_to_scrubmap_t::iterator auth; ///< an iter into one of this_chunk->maps + pg_shard_t auth_shard; // set to auth->first + object_info_t auth_oi; + shard_info_map_t shard_map; + bool is_auth_available{false}; ///< managed to select an auth' source? + bool digest_match{true}; ///< do all (existing) digests match? +}; + +// note: some scrub tests are sensitive to the specific format of +// auth_selection_t listing in the logs +template <> +struct fmt::formatter { + template + constexpr auto parse(ParseContext& ctx) + { + return ctx.begin(); + } + + template + auto format(auth_selection_t const& aus, FormatContext& ctx) + { + return fmt::format_to(ctx.out(), + " {{AU-S: {}->{:x} OI({:x}:{}) {} dm:{}}} ", + aus.auth->first, + (uint64_t)(&aus.auth->second), + (uint64_t)(&aus.auth_oi), + aus.auth_oi, + aus.shard_map.size(), + aus.digest_match); + } +}; + +/** + * the back-end data that is per-chunk + * + * Created by the Scrubber after all replicas' maps have arrived. 
+ */ +struct scrub_chunk_t { + + explicit scrub_chunk_t(pg_shard_t i_am) { received_maps[i_am] = ScrubMap{}; } + + /// the working set of scrub maps: the received maps, plus + /// Primary's own map. + std::map received_maps; + + /// a collection of all objs mentioned in the maps + std::set authoritative_set; + + utime_t started{ceph_clock_now()}; + + digests_fixes_t missing_digest; + + /// Map from object with errors to good peers + std::map> authoritative; + + inconsistent_objs_t m_inconsistent_objs; + + /// shallow/deep error counters + error_counters_t m_error_counts; + + // these must be reset for each element: + + std::set cur_missing; + std::set cur_inconsistent; + bool fix_digest{false}; +}; + + +/** + * ScrubBackend wraps the data and operations required for the back-end part of + * the scrubbing (i.e. for comparing the maps and fixing objects). + * + * Created anew upon each initiation of a scrub session. + */ +class ScrubBackend { + public: + // Primary constructor + ScrubBackend(ScrubBeListener& scrubber, + PgScrubBeListener& pg, + pg_shard_t i_am, + bool repair, + scrub_level_t shallow_or_deep, + const std::set& acting); + + // Replica constructor: no primary map + ScrubBackend(ScrubBeListener& scrubber, + PgScrubBeListener& pg, + pg_shard_t i_am, + bool repair, + scrub_level_t shallow_or_deep); + + friend class PgScrubber; + friend class TestScrubBackend; + + /** + * reset the per-chunk data structure (scrub_chunk_t). + * Create an empty scrub-map for this shard, and place it + * in the appropriate entry in 'received_maps'. + * + * @returns a pointer to the newly created ScrubMap. + */ + void new_chunk(); + + ScrubMap& get_primary_scrubmap(); + + /** + * sets Backend's m_repair flag (setting m_mode_desc to a corresponding + * string) + */ + void update_repair_status(bool should_repair); + + std::vector replica_clean_meta( + ScrubMap& smap, + bool max_reached, + const hobject_t& start, + Scrub::SnapMapReaderI& snaps_getter); + + /** + * decode the arriving MOSDRepScrubMap message, placing the replica's + * scrub-map into received_maps[from]. 
+ * + * @param from replica + */ + void decode_received_map(pg_shard_t from, const MOSDRepScrubMap& msg); + + objs_fix_list_t scrub_compare_maps(bool max_reached, + Scrub::SnapMapReaderI& snaps_getter); + + int scrub_process_inconsistent(); + + const omap_stat_t& this_scrub_omapstats() const { return m_omap_stats; } + + int authoritative_peers_count() const { return m_auth_peers.size(); }; + + std::ostream& logger_prefix(std::ostream* _dout, const ScrubBackend* t); + + private: + // set/constructed at the ctor(): + ScrubBeListener& m_scrubber; + Scrub::PgScrubBeListener& m_pg; + const pg_shard_t m_pg_whoami; + bool m_repair; + const scrub_level_t m_depth; + const spg_t m_pg_id; + std::vector m_acting_but_me; // primary only + bool m_is_replicated{true}; + std::string_view m_mode_desc; + std::string m_formatted_id; + const PGPool& m_pool; + bool m_incomplete_clones_allowed{false}; + + /// collecting some scrub-session-wide omap stats + omap_stat_t m_omap_stats; + + /// Mapping from object with errors to good peers + std::map m_auth_peers; + + // shorthands: + ConfigProxy& m_conf; + LoggerSinkSet& clog; + + private: + + struct auth_and_obj_errs_t { + std::list auth_list; + std::set object_errors; + }; + + std::optional this_chunk; + + /// Maps from objects with errors to missing peers + HobjToShardSetMapping m_missing; // used by scrub_process_inconsistent() + + /// Maps from objects with errors to inconsistent peers + HobjToShardSetMapping m_inconsistent; // used by scrub_process_inconsistent() + + /// Cleaned std::map pending snap metadata scrub + ScrubMap m_cleaned_meta_map{}; + + /// a reference to the primary map + ScrubMap& my_map(); + + /// shallow/deep error counters + error_counters_t get_error_counts() const { return this_chunk->m_error_counts; } + + /** + * merge_to_authoritative_set() updates + * - this_chunk->maps[from] with the replicas' scrub-maps; + * - this_chunk->authoritative_set as a union of all the maps' objects; + */ + void merge_to_authoritative_set(); + + // note: used by both Primary & replicas + static ScrubMap clean_meta_map(ScrubMap& cleaned, bool max_reached); + + void compare_smaps(); + + /// might return error messages to be cluster-logged + std::optional compare_obj_in_maps(const hobject_t& ho); + + void omap_checks(); + + std::optional for_empty_auth_list( + std::list&& auths, + std::set&& obj_errors, + shard_to_scrubmap_t::iterator auth, + const hobject_t& ho, + std::stringstream& errstream); + + auth_and_obj_errs_t match_in_shards(const hobject_t& ho, + auth_selection_t& auth_sel, + inconsistent_obj_wrapper& obj_result, + std::stringstream& errstream); + + // returns: true if a discrepancy was found + bool compare_obj_details(pg_shard_t auth_shard, + const ScrubMap::object& auth, + const object_info_t& auth_oi, + const ScrubMap::object& candidate, + shard_info_wrapper& shard_result, + inconsistent_obj_wrapper& obj_result, + std::stringstream& errorstream, + bool has_snapset); + + void repair_object(const hobject_t& soid, + const auth_peers_t& ok_peers, + const std::set& bad_peers); + + /** + * An auxiliary used by select_auth_object() to test a specific shard + * as a possible auth candidate. + * @param ho the hobject for which we are looking for an auth source + * @param srd the candidate shard + * @param shard_map [out] a collection of shard_info-s per shard. + * possible_auth_shard() might set error flags in the relevant (this shard's) + * entry. 
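+ * @return a shard_as_auth_t marking the shard as usable, not usable, or
+ * 'object not found' (see usable_t above)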
+ */ + shard_as_auth_t possible_auth_shard(const hobject_t& ho, + const pg_shard_t& srd, + shard_info_map_t& shard_map); + + auth_selection_t select_auth_object(const hobject_t& ho, + std::stringstream& errstream); + + + enum class digest_fixing_t { no, if_aged, force }; + + /* + * an aux used by inconsistents() to determine whether to fix the digest + */ + [[nodiscard]] digest_fixing_t should_fix_digest( + const hobject_t& ho, + const ScrubMap::object& auth_object, + const object_info_t& auth_oi, + bool repair_flag, + std::stringstream& errstream); + + void inconsistents(const hobject_t& ho, + ScrubMap::object& auth_object, + object_info_t& auth_oi, // consider moving to object + auth_and_obj_errs_t&& auth_n_errs, + std::stringstream& errstream); + + int process_clones_to(const std::optional& head, + const std::optional& snapset, + std::optional target, + std::vector::reverse_iterator* curclone, + inconsistent_snapset_wrapper& e); + + /** + * Validate consistency of the object info and snap sets. + */ + void scrub_snapshot_metadata(ScrubMap& map); + + /** + * Updates the "global" (i.e. - not 'per-chunk') databases: + * - in m_authoritative: a list of good peers for each "problem" object in + * the current chunk; + * - in m_cleaned_meta_map: a "cleaned" version of the object (the one from + * the selected shard). + */ + void update_authoritative(); + + void log_missing(int missing, + const std::optional& head, + const char* logged_func_name); + + /** + * returns a list of snaps "fix orders" + */ + std::vector scan_snaps( + ScrubMap& smap, + Scrub::SnapMapReaderI& snaps_getter); + + /** + * an aux used by scan_snaps(), possibly returning a fix-order + * for a specific hobject. + */ + std::optional scan_object_snaps( + const hobject_t& hoid, + const SnapSet& snapset, + Scrub::SnapMapReaderI& snaps_getter); + + // accessing the PG backend for this translation service + uint64_t logical_to_ondisk_size(uint64_t logical_size) const; +}; + +template <> +struct fmt::formatter { + constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); } + + template + auto format(const data_omap_digests_t& dg, FormatContext& ctx) + { + // can't use value_or() due to different output types + if (std::get<0>(dg).has_value()) { + fmt::format_to(ctx.out(), "[{:#x}/", std::get<0>(dg).value()); + } else { + fmt::format_to(ctx.out(), "[---/"); + } + if (std::get<1>(dg).has_value()) { + return fmt::format_to(ctx.out(), "{:#x}]", std::get<1>(dg).value()); + } else { + return fmt::format_to(ctx.out(), "---]"); + } + } +}; + +template <> +struct fmt::formatter> { + constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); } + + template + auto format(const std::pair& x, + FormatContext& ctx) const + { + return fmt::format_to(ctx.out(), + "{{ {} - {} }}", + std::get<0>(x), + std::get<1>(x)); + } +}; diff --git a/src/osd/scrubber/scrub_machine.cc b/src/osd/scrubber/scrub_machine.cc new file mode 100644 index 000000000..c372c7ede --- /dev/null +++ b/src/osd/scrubber/scrub_machine.cc @@ -0,0 +1,602 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include +#include + +#include + +#include "osd/OSD.h" +#include "osd/OpRequest.h" + +#include "ScrubStore.h" +#include "scrub_machine.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_osd +#undef dout_prefix +#define dout_prefix *_dout << " scrubberFSM " + +using namespace std::chrono; +using namespace std::chrono_literals; + +#define DECLARE_LOCALS \ + ScrubMachineListener* 
scrbr = context().m_scrbr; \ + std::ignore = scrbr; \ + auto pg_id = context().m_pg_id; \ + std::ignore = pg_id; + +NamedSimply::NamedSimply(ScrubMachineListener* scrubber, const char* name) +{ + scrubber->set_state_name(name); +} + +namespace Scrub { + +// --------- trace/debug auxiliaries ------------------------------- + +void on_event_creation(std::string_view nm) +{ + dout(20) << " event: --vvvv---- " << nm << dendl; +} + +void on_event_discard(std::string_view nm) +{ + dout(20) << " event: --^^^^---- " << nm << dendl; +} + +void ScrubMachine::assert_not_active() const +{ + ceph_assert(state_cast()); +} + +bool ScrubMachine::is_reserving() const +{ + return state_cast(); +} + +bool ScrubMachine::is_accepting_updates() const +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + ceph_assert(scrbr->is_primary()); + + return state_cast(); +} + +// for the rest of the code in this file - we know what PG we are dealing with: +#undef dout_prefix +#define dout_prefix _prefix(_dout, this->context()) + +template +static ostream& _prefix(std::ostream* _dout, T& t) +{ + return t.gen_prefix(*_dout); +} + +std::ostream& ScrubMachine::gen_prefix(std::ostream& out) const +{ + return m_scrbr->gen_prefix(out) << "FSM: "; +} + +// ////////////// the actual actions + +// ----------------------- NotActive ----------------------------------------- + +NotActive::NotActive(my_context ctx) + : my_base(ctx) + , NamedSimply(context().m_scrbr, "NotActive") +{ + dout(10) << "-- state -->> NotActive" << dendl; + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + scrbr->clear_queued_or_active(); +} + +sc::result NotActive::react(const StartScrub&) +{ + dout(10) << "NotActive::react(const StartScrub&)" << dendl; + DECLARE_LOCALS; + scrbr->set_scrub_begin_time(); + return transit(); +} + +sc::result NotActive::react(const AfterRepairScrub&) +{ + dout(10) << "NotActive::react(const AfterRepairScrub&)" << dendl; + DECLARE_LOCALS; + scrbr->set_scrub_begin_time(); + return transit(); +} + +// ----------------------- ReservingReplicas --------------------------------- + +ReservingReplicas::ReservingReplicas(my_context ctx) + : my_base(ctx) + , NamedSimply(context().m_scrbr, "ReservingReplicas") +{ + dout(10) << "-- state -->> ReservingReplicas" << dendl; + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + + // prevent the OSD from starting another scrub while we are trying to secure + // replicas resources + scrbr->set_reserving_now(); + scrbr->reserve_replicas(); +} + +ReservingReplicas::~ReservingReplicas() +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + scrbr->clear_reserving_now(); +} + +sc::result ReservingReplicas::react(const ReservationFailure&) +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(10) << "ReservingReplicas::react(const ReservationFailure&)" << dendl; + + // the Scrubber must release all resources and abort the scrubbing + scrbr->clear_pgscrub_state(); + return transit(); +} + +/** + * note: the event poster is handling the scrubber reset + */ +sc::result ReservingReplicas::react(const FullReset&) +{ + dout(10) << "ReservingReplicas::react(const FullReset&)" << dendl; + return transit(); +} + +// ----------------------- ActiveScrubbing ----------------------------------- + +ActiveScrubbing::ActiveScrubbing(my_context ctx) + : my_base(ctx) + , NamedSimply(context().m_scrbr, "ActiveScrubbing") +{ + dout(10) << "-- state -->> ActiveScrubbing" << dendl; + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + scrbr->on_init(); +} + +/** + * upon exiting the Active state + */ +ActiveScrubbing::~ActiveScrubbing() 
+{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(15) << __func__ << dendl; + scrbr->unreserve_replicas(); + scrbr->clear_queued_or_active(); +} + +/* + * The only source of an InternalError event as of now is the BuildMap state, + * when encountering a backend error. + * We kill the scrub and reset the FSM. + */ +sc::result ActiveScrubbing::react(const InternalError&) +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(10) << __func__ << dendl; + scrbr->clear_pgscrub_state(); + return transit(); +} + +sc::result ActiveScrubbing::react(const FullReset&) +{ + dout(10) << "ActiveScrubbing::react(const FullReset&)" << dendl; + // caller takes care of clearing the scrubber & FSM states + return transit(); +} + +// ----------------------- RangeBlocked ----------------------------------- + +/* + * Blocked. Will be released by kick_object_context_blocked() (or upon + * an abort) + * + * Note: we are never expected to be waiting for long for a blocked object. + * Unfortunately we know from experience that a bug elsewhere might result + * in an indefinite wait in this state, for an object that is never released. + * If that happens, all we can do is to issue a warning message to help + * with the debugging. + */ +RangeBlocked::RangeBlocked(my_context ctx) + : my_base(ctx) + , NamedSimply(context().m_scrbr, "Act/RangeBlocked") +{ + dout(10) << "-- state -->> Act/RangeBlocked" << dendl; + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + + // arrange to have a warning message issued if we are stuck in this + // state for longer than some reasonable number of minutes. + m_timeout = scrbr->acquire_blocked_alarm(); +} + +// ----------------------- PendingTimer ----------------------------------- + +/** + * Sleeping till timer reactivation - or just requeuing + */ +PendingTimer::PendingTimer(my_context ctx) + : my_base(ctx) + , NamedSimply(context().m_scrbr, "Act/PendingTimer") +{ + dout(10) << "-- state -->> Act/PendingTimer" << dendl; + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + + scrbr->add_delayed_scheduling(); +} + +// ----------------------- NewChunk ----------------------------------- + +/** + * Preconditions: + * - preemption data was set + * - epoch start was updated + */ +NewChunk::NewChunk(my_context ctx) + : my_base(ctx) + , NamedSimply(context().m_scrbr, "Act/NewChunk") +{ + dout(10) << "-- state -->> Act/NewChunk" << dendl; + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + + scrbr->get_preemptor().adjust_parameters(); + + // choose range to work on + // select_range_n_notify() will signal either SelectedChunkFree or + // ChunkIsBusy. If 'busy', we transition to Blocked, and wait for the + // range to become available. 
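+ // While blocked, the RangeBlocked state arms an alarm (see
+ // acquire_blocked_alarm()) that issues a warning if we stay stuck too long.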
+ scrbr->select_range_n_notify(); +} + +sc::result NewChunk::react(const SelectedChunkFree&) +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(10) << "NewChunk::react(const SelectedChunkFree&)" << dendl; + + scrbr->set_subset_last_update(scrbr->search_log_for_updates()); + return transit(); +} + +// ----------------------- WaitPushes ----------------------------------- + +WaitPushes::WaitPushes(my_context ctx) + : my_base(ctx) + , NamedSimply(context().m_scrbr, "Act/WaitPushes") +{ + dout(10) << " -- state -->> Act/WaitPushes" << dendl; + post_event(ActivePushesUpd{}); +} + +/* + * Triggered externally, by the entity that had an update re pushes + */ +sc::result WaitPushes::react(const ActivePushesUpd&) +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(10) + << "WaitPushes::react(const ActivePushesUpd&) pending_active_pushes: " + << scrbr->pending_active_pushes() << dendl; + + if (!scrbr->pending_active_pushes()) { + // done waiting + return transit(); + } + + return discard_event(); +} + +// ----------------------- WaitLastUpdate ----------------------------------- + +WaitLastUpdate::WaitLastUpdate(my_context ctx) + : my_base(ctx) + , NamedSimply(context().m_scrbr, "Act/WaitLastUpdate") +{ + dout(10) << " -- state -->> Act/WaitLastUpdate" << dendl; + post_event(UpdatesApplied{}); +} + +/** + * Note: + * Updates are locally readable immediately. Thus, on the replicas we do need + * to wait for the update notifications before scrubbing. For the Primary it's + * a bit different: on EC (and only there) rmw operations have an additional + * read roundtrip. That means that on the Primary we need to wait for + * last_update_applied (the replica side, even on EC, is still safe + * since the actual transaction will already be readable by commit time. + */ +void WaitLastUpdate::on_new_updates(const UpdatesApplied&) +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(10) << "WaitLastUpdate::on_new_updates(const UpdatesApplied&)" << dendl; + + if (scrbr->has_pg_marked_new_updates()) { + post_event(InternalAllUpdates{}); + } else { + // will be requeued by op_applied + dout(10) << "wait for EC read/modify/writes to queue" << dendl; + } +} + +/* + * request maps from the replicas in the acting set + */ +sc::result WaitLastUpdate::react(const InternalAllUpdates&) +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(10) << "WaitLastUpdate::react(const InternalAllUpdates&)" << dendl; + + scrbr->get_replicas_maps(scrbr->get_preemptor().is_preemptable()); + return transit(); +} + +// ----------------------- BuildMap ----------------------------------- + +BuildMap::BuildMap(my_context ctx) + : my_base(ctx) + , NamedSimply(context().m_scrbr, "Act/BuildMap") +{ + dout(10) << " -- state -->> Act/BuildMap" << dendl; + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + + // no need to check for an epoch change, as all possible flows that brought + // us here have a check_interval() verification of their final event. + + if (scrbr->get_preemptor().was_preempted()) { + + // we were preempted, either directly or by a replica + dout(10) << __func__ << " preempted!!!" << dendl; + scrbr->mark_local_map_ready(); + post_event(IntBmPreempted{}); + + } else { + + auto ret = scrbr->build_primary_map_chunk(); + + if (ret == -EINPROGRESS) { + // must wait for the backend to finish. No specific event provided. + // build_primary_map_chunk() has already requeued us. + dout(20) << "waiting for the backend..." << dendl; + + } else if (ret < 0) { + + dout(10) << "BuildMap::BuildMap() Error! Aborting. 
Ret: " << ret << dendl; + post_event(InternalError{}); + + } else { + + // the local map was created + post_event(IntLocalMapDone{}); + } + } +} + +sc::result BuildMap::react(const IntLocalMapDone&) +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(10) << "BuildMap::react(const IntLocalMapDone&)" << dendl; + + scrbr->mark_local_map_ready(); + return transit(); +} + +// ----------------------- DrainReplMaps ----------------------------------- + +DrainReplMaps::DrainReplMaps(my_context ctx) + : my_base(ctx) + , NamedSimply(context().m_scrbr, "Act/DrainReplMaps") +{ + dout(10) << "-- state -->> Act/DrainReplMaps" << dendl; + // we may have got all maps already. Send the event that will make us check. + post_event(GotReplicas{}); +} + +sc::result DrainReplMaps::react(const GotReplicas&) +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(10) << "DrainReplMaps::react(const GotReplicas&)" << dendl; + + if (scrbr->are_all_maps_available()) { + // NewChunk will handle the preemption that brought us to this state + return transit(); + } + + dout(15) << "DrainReplMaps::react(const GotReplicas&): still draining " + "incoming maps: " + << scrbr->dump_awaited_maps() << dendl; + return discard_event(); +} + +// ----------------------- WaitReplicas ----------------------------------- + +WaitReplicas::WaitReplicas(my_context ctx) + : my_base(ctx) + , NamedSimply(context().m_scrbr, "Act/WaitReplicas") +{ + dout(10) << "-- state -->> Act/WaitReplicas" << dendl; + post_event(GotReplicas{}); +} + +/** + * note: now that maps_compare_n_cleanup() is "futurized"(*), and we remain in + * this state for a while even after we got all our maps, we must prevent + * are_all_maps_available() (actually - the code after the if()) from being + * called more than once. + * This is basically a separate state, but it's too transitory and artificial + * to justify the cost of a separate state. + + * (*) "futurized" - in Crimson, the call to maps_compare_n_cleanup() returns + * immediately after initiating the process. The actual termination of the + * maps comparing etc' is signalled via an event. As we share the code with + * "classic" OSD, here too maps_compare_n_cleanup() is responsible for + * signalling the completion of the processing. + */ +sc::result WaitReplicas::react(const GotReplicas&) +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(10) << "WaitReplicas::react(const GotReplicas&)" << dendl; + + if (!all_maps_already_called && scrbr->are_all_maps_available()) { + dout(10) << "WaitReplicas::react(const GotReplicas&) got all" << dendl; + + all_maps_already_called = true; + + // were we preempted? + if (scrbr->get_preemptor().disable_and_test()) { // a test&set + + + dout(10) << "WaitReplicas::react(const GotReplicas&) PREEMPTED!" 
<< dendl; + return transit(); + + } else { + scrbr->maps_compare_n_cleanup(); + return transit(); + } + } else { + return discard_event(); + } +} + +sc::result WaitReplicas::react(const DigestUpdate&) +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + auto warn_msg = + "WaitReplicas::react(const DigestUpdate&): Unexpected DigestUpdate event"s; + dout(10) << warn_msg << dendl; + scrbr->log_cluster_warning(warn_msg); + return discard_event(); +} + +// ----------------------- WaitDigestUpdate ----------------------------------- + +WaitDigestUpdate::WaitDigestUpdate(my_context ctx) + : my_base(ctx) + , NamedSimply(context().m_scrbr, "Act/WaitDigestUpdate") +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(10) << "-- state -->> Act/WaitDigestUpdate" << dendl; + + // perform an initial check: maybe we already + // have all the updates we need: + // (note that DigestUpdate is usually an external event) + post_event(DigestUpdate{}); +} + +sc::result WaitDigestUpdate::react(const DigestUpdate&) +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(10) << "WaitDigestUpdate::react(const DigestUpdate&)" << dendl; + + // on_digest_updates() will either: + // - do nothing - if we are still waiting for updates, or + // - finish the scrubbing of the current chunk, and: + // - send NextChunk, or + // - send ScrubFinished + scrbr->on_digest_updates(); + return discard_event(); +} + +sc::result WaitDigestUpdate::react(const ScrubFinished&) +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(10) << "WaitDigestUpdate::react(const ScrubFinished&)" << dendl; + scrbr->set_scrub_duration(); + scrbr->scrub_finish(); + return transit(); +} + +ScrubMachine::ScrubMachine(PG* pg, ScrubMachineListener* pg_scrub) + : m_pg_id{pg->pg_id} + , m_scrbr{pg_scrub} +{} + +ScrubMachine::~ScrubMachine() = default; + +// -------- for replicas ----------------------------------------------------- + +// ----------------------- ReplicaWaitUpdates -------------------------------- + +ReplicaWaitUpdates::ReplicaWaitUpdates(my_context ctx) + : my_base(ctx) + , NamedSimply(context().m_scrbr, "ReplicaWaitUpdates") +{ + dout(10) << "-- state -->> ReplicaWaitUpdates" << dendl; + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + scrbr->on_replica_init(); +} + +/* + * Triggered externally, by the entity that had an update re pushes + */ +sc::result ReplicaWaitUpdates::react(const ReplicaPushesUpd&) +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(10) << "ReplicaWaitUpdates::react(const ReplicaPushesUpd&): " + << scrbr->pending_active_pushes() << dendl; + + if (scrbr->pending_active_pushes() == 0) { + + // done waiting + return transit(); + } + + return discard_event(); +} + +/** + * the event poster is handling the scrubber reset + */ +sc::result ReplicaWaitUpdates::react(const FullReset&) +{ + dout(10) << "ReplicaWaitUpdates::react(const FullReset&)" << dendl; + return transit(); +} + +// ----------------------- ActiveReplica ----------------------------------- + +ActiveReplica::ActiveReplica(my_context ctx) + : my_base(ctx) + , NamedSimply(context().m_scrbr, "ActiveReplica") +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(10) << "-- state -->> ActiveReplica" << dendl; + // and as we might have skipped ReplicaWaitUpdates: + scrbr->on_replica_init(); + post_event(SchedReplica{}); +} + +sc::result ActiveReplica::react(const SchedReplica&) +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(10) << "ActiveReplica::react(const SchedReplica&). is_preemptable? 
" + << scrbr->get_preemptor().is_preemptable() << dendl; + + if (scrbr->get_preemptor().was_preempted()) { + dout(10) << "replica scrub job preempted" << dendl; + + scrbr->send_preempted_replica(); + scrbr->replica_handling_done(); + return transit(); + } + + // start or check progress of build_replica_map_chunk() + auto ret_init = scrbr->build_replica_map_chunk(); + if (ret_init != -EINPROGRESS) { + return transit(); + } + + return discard_event(); +} + +/** + * the event poster is handling the scrubber reset + */ +sc::result ActiveReplica::react(const FullReset&) +{ + dout(10) << "ActiveReplica::react(const FullReset&)" << dendl; + return transit(); +} + +} // namespace Scrub diff --git a/src/osd/scrubber/scrub_machine.h b/src/osd/scrubber/scrub_machine.h new file mode 100644 index 000000000..038668fb2 --- /dev/null +++ b/src/osd/scrubber/scrub_machine.h @@ -0,0 +1,384 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common/version.h" +#include "include/Context.h" +#include "osd/scrubber_common.h" + +#include "scrub_machine_lstnr.h" + +/// a wrapper that sets the FSM state description used by the +/// PgScrubber +/// \todo consider using the full NamedState as in Peering +struct NamedSimply { + explicit NamedSimply(ScrubMachineListener* scrubber, const char* name); +}; + +class PG; // holding a pointer to that one - just for testing +class PgScrubber; + +namespace Scrub { + +namespace sc = ::boost::statechart; +namespace mpl = ::boost::mpl; + +// +// EVENTS +// + +void on_event_creation(std::string_view nm); +void on_event_discard(std::string_view nm); + +#define MEV(E) \ + struct E : sc::event { \ + inline static int actv{0}; \ + E() \ + { \ + if (!actv++) \ + on_event_creation(#E); \ + } \ + ~E() \ + { \ + if (!--actv) \ + on_event_discard(#E); \ + } \ + void print(std::ostream* out) const { *out << #E; } \ + std::string_view print() const { return #E; } \ + }; + +/// all replicas have granted our reserve request +MEV(RemotesReserved) + +/// a reservation request has failed +MEV(ReservationFailure) + +/// initiate a new scrubbing session (relevant if we are a Primary) +MEV(StartScrub) + +/// initiate a new scrubbing session. Only triggered at Recovery completion +MEV(AfterRepairScrub) + +/// triggered when the PG unblocked an object that was marked for scrubbing. +/// Via the PGScrubUnblocked op +MEV(Unblocked) + +MEV(InternalSchedScrub) + +MEV(SelectedChunkFree) + +MEV(ChunkIsBusy) + +/// Update to active_pushes. 'active_pushes' represents recovery that +/// is in-flight to the local ObjectStore +MEV(ActivePushesUpd) + +/// (Primary only) all updates are committed +MEV(UpdatesApplied) + +/// the internal counterpart of UpdatesApplied +MEV(InternalAllUpdates) + +/// got a map from a replica +MEV(GotReplicas) + +/// internal - BuildMap preempted. Required, as detected within the ctor +MEV(IntBmPreempted) + +MEV(InternalError) + +MEV(IntLocalMapDone) + +/// external. called upon success of a MODIFY op. See +/// scrub_snapshot_metadata() +MEV(DigestUpdate) + +/// initiating replica scrub +MEV(StartReplica) + +/// 'start replica' when there are no pending updates +MEV(StartReplicaNoWait) + +MEV(SchedReplica) + +/// Update to active_pushes. 'active_pushes' represents recovery +/// that is in-flight to the local ObjectStore +MEV(ReplicaPushesUpd) + +/// guarantee that the FSM is in the quiescent state (i.e. 
NotActive) +MEV(FullReset) + +/// finished handling this chunk. Go get the next one +MEV(NextChunk) + +/// all chunks handled +MEV(ScrubFinished) + +// +// STATES +// + +struct NotActive; ///< the quiescent state. No active scrubbing. +struct ReservingReplicas; ///< securing scrub resources from replicas' OSDs +struct ActiveScrubbing; ///< the active state for a Primary. A sub-machine. +struct ReplicaWaitUpdates; ///< an active state for a replica. Waiting for all + ///< active operations to finish. +struct ActiveReplica; ///< an active state for a replica. + + +class ScrubMachine : public sc::state_machine { + public: + friend class PgScrubber; + + public: + explicit ScrubMachine(PG* pg, ScrubMachineListener* pg_scrub); + ~ScrubMachine(); + + spg_t m_pg_id; + ScrubMachineListener* m_scrbr; + std::ostream& gen_prefix(std::ostream& out) const; + + void assert_not_active() const; + [[nodiscard]] bool is_reserving() const; + [[nodiscard]] bool is_accepting_updates() const; +}; + +/** + * The Scrubber's base (quiescent) state. + * Scrubbing is triggered by one of the following events: + * + * - (standard scenario for a Primary): 'StartScrub'. Initiates the OSDs + * resources reservation process. Will be issued by PG::scrub(), following a + * queued "PGScrub" op. + * + * - a special end-of-recovery Primary scrub event ('AfterRepairScrub'). + * + * - (for a replica) 'StartReplica' or 'StartReplicaNoWait', triggered by + * an incoming MOSDRepScrub message. + * + * note (20.8.21): originally, AfterRepairScrub was triggering a scrub without + * waiting for replica resources to be acquired. But once replicas started + * using the resource-request to identify and tag the scrub session, this + * bypass cannot be supported anymore. + */ +struct NotActive : sc::state, NamedSimply { + explicit NotActive(my_context ctx); + + using reactions = + mpl::list, + // a scrubbing that was initiated at recovery completion: + sc::custom_reaction, + sc::transition, + sc::transition>; + sc::result react(const StartScrub&); + sc::result react(const AfterRepairScrub&); +}; + +struct ReservingReplicas : sc::state, + NamedSimply { + + explicit ReservingReplicas(my_context ctx); + ~ReservingReplicas(); + using reactions = mpl::list, + // all replicas granted our resources request + sc::transition, + sc::custom_reaction>; + + sc::result react(const FullReset&); + + /// at least one replica denied us the scrub resources we've requested + sc::result react(const ReservationFailure&); +}; + + +// the "active" sub-states + +/// the objects range is blocked +struct RangeBlocked; + +/// either delaying the scrub by some time and requeuing, or just requeue +struct PendingTimer; + +/// select a chunk to scrub, and verify its availability +struct NewChunk; + +struct WaitPushes; +struct WaitLastUpdate; +struct BuildMap; + +/// a problem during BuildMap. Wait for all replicas to report, then restart. 
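+/// (the preemption that brought us here is only acknowledged once all the
+/// pending replica maps have been drained)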
+struct DrainReplMaps; + +/// wait for all replicas to report +struct WaitReplicas; + +struct WaitDigestUpdate; + +struct ActiveScrubbing + : sc::state, NamedSimply { + + explicit ActiveScrubbing(my_context ctx); + ~ActiveScrubbing(); + + using reactions = mpl::list, + sc::custom_reaction>; + + sc::result react(const FullReset&); + sc::result react(const InternalError&); +}; + +struct RangeBlocked : sc::state, NamedSimply { + explicit RangeBlocked(my_context ctx); + using reactions = mpl::list>; + + Scrub::BlockedRangeWarning m_timeout; +}; + +struct PendingTimer : sc::state, NamedSimply { + + explicit PendingTimer(my_context ctx); + + using reactions = mpl::list>; +}; + +struct NewChunk : sc::state, NamedSimply { + + explicit NewChunk(my_context ctx); + + using reactions = mpl::list, + sc::custom_reaction>; + + sc::result react(const SelectedChunkFree&); +}; + +/** + * initiate the update process for this chunk + * + * Wait fo 'active_pushes' to clear. + * 'active_pushes' represents recovery that is in-flight to the local + * Objectstore, hence scrub waits until the correct data is readable + * (in-flight data to the Objectstore is not readable until written to + * disk, termed 'applied' here) + */ +struct WaitPushes : sc::state, NamedSimply { + + explicit WaitPushes(my_context ctx); + + using reactions = mpl::list>; + + sc::result react(const ActivePushesUpd&); +}; + +struct WaitLastUpdate : sc::state, + NamedSimply { + + explicit WaitLastUpdate(my_context ctx); + + void on_new_updates(const UpdatesApplied&); + + using reactions = + mpl::list, + sc::in_state_reaction>; + + sc::result react(const InternalAllUpdates&); +}; + +struct BuildMap : sc::state, NamedSimply { + explicit BuildMap(my_context ctx); + + // possible error scenarios: + // - an error reported by the backend will trigger an 'InternalError' event, + // handled by our parent state; + // - if preempted, we switch to DrainReplMaps, where we will wait for all + // replicas to send their maps before acknowledging the preemption; + // - an interval change will be handled by the relevant 'send-event' + // functions, and will translated into a 'FullReset' event. 
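+ // (on a successful build, IntLocalMapDone is posted and handled by our
+ // react(), which moves us on to WaitReplicas)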
+ using reactions = mpl::list, + // looping, waiting for the backend to finish: + sc::transition, + sc::custom_reaction>; + + sc::result react(const IntLocalMapDone&); +}; + +/* + * "drain" scrub-maps responses from replicas + */ +struct DrainReplMaps : sc::state, NamedSimply { + explicit DrainReplMaps(my_context ctx); + + using reactions = + // all replicas are accounted for: + mpl::list>; + + sc::result react(const GotReplicas&); +}; + +struct WaitReplicas : sc::state, NamedSimply { + explicit WaitReplicas(my_context ctx); + + using reactions = mpl::list< + // all replicas are accounted for: + sc::custom_reaction, + sc::custom_reaction>; + + sc::result react(const GotReplicas&); + sc::result react(const DigestUpdate&); + bool all_maps_already_called{false}; // see comment in react code +}; + +struct WaitDigestUpdate : sc::state, + NamedSimply { + explicit WaitDigestUpdate(my_context ctx); + + using reactions = mpl::list, + sc::custom_reaction, + sc::transition>; + sc::result react(const DigestUpdate&); + sc::result react(const ScrubFinished&); +}; + +// ----------------------------- the "replica active" states + +/* + * Waiting for 'active_pushes' to complete + * + * When in this state: + * - the details of the Primary's request were internalized by PgScrubber; + * - 'active' scrubbing is set + */ +struct ReplicaWaitUpdates : sc::state, + NamedSimply { + explicit ReplicaWaitUpdates(my_context ctx); + using reactions = mpl::list, + sc::custom_reaction>; + + sc::result react(const ReplicaPushesUpd&); + sc::result react(const FullReset&); +}; + + +struct ActiveReplica : sc::state, NamedSimply { + explicit ActiveReplica(my_context ctx); + using reactions = mpl::list, + sc::custom_reaction>; + + sc::result react(const SchedReplica&); + sc::result react(const FullReset&); +}; + +} // namespace Scrub diff --git a/src/osd/scrubber/scrub_machine_lstnr.h b/src/osd/scrubber/scrub_machine_lstnr.h new file mode 100644 index 000000000..a8e77d075 --- /dev/null +++ b/src/osd/scrubber/scrub_machine_lstnr.h @@ -0,0 +1,223 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once +/** + * \file the PgScrubber interface used by the scrub FSM + */ +#include "common/version.h" +#include "include/Context.h" +#include "osd/osd_types.h" + +struct ScrubMachineListener; + +namespace Scrub { + +enum class PreemptionNoted { no_preemption, preempted }; + +/// the interface exposed by the PgScrubber into its internal +/// preemption_data object +struct preemption_t { + + virtual ~preemption_t() = default; + + preemption_t() = default; + preemption_t(const preemption_t&) = delete; + preemption_t(preemption_t&&) = delete; + + [[nodiscard]] virtual bool is_preemptable() const = 0; + + [[nodiscard]] virtual bool was_preempted() const = 0; + + virtual void adjust_parameters() = 0; + + /** + * Try to preempt the scrub. + * 'true' (i.e. - preempted) if: + * preemptable && not already preempted + */ + virtual bool do_preempt() = 0; + + /** + * disables preemptions. + * Returns 'true' if we were already preempted + */ + virtual bool disable_and_test() = 0; +}; + +/// an aux used when blocking on a busy object. +/// Issues a log warning if still blocked after 'waittime'. 
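+/// (RangeBlocked holds an instance of this alarm, via BlockedRangeWarning,
+/// for as long as the scrub is stuck on the blocked range)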
+struct blocked_range_t { + blocked_range_t(OSDService* osds, + ceph::timespan waittime, + ScrubMachineListener& scrubber, + spg_t pg_id); + ~blocked_range_t(); + + OSDService* m_osds; + ScrubMachineListener& m_scrubber; + + /// used to identify ourselves to the PG, when no longer blocked + spg_t m_pgid; + Context* m_callbk; + + // once timed-out, we flag the OSD's scrub-queue as having + // a problem. 'm_warning_issued' signals the need to clear + // that OSD-wide flag. + bool m_warning_issued{false}; +}; + +using BlockedRangeWarning = std::unique_ptr; + +} // namespace Scrub + +struct ScrubMachineListener { + + struct MsgAndEpoch { + MessageRef m_msg; + epoch_t m_epoch; + }; + + virtual ~ScrubMachineListener() = default; + + /// set the string we'd use in logs to convey the current state-machine + /// state. + virtual void set_state_name(const char* name) = 0; + + [[nodiscard]] virtual bool is_primary() const = 0; + + virtual void select_range_n_notify() = 0; + + virtual Scrub::BlockedRangeWarning acquire_blocked_alarm() = 0; + + /// walk the log to find the latest update that affects our chunk + virtual eversion_t search_log_for_updates() const = 0; + + virtual eversion_t get_last_update_applied() const = 0; + + virtual int pending_active_pushes() const = 0; + + virtual int build_primary_map_chunk() = 0; + + virtual int build_replica_map_chunk() = 0; + + virtual void on_init() = 0; + + virtual void on_replica_init() = 0; + + virtual void replica_handling_done() = 0; + + /// the version of 'scrub_clear_state()' that does not try to invoke FSM + /// services (thus can be called from FSM reactions) + virtual void clear_pgscrub_state() = 0; + + /* + * Send an 'InternalSchedScrub' FSM event either immediately, or - if + * 'm_need_sleep' is asserted - after a configuration-dependent timeout. + */ + virtual void add_delayed_scheduling() = 0; + + /** + * Ask all replicas for their scrub maps for the current chunk. + */ + virtual void get_replicas_maps(bool replica_can_preempt) = 0; + + virtual void on_digest_updates() = 0; + + /// the part that actually finalizes a scrub + virtual void scrub_finish() = 0; + + /** + * Prepare a MOSDRepScrubMap message carrying the requested scrub map + * @param was_preempted - were we preempted? + * @return the message, and the current value of 'm_replica_min_epoch' (which + * is used when sending the message, but will be overwritten before that). + */ + [[nodiscard]] virtual MsgAndEpoch prep_replica_map_msg( + Scrub::PreemptionNoted was_preempted) = 0; + + /** + * Send to the primary the pre-prepared message containing the requested map + */ + virtual void send_replica_map(const MsgAndEpoch& preprepared) = 0; + + /** + * Let the primary know that we were preempted while trying to build the + * requested map. + */ + virtual void send_preempted_replica() = 0; + + [[nodiscard]] virtual bool has_pg_marked_new_updates() const = 0; + + virtual void set_subset_last_update(eversion_t e) = 0; + + [[nodiscard]] virtual bool was_epoch_changed() const = 0; + + virtual Scrub::preemption_t& get_preemptor() = 0; + + /** + * a "technical" collection of the steps performed once all + * rep maps are available: + * - the maps are compared + * - the scrub region markers (start_ & end_) are advanced + * - callbacks and ops that were pending are allowed to run + */ + virtual void maps_compare_n_cleanup() = 0; + + /** + * order the PgScrubber to initiate the process of reserving replicas' scrub + * resources. 
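+ * The outcome is reported back to the FSM as either a RemotesReserved or a
+ * ReservationFailure event.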
+ */ + virtual void reserve_replicas() = 0; + + virtual void unreserve_replicas() = 0; + + virtual void set_scrub_begin_time() = 0; + + virtual void set_scrub_duration() = 0; + + /** + * No new scrub session will start while a scrub was initiate on a PG, + * and that PG is trying to acquire replica resources. + * set_reserving_now()/clear_reserving_now() let's the OSD scrub-queue know + * we are busy reserving. + */ + virtual void set_reserving_now() = 0; + virtual void clear_reserving_now() = 0; + + /** + * Manipulate the 'I am being scrubbed now' Scrubber's flag + */ + virtual void set_queued_or_active() = 0; + virtual void clear_queued_or_active() = 0; + + /** + * Our scrubbing is blocked, waiting for an excessive length of time for + * our target chunk to be unlocked. We will set the corresponding flags, + * both in the OSD_wide scrub-queue object, and in our own scrub-job object. + * Both flags are used to report the unhealthy state in the log and in + * response to scrub-queue queries. + */ + virtual void set_scrub_blocked(utime_t since) = 0; + virtual void clear_scrub_blocked() = 0; + + /** + * the FSM interface into the "are we waiting for maps, either our own or from + * replicas" state. + * The FSM can only: + * - mark the local map as available, and + * - query status + */ + virtual void mark_local_map_ready() = 0; + + [[nodiscard]] virtual bool are_all_maps_available() const = 0; + + /// a log/debug interface + virtual std::string dump_awaited_maps() const = 0; + + /// exposed to be used by the scrub_machine logger + virtual std::ostream& gen_prefix(std::ostream& out) const = 0; + + /// sending cluster-log warnings + virtual void log_cluster_warning(const std::string& msg) const = 0; +}; -- cgit v1.2.3