diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/crimson/osd/recovery_backend.h | |
parent | Initial commit. (diff) | |
download | ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/crimson/osd/recovery_backend.h')
-rw-r--r-- | src/crimson/osd/recovery_backend.h | 203 |
1 files changed, 203 insertions, 0 deletions
diff --git a/src/crimson/osd/recovery_backend.h b/src/crimson/osd/recovery_backend.h new file mode 100644 index 000000000..cb0ae9f20 --- /dev/null +++ b/src/crimson/osd/recovery_backend.h @@ -0,0 +1,203 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <seastar/core/future.hh> + +#include "crimson/common/type_helpers.h" +#include "crimson/os/futurized_store.h" +#include "crimson/os/futurized_collection.h" +#include "crimson/osd/object_context.h" +#include "crimson/osd/shard_services.h" + +#include "messages/MOSDPGBackfill.h" +#include "messages/MOSDPGBackfillRemove.h" +#include "messages/MOSDPGScan.h" +#include "osd/recovery_types.h" +#include "osd/osd_types.h" + +namespace crimson::osd{ + class PG; +} + +class PGBackend; + +class RecoveryBackend { + void handle_backfill_finish( + MOSDPGBackfill& m); + seastar::future<> handle_backfill_progress( + MOSDPGBackfill& m); + seastar::future<> handle_backfill_finish_ack( + MOSDPGBackfill& m); + seastar::future<> handle_backfill(MOSDPGBackfill& m); + + seastar::future<> handle_backfill_remove(MOSDPGBackfillRemove& m); + + seastar::future<> handle_scan_get_digest( + MOSDPGScan& m); + seastar::future<> handle_scan_digest( + MOSDPGScan& m); + seastar::future<> handle_scan( + MOSDPGScan& m); +protected: + class WaitForObjectRecovery; +public: + RecoveryBackend(crimson::osd::PG& pg, + crimson::osd::ShardServices& shard_services, + crimson::os::CollectionRef coll, + PGBackend* backend) + : pg{pg}, + shard_services{shard_services}, + store{&shard_services.get_store()}, + coll{coll}, + backend{backend} {} + virtual ~RecoveryBackend() {} + WaitForObjectRecovery& add_recovering(const hobject_t& soid) { + auto [it, added] = recovering.emplace(soid, WaitForObjectRecovery{}); + assert(added); + return it->second; + } + WaitForObjectRecovery& get_recovering(const hobject_t& soid) { + assert(is_recovering(soid)); + return recovering.at(soid); + } + void remove_recovering(const hobject_t& soid) { + recovering.erase(soid); + } + bool is_recovering(const hobject_t& soid) const { + return recovering.count(soid) != 0; + } + uint64_t total_recovering() const { + return recovering.size(); + } + + virtual seastar::future<> handle_recovery_op( + Ref<MOSDFastDispatchOp> m); + + virtual seastar::future<> recover_object( + const hobject_t& soid, + eversion_t need) = 0; + virtual seastar::future<> recover_delete( + const hobject_t& soid, + eversion_t need) = 0; + virtual seastar::future<> push_delete( + const hobject_t& soid, + eversion_t need) = 0; + + seastar::future<BackfillInterval> scan_for_backfill( + const hobject_t& from, + std::int64_t min, + std::int64_t max); + + void on_peering_interval_change(ceph::os::Transaction& t) { + clean_up(t, "new peering interval"); + } + + seastar::future<> stop() { + for (auto& [soid, recovery_waiter] : recovering) { + recovery_waiter.stop(); + } + return on_stop(); + } +protected: + crimson::osd::PG& pg; + crimson::osd::ShardServices& shard_services; + crimson::os::FuturizedStore* store; + crimson::os::CollectionRef coll; + PGBackend* backend; + + struct PullInfo { + pg_shard_t from; + hobject_t soid; + ObjectRecoveryProgress recovery_progress; + ObjectRecoveryInfo recovery_info; + crimson::osd::ObjectContextRef head_ctx; + crimson::osd::ObjectContextRef obc; + object_stat_sum_t stat; + bool is_complete() const { + return recovery_progress.is_complete(recovery_info); + } + }; + + struct PushInfo { + ObjectRecoveryProgress recovery_progress; + ObjectRecoveryInfo recovery_info; + crimson::osd::ObjectContextRef obc; + object_stat_sum_t stat; + }; + + class WaitForObjectRecovery : public crimson::osd::BlockerT<WaitForObjectRecovery> { + seastar::shared_promise<> readable, recovered, pulled; + std::map<pg_shard_t, seastar::shared_promise<>> pushes; + public: + static constexpr const char* type_name = "WaitForObjectRecovery"; + + crimson::osd::ObjectContextRef obc; + std::optional<PullInfo> pi; + std::map<pg_shard_t, PushInfo> pushing; + + seastar::future<> wait_for_readable() { + return readable.get_shared_future(); + } + seastar::future<> wait_for_pushes(pg_shard_t shard) { + return pushes[shard].get_shared_future(); + } + seastar::future<> wait_for_recovered() { + return recovered.get_shared_future(); + } + crimson::osd::blocking_future<> + wait_for_recovered_blocking() { + return make_blocking_future( + recovered.get_shared_future()); + } + seastar::future<> wait_for_pull() { + return pulled.get_shared_future(); + } + void set_readable() { + readable.set_value(); + } + void set_recovered() { + recovered.set_value(); + } + void set_pushed(pg_shard_t shard) { + pushes[shard].set_value(); + } + void set_pulled() { + pulled.set_value(); + } + void set_push_failed(pg_shard_t shard, std::exception_ptr e) { + pushes.at(shard).set_exception(e); + } + void interrupt(std::string_view why) { + readable.set_exception(std::system_error( + std::make_error_code(std::errc::interrupted), why.data())); + recovered.set_exception(std::system_error( + std::make_error_code(std::errc::interrupted), why.data())); + pulled.set_exception(std::system_error( + std::make_error_code(std::errc::interrupted), why.data())); + for (auto& [pg_shard, pr] : pushes) { + pr.set_exception(std::system_error( + std::make_error_code(std::errc::interrupted), why.data())); + } + } + void stop(); + void dump_detail(Formatter* f) const { + } + }; + std::map<hobject_t, WaitForObjectRecovery> recovering; + hobject_t get_temp_recovery_object( + const hobject_t& target, + eversion_t version) const; + + boost::container::flat_set<hobject_t> temp_contents; + + void add_temp_obj(const hobject_t &oid) { + temp_contents.insert(oid); + } + void clear_temp_obj(const hobject_t &oid) { + temp_contents.erase(oid); + } + void clean_up(ceph::os::Transaction& t, std::string_view why); + virtual seastar::future<> on_stop() = 0; +}; |