diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/pybind/mgr/mirroring/fs | |
parent | Initial commit. (diff) | |
download | ceph-upstream/18.2.2.tar.xz ceph-upstream/18.2.2.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/pybind/mgr/mirroring/fs')
-rw-r--r-- | src/pybind/mgr/mirroring/fs/__init__.py | 0 | ||||
-rw-r--r-- | src/pybind/mgr/mirroring/fs/blocklist.py | 10 | ||||
-rw-r--r-- | src/pybind/mgr/mirroring/fs/dir_map/__init__.py | 0 | ||||
-rw-r--r-- | src/pybind/mgr/mirroring/fs/dir_map/create.py | 23 | ||||
-rw-r--r-- | src/pybind/mgr/mirroring/fs/dir_map/load.py | 74 | ||||
-rw-r--r-- | src/pybind/mgr/mirroring/fs/dir_map/policy.py | 380 | ||||
-rw-r--r-- | src/pybind/mgr/mirroring/fs/dir_map/state_transition.py | 94 | ||||
-rw-r--r-- | src/pybind/mgr/mirroring/fs/dir_map/update.py | 151 | ||||
-rw-r--r-- | src/pybind/mgr/mirroring/fs/exception.py | 3 | ||||
-rw-r--r-- | src/pybind/mgr/mirroring/fs/notify.py | 121 | ||||
-rw-r--r-- | src/pybind/mgr/mirroring/fs/snapshot_mirror.py | 792 | ||||
-rw-r--r-- | src/pybind/mgr/mirroring/fs/utils.py | 152 |
12 files changed, 1800 insertions, 0 deletions
diff --git a/src/pybind/mgr/mirroring/fs/__init__.py b/src/pybind/mgr/mirroring/fs/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/src/pybind/mgr/mirroring/fs/__init__.py diff --git a/src/pybind/mgr/mirroring/fs/blocklist.py b/src/pybind/mgr/mirroring/fs/blocklist.py new file mode 100644 index 000000000..473b5f262 --- /dev/null +++ b/src/pybind/mgr/mirroring/fs/blocklist.py @@ -0,0 +1,10 @@ +import logging + +log = logging.getLogger(__name__) + +def blocklist(mgr, addr): + cmd = {'prefix': 'osd blocklist', 'blocklistop': 'add', 'addr': str(addr)} + r, outs, err = mgr.mon_command(cmd) + if r != 0: + log.error(f'blocklist error: {err}') + return r diff --git a/src/pybind/mgr/mirroring/fs/dir_map/__init__.py b/src/pybind/mgr/mirroring/fs/dir_map/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/src/pybind/mgr/mirroring/fs/dir_map/__init__.py diff --git a/src/pybind/mgr/mirroring/fs/dir_map/create.py b/src/pybind/mgr/mirroring/fs/dir_map/create.py new file mode 100644 index 000000000..963dfe915 --- /dev/null +++ b/src/pybind/mgr/mirroring/fs/dir_map/create.py @@ -0,0 +1,23 @@ +import errno +import logging + +import rados + +from ..exception import MirrorException +from ..utils import MIRROR_OBJECT_NAME + +log = logging.getLogger(__name__) + +def create_mirror_object(rados_inst, pool_id): + log.info(f'creating mirror object: {MIRROR_OBJECT_NAME}') + try: + with rados_inst.open_ioctx2(pool_id) as ioctx: + with rados.WriteOpCtx() as write_op: + write_op.new(rados.LIBRADOS_CREATE_EXCLUSIVE) + ioctx.operate_write_op(write_op, MIRROR_OBJECT_NAME) + except rados.Error as e: + if e.errno == errno.EEXIST: + # be graceful + return -e.errno + log.error(f'failed to create mirror object: {e}') + raise Exception(-e.args[0]) diff --git a/src/pybind/mgr/mirroring/fs/dir_map/load.py b/src/pybind/mgr/mirroring/fs/dir_map/load.py new file mode 100644 index 000000000..42468b4e8 --- /dev/null +++ 
b/src/pybind/mgr/mirroring/fs/dir_map/load.py @@ -0,0 +1,74 @@ +import errno +import pickle +import logging +from typing import Dict + +import rados + +from ..exception import MirrorException +from ..utils import MIRROR_OBJECT_NAME, DIRECTORY_MAP_PREFIX, \ + INSTANCE_ID_PREFIX + +log = logging.getLogger(__name__) + +MAX_RETURN = 256 + +def handle_dir_load(dir_mapping, dir_map): + for directory_str, encoded_map in dir_map.items(): + dir_path = directory_str[len(DIRECTORY_MAP_PREFIX):] + decoded_map = pickle.loads(encoded_map) + log.debug(f'{dir_path} -> {decoded_map}') + dir_mapping[dir_path] = decoded_map + +def load_dir_map(ioctx): + dir_mapping = {} # type: Dict[str, Dict] + log.info('loading dir map...') + try: + with rados.ReadOpCtx() as read_op: + start = "" + while True: + iter, ret = ioctx.get_omap_vals(read_op, start, DIRECTORY_MAP_PREFIX, MAX_RETURN) + if not ret == 0: + log.error(f'failed to fetch dir mapping omap') + raise Exception(-errno.EINVAL) + ioctx.operate_read_op(read_op, MIRROR_OBJECT_NAME) + dir_map = dict(iter) + if not dir_map: + break + handle_dir_load(dir_mapping, dir_map) + start = dir_map.popitem()[0] + log.info("loaded {0} directory mapping(s) from disk".format(len(dir_mapping))) + return dir_mapping + except rados.Error as e: + log.error(f'exception when loading directory mapping: {e}') + raise Exception(-e.errno) + +def handle_instance_load(instance_mapping, instance_map): + for instance, e_data in instance_map.items(): + instance_id = instance[len(INSTANCE_ID_PREFIX):] + d_data = pickle.loads(e_data) + log.debug(f'{instance_id} -> {d_data}') + instance_mapping[instance_id] = d_data + +def load_instances(ioctx): + instance_mapping = {} # type: Dict[str, Dict] + log.info('loading instances...') + try: + with rados.ReadOpCtx() as read_op: + start = "" + while True: + iter, ret = ioctx.get_omap_vals(read_op, start, INSTANCE_ID_PREFIX, MAX_RETURN) + if not ret == 0: + log.error(f'failed to fetch instance omap') + raise 
Exception(-errno.EINVAL) + ioctx.operate_read_op(read_op, MIRROR_OBJECT_NAME) + instance_map = dict(iter) + if not instance_map: + break + handle_instance_load(instance_mapping, instance_map) + start = instance_map.popitem()[0] + log.info("loaded {0} instance(s) from disk".format(len(instance_mapping))) + return instance_mapping + except rados.Error as e: + log.error(f'exception when loading instances: {e}') + raise Exception(-e.errno) diff --git a/src/pybind/mgr/mirroring/fs/dir_map/policy.py b/src/pybind/mgr/mirroring/fs/dir_map/policy.py new file mode 100644 index 000000000..aef90b55f --- /dev/null +++ b/src/pybind/mgr/mirroring/fs/dir_map/policy.py @@ -0,0 +1,380 @@ +import os +import errno +import logging +import time +from threading import Lock +from typing import Dict + +from .state_transition import ActionType, PolicyAction, Transition, \ + State, StateTransition +from ..exception import MirrorException + +log = logging.getLogger(__name__) + +class DirectoryState: + def __init__(self, instance_id=None, mapped_time=None): + self.instance_id = instance_id + self.mapped_time = mapped_time + self.state = State.UNASSOCIATED + self.stalled = False + self.transition = Transition(ActionType.NONE) + self.next_state = None + self.purging = False + + def __str__(self): + return f'[instance_id={self.instance_id}, mapped_time={self.mapped_time},'\ + f' state={self.state}, transition={self.transition}, next_state={self.next_state},'\ + f' purging={self.purging}]' + +class Policy: + # number of seconds after which a directory can be reshuffled + # to other mirror daemon instances. 
+ DIR_SHUFFLE_THROTTLE_INTERVAL = 300 + + def __init__(self): + self.dir_states = {} + self.instance_to_dir_map = {} + self.dead_instances = [] + self.lock = Lock() + + @staticmethod + def is_instance_action(action_type): + return action_type in (ActionType.ACQUIRE, + ActionType.RELEASE) + + def is_dead_instance(self, instance_id): + return instance_id in self.dead_instances + + def is_state_scheduled(self, dir_state, state): + return dir_state.state == state or dir_state.next_state == state + + def is_shuffling(self, dir_path): + log.debug(f'is_shuffling: {dir_path}') + return self.is_state_scheduled(self.dir_states[dir_path], State.SHUFFLING) + + def can_shuffle_dir(self, dir_path): + """Right now, shuffle directories only based on idleness. Later, we + probably want to avoid shuffling images that were recently shuffled. + """ + log.debug(f'can_shuffle_dir: {dir_path}') + dir_state = self.dir_states[dir_path] + return StateTransition.is_idle(dir_state.state) and \ + (time.time() - dir_state['mapped_time']) > Policy.DIR_SHUFFLE_THROTTLE_INTERVAL + + def set_state(self, dir_state, state, ignore_current_state=False): + if not ignore_current_state and dir_state.state == state: + return False + elif StateTransition.is_idle(dir_state.state): + dir_state.state = state + dir_state.next_state = None + dir_state.transition = StateTransition.transit( + dir_state.state, dir_state.transition.action_type) + return True + dir_state.next_state = state + return False + + def init(self, dir_mapping): + with self.lock: + for dir_path, dir_map in dir_mapping.items(): + instance_id = dir_map['instance_id'] + if instance_id: + if not instance_id in self.instance_to_dir_map: + self.instance_to_dir_map[instance_id] = [] + self.instance_to_dir_map[instance_id].append(dir_path) + self.dir_states[dir_path] = DirectoryState(instance_id, dir_map['last_shuffled']) + dir_state = self.dir_states[dir_path] + state = State.INITIALIZING if instance_id else State.ASSOCIATING + purging = 
dir_map.get('purging', 0) + if purging: + dir_state.purging = True + state = State.DISASSOCIATING + if not instance_id: + dir_state.transition = StateTransition.transit(state, + dir_state.transition.action_type) + log.debug(f'starting state: {dir_path} {state}: {dir_state}') + self.set_state(dir_state, state) + log.debug(f'init dir_state: {dir_state}') + + def lookup(self, dir_path): + log.debug(f'looking up {dir_path}') + with self.lock: + dir_state = self.dir_states.get(dir_path, None) + if dir_state: + return {'instance_id': dir_state.instance_id, + 'mapped_time': dir_state.mapped_time, + 'purging': dir_state.purging} + return None + + def map(self, dir_path, dir_state): + log.debug(f'mapping {dir_path}') + min_instance_id = None + current_instance_id = dir_state.instance_id + if current_instance_id and not self.is_dead_instance(current_instance_id): + return True + if self.is_dead_instance(current_instance_id): + self.unmap(dir_path, dir_state) + for instance_id, dir_paths in self.instance_to_dir_map.items(): + if self.is_dead_instance(instance_id): + continue + if not min_instance_id or len(dir_paths) < len(self.instance_to_dir_map[min_instance_id]): + min_instance_id = instance_id + if not min_instance_id: + log.debug(f'instance unavailable for {dir_path}') + return False + log.debug(f'dir_path {dir_path} maps to instance {min_instance_id}') + dir_state.instance_id = min_instance_id + dir_state.mapped_time = time.time() + self.instance_to_dir_map[min_instance_id].append(dir_path) + return True + + def unmap(self, dir_path, dir_state): + instance_id = dir_state.instance_id + log.debug(f'unmapping {dir_path} from instance {instance_id}') + self.instance_to_dir_map[instance_id].remove(dir_path) + dir_state.instance_id = None + dir_state.mapped_time = None + if self.is_dead_instance(instance_id) and not self.instance_to_dir_map[instance_id]: + self.instance_to_dir_map.pop(instance_id) + self.dead_instances.remove(instance_id) + + def shuffle(self, 
dirs_per_instance, include_stalled_dirs): + log.debug(f'directories per instance: {dirs_per_instance}') + shuffle_dirs = [] + for instance_id, dir_paths in self.instance_to_dir_map.items(): + cut_off = len(dir_paths) - dirs_per_instance + if cut_off > 0: + for dir_path in dir_paths: + if cut_off == 0: + break + if self.is_shuffling(dir_path): + cut_off -= 1 + elif self.can_shuffle_dir(dir_path): + cut_off -= 1 + shuffle_dirs.append(dir_path) + if include_stalled_dirs: + for dir_path, dir_state in self.dir_states.items(): + if dir_state.stalled: + log.debug(f'{dir_path} is stalled: {dir_state} -- trigerring kick') + dir_state.stalled = False + shuffle_dirs.append(dir_path) + return shuffle_dirs + + def execute_policy_action(self, dir_path, dir_state, policy_action): + log.debug(f'executing for directory {dir_path} policy_action {policy_action}') + + done = True + if policy_action == PolicyAction.MAP: + done = self.map(dir_path, dir_state) + elif policy_action == PolicyAction.UNMAP: + self.unmap(dir_path, dir_state) + elif policy_action == PolicyAction.REMOVE: + if dir_state.state == State.UNASSOCIATED: + self.dir_states.pop(dir_path) + else: + raise Exception() + return done + + def start_action(self, dir_path): + log.debug(f'start action: {dir_path}') + with self.lock: + dir_state = self.dir_states.get(dir_path, None) + if not dir_state: + raise Exception() + log.debug(f'dir_state: {dir_state}') + if dir_state.transition.start_policy_action: + stalled = not self.execute_policy_action(dir_path, dir_state, + dir_state.transition.start_policy_action) + if stalled: + next_action = ActionType.NONE + if dir_state.purging: + dir_state.next_state = None + dir_state.state = State.UNASSOCIATED + dir_state.transition = StateTransition.transit(State.DISASSOCIATING, ActionType.NONE) + self.set_state(dir_state, State.DISASSOCIATING) + next_action = dir_state.transition.action_type + else: + dir_state.stalled = True + log.debug(f'state machine stalled') + return next_action + 
return dir_state.transition.action_type + + def finish_action(self, dir_path, r): + log.debug(f'finish action {dir_path} r={r}') + with self.lock: + dir_state = self.dir_states.get(dir_path, None) + if not dir_state: + raise Exception() + if r < 0 and (not Policy.is_instance_action(dir_state.transition.action_type) or + not dir_state.instance_id or + not dir_state.instance_id in self.dead_instances): + return True + log.debug(f'dir_state: {dir_state}') + finish_policy_action = dir_state.transition.finish_policy_action + dir_state.transition = StateTransition.transit( + dir_state.state, dir_state.transition.action_type) + log.debug(f'transitioned to dir_state: {dir_state}') + if dir_state.transition.final_state: + log.debug('reached final state') + dir_state.state = dir_state.transition.final_state + dir_state.transition = Transition(ActionType.NONE) + log.debug(f'final dir_state: {dir_state}') + if StateTransition.is_idle(dir_state.state) and dir_state.next_state: + self.set_state(dir_state, dir_state.next_state) + pending = not dir_state.transition.action_type == ActionType.NONE + if finish_policy_action: + self.execute_policy_action(dir_path, dir_state, finish_policy_action) + return pending + + def find_tracked_ancestor_or_subtree(self, dir_path): + for tracked_path, _ in self.dir_states.items(): + comp = [dir_path, tracked_path] + cpath = os.path.commonpath(comp) + if cpath in comp: + what = 'subtree' if cpath == tracked_path else 'ancestor' + return (tracked_path, what) + return None + + def add_dir(self, dir_path): + log.debug(f'adding dir_path {dir_path}') + with self.lock: + if dir_path in self.dir_states: + return False + as_info = self.find_tracked_ancestor_or_subtree(dir_path) + if as_info: + raise MirrorException(-errno.EINVAL, f'{dir_path} is a {as_info[1]} of tracked path {as_info[0]}') + self.dir_states[dir_path] = DirectoryState() + dir_state = self.dir_states[dir_path] + log.debug(f'add dir_state: {dir_state}') + if dir_state.state == 
State.INITIALIZING: + return False + return self.set_state(dir_state, State.ASSOCIATING) + + def remove_dir(self, dir_path): + log.debug(f'removing dir_path {dir_path}') + with self.lock: + dir_state = self.dir_states.get(dir_path, None) + if not dir_state: + return False + log.debug(f'removing dir_state: {dir_state}') + dir_state.purging = True + # advance the state machine with DISASSOCIATING state for removal + if dir_state.stalled: + dir_state.state = State.UNASSOCIATED + dir_state.transition = StateTransition.transit(State.DISASSOCIATING, ActionType.NONE) + r = self.set_state(dir_state, State.DISASSOCIATING) + log.debug(f'dir_state: {dir_state}') + return r + + def add_instances_initial(self, instance_ids): + """Take care of figuring out instances which no longer exist + and remove them. This is to be done only once on startup to + identify instances which were previously removed but directories + are still mapped (on-disk) to them. + """ + for instance_id in instance_ids: + if not instance_id in self.instance_to_dir_map: + self.instance_to_dir_map[instance_id] = [] + dead_instances = [] + for instance_id, _ in self.instance_to_dir_map.items(): + if not instance_id in instance_ids: + dead_instances.append(instance_id) + if dead_instances: + self._remove_instances(dead_instances) + + def add_instances(self, instance_ids, initial_update=False): + log.debug(f'adding instances: {instance_ids} initial_update {initial_update}') + with self.lock: + if initial_update: + self.add_instances_initial(instance_ids) + else: + nr_instances = len(self.instance_to_dir_map) + nr_dead_instances = len(self.dead_instances) + if nr_instances > 0: + # adjust dead instances + nr_instances -= nr_dead_instances + include_stalled_dirs = nr_instances == 0 + for instance_id in instance_ids: + if not instance_id in self.instance_to_dir_map: + self.instance_to_dir_map[instance_id] = [] + dirs_per_instance = int(len(self.dir_states) / + (len(self.instance_to_dir_map) - nr_dead_instances)) + 
if dirs_per_instance == 0: + dirs_per_instance += 1 + shuffle_dirs = [] + # super set of directories which are candidates for shuffling -- choose + # those which can be shuffle rightaway (others will be shuffled when + # they reach idle state). + shuffle_dirs_ss = self.shuffle(dirs_per_instance, include_stalled_dirs) + if include_stalled_dirs: + return shuffle_dirs_ss + for dir_path in shuffle_dirs_ss: + dir_state = self.dir_states[dir_path] + if self.set_state(dir_state, State.SHUFFLING): + shuffle_dirs.append(dir_path) + log.debug(f'remapping directories: {shuffle_dirs}') + return shuffle_dirs + + def remove_instances(self, instance_ids): + with self.lock: + return self._remove_instances(instance_ids) + + def _remove_instances(self, instance_ids): + log.debug(f'removing instances: {instance_ids}') + shuffle_dirs = [] + for instance_id in instance_ids: + if not instance_id in self.instance_to_dir_map: + continue + if not self.instance_to_dir_map[instance_id]: + self.instance_to_dir_map.pop(instance_id) + continue + self.dead_instances.append(instance_id) + dir_paths = self.instance_to_dir_map[instance_id] + log.debug(f'force shuffling instance_id {instance_id}, directories {dir_paths}') + for dir_path in dir_paths: + dir_state = self.dir_states[dir_path] + if self.is_state_scheduled(dir_state, State.DISASSOCIATING): + log.debug(f'dir_path {dir_path} is disassociating, ignoring...') + continue + log.debug(f'shuffling dir_path {dir_path}') + if self.set_state(dir_state, State.SHUFFLING, True): + shuffle_dirs.append(dir_path) + log.debug(f'shuffling {shuffle_dirs}') + return shuffle_dirs + + def dir_status(self, dir_path): + with self.lock: + dir_state = self.dir_states.get(dir_path, None) + if not dir_state: + raise MirrorException(-errno.ENOENT, f'{dir_path} is not tracked') + res = {} # type: Dict + if dir_state.stalled: + res['state'] = 'stalled' + res['reason'] = 'no mirror daemons running' + elif dir_state.state == State.ASSOCIATING: + res['state'] = 'mapping' 
+ else: + state = None + dstate = dir_state.state + if dstate == State.ASSOCIATING: + state = 'mapping' + elif dstate == State.DISASSOCIATING: + state = 'unmapping' + elif dstate == State.SHUFFLING: + state = 'shuffling' + elif dstate == State.ASSOCIATED: + state = 'mapped' + elif dstate == State.INITIALIZING: + state = 'resolving' + res['state'] = state + res['instance_id'] = dir_state.instance_id + res['last_shuffled'] = dir_state.mapped_time + return res + + def instance_summary(self): + with self.lock: + res = { + 'mapping': {} + } # type: Dict + for instance_id, dir_paths in self.instance_to_dir_map.items(): + res['mapping'][instance_id] = f'{len(dir_paths)} directories' + return res diff --git a/src/pybind/mgr/mirroring/fs/dir_map/state_transition.py b/src/pybind/mgr/mirroring/fs/dir_map/state_transition.py new file mode 100644 index 000000000..ef59a6a87 --- /dev/null +++ b/src/pybind/mgr/mirroring/fs/dir_map/state_transition.py @@ -0,0 +1,94 @@ +import logging +from enum import Enum, unique +from typing import Dict + +log = logging.getLogger(__name__) + +@unique +class State(Enum): + UNASSOCIATED = 0 + INITIALIZING = 1 + ASSOCIATING = 2 + ASSOCIATED = 3 + SHUFFLING = 4 + DISASSOCIATING = 5 + +@unique +class ActionType(Enum): + NONE = 0 + MAP_UPDATE = 1 + MAP_REMOVE = 2 + ACQUIRE = 3 + RELEASE = 4 + +@unique +class PolicyAction(Enum): + MAP = 0 + UNMAP = 1 + REMOVE = 2 + +class TransitionKey: + def __init__(self, state, action_type): + self.transition_key = [state, action_type] + + def __hash__(self): + return hash(tuple(self.transition_key)) + + def __eq__(self, other): + return self.transition_key == other.transition_key + + def __neq__(self, other): + return not(self == other) + +class Transition: + def __init__(self, action_type, start_policy_action=None, + finish_policy_action=None, final_state=None): + self.action_type = action_type + self.start_policy_action = start_policy_action + self.finish_policy_action = finish_policy_action + self.final_state = 
final_state + + def __str__(self): + return "[action_type={0}, start_policy_action={1}, finish_policy_action={2}, final_state={3}".format( + self.action_type, self.start_policy_action, self.finish_policy_action, self.final_state) + +class StateTransition: + transition_table = {} # type: Dict[TransitionKey, Transition] + + @staticmethod + def transit(state, action_type): + try: + return StateTransition.transition_table[TransitionKey(state, action_type)] + except KeyError: + raise Exception() + + @staticmethod + def is_idle(state): + return state in (State.UNASSOCIATED, State.ASSOCIATED) + +StateTransition.transition_table = { + TransitionKey(State.INITIALIZING, ActionType.NONE) : Transition(ActionType.ACQUIRE), + TransitionKey(State.INITIALIZING, ActionType.ACQUIRE) : Transition(ActionType.NONE, + final_state=State.ASSOCIATED), + + TransitionKey(State.ASSOCIATING, ActionType.NONE) : Transition(ActionType.MAP_UPDATE, + start_policy_action=PolicyAction.MAP), + TransitionKey(State.ASSOCIATING, ActionType.MAP_UPDATE) : Transition(ActionType.ACQUIRE), + TransitionKey(State.ASSOCIATING, ActionType.ACQUIRE) : Transition(ActionType.NONE, + final_state=State.ASSOCIATED), + + TransitionKey(State.DISASSOCIATING, ActionType.NONE) : Transition(ActionType.RELEASE, + finish_policy_action=PolicyAction.UNMAP), + TransitionKey(State.DISASSOCIATING, ActionType.RELEASE) : Transition(ActionType.MAP_REMOVE, + finish_policy_action=PolicyAction.REMOVE), + TransitionKey(State.DISASSOCIATING, ActionType.MAP_REMOVE) : Transition(ActionType.NONE, + final_state=State.UNASSOCIATED), + + TransitionKey(State.SHUFFLING, ActionType.NONE) : Transition(ActionType.RELEASE, + finish_policy_action=PolicyAction.UNMAP), + TransitionKey(State.SHUFFLING, ActionType.RELEASE) : Transition(ActionType.MAP_UPDATE, + start_policy_action=PolicyAction.MAP), + TransitionKey(State.SHUFFLING, ActionType.MAP_UPDATE) : Transition(ActionType.ACQUIRE), + TransitionKey(State.SHUFFLING, ActionType.ACQUIRE) : 
Transition(ActionType.NONE, + final_state=State.ASSOCIATED), + } diff --git a/src/pybind/mgr/mirroring/fs/dir_map/update.py b/src/pybind/mgr/mirroring/fs/dir_map/update.py new file mode 100644 index 000000000..a70baa01a --- /dev/null +++ b/src/pybind/mgr/mirroring/fs/dir_map/update.py @@ -0,0 +1,151 @@ +import errno +import pickle +import logging + +import rados + +from ..utils import MIRROR_OBJECT_NAME, DIRECTORY_MAP_PREFIX, \ + INSTANCE_ID_PREFIX, MIRROR_OBJECT_PREFIX + +log = logging.getLogger(__name__) + +MAX_UPDATE = 256 + +class UpdateDirMapRequest: + def __init__(self, ioctx, update_mapping, removals, on_finish_callback): + self.ioctx = ioctx + self.update_mapping = update_mapping + self.removals = removals + self.on_finish_callback = on_finish_callback + + @staticmethod + def omap_key(dir_path): + return f'{DIRECTORY_MAP_PREFIX}{dir_path}' + + def send(self): + log.info('updating image map') + self.send_update() + + def send_update(self): + log.debug(f'pending updates: {len(self.update_mapping)}+{len(self.removals)}') + try: + with rados.WriteOpCtx() as write_op: + keys = [] + vals = [] + dir_keys = list(self.update_mapping.keys())[0:MAX_UPDATE] + # gather updates + for dir_path in dir_keys: + mapping = self.update_mapping.pop(dir_path) + keys.append(UpdateDirMapRequest.omap_key(dir_path)) + vals.append(pickle.dumps(mapping)) + self.ioctx.set_omap(write_op, tuple(keys), tuple(vals)) + # gather deletes + slicept = MAX_UPDATE - len(dir_keys) + removals = [UpdateDirMapRequest.omap_key(dir_path) for dir_path in self.removals[0:slicept]] + self.removals = self.removals[slicept:] + self.ioctx.remove_omap_keys(write_op, tuple(removals)) + log.debug(f'applying {len(keys)} updates, {len(removals)} deletes') + self.ioctx.operate_aio_write_op(write_op, MIRROR_OBJECT_NAME, oncomplete=self.handle_update) + except rados.Error as e: + log.error(f'UpdateDirMapRequest.send_update exception: {e}') + self.finish(-e.args[0]) + + def handle_update(self, completion): + r = 
completion.get_return_value() + log.debug(f'handle_update: r={r}') + if not r == 0: + self.finish(r) + elif self.update_mapping or self.removals: + self.send_update() + else: + self.finish(0) + + def finish(self, r): + log.info(f'finish: r={r}') + self.on_finish_callback(r) + +class UpdateInstanceRequest: + def __init__(self, ioctx, instances_added, instances_removed, on_finish_callback): + self.ioctx = ioctx + self.instances_added = instances_added + # purge vs remove: purge list is for purging on-disk instance + # object. remove is for purging instance map. + self.instances_removed = instances_removed.copy() + self.instances_purge = instances_removed.copy() + self.on_finish_callback = on_finish_callback + + @staticmethod + def omap_key(instance_id): + return f'{INSTANCE_ID_PREFIX}{instance_id}' + + @staticmethod + def cephfs_mirror_object_name(instance_id): + assert instance_id != '' + return f'{MIRROR_OBJECT_PREFIX}.{instance_id}' + + def send(self): + log.info('updating instances') + self.send_update() + + def send_update(self): + self.remove_instance_object() + + def remove_instance_object(self): + log.debug(f'pending purges: {len(self.instances_purge)}') + if not self.instances_purge: + self.update_instance_map() + return + instance_id = self.instances_purge.pop() + self.ioctx.aio_remove( + UpdateInstanceRequest.cephfs_mirror_object_name(instance_id), oncomplete=self.handle_remove) + + def handle_remove(self, completion): + r = completion.get_return_value() + log.debug(f'handle_remove: r={r}') + # cephfs-mirror instances remove their respective instance + # objects upon termination. so we handle ENOENT here. note + # that when an instance is blocklisted, it wont be able to + # purge its instance object, so we do it on its behalf. 
+ if not r == 0 and not r == -errno.ENOENT: + self.finish(r) + return + self.remove_instance_object() + + def update_instance_map(self): + log.debug(f'pending updates: {len(self.instances_added)}+{len(self.instances_removed)}') + try: + with rados.WriteOpCtx() as write_op: + keys = [] + vals = [] + instance_ids = list(self.instances_added.keys())[0:MAX_UPDATE] + # gather updates + for instance_id in instance_ids: + data = self.instances_added.pop(instance_id) + keys.append(UpdateInstanceRequest.omap_key(instance_id)) + vals.append(pickle.dumps(data)) + self.ioctx.set_omap(write_op, tuple(keys), tuple(vals)) + # gather deletes + slicept = MAX_UPDATE - len(instance_ids) + removals = [UpdateInstanceRequest.omap_key(instance_id) \ + for instance_id in self.instances_removed[0:slicept]] + self.instances_removed = self.instances_removed[slicept:] + self.ioctx.remove_omap_keys(write_op, tuple(removals)) + log.debug(f'applying {len(keys)} updates, {len(removals)} deletes') + self.ioctx.operate_aio_write_op(write_op, MIRROR_OBJECT_NAME, oncomplete=self.handle_update) + except rados.Error as e: + log.error(f'UpdateInstanceRequest.update_instance_map exception: {e}') + self.finish(-e.args[0]) + + def handle_update(self, completion): + r = completion.get_return_value() + log.debug(f'handle_update: r={r}') + if not r == 0: + self.finish(r) + elif self.instances_added or self.instances_removed: + self.update_instance_map() + else: + self.finish(0) + + def finish(self, r): + log.info(f'finish: r={r}') + self.on_finish_callback(r) diff --git a/src/pybind/mgr/mirroring/fs/exception.py b/src/pybind/mgr/mirroring/fs/exception.py new file mode 100644 index 000000000..d041b276c --- /dev/null +++ b/src/pybind/mgr/mirroring/fs/exception.py @@ -0,0 +1,3 @@ +class MirrorException(Exception): + def __init__(self, error_code, error_msg=''): + super().__init__(error_code, error_msg) diff --git a/src/pybind/mgr/mirroring/fs/notify.py b/src/pybind/mgr/mirroring/fs/notify.py new file mode 100644 
index 000000000..992cba297 --- /dev/null +++ b/src/pybind/mgr/mirroring/fs/notify.py @@ -0,0 +1,121 @@ +import errno +import json +import logging +import threading +import time + +import rados + +from .utils import MIRROR_OBJECT_PREFIX, AsyncOpTracker + +log = logging.getLogger(__name__) + +class Notifier: + def __init__(self, ioctx): + self.ioctx = ioctx + + @staticmethod + def instance_object(instance_id): + return f'{MIRROR_OBJECT_PREFIX}.{instance_id}' + + def notify_cbk(self, dir_path, callback): + def cbk(_, r, acks, timeouts): + log.debug(f'Notifier.notify_cbk: ret {r} acks: {acks} timeouts: {timeouts}') + callback(dir_path, r) + return cbk + + def notify(self, dir_path, message, callback): + try: + instance_id = message[0] + message = message[1] + log.debug(f'Notifier.notify: {instance_id} {message} for {dir_path}') + self.ioctx.aio_notify( + Notifier.instance_object( + instance_id), self.notify_cbk(dir_path, callback), msg=message) + except rados.Error as e: + log.error(f'Notifier exception: {e}') + raise e + +class InstanceWatcher: + INSTANCE_TIMEOUT = 30 + NOTIFY_INTERVAL = 1 + + class Listener: + def handle_instances(self, added, removed): + raise NotImplementedError() + + def __init__(self, ioctx, instances, listener): + self.ioctx = ioctx + self.listener = listener + self.instances = {} + for instance_id, data in instances.items(): + self.instances[instance_id] = {'addr': data['addr'], + 'seen': time.time()} + self.lock = threading.Lock() + self.cond = threading.Condition(self.lock) + self.done = threading.Event() + self.waiting = threading.Event() + self.notify_task = None + self.schedule_notify_task() + + def schedule_notify_task(self): + assert self.notify_task == None + self.notify_task = threading.Timer(InstanceWatcher.NOTIFY_INTERVAL, self.notify) + self.notify_task.start() + + def wait_and_stop(self): + with self.lock: + log.info('InstanceWatcher.wait_and_stop') + self.waiting.set() + self.cond.wait_for(lambda: self.done.is_set()) + 
log.info('waiting done') + assert self.notify_task == None + + def handle_notify(self, _, r, acks, timeouts): + log.debug(f'InstanceWatcher.handle_notify r={r} acks={acks} timeouts={timeouts}') + with self.lock: + try: + added = {} + removed = {} + if acks is None: + acks = [] + ackd_instances = [] + for ack in acks: + instance_id = str(ack[0]) + ackd_instances.append(instance_id) + # sender data is quoted + notifier_data = json.loads(ack[2].decode('utf-8')) + log.debug(f'InstanceWatcher.handle_notify: {instance_id}: {notifier_data}') + if not instance_id in self.instances: + self.instances[instance_id] = {} + added[instance_id] = notifier_data['addr'] + self.instances[instance_id]['addr'] = notifier_data['addr'] + self.instances[instance_id]['seen'] = time.time() + # gather non responders + now = time.time() + for instance_id in list(self.instances.keys()): + data = self.instances[instance_id] + if (now - data['seen'] > InstanceWatcher.INSTANCE_TIMEOUT) or \ + (self.waiting.is_set() and instance_id not in ackd_instances): + removed[instance_id] = data['addr'] + self.instances.pop(instance_id) + if added or removed: + self.listener.handle_instances(added, removed) + except Exception as e: + log.warn(f'InstanceWatcher.handle_notify exception: {e}') + finally: + if not self.instances and self.waiting.is_set(): + self.done.set() + self.cond.notifyAll() + else: + self.schedule_notify_task() + + def notify(self): + with self.lock: + self.notify_task = None + try: + log.debug('InstanceWatcher.notify') + self.ioctx.aio_notify(MIRROR_OBJECT_PREFIX, self.handle_notify) + except rados.Error as e: + log.warn(f'InstanceWatcher exception: {e}') + self.schedule_notify_task() diff --git a/src/pybind/mgr/mirroring/fs/snapshot_mirror.py b/src/pybind/mgr/mirroring/fs/snapshot_mirror.py new file mode 100644 index 000000000..6fa8d0c4c --- /dev/null +++ b/src/pybind/mgr/mirroring/fs/snapshot_mirror.py @@ -0,0 +1,792 @@ +import base64 +import errno +import json +import logging +import 
import base64
import errno
import json
import logging
import os
import pickle
import re
import stat
import threading
import uuid
from typing import Dict, Any

