diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/cephadm/tests/test_util_funcs.py | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/cephadm/tests/test_util_funcs.py')
-rw-r--r-- | src/cephadm/tests/test_util_funcs.py | 808 |
1 files changed, 808 insertions, 0 deletions
diff --git a/src/cephadm/tests/test_util_funcs.py b/src/cephadm/tests/test_util_funcs.py new file mode 100644 index 000000000..270753a55 --- /dev/null +++ b/src/cephadm/tests/test_util_funcs.py @@ -0,0 +1,808 @@ +# Tests for various assorted utility functions found within cephadm +# +from unittest import mock + +import functools +import io +import os +import sys + +import pytest + +from tests.fixtures import with_cephadm_ctx, import_cephadm + +_cephadm = import_cephadm() + + +class TestCopyTree: + def _copy_tree(self, *args, **kwargs): + with with_cephadm_ctx([]) as ctx: + with mock.patch("cephadm.extract_uid_gid") as eug: + eug.return_value = (os.getuid(), os.getgid()) + _cephadm.copy_tree(ctx, *args, **kwargs) + + def test_one_dir(self, tmp_path): + """Copy one dir into a non-existing dest dir.""" + src1 = tmp_path / "src1" + dst = tmp_path / "dst" + src1.mkdir(parents=True) + + with (src1 / "foo.txt").open("w") as fh: + fh.write("hello\n") + fh.write("earth\n") + + assert not (dst / "foo.txt").exists() + + self._copy_tree([src1], dst) + assert (dst / "foo.txt").exists() + + def test_one_existing_dir(self, tmp_path): + """Copy one dir into an existing dest dir.""" + src1 = tmp_path / "src1" + dst = tmp_path / "dst" + src1.mkdir(parents=True) + dst.mkdir(parents=True) + + with (src1 / "foo.txt").open("w") as fh: + fh.write("hello\n") + fh.write("earth\n") + + assert not (dst / "src1").exists() + + self._copy_tree([src1], dst) + assert (dst / "src1/foo.txt").exists() + + def test_two_dirs(self, tmp_path): + """Copy two source directories into an existing dest dir.""" + src1 = tmp_path / "src1" + src2 = tmp_path / "src2" + dst = tmp_path / "dst" + src1.mkdir(parents=True) + src2.mkdir(parents=True) + dst.mkdir(parents=True) + + with (src1 / "foo.txt").open("w") as fh: + fh.write("hello\n") + fh.write("earth\n") + with (src2 / "bar.txt").open("w") as fh: + fh.write("goodbye\n") + fh.write("mars\n") + + assert not (dst / "src1").exists() + assert not (dst / "src2").exists() + + self._copy_tree([src1, src2], dst) + assert (dst / "src1/foo.txt").exists() + assert (dst / "src2/bar.txt").exists() + + def test_one_dir_set_uid(self, tmp_path): + """Explicity pass uid/gid values and assert these are passed to chown.""" + # Because this test will often be run by non-root users it is necessary + # to mock os.chown or we too easily run into perms issues. + src1 = tmp_path / "src1" + dst = tmp_path / "dst" + src1.mkdir(parents=True) + + with (src1 / "foo.txt").open("w") as fh: + fh.write("hello\n") + fh.write("earth\n") + + assert not (dst / "foo.txt").exists() + + with mock.patch("os.chown") as _chown: + _chown.return_value = None + self._copy_tree([src1], dst, uid=0, gid=0) + assert len(_chown.mock_calls) >= 2 + for c in _chown.mock_calls: + assert c == mock.call(mock.ANY, 0, 0) + assert (dst / "foo.txt").exists() + + +class TestCopyFiles: + def _copy_files(self, *args, **kwargs): + with with_cephadm_ctx([]) as ctx: + with mock.patch("cephadm.extract_uid_gid") as eug: + eug.return_value = (os.getuid(), os.getgid()) + _cephadm.copy_files(ctx, *args, **kwargs) + + def test_one_file(self, tmp_path): + """Copy one file into the dest dir.""" + file1 = tmp_path / "f1.txt" + dst = tmp_path / "dst" + dst.mkdir(parents=True) + + with file1.open("w") as fh: + fh.write("its test time\n") + + self._copy_files([file1], dst) + assert (dst / "f1.txt").exists() + + def test_one_file_nodest(self, tmp_path): + """Copy one file to the given destination path.""" + file1 = tmp_path / "f1.txt" + dst = tmp_path / "dst" + + with file1.open("w") as fh: + fh.write("its test time\n") + + self._copy_files([file1], dst) + assert not dst.is_dir() + assert dst.is_file() + assert dst.open("r").read() == "its test time\n" + + def test_three_files(self, tmp_path): + """Copy one file into the dest dir.""" + file1 = tmp_path / "f1.txt" + file2 = tmp_path / "f2.txt" + file3 = tmp_path / "f3.txt" + dst = tmp_path / "dst" + dst.mkdir(parents=True) + + with file1.open("w") as fh: + fh.write("its test time\n") + with file2.open("w") as fh: + fh.write("f2\n") + with file3.open("w") as fh: + fh.write("f3\n") + + self._copy_files([file1, file2, file3], dst) + assert (dst / "f1.txt").exists() + assert (dst / "f2.txt").exists() + assert (dst / "f3.txt").exists() + + def test_three_files_nodest(self, tmp_path): + """Copy files to dest path (not a dir). This is not a useful operation.""" + file1 = tmp_path / "f1.txt" + file2 = tmp_path / "f2.txt" + file3 = tmp_path / "f3.txt" + dst = tmp_path / "dst" + + with file1.open("w") as fh: + fh.write("its test time\n") + with file2.open("w") as fh: + fh.write("f2\n") + with file3.open("w") as fh: + fh.write("f3\n") + + self._copy_files([file1, file2, file3], dst) + assert not dst.is_dir() + assert dst.is_file() + assert dst.open("r").read() == "f3\n" + + def test_one_file_set_uid(self, tmp_path): + """Explicity pass uid/gid values and assert these are passed to chown.""" + # Because this test will often be run by non-root users it is necessary + # to mock os.chown or we too easily run into perms issues. + file1 = tmp_path / "f1.txt" + dst = tmp_path / "dst" + dst.mkdir(parents=True) + + with file1.open("w") as fh: + fh.write("its test time\n") + + assert not (dst / "f1.txt").exists() + + with mock.patch("os.chown") as _chown: + _chown.return_value = None + self._copy_files([file1], dst, uid=0, gid=0) + assert len(_chown.mock_calls) >= 1 + for c in _chown.mock_calls: + assert c == mock.call(mock.ANY, 0, 0) + assert (dst / "f1.txt").exists() + + +class TestMoveFiles: + def _move_files(self, *args, **kwargs): + with with_cephadm_ctx([]) as ctx: + with mock.patch("cephadm.extract_uid_gid") as eug: + eug.return_value = (os.getuid(), os.getgid()) + _cephadm.move_files(ctx, *args, **kwargs) + + def test_one_file(self, tmp_path): + """Move a named file to test dest path.""" + file1 = tmp_path / "f1.txt" + dst = tmp_path / "dst" + + with file1.open("w") as fh: + fh.write("lets moove\n") + + assert not dst.exists() + assert file1.is_file() + + self._move_files([file1], dst) + assert dst.is_file() + assert not file1.exists() + + def test_one_file_destdir(self, tmp_path): + """Move a file into an existing dest dir.""" + file1 = tmp_path / "f1.txt" + dst = tmp_path / "dst" + dst.mkdir(parents=True) + + with file1.open("w") as fh: + fh.write("lets moove\n") + + assert not (dst / "f1.txt").exists() + assert file1.is_file() + + self._move_files([file1], dst) + assert (dst / "f1.txt").is_file() + assert not file1.exists() + + def test_one_file_one_link(self, tmp_path): + """Move a file and a symlink to that file to a dest dir.""" + file1 = tmp_path / "f1.txt" + link1 = tmp_path / "lnk" + dst = tmp_path / "dst" + dst.mkdir(parents=True) + + with file1.open("w") as fh: + fh.write("lets moove\n") + os.symlink("f1.txt", link1) + + assert not (dst / "f1.txt").exists() + assert file1.is_file() + assert link1.exists() + + self._move_files([file1, link1], dst) + assert (dst / "f1.txt").is_file() + assert (dst / "lnk").is_symlink() + assert not file1.exists() + assert not link1.exists() + assert (dst / "f1.txt").open("r").read() == "lets moove\n" + assert (dst / "lnk").open("r").read() == "lets moove\n" + + def test_one_file_set_uid(self, tmp_path): + """Explicity pass uid/gid values and assert these are passed to chown.""" + # Because this test will often be run by non-root users it is necessary + # to mock os.chown or we too easily run into perms issues. + file1 = tmp_path / "f1.txt" + dst = tmp_path / "dst" + + with file1.open("w") as fh: + fh.write("lets moove\n") + + assert not dst.exists() + assert file1.is_file() + + with mock.patch("os.chown") as _chown: + _chown.return_value = None + self._move_files([file1], dst, uid=0, gid=0) + assert len(_chown.mock_calls) >= 1 + for c in _chown.mock_calls: + assert c == mock.call(mock.ANY, 0, 0) + assert dst.is_file() + assert not file1.exists() + + +def test_recursive_chown(tmp_path): + d1 = tmp_path / "dir1" + d2 = d1 / "dir2" + f1 = d2 / "file1.txt" + d2.mkdir(parents=True) + + with f1.open("w") as fh: + fh.write("low down\n") + + with mock.patch("os.chown") as _chown: + _chown.return_value = None + _cephadm.recursive_chown(str(d1), uid=500, gid=500) + assert len(_chown.mock_calls) == 3 + assert _chown.mock_calls[0] == mock.call(str(d1), 500, 500) + assert _chown.mock_calls[1] == mock.call(str(d2), 500, 500) + assert _chown.mock_calls[2] == mock.call(str(f1), 500, 500) + + +class TestFindExecutable: + def test_standard_exe(self): + # pretty much every system will have `true` on the path. It's a safe choice + # for the first assertion + exe = _cephadm.find_executable("true") + assert exe.endswith("true") + + def test_custom_path(self, tmp_path): + foo_sh = tmp_path / "foo.sh" + with open(foo_sh, "w") as fh: + fh.write("#!/bin/sh\n") + fh.write("echo foo\n") + foo_sh.chmod(0o755) + + exe = _cephadm.find_executable(foo_sh) + assert str(exe) == str(foo_sh) + + def test_no_path(self, monkeypatch): + monkeypatch.delenv("PATH") + exe = _cephadm.find_executable("true") + assert exe.endswith("true") + + def test_no_path_no_confstr(self, monkeypatch): + def _fail(_): + raise ValueError("fail") + + monkeypatch.delenv("PATH") + monkeypatch.setattr("os.confstr", _fail) + exe = _cephadm.find_executable("true") + assert exe.endswith("true") + + def test_unset_path(self): + exe = _cephadm.find_executable("true", path="") + assert exe is None + + def test_no_such_exe(self): + exe = _cephadm.find_executable("foo_bar-baz.noway") + assert exe is None + + +def test_find_program(): + exe = _cephadm.find_program("true") + assert exe.endswith("true") + + with pytest.raises(ValueError): + _cephadm.find_program("foo_bar-baz.noway") + + +def _mk_fake_call(enabled, active): + def _fake_call(ctx, cmd, **kwargs): + if "is-enabled" in cmd: + if isinstance(enabled, Exception): + raise enabled + return enabled + if "is-active" in cmd: + if isinstance(active, Exception): + raise active + return active + raise ValueError("should not get here") + + return _fake_call + + +@pytest.mark.parametrize( + "enabled_out, active_out, expected", + [ + ( + # ok, all is well + ("", "", 0), + ("active", "", 0), + (True, "running", True), + ), + ( + # disabled, unknown if active + ("disabled", "", 1), + ("", "", 0), + (False, "unknown", True), + ), + ( + # is-enabled error (not disabled, unknown if active + ("bleh", "", 1), + ("", "", 0), + (False, "unknown", False), + ), + ( + # is-enabled ok, inactive is stopped + ("", "", 0), + ("inactive", "", 0), + (True, "stopped", True), + ), + ( + # is-enabled ok, failed is error + ("", "", 0), + ("failed", "", 0), + (True, "error", True), + ), + ( + # is-enabled ok, auto-restart is error + ("", "", 0), + ("auto-restart", "", 0), + (True, "error", True), + ), + ( + # error exec'ing is-enabled cmd + ValueError("bonk"), + ("active", "", 0), + (False, "running", False), + ), + ( + # error exec'ing is-enabled cmd + ("", "", 0), + ValueError("blat"), + (True, "unknown", True), + ), + ], +) +def test_check_unit(enabled_out, active_out, expected): + with with_cephadm_ctx([]) as ctx: + _cephadm.call.side_effect = _mk_fake_call( + enabled=enabled_out, + active=active_out, + ) + enabled, state, installed = _cephadm.check_unit(ctx, "foobar") + assert (enabled, state, installed) == expected + + +class FakeEnabler: + def __init__(self, should_be_called): + self._should_be_called = should_be_called + self._services = [] + + def enable_service(self, service): + self._services.append(service) + + def check_expected(self): + if not self._should_be_called: + assert not self._services + return + # there are currently seven chron/chrony type services that + # cephadm looks for. Make sure it probed for each of them + # or more in case someone adds to the list. + assert len(self._services) >= 7 + assert "chrony.service" in self._services + assert "ntp.service" in self._services + + +@pytest.mark.parametrize( + "call_fn, enabler, expected", + [ + # Test that time sync services are not enabled + ( + _mk_fake_call( + enabled=("", "", 1), + active=("", "", 1), + ), + None, + False, + ), + # Test that time sync service is enabled + ( + _mk_fake_call( + enabled=("", "", 0), + active=("active", "", 0), + ), + None, + True, + ), + # Test that time sync is not enabled, and try to enable them. + # This one needs to be not running, but installed in order to + # call the enabler. It should call the enabler with every known + # service name. + ( + _mk_fake_call( + enabled=("disabled", "", 1), + active=("", "", 1), + ), + FakeEnabler(True), + False, + ), + # Test that time sync is enabled, with an enabler passed which + # will check that the enabler was never called. + ( + _mk_fake_call( + enabled=("", "", 0), + active=("active", "", 0), + ), + FakeEnabler(False), + True, + ), + ], +) +def test_check_time_sync(call_fn, enabler, expected): + """The check_time_sync call actually checks if a time synchronization service + is enabled. It is also the only consumer of check_units. + """ + with with_cephadm_ctx([]) as ctx: + _cephadm.call.side_effect = call_fn + result = _cephadm.check_time_sync(ctx, enabler=enabler) + assert result == expected + if enabler is not None: + enabler.check_expected() + + +@pytest.mark.parametrize( + "content, expected", + [ + ( + """#JUNK + FOO=1 + """, + (None, None, None), + ), + ( + """# A sample from a real centos system +NAME="CentOS Stream" +VERSION="8" +ID="centos" +ID_LIKE="rhel fedora" +VERSION_ID="8" +PLATFORM_ID="platform:el8" +PRETTY_NAME="CentOS Stream 8" +ANSI_COLOR="0;31" +CPE_NAME="cpe:/o:centos:centos:8" +HOME_URL="https://centos.org/" +BUG_REPORT_URL="https://bugzilla.redhat.com/" +REDHAT_SUPPORT_PRODUCT="Red Hat Enterprise Linux 8" +REDHAT_SUPPORT_PRODUCT_VERSION="CentOS Stream" + """, + ("centos", "8", None), + ), + ( + """# Minimal but complete, made up vals +ID="hpec" +VERSION_ID="33" +VERSION_CODENAME="hpec nimda" + """, + ("hpec", "33", "hpec nimda"), + ), + ( + """# Minimal but complete, no quotes +ID=hpec +VERSION_ID=33 +VERSION_CODENAME=hpec nimda + """, + ("hpec", "33", "hpec nimda"), + ), + ], +) +def test_get_distro(monkeypatch, content, expected): + def _fake_open(*args, **kwargs): + return io.StringIO(content) + + monkeypatch.setattr("builtins.open", _fake_open) + assert _cephadm.get_distro() == expected + + +class FakeContext: + """FakeContext is a minimal type for passing as a ctx, when + with_cephadm_ctx is not appropriate (it enables too many mocks, etc). + """ + + timeout = 30 + + +def _has_non_zero_exit(clog): + assert any("Non-zero exit" in ll for _, _, ll in clog.record_tuples) + + +def _has_values_somewhere(clog, values, non_zero=True): + if non_zero: + _has_non_zero_exit(clog) + for value in values: + assert any(value in ll for _, _, ll in clog.record_tuples) + + +@pytest.mark.parametrize( + "pyline, expected, call_kwargs, log_check", + [ + pytest.param( + "import time; time.sleep(0.1)", + ("", "", 0), + {}, + None, + id="brief-sleep", + ), + pytest.param( + "import sys; sys.exit(2)", + ("", "", 2), + {}, + _has_non_zero_exit, + id="exit-non-zero", + ), + pytest.param( + "import sys; sys.exit(0)", + ("", "", 0), + {"desc": "success"}, + None, + id="success-with-desc", + ), + pytest.param( + "print('foo'); print('bar')", + ("foo\nbar\n", "", 0), + {"desc": "stdout"}, + None, + id="stdout-print", + ), + pytest.param( + "import sys; sys.stderr.write('la\\nla\\nla\\n')", + ("", "la\nla\nla\n", 0), + {"desc": "stderr"}, + None, + id="stderr-print", + ), + pytest.param( + "for i in range(501): print(i, flush=True)", + lambda r: r[2] == 0 and r[1] == "" and "500" in r[0].splitlines(), + {}, + None, + id="stdout-long", + ), + pytest.param( + "for i in range(1000000): print(i, flush=True)", + lambda r: r[2] == 0 + and r[1] == "" + and len(r[0].splitlines()) == 1000000, + {}, + None, + id="stdout-very-long", + ), + pytest.param( + "import sys; sys.stderr.write('pow\\noof\\nouch\\n'); sys.exit(1)", + ("", "pow\noof\nouch\n", 1), + {"desc": "stderr"}, + functools.partial( + _has_values_somewhere, + values=["pow", "oof", "ouch"], + non_zero=True, + ), + id="stderr-logged-non-zero", + ), + pytest.param( + "import time; time.sleep(4)", + ("", "", 124), + {"timeout": 1}, + None, + id="long-sleep", + ), + pytest.param( + "import time\nfor i in range(100):\n\tprint(i, flush=True); time.sleep(0.01)", + ("", "", 124), + {"timeout": 0.5}, + None, + id="slow-print-timeout", + ), + # Commands that time out collect no logs, return empty std{out,err} strings + ], +) +def test_call(caplog, monkeypatch, pyline, expected, call_kwargs, log_check): + import logging + + caplog.set_level(logging.INFO) + monkeypatch.setattr("cephadm.logger", logging.getLogger()) + ctx = FakeContext() + result = _cephadm.call(ctx, [sys.executable, "-c", pyline], **call_kwargs) + if callable(expected): + assert expected(result) + else: + assert result == expected + if callable(log_check): + log_check(caplog) + + +class TestWriteNew: + def test_success(self, tmp_path): + "Test the simple basic feature of writing a file." + dest = tmp_path / "foo.txt" + with _cephadm.write_new(dest) as fh: + fh.write("something\n") + fh.write("something else\n") + + with open(dest, "r") as fh: + assert fh.read() == "something\nsomething else\n" + + def test_write_ower_mode(self, tmp_path): + "Test that the owner and perms options function." + dest = tmp_path / "foo.txt" + + # if this is test run as non-root, we can't really change ownership + uid = os.getuid() + gid = os.getgid() + + with _cephadm.write_new(dest, owner=(uid, gid), perms=0o600) as fh: + fh.write("xomething\n") + fh.write("xomething else\n") + + with open(dest, "r") as fh: + assert fh.read() == "xomething\nxomething else\n" + sr = os.fstat(fh.fileno()) + assert sr.st_uid == uid + assert sr.st_gid == gid + assert (sr.st_mode & 0o777) == 0o600 + + def test_encoding(self, tmp_path): + "Test that the encoding option functions." + dest = tmp_path / "foo.txt" + msg = "\u2603\u26C5\n" + with _cephadm.write_new(dest, encoding='utf-8') as fh: + fh.write(msg) + with open(dest, "rb") as fh: + b1 = fh.read() + assert b1.decode('utf-8') == msg + + dest = tmp_path / "foo2.txt" + with _cephadm.write_new(dest, encoding='utf-16le') as fh: + fh.write(msg) + with open(dest, "rb") as fh: + b2 = fh.read() + assert b2.decode('utf-16le') == msg + + # the binary data should differ due to the different encodings + assert b1 != b2 + + def test_cleanup(self, tmp_path): + "Test that an exception during write leaves no file behind." + dest = tmp_path / "foo.txt" + with pytest.raises(ValueError): + with _cephadm.write_new(dest) as fh: + fh.write("hello\n") + raise ValueError("foo") + fh.write("world\n") + assert not dest.exists() + assert not dest.with_name(dest.name+".new").exists() + assert list(dest.parent.iterdir()) == [] + + +class CompareContext1: + cfg_data = { + "name": "mane", + "fsid": "foobar", + "image": "fake.io/noway/nohow:gndn", + "meta": { + "fruit": "banana", + "vegetable": "carrot", + }, + "params": { + "osd_fsid": "robble", + "tcp_ports": [404, 9999], + }, + "config_blobs": { + "alpha": {"sloop": "John B"}, + "beta": {"forest": "birch"}, + "gamma": {"forest": "pine"}, + }, + } + + def check(self, ctx): + assert ctx.name == 'mane' + assert ctx.fsid == 'foobar' + assert ctx.image == 'fake.io/noway/nohow:gndn' + assert ctx.meta_properties == {"fruit": "banana", "vegetable": "carrot"} + assert ctx.config_blobs == { + "alpha": {"sloop": "John B"}, + "beta": {"forest": "birch"}, + "gamma": {"forest": "pine"}, + } + assert ctx.osd_fsid == "robble" + assert ctx.tcp_ports == [404, 9999] + + +class CompareContext2: + cfg_data = { + "name": "cc2", + "fsid": "foobar", + "meta": { + "fruit": "banana", + "vegetable": "carrot", + }, + "params": {}, + "config_blobs": { + "alpha": {"sloop": "John B"}, + "beta": {"forest": "birch"}, + "gamma": {"forest": "pine"}, + }, + } + + def check(self, ctx): + assert ctx.name == 'cc2' + assert ctx.fsid == 'foobar' + assert ctx.image == 'quay.io/ceph/ceph:v18' + assert ctx.meta_properties == {"fruit": "banana", "vegetable": "carrot"} + assert ctx.config_blobs == { + "alpha": {"sloop": "John B"}, + "beta": {"forest": "birch"}, + "gamma": {"forest": "pine"}, + } + assert ctx.osd_fsid is None + assert ctx.tcp_ports is None + + +@pytest.mark.parametrize( + "cc", + [ + CompareContext1(), + CompareContext2(), + ], +) +def test_apply_deploy_config_to_ctx(cc, monkeypatch): + import logging + + monkeypatch.setattr("cephadm.logger", logging.getLogger()) + ctx = FakeContext() + _cephadm.apply_deploy_config_to_ctx(cc.cfg_data, ctx) + cc.check(ctx) |