summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/tests
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/pybind/mgr/tests
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/pybind/mgr/tests')
-rw-r--r--src/pybind/mgr/tests/__init__.py226
-rw-r--r--src/pybind/mgr/tests/test_mgr_util.py19
-rw-r--r--src/pybind/mgr/tests/test_object_format.py582
-rw-r--r--src/pybind/mgr/tests/test_tls.py55
4 files changed, 882 insertions, 0 deletions
diff --git a/src/pybind/mgr/tests/__init__.py b/src/pybind/mgr/tests/__init__.py
new file mode 100644
index 000000000..633959084
--- /dev/null
+++ b/src/pybind/mgr/tests/__init__.py
@@ -0,0 +1,226 @@
+# type: ignore
+
+import json
+import logging
+import os
+
+if 'UNITTEST' in os.environ:
+
+ # Mock ceph_module. Otherwise every module that is involved in a testcase and imports it will
+ # raise an ImportError
+
+ import sys
+
+ try:
+ from unittest import mock
+ except ImportError:
+ import mock
+
+ M_classes = set()
+
+ class M(object):
+ """
+ Note that:
+
+ * self.set_store() populates self._store
+ * self.set_module_option() populates self._store[module_name]
+ * self.get(thing) comes from self._store['_ceph_get' + thing]
+
+ """
+
+ def mock_store_get(self, kind, key, default):
+ if not hasattr(self, '_store'):
+ self._store = {}
+ return self._store.get(f'mock_store/{kind}/{key}', default)
+
+ def mock_store_set(self, kind, key, value):
+ if not hasattr(self, '_store'):
+ self._store = {}
+ k = f'mock_store/{kind}/{key}'
+ if value is None:
+ if k in self._store:
+ del self._store[k]
+ else:
+ self._store[k] = value
+
+ def mock_store_prefix(self, kind, prefix):
+ if not hasattr(self, '_store'):
+ self._store = {}
+ full_prefix = f'mock_store/{kind}/{prefix}'
+ kind_len = len(f'mock_store/{kind}/')
+ return {
+ k[kind_len:]: v for k, v in self._store.items()
+ if k.startswith(full_prefix)
+ }
+
+ def _ceph_get_store(self, k):
+ return self.mock_store_get('store', k, None)
+
+ def _ceph_set_store(self, k, v):
+ self.mock_store_set('store', k, v)
+
+ def _ceph_get_store_prefix(self, prefix):
+ return self.mock_store_prefix('store', prefix)
+
+ def _ceph_get_module_option(self, module, key, localized_prefix=None):
+ try:
+ _, val, _ = self.check_mon_command({
+ 'prefix': 'config get',
+ 'who': 'mgr',
+ 'key': f'mgr/{module}/{key}'
+ })
+ except FileNotFoundError:
+ val = None
+ mo = [o for o in self.MODULE_OPTIONS if o['name'] == key]
+ if len(mo) >= 1: # >= 1, cause self.MODULE_OPTIONS. otherwise it
+ # fails when importing multiple modules.
+ if 'default' in mo and val is None:
+ val = mo[0]['default']
+ if val is not None:
+ cls = {
+ 'str': str,
+ 'secs': int,
+ 'bool': lambda s: bool(s) and s != 'false' and s != 'False',
+ 'int': int,
+ }[mo[0].get('type', 'str')]
+ return cls(val)
+ return val
+ else:
+ return val if val is not None else ''
+
+ def _ceph_set_module_option(self, module, key, val):
+ _, _, _ = self.check_mon_command({
+ 'prefix': 'config set',
+ 'who': 'mgr',
+ 'name': f'mgr/{module}/{key}',
+ 'value': val
+ })
+ return val
+
+ def _ceph_get(self, data_name):
+ return self.mock_store_get('_ceph_get', data_name, mock.MagicMock())
+
+ def _ceph_send_command(self, res, svc_type, svc_id, command, tag, inbuf):
+
+ cmd = json.loads(command)
+ getattr(self, '_mon_commands_sent', []).append(cmd)
+
+ # Mocking the config store is handy sometimes:
+ def config_get():
+ who = cmd['who'].split('.')
+ whos = ['global'] + ['.'.join(who[:i + 1]) for i in range(len(who))]
+ for attepmt in reversed(whos):
+ val = self.mock_store_get('config', f'{attepmt}/{cmd["key"]}', None)
+ if val is not None:
+ return val
+ return None
+
+ def config_set():
+ self.mock_store_set('config', f'{cmd["who"]}/{cmd["name"]}', cmd['value'])
+ return ''
+
+ def config_rm():
+ self.mock_store_set('config', f'{cmd["who"]}/{cmd["name"]}', None)
+ return ''
+
+ def config_dump():
+ r = []
+ for prefix, value in self.mock_store_prefix('config', '').items():
+ section, name = prefix.split('/', 1)
+ r.append({
+ 'name': name,
+ 'section': section,
+ 'value': value
+ })
+ return json.dumps(r)
+
+ outb = ''
+ if cmd['prefix'] == 'config get':
+ outb = config_get()
+ elif cmd['prefix'] == 'config set':
+ outb = config_set()
+ elif cmd['prefix'] == 'config dump':
+ outb = config_dump()
+ elif cmd['prefix'] == 'config rm':
+ outb = config_rm()
+ elif hasattr(self, '_mon_command_mock_' + cmd['prefix'].replace(' ', '_')):
+ a = getattr(self, '_mon_command_mock_' + cmd['prefix'].replace(' ', '_'))
+ outb = a(cmd)
+
+ res.complete(0, outb, '')
+
+ def _ceph_get_foreign_option(self, entity, name):
+ who = entity.split('.')
+ whos = ['global'] + ['.'.join(who[:i + 1]) for i in range(len(who))]
+ for attepmt in reversed(whos):
+ val = self.mock_store_get('config', f'{attepmt}/{name}', None)
+ if val is not None:
+ return val
+ return None
+
+ def assert_issued_mon_command(self, command):
+ assert command in self._mon_commands_sent, self._mon_commands_sent
+
+ @property
+ def _logger(self):
+ return logging.getLogger(__name__)
+
+ @_logger.setter
+ def _logger(self, _):
+ pass
+
+ def __init__(self, *args):
+ self._mon_commands_sent = []
+ if not hasattr(self, '_store'):
+ self._store = {}
+
+ if self.__class__ not in M_classes:
+ # call those only once.
+ self._register_commands('')
+ self._register_options('')
+ M_classes.add(self.__class__)
+
+ super(M, self).__init__()
+ self._ceph_get_version = mock.Mock()
+ self._ceph_get_ceph_conf_path = mock.MagicMock()
+ self._ceph_get_option = mock.MagicMock()
+ self._ceph_get_context = mock.MagicMock()
+ self._ceph_register_client = mock.MagicMock()
+ self._ceph_set_health_checks = mock.MagicMock()
+ self._configure_logging = lambda *_: None
+ self._unconfigure_logging = mock.MagicMock()
+ self._ceph_log = mock.MagicMock()
+ self._ceph_dispatch_remote = lambda *_: None
+ self._ceph_get_mgr_id = mock.MagicMock()
+
+ cm = mock.Mock()
+ cm.BaseMgrModule = M
+ cm.BaseMgrStandbyModule = M
+ sys.modules['ceph_module'] = cm
+
+ def mock_ceph_modules():
+ class MockRadosError(Exception):
+ def __init__(self, message, errno=None):
+ super(MockRadosError, self).__init__(message)
+ self.errno = errno
+
+ def __str__(self):
+ msg = super(MockRadosError, self).__str__()
+ if self.errno is None:
+ return msg
+ return '[errno {0}] {1}'.format(self.errno, msg)
+
+ class MockObjectNotFound(Exception):
+ pass
+
+ sys.modules.update({
+ 'rados': mock.MagicMock(
+ Error=MockRadosError,
+ OSError=MockRadosError,
+ ObjectNotFound=MockObjectNotFound),
+ 'rbd': mock.Mock(),
+ 'cephfs': mock.Mock(),
+ })
+
+ # Unconditionally mock the rados objects when we're imported
+ mock_ceph_modules() # type: ignore
diff --git a/src/pybind/mgr/tests/test_mgr_util.py b/src/pybind/mgr/tests/test_mgr_util.py
new file mode 100644
index 000000000..fb7732d5c
--- /dev/null
+++ b/src/pybind/mgr/tests/test_mgr_util.py
@@ -0,0 +1,19 @@
+import datetime
+import mgr_util
+
+import pytest
+
+
+@pytest.mark.parametrize(
+ "delta, out",
+ [
+ (datetime.timedelta(minutes=90), '90m'),
+ (datetime.timedelta(minutes=190), '3h'),
+ (datetime.timedelta(days=3), '3d'),
+ (datetime.timedelta(hours=3), '3h'),
+ (datetime.timedelta(days=365 * 3.1), '3y'),
+ (datetime.timedelta(minutes=90), '90m'),
+ ]
+)
+def test_pretty_timedelta(delta: datetime.timedelta, out: str):
+ assert mgr_util.to_pretty_timedelta(delta) == out
diff --git a/src/pybind/mgr/tests/test_object_format.py b/src/pybind/mgr/tests/test_object_format.py
new file mode 100644
index 000000000..d2fd20870
--- /dev/null
+++ b/src/pybind/mgr/tests/test_object_format.py
@@ -0,0 +1,582 @@
+import errno
+from typing import (
+ Any,
+ Dict,
+ Optional,
+ Tuple,
+ Type,
+ TypeVar,
+)
+
+import pytest
+
+from mgr_module import CLICommand
+import object_format
+
+
+T = TypeVar("T", bound="Parent")
+
+
+class Simpler:
+ def __init__(self, name, val=None):
+ self.name = name
+ self.val = val or {}
+ self.version = 1
+
+ def to_simplified(self) -> Dict[str, Any]:
+ return {
+ "version": self.version,
+ "name": self.name,
+ "value": self.val,
+ }
+
+
+class JSONer(Simpler):
+ def to_json(self) -> Dict[str, Any]:
+ d = self.to_simplified()
+ d["_json"] = True
+ return d
+
+ @classmethod
+ def from_json(cls: Type[T], data) -> T:
+ o = cls(data.get("name", ""), data.get("value"))
+ o.version = data.get("version", 1) + 1
+ return o
+
+
+class YAMLer(Simpler):
+ def to_yaml(self) -> Dict[str, Any]:
+ d = self.to_simplified()
+ d["_yaml"] = True
+ return d
+
+
+@pytest.mark.parametrize(
+ "obj, compatible, json_val",
+ [
+ ({}, False, "{}"),
+ ({"name": "foobar"}, False, '{"name": "foobar"}'),
+ ([1, 2, 3], False, "[1, 2, 3]"),
+ (JSONer("bob"), False, '{"name": "bob", "value": {}, "version": 1}'),
+ (
+ JSONer("betty", 77),
+ False,
+ '{"name": "betty", "value": 77, "version": 1}',
+ ),
+ ({}, True, "{}"),
+ ({"name": "foobar"}, True, '{"name": "foobar"}'),
+ (
+ JSONer("bob"),
+ True,
+ '{"_json": true, "name": "bob", "value": {}, "version": 1}',
+ ),
+ ],
+)
+def test_format_json(obj: Any, compatible: bool, json_val: str):
+ assert (
+ object_format.ObjectFormatAdapter(
+ obj, compatible=compatible, json_indent=None
+ ).format_json()
+ == json_val
+ )
+
+
+@pytest.mark.parametrize(
+ "obj, compatible, yaml_val",
+ [
+ ({}, False, "{}\n"),
+ ({"name": "foobar"}, False, "name: foobar\n"),
+ (
+ {"stuff": [1, 88, 909, 32]},
+ False,
+ "stuff:\n- 1\n- 88\n- 909\n- 32\n",
+ ),
+ (
+ JSONer("zebulon", "999"),
+ False,
+ "name: zebulon\nvalue: '999'\nversion: 1\n",
+ ),
+ ({}, True, "{}\n"),
+ ({"name": "foobar"}, True, "name: foobar\n"),
+ (
+ YAMLer("thingy", "404"),
+ True,
+ "_yaml: true\nname: thingy\nvalue: '404'\nversion: 1\n",
+ ),
+ ],
+)
+def test_format_yaml(obj: Any, compatible: bool, yaml_val: str):
+ assert (
+ object_format.ObjectFormatAdapter(
+ obj, compatible=compatible
+ ).format_yaml()
+ == yaml_val
+ )
+
+
+class Retty:
+ def __init__(self, v) -> None:
+ self.value = v
+
+ def mgr_return_value(self) -> int:
+ return self.value
+
+
+@pytest.mark.parametrize(
+ "obj, ret",
+ [
+ ({}, 0),
+ ({"fish": "sticks"}, 0),
+ (-55, 0),
+ (Retty(0), 0),
+ (Retty(-55), -55),
+ ],
+)
+def test_return_value(obj: Any, ret: int):
+ rva = object_format.ReturnValueAdapter(obj)
+ # a ReturnValueAdapter instance meets the ReturnValueProvider protocol.
+ assert object_format._is_return_value_provider(rva)
+ assert rva.mgr_return_value() == ret
+
+
+def test_valid_formats():
+ ofa = object_format.ObjectFormatAdapter({"fred": "wilma"})
+ vf = ofa.valid_formats()
+ assert "json" in vf
+ assert "yaml" in vf
+ assert "xml" in vf
+ assert "plain" in vf
+
+
+def test_error_response_exceptions():
+ err = object_format.ErrorResponseBase()
+ with pytest.raises(NotImplementedError):
+ err.format_response()
+
+ err = object_format.UnsupportedFormat("cheese")
+ assert err.format_response() == (-22, "", "Unsupported format: cheese")
+
+ err = object_format.UnknownFormat("chocolate")
+ assert err.format_response() == (-22, "", "Unknown format name: chocolate")
+
+
+@pytest.mark.parametrize(
+ "value, format, result",
+ [
+ ({}, None, (0, "{}", "")),
+ ({"blat": True}, "json", (0, '{\n "blat": true\n}', "")),
+ ({"blat": True}, "yaml", (0, "blat: true\n", "")),
+ ({"blat": True}, "toml", (-22, "", "Unknown format name: toml")),
+ ({"blat": True}, "xml", (-22, "", "Unsupported format: xml")),
+ (
+ JSONer("hoop", "303"),
+ "yaml",
+ (0, "name: hoop\nvalue: '303'\nversion: 1\n", ""),
+ ),
+ ],
+)
+def test_responder_decorator_default(
+ value: Any, format: Optional[str], result: Tuple[int, str, str]
+) -> None:
+ @object_format.Responder()
+ def orf_value(format: Optional[str] = None):
+ return value
+
+ assert orf_value(format=format) == result
+
+
+class PhonyMultiYAMLFormatAdapter(object_format.ObjectFormatAdapter):
+ """This adapter puts a yaml document/directive separator line
+ before all output. It doesn't actully support multiple documents.
+ """
+ def format_yaml(self):
+ yml = super().format_yaml()
+ return "---\n{}".format(yml)
+
+
+@pytest.mark.parametrize(
+ "value, format, result",
+ [
+ ({}, None, (0, "{}", "")),
+ ({"blat": True}, "json", (0, '{\n "blat": true\n}', "")),
+ ({"blat": True}, "yaml", (0, "---\nblat: true\n", "")),
+ ({"blat": True}, "toml", (-22, "", "Unknown format name: toml")),
+ ({"blat": True}, "xml", (-22, "", "Unsupported format: xml")),
+ (
+ JSONer("hoop", "303"),
+ "yaml",
+ (0, "---\nname: hoop\nvalue: '303'\nversion: 1\n", ""),
+ ),
+ ],
+)
+def test_responder_decorator_custom(
+ value: Any, format: Optional[str], result: Tuple[int, str, str]
+) -> None:
+ @object_format.Responder(PhonyMultiYAMLFormatAdapter)
+ def orf_value(format: Optional[str] = None):
+ return value
+
+ assert orf_value(format=format) == result
+
+
+class FancyDemoAdapter(PhonyMultiYAMLFormatAdapter):
+ """This adapter demonstrates adding formatting for other formats
+ like xml and plain text.
+ """
+ def format_xml(self) -> str:
+ name = self.obj.get("name")
+ size = self.obj.get("size")
+ return f'<object name="{name}" size="{size}" />'
+
+ def format_plain(self) -> str:
+ name = self.obj.get("name")
+ size = self.obj.get("size")
+ es = 'es' if size != 1 else ''
+ return f"{size} box{es} of {name}"
+
+
+class DecoDemo:
+ """Class to stand in for a mgr module, used to test CLICommand integration."""
+
+ @CLICommand("alpha one", perm="rw")
+ @object_format.Responder()
+ def alpha_one(self, name: str = "default") -> Dict[str, str]:
+ return {
+ "alpha": "one",
+ "name": name,
+ "weight": 300,
+ }
+
+ @CLICommand("beta two", perm="r")
+ @object_format.Responder()
+ def beta_two(
+ self, name: str = "default", format: Optional[str] = None
+ ) -> Dict[str, str]:
+ return {
+ "beta": "two",
+ "name": name,
+ "weight": 72,
+ }
+
+ @CLICommand("gamma three", perm="rw")
+ @object_format.Responder(FancyDemoAdapter)
+ def gamma_three(self, size: int = 0) -> Dict[str, Any]:
+ return {"name": "funnystuff", "size": size}
+
+ @CLICommand("z_err", perm="rw")
+ @object_format.ErrorResponseHandler()
+ def z_err(self, name: str = "default") -> Tuple[int, str, str]:
+ if "z" in name:
+ raise object_format.ErrorResponse(f"{name} bad")
+ return 0, name, ""
+
+ @CLICommand("empty one", perm="rw")
+ @object_format.EmptyResponder()
+ def empty_one(self, name: str = "default", retval: Optional[int] = None) -> None:
+ # in real code, this would be making some sort of state change
+ # but we need to handle erors still
+ if retval is None:
+ retval = -5
+ if name in ["pow"]:
+ raise object_format.ErrorResponse(name, return_value=retval)
+ return
+
+ @CLICommand("empty bad", perm="rw")
+ @object_format.EmptyResponder()
+ def empty_bad(self, name: str = "default") -> int:
+ # in real code, this would be making some sort of state change
+ return 5
+
+
+@pytest.mark.parametrize(
+ "prefix, can_format, args, response",
+ [
+ (
+ "alpha one",
+ True,
+ {"name": "moonbase"},
+ (
+ 0,
+ '{\n "alpha": "one",\n "name": "moonbase",\n "weight": 300\n}',
+ "",
+ ),
+ ),
+ # ---
+ (
+ "alpha one",
+ True,
+ {"name": "moonbase2", "format": "yaml"},
+ (
+ 0,
+ "alpha: one\nname: moonbase2\nweight: 300\n",
+ "",
+ ),
+ ),
+ # ---
+ (
+ "alpha one",
+ True,
+ {"name": "moonbase2", "format": "chocolate"},
+ (
+ -22,
+ "",
+ "Unknown format name: chocolate",
+ ),
+ ),
+ # ---
+ (
+ "beta two",
+ True,
+ {"name": "blocker"},
+ (
+ 0,
+ '{\n "beta": "two",\n "name": "blocker",\n "weight": 72\n}',
+ "",
+ ),
+ ),
+ # ---
+ (
+ "beta two",
+ True,
+ {"name": "test", "format": "yaml"},
+ (
+ 0,
+ "beta: two\nname: test\nweight: 72\n",
+ "",
+ ),
+ ),
+ # ---
+ (
+ "beta two",
+ True,
+ {"name": "test", "format": "plain"},
+ (
+ -22,
+ "",
+ "Unsupported format: plain",
+ ),
+ ),
+ # ---
+ (
+ "gamma three",
+ True,
+ {},
+ (
+ 0,
+ '{\n "name": "funnystuff",\n "size": 0\n}',
+ "",
+ ),
+ ),
+ # ---
+ (
+ "gamma three",
+ True,
+ {"size": 1, "format": "json"},
+ (
+ 0,
+ '{\n "name": "funnystuff",\n "size": 1\n}',
+ "",
+ ),
+ ),
+ # ---
+ (
+ "gamma three",
+ True,
+ {"size": 1, "format": "plain"},
+ (
+ 0,
+ "1 box of funnystuff",
+ "",
+ ),
+ ),
+ # ---
+ (
+ "gamma three",
+ True,
+ {"size": 2, "format": "plain"},
+ (
+ 0,
+ "2 boxes of funnystuff",
+ "",
+ ),
+ ),
+ # ---
+ (
+ "gamma three",
+ True,
+ {"size": 2, "format": "xml"},
+ (
+ 0,
+ '<object name="funnystuff" size="2" />',
+ "",
+ ),
+ ),
+ # ---
+ (
+ "gamma three",
+ True,
+ {"size": 2, "format": "toml"},
+ (
+ -22,
+ "",
+ "Unknown format name: toml",
+ ),
+ ),
+ # ---
+ (
+ "z_err",
+ False,
+ {"name": "foobar"},
+ (
+ 0,
+ "foobar",
+ "",
+ ),
+ ),
+ # ---
+ (
+ "z_err",
+ False,
+ {"name": "zamboni"},
+ (
+ -22,
+ "",
+ "zamboni bad",
+ ),
+ ),
+ # ---
+ (
+ "empty one",
+ False,
+ {"name": "zucchini"},
+ (
+ 0,
+ "",
+ "",
+ ),
+ ),
+ # ---
+ (
+ "empty one",
+ False,
+ {"name": "pow"},
+ (
+ -5,
+ "",
+ "pow",
+ ),
+ ),
+ # Ensure setting return_value to zero even on an exception is honored
+ (
+ "empty one",
+ False,
+ {"name": "pow", "retval": 0},
+ (
+ 0,
+ "",
+ "pow",
+ ),
+ ),
+ ],
+)
+def test_cli_with_decorators(prefix, can_format, args, response):
+ dd = DecoDemo()
+ cmd = CLICommand.COMMANDS[prefix]
+ assert cmd.call(dd, args, None) == response
+ # slighly hacky way to check that the CLI "knows" about a --format option
+ # checking the extra_args feature of the Decorators that provide them (Responder)
+ if can_format:
+ assert 'name=format,' in cmd.args
+
+
+def test_error_response():
+ e1 = object_format.ErrorResponse("nope")
+ assert e1.format_response() == (-22, "", "nope")
+ assert e1.return_value == -22
+ assert e1.errno == 22
+ assert "ErrorResponse" in repr(e1)
+ assert "nope" in repr(e1)
+ assert e1.mgr_return_value() == -22
+
+ try:
+ open("/this/is_/extremely_/unlikely/_to/exist.txt")
+ except Exception as e:
+ e2 = object_format.ErrorResponse.wrap(e)
+ r = e2.format_response()
+ assert r[0] == -errno.ENOENT
+ assert r[1] == ""
+ assert "No such file or directory" in r[2]
+ assert "ErrorResponse" in repr(e2)
+ assert "No such file or directory" in repr(e2)
+ assert r[0] == e2.mgr_return_value()
+
+ e3 = object_format.ErrorResponse.wrap(RuntimeError("blat"))
+ r = e3.format_response()
+ assert r[0] == -errno.EINVAL
+ assert r[1] == ""
+ assert "blat" in r[2]
+ assert r[0] == e3.mgr_return_value()
+
+ # A custom exception type with an errno property
+
+ class MyCoolException(Exception):
+ def __init__(self, err_msg: str, errno: int = 0) -> None:
+ super().__init__(errno, err_msg)
+ self.errno = errno
+ self.err_msg = err_msg
+
+ def __str__(self) -> str:
+ return self.err_msg
+
+ e4 = object_format.ErrorResponse.wrap(MyCoolException("beep", -17))
+ r = e4.format_response()
+ assert r[0] == -17
+ assert r[1] == ""
+ assert r[2] == "beep"
+ assert e4.mgr_return_value() == -17
+
+ e5 = object_format.ErrorResponse.wrap(MyCoolException("ok, fine", 0))
+ r = e5.format_response()
+ assert r[0] == 0
+ assert r[1] == ""
+ assert r[2] == "ok, fine"
+
+ e5 = object_format.ErrorResponse.wrap(MyCoolException("no can do", 8))
+ r = e5.format_response()
+ assert r[0] == -8
+ assert r[1] == ""
+ assert r[2] == "no can do"
+
+ # A custom exception type that inherits from ErrorResponseBase
+
+ class MyErrorResponse(object_format.ErrorResponseBase):
+ def __init__(self, err_msg: str, return_value: int):
+ super().__init__(self, err_msg)
+ self.msg = err_msg
+ self.return_value = return_value
+
+ def format_response(self):
+ return self.return_value, "", self.msg
+
+
+ e6 = object_format.ErrorResponse.wrap(MyErrorResponse("yeah, sure", 0))
+ r = e6.format_response()
+ assert r[0] == 0
+ assert r[1] == ""
+ assert r[2] == "yeah, sure"
+ assert isinstance(e5, object_format.ErrorResponseBase)
+ assert isinstance(e6, MyErrorResponse)
+
+ e7 = object_format.ErrorResponse.wrap(MyErrorResponse("no can do", -8))
+ r = e7.format_response()
+ assert r[0] == -8
+ assert r[1] == ""
+ assert r[2] == "no can do"
+ assert isinstance(e7, object_format.ErrorResponseBase)
+ assert isinstance(e7, MyErrorResponse)
+
+
+def test_empty_responder_return_check():
+ dd = DecoDemo()
+ with pytest.raises(ValueError):
+ CLICommand.COMMANDS["empty bad"].call(dd, {}, None)
diff --git a/src/pybind/mgr/tests/test_tls.py b/src/pybind/mgr/tests/test_tls.py
new file mode 100644
index 000000000..19ce46a93
--- /dev/null
+++ b/src/pybind/mgr/tests/test_tls.py
@@ -0,0 +1,55 @@
+from mgr_util import create_self_signed_cert, verify_tls, ServerConfigException, get_cert_issuer_info
+from OpenSSL import crypto, SSL
+
+import unittest
+
+
+valid_ceph_cert = """-----BEGIN CERTIFICATE-----\nMIICxjCCAa4CEQCpHIQuSYhCII1J0SVGYnT1MA0GCSqGSIb3DQEBDQUAMCExDTAL\nBgNVBAoMBENlcGgxEDAOBgNVBAMMB2NlcGhhZG0wHhcNMjIwNzA2MTE1MjUyWhcN\nMzIwNzAzMTE1MjUyWjAhMQ0wCwYDVQQKDARDZXBoMRAwDgYDVQQDDAdjZXBoYWRt\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAn2ApFna2CVYE7RDtjJVk\ncJTcJQrjzDOlCoZtxb1QMCQZMXjx/7d6bseQP+dkkeA0hZxnjJZWeu6c/YnQ1JiT\n2aDuDpWoJAaiinHRJyZuY5tqG+ggn95RdToZVbeC+0uALzYi4UFacC3sfpkyIKBR\nic43+2fQNz0PZ+8INSTtm75Y53gbWuGF7Dv95200AmAN2/u8LKWZIvdhbRborxOF\nlK2T40qbj9eH3ewIN/6Eibxrvg4va3pIoOaq0XdJHAL/MjDGJAtahPIenwcjuega\n4PSlB0h3qiyFXz7BG8P0QsPP6slyD58ZJtCGtJiWPOhlq47DlnWlJzRGDEFLLryf\n8wIDAQABMA0GCSqGSIb3DQEBDQUAA4IBAQBixd7RZawlYiTZaCmv3Vy7X/hhabac\nE/YiuFt1YMe0C9+D8IcCQN/IRww/Bi7Af6tm+ncHT9GsOGWX6hahXDKTw3b9nSDi\nETvjkUTYOayZGfhYpRA6m6e/2ypcUYsiXRDY9zneDKCdPREIA1D6L2fROHetFX9r\nX9rSry01xrYwNlYA1e6GLMXm2NaGsLT3JJlRBtT3P7f1jtRGXcwkc7ns0AtW0uNj\nGqRLHfJazdgWJFsj8vBdMs7Ci0C/b5/f7J/DLpPCvUA3Fqwn9MzHl01UwlDsKy1a\nROi4cfQNOLbWX8g3PfIlqtdGYNA77UPxvy1SUimmtdopZaEVWKkqeWYK\n-----END CERTIFICATE-----\n
+"""
+
+invalid_cert = """-----BEGIN CERTIFICATE-----\nMIICxjCCAa4CEQCpHIQuSYhCII1J0SVGYnT1MA0GCSqGSIb3DQEBDQUAMCExDTAL\nBgNVBAoMBENlcGgxEDAOBgNVBAMMB2NlcGhhZG0wHhcNMjIwNzA2MTE1MjUyWhcN\nMzIwNzAzMTE1MjUyWjAhMQ0wCwYDVQQKDARDZXBoMRAwDgYDVQQDDAdjZXBoYWRt\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEBn2ApFna2CVYE7RDtjJVk\ncJTcJQrjzDOlCoZtxb1QMCQZMXjx/7d6bseQP+dkkeA0hZxnjJZWeu6c/YnQ1JiT\n2aDuDpWoJAaiinHRJyZuY5tqG+ggn95RdToZVbeC+0uALzYi4UFacC3sfpkyIKBR\nic43+2fQNz0PZ+8INSTtm75Y53gbWuGF7Dv95200AmAN2/u8LKWZIvdhbRborxOF\nlK2T40qbj9eH3ewIN/6Eibxrvg4va3pIoOaq0XdJHAL/MjDGJAtahPIenwcjuega\n4PSlB0h3qiyFXz7BG8P0QsPP6slyD58ZJtCGtJiWPOhlq47DlnWlJzRGDEFLLryf\n8wIDAQABMA0GCSqGSIb3DQEBDQUAA4IBAQBixd7RZawlYiTZaCmv3Vy7X/hhabac\nE/YiuFt1YMe0C9+D8IcCQN/IRww/Bi7Af6tm+ncHT9GsOGWX6hahXDKTw3b9nSDi\nETvjkUTYOayZGfhYpRA6m6e/2ypcUYsiXRDY9zneDKCdPREIA1D6L2fROHetFX9r\nX9rSry01xrYwNlYA1e6GLMXm2NaGsLT3JJlRBtT3P7f1jtRGXcwkc7ns0AtW0uNj\nGqRLHfJazdgWJFsj8vBdMs7Ci0C/b5/f7J/DLpPCvUA3Fqwn9MzHl01UwlDsKy1a\nROi4cfQNOLbWX8g3PfIlqtdGYNA77UPxvy1SUimmtdopZa\n-----END CERTIFICATE-----\n
+"""
+
+class TLSchecks(unittest.TestCase):
+
+ def test_defaults(self):
+ crt, key = create_self_signed_cert()
+ verify_tls(crt, key)
+
+ def test_specific_dname(self):
+ crt, key = create_self_signed_cert(dname={'O': 'Ceph', 'OU': 'testsuite'})
+ verify_tls(crt, key)
+
+ def test_invalid_RDN(self):
+ self.assertRaises(ValueError, create_self_signed_cert,
+ dname={'O': 'Ceph', 'Bogus': 'testsuite'})
+
+ def test_invalid_key(self):
+ crt, key = create_self_signed_cert()
+
+ # fudge the key, to force an error to be detected during verify_tls
+ fudged = f"{key[:-35]}c0ffee==\n{key[-25:]}".encode('utf-8')
+ self.assertRaises(ServerConfigException, verify_tls, crt, fudged)
+
+ def test_mismatched_tls(self):
+ crt, _ = create_self_signed_cert()
+
+ # generate another key
+ new_key = crypto.PKey()
+ new_key.generate_key(crypto.TYPE_RSA, 2048)
+ new_key = crypto.dump_privatekey(crypto.FILETYPE_PEM, new_key).decode('utf-8')
+
+ self.assertRaises(ServerConfigException, verify_tls, crt, new_key)
+
+ def test_get_cert_issuer_info(self):
+
+ # valid certificate
+ org, cn = get_cert_issuer_info(valid_ceph_cert)
+ assert org == 'Ceph'
+ assert cn == 'cephadm'
+
+ # empty certificate
+ self.assertRaises(ServerConfigException, get_cert_issuer_info, '')
+
+ # invalid certificate
+ self.assertRaises(ServerConfigException, get_cert_issuer_info, invalid_cert)