import cephfs
import rados

from mgr_util import RTimer, CephfsClient, open_filesystem,\
    CephfsConnectionException
from mgr_module import NotifyType
from .blocklist import blocklist
from .notify import Notifier, InstanceWatcher
from .utils import INSTANCE_ID_PREFIX, MIRROR_OBJECT_NAME, Finisher, \
    AsyncOpTracker, connect_to_filesystem, disconnect_from_filesystem
from .exception import MirrorException
from .dir_map.create import create_mirror_object
from .dir_map.load import load_dir_map, load_instances
from .dir_map.update import UpdateDirMapRequest, UpdateInstanceRequest
from .dir_map.policy import Policy
from .dir_map.state_transition import ActionType

log = logging.getLogger(__name__)

# throttle interval (seconds) for the periodic directory-map update task
CEPHFS_IMAGE_POLICY_UPDATE_THROTTLE_INTERVAL = 1


class FSPolicy:
    """Per-filesystem mirroring policy driver.

    Maintains the directory-to-instance assignment for one mirrored file
    system: persists the directory map, reacts to instance add/remove
    events, and notifies mirror daemon instances of acquire/release
    requests via a periodic update task.
    """

    class InstanceListener(InstanceWatcher.Listener):
        """Relays instance add/remove events from the watcher to the policy."""
        def __init__(self, fspolicy):
            self.fspolicy = fspolicy

        def handle_instances(self, added, removed):
            self.fspolicy.update_instances(added, removed)

    def __init__(self, mgr, ioctx):
        self.mgr = mgr
        self.ioctx = ioctx
        self.pending = []
        self.policy = Policy()
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        # directories queued for processing by process_updates()
        self.dir_paths = []
        # request_id -> in-flight Update*Request (keeps them referenced)
        self.async_requests = {}
        self.finisher = Finisher()
        self.op_tracker = AsyncOpTracker()
        self.notifier = Notifier(ioctx)
        self.instance_listener = FSPolicy.InstanceListener(self)
        self.instance_watcher = None
        self.stopping = threading.Event()
        self.timer_task = RTimer(CEPHFS_IMAGE_POLICY_UPDATE_THROTTLE_INTERVAL,
                                 self.process_updates)
        self.timer_task.start()

    def schedule_action(self, dir_paths):
        # queued paths are drained by the periodic process_updates() task;
        # callers must hold self.lock
        self.dir_paths.extend(dir_paths)

    def init(self, dir_mapping, instances):
        with self.lock:
            self.policy.init(dir_mapping)
            # we'll schedule action for all directories, so don't bother capturing
            # directory names here.
            self.policy.add_instances(list(instances.keys()), initial_update=True)
            self.instance_watcher = InstanceWatcher(self.ioctx, instances,
                                                    self.instance_listener)
            self.schedule_action(list(dir_mapping.keys()))

    def shutdown(self):
        with self.lock:
            log.debug('FSPolicy.shutdown')
            self.stopping.set()
            log.debug('canceling update timer task')
            self.timer_task.cancel()
            log.debug('update timer task canceled')
        # wait_and_stop() blocks until all instances ack -- must not hold
        # self.lock here or update_instances() would deadlock
        if self.instance_watcher:
            log.debug('stopping instance watcher')
            self.instance_watcher.wait_and_stop()
            log.debug('stopped instance watcher')
        self.op_tracker.wait_for_ops()
        log.debug('FSPolicy.shutdown done')

    def handle_update_mapping(self, updates, removals, request_id, callback, r):
        log.info(f'handle_update_mapping: {updates} {removals} {request_id} {callback} {r}')
        with self.lock:
            try:
                self.async_requests.pop(request_id)
                if callback:
                    callback(updates, removals, r)
            finally:
                self.op_tracker.finish_async_op()

    def handle_update_instances(self, instances_added, instances_removed, request_id, r):
        log.info(f'handle_update_instances: {instances_added} {instances_removed} {request_id} {r}')
        with self.lock:
            try:
                self.async_requests.pop(request_id)
                if self.stopping.is_set():
                    log.debug('handle_update_instances: policy shutting down')
                    return
                schedules = []
                if instances_removed:
                    schedules.extend(self.policy.remove_instances(instances_removed))
                if instances_added:
                    schedules.extend(self.policy.add_instances(instances_added))
                self.schedule_action(schedules)
            finally:
                self.op_tracker.finish_async_op()

    def update_mapping(self, update_map, removals, callback=None):
        """Persist directory-map updates/removals; `callback(updates, removals, r)`
        is invoked (via the finisher) when the request completes."""
        log.info(f'updating directory map: {len(update_map)}+{len(removals)} updates')
        request_id = str(uuid.uuid4())
        def async_callback(r):
            self.finisher.queue(self.handle_update_mapping,
                                [list(update_map.keys()), removals, request_id, callback, r])
        request = UpdateDirMapRequest(self.ioctx, update_map.copy(), removals.copy(),
                                      async_callback)
        self.async_requests[request_id] = request
        self.op_tracker.start_async_op()
        log.debug(f'async request_id: {request_id}')
        request.send()

    def update_instances(self, added, removed):
        # use the module logger, not the root-logger module function
        log.debug(f'update_instances: added={added}, removed={removed}')
        # blocklist outside self.lock -- it is a synchronous mon command
        for instance_id, addr in removed.items():
            log.info(f'blocklisting instance_id: {instance_id} addr: {addr}')
            blocklist(self.mgr, addr)
        with self.lock:
            instances_added = {}
            instances_removed = []
            for instance_id, addr in added.items():
                instances_added[instance_id] = {'version': 1, 'addr': addr}
            instances_removed = list(removed.keys())
            request_id = str(uuid.uuid4())
            def async_callback(r):
                self.finisher.queue(self.handle_update_instances,
                                    [list(instances_added.keys()), instances_removed, request_id, r])
            # blocklisted instances can be removed at this point. remapping directories
            # mapped to blocklisted instances on module startup is handled in policy
            # add_instances().
            request = UpdateInstanceRequest(self.ioctx, instances_added.copy(),
                                            instances_removed.copy(), async_callback)
            self.async_requests[request_id] = request
            log.debug(f'async request_id: {request_id}')
            self.op_tracker.start_async_op()
            request.send()

    def continue_action(self, updates, removals, r):
        """Finish the policy action for each updated/removed directory and
        reschedule the ones the policy wants processed again."""
        log.debug(f'continuing action: {updates}+{removals} r={r}')
        if self.stopping.is_set():
            log.debug('continue_action: policy shutting down')
            return
        schedules = []
        for dir_path in updates:
            if self.policy.finish_action(dir_path, r):
                schedules.append(dir_path)
        for dir_path in removals:
            if self.policy.finish_action(dir_path, r):
                schedules.append(dir_path)
        self.schedule_action(schedules)

    def handle_peer_ack(self, dir_path, r):
        log.info(f'handle_peer_ack: {dir_path} r={r}')
        with self.lock:
            try:
                if self.stopping.is_set():
                    log.debug('handle_peer_ack: policy shutting down')
                    return
                self.continue_action([dir_path], [], r)
            finally:
                self.op_tracker.finish_async_op()

    def process_updates(self):
        """Periodic task: drain queued directories and translate policy
        actions into map updates/removals and acquire/release notifies."""
        def acquire_message(dir_path):
            return json.dumps({'dir_path': dir_path,
                               'mode': 'acquire'
                               })
        def release_message(dir_path):
            return json.dumps({'dir_path': dir_path,
                               'mode': 'release'
                               })
        with self.lock:
            if not self.dir_paths or self.stopping.is_set():
                return
            update_map = {}
            removals = []
            notifies = {}
            for dir_path in self.dir_paths:
                action_type = self.policy.start_action(dir_path)
                lookup_info = self.policy.lookup(dir_path)
                log.debug(f'processing action: dir_path: {dir_path}, lookup_info: {lookup_info}, action_type: {action_type}')
                if action_type == ActionType.NONE:
                    continue
                elif action_type == ActionType.MAP_UPDATE:
                    # take care to not overwrite purge status
                    update_map[dir_path] = {'version': 1,
                                            'instance_id': lookup_info['instance_id'],
                                            'last_shuffled': lookup_info['mapped_time']
                                            }
                    if lookup_info['purging']:
                        update_map[dir_path]['purging'] = 1
                elif action_type == ActionType.MAP_REMOVE:
                    removals.append(dir_path)
                elif action_type == ActionType.ACQUIRE:
                    notifies[dir_path] = (lookup_info['instance_id'], acquire_message(dir_path))
                elif action_type == ActionType.RELEASE:
                    notifies[dir_path] = (lookup_info['instance_id'], release_message(dir_path))
            if update_map or removals:
                self.update_mapping(update_map, removals, callback=self.continue_action)
            for dir_path, message in notifies.items():
                self.op_tracker.start_async_op()
                self.notifier.notify(dir_path, message, self.handle_peer_ack)
            self.dir_paths.clear()

    def add_dir(self, dir_path):
        """Start mirroring dir_path; blocks until the map entry is persisted.

        :raises MirrorException: EAGAIN if a removal is in progress,
                                 EEXIST if the directory is already tracked
        """
        with self.lock:
            lookup_info = self.policy.lookup(dir_path)
            if lookup_info:
                if lookup_info['purging']:
                    raise MirrorException(-errno.EAGAIN, f'remove in-progress for {dir_path}')
                else:
                    raise MirrorException(-errno.EEXIST, f'directory {dir_path} is already tracked')
            schedule = self.policy.add_dir(dir_path)
            if not schedule:
                return
            update_map = {dir_path: {'version': 1, 'instance_id': '', 'last_shuffled': 0.0}}
            updated = False
            def update_safe(updates, removals, r):
                # runs on the finisher thread with self.lock held
                nonlocal updated
                updated = True
                self.cond.notify_all()
            self.update_mapping(update_map, [], callback=update_safe)
            self.cond.wait_for(lambda: updated)
            self.schedule_action([dir_path])

    def remove_dir(self, dir_path):
        """Mark dir_path for removal; blocks until the purge flag is persisted.

        :raises MirrorException: ENOENT if not tracked, EINVAL if already purging
        """
        with self.lock:
            lookup_info = self.policy.lookup(dir_path)
            if not lookup_info:
                raise MirrorException(-errno.ENOENT, f'directory {dir_path} is not tracked')
            if lookup_info['purging']:
                raise MirrorException(-errno.EINVAL, f'directory {dir_path} is under removal')
            update_map = {dir_path: {'version': 1,
                                     'instance_id': lookup_info['instance_id'],
                                     'last_shuffled': lookup_info['mapped_time'],
                                     'purging': 1}}
            updated = False
            # the request completion runs on a rados callback thread, not the
            # finisher, so synchronize with a dedicated lock/condition
            sync_lock = threading.Lock()
            sync_cond = threading.Condition(sync_lock)
            def update_safe(r):
                with sync_lock:
                    nonlocal updated
                    updated = True
                    sync_cond.notify_all()
            request = UpdateDirMapRequest(self.ioctx, update_map.copy(), [], update_safe)
            request.send()
            with sync_lock:
                sync_cond.wait_for(lambda: updated)
            schedule = self.policy.remove_dir(dir_path)
            if schedule:
                self.schedule_action([dir_path])

    def status(self, dir_path):
        with self.lock:
            res = self.policy.dir_status(dir_path)
            return 0, json.dumps(res, indent=4, sort_keys=True), ''

    def summary(self):
        with self.lock:
            res = self.policy.instance_summary()
            return 0, json.dumps(res, indent=4, sort_keys=True), ''
