summaryrefslogtreecommitdiffstats
path: root/qa/tasks/mds_thrash.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
commit483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
treee5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /qa/tasks/mds_thrash.py
parentInitial commit. (diff)
downloadceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz
ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'qa/tasks/mds_thrash.py')
-rw-r--r--qa/tasks/mds_thrash.py543
1 files changed, 543 insertions, 0 deletions
diff --git a/qa/tasks/mds_thrash.py b/qa/tasks/mds_thrash.py
new file mode 100644
index 00000000..ac543838
--- /dev/null
+++ b/qa/tasks/mds_thrash.py
@@ -0,0 +1,543 @@
+"""
+Thrash mds by simulating failures
+"""
+import logging
+import contextlib
+import itertools
+import random
+import signal
+import time
+
+from gevent import sleep
+from gevent.greenlet import Greenlet
+from gevent.event import Event
+from teuthology import misc as teuthology
+
+from tasks import ceph_manager
+from tasks.cephfs.filesystem import MDSCluster, Filesystem
+
+log = logging.getLogger(__name__)
+
+class DaemonWatchdog(Greenlet):
+ """
+ DaemonWatchdog::
+
+ Watch Ceph daemons for failures. If an extended failure is detected (i.e.
+ not intentional), then the watchdog will unmount file systems and send
+ SIGTERM to all daemons. The duration of an extended failure is configurable
+ with watchdog_daemon_timeout.
+
+ watchdog_daemon_timeout [default: 300]: number of seconds a daemon
+ is allowed to be failed before the watchdog will bark.
+ """
+
+ def __init__(self, ctx, manager, config, thrashers):
+ Greenlet.__init__(self)
+ self.ctx = ctx
+ self.config = config
+ self.e = None
+ self.logger = log.getChild('daemon_watchdog')
+ self.manager = manager
+ self.name = 'watchdog'
+ self.stopping = Event()
+ self.thrashers = thrashers
+
+ def _run(self):
+ try:
+ self.watch()
+ except Exception as e:
+ # See _run exception comment for MDSThrasher
+ self.e = e
+ self.logger.exception("exception:")
+ # allow successful completion so gevent doesn't see an exception...
+
+ def log(self, x):
+ """Write data to logger"""
+ self.logger.info(x)
+
+ def stop(self):
+ self.stopping.set()
+
+ def bark(self):
+ self.log("BARK! unmounting mounts and killing all daemons")
+ for mount in self.ctx.mounts.values():
+ try:
+ mount.umount_wait(force=True)
+ except:
+ self.logger.exception("ignoring exception:")
+ daemons = []
+ daemons.extend(filter(lambda daemon: daemon.running() and not daemon.proc.finished, self.ctx.daemons.iter_daemons_of_role('mds', cluster=self.manager.cluster)))
+ daemons.extend(filter(lambda daemon: daemon.running() and not daemon.proc.finished, self.ctx.daemons.iter_daemons_of_role('mon', cluster=self.manager.cluster)))
+ for daemon in daemons:
+ try:
+ daemon.signal(signal.SIGTERM)
+ except:
+ self.logger.exception("ignoring exception:")
+
+ def watch(self):
+ self.log("watchdog starting")
+ daemon_timeout = int(self.config.get('watchdog_daemon_timeout', 300))
+ daemon_failure_time = {}
+ while not self.stopping.is_set():
+ bark = False
+ now = time.time()
+
+ mons = self.ctx.daemons.iter_daemons_of_role('mon', cluster=self.manager.cluster)
+ mdss = self.ctx.daemons.iter_daemons_of_role('mds', cluster=self.manager.cluster)
+ clients = self.ctx.daemons.iter_daemons_of_role('client', cluster=self.manager.cluster)
+
+ #for daemon in mons:
+ # self.log("mon daemon {role}.{id}: running={r}".format(role=daemon.role, id=daemon.id_, r=daemon.running() and not daemon.proc.finished))
+ #for daemon in mdss:
+ # self.log("mds daemon {role}.{id}: running={r}".format(role=daemon.role, id=daemon.id_, r=daemon.running() and not daemon.proc.finished))
+
+ daemon_failures = []
+ daemon_failures.extend(filter(lambda daemon: daemon.running() and daemon.proc.finished, mons))
+ daemon_failures.extend(filter(lambda daemon: daemon.running() and daemon.proc.finished, mdss))
+ for daemon in daemon_failures:
+ name = daemon.role + '.' + daemon.id_
+ dt = daemon_failure_time.setdefault(name, (daemon, now))
+ assert dt[0] is daemon
+ delta = now-dt[1]
+ self.log("daemon {name} is failed for ~{t:.0f}s".format(name=name, t=delta))
+ if delta > daemon_timeout:
+ bark = True
+
+ # If a daemon is no longer failed, remove it from tracking:
+ for name in daemon_failure_time.keys():
+ if name not in [d.role + '.' + d.id_ for d in daemon_failures]:
+ self.log("daemon {name} has been restored".format(name=name))
+ del daemon_failure_time[name]
+
+ for thrasher in self.thrashers:
+ if thrasher.e is not None:
+ self.log("thrasher on fs.{name} failed".format(name=thrasher.fs.name))
+ bark = True
+
+ if bark:
+ self.bark()
+ return
+
+ sleep(5)
+
+ self.log("watchdog finished")
+
+class MDSThrasher(Greenlet):
+ """
+ MDSThrasher::
+
+ The MDSThrasher thrashes MDSs during execution of other tasks (workunits, etc).
+
+ The config is optional. Many of the config parameters are a a maximum value
+ to use when selecting a random value from a range. To always use the maximum
+ value, set no_random to true. The config is a dict containing some or all of:
+
+ max_thrash: [default: 1] the maximum number of active MDSs per FS that will be thrashed at
+ any given time.
+
+ max_thrash_delay: [default: 30] maximum number of seconds to delay before
+ thrashing again.
+
+ max_replay_thrash_delay: [default: 4] maximum number of seconds to delay while in
+ the replay state before thrashing.
+
+ max_revive_delay: [default: 10] maximum number of seconds to delay before
+ bringing back a thrashed MDS.
+
+ randomize: [default: true] enables randomization and use the max/min values
+
+ seed: [no default] seed the random number generator
+
+ thrash_in_replay: [default: 0.0] likelihood that the MDS will be thrashed
+ during replay. Value should be between 0.0 and 1.0.
+
+ thrash_max_mds: [default: 0.05] likelihood that the max_mds of the mds
+ cluster will be modified to a value [1, current) or (current, starting
+ max_mds]. Value should be between 0.0 and 1.0.
+
+ thrash_while_stopping: [default: false] thrash an MDS while there
+ are MDS in up:stopping (because max_mds was changed and some
+ MDS were deactivated).
+
+ thrash_weights: allows specific MDSs to be thrashed more/less frequently.
+ This option overrides anything specified by max_thrash. This option is a
+ dict containing mds.x: weight pairs. For example, [mds.a: 0.7, mds.b:
+ 0.3, mds.c: 0.0]. Each weight is a value from 0.0 to 1.0. Any MDSs not
+ specified will be automatically given a weight of 0.0 (not thrashed).
+ For a given MDS, by default the trasher delays for up to
+ max_thrash_delay, trashes, waits for the MDS to recover, and iterates.
+ If a non-zero weight is specified for an MDS, for each iteration the
+ thrasher chooses whether to thrash during that iteration based on a
+ random value [0-1] not exceeding the weight of that MDS.
+
+ Examples::
+
+
+ The following example sets the likelihood that mds.a will be thrashed
+ to 80%, mds.b to 20%, and other MDSs will not be thrashed. It also sets the
+ likelihood that an MDS will be thrashed in replay to 40%.
+ Thrash weights do not have to sum to 1.
+
+ tasks:
+ - ceph:
+ - mds_thrash:
+ thrash_weights:
+ - mds.a: 0.8
+ - mds.b: 0.2
+ thrash_in_replay: 0.4
+ - ceph-fuse:
+ - workunit:
+ clients:
+ all: [suites/fsx.sh]
+
+ The following example disables randomization, and uses the max delay values:
+
+ tasks:
+ - ceph:
+ - mds_thrash:
+ max_thrash_delay: 10
+ max_revive_delay: 1
+ max_replay_thrash_delay: 4
+
+ """
+
+ def __init__(self, ctx, manager, config, fs, max_mds):
+ Greenlet.__init__(self)
+
+ self.config = config
+ self.ctx = ctx
+ self.e = None
+ self.logger = log.getChild('fs.[{f}]'.format(f = fs.name))
+ self.fs = fs
+ self.manager = manager
+ self.max_mds = max_mds
+ self.name = 'thrasher.fs.[{f}]'.format(f = fs.name)
+ self.stopping = Event()
+
+ self.randomize = bool(self.config.get('randomize', True))
+ self.thrash_max_mds = float(self.config.get('thrash_max_mds', 0.05))
+ self.max_thrash = int(self.config.get('max_thrash', 1))
+ self.max_thrash_delay = float(self.config.get('thrash_delay', 120.0))
+ self.thrash_in_replay = float(self.config.get('thrash_in_replay', False))
+ assert self.thrash_in_replay >= 0.0 and self.thrash_in_replay <= 1.0, 'thrash_in_replay ({v}) must be between [0.0, 1.0]'.format(
+ v=self.thrash_in_replay)
+ self.max_replay_thrash_delay = float(self.config.get('max_replay_thrash_delay', 4.0))
+ self.max_revive_delay = float(self.config.get('max_revive_delay', 10.0))
+
+ def _run(self):
+ try:
+ self.do_thrash()
+ except Exception as e:
+ # Log exceptions here so we get the full backtrace (gevent loses them).
+ # Also allow successful completion as gevent exception handling is a broken mess:
+ #
+ # 2017-02-03T14:34:01.259 CRITICAL:root: File "gevent.libev.corecext.pyx", line 367, in gevent.libev.corecext.loop.handle_error (src/gevent/libev/gevent.corecext.c:5051)
+ # File "/home/teuthworker/src/git.ceph.com_git_teuthology_master/virtualenv/local/lib/python2.7/site-packages/gevent/hub.py", line 558, in handle_error
+ # self.print_exception(context, type, value, tb)
+ # File "/home/teuthworker/src/git.ceph.com_git_teuthology_master/virtualenv/local/lib/python2.7/site-packages/gevent/hub.py", line 605, in print_exception
+ # traceback.print_exception(type, value, tb, file=errstream)
+ # File "/usr/lib/python2.7/traceback.py", line 124, in print_exception
+ # _print(file, 'Traceback (most recent call last):')
+ # File "/usr/lib/python2.7/traceback.py", line 13, in _print
+ # file.write(str+terminator)
+ # 2017-02-03T14:34:01.261 CRITICAL:root:IOError
+ self.e = e
+ self.logger.exception("exception:")
+ # allow successful completion so gevent doesn't see an exception...
+
+ def log(self, x):
+ """Write data to logger assigned to this MDThrasher"""
+ self.logger.info(x)
+
+ def stop(self):
+ self.stopping.set()
+
+ def kill_mds(self, mds):
+ if self.config.get('powercycle'):
+ (remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
+ remotes.keys())
+ self.log('kill_mds on mds.{m} doing powercycle of {s}'.
+ format(m=mds, s=remote.name))
+ self._assert_ipmi(remote)
+ remote.console.power_off()
+ else:
+ self.ctx.daemons.get_daemon('mds', mds).stop()
+
+ @staticmethod
+ def _assert_ipmi(remote):
+ assert remote.console.has_ipmi_credentials, (
+ "powercycling requested but RemoteConsole is not "
+ "initialized. Check ipmi config.")
+
+ def revive_mds(self, mds):
+ """
+ Revive mds -- do an ipmpi powercycle (if indicated by the config)
+ and then restart.
+ """
+ if self.config.get('powercycle'):
+ (remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
+ remotes.keys())
+ self.log('revive_mds on mds.{m} doing powercycle of {s}'.
+ format(m=mds, s=remote.name))
+ self._assert_ipmi(remote)
+ remote.console.power_on()
+ self.manager.make_admin_daemon_dir(self.ctx, remote)
+ args = []
+ self.ctx.daemons.get_daemon('mds', mds).restart(*args)
+
+ def wait_for_stable(self, rank = None, gid = None):
+ self.log('waiting for mds cluster to stabilize...')
+ for itercount in itertools.count():
+ status = self.fs.status()
+ max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
+ ranks = list(status.get_ranks(self.fs.id))
+ stopping = sum(1 for _ in ranks if "up:stopping" == _['state'])
+ actives = sum(1 for _ in ranks
+ if "up:active" == _['state'] and "laggy_since" not in _)
+
+ if not bool(self.config.get('thrash_while_stopping', False)) and stopping > 0:
+ if itercount % 5 == 0:
+ self.log('cluster is considered unstable while MDS are in up:stopping (!thrash_while_stopping)')
+ else:
+ if rank is not None:
+ try:
+ info = status.get_rank(self.fs.id, rank)
+ if info['gid'] != gid and "up:active" == info['state']:
+ self.log('mds.{name} has gained rank={rank}, replacing gid={gid}'.format(name = info['name'], rank = rank, gid = gid))
+ return status
+ except:
+ pass # no rank present
+ if actives >= max_mds:
+ # no replacement can occur!
+ self.log("cluster has {actives} actives (max_mds is {max_mds}), no MDS can replace rank {rank}".format(
+ actives=actives, max_mds=max_mds, rank=rank))
+ return status
+ else:
+ if actives == max_mds:
+ self.log('mds cluster has {count} alive and active, now stable!'.format(count = actives))
+ return status, None
+ if itercount > 300/2: # 5 minutes
+ raise RuntimeError('timeout waiting for cluster to stabilize')
+ elif itercount % 5 == 0:
+ self.log('mds map: {status}'.format(status=status))
+ else:
+ self.log('no change')
+ sleep(2)
+
+ def do_thrash(self):
+ """
+ Perform the random thrashing action
+ """
+
+ self.log('starting mds_do_thrash for fs {fs}'.format(fs = self.fs.name))
+ stats = {
+ "max_mds": 0,
+ "deactivate": 0,
+ "kill": 0,
+ }
+
+ while not self.stopping.is_set():
+ delay = self.max_thrash_delay
+ if self.randomize:
+ delay = random.randrange(0.0, self.max_thrash_delay)
+
+ if delay > 0.0:
+ self.log('waiting for {delay} secs before thrashing'.format(delay=delay))
+ self.stopping.wait(delay)
+ if self.stopping.is_set():
+ continue
+
+ status = self.fs.status()
+
+ if random.random() <= self.thrash_max_mds:
+ max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
+ options = list(range(1, max_mds))+list(range(max_mds+1, self.max_mds+1))
+ if len(options) > 0:
+ sample = random.sample(options, 1)
+ new_max_mds = sample[0]
+ self.log('thrashing max_mds: %d -> %d' % (max_mds, new_max_mds))
+ self.fs.set_max_mds(new_max_mds)
+ stats['max_mds'] += 1
+ self.wait_for_stable()
+
+ count = 0
+ for info in status.get_ranks(self.fs.id):
+ name = info['name']
+ label = 'mds.' + name
+ rank = info['rank']
+ gid = info['gid']
+
+ # if thrash_weights isn't specified and we've reached max_thrash,
+ # we're done
+ count = count + 1
+ if 'thrash_weights' not in self.config and count > self.max_thrash:
+ break
+
+ weight = 1.0
+ if 'thrash_weights' in self.config:
+ weight = self.config['thrash_weights'].get(label, '0.0')
+ skip = random.randrange(0.0, 1.0)
+ if weight <= skip:
+ self.log('skipping thrash iteration with skip ({skip}) > weight ({weight})'.format(skip=skip, weight=weight))
+ continue
+
+ self.log('kill {label} (rank={rank})'.format(label=label, rank=rank))
+ self.kill_mds(name)
+ stats['kill'] += 1
+
+ # wait for mon to report killed mds as crashed
+ last_laggy_since = None
+ itercount = 0
+ while True:
+ status = self.fs.status()
+ info = status.get_mds(name)
+ if not info:
+ break
+ if 'laggy_since' in info:
+ last_laggy_since = info['laggy_since']
+ break
+ if any([(f == name) for f in status.get_fsmap(self.fs.id)['mdsmap']['failed']]):
+ break
+ self.log(
+ 'waiting till mds map indicates {label} is laggy/crashed, in failed state, or {label} is removed from mdsmap'.format(
+ label=label))
+ itercount = itercount + 1
+ if itercount > 10:
+ self.log('mds map: {status}'.format(status=status))
+ sleep(2)
+
+ if last_laggy_since:
+ self.log(
+ '{label} reported laggy/crashed since: {since}'.format(label=label, since=last_laggy_since))
+ else:
+ self.log('{label} down, removed from mdsmap'.format(label=label, since=last_laggy_since))
+
+ # wait for a standby mds to takeover and become active
+ status = self.wait_for_stable(rank, gid)
+
+ # wait for a while before restarting old active to become new
+ # standby
+ delay = self.max_revive_delay
+ if self.randomize:
+ delay = random.randrange(0.0, self.max_revive_delay)
+
+ self.log('waiting for {delay} secs before reviving {label}'.format(
+ delay=delay, label=label))
+ sleep(delay)
+
+ self.log('reviving {label}'.format(label=label))
+ self.revive_mds(name)
+
+ for itercount in itertools.count():
+ if itercount > 300/2: # 5 minutes
+ raise RuntimeError('timeout waiting for MDS to revive')
+ status = self.fs.status()
+ info = status.get_mds(name)
+ if info and info['state'] in ('up:standby', 'up:standby-replay', 'up:active'):
+ self.log('{label} reported in {state} state'.format(label=label, state=info['state']))
+ break
+ self.log(
+ 'waiting till mds map indicates {label} is in active, standby or standby-replay'.format(label=label))
+ sleep(2)
+
+ for stat in stats:
+ self.log("stat['{key}'] = {value}".format(key = stat, value = stats[stat]))
+
+ # don't do replay thrashing right now
+# for info in status.get_replays(self.fs.id):
+# # this might race with replay -> active transition...
+# if status['state'] == 'up:replay' and random.randrange(0.0, 1.0) < self.thrash_in_replay:
+# delay = self.max_replay_thrash_delay
+# if self.randomize:
+# delay = random.randrange(0.0, self.max_replay_thrash_delay)
+# sleep(delay)
+# self.log('kill replaying mds.{id}'.format(id=self.to_kill))
+# self.kill_mds(self.to_kill)
+#
+# delay = self.max_revive_delay
+# if self.randomize:
+# delay = random.randrange(0.0, self.max_revive_delay)
+#
+# self.log('waiting for {delay} secs before reviving mds.{id}'.format(
+# delay=delay, id=self.to_kill))
+# sleep(delay)
+#
+# self.log('revive mds.{id}'.format(id=self.to_kill))
+# self.revive_mds(self.to_kill)
+
+
+@contextlib.contextmanager
+def task(ctx, config):
+ """
+ Stress test the mds by thrashing while another task/workunit
+ is running.
+
+ Please refer to MDSThrasher class for further information on the
+ available options.
+ """
+
+ mds_cluster = MDSCluster(ctx)
+
+ if config is None:
+ config = {}
+ assert isinstance(config, dict), \
+ 'mds_thrash task only accepts a dict for configuration'
+ mdslist = list(teuthology.all_roles_of_type(ctx.cluster, 'mds'))
+ assert len(mdslist) > 1, \
+ 'mds_thrash task requires at least 2 metadata servers'
+
+ # choose random seed
+ if 'seed' in config:
+ seed = int(config['seed'])
+ else:
+ seed = int(time.time())
+ log.info('mds thrasher using random seed: {seed}'.format(seed=seed))
+ random.seed(seed)
+
+ (first,) = ctx.cluster.only('mds.{_id}'.format(_id=mdslist[0])).remotes.keys()
+ manager = ceph_manager.CephManager(
+ first, ctx=ctx, logger=log.getChild('ceph_manager'),
+ )
+
+ # make sure everyone is in active, standby, or standby-replay
+ log.info('Wait for all MDSs to reach steady state...')
+ status = mds_cluster.status()
+ while True:
+ steady = True
+ for info in status.get_all():
+ state = info['state']
+ if state not in ('up:active', 'up:standby', 'up:standby-replay'):
+ steady = False
+ break
+ if steady:
+ break
+ sleep(2)
+ status = mds_cluster.status()
+ log.info('Ready to start thrashing')
+
+ thrashers = []
+
+ watchdog = DaemonWatchdog(ctx, manager, config, thrashers)
+ watchdog.start()
+
+ manager.wait_for_clean()
+ assert manager.is_clean()
+ for fs in status.get_filesystems():
+ thrasher = MDSThrasher(ctx, manager, config, Filesystem(ctx, fs['id']), fs['mdsmap']['max_mds'])
+ thrasher.start()
+ thrashers.append(thrasher)
+
+ try:
+ log.debug('Yielding')
+ yield
+ finally:
+ log.info('joining mds_thrashers')
+ for thrasher in thrashers:
+ thrasher.stop()
+ if thrasher.e:
+ raise RuntimeError('error during thrashing')
+ thrasher.join()
+ log.info('done joining')
+
+ watchdog.stop()
+ watchdog.join()