From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- .../osd/osd_operations/compound_peering_request.cc | 170 +++++++++++++++++++++ 1 file changed, 170 insertions(+) create mode 100644 src/crimson/osd/osd_operations/compound_peering_request.cc (limited to 'src/crimson/osd/osd_operations/compound_peering_request.cc') diff --git a/src/crimson/osd/osd_operations/compound_peering_request.cc b/src/crimson/osd/osd_operations/compound_peering_request.cc new file mode 100644 index 000000000..e55760096 --- /dev/null +++ b/src/crimson/osd/osd_operations/compound_peering_request.cc @@ -0,0 +1,170 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include + +#include "osd/PeeringState.h" + +#include "messages/MOSDPGQuery.h" +#include "messages/MOSDPGCreate2.h" + +#include "common/Formatter.h" + +#include "crimson/common/exception.h" +#include "crimson/osd/pg.h" +#include "crimson/osd/osd.h" +#include "crimson/osd/osd_operations/compound_peering_request.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_osd); + } +} + +namespace { +using namespace crimson::osd; + +struct compound_state { + seastar::promise promise; + // assuming crimson-osd won't need to be compatible with pre-octopus + // releases + BufferedRecoveryMessages ctx{ceph_release_t::octopus}; + compound_state() = default; + ~compound_state() { + promise.set_value(std::move(ctx)); + } +}; +using compound_state_ref = seastar::lw_shared_ptr; + +class PeeringSubEvent : public RemotePeeringEvent { + compound_state_ref state; +public: + template + PeeringSubEvent(compound_state_ref state, Args &&... args) : + RemotePeeringEvent(std::forward(args)...), state(state) {} + + seastar::future<> complete_rctx(Ref pg) final { + logger().debug("{}: submitting ctx transaction", *this); + state->ctx.accept_buffered_messages(ctx); + state = {}; + if (!pg) { + ceph_assert(ctx.transaction.empty()); + return seastar::now(); + } else { + return osd.get_shard_services().dispatch_context_transaction( + pg->get_collection_ref(), ctx); + } + } +}; + +std::vector handle_pg_create( + OSD &osd, + crimson::net::ConnectionRef conn, + compound_state_ref state, + Ref m) +{ + std::vector ret; + for (auto& [pgid, when] : m->pgs) { + const auto &[created, created_stamp] = when; + auto q = m->pg_extra.find(pgid); + ceph_assert(q != m->pg_extra.end()); + auto& [history, pi] = q->second; + logger().debug( + "{}: {} e{} @{} " + "history {} pi {}", + __func__, pgid, created, created_stamp, + history, pi); + if (!pi.empty() && + m->epoch < pi.get_bounds().second) { + logger().error( + "got pg_create on {} epoch {} " + "unmatched past_intervals {} (history {})", + pgid, m->epoch, + pi, history); + } else { + auto op = osd.get_shard_services().start_operation( + state, + osd, + conn, + osd.get_shard_services(), + pg_shard_t(), + pgid, + m->epoch, + m->epoch, + NullEvt(), + true, + new PGCreateInfo(pgid, m->epoch, history, pi, true)).first; + ret.push_back(op); + } + } + return ret; +} + +struct SubOpBlocker : BlockerT { + static constexpr const char * type_name = "CompoundOpBlocker"; + + std::vector subops; + SubOpBlocker(std::vector &&subops) : subops(subops) {} + + virtual void dump_detail(Formatter *f) const { + f->open_array_section("dependent_operations"); + { + for (auto &i : subops) { + i->dump_brief(f); + } + } + f->close_section(); + } +}; + +} // namespace + +namespace crimson::osd { + +CompoundPeeringRequest::CompoundPeeringRequest( + OSD &osd, crimson::net::ConnectionRef conn, Ref m) + : osd(osd), + conn(conn), + m(m) +{} + +void CompoundPeeringRequest::print(std::ostream &lhs) const +{ + lhs << *m; +} + +void CompoundPeeringRequest::dump_detail(Formatter *f) const +{ + f->dump_stream("message") << *m; +} + +seastar::future<> CompoundPeeringRequest::start() +{ + logger().info("{}: starting", *this); + auto state = seastar::make_lw_shared(); + auto blocker = std::make_unique( + [&] { + assert((m->get_type() == MSG_OSD_PG_CREATE2)); + return handle_pg_create( + osd, + conn, + state, + boost::static_pointer_cast(m)); + }()); + + IRef ref = this; + logger().info("{}: about to fork future", *this); + return crimson::common::handle_system_shutdown( + [this, ref, blocker=std::move(blocker), state]() mutable { + return with_blocking_future( + blocker->make_blocking_future(state->promise.get_future()) + ).then([this, blocker=std::move(blocker)](auto &&ctx) { + logger().info("{}: sub events complete", *this); + return osd.get_shard_services().dispatch_context_messages(std::move(ctx)); + }).then([this, ref=std::move(ref)] { + logger().info("{}: complete", *this); + }); + }); +} + +} // namespace crimson::osd -- cgit v1.2.3