summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/mirroring/fs
diff options
context:
space:
mode:
Diffstat (limited to 'src/pybind/mgr/mirroring/fs')
-rw-r--r--src/pybind/mgr/mirroring/fs/__init__.py0
-rw-r--r--src/pybind/mgr/mirroring/fs/blocklist.py10
-rw-r--r--src/pybind/mgr/mirroring/fs/dir_map/__init__.py0
-rw-r--r--src/pybind/mgr/mirroring/fs/dir_map/create.py23
-rw-r--r--src/pybind/mgr/mirroring/fs/dir_map/load.py74
-rw-r--r--src/pybind/mgr/mirroring/fs/dir_map/policy.py380
-rw-r--r--src/pybind/mgr/mirroring/fs/dir_map/state_transition.py94
-rw-r--r--src/pybind/mgr/mirroring/fs/dir_map/update.py151
-rw-r--r--src/pybind/mgr/mirroring/fs/exception.py3
-rw-r--r--src/pybind/mgr/mirroring/fs/notify.py121
-rw-r--r--src/pybind/mgr/mirroring/fs/snapshot_mirror.py792
-rw-r--r--src/pybind/mgr/mirroring/fs/utils.py152
12 files changed, 1800 insertions, 0 deletions
diff --git a/src/pybind/mgr/mirroring/fs/__init__.py b/src/pybind/mgr/mirroring/fs/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/src/pybind/mgr/mirroring/fs/__init__.py
diff --git a/src/pybind/mgr/mirroring/fs/blocklist.py b/src/pybind/mgr/mirroring/fs/blocklist.py
new file mode 100644
index 000000000..473b5f262
--- /dev/null
+++ b/src/pybind/mgr/mirroring/fs/blocklist.py
@@ -0,0 +1,10 @@
+import logging
+
+log = logging.getLogger(__name__)
+
+def blocklist(mgr, addr):
+ cmd = {'prefix': 'osd blocklist', 'blocklistop': 'add', 'addr': str(addr)}
+ r, outs, err = mgr.mon_command(cmd)
+ if r != 0:
+ log.error(f'blocklist error: {err}')
+ return r
diff --git a/src/pybind/mgr/mirroring/fs/dir_map/__init__.py b/src/pybind/mgr/mirroring/fs/dir_map/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/src/pybind/mgr/mirroring/fs/dir_map/__init__.py
diff --git a/src/pybind/mgr/mirroring/fs/dir_map/create.py b/src/pybind/mgr/mirroring/fs/dir_map/create.py
new file mode 100644
index 000000000..963dfe915
--- /dev/null
+++ b/src/pybind/mgr/mirroring/fs/dir_map/create.py
@@ -0,0 +1,23 @@
+import errno
+import logging
+
+import rados
+
+from ..exception import MirrorException
+from ..utils import MIRROR_OBJECT_NAME
+
+log = logging.getLogger(__name__)
+
+def create_mirror_object(rados_inst, pool_id):
+ log.info(f'creating mirror object: {MIRROR_OBJECT_NAME}')
+ try:
+ with rados_inst.open_ioctx2(pool_id) as ioctx:
+ with rados.WriteOpCtx() as write_op:
+ write_op.new(rados.LIBRADOS_CREATE_EXCLUSIVE)
+ ioctx.operate_write_op(write_op, MIRROR_OBJECT_NAME)
+ except rados.Error as e:
+ if e.errno == errno.EEXIST:
+ # be graceful
+ return -e.errno
+ log.error(f'failed to create mirror object: {e}')
+ raise Exception(-e.args[0])
diff --git a/src/pybind/mgr/mirroring/fs/dir_map/load.py b/src/pybind/mgr/mirroring/fs/dir_map/load.py
new file mode 100644
index 000000000..42468b4e8
--- /dev/null
+++ b/src/pybind/mgr/mirroring/fs/dir_map/load.py
@@ -0,0 +1,74 @@
+import errno
+import pickle
+import logging
+from typing import Dict
+
+import rados
+
+from ..exception import MirrorException
+from ..utils import MIRROR_OBJECT_NAME, DIRECTORY_MAP_PREFIX, \
+ INSTANCE_ID_PREFIX
+
+log = logging.getLogger(__name__)
+
+MAX_RETURN = 256
+
+def handle_dir_load(dir_mapping, dir_map):
+ for directory_str, encoded_map in dir_map.items():
+ dir_path = directory_str[len(DIRECTORY_MAP_PREFIX):]
+ decoded_map = pickle.loads(encoded_map)
+ log.debug(f'{dir_path} -> {decoded_map}')
+ dir_mapping[dir_path] = decoded_map
+
+def load_dir_map(ioctx):
+ dir_mapping = {} # type: Dict[str, Dict]
+ log.info('loading dir map...')
+ try:
+ with rados.ReadOpCtx() as read_op:
+ start = ""
+ while True:
+ iter, ret = ioctx.get_omap_vals(read_op, start, DIRECTORY_MAP_PREFIX, MAX_RETURN)
+ if not ret == 0:
+ log.error(f'failed to fetch dir mapping omap')
+ raise Exception(-errno.EINVAL)
+ ioctx.operate_read_op(read_op, MIRROR_OBJECT_NAME)
+ dir_map = dict(iter)
+ if not dir_map:
+ break
+ handle_dir_load(dir_mapping, dir_map)
+ start = dir_map.popitem()[0]
+ log.info("loaded {0} directory mapping(s) from disk".format(len(dir_mapping)))
+ return dir_mapping
+ except rados.Error as e:
+ log.error(f'exception when loading directory mapping: {e}')
+ raise Exception(-e.errno)
+
+def handle_instance_load(instance_mapping, instance_map):
+ for instance, e_data in instance_map.items():
+ instance_id = instance[len(INSTANCE_ID_PREFIX):]
+ d_data = pickle.loads(e_data)
+ log.debug(f'{instance_id} -> {d_data}')
+ instance_mapping[instance_id] = d_data
+
+def load_instances(ioctx):
+ instance_mapping = {} # type: Dict[str, Dict]
+ log.info('loading instances...')
+ try:
+ with rados.ReadOpCtx() as read_op:
+ start = ""
+ while True:
+ iter, ret = ioctx.get_omap_vals(read_op, start, INSTANCE_ID_PREFIX, MAX_RETURN)
+ if not ret == 0:
+ log.error(f'failed to fetch instance omap')
+ raise Exception(-errno.EINVAL)
+ ioctx.operate_read_op(read_op, MIRROR_OBJECT_NAME)
+ instance_map = dict(iter)
+ if not instance_map:
+ break
+ handle_instance_load(instance_mapping, instance_map)
+ start = instance_map.popitem()[0]
+ log.info("loaded {0} instance(s) from disk".format(len(instance_mapping)))
+ return instance_mapping
+ except rados.Error as e:
+ log.error(f'exception when loading instances: {e}')
+ raise Exception(-e.errno)
diff --git a/src/pybind/mgr/mirroring/fs/dir_map/policy.py b/src/pybind/mgr/mirroring/fs/dir_map/policy.py
new file mode 100644
index 000000000..aef90b55f
--- /dev/null
+++ b/src/pybind/mgr/mirroring/fs/dir_map/policy.py
@@ -0,0 +1,380 @@
+import os
+import errno
+import logging
+import time
+from threading import Lock
+from typing import Dict
+
+from .state_transition import ActionType, PolicyAction, Transition, \
+ State, StateTransition
+from ..exception import MirrorException
+
+log = logging.getLogger(__name__)
+
+class DirectoryState:
+ def __init__(self, instance_id=None, mapped_time=None):
+ self.instance_id = instance_id
+ self.mapped_time = mapped_time
+ self.state = State.UNASSOCIATED
+ self.stalled = False
+ self.transition = Transition(ActionType.NONE)
+ self.next_state = None
+ self.purging = False
+
+ def __str__(self):
+ return f'[instance_id={self.instance_id}, mapped_time={self.mapped_time},'\
+ f' state={self.state}, transition={self.transition}, next_state={self.next_state},'\
+ f' purging={self.purging}]'
+
+class Policy:
+ # number of seconds after which a directory can be reshuffled
+ # to other mirror daemon instances.
+ DIR_SHUFFLE_THROTTLE_INTERVAL = 300
+
+ def __init__(self):
+ self.dir_states = {}
+ self.instance_to_dir_map = {}
+ self.dead_instances = []
+ self.lock = Lock()
+
+ @staticmethod
+ def is_instance_action(action_type):
+ return action_type in (ActionType.ACQUIRE,
+ ActionType.RELEASE)
+
+ def is_dead_instance(self, instance_id):
+ return instance_id in self.dead_instances
+
+ def is_state_scheduled(self, dir_state, state):
+ return dir_state.state == state or dir_state.next_state == state
+
+ def is_shuffling(self, dir_path):
+ log.debug(f'is_shuffling: {dir_path}')
+ return self.is_state_scheduled(self.dir_states[dir_path], State.SHUFFLING)
+
+ def can_shuffle_dir(self, dir_path):
+ """Right now, shuffle directories only based on idleness. Later, we
+ probably want to avoid shuffling images that were recently shuffled.
+ """
+ log.debug(f'can_shuffle_dir: {dir_path}')
+ dir_state = self.dir_states[dir_path]
+ return StateTransition.is_idle(dir_state.state) and \
+ (time.time() - dir_state['mapped_time']) > Policy.DIR_SHUFFLE_THROTTLE_INTERVAL
+
+ def set_state(self, dir_state, state, ignore_current_state=False):
+ if not ignore_current_state and dir_state.state == state:
+ return False
+ elif StateTransition.is_idle(dir_state.state):
+ dir_state.state = state
+ dir_state.next_state = None
+ dir_state.transition = StateTransition.transit(
+ dir_state.state, dir_state.transition.action_type)
+ return True
+ dir_state.next_state = state
+ return False
+
+ def init(self, dir_mapping):
+ with self.lock:
+ for dir_path, dir_map in dir_mapping.items():
+ instance_id = dir_map['instance_id']
+ if instance_id:
+ if not instance_id in self.instance_to_dir_map:
+ self.instance_to_dir_map[instance_id] = []
+ self.instance_to_dir_map[instance_id].append(dir_path)
+ self.dir_states[dir_path] = DirectoryState(instance_id, dir_map['last_shuffled'])
+ dir_state = self.dir_states[dir_path]
+ state = State.INITIALIZING if instance_id else State.ASSOCIATING
+ purging = dir_map.get('purging', 0)
+ if purging:
+ dir_state.purging = True
+ state = State.DISASSOCIATING
+ if not instance_id:
+ dir_state.transition = StateTransition.transit(state,
+ dir_state.transition.action_type)
+ log.debug(f'starting state: {dir_path} {state}: {dir_state}')
+ self.set_state(dir_state, state)
+ log.debug(f'init dir_state: {dir_state}')
+
+ def lookup(self, dir_path):
+ log.debug(f'looking up {dir_path}')
+ with self.lock:
+ dir_state = self.dir_states.get(dir_path, None)
+ if dir_state:
+ return {'instance_id': dir_state.instance_id,
+ 'mapped_time': dir_state.mapped_time,
+ 'purging': dir_state.purging}
+ return None
+
+ def map(self, dir_path, dir_state):
+ log.debug(f'mapping {dir_path}')
+ min_instance_id = None
+ current_instance_id = dir_state.instance_id
+ if current_instance_id and not self.is_dead_instance(current_instance_id):
+ return True
+ if self.is_dead_instance(current_instance_id):
+ self.unmap(dir_path, dir_state)
+ for instance_id, dir_paths in self.instance_to_dir_map.items():
+ if self.is_dead_instance(instance_id):
+ continue
+ if not min_instance_id or len(dir_paths) < len(self.instance_to_dir_map[min_instance_id]):
+ min_instance_id = instance_id
+ if not min_instance_id:
+ log.debug(f'instance unavailable for {dir_path}')
+ return False
+ log.debug(f'dir_path {dir_path} maps to instance {min_instance_id}')
+ dir_state.instance_id = min_instance_id
+ dir_state.mapped_time = time.time()
+ self.instance_to_dir_map[min_instance_id].append(dir_path)
+ return True
+
+ def unmap(self, dir_path, dir_state):
+ instance_id = dir_state.instance_id
+ log.debug(f'unmapping {dir_path} from instance {instance_id}')
+ self.instance_to_dir_map[instance_id].remove(dir_path)
+ dir_state.instance_id = None
+ dir_state.mapped_time = None
+ if self.is_dead_instance(instance_id) and not self.instance_to_dir_map[instance_id]:
+ self.instance_to_dir_map.pop(instance_id)
+ self.dead_instances.remove(instance_id)
+
+ def shuffle(self, dirs_per_instance, include_stalled_dirs):
+ log.debug(f'directories per instance: {dirs_per_instance}')
+ shuffle_dirs = []
+ for instance_id, dir_paths in self.instance_to_dir_map.items():
+ cut_off = len(dir_paths) - dirs_per_instance
+ if cut_off > 0:
+ for dir_path in dir_paths:
+ if cut_off == 0:
+ break
+ if self.is_shuffling(dir_path):
+ cut_off -= 1
+ elif self.can_shuffle_dir(dir_path):
+ cut_off -= 1
+ shuffle_dirs.append(dir_path)
+ if include_stalled_dirs:
+ for dir_path, dir_state in self.dir_states.items():
+ if dir_state.stalled:
+ log.debug(f'{dir_path} is stalled: {dir_state} -- trigerring kick')
+ dir_state.stalled = False
+ shuffle_dirs.append(dir_path)
+ return shuffle_dirs
+
+ def execute_policy_action(self, dir_path, dir_state, policy_action):
+ log.debug(f'executing for directory {dir_path} policy_action {policy_action}')
+
+ done = True
+ if policy_action == PolicyAction.MAP:
+ done = self.map(dir_path, dir_state)
+ elif policy_action == PolicyAction.UNMAP:
+ self.unmap(dir_path, dir_state)
+ elif policy_action == PolicyAction.REMOVE:
+ if dir_state.state == State.UNASSOCIATED:
+ self.dir_states.pop(dir_path)
+ else:
+ raise Exception()
+ return done
+
+ def start_action(self, dir_path):
+ log.debug(f'start action: {dir_path}')
+ with self.lock:
+ dir_state = self.dir_states.get(dir_path, None)
+ if not dir_state:
+ raise Exception()
+ log.debug(f'dir_state: {dir_state}')
+ if dir_state.transition.start_policy_action:
+ stalled = not self.execute_policy_action(dir_path, dir_state,
+ dir_state.transition.start_policy_action)
+ if stalled:
+ next_action = ActionType.NONE
+ if dir_state.purging:
+ dir_state.next_state = None
+ dir_state.state = State.UNASSOCIATED
+ dir_state.transition = StateTransition.transit(State.DISASSOCIATING, ActionType.NONE)
+ self.set_state(dir_state, State.DISASSOCIATING)
+ next_action = dir_state.transition.action_type
+ else:
+ dir_state.stalled = True
+ log.debug(f'state machine stalled')
+ return next_action
+ return dir_state.transition.action_type
+
+ def finish_action(self, dir_path, r):
+ log.debug(f'finish action {dir_path} r={r}')
+ with self.lock:
+ dir_state = self.dir_states.get(dir_path, None)
+ if not dir_state:
+ raise Exception()
+ if r < 0 and (not Policy.is_instance_action(dir_state.transition.action_type) or
+ not dir_state.instance_id or
+ not dir_state.instance_id in self.dead_instances):
+ return True
+ log.debug(f'dir_state: {dir_state}')
+ finish_policy_action = dir_state.transition.finish_policy_action
+ dir_state.transition = StateTransition.transit(
+ dir_state.state, dir_state.transition.action_type)
+ log.debug(f'transitioned to dir_state: {dir_state}')
+ if dir_state.transition.final_state:
+ log.debug('reached final state')
+ dir_state.state = dir_state.transition.final_state
+ dir_state.transition = Transition(ActionType.NONE)
+ log.debug(f'final dir_state: {dir_state}')
+ if StateTransition.is_idle(dir_state.state) and dir_state.next_state:
+ self.set_state(dir_state, dir_state.next_state)
+ pending = not dir_state.transition.action_type == ActionType.NONE
+ if finish_policy_action:
+ self.execute_policy_action(dir_path, dir_state, finish_policy_action)
+ return pending
+
+ def find_tracked_ancestor_or_subtree(self, dir_path):
+ for tracked_path, _ in self.dir_states.items():
+ comp = [dir_path, tracked_path]
+ cpath = os.path.commonpath(comp)
+ if cpath in comp:
+ what = 'subtree' if cpath == tracked_path else 'ancestor'
+ return (tracked_path, what)
+ return None
+
+ def add_dir(self, dir_path):
+ log.debug(f'adding dir_path {dir_path}')
+ with self.lock:
+ if dir_path in self.dir_states:
+ return False
+ as_info = self.find_tracked_ancestor_or_subtree(dir_path)
+ if as_info:
+ raise MirrorException(-errno.EINVAL, f'{dir_path} is a {as_info[1]} of tracked path {as_info[0]}')
+ self.dir_states[dir_path] = DirectoryState()
+ dir_state = self.dir_states[dir_path]
+ log.debug(f'add dir_state: {dir_state}')
+ if dir_state.state == State.INITIALIZING:
+ return False
+ return self.set_state(dir_state, State.ASSOCIATING)
+
+ def remove_dir(self, dir_path):
+ log.debug(f'removing dir_path {dir_path}')
+ with self.lock:
+ dir_state = self.dir_states.get(dir_path, None)
+ if not dir_state:
+ return False
+ log.debug(f'removing dir_state: {dir_state}')
+ dir_state.purging = True
+ # advance the state machine with DISASSOCIATING state for removal
+ if dir_state.stalled:
+ dir_state.state = State.UNASSOCIATED
+ dir_state.transition = StateTransition.transit(State.DISASSOCIATING, ActionType.NONE)
+ r = self.set_state(dir_state, State.DISASSOCIATING)
+ log.debug(f'dir_state: {dir_state}')
+ return r
+
+ def add_instances_initial(self, instance_ids):
+ """Take care of figuring out instances which no longer exist
+ and remove them. This is to be done only once on startup to
+ identify instances which were previously removed but directories
+ are still mapped (on-disk) to them.
+ """
+ for instance_id in instance_ids:
+ if not instance_id in self.instance_to_dir_map:
+ self.instance_to_dir_map[instance_id] = []
+ dead_instances = []
+ for instance_id, _ in self.instance_to_dir_map.items():
+ if not instance_id in instance_ids:
+ dead_instances.append(instance_id)
+ if dead_instances:
+ self._remove_instances(dead_instances)
+
+ def add_instances(self, instance_ids, initial_update=False):
+ log.debug(f'adding instances: {instance_ids} initial_update {initial_update}')
+ with self.lock:
+ if initial_update:
+ self.add_instances_initial(instance_ids)
+ else:
+ nr_instances = len(self.instance_to_dir_map)
+ nr_dead_instances = len(self.dead_instances)
+ if nr_instances > 0:
+ # adjust dead instances
+ nr_instances -= nr_dead_instances
+ include_stalled_dirs = nr_instances == 0
+ for instance_id in instance_ids:
+ if not instance_id in self.instance_to_dir_map:
+ self.instance_to_dir_map[instance_id] = []
+ dirs_per_instance = int(len(self.dir_states) /
+ (len(self.instance_to_dir_map) - nr_dead_instances))
+ if dirs_per_instance == 0:
+ dirs_per_instance += 1
+ shuffle_dirs = []
+ # super set of directories which are candidates for shuffling -- choose
+ # those which can be shuffle rightaway (others will be shuffled when
+ # they reach idle state).
+ shuffle_dirs_ss = self.shuffle(dirs_per_instance, include_stalled_dirs)
+ if include_stalled_dirs:
+ return shuffle_dirs_ss
+ for dir_path in shuffle_dirs_ss:
+ dir_state = self.dir_states[dir_path]
+ if self.set_state(dir_state, State.SHUFFLING):
+ shuffle_dirs.append(dir_path)
+ log.debug(f'remapping directories: {shuffle_dirs}')
+ return shuffle_dirs
+
+ def remove_instances(self, instance_ids):
+ with self.lock:
+ return self._remove_instances(instance_ids)
+
+ def _remove_instances(self, instance_ids):
+ log.debug(f'removing instances: {instance_ids}')
+ shuffle_dirs = []
+ for instance_id in instance_ids:
+ if not instance_id in self.instance_to_dir_map:
+ continue
+ if not self.instance_to_dir_map[instance_id]:
+ self.instance_to_dir_map.pop(instance_id)
+ continue
+ self.dead_instances.append(instance_id)
+ dir_paths = self.instance_to_dir_map[instance_id]
+ log.debug(f'force shuffling instance_id {instance_id}, directories {dir_paths}')
+ for dir_path in dir_paths:
+ dir_state = self.dir_states[dir_path]
+ if self.is_state_scheduled(dir_state, State.DISASSOCIATING):
+ log.debug(f'dir_path {dir_path} is disassociating, ignoring...')
+ continue
+ log.debug(f'shuffling dir_path {dir_path}')
+ if self.set_state(dir_state, State.SHUFFLING, True):
+ shuffle_dirs.append(dir_path)
+ log.debug(f'shuffling {shuffle_dirs}')
+ return shuffle_dirs
+
+ def dir_status(self, dir_path):
+ with self.lock:
+ dir_state = self.dir_states.get(dir_path, None)
+ if not dir_state:
+ raise MirrorException(-errno.ENOENT, f'{dir_path} is not tracked')
+ res = {} # type: Dict
+ if dir_state.stalled:
+ res['state'] = 'stalled'
+ res['reason'] = 'no mirror daemons running'
+ elif dir_state.state == State.ASSOCIATING:
+ res['state'] = 'mapping'
+ else:
+ state = None
+ dstate = dir_state.state
+ if dstate == State.ASSOCIATING:
+ state = 'mapping'
+ elif dstate == State.DISASSOCIATING:
+ state = 'unmapping'
+ elif dstate == State.SHUFFLING:
+ state = 'shuffling'
+ elif dstate == State.ASSOCIATED:
+ state = 'mapped'
+ elif dstate == State.INITIALIZING:
+ state = 'resolving'
+ res['state'] = state
+ res['instance_id'] = dir_state.instance_id
+ res['last_shuffled'] = dir_state.mapped_time
+ return res
+
+ def instance_summary(self):
+ with self.lock:
+ res = {
+ 'mapping': {}
+ } # type: Dict
+ for instance_id, dir_paths in self.instance_to_dir_map.items():
+ res['mapping'][instance_id] = f'{len(dir_paths)} directories'
+ return res
diff --git a/src/pybind/mgr/mirroring/fs/dir_map/state_transition.py b/src/pybind/mgr/mirroring/fs/dir_map/state_transition.py
new file mode 100644
index 000000000..ef59a6a87
--- /dev/null
+++ b/src/pybind/mgr/mirroring/fs/dir_map/state_transition.py
@@ -0,0 +1,94 @@
+import logging
+from enum import Enum, unique
+from typing import Dict
+
+log = logging.getLogger(__name__)
+
+@unique
+class State(Enum):
+ UNASSOCIATED = 0
+ INITIALIZING = 1
+ ASSOCIATING = 2
+ ASSOCIATED = 3
+ SHUFFLING = 4
+ DISASSOCIATING = 5
+
+@unique
+class ActionType(Enum):
+ NONE = 0
+ MAP_UPDATE = 1
+ MAP_REMOVE = 2
+ ACQUIRE = 3
+ RELEASE = 4
+
+@unique
+class PolicyAction(Enum):
+ MAP = 0
+ UNMAP = 1
+ REMOVE = 2
+
+class TransitionKey:
+ def __init__(self, state, action_type):
+ self.transition_key = [state, action_type]
+
+ def __hash__(self):
+ return hash(tuple(self.transition_key))
+
+ def __eq__(self, other):
+ return self.transition_key == other.transition_key
+
+ def __neq__(self, other):
+ return not(self == other)
+
+class Transition:
+ def __init__(self, action_type, start_policy_action=None,
+ finish_policy_action=None, final_state=None):
+ self.action_type = action_type
+ self.start_policy_action = start_policy_action
+ self.finish_policy_action = finish_policy_action
+ self.final_state = final_state
+
+ def __str__(self):
+ return "[action_type={0}, start_policy_action={1}, finish_policy_action={2}, final_state={3}".format(
+ self.action_type, self.start_policy_action, self.finish_policy_action, self.final_state)
+
+class StateTransition:
+ transition_table = {} # type: Dict[TransitionKey, Transition]
+
+ @staticmethod
+ def transit(state, action_type):
+ try:
+ return StateTransition.transition_table[TransitionKey(state, action_type)]
+ except KeyError:
+ raise Exception()
+
+ @staticmethod
+ def is_idle(state):
+ return state in (State.UNASSOCIATED, State.ASSOCIATED)
+
+StateTransition.transition_table = {
+ TransitionKey(State.INITIALIZING, ActionType.NONE) : Transition(ActionType.ACQUIRE),
+ TransitionKey(State.INITIALIZING, ActionType.ACQUIRE) : Transition(ActionType.NONE,
+ final_state=State.ASSOCIATED),
+
+ TransitionKey(State.ASSOCIATING, ActionType.NONE) : Transition(ActionType.MAP_UPDATE,
+ start_policy_action=PolicyAction.MAP),
+ TransitionKey(State.ASSOCIATING, ActionType.MAP_UPDATE) : Transition(ActionType.ACQUIRE),
+ TransitionKey(State.ASSOCIATING, ActionType.ACQUIRE) : Transition(ActionType.NONE,
+ final_state=State.ASSOCIATED),
+
+ TransitionKey(State.DISASSOCIATING, ActionType.NONE) : Transition(ActionType.RELEASE,
+ finish_policy_action=PolicyAction.UNMAP),
+ TransitionKey(State.DISASSOCIATING, ActionType.RELEASE) : Transition(ActionType.MAP_REMOVE,
+ finish_policy_action=PolicyAction.REMOVE),
+ TransitionKey(State.DISASSOCIATING, ActionType.MAP_REMOVE) : Transition(ActionType.NONE,
+ final_state=State.UNASSOCIATED),
+
+ TransitionKey(State.SHUFFLING, ActionType.NONE) : Transition(ActionType.RELEASE,
+ finish_policy_action=PolicyAction.UNMAP),
+ TransitionKey(State.SHUFFLING, ActionType.RELEASE) : Transition(ActionType.MAP_UPDATE,
+ start_policy_action=PolicyAction.MAP),
+ TransitionKey(State.SHUFFLING, ActionType.MAP_UPDATE) : Transition(ActionType.ACQUIRE),
+ TransitionKey(State.SHUFFLING, ActionType.ACQUIRE) : Transition(ActionType.NONE,
+ final_state=State.ASSOCIATED),
+ }
diff --git a/src/pybind/mgr/mirroring/fs/dir_map/update.py b/src/pybind/mgr/mirroring/fs/dir_map/update.py
new file mode 100644
index 000000000..a70baa01a
--- /dev/null
+++ b/src/pybind/mgr/mirroring/fs/dir_map/update.py
@@ -0,0 +1,151 @@
+import errno
+import pickle
+import logging
+
+import rados
+
+from ..utils import MIRROR_OBJECT_NAME, DIRECTORY_MAP_PREFIX, \
+ INSTANCE_ID_PREFIX, MIRROR_OBJECT_PREFIX
+
+log = logging.getLogger(__name__)
+
+MAX_UPDATE = 256
+
+class UpdateDirMapRequest:
+ def __init__(self, ioctx, update_mapping, removals, on_finish_callback):
+ self.ioctx = ioctx
+ self.update_mapping = update_mapping
+ self.removals = removals
+ self.on_finish_callback = on_finish_callback
+
+ @staticmethod
+ def omap_key(dir_path):
+ return f'{DIRECTORY_MAP_PREFIX}{dir_path}'
+
+ def send(self):
+ log.info('updating image map')
+ self.send_update()
+
+ def send_update(self):
+ log.debug(f'pending updates: {len(self.update_mapping)}+{len(self.removals)}')
+ try:
+ with rados.WriteOpCtx() as write_op:
+ keys = []
+ vals = []
+ dir_keys = list(self.update_mapping.keys())[0:MAX_UPDATE]
+ # gather updates
+ for dir_path in dir_keys:
+ mapping = self.update_mapping.pop(dir_path)
+ keys.append(UpdateDirMapRequest.omap_key(dir_path))
+ vals.append(pickle.dumps(mapping))
+ self.ioctx.set_omap(write_op, tuple(keys), tuple(vals))
+ # gather deletes
+ slicept = MAX_UPDATE - len(dir_keys)
+ removals = [UpdateDirMapRequest.omap_key(dir_path) for dir_path in self.removals[0:slicept]]
+ self.removals = self.removals[slicept:]
+ self.ioctx.remove_omap_keys(write_op, tuple(removals))
+ log.debug(f'applying {len(keys)} updates, {len(removals)} deletes')
+ self.ioctx.operate_aio_write_op(write_op, MIRROR_OBJECT_NAME, oncomplete=self.handle_update)
+ except rados.Error as e:
+ log.error(f'UpdateDirMapRequest.send_update exception: {e}')
+ self.finish(-e.args[0])
+
+ def handle_update(self, completion):
+ r = completion.get_return_value()
+ log.debug(f'handle_update: r={r}')
+ if not r == 0:
+ self.finish(r)
+ elif self.update_mapping or self.removals:
+ self.send_update()
+ else:
+ self.finish(0)
+
+ def finish(self, r):
+ log.info(f'finish: r={r}')
+ self.on_finish_callback(r)
+
+class UpdateInstanceRequest:
+ def __init__(self, ioctx, instances_added, instances_removed, on_finish_callback):
+ self.ioctx = ioctx
+ self.instances_added = instances_added
+ # purge vs remove: purge list is for purging on-disk instance
+ # object. remove is for purging instance map.
+ self.instances_removed = instances_removed.copy()
+ self.instances_purge = instances_removed.copy()
+ self.on_finish_callback = on_finish_callback
+
+ @staticmethod
+ def omap_key(instance_id):
+ return f'{INSTANCE_ID_PREFIX}{instance_id}'
+
+ @staticmethod
+ def cephfs_mirror_object_name(instance_id):
+ assert instance_id != ''
+ return f'{MIRROR_OBJECT_PREFIX}.{instance_id}'
+
+ def send(self):
+ log.info('updating instances')
+ self.send_update()
+
+ def send_update(self):
+ self.remove_instance_object()
+
+ def remove_instance_object(self):
+ log.debug(f'pending purges: {len(self.instances_purge)}')
+ if not self.instances_purge:
+ self.update_instance_map()
+ return
+ instance_id = self.instances_purge.pop()
+ self.ioctx.aio_remove(
+ UpdateInstanceRequest.cephfs_mirror_object_name(instance_id), oncomplete=self.handle_remove)
+
+ def handle_remove(self, completion):
+ r = completion.get_return_value()
+ log.debug(f'handle_remove: r={r}')
+ # cephfs-mirror instances remove their respective instance
+ # objects upon termination. so we handle ENOENT here. note
+ # that when an instance is blocklisted, it wont be able to
+ # purge its instance object, so we do it on its behalf.
+ if not r == 0 and not r == -errno.ENOENT:
+ self.finish(r)
+ return
+ self.remove_instance_object()
+
+ def update_instance_map(self):
+ log.debug(f'pending updates: {len(self.instances_added)}+{len(self.instances_removed)}')
+ try:
+ with rados.WriteOpCtx() as write_op:
+ keys = []
+ vals = []
+ instance_ids = list(self.instances_added.keys())[0:MAX_UPDATE]
+ # gather updates
+ for instance_id in instance_ids:
+ data = self.instances_added.pop(instance_id)
+ keys.append(UpdateInstanceRequest.omap_key(instance_id))
+ vals.append(pickle.dumps(data))
+ self.ioctx.set_omap(write_op, tuple(keys), tuple(vals))
+ # gather deletes
+ slicept = MAX_UPDATE - len(instance_ids)
+ removals = [UpdateInstanceRequest.omap_key(instance_id) \
+ for instance_id in self.instances_removed[0:slicept]]
+ self.instances_removed = self.instances_removed[slicept:]
+ self.ioctx.remove_omap_keys(write_op, tuple(removals))
+ log.debug(f'applying {len(keys)} updates, {len(removals)} deletes')
+ self.ioctx.operate_aio_write_op(write_op, MIRROR_OBJECT_NAME, oncomplete=self.handle_update)
+ except rados.Error as e:
+ log.error(f'UpdateInstanceRequest.update_instance_map exception: {e}')
+ self.finish(-e.args[0])
+
+ def handle_update(self, completion):
+ r = completion.get_return_value()
+ log.debug(f'handle_update: r={r}')
+ if not r == 0:
+ self.finish(r)
+ elif self.instances_added or self.instances_removed:
+ self.update_instance_map()
+ else:
+ self.finish(0)
+
+ def finish(self, r):
+ log.info(f'finish: r={r}')
+ self.on_finish_callback(r)
diff --git a/src/pybind/mgr/mirroring/fs/exception.py b/src/pybind/mgr/mirroring/fs/exception.py
new file mode 100644
index 000000000..d041b276c
--- /dev/null
+++ b/src/pybind/mgr/mirroring/fs/exception.py
@@ -0,0 +1,3 @@
+class MirrorException(Exception):
+ def __init__(self, error_code, error_msg=''):
+ super().__init__(error_code, error_msg)
diff --git a/src/pybind/mgr/mirroring/fs/notify.py b/src/pybind/mgr/mirroring/fs/notify.py
new file mode 100644
index 000000000..992cba297
--- /dev/null
+++ b/src/pybind/mgr/mirroring/fs/notify.py
@@ -0,0 +1,121 @@
+import errno
+import json
+import logging
+import threading
+import time
+
+import rados
+
+from .utils import MIRROR_OBJECT_PREFIX, AsyncOpTracker
+
+log = logging.getLogger(__name__)
+
+class Notifier:
+ def __init__(self, ioctx):
+ self.ioctx = ioctx
+
+ @staticmethod
+ def instance_object(instance_id):
+ return f'{MIRROR_OBJECT_PREFIX}.{instance_id}'
+
+ def notify_cbk(self, dir_path, callback):
+ def cbk(_, r, acks, timeouts):
+ log.debug(f'Notifier.notify_cbk: ret {r} acks: {acks} timeouts: {timeouts}')
+ callback(dir_path, r)
+ return cbk
+
+ def notify(self, dir_path, message, callback):
+ try:
+ instance_id = message[0]
+ message = message[1]
+ log.debug(f'Notifier.notify: {instance_id} {message} for {dir_path}')
+ self.ioctx.aio_notify(
+ Notifier.instance_object(
+ instance_id), self.notify_cbk(dir_path, callback), msg=message)
+ except rados.Error as e:
+ log.error(f'Notifier exception: {e}')
+ raise e
+
+class InstanceWatcher:
+ INSTANCE_TIMEOUT = 30
+ NOTIFY_INTERVAL = 1
+
+ class Listener:
+ def handle_instances(self, added, removed):
+ raise NotImplementedError()
+
+ def __init__(self, ioctx, instances, listener):
+ self.ioctx = ioctx
+ self.listener = listener
+ self.instances = {}
+ for instance_id, data in instances.items():
+ self.instances[instance_id] = {'addr': data['addr'],
+ 'seen': time.time()}
+ self.lock = threading.Lock()
+ self.cond = threading.Condition(self.lock)
+ self.done = threading.Event()
+ self.waiting = threading.Event()
+ self.notify_task = None
+ self.schedule_notify_task()
+
+ def schedule_notify_task(self):
+ assert self.notify_task == None
+ self.notify_task = threading.Timer(InstanceWatcher.NOTIFY_INTERVAL, self.notify)
+ self.notify_task.start()
+
+ def wait_and_stop(self):
+ with self.lock:
+ log.info('InstanceWatcher.wait_and_stop')
+ self.waiting.set()
+ self.cond.wait_for(lambda: self.done.is_set())
+ log.info('waiting done')
+ assert self.notify_task == None
+
+ def handle_notify(self, _, r, acks, timeouts):
+ log.debug(f'InstanceWatcher.handle_notify r={r} acks={acks} timeouts={timeouts}')
+ with self.lock:
+ try:
+ added = {}
+ removed = {}
+ if acks is None:
+ acks = []
+ ackd_instances = []
+ for ack in acks:
+ instance_id = str(ack[0])
+ ackd_instances.append(instance_id)
+ # sender data is quoted
+ notifier_data = json.loads(ack[2].decode('utf-8'))
+ log.debug(f'InstanceWatcher.handle_notify: {instance_id}: {notifier_data}')
+ if not instance_id in self.instances:
+ self.instances[instance_id] = {}
+ added[instance_id] = notifier_data['addr']
+ self.instances[instance_id]['addr'] = notifier_data['addr']
+ self.instances[instance_id]['seen'] = time.time()
+ # gather non responders
+ now = time.time()
+ for instance_id in list(self.instances.keys()):
+ data = self.instances[instance_id]
+ if (now - data['seen'] > InstanceWatcher.INSTANCE_TIMEOUT) or \
+ (self.waiting.is_set() and instance_id not in ackd_instances):
+ removed[instance_id] = data['addr']
+ self.instances.pop(instance_id)
+ if added or removed:
+ self.listener.handle_instances(added, removed)
+ except Exception as e:
+ log.warn(f'InstanceWatcher.handle_notify exception: {e}')
+ finally:
+ if not self.instances and self.waiting.is_set():
+ self.done.set()
+ self.cond.notifyAll()
+ else:
+ self.schedule_notify_task()
+
+ def notify(self):
+ with self.lock:
+ self.notify_task = None
+ try:
+ log.debug('InstanceWatcher.notify')
+ self.ioctx.aio_notify(MIRROR_OBJECT_PREFIX, self.handle_notify)
+ except rados.Error as e:
+ log.warn(f'InstanceWatcher exception: {e}')
+ self.schedule_notify_task()
diff --git a/src/pybind/mgr/mirroring/fs/snapshot_mirror.py b/src/pybind/mgr/mirroring/fs/snapshot_mirror.py
new file mode 100644
index 000000000..6fa8d0c4c
--- /dev/null
+++ b/src/pybind/mgr/mirroring/fs/snapshot_mirror.py
@@ -0,0 +1,792 @@
+import base64
+import errno
+import json
+import logging
+import os
+import pickle
+import re
+import stat
+import threading
+import uuid
+from typing import Dict, Any
+
+import cephfs
+import rados
+
+from mgr_util import RTimer, CephfsClient, open_filesystem,\
+ CephfsConnectionException
+from mgr_module import NotifyType
+from .blocklist import blocklist
+from .notify import Notifier, InstanceWatcher
+from .utils import INSTANCE_ID_PREFIX, MIRROR_OBJECT_NAME, Finisher, \
+ AsyncOpTracker, connect_to_filesystem, disconnect_from_filesystem
+from .exception import MirrorException
+from .dir_map.create import create_mirror_object
+from .dir_map.load import load_dir_map, load_instances
+from .dir_map.update import UpdateDirMapRequest, UpdateInstanceRequest
+from .dir_map.policy import Policy
+from .dir_map.state_transition import ActionType
+
+log = logging.getLogger(__name__)
+
+CEPHFS_IMAGE_POLICY_UPDATE_THROTTLE_INTERVAL = 1
+
+class FSPolicy:
+ class InstanceListener(InstanceWatcher.Listener):
+ def __init__(self, fspolicy):
+ self.fspolicy = fspolicy
+
+ def handle_instances(self, added, removed):
+ self.fspolicy.update_instances(added, removed)
+
+ def __init__(self, mgr, ioctx):
+ self.mgr = mgr
+ self.ioctx = ioctx
+ self.pending = []
+ self.policy = Policy()
+ self.lock = threading.Lock()
+ self.cond = threading.Condition(self.lock)
+ self.dir_paths = []
+ self.async_requests = {}
+ self.finisher = Finisher()
+ self.op_tracker = AsyncOpTracker()
+ self.notifier = Notifier(ioctx)
+ self.instance_listener = FSPolicy.InstanceListener(self)
+ self.instance_watcher = None
+ self.stopping = threading.Event()
+ self.timer_task = RTimer(CEPHFS_IMAGE_POLICY_UPDATE_THROTTLE_INTERVAL,
+ self.process_updates)
+ self.timer_task.start()
+
+ def schedule_action(self, dir_paths):
+ self.dir_paths.extend(dir_paths)
+
+ def init(self, dir_mapping, instances):
+ with self.lock:
+ self.policy.init(dir_mapping)
+ # we'll schedule action for all directories, so don't bother capturing
+ # directory names here.
+ self.policy.add_instances(list(instances.keys()), initial_update=True)
+ self.instance_watcher = InstanceWatcher(self.ioctx, instances,
+ self.instance_listener)
+ self.schedule_action(list(dir_mapping.keys()))
+
+ def shutdown(self):
+ with self.lock:
+ log.debug('FSPolicy.shutdown')
+ self.stopping.set()
+ log.debug('canceling update timer task')
+ self.timer_task.cancel()
+ log.debug('update timer task canceled')
+ if self.instance_watcher:
+ log.debug('stopping instance watcher')
+ self.instance_watcher.wait_and_stop()
+ log.debug('stopping instance watcher')
+ self.op_tracker.wait_for_ops()
+ log.debug('FSPolicy.shutdown done')
+
+ def handle_update_mapping(self, updates, removals, request_id, callback, r):
+ log.info(f'handle_update_mapping: {updates} {removals} {request_id} {callback} {r}')
+ with self.lock:
+ try:
+ self.async_requests.pop(request_id)
+ if callback:
+ callback(updates, removals, r)
+ finally:
+ self.op_tracker.finish_async_op()
+
+ def handle_update_instances(self, instances_added, instances_removed, request_id, r):
+ log.info(f'handle_update_instances: {instances_added} {instances_removed} {request_id} {r}')
+ with self.lock:
+ try:
+ self.async_requests.pop(request_id)
+ if self.stopping.is_set():
+ log.debug(f'handle_update_instances: policy shutting down')
+ return
+ schedules = []
+ if instances_removed:
+ schedules.extend(self.policy.remove_instances(instances_removed))
+ if instances_added:
+ schedules.extend(self.policy.add_instances(instances_added))
+ self.schedule_action(schedules)
+ finally:
+ self.op_tracker.finish_async_op()
+
+ def update_mapping(self, update_map, removals, callback=None):
+ log.info(f'updating directory map: {len(update_map)}+{len(removals)} updates')
+ request_id = str(uuid.uuid4())
+ def async_callback(r):
+ self.finisher.queue(self.handle_update_mapping,
+ [list(update_map.keys()), removals, request_id, callback, r])
+ request = UpdateDirMapRequest(self.ioctx, update_map.copy(), removals.copy(), async_callback)
+ self.async_requests[request_id] = request
+ self.op_tracker.start_async_op()
+ log.debug(f'async request_id: {request_id}')
+ request.send()
+
+ def update_instances(self, added, removed):
+ logging.debug(f'update_instances: added={added}, removed={removed}')
+ for instance_id, addr in removed.items():
+ log.info(f'blocklisting instance_id: {instance_id} addr: {addr}')
+ blocklist(self.mgr, addr)
+ with self.lock:
+ instances_added = {}
+ instances_removed = []
+ for instance_id, addr in added.items():
+ instances_added[instance_id] = {'version': 1, 'addr': addr}
+ instances_removed = list(removed.keys())
+ request_id = str(uuid.uuid4())
+ def async_callback(r):
+ self.finisher.queue(self.handle_update_instances,
+ [list(instances_added.keys()), instances_removed, request_id, r])
+ # blacklisted instances can be removed at this point. remapping directories
+ # mapped to blacklisted instances on module startup is handled in policy
+ # add_instances().
+ request = UpdateInstanceRequest(self.ioctx, instances_added.copy(),
+ instances_removed.copy(), async_callback)
+ self.async_requests[request_id] = request
+ log.debug(f'async request_id: {request_id}')
+ self.op_tracker.start_async_op()
+ request.send()
+
+ def continue_action(self, updates, removals, r):
+ log.debug(f'continuing action: {updates}+{removals} r={r}')
+ if self.stopping.is_set():
+ log.debug('continue_action: policy shutting down')
+ return
+ schedules = []
+ for dir_path in updates:
+ schedule = self.policy.finish_action(dir_path, r)
+ if schedule:
+ schedules.append(dir_path)
+ for dir_path in removals:
+ schedule = self.policy.finish_action(dir_path, r)
+ if schedule:
+ schedules.append(dir_path)
+ self.schedule_action(schedules)
+
+ def handle_peer_ack(self, dir_path, r):
+ log.info(f'handle_peer_ack: {dir_path} r={r}')
+ with self.lock:
+ try:
+ if self.stopping.is_set():
+ log.debug(f'handle_peer_ack: policy shutting down')
+ return
+ self.continue_action([dir_path], [], r)
+ finally:
+ self.op_tracker.finish_async_op()
+
+ def process_updates(self):
+ def acquire_message(dir_path):
+ return json.dumps({'dir_path': dir_path,
+ 'mode': 'acquire'
+ })
+ def release_message(dir_path):
+ return json.dumps({'dir_path': dir_path,
+ 'mode': 'release'
+ })
+ with self.lock:
+ if not self.dir_paths or self.stopping.is_set():
+ return
+ update_map = {}
+ removals = []
+ notifies = {}
+ instance_purges = []
+ for dir_path in self.dir_paths:
+ action_type = self.policy.start_action(dir_path)
+ lookup_info = self.policy.lookup(dir_path)
+ log.debug(f'processing action: dir_path: {dir_path}, lookup_info: {lookup_info}, action_type: {action_type}')
+ if action_type == ActionType.NONE:
+ continue
+ elif action_type == ActionType.MAP_UPDATE:
+ # take care to not overwrite purge status
+ update_map[dir_path] = {'version': 1,
+ 'instance_id': lookup_info['instance_id'],
+ 'last_shuffled': lookup_info['mapped_time']
+ }
+ if lookup_info['purging']:
+ update_map[dir_path]['purging'] = 1
+ elif action_type == ActionType.MAP_REMOVE:
+ removals.append(dir_path)
+ elif action_type == ActionType.ACQUIRE:
+ notifies[dir_path] = (lookup_info['instance_id'], acquire_message(dir_path))
+ elif action_type == ActionType.RELEASE:
+ notifies[dir_path] = (lookup_info['instance_id'], release_message(dir_path))
+ if update_map or removals:
+ self.update_mapping(update_map, removals, callback=self.continue_action)
+ for dir_path, message in notifies.items():
+ self.op_tracker.start_async_op()
+ self.notifier.notify(dir_path, message, self.handle_peer_ack)
+ self.dir_paths.clear()
+
+ def add_dir(self, dir_path):
+ with self.lock:
+ lookup_info = self.policy.lookup(dir_path)
+ if lookup_info:
+ if lookup_info['purging']:
+ raise MirrorException(-errno.EAGAIN, f'remove in-progress for {dir_path}')
+ else:
+ raise MirrorException(-errno.EEXIST, f'directory {dir_path} is already tracked')
+ schedule = self.policy.add_dir(dir_path)
+ if not schedule:
+ return
+ update_map = {dir_path: {'version': 1, 'instance_id': '', 'last_shuffled': 0.0}}
+ updated = False
+ def update_safe(updates, removals, r):
+ nonlocal updated
+ updated = True
+ self.cond.notifyAll()
+ self.update_mapping(update_map, [], callback=update_safe)
+ self.cond.wait_for(lambda: updated)
+ self.schedule_action([dir_path])
+
+ def remove_dir(self, dir_path):
+ with self.lock:
+ lookup_info = self.policy.lookup(dir_path)
+ if not lookup_info:
+ raise MirrorException(-errno.ENOENT, f'directory {dir_path} id not tracked')
+ if lookup_info['purging']:
+ raise MirrorException(-errno.EINVAL, f'directory {dir_path} is under removal')
+ update_map = {dir_path: {'version': 1,
+ 'instance_id': lookup_info['instance_id'],
+ 'last_shuffled': lookup_info['mapped_time'],
+ 'purging': 1}}
+ updated = False
+ sync_lock = threading.Lock()
+ sync_cond = threading.Condition(sync_lock)
+ def update_safe(r):
+ with sync_lock:
+ nonlocal updated
+ updated = True
+ sync_cond.notifyAll()
+ request = UpdateDirMapRequest(self.ioctx, update_map.copy(), [], update_safe)
+ request.send()
+ with sync_lock:
+ sync_cond.wait_for(lambda: updated)
+ schedule = self.policy.remove_dir(dir_path)
+ if schedule:
+ self.schedule_action([dir_path])
+
+ def status(self, dir_path):
+ with self.lock:
+ res = self.policy.dir_status(dir_path)
+ return 0, json.dumps(res, indent=4, sort_keys=True), ''
+
+ def summary(self):
+ with self.lock:
+ res = self.policy.instance_summary()
+ return 0, json.dumps(res, indent=4, sort_keys=True), ''
+
+class FSSnapshotMirror:
+ PEER_CONFIG_KEY_PREFIX = "cephfs/mirror/peer"
+
+ def __init__(self, mgr):
+ self.mgr = mgr
+ self.rados = mgr.rados
+ self.pool_policy = {}
+ self.fs_map = self.mgr.get('fs_map')
+ self.lock = threading.Lock()
+ self.refresh_pool_policy()
+ self.local_fs = CephfsClient(mgr)
+
+ def notify(self, notify_type: NotifyType):
+ log.debug(f'got notify type {notify_type}')
+ if notify_type == NotifyType.fs_map:
+ with self.lock:
+ self.fs_map = self.mgr.get('fs_map')
+ self.refresh_pool_policy_locked()
+
+ @staticmethod
+ def make_spec(client_name, cluster_name):
+ return f'{client_name}@{cluster_name}'
+
+ @staticmethod
+ def split_spec(spec):
+ try:
+ client_id, cluster_name = spec.split('@')
+ _, client_name = client_id.split('.')
+ return client_name, cluster_name
+ except ValueError:
+ raise MirrorException(-errno.EINVAL, f'invalid cluster spec {spec}')
+
+ @staticmethod
+ def get_metadata_pool(filesystem, fs_map):
+ for fs in fs_map['filesystems']:
+ if fs['mdsmap']['fs_name'] == filesystem:
+ return fs['mdsmap']['metadata_pool']
+ return None
+
+ @staticmethod
+ def get_filesystem_id(filesystem, fs_map):
+ for fs in fs_map['filesystems']:
+ if fs['mdsmap']['fs_name'] == filesystem:
+ return fs['id']
+ return None
+
+ @staticmethod
+ def peer_config_key(filesystem, peer_uuid):
+ return f'{FSSnapshotMirror.PEER_CONFIG_KEY_PREFIX}/{filesystem}/{peer_uuid}'
+
+ def config_set(self, key, val=None):
+ """set or remove a key from mon config store"""
+ if val:
+ cmd = {'prefix': 'config-key set',
+ 'key': key, 'val': val}
+ else:
+ cmd = {'prefix': 'config-key rm',
+ 'key': key}
+ r, outs, err = self.mgr.mon_command(cmd)
+ if r < 0:
+ log.error(f'mon command to set/remove config-key {key} failed: {err}')
+ raise Exception(-errno.EINVAL)
+
+ def config_get(self, key):
+ """fetch a config key value from mon config store"""
+ cmd = {'prefix': 'config-key get', 'key': key}
+ r, outs, err = self.mgr.mon_command(cmd)
+ if r < 0 and not r == -errno.ENOENT:
+ log.error(f'mon command to get config-key {key} failed: {err}')
+ raise Exception(-errno.EINVAL)
+ val = {}
+ if r == 0:
+ val = json.loads(outs)
+ return val
+
+ def filesystem_exist(self, filesystem):
+ for fs in self.fs_map['filesystems']:
+ if fs['mdsmap']['fs_name'] == filesystem:
+ return True
+ return False
+
+ def get_mirrored_filesystems(self):
+ return [fs['mdsmap']['fs_name'] for fs in self.fs_map['filesystems'] if fs.get('mirror_info', None)]
+
+ def get_filesystem_peers(self, filesystem):
+ """To be used when mirroring in enabled for the filesystem"""
+ for fs in self.fs_map['filesystems']:
+ if fs['mdsmap']['fs_name'] == filesystem:
+ return fs['mirror_info']['peers']
+ return None
+
+ def peer_exists(self, filesystem, remote_cluster_spec, remote_fs_name):
+ peers = self.get_filesystem_peers(filesystem)
+ for _, rem in peers.items():
+ remote = rem['remote']
+ spec = FSSnapshotMirror.make_spec(remote['client_name'], remote['cluster_name'])
+ if spec == remote_cluster_spec and remote['fs_name'] == remote_fs_name:
+ return True
+ return False
+
+ @staticmethod
+ def get_mirror_info(fs):
+ try:
+ val = fs.getxattr('/', 'ceph.mirror.info')
+ match = re.search(r'^cluster_id=([a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}) fs_id=(\d+)$',
+ val.decode('utf-8'))
+ if match and len(match.groups()) == 2:
+ return {'cluster_id': match.group(1),
+ 'fs_id': int(match.group(2))
+ }
+ raise MirrorException(-errno.EINVAL, 'invalid ceph.mirror.info value format')
+ except cephfs.Error as e:
+ raise MirrorException(-e.errno, 'error fetching ceph.mirror.info xattr')
+
+ @staticmethod
+ def set_mirror_info(local_cluster_id, local_fsid, remote_fs):
+ log.info(f'setting {local_cluster_id}::{local_fsid} on remote')
+ try:
+ remote_fs.setxattr('/', 'ceph.mirror.info',
+ f'cluster_id={local_cluster_id} fs_id={local_fsid}'.encode('utf-8'), os.XATTR_CREATE)
+ except cephfs.Error as e:
+ if e.errno == errno.EEXIST:
+ try:
+ mi = FSSnapshotMirror.get_mirror_info(remote_fs)
+ cluster_id = mi['cluster_id']
+ fs_id = mi['fs_id']
+ if not (cluster_id == local_cluster_id and fs_id == local_fsid):
+ raise MirrorException(-errno.EEXIST, f'peer mirrorred by: (cluster_id: {cluster_id}, fs_id: {fs_id})')
+ except MirrorException:
+ # if mirror info cannot be fetched for some reason, let's just
+ # fail.
+ raise MirrorException(-errno.EEXIST, f'already an active peer')
+ else:
+ log.error(f'error setting mirrored fsid: {e}')
+ raise Exception(-e.errno)
+
+ def resolve_peer(self, fs_name, peer_uuid):
+ peers = self.get_filesystem_peers(fs_name)
+ for peer, rem in peers.items():
+ if peer == peer_uuid:
+ return rem['remote']
+ return None
+
+ def purge_mirror_info(self, local_fs_name, peer_uuid):
+ log.debug(f'local fs={local_fs_name} peer_uuid={peer_uuid}')
+ # resolve the peer to its spec
+ rem = self.resolve_peer(local_fs_name, peer_uuid)
+ if not rem:
+ return
+ log.debug(f'peer_uuid={peer_uuid} resolved to {rem}')
+ _, client_name = rem['client_name'].split('.')
+
+ # fetch auth details from config store
+ remote_conf = self.config_get(FSSnapshotMirror.peer_config_key(local_fs_name, peer_uuid))
+ remote_cluster, remote_fs = connect_to_filesystem(client_name,
+ rem['cluster_name'],
+ rem['fs_name'], 'remote', conf_dct=remote_conf)
+ try:
+ remote_fs.removexattr('/', 'ceph.mirror.info')
+ except cephfs.Error as e:
+ if not e.errno == errno.ENOENT:
+ log.error('error removing mirror info')
+ raise Exception(-e.errno)
+ finally:
+ disconnect_from_filesystem(rem['cluster_name'], rem['fs_name'], remote_cluster, remote_fs)
+
+ def verify_and_set_mirror_info(self, local_fs_name, remote_cluster_spec, remote_fs_name, remote_conf={}):
+ log.debug(f'local fs={local_fs_name} remote={remote_cluster_spec}/{remote_fs_name}')
+
+ client_name, cluster_name = FSSnapshotMirror.split_spec(remote_cluster_spec)
+ remote_cluster, remote_fs = connect_to_filesystem(client_name, cluster_name, remote_fs_name,
+ 'remote', conf_dct=remote_conf)
+ try:
+ local_cluster_id = self.rados.get_fsid()
+ remote_cluster_id = remote_cluster.get_fsid()
+ log.debug(f'local_cluster_id={local_cluster_id} remote_cluster_id={remote_cluster_id}')
+ if 'fsid' in remote_conf:
+ if not remote_cluster_id == remote_conf['fsid']:
+ raise MirrorException(-errno.EINVAL, 'FSID mismatch between bootstrap token and remote cluster')
+
+ local_fscid = remote_fscid = None
+ with open_filesystem(self.local_fs, local_fs_name) as local_fsh:
+ local_fscid = local_fsh.get_fscid()
+ remote_fscid = remote_fs.get_fscid()
+ log.debug(f'local_fscid={local_fscid} remote_fscid={remote_fscid}')
+ mi = None
+ try:
+ mi = FSSnapshotMirror.get_mirror_info(local_fsh)
+ except MirrorException as me:
+ if me.args[0] != -errno.ENODATA:
+ raise Exception(-errno.EINVAL)
+ if mi and mi['cluster_id'] == remote_cluster_id and mi['fs_id'] == remote_fscid:
+ raise MirrorException(-errno.EINVAL, f'file system is an active peer for file system: {remote_fs_name}')
+
+ if local_cluster_id == remote_cluster_id and local_fscid == remote_fscid:
+ raise MirrorException(-errno.EINVAL, "'Source and destination cluster fsid and "\
+ "file-system name can't be the same")
+ FSSnapshotMirror.set_mirror_info(local_cluster_id, local_fscid, remote_fs)
+ finally:
+ disconnect_from_filesystem(cluster_name, remote_fs_name, remote_cluster, remote_fs)
+
+ def init_pool_policy(self, filesystem):
+ metadata_pool_id = FSSnapshotMirror.get_metadata_pool(filesystem, self.fs_map)
+ if not metadata_pool_id:
+ log.error(f'cannot find metadata pool-id for filesystem {filesystem}')
+ raise Exception(-errno.EINVAL)
+ try:
+ ioctx = self.rados.open_ioctx2(metadata_pool_id)
+ # TODO: make async if required
+ dir_mapping = load_dir_map(ioctx)
+ instances = load_instances(ioctx)
+ # init policy
+ fspolicy = FSPolicy(self.mgr, ioctx)
+ log.debug(f'init policy for filesystem {filesystem}: pool-id {metadata_pool_id}')
+ fspolicy.init(dir_mapping, instances)
+ self.pool_policy[filesystem] = fspolicy
+ except rados.Error as e:
+ log.error(f'failed to access pool-id {metadata_pool_id} for filesystem {filesystem}: {e}')
+ raise Exception(-e.errno)
+
+ def refresh_pool_policy_locked(self):
+ filesystems = self.get_mirrored_filesystems()
+ log.debug(f'refreshing policy for {filesystems}')
+ for filesystem in list(self.pool_policy):
+ if not filesystem in filesystems:
+ log.info(f'shutdown pool policy for {filesystem}')
+ fspolicy = self.pool_policy.pop(filesystem)
+ fspolicy.shutdown()
+ for filesystem in filesystems:
+ if not filesystem in self.pool_policy:
+ log.info(f'init pool policy for {filesystem}')
+ self.init_pool_policy(filesystem)
+
+ def refresh_pool_policy(self):
+ with self.lock:
+ self.refresh_pool_policy_locked()
+
+ def enable_mirror(self, filesystem):
+ log.info(f'enabling mirror for filesystem {filesystem}')
+ with self.lock:
+ try:
+ metadata_pool_id = FSSnapshotMirror.get_metadata_pool(filesystem, self.fs_map)
+ if not metadata_pool_id:
+ log.error(f'cannot find metadata pool-id for filesystem {filesystem}')
+ raise Exception(-errno.EINVAL)
+ create_mirror_object(self.rados, metadata_pool_id)
+ cmd = {'prefix': 'fs mirror enable', 'fs_name': filesystem}
+ r, outs, err = self.mgr.mon_command(cmd)
+ if r < 0:
+ log.error(f'mon command to enable mirror failed: {err}')
+ raise Exception(-errno.EINVAL)
+ return 0, json.dumps({}), ''
+ except MirrorException as me:
+ return me.args[0], '', me.args[1]
+ except Exception as me:
+ return me.args[0], '', 'failed to enable mirroring'
+
+ def disable_mirror(self, filesystem):
+ log.info(f'disabling mirror for filesystem {filesystem}')
+ try:
+ with self.lock:
+ cmd = {'prefix': 'fs mirror disable', 'fs_name': filesystem}
+ r, outs, err = self.mgr.mon_command(cmd)
+ if r < 0:
+ log.error(f'mon command to disable mirror failed: {err}')
+ raise Exception(-errno.EINVAL)
+ return 0, json.dumps({}), ''
+ except MirrorException as me:
+ return me.args[0], '', me.args[1]
+ except Exception as e:
+ return e.args[0], '', 'failed to disable mirroring'
+
+ def peer_list(self, filesystem):
+ try:
+ with self.lock:
+ fspolicy = self.pool_policy.get(filesystem, None)
+ if not fspolicy:
+ raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
+ peers = self.get_filesystem_peers(filesystem)
+ peer_res = {}
+ for peer_uuid, rem in peers.items():
+ conf = self.config_get(FSSnapshotMirror.peer_config_key(filesystem, peer_uuid))
+ remote = rem['remote']
+ peer_res[peer_uuid] = {'client_name': remote['client_name'],
+ 'site_name': remote['cluster_name'],
+ 'fs_name': remote['fs_name']
+ }
+ if 'mon_host' in conf:
+ peer_res[peer_uuid]['mon_host'] = conf['mon_host']
+ return 0, json.dumps(peer_res), ''
+ except MirrorException as me:
+ return me.args[0], '', me.args[1]
+ except Exception as e:
+ return e.args[0], '', 'failed to list peers'
+
+ def peer_add(self, filesystem, remote_cluster_spec, remote_fs_name, remote_conf):
+ try:
+ if remote_fs_name == None:
+ remote_fs_name = filesystem
+ with self.lock:
+ fspolicy = self.pool_policy.get(filesystem, None)
+ if not fspolicy:
+ raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
+ ### peer updates for key, site-name are not yet supported
+ if self.peer_exists(filesystem, remote_cluster_spec, remote_fs_name):
+ return 0, json.dumps({}), ''
+ # _own_ the peer
+ self.verify_and_set_mirror_info(filesystem, remote_cluster_spec, remote_fs_name, remote_conf)
+ # unique peer uuid
+ peer_uuid = str(uuid.uuid4())
+ config_key = FSSnapshotMirror.peer_config_key(filesystem, peer_uuid)
+ if remote_conf.get('mon_host') and remote_conf.get('key'):
+ self.config_set(config_key, json.dumps(remote_conf))
+ cmd = {'prefix': 'fs mirror peer_add',
+ 'fs_name': filesystem,
+ 'uuid': peer_uuid,
+ 'remote_cluster_spec': remote_cluster_spec,
+ 'remote_fs_name': remote_fs_name}
+ r, outs, err = self.mgr.mon_command(cmd)
+ if r < 0:
+ log.error(f'mon command to add peer failed: {err}')
+ try:
+ log.debug(f'cleaning up config-key for {peer_uuid}')
+ self.config_set(config_key)
+ except:
+ pass
+ raise Exception(-errno.EINVAL)
+ return 0, json.dumps({}), ''
+ except MirrorException as me:
+ return me.args[0], '', me.args[1]
+ except Exception as e:
+ return e.args[0], '', 'failed to add peer'
+
+ def peer_remove(self, filesystem, peer_uuid):
+ try:
+ with self.lock:
+ fspolicy = self.pool_policy.get(filesystem, None)
+ if not fspolicy:
+ raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
+ # ok, this is being a bit lazy. remove mirror info from peer followed
+ # by purging the peer from fsmap. if the mirror daemon fs map updates
+ # are laggy, they happily continue to synchronize. ideally, we should
+ # purge the peer from fsmap here and purge mirror info on fsmap update
+ # (in notify()). but thats not straightforward -- before purging mirror
+ # info, we would need to wait for all mirror daemons to catch up with
+ # fsmap updates. this involves mirror daemons sending the fsmap epoch
+ # they have seen in reply to a notify request. TODO: fix this.
+ self.purge_mirror_info(filesystem, peer_uuid)
+ cmd = {'prefix': 'fs mirror peer_remove',
+ 'fs_name': filesystem,
+ 'uuid': peer_uuid}
+ r, outs, err = self.mgr.mon_command(cmd)
+ if r < 0:
+ log.error(f'mon command to remove peer failed: {err}')
+ raise Exception(-errno.EINVAL)
+ self.config_set(FSSnapshotMirror.peer_config_key(filesystem, peer_uuid))
+ return 0, json.dumps({}), ''
+ except MirrorException as me:
+ return me.args[0], '', me.args[1]
+ except Exception as e:
+ return e.args[0], '', 'failed to remove peer'
+
+ def peer_bootstrap_create(self, fs_name, client_name, site_name):
+ """create a bootstrap token for this peer filesystem"""
+ try:
+ with self.lock:
+ cmd = {'prefix': 'fs authorize',
+ 'filesystem': fs_name,
+ 'entity': client_name,
+ 'caps': ['/', 'rwps']}
+ r, outs, err = self.mgr.mon_command(cmd)
+ if r < 0:
+ log.error(f'mon command to create peer user failed: {err}')
+ raise Exception(-errno.EINVAL)
+ cmd = {'prefix': 'auth get',
+ 'entity': client_name,
+ 'format': 'json'}
+ r, outs, err = self.mgr.mon_command(cmd)
+ if r < 0:
+ log.error(f'mon command to fetch keyring failed: {err}')
+ raise Exception(-errno.EINVAL)
+ outs = json.loads(outs)
+ outs0 = outs[0]
+ token_dct = {'fsid': self.mgr.rados.get_fsid(),
+ 'filesystem': fs_name,
+ 'user': outs0['entity'],
+ 'site_name': site_name,
+ 'key': outs0['key'],
+ 'mon_host': self.mgr.rados.conf_get('mon_host')}
+ token_str = json.dumps(token_dct).encode('utf-8')
+ encoded_token = base64.b64encode(token_str)
+ return 0, json.dumps({'token': encoded_token.decode('utf-8')}), ''
+ except MirrorException as me:
+ return me.args[0], '', me.args[1]
+ except Exception as e:
+ return e.args[0], '', 'failed to bootstrap peer'
+
+ def peer_bootstrap_import(self, filesystem, token):
+ try:
+ token_str = base64.b64decode(token)
+ token_dct = json.loads(token_str.decode('utf-8'))
+ except:
+ return -errno.EINVAL, '', 'failed to parse token'
+ client_name = token_dct.pop('user')
+ cluster_name = token_dct.pop('site_name')
+ remote_fs_name = token_dct.pop('filesystem')
+ remote_cluster_spec = f'{client_name}@{cluster_name}'
+ return self.peer_add(filesystem, remote_cluster_spec, remote_fs_name, token_dct)
+
+ @staticmethod
+ def norm_path(dir_path):
+ if not os.path.isabs(dir_path):
+ raise MirrorException(-errno.EINVAL, f'{dir_path} should be an absolute path')
+ return os.path.normpath(dir_path)
+
+ def add_dir(self, filesystem, dir_path):
+ try:
+ with self.lock:
+ if not self.filesystem_exist(filesystem):
+ raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
+ fspolicy = self.pool_policy.get(filesystem, None)
+ if not fspolicy:
+ raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
+ dir_path = FSSnapshotMirror.norm_path(dir_path)
+ log.debug(f'path normalized to {dir_path}')
+ fspolicy.add_dir(dir_path)
+ return 0, json.dumps({}), ''
+ except MirrorException as me:
+ return me.args[0], '', me.args[1]
+ except Exception as e:
+ return e.args[0], '', 'failed to add directory'
+
+ def remove_dir(self, filesystem, dir_path):
+ try:
+ with self.lock:
+ if not self.filesystem_exist(filesystem):
+ raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
+ fspolicy = self.pool_policy.get(filesystem, None)
+ if not fspolicy:
+ raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
+ dir_path = FSSnapshotMirror.norm_path(dir_path)
+ fspolicy.remove_dir(dir_path)
+ return 0, json.dumps({}), ''
+ except MirrorException as me:
+ return me.args[0], '', me.args[1]
+ except Exception as e:
+ return e.args[0], '', 'failed to remove directory'
+
+ def status(self,filesystem, dir_path):
+ try:
+ with self.lock:
+ if not self.filesystem_exist(filesystem):
+ raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
+ fspolicy = self.pool_policy.get(filesystem, None)
+ if not fspolicy:
+ raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
+ dir_path = FSSnapshotMirror.norm_path(dir_path)
+ return fspolicy.status(dir_path)
+ except MirrorException as me:
+ return me.args[0], '', me.args[1]
+
+ def show_distribution(self, filesystem):
+ try:
+ with self.lock:
+ if not self.filesystem_exist(filesystem):
+ raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
+ fspolicy = self.pool_policy.get(filesystem, None)
+ if not fspolicy:
+ raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
+ return fspolicy.summary()
+ except MirrorException as me:
+ return me.args[0], '', me.args[1]
+
+ def daemon_status(self):
+ try:
+ with self.lock:
+ daemons = []
+ sm = self.mgr.get('service_map')
+ daemon_entry = sm['services'].get('cephfs-mirror', None)
+ log.debug(f'daemon_entry: {daemon_entry}')
+ if daemon_entry is not None:
+ for daemon_key in daemon_entry.get('daemons', []):
+ try:
+ daemon_id = int(daemon_key)
+ except ValueError:
+ continue
+ daemon = {
+ 'daemon_id' : daemon_id,
+ 'filesystems' : []
+ } # type: Dict[str, Any]
+ daemon_status = self.mgr.get_daemon_status('cephfs-mirror', daemon_key)
+ if not daemon_status:
+ log.debug(f'daemon status not yet availble for cephfs-mirror daemon: {daemon_key}')
+ continue
+ status = json.loads(daemon_status['status_json'])
+ for fs_id, fs_desc in status.items():
+ fs = {'filesystem_id' : int(fs_id),
+ 'name' : fs_desc['name'],
+ 'directory_count' : fs_desc['directory_count'],
+ 'peers' : []
+ } # type: Dict[str, Any]
+ for peer_uuid, peer_desc in fs_desc['peers'].items():
+ peer = {
+ 'uuid' : peer_uuid,
+ 'remote' : peer_desc['remote'],
+ 'stats' : peer_desc['stats']
+ }
+ fs['peers'].append(peer)
+ daemon['filesystems'].append(fs)
+ daemons.append(daemon)
+ return 0, json.dumps(daemons), ''
+ except MirrorException as me:
+ return me.args[0], '', me.args[1]
diff --git a/src/pybind/mgr/mirroring/fs/utils.py b/src/pybind/mgr/mirroring/fs/utils.py
new file mode 100644
index 000000000..5e1d05373
--- /dev/null
+++ b/src/pybind/mgr/mirroring/fs/utils.py
@@ -0,0 +1,152 @@
+import errno
+import logging
+import threading
+
+import rados
+import cephfs
+
+from .exception import MirrorException
+
+MIRROR_OBJECT_PREFIX = 'cephfs_mirror'
+MIRROR_OBJECT_NAME = MIRROR_OBJECT_PREFIX
+
+INSTANCE_ID_PREFIX = "instance_"
+DIRECTORY_MAP_PREFIX = "dir_map_"
+
+log = logging.getLogger(__name__)
+
+def connect_to_cluster(client_name, cluster_name, conf_dct, desc=''):
+ try:
+ log.debug(f'connecting to {desc} cluster: {client_name}/{cluster_name}')
+ mon_host = conf_dct.get('mon_host', '')
+ cephx_key = conf_dct.get('key', '')
+ if mon_host and cephx_key:
+ r_rados = rados.Rados(rados_id=client_name, conf={'mon_host': mon_host,
+ 'key': cephx_key})
+ else:
+ r_rados = rados.Rados(rados_id=client_name, clustername=cluster_name)
+ r_rados.conf_read_file()
+ r_rados.connect()
+ log.debug(f'connected to {desc} cluster')
+ return r_rados
+ except rados.Error as e:
+ if e.errno == errno.ENOENT:
+ raise MirrorException(-e.errno, f'cluster {cluster_name} does not exist')
+ else:
+ log.error(f'error connecting to cluster: {e}')
+ raise Exception(-e.errno)
+
+def disconnect_from_cluster(cluster_name, cluster):
+ try:
+ log.debug(f'disconnecting from cluster {cluster_name}')
+ cluster.shutdown()
+ log.debug(f'disconnected from cluster {cluster_name}')
+ except Exception as e:
+ log.error(f'error disconnecting: {e}')
+
+def connect_to_filesystem(client_name, cluster_name, fs_name, desc, conf_dct={}):
+ try:
+ cluster = connect_to_cluster(client_name, cluster_name, conf_dct, desc)
+ log.debug(f'connecting to {desc} filesystem: {fs_name}')
+ fs = cephfs.LibCephFS(rados_inst=cluster)
+ fs.conf_set("client_mount_uid", "0")
+ fs.conf_set("client_mount_gid", "0")
+ fs.conf_set("client_check_pool_perm", "false")
+ log.debug('CephFS initializing...')
+ fs.init()
+ log.debug('CephFS mounting...')
+ fs.mount(filesystem_name=fs_name.encode('utf-8'))
+ log.debug(f'Connection to cephfs {fs_name} complete')
+ return (cluster, fs)
+ except cephfs.Error as e:
+ if e.errno == errno.ENOENT:
+ raise MirrorException(-e.errno, f'filesystem {fs_name} does not exist')
+ else:
+ log.error(f'error connecting to filesystem {fs_name}: {e}')
+ raise Exception(-e.errno)
+
+def disconnect_from_filesystem(cluster_name, fs_name, cluster, fs_handle):
+ try:
+ log.debug(f'disconnecting from filesystem {fs_name}')
+ fs_handle.shutdown()
+ log.debug(f'disconnected from filesystem {fs_name}')
+ disconnect_from_cluster(cluster_name, cluster)
+ except Exception as e:
+ log.error(f'error disconnecting: {e}')
+
+class _ThreadWrapper(threading.Thread):
+ def __init__(self, name):
+ self.q = []
+ self.stopping = threading.Event()
+ self.terminated = threading.Event()
+ self.lock = threading.Lock()
+ self.cond = threading.Condition(self.lock)
+ super().__init__(name=name)
+ super().start()
+
+ def run(self):
+ try:
+ with self.lock:
+ while True:
+ self.cond.wait_for(lambda: self.q or self.stopping.is_set())
+ if self.stopping.is_set():
+ log.debug('thread exiting')
+ self.terminated.set()
+ self.cond.notifyAll()
+ return
+ q = self.q.copy()
+ self.q.clear()
+ self.lock.release()
+ try:
+ for item in q:
+ log.debug(f'calling {item[0]} params {item[1]}')
+ item[0](*item[1])
+ except Exception as e:
+ log.warn(f'callback exception: {e}')
+ self.lock.acquire()
+ except Exception as e:
+ log.info(f'threading exception: {e}')
+
+ def queue(self, cbk, args):
+ with self.lock:
+ self.q.append((cbk, args))
+ self.cond.notifyAll()
+
+ def stop(self):
+ with self.lock:
+ self.stopping.set()
+ self.cond.notifyAll()
+ self.cond.wait_for(lambda: self.terminated.is_set())
+
+class Finisher:
+ def __init__(self):
+ self.lock = threading.Lock()
+ self.thread = _ThreadWrapper(name='finisher')
+
+ def queue(self, cbk, args=[]):
+ with self.lock:
+ self.thread.queue(cbk, args)
+
+class AsyncOpTracker:
+ def __init__(self):
+ self.ops_in_progress = 0
+ self.lock = threading.Lock()
+ self.cond = threading.Condition(self.lock)
+
+ def start_async_op(self):
+ with self.lock:
+ self.ops_in_progress += 1
+ log.debug(f'start_async_op: {self.ops_in_progress}')
+
+ def finish_async_op(self):
+ with self.lock:
+ self.ops_in_progress -= 1
+ log.debug(f'finish_async_op: {self.ops_in_progress}')
+ assert(self.ops_in_progress >= 0)
+ self.cond.notifyAll()
+
+ def wait_for_ops(self):
+ with self.lock:
+ log.debug(f'wait_for_ops: {self.ops_in_progress}')
+ self.cond.wait_for(lambda: self.ops_in_progress == 0)
+ log.debug(f'done')