summaryrefslogtreecommitdiffstats
path: root/src/common/async
diff options
context:
space:
mode:
Diffstat (limited to 'src/common/async')
-rw-r--r--src/common/async/bind_handler.h111
-rw-r--r--src/common/async/bind_like.h39
-rw-r--r--src/common/async/blocked_completion.h290
-rw-r--r--src/common/async/completion.h320
-rw-r--r--src/common/async/context_pool.h94
-rw-r--r--src/common/async/detail/shared_lock.h185
-rw-r--r--src/common/async/detail/shared_mutex.h326
-rw-r--r--src/common/async/forward_handler.h103
-rw-r--r--src/common/async/librados_completion.h125
-rw-r--r--src/common/async/shared_mutex.h212
-rw-r--r--src/common/async/waiter.h223
-rw-r--r--src/common/async/yield_context.h59
12 files changed, 2087 insertions, 0 deletions
diff --git a/src/common/async/bind_handler.h b/src/common/async/bind_handler.h
new file mode 100644
index 000000000..516d8a5e8
--- /dev/null
+++ b/src/common/async/bind_handler.h
@@ -0,0 +1,111 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_ASYNC_BIND_HANDLER_H
+#define CEPH_ASYNC_BIND_HANDLER_H
+
+#include <tuple>
+#include <boost/asio.hpp>
+
+namespace ceph::async {
+
+/**
+ * A bound completion handler for use with boost::asio.
+ *
+ * A completion handler wrapper that allows a tuple of arguments to be forwarded
+ * to the original Handler. This is intended for use with boost::asio functions
+ * like defer(), dispatch() and post() which expect handlers which are callable
+ * with no arguments.
+ *
+ * The original Handler's associated allocator and executor are maintained.
+ *
+ * @see bind_handler
+ */
+template <typename Handler, typename Tuple>
+struct CompletionHandler {
+ Handler handler;
+ Tuple args;
+
+ CompletionHandler(Handler&& handler, Tuple&& args)
+ : handler(std::move(handler)),
+ args(std::move(args))
+ {}
+
+ void operator()() & {
+ std::apply(handler, args);
+ }
+ void operator()() const & {
+ std::apply(handler, args);
+ }
+ void operator()() && {
+ std::apply(std::move(handler), std::move(args));
+ }
+
+ using allocator_type = boost::asio::associated_allocator_t<Handler>;
+ allocator_type get_allocator() const noexcept {
+ return boost::asio::get_associated_allocator(handler);
+ }
+};
+
+} // namespace ceph::async
+
+namespace boost::asio {
+
+// specialize boost::asio::associated_executor<> for CompletionHandler
+template <typename Handler, typename Tuple, typename Executor>
+struct associated_executor<ceph::async::CompletionHandler<Handler, Tuple>, Executor> {
+ using type = boost::asio::associated_executor_t<Handler, Executor>;
+
+ static type get(const ceph::async::CompletionHandler<Handler, Tuple>& handler,
+ const Executor& ex = Executor()) noexcept {
+ return boost::asio::get_associated_executor(handler.handler, ex);
+ }
+};
+
+} // namespace boost::asio
+
+namespace ceph::async {
+
+/**
+ * Returns a wrapped completion handler with bound arguments.
+ *
+ * Binds the given arguments to a handler, and returns a CompletionHandler that
+ * is callable with no arguments. This is similar to std::bind(), except that
+ * all arguments must be provided. Move-only argument types are supported as
+ * long as the CompletionHandler's 'operator() &&' overload is used, i.e.
+ * std::move(handler)().
+ *
+ * Example use:
+ *
+ * // bind the arguments (5, "hello") to a callback lambda:
+ * auto callback = [] (int a, std::string b) {};
+ * auto handler = bind_handler(callback, 5, "hello");
+ *
+ * // execute the bound handler on an io_context:
+ * boost::asio::io_context context;
+ * boost::asio::post(context, std::move(handler));
+ * context.run();
+ *
+ * @see CompletionHandler
+ */
+template <typename Handler, typename ...Args>
+auto bind_handler(Handler&& h, Args&& ...args)
+{
+ return CompletionHandler{std::forward<Handler>(h),
+ std::make_tuple(std::forward<Args>(args)...)};
+}
+
+} // namespace ceph::async
+
+#endif // CEPH_ASYNC_BIND_HANDLER_H
diff --git a/src/common/async/bind_like.h b/src/common/async/bind_like.h
new file mode 100644
index 000000000..c360eac0a
--- /dev/null
+++ b/src/common/async/bind_like.h
@@ -0,0 +1,39 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2020 Red Hat <contact@redhat.com>
+ * Author: Adam C. Emerson
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <boost/asio/associated_allocator.hpp>
+#include <boost/asio/associated_executor.hpp>
+#include <boost/asio/bind_allocator.hpp>
+#include <boost/asio/bind_executor.hpp>
+
+namespace ceph::async {
+template<typename Executor, typename Allocator, typename Completion>
+auto bind_ea(const Executor& executor, const Allocator& allocator,
+ Completion&& completion) {
+ return bind_allocator(allocator,
+ boost::asio::bind_executor(
+ executor,
+ std::forward<Completion>(completion)));
+}
+
+
+// Bind `Completion` to the executor and allocator of `Proto`
+template<typename Proto, typename Completion>
+auto bind_like(const Proto& proto, Completion&& completion) {
+ return bind_ea(boost::asio::get_associated_executor(proto),
+ boost::asio::get_associated_allocator(proto),
+ std::forward<Completion>(completion));
+}
+}
diff --git a/src/common/async/blocked_completion.h b/src/common/async/blocked_completion.h
new file mode 100644
index 000000000..23a1319bc
--- /dev/null
+++ b/src/common/async/blocked_completion.h
@@ -0,0 +1,290 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2020 Red Hat
+ * Author: Adam C. Emerson <aemerson@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_COMMON_ASYNC_BLOCKED_COMPLETION_H
+#define CEPH_COMMON_ASYNC_BLOCKED_COMPLETION_H
+
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
+#include <optional>
+#include <type_traits>
+
+#include <boost/asio/async_result.hpp>
+
+#include <boost/system/error_code.hpp>
+#include <boost/system/system_error.hpp>
+
+namespace ceph::async {
+
+namespace bs = boost::system;
+
+class use_blocked_t {
+ use_blocked_t(bs::error_code* ec) : ec(ec) {}
+public:
+ use_blocked_t() = default;
+
+ use_blocked_t operator [](bs::error_code& _ec) const {
+ return use_blocked_t(&_ec);
+ }
+
+ bs::error_code* ec = nullptr;
+};
+
+inline constexpr use_blocked_t use_blocked;
+
+namespace detail {
+
+template<typename... Ts>
+struct blocked_handler
+{
+ blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {}
+
+ void operator ()(Ts... values) noexcept {
+ std::scoped_lock l(*m);
+ *ec = bs::error_code{};
+ *value = std::forward_as_tuple(std::move(values)...);
+ *done = true;
+ cv->notify_one();
+ }
+
+ void operator ()(bs::error_code ec, Ts... values) noexcept {
+ std::scoped_lock l(*m);
+ *this->ec = ec;
+ *value = std::forward_as_tuple(std::move(values)...);
+ *done = true;
+ cv->notify_one();
+ }
+
+ bs::error_code* ec;
+ std::optional<std::tuple<Ts...>>* value = nullptr;
+ std::mutex* m = nullptr;
+ std::condition_variable* cv = nullptr;
+ bool* done = nullptr;
+};
+
+template<typename T>
+struct blocked_handler<T>
+{
+ blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {}
+
+ void operator ()(T value) noexcept {
+ std::scoped_lock l(*m);
+ *ec = bs::error_code();
+ *this->value = std::move(value);
+ *done = true;
+ cv->notify_one();
+ }
+
+ void operator ()(bs::error_code ec, T value) noexcept {
+ std::scoped_lock l(*m);
+ *this->ec = ec;
+ *this->value = std::move(value);
+ *done = true;
+ cv->notify_one();
+ }
+
+ //private:
+ bs::error_code* ec;
+ std::optional<T>* value;
+ std::mutex* m = nullptr;
+ std::condition_variable* cv = nullptr;
+ bool* done = nullptr;
+};
+
+template<>
+struct blocked_handler<void>
+{
+ blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {}
+
+ void operator ()() noexcept {
+ std::scoped_lock l(*m);
+ *ec = bs::error_code{};
+ *done = true;
+ cv->notify_one();
+ }
+
+ void operator ()(bs::error_code ec) noexcept {
+ std::scoped_lock l(*m);
+ *this->ec = ec;
+ *done = true;
+ cv->notify_one();
+ }
+
+ bs::error_code* ec;
+ std::mutex* m = nullptr;
+ std::condition_variable* cv = nullptr;
+ bool* done = nullptr;
+};
+
+template<typename... Ts>
+class blocked_result
+{
+public:
+ using completion_handler_type = blocked_handler<Ts...>;
+ using return_type = std::tuple<Ts...>;
+
+ explicit blocked_result(completion_handler_type& h) noexcept {
+ std::scoped_lock l(m);
+ out_ec = h.ec;
+ if (!out_ec) h.ec = &ec;
+ h.value = &value;
+ h.m = &m;
+ h.cv = &cv;
+ h.done = &done;
+ }
+
+ return_type get() {
+ std::unique_lock l(m);
+ cv.wait(l, [this]() { return done; });
+ if (!out_ec && ec) throw bs::system_error(ec);
+ return std::move(*value);
+ }
+
+ blocked_result(const blocked_result&) = delete;
+ blocked_result& operator =(const blocked_result&) = delete;
+ blocked_result(blocked_result&&) = delete;
+ blocked_result& operator =(blocked_result&&) = delete;
+
+private:
+ bs::error_code* out_ec;
+ bs::error_code ec;
+ std::optional<return_type> value;
+ std::mutex m;
+ std::condition_variable cv;
+ bool done = false;
+};
+
+template<typename T>
+class blocked_result<T>
+{
+public:
+ using completion_handler_type = blocked_handler<T>;
+ using return_type = T;
+
+ explicit blocked_result(completion_handler_type& h) noexcept {
+ std::scoped_lock l(m);
+ out_ec = h.ec;
+ if (!out_ec) h.ec = &ec;
+ h.value = &value;
+ h.m = &m;
+ h.cv = &cv;
+ h.done = &done;
+ }
+
+ return_type get() {
+ std::unique_lock l(m);
+ cv.wait(l, [this]() { return done; });
+ if (!out_ec && ec) throw bs::system_error(ec);
+ return std::move(*value);
+ }
+
+ blocked_result(const blocked_result&) = delete;
+ blocked_result& operator =(const blocked_result&) = delete;
+ blocked_result(blocked_result&&) = delete;
+ blocked_result& operator =(blocked_result&&) = delete;
+
+private:
+ bs::error_code* out_ec;
+ bs::error_code ec;
+ std::optional<return_type> value;
+ std::mutex m;
+ std::condition_variable cv;
+ bool done = false;
+};
+
+template<>
+class blocked_result<void>
+{
+public:
+ using completion_handler_type = blocked_handler<void>;
+ using return_type = void;
+
+ explicit blocked_result(completion_handler_type& h) noexcept {
+ std::scoped_lock l(m);
+ out_ec = h.ec;
+ if (!out_ec) h.ec = &ec;
+ h.m = &m;
+ h.cv = &cv;
+ h.done = &done;
+ }
+
+ void get() {
+ std::unique_lock l(m);
+ cv.wait(l, [this]() { return done; });
+ if (!out_ec && ec) throw bs::system_error(ec);
+ }
+
+ blocked_result(const blocked_result&) = delete;
+ blocked_result& operator =(const blocked_result&) = delete;
+ blocked_result(blocked_result&&) = delete;
+ blocked_result& operator =(blocked_result&&) = delete;
+
+private:
+ bs::error_code* out_ec;
+ bs::error_code ec;
+ std::mutex m;
+ std::condition_variable cv;
+ bool done = false;
+};
+} // namespace detail
+} // namespace ceph::async
+
+
+namespace boost::asio {
+template<typename ReturnType>
+class async_result<ceph::async::use_blocked_t, ReturnType()>
+ : public ceph::async::detail::blocked_result<void>
+{
+public:
+ explicit async_result(typename ceph::async::detail::blocked_result<void>
+ ::completion_handler_type& h)
+ : ceph::async::detail::blocked_result<void>(h) {}
+};
+
+template<typename ReturnType, typename... Args>
+class async_result<ceph::async::use_blocked_t, ReturnType(Args...)>
+ : public ceph::async::detail::blocked_result<std::decay_t<Args>...>
+{
+public:
+ explicit async_result(
+ typename ceph::async::detail::blocked_result<std::decay_t<Args>...>::completion_handler_type& h)
+ : ceph::async::detail::blocked_result<std::decay_t<Args>...>(h) {}
+};
+
+template<typename ReturnType>
+class async_result<ceph::async::use_blocked_t,
+ ReturnType(boost::system::error_code)>
+ : public ceph::async::detail::blocked_result<void>
+{
+public:
+ explicit async_result(
+ typename ceph::async::detail::blocked_result<void>::completion_handler_type& h)
+ : ceph::async::detail::blocked_result<void>(h) {}
+};
+
+template<typename ReturnType, typename... Args>
+class async_result<ceph::async::use_blocked_t,
+ ReturnType(boost::system::error_code, Args...)>
+ : public ceph::async::detail::blocked_result<std::decay_t<Args>...>
+{
+public:
+ explicit async_result(
+ typename ceph::async::detail::blocked_result<std::decay_t<Args>...>::completion_handler_type& h)
+ : ceph::async::detail::blocked_result<std::decay_t<Args>...>(h) {}
+};
+}
+
+#endif // !CEPH_COMMON_ASYNC_BLOCKED_COMPLETION_H
diff --git a/src/common/async/completion.h b/src/common/async/completion.h
new file mode 100644
index 000000000..6af9109d5
--- /dev/null
+++ b/src/common/async/completion.h
@@ -0,0 +1,320 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_ASYNC_COMPLETION_H
+#define CEPH_ASYNC_COMPLETION_H
+
+#include <memory>
+
+#include "bind_handler.h"
+#include "forward_handler.h"
+
+namespace ceph::async {
+
+/**
+ * Abstract completion handler interface for use with boost::asio.
+ *
+ * Memory management is performed using the Handler's 'associated allocator',
+ * which carries the additional requirement that its memory be released before
+ * the Handler is invoked. This allows memory allocated for one asynchronous
+ * operation to be reused in its continuation. Because of this requirement, any
+ * calls to invoke the completion must first release ownership of it. To enforce
+ * this, the static functions defer()/dispatch()/post() take the completion by
+ * rvalue-reference to std::unique_ptr<Completion>, i.e. std::move(completion).
+ *
+ * Handlers may also have an 'associated executor', so the calls to defer(),
+ * dispatch(), and post() are forwarded to that executor. If there is no
+ * associated executor (which is generally the case unless one was bound with
+ * boost::asio::bind_executor()), the executor passed to Completion::create()
+ * is used as a default.
+ *
+ * Example use:
+ *
+ * // declare a Completion type with Signature = void(int, string)
+ * using MyCompletion = ceph::async::Completion<void(int, string)>;
+ *
+ * // create a completion with the given callback:
+ * std::unique_ptr<MyCompletion> c;
+ * c = MyCompletion::create(ex, [] (int a, const string& b) {});
+ *
+ * // bind arguments to the callback and post to its associated executor:
+ * MyCompletion::post(std::move(c), 5, "hello");
+ *
+ *
+ * Additional user data may be stored along with the Completion to take
+ * advantage of the handler allocator optimization. This is accomplished by
+ * specifying its type in the template parameter T. For example, the type
+ * Completion<void(), int> contains a public member variable 'int user_data'.
+ * Any additional arguments to Completion::create() will be forwarded to type
+ * T's constructor.
+ *
+ * If the AsBase<T> type tag is used, as in Completion<void(), AsBase<T>>,
+ * the Completion will inherit from T instead of declaring it as a member
+ * variable.
+ *
+ * When invoking the completion handler via defer(), dispatch(), or post(),
+ * care must be taken when passing arguments that refer to user data, because
+ * its memory is destroyed prior to invocation. In such cases, the user data
+ * should be moved/copied out of the Completion first.
+ */
+template <typename Signature, typename T = void>
+class Completion;
+
+
+/// type tag for UserData
+template <typename T> struct AsBase {};
+
+namespace detail {
+
+/// optional user data to be stored with the Completion
+template <typename T>
+struct UserData {
+ T user_data;
+ template <typename ...Args>
+ UserData(Args&& ...args)
+ : user_data(std::forward<Args>(args)...)
+ {}
+};
+// AsBase specialization inherits from T
+template <typename T>
+struct UserData<AsBase<T>> : public T {
+ template <typename ...Args>
+ UserData(Args&& ...args)
+ : T(std::forward<Args>(args)...)
+ {}
+};
+// void specialization
+template <>
+class UserData<void> {};
+
+} // namespace detail
+
+
+// template specialization to pull the Signature's args apart
+template <typename T, typename ...Args>
+class Completion<void(Args...), T> : public detail::UserData<T> {
+ protected:
+ // internal interfaces for type-erasure on the Handler/Executor. uses
+ // tuple<Args...> to provide perfect forwarding because you can't make
+ // virtual function templates
+ virtual void destroy_defer(std::tuple<Args...>&& args) = 0;
+ virtual void destroy_dispatch(std::tuple<Args...>&& args) = 0;
+ virtual void destroy_post(std::tuple<Args...>&& args) = 0;
+ virtual void destroy() = 0;
+
+ // constructor is protected, use create(). any constructor arguments are
+ // forwarded to UserData
+ template <typename ...TArgs>
+ Completion(TArgs&& ...args)
+ : detail::UserData<T>(std::forward<TArgs>(args)...)
+ {}
+ public:
+ virtual ~Completion() = default;
+
+ // use the virtual destroy() interface on delete. this allows the derived
+ // class to manage its memory using Handler allocators, without having to use
+ // a custom Deleter for std::unique_ptr<>
+ static void operator delete(void *p) {
+ static_cast<Completion*>(p)->destroy();
+ }
+
+ /// completion factory function that uses the handler's associated allocator.
+ /// any additional arguments are forwared to T's constructor
+ template <typename Executor1, typename Handler, typename ...TArgs>
+ static std::unique_ptr<Completion>
+ create(const Executor1& ex1, Handler&& handler, TArgs&& ...args);
+
+ /// take ownership of the completion, bind any arguments to the completion
+ /// handler, then defer() it on its associated executor
+ template <typename ...Args2>
+ static void defer(std::unique_ptr<Completion>&& c, Args2&&...args);
+
+ /// take ownership of the completion, bind any arguments to the completion
+ /// handler, then dispatch() it on its associated executor
+ template <typename ...Args2>
+ static void dispatch(std::unique_ptr<Completion>&& c, Args2&&...args);
+
+ /// take ownership of the completion, bind any arguments to the completion
+ /// handler, then post() it to its associated executor
+ template <typename ...Args2>
+ static void post(std::unique_ptr<Completion>&& c, Args2&&...args);
+};
+
+namespace detail {
+
+// concrete Completion that knows how to invoke the completion handler. this
+// observes all of the 'Requirements on asynchronous operations' specified by
+// the C++ Networking TS
+template <typename Executor1, typename Handler, typename T, typename ...Args>
+class CompletionImpl final : public Completion<void(Args...), T> {
+ // use Handler's associated executor (or Executor1 by default) for callbacks
+ using Executor2 = boost::asio::associated_executor_t<Handler, Executor1>;
+ // maintain work on both executors
+ using Work1 = boost::asio::executor_work_guard<Executor1>;
+ using Work2 = boost::asio::executor_work_guard<Executor2>;
+ std::pair<Work1, Work2> work;
+ Handler handler;
+
+ // use Handler's associated allocator
+ using Alloc2 = boost::asio::associated_allocator_t<Handler>;
+ using Traits2 = std::allocator_traits<Alloc2>;
+ using RebindAlloc2 = typename Traits2::template rebind_alloc<CompletionImpl>;
+ using RebindTraits2 = std::allocator_traits<RebindAlloc2>;
+
+ // placement new for the handler allocator
+ static void* operator new(size_t, RebindAlloc2 alloc2) {
+ return RebindTraits2::allocate(alloc2, 1);
+ }
+ // placement delete for when the constructor throws during placement new
+ static void operator delete(void *p, RebindAlloc2 alloc2) {
+ RebindTraits2::deallocate(alloc2, static_cast<CompletionImpl*>(p), 1);
+ }
+
+ static auto bind_and_forward(Handler&& h, std::tuple<Args...>&& args) {
+ return forward_handler(CompletionHandler{std::move(h), std::move(args)});
+ }
+
+ void destroy_defer(std::tuple<Args...>&& args) override {
+ auto w = std::move(work);
+ auto f = bind_and_forward(std::move(handler), std::move(args));
+ RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler);
+ RebindTraits2::destroy(alloc2, this);
+ RebindTraits2::deallocate(alloc2, this, 1);
+ w.second.get_executor().defer(std::move(f), alloc2);
+ }
+ void destroy_dispatch(std::tuple<Args...>&& args) override {
+ auto w = std::move(work);
+ auto f = bind_and_forward(std::move(handler), std::move(args));
+ RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler);
+ RebindTraits2::destroy(alloc2, this);
+ RebindTraits2::deallocate(alloc2, this, 1);
+ w.second.get_executor().dispatch(std::move(f), alloc2);
+ }
+ void destroy_post(std::tuple<Args...>&& args) override {
+ auto w = std::move(work);
+ auto f = bind_and_forward(std::move(handler), std::move(args));
+ RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler);
+ RebindTraits2::destroy(alloc2, this);
+ RebindTraits2::deallocate(alloc2, this, 1);
+ w.second.get_executor().post(std::move(f), alloc2);
+ }
+ void destroy() override {
+ RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler);
+ RebindTraits2::destroy(alloc2, this);
+ RebindTraits2::deallocate(alloc2, this, 1);
+ }
+
+ // constructor is private, use create(). extra constructor arguments are
+ // forwarded to UserData
+ template <typename ...TArgs>
+ CompletionImpl(const Executor1& ex1, Handler&& handler, TArgs&& ...args)
+ : Completion<void(Args...), T>(std::forward<TArgs>(args)...),
+ work(ex1, boost::asio::make_work_guard(handler, ex1)),
+ handler(std::move(handler))
+ {}
+
+ public:
+ template <typename ...TArgs>
+ static auto create(const Executor1& ex, Handler&& handler, TArgs&& ...args) {
+ auto alloc2 = boost::asio::get_associated_allocator(handler);
+ using Ptr = std::unique_ptr<CompletionImpl>;
+ return Ptr{new (alloc2) CompletionImpl(ex, std::move(handler),
+ std::forward<TArgs>(args)...)};
+ }
+
+ static void operator delete(void *p) {
+ static_cast<CompletionImpl*>(p)->destroy();
+ }
+};
+
+} // namespace detail
+
+
+template <typename T, typename ...Args>
+template <typename Executor1, typename Handler, typename ...TArgs>
+std::unique_ptr<Completion<void(Args...), T>>
+Completion<void(Args...), T>::create(const Executor1& ex,
+ Handler&& handler, TArgs&& ...args)
+{
+ using Impl = detail::CompletionImpl<Executor1, Handler, T, Args...>;
+ return Impl::create(ex, std::forward<Handler>(handler),
+ std::forward<TArgs>(args)...);
+}
+
+template <typename T, typename ...Args>
+template <typename ...Args2>
+void Completion<void(Args...), T>::defer(std::unique_ptr<Completion>&& ptr,
+ Args2&& ...args)
+{
+ auto c = ptr.release();
+ c->destroy_defer(std::make_tuple(std::forward<Args2>(args)...));
+}
+
+template <typename T, typename ...Args>
+template <typename ...Args2>
+void Completion<void(Args...), T>::dispatch(std::unique_ptr<Completion>&& ptr,
+ Args2&& ...args)
+{
+ auto c = ptr.release();
+ c->destroy_dispatch(std::make_tuple(std::forward<Args2>(args)...));
+}
+
+template <typename T, typename ...Args>
+template <typename ...Args2>
+void Completion<void(Args...), T>::post(std::unique_ptr<Completion>&& ptr,
+ Args2&& ...args)
+{
+ auto c = ptr.release();
+ c->destroy_post(std::make_tuple(std::forward<Args2>(args)...));
+}
+
+
+/// completion factory function that uses the handler's associated allocator.
+/// any additional arguments are forwared to T's constructor
+template <typename Signature, typename T, typename Executor1,
+ typename Handler, typename ...TArgs>
+std::unique_ptr<Completion<Signature, T>>
+create_completion(const Executor1& ex, Handler&& handler, TArgs&& ...args)
+{
+ return Completion<Signature, T>::create(ex, std::forward<Handler>(handler),
+ std::forward<TArgs>(args)...);
+}
+
+/// take ownership of the completion, bind any arguments to the completion
+/// handler, then defer() it on its associated executor
+template <typename Signature, typename T, typename ...Args>
+void defer(std::unique_ptr<Completion<Signature, T>>&& ptr, Args&& ...args)
+{
+ Completion<Signature, T>::defer(std::move(ptr), std::forward<Args>(args)...);
+}
+
+/// take ownership of the completion, bind any arguments to the completion
+/// handler, then dispatch() it on its associated executor
+template <typename Signature, typename T, typename ...Args>
+void dispatch(std::unique_ptr<Completion<Signature, T>>&& ptr, Args&& ...args)
+{
+ Completion<Signature, T>::dispatch(std::move(ptr), std::forward<Args>(args)...);
+}
+
+/// take ownership of the completion, bind any arguments to the completion
+/// handler, then post() it to its associated executor
+template <typename Signature, typename T, typename ...Args>
+void post(std::unique_ptr<Completion<Signature, T>>&& ptr, Args&& ...args)
+{
+ Completion<Signature, T>::post(std::move(ptr), std::forward<Args>(args)...);
+}
+
+} // namespace ceph::async
+
+#endif // CEPH_ASYNC_COMPLETION_H
diff --git a/src/common/async/context_pool.h b/src/common/async/context_pool.h
new file mode 100644
index 000000000..9c6cab767
--- /dev/null
+++ b/src/common/async/context_pool.h
@@ -0,0 +1,94 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat <contact@redhat.com>
+ * Author: Adam C. Emerson <aemerson@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_COMMON_ASYNC_CONTEXT_POOL_H
+#define CEPH_COMMON_ASYNC_CONTEXT_POOL_H
+
+#include <cstddef>
+#include <cstdint>
+#include <mutex>
+#include <optional>
+#include <thread>
+#include <vector>
+
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/executor_work_guard.hpp>
+
+#include "common/ceph_mutex.h"
+#include "common/Thread.h"
+
+namespace ceph::async {
+class io_context_pool {
+ std::vector<std::thread> threadvec;
+ boost::asio::io_context ioctx;
+ std::optional<boost::asio::executor_work_guard<
+ boost::asio::io_context::executor_type>> guard;
+ ceph::mutex m = make_mutex("ceph::io_context_pool::m");
+
+ void cleanup() noexcept {
+ guard = std::nullopt;
+ for (auto& th : threadvec) {
+ th.join();
+ }
+ threadvec.clear();
+ }
+public:
+ io_context_pool() noexcept {}
+ io_context_pool(std::int16_t threadcnt) noexcept {
+ start(threadcnt);
+ }
+ ~io_context_pool() {
+ stop();
+ }
+ void start(std::int16_t threadcnt) noexcept {
+ auto l = std::scoped_lock(m);
+ if (threadvec.empty()) {
+ guard.emplace(boost::asio::make_work_guard(ioctx));
+ ioctx.restart();
+ for (std::int16_t i = 0; i < threadcnt; ++i) {
+ threadvec.emplace_back(make_named_thread("io_context_pool",
+ [this]() {
+ ioctx.run();
+ }));
+ }
+ }
+ }
+ void finish() noexcept {
+ auto l = std::scoped_lock(m);
+ if (!threadvec.empty()) {
+ cleanup();
+ }
+ }
+ void stop() noexcept {
+ auto l = std::scoped_lock(m);
+ if (!threadvec.empty()) {
+ ioctx.stop();
+ cleanup();
+ }
+ }
+
+ boost::asio::io_context& get_io_context() {
+ return ioctx;
+ }
+ operator boost::asio::io_context&() {
+ return ioctx;
+ }
+ boost::asio::io_context::executor_type get_executor() {
+ return ioctx.get_executor();
+ }
+};
+}
+
+#endif // CEPH_COMMON_ASYNC_CONTEXT_POOL_H
diff --git a/src/common/async/detail/shared_lock.h b/src/common/async/detail/shared_lock.h
new file mode 100644
index 000000000..12e6a9220
--- /dev/null
+++ b/src/common/async/detail/shared_lock.h
@@ -0,0 +1,185 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+namespace std {
+
+// specialize unique_lock and shared_lock for SharedMutex to operate on
+// SharedMutexImpl instead, because the locks may outlive the SharedMutex itself
+
+template <typename Executor>
+class unique_lock<ceph::async::SharedMutex<Executor>> {
+ public:
+ using mutex_type = boost::intrusive_ptr<ceph::async::detail::SharedMutexImpl>;
+
+ unique_lock() = default;
+ explicit unique_lock(ceph::async::SharedMutex<Executor>& m)
+ : impl(m.impl), locked(true)
+ {
+ impl->lock();
+ }
+ unique_lock(ceph::async::SharedMutex<Executor>& m, defer_lock_t t) noexcept
+ : impl(m.impl)
+ {}
+ unique_lock(ceph::async::SharedMutex<Executor>& m, try_to_lock_t t)
+ : impl(m.impl), locked(impl->try_lock())
+ {}
+ unique_lock(ceph::async::SharedMutex<Executor>& m, adopt_lock_t t) noexcept
+ : impl(m.impl), locked(true)
+ {}
+ ~unique_lock() {
+ if (impl && locked)
+ impl->unlock();
+ }
+
+ unique_lock(unique_lock&& other) noexcept
+ : impl(std::move(other.impl)),
+ locked(other.locked) {
+ other.locked = false;
+ }
+ unique_lock& operator=(unique_lock&& other) noexcept {
+ if (impl && locked) {
+ impl->unlock();
+ }
+ impl = std::move(other.impl);
+ locked = other.locked;
+ other.locked = false;
+ return *this;
+ }
+ void swap(unique_lock& other) noexcept {
+ using std::swap;
+ swap(impl, other.impl);
+ swap(locked, other.locked);
+ }
+
+ mutex_type mutex() const noexcept { return impl; }
+ bool owns_lock() const noexcept { return impl && locked; }
+ explicit operator bool() const noexcept { return impl && locked; }
+
+ mutex_type release() {
+ auto result = std::move(impl);
+ locked = false;
+ return result;
+ }
+
+ void lock() {
+ if (!impl)
+ throw system_error(make_error_code(errc::operation_not_permitted));
+ if (locked)
+ throw system_error(make_error_code(errc::resource_deadlock_would_occur));
+ impl->lock();
+ locked = true;
+ }
+ bool try_lock() {
+ if (!impl)
+ throw system_error(make_error_code(errc::operation_not_permitted));
+ if (locked)
+ throw system_error(make_error_code(errc::resource_deadlock_would_occur));
+ return locked = impl->try_lock();
+ }
+ void unlock() {
+ if (!impl || !locked)
+ throw system_error(make_error_code(errc::operation_not_permitted));
+ impl->unlock();
+ locked = false;
+ }
+ private:
+ mutex_type impl;
+ bool locked{false};
+};
+
+template <typename Executor>
+class shared_lock<ceph::async::SharedMutex<Executor>> {
+ public:
+ using mutex_type = boost::intrusive_ptr<ceph::async::detail::SharedMutexImpl>;
+
+ shared_lock() = default;
+ explicit shared_lock(ceph::async::SharedMutex<Executor>& m)
+ : impl(m.impl), locked(true)
+ {
+ impl->lock_shared();
+ }
+ shared_lock(ceph::async::SharedMutex<Executor>& m, defer_lock_t t) noexcept
+ : impl(m.impl)
+ {}
+ shared_lock(ceph::async::SharedMutex<Executor>& m, try_to_lock_t t)
+ : impl(m.impl), locked(impl->try_lock_shared())
+ {}
+ shared_lock(ceph::async::SharedMutex<Executor>& m, adopt_lock_t t) noexcept
+ : impl(m.impl), locked(true)
+ {}
+
+ ~shared_lock() {
+ if (impl && locked)
+ impl->unlock_shared();
+ }
+
+ shared_lock(shared_lock&& other) noexcept
+ : impl(std::move(other.impl)),
+ locked(other.locked) {
+ other.locked = false;
+ }
+ shared_lock& operator=(shared_lock&& other) noexcept {
+ if (impl && locked) {
+ impl->unlock_shared();
+ }
+ impl = std::move(other.impl);
+ locked = other.locked;
+ other.locked = false;
+ return *this;
+ }
+ void swap(shared_lock& other) noexcept {
+ using std::swap;
+ swap(impl, other.impl);
+ swap(locked, other.locked);
+ }
+
+ mutex_type mutex() const noexcept { return impl; }
+ bool owns_lock() const noexcept { return impl && locked; }
+ explicit operator bool() const noexcept { return impl && locked; }
+
+ mutex_type release() {
+ auto result = std::move(impl);
+ locked = false;
+ return result;
+ }
+
+ void lock() {
+ if (!impl)
+ throw system_error(make_error_code(errc::operation_not_permitted));
+ if (locked)
+ throw system_error(make_error_code(errc::resource_deadlock_would_occur));
+ impl->lock_shared();
+ locked = true;
+ }
+ bool try_lock() {
+ if (!impl)
+ throw system_error(make_error_code(errc::operation_not_permitted));
+ if (locked)
+ throw system_error(make_error_code(errc::resource_deadlock_would_occur));
+ return locked = impl->try_lock_shared();
+ }
+ void unlock() {
+ if (!impl || !locked)
+ throw system_error(make_error_code(errc::operation_not_permitted));
+ impl->unlock_shared();
+ locked = false;
+ }
+ private:
+ mutex_type impl;
+ bool locked{false};
+};
+
+} // namespace std
diff --git a/src/common/async/detail/shared_mutex.h b/src/common/async/detail/shared_mutex.h
new file mode 100644
index 000000000..8e5436350
--- /dev/null
+++ b/src/common/async/detail/shared_mutex.h
@@ -0,0 +1,326 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <condition_variable>
+#include <mutex>
+#include <optional>
+#include <shared_mutex> // for std::shared_lock
+
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/intrusive/list.hpp>
+
+#include "include/ceph_assert.h"
+
+#include "common/async/completion.h"
+
+namespace ceph::async::detail {
+
+struct LockRequest : public boost::intrusive::list_base_hook<> {
+ virtual ~LockRequest() {}
+ virtual void complete(boost::system::error_code ec) = 0;
+ virtual void destroy() = 0;
+};
+
+class SharedMutexImpl : public boost::intrusive_ref_counter<SharedMutexImpl> {
+ public:
+ ~SharedMutexImpl();
+
+ template <typename Mutex, typename CompletionToken>
+ auto async_lock(Mutex& mtx, CompletionToken&& token);
+ void lock();
+ void lock(boost::system::error_code& ec);
+ bool try_lock();
+ void unlock();
+ template <typename Mutex, typename CompletionToken>
+ auto async_lock_shared(Mutex& mtx, CompletionToken&& token);
+ void lock_shared();
+ void lock_shared(boost::system::error_code& ec);
+ bool try_lock_shared();
+ void unlock_shared();
+ void cancel();
+
+ private:
+ using RequestList = boost::intrusive::list<LockRequest>;
+
+ RequestList shared_queue; //< requests waiting on a shared lock
+ RequestList exclusive_queue; //< requests waiting on an exclusive lock
+
+ /// lock state encodes the number of shared lockers, or 'max' for exclusive
+ using LockState = uint16_t;
+ static constexpr LockState Unlocked = 0;
+ static constexpr LockState Exclusive = std::numeric_limits<LockState>::max();
+ static constexpr LockState MaxShared = Exclusive - 1;
+ LockState state = Unlocked; //< current lock state
+
+ std::mutex mutex; //< protects lock state and wait queues
+
+ void complete(RequestList&& requests, boost::system::error_code ec);
+};
+
+// sync requests live on the stack and wait on a condition variable
+class SyncRequest : public LockRequest {
+ std::condition_variable cond;
+ std::optional<boost::system::error_code> ec;
+ public:
+ boost::system::error_code wait(std::unique_lock<std::mutex>& lock) {
+ // return the error code once its been set
+ cond.wait(lock, [this] { return ec; });
+ return *ec;
+ }
+ void complete(boost::system::error_code ec) override {
+ this->ec = ec;
+ cond.notify_one();
+ }
+ void destroy() override {
+ // nothing, SyncRequests live on the stack
+ }
+};
+
+// async requests use async::Completion to invoke a handler on its executor
+template <typename Mutex, template <typename> typename Lock>
+class AsyncRequest : public LockRequest {
+ Mutex& mutex; //< mutex argument for lock guard
+ public:
+ explicit AsyncRequest(Mutex& mutex) : mutex(mutex) {}
+
+ using Signature = void(boost::system::error_code, Lock<Mutex>);
+ using LockCompletion = Completion<Signature, AsBase<AsyncRequest>>;
+
+ void complete(boost::system::error_code ec) override {
+ auto r = static_cast<LockCompletion*>(this);
+ // pass ownership of ourselves to post(). on error, pass an empty lock
+ post(std::unique_ptr<LockCompletion>{r}, ec,
+ ec ? Lock{mutex, std::defer_lock} : Lock{mutex, std::adopt_lock});
+ }
+ void destroy() override {
+ delete static_cast<LockCompletion*>(this);
+ }
+};
+
+inline SharedMutexImpl::~SharedMutexImpl()
+{
+ ceph_assert(state == Unlocked);
+ ceph_assert(shared_queue.empty());
+ ceph_assert(exclusive_queue.empty());
+}
+
+template <typename Mutex, typename CompletionToken>
+auto SharedMutexImpl::async_lock(Mutex& mtx, CompletionToken&& token)
+{
+ using Request = AsyncRequest<Mutex, std::unique_lock>;
+ using Signature = typename Request::Signature;
+ boost::asio::async_completion<CompletionToken, Signature> init(token);
+ auto& handler = init.completion_handler;
+ auto ex1 = mtx.get_executor();
+ {
+ std::lock_guard lock{mutex};
+
+ boost::system::error_code ec;
+ if (state == Unlocked) {
+ state = Exclusive;
+
+ // post a successful completion
+ auto ex2 = boost::asio::get_associated_executor(handler, ex1);
+ auto alloc2 = boost::asio::get_associated_allocator(handler);
+ auto b = bind_handler(std::move(handler), ec,
+ std::unique_lock{mtx, std::adopt_lock});
+ ex2.post(forward_handler(std::move(b)), alloc2);
+ } else {
+ // create a request and add it to the exclusive list
+ using LockCompletion = typename Request::LockCompletion;
+ auto request = LockCompletion::create(ex1, std::move(handler), mtx);
+ exclusive_queue.push_back(*request.release());
+ }
+ }
+ return init.result.get();
+}
+
+inline void SharedMutexImpl::lock()
+{
+ boost::system::error_code ec;
+ lock(ec);
+ if (ec) {
+ throw boost::system::system_error(ec);
+ }
+}
+
+void SharedMutexImpl::lock(boost::system::error_code& ec)
+{
+ std::unique_lock lock{mutex};
+
+ if (state == Unlocked) {
+ state = Exclusive;
+ ec.clear();
+ } else {
+ SyncRequest request;
+ exclusive_queue.push_back(request);
+ ec = request.wait(lock);
+ }
+}
+
+inline bool SharedMutexImpl::try_lock()
+{
+ std::lock_guard lock{mutex};
+
+ if (state == Unlocked) {
+ state = Exclusive;
+ return true;
+ }
+ return false;
+}
+
+void SharedMutexImpl::unlock()
+{
+ RequestList granted;
+ {
+ std::lock_guard lock{mutex};
+ ceph_assert(state == Exclusive);
+
+ if (!exclusive_queue.empty()) {
+ // grant next exclusive lock
+ auto& request = exclusive_queue.front();
+ exclusive_queue.pop_front();
+ granted.push_back(request);
+ } else {
+ // grant shared locks, if any
+ state = shared_queue.size();
+ if (state > MaxShared) {
+ state = MaxShared;
+ auto end = std::next(shared_queue.begin(), MaxShared);
+ granted.splice(granted.end(), shared_queue,
+ shared_queue.begin(), end, MaxShared);
+ } else {
+ granted.splice(granted.end(), shared_queue);
+ }
+ }
+ }
+ complete(std::move(granted), boost::system::error_code{});
+}
+
+template <typename Mutex, typename CompletionToken>
+auto SharedMutexImpl::async_lock_shared(Mutex& mtx, CompletionToken&& token)
+{
+ using Request = AsyncRequest<Mutex, std::shared_lock>;
+ using Signature = typename Request::Signature;
+ boost::asio::async_completion<CompletionToken, Signature> init(token);
+ auto& handler = init.completion_handler;
+ auto ex1 = mtx.get_executor();
+ {
+ std::lock_guard lock{mutex};
+
+ boost::system::error_code ec;
+ if (exclusive_queue.empty() && state < MaxShared) {
+ state++;
+
+ auto ex2 = boost::asio::get_associated_executor(handler, ex1);
+ auto alloc2 = boost::asio::get_associated_allocator(handler);
+ auto b = bind_handler(std::move(handler), ec,
+ std::shared_lock{mtx, std::adopt_lock});
+ ex2.post(forward_handler(std::move(b)), alloc2);
+ } else {
+ using LockCompletion = typename Request::LockCompletion;
+ auto request = LockCompletion::create(ex1, std::move(handler), mtx);
+ shared_queue.push_back(*request.release());
+ }
+ }
+ return init.result.get();
+}
+
+inline void SharedMutexImpl::lock_shared()
+{
+ boost::system::error_code ec;
+ lock_shared(ec);
+ if (ec) {
+ throw boost::system::system_error(ec);
+ }
+}
+
+void SharedMutexImpl::lock_shared(boost::system::error_code& ec)
+{
+ std::unique_lock lock{mutex};
+
+ if (exclusive_queue.empty() && state < MaxShared) {
+ state++;
+ ec.clear();
+ } else {
+ SyncRequest request;
+ shared_queue.push_back(request);
+ ec = request.wait(lock);
+ }
+}
+
+inline bool SharedMutexImpl::try_lock_shared()
+{
+ std::lock_guard lock{mutex};
+
+ if (exclusive_queue.empty() && state < MaxShared) {
+ state++;
+ return true;
+ }
+ return false;
+}
+
+inline void SharedMutexImpl::unlock_shared()
+{
+ std::lock_guard lock{mutex};
+ ceph_assert(state != Unlocked && state <= MaxShared);
+
+ if (state == 1 && !exclusive_queue.empty()) {
+ // grant next exclusive lock
+ state = Exclusive;
+ auto& request = exclusive_queue.front();
+ exclusive_queue.pop_front();
+ request.complete(boost::system::error_code{});
+ } else if (state == MaxShared && !shared_queue.empty() &&
+ exclusive_queue.empty()) {
+ // grant next shared lock
+ auto& request = shared_queue.front();
+ shared_queue.pop_front();
+ request.complete(boost::system::error_code{});
+ } else {
+ state--;
+ }
+}
+
+inline void SharedMutexImpl::cancel()
+{
+ RequestList canceled;
+ {
+ std::lock_guard lock{mutex};
+ canceled.splice(canceled.end(), shared_queue);
+ canceled.splice(canceled.end(), exclusive_queue);
+ }
+ complete(std::move(canceled), boost::asio::error::operation_aborted);
+}
+
+void SharedMutexImpl::complete(RequestList&& requests,
+ boost::system::error_code ec)
+{
+ while (!requests.empty()) {
+ auto& request = requests.front();
+ requests.pop_front();
+ try {
+ request.complete(ec);
+ } catch (...) {
+ // clean up any remaining completions and rethrow
+ requests.clear_and_dispose([] (LockRequest *r) { r->destroy(); });
+ throw;
+ }
+ }
+}
+
+} // namespace ceph::async::detail
diff --git a/src/common/async/forward_handler.h b/src/common/async/forward_handler.h
new file mode 100644
index 000000000..ae88cc83f
--- /dev/null
+++ b/src/common/async/forward_handler.h
@@ -0,0 +1,103 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_ASYNC_FORWARD_HANDLER_H
+#define CEPH_ASYNC_FORWARD_HANDLER_H
+
+#include <boost/asio.hpp>
+
+namespace ceph::async {
+
+/**
+ * A forwarding completion handler for use with boost::asio.
+ *
+ * A completion handler wrapper that invokes the handler's operator() as an
+ * rvalue, regardless of whether the wrapper is invoked as an lvalue or rvalue.
+ * This operation is potentially destructive to the wrapped handler, so is only
+ * suitable for single-use handlers.
+ *
+ * This is useful when combined with bind_handler() and move-only arguments,
+ * because executors will always call the lvalue overload of operator().
+ *
+ * The original Handler's associated allocator and executor are maintained.
+ *
+ * @see forward_handler
+ */
+template <typename Handler>
+struct ForwardingHandler {
+ Handler handler;
+
+ ForwardingHandler(Handler&& handler)
+ : handler(std::move(handler))
+ {}
+
+ template <typename ...Args>
+ void operator()(Args&& ...args) {
+ std::move(handler)(std::forward<Args>(args)...);
+ }
+
+ using allocator_type = boost::asio::associated_allocator_t<Handler>;
+ allocator_type get_allocator() const noexcept {
+ return boost::asio::get_associated_allocator(handler);
+ }
+};
+
+} // namespace ceph::async
+
+namespace boost::asio {
+
+// specialize boost::asio::associated_executor<> for ForwardingHandler
+template <typename Handler, typename Executor>
+struct associated_executor<ceph::async::ForwardingHandler<Handler>, Executor> {
+ using type = boost::asio::associated_executor_t<Handler, Executor>;
+
+ static type get(const ceph::async::ForwardingHandler<Handler>& handler,
+ const Executor& ex = Executor()) noexcept {
+ return boost::asio::get_associated_executor(handler.handler, ex);
+ }
+};
+
+} // namespace boost::asio
+
+namespace ceph::async {
+
+/**
+ * Returns a single-use completion handler that always forwards on operator().
+ *
+ * Wraps a completion handler such that it is always invoked as an rvalue. This
+ * is necessary when combining executors and bind_handler() with move-only
+ * argument types.
+ *
+ * Example use:
+ *
+ * auto callback = [] (std::unique_ptr<int>&& p) {};
+ * auto bound_handler = bind_handler(callback, std::make_unique<int>(5));
+ * auro handler = forward_handler(std::move(bound_handler));
+ *
+ * // execute the forwarding handler on an io_context:
+ * boost::asio::io_context context;
+ * boost::asio::post(context, std::move(handler));
+ * context.run();
+ *
+ * @see ForwardingHandler
+ */
+template <typename Handler>
+auto forward_handler(Handler&& h)
+{
+ return ForwardingHandler{std::forward<Handler>(h)};
+}
+
+} // namespace ceph::async
+
+#endif // CEPH_ASYNC_FORWARD_HANDLER_H
diff --git a/src/common/async/librados_completion.h b/src/common/async/librados_completion.h
new file mode 100644
index 000000000..2fa5555e7
--- /dev/null
+++ b/src/common/async/librados_completion.h
@@ -0,0 +1,125 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2020 Red Hat
+ * Author: Adam C. Emerson <aemerson@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_COMMON_ASYNC_LIBRADOS_COMPLETION_H
+#define CEPH_COMMON_ASYNC_LIBRADOS_COMPLETION_H
+
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
+#include <optional>
+#include <type_traits>
+
+#include <boost/asio/async_result.hpp>
+
+#include <boost/system/error_code.hpp>
+#include <boost/system/system_error.hpp>
+
+#include "include/rados/librados.hpp"
+#include "librados/AioCompletionImpl.h"
+
+// Allow librados::AioCompletion to be provided as a completion
+// handler. This is only allowed with a signature of
+// (boost::system::error_code) or (). On completion the AioCompletion
+// is completed with the error_code converted to an int with
+// ceph::from_error_code.
+//
+// async_result::return_type is void.
+
+namespace ceph::async {
+
+namespace bs = boost::system;
+namespace lr = librados;
+
+namespace detail {
+
+struct librados_handler {
+ lr::AioCompletionImpl* pc;
+
+ explicit librados_handler(lr::AioCompletion* c) : pc(c->pc) {
+ pc->get();
+ }
+ ~librados_handler() {
+ if (pc) {
+ pc->put();
+ pc = nullptr;
+ }
+ }
+
+ librados_handler(const librados_handler&) = delete;
+ librados_handler& operator =(const librados_handler&) = delete;
+ librados_handler(librados_handler&& rhs) {
+ pc = rhs.pc;
+ rhs.pc = nullptr;
+ }
+
+ void operator()(bs::error_code ec) {
+ pc->lock.lock();
+ pc->rval = ceph::from_error_code(ec);
+ pc->complete = true;
+ pc->lock.unlock();
+
+ auto cb_complete = pc->callback_complete;
+ auto cb_complete_arg = pc->callback_complete_arg;
+ if (cb_complete)
+ cb_complete(pc, cb_complete_arg);
+
+ auto cb_safe = pc->callback_safe;
+ auto cb_safe_arg = pc->callback_safe_arg;
+ if (cb_safe)
+ cb_safe(pc, cb_safe_arg);
+
+ pc->lock.lock();
+ pc->callback_complete = NULL;
+ pc->callback_safe = NULL;
+ pc->cond.notify_all();
+ pc->put_unlock();
+ pc = nullptr;
+ }
+
+ void operator ()() {
+ (*this)(bs::error_code{});
+ }
+};
+} // namespace detail
+} // namespace ceph::async
+
+
+namespace boost::asio {
+template<typename ReturnType>
+class async_result<librados::AioCompletion*, ReturnType()> {
+public:
+ using completion_handler_type = ceph::async::detail::librados_handler;
+ explicit async_result(completion_handler_type&) {};
+ using return_type = void;
+ void get() {
+ return;
+ }
+};
+
+template<typename ReturnType>
+class async_result<librados::AioCompletion*,
+ ReturnType(boost::system::error_code)> {
+public:
+ using completion_handler_type = ceph::async::detail::librados_handler;
+ explicit async_result(completion_handler_type&) {};
+ using return_type = void;
+ void get() {
+ return;
+ }
+};
+}
+
+#endif // !CEPH_COMMON_ASYNC_LIBRADOS_COMPLETION_H
diff --git a/src/common/async/shared_mutex.h b/src/common/async/shared_mutex.h
new file mode 100644
index 000000000..3e471a4df
--- /dev/null
+++ b/src/common/async/shared_mutex.h
@@ -0,0 +1,212 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include "common/async/detail/shared_mutex.h"
+
+namespace ceph::async {
+
+/**
+ * An asynchronous shared mutex for use with boost::asio.
+ *
+ * A shared mutex class with asynchronous lock operations that complete on a
+ * boost::asio executor. The class also has synchronous interfaces that meet
+ * most of the standard library's requirements for the SharedMutex concept,
+ * which makes it compatible with lock_guard, unique_lock, and shared_lock.
+ *
+ * All lock requests can fail with operation_aborted on cancel() or destruction.
+ * The non-error_code overloads of lock() and lock_shared() will throw this
+ * error as an exception of type boost::system::system_error.
+ *
+ * Exclusive locks are prioritized over shared locks. Locks of the same type
+ * are granted in fifo order. The implementation defines a limit on the number
+ * of shared locks to 65534 at a time.
+ *
+ * Example use:
+ *
+ * boost::asio::io_context context;
+ * SharedMutex mutex{context.get_executor()};
+ *
+ * mutex.async_lock([&] (boost::system::error_code ec, auto lock) {
+ * if (!ec) {
+ * // mutate shared state ...
+ * }
+ * });
+ * mutex.async_lock_shared([&] (boost::system::error_code ec, auto lock) {
+ * if (!ec) {
+ * // read shared state ...
+ * }
+ * });
+ *
+ * context.run();
+ */
+template <typename Executor>
+class SharedMutex {
+ public:
+ explicit SharedMutex(const Executor& ex);
+
+ /// on destruction, all pending lock requests are canceled
+ ~SharedMutex();
+
+ using executor_type = Executor;
+ executor_type get_executor() const noexcept { return ex; }
+
+ /// initiate an asynchronous request for an exclusive lock. when the lock is
+ /// granted, the completion handler is invoked with a successful error code
+ /// and a std::unique_lock that owns this mutex.
+ /// Signature = void(boost::system::error_code, std::unique_lock)
+ template <typename CompletionToken>
+ auto async_lock(CompletionToken&& token);
+
+ /// wait synchronously for an exclusive lock. if an error occurs before the
+ /// lock is granted, that error is thrown as an exception
+ void lock();
+
+ /// wait synchronously for an exclusive lock. if an error occurs before the
+ /// lock is granted, that error is assigned to 'ec'
+ void lock(boost::system::error_code& ec);
+
+ /// try to acquire an exclusive lock. if the lock is not immediately
+ /// available, returns false
+ bool try_lock();
+
+ /// releases an exclusive lock. not required to be called from the same thread
+ /// that initiated the lock
+ void unlock();
+
+ /// initiate an asynchronous request for a shared lock. when the lock is
+ /// granted, the completion handler is invoked with a successful error code
+ /// and a std::shared_lock that owns this mutex.
+ /// Signature = void(boost::system::error_code, std::shared_lock)
+ template <typename CompletionToken>
+ auto async_lock_shared(CompletionToken&& token);
+
+ /// wait synchronously for a shared lock. if an error occurs before the
+ /// lock is granted, that error is thrown as an exception
+ void lock_shared();
+
+ /// wait synchronously for a shared lock. if an error occurs before the lock
+ /// is granted, that error is assigned to 'ec'
+ void lock_shared(boost::system::error_code& ec);
+
+ /// try to acquire a shared lock. if the lock is not immediately available,
+ /// returns false
+ bool try_lock_shared();
+
+ /// releases a shared lock. not required to be called from the same thread
+ /// that initiated the lock
+ void unlock_shared();
+
+ /// cancel any pending requests for exclusive or shared locks with an
+ /// operation_aborted error
+ void cancel();
+
+ private:
+ Executor ex; //< default callback executor
+ boost::intrusive_ptr<detail::SharedMutexImpl> impl;
+
+ // allow lock guards to access impl
+ friend class std::unique_lock<SharedMutex>;
+ friend class std::shared_lock<SharedMutex>;
+};
+
+
+template <typename Executor>
+SharedMutex<Executor>::SharedMutex(const Executor& ex)
+ : ex(ex), impl(new detail::SharedMutexImpl)
+{
+}
+
+template <typename Executor>
+SharedMutex<Executor>::~SharedMutex()
+{
+ try {
+ impl->cancel();
+ } catch (const std::exception&) {
+ // swallow any exceptions, the destructor can't throw
+ }
+}
+
+template <typename Executor>
+template <typename CompletionToken>
+auto SharedMutex<Executor>::async_lock(CompletionToken&& token)
+{
+ return impl->async_lock(*this, std::forward<CompletionToken>(token));
+}
+
+template <typename Executor>
+void SharedMutex<Executor>::lock()
+{
+ impl->lock();
+}
+
+template <typename Executor>
+void SharedMutex<Executor>::lock(boost::system::error_code& ec)
+{
+ impl->lock(ec);
+}
+
+template <typename Executor>
+bool SharedMutex<Executor>::try_lock()
+{
+ return impl->try_lock();
+}
+
+template <typename Executor>
+void SharedMutex<Executor>::unlock()
+{
+ impl->unlock();
+}
+
+template <typename Executor>
+template <typename CompletionToken>
+auto SharedMutex<Executor>::async_lock_shared(CompletionToken&& token)
+{
+ return impl->async_lock_shared(*this, std::forward<CompletionToken>(token));
+}
+
+template <typename Executor>
+void SharedMutex<Executor>::lock_shared()
+{
+ impl->lock_shared();
+}
+
+template <typename Executor>
+void SharedMutex<Executor>::lock_shared(boost::system::error_code& ec)
+{
+ impl->lock_shared(ec);
+}
+
+template <typename Executor>
+bool SharedMutex<Executor>::try_lock_shared()
+{
+ return impl->try_lock_shared();
+}
+
+template <typename Executor>
+void SharedMutex<Executor>::unlock_shared()
+{
+ impl->unlock_shared();
+}
+
+template <typename Executor>
+void SharedMutex<Executor>::cancel()
+{
+ impl->cancel();
+}
+
+} // namespace ceph::async
+
+#include "common/async/detail/shared_lock.h"
diff --git a/src/common/async/waiter.h b/src/common/async/waiter.h
new file mode 100644
index 000000000..219a27cf7
--- /dev/null
+++ b/src/common/async/waiter.h
@@ -0,0 +1,223 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_COMMON_WAITER_H
+#define CEPH_COMMON_WAITER_H
+
+#include <condition_variable>
+#include <tuple>
+
+#include <boost/asio/async_result.hpp>
+
+#include "include/ceph_assert.h"
+#include "include/function2.hpp"
+
+#include "common/ceph_mutex.h"
+
+namespace ceph::async {
+namespace detail {
+// For safety reasons (avoiding undefined behavior around sequence
+// points) std::reference_wrapper disallows move construction. This
+// harms us in cases where we want to pass a reference in to something
+// that unavoidably moves.
+//
+// It should not be used generally.
+template<typename T>
+class rvalue_reference_wrapper {
+public:
+ // types
+ using type = T;
+
+ rvalue_reference_wrapper(T& r) noexcept
+ : p(std::addressof(r)) {}
+
+ // We write our semantics to match those of reference collapsing. If
+ // we're treated as an lvalue, collapse to one.
+
+ rvalue_reference_wrapper(const rvalue_reference_wrapper&) noexcept = default;
+ rvalue_reference_wrapper(rvalue_reference_wrapper&&) noexcept = default;
+
+ // assignment
+ rvalue_reference_wrapper& operator=(
+ const rvalue_reference_wrapper& x) noexcept = default;
+ rvalue_reference_wrapper& operator=(
+ rvalue_reference_wrapper&& x) noexcept = default;
+
+ operator T& () const noexcept {
+ return *p;
+ }
+ T& get() const noexcept {
+ return *p;
+ }
+
+ operator T&& () noexcept {
+ return std::move(*p);
+ }
+ T&& get() noexcept {
+ return std::move(*p);
+ }
+
+ template<typename... Args>
+ std::result_of_t<T&(Args&&...)> operator ()(Args&&... args ) const {
+ return (*p)(std::forward<Args>(args)...);
+ }
+
+ template<typename... Args>
+ std::result_of_t<T&&(Args&&...)> operator ()(Args&&... args ) {
+ return std::move(*p)(std::forward<Args>(args)...);
+ }
+
+private:
+ T* p;
+};
+
+class base {
+protected:
+ ceph::mutex lock = ceph::make_mutex("ceph::async::detail::base::lock");
+ ceph::condition_variable cond;
+ bool has_value = false;
+
+ ~base() = default;
+
+ auto wait_base() {
+ std::unique_lock l(lock);
+ cond.wait(l, [this](){ return has_value; });
+ return l;
+ }
+
+ auto exec_base() {
+ std::unique_lock l(lock);
+ // There's no really good way to handle being called twice
+ // without being reset.
+ ceph_assert(!has_value);
+ has_value = true;
+ cond.notify_one();
+ return l;
+ }
+};
+}
+
+// waiter is a replacement for C_SafeCond and friends. It is the
+// moral equivalent of a future but plays well with a world of
+// callbacks.
+template<typename ...S>
+class waiter;
+
+template<>
+class waiter<> final : public detail::base {
+public:
+ void wait() {
+ wait_base();
+ has_value = false;
+ }
+
+ void operator()() {
+ exec_base();
+ }
+
+ auto ref() {
+ return detail::rvalue_reference_wrapper(*this);
+ }
+
+
+ operator fu2::unique_function<void() &&>() {
+ return fu2::unique_function<void() &&>(ref());
+ }
+};
+
+template<typename Ret>
+class waiter<Ret> final : public detail::base {
+ std::aligned_storage_t<sizeof(Ret)> ret;
+
+public:
+ Ret wait() {
+ auto l = wait_base();
+ auto r = reinterpret_cast<Ret*>(&ret);
+ auto t = std::move(*r);
+ r->~Ret();
+ has_value = false;
+ return t;
+ }
+
+ void operator()(Ret&& _ret) {
+ auto l = exec_base();
+ auto r = reinterpret_cast<Ret*>(&ret);
+ *r = std::move(_ret);
+ }
+
+ void operator()(const Ret& _ret) {
+ auto l = exec_base();
+ auto r = reinterpret_cast<Ret*>(&ret);
+ *r = std::move(_ret);
+ }
+
+ auto ref() {
+ return detail::rvalue_reference_wrapper(*this);
+ }
+
+ operator fu2::unique_function<void(Ret) &&>() {
+ return fu2::unique_function<void(Ret) &&>(ref());
+ }
+
+ ~waiter() {
+ if (has_value)
+ reinterpret_cast<Ret*>(&ret)->~Ret();
+ }
+};
+
+template<typename ...Ret>
+class waiter final : public detail::base {
+ std::tuple<Ret...> ret;
+
+public:
+ std::tuple<Ret...> wait() {
+ using std::tuple;
+ auto l = wait_base();
+ return std::move(ret);
+ auto r = reinterpret_cast<std::tuple<Ret...>*>(&ret);
+ auto t = std::move(*r);
+ r->~tuple<Ret...>();
+ has_value = false;
+ return t;
+ }
+
+ void operator()(Ret&&... _ret) {
+ auto l = exec_base();
+ auto r = reinterpret_cast<std::tuple<Ret...>*>(&ret);
+ *r = std::forward_as_tuple(_ret...);
+ }
+
+ void operator()(const Ret&... _ret) {
+ auto l = exec_base();
+ auto r = reinterpret_cast<std::tuple<Ret...>*>(&ret);
+ *r = std::forward_as_tuple(_ret...);
+ }
+
+ auto ref() {
+ return detail::rvalue_reference_wrapper(*this);
+ }
+
+ operator fu2::unique_function<void(Ret...) &&>() {
+ return fu2::unique_function<void(Ret...) &&>(ref());
+ }
+
+ ~waiter() {
+ using std::tuple;
+ if (has_value)
+ reinterpret_cast<tuple<Ret...>*>(&ret)->~tuple<Ret...>();
+ }
+};
+}
+
+#endif // CEPH_COMMON_WAITER_H
diff --git a/src/common/async/yield_context.h b/src/common/async/yield_context.h
new file mode 100644
index 000000000..05e6ca614
--- /dev/null
+++ b/src/common/async/yield_context.h
@@ -0,0 +1,59 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <boost/range/begin.hpp>
+#include <boost/range/end.hpp>
+#include <boost/asio/io_context.hpp>
+
+#include "acconfig.h"
+
+#include <spawn/spawn.hpp>
+
+// use explicit executor types instead of the type-erased boost::asio::executor.
+// coroutines wrap the default io_context executor with a strand executor
+using yield_context = spawn::basic_yield_context<
+ boost::asio::executor_binder<void(*)(),
+ boost::asio::strand<boost::asio::io_context::executor_type>>>;
+
+/// optional-like wrapper for a spawn::yield_context and its associated
+/// boost::asio::io_context. operations that take an optional_yield argument
+/// will, when passed a non-empty yield context, suspend this coroutine instead
+/// of the blocking the thread of execution
+class optional_yield {
+ boost::asio::io_context *c = nullptr;
+ yield_context *y = nullptr;
+ public:
+ /// construct with a valid io and yield_context
+ explicit optional_yield(boost::asio::io_context& c,
+ yield_context& y) noexcept
+ : c(&c), y(&y) {}
+
+ /// type tag to construct an empty object
+ struct empty_t {};
+ optional_yield(empty_t) noexcept {}
+
+ /// implicit conversion to bool, returns true if non-empty
+ operator bool() const noexcept { return y; }
+
+ /// return a reference to the associated io_context. only valid if non-empty
+ boost::asio::io_context& get_io_context() const noexcept { return *c; }
+
+ /// return a reference to the yield_context. only valid if non-empty
+ yield_context& get_yield_context() const noexcept { return *y; }
+};
+
+// type tag object to construct an empty optional_yield
+static constexpr optional_yield::empty_t null_yield{};