summaryrefslogtreecommitdiffstats
path: root/src/crimson/osd/recovery_backend.h
blob: cb0ae9f20565f15828c6284fb4209d20d6a7f644 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#pragma once

#include <seastar/core/future.hh>

#include "crimson/common/type_helpers.h"
#include "crimson/os/futurized_store.h"
#include "crimson/os/futurized_collection.h"
#include "crimson/osd/object_context.h"
#include "crimson/osd/shard_services.h"

#include "messages/MOSDPGBackfill.h"
#include "messages/MOSDPGBackfillRemove.h"
#include "messages/MOSDPGScan.h"
#include "osd/recovery_types.h"
#include "osd/osd_types.h"

namespace crimson::osd{
  class PG;
}

class PGBackend;

class RecoveryBackend {
  void handle_backfill_finish(
    MOSDPGBackfill& m);
  seastar::future<> handle_backfill_progress(
    MOSDPGBackfill& m);
  seastar::future<> handle_backfill_finish_ack(
    MOSDPGBackfill& m);
  seastar::future<> handle_backfill(MOSDPGBackfill& m);

  seastar::future<> handle_backfill_remove(MOSDPGBackfillRemove& m);

  seastar::future<> handle_scan_get_digest(
    MOSDPGScan& m);
  seastar::future<> handle_scan_digest(
    MOSDPGScan& m);
  seastar::future<> handle_scan(
    MOSDPGScan& m);
protected:
  class WaitForObjectRecovery;
public:
  RecoveryBackend(crimson::osd::PG& pg,
		  crimson::osd::ShardServices& shard_services,
		  crimson::os::CollectionRef coll,
		  PGBackend* backend)
    : pg{pg},
      shard_services{shard_services},
      store{&shard_services.get_store()},
      coll{coll},
      backend{backend} {}
  virtual ~RecoveryBackend() {}
  WaitForObjectRecovery& add_recovering(const hobject_t& soid) {
    auto [it, added] = recovering.emplace(soid, WaitForObjectRecovery{});
    assert(added);
    return it->second;
  }
  WaitForObjectRecovery& get_recovering(const hobject_t& soid) {
    assert(is_recovering(soid));
    return recovering.at(soid);
  }
  void remove_recovering(const hobject_t& soid) {
    recovering.erase(soid);
  }
  bool is_recovering(const hobject_t& soid) const {
    return recovering.count(soid) != 0;
  }
  uint64_t total_recovering() const {
    return recovering.size();
  }

  virtual seastar::future<> handle_recovery_op(
    Ref<MOSDFastDispatchOp> m);

  virtual seastar::future<> recover_object(
    const hobject_t& soid,
    eversion_t need) = 0;
  virtual seastar::future<> recover_delete(
    const hobject_t& soid,
    eversion_t need) = 0;
  virtual seastar::future<> push_delete(
    const hobject_t& soid,
    eversion_t need) = 0;

  seastar::future<BackfillInterval> scan_for_backfill(
    const hobject_t& from,
    std::int64_t min,
    std::int64_t max);

  void on_peering_interval_change(ceph::os::Transaction& t) {
    clean_up(t, "new peering interval");
  }

  seastar::future<> stop() {
    for (auto& [soid, recovery_waiter] : recovering) {
      recovery_waiter.stop();
    }
    return on_stop();
  }
protected:
  crimson::osd::PG& pg;
  crimson::osd::ShardServices& shard_services;
  crimson::os::FuturizedStore* store;
  crimson::os::CollectionRef coll;
  PGBackend* backend;

  struct PullInfo {
    pg_shard_t from;
    hobject_t soid;
    ObjectRecoveryProgress recovery_progress;
    ObjectRecoveryInfo recovery_info;
    crimson::osd::ObjectContextRef head_ctx;
    crimson::osd::ObjectContextRef obc;
    object_stat_sum_t stat;
    bool is_complete() const {
      return recovery_progress.is_complete(recovery_info);
    }
  };

  struct PushInfo {
    ObjectRecoveryProgress recovery_progress;
    ObjectRecoveryInfo recovery_info;
    crimson::osd::ObjectContextRef obc;
    object_stat_sum_t stat;
  };

  class WaitForObjectRecovery : public crimson::osd::BlockerT<WaitForObjectRecovery> {
    seastar::shared_promise<> readable, recovered, pulled;
    std::map<pg_shard_t, seastar::shared_promise<>> pushes;
  public:
    static constexpr const char* type_name = "WaitForObjectRecovery";

    crimson::osd::ObjectContextRef obc;
    std::optional<PullInfo> pi;
    std::map<pg_shard_t, PushInfo> pushing;

    seastar::future<> wait_for_readable() {
      return readable.get_shared_future();
    }
    seastar::future<> wait_for_pushes(pg_shard_t shard) {
      return pushes[shard].get_shared_future();
    }
    seastar::future<> wait_for_recovered() {
      return recovered.get_shared_future();
    }
    crimson::osd::blocking_future<>
    wait_for_recovered_blocking() {
      return make_blocking_future(
	  recovered.get_shared_future());
    }
    seastar::future<> wait_for_pull() {
      return pulled.get_shared_future();
    }
    void set_readable() {
      readable.set_value();
    }
    void set_recovered() {
      recovered.set_value();
    }
    void set_pushed(pg_shard_t shard) {
      pushes[shard].set_value();
    }
    void set_pulled() {
      pulled.set_value();
    }
    void set_push_failed(pg_shard_t shard, std::exception_ptr e) {
      pushes.at(shard).set_exception(e);
    }
    void interrupt(std::string_view why) {
      readable.set_exception(std::system_error(
        std::make_error_code(std::errc::interrupted), why.data()));
      recovered.set_exception(std::system_error(
        std::make_error_code(std::errc::interrupted), why.data()));
      pulled.set_exception(std::system_error(
        std::make_error_code(std::errc::interrupted), why.data()));
      for (auto& [pg_shard, pr] : pushes) {
        pr.set_exception(std::system_error(
          std::make_error_code(std::errc::interrupted), why.data()));
      }
    }
    void stop();
    void dump_detail(Formatter* f) const {
    }
  };
  std::map<hobject_t, WaitForObjectRecovery> recovering;
  hobject_t get_temp_recovery_object(
    const hobject_t& target,
    eversion_t version) const;

  boost::container::flat_set<hobject_t> temp_contents;

  void add_temp_obj(const hobject_t &oid) {
    temp_contents.insert(oid);
  }
  void clear_temp_obj(const hobject_t &oid) {
    temp_contents.erase(oid);
  }
  void clean_up(ceph::os::Transaction& t, std::string_view why);
  virtual seastar::future<> on_stop() = 0;
};