diff options
Diffstat (limited to '')
-rw-r--r-- | src/crimson/osd/replicated_backend.cc | 174 |
1 files changed, 174 insertions, 0 deletions
diff --git a/src/crimson/osd/replicated_backend.cc b/src/crimson/osd/replicated_backend.cc new file mode 100644 index 000000000..0ff4ad573 --- /dev/null +++ b/src/crimson/osd/replicated_backend.cc @@ -0,0 +1,174 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "replicated_backend.h" + +#include "messages/MOSDRepOpReply.h" + +#include "crimson/common/exception.h" +#include "crimson/common/log.h" +#include "crimson/os/futurized_store.h" +#include "crimson/osd/shard_services.h" +#include "osd/PeeringState.h" + +SET_SUBSYS(osd); + +ReplicatedBackend::ReplicatedBackend(pg_t pgid, + pg_shard_t whoami, + ReplicatedBackend::CollectionRef coll, + crimson::osd::ShardServices& shard_services, + DoutPrefixProvider &dpp) + : PGBackend{whoami.shard, coll, shard_services, dpp}, + pgid{pgid}, + whoami{whoami} +{} + +ReplicatedBackend::ll_read_ierrorator::future<ceph::bufferlist> +ReplicatedBackend::_read(const hobject_t& hoid, + const uint64_t off, + const uint64_t len, + const uint32_t flags) +{ + return store->read(coll, ghobject_t{hoid}, off, len, flags); +} + +ReplicatedBackend::rep_op_fut_t +ReplicatedBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards, + const hobject_t& hoid, + ceph::os::Transaction&& txn, + osd_op_params_t&& osd_op_p, + epoch_t min_epoch, epoch_t map_epoch, + std::vector<pg_log_entry_t>&& log_entries) +{ + LOG_PREFIX(ReplicatedBackend::_submit_transaction); + + const ceph_tid_t tid = shard_services.get_tid(); + auto pending_txn = + pending_trans.try_emplace(tid, pg_shards.size(), osd_op_p.at_version).first; + bufferlist encoded_txn; + encode(txn, encoded_txn); + + DEBUGDPP("object {}", dpp, hoid); + auto all_completed = interruptor::make_interruptible( + shard_services.get_store().do_transaction(coll, std::move(txn)) + ).then_interruptible([FNAME, this, + peers=pending_txn->second.weak_from_this()] { + if (!peers) { + // for now, only actingset_changed can cause peers + // to be nullptr + ERRORDPP("peers is null, this should be impossible", dpp); + assert(0 == "impossible"); + } + if (--peers->pending == 0) { + peers->all_committed.set_value(); + peers->all_committed = {}; + return seastar::now(); + } + return peers->all_committed.get_shared_future(); + }).then_interruptible([pending_txn, this] { + auto acked_peers = std::move(pending_txn->second.acked_peers); + pending_trans.erase(pending_txn); + return seastar::make_ready_future<crimson::osd::acked_peers_t>(std::move(acked_peers)); + }); + + auto sends = std::make_unique<std::vector<seastar::future<>>>(); + for (auto pg_shard : pg_shards) { + if (pg_shard != whoami) { + auto m = crimson::make_message<MOSDRepOp>( + osd_op_p.req_id, + whoami, + spg_t{pgid, pg_shard.shard}, + hoid, + CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, + map_epoch, + min_epoch, + tid, + osd_op_p.at_version); + m->set_data(encoded_txn); + pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}}); + encode(log_entries, m->logbl); + m->pg_trim_to = osd_op_p.pg_trim_to; + m->min_last_complete_ondisk = osd_op_p.min_last_complete_ondisk; + m->set_rollback_to(osd_op_p.at_version); + // TODO: set more stuff. e.g., pg_states + sends->emplace_back(shard_services.send_to_osd(pg_shard.osd, std::move(m), map_epoch)); + } + } + auto sends_complete = seastar::when_all_succeed( + sends->begin(), sends->end() + ).finally([sends=std::move(sends)] {}); + return {std::move(sends_complete), std::move(all_completed)}; +} + +void ReplicatedBackend::on_actingset_changed(bool same_primary) +{ + crimson::common::actingset_changed e_actingset_changed{same_primary}; + for (auto& [tid, pending_txn] : pending_trans) { + pending_txn.all_committed.set_exception(e_actingset_changed); + } + pending_trans.clear(); +} + +void ReplicatedBackend::got_rep_op_reply(const MOSDRepOpReply& reply) +{ + LOG_PREFIX(ReplicatedBackend::got_rep_op_reply); + auto found = pending_trans.find(reply.get_tid()); + if (found == pending_trans.end()) { + WARNDPP("cannot find rep op for message {}", dpp, reply); + return; + } + auto& peers = found->second; + for (auto& peer : peers.acked_peers) { + if (peer.shard == reply.from) { + peer.last_complete_ondisk = reply.get_last_complete_ondisk(); + if (--peers.pending == 0) { + peers.all_committed.set_value(); + peers.all_committed = {}; + } + return; + } + } +} + +seastar::future<> ReplicatedBackend::stop() +{ + LOG_PREFIX(ReplicatedBackend::stop); + INFODPP("cid {}", coll->get_cid()); + for (auto& [tid, pending_on] : pending_trans) { + pending_on.all_committed.set_exception( + crimson::common::system_shutdown_exception()); + } + pending_trans.clear(); + return seastar::now(); +} + +seastar::future<> +ReplicatedBackend::request_committed(const osd_reqid_t& reqid, + const eversion_t& at_version) +{ + if (std::empty(pending_trans)) { + return seastar::now(); + } + auto iter = pending_trans.begin(); + auto& pending_txn = iter->second; + if (pending_txn.at_version > at_version) { + return seastar::now(); + } + for (; iter->second.at_version < at_version; ++iter); + // As for now, the previous client_request with the same reqid + // mustn't have finished, as that would mean later client_requests + // has finished before earlier ones. + // + // The following line of code should be "assert(pending_txn.at_version == at_version)", + // as there can be only one transaction at any time in pending_trans due to + // PG::request_pg_pipeline. But there's a high possibility that we will + // improve the parallelism here in the future, which means there may be multiple + // client requests in flight, so we loosed the restriction to as follows. Correct + // me if I'm wrong:-) + assert(iter != pending_trans.end() && iter->second.at_version == at_version); + if (iter->second.pending) { + return iter->second.all_committed.get_shared_future(); + } else { + return seastar::now(); + } +} |