From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001
From: Daniel Baumann
Date: Sun, 7 Apr 2024 20:45:59 +0200
Subject: Adding upstream version 16.2.11+ds.

Signed-off-by: Daniel Baumann
---
 src/tools/rbd_mirror/LeaderWatcher.cc | 1069 +++++++++++++++++++++++++++++++++
 1 file changed, 1069 insertions(+)
 create mode 100644 src/tools/rbd_mirror/LeaderWatcher.cc

diff --git a/src/tools/rbd_mirror/LeaderWatcher.cc b/src/tools/rbd_mirror/LeaderWatcher.cc
new file mode 100644
index 000000000..8f12af14c
--- /dev/null
+++ b/src/tools/rbd_mirror/LeaderWatcher.cc
@@ -0,0 +1,1069 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "LeaderWatcher.h"
+#include "common/Cond.h"
+#include "common/Timer.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "cls/rbd/cls_rbd_client.h"
+#include "include/stringify.h"
+#include "librbd/Utils.h"
+#include "librbd/asio/ContextWQ.h"
+#include "librbd/watcher/Types.h"
+#include "Threads.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::LeaderWatcher: " \
+                           << this << " " << __func__ << ": "
+
+namespace rbd {
+namespace mirror {
+
+using namespace leader_watcher;
+
+using librbd::util::create_async_context_callback;
+using librbd::util::create_context_callback;
+using librbd::util::create_rados_callback;
+
+template <typename I>
+LeaderWatcher<I>::LeaderWatcher(Threads<I> *threads, librados::IoCtx &io_ctx,
+                                leader_watcher::Listener *listener)
+  : Watcher(io_ctx, threads->work_queue, RBD_MIRROR_LEADER),
+    m_threads(threads), m_listener(listener), m_instances_listener(this),
+    m_lock(ceph::make_mutex("rbd::mirror::LeaderWatcher " +
+                            io_ctx.get_pool_name())),
+    m_notifier_id(librados::Rados(io_ctx).get_instance_id()),
+    m_instance_id(stringify(m_notifier_id)),
+    m_leader_lock(new LeaderLock(m_ioctx, *m_threads->asio_engine, m_oid, this,
+                                 true, m_cct->_conf.get_val<uint64_t>(
+                                   "rbd_blocklist_expire_seconds"))) {
+}
+
+template <typename I>
+LeaderWatcher<I>::~LeaderWatcher() {
+  ceph_assert(m_instances == nullptr);
+  ceph_assert(m_timer_task == nullptr);
+
+  delete m_leader_lock;
+}
+
+template <typename I>
+std::string LeaderWatcher<I>::get_instance_id() {
+  return m_instance_id;
+}
+
+template <typename I>
+int LeaderWatcher<I>::init() {
+  C_SaferCond init_ctx;
+  init(&init_ctx);
+  return init_ctx.wait();
+}
+
+template <typename I>
+void LeaderWatcher<I>::init(Context *on_finish) {
+  dout(10) << "notifier_id=" << m_notifier_id << dendl;
+
+  std::lock_guard locker{m_lock};
+
+  ceph_assert(m_on_finish == nullptr);
+  m_on_finish = on_finish;
+
+  create_leader_object();
+}
+
+template <typename I>
+void LeaderWatcher<I>::create_leader_object() {
+  dout(10) << dendl;
+
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+
+  librados::ObjectWriteOperation op;
+  op.create(false);
+
+  librados::AioCompletion *aio_comp = create_rados_callback<
+    LeaderWatcher<I>, &LeaderWatcher<I>::handle_create_leader_object>(this);
+  int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
+  ceph_assert(r == 0);
+  aio_comp->release();
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_create_leader_object(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  Context *on_finish = nullptr;
+  {
+    std::lock_guard locker{m_lock};
+
+    if (r == 0) {
+      register_watch();
+      return;
+    }
+
+    derr << "error creating " << m_oid << " object: " << cpp_strerror(r)
+         << dendl;
+
+    std::swap(on_finish, m_on_finish);
+  }
+  on_finish->complete(r);
+}
+
+template <typename I>
+void LeaderWatcher<I>::register_watch() {
+  dout(10) << dendl;
+
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+
+  Context *ctx = create_async_context_callback(
+    m_work_queue, create_context_callback<
+      LeaderWatcher<I>, &LeaderWatcher<I>::handle_register_watch>(this));
+
+  librbd::Watcher::register_watch(ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_register_watch(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  Context *on_finish = nullptr;
+  {
+    std::lock_guard timer_locker(m_threads->timer_lock);
+    std::lock_guard locker{m_lock};
+
+    if (r < 0) {
+      derr << "error registering leader watcher for " << m_oid << " object: "
+           << cpp_strerror(r) << dendl;
+    } else {
+      schedule_acquire_leader_lock(0);
+    }
+
+    ceph_assert(m_on_finish != nullptr);
+    std::swap(on_finish, m_on_finish);
+  }
+
+  on_finish->complete(r);
+}
+
+template <typename I>
+void LeaderWatcher<I>::shut_down() {
+  C_SaferCond shut_down_ctx;
+  shut_down(&shut_down_ctx);
+  int r = shut_down_ctx.wait();
+  ceph_assert(r == 0);
+}
+
+template <typename I>
+void LeaderWatcher<I>::shut_down(Context *on_finish) {
+  dout(10) << dendl;
+
+  std::scoped_lock locker{m_threads->timer_lock, m_lock};
+
+  ceph_assert(m_on_shut_down_finish == nullptr);
+  m_on_shut_down_finish = on_finish;
+  cancel_timer_task();
+  shut_down_leader_lock();
+}
+
+template <typename I>
+void LeaderWatcher<I>::shut_down_leader_lock() {
+  dout(10) << dendl;
+
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+
+  Context *ctx = create_async_context_callback(
+    m_work_queue, create_context_callback<
+      LeaderWatcher<I>, &LeaderWatcher<I>::handle_shut_down_leader_lock>(this));
+
+  m_leader_lock->shut_down(ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_shut_down_leader_lock(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  std::lock_guard locker{m_lock};
+
+  if (r < 0) {
+    derr << "error shutting down leader lock: " << cpp_strerror(r) << dendl;
+  }
+
+  unregister_watch();
+}
+
+template <typename I>
+void LeaderWatcher<I>::unregister_watch() {
+  dout(10) << dendl;
+
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+
+  Context *ctx = create_async_context_callback(
+    m_work_queue, create_context_callback<
+      LeaderWatcher<I>, &LeaderWatcher<I>::handle_unregister_watch>(this));
+
+  librbd::Watcher::unregister_watch(ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_unregister_watch(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  if (r < 0) {
+    derr << "error unregistering leader watcher for " << m_oid << " object: "
+         << cpp_strerror(r) << dendl;
+  }
+  wait_for_tasks();
+}
+
+template <typename I>
+void LeaderWatcher<I>::wait_for_tasks() {
+  dout(10) << dendl;
+
+  std::scoped_lock locker{m_threads->timer_lock, m_lock};
+  schedule_timer_task("wait for tasks", 0, false,
+                      &LeaderWatcher<I>::handle_wait_for_tasks, true);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_wait_for_tasks() {
+  dout(10) << dendl;
+
+  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+  ceph_assert(m_on_shut_down_finish != nullptr);
+
+  ceph_assert(!m_timer_op_tracker.empty());
+  m_timer_op_tracker.finish_op();
+
+  auto ctx = new LambdaContext([this](int r) {
+      Context *on_finish;
+      {
+        // ensure lock isn't held when completing shut down
+        std::lock_guard locker{m_lock};
+        ceph_assert(m_on_shut_down_finish != nullptr);
+        on_finish = m_on_shut_down_finish;
+      }
+      on_finish->complete(0);
+    });
+  m_work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+bool LeaderWatcher<I>::is_blocklisted() const {
+  std::lock_guard locker{m_lock};
+  return m_blocklisted;
+}
+
+template <typename I>
+bool LeaderWatcher<I>::is_leader() const {
+  std::lock_guard locker{m_lock};
+  return is_leader(m_lock);
+}
+
+template <typename I>
+bool LeaderWatcher<I>::is_leader(ceph::mutex &lock) const {
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+
+  bool leader = m_leader_lock->is_leader();
+  dout(10) << leader << dendl;
+  return leader;
+}
+
+template <typename I>
+bool LeaderWatcher<I>::is_releasing_leader() const {
+  std::lock_guard locker{m_lock};
+  return is_releasing_leader(m_lock);
+}
+
+template <typename I>
+bool LeaderWatcher<I>::is_releasing_leader(ceph::mutex &lock) const {
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+
+  bool releasing = m_leader_lock->is_releasing_leader();
+  dout(10) << releasing << dendl;
+  return releasing;
+}
+
+template <typename I>
+bool LeaderWatcher<I>::get_leader_instance_id(std::string *instance_id) const {
+  dout(10) << dendl;
+
+  std::lock_guard locker{m_lock};
+
+  if (is_leader(m_lock) || is_releasing_leader(m_lock)) {
+    *instance_id = m_instance_id;
+    return true;
+  }
+
+  if (!m_locker.cookie.empty()) {
+    *instance_id = stringify(m_locker.entity.num());
+    return true;
+  }
+
+  return false;
+}
+
+template <typename I>
+void LeaderWatcher<I>::release_leader() {
+  dout(10) << dendl;
+
+  std::lock_guard locker{m_lock};
+  if (!is_leader(m_lock)) {
+    return;
+  }
+
+  release_leader_lock();
+}
+
+template <typename I>
+void LeaderWatcher<I>::list_instances(std::vector<std::string> *instance_ids) {
+  dout(10) << dendl;
+
+  std::lock_guard locker{m_lock};
+
+  instance_ids->clear();
+  if (m_instances != nullptr) {
+    m_instances->list(instance_ids);
+  }
+}
+
+template <typename I>
+void LeaderWatcher<I>::cancel_timer_task() {
+  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+
+  if (m_timer_task == nullptr) {
+    return;
+  }
+
+  dout(10) << m_timer_task << dendl;
+  bool canceled = m_threads->timer->cancel_event(m_timer_task);
+  ceph_assert(canceled);
+  m_timer_task = nullptr;
+}
+
+template <typename I>
+void LeaderWatcher<I>::schedule_timer_task(const std::string &name,
+                                           int delay_factor, bool leader,
+                                           TimerCallback timer_callback,
+                                           bool shutting_down) {
+  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+
+  if (!shutting_down && m_on_shut_down_finish != nullptr) {
+    return;
+  }
+
+  cancel_timer_task();
+
+  m_timer_task = new LambdaContext(
+    [this, leader, timer_callback](int r) {
+      ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
+      m_timer_task = nullptr;
+
+      if (m_timer_op_tracker.empty()) {
+        std::lock_guard locker{m_lock};
+        execute_timer_task(leader, timer_callback);
+        return;
+      }
+
+      // old timer task is still running -- do not start next
+      // task until the previous task completes
+      if (m_timer_gate == nullptr) {
+        m_timer_gate = new C_TimerGate(this);
+        m_timer_op_tracker.wait_for_ops(m_timer_gate);
+      }
+      m_timer_gate->leader = leader;
+      m_timer_gate->timer_callback = timer_callback;
+    });
+
+  int after = delay_factor * m_cct->_conf.get_val<uint64_t>(
+    "rbd_mirror_leader_heartbeat_interval");
+
+  dout(10) << "scheduling " << name << " after " << after << " sec (task "
+           << m_timer_task << ")" << dendl;
+  m_threads->timer->add_event_after(after, m_timer_task);
+}
+
+template <typename I>
+void LeaderWatcher<I>::execute_timer_task(bool leader,
+                                          TimerCallback timer_callback) {
+  dout(10) << dendl;
+
+  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+  ceph_assert(m_timer_op_tracker.empty());
+
+  if (is_leader(m_lock) != leader) {
+    return;
+  }
+
+  m_timer_op_tracker.start_op();
+  (this->*timer_callback)();
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_post_acquire_leader_lock(int r,
+                                                       Context *on_finish) {
+  dout(10) << "r=" << r << dendl;
+
+  if (r < 0) {
+    if (r == -EAGAIN) {
+      dout(10) << "already locked" << dendl;
+    } else {
+      derr << "error acquiring leader lock: " << cpp_strerror(r) << dendl;
+    }
+    on_finish->complete(r);
+    return;
+  }
+
+  std::lock_guard locker{m_lock};
+  ceph_assert(m_on_finish == nullptr);
+  m_on_finish = on_finish;
+  m_ret_val = 0;
+
+  init_instances();
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_pre_release_leader_lock(Context *on_finish) {
+  dout(10) << dendl;
+
+  std::lock_guard locker{m_lock};
+  ceph_assert(m_on_finish == nullptr);
+  m_on_finish = on_finish;
+  m_ret_val = 0;
+
+  notify_listener();
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_post_release_leader_lock(int r,
+                                                       Context *on_finish) {
+  dout(10) << "r=" << r << dendl;
+
+  if (r < 0) {
+    on_finish->complete(r);
+    return;
+  }
+
+  std::lock_guard locker{m_lock};
+  ceph_assert(m_on_finish == nullptr);
+  m_on_finish = on_finish;
+
+  notify_lock_released();
+}
+
+template <typename I>
+void LeaderWatcher<I>::break_leader_lock() {
+  dout(10) << dendl;
+
+  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+  ceph_assert(!m_timer_op_tracker.empty());
+
+  if (m_locker.cookie.empty()) {
+    get_locker();
+    return;
+  }
+
+  Context *ctx = create_async_context_callback(
+    m_work_queue, create_context_callback<
+      LeaderWatcher<I>, &LeaderWatcher<I>::handle_break_leader_lock>(this));
+
+  m_leader_lock->break_lock(m_locker, true, ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_break_leader_lock(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  std::scoped_lock locker{m_threads->timer_lock, m_lock};
+  ceph_assert(!m_timer_op_tracker.empty());
+
+  if (m_leader_lock->is_shutdown()) {
+    dout(10) << "canceling due to shutdown" << dendl;
+    m_timer_op_tracker.finish_op();
+    return;
+  }
+
+  if (r < 0 && r != -ENOENT) {
+    derr << "error breaking leader lock: " << cpp_strerror(r) << dendl;
+    schedule_acquire_leader_lock(1);
+    m_timer_op_tracker.finish_op();
+    return;
+  }
+
+  m_locker = {};
+  m_acquire_attempts = 0;
+  acquire_leader_lock();
+}
+
+template <typename I>
+void LeaderWatcher<I>::schedule_get_locker(bool reset_leader,
+                                           uint32_t delay_factor) {
+  dout(10) << dendl;
+
+  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+
+  if (reset_leader) {
+    m_locker = {};
+    m_acquire_attempts = 0;
+  }
+
+  schedule_timer_task("get locker", delay_factor, false,
+                      &LeaderWatcher<I>::get_locker, false);
+}
+
+template <typename I>
+void LeaderWatcher<I>::get_locker() {
+  dout(10) << dendl;
+
+  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+  ceph_assert(!m_timer_op_tracker.empty());
+
+  C_GetLocker *get_locker_ctx = new C_GetLocker(this);
+  Context *ctx = create_async_context_callback(m_work_queue, get_locker_ctx);
+
+  m_leader_lock->get_locker(&get_locker_ctx->locker, ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_get_locker(int r,
+                                         librbd::managed_lock::Locker& locker) {
+  dout(10) << "r=" << r << dendl;
+
+  std::scoped_lock l{m_threads->timer_lock, m_lock};
+  ceph_assert(!m_timer_op_tracker.empty());
+
+  if (m_leader_lock->is_shutdown()) {
+    dout(10) << "canceling due to shutdown" << dendl;
+    m_timer_op_tracker.finish_op();
+    return;
+  }
+
+  if (is_leader(m_lock)) {
+    m_locker = {};
+    m_timer_op_tracker.finish_op();
+    return;
+  }
+
+  if (r == -ENOENT) {
+    m_locker = {};
+    m_acquire_attempts = 0;
+    acquire_leader_lock();
+    return;
+  } else if (r < 0) {
+    derr << "error retrieving leader locker: " << cpp_strerror(r) << dendl;
+    schedule_get_locker(true, 1);
+    m_timer_op_tracker.finish_op();
+    return;
+  }
+
+  bool notify_listener = false;
+  if (m_locker != locker) {
+    m_locker = locker;
+    notify_listener = true;
+    if (m_acquire_attempts > 1) {
+      dout(10) << "new lock owner detected -- resetting heartbeat counter"
+               << dendl;
+      m_acquire_attempts = 0;
+    }
+  }
+
+  if (m_acquire_attempts >= m_cct->_conf.get_val<uint64_t>(
+        "rbd_mirror_leader_max_acquire_attempts_before_break")) {
+    dout(0) << "breaking leader lock after " << m_acquire_attempts << " "
+            << "failed attempts to acquire" << dendl;
+    break_leader_lock();
+    return;
+  }
+
+  schedule_acquire_leader_lock(1);
+
+  if (!notify_listener) {
+    m_timer_op_tracker.finish_op();
+    return;
+  }
+
+  auto ctx = new LambdaContext(
+    [this](int r) {
+      std::string instance_id;
+      if (get_leader_instance_id(&instance_id)) {
+        m_listener->update_leader_handler(instance_id);
+      }
+      std::scoped_lock locker{m_threads->timer_lock, m_lock};
+      m_timer_op_tracker.finish_op();
+    });
+  m_work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void LeaderWatcher<I>::schedule_acquire_leader_lock(uint32_t delay_factor) {
+  dout(10) << dendl;
+
+  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+
+  schedule_timer_task("acquire leader lock",
+                      delay_factor *
+                        m_cct->_conf.get_val<uint64_t>(
+                          "rbd_mirror_leader_max_missed_heartbeats"),
+                      false, &LeaderWatcher<I>::acquire_leader_lock, false);
+}
+
+template <typename I>
+void LeaderWatcher<I>::acquire_leader_lock() {
+  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+  ceph_assert(!m_timer_op_tracker.empty());
+
+  ++m_acquire_attempts;
+  dout(10) << "acquire_attempts=" << m_acquire_attempts << dendl;
+
+  Context *ctx = create_async_context_callback(
+    m_work_queue, create_context_callback<
+      LeaderWatcher<I>, &LeaderWatcher<I>::handle_acquire_leader_lock>(this));
+  m_leader_lock->try_acquire_lock(ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_acquire_leader_lock(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  std::scoped_lock locker{m_threads->timer_lock, m_lock};
+  ceph_assert(!m_timer_op_tracker.empty());
+
+  if (m_leader_lock->is_shutdown()) {
+    dout(10) << "canceling due to shutdown" << dendl;
+    m_timer_op_tracker.finish_op();
+    return;
+  }
+
+  if (r < 0) {
+    if (r == -EAGAIN) {
+      dout(10) << "already locked" << dendl;
+    } else {
+      derr << "error acquiring lock: " << cpp_strerror(r) << dendl;
+    }
+
+    get_locker();
+    return;
+  }
+
+  m_locker = {};
+  m_acquire_attempts = 0;
+
+  if (m_ret_val) {
+    dout(5) << "releasing due to error on notify" << dendl;
+    release_leader_lock();
+    m_timer_op_tracker.finish_op();
+    return;
+  }
+
+  notify_heartbeat();
+}
+
+template <typename I>
+void LeaderWatcher<I>::release_leader_lock() {
+  dout(10) << dendl;
+
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+
+  Context *ctx = create_async_context_callback(
+    m_work_queue, create_context_callback<
+      LeaderWatcher<I>, &LeaderWatcher<I>::handle_release_leader_lock>(this));
+
+  m_leader_lock->release_lock(ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_release_leader_lock(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  std::scoped_lock locker{m_threads->timer_lock, m_lock};
+
+  if (r < 0) {
+    derr << "error releasing lock: " << cpp_strerror(r) << dendl;
+    return;
+  }
+
+  schedule_acquire_leader_lock(1);
+}
+
+template <typename I>
+void LeaderWatcher<I>::init_instances() {
+  dout(10) << dendl;
+
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+  ceph_assert(m_instances == nullptr);
+
+  m_instances = Instances<I>::create(m_threads, m_ioctx, m_instance_id,
+                                     m_instances_listener);
+
+  Context *ctx = create_context_callback<
+    LeaderWatcher<I>, &LeaderWatcher<I>::handle_init_instances>(this);
+
+  m_instances->init(ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_init_instances(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  Context *on_finish = nullptr;
+  if (r < 0) {
+    std::lock_guard locker{m_lock};
+    derr << "error initializing instances: " << cpp_strerror(r) << dendl;
+    m_instances->destroy();
+    m_instances = nullptr;
+
+    ceph_assert(m_on_finish != nullptr);
+    std::swap(m_on_finish, on_finish);
+  } else {
+    std::lock_guard locker{m_lock};
+    notify_listener();
+    return;
+  }
+
+  on_finish->complete(r);
+}
+
+template <typename I>
+void LeaderWatcher<I>::shut_down_instances() {
+  dout(10) << dendl;
+
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+  ceph_assert(m_instances != nullptr);
+
+  Context *ctx = create_async_context_callback(
+    m_work_queue, create_context_callback<LeaderWatcher<I>,
+      &LeaderWatcher<I>::handle_shut_down_instances>(this));
+
+  m_instances->shut_down(ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_shut_down_instances(int r) {
+  dout(10) << "r=" << r << dendl;
+  ceph_assert(r == 0);
+
+  Context *on_finish = nullptr;
+  {
+    std::lock_guard locker{m_lock};
+
+    m_instances->destroy();
+    m_instances = nullptr;
+
+    ceph_assert(m_on_finish != nullptr);
+    std::swap(m_on_finish, on_finish);
+  }
+  on_finish->complete(r);
+}
+
+template <typename I>
+void LeaderWatcher<I>::notify_listener() {
+  dout(10) << dendl;
+
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+
+  Context *ctx = create_async_context_callback(
+    m_work_queue, create_context_callback<
+      LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_listener>(this));
+
+  if (is_leader(m_lock)) {
+    ctx = new LambdaContext(
+      [this, ctx](int r) {
+        m_listener->post_acquire_handler(ctx);
+      });
+  } else {
+    ctx = new LambdaContext(
+      [this, ctx](int r) {
+        m_listener->pre_release_handler(ctx);
+      });
+  }
+  m_work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_notify_listener(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  std::lock_guard locker{m_lock};
+
+  if (r < 0) {
+    derr << "error notifying listener: " << cpp_strerror(r) << dendl;
+    m_ret_val = r;
+  }
+
+  if (is_leader(m_lock)) {
+    notify_lock_acquired();
+  } else {
+    shut_down_instances();
+  }
+}
+
+template <typename I>
+void LeaderWatcher<I>::notify_lock_acquired() {
+  dout(10) << dendl;
+
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+
+  Context *ctx = create_context_callback<
+    LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_lock_acquired>(this);
+
+  bufferlist bl;
+  encode(NotifyMessage{LockAcquiredPayload{}}, bl);
+
+  send_notify(bl, nullptr, ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_notify_lock_acquired(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  Context *on_finish = nullptr;
+  {
+    std::lock_guard locker{m_lock};
+    if (r < 0 && r != -ETIMEDOUT) {
+      derr << "error notifying leader lock acquired: " << cpp_strerror(r)
+           << dendl;
+      m_ret_val = r;
+    }
+
+    ceph_assert(m_on_finish != nullptr);
+    std::swap(m_on_finish, on_finish);
+
+    if (m_ret_val == 0) {
+      // listener should be ready for instance add/remove events now
+      m_instances->unblock_listener();
+    }
+  }
+  on_finish->complete(0);
+}
+
+template <typename I>
+void LeaderWatcher<I>::notify_lock_released() {
+  dout(10) << dendl;
+
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+
+  Context *ctx = create_context_callback<
+    LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_lock_released>(this);
+
+  bufferlist bl;
+  encode(NotifyMessage{LockReleasedPayload{}}, bl);
+
+  send_notify(bl, nullptr, ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_notify_lock_released(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  Context *on_finish = nullptr;
+  {
+    std::lock_guard locker{m_lock};
+    if (r < 0 && r != -ETIMEDOUT) {
+      derr << "error notifying leader lock released: " << cpp_strerror(r)
+           << dendl;
+    }
+
+    ceph_assert(m_on_finish != nullptr);
+    std::swap(m_on_finish, on_finish);
+  }
+  on_finish->complete(r);
+}
+
+template <typename I>
+void LeaderWatcher<I>::notify_heartbeat() {
+  dout(10) << dendl;
+
+  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+  ceph_assert(!m_timer_op_tracker.empty());
+
+  if (!is_leader(m_lock)) {
+    dout(5) << "not leader, canceling" << dendl;
+    m_timer_op_tracker.finish_op();
+    return;
+  }
+
+  Context *ctx = create_context_callback<
+    LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_heartbeat>(this);
+
+  bufferlist bl;
+  encode(NotifyMessage{HeartbeatPayload{}}, bl);
+
+  m_heartbeat_response.acks.clear();
+  send_notify(bl, &m_heartbeat_response, ctx);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_notify_heartbeat(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  std::scoped_lock locker{m_threads->timer_lock, m_lock};
+  ceph_assert(!m_timer_op_tracker.empty());
+
+  m_timer_op_tracker.finish_op();
+  if (m_leader_lock->is_shutdown()) {
+    dout(10) << "canceling due to shutdown" << dendl;
+    return;
+  } else if (!is_leader(m_lock)) {
+    return;
+  }
+
+  if (r < 0 && r != -ETIMEDOUT) {
+    derr << "error notifying heartbeat: " << cpp_strerror(r)
+         << ", releasing leader" << dendl;
+    release_leader_lock();
+    return;
+  }
+
+  dout(10) << m_heartbeat_response.acks.size() << " acks received, "
+           << m_heartbeat_response.timeouts.size() << " timed out" << dendl;
+
+  std::vector<std::string> instance_ids;
+  for (auto &it: m_heartbeat_response.acks) {
+    uint64_t notifier_id = it.first.gid;
+    instance_ids.push_back(stringify(notifier_id));
+  }
+  if (!instance_ids.empty()) {
+    m_instances->acked(instance_ids);
+  }
+
+  schedule_timer_task("heartbeat", 1, true,
+                      &LeaderWatcher<I>::notify_heartbeat, false);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_heartbeat(Context *on_notify_ack) {
+  dout(10) << dendl;
+
+  {
+    std::scoped_lock locker{m_threads->timer_lock, m_lock};
+    if (is_leader(m_lock)) {
+      dout(5) << "got another leader heartbeat, ignoring" << dendl;
+    } else if (!m_locker.cookie.empty()) {
+      cancel_timer_task();
+      m_acquire_attempts = 0;
+      schedule_acquire_leader_lock(1);
+    }
+  }
+
+  on_notify_ack->complete(0);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_lock_acquired(Context *on_notify_ack) {
+  dout(10) << dendl;
+
+  {
+    std::scoped_lock locker{m_threads->timer_lock, m_lock};
+    if (is_leader(m_lock)) {
+      dout(5) << "got another leader lock_acquired, ignoring" << dendl;
+    } else {
+      cancel_timer_task();
+      schedule_get_locker(true, 0);
+    }
+  }
+
+  on_notify_ack->complete(0);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_lock_released(Context *on_notify_ack) {
+  dout(10) << dendl;
+
+  {
+    std::scoped_lock locker{m_threads->timer_lock, m_lock};
+    if (is_leader(m_lock)) {
+      dout(5) << "got another leader lock_released, ignoring" << dendl;
+    } else {
+      cancel_timer_task();
+      schedule_get_locker(true, 0);
+    }
+  }
+
+  on_notify_ack->complete(0);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
+                                     uint64_t notifier_id, bufferlist &bl) {
+  dout(10) << "notify_id=" << notify_id << ", handle=" << handle << ", "
+           << "notifier_id=" << notifier_id << dendl;
+
+  Context *ctx = new C_NotifyAck(this, notify_id, handle);
+
+  if (notifier_id == m_notifier_id) {
+    dout(10) << "our own notification, ignoring" << dendl;
+    ctx->complete(0);
+    return;
+  }
+
+  NotifyMessage notify_message;
+  try {
+    auto iter = bl.cbegin();
+    decode(notify_message, iter);
+  } catch (const buffer::error &err) {
+    derr << "error decoding image notification: " << err.what() << dendl;
+    ctx->complete(0);
+    return;
+  }
+
+  apply_visitor(HandlePayloadVisitor<LeaderWatcher<I>>(this, ctx),
+                notify_message.payload);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_rewatch_complete(int r) {
+  dout(5) << "r=" << r << dendl;
+
+  if (r == -EBLOCKLISTED) {
+    dout(1) << "blocklisted detected" << dendl;
+    m_blocklisted = true;
+    return;
+  }
+
+  m_leader_lock->reacquire_lock(nullptr);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_payload(const HeartbeatPayload &payload,
+                                      Context *on_notify_ack) {
+  dout(10) << "heartbeat" << dendl;
+
+  handle_heartbeat(on_notify_ack);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_payload(const LockAcquiredPayload &payload,
+                                      Context *on_notify_ack) {
+  dout(10) << "lock_acquired" << dendl;
+
+  handle_lock_acquired(on_notify_ack);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_payload(const LockReleasedPayload &payload,
+                                      Context *on_notify_ack) {
+  dout(10) << "lock_released" << dendl;
+
+  handle_lock_released(on_notify_ack);
+}
+
+template <typename I>
+void LeaderWatcher<I>::handle_payload(const UnknownPayload &payload,
+                                      Context *on_notify_ack) {
+  dout(10) << "unknown" << dendl;
+
+  on_notify_ack->complete(0);
+}
+
+} // namespace mirror
+} // namespace rbd
+
+template class rbd::mirror::LeaderWatcher<librbd::ImageCtx>;