summaryrefslogtreecommitdiffstats
path: root/src/crimson/osd/osd_operations
diff options
context:
space:
mode:
Diffstat (limited to 'src/crimson/osd/osd_operations')
-rw-r--r--src/crimson/osd/osd_operations/background_recovery.cc207
-rw-r--r--src/crimson/osd/osd_operations/background_recovery.h144
-rw-r--r--src/crimson/osd/osd_operations/client_request.cc388
-rw-r--r--src/crimson/osd/osd_operations/client_request.h281
-rw-r--r--src/crimson/osd/osd_operations/client_request_common.cc64
-rw-r--r--src/crimson/osd/osd_operations/client_request_common.h20
-rw-r--r--src/crimson/osd/osd_operations/common/pg_pipeline.h31
-rw-r--r--src/crimson/osd/osd_operations/internal_client_request.cc130
-rw-r--r--src/crimson/osd/osd_operations/internal_client_request.h68
-rw-r--r--src/crimson/osd/osd_operations/logmissing_request.cc79
-rw-r--r--src/crimson/osd/osd_operations/logmissing_request.h81
-rw-r--r--src/crimson/osd/osd_operations/logmissing_request_reply.cc68
-rw-r--r--src/crimson/osd/osd_operations/logmissing_request_reply.h79
-rw-r--r--src/crimson/osd/osd_operations/osdop_params.h22
-rw-r--r--src/crimson/osd/osd_operations/peering_event.cc190
-rw-r--r--src/crimson/osd/osd_operations/peering_event.h207
-rw-r--r--src/crimson/osd/osd_operations/pg_advance_map.cc130
-rw-r--r--src/crimson/osd/osd_operations/pg_advance_map.h61
-rw-r--r--src/crimson/osd/osd_operations/recovery_subrequest.cc46
-rw-r--r--src/crimson/osd/osd_operations/recovery_subrequest.h81
-rw-r--r--src/crimson/osd/osd_operations/replicated_request.cc80
-rw-r--r--src/crimson/osd/osd_operations/replicated_request.h80
-rw-r--r--src/crimson/osd/osd_operations/snaptrim_event.cc569
-rw-r--r--src/crimson/osd/osd_operations/snaptrim_event.h210
24 files changed, 3316 insertions, 0 deletions
diff --git a/src/crimson/osd/osd_operations/background_recovery.cc b/src/crimson/osd/osd_operations/background_recovery.cc
new file mode 100644
index 000000000..953ec9595
--- /dev/null
+++ b/src/crimson/osd/osd_operations/background_recovery.cc
@@ -0,0 +1,207 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <seastar/core/future.hh>
+#include <seastar/core/sleep.hh>
+
+#include "messages/MOSDOp.h"
+
+#include "crimson/osd/pg.h"
+#include "crimson/osd/shard_services.h"
+#include "common/Formatter.h"
+#include "crimson/osd/osd_operation_external_tracking.h"
+#include "crimson/osd/osd_operations/background_recovery.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace crimson {
+ template <>
+ struct EventBackendRegistry<osd::UrgentRecovery> {
+ static std::tuple<> get_backends() {
+ return {};
+ }
+ };
+
+ template <>
+ struct EventBackendRegistry<osd::PglogBasedRecovery> {
+ static std::tuple<> get_backends() {
+ return {};
+ }
+ };
+}
+
+namespace crimson::osd {
+
+// Common setup for all background-recovery flavours.  The scheduler class
+// selects the throttling priority; delay (in seconds) postpones the first
+// invocation of do_recovery() — see start().
+template <class T>
+BackgroundRecoveryT<T>::BackgroundRecoveryT(
+  Ref<PG> pg,
+  ShardServices &ss,
+  epoch_t epoch_started,
+  crimson::osd::scheduler::scheduler_class_t scheduler_class,
+  float delay)
+  : pg(pg),
+    epoch_started(epoch_started),
+    delay(delay),
+    ss(ss),
+    scheduler_class(scheduler_class)
+{}
+
+template <class T>
+void BackgroundRecoveryT<T>::print(std::ostream &lhs) const
+{
+ lhs << "BackgroundRecovery(" << pg->get_pgid() << ")";
+}
+
+template <class T>
+void BackgroundRecoveryT<T>::dump_detail(Formatter *f) const
+{
+ f->dump_stream("pgid") << pg->get_pgid();
+ f->open_object_section("recovery_detail");
+ {
+ // TODO pg->dump_recovery_state(f);
+ }
+ f->close_section();
+}
+
+// Drive the recovery loop: optionally sleep `delay` seconds, then repeatedly
+// invoke do_recovery() under the operation throttler until it reports no
+// further work.  An interruption (e.g. PG reset) terminates the loop quietly.
+template <class T>
+seastar::future<> BackgroundRecoveryT<T>::start()
+{
+  logger().debug("{}: start", *this);
+
+  // keep this operation alive for the whole asynchronous chain below
+  typename T::IRef ref = static_cast<T*>(this);
+  auto maybe_delay = seastar::now();
+  if (delay) {
+    maybe_delay = seastar::sleep(
+      std::chrono::milliseconds(std::lround(delay * 1000)));
+  }
+  return maybe_delay.then([ref, this] {
+    return this->template with_blocking_event<OperationThrottler::BlockingEvent>(
+      [ref, this] (auto&& trigger) {
+        return ss.with_throttle_while(
+          std::move(trigger),
+          this, get_scheduler_params(), [this] {
+            return T::interruptor::with_interruption([this] {
+              return do_recovery();
+            }, [](std::exception_ptr) {
+              // interrupted -> report "no more work" to stop the loop
+              return seastar::make_ready_future<bool>(false);
+            }, pg);
+          }).handle_exception_type([ref, this](const std::system_error& err) {
+            if (err.code() == std::make_error_code(std::errc::interrupted)) {
+              // bugfix: log message read "interruped"
+              logger().debug("{} recovery interrupted: {}", *pg, err.what());
+              return seastar::now();
+            }
+            return seastar::make_exception_future<>(err);
+          });
+      });
+  });
+}
+
+// Urgent, client-blocking recovery of a single object version; uses the
+// "immediate" scheduler class so it is not queued behind routine recovery.
+UrgentRecovery::UrgentRecovery(
+  const hobject_t& soid,
+  const eversion_t& need,
+  Ref<PG> pg,
+  ShardServices& ss,
+  epoch_t epoch_started)
+  : BackgroundRecoveryT{pg, ss, epoch_started,
+    crimson::osd::scheduler::scheduler_class_t::immediate},
+    soid{soid}, need(need)
+{
+}
+
+// Recover the one object (soid@need) this op was created for.  If the PG
+// has reset since the op was queued, the recovery decision is stale and we
+// do nothing.  Returning false tells the throttle loop in start() that no
+// further iterations are needed.
+UrgentRecovery::interruptible_future<bool>
+UrgentRecovery::do_recovery()
+{
+  logger().debug("{}: {}", __func__, *this);
+  if (!pg->has_reset_since(epoch_started)) {
+    return with_blocking_event<RecoveryBackend::RecoveryBlockingEvent,
+    interruptor>([this] (auto&& trigger) {
+      return pg->get_recovery_handler()->recover_missing(trigger, soid, need);
+    }).then_interruptible([] {
+      return seastar::make_ready_future<bool>(false);
+    });
+  }
+  return seastar::make_ready_future<bool>(false);
+}
+
+void UrgentRecovery::print(std::ostream &lhs) const
+{
+ lhs << "UrgentRecovery(" << pg->get_pgid() << ", "
+ << soid << ", v" << need << ", epoch_started: "
+ << epoch_started << ")";
+}
+
+void UrgentRecovery::dump_detail(Formatter *f) const
+{
+ f->dump_stream("pgid") << pg->get_pgid();
+ f->open_object_section("recovery_detail");
+ {
+ f->dump_stream("oid") << soid;
+ f->dump_stream("version") << need;
+ }
+ f->close_section();
+}
+
+// Log-driven background recovery; runs in the background_recovery scheduler
+// class and may be postponed by `delay` seconds before the first round.
+PglogBasedRecovery::PglogBasedRecovery(
+  Ref<PG> pg,
+  ShardServices &ss,
+  const epoch_t epoch_started,
+  float delay)
+  : BackgroundRecoveryT(
+    std::move(pg),
+    ss,
+    epoch_started,
+    crimson::osd::scheduler::scheduler_class_t::background_recovery,
+    delay)
+{}
+
+// One throttled round of pglog-based recovery.  The returned bool feeds the
+// with_throttle_while() loop in BackgroundRecoveryT::start(); a PG reset
+// since epoch_started makes this op a stale no-op.
+PglogBasedRecovery::interruptible_future<bool>
+PglogBasedRecovery::do_recovery()
+{
+  if (pg->has_reset_since(epoch_started)) {
+    return seastar::make_ready_future<bool>(false);
+  }
+  return with_blocking_event<RecoveryBackend::RecoveryBlockingEvent,
+    interruptor>([this] (auto&& trigger) {
+    return pg->get_recovery_handler()->start_recovery_ops(
+      trigger,
+      crimson::common::local_conf()->osd_recovery_max_single_start);
+  });
+}
+
+// Accessor for the PG's peering pipeline; backfill uses its `process`
+// stage to serialize statechart dispatch with peering (see do_recovery()).
+PGPeeringPipeline &BackfillRecovery::peering_pp(PG &pg)
+{
+  return pg.peering_request_pg_pipeline;
+}
+
+// Feed the single stashed backfill event into the PG's backfill state
+// machine.  Always returns false so the surrounding throttle loop does not
+// re-invoke us: each BackfillRecovery instance delivers exactly one event.
+BackfillRecovery::interruptible_future<bool>
+BackfillRecovery::do_recovery()
+{
+  logger().debug("{}", __func__);
+
+  if (pg->has_reset_since(epoch_started)) {
+    logger().debug("{}: pg got reset since epoch_started={}",
+    __func__, epoch_started);
+    return seastar::make_ready_future<bool>(false);
+  }
+  // TODO: limits
+  return enter_stage<interruptor>(
+    // process_event() of our boost::statechart machine is non-reentrant.
+    // with the backfill_pipeline we protect it from a second entry from
+    // the implementation of BackfillListener.
+    // additionally, this stage serves to synchronize with PeeringEvent.
+    peering_pp(*pg).process
+  ).then_interruptible([this] {
+    pg->get_recovery_handler()->dispatch_backfill_event(std::move(evt));
+    return seastar::make_ready_future<bool>(false);
+  });
+}
+
+template class BackgroundRecoveryT<UrgentRecovery>;
+template class BackgroundRecoveryT<PglogBasedRecovery>;
+template class BackgroundRecoveryT<BackfillRecovery>;
+
+} // namespace crimson::osd
diff --git a/src/crimson/osd/osd_operations/background_recovery.h b/src/crimson/osd/osd_operations/background_recovery.h
new file mode 100644
index 000000000..17f2cd57a
--- /dev/null
+++ b/src/crimson/osd/osd_operations/background_recovery.h
@@ -0,0 +1,144 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <boost/statechart/event_base.hpp>
+
+#include "crimson/net/Connection.h"
+#include "crimson/osd/osd_operation.h"
+#include "crimson/osd/recovery_backend.h"
+#include "crimson/common/type_helpers.h"
+#include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/pg.h"
+
+namespace crimson::osd {
+class PG;
+class ShardServices;
+
+// CRTP base for throttled background-recovery operations (urgent, pglog
+// based, backfill).  Derived classes implement do_recovery(), whose bool
+// result drives the repeat-while loop in start().
+template <class T>
+class BackgroundRecoveryT : public PhasedOperationT<T> {
+public:
+  static constexpr OperationTypeCode type = OperationTypeCode::background_recovery;
+
+  BackgroundRecoveryT(
+    Ref<PG> pg,
+    ShardServices &ss,
+    epoch_t epoch_started,
+    crimson::osd::scheduler::scheduler_class_t scheduler_class, float delay = 0);
+
+  virtual void print(std::ostream &) const;
+  seastar::future<> start();
+
+protected:
+  Ref<PG> pg;
+  // epoch at which this recovery was decided; a PG reset past this epoch
+  // invalidates the op (checked in each do_recovery() implementation)
+  const epoch_t epoch_started;
+  // seconds to sleep before the first do_recovery() round
+  float delay = 0;
+
+private:
+  virtual void dump_detail(Formatter *f) const;
+  crimson::osd::scheduler::params_t get_scheduler_params() const {
+    return {
+      1, // cost
+      0, // owner
+      scheduler_class
+    };
+  }
+  using do_recovery_ret_t = typename PhasedOperationT<T>::template interruptible_future<bool>;
+  virtual do_recovery_ret_t do_recovery() = 0;
+  ShardServices &ss;
+  const crimson::osd::scheduler::scheduler_class_t scheduler_class;
+};
+
+/// represent a recovery initiated for serving a client request
+///
+/// unlike @c PglogBasedRecovery and @c BackfillRecovery,
+/// @c UrgentRecovery is not throttled by the scheduler, and it
+/// utilizes @c RecoveryBackend directly to recover the unreadable
+/// object.
+class UrgentRecovery final : public BackgroundRecoveryT<UrgentRecovery> {
+public:
+ UrgentRecovery(
+ const hobject_t& soid,
+ const eversion_t& need,
+ Ref<PG> pg,
+ ShardServices& ss,
+ epoch_t epoch_started);
+ void print(std::ostream&) const final;
+
+ std::tuple<
+ OperationThrottler::BlockingEvent,
+ RecoveryBackend::RecoveryBlockingEvent
+ > tracking_events;
+
+private:
+ void dump_detail(Formatter* f) const final;
+ interruptible_future<bool> do_recovery() override;
+ const hobject_t soid;
+ const eversion_t need;
+};
+
+class PglogBasedRecovery final : public BackgroundRecoveryT<PglogBasedRecovery> {
+public:
+ PglogBasedRecovery(
+ Ref<PG> pg,
+ ShardServices &ss,
+ epoch_t epoch_started,
+ float delay = 0);
+
+ std::tuple<
+ OperationThrottler::BlockingEvent,
+ RecoveryBackend::RecoveryBlockingEvent
+ > tracking_events;
+
+private:
+ interruptible_future<bool> do_recovery() override;
+};
+
+class BackfillRecovery final : public BackgroundRecoveryT<BackfillRecovery> {
+public:
+
+ template <class EventT>
+ BackfillRecovery(
+ Ref<PG> pg,
+ ShardServices &ss,
+ epoch_t epoch_started,
+ const EventT& evt);
+
+ PipelineHandle& get_handle() { return handle; }
+
+ std::tuple<
+ OperationThrottler::BlockingEvent,
+ PGPeeringPipeline::Process::BlockingEvent
+ > tracking_events;
+
+private:
+ boost::intrusive_ptr<const boost::statechart::event_base> evt;
+ PipelineHandle handle;
+
+ static PGPeeringPipeline &peering_pp(PG &pg);
+ interruptible_future<bool> do_recovery() override;
+};
+
+// Stash the boost::statechart event (via its intrusive refcount) for later
+// dispatch in do_recovery(); backfill is scheduled best-effort.
+template <class EventT>
+BackfillRecovery::BackfillRecovery(
+  Ref<PG> pg,
+  ShardServices &ss,
+  const epoch_t epoch_started,
+  const EventT& evt)
+  : BackgroundRecoveryT(
+    std::move(pg),
+    ss,
+    epoch_started,
+    crimson::osd::scheduler::scheduler_class_t::background_best_effort),
+    evt(evt.intrusive_from_this())
+{}
+
+}
+
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::osd::BackfillRecovery> : fmt::ostream_formatter {};
+template <> struct fmt::formatter<crimson::osd::PglogBasedRecovery> : fmt::ostream_formatter {};
+template <> struct fmt::formatter<crimson::osd::UrgentRecovery> : fmt::ostream_formatter {};
+template <class T> struct fmt::formatter<crimson::osd::BackgroundRecoveryT<T>> : fmt::ostream_formatter {};
+#endif
diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc
new file mode 100644
index 000000000..9374fbde2
--- /dev/null
+++ b/src/crimson/osd/osd_operations/client_request.cc
@@ -0,0 +1,388 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#include "messages/MOSDOp.h"
+#include "messages/MOSDOpReply.h"
+
+#include "crimson/common/exception.h"
+#include "crimson/osd/pg.h"
+#include "crimson/osd/osd.h"
+#include "common/Formatter.h"
+#include "crimson/osd/osd_operation_external_tracking.h"
+#include "crimson/osd/osd_operations/client_request.h"
+#include "crimson/osd/osd_connection_priv.h"
+#include "osd/object_state_fmt.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace crimson::osd {
+
+
+// Re-submit every queued request against the PG, each with a fresh
+// instance_handle so pipeline/blocker state from the aborted run is not
+// reused.  The future from with_pg_int() is deliberately discarded: each
+// request signals completion via its own on_complete promise.
+void ClientRequest::Orderer::requeue(
+  ShardServices &shard_services, Ref<PG> pg)
+{
+  for (auto &req: list) {
+    logger().debug("{}: {} requeueing {}", __func__, *pg, req);
+    req.reset_instance_handle();
+    std::ignore = req.with_pg_int(shard_services, pg);
+  }
+}
+
+// Complete and unlink every queued request.  complete_request() resolves
+// the future handed out by with_pg(); remove_request() drops the ref taken
+// in add_request().  The iterator is advanced before erasure so it stays
+// valid across the removal.
+void ClientRequest::Orderer::clear_and_cancel()
+{
+  for (auto i = list.begin(); i != list.end(); ) {
+    logger().debug(
+      "ClientRequest::Orderer::clear_and_cancel: {}",
+      *i);
+    i->complete_request();
+    remove_request(*(i++));
+  }
+}
+
+// Record the completion event and resolve the future returned to the
+// caller by with_pg().
+void ClientRequest::complete_request()
+{
+  track_event<CompletionEvent>();
+  on_complete.set_value();
+}
+
+// put_historic_shard_services initially points at the creating shard's
+// services; with_pg() refreshes it after the op moves to the PG's core.
+ClientRequest::ClientRequest(
+  ShardServices &shard_services, crimson::net::ConnectionRef conn,
+  Ref<MOSDOp> &&m)
+  : put_historic_shard_services(&shard_services),
+    conn(std::move(conn)),
+    m(std::move(m)),
+    instance_handle(new instance_handle_t)
+{}
+
+ClientRequest::~ClientRequest()
+{
+ logger().debug("{}: destroying", *this);
+}
+
+void ClientRequest::print(std::ostream &lhs) const
+{
+ lhs << "m=[" << *m << "]";
+}
+
+void ClientRequest::dump_detail(Formatter *f) const
+{
+ logger().debug("{}: dumping", *this);
+ std::apply([f] (auto... event) {
+ (..., event.dump(f));
+ }, tracking_events);
+}
+
+ConnectionPipeline &ClientRequest::get_connection_pipeline()
+{
+ return get_osd_priv(conn.get()).client_request_conn_pipeline;
+}
+
+ClientRequest::PGPipeline &ClientRequest::client_pp(PG &pg)
+{
+ return pg.request_pg_pipeline;
+}
+
+// True iff any op in the message is PG-scoped (e.g. PGLS) rather than
+// object-scoped; such requests take the process_pg_op() path.
+bool ClientRequest::is_pg_op() const
+{
+  return std::any_of(
+    begin(m->ops), end(m->ops),
+    [](auto& op) { return ceph_osd_op_type_pg(op.op.op); });
+}
+
+// Run (or, after a requeue, re-run) the PG-scoped portion of request
+// handling.  Each invocation gets a distinct this_instance_id and operates
+// on the instance_handle current at entry — see instance_handle_t in the
+// header for why requeued runs must not share pipeline state.
+seastar::future<> ClientRequest::with_pg_int(
+  ShardServices &shard_services, Ref<PG> pgref)
+{
+  epoch_t same_interval_since = pgref->get_interval_start_epoch();
+  logger().debug("{} same_interval_since: {}", *this, same_interval_since);
+  if (m->finish_decode()) {
+    m->clear_payload();
+  }
+  const auto this_instance_id = instance_id++;
+  // opref pins this operation alive until the chain below finishes
+  OperationRef opref{this};
+  auto instance_handle = get_instance_handle();
+  auto &ihref = *instance_handle;
+  return interruptor::with_interruption(
+    [this, pgref, this_instance_id, &ihref, &shard_services]() mutable {
+      PG &pg = *pgref;
+      if (pg.can_discard_op(*m)) {
+        // stale op: send the client a map update instead of executing
+        return shard_services.send_incremental_map(
+	  std::ref(*conn), m->get_map_epoch()
+        ).then([this, this_instance_id, pgref] {
+          logger().debug("{}.{}: discarding", *this, this_instance_id);
+          pgref->client_request_orderer.remove_request(*this);
+          complete_request();
+          return interruptor::now();
+        });
+      }
+      // pipeline: await_map -> osdmap gate -> wait_for_active -> process
+      return ihref.enter_stage<interruptor>(client_pp(pg).await_map, *this
+      ).then_interruptible([this, this_instance_id, &pg, &ihref] {
+        logger().debug("{}.{}: after await_map stage", *this, this_instance_id);
+        return ihref.enter_blocker(
+          *this, pg.osdmap_gate, &decltype(pg.osdmap_gate)::wait_for_map,
+          m->get_min_epoch(), nullptr);
+      }).then_interruptible([this, this_instance_id, &pg, &ihref](auto map) {
+        logger().debug("{}.{}: after wait_for_map", *this, this_instance_id);
+        return ihref.enter_stage<interruptor>(client_pp(pg).wait_for_active, *this);
+      }).then_interruptible([this, this_instance_id, &pg, &ihref]() {
+        logger().debug(
+          "{}.{}: after wait_for_active stage", *this, this_instance_id);
+        return ihref.enter_blocker(
+          *this,
+          pg.wait_for_active_blocker,
+          &decltype(pg.wait_for_active_blocker)::wait);
+      }).then_interruptible([this, pgref, this_instance_id, &ihref]() mutable
+        -> interruptible_future<> {
+        logger().debug(
+          "{}.{}: after wait_for_active", *this, this_instance_id);
+        if (is_pg_op()) {
+          return process_pg_op(pgref);
+        } else {
+          return process_op(ihref, pgref);
+        }
+      }).then_interruptible([this, this_instance_id, pgref] {
+        logger().debug("{}.{}: after process*", *this, this_instance_id);
+        pgref->client_request_orderer.remove_request(*this);
+        complete_request();
+      });
+    }, [this, this_instance_id, pgref](std::exception_ptr eptr) {
+      // TODO: better debug output
+      logger().debug("{}.{}: interrupted {}", *this, this_instance_id, eptr);
+    }, pgref).finally(
+      [opref=std::move(opref), pgref=std::move(pgref),
+       instance_handle=std::move(instance_handle), &ihref] {
+      ihref.handle.exit();
+    });
+}
+
+// Entry point once the target PG is pinned: register with the PG's orderer
+// and kick off with_pg_int().  The returned future resolves only when
+// complete_request() fires — possibly from a later requeued invocation
+// rather than the one started here, hence the discarded with_pg_int future.
+seastar::future<> ClientRequest::with_pg(
+  ShardServices &shard_services, Ref<PG> pgref)
+{
+  put_historic_shard_services = &shard_services;
+  pgref->client_request_orderer.add_request(*this);
+  auto ret = on_complete.get_future();
+  std::ignore = with_pg_int(
+    shard_services, std::move(pgref)
+  );
+  return ret;
+}
+
+// Execute a PG-scoped op (e.g. PGLS) directly and send the reply,
+// bypassing the object-op pipeline used by process_op().
+ClientRequest::interruptible_future<>
+ClientRequest::process_pg_op(
+  Ref<PG> &pg)
+{
+  return pg->do_pg_ops(
+    m
+  ).then_interruptible([this, pg=std::move(pg)](MURef<MOSDOpReply> reply) {
+    return conn->send(std::move(reply));
+  });
+}
+
+// Send an MOSDOpReply carrying only an error code (no per-op results).
+// err is expected to be negative (e.g. -EINVAL).
+auto ClientRequest::reply_op_error(const Ref<PG>& pg, int err)
+{
+  logger().debug("{}: replying with error {}", *this, err);
+  auto reply = crimson::make_message<MOSDOpReply>(
+    m.get(), err, pg->get_osdmap_epoch(),
+    m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK),
+    !m->has_flag(CEPH_OSD_FLAG_RETURNVEC));
+  reply->set_reply_versions(eversion_t(), 0);
+  reply->set_op_returns(std::vector<pg_log_op_return_item_t>{});
+  return conn->send(std::move(reply));
+}
+
+// Drive an object op through the per-PG pipeline stages:
+// recover_missing -> duplicate-op check -> get_obc -> process.
+ClientRequest::interruptible_future<>
+ClientRequest::process_op(instance_handle_t &ihref, Ref<PG> &pg)
+{
+  return ihref.enter_stage<interruptor>(
+    client_pp(*pg).recover_missing,
+    *this
+  ).then_interruptible(
+    [this, pg]() mutable {
+    if (pg->is_primary()) {
+      return do_recover_missing(pg, m->get_hobj());
+    } else {
+      // bugfix: adjacent literals concatenate — a separator space was
+      // missing, yielding "...do_recover_missingon non primary pg"
+      logger().debug("process_op: Skipping do_recover_missing "
+                     "on non primary pg");
+      return interruptor::now();
+    }
+  }).then_interruptible([this, pg, &ihref]() mutable {
+    // if this reqid already committed, reply from the log rather than
+    // re-executing the op
+    return pg->already_complete(m->get_reqid()).then_interruptible(
+      [this, pg, &ihref](auto completed) mutable
+      -> PG::load_obc_iertr::future<> {
+      if (completed) {
+        auto reply = crimson::make_message<MOSDOpReply>(
+          m.get(), completed->err, pg->get_osdmap_epoch(),
+          CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, false);
+        reply->set_reply_versions(completed->version, completed->user_version);
+        return conn->send(std::move(reply));
+      } else {
+        return ihref.enter_stage<interruptor>(client_pp(*pg).get_obc, *this
+        ).then_interruptible(
+          [this, pg, &ihref]() mutable -> PG::load_obc_iertr::future<> {
+          logger().debug("{}: in get_obc stage", *this);
+          op_info.set_from_op(&*m, *pg->get_osdmap());
+          return pg->with_locked_obc(
+            m->get_hobj(), op_info,
+            [this, pg, &ihref](auto obc) mutable {
+              logger().debug("{}: got obc {}", *this, obc->obs);
+              return ihref.enter_stage<interruptor>(
+                client_pp(*pg).process, *this
+              ).then_interruptible([this, pg, obc, &ihref]() mutable {
+                return do_process(ihref, pg, obc);
+              });
+            });
+          });
+      }
+    });
+  }).handle_error_interruptible(
+    PG::load_obc_ertr::all_same_way([this, pg=std::move(pg)](const auto &code) {
+      logger().error("ClientRequest saw error code {}", code);
+      assert(code.value() > 0);
+      return reply_op_error(pg, -code.value());
+    }));
+}
+
+// Validate the op against pool/object constraints, then execute it via
+// PG::do_osd_ops() and send the reply once the repop completes.  EAGAIN
+// from the backend restarts the whole process_op() pipeline.
+ClientRequest::interruptible_future<>
+ClientRequest::do_process(
+  instance_handle_t &ihref,
+  Ref<PG>& pg, crimson::osd::ObjectContextRef obc)
+{
+  if (m->has_flag(CEPH_OSD_FLAG_PARALLELEXEC)) {
+    return reply_op_error(pg, -EINVAL);
+  }
+  // bugfix: take a reference instead of deep-copying the whole pg_pool_t
+  const pg_pool_t &pool = pg->get_pgpool().info;
+  if (pool.has_flag(pg_pool_t::FLAG_EIO)) {
+    // drop op on the floor; the client will handle returning EIO
+    if (m->has_flag(CEPH_OSD_FLAG_SUPPORTSPOOLEIO)) {
+      logger().debug("discarding op due to pool EIO flag");
+      return seastar::now();
+    } else {
+      logger().debug("replying EIO due to pool EIO flag");
+      return reply_op_error(pg, -EIO);
+    }
+  }
+  if (m->get_oid().name.size()
+    > crimson::common::local_conf()->osd_max_object_name_len) {
+    return reply_op_error(pg, -ENAMETOOLONG);
+  } else if (m->get_hobj().get_key().size()
+    > crimson::common::local_conf()->osd_max_object_name_len) {
+    return reply_op_error(pg, -ENAMETOOLONG);
+  } else if (m->get_hobj().nspace.size()
+    > crimson::common::local_conf()->osd_max_object_namespace_len) {
+    return reply_op_error(pg, -ENAMETOOLONG);
+  } else if (m->get_hobj().oid.name.empty()) {
+    return reply_op_error(pg, -EINVAL);
+  } else if (pg->get_osdmap()->is_blocklisted(conn->get_peer_addr())) {
+    logger().info("{} is blocklisted", conn->get_peer_addr());
+    return reply_op_error(pg, -EBLOCKLISTED);
+  }
+
+  if (!obc->obs.exists && !op_info.may_write()) {
+    return reply_op_error(pg, -ENOENT);
+  }
+
+  SnapContext snapc = get_snapc(pg, obc);
+
+  if ((m->has_flag(CEPH_OSD_FLAG_ORDERSNAP)) &&
+       snapc.seq < obc->ssc->snapset.seq) {
+    // bugfix: the two literals must concatenate into one format string;
+    // previously the second half was passed as a format *argument*,
+    // garbling the logged message.
+    logger().debug("{} ORDERSNAP flag set and snapc seq {}"
+                   " < snapset seq {} on {}",
+                   __func__, snapc.seq, obc->ssc->snapset.seq,
+                   obc->obs.oi.soid);
+    return reply_op_error(pg, -EOLDSNAPC);
+  }
+
+  if (!pg->is_primary()) {
+    // primary can handle both normal ops and balanced reads
+    if (is_misdirected(*pg)) {
+      logger().trace("do_process: dropping misdirected op");
+      return seastar::now();
+    } else if (const hobject_t& hoid = m->get_hobj();
+               !pg->get_peering_state().can_serve_replica_read(hoid)) {
+      logger().debug("{}: unstable write on replica, "
+                     "bouncing to primary",
+                     __func__);
+      return reply_op_error(pg, -EAGAIN);
+    } else {
+      logger().debug("{}: serving replica read on oid {}",
+                     __func__, m->get_hobj());
+    }
+  }
+  return pg->do_osd_ops(m, conn, obc, op_info, snapc).safe_then_unpack_interruptible(
+    [this, pg, &ihref](auto submitted, auto all_completed) mutable {
+      return submitted.then_interruptible([this, pg, &ihref] {
+        return ihref.enter_stage<interruptor>(client_pp(*pg).wait_repop, *this);
+      }).then_interruptible(
+        [this, pg, all_completed=std::move(all_completed), &ihref]() mutable {
+          return all_completed.safe_then_interruptible(
+            [this, pg, &ihref](MURef<MOSDOpReply> reply) {
+              return ihref.enter_stage<interruptor>(client_pp(*pg).send_reply, *this
+              ).then_interruptible(
+                [this, reply=std::move(reply)]() mutable {
+                  logger().debug("{}: sending response", *this);
+                  return conn->send(std::move(reply));
+                });
+            }, crimson::ct_error::eagain::handle([this, pg, &ihref]() mutable {
+              return process_op(ihref, pg);
+            }));
+        });
+    }, crimson::ct_error::eagain::handle([this, pg, &ihref]() mutable {
+      return process_op(ihref, pg);
+    }));
+}
+
+// Called on a non-primary replica (see do_process): only balanced or
+// localized reads may legitimately land here; any write/cache op, or an op
+// without those flags, is misdirected.
+bool ClientRequest::is_misdirected(const PG& pg) const
+{
+  // inspect the op flags to see whether a replica may serve this
+  if (const int flags = m->get_flags();
+      flags & CEPH_OSD_FLAG_BALANCE_READS ||
+      flags & CEPH_OSD_FLAG_LOCALIZE_READS) {
+    if (!op_info.may_read()) {
+      // no read found, so it can't be balanced read
+      return true;
+    }
+    if (op_info.may_write() || op_info.may_cache()) {
+      // write op, but i am not primary
+      return true;
+    }
+    // balanced reads; any replica will do
+    return false;
+  }
+  // neither balanced nor localize reads
+  return true;
+}
+
+// Hand this op to the historic-op registry of the shard it last ran on.
+// put_historic_shard_services is set at construction and refreshed in
+// with_pg(), so it must be non-null by the time an op is retired.
+void ClientRequest::put_historic() const
+{
+  ceph_assert_always(put_historic_shard_services);
+  put_historic_shard_services->get_registry().put_historic(*this);
+}
+
+// Build the SnapContext for a write/cache op: pool-managed snaps for pools
+// in pool-snapshot mode, otherwise the client-supplied seq/snaps from the
+// message.  Read-only ops get a default (empty) context.
+const SnapContext ClientRequest::get_snapc(
+  Ref<PG>& pg,
+  crimson::osd::ObjectContextRef obc) const
+{
+  SnapContext snapc;
+  if (op_info.may_write() || op_info.may_cache()) {
+    // snap
+    if (pg->get_pgpool().info.is_pool_snaps_mode()) {
+      // use pool's snapc
+      snapc = pg->get_pgpool().snapc;
+      logger().debug("{} using pool's snapc snaps={}",
+                     __func__, snapc.snaps);
+
+    } else {
+      // client specified snapc
+      snapc.seq = m->get_snap_seq();
+      snapc.snaps = m->get_snaps();
+      logger().debug("{} client specified snapc seq={} snaps={}",
+                     __func__, snapc.seq, snapc.snaps);
+    }
+  }
+  return snapc;
+}
+
+}
diff --git a/src/crimson/osd/osd_operations/client_request.h b/src/crimson/osd/osd_operations/client_request.h
new file mode 100644
index 000000000..b2dce1e87
--- /dev/null
+++ b/src/crimson/osd/osd_operations/client_request.h
@@ -0,0 +1,281 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/future.hh>
+
+#include <boost/intrusive/list.hpp>
+#include <boost/intrusive_ptr.hpp>
+
+#include "osd/osd_op_util.h"
+#include "crimson/net/Connection.h"
+#include "crimson/osd/object_context.h"
+#include "crimson/osd/osdmap_gate.h"
+#include "crimson/osd/osd_operation.h"
+#include "crimson/osd/osd_operations/client_request_common.h"
+#include "crimson/osd/osd_operations/common/pg_pipeline.h"
+#include "crimson/osd/pg_activation_blocker.h"
+#include "crimson/osd/pg_map.h"
+#include "crimson/common/type_helpers.h"
+#include "crimson/common/utility.h"
+#include "messages/MOSDOp.h"
+
+namespace crimson::osd {
+class PG;
+class OSD;
+class ShardServices;
+
+class ClientRequest final : public PhasedOperationT<ClientRequest>,
+ private CommonClientRequest {
+ // Initially set to primary core, updated to pg core after move,
+ // used by put_historic
+ ShardServices *put_historic_shard_services = nullptr;
+
+ crimson::net::ConnectionRef conn;
+ // must be after conn due to ConnectionPipeline's life-time
+ Ref<MOSDOp> m;
+ OpInfo op_info;
+ seastar::promise<> on_complete;
+ unsigned instance_id = 0;
+
+public:
+ class PGPipeline : public CommonPGPipeline {
+ public:
+ struct AwaitMap : OrderedExclusivePhaseT<AwaitMap> {
+ static constexpr auto type_name = "ClientRequest::PGPipeline::await_map";
+ } await_map;
+ struct WaitRepop : OrderedConcurrentPhaseT<WaitRepop> {
+ static constexpr auto type_name = "ClientRequest::PGPipeline::wait_repop";
+ } wait_repop;
+ struct SendReply : OrderedExclusivePhaseT<SendReply> {
+ static constexpr auto type_name = "ClientRequest::PGPipeline::send_reply";
+ } send_reply;
+ friend class ClientRequest;
+ friend class LttngBackend;
+ friend class HistoricBackend;
+ friend class ReqRequest;
+ friend class LogMissingRequest;
+ friend class LogMissingRequestReply;
+ };
+
+ /**
+ * instance_handle_t
+ *
+ * Client request is, at present, the only Operation which can be requeued.
+ * This is, mostly, fine. However, reusing the PipelineHandle or
+ * BlockingEvent structures before proving that the prior instance has stopped
+ * can create hangs or crashes due to violations of the BlockerT and
+ * PipelineHandle invariants.
+ *
+ * To solve this, we create an instance_handle_t which contains the events
+ * for the portion of execution that can be rerun as well as the
+ * PipelineHandle. ClientRequest::with_pg_int grabs a reference to the current
+ * instance_handle_t and releases its PipelineHandle in the finally block.
+ * On requeue, we create a new instance_handle_t with a fresh PipelineHandle
+   * and events tuple and use it for the next invocation of
+ * with_pg_int.
+ */
+ std::tuple<
+ StartEvent,
+ ConnectionPipeline::AwaitActive::BlockingEvent,
+ ConnectionPipeline::AwaitMap::BlockingEvent,
+ OSD_OSDMapGate::OSDMapBlocker::BlockingEvent,
+ ConnectionPipeline::GetPG::BlockingEvent,
+ PGMap::PGCreationBlockingEvent,
+ CompletionEvent
+ > tracking_events;
+
+ class instance_handle_t : public boost::intrusive_ref_counter<
+ instance_handle_t, boost::thread_unsafe_counter> {
+ public:
+ // intrusive_ptr because seastar::lw_shared_ptr includes a cpu debug check
+ // that we will fail since the core on which we allocate the request may not
+ // be the core on which we perform with_pg_int. This is harmless, since we
+ // don't leave any references on the source core, so we just bypass it by using
+ // intrusive_ptr instead.
+ using ref_t = boost::intrusive_ptr<instance_handle_t>;
+ PipelineHandle handle;
+
+ std::tuple<
+ PGPipeline::AwaitMap::BlockingEvent,
+ PG_OSDMapGate::OSDMapBlocker::BlockingEvent,
+ PGPipeline::WaitForActive::BlockingEvent,
+ PGActivationBlocker::BlockingEvent,
+ PGPipeline::RecoverMissing::BlockingEvent,
+ PGPipeline::GetOBC::BlockingEvent,
+ PGPipeline::Process::BlockingEvent,
+ PGPipeline::WaitRepop::BlockingEvent,
+ PGPipeline::SendReply::BlockingEvent,
+ CompletionEvent
+ > pg_tracking_events;
+
+ template <typename BlockingEventT, typename InterruptorT=void, typename F>
+ auto with_blocking_event(F &&f, ClientRequest &op) {
+ auto ret = std::forward<F>(f)(
+ typename BlockingEventT::template Trigger<ClientRequest>{
+ std::get<BlockingEventT>(pg_tracking_events), op
+ });
+ if constexpr (std::is_same_v<InterruptorT, void>) {
+ return ret;
+ } else {
+ using ret_t = decltype(ret);
+ return typename InterruptorT::template futurize_t<ret_t>{std::move(ret)};
+ }
+ }
+
+ template <typename InterruptorT=void, typename StageT>
+ auto enter_stage(StageT &stage, ClientRequest &op) {
+ return this->template with_blocking_event<
+ typename StageT::BlockingEvent,
+ InterruptorT>(
+ [&stage, this](auto &&trigger) {
+ return handle.template enter<ClientRequest>(
+ stage, std::move(trigger));
+ }, op);
+ }
+
+ template <
+ typename InterruptorT=void, typename BlockingObj, typename Method,
+ typename... Args>
+ auto enter_blocker(
+ ClientRequest &op, BlockingObj &obj, Method method, Args&&... args) {
+ return this->template with_blocking_event<
+ typename BlockingObj::Blocker::BlockingEvent,
+ InterruptorT>(
+ [&obj, method,
+ args=std::forward_as_tuple(std::move(args)...)](auto &&trigger) mutable {
+ return apply_method_to_tuple(
+ obj, method,
+ std::tuple_cat(
+ std::forward_as_tuple(std::move(trigger)),
+ std::move(args))
+ );
+ }, op);
+ }
+ };
+ instance_handle_t::ref_t instance_handle;
+ void reset_instance_handle() {
+ instance_handle = new instance_handle_t;
+ }
+ auto get_instance_handle() { return instance_handle; }
+
+ using ordering_hook_t = boost::intrusive::list_member_hook<>;
+ ordering_hook_t ordering_hook;
+ class Orderer {
+ using list_t = boost::intrusive::list<
+ ClientRequest,
+ boost::intrusive::member_hook<
+ ClientRequest,
+ typename ClientRequest::ordering_hook_t,
+ &ClientRequest::ordering_hook>
+ >;
+ list_t list;
+
+ public:
+ void add_request(ClientRequest &request) {
+ assert(!request.ordering_hook.is_linked());
+ intrusive_ptr_add_ref(&request);
+ list.push_back(request);
+ }
+ void remove_request(ClientRequest &request) {
+ assert(request.ordering_hook.is_linked());
+ list.erase(list_t::s_iterator_to(request));
+ intrusive_ptr_release(&request);
+ }
+ void requeue(ShardServices &shard_services, Ref<PG> pg);
+ void clear_and_cancel();
+ };
+ void complete_request();
+
+ static constexpr OperationTypeCode type = OperationTypeCode::client_request;
+
+ ClientRequest(
+ ShardServices &shard_services,
+ crimson::net::ConnectionRef, Ref<MOSDOp> &&m);
+ ~ClientRequest();
+
+ void print(std::ostream &) const final;
+ void dump_detail(Formatter *f) const final;
+
+ static constexpr bool can_create() { return false; }
+ spg_t get_pgid() const {
+ return m->get_spg();
+ }
+ PipelineHandle &get_handle() { return instance_handle->handle; }
+ epoch_t get_epoch() const { return m->get_min_epoch(); }
+
+ ConnectionPipeline &get_connection_pipeline();
+ seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
+ assert(conn);
+ return conn.get_foreign(
+ ).then([this](auto f_conn) {
+ conn.reset();
+ return f_conn;
+ });
+ }
+ void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
+ assert(!conn);
+ conn = make_local_shared_foreign(std::move(_conn));
+ }
+
+ seastar::future<> with_pg_int(
+ ShardServices &shard_services, Ref<PG> pg);
+
+public:
+ seastar::future<> with_pg(
+ ShardServices &shard_services, Ref<PG> pgref);
+
+private:
+ template <typename FuncT>
+ interruptible_future<> with_sequencer(FuncT&& func);
+ auto reply_op_error(const Ref<PG>& pg, int err);
+
+ interruptible_future<> do_process(
+ instance_handle_t &ihref,
+ Ref<PG>& pg,
+ crimson::osd::ObjectContextRef obc);
+ ::crimson::interruptible::interruptible_future<
+ ::crimson::osd::IOInterruptCondition> process_pg_op(
+ Ref<PG> &pg);
+ ::crimson::interruptible::interruptible_future<
+ ::crimson::osd::IOInterruptCondition> process_op(
+ instance_handle_t &ihref,
+ Ref<PG> &pg);
+ bool is_pg_op() const;
+
+ PGPipeline &client_pp(PG &pg);
+
+ template <typename Errorator>
+ using interruptible_errorator =
+ ::crimson::interruptible::interruptible_errorator<
+ ::crimson::osd::IOInterruptCondition,
+ Errorator>;
+
+ bool is_misdirected(const PG& pg) const;
+
+ const SnapContext get_snapc(
+ Ref<PG>& pg,
+ crimson::osd::ObjectContextRef obc) const;
+
+public:
+
+ friend class LttngBackend;
+ friend class HistoricBackend;
+
+ auto get_started() const {
+ return get_event<StartEvent>().get_timestamp();
+ };
+
+ auto get_completed() const {
+ return get_event<CompletionEvent>().get_timestamp();
+ };
+
+ void put_historic() const;
+};
+
+}
+
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::osd::ClientRequest> : fmt::ostream_formatter {};
+#endif
diff --git a/src/crimson/osd/osd_operations/client_request_common.cc b/src/crimson/osd/osd_operations/client_request_common.cc
new file mode 100644
index 000000000..cfd22c774
--- /dev/null
+++ b/src/crimson/osd/osd_operations/client_request_common.cc
@@ -0,0 +1,64 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#include "crimson/osd/osd_operations/client_request_common.h"
+#include "crimson/osd/pg.h"
+#include "crimson/osd/osd_operations/background_recovery.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace crimson::osd {
+
+typename InterruptibleOperation::template interruptible_future<>
+CommonClientRequest::do_recover_missing(
+ Ref<PG>& pg, const hobject_t& soid)
+{
+ eversion_t ver;
+ assert(pg->is_primary());
+ logger().debug("{} check for recovery, {}", __func__, soid);
+ if (!pg->is_unreadable_object(soid, &ver) &&
+ !pg->is_degraded_or_backfilling_object(soid)) {
+ return seastar::now();
+ }
+ logger().debug("{} need to wait for recovery, {}", __func__, soid);
+ if (pg->get_recovery_backend()->is_recovering(soid)) {
+ return pg->get_recovery_backend()->get_recovering(soid).wait_for_recovered();
+ } else {
+ auto [op, fut] =
+ pg->get_shard_services().start_operation<UrgentRecovery>(
+ soid, ver, pg, pg->get_shard_services(), pg->get_osdmap_epoch());
+ return std::move(fut);
+ }
+}
+
+bool CommonClientRequest::should_abort_request(
+ const Operation& op,
+ std::exception_ptr eptr)
+{
+ if (*eptr.__cxa_exception_type() ==
+ typeid(::crimson::common::actingset_changed)) {
+ try {
+ std::rethrow_exception(eptr);
+ } catch(::crimson::common::actingset_changed& e) {
+ if (e.is_primary()) {
+ logger().debug("{} {} operation restart, acting set changed", __func__, op);
+ return false;
+ } else {
+ logger().debug("{} {} operation abort, up primary changed", __func__, op);
+ return true;
+ }
+ }
+ } else {
+ assert(*eptr.__cxa_exception_type() ==
+ typeid(crimson::common::system_shutdown_exception));
+ crimson::get_logger(ceph_subsys_osd).debug(
+ "{} {} operation skipped, system shutdown", __func__, op);
+ return true;
+ }
+}
+
+} // namespace crimson::osd
diff --git a/src/crimson/osd/osd_operations/client_request_common.h b/src/crimson/osd/osd_operations/client_request_common.h
new file mode 100644
index 000000000..6a8a78966
--- /dev/null
+++ b/src/crimson/osd/osd_operations/client_request_common.h
@@ -0,0 +1,20 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/common/operation.h"
+#include "crimson/common/type_helpers.h"
+#include "crimson/osd/osd_operation.h"
+
+namespace crimson::osd {
+
+struct CommonClientRequest {
+ static InterruptibleOperation::template interruptible_future<>
+ do_recover_missing(Ref<PG>& pg, const hobject_t& soid);
+
+ static bool should_abort_request(
+ const crimson::Operation& op, std::exception_ptr eptr);
+};
+
+} // namespace crimson::osd
diff --git a/src/crimson/osd/osd_operations/common/pg_pipeline.h b/src/crimson/osd/osd_operations/common/pg_pipeline.h
new file mode 100644
index 000000000..58fa07b8b
--- /dev/null
+++ b/src/crimson/osd/osd_operations/common/pg_pipeline.h
@@ -0,0 +1,31 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "osd/osd_op_util.h"
+#include "crimson/osd/osd_operation.h"
+
+namespace crimson::osd {
+
+class CommonPGPipeline {
+protected:
+ friend class InternalClientRequest;
+ friend class SnapTrimEvent;
+ friend class SnapTrimObjSubEvent;
+
+ struct WaitForActive : OrderedExclusivePhaseT<WaitForActive> {
+ static constexpr auto type_name = "CommonPGPipeline::wait_for_active";
+ } wait_for_active;
+ struct RecoverMissing : OrderedExclusivePhaseT<RecoverMissing> {
+ static constexpr auto type_name = "CommonPGPipeline::recover_missing";
+ } recover_missing;
+ struct GetOBC : OrderedExclusivePhaseT<GetOBC> {
+ static constexpr auto type_name = "CommonPGPipeline::get_obc";
+ } get_obc;
+ struct Process : OrderedExclusivePhaseT<Process> {
+ static constexpr auto type_name = "CommonPGPipeline::process";
+ } process;
+};
+
+} // namespace crimson::osd
diff --git a/src/crimson/osd/osd_operations/internal_client_request.cc b/src/crimson/osd/osd_operations/internal_client_request.cc
new file mode 100644
index 000000000..1e9b842b2
--- /dev/null
+++ b/src/crimson/osd/osd_operations/internal_client_request.cc
@@ -0,0 +1,130 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#include <seastar/core/future.hh>
+
+#include "crimson/osd/osd_operations/internal_client_request.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace crimson {
+ template <>
+ struct EventBackendRegistry<osd::InternalClientRequest> {
+ static std::tuple<> get_backends() {
+ return {};
+ }
+ };
+}
+
+
+namespace crimson::osd {
+
+InternalClientRequest::InternalClientRequest(Ref<PG> pg)
+ : pg(std::move(pg))
+{
+ assert(bool(this->pg));
+ assert(this->pg->is_primary());
+}
+
+InternalClientRequest::~InternalClientRequest()
+{
+ logger().debug("{}: destroying", *this);
+}
+
+void InternalClientRequest::print(std::ostream &) const
+{
+}
+
+void InternalClientRequest::dump_detail(Formatter *f) const
+{
+}
+
+CommonPGPipeline& InternalClientRequest::client_pp()
+{
+ return pg->request_pg_pipeline;
+}
+
+seastar::future<> InternalClientRequest::start()
+{
+ track_event<StartEvent>();
+ return crimson::common::handle_system_shutdown([this] {
+ return seastar::repeat([this] {
+ logger().debug("{}: in repeat", *this);
+ return interruptor::with_interruption([this]() mutable {
+ return enter_stage<interruptor>(
+ client_pp().wait_for_active
+ ).then_interruptible([this] {
+ return with_blocking_event<PGActivationBlocker::BlockingEvent,
+ interruptor>([this] (auto&& trigger) {
+ return pg->wait_for_active_blocker.wait(std::move(trigger));
+ });
+ }).then_interruptible([this] {
+ return enter_stage<interruptor>(
+ client_pp().recover_missing);
+ }).then_interruptible([this] {
+ return do_recover_missing(pg, get_target_oid());
+ }).then_interruptible([this] {
+ return enter_stage<interruptor>(
+ client_pp().get_obc);
+ }).then_interruptible([this] () -> PG::load_obc_iertr::future<> {
+ logger().debug("{}: getting obc lock", *this);
+ return seastar::do_with(create_osd_ops(),
+ [this](auto& osd_ops) mutable {
+ logger().debug("InternalClientRequest: got {} OSDOps to execute",
+ std::size(osd_ops));
+ [[maybe_unused]] const int ret = op_info.set_from_op(
+ std::as_const(osd_ops), pg->get_pgid().pgid, *pg->get_osdmap());
+ assert(ret == 0);
+ return pg->with_locked_obc(get_target_oid(), op_info,
+ [&osd_ops, this](auto obc) {
+ return enter_stage<interruptor>(client_pp().process
+ ).then_interruptible(
+ [obc=std::move(obc), &osd_ops, this] {
+ return pg->do_osd_ops(
+ std::move(obc),
+ osd_ops,
+ std::as_const(op_info),
+ get_do_osd_ops_params(),
+ [] {
+ return PG::do_osd_ops_iertr::now();
+ },
+ [] (const std::error_code& e) {
+ return PG::do_osd_ops_iertr::now();
+ }
+ ).safe_then_unpack_interruptible(
+ [](auto submitted, auto all_completed) {
+ return all_completed.handle_error_interruptible(
+ crimson::ct_error::eagain::handle([] {
+ return seastar::now();
+ }));
+ }, crimson::ct_error::eagain::handle([] {
+ return interruptor::now();
+ })
+ );
+ });
+ });
+ });
+ }).handle_error_interruptible(PG::load_obc_ertr::all_same_way([] {
+ return seastar::now();
+ })).then_interruptible([] {
+ return seastar::stop_iteration::yes;
+ });
+ }, [this](std::exception_ptr eptr) {
+ if (should_abort_request(*this, std::move(eptr))) {
+ return seastar::stop_iteration::yes;
+ } else {
+ return seastar::stop_iteration::no;
+ }
+ }, pg);
+ }).then([this] {
+ track_event<CompletionEvent>();
+ });
+ });
+}
+
+} // namespace crimson::osd
+
diff --git a/src/crimson/osd/osd_operations/internal_client_request.h b/src/crimson/osd/osd_operations/internal_client_request.h
new file mode 100644
index 000000000..8eed12e05
--- /dev/null
+++ b/src/crimson/osd/osd_operations/internal_client_request.h
@@ -0,0 +1,68 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/common/type_helpers.h"
+#include "crimson/osd/osd_operation.h"
+#include "crimson/osd/osd_operations/client_request_common.h"
+#include "crimson/osd/osd_operations/common/pg_pipeline.h"
+#include "crimson/osd/pg.h"
+#include "crimson/osd/pg_activation_blocker.h"
+
+namespace crimson::osd {
+
+class InternalClientRequest : public PhasedOperationT<InternalClientRequest>,
+ private CommonClientRequest {
+public:
+ explicit InternalClientRequest(Ref<PG> pg);
+ ~InternalClientRequest();
+
+ // imposed by `ShardService::start_operation<T>(...)`.
+ seastar::future<> start();
+
+protected:
+ virtual const hobject_t& get_target_oid() const = 0;
+ virtual PG::do_osd_ops_params_t get_do_osd_ops_params() const = 0;
+ virtual std::vector<OSDOp> create_osd_ops() = 0;
+
+ const PG& get_pg() const {
+ return *pg;
+ }
+
+private:
+ friend OperationT<InternalClientRequest>;
+
+ static constexpr OperationTypeCode type =
+ OperationTypeCode::internal_client_request;
+
+ void print(std::ostream &) const final;
+ void dump_detail(Formatter *f) const final;
+
+ CommonPGPipeline& client_pp();
+
+ seastar::future<> do_process();
+
+ Ref<PG> pg;
+ OpInfo op_info;
+ PipelineHandle handle;
+
+public:
+ PipelineHandle& get_handle() { return handle; }
+
+ std::tuple<
+ StartEvent,
+ CommonPGPipeline::WaitForActive::BlockingEvent,
+ PGActivationBlocker::BlockingEvent,
+ CommonPGPipeline::RecoverMissing::BlockingEvent,
+ CommonPGPipeline::GetOBC::BlockingEvent,
+ CommonPGPipeline::Process::BlockingEvent,
+ CompletionEvent
+ > tracking_events;
+};
+
+} // namespace crimson::osd
+
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::osd::InternalClientRequest> : fmt::ostream_formatter {};
+#endif
diff --git a/src/crimson/osd/osd_operations/logmissing_request.cc b/src/crimson/osd/osd_operations/logmissing_request.cc
new file mode 100644
index 000000000..739b46406
--- /dev/null
+++ b/src/crimson/osd/osd_operations/logmissing_request.cc
@@ -0,0 +1,79 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "logmissing_request.h"
+
+#include "common/Formatter.h"
+
+#include "crimson/osd/osd.h"
+#include "crimson/osd/osd_connection_priv.h"
+#include "crimson/osd/osd_operation_external_tracking.h"
+#include "crimson/osd/pg.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace crimson::osd {
+
+LogMissingRequest::LogMissingRequest(crimson::net::ConnectionRef&& conn,
+ Ref<MOSDPGUpdateLogMissing> &&req)
+ : conn{std::move(conn)},
+ req{std::move(req)}
+{}
+
+void LogMissingRequest::print(std::ostream& os) const
+{
+ os << "LogMissingRequest("
+ << "from=" << req->from
+ << " req=" << *req
+ << ")";
+}
+
+void LogMissingRequest::dump_detail(Formatter *f) const
+{
+ f->open_object_section("LogMissingRequest");
+ f->dump_stream("req_tid") << req->get_tid();
+ f->dump_stream("pgid") << req->get_spg();
+ f->dump_unsigned("map_epoch", req->get_map_epoch());
+ f->dump_unsigned("min_epoch", req->get_min_epoch());
+ f->dump_stream("entries") << req->entries;
+ f->dump_stream("from") << req->from;
+ f->close_section();
+}
+
+ConnectionPipeline &LogMissingRequest::get_connection_pipeline()
+{
+ return get_osd_priv(conn.get()).replicated_request_conn_pipeline;
+}
+
+ClientRequest::PGPipeline &LogMissingRequest::client_pp(PG &pg)
+{
+ return pg.request_pg_pipeline;
+}
+
+seastar::future<> LogMissingRequest::with_pg(
+ ShardServices &shard_services, Ref<PG> pg)
+{
+ logger().debug("{}: LogMissingRequest::with_pg", *this);
+
+ IRef ref = this;
+ return interruptor::with_interruption([this, pg] {
+ logger().debug("{}: pg present", *this);
+ return this->template enter_stage<interruptor>(client_pp(*pg).await_map
+ ).then_interruptible([this, pg] {
+ return this->template with_blocking_event<
+ PG_OSDMapGate::OSDMapBlocker::BlockingEvent
+ >([this, pg](auto &&trigger) {
+ return pg->osdmap_gate.wait_for_map(
+ std::move(trigger), req->min_epoch);
+ });
+ }).then_interruptible([this, pg](auto) {
+ return pg->do_update_log_missing(req, conn);
+ });
+ }, [ref](std::exception_ptr) { return seastar::now(); }, pg);
+}
+
+}
diff --git a/src/crimson/osd/osd_operations/logmissing_request.h b/src/crimson/osd/osd_operations/logmissing_request.h
new file mode 100644
index 000000000..71d0816fd
--- /dev/null
+++ b/src/crimson/osd/osd_operations/logmissing_request.h
@@ -0,0 +1,81 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/net/Connection.h"
+#include "crimson/osd/osdmap_gate.h"
+#include "crimson/osd/osd_operation.h"
+#include "crimson/osd/osd_operations/client_request.h"
+#include "crimson/osd/pg_map.h"
+#include "crimson/common/type_helpers.h"
+#include "messages/MOSDPGUpdateLogMissing.h"
+
+namespace ceph {
+ class Formatter;
+}
+
+namespace crimson::osd {
+
+class ShardServices;
+
+class OSD;
+class PG;
+
+class LogMissingRequest final : public PhasedOperationT<LogMissingRequest> {
+public:
+ static constexpr OperationTypeCode type = OperationTypeCode::logmissing_request;
+ LogMissingRequest(crimson::net::ConnectionRef&&, Ref<MOSDPGUpdateLogMissing>&&);
+
+ void print(std::ostream &) const final;
+ void dump_detail(ceph::Formatter* f) const final;
+
+ static constexpr bool can_create() { return false; }
+ spg_t get_pgid() const {
+ return req->get_spg();
+ }
+ PipelineHandle &get_handle() { return handle; }
+ epoch_t get_epoch() const { return req->get_min_epoch(); }
+
+ ConnectionPipeline &get_connection_pipeline();
+ seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
+ assert(conn);
+ return conn.get_foreign(
+ ).then([this](auto f_conn) {
+ conn.reset();
+ return f_conn;
+ });
+ }
+ void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
+ assert(!conn);
+ conn = make_local_shared_foreign(std::move(_conn));
+ }
+
+ seastar::future<> with_pg(
+ ShardServices &shard_services, Ref<PG> pg);
+
+ std::tuple<
+ StartEvent,
+ ConnectionPipeline::AwaitActive::BlockingEvent,
+ ConnectionPipeline::AwaitMap::BlockingEvent,
+ ConnectionPipeline::GetPG::BlockingEvent,
+ ClientRequest::PGPipeline::AwaitMap::BlockingEvent,
+ PG_OSDMapGate::OSDMapBlocker::BlockingEvent,
+ PGMap::PGCreationBlockingEvent,
+ OSD_OSDMapGate::OSDMapBlocker::BlockingEvent
+ > tracking_events;
+
+private:
+ ClientRequest::PGPipeline &client_pp(PG &pg);
+
+ crimson::net::ConnectionRef conn;
+ // must be after `conn` to ensure the ConnectionPipeline stays alive
+ PipelineHandle handle;
+ Ref<MOSDPGUpdateLogMissing> req;
+};
+
+}
+
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::osd::LogMissingRequest> : fmt::ostream_formatter {};
+#endif
diff --git a/src/crimson/osd/osd_operations/logmissing_request_reply.cc b/src/crimson/osd/osd_operations/logmissing_request_reply.cc
new file mode 100644
index 000000000..b4bf2938e
--- /dev/null
+++ b/src/crimson/osd/osd_operations/logmissing_request_reply.cc
@@ -0,0 +1,68 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "logmissing_request_reply.h"
+
+#include "common/Formatter.h"
+
+#include "crimson/osd/osd.h"
+#include "crimson/osd/osd_connection_priv.h"
+#include "crimson/osd/osd_operation_external_tracking.h"
+#include "crimson/osd/pg.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace crimson::osd {
+
+LogMissingRequestReply::LogMissingRequestReply(
+ crimson::net::ConnectionRef&& conn,
+ Ref<MOSDPGUpdateLogMissingReply> &&req)
+ : conn{std::move(conn)},
+ req{std::move(req)}
+{}
+
+void LogMissingRequestReply::print(std::ostream& os) const
+{
+ os << "LogMissingRequestReply("
+ << "from=" << req->from
+ << " req=" << *req
+ << ")";
+}
+
+void LogMissingRequestReply::dump_detail(Formatter *f) const
+{
+ f->open_object_section("LogMissingRequestReply");
+ f->dump_stream("rep_tid") << req->get_tid();
+ f->dump_stream("pgid") << req->get_spg();
+ f->dump_unsigned("map_epoch", req->get_map_epoch());
+ f->dump_unsigned("min_epoch", req->get_min_epoch());
+ f->dump_stream("from") << req->from;
+ f->close_section();
+}
+
+ConnectionPipeline &LogMissingRequestReply::get_connection_pipeline()
+{
+ return get_osd_priv(conn.get()).replicated_request_conn_pipeline;
+}
+
+ClientRequest::PGPipeline &LogMissingRequestReply::client_pp(PG &pg)
+{
+ return pg.request_pg_pipeline;
+}
+
+seastar::future<> LogMissingRequestReply::with_pg(
+ ShardServices &shard_services, Ref<PG> pg)
+{
+ logger().debug("{}: LogMissingRequestReply::with_pg", *this);
+
+ IRef ref = this;
+ return interruptor::with_interruption([this, pg] {
+ return pg->do_update_log_missing_reply(std::move(req));
+ }, [ref](std::exception_ptr) { return seastar::now(); }, pg);
+}
+
+}
diff --git a/src/crimson/osd/osd_operations/logmissing_request_reply.h b/src/crimson/osd/osd_operations/logmissing_request_reply.h
new file mode 100644
index 000000000..c89131fec
--- /dev/null
+++ b/src/crimson/osd/osd_operations/logmissing_request_reply.h
@@ -0,0 +1,79 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/net/Connection.h"
+#include "crimson/osd/osdmap_gate.h"
+#include "crimson/osd/osd_operation.h"
+#include "crimson/osd/osd_operations/client_request.h"
+#include "crimson/osd/pg_map.h"
+#include "crimson/common/type_helpers.h"
+#include "messages/MOSDPGUpdateLogMissingReply.h"
+
+namespace ceph {
+ class Formatter;
+}
+
+namespace crimson::osd {
+
+class ShardServices;
+
+class OSD;
+class PG;
+
+class LogMissingRequestReply final : public PhasedOperationT<LogMissingRequestReply> {
+public:
+ static constexpr OperationTypeCode type = OperationTypeCode::logmissing_request_reply;
+ LogMissingRequestReply(crimson::net::ConnectionRef&&, Ref<MOSDPGUpdateLogMissingReply>&&);
+
+ void print(std::ostream &) const final;
+ void dump_detail(ceph::Formatter* f) const final;
+
+ static constexpr bool can_create() { return false; }
+ spg_t get_pgid() const {
+ return req->get_spg();
+ }
+ PipelineHandle &get_handle() { return handle; }
+ epoch_t get_epoch() const { return req->get_min_epoch(); }
+
+ ConnectionPipeline &get_connection_pipeline();
+ seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
+ assert(conn);
+ return conn.get_foreign(
+ ).then([this](auto f_conn) {
+ conn.reset();
+ return f_conn;
+ });
+ }
+ void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
+ assert(!conn);
+ conn = make_local_shared_foreign(std::move(_conn));
+ }
+
+ seastar::future<> with_pg(
+ ShardServices &shard_services, Ref<PG> pg);
+
+ std::tuple<
+ StartEvent,
+ ConnectionPipeline::AwaitActive::BlockingEvent,
+ ConnectionPipeline::AwaitMap::BlockingEvent,
+ ConnectionPipeline::GetPG::BlockingEvent,
+ PGMap::PGCreationBlockingEvent,
+ OSD_OSDMapGate::OSDMapBlocker::BlockingEvent
+ > tracking_events;
+
+private:
+ ClientRequest::PGPipeline &client_pp(PG &pg);
+
+ crimson::net::ConnectionRef conn;
+ // must be after `conn` to ensure the ConnectionPipeline stays alive
+ PipelineHandle handle;
+ Ref<MOSDPGUpdateLogMissingReply> req;
+};
+
+}
+
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::osd::LogMissingRequestReply> : fmt::ostream_formatter {};
+#endif
diff --git a/src/crimson/osd/osd_operations/osdop_params.h b/src/crimson/osd/osd_operations/osdop_params.h
new file mode 100644
index 000000000..c7b81e765
--- /dev/null
+++ b/src/crimson/osd/osd_operations/osdop_params.h
@@ -0,0 +1,22 @@
+#pragma once
+
+#include "messages/MOSDOp.h"
+#include "osd/osd_types.h"
+#include "crimson/common/type_helpers.h"
+
+// The fields in this struct are parameters that may be needed in multiple
+// levels of processing. I enclosed all those parameters in this struct to
+// avoid passing each of them as a method parameter.
+struct osd_op_params_t {
+ osd_reqid_t req_id;
+ utime_t mtime;
+ eversion_t at_version;
+ eversion_t pg_trim_to;
+ eversion_t min_last_complete_ondisk;
+ eversion_t last_complete;
+ version_t user_at_version = 0;
+ bool user_modify = false;
+ ObjectCleanRegions clean_regions;
+
+ osd_op_params_t() = default;
+};
diff --git a/src/crimson/osd/osd_operations/peering_event.cc b/src/crimson/osd/osd_operations/peering_event.cc
new file mode 100644
index 000000000..ea4662bd0
--- /dev/null
+++ b/src/crimson/osd/osd_operations/peering_event.cc
@@ -0,0 +1,190 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <seastar/core/future.hh>
+#include <seastar/core/sleep.hh>
+
+#include "messages/MOSDPGLog.h"
+
+#include "common/Formatter.h"
+#include "crimson/osd/pg.h"
+#include "crimson/osd/osd.h"
+#include "crimson/osd/osd_operation_external_tracking.h"
+#include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/osd_connection_priv.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace crimson::osd {
+
+template <class T>
+void PeeringEvent<T>::print(std::ostream &lhs) const
+{
+ lhs << "PeeringEvent("
+ << "from=" << from
+ << " pgid=" << pgid
+ << " sent=" << evt.get_epoch_sent()
+ << " requested=" << evt.get_epoch_requested()
+ << " evt=" << evt.get_desc()
+ << ")";
+}
+
+template <class T>
+void PeeringEvent<T>::dump_detail(Formatter *f) const
+{
+ f->open_object_section("PeeringEvent");
+ f->dump_stream("from") << from;
+ f->dump_stream("pgid") << pgid;
+ f->dump_int("sent", evt.get_epoch_sent());
+ f->dump_int("requested", evt.get_epoch_requested());
+ f->dump_string("evt", evt.get_desc());
+ f->open_array_section("events");
+ {
+ std::apply([f](auto&... events) {
+ (..., events.dump(f));
+ }, static_cast<const T*>(this)->tracking_events);
+ }
+ f->close_section();
+ f->close_section();
+}
+
+
+template <class T>
+PGPeeringPipeline &PeeringEvent<T>::peering_pp(PG &pg)
+{
+ return pg.peering_request_pg_pipeline;
+}
+
+template <class T>
+seastar::future<> PeeringEvent<T>::with_pg(
+ ShardServices &shard_services, Ref<PG> pg)
+{
+ if (!pg) {
+ logger().warn("{}: pg absent, did not create", *this);
+ on_pg_absent(shard_services);
+ that()->get_handle().exit();
+ return complete_rctx_no_pg(shard_services);
+ }
+
+ using interruptor = typename T::interruptor;
+ return interruptor::with_interruption([this, pg, &shard_services] {
+ logger().debug("{}: pg present", *this);
+ return this->template enter_stage<interruptor>(peering_pp(*pg).await_map
+ ).then_interruptible([this, pg] {
+ return this->template with_blocking_event<
+ PG_OSDMapGate::OSDMapBlocker::BlockingEvent
+ >([this, pg](auto &&trigger) {
+ return pg->osdmap_gate.wait_for_map(
+ std::move(trigger), evt.get_epoch_sent());
+ });
+ }).then_interruptible([this, pg](auto) {
+ return this->template enter_stage<interruptor>(peering_pp(*pg).process);
+ }).then_interruptible([this, pg, &shard_services] {
+ return pg->do_peering_event(evt, ctx
+ ).then_interruptible([this, pg, &shard_services] {
+ that()->get_handle().exit();
+ return complete_rctx(shard_services, pg);
+ });
+ }).then_interruptible([pg, &shard_services]()
+ -> typename T::template interruptible_future<> {
+ if (!pg->get_need_up_thru()) {
+ return seastar::now();
+ }
+ return shard_services.send_alive(pg->get_same_interval_since());
+ }).then_interruptible([&shard_services] {
+ return shard_services.send_pg_temp();
+ });
+ }, [this](std::exception_ptr ep) {
+ logger().debug("{}: interrupted with {}", *this, ep);
+ }, pg);
+}
+
+template <class T>
+void PeeringEvent<T>::on_pg_absent(ShardServices &)
+{
+ logger().debug("{}: pg absent, dropping", *this);
+}
+
+template <class T>
+typename PeeringEvent<T>::template interruptible_future<>
+PeeringEvent<T>::complete_rctx(ShardServices &shard_services, Ref<PG> pg)
+{
+ logger().debug("{}: submitting ctx", *this);
+ return shard_services.dispatch_context(
+ pg->get_collection_ref(),
+ std::move(ctx));
+}
+
+ConnectionPipeline &RemotePeeringEvent::get_connection_pipeline()
+{
+ return get_osd_priv(conn.get()).peering_request_conn_pipeline;
+}
+
+void RemotePeeringEvent::on_pg_absent(ShardServices &shard_services)
+{
+ if (auto& e = get_event().get_event();
+ e.dynamic_type() == MQuery::static_type()) {
+ const auto map_epoch =
+ shard_services.get_map()->get_epoch();
+ const auto& q = static_cast<const MQuery&>(e);
+ const pg_info_t empty{spg_t{pgid.pgid, q.query.to}};
+ if (q.query.type == q.query.LOG ||
+ q.query.type == q.query.FULLLOG) {
+ auto m = crimson::make_message<MOSDPGLog>(q.query.from, q.query.to,
+ map_epoch, empty,
+ q.query.epoch_sent);
+ ctx.send_osd_message(q.from.osd, std::move(m));
+ } else {
+ ctx.send_notify(q.from.osd, {q.query.from, q.query.to,
+ q.query.epoch_sent,
+ map_epoch, empty,
+ PastIntervals{}});
+ }
+ }
+}
+
+RemotePeeringEvent::interruptible_future<> RemotePeeringEvent::complete_rctx(
+ ShardServices &shard_services,
+ Ref<PG> pg)
+{
+ if (pg) {
+ return PeeringEvent::complete_rctx(shard_services, pg);
+ } else {
+ return shard_services.dispatch_context_messages(std::move(ctx));
+ }
+}
+
+seastar::future<> RemotePeeringEvent::complete_rctx_no_pg(
+ ShardServices &shard_services)
+{
+ return shard_services.dispatch_context_messages(std::move(ctx));
+}
+
+seastar::future<> LocalPeeringEvent::start()
+{
+ logger().debug("{}: start", *this);
+
+ IRef ref = this;
+ auto maybe_delay = seastar::now();
+ if (delay) {
+ maybe_delay = seastar::sleep(
+ std::chrono::milliseconds(std::lround(delay * 1000)));
+ }
+ return maybe_delay.then([this] {
+ return with_pg(pg->get_shard_services(), pg);
+ }).finally([ref=std::move(ref)] {
+ logger().debug("{}: complete", *ref);
+ });
+}
+
+
+LocalPeeringEvent::~LocalPeeringEvent() {}
+
+template class PeeringEvent<RemotePeeringEvent>;
+template class PeeringEvent<LocalPeeringEvent>;
+
+}
diff --git a/src/crimson/osd/osd_operations/peering_event.h b/src/crimson/osd/osd_operations/peering_event.h
new file mode 100644
index 000000000..e94caead1
--- /dev/null
+++ b/src/crimson/osd/osd_operations/peering_event.h
@@ -0,0 +1,207 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <iostream>
+#include <seastar/core/future.hh>
+
+#include "crimson/osd/osdmap_gate.h"
+#include "crimson/osd/osd_operation.h"
+#include "osd/osd_types.h"
+#include "osd/PGPeeringEvent.h"
+#include "osd/PeeringState.h"
+
+namespace ceph {
+ class Formatter;
+}
+
+namespace crimson::osd {
+
+class OSD;
+class ShardServices;
+class PG;
+class BackfillRecovery;
+
+ class PGPeeringPipeline {
+ struct AwaitMap : OrderedExclusivePhaseT<AwaitMap> {
+ static constexpr auto type_name = "PeeringEvent::PGPipeline::await_map";
+ } await_map;
+ struct Process : OrderedExclusivePhaseT<Process> {
+ static constexpr auto type_name = "PeeringEvent::PGPipeline::process";
+ } process;
+ template <class T>
+ friend class PeeringEvent;
+ friend class LocalPeeringEvent;
+ friend class RemotePeeringEvent;
+ friend class PGAdvanceMap;
+ friend class BackfillRecovery;
+ };
+
+template <class T>
+class PeeringEvent : public PhasedOperationT<T> {
+ T* that() {
+ return static_cast<T*>(this);
+ }
+ const T* that() const {
+ return static_cast<const T*>(this);
+ }
+
+public:
+ static constexpr OperationTypeCode type = OperationTypeCode::peering_event;
+
+protected:
+ PGPeeringPipeline &peering_pp(PG &pg);
+
+ PeeringCtx ctx;
+ pg_shard_t from;
+ spg_t pgid;
+ float delay = 0;
+ PGPeeringEvent evt;
+
+ const pg_shard_t get_from() const {
+ return from;
+ }
+
+ const spg_t get_pgid() const {
+ return pgid;
+ }
+
+ const PGPeeringEvent &get_event() const {
+ return evt;
+ }
+
+ virtual void on_pg_absent(ShardServices &);
+
+ virtual typename PeeringEvent::template interruptible_future<>
+ complete_rctx(ShardServices &, Ref<PG>);
+
+ virtual seastar::future<> complete_rctx_no_pg(
+ ShardServices &shard_services
+ ) { return seastar::now();}
+
+public:
+ template <typename... Args>
+ PeeringEvent(
+ const pg_shard_t &from, const spg_t &pgid,
+ Args&&... args) :
+ from(from),
+ pgid(pgid),
+ evt(std::forward<Args>(args)...)
+ {}
+ template <typename... Args>
+ PeeringEvent(
+ const pg_shard_t &from, const spg_t &pgid,
+ float delay, Args&&... args) :
+ from(from),
+ pgid(pgid),
+ delay(delay),
+ evt(std::forward<Args>(args)...)
+ {}
+
+ void print(std::ostream &) const final;
+ void dump_detail(ceph::Formatter* f) const final;
+ seastar::future<> with_pg(
+ ShardServices &shard_services, Ref<PG> pg);
+};
+
+class RemotePeeringEvent : public PeeringEvent<RemotePeeringEvent> {
+protected:
+ crimson::net::ConnectionRef conn;
+ // must be after conn due to ConnectionPipeline's life-time
+ PipelineHandle handle;
+
+ void on_pg_absent(ShardServices &) final;
+ PeeringEvent::interruptible_future<> complete_rctx(
+ ShardServices &shard_services,
+ Ref<PG> pg) override;
+ seastar::future<> complete_rctx_no_pg(
+ ShardServices &shard_services
+ ) override;
+
+public:
+ class OSDPipeline {
+ struct AwaitActive : OrderedExclusivePhaseT<AwaitActive> {
+ static constexpr auto type_name =
+ "PeeringRequest::OSDPipeline::await_active";
+ } await_active;
+ friend class RemotePeeringEvent;
+ };
+
+ template <typename... Args>
+ RemotePeeringEvent(crimson::net::ConnectionRef conn, Args&&... args) :
+ PeeringEvent(std::forward<Args>(args)...),
+ conn(conn)
+ {}
+
+ std::tuple<
+ StartEvent,
+ ConnectionPipeline::AwaitActive::BlockingEvent,
+ ConnectionPipeline::AwaitMap::BlockingEvent,
+ OSD_OSDMapGate::OSDMapBlocker::BlockingEvent,
+ ConnectionPipeline::GetPG::BlockingEvent,
+ PGMap::PGCreationBlockingEvent,
+ PGPeeringPipeline::AwaitMap::BlockingEvent,
+ PG_OSDMapGate::OSDMapBlocker::BlockingEvent,
+ PGPeeringPipeline::Process::BlockingEvent,
+ OSDPipeline::AwaitActive::BlockingEvent,
+ CompletionEvent
+ > tracking_events;
+
+ static constexpr bool can_create() { return true; }
+ auto get_create_info() { return std::move(evt.create_info); }
+ spg_t get_pgid() const {
+ return pgid;
+ }
+ PipelineHandle &get_handle() { return handle; }
+ epoch_t get_epoch() const { return evt.get_epoch_sent(); }
+
+ ConnectionPipeline &get_connection_pipeline();
+ seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
+ assert(conn);
+ return conn.get_foreign(
+ ).then([this](auto f_conn) {
+ conn.reset();
+ return f_conn;
+ });
+ }
+ void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
+ assert(!conn);
+ conn = make_local_shared_foreign(std::move(_conn));
+ }
+};
+
+class LocalPeeringEvent final : public PeeringEvent<LocalPeeringEvent> {
+protected:
+ Ref<PG> pg;
+ PipelineHandle handle;
+
+public:
+ template <typename... Args>
+ LocalPeeringEvent(Ref<PG> pg, Args&&... args) :
+ PeeringEvent(std::forward<Args>(args)...),
+ pg(pg)
+ {}
+
+ seastar::future<> start();
+ virtual ~LocalPeeringEvent();
+
+ PipelineHandle &get_handle() { return handle; }
+
+ std::tuple<
+ StartEvent,
+ PGPeeringPipeline::AwaitMap::BlockingEvent,
+ PG_OSDMapGate::OSDMapBlocker::BlockingEvent,
+ PGPeeringPipeline::Process::BlockingEvent,
+ CompletionEvent
+ > tracking_events;
+};
+
+
+}
+
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::osd::LocalPeeringEvent> : fmt::ostream_formatter {};
+template <> struct fmt::formatter<crimson::osd::RemotePeeringEvent> : fmt::ostream_formatter {};
+template <class T> struct fmt::formatter<crimson::osd::PeeringEvent<T>> : fmt::ostream_formatter {};
+#endif
diff --git a/src/crimson/osd/osd_operations/pg_advance_map.cc b/src/crimson/osd/osd_operations/pg_advance_map.cc
new file mode 100644
index 000000000..3706af810
--- /dev/null
+++ b/src/crimson/osd/osd_operations/pg_advance_map.cc
@@ -0,0 +1,130 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <seastar/core/future.hh>
+
+#include "include/types.h"
+#include "common/Formatter.h"
+#include "crimson/osd/pg.h"
+#include "crimson/osd/osdmap_service.h"
+#include "crimson/osd/shard_services.h"
+#include "crimson/osd/osd_operations/pg_advance_map.h"
+#include "crimson/osd/osd_operation_external_tracking.h"
+#include "osd/PeeringState.h"
+
namespace {
  // File-local shorthand for the OSD subsystem logger.
  seastar::logger& logger() {
    return crimson::get_logger(ceph_subsys_osd);
  }
}
+
+namespace crimson::osd {
+
// Advance `pg` from its current osdmap epoch up to epoch `to`, applying
// the accumulated peering side effects in `rctx`.  `do_init` is set when
// the pg was just created and must be initialized/activated first.
// Note: `from` is deliberately NOT set here — see start().
PGAdvanceMap::PGAdvanceMap(
  ShardServices &shard_services, Ref<PG> pg, epoch_t to,
  PeeringCtx &&rctx, bool do_init)
  : shard_services(shard_services), pg(pg), to(to),
    rctx(std::move(rctx)), do_init(do_init)
{
  logger().debug("{}: created", *this);
}
+
+PGAdvanceMap::~PGAdvanceMap() {}
+
+void PGAdvanceMap::print(std::ostream &lhs) const
+{
+ lhs << "PGAdvanceMap("
+ << "pg=" << pg->get_pgid()
+ << " from=" << (from ? *from : -1)
+ << " to=" << to;
+ if (do_init) {
+ lhs << " do_init";
+ }
+ lhs << ")";
+}
+
+void PGAdvanceMap::dump_detail(Formatter *f) const
+{
+ f->open_object_section("PGAdvanceMap");
+ f->dump_stream("pgid") << pg->get_pgid();
+ if (from) {
+ f->dump_int("from", *from);
+ }
+ f->dump_int("to", to);
+ f->dump_bool("do_init", do_init);
+ f->close_section();
+}
+
// The pg's peering pipeline; PGAdvanceMap serializes with other peering
// activity on this pg by entering its `process` stage.
PGPeeringPipeline &PGAdvanceMap::peering_pp(PG &pg)
{
  return pg.peering_request_pg_pipeline;
}
+
// Walk the pg through every osdmap epoch in (*from, to], one map at a
// time, then activate the final map, dispatch the resulting peering
// context, and report pg temp / up_thru needs to the monitor path.
seastar::future<> PGAdvanceMap::start()
{
  using cached_map_t = OSDMapService::cached_map_t;

  logger().debug("{}: start", *this);

  // Keep the operation alive until the whole continuation chain is done.
  IRef ref = this;
  return enter_stage<>(
    peering_pp(*pg).process
  ).then([this] {
    /*
     * PGAdvanceMap is scheduled at pg creation and when
     * broadcasting new osdmaps to pgs. We are not able to serialize
     * between the two different PGAdvanceMap callers since a new pg
     * will get advanced to the latest osdmap at it's creation.
     * As a result, we may need to adjust the PGAdvance operation
     * 'from' epoch.
     * See: https://tracker.ceph.com/issues/61744
     */
    from = pg->get_osdmap_epoch();
    auto fut = seastar::now();
    if (do_init) {
      // Freshly created pg: initialize and activate before consuming
      // any further maps.
      fut = pg->handle_initialize(rctx
      ).then([this] {
        return pg->handle_activate_map(rctx);
      });
    }
    return fut.then([this] {
      // Safe signed/unsigned comparison; the pg must not already be
      // ahead of the target epoch.
      ceph_assert(std::cmp_less_equal(*from, to));
      return seastar::do_for_each(
        boost::make_counting_iterator(*from + 1),
        boost::make_counting_iterator(to + 1),
        [this](epoch_t next_epoch) {
          logger().debug("{}: start: getting map {}",
                         *this, next_epoch);
          return shard_services.get_map(next_epoch).then(
            [this] (cached_map_t&& next_map) {
              logger().debug("{}: advancing map to {}",
                             *this, next_map->get_epoch());
              return pg->handle_advance_map(next_map, rctx);
            });
        }).then([this] {
          return pg->handle_activate_map(rctx).then([this] {
            logger().debug("{}: map activated", *this);
            if (do_init) {
              shard_services.pg_created(pg->get_pgid(), pg);
              logger().info("PGAdvanceMap::start new pg {}", *pg);
            }
            // In parallel: request an osdmap alive marker if the pg
            // needs up_thru, and flush the peering context (messages,
            // transactions) accumulated during the advance.
            return seastar::when_all_succeed(
              pg->get_need_up_thru()
              ? shard_services.send_alive(
                pg->get_same_interval_since())
              : seastar::now(),
              shard_services.dispatch_context(
                pg->get_collection_ref(),
                std::move(rctx)));
          });
        }).then_unpack([this] {
          logger().debug("{}: sending pg temp", *this);
          return shard_services.send_pg_temp();
        });
    });
  }).then([this, ref=std::move(ref)] {
    logger().debug("{}: complete", *this);
  });
}
+
+}
diff --git a/src/crimson/osd/osd_operations/pg_advance_map.h b/src/crimson/osd/osd_operations/pg_advance_map.h
new file mode 100644
index 000000000..b712cc12e
--- /dev/null
+++ b/src/crimson/osd/osd_operations/pg_advance_map.h
@@ -0,0 +1,61 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <iostream>
+#include <seastar/core/future.hh>
+
+#include "crimson/osd/osd_operation.h"
+#include "crimson/osd/osd_operations/peering_event.h"
+#include "osd/osd_types.h"
+#include "crimson/common/type_helpers.h"
+
+namespace ceph {
+ class Formatter;
+}
+
+namespace crimson::osd {
+
+class ShardServices;
+class PG;
+
// Operation that advances a single PG through a range of osdmap epochs.
// Scheduled both at pg creation (do_init=true) and whenever newly
// received osdmaps are broadcast to existing pgs.
class PGAdvanceMap : public PhasedOperationT<PGAdvanceMap> {
public:
  static constexpr OperationTypeCode type = OperationTypeCode::pg_advance_map;

protected:
  ShardServices &shard_services;
  Ref<PG> pg;
  PipelineHandle handle;

  // First epoch to advance from.  Set in start(), not at construction,
  // because a concurrently scheduled PGAdvanceMap may already have moved
  // the pg forward (https://tracker.ceph.com/issues/61744).
  std::optional<epoch_t> from;
  epoch_t to;        // target epoch (inclusive)

  PeeringCtx rctx;   // accumulated peering side effects to dispatch
  const bool do_init; // true when the pg was just created

public:
  PGAdvanceMap(
    ShardServices &shard_services, Ref<PG> pg, epoch_t to,
    PeeringCtx &&rctx, bool do_init);
  ~PGAdvanceMap();

  void print(std::ostream &) const final;
  void dump_detail(ceph::Formatter *f) const final;
  seastar::future<> start();
  PipelineHandle &get_handle() { return handle; }

  // Only the peering pipeline's Process stage can block this operation.
  std::tuple<
    PGPeeringPipeline::Process::BlockingEvent
  > tracking_events;

private:
  PGPeeringPipeline &peering_pp(PG &pg);
};
+
+}
+
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::osd::PGAdvanceMap> : fmt::ostream_formatter {};
+#endif
diff --git a/src/crimson/osd/osd_operations/recovery_subrequest.cc b/src/crimson/osd/osd_operations/recovery_subrequest.cc
new file mode 100644
index 000000000..68655b8da
--- /dev/null
+++ b/src/crimson/osd/osd_operations/recovery_subrequest.cc
@@ -0,0 +1,46 @@
+#include <fmt/format.h>
+#include <fmt/ostream.h>
+
+#include "crimson/osd/osd_operations/recovery_subrequest.h"
+#include "crimson/osd/pg.h"
+#include "crimson/osd/osd_connection_priv.h"
+
namespace {
  // File-local shorthand for the OSD subsystem logger.
  seastar::logger& logger() {
    return crimson::get_logger(ceph_subsys_osd);
  }
}
+
namespace crimson {
  // RecoverySubRequest reports to no external event backends; an empty
  // backend set satisfies the event-tracking machinery.
  template <>
  struct EventBackendRegistry<osd::RecoverySubRequest> {
    static std::tuple<> get_backends() {
      return {};
    }
  };
}
+
+namespace crimson::osd {
+
// Hand the recovery message to the pg's recovery backend.  An
// interruption (e.g. the pg's interval changed) is swallowed and treated
// as completion; recovery will be re-driven by peering if still needed.
seastar::future<> RecoverySubRequest::with_pg(
  ShardServices &shard_services, Ref<PG> pgref)
{
  logger().debug("{}: {}", "RecoverySubRequest::with_pg", *this);

  track_event<StartEvent>();
  // Pin the operation (and, via pgref captures, the pg) until done.
  IRef opref = this;
  return interruptor::with_interruption([this, pgref] {
    return pgref->get_recovery_backend()->handle_recovery_op(m, conn);
  }, [](std::exception_ptr) {
    return seastar::now();
  }, pgref).finally([this, opref, pgref] {
    track_event<CompletionEvent>();
  });
}
+
// Recovery sub-ops share the connection's peering-request pipeline.
ConnectionPipeline &RecoverySubRequest::get_connection_pipeline()
{
  return get_osd_priv(conn.get()).peering_request_conn_pipeline;
}
+
+}
diff --git a/src/crimson/osd/osd_operations/recovery_subrequest.h b/src/crimson/osd/osd_operations/recovery_subrequest.h
new file mode 100644
index 000000000..07c7c95b5
--- /dev/null
+++ b/src/crimson/osd/osd_operations/recovery_subrequest.h
@@ -0,0 +1,81 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "osd/osd_op_util.h"
+#include "crimson/net/Connection.h"
+#include "crimson/osd/osd_operation.h"
+#include "crimson/osd/pg.h"
+#include "crimson/common/type_helpers.h"
+#include "messages/MOSDFastDispatchOp.h"
+
+namespace crimson::osd {
+
+class PG;
+
// A recovery message (any MOSDFastDispatchOp, e.g. push/pull replies)
// received from a peer OSD and routed to the owning pg's recovery
// backend.
class RecoverySubRequest final : public PhasedOperationT<RecoverySubRequest> {
public:
  static constexpr OperationTypeCode type =
    OperationTypeCode::background_recovery_sub;

  RecoverySubRequest(
    crimson::net::ConnectionRef conn,
    Ref<MOSDFastDispatchOp>&& m)
    : conn(conn), m(m) {}

  void print(std::ostream& out) const final
  {
    out << *m;
  }

  void dump_detail(Formatter *f) const final
  {
  }

  // Recovery sub-ops never create the pg they target.
  static constexpr bool can_create() { return false; }
  spg_t get_pgid() const {
    return m->get_spg();
  }
  PipelineHandle &get_handle() { return handle; }
  epoch_t get_epoch() const { return m->get_min_epoch(); }

  ConnectionPipeline &get_connection_pipeline();
  // Wrap the connection for transfer to the shard that owns the pg;
  // the local reference is released before the foreign one is handed out.
  seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
    assert(conn);
    return conn.get_foreign(
    ).then([this](auto f_conn) {
      conn.reset();
      return f_conn;
    });
  }
  // Re-adopt the connection after arriving on the target shard.
  void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
    assert(!conn);
    conn = make_local_shared_foreign(std::move(_conn));
  }

  seastar::future<> with_pg(
    ShardServices &shard_services, Ref<PG> pg);

  // Stages/events this operation can block on, for external tracking.
  std::tuple<
    StartEvent,
    ConnectionPipeline::AwaitActive::BlockingEvent,
    ConnectionPipeline::AwaitMap::BlockingEvent,
    ConnectionPipeline::GetPG::BlockingEvent,
    PGMap::PGCreationBlockingEvent,
    OSD_OSDMapGate::OSDMapBlocker::BlockingEvent,
    CompletionEvent
  > tracking_events;

