// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #include "replicated_backend.h" #include "messages/MOSDRepOpReply.h" #include "crimson/common/exception.h" #include "crimson/common/log.h" #include "crimson/os/futurized_store.h" #include "crimson/osd/shard_services.h" #include "osd/PeeringState.h" SET_SUBSYS(osd); ReplicatedBackend::ReplicatedBackend(pg_t pgid, pg_shard_t whoami, ReplicatedBackend::CollectionRef coll, crimson::osd::ShardServices& shard_services, DoutPrefixProvider &dpp) : PGBackend{whoami.shard, coll, shard_services, dpp}, pgid{pgid}, whoami{whoami} {} ReplicatedBackend::ll_read_ierrorator::future ReplicatedBackend::_read(const hobject_t& hoid, const uint64_t off, const uint64_t len, const uint32_t flags) { return store->read(coll, ghobject_t{hoid}, off, len, flags); } ReplicatedBackend::rep_op_fut_t ReplicatedBackend::_submit_transaction(std::set&& pg_shards, const hobject_t& hoid, ceph::os::Transaction&& txn, osd_op_params_t&& osd_op_p, epoch_t min_epoch, epoch_t map_epoch, std::vector&& log_entries) { LOG_PREFIX(ReplicatedBackend::_submit_transaction); const ceph_tid_t tid = shard_services.get_tid(); auto pending_txn = pending_trans.try_emplace(tid, pg_shards.size(), osd_op_p.at_version).first; bufferlist encoded_txn; encode(txn, encoded_txn); DEBUGDPP("object {}", dpp, hoid); auto all_completed = interruptor::make_interruptible( shard_services.get_store().do_transaction(coll, std::move(txn)) ).then_interruptible([FNAME, this, peers=pending_txn->second.weak_from_this()] { if (!peers) { // for now, only actingset_changed can cause peers // to be nullptr ERRORDPP("peers is null, this should be impossible", dpp); assert(0 == "impossible"); } if (--peers->pending == 0) { peers->all_committed.set_value(); peers->all_committed = {}; return seastar::now(); } return peers->all_committed.get_shared_future(); }).then_interruptible([pending_txn, this] { auto acked_peers = std::move(pending_txn->second.acked_peers); pending_trans.erase(pending_txn); return seastar::make_ready_future(std::move(acked_peers)); }); auto sends = std::make_unique>>(); for (auto pg_shard : pg_shards) { if (pg_shard != whoami) { auto m = crimson::make_message( osd_op_p.req_id, whoami, spg_t{pgid, pg_shard.shard}, hoid, CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, map_epoch, min_epoch, tid, osd_op_p.at_version); m->set_data(encoded_txn); pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}}); encode(log_entries, m->logbl); m->pg_trim_to = osd_op_p.pg_trim_to; m->min_last_complete_ondisk = osd_op_p.min_last_complete_ondisk; m->set_rollback_to(osd_op_p.at_version); // TODO: set more stuff. e.g., pg_states sends->emplace_back(shard_services.send_to_osd(pg_shard.osd, std::move(m), map_epoch)); } } auto sends_complete = seastar::when_all_succeed( sends->begin(), sends->end() ).finally([sends=std::move(sends)] {}); return {std::move(sends_complete), std::move(all_completed)}; } void ReplicatedBackend::on_actingset_changed(bool same_primary) { crimson::common::actingset_changed e_actingset_changed{same_primary}; for (auto& [tid, pending_txn] : pending_trans) { pending_txn.all_committed.set_exception(e_actingset_changed); } pending_trans.clear(); } void ReplicatedBackend::got_rep_op_reply(const MOSDRepOpReply& reply) { LOG_PREFIX(ReplicatedBackend::got_rep_op_reply); auto found = pending_trans.find(reply.get_tid()); if (found == pending_trans.end()) { WARNDPP("cannot find rep op for message {}", dpp, reply); return; } auto& peers = found->second; for (auto& peer : peers.acked_peers) { if (peer.shard == reply.from) { peer.last_complete_ondisk = reply.get_last_complete_ondisk(); if (--peers.pending == 0) { peers.all_committed.set_value(); peers.all_committed = {}; } return; } } } seastar::future<> ReplicatedBackend::stop() { LOG_PREFIX(ReplicatedBackend::stop); INFODPP("cid {}", coll->get_cid()); for (auto& [tid, pending_on] : pending_trans) { pending_on.all_committed.set_exception( crimson::common::system_shutdown_exception()); } pending_trans.clear(); return seastar::now(); } seastar::future<> ReplicatedBackend::request_committed(const osd_reqid_t& reqid, const eversion_t& at_version) { if (std::empty(pending_trans)) { return seastar::now(); } auto iter = pending_trans.begin(); auto& pending_txn = iter->second; if (pending_txn.at_version > at_version) { return seastar::now(); } for (; iter->second.at_version < at_version; ++iter); // As for now, the previous client_request with the same reqid // mustn't have finished, as that would mean later client_requests // has finished before earlier ones. // // The following line of code should be "assert(pending_txn.at_version == at_version)", // as there can be only one transaction at any time in pending_trans due to // PG::request_pg_pipeline. But there's a high possibility that we will // improve the parallelism here in the future, which means there may be multiple // client requests in flight, so we loosed the restriction to as follows. Correct // me if I'm wrong:-) assert(iter != pending_trans.end() && iter->second.at_version == at_version); if (iter->second.pending) { return iter->second.all_committed.get_shared_future(); } else { return seastar::now(); } }