diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
commit | 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch) | |
tree | e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/common/async | |
parent | Initial commit. (diff) | |
download | ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip |
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/common/async')
-rw-r--r-- | src/common/async/bind_handler.h | 111 | ||||
-rw-r--r-- | src/common/async/completion.h | 320 | ||||
-rw-r--r-- | src/common/async/detail/shared_lock.h | 185 | ||||
-rw-r--r-- | src/common/async/detail/shared_mutex.h | 326 | ||||
-rw-r--r-- | src/common/async/forward_handler.h | 103 | ||||
-rw-r--r-- | src/common/async/shared_mutex.h | 212 | ||||
-rw-r--r-- | src/common/async/yield_context.h | 67 |
7 files changed, 1324 insertions, 0 deletions
diff --git a/src/common/async/bind_handler.h b/src/common/async/bind_handler.h new file mode 100644 index 00000000..516d8a5e --- /dev/null +++ b/src/common/async/bind_handler.h @@ -0,0 +1,111 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_ASYNC_BIND_HANDLER_H +#define CEPH_ASYNC_BIND_HANDLER_H + +#include <tuple> +#include <boost/asio.hpp> + +namespace ceph::async { + +/** + * A bound completion handler for use with boost::asio. + * + * A completion handler wrapper that allows a tuple of arguments to be forwarded + * to the original Handler. This is intended for use with boost::asio functions + * like defer(), dispatch() and post() which expect handlers which are callable + * with no arguments. + * + * The original Handler's associated allocator and executor are maintained. + * + * @see bind_handler + */ +template <typename Handler, typename Tuple> +struct CompletionHandler { + Handler handler; + Tuple args; + + CompletionHandler(Handler&& handler, Tuple&& args) + : handler(std::move(handler)), + args(std::move(args)) + {} + + void operator()() & { + std::apply(handler, args); + } + void operator()() const & { + std::apply(handler, args); + } + void operator()() && { + std::apply(std::move(handler), std::move(args)); + } + + using allocator_type = boost::asio::associated_allocator_t<Handler>; + allocator_type get_allocator() const noexcept { + return boost::asio::get_associated_allocator(handler); + } +}; + +} // namespace ceph::async + +namespace boost::asio { + +// specialize boost::asio::associated_executor<> for CompletionHandler +template <typename Handler, typename Tuple, typename Executor> +struct associated_executor<ceph::async::CompletionHandler<Handler, Tuple>, Executor> { + using type = boost::asio::associated_executor_t<Handler, Executor>; + + static type get(const ceph::async::CompletionHandler<Handler, Tuple>& handler, + const Executor& ex = Executor()) noexcept { + return boost::asio::get_associated_executor(handler.handler, ex); + } +}; + +} // namespace boost::asio + +namespace ceph::async { + +/** + * Returns a wrapped completion handler with bound arguments. + * + * Binds the given arguments to a handler, and returns a CompletionHandler that + * is callable with no arguments. This is similar to std::bind(), except that + * all arguments must be provided. Move-only argument types are supported as + * long as the CompletionHandler's 'operator() &&' overload is used, i.e. + * std::move(handler)(). + * + * Example use: + * + * // bind the arguments (5, "hello") to a callback lambda: + * auto callback = [] (int a, std::string b) {}; + * auto handler = bind_handler(callback, 5, "hello"); + * + * // execute the bound handler on an io_context: + * boost::asio::io_context context; + * boost::asio::post(context, std::move(handler)); + * context.run(); + * + * @see CompletionHandler + */ +template <typename Handler, typename ...Args> +auto bind_handler(Handler&& h, Args&& ...args) +{ + return CompletionHandler{std::forward<Handler>(h), + std::make_tuple(std::forward<Args>(args)...)}; +} + +} // namespace ceph::async + +#endif // CEPH_ASYNC_BIND_HANDLER_H diff --git a/src/common/async/completion.h b/src/common/async/completion.h new file mode 100644 index 00000000..6af9109d --- /dev/null +++ b/src/common/async/completion.h @@ -0,0 +1,320 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_ASYNC_COMPLETION_H +#define CEPH_ASYNC_COMPLETION_H + +#include <memory> + +#include "bind_handler.h" +#include "forward_handler.h" + +namespace ceph::async { + +/** + * Abstract completion handler interface for use with boost::asio. + * + * Memory management is performed using the Handler's 'associated allocator', + * which carries the additional requirement that its memory be released before + * the Handler is invoked. This allows memory allocated for one asynchronous + * operation to be reused in its continuation. Because of this requirement, any + * calls to invoke the completion must first release ownership of it. To enforce + * this, the static functions defer()/dispatch()/post() take the completion by + * rvalue-reference to std::unique_ptr<Completion>, i.e. std::move(completion). + * + * Handlers may also have an 'associated executor', so the calls to defer(), + * dispatch(), and post() are forwarded to that executor. If there is no + * associated executor (which is generally the case unless one was bound with + * boost::asio::bind_executor()), the executor passed to Completion::create() + * is used as a default. + * + * Example use: + * + * // declare a Completion type with Signature = void(int, string) + * using MyCompletion = ceph::async::Completion<void(int, string)>; + * + * // create a completion with the given callback: + * std::unique_ptr<MyCompletion> c; + * c = MyCompletion::create(ex, [] (int a, const string& b) {}); + * + * // bind arguments to the callback and post to its associated executor: + * MyCompletion::post(std::move(c), 5, "hello"); + * + * + * Additional user data may be stored along with the Completion to take + * advantage of the handler allocator optimization. This is accomplished by + * specifying its type in the template parameter T. For example, the type + * Completion<void(), int> contains a public member variable 'int user_data'. + * Any additional arguments to Completion::create() will be forwarded to type + * T's constructor. + * + * If the AsBase<T> type tag is used, as in Completion<void(), AsBase<T>>, + * the Completion will inherit from T instead of declaring it as a member + * variable. + * + * When invoking the completion handler via defer(), dispatch(), or post(), + * care must be taken when passing arguments that refer to user data, because + * its memory is destroyed prior to invocation. In such cases, the user data + * should be moved/copied out of the Completion first. + */ +template <typename Signature, typename T = void> +class Completion; + + +/// type tag for UserData +template <typename T> struct AsBase {}; + +namespace detail { + +/// optional user data to be stored with the Completion +template <typename T> +struct UserData { + T user_data; + template <typename ...Args> + UserData(Args&& ...args) + : user_data(std::forward<Args>(args)...) + {} +}; +// AsBase specialization inherits from T +template <typename T> +struct UserData<AsBase<T>> : public T { + template <typename ...Args> + UserData(Args&& ...args) + : T(std::forward<Args>(args)...) + {} +}; +// void specialization +template <> +class UserData<void> {}; + +} // namespace detail + + +// template specialization to pull the Signature's args apart +template <typename T, typename ...Args> +class Completion<void(Args...), T> : public detail::UserData<T> { + protected: + // internal interfaces for type-erasure on the Handler/Executor. uses + // tuple<Args...> to provide perfect forwarding because you can't make + // virtual function templates + virtual void destroy_defer(std::tuple<Args...>&& args) = 0; + virtual void destroy_dispatch(std::tuple<Args...>&& args) = 0; + virtual void destroy_post(std::tuple<Args...>&& args) = 0; + virtual void destroy() = 0; + + // constructor is protected, use create(). any constructor arguments are + // forwarded to UserData + template <typename ...TArgs> + Completion(TArgs&& ...args) + : detail::UserData<T>(std::forward<TArgs>(args)...) + {} + public: + virtual ~Completion() = default; + + // use the virtual destroy() interface on delete. this allows the derived + // class to manage its memory using Handler allocators, without having to use + // a custom Deleter for std::unique_ptr<> + static void operator delete(void *p) { + static_cast<Completion*>(p)->destroy(); + } + + /// completion factory function that uses the handler's associated allocator. + /// any additional arguments are forwared to T's constructor + template <typename Executor1, typename Handler, typename ...TArgs> + static std::unique_ptr<Completion> + create(const Executor1& ex1, Handler&& handler, TArgs&& ...args); + + /// take ownership of the completion, bind any arguments to the completion + /// handler, then defer() it on its associated executor + template <typename ...Args2> + static void defer(std::unique_ptr<Completion>&& c, Args2&&...args); + + /// take ownership of the completion, bind any arguments to the completion + /// handler, then dispatch() it on its associated executor + template <typename ...Args2> + static void dispatch(std::unique_ptr<Completion>&& c, Args2&&...args); + + /// take ownership of the completion, bind any arguments to the completion + /// handler, then post() it to its associated executor + template <typename ...Args2> + static void post(std::unique_ptr<Completion>&& c, Args2&&...args); +}; + +namespace detail { + +// concrete Completion that knows how to invoke the completion handler. this +// observes all of the 'Requirements on asynchronous operations' specified by +// the C++ Networking TS +template <typename Executor1, typename Handler, typename T, typename ...Args> +class CompletionImpl final : public Completion<void(Args...), T> { + // use Handler's associated executor (or Executor1 by default) for callbacks + using Executor2 = boost::asio::associated_executor_t<Handler, Executor1>; + // maintain work on both executors + using Work1 = boost::asio::executor_work_guard<Executor1>; + using Work2 = boost::asio::executor_work_guard<Executor2>; + std::pair<Work1, Work2> work; + Handler handler; + + // use Handler's associated allocator + using Alloc2 = boost::asio::associated_allocator_t<Handler>; + using Traits2 = std::allocator_traits<Alloc2>; + using RebindAlloc2 = typename Traits2::template rebind_alloc<CompletionImpl>; + using RebindTraits2 = std::allocator_traits<RebindAlloc2>; + + // placement new for the handler allocator + static void* operator new(size_t, RebindAlloc2 alloc2) { + return RebindTraits2::allocate(alloc2, 1); + } + // placement delete for when the constructor throws during placement new + static void operator delete(void *p, RebindAlloc2 alloc2) { + RebindTraits2::deallocate(alloc2, static_cast<CompletionImpl*>(p), 1); + } + + static auto bind_and_forward(Handler&& h, std::tuple<Args...>&& args) { + return forward_handler(CompletionHandler{std::move(h), std::move(args)}); + } + + void destroy_defer(std::tuple<Args...>&& args) override { + auto w = std::move(work); + auto f = bind_and_forward(std::move(handler), std::move(args)); + RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler); + RebindTraits2::destroy(alloc2, this); + RebindTraits2::deallocate(alloc2, this, 1); + w.second.get_executor().defer(std::move(f), alloc2); + } + void destroy_dispatch(std::tuple<Args...>&& args) override { + auto w = std::move(work); + auto f = bind_and_forward(std::move(handler), std::move(args)); + RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler); + RebindTraits2::destroy(alloc2, this); + RebindTraits2::deallocate(alloc2, this, 1); + w.second.get_executor().dispatch(std::move(f), alloc2); + } + void destroy_post(std::tuple<Args...>&& args) override { + auto w = std::move(work); + auto f = bind_and_forward(std::move(handler), std::move(args)); + RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler); + RebindTraits2::destroy(alloc2, this); + RebindTraits2::deallocate(alloc2, this, 1); + w.second.get_executor().post(std::move(f), alloc2); + } + void destroy() override { + RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler); + RebindTraits2::destroy(alloc2, this); + RebindTraits2::deallocate(alloc2, this, 1); + } + + // constructor is private, use create(). extra constructor arguments are + // forwarded to UserData + template <typename ...TArgs> + CompletionImpl(const Executor1& ex1, Handler&& handler, TArgs&& ...args) + : Completion<void(Args...), T>(std::forward<TArgs>(args)...), + work(ex1, boost::asio::make_work_guard(handler, ex1)), + handler(std::move(handler)) + {} + + public: + template <typename ...TArgs> + static auto create(const Executor1& ex, Handler&& handler, TArgs&& ...args) { + auto alloc2 = boost::asio::get_associated_allocator(handler); + using Ptr = std::unique_ptr<CompletionImpl>; + return Ptr{new (alloc2) CompletionImpl(ex, std::move(handler), + std::forward<TArgs>(args)...)}; + } + + static void operator delete(void *p) { + static_cast<CompletionImpl*>(p)->destroy(); + } +}; + +} // namespace detail + + +template <typename T, typename ...Args> +template <typename Executor1, typename Handler, typename ...TArgs> +std::unique_ptr<Completion<void(Args...), T>> +Completion<void(Args...), T>::create(const Executor1& ex, + Handler&& handler, TArgs&& ...args) +{ + using Impl = detail::CompletionImpl<Executor1, Handler, T, Args...>; + return Impl::create(ex, std::forward<Handler>(handler), + std::forward<TArgs>(args)...); +} + +template <typename T, typename ...Args> +template <typename ...Args2> +void Completion<void(Args...), T>::defer(std::unique_ptr<Completion>&& ptr, + Args2&& ...args) +{ + auto c = ptr.release(); + c->destroy_defer(std::make_tuple(std::forward<Args2>(args)...)); +} + +template <typename T, typename ...Args> +template <typename ...Args2> +void Completion<void(Args...), T>::dispatch(std::unique_ptr<Completion>&& ptr, + Args2&& ...args) +{ + auto c = ptr.release(); + c->destroy_dispatch(std::make_tuple(std::forward<Args2>(args)...)); +} + +template <typename T, typename ...Args> +template <typename ...Args2> +void Completion<void(Args...), T>::post(std::unique_ptr<Completion>&& ptr, + Args2&& ...args) +{ + auto c = ptr.release(); + c->destroy_post(std::make_tuple(std::forward<Args2>(args)...)); +} + + +/// completion factory function that uses the handler's associated allocator. +/// any additional arguments are forwared to T's constructor +template <typename Signature, typename T, typename Executor1, + typename Handler, typename ...TArgs> +std::unique_ptr<Completion<Signature, T>> +create_completion(const Executor1& ex, Handler&& handler, TArgs&& ...args) +{ + return Completion<Signature, T>::create(ex, std::forward<Handler>(handler), + std::forward<TArgs>(args)...); +} + +/// take ownership of the completion, bind any arguments to the completion +/// handler, then defer() it on its associated executor +template <typename Signature, typename T, typename ...Args> +void defer(std::unique_ptr<Completion<Signature, T>>&& ptr, Args&& ...args) +{ + Completion<Signature, T>::defer(std::move(ptr), std::forward<Args>(args)...); +} + +/// take ownership of the completion, bind any arguments to the completion +/// handler, then dispatch() it on its associated executor +template <typename Signature, typename T, typename ...Args> +void dispatch(std::unique_ptr<Completion<Signature, T>>&& ptr, Args&& ...args) +{ + Completion<Signature, T>::dispatch(std::move(ptr), std::forward<Args>(args)...); +} + +/// take ownership of the completion, bind any arguments to the completion +/// handler, then post() it to its associated executor +template <typename Signature, typename T, typename ...Args> +void post(std::unique_ptr<Completion<Signature, T>>&& ptr, Args&& ...args) +{ + Completion<Signature, T>::post(std::move(ptr), std::forward<Args>(args)...); +} + +} // namespace ceph::async + +#endif // CEPH_ASYNC_COMPLETION_H diff --git a/src/common/async/detail/shared_lock.h b/src/common/async/detail/shared_lock.h new file mode 100644 index 00000000..12e6a922 --- /dev/null +++ b/src/common/async/detail/shared_lock.h @@ -0,0 +1,185 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +namespace std { + +// specialize unique_lock and shared_lock for SharedMutex to operate on +// SharedMutexImpl instead, because the locks may outlive the SharedMutex itself + +template <typename Executor> +class unique_lock<ceph::async::SharedMutex<Executor>> { + public: + using mutex_type = boost::intrusive_ptr<ceph::async::detail::SharedMutexImpl>; + + unique_lock() = default; + explicit unique_lock(ceph::async::SharedMutex<Executor>& m) + : impl(m.impl), locked(true) + { + impl->lock(); + } + unique_lock(ceph::async::SharedMutex<Executor>& m, defer_lock_t t) noexcept + : impl(m.impl) + {} + unique_lock(ceph::async::SharedMutex<Executor>& m, try_to_lock_t t) + : impl(m.impl), locked(impl->try_lock()) + {} + unique_lock(ceph::async::SharedMutex<Executor>& m, adopt_lock_t t) noexcept + : impl(m.impl), locked(true) + {} + ~unique_lock() { + if (impl && locked) + impl->unlock(); + } + + unique_lock(unique_lock&& other) noexcept + : impl(std::move(other.impl)), + locked(other.locked) { + other.locked = false; + } + unique_lock& operator=(unique_lock&& other) noexcept { + if (impl && locked) { + impl->unlock(); + } + impl = std::move(other.impl); + locked = other.locked; + other.locked = false; + return *this; + } + void swap(unique_lock& other) noexcept { + using std::swap; + swap(impl, other.impl); + swap(locked, other.locked); + } + + mutex_type mutex() const noexcept { return impl; } + bool owns_lock() const noexcept { return impl && locked; } + explicit operator bool() const noexcept { return impl && locked; } + + mutex_type release() { + auto result = std::move(impl); + locked = false; + return result; + } + + void lock() { + if (!impl) + throw system_error(make_error_code(errc::operation_not_permitted)); + if (locked) + throw system_error(make_error_code(errc::resource_deadlock_would_occur)); + impl->lock(); + locked = true; + } + bool try_lock() { + if (!impl) + throw system_error(make_error_code(errc::operation_not_permitted)); + if (locked) + throw system_error(make_error_code(errc::resource_deadlock_would_occur)); + return locked = impl->try_lock(); + } + void unlock() { + if (!impl || !locked) + throw system_error(make_error_code(errc::operation_not_permitted)); + impl->unlock(); + locked = false; + } + private: + mutex_type impl; + bool locked{false}; +}; + +template <typename Executor> +class shared_lock<ceph::async::SharedMutex<Executor>> { + public: + using mutex_type = boost::intrusive_ptr<ceph::async::detail::SharedMutexImpl>; + + shared_lock() = default; + explicit shared_lock(ceph::async::SharedMutex<Executor>& m) + : impl(m.impl), locked(true) + { + impl->lock_shared(); + } + shared_lock(ceph::async::SharedMutex<Executor>& m, defer_lock_t t) noexcept + : impl(m.impl) + {} + shared_lock(ceph::async::SharedMutex<Executor>& m, try_to_lock_t t) + : impl(m.impl), locked(impl->try_lock_shared()) + {} + shared_lock(ceph::async::SharedMutex<Executor>& m, adopt_lock_t t) noexcept + : impl(m.impl), locked(true) + {} + + ~shared_lock() { + if (impl && locked) + impl->unlock_shared(); + } + + shared_lock(shared_lock&& other) noexcept + : impl(std::move(other.impl)), + locked(other.locked) { + other.locked = false; + } + shared_lock& operator=(shared_lock&& other) noexcept { + if (impl && locked) { + impl->unlock_shared(); + } + impl = std::move(other.impl); + locked = other.locked; + other.locked = false; + return *this; + } + void swap(shared_lock& other) noexcept { + using std::swap; + swap(impl, other.impl); + swap(locked, other.locked); + } + + mutex_type mutex() const noexcept { return impl; } + bool owns_lock() const noexcept { return impl && locked; } + explicit operator bool() const noexcept { return impl && locked; } + + mutex_type release() { + auto result = std::move(impl); + locked = false; + return result; + } + + void lock() { + if (!impl) + throw system_error(make_error_code(errc::operation_not_permitted)); + if (locked) + throw system_error(make_error_code(errc::resource_deadlock_would_occur)); + impl->lock_shared(); + locked = true; + } + bool try_lock() { + if (!impl) + throw system_error(make_error_code(errc::operation_not_permitted)); + if (locked) + throw system_error(make_error_code(errc::resource_deadlock_would_occur)); + return locked = impl->try_lock_shared(); + } + void unlock() { + if (!impl || !locked) + throw system_error(make_error_code(errc::operation_not_permitted)); + impl->unlock_shared(); + locked = false; + } + private: + mutex_type impl; + bool locked{false}; +}; + +} // namespace std diff --git a/src/common/async/detail/shared_mutex.h b/src/common/async/detail/shared_mutex.h new file mode 100644 index 00000000..8e543635 --- /dev/null +++ b/src/common/async/detail/shared_mutex.h @@ -0,0 +1,326 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <condition_variable> +#include <mutex> +#include <optional> +#include <shared_mutex> // for std::shared_lock + +#include <boost/smart_ptr/intrusive_ref_counter.hpp> +#include <boost/intrusive_ptr.hpp> +#include <boost/intrusive/list.hpp> + +#include "include/ceph_assert.h" + +#include "common/async/completion.h" + +namespace ceph::async::detail { + +struct LockRequest : public boost::intrusive::list_base_hook<> { + virtual ~LockRequest() {} + virtual void complete(boost::system::error_code ec) = 0; + virtual void destroy() = 0; +}; + +class SharedMutexImpl : public boost::intrusive_ref_counter<SharedMutexImpl> { + public: + ~SharedMutexImpl(); + + template <typename Mutex, typename CompletionToken> + auto async_lock(Mutex& mtx, CompletionToken&& token); + void lock(); + void lock(boost::system::error_code& ec); + bool try_lock(); + void unlock(); + template <typename Mutex, typename CompletionToken> + auto async_lock_shared(Mutex& mtx, CompletionToken&& token); + void lock_shared(); + void lock_shared(boost::system::error_code& ec); + bool try_lock_shared(); + void unlock_shared(); + void cancel(); + + private: + using RequestList = boost::intrusive::list<LockRequest>; + + RequestList shared_queue; //< requests waiting on a shared lock + RequestList exclusive_queue; //< requests waiting on an exclusive lock + + /// lock state encodes the number of shared lockers, or 'max' for exclusive + using LockState = uint16_t; + static constexpr LockState Unlocked = 0; + static constexpr LockState Exclusive = std::numeric_limits<LockState>::max(); + static constexpr LockState MaxShared = Exclusive - 1; + LockState state = Unlocked; //< current lock state + + std::mutex mutex; //< protects lock state and wait queues + + void complete(RequestList&& requests, boost::system::error_code ec); +}; + +// sync requests live on the stack and wait on a condition variable +class SyncRequest : public LockRequest { + std::condition_variable cond; + std::optional<boost::system::error_code> ec; + public: + boost::system::error_code wait(std::unique_lock<std::mutex>& lock) { + // return the error code once its been set + cond.wait(lock, [this] { return ec; }); + return *ec; + } + void complete(boost::system::error_code ec) override { + this->ec = ec; + cond.notify_one(); + } + void destroy() override { + // nothing, SyncRequests live on the stack + } +}; + +// async requests use async::Completion to invoke a handler on its executor +template <typename Mutex, template <typename> typename Lock> +class AsyncRequest : public LockRequest { + Mutex& mutex; //< mutex argument for lock guard + public: + explicit AsyncRequest(Mutex& mutex) : mutex(mutex) {} + + using Signature = void(boost::system::error_code, Lock<Mutex>); + using LockCompletion = Completion<Signature, AsBase<AsyncRequest>>; + + void complete(boost::system::error_code ec) override { + auto r = static_cast<LockCompletion*>(this); + // pass ownership of ourselves to post(). on error, pass an empty lock + post(std::unique_ptr<LockCompletion>{r}, ec, + ec ? Lock{mutex, std::defer_lock} : Lock{mutex, std::adopt_lock}); + } + void destroy() override { + delete static_cast<LockCompletion*>(this); + } +}; + +inline SharedMutexImpl::~SharedMutexImpl() +{ + ceph_assert(state == Unlocked); + ceph_assert(shared_queue.empty()); + ceph_assert(exclusive_queue.empty()); +} + +template <typename Mutex, typename CompletionToken> +auto SharedMutexImpl::async_lock(Mutex& mtx, CompletionToken&& token) +{ + using Request = AsyncRequest<Mutex, std::unique_lock>; + using Signature = typename Request::Signature; + boost::asio::async_completion<CompletionToken, Signature> init(token); + auto& handler = init.completion_handler; + auto ex1 = mtx.get_executor(); + { + std::lock_guard lock{mutex}; + + boost::system::error_code ec; + if (state == Unlocked) { + state = Exclusive; + + // post a successful completion + auto ex2 = boost::asio::get_associated_executor(handler, ex1); + auto alloc2 = boost::asio::get_associated_allocator(handler); + auto b = bind_handler(std::move(handler), ec, + std::unique_lock{mtx, std::adopt_lock}); + ex2.post(forward_handler(std::move(b)), alloc2); + } else { + // create a request and add it to the exclusive list + using LockCompletion = typename Request::LockCompletion; + auto request = LockCompletion::create(ex1, std::move(handler), mtx); + exclusive_queue.push_back(*request.release()); + } + } + return init.result.get(); +} + +inline void SharedMutexImpl::lock() +{ + boost::system::error_code ec; + lock(ec); + if (ec) { + throw boost::system::system_error(ec); + } +} + +void SharedMutexImpl::lock(boost::system::error_code& ec) +{ + std::unique_lock lock{mutex}; + + if (state == Unlocked) { + state = Exclusive; + ec.clear(); + } else { + SyncRequest request; + exclusive_queue.push_back(request); + ec = request.wait(lock); + } +} + +inline bool SharedMutexImpl::try_lock() +{ + std::lock_guard lock{mutex}; + + if (state == Unlocked) { + state = Exclusive; + return true; + } + return false; +} + +void SharedMutexImpl::unlock() +{ + RequestList granted; + { + std::lock_guard lock{mutex}; + ceph_assert(state == Exclusive); + + if (!exclusive_queue.empty()) { + // grant next exclusive lock + auto& request = exclusive_queue.front(); + exclusive_queue.pop_front(); + granted.push_back(request); + } else { + // grant shared locks, if any + state = shared_queue.size(); + if (state > MaxShared) { + state = MaxShared; + auto end = std::next(shared_queue.begin(), MaxShared); + granted.splice(granted.end(), shared_queue, + shared_queue.begin(), end, MaxShared); + } else { + granted.splice(granted.end(), shared_queue); + } + } + } + complete(std::move(granted), boost::system::error_code{}); +} + +template <typename Mutex, typename CompletionToken> +auto SharedMutexImpl::async_lock_shared(Mutex& mtx, CompletionToken&& token) +{ + using Request = AsyncRequest<Mutex, std::shared_lock>; + using Signature = typename Request::Signature; + boost::asio::async_completion<CompletionToken, Signature> init(token); + auto& handler = init.completion_handler; + auto ex1 = mtx.get_executor(); + { + std::lock_guard lock{mutex}; + + boost::system::error_code ec; + if (exclusive_queue.empty() && state < MaxShared) { + state++; + + auto ex2 = boost::asio::get_associated_executor(handler, ex1); + auto alloc2 = boost::asio::get_associated_allocator(handler); + auto b = bind_handler(std::move(handler), ec, + std::shared_lock{mtx, std::adopt_lock}); + ex2.post(forward_handler(std::move(b)), alloc2); + } else { + using LockCompletion = typename Request::LockCompletion; + auto request = LockCompletion::create(ex1, std::move(handler), mtx); + shared_queue.push_back(*request.release()); + } + } + return init.result.get(); +} + +inline void SharedMutexImpl::lock_shared() +{ + boost::system::error_code ec; + lock_shared(ec); + if (ec) { + throw boost::system::system_error(ec); + } +} + +void SharedMutexImpl::lock_shared(boost::system::error_code& ec) +{ + std::unique_lock lock{mutex}; + + if (exclusive_queue.empty() && state < MaxShared) { + state++; + ec.clear(); + } else { + SyncRequest request; + shared_queue.push_back(request); + ec = request.wait(lock); + } +} + +inline bool SharedMutexImpl::try_lock_shared() +{ + std::lock_guard lock{mutex}; + + if (exclusive_queue.empty() && state < MaxShared) { + state++; + return true; + } + return false; +} + +inline void SharedMutexImpl::unlock_shared() +{ + std::lock_guard lock{mutex}; + ceph_assert(state != Unlocked && state <= MaxShared); + + if (state == 1 && !exclusive_queue.empty()) { + // grant next exclusive lock + state = Exclusive; + auto& request = exclusive_queue.front(); + exclusive_queue.pop_front(); + request.complete(boost::system::error_code{}); + } else if (state == MaxShared && !shared_queue.empty() && + exclusive_queue.empty()) { + // grant next shared lock + auto& request = shared_queue.front(); + shared_queue.pop_front(); + request.complete(boost::system::error_code{}); + } else { + state--; + } +} + +inline void SharedMutexImpl::cancel() +{ + RequestList canceled; + { + std::lock_guard lock{mutex}; + canceled.splice(canceled.end(), shared_queue); + canceled.splice(canceled.end(), exclusive_queue); + } + complete(std::move(canceled), boost::asio::error::operation_aborted); +} + +void SharedMutexImpl::complete(RequestList&& requests, + boost::system::error_code ec) +{ + while (!requests.empty()) { + auto& request = requests.front(); + requests.pop_front(); + try { + request.complete(ec); + } catch (...) { + // clean up any remaining completions and rethrow + requests.clear_and_dispose([] (LockRequest *r) { r->destroy(); }); + throw; + } + } +} + +} // namespace ceph::async::detail diff --git a/src/common/async/forward_handler.h b/src/common/async/forward_handler.h new file mode 100644 index 00000000..ae88cc83 --- /dev/null +++ b/src/common/async/forward_handler.h @@ -0,0 +1,103 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_ASYNC_FORWARD_HANDLER_H +#define CEPH_ASYNC_FORWARD_HANDLER_H + +#include <boost/asio.hpp> + +namespace ceph::async { + +/** + * A forwarding completion handler for use with boost::asio. + * + * A completion handler wrapper that invokes the handler's operator() as an + * rvalue, regardless of whether the wrapper is invoked as an lvalue or rvalue. + * This operation is potentially destructive to the wrapped handler, so is only + * suitable for single-use handlers. + * + * This is useful when combined with bind_handler() and move-only arguments, + * because executors will always call the lvalue overload of operator(). + * + * The original Handler's associated allocator and executor are maintained. + * + * @see forward_handler + */ +template <typename Handler> +struct ForwardingHandler { + Handler handler; + + ForwardingHandler(Handler&& handler) + : handler(std::move(handler)) + {} + + template <typename ...Args> + void operator()(Args&& ...args) { + std::move(handler)(std::forward<Args>(args)...); + } + + using allocator_type = boost::asio::associated_allocator_t<Handler>; + allocator_type get_allocator() const noexcept { + return boost::asio::get_associated_allocator(handler); + } +}; + +} // namespace ceph::async + +namespace boost::asio { + +// specialize boost::asio::associated_executor<> for ForwardingHandler +template <typename Handler, typename Executor> +struct associated_executor<ceph::async::ForwardingHandler<Handler>, Executor> { + using type = boost::asio::associated_executor_t<Handler, Executor>; + + static type get(const ceph::async::ForwardingHandler<Handler>& handler, + const Executor& ex = Executor()) noexcept { + return boost::asio::get_associated_executor(handler.handler, ex); + } +}; + +} // namespace boost::asio + +namespace ceph::async { + +/** + * Returns a single-use completion handler that always forwards on operator(). + * + * Wraps a completion handler such that it is always invoked as an rvalue. This + * is necessary when combining executors and bind_handler() with move-only + * argument types. + * + * Example use: + * + * auto callback = [] (std::unique_ptr<int>&& p) {}; + * auto bound_handler = bind_handler(callback, std::make_unique<int>(5)); + * auro handler = forward_handler(std::move(bound_handler)); + * + * // execute the forwarding handler on an io_context: + * boost::asio::io_context context; + * boost::asio::post(context, std::move(handler)); + * context.run(); + * + * @see ForwardingHandler + */ +template <typename Handler> +auto forward_handler(Handler&& h) +{ + return ForwardingHandler{std::forward<Handler>(h)}; +} + +} // namespace ceph::async + +#endif // CEPH_ASYNC_FORWARD_HANDLER_H diff --git a/src/common/async/shared_mutex.h b/src/common/async/shared_mutex.h new file mode 100644 index 00000000..3e471a4d --- /dev/null +++ b/src/common/async/shared_mutex.h @@ -0,0 +1,212 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include "common/async/detail/shared_mutex.h" + +namespace ceph::async { + +/** + * An asynchronous shared mutex for use with boost::asio. + * + * A shared mutex class with asynchronous lock operations that complete on a + * boost::asio executor. The class also has synchronous interfaces that meet + * most of the standard library's requirements for the SharedMutex concept, + * which makes it compatible with lock_guard, unique_lock, and shared_lock. + * + * All lock requests can fail with operation_aborted on cancel() or destruction. + * The non-error_code overloads of lock() and lock_shared() will throw this + * error as an exception of type boost::system::system_error. + * + * Exclusive locks are prioritized over shared locks. Locks of the same type + * are granted in fifo order. The implementation defines a limit on the number + * of shared locks to 65534 at a time. + * + * Example use: + * + * boost::asio::io_context context; + * SharedMutex mutex{context.get_executor()}; + * + * mutex.async_lock([&] (boost::system::error_code ec, auto lock) { + * if (!ec) { + * // mutate shared state ... + * } + * }); + * mutex.async_lock_shared([&] (boost::system::error_code ec, auto lock) { + * if (!ec) { + * // read shared state ... + * } + * }); + * + * context.run(); + */ +template <typename Executor> +class SharedMutex { + public: + explicit SharedMutex(const Executor& ex); + + /// on destruction, all pending lock requests are canceled + ~SharedMutex(); + + using executor_type = Executor; + executor_type get_executor() const noexcept { return ex; } + + /// initiate an asynchronous request for an exclusive lock. when the lock is + /// granted, the completion handler is invoked with a successful error code + /// and a std::unique_lock that owns this mutex. + /// Signature = void(boost::system::error_code, std::unique_lock) + template <typename CompletionToken> + auto async_lock(CompletionToken&& token); + + /// wait synchronously for an exclusive lock. if an error occurs before the + /// lock is granted, that error is thrown as an exception + void lock(); + + /// wait synchronously for an exclusive lock. if an error occurs before the + /// lock is granted, that error is assigned to 'ec' + void lock(boost::system::error_code& ec); + + /// try to acquire an exclusive lock. if the lock is not immediately + /// available, returns false + bool try_lock(); + + /// releases an exclusive lock. not required to be called from the same thread + /// that initiated the lock + void unlock(); + + /// initiate an asynchronous request for a shared lock. when the lock is + /// granted, the completion handler is invoked with a successful error code + /// and a std::shared_lock that owns this mutex. + /// Signature = void(boost::system::error_code, std::shared_lock) + template <typename CompletionToken> + auto async_lock_shared(CompletionToken&& token); + + /// wait synchronously for a shared lock. if an error occurs before the + /// lock is granted, that error is thrown as an exception + void lock_shared(); + + /// wait synchronously for a shared lock. if an error occurs before the lock + /// is granted, that error is assigned to 'ec' + void lock_shared(boost::system::error_code& ec); + + /// try to acquire a shared lock. if the lock is not immediately available, + /// returns false + bool try_lock_shared(); + + /// releases a shared lock. not required to be called from the same thread + /// that initiated the lock + void unlock_shared(); + + /// cancel any pending requests for exclusive or shared locks with an + /// operation_aborted error + void cancel(); + + private: + Executor ex; //< default callback executor + boost::intrusive_ptr<detail::SharedMutexImpl> impl; + + // allow lock guards to access impl + friend class std::unique_lock<SharedMutex>; + friend class std::shared_lock<SharedMutex>; +}; + + +template <typename Executor> +SharedMutex<Executor>::SharedMutex(const Executor& ex) + : ex(ex), impl(new detail::SharedMutexImpl) +{ +} + +template <typename Executor> +SharedMutex<Executor>::~SharedMutex() +{ + try { + impl->cancel(); + } catch (const std::exception&) { + // swallow any exceptions, the destructor can't throw + } +} + +template <typename Executor> +template <typename CompletionToken> +auto SharedMutex<Executor>::async_lock(CompletionToken&& token) +{ + return impl->async_lock(*this, std::forward<CompletionToken>(token)); +} + +template <typename Executor> +void SharedMutex<Executor>::lock() +{ + impl->lock(); +} + +template <typename Executor> +void SharedMutex<Executor>::lock(boost::system::error_code& ec) +{ + impl->lock(ec); +} + +template <typename Executor> +bool SharedMutex<Executor>::try_lock() +{ + return impl->try_lock(); +} + +template <typename Executor> +void SharedMutex<Executor>::unlock() +{ + impl->unlock(); +} + +template <typename Executor> +template <typename CompletionToken> +auto SharedMutex<Executor>::async_lock_shared(CompletionToken&& token) +{ + return impl->async_lock_shared(*this, std::forward<CompletionToken>(token)); +} + +template <typename Executor> +void SharedMutex<Executor>::lock_shared() +{ + impl->lock_shared(); +} + +template <typename Executor> +void SharedMutex<Executor>::lock_shared(boost::system::error_code& ec) +{ + impl->lock_shared(ec); +} + +template <typename Executor> +bool SharedMutex<Executor>::try_lock_shared() +{ + return impl->try_lock_shared(); +} + +template <typename Executor> +void SharedMutex<Executor>::unlock_shared() +{ + impl->unlock_shared(); +} + +template <typename Executor> +void SharedMutex<Executor>::cancel() +{ + impl->cancel(); +} + +} // namespace ceph::async + +#include "common/async/detail/shared_lock.h" diff --git a/src/common/async/yield_context.h b/src/common/async/yield_context.h new file mode 100644 index 00000000..436192c0 --- /dev/null +++ b/src/common/async/yield_context.h @@ -0,0 +1,67 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <boost/range/begin.hpp> +#include <boost/range/end.hpp> +#include <boost/asio/io_context.hpp> + +#include "acconfig.h" + +#ifndef HAVE_BOOST_CONTEXT + +// hide the dependencies on boost::context and boost::coroutines +namespace boost::asio { +struct yield_context; +} + +#else // HAVE_BOOST_CONTEXT +#ifndef BOOST_COROUTINES_NO_DEPRECATION_WARNING +#define BOOST_COROUTINES_NO_DEPRECATION_WARNING +#endif +#include <boost/asio/spawn.hpp> + +#endif // HAVE_BOOST_CONTEXT + + +/// optional-like wrapper for a boost::asio::yield_context and its associated +/// boost::asio::io_context. operations that take an optional_yield argument +/// will, when passed a non-empty yield context, suspend this coroutine instead +/// of the blocking the thread of execution +class optional_yield { + boost::asio::io_context *c = nullptr; + boost::asio::yield_context *y = nullptr; + public: + /// construct with a valid io and yield_context + explicit optional_yield(boost::asio::io_context& c, + boost::asio::yield_context& y) noexcept + : c(&c), y(&y) {} + + /// type tag to construct an empty object + struct empty_t {}; + optional_yield(empty_t) noexcept {} + + /// implicit conversion to bool, returns true if non-empty + operator bool() const noexcept { return y; } + + /// return a reference to the associated io_context. only valid if non-empty + boost::asio::io_context& get_io_context() const noexcept { return *c; } + + /// return a reference to the yield_context. only valid if non-empty + boost::asio::yield_context& get_yield_context() const noexcept { return *y; } +}; + +// type tag object to construct an empty optional_yield +static constexpr optional_yield::empty_t null_yield{}; |