1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
|
"""
CephFS sub-tasks.
"""
import logging
import re
from tasks.cephfs.filesystem import Filesystem, MDSCluster
log = logging.getLogger(__name__)
# Everything up to CEPH_MDSMAP_ALLOW_STANDBY_REPLAY
CEPH_MDSMAP_ALLOW_STANDBY_REPLAY = (1<<5)
CEPH_MDSMAP_LAST = CEPH_MDSMAP_ALLOW_STANDBY_REPLAY
UPGRADE_FLAGS_MASK = ((CEPH_MDSMAP_LAST<<1) - 1)
def pre_upgrade_save(ctx, config):
"""
That the upgrade procedure doesn't clobber state: save state.
"""
mdsc = MDSCluster(ctx)
status = mdsc.status()
state = {}
ctx['mds-upgrade-state'] = state
for fs in list(status.get_filesystems()):
fscid = fs['id']
mdsmap = fs['mdsmap']
fs_state = {}
fs_state['epoch'] = mdsmap['epoch']
fs_state['max_mds'] = mdsmap['max_mds']
fs_state['flags'] = mdsmap['flags'] & UPGRADE_FLAGS_MASK
state[fscid] = fs_state
log.debug(f"fs fscid={fscid},name={mdsmap['fs_name']} state = {fs_state}")
def post_upgrade_checks(ctx, config):
"""
That the upgrade procedure doesn't clobber state.
"""
state = ctx['mds-upgrade-state']
mdsc = MDSCluster(ctx)
status = mdsc.status()
for fs in list(status.get_filesystems()):
fscid = fs['id']
mdsmap = fs['mdsmap']
fs_state = state[fscid]
log.debug(f"checking fs fscid={fscid},name={mdsmap['fs_name']} state = {fs_state}")
# check state was restored to previous values
assert fs_state['max_mds'] == mdsmap['max_mds']
assert fs_state['flags'] == (mdsmap['flags'] & UPGRADE_FLAGS_MASK)
# now confirm that the upgrade procedure was followed
epoch = mdsmap['epoch']
pre_upgrade_epoch = fs_state['epoch']
assert pre_upgrade_epoch < epoch
should_decrease_max_mds = fs_state['max_mds'] > 1
did_decrease_max_mds = False
should_disable_allow_standby_replay = fs_state['flags'] & CEPH_MDSMAP_ALLOW_STANDBY_REPLAY
did_disable_allow_standby_replay = False
for i in range(pre_upgrade_epoch+1, mdsmap['epoch']):
old_status = mdsc.status(epoch=i)
old_fs = old_status.get_fsmap(fscid)
old_mdsmap = old_fs['mdsmap']
if should_decrease_max_mds and old_mdsmap['max_mds'] == 1:
log.debug(f"max_mds reduced in epoch {i}")
did_decrease_max_mds = True
if should_disable_allow_standby_replay and not (old_mdsmap['flags'] & CEPH_MDSMAP_ALLOW_STANDBY_REPLAY):
log.debug(f"allow_standby_replay disabled in epoch {i}")
did_disable_allow_standby_replay = True
assert not should_decrease_max_mds or did_decrease_max_mds
assert not should_disable_allow_standby_replay or did_disable_allow_standby_replay
def ready(ctx, config):
"""
That the file system is ready for clients.
"""
if config is None:
config = {}
assert isinstance(config, dict), \
'task only accepts a dict for configuration'
timeout = config.get('timeout', 300)
mdsc = MDSCluster(ctx)
status = mdsc.status()
for filesystem in status.get_filesystems():
fs = Filesystem(ctx, fscid=filesystem['id'])
fs.wait_for_daemons(timeout=timeout, status=status)
def clients_evicted(ctx, config):
"""
Check clients are evicted, unmount (cleanup) if so.
"""
if config is None:
config = {}
assert isinstance(config, dict), \
'task only accepts a dict for configuration'
clients = config.get('clients')
if clients is None:
clients = {("client."+client_id): True for client_id in ctx.mounts}
log.info("clients is {}".format(str(clients)))
fs = Filesystem(ctx)
status = fs.status()
has_session = set()
mounts = {}
for client in clients:
client_id = re.match("^client.([0-9]+)$", client).groups(1)[0]
mounts[client] = ctx.mounts.get(client_id)
for rank in fs.get_ranks(status=status):
ls = fs.rank_asok(['session', 'ls'], rank=rank['rank'], status=status)
for session in ls:
for client, evicted in clients.items():
mount = mounts.get(client)
if mount is not None:
global_id = mount.get_global_id()
if session['id'] == global_id:
if evicted:
raise RuntimeError("client still has session: {}".format(str(session)))
else:
log.info("client {} has a session with MDS {}.{}".format(client, fs.id, rank['rank']))
has_session.add(client)
no_session = set(clients) - has_session
should_assert = False
for client, evicted in clients.items():
mount = mounts.get(client)
if mount is not None:
if evicted:
log.info("confirming client {} is blocklisted".format(client))
assert fs.is_addr_blocklisted(mount.get_global_addr())
elif client in no_session:
log.info("client {} should not be evicted but has no session with an MDS".format(client))
fs.is_addr_blocklisted(mount.get_global_addr()) # for debugging
should_assert = True
if should_assert:
raise RuntimeError("some clients which should not be evicted have no session with an MDS?")
|