summaryrefslogtreecommitdiffstats
path: root/src/tools/rbd_mirror/InstanceWatcher.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/tools/rbd_mirror/InstanceWatcher.h
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/tools/rbd_mirror/InstanceWatcher.h')
-rw-r--r--src/tools/rbd_mirror/InstanceWatcher.h269
1 files changed, 269 insertions, 0 deletions
diff --git a/src/tools/rbd_mirror/InstanceWatcher.h b/src/tools/rbd_mirror/InstanceWatcher.h
new file mode 100644
index 000000000..08e40b40b
--- /dev/null
+++ b/src/tools/rbd_mirror/InstanceWatcher.h
@@ -0,0 +1,269 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_RBD_MIRROR_INSTANCE_WATCHER_H
+#define CEPH_RBD_MIRROR_INSTANCE_WATCHER_H
+
+#include <map>
+#include <memory>
+#include <set>
+#include <string>
+#include <vector>
+
+#include "common/AsyncOpTracker.h"
+#include "librbd/Watcher.h"
+#include "librbd/managed_lock/Types.h"
+#include "tools/rbd_mirror/instance_watcher/Types.h"
+
+namespace librbd {
+
+class AsioEngine;
+class ImageCtx;
+template <typename> class ManagedLock;
+
+} // namespace librbd
+
+namespace rbd {
+namespace mirror {
+
+template <typename> class InstanceReplayer;
+template <typename> class Throttler;
+template <typename> struct Threads;
+
+template <typename ImageCtxT = librbd::ImageCtx>
+class InstanceWatcher : protected librbd::Watcher {
+ using librbd::Watcher::unregister_watch; // Silence overloaded virtual warning
+public:
+ static void get_instances(librados::IoCtx &io_ctx,
+ std::vector<std::string> *instance_ids,
+ Context *on_finish);
+ static void remove_instance(librados::IoCtx &io_ctx,
+ librbd::AsioEngine& asio_engine,
+ const std::string &instance_id,
+ Context *on_finish);
+
+ static InstanceWatcher *create(
+ librados::IoCtx &io_ctx, librbd::AsioEngine& asio_engine,
+ InstanceReplayer<ImageCtxT> *instance_replayer,
+ Throttler<ImageCtxT> *image_sync_throttler);
+ void destroy() {
+ delete this;
+ }
+
+ InstanceWatcher(librados::IoCtx &io_ctx, librbd::AsioEngine& asio_engine,
+ InstanceReplayer<ImageCtxT> *instance_replayer,
+ Throttler<ImageCtxT> *image_sync_throttler,
+ const std::string &instance_id);
+ ~InstanceWatcher() override;
+
+ inline std::string &get_instance_id() {
+ return m_instance_id;
+ }
+
+ int init();
+ void shut_down();
+
+ void init(Context *on_finish);
+ void shut_down(Context *on_finish);
+ void remove(Context *on_finish);
+
+ void notify_image_acquire(const std::string &instance_id,
+ const std::string &global_image_id,
+ Context *on_notify_ack);
+ void notify_image_release(const std::string &instance_id,
+ const std::string &global_image_id,
+ Context *on_notify_ack);
+ void notify_peer_image_removed(const std::string &instance_id,
+ const std::string &global_image_id,
+ const std::string &peer_mirror_uuid,
+ Context *on_notify_ack);
+
+ void notify_sync_request(const std::string &sync_id, Context *on_sync_start);
+ bool cancel_sync_request(const std::string &sync_id);
+ void notify_sync_complete(const std::string &sync_id);
+
+ void cancel_notify_requests(const std::string &instance_id);
+
+ void handle_acquire_leader();
+ void handle_release_leader();
+ void handle_update_leader(const std::string &leader_instance_id);
+
+private:
+ /**
+ * @verbatim
+ *
+ * BREAK_INSTANCE_LOCK -------\
+ * ^ |
+ * | (error) |
+ * GET_INSTANCE_LOCKER * * *>|
+ * ^ (remove) |
+ * | |
+ * <uninitialized> <----------------+---- WAIT_FOR_NOTIFY_OPS
+ * | (init) ^ | ^
+ * v (error) * | |
+ * REGISTER_INSTANCE * * * * * *|* *> UNREGISTER_INSTANCE
+ * | * | ^
+ * v (error) * v |
+ * CREATE_INSTANCE_OBJECT * * * * * *> REMOVE_INSTANCE_OBJECT
+ * | * ^
+ * v (error) * |
+ * REGISTER_WATCH * * * * * * * * * *> UNREGISTER_WATCH
+ * | * ^
+ * v (error) * |
+ * ACQUIRE_LOCK * * * * * * * * * * * RELEASE_LOCK
+ * | ^
+ * v (shut_down) |
+ * <watching> -------------------------------/
+ *
+ * @endverbatim
+ */
+
+ struct C_NotifyInstanceRequest;
+ struct C_SyncRequest;
+
+ typedef std::pair<std::string, std::string> Id;
+
+ struct HandlePayloadVisitor : public boost::static_visitor<void> {
+ InstanceWatcher *instance_watcher;
+ std::string instance_id;
+ C_NotifyAck *on_notify_ack;
+
+ HandlePayloadVisitor(InstanceWatcher *instance_watcher,
+ const std::string &instance_id,
+ C_NotifyAck *on_notify_ack)
+ : instance_watcher(instance_watcher), instance_id(instance_id),
+ on_notify_ack(on_notify_ack) {
+ }
+
+ template <typename Payload>
+ inline void operator()(const Payload &payload) const {
+ instance_watcher->handle_payload(instance_id, payload, on_notify_ack);
+ }
+ };
+
+ struct Request {
+ std::string instance_id;
+ uint64_t request_id;
+ C_NotifyAck *on_notify_ack = nullptr;
+
+ Request(const std::string &instance_id, uint64_t request_id)
+ : instance_id(instance_id), request_id(request_id) {
+ }
+
+ inline bool operator<(const Request &rhs) const {
+ return instance_id < rhs.instance_id ||
+ (instance_id == rhs.instance_id && request_id < rhs.request_id);
+ }
+ };
+
+ Threads<ImageCtxT> *m_threads;
+ InstanceReplayer<ImageCtxT> *m_instance_replayer;
+ Throttler<ImageCtxT> *m_image_sync_throttler;
+ std::string m_instance_id;
+
+ mutable ceph::mutex m_lock;
+ librbd::ManagedLock<ImageCtxT> *m_instance_lock;
+ Context *m_on_finish = nullptr;
+ int m_ret_val = 0;
+ std::string m_leader_instance_id;
+ librbd::managed_lock::Locker m_instance_locker;
+ std::set<std::pair<std::string, C_NotifyInstanceRequest *>> m_notify_ops;
+ AsyncOpTracker m_notify_op_tracker;
+ uint64_t m_request_seq = 0;
+ std::set<Request> m_requests;
+ std::set<C_NotifyInstanceRequest *> m_suspended_ops;
+ std::map<std::string, C_SyncRequest *> m_inflight_sync_reqs;
+
+ inline bool is_leader() const {
+ return m_leader_instance_id == m_instance_id;
+ }
+
+ void register_instance();
+ void handle_register_instance(int r);
+
+ void create_instance_object();
+ void handle_create_instance_object(int r);
+
+ void register_watch();
+ void handle_register_watch(int r);
+
+ void acquire_lock();
+ void handle_acquire_lock(int r);
+
+ void release_lock();
+ void handle_release_lock(int r);
+
+ void unregister_watch();
+ void handle_unregister_watch(int r);
+
+ void remove_instance_object();
+ void handle_remove_instance_object(int r);
+
+ void unregister_instance();
+ void handle_unregister_instance(int r);
+
+ void wait_for_notify_ops();
+ void handle_wait_for_notify_ops(int r);
+
+ void get_instance_locker();
+ void handle_get_instance_locker(int r);
+
+ void break_instance_lock();
+ void handle_break_instance_lock(int r);
+
+ void suspend_notify_request(C_NotifyInstanceRequest *req);
+ bool unsuspend_notify_request(C_NotifyInstanceRequest *req);
+ void unsuspend_notify_requests();
+
+ void notify_sync_complete(const ceph::mutex& lock, const std::string &sync_id);
+ void handle_notify_sync_request(C_SyncRequest *sync_ctx, int r);
+ void handle_notify_sync_complete(C_SyncRequest *sync_ctx, int r);
+
+ void notify_sync_start(const std::string &instance_id,
+ const std::string &sync_id);
+
+ Context *prepare_request(const std::string &instance_id, uint64_t request_id,
+ C_NotifyAck *on_notify_ack);
+ void complete_request(const std::string &instance_id, uint64_t request_id,
+ int r);
+
+ void handle_notify(uint64_t notify_id, uint64_t handle,
+ uint64_t notifier_id, bufferlist &bl) override;
+
+ void handle_image_acquire(const std::string &global_image_id,
+ Context *on_finish);
+ void handle_image_release(const std::string &global_image_id,
+ Context *on_finish);
+ void handle_peer_image_removed(const std::string &global_image_id,
+ const std::string &peer_mirror_uuid,
+ Context *on_finish);
+
+ void handle_sync_request(const std::string &instance_id,
+ const std::string &sync_id, Context *on_finish);
+ void handle_sync_start(const std::string &instance_id,
+ const std::string &sync_id, Context *on_finish);
+
+ void handle_payload(const std::string &instance_id,
+ const instance_watcher::ImageAcquirePayload &payload,
+ C_NotifyAck *on_notify_ack);
+ void handle_payload(const std::string &instance_id,
+ const instance_watcher::ImageReleasePayload &payload,
+ C_NotifyAck *on_notify_ack);
+ void handle_payload(const std::string &instance_id,
+ const instance_watcher::PeerImageRemovedPayload &payload,
+ C_NotifyAck *on_notify_ack);
+ void handle_payload(const std::string &instance_id,
+ const instance_watcher::SyncRequestPayload &payload,
+ C_NotifyAck *on_notify_ack);
+ void handle_payload(const std::string &instance_id,
+ const instance_watcher::SyncStartPayload &payload,
+ C_NotifyAck *on_notify_ack);
+ void handle_payload(const std::string &instance_id,
+ const instance_watcher::UnknownPayload &payload,
+ C_NotifyAck *on_notify_ack);
+};
+
+} // namespace mirror
+} // namespace rbd
+
+#endif // CEPH_RBD_MIRROR_INSTANCE_WATCHER_H