diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /qa/tasks/cephfs/test_mirroring.py | |
parent | Initial commit. (diff) | |
download | ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'qa/tasks/cephfs/test_mirroring.py')
-rw-r--r-- | qa/tasks/cephfs/test_mirroring.py | 1263 |
1 files changed, 1263 insertions, 0 deletions
diff --git a/qa/tasks/cephfs/test_mirroring.py b/qa/tasks/cephfs/test_mirroring.py new file mode 100644 index 000000000..a5f8cdac7 --- /dev/null +++ b/qa/tasks/cephfs/test_mirroring.py @@ -0,0 +1,1263 @@ +import os +import json +import errno +import logging +import random +import time + +from io import StringIO +from collections import deque + +from tasks.cephfs.cephfs_test_case import CephFSTestCase +from teuthology.exceptions import CommandFailedError +from teuthology.contextutil import safe_while + +log = logging.getLogger(__name__) + +class TestMirroring(CephFSTestCase): + MDSS_REQUIRED = 5 + CLIENTS_REQUIRED = 2 + REQUIRE_BACKUP_FILESYSTEM = True + + MODULE_NAME = "mirroring" + + def setUp(self): + super(TestMirroring, self).setUp() + self.primary_fs_name = self.fs.name + self.primary_fs_id = self.fs.id + self.secondary_fs_name = self.backup_fs.name + self.secondary_fs_id = self.backup_fs.id + self.enable_mirroring_module() + + def tearDown(self): + self.disable_mirroring_module() + super(TestMirroring, self).tearDown() + + def enable_mirroring_module(self): + self.mgr_cluster.mon_manager.raw_cluster_cmd("mgr", "module", "enable", TestMirroring.MODULE_NAME) + + def disable_mirroring_module(self): + self.mgr_cluster.mon_manager.raw_cluster_cmd("mgr", "module", "disable", TestMirroring.MODULE_NAME) + + def enable_mirroring(self, fs_name, fs_id): + self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "enable", fs_name) + time.sleep(10) + # verify via asok + res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}', + 'fs', 'mirror', 'status', f'{fs_name}@{fs_id}') + self.assertTrue(res['peers'] == {}) + self.assertTrue(res['snap_dirs']['dir_count'] == 0) + + def disable_mirroring(self, fs_name, fs_id): + self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "disable", fs_name) + time.sleep(10) + # verify via asok + try: + self.mirror_daemon_command(f'mirror status for fs: {fs_name}', + 'fs', 'mirror', 'status', f'{fs_name}@{fs_id}') + except CommandFailedError: + pass + else: + raise RuntimeError('expected admin socket to be unavailable') + + def verify_peer_added(self, fs_name, fs_id, peer_spec, remote_fs_name=None): + # verify via asok + res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}', + 'fs', 'mirror', 'status', f'{fs_name}@{fs_id}') + peer_uuid = self.get_peer_uuid(peer_spec) + self.assertTrue(peer_uuid in res['peers']) + client_name = res['peers'][peer_uuid]['remote']['client_name'] + cluster_name = res['peers'][peer_uuid]['remote']['cluster_name'] + self.assertTrue(peer_spec == f'{client_name}@{cluster_name}') + if remote_fs_name: + self.assertTrue(self.secondary_fs_name == res['peers'][peer_uuid]['remote']['fs_name']) + else: + self.assertTrue(self.fs_name == res['peers'][peer_uuid]['remote']['fs_name']) + + def peer_add(self, fs_name, fs_id, peer_spec, remote_fs_name=None): + if remote_fs_name: + self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "peer_add", fs_name, peer_spec, remote_fs_name) + else: + self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "peer_add", fs_name, peer_spec) + time.sleep(10) + self.verify_peer_added(fs_name, fs_id, peer_spec, remote_fs_name) + + def peer_remove(self, fs_name, fs_id, peer_spec): + peer_uuid = self.get_peer_uuid(peer_spec) + self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "peer_remove", fs_name, peer_uuid) + time.sleep(10) + # verify via asok + res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}', + 'fs', 'mirror', 'status', f'{fs_name}@{fs_id}') + self.assertTrue(res['peers'] == {} and res['snap_dirs']['dir_count'] == 0) + + def bootstrap_peer(self, fs_name, client_name, site_name): + outj = json.loads(self.mgr_cluster.mon_manager.raw_cluster_cmd( + "fs", "snapshot", "mirror", "peer_bootstrap", "create", fs_name, client_name, site_name)) + return outj['token'] + + def import_peer(self, fs_name, token): + self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "peer_bootstrap", "import", + fs_name, token) + + def add_directory(self, fs_name, fs_id, dir_name): + # get initial dir count + res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}', + 'fs', 'mirror', 'status', f'{fs_name}@{fs_id}') + dir_count = res['snap_dirs']['dir_count'] + log.debug(f'initial dir_count={dir_count}') + + self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "add", fs_name, dir_name) + + time.sleep(10) + # verify via asok + res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}', + 'fs', 'mirror', 'status', f'{fs_name}@{fs_id}') + new_dir_count = res['snap_dirs']['dir_count'] + log.debug(f'new dir_count={new_dir_count}') + self.assertTrue(new_dir_count > dir_count) + + def remove_directory(self, fs_name, fs_id, dir_name): + # get initial dir count + res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}', + 'fs', 'mirror', 'status', f'{fs_name}@{fs_id}') + dir_count = res['snap_dirs']['dir_count'] + log.debug(f'initial dir_count={dir_count}') + + self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "remove", fs_name, dir_name) + + time.sleep(10) + # verify via asok + res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}', + 'fs', 'mirror', 'status', f'{fs_name}@{fs_id}') + new_dir_count = res['snap_dirs']['dir_count'] + log.debug(f'new dir_count={new_dir_count}') + self.assertTrue(new_dir_count < dir_count) + + def check_peer_status(self, fs_name, fs_id, peer_spec, dir_name, expected_snap_name, + expected_snap_count): + peer_uuid = self.get_peer_uuid(peer_spec) + res = self.mirror_daemon_command(f'peer status for fs: {fs_name}', + 'fs', 'mirror', 'peer', 'status', + f'{fs_name}@{fs_id}', peer_uuid) + self.assertTrue(dir_name in res) + self.assertTrue(res[dir_name]['last_synced_snap']['name'] == expected_snap_name) + self.assertTrue(res[dir_name]['snaps_synced'] == expected_snap_count) + + def check_peer_status_deleted_snap(self, fs_name, fs_id, peer_spec, dir_name, + expected_delete_count): + peer_uuid = self.get_peer_uuid(peer_spec) + res = self.mirror_daemon_command(f'peer status for fs: {fs_name}', + 'fs', 'mirror', 'peer', 'status', + f'{fs_name}@{fs_id}', peer_uuid) + self.assertTrue(dir_name in res) + self.assertTrue(res[dir_name]['snaps_deleted'] == expected_delete_count) + + def check_peer_status_renamed_snap(self, fs_name, fs_id, peer_spec, dir_name, + expected_rename_count): + peer_uuid = self.get_peer_uuid(peer_spec) + res = self.mirror_daemon_command(f'peer status for fs: {fs_name}', + 'fs', 'mirror', 'peer', 'status', + f'{fs_name}@{fs_id}', peer_uuid) + self.assertTrue(dir_name in res) + self.assertTrue(res[dir_name]['snaps_renamed'] == expected_rename_count) + + def check_peer_snap_in_progress(self, fs_name, fs_id, + peer_spec, dir_name, snap_name): + peer_uuid = self.get_peer_uuid(peer_spec) + res = self.mirror_daemon_command(f'peer status for fs: {fs_name}', + 'fs', 'mirror', 'peer', 'status', + f'{fs_name}@{fs_id}', peer_uuid) + self.assertTrue('syncing' == res[dir_name]['state']) + self.assertTrue(res[dir_name]['current_sycning_snap']['name'] == snap_name) + + def verify_snapshot(self, dir_name, snap_name): + snap_list = self.mount_b.ls(path=f'{dir_name}/.snap') + self.assertTrue(snap_name in snap_list) + + source_res = self.mount_a.dir_checksum(path=f'{dir_name}/.snap/{snap_name}', + follow_symlinks=True) + log.debug(f'source snapshot checksum {snap_name} {source_res}') + + dest_res = self.mount_b.dir_checksum(path=f'{dir_name}/.snap/{snap_name}', + follow_symlinks=True) + log.debug(f'destination snapshot checksum {snap_name} {dest_res}') + self.assertTrue(source_res == dest_res) + + def verify_failed_directory(self, fs_name, fs_id, peer_spec, dir_name): + peer_uuid = self.get_peer_uuid(peer_spec) + res = self.mirror_daemon_command(f'peer status for fs: {fs_name}', + 'fs', 'mirror', 'peer', 'status', + f'{fs_name}@{fs_id}', peer_uuid) + self.assertTrue('failed' == res[dir_name]['state']) + + def get_peer_uuid(self, peer_spec): + status = self.fs.status() + fs_map = status.get_fsmap_byname(self.primary_fs_name) + peers = fs_map['mirror_info']['peers'] + for peer_uuid, mirror_info in peers.items(): + client_name = mirror_info['remote']['client_name'] + cluster_name = mirror_info['remote']['cluster_name'] + remote_peer_spec = f'{client_name}@{cluster_name}' + if peer_spec == remote_peer_spec: + return peer_uuid + return None + + def get_daemon_admin_socket(self): + """overloaded by teuthology override (fs/mirror/clients/mirror.yaml)""" + return "/var/run/ceph/cephfs-mirror.asok" + + def get_mirror_daemon_pid(self): + """pid file overloaded in fs/mirror/clients/mirror.yaml""" + return self.mount_a.run_shell(['cat', '/var/run/ceph/cephfs-mirror.pid']).stdout.getvalue().strip() + + def get_mirror_rados_addr(self, fs_name, fs_id): + """return the rados addr used by cephfs-mirror instance""" + res = self.mirror_daemon_command(f'mirror status for fs: {fs_name}', + 'fs', 'mirror', 'status', f'{fs_name}@{fs_id}') + return res['rados_inst'] + + def mirror_daemon_command(self, cmd_label, *args): + asok_path = self.get_daemon_admin_socket() + try: + # use mount_a's remote to execute command + p = self.mount_a.client_remote.run(args= + ['ceph', '--admin-daemon', asok_path] + list(args), + stdout=StringIO(), stderr=StringIO(), timeout=30, + check_status=True, label=cmd_label) + p.wait() + except CommandFailedError as ce: + log.warn(f'mirror daemon command with label "{cmd_label}" failed: {ce}') + raise + res = p.stdout.getvalue().strip() + log.debug(f'command returned={res}') + return json.loads(res) + + def get_mirror_daemon_status(self): + daemon_status = json.loads(self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "daemon", "status")) + log.debug(f'daemon_status: {daemon_status}') + # running a single mirror daemon is supported + status = daemon_status[0] + log.debug(f'status: {status}') + return status + + def test_basic_mirror_commands(self): + self.enable_mirroring(self.primary_fs_name, self.primary_fs_id) + self.disable_mirroring(self.primary_fs_name, self.primary_fs_id) + + def test_mirror_peer_commands(self): + self.enable_mirroring(self.primary_fs_name, self.primary_fs_id) + + # add peer + self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name) + # remove peer + self.peer_remove(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph") + + self.disable_mirroring(self.primary_fs_name, self.primary_fs_id) + + def test_mirror_disable_with_peer(self): + self.enable_mirroring(self.primary_fs_name, self.primary_fs_id) + + # add peer + self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name) + + self.disable_mirroring(self.primary_fs_name, self.primary_fs_id) + + def test_matching_peer(self): + self.enable_mirroring(self.primary_fs_name, self.primary_fs_id) + + try: + self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph") + except CommandFailedError as ce: + if ce.exitstatus != errno.EINVAL: + raise RuntimeError('invalid errno when adding a matching remote peer') + else: + raise RuntimeError('adding a peer matching local spec should fail') + + # verify via asok -- nothing should get added + res = self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}', + 'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}') + self.assertTrue(res['peers'] == {}) + + # and explicitly specifying the spec (via filesystem name) should fail too + try: + self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.primary_fs_name) + except CommandFailedError as ce: + if ce.exitstatus != errno.EINVAL: + raise RuntimeError('invalid errno when adding a matching remote peer') + else: + raise RuntimeError('adding a peer matching local spec should fail') + + # verify via asok -- nothing should get added + res = self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}', + 'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}') + self.assertTrue(res['peers'] == {}) + + self.disable_mirroring(self.primary_fs_name, self.primary_fs_id) + + def test_mirror_peer_add_existing(self): + self.enable_mirroring(self.primary_fs_name, self.primary_fs_id) + + # add peer + self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name) + + # adding the same peer should be idempotent + self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name) + + # remove peer + self.peer_remove(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph") + + self.disable_mirroring(self.primary_fs_name, self.primary_fs_id) + + def test_peer_commands_with_mirroring_disabled(self): + # try adding peer when mirroring is not enabled + try: + self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name) + except CommandFailedError as ce: + if ce.exitstatus != errno.EINVAL: + raise RuntimeError(-errno.EINVAL, 'incorrect error code when adding a peer') + else: + raise RuntimeError(-errno.EINVAL, 'expected peer_add to fail') + + # try removing peer + try: + self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "peer_remove", self.primary_fs_name, 'dummy-uuid') + except CommandFailedError as ce: + if ce.exitstatus != errno.EINVAL: + raise RuntimeError(-errno.EINVAL, 'incorrect error code when removing a peer') + else: + raise RuntimeError(-errno.EINVAL, 'expected peer_remove to fail') + + def test_add_directory_with_mirroring_disabled(self): + # try adding a directory when mirroring is not enabled + try: + self.add_directory(self.primary_fs_name, self.primary_fs_id, "/d1") + except CommandFailedError as ce: + if ce.exitstatus != errno.EINVAL: + raise RuntimeError(-errno.EINVAL, 'incorrect error code when adding a directory') + else: + raise RuntimeError(-errno.EINVAL, 'expected directory add to fail') + + def test_directory_commands(self): + self.mount_a.run_shell(["mkdir", "d1"]) + self.enable_mirroring(self.primary_fs_name, self.primary_fs_id) + self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d1') + try: + self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d1') + except CommandFailedError as ce: + if ce.exitstatus != errno.EEXIST: + raise RuntimeError(-errno.EINVAL, 'incorrect error code when re-adding a directory') + else: + raise RuntimeError(-errno.EINVAL, 'expected directory add to fail') + self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d1') + try: + self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d1') + except CommandFailedError as ce: + if ce.exitstatus not in (errno.ENOENT, errno.EINVAL): + raise RuntimeError(-errno.EINVAL, 'incorrect error code when re-deleting a directory') + else: + raise RuntimeError(-errno.EINVAL, 'expected directory removal to fail') + self.disable_mirroring(self.primary_fs_name, self.primary_fs_id) + self.mount_a.run_shell(["rmdir", "d1"]) + + def test_add_relative_directory_path(self): + self.enable_mirroring(self.primary_fs_name, self.primary_fs_id) + try: + self.add_directory(self.primary_fs_name, self.primary_fs_id, './d1') + except CommandFailedError as ce: + if ce.exitstatus != errno.EINVAL: + raise RuntimeError(-errno.EINVAL, 'incorrect error code when adding a relative path dir') + else: + raise RuntimeError(-errno.EINVAL, 'expected directory add to fail') + self.disable_mirroring(self.primary_fs_name, self.primary_fs_id) + + def test_add_directory_path_normalization(self): + self.mount_a.run_shell(["mkdir", "-p", "d1/d2/d3"]) + self.enable_mirroring(self.primary_fs_name, self.primary_fs_id) + self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d1/d2/d3') + def check_add_command_failure(dir_path): + try: + self.add_directory(self.primary_fs_name, self.primary_fs_id, dir_path) + except CommandFailedError as ce: + if ce.exitstatus != errno.EEXIST: + raise RuntimeError(-errno.EINVAL, 'incorrect error code when re-adding a directory') + else: + raise RuntimeError(-errno.EINVAL, 'expected directory add to fail') + + # everything points for /d1/d2/d3 + check_add_command_failure('/d1/d2/././././././d3') + check_add_command_failure('/d1/d2/././././././d3//////') + check_add_command_failure('/d1/d2/../d2/././././d3') + check_add_command_failure('/././././d1/./././d2/./././d3//////') + check_add_command_failure('/./d1/./d2/./d3/../../../d1/d2/d3') + + self.disable_mirroring(self.primary_fs_name, self.primary_fs_id) + self.mount_a.run_shell(["rm", "-rf", "d1"]) + + def test_add_ancestor_and_child_directory(self): + self.mount_a.run_shell(["mkdir", "-p", "d1/d2/d3"]) + self.mount_a.run_shell(["mkdir", "-p", "d1/d4"]) + self.enable_mirroring(self.primary_fs_name, self.primary_fs_id) + self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d1/d2/') + def check_add_command_failure(dir_path): + try: + self.add_directory(self.primary_fs_name, self.primary_fs_id, dir_path) + except CommandFailedError as ce: + if ce.exitstatus != errno.EINVAL: + raise RuntimeError(-errno.EINVAL, 'incorrect error code when adding a directory') + else: + raise RuntimeError(-errno.EINVAL, 'expected directory add to fail') + + # cannot add ancestors or a subtree for an existing directory + check_add_command_failure('/') + check_add_command_failure('/d1') + check_add_command_failure('/d1/d2/d3') + + # obviously, one can add a non-ancestor or non-subtree + self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d1/d4/') + + self.disable_mirroring(self.primary_fs_name, self.primary_fs_id) + self.mount_a.run_shell(["rm", "-rf", "d1"]) + + def test_cephfs_mirror_blocklist(self): + self.enable_mirroring(self.primary_fs_name, self.primary_fs_id) + + # add peer + self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name) + + res = self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}', + 'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}') + peers_1 = set(res['peers']) + + # fetch rados address for blacklist check + rados_inst = self.get_mirror_rados_addr(self.primary_fs_name, self.primary_fs_id) + + # simulate non-responding mirror daemon by sending SIGSTOP + pid = self.get_mirror_daemon_pid() + log.debug(f'SIGSTOP to cephfs-mirror pid {pid}') + self.mount_a.run_shell(['kill', '-SIGSTOP', pid]) + + # wait for blocklist timeout -- the manager module would blocklist + # the mirror daemon + time.sleep(40) + + # wake up the mirror daemon -- at this point, the daemon should know + # that it has been blocklisted + log.debug('SIGCONT to cephfs-mirror') + self.mount_a.run_shell(['kill', '-SIGCONT', pid]) + + # check if the rados addr is blocklisted + self.assertTrue(self.mds_cluster.is_addr_blocklisted(rados_inst)) + + # wait enough so that the mirror daemon restarts blocklisted instances + time.sleep(40) + rados_inst_new = self.get_mirror_rados_addr(self.primary_fs_name, self.primary_fs_id) + + # and we should get a new rados instance + self.assertTrue(rados_inst != rados_inst_new) + + # along with peers that were added + res = self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}', + 'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}') + peers_2 = set(res['peers']) + self.assertTrue(peers_1, peers_2) + + self.disable_mirroring(self.primary_fs_name, self.primary_fs_id) + + def test_cephfs_mirror_stats(self): + log.debug('reconfigure client auth caps') + self.mds_cluster.mon_manager.raw_cluster_cmd_result( + 'auth', 'caps', "client.{0}".format(self.mount_b.client_id), + 'mds', 'allow rw', + 'mon', 'allow r', + 'osd', 'allow rw pool={0}, allow rw pool={1}'.format( + self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name())) + + log.debug(f'mounting filesystem {self.secondary_fs_name}') + self.mount_b.umount_wait() + self.mount_b.mount_wait(cephfs_name=self.secondary_fs_name) + + # create a bunch of files in a directory to snap + self.mount_a.run_shell(["mkdir", "d0"]) + self.mount_a.create_n_files('d0/file', 50, sync=True) + + self.enable_mirroring(self.primary_fs_name, self.primary_fs_id) + self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0') + self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name) + + # take a snapshot + self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"]) + + time.sleep(30) + self.check_peer_status(self.primary_fs_name, self.primary_fs_id, + "client.mirror_remote@ceph", '/d0', 'snap0', 1) + self.verify_snapshot('d0', 'snap0') + + # some more IO + self.mount_a.run_shell(["mkdir", "d0/d00"]) + self.mount_a.run_shell(["mkdir", "d0/d01"]) + + self.mount_a.create_n_files('d0/d00/more_file', 20, sync=True) + self.mount_a.create_n_files('d0/d01/some_more_file', 75, sync=True) + + # take another snapshot + self.mount_a.run_shell(["mkdir", "d0/.snap/snap1"]) + + time.sleep(60) + self.check_peer_status(self.primary_fs_name, self.primary_fs_id, + "client.mirror_remote@ceph", '/d0', 'snap1', 2) + self.verify_snapshot('d0', 'snap1') + + # delete a snapshot + self.mount_a.run_shell(["rmdir", "d0/.snap/snap0"]) + + time.sleep(10) + snap_list = self.mount_b.ls(path='d0/.snap') + self.assertTrue('snap0' not in snap_list) + self.check_peer_status_deleted_snap(self.primary_fs_name, self.primary_fs_id, + "client.mirror_remote@ceph", '/d0', 1) + + # rename a snapshot + self.mount_a.run_shell(["mv", "d0/.snap/snap1", "d0/.snap/snap2"]) + + time.sleep(10) + snap_list = self.mount_b.ls(path='d0/.snap') + self.assertTrue('snap1' not in snap_list) + self.assertTrue('snap2' in snap_list) + self.check_peer_status_renamed_snap(self.primary_fs_name, self.primary_fs_id, + "client.mirror_remote@ceph", '/d0', 1) + + self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d0') + self.disable_mirroring(self.primary_fs_name, self.primary_fs_id) + + def test_cephfs_mirror_cancel_sync(self): + log.debug('reconfigure client auth caps') + self.mds_cluster.mon_manager.raw_cluster_cmd_result( + 'auth', 'caps', "client.{0}".format(self.mount_b.client_id), + 'mds', 'allow rw', + 'mon', 'allow r', + 'osd', 'allow rw pool={0}, allow rw pool={1}'.format( + self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name())) + + log.debug(f'mounting filesystem {self.secondary_fs_name}') + self.mount_b.umount_wait() + self.mount_b.mount_wait(cephfs_name=self.secondary_fs_name) + + # create a bunch of files in a directory to snap + self.mount_a.run_shell(["mkdir", "d0"]) + for i in range(8): + filename = f'file.{i}' + self.mount_a.write_n_mb(os.path.join('d0', filename), 1024) + + self.enable_mirroring(self.primary_fs_name, self.primary_fs_id) + self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0') + self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name) + + # take a snapshot + self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"]) + + time.sleep(10) + self.check_peer_snap_in_progress(self.primary_fs_name, self.primary_fs_id, + "client.mirror_remote@ceph", '/d0', 'snap0') + + self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d0') + + snap_list = self.mount_b.ls(path='d0/.snap') + self.assertTrue('snap0' not in snap_list) + self.disable_mirroring(self.primary_fs_name, self.primary_fs_id) + + def test_cephfs_mirror_restart_sync_on_blocklist(self): + log.debug('reconfigure client auth caps') + self.mds_cluster.mon_manager.raw_cluster_cmd_result( + 'auth', 'caps', "client.{0}".format(self.mount_b.client_id), + 'mds', 'allow rw', + 'mon', 'allow r', + 'osd', 'allow rw pool={0}, allow rw pool={1}'.format( + self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name())) + + log.debug(f'mounting filesystem {self.secondary_fs_name}') + self.mount_b.umount_wait() + self.mount_b.mount_wait(cephfs_name=self.secondary_fs_name) + + # create a bunch of files in a directory to snap + self.mount_a.run_shell(["mkdir", "d0"]) + for i in range(8): + filename = f'file.{i}' + self.mount_a.write_n_mb(os.path.join('d0', filename), 1024) + + self.enable_mirroring(self.primary_fs_name, self.primary_fs_id) + self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0') + self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name) + + # fetch rados address for blacklist check + rados_inst = self.get_mirror_rados_addr(self.primary_fs_name, self.primary_fs_id) + + # take a snapshot + self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"]) + + time.sleep(10) + self.check_peer_snap_in_progress(self.primary_fs_name, self.primary_fs_id, + "client.mirror_remote@ceph", '/d0', 'snap0') + + # simulate non-responding mirror daemon by sending SIGSTOP + pid = self.get_mirror_daemon_pid() + log.debug(f'SIGSTOP to cephfs-mirror pid {pid}') + self.mount_a.run_shell(['kill', '-SIGSTOP', pid]) + + # wait for blocklist timeout -- the manager module would blocklist + # the mirror daemon + time.sleep(40) + + # wake up the mirror daemon -- at this point, the daemon should know + # that it has been blocklisted + log.debug('SIGCONT to cephfs-mirror') + self.mount_a.run_shell(['kill', '-SIGCONT', pid]) + + # check if the rados addr is blocklisted + self.assertTrue(self.mds_cluster.is_addr_blocklisted(rados_inst)) + + time.sleep(500) + self.check_peer_status(self.primary_fs_name, self.primary_fs_id, + "client.mirror_remote@ceph", '/d0', 'snap0', expected_snap_count=1) + self.verify_snapshot('d0', 'snap0') + + self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d0') + self.disable_mirroring(self.primary_fs_name, self.primary_fs_id) + + def test_cephfs_mirror_failed_sync_with_correction(self): + self.enable_mirroring(self.primary_fs_name, self.primary_fs_id) + self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name) + + # add a non-existent directory for synchronization + self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0') + + # wait for mirror daemon to mark it the directory as failed + time.sleep(120) + self.verify_failed_directory(self.primary_fs_name, self.primary_fs_id, + "client.mirror_remote@ceph", '/d0') + + # create the directory + self.mount_a.run_shell(["mkdir", "d0"]) + self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"]) + + # wait for correction + time.sleep(120) + self.check_peer_status(self.primary_fs_name, self.primary_fs_id, + "client.mirror_remote@ceph", '/d0', 'snap0', 1) + self.disable_mirroring(self.primary_fs_name, self.primary_fs_id) + + def test_cephfs_mirror_service_daemon_status(self): + self.enable_mirroring(self.primary_fs_name, self.primary_fs_id) + self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name) + + time.sleep(30) + status = self.get_mirror_daemon_status() + + # assumption for this test: mirroring enabled for a single filesystem w/ single + # peer + + # we have not added any directories + peer = status['filesystems'][0]['peers'][0] + self.assertEquals(status['filesystems'][0]['directory_count'], 0) + self.assertEquals(peer['stats']['failure_count'], 0) + self.assertEquals(peer['stats']['recovery_count'], 0) + + # add a non-existent directory for synchronization -- check if its reported + # in daemon stats + self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0') + + time.sleep(120) + status = self.get_mirror_daemon_status() + # we added one + peer = status['filesystems'][0]['peers'][0] + self.assertEquals(status['filesystems'][0]['directory_count'], 1) + # failure count should be reflected + self.assertEquals(peer['stats']['failure_count'], 1) + self.assertEquals(peer['stats']['recovery_count'], 0) + + # create the directory, mirror daemon would recover + self.mount_a.run_shell(["mkdir", "d0"]) + + time.sleep(120) + status = self.get_mirror_daemon_status() + peer = status['filesystems'][0]['peers'][0] + self.assertEquals(status['filesystems'][0]['directory_count'], 1) + # failure and recovery count should be reflected + self.assertEquals(peer['stats']['failure_count'], 1) + self.assertEquals(peer['stats']['recovery_count'], 1) + + self.disable_mirroring(self.primary_fs_name, self.primary_fs_id) + + def test_mirroring_init_failure(self): + """Test mirror daemon init failure""" + + # disable mgr mirroring plugin as it would try to load dir map on + # on mirroring enabled for a filesystem (an throw up erorrs in + # the logs) + self.disable_mirroring_module() + + # enable mirroring through mon interface -- this should result in the mirror daemon + # failing to enable mirroring due to absence of `cephfs_mirorr` index object. + self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "mirror", "enable", self.primary_fs_name) + + with safe_while(sleep=5, tries=10, action='wait for failed state') as proceed: + while proceed(): + try: + # verify via asok + res = self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}', + 'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}') + if not 'state' in res: + return + self.assertTrue(res['state'] == "failed") + return True + except: + pass + + self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "mirror", "disable", self.primary_fs_name) + time.sleep(10) + # verify via asok + try: + self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}', + 'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}') + except CommandFailedError: + pass + else: + raise RuntimeError('expected admin socket to be unavailable') + + def test_mirroring_init_failure_with_recovery(self): + """Test if the mirror daemon can recover from a init failure""" + + # disable mgr mirroring plugin as it would try to load dir map on + # on mirroring enabled for a filesystem (an throw up erorrs in + # the logs) + self.disable_mirroring_module() + + # enable mirroring through mon interface -- this should result in the mirror daemon + # failing to enable mirroring due to absence of `cephfs_mirror` index object. + + self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "mirror", "enable", self.primary_fs_name) + # need safe_while since non-failed status pops up as mirroring is restarted + # internally in mirror daemon. + with safe_while(sleep=5, tries=20, action='wait for failed state') as proceed: + while proceed(): + try: + # verify via asok + res = self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}', + 'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}') + if not 'state' in res: + return + self.assertTrue(res['state'] == "failed") + return True + except: + pass + + # create the index object and check daemon recovery + try: + p = self.mount_a.client_remote.run(args=['rados', '-p', self.fs.metadata_pool_name, 'create', 'cephfs_mirror'], + stdout=StringIO(), stderr=StringIO(), timeout=30, + check_status=True, label="create index object") + p.wait() + except CommandFailedError as ce: + log.warn(f'mirror daemon command to create mirror index object failed: {ce}') + raise + time.sleep(30) + res = self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}', + 'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}') + self.assertTrue(res['peers'] == {}) + self.assertTrue(res['snap_dirs']['dir_count'] == 0) + + self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "mirror", "disable", self.primary_fs_name) + time.sleep(10) + # verify via asok + try: + self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}', + 'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}') + except CommandFailedError: + pass + else: + raise RuntimeError('expected admin socket to be unavailable') + + def test_cephfs_mirror_peer_bootstrap(self): + """Test importing peer bootstrap token""" + self.enable_mirroring(self.primary_fs_name, self.primary_fs_id) + + # create a bootstrap token for the peer + bootstrap_token = self.bootstrap_peer(self.secondary_fs_name, "client.mirror_peer_bootstrap", "site-remote") + + # import the peer via bootstrap token + self.import_peer(self.primary_fs_name, bootstrap_token) + time.sleep(10) + self.verify_peer_added(self.primary_fs_name, self.primary_fs_id, "client.mirror_peer_bootstrap@site-remote", + self.secondary_fs_name) + + # verify via peer_list interface + peer_uuid = self.get_peer_uuid("client.mirror_peer_bootstrap@site-remote") + res = json.loads(self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "peer_list", self.primary_fs_name)) + self.assertTrue(peer_uuid in res) + self.assertTrue('mon_host' in res[peer_uuid] and res[peer_uuid]['mon_host'] != '') + + # remove peer + self.peer_remove(self.primary_fs_name, self.primary_fs_id, "client.mirror_peer_bootstrap@site-remote") + # disable mirroring + self.disable_mirroring(self.primary_fs_name, self.primary_fs_id) + + def test_cephfs_mirror_symlink_sync(self): + log.debug('reconfigure client auth caps') + self.mds_cluster.mon_manager.raw_cluster_cmd_result( + 'auth', 'caps', "client.{0}".format(self.mount_b.client_id), + 'mds', 'allow rw', + 'mon', 'allow r', + 'osd', 'allow rw pool={0}, allow rw pool={1}'.format( + self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name())) + + log.debug(f'mounting filesystem {self.secondary_fs_name}') + self.mount_b.umount_wait() + self.mount_b.mount_wait(cephfs_name=self.secondary_fs_name) + + # create a bunch of files w/ symbolic links in a directory to snap + self.mount_a.run_shell(["mkdir", "d0"]) + self.mount_a.create_n_files('d0/file', 10, sync=True) + self.mount_a.run_shell(["ln", "-s", "./file_0", "d0/sym_0"]) + self.mount_a.run_shell(["ln", "-s", "./file_1", "d0/sym_1"]) + self.mount_a.run_shell(["ln", "-s", "./file_2", "d0/sym_2"]) + + self.enable_mirroring(self.primary_fs_name, self.primary_fs_id) + self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0') + self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name) + + # take a snapshot + self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"]) + + time.sleep(30) + self.check_peer_status(self.primary_fs_name, self.primary_fs_id, + "client.mirror_remote@ceph", '/d0', 'snap0', 1) + self.verify_snapshot('d0', 'snap0') + + self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d0') + self.disable_mirroring(self.primary_fs_name, self.primary_fs_id) + + def test_cephfs_mirror_with_parent_snapshot(self): + """Test snapshot synchronization with parent directory snapshots""" + self.mount_a.run_shell(["mkdir", "-p", "d0/d1/d2/d3"]) + + self.enable_mirroring(self.primary_fs_name, self.primary_fs_id) + self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0/d1/d2/d3') + self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name) + + # take a snapshot + self.mount_a.run_shell(["mkdir", "d0/d1/d2/d3/.snap/snap0"]) + + time.sleep(30) + self.check_peer_status(self.primary_fs_name, self.primary_fs_id, + "client.mirror_remote@ceph", '/d0/d1/d2/d3', 'snap0', 1) + + # create snapshots in parent directories + self.mount_a.run_shell(["mkdir", "d0/.snap/snap_d0"]) + self.mount_a.run_shell(["mkdir", "d0/d1/.snap/snap_d1"]) + self.mount_a.run_shell(["mkdir", "d0/d1/d2/.snap/snap_d2"]) + + # try syncing more snapshots + self.mount_a.run_shell(["mkdir", "d0/d1/d2/d3/.snap/snap1"]) + time.sleep(30) + self.check_peer_status(self.primary_fs_name, self.primary_fs_id, + "client.mirror_remote@ceph", '/d0/d1/d2/d3', 'snap1', 2) + + self.mount_a.run_shell(["rmdir", "d0/d1/d2/d3/.snap/snap0"]) + self.mount_a.run_shell(["rmdir", "d0/d1/d2/d3/.snap/snap1"]) + time.sleep(15) + self.check_peer_status_deleted_snap(self.primary_fs_name, self.primary_fs_id, + "client.mirror_remote@ceph", '/d0/d1/d2/d3', 2) + + self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d0/d1/d2/d3') + self.disable_mirroring(self.primary_fs_name, self.primary_fs_id) + + def test_cephfs_mirror_remove_on_stall(self): + self.enable_mirroring(self.primary_fs_name, self.primary_fs_id) + + # fetch rados address for blacklist check + rados_inst = self.get_mirror_rados_addr(self.primary_fs_name, self.primary_fs_id) + + # simulate non-responding mirror daemon by sending SIGSTOP + pid = self.get_mirror_daemon_pid() + log.debug(f'SIGSTOP to cephfs-mirror pid {pid}') + self.mount_a.run_shell(['kill', '-SIGSTOP', pid]) + + # wait for blocklist timeout -- the manager module would blocklist + # the mirror daemon + time.sleep(40) + + # make sure the rados addr is blocklisted + self.assertTrue(self.mds_cluster.is_addr_blocklisted(rados_inst)) + + # now we are sure that there are no "active" mirror daemons -- add a directory path. + dir_path_p = "/d0/d1" + dir_path = "/d0/d1/d2" + + self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "add", self.primary_fs_name, dir_path) + + time.sleep(10) + # this uses an undocumented interface to get dirpath map state + res_json = self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "dirmap", self.primary_fs_name, dir_path) + res = json.loads(res_json) + # there are no mirror daemons + self.assertTrue(res['state'], 'stalled') + + self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "remove", self.primary_fs_name, dir_path) + + time.sleep(10) + try: + self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "dirmap", self.primary_fs_name, dir_path) + except CommandFailedError as ce: + if ce.exitstatus != errno.ENOENT: + raise RuntimeError('invalid errno when checking dirmap status for non-existent directory') + else: + raise RuntimeError('incorrect errno when checking dirmap state for non-existent directory') + + # adding a parent directory should be allowed + self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "add", self.primary_fs_name, dir_path_p) + + time.sleep(10) + # however, this directory path should get stalled too + res_json = self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "dirmap", self.primary_fs_name, dir_path_p) + res = json.loads(res_json) + # there are no mirror daemons + self.assertTrue(res['state'], 'stalled') + + # wake up the mirror daemon -- at this point, the daemon should know + # that it has been blocklisted + log.debug('SIGCONT to cephfs-mirror') + self.mount_a.run_shell(['kill', '-SIGCONT', pid]) + + # wait for restart mirror on blocklist + time.sleep(60) + res_json = self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "snapshot", "mirror", "dirmap", self.primary_fs_name, dir_path_p) + res = json.loads(res_json) + # there are no mirror daemons + self.assertTrue(res['state'], 'mapped') + + self.disable_mirroring(self.primary_fs_name, self.primary_fs_id) + + def test_cephfs_mirror_incremental_sync(self): + """ Test incremental snapshot synchronization (based on mtime differences).""" + log.debug('reconfigure client auth caps') + self.mds_cluster.mon_manager.raw_cluster_cmd_result( + 'auth', 'caps', "client.{0}".format(self.mount_b.client_id), + 'mds', 'allow rw', + 'mon', 'allow r', + 'osd', 'allow rw pool={0}, allow rw pool={1}'.format( + self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name())) + log.debug(f'mounting filesystem {self.secondary_fs_name}') + self.mount_b.umount_wait() + self.mount_b.mount_wait(cephfs_name=self.secondary_fs_name) + + repo = 'ceph-qa-suite' + repo_dir = 'ceph_repo' + repo_path = f'{repo_dir}/{repo}' + + def clone_repo(): + self.mount_a.run_shell([ + 'git', 'clone', '--branch', 'giant', + f'http://github.com/ceph/{repo}', repo_path]) + + def exec_git_cmd(cmd_list): + self.mount_a.run_shell(['git', '--git-dir', f'{self.mount_a.mountpoint}/{repo_path}/.git', *cmd_list]) + + self.mount_a.run_shell(["mkdir", repo_dir]) + clone_repo() + + self.enable_mirroring(self.primary_fs_name, self.primary_fs_id) + self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name) + + self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{repo_path}') + self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_a']) + + # full copy, takes time + time.sleep(500) + self.check_peer_status(self.primary_fs_name, self.primary_fs_id, + "client.mirror_remote@ceph", f'/{repo_path}', 'snap_a', 1) + self.verify_snapshot(repo_path, 'snap_a') + + # create some diff + num = random.randint(5, 20) + log.debug(f'resetting to HEAD~{num}') + exec_git_cmd(["reset", "--hard", f'HEAD~{num}']) + + self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_b']) + # incremental copy, should be fast + time.sleep(180) + self.check_peer_status(self.primary_fs_name, self.primary_fs_id, + "client.mirror_remote@ceph", f'/{repo_path}', 'snap_b', 2) + self.verify_snapshot(repo_path, 'snap_b') + + # diff again, this time back to HEAD + log.debug('resetting to HEAD') + exec_git_cmd(["pull"]) + + self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_c']) + # incremental copy, should be fast + time.sleep(180) + self.check_peer_status(self.primary_fs_name, self.primary_fs_id, + "client.mirror_remote@ceph", f'/{repo_path}', 'snap_c', 3) + self.verify_snapshot(repo_path, 'snap_c') + + self.disable_mirroring(self.primary_fs_name, self.primary_fs_id) + + def test_cephfs_mirror_incremental_sync_with_type_mixup(self): + """ Test incremental snapshot synchronization with file type changes. + + The same filename exist as a different type in subsequent snapshot. + This verifies if the mirror daemon can identify file type mismatch and + sync snapshots. + + \ snap_0 snap_1 snap_2 snap_3 + \----------------------------------------------- + file_x | reg sym dir reg + | + file_y | dir reg sym dir + | + file_z | sym dir reg sym + """ + log.debug('reconfigure client auth caps') + self.mds_cluster.mon_manager.raw_cluster_cmd_result( + 'auth', 'caps', "client.{0}".format(self.mount_b.client_id), + 'mds', 'allow rw', + 'mon', 'allow r', + 'osd', 'allow rw pool={0}, allow rw pool={1}'.format( + self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name())) + log.debug(f'mounting filesystem {self.secondary_fs_name}') + self.mount_b.umount_wait() + self.mount_b.mount_wait(cephfs_name=self.secondary_fs_name) + + typs = deque(['reg', 'dir', 'sym']) + def cleanup_and_create_with_type(dirname, fnames): + self.mount_a.run_shell_payload(f"rm -rf {dirname}/*") + fidx = 0 + for t in typs: + fname = f'{dirname}/{fnames[fidx]}' + log.debug(f'file: {fname} type: {t}') + if t == 'reg': + self.mount_a.run_shell(["touch", fname]) + self.mount_a.write_file(fname, data=fname) + elif t == 'dir': + self.mount_a.run_shell(["mkdir", fname]) + elif t == 'sym': + # verify ELOOP in mirror daemon + self.mount_a.run_shell(["ln", "-s", "..", fname]) + fidx += 1 + + def verify_types(dirname, fnames, snap_name): + tidx = 0 + for fname in fnames: + t = self.mount_b.run_shell_payload(f"stat -c %F {dirname}/.snap/{snap_name}/{fname}").stdout.getvalue().strip() + if typs[tidx] == 'reg': + self.assertEquals('regular file', t) + elif typs[tidx] == 'dir': + self.assertEquals('directory', t) + elif typs[tidx] == 'sym': + self.assertEquals('symbolic link', t) + tidx += 1 + + self.enable_mirroring(self.primary_fs_name, self.primary_fs_id) + self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name) + + self.mount_a.run_shell(["mkdir", "d0"]) + self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0') + + fnames = ['file_x', 'file_y', 'file_z'] + turns = 0 + while turns != len(typs): + snapname = f'snap_{turns}' + cleanup_and_create_with_type('d0', fnames) + self.mount_a.run_shell(['mkdir', f'd0/.snap/{snapname}']) + time.sleep(30) + self.check_peer_status(self.primary_fs_name, self.primary_fs_id, + "client.mirror_remote@ceph", '/d0', snapname, turns+1) + verify_types('d0', fnames, snapname) + # next type + typs.rotate(1) + turns += 1 + + self.disable_mirroring(self.primary_fs_name, self.primary_fs_id) + + def test_cephfs_mirror_sync_with_purged_snapshot(self): + """Test snapshot synchronization in midst of snapshot deletes. + + Deleted the previous snapshot when the mirror daemon is figuring out + incremental differences between current and previous snaphot. The + mirror daemon should identify the purge and switch to using remote + comparison to sync the snapshot (in the next iteration of course). + """ + + log.debug('reconfigure client auth caps') + self.mds_cluster.mon_manager.raw_cluster_cmd_result( + 'auth', 'caps', "client.{0}".format(self.mount_b.client_id), + 'mds', 'allow rw', + 'mon', 'allow r', + 'osd', 'allow rw pool={0}, allow rw pool={1}'.format( + self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name())) + log.debug(f'mounting filesystem {self.secondary_fs_name}') + self.mount_b.umount_wait() + self.mount_b.mount_wait(cephfs_name=self.secondary_fs_name) + + repo = 'ceph-qa-suite' + repo_dir = 'ceph_repo' + repo_path = f'{repo_dir}/{repo}' + + def clone_repo(): + self.mount_a.run_shell([ + 'git', 'clone', '--branch', 'giant', + f'http://github.com/ceph/{repo}', repo_path]) + + def exec_git_cmd(cmd_list): + self.mount_a.run_shell(['git', '--git-dir', f'{self.mount_a.mountpoint}/{repo_path}/.git', *cmd_list]) + + self.mount_a.run_shell(["mkdir", repo_dir]) + clone_repo() + + self.enable_mirroring(self.primary_fs_name, self.primary_fs_id) + self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name) + + self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{repo_path}') + self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_a']) + + # full copy, takes time + time.sleep(500) + self.check_peer_status(self.primary_fs_name, self.primary_fs_id, + "client.mirror_remote@ceph", f'/{repo_path}', 'snap_a', 1) + self.verify_snapshot(repo_path, 'snap_a') + + # create some diff + num = random.randint(60, 100) + log.debug(f'resetting to HEAD~{num}') + exec_git_cmd(["reset", "--hard", f'HEAD~{num}']) + + self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_b']) + + time.sleep(15) + self.mount_a.run_shell(['rmdir', f'{repo_path}/.snap/snap_a']) + + # incremental copy but based on remote dir_root + time.sleep(300) + self.check_peer_status(self.primary_fs_name, self.primary_fs_id, + "client.mirror_remote@ceph", f'/{repo_path}', 'snap_b', 2) + self.verify_snapshot(repo_path, 'snap_b') + + self.disable_mirroring(self.primary_fs_name, self.primary_fs_id) + + def test_cephfs_mirror_peer_add_primary(self): + self.enable_mirroring(self.primary_fs_name, self.primary_fs_id) + self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name) + + # try adding the primary file system as a peer to secondary file + # system + try: + self.peer_add(self.secondary_fs_name, self.secondary_fs_id, "client.mirror_remote@ceph", self.primary_fs_name) + except CommandFailedError as ce: + if ce.exitstatus != errno.EINVAL: + raise RuntimeError('invalid errno when adding a primary file system') + else: + raise RuntimeError('adding peer should fail') + + self.disable_mirroring(self.primary_fs_name, self.primary_fs_id) + + def test_cephfs_mirror_cancel_mirroring_and_readd(self): + """ + Test adding a directory path for synchronization post removal of already added directory paths + + ... to ensure that synchronization of the newly added directory path functions + as expected. Note that we schedule three (3) directories for mirroring to ensure + that all replayer threads (3 by default) in the mirror daemon are busy. + """ + log.debug('reconfigure client auth caps') + self.mds_cluster.mon_manager.raw_cluster_cmd_result( + 'auth', 'caps', "client.{0}".format(self.mount_b.client_id), + 'mds', 'allow rw', + 'mon', 'allow r', + 'osd', 'allow rw pool={0}, allow rw pool={1}'.format( + self.backup_fs.get_data_pool_name(), self.backup_fs.get_data_pool_name())) + + log.debug(f'mounting filesystem {self.secondary_fs_name}') + self.mount_b.umount_wait() + self.mount_b.mount_wait(cephfs_name=self.secondary_fs_name) + + # create a bunch of files in a directory to snap + self.mount_a.run_shell(["mkdir", "d0"]) + self.mount_a.run_shell(["mkdir", "d1"]) + self.mount_a.run_shell(["mkdir", "d2"]) + for i in range(4): + filename = f'file.{i}' + self.mount_a.write_n_mb(os.path.join('d0', filename), 1024) + self.mount_a.write_n_mb(os.path.join('d1', filename), 1024) + self.mount_a.write_n_mb(os.path.join('d2', filename), 1024) + + log.debug('enabling mirroring') + self.enable_mirroring(self.primary_fs_name, self.primary_fs_id) + log.debug('adding directory paths') + self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0') + self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d1') + self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d2') + self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name) + + # take snapshots + log.debug('taking snapshots') + self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"]) + self.mount_a.run_shell(["mkdir", "d1/.snap/snap0"]) + self.mount_a.run_shell(["mkdir", "d2/.snap/snap0"]) + + time.sleep(10) + log.debug('checking snap in progress') + self.check_peer_snap_in_progress(self.primary_fs_name, self.primary_fs_id, + "client.mirror_remote@ceph", '/d0', 'snap0') + self.check_peer_snap_in_progress(self.primary_fs_name, self.primary_fs_id, + "client.mirror_remote@ceph", '/d1', 'snap0') + self.check_peer_snap_in_progress(self.primary_fs_name, self.primary_fs_id, + "client.mirror_remote@ceph", '/d2', 'snap0') + + log.debug('removing directories 1') + self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d0') + log.debug('removing directories 2') + self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d1') + log.debug('removing directories 3') + self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d2') + + log.debug('removing snapshots') + self.mount_a.run_shell(["rmdir", "d0/.snap/snap0"]) + self.mount_a.run_shell(["rmdir", "d1/.snap/snap0"]) + self.mount_a.run_shell(["rmdir", "d2/.snap/snap0"]) + + for i in range(4): + filename = f'file.{i}' + log.debug(f'deleting {filename}') + self.mount_a.run_shell(["rm", "-f", os.path.join('d0', filename)]) + self.mount_a.run_shell(["rm", "-f", os.path.join('d1', filename)]) + self.mount_a.run_shell(["rm", "-f", os.path.join('d2', filename)]) + + log.debug('creating new files...') + self.mount_a.create_n_files('d0/file', 50, sync=True) + self.mount_a.create_n_files('d1/file', 50, sync=True) + self.mount_a.create_n_files('d2/file', 50, sync=True) + + log.debug('adding directory paths') + self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0') + self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d1') + self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d2') + + log.debug('creating new snapshots...') + self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"]) + self.mount_a.run_shell(["mkdir", "d1/.snap/snap0"]) + self.mount_a.run_shell(["mkdir", "d2/.snap/snap0"]) + + time.sleep(60) + self.check_peer_status(self.primary_fs_name, self.primary_fs_id, + "client.mirror_remote@ceph", '/d0', 'snap0', 1) + self.verify_snapshot('d0', 'snap0') + + self.check_peer_status(self.primary_fs_name, self.primary_fs_id, + "client.mirror_remote@ceph", '/d1', 'snap0', 1) + self.verify_snapshot('d1', 'snap0') + + self.check_peer_status(self.primary_fs_name, self.primary_fs_id, + "client.mirror_remote@ceph", '/d2', 'snap0', 1) + self.verify_snapshot('d2', 'snap0') + + self.disable_mirroring(self.primary_fs_name, self.primary_fs_id) |