From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- qa/tasks/cephfs/caps_helper.py | 79 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 qa/tasks/cephfs/caps_helper.py (limited to 'qa/tasks/cephfs/caps_helper.py') diff --git a/qa/tasks/cephfs/caps_helper.py b/qa/tasks/cephfs/caps_helper.py new file mode 100644 index 000000000..39b5963be --- /dev/null +++ b/qa/tasks/cephfs/caps_helper.py @@ -0,0 +1,79 @@ +""" +Helper methods to test that MON and MDS caps are enforced properly. +""" +from tasks.cephfs.cephfs_test_case import CephFSTestCase + +from teuthology.orchestra.run import Raw + +class CapsHelper(CephFSTestCase): + + def run_mon_cap_tests(self, moncap, keyring): + keyring_path = self.create_keyring_file(self.fs.admin_remote, keyring) + + fsls = self.run_cluster_cmd(f'fs ls --id {self.client_id} -k ' + f'{keyring_path}') + + # we need to check only for default FS when fsname clause is absent + # in MON/MDS caps + if 'fsname' not in moncap: + self.assertIn(self.fs.name, fsls) + return + + fss = (self.fs1.name, self.fs2.name) if hasattr(self, 'fs1') else \ + (self.fs.name,) + for fsname in fss: + if fsname in moncap: + self.assertIn('name: ' + fsname, fsls) + else: + self.assertNotIn('name: ' + fsname, fsls) + + def run_mds_cap_tests(self, filepaths, filedata, mounts, perm): + self.conduct_pos_test_for_read_caps(filepaths, filedata, mounts) + + if perm == 'rw': + self.conduct_pos_test_for_write_caps(filepaths, mounts) + elif perm == 'r': + self.conduct_neg_test_for_write_caps(filepaths, mounts) + else: + raise RuntimeError(f'perm = {perm}\nIt should be "r" or "rw".') + + def conduct_pos_test_for_read_caps(self, filepaths, filedata, mounts): + for mount in mounts: + for path, data in zip(filepaths, filedata): + # XXX: conduct tests only if path belongs to current mount; in + # teuth tests client are located on same machines. + if path.find(mount.hostfs_mntpt) != -1: + contents = mount.read_file(path) + self.assertEqual(data, contents) + + def conduct_pos_test_for_write_caps(self, filepaths, mounts): + filedata = ('some new data on first fs', 'some new data on second fs') + + for mount in mounts: + for path, data in zip(filepaths, filedata): + if path.find(mount.hostfs_mntpt) != -1: + # test that write was successful + mount.write_file(path=path, data=data) + # verify that contents written was same as the one that was + # intended + contents1 = mount.read_file(path=path) + self.assertEqual(data, contents1) + + def conduct_neg_test_for_write_caps(self, filepaths, mounts): + cmdargs = ['echo', 'some random data', Raw('|'), 'tee'] + + for mount in mounts: + for path in filepaths: + if path.find(mount.hostfs_mntpt) != -1: + cmdargs.append(path) + mount.negtestcmd(args=cmdargs, retval=1, + errmsg='permission denied') + + def get_mon_cap_from_keyring(self, client_name): + keyring = self.run_cluster_cmd(cmd=f'auth get {client_name}') + for line in keyring.split('\n'): + if 'caps mon' in line: + return line[line.find(' = "') + 4 : -1] + + raise RuntimeError('get_save_mon_cap: mon cap not found in keyring. ' + 'keyring -\n' + keyring) -- cgit v1.2.3