Diffstat (limited to 'qa/tasks/fwd_scrub.py')
-rw-r--r--  qa/tasks/fwd_scrub.py  152
1 file changed, 152 insertions(+), 0 deletions(-)
diff --git a/qa/tasks/fwd_scrub.py b/qa/tasks/fwd_scrub.py
new file mode 100644
index 000000000..44fd97baa
--- /dev/null
+++ b/qa/tasks/fwd_scrub.py
@@ -0,0 +1,152 @@
+"""
+Thrash mds by simulating failures
+"""
+import logging
+import contextlib
+
+from gevent import sleep, GreenletExit
+from gevent.greenlet import Greenlet
+from gevent.event import Event
+from teuthology import misc as teuthology
+
+from tasks import ceph_manager
+from tasks.cephfs.filesystem import MDSCluster, Filesystem
+from tasks.thrasher import Thrasher
+
+log = logging.getLogger(__name__)
+
+
+class ForwardScrubber(Thrasher, Greenlet):
+ """
+ ForwardScrubber::
+
+    The ForwardScrubber runs forward scrubs on file systems while other
+    tasks (workunits, etc.) are executing.
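+
+    A minimal usage sketch, assuming ``fs`` is a Filesystem instance (task()
+    below does the equivalent for each file system)::
+
+        scrubber = ForwardScrubber(fs, scrub_timeout=300)
+        scrubber.start()
+        # ... run a workload ...
+        scrubber.stop()
+        scrubber.join()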
+ """
+
+ def __init__(self, fs, scrub_timeout=300, sleep_between_iterations=1):
+        super().__init__()
+
+        self.logger = log.getChild(f'fs.[{fs.name}]')
+        self.fs = fs
+        self.name = f'thrasher.fs.[{fs.name}]'
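+        # set by stop(); do_scrub() checks it between iterations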
+ self.stopping = Event()
+ self.scrub_timeout = scrub_timeout
+ self.sleep_between_iterations = sleep_between_iterations
+
+ def _run(self):
+ try:
+ self.do_scrub()
+ except Exception as e:
+ self.set_thrasher_exception(e)
+ self.logger.exception("exception:")
+ # allow successful completion so gevent doesn't see an exception...
+
+ def stop(self):
+ self.stopping.set()
+
+ def do_scrub(self):
+ """
+ Perform the file-system scrubbing
+ """
+ self.logger.info(f'start scrubbing fs: {self.fs.name}')
+
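+        # scrub repeatedly until stop() is called; kill() on this greenlet
+        # raises GreenletExit inside sleep(), which counts as a clean exit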
+ try:
+ while not self.stopping.is_set():
+ self._scrub()
+ sleep(self.sleep_between_iterations)
+ except GreenletExit:
+ pass
+
+ self.logger.info(f'end scrubbing fs: {self.fs.name}')
+
+ def _scrub(self, path="/", recursive=True):
+ self.logger.info(f"scrubbing fs: {self.fs.name}")
+ scrubopts = ["force"]
+ if recursive:
+ scrubopts.append("recursive")
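+        # run_scrub() wraps the MDS "scrub start" command; the assertions
+        # below rely on its JSON reply looking like
+        # {"return_code": 0, "scrub_tag": "<uuid>", "mode": "asynchronous"}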
+ out_json = self.fs.run_scrub(["start", path, ",".join(scrubopts)])
+ assert out_json is not None
+
+ tag = out_json['scrub_tag']
+
+ assert tag is not None
+ assert out_json['return_code'] == 0
+ assert out_json['mode'] == 'asynchronous'
+
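+        # poll until the tagged scrub is reported complete, giving up after
+        # scrub_timeout seconds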
+ return self.fs.wait_until_scrub_complete(tag=tag, sleep=30,
+ timeout=self.scrub_timeout)
+
+
+def stop_all_fwd_scrubbers(thrashers):
+ for thrasher in thrashers:
+ if not isinstance(thrasher, ForwardScrubber):
+ continue
+ thrasher.stop()
+ thrasher.join()
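+        # surface anything captured by set_thrasher_exception() in _run()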
+ if thrasher.exception is not None:
+ raise RuntimeError(f"error during scrub thrashing: {thrasher.exception}")
+
+
+@contextlib.contextmanager
+def task(ctx, config):
+ """
+ Stress test the mds by running scrub iterations while another task/workunit
+ is running.
+ Example config:
+
+    - fwd_scrub:
+        scrub_timeout: 300
+        sleep_between_iterations: 1
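+
+    Both keys are optional and default to 300 and 1 respectively.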
+ """
+
+ mds_cluster = MDSCluster(ctx)
+
+ if config is None:
+ config = {}
+ assert isinstance(config, dict), \
+ 'fwd_scrub task only accepts a dict for configuration'
+ mdslist = list(teuthology.all_roles_of_type(ctx.cluster, 'mds'))
+ assert len(mdslist) > 0, \
+ 'fwd_scrub task requires at least 1 metadata server'
+
+ (first,) = ctx.cluster.only(f'mds.{mdslist[0]}').remotes.keys()
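+    # the manager is used below only to wait for PGs to become clean before
+    # thrashing starts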
+ manager = ceph_manager.CephManager(
+ first, ctx=ctx, logger=log.getChild('ceph_manager'),
+ )
+
+ # make sure everyone is in active, standby, or standby-replay
+ log.info('Wait for all MDSs to reach steady state...')
+ status = mds_cluster.status()
+ while True:
+ steady = True
+ for info in status.get_all():
+ state = info['state']
+ if state not in ('up:active', 'up:standby', 'up:standby-replay'):
+ steady = False
+ break
+ if steady:
+ break
+ sleep(2)
+ status = mds_cluster.status()
+
+ log.info('Ready to start scrub thrashing')
+
+ manager.wait_for_clean()
+ assert manager.is_clean()
+
+ if 'cluster' not in config:
+ config['cluster'] = 'ceph'
+
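+    # start one scrubber greenlet per file system and register it on the
+    # cluster context so the finally block below can stop and join it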
+ for fs in status.get_filesystems():
+ fwd_scrubber = ForwardScrubber(Filesystem(ctx, fscid=fs['id']),
+                                       config.get('scrub_timeout', 300),
+                                       config.get('sleep_between_iterations', 1))
+ fwd_scrubber.start()
+ ctx.ceph[config['cluster']].thrashers.append(fwd_scrubber)
+
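+    # yield to the nested tasks; the finally clause guarantees the scrubbers
+    # are stopped and joined even when those tasks fail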
+ try:
+ log.debug('Yielding')
+ yield
+ finally:
+ log.info('joining ForwardScrubbers')
+ stop_all_fwd_scrubbers(ctx.ceph[config['cluster']].thrashers)
+ log.info('done joining')