From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001
From: Daniel Baumann
Date: Sun, 7 Apr 2024 20:45:59 +0200
Subject: Adding upstream version 16.2.11+ds.

Signed-off-by: Daniel Baumann
---
 qa/tasks/fwd_scrub.py | 152 ++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 152 insertions(+)
 create mode 100644 qa/tasks/fwd_scrub.py

diff --git a/qa/tasks/fwd_scrub.py b/qa/tasks/fwd_scrub.py
new file mode 100644
index 000000000..44fd97baa
--- /dev/null
+++ b/qa/tasks/fwd_scrub.py
@@ -0,0 +1,152 @@
+"""
+Thrash mds by simulating failures
+"""
+import logging
+import contextlib
+
+from gevent import sleep, GreenletExit
+from gevent.greenlet import Greenlet
+from gevent.event import Event
+from teuthology import misc as teuthology
+
+from tasks import ceph_manager
+from tasks.cephfs.filesystem import MDSCluster, Filesystem
+from tasks.thrasher import Thrasher
+
+log = logging.getLogger(__name__)
+
+class ForwardScrubber(Thrasher, Greenlet):
+    """
+    ForwardScrubber::
+
+    The ForwardScrubber does forward scrubbing of file-systems during execution
+    of other tasks (workunits, etc).
+    """
+
+    def __init__(self, fs, scrub_timeout=300, sleep_between_iterations=1):
+        super(ForwardScrubber, self).__init__()
+
+        self.logger = log.getChild('fs.[{f}]'.format(f=fs.name))
+        self.fs = fs
+        self.name = 'thrasher.fs.[{f}]'.format(f=fs.name)
+        self.stopping = Event()
+        self.scrub_timeout = scrub_timeout
+        self.sleep_between_iterations = sleep_between_iterations
+
+    def _run(self):
+        try:
+            self.do_scrub()
+        except Exception as e:
+            self.set_thrasher_exception(e)
+            self.logger.exception("exception:")
+            # allow successful completion so gevent doesn't see an exception...
+
+    def stop(self):
+        self.stopping.set()
+
+    def do_scrub(self):
+        """
+        Perform the file-system scrubbing
+        """
+        self.logger.info(f'start scrubbing fs: {self.fs.name}')
+
+        try:
+            while not self.stopping.is_set():
+                self._scrub()
+                sleep(self.sleep_between_iterations)
+        except GreenletExit:
+            pass
+
+        self.logger.info(f'end scrubbing fs: {self.fs.name}')
+
+    def _scrub(self, path="/", recursive=True):
+        self.logger.info(f"scrubbing fs: {self.fs.name}")
+        scrubopts = ["force"]
+        if recursive:
+            scrubopts.append("recursive")
+        out_json = self.fs.run_scrub(["start", path, ",".join(scrubopts)])
+        assert out_json is not None
+
+        tag = out_json['scrub_tag']
+
+        assert tag is not None
+        assert out_json['return_code'] == 0
+        assert out_json['mode'] == 'asynchronous'
+
+        return self.fs.wait_until_scrub_complete(tag=tag, sleep=30,
+                                                 timeout=self.scrub_timeout)
+
+def stop_all_fwd_scrubbers(thrashers):
+    for thrasher in thrashers:
+        if not isinstance(thrasher, ForwardScrubber):
+            continue
+        thrasher.stop()
+        thrasher.join()
+        if thrasher.exception is not None:
+            raise RuntimeError(f"error during scrub thrashing: {thrasher.exception}")
+
+
+@contextlib.contextmanager
+def task(ctx, config):
+    """
+    Stress test the mds by running scrub iterations while another task/workunit
+    is running.
+    Example config:
+
+    - fwd_scrub:
+      scrub_timeout: 300
+      sleep_between_iterations: 1
+    """
+
+    mds_cluster = MDSCluster(ctx)
+
+    if config is None:
+        config = {}
+    assert isinstance(config, dict), \
+        'fwd_scrub task only accepts a dict for configuration'
+    mdslist = list(teuthology.all_roles_of_type(ctx.cluster, 'mds'))
+    assert len(mdslist) > 0, \
+        'fwd_scrub task requires at least 1 metadata server'
+
+    (first,) = ctx.cluster.only(f'mds.{mdslist[0]}').remotes.keys()
+    manager = ceph_manager.CephManager(
+        first, ctx=ctx, logger=log.getChild('ceph_manager'),
+    )
+
+    # make sure everyone is in active, standby, or standby-replay
+    log.info('Wait for all MDSs to reach steady state...')
+    status = mds_cluster.status()
+    while True:
+        steady = True
+        for info in status.get_all():
+            state = info['state']
+            if state not in ('up:active', 'up:standby', 'up:standby-replay'):
+                steady = False
+                break
+        if steady:
+            break
+        sleep(2)
+        status = mds_cluster.status()
+
+    log.info('Ready to start scrub thrashing')
+
+    manager.wait_for_clean()
+    assert manager.is_clean()
+
+    if 'cluster' not in config:
+        config['cluster'] = 'ceph'
+
+    for fs in status.get_filesystems():
+        fwd_scrubber = ForwardScrubber(Filesystem(ctx, fscid=fs['id']),
+                                       config['scrub_timeout'],
+                                       config['sleep_between_iterations'])
+        fwd_scrubber.start()
+        ctx.ceph[config['cluster']].thrashers.append(fwd_scrubber)
+
+    try:
+        log.debug('Yielding')
+        yield
+    finally:
+        log.info('joining ForwardScrubbers')
+        stop_all_fwd_scrubbers(ctx.ceph[config['cluster']].thrashers)
+        log.info('done joining')
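
ForwardScrubber follows the generic thrasher shape used throughout these qa
tasks: a gevent Greenlet that loops until an Event is set, records any
exception rather than letting the greenlet die, and is stopped, joined, and
checked during task teardown. Below is a minimal, self-contained sketch of
that pattern; the names (LoopingThrasher, do_one_iteration, stop_and_check)
are illustrative only and not part of the Ceph tree.

import logging

from gevent import sleep
from gevent.event import Event
from gevent.greenlet import Greenlet

log = logging.getLogger(__name__)

class LoopingThrasher(Greenlet):
    """Illustrative stand-in for the Thrasher + ForwardScrubber combination."""

    def __init__(self, interval=1):
        super(LoopingThrasher, self).__init__()
        self.stopping = Event()
        self.interval = interval
        self.exception = None

    def _run(self):
        try:
            while not self.stopping.is_set():
                self.do_one_iteration()
                sleep(self.interval)
        except Exception as e:
            # Record the failure instead of raising, so gevent sees a clean
            # exit; this mirrors set_thrasher_exception() in _run() above.
            self.exception = e
            log.exception('thrasher iteration failed')

    def do_one_iteration(self):
        pass  # subclass hook, e.g. one scrub pass

    def stop(self):
        self.stopping.set()

def stop_and_check(thrasher):
    # Teardown mirrors stop_all_fwd_scrubbers(): stop, join, then re-raise
    # any exception the greenlet recorded while the workload was running.
    thrasher.stop()
    thrasher.join()
    if thrasher.exception is not None:
        raise RuntimeError(f'thrasher failed: {thrasher.exception}')

Deferring the re-raise to join time is the point of the design: the scrubber
runs concurrently with the real workload, and a scrub failure should fail the
job during teardown rather than kill the greenlet silently mid-run.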
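
The start-then-poll cycle that _scrub() drives through fs.run_scrub() and
fs.wait_until_scrub_complete() can be approximated by hand against a live
cluster with the `ceph tell` CLI. The sketch below is only a rough
equivalent: the filesystem name "cephfs", the rank-0 tell target, and the
exact text and JSON shape of the `scrub status` reply are assumptions that
may vary by release; the fields checked on the `scrub start` reply are the
same ones the patch asserts on.

import json
import subprocess
import time

def scrub_and_wait(fs_name='cephfs', path='/', timeout=300):
    # Start a forced, recursive scrub on rank 0, as _scrub() does with
    # scrubopts = ["force", "recursive"].
    out = subprocess.check_output(
        ['ceph', 'tell', f'mds.{fs_name}:0', 'scrub', 'start',
         path, 'force,recursive'])
    start = json.loads(out)
    assert start['return_code'] == 0
    assert start['mode'] == 'asynchronous'
    tag = start['scrub_tag']

    # Poll roughly like wait_until_scrub_complete(tag=tag, sleep=30, ...).
    deadline = time.time() + timeout
    while time.time() < deadline:
        out = subprocess.check_output(
            ['ceph', 'tell', f'mds.{fs_name}:0', 'scrub', 'status'])
        status = json.loads(out)
        # Assumed idle marker; adjust to the status text of your release.
        if 'no active scrubs' in status.get('status', ''):
            return tag
        time.sleep(30)
    raise TimeoutError(f'scrub {tag} did not complete in {timeout}s')

The 300-second default matches the task's scrub_timeout default, and the
30-second poll interval matches the sleep=30 passed to
wait_until_scrub_complete() above.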