// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #pragma once #include #include "crimson/common/type_helpers.h" #include "crimson/os/futurized_store.h" #include "crimson/os/futurized_collection.h" #include "crimson/osd/object_context.h" #include "crimson/osd/shard_services.h" #include "messages/MOSDPGBackfill.h" #include "messages/MOSDPGBackfillRemove.h" #include "messages/MOSDPGScan.h" #include "osd/recovery_types.h" #include "osd/osd_types.h" namespace crimson::osd{ class PG; } class PGBackend; class RecoveryBackend { void handle_backfill_finish( MOSDPGBackfill& m); seastar::future<> handle_backfill_progress( MOSDPGBackfill& m); seastar::future<> handle_backfill_finish_ack( MOSDPGBackfill& m); seastar::future<> handle_backfill(MOSDPGBackfill& m); seastar::future<> handle_backfill_remove(MOSDPGBackfillRemove& m); seastar::future<> handle_scan_get_digest( MOSDPGScan& m); seastar::future<> handle_scan_digest( MOSDPGScan& m); seastar::future<> handle_scan( MOSDPGScan& m); protected: class WaitForObjectRecovery; public: RecoveryBackend(crimson::osd::PG& pg, crimson::osd::ShardServices& shard_services, crimson::os::CollectionRef coll, PGBackend* backend) : pg{pg}, shard_services{shard_services}, store{&shard_services.get_store()}, coll{coll}, backend{backend} {} virtual ~RecoveryBackend() {} WaitForObjectRecovery& add_recovering(const hobject_t& soid) { auto [it, added] = recovering.emplace(soid, WaitForObjectRecovery{}); assert(added); return it->second; } WaitForObjectRecovery& get_recovering(const hobject_t& soid) { assert(is_recovering(soid)); return recovering.at(soid); } void remove_recovering(const hobject_t& soid) { recovering.erase(soid); } bool is_recovering(const hobject_t& soid) const { return recovering.count(soid) != 0; } uint64_t total_recovering() const { return recovering.size(); } virtual seastar::future<> handle_recovery_op( Ref m); virtual seastar::future<> recover_object( const hobject_t& soid, eversion_t need) = 0; virtual seastar::future<> recover_delete( const hobject_t& soid, eversion_t need) = 0; virtual seastar::future<> push_delete( const hobject_t& soid, eversion_t need) = 0; seastar::future scan_for_backfill( const hobject_t& from, std::int64_t min, std::int64_t max); void on_peering_interval_change(ceph::os::Transaction& t) { clean_up(t, "new peering interval"); } seastar::future<> stop() { for (auto& [soid, recovery_waiter] : recovering) { recovery_waiter.stop(); } return on_stop(); } protected: crimson::osd::PG& pg; crimson::osd::ShardServices& shard_services; crimson::os::FuturizedStore* store; crimson::os::CollectionRef coll; PGBackend* backend; struct PullInfo { pg_shard_t from; hobject_t soid; ObjectRecoveryProgress recovery_progress; ObjectRecoveryInfo recovery_info; crimson::osd::ObjectContextRef head_ctx; crimson::osd::ObjectContextRef obc; object_stat_sum_t stat; bool is_complete() const { return recovery_progress.is_complete(recovery_info); } }; struct PushInfo { ObjectRecoveryProgress recovery_progress; ObjectRecoveryInfo recovery_info; crimson::osd::ObjectContextRef obc; object_stat_sum_t stat; }; class WaitForObjectRecovery : public crimson::osd::BlockerT { seastar::shared_promise<> readable, recovered, pulled; std::map> pushes; public: static constexpr const char* type_name = "WaitForObjectRecovery"; crimson::osd::ObjectContextRef obc; std::optional pi; std::map pushing; seastar::future<> wait_for_readable() { return readable.get_shared_future(); } seastar::future<> wait_for_pushes(pg_shard_t shard) { return pushes[shard].get_shared_future(); } seastar::future<> wait_for_recovered() { return recovered.get_shared_future(); } crimson::osd::blocking_future<> wait_for_recovered_blocking() { return make_blocking_future( recovered.get_shared_future()); } seastar::future<> wait_for_pull() { return pulled.get_shared_future(); } void set_readable() { readable.set_value(); } void set_recovered() { recovered.set_value(); } void set_pushed(pg_shard_t shard) { pushes[shard].set_value(); } void set_pulled() { pulled.set_value(); } void set_push_failed(pg_shard_t shard, std::exception_ptr e) { pushes.at(shard).set_exception(e); } void interrupt(std::string_view why) { readable.set_exception(std::system_error( std::make_error_code(std::errc::interrupted), why.data())); recovered.set_exception(std::system_error( std::make_error_code(std::errc::interrupted), why.data())); pulled.set_exception(std::system_error( std::make_error_code(std::errc::interrupted), why.data())); for (auto& [pg_shard, pr] : pushes) { pr.set_exception(std::system_error( std::make_error_code(std::errc::interrupted), why.data())); } } void stop(); void dump_detail(Formatter* f) const { } }; std::map recovering; hobject_t get_temp_recovery_object( const hobject_t& target, eversion_t version) const; boost::container::flat_set temp_contents; void add_temp_obj(const hobject_t &oid) { temp_contents.insert(oid); } void clear_temp_obj(const hobject_t &oid) { temp_contents.erase(oid); } void clean_up(ceph::os::Transaction& t, std::string_view why); virtual seastar::future<> on_stop() = 0; };