summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_coroutine.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rgw/rgw_coroutine.cc')
-rw-r--r--src/rgw/rgw_coroutine.cc1130
1 files changed, 1130 insertions, 0 deletions
diff --git a/src/rgw/rgw_coroutine.cc b/src/rgw/rgw_coroutine.cc
new file mode 100644
index 000000000..a9c9c38e3
--- /dev/null
+++ b/src/rgw/rgw_coroutine.cc
@@ -0,0 +1,1130 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include "include/Context.h"
+#include "common/ceph_json.h"
+#include "rgw_coroutine.h"
+
+// re-include our assert to clobber the system one; fix dout:
+#include "include/ceph_assert.h"
+
+#include <boost/asio/yield.hpp>
+
+#define dout_subsys ceph_subsys_rgw
+#define dout_context g_ceph_context
+
+using namespace std;
+
+class RGWCompletionManager::WaitContext : public Context {
+ RGWCompletionManager *manager;
+ void *opaque;
+public:
+ WaitContext(RGWCompletionManager *_cm, void *_opaque) : manager(_cm), opaque(_opaque) {}
+ void finish(int r) override {
+ manager->_wakeup(opaque);
+ }
+};
+
+RGWCompletionManager::RGWCompletionManager(CephContext *_cct) : cct(_cct),
+ timer(cct, lock)
+{
+ timer.init();
+}
+
+RGWCompletionManager::~RGWCompletionManager()
+{
+ std::lock_guard l{lock};
+ timer.cancel_all_events();
+ timer.shutdown();
+}
+
+void RGWCompletionManager::complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info)
+{
+ std::lock_guard l{lock};
+ _complete(cn, io_id, user_info);
+}
+
+void RGWCompletionManager::register_completion_notifier(RGWAioCompletionNotifier *cn)
+{
+ std::lock_guard l{lock};
+ if (cn) {
+ cns.insert(cn);
+ }
+}
+
+void RGWCompletionManager::unregister_completion_notifier(RGWAioCompletionNotifier *cn)
+{
+ std::lock_guard l{lock};
+ if (cn) {
+ cns.erase(cn);
+ }
+}
+
+void RGWCompletionManager::_complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info)
+{
+ if (cn) {
+ cns.erase(cn);
+ }
+
+ if (complete_reqs_set.find(io_id) != complete_reqs_set.end()) {
+ /* already have completion for this io_id, don't allow multiple completions for it */
+ return;
+ }
+ complete_reqs.push_back(io_completion{io_id, user_info});
+ cond.notify_all();
+}
+
+int RGWCompletionManager::get_next(io_completion *io)
+{
+ std::unique_lock l{lock};
+ while (complete_reqs.empty()) {
+ if (going_down) {
+ return -ECANCELED;
+ }
+ cond.wait(l);
+ }
+ *io = complete_reqs.front();
+ complete_reqs_set.erase(io->io_id);
+ complete_reqs.pop_front();
+ return 0;
+}
+
+bool RGWCompletionManager::try_get_next(io_completion *io)
+{
+ std::lock_guard l{lock};
+ if (complete_reqs.empty()) {
+ return false;
+ }
+ *io = complete_reqs.front();
+ complete_reqs_set.erase(io->io_id);
+ complete_reqs.pop_front();
+ return true;
+}
+
+void RGWCompletionManager::go_down()
+{
+ std::lock_guard l{lock};
+ for (auto cn : cns) {
+ cn->unregister();
+ }
+ going_down = true;
+ cond.notify_all();
+}
+
+void RGWCompletionManager::wait_interval(void *opaque, const utime_t& interval, void *user_info)
+{
+ std::lock_guard l{lock};
+ ceph_assert(waiters.find(opaque) == waiters.end());
+ waiters[opaque] = user_info;
+ timer.add_event_after(interval, new WaitContext(this, opaque));
+}
+
+void RGWCompletionManager::wakeup(void *opaque)
+{
+ std::lock_guard l{lock};
+ _wakeup(opaque);
+}
+
+void RGWCompletionManager::_wakeup(void *opaque)
+{
+ map<void *, void *>::iterator iter = waiters.find(opaque);
+ if (iter != waiters.end()) {
+ void *user_id = iter->second;
+ waiters.erase(iter);
+ _complete(NULL, rgw_io_id{0, -1} /* no IO id */, user_id);
+ }
+}
+
+RGWCoroutine::~RGWCoroutine() {
+ for (auto stack : spawned.entries) {
+ stack->put();
+ }
+}
+
+void RGWCoroutine::init_new_io(RGWIOProvider *io_provider)
+{
+ ceph_assert(stack); // if there's no stack, io_provider won't be uninitialized
+ stack->init_new_io(io_provider);
+}
+
+void RGWCoroutine::set_io_blocked(bool flag) {
+ if (stack) {
+ stack->set_io_blocked(flag);
+ }
+}
+
+void RGWCoroutine::set_sleeping(bool flag) {
+ if (stack) {
+ stack->set_sleeping(flag);
+ }
+}
+
+int RGWCoroutine::io_block(int ret, int64_t io_id) {
+ return io_block(ret, rgw_io_id{io_id, -1});
+}
+
+int RGWCoroutine::io_block(int ret, const rgw_io_id& io_id) {
+ if (!stack) {
+ return 0;
+ }
+ if (stack->consume_io_finish(io_id)) {
+ return 0;
+ }
+ set_io_blocked(true);
+ stack->set_io_blocked_id(io_id);
+ return ret;
+}
+
+void RGWCoroutine::io_complete(const rgw_io_id& io_id) {
+ if (stack) {
+ stack->io_complete(io_id);
+ }
+}
+
+void RGWCoroutine::StatusItem::dump(Formatter *f) const {
+ ::encode_json("timestamp", timestamp, f);
+ ::encode_json("status", status, f);
+}
+
+stringstream& RGWCoroutine::Status::set_status()
+{
+ std::unique_lock l{lock};
+ string s = status.str();
+ status.str(string());
+ if (!timestamp.is_zero()) {
+ history.push_back(StatusItem(timestamp, s));
+ }
+ if (history.size() > (size_t)max_history) {
+ history.pop_front();
+ }
+ timestamp = ceph_clock_now();
+
+ return status;
+}
+
+RGWCoroutinesManager::~RGWCoroutinesManager() {
+ stop();
+ completion_mgr->put();
+ if (cr_registry) {
+ cr_registry->remove(this);
+ }
+}
+
+int64_t RGWCoroutinesManager::get_next_io_id()
+{
+ return (int64_t)++max_io_id;
+}
+
+uint64_t RGWCoroutinesManager::get_next_stack_id() {
+ return (uint64_t)++max_stack_id;
+}
+
+RGWCoroutinesStack::RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start) : cct(_cct), ops_mgr(_ops_mgr),
+ done_flag(false), error_flag(false), blocked_flag(false),
+ sleep_flag(false), interval_wait_flag(false), is_scheduled(false), is_waiting_for_child(false),
+ retcode(0), run_count(0),
+ env(NULL), parent(NULL)
+{
+ id = ops_mgr->get_next_stack_id();
+ if (start) {
+ ops.push_back(start);
+ }
+ pos = ops.begin();
+}
+
+RGWCoroutinesStack::~RGWCoroutinesStack()
+{
+ for (auto op : ops) {
+ op->put();
+ }
+
+ for (auto stack : spawned.entries) {
+ stack->put();
+ }
+}
+
+int RGWCoroutinesStack::operate(const DoutPrefixProvider *dpp, RGWCoroutinesEnv *_env)
+{
+ env = _env;
+ RGWCoroutine *op = *pos;
+ op->stack = this;
+ ldpp_dout(dpp, 20) << *op << ": operate()" << dendl;
+ int r = op->operate_wrapper(dpp);
+ if (r < 0) {
+ ldpp_dout(dpp, 20) << *op << ": operate() returned r=" << r << dendl;
+ }
+
+ error_flag = op->is_error();
+
+ if (op->is_done()) {
+ int op_retcode = r;
+ r = unwind(op_retcode);
+ op->put();
+ done_flag = (pos == ops.end());
+ blocked_flag &= !done_flag;
+ if (done_flag) {
+ retcode = op_retcode;
+ }
+ return r;
+ }
+
+ /* should r ever be negative at this point? */
+ ceph_assert(r >= 0);
+
+ return 0;
+}
+
+string RGWCoroutinesStack::error_str()
+{
+ if (pos != ops.end()) {
+ return (*pos)->error_str();
+ }
+ return string();
+}
+
+void RGWCoroutinesStack::call(RGWCoroutine *next_op) {
+ if (!next_op) {
+ return;
+ }
+ ops.push_back(next_op);
+ if (pos != ops.end()) {
+ ++pos;
+ } else {
+ pos = ops.begin();
+ }
+}
+
+void RGWCoroutinesStack::schedule()
+{
+ env->manager->schedule(env, this);
+}
+
+void RGWCoroutinesStack::_schedule()
+{
+ env->manager->_schedule(env, this);
+}
+
+RGWCoroutinesStack *RGWCoroutinesStack::spawn(RGWCoroutine *source_op, RGWCoroutine *op, bool wait)
+{
+ if (!op) {
+ return NULL;
+ }
+
+ rgw_spawned_stacks *s = (source_op ? &source_op->spawned : &spawned);
+
+ RGWCoroutinesStack *stack = env->manager->allocate_stack();
+ s->add_pending(stack);
+ stack->parent = this;
+
+ stack->get(); /* we'll need to collect the stack */
+ stack->call(op);
+
+ env->manager->schedule(env, stack);
+
+ if (wait) {
+ set_blocked_by(stack);
+ }
+
+ return stack;
+}
+
+RGWCoroutinesStack *RGWCoroutinesStack::spawn(RGWCoroutine *op, bool wait)
+{
+ return spawn(NULL, op, wait);
+}
+
+int RGWCoroutinesStack::wait(const utime_t& interval)
+{
+ RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
+ completion_mgr->wait_interval((void *)this, interval, (void *)this);
+ set_io_blocked(true);
+ set_interval_wait(true);
+ return 0;
+}
+
+void RGWCoroutinesStack::wakeup()
+{
+ RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
+ completion_mgr->wakeup((void *)this);
+}
+
+void RGWCoroutinesStack::io_complete(const rgw_io_id& io_id)
+{
+ RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
+ completion_mgr->complete(nullptr, io_id, (void *)this);
+}
+
+int RGWCoroutinesStack::unwind(int retcode)
+{
+ rgw_spawned_stacks *src_spawned = &(*pos)->spawned;
+
+ if (pos == ops.begin()) {
+ ldout(cct, 15) << "stack " << (void *)this << " end" << dendl;
+ spawned.inherit(src_spawned);
+ ops.clear();
+ pos = ops.end();
+ return retcode;
+ }
+
+ --pos;
+ ops.pop_back();
+ RGWCoroutine *op = *pos;
+ op->set_retcode(retcode);
+ op->spawned.inherit(src_spawned);
+ return 0;
+}
+
+void RGWCoroutinesStack::cancel()
+{
+ while (!ops.empty()) {
+ RGWCoroutine *op = *pos;
+ unwind(-ECANCELED);
+ op->put();
+ }
+ put();
+}
+
+bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id) /* returns true if needs to be called again */
+{
+ bool need_retry = false;
+ rgw_spawned_stacks *s = (op ? &op->spawned : &spawned);
+ *ret = 0;
+ vector<RGWCoroutinesStack *> new_list;
+
+ for (vector<RGWCoroutinesStack *>::iterator iter = s->entries.begin(); iter != s->entries.end(); ++iter) {
+ RGWCoroutinesStack *stack = *iter;
+ if (stack == skip_stack || !stack->is_done()) {
+ new_list.push_back(stack);
+ if (!stack->is_done()) {
+ ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " is still running" << dendl;
+ } else if (stack == skip_stack) {
+ ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " explicitly skipping stack" << dendl;
+ }
+ continue;
+ }
+ if (stack_id) {
+ *stack_id = stack->get_id();
+ }
+ int r = stack->get_ret_status();
+ stack->put();
+ if (r < 0) {
+ *ret = r;
+ ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " encountered error (r=" << r << "), skipping next stacks" << dendl;
+ new_list.insert(new_list.end(), ++iter, s->entries.end());
+ need_retry = (iter != s->entries.end());
+ break;
+ }
+
+ ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " is complete" << dendl;
+ }
+
+ s->entries.swap(new_list);
+ return need_retry;
+}
+
+bool RGWCoroutinesStack::collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */
+{
+ rgw_spawned_stacks *s = (op ? &op->spawned : &spawned);
+ *ret = 0;
+
+ if (collected_stack) {
+ *collected_stack = NULL;
+ }
+
+ for (vector<RGWCoroutinesStack *>::iterator iter = s->entries.begin(); iter != s->entries.end(); ++iter) {
+ RGWCoroutinesStack *stack = *iter;
+ if (!stack->is_done()) {
+ continue;
+ }
+ int r = stack->get_ret_status();
+ if (r < 0) {
+ *ret = r;
+ }
+
+ if (collected_stack) {
+ *collected_stack = stack;
+ }
+ stack->put();
+
+ s->entries.erase(iter);
+ return true;
+ }
+
+ return false;
+}
+
+bool RGWCoroutinesStack::collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id) /* returns true if needs to be called again */
+{
+ return collect(NULL, ret, skip_stack, stack_id);
+}
+
+static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg)
+{
+ (static_cast<RGWAioCompletionNotifier *>(arg))->cb();
+}
+
+RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, const rgw_io_id& _io_id, void *_user_data) : completion_mgr(_mgr),
+ io_id(_io_id),
+ user_data(_user_data), registered(true) {
+ c = librados::Rados::aio_create_completion(this, _aio_completion_notifier_cb);
+}
+
+RGWAioCompletionNotifier *RGWCoroutinesStack::create_completion_notifier()
+{
+ return ops_mgr->create_completion_notifier(this);
+}
+
+RGWCompletionManager *RGWCoroutinesStack::get_completion_mgr()
+{
+ return ops_mgr->get_completion_mgr();
+}
+
+bool RGWCoroutinesStack::unblock_stack(RGWCoroutinesStack **s)
+{
+ if (blocking_stacks.empty()) {
+ return false;
+ }
+
+ set<RGWCoroutinesStack *>::iterator iter = blocking_stacks.begin();
+ *s = *iter;
+ blocking_stacks.erase(iter);
+ (*s)->blocked_by_stack.erase(this);
+
+ return true;
+}
+
+void RGWCoroutinesManager::report_error(RGWCoroutinesStack *op)
+{
+ if (!op) {
+ return;
+ }
+ string err = op->error_str();
+ if (err.empty()) {
+ return;
+ }
+ lderr(cct) << "ERROR: failed operation: " << op->error_str() << dendl;
+}
+
+void RGWCoroutinesStack::dump(Formatter *f) const {
+ stringstream ss;
+ ss << (void *)this;
+ ::encode_json("stack", ss.str(), f);
+ ::encode_json("run_count", run_count, f);
+ f->open_array_section("ops");
+ for (auto& i : ops) {
+ encode_json("op", *i, f);
+ }
+ f->close_section();
+}
+
+void RGWCoroutinesStack::init_new_io(RGWIOProvider *io_provider)
+{
+ io_provider->set_io_user_info((void *)this);
+ io_provider->assign_io(env->manager->get_io_id_provider());
+}
+
+bool RGWCoroutinesStack::try_io_unblock(const rgw_io_id& io_id)
+{
+ if (!can_io_unblock(io_id)) {
+ auto p = io_finish_ids.emplace(io_id.id, io_id);
+ auto& iter = p.first;
+ bool inserted = p.second;
+ if (!inserted) { /* could not insert, entry already existed, add channel to completion mask */
+ iter->second.channels |= io_id.channels;
+ }
+ return false;
+ }
+
+ return true;
+}
+
+bool RGWCoroutinesStack::consume_io_finish(const rgw_io_id& io_id)
+{
+ auto iter = io_finish_ids.find(io_id.id);
+ if (iter == io_finish_ids.end()) {
+ return false;
+ }
+ int finish_mask = iter->second.channels;
+ bool found = (finish_mask & io_id.channels) != 0;
+
+ finish_mask &= ~(finish_mask & io_id.channels);
+
+ if (finish_mask == 0) {
+ io_finish_ids.erase(iter);
+ }
+ return found;
+}
+
+
+void RGWCoroutinesManager::handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks,
+ RGWCompletionManager::io_completion& io, int *blocked_count, int *interval_wait_count)
+{
+ ceph_assert(ceph_mutex_is_wlocked(lock));
+ RGWCoroutinesStack *stack = static_cast<RGWCoroutinesStack *>(io.user_info);
+ if (context_stacks.find(stack) == context_stacks.end()) {
+ return;
+ }
+ if (!stack->try_io_unblock(io.io_id)) {
+ return;
+ }
+ if (stack->is_io_blocked()) {
+ --(*blocked_count);
+ stack->set_io_blocked(false);
+ if (stack->is_interval_waiting()) {
+ --(*interval_wait_count);
+ }
+ }
+ stack->set_interval_wait(false);
+ if (!stack->is_done()) {
+ if (!stack->is_scheduled) {
+ scheduled_stacks.push_back(stack);
+ stack->set_is_scheduled(true);
+ }
+ } else {
+ context_stacks.erase(stack);
+ stack->put();
+ }
+}
+
+void RGWCoroutinesManager::schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack)
+{
+ std::unique_lock wl{lock};
+ _schedule(env, stack);
+}
+
+void RGWCoroutinesManager::_schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack)
+{
+ ceph_assert(ceph_mutex_is_wlocked(lock));
+ if (!stack->is_scheduled) {
+ env->scheduled_stacks->push_back(stack);
+ stack->set_is_scheduled(true);
+ }
+ set<RGWCoroutinesStack *>& context_stacks = run_contexts[env->run_context];
+ context_stacks.insert(stack);
+}
+
+void RGWCoroutinesManager::set_sleeping(RGWCoroutine *cr, bool flag)
+{
+ cr->set_sleeping(flag);
+}
+
+void RGWCoroutinesManager::io_complete(RGWCoroutine *cr, const rgw_io_id& io_id)
+{
+ cr->io_complete(io_id);
+}
+
+int RGWCoroutinesManager::run(const DoutPrefixProvider *dpp, list<RGWCoroutinesStack *>& stacks)
+{
+ int ret = 0;
+ int blocked_count = 0;
+ int interval_wait_count = 0;
+ bool canceled = false; // set on going_down
+ RGWCoroutinesEnv env;
+ bool op_not_blocked;
+
+ uint64_t run_context = ++run_context_count;
+
+ lock.lock();
+ set<RGWCoroutinesStack *>& context_stacks = run_contexts[run_context];
+ list<RGWCoroutinesStack *> scheduled_stacks;
+ for (auto& st : stacks) {
+ context_stacks.insert(st);
+ scheduled_stacks.push_back(st);
+ st->set_is_scheduled(true);
+ }
+ env.run_context = run_context;
+ env.manager = this;
+ env.scheduled_stacks = &scheduled_stacks;
+
+ for (list<RGWCoroutinesStack *>::iterator iter = scheduled_stacks.begin(); iter != scheduled_stacks.end() && !going_down;) {
+ RGWCompletionManager::io_completion io;
+ RGWCoroutinesStack *stack = *iter;
+ ++iter;
+ scheduled_stacks.pop_front();
+
+ if (context_stacks.find(stack) == context_stacks.end()) {
+ /* stack was probably schedule more than once due to IO, but was since complete */
+ goto next;
+ }
+ env.stack = stack;
+
+ lock.unlock();
+
+ ret = stack->operate(dpp, &env);
+
+ lock.lock();
+
+ stack->set_is_scheduled(false);
+ if (ret < 0) {
+ ldpp_dout(dpp, 20) << "stack->operate() returned ret=" << ret << dendl;
+ }
+
+ if (stack->is_error()) {
+ report_error(stack);
+ }
+
+ op_not_blocked = false;
+
+ if (stack->is_io_blocked()) {
+ ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is io blocked" << dendl;
+ if (stack->is_interval_waiting()) {
+ interval_wait_count++;
+ }
+ blocked_count++;
+ } else if (stack->is_blocked()) {
+ /* do nothing, we'll re-add the stack when the blocking stack is done,
+ * or when we're awaken
+ */
+ ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is_blocked_by_stack()=" << stack->is_blocked_by_stack()
+ << " is_sleeping=" << stack->is_sleeping() << " waiting_for_child()=" << stack->waiting_for_child() << dendl;
+ } else if (stack->is_done()) {
+ ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is done" << dendl;
+ RGWCoroutinesStack *s;
+ while (stack->unblock_stack(&s)) {
+ if (!s->is_blocked_by_stack() && !s->is_done()) {
+ if (s->is_io_blocked()) {
+ if (stack->is_interval_waiting()) {
+ interval_wait_count++;
+ }
+ blocked_count++;
+ } else {
+ s->_schedule();
+ }
+ }
+ }
+ if (stack->parent && stack->parent->waiting_for_child()) {
+ stack->parent->set_wait_for_child(false);
+ stack->parent->_schedule();
+ }
+ context_stacks.erase(stack);
+ stack->put();
+ stack = NULL;
+ } else {
+ op_not_blocked = true;
+ stack->run_count++;
+ stack->_schedule();
+ }
+
+ if (!op_not_blocked && stack) {
+ stack->run_count = 0;
+ }
+
+ while (completion_mgr->try_get_next(&io)) {
+ handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count, &interval_wait_count);
+ }
+
+ /*
+ * only account blocked operations that are not in interval_wait, these are stacks that
+ * were put on a wait without any real IO operations. While we mark these as io_blocked,
+ * these aren't really waiting for IOs
+ */
+ while (blocked_count - interval_wait_count >= ops_window) {
+ lock.unlock();
+ ret = completion_mgr->get_next(&io);
+ lock.lock();
+ if (ret < 0) {
+ ldout(cct, 5) << "completion_mgr.get_next() returned ret=" << ret << dendl;
+ }
+ handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count, &interval_wait_count);
+ }
+
+next:
+ while (scheduled_stacks.empty() && blocked_count > 0) {
+ lock.unlock();
+ ret = completion_mgr->get_next(&io);
+ lock.lock();
+ if (ret < 0) {
+ ldout(cct, 5) << "completion_mgr.get_next() returned ret=" << ret << dendl;
+ }
+ if (going_down) {
+ ldout(cct, 5) << __func__ << "(): was stopped, exiting" << dendl;
+ ret = -ECANCELED;
+ canceled = true;
+ break;
+ }
+ handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count, &interval_wait_count);
+ iter = scheduled_stacks.begin();
+ }
+ if (canceled) {
+ break;
+ }
+
+ if (iter == scheduled_stacks.end()) {
+ iter = scheduled_stacks.begin();
+ }
+ }
+
+ if (!context_stacks.empty() && !going_down) {
+ JSONFormatter formatter(true);
+ formatter.open_array_section("context_stacks");
+ for (auto& s : context_stacks) {
+ ::encode_json("entry", *s, &formatter);
+ }
+ formatter.close_section();
+ lderr(cct) << __func__ << "(): ERROR: deadlock detected, dumping remaining coroutines:\n";
+ formatter.flush(*_dout);
+ *_dout << dendl;
+ ceph_assert(context_stacks.empty() || going_down); // assert on deadlock
+ }
+
+ for (auto stack : context_stacks) {
+ ldout(cct, 20) << "clearing stack on run() exit: stack=" << (void *)stack << " nref=" << stack->get_nref() << dendl;
+ stack->cancel();
+ }
+ run_contexts.erase(run_context);
+ lock.unlock();
+
+ return ret;
+}
+
+int RGWCoroutinesManager::run(const DoutPrefixProvider *dpp, RGWCoroutine *op)
+{
+ if (!op) {
+ return 0;
+ }
+ list<RGWCoroutinesStack *> stacks;
+ RGWCoroutinesStack *stack = allocate_stack();
+ op->get();
+ stack->call(op);
+
+ stacks.push_back(stack);
+
+ int r = run(dpp, stacks);
+ if (r < 0) {
+ ldpp_dout(dpp, 20) << "run(stacks) returned r=" << r << dendl;
+ } else {
+ r = op->get_ret_status();
+ }
+ op->put();
+
+ return r;
+}
+
+RGWAioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack *stack)
+{
+ rgw_io_id io_id{get_next_io_id(), -1};
+ RGWAioCompletionNotifier *cn = new RGWAioCompletionNotifier(completion_mgr, io_id, (void *)stack);
+ completion_mgr->register_completion_notifier(cn);
+ return cn;
+}
+
+void RGWCoroutinesManager::dump(Formatter *f) const {
+ std::shared_lock rl{lock};
+
+ f->open_array_section("run_contexts");
+ for (auto& i : run_contexts) {
+ f->open_object_section("context");
+ ::encode_json("id", i.first, f);
+ f->open_array_section("entries");
+ for (auto& s : i.second) {
+ ::encode_json("entry", *s, f);
+ }
+ f->close_section();
+ f->close_section();
+ }
+ f->close_section();
+}
+
+RGWCoroutinesStack *RGWCoroutinesManager::allocate_stack() {
+ return new RGWCoroutinesStack(cct, this);
+}
+
+string RGWCoroutinesManager::get_id()
+{
+ if (!id.empty()) {
+ return id;
+ }
+ stringstream ss;
+ ss << (void *)this;
+ return ss.str();
+}
+
+void RGWCoroutinesManagerRegistry::add(RGWCoroutinesManager *mgr)
+{
+ std::unique_lock wl{lock};
+ if (managers.find(mgr) == managers.end()) {
+ managers.insert(mgr);
+ get();
+ }
+}
+
+void RGWCoroutinesManagerRegistry::remove(RGWCoroutinesManager *mgr)
+{
+ std::unique_lock wl{lock};
+ if (managers.find(mgr) != managers.end()) {
+ managers.erase(mgr);
+ put();
+ }
+}
+
+RGWCoroutinesManagerRegistry::~RGWCoroutinesManagerRegistry()
+{
+ AdminSocket *admin_socket = cct->get_admin_socket();
+ if (!admin_command.empty()) {
+ admin_socket->unregister_commands(this);
+ }
+}
+
+int RGWCoroutinesManagerRegistry::hook_to_admin_command(const string& command)
+{
+ AdminSocket *admin_socket = cct->get_admin_socket();
+ if (!admin_command.empty()) {
+ admin_socket->unregister_commands(this);
+ }
+ admin_command = command;
+ int r = admin_socket->register_command(admin_command, this,
+ "dump current coroutines stack state");
+ if (r < 0) {
+ lderr(cct) << "ERROR: fail to register admin socket command (r=" << r << ")" << dendl;
+ return r;
+ }
+ return 0;
+}
+
+int RGWCoroutinesManagerRegistry::call(std::string_view command,
+ const cmdmap_t& cmdmap,
+ const bufferlist&,
+ Formatter *f,
+ std::ostream& ss,
+ bufferlist& out) {
+ std::shared_lock rl{lock};
+ ::encode_json("cr_managers", *this, f);
+ return 0;
+}
+
+void RGWCoroutinesManagerRegistry::dump(Formatter *f) const {
+ f->open_array_section("coroutine_managers");
+ for (auto m : managers) {
+ ::encode_json("entry", *m, f);
+ }
+ f->close_section();
+}
+
+void RGWCoroutine::call(RGWCoroutine *op)
+{
+ if (op) {
+ stack->call(op);
+ } else {
+ // the call()er expects this to set a retcode
+ retcode = 0;
+ }
+}
+
+RGWCoroutinesStack *RGWCoroutine::spawn(RGWCoroutine *op, bool wait)
+{
+ return stack->spawn(this, op, wait);
+}
+
+bool RGWCoroutine::collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id) /* returns true if needs to be called again */
+{
+ return stack->collect(this, ret, skip_stack, stack_id);
+}
+
+bool RGWCoroutine::collect_next(int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */
+{
+ return stack->collect_next(this, ret, collected_stack);
+}
+
+int RGWCoroutine::wait(const utime_t& interval)
+{
+ return stack->wait(interval);
+}
+
+void RGWCoroutine::wait_for_child()
+{
+ /* should only wait for child if there is a child that is not done yet, and no complete children */
+ if (spawned.entries.empty()) {
+ return;
+ }
+ for (vector<RGWCoroutinesStack *>::iterator iter = spawned.entries.begin(); iter != spawned.entries.end(); ++iter) {
+ if ((*iter)->is_done()) {
+ return;
+ }
+ }
+ stack->set_wait_for_child(true);
+}
+
+string RGWCoroutine::to_str() const
+{
+ return typeid(*this).name();
+}
+
+ostream& operator<<(ostream& out, const RGWCoroutine& cr)
+{
+ out << "cr:s=" << (void *)cr.get_stack() << ":op=" << (void *)&cr << ":" << typeid(cr).name();
+ return out;
+}
+
+bool RGWCoroutine::drain_children(int num_cr_left,
+ RGWCoroutinesStack *skip_stack,
+ std::optional<std::function<void(uint64_t stack_id, int ret)> > cb)
+{
+ bool done = false;
+ ceph_assert(num_cr_left >= 0);
+ if (num_cr_left == 0 && skip_stack) {
+ num_cr_left = 1;
+ }
+ reenter(&drain_status.cr) {
+ while (num_spawned() > (size_t)num_cr_left) {
+ yield wait_for_child();
+ int ret;
+ uint64_t stack_id;
+ bool again = false;
+ do {
+ again = collect(&ret, skip_stack, &stack_id);
+ if (ret < 0) {
+ ldout(cct, 10) << "collect() returned ret=" << ret << dendl;
+ /* we should have reported this error */
+ log_error() << "ERROR: collect() returned error (ret=" << ret << ")";
+ }
+ if (cb) {
+ (*cb)(stack_id, ret);
+ }
+ } while (again);
+ }
+ done = true;
+ }
+ return done;
+}
+
+bool RGWCoroutine::drain_children(int num_cr_left,
+ std::optional<std::function<int(uint64_t stack_id, int ret)> > cb)
+{
+ bool done = false;
+ ceph_assert(num_cr_left >= 0);
+
+ reenter(&drain_status.cr) {
+ while (num_spawned() > (size_t)num_cr_left) {
+ yield wait_for_child();
+ int ret;
+ uint64_t stack_id;
+ bool again = false;
+ do {
+ again = collect(&ret, nullptr, &stack_id);
+ if (ret < 0) {
+ ldout(cct, 10) << "collect() returned ret=" << ret << dendl;
+ /* we should have reported this error */
+ log_error() << "ERROR: collect() returned error (ret=" << ret << ")";
+ }
+ if (cb && !drain_status.should_exit) {
+ int r = (*cb)(stack_id, ret);
+ if (r < 0) {
+ drain_status.ret = r;
+ drain_status.should_exit = true;
+ num_cr_left = 0; /* need to drain all */
+ }
+ }
+ } while (again);
+ }
+ done = true;
+ }
+ return done;
+}
+
+void RGWCoroutine::wakeup()
+{
+ if (stack) {
+ stack->wakeup();
+ }
+}
+
+RGWCoroutinesEnv *RGWCoroutine::get_env() const
+{
+ return stack->get_env();
+}
+
+void RGWCoroutine::dump(Formatter *f) const {
+ if (!description.str().empty()) {
+ encode_json("description", description.str(), f);
+ }
+ encode_json("type", to_str(), f);
+ if (!spawned.entries.empty()) {
+ f->open_array_section("spawned");
+ for (auto& i : spawned.entries) {
+ char buf[32];
+ snprintf(buf, sizeof(buf), "%p", (void *)i);
+ encode_json("stack", string(buf), f);
+ }
+ f->close_section();
+ }
+ if (!status.history.empty()) {
+ encode_json("history", status.history, f);
+ }
+
+ if (!status.status.str().empty()) {
+ f->open_object_section("status");
+ encode_json("status", status.status.str(), f);
+ encode_json("timestamp", status.timestamp, f);
+ f->close_section();
+ }
+}
+
+RGWSimpleCoroutine::~RGWSimpleCoroutine()
+{
+ if (!called_cleanup) {
+ request_cleanup();
+ }
+}
+
+void RGWSimpleCoroutine::call_cleanup()
+{
+ called_cleanup = true;
+ request_cleanup();
+}
+
+int RGWSimpleCoroutine::operate(const DoutPrefixProvider *dpp)
+{
+ int ret = 0;
+ reenter(this) {
+ yield return state_init();
+ yield return state_send_request(dpp);
+ yield return state_request_complete();
+ yield return state_all_complete();
+ drain_all();
+ call_cleanup();
+ return set_state(RGWCoroutine_Done, ret);
+ }
+ return 0;
+}
+
+int RGWSimpleCoroutine::state_init()
+{
+ int ret = init();
+ if (ret < 0) {
+ call_cleanup();
+ return set_state(RGWCoroutine_Error, ret);
+ }
+ return 0;
+}
+
+int RGWSimpleCoroutine::state_send_request(const DoutPrefixProvider *dpp)
+{
+ int ret = send_request(dpp);
+ if (ret < 0) {
+ call_cleanup();
+ return set_state(RGWCoroutine_Error, ret);
+ }
+ return io_block(0);
+}
+
+int RGWSimpleCoroutine::state_request_complete()
+{
+ int ret = request_complete();
+ if (ret < 0) {
+ call_cleanup();
+ return set_state(RGWCoroutine_Error, ret);
+ }
+ return 0;
+}
+
+int RGWSimpleCoroutine::state_all_complete()
+{
+ int ret = finish();
+ if (ret < 0) {
+ call_cleanup();
+ return set_state(RGWCoroutine_Error, ret);
+ }
+ return 0;
+}
+
+