summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/selftest/module.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
commit483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
treee5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/pybind/mgr/selftest/module.py
parentInitial commit. (diff)
downloadceph-upstream.tar.xz
ceph-upstream.zip
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--src/pybind/mgr/selftest/module.py489
1 files changed, 489 insertions, 0 deletions
diff --git a/src/pybind/mgr/selftest/module.py b/src/pybind/mgr/selftest/module.py
new file mode 100644
index 00000000..6aff2aa5
--- /dev/null
+++ b/src/pybind/mgr/selftest/module.py
@@ -0,0 +1,489 @@
+
+from mgr_module import MgrModule, CommandResult, PersistentStoreDict
+import threading
+import random
+import json
+import errno
+import six
+
+
+class Module(MgrModule):
+ """
+ This module is for testing the ceph-mgr python interface from within
+ a running ceph-mgr daemon.
+
+ It implements a sychronous self-test command for calling the functions
+ in the MgrModule interface one by one, and a background "workload"
+ command for causing the module to perform some thrashing-type
+ activities in its serve() thread.
+ """
+
+ # These workloads are things that can be requested to run inside the
+ # serve() function
+ WORKLOAD_COMMAND_SPAM = "command_spam"
+ WORKLOAD_THROW_EXCEPTION = "throw_exception"
+ SHUTDOWN = "shutdown"
+
+ WORKLOADS = (WORKLOAD_COMMAND_SPAM, WORKLOAD_THROW_EXCEPTION)
+
+ # The test code in qa/ relies on these options existing -- they
+ # are of course not really used for anything in the module
+ MODULE_OPTIONS = [
+ {'name': 'testkey'},
+ {'name': 'testlkey'},
+ {'name': 'testnewline'},
+ {'name': 'roption1'},
+ {'name': 'roption2', 'type': 'str', 'default': 'xyz'},
+ {'name': 'rwoption1'},
+ {'name': 'rwoption2', 'type': 'int'},
+ {'name': 'rwoption3', 'type': 'float'},
+ {'name': 'rwoption4', 'type': 'str'},
+ {'name': 'rwoption5', 'type': 'bool'},
+ {'name': 'rwoption6', 'type': 'bool', 'default': True}
+ ]
+
+ COMMANDS = [
+ {
+ "cmd": "mgr self-test run",
+ "desc": "Run mgr python interface tests",
+ "perm": "rw"
+ },
+ {
+ "cmd": "mgr self-test background start name=workload,type=CephString",
+ "desc": "Activate a background workload (one of {0})".format(
+ ", ".join(WORKLOADS)),
+ "perm": "rw"
+ },
+ {
+ "cmd": "mgr self-test background stop",
+ "desc": "Stop background workload if any is running",
+ "perm": "rw"
+ },
+ {
+ "cmd": "mgr self-test config get name=key,type=CephString",
+ "desc": "Peek at a configuration value",
+ "perm": "rw"
+ },
+ {
+ "cmd": "mgr self-test config get_localized name=key,type=CephString",
+ "desc": "Peek at a configuration value (localized variant)",
+ "perm": "rw"
+ },
+ {
+ "cmd": "mgr self-test remote",
+ "desc": "Test inter-module calls",
+ "perm": "rw"
+ },
+ {
+ "cmd": "mgr self-test module name=module,type=CephString",
+ "desc": "Run another module's self_test() method",
+ "perm": "rw"
+ },
+ {
+ "cmd": "mgr self-test health set name=checks,type=CephString",
+ "desc": "Set a health check from a JSON-formatted description.",
+ "perm": "rw"
+ },
+ {
+ "cmd": "mgr self-test health clear name=checks,type=CephString,n=N,req=False",
+ "desc": "Clear health checks by name. If no names provided, clear all.",
+ "perm": "rw"
+ },
+ {
+ "cmd": "mgr self-test insights_set_now_offset name=hours,type=CephString",
+ "desc": "Set the now time for the insights module.",
+ "perm": "rw"
+ },
+ {
+ "cmd": "mgr self-test cluster-log name=channel,type=CephString "
+ "name=priority,type=CephString "
+ "name=message,type=CephString",
+ "desc": "Create an audit log record.",
+ "perm": "rw"
+ },
+ ]
+
+ def __init__(self, *args, **kwargs):
+ super(Module, self).__init__(*args, **kwargs)
+ self._event = threading.Event()
+ self._workload = None
+ self._health = {}
+
+ def handle_command(self, inbuf, command):
+ if command['prefix'] == 'mgr self-test run':
+ self._self_test()
+ return 0, '', 'Self-test succeeded'
+
+ elif command['prefix'] == 'mgr self-test background start':
+ if command['workload'] not in self.WORKLOADS:
+ return (-errno.EINVAL, '',
+ "Workload not found '{0}'".format(command['workload']))
+ self._workload = command['workload']
+ self._event.set()
+ return 0, '', 'Running `{0}` in background'.format(self._workload)
+
+ elif command['prefix'] == 'mgr self-test background stop':
+ if self._workload:
+ was_running = self._workload
+ self._workload = None
+ self._event.set()
+ return 0, '', 'Stopping background workload `{0}`'.format(
+ was_running)
+ else:
+ return 0, '', 'No background workload was running'
+ elif command['prefix'] == 'mgr self-test config get':
+ return 0, str(self.get_module_option(command['key'])), ''
+ elif command['prefix'] == 'mgr self-test config get_localized':
+ return 0, str(self.get_localized_module_option(command['key'])), ''
+ elif command['prefix'] == 'mgr self-test remote':
+ self._test_remote_calls()
+ return 0, '', 'Successfully called'
+ elif command['prefix'] == 'mgr self-test module':
+ try:
+ r = self.remote(command['module'], "self_test")
+ except RuntimeError as e:
+ return -1, '', "Test failed: {0}".format(e)
+ else:
+ return 0, str(r), "Self-test OK"
+ elif command['prefix'] == 'mgr self-test health set':
+ return self._health_set(inbuf, command)
+ elif command['prefix'] == 'mgr self-test health clear':
+ return self._health_clear(inbuf, command)
+ elif command['prefix'] == 'mgr self-test insights_set_now_offset':
+ return self._insights_set_now_offset(inbuf, command)
+ elif command['prefix'] == 'mgr self-test cluster-log':
+ priority_map = {
+ 'info': self.CLUSTER_LOG_PRIO_INFO,
+ 'security': self.CLUSTER_LOG_PRIO_SEC,
+ 'warning': self.CLUSTER_LOG_PRIO_WARN,
+ 'error': self.CLUSTER_LOG_PRIO_ERROR
+ }
+ self.cluster_log(command['channel'],
+ priority_map[command['priority']],
+ command['message'])
+ return 0, '', 'Successfully called'
+ else:
+ return (-errno.EINVAL, '',
+ "Command not found '{0}'".format(command['prefix']))
+
+ def _health_set(self, inbuf, command):
+ try:
+ checks = json.loads(command["checks"])
+ except Exception as e:
+ return -1, "", "Failed to decode JSON input: {}".format(e)
+
+ try:
+ for check, info in six.iteritems(checks):
+ self._health[check] = {
+ "severity": str(info["severity"]),
+ "summary": str(info["summary"]),
+ "detail": [str(m) for m in info["detail"]]
+ }
+ except Exception as e:
+ return -1, "", "Invalid health check format: {}".format(e)
+
+ self.set_health_checks(self._health)
+ return 0, "", ""
+
+ def _health_clear(self, inbuf, command):
+ if "checks" in command:
+ for check in command["checks"]:
+ if check in self._health:
+ del self._health[check]
+ else:
+ self._health = dict()
+
+ self.set_health_checks(self._health)
+ return 0, "", ""
+
+ def _insights_set_now_offset(self, inbuf, command):
+ try:
+ hours = int(command["hours"])
+ except Exception as e:
+ return -1, "", "Timestamp must be numeric: {}".format(e)
+
+ self.remote("insights", "testing_set_now_time_offset", hours)
+ return 0, "", ""
+
+ def _self_test(self):
+ self.log.info("Running self-test procedure...")
+
+ self._self_test_osdmap()
+ self._self_test_getters()
+ self._self_test_config()
+ self._self_test_store()
+ self._self_test_misc()
+ self._self_test_perf_counters()
+ self._self_persistent_store_dict()
+
+ def _self_test_getters(self):
+ self.version
+ self.get_context()
+ self.get_mgr_id()
+
+ # In this function, we will assume that the system is in a steady
+ # state, i.e. if a server/service appears in one call, it will
+ # not have gone by the time we call another function referring to it
+
+ objects = [
+ "fs_map",
+ "osdmap_crush_map_text",
+ "osd_map",
+ "config",
+ "mon_map",
+ "service_map",
+ "osd_metadata",
+ "pg_summary",
+ "pg_status",
+ "pg_dump",
+ "pg_ready",
+ "df",
+ "pg_stats",
+ "pool_stats",
+ "osd_stats",
+ "osd_ping_times",
+ "health",
+ "mon_status",
+ "mgr_map"
+ ]
+ for obj in objects:
+ assert self.get(obj) is not None
+
+ assert self.get("__OBJ_DNE__") is None
+
+ servers = self.list_servers()
+ for server in servers:
+ self.get_server(server['hostname'])
+
+ osdmap = self.get('osd_map')
+ for o in osdmap['osds']:
+ osd_id = o['osd']
+ self.get_metadata("osd", str(osd_id))
+
+ self.get_daemon_status("osd", "0")
+ #send_command
+
+ def _self_test_config(self):
+ # This is not a strong test (can't tell if values really
+ # persisted), it's just for the python interface bit.
+
+ self.set_module_option("testkey", "testvalue")
+ assert self.get_module_option("testkey") == "testvalue"
+
+ self.set_localized_module_option("testkey", "foo")
+ assert self.get_localized_module_option("testkey") == "foo"
+
+ # Must return the default value defined in MODULE_OPTIONS.
+ value = self.get_localized_module_option("rwoption6")
+ assert isinstance(value, bool)
+ assert value is True
+
+ # Use default value.
+ assert self.get_module_option("roption1") is None
+ assert self.get_module_option("roption1", "foobar") == "foobar"
+ assert self.get_module_option("roption2") == "xyz"
+ assert self.get_module_option("roption2", "foobar") == "xyz"
+
+ # Option type is not defined => return as string.
+ self.set_module_option("rwoption1", 8080)
+ value = self.get_module_option("rwoption1")
+ assert isinstance(value, str)
+ assert value == "8080"
+
+ # Option type is defined => return as integer.
+ self.set_module_option("rwoption2", 10)
+ value = self.get_module_option("rwoption2")
+ assert isinstance(value, int)
+ assert value == 10
+
+ # Option type is defined => return as float.
+ self.set_module_option("rwoption3", 1.5)
+ value = self.get_module_option("rwoption3")
+ assert isinstance(value, float)
+ assert value == 1.5
+
+ # Option type is defined => return as string.
+ self.set_module_option("rwoption4", "foo")
+ value = self.get_module_option("rwoption4")
+ assert isinstance(value, str)
+ assert value == "foo"
+
+ # Option type is defined => return as bool.
+ self.set_module_option("rwoption5", False)
+ value = self.get_module_option("rwoption5")
+ assert isinstance(value, bool)
+ assert value is False
+
+ # Specified module does not exist => return None.
+ assert self.get_module_option_ex("foo", "bar") is None
+
+ # Specified key does not exist => return None.
+ assert self.get_module_option_ex("dashboard", "bar") is None
+
+ self.set_module_option_ex("telemetry", "contact", "test@test.com")
+ assert self.get_module_option_ex("telemetry", "contact") == "test@test.com"
+
+ # No option default value, so use the specified one.
+ assert self.get_module_option_ex("dashboard", "password") is None
+ assert self.get_module_option_ex("dashboard", "password", "foobar") == "foobar"
+
+ # Option type is not defined => return as string.
+ self.set_module_option_ex("selftest", "rwoption1", 1234)
+ value = self.get_module_option_ex("selftest", "rwoption1")
+ assert isinstance(value, str)
+ assert value == "1234"
+
+ # Option type is defined => return as integer.
+ self.set_module_option_ex("telemetry", "interval", 60)
+ value = self.get_module_option_ex("telemetry", "interval")
+ assert isinstance(value, int)
+ assert value == 60
+
+ # Option type is defined => return as bool.
+ self.set_module_option_ex("telemetry", "leaderboard", True)
+ value = self.get_module_option_ex("telemetry", "leaderboard")
+ assert isinstance(value, bool)
+ assert value is True
+
+ def _self_test_store(self):
+ existing_keys = set(self.get_store_prefix("test").keys())
+ self.set_store("testkey", "testvalue")
+ assert self.get_store("testkey") == "testvalue"
+
+ assert sorted(self.get_store_prefix("test").keys()) == sorted(
+ list({"testkey"} | existing_keys))
+
+
+ def _self_test_perf_counters(self):
+ self.get_perf_schema("osd", "0")
+ self.get_counter("osd", "0", "osd.op")
+ #get_counter
+ #get_all_perf_coutners
+
+ def _self_test_misc(self):
+ self.set_uri("http://this.is.a.test.com")
+ self.set_health_checks({})
+
+ def _self_test_osdmap(self):
+ osdmap = self.get_osdmap()
+ osdmap.get_epoch()
+ osdmap.get_crush_version()
+ osdmap.dump()
+
+ inc = osdmap.new_incremental()
+ osdmap.apply_incremental(inc)
+ inc.get_epoch()
+ inc.dump()
+
+ crush = osdmap.get_crush()
+ crush.dump()
+ crush.get_item_name(-1)
+ crush.get_item_weight(-1)
+ crush.find_takes()
+ crush.get_take_weight_osd_map(-1)
+
+ #osdmap.get_pools_by_take()
+ #osdmap.calc_pg_upmaps()
+ #osdmap.map_pools_pgs_up()
+
+ #inc.set_osd_reweights
+ #inc.set_crush_compat_weight_set_weights
+
+ self.log.info("Finished self-test procedure.")
+
+ def _self_persistent_store_dict(self):
+ self.test_dict = PersistentStoreDict(self, 'test_dict')
+ for i in "abcde":
+ self.test_dict[i] = {i:1}
+ assert self.test_dict.keys() == set("abcde")
+ assert 'a' in self.test_dict
+ del self.test_dict['a']
+ assert self.test_dict.keys() == set("bcde"), self.test_dict.keys()
+ assert 'a' not in self.test_dict
+ self.test_dict.clear()
+ assert not self.test_dict, dict(self.test_dict.items())
+ self.set_store('test_dict.a', 'invalid json')
+ try:
+ self.test_dict['a']
+ assert False
+ except ValueError:
+ pass
+ assert not self.test_dict, dict(self.test_dict.items())
+
+ def _test_remote_calls(self):
+ # Test making valid call
+ self.remote("influx", "handle_command", "", {"prefix": "influx self-test"})
+
+ # Test calling module that exists but isn't enabled
+ # (arbitrarily pick a non-always-on module to use)
+ disabled_module = "telegraf"
+ mgr_map = self.get("mgr_map")
+ assert disabled_module not in mgr_map['modules']
+
+ # (This works until the Z release in about 2027)
+ latest_release = sorted(mgr_map['always_on_modules'].keys())[-1]
+ assert disabled_module not in mgr_map['always_on_modules'][latest_release]
+
+ try:
+ self.remote(disabled_module, "handle_command", {"prefix": "influx self-test"})
+ except ImportError:
+ pass
+ else:
+ raise RuntimeError("ImportError not raised for disabled module")
+
+ # Test calling module that doesn't exist
+ try:
+ self.remote("idontexist", "handle_command", {"prefix": "influx self-test"})
+ except ImportError:
+ pass
+ else:
+ raise RuntimeError("ImportError not raised for nonexistent module")
+
+ # Test calling method that doesn't exist
+ try:
+ self.remote("influx", "idontexist", {"prefix": "influx self-test"})
+ except NameError:
+ pass
+ else:
+ raise RuntimeError("KeyError not raised")
+
+
+ def shutdown(self):
+ self._workload = self.SHUTDOWN
+ self._event.set()
+
+ def _command_spam(self):
+ self.log.info("Starting command_spam workload...")
+ while not self._event.is_set():
+ osdmap = self.get_osdmap()
+ dump = osdmap.dump()
+ count = len(dump['osds'])
+ i = int(random.random() * count)
+ w = random.random()
+
+ result = CommandResult('')
+ self.send_command(result, 'mon', '', json.dumps({
+ 'prefix': 'osd reweight',
+ 'id': i,
+ 'weight': w
+ }), '')
+
+ crush = osdmap.get_crush().dump()
+ r, outb, outs = result.wait()
+
+ self._event.clear()
+ self.log.info("Ended command_spam workload...")
+
+ def serve(self):
+ while True:
+ if self._workload == self.WORKLOAD_COMMAND_SPAM:
+ self._command_spam()
+ elif self._workload == self.SHUTDOWN:
+ self.log.info("Shutting down...")
+ break
+ elif self._workload == self.WORKLOAD_THROW_EXCEPTION:
+ raise RuntimeError("Synthetic exception in serve")
+ else:
+ self.log.info("Waiting for workload request...")
+ self._event.wait()
+ self._event.clear()