From 389020e14594e4894e28d1eb9103c210b142509e Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Thu, 23 May 2024 18:45:13 +0200 Subject: Adding upstream version 18.2.3. Signed-off-by: Daniel Baumann --- src/rgw/driver/rados/rgw_pubsub_push.cc | 169 +++++++++++--------------------- 1 file changed, 55 insertions(+), 114 deletions(-) (limited to 'src/rgw/driver/rados/rgw_pubsub_push.cc') diff --git a/src/rgw/driver/rados/rgw_pubsub_push.cc b/src/rgw/driver/rados/rgw_pubsub_push.cc index bdb24ce9a..05dc9e65d 100644 --- a/src/rgw/driver/rados/rgw_pubsub_push.cc +++ b/src/rgw/driver/rados/rgw_pubsub_push.cc @@ -115,6 +115,55 @@ public: } }; +namespace { +// this allows waiting untill "finish()" is called from a different thread +// waiting could be blocking the waiting thread or yielding, depending +// with compilation flag support and whether the optional_yield is set +class Waiter { + using Signature = void(boost::system::error_code); + using Completion = ceph::async::Completion; + using CompletionInit = boost::asio::async_completion; + std::unique_ptr completion = nullptr; + int ret; + + bool done = false; + mutable std::mutex lock; + mutable std::condition_variable cond; + +public: + int wait(optional_yield y) { + std::unique_lock l{lock}; + if (done) { + return ret; + } + if (y) { + boost::system::error_code ec; + auto&& token = y.get_yield_context()[ec]; + CompletionInit init(token); + completion = Completion::create(y.get_io_context().get_executor(), + std::move(init.completion_handler)); + l.unlock(); + init.result.get(); + return -ec.value(); + } + cond.wait(l, [this]{return (done==true);}); + return ret; + } + + void finish(int r) { + std::unique_lock l{lock}; + ret = r; + done = true; + if (completion) { + boost::system::error_code ec(-ret, boost::system::system_category()); + Completion::post(std::move(completion), ec); + } else { + cond.notify_all(); + } + } +}; +} // namespace + #ifdef WITH_RADOSGW_AMQP_ENDPOINT class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint { private: @@ -187,71 +236,17 @@ public: } } - // this allows waiting untill "finish()" is called from a different thread - // waiting could be blocking the waiting thread or yielding, depending - // with compilation flag support and whether the optional_yield is set - class Waiter { - using Signature = void(boost::system::error_code); - using Completion = ceph::async::Completion; - std::unique_ptr completion = nullptr; - int ret; - - mutable std::atomic done = false; - mutable std::mutex lock; - mutable std::condition_variable cond; - - template - auto async_wait(ExecutionContext& ctx, CompletionToken&& token) { - boost::asio::async_completion init(token); - auto& handler = init.completion_handler; - { - std::unique_lock l{lock}; - completion = Completion::create(ctx.get_executor(), std::move(handler)); - } - return init.result.get(); - } - - public: - int wait(optional_yield y) { - if (done) { - return ret; - } - if (y) { - auto& io_ctx = y.get_io_context(); - auto& yield_ctx = y.get_yield_context(); - boost::system::error_code ec; - async_wait(io_ctx, yield_ctx[ec]); - return -ec.value(); - } - std::unique_lock l(lock); - cond.wait(l, [this]{return (done==true);}); - return ret; - } - - void finish(int r) { - std::unique_lock l{lock}; - ret = r; - done = true; - if (completion) { - boost::system::error_code ec(-ret, boost::system::system_category()); - Completion::post(std::move(completion), ec); - } else { - cond.notify_all(); - } - } - }; - int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override { if (ack_level == ack_level_t::None) { return amqp::publish(conn_id, topic, json_format_pubsub_event(event)); } else { // TODO: currently broker and routable are the same - this will require different flags but the same mechanism - // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine - auto w = std::unique_ptr(new Waiter); + auto w = std::make_unique(); const auto rc = amqp::publish_with_confirm(conn_id, topic, json_format_pubsub_event(event), - std::bind(&Waiter::finish, w.get(), std::placeholders::_1)); + [wp = w.get()](int r) { wp->finish(r);} + ); if (rc < 0) { // failed to publish, does not wait for reply return rc; @@ -314,70 +309,16 @@ public: } } - // this allows waiting untill "finish()" is called from a different thread - // waiting could be blocking the waiting thread or yielding, depending - // with compilation flag support and whether the optional_yield is set - class Waiter { - using Signature = void(boost::system::error_code); - using Completion = ceph::async::Completion; - std::unique_ptr completion = nullptr; - int ret; - - mutable std::atomic done = false; - mutable std::mutex lock; - mutable std::condition_variable cond; - - template - auto async_wait(ExecutionContext& ctx, CompletionToken&& token) { - boost::asio::async_completion init(token); - auto& handler = init.completion_handler; - { - std::unique_lock l{lock}; - completion = Completion::create(ctx.get_executor(), std::move(handler)); - } - return init.result.get(); - } - - public: - int wait(optional_yield y) { - if (done) { - return ret; - } - if (y) { - auto& io_ctx = y.get_io_context(); - auto& yield_ctx = y.get_yield_context(); - boost::system::error_code ec; - async_wait(io_ctx, yield_ctx[ec]); - return -ec.value(); - } - std::unique_lock l(lock); - cond.wait(l, [this]{return (done==true);}); - return ret; - } - - void finish(int r) { - std::unique_lock l{lock}; - ret = r; - done = true; - if (completion) { - boost::system::error_code ec(-ret, boost::system::system_category()); - Completion::post(std::move(completion), ec); - } else { - cond.notify_all(); - } - } - }; - int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override { if (ack_level == ack_level_t::None) { return kafka::publish(conn_name, topic, json_format_pubsub_event(event)); } else { - // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine - auto w = std::unique_ptr(new Waiter); + auto w = std::make_unique(); const auto rc = kafka::publish_with_confirm(conn_name, topic, json_format_pubsub_event(event), - std::bind(&Waiter::finish, w.get(), std::placeholders::_1)); + [wp = w.get()](int r) { wp->finish(r); } + ); if (rc < 0) { // failed to publish, does not wait for reply return rc; -- cgit v1.2.3