// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #ifndef LIBRBD_TASK_FINISHER_H #define LIBRBD_TASK_FINISHER_H #include "include/common_fwd.h" #include "include/Context.h" #include "common/ceph_context.h" #include "common/Finisher.h" #include "common/ceph_mutex.h" #include "common/Timer.h" #include #include namespace librbd { struct TaskFinisherSingleton { ceph::mutex m_lock = ceph::make_mutex("librbd::TaskFinisher::m_lock"); SafeTimer *m_safe_timer; Finisher *m_finisher; static TaskFinisherSingleton& get_singleton(CephContext* cct) { return cct->lookup_or_create_singleton_object< TaskFinisherSingleton>("librbd::TaskFinisherSingleton", false, cct); } explicit TaskFinisherSingleton(CephContext *cct) { m_safe_timer = new SafeTimer(cct, m_lock, false); m_safe_timer->init(); m_finisher = new Finisher(cct, "librbd::TaskFinisher::m_finisher", "taskfin_librbd"); m_finisher->start(); } virtual ~TaskFinisherSingleton() { { std::lock_guard l{m_lock}; m_safe_timer->shutdown(); delete m_safe_timer; } m_finisher->wait_for_empty(); m_finisher->stop(); delete m_finisher; } void queue(Context* ctx, int r) { m_finisher->queue(ctx, r); } }; template class TaskFinisher { public: TaskFinisher(CephContext &cct) : m_cct(cct) { auto& singleton = TaskFinisherSingleton::get_singleton(&cct); m_lock = &singleton.m_lock; m_safe_timer = singleton.m_safe_timer; m_finisher = singleton.m_finisher; } bool cancel(const Task& task) { std::lock_guard l{*m_lock}; typename TaskContexts::iterator it = m_task_contexts.find(task); if (it == m_task_contexts.end()) { return false; } it->second.first->complete(-ECANCELED); m_safe_timer->cancel_event(it->second.second); m_task_contexts.erase(it); return true; } void cancel_all() { std::lock_guard l{*m_lock}; for (auto &[task, pair] : m_task_contexts) { pair.first->complete(-ECANCELED); m_safe_timer->cancel_event(pair.second); } m_task_contexts.clear(); } bool add_event_after(const Task& task, double seconds, Context *ctx) { std::lock_guard l{*m_lock}; if (m_task_contexts.count(task) != 0) { // task already scheduled on finisher or timer delete ctx; return false; } C_Task *timer_ctx = new C_Task(this, task); m_task_contexts[task] = std::make_pair(ctx, timer_ctx); m_safe_timer->add_event_after(seconds, timer_ctx); return true; } bool reschedule_event_after(const Task& task, double seconds) { std::lock_guard l{*m_lock}; auto it = m_task_contexts.find(task); if (it == m_task_contexts.end()) { return false; } bool canceled = m_safe_timer->cancel_event(it->second.second); if (!canceled) { return false; } auto timer_ctx = new C_Task(this, task); it->second.second = timer_ctx; m_safe_timer->add_event_after(seconds, timer_ctx); return true; } void queue(Context *ctx, int r = 0) { m_finisher->queue(ctx, r); } bool queue(const Task& task, Context *ctx) { std::lock_guard l{*m_lock}; typename TaskContexts::iterator it = m_task_contexts.find(task); if (it != m_task_contexts.end()) { if (it->second.second != NULL && m_safe_timer->cancel_event(it->second.second)) { it->second.first->complete(-ECANCELED); } else { // task already scheduled on the finisher ctx->complete(-ECANCELED); return false; } } m_task_contexts[task] = std::make_pair(ctx, reinterpret_cast(0)); m_finisher->queue(new C_Task(this, task)); return true; } private: class C_Task : public Context { public: C_Task(TaskFinisher *task_finisher, const Task& task) : m_task_finisher(task_finisher), m_task(task) { } protected: void finish(int r) override { m_task_finisher->complete(m_task); } private: TaskFinisher *m_task_finisher; Task m_task; }; CephContext &m_cct; ceph::mutex *m_lock; Finisher *m_finisher; SafeTimer *m_safe_timer; typedef std::map > TaskContexts; TaskContexts m_task_contexts; void complete(const Task& task) { Context *ctx = NULL; { std::lock_guard l{*m_lock}; typename TaskContexts::iterator it = m_task_contexts.find(task); if (it != m_task_contexts.end()) { ctx = it->second.first; m_task_contexts.erase(it); } } if (ctx != NULL) { ctx->complete(0); } } }; } // namespace librbd #endif // LIBRBD_TASK_FINISHER