diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /qa/tasks/cephfs/test_client_recovery.py | |
parent | Initial commit. (diff) | |
download | ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'qa/tasks/cephfs/test_client_recovery.py')
-rw-r--r-- | qa/tasks/cephfs/test_client_recovery.py | 698 |
1 files changed, 698 insertions, 0 deletions
diff --git a/qa/tasks/cephfs/test_client_recovery.py b/qa/tasks/cephfs/test_client_recovery.py new file mode 100644 index 000000000..24726b369 --- /dev/null +++ b/qa/tasks/cephfs/test_client_recovery.py @@ -0,0 +1,698 @@ + +""" +Teuthology task for exercising CephFS client recovery +""" + +import logging +from textwrap import dedent +import time +import distutils.version as version +import re +import os + +from teuthology.orchestra import run +from teuthology.orchestra.run import CommandFailedError +from tasks.cephfs.fuse_mount import FuseMount +from tasks.cephfs.cephfs_test_case import CephFSTestCase +from teuthology.packaging import get_package_version + +log = logging.getLogger(__name__) + + +# Arbitrary timeouts for operations involving restarting +# an MDS or waiting for it to come up +MDS_RESTART_GRACE = 60 + + +class TestClientNetworkRecovery(CephFSTestCase): + REQUIRE_KCLIENT_REMOTE = True + REQUIRE_ONE_CLIENT_REMOTE = True + CLIENTS_REQUIRED = 2 + + LOAD_SETTINGS = ["mds_reconnect_timeout", "ms_max_backoff"] + + # Environment references + mds_reconnect_timeout = None + ms_max_backoff = None + + def test_network_death(self): + """ + Simulate software freeze or temporary network failure. + + Check that the client blocks I/O during failure, and completes + I/O after failure. + """ + + session_timeout = self.fs.get_var("session_timeout") + self.fs.mds_asok(['config', 'set', 'mds_defer_session_stale', 'false']) + + # We only need one client + self.mount_b.umount_wait() + + # Initially our one client session should be visible + client_id = self.mount_a.get_global_id() + ls_data = self._session_list() + self.assert_session_count(1, ls_data) + self.assertEqual(ls_data[0]['id'], client_id) + self.assert_session_state(client_id, "open") + + # ...and capable of doing I/O without blocking + self.mount_a.create_files() + + # ...but if we turn off the network + self.fs.set_clients_block(True) + + # ...and try and start an I/O + write_blocked = self.mount_a.write_background() + + # ...then it should block + self.assertFalse(write_blocked.finished) + self.assert_session_state(client_id, "open") + time.sleep(session_timeout * 1.5) # Long enough for MDS to consider session stale + self.assertFalse(write_blocked.finished) + self.assert_session_state(client_id, "stale") + + # ...until we re-enable I/O + self.fs.set_clients_block(False) + + # ...when it should complete promptly + a = time.time() + self.wait_until_true(lambda: write_blocked.finished, self.ms_max_backoff * 2) + write_blocked.wait() # Already know we're finished, wait() to raise exception on errors + recovery_time = time.time() - a + log.info("recovery time: {0}".format(recovery_time)) + self.assert_session_state(client_id, "open") + + +class TestClientRecovery(CephFSTestCase): + REQUIRE_KCLIENT_REMOTE = True + CLIENTS_REQUIRED = 2 + + LOAD_SETTINGS = ["mds_reconnect_timeout", "ms_max_backoff"] + + # Environment references + mds_reconnect_timeout = None + ms_max_backoff = None + + def test_basic(self): + # Check that two clients come up healthy and see each others' files + # ===================================================== + self.mount_a.create_files() + self.mount_a.check_files() + self.mount_a.umount_wait() + + self.mount_b.check_files() + + self.mount_a.mount_wait() + + # Check that the admin socket interface is correctly reporting + # two sessions + # ===================================================== + ls_data = self._session_list() + self.assert_session_count(2, ls_data) + + self.assertSetEqual( + set([l['id'] for l in ls_data]), + {self.mount_a.get_global_id(), self.mount_b.get_global_id()} + ) + + def test_restart(self): + # Check that after an MDS restart both clients reconnect and continue + # to handle I/O + # ===================================================== + self.fs.mds_fail_restart() + self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE) + + self.mount_a.create_destroy() + self.mount_b.create_destroy() + + def _session_num_caps(self, client_id): + ls_data = self.fs.mds_asok(['session', 'ls']) + return int(self._session_by_id(ls_data).get(client_id, {'num_caps': None})['num_caps']) + + def test_reconnect_timeout(self): + # Reconnect timeout + # ================= + # Check that if I stop an MDS and a client goes away, the MDS waits + # for the reconnect period + + mount_a_client_id = self.mount_a.get_global_id() + + self.fs.fail() + + self.mount_a.umount_wait(force=True) + + self.fs.set_joinable() + + self.fs.wait_for_state('up:reconnect', reject='up:active', timeout=MDS_RESTART_GRACE) + # Check that the MDS locally reports its state correctly + status = self.fs.mds_asok(['status']) + self.assertIn("reconnect_status", status) + + ls_data = self._session_list() + self.assert_session_count(2, ls_data) + + # The session for the dead client should have the 'reconnect' flag set + self.assertTrue(self.get_session(mount_a_client_id)['reconnecting']) + + # Wait for the reconnect state to clear, this should take the + # reconnect timeout period. + in_reconnect_for = self.fs.wait_for_state('up:active', timeout=self.mds_reconnect_timeout * 2) + # Check that the period we waited to enter active is within a factor + # of two of the reconnect timeout. + self.assertGreater(in_reconnect_for, self.mds_reconnect_timeout // 2, + "Should have been in reconnect phase for {0} but only took {1}".format( + self.mds_reconnect_timeout, in_reconnect_for + )) + + self.assert_session_count(1) + + # Check that the client that timed out during reconnect can + # mount again and do I/O + self.mount_a.mount_wait() + self.mount_a.create_destroy() + + self.assert_session_count(2) + + def test_reconnect_eviction(self): + # Eviction during reconnect + # ========================= + mount_a_client_id = self.mount_a.get_global_id() + + self.fs.fail() + + # The mount goes away while the MDS is offline + self.mount_a.kill() + + # wait for it to die + time.sleep(5) + + self.fs.set_joinable() + + # Enter reconnect phase + self.fs.wait_for_state('up:reconnect', reject='up:active', timeout=MDS_RESTART_GRACE) + self.assert_session_count(2) + + # Evict the stuck client + self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id]) + self.assert_session_count(1) + + # Observe that we proceed to active phase without waiting full reconnect timeout + evict_til_active = self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE) + # Once we evict the troublemaker, the reconnect phase should complete + # in well under the reconnect timeout. + self.assertLess(evict_til_active, self.mds_reconnect_timeout * 0.5, + "reconnect did not complete soon enough after eviction, took {0}".format( + evict_til_active + )) + + # We killed earlier so must clean up before trying to use again + self.mount_a.kill_cleanup() + + # Bring the client back + self.mount_a.mount_wait() + self.mount_a.create_destroy() + + def _test_stale_caps(self, write): + session_timeout = self.fs.get_var("session_timeout") + + # Capability release from stale session + # ===================================== + if write: + cap_holder = self.mount_a.open_background() + else: + self.mount_a.run_shell(["touch", "background_file"]) + self.mount_a.umount_wait() + self.mount_a.mount_wait() + cap_holder = self.mount_a.open_background(write=False) + + self.assert_session_count(2) + mount_a_gid = self.mount_a.get_global_id() + + # Wait for the file to be visible from another client, indicating + # that mount_a has completed its network ops + self.mount_b.wait_for_visible() + + # Simulate client death + self.mount_a.suspend_netns() + + # wait for it to die so it doesn't voluntarily release buffer cap + time.sleep(5) + + try: + # Now, after session_timeout seconds, the waiter should + # complete their operation when the MDS marks the holder's + # session stale. + cap_waiter = self.mount_b.write_background() + a = time.time() + cap_waiter.wait() + b = time.time() + + # Should have succeeded + self.assertEqual(cap_waiter.exitstatus, 0) + + if write: + self.assert_session_count(1) + else: + self.assert_session_state(mount_a_gid, "stale") + + cap_waited = b - a + log.info("cap_waiter waited {0}s".format(cap_waited)) + self.assertTrue(session_timeout / 2.0 <= cap_waited <= session_timeout * 2.0, + "Capability handover took {0}, expected approx {1}".format( + cap_waited, session_timeout + )) + + self.mount_a._kill_background(cap_holder) + finally: + # teardown() doesn't quite handle this case cleanly, so help it out + self.mount_a.resume_netns() + + def test_stale_read_caps(self): + self._test_stale_caps(False) + + def test_stale_write_caps(self): + self._test_stale_caps(True) + + def test_evicted_caps(self): + # Eviction while holding a capability + # =================================== + + session_timeout = self.fs.get_var("session_timeout") + + # Take out a write capability on a file on client A, + # and then immediately kill it. + cap_holder = self.mount_a.open_background() + mount_a_client_id = self.mount_a.get_global_id() + + # Wait for the file to be visible from another client, indicating + # that mount_a has completed its network ops + self.mount_b.wait_for_visible() + + # Simulate client death + self.mount_a.suspend_netns() + + # wait for it to die so it doesn't voluntarily release buffer cap + time.sleep(5) + + try: + # The waiter should get stuck waiting for the capability + # held on the MDS by the now-dead client A + cap_waiter = self.mount_b.write_background() + time.sleep(5) + self.assertFalse(cap_waiter.finished) + + self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id]) + # Now, because I evicted the old holder of the capability, it should + # immediately get handed over to the waiter + a = time.time() + cap_waiter.wait() + b = time.time() + cap_waited = b - a + log.info("cap_waiter waited {0}s".format(cap_waited)) + # This is the check that it happened 'now' rather than waiting + # for the session timeout + self.assertLess(cap_waited, session_timeout / 2.0, + "Capability handover took {0}, expected less than {1}".format( + cap_waited, session_timeout / 2.0 + )) + + self.mount_a._kill_background(cap_holder) + finally: + self.mount_a.resume_netns() + + def test_trim_caps(self): + # Trim capability when reconnecting MDS + # =================================== + + count = 500 + # Create lots of files + for i in range(count): + self.mount_a.run_shell(["touch", "f{0}".format(i)]) + + # Populate mount_b's cache + self.mount_b.run_shell(["ls", "-l"]) + + client_id = self.mount_b.get_global_id() + num_caps = self._session_num_caps(client_id) + self.assertGreaterEqual(num_caps, count) + + # Restart MDS. client should trim its cache when reconnecting to the MDS + self.fs.mds_fail_restart() + self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE) + + num_caps = self._session_num_caps(client_id) + self.assertLess(num_caps, count, + "should have less than {0} capabilities, have {1}".format( + count, num_caps + )) + + def _is_flockable(self): + a_version_str = get_package_version(self.mount_a.client_remote, "fuse") + b_version_str = get_package_version(self.mount_b.client_remote, "fuse") + flock_version_str = "2.9" + + version_regex = re.compile(r"[0-9\.]+") + a_result = version_regex.match(a_version_str) + self.assertTrue(a_result) + b_result = version_regex.match(b_version_str) + self.assertTrue(b_result) + a_version = version.StrictVersion(a_result.group()) + b_version = version.StrictVersion(b_result.group()) + flock_version=version.StrictVersion(flock_version_str) + + if (a_version >= flock_version and b_version >= flock_version): + log.info("flock locks are available") + return True + else: + log.info("not testing flock locks, machines have versions {av} and {bv}".format( + av=a_version_str,bv=b_version_str)) + return False + + def test_filelock(self): + """ + Check that file lock doesn't get lost after an MDS restart + """ + + flockable = self._is_flockable() + lock_holder = self.mount_a.lock_background(do_flock=flockable) + + self.mount_b.wait_for_visible("background_file-2") + self.mount_b.check_filelock(do_flock=flockable) + + self.fs.mds_fail_restart() + self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE) + + self.mount_b.check_filelock(do_flock=flockable) + + # Tear down the background process + self.mount_a._kill_background(lock_holder) + + def test_filelock_eviction(self): + """ + Check that file lock held by evicted client is given to + waiting client. + """ + if not self._is_flockable(): + self.skipTest("flock is not available") + + lock_holder = self.mount_a.lock_background() + self.mount_b.wait_for_visible("background_file-2") + self.mount_b.check_filelock() + + lock_taker = self.mount_b.lock_and_release() + # Check the taker is waiting (doesn't get it immediately) + time.sleep(2) + self.assertFalse(lock_holder.finished) + self.assertFalse(lock_taker.finished) + + try: + mount_a_client_id = self.mount_a.get_global_id() + self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id]) + + # Evicting mount_a should let mount_b's attempt to take the lock + # succeed + self.wait_until_true(lambda: lock_taker.finished, timeout=10) + finally: + # Tear down the background process + self.mount_a._kill_background(lock_holder) + + # teardown() doesn't quite handle this case cleanly, so help it out + self.mount_a.kill() + self.mount_a.kill_cleanup() + + # Bring the client back + self.mount_a.mount_wait() + + def test_dir_fsync(self): + self._test_fsync(True); + + def test_create_fsync(self): + self._test_fsync(False); + + def _test_fsync(self, dirfsync): + """ + That calls to fsync guarantee visibility of metadata to another + client immediately after the fsyncing client dies. + """ + + # Leave this guy out until he's needed + self.mount_b.umount_wait() + + # Create dir + child dentry on client A, and fsync the dir + path = os.path.join(self.mount_a.mountpoint, "subdir") + self.mount_a.run_python( + dedent(""" + import os + import time + + path = "{path}" + + print("Starting creation...") + start = time.time() + + os.mkdir(path) + dfd = os.open(path, os.O_DIRECTORY) + + fd = open(os.path.join(path, "childfile"), "w") + print("Finished creation in {{0}}s".format(time.time() - start)) + + print("Starting fsync...") + start = time.time() + if {dirfsync}: + os.fsync(dfd) + else: + os.fsync(fd) + print("Finished fsync in {{0}}s".format(time.time() - start)) + """.format(path=path,dirfsync=str(dirfsync))) + ) + + # Immediately kill the MDS and then client A + self.fs.fail() + self.mount_a.kill() + self.mount_a.kill_cleanup() + + # Restart the MDS. Wait for it to come up, it'll have to time out in clientreplay + self.fs.set_joinable() + log.info("Waiting for reconnect...") + self.fs.wait_for_state("up:reconnect") + log.info("Waiting for active...") + self.fs.wait_for_state("up:active", timeout=MDS_RESTART_GRACE + self.mds_reconnect_timeout) + log.info("Reached active...") + + # Is the child dentry visible from mount B? + self.mount_b.mount_wait() + self.mount_b.run_shell(["ls", "subdir/childfile"]) + + def test_unmount_for_evicted_client(self): + """Test if client hangs on unmount after evicting the client.""" + mount_a_client_id = self.mount_a.get_global_id() + self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id]) + + self.mount_a.umount_wait(require_clean=True, timeout=30) + + def test_stale_renew(self): + if not isinstance(self.mount_a, FuseMount): + self.skipTest("Require FUSE client to handle signal STOP/CONT") + + session_timeout = self.fs.get_var("session_timeout") + + self.mount_a.run_shell(["mkdir", "testdir"]) + self.mount_a.run_shell(["touch", "testdir/file1"]) + # populate readdir cache + self.mount_a.run_shell(["ls", "testdir"]) + self.mount_b.run_shell(["ls", "testdir"]) + + # check if readdir cache is effective + initial_readdirs = self.fs.mds_asok(['perf', 'dump', 'mds_server', 'req_readdir_latency']) + self.mount_b.run_shell(["ls", "testdir"]) + current_readdirs = self.fs.mds_asok(['perf', 'dump', 'mds_server', 'req_readdir_latency']) + self.assertEqual(current_readdirs, initial_readdirs); + + mount_b_gid = self.mount_b.get_global_id() + # stop ceph-fuse process of mount_b + self.mount_b.suspend_netns() + + self.assert_session_state(mount_b_gid, "open") + time.sleep(session_timeout * 1.5) # Long enough for MDS to consider session stale + + self.mount_a.run_shell(["touch", "testdir/file2"]) + self.assert_session_state(mount_b_gid, "stale") + + # resume ceph-fuse process of mount_b + self.mount_b.resume_netns() + # Is the new file visible from mount_b? (caps become invalid after session stale) + self.mount_b.run_shell(["ls", "testdir/file2"]) + + def test_abort_conn(self): + """ + Check that abort_conn() skips closing mds sessions. + """ + if not isinstance(self.mount_a, FuseMount): + self.skipTest("Testing libcephfs function") + + self.fs.mds_asok(['config', 'set', 'mds_defer_session_stale', 'false']) + session_timeout = self.fs.get_var("session_timeout") + + self.mount_a.umount_wait() + self.mount_b.umount_wait() + + gid_str = self.mount_a.run_python(dedent(""" + import cephfs as libcephfs + cephfs = libcephfs.LibCephFS(conffile='') + cephfs.mount() + client_id = cephfs.get_instance_id() + cephfs.abort_conn() + print(client_id) + """) + ) + gid = int(gid_str); + + self.assert_session_state(gid, "open") + time.sleep(session_timeout * 1.5) # Long enough for MDS to consider session stale + self.assert_session_state(gid, "stale") + + def test_dont_mark_unresponsive_client_stale(self): + """ + Test that an unresponsive client holding caps is not marked stale or + evicted unless another clients wants its caps. + """ + if not isinstance(self.mount_a, FuseMount): + self.skipTest("Require FUSE client to handle signal STOP/CONT") + + # XXX: To conduct this test we need at least two clients since a + # single client is never evcited by MDS. + SESSION_TIMEOUT = 30 + SESSION_AUTOCLOSE = 50 + time_at_beg = time.time() + mount_a_gid = self.mount_a.get_global_id() + _ = self.mount_a.client_pid + self.fs.set_var('session_timeout', SESSION_TIMEOUT) + self.fs.set_var('session_autoclose', SESSION_AUTOCLOSE) + self.assert_session_count(2, self.fs.mds_asok(['session', 'ls'])) + + # test that client holding cap not required by any other client is not + # marked stale when it becomes unresponsive. + self.mount_a.run_shell(['mkdir', 'dir']) + self.mount_a.send_signal('sigstop') + time.sleep(SESSION_TIMEOUT + 2) + self.assert_session_state(mount_a_gid, "open") + + # test that other clients have to wait to get the caps from + # unresponsive client until session_autoclose. + self.mount_b.run_shell(['stat', 'dir']) + self.assert_session_count(1, self.fs.mds_asok(['session', 'ls'])) + self.assertLess(time.time(), time_at_beg + SESSION_AUTOCLOSE) + + self.mount_a.send_signal('sigcont') + + def test_config_session_timeout(self): + self.fs.mds_asok(['config', 'set', 'mds_defer_session_stale', 'false']) + session_timeout = self.fs.get_var("session_timeout") + mount_a_gid = self.mount_a.get_global_id() + + self.fs.mds_asok(['session', 'config', '%s' % mount_a_gid, 'timeout', '%s' % (session_timeout * 2)]) + + self.mount_a.kill(); + + self.assert_session_count(2) + + time.sleep(session_timeout * 1.5) + self.assert_session_state(mount_a_gid, "open") + + time.sleep(session_timeout) + self.assert_session_count(1) + + self.mount_a.kill_cleanup() + + def test_reconnect_after_blocklisted(self): + """ + Test reconnect after blocklisted. + - writing to a fd that was opened before blocklist should return -EBADF + - reading/writing to a file with lost file locks should return -EIO + - readonly fd should continue to work + """ + + self.mount_a.umount_wait() + + if isinstance(self.mount_a, FuseMount): + self.mount_a.mount_wait(mntopts=['--client_reconnect_stale=1', '--fuse_disable_pagecache=1']) + else: + try: + self.mount_a.mount_wait(mntopts=['recover_session=clean']) + except CommandFailedError: + self.mount_a.kill_cleanup() + self.skipTest("Not implemented in current kernel") + + self.mount_a.wait_until_mounted() + + path = os.path.join(self.mount_a.mountpoint, 'testfile_reconnect_after_blocklisted') + pyscript = dedent(""" + import os + import sys + import fcntl + import errno + import time + + fd1 = os.open("{path}.1", os.O_RDWR | os.O_CREAT, 0O666) + fd2 = os.open("{path}.1", os.O_RDONLY) + fd3 = os.open("{path}.2", os.O_RDWR | os.O_CREAT, 0O666) + fd4 = os.open("{path}.2", os.O_RDONLY) + + os.write(fd1, b'content') + os.read(fd2, 1); + + os.write(fd3, b'content') + os.read(fd4, 1); + fcntl.flock(fd4, fcntl.LOCK_SH | fcntl.LOCK_NB) + + print("blocklist") + sys.stdout.flush() + + sys.stdin.readline() + + # wait for mds to close session + time.sleep(10); + + # trigger 'open session' message. kclient relies on 'session reject' message + # to detect if itself is blocklisted + try: + os.stat("{path}.1") + except: + pass + + # wait for auto reconnect + time.sleep(10); + + try: + os.write(fd1, b'content') + except OSError as e: + if e.errno != errno.EBADF: + raise + else: + raise RuntimeError("write() failed to raise error") + + os.read(fd2, 1); + + try: + os.read(fd4, 1) + except OSError as e: + if e.errno != errno.EIO: + raise + else: + raise RuntimeError("read() failed to raise error") + """).format(path=path) + rproc = self.mount_a.client_remote.run( + args=['python3', '-c', pyscript], + wait=False, stdin=run.PIPE, stdout=run.PIPE) + + rproc.stdout.readline() + + mount_a_client_id = self.mount_a.get_global_id() + self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id]) + + rproc.stdin.writelines(['done\n']) + rproc.stdin.flush() + + rproc.wait() + self.assertEqual(rproc.exitstatus, 0) |