Diffstat (limited to 'src/osd/PG.h')
-rw-r--r-- src/osd/PG.h 1437
1 file changed, 1437 insertions, 0 deletions
diff --git a/src/osd/PG.h b/src/osd/PG.h
new file mode 100644
index 000000000..88c893b35
--- /dev/null
+++ b/src/osd/PG.h
@@ -0,0 +1,1437 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_PG_H
+#define CEPH_PG_H
+
+#include <boost/scoped_ptr.hpp>
+#include <boost/container/flat_set.hpp>
+#include "include/mempool.h"
+
+// re-include our assert to clobber boost's
+#include "include/ceph_assert.h"
+#include "include/common_fwd.h"
+
+#include "include/types.h"
+#include "include/stringify.h"
+#include "osd_types.h"
+#include "include/xlist.h"
+#include "SnapMapper.h"
+#include "Session.h"
+#include "common/Timer.h"
+
+#include "PGLog.h"
+#include "OSDMap.h"
+#include "include/str_list.h"
+#include "PGBackend.h"
+#include "PGPeeringEvent.h"
+#include "PeeringState.h"
+#include "recovery_types.h"
+#include "MissingLoc.h"
+#include "scrubber_common.h"
+
+#include "mgr/OSDPerfMetricTypes.h"
+
+#include <atomic>
+#include <list>
+#include <memory>
+#include <string>
+#include <tuple>
+
+//#define DEBUG_RECOVERY_OIDS // track the set of recovering oids explicitly, to find counting bugs
+//#define PG_DEBUG_REFS // track provenance of pg refs, helpful for finding leaks
+
+class OSD;
+class OSDService;
+struct OSDShard;
+struct OSDShardPGSlot;
+
+class PG;
+struct OpRequest;
+typedef OpRequest::Ref OpRequestRef;
+class DynamicPerfStats;
+class PgScrubber;
+class ScrubBackend;
+
+namespace Scrub {
+ class Store;
+ class ReplicaReservations;
+ class LocalReservation;
+ class ReservedByRemotePrimary;
+ enum class schedule_result_t;
+}
+
+#ifdef PG_DEBUG_REFS
+#include "common/tracked_int_ptr.hpp"
+ uint64_t get_with_id(PG *pg);
+ void put_with_id(PG *pg, uint64_t id);
+ typedef TrackedIntPtr<PG> PGRef;
+#else
+ typedef boost::intrusive_ptr<PG> PGRef;
+#endif
+
+class PGRecoveryStats {
+ struct per_state_info {
+ uint64_t enter, exit; // enter/exit counts
+ uint64_t events;
+ utime_t event_time; // time spent processing events
+ utime_t total_time; // total time in state
+ utime_t min_time, max_time;
+
+ // cppcheck-suppress unreachableCode
+ per_state_info() : enter(0), exit(0), events(0) {}
+ };
+ std::map<const char *,per_state_info> info;
+ ceph::mutex lock = ceph::make_mutex("PGRecoveryStats::lock");
+
+ public:
+ PGRecoveryStats() = default;
+
+ void reset() {
+ std::lock_guard l(lock);
+ info.clear();
+ }
+ void dump(std::ostream& out) {
+ std::lock_guard l(lock);
+ for (std::map<const char *,per_state_info>::iterator p = info.begin(); p != info.end(); ++p) {
+ per_state_info& i = p->second;
+ out << i.enter << "\t" << i.exit << "\t"
+ << i.events << "\t" << i.event_time << "\t"
+ << i.total_time << "\t"
+ << i.min_time << "\t" << i.max_time << "\t"
+ << p->first << "\n";
+ }
+ }
+
+ void dump_formatted(ceph::Formatter *f) {
+ std::lock_guard l(lock);
+ f->open_array_section("pg_recovery_stats");
+ for (std::map<const char *,per_state_info>::iterator p = info.begin();
+ p != info.end(); ++p) {
+ per_state_info& i = p->second;
+ f->open_object_section("recovery_state");
+ f->dump_int("enter", i.enter);
+ f->dump_int("exit", i.exit);
+ f->dump_int("events", i.events);
+ f->dump_stream("event_time") << i.event_time;
+ f->dump_stream("total_time") << i.total_time;
+ f->dump_stream("min_time") << i.min_time;
+ f->dump_stream("max_time") << i.max_time;
+ std::vector<std::string> states;
+ get_str_vec(p->first, "/", states);
+ f->open_array_section("nested_states");
+ for (std::vector<std::string>::iterator st = states.begin();
+ st != states.end(); ++st) {
+ f->dump_string("state", *st);
+ }
+ f->close_section();
+ f->close_section();
+ }
+ f->close_section();
+ }
+
+ void log_enter(const char *s) {
+ std::lock_guard l(lock);
+ info[s].enter++;
+ }
+ void log_exit(const char *s, utime_t dur, uint64_t events, utime_t event_dur) {
+ std::lock_guard l(lock);
+ per_state_info &i = info[s];
+ i.exit++;
+ i.total_time += dur;
+ if (dur > i.max_time)
+ i.max_time = dur;
+ if (dur < i.min_time || i.min_time == utime_t())
+ i.min_time = dur;
+ i.events += events;
+ i.event_time += event_dur;
+ }
+};
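+
+/*
+ * A minimal usage sketch (illustrative only; in this file the real callers
+ * are the log_state_enter()/log_state_exit() hooks further below, driven by
+ * the peering state machine). The state name and the two event counters are
+ * hypothetical:
+ *
+ *   PGRecoveryStats stats;
+ *   utime_t entered = ceph_clock_now();
+ *   stats.log_enter("Started/Primary/Active");
+ *   // ... the state machine handles some events ...
+ *   stats.log_exit("Started/Primary/Active",
+ *                  ceph_clock_now() - entered,  // total time in the state
+ *                  events_handled, event_time); // hypothetical counters
+ */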
+
+/** PG - Replica Placement Group
+ *
+ */
+
+class PG : public DoutPrefixProvider,
+ public PeeringState::PeeringListener,
+ public Scrub::PgScrubBeListener {
+ friend struct NamedState;
+ friend class PeeringState;
+ friend class PgScrubber;
+ friend class ScrubBackend;
+
+public:
+ const pg_shard_t pg_whoami;
+ const spg_t pg_id;
+
+ /// the 'scrubber'. Allocated in the derived class (PrimaryLogPG) ctor,
+ /// and removed only in the PrimaryLogPG destructor.
+ std::unique_ptr<ScrubPgIF> m_scrubber;
+
+ /// flags detailing scheduling/operation characteristics of the next scrub
+ requested_scrub_t m_planned_scrub;
+
+ const requested_scrub_t& get_planned_scrub() const {
+ return m_planned_scrub;
+ }
+
+ /// scrubbing state for both Primary & replicas
+ bool is_scrub_active() const { return m_scrubber->is_scrub_active(); }
+
+ /// set when the scrub request is queued, and reset only after the scrub
+ /// has fully cleaned up.
+ bool is_scrub_queued_or_active() const { return m_scrubber->is_queued_or_active(); }
+
+public:
+ // -- members --
+ const coll_t coll;
+
+ ObjectStore::CollectionHandle ch;
+
+ // -- methods --
+ std::ostream& gen_prefix(std::ostream& out) const override;
+ CephContext *get_cct() const override {
+ return cct;
+ }
+ unsigned get_subsys() const override {
+ return ceph_subsys_osd;
+ }
+
+ const char* get_current_state() const {
+ return recovery_state.get_current_state();
+ }
+
+ const OSDMapRef& get_osdmap() const {
+ ceph_assert(is_locked());
+ return recovery_state.get_osdmap();
+ }
+
+ epoch_t get_osdmap_epoch() const override final {
+ return recovery_state.get_osdmap()->get_epoch();
+ }
+
+ PerfCounters &get_peering_perf() override;
+ PerfCounters &get_perf_logger() override;
+ void log_state_enter(const char *state) override;
+ void log_state_exit(
+ const char *state_name, utime_t enter_time,
+ uint64_t events, utime_t event_dur) override;
+
+ void lock(bool no_lockdep = false) const;
+ void unlock() const;
+ bool is_locked() const;
+
+ const spg_t& get_pgid() const {
+ return pg_id;
+ }
+
+ const PGPool& get_pgpool() const final {
+ return pool;
+ }
+ uint64_t get_last_user_version() const {
+ return info.last_user_version;
+ }
+ const pg_history_t& get_history() const {
+ return info.history;
+ }
+ bool get_need_up_thru() const {
+ return recovery_state.get_need_up_thru();
+ }
+ epoch_t get_same_interval_since() const {
+ return info.history.same_interval_since;
+ }
+
+ bool is_waiting_for_unreadable_object() const final
+ {
+ return !waiting_for_unreadable_object.empty();
+ }
+
+ static void set_last_scrub_stamp(
+ utime_t t, pg_history_t &history, pg_stat_t &stats) {
+ stats.last_scrub_stamp = t;
+ history.last_scrub_stamp = t;
+ }
+
+ void set_last_scrub_stamp(utime_t t) {
+ recovery_state.update_stats(
+ [t](auto &history, auto &stats) {
+ set_last_scrub_stamp(t, history, stats);
+ return true;
+ });
+ }
+
+ static void set_last_deep_scrub_stamp(
+ utime_t t, pg_history_t &history, pg_stat_t &stats) {
+ stats.last_deep_scrub_stamp = t;
+ history.last_deep_scrub_stamp = t;
+ }
+
+ void set_last_deep_scrub_stamp(utime_t t) {
+ recovery_state.update_stats(
+ [t](auto &history, auto &stats) {
+ set_last_deep_scrub_stamp(t, history, stats);
+ return true;
+ });
+ }
+
+ static void add_objects_scrubbed_count(
+ int64_t count, pg_stat_t &stats) {
+ stats.objects_scrubbed += count;
+ }
+
+ void add_objects_scrubbed_count(int64_t count) {
+ recovery_state.update_stats(
+ [count](auto &history, auto &stats) {
+ add_objects_scrubbed_count(count, stats);
+ return true;
+ });
+ }
+
+ static void reset_objects_scrubbed(pg_stat_t &stats) {
+ stats.objects_scrubbed = 0;
+ }
+
+ void reset_objects_scrubbed()
+ {
+ recovery_state.update_stats([](auto& history, auto& stats) {
+ reset_objects_scrubbed(stats);
+ return true;
+ });
+ }
+
+ bool is_deleting() const {
+ return recovery_state.is_deleting();
+ }
+ bool is_deleted() const {
+ return recovery_state.is_deleted();
+ }
+ bool is_nonprimary() const {
+ return recovery_state.is_nonprimary();
+ }
+ bool is_primary() const {
+ return recovery_state.is_primary();
+ }
+ bool pg_has_reset_since(epoch_t e) {
+ ceph_assert(is_locked());
+ return recovery_state.pg_has_reset_since(e);
+ }
+
+ bool is_ec_pg() const {
+ return recovery_state.is_ec_pg();
+ }
+ int get_role() const {
+ return recovery_state.get_role();
+ }
+ const std::vector<int> get_acting() const {
+ return recovery_state.get_acting();
+ }
+ const std::set<pg_shard_t> &get_actingset() const {
+ return recovery_state.get_actingset();
+ }
+ int get_acting_primary() const {
+ return recovery_state.get_acting_primary();
+ }
+ pg_shard_t get_primary() const final {
+ return recovery_state.get_primary();
+ }
+ const std::vector<int> get_up() const {
+ return recovery_state.get_up();
+ }
+ int get_up_primary() const {
+ return recovery_state.get_up_primary();
+ }
+ const PastIntervals& get_past_intervals() const {
+ return recovery_state.get_past_intervals();
+ }
+ bool is_acting_recovery_backfill(pg_shard_t osd) const {
+ return recovery_state.is_acting_recovery_backfill(osd);
+ }
+ const std::set<pg_shard_t> &get_acting_recovery_backfill() const {
+ return recovery_state.get_acting_recovery_backfill();
+ }
+ bool is_acting(pg_shard_t osd) const {
+ return recovery_state.is_acting(osd);
+ }
+ bool is_up(pg_shard_t osd) const {
+ return recovery_state.is_up(osd);
+ }
+ static bool has_shard(bool ec, const std::vector<int>& v, pg_shard_t osd) {
+ return PeeringState::has_shard(ec, v, osd);
+ }
+
+ /// initialize created PG
+ void init(
+ int role,
+ const std::vector<int>& up,
+ int up_primary,
+ const std::vector<int>& acting,
+ int acting_primary,
+ const pg_history_t& history,
+ const PastIntervals& pim,
+ ObjectStore::Transaction &t);
+
+ /// read existing pg state off disk
+ void read_state(ObjectStore *store);
+ static int peek_map_epoch(ObjectStore *store, spg_t pgid, epoch_t *pepoch);
+
+ static int get_latest_struct_v() {
+ return pg_latest_struct_v;
+ }
+ static int get_compat_struct_v() {
+ return pg_compat_struct_v;
+ }
+ static int read_info(
+ ObjectStore *store, spg_t pgid, const coll_t &coll,
+ pg_info_t &info, PastIntervals &past_intervals,
+ __u8 &);
+ static bool _has_removal_flag(ObjectStore *store, spg_t pgid);
+
+ void rm_backoff(const ceph::ref_t<Backoff>& b);
+
+ void update_snap_mapper_bits(uint32_t bits) {
+ snap_mapper.update_bits(bits);
+ }
+ void start_split_stats(const std::set<spg_t>& childpgs, std::vector<object_stat_sum_t> *v);
+ virtual void split_colls(
+ spg_t child,
+ int split_bits,
+ int seed,
+ const pg_pool_t *pool,
+ ObjectStore::Transaction &t) = 0;
+ void split_into(pg_t child_pgid, PG *child, unsigned split_bits);
+ void merge_from(std::map<spg_t,PGRef>& sources, PeeringCtx &rctx,
+ unsigned split_bits,
+ const pg_merge_meta_t& last_pg_merge_meta);
+ void finish_split_stats(const object_stat_sum_t& stats,
+ ObjectStore::Transaction &t);
+
+ void scrub(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ // a new scrub
+ forward_scrub_event(&ScrubPgIF::initiate_regular_scrub, queued, "StartScrub");
+ }
+
+ /**
+ * A special version of PG::scrub() that:
+ * - is initiated after repair; and
+ * - (this is no longer true:) is not required to allocate local/remote OSD
+ *   scrub resources.
+ */
+ void recovery_scrub(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ // a new scrub
+ forward_scrub_event(&ScrubPgIF::initiate_scrub_after_repair, queued,
+ "AfterRepairScrub");
+ }
+
+ void replica_scrub(epoch_t queued,
+ Scrub::act_token_t act_token,
+ ThreadPool::TPHandle& handle);
+
+ void replica_scrub_resched(epoch_t queued,
+ Scrub::act_token_t act_token,
+ ThreadPool::TPHandle& handle)
+ {
+ forward_scrub_event(&ScrubPgIF::send_sched_replica, queued, act_token,
+ "SchedReplica");
+ }
+
+ void scrub_send_resources_granted(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ forward_scrub_event(&ScrubPgIF::send_remotes_reserved, queued, "RemotesReserved");
+ }
+
+ void scrub_send_resources_denied(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ forward_scrub_event(&ScrubPgIF::send_reservation_failure, queued,
+ "ReservationFailure");
+ }
+
+ void scrub_send_scrub_resched(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ forward_scrub_event(&ScrubPgIF::send_scrub_resched, queued, "InternalSchedScrub");
+ }
+
+ void scrub_send_pushes_update(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ forward_scrub_event(&ScrubPgIF::active_pushes_notification, queued,
+ "ActivePushesUpd");
+ }
+
+ void scrub_send_applied_update(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ forward_scrub_event(&ScrubPgIF::update_applied_notification, queued,
+ "UpdatesApplied");
+ }
+
+ void scrub_send_unblocking(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ forward_scrub_event(&ScrubPgIF::send_scrub_unblock, queued, "Unblocked");
+ }
+
+ void scrub_send_digest_update(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ forward_scrub_event(&ScrubPgIF::digest_update_notification, queued, "DigestUpdate");
+ }
+
+ void scrub_send_local_map_ready(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ forward_scrub_event(&ScrubPgIF::send_local_map_done, queued, "IntLocalMapDone");
+ }
+
+ void scrub_send_replmaps_ready(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ forward_scrub_event(&ScrubPgIF::send_replica_maps_ready, queued, "GotReplicas");
+ }
+
+ void scrub_send_replica_pushes(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ forward_scrub_event(&ScrubPgIF::send_replica_pushes_upd, queued,
+ "ReplicaPushesUpd");
+ }
+
+ void scrub_send_get_next_chunk(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ forward_scrub_event(&ScrubPgIF::send_get_next_chunk, queued, "NextChunk");
+ }
+
+ void scrub_send_scrub_is_finished(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ forward_scrub_event(&ScrubPgIF::send_scrub_is_finished, queued, "ScrubFinished");
+ }
+
+ void scrub_send_chunk_free(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ forward_scrub_event(&ScrubPgIF::send_chunk_free, queued, "SelectedChunkFree");
+ }
+
+ void scrub_send_chunk_busy(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ forward_scrub_event(&ScrubPgIF::send_chunk_busy, queued, "ChunkIsBusy");
+ }
+
+ void queue_want_pg_temp(const std::vector<int> &wanted) override;
+ void clear_want_pg_temp() override;
+
+ void on_new_interval() override;
+
+ void on_role_change() override;
+ virtual void plpg_on_role_change() = 0;
+
+ void init_collection_pool_opts();
+ void on_pool_change() override;
+ virtual void plpg_on_pool_change() = 0;
+
+ void on_info_history_change() override;
+
+ void on_primary_status_change(bool was_primary, bool now_primary) override;
+
+ void reschedule_scrub() override;
+
+ void scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type) override;
+
+ uint64_t get_snap_trimq_size() const override {
+ return snap_trimq.size();
+ }
+
+ static void add_objects_trimmed_count(
+ int64_t count, pg_stat_t &stats) {
+ stats.objects_trimmed += count;
+ }
+
+ void add_objects_trimmed_count(int64_t count) {
+ recovery_state.update_stats_wo_resched(
+ [count](auto &history, auto &stats) {
+ add_objects_trimmed_count(count, stats);
+ });
+ }
+
+ static void reset_objects_trimmed(pg_stat_t &stats) {
+ stats.objects_trimmed = 0;
+ }
+
+ void reset_objects_trimmed() {
+ recovery_state.update_stats_wo_resched(
+ [](auto &history, auto &stats) {
+ reset_objects_trimmed(stats);
+ });
+ }
+
+ utime_t snaptrim_begin_stamp;
+
+ void set_snaptrim_begin_stamp() {
+ snaptrim_begin_stamp = ceph_clock_now();
+ }
+
+ void set_snaptrim_duration() {
+ utime_t cur_stamp = ceph_clock_now();
+ utime_t duration = cur_stamp - snaptrim_begin_stamp;
+ recovery_state.update_stats_wo_resched(
+ [duration](auto &history, auto &stats) {
+ stats.snaptrim_duration = double(duration);
+ });
+ }
+
+ unsigned get_target_pg_log_entries() const override;
+
+ void clear_publish_stats() override;
+ void clear_primary_state() override;
+
+ epoch_t cluster_osdmap_trim_lower_bound() override;
+ OstreamTemp get_clog_error() override;
+ OstreamTemp get_clog_info() override;
+ OstreamTemp get_clog_debug() override;
+
+ void schedule_event_after(
+ PGPeeringEventRef event,
+ float delay) override;
+ void request_local_background_io_reservation(
+ unsigned priority,
+ PGPeeringEventURef on_grant,
+ PGPeeringEventURef on_preempt) override;
+ void update_local_background_io_priority(
+ unsigned priority) override;
+ void cancel_local_background_io_reservation() override;
+
+ void request_remote_recovery_reservation(
+ unsigned priority,
+ PGPeeringEventURef on_grant,
+ PGPeeringEventURef on_preempt) override;
+ void cancel_remote_recovery_reservation() override;
+
+ void schedule_event_on_commit(
+ ObjectStore::Transaction &t,
+ PGPeeringEventRef on_commit) override;
+
+ void on_active_exit() override;
+
+ Context *on_clean() override {
+ if (is_active()) {
+ kick_snap_trim();
+ }
+ requeue_ops(waiting_for_clean_to_primary_repair);
+ return finish_recovery();
+ }
+
+ void on_activate(interval_set<snapid_t> snaps) override;
+
+ void on_activate_committed() override;
+
+ void on_active_actmap() override;
+ void on_active_advmap(const OSDMapRef &osdmap) override;
+
+ void queue_snap_retrim(snapid_t snap);
+
+ void on_backfill_reserved() override;
+ void on_backfill_canceled() override;
+ void on_recovery_reserved() override;
+
+ bool is_forced_recovery_or_backfill() const {
+ return recovery_state.is_forced_recovery_or_backfill();
+ }
+
+ PGLog::LogEntryHandlerRef get_log_handler(
+ ObjectStore::Transaction &t) override {
+ return std::make_unique<PG::PGLogEntryHandler>(this, &t);
+ }
+
+ std::pair<ghobject_t, bool> do_delete_work(ObjectStore::Transaction &t,
+ ghobject_t _next) override;
+
+ void clear_ready_to_merge() override;
+ void set_not_ready_to_merge_target(pg_t pgid, pg_t src) override;
+ void set_not_ready_to_merge_source(pg_t pgid) override;
+ void set_ready_to_merge_target(eversion_t lu, epoch_t les, epoch_t lec) override;
+ void set_ready_to_merge_source(eversion_t lu) override;
+
+ void send_pg_created(pg_t pgid) override;
+
+ ceph::signedspan get_mnow() const override;
+ HeartbeatStampsRef get_hb_stamps(int peer) override;
+ void schedule_renew_lease(epoch_t lpr, ceph::timespan delay) override;
+ void queue_check_readable(epoch_t lpr, ceph::timespan delay) override;
+
+ void rebuild_missing_set_with_deletes(PGLog &pglog) override;
+
+ void queue_peering_event(PGPeeringEventRef evt);
+ void do_peering_event(PGPeeringEventRef evt, PeeringCtx &rctx);
+ void queue_null(epoch_t msg_epoch, epoch_t query_epoch);
+ void queue_flushed(epoch_t started_at);
+ void handle_advance_map(
+ OSDMapRef osdmap, OSDMapRef lastmap,
+ std::vector<int>& newup, int up_primary,
+ std::vector<int>& newacting, int acting_primary,
+ PeeringCtx &rctx);
+ void handle_activate_map(PeeringCtx &rctx);
+ void handle_initialize(PeeringCtx &rctx);
+ void handle_query_state(ceph::Formatter *f);
+
+ /**
+ * @param ops_begun returns how many recovery ops the function started
+ * @returns true if any useful work was accomplished; false otherwise
+ */
+ virtual bool start_recovery_ops(
+ uint64_t max,
+ ThreadPool::TPHandle &handle,
+ uint64_t *ops_begun) = 0;
+
+ // more work after the above, but with a PeeringCtx
+ void find_unfound(epoch_t queued, PeeringCtx &rctx);
+
+ virtual void get_watchers(std::list<obj_watch_item_t> *ls) = 0;
+
+ void dump_pgstate_history(ceph::Formatter *f);
+ void dump_missing(ceph::Formatter *f);
+
+ void with_pg_stats(ceph::coarse_real_clock::time_point now_is,
+ std::function<void(const pg_stat_t&, epoch_t lec)>&& f);
+ void with_heartbeat_peers(std::function<void(int)>&& f);
+
+ void shutdown();
+ virtual void on_shutdown() = 0;
+
+ bool get_must_scrub() const;
+ Scrub::schedule_result_t sched_scrub();
+
+ unsigned int scrub_requeue_priority(Scrub::scrub_prio_t with_priority, unsigned int suggested_priority) const;
+ /// the version that refers to flags_.priority
+ unsigned int scrub_requeue_priority(Scrub::scrub_prio_t with_priority) const;
+private:
+ // auxiliaries used by sched_scrub():
+ double next_deepscrub_interval() const;
+
+ /// should we perform deep scrub?
+ bool is_time_for_deep(bool allow_deep_scrub,
+ bool allow_shallow_scrub,
+ bool has_deep_errors,
+ const requested_scrub_t& planned) const;
+
+ /**
+ * Validate the various 'next scrub' flags in m_planned_scrub against
+ * configuration and scrub-related timestamps.
+ *
+ * @returns an updated copy of m_planned_scrub (or nothing if no scrub
+ *   should be initiated)
+ */
+ std::optional<requested_scrub_t> validate_scrub_mode() const;
+
+ std::optional<requested_scrub_t> validate_periodic_mode(
+ bool allow_deep_scrub,
+ bool try_to_auto_repair,
+ bool allow_shallow_scrub,
+ bool time_for_deep,
+ bool has_deep_errors,
+ const requested_scrub_t& planned) const;
+
+ std::optional<requested_scrub_t> validate_initiated_scrub(
+ bool allow_deep_scrub,
+ bool try_to_auto_repair,
+ bool time_for_deep,
+ bool has_deep_errors,
+ const requested_scrub_t& planned) const;
+
+ using ScrubAPI = void (ScrubPgIF::*)(epoch_t epoch_queued);
+ void forward_scrub_event(ScrubAPI fn, epoch_t epoch_queued, std::string_view desc);
+ // and for events that carry a meaningful 'activation token'
+ using ScrubSafeAPI = void (ScrubPgIF::*)(epoch_t epoch_queued,
+ Scrub::act_token_t act_token);
+ void forward_scrub_event(ScrubSafeAPI fn,
+ epoch_t epoch_queued,
+ Scrub::act_token_t act_token,
+ std::string_view desc);
+
+public:
+ virtual void do_request(
+ OpRequestRef& op,
+ ThreadPool::TPHandle &handle
+ ) = 0;
+ virtual void clear_cache() = 0;
+ virtual int get_cache_obj_count() = 0;
+
+ virtual void snap_trimmer(epoch_t epoch_queued) = 0;
+ virtual void do_command(
+ const std::string_view& prefix,
+ const cmdmap_t& cmdmap,
+ const ceph::buffer::list& idata,
+ std::function<void(int,const std::string&,ceph::buffer::list&)> on_finish) = 0;
+
+ virtual bool agent_work(int max) = 0;
+ virtual bool agent_work(int max, int agent_flush_quota) = 0;
+ virtual void agent_stop() = 0;
+ virtual void agent_delay() = 0;
+ virtual void agent_clear() = 0;
+ virtual void agent_choose_mode_restart() = 0;
+
+ struct C_DeleteMore : public Context {
+ PGRef pg;
+ epoch_t epoch;
+ C_DeleteMore(PG *p, epoch_t e) : pg(p), epoch(e) {}
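+ // finish() is intentionally unreachable: complete() is overridden below
+ // (and defined out of line) and, unlike the default Context::complete(),
+ // it does not call finish(); reaching this abort would indicate a bug.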
+ void finish(int r) override {
+ ceph_abort();
+ }
+ void complete(int r) override;
+ };
+
+ virtual void set_dynamic_perf_stats_queries(
+ const std::list<OSDPerfMetricQuery> &queries) {
+ }
+ virtual void get_dynamic_perf_stats(DynamicPerfStats *stats) {
+ }
+
+ uint64_t get_min_alloc_size() const;
+
+ // reference counting
+#ifdef PG_DEBUG_REFS
+ uint64_t get_with_id();
+ void put_with_id(uint64_t);
+ void dump_live_ids();
+#endif
+ void get(const char* tag);
+ void put(const char* tag);
+ int get_num_ref() {
+ return ref;
+ }
+
+ // ctor
+ PG(OSDService *o, OSDMapRef curmap,
+ const PGPool &pool, spg_t p);
+ ~PG() override;
+
+ // prevent copying
+ explicit PG(const PG& rhs) = delete;
+ PG& operator=(const PG& rhs) = delete;
+
+protected:
+ // -------------
+ // protected
+ OSDService *osd;
+public:
+ OSDShard *osd_shard = nullptr;
+ OSDShardPGSlot *pg_slot = nullptr;
+protected:
+ CephContext *cct;
+
+ // locking and reference counting.
+ // I destroy myself when the reference count hits zero.
+ // lock() should be called before doing anything.
+ // get() should be called on pointer copy (to another thread, etc.).
+ // put() should be called on destruction of some previously copied pointer.
+ // unlock() when done with the current pointer (_most common_).
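+ //
+ // An illustrative sketch of that protocol (the hand-off queue is
+ // hypothetical; PGRef copies call get("intptr")/put("intptr") through the
+ // intrusive_ptr hooks declared further below):
+ //
+ //   pg->lock();                      // take _lock before touching state
+ //   PGRef ref(pg);                   // pointer copy for another thread
+ //   some_other_thread_queue.push_back(ref);
+ //   pg->unlock();                    // done with the current pointer
+ //   // the other thread drops its ref when finished, and the PG destroys
+ //   // itself once the reference count hits zero.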
+ mutable ceph::mutex _lock = ceph::make_mutex("PG::_lock");
+#ifndef CEPH_DEBUG_MUTEX
+ mutable std::thread::id locked_by;
+#endif
+ std::atomic<unsigned int> ref{0};
+
+#ifdef PG_DEBUG_REFS
+ ceph::mutex _ref_id_lock = ceph::make_mutex("PG::_ref_id_lock");
+ std::map<uint64_t, std::string> _live_ids;
+ std::map<std::string, uint64_t> _tag_counts;
+ uint64_t _ref_id = 0;
+
+ friend uint64_t get_with_id(PG *pg) { return pg->get_with_id(); }
+ friend void put_with_id(PG *pg, uint64_t id) { return pg->put_with_id(id); }
+#endif
+
+private:
+ friend void intrusive_ptr_add_ref(PG *pg) {
+ pg->get("intptr");
+ }
+ friend void intrusive_ptr_release(PG *pg) {
+ pg->put("intptr");
+ }
+
+
+ // =====================
+
+protected:
+ OSDriver osdriver;
+ SnapMapper snap_mapper;
+
+ virtual PGBackend *get_pgbackend() = 0;
+ virtual const PGBackend* get_pgbackend() const = 0;
+
+protected:
+ void requeue_map_waiters();
+
+protected:
+
+ ZTracer::Endpoint trace_endpoint;
+
+
+protected:
+ __u8 info_struct_v = 0;
+ void upgrade(ObjectStore *store);
+
+protected:
+ ghobject_t pgmeta_oid;
+
+ // ------------------
+ interval_set<snapid_t> snap_trimq;
+ std::set<snapid_t> snap_trimq_repeat;
+
+ /* You should not use these items without taking their respective queue locks
+ * (if they have one) */
+ xlist<PG*>::item stat_queue_item;
+ bool recovery_queued;
+
+ int recovery_ops_active;
+ std::set<pg_shard_t> waiting_on_backfill;
+#ifdef DEBUG_RECOVERY_OIDS
+ std::multiset<hobject_t> recovering_oids;
+#endif
+
+public:
+ bool dne() { return info.dne(); }
+
+ void send_cluster_message(
+ int osd, MessageRef m, epoch_t epoch, bool share_map_update) override;
+
+protected:
+ epoch_t get_last_peering_reset() const {
+ return recovery_state.get_last_peering_reset();
+ }
+
+ /* heartbeat peers */
+ void set_probe_targets(const std::set<pg_shard_t> &probe_set) override;
+ void clear_probe_targets() override;
+
+ ceph::mutex heartbeat_peer_lock =
+ ceph::make_mutex("PG::heartbeat_peer_lock");
+ std::set<int> heartbeat_peers;
+ std::set<int> probe_targets;
+
+protected:
+ BackfillInterval backfill_info;
+ std::map<pg_shard_t, BackfillInterval> peer_backfill_info;
+ bool backfill_reserving;
+
+ // The primary's num_bytes and local num_bytes for this pg, only valid
+ // during backfill for non-primary shards.
+ // Both of these are adjusted for EC to reflect the on-disk bytes
+ std::atomic<int64_t> primary_num_bytes = 0;
+ std::atomic<int64_t> local_num_bytes = 0;
+
+public:
+ // Space reserved for backfill is primary_num_bytes - local_num_bytes.
+ // We don't care that the difference itself isn't computed atomically.
+ uint64_t get_reserved_num_bytes() {
+ int64_t primary = primary_num_bytes.load();
+ int64_t local = local_num_bytes.load();
+ if (primary > local)
+ return primary - local;
+ else
+ return 0;
+ }
+
+ bool is_remote_backfilling() {
+ return primary_num_bytes.load() > 0;
+ }
+
+ bool try_reserve_recovery_space(int64_t primary, int64_t local) override;
+ void unreserve_recovery_space() override;
+
+ // If the primary's and our local num_bytes are inconsistent and
+ // local_num_bytes would go negative, that's ok, because the value is
+ // clamped to zero below and then effectively ignored.
+
+ // The num_bytes delta passed in may be negative,
+ // but we don't let local_num_bytes itself go negative.
+ void add_local_num_bytes(int64_t num_bytes) {
+ if (num_bytes) {
+ int64_t prev_bytes = local_num_bytes.load();
+ int64_t new_bytes;
+ do {
+ new_bytes = prev_bytes + num_bytes;
+ if (new_bytes < 0)
+ new_bytes = 0;
+ } while(!local_num_bytes.compare_exchange_weak(prev_bytes, new_bytes));
+ }
+ }
+ void sub_local_num_bytes(int64_t num_bytes) {
+ ceph_assert(num_bytes >= 0);
+ if (num_bytes) {
+ int64_t prev_bytes = local_num_bytes.load();
+ int64_t new_bytes;
+ do {
+ new_bytes = prev_bytes - num_bytes;
+ if (new_bytes < 0)
+ new_bytes = 0;
+ } while(!local_num_bytes.compare_exchange_weak(prev_bytes, new_bytes));
+ }
+ }
+ // The value of num_bytes could be negative,
+ // but we don't let info.stats.stats.sum.num_bytes go negative.
+ void add_num_bytes(int64_t num_bytes) {
+ ceph_assert(ceph_mutex_is_locked_by_me(_lock));
+ if (num_bytes) {
+ recovery_state.update_stats(
+ [num_bytes](auto &history, auto &stats) {
+ stats.stats.sum.num_bytes += num_bytes;
+ if (stats.stats.sum.num_bytes < 0) {
+ stats.stats.sum.num_bytes = 0;
+ }
+ return false;
+ });
+ }
+ }
+ void sub_num_bytes(int64_t num_bytes) {
+ ceph_assert(ceph_mutex_is_locked_by_me(_lock));
+ ceph_assert(num_bytes >= 0);
+ if (num_bytes) {
+ recovery_state.update_stats(
+ [num_bytes](auto &history, auto &stats) {
+ stats.stats.sum.num_bytes -= num_bytes;
+ if (stats.stats.sum.num_bytes < 0) {
+ stats.stats.sum.num_bytes = 0;
+ }
+ return false;
+ });
+ }
+ }
+
+ // Only used in testing so not worried about needing the PG lock here
+ int64_t get_stats_num_bytes() {
+ std::lock_guard l{_lock};
+ int64_t num_bytes = info.stats.stats.sum.num_bytes;
+ if (pool.info.is_erasure()) {
+ num_bytes /= (int)get_pgbackend()->get_ec_data_chunk_count();
+ // Round up each object by a stripe
+ num_bytes += get_pgbackend()->get_ec_stripe_chunk_size() * info.stats.stats.sum.num_objects;
+ }
+ int64_t lnb = local_num_bytes.load();
+ if (lnb && lnb != num_bytes) {
+ lgeneric_dout(cct, 0) << this << " " << info.pgid << " num_bytes mismatch "
+ << lnb << " vs stats "
+ << info.stats.stats.sum.num_bytes << " / chunk "
+ << get_pgbackend()->get_ec_data_chunk_count()
+ << dendl;
+ }
+ return num_bytes;
+ }
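+
+ // A worked example of the EC adjustment in get_stats_num_bytes() above,
+ // with hypothetical numbers: 4 data chunks, a 64 KiB stripe chunk, 4 MiB
+ // of logical bytes and 10 objects give 4 MiB / 4 + 64 KiB * 10
+ // = 1 MiB + 640 KiB.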
+
+protected:
+
+ /*
+ * blocked request wait hierarchy
+ *
+ * In order to preserve request ordering we need to be careful about the
+ * order in which blocked requests get requeued. Generally speaking, we
+ * push the requests back up to the op_wq in reverse order (most recent
+ * request first) so that they come back out again in the original order.
+ * However, because there are multiple wait queues, we need to requeue
+ * waitlists in order. Generally speaking, we requeue first the wait lists
+ * that are checked first during request processing.
+ *
+ * Here are the various wait lists, in the order they are used during
+ * request processing, with notes:
+ *
+ * - waiting_for_map
+ * - may start or stop blocking at any time (depending on client epoch)
+ * - waiting_for_peered
+ * - !is_peered()
+ * - only starts blocking on interval change; never restarts
+ * - waiting_for_flush
+ * - flushes_in_progress
+ * - waiting for final flush during activate
+ * - waiting_for_active
+ * - !is_active()
+ * - only starts blocking on interval change; never restarts
+ * - waiting_for_readable
+ * - now > readable_until
+ * - unblocks when we get fresh(er) osd_pings
+ * - waiting_for_scrub
+ * - starts and stops blocking for varying intervals during scrub
+ * - waiting_for_unreadable_object
+ * - never restarts once object is readable (* except for EIO?)
+ * - waiting_for_degraded_object
+ * - never restarts once object is writeable (* except for EIO?)
+ * - waiting_for_blocked_object
+ * - starts and stops based on proxied op activity
+ * - obc rwlocks
+ * - starts and stops based on read/write activity
+ *
+ * Notes:
+ *
+ * 1. During an interval change, we requeue *everything* in the above order.
+ *
+ * 2. When an obc rwlock is released, we check for a scrub block and requeue
+ * the op there if it applies. We ignore the unreadable/degraded/blocked
+ * queues because we assume they cannot apply at that time (this is
+ * probably mostly true).
+ *
+ * 3. The requeue_ops helper will push ops onto the waiting_for_map list if
+ * that list is non-empty.
+ *
+ * These three behaviors are generally sufficient to maintain ordering, with
+ * the possible exception of cases where we make an object degraded or
+ * unreadable that was previously okay, e.g. when scrub or op processing
+ * encounter an unexpected error. FIXME.
+ */
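+
+ /* An illustrative sketch of the reverse-order requeue described above
+ * (not the actual requeue_ops() implementation; 'blocked' stands for a
+ * hypothetical local wait list with the oldest op at the front):
+ *
+ *   std::list<OpRequestRef> blocked;
+ *   for (auto i = blocked.rbegin(); i != blocked.rend(); ++i)
+ *     requeue_op(*i);   // newest requeued first, so the ops come back
+ *                       // out of the op_wq in their original order
+ *   blocked.clear();
+ */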
+
+ // ops with newer maps than ours (or blocked behind them)
+ // track these by client, since inter-request ordering doesn't otherwise
+ // matter.
+ std::unordered_map<entity_name_t,std::list<OpRequestRef>> waiting_for_map;
+
+ // ops waiting on peered
+ std::list<OpRequestRef> waiting_for_peered;
+
+ /// ops waiting on readable
+ std::list<OpRequestRef> waiting_for_readable;
+
+ // ops waiting on active (require peered as well)
+ std::list<OpRequestRef> waiting_for_active;
+ std::list<OpRequestRef> waiting_for_flush;
+ std::list<OpRequestRef> waiting_for_scrub;
+
+ std::list<OpRequestRef> waiting_for_cache_not_full;
+ std::list<OpRequestRef> waiting_for_clean_to_primary_repair;
+ std::map<hobject_t, std::list<OpRequestRef>> waiting_for_unreadable_object,
+ waiting_for_degraded_object,
+ waiting_for_blocked_object;
+
+ std::set<hobject_t> objects_blocked_on_cache_full;
+ std::map<hobject_t,snapid_t> objects_blocked_on_degraded_snap;
+ std::map<hobject_t,ObjectContextRef> objects_blocked_on_snap_promotion;
+
+ // Callbacks should assume pg (and nothing else) is locked
+ std::map<hobject_t, std::list<Context*>> callbacks_for_degraded_object;
+
+ std::map<eversion_t,
+ std::list<
+ std::tuple<OpRequestRef, version_t, int,
+ std::vector<pg_log_op_return_item_t>>>> waiting_for_ondisk;
+
+ void requeue_object_waiters(std::map<hobject_t, std::list<OpRequestRef>>& m);
+ void requeue_op(OpRequestRef op);
+ void requeue_ops(std::list<OpRequestRef> &l);
+
+ // stats that persist lazily
+ object_stat_collection_t unstable_stats;
+
+ // publish stats
+ ceph::mutex pg_stats_publish_lock =
+ ceph::make_mutex("PG::pg_stats_publish_lock");
+ std::optional<pg_stat_t> pg_stats_publish;
+
+ friend class TestOpsSocketHook;
+ void publish_stats_to_osd() override;
+
+ bool needs_recovery() const {
+ return recovery_state.needs_recovery();
+ }
+ bool needs_backfill() const {
+ return recovery_state.needs_backfill();
+ }
+
+ bool all_unfound_are_queried_or_lost(const OSDMapRef osdmap) const;
+
+ struct PGLogEntryHandler : public PGLog::LogEntryHandler {
+ PG *pg;
+ ObjectStore::Transaction *t;
+ PGLogEntryHandler(PG *pg, ObjectStore::Transaction *t) : pg(pg), t(t) {}
+
+ // LogEntryHandler
+ void remove(const hobject_t &hoid) override {
+ pg->get_pgbackend()->remove(hoid, t);
+ }
+ void try_stash(const hobject_t &hoid, version_t v) override {
+ pg->get_pgbackend()->try_stash(hoid, v, t);
+ }
+ void rollback(const pg_log_entry_t &entry) override {
+ ceph_assert(entry.can_rollback());
+ pg->get_pgbackend()->rollback(entry, t);
+ }
+ void rollforward(const pg_log_entry_t &entry) override {
+ pg->get_pgbackend()->rollforward(entry, t);
+ }
+ void trim(const pg_log_entry_t &entry) override {
+ pg->get_pgbackend()->trim(entry, t);
+ }
+ };
+
+ void update_object_snap_mapping(
+ ObjectStore::Transaction *t, const hobject_t &soid,
+ const std::set<snapid_t> &snaps);
+ void clear_object_snap_mapping(
+ ObjectStore::Transaction *t, const hobject_t &soid);
+ void remove_snap_mapped_object(
+ ObjectStore::Transaction& t, const hobject_t& soid);
+
+ bool have_unfound() const {
+ return recovery_state.have_unfound();
+ }
+ uint64_t get_num_unfound() const {
+ return recovery_state.get_num_unfound();
+ }
+
+ virtual void check_local() = 0;
+
+ void purge_strays();
+
+ void update_heartbeat_peers(std::set<int> peers) override;
+
+ Context *finish_sync_event;
+
+ Context *finish_recovery();
+ void _finish_recovery(Context *c);
+ struct C_PG_FinishRecovery : public Context {
+ PGRef pg;
+ explicit C_PG_FinishRecovery(PG *p) : pg(p) {}
+ void finish(int r) override {
+ pg->_finish_recovery(this);
+ }
+ };
+ void cancel_recovery();
+ void clear_recovery_state();
+ virtual void _clear_recovery_state() = 0;
+ void start_recovery_op(const hobject_t& soid);
+ void finish_recovery_op(const hobject_t& soid, bool dequeue=false);
+
+ virtual void _split_into(pg_t child_pgid, PG *child, unsigned split_bits) = 0;
+
+ friend class C_OSD_RepModify_Commit;
+ friend struct C_DeleteMore;
+
+ // -- backoff --
+ ceph::mutex backoff_lock = // orders inside Backoff::lock
+ ceph::make_mutex("PG::backoff_lock");
+ std::map<hobject_t,std::set<ceph::ref_t<Backoff>>> backoffs;
+
+ void add_backoff(const ceph::ref_t<Session>& s, const hobject_t& begin, const hobject_t& end);
+ void release_backoffs(const hobject_t& begin, const hobject_t& end);
+ void release_backoffs(const hobject_t& o) {
+ release_backoffs(o, o);
+ }
+ void clear_backoffs();
+
+ void add_pg_backoff(const ceph::ref_t<Session>& s) {
+ hobject_t begin = info.pgid.pgid.get_hobj_start();
+ hobject_t end = info.pgid.pgid.get_hobj_end(pool.info.get_pg_num());
+ add_backoff(s, begin, end);
+ }
+public:
+ void release_pg_backoffs() {
+ hobject_t begin = info.pgid.pgid.get_hobj_start();
+ hobject_t end = info.pgid.pgid.get_hobj_end(pool.info.get_pg_num());
+ release_backoffs(begin, end);
+ }
+
+ // -- scrub --
+protected:
+ bool scrub_after_recovery;
+
+ int active_pushes;
+
+ [[nodiscard]] bool ops_blocked_by_scrub() const;
+ [[nodiscard]] Scrub::scrub_prio_t is_scrub_blocking_ops() const;
+
+ void _scan_rollback_obs(const std::vector<ghobject_t> &rollback_obs);
+ /**
+ * Returns true if [begin, end) is good to scrub at this time.
+ * A false return value obliges the implementer to requeue the scrub when
+ * the condition preventing scrub clears.
+ */
+ virtual bool _range_available_for_scrub(
+ const hobject_t &begin, const hobject_t &end) = 0;
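+
+ /* A hypothetical implementer sketch (not PrimaryLogPG's actual logic):
+ * reject the chunk if any object in it is blocked on a degraded snap and
+ * rely on the caller to requeue the scrub once that clears.
+ *
+ *   bool _range_available_for_scrub(const hobject_t &begin,
+ *                                   const hobject_t &end) override {
+ *     auto p = objects_blocked_on_degraded_snap.lower_bound(begin);
+ *     return p == objects_blocked_on_degraded_snap.end() ||
+ *            !(p->first < end);
+ *   }
+ */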
+
+ /**
+ * Initiate the process that will create our scrub map for the Primary.
+ * (triggered by MSG_OSD_REP_SCRUB)
+ */
+ void replica_scrub(OpRequestRef op, ThreadPool::TPHandle &handle);
+
+ // -- recovery state --
+
+ struct QueuePeeringEvt : Context {
+ PGRef pg;
+ PGPeeringEventRef evt;
+
+ template <class EVT>
+ QueuePeeringEvt(PG *pg, epoch_t epoch, EVT evt) :
+ pg(pg), evt(std::make_shared<PGPeeringEvent>(epoch, epoch, evt)) {}
+
+ QueuePeeringEvt(PG *pg, PGPeeringEventRef evt) :
+ pg(pg), evt(std::move(evt)) {}
+
+ void finish(int r) override {
+ pg->lock();
+ pg->queue_peering_event(std::move(evt));
+ pg->unlock();
+ }
+ };
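+
+ /* A usage sketch of how schedule_event_on_commit() (declared above) is
+ * expected to defer a peering event until a transaction commits. This
+ * assumes ObjectStore::Transaction::register_on_commit() and should be
+ * read as illustrative rather than as the authoritative implementation:
+ *
+ *   void PG::schedule_event_on_commit(ObjectStore::Transaction &t,
+ *                                     PGPeeringEventRef on_commit) {
+ *     t.register_on_commit(new QueuePeeringEvt(this, on_commit));
+ *   }
+ */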
+
+
+public:
+ int pg_stat_adjust(osd_stat_t *new_stat);
+protected:
+ bool delete_needs_sleep = false;
+
+protected:
+ bool state_test(uint64_t m) const { return recovery_state.state_test(m); }
+ void state_set(uint64_t m) { recovery_state.state_set(m); }
+ void state_clear(uint64_t m) { recovery_state.state_clear(m); }
+
+ bool is_complete() const {
+ return recovery_state.is_complete();
+ }
+ bool should_send_notify() const {
+ return recovery_state.should_send_notify();
+ }
+
+ bool is_active() const { return recovery_state.is_active(); }
+ bool is_activating() const { return recovery_state.is_activating(); }
+ bool is_peering() const { return recovery_state.is_peering(); }
+ bool is_down() const { return recovery_state.is_down(); }
+ bool is_recovery_unfound() const { return recovery_state.is_recovery_unfound(); }
+ bool is_backfill_unfound() const { return recovery_state.is_backfill_unfound(); }
+ bool is_incomplete() const { return recovery_state.is_incomplete(); }
+ bool is_clean() const { return recovery_state.is_clean(); }
+ bool is_degraded() const { return recovery_state.is_degraded(); }
+ bool is_undersized() const { return recovery_state.is_undersized(); }
+ bool is_scrubbing() const { return state_test(PG_STATE_SCRUBBING); } // Primary only
+ bool is_remapped() const { return recovery_state.is_remapped(); }
+ bool is_peered() const { return recovery_state.is_peered(); }
+ bool is_recovering() const { return recovery_state.is_recovering(); }
+ bool is_premerge() const { return recovery_state.is_premerge(); }
+ bool is_repair() const { return recovery_state.is_repair(); }
+ bool is_laggy() const { return state_test(PG_STATE_LAGGY); }
+ bool is_wait() const { return state_test(PG_STATE_WAIT); }
+
+ bool is_empty() const { return recovery_state.is_empty(); }
+
+ // pg on-disk state
+ void do_pending_flush();
+
+public:
+ void prepare_write(
+ pg_info_t &info,
+ pg_info_t &last_written_info,
+ PastIntervals &past_intervals,
+ PGLog &pglog,
+ bool dirty_info,
+ bool dirty_big_info,
+ bool need_write_epoch,
+ ObjectStore::Transaction &t) override;
+
+ void write_if_dirty(PeeringCtx &rctx) {
+ write_if_dirty(rctx.transaction);
+ }
+protected:
+ void write_if_dirty(ObjectStore::Transaction& t) {
+ recovery_state.write_if_dirty(t);
+ }
+
+ PGLog::IndexedLog projected_log;
+ bool check_in_progress_op(
+ const osd_reqid_t &r,
+ eversion_t *version,
+ version_t *user_version,
+ int *return_code,
+ std::vector<pg_log_op_return_item_t> *op_returns) const;
+ eversion_t projected_last_update;
+ eversion_t get_next_version() const {
+ eversion_t at_version(
+ get_osdmap_epoch(),
+ projected_last_update.version+1);
+ ceph_assert(at_version > info.last_update);
+ ceph_assert(at_version > recovery_state.get_pg_log().get_head());
+ ceph_assert(at_version > projected_last_update);
+ return at_version;
+ }
+
+ bool check_log_for_corruption(ObjectStore *store);
+
+ std::string get_corrupt_pg_log_name() const;
+
+ void update_snap_map(
+ const std::vector<pg_log_entry_t> &log_entries,
+ ObjectStore::Transaction& t);
+
+ void filter_snapc(std::vector<snapid_t> &snaps);
+
+ virtual void kick_snap_trim() = 0;
+ virtual void snap_trimmer_scrub_complete() = 0;
+
+ void queue_recovery();
+ void queue_scrub_after_repair();
+ unsigned int get_scrub_priority();
+
+ bool try_flush_or_schedule_async() override;
+ void start_flush_on_transaction(
+ ObjectStore::Transaction &t) override;
+
+ void update_history(const pg_history_t& history) {
+ recovery_state.update_history(history);
+ }
+
+ // OpRequest queueing
+ bool can_discard_op(OpRequestRef& op);
+ bool can_discard_scan(OpRequestRef op);
+ bool can_discard_backfill(OpRequestRef op);
+ bool can_discard_request(OpRequestRef& op);
+
+ template<typename T, int MSGTYPE>
+ bool can_discard_replica_op(OpRequestRef& op);
+
+ bool old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch);
+ bool old_peering_evt(PGPeeringEventRef evt) {
+ return old_peering_msg(evt->get_epoch_sent(), evt->get_epoch_requested());
+ }
+ bool have_same_or_newer_map(epoch_t e) {
+ return e <= get_osdmap_epoch();
+ }
+
+ bool op_has_sufficient_caps(OpRequestRef& op);
+
+ // abstract bits
+ friend struct FlushState;
+
+ friend std::ostream& operator<<(std::ostream& out, const PG& pg);
+
+protected:
+ PeeringState recovery_state;
+
+ // ref to recovery_state.pool
+ const PGPool &pool;
+
+ // ref to recovery_state.info
+ const pg_info_t &info;
+
+
+// ScrubberPasskey getters/misc:
+public:
+ const pg_info_t& get_pg_info(ScrubberPasskey) const final { return info; }
+
+ OSDService* get_pg_osd(ScrubberPasskey) const { return osd; }
+
+ requested_scrub_t& get_planned_scrub(ScrubberPasskey)
+ {
+ return m_planned_scrub;
+ }
+
+ void force_object_missing(ScrubberPasskey,
+ const std::set<pg_shard_t>& peer,
+ const hobject_t& oid,
+ eversion_t version) final
+ {
+ recovery_state.force_object_missing(peer, oid, version);
+ }
+
+ uint64_t logical_to_ondisk_size(uint64_t logical_size) const final
+ {
+ return get_pgbackend()->be_get_ondisk_size(logical_size);
+ }
+};
+
+#endif