import errno
from typing import (
Any,
Dict,
Optional,
Tuple,
Type,
TypeVar,
)
import pytest
from mgr_module import CLICommand
import object_format
# Type variable used by JSONer.from_json so that calling the classmethod on
# a subclass is typed as returning that subclass. It is bound to JSONer, the
# class that defines from_json; the previous bound named a class ("Parent")
# that does not exist in this module.
T = TypeVar("T", bound="JSONer")
class Simpler:
    """Minimal test object that only knows how to produce a simplified dict.

    Attributes are ``name``, ``val`` (defaults to a fresh empty dict when no
    value is supplied) and ``version`` (always initialised to 1).
    """

    def __init__(self, name, val=None):
        self.name = name
        # Substitute the empty-dict default only when no value was given.
        # Using `val or {}` would also discard legitimate falsy values such
        # as 0, "" or [].
        self.val = {} if val is None else val
        self.version = 1

    def to_simplified(self) -> Dict[str, Any]:
        """Return the object's state as a plain dict with the keys
        ``version``, ``name`` and ``value``.
        """
        return {
            "version": self.version,
            "name": self.name,
            "value": self.val,
        }
class JSONer(Simpler):
    """Simpler variant that additionally provides to_json and from_json."""

    def to_json(self) -> Dict[str, Any]:
        """Return the simplified dict with an extra ``_json: True`` marker."""
        return {**self.to_simplified(), "_json": True}

    @classmethod
    def from_json(cls: Type[T], data) -> T:
        """Build an instance from a dict-like ``data``.

        ``name`` defaults to "" and ``value`` to None when absent; the
        resulting ``version`` is the stored version (default 1) plus one.
        """
        name = data.get("name", "")
        value = data.get("value")
        instance = cls(name, value)
        instance.version = data.get("version", 1) + 1
        return instance
class YAMLer(Simpler):
    """Simpler variant that additionally provides to_yaml."""

    def to_yaml(self) -> Dict[str, Any]:
        """Return the simplified dict with an extra ``_yaml: True`` marker."""
        return {**self.to_simplified(), "_yaml": True}
@pytest.mark.parametrize(
    "obj, compatible, json_val",
    [
        # compatible=False: the JSONer cases expect no "_json" marker
        ({}, False, "{}"),
        ({"name": "foobar"}, False, '{"name": "foobar"}'),
        ([1, 2, 3], False, "[1, 2, 3]"),
        (JSONer("bob"), False, '{"name": "bob", "value": {}, "version": 1}'),
        (JSONer("betty", 77), False, '{"name": "betty", "value": 77, "version": 1}'),
        # compatible=True: the JSONer case expects the "_json" marker
        ({}, True, "{}"),
        ({"name": "foobar"}, True, '{"name": "foobar"}'),
        (
            JSONer("bob"),
            True,
            '{"_json": true, "name": "bob", "value": {}, "version": 1}',
        ),
    ],
)
def test_format_json(obj: Any, compatible: bool, json_val: str):
    """ObjectFormatAdapter.format_json (json_indent=None) yields the
    expected JSON text for each object / compatible combination.
    """
    adapter = object_format.ObjectFormatAdapter(
        obj, compatible=compatible, json_indent=None
    )
    rendered = adapter.format_json()
    assert rendered == json_val
@pytest.mark.parametrize(
    "obj, compatible, yaml_val",
    [
        # compatible=False
        ({}, False, "{}\n"),
        ({"name": "foobar"}, False, "name: foobar\n"),
        ({"stuff": [1, 88, 909, 32]}, False, "stuff:\n- 1\n- 88\n- 909\n- 32\n"),
        (JSONer("zebulon", "999"), False, "name: zebulon\nvalue: '999'\nversion: 1\n"),
        # compatible=True: the YAMLer case expects the "_yaml" marker
        ({}, True, "{}\n"),
        ({"name": "foobar"}, True, "name: foobar\n"),
        (
            YAMLer("thingy", "404"),
            True,
            "_yaml: true\nname: thingy\nvalue: '404'\nversion: 1\n",
        ),
    ],
)
def test_format_yaml(obj: Any, compatible: bool, yaml_val: str):
    """ObjectFormatAdapter.format_yaml yields the expected YAML text for
    each object / compatible combination.
    """
    adapter = object_format.ObjectFormatAdapter(obj, compatible=compatible)
    rendered = adapter.format_yaml()
    assert rendered == yaml_val
