summaryrefslogtreecommitdiffstats
path: root/src/crimson/osd/recovery_backend.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/crimson/osd/recovery_backend.h')
-rw-r--r--src/crimson/osd/recovery_backend.h203
1 files changed, 203 insertions, 0 deletions
diff --git a/src/crimson/osd/recovery_backend.h b/src/crimson/osd/recovery_backend.h
new file mode 100644
index 000000000..cb0ae9f20
--- /dev/null
+++ b/src/crimson/osd/recovery_backend.h
@@ -0,0 +1,203 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/future.hh>
+
+#include "crimson/common/type_helpers.h"
+#include "crimson/os/futurized_store.h"
+#include "crimson/os/futurized_collection.h"
+#include "crimson/osd/object_context.h"
+#include "crimson/osd/shard_services.h"
+
+#include "messages/MOSDPGBackfill.h"
+#include "messages/MOSDPGBackfillRemove.h"
+#include "messages/MOSDPGScan.h"
+#include "osd/recovery_types.h"
+#include "osd/osd_types.h"
+
+namespace crimson::osd{
+ class PG;
+}
+
+class PGBackend;
+
+class RecoveryBackend {
+ void handle_backfill_finish(
+ MOSDPGBackfill& m);
+ seastar::future<> handle_backfill_progress(
+ MOSDPGBackfill& m);
+ seastar::future<> handle_backfill_finish_ack(
+ MOSDPGBackfill& m);
+ seastar::future<> handle_backfill(MOSDPGBackfill& m);
+
+ seastar::future<> handle_backfill_remove(MOSDPGBackfillRemove& m);
+
+ seastar::future<> handle_scan_get_digest(
+ MOSDPGScan& m);
+ seastar::future<> handle_scan_digest(
+ MOSDPGScan& m);
+ seastar::future<> handle_scan(
+ MOSDPGScan& m);
+protected:
+ class WaitForObjectRecovery;
+public:
+ RecoveryBackend(crimson::osd::PG& pg,
+ crimson::osd::ShardServices& shard_services,
+ crimson::os::CollectionRef coll,
+ PGBackend* backend)
+ : pg{pg},
+ shard_services{shard_services},
+ store{&shard_services.get_store()},
+ coll{coll},
+ backend{backend} {}
+ virtual ~RecoveryBackend() {}
+ WaitForObjectRecovery& add_recovering(const hobject_t& soid) {
+ auto [it, added] = recovering.emplace(soid, WaitForObjectRecovery{});
+ assert(added);
+ return it->second;
+ }
+ WaitForObjectRecovery& get_recovering(const hobject_t& soid) {
+ assert(is_recovering(soid));
+ return recovering.at(soid);
+ }
+ void remove_recovering(const hobject_t& soid) {
+ recovering.erase(soid);
+ }
+ bool is_recovering(const hobject_t& soid) const {
+ return recovering.count(soid) != 0;
+ }
+ uint64_t total_recovering() const {
+ return recovering.size();
+ }
+
+ virtual seastar::future<> handle_recovery_op(
+ Ref<MOSDFastDispatchOp> m);
+
+ virtual seastar::future<> recover_object(
+ const hobject_t& soid,
+ eversion_t need) = 0;
+ virtual seastar::future<> recover_delete(
+ const hobject_t& soid,
+ eversion_t need) = 0;
+ virtual seastar::future<> push_delete(
+ const hobject_t& soid,
+ eversion_t need) = 0;
+
+ seastar::future<BackfillInterval> scan_for_backfill(
+ const hobject_t& from,
+ std::int64_t min,
+ std::int64_t max);
+
+ void on_peering_interval_change(ceph::os::Transaction& t) {
+ clean_up(t, "new peering interval");
+ }
+
+ seastar::future<> stop() {
+ for (auto& [soid, recovery_waiter] : recovering) {
+ recovery_waiter.stop();
+ }
+ return on_stop();
+ }
+protected:
+ crimson::osd::PG& pg;
+ crimson::osd::ShardServices& shard_services;
+ crimson::os::FuturizedStore* store;
+ crimson::os::CollectionRef coll;
+ PGBackend* backend;
+
+ struct PullInfo {
+ pg_shard_t from;
+ hobject_t soid;
+ ObjectRecoveryProgress recovery_progress;
+ ObjectRecoveryInfo recovery_info;
+ crimson::osd::ObjectContextRef head_ctx;
+ crimson::osd::ObjectContextRef obc;
+ object_stat_sum_t stat;
+ bool is_complete() const {
+ return recovery_progress.is_complete(recovery_info);
+ }
+ };
+
+ struct PushInfo {
+ ObjectRecoveryProgress recovery_progress;
+ ObjectRecoveryInfo recovery_info;
+ crimson::osd::ObjectContextRef obc;
+ object_stat_sum_t stat;
+ };
+
+ class WaitForObjectRecovery : public crimson::osd::BlockerT<WaitForObjectRecovery> {
+ seastar::shared_promise<> readable, recovered, pulled;
+ std::map<pg_shard_t, seastar::shared_promise<>> pushes;
+ public:
+ static constexpr const char* type_name = "WaitForObjectRecovery";
+
+ crimson::osd::ObjectContextRef obc;
+ std::optional<PullInfo> pi;
+ std::map<pg_shard_t, PushInfo> pushing;
+
+ seastar::future<> wait_for_readable() {
+ return readable.get_shared_future();
+ }
+ seastar::future<> wait_for_pushes(pg_shard_t shard) {
+ return pushes[shard].get_shared_future();
+ }
+ seastar::future<> wait_for_recovered() {
+ return recovered.get_shared_future();
+ }
+ crimson::osd::blocking_future<>
+ wait_for_recovered_blocking() {
+ return make_blocking_future(
+ recovered.get_shared_future());
+ }
+ seastar::future<> wait_for_pull() {
+ return pulled.get_shared_future();
+ }
+ void set_readable() {
+ readable.set_value();
+ }
+ void set_recovered() {
+ recovered.set_value();
+ }
+ void set_pushed(pg_shard_t shard) {
+ pushes[shard].set_value();
+ }
+ void set_pulled() {
+ pulled.set_value();
+ }
+ void set_push_failed(pg_shard_t shard, std::exception_ptr e) {
+ pushes.at(shard).set_exception(e);
+ }
+ void interrupt(std::string_view why) {
+ readable.set_exception(std::system_error(
+ std::make_error_code(std::errc::interrupted), why.data()));
+ recovered.set_exception(std::system_error(
+ std::make_error_code(std::errc::interrupted), why.data()));
+ pulled.set_exception(std::system_error(
+ std::make_error_code(std::errc::interrupted), why.data()));
+ for (auto& [pg_shard, pr] : pushes) {
+ pr.set_exception(std::system_error(
+ std::make_error_code(std::errc::interrupted), why.data()));
+ }
+ }
+ void stop();
+ void dump_detail(Formatter* f) const {
+ }
+ };
+ std::map<hobject_t, WaitForObjectRecovery> recovering;
+ hobject_t get_temp_recovery_object(
+ const hobject_t& target,
+ eversion_t version) const;
+
+ boost::container::flat_set<hobject_t> temp_contents;
+
+ void add_temp_obj(const hobject_t &oid) {
+ temp_contents.insert(oid);
+ }
+ void clear_temp_obj(const hobject_t &oid) {
+ temp_contents.erase(oid);
+ }
+ void clean_up(ceph::os::Transaction& t, std::string_view why);
+ virtual seastar::future<> on_stop() = 0;
+};