diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/pybind/mgr/mirroring/fs/dir_map/policy.py | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/pybind/mgr/mirroring/fs/dir_map/policy.py')
-rw-r--r-- | src/pybind/mgr/mirroring/fs/dir_map/policy.py | 380 |
1 files changed, 380 insertions, 0 deletions
diff --git a/src/pybind/mgr/mirroring/fs/dir_map/policy.py b/src/pybind/mgr/mirroring/fs/dir_map/policy.py new file mode 100644 index 000000000..aef90b55f --- /dev/null +++ b/src/pybind/mgr/mirroring/fs/dir_map/policy.py @@ -0,0 +1,380 @@ +import os +import errno +import logging +import time +from threading import Lock +from typing import Dict + +from .state_transition import ActionType, PolicyAction, Transition, \ + State, StateTransition +from ..exception import MirrorException + +log = logging.getLogger(__name__) + +class DirectoryState: + def __init__(self, instance_id=None, mapped_time=None): + self.instance_id = instance_id + self.mapped_time = mapped_time + self.state = State.UNASSOCIATED + self.stalled = False + self.transition = Transition(ActionType.NONE) + self.next_state = None + self.purging = False + + def __str__(self): + return f'[instance_id={self.instance_id}, mapped_time={self.mapped_time},'\ + f' state={self.state}, transition={self.transition}, next_state={self.next_state},'\ + f' purging={self.purging}]' + +class Policy: + # number of seconds after which a directory can be reshuffled + # to other mirror daemon instances. + DIR_SHUFFLE_THROTTLE_INTERVAL = 300 + + def __init__(self): + self.dir_states = {} + self.instance_to_dir_map = {} + self.dead_instances = [] + self.lock = Lock() + + @staticmethod + def is_instance_action(action_type): + return action_type in (ActionType.ACQUIRE, + ActionType.RELEASE) + + def is_dead_instance(self, instance_id): + return instance_id in self.dead_instances + + def is_state_scheduled(self, dir_state, state): + return dir_state.state == state or dir_state.next_state == state + + def is_shuffling(self, dir_path): + log.debug(f'is_shuffling: {dir_path}') + return self.is_state_scheduled(self.dir_states[dir_path], State.SHUFFLING) + + def can_shuffle_dir(self, dir_path): + """Right now, shuffle directories only based on idleness. Later, we + probably want to avoid shuffling images that were recently shuffled. + """ + log.debug(f'can_shuffle_dir: {dir_path}') + dir_state = self.dir_states[dir_path] + return StateTransition.is_idle(dir_state.state) and \ + (time.time() - dir_state['mapped_time']) > Policy.DIR_SHUFFLE_THROTTLE_INTERVAL + + def set_state(self, dir_state, state, ignore_current_state=False): + if not ignore_current_state and dir_state.state == state: + return False + elif StateTransition.is_idle(dir_state.state): + dir_state.state = state + dir_state.next_state = None + dir_state.transition = StateTransition.transit( + dir_state.state, dir_state.transition.action_type) + return True + dir_state.next_state = state + return False + + def init(self, dir_mapping): + with self.lock: + for dir_path, dir_map in dir_mapping.items(): + instance_id = dir_map['instance_id'] + if instance_id: + if not instance_id in self.instance_to_dir_map: + self.instance_to_dir_map[instance_id] = [] + self.instance_to_dir_map[instance_id].append(dir_path) + self.dir_states[dir_path] = DirectoryState(instance_id, dir_map['last_shuffled']) + dir_state = self.dir_states[dir_path] + state = State.INITIALIZING if instance_id else State.ASSOCIATING + purging = dir_map.get('purging', 0) + if purging: + dir_state.purging = True + state = State.DISASSOCIATING + if not instance_id: + dir_state.transition = StateTransition.transit(state, + dir_state.transition.action_type) + log.debug(f'starting state: {dir_path} {state}: {dir_state}') + self.set_state(dir_state, state) + log.debug(f'init dir_state: {dir_state}') + + def lookup(self, dir_path): + log.debug(f'looking up {dir_path}') + with self.lock: + dir_state = self.dir_states.get(dir_path, None) + if dir_state: + return {'instance_id': dir_state.instance_id, + 'mapped_time': dir_state.mapped_time, + 'purging': dir_state.purging} + return None + + def map(self, dir_path, dir_state): + log.debug(f'mapping {dir_path}') + min_instance_id = None + current_instance_id = dir_state.instance_id + if current_instance_id and not self.is_dead_instance(current_instance_id): + return True + if self.is_dead_instance(current_instance_id): + self.unmap(dir_path, dir_state) + for instance_id, dir_paths in self.instance_to_dir_map.items(): + if self.is_dead_instance(instance_id): + continue + if not min_instance_id or len(dir_paths) < len(self.instance_to_dir_map[min_instance_id]): + min_instance_id = instance_id + if not min_instance_id: + log.debug(f'instance unavailable for {dir_path}') + return False + log.debug(f'dir_path {dir_path} maps to instance {min_instance_id}') + dir_state.instance_id = min_instance_id + dir_state.mapped_time = time.time() + self.instance_to_dir_map[min_instance_id].append(dir_path) + return True + + def unmap(self, dir_path, dir_state): + instance_id = dir_state.instance_id + log.debug(f'unmapping {dir_path} from instance {instance_id}') + self.instance_to_dir_map[instance_id].remove(dir_path) + dir_state.instance_id = None + dir_state.mapped_time = None + if self.is_dead_instance(instance_id) and not self.instance_to_dir_map[instance_id]: + self.instance_to_dir_map.pop(instance_id) + self.dead_instances.remove(instance_id) + + def shuffle(self, dirs_per_instance, include_stalled_dirs): + log.debug(f'directories per instance: {dirs_per_instance}') + shuffle_dirs = [] + for instance_id, dir_paths in self.instance_to_dir_map.items(): + cut_off = len(dir_paths) - dirs_per_instance + if cut_off > 0: + for dir_path in dir_paths: + if cut_off == 0: + break + if self.is_shuffling(dir_path): + cut_off -= 1 + elif self.can_shuffle_dir(dir_path): + cut_off -= 1 + shuffle_dirs.append(dir_path) + if include_stalled_dirs: + for dir_path, dir_state in self.dir_states.items(): + if dir_state.stalled: + log.debug(f'{dir_path} is stalled: {dir_state} -- trigerring kick') + dir_state.stalled = False + shuffle_dirs.append(dir_path) + return shuffle_dirs + + def execute_policy_action(self, dir_path, dir_state, policy_action): + log.debug(f'executing for directory {dir_path} policy_action {policy_action}') + + done = True + if policy_action == PolicyAction.MAP: + done = self.map(dir_path, dir_state) + elif policy_action == PolicyAction.UNMAP: + self.unmap(dir_path, dir_state) + elif policy_action == PolicyAction.REMOVE: + if dir_state.state == State.UNASSOCIATED: + self.dir_states.pop(dir_path) + else: + raise Exception() + return done + + def start_action(self, dir_path): + log.debug(f'start action: {dir_path}') + with self.lock: + dir_state = self.dir_states.get(dir_path, None) + if not dir_state: + raise Exception() + log.debug(f'dir_state: {dir_state}') + if dir_state.transition.start_policy_action: + stalled = not self.execute_policy_action(dir_path, dir_state, + dir_state.transition.start_policy_action) + if stalled: + next_action = ActionType.NONE + if dir_state.purging: + dir_state.next_state = None + dir_state.state = State.UNASSOCIATED + dir_state.transition = StateTransition.transit(State.DISASSOCIATING, ActionType.NONE) + self.set_state(dir_state, State.DISASSOCIATING) + next_action = dir_state.transition.action_type + else: + dir_state.stalled = True + log.debug(f'state machine stalled') + return next_action + return dir_state.transition.action_type + + def finish_action(self, dir_path, r): + log.debug(f'finish action {dir_path} r={r}') + with self.lock: + dir_state = self.dir_states.get(dir_path, None) + if not dir_state: + raise Exception() + if r < 0 and (not Policy.is_instance_action(dir_state.transition.action_type) or + not dir_state.instance_id or + not dir_state.instance_id in self.dead_instances): + return True + log.debug(f'dir_state: {dir_state}') + finish_policy_action = dir_state.transition.finish_policy_action + dir_state.transition = StateTransition.transit( + dir_state.state, dir_state.transition.action_type) + log.debug(f'transitioned to dir_state: {dir_state}') + if dir_state.transition.final_state: + log.debug('reached final state') + dir_state.state = dir_state.transition.final_state + dir_state.transition = Transition(ActionType.NONE) + log.debug(f'final dir_state: {dir_state}') + if StateTransition.is_idle(dir_state.state) and dir_state.next_state: + self.set_state(dir_state, dir_state.next_state) + pending = not dir_state.transition.action_type == ActionType.NONE + if finish_policy_action: + self.execute_policy_action(dir_path, dir_state, finish_policy_action) + return pending + + def find_tracked_ancestor_or_subtree(self, dir_path): + for tracked_path, _ in self.dir_states.items(): + comp = [dir_path, tracked_path] + cpath = os.path.commonpath(comp) + if cpath in comp: + what = 'subtree' if cpath == tracked_path else 'ancestor' + return (tracked_path, what) + return None + + def add_dir(self, dir_path): + log.debug(f'adding dir_path {dir_path}') + with self.lock: + if dir_path in self.dir_states: + return False + as_info = self.find_tracked_ancestor_or_subtree(dir_path) + if as_info: + raise MirrorException(-errno.EINVAL, f'{dir_path} is a {as_info[1]} of tracked path {as_info[0]}') + self.dir_states[dir_path] = DirectoryState() + dir_state = self.dir_states[dir_path] + log.debug(f'add dir_state: {dir_state}') + if dir_state.state == State.INITIALIZING: + return False + return self.set_state(dir_state, State.ASSOCIATING) + + def remove_dir(self, dir_path): + log.debug(f'removing dir_path {dir_path}') + with self.lock: + dir_state = self.dir_states.get(dir_path, None) + if not dir_state: + return False + log.debug(f'removing dir_state: {dir_state}') + dir_state.purging = True + # advance the state machine with DISASSOCIATING state for removal + if dir_state.stalled: + dir_state.state = State.UNASSOCIATED + dir_state.transition = StateTransition.transit(State.DISASSOCIATING, ActionType.NONE) + r = self.set_state(dir_state, State.DISASSOCIATING) + log.debug(f'dir_state: {dir_state}') + return r + + def add_instances_initial(self, instance_ids): + """Take care of figuring out instances which no longer exist + and remove them. This is to be done only once on startup to + identify instances which were previously removed but directories + are still mapped (on-disk) to them. + """ + for instance_id in instance_ids: + if not instance_id in self.instance_to_dir_map: + self.instance_to_dir_map[instance_id] = [] + dead_instances = [] + for instance_id, _ in self.instance_to_dir_map.items(): + if not instance_id in instance_ids: + dead_instances.append(instance_id) + if dead_instances: + self._remove_instances(dead_instances) + + def add_instances(self, instance_ids, initial_update=False): + log.debug(f'adding instances: {instance_ids} initial_update {initial_update}') + with self.lock: + if initial_update: + self.add_instances_initial(instance_ids) + else: + nr_instances = len(self.instance_to_dir_map) + nr_dead_instances = len(self.dead_instances) + if nr_instances > 0: + # adjust dead instances + nr_instances -= nr_dead_instances + include_stalled_dirs = nr_instances == 0 + for instance_id in instance_ids: + if not instance_id in self.instance_to_dir_map: + self.instance_to_dir_map[instance_id] = [] + dirs_per_instance = int(len(self.dir_states) / + (len(self.instance_to_dir_map) - nr_dead_instances)) + if dirs_per_instance == 0: + dirs_per_instance += 1 + shuffle_dirs = [] + # super set of directories which are candidates for shuffling -- choose + # those which can be shuffle rightaway (others will be shuffled when + # they reach idle state). + shuffle_dirs_ss = self.shuffle(dirs_per_instance, include_stalled_dirs) + if include_stalled_dirs: + return shuffle_dirs_ss + for dir_path in shuffle_dirs_ss: + dir_state = self.dir_states[dir_path] + if self.set_state(dir_state, State.SHUFFLING): + shuffle_dirs.append(dir_path) + log.debug(f'remapping directories: {shuffle_dirs}') + return shuffle_dirs + + def remove_instances(self, instance_ids): + with self.lock: + return self._remove_instances(instance_ids) + + def _remove_instances(self, instance_ids): + log.debug(f'removing instances: {instance_ids}') + shuffle_dirs = [] + for instance_id in instance_ids: + if not instance_id in self.instance_to_dir_map: + continue + if not self.instance_to_dir_map[instance_id]: + self.instance_to_dir_map.pop(instance_id) + continue + self.dead_instances.append(instance_id) + dir_paths = self.instance_to_dir_map[instance_id] + log.debug(f'force shuffling instance_id {instance_id}, directories {dir_paths}') + for dir_path in dir_paths: + dir_state = self.dir_states[dir_path] + if self.is_state_scheduled(dir_state, State.DISASSOCIATING): + log.debug(f'dir_path {dir_path} is disassociating, ignoring...') + continue + log.debug(f'shuffling dir_path {dir_path}') + if self.set_state(dir_state, State.SHUFFLING, True): + shuffle_dirs.append(dir_path) + log.debug(f'shuffling {shuffle_dirs}') + return shuffle_dirs + + def dir_status(self, dir_path): + with self.lock: + dir_state = self.dir_states.get(dir_path, None) + if not dir_state: + raise MirrorException(-errno.ENOENT, f'{dir_path} is not tracked') + res = {} # type: Dict + if dir_state.stalled: + res['state'] = 'stalled' + res['reason'] = 'no mirror daemons running' + elif dir_state.state == State.ASSOCIATING: + res['state'] = 'mapping' + else: + state = None + dstate = dir_state.state + if dstate == State.ASSOCIATING: + state = 'mapping' + elif dstate == State.DISASSOCIATING: + state = 'unmapping' + elif dstate == State.SHUFFLING: + state = 'shuffling' + elif dstate == State.ASSOCIATED: + state = 'mapped' + elif dstate == State.INITIALIZING: + state = 'resolving' + res['state'] = state + res['instance_id'] = dir_state.instance_id + res['last_shuffled'] = dir_state.mapped_time + return res + + def instance_summary(self): + with self.lock: + res = { + 'mapping': {} + } # type: Dict + for instance_id, dir_paths in self.instance_to_dir_map.items(): + res['mapping'][instance_id] = f'{len(dir_paths)} directories' + return res |