diff options
Diffstat (limited to 'src/crimson/common/interruptible_future.h')
-rw-r--r-- | src/crimson/common/interruptible_future.h | 1600 |
1 files changed, 1600 insertions, 0 deletions
diff --git a/src/crimson/common/interruptible_future.h b/src/crimson/common/interruptible_future.h new file mode 100644 index 000000000..c0e2c346c --- /dev/null +++ b/src/crimson/common/interruptible_future.h @@ -0,0 +1,1600 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <seastar/core/future-util.hh> +#include <seastar/core/do_with.hh> +#include <seastar/core/when_all.hh> +#include <seastar/core/thread.hh> + +#include "crimson/common/log.h" +#include "crimson/common/errorator.h" +#ifndef NDEBUG +#define INTR_FUT_DEBUG(FMT_MSG, ...) crimson::get_logger(ceph_subsys_).trace(FMT_MSG, ##__VA_ARGS__) +#else +#define INTR_FUT_DEBUG(FMT_MSG, ...) +#endif + +// The interrupt condition generally works this way: +// +// 1. It is created by call_with_interruption_impl method, and is recorded in the thread +// local global variable "::crimson::interruptible::interrupt_cond". +// 2. Any continuation that's created within the execution of the continuation +// that calls the call_with_interruption_impl method will capture the "interrupt_cond"; +// and when they starts to run, they will put that capture interruption condition +// into "::crimson::interruptible::interrupt_cond" so that further continuations +// created can also capture the interruption condition; +// 3. At the end of the continuation run, the global "interrupt_cond" will be cleared +// to prevent other continuations that are not supposed to be interrupted wrongly +// capture an interruption condition. +// With this approach, continuations capture the interrupt condition at their creation, +// restore the interrupt conditions at the beginning of their execution and clear those +// interrupt conditions at the end of their execution. So the global "interrupt_cond" +// only hold valid interrupt conditions when the corresponding continuations are actually +// running after which it gets cleared. Since continuations can't be executed simultaneously, +// different continuation chains won't be able to interfere with each other. +// +// The global "interrupt_cond" can work as a signal about whether the continuation +// is supposed to be interrupted, the reason that the global "interrupt_cond" +// exists is that there may be this scenario: +// +// Say there's some method PG::func1(), in which the continuations created may +// or may not be supposed to be interrupted in different situations. If we don't +// have a global signal, we have to add an extra parameter to every method like +// PG::func1() to indicate whether the current run should create to-be-interrupted +// continuations or not. +// +// interruptor::with_interruption() and helpers can be used by users to wrap a future in +// the interruption machinery. + +namespace crimson::os::seastore { + class TransactionConflictCondition; +} + +// GCC tries to instantiate +// seastar::lw_shared_ptr<crimson::os::seastore::TransactionConflictCondition>. +// but we *may* not have the definition of TransactionConflictCondition at this moment, +// a full specialization for lw_shared_ptr_accessors helps to bypass the default +// lw_shared_ptr_accessors implementation, where std::is_base_of<.., T> is used. +namespace seastar::internal { + template<> + struct lw_shared_ptr_accessors<::crimson::os::seastore::TransactionConflictCondition, void> + : lw_shared_ptr_accessors_no_esft<::crimson::os::seastore::TransactionConflictCondition> + {}; +} + +SEASTAR_CONCEPT( +namespace crimson::interruptible { + template<typename InterruptCond, typename FutureType> + class interruptible_future_detail; +} +namespace seastar::impl { + template <typename InterruptCond, typename FutureType, typename... Rest> + struct is_tuple_of_futures<std::tuple<crimson::interruptible::interruptible_future_detail<InterruptCond, FutureType>, Rest...>> + : is_tuple_of_futures<std::tuple<Rest...>> {}; +} +) + +namespace crimson::interruptible { + +struct ready_future_marker {}; +struct exception_future_marker {}; + +template <typename InterruptCond> +class interruptible_future_builder; + +template <typename InterruptCond> +struct interruptor; + +template <typename InterruptCond> +using InterruptCondRef = seastar::lw_shared_ptr<InterruptCond>; + +template <typename InterruptCond> +struct interrupt_cond_t { + InterruptCondRef<InterruptCond> interrupt_cond; + uint64_t ref_count = 0; + void set( + InterruptCondRef<InterruptCond>& ic) { + INTR_FUT_DEBUG( + "{}: going to set interrupt_cond: {}, ic: {}", + __func__, + (void*)interrupt_cond.get(), + (void*)ic.get()); + if (!interrupt_cond) { + interrupt_cond = ic; + } + assert(interrupt_cond.get() == ic.get()); + ref_count++; + INTR_FUT_DEBUG( + "{}: interrupt_cond: {}, ref_count: {}", + __func__, + (void*)interrupt_cond.get(), + ref_count); + } + void reset() { + if (--ref_count == 0) { + INTR_FUT_DEBUG( + "{}: clearing interrupt_cond: {},{}", + __func__, + (void*)interrupt_cond.get(), + typeid(InterruptCond).name()); + interrupt_cond.release(); + } else { + INTR_FUT_DEBUG( + "{}: end without clearing interrupt_cond: {},{}, ref_count: {}", + __func__, + (void*)interrupt_cond.get(), + typeid(InterruptCond).name(), + ref_count); + } + } +}; + +template <typename InterruptCond> +thread_local interrupt_cond_t<InterruptCond> interrupt_cond; + +extern template thread_local interrupt_cond_t<crimson::os::seastore::TransactionConflictCondition> +interrupt_cond<crimson::os::seastore::TransactionConflictCondition>; + +template <typename InterruptCond, typename FutureType> +class [[nodiscard]] interruptible_future_detail {}; + +template <typename FutureType> +struct is_interruptible_future : public std::false_type {}; + +template <typename InterruptCond, typename FutureType> +struct is_interruptible_future< + interruptible_future_detail< + InterruptCond, + FutureType>> + : public std::true_type {}; +template <typename FutureType> +concept IsInterruptibleFuture = is_interruptible_future<FutureType>::value; +template <typename Func, typename... Args> +concept InvokeReturnsInterruptibleFuture = + IsInterruptibleFuture<std::invoke_result_t<Func, Args...>>; + +namespace internal { + +template <typename InterruptCond, typename Func, typename... Args> +auto call_with_interruption_impl( + InterruptCondRef<InterruptCond> interrupt_condition, + Func&& func, Args&&... args) +{ + using futurator_t = seastar::futurize<std::invoke_result_t<Func, Args...>>; + // there might be a case like this: + // with_interruption([] { + // interruptor::do_for_each([] { + // ... + // return interruptible_errorated_future(); + // }).safe_then_interruptible([] { + // ... + // }); + // }) + // In this case, as crimson::do_for_each would directly do futurize_invoke + // for "call_with_interruption", we have to make sure this invocation would + // not errorly release ::crimson::interruptible::interrupt_cond<InterruptCond> + + // If there exists an interrupt condition, which means "Func" may not be + // permitted to run as a result of the interruption, test it. If it does + // need to be interrupted, return an interruption; otherwise, restore the + // global "interrupt_cond" with the interruption condition, and go ahead + // executing the Func. + assert(interrupt_condition); + auto fut = interrupt_condition->template may_interrupt< + typename futurator_t::type>(); + INTR_FUT_DEBUG( + "call_with_interruption_impl: may_interrupt: {}, " + "local interrupt_condition: {}, " + "global interrupt_cond: {},{}", + (bool)fut, + (void*)interrupt_condition.get(), + (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(), + typeid(InterruptCond).name()); + if (fut) { + return std::move(*fut); + } + interrupt_cond<InterruptCond>.set(interrupt_condition); + + auto fut2 = seastar::futurize_invoke( + std::forward<Func>(func), + std::forward<Args>(args)...); + // Clear the global "interrupt_cond" to prevent it from interfering other + // continuation chains. + interrupt_cond<InterruptCond>.reset(); + return fut2; +} + +} + +template <typename InterruptCond, typename Func, seastar::Future Ret> +requires (!InterruptCond::template is_interruption_v<Ret>) +auto call_with_interruption( + InterruptCondRef<InterruptCond> interrupt_condition, + Func&& func, Ret&& fut) +{ + using Result = std::invoke_result_t<Func, Ret>; + // if "T" is already an interrupt exception, return it directly; + // otherwise, upper layer application may encounter errors executing + // the "Func" body. + if (fut.failed()) { + std::exception_ptr eptr = fut.get_exception(); + if (interrupt_condition->is_interruption(eptr)) { + return seastar::futurize<Result>::make_exception_future(std::move(eptr)); + } + return internal::call_with_interruption_impl( + interrupt_condition, + std::forward<Func>(func), + seastar::futurize<Ret>::make_exception_future( + std::move(eptr))); + } + return internal::call_with_interruption_impl( + interrupt_condition, + std::forward<Func>(func), + std::move(fut)); +} + +template <typename InterruptCond, typename Func, typename T> +requires (InterruptCond::template is_interruption_v<T>) +auto call_with_interruption( + InterruptCondRef<InterruptCond> interrupt_condition, + Func&& func, T&& arg) +{ + using Result = std::invoke_result_t<Func, T>; + // if "T" is already an interrupt exception, return it directly; + // otherwise, upper layer application may encounter errors executing + // the "Func" body. + return seastar::futurize<Result>::make_exception_future( + std::get<0>(std::tuple(std::forward<T>(arg)))); +} + +template <typename InterruptCond, typename Func, typename T> +requires (!InterruptCond::template is_interruption_v<T>) && (!seastar::Future<T>) +auto call_with_interruption( + InterruptCondRef<InterruptCond> interrupt_condition, + Func&& func, T&& arg) +{ + return internal::call_with_interruption_impl( + interrupt_condition, + std::forward<Func>(func), + std::forward<T>(arg)); +} + +template <typename InterruptCond, typename Func, + typename Result = std::invoke_result_t<Func>> +auto call_with_interruption( + InterruptCondRef<InterruptCond> interrupt_condition, + Func&& func) +{ + return internal::call_with_interruption_impl( + interrupt_condition, + std::forward<Func>(func)); +} + +template <typename InterruptCond, typename Func, typename... T, + typename Result = std::invoke_result_t<Func, T...>> +Result non_futurized_call_with_interruption( + InterruptCondRef<InterruptCond> interrupt_condition, + Func&& func, T&&... args) +{ + assert(interrupt_condition); + auto fut = interrupt_condition->template may_interrupt<seastar::future<>>(); + INTR_FUT_DEBUG( + "non_futurized_call_with_interruption may_interrupt: {}, " + "interrupt_condition: {}, interrupt_cond: {},{}", + (bool)fut, + (void*)interrupt_condition.get(), + (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(), + typeid(InterruptCond).name()); + if (fut) { + std::rethrow_exception(fut->get_exception()); + } + interrupt_cond<InterruptCond>.set(interrupt_condition); + try { + if constexpr (std::is_void_v<Result>) { + std::invoke(std::forward<Func>(func), std::forward<T>(args)...); + + // Clear the global "interrupt_cond" to prevent it from interfering other + // continuation chains. + interrupt_cond<InterruptCond>.reset(); + return; + } else { + auto&& err = std::invoke(std::forward<Func>(func), std::forward<T>(args)...); + interrupt_cond<InterruptCond>.reset(); + return std::forward<Result>(err); + } + } catch (std::exception& e) { + // Clear the global "interrupt_cond" to prevent it from interfering other + // continuation chains. + interrupt_cond<InterruptCond>.reset(); + throw e; + } +} + +template <typename InterruptCond, typename Errorator> +struct interruptible_errorator; + +template <typename T> +struct parallel_for_each_ret { + static_assert(seastar::Future<T>); + using type = seastar::future<>; +}; + +template <template <typename...> typename ErroratedFuture, typename T> +struct parallel_for_each_ret< + ErroratedFuture< + ::crimson::errorated_future_marker<T>>> { + using type = ErroratedFuture<::crimson::errorated_future_marker<void>>; +}; + +template <typename InterruptCond, typename FutureType> +class parallel_for_each_state final : private seastar::continuation_base<> { + using elem_ret_t = std::conditional_t< + IsInterruptibleFuture<FutureType>, + typename FutureType::core_type, + FutureType>; + using future_t = interruptible_future_detail< + InterruptCond, + typename parallel_for_each_ret<elem_ret_t>::type>; + std::vector<future_t> _incomplete; + seastar::promise<> _result; + std::exception_ptr _ex; +private: + void wait_for_one() noexcept { + while (!_incomplete.empty() && _incomplete.back().available()) { + if (_incomplete.back().failed()) { + _ex = _incomplete.back().get_exception(); + } + _incomplete.pop_back(); + } + if (!_incomplete.empty()) { + seastar::internal::set_callback(std::move(_incomplete.back()), + static_cast<continuation_base<>*>(this)); + _incomplete.pop_back(); + return; + } + if (__builtin_expect(bool(_ex), false)) { + _result.set_exception(std::move(_ex)); + } else { + _result.set_value(); + } + delete this; + } + virtual void run_and_dispose() noexcept override { + if (_state.failed()) { + _ex = std::move(_state).get_exception(); + } + _state = {}; + wait_for_one(); + } + task* waiting_task() noexcept override { return _result.waiting_task(); } +public: + parallel_for_each_state(size_t n) { + _incomplete.reserve(n); + } + void add_future(future_t&& f) { + _incomplete.push_back(std::move(f)); + } + future_t get_future() { + auto ret = _result.get_future(); + wait_for_one(); + return ret; + } + static future_t now() { + return seastar::now(); + } +}; + +template <typename InterruptCond, typename T> +class [[nodiscard]] interruptible_future_detail<InterruptCond, seastar::future<T>> + : private seastar::future<T> { +public: + using core_type = seastar::future<T>; + template <typename U> + using interrupt_futurize_t = + typename interruptor<InterruptCond>::template futurize_t<U>; + using core_type::get0; + using core_type::core_type; + using core_type::get_exception; + using core_type::ignore_ready_future; + + [[gnu::always_inline]] + interruptible_future_detail(seastar::future<T>&& base) + : core_type(std::move(base)) + {} + + using value_type = typename seastar::future<T>::value_type; + using tuple_type = typename seastar::future<T>::tuple_type; + + [[gnu::always_inline]] + value_type&& get() { + if (core_type::available()) { + return core_type::get(); + } else { + // destined to wait! + auto interruption_condition = interrupt_cond<InterruptCond>.interrupt_cond; + INTR_FUT_DEBUG( + "interruptible_future_detail::get() waiting, interrupt_cond: {},{}", + (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(), + typeid(InterruptCond).name()); + interrupt_cond<InterruptCond>.reset(); + auto&& value = core_type::get(); + interrupt_cond<InterruptCond>.set(interruption_condition); + INTR_FUT_DEBUG( + "interruptible_future_detail::get() got, interrupt_cond: {},{}", + (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(), + typeid(InterruptCond).name()); + return std::move(value); + } + } + + using core_type::available; + using core_type::failed; + + template <typename Func, + typename Result = interrupt_futurize_t< + std::invoke_result_t<Func, seastar::future<T>>>> + [[gnu::always_inline]] + Result then_wrapped_interruptible(Func&& func) { + ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); + return core_type::then_wrapped( + [func=std::move(func), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (auto&& fut) mutable { + return call_with_interruption( + std::move(interrupt_condition), + std::forward<Func>(func), + std::move(fut)); + }); + } + + template <typename Func> + [[gnu::always_inline]] + auto then_interruptible(Func&& func) { + ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); + if constexpr (std::is_void_v<T>) { + auto fut = core_type::then( + [func=std::move(func), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + () mutable { + return call_with_interruption( + interrupt_condition, + std::move(func)); + }); + return (interrupt_futurize_t<decltype(fut)>)(std::move(fut)); + } else { + auto fut = core_type::then( + [func=std::move(func), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (T&& arg) mutable { + return call_with_interruption( + interrupt_condition, + std::move(func), + std::forward<T>(arg)); + }); + return (interrupt_futurize_t<decltype(fut)>)(std::move(fut)); + } + } + + template <typename Func> + [[gnu::always_inline]] + auto then_unpack_interruptible(Func&& func) { + return then_interruptible([func=std::forward<Func>(func)](T&& tuple) mutable { + return std::apply(std::forward<Func>(func), std::move(tuple)); + }); + } + + template <typename Func, + typename Result =interrupt_futurize_t< + std::result_of_t<Func(std::exception_ptr)>>> + [[gnu::always_inline]] + Result handle_exception_interruptible(Func&& func) { + ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); + return core_type::then_wrapped( + [func=std::forward<Func>(func), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (auto&& fut) mutable { + if (!fut.failed()) { + return seastar::make_ready_future<T>(fut.get()); + } else { + return call_with_interruption( + interrupt_condition, + std::move(func), + fut.get_exception()); + } + }); + } + + template <bool may_interrupt = true, typename Func, + typename Result = interrupt_futurize_t< + std::result_of_t<Func()>>> + [[gnu::always_inline]] + Result finally_interruptible(Func&& func) { + if constexpr (may_interrupt) { + ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); + return core_type::then_wrapped( + [func=std::forward<Func>(func), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (auto&& fut) mutable { + return call_with_interruption( + interrupt_condition, + std::move(func)); + }); + } else { + return core_type::finally(std::forward<Func>(func)); + } + } + + template <typename Func, + typename Result = interrupt_futurize_t< + std::result_of_t<Func( + typename seastar::function_traits<Func>::template arg<0>::type)>>> + [[gnu::always_inline]] + Result handle_exception_type_interruptible(Func&& func) { + ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); + using trait = seastar::function_traits<Func>; + static_assert(trait::arity == 1, "func can take only one parameter"); + using ex_type = typename trait::template arg<0>::type; + return core_type::then_wrapped( + [func=std::forward<Func>(func), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (auto&& fut) mutable -> Result { + if (!fut.failed()) { + return seastar::make_ready_future<T>(fut.get()); + } else { + try { + std::rethrow_exception(fut.get_exception()); + } catch (ex_type& ex) { + return call_with_interruption( + interrupt_condition, + std::move(func), ex); + } + } + }); + } + + + using my_type = interruptible_future_detail<InterruptCond, seastar::future<T>>; + + template <typename Func> + [[gnu::always_inline]] + my_type finally(Func&& func) { + return core_type::finally(std::forward<Func>(func)); + } +private: + template <typename Func> + [[gnu::always_inline]] + auto handle_interruption(Func&& func) { + return core_type::then_wrapped( + [func=std::move(func)](auto&& fut) mutable { + if (fut.failed()) { + std::exception_ptr ex = fut.get_exception(); + if (InterruptCond::is_interruption(ex)) { + return seastar::futurize_invoke(std::move(func), std::move(ex)); + } else { + return seastar::make_exception_future<T>(std::move(ex)); + } + } else { + return seastar::make_ready_future<T>(fut.get()); + } + }); + } + + seastar::future<T> to_future() { + return static_cast<core_type&&>(std::move(*this)); + } + // this is only supposed to be invoked by seastar functions + template <typename Func, + typename Result = interrupt_futurize_t< + std::result_of_t<Func(seastar::future<T>)>>> + [[gnu::always_inline]] + Result then_wrapped(Func&& func) { + return core_type::then_wrapped( + [func=std::move(func), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (auto&& fut) mutable { + return call_with_interruption( + interrupt_condition, + std::forward<Func>(func), + std::move(fut)); + }); + } + friend interruptor<InterruptCond>; + friend class interruptible_future_builder<InterruptCond>; + template <typename U> + friend struct ::seastar::futurize; + template <typename> + friend class ::seastar::future; + template <typename HeldState, typename Future> + friend class seastar::internal::do_with_state; + template<typename TX, typename F> + friend inline auto ::seastar::internal::do_with_impl(TX&& rvalue, F&& f); + template<typename T1, typename T2, typename T3_or_F, typename... More> + friend inline auto ::seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, T3_or_F&& rv3, More&&... more); + template <typename T1, typename T2, typename... More> + friend auto seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, More&&... more); + template <typename, typename> + friend class ::crimson::maybe_handle_error_t; + template <typename> + friend class ::seastar::internal::extract_values_from_futures_vector; + template <typename, typename> + friend class interruptible_future_detail; + template <typename ResolvedVectorTransform, typename Future> + friend inline typename ResolvedVectorTransform::future_type + seastar::internal::complete_when_all( + std::vector<Future>&& futures, + typename std::vector<Future>::iterator pos) noexcept; + template <typename> + friend class ::seastar::internal::when_all_state_component; + template <typename Lock, typename Func> + friend inline auto seastar::with_lock(Lock& lock, Func&& f); + template <typename IC, typename FT> + friend class parallel_for_each_state; +}; + +template <typename InterruptCond, typename Errorator> +struct interruptible_errorator { + using base_ertr = Errorator; + using intr_cond_t = InterruptCond; + + template <typename ValueT = void> + using future = interruptible_future_detail<InterruptCond, + typename Errorator::template future<ValueT>>; + + template <class... NewAllowedErrorsT> + using extend = interruptible_errorator< + InterruptCond, + typename Errorator::template extend<NewAllowedErrorsT...>>; + + template <class Ertr> + using extend_ertr = interruptible_errorator< + InterruptCond, + typename Errorator::template extend_ertr<Ertr>>; + + template <typename ValueT = void, typename... A> + static interruptible_future_detail< + InterruptCond, + typename Errorator::template future<ValueT>> + make_ready_future(A&&... value) { + return interruptible_future_detail< + InterruptCond, typename Errorator::template future<ValueT>>( + Errorator::template make_ready_future<ValueT>( + std::forward<A>(value)...)); + } + static interruptible_future_detail< + InterruptCond, + typename Errorator::template future<>> now() { + return interruptible_future_detail< + InterruptCond, typename Errorator::template future<>>( + Errorator::now()); + } + + using pass_further = typename Errorator::pass_further; +}; + +template <typename InterruptCond, + template <typename...> typename ErroratedFuture, + typename T> +class [[nodiscard]] interruptible_future_detail< + InterruptCond, + ErroratedFuture<::crimson::errorated_future_marker<T>>> + : private ErroratedFuture<::crimson::errorated_future_marker<T>> +{ +public: + using core_type = ErroratedFuture<crimson::errorated_future_marker<T>>; + using errorator_type = typename core_type::errorator_type; + using interrupt_errorator_type = + interruptible_errorator<InterruptCond, errorator_type>; + using interrupt_cond_type = InterruptCond; + + template <typename U> + using interrupt_futurize_t = + typename interruptor<InterruptCond>::template futurize_t<U>; + + using core_type::available; + using core_type::failed; + using core_type::core_type; + using core_type::get_exception; + + using value_type = typename core_type::value_type; + + interruptible_future_detail(seastar::future<T>&& fut) + : core_type(std::move(fut)) + {} + + template <template <typename...> typename ErroratedFuture2, + typename... U> + [[gnu::always_inline]] + interruptible_future_detail( + ErroratedFuture2<::crimson::errorated_future_marker<U...>>&& fut) + : core_type(std::move(fut)) {} + + template <template <typename...> typename ErroratedFuture2, + typename... U> + [[gnu::always_inline]] + interruptible_future_detail( + interruptible_future_detail<InterruptCond, + ErroratedFuture2<::crimson::errorated_future_marker<U...>>>&& fut) + : core_type(static_cast<typename std::decay_t<decltype(fut)>::core_type&&>(fut)) { + using src_errorator_t = \ + typename ErroratedFuture2< + ::crimson::errorated_future_marker<U...>>::errorator_type; + static_assert(core_type::errorator_type::template contains_once_v< + src_errorator_t>, + "conversion is only possible from less-or-eq errorated future!"); + } + + [[gnu::always_inline]] + interruptible_future_detail( + interruptible_future_detail<InterruptCond, seastar::future<T>>&& fut) + : core_type(static_cast<seastar::future<T>&&>(fut)) {} + + template <class... A> + [[gnu::always_inline]] + interruptible_future_detail(ready_future_marker, A&&... a) + : core_type(::seastar::make_ready_future<typename core_type::value_type>( + std::forward<A>(a)...)) { + } + [[gnu::always_inline]] + interruptible_future_detail(exception_future_marker, ::seastar::future_state_base&& state) noexcept + : core_type(::seastar::futurize<core_type>::make_exception_future(std::move(state))) { + } + [[gnu::always_inline]] + interruptible_future_detail(exception_future_marker, std::exception_ptr&& ep) noexcept + : core_type(::seastar::futurize<core_type>::make_exception_future(std::move(ep))) { + } + + template<bool interruptible = true, typename ValueInterruptCondT, typename ErrorVisitorT, + std::enable_if_t<!interruptible, int> = 0> + [[gnu::always_inline]] + auto safe_then_interruptible(ValueInterruptCondT&& valfunc, ErrorVisitorT&& errfunc) { + auto fut = core_type::safe_then( + std::forward<ValueInterruptCondT>(valfunc), + std::forward<ErrorVisitorT>(errfunc)); + return (interrupt_futurize_t<decltype(fut)>)(std::move(fut)); + } + + template <typename... Args> + auto si_then(Args&&... args) { + return safe_then_interruptible(std::forward<Args>(args)...); + } + + + template<bool interruptible = true, typename ValueInterruptCondT, typename ErrorVisitorT, + typename U = T, std::enable_if_t<!std::is_void_v<U> && interruptible, int> = 0> + [[gnu::always_inline]] + auto safe_then_interruptible(ValueInterruptCondT&& valfunc, ErrorVisitorT&& errfunc) { + ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); + auto fut = core_type::safe_then( + [func=std::move(valfunc), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (T&& args) mutable { + return call_with_interruption( + interrupt_condition, + std::move(func), + std::forward<T>(args)); + }, [func=std::move(errfunc), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (auto&& err) mutable -> decltype(auto) { + constexpr bool return_void = std::is_void_v< + std::invoke_result_t<ErrorVisitorT, + std::decay_t<decltype(err)>>>; + constexpr bool return_err = ::crimson::is_error_v< + std::decay_t<std::invoke_result_t<ErrorVisitorT, + std::decay_t<decltype(err)>>>>; + if constexpr (return_err || return_void) { + return non_futurized_call_with_interruption( + interrupt_condition, + std::move(func), + std::move(err)); + } else { + return call_with_interruption( + interrupt_condition, + std::move(func), + std::move(err)); + } + }); + return (interrupt_futurize_t<decltype(fut)>)(std::move(fut)); + } + + template<bool interruptible = true, typename ValueInterruptCondT, typename ErrorVisitorT, + typename U = T, std::enable_if_t<std::is_void_v<U> && interruptible, int> = 0> + [[gnu::always_inline]] + auto safe_then_interruptible(ValueInterruptCondT&& valfunc, ErrorVisitorT&& errfunc) { + ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); + auto fut = core_type::safe_then( + [func=std::move(valfunc), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + () mutable { + return call_with_interruption( + interrupt_condition, + std::move(func)); + }, [func=std::move(errfunc), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (auto&& err) mutable -> decltype(auto) { + constexpr bool return_void = std::is_void_v< + std::invoke_result_t<ErrorVisitorT, + std::decay_t<decltype(err)>>>; + constexpr bool return_err = ::crimson::is_error_v< + std::decay_t<std::invoke_result_t<ErrorVisitorT, + std::decay_t<decltype(err)>>>>; + if constexpr (return_err || return_void) { + return non_futurized_call_with_interruption( + interrupt_condition, + std::move(func), + std::move(err)); + } else { + return call_with_interruption( + interrupt_condition, + std::move(func), + std::move(err)); + } + }); + return (interrupt_futurize_t<decltype(fut)>)(std::move(fut)); + } + + template <bool interruptible = true, typename ValueInterruptCondT, + typename U = T, std::enable_if_t<std::is_void_v<T> && interruptible, int> = 0> + [[gnu::always_inline]] + auto safe_then_interruptible(ValueInterruptCondT&& valfunc) { + ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); + auto fut = core_type::safe_then( + [func=std::move(valfunc), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + () mutable { + return call_with_interruption( + interrupt_condition, + std::move(func)); + }); + return (interrupt_futurize_t<decltype(fut)>)(std::move(fut)); + } + + template <typename ValFuncT, typename ErrorFuncT> + [[gnu::always_inline]] + auto safe_then_unpack_interruptible(ValFuncT&& func, ErrorFuncT&& errfunc) { + return safe_then_interruptible([func=std::forward<ValFuncT>(func)](T&& tuple) mutable { + return std::apply(std::forward<ValFuncT>(func), std::move(tuple)); + }, std::forward<ErrorFuncT>(errfunc)); + } + + template <typename ValFuncT> + [[gnu::always_inline]] + auto safe_then_unpack_interruptible(ValFuncT&& func) { + return safe_then_interruptible([func=std::forward<ValFuncT>(func)](T&& tuple) mutable { + return std::apply(std::forward<ValFuncT>(func), std::move(tuple)); + }); + } + + template <bool interruptible = true, typename ValueInterruptCondT, + typename U = T, std::enable_if_t<!std::is_void_v<T> && interruptible, int> = 0> + [[gnu::always_inline]] + auto safe_then_interruptible(ValueInterruptCondT&& valfunc) { + ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); + auto fut = core_type::safe_then( + [func=std::move(valfunc), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (T&& arg) mutable { + return call_with_interruption( + interrupt_condition, + std::move(func), + std::forward<T>(arg)); + }); + return (interrupt_futurize_t<decltype(fut)>)(std::move(fut)); + } + + template <bool interruptible = true, typename ValueInterruptCondT, + std::enable_if_t<!interruptible, int> = 0> + [[gnu::always_inline]] + auto safe_then_interruptible(ValueInterruptCondT&& valfunc) { + auto fut = core_type::safe_then(std::forward<ValueInterruptCondT>(valfunc)); + return (interrupt_futurize_t<decltype(fut)>)(std::move(fut)); + } + + template <typename ValueInterruptCondT, + typename ErrorVisitorHeadT, + typename... ErrorVisitorTailT> + [[gnu::always_inline]] + auto safe_then_interruptible(ValueInterruptCondT&& valfunc, + ErrorVisitorHeadT&& err_func_head, + ErrorVisitorTailT&&... err_func_tail) { + return safe_then_interruptible( + std::forward<ValueInterruptCondT>(valfunc), + ::crimson::composer(std::forward<ErrorVisitorHeadT>(err_func_head), + std::forward<ErrorVisitorTailT>(err_func_tail)...)); + } + + template <typename ValueInterruptCondT, + typename ErrorVisitorHeadT, + typename... ErrorVisitorTailT> + [[gnu::always_inline]] + auto safe_then_interruptible_tuple(ValueInterruptCondT&& valfunc, + ErrorVisitorHeadT&& err_func_head, + ErrorVisitorTailT&&... err_func_tail) { + return safe_then_interruptible( + std::forward<ValueInterruptCondT>(valfunc), + ::crimson::composer(std::forward<ErrorVisitorHeadT>(err_func_head), + std::forward<ErrorVisitorTailT>(err_func_tail)...)); + } + + template <typename ValFuncT, + typename ErrorVisitorHeadT, + typename... ErrorVisitorTailT> + [[gnu::always_inline]] + auto safe_then_unpack_interruptible_tuple( + ValFuncT&& valfunc, + ErrorVisitorHeadT&& err_func_head, + ErrorVisitorTailT&&... err_func_tail) { + return safe_then_interruptible_tuple( + [valfunc=std::forward<ValFuncT>(valfunc)](T&& tuple) mutable { + return std::apply(std::forward<ValFuncT>(valfunc), std::move(tuple)); + }, + ::crimson::composer(std::forward<ErrorVisitorHeadT>(err_func_head), + std::forward<ErrorVisitorTailT>(err_func_tail)...)); + } + + template <bool interruptible = true, typename ErrorFunc> + auto handle_error_interruptible(ErrorFunc&& errfunc) { + if constexpr (interruptible) { + ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); + auto fut = core_type::handle_error( + [errfunc=std::move(errfunc), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (auto&& err) mutable -> decltype(auto) { + constexpr bool return_void = std::is_void_v< + std::invoke_result_t<ErrorFunc, + std::decay_t<decltype(err)>>>; + constexpr bool return_err = ::crimson::is_error_v< + std::decay_t<std::invoke_result_t<ErrorFunc, + std::decay_t<decltype(err)>>>>; + if constexpr (return_err || return_void) { + return non_futurized_call_with_interruption( + interrupt_condition, + std::move(errfunc), + std::move(err)); + } else { + return call_with_interruption( + interrupt_condition, + std::move(errfunc), + std::move(err)); + } + }); + return (interrupt_futurize_t<decltype(fut)>)(std::move(fut)); + } else { + return core_type::handle_error(std::forward<ErrorFunc>(errfunc)); + } + } + + template <typename ErrorFuncHead, + typename... ErrorFuncTail> + auto handle_error_interruptible(ErrorFuncHead&& error_func_head, + ErrorFuncTail&&... error_func_tail) { + ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); + static_assert(sizeof...(ErrorFuncTail) > 0); + return this->handle_error_interruptible( + ::crimson::composer( + std::forward<ErrorFuncHead>(error_func_head), + std::forward<ErrorFuncTail>(error_func_tail)...)); + } + + template <typename Func> + [[gnu::always_inline]] + auto finally(Func&& func) { + auto fut = core_type::finally(std::forward<Func>(func)); + return (interrupt_futurize_t<decltype(fut)>)(std::move(fut)); + } + +private: + using core_type::_then; + template <typename Func> + [[gnu::always_inline]] + auto handle_interruption(Func&& func) { + // see errorator.h safe_then definition + using func_result_t = + typename std::invoke_result<Func, std::exception_ptr>::type; + using func_ertr_t = + typename core_type::template get_errorator_t<func_result_t>; + using this_ertr_t = typename core_type::errorator_type; + using ret_ertr_t = typename this_ertr_t::template extend_ertr<func_ertr_t>; + using futurator_t = typename ret_ertr_t::template futurize<func_result_t>; + return core_type::then_wrapped( + [func=std::move(func), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (auto&& fut) mutable + -> typename futurator_t::type { + if (fut.failed()) { + std::exception_ptr ex = fut.get_exception(); + if (InterruptCond::is_interruption(ex)) { + return futurator_t::invoke(std::move(func), std::move(ex)); + } else { + return futurator_t::make_exception_future(std::move(ex)); + } + } else { + return std::move(fut); + } + }); + } + + ErroratedFuture<::crimson::errorated_future_marker<T>> + to_future() { + return static_cast<core_type&&>(std::move(*this)); + } + + friend class interruptor<InterruptCond>; + friend class interruptible_future_builder<InterruptCond>; + template <typename U> + friend struct ::seastar::futurize; + template <typename> + friend class ::seastar::future; + template<typename TX, typename F> + friend inline auto ::seastar::internal::do_with_impl(TX&& rvalue, F&& f); + template<typename T1, typename T2, typename T3_or_F, typename... More> + friend inline auto ::seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, T3_or_F&& rv3, More&&... more); + template <typename T1, typename T2, typename... More> + friend auto seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, More&&... more); + template <typename HeldState, typename Future> + friend class seastar::internal::do_with_state; + template <typename, typename> + friend class ::crimson::maybe_handle_error_t; + template <typename, typename> + friend class interruptible_future_detail; + template <typename Lock, typename Func> + friend inline auto seastar::with_lock(Lock& lock, Func&& f); + template <typename IC, typename FT> + friend class parallel_for_each_state; +}; + +template <typename InterruptCond, typename T = void> +using interruptible_future = + interruptible_future_detail<InterruptCond, seastar::future<T>>; + +template <typename InterruptCond, typename Errorator, typename T = void> +using interruptible_errorated_future = + interruptible_future_detail< + InterruptCond, + typename Errorator::template future<T>>; + +template <typename InterruptCond> +struct interruptor +{ +public: + using condition = InterruptCond; + + template <typename FutureType> + [[gnu::always_inline]] + static interruptible_future_detail<InterruptCond, FutureType> + make_interruptible(FutureType&& fut) { + return interruptible_future_detail<InterruptCond, FutureType>(std::move(fut)); + } + + [[gnu::always_inline]] + static interruptible_future_detail<InterruptCond, seastar::future<>> now() { + return interruptible_future_detail< + InterruptCond, + seastar::future<>>(seastar::now()); + } + + template <typename ValueT = void, typename... A> + [[gnu::always_inline]] + static interruptible_future_detail<InterruptCond, seastar::future<ValueT>> + make_ready_future(A&&... value) { + return interruptible_future_detail<InterruptCond, seastar::future<ValueT>>( + seastar::make_ready_future<ValueT>(std::forward<A>(value)...)); + } + + template <typename T> + struct futurize { + using type = interruptible_future_detail< + InterruptCond, typename seastar::futurize<T>::type>; + }; + + template <typename FutureType> + struct futurize<interruptible_future_detail<InterruptCond, FutureType>> { + using type = interruptible_future_detail<InterruptCond, FutureType>; + }; + + template <typename T> + using futurize_t = typename futurize<T>::type; + + template <typename Container, typename AsyncAction> + [[gnu::always_inline]] + static auto do_for_each(Container& c, AsyncAction&& action) { + return do_for_each(std::begin(c), std::end(c), + std::forward<AsyncAction>(action)); + } + + template <typename OpFunc, typename OnInterrupt, + typename... Params> + static inline auto with_interruption_cond( + OpFunc&& opfunc, OnInterrupt&& efunc, InterruptCond &&cond, Params&&... params) { + INTR_FUT_DEBUG( + "with_interruption_cond: interrupt_cond: {}", + (void*)interrupt_cond<InterruptCond>.interrupt_cond.get()); + return internal::call_with_interruption_impl( + seastar::make_lw_shared<InterruptCond>(std::move(cond)), + std::forward<OpFunc>(opfunc), + std::forward<Params>(params)... + ).template handle_interruption(std::move(efunc)); + } + + template <typename OpFunc, typename OnInterrupt, + typename... InterruptCondParams> + static inline auto with_interruption( + OpFunc&& opfunc, OnInterrupt&& efunc, InterruptCondParams&&... params) { + return with_interruption_cond( + std::forward<OpFunc>(opfunc), + std::forward<OnInterrupt>(efunc), + InterruptCond(std::forward<InterruptCondParams>(params)...)); + } + + template <typename Error, + typename Func, + typename... Params> + static inline auto with_interruption_to_error( + Func &&f, InterruptCond &&cond, Params&&... params) { + using func_result_t = std::invoke_result_t<Func, Params...>; + using func_ertr_t = + typename seastar::template futurize< + func_result_t>::core_type::errorator_type; + using with_trans_ertr = + typename func_ertr_t::template extend_ertr<errorator<Error>>; + + using value_type = typename func_result_t::value_type; + using ftype = typename std::conditional_t< + std::is_same_v<value_type, seastar::internal::monostate>, + typename with_trans_ertr::template future<>, + typename with_trans_ertr::template future<value_type>>; + + return with_interruption_cond( + std::forward<Func>(f), + [](auto e) -> ftype { + return Error::make(); + }, + std::forward<InterruptCond>(cond), + std::forward<Params>(params)...); + } + + template <typename Func> + [[gnu::always_inline]] + static auto wrap_function(Func&& func) { + return [func=std::forward<Func>(func), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]() mutable { + return call_with_interruption( + interrupt_condition, + std::forward<Func>(func)); + }; + } + + template <typename Iterator, + InvokeReturnsInterruptibleFuture<typename Iterator::reference> AsyncAction> + [[gnu::always_inline]] + static auto do_for_each(Iterator begin, Iterator end, AsyncAction&& action) { + using Result = std::invoke_result_t<AsyncAction, typename Iterator::reference>; + if constexpr (seastar::Future<typename Result::core_type>) { + return make_interruptible( + ::seastar::do_for_each(begin, end, + [action=std::move(action), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (typename Iterator::reference x) mutable { + return call_with_interruption( + interrupt_condition, + std::move(action), + std::forward<decltype(*begin)>(x)).to_future(); + }) + ); + } else { + return make_interruptible( + ::crimson::do_for_each(begin, end, + [action=std::move(action), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (typename Iterator::reference x) mutable { + return call_with_interruption( + interrupt_condition, + std::move(action), + std::forward<decltype(*begin)>(x)).to_future(); + }) + ); + } + } + + template <typename Iterator, typename AsyncAction> + requires (!InvokeReturnsInterruptibleFuture<AsyncAction, typename Iterator::reference>) + [[gnu::always_inline]] + static auto do_for_each(Iterator begin, Iterator end, AsyncAction&& action) { + if constexpr (seastar::InvokeReturnsAnyFuture<AsyncAction, typename Iterator::reference>) { + return make_interruptible( + ::seastar::do_for_each(begin, end, + [action=std::move(action), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (typename Iterator::reference x) mutable { + return call_with_interruption( + interrupt_condition, + std::move(action), + std::forward<decltype(*begin)>(x)); + }) + ); + } else { + return make_interruptible( + ::crimson::do_for_each(begin, end, + [action=std::move(action), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (typename Iterator::reference x) mutable { + return call_with_interruption( + interrupt_condition, + std::move(action), + std::forward<decltype(*begin)>(x)); + }) + ); + } + } + + template <InvokeReturnsInterruptibleFuture AsyncAction> + [[gnu::always_inline]] + static auto repeat(AsyncAction&& action) { + using Result = std::invoke_result_t<AsyncAction>; + if constexpr (seastar::Future<typename Result::core_type>) { + return make_interruptible( + ::seastar::repeat( + [action=std::move(action), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] { + return call_with_interruption( + interrupt_condition, + std::move(action)).to_future(); + }) + ); + } else { + return make_interruptible( + ::crimson::repeat( + [action=std::move(action), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]() mutable { + return call_with_interruption( + interrupt_condition, + std::move(action)).to_future(); + }) + ); + } + } + template <typename AsyncAction> + requires (!InvokeReturnsInterruptibleFuture<AsyncAction>) + [[gnu::always_inline]] + static auto repeat(AsyncAction&& action) { + if constexpr (seastar::InvokeReturnsAnyFuture<AsyncAction>) { + return make_interruptible( + ::seastar::repeat( + [action=std::move(action), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] { + return call_with_interruption( + interrupt_condition, + std::move(action)); + }) + ); + } else { + return make_interruptible( + ::crimson::repeat( + [action=std::move(action), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] { + return call_with_interruption( + interrupt_condition, + std::move(action)); + }) + ); + } + } + + template <typename Iterator, typename Func> + static inline auto parallel_for_each( + Iterator begin, + Iterator end, + Func&& func + ) noexcept { + using ResultType = std::invoke_result_t<Func, typename Iterator::reference>; + parallel_for_each_state<InterruptCond, ResultType>* s = nullptr; + auto decorated_func = + [func=std::forward<Func>(func), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (decltype(*Iterator())&& x) mutable { + return call_with_interruption( + interrupt_condition, + std::forward<Func>(func), + std::forward<decltype(*begin)>(x)); + }; + // Process all elements, giving each future the following treatment: + // - available, not failed: do nothing + // - available, failed: collect exception in ex + // - not available: collect in s (allocating it if needed) + while (begin != end) { + auto f = seastar::futurize_invoke(decorated_func, *begin++); + if (!f.available() || f.failed()) { + if (!s) { + using itraits = std::iterator_traits<Iterator>; + auto n = (seastar::internal::iterator_range_estimate_vector_capacity( + begin, end, typename itraits::iterator_category()) + 1); + s = new parallel_for_each_state<InterruptCond, ResultType>(n); + } + s->add_future(std::move(f)); + } + } + // If any futures were not available, hand off to parallel_for_each_state::start(). + // Otherwise we can return a result immediately. + if (s) { + // s->get_future() takes ownership of s (and chains it to one of the futures it contains) + // so this isn't a leak + return s->get_future(); + } + return parallel_for_each_state<InterruptCond, ResultType>::now(); + } + + template <typename Container, typename Func> + static inline auto parallel_for_each(Container& container, Func&& func) noexcept { + return parallel_for_each( + std::begin(container), + std::end(container), + std::forward<Func>(func)); + } + + template <typename Iterator, typename Mapper, typename Initial, typename Reduce> + static inline interruptible_future<InterruptCond, Initial> map_reduce( + Iterator begin, Iterator end, Mapper&& mapper, Initial initial, Reduce&& reduce) { + struct state { + Initial result; + Reduce reduce; + }; + auto s = seastar::make_lw_shared(state{std::move(initial), std::move(reduce)}); + interruptible_future<InterruptCond> ret = seastar::make_ready_future<>(); + while (begin != end) { + ret = seastar::futurize_invoke(mapper, *begin++).then_wrapped_interruptible( + [s = s.get(), ret = std::move(ret)] (auto f) mutable { + try { + s->result = s->reduce(std::move(s->result), std::move(f.get0())); + return std::move(ret); + } catch (...) { + return std::move(ret).then_wrapped_interruptible([ex = std::current_exception()] (auto f) { + f.ignore_ready_future(); + return seastar::make_exception_future<>(ex); + }); + } + }); + } + return ret.then_interruptible([s] { + return seastar::make_ready_future<Initial>(std::move(s->result)); + }); + } + template <typename Range, typename Mapper, typename Initial, typename Reduce> + static inline interruptible_future<InterruptCond, Initial> map_reduce( + Range&& range, Mapper&& mapper, Initial initial, Reduce&& reduce) { + return map_reduce(std::begin(range), std::end(range), std::forward<Mapper>(mapper), + std::move(initial), std::move(reduce)); + } + + template<typename Fut> + requires seastar::Future<Fut> || IsInterruptibleFuture<Fut> + static auto futurize_invoke_if_func(Fut&& fut) noexcept { + return std::forward<Fut>(fut); + } + + template<typename Func> + requires (!seastar::Future<Func>) && (!IsInterruptibleFuture<Func>) + static auto futurize_invoke_if_func(Func&& func) noexcept { + return seastar::futurize_invoke(std::forward<Func>(func)); + } + + template <typename... FutOrFuncs> + static inline auto when_all(FutOrFuncs&&... fut_or_funcs) noexcept { + return ::seastar::internal::when_all_impl( + futurize_invoke_if_func(std::forward<FutOrFuncs>(fut_or_funcs))...); + } + + template <typename... FutOrFuncs> + static inline auto when_all_succeed(FutOrFuncs&&... fut_or_funcs) noexcept { + return ::seastar::internal::when_all_succeed_impl( + futurize_invoke_if_func(std::forward<FutOrFuncs>(fut_or_funcs))...); + } + + template <typename Func, + typename Result = futurize_t<std::invoke_result_t<Func>>> + static inline Result async(Func&& func) { + auto interruption_condition = interrupt_cond<InterruptCond>.interrupt_cond; + INTR_FUT_DEBUG( + "interruptible_future_detail::async() yielding out, " + "interrupt_cond {},{} cleared", + (void*)interruption_condition.get(), + typeid(InterruptCond).name()); + interrupt_cond<InterruptCond>.reset(); + auto ret = seastar::async([func=std::forward<Func>(func), + interruption_condition] () mutable { + return non_futurized_call_with_interruption( + interruption_condition, std::forward<Func>(func)); + }); + interrupt_cond<InterruptCond>.set(interruption_condition); + INTR_FUT_DEBUG( + "interruptible_future_detail::async() yield back, interrupt_cond: {},{}", + (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(), + typeid(InterruptCond).name()); + return ret; + } + + template <class FutureT> + static decltype(auto) green_get(FutureT&& fut) { + if (fut.available()) { + return fut.get(); + } else { + // destined to wait! + auto interruption_condition = interrupt_cond<InterruptCond>.interrupt_cond; + INTR_FUT_DEBUG( + "green_get() waiting, interrupt_cond: {},{}", + (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(), + typeid(InterruptCond).name()); + interrupt_cond<InterruptCond>.reset(); + auto&& value = fut.get(); + interrupt_cond<InterruptCond>.set(interruption_condition); + INTR_FUT_DEBUG( + "green_get() got, interrupt_cond: {},{}", + (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(), + typeid(InterruptCond).name()); + return std::move(value); + } + } + + static void yield() { + ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); + auto interruption_condition = interrupt_cond<InterruptCond>.interrupt_cond; + INTR_FUT_DEBUG( + "interruptible_future_detail::yield() yielding out, " + "interrupt_cond {},{} cleared", + (void*)interruption_condition.get(), + typeid(InterruptCond).name()); + interrupt_cond<InterruptCond>.reset(); + seastar::thread::yield(); + interrupt_cond<InterruptCond>.set(interruption_condition); + INTR_FUT_DEBUG( + "interruptible_future_detail::yield() yield back, interrupt_cond: {},{}", + (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(), + typeid(InterruptCond).name()); + } + + static void maybe_yield() { + ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); + if (seastar::thread::should_yield()) { + auto interruption_condition = interrupt_cond<InterruptCond>.interrupt_cond; + INTR_FUT_DEBUG( + "interruptible_future_detail::may_yield() yielding out, " + "interrupt_cond {},{} cleared", + (void*)interruption_condition.get(), + typeid(InterruptCond).name()); + interrupt_cond<InterruptCond>.reset(); + seastar::thread::yield(); + interrupt_cond<InterruptCond>.set(interruption_condition); + INTR_FUT_DEBUG( + "interruptible_future_detail::may_yield() yield back, interrupt_cond: {},{}", + (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(), + typeid(InterruptCond).name()); + } + } +}; + +} // namespace crimson::interruptible + +namespace seastar { + +template <typename InterruptCond, typename... T> +struct futurize<::crimson::interruptible::interruptible_future_detail< + InterruptCond, seastar::future<T...>>> { + using type = ::crimson::interruptible::interruptible_future_detail< + InterruptCond, seastar::future<T...>>; + + using value_type = typename type::value_type; + using tuple_type = typename type::tuple_type; + + static type from_tuple(tuple_type&& value) { + return type(ready_future_marker(), std::move(value)); + } + static type from_tuple(const tuple_type& value) { + return type(ready_future_marker(), value); + } + static type from_tuple(value_type&& value) { + return type(ready_future_marker(), std::move(value)); + } + static type from_tuple(const value_type& value) { + return type(ready_future_marker(), value); + } + + template <typename Func, typename... FuncArgs> + [[gnu::always_inline]] + static inline type invoke(Func&& func, FuncArgs&&... args) noexcept { + try { + return func(std::forward<FuncArgs>(args)...); + } catch (...) { + return make_exception_future(std::current_exception()); + } + } + + template <typename Func> + [[gnu::always_inline]] + static type invoke(Func&& func, seastar::internal::monostate) noexcept { + try { + return ::seastar::futurize_invoke(std::forward<Func>(func)); + } catch (...) { + return make_exception_future(std::current_exception()); + } + } + + template <typename Arg> + static inline type make_exception_future(Arg&& arg) noexcept { + return seastar::make_exception_future<T...>(std::forward<Arg>(arg)); + } + + static inline type make_exception_future(future_state_base&& state) noexcept { + return seastar::internal::make_exception_future<T...>(std::move(state)); + } + + template<typename PromiseT, typename Func> + static void satisfy_with_result_of(PromiseT&& pr, Func&& func) { + func().forward_to(std::move(pr)); + } +}; + +template <typename InterruptCond, + template <typename...> typename ErroratedFuture, + typename... T> +struct futurize< + ::crimson::interruptible::interruptible_future_detail< + InterruptCond, + ErroratedFuture<::crimson::errorated_future_marker<T...>> + > +> { + using type = ::crimson::interruptible::interruptible_future_detail< + InterruptCond, + ErroratedFuture<::crimson::errorated_future_marker<T...>>>; + using core_type = ErroratedFuture< + ::crimson::errorated_future_marker<T...>>; + using errorator_type = + ::crimson::interruptible::interruptible_errorator< + InterruptCond, + typename ErroratedFuture< + ::crimson::errorated_future_marker<T...>>::errorator_type>; + + template<typename Func, typename... FuncArgs> + static inline type invoke(Func&& func, FuncArgs&&... args) noexcept { + try { + return func(std::forward<FuncArgs>(args)...); + } catch (...) { + return make_exception_future(std::current_exception()); + } + } + + template <typename Func> + [[gnu::always_inline]] + static type invoke(Func&& func, seastar::internal::monostate) noexcept { + try { + return ::seastar::futurize_invoke(std::forward<Func>(func)); + } catch (...) { + return make_exception_future(std::current_exception()); + } + } + + template <typename Arg> + static inline type make_exception_future(Arg&& arg) noexcept { + return core_type::errorator_type::template make_exception_future2<T...>( + std::forward<Arg>(arg)); + } + + template<typename PromiseT, typename Func> + static void satisfy_with_result_of(PromiseT&& pr, Func&& func) { + func().forward_to(std::move(pr)); + } + +}; + +template <typename InterruptCond, typename FutureType> +struct continuation_base_from_future< + ::crimson::interruptible::interruptible_future_detail<InterruptCond, FutureType>> { + using type = typename seastar::continuation_base_from_future<FutureType>::type; +}; + +template <typename InterruptCond, typename FutureType> +struct is_future< + ::crimson::interruptible::interruptible_future_detail< + InterruptCond, + FutureType>> + : std::true_type {}; +} // namespace seastar |