diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/tools/cephfs_mirror/PeerReplayer.h | |
parent | Initial commit. (diff) | |
download | ceph-upstream/16.2.11+ds.tar.xz ceph-upstream/16.2.11+ds.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/tools/cephfs_mirror/PeerReplayer.h')
-rw-r--r-- | src/tools/cephfs_mirror/PeerReplayer.h | 319 |
1 files changed, 319 insertions, 0 deletions
diff --git a/src/tools/cephfs_mirror/PeerReplayer.h b/src/tools/cephfs_mirror/PeerReplayer.h new file mode 100644 index 000000000..886c95329 --- /dev/null +++ b/src/tools/cephfs_mirror/PeerReplayer.h @@ -0,0 +1,319 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPHFS_MIRROR_PEER_REPLAYER_H +#define CEPHFS_MIRROR_PEER_REPLAYER_H + +#include "common/Formatter.h" +#include "common/Thread.h" +#include "mds/FSMap.h" +#include "ServiceDaemon.h" +#include "Types.h" + +namespace cephfs { +namespace mirror { + +class FSMirror; +class PeerReplayerAdminSocketHook; + +class PeerReplayer { +public: + PeerReplayer(CephContext *cct, FSMirror *fs_mirror, + RadosRef local_cluster, const Filesystem &filesystem, + const Peer &peer, const std::set<std::string, std::less<>> &directories, + MountRef mount, ServiceDaemon *service_daemon); + ~PeerReplayer(); + + // initialize replayer for a peer + int init(); + + // shutdown replayer for a peer + void shutdown(); + + // add a directory to mirror queue + void add_directory(string_view dir_root); + + // remove a directory from queue + void remove_directory(string_view dir_root); + + // admin socket helpers + void peer_status(Formatter *f); + + // reopen logs + void reopen_logs(); + +private: + inline static const std::string PRIMARY_SNAP_ID_KEY = "primary_snap_id"; + + inline static const std::string SERVICE_DAEMON_FAILED_DIR_COUNT_KEY = "failure_count"; + inline static const std::string SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY = "recovery_count"; + + using Snapshot = std::pair<std::string, uint64_t>; + + // file descriptor "triplet" for synchronizing a snapshot + // w/ an added MountRef for accessing "previous" snapshot. + struct FHandles { + // open file descriptor on the snap directory for snapshot + // currently being synchronized. Always use this fd with + // @m_local_mount. + int c_fd; + + // open file descriptor on the "previous" snapshot or on + // dir_root on remote filesystem (based on if the snapshot + // can be used for incremental transfer). Always use this + // fd with p_mnt which either points to @m_local_mount ( + // for local incremental comparison) or @m_remote_mount ( + // for remote incremental comparison). + int p_fd; + MountRef p_mnt; + + // open file descriptor on dir_root on remote filesystem. + // Always use this fd with @m_remote_mount. + int r_fd_dir_root; + }; + + bool is_stopping() { + return m_stopping; + } + + struct Replayer; + class SnapshotReplayerThread : public Thread { + public: + SnapshotReplayerThread(PeerReplayer *peer_replayer) + : m_peer_replayer(peer_replayer) { + } + + void *entry() override { + m_peer_replayer->run(this); + return 0; + } + + private: + PeerReplayer *m_peer_replayer; + }; + + struct DirRegistry { + int fd; + bool canceled = false; + SnapshotReplayerThread *replayer; + }; + + struct SyncEntry { + std::string epath; + ceph_dir_result *dirp; // valid for directories + struct ceph_statx stx; + // set by incremental sync _after_ ensuring missing entries + // in the currently synced snapshot have been propagated to + // the remote filesystem. + bool remote_synced = false; + + SyncEntry(std::string_view path, + const struct ceph_statx &stx) + : epath(path), + stx(stx) { + } + SyncEntry(std::string_view path, + ceph_dir_result *dirp, + const struct ceph_statx &stx) + : epath(path), + dirp(dirp), + stx(stx) { + } + + bool is_directory() const { + return S_ISDIR(stx.stx_mode); + } + + bool needs_remote_sync() const { + return remote_synced; + } + void set_remote_synced() { + remote_synced = true; + } + }; + + using clock = ceph::coarse_mono_clock; + using time = ceph::coarse_mono_time; + + // stats sent to service daemon + struct ServiceDaemonStats { + uint64_t failed_dir_count = 0; + uint64_t recovered_dir_count = 0; + }; + + struct SnapSyncStat { + uint64_t nr_failures = 0; // number of consecutive failures + boost::optional<time> last_failed; // lat failed timestamp + bool failed = false; // hit upper cap for consecutive failures + boost::optional<std::pair<uint64_t, std::string>> last_synced_snap; + boost::optional<std::pair<uint64_t, std::string>> current_syncing_snap; + uint64_t synced_snap_count = 0; + uint64_t deleted_snap_count = 0; + uint64_t renamed_snap_count = 0; + time last_synced = clock::zero(); + boost::optional<double> last_sync_duration; + }; + + void _inc_failed_count(const std::string &dir_root) { + auto max_failures = g_ceph_context->_conf.get_val<uint64_t>( + "cephfs_mirror_max_consecutive_failures_per_directory"); + auto &sync_stat = m_snap_sync_stats.at(dir_root); + sync_stat.last_failed = clock::now(); + if (++sync_stat.nr_failures >= max_failures && !sync_stat.failed) { + sync_stat.failed = true; + ++m_service_daemon_stats.failed_dir_count; + m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer, + SERVICE_DAEMON_FAILED_DIR_COUNT_KEY, + m_service_daemon_stats.failed_dir_count); + } + } + void _reset_failed_count(const std::string &dir_root) { + auto &sync_stat = m_snap_sync_stats.at(dir_root); + if (sync_stat.failed) { + ++m_service_daemon_stats.recovered_dir_count; + m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer, + SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY, + m_service_daemon_stats.recovered_dir_count); + } + sync_stat.nr_failures = 0; + sync_stat.failed = false; + sync_stat.last_failed = boost::none; + } + + void _set_last_synced_snap(const std::string &dir_root, uint64_t snap_id, + const std::string &snap_name) { + auto &sync_stat = m_snap_sync_stats.at(dir_root); + sync_stat.last_synced_snap = std::make_pair(snap_id, snap_name); + sync_stat.current_syncing_snap = boost::none; + } + void set_last_synced_snap(const std::string &dir_root, uint64_t snap_id, + const std::string &snap_name) { + std::scoped_lock locker(m_lock); + _set_last_synced_snap(dir_root, snap_id, snap_name); + } + void set_current_syncing_snap(const std::string &dir_root, uint64_t snap_id, + const std::string &snap_name) { + std::scoped_lock locker(m_lock); + auto &sync_stat = m_snap_sync_stats.at(dir_root); + sync_stat.current_syncing_snap = std::make_pair(snap_id, snap_name); + } + void clear_current_syncing_snap(const std::string &dir_root) { + std::scoped_lock locker(m_lock); + auto &sync_stat = m_snap_sync_stats.at(dir_root); + sync_stat.current_syncing_snap = boost::none; + } + void inc_deleted_snap(const std::string &dir_root) { + std::scoped_lock locker(m_lock); + auto &sync_stat = m_snap_sync_stats.at(dir_root); + ++sync_stat.deleted_snap_count; + } + void inc_renamed_snap(const std::string &dir_root) { + std::scoped_lock locker(m_lock); + auto &sync_stat = m_snap_sync_stats.at(dir_root); + ++sync_stat.renamed_snap_count; + } + void set_last_synced_stat(const std::string &dir_root, uint64_t snap_id, + const std::string &snap_name, double duration) { + std::scoped_lock locker(m_lock); + _set_last_synced_snap(dir_root, snap_id, snap_name); + auto &sync_stat = m_snap_sync_stats.at(dir_root); + sync_stat.last_synced = clock::now(); + sync_stat.last_sync_duration = duration; + ++sync_stat.synced_snap_count; + } + + bool should_backoff(const std::string &dir_root, int *retval) { + if (m_fs_mirror->is_blocklisted()) { + *retval = -EBLOCKLISTED; + return true; + } + + std::scoped_lock locker(m_lock); + if (is_stopping()) { + // ceph defines EBLOCKLISTED to ESHUTDOWN (108). so use + // EINPROGRESS to identify shutdown. + *retval = -EINPROGRESS; + return true; + } + auto &dr = m_registered.at(dir_root); + if (dr.canceled) { + *retval = -ECANCELED; + return true; + } + + *retval = 0; + return false; + } + + typedef std::vector<std::unique_ptr<SnapshotReplayerThread>> SnapshotReplayers; + + CephContext *m_cct; + FSMirror *m_fs_mirror; + RadosRef m_local_cluster; + Filesystem m_filesystem; + Peer m_peer; + // probably need to be encapsulated when supporting cancelations + std::map<std::string, DirRegistry> m_registered; + std::vector<std::string> m_directories; + std::map<std::string, SnapSyncStat> m_snap_sync_stats; + MountRef m_local_mount; + ServiceDaemon *m_service_daemon; + PeerReplayerAdminSocketHook *m_asok_hook = nullptr; + + ceph::mutex m_lock; + ceph::condition_variable m_cond; + RadosRef m_remote_cluster; + MountRef m_remote_mount; + bool m_stopping = false; + SnapshotReplayers m_replayers; + + ServiceDaemonStats m_service_daemon_stats; + + void run(SnapshotReplayerThread *replayer); + + boost::optional<std::string> pick_directory(); + int register_directory(const std::string &dir_root, SnapshotReplayerThread *replayer); + void unregister_directory(const std::string &dir_root); + int try_lock_directory(const std::string &dir_root, SnapshotReplayerThread *replayer, + DirRegistry *registry); + void unlock_directory(const std::string &dir_root, const DirRegistry ®istry); + void sync_snaps(const std::string &dir_root, std::unique_lock<ceph::mutex> &locker); + + + int build_snap_map(const std::string &dir_root, std::map<uint64_t, std::string> *snap_map, + bool is_remote=false); + + int propagate_snap_deletes(const std::string &dir_root, const std::set<std::string> &snaps); + int propagate_snap_renames(const std::string &dir_root, + const std::set<std::pair<std::string,std::string>> &snaps); + int propagate_deleted_entries(const std::string &dir_root, const std::string &epath, + const FHandles &fh); + int cleanup_remote_dir(const std::string &dir_root, const std::string &epath, + const FHandles &fh); + + int should_sync_entry(const std::string &epath, const struct ceph_statx &cstx, + const FHandles &fh, bool *need_data_sync, bool *need_attr_sync); + + int open_dir(MountRef mnt, const std::string &dir_path, boost::optional<uint64_t> snap_id); + int pre_sync_check_and_open_handles(const std::string &dir_root, const Snapshot ¤t, + boost::optional<Snapshot> prev, FHandles *fh); + void post_sync_close_handles(const FHandles &fh); + + int do_synchronize(const std::string &dir_root, const Snapshot ¤t, + boost::optional<Snapshot> prev); + + int synchronize(const std::string &dir_root, const Snapshot ¤t, + boost::optional<Snapshot> prev); + int do_sync_snaps(const std::string &dir_root); + + int remote_mkdir(const std::string &epath, const struct ceph_statx &stx, const FHandles &fh); + int remote_file_op(const std::string &dir_root, const std::string &epath, const struct ceph_statx &stx, + const FHandles &fh, bool need_data_sync, bool need_attr_sync); + int copy_to_remote(const std::string &dir_root, const std::string &epath, const struct ceph_statx &stx, + const FHandles &fh); +}; + +} // namespace mirror +} // namespace cephfs + +#endif // CEPHFS_MIRROR_PEER_REPLAYER_H |