summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/progress/test_progress.py
blob: 47baa177e25b7b59cbb5b7da71ec4e9b297de6d0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
#python unit test
import unittest
import os
import sys
from tests import mock

import pytest
import json
os.environ['UNITTEST'] = "1"
sys.path.insert(0, "../../pybind/mgr")
from progress import module

class TestPgRecoveryEvent(object):
    """Unit tests for module.PgRecoveryEvent."""

    def setup_method(self):
        """Create the event under test, tracking PgId(1, 0) .. PgId(1, 2)."""
        # just so Event._refresh() works
        module._module = mock.Mock()
        tracked_pgs = [module.PgId(1, ps) for ps in range(3)]
        self.test_event = module.PgRecoveryEvent(
            None, None, tracked_pgs, [0], 30, False)

    def test_pg_update(self):
        """The event reports full progress once every PG is active+clean."""
        # Every PG reports the same stats, so build each entry from one
        # template; dict() gives each PG its own copy.
        clean_pg = {
            "state": "active+clean",
            "num_bytes": 10,
            "num_bytes_recovered": 10,
            "reported_epoch": 30,
        }
        pg_dump = {
            "pgs": {"1.%d" % ps: dict(clean_pg) for ps in range(3)},
            "pg_ready": True,
        }

        self.test_event.pg_update(pg_dump, mock.Mock())

        assert self.test_event._progress == 1.0


class OSDMap:
    """Artificial OSD map that gives _osd_in_out everything it needs.

    It is backed by two plain dicts instead of a live cluster; some of the
    functions are copied from mgr_module.

    :param dump: dict returned by dump(); get_pools() and
        get_pools_by_name() read its "pools" list.
    :param pg_stats: dict whose "pg_stats" list holds one entry per PG with
        the keys "pg_id", "up_primary", "acting_primary", "up" and "acting".
    """

    def __init__(self, dump, pg_stats):
        self._dump = dump
        self._pg_stats = pg_stats

    def _pg_to_up_acting_osds(self, pool_id, ps):
        """Return the up/acting mapping of PG "<pool_id>.<ps>".

        :return: dict with "up_primary", "acting_primary", "up" and
            "acting", or None when no entry in pg_stats has that pg_id.
        """
        pg_id = str(pool_id) + "." + str(ps)
        for pg in self._pg_stats["pg_stats"]:
            if pg["pg_id"] == pg_id:
                return {
                    "up_primary": pg["up_primary"],
                    "acting_primary": pg["acting_primary"],
                    "up": pg["up"],
                    "acting": pg["acting"],
                }
        return None

    def dump(self):
        """Return the dump dict this map was created with."""
        return self._dump

    def get_pools(self):
        """Return the pools of the dump keyed by their 'pool' value."""
        # self._dump is the stored dict, not a callable, so the data has to
        # be fetched through dump().
        d = self.dump()
        return {p['pool']: p for p in d['pools']}

    def get_pools_by_name(self):
        """Return the pools of the dump keyed by their 'pool_name' value."""
        d = self.dump()
        return {p['pool_name']: p for p in d['pools']}

    def pg_to_up_acting_osds(self, pool_id, ps):
        """Public wrapper around _pg_to_up_acting_osds()."""
        return self._pg_to_up_acting_osds(pool_id, ps)


class TestModule(object):
    """Unit tests for module.Module, exercising _osd_in_out."""

    def _patch_attr(self, target, name, replacement):
        """Replace target.<name> until teardown_method() runs."""
        # create=True: like the plain assignment this replaces, do not
        # require the attribute to exist on the target beforehand.
        patcher = mock.patch.object(target, name, replacement, create=True)
        patcher.start()
        self._patchers.append(patcher)

    def setup_method(self):
        """Create the Module under test with its collaborators mocked."""
        # Class attributes are replaced through patchers instead of being
        # assigned directly, so that teardown_method() can restore them.
        # Otherwise PgRecoveryEvent.pg_update would stay mocked for every
        # test that runs later in the same session.
        self._patchers = []
        try:
            self._patch_attr(module.PgRecoveryEvent, 'pg_update', mock.Mock())
            self._patch_attr(module.Module, '_ceph_get_option', mock.Mock())  # .__init__
            self._patch_attr(module.Module, '_configure_logging', lambda *args: ...)  # .__init__
            self.test_module = module.Module('module_name', 0, 0)  # so we can see if an event gets created
        except Exception:
            # pytest skips teardown_method() when setup_method() fails, so
            # undo whatever was already patched before re-raising.
            self.teardown_method()
            raise
        self.test_module.get = mock.Mock()  # so we can call pg_update
        self.test_module._complete = mock.Mock()  # we want just to see if this event gets called
        self.test_module.get_osdmap = mock.Mock()  # so that self.get_osdmap().get_epoch() works
        # Plain assignment made after the Module is constructed, same as in
        # TestPgRecoveryEvent.setup_method.
        module._module = mock.Mock()  # so that Event.refresh() works

    def teardown_method(self):
        """Undo the class-level patches applied by setup_method()."""
        while self._patchers:
            self._patchers.pop().stop()

    def test_osd_in_out(self):
        """Marking an OSD out creates one event; marking it in completes it."""
        # PG 1.0 as mapped before the change: primary on osd 3
        old_pg_stats = {
            "pg_stats": [
                {
                    "pg_id": "1.0",
                    "up_primary": 3,
                    "acting_primary": 3,
                    "up": [3, 0],
                    "acting": [3, 0],
                },
            ]
        }
        # PG 1.0 as mapped after the change: osd 3 no longer in up/acting
        new_pg_stats = {
            "pg_stats": [
                {
                    "pg_id": "1.0",
                    "up_primary": 0,
                    "acting_primary": 0,
                    "up": [0, 2],
                    "acting": [0, 2],
                },
            ]
        }

        old_dump = {
            "pools": [
                {
                    "pool": 1,
                    "pg_num": 1,
                },
            ]
        }
        new_dump = {
            "pools": [
                {
                    "pool": 1,
                    "pg_num": 1,
                },
            ]
        }

        new_map = OSDMap(new_dump, new_pg_stats)
        old_map = OSDMap(old_dump, old_pg_stats)
        self.test_module._osd_in_out(old_map, old_dump, new_map, 3, "out")
        # check if only one event is created
        assert len(self.test_module._events) == 1
        self.test_module._osd_in_out(old_map, old_dump, new_map, 3, "in")
        # check if complete function is called
        assert self.test_module._complete.call_count == 1
        # check if a PgRecovery Event was created and pg_update gets triggered
        assert module.PgRecoveryEvent.pg_update.call_count == 2