summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/mirroring/fs/dir_map/policy.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/pybind/mgr/mirroring/fs/dir_map/policy.py')
-rw-r--r--src/pybind/mgr/mirroring/fs/dir_map/policy.py380
1 files changed, 380 insertions, 0 deletions
diff --git a/src/pybind/mgr/mirroring/fs/dir_map/policy.py b/src/pybind/mgr/mirroring/fs/dir_map/policy.py
new file mode 100644
index 000000000..aef90b55f
--- /dev/null
+++ b/src/pybind/mgr/mirroring/fs/dir_map/policy.py
@@ -0,0 +1,380 @@
+import os
+import errno
+import logging
+import time
+from threading import Lock
+from typing import Dict
+
+from .state_transition import ActionType, PolicyAction, Transition, \
+ State, StateTransition
+from ..exception import MirrorException
+
+log = logging.getLogger(__name__)
+
class DirectoryState:
    """Per-directory bookkeeping used by the mirroring policy.

    Attributes:
        instance_id: identifier of the instance the directory is mapped
            to, or None when it is not mapped.
        mapped_time: time at which the directory was mapped (None when
            not mapped).
        state: current state; a new object starts as State.UNASSOCIATED.
        stalled: True when the directory's state machine could not make
            progress.
        transition: the transition currently in effect; starts as a
            Transition with ActionType.NONE.
        next_state: state queued to be entered later, or None.
        purging: True once the directory has been scheduled for removal.
    """

    def __init__(self, instance_id=None, mapped_time=None):
        self.instance_id = instance_id
        self.mapped_time = mapped_time
        self.state = State.UNASSOCIATED
        self.stalled = False
        self.transition = Transition(ActionType.NONE)
        self.next_state = None
        self.purging = False

    def __str__(self):
        # NOTE: `stalled` is intentionally not part of the rendered text;
        # the format is kept as-is since it ends up in debug logs.
        return (f'[instance_id={self.instance_id}, mapped_time={self.mapped_time},'
                f' state={self.state}, transition={self.transition}, next_state={self.next_state},'
                f' purging={self.purging}]')
