Diffstat (limited to 'cts/lab')
-rw-r--r--   cts/lab/CIB.py               518
-rw-r--r--   cts/lab/CM_corosync.py        60
-rwxr-xr-x   cts/lab/CTSaudits.py         879
-rw-r--r--   cts/lab/CTSlab.py.in         135
-rw-r--r--   cts/lab/CTSscenarios.py      563
-rw-r--r--   cts/lab/CTStests.py         3178
-rw-r--r--   cts/lab/ClusterManager.py    940
-rw-r--r--   cts/lab/Makefile.am           31
-rw-r--r--   cts/lab/OCFIPraTest.py.in    173
-rw-r--r--   cts/lab/__init__.py           15
-rw-r--r--   cts/lab/cib_xml.py           319
-rwxr-xr-x   cts/lab/cluster_test.in      175
-rw-r--r--   cts/lab/cts-log-watcher.in    84
-rwxr-xr-x   cts/lab/cts.in               262
14 files changed, 7332 insertions, 0 deletions
diff --git a/cts/lab/CIB.py b/cts/lab/CIB.py
new file mode 100644
index 0000000..5981654
--- /dev/null
+++ b/cts/lab/CIB.py
@@ -0,0 +1,518 @@
+""" CIB generator for Pacemaker's Cluster Test Suite (CTS)
+"""
+
+__copyright__ = "Copyright 2008-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+import os
+import warnings
+import tempfile
+
+from pacemaker.buildoptions import BuildOptions
+from pacemaker._cts.CTS import CtsLab
+
+
+class CibBase(object):
+ def __init__(self, Factory, tag, _id, **kwargs):
+ self.tag = tag
+ self.name = _id
+ self.kwargs = kwargs
+ self.children = []
+ self.Factory = Factory
+
+ def __repr__(self):
+ return "%s-%s" % (self.tag, self.name)
+
+ def add_child(self, child):
+ self.children.append(child)
+
+    def __setitem__(self, key, value):
+        # Setting a key to a false-y value removes the key instead
+        if value:
+            self.kwargs[key] = value
+        else:
+            self.kwargs.pop(key, None)
+
+# cib_xml imports CibBase from this module, so this import must come after
+# CibBase is defined to avoid a circular-import failure
+from cts.cib_xml import *
+
+
+class ConfigBase(object):
+ cts_cib = None
+ version = "unknown"
+ Factory = None
+
+ def __init__(self, CM, factory, tmpfile=None):
+ self.CM = CM
+ self.Factory = factory
+
+ if not tmpfile:
+ warnings.filterwarnings("ignore")
+            f = tempfile.NamedTemporaryFile(delete=True)
+ f.close()
+ tmpfile = f.name
+ warnings.resetwarnings()
+
+ self.Factory.tmpfile = tmpfile
+
+
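+    # Illustration (not from the original source): NextIP() increments the
+    # last dot- or colon-separated component of Env["IPBase"], so
+    # "10.0.0.10" becomes "10.0.0.11", and the IPv6 base used by the unit
+    # test at the bottom of this file, "fe80::1234:56:7890:1000", becomes
+    # "fe80::1234:56:7890:1001" (the last group is treated as hexadecimal).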
+ def NextIP(self):
+ ip = self.CM.Env["IPBase"]
+ if ":" in ip:
+ (prefix, sep, suffix) = ip.rpartition(":")
+            suffix = "%x" % (int(suffix, 16) + 1)
+ else:
+ (prefix, sep, suffix) = ip.rpartition(".")
+ suffix = str(int(suffix)+1)
+
+ ip = prefix + sep + suffix
+ self.CM.Env["IPBase"] = ip
+ return ip.strip()
+
+
+class CIB12(ConfigBase):
+ version = "pacemaker-1.2"
+ counter = 1
+
+ def _show(self, command=""):
+ output = ""
+ (_, result) = self.Factory.rsh(self.Factory.target, "HOME=/root CIB_file="+self.Factory.tmpfile+" cibadmin -Ql "+command, verbose=1)
+ for line in result:
+ output += line
+ self.Factory.debug("Generated Config: "+line)
+ return output
+
+ def NewIP(self, name=None, standard="ocf"):
+ if self.CM.Env["IPagent"] == "IPaddr2":
+ ip = self.NextIP()
+ if not name:
+ if ":" in ip:
+ (prefix, sep, suffix) = ip.rpartition(":")
+ name = "r"+suffix
+ else:
+ name = "r"+ip
+
+ r = Resource(self.Factory, name, self.CM.Env["IPagent"], standard)
+ r["ip"] = ip
+
+ if ":" in ip:
+ r["cidr_netmask"] = "64"
+ r["nic"] = "eth0"
+ else:
+ r["cidr_netmask"] = "32"
+
+ else:
+ if not name:
+ name = "r%s%d" % (self.CM.Env["IPagent"], self.counter)
+ self.counter = self.counter + 1
+ r = Resource(self.Factory, name, self.CM.Env["IPagent"], standard)
+
+ r.add_op("monitor", "5s")
+ return r
+
+ def get_node_id(self, node_name):
+ """ Check the cluster configuration for a node ID. """
+
+        # We can't account for every possible configuration,
+        # so we only return a node ID if:
+        # * The node is specified in /etc/corosync/corosync.conf
+        #   with "ring0_addr:" or "name:" equal to node_name and
+        #   "nodeid:" explicitly specified.
+        # In all other cases, we return 0.
+ node_id = 0
+
+ # awkward command: use } as record separator
+ # so each corosync.conf "object" is one record;
+ # match the "node {" record that has "ring0_addr: node_name";
+ # then print the substring of that record after "nodeid:"
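+        # For example, a (hypothetical) corosync.conf entry such as:
+        #
+        #   node {
+        #       ring0_addr: node1
+        #       nodeid: 1
+        #   }
+        #
+        # would make this method return 1 for node_name "node1".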
+ (rc, output) = self.Factory.rsh(self.Factory.target,
+ r"""awk -v RS="}" """
+ r"""'/^(\s*nodelist\s*{)?\s*node\s*{.*(ring0_addr|name):\s*%s(\s+|$)/"""
+ r"""{gsub(/.*nodeid:\s*/,"");gsub(/\s+.*$/,"");print}' %s"""
+ % (node_name, BuildOptions.COROSYNC_CONFIG_FILE), verbose=1)
+
+ if rc == 0 and len(output) == 1:
+ try:
+ node_id = int(output[0])
+ except ValueError:
+ node_id = 0
+
+ return node_id
+
+ def install(self, target):
+ old = self.Factory.tmpfile
+
+ # Force a rebuild
+ self.cts_cib = None
+
+ self.Factory.tmpfile = BuildOptions.CIB_DIR + "/cib.xml"
+ self.contents(target)
+ self.Factory.rsh(self.Factory.target, "chown " + BuildOptions.DAEMON_USER + " " + self.Factory.tmpfile)
+
+ self.Factory.tmpfile = old
+
+ def contents(self, target=None):
+ # fencing resource
+ if self.cts_cib:
+ return self.cts_cib
+
+ if target:
+ self.Factory.target = target
+
+ self.Factory.rsh(self.Factory.target, "HOME=/root cibadmin --empty %s > %s" % (self.version, self.Factory.tmpfile))
+ self.num_nodes = len(self.CM.Env["nodes"])
+
+ no_quorum = "stop"
+ if self.num_nodes < 3:
+ no_quorum = "ignore"
+ self.Factory.log("Cluster only has %d nodes, configuring: no-quorum-policy=ignore" % self.num_nodes)
+
+ # We don't need a nodes section unless we add attributes
+ stn = None
+
+ # Fencing resource
+ # Define first so that the shell doesn't reject every update
+ if self.CM.Env["DoFencing"]:
+
+ # Define the "real" fencing device
+            st = Resource(self.Factory, "Fencing", self.CM.Env["stonith-type"], "stonith")
+
+ # Set a threshold for unreliable stonith devices such as the vmware one
+ st.add_meta("migration-threshold", "5")
+ st.add_op("monitor", "120s", timeout="120s")
+ st.add_op("stop", "0", timeout="60s")
+ st.add_op("start", "0", timeout="60s")
+
+ # For remote node tests, a cluster node is stopped and brought back up
+ # as a remote node with the name "remote-OLDNAME". To allow fencing
+ # devices to fence these nodes, create a list of all possible node names.
+ all_node_names = [ prefix+n for n in self.CM.Env["nodes"] for prefix in ('', 'remote-') ]
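+            # e.g. nodes ["node1", "node2"] yields
+            # ["node1", "remote-node1", "node2", "remote-node2"]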
+
+ # Add all parameters specified by user
+ entries = self.CM.Env["stonith-params"].split(',')
+ for entry in entries:
+ try:
+ (name, value) = entry.split('=', 1)
+ except ValueError:
+ print("Warning: skipping invalid fencing parameter: %s" % entry)
+ continue
+
+ # Allow user to specify "all" as the node list, and expand it here
+ if name in [ "hostlist", "pcmk_host_list" ] and value == "all":
+ value = ' '.join(all_node_names)
+
+ st[name] = value
+
+ st.commit()
+
+ # Test advanced fencing logic
+ if True:
+ stf_nodes = []
+ stt_nodes = []
+ attr_nodes = {}
+
+ # Create the levels
+ stl = FencingTopology(self.Factory)
+ for node in self.CM.Env["nodes"]:
+ # Remote node tests will rename the node
+ remote_node = "remote-" + node
+
+                # Randomly assign node to a fencing method. The trailing
+                # spaces in two of the choices pad the log output; the string
+                # comparisons below must match them exactly.
+                ftype = self.CM.Env.random_gen.choice(["levels-and", "levels-or ", "broadcast "])
+
+ # For levels-and, randomly choose targeting by node name or attribute
+ by = ""
+ if ftype == "levels-and":
+ node_id = self.get_node_id(node)
+ if node_id == 0 or self.CM.Env.random_gen.choice([True, False]):
+ by = " (by name)"
+ else:
+ attr_nodes[node] = node_id
+ by = " (by attribute)"
+
+ self.CM.log(" - Using %s fencing for node: %s%s" % (ftype, node, by))
+
+ if ftype == "levels-and":
+ # If targeting by name, add a topology level for this node
+ if node not in attr_nodes:
+ stl.level(1, node, "FencingPass,Fencing")
+
+ # Always target remote nodes by name, otherwise we would need to add
+ # an attribute to the remote node only during remote tests (we don't
+ # want nonexistent remote nodes showing up in the non-remote tests).
+ # That complexity is not worth the effort.
+ stl.level(1, remote_node, "FencingPass,Fencing")
+
+ # Add the node (and its remote equivalent) to the list of levels-and nodes.
+ stt_nodes.extend([node, remote_node])
+
+ elif ftype == "levels-or ":
+ for n in [ node, remote_node ]:
+ stl.level(1, n, "FencingFail")
+ stl.level(2, n, "Fencing")
+ stf_nodes.extend([node, remote_node])
+
+ # If any levels-and nodes were targeted by attribute,
+ # create the attributes and a level for the attribute.
+ if attr_nodes:
+ stn = Nodes(self.Factory)
+ for (node_name, node_id) in list(attr_nodes.items()):
+ stn.add_node(node_name, node_id, { "cts-fencing" : "levels-and" })
+ stl.level(1, None, "FencingPass,Fencing", "cts-fencing", "levels-and")
+
+ # Create a Dummy agent that always passes for levels-and
+ if len(stt_nodes):
+ stt = Resource(self.Factory, "FencingPass", "fence_dummy", "stonith")
+ stt["pcmk_host_list"] = " ".join(stt_nodes)
+ # Wait this many seconds before doing anything, handy for letting disks get flushed too
+ stt["random_sleep_range"] = "30"
+ stt["mode"] = "pass"
+ stt.commit()
+
+ # Create a Dummy agent that always fails for levels-or
+ if len(stf_nodes):
+ stf = Resource(self.Factory, "FencingFail", "fence_dummy", "stonith")
+ stf["pcmk_host_list"] = " ".join(stf_nodes)
+ # Wait this many seconds before doing anything, handy for letting disks get flushed too
+ stf["random_sleep_range"] = "30"
+ stf["mode"] = "fail"
+ stf.commit()
+
+ # Now commit the levels themselves
+ stl.commit()
+
+ o = Option(self.Factory)
+ o["stonith-enabled"] = self.CM.Env["DoFencing"]
+ o["start-failure-is-fatal"] = "false"
+ o["pe-input-series-max"] = "5000"
+ o["shutdown-escalation"] = "5min"
+ o["batch-limit"] = "10"
+ o["dc-deadtime"] = "5s"
+ o["no-quorum-policy"] = no_quorum
+
+ if self.CM.Env["DoBSC"]:
+ o["ident-string"] = "Linux-HA TEST configuration file - REMOVEME!!"
+
+ o.commit()
+
+ o = OpDefaults(self.Factory)
+ o["timeout"] = "90s"
+ o.commit()
+
+ # Commit the nodes section if we defined one
+ if stn is not None:
+ stn.commit()
+
+ # Add an alerts section if possible
+ if self.Factory.rsh.exists_on_all(self.CM.Env["notification-agent"], self.CM.Env["nodes"]):
+ alerts = Alerts(self.Factory)
+ alerts.add_alert(self.CM.Env["notification-agent"],
+ self.CM.Env["notification-recipient"])
+ alerts.commit()
+
+ # Add resources?
+ if self.CM.Env["CIBResource"]:
+ self.add_resources()
+
+ if self.CM.cluster_monitor == 1:
+            mon = Resource(self.Factory, "cluster_mon", "ClusterMon", "ocf", "pacemaker")
+ mon.add_op("start", "0", requires="nothing")
+ mon.add_op("monitor", "5s", requires="nothing")
+ mon["update"] = "10"
+ mon["extra_options"] = "-r -n"
+ mon["user"] = "abeekhof"
+ mon["htmlfile"] = "/suse/abeekhof/Export/cluster.html"
+ mon.commit()
+
+ #self._create('''location prefer-dc cluster_mon rule -INFINITY: \#is_dc eq false''')
+
+ # generate cib
+ self.cts_cib = self._show()
+
+ if self.Factory.tmpfile != BuildOptions.CIB_DIR + "/cib.xml":
+ self.Factory.rsh(self.Factory.target, "rm -f "+self.Factory.tmpfile)
+
+ return self.cts_cib
+
+ def add_resources(self):
+ # Per-node resources
+ for node in self.CM.Env["nodes"]:
+ name = "rsc_"+node
+ r = self.NewIP(name)
+ r.prefer(node, "100")
+ r.commit()
+
+ # Migrator
+ # Make this slightly sticky (since we have no other location constraints) to avoid relocation during Reattach
+        m = Resource(self.Factory, "migrator", "Dummy", "ocf", "pacemaker")
+        m["passwd"] = "whatever"
+        m.add_meta("resource-stickiness", "1")
+        m.add_meta("allow-migrate", "1")
+ m.add_op("monitor", "P10S")
+ m.commit()
+
+ # Ping the test exerciser
+        p = Resource(self.Factory, "ping-1", "ping", "ocf", "pacemaker")
+ p.add_op("monitor", "60s")
+ p["host_list"] = self.CM.Env["cts-exerciser"]
+ p["name"] = "connected"
+ p["debug"] = "true"
+
+ c = Clone(self.Factory, "Connectivity", p)
+ c["globally-unique"] = "false"
+ c.commit()
+
+ # promotable clone resource
+ s = Resource(self.Factory, "stateful-1", "Stateful", "ocf", "pacemaker")
+ s.add_op("monitor", "15s", timeout="60s")
+ s.add_op("monitor", "16s", timeout="60s", role="Promoted")
+ ms = Clone(self.Factory, "promotable-1", s)
+ ms["promotable"] = "true"
+ ms["clone-max"] = self.num_nodes
+ ms["clone-node-max"] = 1
+ ms["promoted-max"] = 1
+ ms["promoted-node-max"] = 1
+
+ # Require connectivity to run the promotable clone
+ r = Rule(self.Factory, "connected", "-INFINITY", op="or")
+ r.add_child(Expression(self.Factory, "m1-connected-1", "connected", "lt", "1"))
+ r.add_child(Expression(self.Factory, "m1-connected-2", "connected", "not_defined", None))
+ ms.prefer("connected", rule=r)
+
+ ms.commit()
+
+ # Group Resource
+ g = Group(self.Factory, "group-1")
+ g.add_child(self.NewIP())
+
+ if self.CM.Env["have_systemd"]:
+ sysd = Resource(self.Factory, "petulant",
+ "pacemaker-cts-dummyd@10", "service")
+ sysd.add_op("monitor", "P10S")
+ g.add_child(sysd)
+ else:
+ g.add_child(self.NewIP())
+
+ g.add_child(self.NewIP())
+
+ # Make group depend on the promotable clone
+ g.after("promotable-1", first="promote", then="start")
+ g.colocate("promotable-1", "INFINITY", withrole="Promoted")
+
+ g.commit()
+
+ # LSB resource
+ lsb = Resource(self.Factory, "lsb-dummy", "LSBDummy", "lsb")
+ lsb.add_op("monitor", "5s")
+
+ # LSB with group
+ lsb.after("group-1")
+ lsb.colocate("group-1")
+
+ lsb.commit()
+
+
+class CIB20(CIB12):
+ version = "pacemaker-2.5"
+
+class CIB30(CIB12):
+ version = "pacemaker-3.7"
+
+#class HASI(CIB10):
+# def add_resources(self):
+# # DLM resource
+# self._create('''primitive dlm ocf:pacemaker:controld op monitor interval=120s''')
+# self._create('''clone dlm-clone dlm meta globally-unique=false interleave=true''')
+
+ # O2CB resource
+# self._create('''primitive o2cb ocf:ocfs2:o2cb op monitor interval=120s''')
+# self._create('''clone o2cb-clone o2cb meta globally-unique=false interleave=true''')
+# self._create('''colocation o2cb-with-dlm INFINITY: o2cb-clone dlm-clone''')
+# self._create('''order start-o2cb-after-dlm mandatory: dlm-clone o2cb-clone''')
+
+
+class ConfigFactory(object):
+ def __init__(self, CM):
+ self.CM = CM
+ self.rsh = self.CM.rsh
+ self.register("pacemaker12", CIB12, CM, self)
+ self.register("pacemaker20", CIB20, CM, self)
+ self.register("pacemaker30", CIB30, CM, self)
+# self.register("hae", HASI, CM, self)
+ if not self.CM.Env["ListTests"]:
+ self.target = self.CM.Env["nodes"][0]
+ self.tmpfile = None
+
+ def log(self, args):
+ self.CM.log("cib: %s" % args)
+
+ def debug(self, args):
+ self.CM.debug("cib: %s" % args)
+
+ def register(self, methodName, constructor, *args, **kargs):
+ """register a constructor"""
+ _args = [constructor]
+ _args.extend(args)
+ setattr(self, methodName, ConfigFactoryItem(*_args, **kargs))
+
+ def unregister(self, methodName):
+ """unregister a constructor"""
+ delattr(self, methodName)
+
+    def createConfig(self, name="pacemaker-1.0"):
+        if name == "pacemaker-1.0":
+            name = "pacemaker10"
+        elif name == "pacemaker-1.2":
+            name = "pacemaker12"
+        elif name == "pacemaker-2.0":
+            name = "pacemaker20"
+        elif name.startswith("pacemaker-3."):
+            name = "pacemaker30"
+        elif name == "hasi":
+            name = "hae"
+
+ if hasattr(self, name):
+ return getattr(self, name)()
+ else:
+ self.CM.log("Configuration variant '%s' is unknown. Defaulting to latest config" % name)
+
+ return self.pacemaker30()
+
+
+class ConfigFactoryItem(object):
+ def __init__(self, function, *args, **kargs):
+ self._function = function
+ self._args = args
+ self._kargs = kargs
+
+ def __call__(self, *args, **kargs):
+ """call function"""
+ _args = list(self._args)
+ _args.extend(args)
+ _kargs = self._kargs.copy()
+ _kargs.update(kargs)
+ return self._function(*_args,**_kargs)
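+# Illustrative sketch (not part of the original code): ConfigFactoryItem
+# behaves like functools.partial, freezing the constructor arguments given
+# at registration time:
+#
+#   item = ConfigFactoryItem(CIB12, cm, factory)
+#   cib = item()              # equivalent to CIB12(cm, factory)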
+
+if __name__ == '__main__':
+ """ Unit test (pass cluster node names as command line arguments) """
+
+    from cts import CM_corosync
+ import sys
+
+ if len(sys.argv) < 2:
+ print("Usage: %s <node> ..." % sys.argv[0])
+ sys.exit(1)
+
+ args = [
+ "--nodes", " ".join(sys.argv[1:]),
+ "--clobber-cib",
+ "--populate-resources",
+ "--stack", "corosync",
+ "--test-ip-base", "fe80::1234:56:7890:1000",
+ "--stonith", "rhcs",
+ ]
+ env = CtsLab(args)
+ cm = CM_corosync.crm_corosync()
+ CibFactory = ConfigFactory(cm)
+ cib = CibFactory.createConfig("pacemaker-3.0")
+ print(cib.contents())
diff --git a/cts/lab/CM_corosync.py b/cts/lab/CM_corosync.py
new file mode 100644
index 0000000..dce7e98
--- /dev/null
+++ b/cts/lab/CM_corosync.py
@@ -0,0 +1,60 @@
+""" Corosync-specific class for Pacemaker's Cluster Test Suite (CTS)
+"""
+
+__copyright__ = "Copyright 2007-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+from cts.ClusterManager import ClusterManager
+
+from pacemaker._cts.CTS import Process
+from pacemaker._cts.patterns import PatternSelector
+
+class crm_corosync(ClusterManager):
+ '''
+ Corosync version 2 cluster manager class
+ '''
+    def __init__(self, name=None):
+        if not name:
+            name = "crm-corosync"
+
+        ClusterManager.__init__(self)
+
+ self.fullcomplist = {}
+ self.templates = PatternSelector(self.name)
+
+ def Components(self):
+ complist = []
+        if not self.fullcomplist:
+ for c in [ "pacemaker-based", "pacemaker-controld", "pacemaker-attrd", "pacemaker-execd", "pacemaker-fenced" ]:
+ self.fullcomplist[c] = Process(
+ self, c,
+ pats = self.templates.get_component(c),
+ badnews_ignore = self.templates.get_component("%s-ignore" % c) +
+ self.templates.get_component("common-ignore"))
+
+ # the scheduler uses dc_pats instead of pats
+ self.fullcomplist["pacemaker-schedulerd"] = Process(
+ self, "pacemaker-schedulerd",
+ dc_pats = self.templates.get_component("pacemaker-schedulerd"),
+ badnews_ignore = self.templates.get_component("pacemaker-schedulerd-ignore") +
+ self.templates.get_component("common-ignore"))
+
+ # add (or replace) extra components
+ self.fullcomplist["corosync"] = Process(
+ self, "corosync",
+ pats = self.templates.get_component("corosync"),
+ badnews_ignore = self.templates.get_component("corosync-ignore") +
+ self.templates.get_component("common-ignore")
+ )
+
+ # Processes running under valgrind can't be shot with "killall -9 processname",
+ # so don't include them in the returned list
+ vgrind = self.Env["valgrind-procs"].split()
+ for key in list(self.fullcomplist.keys()):
+ if self.Env["valgrind-tests"]:
+ if key in vgrind:
+ self.log("Filtering %s from the component list as it is being profiled by valgrind" % key)
+ continue
+ if key == "pacemaker-fenced" and not self.Env["DoFencing"]:
+ continue
+ complist.append(self.fullcomplist[key])
+
+ return complist
diff --git a/cts/lab/CTSaudits.py b/cts/lab/CTSaudits.py
new file mode 100755
index 0000000..51a04f8
--- /dev/null
+++ b/cts/lab/CTSaudits.py
@@ -0,0 +1,879 @@
+""" Auditing classes for Pacemaker's Cluster Test Suite (CTS)
+"""
+
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+import time, re, uuid
+
+from pacemaker.buildoptions import BuildOptions
+from pacemaker._cts.watcher import LogKind, LogWatcher
+
+class ClusterAudit(object):
+
+ def __init__(self, cm):
+ self.CM = cm
+
+ def __call__(self):
+ raise ValueError("Abstract Class member (__call__)")
+
+    def is_applicable(self):
+        '''Return TRUE if we are applicable in the current test configuration'''
+        raise ValueError("Abstract Class member (is_applicable)")
+
+ def log(self, args):
+ self.CM.log("audit: %s" % args)
+
+ def debug(self, args):
+ self.CM.debug("audit: %s" % args)
+
+ def name(self):
+ raise ValueError("Abstract Class member (name)")
+
+AllAuditClasses = [ ]
+
+
+class LogAudit(ClusterAudit):
+
+ def name(self):
+ return "LogAudit"
+
+ def __init__(self, cm):
+ self.CM = cm
+
+ def RestartClusterLogging(self, nodes=None):
+ if not nodes:
+ nodes = self.CM.Env["nodes"]
+
+ self.CM.debug("Restarting logging on: %s" % repr(nodes))
+
+ for node in nodes:
+ if self.CM.Env["have_systemd"]:
+ (rc, _) = self.CM.rsh(node, "systemctl stop systemd-journald.socket")
+ if rc != 0:
+ self.CM.log ("ERROR: Cannot stop 'systemd-journald' on %s" % node)
+
+ (rc, _) = self.CM.rsh(node, "systemctl start systemd-journald.service")
+ if rc != 0:
+ self.CM.log ("ERROR: Cannot start 'systemd-journald' on %s" % node)
+
+ (rc, _) = self.CM.rsh(node, "service %s restart" % self.CM.Env["syslogd"])
+ if rc != 0:
+ self.CM.log ("ERROR: Cannot restart '%s' on %s" % (self.CM.Env["syslogd"], node))
+
+ def _create_watcher(self, patterns, kind):
+ watch = LogWatcher(self.CM.Env["LogFileName"], patterns,
+ self.CM.Env["nodes"], kind, "LogAudit", 5,
+ silent=True)
+ watch.set_watch()
+ return watch
+
+ def TestLogging(self):
+ patterns = []
+ prefix = "Test message from"
+ suffix = str(uuid.uuid4())
+ watch = {}
+
+ for node in self.CM.Env["nodes"]:
+            # Look for the node name in two places to make sure
+            # that syslog is logging with the correct hostname
+            # (e.g. match "node1" even if syslog logs "node1.example.com")
+ m = re.search("^([^.]+).*", node)
+ if m:
+ simple = m.group(1)
+ else:
+ simple = node
+ patterns.append("%s.*%s %s %s" % (simple, prefix, node, suffix))
+
+ watch_pref = self.CM.Env["LogWatcher"]
+ if watch_pref == LogKind.ANY:
+ kinds = [ LogKind.FILE ]
+ if self.CM.Env["have_systemd"]:
+ kinds += [ LogKind.JOURNAL ]
+ kinds += [ LogKind.REMOTE_FILE ]
+ for k in kinds:
+ watch[k] = self._create_watcher(patterns, k)
+ self.CM.log("Logging test message with identifier %s" % (suffix))
+ else:
+ watch[watch_pref] = self._create_watcher(patterns, watch_pref)
+
+ for node in self.CM.Env["nodes"]:
+ cmd = "logger -p %s.info %s %s %s" % (self.CM.Env["SyslogFacility"], prefix, node, suffix)
+
+ (rc, _) = self.CM.rsh(node, cmd, synchronous=False, verbose=0)
+ if rc != 0:
+ self.CM.log ("ERROR: Cannot execute remote command [%s] on %s" % (cmd, node))
+
+ for k in list(watch.keys()):
+ w = watch[k]
+ if watch_pref == LogKind.ANY:
+ self.CM.log("Checking for test message in %s logs" % (k))
+ w.look_for_all(silent=True)
+ if w.unmatched:
+ for regex in w.unmatched:
+ self.CM.log("Test message [%s] not found in %s logs" % (regex, w.kind))
+ else:
+ if watch_pref == LogKind.ANY:
+ self.CM.log("Found test message in %s logs" % (k))
+ self.CM.Env["LogWatcher"] = k
+ return 1
+
+ return 0
+
+ def __call__(self):
+        max_attempts = 3
+        attempt = 0
+
+        self.CM.ns.wait_for_all_nodes(self.CM.Env["nodes"])
+        while attempt <= max_attempts and self.TestLogging() == 0:
+            attempt = attempt + 1
+            self.RestartClusterLogging()
+            time.sleep(60*attempt)
+
+        if attempt > max_attempts:
+ self.CM.log("ERROR: Cluster logging unrecoverable.")
+ return 0
+
+ return 1
+
+ def is_applicable(self):
+ if self.CM.Env["DoBSC"]:
+ return 0
+ if self.CM.Env["LogAuditDisabled"]:
+ return 0
+ return 1
+
+
+class DiskAudit(ClusterAudit):
+
+ def name(self):
+ return "DiskspaceAudit"
+
+ def __init__(self, cm):
+ self.CM = cm
+
+ def __call__(self):
+ result = 1
+ # @TODO Use directory of PCMK_logfile if set on host
+ dfcmd = "df -BM " + BuildOptions.LOG_DIR + " | tail -1 | awk '{print $(NF-1)\" \"$(NF-2)}' | tr -d 'M%'"
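+        # e.g. a df line ending in "... 2048M 80% /var/log" comes out of the
+        # pipeline above as "80 2048": percent used, then free megabytes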
+
+ self.CM.ns.wait_for_all_nodes(self.CM.Env["nodes"])
+ for node in self.CM.Env["nodes"]:
+ (_, dfout) = self.CM.rsh(node, dfcmd, verbose=1)
+ if not dfout:
+ self.CM.log ("ERROR: Cannot execute remote df command [%s] on %s" % (dfcmd, node))
+ else:
+ dfout = dfout[0].strip()
+
+                try:
+                    (used, remain) = dfout.split()
+                    used_percent = int(used)
+                    remaining_mb = int(remain)
+                except (ValueError, TypeError):
+                    self.CM.log("Warning: df output '%s' from %s was invalid"
+                                % (dfout, node))
+ else:
+ if remaining_mb < 10 or used_percent > 95:
+ self.CM.log("CRIT: Out of log disk space on %s (%d%% / %dMB)"
+ % (node, used_percent, remaining_mb))
+ result = None
+ if self.CM.Env["continue"]:
+ answer = "Y"
+ else:
+ try:
+ answer = input('Continue? [nY]')
+ except EOFError as e:
+ answer = "n"
+
+ if answer and answer == "n":
+ raise ValueError("Disk full on %s" % (node))
+
+ elif remaining_mb < 100 or used_percent > 90:
+ self.CM.log("WARN: Low on log disk space (%dMB) on %s" % (remaining_mb, node))
+ return result
+
+ def is_applicable(self):
+ if self.CM.Env["DoBSC"]:
+ return 0
+ return 1
+
+
+class FileAudit(ClusterAudit):
+
+ def name(self):
+ return "FileAudit"
+
+ def __init__(self, cm):
+ self.CM = cm
+ self.known = []
+
+ def __call__(self):
+ result = 1
+
+ self.CM.ns.wait_for_all_nodes(self.CM.Env["nodes"])
+ for node in self.CM.Env["nodes"]:
+
+ (_, lsout) = self.CM.rsh(node, "ls -al /var/lib/pacemaker/cores/* | grep core.[0-9]", verbose=1)
+ for line in lsout:
+ line = line.strip()
+ if line not in self.known:
+ result = 0
+ self.known.append(line)
+ self.CM.log("Warning: Pacemaker core file on %s: %s" % (node, line))
+
+ (_, lsout) = self.CM.rsh(node, "ls -al /var/lib/corosync | grep core.[0-9]", verbose=1)
+ for line in lsout:
+ line = line.strip()
+ if line not in self.known:
+ result = 0
+ self.known.append(line)
+ self.CM.log("Warning: Corosync core file on %s: %s" % (node, line))
+
+ if node in self.CM.ShouldBeStatus and self.CM.ShouldBeStatus[node] == "down":
+ clean = 0
+ (_, lsout) = self.CM.rsh(node, "ls -al /dev/shm | grep qb-", verbose=1)
+ for line in lsout:
+ result = 0
+ clean = 1
+ self.CM.log("Warning: Stale IPC file on %s: %s" % (node, line))
+
+ if clean:
+ (_, lsout) = self.CM.rsh(node, "ps axf | grep -e pacemaker -e corosync", verbose=1)
+ for line in lsout:
+ self.CM.debug("ps[%s]: %s" % (node, line))
+
+ self.CM.rsh(node, "rm -rf /dev/shm/qb-*")
+
+ else:
+ self.CM.debug("Skipping %s" % node)
+
+ return result
+
+ def is_applicable(self):
+ return 1
+
+
+class AuditResource(object):
+ def __init__(self, cm, line):
+ fields = line.split()
+ self.CM = cm
+ self.line = line
+ self.type = fields[1]
+ self.id = fields[2]
+ self.clone_id = fields[3]
+ self.parent = fields[4]
+ self.rprovider = fields[5]
+ self.rclass = fields[6]
+ self.rtype = fields[7]
+ self.host = fields[8]
+ self.needs_quorum = fields[9]
+ self.flags = int(fields[10])
+ self.flags_s = fields[11]
+
+ if self.parent == "NA":
+ self.parent = None
+
+    def unique(self):
+        if self.flags & 0x20:
+            return 1
+        return 0
+
+    def orphan(self):
+        if self.flags & 0x01:
+            return 1
+        return 0
+
+    def managed(self):
+        if self.flags & 0x02:
+            return 1
+        return 0
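+    # The masks above decode the flags column of "crm_resource -c" output
+    # (0x01 orphan, 0x02 managed, 0x20 globally unique); presumably they
+    # mirror the scheduler's internal resource flags.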
+
+
+class AuditConstraint(object):
+ def __init__(self, cm, line):
+ fields = line.split()
+ self.CM = cm
+ self.line = line
+ self.type = fields[1]
+ self.id = fields[2]
+ self.rsc = fields[3]
+ self.target = fields[4]
+ self.score = fields[5]
+ self.rsc_role = fields[6]
+ self.target_role = fields[7]
+
+ if self.rsc_role == "NA":
+ self.rsc_role = None
+ if self.target_role == "NA":
+ self.target_role = None
+
+
+class PrimitiveAudit(ClusterAudit):
+ def name(self):
+ return "PrimitiveAudit"
+
+ def __init__(self, cm):
+ self.CM = cm
+
+ def doResourceAudit(self, resource, quorum):
+ rc = 1
+ active = self.CM.ResourceLocation(resource.id)
+
+ if len(active) == 1:
+ if quorum:
+ self.debug("Resource %s active on %s" % (resource.id, repr(active)))
+
+ elif resource.needs_quorum == 1:
+ self.CM.log("Resource %s active without quorum: %s"
+ % (resource.id, repr(active)))
+ rc = 0
+
+ elif not resource.managed():
+ self.CM.log("Resource %s not managed. Active on %s"
+ % (resource.id, repr(active)))
+
+ elif not resource.unique():
+ # TODO: Figure out a clever way to actually audit these resource types
+ if len(active) > 1:
+ self.debug("Non-unique resource %s is active on: %s"
+ % (resource.id, repr(active)))
+ else:
+ self.debug("Non-unique resource %s is not active" % resource.id)
+
+ elif len(active) > 1:
+ self.CM.log("Resource %s is active multiple times: %s"
+ % (resource.id, repr(active)))
+ rc = 0
+
+ elif resource.orphan():
+ self.debug("Resource %s is an inactive orphan" % resource.id)
+
+ elif len(self.inactive_nodes) == 0:
+ self.CM.log("WARN: Resource %s not served anywhere" % resource.id)
+ rc = 0
+
+ elif self.CM.Env["warn-inactive"]:
+ if quorum or not resource.needs_quorum:
+ self.CM.log("WARN: Resource %s not served anywhere (Inactive nodes: %s)"
+ % (resource.id, repr(self.inactive_nodes)))
+ else:
+ self.debug("Resource %s not served anywhere (Inactive nodes: %s)"
+ % (resource.id, repr(self.inactive_nodes)))
+
+ elif quorum or not resource.needs_quorum:
+ self.debug("Resource %s not served anywhere (Inactive nodes: %s)"
+ % (resource.id, repr(self.inactive_nodes)))
+
+ return rc
+
+ def setup(self):
+ self.target = None
+ self.resources = []
+ self.constraints = []
+ self.active_nodes = []
+ self.inactive_nodes = []
+
+ for node in self.CM.Env["nodes"]:
+ if self.CM.ShouldBeStatus[node] == "up":
+ self.active_nodes.append(node)
+ else:
+ self.inactive_nodes.append(node)
+
+        for node in self.CM.Env["nodes"]:
+            if self.target is None and self.CM.ShouldBeStatus[node] == "up":
+                self.target = node
+
+ if not self.target:
+ # TODO: In Pacemaker 1.0 clusters we'll be able to run crm_resource
+ # with CIB_file=/path/to/cib.xml even when the cluster isn't running
+ self.debug("No nodes active - skipping %s" % self.name())
+ return 0
+
+ (_, lines) = self.CM.rsh(self.target, "crm_resource -c", verbose=1)
+
+ for line in lines:
+ if re.search("^Resource", line):
+ self.resources.append(AuditResource(self.CM, line))
+ elif re.search("^Constraint", line):
+ self.constraints.append(AuditConstraint(self.CM, line))
+ else:
+                self.CM.log("Unknown entry: %s" % line)
+
+ return 1
+
+ def __call__(self):
+ rc = 1
+
+ if not self.setup():
+ return 1
+
+ quorum = self.CM.HasQuorum(None)
+ for resource in self.resources:
+ if resource.type == "primitive":
+ if self.doResourceAudit(resource, quorum) == 0:
+ rc = 0
+ return rc
+
+ def is_applicable(self):
+ # @TODO Due to long-ago refactoring, this name test would never match,
+ # so this audit (and those derived from it) would never run.
+ # Uncommenting the next lines fixes the name test, but that then
+ # exposes pre-existing bugs that need to be fixed.
+ #if self.CM["Name"] == "crm-corosync":
+ # return 1
+ return 0
+
+
+class GroupAudit(PrimitiveAudit):
+ def name(self):
+ return "GroupAudit"
+
+ def __call__(self):
+ rc = 1
+ if not self.setup():
+ return 1
+
+ for group in self.resources:
+ if group.type == "group":
+ first_match = 1
+ group_location = None
+ for child in self.resources:
+ if child.parent == group.id:
+ nodes = self.CM.ResourceLocation(child.id)
+
+ if first_match and len(nodes) > 0:
+ group_location = nodes[0]
+
+ first_match = 0
+
+ if len(nodes) > 1:
+ rc = 0
+ self.CM.log("Child %s of %s is active more than once: %s"
+ % (child.id, group.id, repr(nodes)))
+
+ elif len(nodes) == 0:
+ # Groups are allowed to be partially active
+ # However we do need to make sure later children aren't running
+ group_location = None
+ self.debug("Child %s of %s is stopped" % (child.id, group.id))
+
+ elif nodes[0] != group_location:
+ rc = 0
+ self.CM.log("Child %s of %s is active on the wrong node (%s) expected %s"
+ % (child.id, group.id, nodes[0], group_location))
+ else:
+ self.debug("Child %s of %s is active on %s" % (child.id, group.id, nodes[0]))
+
+ return rc
+
+
+class CloneAudit(PrimitiveAudit):
+ def name(self):
+ return "CloneAudit"
+
+ def __call__(self):
+ rc = 1
+ if not self.setup():
+ return 1
+
+ for clone in self.resources:
+ if clone.type == "clone":
+ for child in self.resources:
+ if child.parent == clone.id and child.type == "primitive":
+ self.debug("Checking child %s of %s..." % (child.id, clone.id))
+ # Check max and node_max
+ # Obtain with:
+ # crm_resource -g clone_max --meta -r child.id
+ # crm_resource -g clone_node_max --meta -r child.id
+
+ return rc
+
+
+class ColocationAudit(PrimitiveAudit):
+ def name(self):
+ return "ColocationAudit"
+
+ def crm_location(self, resource):
+        (rc, lines) = self.CM.rsh(self.target, "crm_resource -W -r %s -Q" % resource, verbose=1)
+ hosts = []
+ if rc == 0:
+ for line in lines:
+ fields = line.split()
+ hosts.append(fields[0])
+
+ return hosts
+
+ def __call__(self):
+ rc = 1
+ if not self.setup():
+ return 1
+
+ for coloc in self.constraints:
+ if coloc.type == "rsc_colocation":
+ source = self.crm_location(coloc.rsc)
+ target = self.crm_location(coloc.target)
+ if len(source) == 0:
+ self.debug("Colocation audit (%s): %s not running" % (coloc.id, coloc.rsc))
+ else:
+ for node in source:
+ if not node in target:
+ rc = 0
+ self.CM.log("Colocation audit (%s): %s running on %s (not in %s)"
+ % (coloc.id, coloc.rsc, node, repr(target)))
+ else:
+ self.debug("Colocation audit (%s): %s running on %s (in %s)"
+ % (coloc.id, coloc.rsc, node, repr(target)))
+
+ return rc
+
+
+class ControllerStateAudit(ClusterAudit):
+ def __init__(self, cm):
+ self.CM = cm
+ self.Stats = {"calls":0
+ , "success":0
+ , "failure":0
+ , "skipped":0
+ , "auditfail":0}
+
+ def has_key(self, key):
+ return key in self.Stats
+
+ def __setitem__(self, key, value):
+ self.Stats[key] = value
+
+ def __getitem__(self, key):
+ return self.Stats[key]
+
+ def incr(self, name):
+ '''Increment (or initialize) the value associated with the given name'''
+ if not name in self.Stats:
+ self.Stats[name] = 0
+ self.Stats[name] = self.Stats[name]+1
+
+ def __call__(self):
+ passed = 1
+ up_are_down = 0
+ down_are_up = 0
+ unstable_list = []
+
+ for node in self.CM.Env["nodes"]:
+ should_be = self.CM.ShouldBeStatus[node]
+ rc = self.CM.test_node_CM(node)
+ if rc > 0:
+ if should_be == "down":
+ down_are_up = down_are_up + 1
+ if rc == 1:
+ unstable_list.append(node)
+ elif should_be == "up":
+ up_are_down = up_are_down + 1
+
+ if len(unstable_list) > 0:
+ passed = 0
+ self.CM.log("Cluster is not stable: %d (of %d): %s"
+ % (len(unstable_list), self.CM.upcount(), repr(unstable_list)))
+
+ if up_are_down > 0:
+ passed = 0
+ self.CM.log("%d (of %d) nodes expected to be up were down."
+ % (up_are_down, len(self.CM.Env["nodes"])))
+
+ if down_are_up > 0:
+ passed = 0
+ self.CM.log("%d (of %d) nodes expected to be down were up."
+ % (down_are_up, len(self.CM.Env["nodes"])))
+
+ return passed
+
+ def name(self):
+ return "ControllerStateAudit"
+
+ def is_applicable(self):
+ # @TODO Due to long-ago refactoring, this name test would never match,
+ # so this audit (and those derived from it) would never run.
+ # Uncommenting the next lines fixes the name test, but that then
+ # exposes pre-existing bugs that need to be fixed.
+ #if self.CM["Name"] == "crm-corosync":
+ # return 1
+ return 0
+
+
+class CIBAudit(ClusterAudit):
+ def __init__(self, cm):
+ self.CM = cm
+ self.Stats = {"calls":0
+ , "success":0
+ , "failure":0
+ , "skipped":0
+ , "auditfail":0}
+
+ def has_key(self, key):
+ return key in self.Stats
+
+ def __setitem__(self, key, value):
+ self.Stats[key] = value
+
+ def __getitem__(self, key):
+ return self.Stats[key]
+
+ def incr(self, name):
+ '''Increment (or initialize) the value associated with the given name'''
+ if not name in self.Stats:
+ self.Stats[name] = 0
+ self.Stats[name] = self.Stats[name]+1
+
+ def __call__(self):
+ passed = 1
+ ccm_partitions = self.CM.find_partitions()
+
+ if len(ccm_partitions) == 0:
+ self.debug("\tNo partitions to audit")
+ return 1
+
+        for partition in ccm_partitions:
+            self.debug("\tAuditing CIB consistency for: %s" % partition)
+            if self.audit_cib_contents(partition) == 0:
+                passed = 0
+
+ return passed
+
+ def audit_cib_contents(self, hostlist):
+ passed = 1
+ node0 = None
+ node0_xml = None
+
+ partition_hosts = hostlist.split()
+ for node in partition_hosts:
+ node_xml = self.store_remote_cib(node, node0)
+
+            if node_xml is None:
+                self.CM.log("Could not perform audit: No configuration from %s" % node)
+                passed = 0
+
+            elif node0 is None:
+                node0 = node
+                node0_xml = node_xml
+
+            elif node0_xml is None:
+ self.CM.log("Could not perform audit: No configuration from %s" % node0)
+ passed = 0
+
+ else:
+ (rc, result) = self.CM.rsh(
+ node0, "crm_diff -VV -cf --new %s --original %s" % (node_xml, node0_xml), verbose=1)
+
+ if rc != 0:
+ self.CM.log("Diff between %s and %s failed: %d" % (node0_xml, node_xml, rc))
+ passed = 0
+
+ for line in result:
+ if not re.search("<diff/>", line):
+ passed = 0
+ self.debug("CibDiff[%s-%s]: %s" % (node0, node, line))
+ else:
+ self.debug("CibDiff[%s-%s] Ignoring: %s" % (node0, node, line))
+
+# self.CM.rsh(node0, "rm -f %s" % node_xml)
+# self.CM.rsh(node0, "rm -f %s" % node0_xml)
+ return passed
+
+ def store_remote_cib(self, node, target):
+ combined = ""
+ filename = "/tmp/ctsaudit.%s.xml" % node
+
+ if not target:
+ target = node
+
+ (rc, lines) = self.CM.rsh(node, self.CM["CibQuery"], verbose=1)
+ if rc != 0:
+ self.CM.log("Could not retrieve configuration")
+ return None
+
+ self.CM.rsh("localhost", "rm -f %s" % filename)
+ for line in lines:
+ self.CM.rsh("localhost", "echo \'%s\' >> %s" % (line[:-1], filename), verbose=0)
+
+ if self.CM.rsh.copy(filename, "root@%s:%s" % (target, filename), silent=True) != 0:
+ self.CM.log("Could not store configuration")
+ return None
+ return filename
+
+ def name(self):
+ return "CibAudit"
+
+ def is_applicable(self):
+ # @TODO Due to long-ago refactoring, this name test would never match,
+ # so this audit (and those derived from it) would never run.
+ # Uncommenting the next lines fixes the name test, but that then
+ # exposes pre-existing bugs that need to be fixed.
+ #if self.CM["Name"] == "crm-corosync":
+ # return 1
+ return 0
+
+
+class PartitionAudit(ClusterAudit):
+ def __init__(self, cm):
+ self.CM = cm
+ self.Stats = {"calls":0
+ , "success":0
+ , "failure":0
+ , "skipped":0
+ , "auditfail":0}
+ self.NodeEpoch = {}
+ self.NodeState = {}
+ self.NodeQuorum = {}
+
+ def has_key(self, key):
+ return key in self.Stats
+
+ def __setitem__(self, key, value):
+ self.Stats[key] = value
+
+ def __getitem__(self, key):
+ return self.Stats[key]
+
+ def incr(self, name):
+ '''Increment (or initialize) the value associated with the given name'''
+ if not name in self.Stats:
+ self.Stats[name] = 0
+ self.Stats[name] = self.Stats[name]+1
+
+ def __call__(self):
+ passed = 1
+ ccm_partitions = self.CM.find_partitions()
+
+        if not ccm_partitions:
+ return 1
+
+ self.CM.cluster_stable(double_check=True)
+
+ if len(ccm_partitions) != self.CM.partitions_expected:
+ self.CM.log("ERROR: %d cluster partitions detected:" % len(ccm_partitions))
+ passed = 0
+ for partition in ccm_partitions:
+ self.CM.log("\t %s" % partition)
+
+        for partition in ccm_partitions:
+            if self.audit_partition(partition) == 0:
+                passed = 0
+
+ return passed
+
+    def trim_string(self, avalue):
+        # Strip the trailing newline; returns None for empty input
+        if not avalue:
+            return None
+        if len(avalue) > 1:
+            return avalue[:-1]
+
+    def trim2int(self, avalue):
+        # As trim_string(), but convert the result to an integer
+        if not avalue:
+            return None
+        if len(avalue) > 1:
+            return int(avalue[:-1])
+
+ def audit_partition(self, partition):
+ passed = 1
+ dc_found = []
+ dc_allowed_list = []
+ lowest_epoch = None
+ node_list = partition.split()
+
+ self.debug("Auditing partition: %s" % (partition))
+ for node in node_list:
+ if self.CM.ShouldBeStatus[node] != "up":
+ self.CM.log("Warn: Node %s appeared out of nowhere" % (node))
+ self.CM.ShouldBeStatus[node] = "up"
+ # not in itself a reason to fail the audit (not what we're
+ # checking for in this audit)
+
+ (_, out) = self.CM.rsh(node, self.CM["StatusCmd"] % node, verbose=1)
+ self.NodeState[node] = out[0].strip()
+
+ (_, out) = self.CM.rsh(node, self.CM["EpochCmd"], verbose=1)
+ self.NodeEpoch[node] = out[0].strip()
+
+ (_, out) = self.CM.rsh(node, self.CM["QuorumCmd"], verbose=1)
+ self.NodeQuorum[node] = out[0].strip()
+
+ self.debug("Node %s: %s - %s - %s." % (node, self.NodeState[node], self.NodeEpoch[node], self.NodeQuorum[node]))
+ self.NodeState[node] = self.trim_string(self.NodeState[node])
+ self.NodeEpoch[node] = self.trim2int(self.NodeEpoch[node])
+ self.NodeQuorum[node] = self.trim_string(self.NodeQuorum[node])
+
+            if not self.NodeEpoch[node]:
+                self.CM.log("Warn: Node %s disappeared: cannot determine epoch" % (node))
+                self.CM.ShouldBeStatus[node] = "down"
+                # not in itself a reason to fail the audit (not what we're
+                # checking for in this audit)
+            elif lowest_epoch is None or self.NodeEpoch[node] < lowest_epoch:
+ lowest_epoch = self.NodeEpoch[node]
+
+ if not lowest_epoch:
+ self.CM.log("Lowest epoch not determined in %s" % (partition))
+ passed = 0
+
+ for node in node_list:
+ if self.CM.ShouldBeStatus[node] == "up":
+ if self.CM.is_node_dc(node, self.NodeState[node]):
+ dc_found.append(node)
+ if self.NodeEpoch[node] == lowest_epoch:
+ self.debug("%s: OK" % node)
+ elif not self.NodeEpoch[node]:
+ self.debug("Check on %s ignored: no node epoch" % node)
+ elif not lowest_epoch:
+ self.debug("Check on %s ignored: no lowest epoch" % node)
+ else:
+ self.CM.log("DC %s is not the oldest node (%d vs. %d)"
+ % (node, self.NodeEpoch[node], lowest_epoch))
+ passed = 0
+
+ if len(dc_found) == 0:
+ self.CM.log("DC not found on any of the %d allowed nodes: %s (of %s)"
+ % (len(dc_allowed_list), str(dc_allowed_list), str(node_list)))
+
+ elif len(dc_found) > 1:
+ self.CM.log("%d DCs (%s) found in cluster partition: %s"
+ % (len(dc_found), str(dc_found), str(node_list)))
+ passed = 0
+
+ if passed == 0:
+ for node in node_list:
+ if self.CM.ShouldBeStatus[node] == "up":
+ self.CM.log("epoch %s : %s"
+ % (self.NodeEpoch[node], self.NodeState[node]))
+
+ return passed
+
+ def name(self):
+ return "PartitionAudit"
+
+ def is_applicable(self):
+ # @TODO Due to long-ago refactoring, this name test would never match,
+ # so this audit (and those derived from it) would never run.
+ # Uncommenting the next lines fixes the name test, but that then
+ # exposes pre-existing bugs that need to be fixed.
+ #if self.CM["Name"] == "crm-corosync":
+ # return 1
+ return 0
+
+AllAuditClasses.append(DiskAudit)
+AllAuditClasses.append(FileAudit)
+AllAuditClasses.append(LogAudit)
+AllAuditClasses.append(ControllerStateAudit)
+AllAuditClasses.append(PartitionAudit)
+AllAuditClasses.append(PrimitiveAudit)
+AllAuditClasses.append(GroupAudit)
+AllAuditClasses.append(CloneAudit)
+AllAuditClasses.append(ColocationAudit)
+AllAuditClasses.append(CIBAudit)
+
+
+def AuditList(cm):
+ result = []
+ for auditclass in AllAuditClasses:
+ a = auditclass(cm)
+ if a.is_applicable():
+ result.append(a)
+ return result
diff --git a/cts/lab/CTSlab.py.in b/cts/lab/CTSlab.py.in
new file mode 100644
index 0000000..bd990fd
--- /dev/null
+++ b/cts/lab/CTSlab.py.in
@@ -0,0 +1,135 @@
+#!@PYTHON@
+""" Command-line interface to Pacemaker's Cluster Test Suite (CTS)
+"""
+
+__copyright__ = "Copyright 2001-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+import sys, signal, os
+
+pdir = os.path.dirname(sys.path[0])
+sys.path.insert(0, pdir) # So that things work from the source directory
+
+try:
+ from cts.CM_corosync import *
+ from cts.CTSaudits import AuditList
+ from cts.CTStests import TestList
+ from cts.CTSscenarios import *
+
+ from pacemaker._cts.CTS import CtsLab
+ from pacemaker._cts.logging import LogFactory
+except ImportError as e:
+ sys.stderr.write("abort: %s\n" % e)
+ sys.stderr.write("check your install and PYTHONPATH; couldn't find cts libraries in:\n%s\n" %
+ ' '.join(sys.path))
+ sys.exit(1)
+
+# These are globals so they can be used by the signal handler.
+scenario = None
+LogFactory().add_stderr()
+
+
+def sig_handler(signum, frame):
+    LogFactory().log("Interrupted by signal %d" % signum)
+    if scenario:
+        scenario.summarize()
+    if signum == signal.SIGTERM:
+        if scenario:
+            scenario.TearDown()
+    sys.exit(1)
+
+
+def plural_s(n, uppercase=False):
+ if n == 1:
+ return ""
+ elif uppercase:
+ return "S"
+ else:
+ return "s"
+
+
+if __name__ == '__main__':
+
+ Environment = CtsLab(sys.argv[1:])
+ NumIter = Environment["iterations"]
+ Tests = []
+
+    # Set the signal handlers (SIGTERM tears down, SIGUSR1 just summarizes)
+    signal.signal(signal.SIGTERM, sig_handler)
+    signal.signal(signal.SIGUSR1, sig_handler)
+
+ # Create the Cluster Manager object
+ cm = None
+ if Environment["Stack"] == "corosync 2+":
+ cm = crm_corosync()
+
+    else:
+        LogFactory().log("Unknown stack: " + Environment["Stack"])
+ sys.exit(1)
+
+ if Environment["TruncateLog"]:
+ if Environment["OutputFile"] is None:
+ LogFactory().log("Ignoring truncate request because no output file specified")
+ else:
+ LogFactory().log("Truncating %s" % Environment["OutputFile"])
+ with open(Environment["OutputFile"], "w") as outputfile:
+ outputfile.truncate(0)
+
+ Audits = AuditList(cm)
+
+ if Environment["ListTests"]:
+ Tests = TestList(cm, Audits)
+ LogFactory().log("Total %d tests"%len(Tests))
+ for test in Tests :
+            LogFactory().log(str(test.name))
+ sys.exit(0)
+
+ elif len(Environment["tests"]) == 0:
+ Tests = TestList(cm, Audits)
+
+ else:
+ Chosen = Environment["tests"]
+ for TestCase in Chosen:
+ match = None
+
+ for test in TestList(cm, Audits):
+ if test.name == TestCase:
+ match = test
+
+ if not match:
+ LogFactory().log("--choose: No applicable/valid tests chosen")
+ sys.exit(1)
+ else:
+ Tests.append(match)
+
+ # Scenario selection
+ if Environment["scenario"] == "basic-sanity":
+ scenario = RandomTests(cm, [ BasicSanityCheck(Environment) ], Audits, Tests)
+
+ elif Environment["scenario"] == "all-once":
+ NumIter = len(Tests)
+ scenario = AllOnce(
+ cm, [ BootCluster(Environment) ], Audits, Tests)
+ elif Environment["scenario"] == "sequence":
+ scenario = Sequence(
+ cm, [ BootCluster(Environment) ], Audits, Tests)
+ elif Environment["scenario"] == "boot":
+ scenario = Boot(cm, [ LeaveBooted(Environment)], Audits, [])
+ else:
+ scenario = RandomTests(
+ cm, [ BootCluster(Environment) ], Audits, Tests)
+
+ LogFactory().log(">>>>>>>>>>>>>>>> BEGINNING " + repr(NumIter) + " TEST" + plural_s(NumIter, True) + " ")
+ LogFactory().log("Stack: %s (%s)" % (Environment["Stack"], Environment["Name"]))
+ LogFactory().log("Schema: %s" % Environment["Schema"])
+ LogFactory().log("Scenario: %s" % scenario.__doc__)
+ LogFactory().log("CTS Exerciser: %s" % Environment["cts-exerciser"])
+ LogFactory().log("CTS Logfile: %s" % Environment["OutputFile"])
+ LogFactory().log("Random Seed: %s" % Environment["RandSeed"])
+ LogFactory().log("Syslog variant: %s" % Environment["syslogd"].strip())
+ LogFactory().log("System log files: %s" % Environment["LogFileName"])
+ if Environment.has_key("IPBase"):
+ LogFactory().log("Base IP for resources: %s" % Environment["IPBase"])
+ LogFactory().log("Cluster starts at boot: %d" % Environment["at-boot"])
+
+ Environment.dump()
+ rc = Environment.run(scenario, NumIter)
+ sys.exit(rc)
diff --git a/cts/lab/CTSscenarios.py b/cts/lab/CTSscenarios.py
new file mode 100644
index 0000000..37cb094
--- /dev/null
+++ b/cts/lab/CTSscenarios.py
@@ -0,0 +1,563 @@
+""" Test scenario classes for Pacemaker's Cluster Test Suite (CTS)
+"""
+
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+import os
+import re
+import signal
+import sys
+import time
+
+from cts.CTStests import CTSTest
+from cts.CTSaudits import ClusterAudit
+
+from pacemaker._cts.watcher import LogWatcher
+
+class ScenarioComponent(object):
+
+ def __init__(self, Env):
+ self.Env = Env
+
+ def IsApplicable(self):
+ '''Return True if the current ScenarioComponent is applicable
+ in the given LabEnvironment given to the constructor.
+ '''
+
+ raise ValueError("Abstract Class member (IsApplicable)")
+
+ def SetUp(self, CM):
+ '''Set up the given ScenarioComponent'''
+ raise ValueError("Abstract Class member (Setup)")
+
+    def TearDown(self, CM):
+        '''Tear down (undo) the given ScenarioComponent'''
+        raise ValueError("Abstract Class member (TearDown)")
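+    # A minimal concrete component (hypothetical, for illustration only)
+    # implements all three hooks, e.g.:
+    #
+    #   class NoOp(ScenarioComponent):
+    #       def IsApplicable(self):
+    #           return True
+    #       def SetUp(self, CM):
+    #           return 1
+    #       def TearDown(self, CM):
+    #           pass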
+
+
+class Scenario(object):
+ (
+'''The basic idea of a scenario is that of an ordered list of
+ScenarioComponent objects. Each ScenarioComponent is SetUp() in turn,
+and then after the tests have been run, they are torn down using TearDown()
+(in reverse order).
+
+A Scenario is applicable to a particular cluster manager iff each
+ScenarioComponent is applicable.
+
+A partially set up scenario is torn down if it fails during setup.
+''')
+
+ def __init__(self, ClusterManager, Components, Audits, Tests):
+
+ "Initialize the Scenario from the list of ScenarioComponents"
+
+ self.ClusterManager = ClusterManager
+ self.Components = Components
+ self.Audits = Audits
+ self.Tests = Tests
+
+ self.BadNews = None
+ self.TestSets = []
+ self.Stats = {"success":0, "failure":0, "BadNews":0, "skipped":0}
+ self.Sets = []
+
+ #self.ns=CTS.NodeStatus(self.Env)
+
+ for comp in Components:
+ if not issubclass(comp.__class__, ScenarioComponent):
+ raise ValueError("Init value must be subclass of ScenarioComponent")
+
+ for audit in Audits:
+ if not issubclass(audit.__class__, ClusterAudit):
+ raise ValueError("Init value must be subclass of ClusterAudit")
+
+ for test in Tests:
+ if not issubclass(test.__class__, CTSTest):
+ raise ValueError("Init value must be a subclass of CTSTest")
+
+ def IsApplicable(self):
+ (
+'''A Scenario IsApplicable() iff each of its ScenarioComponents IsApplicable()
+'''
+ )
+
+ for comp in self.Components:
+ if not comp.IsApplicable():
+ return None
+ return True
+
+ def SetUp(self):
+ '''Set up the Scenario. Return TRUE on success.'''
+
+ self.ClusterManager.prepare()
+ self.audit() # Also detects remote/local log config
+ self.ClusterManager.ns.wait_for_all_nodes(self.ClusterManager.Env["nodes"])
+
+ self.audit()
+ self.ClusterManager.install_support()
+
+ self.BadNews = LogWatcher(self.ClusterManager.Env["LogFileName"],
+ self.ClusterManager.templates.get_patterns("BadNews"),
+ self.ClusterManager.Env["nodes"],
+ self.ClusterManager.Env["LogWatcher"],
+ "BadNews", 0)
+ self.BadNews.set_watch() # Call after we've figured out what type of log watching to do in LogAudit
+
+ j = 0
+ while j < len(self.Components):
+ if not self.Components[j].SetUp(self.ClusterManager):
+ # OOPS! We failed. Tear partial setups down.
+ self.audit()
+ self.ClusterManager.log("Tearing down partial setup")
+ self.TearDown(j)
+ return None
+ j = j + 1
+
+ self.audit()
+ return 1
+
+    def TearDown(self, last=None):
+
+        '''Tear Down the Scenario - in reverse order.'''
+
+        if last is None:
+            last = len(self.Components)-1
+        j = last
+        while j >= 0:
+            self.Components[j].TearDown(self.ClusterManager)
+            j = j - 1
+
+ self.audit()
+ self.ClusterManager.install_support("uninstall")
+
+ def incr(self, name):
+ '''Increment (or initialize) the value associated with the given name'''
+ if not name in self.Stats:
+ self.Stats[name] = 0
+ self.Stats[name] = self.Stats[name]+1
+
+    def run(self, Iterations):
+        self.ClusterManager.oprofileStart()
+        try:
+            self.run_loop(Iterations)
+        finally:
+            self.ClusterManager.oprofileStop()
+
+ def run_loop(self, Iterations):
+ raise ValueError("Abstract Class member (run_loop)")
+
+ def run_test(self, test, testcount):
+ nodechoice = self.ClusterManager.Env.random_node()
+
+ ret = 1
+ where = ""
+ did_run = 0
+
+ self.ClusterManager.instance_errorstoignore_clear()
+ self.ClusterManager.log(("Running test %s" % test.name).ljust(35) + (" (%s) " % nodechoice).ljust(15) + "[" + ("%d" % testcount).rjust(3) + "]")
+
+ starttime = test.set_timer()
+ if not test.setup(nodechoice):
+ self.ClusterManager.log("Setup failed")
+ ret = 0
+
+ elif not test.canrunnow(nodechoice):
+ self.ClusterManager.log("Skipped")
+ test.skipped()
+
+ else:
+ did_run = 1
+ ret = test(nodechoice)
+
+ if not test.teardown(nodechoice):
+ self.ClusterManager.log("Teardown failed")
+ if self.ClusterManager.Env["continue"]:
+ answer = "Y"
+ else:
+ try:
+ answer = input('Continue? [nY]')
+ except EOFError as e:
+ answer = "n"
+ if answer and answer == "n":
+ raise ValueError("Teardown of %s on %s failed" % (test.name, nodechoice))
+ ret = 0
+
+ stoptime = time.time()
+ self.ClusterManager.oprofileSave(testcount)
+
+ elapsed_time = stoptime - starttime
+ test_time = stoptime - test.get_timer()
+ if not test["min_time"]:
+ test["elapsed_time"] = elapsed_time
+ test["min_time"] = test_time
+ test["max_time"] = test_time
+ else:
+ test["elapsed_time"] = test["elapsed_time"] + elapsed_time
+ if test_time < test["min_time"]:
+ test["min_time"] = test_time
+ if test_time > test["max_time"]:
+ test["max_time"] = test_time
+
+ if ret:
+ self.incr("success")
+ test.log_timer()
+ else:
+ self.incr("failure")
+ self.ClusterManager.statall()
+ did_run = 1 # Force the test count to be incremented anyway so test extraction works
+
+ self.audit(test.errorstoignore())
+ return did_run
+
+ def summarize(self):
+ self.ClusterManager.log("****************")
+ self.ClusterManager.log("Overall Results:" + repr(self.Stats))
+ self.ClusterManager.log("****************")
+
+ stat_filter = {
+ "calls":0,
+ "failure":0,
+ "skipped":0,
+ "auditfail":0,
+ }
+ self.ClusterManager.log("Test Summary")
+ for test in self.Tests:
+ for key in list(stat_filter.keys()):
+ stat_filter[key] = test.Stats[key]
+ self.ClusterManager.log(("Test %s: "%test.name).ljust(25) + " %s"%repr(stat_filter))
+
+ self.ClusterManager.debug("Detailed Results")
+ for test in self.Tests:
+ self.ClusterManager.debug(("Test %s: "%test.name).ljust(25) + " %s"%repr(test.Stats))
+
+ self.ClusterManager.log("<<<<<<<<<<<<<<<< TESTS COMPLETED")
+
+    def audit(self, LocalIgnore=None):
+        errcount = 0
+        # Build a fresh list each call (avoid a shared mutable default)
+        ignorelist = ["CTS:"]
+        if LocalIgnore:
+            ignorelist.extend(LocalIgnore)
+        ignorelist.extend(self.ClusterManager.errorstoignore())
+        ignorelist.extend(self.ClusterManager.instance_errorstoignore())
+
+ # This makes sure everything is stabilized before starting...
+ failed = 0
+ for audit in self.Audits:
+ if not audit():
+ self.ClusterManager.log("Audit " + audit.name() + " FAILED.")
+ failed += 1
+ else:
+ self.ClusterManager.debug("Audit " + audit.name() + " passed.")
+
+ while errcount < 1000:
+ match = None
+ if self.BadNews:
+ match = self.BadNews.look(0)
+
+ if match:
+ add_err = 1
+ for ignore in ignorelist:
+ if add_err == 1 and re.search(ignore, match):
+ add_err = 0
+ if add_err == 1:
+ self.ClusterManager.log("BadNews: " + match)
+ self.incr("BadNews")
+ errcount = errcount + 1
+ else:
+ break
+        # while-else: reached only when the loop hits the 1,000-error cap
+        # without breaking out
+        else:
+ if self.ClusterManager.Env["continue"]:
+ answer = "Y"
+ else:
+ try:
+ answer = input('Big problems. Continue? [nY]')
+ except EOFError as e:
+ answer = "n"
+ if answer and answer == "n":
+ self.ClusterManager.log("Shutting down.")
+ self.summarize()
+ self.TearDown()
+ raise ValueError("Looks like we hit a BadNews jackpot!")
+
+ if self.BadNews:
+ self.BadNews.end()
+ return failed
+
+
+class AllOnce(Scenario):
+    '''Every Test Once''' # Accessible as __doc__
+ def run_loop(self, Iterations):
+ testcount = 1
+ for test in self.Tests:
+ self.run_test(test, testcount)
+ testcount += 1
+
+
+class RandomTests(Scenario):
+ '''Random Test Execution'''
+ def run_loop(self, Iterations):
+ testcount = 1
+ while testcount <= Iterations:
+ test = self.ClusterManager.Env.random_gen.choice(self.Tests)
+ self.run_test(test, testcount)
+ testcount += 1
+
+
+class BasicSanity(Scenario):
+    '''Basic Cluster Sanity'''
+    def run_loop(self, Iterations):
+        testcount = 1
+        while testcount <= Iterations:
+            test = self.ClusterManager.Env.random_gen.choice(self.Tests)
+            self.run_test(test, testcount)
+            testcount += 1
+
+
+class Sequence(Scenario):
+ '''Named Tests in Sequence'''
+ def run_loop(self, Iterations):
+ testcount = 1
+ while testcount <= Iterations:
+ for test in self.Tests:
+ self.run_test(test, testcount)
+ testcount += 1
+
+
+class Boot(Scenario):
+ '''Start the Cluster'''
+ def run_loop(self, Iterations):
+ testcount = 0
+
+
+class BootCluster(ScenarioComponent):
+ (
+'''BootCluster is the most basic of ScenarioComponents.
+This ScenarioComponent simply starts the cluster manager on all the nodes.
+It is fairly robust as it waits for all nodes to come up before starting
+as they might have been rebooted or crashed for some reason beforehand.
+''')
+ def __init__(self, Env):
+ pass
+
+ def IsApplicable(self):
+ '''BootCluster is so generic it is always Applicable'''
+ return True
+
+ def SetUp(self, CM):
+ '''Basic Cluster Manager startup. Start everything'''
+
+ CM.prepare()
+
+ # Clear out the cobwebs ;-)
+ CM.stopall(verbose=True, force=True)
+
+ # Now start the Cluster Manager on all the nodes.
+ CM.log("Starting Cluster Manager on all nodes.")
+ return CM.startall(verbose=True, quick=True)
+
+    def TearDown(self, CM, force=False):
+        '''Stop the cluster manager on all the nodes'''
+
+ # Stop the cluster manager everywhere
+
+ CM.log("Stopping Cluster Manager on all nodes")
+ return CM.stopall(verbose=True, force=force)
+
+
+class LeaveBooted(BootCluster):
+    def TearDown(self, CM):
+        '''Leave the cluster manager running on all the nodes'''
+
+ # Stop the cluster manager everywhere
+
+ CM.log("Leaving Cluster running on all nodes")
+ return 1
+
+
+class PingFest(ScenarioComponent):
+ (
+'''PingFest does a flood ping to each node in the cluster from the test machine.
+
+If the LabEnvironment Parameter PingSize is set, it will be used as the size
+of ping packet requested (via the -s option). If it is not set, it defaults
+to 1024 bytes.
+
+According to the manual page for ping:
+ Outputs packets as fast as they come back or one hundred times per
+ second, whichever is more. For every ECHO_REQUEST sent a period ``.''
+ is printed, while for every ECHO_REPLY received a backspace is printed.
+ This provides a rapid display of how many packets are being dropped.
+    Only the super-user may use this option. This can be very hard on a
+    network and should be used with caution.
+''' )
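+
+    # For reference: each child forked in SetUp runs, per cluster node,
+    #     ping -qfn -s <PingSize> <node>
+    # (see _pingchild below); TearDown later kills those PIDs with SIGKILL.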
+
+ def __init__(self, Env):
+ self.Env = Env
+
+ def IsApplicable(self):
+ '''PingFests are always applicable ;-)
+ '''
+
+ return True
+
+ def SetUp(self, CM):
+ '''Start the PingFest!'''
+
+ self.PingSize = 1024
+ if "PingSize" in list(CM.Env.keys()):
+ self.PingSize = CM.Env["PingSize"]
+
+ CM.log("Starting %d byte flood pings" % self.PingSize)
+
+ self.PingPids = []
+ for node in CM.Env["nodes"]:
+ self.PingPids.append(self._pingchild(node))
+
+ CM.log("Ping PIDs: " + repr(self.PingPids))
+ return 1
+
+ def TearDown(self, CM):
+ '''Stop it right now! My ears are pinging!!'''
+
+ for pid in self.PingPids:
+            if pid is not None:
+ CM.log("Stopping ping process %d" % pid)
+ os.kill(pid, signal.SIGKILL)
+
+ def _pingchild(self, node):
+
+ Args = ["ping", "-qfn", "-s", str(self.PingSize), node]
+
+ sys.stdin.flush()
+ sys.stdout.flush()
+ sys.stderr.flush()
+        try:
+            pid = os.fork()
+        except OSError:
+            # In Python, a failed fork raises OSError rather than returning < 0
+            self.Env.log("Cannot fork ping child")
+            return None
+        if pid > 0:
+            return pid
+
+        # Otherwise, we're the child process.
+
+        try:
+            os.execvp("ping", Args)
+        except OSError:
+            # execvp only returns control on failure, and does so by raising
+            self.Env.log("Cannot execvp ping: " + repr(Args))
+        sys.exit(1)
+
+
+class BasicSanityCheck(ScenarioComponent):
+ (
+'''Start and stop the cluster manager for the basic sanity check (BSC)
+''')
+
+ def IsApplicable(self):
+ return self.Env["DoBSC"]
+
+ def SetUp(self, CM):
+
+ CM.prepare()
+
+ # Clear out the cobwebs
+ self.TearDown(CM)
+
+ # Now start the Cluster Manager on all the nodes.
+ CM.log("Starting Cluster Manager on BSC node(s).")
+ return CM.startall()
+
+ def TearDown(self, CM):
+ CM.log("Stopping Cluster Manager on BSC node(s).")
+ return CM.stopall()
+
+
+class Benchmark(ScenarioComponent):
+ (
+'''Start and stop the cluster manager on all nodes for benchmark runs
+''')
+
+ def IsApplicable(self):
+ return self.Env["benchmark"]
+
+ def SetUp(self, CM):
+
+ CM.prepare()
+
+ # Clear out the cobwebs
+ self.TearDown(CM, force=True)
+
+ # Now start the Cluster Manager on all the nodes.
+ CM.log("Starting Cluster Manager on all node(s).")
+ return CM.startall()
+
+ def TearDown(self, CM):
+ CM.log("Stopping Cluster Manager on all node(s).")
+ return CM.stopall()
+
+
+class RollingUpgrade(ScenarioComponent):
+ (
+'''
+Test a rolling upgrade between two versions of the stack
+''')
+
+ def __init__(self, Env):
+ self.Env = Env
+
+ def IsApplicable(self):
+ if not self.Env["rpm-dir"]:
+ return None
+ if not self.Env["current-version"]:
+ return None
+ if not self.Env["previous-version"]:
+ return None
+
+ return True
+
+ def install(self, node, version):
+
+ target_dir = "/tmp/rpm-%s" % version
+ src_dir = "%s/%s" % (self.CM.Env["rpm-dir"], version)
+
+ self.CM.rsh(node, "mkdir -p %s" % target_dir)
+        self.CM.cp("%s/*.rpm %s:%s" % (src_dir, node, target_dir))
+        self.CM.rsh(node, "rpm -Uvh --force %s/*.rpm" % (target_dir))
+
+        # ScenarioComponents have no success() helper, so just report success
+        return True
+
+ def upgrade(self, node):
+ return self.install(node, self.CM.Env["current-version"])
+
+ def downgrade(self, node):
+ return self.install(node, self.CM.Env["previous-version"])
+
+    def SetUp(self, CM):
+        # install()/upgrade()/downgrade() below expect self.CM to be set
+        self.CM = CM
+        print(repr(self) + " prepare")
+        CM.prepare()
+
+ # Clear out the cobwebs
+ CM.stopall(force=True)
+
+ CM.log("Downgrading all nodes to %s." % self.Env["previous-version"])
+
+ for node in self.Env["nodes"]:
+ if not self.downgrade(node):
+ CM.log("Couldn't downgrade %s" % node)
+ return None
+
+ return 1
+
+    def TearDown(self, CM):
+        # Stop everything
+        self.CM = CM
+        CM.log("Stopping Cluster Manager on Upgrade nodes.")
+ CM.stopall()
+
+ CM.log("Upgrading all nodes to %s." % self.Env["current-version"])
+ for node in self.Env["nodes"]:
+ if not self.upgrade(node):
+ CM.log("Couldn't upgrade %s" % node)
+ return None
+
+ return 1
diff --git a/cts/lab/CTStests.py b/cts/lab/CTStests.py
new file mode 100644
index 0000000..61766ce
--- /dev/null
+++ b/cts/lab/CTStests.py
@@ -0,0 +1,3178 @@
+""" Test-specific classes for Pacemaker's Cluster Test Suite (CTS)
+"""
+
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+#
+# SPECIAL NOTE:
+#
+# Tests may NOT implement any cluster-manager-specific code in them.
+# EXTEND the ClusterManager object to provide the base capabilities
+# the test needs if you need to do something that the current CM classes
+# do not. Otherwise you screw up the whole point of the object structure
+# in CTS.
+#
+# Thank you.
+#
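+# Illustrative sketch (hypothetical method name): rather than shelling out
+# to stack-specific tools inside a test, extend the CM class, e.g.
+#
+#   class ClusterManager:
+#       def flush_transport(self, node):
+#           ...stack-specific commands live here...
+#
+# and have the test call self.CM.flush_transport(node) instead.
+#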
+
+import os
+import re
+import time
+import subprocess
+import tempfile
+
+from stat import *
+from cts.CTSaudits import *
+
+from pacemaker import BuildOptions
+from pacemaker._cts.CTS import NodeStatus
+from pacemaker._cts.environment import EnvFactory
+from pacemaker._cts.logging import LogFactory
+from pacemaker._cts.patterns import PatternSelector
+from pacemaker._cts.remote import RemoteFactory
+from pacemaker._cts.watcher import LogWatcher
+
+AllTestClasses = [ ]
+
+
+class CTSTest(object):
+ '''
+ A Cluster test.
+ We implement the basic set of properties and behaviors for a generic
+ cluster test.
+
+ Cluster tests track their own statistics.
+ We keep each of the kinds of counts we track as separate {name,value}
+ pairs.
+ '''
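+
+    # Illustrative only (hypothetical driver code, not executed here):
+    #   test = FlipTest(cm)
+    #   test(node)               # __call__ bumps "calls" and then records
+    #                            # success()/failure() in test.Stats
+    #   print(test["success"])   # counters are read back via __getitem__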
+
+ def __init__(self, cm):
+ #self.name="the unnamed test"
+ self.Stats = {"calls":0
+ , "success":0
+ , "failure":0
+ , "skipped":0
+ , "auditfail":0}
+
+# if not issubclass(cm.__class__, ClusterManager):
+# raise ValueError("Must be a ClusterManager object")
+ self.CM = cm
+ self.Env = EnvFactory().getInstance()
+ self.rsh = RemoteFactory().getInstance()
+ self.logger = LogFactory()
+ self.templates = PatternSelector(cm["Name"])
+ self.Audits = []
+ self.timeout = 120
+ self.passed = 1
+ self.is_loop = 0
+ self.is_unsafe = 0
+ self.is_experimental = 0
+ self.is_container = 0
+ self.is_valgrind = 0
+ self.benchmark = 0 # which tests to benchmark
+ self.timer = {} # timers
+
+ def log(self, args):
+ self.logger.log(args)
+
+ def debug(self, args):
+ self.logger.debug(args)
+
+ def has_key(self, key):
+ return key in self.Stats
+
+ def __setitem__(self, key, value):
+ self.Stats[key] = value
+
+ def __getitem__(self, key):
+ if str(key) == "0":
+ raise ValueError("Bad call to 'foo in X', should reference 'foo in X.Stats' instead")
+
+ if key in self.Stats:
+ return self.Stats[key]
+ return None
+
+ def log_mark(self, msg):
+ self.debug("MARK: test %s %s %d" % (self.name,msg,time.time()))
+ return
+
+    def get_timer(self, key="test"):
+        try:
+            return self.timer[key]
+        except KeyError:
+            return 0
+
+    def set_timer(self, key="test"):
+        self.timer[key] = time.time()
+        return self.timer[key]
+
+    def log_timer(self, key="test"):
+        elapsed = 0
+        if key in self.timer:
+            elapsed = time.time() - self.timer[key]
+            # Only discard the timer if it was actually set
+            del self.timer[key]
+        s = self.name if key == "test" else "%s:%s" % (self.name, key)
+        self.debug("%s runtime: %.2f" % (s, elapsed))
+        return elapsed
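+
+    # Typical pattern, as used by the tests below (e.g. StonithdTest):
+    #   self.set_timer("fence"); ...do the work...; self.log_timer("fence")
+    # which emits a debug line like "Stonithd:fence runtime: 12.34".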
+
+    def incr(self, name):
+        '''Increment (or initialize) the value associated with the given name'''
+        if name not in self.Stats:
+            self.Stats[name] = 0
+        self.Stats[name] += 1
+
+ # Reset the test passed boolean
+ if name == "calls":
+ self.passed = 1
+
+ def failure(self, reason="none"):
+ '''Increment the failure count'''
+ self.passed = 0
+ self.incr("failure")
+ self.logger.log(("Test %s" % self.name).ljust(35) + " FAILED: %s" % reason)
+ return None
+
+ def success(self):
+ '''Increment the success count'''
+ self.incr("success")
+ return 1
+
+ def skipped(self):
+ '''Increment the skipped count'''
+ self.incr("skipped")
+ return 1
+
+    def __call__(self, node):
+        '''Perform the given test'''
+        raise ValueError("Abstract Class member (__call__)")
+
+ def audit(self):
+ passed = 1
+ if len(self.Audits) > 0:
+ for audit in self.Audits:
+ if not audit():
+ self.logger.log("Internal %s Audit %s FAILED." % (self.name, audit.name()))
+ self.incr("auditfail")
+ passed = 0
+ return passed
+
+ def setup(self, node):
+ '''Setup the given test'''
+ return self.success()
+
+ def teardown(self, node):
+ '''Tear down the given test'''
+ return self.success()
+
+ def create_watch(self, patterns, timeout, name=None):
+ if not name:
+ name = self.name
+ return LogWatcher(self.Env["LogFileName"], patterns, self.Env["nodes"], self.Env["LogWatcher"], name, timeout)
+
+    def local_badnews(self, prefix, watch, local_ignore=None):
+        # Avoid a mutable default argument; None means "nothing extra"
+        errcount = 0
+        if not prefix:
+            prefix = "LocalBadNews:"
+
+        ignorelist = []
+        ignorelist.append(" CTS: ")
+        ignorelist.append(prefix)
+        if local_ignore:
+            ignorelist.extend(local_ignore)
+
+ while errcount < 100:
+ match = watch.look(0)
+ if match:
+ add_err = 1
+ for ignore in ignorelist:
+ if add_err == 1 and re.search(ignore, match):
+ add_err = 0
+ if add_err == 1:
+ self.logger.log(prefix + " " + match)
+ errcount = errcount + 1
+ else:
+ break
+ else:
+ self.logger.log("Too many errors!")
+
+ watch.end()
+ return errcount
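+
+    # Used by tests such as Reattach to assert that no resource activity
+    # occurred: it returns the number of unexpected log lines, so a truthy
+    # return value is treated as a failure by the caller.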
+
+ def is_applicable(self):
+ return self.is_applicable_common()
+
+ def is_applicable_common(self):
+ '''Return True if we are applicable in the current test configuration'''
+ #raise ValueError("Abstract Class member (is_applicable)")
+
+ if self.is_loop and not self.Env["loop-tests"]:
+ return False
+ elif self.is_unsafe and not self.Env["unsafe-tests"]:
+ return False
+ elif self.is_valgrind and not self.Env["valgrind-tests"]:
+ return False
+ elif self.is_experimental and not self.Env["experimental-tests"]:
+ return False
+ elif self.is_container and not self.Env["container-tests"]:
+ return False
+ elif self.Env["benchmark"] and self.benchmark == 0:
+ return False
+
+ return True
+
+ def find_ocfs2_resources(self, node):
+ self.r_o2cb = None
+ self.r_ocfs2 = []
+
+ (_, lines) = self.rsh(node, "crm_resource -c", verbose=1)
+ for line in lines:
+ if re.search("^Resource", line):
+ r = AuditResource(self.CM, line)
+ if r.rtype == "o2cb" and r.parent != "NA":
+ self.debug("Found o2cb: %s" % self.r_o2cb)
+ self.r_o2cb = r.parent
+ if re.search("^Constraint", line):
+ c = AuditConstraint(self.CM, line)
+ if c.type == "rsc_colocation" and c.target == self.r_o2cb:
+ self.r_ocfs2.append(c.rsc)
+
+ self.debug("Found ocfs2 filesystems: %s" % repr(self.r_ocfs2))
+ return len(self.r_ocfs2)
+
+ def canrunnow(self, node):
+ '''Return TRUE if we can meaningfully run right now'''
+ return 1
+
+ def errorstoignore(self):
+ '''Return list of errors which are 'normal' and should be ignored'''
+ return []
+
+
+class StopTest(CTSTest):
+ '''Stop (deactivate) the cluster manager on a node'''
+ def __init__(self, cm):
+ CTSTest.__init__(self, cm)
+ self.name = "Stop"
+
+ def __call__(self, node):
+ '''Perform the 'stop' test. '''
+ self.incr("calls")
+ if self.CM.ShouldBeStatus[node] != "up":
+ return self.skipped()
+
+ patterns = []
+ # Technically we should always be able to notice ourselves stopping
+ patterns.append(self.templates["Pat:We_stopped"] % node)
+
+ # Any active node needs to notice this one left
+ # (note that this won't work if we have multiple partitions)
+ for other in self.Env["nodes"]:
+ if self.CM.ShouldBeStatus[other] == "up" and other != node:
+ patterns.append(self.templates["Pat:They_stopped"] %(other, self.CM.key_for_node(node)))
+ #self.debug("Checking %s will notice %s left"%(other, node))
+
+ watch = self.create_watch(patterns, self.Env["DeadTime"])
+ watch.set_watch()
+
+ if node == self.CM.OurNode:
+ self.incr("us")
+ else:
+ if self.CM.upcount() <= 1:
+ self.incr("all")
+ else:
+ self.incr("them")
+
+ self.CM.StopaCM(node)
+ watch_result = watch.look_for_all()
+
+ failreason = None
+ UnmatchedList = "||"
+ if watch.unmatched:
+ (_, output) = self.rsh(node, "/bin/ps axf", verbose=1)
+ for line in output:
+ self.debug(line)
+
+ (_, output) = self.rsh(node, "/usr/sbin/dlm_tool dump 2>/dev/null", verbose=1)
+ for line in output:
+ self.debug(line)
+
+            for regex in watch.unmatched:
+                self.logger.log("ERROR: Shutdown pattern not found: %s" % (regex))
+                UnmatchedList += regex + "||"
+                failreason = "Missing shutdown pattern"
+
+ self.CM.cluster_stable(self.Env["DeadTime"])
+
+ if not watch.unmatched or self.CM.upcount() == 0:
+ return self.success()
+
+ if len(watch.unmatched) >= self.CM.upcount():
+ return self.failure("no match against (%s)" % UnmatchedList)
+
+        if failreason is None:
+ return self.success()
+ else:
+ return self.failure(failreason)
+#
+# We don't register StopTest because it's better when called by
+# another test...
+#
+
+
+class StartTest(CTSTest):
+ '''Start (activate) the cluster manager on a node'''
+    def __init__(self, cm, debug=None):
+        CTSTest.__init__(self,cm)
+        self.name = "start"
+        # Stored under a distinct name so we don't shadow CTSTest.debug()
+        self.debug_arg = debug
+
+ def __call__(self, node):
+ '''Perform the 'start' test. '''
+ self.incr("calls")
+
+ if self.CM.upcount() == 0:
+ self.incr("us")
+ else:
+ self.incr("them")
+
+ if self.CM.ShouldBeStatus[node] != "down":
+ return self.skipped()
+ elif self.CM.StartaCM(node):
+ return self.success()
+ else:
+ return self.failure("Startup %s on node %s failed"
+ % (self.Env["Name"], node))
+
+#
+# We don't register StartTest because it's better when called by
+# another test...
+#
+
+
+class FlipTest(CTSTest):
+    '''If it's running, stop it. If it's stopped, start it.
+ Overthrow the status quo...
+ '''
+ def __init__(self, cm):
+ CTSTest.__init__(self,cm)
+ self.name = "Flip"
+ self.start = StartTest(cm)
+ self.stop = StopTest(cm)
+
+ def __call__(self, node):
+ '''Perform the 'Flip' test. '''
+ self.incr("calls")
+ if self.CM.ShouldBeStatus[node] == "up":
+ self.incr("stopped")
+ ret = self.stop(node)
+ type = "up->down"
+ # Give the cluster time to recognize it's gone...
+ time.sleep(self.Env["StableTime"])
+ elif self.CM.ShouldBeStatus[node] == "down":
+ self.incr("started")
+ ret = self.start(node)
+ type = "down->up"
+ else:
+ return self.skipped()
+
+ self.incr(type)
+ if ret:
+ return self.success()
+ else:
+ return self.failure("%s failure" % type)
+
+# Register FlipTest as a good test to run
+AllTestClasses.append(FlipTest)
+
+
+class RestartTest(CTSTest):
+ '''Stop and restart a node'''
+ def __init__(self, cm):
+ CTSTest.__init__(self,cm)
+ self.name = "Restart"
+ self.start = StartTest(cm)
+ self.stop = StopTest(cm)
+ self.benchmark = 1
+
+ def __call__(self, node):
+ '''Perform the 'restart' test. '''
+ self.incr("calls")
+
+ self.incr("node:" + node)
+
+        if self.CM.StataCM(node):
+            self.incr("WasStopped")
+            if not self.start(node):
+                return self.failure("start (setup) failure: "+node)
+
+ self.set_timer()
+ if not self.stop(node):
+ return self.failure("stop failure: "+node)
+ if not self.start(node):
+ return self.failure("start failure: "+node)
+ return self.success()
+
+# Register RestartTest as a good test to run
+AllTestClasses.append(RestartTest)
+
+
+class StonithdTest(CTSTest):
+ def __init__(self, cm):
+ CTSTest.__init__(self, cm)
+ self.name = "Stonithd"
+ self.startall = SimulStartLite(cm)
+ self.benchmark = 1
+
+ def __call__(self, node):
+ self.incr("calls")
+ if len(self.Env["nodes"]) < 2:
+ return self.skipped()
+
+ ret = self.startall(None)
+ if not ret:
+ return self.failure("Setup failed")
+
+ is_dc = self.CM.is_node_dc(node)
+
+ watchpats = []
+ watchpats.append(self.templates["Pat:Fencing_ok"] % node)
+ watchpats.append(self.templates["Pat:NodeFenced"] % node)
+
+ if not self.Env["at-boot"]:
+ self.debug("Expecting %s to stay down" % node)
+ self.CM.ShouldBeStatus[node] = "down"
+ else:
+ self.debug("Expecting %s to come up again %d" % (node, self.Env["at-boot"]))
+ watchpats.append("%s.* S_STARTING -> S_PENDING" % node)
+ watchpats.append("%s.* S_PENDING -> S_NOT_DC" % node)
+
+ watch = self.create_watch(watchpats, 30 + self.Env["DeadTime"] + self.Env["StableTime"] + self.Env["StartTime"])
+ watch.set_watch()
+
+ origin = self.Env.random_gen.choice(self.Env["nodes"])
+
+ (rc, _) = self.rsh(origin, "stonith_admin --reboot %s -VVVVVV" % node)
+
+ if rc == 124: # CRM_EX_TIMEOUT
+ # Look for the patterns, usually this means the required
+ # device was running on the node to be fenced - or that
+ # the required devices were in the process of being loaded
+ # and/or moved
+ #
+ # Effectively the node committed suicide so there will be
+ # no confirmation, but pacemaker should be watching and
+ # fence the node again
+
+ self.logger.log("Fencing command on %s to fence %s timed out" % (origin, node))
+
+ elif origin != node and rc != 0:
+ self.debug("Waiting for the cluster to recover")
+ self.CM.cluster_stable()
+
+ self.debug("Waiting for fenced node to come back up")
+ self.CM.ns.wait_for_all_nodes(self.Env["nodes"], 600)
+
+ self.logger.log("Fencing command on %s failed to fence %s (rc=%d)" % (origin, node, rc))
+
+ elif origin == node and rc != 255:
+            # 255 == broken pipe, i.e. the node was fenced as expected
+ self.logger.log("Locally originated fencing returned %d" % rc)
+
+ self.set_timer("fence")
+ matched = watch.look_for_all()
+ self.log_timer("fence")
+ self.set_timer("reform")
+ if watch.unmatched:
+ self.logger.log("Patterns not found: " + repr(watch.unmatched))
+
+ self.debug("Waiting for the cluster to recover")
+ self.CM.cluster_stable()
+
+ self.debug("Waiting for fenced node to come back up")
+ self.CM.ns.wait_for_all_nodes(self.Env["nodes"], 600)
+
+ self.debug("Waiting for the cluster to re-stabilize with all nodes")
+ is_stable = self.CM.cluster_stable(self.Env["StartTime"])
+
+ if not matched:
+ return self.failure("Didn't find all expected patterns")
+ elif not is_stable:
+ return self.failure("Cluster did not become stable")
+
+ self.log_timer("reform")
+ return self.success()
+
+ def errorstoignore(self):
+ return [
+ self.templates["Pat:Fencing_start"] % ".*",
+ self.templates["Pat:Fencing_ok"] % ".*",
+ self.templates["Pat:Fencing_active"],
+ r"error.*: Operation 'reboot' targeting .* by .* for stonith_admin.*: Timer expired",
+ ]
+
+ def is_applicable(self):
+ if not self.is_applicable_common():
+ return False
+
+ if "DoFencing" in list(self.Env.keys()):
+ return self.Env["DoFencing"]
+
+ return True
+
+AllTestClasses.append(StonithdTest)
+
+
+class StartOnebyOne(CTSTest):
+ '''Start all the nodes ~ one by one'''
+ def __init__(self, cm):
+ CTSTest.__init__(self,cm)
+ self.name = "StartOnebyOne"
+ self.stopall = SimulStopLite(cm)
+ self.start = StartTest(cm)
+ self.ns = NodeStatus(cm.Env)
+
+ def __call__(self, dummy):
+ '''Perform the 'StartOnebyOne' test. '''
+ self.incr("calls")
+
+ # We ignore the "node" parameter...
+
+ # Shut down all the nodes...
+ ret = self.stopall(None)
+ if not ret:
+ return self.failure("Test setup failed")
+
+ failed = []
+ self.set_timer()
+ for node in self.Env["nodes"]:
+ if not self.start(node):
+ failed.append(node)
+
+ if len(failed) > 0:
+ return self.failure("Some node failed to start: " + repr(failed))
+
+ return self.success()
+
+# Register StartOnebyOne as a good test to run
+AllTestClasses.append(StartOnebyOne)
+
+
+class SimulStart(CTSTest):
+ '''Start all the nodes ~ simultaneously'''
+ def __init__(self, cm):
+ CTSTest.__init__(self,cm)
+ self.name = "SimulStart"
+ self.stopall = SimulStopLite(cm)
+ self.startall = SimulStartLite(cm)
+
+ def __call__(self, dummy):
+ '''Perform the 'SimulStart' test. '''
+ self.incr("calls")
+
+ # We ignore the "node" parameter...
+
+ # Shut down all the nodes...
+ ret = self.stopall(None)
+ if not ret:
+ return self.failure("Setup failed")
+
+ if not self.startall(None):
+ return self.failure("Startall failed")
+
+ return self.success()
+
+# Register SimulStart as a good test to run
+AllTestClasses.append(SimulStart)
+
+
+class SimulStop(CTSTest):
+ '''Stop all the nodes ~ simultaneously'''
+ def __init__(self, cm):
+ CTSTest.__init__(self,cm)
+ self.name = "SimulStop"
+ self.startall = SimulStartLite(cm)
+ self.stopall = SimulStopLite(cm)
+
+ def __call__(self, dummy):
+ '''Perform the 'SimulStop' test. '''
+ self.incr("calls")
+
+ # We ignore the "node" parameter...
+
+ # Start up all the nodes...
+ ret = self.startall(None)
+ if not ret:
+ return self.failure("Setup failed")
+
+ if not self.stopall(None):
+ return self.failure("Stopall failed")
+
+ return self.success()
+
+# Register SimulStop as a good test to run
+AllTestClasses.append(SimulStop)
+
+
+class StopOnebyOne(CTSTest):
+ '''Stop all the nodes in order'''
+ def __init__(self, cm):
+ CTSTest.__init__(self,cm)
+ self.name = "StopOnebyOne"
+ self.startall = SimulStartLite(cm)
+ self.stop = StopTest(cm)
+
+ def __call__(self, dummy):
+ '''Perform the 'StopOnebyOne' test. '''
+ self.incr("calls")
+
+ # We ignore the "node" parameter...
+
+ # Start up all the nodes...
+ ret = self.startall(None)
+ if not ret:
+ return self.failure("Setup failed")
+
+ failed = []
+ self.set_timer()
+ for node in self.Env["nodes"]:
+ if not self.stop(node):
+ failed.append(node)
+
+ if len(failed) > 0:
+ return self.failure("Some node failed to stop: " + repr(failed))
+
+ return self.success()
+
+# Register StopOnebyOne as a good test to run
+AllTestClasses.append(StopOnebyOne)
+
+
+class RestartOnebyOne(CTSTest):
+ '''Restart all the nodes in order'''
+ def __init__(self, cm):
+ CTSTest.__init__(self,cm)
+ self.name = "RestartOnebyOne"
+ self.startall = SimulStartLite(cm)
+
+ def __call__(self, dummy):
+ '''Perform the 'RestartOnebyOne' test. '''
+ self.incr("calls")
+
+ # We ignore the "node" parameter...
+
+ # Start up all the nodes...
+ ret = self.startall(None)
+ if not ret:
+ return self.failure("Setup failed")
+
+ did_fail = []
+ self.set_timer()
+ self.restart = RestartTest(self.CM)
+ for node in self.Env["nodes"]:
+ if not self.restart(node):
+ did_fail.append(node)
+
+ if did_fail:
+ return self.failure("Could not restart %d nodes: %s"
+ % (len(did_fail), repr(did_fail)))
+ return self.success()
+
+# Register RestartOnebyOne as a good test to run
+AllTestClasses.append(RestartOnebyOne)
+
+
+class PartialStart(CTSTest):
+ '''Start a node - but tell it to stop before it finishes starting up'''
+ def __init__(self, cm):
+ CTSTest.__init__(self,cm)
+ self.name = "PartialStart"
+ self.startall = SimulStartLite(cm)
+ self.stopall = SimulStopLite(cm)
+ self.stop = StopTest(cm)
+ #self.is_unsafe = 1
+
+ def __call__(self, node):
+ '''Perform the 'PartialStart' test. '''
+ self.incr("calls")
+
+ ret = self.stopall(None)
+ if not ret:
+ return self.failure("Setup failed")
+
+ watchpats = []
+ watchpats.append("pacemaker-controld.*Connecting to .* cluster infrastructure")
+ watch = self.create_watch(watchpats, self.Env["DeadTime"]+10)
+ watch.set_watch()
+
+ self.CM.StartaCMnoBlock(node)
+ ret = watch.look_for_all()
+ if not ret:
+ self.logger.log("Patterns not found: " + repr(watch.unmatched))
+ return self.failure("Setup of %s failed" % node)
+
+ ret = self.stop(node)
+ if not ret:
+ return self.failure("%s did not stop in time" % node)
+
+ return self.success()
+
+ def errorstoignore(self):
+ '''Return list of errors which should be ignored'''
+
+ # We might do some fencing in the 2-node case if we make it up far enough
+ return [
+ r"Executing reboot fencing operation",
+ r"Requesting fencing \([^)]+\) targeting node ",
+ ]
+
+# Register PartialStart as a good test to run
+AllTestClasses.append(PartialStart)
+
+
+class StandbyTest(CTSTest):
+ def __init__(self, cm):
+ CTSTest.__init__(self,cm)
+ self.name = "Standby"
+ self.benchmark = 1
+
+ self.start = StartTest(cm)
+ self.startall = SimulStartLite(cm)
+
+ # make sure the node is active
+ # set the node to standby mode
+    # check resources; no resource should be running on the node
+    # set the node to active mode
+    # check resources; resources should have been migrated back (SHOULD THEY?)
+
+ def __call__(self, node):
+
+ self.incr("calls")
+ ret = self.startall(None)
+ if not ret:
+ return self.failure("Start all nodes failed")
+
+ self.debug("Make sure node %s is active" % node)
+ if self.CM.StandbyStatus(node) != "off":
+ if not self.CM.SetStandbyMode(node, "off"):
+ return self.failure("can't set node %s to active mode" % node)
+
+ self.CM.cluster_stable()
+
+ status = self.CM.StandbyStatus(node)
+ if status != "off":
+ return self.failure("standby status of %s is [%s] but we expect [off]" % (node, status))
+
+ self.debug("Getting resources running on node %s" % node)
+ rsc_on_node = self.CM.active_resources(node)
+
+ watchpats = []
+ watchpats.append(r"State transition .* -> S_POLICY_ENGINE")
+ watch = self.create_watch(watchpats, self.Env["DeadTime"]+10)
+ watch.set_watch()
+
+ self.debug("Setting node %s to standby mode" % node)
+ if not self.CM.SetStandbyMode(node, "on"):
+ return self.failure("can't set node %s to standby mode" % node)
+
+ self.set_timer("on")
+
+ ret = watch.look_for_all()
+ if not ret:
+ self.logger.log("Patterns not found: " + repr(watch.unmatched))
+ self.CM.SetStandbyMode(node, "off")
+ return self.failure("cluster didn't react to standby change on %s" % node)
+
+ self.CM.cluster_stable()
+
+ status = self.CM.StandbyStatus(node)
+ if status != "on":
+ return self.failure("standby status of %s is [%s] but we expect [on]" % (node, status))
+ self.log_timer("on")
+
+ self.debug("Checking resources")
+ bad_run = self.CM.active_resources(node)
+ if len(bad_run) > 0:
+ rc = self.failure("%s set to standby, %s is still running on it" % (node, repr(bad_run)))
+ self.debug("Setting node %s to active mode" % node)
+ self.CM.SetStandbyMode(node, "off")
+ return rc
+
+ self.debug("Setting node %s to active mode" % node)
+ if not self.CM.SetStandbyMode(node, "off"):
+ return self.failure("can't set node %s to active mode" % node)
+
+ self.set_timer("off")
+ self.CM.cluster_stable()
+
+ status = self.CM.StandbyStatus(node)
+ if status != "off":
+ return self.failure("standby status of %s is [%s] but we expect [off]" % (node, status))
+ self.log_timer("off")
+
+ return self.success()
+
+AllTestClasses.append(StandbyTest)
+
+
+class ValgrindTest(CTSTest):
+ '''Check for memory leaks'''
+ def __init__(self, cm):
+ CTSTest.__init__(self,cm)
+ self.name = "Valgrind"
+ self.stopall = SimulStopLite(cm)
+ self.startall = SimulStartLite(cm)
+ self.is_valgrind = 1
+ self.is_loop = 1
+
+ def setup(self, node):
+ self.incr("calls")
+
+ ret = self.stopall(None)
+ if not ret:
+ return self.failure("Stop all nodes failed")
+
+ # @TODO Edit /etc/sysconfig/pacemaker on all nodes to enable valgrind,
+ # and clear any valgrind logs from previous runs. For now, we rely on
+ # the user to do this manually.
+
+ ret = self.startall(None)
+ if not ret:
+ return self.failure("Start all nodes failed")
+
+ return self.success()
+
+ def teardown(self, node):
+ # Return all nodes to normal
+ # @TODO Edit /etc/sysconfig/pacemaker on all nodes to disable valgrind
+ ret = self.stopall(None)
+ if not ret:
+ return self.failure("Stop all nodes failed")
+
+ return self.success()
+
+ def find_leaks(self):
+ # Check for leaks
+ # (no longer used but kept in case feature is restored)
+ leaked = []
+ self.stop = StopTest(self.CM)
+
+ for node in self.Env["nodes"]:
+ rc = self.stop(node)
+ if not rc:
+ self.failure("Couldn't shut down %s" % node)
+
+ (rc, _) = self.rsh(node, "grep -e indirectly.*lost:.*[1-9] -e definitely.*lost:.*[1-9] -e (ERROR|error).*SUMMARY:.*[1-9].*errors %s" % self.logger.logPat)
+ if rc != 1:
+ leaked.append(node)
+ self.failure("Valgrind errors detected on %s" % node)
+ (_, output) = self.rsh(node, "grep -e lost: -e SUMMARY: %s" % self.logger.logPat, verbose=1)
+ for line in output:
+ self.logger.log(line)
+ (_, output) = self.rsh(node, "cat %s" % self.logger.logPat, verbose=1)
+ for line in output:
+ self.debug(line)
+
+ self.rsh(node, "rm -f %s" % self.logger.logPat, verbose=1)
+ return leaked
+
+ def __call__(self, node):
+ #leaked = self.find_leaks()
+ #if len(leaked) > 0:
+ # return self.failure("Nodes %s leaked" % repr(leaked))
+
+ return self.success()
+
+ def errorstoignore(self):
+ '''Return list of errors which should be ignored'''
+ return [
+ r"pacemaker-based.*: \*\*\*\*\*\*\*\*\*\*\*\*\*",
+ r"pacemaker-based.*: .* avoid confusing Valgrind",
+ r"HA_VALGRIND_ENABLED",
+ ]
+
+
+class StandbyLoopTest(ValgrindTest):
+ '''Check for memory leaks by putting a node in and out of standby for an hour'''
+ # @TODO This is not a useful test for memory leaks
+ def __init__(self, cm):
+ ValgrindTest.__init__(self,cm)
+ self.name = "StandbyLoop"
+
+ def __call__(self, node):
+
+ lpc = 0
+ delay = 2
+ failed = 0
+ done = time.time() + self.Env["loop-minutes"] * 60
+ while time.time() <= done and not failed:
+ lpc = lpc + 1
+
+ time.sleep(delay)
+ if not self.CM.SetStandbyMode(node, "on"):
+ self.failure("can't set node %s to standby mode" % node)
+ failed = lpc
+
+ time.sleep(delay)
+ if not self.CM.SetStandbyMode(node, "off"):
+ self.failure("can't set node %s to active mode" % node)
+ failed = lpc
+
+ leaked = self.find_leaks()
+ if failed:
+ return self.failure("Iteration %d failed" % failed)
+ elif len(leaked) > 0:
+ return self.failure("Nodes %s leaked" % repr(leaked))
+
+ return self.success()
+
+#AllTestClasses.append(StandbyLoopTest)
+
+
+class BandwidthTest(CTSTest):
+# Tests should not be cluster-manager-specific
+# If you need to find out cluster manager configuration to do this, then
+# it should be added to the generic cluster manager API.
+ '''Test the bandwidth which the cluster uses'''
+ def __init__(self, cm):
+ CTSTest.__init__(self, cm)
+ self.name = "Bandwidth"
+ self.start = StartTest(cm)
+ self.__setitem__("min",0)
+ self.__setitem__("max",0)
+ self.__setitem__("totalbandwidth",0)
+ (handle, self.tempfile) = tempfile.mkstemp(".cts")
+ os.close(handle)
+ self.startall = SimulStartLite(cm)
+
+ def __call__(self, node):
+ '''Perform the Bandwidth test'''
+ self.incr("calls")
+
+ if self.CM.upcount() < 1:
+ return self.skipped()
+
+ Path = self.CM.InternalCommConfig()
+ if "ip" not in Path["mediatype"]:
+ return self.skipped()
+
+ port = Path["port"][0]
+ port = int(port)
+
+ ret = self.startall(None)
+ if not ret:
+ return self.failure("Test setup failed")
+ time.sleep(5) # We get extra messages right after startup.
+
+ fstmpfile = "/var/run/band_estimate"
+ dumpcmd = "tcpdump -p -n -c 102 -i any udp port %d > %s 2>&1" \
+ % (port, fstmpfile)
+
+ (rc, _) = self.rsh(node, dumpcmd)
+ if rc == 0:
+ farfile = "root@%s:%s" % (node, fstmpfile)
+ self.rsh.copy(farfile, self.tempfile)
+ Bandwidth = self.countbandwidth(self.tempfile)
+ if not Bandwidth:
+ self.logger.log("Could not compute bandwidth.")
+ return self.success()
+ intband = int(Bandwidth + 0.5)
+ self.logger.log("...bandwidth: %d bits/sec" % intband)
+ self.Stats["totalbandwidth"] = self.Stats["totalbandwidth"] + Bandwidth
+ if self.Stats["min"] == 0:
+ self.Stats["min"] = Bandwidth
+ if Bandwidth > self.Stats["max"]:
+ self.Stats["max"] = Bandwidth
+ if Bandwidth < self.Stats["min"]:
+ self.Stats["min"] = Bandwidth
+ self.rsh(node, "rm -f %s" % fstmpfile)
+ os.unlink(self.tempfile)
+ return self.success()
+ else:
+ return self.failure("no response from tcpdump command [%d]!" % rc)
+
+    def countbandwidth(self, filename):
+        count = 0
+        total = 0
+        with open(filename, "r") as fp:
+            # Find the first UDP packet and note its timestamp and length
+            while True:
+                line = fp.readline()
+                if not line:
+                    return None
+                if re.search("udp", line) or re.search("UDP,", line):
+                    count = count + 1
+                    linesplit = line.split(" ")
+                    for j in range(len(linesplit)-1):
+                        if linesplit[j] == "udp": break
+                        if linesplit[j] == "length:": break
+
+                    try:
+                        total = total + int(linesplit[j+1])
+                    except ValueError:
+                        self.logger.log("Invalid tcpdump line: %s" % line)
+                        return None
+                    T1 = linesplit[0]
+                    timesplit = T1.split(":")
+                    time2split = timesplit[2].split(".")
+                    time1 = (int(timesplit[0])*60+int(timesplit[1]))*60+int(time2split[0])+int(time2split[1])*0.000001
+                    break
+
+            # Sum the lengths of the following UDP packets, remembering the
+            # timestamp of the last one seen
+            time2 = time1
+            while count < 100:
+                line = fp.readline()
+                if not line:
+                    return None
+                if re.search("udp", line) or re.search("UDP,", line):
+                    count = count + 1
+                    linessplit = line.split(" ")
+                    for j in range(len(linessplit)-1):
+                        if linessplit[j] == "udp": break
+                        if linessplit[j] == "length:": break
+                    try:
+                        total = total + int(linessplit[j+1])
+                    except ValueError:
+                        self.logger.log("Invalid tcpdump line: %s" % line)
+                        return None
+
+                    T2 = linessplit[0]
+                    timesplit = T2.split(":")
+                    time2split = timesplit[2].split(".")
+                    time2 = (int(timesplit[0])*60+int(timesplit[1]))*60+int(time2split[0])+int(time2split[1])*0.000001
+
+        # Bandwidth is total payload bits over the observed interval
+        elapsed = time2 - time1
+        if elapsed <= 0:
+            return 0
+        return int((total*8)/elapsed)
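+
+    # Worked example (hypothetical tcpdump output, for illustration only):
+    # a line like "10:01:02.000500 ... UDP, length: 100" contributes 100
+    # bytes and the timestamp 10:01:02.000500. If 100 such packets span
+    # two seconds, the estimate is (100*100*8)/2 = 40000 bits/sec.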
+
+ def is_applicable(self):
+ '''BandwidthTest never applicable'''
+ return False
+
+AllTestClasses.append(BandwidthTest)
+
+
+###################################################################
+class MaintenanceMode(CTSTest):
+###################################################################
+ def __init__(self, cm):
+ CTSTest.__init__(self,cm)
+ self.name = "MaintenanceMode"
+ self.start = StartTest(cm)
+ self.startall = SimulStartLite(cm)
+ self.max = 30
+ #self.is_unsafe = 1
+ self.benchmark = 1
+ self.action = "asyncmon"
+ self.interval = 0
+ self.rid = "maintenanceDummy"
+
+ def toggleMaintenanceMode(self, node, action):
+ pats = []
+ pats.append(self.templates["Pat:DC_IDLE"])
+
+ # fail the resource right after turning Maintenance mode on
+ # verify it is not recovered until maintenance mode is turned off
+ if action == "On":
+ pats.append(self.templates["Pat:RscOpFail"] % (self.action, self.rid))
+ else:
+ pats.append(self.templates["Pat:RscOpOK"] % ("stop", self.rid))
+ pats.append(self.templates["Pat:RscOpOK"] % ("start", self.rid))
+
+ watch = self.create_watch(pats, 60)
+ watch.set_watch()
+
+ self.debug("Turning maintenance mode %s" % action)
+ self.rsh(node, self.templates["MaintenanceMode%s" % (action)])
+ if (action == "On"):
+ self.rsh(node, "crm_resource -V -F -r %s -H %s &>/dev/null" % (self.rid, node))
+
+ self.set_timer("recover%s" % (action))
+ watch.look_for_all()
+ self.log_timer("recover%s" % (action))
+ if watch.unmatched:
+ self.debug("Failed to find patterns when turning maintenance mode %s" % action)
+ return repr(watch.unmatched)
+
+ return ""
+
+ def insertMaintenanceDummy(self, node):
+ pats = []
+ pats.append(("%s.*" % node) + (self.templates["Pat:RscOpOK"] % ("start", self.rid)))
+
+ watch = self.create_watch(pats, 60)
+ watch.set_watch()
+
+ self.CM.AddDummyRsc(node, self.rid)
+
+ self.set_timer("addDummy")
+ watch.look_for_all()
+ self.log_timer("addDummy")
+
+ if watch.unmatched:
+ self.debug("Failed to find patterns when adding maintenance dummy resource")
+ return repr(watch.unmatched)
+ return ""
+
+ def removeMaintenanceDummy(self, node):
+ pats = []
+ pats.append(self.templates["Pat:RscOpOK"] % ("stop", self.rid))
+
+ watch = self.create_watch(pats, 60)
+ watch.set_watch()
+ self.CM.RemoveDummyRsc(node, self.rid)
+
+ self.set_timer("removeDummy")
+ watch.look_for_all()
+ self.log_timer("removeDummy")
+
+ if watch.unmatched:
+ self.debug("Failed to find patterns when removing maintenance dummy resource")
+ return repr(watch.unmatched)
+ return ""
+
+ def managedRscList(self, node):
+ rscList = []
+ (_, lines) = self.rsh(node, "crm_resource -c", verbose=1)
+ for line in lines:
+ if re.search("^Resource", line):
+ tmp = AuditResource(self.CM, line)
+ if tmp.managed():
+ rscList.append(tmp.id)
+
+ return rscList
+
+ def verifyResources(self, node, rscList, managed):
+ managedList = list(rscList)
+ managed_str = "managed"
+ if not managed:
+ managed_str = "unmanaged"
+
+ (_, lines) = self.rsh(node, "crm_resource -c", verbose=1)
+ for line in lines:
+ if re.search("^Resource", line):
+ tmp = AuditResource(self.CM, line)
+ if managed and not tmp.managed():
+ continue
+ elif not managed and tmp.managed():
+ continue
+ elif managedList.count(tmp.id):
+ managedList.remove(tmp.id)
+
+ if len(managedList) == 0:
+ self.debug("Found all %s resources on %s" % (managed_str, node))
+ return True
+
+ self.logger.log("Could not find all %s resources on %s. %s" % (managed_str, node, managedList))
+ return False
+
+ def __call__(self, node):
+ '''Perform the 'MaintenanceMode' test. '''
+ self.incr("calls")
+ verify_managed = False
+ verify_unmanaged = False
+ failPat = ""
+
+ ret = self.startall(None)
+ if not ret:
+ return self.failure("Setup failed")
+
+        # Get a list of all the managed resources. We use this list
+        # after enabling maintenance mode to verify all managed resources
+        # become unmanaged. After maintenance mode is turned off, we use
+        # this list to verify all the resources become managed again.
+ managedResources = self.managedRscList(node)
+ if len(managedResources) == 0:
+ self.logger.log("No managed resources on %s" % node)
+ return self.skipped()
+
+ # insert a fake resource we can fail during maintenance mode
+ # so we can verify recovery does not take place until after maintenance
+ # mode is disabled.
+ failPat = failPat + self.insertMaintenanceDummy(node)
+
+ # toggle maintenance mode ON, then fail dummy resource.
+ failPat = failPat + self.toggleMaintenanceMode(node, "On")
+
+ # verify all the resources are now unmanaged
+ if self.verifyResources(node, managedResources, False):
+ verify_unmanaged = True
+
+ # Toggle maintenance mode OFF, verify dummy is recovered.
+ failPat = failPat + self.toggleMaintenanceMode(node, "Off")
+
+ # verify all the resources are now managed again
+ if self.verifyResources(node, managedResources, True):
+ verify_managed = True
+
+ # Remove our maintenance dummy resource.
+ failPat = failPat + self.removeMaintenanceDummy(node)
+
+ self.CM.cluster_stable()
+
+ if failPat != "":
+ return self.failure("Unmatched patterns: %s" % (failPat))
+ elif verify_unmanaged is False:
+ return self.failure("Failed to verify resources became unmanaged during maintenance mode")
+ elif verify_managed is False:
+ return self.failure("Failed to verify resources switched back to managed after disabling maintenance mode")
+
+ return self.success()
+
+ def errorstoignore(self):
+ '''Return list of errors which should be ignored'''
+ return [
+ r"Updating failcount for %s" % self.rid,
+ r"schedulerd.*: Recover\s+%s\s+\(.*\)" % self.rid,
+ r"Unknown operation: fail",
+ self.templates["Pat:RscOpOK"] % (self.action, self.rid),
+ r"(ERROR|error).*: Action %s_%s_%d .* initiated outside of a transition" % (self.rid, self.action, self.interval),
+ ]
+
+AllTestClasses.append(MaintenanceMode)
+
+
+class ResourceRecover(CTSTest):
+ def __init__(self, cm):
+ CTSTest.__init__(self,cm)
+ self.name = "ResourceRecover"
+ self.start = StartTest(cm)
+ self.startall = SimulStartLite(cm)
+ self.max = 30
+ self.rid = None
+ self.rid_alt = None
+ #self.is_unsafe = 1
+ self.benchmark = 1
+
+ # these are the values used for the new LRM API call
+ self.action = "asyncmon"
+ self.interval = 0
+
+ def __call__(self, node):
+ '''Perform the 'ResourceRecover' test. '''
+ self.incr("calls")
+
+ ret = self.startall(None)
+ if not ret:
+ return self.failure("Setup failed")
+
+ # List all resources active on the node (skip test if none)
+ resourcelist = self.CM.active_resources(node)
+ if len(resourcelist) == 0:
+ self.logger.log("No active resources on %s" % node)
+ return self.skipped()
+
+ # Choose one resource at random
+ rsc = self.choose_resource(node, resourcelist)
+ if rsc is None:
+ return self.failure("Could not get details of resource '%s'" % self.rid)
+ if rsc.id == rsc.clone_id:
+ self.debug("Failing " + rsc.id)
+ else:
+ self.debug("Failing " + rsc.id + " (also known as " + rsc.clone_id + ")")
+
+ # Log patterns to watch for (failure, plus restart if managed)
+ pats = []
+ pats.append(self.templates["Pat:CloneOpFail"] % (self.action, rsc.id, rsc.clone_id))
+ if rsc.managed():
+ pats.append(self.templates["Pat:RscOpOK"] % ("stop", self.rid))
+ if rsc.unique():
+ pats.append(self.templates["Pat:RscOpOK"] % ("start", self.rid))
+ else:
+ # Anonymous clones may get restarted with a different clone number
+ pats.append(self.templates["Pat:RscOpOK"] % ("start", ".*"))
+
+ # Fail resource. (Ideally, we'd fail it twice, to ensure the fail count
+ # is incrementing properly, but it might restart on a different node.
+ # We'd have to temporarily ban it from all other nodes and ensure the
+ # migration-threshold hasn't been reached.)
+ if self.fail_resource(rsc, node, pats) is None:
+ return None # self.failure() already called
+
+ return self.success()
+
+ def choose_resource(self, node, resourcelist):
+ """ Choose a random resource to target """
+
+ self.rid = self.Env.random_gen.choice(resourcelist)
+ self.rid_alt = self.rid
+ (_, lines) = self.rsh(node, "crm_resource -c", verbose=1)
+ for line in lines:
+ if line.startswith("Resource: "):
+ rsc = AuditResource(self.CM, line)
+ if rsc.id == self.rid:
+ # Handle anonymous clones that get renamed
+ self.rid = rsc.clone_id
+ return rsc
+ return None
+
+ def get_failcount(self, node):
+ """ Check the fail count of targeted resource on given node """
+
+ (rc, lines) = self.rsh(node,
+ "crm_failcount --quiet --query --resource %s "
+ "--operation %s --interval %d "
+ "--node %s" % (self.rid, self.action,
+ self.interval, node), verbose=1)
+ if rc != 0 or len(lines) != 1:
+ self.logger.log("crm_failcount on %s failed (%d): %s" % (node, rc,
+ " // ".join(map(str.strip, lines))))
+ return -1
+ try:
+ failcount = int(lines[0])
+ except (IndexError, ValueError):
+ self.logger.log("crm_failcount output on %s unparseable: %s" % (node,
+ ' '.join(lines)))
+ return -1
+ return failcount
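+
+    # Behavioral note: a successful "crm_failcount --quiet --query" prints
+    # exactly one line holding the integer count (e.g. "0" for a resource
+    # that has never failed), which is why anything else is rejected above.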
+
+ def fail_resource(self, rsc, node, pats):
+ """ Fail the targeted resource, and verify as expected """
+
+ orig_failcount = self.get_failcount(node)
+
+ watch = self.create_watch(pats, 60)
+ watch.set_watch()
+
+ self.rsh(node, "crm_resource -V -F -r %s -H %s &>/dev/null" % (self.rid, node))
+
+ self.set_timer("recover")
+ watch.look_for_all()
+ self.log_timer("recover")
+
+ self.CM.cluster_stable()
+ recovered = self.CM.ResourceLocation(self.rid)
+
+ if watch.unmatched:
+ return self.failure("Patterns not found: %s" % repr(watch.unmatched))
+
+ elif rsc.unique() and len(recovered) > 1:
+ return self.failure("%s is now active on more than one node: %s"%(self.rid, repr(recovered)))
+
+ elif len(recovered) > 0:
+ self.debug("%s is running on: %s" % (self.rid, repr(recovered)))
+
+ elif rsc.managed():
+ return self.failure("%s was not recovered and is inactive" % self.rid)
+
+ new_failcount = self.get_failcount(node)
+ if new_failcount != (orig_failcount + 1):
+ return self.failure("%s fail count is %d not %d" % (self.rid,
+ new_failcount, orig_failcount + 1))
+
+ return 0 # Anything but None is success
+
+ def errorstoignore(self):
+ '''Return list of errors which should be ignored'''
+ return [
+ r"Updating failcount for %s" % self.rid,
+ r"schedulerd.*: Recover\s+(%s|%s)\s+\(.*\)" % (self.rid, self.rid_alt),
+ r"Unknown operation: fail",
+ self.templates["Pat:RscOpOK"] % (self.action, self.rid),
+ r"(ERROR|error).*: Action %s_%s_%d .* initiated outside of a transition" % (self.rid, self.action, self.interval),
+ ]
+
+AllTestClasses.append(ResourceRecover)
+
+
+class ComponentFail(CTSTest):
+ def __init__(self, cm):
+ CTSTest.__init__(self,cm)
+ self.name = "ComponentFail"
+ self.startall = SimulStartLite(cm)
+ self.complist = cm.Components()
+ self.patterns = []
+ self.okerrpatterns = []
+ self.is_unsafe = 1
+
+ def __call__(self, node):
+ '''Perform the 'ComponentFail' test. '''
+ self.incr("calls")
+ self.patterns = []
+ self.okerrpatterns = []
+
+ # start all nodes
+ ret = self.startall(None)
+ if not ret:
+ return self.failure("Setup failed")
+
+ if not self.CM.cluster_stable(self.Env["StableTime"]):
+ return self.failure("Setup failed - unstable")
+
+ node_is_dc = self.CM.is_node_dc(node, None)
+
+ # select a component to kill
+ chosen = self.Env.random_gen.choice(self.complist)
+ while chosen.dc_only and node_is_dc == 0:
+ chosen = self.Env.random_gen.choice(self.complist)
+
+ self.debug("...component %s (dc=%d)" % (chosen.name, node_is_dc))
+ self.incr(chosen.name)
+
+ if chosen.name != "corosync":
+ self.patterns.append(self.templates["Pat:ChildKilled"] %(node, chosen.name))
+ self.patterns.append(self.templates["Pat:ChildRespawn"] %(node, chosen.name))
+
+ self.patterns.extend(chosen.pats)
+ if node_is_dc:
+ self.patterns.extend(chosen.dc_pats)
+
+ # @TODO this should be a flag in the Component
+ if chosen.name in [ "corosync", "pacemaker-based", "pacemaker-fenced" ]:
+ # Ignore actions for fence devices if fencer will respawn
+ # (their registration will be lost, and probes will fail)
+ self.okerrpatterns = [ self.templates["Pat:Fencing_active"] ]
+ (_, lines) = self.rsh(node, "crm_resource -c", verbose=1)
+ for line in lines:
+ if re.search("^Resource", line):
+ r = AuditResource(self.CM, line)
+ if r.rclass == "stonith":
+ self.okerrpatterns.append(self.templates["Pat:Fencing_recover"] % r.id)
+ self.okerrpatterns.append(self.templates["Pat:Fencing_probe"] % r.id)
+
+ # supply a copy so self.patterns doesn't end up empty
+ tmpPats = []
+ tmpPats.extend(self.patterns)
+ self.patterns.extend(chosen.badnews_ignore)
+
+ # Look for STONITH ops, depending on Env["at-boot"] we might need to change the nodes status
+ stonithPats = []
+ stonithPats.append(self.templates["Pat:Fencing_ok"] % node)
+ stonith = self.create_watch(stonithPats, 0)
+ stonith.set_watch()
+
+ # set the watch for stable
+ watch = self.create_watch(
+ tmpPats, self.Env["DeadTime"] + self.Env["StableTime"] + self.Env["StartTime"])
+ watch.set_watch()
+
+ # kill the component
+ chosen.kill(node)
+
+ self.debug("Waiting for the cluster to recover")
+ self.CM.cluster_stable()
+
+ self.debug("Waiting for any fenced node to come back up")
+ self.CM.ns.wait_for_all_nodes(self.Env["nodes"], 600)
+
+ self.debug("Waiting for the cluster to re-stabilize with all nodes")
+ self.CM.cluster_stable(self.Env["StartTime"])
+
+ self.debug("Checking if %s was shot" % node)
+ shot = stonith.look(60)
+ if shot:
+ self.debug("Found: " + repr(shot))
+ self.okerrpatterns.append(self.templates["Pat:Fencing_start"] % node)
+
+ if not self.Env["at-boot"]:
+ self.CM.ShouldBeStatus[node] = "down"
+
+ # If fencing occurred, chances are many (if not all) the expected logs
+ # will not be sent - or will be lost when the node reboots
+ return self.success()
+
+ # check for logs indicating a graceful recovery
+ matched = watch.look_for_all(allow_multiple_matches=True)
+ if watch.unmatched:
+ self.logger.log("Patterns not found: " + repr(watch.unmatched))
+
+ self.debug("Waiting for the cluster to re-stabilize with all nodes")
+ is_stable = self.CM.cluster_stable(self.Env["StartTime"])
+
+ if not matched:
+ return self.failure("Didn't find all expected %s patterns" % chosen.name)
+ elif not is_stable:
+ return self.failure("Cluster did not become stable after killing %s" % chosen.name)
+
+ return self.success()
+
+ def errorstoignore(self):
+ '''Return list of errors which should be ignored'''
+ # Note that okerrpatterns refers to the last time we ran this test
+ # The good news is that this works fine for us...
+ self.okerrpatterns.extend(self.patterns)
+ return self.okerrpatterns
+
+AllTestClasses.append(ComponentFail)
+
+
+class SplitBrainTest(CTSTest):
+    '''Test split-brain handling: break the paths between the nodes so the
+    cluster partitions, then verify the partitions are detected and that the
+    cluster reforms correctly once they are healed.'''
+ def __init__(self,cm):
+ CTSTest.__init__(self,cm)
+ self.name = "SplitBrain"
+ self.start = StartTest(cm)
+ self.startall = SimulStartLite(cm)
+ self.is_experimental = 1
+
+ def isolate_partition(self, partition):
+ other_nodes = []
+ other_nodes.extend(self.Env["nodes"])
+
+ for node in partition:
+ try:
+ other_nodes.remove(node)
+ except ValueError:
+ self.logger.log("Node "+node+" not in " + repr(self.Env["nodes"]) + " from " +repr(partition))
+
+ if len(other_nodes) == 0:
+ return 1
+
+ self.debug("Creating partition: " + repr(partition))
+ self.debug("Everyone else: " + repr(other_nodes))
+
+ for node in partition:
+ if not self.CM.isolate_node(node, other_nodes):
+ self.logger.log("Could not isolate %s" % node)
+ return 0
+
+ return 1
+
+ def heal_partition(self, partition):
+ other_nodes = []
+ other_nodes.extend(self.Env["nodes"])
+
+ for node in partition:
+ try:
+ other_nodes.remove(node)
+ except ValueError:
+ self.logger.log("Node "+node+" not in " + repr(self.Env["nodes"]))
+
+ if len(other_nodes) == 0:
+ return 1
+
+ self.debug("Healing partition: " + repr(partition))
+ self.debug("Everyone else: " + repr(other_nodes))
+
+ for node in partition:
+ self.CM.unisolate_node(node, other_nodes)
+
+ def __call__(self, node):
+ '''Perform split-brain test'''
+ self.incr("calls")
+ self.passed = 1
+ partitions = {}
+
+ ret = self.startall(None)
+ if not ret:
+ return self.failure("Setup failed")
+
+        while True:
+ # Retry until we get multiple partitions
+ partitions = {}
+ p_max = len(self.Env["nodes"])
+ for node in self.Env["nodes"]:
+ p = self.Env.random_gen.randint(1, p_max)
+ if not p in partitions:
+ partitions[p] = []
+ partitions[p].append(node)
+ p_max = len(list(partitions.keys()))
+ if p_max > 1:
+ break
+ # else, try again
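+
+        # Example (illustrative): with nodes [n1, n2, n3], random draws of
+        # p = 1, 1, 3 would yield partitions {1: [n1, n2], 3: [n3]}, so
+        # p_max becomes 2 and the loop exits with two partitions to isolate.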
+
+ self.debug("Created %d partitions" % p_max)
+ for key in list(partitions.keys()):
+ self.debug("Partition["+str(key)+"]:\t"+repr(partitions[key]))
+
+ # Disabling STONITH to reduce test complexity for now
+ self.rsh(node, "crm_attribute -V -n stonith-enabled -v false")
+
+ for key in list(partitions.keys()):
+ self.isolate_partition(partitions[key])
+
+        count = 30
+        while count > 0:
+            if len(self.CM.find_partitions()) != p_max:
+                time.sleep(10)
+                count -= 1
+            else:
+                break
+        else:
+            self.failure("Expected partitions were not created")
+
+ # Target number of partitions formed - wait for stability
+ if not self.CM.cluster_stable():
+ self.failure("Partitioned cluster not stable")
+
+ # Now audit the cluster state
+ self.CM.partitions_expected = p_max
+ if not self.audit():
+ self.failure("Audits failed")
+ self.CM.partitions_expected = 1
+
+ # And heal them again
+ for key in list(partitions.keys()):
+ self.heal_partition(partitions[key])
+
+ # Wait for a single partition to form
+ count = 30
+ while count > 0:
+ if len(self.CM.find_partitions()) != 1:
+ time.sleep(10)
+ count -= 1
+ else:
+ break
+ else:
+ self.failure("Cluster did not reform")
+
+ # Wait for it to have the right number of members
+ count = 30
+ while count > 0:
+ members = []
+
+ partitions = self.CM.find_partitions()
+ if len(partitions) > 0:
+ members = partitions[0].split()
+
+ if len(members) != len(self.Env["nodes"]):
+ time.sleep(10)
+ count -= 1
+ else:
+ break
+ else:
+ self.failure("Cluster did not completely reform")
+
+        # Wait up to 20 minutes - the delay is preferable to trying to
+        # continue in a messed-up state
+ if not self.CM.cluster_stable(1200):
+ self.failure("Reformed cluster not stable")
+ if self.Env["continue"]:
+ answer = "Y"
+ else:
+                try:
+                    answer = input('Continue? [nY]')
+                except EOFError:
+                    answer = "n"
+            if answer == "n":
+ raise ValueError("Reformed cluster not stable")
+
+ # Turn fencing back on
+ if self.Env["DoFencing"]:
+ self.rsh(node, "crm_attribute -V -D -n stonith-enabled")
+
+ self.CM.cluster_stable()
+
+ if self.passed:
+ return self.success()
+ return self.failure("See previous errors")
+
+ def errorstoignore(self):
+ '''Return list of errors which are 'normal' and should be ignored'''
+ return [
+ r"Another DC detected:",
+ r"(ERROR|error).*: .*Application of an update diff failed",
+ r"pacemaker-controld.*:.*not in our membership list",
+ r"CRIT:.*node.*returning after partition",
+ ]
+
+ def is_applicable(self):
+ if not self.is_applicable_common():
+ return False
+ return len(self.Env["nodes"]) > 2
+
+AllTestClasses.append(SplitBrainTest)
+
+
+class Reattach(CTSTest):
+ def __init__(self, cm):
+ CTSTest.__init__(self,cm)
+ self.name = "Reattach"
+ self.startall = SimulStartLite(cm)
+ self.restart1 = RestartTest(cm)
+ self.stopall = SimulStopLite(cm)
+ self.is_unsafe = 0 # Handled by canrunnow()
+
+ def _is_managed(self, node):
+ (_, is_managed) = self.rsh(node, "crm_attribute -t rsc_defaults -n is-managed -q -G -d true", verbose=1)
+ is_managed = is_managed[0].strip()
+ return is_managed == "true"
+
+ def _set_unmanaged(self, node):
+ self.debug("Disable resource management")
+ self.rsh(node, "crm_attribute -t rsc_defaults -n is-managed -v false")
+
+ def _set_managed(self, node):
+ self.debug("Re-enable resource management")
+ self.rsh(node, "crm_attribute -t rsc_defaults -n is-managed -D")
+
+ def setup(self, node):
+ attempt = 0
+ if not self.startall(None):
+ return None
+
+ # Make sure we are really _really_ stable and that all
+ # resources, including those that depend on transient node
+ # attributes, are started
+ while not self.CM.cluster_stable(double_check=True):
+ if attempt < 5:
+ attempt += 1
+ self.debug("Not stable yet, re-testing")
+ else:
+ self.logger.log("Cluster is not stable")
+ return None
+
+ return 1
+
+ def teardown(self, node):
+
+ # Make sure 'node' is up
+ start = StartTest(self.CM)
+ start(node)
+
+ if not self._is_managed(node):
+ self.logger.log("Attempting to re-enable resource management on %s" % node)
+ self._set_managed(node)
+ self.CM.cluster_stable()
+ if not self._is_managed(node):
+ self.logger.log("Could not re-enable resource management")
+ return 0
+
+ return 1
+
+ def canrunnow(self, node):
+ '''Return TRUE if we can meaningfully run right now'''
+ if self.find_ocfs2_resources(node):
+ self.logger.log("Detach/Reattach scenarios are not possible with OCFS2 services present")
+ return 0
+ return 1
+
+ def __call__(self, node):
+ self.incr("calls")
+
+ # Conveniently, the scheduler will display this message when disabling
+ # management, even if fencing is not enabled, so we can rely on it.
+ managed = self.create_watch(["No fencing will be done"], 60)
+ managed.set_watch()
+
+ self._set_unmanaged(node)
+
+ if not managed.look_for_all():
+ self.logger.log("Patterns not found: " + repr(managed.unmatched))
+ return self.failure("Resource management not disabled")
+
+ pats = []
+ pats.append(self.templates["Pat:RscOpOK"] % ("start", ".*"))
+ pats.append(self.templates["Pat:RscOpOK"] % ("stop", ".*"))
+ pats.append(self.templates["Pat:RscOpOK"] % ("promote", ".*"))
+ pats.append(self.templates["Pat:RscOpOK"] % ("demote", ".*"))
+ pats.append(self.templates["Pat:RscOpOK"] % ("migrate", ".*"))
+
+ watch = self.create_watch(pats, 60, "ShutdownActivity")
+ watch.set_watch()
+
+ self.debug("Shutting down the cluster")
+ ret = self.stopall(None)
+ if not ret:
+ self._set_managed(node)
+ return self.failure("Couldn't shut down the cluster")
+
+ self.debug("Bringing the cluster back up")
+ ret = self.startall(None)
+ time.sleep(5) # allow ping to update the CIB
+ if not ret:
+ self._set_managed(node)
+ return self.failure("Couldn't restart the cluster")
+
+ if self.local_badnews("ResourceActivity:", watch):
+ self._set_managed(node)
+ return self.failure("Resources stopped or started during cluster restart")
+
+ watch = self.create_watch(pats, 60, "StartupActivity")
+ watch.set_watch()
+
+ # Re-enable resource management (and verify it happened).
+ self._set_managed(node)
+ self.CM.cluster_stable()
+ if not self._is_managed(node):
+ return self.failure("Could not re-enable resource management")
+
+ # Ignore actions for STONITH resources
+ ignore = []
+ (_, lines) = self.rsh(node, "crm_resource -c", verbose=1)
+ for line in lines:
+ if re.search("^Resource", line):
+ r = AuditResource(self.CM, line)
+ if r.rclass == "stonith":
+
+ self.debug("Ignoring start actions for %s" % r.id)
+ ignore.append(self.templates["Pat:RscOpOK"] % ("start", r.id))
+
+ if self.local_badnews("ResourceActivity:", watch, ignore):
+ return self.failure("Resources stopped or started after resource management was re-enabled")
+
+ return ret
+
+ def errorstoignore(self):
+ '''Return list of errors which should be ignored'''
+ return [
+ r"resource( was|s were) active at shutdown",
+ ]
+
+ def is_applicable(self):
+ return True
+
+AllTestClasses.append(Reattach)
+
+
+class SpecialTest1(CTSTest):
+ '''Set up a custom test to cause quorum failure issues for Andrew'''
+ def __init__(self, cm):
+ CTSTest.__init__(self,cm)
+ self.name = "SpecialTest1"
+ self.startall = SimulStartLite(cm)
+ self.restart1 = RestartTest(cm)
+ self.stopall = SimulStopLite(cm)
+
+ def __call__(self, node):
+ '''Perform the 'SpecialTest1' test for Andrew. '''
+ self.incr("calls")
+
+ # Shut down all the nodes...
+ ret = self.stopall(None)
+ if not ret:
+ return self.failure("Could not stop all nodes")
+
+ # Test config recovery when the other nodes come up
+ self.rsh(node, "rm -f " + BuildOptions.CIB_DIR + "/cib*")
+
+ # Start the selected node
+ ret = self.restart1(node)
+ if not ret:
+ return self.failure("Could not start "+node)
+
+ # Start all remaining nodes
+ ret = self.startall(None)
+ if not ret:
+ return self.failure("Could not start the remaining nodes")
+
+ return self.success()
+
+ def errorstoignore(self):
+ '''Return list of errors which should be ignored'''
+ # Errors that occur as a result of the CIB being wiped
+ return [
+ r"error.*: v1 patchset error, patch failed to apply: Application of an update diff failed",
+ r"error.*: Resource start-up disabled since no STONITH resources have been defined",
+ r"error.*: Either configure some or disable STONITH with the stonith-enabled option",
+ r"error.*: NOTE: Clusters with shared data need STONITH to ensure data integrity",
+ ]
+
+AllTestClasses.append(SpecialTest1)
+
+
+class HAETest(CTSTest):
+    '''Base class for HA Extension (HAE) tests that exercise the DLM, O2CB, and OCFS2 resource stack'''
+ def __init__(self, cm):
+ CTSTest.__init__(self,cm)
+ self.name = "HAETest"
+ self.stopall = SimulStopLite(cm)
+ self.startall = SimulStartLite(cm)
+ self.is_loop = 1
+
+ def setup(self, node):
+ # Start all remaining nodes
+ ret = self.startall(None)
+ if not ret:
+ return self.failure("Couldn't start all nodes")
+ return self.success()
+
+ def teardown(self, node):
+ # Stop everything
+ ret = self.stopall(None)
+ if not ret:
+ return self.failure("Couldn't stop all nodes")
+ return self.success()
+
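+    # Usage sketch (resource name assumed for illustration):
+    #   self.wait_on_state(node, "dlm-clone", len(self.Env["nodes"]))
+    # polls `crm_resource -r dlm-clone -W -Q` once per second until it
+    # reports one line per expected active clone instance, or gives up.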
+ def wait_on_state(self, node, resource, expected_clones, attempts=240):
+ while attempts > 0:
+ active = 0
+ (rc, lines) = self.rsh(node, "crm_resource -r %s -W -Q" % resource, verbose=1)
+
+ # Hack until crm_resource does the right thing
+ if rc == 0 and lines:
+ active = len(lines)
+
+ if len(lines) == expected_clones:
+ return 1
+
+ elif rc == 1:
+ self.debug("Resource %s is still inactive" % resource)
+
+ elif rc == 234:
+ self.logger.log("Unknown resource %s" % resource)
+ return 0
+
+ elif rc == 246:
+ self.logger.log("Cluster is inactive")
+ return 0
+
+ elif rc != 0:
+ self.logger.log("Call to crm_resource failed, rc=%d" % rc)
+ return 0
+
+ else:
+ self.debug("Resource %s is active on %d times instead of %d" % (resource, active, expected_clones))
+
+ attempts -= 1
+ time.sleep(1)
+
+ return 0
+
+ def find_dlm(self, node):
+ self.r_dlm = None
+
+ (_, lines) = self.rsh(node, "crm_resource -c", verbose=1)
+ for line in lines:
+ if re.search("^Resource", line):
+ r = AuditResource(self.CM, line)
+ if r.rtype == "controld" and r.parent != "NA":
+ self.debug("Found dlm: %s" % self.r_dlm)
+ self.r_dlm = r.parent
+ return 1
+ return 0
+
+ def find_hae_resources(self, node):
+ self.r_dlm = None
+ self.r_o2cb = None
+ self.r_ocfs2 = []
+
+ if self.find_dlm(node):
+ self.find_ocfs2_resources(node)
+
+ def is_applicable(self):
+ if not self.is_applicable_common():
+ return False
+ if self.Env["Schema"] == "hae":
+ return True
+        return False
+
+
+class HAERoleTest(HAETest):
+ def __init__(self, cm):
+ '''Lars' mount/unmount test for the HA extension. '''
+ HAETest.__init__(self,cm)
+ self.name = "HAERoleTest"
+
+ def change_state(self, node, resource, target):
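+        # For example, stopping the DLM clone resolves to something like
+        # `crm_resource -V -r <dlm-clone-id> -p target-role -v Stopped --meta`
+        # (the clone id is discovered by find_dlm() at runtime).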
+ (rc, _) = self.rsh(node, "crm_resource -V -r %s -p target-role -v %s --meta" % (resource, target))
+ return rc
+
+ def __call__(self, node):
+ self.incr("calls")
+ lpc = 0
+ failed = 0
+ delay = 2
+ done = time.time() + self.Env["loop-minutes"]*60
+ self.find_hae_resources(node)
+
+ clone_max = len(self.Env["nodes"])
+ while time.time() <= done and not failed:
+ lpc = lpc + 1
+
+ self.change_state(node, self.r_dlm, "Stopped")
+ if not self.wait_on_state(node, self.r_dlm, 0):
+ self.failure("%s did not go down correctly" % self.r_dlm)
+ failed = lpc
+
+ self.change_state(node, self.r_dlm, "Started")
+ if not self.wait_on_state(node, self.r_dlm, clone_max):
+ self.failure("%s did not come up correctly" % self.r_dlm)
+ failed = lpc
+
+ if not self.wait_on_state(node, self.r_o2cb, clone_max):
+ self.failure("%s did not come up correctly" % self.r_o2cb)
+ failed = lpc
+
+ for fs in self.r_ocfs2:
+ if not self.wait_on_state(node, fs, clone_max):
+ self.failure("%s did not come up correctly" % fs)
+ failed = lpc
+
+ if failed:
+ return self.failure("iteration %d failed" % failed)
+ return self.success()
+
+AllTestClasses.append(HAERoleTest)
+
+
+class HAEStandbyTest(HAETest):
+    '''Toggle standby mode on a node and verify that the HAE clone resources contract and re-expand accordingly'''
+ def __init__(self, cm):
+ HAETest.__init__(self,cm)
+ self.name = "HAEStandbyTest"
+
+ def change_state(self, node, resource, target):
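+        # "resource" is unused here; the signature mirrors
+        # HAERoleTest.change_state() so the two variants stay interchangeable.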
+ (rc, _) = self.rsh(node, "crm_standby -V -l reboot -v %s" % (target))
+ return rc
+
+ def __call__(self, node):
+ self.incr("calls")
+
+ lpc = 0
+ failed = 0
+ done = time.time() + self.Env["loop-minutes"]*60
+ self.find_hae_resources(node)
+
+ clone_max = len(self.Env["nodes"])
+ while time.time() <= done and not failed:
+ lpc = lpc + 1
+
+ self.change_state(node, self.r_dlm, "true")
+ if not self.wait_on_state(node, self.r_dlm, clone_max-1):
+ self.failure("%s did not go down correctly" % self.r_dlm)
+ failed = lpc
+
+ self.change_state(node, self.r_dlm, "false")
+ if not self.wait_on_state(node, self.r_dlm, clone_max):
+ self.failure("%s did not come up correctly" % self.r_dlm)
+ failed = lpc
+
+ if not self.wait_on_state(node, self.r_o2cb, clone_max):
+ self.failure("%s did not come up correctly" % self.r_o2cb)
+ failed = lpc
+
+ for fs in self.r_ocfs2:
+ if not self.wait_on_state(node, fs, clone_max):
+ self.failure("%s did not come up correctly" % fs)
+ failed = lpc
+
+ if failed:
+ return self.failure("iteration %d failed" % failed)
+ return self.success()
+
+AllTestClasses.append(HAEStandbyTest)
+
+
+class NearQuorumPointTest(CTSTest):
+ '''
+ This test brings larger clusters near the quorum point (50%).
+ In addition, it will test doing starts and stops at the same time.
+
+ Here is how I think it should work:
+ - loop over the nodes and decide randomly which will be up and which
+ will be down Use a 50% probability for each of up/down.
+ - figure out what to do to get into that state from the current state
+ - in parallel, bring up those going up and bring those going down.
+ '''
+
+ def __init__(self, cm):
+ CTSTest.__init__(self,cm)
+ self.name = "NearQuorumPoint"
+
+ def __call__(self, dummy):
+ '''Perform the 'NearQuorumPoint' test. '''
+ self.incr("calls")
+ startset = []
+ stopset = []
+
+ stonith = self.CM.prepare_fencing_watcher("NearQuorumPoint")
+ #decide what to do with each node
+ for node in self.Env["nodes"]:
+ action = self.Env.random_gen.choice(["start","stop"])
+ #action = self.Env.random_gen.choice(["start","stop","no change"])
+ if action == "start" :
+ startset.append(node)
+ elif action == "stop" :
+ stopset.append(node)
+
+ self.debug("start nodes:" + repr(startset))
+ self.debug("stop nodes:" + repr(stopset))
+
+ #add search patterns
+ watchpats = [ ]
+ for node in stopset:
+ if self.CM.ShouldBeStatus[node] == "up":
+ watchpats.append(self.templates["Pat:We_stopped"] % node)
+
+ for node in startset:
+ if self.CM.ShouldBeStatus[node] == "down":
+ #watchpats.append(self.templates["Pat:NonDC_started"] % node)
+ watchpats.append(self.templates["Pat:Local_started"] % node)
+ else:
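+                # The node is already up, so instead watch for it to notice
+                # each stopping peer leave the membership.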
+ for stopping in stopset:
+ if self.CM.ShouldBeStatus[stopping] == "up":
+ watchpats.append(self.templates["Pat:They_stopped"] % (node, self.CM.key_for_node(stopping)))
+
+ if len(watchpats) == 0:
+ return self.skipped()
+
+ if len(startset) != 0:
+ watchpats.append(self.templates["Pat:DC_IDLE"])
+
+ watch = self.create_watch(watchpats, self.Env["DeadTime"]+10)
+
+ watch.set_watch()
+
+ #begin actions
+ for node in stopset:
+ if self.CM.ShouldBeStatus[node] == "up":
+ self.CM.StopaCMnoBlock(node)
+
+ for node in startset:
+ if self.CM.ShouldBeStatus[node] == "down":
+ self.CM.StartaCMnoBlock(node)
+
+ #get the result
+ if watch.look_for_all():
+ self.CM.cluster_stable()
+ self.CM.fencing_cleanup("NearQuorumPoint", stonith)
+ return self.success()
+
+ self.logger.log("Warn: Patterns not found: " + repr(watch.unmatched))
+
+ #get the "bad" nodes
+ upnodes = []
+ for node in stopset:
+ if self.CM.StataCM(node) == 1:
+ upnodes.append(node)
+
+ downnodes = []
+ for node in startset:
+ if self.CM.StataCM(node) == 0:
+ downnodes.append(node)
+
+ self.CM.fencing_cleanup("NearQuorumPoint", stonith)
+ if upnodes == [] and downnodes == []:
+ self.CM.cluster_stable()
+
+            # Make sure they're completely down with no residue
+ for node in stopset:
+ self.rsh(node, self.templates["StopCmd"])
+
+ return self.success()
+
+ if len(upnodes) > 0:
+ self.logger.log("Warn: Unstoppable nodes: " + repr(upnodes))
+
+ if len(downnodes) > 0:
+ self.logger.log("Warn: Unstartable nodes: " + repr(downnodes))
+
+ return self.failure()
+
+ def is_applicable(self):
+ return True
+
+AllTestClasses.append(NearQuorumPointTest)
+
+
+class RollingUpgradeTest(CTSTest):
+ '''Perform a rolling upgrade of the cluster'''
+ def __init__(self, cm):
+ CTSTest.__init__(self,cm)
+ self.name = "RollingUpgrade"
+ self.start = StartTest(cm)
+ self.stop = StopTest(cm)
+ self.stopall = SimulStopLite(cm)
+ self.startall = SimulStartLite(cm)
+
+ def setup(self, node):
+ # Start all remaining nodes
+ ret = self.stopall(None)
+ if not ret:
+ return self.failure("Couldn't stop all nodes")
+
+ for node in self.Env["nodes"]:
+ if not self.downgrade(node, None):
+ return self.failure("Couldn't downgrade %s" % node)
+
+ ret = self.startall(None)
+ if not ret:
+ return self.failure("Couldn't start all nodes")
+ return self.success()
+
+ def teardown(self, node):
+ # Stop everything
+ ret = self.stopall(None)
+ if not ret:
+ return self.failure("Couldn't stop all nodes")
+
+ for node in self.Env["nodes"]:
+ if not self.upgrade(node, None):
+ return self.failure("Couldn't upgrade %s" % node)
+
+ return self.success()
+
+ def install(self, node, version, start=1, flags="--force"):
+
+ target_dir = "/tmp/rpm-%s" % version
+ src_dir = "%s/%s" % (self.Env["rpm-dir"], version)
+
+ self.logger.log("Installing %s on %s with %s" % (version, node, flags))
+ if not self.stop(node):
+ return self.failure("stop failure: "+node)
+
+ self.rsh(node, "mkdir -p %s" % target_dir)
+ self.rsh(node, "rm -f %s/*.rpm" % target_dir)
+ (_, lines) = self.rsh(node, "ls -1 %s/*.rpm" % src_dir, verbose=1)
+ for line in lines:
+ line = line[:-1]
+            self.rsh.copy(line, "%s:%s/" % (node, target_dir))
+ self.rsh(node, "rpm -Uvh %s %s/*.rpm" % (flags, target_dir))
+
+ if start and not self.start(node):
+ return self.failure("start failure: "+node)
+
+ return self.success()
+
+ def upgrade(self, node, start=1):
+ return self.install(node, self.Env["current-version"], start)
+
+ def downgrade(self, node, start=1):
+ return self.install(node, self.Env["previous-version"], start, "--force --nodeps")
+
+ def __call__(self, node):
+ '''Perform the 'Rolling Upgrade' test. '''
+ self.incr("calls")
+
+ for node in self.Env["nodes"]:
+            if not self.upgrade(node):
+ return self.failure("Couldn't upgrade %s" % node)
+
+ self.CM.cluster_stable()
+
+ return self.success()
+
+ def is_applicable(self):
+ if not self.is_applicable_common():
+ return None
+
+ if not "rpm-dir" in list(self.Env.keys()):
+ return None
+ if not "current-version" in list(self.Env.keys()):
+ return None
+ if not "previous-version" in list(self.Env.keys()):
+ return None
+
+ return 1
+
+# Register RollingUpgradeTest as a good test to run
+AllTestClasses.append(RollingUpgradeTest)
+
+
+class BSC_AddResource(CTSTest):
+ '''Add a resource to the cluster'''
+ def __init__(self, cm):
+ CTSTest.__init__(self, cm)
+ self.name = "AddResource"
+ self.resource_offset = 0
+ self.cib_cmd = """cibadmin -C -o %s -X '%s' """
+
+ def __call__(self, node):
+ self.incr("calls")
+ self.resource_offset = self.resource_offset + 1
+
+ r_id = "bsc-rsc-%s-%d" % (node, self.resource_offset)
+ start_pat = "pacemaker-controld.*%s_start_0.*confirmed.*ok"
+
+ patterns = []
+ patterns.append(start_pat % r_id)
+
+ watch = self.create_watch(patterns, self.Env["DeadTime"])
+ watch.set_watch()
+
+ ip = self.NextIP()
+ if not self.make_ip_resource(node, r_id, "ocf", "IPaddr", ip):
+ return self.failure("Make resource %s failed" % r_id)
+
+ failed = 0
+        watch.look_for_all()
+ if watch.unmatched:
+ for regex in watch.unmatched:
+ self.logger.log ("Warn: Pattern not found: %s" % (regex))
+ failed = 1
+
+ if failed:
+ return self.failure("Resource pattern(s) not found")
+
+ if not self.CM.cluster_stable(self.Env["DeadTime"]):
+ return self.failure("Unstable cluster")
+
+ return self.success()
+
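+    # Sketch of the intended arithmetic (addresses illustrative only):
+    # an IPv4 base "10.0.0.10" becomes "10.0.0.11", and an IPv6 base such
+    # as "fe80::10" has its final hextet incremented in hex ("fe80::11").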
+    def NextIP(self):
+        ip = self.Env["IPBase"]
+        if ":" in ip:
+            (prefix, sep, suffix) = ip.rpartition(":")
+            suffix = str(hex(int(suffix, 16)+1)).lstrip("0x")
+        else:
+            (prefix, sep, suffix) = ip.rpartition(".")
+            suffix = str(int(suffix)+1)
+
+        ip = prefix + sep + suffix
+        self.Env["IPBase"] = ip
+        return ip.strip()
+
+ def make_ip_resource(self, node, id, rclass, type, ip):
+ self.logger.log("Creating %s:%s:%s (%s) on %s" % (rclass,type,id,ip,node))
+ rsc_xml="""
+<primitive id="%s" class="%s" type="%s" provider="heartbeat">
+ <instance_attributes id="%s"><attributes>
+ <nvpair id="%s" name="ip" value="%s"/>
+ </attributes></instance_attributes>
+</primitive>""" % (id, rclass, type, id, id, ip)
+
+ node_constraint = """
+ <rsc_location id="run_%s" rsc="%s">
+ <rule id="pref_run_%s" score="100">
+ <expression id="%s_loc_expr" attribute="#uname" operation="eq" value="%s"/>
+ </rule>
+ </rsc_location>""" % (id, id, id, id, node)
+
+ (rc, _) = self.rsh(node, self.cib_cmd % ("constraints", node_constraint), verbose=1)
+ if rc != 0:
+ self.logger.log("Constraint creation failed: %d" % rc)
+ return None
+
+ (rc, _) = self.rsh(node, self.cib_cmd % ("resources", rsc_xml), verbose=1)
+ if rc != 0:
+ self.logger.log("Resource creation failed: %d" % rc)
+ return None
+
+ return 1
+
+ def is_applicable(self):
+ if self.Env["DoBSC"]:
+ return True
+        return False
+
+AllTestClasses.append(BSC_AddResource)
+
+
+class SimulStopLite(CTSTest):
+ '''Stop any active nodes ~ simultaneously'''
+ def __init__(self, cm):
+ CTSTest.__init__(self,cm)
+ self.name = "SimulStopLite"
+
+ def __call__(self, dummy):
+ '''Perform the 'SimulStopLite' setup work. '''
+ self.incr("calls")
+
+ self.debug("Setup: " + self.name)
+
+ # We ignore the "node" parameter...
+ watchpats = [ ]
+
+ for node in self.Env["nodes"]:
+ if self.CM.ShouldBeStatus[node] == "up":
+ self.incr("WasStarted")
+ watchpats.append(self.templates["Pat:We_stopped"] % node)
+
+ if len(watchpats) == 0:
+ return self.success()
+
+ # Stop all the nodes - at about the same time...
+ watch = self.create_watch(watchpats, self.Env["DeadTime"]+10)
+
+ watch.set_watch()
+ self.set_timer()
+ for node in self.Env["nodes"]:
+ if self.CM.ShouldBeStatus[node] == "up":
+ self.CM.StopaCMnoBlock(node)
+ if watch.look_for_all():
+            # Make sure they're completely down with no residue
+ for node in self.Env["nodes"]:
+ self.rsh(node, self.templates["StopCmd"])
+
+ return self.success()
+
+ did_fail = 0
+ up_nodes = []
+ for node in self.Env["nodes"]:
+ if self.CM.StataCM(node) == 1:
+ did_fail = 1
+ up_nodes.append(node)
+
+ if did_fail:
+ return self.failure("Active nodes exist: " + repr(up_nodes))
+
+ self.logger.log("Warn: All nodes stopped but CTS didn't detect: "
+ + repr(watch.unmatched))
+
+ return self.failure("Missing log message: "+repr(watch.unmatched))
+
+ def is_applicable(self):
+ '''SimulStopLite is a setup test and never applicable'''
+ return False
+
+
+class SimulStartLite(CTSTest):
+ '''Start any stopped nodes ~ simultaneously'''
+ def __init__(self, cm):
+ CTSTest.__init__(self,cm)
+ self.name = "SimulStartLite"
+
+ def __call__(self, dummy):
+        '''Perform the 'SimulStartLite' setup work. '''
+ self.incr("calls")
+ self.debug("Setup: " + self.name)
+
+ # We ignore the "node" parameter...
+ node_list = []
+ for node in self.Env["nodes"]:
+ if self.CM.ShouldBeStatus[node] == "down":
+ self.incr("WasStopped")
+ node_list.append(node)
+
+ self.set_timer()
+ while len(node_list) > 0:
+ # Repeat until all nodes come up
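+            # (fencing_cleanup() below returns any nodes that were fenced
+            # while starting; those are retried on the next pass)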
+ watchpats = [ ]
+
+ uppat = self.templates["Pat:NonDC_started"]
+ if self.CM.upcount() == 0:
+ uppat = self.templates["Pat:Local_started"]
+
+ watchpats.append(self.templates["Pat:DC_IDLE"])
+ for node in node_list:
+ watchpats.append(uppat % node)
+ watchpats.append(self.templates["Pat:InfraUp"] % node)
+ watchpats.append(self.templates["Pat:PacemakerUp"] % node)
+
+ # Start all the nodes - at about the same time...
+ watch = self.create_watch(watchpats, self.Env["DeadTime"]+10)
+ watch.set_watch()
+
+ stonith = self.CM.prepare_fencing_watcher(self.name)
+
+ for node in node_list:
+ self.CM.StartaCMnoBlock(node)
+
+ watch.look_for_all()
+
+ node_list = self.CM.fencing_cleanup(self.name, stonith)
+
+            if node_list is None:
+ return self.failure("Cluster did not stabilize")
+
+ # Remove node_list messages from watch.unmatched
+ for node in node_list:
+ self.logger.debug("Dealing with stonith operations for %s" % repr(node_list))
+ if watch.unmatched:
+ try:
+ watch.unmatched.remove(uppat % node)
+                    except ValueError:
+ self.debug("Already matched: %s" % (uppat % node))
+ try:
+ watch.unmatched.remove(self.templates["Pat:InfraUp"] % node)
+                    except ValueError:
+ self.debug("Already matched: %s" % (self.templates["Pat:InfraUp"] % node))
+ try:
+ watch.unmatched.remove(self.templates["Pat:PacemakerUp"] % node)
+                    except ValueError:
+ self.debug("Already matched: %s" % (self.templates["Pat:PacemakerUp"] % node))
+
+ if watch.unmatched:
+ for regex in watch.unmatched:
+ self.logger.log ("Warn: Startup pattern not found: %s" %(regex))
+
+ if not self.CM.cluster_stable():
+ return self.failure("Cluster did not stabilize")
+
+ did_fail = 0
+ unstable = []
+ for node in self.Env["nodes"]:
+ if self.CM.StataCM(node) == 0:
+ did_fail = 1
+ unstable.append(node)
+
+ if did_fail:
+ return self.failure("Unstarted nodes exist: " + repr(unstable))
+
+ unstable = []
+ for node in self.Env["nodes"]:
+ if not self.CM.node_stable(node):
+ did_fail = 1
+ unstable.append(node)
+
+ if did_fail:
+ return self.failure("Unstable cluster nodes exist: " + repr(unstable))
+
+ return self.success()
+
+ def is_applicable(self):
+ '''SimulStartLite is a setup test and never applicable'''
+ return False
+
+
+def TestList(cm, audits):
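+    """ Return an instance of every registered test class that reports
+        itself applicable to this cluster manager, with the given audits
+        attached to each. """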
+ result = []
+ for testclass in AllTestClasses:
+ bound_test = testclass(cm)
+ if bound_test.is_applicable():
+ bound_test.Audits = audits
+ result.append(bound_test)
+ return result
+
+
+class RemoteLXC(CTSTest):
+ def __init__(self, cm):
+ CTSTest.__init__(self,cm)
+ self.name = "RemoteLXC"
+ self.start = StartTest(cm)
+ self.startall = SimulStartLite(cm)
+ self.num_containers = 2
+ self.is_container = 1
+ self.failed = 0
+ self.fail_string = ""
+
+ def start_lxc_simple(self, node):
+
+ # restore any artifacts laying around from a previous test.
+ self.rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -s -R &>/dev/null")
+
+ # generate the containers, put them in the config, add some resources to them
+ pats = [ ]
+ watch = self.create_watch(pats, 120)
+ watch.set_watch()
+ pats.append(self.templates["Pat:RscOpOK"] % ("start", "lxc1"))
+ pats.append(self.templates["Pat:RscOpOK"] % ("start", "lxc2"))
+ pats.append(self.templates["Pat:RscOpOK"] % ("start", "lxc-ms"))
+ pats.append(self.templates["Pat:RscOpOK"] % ("promote", "lxc-ms"))
+
+ self.rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -g -a -m -s -c %d &>/dev/null" % self.num_containers)
+ self.set_timer("remoteSimpleInit")
+ watch.look_for_all()
+ self.log_timer("remoteSimpleInit")
+ if watch.unmatched:
+ self.fail_string = "Unmatched patterns: %s" % (repr(watch.unmatched))
+ self.failed = 1
+
+ def cleanup_lxc_simple(self, node):
+
+ pats = [ ]
+ # if the test failed, attempt to clean up the cib and libvirt environment
+ # as best as possible
+ if self.failed == 1:
+ # restore libvirt and cib
+ self.rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -s -R &>/dev/null")
+ return
+
+ watch = self.create_watch(pats, 120)
+ watch.set_watch()
+
+ pats.append(self.templates["Pat:RscOpOK"] % ("stop", "container1"))
+ pats.append(self.templates["Pat:RscOpOK"] % ("stop", "container2"))
+
+ self.rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -p &>/dev/null")
+ self.set_timer("remoteSimpleCleanup")
+ watch.look_for_all()
+ self.log_timer("remoteSimpleCleanup")
+
+ if watch.unmatched:
+ self.fail_string = "Unmatched patterns: %s" % (repr(watch.unmatched))
+ self.failed = 1
+
+ # cleanup libvirt
+ self.rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -s -R &>/dev/null")
+
+ def __call__(self, node):
+ '''Perform the 'RemoteLXC' test. '''
+ self.incr("calls")
+
+ ret = self.startall(None)
+ if not ret:
+ return self.failure("Setup failed, start all nodes failed.")
+
+ (rc, _) = self.rsh(node, "/usr/share/pacemaker/tests/cts/lxc_autogen.sh -v &>/dev/null")
+ if rc == 1:
+ self.log("Environment test for lxc support failed.")
+ return self.skipped()
+
+ self.start_lxc_simple(node)
+ self.cleanup_lxc_simple(node)
+
+ self.debug("Waiting for the cluster to recover")
+ self.CM.cluster_stable()
+
+ if self.failed == 1:
+ return self.failure(self.fail_string)
+
+ return self.success()
+
+ def errorstoignore(self):
+ '''Return list of errors which should be ignored'''
+ return [
+ r"Updating failcount for ping",
+ r"schedulerd.*: Recover\s+(ping|lxc-ms|container)\s+\(.*\)",
+ # The orphaned lxc-ms resource causes an expected transition error
+ # that is a result of the scheduler not having knowledge that the
+ # promotable resource used to be a clone. As a result, it looks like that
+ # resource is running in multiple locations when it shouldn't... But in
+ # this instance we know why this error is occurring and that it is expected.
+ r"Calculated [Tt]ransition .*pe-error",
+ r"Resource lxc-ms .* is active on 2 nodes attempting recovery",
+ r"Unknown operation: fail",
+ r"VirtualDomain.*ERROR: Unable to determine emulator",
+ ]
+
+AllTestClasses.append(RemoteLXC)
+
+
+class RemoteDriver(CTSTest):
+
+ def __init__(self, cm):
+ CTSTest.__init__(self,cm)
+ self.name = self.__class__.__name__
+ self.start = StartTest(cm)
+ self.startall = SimulStartLite(cm)
+ self.stop = StopTest(cm)
+ self.remote_rsc = "remote-rsc"
+ self.cib_cmd = """cibadmin -C -o %s -X '%s' """
+ self.reset()
+
+ def reset(self):
+ self.pcmk_started = 0
+ self.failed = False
+ self.fail_string = ""
+ self.remote_node_added = 0
+ self.remote_rsc_added = 0
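+        # Randomly exercise both flavors of the connection resource: with
+        # and without a reconnect_interval (see add_connection_rsc()).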
+ self.remote_use_reconnect_interval = self.Env.random_gen.choice([True,False])
+
+ def fail(self, msg):
+ """ Mark test as failed. """
+
+ self.failed = True
+
+ # Always log the failure.
+ self.logger.log(msg)
+
+ # Use first failure as test status, as it's likely to be most useful.
+ if not self.fail_string:
+ self.fail_string = msg
+
+ def get_othernode(self, node):
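+        """ Return the first cluster node other than the given one, so that
+            commands are run against a CIB that is not about to go away.
+            (Implicitly returns None if no other node exists.) """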
+ for othernode in self.Env["nodes"]:
+ if othernode == node:
+ # we don't want to try and use the cib that we just shutdown.
+ # find a cluster node that is not our soon to be remote-node.
+ continue
+ else:
+ return othernode
+
+ def del_rsc(self, node, rsc):
+ othernode = self.get_othernode(node)
+ (rc, _) = self.rsh(othernode, "crm_resource -D -r %s -t primitive" % (rsc))
+ if rc != 0:
+ self.fail("Removal of resource '%s' failed" % rsc)
+
+ def add_rsc(self, node, rsc_xml):
+ othernode = self.get_othernode(node)
+ (rc, _) = self.rsh(othernode, self.cib_cmd % ("resources", rsc_xml))
+ if rc != 0:
+ self.fail("resource creation failed")
+
+ def add_primitive_rsc(self, node):
+ rsc_xml = """
+<primitive class="ocf" id="%(node)s" provider="heartbeat" type="Dummy">
+ <meta_attributes id="%(node)s-meta_attributes"/>
+ <operations>
+ <op id="%(node)s-monitor-interval-20s" interval="20s" name="monitor"/>
+ </operations>
+</primitive>""" % { "node": self.remote_rsc }
+ self.add_rsc(node, rsc_xml)
+ if not self.failed:
+ self.remote_rsc_added = 1
+
+ def add_connection_rsc(self, node):
+ rsc_xml = """
+<primitive class="ocf" id="%(node)s" provider="pacemaker" type="remote">
+ <instance_attributes id="%(node)s-instance_attributes">
+ <nvpair id="%(node)s-instance_attributes-server" name="server" value="%(server)s"/>
+""" % { "node": self.remote_node, "server": node }
+
+ if self.remote_use_reconnect_interval:
+ # Set reconnect interval on resource
+ rsc_xml = rsc_xml + """
+ <nvpair id="%s-instance_attributes-reconnect_interval" name="reconnect_interval" value="60s"/>
+""" % (self.remote_node)
+
+ rsc_xml = rsc_xml + """
+ </instance_attributes>
+ <operations>
+ <op id="%(node)s-start" name="start" interval="0" timeout="120s"/>
+ <op id="%(node)s-monitor-20s" name="monitor" interval="20s" timeout="45s"/>
+ </operations>
+</primitive>
+""" % { "node": self.remote_node }
+
+ self.add_rsc(node, rsc_xml)
+ if not self.failed:
+ self.remote_node_added = 1
+
+ def disable_services(self, node):
+ self.corosync_enabled = self.Env.service_is_enabled(node, "corosync")
+ if self.corosync_enabled:
+ self.Env.disable_service(node, "corosync")
+
+ self.pacemaker_enabled = self.Env.service_is_enabled(node, "pacemaker")
+ if self.pacemaker_enabled:
+ self.Env.disable_service(node, "pacemaker")
+
+ def restore_services(self, node):
+ if self.corosync_enabled:
+ self.Env.enable_service(node, "corosync")
+
+ if self.pacemaker_enabled:
+ self.Env.enable_service(node, "pacemaker")
+
+ def stop_pcmk_remote(self, node):
+ # disable pcmk remote
+        for _ in range(10):
+ (rc, _) = self.rsh(node, "service pacemaker_remote stop")
+ if rc != 0:
+ time.sleep(6)
+ else:
+ break
+
+ def start_pcmk_remote(self, node):
+        for _ in range(10):
+ (rc, _) = self.rsh(node, "service pacemaker_remote start")
+ if rc != 0:
+ time.sleep(6)
+ else:
+ self.pcmk_started = 1
+ break
+
+ def freeze_pcmk_remote(self, node):
+ """ Simulate a Pacemaker Remote daemon failure. """
+
+ # We freeze the process.
+ self.rsh(node, "killall -STOP pacemaker-remoted")
+
+ def resume_pcmk_remote(self, node):
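+        """ Resume a Pacemaker Remote daemon previously frozen by
+            freeze_pcmk_remote(). """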
+ # We resume the process.
+ self.rsh(node, "killall -CONT pacemaker-remoted")
+
+ def start_metal(self, node):
+        # Cluster nodes are reused as remote nodes in remote tests. If cluster
+        # services were enabled at boot and the remote node later got fenced,
+        # the rebooted node would rejoin as a cluster node instead of the
+        # expected remote node, and pacemaker_remote would fail to start. The
+        # test could then no longer be orchestrated gracefully.
+        #
+        # Temporarily disable any enabled cluster services.
+ self.disable_services(node)
+
+ # make sure the resource doesn't already exist for some reason
+ self.rsh(node, "crm_resource -D -r %s -t primitive" % (self.remote_rsc))
+ self.rsh(node, "crm_resource -D -r %s -t primitive" % (self.remote_node))
+
+ if not self.stop(node):
+ self.fail("Failed to shutdown cluster node %s" % node)
+ return
+
+ self.start_pcmk_remote(node)
+
+ if self.pcmk_started == 0:
+ self.fail("Failed to start pacemaker_remote on node %s" % node)
+ return
+
+ # Convert node to baremetal now that it has shutdown the cluster stack
+ pats = [ ]
+ watch = self.create_watch(pats, 120)
+ watch.set_watch()
+ pats.append(self.templates["Pat:RscOpOK"] % ("start", self.remote_node))
+ pats.append(self.templates["Pat:DC_IDLE"])
+
+ self.add_connection_rsc(node)
+
+ self.set_timer("remoteMetalInit")
+ watch.look_for_all()
+ self.log_timer("remoteMetalInit")
+ if watch.unmatched:
+ self.fail("Unmatched patterns: %s" % watch.unmatched)
+
+ def migrate_connection(self, node):
+ if self.failed:
+ return
+
+ pats = [ ]
+ pats.append(self.templates["Pat:RscOpOK"] % ("migrate_to", self.remote_node))
+ pats.append(self.templates["Pat:RscOpOK"] % ("migrate_from", self.remote_node))
+ pats.append(self.templates["Pat:DC_IDLE"])
+ watch = self.create_watch(pats, 120)
+ watch.set_watch()
+
+ (rc, _) = self.rsh(node, "crm_resource -M -r %s" % (self.remote_node), verbose=1)
+ if rc != 0:
+ self.fail("failed to move remote node connection resource")
+ return
+
+ self.set_timer("remoteMetalMigrate")
+ watch.look_for_all()
+ self.log_timer("remoteMetalMigrate")
+
+ if watch.unmatched:
+ self.fail("Unmatched patterns: %s" % watch.unmatched)
+ return
+
+ def fail_rsc(self, node):
+ if self.failed:
+ return
+
+ watchpats = [ ]
+ watchpats.append(self.templates["Pat:RscRemoteOpOK"] % ("stop", self.remote_rsc, self.remote_node))
+ watchpats.append(self.templates["Pat:RscRemoteOpOK"] % ("start", self.remote_rsc, self.remote_node))
+ watchpats.append(self.templates["Pat:DC_IDLE"])
+
+ watch = self.create_watch(watchpats, 120)
+ watch.set_watch()
+
+ self.debug("causing dummy rsc to fail.")
+
+ self.rsh(node, "rm -f /var/run/resource-agents/Dummy*")
+
+ self.set_timer("remoteRscFail")
+ watch.look_for_all()
+ self.log_timer("remoteRscFail")
+ if watch.unmatched:
+ self.fail("Unmatched patterns during rsc fail: %s" % watch.unmatched)
+
+ def fail_connection(self, node):
+ if self.failed:
+ return
+
+ watchpats = [ ]
+ watchpats.append(self.templates["Pat:Fencing_ok"] % self.remote_node)
+ watchpats.append(self.templates["Pat:NodeFenced"] % self.remote_node)
+
+ watch = self.create_watch(watchpats, 120)
+ watch.set_watch()
+
+ # freeze the pcmk remote daemon. this will result in fencing
+ self.debug("Force stopped active remote node")
+ self.freeze_pcmk_remote(node)
+
+ self.debug("Waiting for remote node to be fenced.")
+ self.set_timer("remoteMetalFence")
+ watch.look_for_all()
+ self.log_timer("remoteMetalFence")
+ if watch.unmatched:
+ self.fail("Unmatched patterns: %s" % watch.unmatched)
+ return
+
+ self.debug("Waiting for the remote node to come back up")
+        self.CM.ns.wait_for_node(node, 120)
+
+ pats = [ ]
+ watch = self.create_watch(pats, 240)
+ watch.set_watch()
+ pats.append(self.templates["Pat:RscOpOK"] % ("start", self.remote_node))
+ if self.remote_rsc_added == 1:
+ pats.append(self.templates["Pat:RscRemoteOpOK"] % ("start", self.remote_rsc, self.remote_node))
+
+ # start the remote node again watch it integrate back into cluster.
+ self.start_pcmk_remote(node)
+ if self.pcmk_started == 0:
+ self.fail("Failed to start pacemaker_remote on node %s" % node)
+ return
+
+ self.debug("Waiting for remote node to rejoin cluster after being fenced.")
+ self.set_timer("remoteMetalRestart")
+ watch.look_for_all()
+ self.log_timer("remoteMetalRestart")
+ if watch.unmatched:
+ self.fail("Unmatched patterns: %s" % watch.unmatched)
+ return
+
+ def add_dummy_rsc(self, node):
+ if self.failed:
+ return
+
+ # verify we can put a resource on the remote node
+ pats = [ ]
+ watch = self.create_watch(pats, 120)
+ watch.set_watch()
+ pats.append(self.templates["Pat:RscRemoteOpOK"] % ("start", self.remote_rsc, self.remote_node))
+ pats.append(self.templates["Pat:DC_IDLE"])
+
+ # Add a resource that must live on remote-node
+ self.add_primitive_rsc(node)
+
+ # force that rsc to prefer the remote node.
+ (rc, _) = self.CM.rsh(node, "crm_resource -M -r %s -N %s -f" % (self.remote_rsc, self.remote_node), verbose=1)
+ if rc != 0:
+ self.fail("Failed to place remote resource on remote node.")
+ return
+
+ self.set_timer("remoteMetalRsc")
+ watch.look_for_all()
+ self.log_timer("remoteMetalRsc")
+ if watch.unmatched:
+ self.fail("Unmatched patterns: %s" % watch.unmatched)
+
+ def test_attributes(self, node):
+ if self.failed:
+ return
+
+ # This verifies permanent attributes can be set on a remote-node. It also
+ # verifies the remote-node can edit its own cib node section remotely.
+ (rc, line) = self.CM.rsh(node, "crm_attribute -l forever -n testattr -v testval -N %s" % (self.remote_node), verbose=1)
+ if rc != 0:
+ self.fail("Failed to set remote-node attribute. rc:%s output:%s" % (rc, line))
+ return
+
+ (rc, _) = self.CM.rsh(node, "crm_attribute -l forever -n testattr -q -N %s" % (self.remote_node), verbose=1)
+ if rc != 0:
+ self.fail("Failed to get remote-node attribute")
+ return
+
+ (rc, _) = self.CM.rsh(node, "crm_attribute -l forever -n testattr -D -N %s" % (self.remote_node), verbose=1)
+ if rc != 0:
+ self.fail("Failed to delete remote-node attribute")
+ return
+
+ def cleanup_metal(self, node):
+ self.restore_services(node)
+
+ if self.pcmk_started == 0:
+ return
+
+ pats = [ ]
+
+ watch = self.create_watch(pats, 120)
+ watch.set_watch()
+
+ if self.remote_rsc_added == 1:
+ pats.append(self.templates["Pat:RscOpOK"] % ("stop", self.remote_rsc))
+ if self.remote_node_added == 1:
+ pats.append(self.templates["Pat:RscOpOK"] % ("stop", self.remote_node))
+
+ self.set_timer("remoteMetalCleanup")
+
+ self.resume_pcmk_remote(node)
+
+ if self.remote_rsc_added == 1:
+
+ # Remove dummy resource added for remote node tests
+ self.debug("Cleaning up dummy rsc put on remote node")
+ self.rsh(self.get_othernode(node), "crm_resource -U -r %s" % self.remote_rsc)
+ self.del_rsc(node, self.remote_rsc)
+
+ if self.remote_node_added == 1:
+
+ # Remove remote node's connection resource
+ self.debug("Cleaning up remote node connection resource")
+ self.rsh(self.get_othernode(node), "crm_resource -U -r %s" % (self.remote_node))
+ self.del_rsc(node, self.remote_node)
+
+ watch.look_for_all()
+ self.log_timer("remoteMetalCleanup")
+
+ if watch.unmatched:
+ self.fail("Unmatched patterns: %s" % watch.unmatched)
+
+ self.stop_pcmk_remote(node)
+
+ self.debug("Waiting for the cluster to recover")
+ self.CM.cluster_stable()
+
+ if self.remote_node_added == 1:
+ # Remove remote node itself
+ self.debug("Cleaning up node entry for remote node")
+ self.rsh(self.get_othernode(node), "crm_node --force --remove %s" % self.remote_node)
+
+ def setup_env(self, node):
+
+ self.remote_node = "remote-%s" % (node)
+
+ # we are assuming if all nodes have a key, that it is
+ # the right key... If any node doesn't have a remote
+ # key, we regenerate it everywhere.
+ if self.rsh.exists_on_all("/etc/pacemaker/authkey", self.Env["nodes"]):
+ return
+
+ # create key locally
+ (handle, keyfile) = tempfile.mkstemp(".cts")
+ os.close(handle)
+ subprocess.check_call(["dd", "if=/dev/urandom", "of=%s" % keyfile, "bs=4096", "count=1"],
+ stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+
+ # sync key throughout the cluster
+ for node in self.Env["nodes"]:
+ self.rsh(node, "mkdir -p --mode=0750 /etc/pacemaker")
+ self.rsh.copy(keyfile, "root@%s:/etc/pacemaker/authkey" % node)
+ self.rsh(node, "chgrp haclient /etc/pacemaker /etc/pacemaker/authkey")
+ self.rsh(node, "chmod 0640 /etc/pacemaker/authkey")
+ os.unlink(keyfile)
+
+ def is_applicable(self):
+ if not self.is_applicable_common():
+ return False
+
+ for node in self.Env["nodes"]:
+ (rc, _) = self.rsh(node, "which pacemaker-remoted >/dev/null 2>&1")
+ if rc != 0:
+ return False
+ return True
+
+ def start_new_test(self, node):
+ self.incr("calls")
+ self.reset()
+
+ ret = self.startall(None)
+ if not ret:
+ return self.failure("setup failed: could not start all nodes")
+
+ self.setup_env(node)
+ self.start_metal(node)
+ self.add_dummy_rsc(node)
+ return True
+
+ def __call__(self, node):
+ return self.failure("This base class is not meant to be called directly.")
+
+ def errorstoignore(self):
+ '''Return list of errors which should be ignored'''
+ return [ r"""is running on remote.*which isn't allowed""",
+ r"""Connection terminated""",
+ r"""Could not send remote""",
+ ]
+
+# RemoteDriver is just a base class for other tests, so it is not added to AllTestClasses
+
+
+class RemoteBasic(RemoteDriver):
+
+ def __call__(self, node):
+        '''Perform the 'RemoteBasic' test. '''
+
+ if not self.start_new_test(node):
+ return self.failure(self.fail_string)
+
+ self.test_attributes(node)
+ self.cleanup_metal(node)
+
+ self.debug("Waiting for the cluster to recover")
+ self.CM.cluster_stable()
+ if self.failed:
+ return self.failure(self.fail_string)
+
+ return self.success()
+
+AllTestClasses.append(RemoteBasic)
+
+class RemoteStonithd(RemoteDriver):
+
+ def __call__(self, node):
+ '''Perform the 'RemoteStonithd' test. '''
+
+ if not self.start_new_test(node):
+ return self.failure(self.fail_string)
+
+ self.fail_connection(node)
+ self.cleanup_metal(node)
+
+ self.debug("Waiting for the cluster to recover")
+ self.CM.cluster_stable()
+ if self.failed:
+ return self.failure(self.fail_string)
+
+ return self.success()
+
+ def is_applicable(self):
+ if not RemoteDriver.is_applicable(self):
+ return False
+
+ if "DoFencing" in list(self.Env.keys()):
+ return self.Env["DoFencing"]
+
+ return True
+
+ def errorstoignore(self):
+ ignore_pats = [
+ r"Lost connection to Pacemaker Remote node",
+ r"Software caused connection abort",
+ r"pacemaker-controld.*:\s+error.*: Operation remote-.*_monitor",
+ r"pacemaker-controld.*:\s+error.*: Result of monitor operation for remote-.*",
+ r"schedulerd.*:\s+Recover\s+remote-.*\s+\(.*\)",
+ r"error: Result of monitor operation for .* on remote-.*: Internal communication failure",
+ ]
+
+ ignore_pats.extend(RemoteDriver.errorstoignore(self))
+ return ignore_pats
+
+AllTestClasses.append(RemoteStonithd)
+
+
+class RemoteMigrate(RemoteDriver):
+
+ def __call__(self, node):
+ '''Perform the 'RemoteMigrate' test. '''
+
+ if not self.start_new_test(node):
+ return self.failure(self.fail_string)
+
+ self.migrate_connection(node)
+ self.cleanup_metal(node)
+
+ self.debug("Waiting for the cluster to recover")
+ self.CM.cluster_stable()
+ if self.failed:
+ return self.failure(self.fail_string)
+
+ return self.success()
+
+ def is_applicable(self):
+        if not RemoteDriver.is_applicable(self):
+            return False
+        # This test requires at least three nodes: one to convert to a
+        # remote node, one to host the connection originally, and one
+        # to migrate the connection to.
+        if len(self.Env["nodes"]) < 3:
+            return False
+        return True
+
+AllTestClasses.append(RemoteMigrate)
+
+
+class RemoteRscFailure(RemoteDriver):
+
+ def __call__(self, node):
+ '''Perform the 'RemoteRscFailure' test. '''
+
+ if not self.start_new_test(node):
+ return self.failure(self.fail_string)
+
+ # This is an important step. We are migrating the connection
+ # before failing the resource. This verifies that the migration
+ # has properly maintained control over the remote-node.
+ self.migrate_connection(node)
+
+ self.fail_rsc(node)
+ self.cleanup_metal(node)
+
+ self.debug("Waiting for the cluster to recover")
+ self.CM.cluster_stable()
+ if self.failed:
+ return self.failure(self.fail_string)
+
+ return self.success()
+
+ def errorstoignore(self):
+ ignore_pats = [
+ r"schedulerd.*: Recover\s+remote-rsc\s+\(.*\)",
+ r"Dummy.*: No process state file found",
+ ]
+
+ ignore_pats.extend(RemoteDriver.errorstoignore(self))
+ return ignore_pats
+
+ def is_applicable(self):
+        if not RemoteDriver.is_applicable(self):
+            return False
+        # This test requires at least three nodes: one to convert to a
+        # remote node, one to host the connection originally, and one
+        # to migrate the connection to.
+        if len(self.Env["nodes"]) < 3:
+            return False
+        return True
+
+AllTestClasses.append(RemoteRscFailure)
+
+# vim:ts=4:sw=4:et:
diff --git a/cts/lab/ClusterManager.py b/cts/lab/ClusterManager.py
new file mode 100644
index 0000000..fda4cfb
--- /dev/null
+++ b/cts/lab/ClusterManager.py
@@ -0,0 +1,940 @@
+""" ClusterManager class for Pacemaker's Cluster Test Suite (CTS)
+"""
+
+__copyright__ = """Copyright 2000-2023 the Pacemaker project contributors.
+Certain portions by Huang Zhen <zhenhltc@cn.ibm.com> are copyright 2004
+International Business Machines. The version control history for this file
+may have further details."""
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+import os
+import re
+import time
+
+from collections import UserDict
+
+from cts.CIB import ConfigFactory
+from cts.CTStests import AuditResource
+
+from pacemaker.buildoptions import BuildOptions
+from pacemaker._cts.CTS import NodeStatus, Process
+from pacemaker._cts.environment import EnvFactory
+from pacemaker._cts.logging import LogFactory
+from pacemaker._cts.patterns import PatternSelector
+from pacemaker._cts.remote import RemoteFactory
+from pacemaker._cts.watcher import LogWatcher
+
+class ClusterManager(UserDict):
+ '''The Cluster Manager class.
+    This is a subclass of the Python dictionary class.
+    (This is because it contains lots of {name,value} pairs,
+    not because its behavior is terribly similar to a
+    dictionary in other ways.)
+
+    This is an abstract class that implements high-level
+    operations on the cluster and/or its cluster managers.
+    Actual cluster manager classes are subclassed from this type.
+
+ One of the things we do is track the state we think every node should
+ be in.
+ '''
+
+ def __InitialConditions(self):
+ #if os.geteuid() != 0:
+ # raise ValueError("Must Be Root!")
+        pass
+
+ def _finalConditions(self):
+ for key in list(self.keys()):
+            if self[key] is None:
+ raise ValueError("Improper derivation: self[" + key + "] must be overridden by subclass.")
+
+ def __init__(self):
+ self.Env = EnvFactory().getInstance()
+ self.templates = PatternSelector(self.Env["Name"])
+ self.__InitialConditions()
+ self.logger = LogFactory()
+ self.TestLoggingLevel=0
+ self.data = {}
+ self.name = self.Env["Name"]
+
+ self.rsh = RemoteFactory().getInstance()
+ self.ShouldBeStatus={}
+ self.ns = NodeStatus(self.Env)
+ self.OurNode = os.uname()[1].lower()
+ self.__instance_errorstoignore = []
+
+ self.cib_installed = 0
+ self.config = None
+ self.cluster_monitor = 0
+ self.use_short_names = 1
+
+ if self.Env["DoBSC"]:
+ del self.templates["Pat:They_stopped"]
+
+ self._finalConditions()
+
+ self.check_transitions = 0
+ self.check_elections = 0
+ self.CIBsync = {}
+ self.CibFactory = ConfigFactory(self)
+ self.cib = self.CibFactory.createConfig(self.Env["Schema"])
+
+ def __getitem__(self, key):
+ if key == "Name":
+ return self.name
+
+ print("FIXME: Getting %s from %s" % (key, repr(self)))
+ if key in self.data:
+ return self.data[key]
+
+ return self.templates.get_patterns(key)
+
+ def __setitem__(self, key, value):
+ print("FIXME: Setting %s=%s on %s" % (key, value, repr(self)))
+ self.data[key] = value
+
+ def key_for_node(self, node):
+ return node
+
+ def instance_errorstoignore_clear(self):
+ '''Allows the test scenario to reset instance errors to ignore on each iteration.'''
+ self.__instance_errorstoignore = []
+
+ def instance_errorstoignore(self):
+ '''Return list of errors which are 'normal' for a specific test instance'''
+ return self.__instance_errorstoignore
+
+ def log(self, args):
+ self.logger.log(args)
+
+ def debug(self, args):
+ self.logger.debug(args)
+
+ def upcount(self):
+ '''How many nodes are up?'''
+ count = 0
+ for node in self.Env["nodes"]:
+ if self.ShouldBeStatus[node] == "up":
+ count = count + 1
+ return count
+
+ def install_support(self, command="install"):
+ for node in self.Env["nodes"]:
+ self.rsh(node, BuildOptions.DAEMON_DIR + "/cts-support " + command)
+
+ def prepare_fencing_watcher(self, name):
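+        """ If the cluster currently lacks quorum, starting a node may grant
+            it, and with it permission to fence peers that have not rejoined.
+            In that case, return a primed LogWatcher for fencing messages
+            (to be handed to fencing_cleanup() afterwards); otherwise return
+            None. """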
+ # If we don't have quorum now but get it as a result of starting this node,
+ # then a bunch of nodes might get fenced
+ upnode = None
+ if self.HasQuorum(None):
+ self.debug("Have quorum")
+ return None
+
+ if not self.templates["Pat:Fencing_start"]:
+ print("No start pattern")
+ return None
+
+ if not self.templates["Pat:Fencing_ok"]:
+ print("No ok pattern")
+ return None
+
+ stonith = None
+ stonithPats = []
+ for peer in self.Env["nodes"]:
+ if self.ShouldBeStatus[peer] != "up":
+ stonithPats.append(self.templates["Pat:Fencing_ok"] % peer)
+ stonithPats.append(self.templates["Pat:Fencing_start"] % peer)
+
+ stonith = LogWatcher(self.Env["LogFileName"], stonithPats, self.Env["nodes"], self.Env["LogWatcher"], "StartupFencing", 0)
+ stonith.set_watch()
+ return stonith
+
+ def fencing_cleanup(self, node, stonith):
+ peer_list = []
+ peer_state = {}
+
+ self.debug("Looking for nodes that were fenced as a result of %s starting" % node)
+
+ # If we just started a node, we may now have quorum (and permission to fence)
+ if not stonith:
+ self.debug("Nothing to do")
+ return peer_list
+
+ q = self.HasQuorum(None)
+ if not q and len(self.Env["nodes"]) > 2:
+ # We didn't gain quorum - we shouldn't have shot anyone
+ self.debug("Quorum: %d Len: %d" % (q, len(self.Env["nodes"])))
+ return peer_list
+
+ for n in self.Env["nodes"]:
+ peer_state[n] = "unknown"
+
+ # Now see if any states need to be updated
+ self.debug("looking for: " + repr(stonith.regexes))
+ shot = stonith.look(0)
+ while shot:
+ line = repr(shot)
+ self.debug("Found: " + line)
+ del stonith.regexes[stonith.whichmatch]
+
+            peer = None
+
+            # Extract node name
+ for n in self.Env["nodes"]:
+ if re.search(self.templates["Pat:Fencing_ok"] % n, shot):
+ peer = n
+ peer_state[peer] = "complete"
+ self.__instance_errorstoignore.append(self.templates["Pat:Fencing_ok"] % peer)
+
+ elif peer_state[n] != "complete" and re.search(self.templates["Pat:Fencing_start"] % n, shot):
+ # TODO: Correctly detect multiple fencing operations for the same host
+ peer = n
+ peer_state[peer] = "in-progress"
+ self.__instance_errorstoignore.append(self.templates["Pat:Fencing_start"] % peer)
+
+ if not peer:
+ self.logger.log("ERROR: Unknown stonith match: %s" % line)
+
+            elif peer not in peer_list:
+ self.debug("Found peer: " + peer)
+ peer_list.append(peer)
+
+ # Get the next one
+ shot = stonith.look(60)
+
+ for peer in peer_list:
+
+ self.debug(" Peer %s was fenced as a result of %s starting: %s" % (peer, node, peer_state[peer]))
+ if self.Env["at-boot"]:
+ self.ShouldBeStatus[peer] = "up"
+ else:
+ self.ShouldBeStatus[peer] = "down"
+
+ if peer_state[peer] == "in-progress":
+ # Wait for any in-progress operations to complete
+ shot = stonith.look(60)
+ while len(stonith.regexes) and shot:
+ line = repr(shot)
+ self.debug("Found: " + line)
+ del stonith.regexes[stonith.whichmatch]
+ shot = stonith.look(60)
+
+ # Now make sure the node is alive too
+ self.ns.wait_for_node(peer, self.Env["DeadTime"])
+
+ # Poll until it comes up
+ if self.Env["at-boot"]:
+ if not self.StataCM(peer):
+ time.sleep(self.Env["StartTime"])
+
+ if not self.StataCM(peer):
+ self.logger.log("ERROR: Peer %s failed to restart after being fenced" % peer)
+ return None
+
+ return peer_list
+
+ def StartaCM(self, node, verbose=False):
+
+ '''Start up the cluster manager on a given node'''
+ if verbose: self.logger.log("Starting %s on node %s" % (self.templates["Name"], node))
+ else: self.debug("Starting %s on node %s" % (self.templates["Name"], node))
+
+ if not node in self.ShouldBeStatus:
+ self.ShouldBeStatus[node] = "down"
+
+ if self.ShouldBeStatus[node] != "down":
+ return 1
+
+ patterns = []
+ # Technically we should always be able to notice ourselves starting
+ patterns.append(self.templates["Pat:Local_started"] % node)
+ if self.upcount() == 0:
+ patterns.append(self.templates["Pat:DC_started"] % node)
+ else:
+ patterns.append(self.templates["Pat:NonDC_started"] % node)
+
+ watch = LogWatcher(
+ self.Env["LogFileName"], patterns, self.Env["nodes"], self.Env["LogWatcher"], "StartaCM", self.Env["StartTime"]+10)
+
+ self.install_config(node)
+
+ self.ShouldBeStatus[node] = "any"
+ if self.StataCM(node) and self.cluster_stable(self.Env["DeadTime"]):
+ self.logger.log ("%s was already started" % (node))
+ return 1
+
+ stonith = self.prepare_fencing_watcher(node)
+ watch.set_watch()
+
+ (rc, _) = self.rsh(node, self.templates["StartCmd"])
+ if rc != 0:
+ self.logger.log ("Warn: Start command failed on node %s" % (node))
+ self.fencing_cleanup(node, stonith)
+ return None
+
+ self.ShouldBeStatus[node] = "up"
+ watch_result = watch.look_for_all()
+
+ if watch.unmatched:
+ for regex in watch.unmatched:
+ self.logger.log ("Warn: Startup pattern not found: %s" % (regex))
+
+ if watch_result and self.cluster_stable(self.Env["DeadTime"]):
+ #self.debug("Found match: "+ repr(watch_result))
+ self.fencing_cleanup(node, stonith)
+ return 1
+
+ elif self.StataCM(node) and self.cluster_stable(self.Env["DeadTime"]):
+ self.fencing_cleanup(node, stonith)
+ return 1
+
+ self.logger.log ("Warn: Start failed for node %s" % (node))
+ return None
+
+ def StartaCMnoBlock(self, node, verbose=False):
+
+        '''Start up the cluster manager on a given node in non-blocking mode'''
+
+ if verbose: self.logger.log("Starting %s on node %s" % (self["Name"], node))
+ else: self.debug("Starting %s on node %s" % (self["Name"], node))
+
+ self.install_config(node)
+ self.rsh(node, self.templates["StartCmd"], synchronous=False)
+ self.ShouldBeStatus[node] = "up"
+ return 1
+
+ def StopaCM(self, node, verbose=False, force=False):
+
+ '''Stop the cluster manager on a given node'''
+
+ if verbose: self.logger.log("Stopping %s on node %s" % (self["Name"], node))
+ else: self.debug("Stopping %s on node %s" % (self["Name"], node))
+
+ if self.ShouldBeStatus[node] != "up" and force == False:
+ return 1
+
+ (rc, _) = self.rsh(node, self.templates["StopCmd"])
+ if rc == 0:
+ # Make sure we can continue even if corosync leaks
+ # fdata-* is the old name
+ #self.rsh(node, "rm -rf /dev/shm/qb-* /dev/shm/fdata-*")
+ self.ShouldBeStatus[node] = "down"
+ self.cluster_stable(self.Env["DeadTime"])
+ return 1
+ else:
+ self.logger.log ("ERROR: Could not stop %s on node %s" % (self["Name"], node))
+
+ return None
+
+ def StopaCMnoBlock(self, node):
+
+        '''Stop the cluster manager on a given node in non-blocking mode'''
+
+ self.debug("Stopping %s on node %s" % (self["Name"], node))
+
+ self.rsh(node, self.templates["StopCmd"], synchronous=False)
+ self.ShouldBeStatus[node] = "down"
+ return 1
+
+ def RereadCM(self, node):
+
+ '''Force the cluster manager on a given node to reread its config
+ This may be a no-op on certain cluster managers.
+ '''
+ (rc, _) = self.rsh(node, self.templates["RereadCmd"])
+ if rc == 0:
+ return 1
+ else:
+ self.logger.log ("Could not force %s on node %s to reread its config"
+ % (self["Name"], node))
+ return None
+
+ def startall(self, nodelist=None, verbose=False, quick=False):
+
+ '''Start the cluster manager on every node in the cluster.
+ We can do it on a subset of the cluster if nodelist is not None.
+ '''
+ if not nodelist:
+ nodelist = self.Env["nodes"]
+
+ for node in nodelist:
+ if self.ShouldBeStatus[node] == "down":
+ self.ns.wait_for_all_nodes(nodelist, 300)
+
+ if not quick:
+ # This is used for "basic sanity checks", so only start one node ...
+ if not self.StartaCM(node, verbose=verbose):
+ return 0
+ return 1
+
+ # Approximation of SimulStartList for --boot
+ watchpats = [ ]
+ watchpats.append(self.templates["Pat:DC_IDLE"])
+ for node in nodelist:
+ watchpats.append(self.templates["Pat:InfraUp"] % node)
+ watchpats.append(self.templates["Pat:PacemakerUp"] % node)
+ watchpats.append(self.templates["Pat:Local_started"] % node)
+ watchpats.append(self.templates["Pat:They_up"] % (nodelist[0], node))
+
+ # Start all the nodes - at about the same time...
+ watch = LogWatcher(self.Env["LogFileName"], watchpats, self.Env["nodes"], self.Env["LogWatcher"], "fast-start", self.Env["DeadTime"]+10)
+ watch.set_watch()
+
+ if not self.StartaCM(nodelist[0], verbose=verbose):
+ return 0
+ for node in nodelist:
+ self.StartaCMnoBlock(node, verbose=verbose)
+
+ watch.look_for_all()
+ if watch.unmatched:
+ for regex in watch.unmatched:
+ self.logger.log ("Warn: Startup pattern not found: %s" % (regex))
+
+ if not self.cluster_stable():
+ self.logger.log("Cluster did not stabilize")
+ return 0
+
+ return 1
+
+ def stopall(self, nodelist=None, verbose=False, force=False):
+
+ '''Stop the cluster managers on every node in the cluster.
+ We can do it on a subset of the cluster if nodelist is not None.
+ '''
+
+ ret = 1
+ if not nodelist:
+ nodelist = self.Env["nodes"]
+ for node in self.Env["nodes"]:
+ if self.ShouldBeStatus[node] == "up" or force == True:
+ if not self.StopaCM(node, verbose=verbose, force=force):
+ ret = 0
+ return ret
+
+ def rereadall(self, nodelist=None):
+
+ '''Force the cluster managers on every node in the cluster
+ to reread their config files. We can do it on a subset of the
+ cluster if nodelist is not None.
+ '''
+
+ if not nodelist:
+ nodelist = self.Env["nodes"]
+ for node in self.Env["nodes"]:
+ if self.ShouldBeStatus[node] == "up":
+ self.RereadCM(node)
+
+ def statall(self, nodelist=None):
+
+ '''Return the status of the cluster managers in the cluster.
+ We can do it on a subset of the cluster if nodelist is not None.
+ '''
+
+ result = {}
+ if not nodelist:
+ nodelist = self.Env["nodes"]
+ for node in nodelist:
+ if self.StataCM(node):
+ result[node] = "up"
+ else:
+ result[node] = "down"
+ return result
+
+ def isolate_node(self, target, nodes=None):
+ '''isolate the communication between the nodes'''
+ if not nodes:
+ nodes = self.Env["nodes"]
+
+ for node in nodes:
+ if node != target:
+ rc = self.rsh(target, self.templates["BreakCommCmd"] % self.key_for_node(node))
+ if rc != 0:
+ self.logger.log("Could not break the communication between %s and %s: %d" % (target, node, rc))
+ return None
+ else:
+ self.debug("Communication cut between %s and %s" % (target, node))
+ return 1
+
+ def unisolate_node(self, target, nodes=None):
+ '''fix the communication between the nodes'''
+ if not nodes:
+ nodes = self.Env["nodes"]
+
+ for node in nodes:
+ if node != target:
+ restored = 0
+
+ # Limit the amount of time we have asynchronous connectivity for
+ # Restore both sides as simultaneously as possible
+ self.rsh(target, self.templates["FixCommCmd"] % self.key_for_node(node), synchronous=False)
+ self.rsh(node, self.templates["FixCommCmd"] % self.key_for_node(target), synchronous=False)
+ self.debug("Communication restored between %s and %s" % (target, node))
+
+ def oprofileStart(self, node=None):
+ if not node:
+ for n in self.Env["oprofile"]:
+ self.oprofileStart(n)
+
+ elif node in self.Env["oprofile"]:
+ self.debug("Enabling oprofile on %s" % node)
+ self.rsh(node, "opcontrol --init")
+ self.rsh(node, "opcontrol --setup --no-vmlinux --separate=lib --callgraph=20 --image=all")
+ self.rsh(node, "opcontrol --start")
+ self.rsh(node, "opcontrol --reset")
+
+ def oprofileSave(self, test, node=None):
+ if not node:
+ for n in self.Env["oprofile"]:
+ self.oprofileSave(test, n)
+
+ elif node in self.Env["oprofile"]:
+ self.rsh(node, "opcontrol --dump")
+ self.rsh(node, "opcontrol --save=cts.%d" % test)
+ # Read back with: opreport -l session:cts.0 image:<directory>/c*
+            if False:  # disabled: would only reset counters in place
+ self.rsh(node, "opcontrol --reset")
+ else:
+ self.oprofileStop(node)
+ self.oprofileStart(node)
+
+ def oprofileStop(self, node=None):
+ if not node:
+ for n in self.Env["oprofile"]:
+ self.oprofileStop(n)
+
+ elif node in self.Env["oprofile"]:
+ self.debug("Stopping oprofile on %s" % node)
+ self.rsh(node, "opcontrol --reset")
+ self.rsh(node, "opcontrol --shutdown 2>&1 > /dev/null")
+
+ def errorstoignore(self):
+ # At some point implement a more elegant solution that
+ # also produces a report at the end
+ """ Return a list of known error messages that should be ignored """
+ return self.templates.get_patterns("BadNewsIgnore")
+
+ def install_config(self, node):
+ if not self.ns.wait_for_node(node):
+ self.log("Node %s is not up." % node)
+ return None
+
+ if not node in self.CIBsync and self.Env["ClobberCIB"]:
+ self.CIBsync[node] = 1
+ self.rsh(node, "rm -f " + BuildOptions.CIB_DIR + "/cib*")
+
+ # Only install the CIB on the first node, all the other ones will pick it up from there
+ if self.cib_installed == 1:
+ return None
+
+ self.cib_installed = 1
+ if self.Env["CIBfilename"] == None:
+ self.log("Installing Generated CIB on node %s" % (node))
+ self.cib.install(node)
+
+ else:
+ self.log("Installing CIB (%s) on node %s" % (self.Env["CIBfilename"], node))
+ if self.rsh.copy(self.Env["CIBfilename"], "root@" + (self.templates["CIBfile"] % node)) != 0:
+ raise ValueError("Can not scp file to %s %d"%(node))
+
+ self.rsh(node, "chown " + BuildOptions.DAEMON_USER + " " + BuildOptions.CIB_DIR + "/cib.xml")
+
+ def prepare(self):
+ '''Finish the Initialization process. Prepare to test...'''
+
+ self.partitions_expected = 1
+ for node in self.Env["nodes"]:
+ self.ShouldBeStatus[node] = ""
+ if self.Env["experimental-tests"]:
+ self.unisolate_node(node)
+ self.StataCM(node)
+
+ def test_node_CM(self, node):
+ '''Report the status of the cluster manager on a given node'''
+
+ watchpats = [ ]
+ watchpats.append("Current ping state: (S_IDLE|S_NOT_DC)")
+ watchpats.append(self.templates["Pat:NonDC_started"] % node)
+ watchpats.append(self.templates["Pat:DC_started"] % node)
+ idle_watch = LogWatcher(self.Env["LogFileName"], watchpats, [node], self.Env["LogWatcher"], "ClusterIdle")
+ idle_watch.set_watch()
+
+ (_, out) = self.rsh(node, self.templates["StatusCmd"]%node, verbose=1)
+
+ if not out:
+ out = ""
+ else:
+ out = out[0].strip()
+
+ self.debug("Node %s status: '%s'" %(node, out))
+
+ if out.find('ok') < 0:
+ if self.ShouldBeStatus[node] == "up":
+ self.log(
+ "Node status for %s is %s but we think it should be %s"
+ % (node, "down", self.ShouldBeStatus[node]))
+ self.ShouldBeStatus[node] = "down"
+ return 0
+
+ if self.ShouldBeStatus[node] == "down":
+ self.log(
+ "Node status for %s is %s but we think it should be %s: %s"
+ % (node, "up", self.ShouldBeStatus[node], out))
+
+ self.ShouldBeStatus[node] = "up"
+
+ # check the output first - because syslog-ng loses messages
+ if out.find('S_NOT_DC') != -1:
+ # Up and stable
+ return 2
+ if out.find('S_IDLE') != -1:
+ # Up and stable
+ return 2
+
+ # fall back to syslog-ng and wait
+ if not idle_watch.look():
+ # just up
+ self.debug("Warn: Node %s is unstable: %s" % (node, out))
+ return 1
+
+ # Up and stable
+ return 2
+
+    # Is the node up, or is it down?
+    def StataCM(self, node):
+        '''Report whether the cluster manager is running on a given node'''
+
+ if self.test_node_CM(node) > 0:
+ return 1
+ return None
+
+ # Being up and being stable is not the same question...
+ def node_stable(self, node):
+        '''Report whether the cluster manager is up and stable on a given node'''
+
+ if self.test_node_CM(node) == 2:
+ return 1
+ self.log("Warn: Node %s not stable" % (node))
+ return None
+
+ def partition_stable(self, nodes, timeout=None):
+        watchpats = [
+            "Current ping state: S_IDLE",
+            self.templates["Pat:DC_IDLE"],
+        ]
+ self.debug("Waiting for cluster stability...")
+
+        if timeout is None:
+ timeout = self.Env["DeadTime"]
+
+ if len(nodes) < 3:
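+            # "nodes" is a space-separated membership string; a string this
+            # short cannot name a real partition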
+ self.debug("Cluster is inactive")
+ return 1
+
+ idle_watch = LogWatcher(self.Env["LogFileName"], watchpats, nodes.split(), self.Env["LogWatcher"], "ClusterStable", timeout)
+ idle_watch.set_watch()
+
+ for node in nodes.split():
+ # have each node dump its current state
+ self.rsh(node, self.templates["StatusCmd"] % node, verbose=1)
+
+ ret = idle_watch.look()
+ while ret:
+ self.debug(ret)
+ for node in nodes.split():
+ if re.search(node, ret):
+ return 1
+ ret = idle_watch.look()
+
+ self.debug("Warn: Partition %s not IDLE after %ds" % (repr(nodes), timeout))
+ return None
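+    # Illustrative caller pattern (assumed node names):
+    #   self.partition_stable("node1 node2 node3")
+    # returns 1 once the DC in that partition reports idle, or None if no
+    # match appears within the timeout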
+
+ def cluster_stable(self, timeout=None, double_check=False):
+ partitions = self.find_partitions()
+
+ for partition in partitions:
+ if not self.partition_stable(partition, timeout):
+ return None
+
+ if double_check:
+ # Make sure we are really stable and that all resources,
+ # including those that depend on transient node attributes,
+ # are started if they were going to be
+ time.sleep(5)
+ for partition in partitions:
+ if not self.partition_stable(partition, timeout):
+ return None
+
+ return 1
+
+ def is_node_dc(self, node, status_line=None):
+ rc = 0
+
+ if not status_line:
+ (_, out) = self.rsh(node, self.templates["StatusCmd"]%node, verbose=1)
+
+ if out:
+ status_line = out[0].strip()
+
+        # Any of these controller states is only ever reported by the DC
+        dc_states = ('S_IDLE', 'S_INTEGRATION', 'S_FINALIZE_JOIN',
+                     'S_POLICY_ENGINE', 'S_TRANSITION_ENGINE')
+        if status_line and any(status_line.find(s) != -1 for s in dc_states):
+            rc = 1
+
+ return rc
+
+ def active_resources(self, node):
+ (_, output) = self.rsh(node, "crm_resource -c", verbose=1)
+ resources = []
+ for line in output:
+ if re.search("^Resource", line):
+ tmp = AuditResource(self, line)
+ if tmp.type == "primitive" and tmp.host == node:
+ resources.append(tmp.id)
+ return resources
+
+ def ResourceLocation(self, rid):
+ ResourceNodes = []
+ for node in self.Env["nodes"]:
+ if self.ShouldBeStatus[node] == "up":
+
+ cmd = self.templates["RscRunning"] % (rid)
+ (rc, lines) = self.rsh(node, cmd)
+
+ if rc == 127:
+ self.log("Command '%s' failed. Binary or pacemaker-cts package not installed?" % cmd)
+ for line in lines:
+ self.log("Output: "+line)
+ elif rc == 0:
+ ResourceNodes.append(node)
+
+ return ResourceNodes
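+    # For example (assumed resource id), ResourceLocation("migrator") returns
+    # the up nodes on which the RscRunning probe for "migrator" exits 0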
+
+ def find_partitions(self):
+ ccm_partitions = []
+
+ for node in self.Env["nodes"]:
+ if self.ShouldBeStatus[node] == "up":
+ (_, out) = self.rsh(node, self.templates["PartitionCmd"], verbose=1)
+
+ if not out:
+ self.log("no partition details for %s" % node)
+ continue
+
+ partition = out[0].strip()
+
+ if len(partition) > 2:
+ nodes = partition.split()
+ nodes.sort()
+ partition = ' '.join(nodes)
+
+                    if partition not in ccm_partitions:
+                        self.debug("Adding partition from %s: %s" % (node, partition))
+                        ccm_partitions.append(partition)
+                    else:
+                        self.debug("Partition '%s' from %s is consistent with existing entries" % (partition, node))
+
+ else:
+ self.log("bad partition details for %s" % node)
+ else:
+ self.debug("Node %s is down... skipping" % node)
+
+ self.debug("Found partitions: %s" % repr(ccm_partitions) )
+ return ccm_partitions
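+    # Each partition is a sorted, space-separated membership string, so a
+    # split two-and-one cluster is reported as e.g. (assumed names)
+    # ["node1 node2", "node3"]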
+
+ def HasQuorum(self, node_list):
+ # If we are auditing a partition, then one side will
+ # have quorum and the other not.
+ # So the caller needs to tell us which we are checking
+ # If no value for node_list is specified... assume all nodes
+ if not node_list:
+ node_list = self.Env["nodes"]
+
+ for node in node_list:
+ if self.ShouldBeStatus[node] == "up":
+ (_, quorum) = self.rsh(node, self.templates["QuorumCmd"], verbose=1)
+ quorum = quorum[0].strip()
+
+ if quorum.find("1") != -1:
+ return 1
+ elif quorum.find("0") != -1:
+ return 0
+ else:
+ self.debug("WARN: Unexpected quorum test result from " + node + ":" + quorum)
+
+ return 0
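+    # QuorumCmd is expected to print "1" (quorate) or "0" (not quorate); any
+    # other output is logged and the next node is consulted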
+
+ def Components(self):
+ complist = []
+ common_ignore = [
+ "Pending action:",
+ "(ERROR|error): crm_log_message_adv:",
+ "(ERROR|error): MSG: No message to dump",
+ "pending LRM operations at shutdown",
+ "Lost connection to the CIB manager",
+ "Connection to the CIB terminated...",
+ "Sending message to the CIB manager FAILED",
+ "Action A_RECOVER .* not supported",
+ "(ERROR|error): stonithd_op_result_ready: not signed on",
+ "pingd.*(ERROR|error): send_update: Could not send update",
+ "send_ipc_message: IPC Channel to .* is not connected",
+ "unconfirmed_actions: Waiting on .* unconfirmed actions",
+ "cib_native_msgready: Message pending on command channel",
+ r": Performing A_EXIT_1 - forcefully exiting ",
+ r"Resource .* was active at shutdown. You may ignore this error if it is unmanaged.",
+ ]
+
+ stonith_ignore = [
+ r"Updating failcount for child_DoFencing",
+ r"error.*: Fencer connection failed \(will retry\)",
+ "pacemaker-execd.*(ERROR|error): stonithd_receive_ops_result failed.",
+ ]
+
+ stonith_ignore.extend(common_ignore)
+
+ ccm = Process(self, "ccm", pats = [
+ "State transition .* S_RECOVERY",
+ "pacemaker-controld.*Action A_RECOVER .* not supported",
+ r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover",
+ r"pacemaker-controld.*: Could not recover from internal error",
+ "pacemaker-controld.*I_ERROR.*crmd_cib_connection_destroy",
+ # these status numbers are likely wrong now
+ r"pacemaker-controld.*exited with status 2",
+ r"attrd.*exited with status 1",
+ r"cib.*exited with status 2",
+
+# Not if it was fenced
+# "A new node joined the cluster",
+
+# "WARN: determine_online_status: Node .* is unclean",
+# "Scheduling node .* for fencing",
+# "Executing .* fencing operation",
+# "tengine_stonith_callback: .*result=0",
+# "Processing I_NODE_JOIN:.* cause=C_HA_MESSAGE",
+# "State transition S_.* -> S_INTEGRATION.*input=I_NODE_JOIN",
+ "State transition S_STARTING -> S_PENDING",
+ ], badnews_ignore = common_ignore)
+
+ based = Process(self, "pacemaker-based", pats = [
+ "State transition .* S_RECOVERY",
+ "Lost connection to the CIB manager",
+ "Connection to the CIB manager terminated",
+ r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover",
+ "pacemaker-controld.*I_ERROR.*crmd_cib_connection_destroy",
+ r"pacemaker-controld.*: Could not recover from internal error",
+ # these status numbers are likely wrong now
+ r"pacemaker-controld.*exited with status 2",
+ r"attrd.*exited with status 1",
+ ], badnews_ignore = common_ignore)
+
+ execd = Process(self, "pacemaker-execd", pats = [
+ "State transition .* S_RECOVERY",
+ "LRM Connection failed",
+ "pacemaker-controld.*I_ERROR.*lrm_connection_destroy",
+ "State transition S_STARTING -> S_PENDING",
+ r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover",
+ r"pacemaker-controld.*: Could not recover from internal error",
+ # this status number is likely wrong now
+ r"pacemaker-controld.*exited with status 2",
+ ], badnews_ignore = common_ignore)
+
+ controld = Process(self, "pacemaker-controld",
+ pats = [
+# "WARN: determine_online_status: Node .* is unclean",
+# "Scheduling node .* for fencing",
+# "Executing .* fencing operation",
+# "tengine_stonith_callback: .*result=0",
+ "State transition .* S_IDLE",
+ "State transition S_STARTING -> S_PENDING",
+ ], badnews_ignore = common_ignore)
+
+ schedulerd = Process(self, "pacemaker-schedulerd", pats = [
+ "State transition .* S_RECOVERY",
+ r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover",
+ r"pacemaker-controld.*: Could not recover from internal error",
+ r"pacemaker-controld.*CRIT.*: Connection to the scheduler failed",
+ "pacemaker-controld.*I_ERROR.*save_cib_contents",
+ # this status number is likely wrong now
+ r"pacemaker-controld.*exited with status 2",
+ ], badnews_ignore = common_ignore, dc_only=True)
+
+ if self.Env["DoFencing"]:
+ complist.append(Process(self, "stoniths", dc_pats = [
+ r"pacemaker-controld.*CRIT.*: Fencing daemon connection failed",
+ "Attempting connection to fencing daemon",
+ ], badnews_ignore = stonith_ignore))
+
+ ccm.pats.extend([
+ # these status numbers are likely wrong now
+ r"attrd.*exited with status 1",
+ r"pacemaker-(based|controld).*exited with status 2",
+ ])
+ based.pats.extend([
+ # these status numbers are likely wrong now
+ r"attrd.*exited with status 1",
+ r"pacemaker-controld.*exited with status 2",
+ ])
+ execd.pats.extend([
+ # these status numbers are likely wrong now
+ r"pacemaker-controld.*exited with status 2",
+ ])
+
+ complist.append(ccm)
+ complist.append(based)
+ complist.append(execd)
+ complist.append(controld)
+ complist.append(schedulerd)
+
+ return complist
+
+ def StandbyStatus(self, node):
+ (_, out) = self.rsh(node, self.templates["StandbyQueryCmd"] % node, verbose=1)
+ if not out:
+ return "off"
+ out = out[0].strip()
+ self.debug("Standby result: "+out)
+ return out
+
+ # status == "on" : Enter Standby mode
+ # status == "off": Enter Active mode
+ def SetStandbyMode(self, node, status):
+ current_status = self.StandbyStatus(node)
+ cmd = self.templates["StandbyCmd"] % (node, status)
+ self.rsh(node, cmd)
+ return True
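+    # Illustrative use (assumed node name):
+    #   self.SetStandbyMode("node1", "on")   # put node1 into standby
+    #   self.SetStandbyMode("node1", "off")  # make it active again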
+
+ def AddDummyRsc(self, node, rid):
+ rsc_xml = """ '<resources>
+ <primitive class=\"ocf\" id=\"%s\" provider=\"pacemaker\" type=\"Dummy\">
+ <operations>
+                <op id=\"%s-interval-10s\" interval=\"10s\" name=\"monitor\"/>
+ </operations>
+ </primitive>
+ </resources>'""" % (rid, rid)
+ constraint_xml = """ '<constraints>
+ <rsc_location id=\"location-%s-%s\" node=\"%s\" rsc=\"%s\" score=\"INFINITY\"/>
+ </constraints>'
+ """ % (rid, node, node, rid)
+
+ self.rsh(node, self.templates['CibAddXml'] % (rsc_xml))
+ self.rsh(node, self.templates['CibAddXml'] % (constraint_xml))
+
+ def RemoveDummyRsc(self, node, rid):
+ constraint = "\"//rsc_location[@rsc='%s']\"" % (rid)
+ rsc = "\"//primitive[@id='%s']\"" % (rid)
+
+ self.rsh(node, self.templates['CibDelXpath'] % constraint)
+ self.rsh(node, self.templates['CibDelXpath'] % rsc)
diff --git a/cts/lab/Makefile.am b/cts/lab/Makefile.am
new file mode 100644
index 0000000..27e39b3
--- /dev/null
+++ b/cts/lab/Makefile.am
@@ -0,0 +1,31 @@
+#
+# Copyright 2001-2023 the Pacemaker project contributors
+#
+# The version control history for this file may have further details.
+#
+# This source code is licensed under the GNU General Public License version 2
+# or later (GPLv2+) WITHOUT ANY WARRANTY.
+#
+
+MAINTAINERCLEANFILES = Makefile.in
+
+noinst_SCRIPTS = cluster_test \
+ OCFIPraTest.py
+
+# Commands intended to be run only via other commands
+halibdir = $(CRM_DAEMON_DIR)
+dist_halib_SCRIPTS = cts-log-watcher
+
+ctslibdir = $(pythondir)/cts
+ctslib_PYTHON = __init__.py \
+ CIB.py \
+ cib_xml.py \
+ ClusterManager.py \
+ CM_corosync.py \
+ CTSaudits.py \
+ CTSscenarios.py \
+ CTStests.py
+
+ctsdir = $(datadir)/$(PACKAGE)/tests/cts
+cts_SCRIPTS = CTSlab.py \
+ cts
diff --git a/cts/lab/OCFIPraTest.py.in b/cts/lab/OCFIPraTest.py.in
new file mode 100644
index 0000000..2cce304
--- /dev/null
+++ b/cts/lab/OCFIPraTest.py.in
@@ -0,0 +1,173 @@
+#!@PYTHON@
+
+'''OCF IPaddr/IPaddr2 Resource Agent Test'''
+
+__copyright__ = """Original Author: Huang Zhen <zhenhltc@cn.ibm.com>
+Copyright 2004 International Business Machines
+
+with later changes copyright 2005-2023 the Pacemaker project contributors.
+The version control history for this file may have further details.
+"""
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+import os
+import sys
+import time
+import random
+import struct
+import syslog
+
+from pacemaker import BuildOptions
+
+
+def usage():
+ print("usage: " + sys.argv[0] \
+ + " [-2]"\
+ + " [--ipbase|-i first-test-ip]"\
+ + " [--ipnum|-n test-ip-num]"\
+ + " [--help|-h]"\
+ + " [--perform|-p op]"\
+ + " [number-of-iterations]")
+ sys.exit(1)
+
+
+def perform_op(ra, ip, op):
+ os.environ["OCF_RA_VERSION_MAJOR"] = "1"
+ os.environ["OCF_RA_VERSION_MINOR"] = "0"
+ os.environ["OCF_ROOT"] = BuildOptions.OCF_ROOT_DIR
+ os.environ["OCF_RESOURCE_INSTANCE"] = ip
+ os.environ["OCF_RESOURCE_TYPE"] = ra
+ os.environ["OCF_RESKEY_ip"] = ip
+ os.environ["HA_LOGFILE"] = "/dev/null"
+ os.environ["HA_LOGFACILITY"] = "local7"
+ path = BuildOptions.OCF_ROOT_DIR + "/resource.d/heartbeat/" + ra
+ return os.spawnvpe(os.P_WAIT, path, [ra, op], os.environ)
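+# For example, perform_op("IPaddr", "127.0.0.10", "monitor") returns the
+# agent's OCF exit status: 0 when the address is active, 7 when it is stopped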
+
+
+def audit(ra, iplist, ipstatus, summary):
+ passed = 1
+ for ip in iplist:
+ ret = perform_op(ra, ip, "monitor")
+ if ret != ipstatus[ip]:
+ passed = 0
+            log("audit: status of %s should be %d but it is %d\t [failure]" %
+                (ip, ipstatus[ip], ret))
+            ipstatus[ip] = ret
+    summary["audit"]["called"] += 1
+    if passed:
+        summary["audit"]["success"] += 1
+    else:
+        summary["audit"]["failure"] += 1
+
+
+def log(towrite):
+ t = time.strftime("%Y/%m/%d_%H:%M:%S\t", time.localtime(time.time()))
+ logstr = t + " "+str(towrite)
+ syslog.syslog(logstr)
+ print(logstr)
+
+if __name__ == '__main__':
+ ra = "IPaddr"
+ ipbase = "127.0.0.10"
+ ipnum = 1
+ itnum = 50
+ perform = None
+ summary = {
+ "start":{"called":0,"success":0,"failure":0},
+ "stop" :{"called":0,"success":0,"failure":0},
+ "audit":{"called":0,"success":0,"failure":0}
+ }
+ syslog.openlog(sys.argv[0], 0, syslog.LOG_LOCAL7)
+
+ # Process arguments...
+ skipthis = None
+ args = sys.argv[1:]
+ for i in range(0, len(args)) :
+ if skipthis :
+ skipthis = None
+ continue
+ elif args[i] == "-2" :
+ ra = "IPaddr2"
+        elif args[i] == "--ipbase" or args[i] == "--ip" or args[i] == "-i" :
+ skipthis = 1
+ ipbase = args[i+1]
+ elif args[i] == "--ipnum" or args[i] == "-n" :
+ skipthis = 1
+ ipnum = int(args[i+1])
+ elif args[i] == "--perform" or args[i] == "-p" :
+ skipthis = 1
+ perform = args[i+1]
+ elif args[i] == "--help" or args[i] == "-h" :
+ usage()
+ else:
+ itnum = int(args[i])
+
+ log("Begin OCF IPaddr/IPaddr2 Test")
+
+ # Generate the test ips
+ iplist = []
+ ipstatus = {}
+ fields = ipbase.split('.')
+ for i in range(0, ipnum) :
+        ip = '.'.join(fields)
+ iplist.append(ip)
+ ipstatus[ip] = perform_op(ra,ip,"monitor")
+ fields[3] = str(int(fields[3])+1)
+ log("Test ip:" + str(iplist))
+
+    # If the user asked for a specific operation, perform it and exit
+    if perform is not None:
+        log("Performing operation %s" % perform)
+ for ip in iplist:
+ perform_op(ra, ip, perform)
+ log("Done")
+ sys.exit()
+
+ log("RA Type:" + ra)
+ log("Test Count:" + str(itnum))
+
+ # Prepare Random
+    f = open("/dev/urandom", "rb")
+    seed = struct.unpack("BBB", f.read(3))
+ f.close()
+ #seed=(123,321,231)
+ rand = random.Random()
+ rand.seed(seed[0])
+ log("Test Random Seed:" + str(seed))
+
+    # Begin tests
+
+ log(">>>>>>>>>>>>>>>>>>>>>>>>")
+ for i in range(0, itnum):
+ ip = rand.choice(iplist)
+ if ipstatus[ip] == 0:
+ op = "stop"
+ elif ipstatus[ip] == 7:
+ op = "start"
+ else :
+ op = rand.choice(["start","stop"])
+
+ ret = perform_op(ra, ip, op)
+ # update status
+ if op == "start" and ret == 0:
+ ipstatus[ip] = 0
+ elif op == "stop" and ret == 0:
+ ipstatus[ip] = 7
+ else :
+ ipstatus[ip] = 1
+ result = ""
+ if ret == 0:
+ result = "success"
+ else :
+ result = "failure"
+ summary[op]["called"] += 1
+ summary[op][result] += 1
+        log("%d:%s %s \t[%s]" % (i, op, ip, result))
+ audit(ra, iplist, ipstatus, summary)
+
+ log("<<<<<<<<<<<<<<<<<<<<<<<<")
+ log("start:\t" + str(summary["start"]))
+ log("stop: \t" + str(summary["stop"]))
+ log("audit:\t" + str(summary["audit"]))
+
diff --git a/cts/lab/__init__.py b/cts/lab/__init__.py
new file mode 100644
index 0000000..abed502
--- /dev/null
+++ b/cts/lab/__init__.py
@@ -0,0 +1,15 @@
+"""Python modules for Pacemaker's Cluster Test Suite (CTS)
+
+This package provides the following modules:
+
+CIB
+cib_xml
+ClusterManager
+CM_corosync
+CTSaudits
+CTSscenarios
+CTStests
+"""
diff --git a/cts/lab/cib_xml.py b/cts/lab/cib_xml.py
new file mode 100644
index 0000000..378dd29
--- /dev/null
+++ b/cts/lab/cib_xml.py
@@ -0,0 +1,319 @@
+""" CIB XML generator for Pacemaker's Cluster Test Suite (CTS)
+"""
+
+__copyright__ = "Copyright 2008-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+import sys
+
+from cts.CIB import CibBase
+
+
+class XmlBase(CibBase):
+ def __init__(self, Factory, tag, _id, **kwargs):
+ CibBase.__init__(self, Factory, tag, _id, **kwargs)
+
+ def show(self):
+ text = '''<%s''' % self.tag
+ if self.name:
+ text += ''' id="%s"''' % (self.name)
+ for k in list(self.kwargs.keys()):
+ text += ''' %s="%s"''' % (k, self.kwargs[k])
+
+ if not self.children:
+ text += '''/>'''
+ return text
+
+ text += '''>'''
+
+ for c in self.children:
+ text += c.show()
+
+ text += '''</%s>''' % self.tag
+ return text
+
+ def _run(self, operation, xml, section="all", options=""):
+ if self.name:
+ label = self.name
+ else:
+ label = "<%s>" % self.tag
+ self.Factory.debug("Writing out %s" % label)
+ fixed = "HOME=/root CIB_file="+self.Factory.tmpfile
+ fixed += " cibadmin --%s --scope %s %s --xml-text '%s'" % (operation, section, options, xml)
+ (rc, _) = self.Factory.rsh(self.Factory.target, fixed)
+ if rc != 0:
+ self.Factory.log("Configure call failed: "+fixed)
+ sys.exit(1)
+
+
+class InstanceAttributes(XmlBase):
+ """ Create an <instance_attributes> section with name-value pairs """
+
+ def __init__(self, Factory, name, attrs):
+ XmlBase.__init__(self, Factory, "instance_attributes", name)
+
+ # Create an <nvpair> for each attribute
+ for (attr, value) in list(attrs.items()):
+ self.add_child(XmlBase(Factory, "nvpair", "%s-%s" % (name, attr),
+ name=attr, value=value))
+
+
+class Node(XmlBase):
+ """ Create a <node> section with node attributes for one node """
+
+ def __init__(self, Factory, node_name, node_id, node_attrs):
+ XmlBase.__init__(self, Factory, "node", node_id, uname=node_name)
+ self.add_child(InstanceAttributes(Factory, "%s-1" % node_name, node_attrs))
+
+
+class Nodes(XmlBase):
+ """ Create a <nodes> section """
+
+ def __init__(self, Factory):
+ XmlBase.__init__(self, Factory, "nodes", None)
+
+ def add_node(self, node_name, node_id, node_attrs):
+ self.add_child(Node(self.Factory, node_name, node_id, node_attrs))
+
+ def commit(self):
+ self._run("modify", self.show(), "configuration", "--allow-create")
+
+
+class FencingTopology(XmlBase):
+ def __init__(self, Factory):
+ XmlBase.__init__(self, Factory, "fencing-topology", None)
+
+ def level(self, index, target, devices, target_attr=None, target_value=None):
+ # Generate XML ID (sanitizing target-by-attribute levels)
+
+ if target:
+ xml_id = "cts-%s.%d" % (target, index)
+ self.add_child(XmlBase(self.Factory, "fencing-level", xml_id, target=target, index=index, devices=devices))
+
+ else:
+ xml_id = "%s-%s.%d" % (target_attr, target_value, index)
+ child = XmlBase(self.Factory, "fencing-level", xml_id, index=index, devices=devices)
+            child["target-attribute"] = target_attr
+            child["target-value"] = target_value
+ self.add_child(child)
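+    # e.g. (assumed names) level(1, "node1", "fence1") emits
+    # <fencing-level id="cts-node1.1" target="node1" index="1" devices="fence1"/>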
+
+ def commit(self):
+ self._run("create", self.show(), "configuration", "--allow-create")
+
+
+class Option(XmlBase):
+ def __init__(self, Factory, section="cib-bootstrap-options"):
+ XmlBase.__init__(self, Factory, "cluster_property_set", section)
+
+ def __setitem__(self, key, value):
+ self.add_child(XmlBase(self.Factory, "nvpair", "cts-%s" % key, name=key, value=value))
+
+ def commit(self):
+ self._run("modify", self.show(), "crm_config", "--allow-create")
+
+
+class OpDefaults(XmlBase):
+ def __init__(self, Factory):
+ XmlBase.__init__(self, Factory, "op_defaults", None)
+ self.meta = XmlBase(self.Factory, "meta_attributes", "cts-op_defaults-meta")
+ self.add_child(self.meta)
+
+ def __setitem__(self, key, value):
+ self.meta.add_child(XmlBase(self.Factory, "nvpair", "cts-op_defaults-%s" % key, name=key, value=value))
+
+ def commit(self):
+ self._run("modify", self.show(), "configuration", "--allow-create")
+
+
+class Alerts(XmlBase):
+ def __init__(self, Factory):
+ XmlBase.__init__(self, Factory, "alerts", None)
+ self.alert_count = 0
+
+ def add_alert(self, path, recipient):
+ self.alert_count = self.alert_count + 1
+ alert = XmlBase(self.Factory, "alert", "alert-%d" % self.alert_count,
+ path=path)
+ recipient1 = XmlBase(self.Factory, "recipient",
+ "alert-%d-recipient-1" % self.alert_count,
+ value=recipient)
+ alert.add_child(recipient1)
+ self.add_child(alert)
+
+ def commit(self):
+ self._run("modify", self.show(), "configuration", "--allow-create")
+
+
+class Expression(XmlBase):
+ def __init__(self, Factory, name, attr, op, value=None):
+ XmlBase.__init__(self, Factory, "expression", name, attribute=attr, operation=op)
+ if value:
+ self["value"] = value
+
+
+class Rule(XmlBase):
+ def __init__(self, Factory, name, score, op="and", expr=None):
+ XmlBase.__init__(self, Factory, "rule", "%s" % name)
+ self["boolean-op"] = op
+ self["score"] = score
+ if expr:
+ self.add_child(expr)
+
+
+class Resource(XmlBase):
+ def __init__(self, Factory, name, rtype, standard, provider=None):
+ XmlBase.__init__(self, Factory, "native", name)
+
+ self.rtype = rtype
+ self.standard = standard
+ self.provider = provider
+
+ self.op = []
+ self.meta = {}
+ self.param = {}
+
+ self.scores = {}
+ self.needs = {}
+ self.coloc = {}
+
+ if self.standard == "ocf" and not provider:
+ self.provider = "heartbeat"
+ elif self.standard == "lsb":
+ self.provider = None
+
+ def __setitem__(self, key, value):
+ self.add_param(key, value)
+
+ def add_op(self, name, interval, **kwargs):
+ self.op.append(
+ XmlBase(self.Factory, "op", "%s-%s" % (name, interval), name=name, interval=interval, **kwargs))
+
+ def add_param(self, name, value):
+ self.param[name] = value
+
+ def add_meta(self, name, value):
+ self.meta[name] = value
+
+ def prefer(self, node, score="INFINITY", rule=None):
+ if not rule:
+ rule = Rule(self.Factory, "prefer-%s-r" % node, score,
+ expr=Expression(self.Factory, "prefer-%s-e" % node, "#uname", "eq", node))
+ self.scores[node] = rule
+
+    def after(self, resource, kind="Mandatory", first="start", then="start", **kwargs):
+        kargs = kwargs.copy()
+        kargs["kind"] = kind
+        if then:
+            kargs["then-action"] = then
+
+        if first:
+            kargs["first-action"] = first
+
+ self.needs[resource] = kargs
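+    # e.g. (assumed ids) r.after("base-rsc") records a Mandatory
+    # start-after-start ordering, rendered later by constraints()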
+
+ def colocate(self, resource, score="INFINITY", role=None, withrole=None, **kwargs):
+ kargs = kwargs.copy()
+ kargs["score"] = score
+ if role:
+ kargs["rsc-role"] = role
+ if withrole:
+ kargs["with-rsc-role"] = withrole
+
+ self.coloc[resource] = kargs
+
+ def constraints(self):
+ text = "<constraints>"
+
+ for k in list(self.scores.keys()):
+ text += '''<rsc_location id="prefer-%s" rsc="%s">''' % (k, self.name)
+ text += self.scores[k].show()
+ text += '''</rsc_location>'''
+
+ for k in list(self.needs.keys()):
+ text += '''<rsc_order id="%s-after-%s" first="%s" then="%s"''' % (self.name, k, k, self.name)
+ kargs = self.needs[k]
+ for kw in list(kargs.keys()):
+ text += ''' %s="%s"''' % (kw, kargs[kw])
+ text += '''/>'''
+
+ for k in list(self.coloc.keys()):
+ text += '''<rsc_colocation id="%s-with-%s" rsc="%s" with-rsc="%s"''' % (self.name, k, self.name, k)
+ kargs = self.coloc[k]
+ for kw in list(kargs.keys()):
+ text += ''' %s="%s"''' % (kw, kargs[kw])
+ text += '''/>'''
+
+ text += "</constraints>"
+ return text
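+    # Illustrative output for a resource "r1" ordered after "r0" (assumed ids):
+    #   <constraints><rsc_order id="r1-after-r0" first="r0" then="r1"
+    #    kind="Mandatory" then-action="start" first-action="start"/></constraints>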
+
+ def show(self):
+ text = '''<primitive id="%s" class="%s" type="%s"''' % (self.name, self.standard, self.rtype)
+ if self.provider:
+ text += ''' provider="%s"''' % (self.provider)
+ text += '''>'''
+
+ if len(self.meta) > 0:
+ text += '''<meta_attributes id="%s-meta">''' % self.name
+ for p in list(self.meta.keys()):
+ text += '''<nvpair id="%s-%s" name="%s" value="%s"/>''' % (self.name, p, p, self.meta[p])
+ text += '''</meta_attributes>'''
+
+ if len(self.param) > 0:
+ text += '''<instance_attributes id="%s-params">''' % self.name
+ for p in list(self.param.keys()):
+ text += '''<nvpair id="%s-%s" name="%s" value="%s"/>''' % (self.name, p, p, self.param[p])
+ text += '''</instance_attributes>'''
+
+ if len(self.op) > 0:
+ text += '''<operations>'''
+ for o in self.op:
+ key = o.name
+ o.name = "%s-%s" % (self.name, key)
+ text += o.show()
+ o.name = key
+ text += '''</operations>'''
+
+ text += '''</primitive>'''
+ return text
+
+ def commit(self):
+ self._run("create", self.show(), "resources")
+ self._run("modify", self.constraints())
+
+
+class Group(Resource):
+ def __init__(self, Factory, name):
+ Resource.__init__(self, Factory, name, None, None)
+ self.tag = "group"
+
+ def __setitem__(self, key, value):
+ self.add_meta(key, value)
+
+ def show(self):
+ text = '''<%s id="%s">''' % (self.tag, self.name)
+
+ if len(self.meta) > 0:
+ text += '''<meta_attributes id="%s-meta">''' % self.name
+ for p in list(self.meta.keys()):
+ text += '''<nvpair id="%s-%s" name="%s" value="%s"/>''' % (self.name, p, p, self.meta[p])
+ text += '''</meta_attributes>'''
+
+ for c in self.children:
+ text += c.show()
+ text += '''</%s>''' % self.tag
+ return text
+
+
+class Clone(Group):
+ def __init__(self, Factory, name, child=None):
+ Group.__init__(self, Factory, name)
+ self.tag = "clone"
+ if child:
+ self.add_child(child)
+
+ def add_child(self, resource):
+ if not self.children:
+ self.children.append(resource)
+ else:
+ self.Factory.log("Clones can only have a single child. Ignoring %s" % resource.name)
diff --git a/cts/lab/cluster_test.in b/cts/lab/cluster_test.in
new file mode 100755
index 0000000..1741b47
--- /dev/null
+++ b/cts/lab/cluster_test.in
@@ -0,0 +1,175 @@
+#!@BASH_PATH@
+#
+# Copyright 2008-2020 the Pacemaker project contributors
+#
+# The version control history for this file may have further details.
+#
+# This source code is licensed under the GNU General Public License version 2
+# or later (GPLv2+) WITHOUT ANY WARRANTY.
+#
+if [ -e ~/.cts ]; then
+ . ~/.cts
+fi
+anyAsked=0
+
+[ $# -lt 1 ] || CTS_numtests=$1
+
+die() { echo "$@"; exit 1; }
+
+if [ -z "$CTS_asked_once" ]; then
+ anyAsked=1
+ echo "This script should only be executed on the test exerciser."
+ echo "The test exerciser will remotely execute the actions required by the"
+ echo "tests and should not be part of the cluster itself."
+
+ read -p "Is this host intended to be the test exerciser? (yN) " doUnderstand
+ [ "$doUnderstand" = "y" ] \
+ || die "This script must be executed on the test exerciser"
+fi
+
+if [ -z "$CTS_node_list" ]; then
+ anyAsked=1
+    read -p "Please list your cluster nodes (e.g. node1 node2 node3): " CTS_node_list
+else
+ echo "Beginning test of cluster: $CTS_node_list"
+fi
+
+if [ -z "$CTS_stack" ]; then
+ anyAsked=1
+ read -p "Which cluster stack are you using? ([corosync]): " CTS_stack
+ [ -n "$CTS_stack" ] || CTS_stack=corosync
+else
+ echo "Using the $CTS_stack cluster stack"
+fi
+
+[ "${CTS_node_list}" = "${CTS_node_list/$HOSTNAME/}" ] \
+ || die "This script must be executed on the test exerciser, and the test exerciser cannot be part of the cluster"
+
+printf "+ Bootstrapping ssh... "
+if [ -z "$SSH_AUTH_SOCK" ]; then
+ printf "\n + Initializing SSH "
+ eval "$(ssh-agent)"
+ echo " + Adding identities..."
+ ssh-add
+ rc=$?
+ if [ $rc -ne 0 ]; then
+ echo " -- No identities added"
+ printf "\nThe ability to open key-based 'ssh' connections (as the user 'root') is required to use CTS.\n"
+
+ read -p " - Do you want this program to help you create one? (yN) " auto_fix
+ if [ "$auto_fix" = "y" ]; then
+ ssh-keygen -t dsa
+ ssh-add
+ else
+ die "Please run 'ssh-keygen -t dsa' to create a new key"
+ fi
+ fi
+else
+ echo "OK"
+fi
+
+test_ok=1
+printf "+ Testing ssh configuration... "
+for n in $CTS_node_list; do
+ ssh -l root -o PasswordAuthentication=no -o ConnectTimeout=5 "$n" /bin/true
+ rc=$?
+ if [ $rc -ne 0 ]; then
+ echo " - connection to $n failed"
+ test_ok=0
+ fi
+done
+
+if [ $test_ok -eq 0 ]; then
+ printf "\nThe ability to open key-based 'ssh' connections (as the user 'root') is required to use CTS.\n"
+
+ read -p " - Do you want this program to help you with such a setup? (yN) " auto_fix
+ if [ "$auto_fix" = "y" ]; then
+ # XXX are we picking the most suitable identity?
+ privKey=$(ssh-add -L | head -n1 | cut -d" " -f3)
+ sshCopyIdOpts="-o User=root"
+ [ -z "$privKey" ] || sshCopyIdOpts+=" -i \"${privKey}.pub\""
+ for n in $CTS_node_list; do
+ eval "ssh-copy-id $sshCopyIdOpts \"${n}\"" \
+ || die "Attempt to 'ssh-copy-id $sshCopyIdOpts \"$n\"' failed"
+ done
+ else
+ die "Please install one of your SSH public keys to root's account on all cluster nodes"
+ fi
+fi
+echo "OK"
+
+if [ -z "$CTS_logfile" ]; then
+ anyAsked=1
+ read -p " + Where does/should syslog store logs from remote hosts? (/var/log/messages) " CTS_logfile
+ [ -n "$CTS_logfile" ] || CTS_logfile=/var/log/messages
+fi
+
+[ -e "$CTS_logfile" ] || die "$CTS_logfile doesn't exist"
+
+if [ -z "$CTS_logfacility" ]; then
+ anyAsked=1
+ read -p " + Which log facility does the cluster use? (daemon) " CTS_logfacility
+ [ -n "$CTS_logfacility" ] || CTS_logfacility=daemon
+fi
+
+if [ -z "$CTS_boot" ]; then
+ read -p "+ Is the cluster software started automatically when a node boots? [yN] " CTS_boot
+ if [ -z "$CTS_boot" ]; then
+ CTS_boot=0
+ else
+ case $CTS_boot in
+ 1|y|Y) CTS_boot=1;;
+ *) CTS_boot=0;;
+ esac
+ fi
+fi
+
+if [ -z "$CTS_numtests" ]; then
+ read -p "+ How many test iterations should be performed? (500) " CTS_numtests
+ [ -n "$CTS_numtests" ] || CTS_numtests=500
+fi
+
+if [ -z "$CTS_asked_once" ]; then
+ anyAsked=1
+ read -p "+ What type of STONITH agent do you use? (none) " CTS_stonith
+ [ -z "$CTS_stonith" ] \
+        || read -p "+ List any STONITH agent parameters (e.g. device_host=switch.power.com): " CTS_stonith_args
+ [ -n "$CTS_adv" ] \
+ || read -p "+ (Advanced) Any extra CTS parameters? (none) " CTS_adv
+fi
+
+[ $anyAsked -eq 0 ] \
+ || read -p "+ Save values to ~/.cts for next time? (yN) " doSave
+
+if [ "$doSave" = "y" ]; then
+ cat > ~/.cts <<-EOF
+ # CTS Test data
+ CTS_stack="$CTS_stack"
+ CTS_node_list="$CTS_node_list"
+ CTS_logfile="$CTS_logfile"
+ CTS_logport="$CTS_logport"
+ CTS_logfacility="$CTS_logfacility"
+ CTS_asked_once=1
+ CTS_adv="$CTS_adv"
+ CTS_stonith="$CTS_stonith"
+ CTS_stonith_args="$CTS_stonith_args"
+ CTS_boot="$CTS_boot"
+EOF
+fi
+
+cts_extra=""
+if [ -n "$CTS_stonith" ]; then
+ cts_extra="$cts_extra --stonith-type $CTS_stonith"
+ [ -z "$CTS_stonith_args" ] \
+ || cts_extra="$cts_extra --stonith-params \"$CTS_stonith_args\""
+else
+ cts_extra="$cts_extra --stonith 0"
+ echo " - Testing a cluster without STONITH is like a blunt pencil... pointless"
+fi
+
+printf "\nAll set to go for %d iterations!\n" "$CTS_numtests"
+[ $anyAsked -ne 0 ] \
+ || echo "+ To use a different configuration, remove ~/.cts and re-run cts (or edit it manually)."
+
+echo Now paste the following command into this shell:
+echo "@PYTHON@ `dirname "$0"`/CTSlab.py -L \"$CTS_logfile\" --syslog-facility \"$CTS_logfacility\" --no-unsafe-tests --stack \"$CTS_stack\" $CTS_adv --at-boot \"$CTS_boot\" $cts_extra \"$CTS_numtests\" --nodes \"$CTS_node_list\""
diff --git a/cts/lab/cts-log-watcher.in b/cts/lab/cts-log-watcher.in
new file mode 100644
index 0000000..cee9c94
--- /dev/null
+++ b/cts/lab/cts-log-watcher.in
@@ -0,0 +1,84 @@
+#!@PYTHON@
+""" Remote log reader for Pacemaker's Cluster Test Suite (CTS)
+
+Reads a specified number of lines from the supplied offset
+Returns the current offset
+Contains logic for handling truncation
+"""
+
+__copyright__ = "Copyright 2014-2020 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+import sys
+import os
+import fcntl
+
+if __name__ == '__main__':
+
+ limit = 0
+ offset = 0
+ prefix = ''
+ filename = '/var/log/messages'
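+    # Illustrative invocation (assumed values):
+    #   cts-log-watcher -f /var/log/messages -o 1234 -l 100 -p 'pacemaker:'
+    # prints up to 100 new lines starting at byte offset 1234, then emits a
+    # 'Last read: <offset>' line that the caller parses to resume next time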
+
+ skipthis=None
+ args=sys.argv[1:]
+ for i in range(0, len(args)):
+ if skipthis:
+ skipthis=None
+ continue
+
+ elif args[i] == '-l' or args[i] == '--limit':
+ skipthis=1
+ limit = int(args[i+1])
+
+ elif args[i] == '-f' or args[i] == '--filename':
+ skipthis=1
+ filename = args[i+1]
+
+ elif args[i] == '-o' or args[i] == '--offset':
+ skipthis=1
+ offset = args[i+1]
+
+ elif args[i] == '-p' or args[i] == '--prefix':
+ skipthis=1
+ prefix = args[i+1]
+
+ elif args[i] == '-t' or args[i] == '--tag':
+ skipthis=1
+
+ if not os.access(filename, os.R_OK):
+ print(prefix + 'Last read: %d, limit=%d, count=%d - unreadable' % (0, limit, 0))
+ sys.exit(1)
+
+ logfile=open(filename, 'r')
+ logfile.seek(0, os.SEEK_END)
+ newsize=logfile.tell()
+
+ if offset != 'EOF':
+ offset = int(offset)
+ if newsize >= offset:
+ logfile.seek(offset)
+ else:
+ print(prefix + ('File truncated from %d to %d' % (offset, newsize)))
+ if (newsize*1.05) < offset:
+ logfile.seek(0)
+ # else: we probably just lost a few logs after a fencing op
+ # continue from the new end
+ # TODO: accept a timestamp and discard all messages older than it
+
+ # Don't block when we reach EOF
+ fcntl.fcntl(logfile.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
+
+ count = 0
+ while True:
+ if logfile.tell() >= newsize: break
+ elif limit and count >= limit: break
+
+ line = logfile.readline()
+ if not line: break
+
+ print(line.strip())
+ count += 1
+
+ print(prefix + 'Last read: %d, limit=%d, count=%d' % (logfile.tell(), limit, count))
+ logfile.close()
diff --git a/cts/lab/cts.in b/cts/lab/cts.in
new file mode 100755
index 0000000..5b3aaab
--- /dev/null
+++ b/cts/lab/cts.in
@@ -0,0 +1,262 @@
+#!@BASH_PATH@
+#
+# Copyright 2012-2023 the Pacemaker project contributors
+#
+# The version control history for this file may have further details.
+#
+# This source code is licensed under the GNU General Public License version 2
+# or later (GPLv2+) WITHOUT ANY WARRANTY.
+#
+
+# e.g. /etc/sysconfig or /etc/default
+CONFIG_DIR=@CONFIGDIR@
+
+cts_root=`dirname $0`
+
+logfile=0
+summary=0
+verbose=0
+watch=0
+saved=0
+tests=""
+
+install=0
+clean=0
+kill=0
+run=0
+boot=0
+setup=0
+target=rhel-7
+cmd=""
+trace=""
+
+custom_log=""
+patterns="-e CTS:"
+
+
+helpmsg=$(cat <<EOF
+Usage: %s [options] {[setup [TARGET]] | [OTHER-CMDS]}
+
+[--]help, -h show help screen and exit
+-x turn on debugging
+-a show relevant screen sessions and exit
+-c,-g CLUSTER_NAME set the cluster name
+-S show summary from the last CTS run
+-s show summary for the current log (see -l)
+-v increase verbosity
+-p (currently unused)
+-e PATTERN grep pattern to apply when 'summary' or 'watch' requested
+-l print the filename of the log that would be operated on
+-w			continuous (filtered) monitoring of the log file
+-f,-sf FILE show summary for the provided log
+-t TEST, [0-9]* add a test to the working set
+[--]kill request termination of cluster software
+[--]run request CTS run (passing remaining arguments through)
+[--]boot, start request CTS run (with --boot option)
+[--]clean request cleaning up after CTS run
+[--]install, --inst request installing packages to get ready to run CTS
+[--]setup request initialization to get ready to run CTS
+trace-ls, tls list traced functions
+trace-add, tadd FUNC add a function to the list of traced ones
+trace-rm, trm FUNC remove a function from the list of traced ones
+trace-set, tset FUNC set function(s) as the only to be traced
+(f|fedora|r|rhel).* specify target distro
+-- delimits tests that follow
+EOF
+)
+
+while true; do
+ case $1 in
+ -h|--help|help) printf "${helpmsg}\n" "$0"; exit;;
+ -x) set -x; shift;;
+ -a)
+ screen -ls | grep cts
+ exit 0;;
+ -c|-g) cluster_name=$2; shift; shift;;
+ -S) summary=1; saved=1; shift;;
+ -s) summary=1; shift;;
+ -v) verbose=`expr $verbose + 1`; shift;;
+ -p) shift;;
+ -e) patterns="$patterns -e `echo $2 | sed 's/ /\\\W/g'`"; shift; shift;;
+ -l) logfile=1; shift;;
+ -w) watch=1; shift;;
+ -f|-sf) summary=1; custom_log=$2; shift; shift;;
+ -t) tests="$tests $2"; shift; shift;;
+ [0-9]*) tests="$tests $1"; shift;;
+ --kill|kill) kill=1; shift; break;;
+ --run|run) run=1; shift; break;;
+ --boot|boot|start) boot=1; clean=1; shift; break;;
+ --clean|clean) clean=1; shift;;
+ --inst|--install|install) install=1; clean=1; shift;;
+ --setup|setup) setup=1; shift;;
+ trace-ls|tls) cmd=$1; shift;;
+ trace-add|tadd|trace-rm|trm|trace-set|tset) cmd=$1; trace=$2; shift; shift;;
+ f*)
+ target="fedora-`echo $1 | sed -e s/fedora// -e s/-// -e s/f//`"
+ shift;;
+ r|rhel) target="rhel-7"; shift;;
+ r*)
+ target="rhel-`echo $1 | sed -e s/rhel// -e s/-// -e s/r//`"
+ shift;;
+ --) shift; tests="$tests $*"; break;;
+ "") break;;
+ *) echo "Unknown argument: $1"; exit 1;;
+ esac
+done
+
+# Add the location of this script
+export PATH="$PATH:$cts_root"
+which cluster-helper &>/dev/null
+if [ $? != 0 ]; then
+ echo $0 needs the cluster-helper script to be in your path
+ exit 1
+fi
+
+which cluster-clean &>/dev/null
+if [ $? != 0 ]; then
+ echo $0 needs the cluster-clean script to be in your path
+ exit 1
+fi
+
+if [ "x$cluster_name" = x ] || [ "x$cluster_name" = xpick ]; then
+ clusters=`ls -1 ~/.dsh/group/[a-z]+[0-9] | sed s/.*group.// | tr '\n' ' ' `
+
+ echo "custom) interactively define a cluster"
+ for i in $clusters; do
+ echo "$i) `cluster-helper --list short -g $i`"
+ done
+
+ read -p "Choose a cluster [custom]: " cluster_name
+ echo
+fi
+
+if [ -z "$cluster_name" ]; then
+ cluster_name=custom
+fi
+
+
+case $cluster_name in
+ custom)
+ read -p "Cluster name: " cluster_name
+ read -p "Cluster hosts: " cluster_hosts
+ read -p "Cluster log file: " cluster_log
+ cluster-helper add -g "$cluster_name" -w "$cluster_hosts"
+ ;;
+ *)
+ cluster_hosts=`cluster-helper --list short -g $cluster_name`
+ cluster_log=~/cluster-$cluster_name.log;
+ ;;
+esac
+
+if [ x$cmd != x ]; then
+ config="${CONFIG_DIR}/pacemaker"
+ case $cmd in
+ trace-ls|tls)
+ cluster-helper -g $cluster_name -- grep PCMK_trace_functions $config
+ ;;
+ trace-add|tadd)
+ echo "Adding $trace to PCMK_trace_functions"
+ cluster-helper -g $cluster_name -- sed -i "s/.*PCMK_trace_functions=/PCMK_trace_functions=$trace,/" $config
+ ;;
+ trace-rm|trm)
+ echo "Removing $trace from PCMK_trace_functions"
+ cluster-helper -g $cluster_name -- sed -i "s/.*PCMK_trace_functions=\\\\\\(.*\\\\\\)$trace,\\\\\\(.*\\\\\\)/PCMK_trace_functions=\\\\\\1\\\\\\2/" $config
+ ;;
+ trace-set|tset)
+ echo "Setting PCMK_trace_functions to '$trace'"
+ cluster-helper -g $cluster_name -- sed -i "s/.*PCMK_trace_functions.*/PCMK_trace_functions=$trace/" $config
+ ;;
+ esac
+ exit 0
+fi
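+# e.g. "cts -g mycluster tadd do_fencing" (assumed names) adds do_fencing
+# to PCMK_trace_functions in ${CONFIG_DIR}/pacemaker on every cluster node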
+
+if [ $run = 1 ]; then
+ install=1
+ clean=1
+fi
+
+if [ $clean = 1 ]; then
+ rm -f $cluster_log; cluster-clean -g $cluster_name --kill
+elif [ $kill = 1 ]; then
+ cluster-clean -g $cluster_name --kill-only
+ exit 0
+fi
+
+if [ $install = 1 ]; then
+ cluster-helper -g $cluster_name -- yum install -y pacemaker pacemaker-debuginfo pacemaker-cts libqb libqb-debuginfo
+fi
+
+if [ $setup = 1 ]; then
+ cluster-init -g $cluster_name $target -u --test
+ exit 0
+
+elif [ $boot = 1 ]; then
+ $cts_root/CTSlab.py -r -c -g $cluster_name --boot
+ rc=$?
+ if [ $rc = 0 ]; then
+ echo "The cluster is ready..."
+ fi
+ exit $rc
+
+elif [ $run = 1 ]; then
+ $cts_root/CTSlab.py -r -c -g $cluster_name 500 "$@"
+ exit $?
+
+elif [ $clean = 1 ]; then
+ exit 0
+fi
+
+screen -ls | grep cts-$cluster_name &>/dev/null
+active=$?
+
+if [ -n "$custom_log" ]; then
+ cluster_log=$custom_log
+fi
+
+if [ "x$tests" != x ] && [ "x$tests" != "x " ]; then
+ for t in $tests; do
+ echo "crm_report --cts-log $cluster_log -d -T $t"
+ crm_report --cts-log $cluster_log -d -T $t
+ done
+
+elif [ $logfile = 1 ]; then
+ echo $cluster_log
+
+elif [ $summary = 1 ]; then
+ files=$cluster_log
+ if [ $saved = 1 ]; then
+ files=`ls -1tr ~/CTS-*/cluster-log.txt`
+ fi
+ for f in $files; do
+ echo $f
+ case $verbose in
+ 0) cat -n $f | grep $patterns | grep -v "CTS: debug:"
+ ;;
+ 1) cat -n $f | grep $patterns | grep -v "CTS:.* cmd:"
+ ;;
+ *) cat -n $f | grep $patterns
+ ;;
+ esac
+ echo ""
+ done
+
+elif [ $watch = 1 ]; then
+ case $verbose in
+ 0) tail -F $cluster_log | grep $patterns | grep -v "CTS: debug:"
+ ;;
+ 1) tail -F $cluster_log | grep $patterns | grep -v "CTS:.* cmd:"
+ ;;
+ *) tail -F $cluster_log | grep $patterns
+ ;;
+ esac
+
+elif [ $active = 0 ]; then
+ screen -x cts-$cluster_name
+
+else
+ touch $cluster_log
+
+# . ~/.bashrc
+ export cluster_name cluster_hosts cluster_log
+ screen -S cts-$cluster_name bash
+fi