summaryrefslogtreecommitdiffstats
path: root/src/crimson/osd/recovery_backend.h
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/crimson/osd/recovery_backend.h233
1 files changed, 233 insertions, 0 deletions
diff --git a/src/crimson/osd/recovery_backend.h b/src/crimson/osd/recovery_backend.h
new file mode 100644
index 000000000..65e9bb01f
--- /dev/null
+++ b/src/crimson/osd/recovery_backend.h
@@ -0,0 +1,233 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/future.hh>
+
+#include "crimson/common/type_helpers.h"
+#include "crimson/os/futurized_store.h"
+#include "crimson/os/futurized_collection.h"
+#include "crimson/osd/pg_interval_interrupt_condition.h"
+#include "crimson/osd/object_context.h"
+#include "crimson/osd/shard_services.h"
+
+#include "messages/MOSDPGBackfill.h"
+#include "messages/MOSDPGBackfillRemove.h"
+#include "messages/MOSDPGScan.h"
+#include "osd/recovery_types.h"
+#include "osd/osd_types.h"
+
+namespace crimson::osd{
+ class PG;
+}
+
+class PGBackend;
+
+class RecoveryBackend {
+public:
+ class WaitForObjectRecovery;
+public:
+ template <typename T = void>
+ using interruptible_future =
+ ::crimson::interruptible::interruptible_future<
+ ::crimson::osd::IOInterruptCondition, T>;
+ using interruptor =
+ ::crimson::interruptible::interruptor<
+ ::crimson::osd::IOInterruptCondition>;
+ RecoveryBackend(crimson::osd::PG& pg,
+ crimson::osd::ShardServices& shard_services,
+ crimson::os::CollectionRef coll,
+ PGBackend* backend)
+ : pg{pg},
+ shard_services{shard_services},
+ store{&shard_services.get_store()},
+ coll{coll},
+ backend{backend} {}
+ virtual ~RecoveryBackend() {}
+ WaitForObjectRecovery& add_recovering(const hobject_t& soid) {
+ auto [it, added] = recovering.emplace(soid, new WaitForObjectRecovery{});
+ assert(added);
+ return *(it->second);
+ }
+ WaitForObjectRecovery& get_recovering(const hobject_t& soid) {
+ assert(is_recovering(soid));
+ return *(recovering.at(soid));
+ }
+ void remove_recovering(const hobject_t& soid) {
+ recovering.erase(soid);
+ }
+ bool is_recovering(const hobject_t& soid) const {
+ return recovering.count(soid) != 0;
+ }
+ uint64_t total_recovering() const {
+ return recovering.size();
+ }
+
+ virtual interruptible_future<> handle_recovery_op(
+ Ref<MOSDFastDispatchOp> m,
+ crimson::net::ConnectionRef conn);
+
+ virtual interruptible_future<> recover_object(
+ const hobject_t& soid,
+ eversion_t need) = 0;
+ virtual interruptible_future<> recover_delete(
+ const hobject_t& soid,
+ eversion_t need) = 0;
+ virtual interruptible_future<> push_delete(
+ const hobject_t& soid,
+ eversion_t need) = 0;
+
+ interruptible_future<BackfillInterval> scan_for_backfill(
+ const hobject_t& from,
+ std::int64_t min,
+ std::int64_t max);
+
+ void on_peering_interval_change(ceph::os::Transaction& t) {
+ clean_up(t, "new peering interval");
+ }
+
+ seastar::future<> stop() {
+ for (auto& [soid, recovery_waiter] : recovering) {
+ recovery_waiter->stop();
+ }
+ return on_stop();
+ }
+protected:
+ crimson::osd::PG& pg;
+ crimson::osd::ShardServices& shard_services;
+ crimson::os::FuturizedStore::Shard* store;
+ crimson::os::CollectionRef coll;
+ PGBackend* backend;
+
+ struct pull_info_t {
+ pg_shard_t from;
+ hobject_t soid;
+ ObjectRecoveryProgress recovery_progress;
+ ObjectRecoveryInfo recovery_info;
+ crimson::osd::ObjectContextRef head_ctx;
+ crimson::osd::ObjectContextRef obc;
+ object_stat_sum_t stat;
+ bool is_complete() const {
+ return recovery_progress.is_complete(recovery_info);
+ }
+ };
+
+ struct push_info_t {
+ ObjectRecoveryProgress recovery_progress;
+ ObjectRecoveryInfo recovery_info;
+ crimson::osd::ObjectContextRef obc;
+ object_stat_sum_t stat;
+ };
+
+public:
+ class WaitForObjectRecovery :
+ public boost::intrusive_ref_counter<
+ WaitForObjectRecovery, boost::thread_unsafe_counter>,
+ public crimson::BlockerT<WaitForObjectRecovery> {
+ seastar::shared_promise<> readable, recovered, pulled;
+ std::map<pg_shard_t, seastar::shared_promise<>> pushes;
+ public:
+ static constexpr const char* type_name = "WaitForObjectRecovery";
+
+ crimson::osd::ObjectContextRef obc;
+ std::optional<pull_info_t> pull_info;
+ std::map<pg_shard_t, push_info_t> pushing;
+
+ seastar::future<> wait_for_readable() {
+ return readable.get_shared_future();
+ }
+ seastar::future<> wait_for_pushes(pg_shard_t shard) {
+ return pushes[shard].get_shared_future();
+ }
+ seastar::future<> wait_for_recovered() {
+ return recovered.get_shared_future();
+ }
+ template <typename T, typename F>
+ auto wait_track_blocking(T &trigger, F &&fut) {
+ WaitForObjectRecoveryRef ref = this;
+ return track_blocking(
+ trigger,
+ std::forward<F>(fut)
+ ).finally([ref] {});
+ }
+ template <typename T>
+ seastar::future<> wait_for_recovered(T &trigger) {
+ WaitForObjectRecoveryRef ref = this;
+ return wait_track_blocking(trigger, recovered.get_shared_future());
+ }
+ seastar::future<> wait_for_pull() {
+ return pulled.get_shared_future();
+ }
+ void set_readable() {
+ readable.set_value();
+ }
+ void set_recovered() {
+ recovered.set_value();
+ }
+ void set_pushed(pg_shard_t shard) {
+ pushes[shard].set_value();
+ }
+ void set_pulled() {
+ pulled.set_value();
+ }
+ void set_push_failed(pg_shard_t shard, std::exception_ptr e) {
+ pushes.at(shard).set_exception(e);
+ }
+ void interrupt(std::string_view why) {
+ readable.set_exception(std::system_error(
+ std::make_error_code(std::errc::interrupted), why.data()));
+ recovered.set_exception(std::system_error(
+ std::make_error_code(std::errc::interrupted), why.data()));
+ pulled.set_exception(std::system_error(
+ std::make_error_code(std::errc::interrupted), why.data()));
+ for (auto& [pg_shard, pr] : pushes) {
+ pr.set_exception(std::system_error(
+ std::make_error_code(std::errc::interrupted), why.data()));
+ }
+ }
+ void stop();
+ void dump_detail(Formatter* f) const {
+ }
+ };
+ using RecoveryBlockingEvent =
+ crimson::AggregateBlockingEvent<WaitForObjectRecovery::BlockingEvent>;
+ using WaitForObjectRecoveryRef = boost::intrusive_ptr<WaitForObjectRecovery>;
+protected:
+ std::map<hobject_t, WaitForObjectRecoveryRef> recovering;
+ hobject_t get_temp_recovery_object(
+ const hobject_t& target,
+ eversion_t version) const;
+
+ boost::container::flat_set<hobject_t> temp_contents;
+
+ void add_temp_obj(const hobject_t &oid) {
+ temp_contents.insert(oid);
+ }
+ void clear_temp_obj(const hobject_t &oid) {
+ temp_contents.erase(oid);
+ }
+ void clean_up(ceph::os::Transaction& t, std::string_view why);
+ virtual seastar::future<> on_stop() = 0;
+private:
+ void handle_backfill_finish(
+ MOSDPGBackfill& m,
+ crimson::net::ConnectionRef conn);
+ interruptible_future<> handle_backfill_progress(
+ MOSDPGBackfill& m);
+ interruptible_future<> handle_backfill_finish_ack(
+ MOSDPGBackfill& m);
+ interruptible_future<> handle_backfill(
+ MOSDPGBackfill& m,
+ crimson::net::ConnectionRef conn);
+
+ interruptible_future<> handle_scan_get_digest(
+ MOSDPGScan& m,
+ crimson::net::ConnectionRef conn);
+ interruptible_future<> handle_scan_digest(
+ MOSDPGScan& m);
+ interruptible_future<> handle_scan(
+ MOSDPGScan& m,
+ crimson::net::ConnectionRef conn);
+ interruptible_future<> handle_backfill_remove(MOSDPGBackfillRemove& m);
+};