// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2020 Red Hat * Author: Adam C. Emerson * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. * */ #ifndef CEPH_COMMON_ASYNC_BLOCKED_COMPLETION_H #define CEPH_COMMON_ASYNC_BLOCKED_COMPLETION_H #include #include #include #include #include #include #include #include namespace ceph::async { namespace bs = boost::system; class use_blocked_t { use_blocked_t(bs::error_code* ec) : ec(ec) {} public: use_blocked_t() = default; use_blocked_t operator [](bs::error_code& _ec) const { return use_blocked_t(&_ec); } bs::error_code* ec = nullptr; }; inline constexpr use_blocked_t use_blocked; namespace detail { template struct blocked_handler { blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {} void operator ()(Ts... values) noexcept { std::scoped_lock l(*m); *ec = bs::error_code{}; *value = std::forward_as_tuple(std::move(values)...); *done = true; cv->notify_one(); } void operator ()(bs::error_code ec, Ts... values) noexcept { std::scoped_lock l(*m); *this->ec = ec; *value = std::forward_as_tuple(std::move(values)...); *done = true; cv->notify_one(); } bs::error_code* ec; std::optional>* value = nullptr; std::mutex* m = nullptr; std::condition_variable* cv = nullptr; bool* done = nullptr; }; template struct blocked_handler { blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {} void operator ()(T value) noexcept { std::scoped_lock l(*m); *ec = bs::error_code(); *this->value = std::move(value); *done = true; cv->notify_one(); } void operator ()(bs::error_code ec, T value) noexcept { std::scoped_lock l(*m); *this->ec = ec; *this->value = std::move(value); *done = true; cv->notify_one(); } //private: bs::error_code* ec; std::optional* value; std::mutex* m = nullptr; std::condition_variable* cv = nullptr; bool* done = nullptr; }; template<> struct blocked_handler { blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {} void operator ()() noexcept { std::scoped_lock l(*m); *ec = bs::error_code{}; *done = true; cv->notify_one(); } void operator ()(bs::error_code ec) noexcept { std::scoped_lock l(*m); *this->ec = ec; *done = true; cv->notify_one(); } bs::error_code* ec; std::mutex* m = nullptr; std::condition_variable* cv = nullptr; bool* done = nullptr; }; template class blocked_result { public: using completion_handler_type = blocked_handler; using return_type = std::tuple; explicit blocked_result(completion_handler_type& h) noexcept { std::scoped_lock l(m); out_ec = h.ec; if (!out_ec) h.ec = &ec; h.value = &value; h.m = &m; h.cv = &cv; h.done = &done; } return_type get() { std::unique_lock l(m); cv.wait(l, [this]() { return done; }); if (!out_ec && ec) throw bs::system_error(ec); return std::move(*value); } blocked_result(const blocked_result&) = delete; blocked_result& operator =(const blocked_result&) = delete; blocked_result(blocked_result&&) = delete; blocked_result& operator =(blocked_result&&) = delete; private: bs::error_code* out_ec; bs::error_code ec; std::optional value; std::mutex m; std::condition_variable cv; bool done = false; }; template class blocked_result { public: using completion_handler_type = blocked_handler; using return_type = T; explicit blocked_result(completion_handler_type& h) noexcept { std::scoped_lock l(m); out_ec = h.ec; if (!out_ec) h.ec = &ec; h.value = &value; h.m = &m; h.cv = &cv; h.done = &done; } return_type get() { std::unique_lock l(m); cv.wait(l, [this]() { return done; }); if (!out_ec && ec) throw bs::system_error(ec); return std::move(*value); } blocked_result(const blocked_result&) = delete; blocked_result& operator =(const blocked_result&) = delete; blocked_result(blocked_result&&) = delete; blocked_result& operator =(blocked_result&&) = delete; private: bs::error_code* out_ec; bs::error_code ec; std::optional value; std::mutex m; std::condition_variable cv; bool done = false; }; template<> class blocked_result { public: using completion_handler_type = blocked_handler; using return_type = void; explicit blocked_result(completion_handler_type& h) noexcept { std::scoped_lock l(m); out_ec = h.ec; if (!out_ec) h.ec = &ec; h.m = &m; h.cv = &cv; h.done = &done; } void get() { std::unique_lock l(m); cv.wait(l, [this]() { return done; }); if (!out_ec && ec) throw bs::system_error(ec); } blocked_result(const blocked_result&) = delete; blocked_result& operator =(const blocked_result&) = delete; blocked_result(blocked_result&&) = delete; blocked_result& operator =(blocked_result&&) = delete; private: bs::error_code* out_ec; bs::error_code ec; std::mutex m; std::condition_variable cv; bool done = false; }; } // namespace detail } // namespace ceph::async namespace boost::asio { template class async_result : public ceph::async::detail::blocked_result { public: explicit async_result(typename ceph::async::detail::blocked_result ::completion_handler_type& h) : ceph::async::detail::blocked_result(h) {} }; template class async_result : public ceph::async::detail::blocked_result...> { public: explicit async_result( typename ceph::async::detail::blocked_result...>::completion_handler_type& h) : ceph::async::detail::blocked_result...>(h) {} }; template class async_result : public ceph::async::detail::blocked_result { public: explicit async_result( typename ceph::async::detail::blocked_result::completion_handler_type& h) : ceph::async::detail::blocked_result(h) {} }; template class async_result : public ceph::async::detail::blocked_result...> { public: explicit async_result( typename ceph::async::detail::blocked_result...>::completion_handler_type& h) : ceph::async::detail::blocked_result...>(h) {} }; } #endif // !CEPH_COMMON_ASYNC_BLOCKED_COMPLETION_H