summaryrefslogtreecommitdiffstats
path: root/src/common/async
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/common/async/bind_handler.h111
-rw-r--r--src/common/async/completion.h320
-rw-r--r--src/common/async/detail/shared_lock.h185
-rw-r--r--src/common/async/detail/shared_mutex.h326
-rw-r--r--src/common/async/forward_handler.h103
-rw-r--r--src/common/async/shared_mutex.h212
-rw-r--r--src/common/async/yield_context.h67
7 files changed, 1324 insertions, 0 deletions
diff --git a/src/common/async/bind_handler.h b/src/common/async/bind_handler.h
new file mode 100644
index 00000000..516d8a5e
--- /dev/null
+++ b/src/common/async/bind_handler.h
@@ -0,0 +1,111 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_ASYNC_BIND_HANDLER_H
+#define CEPH_ASYNC_BIND_HANDLER_H
+
+#include <tuple>
+#include <boost/asio.hpp>
+
+namespace ceph::async {
+
+/**
+ * A bound completion handler for use with boost::asio.
+ *
+ * A completion handler wrapper that allows a tuple of arguments to be forwarded
+ * to the original Handler. This is intended for use with boost::asio functions
+ * like defer(), dispatch() and post() which expect handlers which are callable
+ * with no arguments.
+ *
+ * The original Handler's associated allocator and executor are maintained.
+ *
+ * @see bind_handler
+ */
+template <typename Handler, typename Tuple>
+struct CompletionHandler {
+ Handler handler;
+ Tuple args;
+
+ CompletionHandler(Handler&& handler, Tuple&& args)
+ : handler(std::move(handler)),
+ args(std::move(args))
+ {}
+
+ void operator()() & {
+ std::apply(handler, args);
+ }
+ void operator()() const & {
+ std::apply(handler, args);
+ }
+ void operator()() && {
+ std::apply(std::move(handler), std::move(args));
+ }
+
+ using allocator_type = boost::asio::associated_allocator_t<Handler>;
+ allocator_type get_allocator() const noexcept {
+ return boost::asio::get_associated_allocator(handler);
+ }
+};
+
+} // namespace ceph::async
+
+namespace boost::asio {
+
+// specialize boost::asio::associated_executor<> for CompletionHandler
+template <typename Handler, typename Tuple, typename Executor>
+struct associated_executor<ceph::async::CompletionHandler<Handler, Tuple>, Executor> {
+ using type = boost::asio::associated_executor_t<Handler, Executor>;
+
+ static type get(const ceph::async::CompletionHandler<Handler, Tuple>& handler,
+ const Executor& ex = Executor()) noexcept {
+ return boost::asio::get_associated_executor(handler.handler, ex);
+ }
+};
+
+} // namespace boost::asio
+
+namespace ceph::async {
+
+/**
+ * Returns a wrapped completion handler with bound arguments.
+ *
+ * Binds the given arguments to a handler, and returns a CompletionHandler that
+ * is callable with no arguments. This is similar to std::bind(), except that
+ * all arguments must be provided. Move-only argument types are supported as
+ * long as the CompletionHandler's 'operator() &&' overload is used, i.e.
+ * std::move(handler)().
+ *
+ * Example use:
+ *
+ * // bind the arguments (5, "hello") to a callback lambda:
+ * auto callback = [] (int a, std::string b) {};
+ * auto handler = bind_handler(callback, 5, "hello");
+ *
+ * // execute the bound handler on an io_context:
+ * boost::asio::io_context context;
+ * boost::asio::post(context, std::move(handler));
+ * context.run();
+ *
+ * @see CompletionHandler
+ */
+template <typename Handler, typename ...Args>
+auto bind_handler(Handler&& h, Args&& ...args)
+{
+ return CompletionHandler{std::forward<Handler>(h),
+ std::make_tuple(std::forward<Args>(args)...)};
+}
+
+} // namespace ceph::async
+
+#endif // CEPH_ASYNC_BIND_HANDLER_H
diff --git a/src/common/async/completion.h b/src/common/async/completion.h
new file mode 100644
index 00000000..6af9109d
--- /dev/null
+++ b/src/common/async/completion.h
@@ -0,0 +1,320 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_ASYNC_COMPLETION_H
+#define CEPH_ASYNC_COMPLETION_H
+
+#include <memory>
+
+#include "bind_handler.h"
+#include "forward_handler.h"
+
+namespace ceph::async {
+
+/**
+ * Abstract completion handler interface for use with boost::asio.
+ *
+ * Memory management is performed using the Handler's 'associated allocator',
+ * which carries the additional requirement that its memory be released before
+ * the Handler is invoked. This allows memory allocated for one asynchronous
+ * operation to be reused in its continuation. Because of this requirement, any
+ * calls to invoke the completion must first release ownership of it. To enforce
+ * this, the static functions defer()/dispatch()/post() take the completion by
+ * rvalue-reference to std::unique_ptr<Completion>, i.e. std::move(completion).
+ *
+ * Handlers may also have an 'associated executor', so the calls to defer(),
+ * dispatch(), and post() are forwarded to that executor. If there is no
+ * associated executor (which is generally the case unless one was bound with
+ * boost::asio::bind_executor()), the executor passed to Completion::create()
+ * is used as a default.
+ *
+ * Example use:
+ *
+ * // declare a Completion type with Signature = void(int, string)
+ * using MyCompletion = ceph::async::Completion<void(int, string)>;
+ *
+ * // create a completion with the given callback:
+ * std::unique_ptr<MyCompletion> c;
+ * c = MyCompletion::create(ex, [] (int a, const string& b) {});
+ *
+ * // bind arguments to the callback and post to its associated executor:
+ * MyCompletion::post(std::move(c), 5, "hello");
+ *
+ *
+ * Additional user data may be stored along with the Completion to take
+ * advantage of the handler allocator optimization. This is accomplished by
+ * specifying its type in the template parameter T. For example, the type
+ * Completion<void(), int> contains a public member variable 'int user_data'.
+ * Any additional arguments to Completion::create() will be forwarded to type
+ * T's constructor.
+ *
+ * If the AsBase<T> type tag is used, as in Completion<void(), AsBase<T>>,
+ * the Completion will inherit from T instead of declaring it as a member
+ * variable.
+ *
+ * When invoking the completion handler via defer(), dispatch(), or post(),
+ * care must be taken when passing arguments that refer to user data, because
+ * its memory is destroyed prior to invocation. In such cases, the user data
+ * should be moved/copied out of the Completion first.
+ */
+template <typename Signature, typename T = void>
+class Completion;
+
+
+/// type tag for UserData
+template <typename T> struct AsBase {};
+
+namespace detail {
+
+/// optional user data to be stored with the Completion
+template <typename T>
+struct UserData {
+ T user_data;
+ template <typename ...Args>
+ UserData(Args&& ...args)
+ : user_data(std::forward<Args>(args)...)
+ {}
+};
+// AsBase specialization inherits from T
+template <typename T>
+struct UserData<AsBase<T>> : public T {
+ template <typename ...Args>
+ UserData(Args&& ...args)
+ : T(std::forward<Args>(args)...)
+ {}
+};
+// void specialization
+template <>
+class UserData<void> {};
+
+} // namespace detail
+
+
+// template specialization to pull the Signature's args apart
+template <typename T, typename ...Args>
+class Completion<void(Args...), T> : public detail::UserData<T> {
+ protected:
+ // internal interfaces for type-erasure on the Handler/Executor. uses
+ // tuple<Args...> to provide perfect forwarding because you can't make
+ // virtual function templates
+ virtual void destroy_defer(std::tuple<Args...>&& args) = 0;
+ virtual void destroy_dispatch(std::tuple<Args...>&& args) = 0;
+ virtual void destroy_post(std::tuple<Args...>&& args) = 0;
+ virtual void destroy() = 0;
+
+ // constructor is protected, use create(). any constructor arguments are
+ // forwarded to UserData
+ template <typename ...TArgs>
+ Completion(TArgs&& ...args)
+ : detail::UserData<T>(std::forward<TArgs>(args)...)
+ {}
+ public:
+ virtual ~Completion() = default;
+
+ // use the virtual destroy() interface on delete. this allows the derived
+ // class to manage its memory using Handler allocators, without having to use
+ // a custom Deleter for std::unique_ptr<>
+ static void operator delete(void *p) {
+ static_cast<Completion*>(p)->destroy();
+ }
+
+ /// completion factory function that uses the handler's associated allocator.
+ /// any additional arguments are forwared to T's constructor
+ template <typename Executor1, typename Handler, typename ...TArgs>
+ static std::unique_ptr<Completion>
+ create(const Executor1& ex1, Handler&& handler, TArgs&& ...args);
+
+ /// take ownership of the completion, bind any arguments to the completion
+ /// handler, then defer() it on its associated executor
+ template <typename ...Args2>
+ static void defer(std::unique_ptr<Completion>&& c, Args2&&...args);
+
+ /// take ownership of the completion, bind any arguments to the completion
+ /// handler, then dispatch() it on its associated executor
+ template <typename ...Args2>
+ static void dispatch(std::unique_ptr<Completion>&& c, Args2&&...args);
+
+ /// take ownership of the completion, bind any arguments to the completion
+ /// handler, then post() it to its associated executor
+ template <typename ...Args2>
+ static void post(std::unique_ptr<Completion>&& c, Args2&&...args);
+};
+
+namespace detail {
+
+// concrete Completion that knows how to invoke the completion handler. this
+// observes all of the 'Requirements on asynchronous operations' specified by
+// the C++ Networking TS
+template <typename Executor1, typename Handler, typename T, typename ...Args>
+class CompletionImpl final : public Completion<void(Args...), T> {
+ // use Handler's associated executor (or Executor1 by default) for callbacks
+ using Executor2 = boost::asio::associated_executor_t<Handler, Executor1>;
+ // maintain work on both executors
+ using Work1 = boost::asio::executor_work_guard<Executor1>;
+ using Work2 = boost::asio::executor_work_guard<Executor2>;
+ std::pair<Work1, Work2> work;
+ Handler handler;
+
+ // use Handler's associated allocator
+ using Alloc2 = boost::asio::associated_allocator_t<Handler>;
+ using Traits2 = std::allocator_traits<Alloc2>;
+ using RebindAlloc2 = typename Traits2::template rebind_alloc<CompletionImpl>;
+ using RebindTraits2 = std::allocator_traits<RebindAlloc2>;
+
+ // placement new for the handler allocator
+ static void* operator new(size_t, RebindAlloc2 alloc2) {
+ return RebindTraits2::allocate(alloc2, 1);
+ }
+ // placement delete for when the constructor throws during placement new
+ static void operator delete(void *p, RebindAlloc2 alloc2) {
+ RebindTraits2::deallocate(alloc2, static_cast<CompletionImpl*>(p), 1);
+ }
+
+ static auto bind_and_forward(Handler&& h, std::tuple<Args...>&& args) {
+ return forward_handler(CompletionHandler{std::move(h), std::move(args)});
+ }
+
+ void destroy_defer(std::tuple<Args...>&& args) override {
+ auto w = std::move(work);
+ auto f = bind_and_forward(std::move(handler), std::move(args));
+ RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler);
+ RebindTraits2::destroy(alloc2, this);
+ RebindTraits2::deallocate(alloc2, this, 1);
+ w.second.get_executor().defer(std::move(f), alloc2);
+ }
+ void destroy_dispatch(std::tuple<Args...>&& args) override {
+ auto w = std::move(work);
+ auto f = bind_and_forward(std::move(handler), std::move(args));
+ RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler);
+ RebindTraits2::destroy(alloc2, this);
+ RebindTraits2::deallocate(alloc2, this, 1);
+ w.second.get_executor().dispatch(std::move(f), alloc2);
+ }
+ void destroy_post(std::tuple<Args...>&& args) override {
+ auto w = std::move(work);
+ auto f = bind_and_forward(std::move(handler), std::move(args));
+ RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler);
+ RebindTraits2::destroy(alloc2, this);
+ RebindTraits2::deallocate(alloc2, this, 1);
+ w.second.get_executor().post(std::move(f), alloc2);
+ }
+ void destroy() override {
+ RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler);
+ RebindTraits2::destroy(alloc2, this);
+ RebindTraits2::deallocate(alloc2, this, 1);
+ }
+
+ // constructor is private, use create(). extra constructor arguments are
+ // forwarded to UserData
+ template <typename ...TArgs>
+ CompletionImpl(const Executor1& ex1, Handler&& handler, TArgs&& ...args)
+ : Completion<void(Args...), T>(std::forward<TArgs>(args)...),
+ work(ex1, boost::asio::make_work_guard(handler, ex1)),
+ handler(std::move(handler))
+ {}
+
+ public:
+ template <typename ...TArgs>
+ static auto create(const Executor1& ex, Handler&& handler, TArgs&& ...args) {
+ auto alloc2 = boost::asio::get_associated_allocator(handler);
+ using Ptr = std::unique_ptr<CompletionImpl>;
+ return Ptr{new (alloc2) CompletionImpl(ex, std::move(handler),
+ std::forward<TArgs>(args)...)};
+ }
+
+ static void operator delete(void *p) {
+ static_cast<CompletionImpl*>(p)->destroy();
+ }
+};
+
+} // namespace detail
+
+
+template <typename T, typename ...Args>
+template <typename Executor1, typename Handler, typename ...TArgs>
+std::unique_ptr<Completion<void(Args...), T>>
+Completion<void(Args...), T>::create(const Executor1& ex,
+ Handler&& handler, TArgs&& ...args)
+{
+ using Impl = detail::CompletionImpl<Executor1, Handler, T, Args...>;
+ return Impl::create(ex, std::forward<Handler>(handler),
+ std::forward<TArgs>(args)...);
+}
+
+template <typename T, typename ...Args>
+template <typename ...Args2>
+void Completion<void(Args...), T>::defer(std::unique_ptr<Completion>&& ptr,
+ Args2&& ...args)
+{
+ auto c = ptr.release();
+ c->destroy_defer(std::make_tuple(std::forward<Args2>(args)...));
+}
+
+template <typename T, typename ...Args>
+template <typename ...Args2>
+void Completion<void(Args...), T>::dispatch(std::unique_ptr<Completion>&& ptr,
+ Args2&& ...args)
+{
+ auto c = ptr.release();
+ c->destroy_dispatch(std::make_tuple(std::forward<Args2>(args)...));
+}
+
+template <typename T, typename ...Args>
+template <typename ...Args2>
+void Completion<void(Args...), T>::post(std::unique_ptr<Completion>&& ptr,
+ Args2&& ...args)
+{
+ auto c = ptr.release();
+ c->destroy_post(std::make_tuple(std::forward<Args2>(args)...));
+}
+
+
+/// completion factory function that uses the handler's associated allocator.
+/// any additional arguments are forwared to T's constructor
+template <typename Signature, typename T, typename Executor1,
+ typename Handler, typename ...TArgs>
+std::unique_ptr<Completion<Signature, T>>
+create_completion(const Executor1& ex, Handler&& handler, TArgs&& ...args)
+{
+ return Completion<Signature, T>::create(ex, std::forward<Handler>(handler),
+ std::forward<TArgs>(args)...);
+}
+
+/// take ownership of the completion, bind any arguments to the completion
+/// handler, then defer() it on its associated executor
+template <typename Signature, typename T, typename ...Args>
+void defer(std::unique_ptr<Completion<Signature, T>>&& ptr, Args&& ...args)
+{
+ Completion<Signature, T>::defer(std::move(ptr), std::forward<Args>(args)...);
+}
+
+/// take ownership of the completion, bind any arguments to the completion
+/// handler, then dispatch() it on its associated executor
+template <typename Signature, typename T, typename ...Args>
+void dispatch(std::unique_ptr<Completion<Signature, T>>&& ptr, Args&& ...args)
+{
+ Completion<Signature, T>::dispatch(std::move(ptr), std::forward<Args>(args)...);
+}
+
+/// take ownership of the completion, bind any arguments to the completion
+/// handler, then post() it to its associated executor
+template <typename Signature, typename T, typename ...Args>
+void post(std::unique_ptr<Completion<Signature, T>>&& ptr, Args&& ...args)
+{
+ Completion<Signature, T>::post(std::move(ptr), std::forward<Args>(args)...);
+}
+
+} // namespace ceph::async
+
+#endif // CEPH_ASYNC_COMPLETION_H
diff --git a/src/common/async/detail/shared_lock.h b/src/common/async/detail/shared_lock.h
new file mode 100644
index 00000000..12e6a922
--- /dev/null
+++ b/src/common/async/detail/shared_lock.h
@@ -0,0 +1,185 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+namespace std {
+
+// specialize unique_lock and shared_lock for SharedMutex to operate on
+// SharedMutexImpl instead, because the locks may outlive the SharedMutex itself
+
+template <typename Executor>
+class unique_lock<ceph::async::SharedMutex<Executor>> {
+ public:
+ using mutex_type = boost::intrusive_ptr<ceph::async::detail::SharedMutexImpl>;
+
+ unique_lock() = default;
+ explicit unique_lock(ceph::async::SharedMutex<Executor>& m)
+ : impl(m.impl), locked(true)
+ {
+ impl->lock();
+ }
+ unique_lock(ceph::async::SharedMutex<Executor>& m, defer_lock_t t) noexcept
+ : impl(m.impl)
+ {}
+ unique_lock(ceph::async::SharedMutex<Executor>& m, try_to_lock_t t)
+ : impl(m.impl), locked(impl->try_lock())
+ {}
+ unique_lock(ceph::async::SharedMutex<Executor>& m, adopt_lock_t t) noexcept
+ : impl(m.impl), locked(true)
+ {}
+ ~unique_lock() {
+ if (impl && locked)
+ impl->unlock();
+ }
+
+ unique_lock(unique_lock&& other) noexcept
+ : impl(std::move(other.impl)),
+ locked(other.locked) {
+ other.locked = false;
+ }
+ unique_lock& operator=(unique_lock&& other) noexcept {
+ if (impl && locked) {
+ impl->unlock();
+ }
+ impl = std::move(other.impl);
+ locked = other.locked;
+ other.locked = false;
+ return *this;
+ }
+ void swap(unique_lock& other) noexcept {
+ using std::swap;
+ swap(impl, other.impl);
+ swap(locked, other.locked);
+ }
+
+ mutex_type mutex() const noexcept { return impl; }
+ bool owns_lock() const noexcept { return impl && locked; }
+ explicit operator bool() const noexcept { return impl && locked; }
+
+ mutex_type release() {
+ auto result = std::move(impl);
+ locked = false;
+ return result;
+ }
+
+ void lock() {
+ if (!impl)
+ throw system_error(make_error_code(errc::operation_not_permitted));
+ if (locked)
+ throw system_error(make_error_code(errc::resource_deadlock_would_occur));
+ impl->lock();
+ locked = true;
+ }
+ bool try_lock() {
+ if (!impl)
+ throw system_error(make_error_code(errc::operation_not_permitted));
+ if (locked)
+ throw system_error(make_error_code(errc::resource_deadlock_would_occur));
+ return locked = impl->try_lock();
+ }
+ void unlock() {
+ if (!impl || !locked)
+ throw system_error(make_error_code(errc::operation_not_permitted));
+ impl->unlock();
+ locked = false;
+ }
+ private:
+ mutex_type impl;
+ bool locked{false};
+};
+
+template <typename Executor>
+class shared_lock<ceph::async::SharedMutex<Executor>> {
+ public:
+ using mutex_type = boost::intrusive_ptr<ceph::async::detail::SharedMutexImpl>;
+
+ shared_lock() = default;
+ explicit shared_lock(ceph::async::SharedMutex<Executor>& m)
+ : impl(m.impl), locked(true)
+ {
+ impl->lock_shared();
+ }
+ shared_lock(ceph::async::SharedMutex<Executor>& m, defer_lock_t t) noexcept
+ : impl(m.impl)
+ {}
+ shared_lock(ceph::async::SharedMutex<Executor>& m, try_to_lock_t t)
+ : impl(m.impl), locked(impl->try_lock_shared())
+ {}
+ shared_lock(ceph::async::SharedMutex<Executor>& m, adopt_lock_t t) noexcept
+ : impl(m.impl), locked(true)
+ {}
+
+ ~shared_lock() {
+ if (impl && locked)
+ impl->unlock_shared();
+ }
+
+ shared_lock(shared_lock&& other) noexcept
+ : impl(std::move(other.impl)),
+ locked(other.locked) {
+ other.locked = false;
+ }
+ shared_lock& operator=(shared_lock&& other) noexcept {
+ if (impl && locked) {
+ impl->unlock_shared();
+ }
+ impl = std::move(other.impl);
+ locked = other.locked;
+ other.locked = false;
+ return *this;
+ }
+ void swap(shared_lock& other) noexcept {
+ using std::swap;
+ swap(impl, other.impl);
+ swap(locked, other.locked);
+ }
+
+ mutex_type mutex() const noexcept { return impl; }
+ bool owns_lock() const noexcept { return impl && locked; }
+ explicit operator bool() const noexcept { return impl && locked; }
+
+ mutex_type release() {
+ auto result = std::move(impl);
+ locked = false;
+ return result;
+ }
+
+ void lock() {
+ if (!impl)
+ throw system_error(make_error_code(errc::operation_not_permitted));
+ if (locked)
+ throw system_error(make_error_code(errc::resource_deadlock_would_occur));
+ impl->lock_shared();
+ locked = true;
+ }
+ bool try_lock() {
+ if (!impl)
+ throw system_error(make_error_code(errc::operation_not_permitted));
+ if (locked)
+ throw system_error(make_error_code(errc::resource_deadlock_would_occur));
+ return locked = impl->try_lock_shared();
+ }
+ void unlock() {
+ if (!impl || !locked)
+ throw system_error(make_error_code(errc::operation_not_permitted));
+ impl->unlock_shared();
+ locked = false;
+ }
+ private:
+ mutex_type impl;
+ bool locked{false};
+};
+
+} // namespace std
diff --git a/src/common/async/detail/shared_mutex.h b/src/common/async/detail/shared_mutex.h
new file mode 100644
index 00000000..8e543635
--- /dev/null
+++ b/src/common/async/detail/shared_mutex.h
@@ -0,0 +1,326 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <condition_variable>
+#include <mutex>
+#include <optional>
+#include <shared_mutex> // for std::shared_lock
+
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/intrusive/list.hpp>
+
+#include "include/ceph_assert.h"
+
+#include "common/async/completion.h"
+
+namespace ceph::async::detail {
+
+struct LockRequest : public boost::intrusive::list_base_hook<> {
+ virtual ~LockRequest() {}
+ virtual void complete(boost::system::error_code ec) = 0;
+ virtual void destroy() = 0;
+};
+
+class SharedMutexImpl : public boost::intrusive_ref_counter<SharedMutexImpl> {
+ public:
+ ~SharedMutexImpl();
+
+ template <typename Mutex, typename CompletionToken>
+ auto async_lock(Mutex& mtx, CompletionToken&& token);
+ void lock();
+ void lock(boost::system::error_code& ec);
+ bool try_lock();
+ void unlock();
+ template <typename Mutex, typename CompletionToken>
+ auto async_lock_shared(Mutex& mtx, CompletionToken&& token);
+ void lock_shared();
+ void lock_shared(boost::system::error_code& ec);
+ bool try_lock_shared();
+ void unlock_shared();
+ void cancel();
+
+ private:
+ using RequestList = boost::intrusive::list<LockRequest>;
+
+ RequestList shared_queue; //< requests waiting on a shared lock
+ RequestList exclusive_queue; //< requests waiting on an exclusive lock
+
+ /// lock state encodes the number of shared lockers, or 'max' for exclusive
+ using LockState = uint16_t;
+ static constexpr LockState Unlocked = 0;
+ static constexpr LockState Exclusive = std::numeric_limits<LockState>::max();
+ static constexpr LockState MaxShared = Exclusive - 1;
+ LockState state = Unlocked; //< current lock state
+
+ std::mutex mutex; //< protects lock state and wait queues
+
+ void complete(RequestList&& requests, boost::system::error_code ec);
+};
+
+// sync requests live on the stack and wait on a condition variable
+class SyncRequest : public LockRequest {
+ std::condition_variable cond;
+ std::optional<boost::system::error_code> ec;
+ public:
+ boost::system::error_code wait(std::unique_lock<std::mutex>& lock) {
+ // return the error code once its been set
+ cond.wait(lock, [this] { return ec; });
+ return *ec;
+ }
+ void complete(boost::system::error_code ec) override {
+ this->ec = ec;
+ cond.notify_one();
+ }
+ void destroy() override {
+ // nothing, SyncRequests live on the stack
+ }
+};
+
+// async requests use async::Completion to invoke a handler on its executor
+template <typename Mutex, template <typename> typename Lock>
+class AsyncRequest : public LockRequest {
+ Mutex& mutex; //< mutex argument for lock guard
+ public:
+ explicit AsyncRequest(Mutex& mutex) : mutex(mutex) {}
+
+ using Signature = void(boost::system::error_code, Lock<Mutex>);
+ using LockCompletion = Completion<Signature, AsBase<AsyncRequest>>;
+
+ void complete(boost::system::error_code ec) override {
+ auto r = static_cast<LockCompletion*>(this);
+ // pass ownership of ourselves to post(). on error, pass an empty lock
+ post(std::unique_ptr<LockCompletion>{r}, ec,
+ ec ? Lock{mutex, std::defer_lock} : Lock{mutex, std::adopt_lock});
+ }
+ void destroy() override {
+ delete static_cast<LockCompletion*>(this);
+ }
+};
+
+inline SharedMutexImpl::~SharedMutexImpl()
+{
+ ceph_assert(state == Unlocked);
+ ceph_assert(shared_queue.empty());
+ ceph_assert(exclusive_queue.empty());
+}
+
+template <typename Mutex, typename CompletionToken>
+auto SharedMutexImpl::async_lock(Mutex& mtx, CompletionToken&& token)
+{
+ using Request = AsyncRequest<Mutex, std::unique_lock>;
+ using Signature = typename Request::Signature;
+ boost::asio::async_completion<CompletionToken, Signature> init(token);
+ auto& handler = init.completion_handler;
+ auto ex1 = mtx.get_executor();
+ {
+ std::lock_guard lock{mutex};
+
+ boost::system::error_code ec;
+ if (state == Unlocked) {
+ state = Exclusive;
+
+ // post a successful completion
+ auto ex2 = boost::asio::get_associated_executor(handler, ex1);
+ auto alloc2 = boost::asio::get_associated_allocator(handler);
+ auto b = bind_handler(std::move(handler), ec,
+ std::unique_lock{mtx, std::adopt_lock});
+ ex2.post(forward_handler(std::move(b)), alloc2);
+ } else {
+ // create a request and add it to the exclusive list
+ using LockCompletion = typename Request::LockCompletion;
+ auto request = LockCompletion::create(ex1, std::move(handler), mtx);
+ exclusive_queue.push_back(*request.release());
+ }
+ }
+ return init.result.get();
+}
+
+inline void SharedMutexImpl::lock()
+{
+ boost::system::error_code ec;
+ lock(ec);
+ if (ec) {
+ throw boost::system::system_error(ec);
+ }
+}
+
+void SharedMutexImpl::lock(boost::system::error_code& ec)
+{
+ std::unique_lock lock{mutex};
+
+ if (state == Unlocked) {
+ state = Exclusive;
+ ec.clear();
+ } else {
+ SyncRequest request;
+ exclusive_queue.push_back(request);
+ ec = request.wait(lock);
+ }
+}
+
+inline bool SharedMutexImpl::try_lock()
+{
+ std::lock_guard lock{mutex};
+
+ if (state == Unlocked) {
+ state = Exclusive;
+ return true;
+ }
+ return false;
+}
+
+void SharedMutexImpl::unlock()
+{
+ RequestList granted;
+ {
+ std::lock_guard lock{mutex};
+ ceph_assert(state == Exclusive);
+
+ if (!exclusive_queue.empty()) {
+ // grant next exclusive lock
+ auto& request = exclusive_queue.front();
+ exclusive_queue.pop_front();
+ granted.push_back(request);
+ } else {
+ // grant shared locks, if any
+ state = shared_queue.size();
+ if (state > MaxShared) {
+ state = MaxShared;
+ auto end = std::next(shared_queue.begin(), MaxShared);
+ granted.splice(granted.end(), shared_queue,
+ shared_queue.begin(), end, MaxShared);
+ } else {
+ granted.splice(granted.end(), shared_queue);
+ }
+ }
+ }
+ complete(std::move(granted), boost::system::error_code{});
+}
+
+template <typename Mutex, typename CompletionToken>
+auto SharedMutexImpl::async_lock_shared(Mutex& mtx, CompletionToken&& token)
+{
+ using Request = AsyncRequest<Mutex, std::shared_lock>;
+ using Signature = typename Request::Signature;
+ boost::asio::async_completion<CompletionToken, Signature> init(token);
+ auto& handler = init.completion_handler;
+ auto ex1 = mtx.get_executor();
+ {
+ std::lock_guard lock{mutex};
+
+ boost::system::error_code ec;
+ if (exclusive_queue.empty() && state < MaxShared) {
+ state++;
+
+ auto ex2 = boost::asio::get_associated_executor(handler, ex1);
+ auto alloc2 = boost::asio::get_associated_allocator(handler);
+ auto b = bind_handler(std::move(handler), ec,
+ std::shared_lock{mtx, std::adopt_lock});
+ ex2.post(forward_handler(std::move(b)), alloc2);
+ } else {
+ using LockCompletion = typename Request::LockCompletion;
+ auto request = LockCompletion::create(ex1, std::move(handler), mtx);
+ shared_queue.push_back(*request.release());
+ }
+ }
+ return init.result.get();
+}
+
+inline void SharedMutexImpl::lock_shared()
+{
+ boost::system::error_code ec;
+ lock_shared(ec);
+ if (ec) {
+ throw boost::system::system_error(ec);
+ }
+}
+
+void SharedMutexImpl::lock_shared(boost::system::error_code& ec)
+{
+ std::unique_lock lock{mutex};
+
+ if (exclusive_queue.empty() && state < MaxShared) {
+ state++;
+ ec.clear();
+ } else {
+ SyncRequest request;
+ shared_queue.push_back(request);
+ ec = request.wait(lock);
+ }
+}
+
+inline bool SharedMutexImpl::try_lock_shared()
+{
+ std::lock_guard lock{mutex};
+
+ if (exclusive_queue.empty() && state < MaxShared) {
+ state++;
+ return true;
+ }
+ return false;
+}
+
+inline void SharedMutexImpl::unlock_shared()
+{
+ std::lock_guard lock{mutex};
+ ceph_assert(state != Unlocked && state <= MaxShared);
+
+ if (state == 1 && !exclusive_queue.empty()) {
+ // grant next exclusive lock
+ state = Exclusive;
+ auto& request = exclusive_queue.front();
+ exclusive_queue.pop_front();
+ request.complete(boost::system::error_code{});
+ } else if (state == MaxShared && !shared_queue.empty() &&
+ exclusive_queue.empty()) {
+ // grant next shared lock
+ auto& request = shared_queue.front();
+ shared_queue.pop_front();
+ request.complete(boost::system::error_code{});
+ } else {
+ state--;
+ }
+}
+
+inline void SharedMutexImpl::cancel()
+{
+ RequestList canceled;
+ {
+ std::lock_guard lock{mutex};
+ canceled.splice(canceled.end(), shared_queue);
+ canceled.splice(canceled.end(), exclusive_queue);
+ }
+ complete(std::move(canceled), boost::asio::error::operation_aborted);
+}
+
+void SharedMutexImpl::complete(RequestList&& requests,
+ boost::system::error_code ec)
+{
+ while (!requests.empty()) {
+ auto& request = requests.front();
+ requests.pop_front();
+ try {
+ request.complete(ec);
+ } catch (...) {
+ // clean up any remaining completions and rethrow
+ requests.clear_and_dispose([] (LockRequest *r) { r->destroy(); });
+ throw;
+ }
+ }
+}
+
+} // namespace ceph::async::detail
diff --git a/src/common/async/forward_handler.h b/src/common/async/forward_handler.h
new file mode 100644
index 00000000..ae88cc83
--- /dev/null
+++ b/src/common/async/forward_handler.h
@@ -0,0 +1,103 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_ASYNC_FORWARD_HANDLER_H
+#define CEPH_ASYNC_FORWARD_HANDLER_H
+
+#include <boost/asio.hpp>
+
+namespace ceph::async {
+
+/**
+ * A forwarding completion handler for use with boost::asio.
+ *
+ * A completion handler wrapper that invokes the handler's operator() as an
+ * rvalue, regardless of whether the wrapper is invoked as an lvalue or rvalue.
+ * This operation is potentially destructive to the wrapped handler, so is only
+ * suitable for single-use handlers.
+ *
+ * This is useful when combined with bind_handler() and move-only arguments,
+ * because executors will always call the lvalue overload of operator().
+ *
+ * The original Handler's associated allocator and executor are maintained.
+ *
+ * @see forward_handler
+ */
+template <typename Handler>
+struct ForwardingHandler {
+ Handler handler;
+
+ ForwardingHandler(Handler&& handler)
+ : handler(std::move(handler))
+ {}
+
+ template <typename ...Args>
+ void operator()(Args&& ...args) {
+ std::move(handler)(std::forward<Args>(args)...);
+ }
+
+ using allocator_type = boost::asio::associated_allocator_t<Handler>;
+ allocator_type get_allocator() const noexcept {
+ return boost::asio::get_associated_allocator(handler);
+ }
+};
+
+} // namespace ceph::async
+
+namespace boost::asio {
+
+// specialize boost::asio::associated_executor<> for ForwardingHandler
+template <typename Handler, typename Executor>
+struct associated_executor<ceph::async::ForwardingHandler<Handler>, Executor> {
+ using type = boost::asio::associated_executor_t<Handler, Executor>;
+
+ static type get(const ceph::async::ForwardingHandler<Handler>& handler,
+ const Executor& ex = Executor()) noexcept {
+ return boost::asio::get_associated_executor(handler.handler, ex);
+ }
+};
+
+} // namespace boost::asio
+
+namespace ceph::async {
+
+/**
+ * Returns a single-use completion handler that always forwards on operator().
+ *
+ * Wraps a completion handler such that it is always invoked as an rvalue. This
+ * is necessary when combining executors and bind_handler() with move-only
+ * argument types.
+ *
+ * Example use:
+ *
+ * auto callback = [] (std::unique_ptr<int>&& p) {};
+ * auto bound_handler = bind_handler(callback, std::make_unique<int>(5));
+ * auro handler = forward_handler(std::move(bound_handler));
+ *
+ * // execute the forwarding handler on an io_context:
+ * boost::asio::io_context context;
+ * boost::asio::post(context, std::move(handler));
+ * context.run();
+ *
+ * @see ForwardingHandler
+ */
+template <typename Handler>
+auto forward_handler(Handler&& h)
+{
+ return ForwardingHandler{std::forward<Handler>(h)};
+}
+
+} // namespace ceph::async
+
+#endif // CEPH_ASYNC_FORWARD_HANDLER_H
diff --git a/src/common/async/shared_mutex.h b/src/common/async/shared_mutex.h
new file mode 100644
index 00000000..3e471a4d
--- /dev/null
+++ b/src/common/async/shared_mutex.h
@@ -0,0 +1,212 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include "common/async/detail/shared_mutex.h"
+
+namespace ceph::async {
+
+/**
+ * An asynchronous shared mutex for use with boost::asio.
+ *
+ * A shared mutex class with asynchronous lock operations that complete on a
+ * boost::asio executor. The class also has synchronous interfaces that meet
+ * most of the standard library's requirements for the SharedMutex concept,
+ * which makes it compatible with lock_guard, unique_lock, and shared_lock.
+ *
+ * All lock requests can fail with operation_aborted on cancel() or destruction.
+ * The non-error_code overloads of lock() and lock_shared() will throw this
+ * error as an exception of type boost::system::system_error.
+ *
+ * Exclusive locks are prioritized over shared locks. Locks of the same type
+ * are granted in fifo order. The implementation defines a limit on the number
+ * of shared locks to 65534 at a time.
+ *
+ * Example use:
+ *
+ * boost::asio::io_context context;
+ * SharedMutex mutex{context.get_executor()};
+ *
+ * mutex.async_lock([&] (boost::system::error_code ec, auto lock) {
+ * if (!ec) {
+ * // mutate shared state ...
+ * }
+ * });
+ * mutex.async_lock_shared([&] (boost::system::error_code ec, auto lock) {
+ * if (!ec) {
+ * // read shared state ...
+ * }
+ * });
+ *
+ * context.run();
+ */
+template <typename Executor>
+class SharedMutex {
+ public:
+ explicit SharedMutex(const Executor& ex);
+
+ /// on destruction, all pending lock requests are canceled
+ ~SharedMutex();
+
+ using executor_type = Executor;
+ executor_type get_executor() const noexcept { return ex; }
+
+ /// initiate an asynchronous request for an exclusive lock. when the lock is
+ /// granted, the completion handler is invoked with a successful error code
+ /// and a std::unique_lock that owns this mutex.
+ /// Signature = void(boost::system::error_code, std::unique_lock)
+ template <typename CompletionToken>
+ auto async_lock(CompletionToken&& token);
+
+ /// wait synchronously for an exclusive lock. if an error occurs before the
+ /// lock is granted, that error is thrown as an exception
+ void lock();
+
+ /// wait synchronously for an exclusive lock. if an error occurs before the
+ /// lock is granted, that error is assigned to 'ec'
+ void lock(boost::system::error_code& ec);
+
+ /// try to acquire an exclusive lock. if the lock is not immediately
+ /// available, returns false
+ bool try_lock();
+
+ /// releases an exclusive lock. not required to be called from the same thread
+ /// that initiated the lock
+ void unlock();
+
+ /// initiate an asynchronous request for a shared lock. when the lock is
+ /// granted, the completion handler is invoked with a successful error code
+ /// and a std::shared_lock that owns this mutex.
+ /// Signature = void(boost::system::error_code, std::shared_lock)
+ template <typename CompletionToken>
+ auto async_lock_shared(CompletionToken&& token);
+
+ /// wait synchronously for a shared lock. if an error occurs before the
+ /// lock is granted, that error is thrown as an exception
+ void lock_shared();
+
+ /// wait synchronously for a shared lock. if an error occurs before the lock
+ /// is granted, that error is assigned to 'ec'
+ void lock_shared(boost::system::error_code& ec);
+
+ /// try to acquire a shared lock. if the lock is not immediately available,
+ /// returns false
+ bool try_lock_shared();
+
+ /// releases a shared lock. not required to be called from the same thread
+ /// that initiated the lock
+ void unlock_shared();
+
+ /// cancel any pending requests for exclusive or shared locks with an
+ /// operation_aborted error
+ void cancel();
+
+ private:
+ Executor ex; //< default callback executor
+ boost::intrusive_ptr<detail::SharedMutexImpl> impl;
+
+ // allow lock guards to access impl
+ friend class std::unique_lock<SharedMutex>;
+ friend class std::shared_lock<SharedMutex>;
+};
+
+
+template <typename Executor>
+SharedMutex<Executor>::SharedMutex(const Executor& ex)
+ : ex(ex), impl(new detail::SharedMutexImpl)
+{
+}
+
+template <typename Executor>
+SharedMutex<Executor>::~SharedMutex()
+{
+ try {
+ impl->cancel();
+ } catch (const std::exception&) {
+ // swallow any exceptions, the destructor can't throw
+ }
+}
+
+template <typename Executor>
+template <typename CompletionToken>
+auto SharedMutex<Executor>::async_lock(CompletionToken&& token)
+{
+ return impl->async_lock(*this, std::forward<CompletionToken>(token));
+}
+
+template <typename Executor>
+void SharedMutex<Executor>::lock()
+{
+ impl->lock();
+}
+
+template <typename Executor>
+void SharedMutex<Executor>::lock(boost::system::error_code& ec)
+{
+ impl->lock(ec);
+}
+
+template <typename Executor>
+bool SharedMutex<Executor>::try_lock()
+{
+ return impl->try_lock();
+}
+
+template <typename Executor>
+void SharedMutex<Executor>::unlock()
+{
+ impl->unlock();
+}
+
+template <typename Executor>
+template <typename CompletionToken>
+auto SharedMutex<Executor>::async_lock_shared(CompletionToken&& token)
+{
+ return impl->async_lock_shared(*this, std::forward<CompletionToken>(token));
+}
+
+template <typename Executor>
+void SharedMutex<Executor>::lock_shared()
+{
+ impl->lock_shared();
+}
+
+template <typename Executor>
+void SharedMutex<Executor>::lock_shared(boost::system::error_code& ec)
+{
+ impl->lock_shared(ec);
+}
+
+template <typename Executor>
+bool SharedMutex<Executor>::try_lock_shared()
+{
+ return impl->try_lock_shared();
+}
+
+template <typename Executor>
+void SharedMutex<Executor>::unlock_shared()
+{
+ impl->unlock_shared();
+}
+
+template <typename Executor>
+void SharedMutex<Executor>::cancel()
+{
+ impl->cancel();
+}
+
+} // namespace ceph::async
+
+#include "common/async/detail/shared_lock.h"
diff --git a/src/common/async/yield_context.h b/src/common/async/yield_context.h
new file mode 100644
index 00000000..436192c0
--- /dev/null
+++ b/src/common/async/yield_context.h
@@ -0,0 +1,67 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <boost/range/begin.hpp>
+#include <boost/range/end.hpp>
+#include <boost/asio/io_context.hpp>
+
+#include "acconfig.h"
+
+#ifndef HAVE_BOOST_CONTEXT
+
+// hide the dependencies on boost::context and boost::coroutines
+namespace boost::asio {
+struct yield_context;
+}
+
+#else // HAVE_BOOST_CONTEXT
+#ifndef BOOST_COROUTINES_NO_DEPRECATION_WARNING
+#define BOOST_COROUTINES_NO_DEPRECATION_WARNING
+#endif
+#include <boost/asio/spawn.hpp>
+
+#endif // HAVE_BOOST_CONTEXT
+
+
+/// optional-like wrapper for a boost::asio::yield_context and its associated
+/// boost::asio::io_context. operations that take an optional_yield argument
+/// will, when passed a non-empty yield context, suspend this coroutine instead
+/// of the blocking the thread of execution
+class optional_yield {
+ boost::asio::io_context *c = nullptr;
+ boost::asio::yield_context *y = nullptr;
+ public:
+ /// construct with a valid io and yield_context
+ explicit optional_yield(boost::asio::io_context& c,
+ boost::asio::yield_context& y) noexcept
+ : c(&c), y(&y) {}
+
+ /// type tag to construct an empty object
+ struct empty_t {};
+ optional_yield(empty_t) noexcept {}
+
+ /// implicit conversion to bool, returns true if non-empty
+ operator bool() const noexcept { return y; }
+
+ /// return a reference to the associated io_context. only valid if non-empty
+ boost::asio::io_context& get_io_context() const noexcept { return *c; }
+
+ /// return a reference to the yield_context. only valid if non-empty
+ boost::asio::yield_context& get_yield_context() const noexcept { return *y; }
+};
+
+// type tag object to construct an empty optional_yield
+static constexpr optional_yield::empty_t null_yield{};