summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/mirroring/fs/dir_map
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/pybind/mgr/mirroring/fs/dir_map
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/pybind/mgr/mirroring/fs/dir_map')
-rw-r--r--src/pybind/mgr/mirroring/fs/dir_map/__init__.py0
-rw-r--r--src/pybind/mgr/mirroring/fs/dir_map/create.py23
-rw-r--r--src/pybind/mgr/mirroring/fs/dir_map/load.py74
-rw-r--r--src/pybind/mgr/mirroring/fs/dir_map/policy.py380
-rw-r--r--src/pybind/mgr/mirroring/fs/dir_map/state_transition.py94
-rw-r--r--src/pybind/mgr/mirroring/fs/dir_map/update.py151
6 files changed, 722 insertions, 0 deletions
diff --git a/src/pybind/mgr/mirroring/fs/dir_map/__init__.py b/src/pybind/mgr/mirroring/fs/dir_map/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/src/pybind/mgr/mirroring/fs/dir_map/__init__.py
diff --git a/src/pybind/mgr/mirroring/fs/dir_map/create.py b/src/pybind/mgr/mirroring/fs/dir_map/create.py
new file mode 100644
index 000000000..963dfe915
--- /dev/null
+++ b/src/pybind/mgr/mirroring/fs/dir_map/create.py
@@ -0,0 +1,23 @@
+import errno
+import logging
+
+import rados
+
+from ..exception import MirrorException
+from ..utils import MIRROR_OBJECT_NAME
+
+log = logging.getLogger(__name__)
+
+def create_mirror_object(rados_inst, pool_id):
+ log.info(f'creating mirror object: {MIRROR_OBJECT_NAME}')
+ try:
+ with rados_inst.open_ioctx2(pool_id) as ioctx:
+ with rados.WriteOpCtx() as write_op:
+ write_op.new(rados.LIBRADOS_CREATE_EXCLUSIVE)
+ ioctx.operate_write_op(write_op, MIRROR_OBJECT_NAME)
+ except rados.Error as e:
+ if e.errno == errno.EEXIST:
+ # be graceful
+ return -e.errno
+ log.error(f'failed to create mirror object: {e}')
+ raise Exception(-e.args[0])
diff --git a/src/pybind/mgr/mirroring/fs/dir_map/load.py b/src/pybind/mgr/mirroring/fs/dir_map/load.py
new file mode 100644
index 000000000..42468b4e8
--- /dev/null
+++ b/src/pybind/mgr/mirroring/fs/dir_map/load.py
@@ -0,0 +1,74 @@
+import errno
+import pickle
+import logging
+from typing import Dict
+
+import rados
+
+from ..exception import MirrorException
+from ..utils import MIRROR_OBJECT_NAME, DIRECTORY_MAP_PREFIX, \
+ INSTANCE_ID_PREFIX
+
+log = logging.getLogger(__name__)
+
+MAX_RETURN = 256
+
+def handle_dir_load(dir_mapping, dir_map):
+ for directory_str, encoded_map in dir_map.items():
+ dir_path = directory_str[len(DIRECTORY_MAP_PREFIX):]
+ decoded_map = pickle.loads(encoded_map)
+ log.debug(f'{dir_path} -> {decoded_map}')
+ dir_mapping[dir_path] = decoded_map
+
+def load_dir_map(ioctx):
+ dir_mapping = {} # type: Dict[str, Dict]
+ log.info('loading dir map...')
+ try:
+ with rados.ReadOpCtx() as read_op:
+ start = ""
+ while True:
+ iter, ret = ioctx.get_omap_vals(read_op, start, DIRECTORY_MAP_PREFIX, MAX_RETURN)
+ if not ret == 0:
+ log.error(f'failed to fetch dir mapping omap')
+ raise Exception(-errno.EINVAL)
+ ioctx.operate_read_op(read_op, MIRROR_OBJECT_NAME)
+ dir_map = dict(iter)
+ if not dir_map:
+ break
+ handle_dir_load(dir_mapping, dir_map)
+ start = dir_map.popitem()[0]
+ log.info("loaded {0} directory mapping(s) from disk".format(len(dir_mapping)))
+ return dir_mapping
+ except rados.Error as e:
+ log.error(f'exception when loading directory mapping: {e}')
+ raise Exception(-e.errno)
+
+def handle_instance_load(instance_mapping, instance_map):
+ for instance, e_data in instance_map.items():
+ instance_id = instance[len(INSTANCE_ID_PREFIX):]
+ d_data = pickle.loads(e_data)
+ log.debug(f'{instance_id} -> {d_data}')
+ instance_mapping[instance_id] = d_data
+
+def load_instances(ioctx):
+ instance_mapping = {} # type: Dict[str, Dict]
+ log.info('loading instances...')
+ try:
+ with rados.ReadOpCtx() as read_op:
+ start = ""
+ while True:
+ iter, ret = ioctx.get_omap_vals(read_op, start, INSTANCE_ID_PREFIX, MAX_RETURN)
+ if not ret == 0:
+ log.error(f'failed to fetch instance omap')
+ raise Exception(-errno.EINVAL)
+ ioctx.operate_read_op(read_op, MIRROR_OBJECT_NAME)
+ instance_map = dict(iter)
+ if not instance_map:
+ break
+ handle_instance_load(instance_mapping, instance_map)
+ start = instance_map.popitem()[0]
+ log.info("loaded {0} instance(s) from disk".format(len(instance_mapping)))
+ return instance_mapping
+ except rados.Error as e:
+ log.error(f'exception when loading instances: {e}')
+ raise Exception(-e.errno)
diff --git a/src/pybind/mgr/mirroring/fs/dir_map/policy.py b/src/pybind/mgr/mirroring/fs/dir_map/policy.py
new file mode 100644
index 000000000..aef90b55f
--- /dev/null
+++ b/src/pybind/mgr/mirroring/fs/dir_map/policy.py
@@ -0,0 +1,380 @@
+import os
+import errno
+import logging
+import time
+from threading import Lock
+from typing import Dict
+
+from .state_transition import ActionType, PolicyAction, Transition, \
+ State, StateTransition
+from ..exception import MirrorException
+
+log = logging.getLogger(__name__)
+
+class DirectoryState:
+ def __init__(self, instance_id=None, mapped_time=None):
+ self.instance_id = instance_id
+ self.mapped_time = mapped_time
+ self.state = State.UNASSOCIATED
+ self.stalled = False
+ self.transition = Transition(ActionType.NONE)
+ self.next_state = None
+ self.purging = False
+
+ def __str__(self):
+ return f'[instance_id={self.instance_id}, mapped_time={self.mapped_time},'\
+ f' state={self.state}, transition={self.transition}, next_state={self.next_state},'\
+ f' purging={self.purging}]'
+
+class Policy:
+ # number of seconds after which a directory can be reshuffled
+ # to other mirror daemon instances.
+ DIR_SHUFFLE_THROTTLE_INTERVAL = 300
+
+ def __init__(self):
+ self.dir_states = {}
+ self.instance_to_dir_map = {}
+ self.dead_instances = []
+ self.lock = Lock()
+
+ @staticmethod
+ def is_instance_action(action_type):
+ return action_type in (ActionType.ACQUIRE,
+ ActionType.RELEASE)
+
+ def is_dead_instance(self, instance_id):
+ return instance_id in self.dead_instances
+
+ def is_state_scheduled(self, dir_state, state):
+ return dir_state.state == state or dir_state.next_state == state
+
+ def is_shuffling(self, dir_path):
+ log.debug(f'is_shuffling: {dir_path}')
+ return self.is_state_scheduled(self.dir_states[dir_path], State.SHUFFLING)
+
+ def can_shuffle_dir(self, dir_path):
+ """Right now, shuffle directories only based on idleness. Later, we
+ probably want to avoid shuffling images that were recently shuffled.
+ """
+ log.debug(f'can_shuffle_dir: {dir_path}')
+ dir_state = self.dir_states[dir_path]
+ return StateTransition.is_idle(dir_state.state) and \
+ (time.time() - dir_state['mapped_time']) > Policy.DIR_SHUFFLE_THROTTLE_INTERVAL
+
+ def set_state(self, dir_state, state, ignore_current_state=False):
+ if not ignore_current_state and dir_state.state == state:
+ return False
+ elif StateTransition.is_idle(dir_state.state):
+ dir_state.state = state
+ dir_state.next_state = None
+ dir_state.transition = StateTransition.transit(
+ dir_state.state, dir_state.transition.action_type)
+ return True
+ dir_state.next_state = state
+ return False
+
+ def init(self, dir_mapping):
+ with self.lock:
+ for dir_path, dir_map in dir_mapping.items():
+ instance_id = dir_map['instance_id']
+ if instance_id:
+ if not instance_id in self.instance_to_dir_map:
+ self.instance_to_dir_map[instance_id] = []
+ self.instance_to_dir_map[instance_id].append(dir_path)
+ self.dir_states[dir_path] = DirectoryState(instance_id, dir_map['last_shuffled'])
+ dir_state = self.dir_states[dir_path]
+ state = State.INITIALIZING if instance_id else State.ASSOCIATING
+ purging = dir_map.get('purging', 0)
+ if purging:
+ dir_state.purging = True
+ state = State.DISASSOCIATING
+ if not instance_id:
+ dir_state.transition = StateTransition.transit(state,
+ dir_state.transition.action_type)
+ log.debug(f'starting state: {dir_path} {state}: {dir_state}')
+ self.set_state(dir_state, state)
+ log.debug(f'init dir_state: {dir_state}')
+
+ def lookup(self, dir_path):
+ log.debug(f'looking up {dir_path}')
+ with self.lock:
+ dir_state = self.dir_states.get(dir_path, None)
+ if dir_state:
+ return {'instance_id': dir_state.instance_id,
+ 'mapped_time': dir_state.mapped_time,
+ 'purging': dir_state.purging}
+ return None
+
+ def map(self, dir_path, dir_state):
+ log.debug(f'mapping {dir_path}')
+ min_instance_id = None
+ current_instance_id = dir_state.instance_id
+ if current_instance_id and not self.is_dead_instance(current_instance_id):
+ return True
+ if self.is_dead_instance(current_instance_id):
+ self.unmap(dir_path, dir_state)
+ for instance_id, dir_paths in self.instance_to_dir_map.items():
+ if self.is_dead_instance(instance_id):
+ continue
+ if not min_instance_id or len(dir_paths) < len(self.instance_to_dir_map[min_instance_id]):
+ min_instance_id = instance_id
+ if not min_instance_id:
+ log.debug(f'instance unavailable for {dir_path}')
+ return False
+ log.debug(f'dir_path {dir_path} maps to instance {min_instance_id}')
+ dir_state.instance_id = min_instance_id
+ dir_state.mapped_time = time.time()
+ self.instance_to_dir_map[min_instance_id].append(dir_path)
+ return True
+
+ def unmap(self, dir_path, dir_state):
+ instance_id = dir_state.instance_id
+ log.debug(f'unmapping {dir_path} from instance {instance_id}')
+ self.instance_to_dir_map[instance_id].remove(dir_path)
+ dir_state.instance_id = None
+ dir_state.mapped_time = None
+ if self.is_dead_instance(instance_id) and not self.instance_to_dir_map[instance_id]:
+ self.instance_to_dir_map.pop(instance_id)
+ self.dead_instances.remove(instance_id)
+
+ def shuffle(self, dirs_per_instance, include_stalled_dirs):
+ log.debug(f'directories per instance: {dirs_per_instance}')
+ shuffle_dirs = []
+ for instance_id, dir_paths in self.instance_to_dir_map.items():
+ cut_off = len(dir_paths) - dirs_per_instance
+ if cut_off > 0:
+ for dir_path in dir_paths:
+ if cut_off == 0:
+ break
+ if self.is_shuffling(dir_path):
+ cut_off -= 1
+ elif self.can_shuffle_dir(dir_path):
+ cut_off -= 1
+ shuffle_dirs.append(dir_path)
+ if include_stalled_dirs:
+ for dir_path, dir_state in self.dir_states.items():
+ if dir_state.stalled:
+ log.debug(f'{dir_path} is stalled: {dir_state} -- trigerring kick')
+ dir_state.stalled = False
+ shuffle_dirs.append(dir_path)
+ return shuffle_dirs
+
+ def execute_policy_action(self, dir_path, dir_state, policy_action):
+ log.debug(f'executing for directory {dir_path} policy_action {policy_action}')
+
+ done = True
+ if policy_action == PolicyAction.MAP:
+ done = self.map(dir_path, dir_state)
+ elif policy_action == PolicyAction.UNMAP:
+ self.unmap(dir_path, dir_state)
+ elif policy_action == PolicyAction.REMOVE:
+ if dir_state.state == State.UNASSOCIATED:
+ self.dir_states.pop(dir_path)
+ else:
+ raise Exception()
+ return done
+
+ def start_action(self, dir_path):
+ log.debug(f'start action: {dir_path}')
+ with self.lock:
+ dir_state = self.dir_states.get(dir_path, None)
+ if not dir_state:
+ raise Exception()
+ log.debug(f'dir_state: {dir_state}')
+ if dir_state.transition.start_policy_action:
+ stalled = not self.execute_policy_action(dir_path, dir_state,
+ dir_state.transition.start_policy_action)
+ if stalled:
+ next_action = ActionType.NONE
+ if dir_state.purging:
+ dir_state.next_state = None
+ dir_state.state = State.UNASSOCIATED
+ dir_state.transition = StateTransition.transit(State.DISASSOCIATING, ActionType.NONE)
+ self.set_state(dir_state, State.DISASSOCIATING)
+ next_action = dir_state.transition.action_type
+ else:
+ dir_state.stalled = True
+ log.debug(f'state machine stalled')
+ return next_action
+ return dir_state.transition.action_type
+
+ def finish_action(self, dir_path, r):
+ log.debug(f'finish action {dir_path} r={r}')
+ with self.lock:
+ dir_state = self.dir_states.get(dir_path, None)
+ if not dir_state:
+ raise Exception()
+ if r < 0 and (not Policy.is_instance_action(dir_state.transition.action_type) or
+ not dir_state.instance_id or
+ not dir_state.instance_id in self.dead_instances):
+ return True
+ log.debug(f'dir_state: {dir_state}')
+ finish_policy_action = dir_state.transition.finish_policy_action
+ dir_state.transition = StateTransition.transit(
+ dir_state.state, dir_state.transition.action_type)
+ log.debug(f'transitioned to dir_state: {dir_state}')
+ if dir_state.transition.final_state:
+ log.debug('reached final state')
+ dir_state.state = dir_state.transition.final_state
+ dir_state.transition = Transition(ActionType.NONE)
+ log.debug(f'final dir_state: {dir_state}')
+ if StateTransition.is_idle(dir_state.state) and dir_state.next_state:
+ self.set_state(dir_state, dir_state.next_state)
+ pending = not dir_state.transition.action_type == ActionType.NONE
+ if finish_policy_action:
+ self.execute_policy_action(dir_path, dir_state, finish_policy_action)
+ return pending
+
+ def find_tracked_ancestor_or_subtree(self, dir_path):
+ for tracked_path, _ in self.dir_states.items():
+ comp = [dir_path, tracked_path]
+ cpath = os.path.commonpath(comp)
+ if cpath in comp:
+ what = 'subtree' if cpath == tracked_path else 'ancestor'
+ return (tracked_path, what)
+ return None
+
+ def add_dir(self, dir_path):
+ log.debug(f'adding dir_path {dir_path}')
+ with self.lock:
+ if dir_path in self.dir_states:
+ return False
+ as_info = self.find_tracked_ancestor_or_subtree(dir_path)
+ if as_info:
+ raise MirrorException(-errno.EINVAL, f'{dir_path} is a {as_info[1]} of tracked path {as_info[0]}')
+ self.dir_states[dir_path] = DirectoryState()
+ dir_state = self.dir_states[dir_path]
+ log.debug(f'add dir_state: {dir_state}')
+ if dir_state.state == State.INITIALIZING:
+ return False
+ return self.set_state(dir_state, State.ASSOCIATING)
+
+ def remove_dir(self, dir_path):
+ log.debug(f'removing dir_path {dir_path}')
+ with self.lock:
+ dir_state = self.dir_states.get(dir_path, None)
+ if not dir_state:
+ return False
+ log.debug(f'removing dir_state: {dir_state}')
+ dir_state.purging = True
+ # advance the state machine with DISASSOCIATING state for removal
+ if dir_state.stalled:
+ dir_state.state = State.UNASSOCIATED
+ dir_state.transition = StateTransition.transit(State.DISASSOCIATING, ActionType.NONE)
+ r = self.set_state(dir_state, State.DISASSOCIATING)
+ log.debug(f'dir_state: {dir_state}')
+ return r
+
+ def add_instances_initial(self, instance_ids):
+ """Take care of figuring out instances which no longer exist
+ and remove them. This is to be done only once on startup to
+ identify instances which were previously removed but directories
+ are still mapped (on-disk) to them.
+ """
+ for instance_id in instance_ids:
+ if not instance_id in self.instance_to_dir_map:
+ self.instance_to_dir_map[instance_id] = []
+ dead_instances = []
+ for instance_id, _ in self.instance_to_dir_map.items():
+ if not instance_id in instance_ids:
+ dead_instances.append(instance_id)
+ if dead_instances:
+ self._remove_instances(dead_instances)
+
+ def add_instances(self, instance_ids, initial_update=False):
+ log.debug(f'adding instances: {instance_ids} initial_update {initial_update}')
+ with self.lock:
+ if initial_update:
+ self.add_instances_initial(instance_ids)
+ else:
+ nr_instances = len(self.instance_to_dir_map)
+ nr_dead_instances = len(self.dead_instances)
+ if nr_instances > 0:
+ # adjust dead instances
+ nr_instances -= nr_dead_instances
+ include_stalled_dirs = nr_instances == 0
+ for instance_id in instance_ids:
+ if not instance_id in self.instance_to_dir_map:
+ self.instance_to_dir_map[instance_id] = []
+ dirs_per_instance = int(len(self.dir_states) /
+ (len(self.instance_to_dir_map) - nr_dead_instances))
+ if dirs_per_instance == 0:
+ dirs_per_instance += 1
+ shuffle_dirs = []
+ # super set of directories which are candidates for shuffling -- choose
+ # those which can be shuffle rightaway (others will be shuffled when
+ # they reach idle state).
+ shuffle_dirs_ss = self.shuffle(dirs_per_instance, include_stalled_dirs)
+ if include_stalled_dirs:
+ return shuffle_dirs_ss
+ for dir_path in shuffle_dirs_ss:
+ dir_state = self.dir_states[dir_path]
+ if self.set_state(dir_state, State.SHUFFLING):
+ shuffle_dirs.append(dir_path)
+ log.debug(f'remapping directories: {shuffle_dirs}')
+ return shuffle_dirs
+
+ def remove_instances(self, instance_ids):
+ with self.lock:
+ return self._remove_instances(instance_ids)
+
+ def _remove_instances(self, instance_ids):
+ log.debug(f'removing instances: {instance_ids}')
+ shuffle_dirs = []
+ for instance_id in instance_ids:
+ if not instance_id in self.instance_to_dir_map:
+ continue
+ if not self.instance_to_dir_map[instance_id]:
+ self.instance_to_dir_map.pop(instance_id)
+ continue
+ self.dead_instances.append(instance_id)
+ dir_paths = self.instance_to_dir_map[instance_id]
+ log.debug(f'force shuffling instance_id {instance_id}, directories {dir_paths}')
+ for dir_path in dir_paths:
+ dir_state = self.dir_states[dir_path]
+ if self.is_state_scheduled(dir_state, State.DISASSOCIATING):
+ log.debug(f'dir_path {dir_path} is disassociating, ignoring...')
+ continue
+ log.debug(f'shuffling dir_path {dir_path}')
+ if self.set_state(dir_state, State.SHUFFLING, True):
+ shuffle_dirs.append(dir_path)
+ log.debug(f'shuffling {shuffle_dirs}')
+ return shuffle_dirs
+
+ def dir_status(self, dir_path):
+ with self.lock:
+ dir_state = self.dir_states.get(dir_path, None)
+ if not dir_state:
+ raise MirrorException(-errno.ENOENT, f'{dir_path} is not tracked')
+ res = {} # type: Dict
+ if dir_state.stalled:
+ res['state'] = 'stalled'
+ res['reason'] = 'no mirror daemons running'
+ elif dir_state.state == State.ASSOCIATING:
+ res['state'] = 'mapping'
+ else:
+ state = None
+ dstate = dir_state.state
+ if dstate == State.ASSOCIATING:
+ state = 'mapping'
+ elif dstate == State.DISASSOCIATING:
+ state = 'unmapping'
+ elif dstate == State.SHUFFLING:
+ state = 'shuffling'
+ elif dstate == State.ASSOCIATED:
+ state = 'mapped'
+ elif dstate == State.INITIALIZING:
+ state = 'resolving'
+ res['state'] = state
+ res['instance_id'] = dir_state.instance_id
+ res['last_shuffled'] = dir_state.mapped_time
+ return res
+
+ def instance_summary(self):
+ with self.lock:
+ res = {
+ 'mapping': {}
+ } # type: Dict
+ for instance_id, dir_paths in self.instance_to_dir_map.items():
+ res['mapping'][instance_id] = f'{len(dir_paths)} directories'
+ return res
diff --git a/src/pybind/mgr/mirroring/fs/dir_map/state_transition.py b/src/pybind/mgr/mirroring/fs/dir_map/state_transition.py
new file mode 100644
index 000000000..ef59a6a87
--- /dev/null
+++ b/src/pybind/mgr/mirroring/fs/dir_map/state_transition.py
@@ -0,0 +1,94 @@
+import logging
+from enum import Enum, unique
+from typing import Dict
+
+log = logging.getLogger(__name__)
+
+@unique
+class State(Enum):
+ UNASSOCIATED = 0
+ INITIALIZING = 1
+ ASSOCIATING = 2
+ ASSOCIATED = 3
+ SHUFFLING = 4
+ DISASSOCIATING = 5
+
+@unique
+class ActionType(Enum):
+ NONE = 0
+ MAP_UPDATE = 1
+ MAP_REMOVE = 2
+ ACQUIRE = 3
+ RELEASE = 4
+
+@unique
+class PolicyAction(Enum):
+ MAP = 0
+ UNMAP = 1
+ REMOVE = 2
+
+class TransitionKey:
+ def __init__(self, state, action_type):
+ self.transition_key = [state, action_type]
+
+ def __hash__(self):
+ return hash(tuple(self.transition_key))
+
+ def __eq__(self, other):
+ return self.transition_key == other.transition_key
+
+ def __neq__(self, other):
+ return not(self == other)
+
+class Transition:
+ def __init__(self, action_type, start_policy_action=None,
+ finish_policy_action=None, final_state=None):
+ self.action_type = action_type
+ self.start_policy_action = start_policy_action
+ self.finish_policy_action = finish_policy_action
+ self.final_state = final_state
+
+ def __str__(self):
+ return "[action_type={0}, start_policy_action={1}, finish_policy_action={2}, final_state={3}".format(
+ self.action_type, self.start_policy_action, self.finish_policy_action, self.final_state)
+
+class StateTransition:
+ transition_table = {} # type: Dict[TransitionKey, Transition]
+
+ @staticmethod
+ def transit(state, action_type):
+ try:
+ return StateTransition.transition_table[TransitionKey(state, action_type)]
+ except KeyError:
+ raise Exception()
+
+ @staticmethod
+ def is_idle(state):
+ return state in (State.UNASSOCIATED, State.ASSOCIATED)
+
+StateTransition.transition_table = {
+ TransitionKey(State.INITIALIZING, ActionType.NONE) : Transition(ActionType.ACQUIRE),
+ TransitionKey(State.INITIALIZING, ActionType.ACQUIRE) : Transition(ActionType.NONE,
+ final_state=State.ASSOCIATED),
+
+ TransitionKey(State.ASSOCIATING, ActionType.NONE) : Transition(ActionType.MAP_UPDATE,
+ start_policy_action=PolicyAction.MAP),
+ TransitionKey(State.ASSOCIATING, ActionType.MAP_UPDATE) : Transition(ActionType.ACQUIRE),
+ TransitionKey(State.ASSOCIATING, ActionType.ACQUIRE) : Transition(ActionType.NONE,
+ final_state=State.ASSOCIATED),
+
+ TransitionKey(State.DISASSOCIATING, ActionType.NONE) : Transition(ActionType.RELEASE,
+ finish_policy_action=PolicyAction.UNMAP),
+ TransitionKey(State.DISASSOCIATING, ActionType.RELEASE) : Transition(ActionType.MAP_REMOVE,
+ finish_policy_action=PolicyAction.REMOVE),
+ TransitionKey(State.DISASSOCIATING, ActionType.MAP_REMOVE) : Transition(ActionType.NONE,
+ final_state=State.UNASSOCIATED),
+
+ TransitionKey(State.SHUFFLING, ActionType.NONE) : Transition(ActionType.RELEASE,
+ finish_policy_action=PolicyAction.UNMAP),
+ TransitionKey(State.SHUFFLING, ActionType.RELEASE) : Transition(ActionType.MAP_UPDATE,
+ start_policy_action=PolicyAction.MAP),
+ TransitionKey(State.SHUFFLING, ActionType.MAP_UPDATE) : Transition(ActionType.ACQUIRE),
+ TransitionKey(State.SHUFFLING, ActionType.ACQUIRE) : Transition(ActionType.NONE,
+ final_state=State.ASSOCIATED),
+ }
diff --git a/src/pybind/mgr/mirroring/fs/dir_map/update.py b/src/pybind/mgr/mirroring/fs/dir_map/update.py
new file mode 100644
index 000000000..a70baa01a
--- /dev/null
+++ b/src/pybind/mgr/mirroring/fs/dir_map/update.py
@@ -0,0 +1,151 @@
+import errno
+import pickle
+import logging
+
+import rados
+
+from ..utils import MIRROR_OBJECT_NAME, DIRECTORY_MAP_PREFIX, \
+ INSTANCE_ID_PREFIX, MIRROR_OBJECT_PREFIX
+
+log = logging.getLogger(__name__)
+
+MAX_UPDATE = 256
+
+class UpdateDirMapRequest:
+ def __init__(self, ioctx, update_mapping, removals, on_finish_callback):
+ self.ioctx = ioctx
+ self.update_mapping = update_mapping
+ self.removals = removals
+ self.on_finish_callback = on_finish_callback
+
+ @staticmethod
+ def omap_key(dir_path):
+ return f'{DIRECTORY_MAP_PREFIX}{dir_path}'
+
+ def send(self):
+ log.info('updating image map')
+ self.send_update()
+
+ def send_update(self):
+ log.debug(f'pending updates: {len(self.update_mapping)}+{len(self.removals)}')
+ try:
+ with rados.WriteOpCtx() as write_op:
+ keys = []
+ vals = []
+ dir_keys = list(self.update_mapping.keys())[0:MAX_UPDATE]
+ # gather updates
+ for dir_path in dir_keys:
+ mapping = self.update_mapping.pop(dir_path)
+ keys.append(UpdateDirMapRequest.omap_key(dir_path))
+ vals.append(pickle.dumps(mapping))
+ self.ioctx.set_omap(write_op, tuple(keys), tuple(vals))
+ # gather deletes
+ slicept = MAX_UPDATE - len(dir_keys)
+ removals = [UpdateDirMapRequest.omap_key(dir_path) for dir_path in self.removals[0:slicept]]
+ self.removals = self.removals[slicept:]
+ self.ioctx.remove_omap_keys(write_op, tuple(removals))
+ log.debug(f'applying {len(keys)} updates, {len(removals)} deletes')
+ self.ioctx.operate_aio_write_op(write_op, MIRROR_OBJECT_NAME, oncomplete=self.handle_update)
+ except rados.Error as e:
+ log.error(f'UpdateDirMapRequest.send_update exception: {e}')
+ self.finish(-e.args[0])
+
+ def handle_update(self, completion):
+ r = completion.get_return_value()
+ log.debug(f'handle_update: r={r}')
+ if not r == 0:
+ self.finish(r)
+ elif self.update_mapping or self.removals:
+ self.send_update()
+ else:
+ self.finish(0)
+
+ def finish(self, r):
+ log.info(f'finish: r={r}')
+ self.on_finish_callback(r)
+
+class UpdateInstanceRequest:
+ def __init__(self, ioctx, instances_added, instances_removed, on_finish_callback):
+ self.ioctx = ioctx
+ self.instances_added = instances_added
+ # purge vs remove: purge list is for purging on-disk instance
+ # object. remove is for purging instance map.
+ self.instances_removed = instances_removed.copy()
+ self.instances_purge = instances_removed.copy()
+ self.on_finish_callback = on_finish_callback
+
+ @staticmethod
+ def omap_key(instance_id):
+ return f'{INSTANCE_ID_PREFIX}{instance_id}'
+
+ @staticmethod
+ def cephfs_mirror_object_name(instance_id):
+ assert instance_id != ''
+ return f'{MIRROR_OBJECT_PREFIX}.{instance_id}'
+
+ def send(self):
+ log.info('updating instances')
+ self.send_update()
+
+ def send_update(self):
+ self.remove_instance_object()
+
+ def remove_instance_object(self):
+ log.debug(f'pending purges: {len(self.instances_purge)}')
+ if not self.instances_purge:
+ self.update_instance_map()
+ return
+ instance_id = self.instances_purge.pop()
+ self.ioctx.aio_remove(
+ UpdateInstanceRequest.cephfs_mirror_object_name(instance_id), oncomplete=self.handle_remove)
+
+ def handle_remove(self, completion):
+ r = completion.get_return_value()
+ log.debug(f'handle_remove: r={r}')
+ # cephfs-mirror instances remove their respective instance
+ # objects upon termination. so we handle ENOENT here. note
+ # that when an instance is blocklisted, it wont be able to
+ # purge its instance object, so we do it on its behalf.
+ if not r == 0 and not r == -errno.ENOENT:
+ self.finish(r)
+ return
+ self.remove_instance_object()
+
+ def update_instance_map(self):
+ log.debug(f'pending updates: {len(self.instances_added)}+{len(self.instances_removed)}')
+ try:
+ with rados.WriteOpCtx() as write_op:
+ keys = []
+ vals = []
+ instance_ids = list(self.instances_added.keys())[0:MAX_UPDATE]
+ # gather updates
+ for instance_id in instance_ids:
+ data = self.instances_added.pop(instance_id)
+ keys.append(UpdateInstanceRequest.omap_key(instance_id))
+ vals.append(pickle.dumps(data))
+ self.ioctx.set_omap(write_op, tuple(keys), tuple(vals))
+ # gather deletes
+ slicept = MAX_UPDATE - len(instance_ids)
+ removals = [UpdateInstanceRequest.omap_key(instance_id) \
+ for instance_id in self.instances_removed[0:slicept]]
+ self.instances_removed = self.instances_removed[slicept:]
+ self.ioctx.remove_omap_keys(write_op, tuple(removals))
+ log.debug(f'applying {len(keys)} updates, {len(removals)} deletes')
+ self.ioctx.operate_aio_write_op(write_op, MIRROR_OBJECT_NAME, oncomplete=self.handle_update)
+ except rados.Error as e:
+ log.error(f'UpdateInstanceRequest.update_instance_map exception: {e}')
+ self.finish(-e.args[0])
+
+ def handle_update(self, completion):
+ r = completion.get_return_value()
+ log.debug(f'handle_update: r={r}')
+ if not r == 0:
+ self.finish(r)
+ elif self.instances_added or self.instances_removed:
+ self.update_instance_map()
+ else:
+ self.finish(0)
+
+ def finish(self, r):
+ log.info(f'finish: r={r}')
+ self.on_finish_callback(r)