summaryrefslogtreecommitdiffstats
path: root/qa/tasks/cephfs_test_runner.py
diff options
context:
space:
mode:
Diffstat (limited to 'qa/tasks/cephfs_test_runner.py')
-rw-r--r--qa/tasks/cephfs_test_runner.py213
1 files changed, 213 insertions, 0 deletions
diff --git a/qa/tasks/cephfs_test_runner.py b/qa/tasks/cephfs_test_runner.py
new file mode 100644
index 000000000..8a4919b93
--- /dev/null
+++ b/qa/tasks/cephfs_test_runner.py
@@ -0,0 +1,213 @@
+import contextlib
+import logging
+import os
+import unittest
+from unittest import suite, loader, case
+from teuthology.task import interactive
+from teuthology import misc
+from tasks.cephfs.filesystem import Filesystem, MDSCluster, CephCluster
+from tasks.mgr.mgr_test_case import MgrCluster
+
+log = logging.getLogger(__name__)
+
+
+class DecoratingLoader(loader.TestLoader):
+ """
+ A specialization of TestLoader that tags some extra attributes
+ onto test classes as they are loaded.
+ """
+ def __init__(self, params):
+ self._params = params
+ super(DecoratingLoader, self).__init__()
+
+ def _apply_params(self, obj):
+ for k, v in self._params.items():
+ if obj.__class__ is type:
+ cls = obj
+ else:
+ cls = obj.__class__
+ setattr(cls, k, v)
+
+ def loadTestsFromTestCase(self, testCaseClass):
+ self._apply_params(testCaseClass)
+ return super(DecoratingLoader, self).loadTestsFromTestCase(testCaseClass)
+
+ def loadTestsFromName(self, name, module=None):
+ result = super(DecoratingLoader, self).loadTestsFromName(name, module)
+
+ # Special case for when we were called with the name of a method, we get
+ # a suite with one TestCase
+ tests_in_result = list(result)
+ if len(tests_in_result) == 1 and isinstance(tests_in_result[0], case.TestCase):
+ self._apply_params(tests_in_result[0])
+
+ return result
+
+
+class LogStream(object):
+ def __init__(self):
+ self.buffer = ""
+
+ def write(self, data):
+ self.buffer += data
+ if "\n" in self.buffer:
+ lines = self.buffer.split("\n")
+ for line in lines[:-1]:
+ log.info(line)
+ self.buffer = lines[-1]
+
+ def flush(self):
+ pass
+
+
+class InteractiveFailureResult(unittest.TextTestResult):
+ """
+ Specialization that implements interactive-on-error style
+ behavior.
+ """
+ ctx = None
+
+ def addFailure(self, test, err):
+ log.error(self._exc_info_to_string(err, test))
+ log.error("Failure in test '{0}', going interactive".format(
+ self.getDescription(test)
+ ))
+ interactive.task(ctx=self.ctx, config=None)
+
+ def addError(self, test, err):
+ log.error(self._exc_info_to_string(err, test))
+ log.error("Error in test '{0}', going interactive".format(
+ self.getDescription(test)
+ ))
+ interactive.task(ctx=self.ctx, config=None)
+
+
+@contextlib.contextmanager
+def task(ctx, config):
+ """
+ Run the CephFS test cases.
+
+ Run everything in tasks/cephfs/test_*.py:
+
+ ::
+
+ tasks:
+ - install:
+ - ceph:
+ - ceph-fuse:
+ - cephfs_test_runner:
+
+ `modules` argument allows running only some specific modules:
+
+ ::
+
+ tasks:
+ ...
+ - cephfs_test_runner:
+ modules:
+ - tasks.cephfs.test_sessionmap
+ - tasks.cephfs.test_auto_repair
+
+ By default, any cases that can't be run on the current cluster configuration
+ will generate a failure. When the optional `fail_on_skip` argument is set
+ to false, any tests that can't be run on the current configuration will
+ simply be skipped:
+
+ ::
+ tasks:
+ ...
+ - cephfs_test_runner:
+ fail_on_skip: false
+
+ """
+
+ ceph_cluster = CephCluster(ctx)
+
+ if len(list(misc.all_roles_of_type(ctx.cluster, 'mds'))):
+ mds_cluster = MDSCluster(ctx)
+ fs = Filesystem(ctx)
+ else:
+ mds_cluster = None
+ fs = None
+
+ if len(list(misc.all_roles_of_type(ctx.cluster, 'mgr'))):
+ mgr_cluster = MgrCluster(ctx)
+ else:
+ mgr_cluster = None
+
+ # Mount objects, sorted by ID
+ if hasattr(ctx, 'mounts'):
+ mounts = [v for k, v in sorted(ctx.mounts.items(), key=lambda mount: mount[0])]
+ else:
+ # The test configuration has a filesystem but no fuse/kclient mounts
+ mounts = []
+
+ decorating_loader = DecoratingLoader({
+ "ctx": ctx,
+ "mounts": mounts,
+ "fs": fs,
+ "ceph_cluster": ceph_cluster,
+ "mds_cluster": mds_cluster,
+ "mgr_cluster": mgr_cluster,
+ })
+
+ fail_on_skip = config.get('fail_on_skip', True)
+
+ # Put useful things onto ctx for interactive debugging
+ ctx.fs = fs
+ ctx.mds_cluster = mds_cluster
+ ctx.mgr_cluster = mgr_cluster
+
+ # Depending on config, either load specific modules, or scan for moduless
+ if config and 'modules' in config and config['modules']:
+ module_suites = []
+ for mod_name in config['modules']:
+ # Test names like cephfs.test_auto_repair
+ module_suites.append(decorating_loader.loadTestsFromName(mod_name))
+ overall_suite = suite.TestSuite(module_suites)
+ else:
+ # Default, run all tests
+ overall_suite = decorating_loader.discover(
+ os.path.join(
+ os.path.dirname(os.path.abspath(__file__)),
+ "cephfs/"
+ )
+ )
+
+ if ctx.config.get("interactive-on-error", False):
+ InteractiveFailureResult.ctx = ctx
+ result_class = InteractiveFailureResult
+ else:
+ result_class = unittest.TextTestResult
+
+ class LoggingResult(result_class):
+ def startTest(self, test):
+ log.info("Starting test: {0}".format(self.getDescription(test)))
+ return super(LoggingResult, self).startTest(test)
+
+ def addSkip(self, test, reason):
+ if fail_on_skip:
+ # Don't just call addFailure because that requires a traceback
+ self.failures.append((test, reason))
+ else:
+ super(LoggingResult, self).addSkip(test, reason)
+
+ # Execute!
+ result = unittest.TextTestRunner(
+ stream=LogStream(),
+ resultclass=LoggingResult,
+ verbosity=2,
+ failfast=True).run(overall_suite)
+
+ if not result.wasSuccessful():
+ result.printErrors() # duplicate output at end for convenience
+
+ bad_tests = []
+ for test, error in result.errors:
+ bad_tests.append(str(test))
+ for test, failure in result.failures:
+ bad_tests.append(str(test))
+
+ raise RuntimeError("Test failure: {0}".format(", ".join(bad_tests)))
+
+ yield