import json import time import logging from textwrap import dedent import datetime import gevent from teuthology.orchestra.run import CommandFailedError, Raw from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology log = logging.getLogger(__name__) class TestStrays(CephFSTestCase): MDSS_REQUIRED = 2 OPS_THROTTLE = 1 FILES_THROTTLE = 2 # Range of different file sizes used in throttle test's workload throttle_workload_size_range = 16 @for_teuthology def test_ops_throttle(self): self._test_throttling(self.OPS_THROTTLE) @for_teuthology def test_files_throttle(self): self._test_throttling(self.FILES_THROTTLE) def test_dir_deletion(self): """ That when deleting a bunch of dentries and the containing directory, everything gets purged. Catches cases where the client might e.g. fail to trim the unlinked dir from its cache. """ file_count = 1000 create_script = dedent(""" import os mountpoint = "{mountpoint}" subdir = "delete_me" size = {size} file_count = {file_count} os.mkdir(os.path.join(mountpoint, subdir)) for i in range(0, file_count): filename = "{{0}}_{{1}}.bin".format(i, size) with open(os.path.join(mountpoint, subdir, filename), 'w') as f: f.write(size * 'x') """.format( mountpoint=self.mount_a.mountpoint, size=1024, file_count=file_count )) self.mount_a.run_python(create_script) # That the dirfrag object is created self.fs.mds_asok(["flush", "journal"]) dir_ino = self.mount_a.path_to_ino("delete_me") self.assertTrue(self.fs.dirfrag_exists(dir_ino, 0)) # Remove everything self.mount_a.run_shell(["rm", "-rf", "delete_me"]) self.fs.mds_asok(["flush", "journal"]) # That all the removed files get created as strays strays = self.get_mdc_stat("strays_created") self.assertEqual(strays, file_count + 1) # That the strays all get enqueued for purge self.wait_until_equal( lambda: self.get_mdc_stat("strays_enqueued"), strays, timeout=600 ) # That all the purge operations execute self.wait_until_equal( lambda: self.get_stat("purge_queue", "pq_executed"), strays, timeout=600 ) # That finally, the directory metadata object is gone self.assertFalse(self.fs.dirfrag_exists(dir_ino, 0)) # That finally, the data objects are all gone self.await_data_pool_empty() def _test_throttling(self, throttle_type): self.data_log = [] try: return self._do_test_throttling(throttle_type) except: for l in self.data_log: log.info(",".join([l_.__str__() for l_ in l])) raise def _do_test_throttling(self, throttle_type): """ That the mds_max_purge_ops setting is respected """ def set_throttles(files, ops): """ Helper for updating ops/files limits, and calculating effective ops_per_pg setting to give the same ops limit. """ self.set_conf('mds', 'mds_max_purge_files', "%d" % files) self.set_conf('mds', 'mds_max_purge_ops', "%d" % ops) pgs = self.fs.mon_manager.get_pool_int_property( self.fs.get_data_pool_name(), "pg_num" ) ops_per_pg = float(ops) / pgs self.set_conf('mds', 'mds_max_purge_ops_per_pg', "%s" % ops_per_pg) # Test conditions depend on what we're going to be exercising. # * Lift the threshold on whatever throttle we are *not* testing, so # that the throttle of interest is the one that will be the bottleneck # * Create either many small files (test file count throttling) or fewer # large files (test op throttling) if throttle_type == self.OPS_THROTTLE: set_throttles(files=100000000, ops=16) size_unit = 1024 * 1024 # big files, generate lots of ops file_multiplier = 100 elif throttle_type == self.FILES_THROTTLE: # The default value of file limit is pretty permissive, so to avoid # the test running too fast, create lots of files and set the limit # pretty low. set_throttles(ops=100000000, files=6) size_unit = 1024 # small, numerous files file_multiplier = 200 else: raise NotImplementedError(throttle_type) # Pick up config changes self.fs.mds_fail_restart() self.fs.wait_for_daemons() create_script = dedent(""" import os mountpoint = "{mountpoint}" subdir = "delete_me" size_unit = {size_unit} file_multiplier = {file_multiplier} os.mkdir(os.path.join(mountpoint, subdir)) for i in range(0, file_multiplier): for size in range(0, {size_range}*size_unit, size_unit): filename = "{{0}}_{{1}}.bin".format(i, size // size_unit) with open(os.path.join(mountpoint, subdir, filename), 'w') as f: f.write(size * 'x') """.format( mountpoint=self.mount_a.mountpoint, size_unit=size_unit, file_multiplier=file_multiplier, size_range=self.throttle_workload_size_range )) self.mount_a.run_python(create_script) # We will run the deletion in the background, to reduce the risk of it completing before # we have started monitoring the stray statistics. def background(): self.mount_a.run_shell(["rm", "-rf", "delete_me"]) self.fs.mds_asok(["flush", "journal"]) background_thread = gevent.spawn(background) total_inodes = file_multiplier * self.throttle_workload_size_range + 1 mds_max_purge_ops = int(self.fs.get_config("mds_max_purge_ops", 'mds')) mds_max_purge_files = int(self.fs.get_config("mds_max_purge_files", 'mds')) # During this phase we look for the concurrent ops to exceed half # the limit (a heuristic) and not exceed the limit (a correctness # condition). purge_timeout = 600 elapsed = 0 files_high_water = 0 ops_high_water = 0 while True: stats = self.fs.mds_asok(['perf', 'dump']) mdc_stats = stats['mds_cache'] pq_stats = stats['purge_queue'] if elapsed >= purge_timeout: raise RuntimeError("Timeout waiting for {0} inodes to purge, stats:{1}".format(total_inodes, mdc_stats)) num_strays = mdc_stats['num_strays'] num_strays_purging = pq_stats['pq_executing'] num_purge_ops = pq_stats['pq_executing_ops'] files_high_water = pq_stats['pq_executing_high_water'] ops_high_water = pq_stats['pq_executing_ops_high_water'] self.data_log.append([datetime.datetime.now(), num_strays, num_strays_purging, num_purge_ops, files_high_water, ops_high_water]) total_strays_created = mdc_stats['strays_created'] total_strays_purged = pq_stats['pq_executed'] if total_strays_purged == total_inodes: log.info("Complete purge in {0} seconds".format(elapsed)) break elif total_strays_purged > total_inodes: raise RuntimeError("Saw more strays than expected, mdc stats: {0}".format(mdc_stats)) else: if throttle_type == self.OPS_THROTTLE: # 11 is filer_max_purge_ops plus one for the backtrace: # limit is allowed to be overshot by this much. if num_purge_ops > mds_max_purge_ops + 11: raise RuntimeError("num_purge_ops violates threshold {0}/{1}".format( num_purge_ops, mds_max_purge_ops )) elif throttle_type == self.FILES_THROTTLE: if num_strays_purging > mds_max_purge_files: raise RuntimeError("num_strays_purging violates threshold {0}/{1}".format( num_strays_purging, mds_max_purge_files )) else: raise NotImplementedError(throttle_type) log.info("Waiting for purge to complete {0}/{1}, {2}/{3}".format( num_strays_purging, num_strays, total_strays_purged, total_strays_created )) time.sleep(1) elapsed += 1 background_thread.join() # Check that we got up to a respectable rate during the purge. This is totally # racy, but should be safeish unless the cluster is pathologically slow, or # insanely fast such that the deletions all pass before we have polled the # statistics. if throttle_type == self.OPS_THROTTLE: if ops_high_water < mds_max_purge_ops // 2: raise RuntimeError("Ops in flight high water is unexpectedly low ({0} / {1})".format( ops_high_water, mds_max_purge_ops )) # The MDS may go over mds_max_purge_ops for some items, like a # heavily fragmented directory. The throttle does not kick in # until *after* we reach or exceed the limit. This is expected # because we don't want to starve the PQ or never purge a # particularly large file/directory. self.assertLessEqual(ops_high_water, mds_max_purge_ops+64) elif throttle_type == self.FILES_THROTTLE: if files_high_water < mds_max_purge_files // 2: raise RuntimeError("Files in flight high water is unexpectedly low ({0} / {1})".format( files_high_water, mds_max_purge_files )) self.assertLessEqual(files_high_water, mds_max_purge_files) # Sanity check all MDC stray stats stats = self.fs.mds_asok(['perf', 'dump']) mdc_stats = stats['mds_cache'] pq_stats = stats['purge_queue'] self.assertEqual(mdc_stats['num_strays'], 0) self.assertEqual(mdc_stats['num_strays_delayed'], 0) self.assertEqual(pq_stats['pq_executing'], 0) self.assertEqual(pq_stats['pq_executing_ops'], 0) self.assertEqual(mdc_stats['strays_created'], total_inodes) self.assertEqual(mdc_stats['strays_enqueued'], total_inodes) self.assertEqual(pq_stats['pq_executed'], total_inodes) def get_mdc_stat(self, name, mds_id=None): return self.get_stat("mds_cache", name, mds_id) def get_stat(self, subsys, name, mds_id=None): return self.fs.mds_asok(['perf', 'dump', subsys, name], mds_id=mds_id)[subsys][name] def _wait_for_counter(self, subsys, counter, expect_val, timeout=60, mds_id=None): self.wait_until_equal( lambda: self.get_stat(subsys, counter, mds_id), expect_val=expect_val, timeout=timeout, reject_fn=lambda x: x > expect_val ) def test_open_inode(self): """ That the case of a dentry unlinked while a client holds an inode open is handled correctly. The inode should be moved into a stray dentry, while the original dentry and directory should be purged. The inode's data should be purged when the client eventually closes it. """ mount_a_client_id = self.mount_a.get_global_id() # Write some bytes to a file size_mb = 8 # Hold the file open p = self.mount_a.open_background("open_file") self.mount_a.write_n_mb("open_file", size_mb) open_file_ino = self.mount_a.path_to_ino("open_file") self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2) # Unlink the dentry self.mount_a.run_shell(["rm", "-f", "open_file"]) # Wait to see the stray count increment self.wait_until_equal( lambda: self.get_mdc_stat("num_strays"), expect_val=1, timeout=60, reject_fn=lambda x: x > 1) # See that while the stray count has incremented, none have passed # on to the purge queue self.assertEqual(self.get_mdc_stat("strays_created"), 1) self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0) # See that the client still holds 2 caps self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2) # See that the data objects remain in the data pool self.assertTrue(self.fs.data_objects_present(open_file_ino, size_mb * 1024 * 1024)) # Now close the file self.mount_a.kill_background(p) # Wait to see the client cap count decrement self.wait_until_equal( lambda: self.get_session(mount_a_client_id)['num_caps'], expect_val=1, timeout=60, reject_fn=lambda x: x > 2 or x < 1 ) # Wait to see the purge counter increment, stray count go to zero self._wait_for_counter("mds_cache", "strays_enqueued", 1) self.wait_until_equal( lambda: self.get_mdc_stat("num_strays"), expect_val=0, timeout=6, reject_fn=lambda x: x > 1 ) self._wait_for_counter("purge_queue", "pq_executed", 1) # See that the data objects no longer exist self.assertTrue(self.fs.data_objects_absent(open_file_ino, size_mb * 1024 * 1024)) self.await_data_pool_empty() def test_reintegration_limit(self): """ That the reintegration is not blocked by full directories. """ LOW_LIMIT = 50 self.config_set('mds', 'mds_bal_fragment_size_max', str(LOW_LIMIT)) time.sleep(10) # for config to reach MDS; async create is fast!! last_reintegrated = self.get_mdc_stat("strays_reintegrated") self.mount_a.run_shell_payload(""" mkdir a b for i in `seq 1 50`; do touch a/"$i" ln a/"$i" b/"$i" done sync -f a b rm a/* """) self.wait_until_equal( lambda: self.get_mdc_stat("num_strays"), expect_val=0, timeout=60 ) curr_reintegrated = self.get_mdc_stat("strays_reintegrated") self.assertGreater(curr_reintegrated, last_reintegrated) def test_hardlink_reintegration(self): """ That removal of primary dentry of hardlinked inode results in reintegration of inode into the previously-remote dentry, rather than lingering as a stray indefinitely. """ # Write some bytes to file_a size_mb = 8 self.mount_a.run_shell(["mkdir", "dir_1"]) self.mount_a.write_n_mb("dir_1/file_a", size_mb) ino = self.mount_a.path_to_ino("dir_1/file_a") # Create a hardlink named file_b self.mount_a.run_shell(["mkdir", "dir_2"]) self.mount_a.run_shell(["ln", "dir_1/file_a", "dir_2/file_b"]) self.assertEqual(self.mount_a.path_to_ino("dir_2/file_b"), ino) # Flush journal self.fs.mds_asok(['flush', 'journal']) # See that backtrace for the file points to the file_a path pre_unlink_bt = self.fs.read_backtrace(ino) self.assertEqual(pre_unlink_bt['ancestors'][0]['dname'], "file_a") # empty mds cache. otherwise mds reintegrates stray when unlink finishes self.mount_a.umount_wait() self.fs.mds_asok(['flush', 'journal']) self.fs.mds_fail_restart() self.fs.wait_for_daemons() self.mount_a.mount_wait() # Unlink file_a self.mount_a.run_shell(["rm", "-f", "dir_1/file_a"]) # See that a stray was created self.assertEqual(self.get_mdc_stat("num_strays"), 1) self.assertEqual(self.get_mdc_stat("strays_created"), 1) # Wait, see that data objects are still present (i.e. that the # stray did not advance to purging given time) time.sleep(30) self.assertTrue(self.fs.data_objects_present(ino, size_mb * 1024 * 1024)) self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0) # See that before reintegration, the inode's backtrace points to a stray dir self.fs.mds_asok(['flush', 'journal']) self.assertTrue(self.get_backtrace_path(ino).startswith("stray")) last_reintegrated = self.get_mdc_stat("strays_reintegrated") # Do a metadata operation on the remaining link (mv is heavy handed, but # others like touch may be satisfied from caps without poking MDS) self.mount_a.run_shell(["mv", "dir_2/file_b", "dir_2/file_c"]) # Stray reintegration should happen as a result of the eval_remote call # on responding to a client request. self.wait_until_equal( lambda: self.get_mdc_stat("num_strays"), expect_val=0, timeout=60 ) # See the reintegration counter increment curr_reintegrated = self.get_mdc_stat("strays_reintegrated") self.assertGreater(curr_reintegrated, last_reintegrated) last_reintegrated = curr_reintegrated # Flush the journal self.fs.mds_asok(['flush', 'journal']) # See that the backtrace for the file points to the remaining link's path post_reint_bt = self.fs.read_backtrace(ino) self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_c") # mds should reintegrates stray when unlink finishes self.mount_a.run_shell(["ln", "dir_2/file_c", "dir_2/file_d"]) self.mount_a.run_shell(["rm", "-f", "dir_2/file_c"]) # Stray reintegration should happen as a result of the notify_stray call # on completion of unlink self.wait_until_equal( lambda: self.get_mdc_stat("num_strays"), expect_val=0, timeout=60 ) # See the reintegration counter increment curr_reintegrated = self.get_mdc_stat("strays_reintegrated") self.assertGreater(curr_reintegrated, last_reintegrated) last_reintegrated = curr_reintegrated # Flush the journal self.fs.mds_asok(['flush', 'journal']) # See that the backtrace for the file points to the newest link's path post_reint_bt = self.fs.read_backtrace(ino) self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_d") # Now really delete it self.mount_a.run_shell(["rm", "-f", "dir_2/file_d"]) self._wait_for_counter("mds_cache", "strays_enqueued", 1) self._wait_for_counter("purge_queue", "pq_executed", 1) self.assert_purge_idle() self.assertTrue(self.fs.data_objects_absent(ino, size_mb * 1024 * 1024)) # We caused the inode to go stray 3 times self.assertEqual(self.get_mdc_stat("strays_created"), 3) # We purged it at the last self.assertEqual(self.get_mdc_stat("strays_enqueued"), 1) def test_reintegration_via_scrub(self): """ That reintegration is triggered via recursive scrub. """ self.mount_a.run_shell_payload(""" mkdir -p a b for i in `seq 1 50`; do touch a/"$i" ln a/"$i" b/"$i" done sync -f . """) self.mount_a.remount() # drop caps/cache self.fs.rank_tell(["flush", "journal"]) self.fs.rank_fail() self.fs.wait_for_daemons() # only / in cache, reintegration cannot happen self.wait_until_equal( lambda: len(self.fs.rank_tell(["dump", "tree", "/"])), expect_val=3, timeout=60 ) last_reintegrated = self.get_mdc_stat("strays_reintegrated") self.mount_a.run_shell_payload(""" rm a/* sync -f . """) self.wait_until_equal( lambda: len(self.fs.rank_tell(["dump", "tree", "/"])), expect_val=3, timeout=60 ) self.assertEqual(self.get_mdc_stat("num_strays"), 50) curr_reintegrated = self.get_mdc_stat("strays_reintegrated") self.assertEqual(last_reintegrated, curr_reintegrated) self.fs.rank_tell(["scrub", "start", "/", "recursive,force"]) self.wait_until_equal( lambda: self.get_mdc_stat("num_strays"), expect_val=0, timeout=60 ) curr_reintegrated = self.get_mdc_stat("strays_reintegrated") # N.B.: reintegrate (rename RPC) may be tried multiple times from different code paths self.assertGreaterEqual(curr_reintegrated, last_reintegrated+50) def test_mv_hardlink_cleanup(self): """ That when doing a rename from A to B, and B has hardlinks, then we make a stray for B which is then reintegrated into one of his hardlinks. """ # Create file_a, file_b, and a hardlink to file_b size_mb = 8 self.mount_a.write_n_mb("file_a", size_mb) file_a_ino = self.mount_a.path_to_ino("file_a") self.mount_a.write_n_mb("file_b", size_mb) file_b_ino = self.mount_a.path_to_ino("file_b") self.mount_a.run_shell(["ln", "file_b", "linkto_b"]) self.assertEqual(self.mount_a.path_to_ino("linkto_b"), file_b_ino) # mv file_a file_b self.mount_a.run_shell(["mv", "file_a", "file_b"]) # Stray reintegration should happen as a result of the notify_stray call on # completion of rename self.wait_until_equal( lambda: self.get_mdc_stat("num_strays"), expect_val=0, timeout=60 ) self.assertEqual(self.get_mdc_stat("strays_created"), 1) self.assertGreaterEqual(self.get_mdc_stat("strays_reintegrated"), 1) # No data objects should have been deleted, as both files still have linkage. self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024)) self.assertTrue(self.fs.data_objects_present(file_b_ino, size_mb * 1024 * 1024)) self.fs.mds_asok(['flush', 'journal']) post_reint_bt = self.fs.read_backtrace(file_b_ino) self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "linkto_b") def _setup_two_ranks(self): # Set up two MDSs self.fs.set_max_mds(2) # See that we have two active MDSs self.wait_until_equal(lambda: len(self.fs.get_active_names()), 2, 30, reject_fn=lambda v: v > 2 or v < 1) active_mds_names = self.fs.get_active_names() rank_0_id = active_mds_names[0] rank_1_id = active_mds_names[1] log.info("Ranks 0 and 1 are {0} and {1}".format( rank_0_id, rank_1_id)) # Get rid of other MDS daemons so that it's easier to know which # daemons to expect in which ranks after restarts for unneeded_mds in set(self.mds_cluster.mds_ids) - {rank_0_id, rank_1_id}: self.mds_cluster.mds_stop(unneeded_mds) self.mds_cluster.mds_fail(unneeded_mds) return rank_0_id, rank_1_id def _force_migrate(self, path, rank=1): """ :param to_id: MDS id to move it to :param path: Filesystem path (string) to move :return: None """ self.mount_a.run_shell(["setfattr", "-n", "ceph.dir.pin", "-v", str(rank), path]) rpath = "/"+path self._wait_subtrees([(rpath, rank)], rank=rank, path=rpath) def _is_stopped(self, rank): mds_map = self.fs.get_mds_map() return rank not in [i['rank'] for i in mds_map['info'].values()] def test_purge_on_shutdown(self): """ That when an MDS rank is shut down, its purge queue is drained in the process. """ rank_0_id, rank_1_id = self._setup_two_ranks() self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0") self.mds_cluster.mds_fail_restart(rank_1_id) self.fs.wait_for_daemons() file_count = 5 self.mount_a.create_n_files("delete_me/file", file_count) self._force_migrate("delete_me") self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")]) self.mount_a.umount_wait() # See all the strays go into purge queue self._wait_for_counter("mds_cache", "strays_created", file_count, mds_id=rank_1_id) self._wait_for_counter("mds_cache", "strays_enqueued", file_count, mds_id=rank_1_id) self.assertEqual(self.get_stat("mds_cache", "num_strays", mds_id=rank_1_id), 0) # See nothing get purged from the purge queue (yet) time.sleep(10) self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0) # Shut down rank 1 self.fs.set_max_mds(1) # It shouldn't proceed past stopping because its still not allowed # to purge time.sleep(10) self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0) self.assertFalse(self._is_stopped(1)) # Permit the daemon to start purging again self.fs.mon_manager.raw_cluster_cmd('tell', 'mds.{0}'.format(rank_1_id), 'injectargs', "--mds_max_purge_files 100") # It should now proceed through shutdown self.fs.wait_for_daemons(timeout=120) # ...and in the process purge all that data self.await_data_pool_empty() def test_migration_on_shutdown(self): """ That when an MDS rank is shut down, any non-purgeable strays get migrated to another rank. """ rank_0_id, rank_1_id = self._setup_two_ranks() # Create a non-purgeable stray in a ~mds1 stray directory # by doing a hard link and deleting the original file self.mount_a.run_shell_payload(""" mkdir dir_1 dir_2 touch dir_1/original ln dir_1/original dir_2/linkto """) self._force_migrate("dir_1") self._force_migrate("dir_2", rank=0) # empty mds cache. otherwise mds reintegrates stray when unlink finishes self.mount_a.umount_wait() self.fs.mds_asok(['flush', 'journal'], rank_1_id) self.fs.mds_asok(['cache', 'drop'], rank_1_id) self.mount_a.mount_wait() self.mount_a.run_shell(["rm", "-f", "dir_1/original"]) self.mount_a.umount_wait() self._wait_for_counter("mds_cache", "strays_created", 1, mds_id=rank_1_id) # Shut down rank 1 self.fs.set_max_mds(1) self.fs.wait_for_daemons(timeout=120) # See that the stray counter on rank 0 has incremented self.assertEqual(self.get_mdc_stat("strays_created", rank_0_id), 1) def test_migrate_unlinked_dir(self): """ Reproduce https://tracker.ceph.com/issues/53597 """ rank_0_id, rank_1_id = self._setup_two_ranks() self.mount_a.run_shell_payload(""" mkdir pin touch pin/placeholder """) self._force_migrate("pin") # Hold the dir open so it cannot be purged p = self.mount_a.open_dir_background("pin/to-be-unlinked") # Unlink the dentry self.mount_a.run_shell(["rmdir", "pin/to-be-unlinked"]) # Wait to see the stray count increment self.wait_until_equal( lambda: self.get_mdc_stat("num_strays", mds_id=rank_1_id), expect_val=1, timeout=60, reject_fn=lambda x: x > 1) # but not purged self.assertEqual(self.get_mdc_stat("strays_created", mds_id=rank_1_id), 1) self.assertEqual(self.get_mdc_stat("strays_enqueued", mds_id=rank_1_id), 0) # Test loading unlinked dir into cache self.fs.mds_asok(['flush', 'journal'], rank_1_id) self.fs.mds_asok(['cache', 'drop'], rank_1_id) # Shut down rank 1 self.fs.set_max_mds(1) self.fs.wait_for_daemons(timeout=120) # Now the stray should be migrated to rank 0 # self.assertEqual(self.get_mdc_stat("strays_created", mds_id=rank_0_id), 1) # https://github.com/ceph/ceph/pull/44335#issuecomment-1125940158 self.mount_a.kill_background(p) def assert_backtrace(self, ino, expected_path): """ Assert that the backtrace in the data pool for an inode matches an expected /foo/bar path. """ expected_elements = expected_path.strip("/").split("/") bt = self.fs.read_backtrace(ino) actual_elements = list(reversed([dn['dname'] for dn in bt['ancestors']])) self.assertListEqual(expected_elements, actual_elements) def get_backtrace_path(self, ino): bt = self.fs.read_backtrace(ino) elements = reversed([dn['dname'] for dn in bt['ancestors']]) return "/".join(elements) def assert_purge_idle(self): """ Assert that the MDS perf counters indicate no strays exist and no ongoing purge activity. Sanity check for when PurgeQueue should be idle. """ mdc_stats = self.fs.mds_asok(['perf', 'dump', "mds_cache"])['mds_cache'] pq_stats = self.fs.mds_asok(['perf', 'dump', "purge_queue"])['purge_queue'] self.assertEqual(mdc_stats["num_strays"], 0) self.assertEqual(mdc_stats["num_strays_delayed"], 0) self.assertEqual(pq_stats["pq_executing"], 0) self.assertEqual(pq_stats["pq_executing_ops"], 0) def test_mv_cleanup(self): """ That when doing a rename from A to B, and B has no hardlinks, then we make a stray for B and purge him. """ # Create file_a and file_b, write some to both size_mb = 8 self.mount_a.write_n_mb("file_a", size_mb) file_a_ino = self.mount_a.path_to_ino("file_a") self.mount_a.write_n_mb("file_b", size_mb) file_b_ino = self.mount_a.path_to_ino("file_b") self.fs.mds_asok(['flush', 'journal']) self.assert_backtrace(file_a_ino, "file_a") self.assert_backtrace(file_b_ino, "file_b") # mv file_a file_b self.mount_a.run_shell(['mv', 'file_a', 'file_b']) # See that stray counter increments self.assertEqual(self.get_mdc_stat("strays_created"), 1) # Wait for purge counter to increment self._wait_for_counter("mds_cache", "strays_enqueued", 1) self._wait_for_counter("purge_queue", "pq_executed", 1) self.assert_purge_idle() # file_b should have been purged self.assertTrue(self.fs.data_objects_absent(file_b_ino, size_mb * 1024 * 1024)) # Backtrace should have updated from file_a to file_b self.fs.mds_asok(['flush', 'journal']) self.assert_backtrace(file_a_ino, "file_b") # file_a's data should still exist self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024)) def _pool_df(self, pool_name): """ Return a dict like { "kb_used": 0, "bytes_used": 0, "max_avail": 19630292406, "objects": 0 } :param pool_name: Which pool (must exist) """ out = self.fs.mon_manager.raw_cluster_cmd("df", "--format=json-pretty") for p in json.loads(out)['pools']: if p['name'] == pool_name: return p['stats'] raise RuntimeError("Pool '{0}' not found".format(pool_name)) def await_data_pool_empty(self): self.wait_until_true( lambda: self._pool_df( self.fs.get_data_pool_name() )['objects'] == 0, timeout=60) def test_snapshot_remove(self): """ That removal of a snapshot that references a now-unlinked file results in purging on the stray for the file. """ # Enable snapshots self.fs.set_allow_new_snaps(True) # Create a dir with a file in it size_mb = 8 self.mount_a.run_shell(["mkdir", "snapdir"]) self.mount_a.run_shell(["mkdir", "snapdir/subdir"]) self.mount_a.write_test_pattern("snapdir/subdir/file_a", size_mb * 1024 * 1024) file_a_ino = self.mount_a.path_to_ino("snapdir/subdir/file_a") # Snapshot the dir self.mount_a.run_shell(["mkdir", "snapdir/.snap/snap1"]) # Cause the head revision to deviate from the snapshot self.mount_a.write_n_mb("snapdir/subdir/file_a", size_mb) # Flush the journal so that backtraces, dirfrag objects will actually be written self.fs.mds_asok(["flush", "journal"]) # Unlink the file self.mount_a.run_shell(["rm", "-f", "snapdir/subdir/file_a"]) self.mount_a.run_shell(["rmdir", "snapdir/subdir"]) # Unmount the client because when I come back to check the data is still # in the file I don't want to just see what's in the page cache. self.mount_a.umount_wait() self.assertEqual(self.get_mdc_stat("strays_created"), 2) # FIXME: at this stage we see a purge and the stray count drops to # zero, but there's actually still a stray, so at the very # least the StrayManager stats code is slightly off self.mount_a.mount_wait() # See that the data from the snapshotted revision of the file is still present # and correct self.mount_a.validate_test_pattern("snapdir/.snap/snap1/subdir/file_a", size_mb * 1024 * 1024) # Remove the snapshot self.mount_a.run_shell(["rmdir", "snapdir/.snap/snap1"]) # Purging file_a doesn't happen until after we've flushed the journal, because # it is referenced by the snapshotted subdir, and the snapshot isn't really # gone until the journal references to it are gone self.fs.mds_asok(["flush", "journal"]) # Wait for purging to complete, which requires the OSDMap to propagate to the OSDs. # See also: http://tracker.ceph.com/issues/20072 self.wait_until_true( lambda: self.fs.data_objects_absent(file_a_ino, size_mb * 1024 * 1024), timeout=60 ) # See that a purge happens now self._wait_for_counter("mds_cache", "strays_enqueued", 2) self._wait_for_counter("purge_queue", "pq_executed", 2) self.await_data_pool_empty() def test_fancy_layout(self): """ purge stray file with fancy layout """ file_name = "fancy_layout_file" self.mount_a.run_shell(["touch", file_name]) file_layout = "stripe_unit=1048576 stripe_count=4 object_size=8388608" self.mount_a.setfattr(file_name, "ceph.file.layout", file_layout) # 35MB requires 7 objects size_mb = 35 self.mount_a.write_n_mb(file_name, size_mb) self.mount_a.run_shell(["rm", "-f", file_name]) self.fs.mds_asok(["flush", "journal"]) # can't use self.fs.data_objects_absent here, it does not support fancy layout self.await_data_pool_empty() def test_dirfrag_limit(self): """ That the directory fragment size cannot exceed mds_bal_fragment_size_max (using a limit of 50 in all configurations). """ LOW_LIMIT = 50 self.config_set('mds', 'mds_bal_fragment_size_max', str(LOW_LIMIT)) time.sleep(10) # for config to reach MDS; async create is fast!! try: self.mount_a.create_n_files("subdir/file", LOW_LIMIT+1, finaldirsync=True) except CommandFailedError: pass # ENOSPC else: self.fail("fragment size exceeded") def test_dirfrag_limit_fragmented(self): """ That fragmentation (forced) will allow more entries to be created. """ LOW_LIMIT = 50 self.config_set('mds', 'mds_bal_fragment_size_max', str(LOW_LIMIT)) self.config_set('mds', 'mds_bal_merge_size', 1) # disable merging time.sleep(10) # for config to reach MDS; async create is fast!! # Test that we can go beyond the limit if we fragment the directory self.mount_a.create_n_files("subdir/file", LOW_LIMIT, finaldirsync=True) self.mount_a.umount_wait() # release client caps # Ensure that subdir is fragmented self.fs.rank_asok(["dirfrag", "split", "/subdir", "0/0", "1"]) self.fs.rank_asok(["flush", "journal"]) # Create 50% more files than the current fragment limit self.mount_a.mount_wait() self.mount_a.create_n_files("subdir/file", (LOW_LIMIT*3)//2, finaldirsync=True) def test_dirfrag_limit_strays(self): """ That unlinking fails when the stray directory fragment becomes too large and that unlinking may continue once those strays are purged. """ LOW_LIMIT = 10 # N.B. this test is inherently racy because stray removal may be faster # than slow(er) file creation. self.config_set('mds', 'mds_bal_fragment_size_max', LOW_LIMIT) time.sleep(10) # for config to reach MDS; async create is fast!! # Now test the stray directory size is limited and recovers strays_before = self.get_mdc_stat("strays_created") try: # 10 stray directories: expect collisions self.mount_a.create_n_files("subdir/file", LOW_LIMIT*10, finaldirsync=True, unlink=True) except CommandFailedError: pass # ENOSPC else: self.fail("fragment size exceeded") strays_after = self.get_mdc_stat("strays_created") self.assertGreaterEqual(strays_after-strays_before, LOW_LIMIT) self._wait_for_counter("mds_cache", "strays_enqueued", strays_after) self._wait_for_counter("purge_queue", "pq_executed", strays_after) # verify new files can be created and unlinked self.mount_a.create_n_files("subdir/file", LOW_LIMIT, dirsync=True, unlink=True) def test_purge_queue_upgrade(self): """ That when starting on a system with no purge queue in the metadata pool, we silently create one. :return: """ self.mds_cluster.mds_stop() self.mds_cluster.mds_fail() self.fs.radosm(["rm", "500.00000000"]) self.mds_cluster.mds_restart() self.fs.wait_for_daemons() def test_replicated_delete_speed(self): """ That deletions of replicated metadata are not pathologically slow """ rank_0_id, rank_1_id = self._setup_two_ranks() self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0") self.mds_cluster.mds_fail_restart(rank_1_id) self.fs.wait_for_daemons() file_count = 10 self.mount_a.create_n_files("delete_me/file", file_count) self._force_migrate("delete_me") begin = datetime.datetime.now() self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")]) end = datetime.datetime.now() # What we're really checking here is that we are completing client # operations immediately rather than delaying until the next tick. tick_period = float(self.fs.get_config("mds_tick_interval", service_type="mds")) duration = (end - begin).total_seconds() self.assertLess(duration, (file_count * tick_period) * 0.25)