private:
  crimson::net::ConnectionRef conn;
  // must be after `conn` to ensure the ConnectionPipeline's is alive
  PipelineHandle handle;
  Ref<MOSDFastDispatchOp> m;
};
+
+}
+
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::osd::RecoverySubRequest> : fmt::ostream_formatter {};
+#endif
diff --git a/src/crimson/osd/osd_operations/replicated_request.cc b/src/crimson/osd/osd_operations/replicated_request.cc
new file mode 100644
index 000000000..09217575c
--- /dev/null
+++ b/src/crimson/osd/osd_operations/replicated_request.cc
@@ -0,0 +1,80 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "replicated_request.h"
+
+#include "common/Formatter.h"
+
+#include "crimson/osd/osd.h"
+#include "crimson/osd/osd_connection_priv.h"
+#include "crimson/osd/osd_operation_external_tracking.h"
+#include "crimson/osd/pg.h"
+
namespace {
  // File-local shorthand for the OSD subsystem logger.
  seastar::logger& logger() {
    return crimson::get_logger(ceph_subsys_osd);
  }
}
+
+namespace crimson::osd {
+
// Takes ownership of the originating connection and the MOSDRepOp
// message sent by the primary.
RepRequest::RepRequest(crimson::net::ConnectionRef&& conn,
                       Ref<MOSDRepOp> &&req)
  : conn{std::move(conn)},
    req{std::move(req)}
{}
+
+void RepRequest::print(std::ostream& os) const
+{
+ os << "RepRequest("
+ << "from=" << req->from
+ << " req=" << *req
+ << ")";
+}
+
+void RepRequest::dump_detail(Formatter *f) const
+{
+ f->open_object_section("RepRequest");
+ f->dump_stream("reqid") << req->reqid;
+ f->dump_stream("pgid") << req->get_spg();
+ f->dump_unsigned("map_epoch", req->get_map_epoch());
+ f->dump_unsigned("min_epoch", req->get_min_epoch());
+ f->dump_stream("oid") << req->poid;
+ f->dump_stream("from") << req->from;
+ f->close_section();
+}
+
// Replica ops use the connection's dedicated replicated-request pipeline.
ConnectionPipeline &RepRequest::get_connection_pipeline()
{
  return get_osd_priv(conn.get()).replicated_request_conn_pipeline;
}
+
// Replica ops share the pg's client-request pipeline (await_map stage).
ClientRequest::PGPipeline &RepRequest::client_pp(PG &pg)
{
  return pg.request_pg_pipeline;
}
+
// Process a replica op: wait until the pg has seen the op's min epoch,
// then apply it via PG::handle_rep_op.  An interruption (interval
// change) is dropped; the primary re-sends as needed.
seastar::future<> RepRequest::with_pg(
  ShardServices &shard_services, Ref<PG> pg)
{
  logger().debug("{}: RepRequest::with_pg", *this);
  // Keep the operation alive across the interruptible chain.
  IRef ref = this;
  return interruptor::with_interruption([this, pg] {
    logger().debug("{}: pg present", *this);
    return this->template enter_stage<interruptor>(client_pp(*pg).await_map
    ).then_interruptible([this, pg] {
      // Block (with tracking) until the pg's osdmap gate reaches the
      // epoch this op requires.
      return this->template with_blocking_event<
        PG_OSDMapGate::OSDMapBlocker::BlockingEvent
      >([this, pg](auto &&trigger) {
        return pg->osdmap_gate.wait_for_map(
          std::move(trigger), req->min_epoch);
      });
    }).then_interruptible([this, pg] (auto) {
      return pg->handle_rep_op(req);
    });
  }, [ref](std::exception_ptr) {
    return seastar::now();
  }, pg);
}
+
+}
diff --git a/src/crimson/osd/osd_operations/replicated_request.h b/src/crimson/osd/osd_operations/replicated_request.h
new file mode 100644
index 000000000..c742888d9
--- /dev/null
+++ b/src/crimson/osd/osd_operations/replicated_request.h
@@ -0,0 +1,80 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/net/Connection.h"
+#include "crimson/osd/osdmap_gate.h"
+#include "crimson/osd/osd_operation.h"
+#include "crimson/osd/pg_map.h"
+#include "crimson/osd/osd_operations/client_request.h"
+#include "crimson/common/type_helpers.h"
+#include "messages/MOSDRepOp.h"
+
+namespace ceph {
+ class Formatter;
+}
+
+namespace crimson::osd {
+
+class ShardServices;
+
+class OSD;
+class PG;
+
// An MOSDRepOp received from the acting primary; applies the replicated
// transaction to this (replica) osd's copy of the pg.
class RepRequest final : public PhasedOperationT<RepRequest> {
public:
  static constexpr OperationTypeCode type = OperationTypeCode::replicated_request;
  RepRequest(crimson::net::ConnectionRef&&, Ref<MOSDRepOp>&&);

  void print(std::ostream &) const final;
  void dump_detail(ceph::Formatter* f) const final;

  // Replica requests never create the pg they target.
  static constexpr bool can_create() { return false; }
  spg_t get_pgid() const {
    return req->get_spg();
  }
  PipelineHandle &get_handle() { return handle; }
  epoch_t get_epoch() const { return req->get_min_epoch(); }

  ConnectionPipeline &get_connection_pipeline();
  // Wrap the connection for transfer to the shard owning the pg;
  // the local reference is released before the foreign one is returned.
  seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
    assert(conn);
    return conn.get_foreign(
    ).then([this](auto f_conn) {
      conn.reset();
      return f_conn;
    });
  }
  // Re-adopt the connection after arriving on the target shard.
  void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
    assert(!conn);
    conn = make_local_shared_foreign(std::move(_conn));
  }

  seastar::future<> with_pg(
    ShardServices &shard_services, Ref<PG> pg);

  // Stages/events this operation can block on, for external tracking.
  std::tuple<
    StartEvent,
    ConnectionPipeline::AwaitActive::BlockingEvent,
    ConnectionPipeline::AwaitMap::BlockingEvent,
    ConnectionPipeline::GetPG::BlockingEvent,
    ClientRequest::PGPipeline::AwaitMap::BlockingEvent,
    PG_OSDMapGate::OSDMapBlocker::BlockingEvent,
    PGMap::PGCreationBlockingEvent,
    OSD_OSDMapGate::OSDMapBlocker::BlockingEvent
  > tracking_events;

