// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #include "LeaderWatcher.h" #include "common/Cond.h" #include "common/Timer.h" #include "common/debug.h" #include "common/errno.h" #include "cls/rbd/cls_rbd_client.h" #include "include/stringify.h" #include "librbd/Utils.h" #include "librbd/asio/ContextWQ.h" #include "librbd/watcher/Types.h" #include "Threads.h" #define dout_context g_ceph_context #define dout_subsys ceph_subsys_rbd_mirror #undef dout_prefix #define dout_prefix *_dout << "rbd::mirror::LeaderWatcher: " \ << this << " " << __func__ << ": " namespace rbd { namespace mirror { using namespace leader_watcher; using librbd::util::create_async_context_callback; using librbd::util::create_context_callback; using librbd::util::create_rados_callback; template LeaderWatcher::LeaderWatcher(Threads *threads, librados::IoCtx &io_ctx, leader_watcher::Listener *listener) : Watcher(io_ctx, threads->work_queue, RBD_MIRROR_LEADER), m_threads(threads), m_listener(listener), m_instances_listener(this), m_lock(ceph::make_mutex("rbd::mirror::LeaderWatcher " + io_ctx.get_pool_name())), m_notifier_id(librados::Rados(io_ctx).get_instance_id()), m_instance_id(stringify(m_notifier_id)), m_leader_lock(new LeaderLock(m_ioctx, *m_threads->asio_engine, m_oid, this, true, m_cct->_conf.get_val( "rbd_blocklist_expire_seconds"))) { } template LeaderWatcher::~LeaderWatcher() { ceph_assert(m_instances == nullptr); ceph_assert(m_timer_task == nullptr); delete m_leader_lock; } template std::string LeaderWatcher::get_instance_id() { return m_instance_id; } template int LeaderWatcher::init() { C_SaferCond init_ctx; init(&init_ctx); return init_ctx.wait(); } template void LeaderWatcher::init(Context *on_finish) { dout(10) << "notifier_id=" << m_notifier_id << dendl; std::lock_guard locker{m_lock}; ceph_assert(m_on_finish == nullptr); m_on_finish = on_finish; create_leader_object(); } template void LeaderWatcher::create_leader_object() { dout(10) << dendl; ceph_assert(ceph_mutex_is_locked(m_lock)); librados::ObjectWriteOperation op; op.create(false); librados::AioCompletion *aio_comp = create_rados_callback< LeaderWatcher, &LeaderWatcher::handle_create_leader_object>(this); int r = m_ioctx.aio_operate(m_oid, aio_comp, &op); ceph_assert(r == 0); aio_comp->release(); } template void LeaderWatcher::handle_create_leader_object(int r) { dout(10) << "r=" << r << dendl; Context *on_finish = nullptr; { std::lock_guard locker{m_lock}; if (r == 0) { register_watch(); return; } derr << "error creating " << m_oid << " object: " << cpp_strerror(r) << dendl; std::swap(on_finish, m_on_finish); } on_finish->complete(r); } template void LeaderWatcher::register_watch() { dout(10) << dendl; ceph_assert(ceph_mutex_is_locked(m_lock)); Context *ctx = create_async_context_callback( m_work_queue, create_context_callback< LeaderWatcher, &LeaderWatcher::handle_register_watch>(this)); librbd::Watcher::register_watch(ctx); } template void LeaderWatcher::handle_register_watch(int r) { dout(10) << "r=" << r << dendl; Context *on_finish = nullptr; { std::lock_guard timer_locker(m_threads->timer_lock); std::lock_guard locker{m_lock}; if (r < 0) { derr << "error registering leader watcher for " << m_oid << " object: " << cpp_strerror(r) << dendl; } else { schedule_acquire_leader_lock(0); } ceph_assert(m_on_finish != nullptr); std::swap(on_finish, m_on_finish); } on_finish->complete(r); } template void LeaderWatcher::shut_down() { C_SaferCond shut_down_ctx; shut_down(&shut_down_ctx); int r = shut_down_ctx.wait(); ceph_assert(r == 0); } template void LeaderWatcher::shut_down(Context *on_finish) { dout(10) << dendl; std::scoped_lock locker{m_threads->timer_lock, m_lock}; ceph_assert(m_on_shut_down_finish == nullptr); m_on_shut_down_finish = on_finish; cancel_timer_task(); shut_down_leader_lock(); } template void LeaderWatcher::shut_down_leader_lock() { dout(10) << dendl; ceph_assert(ceph_mutex_is_locked(m_lock)); Context *ctx = create_async_context_callback( m_work_queue, create_context_callback< LeaderWatcher, &LeaderWatcher::handle_shut_down_leader_lock>(this)); m_leader_lock->shut_down(ctx); } template void LeaderWatcher::handle_shut_down_leader_lock(int r) { dout(10) << "r=" << r << dendl; std::lock_guard locker{m_lock}; if (r < 0) { derr << "error shutting down leader lock: " << cpp_strerror(r) << dendl; } unregister_watch(); } template void LeaderWatcher::unregister_watch() { dout(10) << dendl; ceph_assert(ceph_mutex_is_locked(m_lock)); Context *ctx = create_async_context_callback( m_work_queue, create_context_callback< LeaderWatcher, &LeaderWatcher::handle_unregister_watch>(this)); librbd::Watcher::unregister_watch(ctx); } template void LeaderWatcher::handle_unregister_watch(int r) { dout(10) << "r=" << r << dendl; if (r < 0) { derr << "error unregistering leader watcher for " << m_oid << " object: " << cpp_strerror(r) << dendl; } wait_for_tasks(); } template void LeaderWatcher::wait_for_tasks() { dout(10) << dendl; std::scoped_lock locker{m_threads->timer_lock, m_lock}; schedule_timer_task("wait for tasks", 0, false, &LeaderWatcher::handle_wait_for_tasks, true); } template void LeaderWatcher::handle_wait_for_tasks() { dout(10) << dendl; ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); ceph_assert(ceph_mutex_is_locked(m_lock)); ceph_assert(m_on_shut_down_finish != nullptr); ceph_assert(!m_timer_op_tracker.empty()); m_timer_op_tracker.finish_op(); auto ctx = new LambdaContext([this](int r) { Context *on_finish; { // ensure lock isn't held when completing shut down std::lock_guard locker{m_lock}; ceph_assert(m_on_shut_down_finish != nullptr); on_finish = m_on_shut_down_finish; } on_finish->complete(0); }); m_work_queue->queue(ctx, 0); } template bool LeaderWatcher::is_blocklisted() const { std::lock_guard locker{m_lock}; return m_blocklisted; } template bool LeaderWatcher::is_leader() const { std::lock_guard locker{m_lock}; return is_leader(m_lock); } template bool LeaderWatcher::is_leader(ceph::mutex &lock) const { ceph_assert(ceph_mutex_is_locked(m_lock)); bool leader = m_leader_lock->is_leader(); dout(10) << leader << dendl; return leader; } template bool LeaderWatcher::is_releasing_leader() const { std::lock_guard locker{m_lock}; return is_releasing_leader(m_lock); } template bool LeaderWatcher::is_releasing_leader(ceph::mutex &lock) const { ceph_assert(ceph_mutex_is_locked(m_lock)); bool releasing = m_leader_lock->is_releasing_leader(); dout(10) << releasing << dendl; return releasing; } template bool LeaderWatcher::get_leader_instance_id(std::string *instance_id) const { dout(10) << dendl; std::lock_guard locker{m_lock}; if (is_leader(m_lock) || is_releasing_leader(m_lock)) { *instance_id = m_instance_id; return true; } if (!m_locker.cookie.empty()) { *instance_id = stringify(m_locker.entity.num()); return true; } return false; } template void LeaderWatcher::release_leader() { dout(10) << dendl; std::lock_guard locker{m_lock}; if (!is_leader(m_lock)) { return; } release_leader_lock(); } template void LeaderWatcher::list_instances(std::vector *instance_ids) { dout(10) << dendl; std::lock_guard locker{m_lock}; instance_ids->clear(); if (m_instances != nullptr) { m_instances->list(instance_ids); } } template void LeaderWatcher::cancel_timer_task() { ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); ceph_assert(ceph_mutex_is_locked(m_lock)); if (m_timer_task == nullptr) { return; } dout(10) << m_timer_task << dendl; bool canceled = m_threads->timer->cancel_event(m_timer_task); ceph_assert(canceled); m_timer_task = nullptr; } template void LeaderWatcher::schedule_timer_task(const std::string &name, int delay_factor, bool leader, TimerCallback timer_callback, bool shutting_down) { ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); ceph_assert(ceph_mutex_is_locked(m_lock)); if (!shutting_down && m_on_shut_down_finish != nullptr) { return; } cancel_timer_task(); m_timer_task = new LambdaContext( [this, leader, timer_callback](int r) { ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); m_timer_task = nullptr; if (m_timer_op_tracker.empty()) { std::lock_guard locker{m_lock}; execute_timer_task(leader, timer_callback); return; } // old timer task is still running -- do not start next // task until the previous task completes if (m_timer_gate == nullptr) { m_timer_gate = new C_TimerGate(this); m_timer_op_tracker.wait_for_ops(m_timer_gate); } m_timer_gate->leader = leader; m_timer_gate->timer_callback = timer_callback; }); int after = delay_factor * m_cct->_conf.get_val( "rbd_mirror_leader_heartbeat_interval"); dout(10) << "scheduling " << name << " after " << after << " sec (task " << m_timer_task << ")" << dendl; m_threads->timer->add_event_after(after, m_timer_task); } template void LeaderWatcher::execute_timer_task(bool leader, TimerCallback timer_callback) { dout(10) << dendl; ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); ceph_assert(ceph_mutex_is_locked(m_lock)); ceph_assert(m_timer_op_tracker.empty()); if (is_leader(m_lock) != leader) { return; } m_timer_op_tracker.start_op(); (this->*timer_callback)(); } template void LeaderWatcher::handle_post_acquire_leader_lock(int r, Context *on_finish) { dout(10) << "r=" << r << dendl; if (r < 0) { if (r == -EAGAIN) { dout(10) << "already locked" << dendl; } else { derr << "error acquiring leader lock: " << cpp_strerror(r) << dendl; } on_finish->complete(r); return; } std::lock_guard locker{m_lock}; ceph_assert(m_on_finish == nullptr); m_on_finish = on_finish; m_ret_val = 0; init_instances(); } template void LeaderWatcher::handle_pre_release_leader_lock(Context *on_finish) { dout(10) << dendl; std::lock_guard locker{m_lock}; ceph_assert(m_on_finish == nullptr); m_on_finish = on_finish; m_ret_val = 0; notify_listener(); } template void LeaderWatcher::handle_post_release_leader_lock(int r, Context *on_finish) { dout(10) << "r=" << r << dendl; if (r < 0) { on_finish->complete(r); return; } std::lock_guard locker{m_lock}; ceph_assert(m_on_finish == nullptr); m_on_finish = on_finish; notify_lock_released(); } template void LeaderWatcher::break_leader_lock() { dout(10) << dendl; ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); ceph_assert(ceph_mutex_is_locked(m_lock)); ceph_assert(!m_timer_op_tracker.empty()); if (m_locker.cookie.empty()) { get_locker(); return; } Context *ctx = create_async_context_callback( m_work_queue, create_context_callback< LeaderWatcher, &LeaderWatcher::handle_break_leader_lock>(this)); m_leader_lock->break_lock(m_locker, true, ctx); } template void LeaderWatcher::handle_break_leader_lock(int r) { dout(10) << "r=" << r << dendl; std::scoped_lock locker{m_threads->timer_lock, m_lock}; ceph_assert(!m_timer_op_tracker.empty()); if (m_leader_lock->is_shutdown()) { dout(10) << "canceling due to shutdown" << dendl; m_timer_op_tracker.finish_op(); return; } if (r < 0 && r != -ENOENT) { derr << "error breaking leader lock: " << cpp_strerror(r) << dendl; schedule_acquire_leader_lock(1); m_timer_op_tracker.finish_op(); return; } m_locker = {}; m_acquire_attempts = 0; acquire_leader_lock(); } template void LeaderWatcher::schedule_get_locker(bool reset_leader, uint32_t delay_factor) { dout(10) << dendl; ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); ceph_assert(ceph_mutex_is_locked(m_lock)); if (reset_leader) { m_locker = {}; m_acquire_attempts = 0; } schedule_timer_task("get locker", delay_factor, false, &LeaderWatcher::get_locker, false); } template void LeaderWatcher::get_locker() { dout(10) << dendl; ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); ceph_assert(ceph_mutex_is_locked(m_lock)); ceph_assert(!m_timer_op_tracker.empty()); C_GetLocker *get_locker_ctx = new C_GetLocker(this); Context *ctx = create_async_context_callback(m_work_queue, get_locker_ctx); m_leader_lock->get_locker(&get_locker_ctx->locker, ctx); } template void LeaderWatcher::handle_get_locker(int r, librbd::managed_lock::Locker& locker) { dout(10) << "r=" << r << dendl; std::scoped_lock l{m_threads->timer_lock, m_lock}; ceph_assert(!m_timer_op_tracker.empty()); if (m_leader_lock->is_shutdown()) { dout(10) << "canceling due to shutdown" << dendl; m_timer_op_tracker.finish_op(); return; } if (is_leader(m_lock)) { m_locker = {}; m_timer_op_tracker.finish_op(); return; } if (r == -ENOENT) { m_locker = {}; m_acquire_attempts = 0; acquire_leader_lock(); return; } else if (r < 0) { derr << "error retrieving leader locker: " << cpp_strerror(r) << dendl; schedule_get_locker(true, 1); m_timer_op_tracker.finish_op(); return; } bool notify_listener = false; if (m_locker != locker) { m_locker = locker; notify_listener = true; if (m_acquire_attempts > 1) { dout(10) << "new lock owner detected -- resetting heartbeat counter" << dendl; m_acquire_attempts = 0; } } if (m_acquire_attempts >= m_cct->_conf.get_val( "rbd_mirror_leader_max_acquire_attempts_before_break")) { dout(0) << "breaking leader lock after " << m_acquire_attempts << " " << "failed attempts to acquire" << dendl; break_leader_lock(); return; } schedule_acquire_leader_lock(1); if (!notify_listener) { m_timer_op_tracker.finish_op(); return; } auto ctx = new LambdaContext( [this](int r) { std::string instance_id; if (get_leader_instance_id(&instance_id)) { m_listener->update_leader_handler(instance_id); } std::scoped_lock locker{m_threads->timer_lock, m_lock}; m_timer_op_tracker.finish_op(); }); m_work_queue->queue(ctx, 0); } template void LeaderWatcher::schedule_acquire_leader_lock(uint32_t delay_factor) { dout(10) << dendl; ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); ceph_assert(ceph_mutex_is_locked(m_lock)); schedule_timer_task("acquire leader lock", delay_factor * m_cct->_conf.get_val("rbd_mirror_leader_max_missed_heartbeats"), false, &LeaderWatcher::acquire_leader_lock, false); } template void LeaderWatcher::acquire_leader_lock() { ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); ceph_assert(ceph_mutex_is_locked(m_lock)); ceph_assert(!m_timer_op_tracker.empty()); ++m_acquire_attempts; dout(10) << "acquire_attempts=" << m_acquire_attempts << dendl; Context *ctx = create_async_context_callback( m_work_queue, create_context_callback< LeaderWatcher, &LeaderWatcher::handle_acquire_leader_lock>(this)); m_leader_lock->try_acquire_lock(ctx); } template void LeaderWatcher::handle_acquire_leader_lock(int r) { dout(10) << "r=" << r << dendl; std::scoped_lock locker{m_threads->timer_lock, m_lock}; ceph_assert(!m_timer_op_tracker.empty()); if (m_leader_lock->is_shutdown()) { dout(10) << "canceling due to shutdown" << dendl; m_timer_op_tracker.finish_op(); return; } if (r < 0) { if (r == -EAGAIN) { dout(10) << "already locked" << dendl; } else { derr << "error acquiring lock: " << cpp_strerror(r) << dendl; } get_locker(); return; } m_locker = {}; m_acquire_attempts = 0; if (m_ret_val) { dout(5) << "releasing due to error on notify" << dendl; release_leader_lock(); m_timer_op_tracker.finish_op(); return; } notify_heartbeat(); } template void LeaderWatcher::release_leader_lock() { dout(10) << dendl; ceph_assert(ceph_mutex_is_locked(m_lock)); Context *ctx = create_async_context_callback( m_work_queue, create_context_callback< LeaderWatcher, &LeaderWatcher::handle_release_leader_lock>(this)); m_leader_lock->release_lock(ctx); } template void LeaderWatcher::handle_release_leader_lock(int r) { dout(10) << "r=" << r << dendl; std::scoped_lock locker{m_threads->timer_lock, m_lock}; if (r < 0) { derr << "error releasing lock: " << cpp_strerror(r) << dendl; return; } schedule_acquire_leader_lock(1); } template void LeaderWatcher::init_instances() { dout(10) << dendl; ceph_assert(ceph_mutex_is_locked(m_lock)); ceph_assert(m_instances == nullptr); m_instances = Instances::create(m_threads, m_ioctx, m_instance_id, m_instances_listener); Context *ctx = create_context_callback< LeaderWatcher, &LeaderWatcher::handle_init_instances>(this); m_instances->init(ctx); } template void LeaderWatcher::handle_init_instances(int r) { dout(10) << "r=" << r << dendl; Context *on_finish = nullptr; if (r < 0) { std::lock_guard locker{m_lock}; derr << "error initializing instances: " << cpp_strerror(r) << dendl; m_instances->destroy(); m_instances = nullptr; ceph_assert(m_on_finish != nullptr); std::swap(m_on_finish, on_finish); } else { std::lock_guard locker{m_lock}; notify_listener(); return; } on_finish->complete(r); } template void LeaderWatcher::shut_down_instances() { dout(10) << dendl; ceph_assert(ceph_mutex_is_locked(m_lock)); ceph_assert(m_instances != nullptr); Context *ctx = create_async_context_callback( m_work_queue, create_context_callback, &LeaderWatcher::handle_shut_down_instances>(this)); m_instances->shut_down(ctx); } template void LeaderWatcher::handle_shut_down_instances(int r) { dout(10) << "r=" << r << dendl; ceph_assert(r == 0); Context *on_finish = nullptr; { std::lock_guard locker{m_lock}; m_instances->destroy(); m_instances = nullptr; ceph_assert(m_on_finish != nullptr); std::swap(m_on_finish, on_finish); } on_finish->complete(r); } template void LeaderWatcher::notify_listener() { dout(10) << dendl; ceph_assert(ceph_mutex_is_locked(m_lock)); Context *ctx = create_async_context_callback( m_work_queue, create_context_callback< LeaderWatcher, &LeaderWatcher::handle_notify_listener>(this)); if (is_leader(m_lock)) { ctx = new LambdaContext( [this, ctx](int r) { m_listener->post_acquire_handler(ctx); }); } else { ctx = new LambdaContext( [this, ctx](int r) { m_listener->pre_release_handler(ctx); }); } m_work_queue->queue(ctx, 0); } template void LeaderWatcher::handle_notify_listener(int r) { dout(10) << "r=" << r << dendl; std::lock_guard locker{m_lock}; if (r < 0) { derr << "error notifying listener: " << cpp_strerror(r) << dendl; m_ret_val = r; } if (is_leader(m_lock)) { notify_lock_acquired(); } else { shut_down_instances(); } } template void LeaderWatcher::notify_lock_acquired() { dout(10) << dendl; ceph_assert(ceph_mutex_is_locked(m_lock)); Context *ctx = create_context_callback< LeaderWatcher, &LeaderWatcher::handle_notify_lock_acquired>(this); bufferlist bl; encode(NotifyMessage{LockAcquiredPayload{}}, bl); send_notify(bl, nullptr, ctx); } template void LeaderWatcher::handle_notify_lock_acquired(int r) { dout(10) << "r=" << r << dendl; Context *on_finish = nullptr; { std::lock_guard locker{m_lock}; if (r < 0 && r != -ETIMEDOUT) { derr << "error notifying leader lock acquired: " << cpp_strerror(r) << dendl; m_ret_val = r; } ceph_assert(m_on_finish != nullptr); std::swap(m_on_finish, on_finish); if (m_ret_val == 0) { // listener should be ready for instance add/remove events now m_instances->unblock_listener(); } } on_finish->complete(0); } template void LeaderWatcher::notify_lock_released() { dout(10) << dendl; ceph_assert(ceph_mutex_is_locked(m_lock)); Context *ctx = create_context_callback< LeaderWatcher, &LeaderWatcher::handle_notify_lock_released>(this); bufferlist bl; encode(NotifyMessage{LockReleasedPayload{}}, bl); send_notify(bl, nullptr, ctx); } template void LeaderWatcher::handle_notify_lock_released(int r) { dout(10) << "r=" << r << dendl; Context *on_finish = nullptr; { std::lock_guard locker{m_lock}; if (r < 0 && r != -ETIMEDOUT) { derr << "error notifying leader lock released: " << cpp_strerror(r) << dendl; } ceph_assert(m_on_finish != nullptr); std::swap(m_on_finish, on_finish); } on_finish->complete(r); } template void LeaderWatcher::notify_heartbeat() { dout(10) << dendl; ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); ceph_assert(ceph_mutex_is_locked(m_lock)); ceph_assert(!m_timer_op_tracker.empty()); if (!is_leader(m_lock)) { dout(5) << "not leader, canceling" << dendl; m_timer_op_tracker.finish_op(); return; } Context *ctx = create_context_callback< LeaderWatcher, &LeaderWatcher::handle_notify_heartbeat>(this); bufferlist bl; encode(NotifyMessage{HeartbeatPayload{}}, bl); m_heartbeat_response.acks.clear(); send_notify(bl, &m_heartbeat_response, ctx); } template void LeaderWatcher::handle_notify_heartbeat(int r) { dout(10) << "r=" << r << dendl; std::scoped_lock locker{m_threads->timer_lock, m_lock}; ceph_assert(!m_timer_op_tracker.empty()); m_timer_op_tracker.finish_op(); if (m_leader_lock->is_shutdown()) { dout(10) << "canceling due to shutdown" << dendl; return; } else if (!is_leader(m_lock)) { return; } if (r < 0 && r != -ETIMEDOUT) { derr << "error notifying heartbeat: " << cpp_strerror(r) << ", releasing leader" << dendl; release_leader_lock(); return; } dout(10) << m_heartbeat_response.acks.size() << " acks received, " << m_heartbeat_response.timeouts.size() << " timed out" << dendl; std::vector instance_ids; for (auto &it: m_heartbeat_response.acks) { uint64_t notifier_id = it.first.gid; instance_ids.push_back(stringify(notifier_id)); } if (!instance_ids.empty()) { m_instances->acked(instance_ids); } schedule_timer_task("heartbeat", 1, true, &LeaderWatcher::notify_heartbeat, false); } template void LeaderWatcher::handle_heartbeat(Context *on_notify_ack) { dout(10) << dendl; { std::scoped_lock locker{m_threads->timer_lock, m_lock}; if (is_leader(m_lock)) { dout(5) << "got another leader heartbeat, ignoring" << dendl; } else if (!m_locker.cookie.empty()) { cancel_timer_task(); m_acquire_attempts = 0; schedule_acquire_leader_lock(1); } } on_notify_ack->complete(0); } template void LeaderWatcher::handle_lock_acquired(Context *on_notify_ack) { dout(10) << dendl; { std::scoped_lock locker{m_threads->timer_lock, m_lock}; if (is_leader(m_lock)) { dout(5) << "got another leader lock_acquired, ignoring" << dendl; } else { cancel_timer_task(); schedule_get_locker(true, 0); } } on_notify_ack->complete(0); } template void LeaderWatcher::handle_lock_released(Context *on_notify_ack) { dout(10) << dendl; { std::scoped_lock locker{m_threads->timer_lock, m_lock}; if (is_leader(m_lock)) { dout(5) << "got another leader lock_released, ignoring" << dendl; } else { cancel_timer_task(); schedule_get_locker(true, 0); } } on_notify_ack->complete(0); } template void LeaderWatcher::handle_notify(uint64_t notify_id, uint64_t handle, uint64_t notifier_id, bufferlist &bl) { dout(10) << "notify_id=" << notify_id << ", handle=" << handle << ", " << "notifier_id=" << notifier_id << dendl; Context *ctx = new C_NotifyAck(this, notify_id, handle); if (notifier_id == m_notifier_id) { dout(10) << "our own notification, ignoring" << dendl; ctx->complete(0); return; } NotifyMessage notify_message; try { auto iter = bl.cbegin(); decode(notify_message, iter); } catch (const buffer::error &err) { derr << "error decoding image notification: " << err.what() << dendl; ctx->complete(0); return; } apply_visitor(HandlePayloadVisitor(this, ctx), notify_message.payload); } template void LeaderWatcher::handle_rewatch_complete(int r) { dout(5) << "r=" << r << dendl; if (r == -EBLOCKLISTED) { dout(1) << "blocklisted detected" << dendl; m_blocklisted = true; return; } m_leader_lock->reacquire_lock(nullptr); } template void LeaderWatcher::handle_payload(const HeartbeatPayload &payload, Context *on_notify_ack) { dout(10) << "heartbeat" << dendl; handle_heartbeat(on_notify_ack); } template void LeaderWatcher::handle_payload(const LockAcquiredPayload &payload, Context *on_notify_ack) { dout(10) << "lock_acquired" << dendl; handle_lock_acquired(on_notify_ack); } template void LeaderWatcher::handle_payload(const LockReleasedPayload &payload, Context *on_notify_ack) { dout(10) << "lock_released" << dendl; handle_lock_released(on_notify_ack); } template void LeaderWatcher::handle_payload(const UnknownPayload &payload, Context *on_notify_ack) { dout(10) << "unknown" << dendl; on_notify_ack->complete(0); } } // namespace mirror } // namespace rbd template class rbd::mirror::LeaderWatcher;