summaryrefslogtreecommitdiffstats
path: root/qa/tasks/fwd_scrub.py
blob: 44fd97baa0a6377fd0b7818af3cb3c602bbba7ed (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
"""
Thrash mds by simulating failures
"""
import logging
import contextlib

from gevent import sleep, GreenletExit
from gevent.greenlet import Greenlet
from gevent.event import Event
from teuthology import misc as teuthology

from tasks import ceph_manager
from tasks.cephfs.filesystem import MDSCluster, Filesystem
from tasks.thrasher import Thrasher

log = logging.getLogger(__name__)

class ForwardScrubber(Thrasher, Greenlet):
    """
    ForwardScrubber::

    The ForwardScrubber does forward scrubbing of file-systems during execution
    of other tasks (workunits, etc).
    """

    def __init__(self, fs, scrub_timeout=300, sleep_between_iterations=1):
        super(ForwardScrubber, self).__init__()

        self.logger = log.getChild('fs.[{f}]'.format(f=fs.name))
        self.fs = fs
        self.name = 'thrasher.fs.[{f}]'.format(f=fs.name)
        self.stopping = Event()
        self.scrub_timeout = scrub_timeout
        self.sleep_between_iterations = sleep_between_iterations

    def _run(self):
        try:
            self.do_scrub()
        except Exception as e:
            self.set_thrasher_exception(e)
            self.logger.exception("exception:")
            # allow successful completion so gevent doesn't see an exception...

    def stop(self):
        self.stopping.set()

    def do_scrub(self):
        """
        Perform the file-system scrubbing
        """
        self.logger.info(f'start scrubbing fs: {self.fs.name}')

        try:
            while not self.stopping.is_set():
                self._scrub()
                sleep(self.sleep_between_iterations)
        except GreenletExit:
            pass

        self.logger.info(f'end scrubbing fs: {self.fs.name}')

    def _scrub(self, path="/", recursive=True):
        self.logger.info(f"scrubbing fs: {self.fs.name}")
        scrubopts = ["force"]
        if recursive:
            scrubopts.append("recursive")
        out_json = self.fs.run_scrub(["start", path, ",".join(scrubopts)])
        assert out_json is not None

        tag = out_json['scrub_tag']

        assert tag is not None
        assert out_json['return_code'] == 0
        assert out_json['mode'] == 'asynchronous'

        return self.fs.wait_until_scrub_complete(tag=tag, sleep=30,
                                                 timeout=self.scrub_timeout)

def stop_all_fwd_scrubbers(thrashers):
    for thrasher in thrashers:
        if not isinstance(thrasher, ForwardScrubber):
            continue
        thrasher.stop()
        thrasher.join()
        if thrasher.exception is not None:
            raise RuntimeError(f"error during scrub thrashing: {thrasher.exception}")


@contextlib.contextmanager
def task(ctx, config):
    """
    Stress test the mds by running scrub iterations while another task/workunit
    is running.
    Example config:

    - fwd_scrub:
      scrub_timeout: 300
      sleep_between_iterations: 1
    """

    mds_cluster = MDSCluster(ctx)

    if config is None:
        config = {}
    assert isinstance(config, dict), \
        'fwd_scrub task only accepts a dict for configuration'
    mdslist = list(teuthology.all_roles_of_type(ctx.cluster, 'mds'))
    assert len(mdslist) > 0, \
        'fwd_scrub task requires at least 1 metadata server'

    (first,) = ctx.cluster.only(f'mds.{mdslist[0]}').remotes.keys()
    manager = ceph_manager.CephManager(
        first, ctx=ctx, logger=log.getChild('ceph_manager'),
    )

    # make sure everyone is in active, standby, or standby-replay
    log.info('Wait for all MDSs to reach steady state...')
    status = mds_cluster.status()
    while True:
        steady = True
        for info in status.get_all():
            state = info['state']
            if state not in ('up:active', 'up:standby', 'up:standby-replay'):
                steady = False
                break
        if steady:
            break
        sleep(2)
        status = mds_cluster.status()

    log.info('Ready to start scrub thrashing')

    manager.wait_for_clean()
    assert manager.is_clean()

    if 'cluster' not in config:
        config['cluster'] = 'ceph'

    for fs in status.get_filesystems():
        fwd_scrubber = ForwardScrubber(Filesystem(ctx, fscid=fs['id']),
                                       config['scrub_timeout'],
                                       config['sleep_between_iterations'])
        fwd_scrubber.start()
        ctx.ceph[config['cluster']].thrashers.append(fwd_scrubber)

    try:
        log.debug('Yielding')
        yield
    finally:
        log.info('joining ForwardScrubbers')
        stop_all_fwd_scrubbers(ctx.ceph[config['cluster']].thrashers)
        log.info('done joining')