diff options
Diffstat (limited to '')
-rw-r--r-- | test/unittests/test_utils.py | 1514 |
1 files changed, 1514 insertions, 0 deletions
diff --git a/test/unittests/test_utils.py b/test/unittests/test_utils.py new file mode 100644 index 0000000..bf06fbd --- /dev/null +++ b/test/unittests/test_utils.py @@ -0,0 +1,1514 @@ +from __future__ import unicode_literals +# Copyright (C) 2014 Kristoffer Gronlund <kgronlund@suse.com> +# See COPYING for license information. +# +# unit tests for utils.py + +import os +import socket +import re +import imp +import subprocess +import unittest +import pytest +import logging +from unittest import mock +from itertools import chain + +import crmsh.utils +from crmsh import utils, config, tmpfiles, constants, parallax + +logging.basicConfig(level=logging.DEBUG) + +def setup_function(): + utils._ip_for_cloud = None + # Mock memoize method and reload the module under test later with imp + mock.patch('crmsh.utils.memoize', lambda x: x).start() + imp.reload(utils) + + +@mock.patch("crmsh.sh.ShellUtils.get_stdout_stderr") +def test_print_cluster_nodes(mock_run): + mock_run.return_value = (0, "data", None) + utils.print_cluster_nodes() + mock_run.assert_called_once_with("crm_node -l") + + +@mock.patch("crmsh.sh.ShellUtils.get_stdout") +def test_package_is_installed_local(mock_run): + mock_run.return_value = (0, None) + res = utils.package_is_installed("crmsh") + assert res is True + mock_run.assert_called_once_with("rpm -q --quiet crmsh") + + +@mock.patch('crmsh.utils.detect_file') +def test_check_file_content_included_target_not_exist(mock_detect): + mock_detect.side_effect = [True, False] + res = utils.check_file_content_included("file1", "file2") + assert res is False + mock_detect.assert_has_calls([ + mock.call("file1", remote=None), + mock.call("file2", remote=None) + ]) + + +@mock.patch('crmsh.sh.ClusterShell.get_stdout_or_raise_error') +@mock.patch('crmsh.utils.detect_file') +def test_check_file_content_included(mock_detect, mock_run): + mock_detect.side_effect = [True, True] + mock_run.side_effect = ["data data", "data"] + + res = utils.check_file_content_included("file1", "file2") + assert res is True + + mock_detect.assert_has_calls([ + mock.call("file1", remote=None), + mock.call("file2", remote=None) + ]) + mock_run.assert_has_calls([ + mock.call("cat file2", host=None), + mock.call("cat file1", host=None) + ]) + + +@mock.patch('re.search') +@mock.patch('crmsh.sh.ShellUtils.get_stdout') +def test_get_nodeid_from_name_run_None1(mock_get_stdout, mock_re_search): + mock_get_stdout.return_value = (1, None) + mock_re_search_inst = mock.Mock() + mock_re_search.return_value = mock_re_search_inst + res = utils.get_nodeid_from_name("node1") + assert res is None + mock_get_stdout.assert_called_once_with('crm_node -l') + mock_re_search.assert_not_called() + + +@mock.patch('re.search') +@mock.patch('crmsh.sh.ShellUtils.get_stdout') +def test_get_nodeid_from_name_run_None2(mock_get_stdout, mock_re_search): + mock_get_stdout.return_value = (0, "172167901 node1 member\n172168231 node2 member") + mock_re_search.return_value = None + res = utils.get_nodeid_from_name("node111") + assert res is None + mock_get_stdout.assert_called_once_with('crm_node -l') + mock_re_search.assert_called_once_with(r'^([0-9]+) node111 ', mock_get_stdout.return_value[1], re.M) + + +@mock.patch('re.search') +@mock.patch('crmsh.sh.ShellUtils.get_stdout') +def test_get_nodeid_from_name(mock_get_stdout, mock_re_search): + mock_get_stdout.return_value = (0, "172167901 node1 member\n172168231 node2 member") + mock_re_search_inst = mock.Mock() + mock_re_search.return_value = mock_re_search_inst + mock_re_search_inst.group.return_value = '172168231' + res = utils.get_nodeid_from_name("node2") + assert res == '172168231' + mock_get_stdout.assert_called_once_with('crm_node -l') + mock_re_search.assert_called_once_with(r'^([0-9]+) node2 ', mock_get_stdout.return_value[1], re.M) + mock_re_search_inst.group.assert_called_once_with(1) + + +@mock.patch('crmsh.sh.LocalShell.get_rc_and_error') +def test_check_ssh_passwd_need(mock_run): + mock_run.return_value = (1, 'foo') + res = utils.check_ssh_passwd_need("bob", "alice", "node1") + assert res is True + mock_run.assert_called_once_with( + "bob", + " ssh -o StrictHostKeyChecking=no -o EscapeChar=none -o ConnectTimeout=15 -T -o Batchmode=yes alice@node1 true", + ) + + +@mock.patch('crmsh.utils.list_cluster_nodes') +def test_cluster_run_cmd_exception(mock_list_nodes): + mock_list_nodes.return_value = None + with pytest.raises(ValueError) as err: + utils.cluster_run_cmd("test") + assert str(err.value) == "Failed to get node list from cluster" + mock_list_nodes.assert_called_once_with() + + +@mock.patch('crmsh.utils.list_cluster_nodes') +def test_list_cluster_nodes_except_me_exception(mock_list_nodes): + mock_list_nodes.return_value = None + with pytest.raises(ValueError) as err: + utils.list_cluster_nodes_except_me() + assert str(err.value) == "Failed to get node list from cluster" + mock_list_nodes.assert_called_once_with() + + +@mock.patch('crmsh.utils.this_node') +@mock.patch('crmsh.utils.list_cluster_nodes') +def test_list_cluster_nodes_except_me(mock_list_nodes, mock_this_node): + mock_list_nodes.return_value = ["node1", "node2"] + mock_this_node.return_value = "node1" + res = utils.list_cluster_nodes_except_me() + assert res == ["node2"] + mock_list_nodes.assert_called_once_with() + mock_this_node.assert_called_once_with() + + +def test_to_ascii(): + assert utils.to_ascii(None) is None + assert utils.to_ascii('test') == 'test' + assert utils.to_ascii(b'test') == 'test' + # Test not utf-8 characters + with mock.patch('traceback.print_exc') as mock_traceback: + assert utils.to_ascii(b'te\xe9st') == 'test' + mock_traceback.assert_called_once_with() + + +def test_systeminfo(): + assert utils.getuser() is not None + assert utils.gethomedir() is not None + assert utils.get_tempdir() is not None + + +def test_shadowcib(): + assert utils.get_cib_in_use() == "" + utils.set_cib_in_use("foo") + assert utils.get_cib_in_use() == "foo" + utils.clear_cib_in_use() + assert utils.get_cib_in_use() == "" + + +def test_booleans(): + truthy = ['yes', 'Yes', 'True', 'true', 'TRUE', + 'YES', 'on', 'On', 'ON'] + falsy = ['no', 'false', 'off', 'OFF', 'FALSE', 'nO'] + not_truthy = ['', 'not', 'ONN', 'TRUETH', 'yess'] + for case in chain(truthy, falsy): + assert utils.verify_boolean(case) is True + for case in truthy: + assert utils.is_boolean_true(case) is True + assert utils.is_boolean_false(case) is False + assert utils.get_boolean(case) is True + for case in falsy: + assert utils.is_boolean_true(case) is False + assert utils.is_boolean_false(case) is True + assert utils.get_boolean(case, dflt=True) is False + for case in not_truthy: + assert utils.verify_boolean(case) is False + assert utils.is_boolean_true(case) is False + assert utils.is_boolean_false(case) is False + assert utils.get_boolean(case) is False + + +def test_olist(): + lst = utils.olist(['B', 'C', 'A']) + lst.append('f') + lst.append('aA') + lst.append('_') + assert 'aa' in lst + assert 'a' in lst + assert list(lst) == ['b', 'c', 'a', 'f', 'aa', '_'] + + +def test_add_sudo(): + tmpuser = config.core.user + try: + config.core.user = 'root' + assert utils.add_sudo('ls').startswith('sudo') + config.core.user = '' + assert utils.add_sudo('ls') == 'ls' + finally: + config.core.user = tmpuser + + +def test_str2tmp(): + txt = "This is a test string" + filename = utils.str2tmp(txt) + assert os.path.isfile(filename) + assert open(filename).read() == txt + "\n" + assert utils.file2str(filename) == txt + os.unlink(filename) + + +@mock.patch('logging.Logger.error') +def test_sanity(mock_error): + sane_paths = ['foo/bar', 'foo', '/foo/bar', 'foo0', + 'foo_bar', 'foo-bar', '0foo', '.foo', + 'foo.bar'] + insane_paths = ['#foo', 'foo?', 'foo*', 'foo$', 'foo[bar]', + 'foo`', "foo'", 'foo/*'] + for p in sane_paths: + assert utils.is_path_sane(p) + for p in insane_paths: + assert not utils.is_path_sane(p) + sane_filenames = ['foo', '0foo', '0', '.foo'] + insane_filenames = ['foo/bar'] + for p in sane_filenames: + assert utils.is_filename_sane(p) + for p in insane_filenames: + assert not utils.is_filename_sane(p) + sane_names = ['foo'] + insane_names = ["f'o"] + for n in sane_names: + assert utils.is_name_sane(n) + for n in insane_names: + assert not utils.is_name_sane(n) + + +def test_nvpairs2dict(): + assert utils.nvpairs2dict(['a=b', 'c=d']) == {'a': 'b', 'c': 'd'} + assert utils.nvpairs2dict(['a=b=c', 'c=d']) == {'a': 'b=c', 'c': 'd'} + assert utils.nvpairs2dict(['a']) == {'a': None} + + +def test_validity(): + assert utils.is_id_valid('foo0') + assert not utils.is_id_valid('0foo') + + +def test_msec(): + assert utils.crm_msec('1ms') == 1 + assert utils.crm_msec('1s') == 1000 + assert utils.crm_msec('1us') == 0 + assert utils.crm_msec('1') == 1000 + assert utils.crm_msec('1m') == 60*1000 + assert utils.crm_msec('1h') == 60*60*1000 + + +def test_parse_sysconfig(): + """ + bsc#1129317: Fails on this line + + FW_SERVICES_ACCEPT_EXT="0/0,tcp,22,,hitcount=3,blockseconds=60,recentname=ssh" + """ + s = ''' +FW_SERVICES_ACCEPT_EXT="0/0,tcp,22,,hitcount=3,blockseconds=60,recentname=ssh" +''' + + fd, fname = tmpfiles.create() + with open(fname, 'w') as f: + f.write(s) + sc = utils.parse_sysconfig(fname) + assert ("FW_SERVICES_ACCEPT_EXT" in sc) + +def test_sysconfig_set(): + s = ''' +FW_SERVICES_ACCEPT_EXT="0/0,tcp,22,,hitcount=3,blockseconds=60,recentname=ssh" +''' + fd, fname = tmpfiles.create() + with open(fname, 'w') as f: + f.write(s) + utils.sysconfig_set(fname, FW_SERVICES_ACCEPT_EXT="foo=bar", FOO="bar") + sc = utils.parse_sysconfig(fname) + assert (sc.get("FW_SERVICES_ACCEPT_EXT") == "foo=bar") + assert (sc.get("FOO") == "bar") + +def test_sysconfig_set_bsc1145823(): + s = '''# this is test +#age=1000 +''' + fd, fname = tmpfiles.create() + with open(fname, 'w') as f: + f.write(s) + utils.sysconfig_set(fname, age="100") + sc = utils.parse_sysconfig(fname) + assert (sc.get("age") == "100") + +@mock.patch("crmsh.utils.IP.is_ipv6") +@mock.patch("socket.socket") +@mock.patch("crmsh.utils.closing") +def test_check_port_open_false(mock_closing, mock_socket, mock_is_ipv6): + mock_is_ipv6.return_value = False + sock_inst = mock.Mock() + mock_socket.return_value = sock_inst + mock_closing.return_value.__enter__.return_value = sock_inst + sock_inst.connect_ex.return_value = 1 + + assert utils.check_port_open("10.10.10.1", 22) is False + + mock_is_ipv6.assert_called_once_with("10.10.10.1") + mock_socket.assert_called_once_with(socket.AF_INET, socket.SOCK_STREAM) + mock_closing.assert_called_once_with(sock_inst) + sock_inst.connect_ex.assert_called_once_with(("10.10.10.1", 22)) + +@mock.patch("crmsh.utils.IP.is_ipv6") +@mock.patch("socket.socket") +@mock.patch("crmsh.utils.closing") +def test_check_port_open_true(mock_closing, mock_socket, mock_is_ipv6): + mock_is_ipv6.return_value = True + sock_inst = mock.Mock() + mock_socket.return_value = sock_inst + mock_closing.return_value.__enter__.return_value = sock_inst + sock_inst.connect_ex.return_value = 0 + + assert utils.check_port_open("2001:db8:10::7", 22) is True + + mock_is_ipv6.assert_called_once_with("2001:db8:10::7") + mock_socket.assert_called_once_with(socket.AF_INET6, socket.SOCK_STREAM) + mock_closing.assert_called_once_with(sock_inst) + sock_inst.connect_ex.assert_called_once_with(("2001:db8:10::7", 22)) + +def test_valid_port(): + assert utils.valid_port(1) is False + assert utils.valid_port(10000000) is False + assert utils.valid_port(1234) is True + +@mock.patch("crmsh.corosync.get_value") +def test_is_qdevice_configured_false(mock_get_value): + mock_get_value.return_value = "ip" + assert utils.is_qdevice_configured() is False + mock_get_value.assert_called_once_with("quorum.device.model") + +@mock.patch("crmsh.corosync.get_value") +def test_is_qdevice_configured_true(mock_get_value): + mock_get_value.return_value = "net" + assert utils.is_qdevice_configured() is True + mock_get_value.assert_called_once_with("quorum.device.model") + +@mock.patch("crmsh.corosync.get_value") +def test_is_qdevice_tls_on_false(mock_get_value): + mock_get_value.return_value = "off" + assert utils.is_qdevice_tls_on() is False + mock_get_value.assert_called_once_with("quorum.device.net.tls") + +@mock.patch("crmsh.corosync.get_value") +def test_is_qdevice_tls_on_true(mock_get_value): + mock_get_value.return_value = "on" + assert utils.is_qdevice_tls_on() is True + mock_get_value.assert_called_once_with("quorum.device.net.tls") + +@mock.patch("crmsh.sh.ShellUtils.get_stdout") +def test_get_nodeinfo_from_cmaptool_return_none(mock_get_stdout): + mock_get_stdout.return_value = (1, None) + assert bool(utils.get_nodeinfo_from_cmaptool()) is False + mock_get_stdout.assert_called_once_with("corosync-cmapctl -b runtime.totem.pg.mrp.srp.members") + +@mock.patch("re.findall") +@mock.patch("re.search") +@mock.patch("crmsh.sh.ShellUtils.get_stdout") +def test_get_nodeinfo_from_cmaptool(mock_get_stdout, mock_search, mock_findall): + mock_get_stdout.return_value = (0, 'runtime.totem.pg.mrp.srp.members.1.ip (str) = r(0) ip(192.168.43.129)\nruntime.totem.pg.mrp.srp.members.2.ip (str) = r(0) ip(192.168.43.128)') + match_inst1 = mock.Mock() + match_inst2 = mock.Mock() + mock_search.side_effect = [match_inst1, match_inst2] + match_inst1.group.return_value = '1' + match_inst2.group.return_value = '2' + mock_findall.side_effect = [["192.168.43.129"], ["192.168.43.128"]] + + result = utils.get_nodeinfo_from_cmaptool() + assert result['1'] == ["192.168.43.129"] + assert result['2'] == ["192.168.43.128"] + + mock_get_stdout.assert_called_once_with("corosync-cmapctl -b runtime.totem.pg.mrp.srp.members") + mock_search.assert_has_calls([ + mock.call(r'members\.(.*)\.ip', 'runtime.totem.pg.mrp.srp.members.1.ip (str) = r(0) ip(192.168.43.129)'), + mock.call(r'members\.(.*)\.ip', 'runtime.totem.pg.mrp.srp.members.2.ip (str) = r(0) ip(192.168.43.128)') + ]) + match_inst1.group.assert_called_once_with(1) + match_inst2.group.assert_called_once_with(1) + mock_findall.assert_has_calls([ + mock.call(r'[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}', 'runtime.totem.pg.mrp.srp.members.1.ip (str) = r(0) ip(192.168.43.129)'), + mock.call(r'[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}', 'runtime.totem.pg.mrp.srp.members.2.ip (str) = r(0) ip(192.168.43.128)') + ]) + +@mock.patch("crmsh.utils.get_nodeinfo_from_cmaptool") +@mock.patch("crmsh.service_manager.ServiceManager.service_is_active") +def test_valid_nodeid_false_service_not_active(mock_is_active, mock_nodeinfo): + mock_is_active.return_value = False + assert utils.valid_nodeid("3") is False + mock_is_active.assert_called_once_with('corosync.service') + mock_nodeinfo.assert_not_called() + +@mock.patch("crmsh.utils.get_nodeinfo_from_cmaptool") +@mock.patch("crmsh.service_manager.ServiceManager.service_is_active") +def test_valid_nodeid_false(mock_is_active, mock_nodeinfo): + mock_is_active.return_value = True + mock_nodeinfo.return_value = {'1': ["10.10.10.1"], "2": ["20.20.20.2"]} + assert utils.valid_nodeid("3") is False + mock_is_active.assert_called_once_with('corosync.service') + mock_nodeinfo.assert_called_once_with() + +@mock.patch("crmsh.utils.get_nodeinfo_from_cmaptool") +@mock.patch("crmsh.service_manager.ServiceManager.service_is_active") +def test_valid_nodeid_true(mock_is_active, mock_nodeinfo): + mock_is_active.return_value = True + mock_nodeinfo.return_value = {'1': ["10.10.10.1"], "2": ["20.20.20.2"]} + assert utils.valid_nodeid("2") is True + mock_is_active.assert_called_once_with('corosync.service') + mock_nodeinfo.assert_called_once_with() + +@mock.patch('crmsh.sh.ClusterShell.get_stdout_or_raise_error') +def test_detect_aws_false(mock_run): + mock_run.side_effect = ["test", "test"] + assert utils.detect_aws() is False + mock_run.assert_has_calls([ + mock.call("dmidecode -s system-version"), + mock.call("dmidecode -s system-manufacturer") + ]) + +@mock.patch('crmsh.sh.ClusterShell.get_stdout_or_raise_error') +def test_detect_aws_xen(mock_run): + mock_run.side_effect = ["4.2.amazon", "Xen"] + assert utils.detect_aws() is True + mock_run.assert_has_calls([ + mock.call("dmidecode -s system-version"), + mock.call("dmidecode -s system-manufacturer") + ]) + +@mock.patch('crmsh.sh.ClusterShell.get_stdout_or_raise_error') +def test_detect_aws_kvm(mock_run): + mock_run.side_effect = ["Not Specified", "Amazon EC2"] + assert utils.detect_aws() is True + mock_run.assert_has_calls([ + mock.call("dmidecode -s system-version"), + mock.call("dmidecode -s system-manufacturer") + ]) + +@mock.patch('crmsh.sh.ClusterShell.get_stdout_or_raise_error') +def test_detect_azure_false(mock_run): + mock_run.side_effect = ["test", "test"] + assert utils.detect_azure() is False + mock_run.assert_has_calls([ + mock.call("dmidecode -s system-manufacturer"), + mock.call("dmidecode -s chassis-asset-tag") + ]) + +@mock.patch("crmsh.utils._cloud_metadata_request") +@mock.patch('crmsh.sh.ClusterShell.get_stdout_or_raise_error') +def test_detect_azure_microsoft_corporation(mock_run, mock_request): + mock_run.side_effect = ["microsoft corporation", "test"] + mock_request.return_value = "data" + assert utils.detect_azure() is True + mock_run.assert_has_calls([ + mock.call("dmidecode -s system-manufacturer"), + mock.call("dmidecode -s chassis-asset-tag") + ]) + +@mock.patch("crmsh.utils._cloud_metadata_request") +@mock.patch('crmsh.sh.ClusterShell.get_stdout_or_raise_error') +def test_detect_azure_chassis(mock_run, mock_request): + mock_run.side_effect = ["test", "7783-7084-3265-9085-8269-3286-77"] + mock_request.return_value = "data" + assert utils.detect_azure() is True + mock_run.assert_has_calls([ + mock.call("dmidecode -s system-manufacturer"), + mock.call("dmidecode -s chassis-asset-tag") + ]) + +@mock.patch('crmsh.sh.ClusterShell.get_stdout_or_raise_error') +def test_detect_gcp_false(mock_run): + mock_run.return_value = "test" + assert utils.detect_gcp() is False + mock_run.assert_called_once_with("dmidecode -s bios-vendor") + +@mock.patch("crmsh.utils._cloud_metadata_request") +@mock.patch('crmsh.sh.ClusterShell.get_stdout_or_raise_error') +def test_detect_gcp(mock_run, mock_request): + mock_run.return_value = "Google instance" + mock_request.return_value = "data" + assert utils.detect_gcp() is True + mock_run.assert_called_once_with("dmidecode -s bios-vendor") + +@mock.patch("crmsh.utils.is_program") +def test_detect_cloud_no_cmd(mock_is_program): + mock_is_program.return_value = False + assert utils.detect_cloud() is None + mock_is_program.assert_called_once_with("dmidecode") + +@mock.patch("crmsh.utils.detect_aws") +@mock.patch("crmsh.utils.is_program") +def test_detect_cloud_aws(mock_is_program, mock_aws): + mock_is_program.return_value = True + mock_aws.return_value = True + assert utils.detect_cloud() == constants.CLOUD_AWS + mock_is_program.assert_called_once_with("dmidecode") + mock_aws.assert_called_once_with() + +@mock.patch("crmsh.utils.detect_azure") +@mock.patch("crmsh.utils.detect_aws") +@mock.patch("crmsh.utils.is_program") +def test_detect_cloud_azure(mock_is_program, mock_aws, mock_azure): + mock_is_program.return_value = True + mock_aws.return_value = False + mock_azure.return_value = True + assert utils.detect_cloud() == constants.CLOUD_AZURE + mock_is_program.assert_called_once_with("dmidecode") + mock_aws.assert_called_once_with() + mock_azure.assert_called_once_with() + +@mock.patch("crmsh.utils.detect_gcp") +@mock.patch("crmsh.utils.detect_azure") +@mock.patch("crmsh.utils.detect_aws") +@mock.patch("crmsh.utils.is_program") +def test_detect_cloud_gcp(mock_is_program, mock_aws, mock_azure, mock_gcp): + mock_is_program.return_value = True + mock_aws.return_value = False + mock_azure.return_value = False + mock_gcp.return_value = True + assert utils.detect_cloud() == constants.CLOUD_GCP + mock_is_program.assert_called_once_with("dmidecode") + mock_aws.assert_called_once_with() + mock_azure.assert_called_once_with() + mock_gcp.assert_called_once_with() + +@mock.patch("crmsh.sh.ShellUtils.get_stdout") +def test_interface_choice(mock_get_stdout): + ip_a_output = """ +1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000 + link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00 + inet 127.0.0.1/8 scope host lo + valid_lft forever preferred_lft forever + inet6 ::1/128 scope host + valid_lft forever preferred_lft forever +2: enp1s0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP group default qlen 1000 + link/ether 52:54:00:9e:1b:4f brd ff:ff:ff:ff:ff:ff + inet 192.168.122.241/24 brd 192.168.122.255 scope global enp1s0 + valid_lft forever preferred_lft forever + inet6 fe80::5054:ff:fe9e:1b4f/64 scope link + valid_lft forever preferred_lft forever +3: br-933fa0e1438c: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP group default + link/ether 9e:fe:24:df:59:49 brd ff:ff:ff:ff:ff:ff + inet 10.10.10.1/24 brd 10.10.10.255 scope global br-933fa0e1438c + valid_lft forever preferred_lft forever + inet6 fe80::9cfe:24ff:fedf:5949/64 scope link + valid_lft forever preferred_lft forever +4: veth3fff6e9@if7: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue master docker0 state UP group default + link/ether 1e:2c:b3:73:6b:42 brd ff:ff:ff:ff:ff:ff link-netnsid 0 + inet6 fe80::1c2c:b3ff:fe73:6b42/64 scope link + valid_lft forever preferred_lft forever + valid_lft forever preferred_lft forever +""" + mock_get_stdout.return_value = (0, ip_a_output) + assert utils.interface_choice() == ["enp1s0", "br-933fa0e1438c", "veth3fff6e9"] + mock_get_stdout.assert_called_once_with("ip a") + + +class TestIP(unittest.TestCase): + """ + Unitary tests for class utils.IP + """ + @classmethod + def setUpClass(cls): + """ + Global setUp. + """ + + def setUp(self): + """ + Test setUp. + """ + self.ip_inst = utils.IP("10.10.10.1") + + def tearDown(self): + """ + Test tearDown. + """ + + @classmethod + def tearDownClass(cls): + """ + Global tearDown. + """ + + @mock.patch('ipaddress.ip_address') + def test_ip_address(self, mock_ip_address): + mock_ip_address_inst = mock.Mock() + mock_ip_address.return_value = mock_ip_address_inst + self.ip_inst.ip_address + mock_ip_address.assert_called_once_with("10.10.10.1") + + @mock.patch('crmsh.utils.IP.ip_address', new_callable=mock.PropertyMock) + def test_version(self, mock_ip_address): + mock_ip_address_inst = mock.Mock(version=4) + mock_ip_address.return_value = mock_ip_address_inst + res = self.ip_inst.version + self.assertEqual(res, mock_ip_address_inst.version) + mock_ip_address.assert_called_once_with() + + @mock.patch('crmsh.utils.IP.ip_address', new_callable=mock.PropertyMock) + def test_is_mcast(self, mock_ip_address): + mock_ip_address_inst = mock.Mock(is_multicast=False) + mock_ip_address.return_value = mock_ip_address_inst + res = utils.IP.is_mcast("10.10.10.1") + self.assertEqual(res, False) + mock_ip_address.assert_called_once_with() + + @mock.patch('crmsh.utils.IP.version', new_callable=mock.PropertyMock) + def test_is_ipv6(self, mock_version): + mock_version.return_value = 4 + res = utils.IP.is_ipv6("10.10.10.1") + self.assertEqual(res, False) + mock_version.assert_called_once_with() + + @mock.patch('crmsh.utils.IP.ip_address', new_callable=mock.PropertyMock) + def test_is_loopback(self, mock_ip_address): + mock_ip_address_inst = mock.Mock(is_loopback=False) + mock_ip_address.return_value = mock_ip_address_inst + res = self.ip_inst.is_loopback + self.assertEqual(res, mock_ip_address_inst.is_loopback) + mock_ip_address.assert_called_once_with() + + +class TestInterface(unittest.TestCase): + """ + Unitary tests for class utils.Interface + """ + @classmethod + def setUpClass(cls): + """ + Global setUp. + """ + + def setUp(self): + """ + Test setUp. + """ + self.interface = utils.Interface("10.10.10.123/24") + + def tearDown(self): + """ + Test tearDown. + """ + + @classmethod + def tearDownClass(cls): + """ + Global tearDown. + """ + + def test_ip_with_mask(self): + assert self.interface.ip_with_mask == "10.10.10.123/24" + + @mock.patch('ipaddress.ip_interface') + def test_ip_interface(self, mock_ip_interface): + mock_ip_interface_inst = mock.Mock() + mock_ip_interface.return_value = mock_ip_interface_inst + self.interface.ip_interface + mock_ip_interface.assert_called_once_with("10.10.10.123/24") + + @mock.patch('crmsh.utils.Interface.ip_interface', new_callable=mock.PropertyMock) + def test_network(self, mock_ip_interface): + mock_ip_interface_inst = mock.Mock() + mock_ip_interface.return_value = mock_ip_interface_inst + mock_ip_interface_inst.network = mock.Mock(network_address="10.10.10.0") + assert self.interface.network == "10.10.10.0" + mock_ip_interface.assert_called_once_with() + + +class TestInterfacesInfo(unittest.TestCase): + """ + Unitary tests for class utils.InterfacesInfo + """ + + network_output_error = """1: lo inet 127.0.0.1/8 scope host lo\ valid_lft forever preferred_lft forever +2: enp1s0 inet 192.168.122.241/24 brd 192.168.122.255 scope global enp1s0 +61: tun0 inet 10.163.45.46 peer 10.163.45.45/32 scope global tun0""" + + @classmethod + def setUpClass(cls): + """ + Global setUp. + """ + + def setUp(self): + """ + Test setUp. + """ + self.interfaces_info = utils.InterfacesInfo() + self.interfaces_info_with_second_hb = utils.InterfacesInfo(second_heartbeat=True) + self.interfaces_info_with_custom_nic = utils.InterfacesInfo(second_heartbeat=True, custom_nic_list=['eth1']) + self.interfaces_info_with_wrong_nic = utils.InterfacesInfo(custom_nic_list=['eth7']) + self.interfaces_info_fake = utils.InterfacesInfo() + self.interfaces_info_fake._nic_info_dict = { + "eth0": [mock.Mock(ip="10.10.10.1", network="10.10.10.0"), mock.Mock(ip="10.10.10.2", network="10.10.10.0")], + "eth1": [mock.Mock(ip="20.20.20.1", network="20.20.20.0")] + } + self.interfaces_info_fake._default_nic_list = ["eth7"] + + def tearDown(self): + """ + Test tearDown. + """ + + @classmethod + def tearDownClass(cls): + """ + Global tearDown. + """ + + @mock.patch('crmsh.sh.ShellUtils.get_stdout_stderr') + def test_get_interfaces_info_no_address(self, mock_run): + only_lo = "1: lo inet 127.0.0.1/8 scope host lo\ valid_lft forever preferred_lft forever" + mock_run.return_value = (0, only_lo, None) + with self.assertRaises(ValueError) as err: + self.interfaces_info.get_interfaces_info() + self.assertEqual("No address configured", str(err.exception)) + mock_run.assert_called_once_with("ip -4 -o addr show") + + @mock.patch('crmsh.utils.Interface') + @mock.patch('crmsh.sh.ShellUtils.get_stdout_stderr') + def test_get_interfaces_info_one_addr(self, mock_run, mock_interface): + mock_run.return_value = (0, self.network_output_error, None) + mock_interface_inst_1 = mock.Mock(is_loopback=True, is_link_local=False) + mock_interface_inst_2 = mock.Mock(is_loopback=False, is_link_local=False) + mock_interface.side_effect = [mock_interface_inst_1, mock_interface_inst_2] + + with self.assertRaises(ValueError) as err: + self.interfaces_info_with_second_hb.get_interfaces_info() + self.assertEqual("Cannot configure second heartbeat, since only one address is available", str(err.exception)) + + mock_run.assert_called_once_with("ip -4 -o addr show") + mock_interface.assert_has_calls([ + mock.call("127.0.0.1/8"), + mock.call("192.168.122.241/24") + ]) + + def test_nic_list(self): + res = self.interfaces_info_fake.nic_list + self.assertEqual(res, ["eth0", "eth1"]) + + def test_interface_list(self): + res = self.interfaces_info_fake.interface_list + assert len(res) == 3 + + @mock.patch('crmsh.utils.InterfacesInfo.interface_list', new_callable=mock.PropertyMock) + def test_ip_list(self, mock_interface_list): + mock_interface_list.return_value = [ + mock.Mock(ip="10.10.10.1"), + mock.Mock(ip="10.10.10.2") + ] + res = self.interfaces_info_fake.ip_list + self.assertEqual(res, ["10.10.10.1", "10.10.10.2"]) + mock_interface_list.assert_called_once_with() + + @mock.patch('crmsh.utils.InterfacesInfo.ip_list', new_callable=mock.PropertyMock) + @mock.patch('crmsh.utils.InterfacesInfo.get_interfaces_info') + def test_get_local_ip_list(self, mock_get_info, mock_ip_list): + mock_ip_list.return_value = ["10.10.10.1", "10.10.10.2"] + res = utils.InterfacesInfo.get_local_ip_list(False) + self.assertEqual(res, mock_ip_list.return_value) + mock_get_info.assert_called_once_with() + mock_ip_list.assert_called_once_with() + + @mock.patch('crmsh.utils.InterfacesInfo.ip_list', new_callable=mock.PropertyMock) + @mock.patch('crmsh.utils.IP.is_ipv6') + @mock.patch('crmsh.utils.InterfacesInfo.get_interfaces_info') + def test_ip_in_local(self, mock_get_info, mock_is_ipv6, mock_ip_list): + mock_is_ipv6.return_value = False + mock_ip_list.return_value = ["10.10.10.1", "10.10.10.2"] + res = utils.InterfacesInfo.ip_in_local("10.10.10.1") + assert res is True + mock_get_info.assert_called_once_with() + mock_ip_list.assert_called_once_with() + mock_is_ipv6.assert_called_once_with("10.10.10.1") + + @mock.patch('crmsh.utils.InterfacesInfo.interface_list', new_callable=mock.PropertyMock) + def test_network_list(self, mock_interface_list): + mock_interface_list.return_value = [ + mock.Mock(network="10.10.10.0"), + mock.Mock(network="20.20.20.0") + ] + res = self.interfaces_info.network_list + self.assertEqual(res, list(set(["10.10.10.0", "20.20.20.0"]))) + mock_interface_list.assert_called_once_with() + + def test_nic_first_ip(self): + res = self.interfaces_info_fake._nic_first_ip("eth0") + self.assertEqual(res, "10.10.10.1") + + @mock.patch('crmsh.utils.InterfacesInfo.nic_list', new_callable=mock.PropertyMock) + @mock.patch('logging.Logger.warning') + @mock.patch('crmsh.utils.InterfacesInfo.get_interfaces_info') + @mock.patch('crmsh.sh.ShellUtils.get_stdout_stderr') + def test_get_default_nic_list_from_route_no_default(self, mock_run, mock_get_interfaces_info, mock_warn, mock_nic_list): + output = """10.10.10.0/24 dev eth1 proto kernel scope link src 10.10.10.51 + 20.20.20.0/24 dev eth2 proto kernel scope link src 20.20.20.51""" + mock_run.return_value = (0, output, None) + mock_nic_list.side_effect = [["eth0", "eth1"], ["eth0", "eth1"]] + + res = self.interfaces_info.get_default_nic_list_from_route() + self.assertEqual(res, ["eth0"]) + + mock_run.assert_called_once_with("ip -o route show") + mock_warn.assert_called_once_with("No default route configured. Using the first found nic") + mock_nic_list.assert_has_calls([mock.call(), mock.call()]) + + @mock.patch('crmsh.sh.ShellUtils.get_stdout_stderr') + def test_get_default_nic_list_from_route(self, mock_run): + output = """default via 192.168.122.1 dev eth8 proto dhcp + 10.10.10.0/24 dev eth1 proto kernel scope link src 10.10.10.51 + 20.20.20.0/24 dev eth2 proto kernel scope link src 20.20.20.51 + 192.168.122.0/24 dev eth8 proto kernel scope link src 192.168.122.120""" + mock_run.return_value = (0, output, None) + + res = self.interfaces_info.get_default_nic_list_from_route() + self.assertEqual(res, ["eth8"]) + + mock_run.assert_called_once_with("ip -o route show") + + @mock.patch('crmsh.utils.InterfacesInfo.nic_list', new_callable=mock.PropertyMock) + def test_get_default_ip_list_failed_detect(self, mock_nic_list): + mock_nic_list.side_effect = [["eth0", "eth1"], ["eth0", "eth1"]] + + with self.assertRaises(ValueError) as err: + self.interfaces_info_with_wrong_nic.get_default_ip_list() + self.assertEqual("Failed to detect IP address for eth7", str(err.exception)) + + mock_nic_list.assert_has_calls([mock.call(), mock.call()]) + + @mock.patch('crmsh.utils.InterfacesInfo._nic_first_ip') + @mock.patch('crmsh.utils.InterfacesInfo.nic_list', new_callable=mock.PropertyMock) + def test_get_default_ip_list(self, mock_nic_list, mock_first_ip): + mock_nic_list.side_effect = [["eth0", "eth1"], ["eth0", "eth1"], ["eth0", "eth1"]] + mock_first_ip.side_effect = ["10.10.10.1", "20.20.20.1"] + + res = self.interfaces_info_with_custom_nic.get_default_ip_list() + self.assertEqual(res, ["10.10.10.1", "20.20.20.1"]) + + mock_nic_list.assert_has_calls([mock.call(), mock.call(), mock.call()]) + mock_first_ip.assert_has_calls([mock.call("eth1"), mock.call("eth0")]) + + +@mock.patch("crmsh.utils.get_nodeid_from_name") +def test_get_iplist_from_name_no_nodeid(mock_get_nodeid): + mock_get_nodeid.return_value = None + res = utils.get_iplist_from_name("test") + assert res == [] + mock_get_nodeid.assert_called_once_with("test") + + +@mock.patch("crmsh.utils.get_nodeinfo_from_cmaptool") +@mock.patch("crmsh.utils.get_nodeid_from_name") +def test_get_iplist_from_name_no_nodeinfo(mock_get_nodeid, mock_get_nodeinfo): + mock_get_nodeid.return_value = "1" + mock_get_nodeinfo.return_value = None + res = utils.get_iplist_from_name("test") + assert res == [] + mock_get_nodeid.assert_called_once_with("test") + mock_get_nodeinfo.assert_called_once_with() + + +@mock.patch("crmsh.utils.get_nodeinfo_from_cmaptool") +@mock.patch("crmsh.utils.get_nodeid_from_name") +def test_get_iplist_from_name(mock_get_nodeid, mock_get_nodeinfo): + mock_get_nodeid.return_value = "1" + mock_get_nodeinfo.return_value = {"1": ["10.10.10.1"], "2": ["10.10.10.2"]} + res = utils.get_iplist_from_name("test") + assert res == ["10.10.10.1"] + mock_get_nodeid.assert_called_once_with("test") + mock_get_nodeinfo.assert_called_once_with() + + +@mock.patch("crmsh.sh.ShellUtils.get_stdout_stderr") +def test_ping_node(mock_run): + mock_run.return_value = (1, None, "error data") + with pytest.raises(ValueError) as err: + utils.ping_node("node_unreachable") + assert str(err.value) == 'host "node_unreachable" is unreachable: error data' + mock_run.assert_called_once_with("ping -c 1 node_unreachable") + + +def test_calculate_quorate_status(): + assert utils.calculate_quorate_status(3, 2) is True + assert utils.calculate_quorate_status(3, 1) is False + + +@mock.patch("crmsh.sh.ClusterShell.get_stdout_or_raise_error") +def test_get_quorum_votes_dict(mock_run): + mock_run.return_value = """ +Votequorum information +---------------------- +Expected votes: 1 +Highest expected: 1 +Total votes: 1 +Quorum: 1 +Flags: Quorate + """ + res = utils.get_quorum_votes_dict() + assert res == {'Expected': '1', 'Total': '1'} + mock_run.assert_called_once_with("corosync-quorumtool -s", None, success_exit_status={0, 2}) + + +def test_re_split_string(): + assert utils.re_split_string('[; ]', "/dev/sda1; /dev/sdb1 ; ") == ["/dev/sda1", "/dev/sdb1"] + assert utils.re_split_string('[; ]', "/dev/sda1 ") == ["/dev/sda1"] + + +@mock.patch('crmsh.utils.get_dev_info') +def test_has_dev_partitioned(mock_get_dev_info): + mock_get_dev_info.return_value = """ +disk +part + """ + res = utils.has_dev_partitioned("/dev/sda1") + assert res is True + mock_get_dev_info.assert_called_once_with("/dev/sda1", "NAME", peer=None) + + +@mock.patch('crmsh.utils.get_dev_uuid') +def test_compare_uuid_with_peer_dev_cannot_find_local(mock_get_dev_uuid): + mock_get_dev_uuid.return_value = "" + with pytest.raises(ValueError) as err: + utils.compare_uuid_with_peer_dev(["/dev/sdb1"], "node2") + assert str(err.value) == "Cannot find UUID for /dev/sdb1 on local" + mock_get_dev_uuid.assert_called_once_with("/dev/sdb1") + + +@mock.patch('crmsh.utils.get_dev_uuid') +def test_compare_uuid_with_peer_dev_cannot_find_peer(mock_get_dev_uuid): + mock_get_dev_uuid.side_effect = ["1234", ""] + with pytest.raises(ValueError) as err: + utils.compare_uuid_with_peer_dev(["/dev/sdb1"], "node2") + assert str(err.value) == "Cannot find UUID for /dev/sdb1 on node2" + mock_get_dev_uuid.assert_has_calls([ + mock.call("/dev/sdb1"), + mock.call("/dev/sdb1", "node2") + ]) + + +@mock.patch('crmsh.utils.get_dev_uuid') +def test_compare_uuid_with_peer_dev(mock_get_dev_uuid): + mock_get_dev_uuid.side_effect = ["1234", "5678"] + with pytest.raises(ValueError) as err: + utils.compare_uuid_with_peer_dev(["/dev/sdb1"], "node2") + assert str(err.value) == "UUID of /dev/sdb1 not same with peer node2" + mock_get_dev_uuid.assert_has_calls([ + mock.call("/dev/sdb1"), + mock.call("/dev/sdb1", "node2") + ]) + + +@mock.patch('crmsh.utils.get_dev_info') +def test_is_dev_used_for_lvm(mock_dev_info): + mock_dev_info.return_value = "lvm" + res = utils.is_dev_used_for_lvm("/dev/sda1") + assert res is True + mock_dev_info.assert_called_once_with("/dev/sda1", "TYPE", peer=None) + + +@mock.patch('crmsh.utils.get_dev_info') +def test_is_dev_a_plain_raw_disk_or_partition(mock_dev_info): + mock_dev_info.return_value = "raid1\nlvm" + res = utils.is_dev_a_plain_raw_disk_or_partition("/dev/md127") + assert res is False + mock_dev_info.assert_called_once_with("/dev/md127", "TYPE", peer=None) + + +@mock.patch('crmsh.sh.ClusterShell.get_stdout_or_raise_error') +def test_get_dev_info(mock_run): + mock_run.return_value = "data" + res = utils.get_dev_info("/dev/sda1", "TYPE") + assert res == "data" + mock_run.assert_called_once_with("lsblk -fno TYPE /dev/sda1", None) + + +@mock.patch('crmsh.utils.get_dev_info') +def test_get_dev_fs_type(mock_get_info): + mock_get_info.return_value = "data" + res = utils.get_dev_fs_type("/dev/sda1") + assert res == "data" + mock_get_info.assert_called_once_with("/dev/sda1", "FSTYPE", peer=None) + + +@mock.patch('crmsh.utils.get_dev_info') +def test_get_dev_uuid(mock_get_info): + mock_get_info.return_value = "uuid" + res = utils.get_dev_uuid("/dev/sda1") + assert res == "uuid" + mock_get_info.assert_called_once_with("/dev/sda1", "UUID", peer=None) + + +@mock.patch('crmsh.sh.ClusterShell.get_stdout_or_raise_error') +def test_get_pe_number_except(mock_run): + mock_run.return_value = "data" + with pytest.raises(ValueError) as err: + utils.get_pe_number("vg1") + assert str(err.value) == "Cannot find PE on VG(vg1)" + mock_run.assert_called_once_with("vgdisplay vg1") + + +@mock.patch('crmsh.sh.ClusterShell.get_stdout_or_raise_error') +def test_get_pe_number(mock_run): + mock_run.return_value = """ +PE Size 4.00 MiB +Total PE 1534 +Alloc PE / Size 1534 / 5.99 GiB + """ + res = utils.get_pe_number("vg1") + assert res == 1534 + mock_run.assert_called_once_with("vgdisplay vg1") + + +@mock.patch('crmsh.sh.ClusterShell.get_stdout_or_raise_error') +def test_get_all_vg_name(mock_run): + mock_run.return_value = """ +--- Volume group --- + VG Name ocfs2-vg + System ID + """ + res = utils.get_all_vg_name() + assert res == ["ocfs2-vg"] + mock_run.assert_called_once_with("vgdisplay") + + +@mock.patch('crmsh.utils.randomword') +def test_gen_unused_id(mock_rand): + mock_rand.return_value = "1234xxxx" + res = utils.gen_unused_id(["test-id"], "test-id") + assert res == "test-id-1234xxxx" + mock_rand.assert_called_once_with(6) + + +@mock.patch('random.choice') +def test_randomword(mock_rand): + import string + mock_rand.side_effect = ['z', 'f', 'k', 'e', 'c', 'd'] + res = utils.randomword() + assert res == "zfkecd" + mock_rand.assert_has_calls([mock.call(string.ascii_lowercase) for x in range(6)]) + + +@mock.patch('crmsh.cibconfig.cib_factory') +def test_all_exist_id(mock_cib): + mock_cib.refresh = mock.Mock() + mock_cib.id_list = mock.Mock() + mock_cib.id_list.return_value = ['1', '2'] + res = utils.all_exist_id() + assert res == ['1', '2'] + mock_cib.id_list.assert_called_once_with() + mock_cib.refresh.assert_called_once_with() + + +@mock.patch('crmsh.sh.ClusterShell.get_stdout_or_raise_error') +def test_has_mount_point_used(mock_run): + mock_run.return_value = """ +/dev/vda2 on /usr/local type btrfs (rw,relatime,space_cache,subvolid=259,subvol=/@/usr/local) +/dev/vda2 on /opt type btrfs (rw,relatime,space_cache,subvolid=263,subvol=/@/opt) +/dev/vda2 on /var/lib/docker/btrfs type btrfs (rw,relatime,space_cache,subvolid=258,subvol=/@/var) + """ + res = utils.has_mount_point_used("/opt") + assert res is True + mock_run.assert_called_once_with("mount") + + +@mock.patch('crmsh.sh.ClusterShell.get_stdout_or_raise_error') +def test_has_disk_mounted(mock_run): + mock_run.return_value = """ +/dev/vda2 on /usr/local type btrfs (rw,relatime,space_cache,subvolid=259,subvol=/@/usr/local) +/dev/vda2 on /opt type btrfs (rw,relatime,space_cache,subvolid=263,subvol=/@/opt) +/dev/vda2 on /var/lib/docker/btrfs type btrfs (rw,relatime,space_cache,subvolid=258,subvol=/@/var) + """ + res = utils.has_disk_mounted("/dev/vda2") + assert res is True + mock_run.assert_called_once_with("mount") + + +@mock.patch('crmsh.sbd.SBDManager.is_using_diskless_sbd') +@mock.patch('crmsh.sh.ClusterShell.get_stdout_or_raise_error') +def test_has_stonith_running(mock_run, mock_diskless): + mock_run.return_value = """ +stonith-sbd +1 fence device found + """ + mock_diskless.return_value = True + res = utils.has_stonith_running() + assert res is True + mock_run.assert_called_once_with("stonith_admin -L") + mock_diskless.assert_called_once_with() + + +@mock.patch('crmsh.utils.S_ISBLK') +@mock.patch('os.stat') +def test_is_block_device_error(mock_stat, mock_isblk): + mock_stat_inst = mock.Mock(st_mode=12345) + mock_stat.return_value = mock_stat_inst + mock_isblk.side_effect = OSError + res = utils.is_block_device("/dev/sda1") + assert res is False + mock_stat.assert_called_once_with("/dev/sda1") + mock_isblk.assert_called_once_with(12345) + + +@mock.patch('crmsh.utils.S_ISBLK') +@mock.patch('os.stat') +def test_is_block_device(mock_stat, mock_isblk): + mock_stat_inst = mock.Mock(st_mode=12345) + mock_stat.return_value = mock_stat_inst + mock_isblk.return_value = True + res = utils.is_block_device("/dev/sda1") + assert res is True + mock_stat.assert_called_once_with("/dev/sda1") + mock_isblk.assert_called_once_with(12345) + + +@mock.patch('crmsh.utils.ping_node') +@mock.patch('crmsh.sh.ClusterShell.get_stdout_or_raise_error') +def test_check_all_nodes_reachable(mock_run, mock_ping): + mock_run.return_value = "1084783297 15sp2-1 member" + utils.check_all_nodes_reachable() + mock_run.assert_called_once_with("crm_node -l") + mock_ping.assert_called_once_with("15sp2-1") + + +@mock.patch('crmsh.sh.ShellUtils.get_stdout_stderr') +def test_detect_virt(mock_run): + mock_run.return_value = (0, None, None) + assert utils.detect_virt() is True + mock_run.assert_called_once_with("systemd-detect-virt") + + +@mock.patch('crmsh.sh.ClusterShell.get_stdout_or_raise_error') +def test_is_standby(mock_run): + mock_run.return_value = """ +Node List: +* Node 15sp2-1: standby + """ + assert utils.is_standby("15sp2-1") is True + mock_run.assert_called_once_with("crm_mon -1") + + +@mock.patch('crmsh.sh.cluster_shell') +def test_get_dlm_option_dict(mock_run): + mock_run_inst = mock.Mock() + mock_run.return_value = mock_run_inst + mock_run_inst.get_stdout_or_raise_error.return_value = """ +key1=value1 +key2=value2 + """ + res_dict = utils.get_dlm_option_dict() + assert res_dict == { + "key1": "value1", + "key2": "value2" + } + mock_run_inst.get_stdout_or_raise_error.assert_called_once_with("dlm_tool dump_config", None) + + +@mock.patch('crmsh.utils.get_dlm_option_dict') +def test_set_dlm_option_exception(mock_get_dict): + mock_get_dict.return_value = { + "key1": "value1", + "key2": "value2" + } + with pytest.raises(ValueError) as err: + utils.set_dlm_option(name="xin") + assert str(err.value) == '"name" is not dlm config option' + + +@mock.patch('crmsh.sh.cluster_shell') +@mock.patch('crmsh.utils.get_dlm_option_dict') +def test_set_dlm_option(mock_get_dict, mock_run): + mock_run_inst = mock.Mock() + mock_run.return_value = mock_run_inst + mock_get_dict.return_value = { + "key1": "value1", + "key2": "value2" + } + utils.set_dlm_option(key2="test") + mock_run_inst.get_stdout_or_raise_error.assert_called_once_with('dlm_tool set_config "key2=test"', None) + + +@mock.patch('crmsh.utils.has_resource_configured') +def test_is_dlm_configured(mock_configured): + mock_configured.return_value = True + assert utils.is_dlm_configured() is True + mock_configured.assert_called_once_with(constants.DLM_CONTROLD_RA, peer=None) + + +@mock.patch('crmsh.sh.cluster_shell') +def test_is_quorate_exception(mock_run): + mock_run_inst = mock.Mock() + mock_run.return_value = mock_run_inst + mock_run_inst.get_stdout_or_raise_error.return_value = "data" + with pytest.raises(ValueError) as err: + utils.is_quorate() + assert str(err.value) == "Failed to get quorate status from corosync-quorumtool" + mock_run_inst.get_stdout_or_raise_error.assert_called_once_with("corosync-quorumtool -s", None, success_exit_status={0, 2}) + + +@mock.patch('crmsh.sh.cluster_shell') +def test_is_quorate(mock_run): + mock_run_inst = mock.Mock() + mock_run.return_value = mock_run_inst + mock_run_inst.get_stdout_or_raise_error.return_value = """ +Ring ID: 1084783297/440 +Quorate: Yes + """ + assert utils.is_quorate() is True + mock_run_inst.get_stdout_or_raise_error.assert_called_once_with("corosync-quorumtool -s", None, success_exit_status={0, 2}) + + +@mock.patch('crmsh.utils.etree.fromstring') +@mock.patch('crmsh.sh.ShellUtils.get_stdout_stderr') +def test_list_cluster_nodes_none(mock_run, mock_etree): + mock_run.return_value = (0, "data", None) + mock_etree.return_value = None + res = utils.list_cluster_nodes() + assert res is None + mock_run.assert_called_once_with(constants.CIB_QUERY, no_reg=False) + mock_etree.assert_called_once_with("data") + + +@mock.patch('crmsh.utils.etree.fromstring') +@mock.patch('crmsh.sh.ShellUtils.get_stdout_stderr') +def test_list_cluster_nodes_none_no_reg(mock_run, mock_etree): + mock_run.return_value = (0, "data", None) + mock_etree.return_value = None + res = utils.list_cluster_nodes(no_reg=True) + assert res is None + mock_run.assert_called_once_with(constants.CIB_QUERY, no_reg=True) + mock_etree.assert_called_once_with("data") + + +@mock.patch('os.path.isfile') +@mock.patch('os.getenv') +@mock.patch('crmsh.sh.ShellUtils.get_stdout_stderr') +def test_list_cluster_nodes_cib_not_exist(mock_run, mock_env, mock_isfile): + mock_run.return_value = (1, None, None) + mock_env.return_value = constants.CIB_RAW_FILE + mock_isfile.return_value = False + res = utils.list_cluster_nodes() + assert res is None + mock_run.assert_called_once_with(constants.CIB_QUERY, no_reg=False) + mock_env.assert_called_once_with("CIB_file", constants.CIB_RAW_FILE) + mock_isfile.assert_called_once_with(constants.CIB_RAW_FILE) + + +@mock.patch('crmsh.xmlutil.file2cib_elem') +@mock.patch('os.path.isfile') +@mock.patch('os.getenv') +@mock.patch('crmsh.sh.ShellUtils.get_stdout_stderr') +def test_list_cluster_nodes(mock_run, mock_env, mock_isfile, mock_file2elem): + mock_run.return_value = (1, None, None) + mock_env.return_value = constants.CIB_RAW_FILE + mock_isfile.return_value = True + mock_cib_inst = mock.Mock() + mock_file2elem.return_value = mock_cib_inst + mock_node_inst1 = mock.Mock() + mock_node_inst2 = mock.Mock() + mock_node_inst1.get.side_effect = ["node1", "remote"] + mock_node_inst2.get.side_effect = ["node2", "member"] + mock_cib_inst.xpath.side_effect = [[mock_node_inst1, mock_node_inst2], "data"] + + res = utils.list_cluster_nodes() + assert res == ["node2"] + + mock_run.assert_called_once_with(constants.CIB_QUERY, no_reg=False) + mock_env.assert_called_once_with("CIB_file", constants.CIB_RAW_FILE) + mock_isfile.assert_called_once_with(constants.CIB_RAW_FILE) + mock_file2elem.assert_called_once_with(constants.CIB_RAW_FILE) + mock_cib_inst.xpath.assert_has_calls([ + mock.call(constants.XML_NODE_PATH), + mock.call("//primitive[@id='node1']/instance_attributes/nvpair[@name='server']") + ]) + + +@mock.patch('os.getenv') +@mock.patch('crmsh.sh.cluster_shell') +def test_get_property(mock_run, mock_env): + mock_run_inst = mock.Mock() + mock_run.return_value = mock_run_inst + mock_run_inst.get_rc_stdout_stderr_without_input.return_value = (0, "data", "") + mock_env.return_value = "cib.xml" + assert utils.get_property("no-quorum-policy") == "data" + mock_run_inst.get_rc_stdout_stderr_without_input.assert_called_once_with(None, "CIB_file=cib.xml sudo --preserve-env=CIB_file crm configure get_property no-quorum-policy") + + +@mock.patch('logging.Logger.warning') +@mock.patch('crmsh.sh.ClusterShell.get_stdout_or_raise_error') +@mock.patch('crmsh.utils.get_property') +def test_set_property(mock_get, mock_run, mock_warn): + mock_get.return_value = "start" + utils.set_property("no-quorum-policy", "stop") + mock_run.assert_called_once_with("crm configure property no-quorum-policy=stop") + mock_warn.assert_called_once_with('"no-quorum-policy" in crm_config is set to stop, it was start') + + +@mock.patch('crmsh.utils.get_property') +def test_set_property_the_same(mock_get): + mock_get.return_value = "value1" + utils.set_property("no-quorum-policy", "value1") + mock_get.assert_called_once_with("no-quorum-policy", "crm_config") + + +@mock.patch('crmsh.utils.crm_msec') +@mock.patch('crmsh.utils.get_property') +def test_set_property_conditional(mock_get, mock_msec): + mock_get.return_value = "10s" + mock_msec.side_effect = ["1000", "1000"] + utils.set_property("timeout", "10", conditional=True) + mock_get.assert_called_once_with("timeout", "crm_config") + mock_msec.assert_has_calls([mock.call("10s"), mock.call("10")]) + + +@mock.patch('crmsh.utils.is_dlm_configured') +def test_check_no_quorum_policy_with_dlm_return(mock_dlm): + mock_dlm.return_value = False + utils.check_no_quorum_policy_with_dlm() + mock_dlm.assert_called_once_with() + + +@mock.patch('logging.Logger.warning') +@mock.patch('crmsh.utils.get_property') +@mock.patch('crmsh.utils.is_dlm_configured') +def test_check_no_quorum_policy_with_dlm(mock_dlm, mock_get_property, mock_warn): + mock_dlm.return_value = True + mock_get_property.return_value = "stop" + utils.check_no_quorum_policy_with_dlm() + mock_dlm.assert_called_once_with() + mock_get_property.assert_called_once_with("no-quorum-policy") + mock_warn.assert_called_once_with('The DLM cluster best practice suggests to set the cluster property "no-quorum-policy=freeze"') + + +@mock.patch('crmsh.utils.is_qdevice_configured') +@mock.patch('crmsh.utils.list_cluster_nodes') +def test_is_2node_cluster_without_qdevice(mock_list, mock_is_qdevice): + mock_list.return_value = ["node1", "node2"] + mock_is_qdevice.return_value = False + res = utils.is_2node_cluster_without_qdevice() + assert res is True + mock_list.assert_called_once_with() + mock_is_qdevice.assert_called_once_with() + + +def test_get_systemd_timeout_start_in_sec(): + res = utils.get_systemd_timeout_start_in_sec("1min 31s") + assert res == 91 + + +@mock.patch('crmsh.utils.is_larger_than_min_version') +@mock.patch('crmsh.cibconfig.cib_factory') +def test_is_ocf_1_1_cib_schema_detected(mock_cib, mock_larger): + config.core.OCF_1_1_SUPPORT = True + mock_cib.get_schema = mock.Mock() + mock_cib.get_schema.return_value = "pacemaker-3.5" + mock_larger.return_value = True + assert utils.is_ocf_1_1_cib_schema_detected() is True + mock_cib.get_schema.assert_called_once_with() + mock_larger.assert_called_once_with("pacemaker-3.5", constants.SCHEMA_MIN_VER_SUPPORT_OCF_1_1) + + +@mock.patch('logging.Logger.warning') +@mock.patch('crmsh.utils.is_ocf_1_1_cib_schema_detected') +def test_handle_role_for_ocf_1_1(mock_support, mock_warn): + mock_support.return_value = False + assert utils.handle_role_for_ocf_1_1("Promoted") == "Master" + mock_support.assert_called_once_with() + mock_warn.assert_called_once_with('Convert "%s" to "%s" since the current schema version is old and not upgraded yet. Please consider "%s"', "Promoted", "Master", constants.CIB_UPGRADE) + + +@mock.patch('logging.Logger.info') +@mock.patch('crmsh.utils.is_ocf_1_1_cib_schema_detected') +def test_handle_role_for_ocf_1_1_convert_new(mock_support, mock_info): + config.core.OCF_1_1_SUPPORT = True + mock_support.return_value = True + assert utils.handle_role_for_ocf_1_1("Master") == "Promoted" + mock_support.assert_called_once_with() + mock_info.assert_called_once_with('Convert deprecated "%s" to "%s"', "Master", "Promoted") + + +@mock.patch('crmsh.utils.is_ocf_1_1_cib_schema_detected') +def test_handle_role_for_ocf_1_1_return(mock_support): + mock_support.return_value = True + assert utils.handle_role_for_ocf_1_1("Promoted") == "Promoted" + mock_support.assert_called_once_with() + + +def test_handle_role_for_ocf_1_1_return_not_role(): + assert utils.handle_role_for_ocf_1_1("test", name='other') == "test" + + +def test_compatible_role(): + assert utils.compatible_role("Slave", "Unpromoted") is True + + +@mock.patch('logging.Logger.warning') +@mock.patch('crmsh.sh.ClusterShell.get_stdout_or_raise_error') +def test_fetch_cluster_node_list_from_node(mock_run, mock_warn): + mock_run.return_value = """ + + 1 node1 + 2 node2 lost + 3 node3 member + """ + assert utils.fetch_cluster_node_list_from_node("node1") == ["node3"] + mock_run.assert_called_once_with("crm_node -l", "node1") + mock_warn.assert_has_calls([ + mock.call("The node '%s' has no known name and/or state information", "1"), + mock.call("The node '%s'(state '%s') is not a current member", "node2", "lost") + ]) + + +@mock.patch('crmsh.utils.list_cluster_nodes_except_me') +def test_cluster_copy_file_return(mock_list_nodes): + mock_list_nodes.return_value = [] + assert utils.cluster_copy_file("/file1") == True + + +@mock.patch('crmsh.sh.ShellUtils.get_stdout_stderr') +def test_has_sudo_access(mock_run): + mock_run.return_value = (0, None, None) + assert utils.has_sudo_access() is True + mock_run.assert_called_once_with("sudo -S -k -n id -u") + + +@mock.patch('grp.getgrgid') +@mock.patch('os.getgroups') +def test_in_haclient(mock_group, mock_getgrgid): + mock_group.return_value = [90, 100] + mock_getgrgid_inst1 = mock.Mock(gr_name=constants.HA_GROUP) + mock_getgrgid_inst2 = mock.Mock(gr_name="other") + mock_getgrgid.side_effect = [mock_getgrgid_inst1, mock_getgrgid_inst2] + assert utils.in_haclient() is True + mock_group.assert_called_once_with() + + +@mock.patch('crmsh.utils.in_haclient') +@mock.patch('crmsh.userdir.getuser') +def test_check_user_access_root(mock_user, mock_in): + mock_user.return_value = 'root' + utils.check_user_access('cluster') + mock_in.assert_not_called() + + +@mock.patch('crmsh.utils.has_sudo_access') +@mock.patch('crmsh.utils.in_haclient') +@mock.patch('crmsh.userdir.getuser') +def test_check_user_access_haclient(mock_user, mock_in, mock_sudo): + mock_user.return_value = 'user' + mock_in.return_value = True + utils.check_user_access('ra') + mock_sudo.assert_not_called() + + +@mock.patch('logging.Logger.error') +@mock.patch('crmsh.utils.has_sudo_access') +@mock.patch('crmsh.utils.in_haclient') +@mock.patch('crmsh.userdir.getuser') +def test_check_user_access_need_sudo(mock_user, mock_in, mock_sudo, mock_error): + mock_user.return_value = 'user' + mock_in.return_value = False + mock_sudo.return_value = True + with pytest.raises(utils.TerminateSubCommand) as err: + utils.check_user_access('ra') + mock_error.assert_called_once_with('Please run this command starting with "sudo"') + + +@mock.patch('logging.Logger.error') +@mock.patch('crmsh.utils.has_sudo_access') +@mock.patch('crmsh.utils.in_haclient') +@mock.patch('crmsh.userdir.getuser') +def test_check_user_access_acl(mock_user, mock_in, mock_sudo, mock_error): + mock_user.return_value = 'user' + mock_in.return_value = False + mock_sudo.return_value = False + with pytest.raises(utils.TerminateSubCommand) as err: + utils.check_user_access('ra') + mock_error.assert_called_once_with('This command needs higher privilege.\nOption 1) Please consider to add "user" as sudoer. For example:\n sudo bash -c \'echo "user ALL=(ALL) NOPASSWD:ALL" > /etc/sudoers.d/user\'\nOption 2) Add "user" to the haclient group. For example:\n sudo usermod -g haclient user') + + +@mock.patch('logging.Logger.error') +@mock.patch('crmsh.utils.has_sudo_access') +@mock.patch('crmsh.utils.in_haclient') +@mock.patch('crmsh.userdir.getuser') +def test_check_user_access_cluster(mock_user, mock_in, mock_sudo, mock_error): + mock_user.return_value = 'user' + mock_in.return_value = False + mock_sudo.return_value = False + with pytest.raises(utils.TerminateSubCommand) as err: + utils.check_user_access('cluster') + mock_error.assert_called_once_with('Please run this command starting with "sudo".\nCurrently, this command needs to use sudo to escalate itself as root.\nPlease consider to add "user" as sudoer. For example:\n sudo bash -c \'echo "user ALL=(ALL) NOPASSWD:ALL" > /etc/sudoers.d/user\'') |