summaryrefslogtreecommitdiffstats
path: root/qa/tasks/cephfs/caps_helper.py
blob: 39b5963befa6abb6dd2e5e4aff19b6fe174f3a43 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
"""
Helper methods to test that MON and MDS caps are enforced properly.
"""
from tasks.cephfs.cephfs_test_case import CephFSTestCase

from teuthology.orchestra.run import Raw

class CapsHelper(CephFSTestCase):

    def run_mon_cap_tests(self, moncap, keyring):
        keyring_path = self.create_keyring_file(self.fs.admin_remote, keyring)

        fsls = self.run_cluster_cmd(f'fs ls --id {self.client_id} -k '
                                    f'{keyring_path}')

        # we need to check only for default FS when fsname clause is absent
        # in MON/MDS caps
        if 'fsname' not in moncap:
            self.assertIn(self.fs.name, fsls)
            return

        fss = (self.fs1.name, self.fs2.name) if hasattr(self, 'fs1') else \
            (self.fs.name,)
        for fsname in fss:
                if fsname in moncap:
                    self.assertIn('name: ' + fsname, fsls)
                else:
                    self.assertNotIn('name: ' + fsname, fsls)

    def run_mds_cap_tests(self, filepaths, filedata, mounts, perm):
        self.conduct_pos_test_for_read_caps(filepaths, filedata, mounts)

        if perm == 'rw':
            self.conduct_pos_test_for_write_caps(filepaths, mounts)
        elif perm == 'r':
            self.conduct_neg_test_for_write_caps(filepaths, mounts)
        else:
            raise RuntimeError(f'perm = {perm}\nIt should be "r" or "rw".')

    def conduct_pos_test_for_read_caps(self, filepaths, filedata, mounts):
        for mount in mounts:
            for path, data in zip(filepaths, filedata):
                # XXX: conduct tests only if path belongs to current mount; in
                # teuth tests client are located on same machines.
                if path.find(mount.hostfs_mntpt) != -1:
                    contents = mount.read_file(path)
                    self.assertEqual(data, contents)

    def conduct_pos_test_for_write_caps(self, filepaths, mounts):
        filedata = ('some new data on first fs', 'some new data on second fs')

        for mount in mounts:
            for path, data in zip(filepaths, filedata):
                if path.find(mount.hostfs_mntpt) != -1:
                    # test that write was successful
                    mount.write_file(path=path, data=data)
                    # verify that contents written was same as the one that was
                    # intended
                    contents1 = mount.read_file(path=path)
                    self.assertEqual(data, contents1)

    def conduct_neg_test_for_write_caps(self, filepaths, mounts):
        cmdargs = ['echo', 'some random data', Raw('|'), 'tee']

        for mount in mounts:
            for path in filepaths:
                if path.find(mount.hostfs_mntpt) != -1:
                    cmdargs.append(path)
                    mount.negtestcmd(args=cmdargs, retval=1,
                                     errmsg='permission denied')

    def get_mon_cap_from_keyring(self, client_name):
        keyring = self.run_cluster_cmd(cmd=f'auth get {client_name}')
        for line in keyring.split('\n'):
            if 'caps mon' in line:
                return line[line.find(' = "') + 4 : -1]

        raise RuntimeError('get_save_mon_cap: mon cap not found in keyring. '
                           'keyring -\n' + keyring)