diff options
Diffstat (limited to 'cts/lab')
-rw-r--r-- | cts/lab/CIB.py | 518 | ||||
-rw-r--r-- | cts/lab/CM_corosync.py | 60 | ||||
-rwxr-xr-x | cts/lab/CTSaudits.py | 879 | ||||
-rw-r--r-- | cts/lab/CTSlab.py.in | 135 | ||||
-rw-r--r-- | cts/lab/CTSscenarios.py | 563 | ||||
-rw-r--r-- | cts/lab/CTStests.py | 3178 | ||||
-rw-r--r-- | cts/lab/ClusterManager.py | 940 | ||||
-rw-r--r-- | cts/lab/Makefile.am | 31 | ||||
-rw-r--r-- | cts/lab/OCFIPraTest.py.in | 173 | ||||
-rw-r--r-- | cts/lab/__init__.py | 15 | ||||
-rw-r--r-- | cts/lab/cib_xml.py | 319 | ||||
-rwxr-xr-x | cts/lab/cluster_test.in | 175 | ||||
-rw-r--r-- | cts/lab/cts-log-watcher.in | 84 | ||||
-rwxr-xr-x | cts/lab/cts.in | 262 |
14 files changed, 7332 insertions, 0 deletions
diff --git a/cts/lab/CIB.py b/cts/lab/CIB.py new file mode 100644 index 0000000..5981654 --- /dev/null +++ b/cts/lab/CIB.py @@ -0,0 +1,518 @@ +""" CIB generator for Pacemaker's Cluster Test Suite (CTS) +""" + +__copyright__ = "Copyright 2008-2023 the Pacemaker project contributors" +__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" + +import os +import warnings +import tempfile + +from pacemaker.buildoptions import BuildOptions +from pacemaker._cts.CTS import CtsLab + + +class CibBase(object): + def __init__(self, Factory, tag, _id, **kwargs): + self.tag = tag + self.name = _id + self.kwargs = kwargs + self.children = [] + self.Factory = Factory + + def __repr__(self): + return "%s-%s" % (self.tag, self.name) + + def add_child(self, child): + self.children.append(child) + + def __setitem__(self, key, value): + if value: + self.kwargs[key] = value + else: + self.kwargs.pop(key, None) + +from cts.cib_xml import * + + +class ConfigBase(object): + cts_cib = None + version = "unknown" + Factory = None + + def __init__(self, CM, factory, tmpfile=None): + self.CM = CM + self.Factory = factory + + if not tmpfile: + warnings.filterwarnings("ignore") + f=tempfile.NamedTemporaryFile(delete=True) + f.close() + tmpfile = f.name + warnings.resetwarnings() + + self.Factory.tmpfile = tmpfile + + def version(self): + return self.version + + def NextIP(self): + ip = self.CM.Env["IPBase"] + if ":" in ip: + (prefix, sep, suffix) = ip.rpartition(":") + suffix = str(hex(int(suffix, 16)+1)).lstrip("0x") + else: + (prefix, sep, suffix) = ip.rpartition(".") + suffix = str(int(suffix)+1) + + ip = prefix + sep + suffix + self.CM.Env["IPBase"] = ip + return ip.strip() + + +class CIB12(ConfigBase): + version = "pacemaker-1.2" + counter = 1 + + def _show(self, command=""): + output = "" + (_, result) = self.Factory.rsh(self.Factory.target, "HOME=/root CIB_file="+self.Factory.tmpfile+" cibadmin -Ql "+command, verbose=1) + for line in result: + output += line + self.Factory.debug("Generated Config: "+line) + return output + + def NewIP(self, name=None, standard="ocf"): + if self.CM.Env["IPagent"] == "IPaddr2": + ip = self.NextIP() + if not name: + if ":" in ip: + (prefix, sep, suffix) = ip.rpartition(":") + name = "r"+suffix + else: + name = "r"+ip + + r = Resource(self.Factory, name, self.CM.Env["IPagent"], standard) + r["ip"] = ip + + if ":" in ip: + r["cidr_netmask"] = "64" + r["nic"] = "eth0" + else: + r["cidr_netmask"] = "32" + + else: + if not name: + name = "r%s%d" % (self.CM.Env["IPagent"], self.counter) + self.counter = self.counter + 1 + r = Resource(self.Factory, name, self.CM.Env["IPagent"], standard) + + r.add_op("monitor", "5s") + return r + + def get_node_id(self, node_name): + """ Check the cluster configuration for a node ID. """ + + # We can't account for every possible configuration, + # so we only return a node ID if: + # * The node is specified in /etc/corosync/corosync.conf + # with "ring0_addr:" equal to node_name and "nodeid:" + # explicitly specified. + # In all other cases, we return 0. + node_id = 0 + + # awkward command: use } as record separator + # so each corosync.conf "object" is one record; + # match the "node {" record that has "ring0_addr: node_name"; + # then print the substring of that record after "nodeid:" + (rc, output) = self.Factory.rsh(self.Factory.target, + r"""awk -v RS="}" """ + r"""'/^(\s*nodelist\s*{)?\s*node\s*{.*(ring0_addr|name):\s*%s(\s+|$)/""" + r"""{gsub(/.*nodeid:\s*/,"");gsub(/\s+.*$/,"");print}' %s""" + % (node_name, BuildOptions.COROSYNC_CONFIG_FILE), verbose=1) + + if rc == 0 and len(output) == 1: + try: + node_id = int(output[0]) + except ValueError: + node_id = 0 + + return node_id + + def install(self, target): + old = self.Factory.tmpfile + + # Force a rebuild + self.cts_cib = None + + self.Factory.tmpfile = BuildOptions.CIB_DIR + "/cib.xml" + self.contents(target) + self.Factory.rsh(self.Factory.target, "chown " + BuildOptions.DAEMON_USER + " " + self.Factory.tmpfile) + + self.Factory.tmpfile = old + + def contents(self, target=None): + # fencing resource + if self.cts_cib: + return self.cts_cib + + if target: + self.Factory.target = target + + self.Factory.rsh(self.Factory.target, "HOME=/root cibadmin --empty %s > %s" % (self.version, self.Factory.tmpfile)) + self.num_nodes = len(self.CM.Env["nodes"]) + + no_quorum = "stop" + if self.num_nodes < 3: + no_quorum = "ignore" + self.Factory.log("Cluster only has %d nodes, configuring: no-quorum-policy=ignore" % self.num_nodes) + + # We don't need a nodes section unless we add attributes + stn = None + + # Fencing resource + # Define first so that the shell doesn't reject every update + if self.CM.Env["DoFencing"]: + + # Define the "real" fencing device + st = Resource(self.Factory, "Fencing", ""+self.CM.Env["stonith-type"], "stonith") + + # Set a threshold for unreliable stonith devices such as the vmware one + st.add_meta("migration-threshold", "5") + st.add_op("monitor", "120s", timeout="120s") + st.add_op("stop", "0", timeout="60s") + st.add_op("start", "0", timeout="60s") + + # For remote node tests, a cluster node is stopped and brought back up + # as a remote node with the name "remote-OLDNAME". To allow fencing + # devices to fence these nodes, create a list of all possible node names. + all_node_names = [ prefix+n for n in self.CM.Env["nodes"] for prefix in ('', 'remote-') ] + + # Add all parameters specified by user + entries = self.CM.Env["stonith-params"].split(',') + for entry in entries: + try: + (name, value) = entry.split('=', 1) + except ValueError: + print("Warning: skipping invalid fencing parameter: %s" % entry) + continue + + # Allow user to specify "all" as the node list, and expand it here + if name in [ "hostlist", "pcmk_host_list" ] and value == "all": + value = ' '.join(all_node_names) + + st[name] = value + + st.commit() + + # Test advanced fencing logic + if True: + stf_nodes = [] + stt_nodes = [] + attr_nodes = {} + + # Create the levels + stl = FencingTopology(self.Factory) + for node in self.CM.Env["nodes"]: + # Remote node tests will rename the node + remote_node = "remote-" + node + + # Randomly assign node to a fencing method + ftype = self.CM.Env.random_gen.choice(["levels-and", "levels-or ", "broadcast "]) + + # For levels-and, randomly choose targeting by node name or attribute + by = "" + if ftype == "levels-and": + node_id = self.get_node_id(node) + if node_id == 0 or self.CM.Env.random_gen.choice([True, False]): + by = " (by name)" + else: + attr_nodes[node] = node_id + by = " (by attribute)" + + self.CM.log(" - Using %s fencing for node: %s%s" % (ftype, node, by)) + + if ftype == "levels-and": + # If targeting by name, add a topology level for this node + if node not in attr_nodes: + stl.level(1, node, "FencingPass,Fencing") + + # Always target remote nodes by name, otherwise we would need to add + # an attribute to the remote node only during remote tests (we don't + # want nonexistent remote nodes showing up in the non-remote tests). + # That complexity is not worth the effort. + stl.level(1, remote_node, "FencingPass,Fencing") + + # Add the node (and its remote equivalent) to the list of levels-and nodes. + stt_nodes.extend([node, remote_node]) + + elif ftype == "levels-or ": + for n in [ node, remote_node ]: + stl.level(1, n, "FencingFail") + stl.level(2, n, "Fencing") + stf_nodes.extend([node, remote_node]) + + # If any levels-and nodes were targeted by attribute, + # create the attributes and a level for the attribute. + if attr_nodes: + stn = Nodes(self.Factory) + for (node_name, node_id) in list(attr_nodes.items()): + stn.add_node(node_name, node_id, { "cts-fencing" : "levels-and" }) + stl.level(1, None, "FencingPass,Fencing", "cts-fencing", "levels-and") + + # Create a Dummy agent that always passes for levels-and + if len(stt_nodes): + stt = Resource(self.Factory, "FencingPass", "fence_dummy", "stonith") + stt["pcmk_host_list"] = " ".join(stt_nodes) + # Wait this many seconds before doing anything, handy for letting disks get flushed too + stt["random_sleep_range"] = "30" + stt["mode"] = "pass" + stt.commit() + + # Create a Dummy agent that always fails for levels-or + if len(stf_nodes): + stf = Resource(self.Factory, "FencingFail", "fence_dummy", "stonith") + stf["pcmk_host_list"] = " ".join(stf_nodes) + # Wait this many seconds before doing anything, handy for letting disks get flushed too + stf["random_sleep_range"] = "30" + stf["mode"] = "fail" + stf.commit() + + # Now commit the levels themselves + stl.commit() + + o = Option(self.Factory) + o["stonith-enabled"] = self.CM.Env["DoFencing"] + o["start-failure-is-fatal"] = "false" + o["pe-input-series-max"] = "5000" + o["shutdown-escalation"] = "5min" + o["batch-limit"] = "10" + o["dc-deadtime"] = "5s" + o["no-quorum-policy"] = no_quorum + + if self.CM.Env["DoBSC"]: + o["ident-string"] = "Linux-HA TEST configuration file - REMOVEME!!" + + o.commit() + + o = OpDefaults(self.Factory) + o["timeout"] = "90s" + o.commit() + + # Commit the nodes section if we defined one + if stn is not None: + stn.commit() + + # Add an alerts section if possible + if self.Factory.rsh.exists_on_all(self.CM.Env["notification-agent"], self.CM.Env["nodes"]): + alerts = Alerts(self.Factory) + alerts.add_alert(self.CM.Env["notification-agent"], + self.CM.Env["notification-recipient"]) + alerts.commit() + + # Add resources? + if self.CM.Env["CIBResource"]: + self.add_resources() + + if self.CM.cluster_monitor == 1: + mon = Resource(self.Factory, "cluster_mon", "ocf", "ClusterMon", "pacemaker") + mon.add_op("start", "0", requires="nothing") + mon.add_op("monitor", "5s", requires="nothing") + mon["update"] = "10" + mon["extra_options"] = "-r -n" + mon["user"] = "abeekhof" + mon["htmlfile"] = "/suse/abeekhof/Export/cluster.html" + mon.commit() + + #self._create('''location prefer-dc cluster_mon rule -INFINITY: \#is_dc eq false''') + + # generate cib + self.cts_cib = self._show() + + if self.Factory.tmpfile != BuildOptions.CIB_DIR + "/cib.xml": + self.Factory.rsh(self.Factory.target, "rm -f "+self.Factory.tmpfile) + + return self.cts_cib + + def add_resources(self): + # Per-node resources + for node in self.CM.Env["nodes"]: + name = "rsc_"+node + r = self.NewIP(name) + r.prefer(node, "100") + r.commit() + + # Migrator + # Make this slightly sticky (since we have no other location constraints) to avoid relocation during Reattach + m = Resource(self.Factory, "migrator","Dummy", "ocf", "pacemaker") + m["passwd"] = "whatever" + m.add_meta("resource-stickiness","1") + m.add_meta("allow-migrate", "1") + m.add_op("monitor", "P10S") + m.commit() + + # Ping the test exerciser + p = Resource(self.Factory, "ping-1","ping", "ocf", "pacemaker") + p.add_op("monitor", "60s") + p["host_list"] = self.CM.Env["cts-exerciser"] + p["name"] = "connected" + p["debug"] = "true" + + c = Clone(self.Factory, "Connectivity", p) + c["globally-unique"] = "false" + c.commit() + + # promotable clone resource + s = Resource(self.Factory, "stateful-1", "Stateful", "ocf", "pacemaker") + s.add_op("monitor", "15s", timeout="60s") + s.add_op("monitor", "16s", timeout="60s", role="Promoted") + ms = Clone(self.Factory, "promotable-1", s) + ms["promotable"] = "true" + ms["clone-max"] = self.num_nodes + ms["clone-node-max"] = 1 + ms["promoted-max"] = 1 + ms["promoted-node-max"] = 1 + + # Require connectivity to run the promotable clone + r = Rule(self.Factory, "connected", "-INFINITY", op="or") + r.add_child(Expression(self.Factory, "m1-connected-1", "connected", "lt", "1")) + r.add_child(Expression(self.Factory, "m1-connected-2", "connected", "not_defined", None)) + ms.prefer("connected", rule=r) + + ms.commit() + + # Group Resource + g = Group(self.Factory, "group-1") + g.add_child(self.NewIP()) + + if self.CM.Env["have_systemd"]: + sysd = Resource(self.Factory, "petulant", + "pacemaker-cts-dummyd@10", "service") + sysd.add_op("monitor", "P10S") + g.add_child(sysd) + else: + g.add_child(self.NewIP()) + + g.add_child(self.NewIP()) + + # Make group depend on the promotable clone + g.after("promotable-1", first="promote", then="start") + g.colocate("promotable-1", "INFINITY", withrole="Promoted") + + g.commit() + + # LSB resource + lsb = Resource(self.Factory, "lsb-dummy", "LSBDummy", "lsb") + lsb.add_op("monitor", "5s") + + # LSB with group + lsb.after("group-1") + lsb.colocate("group-1") + + lsb.commit() + + +class CIB20(CIB12): + version = "pacemaker-2.5" + +class CIB30(CIB12): + version = "pacemaker-3.7" + +#class HASI(CIB10): +# def add_resources(self): +# # DLM resource +# self._create('''primitive dlm ocf:pacemaker:controld op monitor interval=120s''') +# self._create('''clone dlm-clone dlm meta globally-unique=false interleave=true''') + + # O2CB resource +# self._create('''primitive o2cb ocf:ocfs2:o2cb op monitor interval=120s''') +# self._create('''clone o2cb-clone o2cb meta globally-unique=false interleave=true''') +# self._create('''colocation o2cb-with-dlm INFINITY: o2cb-clone dlm-clone''') +# self._create('''order start-o2cb-after-dlm mandatory: dlm-clone o2cb-clone''') + + +class ConfigFactory(object): + def __init__(self, CM): + self.CM = CM + self.rsh = self.CM.rsh + self.register("pacemaker12", CIB12, CM, self) + self.register("pacemaker20", CIB20, CM, self) + self.register("pacemaker30", CIB30, CM, self) +# self.register("hae", HASI, CM, self) + if not self.CM.Env["ListTests"]: + self.target = self.CM.Env["nodes"][0] + self.tmpfile = None + + def log(self, args): + self.CM.log("cib: %s" % args) + + def debug(self, args): + self.CM.debug("cib: %s" % args) + + def register(self, methodName, constructor, *args, **kargs): + """register a constructor""" + _args = [constructor] + _args.extend(args) + setattr(self, methodName, ConfigFactoryItem(*_args, **kargs)) + + def unregister(self, methodName): + """unregister a constructor""" + delattr(self, methodName) + + def createConfig(self, name="pacemaker-1.0"): + if name == "pacemaker-1.0": + name = "pacemaker10"; + elif name == "pacemaker-1.2": + name = "pacemaker12"; + elif name == "pacemaker-2.0": + name = "pacemaker20"; + elif name.startswith("pacemaker-3."): + name = "pacemaker30"; + elif name == "hasi": + name = "hae"; + + if hasattr(self, name): + return getattr(self, name)() + else: + self.CM.log("Configuration variant '%s' is unknown. Defaulting to latest config" % name) + + return self.pacemaker30() + + +class ConfigFactoryItem(object): + def __init__(self, function, *args, **kargs): + self._function = function + self._args = args + self._kargs = kargs + + def __call__(self, *args, **kargs): + """call function""" + _args = list(self._args) + _args.extend(args) + _kargs = self._kargs.copy() + _kargs.update(kargs) + return self._function(*_args,**_kargs) + +if __name__ == '__main__': + """ Unit test (pass cluster node names as command line arguments) """ + + import cts.CM_corosync + import sys + + if len(sys.argv) < 2: + print("Usage: %s <node> ..." % sys.argv[0]) + sys.exit(1) + + args = [ + "--nodes", " ".join(sys.argv[1:]), + "--clobber-cib", + "--populate-resources", + "--stack", "corosync", + "--test-ip-base", "fe80::1234:56:7890:1000", + "--stonith", "rhcs", + ] + env = CtsLab(args) + cm = CM_corosync.crm_corosync() + CibFactory = ConfigFactory(cm) + cib = CibFactory.createConfig("pacemaker-3.0") + print(cib.contents()) diff --git a/cts/lab/CM_corosync.py b/cts/lab/CM_corosync.py new file mode 100644 index 0000000..dce7e98 --- /dev/null +++ b/cts/lab/CM_corosync.py @@ -0,0 +1,60 @@ +""" Corosync-specific class for Pacemaker's Cluster Test Suite (CTS) +""" + +__copyright__ = "Copyright 2007-2023 the Pacemaker project contributors" +__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" + +from cts.ClusterManager import ClusterManager + +from pacemaker._cts.CTS import Process +from pacemaker._cts.patterns import PatternSelector + +class crm_corosync(ClusterManager): + ''' + Corosync version 2 cluster manager class + ''' + def __init__(self, name=None): + if not name: name="crm-corosync" + ClusterManager.__init__(self) + + self.fullcomplist = {} + self.templates = PatternSelector(self.name) + + def Components(self): + complist = [] + if not len(list(self.fullcomplist.keys())): + for c in [ "pacemaker-based", "pacemaker-controld", "pacemaker-attrd", "pacemaker-execd", "pacemaker-fenced" ]: + self.fullcomplist[c] = Process( + self, c, + pats = self.templates.get_component(c), + badnews_ignore = self.templates.get_component("%s-ignore" % c) + + self.templates.get_component("common-ignore")) + + # the scheduler uses dc_pats instead of pats + self.fullcomplist["pacemaker-schedulerd"] = Process( + self, "pacemaker-schedulerd", + dc_pats = self.templates.get_component("pacemaker-schedulerd"), + badnews_ignore = self.templates.get_component("pacemaker-schedulerd-ignore") + + self.templates.get_component("common-ignore")) + + # add (or replace) extra components + self.fullcomplist["corosync"] = Process( + self, "corosync", + pats = self.templates.get_component("corosync"), + badnews_ignore = self.templates.get_component("corosync-ignore") + + self.templates.get_component("common-ignore") + ) + + # Processes running under valgrind can't be shot with "killall -9 processname", + # so don't include them in the returned list + vgrind = self.Env["valgrind-procs"].split() + for key in list(self.fullcomplist.keys()): + if self.Env["valgrind-tests"]: + if key in vgrind: + self.log("Filtering %s from the component list as it is being profiled by valgrind" % key) + continue + if key == "pacemaker-fenced" and not self.Env["DoFencing"]: + continue + complist.append(self.fullcomplist[key]) + + return complist diff --git a/cts/lab/CTSaudits.py b/cts/lab/CTSaudits.py new file mode 100755 index 0000000..51a04f8 --- /dev/null +++ b/cts/lab/CTSaudits.py @@ -0,0 +1,879 @@ +""" Auditing classes for Pacemaker's Cluster Test Suite (CTS) +""" + +__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" +__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" + +import time, re, uuid + +from pacemaker.buildoptions import BuildOptions +from pacemaker._cts.watcher import LogKind, LogWatcher + +class ClusterAudit(object): + + def __init__(self, cm): + self.CM = cm + + def __call__(self): + raise ValueError("Abstract Class member (__call__)") + + def is_applicable(self): + '''Return TRUE if we are applicable in the current test configuration''' + raise ValueError("Abstract Class member (is_applicable)") + return 1 + + def log(self, args): + self.CM.log("audit: %s" % args) + + def debug(self, args): + self.CM.debug("audit: %s" % args) + + def name(self): + raise ValueError("Abstract Class member (name)") + +AllAuditClasses = [ ] + + +class LogAudit(ClusterAudit): + + def name(self): + return "LogAudit" + + def __init__(self, cm): + self.CM = cm + + def RestartClusterLogging(self, nodes=None): + if not nodes: + nodes = self.CM.Env["nodes"] + + self.CM.debug("Restarting logging on: %s" % repr(nodes)) + + for node in nodes: + if self.CM.Env["have_systemd"]: + (rc, _) = self.CM.rsh(node, "systemctl stop systemd-journald.socket") + if rc != 0: + self.CM.log ("ERROR: Cannot stop 'systemd-journald' on %s" % node) + + (rc, _) = self.CM.rsh(node, "systemctl start systemd-journald.service") + if rc != 0: + self.CM.log ("ERROR: Cannot start 'systemd-journald' on %s" % node) + + (rc, _) = self.CM.rsh(node, "service %s restart" % self.CM.Env["syslogd"]) + if rc != 0: + self.CM.log ("ERROR: Cannot restart '%s' on %s" % (self.CM.Env["syslogd"], node)) + + def _create_watcher(self, patterns, kind): + watch = LogWatcher(self.CM.Env["LogFileName"], patterns, + self.CM.Env["nodes"], kind, "LogAudit", 5, + silent=True) + watch.set_watch() + return watch + + def TestLogging(self): + patterns = [] + prefix = "Test message from" + suffix = str(uuid.uuid4()) + watch = {} + + for node in self.CM.Env["nodes"]: + # Look for the node name in two places to make sure + # that syslog is logging with the correct hostname + m = re.search("^([^.]+).*", node) + if m: + simple = m.group(1) + else: + simple = node + patterns.append("%s.*%s %s %s" % (simple, prefix, node, suffix)) + + watch_pref = self.CM.Env["LogWatcher"] + if watch_pref == LogKind.ANY: + kinds = [ LogKind.FILE ] + if self.CM.Env["have_systemd"]: + kinds += [ LogKind.JOURNAL ] + kinds += [ LogKind.REMOTE_FILE ] + for k in kinds: + watch[k] = self._create_watcher(patterns, k) + self.CM.log("Logging test message with identifier %s" % (suffix)) + else: + watch[watch_pref] = self._create_watcher(patterns, watch_pref) + + for node in self.CM.Env["nodes"]: + cmd = "logger -p %s.info %s %s %s" % (self.CM.Env["SyslogFacility"], prefix, node, suffix) + + (rc, _) = self.CM.rsh(node, cmd, synchronous=False, verbose=0) + if rc != 0: + self.CM.log ("ERROR: Cannot execute remote command [%s] on %s" % (cmd, node)) + + for k in list(watch.keys()): + w = watch[k] + if watch_pref == LogKind.ANY: + self.CM.log("Checking for test message in %s logs" % (k)) + w.look_for_all(silent=True) + if w.unmatched: + for regex in w.unmatched: + self.CM.log("Test message [%s] not found in %s logs" % (regex, w.kind)) + else: + if watch_pref == LogKind.ANY: + self.CM.log("Found test message in %s logs" % (k)) + self.CM.Env["LogWatcher"] = k + return 1 + + return 0 + + def __call__(self): + max = 3 + attempt = 0 + + self.CM.ns.wait_for_all_nodes(self.CM.Env["nodes"]) + while attempt <= max and self.TestLogging() == 0: + attempt = attempt + 1 + self.RestartClusterLogging() + time.sleep(60*attempt) + + if attempt > max: + self.CM.log("ERROR: Cluster logging unrecoverable.") + return 0 + + return 1 + + def is_applicable(self): + if self.CM.Env["DoBSC"]: + return 0 + if self.CM.Env["LogAuditDisabled"]: + return 0 + return 1 + + +class DiskAudit(ClusterAudit): + + def name(self): + return "DiskspaceAudit" + + def __init__(self, cm): + self.CM = cm + + def __call__(self): + result = 1 + # @TODO Use directory of PCMK_logfile if set on host + dfcmd = "df -BM " + BuildOptions.LOG_DIR + " | tail -1 | awk '{print $(NF-1)\" \"$(NF-2)}' | tr -d 'M%'" + + self.CM.ns.wait_for_all_nodes(self.CM.Env["nodes"]) + for node in self.CM.Env["nodes"]: + (_, dfout) = self.CM.rsh(node, dfcmd, verbose=1) + if not dfout: + self.CM.log ("ERROR: Cannot execute remote df command [%s] on %s" % (dfcmd, node)) + else: + dfout = dfout[0].strip() + + try: + (used, remain) = dfout.split() + used_percent = int(used) + remaining_mb = int(remain) + except (ValueError, TypeError): + self.CM.log("Warning: df output '%s' from %s was invalid [%s, %s]" + % (dfout, node, used, remain)) + else: + if remaining_mb < 10 or used_percent > 95: + self.CM.log("CRIT: Out of log disk space on %s (%d%% / %dMB)" + % (node, used_percent, remaining_mb)) + result = None + if self.CM.Env["continue"]: + answer = "Y" + else: + try: + answer = input('Continue? [nY]') + except EOFError as e: + answer = "n" + + if answer and answer == "n": + raise ValueError("Disk full on %s" % (node)) + + elif remaining_mb < 100 or used_percent > 90: + self.CM.log("WARN: Low on log disk space (%dMB) on %s" % (remaining_mb, node)) + return result + + def is_applicable(self): + if self.CM.Env["DoBSC"]: + return 0 + return 1 + + +class FileAudit(ClusterAudit): + + def name(self): + return "FileAudit" + + def __init__(self, cm): + self.CM = cm + self.known = [] + + def __call__(self): + result = 1 + + self.CM.ns.wait_for_all_nodes(self.CM.Env["nodes"]) + for node in self.CM.Env["nodes"]: + + (_, lsout) = self.CM.rsh(node, "ls -al /var/lib/pacemaker/cores/* | grep core.[0-9]", verbose=1) + for line in lsout: + line = line.strip() + if line not in self.known: + result = 0 + self.known.append(line) + self.CM.log("Warning: Pacemaker core file on %s: %s" % (node, line)) + + (_, lsout) = self.CM.rsh(node, "ls -al /var/lib/corosync | grep core.[0-9]", verbose=1) + for line in lsout: + line = line.strip() + if line not in self.known: + result = 0 + self.known.append(line) + self.CM.log("Warning: Corosync core file on %s: %s" % (node, line)) + + if node in self.CM.ShouldBeStatus and self.CM.ShouldBeStatus[node] == "down": + clean = 0 + (_, lsout) = self.CM.rsh(node, "ls -al /dev/shm | grep qb-", verbose=1) + for line in lsout: + result = 0 + clean = 1 + self.CM.log("Warning: Stale IPC file on %s: %s" % (node, line)) + + if clean: + (_, lsout) = self.CM.rsh(node, "ps axf | grep -e pacemaker -e corosync", verbose=1) + for line in lsout: + self.CM.debug("ps[%s]: %s" % (node, line)) + + self.CM.rsh(node, "rm -rf /dev/shm/qb-*") + + else: + self.CM.debug("Skipping %s" % node) + + return result + + def is_applicable(self): + return 1 + + +class AuditResource(object): + def __init__(self, cm, line): + fields = line.split() + self.CM = cm + self.line = line + self.type = fields[1] + self.id = fields[2] + self.clone_id = fields[3] + self.parent = fields[4] + self.rprovider = fields[5] + self.rclass = fields[6] + self.rtype = fields[7] + self.host = fields[8] + self.needs_quorum = fields[9] + self.flags = int(fields[10]) + self.flags_s = fields[11] + + if self.parent == "NA": + self.parent = None + + def unique(self): + if self.flags & int("0x00000020", 16): + return 1 + return 0 + + def orphan(self): + if self.flags & int("0x00000001", 16): + return 1 + return 0 + + def managed(self): + if self.flags & int("0x00000002", 16): + return 1 + return 0 + + +class AuditConstraint(object): + def __init__(self, cm, line): + fields = line.split() + self.CM = cm + self.line = line + self.type = fields[1] + self.id = fields[2] + self.rsc = fields[3] + self.target = fields[4] + self.score = fields[5] + self.rsc_role = fields[6] + self.target_role = fields[7] + + if self.rsc_role == "NA": + self.rsc_role = None + if self.target_role == "NA": + self.target_role = None + + +class PrimitiveAudit(ClusterAudit): + def name(self): + return "PrimitiveAudit" + + def __init__(self, cm): + self.CM = cm + + def doResourceAudit(self, resource, quorum): + rc = 1 + active = self.CM.ResourceLocation(resource.id) + + if len(active) == 1: + if quorum: + self.debug("Resource %s active on %s" % (resource.id, repr(active))) + + elif resource.needs_quorum == 1: + self.CM.log("Resource %s active without quorum: %s" + % (resource.id, repr(active))) + rc = 0 + + elif not resource.managed(): + self.CM.log("Resource %s not managed. Active on %s" + % (resource.id, repr(active))) + + elif not resource.unique(): + # TODO: Figure out a clever way to actually audit these resource types + if len(active) > 1: + self.debug("Non-unique resource %s is active on: %s" + % (resource.id, repr(active))) + else: + self.debug("Non-unique resource %s is not active" % resource.id) + + elif len(active) > 1: + self.CM.log("Resource %s is active multiple times: %s" + % (resource.id, repr(active))) + rc = 0 + + elif resource.orphan(): + self.debug("Resource %s is an inactive orphan" % resource.id) + + elif len(self.inactive_nodes) == 0: + self.CM.log("WARN: Resource %s not served anywhere" % resource.id) + rc = 0 + + elif self.CM.Env["warn-inactive"]: + if quorum or not resource.needs_quorum: + self.CM.log("WARN: Resource %s not served anywhere (Inactive nodes: %s)" + % (resource.id, repr(self.inactive_nodes))) + else: + self.debug("Resource %s not served anywhere (Inactive nodes: %s)" + % (resource.id, repr(self.inactive_nodes))) + + elif quorum or not resource.needs_quorum: + self.debug("Resource %s not served anywhere (Inactive nodes: %s)" + % (resource.id, repr(self.inactive_nodes))) + + return rc + + def setup(self): + self.target = None + self.resources = [] + self.constraints = [] + self.active_nodes = [] + self.inactive_nodes = [] + + for node in self.CM.Env["nodes"]: + if self.CM.ShouldBeStatus[node] == "up": + self.active_nodes.append(node) + else: + self.inactive_nodes.append(node) + + for node in self.CM.Env["nodes"]: + if self.target == None and self.CM.ShouldBeStatus[node] == "up": + self.target = node + + if not self.target: + # TODO: In Pacemaker 1.0 clusters we'll be able to run crm_resource + # with CIB_file=/path/to/cib.xml even when the cluster isn't running + self.debug("No nodes active - skipping %s" % self.name()) + return 0 + + (_, lines) = self.CM.rsh(self.target, "crm_resource -c", verbose=1) + + for line in lines: + if re.search("^Resource", line): + self.resources.append(AuditResource(self.CM, line)) + elif re.search("^Constraint", line): + self.constraints.append(AuditConstraint(self.CM, line)) + else: + self.CM.log("Unknown entry: %s" % line); + + return 1 + + def __call__(self): + rc = 1 + + if not self.setup(): + return 1 + + quorum = self.CM.HasQuorum(None) + for resource in self.resources: + if resource.type == "primitive": + if self.doResourceAudit(resource, quorum) == 0: + rc = 0 + return rc + + def is_applicable(self): + # @TODO Due to long-ago refactoring, this name test would never match, + # so this audit (and those derived from it) would never run. + # Uncommenting the next lines fixes the name test, but that then + # exposes pre-existing bugs that need to be fixed. + #if self.CM["Name"] == "crm-corosync": + # return 1 + return 0 + + +class GroupAudit(PrimitiveAudit): + def name(self): + return "GroupAudit" + + def __call__(self): + rc = 1 + if not self.setup(): + return 1 + + for group in self.resources: + if group.type == "group": + first_match = 1 + group_location = None + for child in self.resources: + if child.parent == group.id: + nodes = self.CM.ResourceLocation(child.id) + + if first_match and len(nodes) > 0: + group_location = nodes[0] + + first_match = 0 + + if len(nodes) > 1: + rc = 0 + self.CM.log("Child %s of %s is active more than once: %s" + % (child.id, group.id, repr(nodes))) + + elif len(nodes) == 0: + # Groups are allowed to be partially active + # However we do need to make sure later children aren't running + group_location = None + self.debug("Child %s of %s is stopped" % (child.id, group.id)) + + elif nodes[0] != group_location: + rc = 0 + self.CM.log("Child %s of %s is active on the wrong node (%s) expected %s" + % (child.id, group.id, nodes[0], group_location)) + else: + self.debug("Child %s of %s is active on %s" % (child.id, group.id, nodes[0])) + + return rc + + +class CloneAudit(PrimitiveAudit): + def name(self): + return "CloneAudit" + + def __call__(self): + rc = 1 + if not self.setup(): + return 1 + + for clone in self.resources: + if clone.type == "clone": + for child in self.resources: + if child.parent == clone.id and child.type == "primitive": + self.debug("Checking child %s of %s..." % (child.id, clone.id)) + # Check max and node_max + # Obtain with: + # crm_resource -g clone_max --meta -r child.id + # crm_resource -g clone_node_max --meta -r child.id + + return rc + + +class ColocationAudit(PrimitiveAudit): + def name(self): + return "ColocationAudit" + + def crm_location(self, resource): + (rc, lines) = self.CM.rsh(self.target, "crm_resource -W -r %s -Q"%resource, verbose=1) + hosts = [] + if rc == 0: + for line in lines: + fields = line.split() + hosts.append(fields[0]) + + return hosts + + def __call__(self): + rc = 1 + if not self.setup(): + return 1 + + for coloc in self.constraints: + if coloc.type == "rsc_colocation": + source = self.crm_location(coloc.rsc) + target = self.crm_location(coloc.target) + if len(source) == 0: + self.debug("Colocation audit (%s): %s not running" % (coloc.id, coloc.rsc)) + else: + for node in source: + if not node in target: + rc = 0 + self.CM.log("Colocation audit (%s): %s running on %s (not in %s)" + % (coloc.id, coloc.rsc, node, repr(target))) + else: + self.debug("Colocation audit (%s): %s running on %s (in %s)" + % (coloc.id, coloc.rsc, node, repr(target))) + + return rc + + +class ControllerStateAudit(ClusterAudit): + def __init__(self, cm): + self.CM = cm + self.Stats = {"calls":0 + , "success":0 + , "failure":0 + , "skipped":0 + , "auditfail":0} + + def has_key(self, key): + return key in self.Stats + + def __setitem__(self, key, value): + self.Stats[key] = value + + def __getitem__(self, key): + return self.Stats[key] + + def incr(self, name): + '''Increment (or initialize) the value associated with the given name''' + if not name in self.Stats: + self.Stats[name] = 0 + self.Stats[name] = self.Stats[name]+1 + + def __call__(self): + passed = 1 + up_are_down = 0 + down_are_up = 0 + unstable_list = [] + + for node in self.CM.Env["nodes"]: + should_be = self.CM.ShouldBeStatus[node] + rc = self.CM.test_node_CM(node) + if rc > 0: + if should_be == "down": + down_are_up = down_are_up + 1 + if rc == 1: + unstable_list.append(node) + elif should_be == "up": + up_are_down = up_are_down + 1 + + if len(unstable_list) > 0: + passed = 0 + self.CM.log("Cluster is not stable: %d (of %d): %s" + % (len(unstable_list), self.CM.upcount(), repr(unstable_list))) + + if up_are_down > 0: + passed = 0 + self.CM.log("%d (of %d) nodes expected to be up were down." + % (up_are_down, len(self.CM.Env["nodes"]))) + + if down_are_up > 0: + passed = 0 + self.CM.log("%d (of %d) nodes expected to be down were up." + % (down_are_up, len(self.CM.Env["nodes"]))) + + return passed + + def name(self): + return "ControllerStateAudit" + + def is_applicable(self): + # @TODO Due to long-ago refactoring, this name test would never match, + # so this audit (and those derived from it) would never run. + # Uncommenting the next lines fixes the name test, but that then + # exposes pre-existing bugs that need to be fixed. + #if self.CM["Name"] == "crm-corosync": + # return 1 + return 0 + + +class CIBAudit(ClusterAudit): + def __init__(self, cm): + self.CM = cm + self.Stats = {"calls":0 + , "success":0 + , "failure":0 + , "skipped":0 + , "auditfail":0} + + def has_key(self, key): + return key in self.Stats + + def __setitem__(self, key, value): + self.Stats[key] = value + + def __getitem__(self, key): + return self.Stats[key] + + def incr(self, name): + '''Increment (or initialize) the value associated with the given name''' + if not name in self.Stats: + self.Stats[name] = 0 + self.Stats[name] = self.Stats[name]+1 + + def __call__(self): + passed = 1 + ccm_partitions = self.CM.find_partitions() + + if len(ccm_partitions) == 0: + self.debug("\tNo partitions to audit") + return 1 + + for partition in ccm_partitions: + self.debug("\tAuditing CIB consistency for: %s" % partition) + partition_passed = 0 + if self.audit_cib_contents(partition) == 0: + passed = 0 + + return passed + + def audit_cib_contents(self, hostlist): + passed = 1 + node0 = None + node0_xml = None + + partition_hosts = hostlist.split() + for node in partition_hosts: + node_xml = self.store_remote_cib(node, node0) + + if node_xml == None: + self.CM.log("Could not perform audit: No configuration from %s" % node) + passed = 0 + + elif node0 == None: + node0 = node + node0_xml = node_xml + + elif node0_xml == None: + self.CM.log("Could not perform audit: No configuration from %s" % node0) + passed = 0 + + else: + (rc, result) = self.CM.rsh( + node0, "crm_diff -VV -cf --new %s --original %s" % (node_xml, node0_xml), verbose=1) + + if rc != 0: + self.CM.log("Diff between %s and %s failed: %d" % (node0_xml, node_xml, rc)) + passed = 0 + + for line in result: + if not re.search("<diff/>", line): + passed = 0 + self.debug("CibDiff[%s-%s]: %s" % (node0, node, line)) + else: + self.debug("CibDiff[%s-%s] Ignoring: %s" % (node0, node, line)) + +# self.CM.rsh(node0, "rm -f %s" % node_xml) +# self.CM.rsh(node0, "rm -f %s" % node0_xml) + return passed + + def store_remote_cib(self, node, target): + combined = "" + filename = "/tmp/ctsaudit.%s.xml" % node + + if not target: + target = node + + (rc, lines) = self.CM.rsh(node, self.CM["CibQuery"], verbose=1) + if rc != 0: + self.CM.log("Could not retrieve configuration") + return None + + self.CM.rsh("localhost", "rm -f %s" % filename) + for line in lines: + self.CM.rsh("localhost", "echo \'%s\' >> %s" % (line[:-1], filename), verbose=0) + + if self.CM.rsh.copy(filename, "root@%s:%s" % (target, filename), silent=True) != 0: + self.CM.log("Could not store configuration") + return None + return filename + + def name(self): + return "CibAudit" + + def is_applicable(self): + # @TODO Due to long-ago refactoring, this name test would never match, + # so this audit (and those derived from it) would never run. + # Uncommenting the next lines fixes the name test, but that then + # exposes pre-existing bugs that need to be fixed. + #if self.CM["Name"] == "crm-corosync": + # return 1 + return 0 + + +class PartitionAudit(ClusterAudit): + def __init__(self, cm): + self.CM = cm + self.Stats = {"calls":0 + , "success":0 + , "failure":0 + , "skipped":0 + , "auditfail":0} + self.NodeEpoch = {} + self.NodeState = {} + self.NodeQuorum = {} + + def has_key(self, key): + return key in self.Stats + + def __setitem__(self, key, value): + self.Stats[key] = value + + def __getitem__(self, key): + return self.Stats[key] + + def incr(self, name): + '''Increment (or initialize) the value associated with the given name''' + if not name in self.Stats: + self.Stats[name] = 0 + self.Stats[name] = self.Stats[name]+1 + + def __call__(self): + passed = 1 + ccm_partitions = self.CM.find_partitions() + + if ccm_partitions == None or len(ccm_partitions) == 0: + return 1 + + self.CM.cluster_stable(double_check=True) + + if len(ccm_partitions) != self.CM.partitions_expected: + self.CM.log("ERROR: %d cluster partitions detected:" % len(ccm_partitions)) + passed = 0 + for partition in ccm_partitions: + self.CM.log("\t %s" % partition) + + for partition in ccm_partitions: + partition_passed = 0 + if self.audit_partition(partition) == 0: + passed = 0 + + return passed + + def trim_string(self, avalue): + if not avalue: + return None + if len(avalue) > 1: + return avalue[:-1] + + def trim2int(self, avalue): + if not avalue: + return None + if len(avalue) > 1: + return int(avalue[:-1]) + + def audit_partition(self, partition): + passed = 1 + dc_found = [] + dc_allowed_list = [] + lowest_epoch = None + node_list = partition.split() + + self.debug("Auditing partition: %s" % (partition)) + for node in node_list: + if self.CM.ShouldBeStatus[node] != "up": + self.CM.log("Warn: Node %s appeared out of nowhere" % (node)) + self.CM.ShouldBeStatus[node] = "up" + # not in itself a reason to fail the audit (not what we're + # checking for in this audit) + + (_, out) = self.CM.rsh(node, self.CM["StatusCmd"] % node, verbose=1) + self.NodeState[node] = out[0].strip() + + (_, out) = self.CM.rsh(node, self.CM["EpochCmd"], verbose=1) + self.NodeEpoch[node] = out[0].strip() + + (_, out) = self.CM.rsh(node, self.CM["QuorumCmd"], verbose=1) + self.NodeQuorum[node] = out[0].strip() + + self.debug("Node %s: %s - %s - %s." % (node, self.NodeState[node], self.NodeEpoch[node], self.NodeQuorum[node])) + self.NodeState[node] = self.trim_string(self.NodeState[node]) + self.NodeEpoch[node] = self.trim2int(self.NodeEpoch[node]) + self.NodeQuorum[node] = self.trim_string(self.NodeQuorum[node]) + + if not self.NodeEpoch[node]: + self.CM.log("Warn: Node %s dissappeared: cant determin epoch" % (node)) + self.CM.ShouldBeStatus[node] = "down" + # not in itself a reason to fail the audit (not what we're + # checking for in this audit) + elif lowest_epoch == None or self.NodeEpoch[node] < lowest_epoch: + lowest_epoch = self.NodeEpoch[node] + + if not lowest_epoch: + self.CM.log("Lowest epoch not determined in %s" % (partition)) + passed = 0 + + for node in node_list: + if self.CM.ShouldBeStatus[node] == "up": + if self.CM.is_node_dc(node, self.NodeState[node]): + dc_found.append(node) + if self.NodeEpoch[node] == lowest_epoch: + self.debug("%s: OK" % node) + elif not self.NodeEpoch[node]: + self.debug("Check on %s ignored: no node epoch" % node) + elif not lowest_epoch: + self.debug("Check on %s ignored: no lowest epoch" % node) + else: + self.CM.log("DC %s is not the oldest node (%d vs. %d)" + % (node, self.NodeEpoch[node], lowest_epoch)) + passed = 0 + + if len(dc_found) == 0: + self.CM.log("DC not found on any of the %d allowed nodes: %s (of %s)" + % (len(dc_allowed_list), str(dc_allowed_list), str(node_list))) + + elif len(dc_found) > 1: + self.CM.log("%d DCs (%s) found in cluster partition: %s" + % (len(dc_found), str(dc_found), str(node_list))) + passed = 0 + + if passed == 0: + for node in node_list: + if self.CM.ShouldBeStatus[node] == "up": + self.CM.log("epoch %s : %s" + % (self.NodeEpoch[node], self.NodeState[node])) + + return passed + + def name(self): + return "PartitionAudit" + + def is_applicable(self): + # @TODO Due to long-ago refactoring, this name test would never match, + # so this audit (and those derived from it) would never run. + # Uncommenting the next lines fixes the name test, but that then + # exposes pre-existing bugs that need to be fixed. + #if self.CM["Name"] == "crm-corosync": + # return 1 + return 0 + +AllAuditClasses.append(DiskAudit) +AllAuditClasses.append(FileAudit) +AllAuditClasses.append(LogAudit) +AllAuditClasses.append(ControllerStateAudit) +AllAuditClasses.append(PartitionAudit) +AllAuditClasses.append(PrimitiveAudit) +AllAuditClasses.append(GroupAudit) +AllAuditClasses.append(CloneAudit) +AllAuditClasses.append(ColocationAudit) +AllAuditClasses.append(CIBAudit) + + +def AuditList(cm): + result = [] + for auditclass in AllAuditClasses: + a = auditclass(cm) + if a.is_applicable(): + result.append(a) + return result diff --git a/cts/lab/CTSlab.py.in b/cts/lab/CTSlab.py.in new file mode 100644 index 0000000..bd990fd --- /dev/null +++ b/cts/lab/CTSlab.py.in @@ -0,0 +1,135 @@ +#!@PYTHON@ +""" Command-line interface to Pacemaker's Cluster Test Suite (CTS) +""" + +__copyright__ = "Copyright 2001-2023 the Pacemaker project contributors" +__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" + +import sys, signal, os + +pdir = os.path.dirname(sys.path[0]) +sys.path.insert(0, pdir) # So that things work from the source directory + +try: + from cts.CM_corosync import * + from cts.CTSaudits import AuditList + from cts.CTStests import TestList + from cts.CTSscenarios import * + + from pacemaker._cts.CTS import CtsLab + from pacemaker._cts.logging import LogFactory +except ImportError as e: + sys.stderr.write("abort: %s\n" % e) + sys.stderr.write("check your install and PYTHONPATH; couldn't find cts libraries in:\n%s\n" % + ' '.join(sys.path)) + sys.exit(1) + +# These are globals so they can be used by the signal handler. +scenario = None +LogFactory().add_stderr() + + +def sig_handler(signum, frame) : + LogFactory().log("Interrupted by signal %d"%signum) + if scenario: scenario.summarize() + if signum == 15 : + if scenario: scenario.TearDown() + sys.exit(1) + + +def plural_s(n, uppercase=False): + if n == 1: + return "" + elif uppercase: + return "S" + else: + return "s" + + +if __name__ == '__main__': + + Environment = CtsLab(sys.argv[1:]) + NumIter = Environment["iterations"] + Tests = [] + + # Set the signal handler + signal.signal(15, sig_handler) + signal.signal(10, sig_handler) + + # Create the Cluster Manager object + cm = None + if Environment["Stack"] == "corosync 2+": + cm = crm_corosync() + + else: + LogFactory().log("Unknown stack: "+Environment["stack"]) + sys.exit(1) + + if Environment["TruncateLog"]: + if Environment["OutputFile"] is None: + LogFactory().log("Ignoring truncate request because no output file specified") + else: + LogFactory().log("Truncating %s" % Environment["OutputFile"]) + with open(Environment["OutputFile"], "w") as outputfile: + outputfile.truncate(0) + + Audits = AuditList(cm) + + if Environment["ListTests"]: + Tests = TestList(cm, Audits) + LogFactory().log("Total %d tests"%len(Tests)) + for test in Tests : + LogFactory().log(str(test.name)); + sys.exit(0) + + elif len(Environment["tests"]) == 0: + Tests = TestList(cm, Audits) + + else: + Chosen = Environment["tests"] + for TestCase in Chosen: + match = None + + for test in TestList(cm, Audits): + if test.name == TestCase: + match = test + + if not match: + LogFactory().log("--choose: No applicable/valid tests chosen") + sys.exit(1) + else: + Tests.append(match) + + # Scenario selection + if Environment["scenario"] == "basic-sanity": + scenario = RandomTests(cm, [ BasicSanityCheck(Environment) ], Audits, Tests) + + elif Environment["scenario"] == "all-once": + NumIter = len(Tests) + scenario = AllOnce( + cm, [ BootCluster(Environment) ], Audits, Tests) + elif Environment["scenario"] == "sequence": + scenario = Sequence( + cm, [ BootCluster(Environment) ], Audits, Tests) + elif Environment["scenario"] == "boot": + scenario = Boot(cm, [ LeaveBooted(Environment)], Audits, []) + else: + scenario = RandomTests( + cm, [ BootCluster(Environment) ], Audits, Tests) + + LogFactory().log(">>>>>>>>>>>>>>>> BEGINNING " + repr(NumIter) + " TEST" + plural_s(NumIter, True) + " ") + LogFactory().log("Stack: %s (%s)" % (Environment["Stack"], Environment["Name"])) + LogFactory().log("Schema: %s" % Environment["Schema"]) + LogFactory().log("Scenario: %s" % scenario.__doc__) + LogFactory().log("CTS Exerciser: %s" % Environment["cts-exerciser"]) + LogFactory().log("CTS Logfile: %s" % Environment["OutputFile"]) + LogFactory().log("Random Seed: %s" % Environment["RandSeed"]) + LogFactory().log("Syslog variant: %s" % Environment["syslogd"].strip()) + LogFactory().log("System log files: %s" % Environment["LogFileName"]) + if Environment.has_key("IPBase"): + LogFactory().log("Base IP for resources: %s" % Environment["IPBase"]) + LogFactory().log("Cluster starts at boot: %d" % Environment["at-boot"]) + + Environment.dump() + rc = Environment.run(scenario, NumIter) + sys.exit(rc) diff --git a/cts/lab/CTSscenarios.py b/cts/lab/CTSscenarios.py new file mode 100644 index 0000000..37cb094 --- /dev/null +++ b/cts/lab/CTSscenarios.py @@ -0,0 +1,563 @@ +""" Test scenario classes for Pacemaker's Cluster Test Suite (CTS) +""" + +__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" +__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" + +import os +import re +import sys +import time + +from cts.CTStests import CTSTest +from cts.CTSaudits import ClusterAudit + +from pacemaker._cts.watcher import LogWatcher + +class ScenarioComponent(object): + + def __init__(self, Env): + self.Env = Env + + def IsApplicable(self): + '''Return True if the current ScenarioComponent is applicable + in the given LabEnvironment given to the constructor. + ''' + + raise ValueError("Abstract Class member (IsApplicable)") + + def SetUp(self, CM): + '''Set up the given ScenarioComponent''' + raise ValueError("Abstract Class member (Setup)") + + def TearDown(self, CM): + '''Tear down (undo) the given ScenarioComponent''' + raise ValueError("Abstract Class member (Setup)") + + +class Scenario(object): + ( +'''The basic idea of a scenario is that of an ordered list of +ScenarioComponent objects. Each ScenarioComponent is SetUp() in turn, +and then after the tests have been run, they are torn down using TearDown() +(in reverse order). + +A Scenario is applicable to a particular cluster manager iff each +ScenarioComponent is applicable. + +A partially set up scenario is torn down if it fails during setup. +''') + + def __init__(self, ClusterManager, Components, Audits, Tests): + + "Initialize the Scenario from the list of ScenarioComponents" + + self.ClusterManager = ClusterManager + self.Components = Components + self.Audits = Audits + self.Tests = Tests + + self.BadNews = None + self.TestSets = [] + self.Stats = {"success":0, "failure":0, "BadNews":0, "skipped":0} + self.Sets = [] + + #self.ns=CTS.NodeStatus(self.Env) + + for comp in Components: + if not issubclass(comp.__class__, ScenarioComponent): + raise ValueError("Init value must be subclass of ScenarioComponent") + + for audit in Audits: + if not issubclass(audit.__class__, ClusterAudit): + raise ValueError("Init value must be subclass of ClusterAudit") + + for test in Tests: + if not issubclass(test.__class__, CTSTest): + raise ValueError("Init value must be a subclass of CTSTest") + + def IsApplicable(self): + ( +'''A Scenario IsApplicable() iff each of its ScenarioComponents IsApplicable() +''' + ) + + for comp in self.Components: + if not comp.IsApplicable(): + return None + return True + + def SetUp(self): + '''Set up the Scenario. Return TRUE on success.''' + + self.ClusterManager.prepare() + self.audit() # Also detects remote/local log config + self.ClusterManager.ns.wait_for_all_nodes(self.ClusterManager.Env["nodes"]) + + self.audit() + self.ClusterManager.install_support() + + self.BadNews = LogWatcher(self.ClusterManager.Env["LogFileName"], + self.ClusterManager.templates.get_patterns("BadNews"), + self.ClusterManager.Env["nodes"], + self.ClusterManager.Env["LogWatcher"], + "BadNews", 0) + self.BadNews.set_watch() # Call after we've figured out what type of log watching to do in LogAudit + + j = 0 + while j < len(self.Components): + if not self.Components[j].SetUp(self.ClusterManager): + # OOPS! We failed. Tear partial setups down. + self.audit() + self.ClusterManager.log("Tearing down partial setup") + self.TearDown(j) + return None + j = j + 1 + + self.audit() + return 1 + + def TearDown(self, max=None): + + '''Tear Down the Scenario - in reverse order.''' + + if max == None: + max = len(self.Components)-1 + j = max + while j >= 0: + self.Components[j].TearDown(self.ClusterManager) + j = j - 1 + + self.audit() + self.ClusterManager.install_support("uninstall") + + def incr(self, name): + '''Increment (or initialize) the value associated with the given name''' + if not name in self.Stats: + self.Stats[name] = 0 + self.Stats[name] = self.Stats[name]+1 + + def run(self, Iterations): + self.ClusterManager.oprofileStart() + try: + self.run_loop(Iterations) + self.ClusterManager.oprofileStop() + except: + self.ClusterManager.oprofileStop() + raise + + def run_loop(self, Iterations): + raise ValueError("Abstract Class member (run_loop)") + + def run_test(self, test, testcount): + nodechoice = self.ClusterManager.Env.random_node() + + ret = 1 + where = "" + did_run = 0 + + self.ClusterManager.instance_errorstoignore_clear() + self.ClusterManager.log(("Running test %s" % test.name).ljust(35) + (" (%s) " % nodechoice).ljust(15) + "[" + ("%d" % testcount).rjust(3) + "]") + + starttime = test.set_timer() + if not test.setup(nodechoice): + self.ClusterManager.log("Setup failed") + ret = 0 + + elif not test.canrunnow(nodechoice): + self.ClusterManager.log("Skipped") + test.skipped() + + else: + did_run = 1 + ret = test(nodechoice) + + if not test.teardown(nodechoice): + self.ClusterManager.log("Teardown failed") + if self.ClusterManager.Env["continue"]: + answer = "Y" + else: + try: + answer = input('Continue? [nY]') + except EOFError as e: + answer = "n" + if answer and answer == "n": + raise ValueError("Teardown of %s on %s failed" % (test.name, nodechoice)) + ret = 0 + + stoptime = time.time() + self.ClusterManager.oprofileSave(testcount) + + elapsed_time = stoptime - starttime + test_time = stoptime - test.get_timer() + if not test["min_time"]: + test["elapsed_time"] = elapsed_time + test["min_time"] = test_time + test["max_time"] = test_time + else: + test["elapsed_time"] = test["elapsed_time"] + elapsed_time + if test_time < test["min_time"]: + test["min_time"] = test_time + if test_time > test["max_time"]: + test["max_time"] = test_time + + if ret: + self.incr("success") + test.log_timer() + else: + self.incr("failure") + self.ClusterManager.statall() + did_run = 1 # Force the test count to be incremented anyway so test extraction works + + self.audit(test.errorstoignore()) + return did_run + + def summarize(self): + self.ClusterManager.log("****************") + self.ClusterManager.log("Overall Results:" + repr(self.Stats)) + self.ClusterManager.log("****************") + + stat_filter = { + "calls":0, + "failure":0, + "skipped":0, + "auditfail":0, + } + self.ClusterManager.log("Test Summary") + for test in self.Tests: + for key in list(stat_filter.keys()): + stat_filter[key] = test.Stats[key] + self.ClusterManager.log(("Test %s: "%test.name).ljust(25) + " %s"%repr(stat_filter)) + + self.ClusterManager.debug("Detailed Results") + for test in self.Tests: + self.ClusterManager.debug(("Test %s: "%test.name).ljust(25) + " %s"%repr(test.Stats)) + + self.ClusterManager.log("<<<<<<<<<<<<<<<< TESTS COMPLETED") + + def audit(self, LocalIgnore=[]): + errcount = 0 + ignorelist = [] + ignorelist.append("CTS:") + ignorelist.extend(LocalIgnore) + ignorelist.extend(self.ClusterManager.errorstoignore()) + ignorelist.extend(self.ClusterManager.instance_errorstoignore()) + + # This makes sure everything is stabilized before starting... + failed = 0 + for audit in self.Audits: + if not audit(): + self.ClusterManager.log("Audit " + audit.name() + " FAILED.") + failed += 1 + else: + self.ClusterManager.debug("Audit " + audit.name() + " passed.") + + while errcount < 1000: + match = None + if self.BadNews: + match = self.BadNews.look(0) + + if match: + add_err = 1 + for ignore in ignorelist: + if add_err == 1 and re.search(ignore, match): + add_err = 0 + if add_err == 1: + self.ClusterManager.log("BadNews: " + match) + self.incr("BadNews") + errcount = errcount + 1 + else: + break + else: + if self.ClusterManager.Env["continue"]: + answer = "Y" + else: + try: + answer = input('Big problems. Continue? [nY]') + except EOFError as e: + answer = "n" + if answer and answer == "n": + self.ClusterManager.log("Shutting down.") + self.summarize() + self.TearDown() + raise ValueError("Looks like we hit a BadNews jackpot!") + + if self.BadNews: + self.BadNews.end() + return failed + + +class AllOnce(Scenario): + '''Every Test Once''' # Accessable as __doc__ + def run_loop(self, Iterations): + testcount = 1 + for test in self.Tests: + self.run_test(test, testcount) + testcount += 1 + + +class RandomTests(Scenario): + '''Random Test Execution''' + def run_loop(self, Iterations): + testcount = 1 + while testcount <= Iterations: + test = self.ClusterManager.Env.random_gen.choice(self.Tests) + self.run_test(test, testcount) + testcount += 1 + + +class BasicSanity(Scenario): + '''Basic Cluster Sanity''' + def run_loop(self, Iterations): + testcount = 1 + while testcount <= Iterations: + test = self.Environment.random_gen.choice(self.Tests) + self.run_test(test, testcount) + testcount += 1 + + +class Sequence(Scenario): + '''Named Tests in Sequence''' + def run_loop(self, Iterations): + testcount = 1 + while testcount <= Iterations: + for test in self.Tests: + self.run_test(test, testcount) + testcount += 1 + + +class Boot(Scenario): + '''Start the Cluster''' + def run_loop(self, Iterations): + testcount = 0 + + +class BootCluster(ScenarioComponent): + ( +'''BootCluster is the most basic of ScenarioComponents. +This ScenarioComponent simply starts the cluster manager on all the nodes. +It is fairly robust as it waits for all nodes to come up before starting +as they might have been rebooted or crashed for some reason beforehand. +''') + def __init__(self, Env): + pass + + def IsApplicable(self): + '''BootCluster is so generic it is always Applicable''' + return True + + def SetUp(self, CM): + '''Basic Cluster Manager startup. Start everything''' + + CM.prepare() + + # Clear out the cobwebs ;-) + CM.stopall(verbose=True, force=True) + + # Now start the Cluster Manager on all the nodes. + CM.log("Starting Cluster Manager on all nodes.") + return CM.startall(verbose=True, quick=True) + + def TearDown(self, CM, force=False): + '''Set up the given ScenarioComponent''' + + # Stop the cluster manager everywhere + + CM.log("Stopping Cluster Manager on all nodes") + return CM.stopall(verbose=True, force=force) + + +class LeaveBooted(BootCluster): + def TearDown(self, CM): + '''Set up the given ScenarioComponent''' + + # Stop the cluster manager everywhere + + CM.log("Leaving Cluster running on all nodes") + return 1 + + +class PingFest(ScenarioComponent): + ( +'''PingFest does a flood ping to each node in the cluster from the test machine. + +If the LabEnvironment Parameter PingSize is set, it will be used as the size +of ping packet requested (via the -s option). If it is not set, it defaults +to 1024 bytes. + +According to the manual page for ping: + Outputs packets as fast as they come back or one hundred times per + second, whichever is more. For every ECHO_REQUEST sent a period ``.'' + is printed, while for every ECHO_REPLY received a backspace is printed. + This provides a rapid display of how many packets are being dropped. + Only the super-user may use this option. This can be very hard on a net- + work and should be used with caution. +''' ) + + def __init__(self, Env): + self.Env = Env + + def IsApplicable(self): + '''PingFests are always applicable ;-) + ''' + + return True + + def SetUp(self, CM): + '''Start the PingFest!''' + + self.PingSize = 1024 + if "PingSize" in list(CM.Env.keys()): + self.PingSize = CM.Env["PingSize"] + + CM.log("Starting %d byte flood pings" % self.PingSize) + + self.PingPids = [] + for node in CM.Env["nodes"]: + self.PingPids.append(self._pingchild(node)) + + CM.log("Ping PIDs: " + repr(self.PingPids)) + return 1 + + def TearDown(self, CM): + '''Stop it right now! My ears are pinging!!''' + + for pid in self.PingPids: + if pid != None: + CM.log("Stopping ping process %d" % pid) + os.kill(pid, signal.SIGKILL) + + def _pingchild(self, node): + + Args = ["ping", "-qfn", "-s", str(self.PingSize), node] + + sys.stdin.flush() + sys.stdout.flush() + sys.stderr.flush() + pid = os.fork() + + if pid < 0: + self.Env.log("Cannot fork ping child") + return None + if pid > 0: + return pid + + # Otherwise, we're the child process. + + os.execvp("ping", Args) + self.Env.log("Cannot execvp ping: " + repr(Args)) + sys.exit(1) + + +class BasicSanityCheck(ScenarioComponent): + ( +''' +''') + + def IsApplicable(self): + return self.Env["DoBSC"] + + def SetUp(self, CM): + + CM.prepare() + + # Clear out the cobwebs + self.TearDown(CM) + + # Now start the Cluster Manager on all the nodes. + CM.log("Starting Cluster Manager on BSC node(s).") + return CM.startall() + + def TearDown(self, CM): + CM.log("Stopping Cluster Manager on BSC node(s).") + return CM.stopall() + + +class Benchmark(ScenarioComponent): + ( +''' +''') + + def IsApplicable(self): + return self.Env["benchmark"] + + def SetUp(self, CM): + + CM.prepare() + + # Clear out the cobwebs + self.TearDown(CM, force=True) + + # Now start the Cluster Manager on all the nodes. + CM.log("Starting Cluster Manager on all node(s).") + return CM.startall() + + def TearDown(self, CM): + CM.log("Stopping Cluster Manager on all node(s).") + return CM.stopall() + + +class RollingUpgrade(ScenarioComponent): + ( +''' +Test a rolling upgrade between two versions of the stack +''') + + def __init__(self, Env): + self.Env = Env + + def IsApplicable(self): + if not self.Env["rpm-dir"]: + return None + if not self.Env["current-version"]: + return None + if not self.Env["previous-version"]: + return None + + return True + + def install(self, node, version): + + target_dir = "/tmp/rpm-%s" % version + src_dir = "%s/%s" % (self.CM.Env["rpm-dir"], version) + + self.CM.rsh(node, "mkdir -p %s" % target_dir) + rc = self.CM.cp("%s/*.rpm %s:%s" % (src_dir, node, target_dir)) + self.CM.rsh(node, "rpm -Uvh --force %s/*.rpm" % (target_dir)) + + return self.success() + + def upgrade(self, node): + return self.install(node, self.CM.Env["current-version"]) + + def downgrade(self, node): + return self.install(node, self.CM.Env["previous-version"]) + + def SetUp(self, CM): + print(repr(self)+"prepare") + CM.prepare() + + # Clear out the cobwebs + CM.stopall(force=True) + + CM.log("Downgrading all nodes to %s." % self.Env["previous-version"]) + + for node in self.Env["nodes"]: + if not self.downgrade(node): + CM.log("Couldn't downgrade %s" % node) + return None + + return 1 + + def TearDown(self, CM): + # Stop everything + CM.log("Stopping Cluster Manager on Upgrade nodes.") + CM.stopall() + + CM.log("Upgrading all nodes to %s." % self.Env["current-version"]) + for node in self.Env["nodes"]: + if not self.upgrade(node): + CM.log("Couldn't upgrade %s" % node) + return None + + return 1 diff --git a/cts/lab/CTStests.py b/cts/lab/CTStests.py new file mode 100644 index 0000000..61766ce --- /dev/null +++ b/cts/lab/CTStests.py @@ -0,0 +1,3178 @@ +""" Test-specific classes for Pacemaker's Cluster Test Suite (CTS) +""" + +__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors" +__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" + +# +# SPECIAL NOTE: +# +# Tests may NOT implement any cluster-manager-specific code in them. +# EXTEND the ClusterManager object to provide the base capabilities +# the test needs if you need to do something that the current CM classes +# do not. Otherwise you screw up the whole point of the object structure +# in CTS. +# +# Thank you. +# + +import os +import re +import time +import subprocess +import tempfile + +from stat import * +from cts.CTSaudits import * + +from pacemaker import BuildOptions +from pacemaker._cts.CTS import NodeStatus +from pacemaker._cts.environment import EnvFactory +from pacemaker._cts.logging import LogFactory +from pacemaker._cts.patterns import PatternSelector +from pacemaker._cts.remote import RemoteFactory +from pacemaker._cts.watcher import LogWatcher + +AllTestClasses = [ ] + + +class CTSTest(object): + ''' + A Cluster test. + We implement the basic set of properties and behaviors for a generic + cluster test. + + Cluster tests track their own statistics. + We keep each of the kinds of counts we track as separate {name,value} + pairs. + ''' + + def __init__(self, cm): + #self.name="the unnamed test" + self.Stats = {"calls":0 + , "success":0 + , "failure":0 + , "skipped":0 + , "auditfail":0} + +# if not issubclass(cm.__class__, ClusterManager): +# raise ValueError("Must be a ClusterManager object") + self.CM = cm + self.Env = EnvFactory().getInstance() + self.rsh = RemoteFactory().getInstance() + self.logger = LogFactory() + self.templates = PatternSelector(cm["Name"]) + self.Audits = [] + self.timeout = 120 + self.passed = 1 + self.is_loop = 0 + self.is_unsafe = 0 + self.is_experimental = 0 + self.is_container = 0 + self.is_valgrind = 0 + self.benchmark = 0 # which tests to benchmark + self.timer = {} # timers + + def log(self, args): + self.logger.log(args) + + def debug(self, args): + self.logger.debug(args) + + def has_key(self, key): + return key in self.Stats + + def __setitem__(self, key, value): + self.Stats[key] = value + + def __getitem__(self, key): + if str(key) == "0": + raise ValueError("Bad call to 'foo in X', should reference 'foo in X.Stats' instead") + + if key in self.Stats: + return self.Stats[key] + return None + + def log_mark(self, msg): + self.debug("MARK: test %s %s %d" % (self.name,msg,time.time())) + return + + def get_timer(self,key = "test"): + try: return self.timer[key] + except: return 0 + + def set_timer(self,key = "test"): + self.timer[key] = time.time() + return self.timer[key] + + def log_timer(self,key = "test"): + elapsed = 0 + if key in self.timer: + elapsed = time.time() - self.timer[key] + s = key == "test" and self.name or "%s:%s" % (self.name,key) + self.debug("%s runtime: %.2f" % (s, elapsed)) + del self.timer[key] + return elapsed + + def incr(self, name): + '''Increment (or initialize) the value associated with the given name''' + if not name in self.Stats: + self.Stats[name] = 0 + self.Stats[name] = self.Stats[name]+1 + + # Reset the test passed boolean + if name == "calls": + self.passed = 1 + + def failure(self, reason="none"): + '''Increment the failure count''' + self.passed = 0 + self.incr("failure") + self.logger.log(("Test %s" % self.name).ljust(35) + " FAILED: %s" % reason) + return None + + def success(self): + '''Increment the success count''' + self.incr("success") + return 1 + + def skipped(self): + '''Increment the skipped count''' + self.incr("skipped") + return 1 + + def __call__(self, node): + '''Perform the given test''' + raise ValueError("Abstract Class member (__call__)") + self.incr("calls") + return self.failure() + + def audit(self): + passed = 1 + if len(self.Audits) > 0: + for audit in self.Audits: + if not audit(): + self.logger.log("Internal %s Audit %s FAILED." % (self.name, audit.name())) + self.incr("auditfail") + passed = 0 + return passed + + def setup(self, node): + '''Setup the given test''' + return self.success() + + def teardown(self, node): + '''Tear down the given test''' + return self.success() + + def create_watch(self, patterns, timeout, name=None): + if not name: + name = self.name + return LogWatcher(self.Env["LogFileName"], patterns, self.Env["nodes"], self.Env["LogWatcher"], name, timeout) + + def local_badnews(self, prefix, watch, local_ignore=[]): + errcount = 0 + if not prefix: + prefix = "LocalBadNews:" + + ignorelist = [] + ignorelist.append(" CTS: ") + ignorelist.append(prefix) + ignorelist.extend(local_ignore) + + while errcount < 100: + match = watch.look(0) + if match: + add_err = 1 + for ignore in ignorelist: + if add_err == 1 and re.search(ignore, match): + add_err = 0 + if add_err == 1: + self.logger.log(prefix + " " + match) + errcount = errcount + 1 + else: + break + else: + self.logger.log("Too many errors!") + + watch.end() + return errcount + + def is_applicable(self): + return self.is_applicable_common() + + def is_applicable_common(self): + '''Return True if we are applicable in the current test configuration''' + #raise ValueError("Abstract Class member (is_applicable)") + + if self.is_loop and not self.Env["loop-tests"]: + return False + elif self.is_unsafe and not self.Env["unsafe-tests"]: + return False + elif self.is_valgrind and not self.Env["valgrind-tests"]: + return False + elif self.is_experimental and not self.Env["experimental-tests"]: + return False + elif self.is_container and not self.Env["container-tests"]: + return False + elif self.Env["benchmark"] and self.benchmark == 0: + return False + + return True + + def find_ocfs2_resources(self, node): + self.r_o2cb = None + self.r_ocfs2 = [] + + (_, lines) = self.rsh(node, "crm_resource -c", verbose=1) + for line in lines: + if re.search("^Resource", line): + r = AuditResource(self.CM, line) + if r.rtype == "o2cb" and r.parent != "NA": + self.debug("Found o2cb: %s" % self.r_o2cb) + self.r_o2cb = r.parent + if re.search("^Constraint", line): + c = AuditConstraint(self.CM, line) + if c.type == "rsc_colocation" and c.target == self.r_o2cb: + self.r_ocfs2.append(c.rsc) + + self.debug("Found ocfs2 filesystems: %s" % repr(self.r_ocfs2)) + return len(self.r_ocfs2) + + def canrunnow(self, node): + '''Return TRUE if we can meaningfully run right now''' + return 1 + + def errorstoignore(self): + '''Return list of errors which are 'normal' and should be ignored''' + return [] + + +class StopTest(CTSTest): + '''Stop (deactivate) the cluster manager on a node''' + def __init__(self, cm): + CTSTest.__init__(self, cm) + self.name = "Stop" + + def __call__(self, node): + '''Perform the 'stop' test. ''' + self.incr("calls") + if self.CM.ShouldBeStatus[node] != "up": + return self.skipped() + + patterns = [] + # Technically we should always be able to notice ourselves stopping + patterns.append(self.templates["Pat:We_stopped"] % node) + + # Any active node needs to notice this one left + # (note that this won't work if we have multiple partitions) + for other in self.Env["nodes"]: + if self.CM.ShouldBeStatus[other] == "up" and other != node: + patterns.append(self.templates["Pat:They_stopped"] %(other, self.CM.key_for_node(node))) + #self.debug("Checking %s will notice %s left"%(other, node)) + + watch = self.create_watch(patterns, self.Env["DeadTime"]) + watch.set_watch() + + if node == self.CM.OurNode: + self.incr("us") + else: + if self.CM.upcount() <= 1: + self.incr("all") + else: + self.incr("them") + + self.CM.StopaCM(node) + watch_result = watch.look_for_all() + + failreason = None + UnmatchedList = "||" + if watch.unmatched: + (_, output) = self.rsh(node, "/bin/ps axf", verbose=1) + for line in output: + self.debug(line) + + (_, output) = self.rsh(node, "/usr/sbin/dlm_tool dump 2>/dev/null", verbose=1) + for line in output: + self.debug(line) + + for regex in watch.unmatched: + self.logger.log ("ERROR: Shutdown pattern not found: %s" % (regex)) + UnmatchedList += regex + "||"; + failreason = "Missing shutdown pattern" + + self.CM.cluster_stable(self.Env["DeadTime"]) + + if not watch.unmatched or self.CM.upcount() == 0: + return self.success() + + if len(watch.unmatched) >= self.CM.upcount(): + return self.failure("no match against (%s)" % UnmatchedList) + + if failreason == None: + return self.success() + else: + return self.failure(failreason) +# +# We don't register StopTest because it's better when called by +# another test... +# + + +class StartTest(CTSTest): + '''Start (activate) the cluster manager on a node''' + def __init__(self, cm, debug=None): + CTSTest.__init__(self,cm) + self.name = "start" + self.debug = debug + + def __call__(self, node): + '''Perform the 'start' test. ''' + self.incr("calls") + + if self.CM.upcount() == 0: + self.incr("us") + else: + self.incr("them") + + if self.CM.ShouldBeStatus[node] != "down": + return self.skipped() + elif self.CM.StartaCM(node): + return self.success() + else: + return self.failure("Startup %s on node %s failed" + % (self.Env["Name"], node)) + +# +# We don't register StartTest because it's better when called by +# another test... +# + + +class FlipTest(CTSTest): + '''If it's running, stop it. If it's stopped start it. + Overthrow the status quo... + ''' + def __init__(self, cm): + CTSTest.__init__(self,cm) + self.name = "Flip" + self.start = StartTest(cm) + self.stop = StopTest(cm) + + def __call__(self, node): + '''Perform the 'Flip' test. ''' + self.incr("calls") + if self.CM.ShouldBeStatus[node] == "up": + self.incr("stopped") + ret = self.stop(node) + type = "up->down" + # Give the cluster time to recognize it's gone... + time.sleep(self.Env["StableTime"]) + elif self.CM.ShouldBeStatus[node] == "down": + self.incr("started") + ret = self.start(node) + type = "down->up" + else: + return self.skipped() + + self.incr(type) + if ret: + return self.success() + else: + return self.failure("%s failure" % type) + +# Register FlipTest as a good test to run +AllTestClasses.append(FlipTest) + + +class RestartTest(CTSTest): + '''Stop and restart a node''' + def __init__(self, cm): + CTSTest.__init__(self,cm) + self.name = "Restart" + self.start = StartTest(cm) + self.stop = StopTest(cm) + self.benchmark = 1 + + def __call__(self, node): + '''Perform the 'restart' test. ''' + self.incr("calls") + + self.incr("node:" + node) + + ret1 = 1 + if self.CM.StataCM(node): + self.incr("WasStopped") + if not self.start(node): + return self.failure("start (setup) failure: "+node) + + self.set_timer() + if not self.stop(node): + return self.failure("stop failure: "+node) + if not self.start(node): + return self.failure("start failure: "+node) + return self.success() + +# Register RestartTest as a good test to run +AllTestClasses.append(RestartTest) + + +class StonithdTest(CTSTest): + def __init__(self, cm): + CTSTest.__init__(self, cm) + self.name = "Stonithd" + self.startall = SimulStartLite(cm) + self.benchmark = 1 + + def __call__(self, node): + self.incr("calls") + if len(self.Env["nodes"]) < 2: + return self.skipped() + + ret = self.startall(None) + if not ret: + return self.failure("Setup failed") + + is_dc = self.CM.is_node_dc(node) + + watchpats = [] + watchpats.append(self.templates["Pat:Fencing_ok"] % node) + watchpats.append(self.templates["Pat:NodeFenced"] % node) + + if not self.Env["at-boot"]: + self.debug("Expecting %s to stay down" % node) + self.CM.ShouldBeStatus[node] = "down" + else: + self.debug("Expecting %s to come up again %d" % (node, self.Env["at-boot"])) + watchpats.append("%s.* S_STARTING -> S_PENDING" % node) + watchpats.append("%s.* S_PENDING -> S_NOT_DC" % node) + + watch = self.create_watch(watchpats, 30 + self.Env["DeadTime"] + self.Env["StableTime"] + self.Env["StartTime"]) + watch.set_watch() + + origin = self.Env.random_gen.choice(self.Env["nodes"]) + + (rc, _) = self.rsh(origin, "stonith_admin --reboot %s -VVVVVV" % node) + + if rc == 124: # CRM_EX_TIMEOUT + # Look for the patterns, usually this means the required + # device was running on the node to be fenced - or that + # the required devices were in the process of being loaded + # and/or moved + # + # Effectively the node committed suicide so there will be + # no confirmation, but pacemaker should be watching and + # fence the node again + + self.logger.log("Fencing command on %s to fence %s timed out" % (origin, node)) + + elif origin != node and rc != 0: + self.debug("Waiting for the cluster to recover") + self.CM.cluster_stable() + + self.debug("Waiting for fenced node to come back up") + self.CM.ns.wait_for_all_nodes(self.Env["nodes"], 600) + + self.logger.log("Fencing command on %s failed to fence %s (rc=%d)" % (origin, node, rc)) + + elif origin == node and rc != 255: + # 255 == broken pipe, ie. the node was fenced as expected + self.logger.log("Locally originated fencing returned %d" % rc) + + self.set_timer("fence") + matched = watch.look_for_all() + self.log_timer("fence") + self.set_timer("reform") + if watch.unmatched: + self.logger.log("Patterns not found: " + repr(watch.unmatched)) + + self.debug("Waiting for the cluster to recover") + self.CM.cluster_stable() + + self.debug("Waiting for fenced node to come back up") + self.CM.ns.wait_for_all_nodes(self.Env["nodes"], 600) + + self.debug("Waiting for the cluster to re-stabilize with all nodes") + is_stable = self.CM.cluster_stable(self.Env["StartTime"]) + + if not matched: + return self.failure("Didn't find all expected patterns") + elif not is_stable: + return self.failure("Cluster did not become stable") + + self.log_timer("reform") + return self.success() + + def errorstoignore(self): + return [ + self.templates["Pat:Fencing_start"] % ".*", + self.templates["Pat:Fencing_ok"] % ".*", + self.templates["Pat:Fencing_active"], + r"error.*: Operation 'reboot' targeting .* by .* for stonith_admin.*: Timer expired", + ] + + def is_applicable(self): + if not self.is_applicable_common(): + return False + + if "DoFencing" in list(self.Env.keys()): + return self.Env["DoFencing"] + + return True + +AllTestClasses.append(StonithdTest) + + +class StartOnebyOne(CTSTest): + '''Start all the nodes ~ one by one''' + def __init__(self, cm): + CTSTest.__init__(self,cm) + self.name = "StartOnebyOne" + self.stopall = SimulStopLite(cm) + self.start = StartTest(cm) + self.ns = NodeStatus(cm.Env) + + def __call__(self, dummy): + '''Perform the 'StartOnebyOne' test. ''' + self.incr("calls") + + # We ignore the "node" parameter... + + # Shut down all the nodes... + ret = self.stopall(None) + if not ret: + return self.failure("Test setup failed") + + failed = [] + self.set_timer() + for node in self.Env["nodes"]: + if not self.start(node): + failed.append(node) + + if len(failed) > 0: + return self.failure("Some node failed to start: " + repr(failed)) + + return self.success() + +# Register StartOnebyOne as a good test to run +AllTestClasses.append(StartOnebyOne) + + +class SimulStart(CTSTest): + '''Start all the nodes ~ simultaneously''' + def __init__(self, cm): + CTSTest.__init__(self,cm) + self.name = "SimulStart" + self.stopall = SimulStopLite(cm) + self.startall = SimulStartLite(cm) + + def __call__(self, dummy): + '''Perform the 'SimulStart' test. ''' + self.incr("calls") + + # We ignore the "node" parameter... + + # Shut down all the nodes... + ret = self.stopall(None) + if not ret: + return self.failure("Setup failed") + + if not self.startall(None): + return self.failure("Startall failed") + + return self.success() + +# Register SimulStart as a good test to run +AllTestClasses.append(SimulStart) + + +class SimulStop(CTSTest): + '''Stop all the nodes ~ simultaneously''' + def __init__(self, cm): + CTSTest.__init__(self,cm) + self.name = "SimulStop" + self.startall = SimulStartLite(cm) + self.stopall = SimulStopLite(cm) + + def __call__(self, dummy): + '''Perform the 'SimulStop' test. ''' + self.incr("calls") + + # We ignore the "node" parameter... + + # Start up all the nodes... + ret = self.startall(None) + if not ret: + return self.failure("Setup failed") + + if not self.stopall(None): + return self.failure("Stopall failed") + + return self.success() + +# Register SimulStop as a good test to run +AllTestClasses.append(SimulStop) + + +class StopOnebyOne(CTSTest): + '''Stop all the nodes in order''' + def __init__(self, cm): + CTSTest.__init__(self,cm) + self.name = "StopOnebyOne" + self.startall = SimulStartLite(cm) + self.stop = StopTest(cm) + + def __call__(self, dummy): + '''Perform the 'StopOnebyOne' test. ''' + self.incr("calls") + + # We ignore the "node" parameter... + + # Start up all the nodes... + ret = self.startall(None) + if not ret: + return self.failure("Setup failed") + + failed = [] + self.set_timer() + for node in self.Env["nodes"]: + if not self.stop(node): + failed.append(node) + + if len(failed) > 0: + return self.failure("Some node failed to stop: " + repr(failed)) + + return self.success() + +# Register StopOnebyOne as a good test to run +AllTestClasses.append(StopOnebyOne) + + +class RestartOnebyOne(CTSTest): + '''Restart all the nodes in order''' + def __init__(self, cm): + CTSTest.__init__(self,cm) + self.name = "RestartOnebyOne" + self.startall = SimulStartLite(cm) + + def __call__(self, dummy): + '''Perform the 'RestartOnebyOne' test. ''' + self.incr("calls") + + # We ignore the "node" parameter... + + # Start up all the nodes... + ret = self.startall(None) + if not ret: + return self.failure("Setup failed") + + did_fail = [] + self.set_timer() + self.restart = RestartTest(self.CM) + for node in self.Env["nodes"]: + if not self.restart(node): + did_fail.append(node) + + if did_fail: + return self.failure("Could not restart %d nodes: %s" + % (len(did_fail), repr(did_fail))) + return self.success() + +# Register StopOnebyOne as a good test to run +AllTestClasses.append(RestartOnebyOne) + + +class PartialStart(CTSTest): + '''Start a node - but tell it to stop before it finishes starting up''' + def __init__(self, cm): + CTSTest.__init__(self,cm) + self.name = "PartialStart" + self.startall = SimulStartLite(cm) + self.stopall = SimulStopLite(cm) + self.stop = StopTest(cm) + #self.is_unsafe = 1 + + def __call__(self, node): + '''Perform the 'PartialStart' test. ''' + self.incr("calls") + + ret = self.stopall(None) + if not ret: + return self.failure("Setup failed") + + watchpats = [] + watchpats.append("pacemaker-controld.*Connecting to .* cluster infrastructure") + watch = self.create_watch(watchpats, self.Env["DeadTime"]+10) + watch.set_watch() + + self.CM.StartaCMnoBlock(node) + ret = watch.look_for_all() + if not ret: + self.logger.log("Patterns not found: " + repr(watch.unmatched)) + return self.failure("Setup of %s failed" % node) + + ret = self.stop(node) + if not ret: + return self.failure("%s did not stop in time" % node) + + return self.success() + + def errorstoignore(self): + '''Return list of errors which should be ignored''' + + # We might do some fencing in the 2-node case if we make it up far enough + return [ + r"Executing reboot fencing operation", + r"Requesting fencing \([^)]+\) targeting node ", + ] + +# Register StopOnebyOne as a good test to run +AllTestClasses.append(PartialStart) + + +class StandbyTest(CTSTest): + def __init__(self, cm): + CTSTest.__init__(self,cm) + self.name = "Standby" + self.benchmark = 1 + + self.start = StartTest(cm) + self.startall = SimulStartLite(cm) + + # make sure the node is active + # set the node to standby mode + # check resources, none resource should be running on the node + # set the node to active mode + # check resouces, resources should have been migrated back (SHOULD THEY?) + + def __call__(self, node): + + self.incr("calls") + ret = self.startall(None) + if not ret: + return self.failure("Start all nodes failed") + + self.debug("Make sure node %s is active" % node) + if self.CM.StandbyStatus(node) != "off": + if not self.CM.SetStandbyMode(node, "off"): + return self.failure("can't set node %s to active mode" % node) + + self.CM.cluster_stable() + + status = self.CM.StandbyStatus(node) + if status != "off": + return self.failure("standby status of %s is [%s] but we expect [off]" % (node, status)) + + self.debug("Getting resources running on node %s" % node) + rsc_on_node = self.CM.active_resources(node) + + watchpats = [] + watchpats.append(r"State transition .* -> S_POLICY_ENGINE") + watch = self.create_watch(watchpats, self.Env["DeadTime"]+10) + watch.set_watch() + + self.debug("Setting node %s to standby mode" % node) + if not self.CM.SetStandbyMode(node, "on"): + return self.failure("can't set node %s to standby mode" % node) + + self.set_timer("on") + + ret = watch.look_for_all() + if not ret: + self.logger.log("Patterns not found: " + repr(watch.unmatched)) + self.CM.SetStandbyMode(node, "off") + return self.failure("cluster didn't react to standby change on %s" % node) + + self.CM.cluster_stable() + + status = self.CM.StandbyStatus(node) + if status != "on": + return self.failure("standby status of %s is [%s] but we expect [on]" % (node, status)) + self.log_timer("on") + + self.debug("Checking resources") + bad_run = self.CM.active_resources(node) + if len(bad_run) > 0: + rc = self.failure("%s set to standby, %s is still running on it" % (node, repr(bad_run))) + self.debug("Setting node %s to active mode" % node) + self.CM.SetStandbyMode(node, "off") + return rc + + self.debug("Setting node %s to active mode" % node) + if not self.CM.SetStandbyMode(node, "off"): + return self.failure("can't set node %s to active mode" % node) + + self.set_timer("off") + self.CM.cluster_stable() + + status = self.CM.StandbyStatus(node) + if status != "off": + return self.failure("standby status of %s is [%s] but we expect [off]" % (node, status)) + self.log_timer("off") + + return self.success() + +AllTestClasses.append(StandbyTest) + + +class ValgrindTest(CTSTest): + '''Check for memory leaks''' + def __init__(self, cm): + CTSTest.__init__(self,cm) + self.name = "Valgrind" + self.stopall = SimulStopLite(cm) + self.startall = SimulStartLite(cm) + self.is_valgrind = 1 + self.is_loop = 1 + + def setup(self, node): + self.incr("calls") + + ret = self.stopall(None) + if not ret: + return self.failure("Stop all nodes failed") + + # @TODO Edit /etc/sysconfig/pacemaker on all nodes to enable valgrind, + # and clear any valgrind logs from previous runs. For now, we rely on + # the user to do this manually. + + ret = self.startall(None) + if not ret: + return self.failure("Start all nodes failed") + + return self.success() + + def teardown(self, node): + # Return all nodes to normal + # @TODO Edit /etc/sysconfig/pacemaker on all nodes to disable valgrind + ret = self.stopall(None) + if not ret: + return self.failure("Stop all nodes failed") + + return self.success() + + def find_leaks(self): + # Check for leaks + # (no longer used but kept in case feature is restored) + leaked = [] + self.stop = StopTest(self.CM) + + for node in self.Env["nodes"]: + rc = self.stop(node) + if not rc: + self.failure("Couldn't shut down %s" % node) + + (rc, _) = self.rsh(node, "grep -e indirectly.*lost:.*[1-9] -e definitely.*lost:.*[1-9] -e (ERROR|error).*SUMMARY:.*[1-9].*errors %s" % self.logger.logPat) + if rc != 1: + leaked.append(node) + self.failure("Valgrind errors detected on %s" % node) + (_, output) = self.rsh(node, "grep -e lost: -e SUMMARY: %s" % self.logger.logPat, verbose=1) + for line in output: + self.logger.log(line) + (_, output) = self.rsh(node, "cat %s" % self.logger.logPat, verbose=1) + for line in output: + self.debug(line) + + self.rsh(node, "rm -f %s" % self.logger.logPat, verbose=1) + return leaked + + def __call__(self, node): + #leaked = self.find_leaks() + #if len(leaked) > 0: + # return self.failure("Nodes %s leaked" % repr(leaked)) + + return self.success() + + def errorstoignore(self): + '''Return list of errors which should be ignored''' + return [ + r"pacemaker-based.*: \*\*\*\*\*\*\*\*\*\*\*\*\*", + r"pacemaker-based.*: .* avoid confusing Valgrind", + r"HA_VALGRIND_ENABLED", + ] + + +class StandbyLoopTest(ValgrindTest): + '''Check for memory leaks by putting a node in and out of standby for an hour''' + # @TODO This is not a useful test for memory leaks + def __init__(self, cm): + ValgrindTest.__init__(self,cm) + self.name = "StandbyLoop" + + def __call__(self, node): + + lpc = 0 + delay = 2 + failed = 0 + done = time.time() + self.Env["loop-minutes"] * 60 + while time.time() <= done and not failed: + lpc = lpc + 1 + + time.sleep(delay) + if not self.CM.SetStandbyMode(node, "on"): + self.failure("can't set node %s to standby mode" % node) + failed = lpc + + time.sleep(delay) + if not self.CM.SetStandbyMode(node, "off"): + self.failure("can't set node %s to active mode" % node) + failed = lpc + + leaked = self.find_leaks() + if failed: + return self.failure("Iteration %d failed" % failed) + elif len(leaked) > 0: + return self.failure("Nodes %s leaked" % repr(leaked)) + + return self.success() + +#AllTestClasses.append(StandbyLoopTest) + + +class BandwidthTest(CTSTest): +# Tests should not be cluster-manager-specific +# If you need to find out cluster manager configuration to do this, then +# it should be added to the generic cluster manager API. + '''Test the bandwidth which the cluster uses''' + def __init__(self, cm): + CTSTest.__init__(self, cm) + self.name = "Bandwidth" + self.start = StartTest(cm) + self.__setitem__("min",0) + self.__setitem__("max",0) + self.__setitem__("totalbandwidth",0) + (handle, self.tempfile) = tempfile.mkstemp(".cts") + os.close(handle) + self.startall = SimulStartLite(cm) + + def __call__(self, node): + '''Perform the Bandwidth test''' + self.incr("calls") + + if self.CM.upcount() < 1: + return self.skipped() + + Path = self.CM.InternalCommConfig() + if "ip" not in Path["mediatype"]: + return self.skipped() + + port = Path["port"][0] + port = int(port) + + ret = self.startall(None) + if not ret: + return self.failure("Test setup failed") + time.sleep(5) # We get extra messages right after startup. + + fstmpfile = "/var/run/band_estimate" + dumpcmd = "tcpdump -p -n -c 102 -i any udp port %d > %s 2>&1" \ + % (port, fstmpfile) + + (rc, _) = self.rsh(node, dumpcmd) + if rc == 0: + farfile = "root@%s:%s" % (node, fstmpfile) + self.rsh.copy(farfile, self.tempfile) + Bandwidth = self.countbandwidth(self.tempfile) + if not Bandwidth: + self.logger.log("Could not compute bandwidth.") + return self.success() + intband = int(Bandwidth + 0.5) + self.logger.log("...bandwidth: %d bits/sec" % intband) + self.Stats["totalbandwidth"] = self.Stats["totalbandwidth"] + Bandwidth + if self.Stats["min"] == 0: + self.Stats["min"] = Bandwidth + if Bandwidth > self.Stats["max"]: + self.Stats["max"] = Bandwidth + if Bandwidth < self.Stats["min"]: + self.Stats["min"] = Bandwidth + self.rsh(node, "rm -f %s" % fstmpfile) + os.unlink(self.tempfile) + return self.success() + else: + return self.failure("no response from tcpdump command [%d]!" % rc) + + def countbandwidth(self, file): + fp = open(file, "r") + fp.seek(0) + count = 0 + sum = 0 + while 1: + line = fp.readline() + if not line: + return None + if re.search("udp",line) or re.search("UDP,", line): + count = count + 1 + linesplit = line.split(" ") + for j in range(len(linesplit)-1): + if linesplit[j] == "udp": break + if linesplit[j] == "length:": break + + try: + sum = sum + int(linesplit[j+1]) + except ValueError: + self.logger.log("Invalid tcpdump line: %s" % line) + return None + T1 = linesplit[0] + timesplit = T1.split(":") + time2split = timesplit[2].split(".") + time1 = (int(timesplit[0])*60+int(timesplit[1]))*60+int(time2split[0])+int(time2split[1])*0.000001 + break + + while count < 100: + line = fp.readline() + if not line: + return None + if re.search("udp",line) or re.search("UDP,", line): + count = count+1 + linessplit = line.split(" ") + for j in range(len(linessplit)-1): + if linessplit[j] == "udp": break + if linessplit[j] == "length:": break + try: + sum = int(linessplit[j+1]) + sum + except ValueError: + self.logger.log("Invalid tcpdump line: %s" % line) + return None + + T2 = linessplit[0] + timesplit = T2.split(":") + time2split = timesplit[2].split(".") + time2 = (int(timesplit[0])*60+int(timesplit[1]))*60+int(time2split[0])+int(time2split[1])*0.000001 + time = time2-time1 + if (time <= 0): + return 0 + return int((sum*8)/time) + + def is_applicable(self): + '''BandwidthTest never applicable''' + return False + +AllTestClasses.append(BandwidthTest) + + +################################################################### +class MaintenanceMode(CTSTest): +################################################################### + def __init__(self, cm): + CTSTest.__init__(self,cm) + self.name = "MaintenanceMode" + self.start = StartTest(cm) + self.startall = SimulStartLite(cm) + self.max = 30 + #self.is_unsafe = 1 + self.benchmark = 1 + self.action = "asyncmon" + self.interval = 0 + self.rid = "maintenanceDummy" + + def toggleMaintenanceMode(self, node, action): + pats = [] + pats.append(self.templates["Pat:DC_IDLE"]) + + # fail the resource right after turning Maintenance mode on + # verify it is not recovered until maintenance mode is turned off + if action == "On": + pats.append(self.templates["Pat:RscOpFail"] % (self.action, self.rid)) + else: + pats.append(self.templates["Pat:RscOpOK"] % ("stop", self.rid)) + pats.append(self.templates["Pat:RscOpOK"] % ("start", self.rid)) + + watch = self.create_watch(pats, 60) + watch.set_watch() + + self.debug("Turning maintenance mode %s" % action) + self.rsh(node, self.templates["MaintenanceMode%s" % (action)]) + if (action == "On"): + self.rsh(node, "crm_resource -V -F -r %s -H %s &>/dev/null" % (self.rid, node)) + + self.set_timer("recover%s" % (action)) + watch.look_for_all() + self.log_timer("recover%s" % (action)) + if watch.unmatched: + self.debug("Failed to find patterns when turning maintenance mode %s" % action) + return repr(watch.unmatched) + + return "" + + def insertMaintenanceDummy(self, node): + pats = [] + pats.append(("%s.*" % node) + (self.templates["Pat:RscOpOK"] % ("start", self.rid))) + + watch = self.create_watch(pats, 60) + watch.set_watch() + + self.CM.AddDummyRsc(node, self.rid) + + self.set_timer("addDummy") + watch.look_for_all() + self.log_timer("addDummy") + + if watch.unmatched: + self.debug("Failed to find patterns when adding maintenance dummy resource") + return repr(watch.unmatched) + return "" + + def removeMaintenanceDummy(self, node): + pats = [] + pats.append(self.templates["Pat:RscOpOK"] % ("stop", self.rid)) + + watch = self.create_watch(pats, 60) + watch.set_watch() + self.CM.RemoveDummyRsc(node, self.rid) + + self.set_timer("removeDummy") + watch.look_for_all() + self.log_timer("removeDummy") + + if watch.unmatched: + self.debug("Failed to find patterns when removing maintenance dummy resource") + return repr(watch.unmatched) + return "" + + def managedRscList(self, node): + rscList = [] + (_, lines) = self.rsh(node, "crm_resource -c", verbose=1) + for line in lines: + if re.search("^Resource", line): + tmp = AuditResource(self.CM, line) + if tmp.managed(): + rscList.append(tmp.id) + + return rscList + + def verifyResources(self, node, rscList, managed): + managedList = list(rscList) + managed_str = "managed" + if not managed: + managed_str = "unmanaged" + + (_, lines) = self.rsh(node, "crm_resource -c", verbose=1) + for line in lines: + if re.search("^Resource", line): + tmp = AuditResource(self.CM, line) + if managed and not tmp.managed(): + continue + elif not managed and tmp.managed(): + continue + elif managedList.count(tmp.id): + managedList.remove(tmp.id) + + if len(managedList) == 0: + self.debug("Found all %s resources on %s" % (managed_str, node)) + return True + + self.logger.log("Could not find all %s resources on %s. %s" % (managed_str, node, managedList)) + return False + + def __call__(self, node): + '''Perform the 'MaintenanceMode' test. ''' + self.incr("calls") + verify_managed = False + verify_unmanaged = False + failPat = "" + + ret = self.startall(None) + if not ret: + return self.failure("Setup failed") + + # get a list of all the managed resources. We use this list + # after enabling maintenance mode to verify all managed resources + # become un-managed. After maintenance mode is turned off, we use + # this list to verify all the resources become managed again. + managedResources = self.managedRscList(node) + if len(managedResources) == 0: + self.logger.log("No managed resources on %s" % node) + return self.skipped() + + # insert a fake resource we can fail during maintenance mode + # so we can verify recovery does not take place until after maintenance + # mode is disabled. + failPat = failPat + self.insertMaintenanceDummy(node) + + # toggle maintenance mode ON, then fail dummy resource. + failPat = failPat + self.toggleMaintenanceMode(node, "On") + + # verify all the resources are now unmanaged + if self.verifyResources(node, managedResources, False): + verify_unmanaged = True + + # Toggle maintenance mode OFF, verify dummy is recovered. + failPat = failPat + self.toggleMaintenanceMode(node, "Off") + + # verify all the resources are now managed again + if self.verifyResources(node, managedResources, True): + verify_managed = True + + # Remove our maintenance dummy resource. + failPat = failPat + self.removeMaintenanceDummy(node) + + self.CM.cluster_stable() + + if failPat != "": + return self.failure("Unmatched patterns: %s" % (failPat)) + elif verify_unmanaged is False: + return self.failure("Failed to verify resources became unmanaged during maintenance mode") + elif verify_managed is False: + return self.failure("Failed to verify resources switched back to managed after disabling maintenance mode") + + return self.success() + + def errorstoignore(self): + '''Return list of errors which should be ignored''' + return [ + r"Updating failcount for %s" % self.rid, + r"schedulerd.*: Recover\s+%s\s+\(.*\)" % self.rid, + r"Unknown operation: fail", + self.templates["Pat:RscOpOK"] % (self.action, self.rid), + r"(ERROR|error).*: Action %s_%s_%d .* initiated outside of a transition" % (self.rid, self.action, self.interval), + ] + +AllTestClasses.append(MaintenanceMode) + + +class ResourceRecover(CTSTest): + def __init__(self, cm): + CTSTest.__init__(self,cm) + self.name = "ResourceRecover" + self.start = StartTest(cm) + self.startall = SimulStartLite(cm) + self.max = 30 + self.rid = None + self.rid_alt = None + #self.is_unsafe = 1 + self.benchmark = 1 + + # these are the values used for the new LRM API call + self.action = "asyncmon" + self.interval = 0 + + def __call__(self, node): + '''Perform the 'ResourceRecover' test. ''' + self.incr("calls") + + ret = self.startall(None) + if not ret: + return self.failure("Setup failed") + + # List all resources active on the node (skip test if none) + resourcelist = self.CM.active_resources(node) + if len(resourcelist) == 0: + self.logger.log("No active resources on %s" % node) + return self.skipped() + + # Choose one resource at random + rsc = self.choose_resource(node, resourcelist) + if rsc is None: + return self.failure("Could not get details of resource '%s'" % self.rid) + if rsc.id == rsc.clone_id: + self.debug("Failing " + rsc.id) + else: + self.debug("Failing " + rsc.id + " (also known as " + rsc.clone_id + ")") + + # Log patterns to watch for (failure, plus restart if managed) + pats = [] + pats.append(self.templates["Pat:CloneOpFail"] % (self.action, rsc.id, rsc.clone_id)) + if rsc.managed(): + pats.append(self.templates["Pat:RscOpOK"] % ("stop", self.rid)) + if rsc.unique(): + pats.append(self.templates["Pat:RscOpOK"] % ("start", self.rid)) + else: + # Anonymous clones may get restarted with a different clone number + pats.append(self.templates["Pat:RscOpOK"] % ("start", ".*")) + + # Fail resource. (Ideally, we'd fail it twice, to ensure the fail count + # is incrementing properly, but it might restart on a different node. + # We'd have to temporarily ban it from all other nodes and ensure the + # migration-threshold hasn't been reached.) + if self.fail_resource(rsc, node, pats) is None: + return None # self.failure() already called + + return self.success() + + def choose_resource(self, node, resourcelist): + """ Choose a random resource to target """ + + self.rid = self.Env.random_gen.choice(resourcelist) + self.rid_alt = self.rid + (_, lines) = self.rsh(node, "crm_resource -c", verbose=1) + for line in lines: + if line.startswith("Resource: "): + rsc = AuditResource(self.CM, line) + if rsc.id == self.rid: + # Handle anonymous clones that get renamed + self.rid = rsc.clone_id + return rsc + return None + + def get_failcount(self, node): + """ Check the fail count of targeted resource on given node """ + + (rc, lines) = self.rsh(node, + "crm_failcount --quiet --query --resource %s " + "--operation %s --interval %d " + "--node %s" % (self.rid, self.action, + self.interval, node), verbose=1) + if rc != 0 or len(lines) != 1: + self.logger.log("crm_failcount on %s failed (%d): %s" % (node, rc, + " // ".join(map(str.strip, lines)))) + return -1 + try: + failcount = int(lines[0]) + except (IndexError, ValueError): + self.logger.log("crm_failcount output on %s unparseable: %s" % (node, + ' '.join(lines))) + return -1 + return failcount + + def fail_resource(self, rsc, node, pats): + """ Fail the targeted resource, and verify as expected """ + + orig_failcount = self.get_failcount(node) + + watch = self.create_watch(pats, 60) + watch.set_watch() + + self.rsh(node, "crm_resource -V -F -r %s -H %s &>/dev/null" % (self.rid, node)) + + self.set_timer("recover") + watch.look_for_all() + self.log_timer("recover") + + self.CM.cluster_stable() + recovered = self.CM.ResourceLocation(self.rid) + + if watch.unmatched: + return self.failure("Patterns not found: %s" % repr(watch.unmatched)) + + elif rsc.unique() and len(recovered) > 1: + return self.failure("%s is now active on more than one node: %s"%(self.rid, repr(recovered))) + + elif len(recovered) > 0: + self.debug("%s is running on: %s" % (self.rid, repr(recovered))) + + elif rsc.managed(): + return self.failure("%s was not recovered and is inactive" % self.rid) + + new_failcount = self.get_failcount(node) + if new_failcount != (orig_failcount + 1): + return self.failure("%s fail count is %d not %d" % (self.rid, + new_failcount, orig_failcount + 1)) + + return 0 # Anything but None is success + + def errorstoignore(self): + '''Return list of errors which should be ignored''' + return [ + r"Updating failcount for %s" % self.rid, + r"schedulerd.*: Recover\s+(%s|%s)\s+\(.*\)" % (self.rid, self.rid_alt), + r"Unknown operation: fail", + self.templates["Pat:RscOpOK"] % (self.action, self.rid), + r"(ERROR|error).*: Action %s_%s_%d .* initiated outside of a transition" % (self.rid, self.action, self.interval), + ] + +AllTestClasses.append(ResourceRecover) + + +class ComponentFail(CTSTest): + def __init__(self, cm): + CTSTest.__init__(self,cm) + self.name = "ComponentFail" + self.startall = SimulStartLite(cm) + self.complist = cm.Components() + self.patterns = [] + self.okerrpatterns = [] + self.is_unsafe = 1 + + def __call__(self, node): + '''Perform the 'ComponentFail' test. ''' + self.incr("calls") + self.patterns = [] + self.okerrpatterns = [] + + # start all nodes + ret = self.startall(None) + if not ret: + return self.failure("Setup failed") + + if not self.CM.cluster_stable(self.Env["StableTime"]): + return self.failure("Setup failed - unstable") + + node_is_dc = self.CM.is_node_dc(node, None) + + # select a component to kill + chosen = self.Env.random_gen.choice(self.complist) + while chosen.dc_only and node_is_dc == 0: + chosen = self.Env.random_gen.choice(self.complist) + + self.debug("...component %s (dc=%d)" % (chosen.name, node_is_dc)) + self.incr(chosen.name) + + if chosen.name != "corosync": + self.patterns.append(self.templates["Pat:ChildKilled"] %(node, chosen.name)) + self.patterns.append(self.templates["Pat:ChildRespawn"] %(node, chosen.name)) + + self.patterns.extend(chosen.pats) + if node_is_dc: + self.patterns.extend(chosen.dc_pats) + + # @TODO this should be a flag in the Component + if chosen.name in [ "corosync", "pacemaker-based", "pacemaker-fenced" ]: + # Ignore actions for fence devices if fencer will respawn + # (their registration will be lost, and probes will fail) + self.okerrpatterns = [ self.templates["Pat:Fencing_active"] ] + (_, lines) = self.rsh(node, "crm_resource -c", verbose=1) + for line in lines: + if re.search("^Resource", line): + r = AuditResource(self.CM, line) + if r.rclass == "stonith": + self.okerrpatterns.append(self.templates["Pat:Fencing_recover"] % r.id) + self.okerrpatterns.append(self.templates["Pat:Fencing_probe"] % r.id) + + # supply a copy so self.patterns doesn't end up empty + tmpPats = [] + tmpPats.extend(self.patterns) + self.patterns.extend(chosen.badnews_ignore) + + # Look for STONITH ops, depending on Env["at-boot"] we might need to change the nodes status + stonithPats = [] + stonithPats.append(self.templates["Pat:Fencing_ok"] % node) + stonith = self.create_watch(stonithPats, 0) + stonith.set_watch() + + # set the watch for stable + watch = self.create_watch( + tmpPats, self.Env["DeadTime"] + self.Env["StableTime"] + self.Env["StartTime"]) + watch.set_watch() + + # kill the component + chosen.kill(node) + + self.debug("Waiting for the cluster to recover") + self.CM.cluster_stable() + + self.debug("Waiting for any fenced node to come back up") + self.CM.ns.wait_for_all_nodes(self.Env["nodes"], 600) + + self.debug("Waiting for the cluster to re-stabilize with all nodes") + self.CM.cluster_stable(self.Env["StartTime"]) + + self.debug("Checking if %s was shot" % node) + shot = stonith.look(60) + if shot: + self.debug("Found: " + repr(shot)) + self.okerrpatterns.append(self.templates["Pat:Fencing_start"] % node) + + if not self.Env["at-boot"]: + self.CM.ShouldBeStatus[node] = "down" + + # If fencing occurred, chances are many (if not all) the expected logs + # will not be sent - or will be lost when the node reboots + return self.success() + + # check for logs indicating a graceful recovery + matched = watch.look_for_all(allow_multiple_matches=True) + if watch.unmatched: + self.logger.log("Patterns not found: " + repr(watch.unmatched)) + + self.debug("Waiting for the cluster to re-stabilize with all nodes") + is_stable = self.CM.cluster_stable(self.Env["StartTime"]) + + if not matched: + return self.failure("Didn't find all expected %s patterns" % chosen.name) + elif not is_stable: + return self.failure("Cluster did not become stable after killing %s" % chosen.name) + + return self.success() + + def errorstoignore(self): + '''Return list of errors which should be ignored''' + # Note that okerrpatterns refers to the last time we ran this test + # The good news is that this works fine for us... + self.okerrpatterns.extend(self.patterns) + return self.okerrpatterns + +AllTestClasses.append(ComponentFail) + + +class SplitBrainTest(CTSTest): + '''It is used to test split-brain. when the path between the two nodes break + check the two nodes both take over the resource''' + def __init__(self,cm): + CTSTest.__init__(self,cm) + self.name = "SplitBrain" + self.start = StartTest(cm) + self.startall = SimulStartLite(cm) + self.is_experimental = 1 + + def isolate_partition(self, partition): + other_nodes = [] + other_nodes.extend(self.Env["nodes"]) + + for node in partition: + try: + other_nodes.remove(node) + except ValueError: + self.logger.log("Node "+node+" not in " + repr(self.Env["nodes"]) + " from " +repr(partition)) + + if len(other_nodes) == 0: + return 1 + + self.debug("Creating partition: " + repr(partition)) + self.debug("Everyone else: " + repr(other_nodes)) + + for node in partition: + if not self.CM.isolate_node(node, other_nodes): + self.logger.log("Could not isolate %s" % node) + return 0 + + return 1 + + def heal_partition(self, partition): + other_nodes = [] + other_nodes.extend(self.Env["nodes"]) + + for node in partition: + try: + other_nodes.remove(node) + except ValueError: + self.logger.log("Node "+node+" not in " + repr(self.Env["nodes"])) + + if len(other_nodes) == 0: + return 1 + + self.debug("Healing partition: " + repr(partition)) + self.debug("Everyone else: " + repr(other_nodes)) + + for node in partition: + self.CM.unisolate_node(node, other_nodes) + + def __call__(self, node): + '''Perform split-brain test''' + self.incr("calls") + self.passed = 1 + partitions = {} + + ret = self.startall(None) + if not ret: + return self.failure("Setup failed") + + while 1: + # Retry until we get multiple partitions + partitions = {} + p_max = len(self.Env["nodes"]) + for node in self.Env["nodes"]: + p = self.Env.random_gen.randint(1, p_max) + if not p in partitions: + partitions[p] = [] + partitions[p].append(node) + p_max = len(list(partitions.keys())) + if p_max > 1: + break + # else, try again + + self.debug("Created %d partitions" % p_max) + for key in list(partitions.keys()): + self.debug("Partition["+str(key)+"]:\t"+repr(partitions[key])) + + # Disabling STONITH to reduce test complexity for now + self.rsh(node, "crm_attribute -V -n stonith-enabled -v false") + + for key in list(partitions.keys()): + self.isolate_partition(partitions[key]) + + count = 30 + while count > 0: + if len(self.CM.find_partitions()) != p_max: + time.sleep(10) + else: + break + else: + self.failure("Expected partitions were not created") + + # Target number of partitions formed - wait for stability + if not self.CM.cluster_stable(): + self.failure("Partitioned cluster not stable") + + # Now audit the cluster state + self.CM.partitions_expected = p_max + if not self.audit(): + self.failure("Audits failed") + self.CM.partitions_expected = 1 + + # And heal them again + for key in list(partitions.keys()): + self.heal_partition(partitions[key]) + + # Wait for a single partition to form + count = 30 + while count > 0: + if len(self.CM.find_partitions()) != 1: + time.sleep(10) + count -= 1 + else: + break + else: + self.failure("Cluster did not reform") + + # Wait for it to have the right number of members + count = 30 + while count > 0: + members = [] + + partitions = self.CM.find_partitions() + if len(partitions) > 0: + members = partitions[0].split() + + if len(members) != len(self.Env["nodes"]): + time.sleep(10) + count -= 1 + else: + break + else: + self.failure("Cluster did not completely reform") + + # Wait up to 20 minutes - the delay is more preferable than + # trying to continue with in a messed up state + if not self.CM.cluster_stable(1200): + self.failure("Reformed cluster not stable") + if self.Env["continue"]: + answer = "Y" + else: + try: + answer = input('Continue? [nY]') + except EOFError as e: + answer = "n" + if answer and answer == "n": + raise ValueError("Reformed cluster not stable") + + # Turn fencing back on + if self.Env["DoFencing"]: + self.rsh(node, "crm_attribute -V -D -n stonith-enabled") + + self.CM.cluster_stable() + + if self.passed: + return self.success() + return self.failure("See previous errors") + + def errorstoignore(self): + '''Return list of errors which are 'normal' and should be ignored''' + return [ + r"Another DC detected:", + r"(ERROR|error).*: .*Application of an update diff failed", + r"pacemaker-controld.*:.*not in our membership list", + r"CRIT:.*node.*returning after partition", + ] + + def is_applicable(self): + if not self.is_applicable_common(): + return False + return len(self.Env["nodes"]) > 2 + +AllTestClasses.append(SplitBrainTest) + + +class Reattach(CTSTest): + def __init__(self, cm): + CTSTest.__init__(self,cm) + self.name = "Reattach" + self.startall = SimulStartLite(cm) + self.restart1 = RestartTest(cm) + self.stopall = SimulStopLite(cm) + self.is_unsafe = 0 # Handled by canrunnow() + + def _is_managed(self, node): + (_, is_managed) = self.rsh(node, "crm_attribute -t rsc_defaults -n is-managed -q -G -d true", verbose=1) + is_managed = is_managed[0].strip() + return is_managed == "true" + + def _set_unmanaged(self, node): + self.debug("Disable resource management") + self.rsh(node, "crm_attribute -t rsc_defaults -n is-managed -v false") + + def _set_managed(self, node): + self.debug("Re-enable resource management") + self.rsh(node, "crm_attribute -t rsc_defaults -n is-managed -D") + + def setup(self, node): + attempt = 0 + if not self.startall(None): + return None + + # Make sure we are really _really_ stable and that all + # resources, including those that depend on transient node + # attributes, are started + while not self.CM.cluster_stable(double_check=True): + if attempt < 5: + attempt += 1 + self.debug("Not stable yet, re-testing") + else: + self.logger.log("Cluster is not stable") + return None + + return 1 + + def teardown(self, node): + + # Make sure 'node' is up + start = StartTest(self.CM) + start(node) + + if not self._is_managed(node): + self.logger.log("Attempting to re-enable resource management on %s" % node) + self._set_managed(node) + self.CM.cluster_stable() + if not self._is_managed(node): + self.logger.log("Could not re-enable resource management") + return 0 + + return 1 + + def canrunnow(self, node): + '''Return TRUE if we can meaningfully run right now''' + if self.find_ocfs2_resources(node): + self.logger.log("Detach/Reattach scenarios are not possible with OCFS2 services present") + return 0 + return 1 + + def __call__(self, node): + self.incr("calls") + + pats = [] + # Conveniently, the scheduler will display this message when disabling + # management, even if fencing is not enabled, so we can rely on it. + managed = self.create_watch(["No fencing will be done"], 60) + managed.set_watch() + + self._set_unmanaged(node) + + if not managed.look_for_all(): + self.logger.log("Patterns not found: " + repr(managed.unmatched)) + return self.failure("Resource management not disabled") + + pats = [] + pats.append(self.templates["Pat:RscOpOK"] % ("start", ".*")) + pats.append(self.templates["Pat:RscOpOK"] % ("stop", ".*")) + pats.append(self.templates["Pat:RscOpOK"] % ("promote", ".*")) + pats.append(self.templates["Pat:RscOpOK"] % ("demote", ".*")) + pats.append(self.templates["Pat:RscOpOK"] % ("migrate", ".*")) + + watch = self.create_watch(pats, 60, "ShutdownActivity") + watch.set_watch() + + self.debug("Shutting down the cluster") + ret = self.stopall(None) + if not ret: + self._set_managed(node) + return self.failure("Couldn't shut down the cluster") + + self.debug("Bringing the cluster back up") + ret = self.startall(None) + time.sleep(5) # allow ping to update the CIB + if not ret: + self._set_managed(node) + return self.failure("Couldn't restart the cluster") + + if self.local_badnews("ResourceActivity:", watch): + self._set_managed(node) + return self.failure("Resources stopped or started during cluster restart") + + watch = self.create_watch(pats, 60, "StartupActivity") + watch.set_watch() + + # Re-enable resource management (and verify it happened). + self._set_managed(node) + self.CM.cluster_stable() + if not self._is_managed(node): + return self.failure("Could not re-enable resource management") + + # Ignore actions for STONITH resources + ignore = [] + (_, lines) = self.rsh(node, "crm_resource -c", verbose=1) + for line in lines: + if re.search("^Resource", line): + r = AuditResource(self.CM, line) + if r.rclass == "stonith": + + self.debug("Ignoring start actions for %s" % r.id) + ignore.append(self.templates["Pat:RscOpOK"] % ("start", r.id)) + + if self.local_badnews("ResourceActivity:", watch, ignore): + return self.failure("Resources stopped or started after resource management was re-enabled") + + return ret + + def errorstoignore(self): + '''Return list of errors which should be ignored''' + return [ + r"resource( was|s were) active at shutdown", + ] + + def is_applicable(self): + return True + +AllTestClasses.append(Reattach) + + +class SpecialTest1(CTSTest): + '''Set up a custom test to cause quorum failure issues for Andrew''' + def __init__(self, cm): + CTSTest.__init__(self,cm) + self.name = "SpecialTest1" + self.startall = SimulStartLite(cm) + self.restart1 = RestartTest(cm) + self.stopall = SimulStopLite(cm) + + def __call__(self, node): + '''Perform the 'SpecialTest1' test for Andrew. ''' + self.incr("calls") + + # Shut down all the nodes... + ret = self.stopall(None) + if not ret: + return self.failure("Could not stop all nodes") + + # Test config recovery when the other nodes come up + self.rsh(node, "rm -f " + BuildOptions.CIB_DIR + "/cib*") + + # Start the selected node + ret = self.restart1(node) + if not ret: + return self.failure("Could not start "+node) + + # Start all remaining nodes + ret = self.startall(None) + if not ret: + return self.failure("Could not start the remaining nodes") + + return self.success() + + def errorstoignore(self): + '''Return list of errors which should be ignored''' + # Errors that occur as a result of the CIB being wiped + return [ + r"error.*: v1 patchset error, patch failed to apply: Application of an update diff failed", + r"error.*: Resource start-up disabled since no STONITH resources have been defined", + r"error.*: Either configure some or disable STONITH with the stonith-enabled option", + r"error.*: NOTE: Clusters with shared data need STONITH to ensure data integrity", + ] + +AllTestClasses.append(SpecialTest1) + + +class HAETest(CTSTest): + '''Set up a custom test to cause quorum failure issues for Andrew''' + def __init__(self, cm): + CTSTest.__init__(self,cm) + self.name = "HAETest" + self.stopall = SimulStopLite(cm) + self.startall = SimulStartLite(cm) + self.is_loop = 1 + + def setup(self, node): + # Start all remaining nodes + ret = self.startall(None) + if not ret: + return self.failure("Couldn't start all nodes") + return self.success() + + def teardown(self, node): + # Stop everything + ret = self.stopall(None) + if not ret: + return self.failure("Couldn't stop all nodes") + return self.success() + + def wait_on_state(self, node, resource, expected_clones, attempts=240): + while attempts > 0: + active = 0 + (rc, lines) = self.rsh(node, "crm_resource -r %s -W -Q" % resource, verbose=1) + + # Hack until crm_resource does the right thing + if rc == 0 and lines: + active = len(lines) + + if len(lines) == expected_clones: + return 1 + + elif rc == 1: + self.debug("Resource %s is still inactive" % resource) + + elif rc == 234: + self.logger.log("Unknown resource %s" % resource) + return 0 + + elif rc == 246: + self.logger.log("Cluster is inactive") + return 0 + + elif rc != 0: + self.logger.log("Call to crm_resource failed, rc=%d" % rc) + return 0 + + else: + self.debug("Resource %s is active on %d times instead of %d" % (resource, active, expected_clones)) + + attempts -= 1 + time.sleep(1) + + return 0 + + def find_dlm(self, node): + self.r_dlm = None + + (_, lines) = self.rsh(node, "crm_resource -c", verbose=1) + for line in lines: + if re.search("^Resource", line): + r = AuditResource(self.CM, line) + if r.rtype == "controld" and r.parent != "NA": + self.debug("Found dlm: %s" % self.r_dlm) + self.r_dlm = r.parent + return 1 + return 0 + + def find_hae_resources(self, node): + self.r_dlm = None + self.r_o2cb = None + self.r_ocfs2 = [] + + if self.find_dlm(node): + self.find_ocfs2_resources(node) + + def is_applicable(self): + if not self.is_applicable_common(): + return False + if self.Env["Schema"] == "hae": + return True + return None + + +class HAERoleTest(HAETest): + def __init__(self, cm): + '''Lars' mount/unmount test for the HA extension. ''' + HAETest.__init__(self,cm) + self.name = "HAERoleTest" + + def change_state(self, node, resource, target): + (rc, _) = self.rsh(node, "crm_resource -V -r %s -p target-role -v %s --meta" % (resource, target)) + return rc + + def __call__(self, node): + self.incr("calls") + lpc = 0 + failed = 0 + delay = 2 + done = time.time() + self.Env["loop-minutes"]*60 + self.find_hae_resources(node) + + clone_max = len(self.Env["nodes"]) + while time.time() <= done and not failed: + lpc = lpc + 1 + + self.change_state(node, self.r_dlm, "Stopped") + if not self.wait_on_state(node, self.r_dlm, 0): + self.failure("%s did not go down correctly" % self.r_dlm) + failed = lpc + + self.change_state(node, self.r_dlm, "Started") + if not self.wait_on_state(node, self.r_dlm, clone_max): + self.failure("%s did not come up correctly" % self.r_dlm) + failed = lpc + + if not self.wait_on_state(node, self.r_o2cb, clone_max): + self.failure("%s did not come up correctly" % self.r_o2cb) + failed = lpc + + for fs in self.r_ocfs2: + if not self.wait_on_state(node, fs, clone_max): + self.failure("%s did not come up correctly" % fs) + failed = lpc + + if failed: + return self.failure("iteration %d failed" % failed) + return self.success() + +AllTestClasses.append(HAERoleTest) + + +class HAEStandbyTest(HAETest): + '''Set up a custom test to cause quorum failure issues for Andrew''' + def __init__(self, cm): + HAETest.__init__(self,cm) + self.name = "HAEStandbyTest" + + def change_state(self, node, resource, target): + (rc, _) = self.rsh(node, "crm_standby -V -l reboot -v %s" % (target)) + return rc + + def __call__(self, node): + self.incr("calls") + + lpc = 0 + failed = 0 + done = time.time() + self.Env["loop-minutes"]*60 + self.find_hae_resources(node) + + clone_max = len(self.Env["nodes"]) + while time.time() <= done and not failed: + lpc = lpc + 1 + + self.change_state(node, self.r_dlm, "true") + if not self.wait_on_state(node, self.r_dlm, clone_max-1): + self.failure("%s did not go down correctly" % self.r_dlm) + failed = lpc + + self.change_state(node, self.r_dlm, "false") + if not self.wait_on_state(node, self.r_dlm, clone_max): + self.failure("%s did not come up correctly" % self.r_dlm) + failed = lpc + + if not self.wait_on_state(node, self.r_o2cb, clone_max): + self.failure("%s did not come up correctly" % self.r_o2cb) + failed = lpc + + for fs in self.r_ocfs2: + if not self.wait_on_state(node, fs, clone_max): + self.failure("%s did not come up correctly" % fs) + failed = lpc + + if failed: + return self.failure("iteration %d failed" % failed) + return self.success() + +AllTestClasses.append(HAEStandbyTest) + + +class NearQuorumPointTest(CTSTest): + ''' + This test brings larger clusters near the quorum point (50%). + In addition, it will test doing starts and stops at the same time. + + Here is how I think it should work: + - loop over the nodes and decide randomly which will be up and which + will be down Use a 50% probability for each of up/down. + - figure out what to do to get into that state from the current state + - in parallel, bring up those going up and bring those going down. + ''' + + def __init__(self, cm): + CTSTest.__init__(self,cm) + self.name = "NearQuorumPoint" + + def __call__(self, dummy): + '''Perform the 'NearQuorumPoint' test. ''' + self.incr("calls") + startset = [] + stopset = [] + + stonith = self.CM.prepare_fencing_watcher("NearQuorumPoint") + #decide what to do with each node + for node in self.Env["nodes"]: + action = self.Env.random_gen.choice(["start","stop"]) + #action = self.Env.random_gen.choice(["start","stop","no change"]) + if action == "start" : + startset.append(node) + elif action == "stop" : + stopset.append(node) + + self.debug("start nodes:" + repr(startset)) + self.debug("stop nodes:" + repr(stopset)) + + #add search patterns + watchpats = [ ] + for node in stopset: + if self.CM.ShouldBeStatus[node] == "up": + watchpats.append(self.templates["Pat:We_stopped"] % node) + + for node in startset: + if self.CM.ShouldBeStatus[node] == "down": + #watchpats.append(self.templates["Pat:NonDC_started"] % node) + watchpats.append(self.templates["Pat:Local_started"] % node) + else: + for stopping in stopset: + if self.CM.ShouldBeStatus[stopping] == "up": + watchpats.append(self.templates["Pat:They_stopped"] % (node, self.CM.key_for_node(stopping))) + + if len(watchpats) == 0: + return self.skipped() + + if len(startset) != 0: + watchpats.append(self.templates["Pat:DC_IDLE"]) + + watch = self.create_watch(watchpats, self.Env["DeadTime"]+10) + + watch.set_watch() + + #begin actions + for node in stopset: + if self.CM.ShouldBeStatus[node] == "up": + self.CM.StopaCMnoBlock(node) + + for node in startset: + if self.CM.ShouldBeStatus[node] == "down": + self.CM.StartaCMnoBlock(node) + + #get the result + if watch.look_for_all(): + self.CM.cluster_stable() + self.CM.fencing_cleanup("NearQuorumPoint", stonith) + return self.success() + + self.logger.log("Warn: Patterns not found: " + repr(watch.unmatched)) + + #get the "bad" nodes + upnodes = [] + for node in stopset: + if self.CM.StataCM(node) == 1: + upnodes.append(node) + + downnodes = [] + for node in startset: + if self.CM.StataCM(node) == 0: + downnodes.append(node) + + self.CM.fencing_cleanup("NearQuorumPoint", stonith) + if upnodes == [] and downnodes == []: + self.CM.cluster_stable() + + # Make sure they're completely down with no residule + for node in stopset: + self.rsh(node, self.templates["StopCmd"]) + + return self.success() + + if len(upnodes) > 0: + self.logger.log("Warn: Unstoppable nodes: " + repr(upnodes)) + + if len(downnodes) > 0: + self.logger.log("Warn: Unstartable nodes: " + repr(downnodes)) + + return self.failure() + + def is_applicable(self): + return True + +AllTestClasses.append(NearQuorumPointTest) + + +class RollingUpgradeTest(CTSTest): + '''Perform a rolling upgrade of the cluster''' + def __init__(self, cm): + CTSTest.__init__(self,cm) + self.name = "RollingUpgrade" + self.start = StartTest(cm) + self.stop = StopTest(cm) + self.stopall = SimulStopLite(cm) + self.startall = SimulStartLite(cm) + + def setup(self, node): + # Start all remaining nodes + ret = self.stopall(None) + if not ret: + return self.failure("Couldn't stop all nodes") + + for node in self.Env["nodes"]: + if not self.downgrade(node, None): + return self.failure("Couldn't downgrade %s" % node) + + ret = self.startall(None) + if not ret: + return self.failure("Couldn't start all nodes") + return self.success() + + def teardown(self, node): + # Stop everything + ret = self.stopall(None) + if not ret: + return self.failure("Couldn't stop all nodes") + + for node in self.Env["nodes"]: + if not self.upgrade(node, None): + return self.failure("Couldn't upgrade %s" % node) + + return self.success() + + def install(self, node, version, start=1, flags="--force"): + + target_dir = "/tmp/rpm-%s" % version + src_dir = "%s/%s" % (self.Env["rpm-dir"], version) + + self.logger.log("Installing %s on %s with %s" % (version, node, flags)) + if not self.stop(node): + return self.failure("stop failure: "+node) + + self.rsh(node, "mkdir -p %s" % target_dir) + self.rsh(node, "rm -f %s/*.rpm" % target_dir) + (_, lines) = self.rsh(node, "ls -1 %s/*.rpm" % src_dir, verbose=1) + for line in lines: + line = line[:-1] + rc = self.rsh.copy("%s" % (line), "%s:%s/" % (node, target_dir)) + self.rsh(node, "rpm -Uvh %s %s/*.rpm" % (flags, target_dir)) + + if start and not self.start(node): + return self.failure("start failure: "+node) + + return self.success() + + def upgrade(self, node, start=1): + return self.install(node, self.Env["current-version"], start) + + def downgrade(self, node, start=1): + return self.install(node, self.Env["previous-version"], start, "--force --nodeps") + + def __call__(self, node): + '''Perform the 'Rolling Upgrade' test. ''' + self.incr("calls") + + for node in self.Env["nodes"]: + if self.upgrade(node): + return self.failure("Couldn't upgrade %s" % node) + + self.CM.cluster_stable() + + return self.success() + + def is_applicable(self): + if not self.is_applicable_common(): + return None + + if not "rpm-dir" in list(self.Env.keys()): + return None + if not "current-version" in list(self.Env.keys()): + return None + if not "previous-version" in list(self.Env.keys()): + return None + + return 1 + +# Register RestartTest as a good test to run +AllTestClasses.append(RollingUpgradeTest) + + +class BSC_AddResource(CTSTest): + '''Add a resource to the cluster''' + def __init__(self, cm): + CTSTest.__init__(self, cm) + self.name = "AddResource" + self.resource_offset = 0 + self.cib_cmd = """cibadmin -C -o %s -X '%s' """ + + def __call__(self, node): + self.incr("calls") + self.resource_offset = self.resource_offset + 1 + + r_id = "bsc-rsc-%s-%d" % (node, self.resource_offset) + start_pat = "pacemaker-controld.*%s_start_0.*confirmed.*ok" + + patterns = [] + patterns.append(start_pat % r_id) + + watch = self.create_watch(patterns, self.Env["DeadTime"]) + watch.set_watch() + + ip = self.NextIP() + if not self.make_ip_resource(node, r_id, "ocf", "IPaddr", ip): + return self.failure("Make resource %s failed" % r_id) + + failed = 0 + watch_result = watch.look_for_all() + if watch.unmatched: + for regex in watch.unmatched: + self.logger.log ("Warn: Pattern not found: %s" % (regex)) + failed = 1 + + if failed: + return self.failure("Resource pattern(s) not found") + + if not self.CM.cluster_stable(self.Env["DeadTime"]): + return self.failure("Unstable cluster") + + return self.success() + + def NextIP(self): + ip = self.Env["IPBase"] + if ":" in ip: + fields = ip.rpartition(":") + fields[2] = str(hex(int(fields[2], 16)+1)) + print(str(hex(int(f[2], 16)+1))) + else: + fields = ip.rpartition('.') + fields[2] = str(int(fields[2])+1) + + ip = fields[0] + fields[1] + fields[3]; + self.Env["IPBase"] = ip + return ip.strip() + + def make_ip_resource(self, node, id, rclass, type, ip): + self.logger.log("Creating %s:%s:%s (%s) on %s" % (rclass,type,id,ip,node)) + rsc_xml=""" +<primitive id="%s" class="%s" type="%s" provider="heartbeat"> + <instance_attributes id="%s"><attributes> + <nvpair id="%s" name="ip" value="%s"/> + </attributes></instance_attributes> +</primitive>""" % (id, rclass, type, id, id, ip) + + node_constraint = """ + <rsc_location id="run_%s" rsc="%s"> + <rule id="pref_run_%s" score="100"> + <expression id="%s_loc_expr" attribute="#uname" operation="eq" value="%s"/> + </rule> + </rsc_location>""" % (id, id, id, id, node) + + rc = 0 + (rc, _) = self.rsh(node, self.cib_cmd % ("constraints", node_constraint), verbose=1) + if rc != 0: + self.logger.log("Constraint creation failed: %d" % rc) + return None + + (rc, _) = self.rsh(node, self.cib_cmd % ("resources", rsc_xml), verbose=1) + if rc != 0: + self.logger.log("Resource creation failed: %d" % rc) + return None + + return 1 + + def is_applicable(self): + if self.Env["DoBSC"]: + return True + return None + +AllTestClasses.append(BSC_AddResource) + + +class SimulStopLite(CTSTest): + '''Stop any active nodes ~ simultaneously''' + def __init__(self, cm): + CTSTest.__init__(self,cm) + self.name = "SimulStopLite" + + def __call__(self, dummy): + '''Perform the 'SimulStopLite' setup work. ''' + self.incr("calls") + + self.debug("Setup: " + self.name) + + # We ignore the "node" parameter... + watchpats = [ ] + + for node in self.Env["nodes"]: + if self.CM.ShouldBeStatus[node] == "up": + self.incr("WasStarted") + watchpats.append(self.templates["Pat:We_stopped"] % node) + + if len(watchpats) == 0: + return self.success() + + # Stop all the nodes - at about the same time... + watch = self.create_watch(watchpats, self.Env["DeadTime"]+10) + + watch.set_watch() + self.set_timer() + for node in self.Env["nodes"]: + if self.CM.ShouldBeStatus[node] == "up": + self.CM.StopaCMnoBlock(node) + if watch.look_for_all(): + # Make sure they're completely down with no residule + for node in self.Env["nodes"]: + self.rsh(node, self.templates["StopCmd"]) + + return self.success() + + did_fail = 0 + up_nodes = [] + for node in self.Env["nodes"]: + if self.CM.StataCM(node) == 1: + did_fail = 1 + up_nodes.append(node) + + if did_fail: + return self.failure("Active nodes exist: " + repr(up_nodes)) + + self.logger.log("Warn: All nodes stopped but CTS didn't detect: " + + repr(watch.unmatched)) + + return self.failure("Missing log message: "+repr(watch.unmatched)) + + def is_applicable(self): + '''SimulStopLite is a setup test and never applicable''' + return False + + +class SimulStartLite(CTSTest): + '''Start any stopped nodes ~ simultaneously''' + def __init__(self, cm): + CTSTest.__init__(self,cm) + self.name = "SimulStartLite" + + def __call__(self, dummy): + '''Perform the 'SimulStartList' setup work. ''' + self.incr("calls") + self.debug("Setup: " + self.name) + + # We ignore the "node" parameter... + node_list = [] + for node in self.Env["nodes"]: + if self.CM.ShouldBeStatus[node] == "down": + self.incr("WasStopped") + node_list.append(node) + + self.set_timer() + while len(node_list) > 0: + # Repeat until all nodes come up + watchpats = [ ] + + uppat = self.templates["Pat:NonDC_started"] + if self.CM.upcount() == 0: + uppat = self.templates["Pat:Local_started"] + + watchpats.append(self.templates["Pat:DC_IDLE"]) + for node in node_list: + watchpats.append(uppat % node) + watchpats.append(self.templates["Pat:InfraUp"] % node) + watchpats.append(self.templates["Pat:PacemakerUp"] % node) + + # Start all the nodes - at about the same time... + watch = self.create_watch(watchpats, self.Env["DeadTime"]+10) + watch.set_watch() + + stonith = self.CM.prepare_fencing_watcher(self.name) + + for node in node_list: + self.CM.StartaCMnoBlock(node) + + watch.look_for_all() + + node_list = self.CM.fencing_cleanup(self.name, stonith) + + if node_list == None: + return self.failure("Cluster did not stabilize") + + # Remove node_list messages from watch.unmatched + for node in node_list: + self.logger.debug("Dealing with stonith operations for %s" % repr(node_list)) + if watch.unmatched: + try: + watch.unmatched.remove(uppat % node) + except: + self.debug("Already matched: %s" % (uppat % node)) + try: + watch.unmatched.remove(self.templates["Pat:InfraUp"] % node) + except: + self.debug("Already matched: %s" % (self.templates["Pat:InfraUp"] % node)) + try: + watch.unmatched.remove(self.templates["Pat:PacemakerUp"] % node) + except: + self.debug("Already matched: %s" % (self.templates["Pat:PacemakerUp"] % node)) + + if watch.unmatched: + for regex in watch.unmatched: + self.logger.log ("Warn: Startup pattern not found: %s" %(regex)) + + if not self.CM.cluster_stable(): + return self.failure("Cluster did not stabilize") + + did_fail = 0 + unstable = [] + for node in self.Env["nodes"]: + if self.CM.StataCM(node) == 0: + did_fail = 1 + unstable.append(node) + + if did_fail: + return self.failure("Unstarted nodes exist: " + repr(unstable)) + + unstable = [] + for node in self.Env["nodes"]: + if not self.CM.node_stable(node): + did_fail = 1 + unstable.append(node) + + if did_fail: + return self.failure("Unstable cluster nodes exist: " + repr(unstable)) + + return self.success() + + def is_applicable(self): + '''SimulStartLite is a setup test and never applicable''' + return False + + +def TestList(cm, audits): + result = [] + for testclass in AllTestClasses: + bound_test = testclass(cm) + if bound_test.is_applicable(): + bound_test.Audits = audits + result.append(bound_test) + return result + + +class RemoteLXC(CTSTest): + def __init__(self, cm): + CTSTest.__init__(self,cm) + self.name = "RemoteLXC" + self.start = StartTest(cm) + self.startall = SimulStartLite(cm) + self.num_containers = 2 + self.is_container = 1 + self.failed = 0 + self.fail_string = "" + + def start_lxc_simple(self, node): + + # restore any artifacts laying around from a previous test. + self.rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -s -R &>/dev/null") + + # generate the containers, put them in the config, add some resources to them + pats = [ ] + watch = self.create_watch(pats, 120) + watch.set_watch() + pats.append(self.templates["Pat:RscOpOK"] % ("start", "lxc1")) + pats.append(self.templates["Pat:RscOpOK"] % ("start", "lxc2")) + pats.append(self.templates["Pat:RscOpOK"] % ("start", "lxc-ms")) + pats.append(self.templates["Pat:RscOpOK"] % ("promote", "lxc-ms")) + + self.rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -g -a -m -s -c %d &>/dev/null" % self.num_containers) + self.set_timer("remoteSimpleInit") + watch.look_for_all() + self.log_timer("remoteSimpleInit") + if watch.unmatched: + self.fail_string = "Unmatched patterns: %s" % (repr(watch.unmatched)) + self.failed = 1 + + def cleanup_lxc_simple(self, node): + + pats = [ ] + # if the test failed, attempt to clean up the cib and libvirt environment + # as best as possible + if self.failed == 1: + # restore libvirt and cib + self.rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -s -R &>/dev/null") + return + + watch = self.create_watch(pats, 120) + watch.set_watch() + + pats.append(self.templates["Pat:RscOpOK"] % ("stop", "container1")) + pats.append(self.templates["Pat:RscOpOK"] % ("stop", "container2")) + + self.rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -p &>/dev/null") + self.set_timer("remoteSimpleCleanup") + watch.look_for_all() + self.log_timer("remoteSimpleCleanup") + + if watch.unmatched: + self.fail_string = "Unmatched patterns: %s" % (repr(watch.unmatched)) + self.failed = 1 + + # cleanup libvirt + self.rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -s -R &>/dev/null") + + def __call__(self, node): + '''Perform the 'RemoteLXC' test. ''' + self.incr("calls") + + ret = self.startall(None) + if not ret: + return self.failure("Setup failed, start all nodes failed.") + + (rc, _) = self.rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -v &>/dev/null") + if rc == 1: + self.log("Environment test for lxc support failed.") + return self.skipped() + + self.start_lxc_simple(node) + self.cleanup_lxc_simple(node) + + self.debug("Waiting for the cluster to recover") + self.CM.cluster_stable() + + if self.failed == 1: + return self.failure(self.fail_string) + + return self.success() + + def errorstoignore(self): + '''Return list of errors which should be ignored''' + return [ + r"Updating failcount for ping", + r"schedulerd.*: Recover\s+(ping|lxc-ms|container)\s+\(.*\)", + # The orphaned lxc-ms resource causes an expected transition error + # that is a result of the scheduler not having knowledge that the + # promotable resource used to be a clone. As a result, it looks like that + # resource is running in multiple locations when it shouldn't... But in + # this instance we know why this error is occurring and that it is expected. + r"Calculated [Tt]ransition .*pe-error", + r"Resource lxc-ms .* is active on 2 nodes attempting recovery", + r"Unknown operation: fail", + r"VirtualDomain.*ERROR: Unable to determine emulator", + ] + +AllTestClasses.append(RemoteLXC) + + +class RemoteDriver(CTSTest): + + def __init__(self, cm): + CTSTest.__init__(self,cm) + self.name = self.__class__.__name__ + self.start = StartTest(cm) + self.startall = SimulStartLite(cm) + self.stop = StopTest(cm) + self.remote_rsc = "remote-rsc" + self.cib_cmd = """cibadmin -C -o %s -X '%s' """ + self.reset() + + def reset(self): + self.pcmk_started = 0 + self.failed = False + self.fail_string = "" + self.remote_node_added = 0 + self.remote_rsc_added = 0 + self.remote_use_reconnect_interval = self.Env.random_gen.choice([True,False]) + + def fail(self, msg): + """ Mark test as failed. """ + + self.failed = True + + # Always log the failure. + self.logger.log(msg) + + # Use first failure as test status, as it's likely to be most useful. + if not self.fail_string: + self.fail_string = msg + + def get_othernode(self, node): + for othernode in self.Env["nodes"]: + if othernode == node: + # we don't want to try and use the cib that we just shutdown. + # find a cluster node that is not our soon to be remote-node. + continue + else: + return othernode + + def del_rsc(self, node, rsc): + othernode = self.get_othernode(node) + (rc, _) = self.rsh(othernode, "crm_resource -D -r %s -t primitive" % (rsc)) + if rc != 0: + self.fail("Removal of resource '%s' failed" % rsc) + + def add_rsc(self, node, rsc_xml): + othernode = self.get_othernode(node) + (rc, _) = self.rsh(othernode, self.cib_cmd % ("resources", rsc_xml)) + if rc != 0: + self.fail("resource creation failed") + + def add_primitive_rsc(self, node): + rsc_xml = """ +<primitive class="ocf" id="%(node)s" provider="heartbeat" type="Dummy"> + <meta_attributes id="%(node)s-meta_attributes"/> + <operations> + <op id="%(node)s-monitor-interval-20s" interval="20s" name="monitor"/> + </operations> +</primitive>""" % { "node": self.remote_rsc } + self.add_rsc(node, rsc_xml) + if not self.failed: + self.remote_rsc_added = 1 + + def add_connection_rsc(self, node): + rsc_xml = """ +<primitive class="ocf" id="%(node)s" provider="pacemaker" type="remote"> + <instance_attributes id="%(node)s-instance_attributes"> + <nvpair id="%(node)s-instance_attributes-server" name="server" value="%(server)s"/> +""" % { "node": self.remote_node, "server": node } + + if self.remote_use_reconnect_interval: + # Set reconnect interval on resource + rsc_xml = rsc_xml + """ + <nvpair id="%s-instance_attributes-reconnect_interval" name="reconnect_interval" value="60s"/> +""" % (self.remote_node) + + rsc_xml = rsc_xml + """ + </instance_attributes> + <operations> + <op id="%(node)s-start" name="start" interval="0" timeout="120s"/> + <op id="%(node)s-monitor-20s" name="monitor" interval="20s" timeout="45s"/> + </operations> +</primitive> +""" % { "node": self.remote_node } + + self.add_rsc(node, rsc_xml) + if not self.failed: + self.remote_node_added = 1 + + def disable_services(self, node): + self.corosync_enabled = self.Env.service_is_enabled(node, "corosync") + if self.corosync_enabled: + self.Env.disable_service(node, "corosync") + + self.pacemaker_enabled = self.Env.service_is_enabled(node, "pacemaker") + if self.pacemaker_enabled: + self.Env.disable_service(node, "pacemaker") + + def restore_services(self, node): + if self.corosync_enabled: + self.Env.enable_service(node, "corosync") + + if self.pacemaker_enabled: + self.Env.enable_service(node, "pacemaker") + + def stop_pcmk_remote(self, node): + # disable pcmk remote + for i in range(10): + (rc, _) = self.rsh(node, "service pacemaker_remote stop") + if rc != 0: + time.sleep(6) + else: + break + + def start_pcmk_remote(self, node): + for i in range(10): + (rc, _) = self.rsh(node, "service pacemaker_remote start") + if rc != 0: + time.sleep(6) + else: + self.pcmk_started = 1 + break + + def freeze_pcmk_remote(self, node): + """ Simulate a Pacemaker Remote daemon failure. """ + + # We freeze the process. + self.rsh(node, "killall -STOP pacemaker-remoted") + + def resume_pcmk_remote(self, node): + # We resume the process. + self.rsh(node, "killall -CONT pacemaker-remoted") + + def start_metal(self, node): + # Cluster nodes are reused as remote nodes in remote tests. If cluster + # services were enabled at boot, in case the remote node got fenced, the + # cluster node would join instead of the expected remote one. Meanwhile + # pacemaker_remote would not be able to start. Depending on the chances, + # the situations might not be able to be orchestrated gracefully any more. + # + # Temporarily disable any enabled cluster serivces. + self.disable_services(node) + + pcmk_started = 0 + + # make sure the resource doesn't already exist for some reason + self.rsh(node, "crm_resource -D -r %s -t primitive" % (self.remote_rsc)) + self.rsh(node, "crm_resource -D -r %s -t primitive" % (self.remote_node)) + + if not self.stop(node): + self.fail("Failed to shutdown cluster node %s" % node) + return + + self.start_pcmk_remote(node) + + if self.pcmk_started == 0: + self.fail("Failed to start pacemaker_remote on node %s" % node) + return + + # Convert node to baremetal now that it has shutdown the cluster stack + pats = [ ] + watch = self.create_watch(pats, 120) + watch.set_watch() + pats.append(self.templates["Pat:RscOpOK"] % ("start", self.remote_node)) + pats.append(self.templates["Pat:DC_IDLE"]) + + self.add_connection_rsc(node) + + self.set_timer("remoteMetalInit") + watch.look_for_all() + self.log_timer("remoteMetalInit") + if watch.unmatched: + self.fail("Unmatched patterns: %s" % watch.unmatched) + + def migrate_connection(self, node): + if self.failed: + return + + pats = [ ] + pats.append(self.templates["Pat:RscOpOK"] % ("migrate_to", self.remote_node)) + pats.append(self.templates["Pat:RscOpOK"] % ("migrate_from", self.remote_node)) + pats.append(self.templates["Pat:DC_IDLE"]) + watch = self.create_watch(pats, 120) + watch.set_watch() + + (rc, _) = self.rsh(node, "crm_resource -M -r %s" % (self.remote_node), verbose=1) + if rc != 0: + self.fail("failed to move remote node connection resource") + return + + self.set_timer("remoteMetalMigrate") + watch.look_for_all() + self.log_timer("remoteMetalMigrate") + + if watch.unmatched: + self.fail("Unmatched patterns: %s" % watch.unmatched) + return + + def fail_rsc(self, node): + if self.failed: + return + + watchpats = [ ] + watchpats.append(self.templates["Pat:RscRemoteOpOK"] % ("stop", self.remote_rsc, self.remote_node)) + watchpats.append(self.templates["Pat:RscRemoteOpOK"] % ("start", self.remote_rsc, self.remote_node)) + watchpats.append(self.templates["Pat:DC_IDLE"]) + + watch = self.create_watch(watchpats, 120) + watch.set_watch() + + self.debug("causing dummy rsc to fail.") + + self.rsh(node, "rm -f /var/run/resource-agents/Dummy*") + + self.set_timer("remoteRscFail") + watch.look_for_all() + self.log_timer("remoteRscFail") + if watch.unmatched: + self.fail("Unmatched patterns during rsc fail: %s" % watch.unmatched) + + def fail_connection(self, node): + if self.failed: + return + + watchpats = [ ] + watchpats.append(self.templates["Pat:Fencing_ok"] % self.remote_node) + watchpats.append(self.templates["Pat:NodeFenced"] % self.remote_node) + + watch = self.create_watch(watchpats, 120) + watch.set_watch() + + # freeze the pcmk remote daemon. this will result in fencing + self.debug("Force stopped active remote node") + self.freeze_pcmk_remote(node) + + self.debug("Waiting for remote node to be fenced.") + self.set_timer("remoteMetalFence") + watch.look_for_all() + self.log_timer("remoteMetalFence") + if watch.unmatched: + self.fail("Unmatched patterns: %s" % watch.unmatched) + return + + self.debug("Waiting for the remote node to come back up") + self.CM.ns.wait_for_node(node, 120); + + pats = [ ] + watch = self.create_watch(pats, 240) + watch.set_watch() + pats.append(self.templates["Pat:RscOpOK"] % ("start", self.remote_node)) + if self.remote_rsc_added == 1: + pats.append(self.templates["Pat:RscRemoteOpOK"] % ("start", self.remote_rsc, self.remote_node)) + + # start the remote node again watch it integrate back into cluster. + self.start_pcmk_remote(node) + if self.pcmk_started == 0: + self.fail("Failed to start pacemaker_remote on node %s" % node) + return + + self.debug("Waiting for remote node to rejoin cluster after being fenced.") + self.set_timer("remoteMetalRestart") + watch.look_for_all() + self.log_timer("remoteMetalRestart") + if watch.unmatched: + self.fail("Unmatched patterns: %s" % watch.unmatched) + return + + def add_dummy_rsc(self, node): + if self.failed: + return + + # verify we can put a resource on the remote node + pats = [ ] + watch = self.create_watch(pats, 120) + watch.set_watch() + pats.append(self.templates["Pat:RscRemoteOpOK"] % ("start", self.remote_rsc, self.remote_node)) + pats.append(self.templates["Pat:DC_IDLE"]) + + # Add a resource that must live on remote-node + self.add_primitive_rsc(node) + + # force that rsc to prefer the remote node. + (rc, _) = self.CM.rsh(node, "crm_resource -M -r %s -N %s -f" % (self.remote_rsc, self.remote_node), verbose=1) + if rc != 0: + self.fail("Failed to place remote resource on remote node.") + return + + self.set_timer("remoteMetalRsc") + watch.look_for_all() + self.log_timer("remoteMetalRsc") + if watch.unmatched: + self.fail("Unmatched patterns: %s" % watch.unmatched) + + def test_attributes(self, node): + if self.failed: + return + + # This verifies permanent attributes can be set on a remote-node. It also + # verifies the remote-node can edit its own cib node section remotely. + (rc, line) = self.CM.rsh(node, "crm_attribute -l forever -n testattr -v testval -N %s" % (self.remote_node), verbose=1) + if rc != 0: + self.fail("Failed to set remote-node attribute. rc:%s output:%s" % (rc, line)) + return + + (rc, _) = self.CM.rsh(node, "crm_attribute -l forever -n testattr -q -N %s" % (self.remote_node), verbose=1) + if rc != 0: + self.fail("Failed to get remote-node attribute") + return + + (rc, _) = self.CM.rsh(node, "crm_attribute -l forever -n testattr -D -N %s" % (self.remote_node), verbose=1) + if rc != 0: + self.fail("Failed to delete remote-node attribute") + return + + def cleanup_metal(self, node): + self.restore_services(node) + + if self.pcmk_started == 0: + return + + pats = [ ] + + watch = self.create_watch(pats, 120) + watch.set_watch() + + if self.remote_rsc_added == 1: + pats.append(self.templates["Pat:RscOpOK"] % ("stop", self.remote_rsc)) + if self.remote_node_added == 1: + pats.append(self.templates["Pat:RscOpOK"] % ("stop", self.remote_node)) + + self.set_timer("remoteMetalCleanup") + + self.resume_pcmk_remote(node) + + if self.remote_rsc_added == 1: + + # Remove dummy resource added for remote node tests + self.debug("Cleaning up dummy rsc put on remote node") + self.rsh(self.get_othernode(node), "crm_resource -U -r %s" % self.remote_rsc) + self.del_rsc(node, self.remote_rsc) + + if self.remote_node_added == 1: + + # Remove remote node's connection resource + self.debug("Cleaning up remote node connection resource") + self.rsh(self.get_othernode(node), "crm_resource -U -r %s" % (self.remote_node)) + self.del_rsc(node, self.remote_node) + + watch.look_for_all() + self.log_timer("remoteMetalCleanup") + + if watch.unmatched: + self.fail("Unmatched patterns: %s" % watch.unmatched) + + self.stop_pcmk_remote(node) + + self.debug("Waiting for the cluster to recover") + self.CM.cluster_stable() + + if self.remote_node_added == 1: + # Remove remote node itself + self.debug("Cleaning up node entry for remote node") + self.rsh(self.get_othernode(node), "crm_node --force --remove %s" % self.remote_node) + + def setup_env(self, node): + + self.remote_node = "remote-%s" % (node) + + # we are assuming if all nodes have a key, that it is + # the right key... If any node doesn't have a remote + # key, we regenerate it everywhere. + if self.rsh.exists_on_all("/etc/pacemaker/authkey", self.Env["nodes"]): + return + + # create key locally + (handle, keyfile) = tempfile.mkstemp(".cts") + os.close(handle) + subprocess.check_call(["dd", "if=/dev/urandom", "of=%s" % keyfile, "bs=4096", "count=1"], + stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + + # sync key throughout the cluster + for node in self.Env["nodes"]: + self.rsh(node, "mkdir -p --mode=0750 /etc/pacemaker") + self.rsh.copy(keyfile, "root@%s:/etc/pacemaker/authkey" % node) + self.rsh(node, "chgrp haclient /etc/pacemaker /etc/pacemaker/authkey") + self.rsh(node, "chmod 0640 /etc/pacemaker/authkey") + os.unlink(keyfile) + + def is_applicable(self): + if not self.is_applicable_common(): + return False + + for node in self.Env["nodes"]: + (rc, _) = self.rsh(node, "which pacemaker-remoted >/dev/null 2>&1") + if rc != 0: + return False + return True + + def start_new_test(self, node): + self.incr("calls") + self.reset() + + ret = self.startall(None) + if not ret: + return self.failure("setup failed: could not start all nodes") + + self.setup_env(node) + self.start_metal(node) + self.add_dummy_rsc(node) + return True + + def __call__(self, node): + return self.failure("This base class is not meant to be called directly.") + + def errorstoignore(self): + '''Return list of errors which should be ignored''' + return [ r"""is running on remote.*which isn't allowed""", + r"""Connection terminated""", + r"""Could not send remote""", + ] + +# RemoteDriver is just a base class for other tests, so it is not added to AllTestClasses + + +class RemoteBasic(RemoteDriver): + + def __call__(self, node): + '''Perform the 'RemoteBaremetal' test. ''' + + if not self.start_new_test(node): + return self.failure(self.fail_string) + + self.test_attributes(node) + self.cleanup_metal(node) + + self.debug("Waiting for the cluster to recover") + self.CM.cluster_stable() + if self.failed: + return self.failure(self.fail_string) + + return self.success() + +AllTestClasses.append(RemoteBasic) + +class RemoteStonithd(RemoteDriver): + + def __call__(self, node): + '''Perform the 'RemoteStonithd' test. ''' + + if not self.start_new_test(node): + return self.failure(self.fail_string) + + self.fail_connection(node) + self.cleanup_metal(node) + + self.debug("Waiting for the cluster to recover") + self.CM.cluster_stable() + if self.failed: + return self.failure(self.fail_string) + + return self.success() + + def is_applicable(self): + if not RemoteDriver.is_applicable(self): + return False + + if "DoFencing" in list(self.Env.keys()): + return self.Env["DoFencing"] + + return True + + def errorstoignore(self): + ignore_pats = [ + r"Lost connection to Pacemaker Remote node", + r"Software caused connection abort", + r"pacemaker-controld.*:\s+error.*: Operation remote-.*_monitor", + r"pacemaker-controld.*:\s+error.*: Result of monitor operation for remote-.*", + r"schedulerd.*:\s+Recover\s+remote-.*\s+\(.*\)", + r"error: Result of monitor operation for .* on remote-.*: Internal communication failure", + ] + + ignore_pats.extend(RemoteDriver.errorstoignore(self)) + return ignore_pats + +AllTestClasses.append(RemoteStonithd) + + +class RemoteMigrate(RemoteDriver): + + def __call__(self, node): + '''Perform the 'RemoteMigrate' test. ''' + + if not self.start_new_test(node): + return self.failure(self.fail_string) + + self.migrate_connection(node) + self.cleanup_metal(node) + + self.debug("Waiting for the cluster to recover") + self.CM.cluster_stable() + if self.failed: + return self.failure(self.fail_string) + + return self.success() + + def is_applicable(self): + if not RemoteDriver.is_applicable(self): + return 0 + # This test requires at least three nodes: one to convert to a + # remote node, one to host the connection originally, and one + # to migrate the connection to. + if len(self.Env["nodes"]) < 3: + return 0 + return 1 + +AllTestClasses.append(RemoteMigrate) + + +class RemoteRscFailure(RemoteDriver): + + def __call__(self, node): + '''Perform the 'RemoteRscFailure' test. ''' + + if not self.start_new_test(node): + return self.failure(self.fail_string) + + # This is an important step. We are migrating the connection + # before failing the resource. This verifies that the migration + # has properly maintained control over the remote-node. + self.migrate_connection(node) + + self.fail_rsc(node) + self.cleanup_metal(node) + + self.debug("Waiting for the cluster to recover") + self.CM.cluster_stable() + if self.failed: + return self.failure(self.fail_string) + + return self.success() + + def errorstoignore(self): + ignore_pats = [ + r"schedulerd.*: Recover\s+remote-rsc\s+\(.*\)", + r"Dummy.*: No process state file found", + ] + + ignore_pats.extend(RemoteDriver.errorstoignore(self)) + return ignore_pats + + def is_applicable(self): + if not RemoteDriver.is_applicable(self): + return 0 + # This test requires at least three nodes: one to convert to a + # remote node, one to host the connection originally, and one + # to migrate the connection to. + if len(self.Env["nodes"]) < 3: + return 0 + return 1 + +AllTestClasses.append(RemoteRscFailure) + +# vim:ts=4:sw=4:et: diff --git a/cts/lab/ClusterManager.py b/cts/lab/ClusterManager.py new file mode 100644 index 0000000..fda4cfb --- /dev/null +++ b/cts/lab/ClusterManager.py @@ -0,0 +1,940 @@ +""" ClusterManager class for Pacemaker's Cluster Test Suite (CTS) +""" + +__copyright__ = """Copyright 2000-2023 the Pacemaker project contributors. +Certain portions by Huang Zhen <zhenhltc@cn.ibm.com> are copyright 2004 +International Business Machines. The version control history for this file +may have further details.""" +__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" + +import os +import re +import time + +from collections import UserDict + +from cts.CIB import ConfigFactory +from cts.CTStests import AuditResource + +from pacemaker.buildoptions import BuildOptions +from pacemaker._cts.CTS import NodeStatus, Process +from pacemaker._cts.environment import EnvFactory +from pacemaker._cts.logging import LogFactory +from pacemaker._cts.patterns import PatternSelector +from pacemaker._cts.remote import RemoteFactory +from pacemaker._cts.watcher import LogWatcher + +class ClusterManager(UserDict): + '''The Cluster Manager class. + This is an subclass of the Python dictionary class. + (this is because it contains lots of {name,value} pairs, + not because it's behavior is that terribly similar to a + dictionary in other ways.) + + This is an abstract class which class implements high-level + operations on the cluster and/or its cluster managers. + Actual cluster managers classes are subclassed from this type. + + One of the things we do is track the state we think every node should + be in. + ''' + + def __InitialConditions(self): + #if os.geteuid() != 0: + # raise ValueError("Must Be Root!") + None + + def _finalConditions(self): + for key in list(self.keys()): + if self[key] == None: + raise ValueError("Improper derivation: self[" + key + "] must be overridden by subclass.") + + def __init__(self): + self.Env = EnvFactory().getInstance() + self.templates = PatternSelector(self.Env["Name"]) + self.__InitialConditions() + self.logger = LogFactory() + self.TestLoggingLevel=0 + self.data = {} + self.name = self.Env["Name"] + + self.rsh = RemoteFactory().getInstance() + self.ShouldBeStatus={} + self.ns = NodeStatus(self.Env) + self.OurNode = os.uname()[1].lower() + self.__instance_errorstoignore = [] + + self.cib_installed = 0 + self.config = None + self.cluster_monitor = 0 + self.use_short_names = 1 + + if self.Env["DoBSC"]: + del self.templates["Pat:They_stopped"] + + self._finalConditions() + + self.check_transitions = 0 + self.check_elections = 0 + self.CIBsync = {} + self.CibFactory = ConfigFactory(self) + self.cib = self.CibFactory.createConfig(self.Env["Schema"]) + + def __getitem__(self, key): + if key == "Name": + return self.name + + print("FIXME: Getting %s from %s" % (key, repr(self))) + if key in self.data: + return self.data[key] + + return self.templates.get_patterns(key) + + def __setitem__(self, key, value): + print("FIXME: Setting %s=%s on %s" % (key, value, repr(self))) + self.data[key] = value + + def key_for_node(self, node): + return node + + def instance_errorstoignore_clear(self): + '''Allows the test scenario to reset instance errors to ignore on each iteration.''' + self.__instance_errorstoignore = [] + + def instance_errorstoignore(self): + '''Return list of errors which are 'normal' for a specific test instance''' + return self.__instance_errorstoignore + + def log(self, args): + self.logger.log(args) + + def debug(self, args): + self.logger.debug(args) + + def upcount(self): + '''How many nodes are up?''' + count = 0 + for node in self.Env["nodes"]: + if self.ShouldBeStatus[node] == "up": + count = count + 1 + return count + + def install_support(self, command="install"): + for node in self.Env["nodes"]: + self.rsh(node, BuildOptions.DAEMON_DIR + "/cts-support " + command) + + def prepare_fencing_watcher(self, name): + # If we don't have quorum now but get it as a result of starting this node, + # then a bunch of nodes might get fenced + upnode = None + if self.HasQuorum(None): + self.debug("Have quorum") + return None + + if not self.templates["Pat:Fencing_start"]: + print("No start pattern") + return None + + if not self.templates["Pat:Fencing_ok"]: + print("No ok pattern") + return None + + stonith = None + stonithPats = [] + for peer in self.Env["nodes"]: + if self.ShouldBeStatus[peer] != "up": + stonithPats.append(self.templates["Pat:Fencing_ok"] % peer) + stonithPats.append(self.templates["Pat:Fencing_start"] % peer) + + stonith = LogWatcher(self.Env["LogFileName"], stonithPats, self.Env["nodes"], self.Env["LogWatcher"], "StartupFencing", 0) + stonith.set_watch() + return stonith + + def fencing_cleanup(self, node, stonith): + peer_list = [] + peer_state = {} + + self.debug("Looking for nodes that were fenced as a result of %s starting" % node) + + # If we just started a node, we may now have quorum (and permission to fence) + if not stonith: + self.debug("Nothing to do") + return peer_list + + q = self.HasQuorum(None) + if not q and len(self.Env["nodes"]) > 2: + # We didn't gain quorum - we shouldn't have shot anyone + self.debug("Quorum: %d Len: %d" % (q, len(self.Env["nodes"]))) + return peer_list + + for n in self.Env["nodes"]: + peer_state[n] = "unknown" + + # Now see if any states need to be updated + self.debug("looking for: " + repr(stonith.regexes)) + shot = stonith.look(0) + while shot: + line = repr(shot) + self.debug("Found: " + line) + del stonith.regexes[stonith.whichmatch] + + # Extract node name + for n in self.Env["nodes"]: + if re.search(self.templates["Pat:Fencing_ok"] % n, shot): + peer = n + peer_state[peer] = "complete" + self.__instance_errorstoignore.append(self.templates["Pat:Fencing_ok"] % peer) + + elif peer_state[n] != "complete" and re.search(self.templates["Pat:Fencing_start"] % n, shot): + # TODO: Correctly detect multiple fencing operations for the same host + peer = n + peer_state[peer] = "in-progress" + self.__instance_errorstoignore.append(self.templates["Pat:Fencing_start"] % peer) + + if not peer: + self.logger.log("ERROR: Unknown stonith match: %s" % line) + + elif not peer in peer_list: + self.debug("Found peer: " + peer) + peer_list.append(peer) + + # Get the next one + shot = stonith.look(60) + + for peer in peer_list: + + self.debug(" Peer %s was fenced as a result of %s starting: %s" % (peer, node, peer_state[peer])) + if self.Env["at-boot"]: + self.ShouldBeStatus[peer] = "up" + else: + self.ShouldBeStatus[peer] = "down" + + if peer_state[peer] == "in-progress": + # Wait for any in-progress operations to complete + shot = stonith.look(60) + while len(stonith.regexes) and shot: + line = repr(shot) + self.debug("Found: " + line) + del stonith.regexes[stonith.whichmatch] + shot = stonith.look(60) + + # Now make sure the node is alive too + self.ns.wait_for_node(peer, self.Env["DeadTime"]) + + # Poll until it comes up + if self.Env["at-boot"]: + if not self.StataCM(peer): + time.sleep(self.Env["StartTime"]) + + if not self.StataCM(peer): + self.logger.log("ERROR: Peer %s failed to restart after being fenced" % peer) + return None + + return peer_list + + def StartaCM(self, node, verbose=False): + + '''Start up the cluster manager on a given node''' + if verbose: self.logger.log("Starting %s on node %s" % (self.templates["Name"], node)) + else: self.debug("Starting %s on node %s" % (self.templates["Name"], node)) + ret = 1 + + if not node in self.ShouldBeStatus: + self.ShouldBeStatus[node] = "down" + + if self.ShouldBeStatus[node] != "down": + return 1 + + patterns = [] + # Technically we should always be able to notice ourselves starting + patterns.append(self.templates["Pat:Local_started"] % node) + if self.upcount() == 0: + patterns.append(self.templates["Pat:DC_started"] % node) + else: + patterns.append(self.templates["Pat:NonDC_started"] % node) + + watch = LogWatcher( + self.Env["LogFileName"], patterns, self.Env["nodes"], self.Env["LogWatcher"], "StartaCM", self.Env["StartTime"]+10) + + self.install_config(node) + + self.ShouldBeStatus[node] = "any" + if self.StataCM(node) and self.cluster_stable(self.Env["DeadTime"]): + self.logger.log ("%s was already started" % (node)) + return 1 + + stonith = self.prepare_fencing_watcher(node) + watch.set_watch() + + (rc, _) = self.rsh(node, self.templates["StartCmd"]) + if rc != 0: + self.logger.log ("Warn: Start command failed on node %s" % (node)) + self.fencing_cleanup(node, stonith) + return None + + self.ShouldBeStatus[node] = "up" + watch_result = watch.look_for_all() + + if watch.unmatched: + for regex in watch.unmatched: + self.logger.log ("Warn: Startup pattern not found: %s" % (regex)) + + if watch_result and self.cluster_stable(self.Env["DeadTime"]): + #self.debug("Found match: "+ repr(watch_result)) + self.fencing_cleanup(node, stonith) + return 1 + + elif self.StataCM(node) and self.cluster_stable(self.Env["DeadTime"]): + self.fencing_cleanup(node, stonith) + return 1 + + self.logger.log ("Warn: Start failed for node %s" % (node)) + return None + + def StartaCMnoBlock(self, node, verbose=False): + + '''Start up the cluster manager on a given node with none-block mode''' + + if verbose: self.logger.log("Starting %s on node %s" % (self["Name"], node)) + else: self.debug("Starting %s on node %s" % (self["Name"], node)) + + self.install_config(node) + self.rsh(node, self.templates["StartCmd"], synchronous=False) + self.ShouldBeStatus[node] = "up" + return 1 + + def StopaCM(self, node, verbose=False, force=False): + + '''Stop the cluster manager on a given node''' + + if verbose: self.logger.log("Stopping %s on node %s" % (self["Name"], node)) + else: self.debug("Stopping %s on node %s" % (self["Name"], node)) + + if self.ShouldBeStatus[node] != "up" and force == False: + return 1 + + (rc, _) = self.rsh(node, self.templates["StopCmd"]) + if rc == 0: + # Make sure we can continue even if corosync leaks + # fdata-* is the old name + #self.rsh(node, "rm -rf /dev/shm/qb-* /dev/shm/fdata-*") + self.ShouldBeStatus[node] = "down" + self.cluster_stable(self.Env["DeadTime"]) + return 1 + else: + self.logger.log ("ERROR: Could not stop %s on node %s" % (self["Name"], node)) + + return None + + def StopaCMnoBlock(self, node): + + '''Stop the cluster manager on a given node with none-block mode''' + + self.debug("Stopping %s on node %s" % (self["Name"], node)) + + self.rsh(node, self.templates["StopCmd"], synchronous=False) + self.ShouldBeStatus[node] = "down" + return 1 + + def RereadCM(self, node): + + '''Force the cluster manager on a given node to reread its config + This may be a no-op on certain cluster managers. + ''' + (rc, _) = self.rsh(node, self.templates["RereadCmd"]) + if rc == 0: + return 1 + else: + self.logger.log ("Could not force %s on node %s to reread its config" + % (self["Name"], node)) + return None + + def startall(self, nodelist=None, verbose=False, quick=False): + + '''Start the cluster manager on every node in the cluster. + We can do it on a subset of the cluster if nodelist is not None. + ''' + map = {} + if not nodelist: + nodelist = self.Env["nodes"] + + for node in nodelist: + if self.ShouldBeStatus[node] == "down": + self.ns.wait_for_all_nodes(nodelist, 300) + + if not quick: + # This is used for "basic sanity checks", so only start one node ... + if not self.StartaCM(node, verbose=verbose): + return 0 + return 1 + + # Approximation of SimulStartList for --boot + watchpats = [ ] + watchpats.append(self.templates["Pat:DC_IDLE"]) + for node in nodelist: + watchpats.append(self.templates["Pat:InfraUp"] % node) + watchpats.append(self.templates["Pat:PacemakerUp"] % node) + watchpats.append(self.templates["Pat:Local_started"] % node) + watchpats.append(self.templates["Pat:They_up"] % (nodelist[0], node)) + + # Start all the nodes - at about the same time... + watch = LogWatcher(self.Env["LogFileName"], watchpats, self.Env["nodes"], self.Env["LogWatcher"], "fast-start", self.Env["DeadTime"]+10) + watch.set_watch() + + if not self.StartaCM(nodelist[0], verbose=verbose): + return 0 + for node in nodelist: + self.StartaCMnoBlock(node, verbose=verbose) + + watch.look_for_all() + if watch.unmatched: + for regex in watch.unmatched: + self.logger.log ("Warn: Startup pattern not found: %s" % (regex)) + + if not self.cluster_stable(): + self.logger.log("Cluster did not stabilize") + return 0 + + return 1 + + def stopall(self, nodelist=None, verbose=False, force=False): + + '''Stop the cluster managers on every node in the cluster. + We can do it on a subset of the cluster if nodelist is not None. + ''' + + ret = 1 + map = {} + if not nodelist: + nodelist = self.Env["nodes"] + for node in self.Env["nodes"]: + if self.ShouldBeStatus[node] == "up" or force == True: + if not self.StopaCM(node, verbose=verbose, force=force): + ret = 0 + return ret + + def rereadall(self, nodelist=None): + + '''Force the cluster managers on every node in the cluster + to reread their config files. We can do it on a subset of the + cluster if nodelist is not None. + ''' + + map = {} + if not nodelist: + nodelist = self.Env["nodes"] + for node in self.Env["nodes"]: + if self.ShouldBeStatus[node] == "up": + self.RereadCM(node) + + def statall(self, nodelist=None): + + '''Return the status of the cluster managers in the cluster. + We can do it on a subset of the cluster if nodelist is not None. + ''' + + result = {} + if not nodelist: + nodelist = self.Env["nodes"] + for node in nodelist: + if self.StataCM(node): + result[node] = "up" + else: + result[node] = "down" + return result + + def isolate_node(self, target, nodes=None): + '''isolate the communication between the nodes''' + if not nodes: + nodes = self.Env["nodes"] + + for node in nodes: + if node != target: + rc = self.rsh(target, self.templates["BreakCommCmd"] % self.key_for_node(node)) + if rc != 0: + self.logger.log("Could not break the communication between %s and %s: %d" % (target, node, rc)) + return None + else: + self.debug("Communication cut between %s and %s" % (target, node)) + return 1 + + def unisolate_node(self, target, nodes=None): + '''fix the communication between the nodes''' + if not nodes: + nodes = self.Env["nodes"] + + for node in nodes: + if node != target: + restored = 0 + + # Limit the amount of time we have asynchronous connectivity for + # Restore both sides as simultaneously as possible + self.rsh(target, self.templates["FixCommCmd"] % self.key_for_node(node), synchronous=False) + self.rsh(node, self.templates["FixCommCmd"] % self.key_for_node(target), synchronous=False) + self.debug("Communication restored between %s and %s" % (target, node)) + + def oprofileStart(self, node=None): + if not node: + for n in self.Env["oprofile"]: + self.oprofileStart(n) + + elif node in self.Env["oprofile"]: + self.debug("Enabling oprofile on %s" % node) + self.rsh(node, "opcontrol --init") + self.rsh(node, "opcontrol --setup --no-vmlinux --separate=lib --callgraph=20 --image=all") + self.rsh(node, "opcontrol --start") + self.rsh(node, "opcontrol --reset") + + def oprofileSave(self, test, node=None): + if not node: + for n in self.Env["oprofile"]: + self.oprofileSave(test, n) + + elif node in self.Env["oprofile"]: + self.rsh(node, "opcontrol --dump") + self.rsh(node, "opcontrol --save=cts.%d" % test) + # Read back with: opreport -l session:cts.0 image:<directory>/c* + if None: + self.rsh(node, "opcontrol --reset") + else: + self.oprofileStop(node) + self.oprofileStart(node) + + def oprofileStop(self, node=None): + if not node: + for n in self.Env["oprofile"]: + self.oprofileStop(n) + + elif node in self.Env["oprofile"]: + self.debug("Stopping oprofile on %s" % node) + self.rsh(node, "opcontrol --reset") + self.rsh(node, "opcontrol --shutdown 2>&1 > /dev/null") + + def errorstoignore(self): + # At some point implement a more elegant solution that + # also produces a report at the end + """ Return a list of known error messages that should be ignored """ + return self.templates.get_patterns("BadNewsIgnore") + + def install_config(self, node): + if not self.ns.wait_for_node(node): + self.log("Node %s is not up." % node) + return None + + if not node in self.CIBsync and self.Env["ClobberCIB"]: + self.CIBsync[node] = 1 + self.rsh(node, "rm -f " + BuildOptions.CIB_DIR + "/cib*") + + # Only install the CIB on the first node, all the other ones will pick it up from there + if self.cib_installed == 1: + return None + + self.cib_installed = 1 + if self.Env["CIBfilename"] == None: + self.log("Installing Generated CIB on node %s" % (node)) + self.cib.install(node) + + else: + self.log("Installing CIB (%s) on node %s" % (self.Env["CIBfilename"], node)) + if self.rsh.copy(self.Env["CIBfilename"], "root@" + (self.templates["CIBfile"] % node)) != 0: + raise ValueError("Can not scp file to %s %d"%(node)) + + self.rsh(node, "chown " + BuildOptions.DAEMON_USER + " " + BuildOptions.CIB_DIR + "/cib.xml") + + def prepare(self): + '''Finish the Initialization process. Prepare to test...''' + + self.partitions_expected = 1 + for node in self.Env["nodes"]: + self.ShouldBeStatus[node] = "" + if self.Env["experimental-tests"]: + self.unisolate_node(node) + self.StataCM(node) + + def test_node_CM(self, node): + '''Report the status of the cluster manager on a given node''' + + watchpats = [ ] + watchpats.append("Current ping state: (S_IDLE|S_NOT_DC)") + watchpats.append(self.templates["Pat:NonDC_started"] % node) + watchpats.append(self.templates["Pat:DC_started"] % node) + idle_watch = LogWatcher(self.Env["LogFileName"], watchpats, [node], self.Env["LogWatcher"], "ClusterIdle") + idle_watch.set_watch() + + (_, out) = self.rsh(node, self.templates["StatusCmd"]%node, verbose=1) + + if not out: + out = "" + else: + out = out[0].strip() + + self.debug("Node %s status: '%s'" %(node, out)) + + if out.find('ok') < 0: + if self.ShouldBeStatus[node] == "up": + self.log( + "Node status for %s is %s but we think it should be %s" + % (node, "down", self.ShouldBeStatus[node])) + self.ShouldBeStatus[node] = "down" + return 0 + + if self.ShouldBeStatus[node] == "down": + self.log( + "Node status for %s is %s but we think it should be %s: %s" + % (node, "up", self.ShouldBeStatus[node], out)) + + self.ShouldBeStatus[node] = "up" + + # check the output first - because syslog-ng loses messages + if out.find('S_NOT_DC') != -1: + # Up and stable + return 2 + if out.find('S_IDLE') != -1: + # Up and stable + return 2 + + # fall back to syslog-ng and wait + if not idle_watch.look(): + # just up + self.debug("Warn: Node %s is unstable: %s" % (node, out)) + return 1 + + # Up and stable + return 2 + + # Is the node up or is the node down + def StataCM(self, node): + '''Report the status of the cluster manager on a given node''' + + if self.test_node_CM(node) > 0: + return 1 + return None + + # Being up and being stable is not the same question... + def node_stable(self, node): + '''Report the status of the cluster manager on a given node''' + + if self.test_node_CM(node) == 2: + return 1 + self.log("Warn: Node %s not stable" % (node)) + return None + + def partition_stable(self, nodes, timeout=None): + watchpats = [ ] + watchpats.append("Current ping state: S_IDLE") + watchpats.append(self.templates["Pat:DC_IDLE"]) + self.debug("Waiting for cluster stability...") + + if timeout == None: + timeout = self.Env["DeadTime"] + + if len(nodes) < 3: + self.debug("Cluster is inactive") + return 1 + + idle_watch = LogWatcher(self.Env["LogFileName"], watchpats, nodes.split(), self.Env["LogWatcher"], "ClusterStable", timeout) + idle_watch.set_watch() + + for node in nodes.split(): + # have each node dump its current state + self.rsh(node, self.templates["StatusCmd"] % node, verbose=1) + + ret = idle_watch.look() + while ret: + self.debug(ret) + for node in nodes.split(): + if re.search(node, ret): + return 1 + ret = idle_watch.look() + + self.debug("Warn: Partition %s not IDLE after %ds" % (repr(nodes), timeout)) + return None + + def cluster_stable(self, timeout=None, double_check=False): + partitions = self.find_partitions() + + for partition in partitions: + if not self.partition_stable(partition, timeout): + return None + + if double_check: + # Make sure we are really stable and that all resources, + # including those that depend on transient node attributes, + # are started if they were going to be + time.sleep(5) + for partition in partitions: + if not self.partition_stable(partition, timeout): + return None + + return 1 + + def is_node_dc(self, node, status_line=None): + rc = 0 + + if not status_line: + (_, out) = self.rsh(node, self.templates["StatusCmd"]%node, verbose=1) + + if out: + status_line = out[0].strip() + + if not status_line: + rc = 0 + elif status_line.find('S_IDLE') != -1: + rc = 1 + elif status_line.find('S_INTEGRATION') != -1: + rc = 1 + elif status_line.find('S_FINALIZE_JOIN') != -1: + rc = 1 + elif status_line.find('S_POLICY_ENGINE') != -1: + rc = 1 + elif status_line.find('S_TRANSITION_ENGINE') != -1: + rc = 1 + + return rc + + def active_resources(self, node): + (_, output) = self.rsh(node, "crm_resource -c", verbose=1) + resources = [] + for line in output: + if re.search("^Resource", line): + tmp = AuditResource(self, line) + if tmp.type == "primitive" and tmp.host == node: + resources.append(tmp.id) + return resources + + def ResourceLocation(self, rid): + ResourceNodes = [] + for node in self.Env["nodes"]: + if self.ShouldBeStatus[node] == "up": + + cmd = self.templates["RscRunning"] % (rid) + (rc, lines) = self.rsh(node, cmd) + + if rc == 127: + self.log("Command '%s' failed. Binary or pacemaker-cts package not installed?" % cmd) + for line in lines: + self.log("Output: "+line) + elif rc == 0: + ResourceNodes.append(node) + + return ResourceNodes + + def find_partitions(self): + ccm_partitions = [] + + for node in self.Env["nodes"]: + if self.ShouldBeStatus[node] == "up": + (_, out) = self.rsh(node, self.templates["PartitionCmd"], verbose=1) + + if not out: + self.log("no partition details for %s" % node) + continue + + partition = out[0].strip() + + if len(partition) > 2: + nodes = partition.split() + nodes.sort() + partition = ' '.join(nodes) + + found = 0 + for a_partition in ccm_partitions: + if partition == a_partition: + found = 1 + if found == 0: + self.debug("Adding partition from %s: %s" % (node, partition)) + ccm_partitions.append(partition) + else: + self.debug("Partition '%s' from %s is consistent with existing entries" % (partition, node)) + + else: + self.log("bad partition details for %s" % node) + else: + self.debug("Node %s is down... skipping" % node) + + self.debug("Found partitions: %s" % repr(ccm_partitions) ) + return ccm_partitions + + def HasQuorum(self, node_list): + # If we are auditing a partition, then one side will + # have quorum and the other not. + # So the caller needs to tell us which we are checking + # If no value for node_list is specified... assume all nodes + if not node_list: + node_list = self.Env["nodes"] + + for node in node_list: + if self.ShouldBeStatus[node] == "up": + (_, quorum) = self.rsh(node, self.templates["QuorumCmd"], verbose=1) + quorum = quorum[0].strip() + + if quorum.find("1") != -1: + return 1 + elif quorum.find("0") != -1: + return 0 + else: + self.debug("WARN: Unexpected quorum test result from " + node + ":" + quorum) + + return 0 + + def Components(self): + complist = [] + common_ignore = [ + "Pending action:", + "(ERROR|error): crm_log_message_adv:", + "(ERROR|error): MSG: No message to dump", + "pending LRM operations at shutdown", + "Lost connection to the CIB manager", + "Connection to the CIB terminated...", + "Sending message to the CIB manager FAILED", + "Action A_RECOVER .* not supported", + "(ERROR|error): stonithd_op_result_ready: not signed on", + "pingd.*(ERROR|error): send_update: Could not send update", + "send_ipc_message: IPC Channel to .* is not connected", + "unconfirmed_actions: Waiting on .* unconfirmed actions", + "cib_native_msgready: Message pending on command channel", + r": Performing A_EXIT_1 - forcefully exiting ", + r"Resource .* was active at shutdown. You may ignore this error if it is unmanaged.", + ] + + stonith_ignore = [ + r"Updating failcount for child_DoFencing", + r"error.*: Fencer connection failed \(will retry\)", + "pacemaker-execd.*(ERROR|error): stonithd_receive_ops_result failed.", + ] + + stonith_ignore.extend(common_ignore) + + ccm = Process(self, "ccm", pats = [ + "State transition .* S_RECOVERY", + "pacemaker-controld.*Action A_RECOVER .* not supported", + r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover", + r"pacemaker-controld.*: Could not recover from internal error", + "pacemaker-controld.*I_ERROR.*crmd_cib_connection_destroy", + # these status numbers are likely wrong now + r"pacemaker-controld.*exited with status 2", + r"attrd.*exited with status 1", + r"cib.*exited with status 2", + +# Not if it was fenced +# "A new node joined the cluster", + +# "WARN: determine_online_status: Node .* is unclean", +# "Scheduling node .* for fencing", +# "Executing .* fencing operation", +# "tengine_stonith_callback: .*result=0", +# "Processing I_NODE_JOIN:.* cause=C_HA_MESSAGE", +# "State transition S_.* -> S_INTEGRATION.*input=I_NODE_JOIN", + "State transition S_STARTING -> S_PENDING", + ], badnews_ignore = common_ignore) + + based = Process(self, "pacemaker-based", pats = [ + "State transition .* S_RECOVERY", + "Lost connection to the CIB manager", + "Connection to the CIB manager terminated", + r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover", + "pacemaker-controld.*I_ERROR.*crmd_cib_connection_destroy", + r"pacemaker-controld.*: Could not recover from internal error", + # these status numbers are likely wrong now + r"pacemaker-controld.*exited with status 2", + r"attrd.*exited with status 1", + ], badnews_ignore = common_ignore) + + execd = Process(self, "pacemaker-execd", pats = [ + "State transition .* S_RECOVERY", + "LRM Connection failed", + "pacemaker-controld.*I_ERROR.*lrm_connection_destroy", + "State transition S_STARTING -> S_PENDING", + r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover", + r"pacemaker-controld.*: Could not recover from internal error", + # this status number is likely wrong now + r"pacemaker-controld.*exited with status 2", + ], badnews_ignore = common_ignore) + + controld = Process(self, "pacemaker-controld", + pats = [ +# "WARN: determine_online_status: Node .* is unclean", +# "Scheduling node .* for fencing", +# "Executing .* fencing operation", +# "tengine_stonith_callback: .*result=0", + "State transition .* S_IDLE", + "State transition S_STARTING -> S_PENDING", + ], badnews_ignore = common_ignore) + + schedulerd = Process(self, "pacemaker-schedulerd", pats = [ + "State transition .* S_RECOVERY", + r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover", + r"pacemaker-controld.*: Could not recover from internal error", + r"pacemaker-controld.*CRIT.*: Connection to the scheduler failed", + "pacemaker-controld.*I_ERROR.*save_cib_contents", + # this status number is likely wrong now + r"pacemaker-controld.*exited with status 2", + ], badnews_ignore = common_ignore, dc_only=True) + + if self.Env["DoFencing"]: + complist.append(Process(self, "stoniths", dc_pats = [ + r"pacemaker-controld.*CRIT.*: Fencing daemon connection failed", + "Attempting connection to fencing daemon", + ], badnews_ignore = stonith_ignore)) + + ccm.pats.extend([ + # these status numbers are likely wrong now + r"attrd.*exited with status 1", + r"pacemaker-(based|controld).*exited with status 2", + ]) + based.pats.extend([ + # these status numbers are likely wrong now + r"attrd.*exited with status 1", + r"pacemaker-controld.*exited with status 2", + ]) + execd.pats.extend([ + # these status numbers are likely wrong now + r"pacemaker-controld.*exited with status 2", + ]) + + complist.append(ccm) + complist.append(based) + complist.append(execd) + complist.append(controld) + complist.append(schedulerd) + + return complist + + def StandbyStatus(self, node): + (_, out) = self.rsh(node, self.templates["StandbyQueryCmd"] % node, verbose=1) + if not out: + return "off" + out = out[0].strip() + self.debug("Standby result: "+out) + return out + + # status == "on" : Enter Standby mode + # status == "off": Enter Active mode + def SetStandbyMode(self, node, status): + current_status = self.StandbyStatus(node) + cmd = self.templates["StandbyCmd"] % (node, status) + self.rsh(node, cmd) + return True + + def AddDummyRsc(self, node, rid): + rsc_xml = """ '<resources> + <primitive class=\"ocf\" id=\"%s\" provider=\"pacemaker\" type=\"Dummy\"> + <operations> + <op id=\"%s-interval-10s\" interval=\"10s\" name=\"monitor\"/ + </operations> + </primitive> + </resources>'""" % (rid, rid) + constraint_xml = """ '<constraints> + <rsc_location id=\"location-%s-%s\" node=\"%s\" rsc=\"%s\" score=\"INFINITY\"/> + </constraints>' + """ % (rid, node, node, rid) + + self.rsh(node, self.templates['CibAddXml'] % (rsc_xml)) + self.rsh(node, self.templates['CibAddXml'] % (constraint_xml)) + + def RemoveDummyRsc(self, node, rid): + constraint = "\"//rsc_location[@rsc='%s']\"" % (rid) + rsc = "\"//primitive[@id='%s']\"" % (rid) + + self.rsh(node, self.templates['CibDelXpath'] % constraint) + self.rsh(node, self.templates['CibDelXpath'] % rsc) diff --git a/cts/lab/Makefile.am b/cts/lab/Makefile.am new file mode 100644 index 0000000..27e39b3 --- /dev/null +++ b/cts/lab/Makefile.am @@ -0,0 +1,31 @@ +# +# Copyright 2001-2023 the Pacemaker project contributors +# +# The version control history for this file may have further details. +# +# This source code is licensed under the GNU General Public License version 2 +# or later (GPLv2+) WITHOUT ANY WARRANTY. +# + +MAINTAINERCLEANFILES = Makefile.in + +noinst_SCRIPTS = cluster_test \ + OCFIPraTest.py + +# Commands intended to be run only via other commands +halibdir = $(CRM_DAEMON_DIR) +dist_halib_SCRIPTS = cts-log-watcher + +ctslibdir = $(pythondir)/cts +ctslib_PYTHON = __init__.py \ + CIB.py \ + cib_xml.py \ + ClusterManager.py \ + CM_corosync.py \ + CTSaudits.py \ + CTSscenarios.py \ + CTStests.py + +ctsdir = $(datadir)/$(PACKAGE)/tests/cts +cts_SCRIPTS = CTSlab.py \ + cts diff --git a/cts/lab/OCFIPraTest.py.in b/cts/lab/OCFIPraTest.py.in new file mode 100644 index 0000000..2cce304 --- /dev/null +++ b/cts/lab/OCFIPraTest.py.in @@ -0,0 +1,173 @@ +#!@PYTHON@ + +'''OCF IPaddr/IPaddr2 Resource Agent Test''' + +__copyright__ = """Original Author: Huang Zhen <zhenhltc@cn.ibm.com> +Copyright 2004 International Business Machines + +with later changes copyright 2005-2023 the Pacemaker project contributors. +The version control history for this file may have further details. +""" +__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" + +import os +import sys +import time +import random +import struct +import syslog + +from pacemaker import BuildOptions + + +def usage(): + print("usage: " + sys.argv[0] \ + + " [-2]"\ + + " [--ipbase|-i first-test-ip]"\ + + " [--ipnum|-n test-ip-num]"\ + + " [--help|-h]"\ + + " [--perform|-p op]"\ + + " [number-of-iterations]") + sys.exit(1) + + +def perform_op(ra, ip, op): + os.environ["OCF_RA_VERSION_MAJOR"] = "1" + os.environ["OCF_RA_VERSION_MINOR"] = "0" + os.environ["OCF_ROOT"] = BuildOptions.OCF_ROOT_DIR + os.environ["OCF_RESOURCE_INSTANCE"] = ip + os.environ["OCF_RESOURCE_TYPE"] = ra + os.environ["OCF_RESKEY_ip"] = ip + os.environ["HA_LOGFILE"] = "/dev/null" + os.environ["HA_LOGFACILITY"] = "local7" + path = BuildOptions.OCF_ROOT_DIR + "/resource.d/heartbeat/" + ra + return os.spawnvpe(os.P_WAIT, path, [ra, op], os.environ) + + +def audit(ra, iplist, ipstatus, summary): + passed = 1 + for ip in iplist: + ret = perform_op(ra, ip, "monitor") + if ret != ipstatus[ip]: + passed = 0 + log("audit: status of %s should be %d but it is %d\t [failure]" % + (ip,ipstatus[ip],ret)) + ipstatus[ip] = ret + summary["audit"]["called"] += 1; + if passed : + summary["audit"]["success"] += 1 + else : + summary["audit"]["failure"] += 1 + + +def log(towrite): + t = time.strftime("%Y/%m/%d_%H:%M:%S\t", time.localtime(time.time())) + logstr = t + " "+str(towrite) + syslog.syslog(logstr) + print(logstr) + +if __name__ == '__main__': + ra = "IPaddr" + ipbase = "127.0.0.10" + ipnum = 1 + itnum = 50 + perform = None + summary = { + "start":{"called":0,"success":0,"failure":0}, + "stop" :{"called":0,"success":0,"failure":0}, + "audit":{"called":0,"success":0,"failure":0} + } + syslog.openlog(sys.argv[0], 0, syslog.LOG_LOCAL7) + + # Process arguments... + skipthis = None + args = sys.argv[1:] + for i in range(0, len(args)) : + if skipthis : + skipthis = None + continue + elif args[i] == "-2" : + ra = "IPaddr2" + elif args[i] == "--ip" or args[i] == "-i" : + skipthis = 1 + ipbase = args[i+1] + elif args[i] == "--ipnum" or args[i] == "-n" : + skipthis = 1 + ipnum = int(args[i+1]) + elif args[i] == "--perform" or args[i] == "-p" : + skipthis = 1 + perform = args[i+1] + elif args[i] == "--help" or args[i] == "-h" : + usage() + else: + itnum = int(args[i]) + + log("Begin OCF IPaddr/IPaddr2 Test") + + # Generate the test ips + iplist = [] + ipstatus = {} + fields = ipbase.split('.') + for i in range(0, ipnum) : + ip = fields.join('.') + iplist.append(ip) + ipstatus[ip] = perform_op(ra,ip,"monitor") + fields[3] = str(int(fields[3])+1) + log("Test ip:" + str(iplist)) + + # If use ask perform an operation + if perform != None: + log("Perform opeartion %s"%perform) + for ip in iplist: + perform_op(ra, ip, perform) + log("Done") + sys.exit() + + log("RA Type:" + ra) + log("Test Count:" + str(itnum)) + + # Prepare Random + f = open("/dev/urandom", "r") + seed = struct.unpack("BBB", f.read(3)) + f.close() + #seed=(123,321,231) + rand = random.Random() + rand.seed(seed[0]) + log("Test Random Seed:" + str(seed)) + + # + # Begin Tests + + log(">>>>>>>>>>>>>>>>>>>>>>>>") + for i in range(0, itnum): + ip = rand.choice(iplist) + if ipstatus[ip] == 0: + op = "stop" + elif ipstatus[ip] == 7: + op = "start" + else : + op = rand.choice(["start","stop"]) + + ret = perform_op(ra, ip, op) + # update status + if op == "start" and ret == 0: + ipstatus[ip] = 0 + elif op == "stop" and ret == 0: + ipstatus[ip] = 7 + else : + ipstatus[ip] = 1 + result = "" + if ret == 0: + result = "success" + else : + result = "failure" + summary[op]["called"] += 1 + summary[op][result] += 1 + log( "%d:%s %s \t[%s]"%(i, op, ip, result)) + audit(ra, iplist, ipstatus, summary) + + log("<<<<<<<<<<<<<<<<<<<<<<<<") + log("start:\t" + str(summary["start"])) + log("stop: \t" + str(summary["stop"])) + log("audit:\t" + str(summary["audit"])) + diff --git a/cts/lab/__init__.py b/cts/lab/__init__.py new file mode 100644 index 0000000..abed502 --- /dev/null +++ b/cts/lab/__init__.py @@ -0,0 +1,15 @@ +"""Python modules for Pacemaker's Cluster Test Suite (CTS) + +This package provides the following modules: + +CIB +cib_xml +CM_common +CM_corosync +CTSaudits +CTS +CTSscenarios +CTStests +patterns +watcher +""" diff --git a/cts/lab/cib_xml.py b/cts/lab/cib_xml.py new file mode 100644 index 0000000..378dd29 --- /dev/null +++ b/cts/lab/cib_xml.py @@ -0,0 +1,319 @@ +""" CIB XML generator for Pacemaker's Cluster Test Suite (CTS) +""" + +__copyright__ = "Copyright 2008-2023 the Pacemaker project contributors" +__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" + +import sys + +from cts.CIB import CibBase + + +class XmlBase(CibBase): + def __init__(self, Factory, tag, _id, **kwargs): + CibBase.__init__(self, Factory, tag, _id, **kwargs) + + def show(self): + text = '''<%s''' % self.tag + if self.name: + text += ''' id="%s"''' % (self.name) + for k in list(self.kwargs.keys()): + text += ''' %s="%s"''' % (k, self.kwargs[k]) + + if not self.children: + text += '''/>''' + return text + + text += '''>''' + + for c in self.children: + text += c.show() + + text += '''</%s>''' % self.tag + return text + + def _run(self, operation, xml, section="all", options=""): + if self.name: + label = self.name + else: + label = "<%s>" % self.tag + self.Factory.debug("Writing out %s" % label) + fixed = "HOME=/root CIB_file="+self.Factory.tmpfile + fixed += " cibadmin --%s --scope %s %s --xml-text '%s'" % (operation, section, options, xml) + (rc, _) = self.Factory.rsh(self.Factory.target, fixed) + if rc != 0: + self.Factory.log("Configure call failed: "+fixed) + sys.exit(1) + + +class InstanceAttributes(XmlBase): + """ Create an <instance_attributes> section with name-value pairs """ + + def __init__(self, Factory, name, attrs): + XmlBase.__init__(self, Factory, "instance_attributes", name) + + # Create an <nvpair> for each attribute + for (attr, value) in list(attrs.items()): + self.add_child(XmlBase(Factory, "nvpair", "%s-%s" % (name, attr), + name=attr, value=value)) + + +class Node(XmlBase): + """ Create a <node> section with node attributes for one node """ + + def __init__(self, Factory, node_name, node_id, node_attrs): + XmlBase.__init__(self, Factory, "node", node_id, uname=node_name) + self.add_child(InstanceAttributes(Factory, "%s-1" % node_name, node_attrs)) + + +class Nodes(XmlBase): + """ Create a <nodes> section """ + + def __init__(self, Factory): + XmlBase.__init__(self, Factory, "nodes", None) + + def add_node(self, node_name, node_id, node_attrs): + self.add_child(Node(self.Factory, node_name, node_id, node_attrs)) + + def commit(self): + self._run("modify", self.show(), "configuration", "--allow-create") + + +class FencingTopology(XmlBase): + def __init__(self, Factory): + XmlBase.__init__(self, Factory, "fencing-topology", None) + + def level(self, index, target, devices, target_attr=None, target_value=None): + # Generate XML ID (sanitizing target-by-attribute levels) + + if target: + xml_id = "cts-%s.%d" % (target, index) + self.add_child(XmlBase(self.Factory, "fencing-level", xml_id, target=target, index=index, devices=devices)) + + else: + xml_id = "%s-%s.%d" % (target_attr, target_value, index) + child = XmlBase(self.Factory, "fencing-level", xml_id, index=index, devices=devices) + child["target-attribute"]=target_attr + child["target-value"]=target_value + self.add_child(child) + + def commit(self): + self._run("create", self.show(), "configuration", "--allow-create") + + +class Option(XmlBase): + def __init__(self, Factory, section="cib-bootstrap-options"): + XmlBase.__init__(self, Factory, "cluster_property_set", section) + + def __setitem__(self, key, value): + self.add_child(XmlBase(self.Factory, "nvpair", "cts-%s" % key, name=key, value=value)) + + def commit(self): + self._run("modify", self.show(), "crm_config", "--allow-create") + + +class OpDefaults(XmlBase): + def __init__(self, Factory): + XmlBase.__init__(self, Factory, "op_defaults", None) + self.meta = XmlBase(self.Factory, "meta_attributes", "cts-op_defaults-meta") + self.add_child(self.meta) + + def __setitem__(self, key, value): + self.meta.add_child(XmlBase(self.Factory, "nvpair", "cts-op_defaults-%s" % key, name=key, value=value)) + + def commit(self): + self._run("modify", self.show(), "configuration", "--allow-create") + + +class Alerts(XmlBase): + def __init__(self, Factory): + XmlBase.__init__(self, Factory, "alerts", None) + self.alert_count = 0 + + def add_alert(self, path, recipient): + self.alert_count = self.alert_count + 1 + alert = XmlBase(self.Factory, "alert", "alert-%d" % self.alert_count, + path=path) + recipient1 = XmlBase(self.Factory, "recipient", + "alert-%d-recipient-1" % self.alert_count, + value=recipient) + alert.add_child(recipient1) + self.add_child(alert) + + def commit(self): + self._run("modify", self.show(), "configuration", "--allow-create") + + +class Expression(XmlBase): + def __init__(self, Factory, name, attr, op, value=None): + XmlBase.__init__(self, Factory, "expression", name, attribute=attr, operation=op) + if value: + self["value"] = value + + +class Rule(XmlBase): + def __init__(self, Factory, name, score, op="and", expr=None): + XmlBase.__init__(self, Factory, "rule", "%s" % name) + self["boolean-op"] = op + self["score"] = score + if expr: + self.add_child(expr) + + +class Resource(XmlBase): + def __init__(self, Factory, name, rtype, standard, provider=None): + XmlBase.__init__(self, Factory, "native", name) + + self.rtype = rtype + self.standard = standard + self.provider = provider + + self.op = [] + self.meta = {} + self.param = {} + + self.scores = {} + self.needs = {} + self.coloc = {} + + if self.standard == "ocf" and not provider: + self.provider = "heartbeat" + elif self.standard == "lsb": + self.provider = None + + def __setitem__(self, key, value): + self.add_param(key, value) + + def add_op(self, name, interval, **kwargs): + self.op.append( + XmlBase(self.Factory, "op", "%s-%s" % (name, interval), name=name, interval=interval, **kwargs)) + + def add_param(self, name, value): + self.param[name] = value + + def add_meta(self, name, value): + self.meta[name] = value + + def prefer(self, node, score="INFINITY", rule=None): + if not rule: + rule = Rule(self.Factory, "prefer-%s-r" % node, score, + expr=Expression(self.Factory, "prefer-%s-e" % node, "#uname", "eq", node)) + self.scores[node] = rule + + def after(self, resource, kind="Mandatory", first="start", then="start", **kwargs): + kargs = kwargs.copy() + kargs["kind"] = kind + if then: + kargs["first-action"] = "start" + kargs["then-action"] = then + + if first: + kargs["first-action"] = first + + self.needs[resource] = kargs + + def colocate(self, resource, score="INFINITY", role=None, withrole=None, **kwargs): + kargs = kwargs.copy() + kargs["score"] = score + if role: + kargs["rsc-role"] = role + if withrole: + kargs["with-rsc-role"] = withrole + + self.coloc[resource] = kargs + + def constraints(self): + text = "<constraints>" + + for k in list(self.scores.keys()): + text += '''<rsc_location id="prefer-%s" rsc="%s">''' % (k, self.name) + text += self.scores[k].show() + text += '''</rsc_location>''' + + for k in list(self.needs.keys()): + text += '''<rsc_order id="%s-after-%s" first="%s" then="%s"''' % (self.name, k, k, self.name) + kargs = self.needs[k] + for kw in list(kargs.keys()): + text += ''' %s="%s"''' % (kw, kargs[kw]) + text += '''/>''' + + for k in list(self.coloc.keys()): + text += '''<rsc_colocation id="%s-with-%s" rsc="%s" with-rsc="%s"''' % (self.name, k, self.name, k) + kargs = self.coloc[k] + for kw in list(kargs.keys()): + text += ''' %s="%s"''' % (kw, kargs[kw]) + text += '''/>''' + + text += "</constraints>" + return text + + def show(self): + text = '''<primitive id="%s" class="%s" type="%s"''' % (self.name, self.standard, self.rtype) + if self.provider: + text += ''' provider="%s"''' % (self.provider) + text += '''>''' + + if len(self.meta) > 0: + text += '''<meta_attributes id="%s-meta">''' % self.name + for p in list(self.meta.keys()): + text += '''<nvpair id="%s-%s" name="%s" value="%s"/>''' % (self.name, p, p, self.meta[p]) + text += '''</meta_attributes>''' + + if len(self.param) > 0: + text += '''<instance_attributes id="%s-params">''' % self.name + for p in list(self.param.keys()): + text += '''<nvpair id="%s-%s" name="%s" value="%s"/>''' % (self.name, p, p, self.param[p]) + text += '''</instance_attributes>''' + + if len(self.op) > 0: + text += '''<operations>''' + for o in self.op: + key = o.name + o.name = "%s-%s" % (self.name, key) + text += o.show() + o.name = key + text += '''</operations>''' + + text += '''</primitive>''' + return text + + def commit(self): + self._run("create", self.show(), "resources") + self._run("modify", self.constraints()) + + +class Group(Resource): + def __init__(self, Factory, name): + Resource.__init__(self, Factory, name, None, None) + self.tag = "group" + + def __setitem__(self, key, value): + self.add_meta(key, value) + + def show(self): + text = '''<%s id="%s">''' % (self.tag, self.name) + + if len(self.meta) > 0: + text += '''<meta_attributes id="%s-meta">''' % self.name + for p in list(self.meta.keys()): + text += '''<nvpair id="%s-%s" name="%s" value="%s"/>''' % (self.name, p, p, self.meta[p]) + text += '''</meta_attributes>''' + + for c in self.children: + text += c.show() + text += '''</%s>''' % self.tag + return text + + +class Clone(Group): + def __init__(self, Factory, name, child=None): + Group.__init__(self, Factory, name) + self.tag = "clone" + if child: + self.add_child(child) + + def add_child(self, resource): + if not self.children: + self.children.append(resource) + else: + self.Factory.log("Clones can only have a single child. Ignoring %s" % resource.name) diff --git a/cts/lab/cluster_test.in b/cts/lab/cluster_test.in new file mode 100755 index 0000000..1741b47 --- /dev/null +++ b/cts/lab/cluster_test.in @@ -0,0 +1,175 @@ +#!@BASH_PATH@ +# +# Copyright 2008-2020 the Pacemaker project contributors +# +# The version control history for this file may have further details. +# +# This source code is licensed under the GNU General Public License version 2 +# or later (GPLv2+) WITHOUT ANY WARRANTY. +# +if [ -e ~/.cts ]; then + . ~/.cts +fi +anyAsked=0 + +[ $# -lt 1 ] || CTS_numtests=$1 + +die() { echo "$@"; exit 1; } + +if [ -z "$CTS_asked_once" ]; then + anyAsked=1 + echo "This script should only be executed on the test exerciser." + echo "The test exerciser will remotely execute the actions required by the" + echo "tests and should not be part of the cluster itself." + + read -p "Is this host intended to be the test exerciser? (yN) " doUnderstand + [ "$doUnderstand" = "y" ] \ + || die "This script must be executed on the test exerciser" +fi + +if [ -z "$CTS_node_list" ]; then + anyAsked=1 + read -p "Please list your cluster nodes (eg. node1 node2 node3): " CTS_node_list +else + echo "Beginning test of cluster: $CTS_node_list" +fi + +if [ -z "$CTS_stack" ]; then + anyAsked=1 + read -p "Which cluster stack are you using? ([corosync]): " CTS_stack + [ -n "$CTS_stack" ] || CTS_stack=corosync +else + echo "Using the $CTS_stack cluster stack" +fi + +[ "${CTS_node_list}" = "${CTS_node_list/$HOSTNAME/}" ] \ + || die "This script must be executed on the test exerciser, and the test exerciser cannot be part of the cluster" + +printf "+ Bootstrapping ssh... " +if [ -z "$SSH_AUTH_SOCK" ]; then + printf "\n + Initializing SSH " + eval "$(ssh-agent)" + echo " + Adding identities..." + ssh-add + rc=$? + if [ $rc -ne 0 ]; then + echo " -- No identities added" + printf "\nThe ability to open key-based 'ssh' connections (as the user 'root') is required to use CTS.\n" + + read -p " - Do you want this program to help you create one? (yN) " auto_fix + if [ "$auto_fix" = "y" ]; then + ssh-keygen -t dsa + ssh-add + else + die "Please run 'ssh-keygen -t dsa' to create a new key" + fi + fi +else + echo "OK" +fi + +test_ok=1 +printf "+ Testing ssh configuration... " +for n in $CTS_node_list; do + ssh -l root -o PasswordAuthentication=no -o ConnectTimeout=5 "$n" /bin/true + rc=$? + if [ $rc -ne 0 ]; then + echo " - connection to $n failed" + test_ok=0 + fi +done + +if [ $test_ok -eq 0 ]; then + printf "\nThe ability to open key-based 'ssh' connections (as the user 'root') is required to use CTS.\n" + + read -p " - Do you want this program to help you with such a setup? (yN) " auto_fix + if [ "$auto_fix" = "y" ]; then + # XXX are we picking the most suitable identity? + privKey=$(ssh-add -L | head -n1 | cut -d" " -f3) + sshCopyIdOpts="-o User=root" + [ -z "$privKey" ] || sshCopyIdOpts+=" -i \"${privKey}.pub\"" + for n in $CTS_node_list; do + eval "ssh-copy-id $sshCopyIdOpts \"${n}\"" \ + || die "Attempt to 'ssh-copy-id $sshCopyIdOpts \"$n\"' failed" + done + else + die "Please install one of your SSH public keys to root's account on all cluster nodes" + fi +fi +echo "OK" + +if [ -z "$CTS_logfile" ]; then + anyAsked=1 + read -p " + Where does/should syslog store logs from remote hosts? (/var/log/messages) " CTS_logfile + [ -n "$CTS_logfile" ] || CTS_logfile=/var/log/messages +fi + +[ -e "$CTS_logfile" ] || die "$CTS_logfile doesn't exist" + +if [ -z "$CTS_logfacility" ]; then + anyAsked=1 + read -p " + Which log facility does the cluster use? (daemon) " CTS_logfacility + [ -n "$CTS_logfacility" ] || CTS_logfacility=daemon +fi + +if [ -z "$CTS_boot" ]; then + read -p "+ Is the cluster software started automatically when a node boots? [yN] " CTS_boot + if [ -z "$CTS_boot" ]; then + CTS_boot=0 + else + case $CTS_boot in + 1|y|Y) CTS_boot=1;; + *) CTS_boot=0;; + esac + fi +fi + +if [ -z "$CTS_numtests" ]; then + read -p "+ How many test iterations should be performed? (500) " CTS_numtests + [ -n "$CTS_numtests" ] || CTS_numtests=500 +fi + +if [ -z "$CTS_asked_once" ]; then + anyAsked=1 + read -p "+ What type of STONITH agent do you use? (none) " CTS_stonith + [ -z "$CTS_stonith" ] \ + || read -p "+ List any STONITH agent parameters (eq. device_host=switch.power.com): " CTS_stonith_args + [ -n "$CTS_adv" ] \ + || read -p "+ (Advanced) Any extra CTS parameters? (none) " CTS_adv +fi + +[ $anyAsked -eq 0 ] \ + || read -p "+ Save values to ~/.cts for next time? (yN) " doSave + +if [ "$doSave" = "y" ]; then + cat > ~/.cts <<-EOF + # CTS Test data + CTS_stack="$CTS_stack" + CTS_node_list="$CTS_node_list" + CTS_logfile="$CTS_logfile" + CTS_logport="$CTS_logport" + CTS_logfacility="$CTS_logfacility" + CTS_asked_once=1 + CTS_adv="$CTS_adv" + CTS_stonith="$CTS_stonith" + CTS_stonith_args="$CTS_stonith_args" + CTS_boot="$CTS_boot" +EOF +fi + +cts_extra="" +if [ -n "$CTS_stonith" ]; then + cts_extra="$cts_extra --stonith-type $CTS_stonith" + [ -z "$CTS_stonith_args" ] \ + || cts_extra="$cts_extra --stonith-params \"$CTS_stonith_args\"" +else + cts_extra="$cts_extra --stonith 0" + echo " - Testing a cluster without STONITH is like a blunt pencil... pointless" +fi + +printf "\nAll set to go for %d iterations!\n" "$CTS_numtests" +[ $anyAsked -ne 0 ] \ + || echo "+ To use a different configuration, remove ~/.cts and re-run cts (or edit it manually)." + +echo Now paste the following command into this shell: +echo "@PYTHON@ `dirname "$0"`/CTSlab.py -L \"$CTS_logfile\" --syslog-facility \"$CTS_logfacility\" --no-unsafe-tests --stack \"$CTS_stack\" $CTS_adv --at-boot \"$CTS_boot\" $cts_extra \"$CTS_numtests\" --nodes \"$CTS_node_list\"" diff --git a/cts/lab/cts-log-watcher.in b/cts/lab/cts-log-watcher.in new file mode 100644 index 0000000..cee9c94 --- /dev/null +++ b/cts/lab/cts-log-watcher.in @@ -0,0 +1,84 @@ +#!@PYTHON@ +""" Remote log reader for Pacemaker's Cluster Test Suite (CTS) + +Reads a specified number of lines from the supplied offset +Returns the current offset +Contains logic for handling truncation +""" + +__copyright__ = "Copyright 2014-2020 the Pacemaker project contributors" +__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" + +import sys +import os +import fcntl + +if __name__ == '__main__': + + limit = 0 + offset = 0 + prefix = '' + filename = '/var/log/messages' + + skipthis=None + args=sys.argv[1:] + for i in range(0, len(args)): + if skipthis: + skipthis=None + continue + + elif args[i] == '-l' or args[i] == '--limit': + skipthis=1 + limit = int(args[i+1]) + + elif args[i] == '-f' or args[i] == '--filename': + skipthis=1 + filename = args[i+1] + + elif args[i] == '-o' or args[i] == '--offset': + skipthis=1 + offset = args[i+1] + + elif args[i] == '-p' or args[i] == '--prefix': + skipthis=1 + prefix = args[i+1] + + elif args[i] == '-t' or args[i] == '--tag': + skipthis=1 + + if not os.access(filename, os.R_OK): + print(prefix + 'Last read: %d, limit=%d, count=%d - unreadable' % (0, limit, 0)) + sys.exit(1) + + logfile=open(filename, 'r') + logfile.seek(0, os.SEEK_END) + newsize=logfile.tell() + + if offset != 'EOF': + offset = int(offset) + if newsize >= offset: + logfile.seek(offset) + else: + print(prefix + ('File truncated from %d to %d' % (offset, newsize))) + if (newsize*1.05) < offset: + logfile.seek(0) + # else: we probably just lost a few logs after a fencing op + # continue from the new end + # TODO: accept a timestamp and discard all messages older than it + + # Don't block when we reach EOF + fcntl.fcntl(logfile.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) + + count = 0 + while True: + if logfile.tell() >= newsize: break + elif limit and count >= limit: break + + line = logfile.readline() + if not line: break + + print(line.strip()) + count += 1 + + print(prefix + 'Last read: %d, limit=%d, count=%d' % (logfile.tell(), limit, count)) + logfile.close() diff --git a/cts/lab/cts.in b/cts/lab/cts.in new file mode 100755 index 0000000..5b3aaab --- /dev/null +++ b/cts/lab/cts.in @@ -0,0 +1,262 @@ +#!@BASH_PATH@ +# +# Copyright 2012-2023 the Pacemaker project contributors +# +# The version control history for this file may have further details. +# +# This source code is licensed under the GNU General Public License version 2 +# or later (GPLv2+) WITHOUT ANY WARRANTY. +# + +# e.g. /etc/sysconfig or /etc/default +CONFIG_DIR=@CONFIGDIR@ + +cts_root=`dirname $0` + +logfile=0 +summary=0 +verbose=0 +watch=0 +saved=0 +tests="" + +install=0 +clean=0 +kill=0 +run=0 +boot=0 +setup=0 +target=rhel-7 +cmd="" +trace="" + +custom_log="" +patterns="-e CTS:" + + +helpmsg=$(cat <<EOF +Usage: %s [options] {[setup [TARGET]] | [OTHER-CMDS]} + +[--]help, -h show help screen and exit +-x turn on debugging +-a show relevant screen sessions and exit +-c,-g CLUSTER_NAME set the cluster name +-S show summary from the last CTS run +-s show summary for the current log (see -l) +-v increase verbosity +-p (currently unused) +-e PATTERN grep pattern to apply when 'summary' or 'watch' requested +-l print the filename of the log that would be operated on +-w continous (filtered) monitoring of the log file +-f,-sf FILE show summary for the provided log +-t TEST, [0-9]* add a test to the working set +[--]kill request termination of cluster software +[--]run request CTS run (passing remaining arguments through) +[--]boot, start request CTS run (with --boot option) +[--]clean request cleaning up after CTS run +[--]install, --inst request installing packages to get ready to run CTS +[--]setup request initialization to get ready to run CTS +trace-ls, tls list traced functions +trace-add, tadd FUNC add a function to the list of traced ones +trace-rm, trm FUNC remove a function from the list of traced ones +trace-set, tset FUNC set function(s) as the only to be traced +(f|fedora|r|rhel).* specify target distro +-- delimits tests that follow +EOF +) + +while true; do + case $1 in + -h|--help|help) printf "${helpmsg}\n" "$0"; exit;; + -x) set -x; shift;; + -a) + screen -ls | grep cts + exit 0;; + -c|-g) cluster_name=$2; shift; shift;; + -S) summary=1; saved=1; shift;; + -s) summary=1; shift;; + -v) verbose=`expr $verbose + 1`; shift;; + -p) shift;; + -e) patterns="$patterns -e `echo $2 | sed 's/ /\\\W/g'`"; shift; shift;; + -l) logfile=1; shift;; + -w) watch=1; shift;; + -f|-sf) summary=1; custom_log=$2; shift; shift;; + -t) tests="$tests $2"; shift; shift;; + [0-9]*) tests="$tests $1"; shift;; + --kill|kill) kill=1; shift; break;; + --run|run) run=1; shift; break;; + --boot|boot|start) boot=1; clean=1; shift; break;; + --clean|clean) clean=1; shift;; + --inst|--install|install) install=1; clean=1; shift;; + --setup|setup) setup=1; shift;; + trace-ls|tls) cmd=$1; shift;; + trace-add|tadd|trace-rm|trm|trace-set|tset) cmd=$1; trace=$2; shift; shift;; + f*) + target="fedora-`echo $1 | sed -e s/fedora// -e s/-// -e s/f//`" + shift;; + r|rhel) target="rhel-7"; shift;; + r*) + target="rhel-`echo $1 | sed -e s/rhel// -e s/-// -e s/r//`" + shift;; + --) shift; tests="$tests $*"; break;; + "") break;; + *) echo "Unknown argument: $1"; exit 1;; + esac +done + +# Add the location of this script +export PATH="$PATH:$cts_root" +which cluster-helper &>/dev/null +if [ $? != 0 ]; then + echo $0 needs the cluster-helper script to be in your path + exit 1 +fi + +which cluster-clean &>/dev/null +if [ $? != 0 ]; then + echo $0 needs the cluster-clean script to be in your path + exit 1 +fi + +if [ "x$cluster_name" = x ] || [ "x$cluster_name" = xpick ]; then + clusters=`ls -1 ~/.dsh/group/[a-z]+[0-9] | sed s/.*group.// | tr '\n' ' ' ` + + echo "custom) interactively define a cluster" + for i in $clusters; do + echo "$i) `cluster-helper --list short -g $i`" + done + + read -p "Choose a cluster [custom]: " cluster_name + echo +fi + +if [ -z $cluster_name ]; then + cluster_name=custom +fi + + +case $cluster_name in + custom) + read -p "Cluster name: " cluster_name + read -p "Cluster hosts: " cluster_hosts + read -p "Cluster log file: " cluster_log + cluster-helper add -g "$cluster_name" -w "$cluster_hosts" + ;; + *) + cluster_hosts=`cluster-helper --list short -g $cluster_name` + cluster_log=~/cluster-$cluster_name.log; + ;; +esac + +if [ x$cmd != x ]; then + config="${CONFIG_DIR}/pacemaker" + case $cmd in + trace-ls|tls) + cluster-helper -g $cluster_name -- grep PCMK_trace_functions $config + ;; + trace-add|tadd) + echo "Adding $trace to PCMK_trace_functions" + cluster-helper -g $cluster_name -- sed -i "s/.*PCMK_trace_functions=/PCMK_trace_functions=$trace,/" $config + ;; + trace-rm|trm) + echo "Removing $trace from PCMK_trace_functions" + cluster-helper -g $cluster_name -- sed -i "s/.*PCMK_trace_functions=\\\\\\(.*\\\\\\)$trace,\\\\\\(.*\\\\\\)/PCMK_trace_functions=\\\\\\1\\\\\\2/" $config + ;; + trace-set|tset) + echo "Setting PCMK_trace_functions to '$trace'" + cluster-helper -g $cluster_name -- sed -i "s/.*PCMK_trace_functions.*/PCMK_trace_functions=$trace/" $config + ;; + esac + exit 0 +fi + +if [ $run = 1 ]; then + install=1 + clean=1 +fi + +if [ $clean = 1 ]; then + rm -f $cluster_log; cluster-clean -g $cluster_name --kill +elif [ $kill = 1 ]; then + cluster-clean -g $cluster_name --kill-only + exit 0 +fi + +if [ $install = 1 ]; then + cluster-helper -g $cluster_name -- yum install -y pacemaker pacemaker-debuginfo pacemaker-cts libqb libqb-debuginfo +fi + +if [ $setup = 1 ]; then + cluster-init -g $cluster_name $target -u --test + exit 0 + +elif [ $boot = 1 ]; then + $cts_root/CTSlab.py -r -c -g $cluster_name --boot + rc=$? + if [ $rc = 0 ]; then + echo "The cluster is ready..." + fi + exit $rc + +elif [ $run = 1 ]; then + $cts_root/CTSlab.py -r -c -g $cluster_name 500 "$@" + exit $? + +elif [ $clean = 1 ]; then + exit 0 +fi + +screen -ls | grep cts-$cluster_name &>/dev/null +active=$? + +if [ ! -z $custom_log ]; then + cluster_log=$custom_log +fi + +if [ "x$tests" != x ] && [ "x$tests" != "x " ]; then + for t in $tests; do + echo "crm_report --cts-log $cluster_log -d -T $t" + crm_report --cts-log $cluster_log -d -T $t + done + +elif [ $logfile = 1 ]; then + echo $cluster_log + +elif [ $summary = 1 ]; then + files=$cluster_log + if [ $saved = 1 ]; then + files=`ls -1tr ~/CTS-*/cluster-log.txt` + fi + for f in $files; do + echo $f + case $verbose in + 0) cat -n $f | grep $patterns | grep -v "CTS: debug:" + ;; + 1) cat -n $f | grep $patterns | grep -v "CTS:.* cmd:" + ;; + *) cat -n $f | grep $patterns + ;; + esac + echo "" + done + +elif [ $watch = 1 ]; then + case $verbose in + 0) tail -F $cluster_log | grep $patterns | grep -v "CTS: debug:" + ;; + 1) tail -F $cluster_log | grep $patterns | grep -v "CTS:.* cmd:" + ;; + *) tail -F $cluster_log | grep $patterns + ;; + esac + +elif [ $active = 0 ]; then + screen -x cts-$cluster_name + +else + touch $cluster_log + +# . ~/.bashrc + export cluster_name cluster_hosts cluster_log + screen -S cts-$cluster_name bash +fi |