// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #include #include #include #include #include "crimson/osd/pg.h" #include "crimson/osd/pg_backend.h" #include "replicated_recovery_backend.h" #include "msg/Message.h" namespace { seastar::logger& logger() { return crimson::get_logger(ceph_subsys_osd); } } seastar::future<> ReplicatedRecoveryBackend::recover_object( const hobject_t& soid, eversion_t need) { logger().debug("{}: {}, {}", __func__, soid, need); // always add_recovering(soid) before recover_object(soid) assert(is_recovering(soid)); // start tracking the recovery of soid return maybe_pull_missing_obj(soid, need).then([this, soid, need] { logger().debug("recover_object: loading obc: {}", soid); return pg.with_head_obc(soid, [this, soid, need](auto obc) { logger().debug("recover_object: loaded obc: {}", obc->obs.oi.soid); auto& recovery_waiter = recovering.at(soid); recovery_waiter.obc = obc; recovery_waiter.obc->wait_recovery_read(); return maybe_push_shards(soid, need); }).handle_error( crimson::osd::PG::load_obc_ertr::all_same_way([soid](auto& code) { // TODO: may need eio handling? logger().error("recover_object saw error code {}, ignoring object {}", code, soid); })); }); } seastar::future<> ReplicatedRecoveryBackend::maybe_push_shards( const hobject_t& soid, eversion_t need) { return seastar::parallel_for_each(get_shards_to_push(soid), [this, need, soid](auto shard) { return prep_push(soid, need, shard).then([this, soid, shard](auto push) { auto msg = make_message(); msg->from = pg.get_pg_whoami(); msg->pgid = pg.get_pgid(); msg->map_epoch = pg.get_osdmap_epoch(); msg->min_epoch = pg.get_last_peering_reset(); msg->pushes.push_back(std::move(push)); msg->set_priority(pg.get_recovery_op_priority()); return shard_services.send_to_osd(shard.osd, std::move(msg), pg.get_osdmap_epoch()).then( [this, soid, shard] { return recovering.at(soid).wait_for_pushes(shard); }); }); }).then([this, soid] { auto &recovery = recovering.at(soid); auto push_info = recovery.pushing.begin(); object_stat_sum_t stat = {}; if (push_info != recovery.pushing.end()) { stat = push_info->second.stat; } else { // no push happened, take pull_info's stat assert(recovery.pi); stat = recovery.pi->stat; } pg.get_recovery_handler()->on_global_recover(soid, stat, false); return seastar::make_ready_future<>(); }).handle_exception([this, soid](auto e) { auto &recovery = recovering.at(soid); if (recovery.obc) { recovery.obc->drop_recovery_read(); } recovering.erase(soid); return seastar::make_exception_future<>(e); }); } seastar::future<> ReplicatedRecoveryBackend::maybe_pull_missing_obj( const hobject_t& soid, eversion_t need) { pg_missing_tracker_t local_missing = pg.get_local_missing(); if (!local_missing.is_missing(soid)) { return seastar::make_ready_future<>(); } PullOp po; auto& recovery_waiter = recovering.at(soid); recovery_waiter.pi = std::make_optional(); auto& pi = *recovery_waiter.pi; prepare_pull(po, pi, soid, need); auto msg = make_message(); msg->from = pg.get_pg_whoami(); msg->set_priority(pg.get_recovery_op_priority()); msg->pgid = pg.get_pgid(); msg->map_epoch = pg.get_osdmap_epoch(); msg->min_epoch = pg.get_last_peering_reset(); msg->set_pulls({std::move(po)}); return shard_services.send_to_osd( pi.from.osd, std::move(msg), pg.get_osdmap_epoch() ).then([&recovery_waiter] { return recovery_waiter.wait_for_pull(); }); } seastar::future<> ReplicatedRecoveryBackend::push_delete( const hobject_t& soid, eversion_t need) { logger().debug("{}: {}, {}", __func__, soid, need); recovering[soid]; epoch_t min_epoch = pg.get_last_peering_reset(); assert(pg.get_acting_recovery_backfill().size() > 0); return seastar::parallel_for_each(pg.get_acting_recovery_backfill(), [this, soid, need, min_epoch](pg_shard_t shard) { if (shard == pg.get_pg_whoami()) return seastar::make_ready_future<>(); auto iter = pg.get_shard_missing().find(shard); if (iter == pg.get_shard_missing().end()) return seastar::make_ready_future<>(); if (iter->second.is_missing(soid)) { logger().debug("push_delete: will remove {} from {}", soid, shard); pg.begin_peer_recover(shard, soid); spg_t target_pg(pg.get_info().pgid.pgid, shard.shard); auto msg = make_message( pg.get_pg_whoami(), target_pg, pg.get_osdmap_epoch(), min_epoch); msg->set_priority(pg.get_recovery_op_priority()); msg->objects.push_back(std::make_pair(soid, need)); return shard_services.send_to_osd(shard.osd, std::move(msg), pg.get_osdmap_epoch()).then( [this, soid, shard] { return recovering.at(soid).wait_for_pushes(shard); }); } return seastar::make_ready_future<>(); }); } seastar::future<> ReplicatedRecoveryBackend::handle_recovery_delete( Ref m) { logger().debug("{}: {}", __func__, *m); auto& p = m->objects.front(); //TODO: only one delete per message for now. return local_recover_delete(p.first, p.second, pg.get_osdmap_epoch()).then( [this, m] { auto reply = make_message(); reply->from = pg.get_pg_whoami(); reply->set_priority(m->get_priority()); reply->pgid = spg_t(pg.get_info().pgid.pgid, m->from.shard); reply->map_epoch = m->map_epoch; reply->min_epoch = m->min_epoch; reply->objects = m->objects; return shard_services.send_to_osd(m->from.osd, std::move(reply), pg.get_osdmap_epoch()); }); } seastar::future<> ReplicatedRecoveryBackend::on_local_recover_persist( const hobject_t& soid, const ObjectRecoveryInfo& _recovery_info, bool is_delete, epoch_t epoch_frozen) { logger().debug("{}", __func__); ceph::os::Transaction t; pg.get_recovery_handler()->on_local_recover(soid, _recovery_info, is_delete, t); return shard_services.get_store().do_transaction(coll, std::move(t)).then( [this, epoch_frozen, last_complete = pg.get_info().last_complete] { pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete); return seastar::make_ready_future<>(); }); } seastar::future<> ReplicatedRecoveryBackend::local_recover_delete( const hobject_t& soid, eversion_t need, epoch_t epoch_to_freeze) { logger().debug("{}: {}, {}", __func__, soid, need); return backend->load_metadata(soid).safe_then([this] (auto lomt) { if (lomt->os.exists) { return seastar::do_with(ceph::os::Transaction(), [this, lomt = std::move(lomt)](auto& txn) { return backend->remove(lomt->os, txn).then([this, &txn]() mutable { return shard_services.get_store().do_transaction(coll, std::move(txn)); }); }); } return seastar::make_ready_future<>(); }).safe_then([this, soid, epoch_to_freeze, need] { ObjectRecoveryInfo recovery_info; recovery_info.soid = soid; recovery_info.version = need; return on_local_recover_persist(soid, recovery_info, true, epoch_to_freeze); }, PGBackend::load_metadata_ertr::all_same_way( [this, soid, epoch_to_freeze, need] (auto e) { ObjectRecoveryInfo recovery_info; recovery_info.soid = soid; recovery_info.version = need; return on_local_recover_persist(soid, recovery_info, true, epoch_to_freeze); }) ); } seastar::future<> ReplicatedRecoveryBackend::recover_delete( const hobject_t &soid, eversion_t need) { logger().debug("{}: {}, {}", __func__, soid, need); epoch_t cur_epoch = pg.get_osdmap_epoch(); return seastar::do_with(object_stat_sum_t(), [this, soid, need, cur_epoch](auto& stat_diff) { return local_recover_delete(soid, need, cur_epoch).then( [this, &stat_diff, cur_epoch, soid, need] { if (!pg.has_reset_since(cur_epoch)) { bool object_missing = false; for (const auto& shard : pg.get_acting_recovery_backfill()) { if (shard == pg.get_pg_whoami()) continue; if (pg.get_shard_missing(shard)->is_missing(soid)) { logger().debug("recover_delete: soid {} needs to deleted from replca {}", soid, shard); object_missing = true; break; } } if (!object_missing) { stat_diff.num_objects_recovered = 1; return seastar::make_ready_future<>(); } else { return push_delete(soid, need); } } return seastar::make_ready_future<>(); }).then([this, soid, &stat_diff] { pg.get_recovery_handler()->on_global_recover(soid, stat_diff, true); return seastar::make_ready_future<>(); }); }); } seastar::future ReplicatedRecoveryBackend::prep_push( const hobject_t& soid, eversion_t need, pg_shard_t pg_shard) { logger().debug("{}: {}, {}", __func__, soid, need); auto& recovery_waiter = recovering.at(soid); auto& obc = recovery_waiter.obc; interval_set data_subset; if (obc->obs.oi.size) { data_subset.insert(0, obc->obs.oi.size); } const auto& missing = pg.get_shard_missing().find(pg_shard)->second; if (HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS)) { const auto it = missing.get_items().find(soid); assert(it != missing.get_items().end()); data_subset.intersection_of(it->second.clean_regions.get_dirty_regions()); logger().debug("prep_push: {} data_subset {}", soid, data_subset); } logger().debug("prep_push: {} to {}", soid, pg_shard); auto& pi = recovery_waiter.pushing[pg_shard]; pg.begin_peer_recover(pg_shard, soid); const auto pmissing_iter = pg.get_shard_missing().find(pg_shard); const auto missing_iter = pmissing_iter->second.get_items().find(soid); assert(missing_iter != pmissing_iter->second.get_items().end()); pi.obc = obc; pi.recovery_info.size = obc->obs.oi.size; pi.recovery_info.copy_subset = data_subset; pi.recovery_info.soid = soid; pi.recovery_info.oi = obc->obs.oi; pi.recovery_info.version = obc->obs.oi.version; pi.recovery_info.object_exist = missing_iter->second.clean_regions.object_is_exist(); pi.recovery_progress.omap_complete = (!missing_iter->second.clean_regions.omap_is_dirty() && HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS)); return build_push_op(pi.recovery_info, pi.recovery_progress, &pi.stat).then( [this, soid, pg_shard](auto pop) { auto& recovery_waiter = recovering.at(soid); auto& pi = recovery_waiter.pushing[pg_shard]; pi.recovery_progress = pop.after_progress; return pop; }); } void ReplicatedRecoveryBackend::prepare_pull(PullOp& po, PullInfo& pi, const hobject_t& soid, eversion_t need) { logger().debug("{}: {}, {}", __func__, soid, need); pg_missing_tracker_t local_missing = pg.get_local_missing(); const auto missing_iter = local_missing.get_items().find(soid); auto m = pg.get_missing_loc_shards(); pg_shard_t fromshard = *(m[soid].begin()); //TODO: skipped snap objects case for now po.recovery_info.copy_subset.insert(0, (uint64_t) -1); if (HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS)) po.recovery_info.copy_subset.intersection_of( missing_iter->second.clean_regions.get_dirty_regions()); po.recovery_info.size = ((uint64_t) -1); po.recovery_info.object_exist = missing_iter->second.clean_regions.object_is_exist(); po.recovery_info.soid = soid; po.soid = soid; po.recovery_progress.data_complete = false; po.recovery_progress.omap_complete = !missing_iter->second.clean_regions.omap_is_dirty() && HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS); po.recovery_progress.data_recovered_to = 0; po.recovery_progress.first = true; pi.from = fromshard; pi.soid = soid; pi.recovery_info = po.recovery_info; pi.recovery_progress = po.recovery_progress; } seastar::future ReplicatedRecoveryBackend::build_push_op( const ObjectRecoveryInfo& recovery_info, const ObjectRecoveryProgress& progress, object_stat_sum_t* stat) { logger().debug("{} {} @{}", __func__, recovery_info.soid, recovery_info.version); return seastar::do_with(ObjectRecoveryProgress(progress), uint64_t(crimson::common::local_conf() ->osd_recovery_max_chunk), recovery_info.version, PushOp(), [this, &recovery_info, &progress, stat] (auto new_progress, auto available, auto v, auto pop) { return read_metadata_for_push_op(recovery_info.soid, progress, new_progress, v, &pop).then([&](eversion_t local_ver) mutable { // If requestor didn't know the version, use ours if (v == eversion_t()) { v = local_ver; } else if (v != local_ver) { logger().error("build_push_op: {} push {} v{} failed because local copy is {}", pg.get_pgid(), recovery_info.soid, recovery_info.version, local_ver); // TODO: bail out } return read_omap_for_push_op(recovery_info.soid, progress, new_progress, &available, &pop); }).then([this, &recovery_info, &progress, &available, &pop]() mutable { logger().debug("build_push_op: available: {}, copy_subset: {}", available, recovery_info.copy_subset); return read_object_for_push_op(recovery_info.soid, recovery_info.copy_subset, progress.data_recovered_to, available, &pop); }).then([&recovery_info, &v, &progress, &new_progress, stat, &pop] (uint64_t recovered_to) mutable { new_progress.data_recovered_to = recovered_to; if (new_progress.is_complete(recovery_info)) { new_progress.data_complete = true; if (stat) stat->num_objects_recovered++; } else if (progress.first && progress.omap_complete) { // If omap is not changed, we need recovery omap // when recovery cannot be completed once new_progress.omap_complete = false; } if (stat) { stat->num_keys_recovered += pop.omap_entries.size(); stat->num_bytes_recovered += pop.data.length(); } pop.version = v; pop.soid = recovery_info.soid; pop.recovery_info = recovery_info; pop.after_progress = new_progress; pop.before_progress = progress; logger().debug("build_push_op: pop version: {}, pop data length: {}", pop.version, pop.data.length()); return seastar::make_ready_future(std::move(pop)); }); }); } seastar::future ReplicatedRecoveryBackend::read_metadata_for_push_op( const hobject_t& oid, const ObjectRecoveryProgress& progress, ObjectRecoveryProgress& new_progress, eversion_t ver, PushOp* push_op) { if (!progress.first) { return seastar::make_ready_future(ver); } return seastar::when_all_succeed( backend->omap_get_header(coll, ghobject_t(oid)).handle_error( crimson::os::FuturizedStore::read_errorator::all_same_way( [] (const std::error_code& e) { return seastar::make_ready_future(); })), store->get_attrs(coll, ghobject_t(oid)).handle_error( crimson::os::FuturizedStore::get_attrs_ertr::all_same_way( [] (const std::error_code& e) { return seastar::make_ready_future(); })) ).then_unpack([&new_progress, push_op](auto bl, auto attrs) { if (bl.length() == 0) { logger().error("read_metadata_for_push_op: fail to read omap header"); return eversion_t{}; } else if (attrs.empty()) { logger().error("read_metadata_for_push_op: fail to read attrs"); return eversion_t{}; } push_op->omap_header.claim_append(std::move(bl)); for (auto&& [key, val] : std::move(attrs)) { push_op->attrset[key].push_back(val); } logger().debug("read_metadata_for_push_op: {}", push_op->attrset[OI_ATTR]); object_info_t oi; oi.decode(push_op->attrset[OI_ATTR]); new_progress.first = false; return oi.version; }); } seastar::future ReplicatedRecoveryBackend::read_object_for_push_op( const hobject_t& oid, const interval_set& copy_subset, uint64_t offset, uint64_t max_len, PushOp* push_op) { if (max_len == 0 || copy_subset.empty()) { push_op->data_included.clear(); return seastar::make_ready_future(offset); } // 1. get the extents in the interested range return backend->fiemap(coll, ghobject_t{oid}, 0, copy_subset.range_end()).then_wrapped( [=](auto&& fiemap_included) mutable { interval_set extents; try { extents.intersection_of(copy_subset, fiemap_included.get0()); } catch (std::exception &) { // if fiemap() fails, we will read nothing, as the intersection of // copy_subset and an empty interval_set would be empty anyway extents.clear(); } // 2. we can read up to "max_len" bytes from "offset", so truncate the // extents down to this quota. no need to return the number of consumed // bytes, as this is the last consumer of this quota push_op->data_included.span_of(extents, offset, max_len); // 3. read the truncated extents // TODO: check if the returned extents are pruned return store->readv(coll, ghobject_t{oid}, push_op->data_included, 0); }).safe_then([push_op, range_end=copy_subset.range_end()](auto &&bl) { push_op->data.claim_append(std::move(bl)); uint64_t recovered_to = 0; if (push_op->data_included.empty()) { // zero filled section, skip to end! recovered_to = range_end; } else { // note down the progress, we will start from there next time recovered_to = push_op->data_included.range_end(); } return seastar::make_ready_future(recovered_to); }, PGBackend::read_errorator::all_same_way([](auto e) { logger().debug("build_push_op: read exception"); return seastar::make_exception_future(e); })); } seastar::future<> ReplicatedRecoveryBackend::read_omap_for_push_op( const hobject_t& oid, const ObjectRecoveryProgress& progress, ObjectRecoveryProgress& new_progress, uint64_t* max_len, PushOp* push_op) { if (progress.omap_complete) { return seastar::make_ready_future<>(); } return shard_services.get_store().get_omap_iterator(coll, ghobject_t{oid}) .then([&progress, &new_progress, max_len, push_op](auto omap_iter) { return omap_iter->lower_bound(progress.omap_recovered_to).then( [omap_iter, &new_progress, max_len, push_op] { return seastar::do_until([omap_iter, &new_progress, max_len, push_op] { if (!omap_iter->valid()) { new_progress.omap_complete = true; return true; } if (push_op->omap_entries.empty()) { return false; } if (const uint64_t entries_per_chunk = crimson::common::local_conf()->osd_recovery_max_omap_entries_per_chunk; entries_per_chunk > 0 && push_op->omap_entries.size() >= entries_per_chunk) { new_progress.omap_recovered_to = omap_iter->key(); return true; } if (omap_iter->key().size() + omap_iter->value().length() > *max_len) { new_progress.omap_recovered_to = omap_iter->key(); return true; } return false; }, [omap_iter, max_len, push_op] { push_op->omap_entries.emplace(omap_iter->key(), omap_iter->value()); if (const uint64_t entry_size = omap_iter->key().size() + omap_iter->value().length(); entry_size > *max_len) { *max_len -= entry_size; } else { *max_len = 0; } return omap_iter->next(); }); }); }); } std::vector ReplicatedRecoveryBackend::get_shards_to_push(const hobject_t& soid) const { std::vector shards; assert(pg.get_acting_recovery_backfill().size() > 0); for (const auto& peer : pg.get_acting_recovery_backfill()) { if (peer == pg.get_pg_whoami()) continue; auto shard_missing = pg.get_shard_missing().find(peer); assert(shard_missing != pg.get_shard_missing().end()); if (shard_missing->second.is_missing(soid)) { shards.push_back(shard_missing->first); } } return shards; } seastar::future<> ReplicatedRecoveryBackend::handle_pull(Ref m) { logger().debug("{}: {}", __func__, *m); return seastar::parallel_for_each(m->take_pulls(), [this, from=m->from](auto& pull_op) { const hobject_t& soid = pull_op.soid; logger().debug("handle_pull: {}", soid); return backend->stat(coll, ghobject_t(soid)).then( [this, &pull_op](auto st) { ObjectRecoveryInfo &recovery_info = pull_op.recovery_info; ObjectRecoveryProgress &progress = pull_op.recovery_progress; if (progress.first && recovery_info.size == ((uint64_t) -1)) { // Adjust size and copy_subset recovery_info.size = st.st_size; if (st.st_size) { interval_set object_range; object_range.insert(0, st.st_size); recovery_info.copy_subset.intersection_of(object_range); } else { recovery_info.copy_subset.clear(); } assert(recovery_info.clone_subset.empty()); } return build_push_op(recovery_info, progress, 0); }).then([this, from](auto pop) { auto msg = make_message(); msg->from = pg.get_pg_whoami(); msg->pgid = pg.get_pgid(); msg->map_epoch = pg.get_osdmap_epoch(); msg->min_epoch = pg.get_last_peering_reset(); msg->set_priority(pg.get_recovery_op_priority()); msg->pushes.push_back(std::move(pop)); return shard_services.send_to_osd(from.osd, std::move(msg), pg.get_osdmap_epoch()); }); }); } seastar::future ReplicatedRecoveryBackend::_handle_pull_response( pg_shard_t from, const PushOp& pop, PullOp* response, ceph::os::Transaction* t) { logger().debug("handle_pull_response {} {} data.size() is {} data_included: {}", pop.recovery_info, pop.after_progress, pop.data.length(), pop.data_included); const hobject_t &hoid = pop.soid; auto& recovery_waiter = recovering.at(hoid); auto& pi = *recovery_waiter.pi; if (pi.recovery_info.size == (uint64_t(-1))) { pi.recovery_info.size = pop.recovery_info.size; pi.recovery_info.copy_subset.intersection_of( pop.recovery_info.copy_subset); } // If primary doesn't have object info and didn't know version if (pi.recovery_info.version == eversion_t()) pi.recovery_info.version = pop.version; auto prepare_waiter = seastar::make_ready_future<>(); if (pi.recovery_progress.first) { prepare_waiter = pg.with_head_obc( pi.recovery_info.soid, [&pi, &recovery_waiter, &pop](auto obc) { pi.obc = obc; recovery_waiter.obc = obc; obc->obs.oi.decode(pop.attrset.at(OI_ATTR)); pi.recovery_info.oi = obc->obs.oi; return crimson::osd::PG::load_obc_ertr::now(); }).handle_error(crimson::ct_error::assert_all{}); }; return prepare_waiter.then([this, &pi, &pop, t, response]() mutable { const bool first = pi.recovery_progress.first; pi.recovery_progress = pop.after_progress; logger().debug("new recovery_info {}, new progress {}", pi.recovery_info, pi.recovery_progress); interval_set data_zeros; { uint64_t offset = pop.before_progress.data_recovered_to; uint64_t length = (pop.after_progress.data_recovered_to - pop.before_progress.data_recovered_to); if (length) { data_zeros.insert(offset, length); } } auto [usable_intervals, data] = trim_pushed_data(pi.recovery_info.copy_subset, pop.data_included, pop.data); bool complete = pi.is_complete(); bool clear_omap = !pop.before_progress.omap_complete; return submit_push_data(pi.recovery_info, first, complete, clear_omap, std::move(data_zeros), usable_intervals, data, pop.omap_header, pop.attrset, pop.omap_entries, t).then( [this, response, &pi, &pop, complete, t, bytes_recovered=data.length()] { pi.stat.num_keys_recovered += pop.omap_entries.size(); pi.stat.num_bytes_recovered += bytes_recovered; if (complete) { pi.stat.num_objects_recovered++; pg.get_recovery_handler()->on_local_recover( pop.soid, recovering.at(pop.soid).pi->recovery_info, false, *t); return true; } else { response->soid = pop.soid; response->recovery_info = pi.recovery_info; response->recovery_progress = pi.recovery_progress; return false; } }); }); } seastar::future<> ReplicatedRecoveryBackend::handle_pull_response( Ref m) { const PushOp& pop = m->pushes[0]; //TODO: only one push per message for now. if (pop.version == eversion_t()) { // replica doesn't have it! pg.get_recovery_handler()->on_failed_recover({ m->from }, pop.soid, get_recovering(pop.soid).pi->recovery_info.version); return seastar::make_exception_future<>( std::runtime_error(fmt::format( "Error on pushing side {} when pulling obj {}", m->from, pop.soid))); } logger().debug("{}: {}", __func__, *m); return seastar::do_with(PullOp(), [this, m](auto& response) { return seastar::do_with(ceph::os::Transaction(), m.get(), [this, &response](auto& t, auto& m) { pg_shard_t from = m->from; PushOp& pop = m->pushes[0]; // only one push per message for now return _handle_pull_response(from, pop, &response, &t).then( [this, &t](bool complete) { epoch_t epoch_frozen = pg.get_osdmap_epoch(); return shard_services.get_store().do_transaction(coll, std::move(t)) .then([this, epoch_frozen, complete, last_complete = pg.get_info().last_complete] { pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete); return seastar::make_ready_future(complete); }); }); }).then([this, m, &response](bool complete) { if (complete) { auto& pop = m->pushes[0]; recovering.at(pop.soid).set_pulled(); return seastar::make_ready_future<>(); } else { auto reply = make_message(); reply->from = pg.get_pg_whoami(); reply->set_priority(m->get_priority()); reply->pgid = pg.get_info().pgid; reply->map_epoch = m->map_epoch; reply->min_epoch = m->min_epoch; reply->set_pulls({std::move(response)}); return shard_services.send_to_osd(m->from.osd, std::move(reply), pg.get_osdmap_epoch()); } }); }); } seastar::future<> ReplicatedRecoveryBackend::_handle_push( pg_shard_t from, const PushOp &pop, PushReplyOp *response, ceph::os::Transaction *t) { logger().debug("{}", __func__); bool first = pop.before_progress.first; interval_set data_zeros; { uint64_t offset = pop.before_progress.data_recovered_to; uint64_t length = (pop.after_progress.data_recovered_to - pop.before_progress.data_recovered_to); if (length) { data_zeros.insert(offset, length); } } bool complete = (pop.after_progress.data_complete && pop.after_progress.omap_complete); bool clear_omap = !pop.before_progress.omap_complete; response->soid = pop.recovery_info.soid; return submit_push_data(pop.recovery_info, first, complete, clear_omap, std::move(data_zeros), pop.data_included, pop.data, pop.omap_header, pop.attrset, pop.omap_entries, t).then([this, complete, &pop, t] { if (complete) { pg.get_recovery_handler()->on_local_recover( pop.recovery_info.soid, pop.recovery_info, false, *t); } }); } seastar::future<> ReplicatedRecoveryBackend::handle_push( Ref m) { if (pg.is_primary()) { return handle_pull_response(m); } logger().debug("{}: {}", __func__, *m); return seastar::do_with(PushReplyOp(), [this, m](auto& response) { const PushOp& pop = m->pushes[0]; //TODO: only one push per message for now return seastar::do_with(ceph::os::Transaction(), [this, m, &pop, &response](auto& t) { return _handle_push(m->from, pop, &response, &t).then( [this, &t] { epoch_t epoch_frozen = pg.get_osdmap_epoch(); return shard_services.get_store().do_transaction(coll, std::move(t)).then( [this, epoch_frozen, last_complete = pg.get_info().last_complete] { //TODO: this should be grouped with pg.on_local_recover somehow. pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete); }); }); }).then([this, m, &response]() mutable { auto reply = make_message(); reply->from = pg.get_pg_whoami(); reply->set_priority(m->get_priority()); reply->pgid = pg.get_info().pgid; reply->map_epoch = m->map_epoch; reply->min_epoch = m->min_epoch; std::vector replies = { std::move(response) }; reply->replies.swap(replies); return shard_services.send_to_osd(m->from.osd, std::move(reply), pg.get_osdmap_epoch()); }); }); } seastar::future> ReplicatedRecoveryBackend::_handle_push_reply( pg_shard_t peer, const PushReplyOp &op) { const hobject_t& soid = op.soid; logger().debug("{}, soid {}, from {}", __func__, soid, peer); auto recovering_iter = recovering.find(soid); if (recovering_iter == recovering.end() || !recovering_iter->second.pushing.count(peer)) { logger().debug("huh, i wasn't pushing {} to osd.{}", soid, peer); return seastar::make_ready_future>(); } else { auto& pi = recovering_iter->second.pushing[peer]; bool error = pi.recovery_progress.error; if (!pi.recovery_progress.data_complete && !error) { return build_push_op(pi.recovery_info, pi.recovery_progress, &pi.stat).then([&pi] (auto pop) { pi.recovery_progress = pop.after_progress; return seastar::make_ready_future>(std::move(pop)); }).handle_exception([recovering_iter, &pi, peer] (auto e) { pi.recovery_progress.error = true; recovering_iter->second.set_push_failed(peer, e); return seastar::make_ready_future>(); }); } if (!error) { pg.get_recovery_handler()->on_peer_recover(peer, soid, pi.recovery_info); } recovering_iter->second.set_pushed(peer); return seastar::make_ready_future>(); } } seastar::future<> ReplicatedRecoveryBackend::handle_push_reply( Ref m) { logger().debug("{}: {}", __func__, *m); auto from = m->from; auto& push_reply = m->replies[0]; //TODO: only one reply per message return _handle_push_reply(from, push_reply).then( [this, from](std::optional push_op) { if (push_op) { auto msg = make_message(); msg->from = pg.get_pg_whoami(); msg->pgid = pg.get_pgid(); msg->map_epoch = pg.get_osdmap_epoch(); msg->min_epoch = pg.get_last_peering_reset(); msg->set_priority(pg.get_recovery_op_priority()); msg->pushes.push_back(std::move(*push_op)); return shard_services.send_to_osd(from.osd, std::move(msg), pg.get_osdmap_epoch()); } else { return seastar::make_ready_future<>(); } }); } std::pair, bufferlist> ReplicatedRecoveryBackend::trim_pushed_data( const interval_set ©_subset, const interval_set &intervals_received, ceph::bufferlist data_received) { logger().debug("{}", __func__); // what i have is only a subset of what i want if (intervals_received.subset_of(copy_subset)) { return {intervals_received, data_received}; } // only collect the extents included by copy_subset and intervals_received interval_set intervals_usable; bufferlist data_usable; intervals_usable.intersection_of(copy_subset, intervals_received); uint64_t have_off = 0; for (auto [have_start, have_len] : intervals_received) { interval_set want; want.insert(have_start, have_len); want.intersection_of(copy_subset); for (auto [want_start, want_len] : want) { bufferlist sub; uint64_t data_off = have_off + (want_start - have_start); sub.substr_of(data_received, data_off, want_len); data_usable.claim_append(sub); } have_off += have_len; } return {intervals_usable, data_usable}; } seastar::future<> ReplicatedRecoveryBackend::submit_push_data( const ObjectRecoveryInfo &recovery_info, bool first, bool complete, bool clear_omap, interval_set data_zeros, const interval_set &intervals_included, bufferlist data_included, bufferlist omap_header, const map &attrs, const map &omap_entries, ObjectStore::Transaction *t) { logger().debug("{}", __func__); hobject_t target_oid; if (first && complete) { target_oid = recovery_info.soid; } else { target_oid = get_temp_recovery_object(recovery_info.soid, recovery_info.version); if (first) { logger().debug("{}: Adding oid {} in the temp collection", __func__, target_oid); add_temp_obj(target_oid); } } return [this, &recovery_info, first, complete, t, &omap_header, &attrs, target_oid, clear_omap] { if (first) { if (!complete) { t->remove(coll->get_cid(), ghobject_t(target_oid)); t->touch(coll->get_cid(), ghobject_t(target_oid)); bufferlist bv = attrs.at(OI_ATTR); object_info_t oi(bv); t->set_alloc_hint(coll->get_cid(), ghobject_t(target_oid), oi.expected_object_size, oi.expected_write_size, oi.alloc_hint_flags); } else { if (!recovery_info.object_exist) { t->remove(coll->get_cid(), ghobject_t(target_oid)); t->touch(coll->get_cid(), ghobject_t(target_oid)); bufferlist bv = attrs.at(OI_ATTR); object_info_t oi(bv); t->set_alloc_hint(coll->get_cid(), ghobject_t(target_oid), oi.expected_object_size, oi.expected_write_size, oi.alloc_hint_flags); } //remove xattr and update later if overwrite on original object t->rmattrs(coll->get_cid(), ghobject_t(target_oid)); //if need update omap, clear the previous content first if (clear_omap) t->omap_clear(coll->get_cid(), ghobject_t(target_oid)); } t->truncate(coll->get_cid(), ghobject_t(target_oid), recovery_info.size); if (omap_header.length()) t->omap_setheader(coll->get_cid(), ghobject_t(target_oid), omap_header); return store->stat(coll, ghobject_t(recovery_info.soid)).then( [this, &recovery_info, complete, t, target_oid, omap_header = std::move(omap_header)] (auto st) { //TODO: pg num bytes counting if (!complete) { //clone overlap content in local object if (recovery_info.object_exist) { uint64_t local_size = std::min(recovery_info.size, (uint64_t)st.st_size); interval_set local_intervals_included, local_intervals_excluded; if (local_size) { local_intervals_included.insert(0, local_size); local_intervals_excluded.intersection_of(local_intervals_included, recovery_info.copy_subset); local_intervals_included.subtract(local_intervals_excluded); } for (auto [off, len] : local_intervals_included) { logger().debug(" clone_range {} {}~{}", recovery_info.soid, off, len); t->clone_range(coll->get_cid(), ghobject_t(recovery_info.soid), ghobject_t(target_oid), off, len, off); } } } return seastar::make_ready_future<>(); }); } return seastar::make_ready_future<>(); }().then([this, data_zeros=std::move(data_zeros), &recovery_info, &intervals_included, t, target_oid, &omap_entries, &attrs, data_included, complete, first]() mutable { uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL; // Punch zeros for data, if fiemap indicates nothing but it is marked dirty if (!data_zeros.empty()) { data_zeros.intersection_of(recovery_info.copy_subset); assert(intervals_included.subset_of(data_zeros)); data_zeros.subtract(intervals_included); logger().debug("submit_push_data recovering object {} copy_subset: {} " "intervals_included: {} data_zeros: {}", recovery_info.soid, recovery_info.copy_subset, intervals_included, data_zeros); for (auto [start, len] : data_zeros) { t->zero(coll->get_cid(), ghobject_t(target_oid), start, len); } } uint64_t off = 0; for (auto [start, len] : intervals_included) { bufferlist bit; bit.substr_of(data_included, off, len); t->write(coll->get_cid(), ghobject_t(target_oid), start, len, bit, fadvise_flags); off += len; } if (!omap_entries.empty()) t->omap_setkeys(coll->get_cid(), ghobject_t(target_oid), omap_entries); if (!attrs.empty()) t->setattrs(coll->get_cid(), ghobject_t(target_oid), attrs); if (complete) { if (!first) { logger().debug("submit_push_data: Removing oid {} from the temp collection", target_oid); clear_temp_obj(target_oid); t->remove(coll->get_cid(), ghobject_t(recovery_info.soid)); t->collection_move_rename(coll->get_cid(), ghobject_t(target_oid), coll->get_cid(), ghobject_t(recovery_info.soid)); } submit_push_complete(recovery_info, t); } logger().debug("submit_push_data: done"); return seastar::make_ready_future<>(); }); } void ReplicatedRecoveryBackend::submit_push_complete( const ObjectRecoveryInfo &recovery_info, ObjectStore::Transaction *t) { for (const auto& [oid, extents] : recovery_info.clone_subset) { for (const auto [off, len] : extents) { logger().debug(" clone_range {} {}~{}", oid, off, len); t->clone_range(coll->get_cid(), ghobject_t(oid), ghobject_t(recovery_info.soid), off, len, off); } } } seastar::future<> ReplicatedRecoveryBackend::handle_recovery_delete_reply( Ref m) { auto& p = m->objects.front(); hobject_t soid = p.first; ObjectRecoveryInfo recovery_info; recovery_info.version = p.second; pg.get_recovery_handler()->on_peer_recover(m->from, soid, recovery_info); get_recovering(soid).set_pushed(m->from); return seastar::now(); } seastar::future<> ReplicatedRecoveryBackend::handle_recovery_op(Ref m) { switch (m->get_header().type) { case MSG_OSD_PG_PULL: return handle_pull(boost::static_pointer_cast(m)); case MSG_OSD_PG_PUSH: return handle_push(boost::static_pointer_cast(m)); case MSG_OSD_PG_PUSH_REPLY: return handle_push_reply( boost::static_pointer_cast(m)); case MSG_OSD_PG_RECOVERY_DELETE: return handle_recovery_delete( boost::static_pointer_cast(m)); case MSG_OSD_PG_RECOVERY_DELETE_REPLY: return handle_recovery_delete_reply( boost::static_pointer_cast(m)); default: // delegate to parent class for handling backend-agnostic recovery ops. return RecoveryBackend::handle_recovery_op(std::move(m)); } }