private:
  ClientRequest::PGPipeline &client_pp(PG &pg);

  crimson::net::ConnectionRef conn;
  PipelineHandle handle;
  Ref<MOSDRepOp> req;
};
+
+}
+
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::osd::RepRequest> : fmt::ostream_formatter {};
+#endif
diff --git a/src/crimson/osd/osd_operations/snaptrim_event.cc b/src/crimson/osd/osd_operations/snaptrim_event.cc
new file mode 100644
index 000000000..e4a1b04df
--- /dev/null
+++ b/src/crimson/osd/osd_operations/snaptrim_event.cc
@@ -0,0 +1,569 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "crimson/osd/osd_operations/snaptrim_event.h"
+#include "crimson/osd/ops_executer.h"
+#include "crimson/osd/pg.h"
+#include <seastar/core/sleep.hh>
+
namespace {
  // File-local shorthand for the OSD subsystem logger.
  seastar::logger& logger() {
    return crimson::get_logger(ceph_subsys_osd);
  }
}
+
namespace crimson {
  // Snap-trim operations report to no external event backends; empty
  // backend sets satisfy the event-tracking machinery.
  template <>
  struct EventBackendRegistry<osd::SnapTrimEvent> {
    static std::tuple<> get_backends() {
      return {};
    }
  };

