summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/mirroring/fs/notify.py
blob: 992cba2973e1106c2d6ec2fed3264f034910bc4d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import errno
import json
import logging
import threading
import time

import rados

from .utils import MIRROR_OBJECT_PREFIX, AsyncOpTracker

log = logging.getLogger(__name__)

class Notifier:
    """Send asynchronous RADOS notifications to individual mirror instances.

    Each instance is addressed through its own object named
    ``<MIRROR_OBJECT_PREFIX>.<instance_id>`` (see :meth:`instance_object`).
    """

    def __init__(self, ioctx):
        # RADOS I/O context used to issue the notifications.
        self.ioctx = ioctx

    @staticmethod
    def instance_object(instance_id):
        """Return the name of the object that ``instance_id`` is notified through."""
        return f'{MIRROR_OBJECT_PREFIX}.{instance_id}'

    def notify_cbk(self, dir_path, callback):
        """Build a completion callback suitable for ``ioctx.aio_notify``.

        The returned callable logs the notify outcome and then invokes
        ``callback(dir_path, r)``, forwarding the result code ``r`` it received.
        """
        def cbk(_, r, acks, timeouts):
            log.debug(f'Notifier.notify_cbk: ret {r} acks: {acks} timeouts: {timeouts}')
            callback(dir_path, r)
        return cbk

    def notify(self, dir_path, message, callback):
        """Asynchronously notify a single instance about ``dir_path``.

        :param dir_path: directory the notification concerns; handed back to
            ``callback`` on completion.
        :param message: indexable pair whose element 0 is the target instance
            id and element 1 is the payload sent as the notify message.
        :param callback: invoked as ``callback(dir_path, r)`` on completion.
        :raises rados.Error: if submitting the notification fails; the error
            is logged and re-raised unchanged.
        """
        try:
            instance_id = message[0]
            message = message[1]
            log.debug(f'Notifier.notify: {instance_id} {message} for {dir_path}')
            self.ioctx.aio_notify(
                Notifier.instance_object(
                    instance_id), self.notify_cbk(dir_path, callback), msg=message)
        except rados.Error as e:
            log.error(f'Notifier exception: {e}')
            # Bare raise preserves the original traceback without adding a frame.
            raise

class InstanceWatcher:
    """Track live mirror instances by periodically notifying a shared object.

    Every ``NOTIFY_INTERVAL`` seconds a notification is sent to the
    ``MIRROR_OBJECT_PREFIX`` object. Instances that acknowledge it are
    recorded together with the ``addr`` they report. Instances not heard
    from for more than ``INSTANCE_TIMEOUT`` seconds are dropped. Additions
    and removals are reported via ``listener.handle_instances(added, removed)``.
    """
    # Seconds without an ack after which an instance is treated as gone.
    INSTANCE_TIMEOUT = 30
    # Delay in seconds before each notification round.
    NOTIFY_INTERVAL = 1

    class Listener:
        def handle_instances(self, added, removed):
            """React to instance membership changes.

            ``added`` and ``removed`` map instance id -> reported address.
            """
            raise NotImplementedError()

    def __init__(self, ioctx, instances, listener):
        """Start watching.

        :param ioctx: RADOS I/O context used for notifications.
        :param instances: initially known instances, mapping instance id to a
            dict holding at least an ``'addr'`` entry.
        :param listener: object implementing :class:`InstanceWatcher.Listener`.
        """
        self.ioctx = ioctx
        self.listener = listener
        # instance_id -> {'addr': <address>, 'seen': <time.time() of last ack>}
        self.instances = {}
        for instance_id, data in instances.items():
            self.instances[instance_id] = {'addr': data['addr'],
                                           'seen': time.time()}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        # Set once shutdown has completed (no instances left while waiting).
        self.done = threading.Event()
        # Set by wait_and_stop() to request shutdown.
        self.waiting = threading.Event()
        self.notify_task = None
        self.schedule_notify_task()

    def schedule_notify_task(self):
        """Arm a timer that runs :meth:`notify` after ``NOTIFY_INTERVAL`` seconds."""
        assert self.notify_task is None
        self.notify_task = threading.Timer(InstanceWatcher.NOTIFY_INTERVAL, self.notify)
        self.notify_task.start()

    def wait_and_stop(self):
        """Request shutdown and block until no tracked instances remain.

        While shutdown is requested, any instance that fails to ack a round
        is removed immediately. This returns once the instance table is empty.
        """
        with self.lock:
            log.info('InstanceWatcher.wait_and_stop')
            self.waiting.set()
            self.cond.wait_for(self.done.is_set)
            log.info('waiting done')
            assert self.notify_task is None

    def handle_notify(self, _, r, acks, timeouts):
        """Completion callback for the periodic notify.

        Each ack is expected to carry JSON data with an ``'addr'`` key in
        ``ack[2]``. Newly seen instances are added, stale ones removed, and the
        listener is told about any change. Afterwards either the next round is
        scheduled or, during shutdown with no instances left, waiters are woken.
        """
        log.debug(f'InstanceWatcher.handle_notify r={r} acks={acks} timeouts={timeouts}')
        with self.lock:
            try:
                added = {}
                removed = {}
                if acks is None:
                    acks = []
                ackd_instances = set()
                for ack in acks:
                    instance_id = str(ack[0])
                    ackd_instances.add(instance_id)
                    # sender data is quoted
                    notifier_data = json.loads(ack[2].decode('utf-8'))
                    log.debug(f'InstanceWatcher.handle_notify: {instance_id}: {notifier_data}')
                    # Resolve the address before touching self.instances so a
                    # malformed ack cannot leave a half-initialized entry
                    # (missing 'addr'/'seen') that would break later rounds.
                    addr = notifier_data['addr']
                    if instance_id not in self.instances:
                        self.instances[instance_id] = {}
                        added[instance_id] = addr
                    self.instances[instance_id]['addr'] = addr
                    self.instances[instance_id]['seen'] = time.time()
                # gather non responders
                now = time.time()
                for instance_id in list(self.instances.keys()):
                    data = self.instances[instance_id]
                    if (now - data['seen'] > InstanceWatcher.INSTANCE_TIMEOUT) or \
                       (self.waiting.is_set() and instance_id not in ackd_instances):
                        removed[instance_id] = data['addr']
                        self.instances.pop(instance_id)
                if added or removed:
                    self.listener.handle_instances(added, removed)
            except Exception as e:
                log.warning(f'InstanceWatcher.handle_notify exception: {e}')
            finally:
                if not self.instances and self.waiting.is_set():
                    self.done.set()
                    self.cond.notify_all()
                else:
                    self.schedule_notify_task()

    def notify(self):
        """Timer callback: send one notification round to the shared object.

        If submission fails with ``rados.Error`` the failure is logged and the
        next round is scheduled. Otherwise :meth:`handle_notify` schedules it.
        """
        with self.lock:
            self.notify_task = None
            try:
                log.debug('InstanceWatcher.notify')
                self.ioctx.aio_notify(MIRROR_OBJECT_PREFIX, self.handle_notify)
            except rados.Error as e:
                log.warning(f'InstanceWatcher exception: {e}')
                self.schedule_notify_task()
            # NOTE(review): any non-rados exception here escapes without
            # rescheduling, which would stall wait_and_stop(); confirm intent.