Diffstat (limited to 'src/crimson/osd/osd_operations')
24 files changed, 3316 insertions, 0 deletions
diff --git a/src/crimson/osd/osd_operations/background_recovery.cc b/src/crimson/osd/osd_operations/background_recovery.cc new file mode 100644 index 000000000..953ec9595 --- /dev/null +++ b/src/crimson/osd/osd_operations/background_recovery.cc @@ -0,0 +1,207 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <seastar/core/future.hh> +#include <seastar/core/sleep.hh> + +#include "messages/MOSDOp.h" + +#include "crimson/osd/pg.h" +#include "crimson/osd/shard_services.h" +#include "common/Formatter.h" +#include "crimson/osd/osd_operation_external_tracking.h" +#include "crimson/osd/osd_operations/background_recovery.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_osd); + } +} + +namespace crimson { + template <> + struct EventBackendRegistry<osd::UrgentRecovery> { + static std::tuple<> get_backends() { + return {}; + } + }; + + template <> + struct EventBackendRegistry<osd::PglogBasedRecovery> { + static std::tuple<> get_backends() { + return {}; + } + }; +} + +namespace crimson::osd { + +template <class T> +BackgroundRecoveryT<T>::BackgroundRecoveryT( + Ref<PG> pg, + ShardServices &ss, + epoch_t epoch_started, + crimson::osd::scheduler::scheduler_class_t scheduler_class, + float delay) + : pg(pg), + epoch_started(epoch_started), + delay(delay), + ss(ss), + scheduler_class(scheduler_class) +{} + +template <class T> +void BackgroundRecoveryT<T>::print(std::ostream &lhs) const +{ + lhs << "BackgroundRecovery(" << pg->get_pgid() << ")"; +} + +template <class T> +void BackgroundRecoveryT<T>::dump_detail(Formatter *f) const +{ + f->dump_stream("pgid") << pg->get_pgid(); + f->open_object_section("recovery_detail"); + { + // TODO pg->dump_recovery_state(f); + } + f->close_section(); +} + +template <class T> +seastar::future<> BackgroundRecoveryT<T>::start() +{ + logger().debug("{}: start", *this); + + typename T::IRef ref = static_cast<T*>(this); + auto maybe_delay = seastar::now(); + if (delay) { + maybe_delay = seastar::sleep( + std::chrono::milliseconds(std::lround(delay * 1000))); + } + return maybe_delay.then([ref, this] { + return this->template with_blocking_event<OperationThrottler::BlockingEvent>( + [ref, this] (auto&& trigger) { + return ss.with_throttle_while( + std::move(trigger), + this, get_scheduler_params(), [this] { + return T::interruptor::with_interruption([this] { + return do_recovery(); + }, [](std::exception_ptr) { + return seastar::make_ready_future<bool>(false); + }, pg); + }).handle_exception_type([ref, this](const std::system_error& err) { + if (err.code() == std::make_error_code(std::errc::interrupted)) { + logger().debug("{} recovery interruped: {}", *pg, err.what()); + return seastar::now(); + } + return seastar::make_exception_future<>(err); + }); + }); + }); +} + +UrgentRecovery::UrgentRecovery( + const hobject_t& soid, + const eversion_t& need, + Ref<PG> pg, + ShardServices& ss, + epoch_t epoch_started) + : BackgroundRecoveryT{pg, ss, epoch_started, + crimson::osd::scheduler::scheduler_class_t::immediate}, + soid{soid}, need(need) +{ +} + +UrgentRecovery::interruptible_future<bool> +UrgentRecovery::do_recovery() +{ + logger().debug("{}: {}", __func__, *this); + if (!pg->has_reset_since(epoch_started)) { + return with_blocking_event<RecoveryBackend::RecoveryBlockingEvent, + interruptor>([this] (auto&& trigger) { + return pg->get_recovery_handler()->recover_missing(trigger, soid, need); + }).then_interruptible([] { + return seastar::make_ready_future<bool>(false); + 
}); + } + return seastar::make_ready_future<bool>(false); +} + +void UrgentRecovery::print(std::ostream &lhs) const +{ + lhs << "UrgentRecovery(" << pg->get_pgid() << ", " + << soid << ", v" << need << ", epoch_started: " + << epoch_started << ")"; +} + +void UrgentRecovery::dump_detail(Formatter *f) const +{ + f->dump_stream("pgid") << pg->get_pgid(); + f->open_object_section("recovery_detail"); + { + f->dump_stream("oid") << soid; + f->dump_stream("version") << need; + } + f->close_section(); +} + +PglogBasedRecovery::PglogBasedRecovery( + Ref<PG> pg, + ShardServices &ss, + const epoch_t epoch_started, + float delay) + : BackgroundRecoveryT( + std::move(pg), + ss, + epoch_started, + crimson::osd::scheduler::scheduler_class_t::background_recovery, + delay) +{} + +PglogBasedRecovery::interruptible_future<bool> +PglogBasedRecovery::do_recovery() +{ + if (pg->has_reset_since(epoch_started)) { + return seastar::make_ready_future<bool>(false); + } + return with_blocking_event<RecoveryBackend::RecoveryBlockingEvent, + interruptor>([this] (auto&& trigger) { + return pg->get_recovery_handler()->start_recovery_ops( + trigger, + crimson::common::local_conf()->osd_recovery_max_single_start); + }); +} + +PGPeeringPipeline &BackfillRecovery::peering_pp(PG &pg) +{ + return pg.peering_request_pg_pipeline; +} + +BackfillRecovery::interruptible_future<bool> +BackfillRecovery::do_recovery() +{ + logger().debug("{}", __func__); + + if (pg->has_reset_since(epoch_started)) { + logger().debug("{}: pg got reset since epoch_started={}", + __func__, epoch_started); + return seastar::make_ready_future<bool>(false); + } + // TODO: limits + return enter_stage<interruptor>( + // process_event() of our boost::statechart machine is non-reentrant. + // with the backfill_pipeline we protect it from a second entry from + // the implementation of BackfillListener. + // additionally, this stage serves to synchronize with PeeringEvent. 
+ peering_pp(*pg).process + ).then_interruptible([this] { + pg->get_recovery_handler()->dispatch_backfill_event(std::move(evt)); + return seastar::make_ready_future<bool>(false); + }); +} + +template class BackgroundRecoveryT<UrgentRecovery>; +template class BackgroundRecoveryT<PglogBasedRecovery>; +template class BackgroundRecoveryT<BackfillRecovery>; + +} // namespace crimson::osd diff --git a/src/crimson/osd/osd_operations/background_recovery.h b/src/crimson/osd/osd_operations/background_recovery.h new file mode 100644 index 000000000..17f2cd57a --- /dev/null +++ b/src/crimson/osd/osd_operations/background_recovery.h @@ -0,0 +1,144 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <boost/statechart/event_base.hpp> + +#include "crimson/net/Connection.h" +#include "crimson/osd/osd_operation.h" +#include "crimson/osd/recovery_backend.h" +#include "crimson/common/type_helpers.h" +#include "crimson/osd/osd_operations/peering_event.h" +#include "crimson/osd/pg.h" + +namespace crimson::osd { +class PG; +class ShardServices; + +template <class T> +class BackgroundRecoveryT : public PhasedOperationT<T> { +public: + static constexpr OperationTypeCode type = OperationTypeCode::background_recovery; + + BackgroundRecoveryT( + Ref<PG> pg, + ShardServices &ss, + epoch_t epoch_started, + crimson::osd::scheduler::scheduler_class_t scheduler_class, float delay = 0); + + virtual void print(std::ostream &) const; + seastar::future<> start(); + +protected: + Ref<PG> pg; + const epoch_t epoch_started; + float delay = 0; + +private: + virtual void dump_detail(Formatter *f) const; + crimson::osd::scheduler::params_t get_scheduler_params() const { + return { + 1, // cost + 0, // owner + scheduler_class + }; + } + using do_recovery_ret_t = typename PhasedOperationT<T>::template interruptible_future<bool>; + virtual do_recovery_ret_t do_recovery() = 0; + ShardServices &ss; + const crimson::osd::scheduler::scheduler_class_t scheduler_class; +}; + +/// represent a recovery initiated for serving a client request +/// +/// unlike @c PglogBasedRecovery and @c BackfillRecovery, +/// @c UrgentRecovery is not throttled by the scheduler. and it +/// utilizes @c RecoveryBackend directly to recover the unreadable +/// object. 
+class UrgentRecovery final : public BackgroundRecoveryT<UrgentRecovery> { +public: + UrgentRecovery( + const hobject_t& soid, + const eversion_t& need, + Ref<PG> pg, + ShardServices& ss, + epoch_t epoch_started); + void print(std::ostream&) const final; + + std::tuple< + OperationThrottler::BlockingEvent, + RecoveryBackend::RecoveryBlockingEvent + > tracking_events; + +private: + void dump_detail(Formatter* f) const final; + interruptible_future<bool> do_recovery() override; + const hobject_t soid; + const eversion_t need; +}; + +class PglogBasedRecovery final : public BackgroundRecoveryT<PglogBasedRecovery> { +public: + PglogBasedRecovery( + Ref<PG> pg, + ShardServices &ss, + epoch_t epoch_started, + float delay = 0); + + std::tuple< + OperationThrottler::BlockingEvent, + RecoveryBackend::RecoveryBlockingEvent + > tracking_events; + +private: + interruptible_future<bool> do_recovery() override; +}; + +class BackfillRecovery final : public BackgroundRecoveryT<BackfillRecovery> { +public: + + template <class EventT> + BackfillRecovery( + Ref<PG> pg, + ShardServices &ss, + epoch_t epoch_started, + const EventT& evt); + + PipelineHandle& get_handle() { return handle; } + + std::tuple< + OperationThrottler::BlockingEvent, + PGPeeringPipeline::Process::BlockingEvent + > tracking_events; + +private: + boost::intrusive_ptr<const boost::statechart::event_base> evt; + PipelineHandle handle; + + static PGPeeringPipeline &peering_pp(PG &pg); + interruptible_future<bool> do_recovery() override; +}; + +template <class EventT> +BackfillRecovery::BackfillRecovery( + Ref<PG> pg, + ShardServices &ss, + const epoch_t epoch_started, + const EventT& evt) + : BackgroundRecoveryT( + std::move(pg), + ss, + epoch_started, + crimson::osd::scheduler::scheduler_class_t::background_best_effort), + evt(evt.intrusive_from_this()) +{} + +} + +#if FMT_VERSION >= 90000 +template <> struct fmt::formatter<crimson::osd::BackfillRecovery> : fmt::ostream_formatter {}; +template <> struct fmt::formatter<crimson::osd::PglogBasedRecovery> : fmt::ostream_formatter {}; +template <> struct fmt::formatter<crimson::osd::UrgentRecovery> : fmt::ostream_formatter {}; +template <class T> struct fmt::formatter<crimson::osd::BackgroundRecoveryT<T>> : fmt::ostream_formatter {}; +#endif diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc new file mode 100644 index 000000000..9374fbde2 --- /dev/null +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -0,0 +1,388 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab expandtab + +#include "messages/MOSDOp.h" +#include "messages/MOSDOpReply.h" + +#include "crimson/common/exception.h" +#include "crimson/osd/pg.h" +#include "crimson/osd/osd.h" +#include "common/Formatter.h" +#include "crimson/osd/osd_operation_external_tracking.h" +#include "crimson/osd/osd_operations/client_request.h" +#include "crimson/osd/osd_connection_priv.h" +#include "osd/object_state_fmt.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_osd); + } +} + +namespace crimson::osd { + + +void ClientRequest::Orderer::requeue( + ShardServices &shard_services, Ref<PG> pg) +{ + for (auto &req: list) { + logger().debug("{}: {} requeueing {}", __func__, *pg, req); + req.reset_instance_handle(); + std::ignore = req.with_pg_int(shard_services, pg); + } +} + +void ClientRequest::Orderer::clear_and_cancel() +{ + for (auto i = list.begin(); i != list.end(); ) { + logger().debug( + 
"ClientRequest::Orderer::clear_and_cancel: {}", + *i); + i->complete_request(); + remove_request(*(i++)); + } +} + +void ClientRequest::complete_request() +{ + track_event<CompletionEvent>(); + on_complete.set_value(); +} + +ClientRequest::ClientRequest( + ShardServices &shard_services, crimson::net::ConnectionRef conn, + Ref<MOSDOp> &&m) + : put_historic_shard_services(&shard_services), + conn(std::move(conn)), + m(std::move(m)), + instance_handle(new instance_handle_t) +{} + +ClientRequest::~ClientRequest() +{ + logger().debug("{}: destroying", *this); +} + +void ClientRequest::print(std::ostream &lhs) const +{ + lhs << "m=[" << *m << "]"; +} + +void ClientRequest::dump_detail(Formatter *f) const +{ + logger().debug("{}: dumping", *this); + std::apply([f] (auto... event) { + (..., event.dump(f)); + }, tracking_events); +} + +ConnectionPipeline &ClientRequest::get_connection_pipeline() +{ + return get_osd_priv(conn.get()).client_request_conn_pipeline; +} + +ClientRequest::PGPipeline &ClientRequest::client_pp(PG &pg) +{ + return pg.request_pg_pipeline; +} + +bool ClientRequest::is_pg_op() const +{ + return std::any_of( + begin(m->ops), end(m->ops), + [](auto& op) { return ceph_osd_op_type_pg(op.op.op); }); +} + +seastar::future<> ClientRequest::with_pg_int( + ShardServices &shard_services, Ref<PG> pgref) +{ + epoch_t same_interval_since = pgref->get_interval_start_epoch(); + logger().debug("{} same_interval_since: {}", *this, same_interval_since); + if (m->finish_decode()) { + m->clear_payload(); + } + const auto this_instance_id = instance_id++; + OperationRef opref{this}; + auto instance_handle = get_instance_handle(); + auto &ihref = *instance_handle; + return interruptor::with_interruption( + [this, pgref, this_instance_id, &ihref, &shard_services]() mutable { + PG &pg = *pgref; + if (pg.can_discard_op(*m)) { + return shard_services.send_incremental_map( + std::ref(*conn), m->get_map_epoch() + ).then([this, this_instance_id, pgref] { + logger().debug("{}.{}: discarding", *this, this_instance_id); + pgref->client_request_orderer.remove_request(*this); + complete_request(); + return interruptor::now(); + }); + } + return ihref.enter_stage<interruptor>(client_pp(pg).await_map, *this + ).then_interruptible([this, this_instance_id, &pg, &ihref] { + logger().debug("{}.{}: after await_map stage", *this, this_instance_id); + return ihref.enter_blocker( + *this, pg.osdmap_gate, &decltype(pg.osdmap_gate)::wait_for_map, + m->get_min_epoch(), nullptr); + }).then_interruptible([this, this_instance_id, &pg, &ihref](auto map) { + logger().debug("{}.{}: after wait_for_map", *this, this_instance_id); + return ihref.enter_stage<interruptor>(client_pp(pg).wait_for_active, *this); + }).then_interruptible([this, this_instance_id, &pg, &ihref]() { + logger().debug( + "{}.{}: after wait_for_active stage", *this, this_instance_id); + return ihref.enter_blocker( + *this, + pg.wait_for_active_blocker, + &decltype(pg.wait_for_active_blocker)::wait); + }).then_interruptible([this, pgref, this_instance_id, &ihref]() mutable + -> interruptible_future<> { + logger().debug( + "{}.{}: after wait_for_active", *this, this_instance_id); + if (is_pg_op()) { + return process_pg_op(pgref); + } else { + return process_op(ihref, pgref); + } + }).then_interruptible([this, this_instance_id, pgref] { + logger().debug("{}.{}: after process*", *this, this_instance_id); + pgref->client_request_orderer.remove_request(*this); + complete_request(); + }); + }, [this, this_instance_id, pgref](std::exception_ptr eptr) { + // TODO: better 
debug output + logger().debug("{}.{}: interrupted {}", *this, this_instance_id, eptr); + }, pgref).finally( + [opref=std::move(opref), pgref=std::move(pgref), + instance_handle=std::move(instance_handle), &ihref] { + ihref.handle.exit(); + }); +} + +seastar::future<> ClientRequest::with_pg( + ShardServices &shard_services, Ref<PG> pgref) +{ + put_historic_shard_services = &shard_services; + pgref->client_request_orderer.add_request(*this); + auto ret = on_complete.get_future(); + std::ignore = with_pg_int( + shard_services, std::move(pgref) + ); + return ret; +} + +ClientRequest::interruptible_future<> +ClientRequest::process_pg_op( + Ref<PG> &pg) +{ + return pg->do_pg_ops( + m + ).then_interruptible([this, pg=std::move(pg)](MURef<MOSDOpReply> reply) { + return conn->send(std::move(reply)); + }); +} + +auto ClientRequest::reply_op_error(const Ref<PG>& pg, int err) +{ + logger().debug("{}: replying with error {}", *this, err); + auto reply = crimson::make_message<MOSDOpReply>( + m.get(), err, pg->get_osdmap_epoch(), + m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK), + !m->has_flag(CEPH_OSD_FLAG_RETURNVEC)); + reply->set_reply_versions(eversion_t(), 0); + reply->set_op_returns(std::vector<pg_log_op_return_item_t>{}); + return conn->send(std::move(reply)); +} + +ClientRequest::interruptible_future<> +ClientRequest::process_op(instance_handle_t &ihref, Ref<PG> &pg) +{ + return ihref.enter_stage<interruptor>( + client_pp(*pg).recover_missing, + *this + ).then_interruptible( + [this, pg]() mutable { + if (pg->is_primary()) { + return do_recover_missing(pg, m->get_hobj()); + } else { + logger().debug("process_op: Skipping do_recover_missing" + "on non primary pg"); + return interruptor::now(); + } + }).then_interruptible([this, pg, &ihref]() mutable { + return pg->already_complete(m->get_reqid()).then_interruptible( + [this, pg, &ihref](auto completed) mutable + -> PG::load_obc_iertr::future<> { + if (completed) { + auto reply = crimson::make_message<MOSDOpReply>( + m.get(), completed->err, pg->get_osdmap_epoch(), + CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, false); + reply->set_reply_versions(completed->version, completed->user_version); + return conn->send(std::move(reply)); + } else { + return ihref.enter_stage<interruptor>(client_pp(*pg).get_obc, *this + ).then_interruptible( + [this, pg, &ihref]() mutable -> PG::load_obc_iertr::future<> { + logger().debug("{}: in get_obc stage", *this); + op_info.set_from_op(&*m, *pg->get_osdmap()); + return pg->with_locked_obc( + m->get_hobj(), op_info, + [this, pg, &ihref](auto obc) mutable { + logger().debug("{}: got obc {}", *this, obc->obs); + return ihref.enter_stage<interruptor>( + client_pp(*pg).process, *this + ).then_interruptible([this, pg, obc, &ihref]() mutable { + return do_process(ihref, pg, obc); + }); + }); + }); + } + }); + }).handle_error_interruptible( + PG::load_obc_ertr::all_same_way([this, pg=std::move(pg)](const auto &code) { + logger().error("ClientRequest saw error code {}", code); + assert(code.value() > 0); + return reply_op_error(pg, -code.value()); + })); +} + +ClientRequest::interruptible_future<> +ClientRequest::do_process( + instance_handle_t &ihref, + Ref<PG>& pg, crimson::osd::ObjectContextRef obc) +{ + if (m->has_flag(CEPH_OSD_FLAG_PARALLELEXEC)) { + return reply_op_error(pg, -EINVAL); + } + const pg_pool_t pool = pg->get_pgpool().info; + if (pool.has_flag(pg_pool_t::FLAG_EIO)) { + // drop op on the floor; the client will handle returning EIO + if (m->has_flag(CEPH_OSD_FLAG_SUPPORTSPOOLEIO)) { + 
logger().debug("discarding op due to pool EIO flag"); + return seastar::now(); + } else { + logger().debug("replying EIO due to pool EIO flag"); + return reply_op_error(pg, -EIO); + } + } + if (m->get_oid().name.size() + > crimson::common::local_conf()->osd_max_object_name_len) { + return reply_op_error(pg, -ENAMETOOLONG); + } else if (m->get_hobj().get_key().size() + > crimson::common::local_conf()->osd_max_object_name_len) { + return reply_op_error(pg, -ENAMETOOLONG); + } else if (m->get_hobj().nspace.size() + > crimson::common::local_conf()->osd_max_object_namespace_len) { + return reply_op_error(pg, -ENAMETOOLONG); + } else if (m->get_hobj().oid.name.empty()) { + return reply_op_error(pg, -EINVAL); + } else if (pg->get_osdmap()->is_blocklisted(conn->get_peer_addr())) { + logger().info("{} is blocklisted", conn->get_peer_addr()); + return reply_op_error(pg, -EBLOCKLISTED); + } + + if (!obc->obs.exists && !op_info.may_write()) { + return reply_op_error(pg, -ENOENT); + } + + SnapContext snapc = get_snapc(pg,obc); + + if ((m->has_flag(CEPH_OSD_FLAG_ORDERSNAP)) && + snapc.seq < obc->ssc->snapset.seq) { + logger().debug("{} ORDERSNAP flag set and snapc seq {}", + " < snapset seq {} on {}", + __func__, snapc.seq, obc->ssc->snapset.seq, + obc->obs.oi.soid); + return reply_op_error(pg, -EOLDSNAPC); + } + + if (!pg->is_primary()) { + // primary can handle both normal ops and balanced reads + if (is_misdirected(*pg)) { + logger().trace("do_process: dropping misdirected op"); + return seastar::now(); + } else if (const hobject_t& hoid = m->get_hobj(); + !pg->get_peering_state().can_serve_replica_read(hoid)) { + logger().debug("{}: unstable write on replica, " + "bouncing to primary", + __func__); + return reply_op_error(pg, -EAGAIN); + } else { + logger().debug("{}: serving replica read on oid {}", + __func__, m->get_hobj()); + } + } + return pg->do_osd_ops(m, conn, obc, op_info, snapc).safe_then_unpack_interruptible( + [this, pg, &ihref](auto submitted, auto all_completed) mutable { + return submitted.then_interruptible([this, pg, &ihref] { + return ihref.enter_stage<interruptor>(client_pp(*pg).wait_repop, *this); + }).then_interruptible( + [this, pg, all_completed=std::move(all_completed), &ihref]() mutable { + return all_completed.safe_then_interruptible( + [this, pg, &ihref](MURef<MOSDOpReply> reply) { + return ihref.enter_stage<interruptor>(client_pp(*pg).send_reply, *this + ).then_interruptible( + [this, reply=std::move(reply)]() mutable { + logger().debug("{}: sending response", *this); + return conn->send(std::move(reply)); + }); + }, crimson::ct_error::eagain::handle([this, pg, &ihref]() mutable { + return process_op(ihref, pg); + })); + }); + }, crimson::ct_error::eagain::handle([this, pg, &ihref]() mutable { + return process_op(ihref, pg); + })); +} + +bool ClientRequest::is_misdirected(const PG& pg) const +{ + // otherwise take a closer look + if (const int flags = m->get_flags(); + flags & CEPH_OSD_FLAG_BALANCE_READS || + flags & CEPH_OSD_FLAG_LOCALIZE_READS) { + if (!op_info.may_read()) { + // no read found, so it can't be balanced read + return true; + } + if (op_info.may_write() || op_info.may_cache()) { + // write op, but i am not primary + return true; + } + // balanced reads; any replica will do + return false; + } + // neither balanced nor localize reads + return true; +} + +void ClientRequest::put_historic() const +{ + ceph_assert_always(put_historic_shard_services); + put_historic_shard_services->get_registry().put_historic(*this); +} + +const SnapContext 
ClientRequest::get_snapc( + Ref<PG>& pg, + crimson::osd::ObjectContextRef obc) const +{ + SnapContext snapc; + if (op_info.may_write() || op_info.may_cache()) { + // snap + if (pg->get_pgpool().info.is_pool_snaps_mode()) { + // use pool's snapc + snapc = pg->get_pgpool().snapc; + logger().debug("{} using pool's snapc snaps={}", + __func__, snapc.snaps); + + } else { + // client specified snapc + snapc.seq = m->get_snap_seq(); + snapc.snaps = m->get_snaps(); + logger().debug("{} client specified snapc seq={} snaps={}", + __func__, snapc.seq, snapc.snaps); + } + } + return snapc; +} + +} diff --git a/src/crimson/osd/osd_operations/client_request.h b/src/crimson/osd/osd_operations/client_request.h new file mode 100644 index 000000000..b2dce1e87 --- /dev/null +++ b/src/crimson/osd/osd_operations/client_request.h @@ -0,0 +1,281 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <seastar/core/future.hh> + +#include <boost/intrusive/list.hpp> +#include <boost/intrusive_ptr.hpp> + +#include "osd/osd_op_util.h" +#include "crimson/net/Connection.h" +#include "crimson/osd/object_context.h" +#include "crimson/osd/osdmap_gate.h" +#include "crimson/osd/osd_operation.h" +#include "crimson/osd/osd_operations/client_request_common.h" +#include "crimson/osd/osd_operations/common/pg_pipeline.h" +#include "crimson/osd/pg_activation_blocker.h" +#include "crimson/osd/pg_map.h" +#include "crimson/common/type_helpers.h" +#include "crimson/common/utility.h" +#include "messages/MOSDOp.h" + +namespace crimson::osd { +class PG; +class OSD; +class ShardServices; + +class ClientRequest final : public PhasedOperationT<ClientRequest>, + private CommonClientRequest { + // Initially set to primary core, updated to pg core after move, + // used by put_historic + ShardServices *put_historic_shard_services = nullptr; + + crimson::net::ConnectionRef conn; + // must be after conn due to ConnectionPipeline's life-time + Ref<MOSDOp> m; + OpInfo op_info; + seastar::promise<> on_complete; + unsigned instance_id = 0; + +public: + class PGPipeline : public CommonPGPipeline { + public: + struct AwaitMap : OrderedExclusivePhaseT<AwaitMap> { + static constexpr auto type_name = "ClientRequest::PGPipeline::await_map"; + } await_map; + struct WaitRepop : OrderedConcurrentPhaseT<WaitRepop> { + static constexpr auto type_name = "ClientRequest::PGPipeline::wait_repop"; + } wait_repop; + struct SendReply : OrderedExclusivePhaseT<SendReply> { + static constexpr auto type_name = "ClientRequest::PGPipeline::send_reply"; + } send_reply; + friend class ClientRequest; + friend class LttngBackend; + friend class HistoricBackend; + friend class ReqRequest; + friend class LogMissingRequest; + friend class LogMissingRequestReply; + }; + + /** + * instance_handle_t + * + * Client request is, at present, the only Operation which can be requeued. + * This is, mostly, fine. However, reusing the PipelineHandle or + * BlockingEvent structures before proving that the prior instance has stopped + * can create hangs or crashes due to violations of the BlockerT and + * PipelineHandle invariants. + * + * To solve this, we create an instance_handle_t which contains the events + * for the portion of execution that can be rerun as well as the + * PipelineHandle. ClientRequest::with_pg_int grabs a reference to the current + * instance_handle_t and releases its PipelineHandle in the finally block. 
+ * On requeue, we create a new instance_handle_t with a fresh PipelineHandle + * and events tuple and use it and use it for the next invocation of + * with_pg_int. + */ + std::tuple< + StartEvent, + ConnectionPipeline::AwaitActive::BlockingEvent, + ConnectionPipeline::AwaitMap::BlockingEvent, + OSD_OSDMapGate::OSDMapBlocker::BlockingEvent, + ConnectionPipeline::GetPG::BlockingEvent, + PGMap::PGCreationBlockingEvent, + CompletionEvent + > tracking_events; + + class instance_handle_t : public boost::intrusive_ref_counter< + instance_handle_t, boost::thread_unsafe_counter> { + public: + // intrusive_ptr because seastar::lw_shared_ptr includes a cpu debug check + // that we will fail since the core on which we allocate the request may not + // be the core on which we perform with_pg_int. This is harmless, since we + // don't leave any references on the source core, so we just bypass it by using + // intrusive_ptr instead. + using ref_t = boost::intrusive_ptr<instance_handle_t>; + PipelineHandle handle; + + std::tuple< + PGPipeline::AwaitMap::BlockingEvent, + PG_OSDMapGate::OSDMapBlocker::BlockingEvent, + PGPipeline::WaitForActive::BlockingEvent, + PGActivationBlocker::BlockingEvent, + PGPipeline::RecoverMissing::BlockingEvent, + PGPipeline::GetOBC::BlockingEvent, + PGPipeline::Process::BlockingEvent, + PGPipeline::WaitRepop::BlockingEvent, + PGPipeline::SendReply::BlockingEvent, + CompletionEvent + > pg_tracking_events; + + template <typename BlockingEventT, typename InterruptorT=void, typename F> + auto with_blocking_event(F &&f, ClientRequest &op) { + auto ret = std::forward<F>(f)( + typename BlockingEventT::template Trigger<ClientRequest>{ + std::get<BlockingEventT>(pg_tracking_events), op + }); + if constexpr (std::is_same_v<InterruptorT, void>) { + return ret; + } else { + using ret_t = decltype(ret); + return typename InterruptorT::template futurize_t<ret_t>{std::move(ret)}; + } + } + + template <typename InterruptorT=void, typename StageT> + auto enter_stage(StageT &stage, ClientRequest &op) { + return this->template with_blocking_event< + typename StageT::BlockingEvent, + InterruptorT>( + [&stage, this](auto &&trigger) { + return handle.template enter<ClientRequest>( + stage, std::move(trigger)); + }, op); + } + + template < + typename InterruptorT=void, typename BlockingObj, typename Method, + typename... Args> + auto enter_blocker( + ClientRequest &op, BlockingObj &obj, Method method, Args&&... 
args) { + return this->template with_blocking_event< + typename BlockingObj::Blocker::BlockingEvent, + InterruptorT>( + [&obj, method, + args=std::forward_as_tuple(std::move(args)...)](auto &&trigger) mutable { + return apply_method_to_tuple( + obj, method, + std::tuple_cat( + std::forward_as_tuple(std::move(trigger)), + std::move(args)) + ); + }, op); + } + }; + instance_handle_t::ref_t instance_handle; + void reset_instance_handle() { + instance_handle = new instance_handle_t; + } + auto get_instance_handle() { return instance_handle; } + + using ordering_hook_t = boost::intrusive::list_member_hook<>; + ordering_hook_t ordering_hook; + class Orderer { + using list_t = boost::intrusive::list< + ClientRequest, + boost::intrusive::member_hook< + ClientRequest, + typename ClientRequest::ordering_hook_t, + &ClientRequest::ordering_hook> + >; + list_t list; + + public: + void add_request(ClientRequest &request) { + assert(!request.ordering_hook.is_linked()); + intrusive_ptr_add_ref(&request); + list.push_back(request); + } + void remove_request(ClientRequest &request) { + assert(request.ordering_hook.is_linked()); + list.erase(list_t::s_iterator_to(request)); + intrusive_ptr_release(&request); + } + void requeue(ShardServices &shard_services, Ref<PG> pg); + void clear_and_cancel(); + }; + void complete_request(); + + static constexpr OperationTypeCode type = OperationTypeCode::client_request; + + ClientRequest( + ShardServices &shard_services, + crimson::net::ConnectionRef, Ref<MOSDOp> &&m); + ~ClientRequest(); + + void print(std::ostream &) const final; + void dump_detail(Formatter *f) const final; + + static constexpr bool can_create() { return false; } + spg_t get_pgid() const { + return m->get_spg(); + } + PipelineHandle &get_handle() { return instance_handle->handle; } + epoch_t get_epoch() const { return m->get_min_epoch(); } + + ConnectionPipeline &get_connection_pipeline(); + seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() { + assert(conn); + return conn.get_foreign( + ).then([this](auto f_conn) { + conn.reset(); + return f_conn; + }); + } + void finish_remote_submission(crimson::net::ConnectionFRef _conn) { + assert(!conn); + conn = make_local_shared_foreign(std::move(_conn)); + } + + seastar::future<> with_pg_int( + ShardServices &shard_services, Ref<PG> pg); + +public: + seastar::future<> with_pg( + ShardServices &shard_services, Ref<PG> pgref); + +private: + template <typename FuncT> + interruptible_future<> with_sequencer(FuncT&& func); + auto reply_op_error(const Ref<PG>& pg, int err); + + interruptible_future<> do_process( + instance_handle_t &ihref, + Ref<PG>& pg, + crimson::osd::ObjectContextRef obc); + ::crimson::interruptible::interruptible_future< + ::crimson::osd::IOInterruptCondition> process_pg_op( + Ref<PG> &pg); + ::crimson::interruptible::interruptible_future< + ::crimson::osd::IOInterruptCondition> process_op( + instance_handle_t &ihref, + Ref<PG> &pg); + bool is_pg_op() const; + + PGPipeline &client_pp(PG &pg); + + template <typename Errorator> + using interruptible_errorator = + ::crimson::interruptible::interruptible_errorator< + ::crimson::osd::IOInterruptCondition, + Errorator>; + + bool is_misdirected(const PG& pg) const; + + const SnapContext get_snapc( + Ref<PG>& pg, + crimson::osd::ObjectContextRef obc) const; + +public: + + friend class LttngBackend; + friend class HistoricBackend; + + auto get_started() const { + return get_event<StartEvent>().get_timestamp(); + }; + + auto get_completed() const { + return 
get_event<CompletionEvent>().get_timestamp(); + }; + + void put_historic() const; +}; + +} + +#if FMT_VERSION >= 90000 +template <> struct fmt::formatter<crimson::osd::ClientRequest> : fmt::ostream_formatter {}; +#endif diff --git a/src/crimson/osd/osd_operations/client_request_common.cc b/src/crimson/osd/osd_operations/client_request_common.cc new file mode 100644 index 000000000..cfd22c774 --- /dev/null +++ b/src/crimson/osd/osd_operations/client_request_common.cc @@ -0,0 +1,64 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab expandtab + +#include "crimson/osd/osd_operations/client_request_common.h" +#include "crimson/osd/pg.h" +#include "crimson/osd/osd_operations/background_recovery.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_osd); + } +} + +namespace crimson::osd { + +typename InterruptibleOperation::template interruptible_future<> +CommonClientRequest::do_recover_missing( + Ref<PG>& pg, const hobject_t& soid) +{ + eversion_t ver; + assert(pg->is_primary()); + logger().debug("{} check for recovery, {}", __func__, soid); + if (!pg->is_unreadable_object(soid, &ver) && + !pg->is_degraded_or_backfilling_object(soid)) { + return seastar::now(); + } + logger().debug("{} need to wait for recovery, {}", __func__, soid); + if (pg->get_recovery_backend()->is_recovering(soid)) { + return pg->get_recovery_backend()->get_recovering(soid).wait_for_recovered(); + } else { + auto [op, fut] = + pg->get_shard_services().start_operation<UrgentRecovery>( + soid, ver, pg, pg->get_shard_services(), pg->get_osdmap_epoch()); + return std::move(fut); + } +} + +bool CommonClientRequest::should_abort_request( + const Operation& op, + std::exception_ptr eptr) +{ + if (*eptr.__cxa_exception_type() == + typeid(::crimson::common::actingset_changed)) { + try { + std::rethrow_exception(eptr); + } catch(::crimson::common::actingset_changed& e) { + if (e.is_primary()) { + logger().debug("{} {} operation restart, acting set changed", __func__, op); + return false; + } else { + logger().debug("{} {} operation abort, up primary changed", __func__, op); + return true; + } + } + } else { + assert(*eptr.__cxa_exception_type() == + typeid(crimson::common::system_shutdown_exception)); + crimson::get_logger(ceph_subsys_osd).debug( + "{} {} operation skipped, system shutdown", __func__, op); + return true; + } +} + +} // namespace crimson::osd diff --git a/src/crimson/osd/osd_operations/client_request_common.h b/src/crimson/osd/osd_operations/client_request_common.h new file mode 100644 index 000000000..6a8a78966 --- /dev/null +++ b/src/crimson/osd/osd_operations/client_request_common.h @@ -0,0 +1,20 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "crimson/common/operation.h" +#include "crimson/common/type_helpers.h" +#include "crimson/osd/osd_operation.h" + +namespace crimson::osd { + +struct CommonClientRequest { + static InterruptibleOperation::template interruptible_future<> + do_recover_missing(Ref<PG>& pg, const hobject_t& soid); + + static bool should_abort_request( + const crimson::Operation& op, std::exception_ptr eptr); +}; + +} // namespace crimson::osd diff --git a/src/crimson/osd/osd_operations/common/pg_pipeline.h b/src/crimson/osd/osd_operations/common/pg_pipeline.h new file mode 100644 index 000000000..58fa07b8b --- /dev/null +++ b/src/crimson/osd/osd_operations/common/pg_pipeline.h @@ -0,0 +1,31 @@ +// -*- mode:C++; tab-width:8; 
c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "osd/osd_op_util.h" +#include "crimson/osd/osd_operation.h" + +namespace crimson::osd { + +class CommonPGPipeline { +protected: + friend class InternalClientRequest; + friend class SnapTrimEvent; + friend class SnapTrimObjSubEvent; + + struct WaitForActive : OrderedExclusivePhaseT<WaitForActive> { + static constexpr auto type_name = "CommonPGPipeline:::wait_for_active"; + } wait_for_active; + struct RecoverMissing : OrderedExclusivePhaseT<RecoverMissing> { + static constexpr auto type_name = "CommonPGPipeline::recover_missing"; + } recover_missing; + struct GetOBC : OrderedExclusivePhaseT<GetOBC> { + static constexpr auto type_name = "CommonPGPipeline::get_obc"; + } get_obc; + struct Process : OrderedExclusivePhaseT<Process> { + static constexpr auto type_name = "CommonPGPipeline::process"; + } process; +}; + +} // namespace crimson::osd diff --git a/src/crimson/osd/osd_operations/internal_client_request.cc b/src/crimson/osd/osd_operations/internal_client_request.cc new file mode 100644 index 000000000..1e9b842b2 --- /dev/null +++ b/src/crimson/osd/osd_operations/internal_client_request.cc @@ -0,0 +1,130 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab expandtab + +#include <seastar/core/future.hh> + +#include "crimson/osd/osd_operations/internal_client_request.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_osd); + } +} + +namespace crimson { + template <> + struct EventBackendRegistry<osd::InternalClientRequest> { + static std::tuple<> get_backends() { + return {}; + } + }; +} + + +namespace crimson::osd { + +InternalClientRequest::InternalClientRequest(Ref<PG> pg) + : pg(std::move(pg)) +{ + assert(bool(this->pg)); + assert(this->pg->is_primary()); +} + +InternalClientRequest::~InternalClientRequest() +{ + logger().debug("{}: destroying", *this); +} + +void InternalClientRequest::print(std::ostream &) const +{ +} + +void InternalClientRequest::dump_detail(Formatter *f) const +{ +} + +CommonPGPipeline& InternalClientRequest::client_pp() +{ + return pg->request_pg_pipeline; +} + +seastar::future<> InternalClientRequest::start() +{ + track_event<StartEvent>(); + return crimson::common::handle_system_shutdown([this] { + return seastar::repeat([this] { + logger().debug("{}: in repeat", *this); + return interruptor::with_interruption([this]() mutable { + return enter_stage<interruptor>( + client_pp().wait_for_active + ).then_interruptible([this] { + return with_blocking_event<PGActivationBlocker::BlockingEvent, + interruptor>([this] (auto&& trigger) { + return pg->wait_for_active_blocker.wait(std::move(trigger)); + }); + }).then_interruptible([this] { + return enter_stage<interruptor>( + client_pp().recover_missing); + }).then_interruptible([this] { + return do_recover_missing(pg, get_target_oid()); + }).then_interruptible([this] { + return enter_stage<interruptor>( + client_pp().get_obc); + }).then_interruptible([this] () -> PG::load_obc_iertr::future<> { + logger().debug("{}: getting obc lock", *this); + return seastar::do_with(create_osd_ops(), + [this](auto& osd_ops) mutable { + logger().debug("InternalClientRequest: got {} OSDOps to execute", + std::size(osd_ops)); + [[maybe_unused]] const int ret = op_info.set_from_op( + std::as_const(osd_ops), pg->get_pgid().pgid, *pg->get_osdmap()); + assert(ret == 0); + return pg->with_locked_obc(get_target_oid(), op_info, + [&osd_ops, this](auto obc) { + 
return enter_stage<interruptor>(client_pp().process + ).then_interruptible( + [obc=std::move(obc), &osd_ops, this] { + return pg->do_osd_ops( + std::move(obc), + osd_ops, + std::as_const(op_info), + get_do_osd_ops_params(), + [] { + return PG::do_osd_ops_iertr::now(); + }, + [] (const std::error_code& e) { + return PG::do_osd_ops_iertr::now(); + } + ).safe_then_unpack_interruptible( + [](auto submitted, auto all_completed) { + return all_completed.handle_error_interruptible( + crimson::ct_error::eagain::handle([] { + return seastar::now(); + })); + }, crimson::ct_error::eagain::handle([] { + return interruptor::now(); + }) + ); + }); + }); + }); + }).handle_error_interruptible(PG::load_obc_ertr::all_same_way([] { + return seastar::now(); + })).then_interruptible([] { + return seastar::stop_iteration::yes; + }); + }, [this](std::exception_ptr eptr) { + if (should_abort_request(*this, std::move(eptr))) { + return seastar::stop_iteration::yes; + } else { + return seastar::stop_iteration::no; + } + }, pg); + }).then([this] { + track_event<CompletionEvent>(); + }); + }); +} + +} // namespace crimson::osd + diff --git a/src/crimson/osd/osd_operations/internal_client_request.h b/src/crimson/osd/osd_operations/internal_client_request.h new file mode 100644 index 000000000..8eed12e05 --- /dev/null +++ b/src/crimson/osd/osd_operations/internal_client_request.h @@ -0,0 +1,68 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "crimson/common/type_helpers.h" +#include "crimson/osd/osd_operation.h" +#include "crimson/osd/osd_operations/client_request_common.h" +#include "crimson/osd/osd_operations/common/pg_pipeline.h" +#include "crimson/osd/pg.h" +#include "crimson/osd/pg_activation_blocker.h" + +namespace crimson::osd { + +class InternalClientRequest : public PhasedOperationT<InternalClientRequest>, + private CommonClientRequest { +public: + explicit InternalClientRequest(Ref<PG> pg); + ~InternalClientRequest(); + + // imposed by `ShardService::start_operation<T>(...)`. 
+ seastar::future<> start(); + +protected: + virtual const hobject_t& get_target_oid() const = 0; + virtual PG::do_osd_ops_params_t get_do_osd_ops_params() const = 0; + virtual std::vector<OSDOp> create_osd_ops() = 0; + + const PG& get_pg() const { + return *pg; + } + +private: + friend OperationT<InternalClientRequest>; + + static constexpr OperationTypeCode type = + OperationTypeCode::internal_client_request; + + void print(std::ostream &) const final; + void dump_detail(Formatter *f) const final; + + CommonPGPipeline& client_pp(); + + seastar::future<> do_process(); + + Ref<PG> pg; + OpInfo op_info; + PipelineHandle handle; + +public: + PipelineHandle& get_handle() { return handle; } + + std::tuple< + StartEvent, + CommonPGPipeline::WaitForActive::BlockingEvent, + PGActivationBlocker::BlockingEvent, + CommonPGPipeline::RecoverMissing::BlockingEvent, + CommonPGPipeline::GetOBC::BlockingEvent, + CommonPGPipeline::Process::BlockingEvent, + CompletionEvent + > tracking_events; +}; + +} // namespace crimson::osd + +#if FMT_VERSION >= 90000 +template <> struct fmt::formatter<crimson::osd::InternalClientRequest> : fmt::ostream_formatter {}; +#endif diff --git a/src/crimson/osd/osd_operations/logmissing_request.cc b/src/crimson/osd/osd_operations/logmissing_request.cc new file mode 100644 index 000000000..739b46406 --- /dev/null +++ b/src/crimson/osd/osd_operations/logmissing_request.cc @@ -0,0 +1,79 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "logmissing_request.h" + +#include "common/Formatter.h" + +#include "crimson/osd/osd.h" +#include "crimson/osd/osd_connection_priv.h" +#include "crimson/osd/osd_operation_external_tracking.h" +#include "crimson/osd/pg.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_osd); + } +} + +namespace crimson::osd { + +LogMissingRequest::LogMissingRequest(crimson::net::ConnectionRef&& conn, + Ref<MOSDPGUpdateLogMissing> &&req) + : conn{std::move(conn)}, + req{std::move(req)} +{} + +void LogMissingRequest::print(std::ostream& os) const +{ + os << "LogMissingRequest(" + << "from=" << req->from + << " req=" << *req + << ")"; +} + +void LogMissingRequest::dump_detail(Formatter *f) const +{ + f->open_object_section("LogMissingRequest"); + f->dump_stream("req_tid") << req->get_tid(); + f->dump_stream("pgid") << req->get_spg(); + f->dump_unsigned("map_epoch", req->get_map_epoch()); + f->dump_unsigned("min_epoch", req->get_min_epoch()); + f->dump_stream("entries") << req->entries; + f->dump_stream("from") << req->from; + f->close_section(); +} + +ConnectionPipeline &LogMissingRequest::get_connection_pipeline() +{ + return get_osd_priv(conn.get()).replicated_request_conn_pipeline; +} + +ClientRequest::PGPipeline &LogMissingRequest::client_pp(PG &pg) +{ + return pg.request_pg_pipeline; +} + +seastar::future<> LogMissingRequest::with_pg( + ShardServices &shard_services, Ref<PG> pg) +{ + logger().debug("{}: LogMissingRequest::with_pg", *this); + + IRef ref = this; + return interruptor::with_interruption([this, pg] { + logger().debug("{}: pg present", *this); + return this->template enter_stage<interruptor>(client_pp(*pg).await_map + ).then_interruptible([this, pg] { + return this->template with_blocking_event< + PG_OSDMapGate::OSDMapBlocker::BlockingEvent + >([this, pg](auto &&trigger) { + return pg->osdmap_gate.wait_for_map( + std::move(trigger), req->min_epoch); + }); + }).then_interruptible([this, pg](auto) { + return pg->do_update_log_missing(req, conn); + }); + 
}, [ref](std::exception_ptr) { return seastar::now(); }, pg); +} + +} diff --git a/src/crimson/osd/osd_operations/logmissing_request.h b/src/crimson/osd/osd_operations/logmissing_request.h new file mode 100644 index 000000000..71d0816fd --- /dev/null +++ b/src/crimson/osd/osd_operations/logmissing_request.h @@ -0,0 +1,81 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "crimson/net/Connection.h" +#include "crimson/osd/osdmap_gate.h" +#include "crimson/osd/osd_operation.h" +#include "crimson/osd/osd_operations/client_request.h" +#include "crimson/osd/pg_map.h" +#include "crimson/common/type_helpers.h" +#include "messages/MOSDPGUpdateLogMissing.h" + +namespace ceph { + class Formatter; +} + +namespace crimson::osd { + +class ShardServices; + +class OSD; +class PG; + +class LogMissingRequest final : public PhasedOperationT<LogMissingRequest> { +public: + static constexpr OperationTypeCode type = OperationTypeCode::logmissing_request; + LogMissingRequest(crimson::net::ConnectionRef&&, Ref<MOSDPGUpdateLogMissing>&&); + + void print(std::ostream &) const final; + void dump_detail(ceph::Formatter* f) const final; + + static constexpr bool can_create() { return false; } + spg_t get_pgid() const { + return req->get_spg(); + } + PipelineHandle &get_handle() { return handle; } + epoch_t get_epoch() const { return req->get_min_epoch(); } + + ConnectionPipeline &get_connection_pipeline(); + seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() { + assert(conn); + return conn.get_foreign( + ).then([this](auto f_conn) { + conn.reset(); + return f_conn; + }); + } + void finish_remote_submission(crimson::net::ConnectionFRef _conn) { + assert(!conn); + conn = make_local_shared_foreign(std::move(_conn)); + } + + seastar::future<> with_pg( + ShardServices &shard_services, Ref<PG> pg); + + std::tuple< + StartEvent, + ConnectionPipeline::AwaitActive::BlockingEvent, + ConnectionPipeline::AwaitMap::BlockingEvent, + ConnectionPipeline::GetPG::BlockingEvent, + ClientRequest::PGPipeline::AwaitMap::BlockingEvent, + PG_OSDMapGate::OSDMapBlocker::BlockingEvent, + PGMap::PGCreationBlockingEvent, + OSD_OSDMapGate::OSDMapBlocker::BlockingEvent + > tracking_events; + +private: + ClientRequest::PGPipeline &client_pp(PG &pg); + + crimson::net::ConnectionRef conn; + // must be after `conn` to ensure the ConnectionPipeline's is alive + PipelineHandle handle; + Ref<MOSDPGUpdateLogMissing> req; +}; + +} + +#if FMT_VERSION >= 90000 +template <> struct fmt::formatter<crimson::osd::LogMissingRequest> : fmt::ostream_formatter {}; +#endif diff --git a/src/crimson/osd/osd_operations/logmissing_request_reply.cc b/src/crimson/osd/osd_operations/logmissing_request_reply.cc new file mode 100644 index 000000000..b4bf2938e --- /dev/null +++ b/src/crimson/osd/osd_operations/logmissing_request_reply.cc @@ -0,0 +1,68 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "logmissing_request_reply.h" + +#include "common/Formatter.h" + +#include "crimson/osd/osd.h" +#include "crimson/osd/osd_connection_priv.h" +#include "crimson/osd/osd_operation_external_tracking.h" +#include "crimson/osd/pg.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_osd); + } +} + +namespace crimson::osd { + +LogMissingRequestReply::LogMissingRequestReply( + crimson::net::ConnectionRef&& conn, + Ref<MOSDPGUpdateLogMissingReply> &&req) + : conn{std::move(conn)}, + 
req{std::move(req)} +{} + +void LogMissingRequestReply::print(std::ostream& os) const +{ + os << "LogMissingRequestReply(" + << "from=" << req->from + << " req=" << *req + << ")"; +} + +void LogMissingRequestReply::dump_detail(Formatter *f) const +{ + f->open_object_section("LogMissingRequestReply"); + f->dump_stream("rep_tid") << req->get_tid(); + f->dump_stream("pgid") << req->get_spg(); + f->dump_unsigned("map_epoch", req->get_map_epoch()); + f->dump_unsigned("min_epoch", req->get_min_epoch()); + f->dump_stream("from") << req->from; + f->close_section(); +} + +ConnectionPipeline &LogMissingRequestReply::get_connection_pipeline() +{ + return get_osd_priv(conn.get()).replicated_request_conn_pipeline; +} + +ClientRequest::PGPipeline &LogMissingRequestReply::client_pp(PG &pg) +{ + return pg.request_pg_pipeline; +} + +seastar::future<> LogMissingRequestReply::with_pg( + ShardServices &shard_services, Ref<PG> pg) +{ + logger().debug("{}: LogMissingRequestReply::with_pg", *this); + + IRef ref = this; + return interruptor::with_interruption([this, pg] { + return pg->do_update_log_missing_reply(std::move(req)); + }, [ref](std::exception_ptr) { return seastar::now(); }, pg); +} + +} diff --git a/src/crimson/osd/osd_operations/logmissing_request_reply.h b/src/crimson/osd/osd_operations/logmissing_request_reply.h new file mode 100644 index 000000000..c89131fec --- /dev/null +++ b/src/crimson/osd/osd_operations/logmissing_request_reply.h @@ -0,0 +1,79 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "crimson/net/Connection.h" +#include "crimson/osd/osdmap_gate.h" +#include "crimson/osd/osd_operation.h" +#include "crimson/osd/osd_operations/client_request.h" +#include "crimson/osd/pg_map.h" +#include "crimson/common/type_helpers.h" +#include "messages/MOSDPGUpdateLogMissingReply.h" + +namespace ceph { + class Formatter; +} + +namespace crimson::osd { + +class ShardServices; + +class OSD; +class PG; + +class LogMissingRequestReply final : public PhasedOperationT<LogMissingRequestReply> { +public: + static constexpr OperationTypeCode type = OperationTypeCode::logmissing_request_reply; + LogMissingRequestReply(crimson::net::ConnectionRef&&, Ref<MOSDPGUpdateLogMissingReply>&&); + + void print(std::ostream &) const final; + void dump_detail(ceph::Formatter* f) const final; + + static constexpr bool can_create() { return false; } + spg_t get_pgid() const { + return req->get_spg(); + } + PipelineHandle &get_handle() { return handle; } + epoch_t get_epoch() const { return req->get_min_epoch(); } + + ConnectionPipeline &get_connection_pipeline(); + seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() { + assert(conn); + return conn.get_foreign( + ).then([this](auto f_conn) { + conn.reset(); + return f_conn; + }); + } + void finish_remote_submission(crimson::net::ConnectionFRef _conn) { + assert(!conn); + conn = make_local_shared_foreign(std::move(_conn)); + } + + seastar::future<> with_pg( + ShardServices &shard_services, Ref<PG> pg); + + std::tuple< + StartEvent, + ConnectionPipeline::AwaitActive::BlockingEvent, + ConnectionPipeline::AwaitMap::BlockingEvent, + ConnectionPipeline::GetPG::BlockingEvent, + PGMap::PGCreationBlockingEvent, + OSD_OSDMapGate::OSDMapBlocker::BlockingEvent + > tracking_events; + +private: + ClientRequest::PGPipeline &client_pp(PG &pg); + + crimson::net::ConnectionRef conn; + // must be after `conn` to ensure the ConnectionPipeline's is alive + PipelineHandle handle; + 
Ref<MOSDPGUpdateLogMissingReply> req; +}; + +} + +#if FMT_VERSION >= 90000 +template <> struct fmt::formatter<crimson::osd::LogMissingRequestReply> : fmt::ostream_formatter {}; +#endif diff --git a/src/crimson/osd/osd_operations/osdop_params.h b/src/crimson/osd/osd_operations/osdop_params.h new file mode 100644 index 000000000..c7b81e765 --- /dev/null +++ b/src/crimson/osd/osd_operations/osdop_params.h @@ -0,0 +1,22 @@ +#pragma once + +#include "messages/MOSDOp.h" +#include "osd/osd_types.h" +#include "crimson/common/type_helpers.h" + +// The fields in this struct are parameters that may be needed in multiple +// level of processing. I inclosed all those parameters in this struct to +// avoid passing each of them as a method parameter. +struct osd_op_params_t { + osd_reqid_t req_id; + utime_t mtime; + eversion_t at_version; + eversion_t pg_trim_to; + eversion_t min_last_complete_ondisk; + eversion_t last_complete; + version_t user_at_version = 0; + bool user_modify = false; + ObjectCleanRegions clean_regions; + + osd_op_params_t() = default; +}; diff --git a/src/crimson/osd/osd_operations/peering_event.cc b/src/crimson/osd/osd_operations/peering_event.cc new file mode 100644 index 000000000..ea4662bd0 --- /dev/null +++ b/src/crimson/osd/osd_operations/peering_event.cc @@ -0,0 +1,190 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <seastar/core/future.hh> +#include <seastar/core/sleep.hh> + +#include "messages/MOSDPGLog.h" + +#include "common/Formatter.h" +#include "crimson/osd/pg.h" +#include "crimson/osd/osd.h" +#include "crimson/osd/osd_operation_external_tracking.h" +#include "crimson/osd/osd_operations/peering_event.h" +#include "crimson/osd/osd_connection_priv.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_osd); + } +} + +namespace crimson::osd { + +template <class T> +void PeeringEvent<T>::print(std::ostream &lhs) const +{ + lhs << "PeeringEvent(" + << "from=" << from + << " pgid=" << pgid + << " sent=" << evt.get_epoch_sent() + << " requested=" << evt.get_epoch_requested() + << " evt=" << evt.get_desc() + << ")"; +} + +template <class T> +void PeeringEvent<T>::dump_detail(Formatter *f) const +{ + f->open_object_section("PeeringEvent"); + f->dump_stream("from") << from; + f->dump_stream("pgid") << pgid; + f->dump_int("sent", evt.get_epoch_sent()); + f->dump_int("requested", evt.get_epoch_requested()); + f->dump_string("evt", evt.get_desc()); + f->open_array_section("events"); + { + std::apply([f](auto&... 
events) { + (..., events.dump(f)); + }, static_cast<const T*>(this)->tracking_events); + } + f->close_section(); + f->close_section(); +} + + +template <class T> +PGPeeringPipeline &PeeringEvent<T>::peering_pp(PG &pg) +{ + return pg.peering_request_pg_pipeline; +} + +template <class T> +seastar::future<> PeeringEvent<T>::with_pg( + ShardServices &shard_services, Ref<PG> pg) +{ + if (!pg) { + logger().warn("{}: pg absent, did not create", *this); + on_pg_absent(shard_services); + that()->get_handle().exit(); + return complete_rctx_no_pg(shard_services); + } + + using interruptor = typename T::interruptor; + return interruptor::with_interruption([this, pg, &shard_services] { + logger().debug("{}: pg present", *this); + return this->template enter_stage<interruptor>(peering_pp(*pg).await_map + ).then_interruptible([this, pg] { + return this->template with_blocking_event< + PG_OSDMapGate::OSDMapBlocker::BlockingEvent + >([this, pg](auto &&trigger) { + return pg->osdmap_gate.wait_for_map( + std::move(trigger), evt.get_epoch_sent()); + }); + }).then_interruptible([this, pg](auto) { + return this->template enter_stage<interruptor>(peering_pp(*pg).process); + }).then_interruptible([this, pg, &shard_services] { + return pg->do_peering_event(evt, ctx + ).then_interruptible([this, pg, &shard_services] { + that()->get_handle().exit(); + return complete_rctx(shard_services, pg); + }); + }).then_interruptible([pg, &shard_services]() + -> typename T::template interruptible_future<> { + if (!pg->get_need_up_thru()) { + return seastar::now(); + } + return shard_services.send_alive(pg->get_same_interval_since()); + }).then_interruptible([&shard_services] { + return shard_services.send_pg_temp(); + }); + }, [this](std::exception_ptr ep) { + logger().debug("{}: interrupted with {}", *this, ep); + }, pg); +} + +template <class T> +void PeeringEvent<T>::on_pg_absent(ShardServices &) +{ + logger().debug("{}: pg absent, dropping", *this); +} + +template <class T> +typename PeeringEvent<T>::template interruptible_future<> +PeeringEvent<T>::complete_rctx(ShardServices &shard_services, Ref<PG> pg) +{ + logger().debug("{}: submitting ctx", *this); + return shard_services.dispatch_context( + pg->get_collection_ref(), + std::move(ctx)); +} + +ConnectionPipeline &RemotePeeringEvent::get_connection_pipeline() +{ + return get_osd_priv(conn.get()).peering_request_conn_pipeline; +} + +void RemotePeeringEvent::on_pg_absent(ShardServices &shard_services) +{ + if (auto& e = get_event().get_event(); + e.dynamic_type() == MQuery::static_type()) { + const auto map_epoch = + shard_services.get_map()->get_epoch(); + const auto& q = static_cast<const MQuery&>(e); + const pg_info_t empty{spg_t{pgid.pgid, q.query.to}}; + if (q.query.type == q.query.LOG || + q.query.type == q.query.FULLLOG) { + auto m = crimson::make_message<MOSDPGLog>(q.query.from, q.query.to, + map_epoch, empty, + q.query.epoch_sent); + ctx.send_osd_message(q.from.osd, std::move(m)); + } else { + ctx.send_notify(q.from.osd, {q.query.from, q.query.to, + q.query.epoch_sent, + map_epoch, empty, + PastIntervals{}}); + } + } +} + +RemotePeeringEvent::interruptible_future<> RemotePeeringEvent::complete_rctx( + ShardServices &shard_services, + Ref<PG> pg) +{ + if (pg) { + return PeeringEvent::complete_rctx(shard_services, pg); + } else { + return shard_services.dispatch_context_messages(std::move(ctx)); + } +} + +seastar::future<> RemotePeeringEvent::complete_rctx_no_pg( + ShardServices &shard_services) +{ + return shard_services.dispatch_context_messages(std::move(ctx)); 
+} + +seastar::future<> LocalPeeringEvent::start() +{ + logger().debug("{}: start", *this); + + IRef ref = this; + auto maybe_delay = seastar::now(); + if (delay) { + maybe_delay = seastar::sleep( + std::chrono::milliseconds(std::lround(delay * 1000))); + } + return maybe_delay.then([this] { + return with_pg(pg->get_shard_services(), pg); + }).finally([ref=std::move(ref)] { + logger().debug("{}: complete", *ref); + }); +} + + +LocalPeeringEvent::~LocalPeeringEvent() {} + +template class PeeringEvent<RemotePeeringEvent>; +template class PeeringEvent<LocalPeeringEvent>; + +} diff --git a/src/crimson/osd/osd_operations/peering_event.h b/src/crimson/osd/osd_operations/peering_event.h new file mode 100644 index 000000000..e94caead1 --- /dev/null +++ b/src/crimson/osd/osd_operations/peering_event.h @@ -0,0 +1,207 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <iostream> +#include <seastar/core/future.hh> + +#include "crimson/osd/osdmap_gate.h" +#include "crimson/osd/osd_operation.h" +#include "osd/osd_types.h" +#include "osd/PGPeeringEvent.h" +#include "osd/PeeringState.h" + +namespace ceph { + class Formatter; +} + +namespace crimson::osd { + +class OSD; +class ShardServices; +class PG; +class BackfillRecovery; + + class PGPeeringPipeline { + struct AwaitMap : OrderedExclusivePhaseT<AwaitMap> { + static constexpr auto type_name = "PeeringEvent::PGPipeline::await_map"; + } await_map; + struct Process : OrderedExclusivePhaseT<Process> { + static constexpr auto type_name = "PeeringEvent::PGPipeline::process"; + } process; + template <class T> + friend class PeeringEvent; + friend class LocalPeeringEvent; + friend class RemotePeeringEvent; + friend class PGAdvanceMap; + friend class BackfillRecovery; + }; + +template <class T> +class PeeringEvent : public PhasedOperationT<T> { + T* that() { + return static_cast<T*>(this); + } + const T* that() const { + return static_cast<const T*>(this); + } + +public: + static constexpr OperationTypeCode type = OperationTypeCode::peering_event; + +protected: + PGPeeringPipeline &peering_pp(PG &pg); + + PeeringCtx ctx; + pg_shard_t from; + spg_t pgid; + float delay = 0; + PGPeeringEvent evt; + + const pg_shard_t get_from() const { + return from; + } + + const spg_t get_pgid() const { + return pgid; + } + + const PGPeeringEvent &get_event() const { + return evt; + } + + virtual void on_pg_absent(ShardServices &); + + virtual typename PeeringEvent::template interruptible_future<> + complete_rctx(ShardServices &, Ref<PG>); + + virtual seastar::future<> complete_rctx_no_pg( + ShardServices &shard_services + ) { return seastar::now();} + +public: + template <typename... Args> + PeeringEvent( + const pg_shard_t &from, const spg_t &pgid, + Args&&... args) : + from(from), + pgid(pgid), + evt(std::forward<Args>(args)...) + {} + template <typename... Args> + PeeringEvent( + const pg_shard_t &from, const spg_t &pgid, + float delay, Args&&... args) : + from(from), + pgid(pgid), + delay(delay), + evt(std::forward<Args>(args)...) 
+ {} + + void print(std::ostream &) const final; + void dump_detail(ceph::Formatter* f) const final; + seastar::future<> with_pg( + ShardServices &shard_services, Ref<PG> pg); +}; + +class RemotePeeringEvent : public PeeringEvent<RemotePeeringEvent> { +protected: + crimson::net::ConnectionRef conn; + // must be after conn due to ConnectionPipeline's life-time + PipelineHandle handle; + + void on_pg_absent(ShardServices &) final; + PeeringEvent::interruptible_future<> complete_rctx( + ShardServices &shard_services, + Ref<PG> pg) override; + seastar::future<> complete_rctx_no_pg( + ShardServices &shard_services + ) override; + +public: + class OSDPipeline { + struct AwaitActive : OrderedExclusivePhaseT<AwaitActive> { + static constexpr auto type_name = + "PeeringRequest::OSDPipeline::await_active"; + } await_active; + friend class RemotePeeringEvent; + }; + + template <typename... Args> + RemotePeeringEvent(crimson::net::ConnectionRef conn, Args&&... args) : + PeeringEvent(std::forward<Args>(args)...), + conn(conn) + {} + + std::tuple< + StartEvent, + ConnectionPipeline::AwaitActive::BlockingEvent, + ConnectionPipeline::AwaitMap::BlockingEvent, + OSD_OSDMapGate::OSDMapBlocker::BlockingEvent, + ConnectionPipeline::GetPG::BlockingEvent, + PGMap::PGCreationBlockingEvent, + PGPeeringPipeline::AwaitMap::BlockingEvent, + PG_OSDMapGate::OSDMapBlocker::BlockingEvent, + PGPeeringPipeline::Process::BlockingEvent, + OSDPipeline::AwaitActive::BlockingEvent, + CompletionEvent + > tracking_events; + + static constexpr bool can_create() { return true; } + auto get_create_info() { return std::move(evt.create_info); } + spg_t get_pgid() const { + return pgid; + } + PipelineHandle &get_handle() { return handle; } + epoch_t get_epoch() const { return evt.get_epoch_sent(); } + + ConnectionPipeline &get_connection_pipeline(); + seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() { + assert(conn); + return conn.get_foreign( + ).then([this](auto f_conn) { + conn.reset(); + return f_conn; + }); + } + void finish_remote_submission(crimson::net::ConnectionFRef _conn) { + assert(!conn); + conn = make_local_shared_foreign(std::move(_conn)); + } +}; + +class LocalPeeringEvent final : public PeeringEvent<LocalPeeringEvent> { +protected: + Ref<PG> pg; + PipelineHandle handle; + +public: + template <typename... Args> + LocalPeeringEvent(Ref<PG> pg, Args&&... 
args) : + PeeringEvent(std::forward<Args>(args)...), + pg(pg) + {} + + seastar::future<> start(); + virtual ~LocalPeeringEvent(); + + PipelineHandle &get_handle() { return handle; } + + std::tuple< + StartEvent, + PGPeeringPipeline::AwaitMap::BlockingEvent, + PG_OSDMapGate::OSDMapBlocker::BlockingEvent, + PGPeeringPipeline::Process::BlockingEvent, + CompletionEvent + > tracking_events; +}; + + +} + +#if FMT_VERSION >= 90000 +template <> struct fmt::formatter<crimson::osd::LocalPeeringEvent> : fmt::ostream_formatter {}; +template <> struct fmt::formatter<crimson::osd::RemotePeeringEvent> : fmt::ostream_formatter {}; +template <class T> struct fmt::formatter<crimson::osd::PeeringEvent<T>> : fmt::ostream_formatter {}; +#endif diff --git a/src/crimson/osd/osd_operations/pg_advance_map.cc b/src/crimson/osd/osd_operations/pg_advance_map.cc new file mode 100644 index 000000000..3706af810 --- /dev/null +++ b/src/crimson/osd/osd_operations/pg_advance_map.cc @@ -0,0 +1,130 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <seastar/core/future.hh> + +#include "include/types.h" +#include "common/Formatter.h" +#include "crimson/osd/pg.h" +#include "crimson/osd/osdmap_service.h" +#include "crimson/osd/shard_services.h" +#include "crimson/osd/osd_operations/pg_advance_map.h" +#include "crimson/osd/osd_operation_external_tracking.h" +#include "osd/PeeringState.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_osd); + } +} + +namespace crimson::osd { + +PGAdvanceMap::PGAdvanceMap( + ShardServices &shard_services, Ref<PG> pg, epoch_t to, + PeeringCtx &&rctx, bool do_init) + : shard_services(shard_services), pg(pg), to(to), + rctx(std::move(rctx)), do_init(do_init) +{ + logger().debug("{}: created", *this); +} + +PGAdvanceMap::~PGAdvanceMap() {} + +void PGAdvanceMap::print(std::ostream &lhs) const +{ + lhs << "PGAdvanceMap(" + << "pg=" << pg->get_pgid() + << " from=" << (from ? *from : -1) + << " to=" << to; + if (do_init) { + lhs << " do_init"; + } + lhs << ")"; +} + +void PGAdvanceMap::dump_detail(Formatter *f) const +{ + f->open_object_section("PGAdvanceMap"); + f->dump_stream("pgid") << pg->get_pgid(); + if (from) { + f->dump_int("from", *from); + } + f->dump_int("to", to); + f->dump_bool("do_init", do_init); + f->close_section(); +} + +PGPeeringPipeline &PGAdvanceMap::peering_pp(PG &pg) +{ + return pg.peering_request_pg_pipeline; +} + +seastar::future<> PGAdvanceMap::start() +{ + using cached_map_t = OSDMapService::cached_map_t; + + logger().debug("{}: start", *this); + + IRef ref = this; + return enter_stage<>( + peering_pp(*pg).process + ).then([this] { + /* + * PGAdvanceMap is scheduled at pg creation and when + * broadcasting new osdmaps to pgs. We are not able to serialize + * between the two different PGAdvanceMap callers since a new pg + * will get advanced to the latest osdmap at it's creation. + * As a result, we may need to adjust the PGAdvance operation + * 'from' epoch. 
+ * See: https://tracker.ceph.com/issues/61744 + */ + from = pg->get_osdmap_epoch(); + auto fut = seastar::now(); + if (do_init) { + fut = pg->handle_initialize(rctx + ).then([this] { + return pg->handle_activate_map(rctx); + }); + } + return fut.then([this] { + ceph_assert(std::cmp_less_equal(*from, to)); + return seastar::do_for_each( + boost::make_counting_iterator(*from + 1), + boost::make_counting_iterator(to + 1), + [this](epoch_t next_epoch) { + logger().debug("{}: start: getting map {}", + *this, next_epoch); + return shard_services.get_map(next_epoch).then( + [this] (cached_map_t&& next_map) { + logger().debug("{}: advancing map to {}", + *this, next_map->get_epoch()); + return pg->handle_advance_map(next_map, rctx); + }); + }).then([this] { + return pg->handle_activate_map(rctx).then([this] { + logger().debug("{}: map activated", *this); + if (do_init) { + shard_services.pg_created(pg->get_pgid(), pg); + logger().info("PGAdvanceMap::start new pg {}", *pg); + } + return seastar::when_all_succeed( + pg->get_need_up_thru() + ? shard_services.send_alive( + pg->get_same_interval_since()) + : seastar::now(), + shard_services.dispatch_context( + pg->get_collection_ref(), + std::move(rctx))); + }); + }).then_unpack([this] { + logger().debug("{}: sending pg temp", *this); + return shard_services.send_pg_temp(); + }); + }); + }).then([this, ref=std::move(ref)] { + logger().debug("{}: complete", *this); + }); +} + +} diff --git a/src/crimson/osd/osd_operations/pg_advance_map.h b/src/crimson/osd/osd_operations/pg_advance_map.h new file mode 100644 index 000000000..b712cc12e --- /dev/null +++ b/src/crimson/osd/osd_operations/pg_advance_map.h @@ -0,0 +1,61 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <iostream> +#include <seastar/core/future.hh> + +#include "crimson/osd/osd_operation.h" +#include "crimson/osd/osd_operations/peering_event.h" +#include "osd/osd_types.h" +#include "crimson/common/type_helpers.h" + +namespace ceph { + class Formatter; +} + +namespace crimson::osd { + +class ShardServices; +class PG; + +class PGAdvanceMap : public PhasedOperationT<PGAdvanceMap> { +public: + static constexpr OperationTypeCode type = OperationTypeCode::pg_advance_map; + +protected: + ShardServices &shard_services; + Ref<PG> pg; + PipelineHandle handle; + + std::optional<epoch_t> from; + epoch_t to; + + PeeringCtx rctx; + const bool do_init; + +public: + PGAdvanceMap( + ShardServices &shard_services, Ref<PG> pg, epoch_t to, + PeeringCtx &&rctx, bool do_init); + ~PGAdvanceMap(); + + void print(std::ostream &) const final; + void dump_detail(ceph::Formatter *f) const final; + seastar::future<> start(); + PipelineHandle &get_handle() { return handle; } + + std::tuple< + PGPeeringPipeline::Process::BlockingEvent + > tracking_events; + +private: + PGPeeringPipeline &peering_pp(PG &pg); +}; + +} + +#if FMT_VERSION >= 90000 +template <> struct fmt::formatter<crimson::osd::PGAdvanceMap> : fmt::ostream_formatter {}; +#endif diff --git a/src/crimson/osd/osd_operations/recovery_subrequest.cc b/src/crimson/osd/osd_operations/recovery_subrequest.cc new file mode 100644 index 000000000..68655b8da --- /dev/null +++ b/src/crimson/osd/osd_operations/recovery_subrequest.cc @@ -0,0 +1,46 @@ +#include <fmt/format.h> +#include <fmt/ostream.h> + +#include "crimson/osd/osd_operations/recovery_subrequest.h" +#include "crimson/osd/pg.h" +#include "crimson/osd/osd_connection_priv.h" + +namespace { + seastar::logger& logger() { + return 
crimson::get_logger(ceph_subsys_osd); + } +} + +namespace crimson { + template <> + struct EventBackendRegistry<osd::RecoverySubRequest> { + static std::tuple<> get_backends() { + return {}; + } + }; +} + +namespace crimson::osd { + +seastar::future<> RecoverySubRequest::with_pg( + ShardServices &shard_services, Ref<PG> pgref) +{ + logger().debug("{}: {}", "RecoverySubRequest::with_pg", *this); + + track_event<StartEvent>(); + IRef opref = this; + return interruptor::with_interruption([this, pgref] { + return pgref->get_recovery_backend()->handle_recovery_op(m, conn); + }, [](std::exception_ptr) { + return seastar::now(); + }, pgref).finally([this, opref, pgref] { + track_event<CompletionEvent>(); + }); +} + +ConnectionPipeline &RecoverySubRequest::get_connection_pipeline() +{ + return get_osd_priv(conn.get()).peering_request_conn_pipeline; +} + +} diff --git a/src/crimson/osd/osd_operations/recovery_subrequest.h b/src/crimson/osd/osd_operations/recovery_subrequest.h new file mode 100644 index 000000000..07c7c95b5 --- /dev/null +++ b/src/crimson/osd/osd_operations/recovery_subrequest.h @@ -0,0 +1,81 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "osd/osd_op_util.h" +#include "crimson/net/Connection.h" +#include "crimson/osd/osd_operation.h" +#include "crimson/osd/pg.h" +#include "crimson/common/type_helpers.h" +#include "messages/MOSDFastDispatchOp.h" + +namespace crimson::osd { + +class PG; + +class RecoverySubRequest final : public PhasedOperationT<RecoverySubRequest> { +public: + static constexpr OperationTypeCode type = + OperationTypeCode::background_recovery_sub; + + RecoverySubRequest( + crimson::net::ConnectionRef conn, + Ref<MOSDFastDispatchOp>&& m) + : conn(conn), m(m) {} + + void print(std::ostream& out) const final + { + out << *m; + } + + void dump_detail(Formatter *f) const final + { + } + + static constexpr bool can_create() { return false; } + spg_t get_pgid() const { + return m->get_spg(); + } + PipelineHandle &get_handle() { return handle; } + epoch_t get_epoch() const { return m->get_min_epoch(); } + + ConnectionPipeline &get_connection_pipeline(); + seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() { + assert(conn); + return conn.get_foreign( + ).then([this](auto f_conn) { + conn.reset(); + return f_conn; + }); + } + void finish_remote_submission(crimson::net::ConnectionFRef _conn) { + assert(!conn); + conn = make_local_shared_foreign(std::move(_conn)); + } + + seastar::future<> with_pg( + ShardServices &shard_services, Ref<PG> pg); + + std::tuple< + StartEvent, + ConnectionPipeline::AwaitActive::BlockingEvent, + ConnectionPipeline::AwaitMap::BlockingEvent, + ConnectionPipeline::GetPG::BlockingEvent, + PGMap::PGCreationBlockingEvent, + OSD_OSDMapGate::OSDMapBlocker::BlockingEvent, + CompletionEvent + > tracking_events; + +private: + crimson::net::ConnectionRef conn; + // must be after `conn` to ensure the ConnectionPipeline's is alive + PipelineHandle handle; + Ref<MOSDFastDispatchOp> m; +}; + +} + +#if FMT_VERSION >= 90000 +template <> struct fmt::formatter<crimson::osd::RecoverySubRequest> : fmt::ostream_formatter {}; +#endif diff --git a/src/crimson/osd/osd_operations/replicated_request.cc b/src/crimson/osd/osd_operations/replicated_request.cc new file mode 100644 index 000000000..09217575c --- /dev/null +++ b/src/crimson/osd/osd_operations/replicated_request.cc @@ -0,0 +1,80 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: 
ts=8 sw=2 smarttab + +#include "replicated_request.h" + +#include "common/Formatter.h" + +#include "crimson/osd/osd.h" +#include "crimson/osd/osd_connection_priv.h" +#include "crimson/osd/osd_operation_external_tracking.h" +#include "crimson/osd/pg.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_osd); + } +} + +namespace crimson::osd { + +RepRequest::RepRequest(crimson::net::ConnectionRef&& conn, + Ref<MOSDRepOp> &&req) + : conn{std::move(conn)}, + req{std::move(req)} +{} + +void RepRequest::print(std::ostream& os) const +{ + os << "RepRequest(" + << "from=" << req->from + << " req=" << *req + << ")"; +} + +void RepRequest::dump_detail(Formatter *f) const +{ + f->open_object_section("RepRequest"); + f->dump_stream("reqid") << req->reqid; + f->dump_stream("pgid") << req->get_spg(); + f->dump_unsigned("map_epoch", req->get_map_epoch()); + f->dump_unsigned("min_epoch", req->get_min_epoch()); + f->dump_stream("oid") << req->poid; + f->dump_stream("from") << req->from; + f->close_section(); +} + +ConnectionPipeline &RepRequest::get_connection_pipeline() +{ + return get_osd_priv(conn.get()).replicated_request_conn_pipeline; +} + +ClientRequest::PGPipeline &RepRequest::client_pp(PG &pg) +{ + return pg.request_pg_pipeline; +} + +seastar::future<> RepRequest::with_pg( + ShardServices &shard_services, Ref<PG> pg) +{ + logger().debug("{}: RepRequest::with_pg", *this); + IRef ref = this; + return interruptor::with_interruption([this, pg] { + logger().debug("{}: pg present", *this); + return this->template enter_stage<interruptor>(client_pp(*pg).await_map + ).then_interruptible([this, pg] { + return this->template with_blocking_event< + PG_OSDMapGate::OSDMapBlocker::BlockingEvent + >([this, pg](auto &&trigger) { + return pg->osdmap_gate.wait_for_map( + std::move(trigger), req->min_epoch); + }); + }).then_interruptible([this, pg] (auto) { + return pg->handle_rep_op(req); + }); + }, [ref](std::exception_ptr) { + return seastar::now(); + }, pg); +} + +} diff --git a/src/crimson/osd/osd_operations/replicated_request.h b/src/crimson/osd/osd_operations/replicated_request.h new file mode 100644 index 000000000..c742888d9 --- /dev/null +++ b/src/crimson/osd/osd_operations/replicated_request.h @@ -0,0 +1,80 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "crimson/net/Connection.h" +#include "crimson/osd/osdmap_gate.h" +#include "crimson/osd/osd_operation.h" +#include "crimson/osd/pg_map.h" +#include "crimson/osd/osd_operations/client_request.h" +#include "crimson/common/type_helpers.h" +#include "messages/MOSDRepOp.h" + +namespace ceph { + class Formatter; +} + +namespace crimson::osd { + +class ShardServices; + +class OSD; +class PG; + +class RepRequest final : public PhasedOperationT<RepRequest> { +public: + static constexpr OperationTypeCode type = OperationTypeCode::replicated_request; + RepRequest(crimson::net::ConnectionRef&&, Ref<MOSDRepOp>&&); + + void print(std::ostream &) const final; + void dump_detail(ceph::Formatter* f) const final; + + static constexpr bool can_create() { return false; } + spg_t get_pgid() const { + return req->get_spg(); + } + PipelineHandle &get_handle() { return handle; } + epoch_t get_epoch() const { return req->get_min_epoch(); } + + ConnectionPipeline &get_connection_pipeline(); + seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() { + assert(conn); + return conn.get_foreign( + ).then([this](auto f_conn) { + conn.reset(); + return 
f_conn; + }); + } + void finish_remote_submission(crimson::net::ConnectionFRef _conn) { + assert(!conn); + conn = make_local_shared_foreign(std::move(_conn)); + } + + seastar::future<> with_pg( + ShardServices &shard_services, Ref<PG> pg); + + std::tuple< + StartEvent, + ConnectionPipeline::AwaitActive::BlockingEvent, + ConnectionPipeline::AwaitMap::BlockingEvent, + ConnectionPipeline::GetPG::BlockingEvent, + ClientRequest::PGPipeline::AwaitMap::BlockingEvent, + PG_OSDMapGate::OSDMapBlocker::BlockingEvent, + PGMap::PGCreationBlockingEvent, + OSD_OSDMapGate::OSDMapBlocker::BlockingEvent + > tracking_events; + +private: + ClientRequest::PGPipeline &client_pp(PG &pg); + + crimson::net::ConnectionRef conn; + PipelineHandle handle; + Ref<MOSDRepOp> req; +}; + +} + +#if FMT_VERSION >= 90000 +template <> struct fmt::formatter<crimson::osd::RepRequest> : fmt::ostream_formatter {}; +#endif diff --git a/src/crimson/osd/osd_operations/snaptrim_event.cc b/src/crimson/osd/osd_operations/snaptrim_event.cc new file mode 100644 index 000000000..e4a1b04df --- /dev/null +++ b/src/crimson/osd/osd_operations/snaptrim_event.cc @@ -0,0 +1,569 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "crimson/osd/osd_operations/snaptrim_event.h" +#include "crimson/osd/ops_executer.h" +#include "crimson/osd/pg.h" +#include <seastar/core/sleep.hh> + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_osd); + } +} + +namespace crimson { + template <> + struct EventBackendRegistry<osd::SnapTrimEvent> { + static std::tuple<> get_backends() { + return {}; + } + }; + + template <> + struct EventBackendRegistry<osd::SnapTrimObjSubEvent> { + static std::tuple<> get_backends() { + return {}; + } + }; +} + +namespace crimson::osd { + +PG::interruptible_future<> +PG::SnapTrimMutex::lock(SnapTrimEvent &st_event) noexcept +{ + return st_event.enter_stage<interruptor>(wait_pg + ).then_interruptible([this] { + return mutex.lock(); + }); +} + +void SnapTrimEvent::SubOpBlocker::dump_detail(Formatter *f) const +{ + f->open_array_section("dependent_operations"); + { + for (const auto &kv : subops) { + f->dump_unsigned("op_id", kv.first); + } + } + f->close_section(); +} + +template <class... Args> +void SnapTrimEvent::SubOpBlocker::emplace_back(Args&&... 
args) +{ + subops.emplace_back(std::forward<Args>(args)...); +}; + +SnapTrimEvent::remove_or_update_iertr::future<> +SnapTrimEvent::SubOpBlocker::wait_completion() +{ + return interruptor::do_for_each(subops, [](auto&& kv) { + return std::move(kv.second); + }); +} + +void SnapTrimEvent::print(std::ostream &lhs) const +{ + lhs << "SnapTrimEvent(" + << "pgid=" << pg->get_pgid() + << " snapid=" << snapid + << " needs_pause=" << needs_pause + << ")"; +} + +void SnapTrimEvent::dump_detail(Formatter *f) const +{ + f->open_object_section("SnapTrimEvent"); + f->dump_stream("pgid") << pg->get_pgid(); + f->close_section(); +} + +SnapTrimEvent::snap_trim_ertr::future<seastar::stop_iteration> +SnapTrimEvent::start() +{ + logger().debug("{}: {}", *this, __func__); + return with_pg( + pg->get_shard_services(), pg + ).finally([ref=IRef{this}, this] { + logger().debug("{}: complete", *ref); + return handle.complete(); + }); +} + +CommonPGPipeline& SnapTrimEvent::client_pp() +{ + return pg->request_pg_pipeline; +} + +SnapTrimEvent::snap_trim_ertr::future<seastar::stop_iteration> +SnapTrimEvent::with_pg( + ShardServices &shard_services, Ref<PG> _pg) +{ + return interruptor::with_interruption([&shard_services, this] { + return enter_stage<interruptor>( + client_pp().wait_for_active + ).then_interruptible([this] { + return with_blocking_event<PGActivationBlocker::BlockingEvent, + interruptor>([this] (auto&& trigger) { + return pg->wait_for_active_blocker.wait(std::move(trigger)); + }); + }).then_interruptible([this] { + return enter_stage<interruptor>( + client_pp().recover_missing); + }).then_interruptible([] { + //return do_recover_missing(pg, get_target_oid()); + return seastar::now(); + }).then_interruptible([this] { + return enter_stage<interruptor>( + client_pp().get_obc); + }).then_interruptible([this] { + return pg->snaptrim_mutex.lock(*this); + }).then_interruptible([this] { + return enter_stage<interruptor>( + client_pp().process); + }).then_interruptible([&shard_services, this] { + return interruptor::async([this] { + std::vector<hobject_t> to_trim; + using crimson::common::local_conf; + const auto max = + local_conf().get_val<uint64_t>("osd_pg_max_concurrent_snap_trims"); + // we need to look for at least 1 snaptrim, otherwise we'll misinterpret + // the ENOENT below and erase snapid. + int r = snap_mapper.get_next_objects_to_trim( + snapid, + max, + &to_trim); + if (r == -ENOENT) { + to_trim.clear(); // paranoia + return to_trim; + } else if (r != 0) { + logger().error("{}: get_next_objects_to_trim returned {}", + *this, cpp_strerror(r)); + ceph_abort_msg("get_next_objects_to_trim returned an invalid code"); + } else { + assert(!to_trim.empty()); + } + logger().debug("{}: async almost done line {}", *this, __LINE__); + return to_trim; + }).then_interruptible([&shard_services, this] (const auto& to_trim) { + if (to_trim.empty()) { + // the legit ENOENT -> done + logger().debug("{}: to_trim is empty! 
Stopping iteration", *this); + pg->snaptrim_mutex.unlock(); + return snap_trim_iertr::make_ready_future<seastar::stop_iteration>( + seastar::stop_iteration::yes); + } + return [&shard_services, this](const auto &to_trim) { + for (const auto& object : to_trim) { + logger().debug("{}: trimming {}", *this, object); + auto [op, fut] = shard_services.start_operation_may_interrupt< + interruptor, SnapTrimObjSubEvent>( + pg, + object, + snapid); + subop_blocker.emplace_back( + op->get_id(), + std::move(fut) + ); + } + return interruptor::now(); + }(to_trim).then_interruptible([this] { + return enter_stage<interruptor>(wait_subop); + }).then_interruptible([this] { + logger().debug("{}: awaiting completion", *this); + return subop_blocker.wait_completion(); + }).finally([this] { + pg->snaptrim_mutex.unlock(); + }).safe_then_interruptible([this] { + if (!needs_pause) { + return interruptor::now(); + } + // let operators know we're waiting + return enter_stage<interruptor>( + wait_trim_timer + ).then_interruptible([this] { + using crimson::common::local_conf; + const auto time_to_sleep = + local_conf().template get_val<double>("osd_snap_trim_sleep"); + logger().debug("{}: time_to_sleep {}", *this, time_to_sleep); + // TODO: this logic should be more sophisticated and distinguish + // between SSDs, HDDs and the hybrid case + return seastar::sleep( + std::chrono::milliseconds(std::lround(time_to_sleep * 1000))); + }); + }).safe_then_interruptible([this] { + logger().debug("{}: all completed", *this); + return snap_trim_iertr::make_ready_future<seastar::stop_iteration>( + seastar::stop_iteration::no); + }); + }); + }); + }, [this](std::exception_ptr eptr) -> snap_trim_ertr::future<seastar::stop_iteration> { + logger().debug("{}: interrupted {}", *this, eptr); + return crimson::ct_error::eagain::make(); + }, pg); +} + + +CommonPGPipeline& SnapTrimObjSubEvent::client_pp() +{ + return pg->request_pg_pipeline; +} + +SnapTrimObjSubEvent::remove_or_update_iertr::future<> +SnapTrimObjSubEvent::start() +{ + logger().debug("{}: start", *this); + return with_pg( + pg->get_shard_services(), pg + ).finally([ref=IRef{this}, this] { + logger().debug("{}: complete", *ref); + return handle.complete(); + }); +} + +SnapTrimObjSubEvent::remove_or_update_iertr::future<> +SnapTrimObjSubEvent::remove_clone( + ObjectContextRef obc, + ObjectContextRef head_obc, + ceph::os::Transaction& txn, + std::vector<pg_log_entry_t>& log_entries +) { + const auto p = std::find( + head_obc->ssc->snapset.clones.begin(), + head_obc->ssc->snapset.clones.end(), + coid.snap); + if (p == head_obc->ssc->snapset.clones.end()) { + logger().error("{}: Snap {} not in clones", + *this, coid.snap); + return crimson::ct_error::enoent::make(); + } + assert(p != head_obc->ssc->snapset.clones.end()); + snapid_t last = coid.snap; + delta_stats.num_bytes -= head_obc->ssc->snapset.get_clone_bytes(last); + + if (p != head_obc->ssc->snapset.clones.begin()) { + // not the oldest... merge overlap into next older clone + std::vector<snapid_t>::iterator n = p - 1; + hobject_t prev_coid = coid; + prev_coid.snap = *n; + + // does the classical OSD really need is_present_clone(prev_coid)? 
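+    // the removed clone is not the oldest, so the next older clone (*n) takes over the overlap accounting: subtract *n's currently accounted clone bytes, + // shrink *n's overlap to its intersection with the removed clone's (*p's) overlap, then re-add *n's clone bytes so num_bytes reflects the change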
+ delta_stats.num_bytes -= head_obc->ssc->snapset.get_clone_bytes(*n); + head_obc->ssc->snapset.clone_overlap[*n].intersection_of( + head_obc->ssc->snapset.clone_overlap[*p]); + delta_stats.num_bytes += head_obc->ssc->snapset.get_clone_bytes(*n); + } + delta_stats.num_objects--; + if (obc->obs.oi.is_dirty()) { + delta_stats.num_objects_dirty--; + } + if (obc->obs.oi.is_omap()) { + delta_stats.num_objects_omap--; + } + if (obc->obs.oi.is_whiteout()) { + logger().debug("{}: trimming whiteout on {}", + *this, coid); + delta_stats.num_whiteouts--; + } + delta_stats.num_object_clones--; + + obc->obs.exists = false; + head_obc->ssc->snapset.clones.erase(p); + head_obc->ssc->snapset.clone_overlap.erase(last); + head_obc->ssc->snapset.clone_size.erase(last); + head_obc->ssc->snapset.clone_snaps.erase(last); + + log_entries.emplace_back( + pg_log_entry_t{ + pg_log_entry_t::DELETE, + coid, + osd_op_p.at_version, + obc->obs.oi.version, + 0, + osd_reqid_t(), + obc->obs.oi.mtime, // will be replaced in `apply_to()` + 0} + ); + txn.remove( + pg->get_collection_ref()->get_cid(), + ghobject_t{coid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD}); + obc->obs.oi = object_info_t(coid); + return OpsExecuter::snap_map_remove(coid, pg->snap_mapper, pg->osdriver, txn); +} + +void SnapTrimObjSubEvent::remove_head_whiteout( + ObjectContextRef obc, + ObjectContextRef head_obc, + ceph::os::Transaction& txn, + std::vector<pg_log_entry_t>& log_entries +) { + // NOTE: this arguably constitutes minor interference with the + // tiering agent if this is a cache tier since a snap trim event + // is effectively evicting a whiteout we might otherwise want to + // keep around. + const auto head_oid = coid.get_head(); + logger().info("{}: {} removing {}", + *this, coid, head_oid); + log_entries.emplace_back( + pg_log_entry_t{ + pg_log_entry_t::DELETE, + head_oid, + osd_op_p.at_version, + head_obc->obs.oi.version, + 0, + osd_reqid_t(), + obc->obs.oi.mtime, // will be replaced in `apply_to()` + 0} + ); + logger().info("{}: remove snap head", *this); + object_info_t& oi = head_obc->obs.oi; + delta_stats.num_objects--; + if (oi.is_dirty()) { + delta_stats.num_objects_dirty--; + } + if (oi.is_omap()) { + delta_stats.num_objects_omap--; + } + if (oi.is_whiteout()) { + logger().debug("{}: trimming whiteout on {}", + *this, oi.soid); + delta_stats.num_whiteouts--; + } + head_obc->obs.exists = false; + head_obc->obs.oi = object_info_t(head_oid); + txn.remove(pg->get_collection_ref()->get_cid(), + ghobject_t{head_oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD}); +} + +SnapTrimObjSubEvent::interruptible_future<> +SnapTrimObjSubEvent::adjust_snaps( + ObjectContextRef obc, + ObjectContextRef head_obc, + const std::set<snapid_t>& new_snaps, + ceph::os::Transaction& txn, + std::vector<pg_log_entry_t>& log_entries +) { + head_obc->ssc->snapset.clone_snaps[coid.snap] = + std::vector<snapid_t>(new_snaps.rbegin(), new_snaps.rend()); + + // we still do a 'modify' event on this object just to trigger a + // snapmapper.update ... 
:( + obc->obs.oi.prior_version = obc->obs.oi.version; + obc->obs.oi.version = osd_op_p.at_version; + ceph::bufferlist bl; + encode(obc->obs.oi, + bl, + pg->get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr)); + txn.setattr( + pg->get_collection_ref()->get_cid(), + ghobject_t{coid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD}, + OI_ATTR, + bl); + log_entries.emplace_back( + pg_log_entry_t{ + pg_log_entry_t::MODIFY, + coid, + obc->obs.oi.version, + obc->obs.oi.prior_version, + 0, + osd_reqid_t(), + obc->obs.oi.mtime, + 0} + ); + return OpsExecuter::snap_map_modify( + coid, new_snaps, pg->snap_mapper, pg->osdriver, txn); +} + +void SnapTrimObjSubEvent::update_head( + ObjectContextRef obc, + ObjectContextRef head_obc, + ceph::os::Transaction& txn, + std::vector<pg_log_entry_t>& log_entries +) { + const auto head_oid = coid.get_head(); + logger().info("{}: writing updated snapset on {}, snapset is {}", + *this, head_oid, head_obc->ssc->snapset); + log_entries.emplace_back( + pg_log_entry_t{ + pg_log_entry_t::MODIFY, + head_oid, + osd_op_p.at_version, + head_obc->obs.oi.version, + 0, + osd_reqid_t(), + obc->obs.oi.mtime, + 0} + ); + + head_obc->obs.oi.prior_version = head_obc->obs.oi.version; + head_obc->obs.oi.version = osd_op_p.at_version; + + std::map<std::string, ceph::bufferlist, std::less<>> attrs; + ceph::bufferlist bl; + encode(head_obc->ssc->snapset, bl); + attrs[SS_ATTR] = std::move(bl); + + bl.clear(); + head_obc->obs.oi.encode_no_oid(bl, + pg->get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr)); + attrs[OI_ATTR] = std::move(bl); + txn.setattrs( + pg->get_collection_ref()->get_cid(), + ghobject_t{head_oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD}, + attrs); +} + +SnapTrimObjSubEvent::remove_or_update_iertr::future< + SnapTrimObjSubEvent::remove_or_update_ret_t> +SnapTrimObjSubEvent::remove_or_update( + ObjectContextRef obc, + ObjectContextRef head_obc) +{ + auto citer = head_obc->ssc->snapset.clone_snaps.find(coid.snap); + if (citer == head_obc->ssc->snapset.clone_snaps.end()) { + logger().error("{}: No clone_snaps in snapset {} for object {}", + *this, head_obc->ssc->snapset, coid); + return crimson::ct_error::enoent::make(); + } + const auto& old_snaps = citer->second; + if (old_snaps.empty()) { + logger().error("{}: no object info snaps for object {}", + *this, coid); + return crimson::ct_error::enoent::make(); + } + if (head_obc->ssc->snapset.seq == 0) { + logger().error("{}: no snapset.seq for object {}", + *this, coid); + return crimson::ct_error::enoent::make(); + } + const OSDMapRef& osdmap = pg->get_osdmap(); + std::set<snapid_t> new_snaps; + for (const auto& old_snap : old_snaps) { + if (!osdmap->in_removed_snaps_queue(pg->get_info().pgid.pgid.pool(), + old_snap) + && old_snap != snap_to_trim) { + new_snaps.insert(old_snap); + } + } + + return seastar::do_with(ceph::os::Transaction{}, [=, this](auto &txn) { + std::vector<pg_log_entry_t> log_entries{}; + + int64_t num_objects_before_trim = delta_stats.num_objects; + osd_op_p.at_version = pg->next_version(); + auto ret = remove_or_update_iertr::now(); + if (new_snaps.empty()) { + // remove clone from snapset + logger().info("{}: {} snaps {} -> {} ... 
deleting", + *this, coid, old_snaps, new_snaps); + ret = remove_clone(obc, head_obc, txn, log_entries); + } else { + // save adjusted snaps for this object + logger().info("{}: {} snaps {} -> {}", + *this, coid, old_snaps, new_snaps); + ret = adjust_snaps(obc, head_obc, new_snaps, txn, log_entries); + } + return std::move(ret).safe_then_interruptible( + [&txn, obc, num_objects_before_trim, log_entries=std::move(log_entries), head_obc=std::move(head_obc), this]() mutable { + osd_op_p.at_version = pg->next_version(); + + // save head snapset + logger().debug("{}: {} new snapset {} on {}", + *this, coid, head_obc->ssc->snapset, head_obc->obs.oi); + if (head_obc->ssc->snapset.clones.empty() && head_obc->obs.oi.is_whiteout()) { + remove_head_whiteout(obc, head_obc, txn, log_entries); + } else { + update_head(obc, head_obc, txn, log_entries); + } + // Stats reporting - Set number of objects trimmed + if (num_objects_before_trim > delta_stats.num_objects) { + //int64_t num_objects_trimmed = + // num_objects_before_trim - delta_stats.num_objects; + //add_objects_trimmed_count(num_objects_trimmed); + } + }).safe_then_interruptible( + [&txn, log_entries=std::move(log_entries)] () mutable { + return remove_or_update_iertr::make_ready_future<remove_or_update_ret_t>( + std::make_pair(std::move(txn), std::move(log_entries))); + }); + }); +} + +SnapTrimObjSubEvent::remove_or_update_iertr::future<> +SnapTrimObjSubEvent::with_pg( + ShardServices &shard_services, Ref<PG> _pg) +{ + return enter_stage<interruptor>( + client_pp().wait_for_active + ).then_interruptible([this] { + return with_blocking_event<PGActivationBlocker::BlockingEvent, + interruptor>([this] (auto&& trigger) { + return pg->wait_for_active_blocker.wait(std::move(trigger)); + }); + }).then_interruptible([this] { + return enter_stage<interruptor>( + client_pp().recover_missing); + }).then_interruptible([] { + //return do_recover_missing(pg, get_target_oid()); + return seastar::now(); + }).then_interruptible([this] { + return enter_stage<interruptor>( + client_pp().get_obc); + }).then_interruptible([this] { + logger().debug("{}: getting obc for {}", *this, coid); + // end of commonality + // with_clone_obc_direct lock both clone's and head's obcs + return pg->obc_loader.with_clone_obc_direct<RWState::RWWRITE>( + coid, + [this](auto head_obc, auto clone_obc) { + logger().debug("{}: got clone_obc={}", *this, clone_obc->get_oid()); + return enter_stage<interruptor>( + client_pp().process + ).then_interruptible( + [this,clone_obc=std::move(clone_obc), head_obc=std::move(head_obc)]() mutable { + logger().debug("{}: processing clone_obc={}", *this, clone_obc->get_oid()); + return remove_or_update( + clone_obc, head_obc + ).safe_then_unpack_interruptible([clone_obc, this] + (auto&& txn, auto&& log_entries) mutable { + auto [submitted, all_completed] = pg->submit_transaction( + std::move(clone_obc), + std::move(txn), + std::move(osd_op_p), + std::move(log_entries)); + return submitted.then_interruptible( + [all_completed=std::move(all_completed), this] () mutable { + return enter_stage<interruptor>( + wait_repop + ).then_interruptible([all_completed=std::move(all_completed)] () mutable { + return std::move(all_completed); + }); + }); + }); + }); + }).handle_error_interruptible( + remove_or_update_iertr::pass_further{}, + crimson::ct_error::assert_all{"unexpected error in SnapTrimObjSubEvent"} + ); + }); +} + +void SnapTrimObjSubEvent::print(std::ostream &lhs) const +{ + lhs << "SnapTrimObjSubEvent(" + << "coid=" << coid + << " snapid=" << snap_to_trim 
+ << ")"; +} + +void SnapTrimObjSubEvent::dump_detail(Formatter *f) const +{ + f->open_object_section("SnapTrimObjSubEvent"); + f->dump_stream("coid") << coid; + f->close_section(); +} + +} // namespace crimson::osd diff --git a/src/crimson/osd/osd_operations/snaptrim_event.h b/src/crimson/osd/osd_operations/snaptrim_event.h new file mode 100644 index 000000000..a3a970a04 --- /dev/null +++ b/src/crimson/osd/osd_operations/snaptrim_event.h @@ -0,0 +1,210 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <iostream> +#include <seastar/core/future.hh> + +#include "crimson/osd/osdmap_gate.h" +#include "crimson/osd/osd_operation.h" +#include "crimson/osd/osd_operations/common/pg_pipeline.h" +#include "crimson/osd/pg.h" +#include "crimson/osd/pg_activation_blocker.h" +#include "osd/osd_types.h" +#include "osd/PGPeeringEvent.h" +#include "osd/PeeringState.h" + +namespace ceph { + class Formatter; +} + +class SnapMapper; + +namespace crimson::osd { + +class OSD; +class ShardServices; + +// trim up to `max` objects for snapshot `snapid +class SnapTrimEvent final : public PhasedOperationT<SnapTrimEvent> { +public: + using remove_or_update_ertr = + crimson::errorator<crimson::ct_error::enoent>; + using remove_or_update_iertr = + crimson::interruptible::interruptible_errorator< + IOInterruptCondition, remove_or_update_ertr>; + using snap_trim_ertr = remove_or_update_ertr::extend< + crimson::ct_error::eagain>; + using snap_trim_iertr = remove_or_update_iertr::extend< + crimson::ct_error::eagain>; + + static constexpr OperationTypeCode type = OperationTypeCode::snaptrim_event; + + SnapTrimEvent(Ref<PG> pg, + SnapMapper& snap_mapper, + const snapid_t snapid, + const bool needs_pause) + : pg(std::move(pg)), + snap_mapper(snap_mapper), + snapid(snapid), + needs_pause(needs_pause) {} + + void print(std::ostream &) const final; + void dump_detail(ceph::Formatter* f) const final; + snap_trim_ertr::future<seastar::stop_iteration> start(); + snap_trim_ertr::future<seastar::stop_iteration> with_pg( + ShardServices &shard_services, Ref<PG> pg); + +private: + CommonPGPipeline& client_pp(); + + // bases on 998cb8c141bb89aafae298a9d5e130fbd78fe5f2 + struct SubOpBlocker : crimson::BlockerT<SubOpBlocker> { + static constexpr const char* type_name = "CompoundOpBlocker"; + + using id_done_t = std::pair<crimson::Operation::id_t, + remove_or_update_iertr::future<>>; + + void dump_detail(Formatter *f) const final; + + template <class... Args> + void emplace_back(Args&&... args); + + remove_or_update_iertr::future<> wait_completion(); + private: + std::vector<id_done_t> subops; + } subop_blocker; + + // we don't need to synchronize with other instances of SnapTrimEvent; + // it's here for the sake of op tracking. + struct WaitSubop : OrderedConcurrentPhaseT<WaitSubop> { + static constexpr auto type_name = "SnapTrimEvent::wait_subop"; + } wait_subop; + + // an instantiator can instruct us to go over this stage and then + // wait for the future to implement throttling. It is implemented + // that way to for the sake of tracking ops. 
+ struct WaitTrimTimer : OrderedExclusivePhaseT<WaitTrimTimer> { + static constexpr auto type_name = "SnapTrimEvent::wait_trim_timer"; + } wait_trim_timer; + + PipelineHandle handle; + Ref<PG> pg; + SnapMapper& snap_mapper; + const snapid_t snapid; + const bool needs_pause; + +public: + PipelineHandle& get_handle() { return handle; } + + std::tuple< + StartEvent, + CommonPGPipeline::WaitForActive::BlockingEvent, + PGActivationBlocker::BlockingEvent, + CommonPGPipeline::RecoverMissing::BlockingEvent, + CommonPGPipeline::GetOBC::BlockingEvent, + CommonPGPipeline::Process::BlockingEvent, + WaitSubop::BlockingEvent, + PG::SnapTrimMutex::WaitPG::BlockingEvent, + WaitTrimTimer::BlockingEvent, + CompletionEvent + > tracking_events; + + friend class PG::SnapTrimMutex; +}; + +// remove a single object. A SnapTrimEvent can create multiple subrequests. +// The division of labour is needed because of the restriction that an Op +// cannot revisit a pipeline stage it has already seen. +class SnapTrimObjSubEvent : public PhasedOperationT<SnapTrimObjSubEvent> { +public: + using remove_or_update_ertr = + crimson::errorator<crimson::ct_error::enoent>; + using remove_or_update_iertr = + crimson::interruptible::interruptible_errorator< + IOInterruptCondition, remove_or_update_ertr>; + + static constexpr OperationTypeCode type = + OperationTypeCode::snaptrimobj_subevent; + + SnapTrimObjSubEvent( + Ref<PG> pg, + const hobject_t& coid, + snapid_t snap_to_trim) + : pg(std::move(pg)), + coid(coid), + snap_to_trim(snap_to_trim) { + } + + void print(std::ostream &) const final; + void dump_detail(ceph::Formatter* f) const final; + remove_or_update_iertr::future<> start(); + remove_or_update_iertr::future<> with_pg( + ShardServices &shard_services, Ref<PG> pg); + + CommonPGPipeline& client_pp(); + +private: + object_stat_sum_t delta_stats; + + remove_or_update_iertr::future<> remove_clone( + ObjectContextRef obc, + ObjectContextRef head_obc, + ceph::os::Transaction& txn, + std::vector<pg_log_entry_t>& log_entries); + void remove_head_whiteout( + ObjectContextRef obc, + ObjectContextRef head_obc, + ceph::os::Transaction& txn, + std::vector<pg_log_entry_t>& log_entries); + interruptible_future<> adjust_snaps( + ObjectContextRef obc, + ObjectContextRef head_obc, + const std::set<snapid_t>& new_snaps, + ceph::os::Transaction& txn, + std::vector<pg_log_entry_t>& log_entries); + void update_head( + ObjectContextRef obc, + ObjectContextRef head_obc, + ceph::os::Transaction& txn, + std::vector<pg_log_entry_t>& log_entries); + + using remove_or_update_ret_t = + std::pair<ceph::os::Transaction, std::vector<pg_log_entry_t>>; + remove_or_update_iertr::future<remove_or_update_ret_t> + remove_or_update(ObjectContextRef obc, ObjectContextRef head_obc); + + // we don't need to synchronize with other instances started by + // SnapTrimEvent; it's here for the sake of op tracking. 
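+  // (entered in with_pg() while awaiting the all_completed future of the + // transaction submitted by remove_or_update())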
+ struct WaitRepop : OrderedConcurrentPhaseT<WaitRepop> { + static constexpr auto type_name = "SnapTrimObjSubEvent::wait_repop"; + } wait_repop; + + Ref<PG> pg; + PipelineHandle handle; + osd_op_params_t osd_op_p; + const hobject_t coid; + const snapid_t snap_to_trim; + +public: + PipelineHandle& get_handle() { return handle; } + + std::tuple< + StartEvent, + CommonPGPipeline::WaitForActive::BlockingEvent, + PGActivationBlocker::BlockingEvent, + CommonPGPipeline::RecoverMissing::BlockingEvent, + CommonPGPipeline::GetOBC::BlockingEvent, + CommonPGPipeline::Process::BlockingEvent, + WaitRepop::BlockingEvent, + CompletionEvent + > tracking_events; +}; + +} // namespace crimson::osd + +#if FMT_VERSION >= 90000 +template <> struct fmt::formatter<crimson::osd::SnapTrimEvent> : fmt::ostream_formatter {}; +template <> struct fmt::formatter<crimson::osd::SnapTrimObjSubEvent> : fmt::ostream_formatter {}; +#endif |
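
Editor's note on the pipeline phases used throughout these operations: OrderedExclusivePhaseT admits one operation at a time, in submission order, and an operation may enter a given stage at most once (hence the SnapTrimEvent/SnapTrimObjSubEvent split above). The sketch below is not crimson code and uses none of its APIs; the names (OrderedExclusiveStage, enter, exit) are made up for illustration only. It models the ordering contract with plain std::thread and std::condition_variable, whereas the real stages are seastar-future based and are entered through enter_stage<>() on a PipelineHandle rather than by blocking threads.

// Standalone illustration of "ordered exclusive" stage semantics (C++17).
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

// One stage: admits operations strictly in id order, one at a time.
class OrderedExclusiveStage {
  std::mutex m;
  std::condition_variable cv;
  unsigned next_id = 0;   // id of the operation allowed to enter next
  bool occupied = false;  // true while an operation is inside the stage
public:
  // Block until it is `id`'s turn and the stage is free.
  void enter(unsigned id) {
    std::unique_lock lock{m};
    cv.wait(lock, [&] { return id == next_id && !occupied; });
    occupied = true;
  }
  // Leave the stage and hand it to the next operation in order.
  void exit() {
    std::lock_guard lock{m};
    occupied = false;
    ++next_id;
    cv.notify_all();
  }
};

int main() {
  // two stages, loosely analogous to PGPeeringPipeline's await_map/process
  OrderedExclusiveStage await_map, process;
  std::vector<std::thread> ops;
  for (unsigned id = 0; id < 3; ++id) {
    ops.emplace_back([&, id] {
      await_map.enter(id);
      std::printf("op %u: await_map\n", id);
      await_map.exit();
      process.enter(id);
      std::printf("op %u: process\n", id);
      process.exit();
    });
  }
  for (auto &t : ops) {
    t.join();
  }
  return 0;
}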