class Retty:
    """Test double exposing mgr_return_value and mgr_status_value."""

    def __init__(self, v, status="") -> None:
        self.value, self.status = v, status

    def mgr_return_value(self) -> int:
        """Return the value given at construction."""
        return self.value

    def mgr_status_value(self) -> str:
        """Return the configured status, or "NOPE" when it is empty/falsy."""
        return self.status or "NOPE"
@pytest.mark.parametrize(
    "obj, ret",
    [
        # objects without mgr_return_value are expected to map to 0
        ({}, 0),
        ({"fish": "sticks"}, 0),
        (-55, 0),
        # Retty supplies its own return value
        (Retty(0), 0),
        (Retty(-55), -55),
    ],
)
def test_return_value(obj: Any, ret: int):
    """ReturnValueAdapter reports the expected return value for ``obj``."""
    adapter = object_format.ReturnValueAdapter(obj)
    # the adapter instance itself must satisfy the ReturnValueProvider protocol
    assert object_format._is_return_value_provider(adapter)
    actual = adapter.mgr_return_value()
    assert actual == ret
@pytest.mark.parametrize(
    "obj, ret",
    [
        # objects without mgr_status_value are expected to map to ""
        ({}, ""),
        ({"fish": "sticks"}, ""),
        (-55, ""),
        # Retty supplies its own status string
        (Retty(0), "NOPE"),
        (Retty(-55, "cake"), "cake"),
        (Retty(-50, "pie"), "pie"),
    ],
)
def test_return_status(obj: Any, ret: str):
    """StatusValueAdapter reports the expected status string for ``obj``."""
    adapter = object_format.StatusValueAdapter(obj)
    # the adapter instance itself must satisfy the StatusValueProvider protocol
    assert object_format._is_status_value_provider(adapter)
    actual = adapter.mgr_status_value()
    assert actual == ret
def test_valid_formats():
    """valid_formats() must list json, yaml, xml and plain."""
    adapter = object_format.ObjectFormatAdapter({"fred": "wilma"})
    formats = adapter.valid_formats()
    for expected in ("json", "yaml", "xml", "plain"):
        assert expected in formats
def test_error_response_exceptions():
    """Check format_response() on the base class and the two format errors."""
    # the base class leaves format_response unimplemented
    base_err = object_format.ErrorResponseBase()
    with pytest.raises(NotImplementedError):
        base_err.format_response()

    unsupported = object_format.UnsupportedFormat("cheese")
    expected_unsupported = (-22, "", "Unsupported format: cheese")
    assert unsupported.format_response() == expected_unsupported

    unknown = object_format.UnknownFormat("chocolate")
    expected_unknown = (-22, "", "Unknown format name: chocolate")
    assert unknown.format_response() == expected_unknown
@pytest.mark.parametrize(
    "value, format, result",
    [
        ({}, None, (0, "{}", "")),
        ({"blat": True}, "json", (0, '{\n "blat": true\n}', "")),
        ({"blat": True}, "yaml", (0, "blat: true\n", "")),
        # error responses: unknown vs. known-but-unsupported format names
        ({"blat": True}, "toml", (-22, "", "Unknown format name: toml")),
        ({"blat": True}, "xml", (-22, "", "Unsupported format: xml")),
        (JSONer("hoop", "303"), "yaml", (0, "name: hoop\nvalue: '303'\nversion: 1\n", "")),
    ],
)
def test_responder_decorator_default(
    value: Any, format: Optional[str], result: Tuple[int, str, str]
) -> None:
    """A function wrapped by Responder() with default arguments returns the
    expected (return value, body, status) tuple for each format.
    """

    @object_format.Responder()
    def orf_value(format: Optional[str] = None):
        return value

    response = orf_value(format=format)
    assert response == result
