Diffstat (limited to 'src/osd/ReplicatedBackend.h')
-rw-r--r--   src/osd/ReplicatedBackend.h   430
1 file changed, 430 insertions(+), 0 deletions(-)
diff --git a/src/osd/ReplicatedBackend.h b/src/osd/ReplicatedBackend.h
new file mode 100644
index 00000000..8f447495
--- /dev/null
+++ b/src/osd/ReplicatedBackend.h
@@ -0,0 +1,430 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef REPBACKEND_H
+#define REPBACKEND_H
+
+#include "PGBackend.h"
+
+struct C_ReplicatedBackend_OnPullComplete;
+class ReplicatedBackend : public PGBackend {
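+  /// Recovery handle for one pass: PushOps and PullOps queued per peer shard.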
+ struct RPGHandle : public PGBackend::RecoveryHandle {
+ map<pg_shard_t, vector<PushOp> > pushes;
+ map<pg_shard_t, vector<PullOp> > pulls;
+ };
+ friend struct C_ReplicatedBackend_OnPullComplete;
+public:
+ ReplicatedBackend(
+ PGBackend::Listener *pg,
+ const coll_t &coll,
+ ObjectStore::CollectionHandle &ch,
+ ObjectStore *store,
+ CephContext *cct);
+
+ /// @see PGBackend::open_recovery_op
+ RPGHandle *_open_recovery_op() {
+ return new RPGHandle();
+ }
+ PGBackend::RecoveryHandle *open_recovery_op() override {
+ return _open_recovery_op();
+ }
+
+ /// @see PGBackend::run_recovery_op
+ void run_recovery_op(
+ PGBackend::RecoveryHandle *h,
+ int priority) override;
+
+ /// @see PGBackend::recover_object
+ int recover_object(
+ const hobject_t &hoid,
+ eversion_t v,
+ ObjectContextRef head,
+ ObjectContextRef obc,
+ RecoveryHandle *h
+ ) override;
+
+ void check_recovery_sources(const OSDMapRef& osdmap) override;
+
+ bool can_handle_while_inactive(OpRequestRef op) override;
+
+ /// @see PGBackend::handle_message
+ bool _handle_message(
+ OpRequestRef op
+ ) override;
+
+ void on_change() override;
+ void clear_recovery_state() override;
+
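+  /// Replicated recovery predicate: an object is recoverable as long as at
+  /// least one shard still holds a copy.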
+ class RPCRecPred : public IsPGRecoverablePredicate {
+ public:
+ bool operator()(const set<pg_shard_t> &have) const override {
+ return !have.empty();
+ }
+ };
+ IsPGRecoverablePredicate *get_is_recoverable_predicate() const override {
+ return new RPCRecPred;
+ }
+
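+  /// Replicated read predicate: an object is readable here only if this
+  /// OSD's own shard has it.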
+ class RPCReadPred : public IsPGReadablePredicate {
+ pg_shard_t whoami;
+ public:
+ explicit RPCReadPred(pg_shard_t whoami) : whoami(whoami) {}
+ bool operator()(const set<pg_shard_t> &have) const override {
+ return have.count(whoami);
+ }
+ };
+ IsPGReadablePredicate *get_is_readable_predicate() const override {
+ return new RPCReadPred(get_parent()->whoami_shard());
+ }
+
+ void dump_recovery_info(Formatter *f) const override {
+ {
+ f->open_array_section("pull_from_peer");
+ for (map<pg_shard_t, set<hobject_t> >::const_iterator i = pull_from_peer.begin();
+ i != pull_from_peer.end();
+ ++i) {
+ f->open_object_section("pulling_from");
+ f->dump_stream("pull_from") << i->first;
+ {
+ f->open_array_section("pulls");
+ for (set<hobject_t>::const_iterator j = i->second.begin();
+ j != i->second.end();
+ ++j) {
+ f->open_object_section("pull_info");
+ ceph_assert(pulling.count(*j));
+ pulling.find(*j)->second.dump(f);
+ f->close_section();
+ }
+ f->close_section();
+ }
+ f->close_section();
+ }
+ f->close_section();
+ }
+ {
+ f->open_array_section("pushing");
+ for (map<hobject_t, map<pg_shard_t, PushInfo>>::const_iterator i =
+ pushing.begin();
+ i != pushing.end();
+ ++i) {
+ f->open_object_section("object");
+ f->dump_stream("pushing") << i->first;
+ {
+ f->open_array_section("pushing_to");
+ for (map<pg_shard_t, PushInfo>::const_iterator j = i->second.begin();
+ j != i->second.end();
+ ++j) {
+ f->open_object_section("push_progress");
+ f->dump_stream("pushing_to") << j->first;
+ {
+ f->open_object_section("push_info");
+ j->second.dump(f);
+ f->close_section();
+ }
+ f->close_section();
+ }
+ f->close_section();
+ }
+ f->close_section();
+ }
+ f->close_section();
+ }
+ }
+
+ int objects_read_sync(
+ const hobject_t &hoid,
+ uint64_t off,
+ uint64_t len,
+ uint32_t op_flags,
+ bufferlist *bl) override;
+
+ void objects_read_async(
+ const hobject_t &hoid,
+ const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
+ pair<bufferlist*, Context*> > > &to_read,
+ Context *on_complete,
+ bool fast_read = false) override;
+
+private:
+ // push
+ struct PushInfo {
+ ObjectRecoveryProgress recovery_progress;
+ ObjectRecoveryInfo recovery_info;
+ ObjectContextRef obc;
+ object_stat_sum_t stat;
+ ObcLockManager lock_manager;
+
+ void dump(Formatter *f) const {
+ {
+ f->open_object_section("recovery_progress");
+ recovery_progress.dump(f);
+ f->close_section();
+ }
+ {
+ f->open_object_section("recovery_info");
+ recovery_info.dump(f);
+ f->close_section();
+ }
+ }
+ };
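+  /// Pushes in progress: per object, the push state for each destination shard.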
+ map<hobject_t, map<pg_shard_t, PushInfo>> pushing;
+
+ // pull
+ struct PullInfo {
+ pg_shard_t from;
+ hobject_t soid;
+ ObjectRecoveryProgress recovery_progress;
+ ObjectRecoveryInfo recovery_info;
+ ObjectContextRef head_ctx;
+ ObjectContextRef obc;
+ object_stat_sum_t stat;
+ bool cache_dont_need;
+ ObcLockManager lock_manager;
+
+ void dump(Formatter *f) const {
+ {
+ f->open_object_section("recovery_progress");
+ recovery_progress.dump(f);
+ f->close_section();
+ }
+ {
+ f->open_object_section("recovery_info");
+ recovery_info.dump(f);
+ f->close_section();
+ }
+ }
+
+ bool is_complete() const {
+ return recovery_progress.is_complete(recovery_info);
+ }
+ };
+
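+  /// Pulls in progress on this OSD, keyed by object.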
+ map<hobject_t, PullInfo> pulling;
+
+ // Reverse mapping from osd peer to objects being pulled from that peer
+ map<pg_shard_t, set<hobject_t> > pull_from_peer;
+ void clear_pull(
+ map<hobject_t, PullInfo>::iterator piter,
+ bool clear_pull_from_peer = true);
+ void clear_pull_from(
+ map<hobject_t, PullInfo>::iterator piter);
+
+ void _do_push(OpRequestRef op);
+ void _do_pull_response(OpRequestRef op);
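+  /// A push received by the primary is the response to a pull it requested,
+  /// so it is routed to _do_pull_response(); a replica applies the push directly.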
+ void do_push(OpRequestRef op) {
+ if (is_primary()) {
+ _do_pull_response(op);
+ } else {
+ _do_push(op);
+ }
+ }
+ void do_pull(OpRequestRef op);
+ void do_push_reply(OpRequestRef op);
+
+ bool handle_push_reply(pg_shard_t peer, const PushReplyOp &op, PushOp *reply);
+ void handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply);
+
+ struct pull_complete_info {
+ hobject_t hoid;
+ object_stat_sum_t stat;
+ };
+ bool handle_pull_response(
+ pg_shard_t from, const PushOp &op, PullOp *response,
+ list<pull_complete_info> *to_continue,
+ ObjectStore::Transaction *t);
+ void handle_push(pg_shard_t from, const PushOp &op, PushReplyOp *response,
+ ObjectStore::Transaction *t, bool is_repair);
+
+ static void trim_pushed_data(const interval_set<uint64_t> &copy_subset,
+ const interval_set<uint64_t> &intervals_received,
+ bufferlist data_received,
+ interval_set<uint64_t> *intervals_usable,
+ bufferlist *data_usable);
+ void _failed_pull(pg_shard_t from, const hobject_t &soid);
+
+ void send_pushes(int prio, map<pg_shard_t, vector<PushOp> > &pushes);
+ void prep_push_op_blank(const hobject_t& soid, PushOp *op);
+ void send_pulls(
+ int priority,
+ map<pg_shard_t, vector<PullOp> > &pulls);
+
+ int build_push_op(const ObjectRecoveryInfo &recovery_info,
+ const ObjectRecoveryProgress &progress,
+ ObjectRecoveryProgress *out_progress,
+ PushOp *out_op,
+ object_stat_sum_t *stat = 0,
+ bool cache_dont_need = true);
+ void submit_push_data(const ObjectRecoveryInfo &recovery_info,
+ bool first,
+ bool complete,
+ bool cache_dont_need,
+ const interval_set<uint64_t> &intervals_included,
+ bufferlist data_included,
+ bufferlist omap_header,
+ const map<string, bufferlist> &attrs,
+ const map<string, bufferlist> &omap_entries,
+ ObjectStore::Transaction *t);
+ void submit_push_complete(const ObjectRecoveryInfo &recovery_info,
+ ObjectStore::Transaction *t);
+
+ void calc_clone_subsets(
+ SnapSet& snapset, const hobject_t& poid, const pg_missing_t& missing,
+ const hobject_t &last_backfill,
+ interval_set<uint64_t>& data_subset,
+ map<hobject_t, interval_set<uint64_t>>& clone_subsets,
+ ObcLockManager &lock_manager);
+ void prepare_pull(
+ eversion_t v,
+ const hobject_t& soid,
+ ObjectContextRef headctx,
+ RPGHandle *h);
+ int start_pushes(
+ const hobject_t &soid,
+ ObjectContextRef obj,
+ RPGHandle *h);
+ int prep_push_to_replica(
+ ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer,
+ PushOp *pop, bool cache_dont_need = true);
+ int prep_push(
+ ObjectContextRef obc,
+ const hobject_t& oid, pg_shard_t dest,
+ PushOp *op,
+ bool cache_dont_need);
+ int prep_push(
+ ObjectContextRef obc,
+ const hobject_t& soid, pg_shard_t peer,
+ eversion_t version,
+ interval_set<uint64_t> &data_subset,
+ map<hobject_t, interval_set<uint64_t>>& clone_subsets,
+ PushOp *op,
+ bool cache,
+ ObcLockManager &&lock_manager);
+ void calc_head_subsets(
+ ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
+ const pg_missing_t& missing,
+ const hobject_t &last_backfill,
+ interval_set<uint64_t>& data_subset,
+ map<hobject_t, interval_set<uint64_t>>& clone_subsets,
+ ObcLockManager &lock_manager);
+ ObjectRecoveryInfo recalc_subsets(
+ const ObjectRecoveryInfo& recovery_info,
+ SnapSetContext *ssc,
+ ObcLockManager &lock_manager);
+
+ /**
+ * Client IO
+ */
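+  /// Tracks one in-flight replicated write; it is done once every shard in
+  /// waiting_for_commit has reported its commit.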
+ struct InProgressOp : public RefCountedObject {
+ ceph_tid_t tid;
+ set<pg_shard_t> waiting_for_commit;
+ Context *on_commit;
+ OpRequestRef op;
+ eversion_t v;
+ InProgressOp(
+ ceph_tid_t tid, Context *on_commit,
+ OpRequestRef op, eversion_t v)
+ : RefCountedObject(nullptr, 0),
+ tid(tid), on_commit(on_commit),
+ op(op), v(v) {}
+ bool done() const {
+ return waiting_for_commit.empty();
+ }
+ };
+ typedef boost::intrusive_ptr<InProgressOp> InProgressOpRef;
+ map<ceph_tid_t, InProgressOpRef> in_progress_ops;
+public:
+ friend class C_OSD_OnOpCommit;
+
+ void call_write_ordered(std::function<void(void)> &&cb) override {
+ // ReplicatedBackend submits writes inline in submit_transaction, so
+ // we can just call the callback.
+ cb();
+ }
+
+ void submit_transaction(
+ const hobject_t &hoid,
+ const object_stat_sum_t &delta_stats,
+ const eversion_t &at_version,
+ PGTransactionUPtr &&t,
+ const eversion_t &trim_to,
+ const eversion_t &roll_forward_to,
+ const vector<pg_log_entry_t> &log_entries,
+ boost::optional<pg_hit_set_history_t> &hset_history,
+ Context *on_all_commit,
+ ceph_tid_t tid,
+ osd_reqid_t reqid,
+ OpRequestRef op
+ ) override;
+
+private:
+ Message * generate_subop(
+ const hobject_t &soid,
+ const eversion_t &at_version,
+ ceph_tid_t tid,
+ osd_reqid_t reqid,
+ eversion_t pg_trim_to,
+ eversion_t pg_roll_forward_to,
+ hobject_t new_temp_oid,
+ hobject_t discard_temp_oid,
+ const bufferlist &log_entries,
+ boost::optional<pg_hit_set_history_t> &hset_history,
+ ObjectStore::Transaction &op_t,
+ pg_shard_t peer,
+ const pg_info_t &pinfo);
+ void issue_op(
+ const hobject_t &soid,
+ const eversion_t &at_version,
+ ceph_tid_t tid,
+ osd_reqid_t reqid,
+ eversion_t pg_trim_to,
+ eversion_t pg_roll_forward_to,
+ hobject_t new_temp_oid,
+ hobject_t discard_temp_oid,
+ const vector<pg_log_entry_t> &log_entries,
+ boost::optional<pg_hit_set_history_t> &hset_history,
+ InProgressOp *op,
+ ObjectStore::Transaction &op_t);
+ void op_commit(InProgressOpRef& op);
+ void do_repop_reply(OpRequestRef op);
+ void do_repop(OpRequestRef op);
+
+ struct RepModify {
+ OpRequestRef op;
+ bool committed;
+ int ackerosd;
+ eversion_t last_complete;
+ epoch_t epoch_started;
+
+ ObjectStore::Transaction opt, localt;
+
+ RepModify() : committed(false), ackerosd(-1),
+ epoch_started(0) {}
+ };
+ typedef std::shared_ptr<RepModify> RepModifyRef;
+
+ struct C_OSD_RepModifyCommit;
+
+ void repop_commit(RepModifyRef rm);
+ bool auto_repair_supported() const override { return store->has_builtin_csum(); }
+
+
+ int be_deep_scrub(
+ const hobject_t &poid,
+ ScrubMap &map,
+ ScrubMapBuilder &pos,
+ ScrubMap::object &o) override;
+ uint64_t be_get_ondisk_size(uint64_t logical_size) override { return logical_size; }
+};
+
+#endif