Diffstat
-rw-r--r--  python/Makefile.am | 10
-rw-r--r--  python/pacemaker/Makefile.am | 4
-rw-r--r--  python/pacemaker/_cts/CTS.py | 31
-rw-r--r--  python/pacemaker/_cts/Makefile.am | 14
-rw-r--r--  python/pacemaker/_cts/audits.py | 1029
-rw-r--r--  python/pacemaker/_cts/cib.py | 425
-rw-r--r--  python/pacemaker/_cts/cibxml.py | 734
-rw-r--r--  python/pacemaker/_cts/clustermanager.py | 916
-rw-r--r--  python/pacemaker/_cts/cmcorosync.py | 80
-rw-r--r--  python/pacemaker/_cts/environment.py | 35
-rw-r--r--  python/pacemaker/_cts/input.py | 18
-rw-r--r--  python/pacemaker/_cts/logging.py | 2
-rw-r--r--  python/pacemaker/_cts/network.py | 59
-rw-r--r--  python/pacemaker/_cts/patterns.py | 14
-rw-r--r--  python/pacemaker/_cts/process.py | 2
-rw-r--r--  python/pacemaker/_cts/remote.py | 8
-rw-r--r--  python/pacemaker/_cts/scenarios.py | 422
-rw-r--r--  python/pacemaker/_cts/test.py | 35
-rw-r--r--  python/pacemaker/_cts/tests/Makefile.am | 14
-rw-r--r--  python/pacemaker/_cts/tests/__init__.py | 87
-rw-r--r--  python/pacemaker/_cts/tests/componentfail.py | 167
-rw-r--r--  python/pacemaker/_cts/tests/ctstest.py | 252
-rw-r--r--  python/pacemaker/_cts/tests/fliptest.py | 61
-rw-r--r--  python/pacemaker/_cts/tests/maintenancemode.py | 238
-rw-r--r--  python/pacemaker/_cts/tests/nearquorumpointtest.py | 125
-rw-r--r--  python/pacemaker/_cts/tests/partialstart.py | 75
-rw-r--r--  python/pacemaker/_cts/tests/reattach.py | 221
-rw-r--r--  python/pacemaker/_cts/tests/remotebasic.py | 39
-rw-r--r--  python/pacemaker/_cts/tests/remotedriver.py | 556
-rw-r--r--  python/pacemaker/_cts/tests/remotemigrate.py | 63
-rw-r--r--  python/pacemaker/_cts/tests/remoterscfailure.py | 73
-rw-r--r--  python/pacemaker/_cts/tests/remotestonithd.py | 62
-rw-r--r--  python/pacemaker/_cts/tests/resourcerecover.py | 175
-rw-r--r--  python/pacemaker/_cts/tests/restartonebyone.py | 58
-rw-r--r--  python/pacemaker/_cts/tests/restarttest.py | 49
-rw-r--r--  python/pacemaker/_cts/tests/resynccib.py | 75
-rw-r--r--  python/pacemaker/_cts/tests/simulstart.py | 42
-rw-r--r--  python/pacemaker/_cts/tests/simulstartlite.py | 133
-rw-r--r--  python/pacemaker/_cts/tests/simulstop.py | 42
-rw-r--r--  python/pacemaker/_cts/tests/simulstoplite.py | 91
-rw-r--r--  python/pacemaker/_cts/tests/splitbraintest.py | 215
-rw-r--r--  python/pacemaker/_cts/tests/standbytest.py | 110
-rw-r--r--  python/pacemaker/_cts/tests/startonebyone.py | 55
-rw-r--r--  python/pacemaker/_cts/tests/starttest.py | 54
-rw-r--r--  python/pacemaker/_cts/tests/stonithdtest.py | 145
-rw-r--r--  python/pacemaker/_cts/tests/stoponebyone.py | 56
-rw-r--r--  python/pacemaker/_cts/tests/stoptest.py | 99
-rw-r--r--  python/pacemaker/_cts/timer.py | 63
-rw-r--r--  python/pacemaker/_cts/watcher.py | 10
-rw-r--r--  python/pacemaker/buildoptions.py.in | 3
-rw-r--r--  python/pylintrc | 3
-rw-r--r--  python/setup.py.in | 2
-rw-r--r--  python/tests/Makefile.am | 3
-rw-r--r--  python/tests/__init__.py | 0
-rw-r--r--  python/tests/test_cts_network.py | 37
55 files changed, 7304 insertions, 87 deletions
diff --git a/python/Makefile.am b/python/Makefile.am
index 6cefb63..803fb0c 100644
--- a/python/Makefile.am
+++ b/python/Makefile.am
@@ -11,10 +11,16 @@ MAINTAINERCLEANFILES = Makefile.in
EXTRA_DIST = pylintrc
-SUBDIRS = pacemaker tests
+SUBDIRS = pacemaker \
+ tests
+.PHONY: check-local
check-local:
- $(PYTHON) -m unittest discover -v -s tests
+ if [ "x$(top_srcdir)" != "x$(top_builddir)" ]; then \
+ cp -r $(top_srcdir)/python/* $(abs_top_builddir)/python/; \
+ fi
+ PYTHONPATH=$(top_builddir)/python $(PYTHON) -m unittest discover -v -s $(top_builddir)/python/tests
+.PHONY: pylint
pylint:
pylint $(SUBDIRS)
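
The new check-local recipe copies the sources into the build tree for VPATH builds and then points unittest discovery at the build directory. A rough Python equivalent of that discovery step, as a sketch only (the "python/tests" path is illustrative, not taken from the Makefile):

    import unittest

    # Mirror `$(PYTHON) -m unittest discover -v -s .../python/tests`
    suite = unittest.defaultTestLoader.discover("python/tests")
    unittest.TextTestRunner(verbosity=2).run(suite)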
diff --git a/python/pacemaker/Makefile.am b/python/pacemaker/Makefile.am
index f209bba..df9cc46 100644
--- a/python/pacemaker/Makefile.am
+++ b/python/pacemaker/Makefile.am
@@ -9,8 +9,8 @@
MAINTAINERCLEANFILES = Makefile.in
-pkgpython_PYTHON = __init__.py \
- exitstatus.py
+pkgpython_PYTHON = __init__.py \
+ exitstatus.py
nodist_pkgpython_PYTHON = buildoptions.py
diff --git a/python/pacemaker/_cts/CTS.py b/python/pacemaker/_cts/CTS.py
index 4ca7e59..166ea10 100644
--- a/python/pacemaker/_cts/CTS.py
+++ b/python/pacemaker/_cts/CTS.py
@@ -10,6 +10,7 @@ import traceback
from pacemaker.exitstatus import ExitStatus
from pacemaker._cts.environment import EnvFactory
+from pacemaker._cts.input import should_continue
from pacemaker._cts.logging import LogFactory
from pacemaker._cts.remote import RemoteFactory
@@ -32,7 +33,7 @@ class CtsLab:
def __init__(self, args=None):
""" Create a new CtsLab instance. This class can be treated kind
of like a dictionary due to the presence of typical dict functions
- like has_key, __getitem__, and __setitem__. However, it is not a
+ like __contains__, __getitem__, and __setitem__. However, it is not a
dictionary so do not rely on standard dictionary behavior.
Arguments:
@@ -48,10 +49,12 @@ class CtsLab:
self._env.dump()
- def has_key(self, key):
+ def __contains__(self, key):
""" Does the given environment key exist? """
- return key in list(self._env.keys())
+ # pylint gets confused because of EnvFactory here.
+ # pylint: disable=unsupported-membership-test
+ return key in self._env
def __getitem__(self, key):
""" Return the given environment key, or raise KeyError if it does
@@ -90,7 +93,7 @@ class CtsLab:
for node in self._env["nodes"]:
self._logger.log(" * %s" % (node))
- if not scenario.SetUp():
+ if not scenario.setup():
return ExitStatus.ERROR
# We want to alert on any exceptions caused by running a scenario, so
@@ -103,16 +106,16 @@ class CtsLab:
self._logger.traceback(traceback)
scenario.summarize()
- scenario.TearDown()
+ scenario.teardown()
return ExitStatus.ERROR
- scenario.TearDown()
+ scenario.teardown()
scenario.summarize()
- if scenario.Stats["failure"] > 0:
+ if scenario.stats["failure"] > 0:
return ExitStatus.ERROR
- if scenario.Stats["success"] != iterations:
+ if scenario.stats["success"] != iterations:
self._logger.log("No failure count but success != requested iterations")
return ExitStatus.ERROR
@@ -177,15 +180,7 @@ class NodeStatus:
timeout -= 1
LogFactory().log("%s did not come up within %d tries" % (node, initial_timeout))
- if self._env["continue"]:
- answer = "Y"
- else:
- try:
- answer = input('Continue? [nY]')
- except EOFError:
- answer = "n"
-
- if answer and answer == "n":
+ if not should_continue(self._env["continue"]):
raise ValueError("%s did not come up within %d tries" % (node, initial_timeout))
return False
@@ -241,4 +236,4 @@ class Process:
(rc, _) = self._cm.rsh(node, "killall -9 %s" % self.name)
if rc != 0:
- self._cm.log ("ERROR: Kill %s failed on node %s" % (self.name, node))
+ self._cm.log("ERROR: Kill %s failed on node %s" % (self.name, node))
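
With has_key() gone, callers test for environment keys on a CtsLab with the standard "in" operator. A minimal caller-side sketch, assuming args is a list of command-line parameters as CtsLab expects:

    import sys
    from pacemaker._cts.CTS import CtsLab

    lab = CtsLab(sys.argv[1:])   # assumed: CLI parameters for the environment
    if "nodes" in lab:           # previously: lab.has_key("nodes")
        print(lab["nodes"])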
diff --git a/python/pacemaker/_cts/Makefile.am b/python/pacemaker/_cts/Makefile.am
index 3b3e3f8..efb0019 100644
--- a/python/pacemaker/_cts/Makefile.am
+++ b/python/pacemaker/_cts/Makefile.am
@@ -11,14 +11,6 @@ MAINTAINERCLEANFILES = Makefile.in
pkgpythondir = $(pythondir)/$(PACKAGE)/_cts
-pkgpython_PYTHON = CTS.py \
- __init__.py \
- corosync.py \
- environment.py \
- errors.py \
- logging.py \
- patterns.py \
- process.py \
- remote.py \
- test.py \
- watcher.py
+pkgpython_PYTHON = $(wildcard *.py)
+
+SUBDIRS = tests
diff --git a/python/pacemaker/_cts/audits.py b/python/pacemaker/_cts/audits.py
new file mode 100644
index 0000000..dc66f96
--- /dev/null
+++ b/python/pacemaker/_cts/audits.py
@@ -0,0 +1,1029 @@
+""" Auditing classes for Pacemaker's Cluster Test Suite (CTS) """
+
+__all__ = ["AuditConstraint", "AuditResource", "ClusterAudit", "audit_list"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+import re
+import time
+import uuid
+
+from pacemaker.buildoptions import BuildOptions
+from pacemaker._cts.input import should_continue
+from pacemaker._cts.watcher import LogKind, LogWatcher
+
+
+class ClusterAudit:
+ """ The base class for various kinds of auditors. Specific audit implementations
+ should be built on top of this one. Audits can do all kinds of checks on the
+ system. The basic interface for callers is the `__call__` method, which
+ returns True if the audit passes and False if it fails.
+ """
+
+ def __init__(self, cm):
+ """ Create a new ClusterAudit instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ # pylint: disable=invalid-name
+ self._cm = cm
+ self.name = None
+
+ def __call__(self):
+ raise NotImplementedError
+
+ def is_applicable(self):
+ """ Return True if this audit is applicable in the current test configuration.
+ This method must be implemented by all subclasses.
+ """
+
+ raise NotImplementedError
+
+ def log(self, args):
+ """ Log a message """
+
+ self._cm.log("audit: %s" % args)
+
+ def debug(self, args):
+ """ Log a debug message """
+
+ self._cm.debug("audit: %s" % args)
+
+
+class LogAudit(ClusterAudit):
+ """ Audit each cluster node to verify that some logging system is usable.
+ This is done by logging a unique test message and then verifying that
+ we can read back that test message using logging tools.
+ """
+
+ def __init__(self, cm):
+ """ Create a new LogAudit instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ ClusterAudit.__init__(self, cm)
+ self.name = "LogAudit"
+
+ def _restart_cluster_logging(self, nodes=None):
+ """ Restart logging on the given nodes, or all if none are given """
+
+ if not nodes:
+ nodes = self._cm.env["nodes"]
+
+ self._cm.debug("Restarting logging on: %r" % nodes)
+
+ for node in nodes:
+ if self._cm.env["have_systemd"]:
+ (rc, _) = self._cm.rsh(node, "systemctl stop systemd-journald.socket")
+ if rc != 0:
+ self._cm.log("ERROR: Cannot stop 'systemd-journald' on %s" % node)
+
+ (rc, _) = self._cm.rsh(node, "systemctl start systemd-journald.service")
+ if rc != 0:
+ self._cm.log("ERROR: Cannot start 'systemd-journald' on %s" % node)
+
+ (rc, _) = self._cm.rsh(node, "service %s restart" % self._cm.env["syslogd"])
+ if rc != 0:
+ self._cm.log("ERROR: Cannot restart '%s' on %s" % (self._cm.env["syslogd"], node))
+
+ def _create_watcher(self, patterns, kind):
+ """ Create a new LogWatcher instance for the given patterns """
+
+ watch = LogWatcher(self._cm.env["LogFileName"], patterns,
+ self._cm.env["nodes"], kind, "LogAudit", 5,
+ silent=True)
+ watch.set_watch()
+ return watch
+
+ def _test_logging(self):
+ """ Perform the log audit """
+
+ patterns = []
+ prefix = "Test message from"
+ suffix = str(uuid.uuid4())
+ watch = {}
+
+ for node in self._cm.env["nodes"]:
+ # Look for the node name in two places to make sure
+ # that syslog is logging with the correct hostname
+ m = re.search("^([^.]+).*", node)
+ if m:
+ simple = m.group(1)
+ else:
+ simple = node
+
+ patterns.append("%s.*%s %s %s" % (simple, prefix, node, suffix))
+
+ watch_pref = self._cm.env["LogWatcher"]
+ if watch_pref == LogKind.ANY:
+ kinds = [LogKind.FILE]
+ if self._cm.env["have_systemd"]:
+ kinds += [LogKind.JOURNAL]
+ kinds += [LogKind.REMOTE_FILE]
+ for k in kinds:
+ watch[k] = self._create_watcher(patterns, k)
+ self._cm.log("Logging test message with identifier %s" % suffix)
+ else:
+ watch[watch_pref] = self._create_watcher(patterns, watch_pref)
+
+ for node in self._cm.env["nodes"]:
+ cmd = "logger -p %s.info %s %s %s" % (self._cm.env["SyslogFacility"], prefix, node, suffix)
+
+ (rc, _) = self._cm.rsh(node, cmd, synchronous=False, verbose=0)
+ if rc != 0:
+ self._cm.log("ERROR: Cannot execute remote command [%s] on %s" % (cmd, node))
+
+ for k in list(watch.keys()):
+ w = watch[k]
+ if watch_pref == LogKind.ANY:
+ self._cm.log("Checking for test message in %s logs" % k)
+ w.look_for_all(silent=True)
+ if w.unmatched:
+ for regex in w.unmatched:
+ self._cm.log("Test message [%s] not found in %s logs" % (regex, w.kind))
+ else:
+ if watch_pref == LogKind.ANY:
+ self._cm.log("Found test message in %s logs" % k)
+ self._cm.env["LogWatcher"] = k
+ return 1
+
+ return False
+
+ def __call__(self):
+ max_attempts = 3
+ attempt = 0
+
+ self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"])
+ while attempt <= max_attempts and not self._test_logging():
+ attempt += 1
+ self._restart_cluster_logging()
+ time.sleep(60*attempt)
+
+ if attempt > max_attempts:
+ self._cm.log("ERROR: Cluster logging unrecoverable.")
+ return False
+
+ return True
+
+ def is_applicable(self):
+ """ Return True if this audit is applicable in the current test configuration. """
+
+ if self._cm.env["LogAuditDisabled"]:
+ return False
+
+ return True
+
+
+class DiskAudit(ClusterAudit):
+ """ Audit disk usage on cluster nodes to verify that there is enough free
+ space left on whichever mounted file system holds the logs.
+
+ Warn on: less than 100 MB or 10% of free space
+ Error on: less than 10 MB or 5% of free space
+ """
+
+ def __init__(self, cm):
+ """ Create a new DiskAudit instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ ClusterAudit.__init__(self, cm)
+ self.name = "DiskspaceAudit"
+
+ def __call__(self):
+ result = True
+
+ # @TODO Use directory of PCMK_logfile if set on host
+ dfcmd = "df -BM %s | tail -1 | awk '{print $(NF-1)\" \"$(NF-2)}' | tr -d 'M%%'" % BuildOptions.LOG_DIR
+
+ self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"])
+ for node in self._cm.env["nodes"]:
+ (_, dfout) = self._cm.rsh(node, dfcmd, verbose=1)
+ if not dfout:
+ self._cm.log("ERROR: Cannot execute remote df command [%s] on %s" % (dfcmd, node))
+ continue
+
+ dfout = dfout[0].strip()
+
+ try:
+ (used, remain) = dfout.split()
+ used_percent = int(used)
+ remaining_mb = int(remain)
+ except (ValueError, TypeError):
+ self._cm.log("Warning: df output '%s' from %s was invalid [%s, %s]"
+ % (dfout, node, used, remain))
+ else:
+ if remaining_mb < 10 or used_percent > 95:
+ self._cm.log("CRIT: Out of log disk space on %s (%d%% / %dMB)"
+ % (node, used_percent, remaining_mb))
+ result = False
+
+ if not should_continue(self._cm.env):
+ raise ValueError("Disk full on %s" % node)
+
+ elif remaining_mb < 100 or used_percent > 90:
+ self._cm.log("WARN: Low on log disk space (%dMB) on %s" % (remaining_mb, node))
+
+ return result
+
+ def is_applicable(self):
+ """ Return True if this audit is applicable in the current test configuration. """
+
+ return True
+
+
+class FileAudit(ClusterAudit):
+ """ Audit the filesystem looking for various failure conditions:
+
+ * The presence of core dumps from corosync or Pacemaker daemons
+ * Stale IPC files
+ """
+
+ def __init__(self, cm):
+ """ Create a new FileAudit instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ ClusterAudit.__init__(self, cm)
+ self.known = []
+ self.name = "FileAudit"
+
+ def __call__(self):
+ result = True
+
+ self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"])
+ for node in self._cm.env["nodes"]:
+
+ (_, lsout) = self._cm.rsh(node, "ls -al /var/lib/pacemaker/cores/* | grep core.[0-9]", verbose=1)
+ for line in lsout:
+ line = line.strip()
+
+ if line not in self.known:
+ result = False
+ self.known.append(line)
+ self._cm.log("Warning: Pacemaker core file on %s: %s" % (node, line))
+
+ (_, lsout) = self._cm.rsh(node, "ls -al /var/lib/corosync | grep core.[0-9]", verbose=1)
+ for line in lsout:
+ line = line.strip()
+
+ if line not in self.known:
+ result = False
+ self.known.append(line)
+ self._cm.log("Warning: Corosync core file on %s: %s" % (node, line))
+
+ if self._cm.expected_status.get(node) == "down":
+ clean = False
+ (_, lsout) = self._cm.rsh(node, "ls -al /dev/shm | grep qb-", verbose=1)
+
+ for line in lsout:
+ result = False
+ clean = True
+ self._cm.log("Warning: Stale IPC file on %s: %s" % (node, line))
+
+ if clean:
+ (_, lsout) = self._cm.rsh(node, "ps axf | grep -e pacemaker -e corosync", verbose=1)
+
+ for line in lsout:
+ self._cm.debug("ps[%s]: %s" % (node, line))
+
+ self._cm.rsh(node, "rm -rf /dev/shm/qb-*")
+
+ else:
+ self._cm.debug("Skipping %s" % node)
+
+ return result
+
+ def is_applicable(self):
+ """ Return True if this audit is applicable in the current test configuration. """
+
+ return True
+
+
+class AuditResource:
+ """ A base class for storing information about a cluster resource """
+
+ def __init__(self, cm, line):
+ """ Create a new AuditResource instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ line -- One line of output from `crm_resource` describing a single
+ resource
+ """
+
+ # pylint: disable=invalid-name
+ fields = line.split()
+ self._cm = cm
+ self.line = line
+ self.type = fields[1]
+ self.id = fields[2]
+ self.clone_id = fields[3]
+ self.parent = fields[4]
+ self.rprovider = fields[5]
+ self.rclass = fields[6]
+ self.rtype = fields[7]
+ self.host = fields[8]
+ self.needs_quorum = fields[9]
+ self.flags = int(fields[10])
+ self.flags_s = fields[11]
+
+ if self.parent == "NA":
+ self.parent = None
+
+ @property
+ def unique(self):
+ """ Is this resource unique? """
+
+ return self.flags & 0x20
+
+ @property
+ def orphan(self):
+ """ Is this resource an orphan? """
+
+ return self.flags & 0x01
+
+ @property
+ def managed(self):
+ """ Is this resource managed by the cluster? """
+
+ return self.flags & 0x02
+
+
+class AuditConstraint:
+ """ A base class for storing information about a cluster constraint """
+
+ def __init__(self, cm, line):
+ """ Create a new AuditConstraint instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ line -- One line of output from `crm_resource` describing a single
+ constraint
+ """
+
+ # pylint: disable=invalid-name
+ fields = line.split()
+ self._cm = cm
+ self.line = line
+ self.type = fields[1]
+ self.id = fields[2]
+ self.rsc = fields[3]
+ self.target = fields[4]
+ self.score = fields[5]
+ self.rsc_role = fields[6]
+ self.target_role = fields[7]
+
+ if self.rsc_role == "NA":
+ self.rsc_role = None
+
+ if self.target_role == "NA":
+ self.target_role = None
+
+
+class PrimitiveAudit(ClusterAudit):
+ """ Audit primitive resources to verify a variety of conditions, including that
+ they are active and managed only when expected; they are active on the
+        expected cluster node; and that they are not orphaned.
+ """
+
+ def __init__(self, cm):
+ """ Create a new PrimitiveAudit instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ ClusterAudit.__init__(self, cm)
+ self.name = "PrimitiveAudit"
+
+ self._active_nodes = []
+ self._constraints = []
+ self._inactive_nodes = []
+ self._resources = []
+ self._target = None
+
+ def _audit_resource(self, resource, quorum):
+ """ Perform the audit of a single resource """
+
+ rc = True
+ active = self._cm.resource_location(resource.id)
+
+ if len(active) == 1:
+ if quorum:
+ self.debug("Resource %s active on %r" % (resource.id, active))
+
+ elif resource.needs_quorum == 1:
+ self._cm.log("Resource %s active without quorum: %r" % (resource.id, active))
+ rc = False
+
+ elif not resource.managed:
+ self._cm.log("Resource %s not managed. Active on %r" % (resource.id, active))
+
+ elif not resource.unique:
+ # TODO: Figure out a clever way to actually audit these resource types
+ if len(active) > 1:
+ self.debug("Non-unique resource %s is active on: %r" % (resource.id, active))
+ else:
+ self.debug("Non-unique resource %s is not active" % resource.id)
+
+ elif len(active) > 1:
+ self._cm.log("Resource %s is active multiple times: %r" % (resource.id, active))
+ rc = False
+
+ elif resource.orphan:
+ self.debug("Resource %s is an inactive orphan" % resource.id)
+
+ elif not self._inactive_nodes:
+ self._cm.log("WARN: Resource %s not served anywhere" % resource.id)
+ rc = False
+
+ elif self._cm.env["warn-inactive"]:
+ if quorum or not resource.needs_quorum:
+ self._cm.log("WARN: Resource %s not served anywhere (Inactive nodes: %r)"
+ % (resource.id, self._inactive_nodes))
+ else:
+ self.debug("Resource %s not served anywhere (Inactive nodes: %r)"
+ % (resource.id, self._inactive_nodes))
+
+ elif quorum or not resource.needs_quorum:
+ self.debug("Resource %s not served anywhere (Inactive nodes: %r)"
+ % (resource.id, self._inactive_nodes))
+
+ return rc
+
+ def _setup(self):
+ """ Verify cluster nodes are active, and collect resource and colocation
+ information used for performing the audit.
+ """
+
+ for node in self._cm.env["nodes"]:
+ if self._cm.expected_status[node] == "up":
+ self._active_nodes.append(node)
+ else:
+ self._inactive_nodes.append(node)
+
+ for node in self._cm.env["nodes"]:
+ if self._target is None and self._cm.expected_status[node] == "up":
+ self._target = node
+
+ if not self._target:
+ # TODO: In Pacemaker 1.0 clusters we'll be able to run crm_resource
+ # with CIB_file=/path/to/cib.xml even when the cluster isn't running
+ self.debug("No nodes active - skipping %s" % self.name)
+ return False
+
+ (_, lines) = self._cm.rsh(self._target, "crm_resource -c", verbose=1)
+
+ for line in lines:
+ if re.search("^Resource", line):
+ self._resources.append(AuditResource(self._cm, line))
+ elif re.search("^Constraint", line):
+ self._constraints.append(AuditConstraint(self._cm, line))
+ else:
+ self._cm.log("Unknown entry: %s" % line)
+
+ return True
+
+ def __call__(self):
+ result = True
+
+ if not self._setup():
+ return result
+
+ quorum = self._cm.has_quorum(None)
+ for resource in self._resources:
+ if resource.type == "primitive" and not self._audit_resource(resource, quorum):
+ result = False
+
+ return result
+
+ def is_applicable(self):
+ """ Return True if this audit is applicable in the current test configuration. """
+
+ # @TODO Due to long-ago refactoring, this name test would never match,
+ # so this audit (and those derived from it) would never run.
+ # Uncommenting the next lines fixes the name test, but that then
+ # exposes pre-existing bugs that need to be fixed.
+ #if self._cm["Name"] == "crm-corosync":
+ # return True
+ return False
+
+
+class GroupAudit(PrimitiveAudit):
+ """ Audit group resources to verify that each of its child primitive
+ resources is active on the expected cluster node.
+ """
+
+ def __init__(self, cm):
+ """ Create a new GroupAudit instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ PrimitiveAudit.__init__(self, cm)
+ self.name = "GroupAudit"
+
+ def __call__(self):
+ result = True
+
+ if not self._setup():
+ return result
+
+ for group in self._resources:
+ if group.type != "group":
+ continue
+
+ first_match = True
+ group_location = None
+
+ for child in self._resources:
+ if child.parent != group.id:
+ continue
+
+ nodes = self._cm.resource_location(child.id)
+
+ if first_match and len(nodes) > 0:
+ group_location = nodes[0]
+
+ first_match = False
+
+ if len(nodes) > 1:
+ result = False
+ self._cm.log("Child %s of %s is active more than once: %r"
+ % (child.id, group.id, nodes))
+
+ elif not nodes:
+ # Groups are allowed to be partially active
+ # However we do need to make sure later children aren't running
+ group_location = None
+ self.debug("Child %s of %s is stopped" % (child.id, group.id))
+
+ elif nodes[0] != group_location:
+ result = False
+ self._cm.log("Child %s of %s is active on the wrong node (%s) expected %s"
+ % (child.id, group.id, nodes[0], group_location))
+ else:
+ self.debug("Child %s of %s is active on %s" % (child.id, group.id, nodes[0]))
+
+ return result
+
+
+class CloneAudit(PrimitiveAudit):
+ """ Audit clone resources. NOTE: Currently, this class does not perform
+ any actual audit functions.
+ """
+
+ def __init__(self, cm):
+ """ Create a new CloneAudit instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ PrimitiveAudit.__init__(self, cm)
+ self.name = "CloneAudit"
+
+ def __call__(self):
+ result = True
+
+ if not self._setup():
+ return result
+
+ for clone in self._resources:
+ if clone.type != "clone":
+ continue
+
+ for child in self._resources:
+ if child.parent == clone.id and child.type == "primitive":
+ self.debug("Checking child %s of %s..." % (child.id, clone.id))
+ # Check max and node_max
+ # Obtain with:
+ # crm_resource -g clone_max --meta -r child.id
+ # crm_resource -g clone_node_max --meta -r child.id
+
+ return result
+
+
+class ColocationAudit(PrimitiveAudit):
+ """ Audit cluster resources to verify that those that should be colocated
+ with each other actually are.
+ """
+
+ def __init__(self, cm):
+ """ Create a new ColocationAudit instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ PrimitiveAudit.__init__(self, cm)
+ self.name = "ColocationAudit"
+
+ def _crm_location(self, resource):
+ """ Return a list of cluster nodes where a given resource is running """
+
+ (rc, lines) = self._cm.rsh(self._target, "crm_resource -W -r %s -Q" % resource, verbose=1)
+ hosts = []
+
+ if rc == 0:
+ for line in lines:
+ fields = line.split()
+ hosts.append(fields[0])
+
+ return hosts
+
+ def __call__(self):
+ result = True
+
+ if not self._setup():
+ return result
+
+ for coloc in self._constraints:
+ if coloc.type != "rsc_colocation":
+ continue
+
+ source = self._crm_location(coloc.rsc)
+ target = self._crm_location(coloc.target)
+
+ if not source:
+ self.debug("Colocation audit (%s): %s not running" % (coloc.id, coloc.rsc))
+ else:
+ for node in source:
+ if not node in target:
+ result = False
+ self._cm.log("Colocation audit (%s): %s running on %s (not in %r)"
+ % (coloc.id, coloc.rsc, node, target))
+ else:
+ self.debug("Colocation audit (%s): %s running on %s (in %r)"
+ % (coloc.id, coloc.rsc, node, target))
+
+ return result
+
+
+class ControllerStateAudit(ClusterAudit):
+ """ Audit cluster nodes to verify that those we expect to be active are
+ active, and those that are expected to be inactive are inactive.
+ """
+
+ def __init__(self, cm):
+ """ Create a new ControllerStateAudit instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ ClusterAudit.__init__(self, cm)
+ self.name = "ControllerStateAudit"
+
+ def __call__(self):
+ result = True
+ up_are_down = 0
+ down_are_up = 0
+ unstable_list = []
+
+ for node in self._cm.env["nodes"]:
+ should_be = self._cm.expected_status[node]
+ rc = self._cm.test_node_cm(node)
+
+ if rc > 0:
+ if should_be == "down":
+ down_are_up += 1
+
+ if rc == 1:
+ unstable_list.append(node)
+
+ elif should_be == "up":
+ up_are_down += 1
+
+ if len(unstable_list) > 0:
+ result = False
+ self._cm.log("Cluster is not stable: %d (of %d): %r"
+ % (len(unstable_list), self._cm.upcount(), unstable_list))
+
+ if up_are_down > 0:
+ result = False
+ self._cm.log("%d (of %d) nodes expected to be up were down."
+ % (up_are_down, len(self._cm.env["nodes"])))
+
+ if down_are_up > 0:
+ result = False
+ self._cm.log("%d (of %d) nodes expected to be down were up."
+ % (down_are_up, len(self._cm.env["nodes"])))
+
+ return result
+
+ def is_applicable(self):
+ """ Return True if this audit is applicable in the current test configuration. """
+
+ # @TODO Due to long-ago refactoring, this name test would never match,
+ # so this audit (and those derived from it) would never run.
+ # Uncommenting the next lines fixes the name test, but that then
+ # exposes pre-existing bugs that need to be fixed.
+ #if self._cm["Name"] == "crm-corosync":
+ # return True
+ return False
+
+
+class CIBAudit(ClusterAudit):
+ """ Audit the CIB by verifying that it is identical across cluster nodes """
+
+ def __init__(self, cm):
+ """ Create a new CIBAudit instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ ClusterAudit.__init__(self, cm)
+ self.name = "CibAudit"
+
+ def __call__(self):
+ result = True
+ ccm_partitions = self._cm.find_partitions()
+
+ if not ccm_partitions:
+ self.debug("\tNo partitions to audit")
+ return result
+
+ for partition in ccm_partitions:
+ self.debug("\tAuditing CIB consistency for: %s" % partition)
+
+ if self._audit_cib_contents(partition) == 0:
+ result = False
+
+ return result
+
+ def _audit_cib_contents(self, hostlist):
+ """ Perform the CIB audit on the given hosts """
+
+ passed = True
+ node0 = None
+ node0_xml = None
+
+ partition_hosts = hostlist.split()
+ for node in partition_hosts:
+ node_xml = self._store_remote_cib(node, node0)
+
+ if node_xml is None:
+ self._cm.log("Could not perform audit: No configuration from %s" % node)
+ passed = False
+
+ elif node0 is None:
+ node0 = node
+ node0_xml = node_xml
+
+ elif node0_xml is None:
+ self._cm.log("Could not perform audit: No configuration from %s" % node0)
+ passed = False
+
+ else:
+ (rc, result) = self._cm.rsh(
+ node0, "crm_diff -VV -cf --new %s --original %s" % (node_xml, node0_xml), verbose=1)
+
+ if rc != 0:
+ self._cm.log("Diff between %s and %s failed: %d" % (node0_xml, node_xml, rc))
+ passed = False
+
+ for line in result:
+ if not re.search("<diff/>", line):
+ passed = False
+ self.debug("CibDiff[%s-%s]: %s" % (node0, node, line))
+ else:
+ self.debug("CibDiff[%s-%s] Ignoring: %s" % (node0, node, line))
+
+ return passed
+
+ def _store_remote_cib(self, node, target):
+ """ Store a copy of the given node's CIB on the given target node. If
+ no target is given, store the CIB on the given node.
+ """
+
+ filename = "/tmp/ctsaudit.%s.xml" % node
+
+ if not target:
+ target = node
+
+ (rc, lines) = self._cm.rsh(node, self._cm["CibQuery"], verbose=1)
+ if rc != 0:
+ self._cm.log("Could not retrieve configuration")
+ return None
+
+ self._cm.rsh("localhost", "rm -f %s" % filename)
+ for line in lines:
+ self._cm.rsh("localhost", "echo \'%s\' >> %s" % (line[:-1], filename), verbose=0)
+
+ if self._cm.rsh.copy(filename, "root@%s:%s" % (target, filename), silent=True) != 0:
+ self._cm.log("Could not store configuration")
+ return None
+
+ return filename
+
+ def is_applicable(self):
+ """ Return True if this audit is applicable in the current test configuration. """
+
+ # @TODO Due to long-ago refactoring, this name test would never match,
+ # so this audit (and those derived from it) would never run.
+ # Uncommenting the next lines fixes the name test, but that then
+ # exposes pre-existing bugs that need to be fixed.
+ #if self._cm["Name"] == "crm-corosync":
+ # return True
+ return False
+
+
+class PartitionAudit(ClusterAudit):
+ """ Audit each partition in a cluster to verify a variety of conditions:
+
+ * The number of partitions and the nodes in each is as expected
+ * Each node is active when it should be active and inactive when it
+ should be inactive
+ * The status and epoch of each node is as expected
+ * A partition has quorum
+ * A partition has a DC when expected
+ """
+
+ def __init__(self, cm):
+ """ Create a new PartitionAudit instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ ClusterAudit.__init__(self, cm)
+ self.name = "PartitionAudit"
+
+ self._node_epoch = {}
+ self._node_state = {}
+ self._node_quorum = {}
+
+ def __call__(self):
+ result = True
+ ccm_partitions = self._cm.find_partitions()
+
+ if not ccm_partitions:
+ return result
+
+ self._cm.cluster_stable(double_check=True)
+
+ if len(ccm_partitions) != self._cm.partitions_expected:
+ self._cm.log("ERROR: %d cluster partitions detected:" % len(ccm_partitions))
+ result = False
+
+ for partition in ccm_partitions:
+ self._cm.log("\t %s" % partition)
+
+ for partition in ccm_partitions:
+ if self._audit_partition(partition) == 0:
+ result = False
+
+ return result
+
+ def _trim_string(self, avalue):
+ """ Remove the last character from a multi-character string """
+
+ if not avalue:
+ return None
+
+ if len(avalue) > 1:
+ return avalue[:-1]
+
+ return avalue
+
+ def _trim2int(self, avalue):
+ """ Remove the last character from a multi-character string and convert
+ the result to an int.
+ """
+
+ trimmed = self._trim_string(avalue)
+ if trimmed:
+ return int(trimmed)
+
+ return None
+
+ def _audit_partition(self, partition):
+ """ Perform the audit of a single partition """
+
+ passed = True
+ dc_found = []
+ dc_allowed_list = []
+ lowest_epoch = None
+ node_list = partition.split()
+
+ self.debug("Auditing partition: %s" % partition)
+ for node in node_list:
+ if self._cm.expected_status[node] != "up":
+ self._cm.log("Warn: Node %s appeared out of nowhere" % node)
+ self._cm.expected_status[node] = "up"
+ # not in itself a reason to fail the audit (not what we're
+ # checking for in this audit)
+
+ (_, out) = self._cm.rsh(node, self._cm["StatusCmd"] % node, verbose=1)
+ self._node_state[node] = out[0].strip()
+
+ (_, out) = self._cm.rsh(node, self._cm["EpochCmd"], verbose=1)
+ self._node_epoch[node] = out[0].strip()
+
+ (_, out) = self._cm.rsh(node, self._cm["QuorumCmd"], verbose=1)
+ self._node_quorum[node] = out[0].strip()
+
+ self.debug("Node %s: %s - %s - %s." % (node, self._node_state[node], self._node_epoch[node], self._node_quorum[node]))
+ self._node_state[node] = self._trim_string(self._node_state[node])
+ self._node_epoch[node] = self._trim2int(self._node_epoch[node])
+ self._node_quorum[node] = self._trim_string(self._node_quorum[node])
+
+ if not self._node_epoch[node]:
+ self._cm.log("Warn: Node %s dissappeared: cant determin epoch" % node)
+ self._cm.expected_status[node] = "down"
+ # not in itself a reason to fail the audit (not what we're
+ # checking for in this audit)
+ elif lowest_epoch is None or self._node_epoch[node] < lowest_epoch:
+ lowest_epoch = self._node_epoch[node]
+
+ if not lowest_epoch:
+ self._cm.log("Lowest epoch not determined in %s" % partition)
+ passed = False
+
+ for node in node_list:
+ if self._cm.expected_status[node] != "up":
+ continue
+
+ if self._cm.is_node_dc(node, self._node_state[node]):
+ dc_found.append(node)
+ if self._node_epoch[node] == lowest_epoch:
+ self.debug("%s: OK" % node)
+ elif not self._node_epoch[node]:
+ self.debug("Check on %s ignored: no node epoch" % node)
+ elif not lowest_epoch:
+ self.debug("Check on %s ignored: no lowest epoch" % node)
+ else:
+ self._cm.log("DC %s is not the oldest node (%d vs. %d)"
+ % (node, self._node_epoch[node], lowest_epoch))
+ passed = False
+
+ if not dc_found:
+ self._cm.log("DC not found on any of the %d allowed nodes: %s (of %s)"
+ % (len(dc_allowed_list), str(dc_allowed_list), str(node_list)))
+
+ elif len(dc_found) > 1:
+ self._cm.log("%d DCs (%s) found in cluster partition: %s"
+ % (len(dc_found), str(dc_found), str(node_list)))
+ passed = False
+
+ if not passed:
+ for node in node_list:
+ if self._cm.expected_status[node] == "up":
+ self._cm.log("epoch %s : %s"
+ % (self._node_epoch[node], self._node_state[node]))
+
+ return passed
+
+ def is_applicable(self):
+ """ Return True if this audit is applicable in the current test configuration. """
+
+ # @TODO Due to long-ago refactoring, this name test would never match,
+ # so this audit (and those derived from it) would never run.
+ # Uncommenting the next lines fixes the name test, but that then
+ # exposes pre-existing bugs that need to be fixed.
+ #if self._cm["Name"] == "crm-corosync":
+ # return True
+ return False
+
+
+# pylint: disable=invalid-name
+def audit_list(cm):
+ """ Return a list of instances of applicable audits that can be performed
+ for the given ClusterManager.
+ """
+
+ result = []
+
+ for auditclass in [DiskAudit, FileAudit, LogAudit, ControllerStateAudit,
+ PartitionAudit, PrimitiveAudit, GroupAudit, CloneAudit,
+ ColocationAudit, CIBAudit]:
+ a = auditclass(cm)
+ if a.is_applicable():
+ result.append(a)
+
+ return result
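
A hedged sketch of how a caller might drive these audits, based only on the interfaces defined above (audit_list() plus ClusterAudit.__call__ and .name); the run_audits() wrapper itself is hypothetical, not part of this diff:

    from pacemaker._cts.audits import audit_list

    def run_audits(cm):
        """ Run every applicable audit once; return True only if all of them pass """
        passed = True
        for audit in audit_list(cm):
            if not audit():
                cm.log("Audit %s failed" % audit.name)
                passed = False
        return passed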
diff --git a/python/pacemaker/_cts/cib.py b/python/pacemaker/_cts/cib.py
new file mode 100644
index 0000000..b8b5d5d
--- /dev/null
+++ b/python/pacemaker/_cts/cib.py
@@ -0,0 +1,425 @@
+""" CIB generator for Pacemaker's Cluster Test Suite (CTS) """
+
+__all__ = ["ConfigFactory"]
+__copyright__ = "Copyright 2008-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+import warnings
+import tempfile
+
+from pacemaker.buildoptions import BuildOptions
+from pacemaker._cts.cibxml import Alerts, Clone, Expression, FencingTopology, Group, Nodes, OpDefaults, Option, Resource, Rule
+from pacemaker._cts.network import next_ip
+
+
+class CIB:
+ """ A class for generating, representing, and installing a CIB file onto
+ cluster nodes
+ """
+
+ def __init__(self, cm, version, factory, tmpfile=None):
+ """ Create a new CIB instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ version -- The schema syntax version
+ factory -- A ConfigFactory instance
+ tmpfile -- Where to store the CIB, or None to use a new tempfile
+ """
+
+ # pylint: disable=invalid-name
+ self._cib = None
+ self._cm = cm
+ self._counter = 1
+ self._factory = factory
+ self._num_nodes = 0
+
+ self.version = version
+
+ if not tmpfile:
+ warnings.filterwarnings("ignore")
+
+ # pylint: disable=consider-using-with
+ f = tempfile.NamedTemporaryFile(delete=True)
+ f.close()
+ tmpfile = f.name
+
+ warnings.resetwarnings()
+
+ self._factory.tmpfile = tmpfile
+
+ def _show(self):
+ """ Query a cluster node for its generated CIB; log and return the result """
+
+ output = ""
+ (_, result) = self._factory.rsh(self._factory.target, "HOME=/root CIB_file=%s cibadmin -Ql" % self._factory.tmpfile, verbose=1)
+
+ for line in result:
+ output += line
+ self._factory.debug("Generated Config: %s" % line)
+
+ return output
+
+ def new_ip(self, name=None):
+ """ Generate an IP resource for the next available IP address, optionally
+ specifying the resource's name.
+ """
+
+ if self._cm.env["IPagent"] == "IPaddr2":
+ ip = next_ip(self._cm.env["IPBase"])
+ if not name:
+ if ":" in ip:
+ (_, _, suffix) = ip.rpartition(":")
+ name = "r%s" % suffix
+ else:
+ name = "r%s" % ip
+
+ r = Resource(self._factory, name, self._cm.env["IPagent"], "ocf")
+ r["ip"] = ip
+
+ if ":" in ip:
+ r["cidr_netmask"] = "64"
+ r["nic"] = "eth0"
+ else:
+ r["cidr_netmask"] = "32"
+
+ else:
+ if not name:
+ name = "r%s%d" % (self._cm.env["IPagent"], self._counter)
+ self._counter += 1
+
+ r = Resource(self._factory, name, self._cm.env["IPagent"], "ocf")
+
+ r.add_op("monitor", "5s")
+ return r
+
+ def get_node_id(self, node_name):
+ """ Check the cluster configuration for the node ID for the given node_name """
+
+ # We can't account for every possible configuration,
+ # so we only return a node ID if:
+ # * The node is specified in /etc/corosync/corosync.conf
+ # with "ring0_addr:" equal to node_name and "nodeid:"
+ # explicitly specified.
+ # In all other cases, we return 0.
+ node_id = 0
+
+ # awkward command: use } as record separator
+ # so each corosync.conf "object" is one record;
+ # match the "node {" record that has "ring0_addr: node_name";
+ # then print the substring of that record after "nodeid:"
+ awk = r"""awk -v RS="}" """ \
+ r"""'/^(\s*nodelist\s*{)?\s*node\s*{.*(ring0_addr|name):\s*%s(\s+|$)/""" \
+ r"""{gsub(/.*nodeid:\s*/,"");gsub(/\s+.*$/,"");print}' %s""" \
+ % (node_name, BuildOptions.COROSYNC_CONFIG_FILE)
+
+ (rc, output) = self._factory.rsh(self._factory.target, awk, verbose=1)
+
+ if rc == 0 and len(output) == 1:
+ try:
+ node_id = int(output[0])
+ except ValueError:
+ node_id = 0
+
+ return node_id
+
+ def install(self, target):
+ """ Generate a CIB file and install it to the given cluster node """
+
+ old = self._factory.tmpfile
+
+ # Force a rebuild
+ self._cib = None
+
+ self._factory.tmpfile = "%s/cib.xml" % BuildOptions.CIB_DIR
+ self.contents(target)
+ self._factory.rsh(self._factory.target, "chown %s %s" % (BuildOptions.DAEMON_USER, self._factory.tmpfile))
+
+ self._factory.tmpfile = old
+
+ def contents(self, target):
+ """ Generate a complete CIB file """
+
+ # fencing resource
+ if self._cib:
+ return self._cib
+
+ if target:
+ self._factory.target = target
+
+ self._factory.rsh(self._factory.target, "HOME=/root cibadmin --empty %s > %s" % (self.version, self._factory.tmpfile))
+ self._num_nodes = len(self._cm.env["nodes"])
+
+ no_quorum = "stop"
+ if self._num_nodes < 3:
+ no_quorum = "ignore"
+ self._factory.log("Cluster only has %d nodes, configuring: no-quorum-policy=ignore" % self._num_nodes)
+
+ # We don't need a nodes section unless we add attributes
+ stn = None
+
+ # Fencing resource
+ # Define first so that the shell doesn't reject every update
+ if self._cm.env["DoFencing"]:
+
+ # Define the "real" fencing device
+ st = Resource(self._factory, "Fencing", self._cm.env["stonith-type"], "stonith")
+
+ # Set a threshold for unreliable stonith devices such as the vmware one
+ st.add_meta("migration-threshold", "5")
+ st.add_op("monitor", "120s", timeout="120s")
+ st.add_op("stop", "0", timeout="60s")
+ st.add_op("start", "0", timeout="60s")
+
+ # For remote node tests, a cluster node is stopped and brought back up
+ # as a remote node with the name "remote-OLDNAME". To allow fencing
+ # devices to fence these nodes, create a list of all possible node names.
+ all_node_names = [prefix+n for n in self._cm.env["nodes"] for prefix in ('', 'remote-')]
+
+ # Add all parameters specified by user
+ entries = self._cm.env["stonith-params"].split(',')
+ for entry in entries:
+ try:
+ (name, value) = entry.split('=', 1)
+ except ValueError:
+ print("Warning: skipping invalid fencing parameter: %s" % entry)
+ continue
+
+ # Allow user to specify "all" as the node list, and expand it here
+ if name in ["hostlist", "pcmk_host_list"] and value == "all":
+ value = ' '.join(all_node_names)
+
+ st[name] = value
+
+ st.commit()
+
+ # Test advanced fencing logic
+ stf_nodes = []
+ stt_nodes = []
+ attr_nodes = {}
+
+ # Create the levels
+ stl = FencingTopology(self._factory)
+ for node in self._cm.env["nodes"]:
+ # Remote node tests will rename the node
+ remote_node = "remote-%s" % node
+
+ # Randomly assign node to a fencing method
+ ftype = self._cm.env.random_gen.choice(["levels-and", "levels-or ", "broadcast "])
+
+ # For levels-and, randomly choose targeting by node name or attribute
+ by = ""
+
+ if ftype == "levels-and":
+ node_id = self.get_node_id(node)
+
+ if node_id == 0 or self._cm.env.random_gen.choice([True, False]):
+ by = " (by name)"
+ else:
+ attr_nodes[node] = node_id
+ by = " (by attribute)"
+
+ self._cm.log(" - Using %s fencing for node: %s%s" % (ftype, node, by))
+
+ if ftype == "levels-and":
+ # If targeting by name, add a topology level for this node
+ if node not in attr_nodes:
+ stl.level(1, node, "FencingPass,Fencing")
+
+ # Always target remote nodes by name, otherwise we would need to add
+ # an attribute to the remote node only during remote tests (we don't
+ # want nonexistent remote nodes showing up in the non-remote tests).
+ # That complexity is not worth the effort.
+ stl.level(1, remote_node, "FencingPass,Fencing")
+
+ # Add the node (and its remote equivalent) to the list of levels-and nodes.
+ stt_nodes.extend([node, remote_node])
+
+ elif ftype == "levels-or ":
+ for n in [node, remote_node]:
+ stl.level(1, n, "FencingFail")
+ stl.level(2, n, "Fencing")
+
+ stf_nodes.extend([node, remote_node])
+
+ # If any levels-and nodes were targeted by attribute,
+ # create the attributes and a level for the attribute.
+ if attr_nodes:
+ stn = Nodes(self._factory)
+
+ for (node_name, node_id) in attr_nodes.items():
+ stn.add_node(node_name, node_id, {"cts-fencing": "levels-and"})
+
+ stl.level(1, None, "FencingPass,Fencing", "cts-fencing", "levels-and")
+
+ # Create a Dummy agent that always passes for levels-and
+ if stt_nodes:
+ stt = Resource(self._factory, "FencingPass", "fence_dummy", "stonith")
+ stt["pcmk_host_list"] = " ".join(stt_nodes)
+ # Wait this many seconds before doing anything, handy for letting disks get flushed too
+ stt["random_sleep_range"] = "30"
+ stt["mode"] = "pass"
+ stt.commit()
+
+ # Create a Dummy agent that always fails for levels-or
+ if stf_nodes:
+ stf = Resource(self._factory, "FencingFail", "fence_dummy", "stonith")
+ stf["pcmk_host_list"] = " ".join(stf_nodes)
+ # Wait this many seconds before doing anything, handy for letting disks get flushed too
+ stf["random_sleep_range"] = "30"
+ stf["mode"] = "fail"
+ stf.commit()
+
+ # Now commit the levels themselves
+ stl.commit()
+
+ o = Option(self._factory)
+ o["stonith-enabled"] = self._cm.env["DoFencing"]
+ o["start-failure-is-fatal"] = "false"
+ o["pe-input-series-max"] = "5000"
+ o["shutdown-escalation"] = "5min"
+ o["batch-limit"] = "10"
+ o["dc-deadtime"] = "5s"
+ o["no-quorum-policy"] = no_quorum
+
+ o.commit()
+
+ o = OpDefaults(self._factory)
+ o["timeout"] = "90s"
+ o.commit()
+
+ # Commit the nodes section if we defined one
+ if stn is not None:
+ stn.commit()
+
+ # Add an alerts section if possible
+ if self._factory.rsh.exists_on_all(self._cm.env["notification-agent"], self._cm.env["nodes"]):
+ alerts = Alerts(self._factory)
+ alerts.add_alert(self._cm.env["notification-agent"],
+ self._cm.env["notification-recipient"])
+ alerts.commit()
+
+ # Add resources?
+ if self._cm.env["CIBResource"]:
+ self.add_resources()
+
+ # generate cib
+ self._cib = self._show()
+
+ if self._factory.tmpfile != "%s/cib.xml" % BuildOptions.CIB_DIR:
+ self._factory.rsh(self._factory.target, "rm -f %s" % self._factory.tmpfile)
+
+ return self._cib
+
+ def add_resources(self):
+ """ Add various resources and their constraints to the CIB """
+
+ # Per-node resources
+ for node in self._cm.env["nodes"]:
+ name = "rsc_%s" % node
+ r = self.new_ip(name)
+ r.prefer(node, "100")
+ r.commit()
+
+ # Migrator
+ # Make this slightly sticky (since we have no other location constraints) to avoid relocation during Reattach
+ m = Resource(self._factory, "migrator", "Dummy", "ocf", "pacemaker")
+ m["passwd"] = "whatever"
+ m.add_meta("resource-stickiness", "1")
+ m.add_meta("allow-migrate", "1")
+ m.add_op("monitor", "P10S")
+ m.commit()
+
+ # Ping the test exerciser
+ p = Resource(self._factory, "ping-1", "ping", "ocf", "pacemaker")
+ p.add_op("monitor", "60s")
+ p["host_list"] = self._cm.env["cts-exerciser"]
+ p["name"] = "connected"
+ p["debug"] = "true"
+
+ c = Clone(self._factory, "Connectivity", p)
+ c["globally-unique"] = "false"
+ c.commit()
+
+ # promotable clone resource
+ s = Resource(self._factory, "stateful-1", "Stateful", "ocf", "pacemaker")
+ s.add_op("monitor", "15s", timeout="60s")
+ s.add_op("monitor", "16s", timeout="60s", role="Promoted")
+ ms = Clone(self._factory, "promotable-1", s)
+ ms["promotable"] = "true"
+ ms["clone-max"] = self._num_nodes
+ ms["clone-node-max"] = 1
+ ms["promoted-max"] = 1
+ ms["promoted-node-max"] = 1
+
+ # Require connectivity to run the promotable clone
+ r = Rule(self._factory, "connected", "-INFINITY", op="or")
+ r.add_child(Expression(self._factory, "m1-connected-1", "connected", "lt", "1"))
+ r.add_child(Expression(self._factory, "m1-connected-2", "connected", "not_defined", None))
+ ms.prefer("connected", rule=r)
+
+ ms.commit()
+
+ # Group Resource
+ g = Group(self._factory, "group-1")
+ g.add_child(self.new_ip())
+
+ if self._cm.env["have_systemd"]:
+ sysd = Resource(self._factory, "petulant", "pacemaker-cts-dummyd@10", "service")
+ sysd.add_op("monitor", "P10S")
+ g.add_child(sysd)
+ else:
+ g.add_child(self.new_ip())
+
+ g.add_child(self.new_ip())
+
+ # Make group depend on the promotable clone
+ g.after("promotable-1", first="promote", then="start")
+ g.colocate("promotable-1", "INFINITY", withrole="Promoted")
+
+ g.commit()
+
+ # LSB resource
+ lsb = Resource(self._factory, "lsb-dummy", "LSBDummy", "lsb")
+ lsb.add_op("monitor", "5s")
+
+ # LSB with group
+ lsb.after("group-1")
+ lsb.colocate("group-1")
+
+ lsb.commit()
+
+
+class ConfigFactory:
+ """ Singleton to generate a CIB file for the environment's schema version """
+
+ def __init__(self, cm):
+ """ Create a new ConfigFactory instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ # pylint: disable=invalid-name
+ self._cm = cm
+ self.rsh = self._cm.rsh
+ if not self._cm.env["ListTests"]:
+ self.target = self._cm.env["nodes"][0]
+ self.tmpfile = None
+
+ def log(self, args):
+ """ Log a message """
+
+ self._cm.log("cib: %s" % args)
+
+ def debug(self, args):
+ """ Log a debug message """
+
+ self._cm.debug("cib: %s" % args)
+
+ def create_config(self, name="pacemaker-%s" % BuildOptions.CIB_SCHEMA_VERSION):
+ """ Return a CIB object for the given schema version """
+
+ return CIB(self._cm, name, self)
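
A minimal usage sketch for the classes above, assuming cm is a ClusterManager instance; install_test_cib() is a hypothetical wrapper, and installing to the first node is only an example:

    from pacemaker._cts.cib import ConfigFactory

    def install_test_cib(cm):
        """ Build a CIB via ConfigFactory and install it on the first cluster node """
        factory = ConfigFactory(cm)
        cib = factory.create_config()      # CIB object for the default schema version
        cib.install(cm.env["nodes"][0])    # generates cib.xml and pushes it to that node
        return cib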
diff --git a/python/pacemaker/_cts/cibxml.py b/python/pacemaker/_cts/cibxml.py
new file mode 100644
index 0000000..52e3721
--- /dev/null
+++ b/python/pacemaker/_cts/cibxml.py
@@ -0,0 +1,734 @@
+""" CIB XML generator for Pacemaker's Cluster Test Suite (CTS) """
+
+__all__ = [
+ "Alerts",
+ "Clone",
+ "Expression",
+ "FencingTopology",
+ "Group",
+ "Nodes",
+ "OpDefaults",
+ "Option",
+ "Resource",
+ "Rule",
+]
+__copyright__ = "Copyright 2008-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+
+def key_val_string(**kwargs):
+ """ Given keyword arguments as key=value pairs, construct a single string
+ containing all those pairs separated by spaces. This is suitable for
+ using in an XML element as a list of its attributes.
+
+ Any pairs that have value=None will be skipped.
+
+ Note that a dictionary can be passed to this function instead of kwargs
+ by using a construction like:
+
+ key_val_string(**{"a": 1, "b": 2})
+ """
+
+ retval = ""
+
+ for (k, v) in kwargs.items():
+ if v is None:
+ continue
+
+ retval += ' %s="%s"' % (k, v)
+
+ return retval
+
+
+def element(element_name, **kwargs):
+ """ Create an XML element string with the given element_name and attributes.
+ This element does not support having any children, so it will be closed
+ on the same line. The attributes are processed by key_val_string.
+ """
+
+ return "<%s %s/>" % (element_name, key_val_string(**kwargs))
+
+
+def containing_element(element_name, inner, **kwargs):
+ """ Like element, but surrounds some child text passed by the inner
+ parameter.
+ """
+
+ attrs = key_val_string(**kwargs)
+ return "<%s %s>%s</%s>" % (element_name, attrs, inner, element_name)
+
+
+class XmlBase:
+ """ A base class for deriving all kinds of XML sections in the CIB. This
+ class contains only the most basic operations common to all sections.
+ It is up to subclasses to provide most behavior.
+
+ Note that subclasses of this base class often have different sets of
+ arguments to their __init__ methods. In general this is not a great
+ practice, however it is so thoroughly used in these classes that trying
+ to straighten it out is likely to cause more bugs than just leaving it
+ alone for now.
+ """
+
+ def __init__(self, factory, tag, _id, **kwargs):
+ """ Create a new XmlBase instance
+
+ Arguments:
+
+ factory -- A ConfigFactory instance
+ tag -- The XML element's start and end tag
+ _id -- A unique name for the element
+ kwargs -- Any additional key/value pairs that should be added to
+ this element as attributes
+ """
+
+ self._children = []
+ self._factory = factory
+ self._kwargs = kwargs
+ self._tag = tag
+
+ self.name = _id
+
+ def __repr__(self):
+ """ Return a short string description of this XML section """
+
+ return "%s-%s" % (self._tag, self.name)
+
+ def add_child(self, child):
+ """ Add an XML section as a child of this one """
+
+ self._children.append(child)
+
+ def __setitem__(self, key, value):
+ """ Add a key/value pair to this element, resulting in it becoming an
+ XML attribute. If value is None, remove the key.
+ """
+
+ if value:
+ self._kwargs[key] = value
+ else:
+ self._kwargs.pop(key, None)
+
+ def show(self):
+ """ Return a string representation of this XML section, including all
+ of its children
+ """
+
+ text = '''<%s''' % self._tag
+ if self.name:
+ text += ''' id="%s"''' % self.name
+
+ text += key_val_string(**self._kwargs)
+
+ if not self._children:
+ text += '''/>'''
+ return text
+
+ text += '''>'''
+
+ for c in self._children:
+ text += c.show()
+
+ text += '''</%s>''' % self._tag
+ return text
+
+ def _run(self, operation, xml, section, options=""):
+ """ Update the CIB on the cluster to include this XML section, including
+ all of its children
+
+ Arguments:
+
+ operation -- Whether this update is a "create" or "modify" operation
+ xml -- The XML to update the CIB with, typically the result
+ of calling show
+ section -- Which section of the CIB this update applies to (see
+ the --scope argument to cibadmin for allowed values)
+ options -- Extra options to pass to cibadmin
+ """
+
+ if self.name:
+ label = self.name
+ else:
+ label = "<%s>" % self._tag
+
+ self._factory.debug("Writing out %s" % label)
+
+ fixed = "HOME=/root CIB_file=%s" % self._factory.tmpfile
+ fixed += " cibadmin --%s --scope %s %s --xml-text '%s'" % (operation, section, options, xml)
+
+ (rc, _) = self._factory.rsh(self._factory.target, fixed)
+ if rc != 0:
+ raise RuntimeError("Configure call failed: %s" % fixed)
+
+
+class InstanceAttributes(XmlBase):
+ """ A class that creates an <instance_attributes> XML section with
+ key/value pairs
+ """
+
+ def __init__(self, factory, _id, attrs):
+ """ Create a new InstanceAttributes instance
+
+ Arguments:
+
+ factory -- A ConfigFactory instance
+ _id -- A unique name for the element
+ attrs -- Key/value pairs to add as nvpair child elements
+ """
+
+ XmlBase.__init__(self, factory, "instance_attributes", _id)
+
+ # Create an <nvpair> for each attribute
+ for (attr, value) in attrs.items():
+ self.add_child(XmlBase(factory, "nvpair", "%s-%s" % (_id, attr),
+ name=attr, value=value))
+
+
+class Node(XmlBase):
+ """ A class that creates a <node> XML section for a single node, complete
+ with node attributes
+ """
+
+ def __init__(self, factory, node_name, node_id, node_attrs):
+ """ Create a new Node instance
+
+ Arguments:
+
+ factory -- A ConfigFactory instance
+ node_name -- The value of the uname attribute for this node
+ node_id -- A unique name for the element
+ node_attrs -- Additional key/value pairs to set as instance
+ attributes for this node
+ """
+
+ XmlBase.__init__(self, factory, "node", node_id, uname=node_name)
+ self.add_child(InstanceAttributes(factory, "%s-1" % node_name, node_attrs))
+
+
+class Nodes(XmlBase):
+ """ A class that creates a <nodes> XML section containing multiple Node
+ instances as children
+ """
+
+ def __init__(self, factory):
+ """ Create a new Nodes instance
+
+ Arguments:
+
+ factory -- A ConfigFactory instance
+ """
+
+ XmlBase.__init__(self, factory, "nodes", None)
+
+ def add_node(self, node_name, node_id, node_attrs):
+ """ Add a child node element
+
+ Arguments:
+
+ node_name -- The value of the uname attribute for this node
+ node_id -- A unique name for the element
+ node_attrs -- Additional key/value pairs to set as instance
+ attributes for this node
+ """
+
+ self.add_child(Node(self._factory, node_name, node_id, node_attrs))
+
+ def commit(self):
+ """ Modify the CIB on the cluster to include this XML section """
+
+ self._run("modify", self.show(), "configuration", "--allow-create")
+
+
+class FencingTopology(XmlBase):
+ """ A class that creates a <fencing-topology> XML section describing how
+ fencing is configured in the cluster
+ """
+
+ def __init__(self, factory):
+ """ Create a new FencingTopology instance
+
+ Arguments:
+
+ factory -- A ConfigFactory instance
+ """
+
+ XmlBase.__init__(self, factory, "fencing-topology", None)
+
+ def level(self, index, target, devices, target_attr=None, target_value=None):
+ """ Generate a <fencing-level> XML element
+
+ index -- The order in which to attempt fencing-levels
+ (1 through 9). Levels are attempted in ascending
+ order until one succeeds.
+ target -- The name of a single node to which this level applies
+ devices -- A list of devices that must all be tried for this
+ level
+ target_attr -- The name of a node attribute that is set for nodes
+ to which this level applies
+ target_value -- The value of a node attribute that is set for nodes
+ to which this level applies
+ """
+
+ if target:
+ xml_id = "cts-%s.%d" % (target, index)
+ self.add_child(XmlBase(self._factory, "fencing-level", xml_id, target=target, index=index, devices=devices))
+
+ else:
+ xml_id = "%s-%s.%d" % (target_attr, target_value, index)
+ child = XmlBase(self._factory, "fencing-level", xml_id, index=index, devices=devices)
+ child["target-attribute"] = target_attr
+ child["target-value"] = target_value
+ self.add_child(child)
+
+ def commit(self):
+ """ Create this XML section in the CIB """
+
+ self._run("create", self.show(), "configuration", "--allow-create")
+
+
+class Option(XmlBase):
+ """ A class that creates a <cluster_property_set> XML section of key/value
+ pairs for cluster-wide configuration settings
+ """
+
+ def __init__(self, factory, _id="cib-bootstrap-options"):
+ """ Create a new Option instance
+
+ Arguments:
+
+ factory -- A ConfigFactory instance
+ _id -- A unique name for the element
+ """
+
+ XmlBase.__init__(self, factory, "cluster_property_set", _id)
+
+ def __setitem__(self, key, value):
+ """ Add a child nvpair element containing the given key/value pair """
+
+ self.add_child(XmlBase(self._factory, "nvpair", "cts-%s" % key, name=key, value=value))
+
+ def commit(self):
+ """ Modify the CIB on the cluster to include this XML section """
+
+ self._run("modify", self.show(), "crm_config", "--allow-create")
+
+
+class OpDefaults(XmlBase):
+ """ A class that creates a <cts-op_defaults-meta> XML section of key/value
+ pairs for operation default settings
+ """
+
+ def __init__(self, factory):
+ """ Create a new OpDefaults instance
+
+ Arguments:
+
+ factory -- A ConfigFactory instance
+ """
+
+ XmlBase.__init__(self, factory, "op_defaults", None)
+ self.meta = XmlBase(self._factory, "meta_attributes", "cts-op_defaults-meta")
+ self.add_child(self.meta)
+
+ def __setitem__(self, key, value):
+ """ Add a child nvpair meta_attribute element containing the given
+ key/value pair
+ """
+
+ self.meta.add_child(XmlBase(self._factory, "nvpair", "cts-op_defaults-%s" % key, name=key, value=value))
+
+ def commit(self):
+ """ Modify the CIB on the cluster to include this XML section """
+
+ self._run("modify", self.show(), "configuration", "--allow-create")
+
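+# A usage sketch (illustrative only): operation defaults follow the same
+# pattern as Option, but the nvpairs land under a meta_attributes child:
+#
+#   op_defaults = OpDefaults(factory)
+#   op_defaults["timeout"] = "90s"
+#   op_defaults.commit()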
+
+class Alerts(XmlBase):
+ """ A class that creates an <alerts> XML section """
+
+ def __init__(self, factory):
+ """ Create a new Alerts instance
+
+ Arguments:
+
+ factory -- A ConfigFactory instance
+ """
+
+ XmlBase.__init__(self, factory, "alerts", None)
+ self._alert_count = 0
+
+ def add_alert(self, path, recipient):
+ """ Create a new alert as a child of this XML section
+
+ Arguments:
+
+ path -- The path to a script to be called when a cluster
+ event occurs
+ recipient -- An environment variable to be passed to the script
+ """
+
+ self._alert_count += 1
+ alert = XmlBase(self._factory, "alert", "alert-%d" % self._alert_count,
+ path=path)
+ recipient1 = XmlBase(self._factory, "recipient",
+ "alert-%d-recipient-1" % self._alert_count,
+ value=recipient)
+ alert.add_child(recipient1)
+ self.add_child(alert)
+
+ def commit(self):
+ """ Modify the CIB on the cluster to include this XML section """
+
+ self._run("modify", self.show(), "configuration", "--allow-create")
+
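+# A usage sketch (illustrative only; the script path and recipient value are
+# hypothetical):
+#
+#   alerts = Alerts(factory)
+#   alerts.add_alert("/usr/local/bin/cluster-alert.sh", "/var/log/cluster-alerts.log")
+#   alerts.commit()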
+
+class Expression(XmlBase):
+ """ A class that creates an <expression> XML element as part of some
+ constraint rule
+ """
+
+ def __init__(self, factory, _id, attr, op, value=None):
+ """ Create a new Expression instance
+
+ Arguments:
+
+ factory -- A ConfigFactory instance
+ _id -- A unique name for the element
+ attr -- The attribute to be tested
+ op -- The comparison to perform ("lt", "eq", "defined", etc.)
+ value -- Value for comparison (can be None for "defined" and
+ "not_defined" operations)
+ """
+
+ XmlBase.__init__(self, factory, "expression", _id, attribute=attr, operation=op)
+ if value:
+ self["value"] = value
+
+
+class Rule(XmlBase):
+ """ A class that creates a <rule> XML section consisting of one or more
+ expressions, as part of some constraint
+ """
+
+ def __init__(self, factory, _id, score, op="and", expr=None):
+ """ Create a new Rule instance
+
+ Arguments:
+
+ factory -- A ConfigFactory instance
+ _id -- A unique name for the element
+ score -- If this rule is used in a location constraint and
+ evaluates to true, apply this score to the constraint
+ op -- If this rule contains more than one expression, use this
+ boolean op when evaluating
+ expr -- An Expression instance that can be added to this Rule
+ when it is created
+ """
+
+ XmlBase.__init__(self, factory, "rule", _id)
+
+ self["boolean-op"] = op
+ self["score"] = score
+
+ if expr:
+ self.add_child(expr)
+
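+# A usage sketch (illustrative only; the "pingd" node attribute is
+# hypothetical). The resulting Rule can be passed to Resource.prefer() via its
+# rule argument:
+#
+#   expr = Expression(factory, "connected-e", "pingd", "defined", None)
+#   rule = Rule(factory, "connected-r", "-INFINITY", op="and", expr=expr)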
+
+class Resource(XmlBase):
+ """ A base class that creates all kinds of <resource> XML sections fully
+ describing a single cluster resource. This defaults to primitive
+ resources, but subclasses can create other types.
+ """
+
+ def __init__(self, factory, _id, rtype, standard, provider=None):
+ """ Create a new Resource instance
+
+ Arguments:
+
+ factory -- A ConfigFactory instance
+ _id -- A unique name for the element
+ rtype -- The name of the resource agent
+ standard -- The standard the resource agent follows ("ocf",
+ "systemd", etc.)
+ provider -- The vendor providing the resource agent
+ """
+
+ XmlBase.__init__(self, factory, "native", _id)
+
+ self._provider = provider
+ self._rtype = rtype
+ self._standard = standard
+
+ self._meta = {}
+ self._op = []
+ self._param = {}
+
+ self._coloc = {}
+ self._needs = {}
+ self._scores = {}
+
+ if self._standard == "ocf" and not provider:
+ self._provider = "heartbeat"
+ elif self._standard == "lsb":
+ self._provider = None
+
+ def __setitem__(self, key, value):
+ """ Add a child nvpair element containing the given key/value pair as
+ an instance attribute
+ """
+
+ self._add_param(key, value)
+
+ def add_op(self, _id, interval, **kwargs):
+ """ Add an operation child XML element to this resource
+
+ Arguments:
+
+ _id -- A unique name for the element. Also, the action to
+ perform ("monitor", "start", "stop", etc.)
+ interval -- How frequently (in seconds) to perform the operation
+ kwargs -- Any additional key/value pairs that should be added to
+ this element as attributes
+ """
+
+ self._op.append(XmlBase(self._factory, "op", "%s-%s" % (_id, interval),
+ name=_id, interval=interval, **kwargs))
+
+ def _add_param(self, name, value):
+ """ Add a child nvpair element containing the given key/value pair as
+ an instance attribute
+ """
+
+ self._param[name] = value
+
+ def add_meta(self, name, value):
+ """ Add a child nvpair element containing the given key/value pair as
+ a meta attribute
+ """
+
+ self._meta[name] = value
+
+ def prefer(self, node, score="INFINITY", rule=None):
+ """ Add a location constraint where this resource prefers some node
+
+ Arguments:
+
+ node -- The name of the node to prefer
+ score -- Apply this score to the location constraint
+ rule -- A Rule instance to use in creating this constraint, instead
+ of creating a new rule
+ """
+
+ if not rule:
+ rule = Rule(self._factory, "prefer-%s-r" % node, score,
+ expr=Expression(self._factory, "prefer-%s-e" % node, "#uname", "eq", node))
+
+ self._scores[node] = rule
+
+ def after(self, resource, kind="Mandatory", first="start", then="start", **kwargs):
+ """ Create an ordering constraint between this resource and some other
+
+ Arguments:
+
+            resource -- The name of the resource that this resource depends
+                        on (the "first" resource in the ordering constraint)
+            kind     -- How to enforce the constraint ("Mandatory", "Optional",
+                        "Serialize")
+            first    -- The action that the depended-on resource must complete
+                        before the then-action can be initiated for this
+                        resource ("start", "stop", "promote", "demote")
+            then     -- The action that this resource can execute only after
+                        the first-action has completed (same values as first)
+ kwargs -- Any additional key/value pairs that should be added to
+ this element as attributes
+ """
+
+ kargs = kwargs.copy()
+ kargs["kind"] = kind
+
+ if then:
+ kargs["first-action"] = "start"
+ kargs["then-action"] = then
+
+ if first:
+ kargs["first-action"] = first
+
+ self._needs[resource] = kargs
+
+ def colocate(self, resource, score="INFINITY", role=None, withrole=None, **kwargs):
+ """ Create a colocation constraint between this resource and some other
+
+ Arguments:
+
+            resource -- The name of the resource that should be located
+                        relative to this one
+            score    -- Apply this score to the colocation constraint
+            role     -- Apply this colocation constraint only to promotable
+                        clones in this role ("Started", "Promoted",
+                        "Unpromoted")
+            withrole -- Apply this colocation constraint only to with-rsc
+                        promotable clones in this role
+ kwargs -- Any additional key/value pairs that should be added to
+ this element as attributes
+ """
+
+ kargs = kwargs.copy()
+ kargs["score"] = score
+
+ if role:
+ kargs["rsc-role"] = role
+
+ if withrole:
+ kargs["with-rsc-role"] = withrole
+
+ self._coloc[resource] = kargs
+
+ def _constraints(self):
+ """ Generate a <constraints> XML section containing all previously added
+ ordering and colocation constraints
+ """
+
+ text = "<constraints>"
+
+ for (k, v) in self._scores.items():
+ attrs = {"id": "prefer-%s" % k, "rsc": self.name}
+ text += containing_element("rsc_location", v.show(), **attrs)
+
+ for (k, kargs) in self._needs.items():
+ attrs = {"id": "%s-after-%s" % (self.name, k), "first": k, "then": self.name}
+ text += element("rsc_order", **attrs, **kargs)
+
+ for (k, kargs) in self._coloc.items():
+ attrs = {"id": "%s-with-%s" % (self.name, k), "rsc": self.name, "with-rsc": k}
+            text += element("rsc_colocation", **attrs, **kargs)
+
+ text += "</constraints>"
+ return text
+
+ def show(self):
+ """ Return a string representation of this XML section, including all
+ of its children
+ """
+
+ text = '''<primitive id="%s" class="%s" type="%s"''' % (self.name, self._standard, self._rtype)
+
+ if self._provider:
+ text += ''' provider="%s"''' % self._provider
+
+ text += '''>'''
+
+ if self._meta:
+ nvpairs = ""
+ for (p, v) in self._meta.items():
+ attrs = {"id": "%s-%s" % (self.name, p), "name": p, "value": v}
+ nvpairs += element("nvpair", **attrs)
+
+ text += containing_element("meta_attributes", nvpairs,
+ id="%s-meta" % self.name)
+
+ if self._param:
+ nvpairs = ""
+ for (p, v) in self._param.items():
+ attrs = {"id": "%s-%s" % (self.name, p), "name": p, "value": v}
+ nvpairs += element("nvpair", **attrs)
+
+ text += containing_element("instance_attributes", nvpairs,
+ id="%s-params" % self.name)
+
+ if self._op:
+ text += '''<operations>'''
+
+ for o in self._op:
+ key = o.name
+ o.name = "%s-%s" % (self.name, key)
+ text += o.show()
+ o.name = key
+
+ text += '''</operations>'''
+
+ text += '''</primitive>'''
+ return text
+
+ def commit(self):
+ """ Modify the CIB on the cluster to include this XML section """
+
+ self._run("create", self.show(), "resources")
+ self._run("modify", self._constraints(), "constraints")
+
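+# A usage sketch (illustrative only; the node name and the "base-rsc" resource
+# are hypothetical). commit() pushes both the primitive and its constraints:
+#
+#   rsc = Resource(factory, "ping-rsc", "ping", "ocf", provider="pacemaker")
+#   rsc["host_list"] = "192.168.1.1"
+#   rsc.add_op("monitor", "10s", timeout="30s")
+#   rsc.prefer("node1")
+#   rsc.after("base-rsc", kind="Mandatory")
+#   rsc.commit()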
+
+class Group(Resource):
+ """ A specialized Resource subclass that creates a <group> XML section
+ describing a single group resource consisting of multiple child
+ primitive resources
+ """
+
+ def __init__(self, factory, _id):
+ """ Create a new Group instance
+
+ Arguments:
+
+ factory -- A ConfigFactory instance
+ _id -- A unique name for the element
+ """
+
+ Resource.__init__(self, factory, _id, None, None)
+ self.tag = "group"
+
+ def __setitem__(self, key, value):
+ self.add_meta(key, value)
+
+ def show(self):
+ """ Return a string representation of this XML section, including all
+ of its children
+ """
+
+ text = '''<%s id="%s">''' % (self.tag, self.name)
+
+ if len(self._meta) > 0:
+ nvpairs = ""
+ for (p, v) in self._meta.items():
+ attrs = {"id": "%s-%s" % (self.name, p), "name": p, "value": v}
+ nvpairs += element("nvpair", **attrs)
+
+ text += containing_element("meta_attributes", nvpairs,
+ id="%s-meta" % self.name)
+
+ for c in self._children:
+ text += c.show()
+
+ text += '''</%s>''' % self.tag
+ return text
+
+
+class Clone(Group):
+ """ A specialized Group subclass that creates a <clone> XML section
+ describing a clone resource containing multiple instances of a
+ single primitive resource
+ """
+
+ def __init__(self, factory, _id, child=None):
+ """ Create a new Clone instance
+
+ Arguments:
+
+ factory -- A ConfigFactory instance
+ _id -- A unique name for the element
+ child -- A Resource instance that can be added to this Clone
+ when it is created. Alternately, use add_child later.
+ Note that a Clone may only have one child.
+ """
+
+ Group.__init__(self, factory, _id)
+ self.tag = "clone"
+
+ if child:
+ self.add_child(child)
+
+ def add_child(self, child):
+ """ Add the given resource as a child of this Clone. Note that a
+ Clone resource only supports one child at a time.
+ """
+
+ if not self._children:
+ self._children.append(child)
+ else:
+ self._factory.log("Clones can only have a single child. Ignoring %s" % child.name)
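+
+# A usage sketch (illustrative only): a Clone wraps exactly one child
+# primitive, and meta attributes are set through Group.__setitem__:
+#
+#   stateful = Resource(factory, "stateful-rsc", "Stateful", "ocf",
+#                       provider="pacemaker")
+#   clone = Clone(factory, "stateful-clone", child=stateful)
+#   clone["promotable"] = "true"
+#   clone.commit()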
diff --git a/python/pacemaker/_cts/clustermanager.py b/python/pacemaker/_cts/clustermanager.py
new file mode 100644
index 0000000..652108f
--- /dev/null
+++ b/python/pacemaker/_cts/clustermanager.py
@@ -0,0 +1,916 @@
+""" ClusterManager class for Pacemaker's Cluster Test Suite (CTS) """
+
+__all__ = ["ClusterManager"]
+__copyright__ = """Copyright 2000-2023 the Pacemaker project contributors.
+Certain portions by Huang Zhen <zhenhltc@cn.ibm.com> are copyright 2004
+International Business Machines. The version control history for this file
+may have further details."""
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+import os
+import re
+import time
+
+from collections import UserDict
+
+from pacemaker.buildoptions import BuildOptions
+from pacemaker._cts.CTS import NodeStatus
+from pacemaker._cts.audits import AuditResource
+from pacemaker._cts.cib import ConfigFactory
+from pacemaker._cts.environment import EnvFactory
+from pacemaker._cts.logging import LogFactory
+from pacemaker._cts.patterns import PatternSelector
+from pacemaker._cts.remote import RemoteFactory
+from pacemaker._cts.watcher import LogWatcher
+
+# pylint doesn't understand that self.rsh is callable (it stores the
+# singleton instance of RemoteExec, as returned by the getInstance method
+# of RemoteFactory). It's possible we could fix this with type annotations,
+# but those were introduced with python 3.5, and python 3.4 is the oldest
+# version we support.
+# I think we could also fix this by getting rid of the getInstance methods,
+# but that's a project for another day. For now, just disable the warning.
+# pylint: disable=not-callable
+
+# ClusterManager has a lot of methods.
+# pylint: disable=too-many-public-methods
+
+class ClusterManager(UserDict):
+ """ An abstract base class for managing the cluster. This class implements
+ high-level operations on the cluster and/or its cluster managers.
+ Actual cluster-specific management classes should be subclassed from this
+ one.
+
+ Among other things, this class tracks the state every node is expected to
+ be in.
+ """
+
+ def _final_conditions(self):
+ """ Check all keys to make sure they have a non-None value """
+
+ for (key, val) in self._data.items():
+ if val is None:
+ raise ValueError("Improper derivation: self[%s] must be overridden by subclass." % key)
+
+ def __init__(self):
+ """ Create a new ClusterManager instance. This class can be treated
+            kind of like a dictionary due to the presence of certain dict
+ functions like __getitem__ and __setitem__. This is because it
+ contains a lot of name/value pairs. However, it is not actually
+ a dictionary so do not rely on standard dictionary behavior.
+ """
+
+ # Eventually, ClusterManager should not be a UserDict subclass. Until
+ # that point...
+ # pylint: disable=super-init-not-called
+ self.__instance_errors_to_ignore = []
+
+ self._cib_installed = False
+ self._data = {}
+ self._logger = LogFactory()
+
+ self.env = EnvFactory().getInstance()
+ self.expected_status = {}
+ self.name = self.env["Name"]
+ # pylint: disable=invalid-name
+ self.ns = NodeStatus(self.env)
+ self.our_node = os.uname()[1].lower()
+ self.partitions_expected = 1
+ self.rsh = RemoteFactory().getInstance()
+ self.templates = PatternSelector(self.env["Name"])
+
+ self._final_conditions()
+
+ self._cib_factory = ConfigFactory(self)
+ self._cib = self._cib_factory.create_config(self.env["Schema"])
+ self._cib_sync = {}
+
+ def __getitem__(self, key):
+ if key == "Name":
+ return self.name
+
+ print("FIXME: Getting %s from %r" % (key, self))
+ if key in self._data:
+ return self._data[key]
+
+ return self.templates.get_patterns(key)
+
+ def __setitem__(self, key, value):
+ print("FIXME: Setting %s=%s on %r" % (key, value, self))
+ self._data[key] = value
+
+ def clear_instance_errors_to_ignore(self):
+ """ Reset instance-specific errors to ignore on each iteration """
+
+ self.__instance_errors_to_ignore = []
+
+ @property
+ def instance_errors_to_ignore(self):
+ """ Return a list of known errors that should be ignored for a specific
+ test instance
+ """
+
+ return self.__instance_errors_to_ignore
+
+ @property
+ def errors_to_ignore(self):
+ """ Return a list of known error messages that should be ignored """
+
+ return self.templates.get_patterns("BadNewsIgnore")
+
+ def log(self, args):
+ """ Log a message """
+
+ self._logger.log(args)
+
+ def debug(self, args):
+ """ Log a debug message """
+
+ self._logger.debug(args)
+
+ def upcount(self):
+ """ How many nodes are up? """
+
+ count = 0
+
+ for node in self.env["nodes"]:
+ if self.expected_status[node] == "up":
+ count += 1
+
+ return count
+
+ def install_support(self, command="install"):
+ """ Install or uninstall the CTS support files - various init scripts and data,
+ daemons, fencing agents, etc.
+ """
+
+ for node in self.env["nodes"]:
+ self.rsh(node, "%s/cts-support %s" % (BuildOptions.DAEMON_DIR, command))
+
+ def prepare_fencing_watcher(self):
+ """ Return a LogWatcher object that watches for fencing log messages """
+
+ # If we don't have quorum now but get it as a result of starting this node,
+ # then a bunch of nodes might get fenced
+ if self.has_quorum(None):
+ self.debug("Have quorum")
+ return None
+
+ if not self.templates["Pat:Fencing_start"]:
+ print("No start pattern")
+ return None
+
+ if not self.templates["Pat:Fencing_ok"]:
+ print("No ok pattern")
+ return None
+
+ stonith = None
+ stonith_pats = []
+ for peer in self.env["nodes"]:
+ if self.expected_status[peer] == "up":
+ continue
+
+ stonith_pats.extend([
+ self.templates["Pat:Fencing_ok"] % peer,
+ self.templates["Pat:Fencing_start"] % peer,
+ ])
+
+ stonith = LogWatcher(self.env["LogFileName"], stonith_pats, self.env["nodes"],
+ self.env["LogWatcher"], "StartupFencing", 0)
+ stonith.set_watch()
+ return stonith
+
+ def fencing_cleanup(self, node, stonith):
+ """ Wait for a previously fenced node to return to the cluster """
+
+ peer_list = []
+ peer_state = {}
+
+ self.debug("Looking for nodes that were fenced as a result of %s starting" % node)
+
+ # If we just started a node, we may now have quorum (and permission to fence)
+ if not stonith:
+ self.debug("Nothing to do")
+ return peer_list
+
+ q = self.has_quorum(None)
+ if not q and len(self.env["nodes"]) > 2:
+ # We didn't gain quorum - we shouldn't have shot anyone
+ self.debug("Quorum: %s Len: %d" % (q, len(self.env["nodes"])))
+ return peer_list
+
+ for n in self.env["nodes"]:
+ peer_state[n] = "unknown"
+
+ # Now see if any states need to be updated
+ self.debug("looking for: %r" % stonith.regexes)
+        shot = stonith.look(0)
+
+        # Make sure peer is defined even if a matched line maps to no known node
+        peer = None
+
+ while shot:
+ self.debug("Found: %r" % shot)
+ del stonith.regexes[stonith.whichmatch]
+
+ # Extract node name
+ for n in self.env["nodes"]:
+ if re.search(self.templates["Pat:Fencing_ok"] % n, shot):
+ peer = n
+ peer_state[peer] = "complete"
+ self.__instance_errors_to_ignore.append(self.templates["Pat:Fencing_ok"] % peer)
+
+ elif peer_state[n] != "complete" and re.search(self.templates["Pat:Fencing_start"] % n, shot):
+ # TODO: Correctly detect multiple fencing operations for the same host
+ peer = n
+ peer_state[peer] = "in-progress"
+ self.__instance_errors_to_ignore.append(self.templates["Pat:Fencing_start"] % peer)
+
+ if not peer:
+ self._logger.log("ERROR: Unknown stonith match: %r" % shot)
+
+ elif not peer in peer_list:
+ self.debug("Found peer: %s" % peer)
+ peer_list.append(peer)
+
+ # Get the next one
+ shot = stonith.look(60)
+
+ for peer in peer_list:
+ self.debug(" Peer %s was fenced as a result of %s starting: %s" % (peer, node, peer_state[peer]))
+ if self.env["at-boot"]:
+ self.expected_status[peer] = "up"
+ else:
+ self.expected_status[peer] = "down"
+
+ if peer_state[peer] == "in-progress":
+ # Wait for any in-progress operations to complete
+ shot = stonith.look(60)
+
+ while stonith.regexes and shot:
+ self.debug("Found: %r" % shot)
+ del stonith.regexes[stonith.whichmatch]
+ shot = stonith.look(60)
+
+ # Now make sure the node is alive too
+ self.ns.wait_for_node(peer, self.env["DeadTime"])
+
+ # Poll until it comes up
+ if self.env["at-boot"]:
+ if not self.stat_cm(peer):
+ time.sleep(self.env["StartTime"])
+
+ if not self.stat_cm(peer):
+ self._logger.log("ERROR: Peer %s failed to restart after being fenced" % peer)
+ return None
+
+ return peer_list
+
+ def start_cm(self, node, verbose=False):
+ """ Start up the cluster manager on a given node """
+
+ if verbose:
+ self._logger.log("Starting %s on node %s" % (self.templates["Name"], node))
+ else:
+ self.debug("Starting %s on node %s" % (self.templates["Name"], node))
+
+ if not node in self.expected_status:
+ self.expected_status[node] = "down"
+
+ if self.expected_status[node] != "down":
+ return True
+
+ # Technically we should always be able to notice ourselves starting
+ patterns = [
+ self.templates["Pat:Local_started"] % node,
+ ]
+
+ if self.upcount() == 0:
+ patterns.append(self.templates["Pat:DC_started"] % node)
+ else:
+ patterns.append(self.templates["Pat:NonDC_started"] % node)
+
+ watch = LogWatcher(self.env["LogFileName"], patterns, self.env["nodes"], self.env["LogWatcher"],
+ "StartaCM", self.env["StartTime"] + 10)
+
+ self.install_config(node)
+
+ self.expected_status[node] = "any"
+
+ if self.stat_cm(node) and self.cluster_stable(self.env["DeadTime"]):
+ self._logger.log("%s was already started" % node)
+ return True
+
+ stonith = self.prepare_fencing_watcher()
+ watch.set_watch()
+
+ (rc, _) = self.rsh(node, self.templates["StartCmd"])
+ if rc != 0:
+ self._logger.log("Warn: Start command failed on node %s" % node)
+ self.fencing_cleanup(node, stonith)
+ return False
+
+ self.expected_status[node] = "up"
+ watch_result = watch.look_for_all()
+
+ if watch.unmatched:
+ for regex in watch.unmatched:
+ self._logger.log("Warn: Startup pattern not found: %s" % regex)
+
+ if watch_result and self.cluster_stable(self.env["DeadTime"]):
+ self.fencing_cleanup(node, stonith)
+ return True
+
+ if self.stat_cm(node) and self.cluster_stable(self.env["DeadTime"]):
+ self.fencing_cleanup(node, stonith)
+ return True
+
+ self._logger.log("Warn: Start failed for node %s" % node)
+ return False
+
+ def start_cm_async(self, node, verbose=False):
+ """ Start up the cluster manager on a given node without blocking """
+
+ if verbose:
+ self._logger.log("Starting %s on node %s" % (self["Name"], node))
+ else:
+ self.debug("Starting %s on node %s" % (self["Name"], node))
+
+ self.install_config(node)
+ self.rsh(node, self.templates["StartCmd"], synchronous=False)
+ self.expected_status[node] = "up"
+
+ def stop_cm(self, node, verbose=False, force=False):
+ """ Stop the cluster manager on a given node """
+
+ if verbose:
+ self._logger.log("Stopping %s on node %s" % (self["Name"], node))
+ else:
+ self.debug("Stopping %s on node %s" % (self["Name"], node))
+
+ if self.expected_status[node] != "up" and not force:
+ return True
+
+ (rc, _) = self.rsh(node, self.templates["StopCmd"])
+ if rc == 0:
+ # Make sure we can continue even if corosync leaks
+ self.expected_status[node] = "down"
+ self.cluster_stable(self.env["DeadTime"])
+ return True
+
+ self._logger.log("ERROR: Could not stop %s on node %s" % (self["Name"], node))
+ return False
+
+ def stop_cm_async(self, node):
+ """ Stop the cluster manager on a given node without blocking """
+
+ self.debug("Stopping %s on node %s" % (self["Name"], node))
+
+ self.rsh(node, self.templates["StopCmd"], synchronous=False)
+ self.expected_status[node] = "down"
+
+ def startall(self, nodelist=None, verbose=False, quick=False):
+ """ Start the cluster manager on every node in the cluster, or on every
+ node in nodelist if not None
+ """
+
+ if not nodelist:
+ nodelist = self.env["nodes"]
+
+ for node in nodelist:
+ if self.expected_status[node] == "down":
+ self.ns.wait_for_all_nodes(nodelist, 300)
+
+ if not quick:
+ # This is used for "basic sanity checks", so only start one node ...
+ return self.start_cm(nodelist[0], verbose=verbose)
+
+ # Approximation of SimulStartList for --boot
+ watchpats = [
+ self.templates["Pat:DC_IDLE"],
+ ]
+ for node in nodelist:
+ watchpats.extend([
+ self.templates["Pat:InfraUp"] % node,
+ self.templates["Pat:PacemakerUp"] % node,
+ self.templates["Pat:Local_started"] % node,
+ self.templates["Pat:They_up"] % (nodelist[0], node),
+ ])
+
+ # Start all the nodes - at about the same time...
+ watch = LogWatcher(self.env["LogFileName"], watchpats, self.env["nodes"],
+ self.env["LogWatcher"], "fast-start", self.env["DeadTime"] + 10)
+ watch.set_watch()
+
+ if not self.start_cm(nodelist[0], verbose=verbose):
+ return False
+
+ for node in nodelist:
+ self.start_cm_async(node, verbose=verbose)
+
+ watch.look_for_all()
+ if watch.unmatched:
+ for regex in watch.unmatched:
+ self._logger.log("Warn: Startup pattern not found: %s" % regex)
+
+ if not self.cluster_stable():
+ self._logger.log("Cluster did not stabilize")
+ return False
+
+ return True
+
+ def stopall(self, nodelist=None, verbose=False, force=False):
+ """ Stop the cluster manager on every node in the cluster, or on every
+ node in nodelist if not None
+ """
+
+ ret = True
+
+ if not nodelist:
+ nodelist = self.env["nodes"]
+
+ for node in self.env["nodes"]:
+ if self.expected_status[node] == "up" or force:
+ if not self.stop_cm(node, verbose=verbose, force=force):
+ ret = False
+
+ return ret
+
+ def statall(self, nodelist=None):
+ """ Return the status of the cluster manager on every node in the cluster,
+ or on every node in nodelist if not None
+ """
+
+ result = {}
+
+ if not nodelist:
+ nodelist = self.env["nodes"]
+
+ for node in nodelist:
+ if self.stat_cm(node):
+ result[node] = "up"
+ else:
+ result[node] = "down"
+
+ return result
+
+ def isolate_node(self, target, nodes=None):
+ """ Break communication between the target node and all other nodes in the
+ cluster, or nodes if not None
+ """
+
+ if not nodes:
+ nodes = self.env["nodes"]
+
+ for node in nodes:
+ if node == target:
+ continue
+
+ (rc, _) = self.rsh(target, self.templates["BreakCommCmd"] % node)
+ if rc != 0:
+ self._logger.log("Could not break the communication between %s and %s: %d" % (target, node, rc))
+ return False
+
+ self.debug("Communication cut between %s and %s" % (target, node))
+
+ return True
+
+ def unisolate_node(self, target, nodes=None):
+ """ Re-establish communication between the target node and all other nodes
+ in the cluster, or nodes if not None
+ """
+
+ if not nodes:
+ nodes = self.env["nodes"]
+
+ for node in nodes:
+ if node == target:
+ continue
+
+ # Limit the amount of time we have asynchronous connectivity for
+ # Restore both sides as simultaneously as possible
+ self.rsh(target, self.templates["FixCommCmd"] % node, synchronous=False)
+ self.rsh(node, self.templates["FixCommCmd"] % target, synchronous=False)
+ self.debug("Communication restored between %s and %s" % (target, node))
+
+ def oprofile_start(self, node=None):
+ """ Start profiling on the given node, or all nodes in the cluster """
+
+ if not node:
+ for n in self.env["oprofile"]:
+ self.oprofile_start(n)
+
+ elif node in self.env["oprofile"]:
+ self.debug("Enabling oprofile on %s" % node)
+ self.rsh(node, "opcontrol --init")
+ self.rsh(node, "opcontrol --setup --no-vmlinux --separate=lib --callgraph=20 --image=all")
+ self.rsh(node, "opcontrol --start")
+ self.rsh(node, "opcontrol --reset")
+
+ def oprofile_save(self, test, node=None):
+ """ Save profiling data and restart profiling on the given node, or all
+ nodes in the cluster if None
+ """
+
+ if not node:
+ for n in self.env["oprofile"]:
+ self.oprofile_save(test, n)
+
+ elif node in self.env["oprofile"]:
+ self.rsh(node, "opcontrol --dump")
+ self.rsh(node, "opcontrol --save=cts.%d" % test)
+ # Read back with: opreport -l session:cts.0 image:<directory>/c*
+ self.oprofile_stop(node)
+ self.oprofile_start(node)
+
+ def oprofile_stop(self, node=None):
+        """ Stop profiling on the given node, or all nodes in the cluster. This
+ does not save profiling data, so call oprofile_save first if needed.
+ """
+
+ if not node:
+ for n in self.env["oprofile"]:
+ self.oprofile_stop(n)
+
+ elif node in self.env["oprofile"]:
+ self.debug("Stopping oprofile on %s" % node)
+ self.rsh(node, "opcontrol --reset")
+ self.rsh(node, "opcontrol --shutdown 2>&1 > /dev/null")
+
+ def install_config(self, node):
+ """ Remove and re-install the CIB on the first node in the cluster """
+
+ if not self.ns.wait_for_node(node):
+ self.log("Node %s is not up." % node)
+ return
+
+ if node in self._cib_sync or not self.env["ClobberCIB"]:
+ return
+
+ self._cib_sync[node] = True
+ self.rsh(node, "rm -f %s/cib*" % BuildOptions.CIB_DIR)
+
+ # Only install the CIB on the first node, all the other ones will pick it up from there
+ if self._cib_installed:
+ return
+
+ self._cib_installed = True
+ if self.env["CIBfilename"] is None:
+ self.log("Installing Generated CIB on node %s" % node)
+ self._cib.install(node)
+
+ else:
+ self.log("Installing CIB (%s) on node %s" % (self.env["CIBfilename"], node))
+
+ rc = self.rsh.copy(self.env["CIBfilename"], "root@" + (self.templates["CIBfile"] % node))
+
+ if rc != 0:
+ raise ValueError("Can not scp file to %s %d" % (node, rc))
+
+ self.rsh(node, "chown %s %s/cib.xml" % (BuildOptions.DAEMON_USER, BuildOptions.CIB_DIR))
+
+ def prepare(self):
+ """ Finish initialization by clearing out the expected status and recording
+ the current status of every node in the cluster
+ """
+
+ self.partitions_expected = 1
+ for node in self.env["nodes"]:
+ self.expected_status[node] = ""
+
+ if self.env["experimental-tests"]:
+ self.unisolate_node(node)
+
+ self.stat_cm(node)
+
+ def test_node_cm(self, node):
+ """ Check the status of a given node. Returns 0 if the node is
+ down, 1 if the node is up but unstable, and 2 if the node is
+ up and stable
+ """
+
+ watchpats = [
+ "Current ping state: (S_IDLE|S_NOT_DC)",
+ self.templates["Pat:NonDC_started"] % node,
+ self.templates["Pat:DC_started"] % node,
+ ]
+
+ idle_watch = LogWatcher(self.env["LogFileName"], watchpats, [node],
+ self.env["LogWatcher"], "ClusterIdle")
+ idle_watch.set_watch()
+
+ (_, out) = self.rsh(node, self.templates["StatusCmd"] % node, verbose=1)
+
+ if not out:
+ out = ""
+ else:
+ out = out[0].strip()
+
+ self.debug("Node %s status: '%s'" % (node, out))
+
+ if out.find('ok') < 0:
+ if self.expected_status[node] == "up":
+ self.log("Node status for %s is %s but we think it should be %s"
+ % (node, "down", self.expected_status[node]))
+
+ self.expected_status[node] = "down"
+ return 0
+
+ if self.expected_status[node] == "down":
+ self.log("Node status for %s is %s but we think it should be %s: %s"
+ % (node, "up", self.expected_status[node], out))
+
+ self.expected_status[node] = "up"
+
+ # check the output first - because syslog-ng loses messages
+ if out.find('S_NOT_DC') != -1:
+ # Up and stable
+ return 2
+
+ if out.find('S_IDLE') != -1:
+ # Up and stable
+ return 2
+
+ # fall back to syslog-ng and wait
+ if not idle_watch.look():
+ # just up
+ self.debug("Warn: Node %s is unstable: %s" % (node, out))
+ return 1
+
+ # Up and stable
+ return 2
+
+ def stat_cm(self, node):
+ """ Report the status of the cluster manager on a given node """
+
+ return self.test_node_cm(node) > 0
+
+ # Being up and being stable is not the same question...
+ def node_stable(self, node):
+ """ Return whether or not the given node is stable """
+
+ if self.test_node_cm(node) == 2:
+ return True
+
+ self.log("Warn: Node %s not stable" % node)
+ return False
+
+ def partition_stable(self, nodes, timeout=None):
+ """ Return whether or not all nodes in the given partition are stable """
+
+ watchpats = [
+ "Current ping state: S_IDLE",
+ self.templates["Pat:DC_IDLE"],
+ ]
+
+ self.debug("Waiting for cluster stability...")
+
+ if timeout is None:
+ timeout = self.env["DeadTime"]
+
+ if len(nodes) < 3:
+ self.debug("Cluster is inactive")
+ return True
+
+ idle_watch = LogWatcher(self.env["LogFileName"], watchpats, nodes.split(),
+ self.env["LogWatcher"], "ClusterStable", timeout)
+ idle_watch.set_watch()
+
+ for node in nodes.split():
+ # have each node dump its current state
+ self.rsh(node, self.templates["StatusCmd"] % node, verbose=1)
+
+ ret = idle_watch.look()
+
+ while ret:
+ self.debug(ret)
+
+ for node in nodes.split():
+ if re.search(node, ret):
+ return True
+
+ ret = idle_watch.look()
+
+ self.debug("Warn: Partition %r not IDLE after %ds" % (nodes, timeout))
+ return False
+
+ def cluster_stable(self, timeout=None, double_check=False):
+ """ Return whether or not all nodes in the cluster are stable """
+
+ partitions = self.find_partitions()
+
+ for partition in partitions:
+ if not self.partition_stable(partition, timeout):
+ return False
+
+ if not double_check:
+ return True
+
+ # Make sure we are really stable and that all resources,
+ # including those that depend on transient node attributes,
+ # are started if they were going to be
+ time.sleep(5)
+ for partition in partitions:
+ if not self.partition_stable(partition, timeout):
+ return False
+
+ return True
+
+ def is_node_dc(self, node, status_line=None):
+ """ Return whether or not the given node is the cluster DC by checking
+ the given status_line, or by querying the cluster if None
+ """
+
+ if not status_line:
+ (_, out) = self.rsh(node, self.templates["StatusCmd"] % node, verbose=1)
+
+ if out:
+ status_line = out[0].strip()
+
+ if not status_line:
+ return False
+
+ if status_line.find('S_IDLE') != -1:
+ return True
+
+ if status_line.find('S_INTEGRATION') != -1:
+ return True
+
+ if status_line.find('S_FINALIZE_JOIN') != -1:
+ return True
+
+ if status_line.find('S_POLICY_ENGINE') != -1:
+ return True
+
+ if status_line.find('S_TRANSITION_ENGINE') != -1:
+ return True
+
+ return False
+
+ def active_resources(self, node):
+ """ Return a list of primitive resources active on the given node """
+
+ (_, output) = self.rsh(node, "crm_resource -c", verbose=1)
+ resources = []
+ for line in output:
+ if not re.search("^Resource", line):
+ continue
+
+ tmp = AuditResource(self, line)
+ if tmp.type == "primitive" and tmp.host == node:
+ resources.append(tmp.id)
+
+ return resources
+
+ def resource_location(self, rid):
+ """ Return a list of nodes on which the given resource is running """
+
+ resource_nodes = []
+ for node in self.env["nodes"]:
+ if self.expected_status[node] != "up":
+ continue
+
+ cmd = self.templates["RscRunning"] % rid
+ (rc, lines) = self.rsh(node, cmd)
+
+ if rc == 127:
+ self.log("Command '%s' failed. Binary or pacemaker-cts package not installed?" % cmd)
+ for line in lines:
+ self.log("Output: %s " % line)
+
+ elif rc == 0:
+ resource_nodes.append(node)
+
+ return resource_nodes
+
+ def find_partitions(self):
+ """ Return a list of all partitions in the cluster. Each element of the
+ list is itself a list of all active nodes in that partition.
+ """
+
+ ccm_partitions = []
+
+ for node in self.env["nodes"]:
+ if self.expected_status[node] != "up":
+ self.debug("Node %s is down... skipping" % node)
+ continue
+
+ (_, out) = self.rsh(node, self.templates["PartitionCmd"], verbose=1)
+
+ if not out:
+ self.log("no partition details for %s" % node)
+ continue
+
+ partition = out[0].strip()
+
+ if len(partition) <= 2:
+ self.log("bad partition details for %s" % node)
+ continue
+
+ nodes = partition.split()
+ nodes.sort()
+ partition = ' '.join(nodes)
+
+ found = 0
+ for a_partition in ccm_partitions:
+ if partition == a_partition:
+ found = 1
+
+ if found == 0:
+ self.debug("Adding partition from %s: %s" % (node, partition))
+ ccm_partitions.append(partition)
+ else:
+ self.debug("Partition '%s' from %s is consistent with existing entries" % (partition, node))
+
+ self.debug("Found partitions: %r" % ccm_partitions)
+ return ccm_partitions
+
+ def has_quorum(self, node_list):
+ """ Return whether or not the cluster has quorum """
+
+ # If we are auditing a partition, then one side will
+ # have quorum and the other not.
+ # So the caller needs to tell us which we are checking
+ # If no value for node_list is specified... assume all nodes
+ if not node_list:
+ node_list = self.env["nodes"]
+
+ for node in node_list:
+ if self.expected_status[node] != "up":
+ continue
+
+ (_, quorum) = self.rsh(node, self.templates["QuorumCmd"], verbose=1)
+ quorum = quorum[0].strip()
+
+ if quorum.find("1") != -1:
+ return True
+
+ if quorum.find("0") != -1:
+ return False
+
+ self.debug("WARN: Unexpected quorum test result from %s:%s" % (node, quorum))
+
+ return False
+
+ @property
+ def components(self):
+ """ A list of all patterns that should be ignored for the cluster's
+ components. This must be provided by all subclasses.
+ """
+
+ raise NotImplementedError
+
+ def in_standby_mode(self, node):
+ """ Return whether or not the node is in Standby """
+
+ (_, out) = self.rsh(node, self.templates["StandbyQueryCmd"] % node, verbose=1)
+
+ if not out:
+ return False
+
+ out = out[0].strip()
+ self.debug("Standby result: %s" % out)
+ return out == "on"
+
+ def set_standby_mode(self, node, status):
+ """ Set node to Standby if status is True, or Active if status is False.
+ Return whether the node is now in the requested status.
+ """
+
+ current_status = self.in_standby_mode(node)
+
+ if current_status == status:
+ return True
+
+ if status:
+ cmd = self.templates["StandbyCmd"] % (node, "on")
+ else:
+ cmd = self.templates["StandbyCmd"] % (node, "off")
+
+ (rc, _) = self.rsh(node, cmd)
+ return rc == 0
+
+ def add_dummy_rsc(self, node, rid):
+ """ Add a dummy resource with the given ID to the given node """
+
+ rsc_xml = """ '<resources>
+ <primitive class=\"ocf\" id=\"%s\" provider=\"pacemaker\" type=\"Dummy\">
+ <operations>
+                        <op id=\"%s-interval-10s\" interval=\"10s\" name=\"monitor\"/>
+ </operations>
+ </primitive>
+ </resources>'""" % (rid, rid)
+ constraint_xml = """ '<constraints>
+ <rsc_location id=\"location-%s-%s\" node=\"%s\" rsc=\"%s\" score=\"INFINITY\"/>
+ </constraints>'
+ """ % (rid, node, node, rid)
+
+ self.rsh(node, self.templates['CibAddXml'] % rsc_xml)
+ self.rsh(node, self.templates['CibAddXml'] % constraint_xml)
+
+ def remove_dummy_rsc(self, node, rid):
+ """ Remove the previously added dummy resource given by rid on the
+ given node
+ """
+
+ constraint = "\"//rsc_location[@rsc='%s']\"" % rid
+ rsc = "\"//primitive[@id='%s']\"" % rid
+
+ self.rsh(node, self.templates['CibDelXpath'] % constraint)
+ self.rsh(node, self.templates['CibDelXpath'] % rsc)
diff --git a/python/pacemaker/_cts/cmcorosync.py b/python/pacemaker/_cts/cmcorosync.py
new file mode 100644
index 0000000..cac059b
--- /dev/null
+++ b/python/pacemaker/_cts/cmcorosync.py
@@ -0,0 +1,80 @@
+""" Corosync-specific class for Pacemaker's Cluster Test Suite (CTS) """
+
+__all__ = ["Corosync2"]
+__copyright__ = "Copyright 2007-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+from pacemaker._cts.CTS import Process
+from pacemaker._cts.clustermanager import ClusterManager
+from pacemaker._cts.patterns import PatternSelector
+
+# Throughout this file, pylint has trouble understanding that EnvFactory
+# is a singleton instance that can be treated as a subscriptable object.
+# Various warnings are disabled because of this. See also a comment about
+# self._rsh in environment.py.
+# pylint: disable=unsubscriptable-object
+
+class Corosync2(ClusterManager):
+ """ A subclass of ClusterManager specialized to handle corosync2 and later
+ based clusters
+ """
+
+ def __init__(self):
+ """ Create a new Corosync2 instance """
+
+ ClusterManager.__init__(self)
+
+ self._fullcomplist = {}
+ self.templates = PatternSelector(self.name)
+
+ @property
+ def components(self):
+ """ A list of all patterns that should be ignored for the cluster's
+ components.
+ """
+
+ complist = []
+
+ if not self._fullcomplist:
+ common_ignore = self.templates.get_component("common-ignore")
+
+ daemons = [
+ "pacemaker-based",
+ "pacemaker-controld",
+ "pacemaker-attrd",
+ "pacemaker-execd",
+ "pacemaker-fenced"
+ ]
+ for c in daemons:
+ badnews = self.templates.get_component("%s-ignore" % c) + common_ignore
+ proc = Process(self, c, pats=self.templates.get_component(c),
+ badnews_ignore=badnews)
+ self._fullcomplist[c] = proc
+
+ # the scheduler uses dc_pats instead of pats
+ badnews = self.templates.get_component("pacemaker-schedulerd-ignore") + common_ignore
+ proc = Process(self, "pacemaker-schedulerd",
+ dc_pats=self.templates.get_component("pacemaker-schedulerd"),
+ badnews_ignore=badnews)
+ self._fullcomplist["pacemaker-schedulerd"] = proc
+
+ # add (or replace) extra components
+ badnews = self.templates.get_component("corosync-ignore") + common_ignore
+ proc = Process(self, "corosync", pats=self.templates.get_component("corosync"),
+ badnews_ignore=badnews)
+ self._fullcomplist["corosync"] = proc
+
+ # Processes running under valgrind can't be shot with "killall -9 processname",
+ # so don't include them in the returned list
+ vgrind = self.env["valgrind-procs"].split()
+ for (key, val) in self._fullcomplist.items():
+ if self.env["valgrind-tests"] and key in vgrind:
+ self.log("Filtering %s from the component list as it is being profiled by valgrind" % key)
+ continue
+
+ if key == "pacemaker-fenced" and not self.env["DoFencing"]:
+ continue
+
+ complist.append(val)
+
+ return complist
diff --git a/python/pacemaker/_cts/environment.py b/python/pacemaker/_cts/environment.py
index e4d70e6..732ab24 100644
--- a/python/pacemaker/_cts/environment.py
+++ b/python/pacemaker/_cts/environment.py
@@ -11,6 +11,7 @@ import socket
import sys
import time
+from pacemaker.buildoptions import BuildOptions
from pacemaker._cts.logging import LogFactory
from pacemaker._cts.remote import RemoteFactory
from pacemaker._cts.watcher import LogKind
@@ -31,7 +32,7 @@ class Environment:
def __init__(self, args):
""" Create a new Environment instance. This class can be treated kind
of like a dictionary due to the presence of typical dict functions
- like has_key, __getitem__, and __setitem__. However, it is not a
+ like __contains__, __getitem__, and __setitem__. However, it is not a
dictionary so do not rely on standard dictionary behavior.
Arguments:
@@ -100,7 +101,7 @@ class Environment:
return list(self.data.keys())
- def has_key(self, key):
+ def __contains__(self, key):
""" Does the given environment key exist? """
if key == "nodes":
@@ -120,10 +121,7 @@ class Environment:
if key == "Name":
return self._get_stack_short()
- if key in self.data:
- return self.data[key]
-
- return None
+ return self.data.get(key)
def __setitem__(self, key, value):
""" Set the given environment key to the given value, overriding any
@@ -161,6 +159,14 @@ class Environment:
return self.random_gen.choice(self["nodes"])
+ def get(self, key, default=None):
+ """ Return the value for key if key is in the environment, else default """
+
+ if key == "nodes":
+ return self._nodes
+
+ return self.data.get(key, default)
+
def _set_stack(self, name):
""" Normalize the given cluster stack name """
@@ -279,7 +285,7 @@ class Environment:
# pylint: disable=no-member
if int(self["IPBase"].split('.')[3]) >= 240:
self._logger.log("Could not determine an offset for IPaddr resources. Upper bound is too high: %s %s"
- % (self["IPBase"], self["IPBase"].split('.')[3]))
+ % (self["IPBase"], self["IPBase"].split('.')[3]))
self["IPBase"] = " fe80::1234:56:7890:1000"
self._logger.log("Defaulting to '%s', use --test-ip-base to override" % self["IPBase"])
@@ -294,7 +300,7 @@ class Environment:
# it as an int in __init__ and treat it as an int everywhere.
# pylint: disable=bad-string-format-type
self._logger.log("Limiting the number of nodes configured=%d (max=%d)"
- %(len(self["nodes"]), self["node-limit"]))
+ % (len(self["nodes"]), self["node-limit"]))
while len(self["nodes"]) > self["node-limit"]:
self["nodes"].pop(len(self["nodes"])-1)
@@ -398,15 +404,9 @@ class Environment:
grp4.add_argument("--boot",
action="store_true",
help="")
- grp4.add_argument("--bsc",
- action="store_true",
- help="")
grp4.add_argument("--cib-filename",
metavar="PATH",
help="Install the given CIB file to the cluster")
- grp4.add_argument("--container-tests",
- action="store_true",
- help="Include pacemaker_remote tests that run in lxc container resources")
grp4.add_argument("--experimental-tests",
action="store_true",
help="Include experimental tests")
@@ -438,7 +438,7 @@ class Environment:
help="Use QARSH to access nodes instead of SSH")
grp4.add_argument("--schema",
metavar="SCHEMA",
- default="pacemaker-3.0",
+ default="pacemaker-%s" % BuildOptions.CIB_SCHEMA_VERSION,
help="Create a CIB conforming to the given schema")
grp4.add_argument("--seed",
metavar="SEED",
@@ -491,7 +491,6 @@ class Environment:
self["at-boot"] = args.at_boot in ["1", "yes"]
self["benchmark"] = args.benchmark
self["continue"] = args.always_continue
- self["container-tests"] = args.container_tests
self["experimental-tests"] = args.experimental_tests
self["iterations"] = args.iterations
self["loop-minutes"] = args.loop_minutes
@@ -542,10 +541,6 @@ class Environment:
if args.boot:
self["scenario"] = "boot"
- if args.bsc:
- self["DoBSC"] = True
- self["scenario"] = "basic-sanity"
-
if args.cib_filename:
self["CIBfilename"] = args.cib_filename
else:
diff --git a/python/pacemaker/_cts/input.py b/python/pacemaker/_cts/input.py
new file mode 100644
index 0000000..7e734f6
--- /dev/null
+++ b/python/pacemaker/_cts/input.py
@@ -0,0 +1,18 @@
+""" User input related utilities for CTS """
+
+__all__ = ["should_continue"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+def should_continue(env):
+ """ On failure, prompt the user to see if we should continue """
+
+ if env["continue"]:
+ return True
+
+ try:
+ answer = input("Continue? [yN]")
+ except EOFError:
+ answer = "n"
+
+ return answer in ["y", "Y"]
diff --git a/python/pacemaker/_cts/logging.py b/python/pacemaker/_cts/logging.py
index d9f3012..6c7bfb0 100644
--- a/python/pacemaker/_cts/logging.py
+++ b/python/pacemaker/_cts/logging.py
@@ -21,7 +21,7 @@ class Logger:
self._logfile = filename
if tag:
- self._source = tag + ": "
+ self._source = "%s: " % tag
else:
self._source = ""
diff --git a/python/pacemaker/_cts/network.py b/python/pacemaker/_cts/network.py
new file mode 100644
index 0000000..33e401f
--- /dev/null
+++ b/python/pacemaker/_cts/network.py
@@ -0,0 +1,59 @@
+""" Network related utilities for CTS """
+
+__all__ = ["next_ip"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+# pylint: disable=global-statement
+CURRENT_IP = None
+
+def next_ip(ip_base=None, reset=False):
+ """ Return the next available IP address.
+
+ Arguments:
+
+ ip_base -- The initial IP address to start from. The first call to next_ip
+ will return the next IP address from this base. Each subsequent
+ call will return the next address from the previous call, so you
+ can just omit this argument for subsequent calls.
+ reset -- Force next_ip to start from ip_base again. This requires also
+ passing the ip_base argument. (Mostly useful for unit testing,
+ but may be useful elsewhere).
+
+ This function only increments the last portion of the IP address. Once it
+ has hit the upper limit, ValueError will be raised.
+ """
+
+ global CURRENT_IP
+
+ if CURRENT_IP is None or reset:
+ CURRENT_IP = ip_base
+
+ new_ip = None
+
+ # Split the existing IP address up into a tuple of:
+ # (everything except the last part of the addr, the separator, the last part of the addr).
+ # For instance, "192.168.1.2" becomes ("192.168.1", ".", "2"). Then,
+ # increment the last part of the address and paste everything back
+ # together.
+ if ":" in CURRENT_IP:
+ # This is an IPv6 address
+ fields = CURRENT_IP.rpartition(":")
+ new_ip = int(fields[2], 16) + 1
+
+ if new_ip > 0xffff:
+ raise ValueError("No more available IP addresses")
+
+ # hex() puts "0x" at the front of the string, so strip it off.
+ new_ip = hex(new_ip)[2:]
+
+ else:
+ # This is an IPv4 address
+ fields = CURRENT_IP.rpartition(".")
+ new_ip = int(fields[2]) + 1
+
+ if new_ip > 255:
+ raise ValueError("No more available IP addresses")
+
+ CURRENT_IP = "%s%s%s" % (fields[0], fields[1], new_ip)
+ return CURRENT_IP
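+
+# A usage sketch (illustrative only; the addresses are hypothetical):
+#
+#   next_ip("192.168.1.2", reset=True)               # -> "192.168.1.3"
+#   next_ip()                                        # -> "192.168.1.4"
+#   next_ip("fe80::fc54:ff:fe09:101", reset=True)    # -> "fe80::fc54:ff:fe09:102"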
diff --git a/python/pacemaker/_cts/patterns.py b/python/pacemaker/_cts/patterns.py
index 880477a..0fb1c2b 100644
--- a/python/pacemaker/_cts/patterns.py
+++ b/python/pacemaker/_cts/patterns.py
@@ -220,7 +220,7 @@ class Corosync2Patterns(BasePatterns):
r"pending LRM operations at shutdown",
r"Lost connection to the CIB manager",
r"pacemaker-controld.*:\s*Action A_RECOVER .* not supported",
- r"pacemaker-controld.*:\s*Performing A_EXIT_1 - forcefully exiting ",
+ r"pacemaker-controld.*:\s*Exiting now due to errors",
r".*:\s*Requesting fencing \([^)]+\) targeting node ",
r"(Blackbox dump requested|Problem detected)",
]
@@ -238,7 +238,7 @@ class Corosync2Patterns(BasePatterns):
r"error:.*Connection to cib_(shm|rw).* (failed|closed)",
r"error:.*cib_(shm|rw) IPC provider disconnected while waiting",
r"error:.*Connection to (fencer|stonith-ng).* (closed|failed|lost)",
- r"crit: Fencing daemon connection failed",
+ r"error: Lost fencer connection",
# This is overbroad, but we don't have a way to say that only
# certain transition errors are acceptable (if the fencer respawns,
# fence devices may appear multiply active). We have to rely on
@@ -253,7 +253,7 @@ class Corosync2Patterns(BasePatterns):
# it's possible for another daemon to lose that connection and
# exit before losing the cluster connection.
r"pacemakerd.*:\s*warning:.*Lost connection to cluster layer",
- r"pacemaker-attrd.*:\s*(crit|error):.*Lost connection to (cluster layer|the CIB manager)",
+ r"pacemaker-attrd.*:\s*(crit|error):.*Lost connection to (Corosync process group|the CIB manager)",
r"pacemaker-based.*:\s*(crit|error):.*Lost connection to cluster layer",
r"pacemaker-controld.*:\s*(crit|error):.*Lost connection to (cluster layer|the CIB manager)",
r"pacemaker-fenced.*:\s*(crit|error):.*Lost connection to (cluster layer|the CIB manager)",
@@ -290,7 +290,7 @@ class Corosync2Patterns(BasePatterns):
]
self._components["pacemaker-execd"] = [
- r"pacemaker-controld.*Connection to executor failed",
+ r"pacemaker-controld.*Lost connection to local executor",
r"pacemaker-controld.*I_ERROR.*lrm_connection_destroy",
r"pacemaker-controld.*State transition .* S_RECOVERY",
r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover",
@@ -317,7 +317,7 @@ class Corosync2Patterns(BasePatterns):
r"State transition .* S_RECOVERY",
r"pacemakerd.* Respawning pacemaker-controld subdaemon after unexpected exit",
r"pacemaker-controld\[[0-9]+\] exited with status 1 \(",
- r"Connection to the scheduler failed",
+ r"pacemaker-controld.*Lost connection to the scheduler",
r"pacemaker-controld.*I_ERROR.*save_cib_contents",
r"pacemaker-controld.*: Input I_TERMINATE .*from do_recover",
r"pacemaker-controld.*Could not recover from internal error",
@@ -329,13 +329,13 @@ class Corosync2Patterns(BasePatterns):
self._components["pacemaker-fenced"] = [
r"error:.*Connection to (fencer|stonith-ng).* (closed|failed|lost)",
- r"Fencing daemon connection failed",
+ r"Lost fencer connection",
r"pacemaker-controld.*Fencer successfully connected",
]
self._components["pacemaker-fenced-ignore"] = [
r"(error|warning):.*Connection to (fencer|stonith-ng).* (closed|failed|lost)",
- r"crit:.*Fencing daemon connection failed",
+ r"error:.*Lost fencer connection",
r"error:.*Fencer connection failed \(will retry\)",
r"pacemaker-controld.*:\s+Result of .* operation for Fencing.*Error \(Lost connection to fencer\)",
# This is overbroad, but we don't have a way to say that only
diff --git a/python/pacemaker/_cts/process.py b/python/pacemaker/_cts/process.py
index 2940b71..757360c 100644
--- a/python/pacemaker/_cts/process.py
+++ b/python/pacemaker/_cts/process.py
@@ -63,7 +63,7 @@ def pipe_communicate(pipes, check_stderr=False, stdin=None):
output = pipe_outputs[0].decode(sys.stdout.encoding)
if check_stderr:
- output = output + pipe_outputs[1].decode(sys.stderr.encoding)
+ output += pipe_outputs[1].decode(sys.stderr.encoding)
return output
diff --git a/python/pacemaker/_cts/remote.py b/python/pacemaker/_cts/remote.py
index 99d2ed7..4b6b8f6 100644
--- a/python/pacemaker/_cts/remote.py
+++ b/python/pacemaker/_cts/remote.py
@@ -7,7 +7,7 @@ __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT AN
import re
import os
-from subprocess import Popen,PIPE
+from subprocess import Popen, PIPE
from threading import Thread
from pacemaker._cts.logging import LogFactory
@@ -71,7 +71,7 @@ class AsyncCmd(Thread):
self._proc.wait()
if self._delegate:
- self._logger.debug("cmd: pid %d returned %d to %s" % (self._proc.pid, self._proc.returncode, repr(self._delegate)))
+ self._logger.debug("cmd: pid %d returned %d to %r" % (self._proc.pid, self._proc.returncode, self._delegate))
else:
self._logger.debug("cmd: pid %d returned %d" % (self._proc.pid, self._proc.returncode))
@@ -126,7 +126,7 @@ class RemoteExec:
sysname = args[0]
command = args[1]
- if sysname is None or sysname.lower() == self._our_node or sysname == "localhost":
+ if sysname is None or sysname.lower() in [self._our_node, "localhost"]:
ret = command
else:
ret = "%s %s '%s'" % (self._command, sysname, self._fixcmd(command))
@@ -188,7 +188,7 @@ class RemoteExec:
result = None
# pylint: disable=consider-using-with
proc = Popen(self._cmd([node, command]),
- stdout = PIPE, stderr = PIPE, close_fds = True, shell = True)
+ stdout=PIPE, stderr=PIPE, close_fds=True, shell=True)
if not synchronous and proc.pid > 0 and not self._silent:
aproc = AsyncCmd(node, command, proc=proc)
diff --git a/python/pacemaker/_cts/scenarios.py b/python/pacemaker/_cts/scenarios.py
new file mode 100644
index 0000000..769b2d0
--- /dev/null
+++ b/python/pacemaker/_cts/scenarios.py
@@ -0,0 +1,422 @@
+""" Test scenario classes for Pacemaker's Cluster Test Suite (CTS) """
+
+__all__ = [
+ "AllOnce",
+ "Boot",
+ "BootCluster",
+ "LeaveBooted",
+ "RandomTests",
+ "Sequence",
+]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+import re
+import time
+
+from pacemaker._cts.audits import ClusterAudit
+from pacemaker._cts.input import should_continue
+from pacemaker._cts.tests.ctstest import CTSTest
+from pacemaker._cts.watcher import LogWatcher
+
+class ScenarioComponent:
+ """ The base class for all scenario components. A scenario component is
+ one single step in a scenario. Each component is basically just a setup
+ and teardown method.
+ """
+
+ def __init__(self, cm, env):
+ """ Create a new ScenarioComponent instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ env -- An Environment instance
+ """
+
+ # pylint: disable=invalid-name
+ self._cm = cm
+ self._env = env
+
+ def is_applicable(self):
+ """ Return True if this component is applicable in the given Environment.
+ This method must be provided by all subclasses.
+ """
+
+ raise NotImplementedError
+
+ def setup(self):
+ """ Set up the component, returning True on success. This method must be
+ provided by all subclasses.
+ """
+
+ raise NotImplementedError
+
+ def teardown(self):
+ """ Tear down the given component. This method must be provided by all
+ subclasses.
+ """
+
+ raise NotImplementedError
+
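+# A minimal subclass sketch (illustrative only; "BootNodes" is hypothetical).
+# Concrete components only need to provide the three methods above:
+#
+#   class BootNodes(ScenarioComponent):
+#       def is_applicable(self):
+#           return True
+#
+#       def setup(self):
+#           return self._cm.startall()
+#
+#       def teardown(self):
+#           self._cm.stopall()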
+
+class Scenario:
+    """ The base class for scenarios. A scenario is an ordered list of
+ ScenarioComponent objects. A scenario proceeds by setting up all its
+ components in sequence, running a list of tests and audits, and then
+ tearing down its components in reverse.
+ """
+
+ def __init__(self, cm, components, audits, tests):
+ """ Create a new Scenario instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ components -- A list of ScenarioComponents comprising this Scenario
+ audits -- A list of ClusterAudits that will be performed as
+ part of this Scenario
+ tests -- A list of CTSTests that will be run
+ """
+
+ # pylint: disable=invalid-name
+
+ self.stats = {
+ "success": 0,
+ "failure": 0,
+ "BadNews": 0,
+ "skipped": 0
+ }
+ self.tests = tests
+
+ self._audits = audits
+ self._bad_news = None
+ self._cm = cm
+ self._components = components
+
+ for comp in components:
+ if not issubclass(comp.__class__, ScenarioComponent):
+ raise ValueError("Init value must be subclass of ScenarioComponent")
+
+ for audit in audits:
+ if not issubclass(audit.__class__, ClusterAudit):
+ raise ValueError("Init value must be subclass of ClusterAudit")
+
+ for test in tests:
+ if not issubclass(test.__class__, CTSTest):
+ raise ValueError("Init value must be a subclass of CTSTest")
+
+ def is_applicable(self):
+ """ Return True if all ScenarioComponents are applicable """
+
+ for comp in self._components:
+ if not comp.is_applicable():
+ return False
+
+ return True
+
+ def setup(self):
+ """ Set up the scenario, returning True on success. If setup fails at
+ some point, tear down those components that did successfully set up.
+ """
+
+ self._cm.prepare()
+ self.audit() # Also detects remote/local log config
+ self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"])
+
+ self.audit()
+ self._cm.install_support()
+
+ self._bad_news = LogWatcher(self._cm.env["LogFileName"],
+ self._cm.templates.get_patterns("BadNews"),
+ self._cm.env["nodes"],
+ self._cm.env["LogWatcher"],
+ "BadNews", 0)
+ self._bad_news.set_watch() # Call after we've figured out what type of log watching to do in LogAudit
+
+ j = 0
+ while j < len(self._components):
+ if not self._components[j].setup():
+ # OOPS! We failed. Tear partial setups down.
+ self.audit()
+ self._cm.log("Tearing down partial setup")
+ self.teardown(j)
+ return False
+
+ j += 1
+
+ self.audit()
+ return True
+
+ def teardown(self, n_components=None):
+        """ Tear down the scenario in the reverse order it was set up. If
+            n_components is not None, only tear down the components at indices
+            n_components down to 0.
+        """
+
+        if n_components is None:
+            n_components = len(self._components) - 1
+
+ j = n_components
+
+ while j >= 0:
+ self._components[j].teardown()
+ j -= 1
+
+ self.audit()
+ self._cm.install_support("uninstall")
+
+ def incr(self, name):
+ """ Increment the given stats key """
+
+        if name not in self.stats:
+ self.stats[name] = 0
+
+ self.stats[name] += 1
+
+ def run(self, iterations):
+ """ Run all tests in the scenario the given number of times """
+
+ self._cm.oprofile_start()
+
+ try:
+ self._run_loop(iterations)
+ self._cm.oprofile_stop()
+ except:
+ self._cm.oprofile_stop()
+ raise
+
+ def _run_loop(self, iterations):
+ """ Do the hard part of the run method - actually run all the tests the
+ given number of times.
+ """
+
+ raise NotImplementedError
+
+ def run_test(self, test, testcount):
+ """ Run the given test. testcount is the number of tests (including
+ this one) that have been run across all iterations.
+ """
+
+ nodechoice = self._cm.env.random_node()
+
+ ret = True
+ did_run = False
+
+ self._cm.clear_instance_errors_to_ignore()
+ choice = "(%s)" % nodechoice
+ self._cm.log("Running test {:<22} {:<15} [{:>3}]".format(test.name, choice, testcount))
+
+ starttime = test.set_timer()
+
+ if not test.setup(nodechoice):
+ self._cm.log("Setup failed")
+ ret = False
+ else:
+ did_run = True
+ ret = test(nodechoice)
+
+ if not test.teardown(nodechoice):
+ self._cm.log("Teardown failed")
+
+ if not should_continue(self._cm.env):
+ raise ValueError("Teardown of %s on %s failed" % (test.name, nodechoice))
+
+ ret = False
+
+ stoptime = time.time()
+ self._cm.oprofile_save(testcount)
+
+ elapsed_time = stoptime - starttime
+ test_time = stoptime - test.get_timer()
+
+ if "min_time" not in test.stats:
+ test.stats["elapsed_time"] = elapsed_time
+ test.stats["min_time"] = test_time
+ test.stats["max_time"] = test_time
+ else:
+ test.stats["elapsed_time"] += elapsed_time
+
+ if test_time < test.stats["min_time"]:
+ test.stats["min_time"] = test_time
+
+ if test_time > test.stats["max_time"]:
+ test.stats["max_time"] = test_time
+
+ if ret:
+ self.incr("success")
+ test.log_timer()
+ else:
+ self.incr("failure")
+ self._cm.statall()
+ did_run = True # Force the test count to be incremented anyway so test extraction works
+
+ self.audit(test.errors_to_ignore)
+ return did_run
+
+ def summarize(self):
+ """ Output scenario results """
+
+ self._cm.log("****************")
+ self._cm.log("Overall Results:%r" % self.stats)
+ self._cm.log("****************")
+
+ stat_filter = {
+ "calls": 0,
+ "failure": 0,
+ "skipped": 0,
+ "auditfail": 0,
+ }
+
+ self._cm.log("Test Summary")
+ for test in self.tests:
+ for key in stat_filter:
+ stat_filter[key] = test.stats[key]
+
+ name = "Test %s:" % test.name
+ self._cm.log("{:<25} {!r}".format(name, stat_filter))
+
+ self._cm.debug("Detailed Results")
+ for test in self.tests:
+ name = "Test %s:" % test.name
+            self._cm.debug("{:<25} {!r}".format(name, test.stats))
+
+ self._cm.log("<<<<<<<<<<<<<<<< TESTS COMPLETED")
+
+ def audit(self, local_ignore=None):
+ """ Perform all scenario audits and log results. If there are too many
+ failures, prompt the user to confirm that the scenario should continue
+ running.
+ """
+
+ errcount = 0
+
+ ignorelist = ["CTS:"]
+
+ if local_ignore:
+ ignorelist.extend(local_ignore)
+
+ ignorelist.extend(self._cm.errors_to_ignore)
+ ignorelist.extend(self._cm.instance_errors_to_ignore)
+
+ # This makes sure everything is stabilized before starting...
+ failed = 0
+ for audit in self._audits:
+ if not audit():
+ self._cm.log("Audit %s FAILED." % audit.name)
+ failed += 1
+ else:
+ self._cm.debug("Audit %s passed." % audit.name)
+
+ while errcount < 1000:
+ match = None
+ if self._bad_news:
+ match = self._bad_news.look(0)
+
+ if match:
+ add_err = True
+
+ for ignore in ignorelist:
+ if add_err and re.search(ignore, match):
+ add_err = False
+
+ if add_err:
+ self._cm.log("BadNews: %s" % match)
+ self.incr("BadNews")
+ errcount += 1
+ else:
+ break
+ else:
+ print("Big problems")
+ if not should_continue(self._cm.env):
+ self._cm.log("Shutting down.")
+ self.summarize()
+ self.teardown()
+ raise ValueError("Looks like we hit a BadNews jackpot!")
+
+ if self._bad_news:
+ self._bad_news.end()
+
+ return failed
+
+
+class AllOnce(Scenario):
+ """ Every Test Once """
+
+ def _run_loop(self, iterations):
+ testcount = 1
+
+ for test in self.tests:
+ self.run_test(test, testcount)
+ testcount += 1
+
+
+class RandomTests(Scenario):
+ """ Random Test Execution """
+
+ def _run_loop(self, iterations):
+ testcount = 1
+
+ while testcount <= iterations:
+ test = self._cm.env.random_gen.choice(self.tests)
+ self.run_test(test, testcount)
+ testcount += 1
+
+
+class Sequence(Scenario):
+ """ Named Tests in Sequence """
+
+ def _run_loop(self, iterations):
+ testcount = 1
+
+ while testcount <= iterations:
+ for test in self.tests:
+ self.run_test(test, testcount)
+ testcount += 1
+
+
+class Boot(Scenario):
+ """ Start the Cluster """
+
+ def _run_loop(self, iterations):
+ return
+
+
+class BootCluster(ScenarioComponent):
+ """ The BootCluster component simply starts the cluster manager on all
+        nodes, waiting for each node to come up first, since a node might
+        have been rebooted or crashed beforehand.
+ """
+
+ def is_applicable(self):
+ """ BootCluster is always applicable """
+
+ return True
+
+ def setup(self):
+ """ Set up the component, returning True on success """
+
+ self._cm.prepare()
+
+ # Clear out the cobwebs ;-)
+ self._cm.stopall(verbose=True, force=True)
+
+ # Now start the Cluster Manager on all the nodes.
+ self._cm.log("Starting Cluster Manager on all nodes.")
+ return self._cm.startall(verbose=True, quick=True)
+
+ def teardown(self):
+ """ Tear down the component """
+
+ self._cm.log("Stopping Cluster Manager on all nodes")
+ self._cm.stopall(verbose=True, force=False)
+
+
+class LeaveBooted(BootCluster):
+ """ The LeaveBooted component leaves all nodes up when the scenario
+ is complete.
+ """
+
+ def teardown(self):
+ """ Tear down the component """
+
+ self._cm.log("Leaving Cluster running on all nodes")
diff --git a/python/pacemaker/_cts/test.py b/python/pacemaker/_cts/test.py
index fb809a9..577ebb3 100644
--- a/python/pacemaker/_cts/test.py
+++ b/python/pacemaker/_cts/test.py
@@ -147,7 +147,7 @@ class Test:
this requires all subclasses to set self._daemon_location before
accessing this property or an exception will be raised.
"""
- return os.path.join(self.logdir, self._daemon_location + ".log")
+ return os.path.join(self.logdir, "%s.log" % self._daemon_location)
###
### PRIVATE METHODS
@@ -287,6 +287,17 @@ class Test:
self._patterns.append(Pattern(pattern, negative=negative, regex=regex))
+ def _signal_dict(self):
+ """ Return a dictionary mapping signal numbers to their names """
+
+ # FIXME: When we support python >= 3.5, this function can be replaced with:
+ # signal.Signals(self.daemon_process.returncode).name
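+        # For example, a daemon terminated by SIGTERM has a returncode of -15,
+        # so _signal_dict().get(-returncode) yields "SIGTERM".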
+ return {
+ getattr(signal, _signame): _signame
+ for _signame in dir(signal)
+ if _signame.startswith("SIG") and not _signame.startswith("SIG_")
+ }
+
def clean_environment(self):
""" Clean up the host after executing a test """
@@ -295,13 +306,11 @@ class Test:
self._daemon_process.terminate()
self._daemon_process.wait()
else:
- return_code = {
- getattr(signal, _signame): _signame
- for _signame in dir(signal)
- if _signame.startswith('SIG') and not _signame.startswith("SIG_")
- }.get(-self._daemon_process.returncode, "RET=%d" % (self._daemon_process.returncode))
+ rc = self._daemon_process.returncode
+ signame = self._signal_dict().get(-rc, "RET=%s" % rc)
msg = "FAILURE - '%s' failed. %s abnormally exited during test (%s)."
- self._result_txt = msg % (self.name, self._daemon_location, return_code)
+
+ self._result_txt = msg % (self.name, self._daemon_location, signame)
self.exitcode = ExitStatus.ERROR
self._daemon_process = None
@@ -311,7 +320,7 @@ class Test:
# makes fenced output any kind of 8 bit value - while still interesting
# for debugging and we'd still like the regression-test to go over the
# full set of test-cases
- with open(self.logpath, 'rt', encoding = "ISO-8859-1") as logfile:
+ with open(self.logpath, 'rt', encoding="ISO-8859-1") as logfile:
for line in logfile.readlines():
self._daemon_output += line
@@ -361,7 +370,7 @@ class Test:
if self.verbose:
print("Step %d SUCCESS" % (i))
- i = i + 1
+ i += 1
self.clean_environment()
@@ -427,7 +436,7 @@ class Test:
if args['validate']:
if args['check_rng']:
- rng_file = rng_directory() + "/api/api-result.rng"
+ rng_file = "%s/api/api-result.rng" % rng_directory()
else:
rng_file = None
@@ -478,7 +487,7 @@ class Test:
if not self.force_wait and logfile is None \
and os.path.exists(self.logpath):
- logfile = io.open(self.logpath, 'rt', encoding = "ISO-8859-1")
+ logfile = io.open(self.logpath, 'rt', encoding="ISO-8859-1")
if not self.force_wait and logfile is not None:
for line in logfile.readlines():
@@ -562,10 +571,10 @@ class Tests:
continue
if test.exitcode != ExitStatus.OK:
- failures = failures + 1
+ failures += 1
test.print_result(" ")
else:
- success = success + 1
+ success += 1
if failures == 0:
print(" None")
diff --git a/python/pacemaker/_cts/tests/Makefile.am b/python/pacemaker/_cts/tests/Makefile.am
new file mode 100644
index 0000000..0dba74b
--- /dev/null
+++ b/python/pacemaker/_cts/tests/Makefile.am
@@ -0,0 +1,14 @@
+#
+# Copyright 2023 the Pacemaker project contributors
+#
+# The version control history for this file may have further details.
+#
+# This source code is licensed under the GNU General Public License version 2
+# or later (GPLv2+) WITHOUT ANY WARRANTY.
+#
+
+MAINTAINERCLEANFILES = Makefile.in
+
+pkgpythondir = $(pythondir)/$(PACKAGE)/_cts/tests
+
+pkgpython_PYTHON = $(wildcard *.py)
diff --git a/python/pacemaker/_cts/tests/__init__.py b/python/pacemaker/_cts/tests/__init__.py
new file mode 100644
index 0000000..63b34aa
--- /dev/null
+++ b/python/pacemaker/_cts/tests/__init__.py
@@ -0,0 +1,87 @@
+"""
+Test classes for the `pacemaker._cts` package.
+"""
+
+__copyright__ = "Copyright 2023 the Pacemaker project contributors"
+__license__ = "GNU Lesser General Public License version 2.1 or later (LGPLv2.1+)"
+
+from pacemaker._cts.tests.componentfail import ComponentFail
+from pacemaker._cts.tests.ctstest import CTSTest
+from pacemaker._cts.tests.fliptest import FlipTest
+from pacemaker._cts.tests.maintenancemode import MaintenanceMode
+from pacemaker._cts.tests.nearquorumpointtest import NearQuorumPointTest
+from pacemaker._cts.tests.partialstart import PartialStart
+from pacemaker._cts.tests.reattach import Reattach
+from pacemaker._cts.tests.restartonebyone import RestartOnebyOne
+from pacemaker._cts.tests.resourcerecover import ResourceRecover
+from pacemaker._cts.tests.restarttest import RestartTest
+from pacemaker._cts.tests.resynccib import ResyncCIB
+from pacemaker._cts.tests.remotebasic import RemoteBasic
+from pacemaker._cts.tests.remotedriver import RemoteDriver
+from pacemaker._cts.tests.remotemigrate import RemoteMigrate
+from pacemaker._cts.tests.remoterscfailure import RemoteRscFailure
+from pacemaker._cts.tests.remotestonithd import RemoteStonithd
+from pacemaker._cts.tests.simulstart import SimulStart
+from pacemaker._cts.tests.simulstop import SimulStop
+from pacemaker._cts.tests.simulstartlite import SimulStartLite
+from pacemaker._cts.tests.simulstoplite import SimulStopLite
+from pacemaker._cts.tests.splitbraintest import SplitBrainTest
+from pacemaker._cts.tests.standbytest import StandbyTest
+from pacemaker._cts.tests.starttest import StartTest
+from pacemaker._cts.tests.startonebyone import StartOnebyOne
+from pacemaker._cts.tests.stonithdtest import StonithdTest
+from pacemaker._cts.tests.stoponebyone import StopOnebyOne
+from pacemaker._cts.tests.stoptest import StopTest
+
+def test_list(cm, audits):
+    """ Return a list of instantiated test objects that are enabled and whose
+ is_applicable methods return True. These are the tests that
+ should be run.
+ """
+
+ # cm is a reasonable name here.
+ # pylint: disable=invalid-name
+
+ # A list of all enabled test classes, in the order that they should
+ # be run (if we're doing --once). There are various other ways of
+ # specifying which tests should be run, in which case the order here
+ # will not matter.
+ #
+ # Note that just because a test is listed here doesn't mean it will
+ # definitely be run - is_applicable is still taken into consideration.
+ # Also note that there are other tests that are excluded from this
+ # list for various reasons.
+ enabled_test_classes = [
+ FlipTest,
+ RestartTest,
+ StonithdTest,
+ StartOnebyOne,
+ SimulStart,
+ SimulStop,
+ StopOnebyOne,
+ RestartOnebyOne,
+ PartialStart,
+ StandbyTest,
+ MaintenanceMode,
+ ResourceRecover,
+ ComponentFail,
+ SplitBrainTest,
+ Reattach,
+ ResyncCIB,
+ NearQuorumPointTest,
+ RemoteBasic,
+ RemoteStonithd,
+ RemoteMigrate,
+ RemoteRscFailure,
+ ]
+
+ result = []
+
+ for testclass in enabled_test_classes:
+ bound_test = testclass(cm)
+
+ if bound_test.is_applicable():
+ bound_test.audits = audits
+ result.append(bound_test)
+
+ return result
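+
+# Illustrative usage only (assuming the caller already has a ClusterManager
+# `cm` and a list of ClusterAudit objects `audits`, e.g. built from
+# pacemaker._cts.audits):
+#
+#     tests = test_list(cm, audits)
+#     for test in tests:
+#         print(test.name)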
diff --git a/python/pacemaker/_cts/tests/componentfail.py b/python/pacemaker/_cts/tests/componentfail.py
new file mode 100644
index 0000000..f3d3622
--- /dev/null
+++ b/python/pacemaker/_cts/tests/componentfail.py
@@ -0,0 +1,167 @@
+""" Kill a pacemaker daemon and test how the cluster recovers """
+
+__all__ = ["ComponentFail"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+import re
+
+from pacemaker._cts.audits import AuditResource
+from pacemaker._cts.tests.ctstest import CTSTest
+from pacemaker._cts.tests.simulstartlite import SimulStartLite
+
+# Disable various pylint warnings that occur in so many places throughout this
+# file it's easiest to just take care of them globally. This does introduce the
+# possibility that we'll miss some other cause of the same warning, but we'll
+# just have to be careful.
+
+# pylint doesn't understand that self._rsh is callable.
+# pylint: disable=not-callable
+# pylint doesn't understand that self._env is subscriptable.
+# pylint: disable=unsubscriptable-object
+
+
+class ComponentFail(CTSTest):
+ """ A concrete test that kills a random pacemaker daemon and waits for the
+ cluster to recover
+ """
+
+ def __init__(self, cm):
+ """ Create a new ComponentFail instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ CTSTest.__init__(self, cm)
+
+ self.is_unsafe = True
+ self.name = "ComponentFail"
+
+ self._complist = cm.components
+ self._okerrpatterns = []
+ self._patterns = []
+ self._startall = SimulStartLite(cm)
+
+ def __call__(self, node):
+ """ Perform this test """
+
+ self.incr("calls")
+ self._patterns = []
+ self._okerrpatterns = []
+
+ # start all nodes
+ ret = self._startall(None)
+ if not ret:
+ return self.failure("Setup failed")
+
+ if not self._cm.cluster_stable(self._env["StableTime"]):
+ return self.failure("Setup failed - unstable")
+
+ node_is_dc = self._cm.is_node_dc(node, None)
+
+ # select a component to kill
+ chosen = self._env.random_gen.choice(self._complist)
+ while chosen.dc_only and not node_is_dc:
+ chosen = self._env.random_gen.choice(self._complist)
+
+ self.debug("...component %s (dc=%s)" % (chosen.name, node_is_dc))
+ self.incr(chosen.name)
+
+ if chosen.name != "corosync":
+ self._patterns.extend([
+ self.templates["Pat:ChildKilled"] % (node, chosen.name),
+ self.templates["Pat:ChildRespawn"] % (node, chosen.name),
+ ])
+
+ self._patterns.extend(chosen.pats)
+ if node_is_dc:
+ self._patterns.extend(chosen.dc_pats)
+
+ # @TODO this should be a flag in the Component
+ if chosen.name in ["corosync", "pacemaker-based", "pacemaker-fenced"]:
+ # Ignore actions for fence devices if fencer will respawn
+ # (their registration will be lost, and probes will fail)
+ self._okerrpatterns = [
+ self.templates["Pat:Fencing_active"],
+ ]
+ (_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
+
+ for line in lines:
+ if re.search("^Resource", line):
+ r = AuditResource(self._cm, line)
+
+ if r.rclass == "stonith":
+ self._okerrpatterns.extend([
+ self.templates["Pat:Fencing_recover"] % r.id,
+ self.templates["Pat:Fencing_probe"] % r.id,
+ ])
+
+        # give the watch a copy, since self._patterns is extended below
+ tmp_pats = self._patterns.copy()
+ self._patterns.extend(chosen.badnews_ignore)
+
+        # Look for STONITH ops; depending on Env["at-boot"], we might need to change the node's status
+ stonith_pats = [
+ self.templates["Pat:Fencing_ok"] % node
+ ]
+ stonith = self.create_watch(stonith_pats, 0)
+ stonith.set_watch()
+
+ # set the watch for stable
+ watch = self.create_watch(
+ tmp_pats, self._env["DeadTime"] + self._env["StableTime"] + self._env["StartTime"])
+
+ watch.set_watch()
+
+ # kill the component
+ chosen.kill(node)
+
+ self.debug("Waiting for the cluster to recover")
+ self._cm.cluster_stable()
+
+ self.debug("Waiting for any fenced node to come back up")
+ self._cm.ns.wait_for_all_nodes(self._env["nodes"], 600)
+
+ self.debug("Waiting for the cluster to re-stabilize with all nodes")
+ self._cm.cluster_stable(self._env["StartTime"])
+
+ self.debug("Checking if %s was shot" % node)
+ shot = stonith.look(60)
+
+ if shot:
+ self.debug("Found: %r" % shot)
+ self._okerrpatterns.append(self.templates["Pat:Fencing_start"] % node)
+
+ if not self._env["at-boot"]:
+ self._cm.expected_status[node] = "down"
+
+ # If fencing occurred, chances are many (if not all) the expected logs
+ # will not be sent - or will be lost when the node reboots
+ return self.success()
+
+ # check for logs indicating a graceful recovery
+ matched = watch.look_for_all(allow_multiple_matches=True)
+ if watch.unmatched:
+ self._logger.log("Patterns not found: %r" % watch.unmatched)
+
+ self.debug("Waiting for the cluster to re-stabilize with all nodes")
+ is_stable = self._cm.cluster_stable(self._env["StartTime"])
+
+ if not matched:
+ return self.failure("Didn't find all expected %s patterns" % chosen.name)
+
+ if not is_stable:
+ return self.failure("Cluster did not become stable after killing %s" % chosen.name)
+
+ return self.success()
+
+ @property
+ def errors_to_ignore(self):
+ """ Return list of errors which should be ignored """
+
+ # Note that okerrpatterns refers to the last time we ran this test
+ # The good news is that this works fine for us...
+ self._okerrpatterns.extend(self._patterns)
+ return self._okerrpatterns
diff --git a/python/pacemaker/_cts/tests/ctstest.py b/python/pacemaker/_cts/tests/ctstest.py
new file mode 100644
index 0000000..8669e48
--- /dev/null
+++ b/python/pacemaker/_cts/tests/ctstest.py
@@ -0,0 +1,252 @@
+""" Base classes for CTS tests """
+
+__all__ = ["CTSTest"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+import re
+
+from pacemaker._cts.environment import EnvFactory
+from pacemaker._cts.logging import LogFactory
+from pacemaker._cts.patterns import PatternSelector
+from pacemaker._cts.remote import RemoteFactory
+from pacemaker._cts.timer import Timer
+from pacemaker._cts.watcher import LogWatcher
+
+# Disable various pylint warnings that occur in so many places throughout this
+# file it's easiest to just take care of them globally. This does introduce the
+# possibility that we'll miss some other cause of the same warning, but we'll
+# just have to be careful.
+
+# pylint doesn't understand that self._rsh is callable.
+# pylint: disable=not-callable
+
+
+class CTSTest:
+ """ The base class for all cluster tests. This implements a basic set of
+ properties and behaviors like setup, tear down, time keeping, and
+ statistics tracking. It is up to specific tests to implement their own
+ specialized behavior on top of this class.
+ """
+
+ def __init__(self, cm):
+ """ Create a new CTSTest instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ # pylint: disable=invalid-name
+
+ self.audits = []
+ self.name = None
+ self.templates = PatternSelector(cm["Name"])
+
+ self.stats = {
+ "auditfail": 0,
+ "calls": 0,
+ "failure": 0,
+ "skipped": 0,
+ "success": 0
+ }
+
+ self._cm = cm
+ self._env = EnvFactory().getInstance()
+ self._r_o2cb = None
+ self._r_ocfs2 = []
+ self._rsh = RemoteFactory().getInstance()
+ self._logger = LogFactory()
+ self._timers = {}
+
+        self.benchmark = True # whether this test is included in benchmark runs
+ self.failed = False
+ self.is_experimental = False
+ self.is_loop = False
+ self.is_unsafe = False
+ self.is_valgrind = False
+ self.passed = True
+
+ def log(self, args):
+ """ Log a message """
+
+ self._logger.log(args)
+
+ def debug(self, args):
+ """ Log a debug message """
+
+ self._logger.debug(args)
+
+ def get_timer(self, key="test"):
+ """ Get the start time of the given timer """
+
+ try:
+ return self._timers[key].start_time
+ except KeyError:
+ return 0
+
+ def set_timer(self, key="test"):
+ """ Set the start time of the given timer to now, and return
+ that time
+ """
+
+ if key not in self._timers:
+ self._timers[key] = Timer(self._logger, self.name, key)
+
+ self._timers[key].start()
+ return self._timers[key].start_time
+
+ def log_timer(self, key="test"):
+ """ Log the elapsed time of the given timer """
+
+ if key not in self._timers:
+ return
+
+ elapsed = self._timers[key].elapsed
+ self.debug("%s:%s runtime: %.2f" % (self.name, key, elapsed))
+ del self._timers[key]
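+
+    # Typical pattern in a test (illustrative): time a phase by name and log
+    # how long it took once it completes:
+    #
+    #     self.set_timer("recover")
+    #     ... do the work being timed ...
+    #     self.log_timer("recover")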
+
+ def incr(self, name):
+ """ Increment the given stats key """
+
+ if name not in self.stats:
+ self.stats[name] = 0
+
+ self.stats[name] += 1
+
+ # Reset the test passed boolean
+ if name == "calls":
+ self.passed = True
+
+ def failure(self, reason="none"):
+ """ Increment the failure count, with an optional failure reason """
+
+ self.passed = False
+ self.incr("failure")
+ self._logger.log(("Test %s" % self.name).ljust(35) + " FAILED: %s" % reason)
+
+ return False
+
+ def success(self):
+ """ Increment the success count """
+
+ self.incr("success")
+ return True
+
+ def skipped(self):
+ """ Increment the skipped count """
+
+ self.incr("skipped")
+ return True
+
+ def __call__(self, node):
+ """ Perform this test """
+
+ raise NotImplementedError
+
+ def audit(self):
+ """ Perform all the relevant audits (see ClusterAudit), returning
+ whether or not they all passed.
+ """
+
+ passed = True
+
+ for audit in self.audits:
+ if not audit():
+ self._logger.log("Internal %s Audit %s FAILED." % (self.name, audit.name))
+ self.incr("auditfail")
+ passed = False
+
+ return passed
+
+ def setup(self, node):
+        """ Set up this test """
+
+ # node is used in subclasses
+ # pylint: disable=unused-argument
+
+ return self.success()
+
+ def teardown(self, node):
+ """ Tear down this test """
+
+ # node is used in subclasses
+ # pylint: disable=unused-argument
+
+ return self.success()
+
+ def create_watch(self, patterns, timeout, name=None):
+ """ Create a new LogWatcher object with the given patterns, timeout,
+ and optional name. This object can be used to search log files
+ for matching patterns during this test's run.
+ """
+ if not name:
+ name = self.name
+
+ return LogWatcher(self._env["LogFileName"], patterns, self._env["nodes"], self._env["LogWatcher"], name, timeout)
+
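+    # Illustrative pattern (as used by the concrete tests): create a watch
+    # before triggering an action, then check that the expected log lines
+    # appeared. The template name below is just an example:
+    #
+    #     watch = self.create_watch([self.templates["Pat:DC_IDLE"]], 60)
+    #     watch.set_watch()
+    #     ... trigger the action ...
+    #     if not watch.look_for_all():
+    #         self._logger.log("Patterns not found: %r" % watch.unmatched)
+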
+ def local_badnews(self, prefix, watch, local_ignore=None):
+        """ Use the given watch object to search through log files for
+            unexpected messages. Each match is logged with the given prefix
+            (or "LocalBadNews:" if no prefix is given) and counted. The
+            optional local_ignore list should be a list of regexes that, if
+            found in a matched line, will cause that line to be ignored.
+
+            Return the number of matches that were logged.
+ """
+ errcount = 0
+ if not prefix:
+ prefix = "LocalBadNews:"
+
+ ignorelist = [" CTS: ", prefix]
+
+ if local_ignore:
+ ignorelist += local_ignore
+
+ while errcount < 100:
+ match = watch.look(0)
+ if match:
+ add_err = True
+
+ for ignore in ignorelist:
+ if add_err and re.search(ignore, match):
+ add_err = False
+
+ if add_err:
+ self._logger.log("%s %s" % (prefix, match))
+ errcount += 1
+ else:
+ break
+ else:
+ self._logger.log("Too many errors!")
+
+ watch.end()
+ return errcount
+
+ def is_applicable(self):
+        """ Return True if this test is applicable in the current test configuration.
+            Subclasses may override this to add criteria of their own.
+ """
+
+ if self.is_loop and not self._env["loop-tests"]:
+ return False
+
+ if self.is_unsafe and not self._env["unsafe-tests"]:
+ return False
+
+ if self.is_valgrind and not self._env["valgrind-tests"]:
+ return False
+
+ if self.is_experimental and not self._env["experimental-tests"]:
+ return False
+
+ if self._env["benchmark"] and not self.benchmark:
+ return False
+
+ return True
+
+ @property
+ def errors_to_ignore(self):
+ """ Return list of errors which should be ignored """
+
+ return []
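+
+
+# Illustrative only (no such test exists in this package): a hypothetical
+# minimal concrete test subclasses CTSTest, sets a name, and implements
+# __call__ using the helpers above:
+#
+#     class NoopTest(CTSTest):
+#         def __init__(self, cm):
+#             CTSTest.__init__(self, cm)
+#             self.name = "Noop"
+#
+#         def __call__(self, node):
+#             self.incr("calls")
+#             return self.success()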
diff --git a/python/pacemaker/_cts/tests/fliptest.py b/python/pacemaker/_cts/tests/fliptest.py
new file mode 100644
index 0000000..5e77936
--- /dev/null
+++ b/python/pacemaker/_cts/tests/fliptest.py
@@ -0,0 +1,61 @@
+""" Stop running nodes, and start stopped nodes """
+
+__all__ = ["FlipTest"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+import time
+
+from pacemaker._cts.tests.ctstest import CTSTest
+from pacemaker._cts.tests.starttest import StartTest
+from pacemaker._cts.tests.stoptest import StopTest
+
+# Disable various pylint warnings that occur in so many places throughout this
+# file it's easiest to just take care of them globally. This does introduce the
+# possibility that we'll miss some other cause of the same warning, but we'll
+# just have to be careful.
+
+# pylint doesn't understand that self._env is subscriptable.
+# pylint: disable=unsubscriptable-object
+
+
+class FlipTest(CTSTest):
+ """ A concrete test that stops running nodes and starts stopped nodes """
+
+ def __init__(self, cm):
+ """ Create a new FlipTest instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ CTSTest.__init__(self, cm)
+ self.name = "Flip"
+
+ self._start = StartTest(cm)
+ self._stop = StopTest(cm)
+
+ def __call__(self, node):
+ """ Perform this test """
+
+ self.incr("calls")
+
+ if self._cm.expected_status[node] == "up":
+ self.incr("stopped")
+ ret = self._stop(node)
+ kind = "up->down"
+ # Give the cluster time to recognize it's gone...
+ time.sleep(self._env["StableTime"])
+ elif self._cm.expected_status[node] == "down":
+ self.incr("started")
+ ret = self._start(node)
+ kind = "down->up"
+ else:
+ return self.skipped()
+
+ self.incr(kind)
+ if ret:
+ return self.success()
+
+ return self.failure("%s failure" % kind)
diff --git a/python/pacemaker/_cts/tests/maintenancemode.py b/python/pacemaker/_cts/tests/maintenancemode.py
new file mode 100644
index 0000000..3c57c07
--- /dev/null
+++ b/python/pacemaker/_cts/tests/maintenancemode.py
@@ -0,0 +1,238 @@
+""" Toggle nodes in and out of maintenance mode """
+
+__all__ = ["MaintenanceMode"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+import re
+
+from pacemaker._cts.audits import AuditResource
+from pacemaker._cts.tests.ctstest import CTSTest
+from pacemaker._cts.tests.simulstartlite import SimulStartLite
+from pacemaker._cts.tests.starttest import StartTest
+from pacemaker._cts.timer import Timer
+
+# Disable various pylint warnings that occur in so many places throughout this
+# file it's easiest to just take care of them globally. This does introduce the
+# possibility that we'll miss some other cause of the same warning, but we'll
+# just have to be careful.
+
+# pylint doesn't understand that self._rsh is callable.
+# pylint: disable=not-callable
+
+
+class MaintenanceMode(CTSTest):
+ """ A concrete test that toggles nodes in and out of maintenance mode """
+
+ def __init__(self, cm):
+ """ Create a new MaintenanceMode instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ CTSTest.__init__(self, cm)
+
+ self.benchmark = True
+ self.name = "MaintenanceMode"
+
+ self._action = "asyncmon"
+ self._rid = "maintenanceDummy"
+ self._start = StartTest(cm)
+ self._startall = SimulStartLite(cm)
+
+ def _toggle_maintenance_mode(self, node, enabled):
+ """ Toggle maintenance mode on the given node """
+
+ pats = [
+ self.templates["Pat:DC_IDLE"]
+ ]
+
+ if enabled:
+ action = "On"
+ else:
+ action = "Off"
+
+ # fail the resource right after turning Maintenance mode on
+ # verify it is not recovered until maintenance mode is turned off
+ if enabled:
+ pats.append(self.templates["Pat:RscOpFail"] % (self._action, self._rid))
+ else:
+ pats.extend([
+ self.templates["Pat:RscOpOK"] % ("stop", self._rid),
+ self.templates["Pat:RscOpOK"] % ("start", self._rid)
+ ])
+
+ watch = self.create_watch(pats, 60)
+ watch.set_watch()
+
+ self.debug("Turning maintenance mode %s" % action)
+ self._rsh(node, self.templates["MaintenanceMode%s" % action])
+
+ if enabled:
+ self._rsh(node, "crm_resource -V -F -r %s -H %s &>/dev/null" % (self._rid, node))
+
+ with Timer(self._logger, self.name, "recover%s" % action):
+ watch.look_for_all()
+
+ if watch.unmatched:
+ self.debug("Failed to find patterns when turning maintenance mode %s" % action)
+ return repr(watch.unmatched)
+
+ return ""
+
+ def _insert_maintenance_dummy(self, node):
+ """ Create a dummy resource on the given node """
+
+ pats = [
+ ("%s.*" % node) + (self.templates["Pat:RscOpOK"] % ("start", self._rid))
+ ]
+
+ watch = self.create_watch(pats, 60)
+ watch.set_watch()
+
+ self._cm.add_dummy_rsc(node, self._rid)
+
+ with Timer(self._logger, self.name, "addDummy"):
+ watch.look_for_all()
+
+ if watch.unmatched:
+ self.debug("Failed to find patterns when adding maintenance dummy resource")
+ return repr(watch.unmatched)
+
+ return ""
+
+ def _remove_maintenance_dummy(self, node):
+ """ Remove the previously created dummy resource on the given node """
+
+ pats = [
+ self.templates["Pat:RscOpOK"] % ("stop", self._rid)
+ ]
+
+ watch = self.create_watch(pats, 60)
+ watch.set_watch()
+ self._cm.remove_dummy_rsc(node, self._rid)
+
+ with Timer(self._logger, self.name, "removeDummy"):
+ watch.look_for_all()
+
+ if watch.unmatched:
+ self.debug("Failed to find patterns when removing maintenance dummy resource")
+ return repr(watch.unmatched)
+
+ return ""
+
+ def _managed_rscs(self, node):
+ """ Return a list of all resources managed by the cluster """
+
+ rscs = []
+ (_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
+
+ for line in lines:
+ if re.search("^Resource", line):
+ tmp = AuditResource(self._cm, line)
+
+ if tmp.managed:
+ rscs.append(tmp.id)
+
+ return rscs
+
+ def _verify_resources(self, node, rscs, managed):
+        """ Verify that all resources in rscs are managed if they are expected
+            to be, or unmanaged if they are expected to be.
+ """
+
+ managed_rscs = rscs
+ managed_str = "managed"
+
+ if not managed:
+ managed_str = "unmanaged"
+
+ (_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
+ for line in lines:
+ if re.search("^Resource", line):
+ tmp = AuditResource(self._cm, line)
+
+ if managed and not tmp.managed:
+ continue
+
+ if not managed and tmp.managed:
+ continue
+
+                if tmp.id in managed_rscs:
+ managed_rscs.remove(tmp.id)
+
+ if not managed_rscs:
+ self.debug("Found all %s resources on %s" % (managed_str, node))
+ return True
+
+ self._logger.log("Could not find all %s resources on %s. %s" % (managed_str, node, managed_rscs))
+ return False
+
+ def __call__(self, node):
+ """ Perform this test """
+
+ self.incr("calls")
+ verify_managed = False
+ verify_unmanaged = False
+ fail_pat = ""
+
+ if not self._startall(None):
+ return self.failure("Setup failed")
+
+ # get a list of all the managed resources. We use this list
+ # after enabling maintenance mode to verify all managed resources
+ # become un-managed. After maintenance mode is turned off, we use
+ # this list to verify all the resources become managed again.
+ managed_rscs = self._managed_rscs(node)
+ if not managed_rscs:
+ self._logger.log("No managed resources on %s" % node)
+ return self.skipped()
+
+ # insert a fake resource we can fail during maintenance mode
+ # so we can verify recovery does not take place until after maintenance
+ # mode is disabled.
+ fail_pat += self._insert_maintenance_dummy(node)
+
+ # toggle maintenance mode ON, then fail dummy resource.
+ fail_pat += self._toggle_maintenance_mode(node, True)
+
+ # verify all the resources are now unmanaged
+ if self._verify_resources(node, managed_rscs, False):
+ verify_unmanaged = True
+
+ # Toggle maintenance mode OFF, verify dummy is recovered.
+ fail_pat += self._toggle_maintenance_mode(node, False)
+
+ # verify all the resources are now managed again
+ if self._verify_resources(node, managed_rscs, True):
+ verify_managed = True
+
+ # Remove our maintenance dummy resource.
+ fail_pat += self._remove_maintenance_dummy(node)
+
+ self._cm.cluster_stable()
+
+ if fail_pat != "":
+ return self.failure("Unmatched patterns: %s" % fail_pat)
+
+ if not verify_unmanaged:
+ return self.failure("Failed to verify resources became unmanaged during maintenance mode")
+
+ if not verify_managed:
+ return self.failure("Failed to verify resources switched back to managed after disabling maintenance mode")
+
+ return self.success()
+
+ @property
+ def errors_to_ignore(self):
+ """ Return list of errors which should be ignored """
+
+ return [
+ r"Updating failcount for %s" % self._rid,
+ r"schedulerd.*: Recover\s+%s\s+\(.*\)" % self._rid,
+ r"Unknown operation: fail",
+ self.templates["Pat:RscOpOK"] % (self._action, self._rid),
+ r"(ERROR|error).*: Action %s_%s_%d .* initiated outside of a transition" % (self._rid, self._action, 0)
+ ]
diff --git a/python/pacemaker/_cts/tests/nearquorumpointtest.py b/python/pacemaker/_cts/tests/nearquorumpointtest.py
new file mode 100644
index 0000000..c5b70b7
--- /dev/null
+++ b/python/pacemaker/_cts/tests/nearquorumpointtest.py
@@ -0,0 +1,125 @@
+""" Randomly start and stop nodes to bring the cluster close to the quorum point """
+
+__all__ = ["NearQuorumPointTest"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+from pacemaker._cts.tests.ctstest import CTSTest
+
+# Disable various pylint warnings that occur in so many places throughout this
+# file it's easiest to just take care of them globally. This does introduce the
+# possibility that we'll miss some other cause of the same warning, but we'll
+# just have to be careful.
+
+# pylint doesn't understand that self._rsh is callable.
+# pylint: disable=not-callable
+# pylint doesn't understand that self._env is subscriptable.
+# pylint: disable=unsubscriptable-object
+
+
+class NearQuorumPointTest(CTSTest):
+ """ A concrete test that randomly starts and stops nodes to bring the
+ cluster close to the quorum point
+ """
+
+ def __init__(self, cm):
+ """ Create a new NearQuorumPointTest instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ CTSTest.__init__(self, cm)
+
+ self.name = "NearQuorumPoint"
+
+ def __call__(self, dummy):
+ """ Perform this test """
+
+ self.incr("calls")
+ startset = []
+ stopset = []
+
+ stonith = self._cm.prepare_fencing_watcher()
+ #decide what to do with each node
+ for node in self._env["nodes"]:
+ action = self._env.random_gen.choice(["start", "stop"])
+
+ if action == "start":
+ startset.append(node)
+ elif action == "stop":
+ stopset.append(node)
+
+ self.debug("start nodes:%r" % startset)
+ self.debug("stop nodes:%r" % stopset)
+
+ #add search patterns
+ watchpats = []
+ for node in stopset:
+ if self._cm.expected_status[node] == "up":
+ watchpats.append(self.templates["Pat:We_stopped"] % node)
+
+ for node in startset:
+ if self._cm.expected_status[node] == "down":
+ watchpats.append(self.templates["Pat:Local_started"] % node)
+ else:
+ for stopping in stopset:
+ if self._cm.expected_status[stopping] == "up":
+ watchpats.append(self.templates["Pat:They_stopped"] % (node, stopping))
+
+ if not watchpats:
+ return self.skipped()
+
+ if startset:
+ watchpats.append(self.templates["Pat:DC_IDLE"])
+
+ watch = self.create_watch(watchpats, self._env["DeadTime"] + 10)
+
+ watch.set_watch()
+
+ #begin actions
+ for node in stopset:
+ if self._cm.expected_status[node] == "up":
+ self._cm.stop_cm_async(node)
+
+ for node in startset:
+ if self._cm.expected_status[node] == "down":
+ self._cm.start_cm_async(node)
+
+ #get the result
+ if watch.look_for_all():
+ self._cm.cluster_stable()
+ self._cm.fencing_cleanup("NearQuorumPoint", stonith)
+ return self.success()
+
+ self._logger.log("Warn: Patterns not found: %r" % watch.unmatched)
+
+ #get the "bad" nodes
+ upnodes = []
+ for node in stopset:
+ if self._cm.stat_cm(node):
+ upnodes.append(node)
+
+ downnodes = []
+ for node in startset:
+ if not self._cm.stat_cm(node):
+ downnodes.append(node)
+
+ self._cm.fencing_cleanup("NearQuorumPoint", stonith)
+ if not upnodes and not downnodes:
+ self._cm.cluster_stable()
+
+            # Make sure they're completely down with no residue
+ for node in stopset:
+ self._rsh(node, self.templates["StopCmd"])
+
+ return self.success()
+
+ if upnodes:
+ self._logger.log("Warn: Unstoppable nodes: %r" % upnodes)
+
+ if downnodes:
+ self._logger.log("Warn: Unstartable nodes: %r" % downnodes)
+
+ return self.failure()
diff --git a/python/pacemaker/_cts/tests/partialstart.py b/python/pacemaker/_cts/tests/partialstart.py
new file mode 100644
index 0000000..1b074e6
--- /dev/null
+++ b/python/pacemaker/_cts/tests/partialstart.py
@@ -0,0 +1,75 @@
+""" Start a node and then tell it to stop before it is fully running """
+
+__all__ = ["PartialStart"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+from pacemaker._cts.tests.ctstest import CTSTest
+from pacemaker._cts.tests.simulstartlite import SimulStartLite
+from pacemaker._cts.tests.simulstoplite import SimulStopLite
+from pacemaker._cts.tests.stoptest import StopTest
+
+# Disable various pylint warnings that occur in so many places throughout this
+# file it's easiest to just take care of them globally. This does introduce the
+# possibility that we'll miss some other cause of the same warning, but we'll
+# just have to be careful.
+
+# pylint doesn't understand that self._env is subscriptable.
+# pylint: disable=unsubscriptable-object
+
+
+class PartialStart(CTSTest):
+ """ A concrete test that interrupts a node before it's finished starting up """
+
+ def __init__(self, cm):
+ """ Create a new PartialStart instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ CTSTest.__init__(self, cm)
+
+ self.name = "PartialStart"
+
+ self._startall = SimulStartLite(cm)
+ self._stop = StopTest(cm)
+ self._stopall = SimulStopLite(cm)
+
+ def __call__(self, node):
+ """ Perform this test """
+
+ self.incr("calls")
+
+ ret = self._stopall(None)
+ if not ret:
+ return self.failure("Setup failed")
+
+ watchpats = [
+ "pacemaker-controld.*Connecting to .* cluster infrastructure"
+ ]
+ watch = self.create_watch(watchpats, self._env["DeadTime"] + 10)
+ watch.set_watch()
+
+ self._cm.start_cm_async(node)
+ ret = watch.look_for_all()
+ if not ret:
+ self._logger.log("Patterns not found: %r" % watch.unmatched)
+ return self.failure("Setup of %s failed" % node)
+
+ ret = self._stop(node)
+ if not ret:
+ return self.failure("%s did not stop in time" % node)
+
+ return self.success()
+
+ @property
+ def errors_to_ignore(self):
+ """ Return list of errors which should be ignored """
+
+ # We might do some fencing in the 2-node case if we make it up far enough
+ return [
+ r"Executing reboot fencing operation",
+ r"Requesting fencing \([^)]+\) targeting node "
+ ]
diff --git a/python/pacemaker/_cts/tests/reattach.py b/python/pacemaker/_cts/tests/reattach.py
new file mode 100644
index 0000000..4452bc0
--- /dev/null
+++ b/python/pacemaker/_cts/tests/reattach.py
@@ -0,0 +1,221 @@
+""" Restart the cluster and verify resources remain running """
+
+__all__ = ["Reattach"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+import re
+import time
+
+from pacemaker.exitstatus import ExitStatus
+from pacemaker._cts.audits import AuditResource
+from pacemaker._cts.tests.ctstest import CTSTest
+from pacemaker._cts.tests.simulstartlite import SimulStartLite
+from pacemaker._cts.tests.simulstoplite import SimulStopLite
+from pacemaker._cts.tests.starttest import StartTest
+
+# Disable various pylint warnings that occur in so many places throughout this
+# file it's easiest to just take care of them globally. This does introduce the
+# possibility that we'll miss some other cause of the same warning, but we'll
+# just have to be careful.
+
+# pylint doesn't understand that self._rsh is callable.
+# pylint: disable=not-callable
+
+
+class Reattach(CTSTest):
+ """ A concrete test that restarts the cluster and verifies that resources
+ remain running throughout
+ """
+
+ def __init__(self, cm):
+ """ Create a new Reattach instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ CTSTest.__init__(self, cm)
+
+ self.name = "Reattach"
+
+ self._startall = SimulStartLite(cm)
+ self._stopall = SimulStopLite(cm)
+
+ def _is_managed(self, node):
+ """ Are resources managed by the cluster? """
+
+ (_, is_managed) = self._rsh(node, "crm_attribute -t rsc_defaults -n is-managed -q -G -d true", verbose=1)
+ is_managed = is_managed[0].strip()
+ return is_managed == "true"
+
+ def _set_unmanaged(self, node):
+ """ Disable resource management """
+
+ self.debug("Disable resource management")
+ self._rsh(node, "crm_attribute -t rsc_defaults -n is-managed -v false")
+
+ def _set_managed(self, node):
+ """ Enable resource management """
+
+ self.debug("Re-enable resource management")
+ self._rsh(node, "crm_attribute -t rsc_defaults -n is-managed -D")
+
+ def _disable_incompatible_rscs(self, node):
+ """ Disable resources that are incompatible with this test
+
+ Starts and stops of stonith-class resources are implemented internally
+ by Pacemaker, which means that they must stop when Pacemaker is
+ stopped, even if unmanaged. Disable them before running the Reattach
+ test so they don't affect resource placement.
+
+ OCFS2 resources must be disabled too for some reason.
+
+ Set target-role to "Stopped" for any of these resources in the CIB.
+ """
+
+ self.debug("Disable incompatible (stonith/OCFS2) resources")
+ xml = """'<meta_attributes id="cts-lab-Reattach-meta">
+ <nvpair id="cts-lab-Reattach-target-role" name="target-role" value="Stopped"/>
+ <rule id="cts-lab-Reattach-rule" boolean-op="or" score="INFINITY">
+ <rsc_expression id="cts-lab-Reattach-stonith" class="stonith"/>
+ <rsc_expression id="cts-lab-Reattach-o2cb" type="o2cb"/>
+ </rule>
+ </meta_attributes>' --scope rsc_defaults"""
+ return self._rsh(node, self._cm.templates['CibAddXml'] % xml)
+
+ def _enable_incompatible_rscs(self, node):
+ """ Re-enable resources that were incompatible with this test """
+
+ self.debug("Re-enable incompatible (stonith/OCFS2) resources")
+ xml = """<meta_attributes id="cts-lab-Reattach-meta">"""
+ return self._rsh(node, """cibadmin --delete --xml-text '%s'""" % xml)
+
+ def _reprobe(self, node):
+ """ Reprobe all resources
+
+ The placement of some resources (such as promotable-1 in the
+ lab-generated CIB) is affected by constraints using node-attribute-based
+ rules. An earlier test may have erased the relevant node attribute, so
+ do a reprobe, which should add the attribute back.
+ """
+
+ return self._rsh(node, """crm_resource --refresh""")
+
+ def setup(self, node):
+        """ Set up this test """
+
+ if not self._startall(None):
+ return self.failure("Startall failed")
+
+ (rc, _) = self._disable_incompatible_rscs(node)
+ if rc != ExitStatus.OK:
+ return self.failure("Couldn't modify CIB to stop incompatible resources")
+
+ (rc, _) = self._reprobe(node)
+ if rc != ExitStatus.OK:
+ return self.failure("Couldn't reprobe resources")
+
+ if not self._cm.cluster_stable(double_check=True):
+ return self.failure("Cluster did not stabilize after setup")
+
+ return self.success()
+
+ def teardown(self, node):
+ """ Tear down this test """
+
+ # Make sure 'node' is up
+ start = StartTest(self._cm)
+ start(node)
+
+ if not self._is_managed(node):
+ self._set_managed(node)
+
+ (rc, _) = self._enable_incompatible_rscs(node)
+ if rc != ExitStatus.OK:
+ return self.failure("Couldn't modify CIB to re-enable incompatible resources")
+
+ if not self._cm.cluster_stable():
+ return self.failure("Cluster did not stabilize after teardown")
+ if not self._is_managed(node):
+ return self.failure("Could not re-enable resource management")
+
+ return self.success()
+
+ def __call__(self, node):
+ """ Perform this test """
+
+ self.incr("calls")
+
+ # Conveniently, the scheduler will display this message when disabling
+ # management, even if fencing is not enabled, so we can rely on it.
+ managed = self.create_watch(["No fencing will be done"], 60)
+ managed.set_watch()
+
+ self._set_unmanaged(node)
+
+ if not managed.look_for_all():
+ self._logger.log("Patterns not found: %r" % managed.unmatched)
+ return self.failure("Resource management not disabled")
+
+ pats = [
+ self.templates["Pat:RscOpOK"] % ("start", ".*"),
+ self.templates["Pat:RscOpOK"] % ("stop", ".*"),
+ self.templates["Pat:RscOpOK"] % ("promote", ".*"),
+ self.templates["Pat:RscOpOK"] % ("demote", ".*"),
+ self.templates["Pat:RscOpOK"] % ("migrate", ".*")
+ ]
+
+ watch = self.create_watch(pats, 60, "ShutdownActivity")
+ watch.set_watch()
+
+ self.debug("Shutting down the cluster")
+ ret = self._stopall(None)
+ if not ret:
+ self._set_managed(node)
+ return self.failure("Couldn't shut down the cluster")
+
+ self.debug("Bringing the cluster back up")
+ ret = self._startall(None)
+ time.sleep(5) # allow ping to update the CIB
+ if not ret:
+ self._set_managed(node)
+ return self.failure("Couldn't restart the cluster")
+
+ if self.local_badnews("ResourceActivity:", watch):
+ self._set_managed(node)
+ return self.failure("Resources stopped or started during cluster restart")
+
+ watch = self.create_watch(pats, 60, "StartupActivity")
+ watch.set_watch()
+
+ # Re-enable resource management (and verify it happened).
+ self._set_managed(node)
+ self._cm.cluster_stable()
+ if not self._is_managed(node):
+ return self.failure("Could not re-enable resource management")
+
+ # Ignore actions for STONITH resources
+ ignore = []
+ (_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
+ for line in lines:
+ if re.search("^Resource", line):
+ r = AuditResource(self._cm, line)
+
+ if r.rclass == "stonith":
+ self.debug("Ignoring start actions for %s" % r.id)
+ ignore.append(self.templates["Pat:RscOpOK"] % ("start", r.id))
+
+ if self.local_badnews("ResourceActivity:", watch, ignore):
+ return self.failure("Resources stopped or started after resource management was re-enabled")
+
+ return ret
+
+ @property
+ def errors_to_ignore(self):
+ """ Return list of errors which should be ignored """
+
+ return [
+ r"resource( was|s were) active at shutdown"
+ ]
diff --git a/python/pacemaker/_cts/tests/remotebasic.py b/python/pacemaker/_cts/tests/remotebasic.py
new file mode 100644
index 0000000..2f25aaf
--- /dev/null
+++ b/python/pacemaker/_cts/tests/remotebasic.py
@@ -0,0 +1,39 @@
+""" Start and stop a remote node """
+
+__all__ = ["RemoteBasic"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+from pacemaker._cts.tests.remotedriver import RemoteDriver
+
+
+class RemoteBasic(RemoteDriver):
+ """ A concrete test that starts and stops a remote node """
+
+ def __init__(self, cm):
+ """ Create a new RemoteBasic instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ RemoteDriver.__init__(self, cm)
+
+ self.name = "RemoteBasic"
+
+ def __call__(self, node):
+ """ Perform this test """
+
+ if not self.start_new_test(node):
+ return self.failure(self.fail_string)
+
+ self.test_attributes(node)
+ self.cleanup_metal(node)
+
+ self.debug("Waiting for the cluster to recover")
+ self._cm.cluster_stable()
+ if self.failed:
+ return self.failure(self.fail_string)
+
+ return self.success()
diff --git a/python/pacemaker/_cts/tests/remotedriver.py b/python/pacemaker/_cts/tests/remotedriver.py
new file mode 100644
index 0000000..c5b0292
--- /dev/null
+++ b/python/pacemaker/_cts/tests/remotedriver.py
@@ -0,0 +1,556 @@
+""" Base classes for CTS tests """
+
+__all__ = ["RemoteDriver"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+import os
+import time
+import subprocess
+import tempfile
+
+from pacemaker._cts.tests.ctstest import CTSTest
+from pacemaker._cts.tests.simulstartlite import SimulStartLite
+from pacemaker._cts.tests.starttest import StartTest
+from pacemaker._cts.tests.stoptest import StopTest
+from pacemaker._cts.timer import Timer
+
+# Disable various pylint warnings that occur in so many places throughout this
+# file it's easiest to just take care of them globally. This does introduce the
+# possibility that we'll miss some other cause of the same warning, but we'll
+# just have to be careful.
+
+# pylint doesn't understand that self._rsh is callable.
+# pylint: disable=not-callable
+
+
+class RemoteDriver(CTSTest):
+ """ A specialized base class for cluster tests that run on Pacemaker
+ Remote nodes. This builds on top of CTSTest to provide methods
+ for starting and stopping services and resources, and managing
+ remote nodes. This is still just an abstract class -- specific
+ tests need to implement their own specialized behavior.
+ """
+
+ def __init__(self, cm):
+ """ Create a new RemoteDriver instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ CTSTest.__init__(self, cm)
+ self.name = "RemoteDriver"
+
+ self._corosync_enabled = False
+ self._pacemaker_enabled = False
+ self._remote_node = None
+ self._remote_rsc = "remote-rsc"
+ self._start = StartTest(cm)
+ self._startall = SimulStartLite(cm)
+ self._stop = StopTest(cm)
+
+ self.reset()
+
+ def reset(self):
+ """ Reset the state of this test back to what it was before the test
+ was run
+ """
+
+ self.failed = False
+ self.fail_string = ""
+
+ self._pcmk_started = False
+ self._remote_node_added = False
+ self._remote_rsc_added = False
+ self._remote_use_reconnect_interval = self._env.random_gen.choice([True, False])
+
+ def fail(self, msg):
+ """ Mark test as failed """
+
+ self.failed = True
+
+ # Always log the failure.
+ self._logger.log(msg)
+
+ # Use first failure as test status, as it's likely to be most useful.
+ if not self.fail_string:
+ self.fail_string = msg
+
+ def _get_other_node(self, node):
+ """ Get the first cluster node out of the environment that is not the
+ given node. Typically, this is used to find some node that will
+ still be active that we can run cluster commands on.
+ """
+
+ for othernode in self._env["nodes"]:
+ if othernode == node:
+ # we don't want to try and use the cib that we just shutdown.
+ # find a cluster node that is not our soon to be remote-node.
+ continue
+
+ return othernode
+
+ def _del_rsc(self, node, rsc):
+ """ Delete the given named resource from the cluster. The given `node`
+ is the cluster node on which we should *not* run the delete command.
+ """
+
+ othernode = self._get_other_node(node)
+ (rc, _) = self._rsh(othernode, "crm_resource -D -r %s -t primitive" % rsc)
+ if rc != 0:
+ self.fail("Removal of resource '%s' failed" % rsc)
+
+ def _add_rsc(self, node, rsc_xml):
+ """ Add a resource given in XML format to the cluster. The given `node`
+ is the cluster node on which we should *not* run the add command.
+ """
+
+ othernode = self._get_other_node(node)
+ (rc, _) = self._rsh(othernode, "cibadmin -C -o resources -X '%s'" % rsc_xml)
+ if rc != 0:
+ self.fail("resource creation failed")
+
+ def _add_primitive_rsc(self, node):
+ """ Add a primitive heartbeat resource for the remote node to the
+ cluster. The given `node` is the cluster node on which we should
+ *not* run the add command.
+ """
+
+ rsc_xml = """
+<primitive class="ocf" id="%(node)s" provider="heartbeat" type="Dummy">
+ <meta_attributes id="%(node)s-meta_attributes"/>
+ <operations>
+ <op id="%(node)s-monitor-interval-20s" interval="20s" name="monitor"/>
+ </operations>
+</primitive>""" % {
+ "node": self._remote_rsc
+}
+
+ self._add_rsc(node, rsc_xml)
+ if not self.failed:
+ self._remote_rsc_added = True
+
+ def _add_connection_rsc(self, node):
+ """ Add a primitive connection resource for the remote node to the
+            cluster. The given `node` is the cluster node on which we should
+ *not* run the add command.
+ """
+
+ rsc_xml = """
+<primitive class="ocf" id="%(node)s" provider="pacemaker" type="remote">
+ <instance_attributes id="%(node)s-instance_attributes">
+ <nvpair id="%(node)s-instance_attributes-server" name="server" value="%(server)s"/>
+""" % {
+ "node": self._remote_node, "server": node
+}
+
+ if self._remote_use_reconnect_interval:
+ # Set reconnect interval on resource
+ rsc_xml += """
+ <nvpair id="%s-instance_attributes-reconnect_interval" name="reconnect_interval" value="60s"/>
+""" % self._remote_node
+
+ rsc_xml += """
+ </instance_attributes>
+ <operations>
+ <op id="%(node)s-start" name="start" interval="0" timeout="120s"/>
+ <op id="%(node)s-monitor-20s" name="monitor" interval="20s" timeout="45s"/>
+ </operations>
+</primitive>
+""" % {
+ "node": self._remote_node
+}
+
+ self._add_rsc(node, rsc_xml)
+ if not self.failed:
+ self._remote_node_added = True
+
+ def _disable_services(self, node):
+ """ Disable the corosync and pacemaker services on the given node """
+
+ self._corosync_enabled = self._env.service_is_enabled(node, "corosync")
+ if self._corosync_enabled:
+ self._env.disable_service(node, "corosync")
+
+ self._pacemaker_enabled = self._env.service_is_enabled(node, "pacemaker")
+ if self._pacemaker_enabled:
+ self._env.disable_service(node, "pacemaker")
+
+ def _enable_services(self, node):
+ """ Enable the corosync and pacemaker services on the given node """
+
+ if self._corosync_enabled:
+ self._env.enable_service(node, "corosync")
+
+ if self._pacemaker_enabled:
+ self._env.enable_service(node, "pacemaker")
+
+ def _stop_pcmk_remote(self, node):
+ """ Stop the Pacemaker Remote service on the given node """
+
+ for _ in range(10):
+ (rc, _) = self._rsh(node, "service pacemaker_remote stop")
+ if rc != 0:
+ time.sleep(6)
+ else:
+ break
+
+ def _start_pcmk_remote(self, node):
+ """ Start the Pacemaker Remote service on the given node """
+
+ for _ in range(10):
+ (rc, _) = self._rsh(node, "service pacemaker_remote start")
+ if rc != 0:
+ time.sleep(6)
+ else:
+ self._pcmk_started = True
+ break
+
+ def _freeze_pcmk_remote(self, node):
+ """ Simulate a Pacemaker Remote daemon failure """
+
+ self._rsh(node, "killall -STOP pacemaker-remoted")
+
+ def _resume_pcmk_remote(self, node):
+ """ Simulate the Pacemaker Remote daemon recovering """
+
+ self._rsh(node, "killall -CONT pacemaker-remoted")
+
+ def _start_metal(self, node):
+        """ Set up a Pacemaker Remote configuration. Remove any existing
+ connection resources or nodes. Start the pacemaker_remote service.
+ Create a connection resource.
+ """
+
+        # Cluster nodes are reused as remote nodes in remote tests. If cluster
+        # services were enabled at boot and the remote node later gets fenced,
+        # the rebooted node would rejoin as a cluster node instead of the
+        # expected remote node, and pacemaker_remote would not be able to
+        # start. Depending on the timing, the test might not be able to
+        # recover gracefully.
+        #
+        # Temporarily disable any enabled cluster services.
+ self._disable_services(node)
+
+ # make sure the resource doesn't already exist for some reason
+ self._rsh(node, "crm_resource -D -r %s -t primitive" % self._remote_rsc)
+ self._rsh(node, "crm_resource -D -r %s -t primitive" % self._remote_node)
+
+ if not self._stop(node):
+ self.fail("Failed to shutdown cluster node %s" % node)
+ return
+
+ self._start_pcmk_remote(node)
+
+ if not self._pcmk_started:
+ self.fail("Failed to start pacemaker_remote on node %s" % node)
+ return
+
+ # Convert node to baremetal now that it has shutdown the cluster stack
+ pats = []
+ watch = self.create_watch(pats, 120)
+ watch.set_watch()
+
+ pats.extend([
+ self.templates["Pat:RscOpOK"] % ("start", self._remote_node),
+ self.templates["Pat:DC_IDLE"]
+ ])
+
+ self._add_connection_rsc(node)
+
+ with Timer(self._logger, self.name, "remoteMetalInit"):
+ watch.look_for_all()
+
+ if watch.unmatched:
+ self.fail("Unmatched patterns: %s" % watch.unmatched)
+
+ def migrate_connection(self, node):
+ """ Move the remote connection resource from the node it's currently
+ running on to any other available node
+ """
+
+ if self.failed:
+ return
+
+ pats = [
+ self.templates["Pat:RscOpOK"] % ("migrate_to", self._remote_node),
+ self.templates["Pat:RscOpOK"] % ("migrate_from", self._remote_node),
+ self.templates["Pat:DC_IDLE"]
+ ]
+
+ watch = self.create_watch(pats, 120)
+ watch.set_watch()
+
+ (rc, _) = self._rsh(node, "crm_resource -M -r %s" % self._remote_node, verbose=1)
+ if rc != 0:
+ self.fail("failed to move remote node connection resource")
+ return
+
+ with Timer(self._logger, self.name, "remoteMetalMigrate"):
+ watch.look_for_all()
+
+ if watch.unmatched:
+ self.fail("Unmatched patterns: %s" % watch.unmatched)
+
+ def fail_rsc(self, node):
+ """ Cause the dummy resource running on a Pacemaker Remote node to fail
+ and verify that the failure is logged correctly
+ """
+
+ if self.failed:
+ return
+
+ watchpats = [
+ self.templates["Pat:RscRemoteOpOK"] % ("stop", self._remote_rsc, self._remote_node),
+ self.templates["Pat:RscRemoteOpOK"] % ("start", self._remote_rsc, self._remote_node),
+ self.templates["Pat:DC_IDLE"]
+ ]
+
+ watch = self.create_watch(watchpats, 120)
+ watch.set_watch()
+
+ self.debug("causing dummy rsc to fail.")
+
+ self._rsh(node, "rm -f /var/run/resource-agents/Dummy*")
+
+ with Timer(self._logger, self.name, "remoteRscFail"):
+ watch.look_for_all()
+
+ if watch.unmatched:
+ self.fail("Unmatched patterns during rsc fail: %s" % watch.unmatched)
+
+ def fail_connection(self, node):
+ """ Cause the remote connection resource to fail and verify that the
+ node is fenced and the connection resource is restarted on another
+ node.
+ """
+
+ if self.failed:
+ return
+
+ watchpats = [
+ self.templates["Pat:Fencing_ok"] % self._remote_node,
+ self.templates["Pat:NodeFenced"] % self._remote_node
+ ]
+
+ watch = self.create_watch(watchpats, 120)
+ watch.set_watch()
+
+        # Freeze the pacemaker_remote daemon; this will result in fencing
+        self.debug("Freezing the active remote node's daemon to force fencing")
+ self._freeze_pcmk_remote(node)
+
+ self.debug("Waiting for remote node to be fenced.")
+
+ with Timer(self._logger, self.name, "remoteMetalFence"):
+ watch.look_for_all()
+
+ if watch.unmatched:
+ self.fail("Unmatched patterns: %s" % watch.unmatched)
+ return
+
+ self.debug("Waiting for the remote node to come back up")
+ self._cm.ns.wait_for_node(node, 120)
+
+ pats = []
+
+ watch = self.create_watch(pats, 240)
+ watch.set_watch()
+
+ pats.append(self.templates["Pat:RscOpOK"] % ("start", self._remote_node))
+
+ if self._remote_rsc_added:
+ pats.append(self.templates["Pat:RscRemoteOpOK"] % ("start", self._remote_rsc, self._remote_node))
+
+        # Start the remote node again and watch it integrate back into the cluster
+ self._start_pcmk_remote(node)
+ if not self._pcmk_started:
+ self.fail("Failed to start pacemaker_remote on node %s" % node)
+ return
+
+ self.debug("Waiting for remote node to rejoin cluster after being fenced.")
+
+ with Timer(self._logger, self.name, "remoteMetalRestart"):
+ watch.look_for_all()
+
+ if watch.unmatched:
+ self.fail("Unmatched patterns: %s" % watch.unmatched)
+
+ def _add_dummy_rsc(self, node):
+ """ Add a dummy resource that runs on the Pacemaker Remote node """
+
+ if self.failed:
+ return
+
+ # verify we can put a resource on the remote node
+ pats = []
+ watch = self.create_watch(pats, 120)
+ watch.set_watch()
+
+ pats.extend([
+ self.templates["Pat:RscRemoteOpOK"] % ("start", self._remote_rsc, self._remote_node),
+ self.templates["Pat:DC_IDLE"]
+ ])
+
+ # Add a resource that must live on remote-node
+ self._add_primitive_rsc(node)
+
+ # force that rsc to prefer the remote node.
+ (rc, _) = self._cm.rsh(node, "crm_resource -M -r %s -N %s -f" % (self._remote_rsc, self._remote_node), verbose=1)
+ if rc != 0:
+ self.fail("Failed to place remote resource on remote node.")
+ return
+
+ with Timer(self._logger, self.name, "remoteMetalRsc"):
+ watch.look_for_all()
+
+ if watch.unmatched:
+ self.fail("Unmatched patterns: %s" % watch.unmatched)
+
+ def test_attributes(self, node):
+ """ Verify that attributes can be set on the Pacemaker Remote node """
+
+ if self.failed:
+ return
+
+ # This verifies permanent attributes can be set on a remote-node. It also
+ # verifies the remote-node can edit its own cib node section remotely.
+ (rc, line) = self._cm.rsh(node, "crm_attribute -l forever -n testattr -v testval -N %s" % self._remote_node, verbose=1)
+ if rc != 0:
+ self.fail("Failed to set remote-node attribute. rc:%s output:%s" % (rc, line))
+ return
+
+ (rc, _) = self._cm.rsh(node, "crm_attribute -l forever -n testattr -q -N %s" % self._remote_node, verbose=1)
+ if rc != 0:
+ self.fail("Failed to get remote-node attribute")
+ return
+
+ (rc, _) = self._cm.rsh(node, "crm_attribute -l forever -n testattr -D -N %s" % self._remote_node, verbose=1)
+ if rc != 0:
+ self.fail("Failed to delete remote-node attribute")
+
+ def cleanup_metal(self, node):
+ """ Clean up the Pacemaker Remote node configuration previously created by
+ _setup_metal. Stop and remove dummy resources and connection resources.
+            _start_metal. Stop and remove dummy resources and connection resources.
+ """
+
+ self._enable_services(node)
+
+ if not self._pcmk_started:
+ return
+
+ pats = []
+
+ watch = self.create_watch(pats, 120)
+ watch.set_watch()
+
+ if self._remote_rsc_added:
+ pats.append(self.templates["Pat:RscOpOK"] % ("stop", self._remote_rsc))
+
+ if self._remote_node_added:
+ pats.append(self.templates["Pat:RscOpOK"] % ("stop", self._remote_node))
+
+ with Timer(self._logger, self.name, "remoteMetalCleanup"):
+ self._resume_pcmk_remote(node)
+
+ if self._remote_rsc_added:
+ # Remove dummy resource added for remote node tests
+ self.debug("Cleaning up dummy rsc put on remote node")
+ self._rsh(self._get_other_node(node), "crm_resource -U -r %s" % self._remote_rsc)
+ self._del_rsc(node, self._remote_rsc)
+
+ if self._remote_node_added:
+ # Remove remote node's connection resource
+ self.debug("Cleaning up remote node connection resource")
+ self._rsh(self._get_other_node(node), "crm_resource -U -r %s" % self._remote_node)
+ self._del_rsc(node, self._remote_node)
+
+ watch.look_for_all()
+
+ if watch.unmatched:
+ self.fail("Unmatched patterns: %s" % watch.unmatched)
+
+ self._stop_pcmk_remote(node)
+
+ self.debug("Waiting for the cluster to recover")
+ self._cm.cluster_stable()
+
+ if self._remote_node_added:
+ # Remove remote node itself
+ self.debug("Cleaning up node entry for remote node")
+ self._rsh(self._get_other_node(node), "crm_node --force --remove %s" % self._remote_node)
+
+ def _setup_env(self, node):
+        """ Set up the environment to allow Pacemaker Remote to function. This
+ involves generating a key and copying it to all nodes in the cluster.
+ """
+
+ self._remote_node = "remote-%s" % node
+
+ # we are assuming if all nodes have a key, that it is
+ # the right key... If any node doesn't have a remote
+ # key, we regenerate it everywhere.
+ if self._rsh.exists_on_all("/etc/pacemaker/authkey", self._env["nodes"]):
+ return
+
+ # create key locally
+ (handle, keyfile) = tempfile.mkstemp(".cts")
+ os.close(handle)
+ subprocess.check_call(["dd", "if=/dev/urandom", "of=%s" % keyfile, "bs=4096", "count=1"],
+ stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+
+ # sync key throughout the cluster
+ for n in self._env["nodes"]:
+ self._rsh(n, "mkdir -p --mode=0750 /etc/pacemaker")
+ self._rsh.copy(keyfile, "root@%s:/etc/pacemaker/authkey" % n)
+ self._rsh(n, "chgrp haclient /etc/pacemaker /etc/pacemaker/authkey")
+ self._rsh(n, "chmod 0640 /etc/pacemaker/authkey")
+
+ os.unlink(keyfile)
+
+ def is_applicable(self):
+ """ Return True if this test is applicable in the current test configuration. """
+
+ if not CTSTest.is_applicable(self):
+ return False
+
+ for node in self._env["nodes"]:
+ (rc, _) = self._rsh(node, "which pacemaker-remoted >/dev/null 2>&1")
+ if rc != 0:
+ return False
+
+ return True
+
+ def start_new_test(self, node):
+ """ Prepare a remote test for running by setting up its environment
+ and resources
+ """
+
+ self.incr("calls")
+ self.reset()
+
+ ret = self._startall(None)
+ if not ret:
+ return self.failure("setup failed: could not start all nodes")
+
+ self._setup_env(node)
+ self._start_metal(node)
+ self._add_dummy_rsc(node)
+ return True
+
+ def __call__(self, node):
+ """ Perform this test """
+
+ raise NotImplementedError
+
+ @property
+ def errors_to_ignore(self):
+ """ Return list of errors which should be ignored """
+
+ return [
+ r"""is running on remote.*which isn't allowed""",
+ r"""Connection terminated""",
+ r"""Could not send remote"""
+ ]
diff --git a/python/pacemaker/_cts/tests/remotemigrate.py b/python/pacemaker/_cts/tests/remotemigrate.py
new file mode 100644
index 0000000..e22e98f
--- /dev/null
+++ b/python/pacemaker/_cts/tests/remotemigrate.py
@@ -0,0 +1,63 @@
+""" Move a connection resource from one node to another """
+
+__all__ = ["RemoteMigrate"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+from pacemaker._cts.tests.remotedriver import RemoteDriver
+
+# Disable various pylint warnings that occur in so many places throughout this
+# file it's easiest to just take care of them globally. This does introduce the
+# possibility that we'll miss some other cause of the same warning, but we'll
+# just have to be careful.
+
+# pylint doesn't understand that self._env is subscriptable.
+# pylint: disable=unsubscriptable-object
+
+
+class RemoteMigrate(RemoteDriver):
+ """ A concrete test that moves a connection resource from one node to another """
+
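+    # Illustrative usage (the CTS scheduler normally creates and runs tests;
+    # the names below are examples only):
+    #
+    #   test = RemoteMigrate(cm)        # cm is a ClusterManager instance
+    #   result = test("cluster-node-1") # run the test against one node
+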
+ def __init__(self, cm):
+ """ Create a new RemoteMigrate instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ RemoteDriver.__init__(self, cm)
+
+ self.name = "RemoteMigrate"
+
+ def __call__(self, node):
+ """ Perform this test """
+
+ # This code is very similar to __call__ in remotestonithd.py, but I don't think
+ # it's worth turning into a library function nor making one a subclass of the
+ # other. I think that's more confusing than leaving the duplication.
+ # pylint: disable=duplicate-code
+
+ if not self.start_new_test(node):
+ return self.failure(self.fail_string)
+
+ self.migrate_connection(node)
+ self.cleanup_metal(node)
+
+ self.debug("Waiting for the cluster to recover")
+ self._cm.cluster_stable()
+ if self.failed:
+ return self.failure(self.fail_string)
+
+ return self.success()
+
+ def is_applicable(self):
+ """ Return True if this test is applicable in the current test configuration. """
+
+ if not RemoteDriver.is_applicable(self):
+ return False
+
+ # This test requires at least three nodes: one to convert to a
+ # remote node, one to host the connection originally, and one
+ # to migrate the connection to.
+ return len(self._env["nodes"]) >= 3
diff --git a/python/pacemaker/_cts/tests/remoterscfailure.py b/python/pacemaker/_cts/tests/remoterscfailure.py
new file mode 100644
index 0000000..6f221de
--- /dev/null
+++ b/python/pacemaker/_cts/tests/remoterscfailure.py
@@ -0,0 +1,73 @@
+""" Cause the Pacemaker Remote connection resource to fail """
+
+__all__ = ["RemoteRscFailure"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+from pacemaker._cts.tests.remotedriver import RemoteDriver
+
+# Disable various pylint warnings that occur in so many places throughout this
+# file it's easiest to just take care of them globally. This does introduce the
+# possibility that we'll miss some other cause of the same warning, but we'll
+# just have to be careful.
+
+# pylint doesn't understand that self._env is subscriptable.
+# pylint: disable=unsubscriptable-object
+
+
+class RemoteRscFailure(RemoteDriver):
+ """ A concrete test that causes the Pacemaker Remote connection resource
+ to fail
+ """
+
+ def __init__(self, cm):
+ """ Create a new RemoteRscFailure instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ RemoteDriver.__init__(self, cm)
+ self.name = "RemoteRscFailure"
+
+ def __call__(self, node):
+ """ Perform this test """
+
+ if not self.start_new_test(node):
+ return self.failure(self.fail_string)
+
+ # This is an important step. We are migrating the connection
+ # before failing the resource. This verifies that the migration
+ # has properly maintained control over the remote-node.
+ self.migrate_connection(node)
+
+ self.fail_rsc(node)
+ self.cleanup_metal(node)
+
+ self.debug("Waiting for the cluster to recover")
+ self._cm.cluster_stable()
+ if self.failed:
+ return self.failure(self.fail_string)
+
+ return self.success()
+
+ @property
+ def errors_to_ignore(self):
+ """ Return list of errors which should be ignored """
+
+ return [
+ r"schedulerd.*: Recover\s+remote-rsc\s+\(.*\)",
+ r"Dummy.*: No process state file found"
+ ] + super().errors_to_ignore
+
+ def is_applicable(self):
+ """ Return True if this test is applicable in the current test configuration. """
+
+ if not RemoteDriver.is_applicable(self):
+ return False
+
+ # This test requires at least three nodes: one to convert to a
+ # remote node, one to host the connection originally, and one
+ # to migrate the connection to.
+ return len(self._env["nodes"]) >= 3
diff --git a/python/pacemaker/_cts/tests/remotestonithd.py b/python/pacemaker/_cts/tests/remotestonithd.py
new file mode 100644
index 0000000..f684992
--- /dev/null
+++ b/python/pacemaker/_cts/tests/remotestonithd.py
@@ -0,0 +1,62 @@
+""" Fail the connection resource and fence the remote node """
+
+__all__ = ["RemoteStonithd"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+from pacemaker._cts.tests.remotedriver import RemoteDriver
+
+
+class RemoteStonithd(RemoteDriver):
+ """ A concrete test that fails the connection resource and fences the
+ remote node
+ """
+
+ def __init__(self, cm):
+ """ Create a new RemoteStonithd instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ RemoteDriver.__init__(self, cm)
+
+ self.name = "RemoteStonithd"
+
+ def __call__(self, node):
+ """ Perform this test """
+
+ if not self.start_new_test(node):
+ return self.failure(self.fail_string)
+
+ self.fail_connection(node)
+ self.cleanup_metal(node)
+
+ self.debug("Waiting for the cluster to recover")
+ self._cm.cluster_stable()
+ if self.failed:
+ return self.failure(self.fail_string)
+
+ return self.success()
+
+ def is_applicable(self):
+ """ Return True if this test is applicable in the current test configuration. """
+
+ if not RemoteDriver.is_applicable(self):
+ return False
+
+ return self._env.get("DoFencing", True)
+
+ @property
+ def errors_to_ignore(self):
+ """ Return list of errors which should be ignored """
+
+ return [
+ r"Lost connection to Pacemaker Remote node",
+ r"Software caused connection abort",
+ r"pacemaker-controld.*:\s+error.*: Operation remote-.*_monitor",
+ r"pacemaker-controld.*:\s+error.*: Result of monitor operation for remote-.*",
+ r"schedulerd.*:\s+Recover\s+remote-.*\s+\(.*\)",
+ r"error: Result of monitor operation for .* on remote-.*: Internal communication failure"
+ ] + super().errors_to_ignore
diff --git a/python/pacemaker/_cts/tests/resourcerecover.py b/python/pacemaker/_cts/tests/resourcerecover.py
new file mode 100644
index 0000000..252eb1f
--- /dev/null
+++ b/python/pacemaker/_cts/tests/resourcerecover.py
@@ -0,0 +1,175 @@
+""" Fail a random resource and verify its fail count increases """
+
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+from pacemaker._cts.audits import AuditResource
+from pacemaker._cts.tests.ctstest import CTSTest
+from pacemaker._cts.tests.simulstartlite import SimulStartLite
+from pacemaker._cts.tests.starttest import StartTest
+from pacemaker._cts.timer import Timer
+
+# Disable various pylint warnings that occur in so many places throughout this
+# file it's easiest to just take care of them globally. This does introduce the
+# possibility that we'll miss some other cause of the same warning, but we'll
+# just have to be careful.
+
+# pylint doesn't understand that self._rsh is callable.
+# pylint: disable=not-callable
+
+
+class ResourceRecover(CTSTest):
+ """ A concrete test that fails a random resource """
+
+ def __init__(self, cm):
+ """ Create a new ResourceRecover instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ CTSTest.__init__(self, cm)
+
+ self.benchmark = True
+ self.name = "ResourceRecover"
+
+ self._action = "asyncmon"
+ self._interval = 0
+ self._rid = None
+ self._rid_alt = None
+ self._start = StartTest(cm)
+ self._startall = SimulStartLite(cm)
+
+ def __call__(self, node):
+ """ Perform this test """
+
+ self.incr("calls")
+
+ if not self._startall(None):
+ return self.failure("Setup failed")
+
+ # List all resources active on the node (skip test if none)
+ resourcelist = self._cm.active_resources(node)
+ if not resourcelist:
+ self._logger.log("No active resources on %s" % node)
+ return self.skipped()
+
+ # Choose one resource at random
+ rsc = self._choose_resource(node, resourcelist)
+ if rsc is None:
+ return self.failure("Could not get details of resource '%s'" % self._rid)
+
+ if rsc.id == rsc.clone_id:
+ self.debug("Failing %s" % rsc.id)
+ else:
+ self.debug("Failing %s (also known as %s)" % (rsc.id, rsc.clone_id))
+
+ # Log patterns to watch for (failure, plus restart if managed)
+ pats = [
+ self.templates["Pat:CloneOpFail"] % (self._action, rsc.id, rsc.clone_id)
+ ]
+
+ if rsc.managed:
+ pats.append(self.templates["Pat:RscOpOK"] % ("stop", self._rid))
+
+ if rsc.unique:
+ pats.append(self.templates["Pat:RscOpOK"] % ("start", self._rid))
+ else:
+ # Anonymous clones may get restarted with a different clone number
+ pats.append(self.templates["Pat:RscOpOK"] % ("start", ".*"))
+
+ # Fail resource. (Ideally, we'd fail it twice, to ensure the fail count
+ # is incrementing properly, but it might restart on a different node.
+ # We'd have to temporarily ban it from all other nodes and ensure the
+ # migration-threshold hasn't been reached.)
+ if self._fail_resource(rsc, node, pats) is None:
+ # self.failure() already called
+ return None
+
+ return self.success()
+
+ def _choose_resource(self, node, resourcelist):
+ """ Choose a random resource to target """
+
+ self._rid = self._env.random_gen.choice(resourcelist)
+ self._rid_alt = self._rid
+ (_, lines) = self._rsh(node, "crm_resource -c", verbose=1)
+
+ for line in lines:
+ if line.startswith("Resource: "):
+ rsc = AuditResource(self._cm, line)
+
+ if rsc.id == self._rid:
+ # Handle anonymous clones that get renamed
+ self._rid = rsc.clone_id
+ return rsc
+
+ return None
+
+ def _get_failcount(self, node):
+ """ Check the fail count of targeted resource on given node """
+
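+        # With --quiet, crm_failcount prints only the value (e.g. "0"); a failed
+        # run or a non-integer value is treated as an error below.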
+ cmd = "crm_failcount --quiet --query --resource %s --operation %s --interval %d --node %s"
+ (rc, lines) = self._rsh(node, cmd % (self._rid, self._action, self._interval, node),
+ verbose=1)
+
+ if rc != 0 or len(lines) != 1:
+ lines = [l.strip() for l in lines]
+ self._logger.log("crm_failcount on %s failed (%d): %s" % (node, rc, " // ".join(lines)))
+ return -1
+
+ try:
+ failcount = int(lines[0])
+ except (IndexError, ValueError):
+ self._logger.log("crm_failcount output on %s unparseable: %s" % (node, " ".join(lines)))
+ return -1
+
+ return failcount
+
+ def _fail_resource(self, rsc, node, pats):
+ """ Fail the targeted resource, and verify as expected """
+
+ orig_failcount = self._get_failcount(node)
+
+ watch = self.create_watch(pats, 60)
+ watch.set_watch()
+
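+        # Ask the cluster to record a fake failure of the resource on this node
+        # (-F/--fail injects the failure, -r names the resource, -H the node).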
+ self._rsh(node, "crm_resource -V -F -r %s -H %s &>/dev/null" % (self._rid, node))
+
+ with Timer(self._logger, self.name, "recover"):
+ watch.look_for_all()
+
+ self._cm.cluster_stable()
+ recovered = self._cm.resource_location(self._rid)
+
+ if watch.unmatched:
+ return self.failure("Patterns not found: %r" % watch.unmatched)
+
+ if rsc.unique and len(recovered) > 1:
+ return self.failure("%s is now active on more than one node: %r" % (self._rid, recovered))
+
+ if recovered:
+ self.debug("%s is running on: %r" % (self._rid, recovered))
+
+ elif rsc.managed:
+ return self.failure("%s was not recovered and is inactive" % self._rid)
+
+ new_failcount = self._get_failcount(node)
+ if new_failcount != orig_failcount + 1:
+ return self.failure("%s fail count is %d not %d"
+ % (self._rid, new_failcount, orig_failcount + 1))
+
+ return 0 # Anything but None is success
+
+ @property
+ def errors_to_ignore(self):
+ """ Return list of errors which should be ignored """
+
+ return [
+ r"Updating failcount for %s" % self._rid,
+ r"schedulerd.*: Recover\s+(%s|%s)\s+\(.*\)" % (self._rid, self._rid_alt),
+ r"Unknown operation: fail",
+ self.templates["Pat:RscOpOK"] % (self._action, self._rid),
+ r"(ERROR|error).*: Action %s_%s_%d .* initiated outside of a transition" % (self._rid, self._action, self._interval)
+ ]
diff --git a/python/pacemaker/_cts/tests/restartonebyone.py b/python/pacemaker/_cts/tests/restartonebyone.py
new file mode 100644
index 0000000..23b3a68
--- /dev/null
+++ b/python/pacemaker/_cts/tests/restartonebyone.py
@@ -0,0 +1,58 @@
+""" Restart all nodes in order """
+
+__all__ = ["RestartOnebyOne"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+from pacemaker._cts.tests.ctstest import CTSTest
+from pacemaker._cts.tests.restarttest import RestartTest
+from pacemaker._cts.tests.simulstartlite import SimulStartLite
+
+# Disable various pylint warnings that occur in so many places throughout this
+# file it's easiest to just take care of them globally. This does introduce the
+# possibility that we'll miss some other cause of the same warning, but we'll
+# just have to be careful.
+
+# pylint doesn't understand that self._env is subscriptable.
+# pylint: disable=unsubscriptable-object
+
+
+class RestartOnebyOne(CTSTest):
+ """ A concrete test that restarts all nodes in order """
+
+ def __init__(self, cm):
+ """ Create a new RestartOnebyOne instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ CTSTest.__init__(self, cm)
+
+ self.name = "RestartOnebyOne"
+
+ self._restart = None
+ self._startall = SimulStartLite(cm)
+
+ def __call__(self, dummy):
+ """ Perform the test """
+
+ self.incr("calls")
+
+ ret = self._startall(None)
+ if not ret:
+ return self.failure("Setup failed")
+
+ did_fail = []
+ self.set_timer()
+ self._restart = RestartTest(self._cm)
+
+ for node in self._env["nodes"]:
+ if not self._restart(node):
+ did_fail.append(node)
+
+ if did_fail:
+ return self.failure("Could not restart %d nodes: %r" % (len(did_fail), did_fail))
+
+ return self.success()
diff --git a/python/pacemaker/_cts/tests/restarttest.py b/python/pacemaker/_cts/tests/restarttest.py
new file mode 100644
index 0000000..3b628ce
--- /dev/null
+++ b/python/pacemaker/_cts/tests/restarttest.py
@@ -0,0 +1,49 @@
+""" Stop and restart a node """
+
+__all__ = ["RestartTest"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+from pacemaker._cts.tests.ctstest import CTSTest
+from pacemaker._cts.tests.starttest import StartTest
+from pacemaker._cts.tests.stoptest import StopTest
+
+
+class RestartTest(CTSTest):
+ """ A concrete test that stops and restarts a node """
+
+ def __init__(self, cm):
+ """ Create a new RestartTest instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ CTSTest.__init__(self, cm)
+ self.benchmark = True
+ self.name = "Restart"
+
+ self._start = StartTest(cm)
+ self._stop = StopTest(cm)
+
+ def __call__(self, node):
+ """ Perform this test """
+
+ self.incr("calls")
+ self.incr("node:%s" % node)
+
+ if self._cm.stat_cm(node):
+ self.incr("WasStopped")
+ if not self._start(node):
+ return self.failure("start (setup) failure: %s" % node)
+
+ self.set_timer()
+
+ if not self._stop(node):
+ return self.failure("stop failure: %s" % node)
+
+ if not self._start(node):
+ return self.failure("start failure: %s" % node)
+
+ return self.success()
diff --git a/python/pacemaker/_cts/tests/resynccib.py b/python/pacemaker/_cts/tests/resynccib.py
new file mode 100644
index 0000000..fe634d6
--- /dev/null
+++ b/python/pacemaker/_cts/tests/resynccib.py
@@ -0,0 +1,75 @@
+""" Start the cluster without a CIB and verify it gets copied from another node """
+
+__all__ = ["ResyncCIB"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+from pacemaker import BuildOptions
+from pacemaker._cts.tests.ctstest import CTSTest
+from pacemaker._cts.tests.restarttest import RestartTest
+from pacemaker._cts.tests.simulstartlite import SimulStartLite
+from pacemaker._cts.tests.simulstoplite import SimulStopLite
+
+# Disable various pylint warnings that occur in so many places throughout this
+# file it's easiest to just take care of them globally. This does introduce the
+# possibility that we'll miss some other cause of the same warning, but we'll
+# just have to be careful.
+
+# pylint doesn't understand that self._rsh is callable.
+# pylint: disable=not-callable
+
+
+class ResyncCIB(CTSTest):
+ """ A concrete test that starts the cluster on one node without a CIB and
+ verifies the CIB is copied over when the remaining nodes join
+ """
+
+ def __init__(self, cm):
+ """ Create a new ResyncCIB instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ CTSTest.__init__(self, cm)
+
+ self.name = "ResyncCIB"
+
+ self._restart1 = RestartTest(cm)
+ self._startall = SimulStartLite(cm)
+ self._stopall = SimulStopLite(cm)
+
+ def __call__(self, node):
+ """ Perform this test """
+
+ self.incr("calls")
+
+ # Shut down all the nodes...
+ if not self._stopall(None):
+ return self.failure("Could not stop all nodes")
+
+ # Test config recovery when the other nodes come up
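+        # BuildOptions.CIB_DIR is typically /var/lib/pacemaker/cib, so this
+        # removes cib.xml and its signature/backup files from the chosen node.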
+ self._rsh(node, "rm -f %s/cib*" % BuildOptions.CIB_DIR)
+
+ # Start the selected node
+ if not self._restart1(node):
+ return self.failure("Could not start %s" % node)
+
+ # Start all remaining nodes
+ if not self._startall(None):
+ return self.failure("Could not start the remaining nodes")
+
+ return self.success()
+
+ @property
+ def errors_to_ignore(self):
+ """ Return list of errors which should be ignored """
+
+ # Errors that occur as a result of the CIB being wiped
+ return [
+ r"error.*: v1 patchset error, patch failed to apply: Application of an update diff failed",
+ r"error.*: Resource start-up disabled since no STONITH resources have been defined",
+ r"error.*: Either configure some or disable STONITH with the stonith-enabled option",
+ r"error.*: NOTE: Clusters with shared data need STONITH to ensure data integrity"
+ ]
diff --git a/python/pacemaker/_cts/tests/simulstart.py b/python/pacemaker/_cts/tests/simulstart.py
new file mode 100644
index 0000000..88a7f2f
--- /dev/null
+++ b/python/pacemaker/_cts/tests/simulstart.py
@@ -0,0 +1,42 @@
+""" Start all stopped nodes simultaneously """
+
+__all__ = ["SimulStart"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+from pacemaker._cts.tests.ctstest import CTSTest
+from pacemaker._cts.tests.simulstartlite import SimulStartLite
+from pacemaker._cts.tests.simulstoplite import SimulStopLite
+
+
+class SimulStart(CTSTest):
+ """ A concrete test that starts all stopped nodes simultaneously """
+
+ def __init__(self, cm):
+ """ Create a new SimulStart instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ CTSTest.__init__(self, cm)
+
+ self.name = "SimulStart"
+
+ self._startall = SimulStartLite(cm)
+ self._stopall = SimulStopLite(cm)
+
+ def __call__(self, dummy):
+ """ Perform this test """
+
+ self.incr("calls")
+
+ ret = self._stopall(None)
+ if not ret:
+ return self.failure("Setup failed")
+
+ if not self._startall(None):
+ return self.failure("Startall failed")
+
+ return self.success()
diff --git a/python/pacemaker/_cts/tests/simulstartlite.py b/python/pacemaker/_cts/tests/simulstartlite.py
new file mode 100644
index 0000000..c5c51e1
--- /dev/null
+++ b/python/pacemaker/_cts/tests/simulstartlite.py
@@ -0,0 +1,133 @@
+""" Simultaneously start stopped nodes """
+
+__all__ = ["SimulStartLite"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+from pacemaker._cts.tests.ctstest import CTSTest
+
+# Disable various pylint warnings that occur in so many places throughout this
+# file it's easiest to just take care of them globally. This does introduce the
+# possibility that we'll miss some other cause of the same warning, but we'll
+# just have to be careful.
+
+# pylint doesn't understand that self._env is subscriptable.
+# pylint: disable=unsubscriptable-object
+
+
+class SimulStartLite(CTSTest):
+ """ A pseudo-test that is only used to set up conditions before running
+ some other test. This class starts any stopped nodes more or less
+ simultaneously.
+
+ Other test classes should not use this one as a superclass.
+ """
+
+ def __init__(self, cm):
+ """ Create a new SimulStartLite instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ CTSTest.__init__(self, cm)
+ self.name = "SimulStartLite"
+
+ def __call__(self, dummy):
+ """ Start all stopped nodes more or less simultaneously, returning
+ whether this succeeded or not.
+ """
+
+ self.incr("calls")
+ self.debug("Setup: %s" % self.name)
+
+ # We ignore the "node" parameter...
+ node_list = []
+ for node in self._env["nodes"]:
+ if self._cm.expected_status[node] == "down":
+ self.incr("WasStopped")
+ node_list.append(node)
+
+ self.set_timer()
+ while len(node_list) > 0:
+ # Repeat until all nodes come up
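+            # (When no node is up yet, the first node to start becomes the DC,
+            # so a different startup pattern is expected for it than for nodes
+            # joining an existing DC.)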
+ uppat = self.templates["Pat:NonDC_started"]
+ if self._cm.upcount() == 0:
+ uppat = self.templates["Pat:Local_started"]
+
+ watchpats = [
+ self.templates["Pat:DC_IDLE"]
+ ]
+ for node in node_list:
+ watchpats.extend([uppat % node,
+ self.templates["Pat:InfraUp"] % node,
+ self.templates["Pat:PacemakerUp"] % node])
+
+ # Start all the nodes - at about the same time...
+ watch = self.create_watch(watchpats, self._env["DeadTime"]+10)
+ watch.set_watch()
+
+ stonith = self._cm.prepare_fencing_watcher()
+
+ for node in node_list:
+ self._cm.start_cm_async(node)
+
+ watch.look_for_all()
+
+ node_list = self._cm.fencing_cleanup(self.name, stonith)
+
+ if node_list is None:
+ return self.failure("Cluster did not stabilize")
+
+ # Remove node_list messages from watch.unmatched
+ for node in node_list:
+                self._logger.debug("Dealing with stonith operations for %s" % node)
+ if watch.unmatched:
+ try:
+ watch.unmatched.remove(uppat % node)
+ except ValueError:
+ self.debug("Already matched: %s" % (uppat % node))
+
+ try:
+ watch.unmatched.remove(self.templates["Pat:InfraUp"] % node)
+ except ValueError:
+ self.debug("Already matched: %s" % (self.templates["Pat:InfraUp"] % node))
+
+ try:
+ watch.unmatched.remove(self.templates["Pat:PacemakerUp"] % node)
+ except ValueError:
+ self.debug("Already matched: %s" % (self.templates["Pat:PacemakerUp"] % node))
+
+ if watch.unmatched:
+ for regex in watch.unmatched:
+ self._logger.log("Warn: Startup pattern not found: %s" % regex)
+
+ if not self._cm.cluster_stable():
+ return self.failure("Cluster did not stabilize")
+
+ did_fail = False
+ unstable = []
+ for node in self._env["nodes"]:
+ if not self._cm.stat_cm(node):
+ did_fail = True
+ unstable.append(node)
+
+ if did_fail:
+ return self.failure("Unstarted nodes exist: %s" % unstable)
+
+ unstable = []
+ for node in self._env["nodes"]:
+ if not self._cm.node_stable(node):
+ did_fail = True
+ unstable.append(node)
+
+ if did_fail:
+ return self.failure("Unstable cluster nodes exist: %s" % unstable)
+
+ return self.success()
+
+ def is_applicable(self):
+ """ SimulStartLite is a setup test and never applicable """
+
+ return False
diff --git a/python/pacemaker/_cts/tests/simulstop.py b/python/pacemaker/_cts/tests/simulstop.py
new file mode 100644
index 0000000..174c533
--- /dev/null
+++ b/python/pacemaker/_cts/tests/simulstop.py
@@ -0,0 +1,42 @@
+""" Stop all running nodes simultaneously """
+
+__all__ = ["SimulStop"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+from pacemaker._cts.tests.ctstest import CTSTest
+from pacemaker._cts.tests.simulstartlite import SimulStartLite
+from pacemaker._cts.tests.simulstoplite import SimulStopLite
+
+
+class SimulStop(CTSTest):
+ """ A concrete test that stops all running nodes simultaneously """
+
+ def __init__(self, cm):
+ """ Create a new SimulStop instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ CTSTest.__init__(self, cm)
+
+ self.name = "SimulStop"
+
+ self._startall = SimulStartLite(cm)
+ self._stopall = SimulStopLite(cm)
+
+ def __call__(self, dummy):
+ """ Perform this test """
+
+ self.incr("calls")
+
+ ret = self._startall(None)
+ if not ret:
+ return self.failure("Setup failed")
+
+ if not self._stopall(None):
+ return self.failure("Stopall failed")
+
+ return self.success()
diff --git a/python/pacemaker/_cts/tests/simulstoplite.py b/python/pacemaker/_cts/tests/simulstoplite.py
new file mode 100644
index 0000000..d2e687e
--- /dev/null
+++ b/python/pacemaker/_cts/tests/simulstoplite.py
@@ -0,0 +1,91 @@
+""" Simultaneously stop running nodes """
+
+__all__ = ["SimulStopLite"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+from pacemaker._cts.tests.ctstest import CTSTest
+
+# Disable various pylint warnings that occur in so many places throughout this
+# file it's easiest to just take care of them globally. This does introduce the
+# possibility that we'll miss some other cause of the same warning, but we'll
+# just have to be careful.
+
+# pylint doesn't understand that self._rsh is callable.
+# pylint: disable=not-callable
+# pylint doesn't understand that self._env is subscriptable.
+# pylint: disable=unsubscriptable-object
+
+
+class SimulStopLite(CTSTest):
+ """ A pseudo-test that is only used to set up conditions before running
+ some other test. This class stops any running nodes more or less
+        simultaneously. It can be used either to set up a test or to clean up
+        after one.
+
+ Other test classes should not use this one as a superclass.
+ """
+
+ def __init__(self, cm):
+ """ Create a new SimulStopLite instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ CTSTest.__init__(self, cm)
+ self.name = "SimulStopLite"
+
+ def __call__(self, dummy):
+ """ Stop all running nodes more or less simultaneously, returning
+ whether this succeeded or not.
+ """
+
+ self.incr("calls")
+ self.debug("Setup: %s" % self.name)
+
+ # We ignore the "node" parameter...
+ watchpats = []
+
+ for node in self._env["nodes"]:
+ if self._cm.expected_status[node] == "up":
+ self.incr("WasStarted")
+ watchpats.append(self.templates["Pat:We_stopped"] % node)
+
+ if len(watchpats) == 0:
+ return self.success()
+
+ # Stop all the nodes - at about the same time...
+ watch = self.create_watch(watchpats, self._env["DeadTime"]+10)
+
+ watch.set_watch()
+ self.set_timer()
+ for node in self._env["nodes"]:
+ if self._cm.expected_status[node] == "up":
+ self._cm.stop_cm_async(node)
+
+ if watch.look_for_all():
+            # Make sure they're completely down with no residue
+ for node in self._env["nodes"]:
+ self._rsh(node, self.templates["StopCmd"])
+
+ return self.success()
+
+ did_fail = False
+ up_nodes = []
+ for node in self._env["nodes"]:
+ if self._cm.stat_cm(node):
+ did_fail = True
+ up_nodes.append(node)
+
+ if did_fail:
+ return self.failure("Active nodes exist: %s" % up_nodes)
+
+ self._logger.log("Warn: All nodes stopped but CTS didn't detect: %s" % watch.unmatched)
+ return self.failure("Missing log message: %s " % watch.unmatched)
+
+ def is_applicable(self):
+ """ SimulStopLite is a setup test and never applicable """
+
+ return False
diff --git a/python/pacemaker/_cts/tests/splitbraintest.py b/python/pacemaker/_cts/tests/splitbraintest.py
new file mode 100644
index 0000000..09d5f55
--- /dev/null
+++ b/python/pacemaker/_cts/tests/splitbraintest.py
@@ -0,0 +1,215 @@
+""" Create a split brain cluster and verify a resource is multiply managed """
+
+__all__ = ["SplitBrainTest"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+import time
+
+from pacemaker._cts.input import should_continue
+from pacemaker._cts.tests.ctstest import CTSTest
+from pacemaker._cts.tests.simulstartlite import SimulStartLite
+from pacemaker._cts.tests.starttest import StartTest
+
+# Disable various pylint warnings that occur in so many places throughout this
+# file it's easiest to just take care of them globally. This does introduce the
+# possibility that we'll miss some other cause of the same warning, but we'll
+# just have to be careful.
+
+# pylint doesn't understand that self._rsh is callable.
+# pylint: disable=not-callable
+# pylint doesn't understand that self._env is subscriptable.
+# pylint: disable=unsubscriptable-object
+
+
+class SplitBrainTest(CTSTest):
+ """ A concrete test that creates a split brain cluster and verifies that
+ one node in each partition takes over the resource, resulting in two
+ nodes running the same resource.
+ """
+
+ def __init__(self, cm):
+ """ Create a new SplitBrainTest instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ CTSTest.__init__(self, cm)
+
+ self.is_experimental = True
+ self.name = "SplitBrain"
+
+ self._start = StartTest(cm)
+ self._startall = SimulStartLite(cm)
+
+ def _isolate_partition(self, partition):
+ """ Create a new partition containing the given nodes """
+
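+        # How the partition is created is up to the ClusterManager; typically
+        # isolate_node() inserts firewall rules so the node can no longer talk
+        # to nodes outside its partition.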
+ other_nodes = self._env["nodes"].copy()
+
+ for node in partition:
+ try:
+ other_nodes.remove(node)
+ except ValueError:
+ self._logger.log("Node %s not in %r from %r" % (node, self._env["nodes"], partition))
+
+ if not other_nodes:
+ return
+
+ self.debug("Creating partition: %r" % partition)
+ self.debug("Everyone else: %r" % other_nodes)
+
+ for node in partition:
+ if not self._cm.isolate_node(node, other_nodes):
+ self._logger.log("Could not isolate %s" % node)
+ return
+
+ def _heal_partition(self, partition):
+ """ Move the given nodes out of their own partition back into the cluster """
+
+ other_nodes = self._env["nodes"].copy()
+
+ for node in partition:
+ try:
+ other_nodes.remove(node)
+ except ValueError:
+ self._logger.log("Node %s not in %r" % (node, self._env["nodes"]))
+
+ if len(other_nodes) == 0:
+ return
+
+ self.debug("Healing partition: %r" % partition)
+ self.debug("Everyone else: %r" % other_nodes)
+
+ for node in partition:
+ self._cm.unisolate_node(node, other_nodes)
+
+ def __call__(self, node):
+ """ Perform this test """
+
+ self.incr("calls")
+ self.passed = True
+ partitions = {}
+
+ if not self._startall(None):
+ return self.failure("Setup failed")
+
+ while True:
+ # Retry until we get multiple partitions
+ partitions = {}
+ p_max = len(self._env["nodes"])
+
+ for n in self._env["nodes"]:
+ p = self._env.random_gen.randint(1, p_max)
+
+ if p not in partitions:
+ partitions[p] = []
+
+ partitions[p].append(n)
+
+ p_max = len(partitions)
+ if p_max > 1:
+ break
+ # else, try again
+
+ self.debug("Created %d partitions" % p_max)
+ for (key, val) in partitions.items():
+ self.debug("Partition[%s]:\t%r" % (key, val))
+
+ # Disabling STONITH to reduce test complexity for now
+ self._rsh(node, "crm_attribute -V -n stonith-enabled -v false")
+
+ for val in partitions.values():
+ self._isolate_partition(val)
+
+        count = 30
+        while count > 0:
+            if len(self._cm.find_partitions()) != p_max:
+                time.sleep(10)
+                count -= 1
+            else:
+                break
+ else:
+ self.failure("Expected partitions were not created")
+
+ # Target number of partitions formed - wait for stability
+ if not self._cm.cluster_stable():
+ self.failure("Partitioned cluster not stable")
+
+ # Now audit the cluster state
+ self._cm.partitions_expected = p_max
+ if not self.audit():
+ self.failure("Audits failed")
+
+ self._cm.partitions_expected = 1
+
+ # And heal them again
+ for val in partitions.values():
+ self._heal_partition(val)
+
+ # Wait for a single partition to form
+ count = 30
+ while count > 0:
+ if len(self._cm.find_partitions()) != 1:
+ time.sleep(10)
+ count -= 1
+ else:
+ break
+ else:
+ self.failure("Cluster did not reform")
+
+ # Wait for it to have the right number of members
+ count = 30
+ while count > 0:
+ members = []
+
+ partitions = self._cm.find_partitions()
+ if partitions:
+ members = partitions[0].split()
+
+ if len(members) != len(self._env["nodes"]):
+ time.sleep(10)
+ count -= 1
+ else:
+ break
+ else:
+ self.failure("Cluster did not completely reform")
+
+        # Wait up to 20 minutes - the delay is preferable to trying to
+        # continue in a messed-up state
+ if not self._cm.cluster_stable(1200):
+ self.failure("Reformed cluster not stable")
+
+ if not should_continue(self._env):
+ raise ValueError("Reformed cluster not stable")
+
+ # Turn fencing back on
+ if self._env["DoFencing"]:
+ self._rsh(node, "crm_attribute -V -D -n stonith-enabled")
+
+ self._cm.cluster_stable()
+
+ if self.passed:
+ return self.success()
+
+ return self.failure("See previous errors")
+
+ @property
+ def errors_to_ignore(self):
+ """ Return list of errors which should be ignored """
+
+ return [
+ r"Another DC detected:",
+ r"(ERROR|error).*: .*Application of an update diff failed",
+ r"pacemaker-controld.*:.*not in our membership list",
+ r"CRIT:.*node.*returning after partition"
+ ]
+
+ def is_applicable(self):
+ """ Return True if this test is applicable in the current test configuration. """
+
+ if not CTSTest.is_applicable(self):
+ return False
+
+ return len(self._env["nodes"]) > 2
diff --git a/python/pacemaker/_cts/tests/standbytest.py b/python/pacemaker/_cts/tests/standbytest.py
new file mode 100644
index 0000000..a9ce8ec
--- /dev/null
+++ b/python/pacemaker/_cts/tests/standbytest.py
@@ -0,0 +1,110 @@
+""" Put a node into standby mode and check that resources migrate """
+
+__all__ = ["StandbyTest"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+from pacemaker._cts.tests.ctstest import CTSTest
+from pacemaker._cts.tests.simulstartlite import SimulStartLite
+from pacemaker._cts.tests.starttest import StartTest
+
+# Disable various pylint warnings that occur in so many places throughout this
+# file it's easiest to just take care of them globally. This does introduce the
+# possibility that we'll miss some other cause of the same warning, but we'll
+# just have to be careful.
+
+# pylint doesn't understand that self._env is subscriptable.
+# pylint: disable=unsubscriptable-object
+
+
+class StandbyTest(CTSTest):
+    """ A concrete test that puts a node into standby and checks that resources
+ migrate away from the node
+ """
+
+ def __init__(self, cm):
+ """ Create a new StandbyTest instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ CTSTest.__init__(self, cm)
+
+ self.benchmark = True
+ self.name = "Standby"
+
+ self._start = StartTest(cm)
+ self._startall = SimulStartLite(cm)
+
+ # make sure the node is active
+ # set the node to standby mode
+        # check resources, no resources should be running on the node
+ # set the node to active mode
+ # check resources, resources should have been migrated back (SHOULD THEY?)
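+        # (Standby is normally toggled via the node's "standby" attribute, e.g.
+        # `crm_attribute -N <node> -n standby -v on`; the exact command used is
+        # up to the ClusterManager's set_standby_mode().)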
+
+ def __call__(self, node):
+ """ Perform this test """
+
+ self.incr("calls")
+ ret = self._startall(None)
+ if not ret:
+ return self.failure("Start all nodes failed")
+
+ self.debug("Make sure node %s is active" % node)
+ if self._cm.in_standby_mode(node):
+ if not self._cm.set_standby_mode(node, False):
+ return self.failure("can't set node %s to active mode" % node)
+
+ self._cm.cluster_stable()
+
+ if self._cm.in_standby_mode(node):
+ return self.failure("standby status of %s is [on] but we expect [off]" % node)
+
+ watchpats = [
+ r"State transition .* -> S_POLICY_ENGINE",
+ ]
+ watch = self.create_watch(watchpats, self._env["DeadTime"]+10)
+ watch.set_watch()
+
+ self.debug("Setting node %s to standby mode" % node)
+ if not self._cm.set_standby_mode(node, True):
+ return self.failure("can't set node %s to standby mode" % node)
+
+ self.set_timer("on")
+
+ ret = watch.look_for_all()
+ if not ret:
+ self._logger.log("Patterns not found: %r" % watch.unmatched)
+ self._cm.set_standby_mode(node, False)
+ return self.failure("cluster didn't react to standby change on %s" % node)
+
+ self._cm.cluster_stable()
+
+ if not self._cm.in_standby_mode(node):
+ return self.failure("standby status of %s is [off] but we expect [on]" % node)
+
+ self.log_timer("on")
+
+ self.debug("Checking resources")
+ rscs_on_node = self._cm.active_resources(node)
+ if rscs_on_node:
+ rc = self.failure("%s set to standby, %r is still running on it" % (node, rscs_on_node))
+ self.debug("Setting node %s to active mode" % node)
+ self._cm.set_standby_mode(node, False)
+ return rc
+
+ self.debug("Setting node %s to active mode" % node)
+ if not self._cm.set_standby_mode(node, False):
+ return self.failure("can't set node %s to active mode" % node)
+
+ self.set_timer("off")
+ self._cm.cluster_stable()
+
+ if self._cm.in_standby_mode(node):
+ return self.failure("standby status of %s is [on] but we expect [off]" % node)
+
+ self.log_timer("off")
+
+ return self.success()
diff --git a/python/pacemaker/_cts/tests/startonebyone.py b/python/pacemaker/_cts/tests/startonebyone.py
new file mode 100644
index 0000000..6a01097
--- /dev/null
+++ b/python/pacemaker/_cts/tests/startonebyone.py
@@ -0,0 +1,55 @@
+""" Start all stopped nodes serially """
+
+__all__ = ["StartOnebyOne"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+from pacemaker._cts.tests.ctstest import CTSTest
+from pacemaker._cts.tests.simulstoplite import SimulStopLite
+from pacemaker._cts.tests.starttest import StartTest
+
+# Disable various pylint warnings that occur in so many places throughout this
+# file it's easiest to just take care of them globally. This does introduce the
+# possibility that we'll miss some other cause of the same warning, but we'll
+# just have to be careful.
+
+# pylint doesn't understand that self._env is subscriptable.
+# pylint: disable=unsubscriptable-object
+
+
+class StartOnebyOne(CTSTest):
+ """ A concrete test that starts all stopped nodes serially """
+
+ def __init__(self, cm):
+ """ Create a new StartOnebyOne instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ CTSTest.__init__(self, cm)
+ self.name = "StartOnebyOne"
+
+ self._start = StartTest(cm)
+ self._stopall = SimulStopLite(cm)
+
+ def __call__(self, dummy):
+ """ Perform this test """
+
+ self.incr("calls")
+
+ ret = self._stopall(None)
+ if not ret:
+ return self.failure("Test setup failed")
+
+ failed = []
+ self.set_timer()
+ for node in self._env["nodes"]:
+ if not self._start(node):
+ failed.append(node)
+
+ if failed:
+ return self.failure("Some node failed to start: %r" % failed)
+
+ return self.success()
diff --git a/python/pacemaker/_cts/tests/starttest.py b/python/pacemaker/_cts/tests/starttest.py
new file mode 100644
index 0000000..6387511
--- /dev/null
+++ b/python/pacemaker/_cts/tests/starttest.py
@@ -0,0 +1,54 @@
+""" Start the cluster manager on a given node """
+
+__all__ = ["StartTest"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+from pacemaker._cts.tests.ctstest import CTSTest
+
+# Disable various pylint warnings that occur in so many places throughout this
+# file it's easiest to just take care of them globally. This does introduce the
+# possibility that we'll miss some other cause of the same warning, but we'll
+# just have to be careful.
+
+# pylint doesn't understand that self._env is subscriptable.
+# pylint: disable=unsubscriptable-object
+
+
+class StartTest(CTSTest):
+ """ A pseudo-test that is only used to set up conditions before running
+ some other test. This class starts the cluster manager on a given
+ node.
+
+ Other test classes should not use this one as a superclass.
+ """
+
+ def __init__(self, cm):
+ """ Create a new StartTest instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ CTSTest.__init__(self, cm)
+ self.name = "Start"
+
+ def __call__(self, node):
+ """ Start the given node, returning whether this succeeded or not """
+
+ self.incr("calls")
+
+ if self._cm.upcount() == 0:
+ self.incr("us")
+ else:
+ self.incr("them")
+
+ if self._cm.expected_status[node] != "down":
+ return self.skipped()
+
+ if self._cm.start_cm(node):
+ return self.success()
+
+ return self.failure("Startup %s on node %s failed"
+ % (self._env["Name"], node))
diff --git a/python/pacemaker/_cts/tests/stonithdtest.py b/python/pacemaker/_cts/tests/stonithdtest.py
new file mode 100644
index 0000000..0dce291
--- /dev/null
+++ b/python/pacemaker/_cts/tests/stonithdtest.py
@@ -0,0 +1,145 @@
+""" Fence a running node and wait for it to restart """
+
+__all__ = ["StonithdTest"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+from pacemaker.exitstatus import ExitStatus
+from pacemaker._cts.tests.ctstest import CTSTest
+from pacemaker._cts.tests.simulstartlite import SimulStartLite
+from pacemaker._cts.timer import Timer
+
+# Disable various pylint warnings that occur in so many places throughout this
+# file it's easiest to just take care of them globally. This does introduce the
+# possibility that we'll miss some other cause of the same warning, but we'll
+# just have to be careful.
+
+# pylint doesn't understand that self._rsh is callable.
+# pylint: disable=not-callable
+# pylint doesn't understand that self._env is subscriptable.
+# pylint: disable=unsubscriptable-object
+
+
+class StonithdTest(CTSTest):
+ """ A concrete test that fences a running node and waits for it to restart """
+
+ def __init__(self, cm):
+ """ Create a new StonithdTest instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ CTSTest.__init__(self, cm)
+ self.benchmark = True
+ self.name = "Stonithd"
+
+ self._startall = SimulStartLite(cm)
+
+ def __call__(self, node):
+ """ Perform this test """
+
+ self.incr("calls")
+ if len(self._env["nodes"]) < 2:
+ return self.skipped()
+
+ ret = self._startall(None)
+ if not ret:
+ return self.failure("Setup failed")
+
+ watchpats = [
+ self.templates["Pat:Fencing_ok"] % node,
+ self.templates["Pat:NodeFenced"] % node,
+ ]
+
+ if not self._env["at-boot"]:
+ self.debug("Expecting %s to stay down" % node)
+ self._cm.expected_status[node] = "down"
+ else:
+            self.debug("Expecting %s to come back up (at-boot=%d)" % (node, self._env["at-boot"]))
+ watchpats.extend([
+ "%s.* S_STARTING -> S_PENDING" % node,
+ "%s.* S_PENDING -> S_NOT_DC" % node,
+ ])
+
+ watch = self.create_watch(watchpats, 30 + self._env["DeadTime"] + self._env["StableTime"] + self._env["StartTime"])
+ watch.set_watch()
+
+ origin = self._env.random_gen.choice(self._env["nodes"])
+
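+        # Ask the fencer (via stonith_admin) to reboot the target node from a
+        # randomly chosen origin node; the -V flags only increase verbosity.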
+ (rc, _) = self._rsh(origin, "stonith_admin --reboot %s -VVVVVV" % node)
+
+ if rc == ExitStatus.TIMEOUT:
+ # Look for the patterns, usually this means the required
+ # device was running on the node to be fenced - or that
+ # the required devices were in the process of being loaded
+ # and/or moved
+ #
+ # Effectively the node committed suicide so there will be
+ # no confirmation, but pacemaker should be watching and
+ # fence the node again
+
+ self._logger.log("Fencing command on %s to fence %s timed out" % (origin, node))
+
+ elif origin != node and rc != 0:
+ self.debug("Waiting for the cluster to recover")
+ self._cm.cluster_stable()
+
+ self.debug("Waiting for fenced node to come back up")
+ self._cm.ns.wait_for_all_nodes(self._env["nodes"], 600)
+
+ self._logger.log("Fencing command on %s failed to fence %s (rc=%d)" % (origin, node, rc))
+
+ elif origin == node and rc != 255:
+ # 255 == broken pipe, ie. the node was fenced as expected
+ self._logger.log("Locally originated fencing returned %d" % rc)
+
+ with Timer(self._logger, self.name, "fence"):
+ matched = watch.look_for_all()
+
+ self.set_timer("reform")
+ if watch.unmatched:
+ self._logger.log("Patterns not found: %r" % watch.unmatched)
+
+ self.debug("Waiting for the cluster to recover")
+ self._cm.cluster_stable()
+
+ self.debug("Waiting for fenced node to come back up")
+ self._cm.ns.wait_for_all_nodes(self._env["nodes"], 600)
+
+ self.debug("Waiting for the cluster to re-stabilize with all nodes")
+ is_stable = self._cm.cluster_stable(self._env["StartTime"])
+
+ if not matched:
+ return self.failure("Didn't find all expected patterns")
+
+ if not is_stable:
+ return self.failure("Cluster did not become stable")
+
+ self.log_timer("reform")
+ return self.success()
+
+ @property
+ def errors_to_ignore(self):
+ """ Return list of errors which should be ignored """
+
+ return [
+ self.templates["Pat:Fencing_start"] % ".*",
+ self.templates["Pat:Fencing_ok"] % ".*",
+ self.templates["Pat:Fencing_active"],
+ r"error.*: Operation 'reboot' targeting .* by .* for stonith_admin.*: Timer expired"
+ ]
+
+ def is_applicable(self):
+ """ Return True if this test is applicable in the current test configuration. """
+
+ if not CTSTest.is_applicable(self):
+ return False
+
+ # pylint gets confused because of EnvFactory here.
+ # pylint: disable=unsupported-membership-test
+ if "DoFencing" in self._env:
+ return self._env["DoFencing"]
+
+ return True
diff --git a/python/pacemaker/_cts/tests/stoponebyone.py b/python/pacemaker/_cts/tests/stoponebyone.py
new file mode 100644
index 0000000..d75d282
--- /dev/null
+++ b/python/pacemaker/_cts/tests/stoponebyone.py
@@ -0,0 +1,56 @@
+""" Stop all running nodes serially """
+
+__all__ = ["StopOnebyOne"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+from pacemaker._cts.tests.ctstest import CTSTest
+from pacemaker._cts.tests.simulstartlite import SimulStartLite
+from pacemaker._cts.tests.stoptest import StopTest
+
+# Disable various pylint warnings that occur in so many places throughout this
+# file it's easiest to just take care of them globally. This does introduce the
+# possibility that we'll miss some other cause of the same warning, but we'll
+# just have to be careful.
+
+# pylint doesn't understand that self._env is subscriptable.
+# pylint: disable=unsubscriptable-object
+
+
+class StopOnebyOne(CTSTest):
+ """ A concrete test that stops all running nodes serially """
+
+ def __init__(self, cm):
+        """ Create a new StopOnebyOne instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ CTSTest.__init__(self, cm)
+
+ self.name = "StopOnebyOne"
+
+ self._startall = SimulStartLite(cm)
+ self._stop = StopTest(cm)
+
+ def __call__(self, dummy):
+ """ Perform this test """
+
+ self.incr("calls")
+
+ ret = self._startall(None)
+ if not ret:
+ return self.failure("Setup failed")
+
+ failed = []
+ self.set_timer()
+ for node in self._env["nodes"]:
+ if not self._stop(node):
+ failed.append(node)
+
+ if failed:
+ return self.failure("Some node failed to stop: %r" % failed)
+
+ return self.success()
diff --git a/python/pacemaker/_cts/tests/stoptest.py b/python/pacemaker/_cts/tests/stoptest.py
new file mode 100644
index 0000000..8f496d3
--- /dev/null
+++ b/python/pacemaker/_cts/tests/stoptest.py
@@ -0,0 +1,99 @@
+""" Stop the cluster manager on a given node """
+
+__all__ = ["StopTest"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+from pacemaker._cts.tests.ctstest import CTSTest
+
+# Disable various pylint warnings that occur in so many places throughout this
+# file it's easiest to just take care of them globally. This does introduce the
+# possibility that we'll miss some other cause of the same warning, but we'll
+# just have to be careful.
+
+# pylint doesn't understand that self._rsh is callable.
+# pylint: disable=not-callable
+# pylint doesn't understand that self._env is subscriptable.
+# pylint: disable=unsubscriptable-object
+
+
+class StopTest(CTSTest):
+ """ A pseudo-test that is only used to set up conditions before running
+ some other test. This class stops the cluster manager on a given
+ node.
+
+ Other test classes should not use this one as a superclass.
+ """
+
+ def __init__(self, cm):
+ """ Create a new StopTest instance
+
+ Arguments:
+
+ cm -- A ClusterManager instance
+ """
+
+ CTSTest.__init__(self, cm)
+ self.name = "Stop"
+
+ def __call__(self, node):
+ """ Stop the given node, returning whether this succeeded or not """
+
+ self.incr("calls")
+ if self._cm.expected_status[node] != "up":
+ return self.skipped()
+
+ # Technically we should always be able to notice ourselves stopping
+ patterns = [
+ self.templates["Pat:We_stopped"] % node,
+ ]
+
+ # Any active node needs to notice this one left
+ # (note that this won't work if we have multiple partitions)
+ for other in self._env["nodes"]:
+ if self._cm.expected_status[other] == "up" and other != node:
+                patterns.append(self.templates["Pat:They_stopped"] % (other, node))
+
+ watch = self.create_watch(patterns, self._env["DeadTime"])
+ watch.set_watch()
+
+ if node == self._cm.our_node:
+ self.incr("us")
+ else:
+ if self._cm.upcount() <= 1:
+ self.incr("all")
+ else:
+ self.incr("them")
+
+ self._cm.stop_cm(node)
+ watch.look_for_all()
+
+ failreason = None
+ unmatched_str = "||"
+
+ if watch.unmatched:
+ (_, output) = self._rsh(node, "/bin/ps axf", verbose=1)
+ for line in output:
+ self.debug(line)
+
+ (_, output) = self._rsh(node, "/usr/sbin/dlm_tool dump 2>/dev/null", verbose=1)
+ for line in output:
+ self.debug(line)
+
+ for regex in watch.unmatched:
+ self._logger.log("ERROR: Shutdown pattern not found: %s" % regex)
+ unmatched_str += "%s||" % regex
+ failreason = "Missing shutdown pattern"
+
+ self._cm.cluster_stable(self._env["DeadTime"])
+
+ if not watch.unmatched or self._cm.upcount() == 0:
+ return self.success()
+
+ if len(watch.unmatched) >= self._cm.upcount():
+ return self.failure("no match against (%s)" % unmatched_str)
+
+ if failreason is None:
+ return self.success()
+
+ return self.failure(failreason)
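
StopTest's verification strategy is to build one log pattern per expected observer: the stopping node should report that it stopped, and every other node still expected to be up should report that the target left the membership. A hedged sketch of that pattern-building step is below; WE_STOPPED and THEY_STOPPED are stand-in template strings, not the real Pat:We_stopped / Pat:They_stopped values, which come from the patterns module and vary by cluster stack.

WE_STOPPED = r"%s\b.*Shutdown complete"
THEY_STOPPED = r"%s\b.*Node %s state is now lost"

def shutdown_patterns(node, expected_status):
    """Build the list of regexes a StopTest-style check would wait for."""
    patterns = [WE_STOPPED % node]
    for other, status in expected_status.items():
        if status == "up" and other != node:
            patterns.append(THEY_STOPPED % (other, node))
    return patterns

print(shutdown_patterns("node1", {"node1": "up", "node2": "up", "node3": "down"}))
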
diff --git a/python/pacemaker/_cts/timer.py b/python/pacemaker/_cts/timer.py
new file mode 100644
index 0000000..122b70b
--- /dev/null
+++ b/python/pacemaker/_cts/timer.py
@@ -0,0 +1,63 @@
+""" Timer-related utilities for CTS """
+
+__all__ = ["Timer"]
+__copyright__ = "Copyright 2000-2023 the Pacemaker project contributors"
+__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
+
+import time
+
+class Timer:
+ """ A class for measuring the runtime of some task. A Timer may be used
+ manually or as a context manager, like so:
+
+ with Timer(logger, "SomeTest", "SomeTimer"):
+ ...
+
+ A Timer runs from when start() is called until the timer is deleted
+ or reset() is called. There is no explicit stop method.
+ """
+
+ def __init__(self, logger, test_name, timer_name):
+ """ Create a new Timer instance.
+
+ Arguments:
+
+ logger -- A Logger instance that can be used to record when
+ the timer stopped
+ test_name -- The name of the test this timer is being run for
+ timer_name -- The name of this timer
+ """
+
+ self._logger = logger
+ self._start_time = None
+ self._test_name = test_name
+ self._timer_name = timer_name
+
+ def __enter__(self):
+ self.start()
+ return self
+
+ def __exit__(self, *args):
+ self._logger.debug("%s:%s runtime: %.2f" % (self._test_name, self._timer_name, self.elapsed))
+
+ def reset(self):
+ """ Restart the timer """
+
+ self.start()
+
+ def start(self):
+ """ Start the timer """
+
+ self._start_time = time.time()
+
+ @property
+ def start_time(self):
+ """ When did the timer start? """
+
+ return self._start_time
+
+ @property
+ def elapsed(self):
+ """ How long has the timer been running for? """
+
+ return time.time() - self._start_time
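
Both usage styles mentioned in the Timer docstring are illustrated below. StubLogger is a stand-in that only provides the debug() method Timer's __exit__ relies on; in CTS the logger would come from LogFactory instead, and the import assumes the pacemaker package is on PYTHONPATH.

import time

from pacemaker._cts.timer import Timer

class StubLogger:
    """Minimal stand-in for the CTS logger; only debug() is needed here."""
    def debug(self, msg):
        print(msg)

logger = StubLogger()

# Context-manager form: the runtime is logged automatically on exit.
with Timer(logger, "SomeTest", "SomeTimer"):
    time.sleep(0.1)

# Manual form: call start() yourself and read .elapsed whenever needed.
timer = Timer(logger, "SomeTest", "ManualTimer")
timer.start()
time.sleep(0.1)
print("elapsed: %.2f" % timer.elapsed)
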
diff --git a/python/pacemaker/_cts/watcher.py b/python/pacemaker/_cts/watcher.py
index 3bdb892..3e6d702 100644
--- a/python/pacemaker/_cts/watcher.py
+++ b/python/pacemaker/_cts/watcher.py
@@ -13,7 +13,7 @@ from pacemaker.buildoptions import BuildOptions
from pacemaker._cts.logging import LogFactory
from pacemaker._cts.remote import RemoteFactory
-LOG_WATCHER_BIN = BuildOptions.DAEMON_DIR + "/cts-log-watcher"
+LOG_WATCHER_BIN = "%s/cts-log-watcher" % BuildOptions.DAEMON_DIR
@unique
class LogKind(Enum):
@@ -139,7 +139,7 @@ class FileObj(SearchObj):
if match:
self.offset = match.group(1)
- self.debug("Got %d lines, new offset: %s %s" % (len(out), self.offset, repr(self._delegate)))
+ self.debug("Got %d lines, new offset: %s %r" % (len(out), self.offset, self._delegate))
elif re.search(r"^CTSwatcher:.*truncated", line):
self.log(line)
elif re.search(r"^CTSwatcher:", line):
@@ -294,8 +294,8 @@ class JournalObj(SearchObj):
self.limit = lines[0].strip()
self.debug("Set limit to: %s" % self.limit)
else:
- self.debug("Unable to set limit for %s because date returned %d lines with status %d" % (self.host,
- len(lines), rc))
+ self.debug("Unable to set limit for %s because date returned %d lines with status %d"
+ % (self.host, len(lines), rc))
class LogWatcher:
""" A class for watching a single log file or journal across multiple hosts,
@@ -413,7 +413,7 @@ class LogWatcher:
for t in pending:
t.join(60.0)
if t.is_alive():
- self._logger.log("%s: Aborting after 20s waiting for %s logging commands" % (self.name, repr(t)))
+ self._logger.log("%s: Aborting after 20s waiting for %r logging commands" % (self.name, t))
return
def end(self):
diff --git a/python/pacemaker/buildoptions.py.in b/python/pacemaker/buildoptions.py.in
index 53b492b..17fe981 100644
--- a/python/pacemaker/buildoptions.py.in
+++ b/python/pacemaker/buildoptions.py.in
@@ -22,6 +22,9 @@ class BuildOptions:
CIB_DIR = "@CRM_CONFIG_DIR@"
""" Where CIB files are stored """
+ CIB_SCHEMA_VERSION = "@CIB_VERSION@"
+ """ Latest supported CIB schema version number """
+
COROSYNC_CONFIG_FILE = "@PCMK__COROSYNC_CONF@"
""" Path to the corosync config file """
diff --git a/python/pylintrc b/python/pylintrc
index e65110b..f46eece 100644
--- a/python/pylintrc
+++ b/python/pylintrc
@@ -446,7 +446,8 @@ exclude-too-few-public-methods=
[CLASSES]
# List of method names used to declare (i.e. assign) instance attributes.
-defining-attr-methods=__init__,__new__,setUp,__post_init__
+# CHANGED: Remove setUp and __post_init__, add reset
+defining-attr-methods=__init__,__new__,reset
# List of valid names for the first argument in a class method.
valid-classmethod-first-arg=cls
diff --git a/python/setup.py.in b/python/setup.py.in
index c4083da..e9d61d0 100644
--- a/python/setup.py.in
+++ b/python/setup.py.in
@@ -16,5 +16,5 @@ setup(name='pacemaker',
license='LGPLv2.1+',
url='https://clusterlabs.org/pacemaker/',
description='Python libraries for Pacemaker',
- packages=['pacemaker', 'pacemaker._cts'],
+ packages=['pacemaker', 'pacemaker._cts', 'pacemaker._cts.tests'],
)
diff --git a/python/tests/Makefile.am b/python/tests/Makefile.am
index 490b272..219812c 100644
--- a/python/tests/Makefile.am
+++ b/python/tests/Makefile.am
@@ -9,4 +9,5 @@
MAINTAINERCLEANFILES = Makefile.in
-EXTRA_DIST = $(wildcard test_*)
+EXTRA_DIST = $(wildcard test_*) \
+ __init__.py
diff --git a/python/tests/__init__.py b/python/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/python/tests/__init__.py
diff --git a/python/tests/test_cts_network.py b/python/tests/test_cts_network.py
new file mode 100644
index 0000000..4aea8b9
--- /dev/null
+++ b/python/tests/test_cts_network.py
@@ -0,0 +1,37 @@
+# These warnings are not useful in unit tests.
+# pylint: disable=missing-class-docstring,missing-function-docstring,missing-module-docstring
+
+__copyright__ = "Copyright 2023 the Pacemaker project contributors"
+__license__ = "GPLv2+"
+
+import unittest
+
+from pacemaker._cts.network import next_ip
+
+# next_ip makes a bunch of assumptions that we are not going to test here:
+#
+# * The env argument actually contains an "IPBase" key with a string in it
+# * The string is a properly formatted IPv4 or IPv6 address, with no extra
+# leading or trailing whitespace
+
+class NextIPTestCase(unittest.TestCase):
+ def test_ipv4(self):
+ # The first time next_ip is called, it will read the IPBase out of
+        # the environment. After that, it just increments whatever it
+ # previously returned, so the environment value doesn't matter.
+ self.assertEqual(next_ip("1.1.1.1"), "1.1.1.2")
+ self.assertEqual(next_ip(), "1.1.1.3")
+
+ # Passing reset=True will force it to re-read the environment. Here,
+ # we use that to ask it for a value outside of the available range.
+ self.assertRaises(ValueError, next_ip, "1.1.1.255", reset=True)
+
+ def test_ipv6(self):
+ # Same comments as for the test_ipv4 function, plus we need to reset
+ # here because otherwise it will remember what happened in that function.
+ self.assertEqual(next_ip("fe80::fc54:ff:fe09:101", reset=True),
+ "fe80::fc54:ff:fe09:102")
+ self.assertEqual(next_ip(),
+ "fe80::fc54:ff:fe09:103")
+
+ self.assertRaises(ValueError, next_ip, "fe80::fc54:ff:fe09:ffff", reset=True)
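
The expectations these tests encode (increment the last IPv4 octet or IPv6 hextet, refuse to go past 255/ffff) can be summarized with the small sketch below. It is a self-contained re-implementation of that increment rule using the standard ipaddress module, purely for illustration; the real next_ip() in pacemaker._cts.network is stateful, reads IPBase from the environment, and may treat the boundary slightly differently.

import ipaddress

def next_ip_sketch(addr):
    """Illustrative only: return the next address, refusing the 255/ffff boundary."""
    ip = ipaddress.ip_address(addr)
    limit = 0xffff if ip.version == 6 else 0xff
    if int(ip) & limit >= limit:
        raise ValueError("No more available addresses")
    return str(ip + 1)

print(next_ip_sketch("1.1.1.1"))                 # 1.1.1.2
print(next_ip_sketch("fe80::fc54:ff:fe09:101"))  # fe80::fc54:ff:fe09:102
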