From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/tools/rbd_mirror/PoolReplayer.h | 288 ++++++++++++++++++++++++++++++++++++ 1 file changed, 288 insertions(+) create mode 100644 src/tools/rbd_mirror/PoolReplayer.h (limited to 'src/tools/rbd_mirror/PoolReplayer.h') diff --git a/src/tools/rbd_mirror/PoolReplayer.h b/src/tools/rbd_mirror/PoolReplayer.h new file mode 100644 index 000000000..e0fd75377 --- /dev/null +++ b/src/tools/rbd_mirror/PoolReplayer.h @@ -0,0 +1,288 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_RBD_MIRROR_POOL_REPLAYER_H +#define CEPH_RBD_MIRROR_POOL_REPLAYER_H + +#include "common/Cond.h" +#include "common/ceph_mutex.h" +#include "include/rados/librados.hpp" +#include "librbd/Utils.h" +#include "librbd/asio/ContextWQ.h" + +#include "tools/rbd_mirror/LeaderWatcher.h" +#include "tools/rbd_mirror/NamespaceReplayer.h" +#include "tools/rbd_mirror/Throttler.h" +#include "tools/rbd_mirror/Types.h" +#include "tools/rbd_mirror/leader_watcher/Types.h" +#include "tools/rbd_mirror/service_daemon/Types.h" + +#include +#include +#include +#include + +class AdminSocketHook; + +namespace journal { struct CacheManagerHandler; } + +namespace librbd { class ImageCtx; } + +namespace rbd { +namespace mirror { + +template class RemotePoolPoller; +namespace remote_pool_poller { struct Listener; } + +struct PoolMetaCache; +template class ServiceDaemon; +template struct Threads; + + +/** + * Controls mirroring for a single remote cluster. + */ +template +class PoolReplayer { +public: + PoolReplayer(Threads *threads, + ServiceDaemon *service_daemon, + journal::CacheManagerHandler *cache_manager_handler, + PoolMetaCache* pool_meta_cache, + int64_t local_pool_id, const PeerSpec &peer, + const std::vector &args); + ~PoolReplayer(); + PoolReplayer(const PoolReplayer&) = delete; + PoolReplayer& operator=(const PoolReplayer&) = delete; + + bool is_blocklisted() const; + bool is_leader() const; + bool is_running() const; + + void init(const std::string& site_name); + void shut_down(); + + void run(); + + void print_status(Formatter *f); + void start(); + void stop(bool manual); + void restart(); + void flush(); + void release_leader(); + void reopen_logs(); + +private: + /** + * @verbatim + * + * + * | + * v + * INIT + * | + * v + * <---------------------\ + * . | + * . (leader acquired) | + * v | + * NOTIFY_NAMESPACE_WATCHERS NOTIFY_NAMESPACE_WATCHERS + * | ^ + * v . + * . + * . . + * . (leader lost / shut down) . + * . . . . . . . . . . . . . . . . + * + * @endverbatim + */ + + struct RemotePoolPollerListener; + + int init_rados(const std::string &cluster_name, + const std::string &client_name, + const std::string &mon_host, + const std::string &key, + const std::string &description, RadosRef *rados_ref, + bool strip_cluster_overrides); + + void update_namespace_replayers(); + int list_mirroring_namespaces(std::set *namespaces); + + void namespace_replayer_acquire_leader(const std::string &name, + Context *on_finish); + + void handle_post_acquire_leader(Context *on_finish); + void handle_pre_release_leader(Context *on_finish); + + void handle_update_leader(const std::string &leader_instance_id); + + void handle_instances_added(const std::vector &instance_ids); + void handle_instances_removed(const std::vector &instance_ids); + + // sync version, executed in the caller thread + template + void with_namespace_replayers(L &&callback) { + std::lock_guard locker{m_lock}; + + if (m_namespace_replayers_locked) { + ceph_assert(m_on_namespace_replayers_unlocked == nullptr); + C_SaferCond cond; + m_on_namespace_replayers_unlocked = &cond; + m_lock.unlock(); + cond.wait(); + m_lock.lock(); + } else { + m_namespace_replayers_locked = true; + } + + ceph_assert(m_namespace_replayers_locked); + callback(); // may temporary release the lock + ceph_assert(m_namespace_replayers_locked); + + if (m_on_namespace_replayers_unlocked == nullptr) { + m_namespace_replayers_locked = false; + return; + } + + m_threads->work_queue->queue(m_on_namespace_replayers_unlocked); + m_on_namespace_replayers_unlocked = nullptr; + } + + // async version + template + void with_namespace_replayers(L &&callback, Context *on_finish) { + std::lock_guard locker{m_lock}; + + on_finish = librbd::util::create_async_context_callback( + m_threads->work_queue, new LambdaContext( + [this, on_finish](int r) { + { + std::lock_guard locker{m_lock}; + ceph_assert(m_namespace_replayers_locked); + + m_namespace_replayers_locked = false; + + if (m_on_namespace_replayers_unlocked != nullptr) { + m_namespace_replayers_locked = true; + m_threads->work_queue->queue(m_on_namespace_replayers_unlocked); + m_on_namespace_replayers_unlocked = nullptr; + } + } + on_finish->complete(r); + })); + + auto on_lock = new LambdaContext( + [this, callback, on_finish](int) { + std::lock_guard locker{m_lock}; + ceph_assert(m_namespace_replayers_locked); + + callback(on_finish); + }); + + if (m_namespace_replayers_locked) { + ceph_assert(m_on_namespace_replayers_unlocked == nullptr); + m_on_namespace_replayers_unlocked = on_lock; + return; + } + + m_namespace_replayers_locked = true; + m_threads->work_queue->queue(on_lock); + } + + void handle_remote_pool_meta_updated(const RemotePoolMeta& remote_pool_meta); + + Threads *m_threads; + ServiceDaemon *m_service_daemon; + journal::CacheManagerHandler *m_cache_manager_handler; + PoolMetaCache* m_pool_meta_cache; + int64_t m_local_pool_id = -1; + PeerSpec m_peer; + std::vector m_args; + + mutable ceph::mutex m_lock; + ceph::condition_variable m_cond; + std::string m_site_name; + bool m_stopping = false; + bool m_manual_stop = false; + bool m_blocklisted = false; + + RadosRef m_local_rados; + RadosRef m_remote_rados; + + librados::IoCtx m_local_io_ctx; + librados::IoCtx m_remote_io_ctx; + + std::string m_local_mirror_uuid; + + RemotePoolMeta m_remote_pool_meta; + std::unique_ptr m_remote_pool_poller_listener; + std::unique_ptr> m_remote_pool_poller; + + std::unique_ptr> m_default_namespace_replayer; + std::map *> m_namespace_replayers; + + std::string m_asok_hook_name; + AdminSocketHook *m_asok_hook = nullptr; + + service_daemon::CalloutId m_callout_id = service_daemon::CALLOUT_ID_NONE; + + bool m_leader = false; + bool m_namespace_replayers_locked = false; + Context *m_on_namespace_replayers_unlocked = nullptr; + + class PoolReplayerThread : public Thread { + PoolReplayer *m_pool_replayer; + public: + PoolReplayerThread(PoolReplayer *pool_replayer) + : m_pool_replayer(pool_replayer) { + } + void *entry() override { + m_pool_replayer->run(); + return 0; + } + } m_pool_replayer_thread; + + class LeaderListener : public leader_watcher::Listener { + public: + LeaderListener(PoolReplayer *pool_replayer) + : m_pool_replayer(pool_replayer) { + } + + protected: + void post_acquire_handler(Context *on_finish) override { + m_pool_replayer->handle_post_acquire_leader(on_finish); + } + + void pre_release_handler(Context *on_finish) override { + m_pool_replayer->handle_pre_release_leader(on_finish); + } + + void update_leader_handler( + const std::string &leader_instance_id) override { + m_pool_replayer->handle_update_leader(leader_instance_id); + } + + void handle_instances_added(const InstanceIds& instance_ids) override { + m_pool_replayer->handle_instances_added(instance_ids); + } + + void handle_instances_removed(const InstanceIds& instance_ids) override { + m_pool_replayer->handle_instances_removed(instance_ids); + } + + private: + PoolReplayer *m_pool_replayer; + } m_leader_listener; + + std::unique_ptr> m_leader_watcher; + std::unique_ptr> m_image_sync_throttler; + std::unique_ptr> m_image_deletion_throttler; +}; + +} // namespace mirror +} // namespace rbd + +extern template class rbd::mirror::PoolReplayer; + +#endif // CEPH_RBD_MIRROR_POOL_REPLAYER_H -- cgit v1.2.3