summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/mirroring/fs/notify.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/pybind/mgr/mirroring/fs/notify.py')
-rw-r--r--src/pybind/mgr/mirroring/fs/notify.py121
1 files changed, 121 insertions, 0 deletions
diff --git a/src/pybind/mgr/mirroring/fs/notify.py b/src/pybind/mgr/mirroring/fs/notify.py
new file mode 100644
index 000000000..992cba297
--- /dev/null
+++ b/src/pybind/mgr/mirroring/fs/notify.py
@@ -0,0 +1,121 @@
+import errno
+import json
+import logging
+import threading
+import time
+
+import rados
+
+from .utils import MIRROR_OBJECT_PREFIX, AsyncOpTracker
+
+log = logging.getLogger(__name__)
+
+class Notifier:
+ def __init__(self, ioctx):
+ self.ioctx = ioctx
+
+ @staticmethod
+ def instance_object(instance_id):
+ return f'{MIRROR_OBJECT_PREFIX}.{instance_id}'
+
+ def notify_cbk(self, dir_path, callback):
+ def cbk(_, r, acks, timeouts):
+ log.debug(f'Notifier.notify_cbk: ret {r} acks: {acks} timeouts: {timeouts}')
+ callback(dir_path, r)
+ return cbk
+
+ def notify(self, dir_path, message, callback):
+ try:
+ instance_id = message[0]
+ message = message[1]
+ log.debug(f'Notifier.notify: {instance_id} {message} for {dir_path}')
+ self.ioctx.aio_notify(
+ Notifier.instance_object(
+ instance_id), self.notify_cbk(dir_path, callback), msg=message)
+ except rados.Error as e:
+ log.error(f'Notifier exception: {e}')
+ raise e
+
+class InstanceWatcher:
+ INSTANCE_TIMEOUT = 30
+ NOTIFY_INTERVAL = 1
+
+ class Listener:
+ def handle_instances(self, added, removed):
+ raise NotImplementedError()
+
+ def __init__(self, ioctx, instances, listener):
+ self.ioctx = ioctx
+ self.listener = listener
+ self.instances = {}
+ for instance_id, data in instances.items():
+ self.instances[instance_id] = {'addr': data['addr'],
+ 'seen': time.time()}
+ self.lock = threading.Lock()
+ self.cond = threading.Condition(self.lock)
+ self.done = threading.Event()
+ self.waiting = threading.Event()
+ self.notify_task = None
+ self.schedule_notify_task()
+
+ def schedule_notify_task(self):
+ assert self.notify_task == None
+ self.notify_task = threading.Timer(InstanceWatcher.NOTIFY_INTERVAL, self.notify)
+ self.notify_task.start()
+
+ def wait_and_stop(self):
+ with self.lock:
+ log.info('InstanceWatcher.wait_and_stop')
+ self.waiting.set()
+ self.cond.wait_for(lambda: self.done.is_set())
+ log.info('waiting done')
+ assert self.notify_task == None
+
+ def handle_notify(self, _, r, acks, timeouts):
+ log.debug(f'InstanceWatcher.handle_notify r={r} acks={acks} timeouts={timeouts}')
+ with self.lock:
+ try:
+ added = {}
+ removed = {}
+ if acks is None:
+ acks = []
+ ackd_instances = []
+ for ack in acks:
+ instance_id = str(ack[0])
+ ackd_instances.append(instance_id)
+ # sender data is quoted
+ notifier_data = json.loads(ack[2].decode('utf-8'))
+ log.debug(f'InstanceWatcher.handle_notify: {instance_id}: {notifier_data}')
+ if not instance_id in self.instances:
+ self.instances[instance_id] = {}
+ added[instance_id] = notifier_data['addr']
+ self.instances[instance_id]['addr'] = notifier_data['addr']
+ self.instances[instance_id]['seen'] = time.time()
+ # gather non responders
+ now = time.time()
+ for instance_id in list(self.instances.keys()):
+ data = self.instances[instance_id]
+ if (now - data['seen'] > InstanceWatcher.INSTANCE_TIMEOUT) or \
+ (self.waiting.is_set() and instance_id not in ackd_instances):
+ removed[instance_id] = data['addr']
+ self.instances.pop(instance_id)
+ if added or removed:
+ self.listener.handle_instances(added, removed)
+ except Exception as e:
+ log.warn(f'InstanceWatcher.handle_notify exception: {e}')
+ finally:
+ if not self.instances and self.waiting.is_set():
+ self.done.set()
+ self.cond.notifyAll()
+ else:
+ self.schedule_notify_task()
+
+ def notify(self):
+ with self.lock:
+ self.notify_task = None
+ try:
+ log.debug('InstanceWatcher.notify')
+ self.ioctx.aio_notify(MIRROR_OBJECT_PREFIX, self.handle_notify)
+ except rados.Error as e:
+ log.warn(f'InstanceWatcher exception: {e}')
+ self.schedule_notify_task()