diff options
Diffstat (limited to 'src/crimson/osd/osd_operations')
-rw-r--r-- | src/crimson/osd/osd_operations/background_recovery.cc | 140 | ||||
-rw-r--r-- | src/crimson/osd/osd_operations/background_recovery.h | 126 | ||||
-rw-r--r-- | src/crimson/osd/osd_operations/client_request.cc | 201 | ||||
-rw-r--r-- | src/crimson/osd/osd_operations/client_request.h | 76 | ||||
-rw-r--r-- | src/crimson/osd/osd_operations/compound_peering_request.cc | 170 | ||||
-rw-r--r-- | src/crimson/osd/osd_operations/compound_peering_request.h | 40 | ||||
-rw-r--r-- | src/crimson/osd/osd_operations/osdop_params.h | 27 | ||||
-rw-r--r-- | src/crimson/osd/osd_operations/peering_event.cc | 173 | ||||
-rw-r--r-- | src/crimson/osd/osd_operations/peering_event.h | 142 | ||||
-rw-r--r-- | src/crimson/osd/osd_operations/pg_advance_map.cc | 97 | ||||
-rw-r--r-- | src/crimson/osd/osd_operations/pg_advance_map.h | 50 | ||||
-rw-r--r-- | src/crimson/osd/osd_operations/recovery_subrequest.cc | 29 | ||||
-rw-r--r-- | src/crimson/osd/osd_operations/recovery_subrequest.h | 45 | ||||
-rw-r--r-- | src/crimson/osd/osd_operations/replicated_request.cc | 74 | ||||
-rw-r--r-- | src/crimson/osd/osd_operations/replicated_request.h | 58 |
15 files changed, 1448 insertions, 0 deletions
diff --git a/src/crimson/osd/osd_operations/background_recovery.cc b/src/crimson/osd/osd_operations/background_recovery.cc new file mode 100644 index 000000000..126e0e902 --- /dev/null +++ b/src/crimson/osd/osd_operations/background_recovery.cc @@ -0,0 +1,140 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <seastar/core/future.hh> + +#include "messages/MOSDOp.h" + +#include "crimson/osd/pg.h" +#include "crimson/osd/shard_services.h" +#include "common/Formatter.h" +#include "crimson/osd/osd_operations/background_recovery.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_osd); + } +} + +namespace crimson::osd { + +BackgroundRecovery::BackgroundRecovery( + Ref<PG> pg, + ShardServices &ss, + epoch_t epoch_started, + crimson::osd::scheduler::scheduler_class_t scheduler_class) + : pg(pg), + epoch_started(epoch_started), + ss(ss), + scheduler_class(scheduler_class) +{} + +void BackgroundRecovery::print(std::ostream &lhs) const +{ + lhs << "BackgroundRecovery(" << pg->get_pgid() << ")"; +} + +void BackgroundRecovery::dump_detail(Formatter *f) const +{ + f->dump_stream("pgid") << pg->get_pgid(); + f->open_object_section("recovery_detail"); + { + // TODO pg->dump_recovery_state(f); + } + f->close_section(); +} + +seastar::future<> BackgroundRecovery::start() +{ + logger().debug("{}: start", *this); + + IRef ref = this; + return ss.throttler.with_throttle_while( + this, get_scheduler_params(), [this] { + return do_recovery(); + }).handle_exception_type([ref, this](const std::system_error& err) { + if (err.code() == std::make_error_code(std::errc::interrupted)) { + logger().debug("{} recovery interruped: {}", *pg, err.what()); + return seastar::now(); + } + return seastar::make_exception_future<>(err); + }); +} + +seastar::future<bool> UrgentRecovery::do_recovery() +{ + if (!pg->has_reset_since(epoch_started)) { + return with_blocking_future( + 
pg->get_recovery_handler()->recover_missing(soid, need) + ).then([] { + return seastar::make_ready_future<bool>(false); + }); + } + return seastar::make_ready_future<bool>(false); +} + +void UrgentRecovery::print(std::ostream &lhs) const +{ + lhs << "UrgentRecovery(" << pg->get_pgid() << ", " + << soid << ", v" << need << ")"; +} + +void UrgentRecovery::dump_detail(Formatter *f) const +{ + f->dump_stream("pgid") << pg->get_pgid(); + f->open_object_section("recovery_detail"); + { + f->dump_stream("oid") << soid; + f->dump_stream("version") << need; + } + f->close_section(); +} + +PglogBasedRecovery::PglogBasedRecovery( + Ref<PG> pg, + ShardServices &ss, + const epoch_t epoch_started) + : BackgroundRecovery( + std::move(pg), + ss, + epoch_started, + crimson::osd::scheduler::scheduler_class_t::background_recovery) +{} + +seastar::future<bool> PglogBasedRecovery::do_recovery() +{ + if (pg->has_reset_since(epoch_started)) + return seastar::make_ready_future<bool>(false); + return with_blocking_future( + pg->get_recovery_handler()->start_recovery_ops( + crimson::common::local_conf()->osd_recovery_max_single_start)); +} + +BackfillRecovery::BackfillRecoveryPipeline &BackfillRecovery::bp(PG &pg) +{ + return pg.backfill_pipeline; +} + +seastar::future<bool> BackfillRecovery::do_recovery() +{ + logger().debug("{}", __func__); + + if (pg->has_reset_since(epoch_started)) { + logger().debug("{}: pg got reset since epoch_started={}", + __func__, epoch_started); + return seastar::make_ready_future<bool>(false); + } + // TODO: limits + return with_blocking_future( + // process_event() of our boost::statechart machine is non-reentrant. + // with the backfill_pipeline we protect it from a second entry from + // the implementation of BackfillListener. + // additionally, this stage serves to synchronize with PeeringEvent. 
+ handle.enter(bp(*pg).process) + ).then([this] { + pg->get_recovery_handler()->dispatch_backfill_event(std::move(evt)); + return seastar::make_ready_future<bool>(false); + }); +} + +} // namespace crimson::osd diff --git a/src/crimson/osd/osd_operations/background_recovery.h b/src/crimson/osd/osd_operations/background_recovery.h new file mode 100644 index 000000000..37e46c588 --- /dev/null +++ b/src/crimson/osd/osd_operations/background_recovery.h @@ -0,0 +1,126 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <boost/statechart/event_base.hpp> + +#include "crimson/net/Connection.h" +#include "crimson/osd/osd_operation.h" +#include "crimson/common/type_helpers.h" + +#include "messages/MOSDOp.h" + +namespace crimson::osd { +class PG; +class ShardServices; + +class BackgroundRecovery : public OperationT<BackgroundRecovery> { +public: + static constexpr OperationTypeCode type = OperationTypeCode::background_recovery; + + BackgroundRecovery( + Ref<PG> pg, + ShardServices &ss, + epoch_t epoch_started, + crimson::osd::scheduler::scheduler_class_t scheduler_class); + + virtual void print(std::ostream &) const; + seastar::future<> start(); + +protected: + Ref<PG> pg; + const epoch_t epoch_started; + +private: + virtual void dump_detail(Formatter *f) const; + crimson::osd::scheduler::params_t get_scheduler_params() const { + return { + 1, // cost + 0, // owner + scheduler_class + }; + } + virtual seastar::future<bool> do_recovery() = 0; + ShardServices &ss; + const crimson::osd::scheduler::scheduler_class_t scheduler_class; +}; + +/// represent a recovery initiated for serving a client request +/// +/// unlike @c PglogBasedRecovery and @c BackfillRecovery, +/// @c UrgentRecovery is not throttled by the scheduler. and it +/// utilizes @c RecoveryBackend directly to recover the unreadable +/// object. 
+class UrgentRecovery final : public BackgroundRecovery { +public: + UrgentRecovery( + const hobject_t& soid, + const eversion_t& need, + Ref<PG> pg, + ShardServices& ss, + epoch_t epoch_started) + : BackgroundRecovery{pg, ss, epoch_started, + crimson::osd::scheduler::scheduler_class_t::immediate}, + soid{soid}, need(need) {} + void print(std::ostream&) const final; + +private: + void dump_detail(Formatter* f) const final; + seastar::future<bool> do_recovery() override; + const hobject_t soid; + const eversion_t need; +}; + +class PglogBasedRecovery final : public BackgroundRecovery { +public: + PglogBasedRecovery( + Ref<PG> pg, + ShardServices &ss, + epoch_t epoch_started); + +private: + seastar::future<bool> do_recovery() override; +}; + +class BackfillRecovery final : public BackgroundRecovery { +public: + class BackfillRecoveryPipeline { + OrderedPipelinePhase process = { + "BackfillRecovery::PGPipeline::process" + }; + friend class BackfillRecovery; + friend class PeeringEvent; + }; + + template <class EventT> + BackfillRecovery( + Ref<PG> pg, + ShardServices &ss, + epoch_t epoch_started, + const EventT& evt); + + static BackfillRecoveryPipeline &bp(PG &pg); + +private: + boost::intrusive_ptr<const boost::statechart::event_base> evt; + OrderedPipelinePhase::Handle handle; + seastar::future<bool> do_recovery() override; +}; + +template <class EventT> +BackfillRecovery::BackfillRecovery( + Ref<PG> pg, + ShardServices &ss, + const epoch_t epoch_started, + const EventT& evt) + : BackgroundRecovery( + std::move(pg), + ss, + epoch_started, + crimson::osd::scheduler::scheduler_class_t::background_best_effort), + evt(evt.intrusive_from_this()) +{} + + +} diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc new file mode 100644 index 000000000..87b8fc788 --- /dev/null +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -0,0 +1,201 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab + +#include <seastar/core/future.hh> + +#include "messages/MOSDOp.h" +#include "messages/MOSDOpReply.h" + +#include "crimson/common/exception.h" +#include "crimson/osd/pg.h" +#include "crimson/osd/osd.h" +#include "common/Formatter.h" +#include "crimson/osd/osd_operations/client_request.h" +#include "crimson/osd/osd_connection_priv.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_osd); + } +} + +namespace crimson::osd { + +ClientRequest::ClientRequest( + OSD &osd, crimson::net::ConnectionRef conn, Ref<MOSDOp> &&m) + : osd(osd), conn(conn), m(m) +{} + +void ClientRequest::print(std::ostream &lhs) const +{ + lhs << *m; +} + +void ClientRequest::dump_detail(Formatter *f) const +{ +} + +ClientRequest::ConnectionPipeline &ClientRequest::cp() +{ + return get_osd_priv(conn.get()).client_request_conn_pipeline; +} + +ClientRequest::PGPipeline &ClientRequest::pp(PG &pg) +{ + return pg.client_request_pg_pipeline; +} + +bool ClientRequest::is_pg_op() const +{ + return std::any_of( + begin(m->ops), end(m->ops), + [](auto& op) { return ceph_osd_op_type_pg(op.op.op); }); +} + +seastar::future<> ClientRequest::start() +{ + logger().debug("{}: start", *this); + + IRef opref = this; + return crimson::common::handle_system_shutdown( + [this, opref=std::move(opref)]() mutable { + return seastar::repeat([this, opref]() mutable { + return with_blocking_future(handle.enter(cp().await_map)) + .then([this]() { + return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_min_epoch())); + }).then([this](epoch_t epoch) { + return with_blocking_future(handle.enter(cp().get_pg)); + }).then([this] { + return with_blocking_future(osd.wait_for_pg(m->get_spg())); + }).then([this, opref](Ref<PG> pgref) { + PG &pg = *pgref; + if (pg.can_discard_op(*m)) { + return osd.send_incremental_map(conn, m->get_map_epoch()); + } + return with_blocking_future( + handle.enter(pp(pg).await_map) + ).then([this, &pg]() mutable { + return 
with_blocking_future( + pg.osdmap_gate.wait_for_map(m->get_min_epoch())); + }).then([this, &pg](auto map) mutable { + return with_blocking_future( + handle.enter(pp(pg).wait_for_active)); + }).then([this, &pg]() mutable { + return with_blocking_future(pg.wait_for_active_blocker.wait()); + }).then([this, pgref=std::move(pgref)]() mutable { + if (m->finish_decode()) { + m->clear_payload(); + } + if (is_pg_op()) { + return process_pg_op(pgref); + } else { + return process_op(pgref); + } + }); + }).then([] { + return seastar::stop_iteration::yes; + }).handle_exception_type([](crimson::common::actingset_changed& e) { + if (e.is_primary()) { + logger().debug("operation restart, acting set changed"); + return seastar::stop_iteration::no; + } else { + logger().debug("operation abort, up primary changed"); + return seastar::stop_iteration::yes; + } + }); + }); + }); +} + +seastar::future<> ClientRequest::process_pg_op( + Ref<PG> &pg) +{ + return pg->do_pg_ops(m) + .then([this, pg=std::move(pg)](Ref<MOSDOpReply> reply) { + return conn->send(reply); + }); +} + +seastar::future<> ClientRequest::process_op( + Ref<PG> &pgref) +{ + PG& pg = *pgref; + return with_blocking_future( + handle.enter(pp(pg).recover_missing) + ).then([this, &pg, pgref] { + eversion_t ver; + const hobject_t& soid = m->get_hobj(); + logger().debug("{} check for recovery, {}", *this, soid); + if (pg.is_unreadable_object(soid, &ver) || + pg.is_degraded_or_backfilling_object(soid)) { + logger().debug("{} need to wait for recovery, {}", *this, soid); + if (pg.get_recovery_backend()->is_recovering(soid)) { + return pg.get_recovery_backend()->get_recovering(soid).wait_for_recovered(); + } else { + auto [op, fut] = osd.get_shard_services().start_operation<UrgentRecovery>( + soid, ver, pgref, osd.get_shard_services(), pg.get_osdmap_epoch()); + return std::move(fut); + } + } + return seastar::now(); + }).then([this, &pg] { + return with_blocking_future(handle.enter(pp(pg).get_obc)); + }).then([this, &pg]() -> 
PG::load_obc_ertr::future<> { + op_info.set_from_op(&*m, *pg.get_osdmap()); + return pg.with_locked_obc(m, op_info, this, [this, &pg](auto obc) { + return with_blocking_future( + handle.enter(pp(pg).process) + ).then([this, &pg, obc] { + if (!pg.is_primary()) { + // primary can handle both normal ops and balanced reads + if (is_misdirected(pg)) { + logger().trace("process_op: dropping misdirected op"); + return seastar::make_ready_future<Ref<MOSDOpReply>>(); + } else if (const hobject_t& hoid = m->get_hobj(); + !pg.get_peering_state().can_serve_replica_read(hoid)) { + auto reply = make_message<MOSDOpReply>( + m.get(), -EAGAIN, pg.get_osdmap_epoch(), + m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK), + !m->has_flag(CEPH_OSD_FLAG_RETURNVEC)); + return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply)); + } + } + return pg.do_osd_ops(m, obc, op_info); + }).then([this](Ref<MOSDOpReply> reply) { + if (reply) { + return conn->send(std::move(reply)); + } else { + return seastar::now(); + } + }); + }); + }).safe_then([pgref=std::move(pgref)] { + return seastar::now(); + }, PG::load_obc_ertr::all_same_way([](auto &code) { + logger().error("ClientRequest saw error code {}", code); + return seastar::now(); + })); +} + +bool ClientRequest::is_misdirected(const PG& pg) const +{ + // otherwise take a closer look + if (const int flags = m->get_flags(); + flags & CEPH_OSD_FLAG_BALANCE_READS || + flags & CEPH_OSD_FLAG_LOCALIZE_READS) { + if (!op_info.may_read()) { + // no read found, so it can't be balanced read + return true; + } + if (op_info.may_write() || op_info.may_cache()) { + // write op, but i am not primary + return true; + } + // balanced reads; any replica will do + return pg.is_nonprimary(); + } + // neither balanced nor localize reads + return true; +} + +} diff --git a/src/crimson/osd/osd_operations/client_request.h b/src/crimson/osd/osd_operations/client_request.h new file mode 100644 index 000000000..ea3124a93 --- /dev/null +++ 
b/src/crimson/osd/osd_operations/client_request.h @@ -0,0 +1,76 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "osd/osd_op_util.h" +#include "crimson/net/Connection.h" +#include "crimson/osd/osd_operation.h" +#include "crimson/common/type_helpers.h" +#include "messages/MOSDOp.h" + +namespace crimson::osd { +class PG; +class OSD; + +class ClientRequest final : public OperationT<ClientRequest> { + OSD &osd; + crimson::net::ConnectionRef conn; + Ref<MOSDOp> m; + OpInfo op_info; + OrderedPipelinePhase::Handle handle; + +public: + class ConnectionPipeline { + OrderedPipelinePhase await_map = { + "ClientRequest::ConnectionPipeline::await_map" + }; + OrderedPipelinePhase get_pg = { + "ClientRequest::ConnectionPipeline::get_pg" + }; + friend class ClientRequest; + }; + class PGPipeline { + OrderedPipelinePhase await_map = { + "ClientRequest::PGPipeline::await_map" + }; + OrderedPipelinePhase wait_for_active = { + "ClientRequest::PGPipeline::wait_for_active" + }; + OrderedPipelinePhase recover_missing = { + "ClientRequest::PGPipeline::recover_missing" + }; + OrderedPipelinePhase get_obc = { + "ClientRequest::PGPipeline::get_obc" + }; + OrderedPipelinePhase process = { + "ClientRequest::PGPipeline::process" + }; + friend class ClientRequest; + }; + + static constexpr OperationTypeCode type = OperationTypeCode::client_request; + + ClientRequest(OSD &osd, crimson::net::ConnectionRef, Ref<MOSDOp> &&m); + + void print(std::ostream &) const final; + void dump_detail(Formatter *f) const final; + +public: + seastar::future<> start(); + +private: + seastar::future<> process_pg_op( + Ref<PG> &pg); + seastar::future<> process_op( + Ref<PG> &pg); + bool is_pg_op() const; + + ConnectionPipeline &cp(); + PGPipeline &pp(PG &pg); + +private: + bool is_misdirected(const PG& pg) const; +}; + +} diff --git a/src/crimson/osd/osd_operations/compound_peering_request.cc 
b/src/crimson/osd/osd_operations/compound_peering_request.cc new file mode 100644 index 000000000..e55760096 --- /dev/null +++ b/src/crimson/osd/osd_operations/compound_peering_request.cc @@ -0,0 +1,170 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <seastar/core/future.hh> + +#include "osd/PeeringState.h" + +#include "messages/MOSDPGQuery.h" +#include "messages/MOSDPGCreate2.h" + +#include "common/Formatter.h" + +#include "crimson/common/exception.h" +#include "crimson/osd/pg.h" +#include "crimson/osd/osd.h" +#include "crimson/osd/osd_operations/compound_peering_request.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_osd); + } +} + +namespace { +using namespace crimson::osd; + +struct compound_state { + seastar::promise<BufferedRecoveryMessages> promise; + // assuming crimson-osd won't need to be compatible with pre-octopus + // releases + BufferedRecoveryMessages ctx{ceph_release_t::octopus}; + compound_state() = default; + ~compound_state() { + promise.set_value(std::move(ctx)); + } +}; +using compound_state_ref = seastar::lw_shared_ptr<compound_state>; + +class PeeringSubEvent : public RemotePeeringEvent { + compound_state_ref state; +public: + template <typename... Args> + PeeringSubEvent(compound_state_ref state, Args &&... 
args) : + RemotePeeringEvent(std::forward<Args>(args)...), state(state) {} + + seastar::future<> complete_rctx(Ref<crimson::osd::PG> pg) final { + logger().debug("{}: submitting ctx transaction", *this); + state->ctx.accept_buffered_messages(ctx); + state = {}; + if (!pg) { + ceph_assert(ctx.transaction.empty()); + return seastar::now(); + } else { + return osd.get_shard_services().dispatch_context_transaction( + pg->get_collection_ref(), ctx); + } + } +}; + +std::vector<OperationRef> handle_pg_create( + OSD &osd, + crimson::net::ConnectionRef conn, + compound_state_ref state, + Ref<MOSDPGCreate2> m) +{ + std::vector<OperationRef> ret; + for (auto& [pgid, when] : m->pgs) { + const auto &[created, created_stamp] = when; + auto q = m->pg_extra.find(pgid); + ceph_assert(q != m->pg_extra.end()); + auto& [history, pi] = q->second; + logger().debug( + "{}: {} e{} @{} " + "history {} pi {}", + __func__, pgid, created, created_stamp, + history, pi); + if (!pi.empty() && + m->epoch < pi.get_bounds().second) { + logger().error( + "got pg_create on {} epoch {} " + "unmatched past_intervals {} (history {})", + pgid, m->epoch, + pi, history); + } else { + auto op = osd.get_shard_services().start_operation<PeeringSubEvent>( + state, + osd, + conn, + osd.get_shard_services(), + pg_shard_t(), + pgid, + m->epoch, + m->epoch, + NullEvt(), + true, + new PGCreateInfo(pgid, m->epoch, history, pi, true)).first; + ret.push_back(op); + } + } + return ret; +} + +struct SubOpBlocker : BlockerT<SubOpBlocker> { + static constexpr const char * type_name = "CompoundOpBlocker"; + + std::vector<OperationRef> subops; + SubOpBlocker(std::vector<OperationRef> &&subops) : subops(subops) {} + + virtual void dump_detail(Formatter *f) const { + f->open_array_section("dependent_operations"); + { + for (auto &i : subops) { + i->dump_brief(f); + } + } + f->close_section(); + } +}; + +} // namespace + +namespace crimson::osd { + +CompoundPeeringRequest::CompoundPeeringRequest( + OSD &osd, 
crimson::net::ConnectionRef conn, Ref<Message> m) + : osd(osd), + conn(conn), + m(m) +{} + +void CompoundPeeringRequest::print(std::ostream &lhs) const +{ + lhs << *m; +} + +void CompoundPeeringRequest::dump_detail(Formatter *f) const +{ + f->dump_stream("message") << *m; +} + +seastar::future<> CompoundPeeringRequest::start() +{ + logger().info("{}: starting", *this); + auto state = seastar::make_lw_shared<compound_state>(); + auto blocker = std::make_unique<SubOpBlocker>( + [&] { + assert((m->get_type() == MSG_OSD_PG_CREATE2)); + return handle_pg_create( + osd, + conn, + state, + boost::static_pointer_cast<MOSDPGCreate2>(m)); + }()); + + IRef ref = this; + logger().info("{}: about to fork future", *this); + return crimson::common::handle_system_shutdown( + [this, ref, blocker=std::move(blocker), state]() mutable { + return with_blocking_future( + blocker->make_blocking_future(state->promise.get_future()) + ).then([this, blocker=std::move(blocker)](auto &&ctx) { + logger().info("{}: sub events complete", *this); + return osd.get_shard_services().dispatch_context_messages(std::move(ctx)); + }).then([this, ref=std::move(ref)] { + logger().info("{}: complete", *this); + }); + }); +} + +} // namespace crimson::osd diff --git a/src/crimson/osd/osd_operations/compound_peering_request.h b/src/crimson/osd/osd_operations/compound_peering_request.h new file mode 100644 index 000000000..495306d75 --- /dev/null +++ b/src/crimson/osd/osd_operations/compound_peering_request.h @@ -0,0 +1,40 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <iostream> +#include <seastar/core/future.hh> + +#include "msg/MessageRef.h" + +#include "crimson/net/Connection.h" +#include "crimson/osd/osd_operation.h" + +namespace crimson::osd { + +class OSD; +class PG; + +using osd_id_t = int; + +class CompoundPeeringRequest : public OperationT<CompoundPeeringRequest> { +public: + static constexpr OperationTypeCode type = + 
OperationTypeCode::compound_peering_request; + +private: + OSD &osd; + crimson::net::ConnectionRef conn; + Ref<Message> m; + +public: + CompoundPeeringRequest( + OSD &osd, crimson::net::ConnectionRef conn, Ref<Message> m); + + void print(std::ostream &) const final; + void dump_detail(Formatter *f) const final; + seastar::future<> start(); +}; + +} diff --git a/src/crimson/osd/osd_operations/osdop_params.h b/src/crimson/osd/osd_operations/osdop_params.h new file mode 100644 index 000000000..a0bd9dcbb --- /dev/null +++ b/src/crimson/osd/osd_operations/osdop_params.h @@ -0,0 +1,27 @@ +#pragma once + +#include "messages/MOSDOp.h" +#include "osd/osd_types.h" +#include "crimson/common/type_helpers.h" + +// The fields in this struct are parameters that may be needed in multiple +// level of processing. I inclosed all those parameters in this struct to +// avoid passing each of them as a method parameter. +struct osd_op_params_t { + Ref<MOSDOp> req; + eversion_t at_version; + eversion_t pg_trim_to; + eversion_t min_last_complete_ondisk; + eversion_t last_complete; + version_t user_at_version = 0; + bool user_modify = false; + ObjectCleanRegions clean_regions; + + osd_op_params_t() = default; + osd_op_params_t(Ref<MOSDOp>&& req) : req(req) {} + osd_op_params_t(Ref<MOSDOp>&& req, eversion_t at_version, eversion_t pg_trim_to, + eversion_t mlcod, eversion_t lc, version_t user_at_version) : + req(req), at_version(at_version), pg_trim_to(pg_trim_to), + min_last_complete_ondisk(mlcod), last_complete(lc), + user_at_version(user_at_version) {} +}; diff --git a/src/crimson/osd/osd_operations/peering_event.cc b/src/crimson/osd/osd_operations/peering_event.cc new file mode 100644 index 000000000..d3c6ccf81 --- /dev/null +++ b/src/crimson/osd/osd_operations/peering_event.cc @@ -0,0 +1,173 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <seastar/core/future.hh> + +#include "messages/MOSDPGLog.h" + +#include 
"common/Formatter.h" +#include "crimson/osd/pg.h" +#include "crimson/osd/osd.h" +#include "crimson/osd/osd_operations/peering_event.h" +#include "crimson/osd/osd_connection_priv.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_osd); + } +} + +namespace crimson::osd { + +void PeeringEvent::print(std::ostream &lhs) const +{ + lhs << "PeeringEvent(" + << "from=" << from + << " pgid=" << pgid + << " sent=" << evt.get_epoch_sent() + << " requested=" << evt.get_epoch_requested() + << " evt=" << evt.get_desc() + << ")"; +} + +void PeeringEvent::dump_detail(Formatter *f) const +{ + f->open_object_section("PeeringEvent"); + f->dump_stream("from") << from; + f->dump_stream("pgid") << pgid; + f->dump_int("sent", evt.get_epoch_sent()); + f->dump_int("requested", evt.get_epoch_requested()); + f->dump_string("evt", evt.get_desc()); + f->close_section(); +} + + +PeeringEvent::PGPipeline &PeeringEvent::pp(PG &pg) +{ + return pg.peering_request_pg_pipeline; +} + +seastar::future<> PeeringEvent::start() +{ + + logger().debug("{}: start", *this); + + IRef ref = this; + return [this] { + if (delay) { + return seastar::sleep(std::chrono::milliseconds( + std::lround(delay*1000))); + } else { + return seastar::now(); + } + }().then([this] { + return get_pg(); + }).then([this](Ref<PG> pg) { + if (!pg) { + logger().warn("{}: pg absent, did not create", *this); + on_pg_absent(); + handle.exit(); + return complete_rctx(pg); + } else { + logger().debug("{}: pg present", *this); + return with_blocking_future(handle.enter(pp(*pg).await_map) + ).then([this, pg] { + return with_blocking_future( + pg->osdmap_gate.wait_for_map(evt.get_epoch_sent())); + }).then([this, pg](auto) { + return with_blocking_future(handle.enter(pp(*pg).process)); + }).then([this, pg] { + // TODO: likely we should synchronize also with the pg log-based + // recovery. 
+ return with_blocking_future( + handle.enter(BackfillRecovery::bp(*pg).process)); + }).then([this, pg] { + pg->do_peering_event(evt, ctx); + handle.exit(); + return complete_rctx(pg); + }).then([this, pg] { + return pg->get_need_up_thru() ? shard_services.send_alive(pg->get_same_interval_since()) + : seastar::now(); + }); + } + }).then([this] { + return shard_services.send_pg_temp(); + }).then([this, ref=std::move(ref)] { + logger().debug("{}: complete", *this); + }); +} + +void PeeringEvent::on_pg_absent() +{ + logger().debug("{}: pg absent, dropping", *this); +} + +seastar::future<> PeeringEvent::complete_rctx(Ref<PG> pg) +{ + logger().debug("{}: submitting ctx", *this); + return shard_services.dispatch_context( + pg->get_collection_ref(), + std::move(ctx)); +} + +RemotePeeringEvent::ConnectionPipeline &RemotePeeringEvent::cp() +{ + return get_osd_priv(conn.get()).peering_request_conn_pipeline; +} + +void RemotePeeringEvent::on_pg_absent() +{ + if (auto& e = get_event().get_event(); + e.dynamic_type() == MQuery::static_type()) { + const auto map_epoch = + shard_services.get_osdmap_service().get_map()->get_epoch(); + const auto& q = static_cast<const MQuery&>(e); + const pg_info_t empty{spg_t{pgid.pgid, q.query.to}}; + if (q.query.type == q.query.LOG || + q.query.type == q.query.FULLLOG) { + auto m = ceph::make_message<MOSDPGLog>(q.query.from, q.query.to, + map_epoch, empty, + q.query.epoch_sent); + ctx.send_osd_message(q.from.osd, std::move(m)); + } else { + ctx.send_notify(q.from.osd, {q.query.from, q.query.to, + q.query.epoch_sent, + map_epoch, empty, + PastIntervals{}}); + } + } +} + +seastar::future<> RemotePeeringEvent::complete_rctx(Ref<PG> pg) +{ + if (pg) { + return PeeringEvent::complete_rctx(pg); + } else { + return shard_services.dispatch_context_messages(std::move(ctx)); + } +} + +seastar::future<Ref<PG>> RemotePeeringEvent::get_pg() +{ + return with_blocking_future( + handle.enter(cp().await_map) + ).then([this] { + return with_blocking_future( + 
osd.osdmap_gate.wait_for_map(evt.get_epoch_sent())); + }).then([this](auto epoch) { + logger().debug("{}: got map {}", *this, epoch); + return with_blocking_future(handle.enter(cp().get_pg)); + }).then([this] { + return with_blocking_future( + osd.get_or_create_pg( + pgid, evt.get_epoch_sent(), std::move(evt.create_info))); + }); +} + +seastar::future<Ref<PG>> LocalPeeringEvent::get_pg() { + return seastar::make_ready_future<Ref<PG>>(pg); +} + +LocalPeeringEvent::~LocalPeeringEvent() {} + +} diff --git a/src/crimson/osd/osd_operations/peering_event.h b/src/crimson/osd/osd_operations/peering_event.h new file mode 100644 index 000000000..3a6c0678c --- /dev/null +++ b/src/crimson/osd/osd_operations/peering_event.h @@ -0,0 +1,142 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <iostream> +#include <seastar/core/future.hh> + +#include "crimson/osd/osd_operation.h" +#include "osd/osd_types.h" +#include "osd/PGPeeringEvent.h" +#include "osd/PeeringState.h" + +namespace ceph { + class Formatter; +} + +namespace crimson::osd { + +class OSD; +class ShardServices; +class PG; + +class PeeringEvent : public OperationT<PeeringEvent> { +public: + static constexpr OperationTypeCode type = OperationTypeCode::peering_event; + + class PGPipeline { + OrderedPipelinePhase await_map = { + "PeeringEvent::PGPipeline::await_map" + }; + OrderedPipelinePhase process = { + "PeeringEvent::PGPipeline::process" + }; + friend class PeeringEvent; + friend class PGAdvanceMap; + }; + +protected: + OrderedPipelinePhase::Handle handle; + PGPipeline &pp(PG &pg); + + ShardServices &shard_services; + PeeringCtx ctx; + pg_shard_t from; + spg_t pgid; + float delay = 0; + PGPeeringEvent evt; + + const pg_shard_t get_from() const { + return from; + } + + const spg_t get_pgid() const { + return pgid; + } + + const PGPeeringEvent &get_event() const { + return evt; + } + + virtual void on_pg_absent(); + virtual seastar::future<> 
complete_rctx(Ref<PG>); + virtual seastar::future<Ref<PG>> get_pg() = 0; + +public: + template <typename... Args> + PeeringEvent( + ShardServices &shard_services, const pg_shard_t &from, const spg_t &pgid, + Args&&... args) : + shard_services(shard_services), + ctx{ceph_release_t::octopus}, + from(from), + pgid(pgid), + evt(std::forward<Args>(args)...) + {} + template <typename... Args> + PeeringEvent( + ShardServices &shard_services, const pg_shard_t &from, const spg_t &pgid, + float delay, Args&&... args) : + shard_services(shard_services), + ctx{ceph_release_t::octopus}, + from(from), + pgid(pgid), + delay(delay), + evt(std::forward<Args>(args)...) + {} + + void print(std::ostream &) const final; + void dump_detail(ceph::Formatter* f) const final; + seastar::future<> start(); +}; + +class RemotePeeringEvent : public PeeringEvent { +protected: + OSD &osd; + crimson::net::ConnectionRef conn; + + void on_pg_absent() final; + seastar::future<> complete_rctx(Ref<PG> pg) override; + seastar::future<Ref<PG>> get_pg() final; + +public: + class ConnectionPipeline { + OrderedPipelinePhase await_map = { + "PeeringRequest::ConnectionPipeline::await_map" + }; + OrderedPipelinePhase get_pg = { + "PeeringRequest::ConnectionPipeline::get_pg" + }; + friend class RemotePeeringEvent; + }; + + template <typename... Args> + RemotePeeringEvent(OSD &osd, crimson::net::ConnectionRef conn, Args&&... args) : + PeeringEvent(std::forward<Args>(args)...), + osd(osd), + conn(conn) + {} + +private: + ConnectionPipeline &cp(); +}; + +class LocalPeeringEvent final : public PeeringEvent { +protected: + seastar::future<Ref<PG>> get_pg() final; + + Ref<PG> pg; + +public: + template <typename... Args> + LocalPeeringEvent(Ref<PG> pg, Args&&... 
args) : + PeeringEvent(std::forward<Args>(args)...), + pg(pg) + {} + + virtual ~LocalPeeringEvent(); +}; + + +} diff --git a/src/crimson/osd/osd_operations/pg_advance_map.cc b/src/crimson/osd/osd_operations/pg_advance_map.cc new file mode 100644 index 000000000..a96479d40 --- /dev/null +++ b/src/crimson/osd/osd_operations/pg_advance_map.cc @@ -0,0 +1,97 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "crimson/osd/osd_operations/pg_advance_map.h" + +#include <boost/smart_ptr/local_shared_ptr.hpp> +#include <seastar/core/future.hh> + +#include "include/types.h" +#include "common/Formatter.h" +#include "crimson/osd/pg.h" +#include "crimson/osd/osd.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_osd); + } +} + +namespace crimson::osd { + +PGAdvanceMap::PGAdvanceMap( + OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to, + PeeringCtx &&rctx, bool do_init) + : osd(osd), pg(pg), from(from), to(to), + rctx(std::move(rctx)), do_init(do_init) {} + +PGAdvanceMap::~PGAdvanceMap() {} + +void PGAdvanceMap::print(std::ostream &lhs) const +{ + lhs << "PGAdvanceMap(" + << "pg=" << pg->get_pgid() + << " from=" << from + << " to=" << to; + if (do_init) { + lhs << " do_init"; + } + lhs << ")"; +} + +void PGAdvanceMap::dump_detail(Formatter *f) const +{ + f->open_object_section("PGAdvanceMap"); + f->dump_stream("pgid") << pg->get_pgid(); + f->dump_int("from", from); + f->dump_int("to", to); + f->dump_bool("do_init", do_init); + f->close_section(); +} + +seastar::future<> PGAdvanceMap::start() +{ + using cached_map_t = boost::local_shared_ptr<const OSDMap>; + + logger().debug("{}: start", *this); + + IRef ref = this; + return with_blocking_future( + handle.enter(pg->peering_request_pg_pipeline.process)) + .then([this] { + if (do_init) { + pg->handle_initialize(rctx); + pg->handle_activate_map(rctx); + } + return seastar::do_for_each( + boost::make_counting_iterator(from + 1), + 
boost::make_counting_iterator(to + 1), + [this](epoch_t next_epoch) { + return osd.get_map(next_epoch).then( + [this] (cached_map_t&& next_map) { + pg->handle_advance_map(next_map, rctx); + }); + }).then([this] { + pg->handle_activate_map(rctx); + handle.exit(); + if (do_init) { + osd.pg_map.pg_created(pg->get_pgid(), pg); + osd.shard_services.inc_pg_num(); + logger().info("PGAdvanceMap::start new pg {}", *pg); + } + return seastar::when_all_succeed( + pg->get_need_up_thru() \ + ? osd.shard_services.send_alive(pg->get_same_interval_since()) + : seastar::now(), + osd.shard_services.dispatch_context( + pg->get_collection_ref(), + std::move(rctx))); + }).then_unpack([this] { + return osd.shard_services.send_pg_temp(); + }); + }).then([this, ref=std::move(ref)] { + logger().debug("{}: complete", *this); + }); +} + +} diff --git a/src/crimson/osd/osd_operations/pg_advance_map.h b/src/crimson/osd/osd_operations/pg_advance_map.h new file mode 100644 index 000000000..1b27037eb --- /dev/null +++ b/src/crimson/osd/osd_operations/pg_advance_map.h @@ -0,0 +1,50 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <iostream> +#include <seastar/core/future.hh> + +#include "crimson/osd/osd_operation.h" +#include "osd/osd_types.h" +#include "crimson/common/type_helpers.h" +#include "osd/PeeringState.h" + +namespace ceph { + class Formatter; +} + +namespace crimson::osd { + +class OSD; +class PG; + +class PGAdvanceMap : public OperationT<PGAdvanceMap> { +public: + static constexpr OperationTypeCode type = OperationTypeCode::pg_advance_map; + +protected: + OrderedPipelinePhase::Handle handle; + + OSD &osd; + Ref<PG> pg; + + epoch_t from; + epoch_t to; + + PeeringCtx rctx; + const bool do_init; + +public: + PGAdvanceMap( + OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to, + PeeringCtx &&rctx, bool do_init); + ~PGAdvanceMap(); + + void print(std::ostream &) const final; + void dump_detail(ceph::Formatter *f) 
const final; + seastar::future<> start(); +}; + +} diff --git a/src/crimson/osd/osd_operations/recovery_subrequest.cc b/src/crimson/osd/osd_operations/recovery_subrequest.cc new file mode 100644 index 000000000..820c7beab --- /dev/null +++ b/src/crimson/osd/osd_operations/recovery_subrequest.cc @@ -0,0 +1,29 @@ +#include <fmt/format.h> +#include <fmt/ostream.h> + +#include "crimson/osd/osd_operations/recovery_subrequest.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_osd); + } +} + +namespace crimson::osd { + +seastar::future<> RecoverySubRequest::start() { + logger().debug("{}: start", *this); + + IRef opref = this; + return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_min_epoch())) + .then([this] (epoch_t epoch) { + return with_blocking_future(osd.wait_for_pg(m->get_spg())); + }).then([this, opref=std::move(opref)] (Ref<PG> pgref) { + return seastar::do_with(std::move(pgref), std::move(opref), + [this](auto& pgref, auto& opref) { + return pgref->get_recovery_backend()->handle_recovery_op(m); + }); + }); +} + +} diff --git a/src/crimson/osd/osd_operations/recovery_subrequest.h b/src/crimson/osd/osd_operations/recovery_subrequest.h new file mode 100644 index 000000000..b151e5c1d --- /dev/null +++ b/src/crimson/osd/osd_operations/recovery_subrequest.h @@ -0,0 +1,45 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "osd/osd_op_util.h" +#include "crimson/net/Connection.h" +#include "crimson/osd/osd_operation.h" +#include "crimson/osd/osd.h" +#include "crimson/common/type_helpers.h" +#include "messages/MOSDPGPull.h" +#include "messages/MOSDPGPush.h" +#include "messages/MOSDPGPushReply.h" +#include "messages/MOSDPGRecoveryDelete.h" +#include "messages/MOSDPGRecoveryDeleteReply.h" + +namespace crimson::osd { + +class OSD; +class PG; + +class RecoverySubRequest final : public OperationT<RecoverySubRequest> { +public: + static constexpr 
OperationTypeCode type = OperationTypeCode::background_recovery_sub; + + RecoverySubRequest(OSD &osd, crimson::net::ConnectionRef conn, Ref<MOSDFastDispatchOp>&& m) + : osd(osd), conn(conn), m(m) {} + + void print(std::ostream& out) const final + { + out << *m; + } + + void dump_detail(Formatter *f) const final + { + } + + seastar::future<> start(); +private: + OSD& osd; + crimson::net::ConnectionRef conn; + Ref<MOSDFastDispatchOp> m; +}; + +} diff --git a/src/crimson/osd/osd_operations/replicated_request.cc b/src/crimson/osd/osd_operations/replicated_request.cc new file mode 100644 index 000000000..34487f9e4 --- /dev/null +++ b/src/crimson/osd/osd_operations/replicated_request.cc @@ -0,0 +1,74 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "replicated_request.h" + +#include "common/Formatter.h" +#include "messages/MOSDRepOp.h" + +#include "crimson/osd/osd.h" +#include "crimson/osd/osd_connection_priv.h" +#include "crimson/osd/pg.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_osd); + } +} + +namespace crimson::osd { + +RepRequest::RepRequest(OSD &osd, + crimson::net::ConnectionRef&& conn, + Ref<MOSDRepOp> &&req) + : osd{osd}, + conn{std::move(conn)}, + req{req} +{} + +void RepRequest::print(std::ostream& os) const +{ + os << "RepRequest(" + << "from=" << req->from + << " req=" << *req + << ")"; +} + +void RepRequest::dump_detail(Formatter *f) const +{ + f->open_object_section("RepRequest"); + f->dump_stream("reqid") << req->reqid; + f->dump_stream("pgid") << req->get_spg(); + f->dump_unsigned("map_epoch", req->get_map_epoch()); + f->dump_unsigned("min_epoch", req->get_min_epoch()); + f->dump_stream("oid") << req->poid; + f->dump_stream("from") << req->from; + f->close_section(); +} + +RepRequest::ConnectionPipeline &RepRequest::cp() +{ + return get_osd_priv(conn.get()).replicated_request_conn_pipeline; +} + +RepRequest::PGPipeline &RepRequest::pp(PG &pg) +{ 
+  return pg.replicated_request_pg_pipeline;
+}
+
+// Order the request through the connection pipeline: await the map at the
+// request's min epoch, then look up the PG, then hand the rep op to it.
+seastar::future<> RepRequest::start()
+{
+  logger().debug("{} start", *this);
+  // Self-reference keeps this operation alive across the chain.
+  IRef ref = this;
+  return with_blocking_future(handle.enter(cp().await_map))
+  .then([this]() {
+    return with_blocking_future(osd.osdmap_gate.wait_for_map(req->get_min_epoch()));
+  }).then([this](epoch_t epoch) {
+    return with_blocking_future(handle.enter(cp().get_pg));
+  }).then([this] {
+    return with_blocking_future(osd.wait_for_pg(req->get_spg()));
+  }).then([this, ref=std::move(ref)](Ref<PG> pg) {
+    // req is moved out here; this operation must not touch it afterwards.
+    return pg->handle_rep_op(std::move(req));
+  });
+}
+}
diff --git a/src/crimson/osd/osd_operations/replicated_request.h b/src/crimson/osd/osd_operations/replicated_request.h
new file mode 100644
index 000000000..8e9cfc9fe
--- /dev/null
+++ b/src/crimson/osd/osd_operations/replicated_request.h
@@ -0,0 +1,58 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/net/Connection.h"
+#include "crimson/osd/osd_operation.h"
+#include "crimson/common/type_helpers.h"
+
+class MOSDRepOp;
+
+namespace ceph {
+  class Formatter;
+}
+
+namespace crimson::osd {
+
+class OSD;
+class PG;
+
+// Operation wrapping a replicated-write sub-op (MOSDRepOp) from a peer OSD.
+class RepRequest final : public OperationT<RepRequest> {
+public:
+  // Per-connection ordering stages entered by start().
+  class ConnectionPipeline {
+    OrderedPipelinePhase await_map = {
+      "RepRequest::ConnectionPipeline::await_map"
+    };
+    OrderedPipelinePhase get_pg = {
+      "RepRequest::ConnectionPipeline::get_pg"
+    };
+    friend RepRequest;
+  };
+  // Per-PG ordering stages.
+  // NOTE(review): start() as written only enters the connection pipeline;
+  // these PG stages appear unused there — presumably entered inside
+  // PG::handle_rep_op. Verify against the PG implementation.
+  class PGPipeline {
+    OrderedPipelinePhase await_map = {
+      "RepRequest::PGPipeline::await_map"
+    };
+    OrderedPipelinePhase process = {
+      "RepRequest::PGPipeline::process"
+    };
+    friend RepRequest;
+  };
+  static constexpr OperationTypeCode type = OperationTypeCode::replicated_request;
+  RepRequest(OSD&, crimson::net::ConnectionRef&&, Ref<MOSDRepOp>&&);
+
+  void print(std::ostream &) const final;
+  void dump_detail(ceph::Formatter* f) const final;
+  seastar::future<> start();
+
+private:
+  ConnectionPipeline &cp();
+  PGPipeline &pp(PG &pg);
+
+  OSD &osd;
+  crimson::net::ConnectionRef conn;
+  Ref<MOSDRepOp> req;
+  OrderedPipelinePhase::Handle handle;
+};
+
+}
|