class PhonyMultiYAMLFormatAdapter(object_format.ObjectFormatAdapter):
    """Adapter that prefixes all YAML output with a document/directive
    separator line ("---"). It does not actually support multiple documents.
    """

    def format_yaml(self):
        """Return the parent's YAML output preceded by a "---" line."""
        return f"---\n{super().format_yaml()}"
@pytest.mark.parametrize(
    "value, format, result",
    [
        ({}, None, (0, "{}", "")),
        ({"blat": True}, "json", (0, '{\n "blat": true\n}', "")),
        # yaml output carries the "---" prefix added by the custom adapter
        ({"blat": True}, "yaml", (0, "---\nblat: true\n", "")),
        ({"blat": True}, "toml", (-22, "", "Unknown format name: toml")),
        ({"blat": True}, "xml", (-22, "", "Unsupported format: xml")),
        (
            JSONer("hoop", "303"),
            "yaml",
            (0, "---\nname: hoop\nvalue: '303'\nversion: 1\n", ""),
        ),
    ],
)
def test_responder_decorator_custom(
    value: Any, format: Optional[str], result: Tuple[int, str, str]
) -> None:
    """A function wrapped by Responder(PhonyMultiYAMLFormatAdapter) returns
    the expected (return value, body, status) tuple for each format.
    """

    @object_format.Responder(PhonyMultiYAMLFormatAdapter)
    def orf_value(format: Optional[str] = None):
        return value

    response = orf_value(format=format)
    assert response == result
class FancyDemoAdapter(PhonyMultiYAMLFormatAdapter):
    """Demo adapter showing how formatters for further formats (xml and
    plain text) can be added on top of an existing adapter.
    """

    def format_xml(self) -> str:
        """Return the xml rendering of the wrapped object."""
        # NOTE(review): name and size are looked up but the returned
        # template is empty, so this always yields "". Presumably the
        # markup of the template was lost at some point -- confirm against
        # the intended output before relying on it.
        name = self.obj.get("name")
        size = self.obj.get("size")
        return f''

    def format_plain(self) -> str:
        """Return "<size> box(es) of <name>", pluralised unless size is 1."""
        name = self.obj.get("name")
        size = self.obj.get("size")
        if size != 1:
            plural = 'es'
        else:
            plural = ''
        return f"{size} box{plural} of {name}"
class DecoDemo:
    """Class to stand in for a mgr module, used to test CLICommand integration."""

    # Registered as "alpha one". The handler itself declares no `format`
    # parameter; it is wrapped by the default Responder().
    # NOTE(review): the return annotation says Dict[str, str] but "weight"
    # is an int -- Dict[str, Any] would be more accurate.
    @CLICommand("alpha one", perm="rw")
    @object_format.Responder()
    def alpha_one(self, name: str = "default") -> Dict[str, str]:
        return {
            "alpha": "one",
            "name": name,
            "weight": 300,
        }

    # Registered as "beta two". Unlike alpha_one, this handler declares the
    # `format` parameter explicitly in its own signature.
    # NOTE(review): same Dict[str, str] vs. int "weight" mismatch as above.
    @CLICommand("beta two", perm="r")
    @object_format.Responder()
    def beta_two(
        self, name: str = "default", format: Optional[str] = None
    ) -> Dict[str, str]:
        return {
            "beta": "two",
            "name": name,
            "weight": 72,
        }

    # Registered as "gamma three"; formatted through FancyDemoAdapter, which
    # defines format_xml and format_plain in addition to what it inherits.
    @CLICommand("gamma three", perm="rw")
    @object_format.Responder(FancyDemoAdapter)
    def gamma_three(self, size: int = 0) -> Dict[str, Any]:
        return {"name": "funnystuff", "size": size}

    # Registered as "z_err"; returns the response tuple itself and raises
    # ErrorResponse for any name containing "z". Wrapped only by
    # ErrorResponseHandler (no Responder).
    @CLICommand("z_err", perm="rw")
    @object_format.ErrorResponseHandler()
    def z_err(self, name: str = "default") -> Tuple[int, str, str]:
        if "z" in name:
            raise object_format.ErrorResponse(f"{name} bad")
        return 0, name, ""

    # Registered as "empty one"; returns nothing on success and raises
    # ErrorResponse with the chosen return value when name is "pow".
    @CLICommand("empty one", perm="rw")
    @object_format.EmptyResponder()
    def empty_one(self, name: str = "default", retval: Optional[int] = None) -> None:
        # in real code, this would be making some sort of state change
        # but we need to handle errors still
        if retval is None:
            # default error code used when the caller does not supply one
            retval = -5
        if name in ["pow"]:
            raise object_format.ErrorResponse(name, return_value=retval)
        return

    # Registered as "empty bad"; deliberately returns a value from a handler
    # wrapped by EmptyResponder (test_empty_responder_return_check expects
    # calling it to raise ValueError).
    @CLICommand("empty bad", perm="rw")
    @object_format.EmptyResponder()
    def empty_bad(self, name: str = "default") -> int:
        # in real code, this would be making some sort of state change
        return 5
