1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
import logging
import json
from tasks.cephfs.cephfs_test_case import CephFSTestCase
from teuthology.exceptions import CommandFailedError
from teuthology.contextutil import safe_while
log = logging.getLogger(__name__)
class TestFSTop(CephFSTestCase):
CLIENTS_REQUIRED = 2
def setUp(self):
super(TestFSTop, self).setUp()
self._enable_mgr_stats_plugin()
def tearDown(self):
self._disable_mgr_stats_plugin()
super(TestFSTop, self).tearDown()
def _enable_mgr_stats_plugin(self):
return self.get_ceph_cmd_stdout("mgr", "module", "enable", "stats")
def _disable_mgr_stats_plugin(self):
return self.get_ceph_cmd_stdout("mgr", "module", "disable", "stats")
def _fstop_dump(self, *args):
return self.mount_a.run_shell(['cephfs-top',
'--id=admin',
*args]).stdout.getvalue()
def _get_metrics(self, verifier_callback, trials, *args):
metrics = None
done = False
with safe_while(sleep=1, tries=trials, action='wait for metrics') as proceed:
while proceed():
metrics = json.loads(self._fstop_dump(*args))
done = verifier_callback(metrics)
if done:
break
return done, metrics
# TESTS
def test_fstop_non_existent_cluster(self):
try:
self.mount_a.run_shell(['cephfs-top',
'--cluster=hpec',
'--id=admin',
'--selftest'])
except CommandFailedError:
pass
else:
raise RuntimeError('expected cephfs-top command to fail.')
def test_fstop(self):
try:
self.mount_a.run_shell(['cephfs-top',
'--id=admin',
'--selftest'])
except CommandFailedError:
raise RuntimeError('cephfs-top --selftest failed')
def test_dump(self):
"""
Tests 'cephfs-top --dump' output is valid
"""
def verify_fstop_metrics(metrics):
clients = metrics.get('filesystems').get(self.fs.name, {})
if str(self.mount_a.get_global_id()) in clients and \
str(self.mount_b.get_global_id()) in clients:
return True
return False
# validate
valid, metrics = self._get_metrics(verify_fstop_metrics, 30, '--dump')
log.debug("metrics={0}".format(metrics))
self.assertTrue(valid)
def test_dumpfs(self):
"""
Tests 'cephfs-top --dumpfs' output is valid
"""
newfs_name = "cephfs_b"
def verify_fstop_metrics(metrics):
clients = metrics.get(newfs_name, {})
if self.fs.name not in metrics and \
str(self.mount_b.get_global_id()) in clients:
return True
return False
# umount mount_b, mount another filesystem on it and use --dumpfs filter
self.mount_b.umount_wait()
self.run_ceph_cmd("fs", "flag", "set", "enable_multiple", "true",
"--yes-i-really-mean-it")
# create a new filesystem
fs_b = self.mds_cluster.newfs(name=newfs_name)
# mount cephfs_b on mount_b
self.mount_b.mount_wait(cephfs_name=fs_b.name)
# validate
valid, metrics = self._get_metrics(verify_fstop_metrics, 30,
'--dumpfs={}'.format(newfs_name))
log.debug("metrics={0}".format(metrics))
# restore mount_b
self.mount_b.umount_wait()
self.mount_b.mount_wait(cephfs_name=self.fs.name)
self.assertTrue(valid)
|