summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/mds_autoscaler/module.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/pybind/mgr/mds_autoscaler/module.py')
-rw-r--r--src/pybind/mgr/mds_autoscaler/module.py98
1 files changed, 98 insertions, 0 deletions
diff --git a/src/pybind/mgr/mds_autoscaler/module.py b/src/pybind/mgr/mds_autoscaler/module.py
new file mode 100644
index 000000000..a6bbadfd5
--- /dev/null
+++ b/src/pybind/mgr/mds_autoscaler/module.py
@@ -0,0 +1,98 @@
+"""
+Automatically scale MDSs based on status of the file-system using the FSMap
+"""
+
+import logging
+from typing import Optional, List, Set
+from mgr_module import MgrModule, NotifyType
+from orchestrator._interface import MDSSpec, ServiceSpec
+import orchestrator
+import copy
+
+log = logging.getLogger(__name__)
+
+
+class MDSAutoscaler(orchestrator.OrchestratorClientMixin, MgrModule):
+ """
+ MDS autoscaler.
+ """
+ NOTIFY_TYPES = [NotifyType.fs_map]
+ def __init__(self, *args, **kwargs):
+ MgrModule.__init__(self, *args, **kwargs)
+ self.set_mgr(self)
+
+ def get_service(self, fs_name: str) -> Optional[orchestrator.ServiceDescription]:
+ service = f"mds.{fs_name}"
+ completion = self.describe_service(service_type='mds',
+ service_name=service,
+ refresh=True)
+ orchestrator.raise_if_exception(completion)
+ if completion.result:
+ return completion.result[0]
+ return None
+
+ def update_daemon_count(self, spec: ServiceSpec, fs_name: str, abscount: int) -> MDSSpec:
+ ps = copy.deepcopy(spec.placement)
+ ps.count = abscount
+ newspec = MDSSpec(service_type=spec.service_type,
+ service_id=spec.service_id,
+ placement=ps)
+ return newspec
+
+ def get_required_standby_count(self, fs_map: dict, fs_name: str) -> int:
+ assert fs_map is not None
+ for fs in fs_map['filesystems']:
+ if fs['mdsmap']['fs_name'] == fs_name:
+ return fs['mdsmap']['standby_count_wanted']
+ assert False
+
+ def get_required_max_mds(self, fs_map: dict, fs_name: str) -> int:
+ assert fs_map is not None
+ for fs in fs_map['filesystems']:
+ if fs['mdsmap']['fs_name'] == fs_name:
+ return fs['mdsmap']['max_mds']
+ assert False
+
+ def verify_and_manage_mds_instance(self, fs_map: dict, fs_name: str):
+ assert fs_map is not None
+
+ try:
+ svc = self.get_service(fs_name)
+ if not svc:
+ self.log.info(f"fs {fs_name}: no service defined; skipping")
+ return
+ if not svc.spec.placement.count:
+ self.log.info(f"fs {fs_name}: service does not specify a count; skipping")
+ return
+
+ standbys_required = self.get_required_standby_count(fs_map, fs_name)
+ max_mds = self.get_required_max_mds(fs_map, fs_name)
+ want = max_mds + standbys_required
+
+ self.log.info(f"fs {fs_name}: "
+ f"max_mds={max_mds} "
+ f"standbys_required={standbys_required}, "
+ f"count={svc.spec.placement.count}")
+
+ if want == svc.spec.placement.count:
+ return
+
+ self.log.info(f"fs {fs_name}: adjusting daemon count from {svc.spec.placement.count} to {want}")
+ newspec = self.update_daemon_count(svc.spec, fs_name, want)
+ completion = self.apply_mds(newspec)
+ orchestrator.raise_if_exception(completion)
+ except orchestrator.OrchestratorError as e:
+ self.log.exception(f"fs {fs_name}: exception while updating service: {e}")
+ pass
+
+ def notify(self, notify_type, notify_id):
+ if notify_type != NotifyType.fs_map:
+ return
+ fs_map = self.get('fs_map')
+ if not fs_map:
+ return
+
+ # we don't know for which fs config has been changed
+ for fs in fs_map['filesystems']:
+ fs_name = fs['mdsmap']['fs_name']
+ self.verify_and_manage_mds_instance(fs_map, fs_name)