diff options
Diffstat (limited to 'qa/tasks/cephfs/cephfs_test_case.py')
-rw-r--r-- | qa/tasks/cephfs/cephfs_test_case.py | 462 |
1 files changed, 462 insertions, 0 deletions
diff --git a/qa/tasks/cephfs/cephfs_test_case.py b/qa/tasks/cephfs/cephfs_test_case.py new file mode 100644 index 000000000..41831dac6 --- /dev/null +++ b/qa/tasks/cephfs/cephfs_test_case.py @@ -0,0 +1,462 @@ +import json +import logging +import os +import re + +from shlex import split as shlex_split + +from tasks.ceph_test_case import CephTestCase + +from teuthology import contextutil +from teuthology.orchestra import run +from teuthology.orchestra.run import CommandFailedError + +log = logging.getLogger(__name__) + +def for_teuthology(f): + """ + Decorator that adds an "is_for_teuthology" attribute to the wrapped function + """ + f.is_for_teuthology = True + return f + + +def needs_trimming(f): + """ + Mark fn as requiring a client capable of trimming its cache (i.e. for ceph-fuse + this means it needs to be able to run as root, currently) + """ + f.needs_trimming = True + return f + + +class MountDetails(): + + def __init__(self, mntobj): + self.client_id = mntobj.client_id + self.client_keyring_path = mntobj.client_keyring_path + self.client_remote = mntobj.client_remote + self.cephfs_name = mntobj.cephfs_name + self.cephfs_mntpt = mntobj.cephfs_mntpt + self.hostfs_mntpt = mntobj.hostfs_mntpt + + def restore(self, mntobj): + mntobj.client_id = self.client_id + mntobj.client_keyring_path = self.client_keyring_path + mntobj.client_remote = self.client_remote + mntobj.cephfs_name = self.cephfs_name + mntobj.cephfs_mntpt = self.cephfs_mntpt + mntobj.hostfs_mntpt = self.hostfs_mntpt + + +class CephFSTestCase(CephTestCase): + """ + Test case for Ceph FS, requires caller to populate Filesystem and Mounts, + into the fs, mount_a, mount_b class attributes (setting mount_b is optional) + + Handles resetting the cluster under test between tests. + """ + + # FIXME weird explicit naming + mount_a = None + mount_b = None + recovery_mount = None + + # Declarative test requirements: subclasses should override these to indicate + # their special needs. If not met, tests will be skipped. + CLIENTS_REQUIRED = 1 + MDSS_REQUIRED = 1 + REQUIRE_KCLIENT_REMOTE = False + REQUIRE_ONE_CLIENT_REMOTE = False + + # Whether to create the default filesystem during setUp + REQUIRE_FILESYSTEM = True + + # requires REQUIRE_FILESYSTEM = True + REQUIRE_RECOVERY_FILESYSTEM = False + + # create a backup filesystem if required. + # required REQUIRE_FILESYSTEM enabled + REQUIRE_BACKUP_FILESYSTEM = False + + LOAD_SETTINGS = [] # type: ignore + + def _save_mount_details(self): + """ + XXX: Tests may change details of mount objects, so let's stash them so + that these details are restored later to ensure smooth setUps and + tearDowns for upcoming tests. + """ + self._orig_mount_details = [MountDetails(m) for m in self.mounts] + log.info(self._orig_mount_details) + + def _remove_blocklist(self): + # In case anything is in the OSD blocklist list, clear it out. This is to avoid + # the OSD map changing in the background (due to blocklist expiry) while tests run. + try: + self.mds_cluster.mon_manager.raw_cluster_cmd("osd", "blocklist", "clear") + except CommandFailedError: + # Fallback for older Ceph cluster + try: + blocklist = json.loads(self.mds_cluster.mon_manager.raw_cluster_cmd("osd", + "dump", "--format=json-pretty"))['blocklist'] + log.info(f"Removing {len(blocklist)} blocklist entries") + for addr, blocklisted_at in blocklist.items(): + self.mds_cluster.mon_manager.raw_cluster_cmd("osd", "blocklist", "rm", addr) + except KeyError: + # Fallback for more older Ceph clusters, who will use 'blacklist' instead. + blacklist = json.loads(self.mds_cluster.mon_manager.raw_cluster_cmd("osd", + "dump", "--format=json-pretty"))['blacklist'] + log.info(f"Removing {len(blacklist)} blacklist entries") + for addr, blocklisted_at in blacklist.items(): + self.mds_cluster.mon_manager.raw_cluster_cmd("osd", "blacklist", "rm", addr) + + def setUp(self): + super(CephFSTestCase, self).setUp() + + self.config_set('mon', 'mon_allow_pool_delete', True) + + if len(self.mds_cluster.mds_ids) < self.MDSS_REQUIRED: + self.skipTest("Only have {0} MDSs, require {1}".format( + len(self.mds_cluster.mds_ids), self.MDSS_REQUIRED + )) + + if len(self.mounts) < self.CLIENTS_REQUIRED: + self.skipTest("Only have {0} clients, require {1}".format( + len(self.mounts), self.CLIENTS_REQUIRED + )) + + if self.REQUIRE_ONE_CLIENT_REMOTE: + if self.mounts[0].client_remote.hostname in self.mds_cluster.get_mds_hostnames(): + self.skipTest("Require first client to be on separate server from MDSs") + + # Create friendly mount_a, mount_b attrs + for i in range(0, self.CLIENTS_REQUIRED): + setattr(self, "mount_{0}".format(chr(ord('a') + i)), self.mounts[i]) + + self.mds_cluster.clear_firewall() + + # Unmount all clients, we are about to blow away the filesystem + for mount in self.mounts: + if mount.is_mounted(): + mount.umount_wait(force=True) + self._save_mount_details() + + # To avoid any issues with e.g. unlink bugs, we destroy and recreate + # the filesystem rather than just doing a rm -rf of files + self.mds_cluster.delete_all_filesystems() + self.mds_cluster.mds_restart() # to reset any run-time configs, etc. + self.fs = None # is now invalid! + self.backup_fs = None + self.recovery_fs = None + + self._remove_blocklist() + + client_mount_ids = [m.client_id for m in self.mounts] + # In case there were any extra auth identities around from a previous + # test, delete them + for entry in self.auth_list(): + ent_type, ent_id = entry['entity'].split(".") + if ent_type == "client" and ent_id not in client_mount_ids and not (ent_id == "admin" or ent_id[:6] == 'mirror'): + self.mds_cluster.mon_manager.raw_cluster_cmd("auth", "del", entry['entity']) + + if self.REQUIRE_FILESYSTEM: + self.fs = self.mds_cluster.newfs(create=True) + + # In case some test messed with auth caps, reset them + for client_id in client_mount_ids: + cmd = ['auth', 'caps', f'client.{client_id}', 'mon','allow r', + 'osd', f'allow rw pool={self.fs.get_data_pool_name()}', + 'mds', 'allow'] + + if self.run_cluster_cmd_result(cmd) == 0: + break + + cmd[1] = 'add' + if self.run_cluster_cmd_result(cmd) != 0: + raise RuntimeError(f'Failed to create new client {cmd[2]}') + + # wait for ranks to become active + self.fs.wait_for_daemons() + + # Mount the requested number of clients + for i in range(0, self.CLIENTS_REQUIRED): + self.mounts[i].mount_wait() + + if self.REQUIRE_BACKUP_FILESYSTEM: + if not self.REQUIRE_FILESYSTEM: + self.skipTest("backup filesystem requires a primary filesystem as well") + self.fs.mon_manager.raw_cluster_cmd('fs', 'flag', 'set', + 'enable_multiple', 'true', + '--yes-i-really-mean-it') + self.backup_fs = self.mds_cluster.newfs(name="backup_fs") + self.backup_fs.wait_for_daemons() + + if self.REQUIRE_RECOVERY_FILESYSTEM: + if not self.REQUIRE_FILESYSTEM: + self.skipTest("Recovery filesystem requires a primary filesystem as well") + # After Octopus is EOL, we can remove this setting: + self.fs.mon_manager.raw_cluster_cmd('fs', 'flag', 'set', + 'enable_multiple', 'true', + '--yes-i-really-mean-it') + self.recovery_fs = self.mds_cluster.newfs(name="recovery_fs", create=False) + self.recovery_fs.set_metadata_overlay(True) + self.recovery_fs.set_data_pool_name(self.fs.get_data_pool_name()) + self.recovery_fs.create() + self.recovery_fs.getinfo(refresh=True) + self.recovery_fs.wait_for_daemons() + + # Load an config settings of interest + for setting in self.LOAD_SETTINGS: + setattr(self, setting, float(self.fs.mds_asok( + ['config', 'get', setting], list(self.mds_cluster.mds_ids)[0] + )[setting])) + + self.configs_set = set() + + def tearDown(self): + self.mds_cluster.clear_firewall() + for m in self.mounts: + m.teardown() + + # To prevent failover messages during Unwind of ceph task + self.mds_cluster.delete_all_filesystems() + + for m, md in zip(self.mounts, self._orig_mount_details): + md.restore(m) + + for subsys, key in self.configs_set: + self.mds_cluster.clear_ceph_conf(subsys, key) + + return super(CephFSTestCase, self).tearDown() + + def set_conf(self, subsys, key, value): + self.configs_set.add((subsys, key)) + self.mds_cluster.set_ceph_conf(subsys, key, value) + + def auth_list(self): + """ + Convenience wrapper on "ceph auth ls" + """ + return json.loads(self.mds_cluster.mon_manager.raw_cluster_cmd( + "auth", "ls", "--format=json-pretty" + ))['auth_dump'] + + def assert_session_count(self, expected, ls_data=None, mds_id=None): + if ls_data is None: + ls_data = self.fs.mds_asok(['session', 'ls'], mds_id=mds_id) + + alive_count = len([s for s in ls_data if s['state'] != 'killing']) + + self.assertEqual(expected, alive_count, "Expected {0} sessions, found {1}".format( + expected, alive_count + )) + + def assert_session_state(self, client_id, expected_state): + self.assertEqual( + self._session_by_id( + self.fs.mds_asok(['session', 'ls'])).get(client_id, {'state': None})['state'], + expected_state) + + def get_session_data(self, client_id): + return self._session_by_id(client_id) + + def _session_list(self): + ls_data = self.fs.mds_asok(['session', 'ls']) + ls_data = [s for s in ls_data if s['state'] not in ['stale', 'closed']] + return ls_data + + def get_session(self, client_id, session_ls=None): + if session_ls is None: + session_ls = self.fs.mds_asok(['session', 'ls']) + + return self._session_by_id(session_ls)[client_id] + + def _session_by_id(self, session_ls): + return dict([(s['id'], s) for s in session_ls]) + + def perf_dump(self, rank=None, status=None): + return self.fs.rank_asok(['perf', 'dump'], rank=rank, status=status) + + def wait_until_evicted(self, client_id, timeout=30): + def is_client_evicted(): + ls = self._session_list() + for s in ls: + if s['id'] == client_id: + return False + return True + self.wait_until_true(is_client_evicted, timeout) + + def wait_for_daemon_start(self, daemon_ids=None): + """ + Wait until all the daemons appear in the FSMap, either assigned + MDS ranks or in the list of standbys + """ + def get_daemon_names(): + return [info['name'] for info in self.mds_cluster.status().get_all()] + + if daemon_ids is None: + daemon_ids = self.mds_cluster.mds_ids + + try: + self.wait_until_true( + lambda: set(daemon_ids) & set(get_daemon_names()) == set(daemon_ids), + timeout=30 + ) + except RuntimeError: + log.warning("Timeout waiting for daemons {0}, while we have {1}".format( + daemon_ids, get_daemon_names() + )) + raise + + def delete_mds_coredump(self, daemon_id): + # delete coredump file, otherwise teuthology.internal.coredump will + # catch it later and treat it as a failure. + core_pattern = self.mds_cluster.mds_daemons[daemon_id].remote.sh( + "sudo sysctl -n kernel.core_pattern") + core_dir = os.path.dirname(core_pattern.strip()) + if core_dir: # Non-default core_pattern with a directory in it + # We have seen a core_pattern that looks like it's from teuthology's coredump + # task, so proceed to clear out the core file + if core_dir[0] == '|': + log.info("Piped core dumps to program {0}, skip cleaning".format(core_dir[1:])) + return; + + log.info("Clearing core from directory: {0}".format(core_dir)) + + # Verify that we see the expected single coredump + ls_output = self.mds_cluster.mds_daemons[daemon_id].remote.sh([ + "cd", core_dir, run.Raw('&&'), + "sudo", "ls", run.Raw('|'), "sudo", "xargs", "file" + ]) + cores = [l.partition(":")[0] + for l in ls_output.strip().split("\n") + if re.match(r'.*ceph-mds.* -i +{0}'.format(daemon_id), l)] + + log.info("Enumerated cores: {0}".format(cores)) + self.assertEqual(len(cores), 1) + + log.info("Found core file {0}, deleting it".format(cores[0])) + + self.mds_cluster.mds_daemons[daemon_id].remote.run(args=[ + "cd", core_dir, run.Raw('&&'), "sudo", "rm", "-f", cores[0] + ]) + else: + log.info("No core_pattern directory set, nothing to clear (internal.coredump not enabled?)") + + def _get_subtrees(self, status=None, rank=None, path=None): + if path is None: + path = "/" + try: + with contextutil.safe_while(sleep=1, tries=3) as proceed: + while proceed(): + try: + if rank == "all": + subtrees = [] + for r in self.fs.get_ranks(status=status): + s = self.fs.rank_asok(["get", "subtrees"], status=status, rank=r['rank']) + s = filter(lambda s: s['auth_first'] == r['rank'] and s['auth_second'] == -2, s) + subtrees += s + else: + subtrees = self.fs.rank_asok(["get", "subtrees"], status=status, rank=rank) + subtrees = filter(lambda s: s['dir']['path'].startswith(path), subtrees) + return list(subtrees) + except CommandFailedError as e: + # Sometimes we get transient errors + if e.exitstatus == 22: + pass + else: + raise + except contextutil.MaxWhileTries as e: + raise RuntimeError(f"could not get subtree state from rank {rank}") from e + + def _wait_subtrees(self, test, status=None, rank=None, timeout=30, sleep=2, action=None, path=None): + test = sorted(test) + try: + with contextutil.safe_while(sleep=sleep, tries=timeout//sleep) as proceed: + while proceed(): + subtrees = self._get_subtrees(status=status, rank=rank, path=path) + filtered = sorted([(s['dir']['path'], s['auth_first']) for s in subtrees]) + log.info("%s =?= %s", filtered, test) + if filtered == test: + # Confirm export_pin in output is correct: + for s in subtrees: + if s['export_pin_target'] >= 0: + self.assertTrue(s['export_pin_target'] == s['auth_first']) + return subtrees + if action is not None: + action() + except contextutil.MaxWhileTries as e: + raise RuntimeError("rank {0} failed to reach desired subtree state".format(rank)) from e + + def _wait_until_scrub_complete(self, path="/", recursive=True, timeout=100): + out_json = self.fs.run_scrub(["start", path] + ["recursive"] if recursive else []) + if not self.fs.wait_until_scrub_complete(tag=out_json["scrub_tag"], + sleep=10, timeout=timeout): + log.info("timed out waiting for scrub to complete") + + def _wait_distributed_subtrees(self, count, status=None, rank=None, path=None): + try: + with contextutil.safe_while(sleep=5, tries=20) as proceed: + while proceed(): + subtrees = self._get_subtrees(status=status, rank=rank, path=path) + subtrees = list(filter(lambda s: s['distributed_ephemeral_pin'] == True and + s['auth_first'] == s['export_pin_target'], + subtrees)) + log.info(f"len={len(subtrees)} {subtrees}") + if len(subtrees) >= count: + return subtrees + except contextutil.MaxWhileTries as e: + raise RuntimeError("rank {0} failed to reach desired subtree state".format(rank)) from e + + def _wait_random_subtrees(self, count, status=None, rank=None, path=None): + try: + with contextutil.safe_while(sleep=5, tries=20) as proceed: + while proceed(): + subtrees = self._get_subtrees(status=status, rank=rank, path=path) + subtrees = list(filter(lambda s: s['random_ephemeral_pin'] == True and + s['auth_first'] == s['export_pin_target'], + subtrees)) + log.info(f"len={len(subtrees)} {subtrees}") + if len(subtrees) >= count: + return subtrees + except contextutil.MaxWhileTries as e: + raise RuntimeError("rank {0} failed to reach desired subtree state".format(rank)) from e + + def run_cluster_cmd(self, cmd): + if isinstance(cmd, str): + cmd = shlex_split(cmd) + return self.fs.mon_manager.raw_cluster_cmd(*cmd) + + def run_cluster_cmd_result(self, cmd): + if isinstance(cmd, str): + cmd = shlex_split(cmd) + return self.fs.mon_manager.raw_cluster_cmd_result(*cmd) + + def create_client(self, client_id, moncap=None, osdcap=None, mdscap=None): + if not (moncap or osdcap or mdscap): + if self.fs: + return self.fs.authorize(client_id, ('/', 'rw')) + else: + raise RuntimeError('no caps were passed and the default FS ' + 'is not created yet to allow client auth ' + 'for it.') + + cmd = ['auth', 'add', f'client.{client_id}'] + if moncap: + cmd += ['mon', moncap] + if osdcap: + cmd += ['osd', osdcap] + if mdscap: + cmd += ['mds', mdscap] + + self.run_cluster_cmd(cmd) + return self.run_cluster_cmd(f'auth get {self.client_name}') + + def create_keyring_file(self, remote, keyring): + keyring_path = remote.mktemp(data=keyring) + + # required when triggered using vstart_runner.py. + remote.run(args=['chmod', '644', keyring_path]) + + return keyring_path |