// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #ifndef CEPHFS_MIRROR_PEER_REPLAYER_H #define CEPHFS_MIRROR_PEER_REPLAYER_H #include "common/Formatter.h" #include "common/Thread.h" #include "mds/FSMap.h" #include "ServiceDaemon.h" #include "Types.h" namespace cephfs { namespace mirror { class FSMirror; class PeerReplayerAdminSocketHook; class PeerReplayer { public: PeerReplayer(CephContext *cct, FSMirror *fs_mirror, RadosRef local_cluster, const Filesystem &filesystem, const Peer &peer, const std::set> &directories, MountRef mount, ServiceDaemon *service_daemon); ~PeerReplayer(); // initialize replayer for a peer int init(); // shutdown replayer for a peer void shutdown(); // add a directory to mirror queue void add_directory(string_view dir_root); // remove a directory from queue void remove_directory(string_view dir_root); // admin socket helpers void peer_status(Formatter *f); // reopen logs void reopen_logs(); private: inline static const std::string PRIMARY_SNAP_ID_KEY = "primary_snap_id"; inline static const std::string SERVICE_DAEMON_FAILED_DIR_COUNT_KEY = "failure_count"; inline static const std::string SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY = "recovery_count"; using Snapshot = std::pair; // file descriptor "triplet" for synchronizing a snapshot // w/ an added MountRef for accessing "previous" snapshot. struct FHandles { // open file descriptor on the snap directory for snapshot // currently being synchronized. Always use this fd with // @m_local_mount. int c_fd; // open file descriptor on the "previous" snapshot or on // dir_root on remote filesystem (based on if the snapshot // can be used for incremental transfer). Always use this // fd with p_mnt which either points to @m_local_mount ( // for local incremental comparison) or @m_remote_mount ( // for remote incremental comparison). int p_fd; MountRef p_mnt; // open file descriptor on dir_root on remote filesystem. // Always use this fd with @m_remote_mount. int r_fd_dir_root; }; bool is_stopping() { return m_stopping; } struct Replayer; class SnapshotReplayerThread : public Thread { public: SnapshotReplayerThread(PeerReplayer *peer_replayer) : m_peer_replayer(peer_replayer) { } void *entry() override { m_peer_replayer->run(this); return 0; } private: PeerReplayer *m_peer_replayer; }; struct DirRegistry { int fd; bool canceled = false; SnapshotReplayerThread *replayer; }; struct SyncEntry { std::string epath; ceph_dir_result *dirp; // valid for directories struct ceph_statx stx; // set by incremental sync _after_ ensuring missing entries // in the currently synced snapshot have been propagated to // the remote filesystem. bool remote_synced = false; SyncEntry(std::string_view path, const struct ceph_statx &stx) : epath(path), stx(stx) { } SyncEntry(std::string_view path, ceph_dir_result *dirp, const struct ceph_statx &stx) : epath(path), dirp(dirp), stx(stx) { } bool is_directory() const { return S_ISDIR(stx.stx_mode); } bool needs_remote_sync() const { return remote_synced; } void set_remote_synced() { remote_synced = true; } }; using clock = ceph::coarse_mono_clock; using time = ceph::coarse_mono_time; // stats sent to service daemon struct ServiceDaemonStats { uint64_t failed_dir_count = 0; uint64_t recovered_dir_count = 0; }; struct SnapSyncStat { uint64_t nr_failures = 0; // number of consecutive failures boost::optional