summaryrefslogtreecommitdiffstats
path: root/src/common/async/blocked_completion.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/common/async/blocked_completion.h')
-rw-r--r--src/common/async/blocked_completion.h290
1 files changed, 290 insertions, 0 deletions
diff --git a/src/common/async/blocked_completion.h b/src/common/async/blocked_completion.h
new file mode 100644
index 000000000..23a1319bc
--- /dev/null
+++ b/src/common/async/blocked_completion.h
@@ -0,0 +1,290 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2020 Red Hat
+ * Author: Adam C. Emerson <aemerson@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_COMMON_ASYNC_BLOCKED_COMPLETION_H
+#define CEPH_COMMON_ASYNC_BLOCKED_COMPLETION_H
+
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
+#include <optional>
+#include <type_traits>
+
+#include <boost/asio/async_result.hpp>
+
+#include <boost/system/error_code.hpp>
+#include <boost/system/system_error.hpp>
+
+namespace ceph::async {
+
+namespace bs = boost::system;
+
+class use_blocked_t {
+ use_blocked_t(bs::error_code* ec) : ec(ec) {}
+public:
+ use_blocked_t() = default;
+
+ use_blocked_t operator [](bs::error_code& _ec) const {
+ return use_blocked_t(&_ec);
+ }
+
+ bs::error_code* ec = nullptr;
+};
+
+inline constexpr use_blocked_t use_blocked;
+
+namespace detail {
+
+template<typename... Ts>
+struct blocked_handler
+{
+ blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {}
+
+ void operator ()(Ts... values) noexcept {
+ std::scoped_lock l(*m);
+ *ec = bs::error_code{};
+ *value = std::forward_as_tuple(std::move(values)...);
+ *done = true;
+ cv->notify_one();
+ }
+
+ void operator ()(bs::error_code ec, Ts... values) noexcept {
+ std::scoped_lock l(*m);
+ *this->ec = ec;
+ *value = std::forward_as_tuple(std::move(values)...);
+ *done = true;
+ cv->notify_one();
+ }
+
+ bs::error_code* ec;
+ std::optional<std::tuple<Ts...>>* value = nullptr;
+ std::mutex* m = nullptr;
+ std::condition_variable* cv = nullptr;
+ bool* done = nullptr;
+};
+
+template<typename T>
+struct blocked_handler<T>
+{
+ blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {}
+
+ void operator ()(T value) noexcept {
+ std::scoped_lock l(*m);
+ *ec = bs::error_code();
+ *this->value = std::move(value);
+ *done = true;
+ cv->notify_one();
+ }
+
+ void operator ()(bs::error_code ec, T value) noexcept {
+ std::scoped_lock l(*m);
+ *this->ec = ec;
+ *this->value = std::move(value);
+ *done = true;
+ cv->notify_one();
+ }
+
+ //private:
+ bs::error_code* ec;
+ std::optional<T>* value;
+ std::mutex* m = nullptr;
+ std::condition_variable* cv = nullptr;
+ bool* done = nullptr;
+};
+
+template<>
+struct blocked_handler<void>
+{
+ blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {}
+
+ void operator ()() noexcept {
+ std::scoped_lock l(*m);
+ *ec = bs::error_code{};
+ *done = true;
+ cv->notify_one();
+ }
+
+ void operator ()(bs::error_code ec) noexcept {
+ std::scoped_lock l(*m);
+ *this->ec = ec;
+ *done = true;
+ cv->notify_one();
+ }
+
+ bs::error_code* ec;
+ std::mutex* m = nullptr;
+ std::condition_variable* cv = nullptr;
+ bool* done = nullptr;
+};
+
+template<typename... Ts>
+class blocked_result
+{
+public:
+ using completion_handler_type = blocked_handler<Ts...>;
+ using return_type = std::tuple<Ts...>;
+
+ explicit blocked_result(completion_handler_type& h) noexcept {
+ std::scoped_lock l(m);
+ out_ec = h.ec;
+ if (!out_ec) h.ec = &ec;
+ h.value = &value;
+ h.m = &m;
+ h.cv = &cv;
+ h.done = &done;
+ }
+
+ return_type get() {
+ std::unique_lock l(m);
+ cv.wait(l, [this]() { return done; });
+ if (!out_ec && ec) throw bs::system_error(ec);
+ return std::move(*value);
+ }
+
+ blocked_result(const blocked_result&) = delete;
+ blocked_result& operator =(const blocked_result&) = delete;
+ blocked_result(blocked_result&&) = delete;
+ blocked_result& operator =(blocked_result&&) = delete;
+
+private:
+ bs::error_code* out_ec;
+ bs::error_code ec;
+ std::optional<return_type> value;
+ std::mutex m;
+ std::condition_variable cv;
+ bool done = false;
+};
+
+template<typename T>
+class blocked_result<T>
+{
+public:
+ using completion_handler_type = blocked_handler<T>;
+ using return_type = T;
+
+ explicit blocked_result(completion_handler_type& h) noexcept {
+ std::scoped_lock l(m);
+ out_ec = h.ec;
+ if (!out_ec) h.ec = &ec;
+ h.value = &value;
+ h.m = &m;
+ h.cv = &cv;
+ h.done = &done;
+ }
+
+ return_type get() {
+ std::unique_lock l(m);
+ cv.wait(l, [this]() { return done; });
+ if (!out_ec && ec) throw bs::system_error(ec);
+ return std::move(*value);
+ }
+
+ blocked_result(const blocked_result&) = delete;
+ blocked_result& operator =(const blocked_result&) = delete;
+ blocked_result(blocked_result&&) = delete;
+ blocked_result& operator =(blocked_result&&) = delete;
+
+private:
+ bs::error_code* out_ec;
+ bs::error_code ec;
+ std::optional<return_type> value;
+ std::mutex m;
+ std::condition_variable cv;
+ bool done = false;
+};
+
+template<>
+class blocked_result<void>
+{
+public:
+ using completion_handler_type = blocked_handler<void>;
+ using return_type = void;
+
+ explicit blocked_result(completion_handler_type& h) noexcept {
+ std::scoped_lock l(m);
+ out_ec = h.ec;
+ if (!out_ec) h.ec = &ec;
+ h.m = &m;
+ h.cv = &cv;
+ h.done = &done;
+ }
+
+ void get() {
+ std::unique_lock l(m);
+ cv.wait(l, [this]() { return done; });
+ if (!out_ec && ec) throw bs::system_error(ec);
+ }
+
+ blocked_result(const blocked_result&) = delete;
+ blocked_result& operator =(const blocked_result&) = delete;
+ blocked_result(blocked_result&&) = delete;
+ blocked_result& operator =(blocked_result&&) = delete;
+
+private:
+ bs::error_code* out_ec;
+ bs::error_code ec;
+ std::mutex m;
+ std::condition_variable cv;
+ bool done = false;
+};
+} // namespace detail
+} // namespace ceph::async
+
+
+namespace boost::asio {
+template<typename ReturnType>
+class async_result<ceph::async::use_blocked_t, ReturnType()>
+ : public ceph::async::detail::blocked_result<void>
+{
+public:
+ explicit async_result(typename ceph::async::detail::blocked_result<void>
+ ::completion_handler_type& h)
+ : ceph::async::detail::blocked_result<void>(h) {}
+};
+
+template<typename ReturnType, typename... Args>
+class async_result<ceph::async::use_blocked_t, ReturnType(Args...)>
+ : public ceph::async::detail::blocked_result<std::decay_t<Args>...>
+{
+public:
+ explicit async_result(
+ typename ceph::async::detail::blocked_result<std::decay_t<Args>...>::completion_handler_type& h)
+ : ceph::async::detail::blocked_result<std::decay_t<Args>...>(h) {}
+};
+
+template<typename ReturnType>
+class async_result<ceph::async::use_blocked_t,
+ ReturnType(boost::system::error_code)>
+ : public ceph::async::detail::blocked_result<void>
+{
+public:
+ explicit async_result(
+ typename ceph::async::detail::blocked_result<void>::completion_handler_type& h)
+ : ceph::async::detail::blocked_result<void>(h) {}
+};
+
+template<typename ReturnType, typename... Args>
+class async_result<ceph::async::use_blocked_t,
+ ReturnType(boost::system::error_code, Args...)>
+ : public ceph::async::detail::blocked_result<std::decay_t<Args>...>
+{
+public:
+ explicit async_result(
+ typename ceph::async::detail::blocked_result<std::decay_t<Args>...>::completion_handler_type& h)
+ : ceph::async::detail::blocked_result<std::decay_t<Args>...>(h) {}
+};
+}
+
+#endif // !CEPH_COMMON_ASYNC_BLOCKED_COMPLETION_H