summaryrefslogtreecommitdiffstats
path: root/src/crimson/osd/osd_operations
diff options
context:
space:
mode:
Diffstat (limited to 'src/crimson/osd/osd_operations')
-rw-r--r--src/crimson/osd/osd_operations/background_recovery.cc140
-rw-r--r--src/crimson/osd/osd_operations/background_recovery.h126
-rw-r--r--src/crimson/osd/osd_operations/client_request.cc201
-rw-r--r--src/crimson/osd/osd_operations/client_request.h76
-rw-r--r--src/crimson/osd/osd_operations/compound_peering_request.cc170
-rw-r--r--src/crimson/osd/osd_operations/compound_peering_request.h40
-rw-r--r--src/crimson/osd/osd_operations/osdop_params.h27
-rw-r--r--src/crimson/osd/osd_operations/peering_event.cc173
-rw-r--r--src/crimson/osd/osd_operations/peering_event.h142
-rw-r--r--src/crimson/osd/osd_operations/pg_advance_map.cc97
-rw-r--r--src/crimson/osd/osd_operations/pg_advance_map.h50
-rw-r--r--src/crimson/osd/osd_operations/recovery_subrequest.cc29
-rw-r--r--src/crimson/osd/osd_operations/recovery_subrequest.h45
-rw-r--r--src/crimson/osd/osd_operations/replicated_request.cc74
-rw-r--r--src/crimson/osd/osd_operations/replicated_request.h58
15 files changed, 1448 insertions, 0 deletions
diff --git a/src/crimson/osd/osd_operations/background_recovery.cc b/src/crimson/osd/osd_operations/background_recovery.cc
new file mode 100644
index 000000000..126e0e902
--- /dev/null
+++ b/src/crimson/osd/osd_operations/background_recovery.cc
@@ -0,0 +1,140 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <seastar/core/future.hh>
+
+#include "messages/MOSDOp.h"
+
+#include "crimson/osd/pg.h"
+#include "crimson/osd/shard_services.h"
+#include "common/Formatter.h"
+#include "crimson/osd/osd_operations/background_recovery.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace crimson::osd {
+
+BackgroundRecovery::BackgroundRecovery(
+ Ref<PG> pg,
+ ShardServices &ss,
+ epoch_t epoch_started,
+ crimson::osd::scheduler::scheduler_class_t scheduler_class)
+ : pg(pg),
+ epoch_started(epoch_started),
+ ss(ss),
+ scheduler_class(scheduler_class)
+{}
+
+void BackgroundRecovery::print(std::ostream &lhs) const
+{
+ lhs << "BackgroundRecovery(" << pg->get_pgid() << ")";
+}
+
+void BackgroundRecovery::dump_detail(Formatter *f) const
+{
+ f->dump_stream("pgid") << pg->get_pgid();
+ f->open_object_section("recovery_detail");
+ {
+ // TODO pg->dump_recovery_state(f);
+ }
+ f->close_section();
+}
+
+seastar::future<> BackgroundRecovery::start()
+{
+ logger().debug("{}: start", *this);
+
+ IRef ref = this;
+ return ss.throttler.with_throttle_while(
+ this, get_scheduler_params(), [this] {
+ return do_recovery();
+ }).handle_exception_type([ref, this](const std::system_error& err) {
+ if (err.code() == std::make_error_code(std::errc::interrupted)) {
+ logger().debug("{} recovery interruped: {}", *pg, err.what());
+ return seastar::now();
+ }
+ return seastar::make_exception_future<>(err);
+ });
+}
+
+seastar::future<bool> UrgentRecovery::do_recovery()
+{
+ if (!pg->has_reset_since(epoch_started)) {
+ return with_blocking_future(
+ pg->get_recovery_handler()->recover_missing(soid, need)
+ ).then([] {
+ return seastar::make_ready_future<bool>(false);
+ });
+ }
+ return seastar::make_ready_future<bool>(false);
+}
+
+void UrgentRecovery::print(std::ostream &lhs) const
+{
+ lhs << "UrgentRecovery(" << pg->get_pgid() << ", "
+ << soid << ", v" << need << ")";
+}
+
+void UrgentRecovery::dump_detail(Formatter *f) const
+{
+ f->dump_stream("pgid") << pg->get_pgid();
+ f->open_object_section("recovery_detail");
+ {
+ f->dump_stream("oid") << soid;
+ f->dump_stream("version") << need;
+ }
+ f->close_section();
+}
+
+PglogBasedRecovery::PglogBasedRecovery(
+ Ref<PG> pg,
+ ShardServices &ss,
+ const epoch_t epoch_started)
+ : BackgroundRecovery(
+ std::move(pg),
+ ss,
+ epoch_started,
+ crimson::osd::scheduler::scheduler_class_t::background_recovery)
+{}
+
+seastar::future<bool> PglogBasedRecovery::do_recovery()
+{
+ if (pg->has_reset_since(epoch_started))
+ return seastar::make_ready_future<bool>(false);
+ return with_blocking_future(
+ pg->get_recovery_handler()->start_recovery_ops(
+ crimson::common::local_conf()->osd_recovery_max_single_start));
+}
+
+BackfillRecovery::BackfillRecoveryPipeline &BackfillRecovery::bp(PG &pg)
+{
+ return pg.backfill_pipeline;
+}
+
+seastar::future<bool> BackfillRecovery::do_recovery()
+{
+ logger().debug("{}", __func__);
+
+ if (pg->has_reset_since(epoch_started)) {
+ logger().debug("{}: pg got reset since epoch_started={}",
+ __func__, epoch_started);
+ return seastar::make_ready_future<bool>(false);
+ }
+ // TODO: limits
+ return with_blocking_future(
+ // process_event() of our boost::statechart machine is non-reentrant.
+ // with the backfill_pipeline we protect it from a second entry from
+ // the implementation of BackfillListener.
+ // additionally, this stage serves to synchronize with PeeringEvent.
+ handle.enter(bp(*pg).process)
+ ).then([this] {
+ pg->get_recovery_handler()->dispatch_backfill_event(std::move(evt));
+ return seastar::make_ready_future<bool>(false);
+ });
+}
+
+} // namespace crimson::osd
diff --git a/src/crimson/osd/osd_operations/background_recovery.h b/src/crimson/osd/osd_operations/background_recovery.h
new file mode 100644
index 000000000..37e46c588
--- /dev/null
+++ b/src/crimson/osd/osd_operations/background_recovery.h
@@ -0,0 +1,126 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <boost/statechart/event_base.hpp>
+
+#include "crimson/net/Connection.h"
+#include "crimson/osd/osd_operation.h"
+#include "crimson/common/type_helpers.h"
+
+#include "messages/MOSDOp.h"
+
+namespace crimson::osd {
+class PG;
+class ShardServices;
+
+class BackgroundRecovery : public OperationT<BackgroundRecovery> {
+public:
+ static constexpr OperationTypeCode type = OperationTypeCode::background_recovery;
+
+ BackgroundRecovery(
+ Ref<PG> pg,
+ ShardServices &ss,
+ epoch_t epoch_started,
+ crimson::osd::scheduler::scheduler_class_t scheduler_class);
+
+ virtual void print(std::ostream &) const;
+ seastar::future<> start();
+
+protected:
+ Ref<PG> pg;
+ const epoch_t epoch_started;
+
+private:
+ virtual void dump_detail(Formatter *f) const;
+ crimson::osd::scheduler::params_t get_scheduler_params() const {
+ return {
+ 1, // cost
+ 0, // owner
+ scheduler_class
+ };
+ }
+ virtual seastar::future<bool> do_recovery() = 0;
+ ShardServices &ss;
+ const crimson::osd::scheduler::scheduler_class_t scheduler_class;
+};
+
+/// represent a recovery initiated for serving a client request
+///
+/// unlike @c PglogBasedRecovery and @c BackfillRecovery,
+/// @c UrgentRecovery is not throttled by the scheduler. and it
+/// utilizes @c RecoveryBackend directly to recover the unreadable
+/// object.
+class UrgentRecovery final : public BackgroundRecovery {
+public:
+ UrgentRecovery(
+ const hobject_t& soid,
+ const eversion_t& need,
+ Ref<PG> pg,
+ ShardServices& ss,
+ epoch_t epoch_started)
+ : BackgroundRecovery{pg, ss, epoch_started,
+ crimson::osd::scheduler::scheduler_class_t::immediate},
+ soid{soid}, need(need) {}
+ void print(std::ostream&) const final;
+
+private:
+ void dump_detail(Formatter* f) const final;
+ seastar::future<bool> do_recovery() override;
+ const hobject_t soid;
+ const eversion_t need;
+};
+
+class PglogBasedRecovery final : public BackgroundRecovery {
+public:
+ PglogBasedRecovery(
+ Ref<PG> pg,
+ ShardServices &ss,
+ epoch_t epoch_started);
+
+private:
+ seastar::future<bool> do_recovery() override;
+};
+
+class BackfillRecovery final : public BackgroundRecovery {
+public:
+ class BackfillRecoveryPipeline {
+ OrderedPipelinePhase process = {
+ "BackfillRecovery::PGPipeline::process"
+ };
+ friend class BackfillRecovery;
+ friend class PeeringEvent;
+ };
+
+ template <class EventT>
+ BackfillRecovery(
+ Ref<PG> pg,
+ ShardServices &ss,
+ epoch_t epoch_started,
+ const EventT& evt);
+
+ static BackfillRecoveryPipeline &bp(PG &pg);
+
+private:
+ boost::intrusive_ptr<const boost::statechart::event_base> evt;
+ OrderedPipelinePhase::Handle handle;
+ seastar::future<bool> do_recovery() override;
+};
+
+template <class EventT>
+BackfillRecovery::BackfillRecovery(
+ Ref<PG> pg,
+ ShardServices &ss,
+ const epoch_t epoch_started,
+ const EventT& evt)
+ : BackgroundRecovery(
+ std::move(pg),
+ ss,
+ epoch_started,
+ crimson::osd::scheduler::scheduler_class_t::background_best_effort),
+ evt(evt.intrusive_from_this())
+{}
+
+
+}
diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc
new file mode 100644
index 000000000..87b8fc788
--- /dev/null
+++ b/src/crimson/osd/osd_operations/client_request.cc
@@ -0,0 +1,201 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <seastar/core/future.hh>
+
+#include "messages/MOSDOp.h"
+#include "messages/MOSDOpReply.h"
+
+#include "crimson/common/exception.h"
+#include "crimson/osd/pg.h"
+#include "crimson/osd/osd.h"
+#include "common/Formatter.h"
+#include "crimson/osd/osd_operations/client_request.h"
+#include "crimson/osd/osd_connection_priv.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace crimson::osd {
+
+ClientRequest::ClientRequest(
+ OSD &osd, crimson::net::ConnectionRef conn, Ref<MOSDOp> &&m)
+ : osd(osd), conn(conn), m(m)
+{}
+
+void ClientRequest::print(std::ostream &lhs) const
+{
+ lhs << *m;
+}
+
+void ClientRequest::dump_detail(Formatter *f) const
+{
+}
+
+ClientRequest::ConnectionPipeline &ClientRequest::cp()
+{
+ return get_osd_priv(conn.get()).client_request_conn_pipeline;
+}
+
+ClientRequest::PGPipeline &ClientRequest::pp(PG &pg)
+{
+ return pg.client_request_pg_pipeline;
+}
+
+bool ClientRequest::is_pg_op() const
+{
+ return std::any_of(
+ begin(m->ops), end(m->ops),
+ [](auto& op) { return ceph_osd_op_type_pg(op.op.op); });
+}
+
+seastar::future<> ClientRequest::start()
+{
+ logger().debug("{}: start", *this);
+
+ IRef opref = this;
+ return crimson::common::handle_system_shutdown(
+ [this, opref=std::move(opref)]() mutable {
+ return seastar::repeat([this, opref]() mutable {
+ return with_blocking_future(handle.enter(cp().await_map))
+ .then([this]() {
+ return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_min_epoch()));
+ }).then([this](epoch_t epoch) {
+ return with_blocking_future(handle.enter(cp().get_pg));
+ }).then([this] {
+ return with_blocking_future(osd.wait_for_pg(m->get_spg()));
+ }).then([this, opref](Ref<PG> pgref) {
+ PG &pg = *pgref;
+ if (pg.can_discard_op(*m)) {
+ return osd.send_incremental_map(conn, m->get_map_epoch());
+ }
+ return with_blocking_future(
+ handle.enter(pp(pg).await_map)
+ ).then([this, &pg]() mutable {
+ return with_blocking_future(
+ pg.osdmap_gate.wait_for_map(m->get_min_epoch()));
+ }).then([this, &pg](auto map) mutable {
+ return with_blocking_future(
+ handle.enter(pp(pg).wait_for_active));
+ }).then([this, &pg]() mutable {
+ return with_blocking_future(pg.wait_for_active_blocker.wait());
+ }).then([this, pgref=std::move(pgref)]() mutable {
+ if (m->finish_decode()) {
+ m->clear_payload();
+ }
+ if (is_pg_op()) {
+ return process_pg_op(pgref);
+ } else {
+ return process_op(pgref);
+ }
+ });
+ }).then([] {
+ return seastar::stop_iteration::yes;
+ }).handle_exception_type([](crimson::common::actingset_changed& e) {
+ if (e.is_primary()) {
+ logger().debug("operation restart, acting set changed");
+ return seastar::stop_iteration::no;
+ } else {
+ logger().debug("operation abort, up primary changed");
+ return seastar::stop_iteration::yes;
+ }
+ });
+ });
+ });
+}
+
+seastar::future<> ClientRequest::process_pg_op(
+ Ref<PG> &pg)
+{
+ return pg->do_pg_ops(m)
+ .then([this, pg=std::move(pg)](Ref<MOSDOpReply> reply) {
+ return conn->send(reply);
+ });
+}
+
+seastar::future<> ClientRequest::process_op(
+ Ref<PG> &pgref)
+{
+ PG& pg = *pgref;
+ return with_blocking_future(
+ handle.enter(pp(pg).recover_missing)
+ ).then([this, &pg, pgref] {
+ eversion_t ver;
+ const hobject_t& soid = m->get_hobj();
+ logger().debug("{} check for recovery, {}", *this, soid);
+ if (pg.is_unreadable_object(soid, &ver) ||
+ pg.is_degraded_or_backfilling_object(soid)) {
+ logger().debug("{} need to wait for recovery, {}", *this, soid);
+ if (pg.get_recovery_backend()->is_recovering(soid)) {
+ return pg.get_recovery_backend()->get_recovering(soid).wait_for_recovered();
+ } else {
+ auto [op, fut] = osd.get_shard_services().start_operation<UrgentRecovery>(
+ soid, ver, pgref, osd.get_shard_services(), pg.get_osdmap_epoch());
+ return std::move(fut);
+ }
+ }
+ return seastar::now();
+ }).then([this, &pg] {
+ return with_blocking_future(handle.enter(pp(pg).get_obc));
+ }).then([this, &pg]() -> PG::load_obc_ertr::future<> {
+ op_info.set_from_op(&*m, *pg.get_osdmap());
+ return pg.with_locked_obc(m, op_info, this, [this, &pg](auto obc) {
+ return with_blocking_future(
+ handle.enter(pp(pg).process)
+ ).then([this, &pg, obc] {
+ if (!pg.is_primary()) {
+ // primary can handle both normal ops and balanced reads
+ if (is_misdirected(pg)) {
+ logger().trace("process_op: dropping misdirected op");
+ return seastar::make_ready_future<Ref<MOSDOpReply>>();
+ } else if (const hobject_t& hoid = m->get_hobj();
+ !pg.get_peering_state().can_serve_replica_read(hoid)) {
+ auto reply = make_message<MOSDOpReply>(
+ m.get(), -EAGAIN, pg.get_osdmap_epoch(),
+ m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK),
+ !m->has_flag(CEPH_OSD_FLAG_RETURNVEC));
+ return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
+ }
+ }
+ return pg.do_osd_ops(m, obc, op_info);
+ }).then([this](Ref<MOSDOpReply> reply) {
+ if (reply) {
+ return conn->send(std::move(reply));
+ } else {
+ return seastar::now();
+ }
+ });
+ });
+ }).safe_then([pgref=std::move(pgref)] {
+ return seastar::now();
+ }, PG::load_obc_ertr::all_same_way([](auto &code) {
+ logger().error("ClientRequest saw error code {}", code);
+ return seastar::now();
+ }));
+}
+
+bool ClientRequest::is_misdirected(const PG& pg) const
+{
+ // otherwise take a closer look
+ if (const int flags = m->get_flags();
+ flags & CEPH_OSD_FLAG_BALANCE_READS ||
+ flags & CEPH_OSD_FLAG_LOCALIZE_READS) {
+ if (!op_info.may_read()) {
+ // no read found, so it can't be balanced read
+ return true;
+ }
+ if (op_info.may_write() || op_info.may_cache()) {
+ // write op, but i am not primary
+ return true;
+ }
+ // balanced reads; any replica will do
+ return pg.is_nonprimary();
+ }
+ // neither balanced nor localize reads
+ return true;
+}
+
+}
diff --git a/src/crimson/osd/osd_operations/client_request.h b/src/crimson/osd/osd_operations/client_request.h
new file mode 100644
index 000000000..ea3124a93
--- /dev/null
+++ b/src/crimson/osd/osd_operations/client_request.h
@@ -0,0 +1,76 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "osd/osd_op_util.h"
+#include "crimson/net/Connection.h"
+#include "crimson/osd/osd_operation.h"
+#include "crimson/common/type_helpers.h"
+#include "messages/MOSDOp.h"
+
+namespace crimson::osd {
+class PG;
+class OSD;
+
+class ClientRequest final : public OperationT<ClientRequest> {
+ OSD &osd;
+ crimson::net::ConnectionRef conn;
+ Ref<MOSDOp> m;
+ OpInfo op_info;
+ OrderedPipelinePhase::Handle handle;
+
+public:
+ class ConnectionPipeline {
+ OrderedPipelinePhase await_map = {
+ "ClientRequest::ConnectionPipeline::await_map"
+ };
+ OrderedPipelinePhase get_pg = {
+ "ClientRequest::ConnectionPipeline::get_pg"
+ };
+ friend class ClientRequest;
+ };
+ class PGPipeline {
+ OrderedPipelinePhase await_map = {
+ "ClientRequest::PGPipeline::await_map"
+ };
+ OrderedPipelinePhase wait_for_active = {
+ "ClientRequest::PGPipeline::wait_for_active"
+ };
+ OrderedPipelinePhase recover_missing = {
+ "ClientRequest::PGPipeline::recover_missing"
+ };
+ OrderedPipelinePhase get_obc = {
+ "ClientRequest::PGPipeline::get_obc"
+ };
+ OrderedPipelinePhase process = {
+ "ClientRequest::PGPipeline::process"
+ };
+ friend class ClientRequest;
+ };
+
+ static constexpr OperationTypeCode type = OperationTypeCode::client_request;
+
+ ClientRequest(OSD &osd, crimson::net::ConnectionRef, Ref<MOSDOp> &&m);
+
+ void print(std::ostream &) const final;
+ void dump_detail(Formatter *f) const final;
+
+public:
+ seastar::future<> start();
+
+private:
+ seastar::future<> process_pg_op(
+ Ref<PG> &pg);
+ seastar::future<> process_op(
+ Ref<PG> &pg);
+ bool is_pg_op() const;
+
+ ConnectionPipeline &cp();
+ PGPipeline &pp(PG &pg);
+
+private:
+ bool is_misdirected(const PG& pg) const;
+};
+
+}
diff --git a/src/crimson/osd/osd_operations/compound_peering_request.cc b/src/crimson/osd/osd_operations/compound_peering_request.cc
new file mode 100644
index 000000000..e55760096
--- /dev/null
+++ b/src/crimson/osd/osd_operations/compound_peering_request.cc
@@ -0,0 +1,170 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <seastar/core/future.hh>
+
+#include "osd/PeeringState.h"
+
+#include "messages/MOSDPGQuery.h"
+#include "messages/MOSDPGCreate2.h"
+
+#include "common/Formatter.h"
+
+#include "crimson/common/exception.h"
+#include "crimson/osd/pg.h"
+#include "crimson/osd/osd.h"
+#include "crimson/osd/osd_operations/compound_peering_request.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace {
+using namespace crimson::osd;
+
+struct compound_state {
+ seastar::promise<BufferedRecoveryMessages> promise;
+ // assuming crimson-osd won't need to be compatible with pre-octopus
+ // releases
+ BufferedRecoveryMessages ctx{ceph_release_t::octopus};
+ compound_state() = default;
+ ~compound_state() {
+ promise.set_value(std::move(ctx));
+ }
+};
+using compound_state_ref = seastar::lw_shared_ptr<compound_state>;
+
+class PeeringSubEvent : public RemotePeeringEvent {
+ compound_state_ref state;
+public:
+ template <typename... Args>
+ PeeringSubEvent(compound_state_ref state, Args &&... args) :
+ RemotePeeringEvent(std::forward<Args>(args)...), state(state) {}
+
+ seastar::future<> complete_rctx(Ref<crimson::osd::PG> pg) final {
+ logger().debug("{}: submitting ctx transaction", *this);
+ state->ctx.accept_buffered_messages(ctx);
+ state = {};
+ if (!pg) {
+ ceph_assert(ctx.transaction.empty());
+ return seastar::now();
+ } else {
+ return osd.get_shard_services().dispatch_context_transaction(
+ pg->get_collection_ref(), ctx);
+ }
+ }
+};
+
+std::vector<OperationRef> handle_pg_create(
+ OSD &osd,
+ crimson::net::ConnectionRef conn,
+ compound_state_ref state,
+ Ref<MOSDPGCreate2> m)
+{
+ std::vector<OperationRef> ret;
+ for (auto& [pgid, when] : m->pgs) {
+ const auto &[created, created_stamp] = when;
+ auto q = m->pg_extra.find(pgid);
+ ceph_assert(q != m->pg_extra.end());
+ auto& [history, pi] = q->second;
+ logger().debug(
+ "{}: {} e{} @{} "
+ "history {} pi {}",
+ __func__, pgid, created, created_stamp,
+ history, pi);
+ if (!pi.empty() &&
+ m->epoch < pi.get_bounds().second) {
+ logger().error(
+ "got pg_create on {} epoch {} "
+ "unmatched past_intervals {} (history {})",
+ pgid, m->epoch,
+ pi, history);
+ } else {
+ auto op = osd.get_shard_services().start_operation<PeeringSubEvent>(
+ state,
+ osd,
+ conn,
+ osd.get_shard_services(),
+ pg_shard_t(),
+ pgid,
+ m->epoch,
+ m->epoch,
+ NullEvt(),
+ true,
+ new PGCreateInfo(pgid, m->epoch, history, pi, true)).first;
+ ret.push_back(op);
+ }
+ }
+ return ret;
+}
+
+struct SubOpBlocker : BlockerT<SubOpBlocker> {
+ static constexpr const char * type_name = "CompoundOpBlocker";
+
+ std::vector<OperationRef> subops;
+ SubOpBlocker(std::vector<OperationRef> &&subops) : subops(subops) {}
+
+ virtual void dump_detail(Formatter *f) const {
+ f->open_array_section("dependent_operations");
+ {
+ for (auto &i : subops) {
+ i->dump_brief(f);
+ }
+ }
+ f->close_section();
+ }
+};
+
+} // namespace
+
+namespace crimson::osd {
+
+CompoundPeeringRequest::CompoundPeeringRequest(
+ OSD &osd, crimson::net::ConnectionRef conn, Ref<Message> m)
+ : osd(osd),
+ conn(conn),
+ m(m)
+{}
+
+void CompoundPeeringRequest::print(std::ostream &lhs) const
+{
+ lhs << *m;
+}
+
+void CompoundPeeringRequest::dump_detail(Formatter *f) const
+{
+ f->dump_stream("message") << *m;
+}
+
+seastar::future<> CompoundPeeringRequest::start()
+{
+ logger().info("{}: starting", *this);
+ auto state = seastar::make_lw_shared<compound_state>();
+ auto blocker = std::make_unique<SubOpBlocker>(
+ [&] {
+ assert((m->get_type() == MSG_OSD_PG_CREATE2));
+ return handle_pg_create(
+ osd,
+ conn,
+ state,
+ boost::static_pointer_cast<MOSDPGCreate2>(m));
+ }());
+
+ IRef ref = this;
+ logger().info("{}: about to fork future", *this);
+ return crimson::common::handle_system_shutdown(
+ [this, ref, blocker=std::move(blocker), state]() mutable {
+ return with_blocking_future(
+ blocker->make_blocking_future(state->promise.get_future())
+ ).then([this, blocker=std::move(blocker)](auto &&ctx) {
+ logger().info("{}: sub events complete", *this);
+ return osd.get_shard_services().dispatch_context_messages(std::move(ctx));
+ }).then([this, ref=std::move(ref)] {
+ logger().info("{}: complete", *this);
+ });
+ });
+}
+
+} // namespace crimson::osd
diff --git a/src/crimson/osd/osd_operations/compound_peering_request.h b/src/crimson/osd/osd_operations/compound_peering_request.h
new file mode 100644
index 000000000..495306d75
--- /dev/null
+++ b/src/crimson/osd/osd_operations/compound_peering_request.h
@@ -0,0 +1,40 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <iostream>
+#include <seastar/core/future.hh>
+
+#include "msg/MessageRef.h"
+
+#include "crimson/net/Connection.h"
+#include "crimson/osd/osd_operation.h"
+
+namespace crimson::osd {
+
+class OSD;
+class PG;
+
+using osd_id_t = int;
+
+class CompoundPeeringRequest : public OperationT<CompoundPeeringRequest> {
+public:
+ static constexpr OperationTypeCode type =
+ OperationTypeCode::compound_peering_request;
+
+private:
+ OSD &osd;
+ crimson::net::ConnectionRef conn;
+ Ref<Message> m;
+
+public:
+ CompoundPeeringRequest(
+ OSD &osd, crimson::net::ConnectionRef conn, Ref<Message> m);
+
+ void print(std::ostream &) const final;
+ void dump_detail(Formatter *f) const final;
+ seastar::future<> start();
+};
+
+}
diff --git a/src/crimson/osd/osd_operations/osdop_params.h b/src/crimson/osd/osd_operations/osdop_params.h
new file mode 100644
index 000000000..a0bd9dcbb
--- /dev/null
+++ b/src/crimson/osd/osd_operations/osdop_params.h
@@ -0,0 +1,27 @@
+#pragma once
+
+#include "messages/MOSDOp.h"
+#include "osd/osd_types.h"
+#include "crimson/common/type_helpers.h"
+
+// The fields in this struct are parameters that may be needed in multiple
+// level of processing. I inclosed all those parameters in this struct to
+// avoid passing each of them as a method parameter.
+struct osd_op_params_t {
+ Ref<MOSDOp> req;
+ eversion_t at_version;
+ eversion_t pg_trim_to;
+ eversion_t min_last_complete_ondisk;
+ eversion_t last_complete;
+ version_t user_at_version = 0;
+ bool user_modify = false;
+ ObjectCleanRegions clean_regions;
+
+ osd_op_params_t() = default;
+ osd_op_params_t(Ref<MOSDOp>&& req) : req(req) {}
+ osd_op_params_t(Ref<MOSDOp>&& req, eversion_t at_version, eversion_t pg_trim_to,
+ eversion_t mlcod, eversion_t lc, version_t user_at_version) :
+ req(req), at_version(at_version), pg_trim_to(pg_trim_to),
+ min_last_complete_ondisk(mlcod), last_complete(lc),
+ user_at_version(user_at_version) {}
+};
diff --git a/src/crimson/osd/osd_operations/peering_event.cc b/src/crimson/osd/osd_operations/peering_event.cc
new file mode 100644
index 000000000..d3c6ccf81
--- /dev/null
+++ b/src/crimson/osd/osd_operations/peering_event.cc
@@ -0,0 +1,173 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <seastar/core/future.hh>
+
+#include "messages/MOSDPGLog.h"
+
+#include "common/Formatter.h"
+#include "crimson/osd/pg.h"
+#include "crimson/osd/osd.h"
+#include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/osd_connection_priv.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace crimson::osd {
+
+void PeeringEvent::print(std::ostream &lhs) const
+{
+ lhs << "PeeringEvent("
+ << "from=" << from
+ << " pgid=" << pgid
+ << " sent=" << evt.get_epoch_sent()
+ << " requested=" << evt.get_epoch_requested()
+ << " evt=" << evt.get_desc()
+ << ")";
+}
+
+void PeeringEvent::dump_detail(Formatter *f) const
+{
+ f->open_object_section("PeeringEvent");
+ f->dump_stream("from") << from;
+ f->dump_stream("pgid") << pgid;
+ f->dump_int("sent", evt.get_epoch_sent());
+ f->dump_int("requested", evt.get_epoch_requested());
+ f->dump_string("evt", evt.get_desc());
+ f->close_section();
+}
+
+
+PeeringEvent::PGPipeline &PeeringEvent::pp(PG &pg)
+{
+ return pg.peering_request_pg_pipeline;
+}
+
+seastar::future<> PeeringEvent::start()
+{
+
+ logger().debug("{}: start", *this);
+
+ IRef ref = this;
+ return [this] {
+ if (delay) {
+ return seastar::sleep(std::chrono::milliseconds(
+ std::lround(delay*1000)));
+ } else {
+ return seastar::now();
+ }
+ }().then([this] {
+ return get_pg();
+ }).then([this](Ref<PG> pg) {
+ if (!pg) {
+ logger().warn("{}: pg absent, did not create", *this);
+ on_pg_absent();
+ handle.exit();
+ return complete_rctx(pg);
+ } else {
+ logger().debug("{}: pg present", *this);
+ return with_blocking_future(handle.enter(pp(*pg).await_map)
+ ).then([this, pg] {
+ return with_blocking_future(
+ pg->osdmap_gate.wait_for_map(evt.get_epoch_sent()));
+ }).then([this, pg](auto) {
+ return with_blocking_future(handle.enter(pp(*pg).process));
+ }).then([this, pg] {
+ // TODO: likely we should synchronize also with the pg log-based
+ // recovery.
+ return with_blocking_future(
+ handle.enter(BackfillRecovery::bp(*pg).process));
+ }).then([this, pg] {
+ pg->do_peering_event(evt, ctx);
+ handle.exit();
+ return complete_rctx(pg);
+ }).then([this, pg] {
+ return pg->get_need_up_thru() ? shard_services.send_alive(pg->get_same_interval_since())
+ : seastar::now();
+ });
+ }
+ }).then([this] {
+ return shard_services.send_pg_temp();
+ }).then([this, ref=std::move(ref)] {
+ logger().debug("{}: complete", *this);
+ });
+}
+
+void PeeringEvent::on_pg_absent()
+{
+ logger().debug("{}: pg absent, dropping", *this);
+}
+
+seastar::future<> PeeringEvent::complete_rctx(Ref<PG> pg)
+{
+ logger().debug("{}: submitting ctx", *this);
+ return shard_services.dispatch_context(
+ pg->get_collection_ref(),
+ std::move(ctx));
+}
+
+RemotePeeringEvent::ConnectionPipeline &RemotePeeringEvent::cp()
+{
+ return get_osd_priv(conn.get()).peering_request_conn_pipeline;
+}
+
+void RemotePeeringEvent::on_pg_absent()
+{
+ if (auto& e = get_event().get_event();
+ e.dynamic_type() == MQuery::static_type()) {
+ const auto map_epoch =
+ shard_services.get_osdmap_service().get_map()->get_epoch();
+ const auto& q = static_cast<const MQuery&>(e);
+ const pg_info_t empty{spg_t{pgid.pgid, q.query.to}};
+ if (q.query.type == q.query.LOG ||
+ q.query.type == q.query.FULLLOG) {
+ auto m = ceph::make_message<MOSDPGLog>(q.query.from, q.query.to,
+ map_epoch, empty,
+ q.query.epoch_sent);
+ ctx.send_osd_message(q.from.osd, std::move(m));
+ } else {
+ ctx.send_notify(q.from.osd, {q.query.from, q.query.to,
+ q.query.epoch_sent,
+ map_epoch, empty,
+ PastIntervals{}});
+ }
+ }
+}
+
+seastar::future<> RemotePeeringEvent::complete_rctx(Ref<PG> pg)
+{
+ if (pg) {
+ return PeeringEvent::complete_rctx(pg);
+ } else {
+ return shard_services.dispatch_context_messages(std::move(ctx));
+ }
+}
+
+seastar::future<Ref<PG>> RemotePeeringEvent::get_pg()
+{
+ return with_blocking_future(
+ handle.enter(cp().await_map)
+ ).then([this] {
+ return with_blocking_future(
+ osd.osdmap_gate.wait_for_map(evt.get_epoch_sent()));
+ }).then([this](auto epoch) {
+ logger().debug("{}: got map {}", *this, epoch);
+ return with_blocking_future(handle.enter(cp().get_pg));
+ }).then([this] {
+ return with_blocking_future(
+ osd.get_or_create_pg(
+ pgid, evt.get_epoch_sent(), std::move(evt.create_info)));
+ });
+}
+
+seastar::future<Ref<PG>> LocalPeeringEvent::get_pg() {
+ return seastar::make_ready_future<Ref<PG>>(pg);
+}
+
+LocalPeeringEvent::~LocalPeeringEvent() {}
+
+}
diff --git a/src/crimson/osd/osd_operations/peering_event.h b/src/crimson/osd/osd_operations/peering_event.h
new file mode 100644
index 000000000..3a6c0678c
--- /dev/null
+++ b/src/crimson/osd/osd_operations/peering_event.h
@@ -0,0 +1,142 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <iostream>
+#include <seastar/core/future.hh>
+
+#include "crimson/osd/osd_operation.h"
+#include "osd/osd_types.h"
+#include "osd/PGPeeringEvent.h"
+#include "osd/PeeringState.h"
+
+namespace ceph {
+ class Formatter;
+}
+
+namespace crimson::osd {
+
+class OSD;
+class ShardServices;
+class PG;
+
+class PeeringEvent : public OperationT<PeeringEvent> {
+public:
+ static constexpr OperationTypeCode type = OperationTypeCode::peering_event;
+
+ class PGPipeline {
+ OrderedPipelinePhase await_map = {
+ "PeeringEvent::PGPipeline::await_map"
+ };
+ OrderedPipelinePhase process = {
+ "PeeringEvent::PGPipeline::process"
+ };
+ friend class PeeringEvent;
+ friend class PGAdvanceMap;
+ };
+
+protected:
+ OrderedPipelinePhase::Handle handle;
+ PGPipeline &pp(PG &pg);
+
+ ShardServices &shard_services;
+ PeeringCtx ctx;
+ pg_shard_t from;
+ spg_t pgid;
+ float delay = 0;
+ PGPeeringEvent evt;
+
+ const pg_shard_t get_from() const {
+ return from;
+ }
+
+ const spg_t get_pgid() const {
+ return pgid;
+ }
+
+ const PGPeeringEvent &get_event() const {
+ return evt;
+ }
+
+ virtual void on_pg_absent();
+ virtual seastar::future<> complete_rctx(Ref<PG>);
+ virtual seastar::future<Ref<PG>> get_pg() = 0;
+
+public:
+ template <typename... Args>
+ PeeringEvent(
+ ShardServices &shard_services, const pg_shard_t &from, const spg_t &pgid,
+ Args&&... args) :
+ shard_services(shard_services),
+ ctx{ceph_release_t::octopus},
+ from(from),
+ pgid(pgid),
+ evt(std::forward<Args>(args)...)
+ {}
+ template <typename... Args>
+ PeeringEvent(
+ ShardServices &shard_services, const pg_shard_t &from, const spg_t &pgid,
+ float delay, Args&&... args) :
+ shard_services(shard_services),
+ ctx{ceph_release_t::octopus},
+ from(from),
+ pgid(pgid),
+ delay(delay),
+ evt(std::forward<Args>(args)...)
+ {}
+
+ void print(std::ostream &) const final;
+ void dump_detail(ceph::Formatter* f) const final;
+ seastar::future<> start();
+};
+
+class RemotePeeringEvent : public PeeringEvent {
+protected:
+ OSD &osd;
+ crimson::net::ConnectionRef conn;
+
+ void on_pg_absent() final;
+ seastar::future<> complete_rctx(Ref<PG> pg) override;
+ seastar::future<Ref<PG>> get_pg() final;
+
+public:
+ class ConnectionPipeline {
+ OrderedPipelinePhase await_map = {
+ "PeeringRequest::ConnectionPipeline::await_map"
+ };
+ OrderedPipelinePhase get_pg = {
+ "PeeringRequest::ConnectionPipeline::get_pg"
+ };
+ friend class RemotePeeringEvent;
+ };
+
+ template <typename... Args>
+ RemotePeeringEvent(OSD &osd, crimson::net::ConnectionRef conn, Args&&... args) :
+ PeeringEvent(std::forward<Args>(args)...),
+ osd(osd),
+ conn(conn)
+ {}
+
+private:
+ ConnectionPipeline &cp();
+};
+
+class LocalPeeringEvent final : public PeeringEvent {
+protected:
+ seastar::future<Ref<PG>> get_pg() final;
+
+ Ref<PG> pg;
+
+public:
+ template <typename... Args>
+ LocalPeeringEvent(Ref<PG> pg, Args&&... args) :
+ PeeringEvent(std::forward<Args>(args)...),
+ pg(pg)
+ {}
+
+ virtual ~LocalPeeringEvent();
+};
+
+
+}
diff --git a/src/crimson/osd/osd_operations/pg_advance_map.cc b/src/crimson/osd/osd_operations/pg_advance_map.cc
new file mode 100644
index 000000000..a96479d40
--- /dev/null
+++ b/src/crimson/osd/osd_operations/pg_advance_map.cc
@@ -0,0 +1,97 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "crimson/osd/osd_operations/pg_advance_map.h"
+
+#include <boost/smart_ptr/local_shared_ptr.hpp>
+#include <seastar/core/future.hh>
+
+#include "include/types.h"
+#include "common/Formatter.h"
+#include "crimson/osd/pg.h"
+#include "crimson/osd/osd.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace crimson::osd {
+
+PGAdvanceMap::PGAdvanceMap(
+ OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to,
+ PeeringCtx &&rctx, bool do_init)
+ : osd(osd), pg(pg), from(from), to(to),
+ rctx(std::move(rctx)), do_init(do_init) {}
+
+PGAdvanceMap::~PGAdvanceMap() {}
+
+void PGAdvanceMap::print(std::ostream &lhs) const
+{
+ lhs << "PGAdvanceMap("
+ << "pg=" << pg->get_pgid()
+ << " from=" << from
+ << " to=" << to;
+ if (do_init) {
+ lhs << " do_init";
+ }
+ lhs << ")";
+}
+
+void PGAdvanceMap::dump_detail(Formatter *f) const
+{
+ f->open_object_section("PGAdvanceMap");
+ f->dump_stream("pgid") << pg->get_pgid();
+ f->dump_int("from", from);
+ f->dump_int("to", to);
+ f->dump_bool("do_init", do_init);
+ f->close_section();
+}
+
+seastar::future<> PGAdvanceMap::start()
+{
+ using cached_map_t = boost::local_shared_ptr<const OSDMap>;
+
+ logger().debug("{}: start", *this);
+
+ IRef ref = this;
+ return with_blocking_future(
+ handle.enter(pg->peering_request_pg_pipeline.process))
+ .then([this] {
+ if (do_init) {
+ pg->handle_initialize(rctx);
+ pg->handle_activate_map(rctx);
+ }
+ return seastar::do_for_each(
+ boost::make_counting_iterator(from + 1),
+ boost::make_counting_iterator(to + 1),
+ [this](epoch_t next_epoch) {
+ return osd.get_map(next_epoch).then(
+ [this] (cached_map_t&& next_map) {
+ pg->handle_advance_map(next_map, rctx);
+ });
+ }).then([this] {
+ pg->handle_activate_map(rctx);
+ handle.exit();
+ if (do_init) {
+ osd.pg_map.pg_created(pg->get_pgid(), pg);
+ osd.shard_services.inc_pg_num();
+ logger().info("PGAdvanceMap::start new pg {}", *pg);
+ }
+ return seastar::when_all_succeed(
+ pg->get_need_up_thru() \
+ ? osd.shard_services.send_alive(pg->get_same_interval_since())
+ : seastar::now(),
+ osd.shard_services.dispatch_context(
+ pg->get_collection_ref(),
+ std::move(rctx)));
+ }).then_unpack([this] {
+ return osd.shard_services.send_pg_temp();
+ });
+ }).then([this, ref=std::move(ref)] {
+ logger().debug("{}: complete", *this);
+ });
+}
+
+}
diff --git a/src/crimson/osd/osd_operations/pg_advance_map.h b/src/crimson/osd/osd_operations/pg_advance_map.h
new file mode 100644
index 000000000..1b27037eb
--- /dev/null
+++ b/src/crimson/osd/osd_operations/pg_advance_map.h
@@ -0,0 +1,50 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <iostream>
+#include <seastar/core/future.hh>
+
+#include "crimson/osd/osd_operation.h"
+#include "osd/osd_types.h"
+#include "crimson/common/type_helpers.h"
+#include "osd/PeeringState.h"
+
+namespace ceph {
+ class Formatter;
+}
+
+namespace crimson::osd {
+
+class OSD;
+class PG;
+
+class PGAdvanceMap : public OperationT<PGAdvanceMap> {
+public:
+ static constexpr OperationTypeCode type = OperationTypeCode::pg_advance_map;
+
+protected:
+ OrderedPipelinePhase::Handle handle;
+
+ OSD &osd;
+ Ref<PG> pg;
+
+ epoch_t from;
+ epoch_t to;
+
+ PeeringCtx rctx;
+ const bool do_init;
+
+public:
+ PGAdvanceMap(
+ OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to,
+ PeeringCtx &&rctx, bool do_init);
+ ~PGAdvanceMap();
+
+ void print(std::ostream &) const final;
+ void dump_detail(ceph::Formatter *f) const final;
+ seastar::future<> start();
+};
+
+}
diff --git a/src/crimson/osd/osd_operations/recovery_subrequest.cc b/src/crimson/osd/osd_operations/recovery_subrequest.cc
new file mode 100644
index 000000000..820c7beab
--- /dev/null
+++ b/src/crimson/osd/osd_operations/recovery_subrequest.cc
@@ -0,0 +1,29 @@
+#include <fmt/format.h>
+#include <fmt/ostream.h>
+
+#include "crimson/osd/osd_operations/recovery_subrequest.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace crimson::osd {
+
+seastar::future<> RecoverySubRequest::start() {
+ logger().debug("{}: start", *this);
+
+ IRef opref = this;
+ return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_min_epoch()))
+ .then([this] (epoch_t epoch) {
+ return with_blocking_future(osd.wait_for_pg(m->get_spg()));
+ }).then([this, opref=std::move(opref)] (Ref<PG> pgref) {
+ return seastar::do_with(std::move(pgref), std::move(opref),
+ [this](auto& pgref, auto& opref) {
+ return pgref->get_recovery_backend()->handle_recovery_op(m);
+ });
+ });
+}
+
+}
diff --git a/src/crimson/osd/osd_operations/recovery_subrequest.h b/src/crimson/osd/osd_operations/recovery_subrequest.h
new file mode 100644
index 000000000..b151e5c1d
--- /dev/null
+++ b/src/crimson/osd/osd_operations/recovery_subrequest.h
@@ -0,0 +1,45 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "osd/osd_op_util.h"
+#include "crimson/net/Connection.h"
+#include "crimson/osd/osd_operation.h"
+#include "crimson/osd/osd.h"
+#include "crimson/common/type_helpers.h"
+#include "messages/MOSDPGPull.h"
+#include "messages/MOSDPGPush.h"
+#include "messages/MOSDPGPushReply.h"
+#include "messages/MOSDPGRecoveryDelete.h"
+#include "messages/MOSDPGRecoveryDeleteReply.h"
+
+namespace crimson::osd {
+
+class OSD;
+class PG;
+
+class RecoverySubRequest final : public OperationT<RecoverySubRequest> {
+public:
+ static constexpr OperationTypeCode type = OperationTypeCode::background_recovery_sub;
+
+ RecoverySubRequest(OSD &osd, crimson::net::ConnectionRef conn, Ref<MOSDFastDispatchOp>&& m)
+ : osd(osd), conn(conn), m(m) {}
+
+ void print(std::ostream& out) const final
+ {
+ out << *m;
+ }
+
+ void dump_detail(Formatter *f) const final
+ {
+ }
+
+ seastar::future<> start();
+private:
+ OSD& osd;
+ crimson::net::ConnectionRef conn;
+ Ref<MOSDFastDispatchOp> m;
+};
+
+}
diff --git a/src/crimson/osd/osd_operations/replicated_request.cc b/src/crimson/osd/osd_operations/replicated_request.cc
new file mode 100644
index 000000000..34487f9e4
--- /dev/null
+++ b/src/crimson/osd/osd_operations/replicated_request.cc
@@ -0,0 +1,74 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "replicated_request.h"
+
+#include "common/Formatter.h"
+#include "messages/MOSDRepOp.h"
+
+#include "crimson/osd/osd.h"
+#include "crimson/osd/osd_connection_priv.h"
+#include "crimson/osd/pg.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace crimson::osd {
+
+RepRequest::RepRequest(OSD &osd,
+ crimson::net::ConnectionRef&& conn,
+ Ref<MOSDRepOp> &&req)
+ : osd{osd},
+ conn{std::move(conn)},
+ req{req}
+{}
+
+void RepRequest::print(std::ostream& os) const
+{
+ os << "RepRequest("
+ << "from=" << req->from
+ << " req=" << *req
+ << ")";
+}
+
+void RepRequest::dump_detail(Formatter *f) const
+{
+ f->open_object_section("RepRequest");
+ f->dump_stream("reqid") << req->reqid;
+ f->dump_stream("pgid") << req->get_spg();
+ f->dump_unsigned("map_epoch", req->get_map_epoch());
+ f->dump_unsigned("min_epoch", req->get_min_epoch());
+ f->dump_stream("oid") << req->poid;
+ f->dump_stream("from") << req->from;
+ f->close_section();
+}
+
+RepRequest::ConnectionPipeline &RepRequest::cp()
+{
+ return get_osd_priv(conn.get()).replicated_request_conn_pipeline;
+}
+
+RepRequest::PGPipeline &RepRequest::pp(PG &pg)
+{
+ return pg.replicated_request_pg_pipeline;
+}
+
+seastar::future<> RepRequest::start()
+{
+ logger().debug("{} start", *this);
+ IRef ref = this;
+ return with_blocking_future(handle.enter(cp().await_map))
+ .then([this]() {
+ return with_blocking_future(osd.osdmap_gate.wait_for_map(req->get_min_epoch()));
+ }).then([this](epoch_t epoch) {
+ return with_blocking_future(handle.enter(cp().get_pg));
+ }).then([this] {
+ return with_blocking_future(osd.wait_for_pg(req->get_spg()));
+ }).then([this, ref=std::move(ref)](Ref<PG> pg) {
+ return pg->handle_rep_op(std::move(req));
+ });
+}
+}
diff --git a/src/crimson/osd/osd_operations/replicated_request.h b/src/crimson/osd/osd_operations/replicated_request.h
new file mode 100644
index 000000000..8e9cfc9fe
--- /dev/null
+++ b/src/crimson/osd/osd_operations/replicated_request.h
@@ -0,0 +1,58 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/net/Connection.h"
+#include "crimson/osd/osd_operation.h"
+#include "crimson/common/type_helpers.h"
+
+class MOSDRepOp;
+
+namespace ceph {
+ class Formatter;
+}
+
+namespace crimson::osd {
+
+class OSD;
+class PG;
+
+class RepRequest final : public OperationT<RepRequest> {
+public:
+ class ConnectionPipeline {
+ OrderedPipelinePhase await_map = {
+ "RepRequest::ConnectionPipeline::await_map"
+ };
+ OrderedPipelinePhase get_pg = {
+ "RepRequest::ConnectionPipeline::get_pg"
+ };
+ friend RepRequest;
+ };
+ class PGPipeline {
+ OrderedPipelinePhase await_map = {
+ "RepRequest::PGPipeline::await_map"
+ };
+ OrderedPipelinePhase process = {
+ "RepRequest::PGPipeline::process"
+ };
+ friend RepRequest;
+ };
+ static constexpr OperationTypeCode type = OperationTypeCode::replicated_request;
+ RepRequest(OSD&, crimson::net::ConnectionRef&&, Ref<MOSDRepOp>&&);
+
+ void print(std::ostream &) const final;
+ void dump_detail(ceph::Formatter* f) const final;
+ seastar::future<> start();
+
+private:
+ ConnectionPipeline &cp();
+ PGPipeline &pp(PG &pg);
+
+ OSD &osd;
+ crimson::net::ConnectionRef conn;
+ Ref<MOSDRepOp> req;
+ OrderedPipelinePhase::Handle handle;
+};
+
+}