summaryrefslogtreecommitdiffstats
path: root/src/cephadm/tests/test_util_funcs.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/cephadm/tests/test_util_funcs.py')
-rw-r--r--src/cephadm/tests/test_util_funcs.py808
1 files changed, 808 insertions, 0 deletions
diff --git a/src/cephadm/tests/test_util_funcs.py b/src/cephadm/tests/test_util_funcs.py
new file mode 100644
index 000000000..270753a55
--- /dev/null
+++ b/src/cephadm/tests/test_util_funcs.py
@@ -0,0 +1,808 @@
+# Tests for various assorted utility functions found within cephadm
+#
+from unittest import mock
+
+import functools
+import io
+import os
+import sys
+
+import pytest
+
+from tests.fixtures import with_cephadm_ctx, import_cephadm
+
+_cephadm = import_cephadm()
+
+
+class TestCopyTree:
+ def _copy_tree(self, *args, **kwargs):
+ with with_cephadm_ctx([]) as ctx:
+ with mock.patch("cephadm.extract_uid_gid") as eug:
+ eug.return_value = (os.getuid(), os.getgid())
+ _cephadm.copy_tree(ctx, *args, **kwargs)
+
+ def test_one_dir(self, tmp_path):
+ """Copy one dir into a non-existing dest dir."""
+ src1 = tmp_path / "src1"
+ dst = tmp_path / "dst"
+ src1.mkdir(parents=True)
+
+ with (src1 / "foo.txt").open("w") as fh:
+ fh.write("hello\n")
+ fh.write("earth\n")
+
+ assert not (dst / "foo.txt").exists()
+
+ self._copy_tree([src1], dst)
+ assert (dst / "foo.txt").exists()
+
+ def test_one_existing_dir(self, tmp_path):
+ """Copy one dir into an existing dest dir."""
+ src1 = tmp_path / "src1"
+ dst = tmp_path / "dst"
+ src1.mkdir(parents=True)
+ dst.mkdir(parents=True)
+
+ with (src1 / "foo.txt").open("w") as fh:
+ fh.write("hello\n")
+ fh.write("earth\n")
+
+ assert not (dst / "src1").exists()
+
+ self._copy_tree([src1], dst)
+ assert (dst / "src1/foo.txt").exists()
+
+ def test_two_dirs(self, tmp_path):
+ """Copy two source directories into an existing dest dir."""
+ src1 = tmp_path / "src1"
+ src2 = tmp_path / "src2"
+ dst = tmp_path / "dst"
+ src1.mkdir(parents=True)
+ src2.mkdir(parents=True)
+ dst.mkdir(parents=True)
+
+ with (src1 / "foo.txt").open("w") as fh:
+ fh.write("hello\n")
+ fh.write("earth\n")
+ with (src2 / "bar.txt").open("w") as fh:
+ fh.write("goodbye\n")
+ fh.write("mars\n")
+
+ assert not (dst / "src1").exists()
+ assert not (dst / "src2").exists()
+
+ self._copy_tree([src1, src2], dst)
+ assert (dst / "src1/foo.txt").exists()
+ assert (dst / "src2/bar.txt").exists()
+
+ def test_one_dir_set_uid(self, tmp_path):
+ """Explicity pass uid/gid values and assert these are passed to chown."""
+ # Because this test will often be run by non-root users it is necessary
+ # to mock os.chown or we too easily run into perms issues.
+ src1 = tmp_path / "src1"
+ dst = tmp_path / "dst"
+ src1.mkdir(parents=True)
+
+ with (src1 / "foo.txt").open("w") as fh:
+ fh.write("hello\n")
+ fh.write("earth\n")
+
+ assert not (dst / "foo.txt").exists()
+
+ with mock.patch("os.chown") as _chown:
+ _chown.return_value = None
+ self._copy_tree([src1], dst, uid=0, gid=0)
+ assert len(_chown.mock_calls) >= 2
+ for c in _chown.mock_calls:
+ assert c == mock.call(mock.ANY, 0, 0)
+ assert (dst / "foo.txt").exists()
+
+
+class TestCopyFiles:
+ def _copy_files(self, *args, **kwargs):
+ with with_cephadm_ctx([]) as ctx:
+ with mock.patch("cephadm.extract_uid_gid") as eug:
+ eug.return_value = (os.getuid(), os.getgid())
+ _cephadm.copy_files(ctx, *args, **kwargs)
+
+ def test_one_file(self, tmp_path):
+ """Copy one file into the dest dir."""
+ file1 = tmp_path / "f1.txt"
+ dst = tmp_path / "dst"
+ dst.mkdir(parents=True)
+
+ with file1.open("w") as fh:
+ fh.write("its test time\n")
+
+ self._copy_files([file1], dst)
+ assert (dst / "f1.txt").exists()
+
+ def test_one_file_nodest(self, tmp_path):
+ """Copy one file to the given destination path."""
+ file1 = tmp_path / "f1.txt"
+ dst = tmp_path / "dst"
+
+ with file1.open("w") as fh:
+ fh.write("its test time\n")
+
+ self._copy_files([file1], dst)
+ assert not dst.is_dir()
+ assert dst.is_file()
+ assert dst.open("r").read() == "its test time\n"
+
+ def test_three_files(self, tmp_path):
+ """Copy one file into the dest dir."""
+ file1 = tmp_path / "f1.txt"
+ file2 = tmp_path / "f2.txt"
+ file3 = tmp_path / "f3.txt"
+ dst = tmp_path / "dst"
+ dst.mkdir(parents=True)
+
+ with file1.open("w") as fh:
+ fh.write("its test time\n")
+ with file2.open("w") as fh:
+ fh.write("f2\n")
+ with file3.open("w") as fh:
+ fh.write("f3\n")
+
+ self._copy_files([file1, file2, file3], dst)
+ assert (dst / "f1.txt").exists()
+ assert (dst / "f2.txt").exists()
+ assert (dst / "f3.txt").exists()
+
+ def test_three_files_nodest(self, tmp_path):
+ """Copy files to dest path (not a dir). This is not a useful operation."""
+ file1 = tmp_path / "f1.txt"
+ file2 = tmp_path / "f2.txt"
+ file3 = tmp_path / "f3.txt"
+ dst = tmp_path / "dst"
+
+ with file1.open("w") as fh:
+ fh.write("its test time\n")
+ with file2.open("w") as fh:
+ fh.write("f2\n")
+ with file3.open("w") as fh:
+ fh.write("f3\n")
+
+ self._copy_files([file1, file2, file3], dst)
+ assert not dst.is_dir()
+ assert dst.is_file()
+ assert dst.open("r").read() == "f3\n"
+
+ def test_one_file_set_uid(self, tmp_path):
+ """Explicity pass uid/gid values and assert these are passed to chown."""
+ # Because this test will often be run by non-root users it is necessary
+ # to mock os.chown or we too easily run into perms issues.
+ file1 = tmp_path / "f1.txt"
+ dst = tmp_path / "dst"
+ dst.mkdir(parents=True)
+
+ with file1.open("w") as fh:
+ fh.write("its test time\n")
+
+ assert not (dst / "f1.txt").exists()
+
+ with mock.patch("os.chown") as _chown:
+ _chown.return_value = None
+ self._copy_files([file1], dst, uid=0, gid=0)
+ assert len(_chown.mock_calls) >= 1
+ for c in _chown.mock_calls:
+ assert c == mock.call(mock.ANY, 0, 0)
+ assert (dst / "f1.txt").exists()
+
+
+class TestMoveFiles:
+ def _move_files(self, *args, **kwargs):
+ with with_cephadm_ctx([]) as ctx:
+ with mock.patch("cephadm.extract_uid_gid") as eug:
+ eug.return_value = (os.getuid(), os.getgid())
+ _cephadm.move_files(ctx, *args, **kwargs)
+
+ def test_one_file(self, tmp_path):
+ """Move a named file to test dest path."""
+ file1 = tmp_path / "f1.txt"
+ dst = tmp_path / "dst"
+
+ with file1.open("w") as fh:
+ fh.write("lets moove\n")
+
+ assert not dst.exists()
+ assert file1.is_file()
+
+ self._move_files([file1], dst)
+ assert dst.is_file()
+ assert not file1.exists()
+
+ def test_one_file_destdir(self, tmp_path):
+ """Move a file into an existing dest dir."""
+ file1 = tmp_path / "f1.txt"
+ dst = tmp_path / "dst"
+ dst.mkdir(parents=True)
+
+ with file1.open("w") as fh:
+ fh.write("lets moove\n")
+
+ assert not (dst / "f1.txt").exists()
+ assert file1.is_file()
+
+ self._move_files([file1], dst)
+ assert (dst / "f1.txt").is_file()
+ assert not file1.exists()
+
+ def test_one_file_one_link(self, tmp_path):
+ """Move a file and a symlink to that file to a dest dir."""
+ file1 = tmp_path / "f1.txt"
+ link1 = tmp_path / "lnk"
+ dst = tmp_path / "dst"
+ dst.mkdir(parents=True)
+
+ with file1.open("w") as fh:
+ fh.write("lets moove\n")
+ os.symlink("f1.txt", link1)
+
+ assert not (dst / "f1.txt").exists()
+ assert file1.is_file()
+ assert link1.exists()
+
+ self._move_files([file1, link1], dst)
+ assert (dst / "f1.txt").is_file()
+ assert (dst / "lnk").is_symlink()
+ assert not file1.exists()
+ assert not link1.exists()
+ assert (dst / "f1.txt").open("r").read() == "lets moove\n"
+ assert (dst / "lnk").open("r").read() == "lets moove\n"
+
+ def test_one_file_set_uid(self, tmp_path):
+ """Explicity pass uid/gid values and assert these are passed to chown."""
+ # Because this test will often be run by non-root users it is necessary
+ # to mock os.chown or we too easily run into perms issues.
+ file1 = tmp_path / "f1.txt"
+ dst = tmp_path / "dst"
+
+ with file1.open("w") as fh:
+ fh.write("lets moove\n")
+
+ assert not dst.exists()
+ assert file1.is_file()
+
+ with mock.patch("os.chown") as _chown:
+ _chown.return_value = None
+ self._move_files([file1], dst, uid=0, gid=0)
+ assert len(_chown.mock_calls) >= 1
+ for c in _chown.mock_calls:
+ assert c == mock.call(mock.ANY, 0, 0)
+ assert dst.is_file()
+ assert not file1.exists()
+
+
+def test_recursive_chown(tmp_path):
+ d1 = tmp_path / "dir1"
+ d2 = d1 / "dir2"
+ f1 = d2 / "file1.txt"
+ d2.mkdir(parents=True)
+
+ with f1.open("w") as fh:
+ fh.write("low down\n")
+
+ with mock.patch("os.chown") as _chown:
+ _chown.return_value = None
+ _cephadm.recursive_chown(str(d1), uid=500, gid=500)
+ assert len(_chown.mock_calls) == 3
+ assert _chown.mock_calls[0] == mock.call(str(d1), 500, 500)
+ assert _chown.mock_calls[1] == mock.call(str(d2), 500, 500)
+ assert _chown.mock_calls[2] == mock.call(str(f1), 500, 500)
+
+
+class TestFindExecutable:
+ def test_standard_exe(self):
+ # pretty much every system will have `true` on the path. It's a safe choice
+ # for the first assertion
+ exe = _cephadm.find_executable("true")
+ assert exe.endswith("true")
+
+ def test_custom_path(self, tmp_path):
+ foo_sh = tmp_path / "foo.sh"
+ with open(foo_sh, "w") as fh:
+ fh.write("#!/bin/sh\n")
+ fh.write("echo foo\n")
+ foo_sh.chmod(0o755)
+
+ exe = _cephadm.find_executable(foo_sh)
+ assert str(exe) == str(foo_sh)
+
+ def test_no_path(self, monkeypatch):
+ monkeypatch.delenv("PATH")
+ exe = _cephadm.find_executable("true")
+ assert exe.endswith("true")
+
+ def test_no_path_no_confstr(self, monkeypatch):
+ def _fail(_):
+ raise ValueError("fail")
+
+ monkeypatch.delenv("PATH")
+ monkeypatch.setattr("os.confstr", _fail)
+ exe = _cephadm.find_executable("true")
+ assert exe.endswith("true")
+
+ def test_unset_path(self):
+ exe = _cephadm.find_executable("true", path="")
+ assert exe is None
+
+ def test_no_such_exe(self):
+ exe = _cephadm.find_executable("foo_bar-baz.noway")
+ assert exe is None
+
+
+def test_find_program():
+ exe = _cephadm.find_program("true")
+ assert exe.endswith("true")
+
+ with pytest.raises(ValueError):
+ _cephadm.find_program("foo_bar-baz.noway")
+
+
+def _mk_fake_call(enabled, active):
+ def _fake_call(ctx, cmd, **kwargs):
+ if "is-enabled" in cmd:
+ if isinstance(enabled, Exception):
+ raise enabled
+ return enabled
+ if "is-active" in cmd:
+ if isinstance(active, Exception):
+ raise active
+ return active
+ raise ValueError("should not get here")
+
+ return _fake_call
+
+
+@pytest.mark.parametrize(
+ "enabled_out, active_out, expected",
+ [
+ (
+ # ok, all is well
+ ("", "", 0),
+ ("active", "", 0),
+ (True, "running", True),
+ ),
+ (
+ # disabled, unknown if active
+ ("disabled", "", 1),
+ ("", "", 0),
+ (False, "unknown", True),
+ ),
+ (
+ # is-enabled error (not disabled, unknown if active
+ ("bleh", "", 1),
+ ("", "", 0),
+ (False, "unknown", False),
+ ),
+ (
+ # is-enabled ok, inactive is stopped
+ ("", "", 0),
+ ("inactive", "", 0),
+ (True, "stopped", True),
+ ),
+ (
+ # is-enabled ok, failed is error
+ ("", "", 0),
+ ("failed", "", 0),
+ (True, "error", True),
+ ),
+ (
+ # is-enabled ok, auto-restart is error
+ ("", "", 0),
+ ("auto-restart", "", 0),
+ (True, "error", True),
+ ),
+ (
+ # error exec'ing is-enabled cmd
+ ValueError("bonk"),
+ ("active", "", 0),
+ (False, "running", False),
+ ),
+ (
+ # error exec'ing is-enabled cmd
+ ("", "", 0),
+ ValueError("blat"),
+ (True, "unknown", True),
+ ),
+ ],
+)
+def test_check_unit(enabled_out, active_out, expected):
+ with with_cephadm_ctx([]) as ctx:
+ _cephadm.call.side_effect = _mk_fake_call(
+ enabled=enabled_out,
+ active=active_out,
+ )
+ enabled, state, installed = _cephadm.check_unit(ctx, "foobar")
+ assert (enabled, state, installed) == expected
+
+
+class FakeEnabler:
+ def __init__(self, should_be_called):
+ self._should_be_called = should_be_called
+ self._services = []
+
+ def enable_service(self, service):
+ self._services.append(service)
+
+ def check_expected(self):
+ if not self._should_be_called:
+ assert not self._services
+ return
+ # there are currently seven chron/chrony type services that
+ # cephadm looks for. Make sure it probed for each of them
+ # or more in case someone adds to the list.
+ assert len(self._services) >= 7
+ assert "chrony.service" in self._services
+ assert "ntp.service" in self._services
+
+
+@pytest.mark.parametrize(
+ "call_fn, enabler, expected",
+ [
+ # Test that time sync services are not enabled
+ (
+ _mk_fake_call(
+ enabled=("", "", 1),
+ active=("", "", 1),
+ ),
+ None,
+ False,
+ ),
+ # Test that time sync service is enabled
+ (
+ _mk_fake_call(
+ enabled=("", "", 0),
+ active=("active", "", 0),
+ ),
+ None,
+ True,
+ ),
+ # Test that time sync is not enabled, and try to enable them.
+ # This one needs to be not running, but installed in order to
+ # call the enabler. It should call the enabler with every known
+ # service name.
+ (
+ _mk_fake_call(
+ enabled=("disabled", "", 1),
+ active=("", "", 1),
+ ),
+ FakeEnabler(True),
+ False,
+ ),
+ # Test that time sync is enabled, with an enabler passed which
+ # will check that the enabler was never called.
+ (
+ _mk_fake_call(
+ enabled=("", "", 0),
+ active=("active", "", 0),
+ ),
+ FakeEnabler(False),
+ True,
+ ),
+ ],
+)
+def test_check_time_sync(call_fn, enabler, expected):
+ """The check_time_sync call actually checks if a time synchronization service
+ is enabled. It is also the only consumer of check_units.
+ """
+ with with_cephadm_ctx([]) as ctx:
+ _cephadm.call.side_effect = call_fn
+ result = _cephadm.check_time_sync(ctx, enabler=enabler)
+ assert result == expected
+ if enabler is not None:
+ enabler.check_expected()
+
+
+@pytest.mark.parametrize(
+ "content, expected",
+ [
+ (
+ """#JUNK
+ FOO=1
+ """,
+ (None, None, None),
+ ),
+ (
+ """# A sample from a real centos system
+NAME="CentOS Stream"
+VERSION="8"
+ID="centos"
+ID_LIKE="rhel fedora"
+VERSION_ID="8"
+PLATFORM_ID="platform:el8"
+PRETTY_NAME="CentOS Stream 8"
+ANSI_COLOR="0;31"
+CPE_NAME="cpe:/o:centos:centos:8"
+HOME_URL="https://centos.org/"
+BUG_REPORT_URL="https://bugzilla.redhat.com/"
+REDHAT_SUPPORT_PRODUCT="Red Hat Enterprise Linux 8"
+REDHAT_SUPPORT_PRODUCT_VERSION="CentOS Stream"
+ """,
+ ("centos", "8", None),
+ ),
+ (
+ """# Minimal but complete, made up vals
+ID="hpec"
+VERSION_ID="33"
+VERSION_CODENAME="hpec nimda"
+ """,
+ ("hpec", "33", "hpec nimda"),
+ ),
+ (
+ """# Minimal but complete, no quotes
+ID=hpec
+VERSION_ID=33
+VERSION_CODENAME=hpec nimda
+ """,
+ ("hpec", "33", "hpec nimda"),
+ ),
+ ],
+)
+def test_get_distro(monkeypatch, content, expected):
+ def _fake_open(*args, **kwargs):
+ return io.StringIO(content)
+
+ monkeypatch.setattr("builtins.open", _fake_open)
+ assert _cephadm.get_distro() == expected
+
+
+class FakeContext:
+ """FakeContext is a minimal type for passing as a ctx, when
+ with_cephadm_ctx is not appropriate (it enables too many mocks, etc).
+ """
+
+ timeout = 30
+
+
+def _has_non_zero_exit(clog):
+ assert any("Non-zero exit" in ll for _, _, ll in clog.record_tuples)
+
+
+def _has_values_somewhere(clog, values, non_zero=True):
+ if non_zero:
+ _has_non_zero_exit(clog)
+ for value in values:
+ assert any(value in ll for _, _, ll in clog.record_tuples)
+
+
+@pytest.mark.parametrize(
+ "pyline, expected, call_kwargs, log_check",
+ [
+ pytest.param(
+ "import time; time.sleep(0.1)",
+ ("", "", 0),
+ {},
+ None,
+ id="brief-sleep",
+ ),
+ pytest.param(
+ "import sys; sys.exit(2)",
+ ("", "", 2),
+ {},
+ _has_non_zero_exit,
+ id="exit-non-zero",
+ ),
+ pytest.param(
+ "import sys; sys.exit(0)",
+ ("", "", 0),
+ {"desc": "success"},
+ None,
+ id="success-with-desc",
+ ),
+ pytest.param(
+ "print('foo'); print('bar')",
+ ("foo\nbar\n", "", 0),
+ {"desc": "stdout"},
+ None,
+ id="stdout-print",
+ ),
+ pytest.param(
+ "import sys; sys.stderr.write('la\\nla\\nla\\n')",
+ ("", "la\nla\nla\n", 0),
+ {"desc": "stderr"},
+ None,
+ id="stderr-print",
+ ),
+ pytest.param(
+ "for i in range(501): print(i, flush=True)",
+ lambda r: r[2] == 0 and r[1] == "" and "500" in r[0].splitlines(),
+ {},
+ None,
+ id="stdout-long",
+ ),
+ pytest.param(
+ "for i in range(1000000): print(i, flush=True)",
+ lambda r: r[2] == 0
+ and r[1] == ""
+ and len(r[0].splitlines()) == 1000000,
+ {},
+ None,
+ id="stdout-very-long",
+ ),
+ pytest.param(
+ "import sys; sys.stderr.write('pow\\noof\\nouch\\n'); sys.exit(1)",
+ ("", "pow\noof\nouch\n", 1),
+ {"desc": "stderr"},
+ functools.partial(
+ _has_values_somewhere,
+ values=["pow", "oof", "ouch"],
+ non_zero=True,
+ ),
+ id="stderr-logged-non-zero",
+ ),
+ pytest.param(
+ "import time; time.sleep(4)",
+ ("", "", 124),
+ {"timeout": 1},
+ None,
+ id="long-sleep",
+ ),
+ pytest.param(
+ "import time\nfor i in range(100):\n\tprint(i, flush=True); time.sleep(0.01)",
+ ("", "", 124),
+ {"timeout": 0.5},
+ None,
+ id="slow-print-timeout",
+ ),
+ # Commands that time out collect no logs, return empty std{out,err} strings
+ ],
+)
+def test_call(caplog, monkeypatch, pyline, expected, call_kwargs, log_check):
+ import logging
+
+ caplog.set_level(logging.INFO)
+ monkeypatch.setattr("cephadm.logger", logging.getLogger())
+ ctx = FakeContext()
+ result = _cephadm.call(ctx, [sys.executable, "-c", pyline], **call_kwargs)
+ if callable(expected):
+ assert expected(result)
+ else:
+ assert result == expected
+ if callable(log_check):
+ log_check(caplog)
+
+
+class TestWriteNew:
+ def test_success(self, tmp_path):
+ "Test the simple basic feature of writing a file."
+ dest = tmp_path / "foo.txt"
+ with _cephadm.write_new(dest) as fh:
+ fh.write("something\n")
+ fh.write("something else\n")
+
+ with open(dest, "r") as fh:
+ assert fh.read() == "something\nsomething else\n"
+
+ def test_write_ower_mode(self, tmp_path):
+ "Test that the owner and perms options function."
+ dest = tmp_path / "foo.txt"
+
+ # if this is test run as non-root, we can't really change ownership
+ uid = os.getuid()
+ gid = os.getgid()
+
+ with _cephadm.write_new(dest, owner=(uid, gid), perms=0o600) as fh:
+ fh.write("xomething\n")
+ fh.write("xomething else\n")
+
+ with open(dest, "r") as fh:
+ assert fh.read() == "xomething\nxomething else\n"
+ sr = os.fstat(fh.fileno())
+ assert sr.st_uid == uid
+ assert sr.st_gid == gid
+ assert (sr.st_mode & 0o777) == 0o600
+
+ def test_encoding(self, tmp_path):
+ "Test that the encoding option functions."
+ dest = tmp_path / "foo.txt"
+ msg = "\u2603\u26C5\n"
+ with _cephadm.write_new(dest, encoding='utf-8') as fh:
+ fh.write(msg)
+ with open(dest, "rb") as fh:
+ b1 = fh.read()
+ assert b1.decode('utf-8') == msg
+
+ dest = tmp_path / "foo2.txt"
+ with _cephadm.write_new(dest, encoding='utf-16le') as fh:
+ fh.write(msg)
+ with open(dest, "rb") as fh:
+ b2 = fh.read()
+ assert b2.decode('utf-16le') == msg
+
+ # the binary data should differ due to the different encodings
+ assert b1 != b2
+
+ def test_cleanup(self, tmp_path):
+ "Test that an exception during write leaves no file behind."
+ dest = tmp_path / "foo.txt"
+ with pytest.raises(ValueError):
+ with _cephadm.write_new(dest) as fh:
+ fh.write("hello\n")
+ raise ValueError("foo")
+ fh.write("world\n")
+ assert not dest.exists()
+ assert not dest.with_name(dest.name+".new").exists()
+ assert list(dest.parent.iterdir()) == []
+
+
+class CompareContext1:
+ cfg_data = {
+ "name": "mane",
+ "fsid": "foobar",
+ "image": "fake.io/noway/nohow:gndn",
+ "meta": {
+ "fruit": "banana",
+ "vegetable": "carrot",
+ },
+ "params": {
+ "osd_fsid": "robble",
+ "tcp_ports": [404, 9999],
+ },
+ "config_blobs": {
+ "alpha": {"sloop": "John B"},
+ "beta": {"forest": "birch"},
+ "gamma": {"forest": "pine"},
+ },
+ }
+
+ def check(self, ctx):
+ assert ctx.name == 'mane'
+ assert ctx.fsid == 'foobar'
+ assert ctx.image == 'fake.io/noway/nohow:gndn'
+ assert ctx.meta_properties == {"fruit": "banana", "vegetable": "carrot"}
+ assert ctx.config_blobs == {
+ "alpha": {"sloop": "John B"},
+ "beta": {"forest": "birch"},
+ "gamma": {"forest": "pine"},
+ }
+ assert ctx.osd_fsid == "robble"
+ assert ctx.tcp_ports == [404, 9999]
+
+
+class CompareContext2:
+ cfg_data = {
+ "name": "cc2",
+ "fsid": "foobar",
+ "meta": {
+ "fruit": "banana",
+ "vegetable": "carrot",
+ },
+ "params": {},
+ "config_blobs": {
+ "alpha": {"sloop": "John B"},
+ "beta": {"forest": "birch"},
+ "gamma": {"forest": "pine"},
+ },
+ }
+
+ def check(self, ctx):
+ assert ctx.name == 'cc2'
+ assert ctx.fsid == 'foobar'
+ assert ctx.image == 'quay.io/ceph/ceph:v18'
+ assert ctx.meta_properties == {"fruit": "banana", "vegetable": "carrot"}
+ assert ctx.config_blobs == {
+ "alpha": {"sloop": "John B"},
+ "beta": {"forest": "birch"},
+ "gamma": {"forest": "pine"},
+ }
+ assert ctx.osd_fsid is None
+ assert ctx.tcp_ports is None
+
+
+@pytest.mark.parametrize(
+ "cc",
+ [
+ CompareContext1(),
+ CompareContext2(),
+ ],
+)
+def test_apply_deploy_config_to_ctx(cc, monkeypatch):
+ import logging
+
+ monkeypatch.setattr("cephadm.logger", logging.getLogger())
+ ctx = FakeContext()
+ _cephadm.apply_deploy_config_to_ctx(cc.cfg_data, ctx)
+ cc.check(ctx)