1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
|
import concurrent.futures
import difflib
import tarfile
import glob
import re
import socket
from crmsh import utils, userdir
from crmsh.sh import ShellUtils
import behave_agent
COLOR_MODE = r'\x1b\[[0-9]+m'
def get_file_type(file_path):
rc, out, _ = ShellUtils().get_stdout_stderr("file {}".format(file_path))
if re.search(r'{}: bzip2'.format(file_path), out):
return "bzip2"
if re.search(r'{}: directory'.format(file_path), out):
return "directory"
def get_all_files(archive_path):
archive_type = get_file_type(archive_path)
if archive_type == "bzip2":
with tarfile.open(archive_path) as tar:
return tar.getnames()
if archive_type == "directory":
all_files = glob.glob("{}/*".format(archive_path)) + glob.glob("{}/*/*".format(archive_path))
return all_files
def file_in_archive(f, archive_path):
for item in get_all_files(archive_path):
if re.search(r'/{}$'.format(f), item):
return True
return False
def me():
return socket.gethostname()
def _wrap_cmd_non_root(cmd):
"""
When running command under sudoer, or the current user is not root,
wrap crm cluster join command with '<user>@', and for the -N option, too
"""
sudoer = userdir.get_sudoer()
current_user = userdir.getuser()
if sudoer:
user = sudoer
elif current_user != 'root':
user = current_user
else:
return cmd
if re.search('cluster (:?join|geo_join|geo_init_arbitrator)', cmd) and "@" not in cmd:
cmd = re.sub(r'''((?:-c|-N|--qnetd-hostname|--cluster-node|--arbitrator)(?:\s+|=)['"]?)(\S{2,}['"]?)''', f'\\1{user}@\\2', cmd)
elif "cluster init" in cmd and ("-N" in cmd or "--qnetd-hostname" in cmd) and "@" not in cmd:
cmd = re.sub(r'''((?:-c|-N|--qnetd-hostname|--cluster-node)(?:\s+|=)['"]?)(\S{2,}['"]?)''', f'\\1{user}@\\2', cmd)
elif "cluster init" in cmd and "--node" in cmd and "@" not in cmd:
search_patt = r"--node [\'\"](.*)[\'\"]"
res = re.search(search_patt, cmd)
if res:
node_str = ' '.join([f"{user}@{n}" for n in res.group(1).split()])
cmd = re.sub(search_patt, f"--node '{node_str}'", cmd)
return cmd
def run_command(context, cmd, exit_on_fail=True):
cmd = _wrap_cmd_non_root(cmd)
rc, out, err = ShellUtils().get_stdout_stderr(cmd)
context.return_code = rc
if out:
out = re.sub(COLOR_MODE, '', out)
context.stdout = out
if err:
err = re.sub(COLOR_MODE, '', err)
context.stderr = err
if rc != 0 and exit_on_fail:
if out:
context.logger.info("\n{}\n".format(out))
context.logger.error("\n{}\n".format(err))
context.failed = True
return rc, out, err
def run_command_local_or_remote(context, cmd, addr, exit_on_fail=True):
if addr == me():
return run_command(context, cmd, exit_on_fail)
cmd = _wrap_cmd_non_root(cmd)
sudoer = userdir.get_sudoer()
if sudoer is None:
user = None
else:
user = sudoer
cmd = f'sudo {cmd}'
hosts = addr.split(',')
with concurrent.futures.ThreadPoolExecutor(max_workers=len(hosts)) as executor:
results = list(executor.map(lambda x: (x, behave_agent.call(x, 1122, cmd, user=user)), hosts))
out = utils.to_ascii(results[0][1][1])
err = utils.to_ascii(results[0][1][2])
context.stdout = out
context.stderr = err
context.return_code = 0
for host, (rc, stdout, stderr) in results:
if rc != 0:
err = re.sub(COLOR_MODE, '', utils.to_ascii(stderr))
context.stderr = err
if exit_on_fail:
import os
context.logger.error("Failed to run %s on %s@%s :%s", cmd, os.geteuid(), host, err)
raise ValueError("{}".format(err))
else:
return
return 0, out, err
def check_service_state(context, service_name, state, addr):
if state not in ["started", "stopped", "enabled", "disabled"]:
context.logger.error("\nService state should be \"started/stopped/enabled/disabled\"\n")
context.failed = True
if state in {'enabled', 'disabled'}:
rc, _, _ = behave_agent.call(addr, 1122, f'systemctl is-enabled {service_name}', 'root')
return (state == 'enabled') == (rc == 0)
elif state in {'started', 'stopped'}:
rc, _, _ = behave_agent.call(addr, 1122, f'systemctl is-active {service_name}', 'root')
return (state == 'started') == (rc == 0)
else:
context.logger.error("\nService state should be \"started/stopped/enabled/disabled\"\n")
raise ValueError("Service state should be \"started/stopped/enabled/disabled\"")
def check_cluster_state(context, state, addr):
return check_service_state(context, 'pacemaker.service', state, addr)
def is_unclean(node):
rc, out, err = ShellUtils().get_stdout_stderr("crm_mon -1")
return "{}: UNCLEAN".format(node) in out
def online(context, nodelist):
rc = True
_, out = ShellUtils().get_stdout("sudo crm_node -l")
for node in nodelist.split():
node_info = "{} member".format(node)
if not node_info in out:
rc = False
context.logger.error("\nNode \"{}\" not online\n".format(node))
return rc
def assert_eq(expected, actual):
if expected != actual:
msg = "\033[32m" "Expected" "\033[31m" " != Actual" "\033[0m" "\n" \
"\033[32m" "Expected:" "\033[0m" " {}\n" \
"\033[31m" "Actual:" "\033[0m" " {}".format(expected, actual)
if isinstance(expected, str) and '\n' in expected:
try:
diff = '\n'.join(difflib.unified_diff(
expected.splitlines(),
actual.splitlines(),
fromfile="expected",
tofile="actual",
lineterm="",
))
msg = "{}\n" "\033[31m" "Diff:" "\033[0m" "\n{}".format(msg, diff)
except Exception:
pass
raise AssertionError(msg)
def assert_in(expected, actual):
if expected not in actual:
msg = "\033[32m" "Expected" "\033[31m" " not in Actual" "\033[0m" "\n" \
"\033[32m" "Expected:" "\033[0m" " {}\n" \
"\033[31m" "Actual:" "\033[0m" " {}".format(expected, actual)
raise AssertionError(msg)
|