1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
import errno
import json
import logging
import threading
import time
import rados
from .utils import MIRROR_OBJECT_PREFIX, AsyncOpTracker
# Module-level logger, named after this module via __name__.
log = logging.getLogger(__name__)
class Notifier:
    """Issue asynchronous notifies addressed to a single mirror instance."""

    def __init__(self, ioctx):
        # I/O context on which aio_notify() is invoked.
        self.ioctx = ioctx

    @staticmethod
    def instance_object(instance_id):
        """Return the object name for *instance_id*: the mirror prefix, a dot, the id."""
        return f'{MIRROR_OBJECT_PREFIX}.{instance_id}'

    def notify_cbk(self, dir_path, callback):
        """Build the completion handler handed to aio_notify().

        The returned function logs the completion details and then forwards
        only ``dir_path`` and the return code ``r`` to ``callback``.
        """
        def _on_complete(_, r, acks, timeouts):
            log.debug(f'Notifier.notify_cbk: ret {r} acks: {acks} timeouts: {timeouts}')
            callback(dir_path, r)

        return _on_complete

    def notify(self, dir_path, message, callback):
        """Send ``message[1]`` to the instance identified by ``message[0]``.

        ``callback(dir_path, r)`` is invoked when the notify completes.
        A ``rados.Error`` raised while issuing the notify is logged and
        re-raised to the caller.
        """
        try:
            # message is indexed as (instance id, payload).
            instance_id, message = message[0], message[1]
            log.debug(f'Notifier.notify: {instance_id} {message} for {dir_path}')
            target = Notifier.instance_object(instance_id)
            on_complete = self.notify_cbk(dir_path, callback)
            self.ioctx.aio_notify(target, on_complete, msg=message)
        except rados.Error as e:
            log.error(f'Notifier exception: {e}')
            raise e
class InstanceWatcher:
    """Track live mirror instances by periodically notifying the mirror object.

    Every NOTIFY_INTERVAL seconds a timer fires ``notify()``, which issues an
    asynchronous notify on ``MIRROR_OBJECT_PREFIX``.  The completion
    (``handle_notify``) records which instances acknowledged, drops the ones
    that have not been seen for INSTANCE_TIMEOUT seconds, reports the changes
    to the listener and re-arms the timer.
    """

    # Seconds without an ack after which an instance is considered gone.
    INSTANCE_TIMEOUT = 30
    # Delay, in seconds, between the end of one notify round and the next.
    NOTIFY_INTERVAL = 1

    class Listener:
        """Interface for receiving instance membership changes."""

        def handle_instances(self, added, removed):
            """Called with two dicts mapping instance id -> address."""
            raise NotImplementedError()

    def __init__(self, ioctx, instances, listener):
        """Seed the tracker with ``instances`` and start the notify timer.

        ``instances`` maps instance id -> dict with at least an ``'addr'``
        key; every seeded instance is treated as seen "now".
        """
        self.ioctx = ioctx
        self.listener = listener
        self.instances = {}
        for instance_id, data in instances.items():
            self.instances[instance_id] = {'addr': data['addr'],
                                           'seen': time.time()}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        # Set once shutdown was requested and no instances remain.
        self.done = threading.Event()
        # Set by wait_and_stop() to request shutdown.
        self.waiting = threading.Event()
        self.notify_task = None
        self.schedule_notify_task()

    def schedule_notify_task(self):
        """Arm a one-shot timer that runs ``notify()`` after NOTIFY_INTERVAL."""
        assert self.notify_task is None
        self.notify_task = threading.Timer(InstanceWatcher.NOTIFY_INTERVAL, self.notify)
        self.notify_task.start()

    def wait_and_stop(self):
        """Request shutdown and block until ``handle_notify`` signals completion.

        While ``waiting`` is set, ``handle_notify`` removes every instance that
        did not acknowledge the latest notify and, once none remain, sets
        ``done`` and wakes this waiter instead of re-arming the timer.
        """
        with self.lock:
            log.info('InstanceWatcher.wait_and_stop')
            self.waiting.set()
            # wait_for() releases the lock while blocked.
            self.cond.wait_for(lambda: self.done.is_set())
            log.info('waiting done')
            assert self.notify_task is None

    def handle_notify(self, _, r, acks, timeouts):
        """Completion callback for the notify issued by ``notify()``.

        Each ack is indexed as ``ack[0]`` (instance id) and ``ack[2]`` (bytes
        holding a JSON document with an ``'addr'`` key).  ``r`` and
        ``timeouts`` are only logged.  The listener is invoked with the lock
        held.  Any exception raised while processing is logged and swallowed
        so that the timer is always re-armed (or shutdown signalled).
        """
        log.debug(f'InstanceWatcher.handle_notify r={r} acks={acks} timeouts={timeouts}')
        with self.lock:
            try:
                added = {}
                removed = {}
                if acks is None:
                    acks = []
                # Only used for membership tests below, hence a set.
                ackd_instances = set()
                for ack in acks:
                    instance_id = str(ack[0])
                    ackd_instances.add(instance_id)
                    # sender data is quoted
                    notifier_data = json.loads(ack[2].decode('utf-8'))
                    log.debug(f'InstanceWatcher.handle_notify: {instance_id}: {notifier_data}')
                    # Resolve the address before touching self.instances so a
                    # payload without 'addr' cannot leave behind a half-built
                    # entry (no 'addr'/'seen') that would break later rounds
                    # and hide the instance from the listener.
                    addr = notifier_data['addr']
                    if instance_id not in self.instances:
                        self.instances[instance_id] = {}
                        added[instance_id] = addr
                    self.instances[instance_id]['addr'] = addr
                    self.instances[instance_id]['seen'] = time.time()
                # gather non responders
                now = time.time()
                for instance_id in list(self.instances.keys()):
                    data = self.instances[instance_id]
                    timed_out = now - data['seen'] > InstanceWatcher.INSTANCE_TIMEOUT
                    # During shutdown, a single missed ack is enough to drop.
                    unacked_on_stop = self.waiting.is_set() and instance_id not in ackd_instances
                    if timed_out or unacked_on_stop:
                        removed[instance_id] = data['addr']
                        self.instances.pop(instance_id)
                if added or removed:
                    self.listener.handle_instances(added, removed)
            except Exception as e:
                log.warning(f'InstanceWatcher.handle_notify exception: {e}')
            finally:
                if not self.instances and self.waiting.is_set():
                    self.done.set()
                    self.cond.notify_all()
                else:
                    self.schedule_notify_task()

    def notify(self):
        """Timer body: clear the pending task and issue the next notify.

        If issuing the notify raises ``rados.Error`` the error is logged and
        the timer is re-armed so another attempt is made later.
        """
        with self.lock:
            self.notify_task = None
        try:
            log.debug('InstanceWatcher.notify')
            self.ioctx.aio_notify(MIRROR_OBJECT_PREFIX, self.handle_notify)
        except rados.Error as e:
            log.warning(f'InstanceWatcher exception: {e}')
            self.schedule_notify_task()
|