summaryrefslogtreecommitdiffstats
path: root/src/rgw/driver/rados/rgw_pubsub_push.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rgw/driver/rados/rgw_pubsub_push.cc')
-rw-r--r--src/rgw/driver/rados/rgw_pubsub_push.cc169
1 files changed, 55 insertions, 114 deletions
diff --git a/src/rgw/driver/rados/rgw_pubsub_push.cc b/src/rgw/driver/rados/rgw_pubsub_push.cc
index bdb24ce9a..05dc9e65d 100644
--- a/src/rgw/driver/rados/rgw_pubsub_push.cc
+++ b/src/rgw/driver/rados/rgw_pubsub_push.cc
@@ -115,6 +115,55 @@ public:
}
};
+namespace {
+// this allows waiting untill "finish()" is called from a different thread
+// waiting could be blocking the waiting thread or yielding, depending
+// with compilation flag support and whether the optional_yield is set
+class Waiter {
+ using Signature = void(boost::system::error_code);
+ using Completion = ceph::async::Completion<Signature>;
+ using CompletionInit = boost::asio::async_completion<yield_context, Signature>;
+ std::unique_ptr<Completion> completion = nullptr;
+ int ret;
+
+ bool done = false;
+ mutable std::mutex lock;
+ mutable std::condition_variable cond;
+
+public:
+ int wait(optional_yield y) {
+ std::unique_lock l{lock};
+ if (done) {
+ return ret;
+ }
+ if (y) {
+ boost::system::error_code ec;
+ auto&& token = y.get_yield_context()[ec];
+ CompletionInit init(token);
+ completion = Completion::create(y.get_io_context().get_executor(),
+ std::move(init.completion_handler));
+ l.unlock();
+ init.result.get();
+ return -ec.value();
+ }
+ cond.wait(l, [this]{return (done==true);});
+ return ret;
+ }
+
+ void finish(int r) {
+ std::unique_lock l{lock};
+ ret = r;
+ done = true;
+ if (completion) {
+ boost::system::error_code ec(-ret, boost::system::system_category());
+ Completion::post(std::move(completion), ec);
+ } else {
+ cond.notify_all();
+ }
+ }
+};
+} // namespace
+
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
private:
@@ -187,71 +236,17 @@ public:
}
}
- // this allows waiting untill "finish()" is called from a different thread
- // waiting could be blocking the waiting thread or yielding, depending
- // with compilation flag support and whether the optional_yield is set
- class Waiter {
- using Signature = void(boost::system::error_code);
- using Completion = ceph::async::Completion<Signature>;
- std::unique_ptr<Completion> completion = nullptr;
- int ret;
-
- mutable std::atomic<bool> done = false;
- mutable std::mutex lock;
- mutable std::condition_variable cond;
-
- template <typename ExecutionContext, typename CompletionToken>
- auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
- boost::asio::async_completion<CompletionToken, Signature> init(token);
- auto& handler = init.completion_handler;
- {
- std::unique_lock l{lock};
- completion = Completion::create(ctx.get_executor(), std::move(handler));
- }
- return init.result.get();
- }
-
- public:
- int wait(optional_yield y) {
- if (done) {
- return ret;
- }
- if (y) {
- auto& io_ctx = y.get_io_context();
- auto& yield_ctx = y.get_yield_context();
- boost::system::error_code ec;
- async_wait(io_ctx, yield_ctx[ec]);
- return -ec.value();
- }
- std::unique_lock l(lock);
- cond.wait(l, [this]{return (done==true);});
- return ret;
- }
-
- void finish(int r) {
- std::unique_lock l{lock};
- ret = r;
- done = true;
- if (completion) {
- boost::system::error_code ec(-ret, boost::system::system_category());
- Completion::post(std::move(completion), ec);
- } else {
- cond.notify_all();
- }
- }
- };
-
int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
if (ack_level == ack_level_t::None) {
return amqp::publish(conn_id, topic, json_format_pubsub_event(event));
} else {
// TODO: currently broker and routable are the same - this will require different flags but the same mechanism
- // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
- auto w = std::unique_ptr<Waiter>(new Waiter);
+ auto w = std::make_unique<Waiter>();
const auto rc = amqp::publish_with_confirm(conn_id,
topic,
json_format_pubsub_event(event),
- std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
+ [wp = w.get()](int r) { wp->finish(r);}
+ );
if (rc < 0) {
// failed to publish, does not wait for reply
return rc;
@@ -314,70 +309,16 @@ public:
}
}
- // this allows waiting untill "finish()" is called from a different thread
- // waiting could be blocking the waiting thread or yielding, depending
- // with compilation flag support and whether the optional_yield is set
- class Waiter {
- using Signature = void(boost::system::error_code);
- using Completion = ceph::async::Completion<Signature>;
- std::unique_ptr<Completion> completion = nullptr;
- int ret;
-
- mutable std::atomic<bool> done = false;
- mutable std::mutex lock;
- mutable std::condition_variable cond;
-
- template <typename ExecutionContext, typename CompletionToken>
- auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
- boost::asio::async_completion<CompletionToken, Signature> init(token);
- auto& handler = init.completion_handler;
- {
- std::unique_lock l{lock};
- completion = Completion::create(ctx.get_executor(), std::move(handler));
- }
- return init.result.get();
- }
-
- public:
- int wait(optional_yield y) {
- if (done) {
- return ret;
- }
- if (y) {
- auto& io_ctx = y.get_io_context();
- auto& yield_ctx = y.get_yield_context();
- boost::system::error_code ec;
- async_wait(io_ctx, yield_ctx[ec]);
- return -ec.value();
- }
- std::unique_lock l(lock);
- cond.wait(l, [this]{return (done==true);});
- return ret;
- }
-
- void finish(int r) {
- std::unique_lock l{lock};
- ret = r;
- done = true;
- if (completion) {
- boost::system::error_code ec(-ret, boost::system::system_category());
- Completion::post(std::move(completion), ec);
- } else {
- cond.notify_all();
- }
- }
- };
-
int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
if (ack_level == ack_level_t::None) {
return kafka::publish(conn_name, topic, json_format_pubsub_event(event));
} else {
- // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
- auto w = std::unique_ptr<Waiter>(new Waiter);
+ auto w = std::make_unique<Waiter>();
const auto rc = kafka::publish_with_confirm(conn_name,
topic,
json_format_pubsub_event(event),
- std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
+ [wp = w.get()](int r) { wp->finish(r); }
+ );
if (rc < 0) {
// failed to publish, does not wait for reply
return rc;