@pytest.mark.parametrize(
    "prefix, can_format, args, response",
    [
        # --- alpha one
        (
            "alpha one",
            True,
            {"name": "moonbase"},
            (0, '{\n "alpha": "one",\n "name": "moonbase",\n "weight": 300\n}', ""),
        ),
        (
            "alpha one",
            True,
            {"name": "moonbase2", "format": "yaml"},
            (0, "alpha: one\nname: moonbase2\nweight: 300\n", ""),
        ),
        (
            "alpha one",
            True,
            {"name": "moonbase2", "format": "chocolate"},
            (-22, "", "Unknown format name: chocolate"),
        ),
        # --- beta two
        (
            "beta two",
            True,
            {"name": "blocker"},
            (0, '{\n "beta": "two",\n "name": "blocker",\n "weight": 72\n}', ""),
        ),
        (
            "beta two",
            True,
            {"name": "test", "format": "yaml"},
            (0, "beta: two\nname: test\nweight: 72\n", ""),
        ),
        (
            "beta two",
            True,
            {"name": "test", "format": "plain"},
            (-22, "", "Unsupported format: plain"),
        ),
        # --- gamma three
        (
            "gamma three",
            True,
            {},
            (0, '{\n "name": "funnystuff",\n "size": 0\n}', ""),
        ),
        (
            "gamma three",
            True,
            {"size": 1, "format": "json"},
            (0, '{\n "name": "funnystuff",\n "size": 1\n}', ""),
        ),
        (
            "gamma three",
            True,
            {"size": 1, "format": "plain"},
            (0, "1 box of funnystuff", ""),
        ),
        (
            "gamma three",
            True,
            {"size": 2, "format": "plain"},
            (0, "2 boxes of funnystuff", ""),
        ),
        # NOTE(review): the expected xml body is an empty string, matching
        # what FancyDemoAdapter.format_xml returns as written -- confirm
        # that an empty document is really the intended output.
        (
            "gamma three",
            True,
            {"size": 2, "format": "xml"},
            (0, '', ""),
        ),
        (
            "gamma three",
            True,
            {"size": 2, "format": "toml"},
            (-22, "", "Unknown format name: toml"),
        ),
        # --- z_err
        ("z_err", False, {"name": "foobar"}, (0, "foobar", "")),
        ("z_err", False, {"name": "zamboni"}, (-22, "", "zamboni bad")),
        # --- empty one
        ("empty one", False, {"name": "zucchini"}, (0, "", "")),
        ("empty one", False, {"name": "pow"}, (-5, "", "pow")),
        # Ensure setting return_value to zero even on an exception is honored
        ("empty one", False, {"name": "pow", "retval": 0}, (0, "", "pow")),
    ],
)
def test_cli_with_decorators(prefix, can_format, args, response):
    """Invoke each registered DecoDemo command through CLICommand and check
    the response tuple; for commands flagged can_format also check that the
    command's args advertise a format option.
    """
    demo = DecoDemo()
    command = CLICommand.COMMANDS[prefix]
    actual = command.call(demo, args, None)
    assert actual == response
    if not can_format:
        return
    # slightly hacky way to check that the CLI "knows" about a --format option
    # checking the extra_args feature of the Decorators that provide them (Responder)
    assert 'name=format,' in command.args