class FSSnapshotMirror:
    """Module front end for cephfs snapshot mirroring.

    Tracks the fs map, manages one FSPolicy per mirror-enabled file system,
    and implements the mirroring commands (enable/disable, peer management,
    directory add/remove, status).
    """
    PEER_CONFIG_KEY_PREFIX = "cephfs/mirror/peer"

    def __init__(self, mgr):
        self.mgr = mgr
        self.rados = mgr.rados
        # filesystem name -> FSPolicy for mirror-enabled filesystems
        self.pool_policy = {}
        self.fs_map = self.mgr.get('fs_map')
        self.lock = threading.Lock()
        self.refresh_pool_policy()
        self.local_fs = CephfsClient(mgr)

    def notify(self, notify_type: NotifyType):
        log.debug(f'got notify type {notify_type}')
        if notify_type == NotifyType.fs_map:
            with self.lock:
                self.fs_map = self.mgr.get('fs_map')
                self.refresh_pool_policy_locked()

    @staticmethod
    def make_spec(client_name, cluster_name):
        return f'{client_name}@{cluster_name}'

    @staticmethod
    def split_spec(spec):
        """Split 'client.<name>@<cluster>' into (<name>, <cluster>).

        :raises MirrorException: EINVAL on a malformed spec
        """
        try:
            client_id, cluster_name = spec.split('@')
            _, client_name = client_id.split('.')
            return client_name, cluster_name
        except ValueError:
            raise MirrorException(-errno.EINVAL, f'invalid cluster spec {spec}')

    @staticmethod
    def get_metadata_pool(filesystem, fs_map):
        for fs in fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == filesystem:
                return fs['mdsmap']['metadata_pool']
        return None

    @staticmethod
    def get_filesystem_id(filesystem, fs_map):
        for fs in fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == filesystem:
                return fs['id']
        return None

    @staticmethod
    def peer_config_key(filesystem, peer_uuid):
        return f'{FSSnapshotMirror.PEER_CONFIG_KEY_PREFIX}/{filesystem}/{peer_uuid}'

    def config_set(self, key, val=None):
        """set or remove a key from mon config store"""
        if val:
            cmd = {'prefix': 'config-key set',
                   'key': key, 'val': val}
        else:
            cmd = {'prefix': 'config-key rm',
                   'key': key}
        r, outs, err = self.mgr.mon_command(cmd)
        if r < 0:
            log.error(f'mon command to set/remove config-key {key} failed: {err}')
            raise Exception(-errno.EINVAL)

    def config_get(self, key):
        """fetch a config key value from mon config store"""
        cmd = {'prefix': 'config-key get', 'key': key}
        r, outs, err = self.mgr.mon_command(cmd)
        # a missing key is not an error -- return an empty dict
        if r < 0 and not r == -errno.ENOENT:
            log.error(f'mon command to get config-key {key} failed: {err}')
            raise Exception(-errno.EINVAL)
        val = {}
        if r == 0:
            val = json.loads(outs)
        return val

    def filesystem_exist(self, filesystem):
        return any(fs['mdsmap']['fs_name'] == filesystem
                   for fs in self.fs_map['filesystems'])

    def get_mirrored_filesystems(self):
        return [fs['mdsmap']['fs_name'] for fs in self.fs_map['filesystems']
                if fs.get('mirror_info', None)]

    def get_filesystem_peers(self, filesystem):
        """To be used when mirroring is enabled for the filesystem"""
        for fs in self.fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == filesystem:
                return fs['mirror_info']['peers']
        return None

    def peer_exists(self, filesystem, remote_cluster_spec, remote_fs_name):
        peers = self.get_filesystem_peers(filesystem)
        for _, rem in peers.items():
            remote = rem['remote']
            spec = FSSnapshotMirror.make_spec(remote['client_name'], remote['cluster_name'])
            if spec == remote_cluster_spec and remote['fs_name'] == remote_fs_name:
                return True
        return False

    @staticmethod
    def get_mirror_info(fs):
        """Parse the ceph.mirror.info xattr on the filesystem root.

        :returns: {'cluster_id': ..., 'fs_id': ...}
        :raises MirrorException: EINVAL on bad format, -errno on xattr errors
                                 (ENODATA when the xattr is absent)
        """
        try:
            val = fs.getxattr('/', 'ceph.mirror.info')
            match = re.search(r'^cluster_id=([a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}) fs_id=(\d+)$',
                              val.decode('utf-8'))
            if match and len(match.groups()) == 2:
                return {'cluster_id': match.group(1),
                        'fs_id': int(match.group(2))
                        }
            raise MirrorException(-errno.EINVAL, 'invalid ceph.mirror.info value format')
        except cephfs.Error as e:
            raise MirrorException(-e.errno, 'error fetching ceph.mirror.info xattr')

    @staticmethod
    def set_mirror_info(local_cluster_id, local_fsid, remote_fs):
        log.info(f'setting {local_cluster_id}::{local_fsid} on remote')
        try:
            remote_fs.setxattr('/', 'ceph.mirror.info',
                               f'cluster_id={local_cluster_id} fs_id={local_fsid}'.encode('utf-8'),
                               os.XATTR_CREATE)
        except cephfs.Error as e:
            if e.errno == errno.EEXIST:
                try:
                    mi = FSSnapshotMirror.get_mirror_info(remote_fs)
                    cluster_id = mi['cluster_id']
                    fs_id = mi['fs_id']
                    if not (cluster_id == local_cluster_id and fs_id == local_fsid):
                        raise MirrorException(-errno.EEXIST, f'peer mirrored by: (cluster_id: {cluster_id}, fs_id: {fs_id})')
                except MirrorException:
                    # if mirror info cannot be fetched for some reason, let's just
                    # fail. (NOTE: this also replaces the more detailed message
                    # raised just above.)
                    raise MirrorException(-errno.EEXIST, 'already an active peer')
            else:
                log.error(f'error setting mirrored fsid: {e}')
                raise Exception(-e.errno)

    def resolve_peer(self, fs_name, peer_uuid):
        peers = self.get_filesystem_peers(fs_name)
        for peer, rem in peers.items():
            if peer == peer_uuid:
                return rem['remote']
        return None

    def purge_mirror_info(self, local_fs_name, peer_uuid):
        log.debug(f'local fs={local_fs_name} peer_uuid={peer_uuid}')
        # resolve the peer to its spec
        rem = self.resolve_peer(local_fs_name, peer_uuid)
        if not rem:
            return
        log.debug(f'peer_uuid={peer_uuid} resolved to {rem}')
        _, client_name = rem['client_name'].split('.')

        # fetch auth details from config store
        remote_conf = self.config_get(FSSnapshotMirror.peer_config_key(local_fs_name, peer_uuid))
        remote_cluster, remote_fs = connect_to_filesystem(client_name,
                                                          rem['cluster_name'],
                                                          rem['fs_name'], 'remote',
                                                          conf_dct=remote_conf)
        try:
            remote_fs.removexattr('/', 'ceph.mirror.info')
        except cephfs.Error as e:
            if not e.errno == errno.ENOENT:
                log.error('error removing mirror info')
                raise Exception(-e.errno)
        finally:
            disconnect_from_filesystem(rem['cluster_name'], rem['fs_name'], remote_cluster, remote_fs)

    def verify_and_set_mirror_info(self, local_fs_name, remote_cluster_spec, remote_fs_name, remote_conf={}):
        log.debug(f'local fs={local_fs_name} remote={remote_cluster_spec}/{remote_fs_name}')

        client_name, cluster_name = FSSnapshotMirror.split_spec(remote_cluster_spec)
        remote_cluster, remote_fs = connect_to_filesystem(client_name, cluster_name, remote_fs_name,
                                                          'remote', conf_dct=remote_conf)
        try:
            local_cluster_id = self.rados.get_fsid()
            remote_cluster_id = remote_cluster.get_fsid()
            log.debug(f'local_cluster_id={local_cluster_id} remote_cluster_id={remote_cluster_id}')
            if 'fsid' in remote_conf:
                if not remote_cluster_id == remote_conf['fsid']:
                    raise MirrorException(-errno.EINVAL, 'FSID mismatch between bootstrap token and remote cluster')

            local_fscid = remote_fscid = None
            with open_filesystem(self.local_fs, local_fs_name) as local_fsh:
                local_fscid = local_fsh.get_fscid()
                remote_fscid = remote_fs.get_fscid()
                log.debug(f'local_fscid={local_fscid} remote_fscid={remote_fscid}')
                mi = None
                try:
                    mi = FSSnapshotMirror.get_mirror_info(local_fsh)
                except MirrorException as me:
                    # ENODATA simply means the local fs is not a mirror target
                    if me.args[0] != -errno.ENODATA:
                        raise Exception(-errno.EINVAL)
                if mi and mi['cluster_id'] == remote_cluster_id and mi['fs_id'] == remote_fscid:
                    raise MirrorException(-errno.EINVAL, f'file system is an active peer for file system: {remote_fs_name}')

            if local_cluster_id == remote_cluster_id and local_fscid == remote_fscid:
                raise MirrorException(-errno.EINVAL, "Source and destination cluster fsid and "\
                                      "file-system name can't be the same")
            FSSnapshotMirror.set_mirror_info(local_cluster_id, local_fscid, remote_fs)
        finally:
            disconnect_from_filesystem(cluster_name, remote_fs_name, remote_cluster, remote_fs)

    def init_pool_policy(self, filesystem):
        metadata_pool_id = FSSnapshotMirror.get_metadata_pool(filesystem, self.fs_map)
        if not metadata_pool_id:
            log.error(f'cannot find metadata pool-id for filesystem {filesystem}')
            raise Exception(-errno.EINVAL)
        try:
            ioctx = self.rados.open_ioctx2(metadata_pool_id)
            # TODO: make async if required
            dir_mapping = load_dir_map(ioctx)
            instances = load_instances(ioctx)
            # init policy
            fspolicy = FSPolicy(self.mgr, ioctx)
            log.debug(f'init policy for filesystem {filesystem}: pool-id {metadata_pool_id}')
            fspolicy.init(dir_mapping, instances)
            self.pool_policy[filesystem] = fspolicy
        except rados.Error as e:
            log.error(f'failed to access pool-id {metadata_pool_id} for filesystem {filesystem}: {e}')
            raise Exception(-e.errno)

    def refresh_pool_policy_locked(self):
        filesystems = self.get_mirrored_filesystems()
        log.debug(f'refreshing policy for {filesystems}')
        # shut down policies for filesystems that are no longer mirrored
        for filesystem in list(self.pool_policy):
            if filesystem not in filesystems:
                log.info(f'shutdown pool policy for {filesystem}')
                fspolicy = self.pool_policy.pop(filesystem)
                fspolicy.shutdown()
        # bring up policies for newly mirrored filesystems
        for filesystem in filesystems:
            if filesystem not in self.pool_policy:
                log.info(f'init pool policy for {filesystem}')
                self.init_pool_policy(filesystem)

    def refresh_pool_policy(self):
        with self.lock:
            self.refresh_pool_policy_locked()

    def enable_mirror(self, filesystem):
        log.info(f'enabling mirror for filesystem {filesystem}')
        with self.lock:
            try:
                metadata_pool_id = FSSnapshotMirror.get_metadata_pool(filesystem, self.fs_map)
                if not metadata_pool_id:
                    log.error(f'cannot find metadata pool-id for filesystem {filesystem}')
                    raise Exception(-errno.EINVAL)
                create_mirror_object(self.rados, metadata_pool_id)
                cmd = {'prefix': 'fs mirror enable', 'fs_name': filesystem}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to enable mirror failed: {err}')
                    raise Exception(-errno.EINVAL)
                return 0, json.dumps({}), ''
            except MirrorException as me:
                return me.args[0], '', me.args[1]
            except Exception as e:
                return e.args[0], '', 'failed to enable mirroring'

    def disable_mirror(self, filesystem):
        log.info(f'disabling mirror for filesystem {filesystem}')
        try:
            with self.lock:
                cmd = {'prefix': 'fs mirror disable', 'fs_name': filesystem}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to disable mirror failed: {err}')
                    raise Exception(-errno.EINVAL)
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to disable mirroring'

    def peer_list(self, filesystem):
        try:
            with self.lock:
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                peers = self.get_filesystem_peers(filesystem)
                peer_res = {}
                for peer_uuid, rem in peers.items():
                    conf = self.config_get(FSSnapshotMirror.peer_config_key(filesystem, peer_uuid))
                    remote = rem['remote']
                    peer_res[peer_uuid] = {'client_name': remote['client_name'],
                                           'site_name': remote['cluster_name'],
                                           'fs_name': remote['fs_name']
                                           }
                    if 'mon_host' in conf:
                        peer_res[peer_uuid]['mon_host'] = conf['mon_host']
                return 0, json.dumps(peer_res), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to list peers'

    def peer_add(self, filesystem, remote_cluster_spec, remote_fs_name, remote_conf):
        try:
            if remote_fs_name is None:
                remote_fs_name = filesystem
            with self.lock:
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                ### peer updates for key, site-name are not yet supported
                if self.peer_exists(filesystem, remote_cluster_spec, remote_fs_name):
                    return 0, json.dumps({}), ''
                # _own_ the peer
                self.verify_and_set_mirror_info(filesystem, remote_cluster_spec, remote_fs_name, remote_conf)
                # unique peer uuid
                peer_uuid = str(uuid.uuid4())
                config_key = FSSnapshotMirror.peer_config_key(filesystem, peer_uuid)
                if remote_conf.get('mon_host') and remote_conf.get('key'):
                    self.config_set(config_key, json.dumps(remote_conf))
                cmd = {'prefix': 'fs mirror peer_add',
                       'fs_name': filesystem,
                       'uuid': peer_uuid,
                       'remote_cluster_spec': remote_cluster_spec,
                       'remote_fs_name': remote_fs_name}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to add peer failed: {err}')
                    # best-effort cleanup of the config key
                    try:
                        log.debug(f'cleaning up config-key for {peer_uuid}')
                        self.config_set(config_key)
                    except Exception:
                        pass
                    raise Exception(-errno.EINVAL)
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to add peer'

    def peer_remove(self, filesystem, peer_uuid):
        try:
            with self.lock:
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                # ok, this is being a bit lazy. remove mirror info from peer followed
                # by purging the peer from fsmap. if the mirror daemon fs map updates
                # are laggy, they happily continue to synchronize. ideally, we should
                # purge the peer from fsmap here and purge mirror info on fsmap update
                # (in notify()). but thats not straightforward -- before purging mirror
                # info, we would need to wait for all mirror daemons to catch up with
                # fsmap updates. this involves mirror daemons sending the fsmap epoch
                # they have seen in reply to a notify request. TODO: fix this.
                self.purge_mirror_info(filesystem, peer_uuid)
                cmd = {'prefix': 'fs mirror peer_remove',
                       'fs_name': filesystem,
                       'uuid': peer_uuid}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to remove peer failed: {err}')
                    raise Exception(-errno.EINVAL)
                self.config_set(FSSnapshotMirror.peer_config_key(filesystem, peer_uuid))
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to remove peer'

    def peer_bootstrap_create(self, fs_name, client_name, site_name):
        """create a bootstrap token for this peer filesystem"""
        try:
            with self.lock:
                cmd = {'prefix': 'fs authorize',
                       'filesystem': fs_name,
                       'entity': client_name,
                       'caps': ['/', 'rwps']}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to create peer user failed: {err}')
                    raise Exception(-errno.EINVAL)
                cmd = {'prefix': 'auth get',
                       'entity': client_name,
                       'format': 'json'}
                r, outs, err = self.mgr.mon_command(cmd)
                if r < 0:
                    log.error(f'mon command to fetch keyring failed: {err}')
                    raise Exception(-errno.EINVAL)
                outs = json.loads(outs)
                outs0 = outs[0]
                token_dct = {'fsid': self.mgr.rados.get_fsid(),
                             'filesystem': fs_name,
                             'user': outs0['entity'],
                             'site_name': site_name,
                             'key': outs0['key'],
                             'mon_host': self.mgr.rados.conf_get('mon_host')}
                token_str = json.dumps(token_dct).encode('utf-8')
                encoded_token = base64.b64encode(token_str)
                return 0, json.dumps({'token': encoded_token.decode('utf-8')}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to bootstrap peer'

    def peer_bootstrap_import(self, filesystem, token):
        try:
            token_str = base64.b64decode(token)
            token_dct = json.loads(token_str.decode('utf-8'))
        except (TypeError, ValueError):
            # b64/unicode/json decode errors are all ValueError subclasses;
            # TypeError covers a non-str/bytes token
            return -errno.EINVAL, '', 'failed to parse token'
        client_name = token_dct.pop('user')
        cluster_name = token_dct.pop('site_name')
        remote_fs_name = token_dct.pop('filesystem')
        remote_cluster_spec = f'{client_name}@{cluster_name}'
        return self.peer_add(filesystem, remote_cluster_spec, remote_fs_name, token_dct)

    @staticmethod
    def norm_path(dir_path):
        """Normalize an absolute directory path.

        :raises MirrorException: EINVAL if the path is not absolute
        """
        if not os.path.isabs(dir_path):
            raise MirrorException(-errno.EINVAL, f'{dir_path} should be an absolute path')
        return os.path.normpath(dir_path)

    def add_dir(self, filesystem, dir_path):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                dir_path = FSSnapshotMirror.norm_path(dir_path)
                log.debug(f'path normalized to {dir_path}')
                fspolicy.add_dir(dir_path)
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to add directory'

    def remove_dir(self, filesystem, dir_path):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                dir_path = FSSnapshotMirror.norm_path(dir_path)
                fspolicy.remove_dir(dir_path)
                return 0, json.dumps({}), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
        except Exception as e:
            return e.args[0], '', 'failed to remove directory'

    def status(self, filesystem, dir_path):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                dir_path = FSSnapshotMirror.norm_path(dir_path)
                return fspolicy.status(dir_path)
        except MirrorException as me:
            return me.args[0], '', me.args[1]

    def show_distribution(self, filesystem):
        try:
            with self.lock:
                if not self.filesystem_exist(filesystem):
                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
                fspolicy = self.pool_policy.get(filesystem, None)
                if not fspolicy:
                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
                return fspolicy.summary()
        except MirrorException as me:
            return me.args[0], '', me.args[1]

    def daemon_status(self):
        try:
            with self.lock:
                daemons = []
                sm = self.mgr.get('service_map')
                daemon_entry = sm['services'].get('cephfs-mirror', None)
                log.debug(f'daemon_entry: {daemon_entry}')
                if daemon_entry is not None:
                    for daemon_key in daemon_entry.get('daemons', []):
                        try:
                            daemon_id = int(daemon_key)
                        except ValueError:
                            # "summary" and other non-numeric keys are not daemons
                            continue
                        daemon = {
                            'daemon_id': daemon_id,
                            'filesystems': []
                        }  # type: Dict[str, Any]
                        daemon_status = self.mgr.get_daemon_status('cephfs-mirror', daemon_key)
                        if not daemon_status:
                            log.debug(f'daemon status not yet available for cephfs-mirror daemon: {daemon_key}')
                            continue
                        status = json.loads(daemon_status['status_json'])
                        for fs_id, fs_desc in status.items():
                            fs = {'filesystem_id': int(fs_id),
                                  'name': fs_desc['name'],
                                  'directory_count': fs_desc['directory_count'],
                                  'peers': []
                                  }  # type: Dict[str, Any]
                            for peer_uuid, peer_desc in fs_desc['peers'].items():
                                peer = {
                                    'uuid': peer_uuid,
                                    'remote': peer_desc['remote'],
                                    'stats': peer_desc['stats']
                                }
                                fs['peers'].append(peer)
                            daemon['filesystems'].append(fs)
                        daemons.append(daemon)
                return 0, json.dumps(daemons), ''
        except MirrorException as me:
            return me.args[0], '', me.args[1]
import errno
import logging
import threading

import rados
import cephfs

from .exception import MirrorException

MIRROR_OBJECT_PREFIX = 'cephfs_mirror'
MIRROR_OBJECT_NAME = MIRROR_OBJECT_PREFIX

INSTANCE_ID_PREFIX = "instance_"
DIRECTORY_MAP_PREFIX = "dir_map_"

log = logging.getLogger(__name__)


def connect_to_cluster(client_name, cluster_name, conf_dct, desc=''):
    """Open and return a connected rados handle.

    :param conf_dct: may carry 'mon_host'/'key' (bootstrap-token style);
                     otherwise the local ceph.conf is read
    :raises MirrorException: ENOENT when the cluster cannot be found
    """
    try:
        log.debug(f'connecting to {desc} cluster: {client_name}/{cluster_name}')
        mon_host = conf_dct.get('mon_host', '')
        cephx_key = conf_dct.get('key', '')
        if mon_host and cephx_key:
            # explicit connection details from a bootstrap token
            r_rados = rados.Rados(rados_id=client_name, conf={'mon_host': mon_host,
                                                              'key': cephx_key})
        else:
            r_rados = rados.Rados(rados_id=client_name, clustername=cluster_name)
            r_rados.conf_read_file()
        r_rados.connect()
        log.debug(f'connected to {desc} cluster')
        return r_rados
    except rados.Error as e:
        if e.errno == errno.ENOENT:
            raise MirrorException(-e.errno, f'cluster {cluster_name} does not exist')
        else:
            log.error(f'error connecting to cluster: {e}')
            raise Exception(-e.errno)


def disconnect_from_cluster(cluster_name, cluster):
    """Best-effort shutdown of a rados connection."""
    try:
        log.debug(f'disconnecting from cluster {cluster_name}')
        cluster.shutdown()
        log.debug(f'disconnected from cluster {cluster_name}')
    except Exception as e:
        log.error(f'error disconnecting: {e}')


def connect_to_filesystem(client_name, cluster_name, fs_name, desc, conf_dct=None):
    """Connect to a cluster and mount a cephfs filesystem.

    :returns: (rados handle, mounted LibCephFS handle)
    :raises MirrorException: ENOENT when the filesystem does not exist
    """
    # avoid a mutable default argument
    if conf_dct is None:
        conf_dct = {}
    try:
        cluster = connect_to_cluster(client_name, cluster_name, conf_dct, desc)
        log.debug(f'connecting to {desc} filesystem: {fs_name}')
        fs = cephfs.LibCephFS(rados_inst=cluster)
        fs.conf_set("client_mount_uid", "0")
        fs.conf_set("client_mount_gid", "0")
        fs.conf_set("client_check_pool_perm", "false")
        log.debug('CephFS initializing...')
        fs.init()
        log.debug('CephFS mounting...')
        fs.mount(filesystem_name=fs_name.encode('utf-8'))
        log.debug(f'Connection to cephfs {fs_name} complete')
        return (cluster, fs)
    except cephfs.Error as e:
        if e.errno == errno.ENOENT:
            raise MirrorException(-e.errno, f'filesystem {fs_name} does not exist')
        else:
            log.error(f'error connecting to filesystem {fs_name}: {e}')
            raise Exception(-e.errno)


def disconnect_from_filesystem(cluster_name, fs_name, cluster, fs_handle):
    """Best-effort unmount of a filesystem and shutdown of its cluster handle."""
    try:
        log.debug(f'disconnecting from filesystem {fs_name}')
        fs_handle.shutdown()
        log.debug(f'disconnected from filesystem {fs_name}')
        disconnect_from_cluster(cluster_name, cluster)
    except Exception as e:
        log.error(f'error disconnecting: {e}')


class _ThreadWrapper(threading.Thread):
    """Worker thread that drains a queue of (callback, args) items in order."""

    def __init__(self, name):
        self.q = []
        self.stopping = threading.Event()
        self.terminated = threading.Event()
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        super().__init__(name=name)
        # the thread starts immediately on construction
        super().start()

    def run(self):
        try:
            with self.lock:
                while True:
                    self.cond.wait_for(lambda: self.q or self.stopping.is_set())
                    if self.stopping.is_set():
                        log.debug('thread exiting')
                        self.terminated.set()
                        self.cond.notify_all()
                        return
                    q = self.q.copy()
                    self.q.clear()
                    # run callbacks without holding the lock so queue()/stop()
                    # are not blocked by long-running callbacks
                    self.lock.release()
                    try:
                        for item in q:
                            log.debug(f'calling {item[0]} params {item[1]}')
                            item[0](*item[1])
                    except Exception as e:
                        log.warning(f'callback exception: {e}')
                    self.lock.acquire()
        except Exception as e:
            log.info(f'threading exception: {e}')

    def queue(self, cbk, args):
        with self.lock:
            self.q.append((cbk, args))
            self.cond.notify_all()

    def stop(self):
        """Signal the worker to exit and wait until it has terminated."""
        with self.lock:
            self.stopping.set()
            self.cond.notify_all()
            self.cond.wait_for(lambda: self.terminated.is_set())


class Finisher:
    """Execute queued callbacks on a dedicated thread."""

    def __init__(self):
        self.lock = threading.Lock()
        self.thread = _ThreadWrapper(name='finisher')

    def queue(self, cbk, args=()):
        # immutable default instead of a shared mutable list
        with self.lock:
            self.thread.queue(cbk, args)


class AsyncOpTracker:
    """Count in-flight async operations and allow waiting for quiescence."""

    def __init__(self):
        self.ops_in_progress = 0
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)

    def start_async_op(self):
        with self.lock:
            self.ops_in_progress += 1
            log.debug(f'start_async_op: {self.ops_in_progress}')

    def finish_async_op(self):
        with self.lock:
            self.ops_in_progress -= 1
            log.debug(f'finish_async_op: {self.ops_in_progress}')
            assert self.ops_in_progress >= 0
            self.cond.notify_all()

    def wait_for_ops(self):
        """Block until every started operation has finished."""
        with self.lock:
            log.debug(f'wait_for_ops: {self.ops_in_progress}')
            self.cond.wait_for(lambda: self.ops_in_progress == 0)
            log.debug('done')