diff options
Diffstat (limited to 'src/common/async')
-rw-r--r-- | src/common/async/bind_handler.h | 111 | ||||
-rw-r--r-- | src/common/async/bind_like.h | 39 | ||||
-rw-r--r-- | src/common/async/blocked_completion.h | 290 | ||||
-rw-r--r-- | src/common/async/completion.h | 320 | ||||
-rw-r--r-- | src/common/async/context_pool.h | 94 | ||||
-rw-r--r-- | src/common/async/detail/shared_lock.h | 185 | ||||
-rw-r--r-- | src/common/async/detail/shared_mutex.h | 326 | ||||
-rw-r--r-- | src/common/async/forward_handler.h | 103 | ||||
-rw-r--r-- | src/common/async/librados_completion.h | 125 | ||||
-rw-r--r-- | src/common/async/shared_mutex.h | 212 | ||||
-rw-r--r-- | src/common/async/waiter.h | 223 | ||||
-rw-r--r-- | src/common/async/yield_context.h | 59 |
12 files changed, 2087 insertions, 0 deletions
diff --git a/src/common/async/bind_handler.h b/src/common/async/bind_handler.h new file mode 100644 index 000000000..516d8a5e8 --- /dev/null +++ b/src/common/async/bind_handler.h @@ -0,0 +1,111 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_ASYNC_BIND_HANDLER_H +#define CEPH_ASYNC_BIND_HANDLER_H + +#include <tuple> +#include <boost/asio.hpp> + +namespace ceph::async { + +/** + * A bound completion handler for use with boost::asio. + * + * A completion handler wrapper that allows a tuple of arguments to be forwarded + * to the original Handler. This is intended for use with boost::asio functions + * like defer(), dispatch() and post() which expect handlers which are callable + * with no arguments. + * + * The original Handler's associated allocator and executor are maintained. + * + * @see bind_handler + */ +template <typename Handler, typename Tuple> +struct CompletionHandler { + Handler handler; + Tuple args; + + CompletionHandler(Handler&& handler, Tuple&& args) + : handler(std::move(handler)), + args(std::move(args)) + {} + + void operator()() & { + std::apply(handler, args); + } + void operator()() const & { + std::apply(handler, args); + } + void operator()() && { + std::apply(std::move(handler), std::move(args)); + } + + using allocator_type = boost::asio::associated_allocator_t<Handler>; + allocator_type get_allocator() const noexcept { + return boost::asio::get_associated_allocator(handler); + } +}; + +} // namespace ceph::async + +namespace boost::asio { + +// specialize boost::asio::associated_executor<> for CompletionHandler +template <typename Handler, typename Tuple, typename Executor> +struct associated_executor<ceph::async::CompletionHandler<Handler, Tuple>, Executor> { + using type = boost::asio::associated_executor_t<Handler, Executor>; + + static type get(const ceph::async::CompletionHandler<Handler, Tuple>& handler, + const Executor& ex = Executor()) noexcept { + return boost::asio::get_associated_executor(handler.handler, ex); + } +}; + +} // namespace boost::asio + +namespace ceph::async { + +/** + * Returns a wrapped completion handler with bound arguments. + * + * Binds the given arguments to a handler, and returns a CompletionHandler that + * is callable with no arguments. This is similar to std::bind(), except that + * all arguments must be provided. Move-only argument types are supported as + * long as the CompletionHandler's 'operator() &&' overload is used, i.e. + * std::move(handler)(). + * + * Example use: + * + * // bind the arguments (5, "hello") to a callback lambda: + * auto callback = [] (int a, std::string b) {}; + * auto handler = bind_handler(callback, 5, "hello"); + * + * // execute the bound handler on an io_context: + * boost::asio::io_context context; + * boost::asio::post(context, std::move(handler)); + * context.run(); + * + * @see CompletionHandler + */ +template <typename Handler, typename ...Args> +auto bind_handler(Handler&& h, Args&& ...args) +{ + return CompletionHandler{std::forward<Handler>(h), + std::make_tuple(std::forward<Args>(args)...)}; +} + +} // namespace ceph::async + +#endif // CEPH_ASYNC_BIND_HANDLER_H diff --git a/src/common/async/bind_like.h b/src/common/async/bind_like.h new file mode 100644 index 000000000..c360eac0a --- /dev/null +++ b/src/common/async/bind_like.h @@ -0,0 +1,39 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2020 Red Hat <contact@redhat.com> + * Author: Adam C. Emerson + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include <boost/asio/associated_allocator.hpp> +#include <boost/asio/associated_executor.hpp> +#include <boost/asio/bind_allocator.hpp> +#include <boost/asio/bind_executor.hpp> + +namespace ceph::async { +template<typename Executor, typename Allocator, typename Completion> +auto bind_ea(const Executor& executor, const Allocator& allocator, + Completion&& completion) { + return bind_allocator(allocator, + boost::asio::bind_executor( + executor, + std::forward<Completion>(completion))); +} + + +// Bind `Completion` to the executor and allocator of `Proto` +template<typename Proto, typename Completion> +auto bind_like(const Proto& proto, Completion&& completion) { + return bind_ea(boost::asio::get_associated_executor(proto), + boost::asio::get_associated_allocator(proto), + std::forward<Completion>(completion)); +} +} diff --git a/src/common/async/blocked_completion.h b/src/common/async/blocked_completion.h new file mode 100644 index 000000000..23a1319bc --- /dev/null +++ b/src/common/async/blocked_completion.h @@ -0,0 +1,290 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2020 Red Hat + * Author: Adam C. Emerson <aemerson@redhat.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_COMMON_ASYNC_BLOCKED_COMPLETION_H +#define CEPH_COMMON_ASYNC_BLOCKED_COMPLETION_H + +#include <atomic> +#include <condition_variable> +#include <mutex> +#include <optional> +#include <type_traits> + +#include <boost/asio/async_result.hpp> + +#include <boost/system/error_code.hpp> +#include <boost/system/system_error.hpp> + +namespace ceph::async { + +namespace bs = boost::system; + +class use_blocked_t { + use_blocked_t(bs::error_code* ec) : ec(ec) {} +public: + use_blocked_t() = default; + + use_blocked_t operator [](bs::error_code& _ec) const { + return use_blocked_t(&_ec); + } + + bs::error_code* ec = nullptr; +}; + +inline constexpr use_blocked_t use_blocked; + +namespace detail { + +template<typename... Ts> +struct blocked_handler +{ + blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {} + + void operator ()(Ts... values) noexcept { + std::scoped_lock l(*m); + *ec = bs::error_code{}; + *value = std::forward_as_tuple(std::move(values)...); + *done = true; + cv->notify_one(); + } + + void operator ()(bs::error_code ec, Ts... values) noexcept { + std::scoped_lock l(*m); + *this->ec = ec; + *value = std::forward_as_tuple(std::move(values)...); + *done = true; + cv->notify_one(); + } + + bs::error_code* ec; + std::optional<std::tuple<Ts...>>* value = nullptr; + std::mutex* m = nullptr; + std::condition_variable* cv = nullptr; + bool* done = nullptr; +}; + +template<typename T> +struct blocked_handler<T> +{ + blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {} + + void operator ()(T value) noexcept { + std::scoped_lock l(*m); + *ec = bs::error_code(); + *this->value = std::move(value); + *done = true; + cv->notify_one(); + } + + void operator ()(bs::error_code ec, T value) noexcept { + std::scoped_lock l(*m); + *this->ec = ec; + *this->value = std::move(value); + *done = true; + cv->notify_one(); + } + + //private: + bs::error_code* ec; + std::optional<T>* value; + std::mutex* m = nullptr; + std::condition_variable* cv = nullptr; + bool* done = nullptr; +}; + +template<> +struct blocked_handler<void> +{ + blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {} + + void operator ()() noexcept { + std::scoped_lock l(*m); + *ec = bs::error_code{}; + *done = true; + cv->notify_one(); + } + + void operator ()(bs::error_code ec) noexcept { + std::scoped_lock l(*m); + *this->ec = ec; + *done = true; + cv->notify_one(); + } + + bs::error_code* ec; + std::mutex* m = nullptr; + std::condition_variable* cv = nullptr; + bool* done = nullptr; +}; + +template<typename... Ts> +class blocked_result +{ +public: + using completion_handler_type = blocked_handler<Ts...>; + using return_type = std::tuple<Ts...>; + + explicit blocked_result(completion_handler_type& h) noexcept { + std::scoped_lock l(m); + out_ec = h.ec; + if (!out_ec) h.ec = &ec; + h.value = &value; + h.m = &m; + h.cv = &cv; + h.done = &done; + } + + return_type get() { + std::unique_lock l(m); + cv.wait(l, [this]() { return done; }); + if (!out_ec && ec) throw bs::system_error(ec); + return std::move(*value); + } + + blocked_result(const blocked_result&) = delete; + blocked_result& operator =(const blocked_result&) = delete; + blocked_result(blocked_result&&) = delete; + blocked_result& operator =(blocked_result&&) = delete; + +private: + bs::error_code* out_ec; + bs::error_code ec; + std::optional<return_type> value; + std::mutex m; + std::condition_variable cv; + bool done = false; +}; + +template<typename T> +class blocked_result<T> +{ +public: + using completion_handler_type = blocked_handler<T>; + using return_type = T; + + explicit blocked_result(completion_handler_type& h) noexcept { + std::scoped_lock l(m); + out_ec = h.ec; + if (!out_ec) h.ec = &ec; + h.value = &value; + h.m = &m; + h.cv = &cv; + h.done = &done; + } + + return_type get() { + std::unique_lock l(m); + cv.wait(l, [this]() { return done; }); + if (!out_ec && ec) throw bs::system_error(ec); + return std::move(*value); + } + + blocked_result(const blocked_result&) = delete; + blocked_result& operator =(const blocked_result&) = delete; + blocked_result(blocked_result&&) = delete; + blocked_result& operator =(blocked_result&&) = delete; + +private: + bs::error_code* out_ec; + bs::error_code ec; + std::optional<return_type> value; + std::mutex m; + std::condition_variable cv; + bool done = false; +}; + +template<> +class blocked_result<void> +{ +public: + using completion_handler_type = blocked_handler<void>; + using return_type = void; + + explicit blocked_result(completion_handler_type& h) noexcept { + std::scoped_lock l(m); + out_ec = h.ec; + if (!out_ec) h.ec = &ec; + h.m = &m; + h.cv = &cv; + h.done = &done; + } + + void get() { + std::unique_lock l(m); + cv.wait(l, [this]() { return done; }); + if (!out_ec && ec) throw bs::system_error(ec); + } + + blocked_result(const blocked_result&) = delete; + blocked_result& operator =(const blocked_result&) = delete; + blocked_result(blocked_result&&) = delete; + blocked_result& operator =(blocked_result&&) = delete; + +private: + bs::error_code* out_ec; + bs::error_code ec; + std::mutex m; + std::condition_variable cv; + bool done = false; +}; +} // namespace detail +} // namespace ceph::async + + +namespace boost::asio { +template<typename ReturnType> +class async_result<ceph::async::use_blocked_t, ReturnType()> + : public ceph::async::detail::blocked_result<void> +{ +public: + explicit async_result(typename ceph::async::detail::blocked_result<void> + ::completion_handler_type& h) + : ceph::async::detail::blocked_result<void>(h) {} +}; + +template<typename ReturnType, typename... Args> +class async_result<ceph::async::use_blocked_t, ReturnType(Args...)> + : public ceph::async::detail::blocked_result<std::decay_t<Args>...> +{ +public: + explicit async_result( + typename ceph::async::detail::blocked_result<std::decay_t<Args>...>::completion_handler_type& h) + : ceph::async::detail::blocked_result<std::decay_t<Args>...>(h) {} +}; + +template<typename ReturnType> +class async_result<ceph::async::use_blocked_t, + ReturnType(boost::system::error_code)> + : public ceph::async::detail::blocked_result<void> +{ +public: + explicit async_result( + typename ceph::async::detail::blocked_result<void>::completion_handler_type& h) + : ceph::async::detail::blocked_result<void>(h) {} +}; + +template<typename ReturnType, typename... Args> +class async_result<ceph::async::use_blocked_t, + ReturnType(boost::system::error_code, Args...)> + : public ceph::async::detail::blocked_result<std::decay_t<Args>...> +{ +public: + explicit async_result( + typename ceph::async::detail::blocked_result<std::decay_t<Args>...>::completion_handler_type& h) + : ceph::async::detail::blocked_result<std::decay_t<Args>...>(h) {} +}; +} + +#endif // !CEPH_COMMON_ASYNC_BLOCKED_COMPLETION_H diff --git a/src/common/async/completion.h b/src/common/async/completion.h new file mode 100644 index 000000000..6af9109d5 --- /dev/null +++ b/src/common/async/completion.h @@ -0,0 +1,320 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_ASYNC_COMPLETION_H +#define CEPH_ASYNC_COMPLETION_H + +#include <memory> + +#include "bind_handler.h" +#include "forward_handler.h" + +namespace ceph::async { + +/** + * Abstract completion handler interface for use with boost::asio. + * + * Memory management is performed using the Handler's 'associated allocator', + * which carries the additional requirement that its memory be released before + * the Handler is invoked. This allows memory allocated for one asynchronous + * operation to be reused in its continuation. Because of this requirement, any + * calls to invoke the completion must first release ownership of it. To enforce + * this, the static functions defer()/dispatch()/post() take the completion by + * rvalue-reference to std::unique_ptr<Completion>, i.e. std::move(completion). + * + * Handlers may also have an 'associated executor', so the calls to defer(), + * dispatch(), and post() are forwarded to that executor. If there is no + * associated executor (which is generally the case unless one was bound with + * boost::asio::bind_executor()), the executor passed to Completion::create() + * is used as a default. + * + * Example use: + * + * // declare a Completion type with Signature = void(int, string) + * using MyCompletion = ceph::async::Completion<void(int, string)>; + * + * // create a completion with the given callback: + * std::unique_ptr<MyCompletion> c; + * c = MyCompletion::create(ex, [] (int a, const string& b) {}); + * + * // bind arguments to the callback and post to its associated executor: + * MyCompletion::post(std::move(c), 5, "hello"); + * + * + * Additional user data may be stored along with the Completion to take + * advantage of the handler allocator optimization. This is accomplished by + * specifying its type in the template parameter T. For example, the type + * Completion<void(), int> contains a public member variable 'int user_data'. + * Any additional arguments to Completion::create() will be forwarded to type + * T's constructor. + * + * If the AsBase<T> type tag is used, as in Completion<void(), AsBase<T>>, + * the Completion will inherit from T instead of declaring it as a member + * variable. + * + * When invoking the completion handler via defer(), dispatch(), or post(), + * care must be taken when passing arguments that refer to user data, because + * its memory is destroyed prior to invocation. In such cases, the user data + * should be moved/copied out of the Completion first. + */ +template <typename Signature, typename T = void> +class Completion; + + +/// type tag for UserData +template <typename T> struct AsBase {}; + +namespace detail { + +/// optional user data to be stored with the Completion +template <typename T> +struct UserData { + T user_data; + template <typename ...Args> + UserData(Args&& ...args) + : user_data(std::forward<Args>(args)...) + {} +}; +// AsBase specialization inherits from T +template <typename T> +struct UserData<AsBase<T>> : public T { + template <typename ...Args> + UserData(Args&& ...args) + : T(std::forward<Args>(args)...) + {} +}; +// void specialization +template <> +class UserData<void> {}; + +} // namespace detail + + +// template specialization to pull the Signature's args apart +template <typename T, typename ...Args> +class Completion<void(Args...), T> : public detail::UserData<T> { + protected: + // internal interfaces for type-erasure on the Handler/Executor. uses + // tuple<Args...> to provide perfect forwarding because you can't make + // virtual function templates + virtual void destroy_defer(std::tuple<Args...>&& args) = 0; + virtual void destroy_dispatch(std::tuple<Args...>&& args) = 0; + virtual void destroy_post(std::tuple<Args...>&& args) = 0; + virtual void destroy() = 0; + + // constructor is protected, use create(). any constructor arguments are + // forwarded to UserData + template <typename ...TArgs> + Completion(TArgs&& ...args) + : detail::UserData<T>(std::forward<TArgs>(args)...) + {} + public: + virtual ~Completion() = default; + + // use the virtual destroy() interface on delete. this allows the derived + // class to manage its memory using Handler allocators, without having to use + // a custom Deleter for std::unique_ptr<> + static void operator delete(void *p) { + static_cast<Completion*>(p)->destroy(); + } + + /// completion factory function that uses the handler's associated allocator. + /// any additional arguments are forwared to T's constructor + template <typename Executor1, typename Handler, typename ...TArgs> + static std::unique_ptr<Completion> + create(const Executor1& ex1, Handler&& handler, TArgs&& ...args); + + /// take ownership of the completion, bind any arguments to the completion + /// handler, then defer() it on its associated executor + template <typename ...Args2> + static void defer(std::unique_ptr<Completion>&& c, Args2&&...args); + + /// take ownership of the completion, bind any arguments to the completion + /// handler, then dispatch() it on its associated executor + template <typename ...Args2> + static void dispatch(std::unique_ptr<Completion>&& c, Args2&&...args); + + /// take ownership of the completion, bind any arguments to the completion + /// handler, then post() it to its associated executor + template <typename ...Args2> + static void post(std::unique_ptr<Completion>&& c, Args2&&...args); +}; + +namespace detail { + +// concrete Completion that knows how to invoke the completion handler. this +// observes all of the 'Requirements on asynchronous operations' specified by +// the C++ Networking TS +template <typename Executor1, typename Handler, typename T, typename ...Args> +class CompletionImpl final : public Completion<void(Args...), T> { + // use Handler's associated executor (or Executor1 by default) for callbacks + using Executor2 = boost::asio::associated_executor_t<Handler, Executor1>; + // maintain work on both executors + using Work1 = boost::asio::executor_work_guard<Executor1>; + using Work2 = boost::asio::executor_work_guard<Executor2>; + std::pair<Work1, Work2> work; + Handler handler; + + // use Handler's associated allocator + using Alloc2 = boost::asio::associated_allocator_t<Handler>; + using Traits2 = std::allocator_traits<Alloc2>; + using RebindAlloc2 = typename Traits2::template rebind_alloc<CompletionImpl>; + using RebindTraits2 = std::allocator_traits<RebindAlloc2>; + + // placement new for the handler allocator + static void* operator new(size_t, RebindAlloc2 alloc2) { + return RebindTraits2::allocate(alloc2, 1); + } + // placement delete for when the constructor throws during placement new + static void operator delete(void *p, RebindAlloc2 alloc2) { + RebindTraits2::deallocate(alloc2, static_cast<CompletionImpl*>(p), 1); + } + + static auto bind_and_forward(Handler&& h, std::tuple<Args...>&& args) { + return forward_handler(CompletionHandler{std::move(h), std::move(args)}); + } + + void destroy_defer(std::tuple<Args...>&& args) override { + auto w = std::move(work); + auto f = bind_and_forward(std::move(handler), std::move(args)); + RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler); + RebindTraits2::destroy(alloc2, this); + RebindTraits2::deallocate(alloc2, this, 1); + w.second.get_executor().defer(std::move(f), alloc2); + } + void destroy_dispatch(std::tuple<Args...>&& args) override { + auto w = std::move(work); + auto f = bind_and_forward(std::move(handler), std::move(args)); + RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler); + RebindTraits2::destroy(alloc2, this); + RebindTraits2::deallocate(alloc2, this, 1); + w.second.get_executor().dispatch(std::move(f), alloc2); + } + void destroy_post(std::tuple<Args...>&& args) override { + auto w = std::move(work); + auto f = bind_and_forward(std::move(handler), std::move(args)); + RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler); + RebindTraits2::destroy(alloc2, this); + RebindTraits2::deallocate(alloc2, this, 1); + w.second.get_executor().post(std::move(f), alloc2); + } + void destroy() override { + RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler); + RebindTraits2::destroy(alloc2, this); + RebindTraits2::deallocate(alloc2, this, 1); + } + + // constructor is private, use create(). extra constructor arguments are + // forwarded to UserData + template <typename ...TArgs> + CompletionImpl(const Executor1& ex1, Handler&& handler, TArgs&& ...args) + : Completion<void(Args...), T>(std::forward<TArgs>(args)...), + work(ex1, boost::asio::make_work_guard(handler, ex1)), + handler(std::move(handler)) + {} + + public: + template <typename ...TArgs> + static auto create(const Executor1& ex, Handler&& handler, TArgs&& ...args) { + auto alloc2 = boost::asio::get_associated_allocator(handler); + using Ptr = std::unique_ptr<CompletionImpl>; + return Ptr{new (alloc2) CompletionImpl(ex, std::move(handler), + std::forward<TArgs>(args)...)}; + } + + static void operator delete(void *p) { + static_cast<CompletionImpl*>(p)->destroy(); + } +}; + +} // namespace detail + + +template <typename T, typename ...Args> +template <typename Executor1, typename Handler, typename ...TArgs> +std::unique_ptr<Completion<void(Args...), T>> +Completion<void(Args...), T>::create(const Executor1& ex, + Handler&& handler, TArgs&& ...args) +{ + using Impl = detail::CompletionImpl<Executor1, Handler, T, Args...>; + return Impl::create(ex, std::forward<Handler>(handler), + std::forward<TArgs>(args)...); +} + +template <typename T, typename ...Args> +template <typename ...Args2> +void Completion<void(Args...), T>::defer(std::unique_ptr<Completion>&& ptr, + Args2&& ...args) +{ + auto c = ptr.release(); + c->destroy_defer(std::make_tuple(std::forward<Args2>(args)...)); +} + +template <typename T, typename ...Args> +template <typename ...Args2> +void Completion<void(Args...), T>::dispatch(std::unique_ptr<Completion>&& ptr, + Args2&& ...args) +{ + auto c = ptr.release(); + c->destroy_dispatch(std::make_tuple(std::forward<Args2>(args)...)); +} + +template <typename T, typename ...Args> +template <typename ...Args2> +void Completion<void(Args...), T>::post(std::unique_ptr<Completion>&& ptr, + Args2&& ...args) +{ + auto c = ptr.release(); + c->destroy_post(std::make_tuple(std::forward<Args2>(args)...)); +} + + +/// completion factory function that uses the handler's associated allocator. +/// any additional arguments are forwared to T's constructor +template <typename Signature, typename T, typename Executor1, + typename Handler, typename ...TArgs> +std::unique_ptr<Completion<Signature, T>> +create_completion(const Executor1& ex, Handler&& handler, TArgs&& ...args) +{ + return Completion<Signature, T>::create(ex, std::forward<Handler>(handler), + std::forward<TArgs>(args)...); +} + +/// take ownership of the completion, bind any arguments to the completion +/// handler, then defer() it on its associated executor +template <typename Signature, typename T, typename ...Args> +void defer(std::unique_ptr<Completion<Signature, T>>&& ptr, Args&& ...args) +{ + Completion<Signature, T>::defer(std::move(ptr), std::forward<Args>(args)...); +} + +/// take ownership of the completion, bind any arguments to the completion +/// handler, then dispatch() it on its associated executor +template <typename Signature, typename T, typename ...Args> +void dispatch(std::unique_ptr<Completion<Signature, T>>&& ptr, Args&& ...args) +{ + Completion<Signature, T>::dispatch(std::move(ptr), std::forward<Args>(args)...); +} + +/// take ownership of the completion, bind any arguments to the completion +/// handler, then post() it to its associated executor +template <typename Signature, typename T, typename ...Args> +void post(std::unique_ptr<Completion<Signature, T>>&& ptr, Args&& ...args) +{ + Completion<Signature, T>::post(std::move(ptr), std::forward<Args>(args)...); +} + +} // namespace ceph::async + +#endif // CEPH_ASYNC_COMPLETION_H diff --git a/src/common/async/context_pool.h b/src/common/async/context_pool.h new file mode 100644 index 000000000..9c6cab767 --- /dev/null +++ b/src/common/async/context_pool.h @@ -0,0 +1,94 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat <contact@redhat.com> + * Author: Adam C. Emerson <aemerson@redhat.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_COMMON_ASYNC_CONTEXT_POOL_H +#define CEPH_COMMON_ASYNC_CONTEXT_POOL_H + +#include <cstddef> +#include <cstdint> +#include <mutex> +#include <optional> +#include <thread> +#include <vector> + +#include <boost/asio/io_context.hpp> +#include <boost/asio/executor_work_guard.hpp> + +#include "common/ceph_mutex.h" +#include "common/Thread.h" + +namespace ceph::async { +class io_context_pool { + std::vector<std::thread> threadvec; + boost::asio::io_context ioctx; + std::optional<boost::asio::executor_work_guard< + boost::asio::io_context::executor_type>> guard; + ceph::mutex m = make_mutex("ceph::io_context_pool::m"); + + void cleanup() noexcept { + guard = std::nullopt; + for (auto& th : threadvec) { + th.join(); + } + threadvec.clear(); + } +public: + io_context_pool() noexcept {} + io_context_pool(std::int16_t threadcnt) noexcept { + start(threadcnt); + } + ~io_context_pool() { + stop(); + } + void start(std::int16_t threadcnt) noexcept { + auto l = std::scoped_lock(m); + if (threadvec.empty()) { + guard.emplace(boost::asio::make_work_guard(ioctx)); + ioctx.restart(); + for (std::int16_t i = 0; i < threadcnt; ++i) { + threadvec.emplace_back(make_named_thread("io_context_pool", + [this]() { + ioctx.run(); + })); + } + } + } + void finish() noexcept { + auto l = std::scoped_lock(m); + if (!threadvec.empty()) { + cleanup(); + } + } + void stop() noexcept { + auto l = std::scoped_lock(m); + if (!threadvec.empty()) { + ioctx.stop(); + cleanup(); + } + } + + boost::asio::io_context& get_io_context() { + return ioctx; + } + operator boost::asio::io_context&() { + return ioctx; + } + boost::asio::io_context::executor_type get_executor() { + return ioctx.get_executor(); + } +}; +} + +#endif // CEPH_COMMON_ASYNC_CONTEXT_POOL_H diff --git a/src/common/async/detail/shared_lock.h b/src/common/async/detail/shared_lock.h new file mode 100644 index 000000000..12e6a9220 --- /dev/null +++ b/src/common/async/detail/shared_lock.h @@ -0,0 +1,185 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +namespace std { + +// specialize unique_lock and shared_lock for SharedMutex to operate on +// SharedMutexImpl instead, because the locks may outlive the SharedMutex itself + +template <typename Executor> +class unique_lock<ceph::async::SharedMutex<Executor>> { + public: + using mutex_type = boost::intrusive_ptr<ceph::async::detail::SharedMutexImpl>; + + unique_lock() = default; + explicit unique_lock(ceph::async::SharedMutex<Executor>& m) + : impl(m.impl), locked(true) + { + impl->lock(); + } + unique_lock(ceph::async::SharedMutex<Executor>& m, defer_lock_t t) noexcept + : impl(m.impl) + {} + unique_lock(ceph::async::SharedMutex<Executor>& m, try_to_lock_t t) + : impl(m.impl), locked(impl->try_lock()) + {} + unique_lock(ceph::async::SharedMutex<Executor>& m, adopt_lock_t t) noexcept + : impl(m.impl), locked(true) + {} + ~unique_lock() { + if (impl && locked) + impl->unlock(); + } + + unique_lock(unique_lock&& other) noexcept + : impl(std::move(other.impl)), + locked(other.locked) { + other.locked = false; + } + unique_lock& operator=(unique_lock&& other) noexcept { + if (impl && locked) { + impl->unlock(); + } + impl = std::move(other.impl); + locked = other.locked; + other.locked = false; + return *this; + } + void swap(unique_lock& other) noexcept { + using std::swap; + swap(impl, other.impl); + swap(locked, other.locked); + } + + mutex_type mutex() const noexcept { return impl; } + bool owns_lock() const noexcept { return impl && locked; } + explicit operator bool() const noexcept { return impl && locked; } + + mutex_type release() { + auto result = std::move(impl); + locked = false; + return result; + } + + void lock() { + if (!impl) + throw system_error(make_error_code(errc::operation_not_permitted)); + if (locked) + throw system_error(make_error_code(errc::resource_deadlock_would_occur)); + impl->lock(); + locked = true; + } + bool try_lock() { + if (!impl) + throw system_error(make_error_code(errc::operation_not_permitted)); + if (locked) + throw system_error(make_error_code(errc::resource_deadlock_would_occur)); + return locked = impl->try_lock(); + } + void unlock() { + if (!impl || !locked) + throw system_error(make_error_code(errc::operation_not_permitted)); + impl->unlock(); + locked = false; + } + private: + mutex_type impl; + bool locked{false}; +}; + +template <typename Executor> +class shared_lock<ceph::async::SharedMutex<Executor>> { + public: + using mutex_type = boost::intrusive_ptr<ceph::async::detail::SharedMutexImpl>; + + shared_lock() = default; + explicit shared_lock(ceph::async::SharedMutex<Executor>& m) + : impl(m.impl), locked(true) + { + impl->lock_shared(); + } + shared_lock(ceph::async::SharedMutex<Executor>& m, defer_lock_t t) noexcept + : impl(m.impl) + {} + shared_lock(ceph::async::SharedMutex<Executor>& m, try_to_lock_t t) + : impl(m.impl), locked(impl->try_lock_shared()) + {} + shared_lock(ceph::async::SharedMutex<Executor>& m, adopt_lock_t t) noexcept + : impl(m.impl), locked(true) + {} + + ~shared_lock() { + if (impl && locked) + impl->unlock_shared(); + } + + shared_lock(shared_lock&& other) noexcept + : impl(std::move(other.impl)), + locked(other.locked) { + other.locked = false; + } + shared_lock& operator=(shared_lock&& other) noexcept { + if (impl && locked) { + impl->unlock_shared(); + } + impl = std::move(other.impl); + locked = other.locked; + other.locked = false; + return *this; + } + void swap(shared_lock& other) noexcept { + using std::swap; + swap(impl, other.impl); + swap(locked, other.locked); + } + + mutex_type mutex() const noexcept { return impl; } + bool owns_lock() const noexcept { return impl && locked; } + explicit operator bool() const noexcept { return impl && locked; } + + mutex_type release() { + auto result = std::move(impl); + locked = false; + return result; + } + + void lock() { + if (!impl) + throw system_error(make_error_code(errc::operation_not_permitted)); + if (locked) + throw system_error(make_error_code(errc::resource_deadlock_would_occur)); + impl->lock_shared(); + locked = true; + } + bool try_lock() { + if (!impl) + throw system_error(make_error_code(errc::operation_not_permitted)); + if (locked) + throw system_error(make_error_code(errc::resource_deadlock_would_occur)); + return locked = impl->try_lock_shared(); + } + void unlock() { + if (!impl || !locked) + throw system_error(make_error_code(errc::operation_not_permitted)); + impl->unlock_shared(); + locked = false; + } + private: + mutex_type impl; + bool locked{false}; +}; + +} // namespace std diff --git a/src/common/async/detail/shared_mutex.h b/src/common/async/detail/shared_mutex.h new file mode 100644 index 000000000..8e5436350 --- /dev/null +++ b/src/common/async/detail/shared_mutex.h @@ -0,0 +1,326 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <condition_variable> +#include <mutex> +#include <optional> +#include <shared_mutex> // for std::shared_lock + +#include <boost/smart_ptr/intrusive_ref_counter.hpp> +#include <boost/intrusive_ptr.hpp> +#include <boost/intrusive/list.hpp> + +#include "include/ceph_assert.h" + +#include "common/async/completion.h" + +namespace ceph::async::detail { + +struct LockRequest : public boost::intrusive::list_base_hook<> { + virtual ~LockRequest() {} + virtual void complete(boost::system::error_code ec) = 0; + virtual void destroy() = 0; +}; + +class SharedMutexImpl : public boost::intrusive_ref_counter<SharedMutexImpl> { + public: + ~SharedMutexImpl(); + + template <typename Mutex, typename CompletionToken> + auto async_lock(Mutex& mtx, CompletionToken&& token); + void lock(); + void lock(boost::system::error_code& ec); + bool try_lock(); + void unlock(); + template <typename Mutex, typename CompletionToken> + auto async_lock_shared(Mutex& mtx, CompletionToken&& token); + void lock_shared(); + void lock_shared(boost::system::error_code& ec); + bool try_lock_shared(); + void unlock_shared(); + void cancel(); + + private: + using RequestList = boost::intrusive::list<LockRequest>; + + RequestList shared_queue; //< requests waiting on a shared lock + RequestList exclusive_queue; //< requests waiting on an exclusive lock + + /// lock state encodes the number of shared lockers, or 'max' for exclusive + using LockState = uint16_t; + static constexpr LockState Unlocked = 0; + static constexpr LockState Exclusive = std::numeric_limits<LockState>::max(); + static constexpr LockState MaxShared = Exclusive - 1; + LockState state = Unlocked; //< current lock state + + std::mutex mutex; //< protects lock state and wait queues + + void complete(RequestList&& requests, boost::system::error_code ec); +}; + +// sync requests live on the stack and wait on a condition variable +class SyncRequest : public LockRequest { + std::condition_variable cond; + std::optional<boost::system::error_code> ec; + public: + boost::system::error_code wait(std::unique_lock<std::mutex>& lock) { + // return the error code once its been set + cond.wait(lock, [this] { return ec; }); + return *ec; + } + void complete(boost::system::error_code ec) override { + this->ec = ec; + cond.notify_one(); + } + void destroy() override { + // nothing, SyncRequests live on the stack + } +}; + +// async requests use async::Completion to invoke a handler on its executor +template <typename Mutex, template <typename> typename Lock> +class AsyncRequest : public LockRequest { + Mutex& mutex; //< mutex argument for lock guard + public: + explicit AsyncRequest(Mutex& mutex) : mutex(mutex) {} + + using Signature = void(boost::system::error_code, Lock<Mutex>); + using LockCompletion = Completion<Signature, AsBase<AsyncRequest>>; + + void complete(boost::system::error_code ec) override { + auto r = static_cast<LockCompletion*>(this); + // pass ownership of ourselves to post(). on error, pass an empty lock + post(std::unique_ptr<LockCompletion>{r}, ec, + ec ? Lock{mutex, std::defer_lock} : Lock{mutex, std::adopt_lock}); + } + void destroy() override { + delete static_cast<LockCompletion*>(this); + } +}; + +inline SharedMutexImpl::~SharedMutexImpl() +{ + ceph_assert(state == Unlocked); + ceph_assert(shared_queue.empty()); + ceph_assert(exclusive_queue.empty()); +} + +template <typename Mutex, typename CompletionToken> +auto SharedMutexImpl::async_lock(Mutex& mtx, CompletionToken&& token) +{ + using Request = AsyncRequest<Mutex, std::unique_lock>; + using Signature = typename Request::Signature; + boost::asio::async_completion<CompletionToken, Signature> init(token); + auto& handler = init.completion_handler; + auto ex1 = mtx.get_executor(); + { + std::lock_guard lock{mutex}; + + boost::system::error_code ec; + if (state == Unlocked) { + state = Exclusive; + + // post a successful completion + auto ex2 = boost::asio::get_associated_executor(handler, ex1); + auto alloc2 = boost::asio::get_associated_allocator(handler); + auto b = bind_handler(std::move(handler), ec, + std::unique_lock{mtx, std::adopt_lock}); + ex2.post(forward_handler(std::move(b)), alloc2); + } else { + // create a request and add it to the exclusive list + using LockCompletion = typename Request::LockCompletion; + auto request = LockCompletion::create(ex1, std::move(handler), mtx); + exclusive_queue.push_back(*request.release()); + } + } + return init.result.get(); +} + +inline void SharedMutexImpl::lock() +{ + boost::system::error_code ec; + lock(ec); + if (ec) { + throw boost::system::system_error(ec); + } +} + +void SharedMutexImpl::lock(boost::system::error_code& ec) +{ + std::unique_lock lock{mutex}; + + if (state == Unlocked) { + state = Exclusive; + ec.clear(); + } else { + SyncRequest request; + exclusive_queue.push_back(request); + ec = request.wait(lock); + } +} + +inline bool SharedMutexImpl::try_lock() +{ + std::lock_guard lock{mutex}; + + if (state == Unlocked) { + state = Exclusive; + return true; + } + return false; +} + +void SharedMutexImpl::unlock() +{ + RequestList granted; + { + std::lock_guard lock{mutex}; + ceph_assert(state == Exclusive); + + if (!exclusive_queue.empty()) { + // grant next exclusive lock + auto& request = exclusive_queue.front(); + exclusive_queue.pop_front(); + granted.push_back(request); + } else { + // grant shared locks, if any + state = shared_queue.size(); + if (state > MaxShared) { + state = MaxShared; + auto end = std::next(shared_queue.begin(), MaxShared); + granted.splice(granted.end(), shared_queue, + shared_queue.begin(), end, MaxShared); + } else { + granted.splice(granted.end(), shared_queue); + } + } + } + complete(std::move(granted), boost::system::error_code{}); +} + +template <typename Mutex, typename CompletionToken> +auto SharedMutexImpl::async_lock_shared(Mutex& mtx, CompletionToken&& token) +{ + using Request = AsyncRequest<Mutex, std::shared_lock>; + using Signature = typename Request::Signature; + boost::asio::async_completion<CompletionToken, Signature> init(token); + auto& handler = init.completion_handler; + auto ex1 = mtx.get_executor(); + { + std::lock_guard lock{mutex}; + + boost::system::error_code ec; + if (exclusive_queue.empty() && state < MaxShared) { + state++; + + auto ex2 = boost::asio::get_associated_executor(handler, ex1); + auto alloc2 = boost::asio::get_associated_allocator(handler); + auto b = bind_handler(std::move(handler), ec, + std::shared_lock{mtx, std::adopt_lock}); + ex2.post(forward_handler(std::move(b)), alloc2); + } else { + using LockCompletion = typename Request::LockCompletion; + auto request = LockCompletion::create(ex1, std::move(handler), mtx); + shared_queue.push_back(*request.release()); + } + } + return init.result.get(); +} + +inline void SharedMutexImpl::lock_shared() +{ + boost::system::error_code ec; + lock_shared(ec); + if (ec) { + throw boost::system::system_error(ec); + } +} + +void SharedMutexImpl::lock_shared(boost::system::error_code& ec) +{ + std::unique_lock lock{mutex}; + + if (exclusive_queue.empty() && state < MaxShared) { + state++; + ec.clear(); + } else { + SyncRequest request; + shared_queue.push_back(request); + ec = request.wait(lock); + } +} + +inline bool SharedMutexImpl::try_lock_shared() +{ + std::lock_guard lock{mutex}; + + if (exclusive_queue.empty() && state < MaxShared) { + state++; + return true; + } + return false; +} + +inline void SharedMutexImpl::unlock_shared() +{ + std::lock_guard lock{mutex}; + ceph_assert(state != Unlocked && state <= MaxShared); + + if (state == 1 && !exclusive_queue.empty()) { + // grant next exclusive lock + state = Exclusive; + auto& request = exclusive_queue.front(); + exclusive_queue.pop_front(); + request.complete(boost::system::error_code{}); + } else if (state == MaxShared && !shared_queue.empty() && + exclusive_queue.empty()) { + // grant next shared lock + auto& request = shared_queue.front(); + shared_queue.pop_front(); + request.complete(boost::system::error_code{}); + } else { + state--; + } +} + +inline void SharedMutexImpl::cancel() +{ + RequestList canceled; + { + std::lock_guard lock{mutex}; + canceled.splice(canceled.end(), shared_queue); + canceled.splice(canceled.end(), exclusive_queue); + } + complete(std::move(canceled), boost::asio::error::operation_aborted); +} + +void SharedMutexImpl::complete(RequestList&& requests, + boost::system::error_code ec) +{ + while (!requests.empty()) { + auto& request = requests.front(); + requests.pop_front(); + try { + request.complete(ec); + } catch (...) { + // clean up any remaining completions and rethrow + requests.clear_and_dispose([] (LockRequest *r) { r->destroy(); }); + throw; + } + } +} + +} // namespace ceph::async::detail diff --git a/src/common/async/forward_handler.h b/src/common/async/forward_handler.h new file mode 100644 index 000000000..ae88cc83f --- /dev/null +++ b/src/common/async/forward_handler.h @@ -0,0 +1,103 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_ASYNC_FORWARD_HANDLER_H +#define CEPH_ASYNC_FORWARD_HANDLER_H + +#include <boost/asio.hpp> + +namespace ceph::async { + +/** + * A forwarding completion handler for use with boost::asio. + * + * A completion handler wrapper that invokes the handler's operator() as an + * rvalue, regardless of whether the wrapper is invoked as an lvalue or rvalue. + * This operation is potentially destructive to the wrapped handler, so is only + * suitable for single-use handlers. + * + * This is useful when combined with bind_handler() and move-only arguments, + * because executors will always call the lvalue overload of operator(). + * + * The original Handler's associated allocator and executor are maintained. + * + * @see forward_handler + */ +template <typename Handler> +struct ForwardingHandler { + Handler handler; + + ForwardingHandler(Handler&& handler) + : handler(std::move(handler)) + {} + + template <typename ...Args> + void operator()(Args&& ...args) { + std::move(handler)(std::forward<Args>(args)...); + } + + using allocator_type = boost::asio::associated_allocator_t<Handler>; + allocator_type get_allocator() const noexcept { + return boost::asio::get_associated_allocator(handler); + } +}; + +} // namespace ceph::async + +namespace boost::asio { + +// specialize boost::asio::associated_executor<> for ForwardingHandler +template <typename Handler, typename Executor> +struct associated_executor<ceph::async::ForwardingHandler<Handler>, Executor> { + using type = boost::asio::associated_executor_t<Handler, Executor>; + + static type get(const ceph::async::ForwardingHandler<Handler>& handler, + const Executor& ex = Executor()) noexcept { + return boost::asio::get_associated_executor(handler.handler, ex); + } +}; + +} // namespace boost::asio + +namespace ceph::async { + +/** + * Returns a single-use completion handler that always forwards on operator(). + * + * Wraps a completion handler such that it is always invoked as an rvalue. This + * is necessary when combining executors and bind_handler() with move-only + * argument types. + * + * Example use: + * + * auto callback = [] (std::unique_ptr<int>&& p) {}; + * auto bound_handler = bind_handler(callback, std::make_unique<int>(5)); + * auro handler = forward_handler(std::move(bound_handler)); + * + * // execute the forwarding handler on an io_context: + * boost::asio::io_context context; + * boost::asio::post(context, std::move(handler)); + * context.run(); + * + * @see ForwardingHandler + */ +template <typename Handler> +auto forward_handler(Handler&& h) +{ + return ForwardingHandler{std::forward<Handler>(h)}; +} + +} // namespace ceph::async + +#endif // CEPH_ASYNC_FORWARD_HANDLER_H diff --git a/src/common/async/librados_completion.h b/src/common/async/librados_completion.h new file mode 100644 index 000000000..2fa5555e7 --- /dev/null +++ b/src/common/async/librados_completion.h @@ -0,0 +1,125 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2020 Red Hat + * Author: Adam C. Emerson <aemerson@redhat.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_COMMON_ASYNC_LIBRADOS_COMPLETION_H +#define CEPH_COMMON_ASYNC_LIBRADOS_COMPLETION_H + +#include <atomic> +#include <condition_variable> +#include <mutex> +#include <optional> +#include <type_traits> + +#include <boost/asio/async_result.hpp> + +#include <boost/system/error_code.hpp> +#include <boost/system/system_error.hpp> + +#include "include/rados/librados.hpp" +#include "librados/AioCompletionImpl.h" + +// Allow librados::AioCompletion to be provided as a completion +// handler. This is only allowed with a signature of +// (boost::system::error_code) or (). On completion the AioCompletion +// is completed with the error_code converted to an int with +// ceph::from_error_code. +// +// async_result::return_type is void. + +namespace ceph::async { + +namespace bs = boost::system; +namespace lr = librados; + +namespace detail { + +struct librados_handler { + lr::AioCompletionImpl* pc; + + explicit librados_handler(lr::AioCompletion* c) : pc(c->pc) { + pc->get(); + } + ~librados_handler() { + if (pc) { + pc->put(); + pc = nullptr; + } + } + + librados_handler(const librados_handler&) = delete; + librados_handler& operator =(const librados_handler&) = delete; + librados_handler(librados_handler&& rhs) { + pc = rhs.pc; + rhs.pc = nullptr; + } + + void operator()(bs::error_code ec) { + pc->lock.lock(); + pc->rval = ceph::from_error_code(ec); + pc->complete = true; + pc->lock.unlock(); + + auto cb_complete = pc->callback_complete; + auto cb_complete_arg = pc->callback_complete_arg; + if (cb_complete) + cb_complete(pc, cb_complete_arg); + + auto cb_safe = pc->callback_safe; + auto cb_safe_arg = pc->callback_safe_arg; + if (cb_safe) + cb_safe(pc, cb_safe_arg); + + pc->lock.lock(); + pc->callback_complete = NULL; + pc->callback_safe = NULL; + pc->cond.notify_all(); + pc->put_unlock(); + pc = nullptr; + } + + void operator ()() { + (*this)(bs::error_code{}); + } +}; +} // namespace detail +} // namespace ceph::async + + +namespace boost::asio { +template<typename ReturnType> +class async_result<librados::AioCompletion*, ReturnType()> { +public: + using completion_handler_type = ceph::async::detail::librados_handler; + explicit async_result(completion_handler_type&) {}; + using return_type = void; + void get() { + return; + } +}; + +template<typename ReturnType> +class async_result<librados::AioCompletion*, + ReturnType(boost::system::error_code)> { +public: + using completion_handler_type = ceph::async::detail::librados_handler; + explicit async_result(completion_handler_type&) {}; + using return_type = void; + void get() { + return; + } +}; +} + +#endif // !CEPH_COMMON_ASYNC_LIBRADOS_COMPLETION_H diff --git a/src/common/async/shared_mutex.h b/src/common/async/shared_mutex.h new file mode 100644 index 000000000..3e471a4df --- /dev/null +++ b/src/common/async/shared_mutex.h @@ -0,0 +1,212 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include "common/async/detail/shared_mutex.h" + +namespace ceph::async { + +/** + * An asynchronous shared mutex for use with boost::asio. + * + * A shared mutex class with asynchronous lock operations that complete on a + * boost::asio executor. The class also has synchronous interfaces that meet + * most of the standard library's requirements for the SharedMutex concept, + * which makes it compatible with lock_guard, unique_lock, and shared_lock. + * + * All lock requests can fail with operation_aborted on cancel() or destruction. + * The non-error_code overloads of lock() and lock_shared() will throw this + * error as an exception of type boost::system::system_error. + * + * Exclusive locks are prioritized over shared locks. Locks of the same type + * are granted in fifo order. The implementation defines a limit on the number + * of shared locks to 65534 at a time. + * + * Example use: + * + * boost::asio::io_context context; + * SharedMutex mutex{context.get_executor()}; + * + * mutex.async_lock([&] (boost::system::error_code ec, auto lock) { + * if (!ec) { + * // mutate shared state ... + * } + * }); + * mutex.async_lock_shared([&] (boost::system::error_code ec, auto lock) { + * if (!ec) { + * // read shared state ... + * } + * }); + * + * context.run(); + */ +template <typename Executor> +class SharedMutex { + public: + explicit SharedMutex(const Executor& ex); + + /// on destruction, all pending lock requests are canceled + ~SharedMutex(); + + using executor_type = Executor; + executor_type get_executor() const noexcept { return ex; } + + /// initiate an asynchronous request for an exclusive lock. when the lock is + /// granted, the completion handler is invoked with a successful error code + /// and a std::unique_lock that owns this mutex. + /// Signature = void(boost::system::error_code, std::unique_lock) + template <typename CompletionToken> + auto async_lock(CompletionToken&& token); + + /// wait synchronously for an exclusive lock. if an error occurs before the + /// lock is granted, that error is thrown as an exception + void lock(); + + /// wait synchronously for an exclusive lock. if an error occurs before the + /// lock is granted, that error is assigned to 'ec' + void lock(boost::system::error_code& ec); + + /// try to acquire an exclusive lock. if the lock is not immediately + /// available, returns false + bool try_lock(); + + /// releases an exclusive lock. not required to be called from the same thread + /// that initiated the lock + void unlock(); + + /// initiate an asynchronous request for a shared lock. when the lock is + /// granted, the completion handler is invoked with a successful error code + /// and a std::shared_lock that owns this mutex. + /// Signature = void(boost::system::error_code, std::shared_lock) + template <typename CompletionToken> + auto async_lock_shared(CompletionToken&& token); + + /// wait synchronously for a shared lock. if an error occurs before the + /// lock is granted, that error is thrown as an exception + void lock_shared(); + + /// wait synchronously for a shared lock. if an error occurs before the lock + /// is granted, that error is assigned to 'ec' + void lock_shared(boost::system::error_code& ec); + + /// try to acquire a shared lock. if the lock is not immediately available, + /// returns false + bool try_lock_shared(); + + /// releases a shared lock. not required to be called from the same thread + /// that initiated the lock + void unlock_shared(); + + /// cancel any pending requests for exclusive or shared locks with an + /// operation_aborted error + void cancel(); + + private: + Executor ex; //< default callback executor + boost::intrusive_ptr<detail::SharedMutexImpl> impl; + + // allow lock guards to access impl + friend class std::unique_lock<SharedMutex>; + friend class std::shared_lock<SharedMutex>; +}; + + +template <typename Executor> +SharedMutex<Executor>::SharedMutex(const Executor& ex) + : ex(ex), impl(new detail::SharedMutexImpl) +{ +} + +template <typename Executor> +SharedMutex<Executor>::~SharedMutex() +{ + try { + impl->cancel(); + } catch (const std::exception&) { + // swallow any exceptions, the destructor can't throw + } +} + +template <typename Executor> +template <typename CompletionToken> +auto SharedMutex<Executor>::async_lock(CompletionToken&& token) +{ + return impl->async_lock(*this, std::forward<CompletionToken>(token)); +} + +template <typename Executor> +void SharedMutex<Executor>::lock() +{ + impl->lock(); +} + +template <typename Executor> +void SharedMutex<Executor>::lock(boost::system::error_code& ec) +{ + impl->lock(ec); +} + +template <typename Executor> +bool SharedMutex<Executor>::try_lock() +{ + return impl->try_lock(); +} + +template <typename Executor> +void SharedMutex<Executor>::unlock() +{ + impl->unlock(); +} + +template <typename Executor> +template <typename CompletionToken> +auto SharedMutex<Executor>::async_lock_shared(CompletionToken&& token) +{ + return impl->async_lock_shared(*this, std::forward<CompletionToken>(token)); +} + +template <typename Executor> +void SharedMutex<Executor>::lock_shared() +{ + impl->lock_shared(); +} + +template <typename Executor> +void SharedMutex<Executor>::lock_shared(boost::system::error_code& ec) +{ + impl->lock_shared(ec); +} + +template <typename Executor> +bool SharedMutex<Executor>::try_lock_shared() +{ + return impl->try_lock_shared(); +} + +template <typename Executor> +void SharedMutex<Executor>::unlock_shared() +{ + impl->unlock_shared(); +} + +template <typename Executor> +void SharedMutex<Executor>::cancel() +{ + impl->cancel(); +} + +} // namespace ceph::async + +#include "common/async/detail/shared_lock.h" diff --git a/src/common/async/waiter.h b/src/common/async/waiter.h new file mode 100644 index 000000000..219a27cf7 --- /dev/null +++ b/src/common/async/waiter.h @@ -0,0 +1,223 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_COMMON_WAITER_H +#define CEPH_COMMON_WAITER_H + +#include <condition_variable> +#include <tuple> + +#include <boost/asio/async_result.hpp> + +#include "include/ceph_assert.h" +#include "include/function2.hpp" + +#include "common/ceph_mutex.h" + +namespace ceph::async { +namespace detail { +// For safety reasons (avoiding undefined behavior around sequence +// points) std::reference_wrapper disallows move construction. This +// harms us in cases where we want to pass a reference in to something +// that unavoidably moves. +// +// It should not be used generally. +template<typename T> +class rvalue_reference_wrapper { +public: + // types + using type = T; + + rvalue_reference_wrapper(T& r) noexcept + : p(std::addressof(r)) {} + + // We write our semantics to match those of reference collapsing. If + // we're treated as an lvalue, collapse to one. + + rvalue_reference_wrapper(const rvalue_reference_wrapper&) noexcept = default; + rvalue_reference_wrapper(rvalue_reference_wrapper&&) noexcept = default; + + // assignment + rvalue_reference_wrapper& operator=( + const rvalue_reference_wrapper& x) noexcept = default; + rvalue_reference_wrapper& operator=( + rvalue_reference_wrapper&& x) noexcept = default; + + operator T& () const noexcept { + return *p; + } + T& get() const noexcept { + return *p; + } + + operator T&& () noexcept { + return std::move(*p); + } + T&& get() noexcept { + return std::move(*p); + } + + template<typename... Args> + std::result_of_t<T&(Args&&...)> operator ()(Args&&... args ) const { + return (*p)(std::forward<Args>(args)...); + } + + template<typename... Args> + std::result_of_t<T&&(Args&&...)> operator ()(Args&&... args ) { + return std::move(*p)(std::forward<Args>(args)...); + } + +private: + T* p; +}; + +class base { +protected: + ceph::mutex lock = ceph::make_mutex("ceph::async::detail::base::lock"); + ceph::condition_variable cond; + bool has_value = false; + + ~base() = default; + + auto wait_base() { + std::unique_lock l(lock); + cond.wait(l, [this](){ return has_value; }); + return l; + } + + auto exec_base() { + std::unique_lock l(lock); + // There's no really good way to handle being called twice + // without being reset. + ceph_assert(!has_value); + has_value = true; + cond.notify_one(); + return l; + } +}; +} + +// waiter is a replacement for C_SafeCond and friends. It is the +// moral equivalent of a future but plays well with a world of +// callbacks. +template<typename ...S> +class waiter; + +template<> +class waiter<> final : public detail::base { +public: + void wait() { + wait_base(); + has_value = false; + } + + void operator()() { + exec_base(); + } + + auto ref() { + return detail::rvalue_reference_wrapper(*this); + } + + + operator fu2::unique_function<void() &&>() { + return fu2::unique_function<void() &&>(ref()); + } +}; + +template<typename Ret> +class waiter<Ret> final : public detail::base { + std::aligned_storage_t<sizeof(Ret)> ret; + +public: + Ret wait() { + auto l = wait_base(); + auto r = reinterpret_cast<Ret*>(&ret); + auto t = std::move(*r); + r->~Ret(); + has_value = false; + return t; + } + + void operator()(Ret&& _ret) { + auto l = exec_base(); + auto r = reinterpret_cast<Ret*>(&ret); + *r = std::move(_ret); + } + + void operator()(const Ret& _ret) { + auto l = exec_base(); + auto r = reinterpret_cast<Ret*>(&ret); + *r = std::move(_ret); + } + + auto ref() { + return detail::rvalue_reference_wrapper(*this); + } + + operator fu2::unique_function<void(Ret) &&>() { + return fu2::unique_function<void(Ret) &&>(ref()); + } + + ~waiter() { + if (has_value) + reinterpret_cast<Ret*>(&ret)->~Ret(); + } +}; + +template<typename ...Ret> +class waiter final : public detail::base { + std::tuple<Ret...> ret; + +public: + std::tuple<Ret...> wait() { + using std::tuple; + auto l = wait_base(); + return std::move(ret); + auto r = reinterpret_cast<std::tuple<Ret...>*>(&ret); + auto t = std::move(*r); + r->~tuple<Ret...>(); + has_value = false; + return t; + } + + void operator()(Ret&&... _ret) { + auto l = exec_base(); + auto r = reinterpret_cast<std::tuple<Ret...>*>(&ret); + *r = std::forward_as_tuple(_ret...); + } + + void operator()(const Ret&... _ret) { + auto l = exec_base(); + auto r = reinterpret_cast<std::tuple<Ret...>*>(&ret); + *r = std::forward_as_tuple(_ret...); + } + + auto ref() { + return detail::rvalue_reference_wrapper(*this); + } + + operator fu2::unique_function<void(Ret...) &&>() { + return fu2::unique_function<void(Ret...) &&>(ref()); + } + + ~waiter() { + using std::tuple; + if (has_value) + reinterpret_cast<tuple<Ret...>*>(&ret)->~tuple<Ret...>(); + } +}; +} + +#endif // CEPH_COMMON_WAITER_H diff --git a/src/common/async/yield_context.h b/src/common/async/yield_context.h new file mode 100644 index 000000000..05e6ca614 --- /dev/null +++ b/src/common/async/yield_context.h @@ -0,0 +1,59 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <boost/range/begin.hpp> +#include <boost/range/end.hpp> +#include <boost/asio/io_context.hpp> + +#include "acconfig.h" + +#include <spawn/spawn.hpp> + +// use explicit executor types instead of the type-erased boost::asio::executor. +// coroutines wrap the default io_context executor with a strand executor +using yield_context = spawn::basic_yield_context< + boost::asio::executor_binder<void(*)(), + boost::asio::strand<boost::asio::io_context::executor_type>>>; + +/// optional-like wrapper for a spawn::yield_context and its associated +/// boost::asio::io_context. operations that take an optional_yield argument +/// will, when passed a non-empty yield context, suspend this coroutine instead +/// of the blocking the thread of execution +class optional_yield { + boost::asio::io_context *c = nullptr; + yield_context *y = nullptr; + public: + /// construct with a valid io and yield_context + explicit optional_yield(boost::asio::io_context& c, + yield_context& y) noexcept + : c(&c), y(&y) {} + + /// type tag to construct an empty object + struct empty_t {}; + optional_yield(empty_t) noexcept {} + + /// implicit conversion to bool, returns true if non-empty + operator bool() const noexcept { return y; } + + /// return a reference to the associated io_context. only valid if non-empty + boost::asio::io_context& get_io_context() const noexcept { return *c; } + + /// return a reference to the yield_context. only valid if non-empty + yield_context& get_yield_context() const noexcept { return *y; } +}; + +// type tag object to construct an empty optional_yield +static constexpr optional_yield::empty_t null_yield{}; |