diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/crimson/osd/replicated_recovery_backend.cc | |
parent | Initial commit. (diff) | |
download | ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/crimson/osd/replicated_recovery_backend.cc | 1076 |
1 files changed, 1076 insertions, 0 deletions
diff --git a/src/crimson/osd/replicated_recovery_backend.cc b/src/crimson/osd/replicated_recovery_backend.cc new file mode 100644 index 000000000..0812003bb --- /dev/null +++ b/src/crimson/osd/replicated_recovery_backend.cc @@ -0,0 +1,1076 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <fmt/format.h> +#include <fmt/ostream.h> +#include <seastar/core/future.hh> +#include <seastar/core/do_with.hh> + +#include "crimson/osd/pg.h" +#include "crimson/osd/pg_backend.h" +#include "replicated_recovery_backend.h" + +#include "msg/Message.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_osd); + } +} + +seastar::future<> ReplicatedRecoveryBackend::recover_object( + const hobject_t& soid, + eversion_t need) +{ + logger().debug("{}: {}, {}", __func__, soid, need); + // always add_recovering(soid) before recover_object(soid) + assert(is_recovering(soid)); + // start tracking the recovery of soid + return maybe_pull_missing_obj(soid, need).then([this, soid, need] { + logger().debug("recover_object: loading obc: {}", soid); + return pg.with_head_obc<RWState::RWREAD>(soid, + [this, soid, need](auto obc) { + logger().debug("recover_object: loaded obc: {}", obc->obs.oi.soid); + auto& recovery_waiter = recovering.at(soid); + recovery_waiter.obc = obc; + recovery_waiter.obc->wait_recovery_read(); + return maybe_push_shards(soid, need); + }).handle_error( + crimson::osd::PG::load_obc_ertr::all_same_way([soid](auto& code) { + // TODO: may need eio handling? + logger().error("recover_object saw error code {}, ignoring object {}", + code, soid); + })); + }); +} + +seastar::future<> +ReplicatedRecoveryBackend::maybe_push_shards( + const hobject_t& soid, + eversion_t need) +{ + return seastar::parallel_for_each(get_shards_to_push(soid), + [this, need, soid](auto shard) { + return prep_push(soid, need, shard).then([this, soid, shard](auto push) { + auto msg = make_message<MOSDPGPush>(); + msg->from = pg.get_pg_whoami(); + msg->pgid = pg.get_pgid(); + msg->map_epoch = pg.get_osdmap_epoch(); + msg->min_epoch = pg.get_last_peering_reset(); + msg->pushes.push_back(std::move(push)); + msg->set_priority(pg.get_recovery_op_priority()); + return shard_services.send_to_osd(shard.osd, + std::move(msg), + pg.get_osdmap_epoch()).then( + [this, soid, shard] { + return recovering.at(soid).wait_for_pushes(shard); + }); + }); + }).then([this, soid] { + auto &recovery = recovering.at(soid); + auto push_info = recovery.pushing.begin(); + object_stat_sum_t stat = {}; + if (push_info != recovery.pushing.end()) { + stat = push_info->second.stat; + } else { + // no push happened, take pull_info's stat + assert(recovery.pi); + stat = recovery.pi->stat; + } + pg.get_recovery_handler()->on_global_recover(soid, stat, false); + return seastar::make_ready_future<>(); + }).handle_exception([this, soid](auto e) { + auto &recovery = recovering.at(soid); + if (recovery.obc) { + recovery.obc->drop_recovery_read(); + } + recovering.erase(soid); + return seastar::make_exception_future<>(e); + }); +} + +seastar::future<> +ReplicatedRecoveryBackend::maybe_pull_missing_obj( + const hobject_t& soid, + eversion_t need) +{ + pg_missing_tracker_t local_missing = pg.get_local_missing(); + if (!local_missing.is_missing(soid)) { + return seastar::make_ready_future<>(); + } + PullOp po; + auto& recovery_waiter = recovering.at(soid); + recovery_waiter.pi = std::make_optional<RecoveryBackend::PullInfo>(); + auto& pi = *recovery_waiter.pi; + prepare_pull(po, pi, soid, need); + auto msg = make_message<MOSDPGPull>(); + msg->from = pg.get_pg_whoami(); + msg->set_priority(pg.get_recovery_op_priority()); + msg->pgid = pg.get_pgid(); + msg->map_epoch = pg.get_osdmap_epoch(); + msg->min_epoch = pg.get_last_peering_reset(); + msg->set_pulls({std::move(po)}); + return shard_services.send_to_osd( + pi.from.osd, + std::move(msg), + pg.get_osdmap_epoch() + ).then([&recovery_waiter] { + return recovery_waiter.wait_for_pull(); + }); +} + +seastar::future<> ReplicatedRecoveryBackend::push_delete( + const hobject_t& soid, + eversion_t need) +{ + logger().debug("{}: {}, {}", __func__, soid, need); + recovering[soid]; + epoch_t min_epoch = pg.get_last_peering_reset(); + + assert(pg.get_acting_recovery_backfill().size() > 0); + return seastar::parallel_for_each(pg.get_acting_recovery_backfill(), + [this, soid, need, min_epoch](pg_shard_t shard) { + if (shard == pg.get_pg_whoami()) + return seastar::make_ready_future<>(); + auto iter = pg.get_shard_missing().find(shard); + if (iter == pg.get_shard_missing().end()) + return seastar::make_ready_future<>(); + if (iter->second.is_missing(soid)) { + logger().debug("push_delete: will remove {} from {}", soid, shard); + pg.begin_peer_recover(shard, soid); + spg_t target_pg(pg.get_info().pgid.pgid, shard.shard); + auto msg = make_message<MOSDPGRecoveryDelete>( + pg.get_pg_whoami(), target_pg, pg.get_osdmap_epoch(), min_epoch); + msg->set_priority(pg.get_recovery_op_priority()); + msg->objects.push_back(std::make_pair(soid, need)); + return shard_services.send_to_osd(shard.osd, std::move(msg), + pg.get_osdmap_epoch()).then( + [this, soid, shard] { + return recovering.at(soid).wait_for_pushes(shard); + }); + } + return seastar::make_ready_future<>(); + }); +} + +seastar::future<> ReplicatedRecoveryBackend::handle_recovery_delete( + Ref<MOSDPGRecoveryDelete> m) +{ + logger().debug("{}: {}", __func__, *m); + + auto& p = m->objects.front(); //TODO: only one delete per message for now. + return local_recover_delete(p.first, p.second, pg.get_osdmap_epoch()).then( + [this, m] { + auto reply = make_message<MOSDPGRecoveryDeleteReply>(); + reply->from = pg.get_pg_whoami(); + reply->set_priority(m->get_priority()); + reply->pgid = spg_t(pg.get_info().pgid.pgid, m->from.shard); + reply->map_epoch = m->map_epoch; + reply->min_epoch = m->min_epoch; + reply->objects = m->objects; + return shard_services.send_to_osd(m->from.osd, std::move(reply), pg.get_osdmap_epoch()); + }); +} + +seastar::future<> ReplicatedRecoveryBackend::on_local_recover_persist( + const hobject_t& soid, + const ObjectRecoveryInfo& _recovery_info, + bool is_delete, + epoch_t epoch_frozen) +{ + logger().debug("{}", __func__); + ceph::os::Transaction t; + pg.get_recovery_handler()->on_local_recover(soid, _recovery_info, is_delete, t); + return shard_services.get_store().do_transaction(coll, std::move(t)).then( + [this, epoch_frozen, last_complete = pg.get_info().last_complete] { + pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete); + return seastar::make_ready_future<>(); + }); +} + +seastar::future<> ReplicatedRecoveryBackend::local_recover_delete( + const hobject_t& soid, + eversion_t need, + epoch_t epoch_to_freeze) +{ + logger().debug("{}: {}, {}", __func__, soid, need); + return backend->load_metadata(soid).safe_then([this] + (auto lomt) { + if (lomt->os.exists) { + return seastar::do_with(ceph::os::Transaction(), + [this, lomt = std::move(lomt)](auto& txn) { + return backend->remove(lomt->os, txn).then([this, &txn]() mutable { + return shard_services.get_store().do_transaction(coll, + std::move(txn)); + }); + }); + } + return seastar::make_ready_future<>(); + }).safe_then([this, soid, epoch_to_freeze, need] { + ObjectRecoveryInfo recovery_info; + recovery_info.soid = soid; + recovery_info.version = need; + return on_local_recover_persist(soid, recovery_info, + true, epoch_to_freeze); + }, PGBackend::load_metadata_ertr::all_same_way( + [this, soid, epoch_to_freeze, need] (auto e) { + ObjectRecoveryInfo recovery_info; + recovery_info.soid = soid; + recovery_info.version = need; + return on_local_recover_persist(soid, recovery_info, + true, epoch_to_freeze); + }) + ); +} + +seastar::future<> ReplicatedRecoveryBackend::recover_delete( + const hobject_t &soid, eversion_t need) +{ + logger().debug("{}: {}, {}", __func__, soid, need); + + epoch_t cur_epoch = pg.get_osdmap_epoch(); + return seastar::do_with(object_stat_sum_t(), + [this, soid, need, cur_epoch](auto& stat_diff) { + return local_recover_delete(soid, need, cur_epoch).then( + [this, &stat_diff, cur_epoch, soid, need] { + if (!pg.has_reset_since(cur_epoch)) { + bool object_missing = false; + for (const auto& shard : pg.get_acting_recovery_backfill()) { + if (shard == pg.get_pg_whoami()) + continue; + if (pg.get_shard_missing(shard)->is_missing(soid)) { + logger().debug("recover_delete: soid {} needs to deleted from replca {}", + soid, shard); + object_missing = true; + break; + } + } + + if (!object_missing) { + stat_diff.num_objects_recovered = 1; + return seastar::make_ready_future<>(); + } else { + return push_delete(soid, need); + } + } + return seastar::make_ready_future<>(); + }).then([this, soid, &stat_diff] { + pg.get_recovery_handler()->on_global_recover(soid, stat_diff, true); + return seastar::make_ready_future<>(); + }); + }); +} + +seastar::future<PushOp> +ReplicatedRecoveryBackend::prep_push( + const hobject_t& soid, + eversion_t need, + pg_shard_t pg_shard) +{ + logger().debug("{}: {}, {}", __func__, soid, need); + + auto& recovery_waiter = recovering.at(soid); + auto& obc = recovery_waiter.obc; + interval_set<uint64_t> data_subset; + if (obc->obs.oi.size) { + data_subset.insert(0, obc->obs.oi.size); + } + const auto& missing = pg.get_shard_missing().find(pg_shard)->second; + if (HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS)) { + const auto it = missing.get_items().find(soid); + assert(it != missing.get_items().end()); + data_subset.intersection_of(it->second.clean_regions.get_dirty_regions()); + logger().debug("prep_push: {} data_subset {}", soid, data_subset); + } + + logger().debug("prep_push: {} to {}", soid, pg_shard); + auto& pi = recovery_waiter.pushing[pg_shard]; + pg.begin_peer_recover(pg_shard, soid); + const auto pmissing_iter = pg.get_shard_missing().find(pg_shard); + const auto missing_iter = pmissing_iter->second.get_items().find(soid); + assert(missing_iter != pmissing_iter->second.get_items().end()); + + pi.obc = obc; + pi.recovery_info.size = obc->obs.oi.size; + pi.recovery_info.copy_subset = data_subset; + pi.recovery_info.soid = soid; + pi.recovery_info.oi = obc->obs.oi; + pi.recovery_info.version = obc->obs.oi.version; + pi.recovery_info.object_exist = + missing_iter->second.clean_regions.object_is_exist(); + pi.recovery_progress.omap_complete = + (!missing_iter->second.clean_regions.omap_is_dirty() && + HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS)); + + return build_push_op(pi.recovery_info, pi.recovery_progress, &pi.stat).then( + [this, soid, pg_shard](auto pop) { + auto& recovery_waiter = recovering.at(soid); + auto& pi = recovery_waiter.pushing[pg_shard]; + pi.recovery_progress = pop.after_progress; + return pop; + }); +} + +void ReplicatedRecoveryBackend::prepare_pull(PullOp& po, PullInfo& pi, + const hobject_t& soid, + eversion_t need) { + logger().debug("{}: {}, {}", __func__, soid, need); + + pg_missing_tracker_t local_missing = pg.get_local_missing(); + const auto missing_iter = local_missing.get_items().find(soid); + auto m = pg.get_missing_loc_shards(); + pg_shard_t fromshard = *(m[soid].begin()); + + //TODO: skipped snap objects case for now + po.recovery_info.copy_subset.insert(0, (uint64_t) -1); + if (HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS)) + po.recovery_info.copy_subset.intersection_of( + missing_iter->second.clean_regions.get_dirty_regions()); + po.recovery_info.size = ((uint64_t) -1); + po.recovery_info.object_exist = + missing_iter->second.clean_regions.object_is_exist(); + po.recovery_info.soid = soid; + po.soid = soid; + po.recovery_progress.data_complete = false; + po.recovery_progress.omap_complete = + !missing_iter->second.clean_regions.omap_is_dirty() && + HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS); + po.recovery_progress.data_recovered_to = 0; + po.recovery_progress.first = true; + + pi.from = fromshard; + pi.soid = soid; + pi.recovery_info = po.recovery_info; + pi.recovery_progress = po.recovery_progress; +} + +seastar::future<PushOp> ReplicatedRecoveryBackend::build_push_op( + const ObjectRecoveryInfo& recovery_info, + const ObjectRecoveryProgress& progress, + object_stat_sum_t* stat) +{ + logger().debug("{} {} @{}", + __func__, recovery_info.soid, recovery_info.version); + return seastar::do_with(ObjectRecoveryProgress(progress), + uint64_t(crimson::common::local_conf() + ->osd_recovery_max_chunk), + recovery_info.version, + PushOp(), + [this, &recovery_info, &progress, stat] + (auto new_progress, auto available, auto v, auto pop) { + return read_metadata_for_push_op(recovery_info.soid, + progress, new_progress, + v, &pop).then([&](eversion_t local_ver) mutable { + // If requestor didn't know the version, use ours + if (v == eversion_t()) { + v = local_ver; + } else if (v != local_ver) { + logger().error("build_push_op: {} push {} v{} failed because local copy is {}", + pg.get_pgid(), recovery_info.soid, recovery_info.version, local_ver); + // TODO: bail out + } + return read_omap_for_push_op(recovery_info.soid, + progress, + new_progress, + &available, &pop); + }).then([this, &recovery_info, &progress, &available, &pop]() mutable { + logger().debug("build_push_op: available: {}, copy_subset: {}", + available, recovery_info.copy_subset); + return read_object_for_push_op(recovery_info.soid, + recovery_info.copy_subset, + progress.data_recovered_to, + available, &pop); + }).then([&recovery_info, &v, &progress, &new_progress, stat, &pop] + (uint64_t recovered_to) mutable { + new_progress.data_recovered_to = recovered_to; + if (new_progress.is_complete(recovery_info)) { + new_progress.data_complete = true; + if (stat) + stat->num_objects_recovered++; + } else if (progress.first && progress.omap_complete) { + // If omap is not changed, we need recovery omap + // when recovery cannot be completed once + new_progress.omap_complete = false; + } + if (stat) { + stat->num_keys_recovered += pop.omap_entries.size(); + stat->num_bytes_recovered += pop.data.length(); + } + pop.version = v; + pop.soid = recovery_info.soid; + pop.recovery_info = recovery_info; + pop.after_progress = new_progress; + pop.before_progress = progress; + logger().debug("build_push_op: pop version: {}, pop data length: {}", + pop.version, pop.data.length()); + return seastar::make_ready_future<PushOp>(std::move(pop)); + }); + }); +} + +seastar::future<eversion_t> +ReplicatedRecoveryBackend::read_metadata_for_push_op( + const hobject_t& oid, + const ObjectRecoveryProgress& progress, + ObjectRecoveryProgress& new_progress, + eversion_t ver, + PushOp* push_op) +{ + if (!progress.first) { + return seastar::make_ready_future<eversion_t>(ver); + } + return seastar::when_all_succeed( + backend->omap_get_header(coll, ghobject_t(oid)).handle_error( + crimson::os::FuturizedStore::read_errorator::all_same_way( + [] (const std::error_code& e) { + return seastar::make_ready_future<bufferlist>(); + })), + store->get_attrs(coll, ghobject_t(oid)).handle_error( + crimson::os::FuturizedStore::get_attrs_ertr::all_same_way( + [] (const std::error_code& e) { + return seastar::make_ready_future<crimson::os::FuturizedStore::attrs_t>(); + })) + ).then_unpack([&new_progress, push_op](auto bl, auto attrs) { + if (bl.length() == 0) { + logger().error("read_metadata_for_push_op: fail to read omap header"); + return eversion_t{}; + } else if (attrs.empty()) { + logger().error("read_metadata_for_push_op: fail to read attrs"); + return eversion_t{}; + } + push_op->omap_header.claim_append(std::move(bl)); + for (auto&& [key, val] : std::move(attrs)) { + push_op->attrset[key].push_back(val); + } + logger().debug("read_metadata_for_push_op: {}", push_op->attrset[OI_ATTR]); + object_info_t oi; + oi.decode(push_op->attrset[OI_ATTR]); + new_progress.first = false; + return oi.version; + }); +} + +seastar::future<uint64_t> +ReplicatedRecoveryBackend::read_object_for_push_op( + const hobject_t& oid, + const interval_set<uint64_t>& copy_subset, + uint64_t offset, + uint64_t max_len, + PushOp* push_op) +{ + if (max_len == 0 || copy_subset.empty()) { + push_op->data_included.clear(); + return seastar::make_ready_future<uint64_t>(offset); + } + // 1. get the extents in the interested range + return backend->fiemap(coll, ghobject_t{oid}, + 0, copy_subset.range_end()).then_wrapped( + [=](auto&& fiemap_included) mutable { + interval_set<uint64_t> extents; + try { + extents.intersection_of(copy_subset, fiemap_included.get0()); + } catch (std::exception &) { + // if fiemap() fails, we will read nothing, as the intersection of + // copy_subset and an empty interval_set would be empty anyway + extents.clear(); + } + // 2. we can read up to "max_len" bytes from "offset", so truncate the + // extents down to this quota. no need to return the number of consumed + // bytes, as this is the last consumer of this quota + push_op->data_included.span_of(extents, offset, max_len); + // 3. read the truncated extents + // TODO: check if the returned extents are pruned + return store->readv(coll, ghobject_t{oid}, push_op->data_included, 0); + }).safe_then([push_op, range_end=copy_subset.range_end()](auto &&bl) { + push_op->data.claim_append(std::move(bl)); + uint64_t recovered_to = 0; + if (push_op->data_included.empty()) { + // zero filled section, skip to end! + recovered_to = range_end; + } else { + // note down the progress, we will start from there next time + recovered_to = push_op->data_included.range_end(); + } + return seastar::make_ready_future<uint64_t>(recovered_to); + }, PGBackend::read_errorator::all_same_way([](auto e) { + logger().debug("build_push_op: read exception"); + return seastar::make_exception_future<uint64_t>(e); + })); +} + +seastar::future<> +ReplicatedRecoveryBackend::read_omap_for_push_op( + const hobject_t& oid, + const ObjectRecoveryProgress& progress, + ObjectRecoveryProgress& new_progress, + uint64_t* max_len, + PushOp* push_op) +{ + if (progress.omap_complete) { + return seastar::make_ready_future<>(); + } + return shard_services.get_store().get_omap_iterator(coll, ghobject_t{oid}) + .then([&progress, &new_progress, max_len, push_op](auto omap_iter) { + return omap_iter->lower_bound(progress.omap_recovered_to).then( + [omap_iter, &new_progress, max_len, push_op] { + return seastar::do_until([omap_iter, &new_progress, max_len, push_op] { + if (!omap_iter->valid()) { + new_progress.omap_complete = true; + return true; + } + if (push_op->omap_entries.empty()) { + return false; + } + if (const uint64_t entries_per_chunk = + crimson::common::local_conf()->osd_recovery_max_omap_entries_per_chunk; + entries_per_chunk > 0 && + push_op->omap_entries.size() >= entries_per_chunk) { + new_progress.omap_recovered_to = omap_iter->key(); + return true; + } + if (omap_iter->key().size() + omap_iter->value().length() > *max_len) { + new_progress.omap_recovered_to = omap_iter->key(); + return true; + } + return false; + }, + [omap_iter, max_len, push_op] { + push_op->omap_entries.emplace(omap_iter->key(), omap_iter->value()); + if (const uint64_t entry_size = + omap_iter->key().size() + omap_iter->value().length(); + entry_size > *max_len) { + *max_len -= entry_size; + } else { + *max_len = 0; + } + return omap_iter->next(); + }); + }); + }); +} + +std::vector<pg_shard_t> +ReplicatedRecoveryBackend::get_shards_to_push(const hobject_t& soid) const +{ + std::vector<pg_shard_t> shards; + assert(pg.get_acting_recovery_backfill().size() > 0); + for (const auto& peer : pg.get_acting_recovery_backfill()) { + if (peer == pg.get_pg_whoami()) + continue; + auto shard_missing = + pg.get_shard_missing().find(peer); + assert(shard_missing != pg.get_shard_missing().end()); + if (shard_missing->second.is_missing(soid)) { + shards.push_back(shard_missing->first); + } + } + return shards; +} + +seastar::future<> ReplicatedRecoveryBackend::handle_pull(Ref<MOSDPGPull> m) +{ + logger().debug("{}: {}", __func__, *m); + return seastar::parallel_for_each(m->take_pulls(), + [this, from=m->from](auto& pull_op) { + const hobject_t& soid = pull_op.soid; + logger().debug("handle_pull: {}", soid); + return backend->stat(coll, ghobject_t(soid)).then( + [this, &pull_op](auto st) { + ObjectRecoveryInfo &recovery_info = pull_op.recovery_info; + ObjectRecoveryProgress &progress = pull_op.recovery_progress; + if (progress.first && recovery_info.size == ((uint64_t) -1)) { + // Adjust size and copy_subset + recovery_info.size = st.st_size; + if (st.st_size) { + interval_set<uint64_t> object_range; + object_range.insert(0, st.st_size); + recovery_info.copy_subset.intersection_of(object_range); + } else { + recovery_info.copy_subset.clear(); + } + assert(recovery_info.clone_subset.empty()); + } + return build_push_op(recovery_info, progress, 0); + }).then([this, from](auto pop) { + auto msg = make_message<MOSDPGPush>(); + msg->from = pg.get_pg_whoami(); + msg->pgid = pg.get_pgid(); + msg->map_epoch = pg.get_osdmap_epoch(); + msg->min_epoch = pg.get_last_peering_reset(); + msg->set_priority(pg.get_recovery_op_priority()); + msg->pushes.push_back(std::move(pop)); + return shard_services.send_to_osd(from.osd, std::move(msg), + pg.get_osdmap_epoch()); + }); + }); +} + +seastar::future<bool> ReplicatedRecoveryBackend::_handle_pull_response( + pg_shard_t from, + const PushOp& pop, + PullOp* response, + ceph::os::Transaction* t) +{ + logger().debug("handle_pull_response {} {} data.size() is {} data_included: {}", + pop.recovery_info, pop.after_progress, pop.data.length(), pop.data_included); + + const hobject_t &hoid = pop.soid; + auto& recovery_waiter = recovering.at(hoid); + auto& pi = *recovery_waiter.pi; + if (pi.recovery_info.size == (uint64_t(-1))) { + pi.recovery_info.size = pop.recovery_info.size; + pi.recovery_info.copy_subset.intersection_of( + pop.recovery_info.copy_subset); + } + + // If primary doesn't have object info and didn't know version + if (pi.recovery_info.version == eversion_t()) + pi.recovery_info.version = pop.version; + + auto prepare_waiter = seastar::make_ready_future<>(); + if (pi.recovery_progress.first) { + prepare_waiter = pg.with_head_obc<RWState::RWNONE>( + pi.recovery_info.soid, [&pi, &recovery_waiter, &pop](auto obc) { + pi.obc = obc; + recovery_waiter.obc = obc; + obc->obs.oi.decode(pop.attrset.at(OI_ATTR)); + pi.recovery_info.oi = obc->obs.oi; + return crimson::osd::PG::load_obc_ertr::now(); + }).handle_error(crimson::ct_error::assert_all{}); + }; + return prepare_waiter.then([this, &pi, &pop, t, response]() mutable { + const bool first = pi.recovery_progress.first; + pi.recovery_progress = pop.after_progress; + logger().debug("new recovery_info {}, new progress {}", + pi.recovery_info, pi.recovery_progress); + interval_set<uint64_t> data_zeros; + { + uint64_t offset = pop.before_progress.data_recovered_to; + uint64_t length = (pop.after_progress.data_recovered_to - + pop.before_progress.data_recovered_to); + if (length) { + data_zeros.insert(offset, length); + } + } + auto [usable_intervals, data] = + trim_pushed_data(pi.recovery_info.copy_subset, + pop.data_included, pop.data); + bool complete = pi.is_complete(); + bool clear_omap = !pop.before_progress.omap_complete; + return submit_push_data(pi.recovery_info, first, complete, clear_omap, + std::move(data_zeros), usable_intervals, data, pop.omap_header, + pop.attrset, pop.omap_entries, t).then( + [this, response, &pi, &pop, complete, t, bytes_recovered=data.length()] { + pi.stat.num_keys_recovered += pop.omap_entries.size(); + pi.stat.num_bytes_recovered += bytes_recovered; + + if (complete) { + pi.stat.num_objects_recovered++; + pg.get_recovery_handler()->on_local_recover( + pop.soid, recovering.at(pop.soid).pi->recovery_info, + false, *t); + return true; + } else { + response->soid = pop.soid; + response->recovery_info = pi.recovery_info; + response->recovery_progress = pi.recovery_progress; + return false; + } + }); + }); +} + +seastar::future<> ReplicatedRecoveryBackend::handle_pull_response( + Ref<MOSDPGPush> m) +{ + const PushOp& pop = m->pushes[0]; //TODO: only one push per message for now. + if (pop.version == eversion_t()) { + // replica doesn't have it! + pg.get_recovery_handler()->on_failed_recover({ m->from }, pop.soid, + get_recovering(pop.soid).pi->recovery_info.version); + return seastar::make_exception_future<>( + std::runtime_error(fmt::format( + "Error on pushing side {} when pulling obj {}", + m->from, pop.soid))); + } + + logger().debug("{}: {}", __func__, *m); + return seastar::do_with(PullOp(), [this, m](auto& response) { + return seastar::do_with(ceph::os::Transaction(), m.get(), + [this, &response](auto& t, auto& m) { + pg_shard_t from = m->from; + PushOp& pop = m->pushes[0]; // only one push per message for now + return _handle_pull_response(from, pop, &response, &t).then( + [this, &t](bool complete) { + epoch_t epoch_frozen = pg.get_osdmap_epoch(); + return shard_services.get_store().do_transaction(coll, std::move(t)) + .then([this, epoch_frozen, complete, + last_complete = pg.get_info().last_complete] { + pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete); + return seastar::make_ready_future<bool>(complete); + }); + }); + }).then([this, m, &response](bool complete) { + if (complete) { + auto& pop = m->pushes[0]; + recovering.at(pop.soid).set_pulled(); + return seastar::make_ready_future<>(); + } else { + auto reply = make_message<MOSDPGPull>(); + reply->from = pg.get_pg_whoami(); + reply->set_priority(m->get_priority()); + reply->pgid = pg.get_info().pgid; + reply->map_epoch = m->map_epoch; + reply->min_epoch = m->min_epoch; + reply->set_pulls({std::move(response)}); + return shard_services.send_to_osd(m->from.osd, std::move(reply), pg.get_osdmap_epoch()); + } + }); + }); +} + +seastar::future<> ReplicatedRecoveryBackend::_handle_push( + pg_shard_t from, + const PushOp &pop, + PushReplyOp *response, + ceph::os::Transaction *t) +{ + logger().debug("{}", __func__); + + bool first = pop.before_progress.first; + interval_set<uint64_t> data_zeros; + { + uint64_t offset = pop.before_progress.data_recovered_to; + uint64_t length = (pop.after_progress.data_recovered_to - + pop.before_progress.data_recovered_to); + if (length) { + data_zeros.insert(offset, length); + } + } + bool complete = (pop.after_progress.data_complete && + pop.after_progress.omap_complete); + bool clear_omap = !pop.before_progress.omap_complete; + response->soid = pop.recovery_info.soid; + + return submit_push_data(pop.recovery_info, first, complete, clear_omap, + std::move(data_zeros), pop.data_included, pop.data, pop.omap_header, + pop.attrset, pop.omap_entries, t).then([this, complete, &pop, t] { + if (complete) { + pg.get_recovery_handler()->on_local_recover( + pop.recovery_info.soid, pop.recovery_info, + false, *t); + } + }); +} + +seastar::future<> ReplicatedRecoveryBackend::handle_push( + Ref<MOSDPGPush> m) +{ + if (pg.is_primary()) { + return handle_pull_response(m); + } + + logger().debug("{}: {}", __func__, *m); + return seastar::do_with(PushReplyOp(), [this, m](auto& response) { + const PushOp& pop = m->pushes[0]; //TODO: only one push per message for now + return seastar::do_with(ceph::os::Transaction(), + [this, m, &pop, &response](auto& t) { + return _handle_push(m->from, pop, &response, &t).then( + [this, &t] { + epoch_t epoch_frozen = pg.get_osdmap_epoch(); + return shard_services.get_store().do_transaction(coll, std::move(t)).then( + [this, epoch_frozen, last_complete = pg.get_info().last_complete] { + //TODO: this should be grouped with pg.on_local_recover somehow. + pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete); + }); + }); + }).then([this, m, &response]() mutable { + auto reply = make_message<MOSDPGPushReply>(); + reply->from = pg.get_pg_whoami(); + reply->set_priority(m->get_priority()); + reply->pgid = pg.get_info().pgid; + reply->map_epoch = m->map_epoch; + reply->min_epoch = m->min_epoch; + std::vector<PushReplyOp> replies = { std::move(response) }; + reply->replies.swap(replies); + return shard_services.send_to_osd(m->from.osd, + std::move(reply), pg.get_osdmap_epoch()); + }); + }); +} + +seastar::future<std::optional<PushOp>> +ReplicatedRecoveryBackend::_handle_push_reply( + pg_shard_t peer, + const PushReplyOp &op) +{ + const hobject_t& soid = op.soid; + logger().debug("{}, soid {}, from {}", __func__, soid, peer); + auto recovering_iter = recovering.find(soid); + if (recovering_iter == recovering.end() + || !recovering_iter->second.pushing.count(peer)) { + logger().debug("huh, i wasn't pushing {} to osd.{}", soid, peer); + return seastar::make_ready_future<std::optional<PushOp>>(); + } else { + auto& pi = recovering_iter->second.pushing[peer]; + bool error = pi.recovery_progress.error; + if (!pi.recovery_progress.data_complete && !error) { + return build_push_op(pi.recovery_info, pi.recovery_progress, + &pi.stat).then([&pi] (auto pop) { + pi.recovery_progress = pop.after_progress; + return seastar::make_ready_future<std::optional<PushOp>>(std::move(pop)); + }).handle_exception([recovering_iter, &pi, peer] (auto e) { + pi.recovery_progress.error = true; + recovering_iter->second.set_push_failed(peer, e); + return seastar::make_ready_future<std::optional<PushOp>>(); + }); + } + if (!error) { + pg.get_recovery_handler()->on_peer_recover(peer, soid, pi.recovery_info); + } + recovering_iter->second.set_pushed(peer); + return seastar::make_ready_future<std::optional<PushOp>>(); + } +} + +seastar::future<> ReplicatedRecoveryBackend::handle_push_reply( + Ref<MOSDPGPushReply> m) +{ + logger().debug("{}: {}", __func__, *m); + auto from = m->from; + auto& push_reply = m->replies[0]; //TODO: only one reply per message + + return _handle_push_reply(from, push_reply).then( + [this, from](std::optional<PushOp> push_op) { + if (push_op) { + auto msg = make_message<MOSDPGPush>(); + msg->from = pg.get_pg_whoami(); + msg->pgid = pg.get_pgid(); + msg->map_epoch = pg.get_osdmap_epoch(); + msg->min_epoch = pg.get_last_peering_reset(); + msg->set_priority(pg.get_recovery_op_priority()); + msg->pushes.push_back(std::move(*push_op)); + return shard_services.send_to_osd(from.osd, + std::move(msg), + pg.get_osdmap_epoch()); + } else { + return seastar::make_ready_future<>(); + } + }); +} + +std::pair<interval_set<uint64_t>, + bufferlist> +ReplicatedRecoveryBackend::trim_pushed_data( + const interval_set<uint64_t> ©_subset, + const interval_set<uint64_t> &intervals_received, + ceph::bufferlist data_received) +{ + logger().debug("{}", __func__); + // what i have is only a subset of what i want + if (intervals_received.subset_of(copy_subset)) { + return {intervals_received, data_received}; + } + // only collect the extents included by copy_subset and intervals_received + interval_set<uint64_t> intervals_usable; + bufferlist data_usable; + intervals_usable.intersection_of(copy_subset, intervals_received); + uint64_t have_off = 0; + for (auto [have_start, have_len] : intervals_received) { + interval_set<uint64_t> want; + want.insert(have_start, have_len); + want.intersection_of(copy_subset); + for (auto [want_start, want_len] : want) { + bufferlist sub; + uint64_t data_off = have_off + (want_start - have_start); + sub.substr_of(data_received, data_off, want_len); + data_usable.claim_append(sub); + } + have_off += have_len; + } + return {intervals_usable, data_usable}; +} + +seastar::future<> ReplicatedRecoveryBackend::submit_push_data( + const ObjectRecoveryInfo &recovery_info, + bool first, + bool complete, + bool clear_omap, + interval_set<uint64_t> data_zeros, + const interval_set<uint64_t> &intervals_included, + bufferlist data_included, + bufferlist omap_header, + const map<string, bufferlist> &attrs, + const map<string, bufferlist> &omap_entries, + ObjectStore::Transaction *t) +{ + logger().debug("{}", __func__); + hobject_t target_oid; + if (first && complete) { + target_oid = recovery_info.soid; + } else { + target_oid = get_temp_recovery_object(recovery_info.soid, + recovery_info.version); + if (first) { + logger().debug("{}: Adding oid {} in the temp collection", + __func__, target_oid); + add_temp_obj(target_oid); + } + } + + return [this, &recovery_info, first, complete, t, + &omap_header, &attrs, target_oid, clear_omap] { + if (first) { + if (!complete) { + t->remove(coll->get_cid(), ghobject_t(target_oid)); + t->touch(coll->get_cid(), ghobject_t(target_oid)); + bufferlist bv = attrs.at(OI_ATTR); + object_info_t oi(bv); + t->set_alloc_hint(coll->get_cid(), ghobject_t(target_oid), + oi.expected_object_size, + oi.expected_write_size, + oi.alloc_hint_flags); + } else { + if (!recovery_info.object_exist) { + t->remove(coll->get_cid(), ghobject_t(target_oid)); + t->touch(coll->get_cid(), ghobject_t(target_oid)); + bufferlist bv = attrs.at(OI_ATTR); + object_info_t oi(bv); + t->set_alloc_hint(coll->get_cid(), ghobject_t(target_oid), + oi.expected_object_size, + oi.expected_write_size, + oi.alloc_hint_flags); + } + //remove xattr and update later if overwrite on original object + t->rmattrs(coll->get_cid(), ghobject_t(target_oid)); + //if need update omap, clear the previous content first + if (clear_omap) + t->omap_clear(coll->get_cid(), ghobject_t(target_oid)); + } + + t->truncate(coll->get_cid(), ghobject_t(target_oid), recovery_info.size); + if (omap_header.length()) + t->omap_setheader(coll->get_cid(), ghobject_t(target_oid), omap_header); + + return store->stat(coll, ghobject_t(recovery_info.soid)).then( + [this, &recovery_info, complete, t, target_oid, + omap_header = std::move(omap_header)] (auto st) { + //TODO: pg num bytes counting + if (!complete) { + //clone overlap content in local object + if (recovery_info.object_exist) { + uint64_t local_size = std::min(recovery_info.size, (uint64_t)st.st_size); + interval_set<uint64_t> local_intervals_included, local_intervals_excluded; + if (local_size) { + local_intervals_included.insert(0, local_size); + local_intervals_excluded.intersection_of(local_intervals_included, recovery_info.copy_subset); + local_intervals_included.subtract(local_intervals_excluded); + } + for (auto [off, len] : local_intervals_included) { + logger().debug(" clone_range {} {}~{}", + recovery_info.soid, off, len); + t->clone_range(coll->get_cid(), ghobject_t(recovery_info.soid), + ghobject_t(target_oid), off, len, off); + } + } + } + return seastar::make_ready_future<>(); + }); + } + return seastar::make_ready_future<>(); + }().then([this, data_zeros=std::move(data_zeros), + &recovery_info, &intervals_included, t, target_oid, + &omap_entries, &attrs, data_included, complete, first]() mutable { + uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL; + // Punch zeros for data, if fiemap indicates nothing but it is marked dirty + if (!data_zeros.empty()) { + data_zeros.intersection_of(recovery_info.copy_subset); + assert(intervals_included.subset_of(data_zeros)); + data_zeros.subtract(intervals_included); + + logger().debug("submit_push_data recovering object {} copy_subset: {} " + "intervals_included: {} data_zeros: {}", + recovery_info.soid, recovery_info.copy_subset, + intervals_included, data_zeros); + + for (auto [start, len] : data_zeros) { + t->zero(coll->get_cid(), ghobject_t(target_oid), start, len); + } + } + uint64_t off = 0; + for (auto [start, len] : intervals_included) { + bufferlist bit; + bit.substr_of(data_included, off, len); + t->write(coll->get_cid(), ghobject_t(target_oid), + start, len, bit, fadvise_flags); + off += len; + } + + if (!omap_entries.empty()) + t->omap_setkeys(coll->get_cid(), ghobject_t(target_oid), omap_entries); + if (!attrs.empty()) + t->setattrs(coll->get_cid(), ghobject_t(target_oid), attrs); + + if (complete) { + if (!first) { + logger().debug("submit_push_data: Removing oid {} from the temp collection", + target_oid); + clear_temp_obj(target_oid); + t->remove(coll->get_cid(), ghobject_t(recovery_info.soid)); + t->collection_move_rename(coll->get_cid(), ghobject_t(target_oid), + coll->get_cid(), ghobject_t(recovery_info.soid)); + } + submit_push_complete(recovery_info, t); + } + logger().debug("submit_push_data: done"); + return seastar::make_ready_future<>(); + }); +} + +void ReplicatedRecoveryBackend::submit_push_complete( + const ObjectRecoveryInfo &recovery_info, + ObjectStore::Transaction *t) +{ + for (const auto& [oid, extents] : recovery_info.clone_subset) { + for (const auto [off, len] : extents) { + logger().debug(" clone_range {} {}~{}", oid, off, len); + t->clone_range(coll->get_cid(), ghobject_t(oid), ghobject_t(recovery_info.soid), + off, len, off); + } + } +} + +seastar::future<> ReplicatedRecoveryBackend::handle_recovery_delete_reply( + Ref<MOSDPGRecoveryDeleteReply> m) +{ + auto& p = m->objects.front(); + hobject_t soid = p.first; + ObjectRecoveryInfo recovery_info; + recovery_info.version = p.second; + pg.get_recovery_handler()->on_peer_recover(m->from, soid, recovery_info); + get_recovering(soid).set_pushed(m->from); + return seastar::now(); +} + +seastar::future<> ReplicatedRecoveryBackend::handle_recovery_op(Ref<MOSDFastDispatchOp> m) +{ + switch (m->get_header().type) { + case MSG_OSD_PG_PULL: + return handle_pull(boost::static_pointer_cast<MOSDPGPull>(m)); + case MSG_OSD_PG_PUSH: + return handle_push(boost::static_pointer_cast<MOSDPGPush>(m)); + case MSG_OSD_PG_PUSH_REPLY: + return handle_push_reply( + boost::static_pointer_cast<MOSDPGPushReply>(m)); + case MSG_OSD_PG_RECOVERY_DELETE: + return handle_recovery_delete( + boost::static_pointer_cast<MOSDPGRecoveryDelete>(m)); + case MSG_OSD_PG_RECOVERY_DELETE_REPLY: + return handle_recovery_delete_reply( + boost::static_pointer_cast<MOSDPGRecoveryDeleteReply>(m)); + default: + // delegate to parent class for handling backend-agnostic recovery ops. + return RecoveryBackend::handle_recovery_op(std::move(m)); + } +} + |