summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/mirroring/fs/dir_map/update.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/pybind/mgr/mirroring/fs/dir_map/update.py')
-rw-r--r--src/pybind/mgr/mirroring/fs/dir_map/update.py151
1 files changed, 151 insertions, 0 deletions
diff --git a/src/pybind/mgr/mirroring/fs/dir_map/update.py b/src/pybind/mgr/mirroring/fs/dir_map/update.py
new file mode 100644
index 000000000..a70baa01a
--- /dev/null
+++ b/src/pybind/mgr/mirroring/fs/dir_map/update.py
@@ -0,0 +1,151 @@
+import errno
+import pickle
+import logging
+
+import rados
+
+from ..utils import MIRROR_OBJECT_NAME, DIRECTORY_MAP_PREFIX, \
+ INSTANCE_ID_PREFIX, MIRROR_OBJECT_PREFIX
+
+log = logging.getLogger(__name__)
+
+MAX_UPDATE = 256
+
+class UpdateDirMapRequest:
+ def __init__(self, ioctx, update_mapping, removals, on_finish_callback):
+ self.ioctx = ioctx
+ self.update_mapping = update_mapping
+ self.removals = removals
+ self.on_finish_callback = on_finish_callback
+
+ @staticmethod
+ def omap_key(dir_path):
+ return f'{DIRECTORY_MAP_PREFIX}{dir_path}'
+
+ def send(self):
+ log.info('updating image map')
+ self.send_update()
+
+ def send_update(self):
+ log.debug(f'pending updates: {len(self.update_mapping)}+{len(self.removals)}')
+ try:
+ with rados.WriteOpCtx() as write_op:
+ keys = []
+ vals = []
+ dir_keys = list(self.update_mapping.keys())[0:MAX_UPDATE]
+ # gather updates
+ for dir_path in dir_keys:
+ mapping = self.update_mapping.pop(dir_path)
+ keys.append(UpdateDirMapRequest.omap_key(dir_path))
+ vals.append(pickle.dumps(mapping))
+ self.ioctx.set_omap(write_op, tuple(keys), tuple(vals))
+ # gather deletes
+ slicept = MAX_UPDATE - len(dir_keys)
+ removals = [UpdateDirMapRequest.omap_key(dir_path) for dir_path in self.removals[0:slicept]]
+ self.removals = self.removals[slicept:]
+ self.ioctx.remove_omap_keys(write_op, tuple(removals))
+ log.debug(f'applying {len(keys)} updates, {len(removals)} deletes')
+ self.ioctx.operate_aio_write_op(write_op, MIRROR_OBJECT_NAME, oncomplete=self.handle_update)
+ except rados.Error as e:
+ log.error(f'UpdateDirMapRequest.send_update exception: {e}')
+ self.finish(-e.args[0])
+
+ def handle_update(self, completion):
+ r = completion.get_return_value()
+ log.debug(f'handle_update: r={r}')
+ if not r == 0:
+ self.finish(r)
+ elif self.update_mapping or self.removals:
+ self.send_update()
+ else:
+ self.finish(0)
+
+ def finish(self, r):
+ log.info(f'finish: r={r}')
+ self.on_finish_callback(r)
+
+class UpdateInstanceRequest:
+ def __init__(self, ioctx, instances_added, instances_removed, on_finish_callback):
+ self.ioctx = ioctx
+ self.instances_added = instances_added
+ # purge vs remove: purge list is for purging on-disk instance
+ # object. remove is for purging instance map.
+ self.instances_removed = instances_removed.copy()
+ self.instances_purge = instances_removed.copy()
+ self.on_finish_callback = on_finish_callback
+
+ @staticmethod
+ def omap_key(instance_id):
+ return f'{INSTANCE_ID_PREFIX}{instance_id}'
+
+ @staticmethod
+ def cephfs_mirror_object_name(instance_id):
+ assert instance_id != ''
+ return f'{MIRROR_OBJECT_PREFIX}.{instance_id}'
+
+ def send(self):
+ log.info('updating instances')
+ self.send_update()
+
+ def send_update(self):
+ self.remove_instance_object()
+
+ def remove_instance_object(self):
+ log.debug(f'pending purges: {len(self.instances_purge)}')
+ if not self.instances_purge:
+ self.update_instance_map()
+ return
+ instance_id = self.instances_purge.pop()
+ self.ioctx.aio_remove(
+ UpdateInstanceRequest.cephfs_mirror_object_name(instance_id), oncomplete=self.handle_remove)
+
+ def handle_remove(self, completion):
+ r = completion.get_return_value()
+ log.debug(f'handle_remove: r={r}')
+ # cephfs-mirror instances remove their respective instance
+ # objects upon termination. so we handle ENOENT here. note
+ # that when an instance is blocklisted, it wont be able to
+ # purge its instance object, so we do it on its behalf.
+ if not r == 0 and not r == -errno.ENOENT:
+ self.finish(r)
+ return
+ self.remove_instance_object()
+
+ def update_instance_map(self):
+ log.debug(f'pending updates: {len(self.instances_added)}+{len(self.instances_removed)}')
+ try:
+ with rados.WriteOpCtx() as write_op:
+ keys = []
+ vals = []
+ instance_ids = list(self.instances_added.keys())[0:MAX_UPDATE]
+ # gather updates
+ for instance_id in instance_ids:
+ data = self.instances_added.pop(instance_id)
+ keys.append(UpdateInstanceRequest.omap_key(instance_id))
+ vals.append(pickle.dumps(data))
+ self.ioctx.set_omap(write_op, tuple(keys), tuple(vals))
+ # gather deletes
+ slicept = MAX_UPDATE - len(instance_ids)
+ removals = [UpdateInstanceRequest.omap_key(instance_id) \
+ for instance_id in self.instances_removed[0:slicept]]
+ self.instances_removed = self.instances_removed[slicept:]
+ self.ioctx.remove_omap_keys(write_op, tuple(removals))
+ log.debug(f'applying {len(keys)} updates, {len(removals)} deletes')
+ self.ioctx.operate_aio_write_op(write_op, MIRROR_OBJECT_NAME, oncomplete=self.handle_update)
+ except rados.Error as e:
+ log.error(f'UpdateInstanceRequest.update_instance_map exception: {e}')
+ self.finish(-e.args[0])
+
+ def handle_update(self, completion):
+ r = completion.get_return_value()
+ log.debug(f'handle_update: r={r}')
+ if not r == 0:
+ self.finish(r)
+ elif self.instances_added or self.instances_removed:
+ self.update_instance_map()
+ else:
+ self.finish(0)
+
+ def finish(self, r):
+ log.info(f'finish: r={r}')
+ self.on_finish_callback(r)