From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/common/async/bind_handler.h | 111 +++++++++++ src/common/async/bind_like.h | 39 ++++ src/common/async/blocked_completion.h | 290 +++++++++++++++++++++++++++++ src/common/async/completion.h | 320 ++++++++++++++++++++++++++++++++ src/common/async/context_pool.h | 94 ++++++++++ src/common/async/detail/shared_lock.h | 185 +++++++++++++++++++ src/common/async/detail/shared_mutex.h | 326 +++++++++++++++++++++++++++++++++ src/common/async/forward_handler.h | 103 +++++++++++ src/common/async/librados_completion.h | 125 +++++++++++++ src/common/async/shared_mutex.h | 212 +++++++++++++++++++++ src/common/async/waiter.h | 223 ++++++++++++++++++++++ src/common/async/yield_context.h | 59 ++++++ 12 files changed, 2087 insertions(+) create mode 100644 src/common/async/bind_handler.h create mode 100644 src/common/async/bind_like.h create mode 100644 src/common/async/blocked_completion.h create mode 100644 src/common/async/completion.h create mode 100644 src/common/async/context_pool.h create mode 100644 src/common/async/detail/shared_lock.h create mode 100644 src/common/async/detail/shared_mutex.h create mode 100644 src/common/async/forward_handler.h create mode 100644 src/common/async/librados_completion.h create mode 100644 src/common/async/shared_mutex.h create mode 100644 src/common/async/waiter.h create mode 100644 src/common/async/yield_context.h (limited to 'src/common/async') diff --git a/src/common/async/bind_handler.h b/src/common/async/bind_handler.h new file mode 100644 index 000000000..516d8a5e8 --- /dev/null +++ b/src/common/async/bind_handler.h @@ -0,0 +1,111 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_ASYNC_BIND_HANDLER_H +#define CEPH_ASYNC_BIND_HANDLER_H + +#include +#include + +namespace ceph::async { + +/** + * A bound completion handler for use with boost::asio. + * + * A completion handler wrapper that allows a tuple of arguments to be forwarded + * to the original Handler. This is intended for use with boost::asio functions + * like defer(), dispatch() and post() which expect handlers which are callable + * with no arguments. + * + * The original Handler's associated allocator and executor are maintained. + * + * @see bind_handler + */ +template +struct CompletionHandler { + Handler handler; + Tuple args; + + CompletionHandler(Handler&& handler, Tuple&& args) + : handler(std::move(handler)), + args(std::move(args)) + {} + + void operator()() & { + std::apply(handler, args); + } + void operator()() const & { + std::apply(handler, args); + } + void operator()() && { + std::apply(std::move(handler), std::move(args)); + } + + using allocator_type = boost::asio::associated_allocator_t; + allocator_type get_allocator() const noexcept { + return boost::asio::get_associated_allocator(handler); + } +}; + +} // namespace ceph::async + +namespace boost::asio { + +// specialize boost::asio::associated_executor<> for CompletionHandler +template +struct associated_executor, Executor> { + using type = boost::asio::associated_executor_t; + + static type get(const ceph::async::CompletionHandler& handler, + const Executor& ex = Executor()) noexcept { + return boost::asio::get_associated_executor(handler.handler, ex); + } +}; + +} // namespace boost::asio + +namespace ceph::async { + +/** + * Returns a wrapped completion handler with bound arguments. + * + * Binds the given arguments to a handler, and returns a CompletionHandler that + * is callable with no arguments. This is similar to std::bind(), except that + * all arguments must be provided. Move-only argument types are supported as + * long as the CompletionHandler's 'operator() &&' overload is used, i.e. + * std::move(handler)(). + * + * Example use: + * + * // bind the arguments (5, "hello") to a callback lambda: + * auto callback = [] (int a, std::string b) {}; + * auto handler = bind_handler(callback, 5, "hello"); + * + * // execute the bound handler on an io_context: + * boost::asio::io_context context; + * boost::asio::post(context, std::move(handler)); + * context.run(); + * + * @see CompletionHandler + */ +template +auto bind_handler(Handler&& h, Args&& ...args) +{ + return CompletionHandler{std::forward(h), + std::make_tuple(std::forward(args)...)}; +} + +} // namespace ceph::async + +#endif // CEPH_ASYNC_BIND_HANDLER_H diff --git a/src/common/async/bind_like.h b/src/common/async/bind_like.h new file mode 100644 index 000000000..c360eac0a --- /dev/null +++ b/src/common/async/bind_like.h @@ -0,0 +1,39 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2020 Red Hat + * Author: Adam C. Emerson + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include +#include +#include +#include + +namespace ceph::async { +template +auto bind_ea(const Executor& executor, const Allocator& allocator, + Completion&& completion) { + return bind_allocator(allocator, + boost::asio::bind_executor( + executor, + std::forward(completion))); +} + + +// Bind `Completion` to the executor and allocator of `Proto` +template +auto bind_like(const Proto& proto, Completion&& completion) { + return bind_ea(boost::asio::get_associated_executor(proto), + boost::asio::get_associated_allocator(proto), + std::forward(completion)); +} +} diff --git a/src/common/async/blocked_completion.h b/src/common/async/blocked_completion.h new file mode 100644 index 000000000..23a1319bc --- /dev/null +++ b/src/common/async/blocked_completion.h @@ -0,0 +1,290 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2020 Red Hat + * Author: Adam C. Emerson + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_COMMON_ASYNC_BLOCKED_COMPLETION_H +#define CEPH_COMMON_ASYNC_BLOCKED_COMPLETION_H + +#include +#include +#include +#include +#include + +#include + +#include +#include + +namespace ceph::async { + +namespace bs = boost::system; + +class use_blocked_t { + use_blocked_t(bs::error_code* ec) : ec(ec) {} +public: + use_blocked_t() = default; + + use_blocked_t operator [](bs::error_code& _ec) const { + return use_blocked_t(&_ec); + } + + bs::error_code* ec = nullptr; +}; + +inline constexpr use_blocked_t use_blocked; + +namespace detail { + +template +struct blocked_handler +{ + blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {} + + void operator ()(Ts... values) noexcept { + std::scoped_lock l(*m); + *ec = bs::error_code{}; + *value = std::forward_as_tuple(std::move(values)...); + *done = true; + cv->notify_one(); + } + + void operator ()(bs::error_code ec, Ts... values) noexcept { + std::scoped_lock l(*m); + *this->ec = ec; + *value = std::forward_as_tuple(std::move(values)...); + *done = true; + cv->notify_one(); + } + + bs::error_code* ec; + std::optional>* value = nullptr; + std::mutex* m = nullptr; + std::condition_variable* cv = nullptr; + bool* done = nullptr; +}; + +template +struct blocked_handler +{ + blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {} + + void operator ()(T value) noexcept { + std::scoped_lock l(*m); + *ec = bs::error_code(); + *this->value = std::move(value); + *done = true; + cv->notify_one(); + } + + void operator ()(bs::error_code ec, T value) noexcept { + std::scoped_lock l(*m); + *this->ec = ec; + *this->value = std::move(value); + *done = true; + cv->notify_one(); + } + + //private: + bs::error_code* ec; + std::optional* value; + std::mutex* m = nullptr; + std::condition_variable* cv = nullptr; + bool* done = nullptr; +}; + +template<> +struct blocked_handler +{ + blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {} + + void operator ()() noexcept { + std::scoped_lock l(*m); + *ec = bs::error_code{}; + *done = true; + cv->notify_one(); + } + + void operator ()(bs::error_code ec) noexcept { + std::scoped_lock l(*m); + *this->ec = ec; + *done = true; + cv->notify_one(); + } + + bs::error_code* ec; + std::mutex* m = nullptr; + std::condition_variable* cv = nullptr; + bool* done = nullptr; +}; + +template +class blocked_result +{ +public: + using completion_handler_type = blocked_handler; + using return_type = std::tuple; + + explicit blocked_result(completion_handler_type& h) noexcept { + std::scoped_lock l(m); + out_ec = h.ec; + if (!out_ec) h.ec = &ec; + h.value = &value; + h.m = &m; + h.cv = &cv; + h.done = &done; + } + + return_type get() { + std::unique_lock l(m); + cv.wait(l, [this]() { return done; }); + if (!out_ec && ec) throw bs::system_error(ec); + return std::move(*value); + } + + blocked_result(const blocked_result&) = delete; + blocked_result& operator =(const blocked_result&) = delete; + blocked_result(blocked_result&&) = delete; + blocked_result& operator =(blocked_result&&) = delete; + +private: + bs::error_code* out_ec; + bs::error_code ec; + std::optional value; + std::mutex m; + std::condition_variable cv; + bool done = false; +}; + +template +class blocked_result +{ +public: + using completion_handler_type = blocked_handler; + using return_type = T; + + explicit blocked_result(completion_handler_type& h) noexcept { + std::scoped_lock l(m); + out_ec = h.ec; + if (!out_ec) h.ec = &ec; + h.value = &value; + h.m = &m; + h.cv = &cv; + h.done = &done; + } + + return_type get() { + std::unique_lock l(m); + cv.wait(l, [this]() { return done; }); + if (!out_ec && ec) throw bs::system_error(ec); + return std::move(*value); + } + + blocked_result(const blocked_result&) = delete; + blocked_result& operator =(const blocked_result&) = delete; + blocked_result(blocked_result&&) = delete; + blocked_result& operator =(blocked_result&&) = delete; + +private: + bs::error_code* out_ec; + bs::error_code ec; + std::optional value; + std::mutex m; + std::condition_variable cv; + bool done = false; +}; + +template<> +class blocked_result +{ +public: + using completion_handler_type = blocked_handler; + using return_type = void; + + explicit blocked_result(completion_handler_type& h) noexcept { + std::scoped_lock l(m); + out_ec = h.ec; + if (!out_ec) h.ec = &ec; + h.m = &m; + h.cv = &cv; + h.done = &done; + } + + void get() { + std::unique_lock l(m); + cv.wait(l, [this]() { return done; }); + if (!out_ec && ec) throw bs::system_error(ec); + } + + blocked_result(const blocked_result&) = delete; + blocked_result& operator =(const blocked_result&) = delete; + blocked_result(blocked_result&&) = delete; + blocked_result& operator =(blocked_result&&) = delete; + +private: + bs::error_code* out_ec; + bs::error_code ec; + std::mutex m; + std::condition_variable cv; + bool done = false; +}; +} // namespace detail +} // namespace ceph::async + + +namespace boost::asio { +template +class async_result + : public ceph::async::detail::blocked_result +{ +public: + explicit async_result(typename ceph::async::detail::blocked_result + ::completion_handler_type& h) + : ceph::async::detail::blocked_result(h) {} +}; + +template +class async_result + : public ceph::async::detail::blocked_result...> +{ +public: + explicit async_result( + typename ceph::async::detail::blocked_result...>::completion_handler_type& h) + : ceph::async::detail::blocked_result...>(h) {} +}; + +template +class async_result + : public ceph::async::detail::blocked_result +{ +public: + explicit async_result( + typename ceph::async::detail::blocked_result::completion_handler_type& h) + : ceph::async::detail::blocked_result(h) {} +}; + +template +class async_result + : public ceph::async::detail::blocked_result...> +{ +public: + explicit async_result( + typename ceph::async::detail::blocked_result...>::completion_handler_type& h) + : ceph::async::detail::blocked_result...>(h) {} +}; +} + +#endif // !CEPH_COMMON_ASYNC_BLOCKED_COMPLETION_H diff --git a/src/common/async/completion.h b/src/common/async/completion.h new file mode 100644 index 000000000..6af9109d5 --- /dev/null +++ b/src/common/async/completion.h @@ -0,0 +1,320 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_ASYNC_COMPLETION_H +#define CEPH_ASYNC_COMPLETION_H + +#include + +#include "bind_handler.h" +#include "forward_handler.h" + +namespace ceph::async { + +/** + * Abstract completion handler interface for use with boost::asio. + * + * Memory management is performed using the Handler's 'associated allocator', + * which carries the additional requirement that its memory be released before + * the Handler is invoked. This allows memory allocated for one asynchronous + * operation to be reused in its continuation. Because of this requirement, any + * calls to invoke the completion must first release ownership of it. To enforce + * this, the static functions defer()/dispatch()/post() take the completion by + * rvalue-reference to std::unique_ptr, i.e. std::move(completion). + * + * Handlers may also have an 'associated executor', so the calls to defer(), + * dispatch(), and post() are forwarded to that executor. If there is no + * associated executor (which is generally the case unless one was bound with + * boost::asio::bind_executor()), the executor passed to Completion::create() + * is used as a default. + * + * Example use: + * + * // declare a Completion type with Signature = void(int, string) + * using MyCompletion = ceph::async::Completion; + * + * // create a completion with the given callback: + * std::unique_ptr c; + * c = MyCompletion::create(ex, [] (int a, const string& b) {}); + * + * // bind arguments to the callback and post to its associated executor: + * MyCompletion::post(std::move(c), 5, "hello"); + * + * + * Additional user data may be stored along with the Completion to take + * advantage of the handler allocator optimization. This is accomplished by + * specifying its type in the template parameter T. For example, the type + * Completion contains a public member variable 'int user_data'. + * Any additional arguments to Completion::create() will be forwarded to type + * T's constructor. + * + * If the AsBase type tag is used, as in Completion>, + * the Completion will inherit from T instead of declaring it as a member + * variable. + * + * When invoking the completion handler via defer(), dispatch(), or post(), + * care must be taken when passing arguments that refer to user data, because + * its memory is destroyed prior to invocation. In such cases, the user data + * should be moved/copied out of the Completion first. + */ +template +class Completion; + + +/// type tag for UserData +template struct AsBase {}; + +namespace detail { + +/// optional user data to be stored with the Completion +template +struct UserData { + T user_data; + template + UserData(Args&& ...args) + : user_data(std::forward(args)...) + {} +}; +// AsBase specialization inherits from T +template +struct UserData> : public T { + template + UserData(Args&& ...args) + : T(std::forward(args)...) + {} +}; +// void specialization +template <> +class UserData {}; + +} // namespace detail + + +// template specialization to pull the Signature's args apart +template +class Completion : public detail::UserData { + protected: + // internal interfaces for type-erasure on the Handler/Executor. uses + // tuple to provide perfect forwarding because you can't make + // virtual function templates + virtual void destroy_defer(std::tuple&& args) = 0; + virtual void destroy_dispatch(std::tuple&& args) = 0; + virtual void destroy_post(std::tuple&& args) = 0; + virtual void destroy() = 0; + + // constructor is protected, use create(). any constructor arguments are + // forwarded to UserData + template + Completion(TArgs&& ...args) + : detail::UserData(std::forward(args)...) + {} + public: + virtual ~Completion() = default; + + // use the virtual destroy() interface on delete. this allows the derived + // class to manage its memory using Handler allocators, without having to use + // a custom Deleter for std::unique_ptr<> + static void operator delete(void *p) { + static_cast(p)->destroy(); + } + + /// completion factory function that uses the handler's associated allocator. + /// any additional arguments are forwared to T's constructor + template + static std::unique_ptr + create(const Executor1& ex1, Handler&& handler, TArgs&& ...args); + + /// take ownership of the completion, bind any arguments to the completion + /// handler, then defer() it on its associated executor + template + static void defer(std::unique_ptr&& c, Args2&&...args); + + /// take ownership of the completion, bind any arguments to the completion + /// handler, then dispatch() it on its associated executor + template + static void dispatch(std::unique_ptr&& c, Args2&&...args); + + /// take ownership of the completion, bind any arguments to the completion + /// handler, then post() it to its associated executor + template + static void post(std::unique_ptr&& c, Args2&&...args); +}; + +namespace detail { + +// concrete Completion that knows how to invoke the completion handler. this +// observes all of the 'Requirements on asynchronous operations' specified by +// the C++ Networking TS +template +class CompletionImpl final : public Completion { + // use Handler's associated executor (or Executor1 by default) for callbacks + using Executor2 = boost::asio::associated_executor_t; + // maintain work on both executors + using Work1 = boost::asio::executor_work_guard; + using Work2 = boost::asio::executor_work_guard; + std::pair work; + Handler handler; + + // use Handler's associated allocator + using Alloc2 = boost::asio::associated_allocator_t; + using Traits2 = std::allocator_traits; + using RebindAlloc2 = typename Traits2::template rebind_alloc; + using RebindTraits2 = std::allocator_traits; + + // placement new for the handler allocator + static void* operator new(size_t, RebindAlloc2 alloc2) { + return RebindTraits2::allocate(alloc2, 1); + } + // placement delete for when the constructor throws during placement new + static void operator delete(void *p, RebindAlloc2 alloc2) { + RebindTraits2::deallocate(alloc2, static_cast(p), 1); + } + + static auto bind_and_forward(Handler&& h, std::tuple&& args) { + return forward_handler(CompletionHandler{std::move(h), std::move(args)}); + } + + void destroy_defer(std::tuple&& args) override { + auto w = std::move(work); + auto f = bind_and_forward(std::move(handler), std::move(args)); + RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler); + RebindTraits2::destroy(alloc2, this); + RebindTraits2::deallocate(alloc2, this, 1); + w.second.get_executor().defer(std::move(f), alloc2); + } + void destroy_dispatch(std::tuple&& args) override { + auto w = std::move(work); + auto f = bind_and_forward(std::move(handler), std::move(args)); + RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler); + RebindTraits2::destroy(alloc2, this); + RebindTraits2::deallocate(alloc2, this, 1); + w.second.get_executor().dispatch(std::move(f), alloc2); + } + void destroy_post(std::tuple&& args) override { + auto w = std::move(work); + auto f = bind_and_forward(std::move(handler), std::move(args)); + RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler); + RebindTraits2::destroy(alloc2, this); + RebindTraits2::deallocate(alloc2, this, 1); + w.second.get_executor().post(std::move(f), alloc2); + } + void destroy() override { + RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler); + RebindTraits2::destroy(alloc2, this); + RebindTraits2::deallocate(alloc2, this, 1); + } + + // constructor is private, use create(). extra constructor arguments are + // forwarded to UserData + template + CompletionImpl(const Executor1& ex1, Handler&& handler, TArgs&& ...args) + : Completion(std::forward(args)...), + work(ex1, boost::asio::make_work_guard(handler, ex1)), + handler(std::move(handler)) + {} + + public: + template + static auto create(const Executor1& ex, Handler&& handler, TArgs&& ...args) { + auto alloc2 = boost::asio::get_associated_allocator(handler); + using Ptr = std::unique_ptr; + return Ptr{new (alloc2) CompletionImpl(ex, std::move(handler), + std::forward(args)...)}; + } + + static void operator delete(void *p) { + static_cast(p)->destroy(); + } +}; + +} // namespace detail + + +template +template +std::unique_ptr> +Completion::create(const Executor1& ex, + Handler&& handler, TArgs&& ...args) +{ + using Impl = detail::CompletionImpl; + return Impl::create(ex, std::forward(handler), + std::forward(args)...); +} + +template +template +void Completion::defer(std::unique_ptr&& ptr, + Args2&& ...args) +{ + auto c = ptr.release(); + c->destroy_defer(std::make_tuple(std::forward(args)...)); +} + +template +template +void Completion::dispatch(std::unique_ptr&& ptr, + Args2&& ...args) +{ + auto c = ptr.release(); + c->destroy_dispatch(std::make_tuple(std::forward(args)...)); +} + +template +template +void Completion::post(std::unique_ptr&& ptr, + Args2&& ...args) +{ + auto c = ptr.release(); + c->destroy_post(std::make_tuple(std::forward(args)...)); +} + + +/// completion factory function that uses the handler's associated allocator. +/// any additional arguments are forwared to T's constructor +template +std::unique_ptr> +create_completion(const Executor1& ex, Handler&& handler, TArgs&& ...args) +{ + return Completion::create(ex, std::forward(handler), + std::forward(args)...); +} + +/// take ownership of the completion, bind any arguments to the completion +/// handler, then defer() it on its associated executor +template +void defer(std::unique_ptr>&& ptr, Args&& ...args) +{ + Completion::defer(std::move(ptr), std::forward(args)...); +} + +/// take ownership of the completion, bind any arguments to the completion +/// handler, then dispatch() it on its associated executor +template +void dispatch(std::unique_ptr>&& ptr, Args&& ...args) +{ + Completion::dispatch(std::move(ptr), std::forward(args)...); +} + +/// take ownership of the completion, bind any arguments to the completion +/// handler, then post() it to its associated executor +template +void post(std::unique_ptr>&& ptr, Args&& ...args) +{ + Completion::post(std::move(ptr), std::forward(args)...); +} + +} // namespace ceph::async + +#endif // CEPH_ASYNC_COMPLETION_H diff --git a/src/common/async/context_pool.h b/src/common/async/context_pool.h new file mode 100644 index 000000000..9c6cab767 --- /dev/null +++ b/src/common/async/context_pool.h @@ -0,0 +1,94 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat + * Author: Adam C. Emerson + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_COMMON_ASYNC_CONTEXT_POOL_H +#define CEPH_COMMON_ASYNC_CONTEXT_POOL_H + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "common/ceph_mutex.h" +#include "common/Thread.h" + +namespace ceph::async { +class io_context_pool { + std::vector threadvec; + boost::asio::io_context ioctx; + std::optional> guard; + ceph::mutex m = make_mutex("ceph::io_context_pool::m"); + + void cleanup() noexcept { + guard = std::nullopt; + for (auto& th : threadvec) { + th.join(); + } + threadvec.clear(); + } +public: + io_context_pool() noexcept {} + io_context_pool(std::int16_t threadcnt) noexcept { + start(threadcnt); + } + ~io_context_pool() { + stop(); + } + void start(std::int16_t threadcnt) noexcept { + auto l = std::scoped_lock(m); + if (threadvec.empty()) { + guard.emplace(boost::asio::make_work_guard(ioctx)); + ioctx.restart(); + for (std::int16_t i = 0; i < threadcnt; ++i) { + threadvec.emplace_back(make_named_thread("io_context_pool", + [this]() { + ioctx.run(); + })); + } + } + } + void finish() noexcept { + auto l = std::scoped_lock(m); + if (!threadvec.empty()) { + cleanup(); + } + } + void stop() noexcept { + auto l = std::scoped_lock(m); + if (!threadvec.empty()) { + ioctx.stop(); + cleanup(); + } + } + + boost::asio::io_context& get_io_context() { + return ioctx; + } + operator boost::asio::io_context&() { + return ioctx; + } + boost::asio::io_context::executor_type get_executor() { + return ioctx.get_executor(); + } +}; +} + +#endif // CEPH_COMMON_ASYNC_CONTEXT_POOL_H diff --git a/src/common/async/detail/shared_lock.h b/src/common/async/detail/shared_lock.h new file mode 100644 index 000000000..12e6a9220 --- /dev/null +++ b/src/common/async/detail/shared_lock.h @@ -0,0 +1,185 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +namespace std { + +// specialize unique_lock and shared_lock for SharedMutex to operate on +// SharedMutexImpl instead, because the locks may outlive the SharedMutex itself + +template +class unique_lock> { + public: + using mutex_type = boost::intrusive_ptr; + + unique_lock() = default; + explicit unique_lock(ceph::async::SharedMutex& m) + : impl(m.impl), locked(true) + { + impl->lock(); + } + unique_lock(ceph::async::SharedMutex& m, defer_lock_t t) noexcept + : impl(m.impl) + {} + unique_lock(ceph::async::SharedMutex& m, try_to_lock_t t) + : impl(m.impl), locked(impl->try_lock()) + {} + unique_lock(ceph::async::SharedMutex& m, adopt_lock_t t) noexcept + : impl(m.impl), locked(true) + {} + ~unique_lock() { + if (impl && locked) + impl->unlock(); + } + + unique_lock(unique_lock&& other) noexcept + : impl(std::move(other.impl)), + locked(other.locked) { + other.locked = false; + } + unique_lock& operator=(unique_lock&& other) noexcept { + if (impl && locked) { + impl->unlock(); + } + impl = std::move(other.impl); + locked = other.locked; + other.locked = false; + return *this; + } + void swap(unique_lock& other) noexcept { + using std::swap; + swap(impl, other.impl); + swap(locked, other.locked); + } + + mutex_type mutex() const noexcept { return impl; } + bool owns_lock() const noexcept { return impl && locked; } + explicit operator bool() const noexcept { return impl && locked; } + + mutex_type release() { + auto result = std::move(impl); + locked = false; + return result; + } + + void lock() { + if (!impl) + throw system_error(make_error_code(errc::operation_not_permitted)); + if (locked) + throw system_error(make_error_code(errc::resource_deadlock_would_occur)); + impl->lock(); + locked = true; + } + bool try_lock() { + if (!impl) + throw system_error(make_error_code(errc::operation_not_permitted)); + if (locked) + throw system_error(make_error_code(errc::resource_deadlock_would_occur)); + return locked = impl->try_lock(); + } + void unlock() { + if (!impl || !locked) + throw system_error(make_error_code(errc::operation_not_permitted)); + impl->unlock(); + locked = false; + } + private: + mutex_type impl; + bool locked{false}; +}; + +template +class shared_lock> { + public: + using mutex_type = boost::intrusive_ptr; + + shared_lock() = default; + explicit shared_lock(ceph::async::SharedMutex& m) + : impl(m.impl), locked(true) + { + impl->lock_shared(); + } + shared_lock(ceph::async::SharedMutex& m, defer_lock_t t) noexcept + : impl(m.impl) + {} + shared_lock(ceph::async::SharedMutex& m, try_to_lock_t t) + : impl(m.impl), locked(impl->try_lock_shared()) + {} + shared_lock(ceph::async::SharedMutex& m, adopt_lock_t t) noexcept + : impl(m.impl), locked(true) + {} + + ~shared_lock() { + if (impl && locked) + impl->unlock_shared(); + } + + shared_lock(shared_lock&& other) noexcept + : impl(std::move(other.impl)), + locked(other.locked) { + other.locked = false; + } + shared_lock& operator=(shared_lock&& other) noexcept { + if (impl && locked) { + impl->unlock_shared(); + } + impl = std::move(other.impl); + locked = other.locked; + other.locked = false; + return *this; + } + void swap(shared_lock& other) noexcept { + using std::swap; + swap(impl, other.impl); + swap(locked, other.locked); + } + + mutex_type mutex() const noexcept { return impl; } + bool owns_lock() const noexcept { return impl && locked; } + explicit operator bool() const noexcept { return impl && locked; } + + mutex_type release() { + auto result = std::move(impl); + locked = false; + return result; + } + + void lock() { + if (!impl) + throw system_error(make_error_code(errc::operation_not_permitted)); + if (locked) + throw system_error(make_error_code(errc::resource_deadlock_would_occur)); + impl->lock_shared(); + locked = true; + } + bool try_lock() { + if (!impl) + throw system_error(make_error_code(errc::operation_not_permitted)); + if (locked) + throw system_error(make_error_code(errc::resource_deadlock_would_occur)); + return locked = impl->try_lock_shared(); + } + void unlock() { + if (!impl || !locked) + throw system_error(make_error_code(errc::operation_not_permitted)); + impl->unlock_shared(); + locked = false; + } + private: + mutex_type impl; + bool locked{false}; +}; + +} // namespace std diff --git a/src/common/async/detail/shared_mutex.h b/src/common/async/detail/shared_mutex.h new file mode 100644 index 000000000..8e5436350 --- /dev/null +++ b/src/common/async/detail/shared_mutex.h @@ -0,0 +1,326 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include +#include +#include +#include // for std::shared_lock + +#include +#include +#include + +#include "include/ceph_assert.h" + +#include "common/async/completion.h" + +namespace ceph::async::detail { + +struct LockRequest : public boost::intrusive::list_base_hook<> { + virtual ~LockRequest() {} + virtual void complete(boost::system::error_code ec) = 0; + virtual void destroy() = 0; +}; + +class SharedMutexImpl : public boost::intrusive_ref_counter { + public: + ~SharedMutexImpl(); + + template + auto async_lock(Mutex& mtx, CompletionToken&& token); + void lock(); + void lock(boost::system::error_code& ec); + bool try_lock(); + void unlock(); + template + auto async_lock_shared(Mutex& mtx, CompletionToken&& token); + void lock_shared(); + void lock_shared(boost::system::error_code& ec); + bool try_lock_shared(); + void unlock_shared(); + void cancel(); + + private: + using RequestList = boost::intrusive::list; + + RequestList shared_queue; //< requests waiting on a shared lock + RequestList exclusive_queue; //< requests waiting on an exclusive lock + + /// lock state encodes the number of shared lockers, or 'max' for exclusive + using LockState = uint16_t; + static constexpr LockState Unlocked = 0; + static constexpr LockState Exclusive = std::numeric_limits::max(); + static constexpr LockState MaxShared = Exclusive - 1; + LockState state = Unlocked; //< current lock state + + std::mutex mutex; //< protects lock state and wait queues + + void complete(RequestList&& requests, boost::system::error_code ec); +}; + +// sync requests live on the stack and wait on a condition variable +class SyncRequest : public LockRequest { + std::condition_variable cond; + std::optional ec; + public: + boost::system::error_code wait(std::unique_lock& lock) { + // return the error code once its been set + cond.wait(lock, [this] { return ec; }); + return *ec; + } + void complete(boost::system::error_code ec) override { + this->ec = ec; + cond.notify_one(); + } + void destroy() override { + // nothing, SyncRequests live on the stack + } +}; + +// async requests use async::Completion to invoke a handler on its executor +template typename Lock> +class AsyncRequest : public LockRequest { + Mutex& mutex; //< mutex argument for lock guard + public: + explicit AsyncRequest(Mutex& mutex) : mutex(mutex) {} + + using Signature = void(boost::system::error_code, Lock); + using LockCompletion = Completion>; + + void complete(boost::system::error_code ec) override { + auto r = static_cast(this); + // pass ownership of ourselves to post(). on error, pass an empty lock + post(std::unique_ptr{r}, ec, + ec ? Lock{mutex, std::defer_lock} : Lock{mutex, std::adopt_lock}); + } + void destroy() override { + delete static_cast(this); + } +}; + +inline SharedMutexImpl::~SharedMutexImpl() +{ + ceph_assert(state == Unlocked); + ceph_assert(shared_queue.empty()); + ceph_assert(exclusive_queue.empty()); +} + +template +auto SharedMutexImpl::async_lock(Mutex& mtx, CompletionToken&& token) +{ + using Request = AsyncRequest; + using Signature = typename Request::Signature; + boost::asio::async_completion init(token); + auto& handler = init.completion_handler; + auto ex1 = mtx.get_executor(); + { + std::lock_guard lock{mutex}; + + boost::system::error_code ec; + if (state == Unlocked) { + state = Exclusive; + + // post a successful completion + auto ex2 = boost::asio::get_associated_executor(handler, ex1); + auto alloc2 = boost::asio::get_associated_allocator(handler); + auto b = bind_handler(std::move(handler), ec, + std::unique_lock{mtx, std::adopt_lock}); + ex2.post(forward_handler(std::move(b)), alloc2); + } else { + // create a request and add it to the exclusive list + using LockCompletion = typename Request::LockCompletion; + auto request = LockCompletion::create(ex1, std::move(handler), mtx); + exclusive_queue.push_back(*request.release()); + } + } + return init.result.get(); +} + +inline void SharedMutexImpl::lock() +{ + boost::system::error_code ec; + lock(ec); + if (ec) { + throw boost::system::system_error(ec); + } +} + +void SharedMutexImpl::lock(boost::system::error_code& ec) +{ + std::unique_lock lock{mutex}; + + if (state == Unlocked) { + state = Exclusive; + ec.clear(); + } else { + SyncRequest request; + exclusive_queue.push_back(request); + ec = request.wait(lock); + } +} + +inline bool SharedMutexImpl::try_lock() +{ + std::lock_guard lock{mutex}; + + if (state == Unlocked) { + state = Exclusive; + return true; + } + return false; +} + +void SharedMutexImpl::unlock() +{ + RequestList granted; + { + std::lock_guard lock{mutex}; + ceph_assert(state == Exclusive); + + if (!exclusive_queue.empty()) { + // grant next exclusive lock + auto& request = exclusive_queue.front(); + exclusive_queue.pop_front(); + granted.push_back(request); + } else { + // grant shared locks, if any + state = shared_queue.size(); + if (state > MaxShared) { + state = MaxShared; + auto end = std::next(shared_queue.begin(), MaxShared); + granted.splice(granted.end(), shared_queue, + shared_queue.begin(), end, MaxShared); + } else { + granted.splice(granted.end(), shared_queue); + } + } + } + complete(std::move(granted), boost::system::error_code{}); +} + +template +auto SharedMutexImpl::async_lock_shared(Mutex& mtx, CompletionToken&& token) +{ + using Request = AsyncRequest; + using Signature = typename Request::Signature; + boost::asio::async_completion init(token); + auto& handler = init.completion_handler; + auto ex1 = mtx.get_executor(); + { + std::lock_guard lock{mutex}; + + boost::system::error_code ec; + if (exclusive_queue.empty() && state < MaxShared) { + state++; + + auto ex2 = boost::asio::get_associated_executor(handler, ex1); + auto alloc2 = boost::asio::get_associated_allocator(handler); + auto b = bind_handler(std::move(handler), ec, + std::shared_lock{mtx, std::adopt_lock}); + ex2.post(forward_handler(std::move(b)), alloc2); + } else { + using LockCompletion = typename Request::LockCompletion; + auto request = LockCompletion::create(ex1, std::move(handler), mtx); + shared_queue.push_back(*request.release()); + } + } + return init.result.get(); +} + +inline void SharedMutexImpl::lock_shared() +{ + boost::system::error_code ec; + lock_shared(ec); + if (ec) { + throw boost::system::system_error(ec); + } +} + +void SharedMutexImpl::lock_shared(boost::system::error_code& ec) +{ + std::unique_lock lock{mutex}; + + if (exclusive_queue.empty() && state < MaxShared) { + state++; + ec.clear(); + } else { + SyncRequest request; + shared_queue.push_back(request); + ec = request.wait(lock); + } +} + +inline bool SharedMutexImpl::try_lock_shared() +{ + std::lock_guard lock{mutex}; + + if (exclusive_queue.empty() && state < MaxShared) { + state++; + return true; + } + return false; +} + +inline void SharedMutexImpl::unlock_shared() +{ + std::lock_guard lock{mutex}; + ceph_assert(state != Unlocked && state <= MaxShared); + + if (state == 1 && !exclusive_queue.empty()) { + // grant next exclusive lock + state = Exclusive; + auto& request = exclusive_queue.front(); + exclusive_queue.pop_front(); + request.complete(boost::system::error_code{}); + } else if (state == MaxShared && !shared_queue.empty() && + exclusive_queue.empty()) { + // grant next shared lock + auto& request = shared_queue.front(); + shared_queue.pop_front(); + request.complete(boost::system::error_code{}); + } else { + state--; + } +} + +inline void SharedMutexImpl::cancel() +{ + RequestList canceled; + { + std::lock_guard lock{mutex}; + canceled.splice(canceled.end(), shared_queue); + canceled.splice(canceled.end(), exclusive_queue); + } + complete(std::move(canceled), boost::asio::error::operation_aborted); +} + +void SharedMutexImpl::complete(RequestList&& requests, + boost::system::error_code ec) +{ + while (!requests.empty()) { + auto& request = requests.front(); + requests.pop_front(); + try { + request.complete(ec); + } catch (...) { + // clean up any remaining completions and rethrow + requests.clear_and_dispose([] (LockRequest *r) { r->destroy(); }); + throw; + } + } +} + +} // namespace ceph::async::detail diff --git a/src/common/async/forward_handler.h b/src/common/async/forward_handler.h new file mode 100644 index 000000000..ae88cc83f --- /dev/null +++ b/src/common/async/forward_handler.h @@ -0,0 +1,103 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_ASYNC_FORWARD_HANDLER_H +#define CEPH_ASYNC_FORWARD_HANDLER_H + +#include + +namespace ceph::async { + +/** + * A forwarding completion handler for use with boost::asio. + * + * A completion handler wrapper that invokes the handler's operator() as an + * rvalue, regardless of whether the wrapper is invoked as an lvalue or rvalue. + * This operation is potentially destructive to the wrapped handler, so is only + * suitable for single-use handlers. + * + * This is useful when combined with bind_handler() and move-only arguments, + * because executors will always call the lvalue overload of operator(). + * + * The original Handler's associated allocator and executor are maintained. + * + * @see forward_handler + */ +template +struct ForwardingHandler { + Handler handler; + + ForwardingHandler(Handler&& handler) + : handler(std::move(handler)) + {} + + template + void operator()(Args&& ...args) { + std::move(handler)(std::forward(args)...); + } + + using allocator_type = boost::asio::associated_allocator_t; + allocator_type get_allocator() const noexcept { + return boost::asio::get_associated_allocator(handler); + } +}; + +} // namespace ceph::async + +namespace boost::asio { + +// specialize boost::asio::associated_executor<> for ForwardingHandler +template +struct associated_executor, Executor> { + using type = boost::asio::associated_executor_t; + + static type get(const ceph::async::ForwardingHandler& handler, + const Executor& ex = Executor()) noexcept { + return boost::asio::get_associated_executor(handler.handler, ex); + } +}; + +} // namespace boost::asio + +namespace ceph::async { + +/** + * Returns a single-use completion handler that always forwards on operator(). + * + * Wraps a completion handler such that it is always invoked as an rvalue. This + * is necessary when combining executors and bind_handler() with move-only + * argument types. + * + * Example use: + * + * auto callback = [] (std::unique_ptr&& p) {}; + * auto bound_handler = bind_handler(callback, std::make_unique(5)); + * auro handler = forward_handler(std::move(bound_handler)); + * + * // execute the forwarding handler on an io_context: + * boost::asio::io_context context; + * boost::asio::post(context, std::move(handler)); + * context.run(); + * + * @see ForwardingHandler + */ +template +auto forward_handler(Handler&& h) +{ + return ForwardingHandler{std::forward(h)}; +} + +} // namespace ceph::async + +#endif // CEPH_ASYNC_FORWARD_HANDLER_H diff --git a/src/common/async/librados_completion.h b/src/common/async/librados_completion.h new file mode 100644 index 000000000..2fa5555e7 --- /dev/null +++ b/src/common/async/librados_completion.h @@ -0,0 +1,125 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2020 Red Hat + * Author: Adam C. Emerson + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_COMMON_ASYNC_LIBRADOS_COMPLETION_H +#define CEPH_COMMON_ASYNC_LIBRADOS_COMPLETION_H + +#include +#include +#include +#include +#include + +#include + +#include +#include + +#include "include/rados/librados.hpp" +#include "librados/AioCompletionImpl.h" + +// Allow librados::AioCompletion to be provided as a completion +// handler. This is only allowed with a signature of +// (boost::system::error_code) or (). On completion the AioCompletion +// is completed with the error_code converted to an int with +// ceph::from_error_code. +// +// async_result::return_type is void. + +namespace ceph::async { + +namespace bs = boost::system; +namespace lr = librados; + +namespace detail { + +struct librados_handler { + lr::AioCompletionImpl* pc; + + explicit librados_handler(lr::AioCompletion* c) : pc(c->pc) { + pc->get(); + } + ~librados_handler() { + if (pc) { + pc->put(); + pc = nullptr; + } + } + + librados_handler(const librados_handler&) = delete; + librados_handler& operator =(const librados_handler&) = delete; + librados_handler(librados_handler&& rhs) { + pc = rhs.pc; + rhs.pc = nullptr; + } + + void operator()(bs::error_code ec) { + pc->lock.lock(); + pc->rval = ceph::from_error_code(ec); + pc->complete = true; + pc->lock.unlock(); + + auto cb_complete = pc->callback_complete; + auto cb_complete_arg = pc->callback_complete_arg; + if (cb_complete) + cb_complete(pc, cb_complete_arg); + + auto cb_safe = pc->callback_safe; + auto cb_safe_arg = pc->callback_safe_arg; + if (cb_safe) + cb_safe(pc, cb_safe_arg); + + pc->lock.lock(); + pc->callback_complete = NULL; + pc->callback_safe = NULL; + pc->cond.notify_all(); + pc->put_unlock(); + pc = nullptr; + } + + void operator ()() { + (*this)(bs::error_code{}); + } +}; +} // namespace detail +} // namespace ceph::async + + +namespace boost::asio { +template +class async_result { +public: + using completion_handler_type = ceph::async::detail::librados_handler; + explicit async_result(completion_handler_type&) {}; + using return_type = void; + void get() { + return; + } +}; + +template +class async_result { +public: + using completion_handler_type = ceph::async::detail::librados_handler; + explicit async_result(completion_handler_type&) {}; + using return_type = void; + void get() { + return; + } +}; +} + +#endif // !CEPH_COMMON_ASYNC_LIBRADOS_COMPLETION_H diff --git a/src/common/async/shared_mutex.h b/src/common/async/shared_mutex.h new file mode 100644 index 000000000..3e471a4df --- /dev/null +++ b/src/common/async/shared_mutex.h @@ -0,0 +1,212 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include "common/async/detail/shared_mutex.h" + +namespace ceph::async { + +/** + * An asynchronous shared mutex for use with boost::asio. + * + * A shared mutex class with asynchronous lock operations that complete on a + * boost::asio executor. The class also has synchronous interfaces that meet + * most of the standard library's requirements for the SharedMutex concept, + * which makes it compatible with lock_guard, unique_lock, and shared_lock. + * + * All lock requests can fail with operation_aborted on cancel() or destruction. + * The non-error_code overloads of lock() and lock_shared() will throw this + * error as an exception of type boost::system::system_error. + * + * Exclusive locks are prioritized over shared locks. Locks of the same type + * are granted in fifo order. The implementation defines a limit on the number + * of shared locks to 65534 at a time. + * + * Example use: + * + * boost::asio::io_context context; + * SharedMutex mutex{context.get_executor()}; + * + * mutex.async_lock([&] (boost::system::error_code ec, auto lock) { + * if (!ec) { + * // mutate shared state ... + * } + * }); + * mutex.async_lock_shared([&] (boost::system::error_code ec, auto lock) { + * if (!ec) { + * // read shared state ... + * } + * }); + * + * context.run(); + */ +template +class SharedMutex { + public: + explicit SharedMutex(const Executor& ex); + + /// on destruction, all pending lock requests are canceled + ~SharedMutex(); + + using executor_type = Executor; + executor_type get_executor() const noexcept { return ex; } + + /// initiate an asynchronous request for an exclusive lock. when the lock is + /// granted, the completion handler is invoked with a successful error code + /// and a std::unique_lock that owns this mutex. + /// Signature = void(boost::system::error_code, std::unique_lock) + template + auto async_lock(CompletionToken&& token); + + /// wait synchronously for an exclusive lock. if an error occurs before the + /// lock is granted, that error is thrown as an exception + void lock(); + + /// wait synchronously for an exclusive lock. if an error occurs before the + /// lock is granted, that error is assigned to 'ec' + void lock(boost::system::error_code& ec); + + /// try to acquire an exclusive lock. if the lock is not immediately + /// available, returns false + bool try_lock(); + + /// releases an exclusive lock. not required to be called from the same thread + /// that initiated the lock + void unlock(); + + /// initiate an asynchronous request for a shared lock. when the lock is + /// granted, the completion handler is invoked with a successful error code + /// and a std::shared_lock that owns this mutex. + /// Signature = void(boost::system::error_code, std::shared_lock) + template + auto async_lock_shared(CompletionToken&& token); + + /// wait synchronously for a shared lock. if an error occurs before the + /// lock is granted, that error is thrown as an exception + void lock_shared(); + + /// wait synchronously for a shared lock. if an error occurs before the lock + /// is granted, that error is assigned to 'ec' + void lock_shared(boost::system::error_code& ec); + + /// try to acquire a shared lock. if the lock is not immediately available, + /// returns false + bool try_lock_shared(); + + /// releases a shared lock. not required to be called from the same thread + /// that initiated the lock + void unlock_shared(); + + /// cancel any pending requests for exclusive or shared locks with an + /// operation_aborted error + void cancel(); + + private: + Executor ex; //< default callback executor + boost::intrusive_ptr impl; + + // allow lock guards to access impl + friend class std::unique_lock; + friend class std::shared_lock; +}; + + +template +SharedMutex::SharedMutex(const Executor& ex) + : ex(ex), impl(new detail::SharedMutexImpl) +{ +} + +template +SharedMutex::~SharedMutex() +{ + try { + impl->cancel(); + } catch (const std::exception&) { + // swallow any exceptions, the destructor can't throw + } +} + +template +template +auto SharedMutex::async_lock(CompletionToken&& token) +{ + return impl->async_lock(*this, std::forward(token)); +} + +template +void SharedMutex::lock() +{ + impl->lock(); +} + +template +void SharedMutex::lock(boost::system::error_code& ec) +{ + impl->lock(ec); +} + +template +bool SharedMutex::try_lock() +{ + return impl->try_lock(); +} + +template +void SharedMutex::unlock() +{ + impl->unlock(); +} + +template +template +auto SharedMutex::async_lock_shared(CompletionToken&& token) +{ + return impl->async_lock_shared(*this, std::forward(token)); +} + +template +void SharedMutex::lock_shared() +{ + impl->lock_shared(); +} + +template +void SharedMutex::lock_shared(boost::system::error_code& ec) +{ + impl->lock_shared(ec); +} + +template +bool SharedMutex::try_lock_shared() +{ + return impl->try_lock_shared(); +} + +template +void SharedMutex::unlock_shared() +{ + impl->unlock_shared(); +} + +template +void SharedMutex::cancel() +{ + impl->cancel(); +} + +} // namespace ceph::async + +#include "common/async/detail/shared_lock.h" diff --git a/src/common/async/waiter.h b/src/common/async/waiter.h new file mode 100644 index 000000000..219a27cf7 --- /dev/null +++ b/src/common/async/waiter.h @@ -0,0 +1,223 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_COMMON_WAITER_H +#define CEPH_COMMON_WAITER_H + +#include +#include + +#include + +#include "include/ceph_assert.h" +#include "include/function2.hpp" + +#include "common/ceph_mutex.h" + +namespace ceph::async { +namespace detail { +// For safety reasons (avoiding undefined behavior around sequence +// points) std::reference_wrapper disallows move construction. This +// harms us in cases where we want to pass a reference in to something +// that unavoidably moves. +// +// It should not be used generally. +template +class rvalue_reference_wrapper { +public: + // types + using type = T; + + rvalue_reference_wrapper(T& r) noexcept + : p(std::addressof(r)) {} + + // We write our semantics to match those of reference collapsing. If + // we're treated as an lvalue, collapse to one. + + rvalue_reference_wrapper(const rvalue_reference_wrapper&) noexcept = default; + rvalue_reference_wrapper(rvalue_reference_wrapper&&) noexcept = default; + + // assignment + rvalue_reference_wrapper& operator=( + const rvalue_reference_wrapper& x) noexcept = default; + rvalue_reference_wrapper& operator=( + rvalue_reference_wrapper&& x) noexcept = default; + + operator T& () const noexcept { + return *p; + } + T& get() const noexcept { + return *p; + } + + operator T&& () noexcept { + return std::move(*p); + } + T&& get() noexcept { + return std::move(*p); + } + + template + std::result_of_t operator ()(Args&&... args ) const { + return (*p)(std::forward(args)...); + } + + template + std::result_of_t operator ()(Args&&... args ) { + return std::move(*p)(std::forward(args)...); + } + +private: + T* p; +}; + +class base { +protected: + ceph::mutex lock = ceph::make_mutex("ceph::async::detail::base::lock"); + ceph::condition_variable cond; + bool has_value = false; + + ~base() = default; + + auto wait_base() { + std::unique_lock l(lock); + cond.wait(l, [this](){ return has_value; }); + return l; + } + + auto exec_base() { + std::unique_lock l(lock); + // There's no really good way to handle being called twice + // without being reset. + ceph_assert(!has_value); + has_value = true; + cond.notify_one(); + return l; + } +}; +} + +// waiter is a replacement for C_SafeCond and friends. It is the +// moral equivalent of a future but plays well with a world of +// callbacks. +template +class waiter; + +template<> +class waiter<> final : public detail::base { +public: + void wait() { + wait_base(); + has_value = false; + } + + void operator()() { + exec_base(); + } + + auto ref() { + return detail::rvalue_reference_wrapper(*this); + } + + + operator fu2::unique_function() { + return fu2::unique_function(ref()); + } +}; + +template +class waiter final : public detail::base { + std::aligned_storage_t ret; + +public: + Ret wait() { + auto l = wait_base(); + auto r = reinterpret_cast(&ret); + auto t = std::move(*r); + r->~Ret(); + has_value = false; + return t; + } + + void operator()(Ret&& _ret) { + auto l = exec_base(); + auto r = reinterpret_cast(&ret); + *r = std::move(_ret); + } + + void operator()(const Ret& _ret) { + auto l = exec_base(); + auto r = reinterpret_cast(&ret); + *r = std::move(_ret); + } + + auto ref() { + return detail::rvalue_reference_wrapper(*this); + } + + operator fu2::unique_function() { + return fu2::unique_function(ref()); + } + + ~waiter() { + if (has_value) + reinterpret_cast(&ret)->~Ret(); + } +}; + +template +class waiter final : public detail::base { + std::tuple ret; + +public: + std::tuple wait() { + using std::tuple; + auto l = wait_base(); + return std::move(ret); + auto r = reinterpret_cast*>(&ret); + auto t = std::move(*r); + r->~tuple(); + has_value = false; + return t; + } + + void operator()(Ret&&... _ret) { + auto l = exec_base(); + auto r = reinterpret_cast*>(&ret); + *r = std::forward_as_tuple(_ret...); + } + + void operator()(const Ret&... _ret) { + auto l = exec_base(); + auto r = reinterpret_cast*>(&ret); + *r = std::forward_as_tuple(_ret...); + } + + auto ref() { + return detail::rvalue_reference_wrapper(*this); + } + + operator fu2::unique_function() { + return fu2::unique_function(ref()); + } + + ~waiter() { + using std::tuple; + if (has_value) + reinterpret_cast*>(&ret)->~tuple(); + } +}; +} + +#endif // CEPH_COMMON_WAITER_H diff --git a/src/common/async/yield_context.h b/src/common/async/yield_context.h new file mode 100644 index 000000000..05e6ca614 --- /dev/null +++ b/src/common/async/yield_context.h @@ -0,0 +1,59 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include +#include +#include + +#include "acconfig.h" + +#include + +// use explicit executor types instead of the type-erased boost::asio::executor. +// coroutines wrap the default io_context executor with a strand executor +using yield_context = spawn::basic_yield_context< + boost::asio::executor_binder>>; + +/// optional-like wrapper for a spawn::yield_context and its associated +/// boost::asio::io_context. operations that take an optional_yield argument +/// will, when passed a non-empty yield context, suspend this coroutine instead +/// of the blocking the thread of execution +class optional_yield { + boost::asio::io_context *c = nullptr; + yield_context *y = nullptr; + public: + /// construct with a valid io and yield_context + explicit optional_yield(boost::asio::io_context& c, + yield_context& y) noexcept + : c(&c), y(&y) {} + + /// type tag to construct an empty object + struct empty_t {}; + optional_yield(empty_t) noexcept {} + + /// implicit conversion to bool, returns true if non-empty + operator bool() const noexcept { return y; } + + /// return a reference to the associated io_context. only valid if non-empty + boost::asio::io_context& get_io_context() const noexcept { return *c; } + + /// return a reference to the yield_context. only valid if non-empty + yield_context& get_yield_context() const noexcept { return *y; } +}; + +// type tag object to construct an empty optional_yield +static constexpr optional_yield::empty_t null_yield{}; -- cgit v1.2.3