1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
"""
Automatically scale MDSs based on status of the file-system using the FSMap
"""
import logging
from typing import Optional, List, Set
from mgr_module import MgrModule, NotifyType
from orchestrator._interface import MDSSpec, ServiceSpec
import orchestrator
import copy
log = logging.getLogger(__name__)
class MDSAutoscaler(orchestrator.OrchestratorClientMixin, MgrModule):
"""
MDS autoscaler.
"""
NOTIFY_TYPES = [NotifyType.fs_map]
def __init__(self, *args, **kwargs):
MgrModule.__init__(self, *args, **kwargs)
self.set_mgr(self)
def get_service(self, fs_name: str) -> Optional[orchestrator.ServiceDescription]:
service = f"mds.{fs_name}"
completion = self.describe_service(service_type='mds',
service_name=service,
refresh=True)
orchestrator.raise_if_exception(completion)
if completion.result:
return completion.result[0]
return None
def update_daemon_count(self, spec: ServiceSpec, fs_name: str, abscount: int) -> MDSSpec:
ps = copy.deepcopy(spec.placement)
ps.count = abscount
newspec = MDSSpec(service_type=spec.service_type,
service_id=spec.service_id,
placement=ps)
return newspec
def get_required_standby_count(self, fs_map: dict, fs_name: str) -> int:
assert fs_map is not None
for fs in fs_map['filesystems']:
if fs['mdsmap']['fs_name'] == fs_name:
return fs['mdsmap']['standby_count_wanted']
assert False
def get_required_max_mds(self, fs_map: dict, fs_name: str) -> int:
assert fs_map is not None
for fs in fs_map['filesystems']:
if fs['mdsmap']['fs_name'] == fs_name:
return fs['mdsmap']['max_mds']
assert False
def verify_and_manage_mds_instance(self, fs_map: dict, fs_name: str):
assert fs_map is not None
try:
svc = self.get_service(fs_name)
if not svc:
self.log.info(f"fs {fs_name}: no service defined; skipping")
return
if not svc.spec.placement.count:
self.log.info(f"fs {fs_name}: service does not specify a count; skipping")
return
standbys_required = self.get_required_standby_count(fs_map, fs_name)
max_mds = self.get_required_max_mds(fs_map, fs_name)
want = max_mds + standbys_required
self.log.info(f"fs {fs_name}: "
f"max_mds={max_mds} "
f"standbys_required={standbys_required}, "
f"count={svc.spec.placement.count}")
if want == svc.spec.placement.count:
return
self.log.info(f"fs {fs_name}: adjusting daemon count from {svc.spec.placement.count} to {want}")
newspec = self.update_daemon_count(svc.spec, fs_name, want)
completion = self.apply_mds(newspec)
orchestrator.raise_if_exception(completion)
except orchestrator.OrchestratorError as e:
self.log.exception(f"fs {fs_name}: exception while updating service: {e}")
pass
def notify(self, notify_type, notify_id):
if notify_type != NotifyType.fs_map:
return
fs_map = self.get('fs_map')
if not fs_map:
return
# we don't know for which fs config has been changed
for fs in fs_map['filesystems']:
fs_name = fs['mdsmap']['fs_name']
self.verify_and_manage_mds_instance(fs_map, fs_name)
|