  template <>
  struct EventBackendRegistry<osd::SnapTrimObjSubEvent> {
    static std::tuple<> get_backends() {
      return {};
    }
  };
}
+
+namespace crimson::osd {
+
// Serialize SnapTrimEvents on a pg: the event first parks in the
// `wait_pg` stage (making the wait visible to blocked-op reporting)
// and then acquires the underlying mutex.
PG::interruptible_future<>
PG::SnapTrimMutex::lock(SnapTrimEvent &st_event) noexcept
{
  return st_event.enter_stage<interruptor>(wait_pg
  ).then_interruptible([this] {
    return mutex.lock();
  });
}
+
+void SnapTrimEvent::SubOpBlocker::dump_detail(Formatter *f) const
+{
+ f->open_array_section("dependent_operations");
+ {
+ for (const auto &kv : subops) {
+ f->dump_unsigned("op_id", kv.first);
+ }
+ }
+ f->close_section();
+}
+
+template <class... Args>
+void SnapTrimEvent::SubOpBlocker::emplace_back(Args&&... args)
+{
+ subops.emplace_back(std::forward<Args>(args)...);
+};
+
// Await every registered sub-operation in order.  do_for_each consumes
// each stored future by moving it out of `subops`.
SnapTrimEvent::remove_or_update_iertr::future<>
SnapTrimEvent::SubOpBlocker::wait_completion()
{
  return interruptor::do_for_each(subops, [](auto&& kv) {
    return std::move(kv.second);
  });
}
+
+void SnapTrimEvent::print(std::ostream &lhs) const
+{
+ lhs << "SnapTrimEvent("
+ << "pgid=" << pg->get_pgid()
+ << " snapid=" << snapid
+ << " needs_pause=" << needs_pause
+ << ")";
+}
+
+void SnapTrimEvent::dump_detail(Formatter *f) const
+{
+ f->open_object_section("SnapTrimEvent");
+ f->dump_stream("pgid") << pg->get_pgid();
+ f->close_section();
+}
+
// Entry point: run one trimming pass against our pg.  Yields
// stop_iteration::yes when nothing is left to trim for `snapid`.
SnapTrimEvent::snap_trim_ertr::future<seastar::stop_iteration>
SnapTrimEvent::start()
{
  logger().debug("{}: {}", *this, __func__);
  return with_pg(
    pg->get_shard_services(), pg
  ).finally([ref=IRef{this}, this] {
    // `ref` keeps the operation alive; release the pipeline slot last.
    logger().debug("{}: complete", *ref);
    return handle.complete();
  });
}
+
// Snap trimming runs through the pg's client-request pipeline so it
// serializes with regular client I/O on the same pg.
CommonPGPipeline& SnapTrimEvent::client_pp()
{
  return pg->request_pg_pipeline;
}
+
// One trimming pass: walk the client pipeline stages, fetch up to
// osd_pg_max_concurrent_snap_trims objects still mapped to `snapid`,
// spawn a SnapTrimObjSubEvent per object, await them all, and then
// optionally pause (osd_snap_trim_sleep).  Returns stop_iteration::yes
// once the snap mapper has no more objects for this snap; interruption
// surfaces as ct_error::eagain so the caller retries.
SnapTrimEvent::snap_trim_ertr::future<seastar::stop_iteration>
SnapTrimEvent::with_pg(
  ShardServices &shard_services, Ref<PG> _pg)
{
  return interruptor::with_interruption([&shard_services, this] {
    return enter_stage<interruptor>(
      client_pp().wait_for_active
    ).then_interruptible([this] {
      return with_blocking_event<PGActivationBlocker::BlockingEvent,
        interruptor>([this] (auto&& trigger) {
        return pg->wait_for_active_blocker.wait(std::move(trigger));
      });
    }).then_interruptible([this] {
      return enter_stage<interruptor>(
        client_pp().recover_missing);
    }).then_interruptible([] {
      //return do_recover_missing(pg, get_target_oid());
      return seastar::now();
    }).then_interruptible([this] {
      return enter_stage<interruptor>(
        client_pp().get_obc);
    }).then_interruptible([this] {
      // Only one SnapTrimEvent may trim this pg at a time.
      return pg->snaptrim_mutex.lock(*this);
    }).then_interruptible([this] {
      return enter_stage<interruptor>(
        client_pp().process);
    }).then_interruptible([&shard_services, this] {
      // get_next_objects_to_trim is synchronous; run it on a thread.
      return interruptor::async([this] {
        std::vector<hobject_t> to_trim;
        using crimson::common::local_conf;
        const auto max =
          local_conf().get_val<uint64_t>("osd_pg_max_concurrent_snap_trims");
        // we need to look for at least 1 snaptrim, otherwise we'll misinterpret
        // the ENOENT below and erase snapid.
        int r = snap_mapper.get_next_objects_to_trim(
          snapid,
          max,
          &to_trim);
        if (r == -ENOENT) {
          to_trim.clear(); // paranoia
          return to_trim;
        } else if (r != 0) {
          logger().error("{}: get_next_objects_to_trim returned {}",
                         *this, cpp_strerror(r));
          ceph_abort_msg("get_next_objects_to_trim returned an invalid code");
        } else {
          assert(!to_trim.empty());
        }
        logger().debug("{}: async almost done line {}", *this, __LINE__);
        return to_trim;
      }).then_interruptible([&shard_services, this] (const auto& to_trim) {
        if (to_trim.empty()) {
          // the legit ENOENT -> done
          logger().debug("{}: to_trim is empty! Stopping iteration", *this);
          pg->snaptrim_mutex.unlock();
          return snap_trim_iertr::make_ready_future<seastar::stop_iteration>(
            seastar::stop_iteration::yes);
        }
        // Immediately-invoked lambda: start one SnapTrimObjSubEvent per
        // object and record each (id, future) in the sub-op blocker.
        return [&shard_services, this](const auto &to_trim) {
          for (const auto& object : to_trim) {
            logger().debug("{}: trimming {}", *this, object);
            auto [op, fut] = shard_services.start_operation_may_interrupt<
              interruptor, SnapTrimObjSubEvent>(
              pg,
              object,
              snapid);
            subop_blocker.emplace_back(
              op->get_id(),
              std::move(fut)
            );
          }
          return interruptor::now();
        }(to_trim).then_interruptible([this] {
          return enter_stage<interruptor>(wait_subop);
        }).then_interruptible([this] {
          logger().debug("{}: awaiting completion", *this);
          return subop_blocker.wait_completion();
        }).finally([this] {
          pg->snaptrim_mutex.unlock();
        }).safe_then_interruptible([this] {
          if (!needs_pause) {
            return interruptor::now();
          }
          // let's know operators we're waiting
          return enter_stage<interruptor>(
            wait_trim_timer
          ).then_interruptible([this] {
            using crimson::common::local_conf;
            const auto time_to_sleep =
              local_conf().template get_val<double>("osd_snap_trim_sleep");
            logger().debug("{}: time_to_sleep {}", *this, time_to_sleep);
            // TODO: this logic should be more sophisticated and distinguish
            // between SSDs, HDDs and the hybrid case
            return seastar::sleep(
              std::chrono::milliseconds(std::lround(time_to_sleep * 1000)));
          });
        }).safe_then_interruptible([this] {
          logger().debug("{}: all completed", *this);
          return snap_trim_iertr::make_ready_future<seastar::stop_iteration>(
            seastar::stop_iteration::no);
        });
      });
    });
  }, [this](std::exception_ptr eptr) -> snap_trim_ertr::future<seastar::stop_iteration> {
    // Interruption (interval change etc.) -> ask the caller to retry.
    logger().debug("{}: interrupted {}", *this, eptr);
    return crimson::ct_error::eagain::make();
  }, pg);
}
+
+
// Per-object trim sub-events also serialize through the pg's
// client-request pipeline.
CommonPGPipeline& SnapTrimObjSubEvent::client_pp()
{
  return pg->request_pg_pipeline;
}
+
// Entry point for trimming a single clone object on our pg.
SnapTrimObjSubEvent::remove_or_update_iertr::future<>
SnapTrimObjSubEvent::start()
{
  logger().debug("{}: start", *this);
  return with_pg(
    pg->get_shard_services(), pg
  ).finally([ref=IRef{this}, this] {
    // `ref` keeps the operation alive; release the pipeline slot last.
    logger().debug("{}: complete", *ref);
    return handle.complete();
  });
}
+
+SnapTrimObjSubEvent::remove_or_update_iertr::future<>
+SnapTrimObjSubEvent::remove_clone(
+ ObjectContextRef obc,
+ ObjectContextRef head_obc,
+ ceph::os::Transaction& txn,
+ std::vector<pg_log_entry_t>& log_entries
+) {
+ const auto p = std::find(
+ head_obc->ssc->snapset.clones.begin(),
+ head_obc->ssc->snapset.clones.end(),
+ coid.snap);
+ if (p == head_obc->ssc->snapset.clones.end()) {
+ logger().error("{}: Snap {} not in clones",
+ *this, coid.snap);
+ return crimson::ct_error::enoent::make();
+ }
+ assert(p != head_obc->ssc->snapset.clones.end());
+ snapid_t last = coid.snap;
+ delta_stats.num_bytes -= head_obc->ssc->snapset.get_clone_bytes(last);
+
+ if (p != head_obc->ssc->snapset.clones.begin()) {
+ // not the oldest... merge overlap into next older clone
+ std::vector<snapid_t>::iterator n = p - 1;
+ hobject_t prev_coid = coid;
+ prev_coid.snap = *n;
+
+ // does the classical OSD really need is_present_clone(prev_coid)?
+ delta_stats.num_bytes -= head_obc->ssc->snapset.get_clone_bytes(*n);
+ head_obc->ssc->snapset.clone_overlap[*n].intersection_of(
+ head_obc->ssc->snapset.clone_overlap[*p]);
+ delta_stats.num_bytes += head_obc->ssc->snapset.get_clone_bytes(*n);
+ }
+ delta_stats.num_objects--;
+ if (obc->obs.oi.is_dirty()) {
+ delta_stats.num_objects_dirty--;
+ }
+ if (obc->obs.oi.is_omap()) {
+ delta_stats.num_objects_omap--;
+ }
+ if (obc->obs.oi.is_whiteout()) {
+ logger().debug("{}: trimming whiteout on {}",
+ *this, coid);
+ delta_stats.num_whiteouts--;
+ }
+ delta_stats.num_object_clones--;
+
+ obc->obs.exists = false;
+ head_obc->ssc->snapset.clones.erase(p);
+ head_obc->ssc->snapset.clone_overlap.erase(last);
+ head_obc->ssc->snapset.clone_size.erase(last);
+ head_obc->ssc->snapset.clone_snaps.erase(last);
+
+ log_entries.emplace_back(
+ pg_log_entry_t{
+ pg_log_entry_t::DELETE,
+ coid,
+ osd_op_p.at_version,
+ obc->obs.oi.version,
+ 0,
+ osd_reqid_t(),
+ obc->obs.oi.mtime, // will be replaced in `apply_to()`
+ 0}
+ );
+ txn.remove(
+ pg->get_collection_ref()->get_cid(),
+ ghobject_t{coid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD});
+ obc->obs.oi = object_info_t(coid);
+ return OpsExecuter::snap_map_remove(coid, pg->snap_mapper, pg->osdriver, txn);
+}
+
// The last clone is gone and the head is only a whiteout: delete the
// head object too (log entry, stats adjustment, txn removal).
void SnapTrimObjSubEvent::remove_head_whiteout(
  ObjectContextRef obc,
  ObjectContextRef head_obc,
  ceph::os::Transaction& txn,
  std::vector<pg_log_entry_t>& log_entries
) {
  // NOTE: this arguably constitutes minor interference with the
  // tiering agent if this is a cache tier since a snap trim event
  // is effectively evicting a whiteout we might otherwise want to
  // keep around.
  const auto head_oid = coid.get_head();
  logger().info("{}: {} removing {}",
                *this, coid, head_oid);
  log_entries.emplace_back(
    pg_log_entry_t{
      pg_log_entry_t::DELETE,
      head_oid,
      osd_op_p.at_version,
      head_obc->obs.oi.version,
      0,
      osd_reqid_t(),
      obc->obs.oi.mtime, // will be replaced in `apply_to()`
      0}
  );
  logger().info("{}: remove snap head", *this);
  // Adjust pg stats for the disappearing head object.
  object_info_t& oi = head_obc->obs.oi;
  delta_stats.num_objects--;
  if (oi.is_dirty()) {
    delta_stats.num_objects_dirty--;
  }
  if (oi.is_omap()) {
    delta_stats.num_objects_omap--;
  }
  if (oi.is_whiteout()) {
    logger().debug("{}: trimming whiteout on {}",
                   *this, oi.soid);
    delta_stats.num_whiteouts--;
  }
  // Mark the head gone in the cached object state and in the store.
  head_obc->obs.exists = false;
  head_obc->obs.oi = object_info_t(head_oid);
  txn.remove(pg->get_collection_ref()->get_cid(),
             ghobject_t{head_oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD});
}
+
// The clone is still referenced by other snaps: rewrite its snap list
// (head snapset + object info) and update the snap mapper accordingly,
// logging a MODIFY entry.
SnapTrimObjSubEvent::interruptible_future<>
SnapTrimObjSubEvent::adjust_snaps(
  ObjectContextRef obc,
  ObjectContextRef head_obc,
  const std::set<snapid_t>& new_snaps,
  ceph::os::Transaction& txn,
  std::vector<pg_log_entry_t>& log_entries
) {
  // clone_snaps is kept newest-first, hence the reverse iteration.
  head_obc->ssc->snapset.clone_snaps[coid.snap] =
    std::vector<snapid_t>(new_snaps.rbegin(), new_snaps.rend());

  // we still do a 'modify' event on this object just to trigger a
  // snapmapper.update ... :(
  obc->obs.oi.prior_version = obc->obs.oi.version;
  obc->obs.oi.version = osd_op_p.at_version;
  ceph::bufferlist bl;
  encode(obc->obs.oi,
         bl,
         pg->get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
  // Persist the updated object info on the clone.
  txn.setattr(
    pg->get_collection_ref()->get_cid(),
    ghobject_t{coid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD},
    OI_ATTR,
    bl);
  log_entries.emplace_back(
    pg_log_entry_t{
      pg_log_entry_t::MODIFY,
      coid,
      obc->obs.oi.version,
      obc->obs.oi.prior_version,
      0,
      osd_reqid_t(),
      obc->obs.oi.mtime,
      0}
  );
  // Rewrite the snap-mapper records for the clone in the same txn.
  return OpsExecuter::snap_map_modify(
    coid, new_snaps, pg->snap_mapper, pg->osdriver, txn);
}
+
// Persist the head object's updated snapset and object info after the
// clone was removed or adjusted, logging a MODIFY entry for the head.
void SnapTrimObjSubEvent::update_head(
  ObjectContextRef obc,
  ObjectContextRef head_obc,
  ceph::os::Transaction& txn,
  std::vector<pg_log_entry_t>& log_entries
) {
  const auto head_oid = coid.get_head();
  logger().info("{}: writing updated snapset on {}, snapset is {}",
                *this, head_oid, head_obc->ssc->snapset);
  log_entries.emplace_back(
    pg_log_entry_t{
      pg_log_entry_t::MODIFY,
      head_oid,
      osd_op_p.at_version,
      head_obc->obs.oi.version,
      0,
      osd_reqid_t(),
      obc->obs.oi.mtime,
      0}
  );

  head_obc->obs.oi.prior_version = head_obc->obs.oi.version;
  head_obc->obs.oi.version = osd_op_p.at_version;

  // Write both the snapset (SS_ATTR) and object info (OI_ATTR) xattrs
  // in a single setattrs call.
  std::map<std::string, ceph::bufferlist, std::less<>> attrs;
  ceph::bufferlist bl;
  encode(head_obc->ssc->snapset, bl);
  attrs[SS_ATTR] = std::move(bl);

  bl.clear();
  head_obc->obs.oi.encode_no_oid(bl,
    pg->get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
  attrs[OI_ATTR] = std::move(bl);
  txn.setattrs(
    pg->get_collection_ref()->get_cid(),
    ghobject_t{head_oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD},
    attrs);
}
+
// Decide the fate of clone `coid` for snap `snap_to_trim`: compute the
// set of snaps that still reference the clone; if empty, remove the
// clone entirely, otherwise rewrite its snap list.  Then update (or
// delete) the head object accordingly.  Returns the transaction and log
// entries to be submitted by the caller; enoent on snapset
// inconsistencies.
SnapTrimObjSubEvent::remove_or_update_iertr::future<
  SnapTrimObjSubEvent::remove_or_update_ret_t>
SnapTrimObjSubEvent::remove_or_update(
  ObjectContextRef obc,
  ObjectContextRef head_obc)
{
  auto citer = head_obc->ssc->snapset.clone_snaps.find(coid.snap);
  if (citer == head_obc->ssc->snapset.clone_snaps.end()) {
    logger().error("{}: No clone_snaps in snapset {} for object {}",
                   *this, head_obc->ssc->snapset, coid);
    return crimson::ct_error::enoent::make();
  }
  const auto& old_snaps = citer->second;
  if (old_snaps.empty()) {
    logger().error("{}: no object info snaps for object {}",
                   *this, coid);
    return crimson::ct_error::enoent::make();
  }
  if (head_obc->ssc->snapset.seq == 0) {
    logger().error("{}: no snapset.seq for object {}",
                   *this, coid);
    return crimson::ct_error::enoent::make();
  }
  // A snap keeps the clone alive only if it is neither the snap being
  // trimmed nor already queued for removal in the osdmap.
  const OSDMapRef& osdmap = pg->get_osdmap();
  std::set<snapid_t> new_snaps;
  for (const auto& old_snap : old_snaps) {
    if (!osdmap->in_removed_snaps_queue(pg->get_info().pgid.pgid.pool(),
				        old_snap)
        && old_snap != snap_to_trim) {
      new_snaps.insert(old_snap);
    }
  }

  return seastar::do_with(ceph::os::Transaction{}, [=, this](auto &txn) {
    std::vector<pg_log_entry_t> log_entries{};

    int64_t num_objects_before_trim = delta_stats.num_objects;
    osd_op_p.at_version = pg->next_version();
    auto ret = remove_or_update_iertr::now();
    if (new_snaps.empty()) {
      // remove clone from snapset
      logger().info("{}: {} snaps {} -> {} ... deleting",
                    *this, coid, old_snaps, new_snaps);
      ret = remove_clone(obc, head_obc, txn, log_entries);
    } else {
      // save adjusted snaps for this object
      logger().info("{}: {} snaps {} -> {}",
                    *this, coid, old_snaps, new_snaps);
      ret = adjust_snaps(obc, head_obc, new_snaps, txn, log_entries);
    }
    return std::move(ret).safe_then_interruptible(
      [&txn, obc, num_objects_before_trim, log_entries=std::move(log_entries), head_obc=std::move(head_obc), this]() mutable {
      // The head update gets its own version in the pg log.
      osd_op_p.at_version = pg->next_version();

      // save head snapset
      logger().debug("{}: {} new snapset {} on {}",
                     *this, coid, head_obc->ssc->snapset, head_obc->obs.oi);
      if (head_obc->ssc->snapset.clones.empty() && head_obc->obs.oi.is_whiteout()) {
        remove_head_whiteout(obc, head_obc, txn, log_entries);
      } else {
        update_head(obc, head_obc, txn, log_entries);
      }
      // Stats reporting - Set number of objects trimmed
      if (num_objects_before_trim > delta_stats.num_objects) {
        //int64_t num_objects_trimmed =
        //  num_objects_before_trim - delta_stats.num_objects;
        //add_objects_trimmed_count(num_objects_trimmed);
      }
    }).safe_then_interruptible(
      [&txn, log_entries=std::move(log_entries)] () mutable {
      return remove_or_update_iertr::make_ready_future<remove_or_update_ret_t>(
        std::make_pair(std::move(txn), std::move(log_entries)));
    });
  });
}
+
// Trim one clone: walk the client pipeline stages, lock both the
// clone's and head's object contexts, build the removal/update
// transaction, submit it, and wait for the repop to complete.
SnapTrimObjSubEvent::remove_or_update_iertr::future<>
SnapTrimObjSubEvent::with_pg(
  ShardServices &shard_services, Ref<PG> _pg)
{
  return enter_stage<interruptor>(
    client_pp().wait_for_active
  ).then_interruptible([this] {
    return with_blocking_event<PGActivationBlocker::BlockingEvent,
                               interruptor>([this] (auto&& trigger) {
      return pg->wait_for_active_blocker.wait(std::move(trigger));
    });
  }).then_interruptible([this] {
    return enter_stage<interruptor>(
      client_pp().recover_missing);
  }).then_interruptible([] {
    //return do_recover_missing(pg, get_target_oid());
    return seastar::now();
  }).then_interruptible([this] {
    return enter_stage<interruptor>(
      client_pp().get_obc);
  }).then_interruptible([this] {
    logger().debug("{}: getting obc for {}", *this, coid);
    // end of commonality
    // with_clone_obc_direct lock both clone's and head's obcs
    return pg->obc_loader.with_clone_obc_direct<RWState::RWWRITE>(
      coid,
      [this](auto head_obc, auto clone_obc) {
      logger().debug("{}: got clone_obc={}", *this, clone_obc->get_oid());
      return enter_stage<interruptor>(
        client_pp().process
      ).then_interruptible(
        [this,clone_obc=std::move(clone_obc), head_obc=std::move(head_obc)]() mutable {
        logger().debug("{}: processing clone_obc={}", *this, clone_obc->get_oid());
        return remove_or_update(
          clone_obc, head_obc
        ).safe_then_unpack_interruptible([clone_obc, this]
          (auto&& txn, auto&& log_entries) mutable {
          // Replicate the transaction; `submitted` resolves when the
          // repop is sent, `all_completed` when it is acked everywhere.
          auto [submitted, all_completed] = pg->submit_transaction(
            std::move(clone_obc),
            std::move(txn),
            std::move(osd_op_p),
            std::move(log_entries));
          return submitted.then_interruptible(
            [all_completed=std::move(all_completed), this] () mutable {
            return enter_stage<interruptor>(
              wait_repop
            ).then_interruptible([all_completed=std::move(all_completed)] () mutable {
              return std::move(all_completed);
            });
          });
        });
      });
    }).handle_error_interruptible(
      remove_or_update_iertr::pass_further{},
      crimson::ct_error::assert_all{"unexpected error in SnapTrimObjSubEvent"}
    );
  });
}
+
+void SnapTrimObjSubEvent::print(std::ostream &lhs) const
+{
+ lhs << "SnapTrimObjSubEvent("
+ << "coid=" << coid
+ << " snapid=" << snap_to_trim
+ << ")";
+}
+
+void SnapTrimObjSubEvent::dump_detail(Formatter *f) const
+{
+ f->open_object_section("SnapTrimObjSubEvent");
+ f->dump_stream("coid") << coid;
+ f->close_section();
+}
+
+} // namespace crimson::osd
diff --git a/src/crimson/osd/osd_operations/snaptrim_event.h b/src/crimson/osd/osd_operations/snaptrim_event.h
new file mode 100644
index 000000000..a3a970a04
--- /dev/null
+++ b/src/crimson/osd/osd_operations/snaptrim_event.h
@@ -0,0 +1,210 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <iostream>
+#include <seastar/core/future.hh>
+
+#include "crimson/osd/osdmap_gate.h"
+#include "crimson/osd/osd_operation.h"
+#include "crimson/osd/osd_operations/common/pg_pipeline.h"
+#include "crimson/osd/pg.h"
+#include "crimson/osd/pg_activation_blocker.h"
+#include "osd/osd_types.h"
+#include "osd/PGPeeringEvent.h"
+#include "osd/PeeringState.h"
+
+namespace ceph {
+ class Formatter;
+}
+
+class SnapMapper;
+
+namespace crimson::osd {
+
+class OSD;
+class ShardServices;
+
+// trim up to `max` objects for snapshot `snapid`
+// One trimming pass for snapshot `snapid` on a single PG.  Spawns
+// per-object subrequests (SnapTrimObjSubEvent) and tracks their
+// completion via SubOpBlocker; traversal of the stages below is
+// recorded in tracking_events for the op-tracking machinery.
+class SnapTrimEvent final : public PhasedOperationT<SnapTrimEvent> {
+public:
+  // error set shared with the per-object subrequest
+  using remove_or_update_ertr =
+    crimson::errorator<crimson::ct_error::enoent>;
+  using remove_or_update_iertr =
+    crimson::interruptible::interruptible_errorator<
+      IOInterruptCondition, remove_or_update_ertr>;
+  // extends the above with `eagain`, so a whole pass can signal "retry"
+  using snap_trim_ertr = remove_or_update_ertr::extend<
+    crimson::ct_error::eagain>;
+  using snap_trim_iertr = remove_or_update_iertr::extend<
+    crimson::ct_error::eagain>;
+
+  static constexpr OperationTypeCode type = OperationTypeCode::snaptrim_event;
+
+  SnapTrimEvent(Ref<PG> pg,
+                SnapMapper& snap_mapper,
+                const snapid_t snapid,
+                const bool needs_pause)
+    : pg(std::move(pg)),
+      snap_mapper(snap_mapper),
+      snapid(snapid),
+      needs_pause(needs_pause) {}
+
+  void print(std::ostream &) const final;
+  void dump_detail(ceph::Formatter* f) const final;
+  // stop_iteration tells the caller whether another pass is needed
+  snap_trim_ertr::future<seastar::stop_iteration> start();
+  snap_trim_ertr::future<seastar::stop_iteration> with_pg(
+    ShardServices &shard_services, Ref<PG> pg);
+
+private:
+  CommonPGPipeline& client_pp();
+
+  // based on 998cb8c141bb89aafae298a9d5e130fbd78fe5f2
+  // Collects the (id, completion-future) pair of every spawned subrequest
+  // so the parent op can block until all of them finish.
+  struct SubOpBlocker : crimson::BlockerT<SubOpBlocker> {
+    // NOTE(review): name differs from the struct name (SubOpBlocker) --
+    // confirm "CompoundOpBlocker" is intentional before relying on it.
+    static constexpr const char* type_name = "CompoundOpBlocker";
+
+    using id_done_t = std::pair<crimson::Operation::id_t,
+                                remove_or_update_iertr::future<>>;
+
+    void dump_detail(Formatter *f) const final;
+
+    // record one started subrequest (forwards args to id_done_t)
+    template <class... Args>
+    void emplace_back(Args&&... args);
+
+    // resolves once every recorded subrequest has completed
+    remove_or_update_iertr::future<> wait_completion();
+  private:
+    std::vector<id_done_t> subops;
+  } subop_blocker;
+
+  // we don't need to synchronize with other instances of SnapTrimEvent;
+  // it's here for the sake of op tracking.
+  struct WaitSubop : OrderedConcurrentPhaseT<WaitSubop> {
+    static constexpr auto type_name = "SnapTrimEvent::wait_subop";
+  } wait_subop;
+
+  // an instantiator can instruct us to go over this stage and then
+  // wait for the future to implement throttling. It is implemented
+  // that way for the sake of tracking ops.
+  struct WaitTrimTimer : OrderedExclusivePhaseT<WaitTrimTimer> {
+    static constexpr auto type_name = "SnapTrimEvent::wait_trim_timer";
+  } wait_trim_timer;
+
+  PipelineHandle handle;
+  Ref<PG> pg;
+  SnapMapper& snap_mapper;
+  const snapid_t snapid;      // the snapshot being trimmed
+  const bool needs_pause;     // presumably gates the wait_trim_timer stage -- TODO confirm
+
+public:
+  PipelineHandle& get_handle() { return handle; }
+
+  // every stage/blocker this operation may block at, for op tracking
+  std::tuple<
+    StartEvent,
+    CommonPGPipeline::WaitForActive::BlockingEvent,
+    PGActivationBlocker::BlockingEvent,
+    CommonPGPipeline::RecoverMissing::BlockingEvent,
+    CommonPGPipeline::GetOBC::BlockingEvent,
+    CommonPGPipeline::Process::BlockingEvent,
+    WaitSubop::BlockingEvent,
+    PG::SnapTrimMutex::WaitPG::BlockingEvent,
+    WaitTrimTimer::BlockingEvent,
+    CompletionEvent
+  > tracking_events;
+
+  friend class PG::SnapTrimMutex;
+};
+
+// Remove a single object. A SnapTrimEvent can create multiple subrequests.
+// The division of labour is needed because of the restriction that an Op
+// cannot revisit a pipeline stage it has already been through.
+// Per-object subrequest spawned by SnapTrimEvent: trims snapshot
+// `snap_to_trim` from the single clone object `coid`, building a
+// transaction + log entries and submitting them through the PG.
+class SnapTrimObjSubEvent : public PhasedOperationT<SnapTrimObjSubEvent> {
+public:
+  // enoent-capable error set; mirrors SnapTrimEvent's aliases
+  using remove_or_update_ertr =
+    crimson::errorator<crimson::ct_error::enoent>;
+  using remove_or_update_iertr =
+    crimson::interruptible::interruptible_errorator<
+      IOInterruptCondition, remove_or_update_ertr>;
+
+  static constexpr OperationTypeCode type =
+    OperationTypeCode::snaptrimobj_subevent;
+
+  SnapTrimObjSubEvent(
+    Ref<PG> pg,
+    const hobject_t& coid,
+    snapid_t snap_to_trim)
+    : pg(std::move(pg)),
+      coid(coid),
+      snap_to_trim(snap_to_trim) {
+  }
+
+  void print(std::ostream &) const final;
+  void dump_detail(ceph::Formatter* f) const final;
+  remove_or_update_iertr::future<> start();
+  remove_or_update_iertr::future<> with_pg(
+    ShardServices &shard_services, Ref<PG> pg);
+
+  CommonPGPipeline& client_pp();
+
+private:
+  // stat changes accumulated while trimming this object
+  object_stat_sum_t delta_stats;
+
+  // The helpers below take `txn` and `log_entries` by reference and
+  // build them up for submission by remove_or_update().
+  remove_or_update_iertr::future<> remove_clone(
+    ObjectContextRef obc,
+    ObjectContextRef head_obc,
+    ceph::os::Transaction& txn,
+    std::vector<pg_log_entry_t>& log_entries);
+  void remove_head_whiteout(
+    ObjectContextRef obc,
+    ObjectContextRef head_obc,
+    ceph::os::Transaction& txn,
+    std::vector<pg_log_entry_t>& log_entries);
+  interruptible_future<> adjust_snaps(
+    ObjectContextRef obc,
+    ObjectContextRef head_obc,
+    const std::set<snapid_t>& new_snaps,
+    ceph::os::Transaction& txn,
+    std::vector<pg_log_entry_t>& log_entries);
+  void update_head(
+    ObjectContextRef obc,
+    ObjectContextRef head_obc,
+    ceph::os::Transaction& txn,
+    std::vector<pg_log_entry_t>& log_entries);
+
+  // Result pair produced for the caller (with_pg in snaptrim_event.cc),
+  // which hands it to PG::submit_transaction().
+  using remove_or_update_ret_t =
+    std::pair<ceph::os::Transaction, std::vector<pg_log_entry_t>>;
+  remove_or_update_iertr::future<remove_or_update_ret_t>
+  remove_or_update(ObjectContextRef obc, ObjectContextRef head_obc);
+
+  // we don't need to synchronize with other instances started by
+  // SnapTrimEvent; it's here for the sake of op tracking.
+  struct WaitRepop : OrderedConcurrentPhaseT<WaitRepop> {
+    static constexpr auto type_name = "SnapTrimObjSubEvent::wait_repop";
+  } wait_repop;
+
+  Ref<PG> pg;
+  PipelineHandle handle;
+  osd_op_params_t osd_op_p;
+  const hobject_t coid;          // the clone object being trimmed
+  const snapid_t snap_to_trim;   // the snapshot being removed from coid
+
+public:
+  PipelineHandle& get_handle() { return handle; }
+
+  // every stage/blocker this operation may block at, for op tracking
+  std::tuple<
+    StartEvent,
+    CommonPGPipeline::WaitForActive::BlockingEvent,
+    PGActivationBlocker::BlockingEvent,
+    CommonPGPipeline::RecoverMissing::BlockingEvent,
+    CommonPGPipeline::GetOBC::BlockingEvent,
+    CommonPGPipeline::Process::BlockingEvent,
+    WaitRepop::BlockingEvent,
+    CompletionEvent
+  > tracking_events;
+};
+
+} // namespace crimson::osd
+
+// fmt v9 dropped implicit formatting via operator<<; opt these types back
+// in by inheriting fmt::ostream_formatter so logging keeps compiling.
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::osd::SnapTrimEvent> : fmt::ostream_formatter {};
+template <> struct fmt::formatter<crimson::osd::SnapTrimObjSubEvent> : fmt::ostream_formatter {};
+#endif