summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/balancer/module.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/pybind/mgr/balancer/module.py
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/pybind/mgr/balancer/module.py')
-rw-r--r--src/pybind/mgr/balancer/module.py1409
1 files changed, 1409 insertions, 0 deletions
diff --git a/src/pybind/mgr/balancer/module.py b/src/pybind/mgr/balancer/module.py
new file mode 100644
index 000000000..1c4042511
--- /dev/null
+++ b/src/pybind/mgr/balancer/module.py
@@ -0,0 +1,1409 @@
+"""
+Balance PG distribution across OSDs.
+"""
+
+import copy
+import enum
+import errno
+import json
+import math
+import random
+import time
+from mgr_module import CLIReadCommand, CLICommand, CommandResult, MgrModule, Option, OSDMap
+from threading import Event
+from typing import cast, Any, Dict, List, Optional, Sequence, Tuple, Union
+from mgr_module import CRUSHMap
+import datetime
+
+TIME_FORMAT = '%Y-%m-%d_%H:%M:%S'
+
+
+class MappingState:
+ def __init__(self, osdmap, raw_pg_stats, raw_pool_stats, desc=''):
+ self.desc = desc
+ self.osdmap = osdmap
+ self.osdmap_dump = self.osdmap.dump()
+ self.crush = osdmap.get_crush()
+ self.crush_dump = self.crush.dump()
+ self.raw_pg_stats = raw_pg_stats
+ self.raw_pool_stats = raw_pool_stats
+ self.pg_stat = {
+ i['pgid']: i['stat_sum'] for i in raw_pg_stats.get('pg_stats', [])
+ }
+ osd_poolids = [p['pool'] for p in self.osdmap_dump.get('pools', [])]
+ pg_poolids = [p['poolid'] for p in raw_pool_stats.get('pool_stats', [])]
+ self.poolids = set(osd_poolids) & set(pg_poolids)
+ self.pg_up = {}
+ self.pg_up_by_poolid = {}
+ for poolid in self.poolids:
+ self.pg_up_by_poolid[poolid] = osdmap.map_pool_pgs_up(poolid)
+ for a, b in self.pg_up_by_poolid[poolid].items():
+ self.pg_up[a] = b
+
+ def calc_misplaced_from(self, other_ms):
+ num = len(other_ms.pg_up)
+ misplaced = 0
+ for pgid, before in other_ms.pg_up.items():
+ if before != self.pg_up.get(pgid, []):
+ misplaced += 1
+ if num > 0:
+ return float(misplaced) / float(num)
+ return 0.0
+
+
+class Mode(enum.Enum):
+ none = 'none'
+ crush_compat = 'crush-compat'
+ upmap = 'upmap'
+
+
+class Plan(object):
+ def __init__(self, name, mode, osdmap, pools):
+ self.name = name
+ self.mode = mode
+ self.osdmap = osdmap
+ self.osdmap_dump = osdmap.dump()
+ self.pools = pools
+ self.osd_weights = {}
+ self.compat_ws = {}
+ self.inc = osdmap.new_incremental()
+ self.pg_status = {}
+
+ def dump(self) -> str:
+ return json.dumps(self.inc.dump(), indent=4, sort_keys=True)
+
+ def show(self) -> str:
+ return 'upmap plan'
+
+
+class MsPlan(Plan):
+ """
+ Plan with a preloaded MappingState member.
+ """
+
+ def __init__(self, name: str, mode: str, ms: MappingState, pools: List[str]) -> None:
+ super(MsPlan, self).__init__(name, mode, ms.osdmap, pools)
+ self.initial = ms
+
+ def final_state(self) -> MappingState:
+ self.inc.set_osd_reweights(self.osd_weights)
+ self.inc.set_crush_compat_weight_set_weights(self.compat_ws)
+ return MappingState(self.initial.osdmap.apply_incremental(self.inc),
+ self.initial.raw_pg_stats,
+ self.initial.raw_pool_stats,
+ 'plan %s final' % self.name)
+
+ def show(self) -> str:
+ ls = []
+ ls.append('# starting osdmap epoch %d' % self.initial.osdmap.get_epoch())
+ ls.append('# starting crush version %d' %
+ self.initial.osdmap.get_crush_version())
+ ls.append('# mode %s' % self.mode)
+ if len(self.compat_ws) and \
+ not CRUSHMap.have_default_choose_args(self.initial.crush_dump):
+ ls.append('ceph osd crush weight-set create-compat')
+ for osd, weight in self.compat_ws.items():
+ ls.append('ceph osd crush weight-set reweight-compat %s %f' %
+ (osd, weight))
+ for osd, weight in self.osd_weights.items():
+ ls.append('ceph osd reweight osd.%d %f' % (osd, weight))
+ incdump = self.inc.dump()
+ for pgid in incdump.get('old_pg_upmap_items', []):
+ ls.append('ceph osd rm-pg-upmap-items %s' % pgid)
+ for item in incdump.get('new_pg_upmap_items', []):
+ osdlist = []
+ for m in item['mappings']:
+ osdlist += [m['from'], m['to']]
+ ls.append('ceph osd pg-upmap-items %s %s' %
+ (item['pgid'], ' '.join([str(a) for a in osdlist])))
+ return '\n'.join(ls)
+
+
+class Eval:
+ def __init__(self, ms: MappingState):
+ self.ms = ms
+ self.root_ids: Dict[str, int] = {} # root name -> id
+ self.pool_name: Dict[str, str] = {} # pool id -> pool name
+ self.pool_id: Dict[str, int] = {} # pool name -> id
+ self.pool_roots: Dict[str, List[str]] = {} # pool name -> root name
+ self.root_pools: Dict[str, List[str]] = {} # root name -> pools
+ self.target_by_root: Dict[str, Dict[int, float]] = {} # root name -> target weight map
+ self.count_by_pool: Dict[str, dict] = {}
+ self.count_by_root: Dict[str, dict] = {}
+ self.actual_by_pool: Dict[str, dict] = {} # pool -> by_* -> actual weight map
+ self.actual_by_root: Dict[str, dict] = {} # pool -> by_* -> actual weight map
+ self.total_by_pool: Dict[str, dict] = {} # pool -> by_* -> total
+ self.total_by_root: Dict[str, dict] = {} # root -> by_* -> total
+ self.stats_by_pool: Dict[str, dict] = {} # pool -> by_* -> stddev or avg -> value
+ self.stats_by_root: Dict[str, dict] = {} # root -> by_* -> stddev or avg -> value
+
+ self.score_by_pool: Dict[str, float] = {}
+ self.score_by_root: Dict[str, Dict[str, float]] = {}
+
+ self.score = 0.0
+
+ def show(self, verbose: bool = False) -> str:
+ if verbose:
+ r = self.ms.desc + '\n'
+ r += 'target_by_root %s\n' % self.target_by_root
+ r += 'actual_by_pool %s\n' % self.actual_by_pool
+ r += 'actual_by_root %s\n' % self.actual_by_root
+ r += 'count_by_pool %s\n' % self.count_by_pool
+ r += 'count_by_root %s\n' % self.count_by_root
+ r += 'total_by_pool %s\n' % self.total_by_pool
+ r += 'total_by_root %s\n' % self.total_by_root
+ r += 'stats_by_root %s\n' % self.stats_by_root
+ r += 'score_by_pool %s\n' % self.score_by_pool
+ r += 'score_by_root %s\n' % self.score_by_root
+ else:
+ r = self.ms.desc + ' '
+ r += 'score %f (lower is better)\n' % self.score
+ return r
+
+ def calc_stats(self, count, target, total):
+ num = max(len(target), 1)
+ r: Dict[str, Dict[str, Union[int, float]]] = {}
+ for t in ('pgs', 'objects', 'bytes'):
+ if total[t] == 0:
+ r[t] = {
+ 'max': 0,
+ 'min': 0,
+ 'avg': 0,
+ 'stddev': 0,
+ 'sum_weight': 0,
+ 'score': 0,
+ }
+ continue
+
+ avg = float(total[t]) / float(num)
+ dev = 0.0
+
+ # score is a measure of how uneven the data distribution is.
+ # score lies between [0, 1), 0 means perfect distribution.
+ score = 0.0
+ sum_weight = 0.0
+
+ for k, v in count[t].items():
+ # adjust/normalize by weight
+ if target[k]:
+ adjusted = float(v) / target[k] / float(num)
+ else:
+ adjusted = 0.0
+
+ # Overweighted devices and their weights are factors to calculate reweight_urgency.
+ # One 10% underfilled device with 5 2% overfilled devices, is arguably a better
+ # situation than one 10% overfilled with 5 2% underfilled devices
+ if adjusted > avg:
+ '''
+ F(x) = 2*phi(x) - 1, where phi(x) = cdf of standard normal distribution
+ x = (adjusted - avg)/avg.
+ Since, we're considering only over-weighted devices, x >= 0, and so phi(x) lies in [0.5, 1).
+ To bring range of F(x) in range [0, 1), we need to make the above modification.
+
+ In general, we need to use a function F(x), where x = (adjusted - avg)/avg
+ 1. which is bounded between 0 and 1, so that ultimately reweight_urgency will also be bounded.
+ 2. A larger value of x, should imply more urgency to reweight.
+ 3. Also, the difference between F(x) when x is large, should be minimal.
+ 4. The value of F(x) should get close to 1 (highest urgency to reweight) with steeply.
+
+ Could have used F(x) = (1 - e^(-x)). But that had slower convergence to 1, compared to the one currently in use.
+
+ cdf of standard normal distribution: https://stackoverflow.com/a/29273201
+ '''
+ score += target[k] * (math.erf(((adjusted - avg) / avg) / math.sqrt(2.0)))
+ sum_weight += target[k]
+ dev += (avg - adjusted) * (avg - adjusted)
+ stddev = math.sqrt(dev / float(max(num - 1, 1)))
+ score = score / max(sum_weight, 1)
+ r[t] = {
+ 'max': max(count[t].values()),
+ 'min': min(count[t].values()),
+ 'avg': avg,
+ 'stddev': stddev,
+ 'sum_weight': sum_weight,
+ 'score': score,
+ }
+ return r
+
+
+class Module(MgrModule):
+ MODULE_OPTIONS = [
+ Option(name='active',
+ type='bool',
+ default=True,
+ desc='automatically balance PGs across cluster',
+ runtime=True),
+ Option(name='begin_time',
+ type='str',
+ default='0000',
+ desc='beginning time of day to automatically balance',
+ long_desc='This is a time of day in the format HHMM.',
+ runtime=True),
+ Option(name='end_time',
+ type='str',
+ default='2359',
+ desc='ending time of day to automatically balance',
+ long_desc='This is a time of day in the format HHMM.',
+ runtime=True),
+ Option(name='begin_weekday',
+ type='uint',
+ default=0,
+ min=0,
+ max=6,
+ desc='Restrict automatic balancing to this day of the week or later',
+ long_desc='0 = Sunday, 1 = Monday, etc.',
+ runtime=True),
+ Option(name='end_weekday',
+ type='uint',
+ default=0,
+ min=0,
+ max=6,
+ desc='Restrict automatic balancing to days of the week earlier than this',
+ long_desc='0 = Sunday, 1 = Monday, etc.',
+ runtime=True),
+ Option(name='crush_compat_max_iterations',
+ type='uint',
+ default=25,
+ min=1,
+ max=250,
+ desc='maximum number of iterations to attempt optimization',
+ runtime=True),
+ Option(name='crush_compat_metrics',
+ type='str',
+ default='pgs,objects,bytes',
+ desc='metrics with which to calculate OSD utilization',
+ long_desc='Value is a list of one or more of "pgs", "objects", or "bytes", and indicates which metrics to use to balance utilization.',
+ runtime=True),
+ Option(name='crush_compat_step',
+ type='float',
+ default=.5,
+ min=.001,
+ max=.999,
+ desc='aggressiveness of optimization',
+ long_desc='.99 is very aggressive, .01 is less aggressive',
+ runtime=True),
+ Option(name='min_score',
+ type='float',
+ default=0,
+ desc='minimum score, below which no optimization is attempted',
+ runtime=True),
+ Option(name='mode',
+ desc='Balancer mode',
+ default='upmap',
+ enum_allowed=['none', 'crush-compat', 'upmap'],
+ runtime=True),
+ Option(name='sleep_interval',
+ type='secs',
+ default=60,
+ desc='how frequently to wake up and attempt optimization',
+ runtime=True),
+ Option(name='upmap_max_optimizations',
+ type='uint',
+ default=10,
+ desc='maximum upmap optimizations to make per attempt',
+ runtime=True),
+ Option(name='upmap_max_deviation',
+ type='int',
+ default=5,
+ min=1,
+ desc='deviation below which no optimization is attempted',
+ long_desc='If the number of PGs are within this count then no optimization is attempted',
+ runtime=True),
+ Option(name='pool_ids',
+ type='str',
+ default='',
+ desc='pools which the automatic balancing will be limited to',
+ runtime=True)
+ ]
+
+ active = False
+ run = True
+ plans: Dict[str, Plan] = {}
+ mode = ''
+ optimizing = False
+ last_optimize_started = ''
+ last_optimize_duration = ''
+ optimize_result = ''
+ no_optimization_needed = False
+ success_string = 'Optimization plan created successfully'
+ in_progress_string = 'in progress'
+
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
+ super(Module, self).__init__(*args, **kwargs)
+ self.event = Event()
+
+ @CLIReadCommand('balancer status')
+ def show_status(self) -> Tuple[int, str, str]:
+ """
+ Show balancer status
+ """
+ s = {
+ 'plans': list(self.plans.keys()),
+ 'active': self.active,
+ 'last_optimize_started': self.last_optimize_started,
+ 'last_optimize_duration': self.last_optimize_duration,
+ 'optimize_result': self.optimize_result,
+ 'no_optimization_needed': self.no_optimization_needed,
+ 'mode': self.get_module_option('mode'),
+ }
+ return (0, json.dumps(s, indent=4, sort_keys=True), '')
+
+ @CLICommand('balancer mode')
+ def set_mode(self, mode: Mode) -> Tuple[int, str, str]:
+ """
+ Set balancer mode
+ """
+ if mode == Mode.upmap:
+ min_compat_client = self.get_osdmap().dump().get('require_min_compat_client', '')
+ if min_compat_client < 'luminous': # works well because version is alphabetized..
+ warn = ('min_compat_client "%s" '
+ '< "luminous", which is required for pg-upmap. '
+ 'Try "ceph osd set-require-min-compat-client luminous" '
+ 'before enabling this mode' % min_compat_client)
+ return (-errno.EPERM, '', warn)
+ elif mode == Mode.crush_compat:
+ ms = MappingState(self.get_osdmap(),
+ self.get("pg_stats"),
+ self.get("pool_stats"),
+ 'initialize compat weight-set')
+ self.get_compat_weight_set_weights(ms) # ignore error
+ self.set_module_option('mode', mode.value)
+ return (0, '', '')
+
+ @CLICommand('balancer on')
+ def on(self) -> Tuple[int, str, str]:
+ """
+ Enable automatic balancing
+ """
+ if not self.active:
+ self.set_module_option('active', 'true')
+ self.active = True
+ self.event.set()
+ return (0, '', '')
+
+ @CLICommand('balancer off')
+ def off(self) -> Tuple[int, str, str]:
+ """
+ Disable automatic balancing
+ """
+ if self.active:
+ self.set_module_option('active', 'false')
+ self.active = False
+ self.event.set()
+ return (0, '', '')
+
+ @CLIReadCommand('balancer pool ls')
+ def pool_ls(self) -> Tuple[int, str, str]:
+ """
+ List automatic balancing pools
+
+ Note that empty list means all existing pools will be automatic balancing targets,
+ which is the default behaviour of balancer.
+ """
+ pool_ids = cast(str, self.get_module_option('pool_ids'))
+ if pool_ids == '':
+ return (0, '', '')
+ pool_ids = [int(p) for p in pool_ids.split(',')]
+ pool_name_by_id = dict((p['pool'], p['pool_name'])
+ for p in self.get_osdmap().dump().get('pools', []))
+ should_prune = False
+ final_ids: List[int] = []
+ final_names = []
+ for p in pool_ids:
+ if p in pool_name_by_id:
+ final_ids.append(p)
+ final_names.append(pool_name_by_id[p])
+ else:
+ should_prune = True
+ if should_prune: # some pools were gone, prune
+ self.set_module_option('pool_ids', ','.join(str(p) for p in final_ids))
+ return (0, json.dumps(sorted(final_names), indent=4, sort_keys=True), '')
+
+ @CLICommand('balancer pool add')
+ def pool_add(self, pools: Sequence[str]) -> Tuple[int, str, str]:
+ """
+ Enable automatic balancing for specific pools
+ """
+ raw_names = pools
+ pool_id_by_name = dict((p['pool_name'], p['pool'])
+ for p in self.get_osdmap().dump().get('pools', []))
+ invalid_names = [p for p in raw_names if p not in pool_id_by_name]
+ if invalid_names:
+ return (-errno.EINVAL, '', 'pool(s) %s not found' % invalid_names)
+ to_add = set(str(pool_id_by_name[p]) for p in raw_names if p in pool_id_by_name)
+ pool_ids = cast(str, self.get_module_option('pool_ids'))
+ existing = set(pool_ids.split(',') if pool_ids else [])
+ final = to_add | existing
+ self.set_module_option('pool_ids', ','.join(final))
+ return (0, '', '')
+
+ @CLICommand('balancer pool rm')
+ def pool_rm(self, pools: Sequence[str]) -> Tuple[int, str, str]:
+ """
+ Disable automatic balancing for specific pools
+ """
+ raw_names = pools
+ existing = cast(str, self.get_module_option('pool_ids'))
+ if existing == '': # for idempotence
+ return (0, '', '')
+ existing = existing.split(',')
+ osdmap = self.get_osdmap()
+ pool_ids = [str(p['pool']) for p in osdmap.dump().get('pools', [])]
+ pool_id_by_name = dict((p['pool_name'], p['pool']) for p in osdmap.dump().get('pools', []))
+ final = [p for p in existing if p in pool_ids]
+ to_delete = [str(pool_id_by_name[p]) for p in raw_names if p in pool_id_by_name]
+ final = set(final) - set(to_delete)
+ self.set_module_option('pool_ids', ','.join(final))
+ return (0, '', '')
+
+ def _state_from_option(self, option: Optional[str] = None) -> Tuple[MappingState, List[str]]:
+ pools = []
+ if option is None:
+ ms = MappingState(self.get_osdmap(),
+ self.get("pg_stats"),
+ self.get("pool_stats"),
+ 'current cluster')
+ elif option in self.plans:
+ plan = self.plans.get(option)
+ assert plan
+ pools = plan.pools
+ if plan.mode == 'upmap':
+ # Note that for upmap, to improve the efficiency,
+ # we use a basic version of Plan without keeping the obvious
+ # *redundant* MS member.
+ # Hence ms might not be accurate here since we are basically
+ # using an old snapshotted osdmap vs a fresh copy of pg_stats.
+ # It should not be a big deal though..
+ ms = MappingState(plan.osdmap,
+ self.get("pg_stats"),
+ self.get("pool_stats"),
+ f'plan "{plan.name}"')
+ else:
+ ms = cast(MsPlan, plan).final_state()
+ else:
+ # not a plan, does it look like a pool?
+ osdmap = self.get_osdmap()
+ valid_pool_names = [p['pool_name'] for p in osdmap.dump().get('pools', [])]
+ if option not in valid_pool_names:
+ raise ValueError(f'option "{option}" not a plan or a pool')
+ pools.append(option)
+ ms = MappingState(osdmap,
+ self.get("pg_stats"),
+ self.get("pool_stats"),
+ f'pool "{option}"')
+ return ms, pools
+
+ @CLIReadCommand('balancer eval-verbose')
+ def plan_eval_verbose(self, option: Optional[str] = None):
+ """
+ Evaluate data distribution for the current cluster or specific pool or specific
+ plan (verbosely)
+ """
+ try:
+ ms, pools = self._state_from_option(option)
+ return (0, self.evaluate(ms, pools, verbose=True), '')
+ except ValueError as e:
+ return (-errno.EINVAL, '', str(e))
+
+ @CLIReadCommand('balancer eval')
+ def plan_eval_brief(self, option: Optional[str] = None):
+ """
+ Evaluate data distribution for the current cluster or specific pool or specific plan
+ """
+ try:
+ ms, pools = self._state_from_option(option)
+ return (0, self.evaluate(ms, pools, verbose=False), '')
+ except ValueError as e:
+ return (-errno.EINVAL, '', str(e))
+
+ @CLIReadCommand('balancer optimize')
+ def plan_optimize(self, plan: str, pools: List[str] = []) -> Tuple[int, str, str]:
+ """
+ Run optimizer to create a new plan
+ """
+ # The GIL can be release by the active balancer, so disallow when active
+ if self.active:
+ return (-errno.EINVAL, '', 'Balancer enabled, disable to optimize manually')
+ if self.optimizing:
+ return (-errno.EINVAL, '', 'Balancer finishing up....try again')
+ osdmap = self.get_osdmap()
+ valid_pool_names = [p['pool_name'] for p in osdmap.dump().get('pools', [])]
+ invalid_pool_names = []
+ for p in pools:
+ if p not in valid_pool_names:
+ invalid_pool_names.append(p)
+ if len(invalid_pool_names):
+ return (-errno.EINVAL, '', 'pools %s not found' % invalid_pool_names)
+ plan_ = self.plan_create(plan, osdmap, pools)
+ self.last_optimize_started = time.asctime(time.localtime())
+ self.optimize_result = self.in_progress_string
+ start = time.time()
+ r, detail = self.optimize(plan_)
+ end = time.time()
+ self.last_optimize_duration = str(datetime.timedelta(seconds=(end - start)))
+ if r == 0:
+ # Add plan if an optimization was created
+ self.optimize_result = self.success_string
+ self.plans[plan] = plan_
+ else:
+ self.optimize_result = detail
+ return (r, '', detail)
+
+ @CLIReadCommand('balancer show')
+ def plan_show(self, plan: str) -> Tuple[int, str, str]:
+ """
+ Show details of an optimization plan
+ """
+ plan_ = self.plans.get(plan)
+ if not plan_:
+ return (-errno.ENOENT, '', f'plan {plan} not found')
+ return (0, plan_.show(), '')
+
+ @CLICommand('balancer rm')
+ def plan_rm(self, plan: str) -> Tuple[int, str, str]:
+ """
+ Discard an optimization plan
+ """
+ if plan in self.plans:
+ del self.plans[plan]
+ return (0, '', '')
+
+ @CLICommand('balancer reset')
+ def plan_reset(self) -> Tuple[int, str, str]:
+ """
+ Discard all optimization plans
+ """
+ self.plans = {}
+ return (0, '', '')
+
+ @CLIReadCommand('balancer dump')
+ def plan_dump(self, plan: str) -> Tuple[int, str, str]:
+ """
+ Show an optimization plan
+ """
+ plan_ = self.plans.get(plan)
+ if not plan_:
+ return -errno.ENOENT, '', f'plan {plan} not found'
+ else:
+ return (0, plan_.dump(), '')
+
+ @CLIReadCommand('balancer ls')
+ def plan_ls(self) -> Tuple[int, str, str]:
+ """
+ List all plans
+ """
+ return (0, json.dumps([p for p in self.plans], indent=4, sort_keys=True), '')
+
+ @CLIReadCommand('balancer execute')
+ def plan_execute(self, plan: str) -> Tuple[int, str, str]:
+ """
+ Execute an optimization plan
+ """
+ # The GIL can be release by the active balancer, so disallow when active
+ if self.active:
+ return (-errno.EINVAL, '', 'Balancer enabled, disable to execute a plan')
+ if self.optimizing:
+ return (-errno.EINVAL, '', 'Balancer finishing up....try again')
+ plan_ = self.plans.get(plan)
+ if not plan_:
+ return (-errno.ENOENT, '', f'plan {plan} not found')
+ r, detail = self.execute(plan_)
+ self.plan_rm(plan)
+ return (r, '', detail)
+
+ def shutdown(self) -> None:
+ self.log.info('Stopping')
+ self.run = False
+ self.event.set()
+
+ def time_permit(self) -> bool:
+ local_time = time.localtime()
+ time_of_day = time.strftime('%H%M', local_time)
+ weekday = (local_time.tm_wday + 1) % 7 # be compatible with C
+ permit = False
+
+ def check_time(time: str, option: str):
+ if len(time) != 4:
+ self.log.error('invalid time for %s - expected HHMM format', option)
+ try:
+ datetime.time(int(time[:2]), int(time[2:]))
+ except ValueError as err:
+ self.log.error('invalid time for %s - %s', option, err)
+
+ begin_time = cast(str, self.get_module_option('begin_time'))
+ check_time(begin_time, 'begin_time')
+ end_time = cast(str, self.get_module_option('end_time'))
+ check_time(end_time, 'end_time')
+ if begin_time < end_time:
+ permit = begin_time <= time_of_day < end_time
+ elif begin_time == end_time:
+ permit = True
+ else:
+ permit = time_of_day >= begin_time or time_of_day < end_time
+ if not permit:
+ self.log.debug("should run between %s - %s, now %s, skipping",
+ begin_time, end_time, time_of_day)
+ return False
+
+ begin_weekday = cast(int, self.get_module_option('begin_weekday'))
+ end_weekday = cast(int, self.get_module_option('end_weekday'))
+ if begin_weekday < end_weekday:
+ permit = begin_weekday <= weekday <= end_weekday
+ elif begin_weekday == end_weekday:
+ permit = True
+ else:
+ permit = weekday >= begin_weekday or weekday < end_weekday
+ if not permit:
+ self.log.debug("should run between weekday %d - %d, now %d, skipping",
+ begin_weekday, end_weekday, weekday)
+ return False
+
+ return True
+
+ def serve(self) -> None:
+ self.log.info('Starting')
+ while self.run:
+ self.active = cast(bool, self.get_module_option('active'))
+ sleep_interval = cast(float, self.get_module_option('sleep_interval'))
+ self.log.debug('Waking up [%s, now %s]',
+ "active" if self.active else "inactive",
+ time.strftime(TIME_FORMAT, time.localtime()))
+ if self.active and self.time_permit():
+ self.log.debug('Running')
+ name = 'auto_%s' % time.strftime(TIME_FORMAT, time.gmtime())
+ osdmap = self.get_osdmap()
+ pool_ids = cast(str, self.get_module_option('pool_ids'))
+ if pool_ids:
+ allow = [int(p) for p in pool_ids.split(',')]
+ else:
+ allow = []
+ final: List[str] = []
+ if allow:
+ pools = osdmap.dump().get('pools', [])
+ valid = [p['pool'] for p in pools]
+ ids = set(allow) & set(valid)
+ if set(allow) - set(valid): # some pools were gone, prune
+ self.set_module_option('pool_ids', ','.join(str(p) for p in ids))
+ pool_name_by_id = dict((p['pool'], p['pool_name']) for p in pools)
+ final = [pool_name_by_id[p] for p in ids if p in pool_name_by_id]
+ plan = self.plan_create(name, osdmap, final)
+ self.optimizing = True
+ self.last_optimize_started = time.asctime(time.localtime())
+ self.optimize_result = self.in_progress_string
+ start = time.time()
+ r, detail = self.optimize(plan)
+ end = time.time()
+ self.last_optimize_duration = str(datetime.timedelta(seconds=(end - start)))
+ if r == 0:
+ self.optimize_result = self.success_string
+ self.execute(plan)
+ else:
+ self.optimize_result = detail
+ self.optimizing = False
+ self.log.debug('Sleeping for %d', sleep_interval)
+ self.event.wait(sleep_interval)
+ self.event.clear()
+
+ def plan_create(self, name: str, osdmap: OSDMap, pools: List[str]) -> Plan:
+ mode = cast(str, self.get_module_option('mode'))
+ if mode == 'upmap':
+ # drop unnecessary MS member for upmap mode.
+ # this way we could effectively eliminate the usage of a
+ # complete pg_stats, which can become horribly inefficient
+ # as pg_num grows..
+ plan = Plan(name, mode, osdmap, pools)
+ else:
+ plan = MsPlan(name,
+ mode,
+ MappingState(osdmap,
+ self.get("pg_stats"),
+ self.get("pool_stats"),
+ 'plan %s initial' % name),
+ pools)
+ return plan
+
+ def calc_eval(self, ms: MappingState, pools: List[str]) -> Eval:
+ pe = Eval(ms)
+ pool_rule = {}
+ pool_info = {}
+ for p in ms.osdmap_dump.get('pools', []):
+ if len(pools) and p['pool_name'] not in pools:
+ continue
+ # skip dead or not-yet-ready pools too
+ if p['pool'] not in ms.poolids:
+ continue
+ pe.pool_name[p['pool']] = p['pool_name']
+ pe.pool_id[p['pool_name']] = p['pool']
+ pool_rule[p['pool_name']] = p['crush_rule']
+ pe.pool_roots[p['pool_name']] = []
+ pool_info[p['pool_name']] = p
+ if len(pool_info) == 0:
+ return pe
+ self.log.debug('pool_name %s' % pe.pool_name)
+ self.log.debug('pool_id %s' % pe.pool_id)
+ self.log.debug('pools %s' % pools)
+ self.log.debug('pool_rule %s' % pool_rule)
+
+ osd_weight = {a['osd']: a['weight']
+ for a in ms.osdmap_dump.get('osds', []) if a['weight'] > 0}
+
+ # get expected distributions by root
+ actual_by_root: Dict[str, Dict[str, dict]] = {}
+ rootids = ms.crush.find_takes()
+ roots = []
+ for rootid in rootids:
+ ls = ms.osdmap.get_pools_by_take(rootid)
+ want = []
+ # find out roots associating with pools we are passed in
+ for candidate in ls:
+ if candidate in pe.pool_name:
+ want.append(candidate)
+ if len(want) == 0:
+ continue
+ root = ms.crush.get_item_name(rootid)
+ pe.root_pools[root] = []
+ for poolid in want:
+ pe.pool_roots[pe.pool_name[poolid]].append(root)
+ pe.root_pools[root].append(pe.pool_name[poolid])
+ pe.root_ids[root] = rootid
+ roots.append(root)
+ weight_map = ms.crush.get_take_weight_osd_map(rootid)
+ adjusted_map = {
+ osd: cw * osd_weight[osd]
+ for osd, cw in weight_map.items() if osd in osd_weight and cw > 0
+ }
+ sum_w = sum(adjusted_map.values())
+ assert len(adjusted_map) == 0 or sum_w > 0
+ pe.target_by_root[root] = {osd: w / sum_w
+ for osd, w in adjusted_map.items()}
+ actual_by_root[root] = {
+ 'pgs': {},
+ 'objects': {},
+ 'bytes': {},
+ }
+ for osd in pe.target_by_root[root]:
+ actual_by_root[root]['pgs'][osd] = 0
+ actual_by_root[root]['objects'][osd] = 0
+ actual_by_root[root]['bytes'][osd] = 0
+ pe.total_by_root[root] = {
+ 'pgs': 0,
+ 'objects': 0,
+ 'bytes': 0,
+ }
+ self.log.debug('pool_roots %s' % pe.pool_roots)
+ self.log.debug('root_pools %s' % pe.root_pools)
+ self.log.debug('target_by_root %s' % pe.target_by_root)
+
+ # pool and root actual
+ for pool, pi in pool_info.items():
+ poolid = pi['pool']
+ pm = ms.pg_up_by_poolid[poolid]
+ pgs = 0
+ objects = 0
+ bytes = 0
+ pgs_by_osd = {}
+ objects_by_osd = {}
+ bytes_by_osd = {}
+ for pgid, up in pm.items():
+ for osd in [int(osd) for osd in up]:
+ if osd == CRUSHMap.ITEM_NONE:
+ continue
+ if osd not in pgs_by_osd:
+ pgs_by_osd[osd] = 0
+ objects_by_osd[osd] = 0
+ bytes_by_osd[osd] = 0
+ pgs_by_osd[osd] += 1
+ objects_by_osd[osd] += ms.pg_stat[pgid]['num_objects']
+ bytes_by_osd[osd] += ms.pg_stat[pgid]['num_bytes']
+ # pick a root to associate this pg instance with.
+ # note that this is imprecise if the roots have
+ # overlapping children.
+ # FIXME: divide bytes by k for EC pools.
+ for root in pe.pool_roots[pool]:
+ if osd in pe.target_by_root[root]:
+ actual_by_root[root]['pgs'][osd] += 1
+ actual_by_root[root]['objects'][osd] += ms.pg_stat[pgid]['num_objects']
+ actual_by_root[root]['bytes'][osd] += ms.pg_stat[pgid]['num_bytes']
+ pgs += 1
+ objects += ms.pg_stat[pgid]['num_objects']
+ bytes += ms.pg_stat[pgid]['num_bytes']
+ pe.total_by_root[root]['pgs'] += 1
+ pe.total_by_root[root]['objects'] += ms.pg_stat[pgid]['num_objects']
+ pe.total_by_root[root]['bytes'] += ms.pg_stat[pgid]['num_bytes']
+ break
+ pe.count_by_pool[pool] = {
+ 'pgs': {
+ k: v
+ for k, v in pgs_by_osd.items()
+ },
+ 'objects': {
+ k: v
+ for k, v in objects_by_osd.items()
+ },
+ 'bytes': {
+ k: v
+ for k, v in bytes_by_osd.items()
+ },
+ }
+ pe.actual_by_pool[pool] = {
+ 'pgs': {
+ k: float(v) / float(max(pgs, 1))
+ for k, v in pgs_by_osd.items()
+ },
+ 'objects': {
+ k: float(v) / float(max(objects, 1))
+ for k, v in objects_by_osd.items()
+ },
+ 'bytes': {
+ k: float(v) / float(max(bytes, 1))
+ for k, v in bytes_by_osd.items()
+ },
+ }
+ pe.total_by_pool[pool] = {
+ 'pgs': pgs,
+ 'objects': objects,
+ 'bytes': bytes,
+ }
+ for root in pe.total_by_root:
+ pe.count_by_root[root] = {
+ 'pgs': {
+ k: float(v)
+ for k, v in actual_by_root[root]['pgs'].items()
+ },
+ 'objects': {
+ k: float(v)
+ for k, v in actual_by_root[root]['objects'].items()
+ },
+ 'bytes': {
+ k: float(v)
+ for k, v in actual_by_root[root]['bytes'].items()
+ },
+ }
+ pe.actual_by_root[root] = {
+ 'pgs': {
+ k: float(v) / float(max(pe.total_by_root[root]['pgs'], 1))
+ for k, v in actual_by_root[root]['pgs'].items()
+ },
+ 'objects': {
+ k: float(v) / float(max(pe.total_by_root[root]['objects'], 1))
+ for k, v in actual_by_root[root]['objects'].items()
+ },
+ 'bytes': {
+ k: float(v) / float(max(pe.total_by_root[root]['bytes'], 1))
+ for k, v in actual_by_root[root]['bytes'].items()
+ },
+ }
+ self.log.debug('actual_by_pool %s' % pe.actual_by_pool)
+ self.log.debug('actual_by_root %s' % pe.actual_by_root)
+
+ # average and stddev and score
+ pe.stats_by_root = {
+ a: pe.calc_stats(
+ b,
+ pe.target_by_root[a],
+ pe.total_by_root[a]
+ ) for a, b in pe.count_by_root.items()
+ }
+ self.log.debug('stats_by_root %s' % pe.stats_by_root)
+
+ # the scores are already normalized
+ pe.score_by_root = {
+ r: {
+ 'pgs': pe.stats_by_root[r]['pgs']['score'],
+ 'objects': pe.stats_by_root[r]['objects']['score'],
+ 'bytes': pe.stats_by_root[r]['bytes']['score'],
+ } for r in pe.total_by_root.keys()
+ }
+ self.log.debug('score_by_root %s' % pe.score_by_root)
+
+ # get the list of score metrics, comma separated
+ metrics = cast(str, self.get_module_option('crush_compat_metrics')).split(',')
+
+ # total score is just average of normalized stddevs
+ pe.score = 0.0
+ for r, vs in pe.score_by_root.items():
+ for k, v in vs.items():
+ if k in metrics:
+ pe.score += v
+ pe.score /= len(metrics) * len(roots)
+ return pe
+
+ def evaluate(self, ms: MappingState, pools: List[str], verbose: bool = False) -> str:
+ pe = self.calc_eval(ms, pools)
+ return pe.show(verbose=verbose)
+
+ def optimize(self, plan: Plan) -> Tuple[int, str]:
+ self.log.info('Optimize plan %s' % plan.name)
+ max_misplaced = cast(float, self.get_ceph_option('target_max_misplaced_ratio'))
+ self.log.info('Mode %s, max misplaced %f' %
+ (plan.mode, max_misplaced))
+
+ info = self.get('pg_status')
+ unknown = info.get('unknown_pgs_ratio', 0.0)
+ degraded = info.get('degraded_ratio', 0.0)
+ inactive = info.get('inactive_pgs_ratio', 0.0)
+ misplaced = info.get('misplaced_ratio', 0.0)
+ plan.pg_status = info
+ self.log.debug('unknown %f degraded %f inactive %f misplaced %g',
+ unknown, degraded, inactive, misplaced)
+ if unknown > 0.0:
+ detail = 'Some PGs (%f) are unknown; try again later' % unknown
+ self.log.info(detail)
+ return -errno.EAGAIN, detail
+ elif degraded > 0.0:
+ detail = 'Some objects (%f) are degraded; try again later' % degraded
+ self.log.info(detail)
+ return -errno.EAGAIN, detail
+ elif inactive > 0.0:
+ detail = 'Some PGs (%f) are inactive; try again later' % inactive
+ self.log.info(detail)
+ return -errno.EAGAIN, detail
+ elif misplaced >= max_misplaced:
+ detail = 'Too many objects (%f > %f) are misplaced; ' \
+ 'try again later' % (misplaced, max_misplaced)
+ self.log.info(detail)
+ return -errno.EAGAIN, detail
+ else:
+ if plan.mode == 'upmap':
+ return self.do_upmap(plan)
+ elif plan.mode == 'crush-compat':
+ return self.do_crush_compat(cast(MsPlan, plan))
+ elif plan.mode == 'none':
+ detail = 'Please do "ceph balancer mode" to choose a valid mode first'
+ self.log.info('Idle')
+ return -errno.ENOEXEC, detail
+ else:
+ detail = 'Unrecognized mode %s' % plan.mode
+ self.log.info(detail)
+ return -errno.EINVAL, detail
+
+ def do_upmap(self, plan: Plan) -> Tuple[int, str]:
+ self.log.info('do_upmap')
+ max_optimizations = cast(float, self.get_module_option('upmap_max_optimizations'))
+ max_deviation = cast(int, self.get_module_option('upmap_max_deviation'))
+ osdmap_dump = plan.osdmap_dump
+
+ if len(plan.pools):
+ pools = plan.pools
+ else: # all
+ pools = [str(i['pool_name']) for i in osdmap_dump.get('pools', [])]
+ if len(pools) == 0:
+ detail = 'No pools available'
+ self.log.info(detail)
+ return -errno.ENOENT, detail
+ # shuffle pool list so they all get equal (in)attention
+ random.shuffle(pools)
+ self.log.info('pools %s' % pools)
+
+ adjusted_pools = []
+ inc = plan.inc
+ total_did = 0
+ left = max_optimizations
+ pools_with_pg_merge = [p['pool_name'] for p in osdmap_dump.get('pools', [])
+ if p['pg_num'] > p['pg_num_target']]
+ crush_rule_by_pool_name = dict((p['pool_name'], p['crush_rule'])
+ for p in osdmap_dump.get('pools', []))
+ for pool in pools:
+ if pool not in crush_rule_by_pool_name:
+ self.log.info('pool %s does not exist' % pool)
+ continue
+ if pool in pools_with_pg_merge:
+ self.log.info('pool %s has pending PG(s) for merging, skipping for now' % pool)
+ continue
+ adjusted_pools.append(pool)
+ # shuffle so all pools get equal (in)attention
+ random.shuffle(adjusted_pools)
+ pool_dump = osdmap_dump.get('pools', [])
+ for pool in adjusted_pools:
+ for p in pool_dump:
+ if p['pool_name'] == pool:
+ pool_id = p['pool']
+ break
+
+ # note that here we deliberately exclude any scrubbing pgs too
+ # since scrubbing activities have significant impacts on performance
+ num_pg_active_clean = 0
+ for p in plan.pg_status.get('pgs_by_pool_state', []):
+ pgs_pool_id = p['pool_id']
+ if pgs_pool_id != pool_id:
+ continue
+ for s in p['pg_state_counts']:
+ if s['state_name'] == 'active+clean':
+ num_pg_active_clean += s['count']
+ break
+ available = min(left, num_pg_active_clean)
+ did = plan.osdmap.calc_pg_upmaps(inc, max_deviation, available, [pool])
+ total_did += did
+ left -= did
+ if left <= 0:
+ break
+ self.log.info('prepared %d/%d changes' % (total_did, max_optimizations))
+ if total_did == 0:
+ self.no_optimization_needed = True
+ return -errno.EALREADY, 'Unable to find further optimization, ' \
+ 'or pool(s) pg_num is decreasing, ' \
+ 'or distribution is already perfect'
+ return 0, ''
+
+ def do_crush_compat(self, plan: MsPlan) -> Tuple[int, str]:
+ self.log.info('do_crush_compat')
+ max_iterations = cast(int, self.get_module_option('crush_compat_max_iterations'))
+ if max_iterations < 1:
+ return -errno.EINVAL, '"crush_compat_max_iterations" must be >= 1'
+ step = cast(float, self.get_module_option('crush_compat_step'))
+ if step <= 0 or step >= 1.0:
+ return -errno.EINVAL, '"crush_compat_step" must be in (0, 1)'
+ max_misplaced = cast(float, self.get_ceph_option('target_max_misplaced_ratio'))
+ min_pg_per_osd = 2
+
+ ms = plan.initial
+ osdmap = ms.osdmap
+ crush = osdmap.get_crush()
+ pe = self.calc_eval(ms, plan.pools)
+ min_score_to_optimize = cast(float, self.get_module_option('min_score'))
+ if pe.score <= min_score_to_optimize:
+ if pe.score == 0:
+ detail = 'Distribution is already perfect'
+ else:
+ detail = 'score %f <= min_score %f, will not optimize' \
+ % (pe.score, min_score_to_optimize)
+ self.log.info(detail)
+ return -errno.EALREADY, detail
+
+ # get current osd reweights
+ orig_osd_weight = {a['osd']: a['weight']
+ for a in ms.osdmap_dump.get('osds', [])}
+
+ # get current compat weight-set weights
+ orig_ws = self.get_compat_weight_set_weights(ms)
+ if not orig_ws:
+ return -errno.EAGAIN, 'compat weight-set not available'
+ orig_ws = {a: b for a, b in orig_ws.items() if a >= 0}
+
+ # Make sure roots don't overlap their devices. If so, we
+ # can't proceed.
+ roots = list(pe.target_by_root.keys())
+ self.log.debug('roots %s', roots)
+ visited: Dict[int, str] = {}
+ overlap: Dict[int, List[str]] = {}
+ for root, wm in pe.target_by_root.items():
+ for osd in wm:
+ if osd in visited:
+ if osd not in overlap:
+ overlap[osd] = [visited[osd]]
+ overlap[osd].append(root)
+ visited[osd] = root
+ if len(overlap) > 0:
+ detail = 'Some osds belong to multiple subtrees: %s' % \
+ overlap
+ self.log.error(detail)
+ return -errno.EOPNOTSUPP, detail
+
+ # rebalance by pgs, objects, or bytes
+ metrics = cast(str, self.get_module_option('crush_compat_metrics')).split(',')
+ key = metrics[0] # balancing using the first score metric
+ if key not in ['pgs', 'bytes', 'objects']:
+ self.log.warning("Invalid crush_compat balancing key %s. Using 'pgs'." % key)
+ key = 'pgs'
+
+ # go
+ best_ws = copy.deepcopy(orig_ws)
+ best_ow = copy.deepcopy(orig_osd_weight)
+ best_pe = pe
+ left = max_iterations
+ bad_steps = 0
+ next_ws = copy.deepcopy(best_ws)
+ next_ow = copy.deepcopy(best_ow)
+ while left > 0:
+ # adjust
+ self.log.debug('best_ws %s' % best_ws)
+ random.shuffle(roots)
+ for root in roots:
+ pools = best_pe.root_pools[root]
+ osds = len(best_pe.target_by_root[root])
+ min_pgs = osds * min_pg_per_osd
+ if best_pe.total_by_root[root][key] < min_pgs:
+ self.log.info('Skipping root %s (pools %s), total pgs %d '
+ '< minimum %d (%d per osd)',
+ root, pools,
+ best_pe.total_by_root[root][key],
+ min_pgs, min_pg_per_osd)
+ continue
+ self.log.info('Balancing root %s (pools %s) by %s' %
+ (root, pools, key))
+ target = best_pe.target_by_root[root]
+ actual = best_pe.actual_by_root[root][key]
+ queue = sorted(actual.keys(),
+ key=lambda osd: -abs(target[osd] - actual[osd]))
+ for osd in queue:
+ if orig_osd_weight[osd] == 0:
+ self.log.debug('skipping out osd.%d', osd)
+ else:
+ deviation = target[osd] - actual[osd]
+ if deviation == 0:
+ break
+ self.log.debug('osd.%d deviation %f', osd, deviation)
+ weight = best_ws[osd]
+ ow = orig_osd_weight[osd]
+ if actual[osd] > 0:
+ calc_weight = target[osd] / actual[osd] * weight * ow
+ else:
+ # for newly created osds, reset calc_weight at target value
+ # this way weight-set will end up absorbing *step* of its
+ # target (final) value at the very beginning and slowly catch up later.
+ # note that if this turns out causing too many misplaced
+ # pgs, then we'll reduce step and retry
+ calc_weight = target[osd]
+ new_weight = weight * (1.0 - step) + calc_weight * step
+ self.log.debug('Reweight osd.%d %f -> %f', osd, weight,
+ new_weight)
+ next_ws[osd] = new_weight
+ if ow < 1.0:
+ new_ow = min(1.0, max(step + (1.0 - step) * ow,
+ ow + .005))
+ self.log.debug('Reweight osd.%d reweight %f -> %f',
+ osd, ow, new_ow)
+ next_ow[osd] = new_ow
+
+ # normalize weights under this root
+ root_weight = crush.get_item_weight(pe.root_ids[root])
+ root_sum = sum(b for a, b in next_ws.items()
+ if a in target.keys())
+ if root_sum > 0 and root_weight > 0:
+ factor = root_sum / root_weight
+ self.log.debug('normalizing root %s %d, weight %f, '
+ 'ws sum %f, factor %f',
+ root, pe.root_ids[root], root_weight,
+ root_sum, factor)
+ for osd in actual.keys():
+ next_ws[osd] = next_ws[osd] / factor
+
+ # recalc
+ plan.compat_ws = copy.deepcopy(next_ws)
+ next_ms = plan.final_state()
+ next_pe = self.calc_eval(next_ms, plan.pools)
+ next_misplaced = next_ms.calc_misplaced_from(ms)
+ self.log.debug('Step result score %f -> %f, misplacing %f',
+ best_pe.score, next_pe.score, next_misplaced)
+
+ if next_misplaced > max_misplaced:
+ if best_pe.score < pe.score:
+ self.log.debug('Step misplaced %f > max %f, stopping',
+ next_misplaced, max_misplaced)
+ break
+ step /= 2.0
+ next_ws = copy.deepcopy(best_ws)
+ next_ow = copy.deepcopy(best_ow)
+ self.log.debug('Step misplaced %f > max %f, reducing step to %f',
+ next_misplaced, max_misplaced, step)
+ else:
+ if next_pe.score > best_pe.score * 1.0001:
+ bad_steps += 1
+ if bad_steps < 5 and random.randint(0, 100) < 70:
+ self.log.debug('Score got worse, taking another step')
+ else:
+ step /= 2.0
+ next_ws = copy.deepcopy(best_ws)
+ next_ow = copy.deepcopy(best_ow)
+ self.log.debug('Score got worse, trying smaller step %f',
+ step)
+ else:
+ bad_steps = 0
+ best_pe = next_pe
+ best_ws = copy.deepcopy(next_ws)
+ best_ow = copy.deepcopy(next_ow)
+ if best_pe.score == 0:
+ break
+ left -= 1
+
+ # allow a small regression if we are phasing out osd weights
+ fudge = 0.0
+ if best_ow != orig_osd_weight:
+ fudge = .001
+
+ if best_pe.score < pe.score + fudge:
+ self.log.info('Success, score %f -> %f', pe.score, best_pe.score)
+ plan.compat_ws = best_ws
+ for osd, w in best_ow.items():
+ if w != orig_osd_weight[osd]:
+ self.log.debug('osd.%d reweight %f', osd, w)
+ plan.osd_weights[osd] = w
+ return 0, ''
+ else:
+ self.log.info('Failed to find further optimization, score %f',
+ pe.score)
+ plan.compat_ws = {}
+ return -errno.EDOM, 'Unable to find further optimization, ' \
+ 'change balancer mode and retry might help'
+
+ def get_compat_weight_set_weights(self, ms: MappingState):
+ have_choose_args = CRUSHMap.have_default_choose_args(ms.crush_dump)
+ if have_choose_args:
+ # get number of buckets in choose_args
+ choose_args_len = len(CRUSHMap.get_default_choose_args(ms.crush_dump))
+ if not have_choose_args or choose_args_len != len(ms.crush_dump['buckets']):
+ # enable compat weight-set first
+ self.log.debug('no choose_args or all buckets do not have weight-sets')
+ self.log.debug('ceph osd crush weight-set create-compat')
+ result = CommandResult('')
+ self.send_command(result, 'mon', '', json.dumps({
+ 'prefix': 'osd crush weight-set create-compat',
+ 'format': 'json',
+ }), '')
+ r, outb, outs = result.wait()
+ if r != 0:
+ self.log.error('Error creating compat weight-set')
+ return
+
+ result = CommandResult('')
+ self.send_command(result, 'mon', '', json.dumps({
+ 'prefix': 'osd crush dump',
+ 'format': 'json',
+ }), '')
+ r, outb, outs = result.wait()
+ if r != 0:
+ self.log.error('Error dumping crush map')
+ return
+ try:
+ crushmap = json.loads(outb)
+ except json.JSONDecodeError:
+ raise RuntimeError('unable to parse crush map')
+ else:
+ crushmap = ms.crush_dump
+
+ raw = CRUSHMap.get_default_choose_args(crushmap)
+ weight_set = {}
+ for b in raw:
+ bucket = None
+ for t in crushmap['buckets']:
+ if t['id'] == b['bucket_id']:
+ bucket = t
+ break
+ if not bucket:
+ raise RuntimeError('could not find bucket %s' % b['bucket_id'])
+ self.log.debug('bucket items %s' % bucket['items'])
+ self.log.debug('weight set %s' % b['weight_set'][0])
+ if len(bucket['items']) != len(b['weight_set'][0]):
+ raise RuntimeError('weight-set size does not match bucket items')
+ for pos in range(len(bucket['items'])):
+ weight_set[bucket['items'][pos]['id']] = b['weight_set'][0][pos]
+
+ self.log.debug('weight_set weights %s' % weight_set)
+ return weight_set
+
+ def do_crush(self) -> None:
+ self.log.info('do_crush (not yet implemented)')
+
+ def do_osd_weight(self) -> None:
+ self.log.info('do_osd_weight (not yet implemented)')
+
+ def execute(self, plan: Plan) -> Tuple[int, str]:
+ self.log.info('Executing plan %s' % plan.name)
+
+ commands = []
+
+ # compat weight-set
+ if len(plan.compat_ws):
+ ms_plan = cast(MsPlan, plan)
+ if not CRUSHMap.have_default_choose_args(ms_plan.initial.crush_dump):
+ self.log.debug('ceph osd crush weight-set create-compat')
+ result = CommandResult('')
+ self.send_command(result, 'mon', '', json.dumps({
+ 'prefix': 'osd crush weight-set create-compat',
+ 'format': 'json',
+ }), '')
+ r, outb, outs = result.wait()
+ if r != 0:
+ self.log.error('Error creating compat weight-set')
+ return r, outs
+
+ for osd, weight in plan.compat_ws.items():
+ self.log.info('ceph osd crush weight-set reweight-compat osd.%d %f',
+ osd, weight)
+ result = CommandResult('')
+ self.send_command(result, 'mon', '', json.dumps({
+ 'prefix': 'osd crush weight-set reweight-compat',
+ 'format': 'json',
+ 'item': 'osd.%d' % osd,
+ 'weight': [weight],
+ }), '')
+ commands.append(result)
+
+ # new_weight
+ reweightn = {}
+ for osd, weight in plan.osd_weights.items():
+ reweightn[str(osd)] = str(int(weight * float(0x10000)))
+ if len(reweightn):
+ self.log.info('ceph osd reweightn %s', reweightn)
+ result = CommandResult('')
+ self.send_command(result, 'mon', '', json.dumps({
+ 'prefix': 'osd reweightn',
+ 'format': 'json',
+ 'weights': json.dumps(reweightn),
+ }), '')
+ commands.append(result)
+
+ # upmap
+ incdump = plan.inc.dump()
+ for item in incdump.get('new_pg_upmap', []):
+ self.log.info('ceph osd pg-upmap %s mappings %s', item['pgid'],
+ item['osds'])
+ result = CommandResult('foo')
+ self.send_command(result, 'mon', '', json.dumps({
+ 'prefix': 'osd pg-upmap',
+ 'format': 'json',
+ 'pgid': item['pgid'],
+ 'id': item['osds'],
+ }), 'foo')
+ commands.append(result)
+
+ for pgid in incdump.get('old_pg_upmap', []):
+ self.log.info('ceph osd rm-pg-upmap %s', pgid)
+ result = CommandResult('foo')
+ self.send_command(result, 'mon', '', json.dumps({
+ 'prefix': 'osd rm-pg-upmap',
+ 'format': 'json',
+ 'pgid': pgid,
+ }), 'foo')
+ commands.append(result)
+
+ for item in incdump.get('new_pg_upmap_items', []):
+ self.log.info('ceph osd pg-upmap-items %s mappings %s', item['pgid'],
+ item['mappings'])
+ osdlist = []
+ for m in item['mappings']:
+ osdlist += [m['from'], m['to']]
+ result = CommandResult('foo')
+ self.send_command(result, 'mon', '', json.dumps({
+ 'prefix': 'osd pg-upmap-items',
+ 'format': 'json',
+ 'pgid': item['pgid'],
+ 'id': osdlist,
+ }), 'foo')
+ commands.append(result)
+
+ for pgid in incdump.get('old_pg_upmap_items', []):
+ self.log.info('ceph osd rm-pg-upmap-items %s', pgid)
+ result = CommandResult('foo')
+ self.send_command(result, 'mon', '', json.dumps({
+ 'prefix': 'osd rm-pg-upmap-items',
+ 'format': 'json',
+ 'pgid': pgid,
+ }), 'foo')
+ commands.append(result)
+
+ # wait for commands
+ self.log.debug('commands %s' % commands)
+ for result in commands:
+ r, outb, outs = result.wait()
+ if r != 0:
+ self.log.error('execute error: r = %d, detail = %s' % (r, outs))
+ return r, outs
+ self.log.debug('done')
+ return 0, ''
+
+ def gather_telemetry(self) -> Dict[str, Any]:
+ return {
+ 'active': self.active,
+ 'mode': self.mode,
+ }