From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/pybind/mgr/tests/__init__.py | 226 +++++++++++ src/pybind/mgr/tests/test_mgr_util.py | 19 + src/pybind/mgr/tests/test_object_format.py | 582 +++++++++++++++++++++++++++++ src/pybind/mgr/tests/test_tls.py | 55 +++ 4 files changed, 882 insertions(+) create mode 100644 src/pybind/mgr/tests/__init__.py create mode 100644 src/pybind/mgr/tests/test_mgr_util.py create mode 100644 src/pybind/mgr/tests/test_object_format.py create mode 100644 src/pybind/mgr/tests/test_tls.py (limited to 'src/pybind/mgr/tests') diff --git a/src/pybind/mgr/tests/__init__.py b/src/pybind/mgr/tests/__init__.py new file mode 100644 index 000000000..633959084 --- /dev/null +++ b/src/pybind/mgr/tests/__init__.py @@ -0,0 +1,226 @@ +# type: ignore + +import json +import logging +import os + +if 'UNITTEST' in os.environ: + + # Mock ceph_module. Otherwise every module that is involved in a testcase and imports it will + # raise an ImportError + + import sys + + try: + from unittest import mock + except ImportError: + import mock + + M_classes = set() + + class M(object): + """ + Note that: + + * self.set_store() populates self._store + * self.set_module_option() populates self._store[module_name] + * self.get(thing) comes from self._store['_ceph_get' + thing] + + """ + + def mock_store_get(self, kind, key, default): + if not hasattr(self, '_store'): + self._store = {} + return self._store.get(f'mock_store/{kind}/{key}', default) + + def mock_store_set(self, kind, key, value): + if not hasattr(self, '_store'): + self._store = {} + k = f'mock_store/{kind}/{key}' + if value is None: + if k in self._store: + del self._store[k] + else: + self._store[k] = value + + def mock_store_prefix(self, kind, prefix): + if not hasattr(self, '_store'): + self._store = {} + full_prefix = f'mock_store/{kind}/{prefix}' + kind_len = len(f'mock_store/{kind}/') + return { + k[kind_len:]: v for k, v in self._store.items() + if k.startswith(full_prefix) + } + + def _ceph_get_store(self, k): + return self.mock_store_get('store', k, None) + + def _ceph_set_store(self, k, v): + self.mock_store_set('store', k, v) + + def _ceph_get_store_prefix(self, prefix): + return self.mock_store_prefix('store', prefix) + + def _ceph_get_module_option(self, module, key, localized_prefix=None): + try: + _, val, _ = self.check_mon_command({ + 'prefix': 'config get', + 'who': 'mgr', + 'key': f'mgr/{module}/{key}' + }) + except FileNotFoundError: + val = None + mo = [o for o in self.MODULE_OPTIONS if o['name'] == key] + if len(mo) >= 1: # >= 1, cause self.MODULE_OPTIONS. otherwise it + # fails when importing multiple modules. + if 'default' in mo and val is None: + val = mo[0]['default'] + if val is not None: + cls = { + 'str': str, + 'secs': int, + 'bool': lambda s: bool(s) and s != 'false' and s != 'False', + 'int': int, + }[mo[0].get('type', 'str')] + return cls(val) + return val + else: + return val if val is not None else '' + + def _ceph_set_module_option(self, module, key, val): + _, _, _ = self.check_mon_command({ + 'prefix': 'config set', + 'who': 'mgr', + 'name': f'mgr/{module}/{key}', + 'value': val + }) + return val + + def _ceph_get(self, data_name): + return self.mock_store_get('_ceph_get', data_name, mock.MagicMock()) + + def _ceph_send_command(self, res, svc_type, svc_id, command, tag, inbuf): + + cmd = json.loads(command) + getattr(self, '_mon_commands_sent', []).append(cmd) + + # Mocking the config store is handy sometimes: + def config_get(): + who = cmd['who'].split('.') + whos = ['global'] + ['.'.join(who[:i + 1]) for i in range(len(who))] + for attepmt in reversed(whos): + val = self.mock_store_get('config', f'{attepmt}/{cmd["key"]}', None) + if val is not None: + return val + return None + + def config_set(): + self.mock_store_set('config', f'{cmd["who"]}/{cmd["name"]}', cmd['value']) + return '' + + def config_rm(): + self.mock_store_set('config', f'{cmd["who"]}/{cmd["name"]}', None) + return '' + + def config_dump(): + r = [] + for prefix, value in self.mock_store_prefix('config', '').items(): + section, name = prefix.split('/', 1) + r.append({ + 'name': name, + 'section': section, + 'value': value + }) + return json.dumps(r) + + outb = '' + if cmd['prefix'] == 'config get': + outb = config_get() + elif cmd['prefix'] == 'config set': + outb = config_set() + elif cmd['prefix'] == 'config dump': + outb = config_dump() + elif cmd['prefix'] == 'config rm': + outb = config_rm() + elif hasattr(self, '_mon_command_mock_' + cmd['prefix'].replace(' ', '_')): + a = getattr(self, '_mon_command_mock_' + cmd['prefix'].replace(' ', '_')) + outb = a(cmd) + + res.complete(0, outb, '') + + def _ceph_get_foreign_option(self, entity, name): + who = entity.split('.') + whos = ['global'] + ['.'.join(who[:i + 1]) for i in range(len(who))] + for attepmt in reversed(whos): + val = self.mock_store_get('config', f'{attepmt}/{name}', None) + if val is not None: + return val + return None + + def assert_issued_mon_command(self, command): + assert command in self._mon_commands_sent, self._mon_commands_sent + + @property + def _logger(self): + return logging.getLogger(__name__) + + @_logger.setter + def _logger(self, _): + pass + + def __init__(self, *args): + self._mon_commands_sent = [] + if not hasattr(self, '_store'): + self._store = {} + + if self.__class__ not in M_classes: + # call those only once. + self._register_commands('') + self._register_options('') + M_classes.add(self.__class__) + + super(M, self).__init__() + self._ceph_get_version = mock.Mock() + self._ceph_get_ceph_conf_path = mock.MagicMock() + self._ceph_get_option = mock.MagicMock() + self._ceph_get_context = mock.MagicMock() + self._ceph_register_client = mock.MagicMock() + self._ceph_set_health_checks = mock.MagicMock() + self._configure_logging = lambda *_: None + self._unconfigure_logging = mock.MagicMock() + self._ceph_log = mock.MagicMock() + self._ceph_dispatch_remote = lambda *_: None + self._ceph_get_mgr_id = mock.MagicMock() + + cm = mock.Mock() + cm.BaseMgrModule = M + cm.BaseMgrStandbyModule = M + sys.modules['ceph_module'] = cm + + def mock_ceph_modules(): + class MockRadosError(Exception): + def __init__(self, message, errno=None): + super(MockRadosError, self).__init__(message) + self.errno = errno + + def __str__(self): + msg = super(MockRadosError, self).__str__() + if self.errno is None: + return msg + return '[errno {0}] {1}'.format(self.errno, msg) + + class MockObjectNotFound(Exception): + pass + + sys.modules.update({ + 'rados': mock.MagicMock( + Error=MockRadosError, + OSError=MockRadosError, + ObjectNotFound=MockObjectNotFound), + 'rbd': mock.Mock(), + 'cephfs': mock.Mock(), + }) + + # Unconditionally mock the rados objects when we're imported + mock_ceph_modules() # type: ignore diff --git a/src/pybind/mgr/tests/test_mgr_util.py b/src/pybind/mgr/tests/test_mgr_util.py new file mode 100644 index 000000000..fb7732d5c --- /dev/null +++ b/src/pybind/mgr/tests/test_mgr_util.py @@ -0,0 +1,19 @@ +import datetime +import mgr_util + +import pytest + + +@pytest.mark.parametrize( + "delta, out", + [ + (datetime.timedelta(minutes=90), '90m'), + (datetime.timedelta(minutes=190), '3h'), + (datetime.timedelta(days=3), '3d'), + (datetime.timedelta(hours=3), '3h'), + (datetime.timedelta(days=365 * 3.1), '3y'), + (datetime.timedelta(minutes=90), '90m'), + ] +) +def test_pretty_timedelta(delta: datetime.timedelta, out: str): + assert mgr_util.to_pretty_timedelta(delta) == out diff --git a/src/pybind/mgr/tests/test_object_format.py b/src/pybind/mgr/tests/test_object_format.py new file mode 100644 index 000000000..d2fd20870 --- /dev/null +++ b/src/pybind/mgr/tests/test_object_format.py @@ -0,0 +1,582 @@ +import errno +from typing import ( + Any, + Dict, + Optional, + Tuple, + Type, + TypeVar, +) + +import pytest + +from mgr_module import CLICommand +import object_format + + +T = TypeVar("T", bound="Parent") + + +class Simpler: + def __init__(self, name, val=None): + self.name = name + self.val = val or {} + self.version = 1 + + def to_simplified(self) -> Dict[str, Any]: + return { + "version": self.version, + "name": self.name, + "value": self.val, + } + + +class JSONer(Simpler): + def to_json(self) -> Dict[str, Any]: + d = self.to_simplified() + d["_json"] = True + return d + + @classmethod + def from_json(cls: Type[T], data) -> T: + o = cls(data.get("name", ""), data.get("value")) + o.version = data.get("version", 1) + 1 + return o + + +class YAMLer(Simpler): + def to_yaml(self) -> Dict[str, Any]: + d = self.to_simplified() + d["_yaml"] = True + return d + + +@pytest.mark.parametrize( + "obj, compatible, json_val", + [ + ({}, False, "{}"), + ({"name": "foobar"}, False, '{"name": "foobar"}'), + ([1, 2, 3], False, "[1, 2, 3]"), + (JSONer("bob"), False, '{"name": "bob", "value": {}, "version": 1}'), + ( + JSONer("betty", 77), + False, + '{"name": "betty", "value": 77, "version": 1}', + ), + ({}, True, "{}"), + ({"name": "foobar"}, True, '{"name": "foobar"}'), + ( + JSONer("bob"), + True, + '{"_json": true, "name": "bob", "value": {}, "version": 1}', + ), + ], +) +def test_format_json(obj: Any, compatible: bool, json_val: str): + assert ( + object_format.ObjectFormatAdapter( + obj, compatible=compatible, json_indent=None + ).format_json() + == json_val + ) + + +@pytest.mark.parametrize( + "obj, compatible, yaml_val", + [ + ({}, False, "{}\n"), + ({"name": "foobar"}, False, "name: foobar\n"), + ( + {"stuff": [1, 88, 909, 32]}, + False, + "stuff:\n- 1\n- 88\n- 909\n- 32\n", + ), + ( + JSONer("zebulon", "999"), + False, + "name: zebulon\nvalue: '999'\nversion: 1\n", + ), + ({}, True, "{}\n"), + ({"name": "foobar"}, True, "name: foobar\n"), + ( + YAMLer("thingy", "404"), + True, + "_yaml: true\nname: thingy\nvalue: '404'\nversion: 1\n", + ), + ], +) +def test_format_yaml(obj: Any, compatible: bool, yaml_val: str): + assert ( + object_format.ObjectFormatAdapter( + obj, compatible=compatible + ).format_yaml() + == yaml_val + ) + + +class Retty: + def __init__(self, v) -> None: + self.value = v + + def mgr_return_value(self) -> int: + return self.value + + +@pytest.mark.parametrize( + "obj, ret", + [ + ({}, 0), + ({"fish": "sticks"}, 0), + (-55, 0), + (Retty(0), 0), + (Retty(-55), -55), + ], +) +def test_return_value(obj: Any, ret: int): + rva = object_format.ReturnValueAdapter(obj) + # a ReturnValueAdapter instance meets the ReturnValueProvider protocol. + assert object_format._is_return_value_provider(rva) + assert rva.mgr_return_value() == ret + + +def test_valid_formats(): + ofa = object_format.ObjectFormatAdapter({"fred": "wilma"}) + vf = ofa.valid_formats() + assert "json" in vf + assert "yaml" in vf + assert "xml" in vf + assert "plain" in vf + + +def test_error_response_exceptions(): + err = object_format.ErrorResponseBase() + with pytest.raises(NotImplementedError): + err.format_response() + + err = object_format.UnsupportedFormat("cheese") + assert err.format_response() == (-22, "", "Unsupported format: cheese") + + err = object_format.UnknownFormat("chocolate") + assert err.format_response() == (-22, "", "Unknown format name: chocolate") + + +@pytest.mark.parametrize( + "value, format, result", + [ + ({}, None, (0, "{}", "")), + ({"blat": True}, "json", (0, '{\n "blat": true\n}', "")), + ({"blat": True}, "yaml", (0, "blat: true\n", "")), + ({"blat": True}, "toml", (-22, "", "Unknown format name: toml")), + ({"blat": True}, "xml", (-22, "", "Unsupported format: xml")), + ( + JSONer("hoop", "303"), + "yaml", + (0, "name: hoop\nvalue: '303'\nversion: 1\n", ""), + ), + ], +) +def test_responder_decorator_default( + value: Any, format: Optional[str], result: Tuple[int, str, str] +) -> None: + @object_format.Responder() + def orf_value(format: Optional[str] = None): + return value + + assert orf_value(format=format) == result + + +class PhonyMultiYAMLFormatAdapter(object_format.ObjectFormatAdapter): + """This adapter puts a yaml document/directive separator line + before all output. It doesn't actully support multiple documents. + """ + def format_yaml(self): + yml = super().format_yaml() + return "---\n{}".format(yml) + + +@pytest.mark.parametrize( + "value, format, result", + [ + ({}, None, (0, "{}", "")), + ({"blat": True}, "json", (0, '{\n "blat": true\n}', "")), + ({"blat": True}, "yaml", (0, "---\nblat: true\n", "")), + ({"blat": True}, "toml", (-22, "", "Unknown format name: toml")), + ({"blat": True}, "xml", (-22, "", "Unsupported format: xml")), + ( + JSONer("hoop", "303"), + "yaml", + (0, "---\nname: hoop\nvalue: '303'\nversion: 1\n", ""), + ), + ], +) +def test_responder_decorator_custom( + value: Any, format: Optional[str], result: Tuple[int, str, str] +) -> None: + @object_format.Responder(PhonyMultiYAMLFormatAdapter) + def orf_value(format: Optional[str] = None): + return value + + assert orf_value(format=format) == result + + +class FancyDemoAdapter(PhonyMultiYAMLFormatAdapter): + """This adapter demonstrates adding formatting for other formats + like xml and plain text. + """ + def format_xml(self) -> str: + name = self.obj.get("name") + size = self.obj.get("size") + return f'' + + def format_plain(self) -> str: + name = self.obj.get("name") + size = self.obj.get("size") + es = 'es' if size != 1 else '' + return f"{size} box{es} of {name}" + + +class DecoDemo: + """Class to stand in for a mgr module, used to test CLICommand integration.""" + + @CLICommand("alpha one", perm="rw") + @object_format.Responder() + def alpha_one(self, name: str = "default") -> Dict[str, str]: + return { + "alpha": "one", + "name": name, + "weight": 300, + } + + @CLICommand("beta two", perm="r") + @object_format.Responder() + def beta_two( + self, name: str = "default", format: Optional[str] = None + ) -> Dict[str, str]: + return { + "beta": "two", + "name": name, + "weight": 72, + } + + @CLICommand("gamma three", perm="rw") + @object_format.Responder(FancyDemoAdapter) + def gamma_three(self, size: int = 0) -> Dict[str, Any]: + return {"name": "funnystuff", "size": size} + + @CLICommand("z_err", perm="rw") + @object_format.ErrorResponseHandler() + def z_err(self, name: str = "default") -> Tuple[int, str, str]: + if "z" in name: + raise object_format.ErrorResponse(f"{name} bad") + return 0, name, "" + + @CLICommand("empty one", perm="rw") + @object_format.EmptyResponder() + def empty_one(self, name: str = "default", retval: Optional[int] = None) -> None: + # in real code, this would be making some sort of state change + # but we need to handle erors still + if retval is None: + retval = -5 + if name in ["pow"]: + raise object_format.ErrorResponse(name, return_value=retval) + return + + @CLICommand("empty bad", perm="rw") + @object_format.EmptyResponder() + def empty_bad(self, name: str = "default") -> int: + # in real code, this would be making some sort of state change + return 5 + + +@pytest.mark.parametrize( + "prefix, can_format, args, response", + [ + ( + "alpha one", + True, + {"name": "moonbase"}, + ( + 0, + '{\n "alpha": "one",\n "name": "moonbase",\n "weight": 300\n}', + "", + ), + ), + # --- + ( + "alpha one", + True, + {"name": "moonbase2", "format": "yaml"}, + ( + 0, + "alpha: one\nname: moonbase2\nweight: 300\n", + "", + ), + ), + # --- + ( + "alpha one", + True, + {"name": "moonbase2", "format": "chocolate"}, + ( + -22, + "", + "Unknown format name: chocolate", + ), + ), + # --- + ( + "beta two", + True, + {"name": "blocker"}, + ( + 0, + '{\n "beta": "two",\n "name": "blocker",\n "weight": 72\n}', + "", + ), + ), + # --- + ( + "beta two", + True, + {"name": "test", "format": "yaml"}, + ( + 0, + "beta: two\nname: test\nweight: 72\n", + "", + ), + ), + # --- + ( + "beta two", + True, + {"name": "test", "format": "plain"}, + ( + -22, + "", + "Unsupported format: plain", + ), + ), + # --- + ( + "gamma three", + True, + {}, + ( + 0, + '{\n "name": "funnystuff",\n "size": 0\n}', + "", + ), + ), + # --- + ( + "gamma three", + True, + {"size": 1, "format": "json"}, + ( + 0, + '{\n "name": "funnystuff",\n "size": 1\n}', + "", + ), + ), + # --- + ( + "gamma three", + True, + {"size": 1, "format": "plain"}, + ( + 0, + "1 box of funnystuff", + "", + ), + ), + # --- + ( + "gamma three", + True, + {"size": 2, "format": "plain"}, + ( + 0, + "2 boxes of funnystuff", + "", + ), + ), + # --- + ( + "gamma three", + True, + {"size": 2, "format": "xml"}, + ( + 0, + '', + "", + ), + ), + # --- + ( + "gamma three", + True, + {"size": 2, "format": "toml"}, + ( + -22, + "", + "Unknown format name: toml", + ), + ), + # --- + ( + "z_err", + False, + {"name": "foobar"}, + ( + 0, + "foobar", + "", + ), + ), + # --- + ( + "z_err", + False, + {"name": "zamboni"}, + ( + -22, + "", + "zamboni bad", + ), + ), + # --- + ( + "empty one", + False, + {"name": "zucchini"}, + ( + 0, + "", + "", + ), + ), + # --- + ( + "empty one", + False, + {"name": "pow"}, + ( + -5, + "", + "pow", + ), + ), + # Ensure setting return_value to zero even on an exception is honored + ( + "empty one", + False, + {"name": "pow", "retval": 0}, + ( + 0, + "", + "pow", + ), + ), + ], +) +def test_cli_with_decorators(prefix, can_format, args, response): + dd = DecoDemo() + cmd = CLICommand.COMMANDS[prefix] + assert cmd.call(dd, args, None) == response + # slighly hacky way to check that the CLI "knows" about a --format option + # checking the extra_args feature of the Decorators that provide them (Responder) + if can_format: + assert 'name=format,' in cmd.args + + +def test_error_response(): + e1 = object_format.ErrorResponse("nope") + assert e1.format_response() == (-22, "", "nope") + assert e1.return_value == -22 + assert e1.errno == 22 + assert "ErrorResponse" in repr(e1) + assert "nope" in repr(e1) + assert e1.mgr_return_value() == -22 + + try: + open("/this/is_/extremely_/unlikely/_to/exist.txt") + except Exception as e: + e2 = object_format.ErrorResponse.wrap(e) + r = e2.format_response() + assert r[0] == -errno.ENOENT + assert r[1] == "" + assert "No such file or directory" in r[2] + assert "ErrorResponse" in repr(e2) + assert "No such file or directory" in repr(e2) + assert r[0] == e2.mgr_return_value() + + e3 = object_format.ErrorResponse.wrap(RuntimeError("blat")) + r = e3.format_response() + assert r[0] == -errno.EINVAL + assert r[1] == "" + assert "blat" in r[2] + assert r[0] == e3.mgr_return_value() + + # A custom exception type with an errno property + + class MyCoolException(Exception): + def __init__(self, err_msg: str, errno: int = 0) -> None: + super().__init__(errno, err_msg) + self.errno = errno + self.err_msg = err_msg + + def __str__(self) -> str: + return self.err_msg + + e4 = object_format.ErrorResponse.wrap(MyCoolException("beep", -17)) + r = e4.format_response() + assert r[0] == -17 + assert r[1] == "" + assert r[2] == "beep" + assert e4.mgr_return_value() == -17 + + e5 = object_format.ErrorResponse.wrap(MyCoolException("ok, fine", 0)) + r = e5.format_response() + assert r[0] == 0 + assert r[1] == "" + assert r[2] == "ok, fine" + + e5 = object_format.ErrorResponse.wrap(MyCoolException("no can do", 8)) + r = e5.format_response() + assert r[0] == -8 + assert r[1] == "" + assert r[2] == "no can do" + + # A custom exception type that inherits from ErrorResponseBase + + class MyErrorResponse(object_format.ErrorResponseBase): + def __init__(self, err_msg: str, return_value: int): + super().__init__(self, err_msg) + self.msg = err_msg + self.return_value = return_value + + def format_response(self): + return self.return_value, "", self.msg + + + e6 = object_format.ErrorResponse.wrap(MyErrorResponse("yeah, sure", 0)) + r = e6.format_response() + assert r[0] == 0 + assert r[1] == "" + assert r[2] == "yeah, sure" + assert isinstance(e5, object_format.ErrorResponseBase) + assert isinstance(e6, MyErrorResponse) + + e7 = object_format.ErrorResponse.wrap(MyErrorResponse("no can do", -8)) + r = e7.format_response() + assert r[0] == -8 + assert r[1] == "" + assert r[2] == "no can do" + assert isinstance(e7, object_format.ErrorResponseBase) + assert isinstance(e7, MyErrorResponse) + + +def test_empty_responder_return_check(): + dd = DecoDemo() + with pytest.raises(ValueError): + CLICommand.COMMANDS["empty bad"].call(dd, {}, None) diff --git a/src/pybind/mgr/tests/test_tls.py b/src/pybind/mgr/tests/test_tls.py new file mode 100644 index 000000000..19ce46a93 --- /dev/null +++ b/src/pybind/mgr/tests/test_tls.py @@ -0,0 +1,55 @@ +from mgr_util import create_self_signed_cert, verify_tls, ServerConfigException, get_cert_issuer_info +from OpenSSL import crypto, SSL + +import unittest + + +valid_ceph_cert = """-----BEGIN CERTIFICATE-----\nMIICxjCCAa4CEQCpHIQuSYhCII1J0SVGYnT1MA0GCSqGSIb3DQEBDQUAMCExDTAL\nBgNVBAoMBENlcGgxEDAOBgNVBAMMB2NlcGhhZG0wHhcNMjIwNzA2MTE1MjUyWhcN\nMzIwNzAzMTE1MjUyWjAhMQ0wCwYDVQQKDARDZXBoMRAwDgYDVQQDDAdjZXBoYWRt\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAn2ApFna2CVYE7RDtjJVk\ncJTcJQrjzDOlCoZtxb1QMCQZMXjx/7d6bseQP+dkkeA0hZxnjJZWeu6c/YnQ1JiT\n2aDuDpWoJAaiinHRJyZuY5tqG+ggn95RdToZVbeC+0uALzYi4UFacC3sfpkyIKBR\nic43+2fQNz0PZ+8INSTtm75Y53gbWuGF7Dv95200AmAN2/u8LKWZIvdhbRborxOF\nlK2T40qbj9eH3ewIN/6Eibxrvg4va3pIoOaq0XdJHAL/MjDGJAtahPIenwcjuega\n4PSlB0h3qiyFXz7BG8P0QsPP6slyD58ZJtCGtJiWPOhlq47DlnWlJzRGDEFLLryf\n8wIDAQABMA0GCSqGSIb3DQEBDQUAA4IBAQBixd7RZawlYiTZaCmv3Vy7X/hhabac\nE/YiuFt1YMe0C9+D8IcCQN/IRww/Bi7Af6tm+ncHT9GsOGWX6hahXDKTw3b9nSDi\nETvjkUTYOayZGfhYpRA6m6e/2ypcUYsiXRDY9zneDKCdPREIA1D6L2fROHetFX9r\nX9rSry01xrYwNlYA1e6GLMXm2NaGsLT3JJlRBtT3P7f1jtRGXcwkc7ns0AtW0uNj\nGqRLHfJazdgWJFsj8vBdMs7Ci0C/b5/f7J/DLpPCvUA3Fqwn9MzHl01UwlDsKy1a\nROi4cfQNOLbWX8g3PfIlqtdGYNA77UPxvy1SUimmtdopZaEVWKkqeWYK\n-----END CERTIFICATE-----\n +""" + +invalid_cert = """-----BEGIN CERTIFICATE-----\nMIICxjCCAa4CEQCpHIQuSYhCII1J0SVGYnT1MA0GCSqGSIb3DQEBDQUAMCExDTAL\nBgNVBAoMBENlcGgxEDAOBgNVBAMMB2NlcGhhZG0wHhcNMjIwNzA2MTE1MjUyWhcN\nMzIwNzAzMTE1MjUyWjAhMQ0wCwYDVQQKDARDZXBoMRAwDgYDVQQDDAdjZXBoYWRt\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEBn2ApFna2CVYE7RDtjJVk\ncJTcJQrjzDOlCoZtxb1QMCQZMXjx/7d6bseQP+dkkeA0hZxnjJZWeu6c/YnQ1JiT\n2aDuDpWoJAaiinHRJyZuY5tqG+ggn95RdToZVbeC+0uALzYi4UFacC3sfpkyIKBR\nic43+2fQNz0PZ+8INSTtm75Y53gbWuGF7Dv95200AmAN2/u8LKWZIvdhbRborxOF\nlK2T40qbj9eH3ewIN/6Eibxrvg4va3pIoOaq0XdJHAL/MjDGJAtahPIenwcjuega\n4PSlB0h3qiyFXz7BG8P0QsPP6slyD58ZJtCGtJiWPOhlq47DlnWlJzRGDEFLLryf\n8wIDAQABMA0GCSqGSIb3DQEBDQUAA4IBAQBixd7RZawlYiTZaCmv3Vy7X/hhabac\nE/YiuFt1YMe0C9+D8IcCQN/IRww/Bi7Af6tm+ncHT9GsOGWX6hahXDKTw3b9nSDi\nETvjkUTYOayZGfhYpRA6m6e/2ypcUYsiXRDY9zneDKCdPREIA1D6L2fROHetFX9r\nX9rSry01xrYwNlYA1e6GLMXm2NaGsLT3JJlRBtT3P7f1jtRGXcwkc7ns0AtW0uNj\nGqRLHfJazdgWJFsj8vBdMs7Ci0C/b5/f7J/DLpPCvUA3Fqwn9MzHl01UwlDsKy1a\nROi4cfQNOLbWX8g3PfIlqtdGYNA77UPxvy1SUimmtdopZa\n-----END CERTIFICATE-----\n +""" + +class TLSchecks(unittest.TestCase): + + def test_defaults(self): + crt, key = create_self_signed_cert() + verify_tls(crt, key) + + def test_specific_dname(self): + crt, key = create_self_signed_cert(dname={'O': 'Ceph', 'OU': 'testsuite'}) + verify_tls(crt, key) + + def test_invalid_RDN(self): + self.assertRaises(ValueError, create_self_signed_cert, + dname={'O': 'Ceph', 'Bogus': 'testsuite'}) + + def test_invalid_key(self): + crt, key = create_self_signed_cert() + + # fudge the key, to force an error to be detected during verify_tls + fudged = f"{key[:-35]}c0ffee==\n{key[-25:]}".encode('utf-8') + self.assertRaises(ServerConfigException, verify_tls, crt, fudged) + + def test_mismatched_tls(self): + crt, _ = create_self_signed_cert() + + # generate another key + new_key = crypto.PKey() + new_key.generate_key(crypto.TYPE_RSA, 2048) + new_key = crypto.dump_privatekey(crypto.FILETYPE_PEM, new_key).decode('utf-8') + + self.assertRaises(ServerConfigException, verify_tls, crt, new_key) + + def test_get_cert_issuer_info(self): + + # valid certificate + org, cn = get_cert_issuer_info(valid_ceph_cert) + assert org == 'Ceph' + assert cn == 'cephadm' + + # empty certificate + self.assertRaises(ServerConfigException, get_cert_issuer_info, '') + + # invalid certificate + self.assertRaises(ServerConfigException, get_cert_issuer_info, invalid_cert) -- cgit v1.2.3