summaryrefslogtreecommitdiffstats
path: root/src/crimson/osd/pg.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/crimson/osd/pg.cc1544
1 files changed, 1544 insertions, 0 deletions
diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc
new file mode 100644
index 000000000..7cf3b158c
--- /dev/null
+++ b/src/crimson/osd/pg.cc
@@ -0,0 +1,1544 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#include "pg.h"
+
+#include <functional>
+
+#include <boost/range/adaptor/filtered.hpp>
+#include <boost/range/adaptor/map.hpp>
+#include <boost/range/adaptor/transformed.hpp>
+#include <boost/range/algorithm/copy.hpp>
+#include <boost/range/algorithm/max_element.hpp>
+#include <boost/range/numeric.hpp>
+#include <fmt/format.h>
+#include <fmt/ostream.h>
+
+#include "common/hobject_fmt.h"
+
+#include "messages/MOSDOp.h"
+#include "messages/MOSDOpReply.h"
+#include "messages/MOSDRepOp.h"
+#include "messages/MOSDRepOpReply.h"
+
+#include "osd/OSDMap.h"
+#include "osd/osd_types_fmt.h"
+
+#include "os/Transaction.h"
+
+#include "crimson/common/exception.h"
+#include "crimson/net/Connection.h"
+#include "crimson/net/Messenger.h"
+#include "crimson/os/cyanstore/cyan_store.h"
+#include "crimson/os/futurized_collection.h"
+#include "crimson/osd/exceptions.h"
+#include "crimson/osd/pg_meta.h"
+#include "crimson/osd/pg_backend.h"
+#include "crimson/osd/ops_executer.h"
+#include "crimson/osd/osd_operations/osdop_params.h"
+#include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/osd_operations/background_recovery.h"
+#include "crimson/osd/osd_operations/snaptrim_event.h"
+#include "crimson/osd/pg_recovery.h"
+#include "crimson/osd/replicated_recovery_backend.h"
+#include "crimson/osd/watch.h"
+
+using std::ostream;
+using std::set;
+using std::string;
+using std::vector;
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace std::chrono {
+std::ostream& operator<<(std::ostream& out, const signedspan& d)
+{
+ auto s = std::chrono::duration_cast<std::chrono::seconds>(d).count();
+ auto ns = std::abs((d % 1s).count());
+ fmt::print(out, "{}{}s", s, ns ? fmt::format(".{:0>9}", ns) : "");
+ return out;
+}
+}
+
+template <typename T>
+struct fmt::formatter<std::optional<T>> : fmt::formatter<T> {
+ template <typename FormatContext>
+ auto format(const std::optional<T>& v, FormatContext& ctx) const {
+ if (v.has_value()) {
+ return fmt::formatter<T>::format(*v, ctx);
+ }
+ return fmt::format_to(ctx.out(), "<null>");
+ }
+};
+
+namespace crimson::osd {
+
+using crimson::common::local_conf;
+
+class RecoverablePredicate : public IsPGRecoverablePredicate {
+public:
+ bool operator()(const set<pg_shard_t> &have) const override {
+ return !have.empty();
+ }
+};
+
+class ReadablePredicate: public IsPGReadablePredicate {
+ pg_shard_t whoami;
+public:
+ explicit ReadablePredicate(pg_shard_t whoami) : whoami(whoami) {}
+ bool operator()(const set<pg_shard_t> &have) const override {
+ return have.count(whoami);
+ }
+};
+
+PG::PG(
+ spg_t pgid,
+ pg_shard_t pg_shard,
+ crimson::os::CollectionRef coll_ref,
+ pg_pool_t&& pool,
+ std::string&& name,
+ cached_map_t osdmap,
+ ShardServices &shard_services,
+ ec_profile_t profile)
+ : pgid{pgid},
+ pg_whoami{pg_shard},
+ coll_ref{coll_ref},
+ pgmeta_oid{pgid.make_pgmeta_oid()},
+ osdmap_gate("PG::osdmap_gate"),
+ shard_services{shard_services},
+ backend(
+ PGBackend::create(
+ pgid.pgid,
+ pg_shard,
+ pool,
+ coll_ref,
+ shard_services,
+ profile,
+ *this)),
+ recovery_backend(
+ std::make_unique<ReplicatedRecoveryBackend>(
+ *this, shard_services, coll_ref, backend.get())),
+ recovery_handler(
+ std::make_unique<PGRecovery>(this)),
+ peering_state(
+ shard_services.get_cct(),
+ pg_shard,
+ pgid,
+ PGPool(
+ osdmap,
+ pgid.pool(),
+ pool,
+ name),
+ osdmap,
+ this,
+ this),
+ obc_registry{
+ local_conf()},
+ obc_loader{
+ obc_registry,
+ *backend.get(),
+ *this},
+ osdriver(
+ &shard_services.get_store(),
+ coll_ref,
+ pgid.make_pgmeta_oid()),
+ snap_mapper(
+ this->shard_services.get_cct(),
+ &osdriver,
+ pgid.ps(),
+ pgid.get_split_bits(pool.get_pg_num()),
+ pgid.pool(),
+ pgid.shard),
+ wait_for_active_blocker(this)
+{
+ peering_state.set_backend_predicates(
+ new ReadablePredicate(pg_whoami),
+ new RecoverablePredicate());
+ osdmap_gate.got_map(osdmap->get_epoch());
+}
+
+PG::~PG() {}
+
+void PG::check_blocklisted_watchers()
+{
+ logger().debug("{}", __func__);
+ obc_registry.for_each([this](ObjectContextRef obc) {
+ assert(obc);
+ for (const auto& [key, watch] : obc->watchers) {
+ assert(watch->get_pg() == this);
+ const auto& ea = watch->get_peer_addr();
+ logger().debug("watch: Found {} cookie {}. Checking entity_add_t {}",
+ watch->get_entity(), watch->get_cookie(), ea);
+ if (get_osdmap()->is_blocklisted(ea)) {
+ logger().info("watch: Found blocklisted watcher for {}", ea);
+ watch->do_watch_timeout();
+ }
+ }
+ });
+}
+
+bool PG::try_flush_or_schedule_async() {
+ logger().debug("PG::try_flush_or_schedule_async: flush ...");
+ (void)shard_services.get_store().flush(
+ coll_ref
+ ).then(
+ [this, epoch=get_osdmap_epoch()]() {
+ return shard_services.start_operation<LocalPeeringEvent>(
+ this,
+ pg_whoami,
+ pgid,
+ epoch,
+ epoch,
+ PeeringState::IntervalFlush());
+ });
+ return false;
+}
+
+void PG::publish_stats_to_osd()
+{
+ if (!is_primary())
+ return;
+ if (auto new_pg_stats = peering_state.prepare_stats_for_publish(
+ pg_stats,
+ object_stat_collection_t());
+ new_pg_stats.has_value()) {
+ pg_stats = std::move(new_pg_stats);
+ }
+}
+
+void PG::clear_publish_stats()
+{
+ pg_stats.reset();
+}
+
+pg_stat_t PG::get_stats() const
+{
+ return pg_stats.value_or(pg_stat_t{});
+}
+
+void PG::queue_check_readable(epoch_t last_peering_reset, ceph::timespan delay)
+{
+ // handle the peering event in the background
+ logger().debug(
+ "{}: PG::queue_check_readable lpr: {}, delay: {}",
+ *this, last_peering_reset, delay);
+ check_readable_timer.cancel();
+ check_readable_timer.set_callback([last_peering_reset, this] {
+ logger().debug(
+ "{}: PG::queue_check_readable callback lpr: {}",
+ *this, last_peering_reset);
+ (void) shard_services.start_operation<LocalPeeringEvent>(
+ this,
+ pg_whoami,
+ pgid,
+ last_peering_reset,
+ last_peering_reset,
+ PeeringState::CheckReadable{});
+ });
+ check_readable_timer.arm(
+ std::chrono::duration_cast<seastar::lowres_clock::duration>(delay));
+}
+
+void PG::recheck_readable()
+{
+ bool changed = false;
+ const auto mnow = shard_services.get_mnow();
+ if (peering_state.state_test(PG_STATE_WAIT)) {
+ auto prior_readable_until_ub = peering_state.get_prior_readable_until_ub();
+ if (mnow < prior_readable_until_ub) {
+ logger().info(
+ "{}: {} will wait (mnow {} < prior_readable_until_ub {})",
+ *this, __func__, mnow, prior_readable_until_ub);
+ queue_check_readable(
+ peering_state.get_last_peering_reset(),
+ prior_readable_until_ub - mnow);
+ } else {
+ logger().info(
+ "{}:{} no longer wait (mnow {} >= prior_readable_until_ub {})",
+ *this, __func__, mnow, prior_readable_until_ub);
+ peering_state.state_clear(PG_STATE_WAIT);
+ peering_state.clear_prior_readable_until_ub();
+ changed = true;
+ }
+ }
+ if (peering_state.state_test(PG_STATE_LAGGY)) {
+ auto readable_until = peering_state.get_readable_until();
+ if (readable_until == readable_until.zero()) {
+ logger().info(
+ "{}:{} still laggy (mnow {}, readable_until zero)",
+ *this, __func__, mnow);
+ } else if (mnow >= readable_until) {
+ logger().info(
+ "{}:{} still laggy (mnow {} >= readable_until {})",
+ *this, __func__, mnow, readable_until);
+ } else {
+ logger().info(
+ "{}:{} no longer laggy (mnow {} < readable_until {})",
+ *this, __func__, mnow, readable_until);
+ peering_state.state_clear(PG_STATE_LAGGY);
+ changed = true;
+ }
+ }
+ if (changed) {
+ publish_stats_to_osd();
+ if (!peering_state.state_test(PG_STATE_WAIT) &&
+ !peering_state.state_test(PG_STATE_LAGGY)) {
+ // TODO: requeue ops waiting for readable
+ }
+ }
+}
+
+unsigned PG::get_target_pg_log_entries() const
+{
+ const unsigned local_num_pgs = shard_services.get_num_local_pgs();
+ const unsigned local_target =
+ local_conf().get_val<uint64_t>("osd_target_pg_log_entries_per_osd") /
+ seastar::smp::count;
+ const unsigned min_pg_log_entries =
+ local_conf().get_val<uint64_t>("osd_min_pg_log_entries");
+ if (local_num_pgs > 0 && local_target > 0) {
+ // target an even spread of our budgeted log entries across all
+ // PGs. note that while we only get to control the entry count
+ // for primary PGs, we'll normally be responsible for a mix of
+ // primary and replica PGs (for the same pool(s) even), so this
+ // will work out.
+ const unsigned max_pg_log_entries =
+ local_conf().get_val<uint64_t>("osd_max_pg_log_entries");
+ return std::clamp(local_target / local_num_pgs,
+ min_pg_log_entries,
+ max_pg_log_entries);
+ } else {
+ // fall back to a per-pg value.
+ return min_pg_log_entries;
+ }
+}
+
+void PG::on_removal(ceph::os::Transaction &t) {
+ t.register_on_commit(
+ new LambdaContext(
+ [this](int r) {
+ ceph_assert(r == 0);
+ (void)shard_services.start_operation<LocalPeeringEvent>(
+ this, pg_whoami, pgid, float(0.001), get_osdmap_epoch(),
+ get_osdmap_epoch(), PeeringState::DeleteSome());
+ }));
+}
+
+void PG::on_activate(interval_set<snapid_t> snaps)
+{
+ logger().debug("{}: {} snaps={}", *this, __func__, snaps);
+ snap_trimq = std::move(snaps);
+ projected_last_update = peering_state.get_info().last_update;
+}
+
+void PG::on_activate_complete()
+{
+ wait_for_active_blocker.unblock();
+
+ if (peering_state.needs_recovery()) {
+ logger().info("{}: requesting recovery",
+ __func__);
+ (void) shard_services.start_operation<LocalPeeringEvent>(
+ this,
+ pg_whoami,
+ pgid,
+ float(0.001),
+ get_osdmap_epoch(),
+ get_osdmap_epoch(),
+ PeeringState::DoRecovery{});
+ } else if (peering_state.needs_backfill()) {
+ logger().info("{}: requesting backfill",
+ __func__);
+ (void) shard_services.start_operation<LocalPeeringEvent>(
+ this,
+ pg_whoami,
+ pgid,
+ float(0.001),
+ get_osdmap_epoch(),
+ get_osdmap_epoch(),
+ PeeringState::RequestBackfill{});
+ } else {
+ logger().debug("{}: no need to recover or backfill, AllReplicasRecovered",
+ " for pg: {}", __func__, pgid);
+ (void) shard_services.start_operation<LocalPeeringEvent>(
+ this,
+ pg_whoami,
+ pgid,
+ float(0.001),
+ get_osdmap_epoch(),
+ get_osdmap_epoch(),
+ PeeringState::AllReplicasRecovered{});
+ }
+ publish_stats_to_osd();
+}
+
+void PG::prepare_write(pg_info_t &info,
+ pg_info_t &last_written_info,
+ PastIntervals &past_intervals,
+ PGLog &pglog,
+ bool dirty_info,
+ bool dirty_big_info,
+ bool need_write_epoch,
+ ceph::os::Transaction &t)
+{
+ std::map<string,bufferlist> km;
+ std::string key_to_remove;
+ if (dirty_big_info || dirty_info) {
+ int ret = prepare_info_keymap(
+ shard_services.get_cct(),
+ &km,
+ &key_to_remove,
+ get_osdmap_epoch(),
+ info,
+ last_written_info,
+ past_intervals,
+ dirty_big_info,
+ need_write_epoch,
+ true,
+ nullptr,
+ this);
+ ceph_assert(ret == 0);
+ }
+ pglog.write_log_and_missing(
+ t, &km, coll_ref->get_cid(), pgmeta_oid,
+ peering_state.get_pgpool().info.require_rollback());
+ if (!km.empty()) {
+ t.omap_setkeys(coll_ref->get_cid(), pgmeta_oid, km);
+ }
+ if (!key_to_remove.empty()) {
+ t.omap_rmkey(coll_ref->get_cid(), pgmeta_oid, key_to_remove);
+ }
+}
+
+std::pair<ghobject_t, bool>
+PG::do_delete_work(ceph::os::Transaction &t, ghobject_t _next)
+{
+ logger().info("removing pg {}", pgid);
+ auto fut = interruptor::make_interruptible(
+ shard_services.get_store().list_objects(
+ coll_ref,
+ _next,
+ ghobject_t::get_max(),
+ local_conf()->osd_target_transaction_size));
+
+ auto [objs_to_rm, next] = fut.get();
+ if (objs_to_rm.empty()) {
+ logger().info("all objs removed, removing coll for {}", pgid);
+ t.remove(coll_ref->get_cid(), pgmeta_oid);
+ t.remove_collection(coll_ref->get_cid());
+ (void) shard_services.get_store().do_transaction(
+ coll_ref, std::move(t)).then([this] {
+ return shard_services.remove_pg(pgid);
+ });
+ return {next, false};
+ } else {
+ for (auto &obj : objs_to_rm) {
+ if (obj == pgmeta_oid) {
+ continue;
+ }
+ logger().trace("pg {}, removing obj {}", pgid, obj);
+ t.remove(coll_ref->get_cid(), obj);
+ }
+ t.register_on_commit(
+ new LambdaContext([this](int r) {
+ ceph_assert(r == 0);
+ logger().trace("triggering more pg delete {}", pgid);
+ (void) shard_services.start_operation<LocalPeeringEvent>(
+ this,
+ pg_whoami,
+ pgid,
+ float(0.001),
+ get_osdmap_epoch(),
+ get_osdmap_epoch(),
+ PeeringState::DeleteSome{});
+ }));
+ return {next, true};
+ }
+}
+
+Context *PG::on_clean()
+{
+ // Not needed yet (will be needed for IO unblocking)
+ return nullptr;
+}
+
+void PG::on_active_actmap()
+{
+ logger().debug("{}: {} snap_trimq={}", *this, __func__, snap_trimq);
+ peering_state.state_clear(PG_STATE_SNAPTRIM_ERROR);
+ // loops until snap_trimq is empty or SNAPTRIM_ERROR.
+ std::ignore = seastar::do_until(
+ [this] { return snap_trimq.empty()
+ || peering_state.state_test(PG_STATE_SNAPTRIM_ERROR);
+ },
+ [this] {
+ peering_state.state_set(PG_STATE_SNAPTRIM);
+ publish_stats_to_osd();
+ const auto to_trim = snap_trimq.range_start();
+ snap_trimq.erase(to_trim);
+ const auto needs_pause = !snap_trimq.empty();
+ return seastar::repeat([to_trim, needs_pause, this] {
+ logger().debug("{}: going to start SnapTrimEvent, to_trim={}",
+ *this, to_trim);
+ return shard_services.start_operation<SnapTrimEvent>(
+ this,
+ snap_mapper,
+ to_trim,
+ needs_pause
+ ).second.handle_error(
+ crimson::ct_error::enoent::handle([this] {
+ logger().error("{}: ENOENT saw, trimming stopped", *this);
+ peering_state.state_set(PG_STATE_SNAPTRIM_ERROR);
+ publish_stats_to_osd();
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::yes);
+ }), crimson::ct_error::eagain::handle([this] {
+ logger().info("{}: EAGAIN saw, trimming restarted", *this);
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::no);
+ })
+ );
+ }).then([this, trimmed=to_trim] {
+ logger().debug("{}: trimmed snap={}", *this, trimmed);
+ });
+ }).finally([this] {
+ logger().debug("{}: PG::on_active_actmap() finished trimming",
+ *this);
+ peering_state.state_clear(PG_STATE_SNAPTRIM);
+ peering_state.state_clear(PG_STATE_SNAPTRIM_ERROR);
+ publish_stats_to_osd();
+ });
+}
+
+void PG::on_active_advmap(const OSDMapRef &osdmap)
+{
+ const auto new_removed_snaps = osdmap->get_new_removed_snaps();
+ if (auto it = new_removed_snaps.find(get_pgid().pool());
+ it != new_removed_snaps.end()) {
+ bool bad = false;
+ for (auto j : it->second) {
+ if (snap_trimq.intersects(j.first, j.second)) {
+ decltype(snap_trimq) added, overlap;
+ added.insert(j.first, j.second);
+ overlap.intersection_of(snap_trimq, added);
+ logger().error("{}: {} removed_snaps already contains {}",
+ *this, __func__, overlap);
+ bad = true;
+ snap_trimq.union_of(added);
+ } else {
+ snap_trimq.insert(j.first, j.second);
+ }
+ }
+ logger().info("{}: {} new removed snaps {}, snap_trimq now{}",
+ *this, __func__, it->second, snap_trimq);
+ assert(!bad || !local_conf().get_val<bool>("osd_debug_verify_cached_snaps"));
+ }
+}
+
+void PG::scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type)
+{
+ // TODO: should update the stats upon finishing the scrub
+ peering_state.update_stats([scrub_level, this](auto& history, auto& stats) {
+ const utime_t now = ceph_clock_now();
+ history.last_scrub = peering_state.get_info().last_update;
+ history.last_scrub_stamp = now;
+ history.last_clean_scrub_stamp = now;
+ if (scrub_level == scrub_level_t::deep) {
+ history.last_deep_scrub = history.last_scrub;
+ history.last_deep_scrub_stamp = now;
+ }
+ // yes, please publish the stats
+ return true;
+ });
+}
+
+void PG::log_state_enter(const char *state) {
+ logger().info("Entering state: {}", state);
+}
+
+void PG::log_state_exit(
+ const char *state_name, utime_t enter_time,
+ uint64_t events, utime_t event_dur) {
+ logger().info(
+ "Exiting state: {}, entered at {}, {} spent on {} events",
+ state_name,
+ enter_time,
+ event_dur,
+ events);
+}
+
+ceph::signedspan PG::get_mnow() const
+{
+ return shard_services.get_mnow();
+}
+
+HeartbeatStampsRef PG::get_hb_stamps(int peer)
+{
+ return shard_services.get_hb_stamps(peer);
+}
+
+void PG::schedule_renew_lease(epoch_t last_peering_reset, ceph::timespan delay)
+{
+ // handle the peering event in the background
+ renew_lease_timer.cancel();
+ renew_lease_timer.set_callback([last_peering_reset, this] {
+ (void) shard_services.start_operation<LocalPeeringEvent>(
+ this,
+ pg_whoami,
+ pgid,
+ last_peering_reset,
+ last_peering_reset,
+ RenewLease{});
+ });
+ renew_lease_timer.arm(
+ std::chrono::duration_cast<seastar::lowres_clock::duration>(delay));
+}
+
+
+void PG::init(
+ int role,
+ const vector<int>& newup, int new_up_primary,
+ const vector<int>& newacting, int new_acting_primary,
+ const pg_history_t& history,
+ const PastIntervals& pi,
+ ObjectStore::Transaction &t)
+{
+ peering_state.init(
+ role, newup, new_up_primary, newacting,
+ new_acting_primary, history, pi, t);
+}
+
+seastar::future<> PG::read_state(crimson::os::FuturizedStore::Shard* store)
+{
+ if (__builtin_expect(stopping, false)) {
+ return seastar::make_exception_future<>(
+ crimson::common::system_shutdown_exception());
+ }
+
+ return seastar::do_with(PGMeta(*store, pgid), [] (auto& pg_meta) {
+ return pg_meta.load();
+ }).then([this, store](auto&& ret) {
+ auto [pg_info, past_intervals] = std::move(ret);
+ return peering_state.init_from_disk_state(
+ std::move(pg_info),
+ std::move(past_intervals),
+ [this, store] (PGLog &pglog) {
+ return pglog.read_log_and_missing_crimson(
+ *store,
+ coll_ref,
+ peering_state.get_info(),
+ pgmeta_oid);
+ });
+ }).then([this]() {
+ int primary, up_primary;
+ vector<int> acting, up;
+ peering_state.get_osdmap()->pg_to_up_acting_osds(
+ pgid.pgid, &up, &up_primary, &acting, &primary);
+ peering_state.init_primary_up_acting(
+ up,
+ acting,
+ up_primary,
+ primary);
+ int rr = OSDMap::calc_pg_role(pg_whoami, acting);
+ peering_state.set_role(rr);
+
+ epoch_t epoch = get_osdmap_epoch();
+ (void) shard_services.start_operation<LocalPeeringEvent>(
+ this,
+ pg_whoami,
+ pgid,
+ epoch,
+ epoch,
+ PeeringState::Initialize());
+
+ return seastar::now();
+ });
+}
+
+PG::interruptible_future<> PG::do_peering_event(
+ PGPeeringEvent& evt, PeeringCtx &rctx)
+{
+ if (peering_state.pg_has_reset_since(evt.get_epoch_requested()) ||
+ peering_state.pg_has_reset_since(evt.get_epoch_sent())) {
+ logger().debug("{} ignoring {} -- pg has reset", __func__, evt.get_desc());
+ return interruptor::now();
+ } else {
+ logger().debug("{} handling {} for pg: {}", __func__, evt.get_desc(), pgid);
+ // all peering event handling needs to be run in a dedicated seastar::thread,
+ // so that event processing can involve I/O reqs freely, for example: PG::on_removal,
+ // PG::on_new_interval
+ return interruptor::async([this, &evt, &rctx] {
+ peering_state.handle_event(
+ evt.get_event(),
+ &rctx);
+ peering_state.write_if_dirty(rctx.transaction);
+ });
+ }
+}
+
+seastar::future<> PG::handle_advance_map(
+ cached_map_t next_map, PeeringCtx &rctx)
+{
+ return seastar::async([this, next_map=std::move(next_map), &rctx] {
+ vector<int> newup, newacting;
+ int up_primary, acting_primary;
+ next_map->pg_to_up_acting_osds(
+ pgid.pgid,
+ &newup, &up_primary,
+ &newacting, &acting_primary);
+ peering_state.advance_map(
+ next_map,
+ peering_state.get_osdmap(),
+ newup,
+ up_primary,
+ newacting,
+ acting_primary,
+ rctx);
+ osdmap_gate.got_map(next_map->get_epoch());
+ });
+}
+
+seastar::future<> PG::handle_activate_map(PeeringCtx &rctx)
+{
+ return seastar::async([this, &rctx] {
+ peering_state.activate_map(rctx);
+ });
+}
+
+seastar::future<> PG::handle_initialize(PeeringCtx &rctx)
+{
+ return seastar::async([this, &rctx] {
+ peering_state.handle_event(PeeringState::Initialize{}, &rctx);
+ });
+}
+
+
+void PG::print(ostream& out) const
+{
+ out << peering_state << " ";
+}
+
+void PG::dump_primary(Formatter* f)
+{
+ peering_state.dump_peering_state(f);
+
+ f->open_array_section("recovery_state");
+ PeeringState::QueryState q(f);
+ peering_state.handle_event(q, 0);
+ f->close_section();
+
+ // TODO: snap_trimq
+ // TODO: scrubber state
+ // TODO: agent state
+}
+
+std::ostream& operator<<(std::ostream& os, const PG& pg)
+{
+ os << " pg_epoch " << pg.get_osdmap_epoch() << " ";
+ pg.print(os);
+ return os;
+}
+
+std::tuple<PG::interruptible_future<>,
+ PG::interruptible_future<>>
+PG::submit_transaction(
+ ObjectContextRef&& obc,
+ ceph::os::Transaction&& txn,
+ osd_op_params_t&& osd_op_p,
+ std::vector<pg_log_entry_t>&& log_entries)
+{
+ if (__builtin_expect(stopping, false)) {
+ return {seastar::make_exception_future<>(
+ crimson::common::system_shutdown_exception()),
+ seastar::now()};
+ }
+
+ epoch_t map_epoch = get_osdmap_epoch();
+ ceph_assert(!has_reset_since(osd_op_p.at_version.epoch));
+
+ peering_state.pre_submit_op(obc->obs.oi.soid, log_entries, osd_op_p.at_version);
+ peering_state.append_log_with_trim_to_updated(std::move(log_entries), osd_op_p.at_version,
+ txn, true, false);
+
+ auto [submitted, all_completed] = backend->mutate_object(
+ peering_state.get_acting_recovery_backfill(),
+ std::move(obc),
+ std::move(txn),
+ std::move(osd_op_p),
+ peering_state.get_last_peering_reset(),
+ map_epoch,
+ std::move(log_entries));
+ return std::make_tuple(std::move(submitted), all_completed.then_interruptible(
+ [this, last_complete=peering_state.get_info().last_complete,
+ at_version=osd_op_p.at_version](auto acked) {
+ for (const auto& peer : acked) {
+ peering_state.update_peer_last_complete_ondisk(
+ peer.shard, peer.last_complete_ondisk);
+ }
+ peering_state.complete_write(at_version, last_complete);
+ return seastar::now();
+ }));
+}
+
+PG::interruptible_future<> PG::repair_object(
+ const hobject_t& oid,
+ eversion_t& v)
+{
+ // see also PrimaryLogPG::rep_repair_primary_object()
+ assert(is_primary());
+ logger().debug("{}: {} peers osd.{}", __func__, oid, get_acting_recovery_backfill());
+ // Add object to PG's missing set if it isn't there already
+ assert(!get_local_missing().is_missing(oid));
+ peering_state.force_object_missing(pg_whoami, oid, v);
+ auto [op, fut] = get_shard_services().start_operation<UrgentRecovery>(
+ oid, v, this, get_shard_services(), get_osdmap_epoch());
+ return std::move(fut);
+}
+
+template <class Ret, class SuccessFunc, class FailureFunc>
+PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<Ret>>
+PG::do_osd_ops_execute(
+ seastar::lw_shared_ptr<OpsExecuter> ox,
+ std::vector<OSDOp>& ops,
+ SuccessFunc&& success_func,
+ FailureFunc&& failure_func)
+{
+ assert(ox);
+ auto rollbacker = ox->create_rollbacker([this] (auto& obc) {
+ return obc_loader.reload_obc(obc).handle_error_interruptible(
+ load_obc_ertr::assert_all{"can't live with object state messed up"});
+ });
+ auto failure_func_ptr = seastar::make_lw_shared(std::move(failure_func));
+ return interruptor::do_for_each(ops, [ox](OSDOp& osd_op) {
+ logger().debug(
+ "do_osd_ops_execute: object {} - handling op {}",
+ ox->get_target(),
+ ceph_osd_op_name(osd_op.op.op));
+ return ox->execute_op(osd_op);
+ }).safe_then_interruptible([this, ox, &ops] {
+ logger().debug(
+ "do_osd_ops_execute: object {} all operations successful",
+ ox->get_target());
+ // check for full
+ if ((ox->delta_stats.num_bytes > 0 ||
+ ox->delta_stats.num_objects > 0) &&
+ get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL)) {
+ const auto& m = ox->get_message();
+ if (m.get_reqid().name.is_mds() || // FIXME: ignore MDS for now
+ m.has_flag(CEPH_OSD_FLAG_FULL_FORCE)) {
+ logger().info(" full, but proceeding due to FULL_FORCE or MDS");
+ } else if (m.has_flag(CEPH_OSD_FLAG_FULL_TRY)) {
+ // they tried, they failed.
+ logger().info(" full, replying to FULL_TRY op");
+ if (get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL_QUOTA))
+ return interruptor::make_ready_future<OpsExecuter::rep_op_fut_tuple>(
+ seastar::now(),
+ OpsExecuter::osd_op_ierrorator::future<>(
+ crimson::ct_error::edquot::make()));
+ else
+ return interruptor::make_ready_future<OpsExecuter::rep_op_fut_tuple>(
+ seastar::now(),
+ OpsExecuter::osd_op_ierrorator::future<>(
+ crimson::ct_error::enospc::make()));
+ } else {
+ // drop request
+ logger().info(" full, dropping request (bad client)");
+ return interruptor::make_ready_future<OpsExecuter::rep_op_fut_tuple>(
+ seastar::now(),
+ OpsExecuter::osd_op_ierrorator::future<>(
+ crimson::ct_error::eagain::make()));
+ }
+ }
+ return std::move(*ox).flush_changes_n_do_ops_effects(
+ ops,
+ snap_mapper,
+ osdriver,
+ [this] (auto&& txn,
+ auto&& obc,
+ auto&& osd_op_p,
+ auto&& log_entries) {
+ logger().debug(
+ "do_osd_ops_execute: object {} submitting txn",
+ obc->get_oid());
+ return submit_transaction(
+ std::move(obc),
+ std::move(txn),
+ std::move(osd_op_p),
+ std::move(log_entries));
+ });
+ }).safe_then_unpack_interruptible(
+ [success_func=std::move(success_func), rollbacker, this, failure_func_ptr]
+ (auto submitted_fut, auto all_completed_fut) mutable {
+ return PG::do_osd_ops_iertr::make_ready_future<pg_rep_op_fut_t<Ret>>(
+ std::move(submitted_fut),
+ all_completed_fut.safe_then_interruptible_tuple(
+ std::move(success_func),
+ crimson::ct_error::object_corrupted::handle(
+ [rollbacker, this] (const std::error_code& e) mutable {
+ // this is a path for EIO. it's special because we want to fix the obejct
+ // and try again. that is, the layer above `PG::do_osd_ops` is supposed to
+ // restart the execution.
+ return rollbacker.rollback_obc_if_modified(e).then_interruptible(
+ [obc=rollbacker.get_obc(), this] {
+ return repair_object(obc->obs.oi.soid,
+ obc->obs.oi.version).then_interruptible([] {
+ return do_osd_ops_iertr::future<Ret>{crimson::ct_error::eagain::make()};
+ });
+ });
+ }), OpsExecuter::osd_op_errorator::all_same_way(
+ [rollbacker, failure_func_ptr]
+ (const std::error_code& e) mutable {
+ return rollbacker.rollback_obc_if_modified(e).then_interruptible(
+ [e, failure_func_ptr] {
+ return (*failure_func_ptr)(e);
+ });
+ })
+ )
+ );
+ }, OpsExecuter::osd_op_errorator::all_same_way(
+ [rollbacker, failure_func_ptr]
+ (const std::error_code& e) mutable {
+ return PG::do_osd_ops_iertr::make_ready_future<pg_rep_op_fut_t<Ret>>(
+ seastar::now(),
+ e.value() == ENOENT ? (*failure_func_ptr)(e) :
+ rollbacker.rollback_obc_if_modified(e).then_interruptible(
+ [e, failure_func_ptr] {
+ return (*failure_func_ptr)(e);
+ }));
+ }));
+}
+seastar::future<> PG::submit_error_log(
+ Ref<MOSDOp> m,
+ const OpInfo &op_info,
+ ObjectContextRef obc,
+ const std::error_code e,
+ ceph_tid_t rep_tid,
+ eversion_t &version)
+{
+ const osd_reqid_t &reqid = m->get_reqid();
+ mempool::osd_pglog::list<pg_log_entry_t> log_entries;
+ log_entries.push_back(pg_log_entry_t(pg_log_entry_t::ERROR,
+ obc->obs.oi.soid,
+ next_version(),
+ eversion_t(), 0,
+ reqid, utime_t(),
+ -e.value()));
+ if (op_info.allows_returnvec()) {
+ log_entries.back().set_op_returns(m->ops);
+ }
+ ceph_assert(is_primary());
+ if (!log_entries.empty()) {
+ ceph_assert(log_entries.rbegin()->version >= projected_last_update);
+ version = projected_last_update = log_entries.rbegin()->version;
+ }
+ ceph::os::Transaction t;
+ peering_state.merge_new_log_entries(
+ log_entries, t, peering_state.get_pg_trim_to(),
+ peering_state.get_min_last_complete_ondisk());
+
+ set<pg_shard_t> waiting_on;
+ for (auto &i : get_acting_recovery_backfill()) {
+ pg_shard_t peer(i);
+ if (peer == pg_whoami) continue;
+ ceph_assert(peering_state.get_peer_missing().count(peer));
+ ceph_assert(peering_state.has_peer_info(peer));
+ auto log_m = crimson::make_message<MOSDPGUpdateLogMissing>(
+ log_entries,
+ spg_t(peering_state.get_info().pgid.pgid, i.shard),
+ pg_whoami.shard,
+ get_osdmap_epoch(),
+ get_last_peering_reset(),
+ rep_tid,
+ peering_state.get_pg_trim_to(),
+ peering_state.get_min_last_complete_ondisk());
+ send_cluster_message(peer.osd, std::move(log_m), get_osdmap_epoch());
+ waiting_on.insert(peer);
+ }
+ waiting_on.insert(pg_whoami);
+ log_entry_update_waiting_on.insert(
+ std::make_pair(rep_tid, log_update_t{std::move(waiting_on)}));
+ return shard_services.get_store().do_transaction(
+ get_collection_ref(), std::move(t))
+ .then([this] {
+ peering_state.update_trim_to();
+ return seastar::now();
+ });
+}
+
+PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<MURef<MOSDOpReply>>>
+PG::do_osd_ops(
+ Ref<MOSDOp> m,
+ crimson::net::ConnectionRef conn,
+ ObjectContextRef obc,
+ const OpInfo &op_info,
+ const SnapContext& snapc)
+{
+ if (__builtin_expect(stopping, false)) {
+ throw crimson::common::system_shutdown_exception();
+ }
+ return do_osd_ops_execute<MURef<MOSDOpReply>>(
+ seastar::make_lw_shared<OpsExecuter>(
+ Ref<PG>{this}, obc, op_info, *m, conn, snapc),
+ m->ops,
+ [this, m, obc, may_write = op_info.may_write(),
+ may_read = op_info.may_read(), rvec = op_info.allows_returnvec()] {
+ // TODO: should stop at the first op which returns a negative retval,
+ // cmpext uses it for returning the index of first unmatched byte
+ int result = m->ops.empty() ? 0 : m->ops.back().rval.code;
+ if (may_read && result >= 0) {
+ for (auto &osdop : m->ops) {
+ if (osdop.rval < 0 && !(osdop.op.flags & CEPH_OSD_OP_FLAG_FAILOK)) {
+ result = osdop.rval.code;
+ break;
+ }
+ }
+ } else if (result > 0 && may_write && !rvec) {
+ result = 0;
+ } else if (result < 0 && (m->ops.empty() ?
+ 0 : m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK)) {
+ result = 0;
+ }
+ auto reply = crimson::make_message<MOSDOpReply>(m.get(),
+ result,
+ get_osdmap_epoch(),
+ 0,
+ false);
+ reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+ logger().debug(
+ "do_osd_ops: {} - object {} sending reply",
+ *m,
+ m->get_hobj());
+ if (obc->obs.exists) {
+ reply->set_reply_versions(peering_state.get_info().last_update,
+ obc->obs.oi.user_version);
+ } else {
+ reply->set_reply_versions(peering_state.get_info().last_update,
+ peering_state.get_info().last_user_version);
+ }
+ return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(
+ std::move(reply));
+ },
+ [m, &op_info, obc, this] (const std::error_code& e) {
+ return seastar::do_with(eversion_t(), [m, &op_info, obc, e, this](auto &version) {
+ auto fut = seastar::now();
+ epoch_t epoch = get_osdmap_epoch();
+ ceph_tid_t rep_tid = shard_services.get_tid();
+ auto last_complete = peering_state.get_info().last_complete;
+ if (op_info.may_write()) {
+ fut = submit_error_log(m, op_info, obc, e, rep_tid, version);
+ }
+ return fut.then([m, e, epoch, &op_info, rep_tid, &version, last_complete, this] {
+ auto log_reply = [m, e, this] {
+ auto reply = crimson::make_message<MOSDOpReply>(
+ m.get(), -e.value(), get_osdmap_epoch(), 0, false);
+ if (m->ops.empty() ? 0 :
+ m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK) {
+ reply->set_result(0);
+ }
+ // For all ops except for CMPEXT, the correct error value is encoded
+ // in e.value(). For CMPEXT, osdop.rval has the actual error value.
+ if (e.value() == ct_error::cmp_fail_error_value) {
+ assert(!m->ops.empty());
+ for (auto &osdop : m->ops) {
+ if (osdop.rval < 0) {
+ reply->set_result(osdop.rval);
+ break;
+ }
+ }
+ }
+ reply->set_enoent_reply_versions(
+ peering_state.get_info().last_update,
+ peering_state.get_info().last_user_version);
+ reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+ return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(
+ std::move(reply));
+ };
+
+ if (!peering_state.pg_has_reset_since(epoch) && op_info.may_write()) {
+ auto it = log_entry_update_waiting_on.find(rep_tid);
+ ceph_assert(it != log_entry_update_waiting_on.end());
+ auto it2 = it->second.waiting_on.find(pg_whoami);
+ ceph_assert(it2 != it->second.waiting_on.end());
+ it->second.waiting_on.erase(it2);
+
+ if (it->second.waiting_on.empty()) {
+ log_entry_update_waiting_on.erase(it);
+ if (version != eversion_t()) {
+ peering_state.complete_write(version, last_complete);
+ }
+ return log_reply();
+ } else {
+ return it->second.all_committed.get_shared_future()
+ .then([this, &version, last_complete, log_reply = std::move(log_reply)] {
+ if (version != eversion_t()) {
+ peering_state.complete_write(version, last_complete);
+ }
+ return log_reply();
+ });
+ }
+ } else {
+ return log_reply();
+ }
+ });
+ });
+ });
+}
+
+PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<>>
+PG::do_osd_ops(
+ ObjectContextRef obc,
+ std::vector<OSDOp>& ops,
+ const OpInfo &op_info,
+ const do_osd_ops_params_t &&msg_params,
+ do_osd_ops_success_func_t success_func,
+ do_osd_ops_failure_func_t failure_func)
+{
+ // This overload is generally used for internal client requests,
+ // use an empty SnapContext.
+ return seastar::do_with(
+ std::move(msg_params),
+ [=, this, &ops, &op_info](auto &msg_params) {
+ return do_osd_ops_execute<void>(
+ seastar::make_lw_shared<OpsExecuter>(
+ Ref<PG>{this},
+ std::move(obc),
+ op_info,
+ msg_params,
+ msg_params.get_connection(),
+ SnapContext{}
+ ),
+ ops,
+ std::move(success_func),
+ std::move(failure_func));
+ });
+}
+
+PG::interruptible_future<MURef<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
+{
+ if (__builtin_expect(stopping, false)) {
+ throw crimson::common::system_shutdown_exception();
+ }
+
+ auto ox = std::make_unique<PgOpsExecuter>(std::as_const(*this),
+ std::as_const(*m));
+ return interruptor::do_for_each(m->ops, [ox = ox.get()](OSDOp& osd_op) {
+ logger().debug("will be handling pg op {}", ceph_osd_op_name(osd_op.op.op));
+ return ox->execute_op(osd_op);
+ }).then_interruptible([m, this, ox = std::move(ox)] {
+ auto reply = crimson::make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
+ CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
+ false);
+ reply->claim_op_out_data(m->ops);
+ reply->set_reply_versions(peering_state.get_info().last_update,
+ peering_state.get_info().last_user_version);
+ return seastar::make_ready_future<MURef<MOSDOpReply>>(std::move(reply));
+ }).handle_exception_type_interruptible([=, this](const crimson::osd::error& e) {
+ auto reply = crimson::make_message<MOSDOpReply>(
+ m.get(), -e.code().value(), get_osdmap_epoch(), 0, false);
+ reply->set_enoent_reply_versions(peering_state.get_info().last_update,
+ peering_state.get_info().last_user_version);
+ return seastar::make_ready_future<MURef<MOSDOpReply>>(std::move(reply));
+ });
+}
+
+hobject_t PG::get_oid(const hobject_t& hobj)
+{
+ return hobj.snap == CEPH_SNAPDIR ? hobj.get_head() : hobj;
+}
+
+RWState::State PG::get_lock_type(const OpInfo &op_info)
+{
+
+ if (op_info.rwordered() && op_info.may_read()) {
+ return RWState::RWEXCL;
+ } else if (op_info.rwordered()) {
+ return RWState::RWWRITE;
+ } else {
+ ceph_assert(op_info.may_read());
+ return RWState::RWREAD;
+ }
+}
+
+void PG::check_blocklisted_obc_watchers(
+ ObjectContextRef &obc)
+{
+ if (obc->watchers.empty()) {
+ for (auto &[src, winfo] : obc->obs.oi.watchers) {
+ auto watch = crimson::osd::Watch::create(
+ obc, winfo, src.second, this);
+ watch->disconnect();
+ auto [it, emplaced] = obc->watchers.emplace(src, std::move(watch));
+ assert(emplaced);
+ logger().debug("added watch for obj {}, client {}",
+ obc->get_oid(), src.second);
+ }
+ }
+}
+
+PG::load_obc_iertr::future<>
+PG::with_locked_obc(const hobject_t &hobj,
+ const OpInfo &op_info,
+ with_obc_func_t &&f)
+{
+ if (__builtin_expect(stopping, false)) {
+ throw crimson::common::system_shutdown_exception();
+ }
+ const hobject_t oid = get_oid(hobj);
+ auto wrapper = [f=std::move(f), this](auto obc) {
+ check_blocklisted_obc_watchers(obc);
+ return f(obc);
+ };
+ switch (get_lock_type(op_info)) {
+ case RWState::RWREAD:
+ return obc_loader.with_obc<RWState::RWREAD>(oid, std::move(wrapper));
+ case RWState::RWWRITE:
+ return obc_loader.with_obc<RWState::RWWRITE>(oid, std::move(wrapper));
+ case RWState::RWEXCL:
+ return obc_loader.with_obc<RWState::RWEXCL>(oid, std::move(wrapper));
+ default:
+ ceph_abort();
+ };
+}
+
+PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
+{
+ if (__builtin_expect(stopping, false)) {
+ return seastar::make_exception_future<>(
+ crimson::common::system_shutdown_exception());
+ }
+
+ logger().debug("{}: {}", __func__, *req);
+ if (can_discard_replica_op(*req)) {
+ return seastar::now();
+ }
+
+ ceph::os::Transaction txn;
+ auto encoded_txn = req->get_data().cbegin();
+ decode(txn, encoded_txn);
+ auto p = req->logbl.cbegin();
+ std::vector<pg_log_entry_t> log_entries;
+ decode(log_entries, p);
+ log_operation(std::move(log_entries),
+ req->pg_trim_to,
+ req->version,
+ req->min_last_complete_ondisk,
+ !txn.empty(),
+ txn,
+ false);
+ logger().debug("PG::handle_rep_op: do_transaction...");
+ return interruptor::make_interruptible(shard_services.get_store().do_transaction(
+ coll_ref, std::move(txn))).then_interruptible(
+ [req, lcod=peering_state.get_info().last_complete, this] {
+ peering_state.update_last_complete_ondisk(lcod);
+ const auto map_epoch = get_osdmap_epoch();
+ auto reply = crimson::make_message<MOSDRepOpReply>(
+ req.get(), pg_whoami, 0,
+ map_epoch, req->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
+ reply->set_last_complete_ondisk(lcod);
+ return shard_services.send_to_osd(req->from.osd, std::move(reply), map_epoch);
+ });
+}
+
+void PG::log_operation(
+ std::vector<pg_log_entry_t>&& logv,
+ const eversion_t &trim_to,
+ const eversion_t &roll_forward_to,
+ const eversion_t &min_last_complete_ondisk,
+ bool transaction_applied,
+ ObjectStore::Transaction &txn,
+ bool async) {
+ logger().debug("{}", __func__);
+ if (is_primary()) {
+ ceph_assert(trim_to <= peering_state.get_last_update_ondisk());
+ }
+ /* TODO: when we add snap mapper and projected log support,
+ * we'll likely want to update them here.
+ *
+ * See src/osd/PrimaryLogPG.h:log_operation for how classic
+ * handles these cases.
+ */
+#if 0
+ if (transaction_applied) {
+ //TODO:
+ //update_snap_map(logv, t);
+ }
+ auto last = logv.rbegin();
+ if (is_primary() && last != logv.rend()) {
+ projected_log.skip_can_rollback_to_to_head();
+ projected_log.trim(cct, last->version, nullptr, nullptr, nullptr);
+ }
+#endif
+ if (!is_primary()) { // && !is_ec_pg()
+ replica_clear_repop_obc(logv);
+ }
+ peering_state.append_log(std::move(logv),
+ trim_to,
+ roll_forward_to,
+ min_last_complete_ondisk,
+ txn,
+ !txn.empty(),
+ false);
+}
+
+void PG::replica_clear_repop_obc(
+ const std::vector<pg_log_entry_t> &logv) {
+ logger().debug("{} clearing {} entries", __func__, logv.size());
+ for (auto &&e: logv) {
+ logger().debug(" {} get_object_boundary(from): {} "
+ " head version(to): {}",
+ e.soid,
+ e.soid.get_object_boundary(),
+ e.soid.get_head());
+ /* Have to blast all clones, they share a snapset */
+ obc_registry.clear_range(
+ e.soid.get_object_boundary(), e.soid.get_head());
+ }
+}
+
+void PG::handle_rep_op_reply(const MOSDRepOpReply& m)
+{
+ if (!can_discard_replica_op(m)) {
+ backend->got_rep_op_reply(m);
+ }
+}
+
+PG::interruptible_future<> PG::do_update_log_missing(
+ Ref<MOSDPGUpdateLogMissing> m,
+ crimson::net::ConnectionRef conn)
+{
+ if (__builtin_expect(stopping, false)) {
+ return seastar::make_exception_future<>(
+ crimson::common::system_shutdown_exception());
+ }
+
+ ceph_assert(m->get_type() == MSG_OSD_PG_UPDATE_LOG_MISSING);
+ ObjectStore::Transaction t;
+ std::optional<eversion_t> op_trim_to, op_roll_forward_to;
+ if (m->pg_trim_to != eversion_t())
+ op_trim_to = m->pg_trim_to;
+ if (m->pg_roll_forward_to != eversion_t())
+ op_roll_forward_to = m->pg_roll_forward_to;
+ logger().debug("op_trim_to = {}, op_roll_forward_to = {}",
+ op_trim_to, op_roll_forward_to);
+
+ peering_state.append_log_entries_update_missing(
+ m->entries, t, op_trim_to, op_roll_forward_to);
+
+ return interruptor::make_interruptible(shard_services.get_store().do_transaction(
+ coll_ref, std::move(t))).then_interruptible(
+ [m, conn, lcod=peering_state.get_info().last_complete, this] {
+ if (!peering_state.pg_has_reset_since(m->get_epoch())) {
+ peering_state.update_last_complete_ondisk(lcod);
+ auto reply =
+ crimson::make_message<MOSDPGUpdateLogMissingReply>(
+ spg_t(peering_state.get_info().pgid.pgid, get_primary().shard),
+ pg_whoami.shard,
+ m->get_epoch(),
+ m->min_epoch,
+ m->get_tid(),
+ lcod);
+ reply->set_priority(CEPH_MSG_PRIO_HIGH);
+ return conn->send(std::move(reply));
+ }
+ return seastar::now();
+ });
+}
+
+
+PG::interruptible_future<> PG::do_update_log_missing_reply(
+ Ref<MOSDPGUpdateLogMissingReply> m)
+{
+ logger().debug("{}: got reply from {}", __func__, m->get_from());
+
+ auto it = log_entry_update_waiting_on.find(m->get_tid());
+ if (it != log_entry_update_waiting_on.end()) {
+ if (it->second.waiting_on.count(m->get_from())) {
+ it->second.waiting_on.erase(m->get_from());
+ if (m->last_complete_ondisk != eversion_t()) {
+ peering_state.update_peer_last_complete_ondisk(
+ m->get_from(), m->last_complete_ondisk);
+ }
+ } else {
+ logger().error("{} : {} got reply {} from shard we are not waiting for ",
+ __func__, peering_state.get_info().pgid, *m, m->get_from());
+ }
+
+ if (it->second.waiting_on.empty()) {
+ it->second.all_committed.set_value();
+ it->second.all_committed = {};
+ log_entry_update_waiting_on.erase(it);
+ }
+ } else {
+ logger().error("{} : {} got reply {} on unknown tid {}",
+ __func__, peering_state.get_info().pgid, *m, m->get_tid());
+ }
+ return seastar::now();
+}
+
+bool PG::old_peering_msg(
+ const epoch_t reply_epoch,
+ const epoch_t query_epoch) const
+{
+ if (const epoch_t lpr = peering_state.get_last_peering_reset();
+ lpr > reply_epoch || lpr > query_epoch) {
+ logger().debug("{}: pg changed {} lpr {}, reply_epoch {}, query_epoch {}",
+ __func__, get_info().history, lpr, reply_epoch, query_epoch);
+ return true;
+ }
+ return false;
+}
+
+bool PG::can_discard_replica_op(const Message& m, epoch_t m_map_epoch) const
+{
+ // if a repop is replied after a replica goes down in a new osdmap, and
+ // before the pg advances to this new osdmap, the repop replies before this
+ // repop can be discarded by that replica OSD, because the primary resets the
+ // connection to it when handling the new osdmap marking it down, and also
+ // resets the messenger sesssion when the replica reconnects. to avoid the
+ // out-of-order replies, the messages from that replica should be discarded.
+ const auto osdmap = peering_state.get_osdmap();
+ const int from_osd = m.get_source().num();
+ if (osdmap->is_down(from_osd)) {
+ return true;
+ }
+ // Mostly, this overlaps with the old_peering_msg
+ // condition. An important exception is pushes
+ // sent by replicas not in the acting set, since
+ // if such a replica goes down it does not cause
+ // a new interval.
+ if (osdmap->get_down_at(from_osd) >= m_map_epoch) {
+ return true;
+ }
+ // same pg?
+ // if pg changes *at all*, we reset and repeer!
+ return old_peering_msg(m_map_epoch, m_map_epoch);
+}
+
+seastar::future<> PG::stop()
+{
+ logger().info("PG {} {}", pgid, __func__);
+ stopping = true;
+ cancel_local_background_io_reservation();
+ cancel_remote_recovery_reservation();
+ check_readable_timer.cancel();
+ renew_lease_timer.cancel();
+ return osdmap_gate.stop().then([this] {
+ return wait_for_active_blocker.stop();
+ }).then([this] {
+ return recovery_handler->stop();
+ }).then([this] {
+ return recovery_backend->stop();
+ }).then([this] {
+ return backend->stop();
+ });
+}
+
+void PG::on_change(ceph::os::Transaction &t) {
+ logger().debug("{} {}:", *this, __func__);
+ context_registry_on_change();
+ obc_loader.notify_on_change(is_primary());
+ recovery_backend->on_peering_interval_change(t);
+ backend->on_actingset_changed(is_primary());
+ wait_for_active_blocker.unblock();
+ if (is_primary()) {
+ logger().debug("{} {}: requeueing", *this, __func__);
+ client_request_orderer.requeue(shard_services, this);
+ } else {
+ logger().debug("{} {}: dropping requests", *this, __func__);
+ client_request_orderer.clear_and_cancel();
+ }
+}
+
+void PG::context_registry_on_change() {
+ obc_registry.for_each([](ObjectContextRef obc) {
+ assert(obc);
+ for (auto j = obc->watchers.begin();
+ j != obc->watchers.end();
+ j = obc->watchers.erase(j)) {
+ j->second->discard_state();
+ }
+ });
+}
+
+bool PG::can_discard_op(const MOSDOp& m) const {
+ if (m.get_map_epoch() <
+ peering_state.get_info().history.same_primary_since) {
+ logger().debug("{} changed after {} dropping {} ",
+ __func__ , m.get_map_epoch(), m);
+ return true;
+ }
+
+ if ((m.get_flags() & (CEPH_OSD_FLAG_BALANCE_READS |
+ CEPH_OSD_FLAG_LOCALIZE_READS))
+ && !is_primary()
+ && (m.get_map_epoch() <
+ peering_state.get_info().history.same_interval_since))
+ {
+ // Note: the Objecter will resend on interval change without the primary
+ // changing if it actually sent to a replica. If the primary hasn't
+ // changed since the send epoch, we got it, and we're primary, it won't
+ // have resent even if the interval did change as it sent it to the primary
+ // (us).
+ return true;
+ }
+ return __builtin_expect(m.get_map_epoch()
+ < peering_state.get_info().history.same_primary_since, false);
+}
+
+bool PG::is_degraded_or_backfilling_object(const hobject_t& soid) const {
+ /* The conditions below may clear (on_local_recover, before we queue
+ * the transaction) before we actually requeue the degraded waiters
+ * in on_global_recover after the transaction completes.
+ */
+ if (peering_state.get_pg_log().get_missing().get_items().count(soid))
+ return true;
+ ceph_assert(!get_acting_recovery_backfill().empty());
+ for (auto& peer : get_acting_recovery_backfill()) {
+ if (peer == get_primary()) continue;
+ auto peer_missing_entry = peering_state.get_peer_missing().find(peer);
+ // If an object is missing on an async_recovery_target, return false.
+ // This will not block the op and the object is async recovered later.
+ if (peer_missing_entry != peering_state.get_peer_missing().end() &&
+ peer_missing_entry->second.get_items().count(soid)) {
+ return true;
+ }
+ // Object is degraded if after last_backfill AND
+ // we are backfilling it
+ if (is_backfill_target(peer) &&
+ peering_state.get_peer_info(peer).last_backfill <= soid &&
+ recovery_handler->backfill_state &&
+ recovery_handler->backfill_state->get_last_backfill_started() >= soid &&
+ recovery_backend->is_recovering(soid)) {
+ return true;
+ }
+ }
+ return false;
+}
+
+PG::interruptible_future<std::optional<PG::complete_op_t>>
+PG::already_complete(const osd_reqid_t& reqid)
+{
+ eversion_t version;
+ version_t user_version;
+ int ret;
+ std::vector<pg_log_op_return_item_t> op_returns;
+
+ if (peering_state.get_pg_log().get_log().get_request(
+ reqid, &version, &user_version, &ret, &op_returns)) {
+ complete_op_t dupinfo{
+ user_version,
+ version,
+ ret};
+ return backend->request_committed(reqid, version).then([dupinfo] {
+ return seastar::make_ready_future<std::optional<complete_op_t>>(dupinfo);
+ });
+ } else {
+ return seastar::make_ready_future<std::optional<complete_op_t>>(std::nullopt);
+ }
+}
+
+}