From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/pybind/mgr/progress/test_progress.py | 174 +++++++++++++++++++++++++++++++ 1 file changed, 174 insertions(+) create mode 100644 src/pybind/mgr/progress/test_progress.py (limited to 'src/pybind/mgr/progress/test_progress.py') diff --git a/src/pybind/mgr/progress/test_progress.py b/src/pybind/mgr/progress/test_progress.py new file mode 100644 index 000000000..47baa177e --- /dev/null +++ b/src/pybind/mgr/progress/test_progress.py @@ -0,0 +1,174 @@ +#python unit test +import unittest +import os +import sys +from tests import mock + +import pytest +import json +os.environ['UNITTEST'] = "1" +sys.path.insert(0, "../../pybind/mgr") +from progress import module + +class TestPgRecoveryEvent(object): + # Testing PgRecoveryEvent class + + def setup_method(self): + # Creating the class and Mocking + # a bunch of attributes for testing + module._module = mock.Mock() # just so Event._refresh() works + self.test_event = module.PgRecoveryEvent(None, None, [module.PgId(1,i) for i in range(3)], [0], 30, False) + + def test_pg_update(self): + # Test for a completed event when the pg states show active+clean + pg_progress = { + "pgs": { + "1.0": { + "state": "active+clean", + "num_bytes": 10, + "num_bytes_recovered": 10, + "reported_epoch": 30, + }, + "1.1": { + "state": "active+clean", + "num_bytes": 10, + "num_bytes_recovered": 10, + "reported_epoch": 30, + }, + "1.2": { + "state": "active+clean", + "num_bytes": 10, + "num_bytes_recovered": 10, + "reported_epoch": 30, + }, + }, + "pg_ready": True, + } + self.test_event.pg_update(pg_progress, mock.Mock()) + assert self.test_event._progress == 1.0 + + +class OSDMap: + + # This is an artificial class to help + # _osd_in_out function have all the + # necessary characteristics, some + # of the funcitons are copied from + # mgr_module + + def __init__(self, dump, pg_stats): + self._dump = dump + self._pg_stats = pg_stats + + def _pg_to_up_acting_osds(self, pool_id, ps): + pg_id = str(pool_id) + "." + str(ps) + for pg in self._pg_stats["pg_stats"]: + if pg["pg_id"] == pg_id: + ret = { + "up_primary": pg["up_primary"], + "acting_primary": pg["acting_primary"], + "up": pg["up"], + "acting": pg["acting"] + } + return ret + + def dump(self): + return self._dump + + def get_pools(self): + d = self._dump() + return dict([(p['pool'], p) for p in d['pools']]) + + def get_pools_by_name(self): + d = self._dump() + return dict([(p['pool_name'], p) for p in d['pools']]) + + def pg_to_up_acting_osds(self, pool_id, ps): + return self._pg_to_up_acting_osds(pool_id, ps) + + +class TestModule(object): + # Testing Module Class + + def setup_method(self): + # Creating the class and Mocking a + # bunch of attributes for testing + + module.PgRecoveryEvent.pg_update = mock.Mock() + module.Module._ceph_get_option = mock.Mock() # .__init__ + module.Module._configure_logging = lambda *args: ... # .__init__ + self.test_module = module.Module('module_name', 0, 0) # so we can see if an event gets created + self.test_module.get = mock.Mock() # so we can call pg_update + self.test_module._complete = mock.Mock() # we want just to see if this event gets called + self.test_module.get_osdmap = mock.Mock() # so that self.get_osdmap().get_epoch() works + module._module = mock.Mock() # so that Event.refresh() works + + def test_osd_in_out(self): + # test for the correct event being + # triggered and completed. + + old_pg_stats = { + "pg_stats":[ + { + "pg_id": "1.0", + "up_primary": 3, + "acting_primary": 3, + "up": [ + 3, + 0 + ], + "acting": [ + 3, + 0 + ] + + }, + + ] + } + new_pg_stats = { + "pg_stats":[ + { + "pg_id": "1.0", + "up_primary": 0, + "acting_primary": 0, + "up": [ + 0, + 2 + ], + "acting": [ + 0, + 2 + ] + }, + ] + } + + old_dump ={ + "pools": [ + { + "pool": 1, + "pg_num": 1 + } + ] + } + + new_dump = { + "pools": [ + { + "pool": 1, + "pg_num": 1 + } + ] + } + + new_map = OSDMap(new_dump, new_pg_stats) + old_map = OSDMap(old_dump, old_pg_stats) + self.test_module._osd_in_out(old_map, old_dump, new_map, 3, "out") + # check if only one event is created + assert len(self.test_module._events) == 1 + self.test_module._osd_in_out(old_map, old_dump, new_map, 3, "in") + # check if complete function is called + assert self.test_module._complete.call_count == 1 + # check if a PgRecovery Event was created and pg_update gets triggered + assert module.PgRecoveryEvent.pg_update.call_count == 2 -- cgit v1.2.3