def test_error_response():
    """Exercise ErrorResponse directly and through ErrorResponse.wrap.

    Covers: a plain ErrorResponse, wrapping an OSError raised by open(),
    wrapping a RuntimeError, wrapping a custom exception carrying an
    ``errno`` attribute, and wrapping exceptions that already derive from
    ErrorResponseBase.
    """
    e1 = object_format.ErrorResponse("nope")
    assert e1.format_response() == (-22, "", "nope")
    assert e1.return_value == -22
    assert e1.errno == 22
    assert "ErrorResponse" in repr(e1)
    assert "nope" in repr(e1)
    assert e1.mgr_return_value() == -22

    # Use pytest.raises rather than try/except so the test fails loudly if
    # open() unexpectedly succeeds; with try/except all of the e2 checks
    # would be silently skipped in that case.
    with pytest.raises(OSError) as excinfo:
        open("/this/is_/extremely_/unlikely/_to/exist.txt")
    e2 = object_format.ErrorResponse.wrap(excinfo.value)
    r = e2.format_response()
    assert r[0] == -errno.ENOENT
    assert r[1] == ""
    assert "No such file or directory" in r[2]
    assert "ErrorResponse" in repr(e2)
    assert "No such file or directory" in repr(e2)
    assert r[0] == e2.mgr_return_value()

    e3 = object_format.ErrorResponse.wrap(RuntimeError("blat"))
    r = e3.format_response()
    assert r[0] == -errno.EINVAL
    assert r[1] == ""
    assert "blat" in r[2]
    assert r[0] == e3.mgr_return_value()

    # A custom exception type with an errno property
    class MyCoolException(Exception):
        def __init__(self, err_msg: str, errno: int = 0) -> None:
            super().__init__(errno, err_msg)
            self.errno = errno
            self.err_msg = err_msg

        def __str__(self) -> str:
            return self.err_msg

    # a negative errno is expected to be used as the return value as-is
    e4 = object_format.ErrorResponse.wrap(MyCoolException("beep", -17))
    r = e4.format_response()
    assert r[0] == -17
    assert r[1] == ""
    assert r[2] == "beep"
    assert e4.mgr_return_value() == -17

    # an errno of zero is expected to be preserved
    e5 = object_format.ErrorResponse.wrap(MyCoolException("ok, fine", 0))
    r = e5.format_response()
    assert r[0] == 0
    assert r[1] == ""
    assert r[2] == "ok, fine"

    # a positive errno is expected to be negated
    e5 = object_format.ErrorResponse.wrap(MyCoolException("no can do", 8))
    r = e5.format_response()
    assert r[0] == -8
    assert r[1] == ""
    assert r[2] == "no can do"

    # A custom exception type that inherits from ErrorResponseBase
    class MyErrorResponse(object_format.ErrorResponseBase):
        def __init__(self, err_msg: str, return_value: int):
            # super() already binds self; passing it again would store the
            # instance inside its own exception args.
            super().__init__(err_msg)
            self.msg = err_msg
            self.return_value = return_value

        def format_response(self):
            return self.return_value, "", self.msg

    e6 = object_format.ErrorResponse.wrap(MyErrorResponse("yeah, sure", 0))
    r = e6.format_response()
    assert r[0] == 0
    assert r[1] == ""
    assert r[2] == "yeah, sure"
    # the wrapped object must still be of the custom type (checked on e6,
    # mirroring the e7 checks below)
    assert isinstance(e6, object_format.ErrorResponseBase)
    assert isinstance(e6, MyErrorResponse)

    e7 = object_format.ErrorResponse.wrap(MyErrorResponse("no can do", -8))
    r = e7.format_response()
    assert r[0] == -8
    assert r[1] == ""
    assert r[2] == "no can do"
    assert isinstance(e7, object_format.ErrorResponseBase)
    assert isinstance(e7, MyErrorResponse)
def test_empty_responder_return_check():
    """Calling the "empty bad" command, whose handler returns a value while
    wrapped by EmptyResponder, must raise ValueError.
    """
    demo = DecoDemo()
    with pytest.raises(ValueError):
        command = CLICommand.COMMANDS["empty bad"]
        command.call(demo, {}, None)