diff options
Diffstat (limited to '')
-rw-r--r-- | src/pybind/mgr/mirroring/fs/dir_map/update.py | 151 |
1 files changed, 151 insertions, 0 deletions
diff --git a/src/pybind/mgr/mirroring/fs/dir_map/update.py b/src/pybind/mgr/mirroring/fs/dir_map/update.py new file mode 100644 index 000000000..a70baa01a --- /dev/null +++ b/src/pybind/mgr/mirroring/fs/dir_map/update.py @@ -0,0 +1,151 @@ +import errno +import pickle +import logging + +import rados + +from ..utils import MIRROR_OBJECT_NAME, DIRECTORY_MAP_PREFIX, \ + INSTANCE_ID_PREFIX, MIRROR_OBJECT_PREFIX + +log = logging.getLogger(__name__) + +MAX_UPDATE = 256 + +class UpdateDirMapRequest: + def __init__(self, ioctx, update_mapping, removals, on_finish_callback): + self.ioctx = ioctx + self.update_mapping = update_mapping + self.removals = removals + self.on_finish_callback = on_finish_callback + + @staticmethod + def omap_key(dir_path): + return f'{DIRECTORY_MAP_PREFIX}{dir_path}' + + def send(self): + log.info('updating image map') + self.send_update() + + def send_update(self): + log.debug(f'pending updates: {len(self.update_mapping)}+{len(self.removals)}') + try: + with rados.WriteOpCtx() as write_op: + keys = [] + vals = [] + dir_keys = list(self.update_mapping.keys())[0:MAX_UPDATE] + # gather updates + for dir_path in dir_keys: + mapping = self.update_mapping.pop(dir_path) + keys.append(UpdateDirMapRequest.omap_key(dir_path)) + vals.append(pickle.dumps(mapping)) + self.ioctx.set_omap(write_op, tuple(keys), tuple(vals)) + # gather deletes + slicept = MAX_UPDATE - len(dir_keys) + removals = [UpdateDirMapRequest.omap_key(dir_path) for dir_path in self.removals[0:slicept]] + self.removals = self.removals[slicept:] + self.ioctx.remove_omap_keys(write_op, tuple(removals)) + log.debug(f'applying {len(keys)} updates, {len(removals)} deletes') + self.ioctx.operate_aio_write_op(write_op, MIRROR_OBJECT_NAME, oncomplete=self.handle_update) + except rados.Error as e: + log.error(f'UpdateDirMapRequest.send_update exception: {e}') + self.finish(-e.args[0]) + + def handle_update(self, completion): + r = completion.get_return_value() + log.debug(f'handle_update: r={r}') + if not r == 0: + self.finish(r) + elif self.update_mapping or self.removals: + self.send_update() + else: + self.finish(0) + + def finish(self, r): + log.info(f'finish: r={r}') + self.on_finish_callback(r) + +class UpdateInstanceRequest: + def __init__(self, ioctx, instances_added, instances_removed, on_finish_callback): + self.ioctx = ioctx + self.instances_added = instances_added + # purge vs remove: purge list is for purging on-disk instance + # object. remove is for purging instance map. + self.instances_removed = instances_removed.copy() + self.instances_purge = instances_removed.copy() + self.on_finish_callback = on_finish_callback + + @staticmethod + def omap_key(instance_id): + return f'{INSTANCE_ID_PREFIX}{instance_id}' + + @staticmethod + def cephfs_mirror_object_name(instance_id): + assert instance_id != '' + return f'{MIRROR_OBJECT_PREFIX}.{instance_id}' + + def send(self): + log.info('updating instances') + self.send_update() + + def send_update(self): + self.remove_instance_object() + + def remove_instance_object(self): + log.debug(f'pending purges: {len(self.instances_purge)}') + if not self.instances_purge: + self.update_instance_map() + return + instance_id = self.instances_purge.pop() + self.ioctx.aio_remove( + UpdateInstanceRequest.cephfs_mirror_object_name(instance_id), oncomplete=self.handle_remove) + + def handle_remove(self, completion): + r = completion.get_return_value() + log.debug(f'handle_remove: r={r}') + # cephfs-mirror instances remove their respective instance + # objects upon termination. so we handle ENOENT here. note + # that when an instance is blocklisted, it wont be able to + # purge its instance object, so we do it on its behalf. + if not r == 0 and not r == -errno.ENOENT: + self.finish(r) + return + self.remove_instance_object() + + def update_instance_map(self): + log.debug(f'pending updates: {len(self.instances_added)}+{len(self.instances_removed)}') + try: + with rados.WriteOpCtx() as write_op: + keys = [] + vals = [] + instance_ids = list(self.instances_added.keys())[0:MAX_UPDATE] + # gather updates + for instance_id in instance_ids: + data = self.instances_added.pop(instance_id) + keys.append(UpdateInstanceRequest.omap_key(instance_id)) + vals.append(pickle.dumps(data)) + self.ioctx.set_omap(write_op, tuple(keys), tuple(vals)) + # gather deletes + slicept = MAX_UPDATE - len(instance_ids) + removals = [UpdateInstanceRequest.omap_key(instance_id) \ + for instance_id in self.instances_removed[0:slicept]] + self.instances_removed = self.instances_removed[slicept:] + self.ioctx.remove_omap_keys(write_op, tuple(removals)) + log.debug(f'applying {len(keys)} updates, {len(removals)} deletes') + self.ioctx.operate_aio_write_op(write_op, MIRROR_OBJECT_NAME, oncomplete=self.handle_update) + except rados.Error as e: + log.error(f'UpdateInstanceRequest.update_instance_map exception: {e}') + self.finish(-e.args[0]) + + def handle_update(self, completion): + r = completion.get_return_value() + log.debug(f'handle_update: r={r}') + if not r == 0: + self.finish(r) + elif self.instances_added or self.instances_removed: + self.update_instance_map() + else: + self.finish(0) + + def finish(self, r): + log.info(f'finish: r={r}') + self.on_finish_callback(r) |