// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #pragma once #include #include #include #include #include "crimson/common/log.h" #include "crimson/common/errorator.h" #ifndef NDEBUG #define INTR_FUT_DEBUG(FMT_MSG, ...) crimson::get_logger(ceph_subsys_).trace(FMT_MSG, ##__VA_ARGS__) #else #define INTR_FUT_DEBUG(FMT_MSG, ...) #endif // The interrupt condition generally works this way: // // 1. It is created by call_with_interruption_impl method, and is recorded in the thread // local global variable "::crimson::interruptible::interrupt_cond". // 2. Any continuation that's created within the execution of the continuation // that calls the call_with_interruption_impl method will capture the "interrupt_cond"; // and when it starts to run, it will put that captured interruption condition // into "::crimson::interruptible::interrupt_cond" so that further continuations // created can also capture the interruption condition; // 3. At the end of the continuation run, the global "interrupt_cond" will be cleared // to prevent other continuations that are not supposed to be interrupted from wrongly // capturing an interruption condition. // With this approach, continuations capture the interrupt condition at their creation, // restore the interrupt condition at the beginning of their execution and clear that // interrupt condition at the end of their execution. So the global "interrupt_cond" // only holds a valid interrupt condition while the corresponding continuation is actually // running, after which it gets cleared. Since continuations can't be executed simultaneously, // different continuation chains won't be able to interfere with each other. 
// // The global "interrupt_cond" can work as a signal about whether the continuation // is supposed to be interrupted. The reason that the global "interrupt_cond" // exists is the following scenario: // // Say there's some method PG::func1(), in which the continuations created may // or may not be supposed to be interrupted in different situations. If we didn't // have a global signal, we would have to add an extra parameter to every method like // PG::func1() to indicate whether the current run should create to-be-interrupted // continuations or not. // // interruptor::with_interruption() and helpers can be used by users to wrap a future in // the interruption machinery. namespace crimson::os::seastore { class TransactionConflictCondition; } // GCC tries to instantiate // seastar::lw_shared_ptr<TransactionConflictCondition>, // but we *may* not have the definition of TransactionConflictCondition at this moment. // A full specialization for lw_shared_ptr_accessors helps to bypass the default // lw_shared_ptr_accessors implementation, where std::is_base_of<.., T> is used. 
namespace seastar::internal { template<> struct lw_shared_ptr_accessors<::crimson::os::seastore::TransactionConflictCondition, void> : lw_shared_ptr_accessors_no_esft<::crimson::os::seastore::TransactionConflictCondition> {}; } SEASTAR_CONCEPT( namespace crimson::interruptible { template class interruptible_future_detail; } namespace seastar::impl { template struct is_tuple_of_futures, Rest...>> : is_tuple_of_futures> {}; } ) namespace crimson::interruptible { struct ready_future_marker {}; struct exception_future_marker {}; template class interruptible_future_builder; template struct interruptor; template using InterruptCondRef = seastar::lw_shared_ptr; template struct interrupt_cond_t { InterruptCondRef interrupt_cond; uint64_t ref_count = 0; void set( InterruptCondRef& ic) { INTR_FUT_DEBUG( "{}: going to set interrupt_cond: {}, ic: {}", __func__, (void*)interrupt_cond.get(), (void*)ic.get()); if (!interrupt_cond) { interrupt_cond = ic; } assert(interrupt_cond.get() == ic.get()); ref_count++; INTR_FUT_DEBUG( "{}: interrupt_cond: {}, ref_count: {}", __func__, (void*)interrupt_cond.get(), ref_count); } void reset() { if (--ref_count == 0) { INTR_FUT_DEBUG( "{}: clearing interrupt_cond: {},{}", __func__, (void*)interrupt_cond.get(), typeid(InterruptCond).name()); interrupt_cond.release(); } else { INTR_FUT_DEBUG( "{}: end without clearing interrupt_cond: {},{}, ref_count: {}", __func__, (void*)interrupt_cond.get(), typeid(InterruptCond).name(), ref_count); } } }; template thread_local interrupt_cond_t interrupt_cond; extern template thread_local interrupt_cond_t interrupt_cond; template class [[nodiscard]] interruptible_future_detail {}; template struct is_interruptible_future : public std::false_type {}; template struct is_interruptible_future< interruptible_future_detail< InterruptCond, FutureType>> : public std::true_type {}; template concept IsInterruptibleFuture = is_interruptible_future::value; template concept InvokeReturnsInterruptibleFuture = 
IsInterruptibleFuture>; namespace internal { template auto call_with_interruption_impl( InterruptCondRef interrupt_condition, Func&& func, Args&&... args) { using futurator_t = seastar::futurize>; // there might be a case like this: // with_interruption([] { // interruptor::do_for_each([] { // ... // return interruptible_errorated_future(); // }).safe_then_interruptible([] { // ... // }); // }) // In this case, as crimson::do_for_each would directly do futurize_invoke // for "call_with_interruption", we have to make sure this invocation would // not errorly release ::crimson::interruptible::interrupt_cond // If there exists an interrupt condition, which means "Func" may not be // permitted to run as a result of the interruption, test it. If it does // need to be interrupted, return an interruption; otherwise, restore the // global "interrupt_cond" with the interruption condition, and go ahead // executing the Func. assert(interrupt_condition); auto fut = interrupt_condition->template may_interrupt< typename futurator_t::type>(); INTR_FUT_DEBUG( "call_with_interruption_impl: may_interrupt: {}, " "local interrupt_condition: {}, " "global interrupt_cond: {},{}", (bool)fut, (void*)interrupt_condition.get(), (void*)interrupt_cond.interrupt_cond.get(), typeid(InterruptCond).name()); if (fut) { return std::move(*fut); } interrupt_cond.set(interrupt_condition); auto fut2 = seastar::futurize_invoke( std::forward(func), std::forward(args)...); // Clear the global "interrupt_cond" to prevent it from interfering other // continuation chains. interrupt_cond.reset(); return fut2; } } template requires (!InterruptCond::template is_interruption_v) auto call_with_interruption( InterruptCondRef interrupt_condition, Func&& func, Ret&& fut) { using Result = std::invoke_result_t; // if "T" is already an interrupt exception, return it directly; // otherwise, upper layer application may encounter errors executing // the "Func" body. 
if (fut.failed()) { std::exception_ptr eptr = fut.get_exception(); if (interrupt_condition->is_interruption(eptr)) { return seastar::futurize::make_exception_future(std::move(eptr)); } return internal::call_with_interruption_impl( interrupt_condition, std::forward(func), seastar::futurize::make_exception_future( std::move(eptr))); } return internal::call_with_interruption_impl( interrupt_condition, std::forward(func), std::move(fut)); } template requires (InterruptCond::template is_interruption_v) auto call_with_interruption( InterruptCondRef interrupt_condition, Func&& func, T&& arg) { using Result = std::invoke_result_t; // if "T" is already an interrupt exception, return it directly; // otherwise, upper layer application may encounter errors executing // the "Func" body. return seastar::futurize::make_exception_future( std::get<0>(std::tuple(std::forward(arg)))); } template requires (!InterruptCond::template is_interruption_v) && (!seastar::Future) auto call_with_interruption( InterruptCondRef interrupt_condition, Func&& func, T&& arg) { return internal::call_with_interruption_impl( interrupt_condition, std::forward(func), std::forward(arg)); } template > auto call_with_interruption( InterruptCondRef interrupt_condition, Func&& func) { return internal::call_with_interruption_impl( interrupt_condition, std::forward(func)); } template > Result non_futurized_call_with_interruption( InterruptCondRef interrupt_condition, Func&& func, T&&... 
args) { assert(interrupt_condition); auto fut = interrupt_condition->template may_interrupt>(); INTR_FUT_DEBUG( "non_futurized_call_with_interruption may_interrupt: {}, " "interrupt_condition: {}, interrupt_cond: {},{}", (bool)fut, (void*)interrupt_condition.get(), (void*)interrupt_cond.interrupt_cond.get(), typeid(InterruptCond).name()); if (fut) { std::rethrow_exception(fut->get_exception()); } interrupt_cond.set(interrupt_condition); try { if constexpr (std::is_void_v) { std::invoke(std::forward(func), std::forward(args)...); // Clear the global "interrupt_cond" to prevent it from interfering other // continuation chains. interrupt_cond.reset(); return; } else { auto&& err = std::invoke(std::forward(func), std::forward(args)...); interrupt_cond.reset(); return std::forward(err); } } catch (std::exception& e) { // Clear the global "interrupt_cond" to prevent it from interfering other // continuation chains. interrupt_cond.reset(); throw e; } } template struct interruptible_errorator; template struct parallel_for_each_ret { static_assert(seastar::Future); using type = seastar::future<>; }; template