summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/dashboard/tests/test_notification.py
blob: df303e455634f4a4afd63416f233ca1c05937259 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# -*- coding: utf-8 -*-
from __future__ import absolute_import

import random
import time
import unittest

from ..tools import NotificationQueue


class Listener(object):
    """Test helper that records the notifications handed to its callbacks.

    Each ``log_*`` callback keeps two parallel lists: the received values
    (e.g. ``type1``) and the ``time.time()`` stamp taken when the callback
    was invoked (e.g. ``type1_ts``).
    """
    # pylint: disable=too-many-instance-attributes
    def __init__(self):
        self.type1, self.type1_ts = [], []
        self.type2, self.type2_ts = [], []
        self.type1_3, self.type1_3_ts = [], []
        self.all, self.all_ts = [], []

    def register(self):
        """Subscribe all callbacks of this listener to the NotificationQueue."""
        subscribe = NotificationQueue.register

        subscribe(self.log_type1, 'type1', priority=90)
        subscribe(self.log_type2, 'type2')
        subscribe(self.log_type1_3, ['type1', 'type3'])
        subscribe(self.log_all, priority=50)

        # Repeated registrations of callbacks that are already subscribed;
        # these should be ignored by the queue.
        subscribe(self.log_type1, 'type1')
        subscribe(self.log_type1_3, ['type1', 'type3'])
        subscribe(self.log_all)

    def log_type1(self, val):
        """Callback registered for 'type1' notifications."""
        stamp = time.time()
        self.type1_ts.append(stamp)
        self.type1.append(val)

    def log_type2(self, val):
        """Callback registered for 'type2' notifications."""
        stamp = time.time()
        self.type2_ts.append(stamp)
        self.type2.append(val)

    def log_type1_3(self, val):
        """Callback registered for both 'type1' and 'type3' notifications."""
        stamp = time.time()
        self.type1_3_ts.append(stamp)
        self.type1_3.append(val)

    def log_all(self, val):
        """Callback registered without a notification type filter."""
        stamp = time.time()
        self.all_ts.append(stamp)
        self.all.append(val)

    def clear(self):
        """Drop everything recorded so far and unsubscribe every callback."""
        # Fresh lists are bound (rather than emptying the old ones in place).
        self.type1, self.type1_ts = [], []
        self.type2, self.type2_ts = [], []
        self.type1_3, self.type1_3_ts = [], []
        self.all, self.all_ts = [], []

        unsubscribe = NotificationQueue.deregister
        unsubscribe(self.log_type1, 'type1')
        unsubscribe(self.log_type2, 'type2')
        unsubscribe(self.log_type1_3, ['type1', 'type3'])
        unsubscribe(self.log_all)


class NotificationQueueTest(unittest.TestCase):
    """Exercise NotificationQueue registration, dispatch and deregistration.

    A single ``Listener`` is shared by all tests; it is (re)registered
    before every test and cleared/deregistered afterwards.
    """

    @classmethod
    def setUpClass(cls):
        cls.listener = Listener()

    def setUp(self):
        self.listener.register()

    def tearDown(self):
        self.listener.clear()

    def test_invalid_register(self):
        """A non-string, non-list ``n_types`` argument must be rejected."""
        with self.assertRaises(Exception) as raised:
            NotificationQueue.register(None, 1)
        message = str(raised.exception)
        self.assertEqual(message,
                         "n_types param is neither a string nor a list")

    def test_notifications(self):
        """One notification per type reaches exactly the matching callbacks."""
        rec = self.listener

        NotificationQueue.start_queue()
        for n_type, value in (('type1', 1), ('type2', 2), ('type3', 3)):
            NotificationQueue.new_notification(n_type, value)
        NotificationQueue.stop()

        self.assertEqual(rec.type1, [1])
        self.assertEqual(rec.type2, [2])
        self.assertEqual(rec.type1_3, [1, 3])
        self.assertEqual(rec.all, [1, 2, 3])

        # Priority check via call timestamps: callbacks registered without
        # an explicit priority must not run later than log_all (priority=50),
        # which in turn must not run later than log_type1 (priority=90).
        self.assertLessEqual(rec.type1_3_ts[0], rec.all_ts[0])
        self.assertLessEqual(rec.all_ts[0], rec.type1_ts[0])
        self.assertLessEqual(rec.type2_ts[0], rec.all_ts[1])
        self.assertLessEqual(rec.type1_3_ts[1], rec.all_ts[2])

    def test_notifications2(self):
        """600 notifications sent with random pauses are all delivered."""
        rec = self.listener

        NotificationQueue.start_queue()
        for value in range(0, 600):
            n_type = "type{}".format(value % 3 + 1)
            # Pause before roughly half of the sends.
            if random.random() < 0.5:
                time.sleep(0.002)
            NotificationQueue.new_notification(n_type, value)
        NotificationQueue.stop()

        # Logs that must contain a value, keyed by its type number; every
        # value must additionally show up in the catch-all log.
        expected_logs = {
            1: (rec.type1, rec.type1_3),
            2: (rec.type2,),
            3: (rec.type1_3,),
        }
        for value in range(0, 600):
            for log in expected_logs[value % 3 + 1]:
                self.assertIn(value, log)
            self.assertIn(value, rec.all)

        for log, expected_len in ((rec.type1, 200),
                                  (rec.type2, 200),
                                  (rec.type1_3, 400),
                                  (rec.all, 600)):
            self.assertEqual(len(log), expected_len)

    def test_deregister(self):
        """Deregistering one type of a multi-type callback keeps the other."""
        rec = self.listener

        NotificationQueue.start_queue()
        NotificationQueue.new_notification('type1', 1)
        NotificationQueue.new_notification('type3', 3)
        NotificationQueue.stop()
        self.assertEqual(rec.type1, [1])
        self.assertEqual(rec.type1_3, [1, 3])

        # Unsubscribe log_type1_3 from 'type1' only: it must still receive
        # 'type3' notifications, while log_type1 is unaffected.
        NotificationQueue.start_queue()
        NotificationQueue.deregister(rec.log_type1_3, ['type1'])
        NotificationQueue.new_notification('type1', 4)
        NotificationQueue.new_notification('type3', 5)
        NotificationQueue.stop()
        self.assertEqual(rec.type1, [1, 4])
        self.assertEqual(rec.type1_3, [1, 3, 5])