+
class Policy:
    """Assigns tracked directories to mirror daemon instances.

    The policy keeps three pieces of state, all guarded by ``self.lock``:

    * ``dir_states`` -- directory path -> DirectoryState
    * ``instance_to_dir_map`` -- instance id -> list of directory paths
      currently assigned to that instance
    * ``dead_instances`` -- instances that were removed while directories
      were still assigned to them; they are dropped once their last
      directory has been unmapped.

    Methods that acquire ``self.lock`` themselves are: init, lookup,
    start_action, finish_action, add_dir, remove_dir, add_instances,
    remove_instances, dir_status and instance_summary. The remaining
    helpers do not lock and are invoked from those entry points.
    """

    # number of seconds after which a directory can be reshuffled
    # to other mirror daemon instances.
    DIR_SHUFFLE_THROTTLE_INTERVAL = 300

    def __init__(self):
        self.dir_states = {}
        self.instance_to_dir_map = {}
        self.dead_instances = []
        self.lock = Lock()

    @staticmethod
    def is_instance_action(action_type):
        """Return True if `action_type` is ACQUIRE or RELEASE."""
        return action_type in (ActionType.ACQUIRE,
                               ActionType.RELEASE)

    def is_dead_instance(self, instance_id):
        """Return True if `instance_id` is recorded as a dead instance."""
        return instance_id in self.dead_instances

    def is_state_scheduled(self, dir_state, state):
        """Return True if `state` is the current or the queued next state."""
        return dir_state.state == state or dir_state.next_state == state

    def is_shuffling(self, dir_path):
        """Return True if `dir_path` is in, or queued for, State.SHUFFLING."""
        log.debug(f'is_shuffling: {dir_path}')
        return self.is_state_scheduled(self.dir_states[dir_path], State.SHUFFLING)

    def can_shuffle_dir(self, dir_path):
        """Right now, shuffle directories only based on idleness and on the
        time elapsed since they were last mapped.

        A directory is eligible when its state is idle and it has been
        mapped for longer than DIR_SHUFFLE_THROTTLE_INTERVAL seconds. A
        directory without a recorded mapping time is not throttled.
        """
        log.debug(f'can_shuffle_dir: {dir_path}')
        dir_state = self.dir_states[dir_path]
        if not StateTransition.is_idle(dir_state.state):
            return False
        mapped_time = dir_state.mapped_time
        if mapped_time is None:
            # nothing to throttle against
            return True
        return (time.time() - mapped_time) > Policy.DIR_SHUFFLE_THROTTLE_INTERVAL

    def set_state(self, dir_state, state, ignore_current_state=False):
        """Move `dir_state` to `state`, or queue it if that is not possible now.

        Returns True when the state was switched immediately (the current
        state was idle) and False otherwise. When the current state is not
        idle, `state` is remembered in ``next_state``. Unless
        `ignore_current_state` is set, asking for the state the directory
        is already in is a no-op returning False.
        """
        if not ignore_current_state and dir_state.state == state:
            return False
        if StateTransition.is_idle(dir_state.state):
            dir_state.state = state
            dir_state.next_state = None
            dir_state.transition = StateTransition.transit(
                dir_state.state, dir_state.transition.action_type)
            return True
        dir_state.next_state = state
        return False

    def init(self, dir_mapping):
        """Populate the policy from a directory mapping.

        `dir_mapping` maps a directory path to a dict that must provide
        'instance_id' and 'last_shuffled' and may provide 'purging'.
        Directories with an instance start in INITIALIZING, those without
        in ASSOCIATING; directories flagged as purging start in
        DISASSOCIATING.
        """
        with self.lock:
            for dir_path, dir_map in dir_mapping.items():
                instance_id = dir_map['instance_id']
                if instance_id:
                    self.instance_to_dir_map.setdefault(instance_id, []).append(dir_path)
                dir_state = DirectoryState(instance_id, dir_map['last_shuffled'])
                self.dir_states[dir_path] = dir_state
                state = State.INITIALIZING if instance_id else State.ASSOCIATING
                if dir_map.get('purging', 0):
                    dir_state.purging = True
                    state = State.DISASSOCIATING
                    if not instance_id:
                        # same priming step as used for stalled directories
                        # in remove_dir()/start_action()
                        dir_state.transition = StateTransition.transit(
                            state, dir_state.transition.action_type)
                log.debug(f'starting state: {dir_path} {state}: {dir_state}')
                self.set_state(dir_state, state)
                log.debug(f'init dir_state: {dir_state}')

    def lookup(self, dir_path):
        """Return instance_id/mapped_time/purging for `dir_path`, or None
        if the directory is not tracked."""
        log.debug(f'looking up {dir_path}')
        with self.lock:
            dir_state = self.dir_states.get(dir_path, None)
            if dir_state:
                return {'instance_id': dir_state.instance_id,
                        'mapped_time': dir_state.mapped_time,
                        'purging': dir_state.purging}
            return None

    def map(self, dir_path, dir_state):
        """Assign `dir_path` to the live instance with the fewest directories.

        Returns True if the directory is (already or now) mapped to a live
        instance and False if no live instance is available. A mapping to
        a dead instance is dropped first.
        """
        log.debug(f'mapping {dir_path}')
        current_instance_id = dir_state.instance_id
        if current_instance_id and not self.is_dead_instance(current_instance_id):
            return True
        if self.is_dead_instance(current_instance_id):
            self.unmap(dir_path, dir_state)
        min_instance_id = None
        for instance_id, dir_paths in self.instance_to_dir_map.items():
            if self.is_dead_instance(instance_id):
                continue
            if not min_instance_id or \
               len(dir_paths) < len(self.instance_to_dir_map[min_instance_id]):
                min_instance_id = instance_id
        if not min_instance_id:
            log.debug(f'instance unavailable for {dir_path}')
            return False
        log.debug(f'dir_path {dir_path} maps to instance {min_instance_id}')
        dir_state.instance_id = min_instance_id
        dir_state.mapped_time = time.time()
        self.instance_to_dir_map[min_instance_id].append(dir_path)
        return True

    def unmap(self, dir_path, dir_state):
        """Detach `dir_path` from its instance.

        When the instance is dead and this was its last directory, the
        instance is forgotten altogether.
        """
        instance_id = dir_state.instance_id
        log.debug(f'unmapping {dir_path} from instance {instance_id}')
        self.instance_to_dir_map[instance_id].remove(dir_path)
        dir_state.instance_id = None
        dir_state.mapped_time = None
        if self.is_dead_instance(instance_id) and not self.instance_to_dir_map[instance_id]:
            self.instance_to_dir_map.pop(instance_id)
            self.dead_instances.remove(instance_id)

    def shuffle(self, dirs_per_instance, include_stalled_dirs):
        """Pick directories that should move to another instance.

        For every instance holding more than `dirs_per_instance`
        directories, the surplus is covered by directories that are
        already shuffling (counted, not returned) or that can be shuffled
        (returned). With `include_stalled_dirs`, stalled directories have
        their stalled flag cleared and are returned as well.
        """
        log.debug(f'directories per instance: {dirs_per_instance}')
        shuffle_dirs = []
        for dir_paths in self.instance_to_dir_map.values():
            cut_off = len(dir_paths) - dirs_per_instance
            if cut_off <= 0:
                continue
            for dir_path in dir_paths:
                if cut_off == 0:
                    break
                if self.is_shuffling(dir_path):
                    cut_off -= 1
                elif self.can_shuffle_dir(dir_path):
                    cut_off -= 1
                    shuffle_dirs.append(dir_path)
        if include_stalled_dirs:
            for dir_path, dir_state in self.dir_states.items():
                if dir_state.stalled:
                    log.debug(f'{dir_path} is stalled: {dir_state} -- trigerring kick')
                    dir_state.stalled = False
                    shuffle_dirs.append(dir_path)
        return shuffle_dirs

    def execute_policy_action(self, dir_path, dir_state, policy_action):
        """Run a MAP, UNMAP or REMOVE policy action for `dir_path`.

        Returns the result of map() for MAP and True otherwise. REMOVE
        only drops the directory when it is UNASSOCIATED. Raises Exception
        for any other action.
        """
        log.debug(f'executing for directory {dir_path} policy_action {policy_action}')

        done = True
        if policy_action == PolicyAction.MAP:
            done = self.map(dir_path, dir_state)
        elif policy_action == PolicyAction.UNMAP:
            self.unmap(dir_path, dir_state)
        elif policy_action == PolicyAction.REMOVE:
            if dir_state.state == State.UNASSOCIATED:
                self.dir_states.pop(dir_path)
        else:
            raise Exception(f'unknown policy action {policy_action}')
        return done

    def start_action(self, dir_path):
        """Begin the current transition of `dir_path`.

        Executes the transition's start policy action (if any) and returns
        the action type to perform. When the policy action cannot complete
        (e.g. no instance to map to) the directory is marked stalled and
        ActionType.NONE is returned -- unless the directory is purging, in
        which case it is forced into DISASSOCIATING and that transition's
        action type is returned. Raises Exception if the directory is not
        tracked.
        """
        log.debug(f'start action: {dir_path}')
        with self.lock:
            dir_state = self.dir_states.get(dir_path, None)
            if not dir_state:
                raise Exception(f'{dir_path} is not tracked')
            log.debug(f'dir_state: {dir_state}')
            if dir_state.transition.start_policy_action:
                stalled = not self.execute_policy_action(dir_path, dir_state,
                                                         dir_state.transition.start_policy_action)
                if stalled:
                    next_action = ActionType.NONE
                    if dir_state.purging:
                        # a directory being removed must not get stuck:
                        # restart it from UNASSOCIATED towards DISASSOCIATING
                        dir_state.next_state = None
                        dir_state.state = State.UNASSOCIATED
                        dir_state.transition = StateTransition.transit(
                            State.DISASSOCIATING, ActionType.NONE)
                        self.set_state(dir_state, State.DISASSOCIATING)
                        next_action = dir_state.transition.action_type
                    else:
                        dir_state.stalled = True
                        log.debug('state machine stalled')
                    return next_action
            return dir_state.transition.action_type

    def finish_action(self, dir_path, r):
        """Complete the current action of `dir_path` with result `r`.

        A negative `r` is reported as still pending (True) without
        advancing the state machine, unless the action is an instance
        action (ACQUIRE/RELEASE) on an instance that is dead. Otherwise the
        transition is advanced, a final state (if reached) is applied, a
        queued next state is entered when idle, and the transition's finish
        policy action is executed. Returns True if another action is
        pending. Raises Exception if the directory is not tracked.
        """
        log.debug(f'finish action {dir_path} r={r}')
        with self.lock:
            dir_state = self.dir_states.get(dir_path, None)
            if not dir_state:
                raise Exception(f'{dir_path} is not tracked')
            if r < 0 and (not Policy.is_instance_action(dir_state.transition.action_type) or
                          not dir_state.instance_id or
                          dir_state.instance_id not in self.dead_instances):
                return True
            log.debug(f'dir_state: {dir_state}')
            # remember the finish action of the transition being completed
            # before the transition gets replaced below
            finish_policy_action = dir_state.transition.finish_policy_action
            dir_state.transition = StateTransition.transit(
                dir_state.state, dir_state.transition.action_type)
            log.debug(f'transitioned to dir_state: {dir_state}')
            if dir_state.transition.final_state:
                log.debug('reached final state')
                dir_state.state = dir_state.transition.final_state
                dir_state.transition = Transition(ActionType.NONE)
                log.debug(f'final dir_state: {dir_state}')
            if StateTransition.is_idle(dir_state.state) and dir_state.next_state:
                self.set_state(dir_state, dir_state.next_state)
            pending = not dir_state.transition.action_type == ActionType.NONE
            if finish_policy_action:
                self.execute_policy_action(dir_path, dir_state, finish_policy_action)
            return pending

    def find_tracked_ancestor_or_subtree(self, dir_path):
        """Find a tracked path that contains, or is contained in, `dir_path`.

        Returns ``(tracked_path, what)`` for the first match, where `what`
        is 'subtree' when `dir_path` lies under `tracked_path` and
        'ancestor' when `dir_path` is a parent of it. Returns None when no
        tracked path is related.
        """
        for tracked_path in self.dir_states:
            comp = [dir_path, tracked_path]
            cpath = os.path.commonpath(comp)
            if cpath in comp:
                what = 'subtree' if cpath == tracked_path else 'ancestor'
                return (tracked_path, what)
        return None

    def add_dir(self, dir_path):
        """Start tracking `dir_path`.

        Returns False if the directory is already tracked, otherwise the
        result of moving the new directory to ASSOCIATING. Raises
        MirrorException(-EINVAL) if the path is an ancestor or a subtree
        of an already tracked path.
        """
        log.debug(f'adding dir_path {dir_path}')
        with self.lock:
            if dir_path in self.dir_states:
                return False
            as_info = self.find_tracked_ancestor_or_subtree(dir_path)
            if as_info:
                raise MirrorException(-errno.EINVAL, f'{dir_path} is a {as_info[1]} of tracked path {as_info[0]}')
            dir_state = DirectoryState()
            self.dir_states[dir_path] = dir_state
            log.debug(f'add dir_state: {dir_state}')
            return self.set_state(dir_state, State.ASSOCIATING)

    def remove_dir(self, dir_path):
        """Schedule `dir_path` for removal.

        Marks the directory as purging and moves (or queues) it to
        DISASSOCIATING. Returns False if the directory is not tracked,
        otherwise the result of set_state().
        """
        log.debug(f'removing dir_path {dir_path}')
        with self.lock:
            dir_state = self.dir_states.get(dir_path, None)
            if not dir_state:
                return False
            log.debug(f'removing dir_state: {dir_state}')
            dir_state.purging = True
            # advance the state machine with DISASSOCIATING state for removal
            if dir_state.stalled:
                dir_state.state = State.UNASSOCIATED
                dir_state.transition = StateTransition.transit(
                    State.DISASSOCIATING, ActionType.NONE)
            r = self.set_state(dir_state, State.DISASSOCIATING)
            log.debug(f'dir_state: {dir_state}')
            return r

    def add_instances_initial(self, instance_ids):
        """Take care of figuring out instances which no longer exist
        and remove them. This is to be done only once on startup to
        identify instances which were previously removed but directories
        are still mapped (on-disk) to them.

        Does not take the lock; add_instances() calls it with the lock
        held.
        """
        live_instances = set()
        for instance_id in instance_ids:
            live_instances.add(instance_id)
            self.instance_to_dir_map.setdefault(instance_id, [])
        dead_instances = [instance_id for instance_id in self.instance_to_dir_map
                          if instance_id not in live_instances]
        if dead_instances:
            self._remove_instances(dead_instances)

    def add_instances(self, instance_ids, initial_update=False):
        """Register instances and work out which directories to rebalance.

        With `initial_update` the known instances are reconciled against
        `instance_ids` and nothing is returned. Otherwise the new
        instances are added and a list of directory paths is returned:
        when there was no live instance before, the full candidate list
        from shuffle() (including stalled directories); else only the
        candidates that could be switched to SHUFFLING right away. An
        empty list is returned when no live instance exists.
        """
        log.debug(f'adding instances: {instance_ids} initial_update {initial_update}')
        with self.lock:
            if initial_update:
                self.add_instances_initial(instance_ids)
                return None
            nr_instances = len(self.instance_to_dir_map)
            nr_dead_instances = len(self.dead_instances)
            if nr_instances > 0:
                # adjust dead instances
                nr_instances -= nr_dead_instances
            include_stalled_dirs = nr_instances == 0
            for instance_id in instance_ids:
                self.instance_to_dir_map.setdefault(instance_id, [])
            nr_live_instances = len(self.instance_to_dir_map) - nr_dead_instances
            if nr_live_instances <= 0:
                # nothing to distribute directories over; avoids dividing
                # by zero below
                log.debug('no live instances available, nothing to shuffle')
                return []
            # every live instance should hold at least one directory
            dirs_per_instance = max(len(self.dir_states) // nr_live_instances, 1)
            # super set of directories which are candidates for shuffling -- choose
            # those which can be shuffle rightaway (others will be shuffled when
            # they reach idle state).
            shuffle_dirs_ss = self.shuffle(dirs_per_instance, include_stalled_dirs)
            if include_stalled_dirs:
                return shuffle_dirs_ss
            shuffle_dirs = [dir_path for dir_path in shuffle_dirs_ss
                            if self.set_state(self.dir_states[dir_path], State.SHUFFLING)]
            log.debug(f'remapping directories: {shuffle_dirs}')
            return shuffle_dirs

    def remove_instances(self, instance_ids):
        """Locked wrapper around _remove_instances()."""
        with self.lock:
            return self._remove_instances(instance_ids)

    def _remove_instances(self, instance_ids):
        """Remove instances and force their directories to be shuffled.

        Instances without directories are dropped immediately. Others are
        marked dead and each of their directories -- unless it is already
        (scheduled to be) disassociating -- is moved to SHUFFLING. Returns
        the directories that switched to SHUFFLING right away. The caller
        is expected to hold the lock.
        """
        log.debug(f'removing instances: {instance_ids}')
        shuffle_dirs = []
        for instance_id in instance_ids:
            if instance_id not in self.instance_to_dir_map:
                continue
            if not self.instance_to_dir_map[instance_id]:
                self.instance_to_dir_map.pop(instance_id)
                continue
            # unmap() removes a single entry when the instance drains, so
            # never record the same instance twice
            if instance_id not in self.dead_instances:
                self.dead_instances.append(instance_id)
            dir_paths = self.instance_to_dir_map[instance_id]
            log.debug(f'force shuffling instance_id {instance_id}, directories {dir_paths}')
            for dir_path in dir_paths:
                dir_state = self.dir_states[dir_path]
                if self.is_state_scheduled(dir_state, State.DISASSOCIATING):
                    log.debug(f'dir_path {dir_path} is disassociating, ignoring...')
                    continue
                log.debug(f'shuffling dir_path {dir_path}')
                if self.set_state(dir_state, State.SHUFFLING, True):
                    shuffle_dirs.append(dir_path)
        log.debug(f'shuffling {shuffle_dirs}')
        return shuffle_dirs

    def dir_status(self, dir_path):
        """Return a status dict for `dir_path`.

        The dict always has a 'state' key. Stalled directories add a
        'reason'; directories that are neither stalled nor associating add
        'instance_id' and 'last_shuffled'. Raises
        MirrorException(-ENOENT) if the directory is not tracked.
        """
        with self.lock:
            dir_state = self.dir_states.get(dir_path, None)
            if not dir_state:
                raise MirrorException(-errno.ENOENT, f'{dir_path} is not tracked')
            res = {}  # type: Dict
            if dir_state.stalled:
                res['state'] = 'stalled'
                res['reason'] = 'no mirror daemons running'
                return res
            dstate = dir_state.state
            if dstate == State.ASSOCIATING:
                res['state'] = 'mapping'
                return res
            state = None
            if dstate == State.DISASSOCIATING:
                state = 'unmapping'
            elif dstate == State.SHUFFLING:
                state = 'shuffling'
            elif dstate == State.ASSOCIATED:
                state = 'mapped'
            elif dstate == State.INITIALIZING:
                state = 'resolving'
            res['state'] = state
            res['instance_id'] = dir_state.instance_id
            res['last_shuffled'] = dir_state.mapped_time
            return res

    def instance_summary(self):
        """Return ``{'mapping': {instance_id: '<n> directories'}}`` for all
        known instances."""
        with self.lock:
            res = {
                'mapping': {}
            }  # type: Dict
            for instance_id, dir_paths in self.instance_to_dir_map.items():
                res['mapping'][instance_id] = f'{len(dir_paths)} directories'
            return res