author    Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
commit    e6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree      64f88b554b444a49f656b6c656111a145cbbaa28 /src/osd/ECBackend.cc
parent    Initial commit. (diff)
Adding upstream version 18.2.2.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/osd/ECBackend.cc')
-rw-r--r--  src/osd/ECBackend.cc  2642
1 file changed, 2642 insertions(+), 0 deletions(-)
diff --git a/src/osd/ECBackend.cc b/src/osd/ECBackend.cc
new file mode 100644
index 000000000..685af573e
--- /dev/null
+++ b/src/osd/ECBackend.cc
@@ -0,0 +1,2642 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <iostream>
+#include <sstream>
+
+#include "ECBackend.h"
+#include "messages/MOSDPGPush.h"
+#include "messages/MOSDPGPushReply.h"
+#include "messages/MOSDECSubOpWrite.h"
+#include "messages/MOSDECSubOpWriteReply.h"
+#include "messages/MOSDECSubOpRead.h"
+#include "messages/MOSDECSubOpReadReply.h"
+#include "ECMsgTypes.h"
+
+#include "PrimaryLogPG.h"
+#include "osd_tracer.h"
+
+#define dout_context cct
+#define dout_subsys ceph_subsys_osd
+#define DOUT_PREFIX_ARGS this
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, this)
+
+using std::dec;
+using std::hex;
+using std::less;
+using std::list;
+using std::make_pair;
+using std::map;
+using std::pair;
+using std::ostream;
+using std::set;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
+using ceph::bufferhash;
+using ceph::bufferlist;
+using ceph::bufferptr;
+using ceph::ErasureCodeInterfaceRef;
+using ceph::Formatter;
+
+static ostream& _prefix(std::ostream *_dout, ECBackend *pgb) {
+ return pgb->get_parent()->gen_dbg_prefix(*_dout);
+}
+
+struct ECRecoveryHandle : public PGBackend::RecoveryHandle {
+ list<ECBackend::RecoveryOp> ops;
+};
+
+ostream &operator<<(ostream &lhs, const ECBackend::pipeline_state_t &rhs) {
+ switch (rhs.pipeline_state) {
+ case ECBackend::pipeline_state_t::CACHE_VALID:
+ return lhs << "CACHE_VALID";
+ case ECBackend::pipeline_state_t::CACHE_INVALID:
+ return lhs << "CACHE_INVALID";
+ default:
+ ceph_abort_msg("invalid pipeline state");
+ }
+ return lhs; // unreachable
+}
+
+static ostream &operator<<(ostream &lhs, const map<pg_shard_t, bufferlist> &rhs)
+{
+ lhs << "[";
+ for (map<pg_shard_t, bufferlist>::const_iterator i = rhs.begin();
+ i != rhs.end();
+ ++i) {
+ if (i != rhs.begin())
+ lhs << ", ";
+ lhs << make_pair(i->first, i->second.length());
+ }
+ return lhs << "]";
+}
+
+static ostream &operator<<(ostream &lhs, const map<int, bufferlist> &rhs)
+{
+ lhs << "[";
+ for (map<int, bufferlist>::const_iterator i = rhs.begin();
+ i != rhs.end();
+ ++i) {
+ if (i != rhs.begin())
+ lhs << ", ";
+ lhs << make_pair(i->first, i->second.length());
+ }
+ return lhs << "]";
+}
+
+static ostream &operator<<(
+ ostream &lhs,
+ const boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &rhs)
+{
+ return lhs << "(" << rhs.get<0>() << ", "
+ << rhs.get<1>() << ", " << rhs.get<2>() << ")";
+}
+
+ostream &operator<<(ostream &lhs, const ECBackend::read_request_t &rhs)
+{
+ return lhs << "read_request_t(to_read=[" << rhs.to_read << "]"
+ << ", need=" << rhs.need
+ << ", want_attrs=" << rhs.want_attrs
+ << ")";
+}
+
+ostream &operator<<(ostream &lhs, const ECBackend::read_result_t &rhs)
+{
+ lhs << "read_result_t(r=" << rhs.r
+ << ", errors=" << rhs.errors;
+ if (rhs.attrs) {
+ lhs << ", attrs=" << *(rhs.attrs);
+ } else {
+ lhs << ", noattrs";
+ }
+ return lhs << ", returned=" << rhs.returned << ")";
+}
+
+ostream &operator<<(ostream &lhs, const ECBackend::ReadOp &rhs)
+{
+ lhs << "ReadOp(tid=" << rhs.tid;
+ if (rhs.op && rhs.op->get_req()) {
+ lhs << ", op=";
+ rhs.op->get_req()->print(lhs);
+ }
+ return lhs << ", to_read=" << rhs.to_read
+ << ", complete=" << rhs.complete
+ << ", priority=" << rhs.priority
+ << ", obj_to_source=" << rhs.obj_to_source
+ << ", source_to_obj=" << rhs.source_to_obj
+ << ", in_progress=" << rhs.in_progress << ")";
+}
+
+void ECBackend::ReadOp::dump(Formatter *f) const
+{
+ f->dump_unsigned("tid", tid);
+ if (op && op->get_req()) {
+ f->dump_stream("op") << *(op->get_req());
+ }
+ f->dump_stream("to_read") << to_read;
+ f->dump_stream("complete") << complete;
+ f->dump_int("priority", priority);
+ f->dump_stream("obj_to_source") << obj_to_source;
+ f->dump_stream("source_to_obj") << source_to_obj;
+ f->dump_stream("in_progress") << in_progress;
+}
+
+ostream &operator<<(ostream &lhs, const ECBackend::Op &rhs)
+{
+ lhs << "Op(" << rhs.hoid
+ << " v=" << rhs.version
+ << " tt=" << rhs.trim_to
+ << " tid=" << rhs.tid
+ << " reqid=" << rhs.reqid;
+ if (rhs.client_op && rhs.client_op->get_req()) {
+ lhs << " client_op=";
+ rhs.client_op->get_req()->print(lhs);
+ }
+ lhs << " roll_forward_to=" << rhs.roll_forward_to
+ << " temp_added=" << rhs.temp_added
+ << " temp_cleared=" << rhs.temp_cleared
+ << " pending_read=" << rhs.pending_read
+ << " remote_read=" << rhs.remote_read
+ << " remote_read_result=" << rhs.remote_read_result
+ << " pending_apply=" << rhs.pending_apply
+ << " pending_commit=" << rhs.pending_commit
+ << " plan.to_read=" << rhs.plan.to_read
+ << " plan.will_write=" << rhs.plan.will_write
+ << ")";
+ return lhs;
+}
+
+ostream &operator<<(ostream &lhs, const ECBackend::RecoveryOp &rhs)
+{
+ return lhs << "RecoveryOp("
+ << "hoid=" << rhs.hoid
+ << " v=" << rhs.v
+ << " missing_on=" << rhs.missing_on
+ << " missing_on_shards=" << rhs.missing_on_shards
+ << " recovery_info=" << rhs.recovery_info
+ << " recovery_progress=" << rhs.recovery_progress
+ << " obc refcount=" << rhs.obc.use_count()
+ << " state=" << ECBackend::RecoveryOp::tostr(rhs.state)
+ << " waiting_on_pushes=" << rhs.waiting_on_pushes
+ << " extent_requested=" << rhs.extent_requested
+ << ")";
+}
+
+void ECBackend::RecoveryOp::dump(Formatter *f) const
+{
+ f->dump_stream("hoid") << hoid;
+ f->dump_stream("v") << v;
+ f->dump_stream("missing_on") << missing_on;
+ f->dump_stream("missing_on_shards") << missing_on_shards;
+ f->dump_stream("recovery_info") << recovery_info;
+ f->dump_stream("recovery_progress") << recovery_progress;
+ f->dump_stream("state") << tostr(state);
+ f->dump_stream("waiting_on_pushes") << waiting_on_pushes;
+ f->dump_stream("extent_requested") << extent_requested;
+}
+
+ECBackend::ECBackend(
+ PGBackend::Listener *pg,
+ const coll_t &coll,
+ ObjectStore::CollectionHandle &ch,
+ ObjectStore *store,
+ CephContext *cct,
+ ErasureCodeInterfaceRef ec_impl,
+ uint64_t stripe_width)
+ : PGBackend(cct, pg, store, coll, ch),
+ ec_impl(ec_impl),
+ sinfo(ec_impl->get_data_chunk_count(), stripe_width) {
+ ceph_assert((ec_impl->get_data_chunk_count() *
+ ec_impl->get_chunk_size(stripe_width)) == stripe_width);
+}
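+
+// Geometry sketch (illustrative numbers): with a k=4/m=2 profile and
+// stripe_width=16384, get_data_chunk_count() == 4 and
+// get_chunk_size(16384) == 4096, so the asserted identity
+// 4 * 4096 == 16384 holds and each full stripe splits evenly across the
+// data shards.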
+
+PGBackend::RecoveryHandle *ECBackend::open_recovery_op()
+{
+ return new ECRecoveryHandle;
+}
+
+void ECBackend::_failed_push(const hobject_t &hoid,
+ pair<RecoveryMessages *, ECBackend::read_result_t &> &in)
+{
+ ECBackend::read_result_t &res = in.second;
+ dout(10) << __func__ << ": Read error " << hoid << " r="
+ << res.r << " errors=" << res.errors << dendl;
+ dout(10) << __func__ << ": canceling recovery op for obj " << hoid
+ << dendl;
+ ceph_assert(recovery_ops.count(hoid));
+ eversion_t v = recovery_ops[hoid].v;
+ recovery_ops.erase(hoid);
+
+ set<pg_shard_t> fl;
+ for (auto&& i : res.errors) {
+ fl.insert(i.first);
+ }
+ get_parent()->on_failed_pull(fl, hoid, v);
+}
+
+struct OnRecoveryReadComplete :
+ public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
+ ECBackend *pg;
+ hobject_t hoid;
+ OnRecoveryReadComplete(ECBackend *pg, const hobject_t &hoid)
+ : pg(pg), hoid(hoid) {}
+ void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
+ ECBackend::read_result_t &res = in.second;
+ if (!(res.r == 0 && res.errors.empty())) {
+ pg->_failed_push(hoid, in);
+ return;
+ }
+ ceph_assert(res.returned.size() == 1);
+ pg->handle_recovery_read_complete(
+ hoid,
+ res.returned.back(),
+ res.attrs,
+ in.first);
+ }
+};
+
+struct RecoveryMessages {
+ map<hobject_t,
+ ECBackend::read_request_t> reads;
+ map<hobject_t, set<int>> want_to_read;
+ void read(
+ ECBackend *ec,
+ const hobject_t &hoid, uint64_t off, uint64_t len,
+ set<int> &&_want_to_read,
+ const map<pg_shard_t, vector<pair<int, int>>> &need,
+ bool attrs) {
+ list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
+ to_read.push_back(boost::make_tuple(off, len, 0));
+ ceph_assert(!reads.count(hoid));
+ want_to_read.insert(make_pair(hoid, std::move(_want_to_read)));
+ reads.insert(
+ make_pair(
+ hoid,
+ ECBackend::read_request_t(
+ to_read,
+ need,
+ attrs,
+ new OnRecoveryReadComplete(
+ ec,
+ hoid))));
+ }
+
+ map<pg_shard_t, vector<PushOp> > pushes;
+ map<pg_shard_t, vector<PushReplyOp> > push_replies;
+ ObjectStore::Transaction t;
+ RecoveryMessages() {}
+ ~RecoveryMessages() {}
+};
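+
+// Usage sketch (a hedged paraphrase of the call sites below, values
+// illustrative): continue_recovery_op() queues one read per object via
+//   m->read(this, op.hoid, off, get_recovery_chunk_size(),
+//           std::move(want), to_read, want_attrs);
+// and run_recovery_op() drains the accumulated reads, pushes, and
+// push_replies in one dispatch_recovery_messages() pass.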
+
+void ECBackend::handle_recovery_push(
+ const PushOp &op,
+ RecoveryMessages *m,
+ bool is_repair)
+{
+ if (get_parent()->check_failsafe_full()) {
+    dout(10) << __func__ << " Out of space (failsafe) while processing push request." << dendl;
+ ceph_abort();
+ }
+
+ bool oneshot = op.before_progress.first && op.after_progress.data_complete;
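+  // "oneshot" means this single push is both the first for the object and
+  // data-complete: it covers the whole object, so it can be written
+  // directly to the final shard object instead of staging through a temp
+  // object (see the branch below).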
+ ghobject_t tobj;
+ if (oneshot) {
+ tobj = ghobject_t(op.soid, ghobject_t::NO_GEN,
+ get_parent()->whoami_shard().shard);
+ } else {
+ tobj = ghobject_t(get_parent()->get_temp_recovery_object(op.soid,
+ op.version),
+ ghobject_t::NO_GEN,
+ get_parent()->whoami_shard().shard);
+ if (op.before_progress.first) {
+ dout(10) << __func__ << ": Adding oid "
+ << tobj.hobj << " in the temp collection" << dendl;
+ add_temp_obj(tobj.hobj);
+ }
+ }
+
+ if (op.before_progress.first) {
+ m->t.remove(coll, tobj);
+ m->t.touch(coll, tobj);
+ }
+
+ if (!op.data_included.empty()) {
+ uint64_t start = op.data_included.range_start();
+ uint64_t end = op.data_included.range_end();
+ ceph_assert(op.data.length() == (end - start));
+
+ m->t.write(
+ coll,
+ tobj,
+ start,
+ op.data.length(),
+ op.data);
+ } else {
+ ceph_assert(op.data.length() == 0);
+ }
+
+ if (get_parent()->pg_is_remote_backfilling()) {
+ get_parent()->pg_add_local_num_bytes(op.data.length());
+ get_parent()->pg_add_num_bytes(op.data.length() * get_ec_data_chunk_count());
+ dout(10) << __func__ << " " << op.soid
+ << " add new actual data by " << op.data.length()
+ << " add new num_bytes by " << op.data.length() * get_ec_data_chunk_count()
+ << dendl;
+ }
+
+ if (op.before_progress.first) {
+ ceph_assert(op.attrset.count(string("_")));
+ m->t.setattrs(
+ coll,
+ tobj,
+ op.attrset);
+ }
+
+ if (op.after_progress.data_complete && !oneshot) {
+ dout(10) << __func__ << ": Removing oid "
+ << tobj.hobj << " from the temp collection" << dendl;
+ clear_temp_obj(tobj.hobj);
+ m->t.remove(coll, ghobject_t(
+ op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
+ m->t.collection_move_rename(
+ coll, tobj,
+ coll, ghobject_t(
+ op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
+ }
+ if (op.after_progress.data_complete) {
+ if ((get_parent()->pgb_is_primary())) {
+ ceph_assert(recovery_ops.count(op.soid));
+ ceph_assert(recovery_ops[op.soid].obc);
+ if (get_parent()->pg_is_repair() || is_repair)
+ get_parent()->inc_osd_stat_repaired();
+ get_parent()->on_local_recover(
+ op.soid,
+ op.recovery_info,
+ recovery_ops[op.soid].obc,
+ false,
+ &m->t);
+ } else {
+ // If primary told us this is a repair, bump osd_stat_t::num_objects_repaired
+ if (is_repair)
+ get_parent()->inc_osd_stat_repaired();
+ get_parent()->on_local_recover(
+ op.soid,
+ op.recovery_info,
+ ObjectContextRef(),
+ false,
+ &m->t);
+ if (get_parent()->pg_is_remote_backfilling()) {
+ struct stat st;
+ int r = store->stat(ch, ghobject_t(op.soid, ghobject_t::NO_GEN,
+ get_parent()->whoami_shard().shard), &st);
+ if (r == 0) {
+ get_parent()->pg_sub_local_num_bytes(st.st_size);
+ // XXX: This can be way overestimated for small objects
+ get_parent()->pg_sub_num_bytes(st.st_size * get_ec_data_chunk_count());
+ dout(10) << __func__ << " " << op.soid
+ << " sub actual data by " << st.st_size
+ << " sub num_bytes by " << st.st_size * get_ec_data_chunk_count()
+ << dendl;
+ }
+ }
+ }
+ }
+ m->push_replies[get_parent()->primary_shard()].push_back(PushReplyOp());
+ m->push_replies[get_parent()->primary_shard()].back().soid = op.soid;
+}
+
+void ECBackend::handle_recovery_push_reply(
+ const PushReplyOp &op,
+ pg_shard_t from,
+ RecoveryMessages *m)
+{
+ if (!recovery_ops.count(op.soid))
+ return;
+ RecoveryOp &rop = recovery_ops[op.soid];
+ ceph_assert(rop.waiting_on_pushes.count(from));
+ rop.waiting_on_pushes.erase(from);
+ continue_recovery_op(rop, m);
+}
+
+void ECBackend::handle_recovery_read_complete(
+ const hobject_t &hoid,
+ boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &to_read,
+ std::optional<map<string, bufferlist, less<>> > attrs,
+ RecoveryMessages *m)
+{
+ dout(10) << __func__ << ": returned " << hoid << " "
+ << "(" << to_read.get<0>()
+ << ", " << to_read.get<1>()
+ << ", " << to_read.get<2>()
+ << ")"
+ << dendl;
+ ceph_assert(recovery_ops.count(hoid));
+ RecoveryOp &op = recovery_ops[hoid];
+ ceph_assert(op.returned_data.empty());
+ map<int, bufferlist*> target;
+ for (set<shard_id_t>::iterator i = op.missing_on_shards.begin();
+ i != op.missing_on_shards.end();
+ ++i) {
+ target[*i] = &(op.returned_data[*i]);
+ }
+ map<int, bufferlist> from;
+ for(map<pg_shard_t, bufferlist>::iterator i = to_read.get<2>().begin();
+ i != to_read.get<2>().end();
+ ++i) {
+ from[i->first.shard] = std::move(i->second);
+ }
+ dout(10) << __func__ << ": " << from << dendl;
+ int r;
+ r = ECUtil::decode(sinfo, ec_impl, from, target);
+ ceph_assert(r == 0);
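+  // Illustrative example (shard numbers hypothetical): with
+  // missing_on_shards={2} and shards 0, 1 and 3 present in `from`,
+  // ECUtil::decode() reconstructs shard 2's chunk into
+  // op.returned_data[2] through the `target` pointer map.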
+ if (attrs) {
+ op.xattrs.swap(*attrs);
+
+ if (!op.obc) {
+      // attrs only reference the origin bufferlist (decoded from the
+      // ECSubReadReply message), whose size is much greater than the
+      // attrs needed in recovery. If the obc caches them (get_obc may
+      // cache the attrs), the whole origin bufferlist cannot be freed
+      // until the obc is evicted from the obc cache. So rebuild the
+      // bufferlists before caching them.
+ for (map<string, bufferlist>::iterator it = op.xattrs.begin();
+ it != op.xattrs.end();
+ ++it) {
+ it->second.rebuild();
+ }
+ // Need to remove ECUtil::get_hinfo_key() since it should not leak out
+ // of the backend (see bug #12983)
+ map<string, bufferlist, less<>> sanitized_attrs(op.xattrs);
+ sanitized_attrs.erase(ECUtil::get_hinfo_key());
+ op.obc = get_parent()->get_obc(hoid, sanitized_attrs);
+ ceph_assert(op.obc);
+ op.recovery_info.size = op.obc->obs.oi.size;
+ op.recovery_info.oi = op.obc->obs.oi;
+ }
+
+ ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
+ if (op.obc->obs.oi.size > 0) {
+ ceph_assert(op.xattrs.count(ECUtil::get_hinfo_key()));
+ auto bp = op.xattrs[ECUtil::get_hinfo_key()].cbegin();
+ decode(hinfo, bp);
+ }
+ op.hinfo = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
+ }
+ ceph_assert(op.xattrs.size());
+ ceph_assert(op.obc);
+ continue_recovery_op(op, m);
+}
+
+struct SendPushReplies : public Context {
+ PGBackend::Listener *l;
+ epoch_t epoch;
+ map<int, MOSDPGPushReply*> replies;
+ SendPushReplies(
+ PGBackend::Listener *l,
+ epoch_t epoch,
+ map<int, MOSDPGPushReply*> &in) : l(l), epoch(epoch) {
+ replies.swap(in);
+ }
+ void finish(int) override {
+ std::vector<std::pair<int, Message*>> messages;
+ messages.reserve(replies.size());
+ for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
+ i != replies.end();
+ ++i) {
+ messages.push_back(std::make_pair(i->first, i->second));
+ }
+ if (!messages.empty()) {
+ l->send_message_osd_cluster(messages, epoch);
+ }
+ replies.clear();
+ }
+ ~SendPushReplies() override {
+ for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
+ i != replies.end();
+ ++i) {
+ i->second->put();
+ }
+ replies.clear();
+ }
+};
+
+void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
+{
+ for (map<pg_shard_t, vector<PushOp> >::iterator i = m.pushes.begin();
+ i != m.pushes.end();
+ m.pushes.erase(i++)) {
+ MOSDPGPush *msg = new MOSDPGPush();
+ msg->set_priority(priority);
+ msg->map_epoch = get_osdmap_epoch();
+ msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
+ msg->from = get_parent()->whoami_shard();
+ msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
+ msg->pushes.swap(i->second);
+ msg->compute_cost(cct);
+ msg->is_repair = get_parent()->pg_is_repair();
+ get_parent()->send_message(
+ i->first.osd,
+ msg);
+ }
+ map<int, MOSDPGPushReply*> replies;
+ for (map<pg_shard_t, vector<PushReplyOp> >::iterator i =
+ m.push_replies.begin();
+ i != m.push_replies.end();
+ m.push_replies.erase(i++)) {
+ MOSDPGPushReply *msg = new MOSDPGPushReply();
+ msg->set_priority(priority);
+ msg->map_epoch = get_osdmap_epoch();
+ msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
+ msg->from = get_parent()->whoami_shard();
+ msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
+ msg->replies.swap(i->second);
+ msg->compute_cost(cct);
+ replies.insert(make_pair(i->first.osd, msg));
+ }
+
+ if (!replies.empty()) {
+ (m.t).register_on_complete(
+ get_parent()->bless_context(
+ new SendPushReplies(
+ get_parent(),
+ get_osdmap_epoch(),
+ replies)));
+ get_parent()->queue_transaction(std::move(m.t));
+ }
+
+ if (m.reads.empty())
+ return;
+ start_read_op(
+ priority,
+ m.want_to_read,
+ m.reads,
+ OpRequestRef(),
+ false, true);
+}
+
+void ECBackend::continue_recovery_op(
+ RecoveryOp &op,
+ RecoveryMessages *m)
+{
+ dout(10) << __func__ << ": continuing " << op << dendl;
+ while (1) {
+ switch (op.state) {
+ case RecoveryOp::IDLE: {
+ // start read
+ op.state = RecoveryOp::READING;
+ ceph_assert(!op.recovery_progress.data_complete);
+ set<int> want(op.missing_on_shards.begin(), op.missing_on_shards.end());
+ uint64_t from = op.recovery_progress.data_recovered_to;
+ uint64_t amount = get_recovery_chunk_size();
+
+ if (op.recovery_progress.first && op.obc) {
+ /* We've got the attrs and the hinfo, might as well use them */
+ op.hinfo = get_hash_info(op.hoid);
+ if (!op.hinfo) {
+ derr << __func__ << ": " << op.hoid << " has inconsistent hinfo"
+ << dendl;
+ ceph_assert(recovery_ops.count(op.hoid));
+ eversion_t v = recovery_ops[op.hoid].v;
+ recovery_ops.erase(op.hoid);
+ get_parent()->on_failed_pull({get_parent()->whoami_shard()},
+ op.hoid, v);
+ return;
+ }
+ op.xattrs = op.obc->attr_cache;
+ encode(*(op.hinfo), op.xattrs[ECUtil::get_hinfo_key()]);
+ }
+
+ map<pg_shard_t, vector<pair<int, int>>> to_read;
+ int r = get_min_avail_to_read_shards(
+ op.hoid, want, true, false, &to_read);
+ if (r != 0) {
+ // we must have lost a recovery source
+ ceph_assert(!op.recovery_progress.first);
+ dout(10) << __func__ << ": canceling recovery op for obj " << op.hoid
+ << dendl;
+ get_parent()->cancel_pull(op.hoid);
+ recovery_ops.erase(op.hoid);
+ return;
+ }
+ m->read(
+ this,
+ op.hoid,
+ op.recovery_progress.data_recovered_to,
+ amount,
+ std::move(want),
+ to_read,
+ op.recovery_progress.first && !op.obc);
+ op.extent_requested = make_pair(
+ from,
+ amount);
+ dout(10) << __func__ << ": IDLE return " << op << dendl;
+ return;
+ }
+ case RecoveryOp::READING: {
+ // read completed, start write
+ ceph_assert(op.xattrs.size());
+ ceph_assert(op.returned_data.size());
+ op.state = RecoveryOp::WRITING;
+ ObjectRecoveryProgress after_progress = op.recovery_progress;
+ after_progress.data_recovered_to += op.extent_requested.second;
+ after_progress.first = false;
+ if (after_progress.data_recovered_to >= op.obc->obs.oi.size) {
+ after_progress.data_recovered_to =
+ sinfo.logical_to_next_stripe_offset(
+ op.obc->obs.oi.size);
+ after_progress.data_complete = true;
+ }
+ for (set<pg_shard_t>::iterator mi = op.missing_on.begin();
+ mi != op.missing_on.end();
+ ++mi) {
+ ceph_assert(op.returned_data.count(mi->shard));
+ m->pushes[*mi].push_back(PushOp());
+ PushOp &pop = m->pushes[*mi].back();
+ pop.soid = op.hoid;
+ pop.version = op.v;
+ pop.data = op.returned_data[mi->shard];
+ dout(10) << __func__ << ": before_progress=" << op.recovery_progress
+ << ", after_progress=" << after_progress
+ << ", pop.data.length()=" << pop.data.length()
+ << ", size=" << op.obc->obs.oi.size << dendl;
+ ceph_assert(
+ pop.data.length() ==
+ sinfo.aligned_logical_offset_to_chunk_offset(
+ after_progress.data_recovered_to -
+ op.recovery_progress.data_recovered_to)
+ );
+ if (pop.data.length())
+ pop.data_included.insert(
+ sinfo.aligned_logical_offset_to_chunk_offset(
+ op.recovery_progress.data_recovered_to),
+ pop.data.length()
+ );
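+      // Illustrative numbers: with k=4 data chunks, recovering logical
+      // bytes [0, 32768) pushes 32768/4 = 8192 bytes per shard, which is
+      // exactly the chunk-space length the assert above demands.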
+ if (op.recovery_progress.first) {
+ pop.attrset = op.xattrs;
+ }
+ pop.recovery_info = op.recovery_info;
+ pop.before_progress = op.recovery_progress;
+ pop.after_progress = after_progress;
+ if (*mi != get_parent()->primary_shard())
+ get_parent()->begin_peer_recover(
+ *mi,
+ op.hoid);
+ }
+ op.returned_data.clear();
+ op.waiting_on_pushes = op.missing_on;
+ op.recovery_progress = after_progress;
+ dout(10) << __func__ << ": READING return " << op << dendl;
+ return;
+ }
+ case RecoveryOp::WRITING: {
+ if (op.waiting_on_pushes.empty()) {
+ if (op.recovery_progress.data_complete) {
+ op.state = RecoveryOp::COMPLETE;
+ for (set<pg_shard_t>::iterator i = op.missing_on.begin();
+ i != op.missing_on.end();
+ ++i) {
+ if (*i != get_parent()->primary_shard()) {
+ dout(10) << __func__ << ": on_peer_recover on " << *i
+ << ", obj " << op.hoid << dendl;
+ get_parent()->on_peer_recover(
+ *i,
+ op.hoid,
+ op.recovery_info);
+ }
+ }
+ object_stat_sum_t stat;
+ stat.num_bytes_recovered = op.recovery_info.size;
+ stat.num_keys_recovered = 0; // ??? op ... omap_entries.size(); ?
+ stat.num_objects_recovered = 1;
+ if (get_parent()->pg_is_repair())
+ stat.num_objects_repaired = 1;
+ get_parent()->on_global_recover(op.hoid, stat, false);
+ dout(10) << __func__ << ": WRITING return " << op << dendl;
+ recovery_ops.erase(op.hoid);
+ return;
+ } else {
+ op.state = RecoveryOp::IDLE;
+ dout(10) << __func__ << ": WRITING continue " << op << dendl;
+ continue;
+ }
+ }
+ return;
+ }
+ // should never be called once complete
+ case RecoveryOp::COMPLETE:
+ default: {
+ ceph_abort();
+ };
+ }
+ }
+}
+
+void ECBackend::run_recovery_op(
+ RecoveryHandle *_h,
+ int priority)
+{
+ ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
+ RecoveryMessages m;
+ for (list<RecoveryOp>::iterator i = h->ops.begin();
+ i != h->ops.end();
+ ++i) {
+ dout(10) << __func__ << ": starting " << *i << dendl;
+ ceph_assert(!recovery_ops.count(i->hoid));
+ RecoveryOp &op = recovery_ops.insert(make_pair(i->hoid, *i)).first->second;
+ continue_recovery_op(op, &m);
+ }
+
+ dispatch_recovery_messages(m, priority);
+ send_recovery_deletes(priority, h->deletes);
+ delete _h;
+}
+
+int ECBackend::recover_object(
+ const hobject_t &hoid,
+ eversion_t v,
+ ObjectContextRef head,
+ ObjectContextRef obc,
+ RecoveryHandle *_h)
+{
+ ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
+ h->ops.push_back(RecoveryOp());
+ h->ops.back().v = v;
+ h->ops.back().hoid = hoid;
+ h->ops.back().obc = obc;
+ h->ops.back().recovery_info.soid = hoid;
+ h->ops.back().recovery_info.version = v;
+ if (obc) {
+ h->ops.back().recovery_info.size = obc->obs.oi.size;
+ h->ops.back().recovery_info.oi = obc->obs.oi;
+ }
+ if (hoid.is_snap()) {
+ if (obc) {
+ ceph_assert(obc->ssc);
+ h->ops.back().recovery_info.ss = obc->ssc->snapset;
+ } else if (head) {
+ ceph_assert(head->ssc);
+ h->ops.back().recovery_info.ss = head->ssc->snapset;
+ } else {
+ ceph_abort_msg("neither obc nor head set for a snap object");
+ }
+ }
+ h->ops.back().recovery_progress.omap_complete = true;
+ for (set<pg_shard_t>::const_iterator i =
+ get_parent()->get_acting_recovery_backfill_shards().begin();
+ i != get_parent()->get_acting_recovery_backfill_shards().end();
+ ++i) {
+ dout(10) << "checking " << *i << dendl;
+ if (get_parent()->get_shard_missing(*i).is_missing(hoid)) {
+ h->ops.back().missing_on.insert(*i);
+ h->ops.back().missing_on_shards.insert(i->shard);
+ }
+ }
+ dout(10) << __func__ << ": built op " << h->ops.back() << dendl;
+ return 0;
+}
+
+bool ECBackend::can_handle_while_inactive(
+ OpRequestRef _op)
+{
+ return false;
+}
+
+bool ECBackend::_handle_message(
+ OpRequestRef _op)
+{
+ dout(10) << __func__ << ": " << *_op->get_req() << dendl;
+ int priority = _op->get_req()->get_priority();
+ switch (_op->get_req()->get_type()) {
+ case MSG_OSD_EC_WRITE: {
+ // NOTE: this is non-const because handle_sub_write modifies the embedded
+ // ObjectStore::Transaction in place (and then std::move's it). It does
+ // not conflict with ECSubWrite's operator<<.
+ MOSDECSubOpWrite *op = static_cast<MOSDECSubOpWrite*>(
+ _op->get_nonconst_req());
+ parent->maybe_preempt_replica_scrub(op->op.soid);
+ handle_sub_write(op->op.from, _op, op->op, _op->pg_trace);
+ return true;
+ }
+ case MSG_OSD_EC_WRITE_REPLY: {
+ const MOSDECSubOpWriteReply *op = static_cast<const MOSDECSubOpWriteReply*>(
+ _op->get_req());
+ handle_sub_write_reply(op->op.from, op->op, _op->pg_trace);
+ return true;
+ }
+ case MSG_OSD_EC_READ: {
+ auto op = _op->get_req<MOSDECSubOpRead>();
+ MOSDECSubOpReadReply *reply = new MOSDECSubOpReadReply;
+ reply->pgid = get_parent()->primary_spg_t();
+ reply->map_epoch = get_osdmap_epoch();
+ reply->min_epoch = get_parent()->get_interval_start_epoch();
+ handle_sub_read(op->op.from, op->op, &(reply->op), _op->pg_trace);
+ reply->trace = _op->pg_trace;
+ get_parent()->send_message_osd_cluster(
+ reply, _op->get_req()->get_connection());
+ return true;
+ }
+ case MSG_OSD_EC_READ_REPLY: {
+ // NOTE: this is non-const because handle_sub_read_reply steals resulting
+ // buffers. It does not conflict with ECSubReadReply operator<<.
+ MOSDECSubOpReadReply *op = static_cast<MOSDECSubOpReadReply*>(
+ _op->get_nonconst_req());
+ RecoveryMessages rm;
+ handle_sub_read_reply(op->op.from, op->op, &rm, _op->pg_trace);
+ dispatch_recovery_messages(rm, priority);
+ return true;
+ }
+ case MSG_OSD_PG_PUSH: {
+ auto op = _op->get_req<MOSDPGPush>();
+ RecoveryMessages rm;
+ for (vector<PushOp>::const_iterator i = op->pushes.begin();
+ i != op->pushes.end();
+ ++i) {
+ handle_recovery_push(*i, &rm, op->is_repair);
+ }
+ dispatch_recovery_messages(rm, priority);
+ return true;
+ }
+ case MSG_OSD_PG_PUSH_REPLY: {
+ const MOSDPGPushReply *op = static_cast<const MOSDPGPushReply *>(
+ _op->get_req());
+ RecoveryMessages rm;
+ for (vector<PushReplyOp>::const_iterator i = op->replies.begin();
+ i != op->replies.end();
+ ++i) {
+ handle_recovery_push_reply(*i, op->from, &rm);
+ }
+ dispatch_recovery_messages(rm, priority);
+ return true;
+ }
+ default:
+ return false;
+ }
+ return false;
+}
+
+struct SubWriteCommitted : public Context {
+ ECBackend *pg;
+ OpRequestRef msg;
+ ceph_tid_t tid;
+ eversion_t version;
+ eversion_t last_complete;
+ const ZTracer::Trace trace;
+ SubWriteCommitted(
+ ECBackend *pg,
+ OpRequestRef msg,
+ ceph_tid_t tid,
+ eversion_t version,
+ eversion_t last_complete,
+ const ZTracer::Trace &trace)
+ : pg(pg), msg(msg), tid(tid),
+ version(version), last_complete(last_complete), trace(trace) {}
+ void finish(int) override {
+ if (msg)
+ msg->mark_event("sub_op_committed");
+ pg->sub_write_committed(tid, version, last_complete, trace);
+ }
+};
+void ECBackend::sub_write_committed(
+ ceph_tid_t tid, eversion_t version, eversion_t last_complete,
+ const ZTracer::Trace &trace) {
+ if (get_parent()->pgb_is_primary()) {
+ ECSubWriteReply reply;
+ reply.tid = tid;
+ reply.last_complete = last_complete;
+ reply.committed = true;
+ reply.applied = true;
+ reply.from = get_parent()->whoami_shard();
+ handle_sub_write_reply(
+ get_parent()->whoami_shard(),
+ reply, trace);
+ } else {
+ get_parent()->update_last_complete_ondisk(last_complete);
+ MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply;
+ r->pgid = get_parent()->primary_spg_t();
+ r->map_epoch = get_osdmap_epoch();
+ r->min_epoch = get_parent()->get_interval_start_epoch();
+ r->op.tid = tid;
+ r->op.last_complete = last_complete;
+ r->op.committed = true;
+ r->op.applied = true;
+ r->op.from = get_parent()->whoami_shard();
+ r->set_priority(CEPH_MSG_PRIO_HIGH);
+ r->trace = trace;
+ r->trace.event("sending sub op commit");
+ get_parent()->send_message_osd_cluster(
+ get_parent()->primary_shard().osd, r, get_osdmap_epoch());
+ }
+}
+
+void ECBackend::handle_sub_write(
+ pg_shard_t from,
+ OpRequestRef msg,
+ ECSubWrite &op,
+ const ZTracer::Trace &trace)
+{
+ if (msg) {
+ msg->mark_event("sub_op_started");
+ }
+ trace.event("handle_sub_write");
+
+ if (!get_parent()->pgb_is_primary())
+ get_parent()->update_stats(op.stats);
+ ObjectStore::Transaction localt;
+ if (!op.temp_added.empty()) {
+ add_temp_objs(op.temp_added);
+ }
+ if (op.backfill_or_async_recovery) {
+ for (set<hobject_t>::iterator i = op.temp_removed.begin();
+ i != op.temp_removed.end();
+ ++i) {
+ dout(10) << __func__ << ": removing object " << *i
+ << " since we won't get the transaction" << dendl;
+ localt.remove(
+ coll,
+ ghobject_t(
+ *i,
+ ghobject_t::NO_GEN,
+ get_parent()->whoami_shard().shard));
+ }
+ }
+ clear_temp_objs(op.temp_removed);
+ dout(30) << __func__ << " missing before " << get_parent()->get_log().get_missing().get_items() << dendl;
+ // flag set to true during async recovery
+ bool async = false;
+ pg_missing_tracker_t pmissing = get_parent()->get_local_missing();
+ if (pmissing.is_missing(op.soid)) {
+ async = true;
+ dout(30) << __func__ << " is_missing " << pmissing.is_missing(op.soid) << dendl;
+ for (auto &&e: op.log_entries) {
+ dout(30) << " add_next_event entry " << e << dendl;
+ get_parent()->add_local_next_event(e);
+ dout(30) << " entry is_delete " << e.is_delete() << dendl;
+ }
+ }
+ get_parent()->log_operation(
+ std::move(op.log_entries),
+ op.updated_hit_set_history,
+ op.trim_to,
+ op.roll_forward_to,
+ op.roll_forward_to,
+ !op.backfill_or_async_recovery,
+ localt,
+ async);
+
+ if (!get_parent()->pg_is_undersized() &&
+ (unsigned)get_parent()->whoami_shard().shard >=
+ ec_impl->get_data_chunk_count())
+ op.t.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+
+ localt.register_on_commit(
+ get_parent()->bless_context(
+ new SubWriteCommitted(
+ this, msg, op.tid,
+ op.at_version,
+ get_parent()->get_info().last_complete, trace)));
+ vector<ObjectStore::Transaction> tls;
+ tls.reserve(2);
+ tls.push_back(std::move(op.t));
+ tls.push_back(std::move(localt));
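+  // Note: op.t (the EC transaction prepared by the primary) and localt
+  // (log updates plus the commit callback) are queued as one batch, so
+  // SubWriteCommitted should fire only after both have committed.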
+ get_parent()->queue_transactions(tls, msg);
+  dout(30) << __func__ << " missing after " << get_parent()->get_log().get_missing().get_items() << dendl;
+ if (op.at_version != eversion_t()) {
+ // dummy rollforward transaction doesn't get at_version (and doesn't advance it)
+ get_parent()->op_applied(op.at_version);
+ }
+}
+
+void ECBackend::handle_sub_read(
+ pg_shard_t from,
+ const ECSubRead &op,
+ ECSubReadReply *reply,
+ const ZTracer::Trace &trace)
+{
+ trace.event("handle sub read");
+ shard_id_t shard = get_parent()->whoami_shard().shard;
+ for(auto i = op.to_read.begin();
+ i != op.to_read.end();
+ ++i) {
+ int r = 0;
+ for (auto j = i->second.begin(); j != i->second.end(); ++j) {
+ bufferlist bl;
+ if ((op.subchunks.find(i->first)->second.size() == 1) &&
+ (op.subchunks.find(i->first)->second.front().second ==
+ ec_impl->get_sub_chunk_count())) {
+        dout(25) << __func__ << " case 1: reading the complete chunk/shard." << dendl;
+ r = store->read(
+ ch,
+ ghobject_t(i->first, ghobject_t::NO_GEN, shard),
+ j->get<0>(),
+ j->get<1>(),
+ bl, j->get<2>()); // Allow EIO return
+ } else {
+        dout(25) << __func__ << " case 2: doing a fragmented read." << dendl;
+ int subchunk_size =
+ sinfo.get_chunk_size() / ec_impl->get_sub_chunk_count();
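+        // Illustrative numbers: chunk_size=4096 with 4 sub-chunks gives
+        // subchunk_size=1024; a subchunk span (k.first=1, k.second=2)
+        // then reads 2048 bytes at offset j->get<0>() + m + 1024 within
+        // each chunk-sized window of the request.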
+ bool error = false;
+ for (int m = 0; m < (int)j->get<1>() && !error;
+ m += sinfo.get_chunk_size()) {
+ for (auto &&k:op.subchunks.find(i->first)->second) {
+ bufferlist bl0;
+ r = store->read(
+ ch,
+ ghobject_t(i->first, ghobject_t::NO_GEN, shard),
+ j->get<0>() + m + (k.first)*subchunk_size,
+ (k.second)*subchunk_size,
+ bl0, j->get<2>());
+ if (r < 0) {
+ error = true;
+ break;
+ }
+ bl.claim_append(bl0);
+ }
+ }
+ }
+
+ if (r < 0) {
+ // if we are doing fast reads, it's possible for one of the shard
+ // reads to cross paths with another update and get a (harmless)
+ // ENOENT. Suppress the message to the cluster log in that case.
+ if (r == -ENOENT && get_parent()->get_pool().fast_read) {
+ dout(5) << __func__ << ": Error " << r
+ << " reading " << i->first << ", fast read, probably ok"
+ << dendl;
+ } else {
+ get_parent()->clog_error() << "Error " << r
+ << " reading object "
+ << i->first;
+ dout(5) << __func__ << ": Error " << r
+ << " reading " << i->first << dendl;
+ }
+ goto error;
+ } else {
+ dout(20) << __func__ << " read request=" << j->get<1>() << " r=" << r << " len=" << bl.length() << dendl;
+ reply->buffers_read[i->first].push_back(
+ make_pair(
+ j->get<0>(),
+ bl)
+ );
+ }
+
+ if (!get_parent()->get_pool().allows_ecoverwrites()) {
+      // This shows that we still need deep scrub: large enough objects
+      // are read in sections, so the digest check is skipped here for
+      // partial reads.
+ // Do NOT check osd_read_eio_on_bad_digest here. We need to report
+ // the state of our chunk in case other chunks could substitute.
+ ECUtil::HashInfoRef hinfo;
+ hinfo = get_hash_info(i->first);
+ if (!hinfo) {
+ r = -EIO;
+ get_parent()->clog_error() << "Corruption detected: object "
+ << i->first
+ << " is missing hash_info";
+ dout(5) << __func__ << ": No hinfo for " << i->first << dendl;
+ goto error;
+ }
+ ceph_assert(hinfo->has_chunk_hash());
+ if ((bl.length() == hinfo->get_total_chunk_size()) &&
+ (j->get<0>() == 0)) {
+ dout(20) << __func__ << ": Checking hash of " << i->first << dendl;
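+          // Descriptive note: bufferhash seeded with -1 accumulates a
+          // crc32c over the chunk; the digest must match the per-shard
+          // hash recorded in the object's hinfo xattr.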
+ bufferhash h(-1);
+ h << bl;
+ if (h.digest() != hinfo->get_chunk_hash(shard)) {
+ get_parent()->clog_error() << "Bad hash for " << i->first << " digest 0x"
+ << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec;
+ dout(5) << __func__ << ": Bad hash for " << i->first << " digest 0x"
+ << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << dendl;
+ r = -EIO;
+ goto error;
+ }
+ }
+ }
+ }
+ continue;
+error:
+ // Do NOT check osd_read_eio_on_bad_digest here. We need to report
+ // the state of our chunk in case other chunks could substitute.
+ reply->buffers_read.erase(i->first);
+ reply->errors[i->first] = r;
+ }
+ for (set<hobject_t>::iterator i = op.attrs_to_read.begin();
+ i != op.attrs_to_read.end();
+ ++i) {
+ dout(10) << __func__ << ": fulfilling attr request on "
+ << *i << dendl;
+ if (reply->errors.count(*i))
+ continue;
+ int r = store->getattrs(
+ ch,
+ ghobject_t(
+ *i, ghobject_t::NO_GEN, shard),
+ reply->attrs_read[*i]);
+ if (r < 0) {
+      // If we hit a read error, we should not return the attrs either.
+ reply->attrs_read.erase(*i);
+ reply->buffers_read.erase(*i);
+ reply->errors[*i] = r;
+ }
+ }
+ reply->from = get_parent()->whoami_shard();
+ reply->tid = op.tid;
+}
+
+void ECBackend::handle_sub_write_reply(
+ pg_shard_t from,
+ const ECSubWriteReply &op,
+ const ZTracer::Trace &trace)
+{
+ map<ceph_tid_t, Op>::iterator i = tid_to_op_map.find(op.tid);
+ ceph_assert(i != tid_to_op_map.end());
+ if (op.committed) {
+ trace.event("sub write committed");
+ ceph_assert(i->second.pending_commit.count(from));
+ i->second.pending_commit.erase(from);
+ if (from != get_parent()->whoami_shard()) {
+ get_parent()->update_peer_last_complete_ondisk(from, op.last_complete);
+ }
+ }
+ if (op.applied) {
+ trace.event("sub write applied");
+ ceph_assert(i->second.pending_apply.count(from));
+ i->second.pending_apply.erase(from);
+ }
+
+ if (i->second.pending_commit.empty() &&
+ i->second.on_all_commit &&
+ // also wait for apply, to preserve ordering with luminous peers.
+ i->second.pending_apply.empty()) {
+ dout(10) << __func__ << " Calling on_all_commit on " << i->second << dendl;
+ i->second.on_all_commit->complete(0);
+ i->second.on_all_commit = 0;
+ i->second.trace.event("ec write all committed");
+ }
+ check_ops();
+}
+
+void ECBackend::handle_sub_read_reply(
+ pg_shard_t from,
+ ECSubReadReply &op,
+ RecoveryMessages *m,
+ const ZTracer::Trace &trace)
+{
+ trace.event("ec sub read reply");
+ dout(10) << __func__ << ": reply " << op << dendl;
+ map<ceph_tid_t, ReadOp>::iterator iter = tid_to_read_map.find(op.tid);
+ if (iter == tid_to_read_map.end()) {
+    // canceled
+ dout(20) << __func__ << ": dropped " << op << dendl;
+ return;
+ }
+ ReadOp &rop = iter->second;
+ for (auto i = op.buffers_read.begin();
+ i != op.buffers_read.end();
+ ++i) {
+ ceph_assert(!op.errors.count(i->first)); // If attribute error we better not have sent a buffer
+ if (!rop.to_read.count(i->first)) {
+ // We canceled this read! @see filter_read_op
+ dout(20) << __func__ << " to_read skipping" << dendl;
+ continue;
+ }
+ list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator req_iter =
+ rop.to_read.find(i->first)->second.to_read.begin();
+ list<
+ boost::tuple<
+ uint64_t, uint64_t, map<pg_shard_t, bufferlist> > >::iterator riter =
+ rop.complete[i->first].returned.begin();
+ for (list<pair<uint64_t, bufferlist> >::iterator j = i->second.begin();
+ j != i->second.end();
+ ++j, ++req_iter, ++riter) {
+ ceph_assert(req_iter != rop.to_read.find(i->first)->second.to_read.end());
+ ceph_assert(riter != rop.complete[i->first].returned.end());
+ pair<uint64_t, uint64_t> adjusted =
+ sinfo.aligned_offset_len_to_chunk(
+ make_pair(req_iter->get<0>(), req_iter->get<1>()));
+ ceph_assert(adjusted.first == j->first);
+ riter->get<2>()[from] = std::move(j->second);
+ }
+ }
+ for (auto i = op.attrs_read.begin();
+ i != op.attrs_read.end();
+ ++i) {
+ ceph_assert(!op.errors.count(i->first)); // if read error better not have sent an attribute
+ if (!rop.to_read.count(i->first)) {
+ // We canceled this read! @see filter_read_op
+ dout(20) << __func__ << " to_read skipping" << dendl;
+ continue;
+ }
+ rop.complete[i->first].attrs.emplace();
+ (*(rop.complete[i->first].attrs)).swap(i->second);
+ }
+ for (auto i = op.errors.begin();
+ i != op.errors.end();
+ ++i) {
+ rop.complete[i->first].errors.insert(
+ make_pair(
+ from,
+ i->second));
+ dout(20) << __func__ << " shard=" << from << " error=" << i->second << dendl;
+ }
+
+ map<pg_shard_t, set<ceph_tid_t> >::iterator siter =
+ shard_to_read_map.find(from);
+ ceph_assert(siter != shard_to_read_map.end());
+ ceph_assert(siter->second.count(op.tid));
+ siter->second.erase(op.tid);
+
+ ceph_assert(rop.in_progress.count(from));
+ rop.in_progress.erase(from);
+ unsigned is_complete = 0;
+ bool need_resend = false;
+  // For redundant reads, check for completion as each shard comes in;
+  // for a non-recovery read, check for completion once all shard reads
+  // have returned.
+ if (rop.do_redundant_reads || rop.in_progress.empty()) {
+ for (map<hobject_t, read_result_t>::const_iterator iter =
+ rop.complete.begin();
+ iter != rop.complete.end();
+ ++iter) {
+ set<int> have;
+ for (map<pg_shard_t, bufferlist>::const_iterator j =
+ iter->second.returned.front().get<2>().begin();
+ j != iter->second.returned.front().get<2>().end();
+ ++j) {
+ have.insert(j->first.shard);
+ dout(20) << __func__ << " have shard=" << j->first.shard << dendl;
+ }
+ map<int, vector<pair<int, int>>> dummy_minimum;
+ int err;
+ if ((err = ec_impl->minimum_to_decode(rop.want_to_read[iter->first], have, &dummy_minimum)) < 0) {
+ dout(20) << __func__ << " minimum_to_decode failed" << dendl;
+ if (rop.in_progress.empty()) {
+ // If we don't have enough copies, try other pg_shard_ts if available.
+ // During recovery there may be multiple osds with copies of the same shard,
+ // so getting EIO from one may result in multiple passes through this code path.
+ if (!rop.do_redundant_reads) {
+ int r = send_all_remaining_reads(iter->first, rop);
+ if (r == 0) {
+                  // We changed the rop's to_read, so don't increment is_complete
+ need_resend = true;
+ continue;
+ }
+ // Couldn't read any additional shards so handle as completed with errors
+ }
+ // We don't want to confuse clients / RBD with objectstore error
+              // values, in particular ENOENT. We may have different error returns
+ // from different shards, so we'll return minimum_to_decode() error
+ // (usually EIO) to reader. It is likely an error here is due to a
+ // damaged pg.
+ rop.complete[iter->first].r = err;
+ ++is_complete;
+ }
+ } else {
+ ceph_assert(rop.complete[iter->first].r == 0);
+ if (!rop.complete[iter->first].errors.empty()) {
+ if (cct->_conf->osd_read_ec_check_for_errors) {
+ dout(10) << __func__ << ": Not ignoring errors, use one shard err=" << err << dendl;
+ err = rop.complete[iter->first].errors.begin()->second;
+ rop.complete[iter->first].r = err;
+ } else {
+ get_parent()->clog_warn() << "Error(s) ignored for "
+ << iter->first << " enough copies available";
+ dout(10) << __func__ << " Error(s) ignored for " << iter->first
+ << " enough copies available" << dendl;
+ rop.complete[iter->first].errors.clear();
+ }
+ }
+        // avoid re-reading a completed object, as we may still send
+        // remaining reads for uncompleted objects
+ rop.to_read.at(iter->first).need.clear();
+ rop.to_read.at(iter->first).want_attrs = false;
+ ++is_complete;
+ }
+ }
+ }
+ if (need_resend) {
+ do_read_op(rop);
+ } else if (rop.in_progress.empty() ||
+ is_complete == rop.complete.size()) {
+ dout(20) << __func__ << " Complete: " << rop << dendl;
+ rop.trace.event("ec read complete");
+ complete_read_op(rop, m);
+ } else {
+ dout(10) << __func__ << " readop not complete: " << rop << dendl;
+ }
+}
+
+void ECBackend::complete_read_op(ReadOp &rop, RecoveryMessages *m)
+{
+ map<hobject_t, read_request_t>::iterator reqiter =
+ rop.to_read.begin();
+ map<hobject_t, read_result_t>::iterator resiter =
+ rop.complete.begin();
+ ceph_assert(rop.to_read.size() == rop.complete.size());
+ for (; reqiter != rop.to_read.end(); ++reqiter, ++resiter) {
+ if (reqiter->second.cb) {
+ pair<RecoveryMessages *, read_result_t &> arg(
+ m, resiter->second);
+ reqiter->second.cb->complete(arg);
+ reqiter->second.cb = nullptr;
+ }
+ }
+  // The read op is over; clean up all the data for this tid.
+ for (set<pg_shard_t>::iterator iter = rop.in_progress.begin();
+ iter != rop.in_progress.end();
+ iter++) {
+ shard_to_read_map[*iter].erase(rop.tid);
+ }
+ rop.in_progress.clear();
+ tid_to_read_map.erase(rop.tid);
+}
+
+struct FinishReadOp : public GenContext<ThreadPool::TPHandle&> {
+ ECBackend *ec;
+ ceph_tid_t tid;
+ FinishReadOp(ECBackend *ec, ceph_tid_t tid) : ec(ec), tid(tid) {}
+ void finish(ThreadPool::TPHandle &handle) override {
+ auto ropiter = ec->tid_to_read_map.find(tid);
+ ceph_assert(ropiter != ec->tid_to_read_map.end());
+ int priority = ropiter->second.priority;
+ RecoveryMessages rm;
+ ec->complete_read_op(ropiter->second, &rm);
+ ec->dispatch_recovery_messages(rm, priority);
+ }
+};
+
+void ECBackend::filter_read_op(
+ const OSDMapRef& osdmap,
+ ReadOp &op)
+{
+ set<hobject_t> to_cancel;
+ for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
+ i != op.source_to_obj.end();
+ ++i) {
+ if (osdmap->is_down(i->first.osd)) {
+ to_cancel.insert(i->second.begin(), i->second.end());
+ op.in_progress.erase(i->first);
+ continue;
+ }
+ }
+
+ if (to_cancel.empty())
+ return;
+
+ for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
+ i != op.source_to_obj.end();
+ ) {
+ for (set<hobject_t>::iterator j = i->second.begin();
+ j != i->second.end();
+ ) {
+ if (to_cancel.count(*j))
+ i->second.erase(j++);
+ else
+ ++j;
+ }
+ if (i->second.empty()) {
+ op.source_to_obj.erase(i++);
+ } else {
+ ceph_assert(!osdmap->is_down(i->first.osd));
+ ++i;
+ }
+ }
+
+ for (set<hobject_t>::iterator i = to_cancel.begin();
+ i != to_cancel.end();
+ ++i) {
+ get_parent()->cancel_pull(*i);
+
+ ceph_assert(op.to_read.count(*i));
+ read_request_t &req = op.to_read.find(*i)->second;
+ dout(10) << __func__ << ": canceling " << req
+ << " for obj " << *i << dendl;
+ ceph_assert(req.cb);
+ delete req.cb;
+ req.cb = nullptr;
+
+ op.to_read.erase(*i);
+ op.complete.erase(*i);
+ recovery_ops.erase(*i);
+ }
+
+ if (op.in_progress.empty()) {
+ /* This case is odd. filter_read_op gets called while processing
+ * an OSDMap. Normal, non-recovery reads only happen from acting
+ * set osds. For this op to have had a read source go down and
+ * there not be an interval change, it must be part of a pull during
+ * log-based recovery.
+ *
+ * This callback delays calling complete_read_op until later to avoid
+ * dealing with recovery while handling an OSDMap. We assign a
+ * cost here of 1 because:
+ * 1) This should be very rare, and the operation itself was already
+ * throttled.
+ * 2) It shouldn't result in IO, rather it should result in restarting
+ * the pull on the affected objects and pushes from in-memory buffers
+ * on any now complete unaffected objects.
+ */
+ get_parent()->schedule_recovery_work(
+ get_parent()->bless_unlocked_gencontext(
+ new FinishReadOp(this, op.tid)),
+ 1);
+ }
+}
+
+void ECBackend::check_recovery_sources(const OSDMapRef& osdmap)
+{
+ set<ceph_tid_t> tids_to_filter;
+ for (map<pg_shard_t, set<ceph_tid_t> >::iterator
+ i = shard_to_read_map.begin();
+ i != shard_to_read_map.end();
+ ) {
+ if (osdmap->is_down(i->first.osd)) {
+ tids_to_filter.insert(i->second.begin(), i->second.end());
+ shard_to_read_map.erase(i++);
+ } else {
+ ++i;
+ }
+ }
+ for (set<ceph_tid_t>::iterator i = tids_to_filter.begin();
+ i != tids_to_filter.end();
+ ++i) {
+ map<ceph_tid_t, ReadOp>::iterator j = tid_to_read_map.find(*i);
+ ceph_assert(j != tid_to_read_map.end());
+ filter_read_op(osdmap, j->second);
+ }
+}
+
+void ECBackend::on_change()
+{
+ dout(10) << __func__ << dendl;
+
+ completed_to = eversion_t();
+ committed_to = eversion_t();
+ pipeline_state.clear();
+ waiting_reads.clear();
+ waiting_state.clear();
+ waiting_commit.clear();
+ for (auto &&op: tid_to_op_map) {
+ cache.release_write_pin(op.second.pin);
+ }
+ tid_to_op_map.clear();
+
+ for (map<ceph_tid_t, ReadOp>::iterator i = tid_to_read_map.begin();
+ i != tid_to_read_map.end();
+ ++i) {
+ dout(10) << __func__ << ": cancelling " << i->second << dendl;
+ for (map<hobject_t, read_request_t>::iterator j =
+ i->second.to_read.begin();
+ j != i->second.to_read.end();
+ ++j) {
+ delete j->second.cb;
+ j->second.cb = nullptr;
+ }
+ }
+ tid_to_read_map.clear();
+ in_progress_client_reads.clear();
+ shard_to_read_map.clear();
+ clear_recovery_state();
+}
+
+void ECBackend::clear_recovery_state()
+{
+ recovery_ops.clear();
+}
+
+void ECBackend::dump_recovery_info(Formatter *f) const
+{
+ f->open_array_section("recovery_ops");
+ for (map<hobject_t, RecoveryOp>::const_iterator i = recovery_ops.begin();
+ i != recovery_ops.end();
+ ++i) {
+ f->open_object_section("op");
+ i->second.dump(f);
+ f->close_section();
+ }
+ f->close_section();
+ f->open_array_section("read_ops");
+ for (map<ceph_tid_t, ReadOp>::const_iterator i = tid_to_read_map.begin();
+ i != tid_to_read_map.end();
+ ++i) {
+ f->open_object_section("read_op");
+ i->second.dump(f);
+ f->close_section();
+ }
+ f->close_section();
+}
+
+void ECBackend::submit_transaction(
+ const hobject_t &hoid,
+ const object_stat_sum_t &delta_stats,
+ const eversion_t &at_version,
+ PGTransactionUPtr &&t,
+ const eversion_t &trim_to,
+ const eversion_t &min_last_complete_ondisk,
+ vector<pg_log_entry_t>&& log_entries,
+ std::optional<pg_hit_set_history_t> &hset_history,
+ Context *on_all_commit,
+ ceph_tid_t tid,
+ osd_reqid_t reqid,
+ OpRequestRef client_op
+ )
+{
+ ceph_assert(!tid_to_op_map.count(tid));
+ Op *op = &(tid_to_op_map[tid]);
+ op->hoid = hoid;
+ op->delta_stats = delta_stats;
+ op->version = at_version;
+ op->trim_to = trim_to;
+ op->roll_forward_to = std::max(min_last_complete_ondisk, committed_to);
+ op->log_entries = log_entries;
+ std::swap(op->updated_hit_set_history, hset_history);
+ op->on_all_commit = on_all_commit;
+ op->tid = tid;
+ op->reqid = reqid;
+ op->client_op = client_op;
+ if (client_op) {
+ op->trace = client_op->pg_trace;
+ }
+ dout(10) << __func__ << ": op " << *op << " starting" << dendl;
+ start_rmw(op, std::move(t));
+}
+
+void ECBackend::call_write_ordered(std::function<void(void)> &&cb) {
+ if (!waiting_state.empty()) {
+ waiting_state.back().on_write.emplace_back(std::move(cb));
+ } else if (!waiting_reads.empty()) {
+ waiting_reads.back().on_write.emplace_back(std::move(cb));
+ } else {
+ // Nothing earlier in the pipeline, just call it
+ cb();
+ }
+}
+
+void ECBackend::get_all_avail_shards(
+ const hobject_t &hoid,
+ const set<pg_shard_t> &error_shards,
+ set<int> &have,
+ map<shard_id_t, pg_shard_t> &shards,
+ bool for_recovery)
+{
+ for (set<pg_shard_t>::const_iterator i =
+ get_parent()->get_acting_shards().begin();
+ i != get_parent()->get_acting_shards().end();
+ ++i) {
+ dout(10) << __func__ << ": checking acting " << *i << dendl;
+ const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
+ if (error_shards.find(*i) != error_shards.end())
+ continue;
+ if (!missing.is_missing(hoid)) {
+ ceph_assert(!have.count(i->shard));
+ have.insert(i->shard);
+ ceph_assert(!shards.count(i->shard));
+ shards.insert(make_pair(i->shard, *i));
+ }
+ }
+
+ if (for_recovery) {
+ for (set<pg_shard_t>::const_iterator i =
+ get_parent()->get_backfill_shards().begin();
+ i != get_parent()->get_backfill_shards().end();
+ ++i) {
+ if (error_shards.find(*i) != error_shards.end())
+ continue;
+ if (have.count(i->shard)) {
+ ceph_assert(shards.count(i->shard));
+ continue;
+ }
+ dout(10) << __func__ << ": checking backfill " << *i << dendl;
+ ceph_assert(!shards.count(i->shard));
+ const pg_info_t &info = get_parent()->get_shard_info(*i);
+ const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
+ if (hoid < info.last_backfill &&
+ !missing.is_missing(hoid)) {
+ have.insert(i->shard);
+ shards.insert(make_pair(i->shard, *i));
+ }
+ }
+
+ map<hobject_t, set<pg_shard_t>>::const_iterator miter =
+ get_parent()->get_missing_loc_shards().find(hoid);
+ if (miter != get_parent()->get_missing_loc_shards().end()) {
+ for (set<pg_shard_t>::iterator i = miter->second.begin();
+ i != miter->second.end();
+ ++i) {
+ dout(10) << __func__ << ": checking missing_loc " << *i << dendl;
+ auto m = get_parent()->maybe_get_shard_missing(*i);
+ if (m) {
+ ceph_assert(!(*m).is_missing(hoid));
+ }
+ if (error_shards.find(*i) != error_shards.end())
+ continue;
+ have.insert(i->shard);
+ shards.insert(make_pair(i->shard, *i));
+ }
+ }
+ }
+}
+
+int ECBackend::get_min_avail_to_read_shards(
+ const hobject_t &hoid,
+ const set<int> &want,
+ bool for_recovery,
+ bool do_redundant_reads,
+ map<pg_shard_t, vector<pair<int, int>>> *to_read)
+{
+ // Make sure we don't do redundant reads for recovery
+ ceph_assert(!for_recovery || !do_redundant_reads);
+
+ set<int> have;
+ map<shard_id_t, pg_shard_t> shards;
+ set<pg_shard_t> error_shards;
+
+ get_all_avail_shards(hoid, error_shards, have, shards, for_recovery);
+
+ map<int, vector<pair<int, int>>> need;
+ int r = ec_impl->minimum_to_decode(want, have, &need);
+ if (r < 0)
+ return r;
+
+ if (do_redundant_reads) {
+ vector<pair<int, int>> subchunks_list;
+ subchunks_list.push_back(make_pair(0, ec_impl->get_sub_chunk_count()));
+ for (auto &&i: have) {
+ need[i] = subchunks_list;
+ }
+ }
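+  // Illustrative example (k=2, m=1, sub_chunk_count=1): want={0,1} with
+  // have={1,2} makes minimum_to_decode() fill `need` with shards 1 and 2,
+  // each read as the single whole-chunk span (0, 1).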
+
+ if (!to_read)
+ return 0;
+
+ for (auto &&i:need) {
+ ceph_assert(shards.count(shard_id_t(i.first)));
+ to_read->insert(make_pair(shards[shard_id_t(i.first)], i.second));
+ }
+ return 0;
+}
+
+int ECBackend::get_remaining_shards(
+ const hobject_t &hoid,
+ const set<int> &avail,
+ const set<int> &want,
+ const read_result_t &result,
+ map<pg_shard_t, vector<pair<int, int>>> *to_read,
+ bool for_recovery)
+{
+ ceph_assert(to_read);
+
+ set<int> have;
+ map<shard_id_t, pg_shard_t> shards;
+ set<pg_shard_t> error_shards;
+ for (auto &p : result.errors) {
+ error_shards.insert(p.first);
+ }
+
+ get_all_avail_shards(hoid, error_shards, have, shards, for_recovery);
+
+ map<int, vector<pair<int, int>>> need;
+ int r = ec_impl->minimum_to_decode(want, have, &need);
+ if (r < 0) {
+ dout(0) << __func__ << " not enough shards left to try for " << hoid
+ << " read result was " << result << dendl;
+ return -EIO;
+ }
+
+ set<int> shards_left;
+ for (auto p : need) {
+ if (avail.find(p.first) == avail.end()) {
+ shards_left.insert(p.first);
+ }
+ }
+
+ vector<pair<int, int>> subchunks;
+ subchunks.push_back(make_pair(0, ec_impl->get_sub_chunk_count()));
+ for (set<int>::iterator i = shards_left.begin();
+ i != shards_left.end();
+ ++i) {
+ ceph_assert(shards.count(shard_id_t(*i)));
+ ceph_assert(avail.find(*i) == avail.end());
+ to_read->insert(make_pair(shards[shard_id_t(*i)], subchunks));
+ }
+ return 0;
+}
+
+void ECBackend::start_read_op(
+ int priority,
+ map<hobject_t, set<int>> &want_to_read,
+ map<hobject_t, read_request_t> &to_read,
+ OpRequestRef _op,
+ bool do_redundant_reads,
+ bool for_recovery)
+{
+ ceph_tid_t tid = get_parent()->get_tid();
+ ceph_assert(!tid_to_read_map.count(tid));
+ auto &op = tid_to_read_map.emplace(
+ tid,
+ ReadOp(
+ priority,
+ tid,
+ do_redundant_reads,
+ for_recovery,
+ _op,
+ std::move(want_to_read),
+ std::move(to_read))).first->second;
+ dout(10) << __func__ << ": starting " << op << dendl;
+ if (_op) {
+ op.trace = _op->pg_trace;
+ op.trace.event("start ec read");
+ }
+ do_read_op(op);
+}
+
+void ECBackend::do_read_op(ReadOp &op)
+{
+ int priority = op.priority;
+ ceph_tid_t tid = op.tid;
+
+ dout(10) << __func__ << ": starting read " << op << dendl;
+
+ map<pg_shard_t, ECSubRead> messages;
+ for (map<hobject_t, read_request_t>::iterator i = op.to_read.begin();
+ i != op.to_read.end();
+ ++i) {
+ bool need_attrs = i->second.want_attrs;
+
+ for (auto j = i->second.need.begin();
+ j != i->second.need.end();
+ ++j) {
+ if (need_attrs) {
+ messages[j->first].attrs_to_read.insert(i->first);
+ need_attrs = false;
+ }
+ messages[j->first].subchunks[i->first] = j->second;
+ op.obj_to_source[i->first].insert(j->first);
+ op.source_to_obj[j->first].insert(i->first);
+ }
+ for (list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator j =
+ i->second.to_read.begin();
+ j != i->second.to_read.end();
+ ++j) {
+ pair<uint64_t, uint64_t> chunk_off_len =
+ sinfo.aligned_offset_len_to_chunk(make_pair(j->get<0>(), j->get<1>()));
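+      // Illustrative numbers: with k=2 and chunk_size=4096, a logical
+      // extent (off=8192, len=8192) maps to (4096, 4096) in chunk space;
+      // each shard supplies len/k bytes starting at off/k.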
+ for (auto k = i->second.need.begin();
+ k != i->second.need.end();
+ ++k) {
+ messages[k->first].to_read[i->first].push_back(
+ boost::make_tuple(
+ chunk_off_len.first,
+ chunk_off_len.second,
+ j->get<2>()));
+ }
+ ceph_assert(!need_attrs);
+ }
+ }
+
+ std::vector<std::pair<int, Message*>> m;
+ m.reserve(messages.size());
+ for (map<pg_shard_t, ECSubRead>::iterator i = messages.begin();
+ i != messages.end();
+ ++i) {
+ op.in_progress.insert(i->first);
+ shard_to_read_map[i->first].insert(op.tid);
+ i->second.tid = tid;
+ MOSDECSubOpRead *msg = new MOSDECSubOpRead;
+ msg->set_priority(priority);
+ msg->pgid = spg_t(
+ get_parent()->whoami_spg_t().pgid,
+ i->first.shard);
+ msg->map_epoch = get_osdmap_epoch();
+ msg->min_epoch = get_parent()->get_interval_start_epoch();
+ msg->op = i->second;
+ msg->op.from = get_parent()->whoami_shard();
+ msg->op.tid = tid;
+ if (op.trace) {
+ // initialize a child span for this shard
+ msg->trace.init("ec sub read", nullptr, &op.trace);
+ msg->trace.keyval("shard", i->first.shard.id);
+ }
+ m.push_back(std::make_pair(i->first.osd, msg));
+ }
+ if (!m.empty()) {
+ get_parent()->send_message_osd_cluster(m, get_osdmap_epoch());
+ }
+
+ dout(10) << __func__ << ": started " << op << dendl;
+}
+
+ECUtil::HashInfoRef ECBackend::get_hash_info(
+ const hobject_t &hoid, bool create, const map<string,bufferptr,less<>> *attrs)
+{
+ dout(10) << __func__ << ": Getting attr on " << hoid << dendl;
+ ECUtil::HashInfoRef ref = unstable_hashinfo_registry.lookup(hoid);
+ if (!ref) {
+ dout(10) << __func__ << ": not in cache " << hoid << dendl;
+ struct stat st;
+ int r = store->stat(
+ ch,
+ ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+ &st);
+ ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
+ if (r >= 0) {
+ dout(10) << __func__ << ": found on disk, size " << st.st_size << dendl;
+ bufferlist bl;
+ if (attrs) {
+ map<string, bufferptr>::const_iterator k = attrs->find(ECUtil::get_hinfo_key());
+ if (k == attrs->end()) {
+ dout(5) << __func__ << " " << hoid << " missing hinfo attr" << dendl;
+ } else {
+ bl.push_back(k->second);
+ }
+ } else {
+ r = store->getattr(
+ ch,
+ ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+ ECUtil::get_hinfo_key(),
+ bl);
+ if (r < 0) {
+ dout(5) << __func__ << ": getattr failed: " << cpp_strerror(r) << dendl;
+ bl.clear(); // just in case
+ }
+ }
+ if (bl.length() > 0) {
+ auto bp = bl.cbegin();
+ try {
+ decode(hinfo, bp);
+ } catch(...) {
+ dout(0) << __func__ << ": Can't decode hinfo for " << hoid << dendl;
+ return ECUtil::HashInfoRef();
+ }
+ if (hinfo.get_total_chunk_size() != (uint64_t)st.st_size) {
+ dout(0) << __func__ << ": Mismatch of total_chunk_size "
+ << hinfo.get_total_chunk_size() << dendl;
+ return ECUtil::HashInfoRef();
+ }
+ // a non-empty object must have a hinfo; only an empty object may
+ // fall through here and have a fresh hinfo created for it below
+ } else if (st.st_size > 0) {
+ return ECUtil::HashInfoRef();
+ }
+ } else if (r != -ENOENT || !create) {
+ derr << __func__ << ": stat " << hoid << " failed: " << cpp_strerror(r)
+ << dendl;
+ return ECUtil::HashInfoRef();
+ }
+ ref = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
+ }
+ return ref;
+}
+
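+// Entry point for an EC write: compute the write plan (which extents must be
+// read back to perform the read-modify-write) and queue the op on the write
+// pipeline.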
+void ECBackend::start_rmw(Op *op, PGTransactionUPtr &&t)
+{
+ ceph_assert(op);
+
+ op->plan = ECTransaction::get_write_plan(
+ sinfo,
+ std::move(t),
+ [&](const hobject_t &i) {
+ ECUtil::HashInfoRef ref = get_hash_info(i, true);
+ if (!ref) {
+ derr << __func__ << ": get_hash_info(" << i << ")"
+ << " returned a null pointer and there is no"
+ << " way to recover from such an error in this"
+ << " context" << dendl;
+ ceph_abort();
+ }
+ return ref;
+ },
+ get_parent()->get_dpp());
+
+ dout(10) << __func__ << ": " << *op << dendl;
+
+ waiting_state.push_back(*op);
+ check_ops();
+}
+
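+// Pipeline stage 1: move the oldest op from waiting_state to waiting_reads,
+// reserving cache extents and issuing whatever remote reads its RMW plan
+// still requires.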
+bool ECBackend::try_state_to_reads()
+{
+ if (waiting_state.empty())
+ return false;
+
+ Op *op = &(waiting_state.front());
+ if (op->requires_rmw() && pipeline_state.cache_invalid()) {
+ ceph_assert(get_parent()->get_pool().allows_ecoverwrites());
+ dout(20) << __func__ << ": blocking " << *op
+ << " because it requires an rmw and the cache is invalid "
+ << pipeline_state
+ << dendl;
+ return false;
+ }
+
+ if (!pipeline_state.caching_enabled()) {
+ op->using_cache = false;
+ } else if (op->invalidates_cache()) {
+ dout(20) << __func__ << ": invalidating cache after this op"
+ << dendl;
+ pipeline_state.invalidate();
+ }
+
+ waiting_state.pop_front();
+ waiting_reads.push_back(*op);
+
+ if (op->using_cache) {
+ cache.open_write_pin(op->pin);
+
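+ // split the planned reads: extents the cache can already serve become
+ // pending_read (collected at commit time), the rest must be fetched from
+ // the shards via remote_read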
+ extent_set empty;
+ for (auto &&hpair: op->plan.will_write) {
+ auto to_read_plan_iter = op->plan.to_read.find(hpair.first);
+ const extent_set &to_read_plan =
+ to_read_plan_iter == op->plan.to_read.end() ?
+ empty :
+ to_read_plan_iter->second;
+
+ extent_set remote_read = cache.reserve_extents_for_rmw(
+ hpair.first,
+ op->pin,
+ hpair.second,
+ to_read_plan);
+
+ extent_set pending_read = to_read_plan;
+ pending_read.subtract(remote_read);
+
+ if (!remote_read.empty()) {
+ op->remote_read[hpair.first] = std::move(remote_read);
+ }
+ if (!pending_read.empty()) {
+ op->pending_read[hpair.first] = std::move(pending_read);
+ }
+ }
+ } else {
+ op->remote_read = op->plan.to_read;
+ }
+
+ dout(10) << __func__ << ": " << *op << dendl;
+
+ if (!op->remote_read.empty()) {
+ ceph_assert(get_parent()->get_pool().allows_ecoverwrites());
+ objects_read_async_no_cache(
+ op->remote_read,
+ [this, op](map<hobject_t,pair<int, extent_map> > &&results) {
+ for (auto &&i: results) {
+ op->remote_read_result.emplace(i.first, i.second.second);
+ }
+ check_ops();
+ });
+ }
+
+ return true;
+}
+
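+// Pipeline stage 2: once an op's reads have completed, encode the write into
+// per-shard transactions and submit them, directly for our own shard and via
+// MOSDECSubOpWrite for the others.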
+bool ECBackend::try_reads_to_commit()
+{
+ if (waiting_reads.empty())
+ return false;
+ Op *op = &(waiting_reads.front());
+ if (op->read_in_progress())
+ return false;
+ waiting_reads.pop_front();
+ waiting_commit.push_back(*op);
+
+ dout(10) << __func__ << ": starting commit on " << *op << dendl;
+ dout(20) << __func__ << ": " << cache << dendl;
+
+ get_parent()->apply_stats(
+ op->hoid,
+ op->delta_stats);
+
+ if (op->using_cache) {
+ for (auto &&hpair: op->pending_read) {
+ op->remote_read_result[hpair.first].insert(
+ cache.get_remaining_extents_for_rmw(
+ hpair.first,
+ op->pin,
+ hpair.second));
+ }
+ op->pending_read.clear();
+ } else {
+ ceph_assert(op->pending_read.empty());
+ }
+
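+ // start with an empty transaction for every acting/recovery/backfill shard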
+ map<shard_id_t, ObjectStore::Transaction> trans;
+ for (set<pg_shard_t>::const_iterator i =
+ get_parent()->get_acting_recovery_backfill_shards().begin();
+ i != get_parent()->get_acting_recovery_backfill_shards().end();
+ ++i) {
+ trans[i->shard];
+ }
+
+ op->trace.event("start ec write");
+
+ map<hobject_t,extent_map> written;
+ if (op->plan.t) {
+ ECTransaction::generate_transactions(
+ op->plan,
+ ec_impl,
+ get_parent()->get_info().pgid.pgid,
+ sinfo,
+ op->remote_read_result,
+ op->log_entries,
+ &written,
+ &trans,
+ &(op->temp_added),
+ &(op->temp_cleared),
+ get_parent()->get_dpp(),
+ get_osdmap()->require_osd_release);
+ }
+
+ dout(20) << __func__ << ": " << cache << dendl;
+ dout(20) << __func__ << ": written: " << written << dendl;
+ dout(20) << __func__ << ": op: " << *op << dendl;
+
+ if (!get_parent()->get_pool().allows_ecoverwrites()) {
+ for (auto &&i: op->log_entries) {
+ if (i.requires_kraken()) {
+ derr << __func__ << ": log entry " << i << " requires kraken"
+ << " but overwrites are not enabled!" << dendl;
+ ceph_abort();
+ }
+ }
+ }
+
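+ // the extents actually written must match the write plan exactly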
+ map<hobject_t,extent_set> written_set;
+ for (auto &&i: written) {
+ written_set[i.first] = i.second.get_interval_set();
+ }
+ dout(20) << __func__ << ": written_set: " << written_set << dendl;
+ ceph_assert(written_set == op->plan.will_write);
+
+ if (op->using_cache) {
+ for (auto &&hpair: written) {
+ dout(20) << __func__ << ": " << hpair << dendl;
+ cache.present_rmw_update(hpair.first, op->pin, hpair.second);
+ }
+ }
+ op->remote_read.clear();
+ op->remote_read_result.clear();
+
+ ObjectStore::Transaction empty;
+ bool should_write_local = false;
+ ECSubWrite local_write_op;
+ std::vector<std::pair<int, Message*>> messages;
+ messages.reserve(get_parent()->get_acting_recovery_backfill_shards().size());
+ set<pg_shard_t> backfill_shards = get_parent()->get_backfill_shards();
+ for (set<pg_shard_t>::const_iterator i =
+ get_parent()->get_acting_recovery_backfill_shards().begin();
+ i != get_parent()->get_acting_recovery_backfill_shards().end();
+ ++i) {
+ op->pending_apply.insert(*i);
+ op->pending_commit.insert(*i);
+ map<shard_id_t, ObjectStore::Transaction>::iterator iter =
+ trans.find(i->shard);
+ ceph_assert(iter != trans.end());
+ bool should_send = get_parent()->should_send_op(*i, op->hoid);
+ const pg_stat_t &stats =
+ (should_send || !backfill_shards.count(*i)) ?
+ get_info().stats :
+ parent->get_shard_info().find(*i)->second.stats;
+
+ ECSubWrite sop(
+ get_parent()->whoami_shard(),
+ op->tid,
+ op->reqid,
+ op->hoid,
+ stats,
+ should_send ? iter->second : empty,
+ op->version,
+ op->trim_to,
+ op->roll_forward_to,
+ op->log_entries,
+ op->updated_hit_set_history,
+ op->temp_added,
+ op->temp_cleared,
+ !should_send);
+
+ ZTracer::Trace trace;
+ if (op->trace) {
+ // initialize a child span for this shard
+ trace.init("ec sub write", nullptr, &op->trace);
+ trace.keyval("shard", i->shard.id);
+ }
+
+ if (*i == get_parent()->whoami_shard()) {
+ should_write_local = true;
+ local_write_op.claim(sop);
+ } else {
+ MOSDECSubOpWrite *r = new MOSDECSubOpWrite(sop);
+ r->pgid = spg_t(get_parent()->primary_spg_t().pgid, i->shard);
+ r->map_epoch = get_osdmap_epoch();
+ r->min_epoch = get_parent()->get_interval_start_epoch();
+ r->trace = trace;
+ messages.push_back(std::make_pair(i->osd, r));
+ }
+ }
+
+ if (!messages.empty()) {
+ get_parent()->send_message_osd_cluster(messages, get_osdmap_epoch());
+ }
+
+ if (should_write_local) {
+ handle_sub_write(
+ get_parent()->whoami_shard(),
+ op->client_op,
+ local_write_op,
+ op->trace);
+ }
+
+ for (auto i = op->on_write.begin();
+ i != op->on_write.end();
+ op->on_write.erase(i++)) {
+ (*i)();
+ }
+
+ return true;
+}
+
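+// Pipeline stage 3: retire the oldest fully committed op, advance
+// completed_to/committed_to, and (post-kraken) queue a dummy op if the log
+// still needs to be rolled forward.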
+bool ECBackend::try_finish_rmw()
+{
+ if (waiting_commit.empty())
+ return false;
+ Op *op = &(waiting_commit.front());
+ if (op->write_in_progress())
+ return false;
+ waiting_commit.pop_front();
+
+ dout(10) << __func__ << ": " << *op << dendl;
+ dout(20) << __func__ << ": " << cache << dendl;
+
+ if (op->roll_forward_to > completed_to)
+ completed_to = op->roll_forward_to;
+ if (op->version > committed_to)
+ committed_to = op->version;
+
+ if (get_osdmap()->require_osd_release >= ceph_release_t::kraken) {
+ if (op->version > get_parent()->get_log().get_can_rollback_to() &&
+ waiting_reads.empty() &&
+ waiting_commit.empty()) {
+ // submit a dummy transaction to kick the rollforward
+ auto tid = get_parent()->get_tid();
+ Op *nop = &(tid_to_op_map[tid]);
+ nop->hoid = op->hoid;
+ nop->trim_to = op->trim_to;
+ nop->roll_forward_to = op->version;
+ nop->tid = tid;
+ nop->reqid = op->reqid;
+ waiting_reads.push_back(*nop);
+ }
+ }
+
+ if (op->using_cache) {
+ cache.release_write_pin(op->pin);
+ }
+ tid_to_op_map.erase(op->tid);
+
+ if (waiting_reads.empty() &&
+ waiting_commit.empty()) {
+ pipeline_state.clear();
+ dout(20) << __func__ << ": clearing pipeline_state "
+ << pipeline_state
+ << dendl;
+ }
+ return true;
+}
+
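+// Drive the write pipeline until no stage can make further progress.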
+void ECBackend::check_ops()
+{
+ while (try_state_to_reads() ||
+ try_reads_to_commit() ||
+ try_finish_rmw());
+}
+
+int ECBackend::objects_read_sync(
+ const hobject_t &hoid,
+ uint64_t off,
+ uint64_t len,
+ uint32_t op_flags,
+ bufferlist *bl)
+{
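+ // EC reads require gathering chunks from several shards, so only the
+ // asynchronous path is implemented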
+ return -EOPNOTSUPP;
+}
+
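+// Client-facing async read: round each requested extent out to full stripe
+// bounds, issue one reconstructing read for the object, and let the callback
+// below carve the caller's exact byte ranges out of the decoded stripes.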
+void ECBackend::objects_read_async(
+ const hobject_t &hoid,
+ const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
+ pair<bufferlist*, Context*> > > &to_read,
+ Context *on_complete,
+ bool fast_read)
+{
+ map<hobject_t,std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > >
+ reads;
+
+ uint32_t flags = 0;
+ extent_set es;
+ for (list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
+ pair<bufferlist*, Context*> > >::const_iterator i =
+ to_read.begin();
+ i != to_read.end();
+ ++i) {
+ pair<uint64_t, uint64_t> tmp =
+ sinfo.offset_len_to_stripe_bounds(
+ make_pair(i->first.get<0>(), i->first.get<1>()));
+
+ es.union_insert(tmp.first, tmp.second);
+ flags |= i->first.get<2>();
+ }
+
+ if (!es.empty()) {
+ auto &offsets = reads[hoid];
+ for (auto j = es.begin();
+ j != es.end();
+ ++j) {
+ offsets.push_back(
+ boost::make_tuple(
+ j.get_start(),
+ j.get_len(),
+ flags));
+ }
+ }
+
+ struct cb {
+ ECBackend *ec;
+ hobject_t hoid;
+ list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
+ pair<bufferlist*, Context*> > > to_read;
+ unique_ptr<Context> on_complete;
+ cb(const cb&) = delete;
+ cb(cb &&) = default;
+ cb(ECBackend *ec,
+ const hobject_t &hoid,
+ const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
+ pair<bufferlist*, Context*> > > &to_read,
+ Context *on_complete)
+ : ec(ec),
+ hoid(hoid),
+ to_read(to_read),
+ on_complete(on_complete) {}
+ void operator()(map<hobject_t,pair<int, extent_map> > &&results) {
+ auto dpp = ec->get_parent()->get_dpp();
+ ldpp_dout(dpp, 20) << "objects_read_async_cb: got: " << results
+ << dendl;
+ ldpp_dout(dpp, 20) << "objects_read_async_cb: cache: " << ec->cache
+ << dendl;
+
+ auto &got = results[hoid];
+
+ int r = 0;
+ for (auto &&read: to_read) {
+ if (got.first < 0) {
+ if (read.second.second) {
+ read.second.second->complete(got.first);
+ }
+ if (r == 0)
+ r = got.first;
+ } else {
+ ceph_assert(read.second.first);
+ uint64_t offset = read.first.get<0>();
+ uint64_t length = read.first.get<1>();
+ auto range = got.second.get_containing_range(offset, length);
+ ceph_assert(range.first != range.second);
+ ceph_assert(range.first.get_off() <= offset);
+ ldpp_dout(dpp, 30) << "offset: " << offset << dendl;
+ ldpp_dout(dpp, 30) << "range offset: " << range.first.get_off() << dendl;
+ ldpp_dout(dpp, 30) << "length: " << length << dendl;
+ ldpp_dout(dpp, 30) << "range length: " << range.first.get_len() << dendl;
+ ceph_assert(
+ (offset + length) <=
+ (range.first.get_off() + range.first.get_len()));
+ read.second.first->substr_of(
+ range.first.get_val(),
+ offset - range.first.get_off(),
+ length);
+ if (read.second.second) {
+ read.second.second->complete(length);
+ read.second.second = nullptr;
+ }
+ }
+ }
+ to_read.clear();
+ if (on_complete) {
+ on_complete.release()->complete(r);
+ }
+ }
+ ~cb() {
+ for (auto &&i: to_read) {
+ delete i.second.second;
+ }
+ to_read.clear();
+ }
+ };
+ objects_read_and_reconstruct(
+ reads,
+ fast_read,
+ make_gen_lambda_context<
+ map<hobject_t,pair<int, extent_map> > &&, cb>(
+ cb(this,
+ hoid,
+ to_read,
+ on_complete)));
+}
+
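+// Completion context for a client read: for each requested extent, take the
+// returned shard chunks, decode them back into stripe data, and trim the
+// result down to the byte range originally asked for.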
+struct CallClientContexts :
+ public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
+ hobject_t hoid;
+ ECBackend *ec;
+ ECBackend::ClientAsyncReadStatus *status;
+ list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
+ CallClientContexts(
+ hobject_t hoid,
+ ECBackend *ec,
+ ECBackend::ClientAsyncReadStatus *status,
+ const list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read)
+ : hoid(hoid), ec(ec), status(status), to_read(to_read) {}
+ void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
+ ECBackend::read_result_t &res = in.second;
+ extent_map result;
+ if (res.r != 0)
+ goto out;
+ ceph_assert(res.returned.size() == to_read.size());
+ ceph_assert(res.errors.empty());
+ for (auto &&read: to_read) {
+ pair<uint64_t, uint64_t> adjusted =
+ ec->sinfo.offset_len_to_stripe_bounds(
+ make_pair(read.get<0>(), read.get<1>()));
+ ceph_assert(res.returned.front().get<0>() == adjusted.first);
+ ceph_assert(res.returned.front().get<1>() == adjusted.second);
+ map<int, bufferlist> to_decode;
+ bufferlist bl;
+ for (map<pg_shard_t, bufferlist>::iterator j =
+ res.returned.front().get<2>().begin();
+ j != res.returned.front().get<2>().end();
+ ++j) {
+ to_decode[j->first.shard] = std::move(j->second);
+ }
+ int r = ECUtil::decode(
+ ec->sinfo,
+ ec->ec_impl,
+ to_decode,
+ &bl);
+ if (r < 0) {
+ res.r = r;
+ goto out;
+ }
+ bufferlist trimmed;
+ trimmed.substr_of(
+ bl,
+ read.get<0>() - adjusted.first,
+ std::min(read.get<1>(),
+ bl.length() - (read.get<0>() - adjusted.first)));
+ result.insert(
+ read.get<0>(), trimmed.length(), std::move(trimmed));
+ res.returned.pop_front();
+ }
+out:
+ status->complete_object(hoid, res.r, std::move(result));
+ ec->kick_reads();
+ }
+};
+
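+// For each object, pick a minimal set of available shards that can satisfy
+// (or reconstruct) the requested extents and kick off a single ReadOp
+// covering all the objects.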
+void ECBackend::objects_read_and_reconstruct(
+ const map<hobject_t,
+ std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
+ > &reads,
+ bool fast_read,
+ GenContextURef<map<hobject_t,pair<int, extent_map> > &&> &&func)
+{
+ in_progress_client_reads.emplace_back(
+ reads.size(), std::move(func));
+ if (!reads.size()) {
+ kick_reads();
+ return;
+ }
+
+ map<hobject_t, set<int>> obj_want_to_read;
+ set<int> want_to_read;
+ get_want_to_read_shards(&want_to_read);
+
+ map<hobject_t, read_request_t> for_read_op;
+ for (auto &&to_read: reads) {
+ map<pg_shard_t, vector<pair<int, int>>> shards;
+ int r = get_min_avail_to_read_shards(
+ to_read.first,
+ want_to_read,
+ false,
+ fast_read,
+ &shards);
+ ceph_assert(r == 0);
+
+ CallClientContexts *c = new CallClientContexts(
+ to_read.first,
+ this,
+ &(in_progress_client_reads.back()),
+ to_read.second);
+ for_read_op.insert(
+ make_pair(
+ to_read.first,
+ read_request_t(
+ to_read.second,
+ shards,
+ false,
+ c)));
+ obj_want_to_read.insert(make_pair(to_read.first, want_to_read));
+ }
+
+ start_read_op(
+ CEPH_MSG_PRIO_DEFAULT,
+ obj_want_to_read,
+ for_read_op,
+ OpRequestRef(),
+ fast_read, false);
+ return;
+}
+
+
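+// Called after a shard read fails: re-target the request at the shards we
+// have not yet read from so the data can still be reconstructed.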
+int ECBackend::send_all_remaining_reads(
+ const hobject_t &hoid,
+ ReadOp &rop)
+{
+ set<int> already_read;
+ const set<pg_shard_t>& ots = rop.obj_to_source[hoid];
+ for (set<pg_shard_t>::iterator i = ots.begin(); i != ots.end(); ++i)
+ already_read.insert(i->shard);
+ dout(10) << __func__ << " have/error shards=" << already_read << dendl;
+ map<pg_shard_t, vector<pair<int, int>>> shards;
+ int r = get_remaining_shards(hoid, already_read, rop.want_to_read[hoid],
+ rop.complete[hoid], &shards, rop.for_recovery);
+ if (r)
+ return r;
+
+ list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets =
+ rop.to_read.find(hoid)->second.to_read;
+ GenContext<pair<RecoveryMessages *, read_result_t& > &> *c =
+ rop.to_read.find(hoid)->second.cb;
+
+ // (Note cuixf) If we needed to read the attrs and that read failed, try to read them again.
+ bool want_attrs =
+ rop.to_read.find(hoid)->second.want_attrs &&
+ (!rop.complete[hoid].attrs || rop.complete[hoid].attrs->empty());
+ if (want_attrs) {
+ dout(10) << __func__ << " want attrs again" << dendl;
+ }
+
+ rop.to_read.erase(hoid);
+ rop.to_read.insert(make_pair(
+ hoid,
+ read_request_t(
+ offsets,
+ shards,
+ want_attrs,
+ c)));
+ return 0;
+}
+
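+// Read the object's xattrs from the local shard, hiding the internal
+// hash-info keys from the caller.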
+int ECBackend::objects_get_attrs(
+ const hobject_t &hoid,
+ map<string, bufferlist, less<>> *out)
+{
+ int r = store->getattrs(
+ ch,
+ ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+ *out);
+ if (r < 0)
+ return r;
+
+ for (map<string, bufferlist>::iterator i = out->begin();
+ i != out->end();
+ ) {
+ if (ECUtil::is_hinfo_key_string(i->first))
+ out->erase(i++);
+ else
+ ++i;
+ }
+ return r;
+}
+
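+// Undo an append by truncating the local chunk back to the chunk offset that
+// corresponds to the old (stripe-aligned) logical size.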
+void ECBackend::rollback_append(
+ const hobject_t &hoid,
+ uint64_t old_size,
+ ObjectStore::Transaction *t)
+{
+ ceph_assert(old_size % sinfo.get_stripe_width() == 0);
+ t->truncate(
+ coll,
+ ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+ sinfo.aligned_logical_offset_to_chunk_offset(
+ old_size));
+}
+
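+// Deep scrub: read the local chunk in stride-sized, chunk-aligned steps,
+// accumulating a running CRC; return -EINPROGRESS until the whole chunk has
+// been read, then check the accumulated digest against the stored hash info.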
+int ECBackend::be_deep_scrub(
+ const hobject_t &poid,
+ ScrubMap &map,
+ ScrubMapBuilder &pos,
+ ScrubMap::object &o)
+{
+ dout(10) << __func__ << " " << poid << " pos " << pos << dendl;
+ int r;
+
+ uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
+ CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
+
+ utime_t sleeptime;
+ sleeptime.set_from_double(cct->_conf->osd_debug_deep_scrub_sleep);
+ if (sleeptime != utime_t()) {
+ lgeneric_derr(cct) << __func__ << " sleeping for " << sleeptime << dendl;
+ sleeptime.sleep();
+ }
+
+ if (pos.data_pos == 0) {
+ pos.data_hash = bufferhash(-1);
+ }
+
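+ // round the scrub stride up to a whole number of chunks so every read
+ // stays chunk-aligned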
+ uint64_t stride = cct->_conf->osd_deep_scrub_stride;
+ if (stride % sinfo.get_chunk_size())
+ stride += sinfo.get_chunk_size() - (stride % sinfo.get_chunk_size());
+
+ bufferlist bl;
+ r = store->read(
+ ch,
+ ghobject_t(
+ poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+ pos.data_pos,
+ stride, bl,
+ fadvise_flags);
+ if (r < 0) {
+ dout(20) << __func__ << " " << poid << " got "
+ << r << " on read, read_error" << dendl;
+ o.read_error = true;
+ return 0;
+ }
+ if (bl.length() % sinfo.get_chunk_size()) {
+ dout(20) << __func__ << " " << poid << " got "
+ << r << " on read, not chunk size " << sinfo.get_chunk_size() << " aligned"
+ << dendl;
+ o.read_error = true;
+ return 0;
+ }
+ if (r > 0) {
+ pos.data_hash << bl;
+ }
+ pos.data_pos += r;
+ if (r == (int)stride) {
+ return -EINPROGRESS;
+ }
+
+ ECUtil::HashInfoRef hinfo = get_hash_info(poid, false, &o.attrs);
+ if (!hinfo) {
+ dout(0) << "_scan_list " << poid << " could not retrieve hash info" << dendl;
+ o.read_error = true;
+ o.digest_present = false;
+ return 0;
+ } else {
+ if (!get_parent()->get_pool().allows_ecoverwrites()) {
+ if (!hinfo->has_chunk_hash()) {
+ dout(0) << "_scan_list " << poid << " got invalid hash info" << dendl;
+ o.ec_size_mismatch = true;
+ return 0;
+ }
+ if (hinfo->get_total_chunk_size() != (unsigned)pos.data_pos) {
+ dout(0) << "_scan_list " << poid << " got incorrect size on read 0x"
+ << std::hex << pos.data_pos
+ << " expected 0x" << hinfo->get_total_chunk_size() << std::dec
+ << dendl;
+ o.ec_size_mismatch = true;
+ return 0;
+ }
+
+ if (hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) !=
+ pos.data_hash.digest()) {
+ dout(0) << "_scan_list " << poid << " got incorrect hash on read 0x"
+ << std::hex << pos.data_hash.digest() << " != expected 0x"
+ << hinfo->get_chunk_hash(get_parent()->whoami_shard().shard)
+ << std::dec << dendl;
+ o.ec_hash_mismatch = true;
+ return 0;
+ }
+
+ /* We checked above that we match our own stored hash. We cannot
+ * send a hash of the actual object, so instead we simply send
+ * our locally stored hash of shard 0 on the assumption that if
+ * we match our chunk hash and our recollection of the hash for
+ * chunk 0 matches that of our peers, there is likely no corruption.
+ */
+ o.digest = hinfo->get_chunk_hash(0);
+ o.digest_present = true;
+ } else {
+ /* Hack! We must be using partial overwrites, and partial overwrites
+ * don't support deep-scrub yet
+ */
+ o.digest = 0;
+ o.digest_present = true;
+ }
+ }
+
+ o.omap_digest = -1;
+ o.omap_digest_present = true;
+ return 0;
+}