summaryrefslogtreecommitdiffstats
path: root/qa/tasks/scrub.py
blob: ddc1a9164cfc825cd5ba9e35cc4c1eb088baadac (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
"""
Scrub osds
"""
import contextlib
import gevent
import logging
import random
import time

from tasks import ceph_manager
from teuthology import misc as teuthology

log = logging.getLogger(__name__)

@contextlib.contextmanager
def task(ctx, config):
    """
    Scrub a randomly chosen OSD at a fixed interval while the nested
    tasks run.

    Accepted configuration keys:

    scrub:
        frequency: <seconds between scrubs>
        deep: <bool for deepness>

    For instance:

    tasks:
    - ceph:
    - scrub:
        frequency: 30
        deep: 0

    A config of None is treated as an empty dict; any other non-dict
    value fails the assertion below.  Before the scrubber is started the
    task polls every 10 seconds until the number of up OSDs reaches the
    count of 'osd' instances reported by teuthology for ctx.cluster.
    On exit (normal or via exception) the scrubber is joined.
    """
    config = {} if config is None else config
    assert isinstance(config, dict), \
        'scrub task only accepts a dict for configuration'

    log.info('Beginning scrub...')

    mon_role = teuthology.get_first_mon(ctx, config)
    # Tuple-unpacking insists on exactly one remote for the first mon.
    (mon_remote,) = ctx.cluster.only(mon_role).remotes.keys()
    cluster_manager = ceph_manager.CephManager(
        mon_remote, ctx=ctx, logger=log.getChild('ceph_manager'))

    expected_osds = teuthology.num_instances_of_type(ctx.cluster, 'osd')
    while True:
        up_osds = cluster_manager.get_osd_status()['up']
        if len(up_osds) >= expected_osds:
            break
        time.sleep(10)

    scrubber = Scrubber(cluster_manager, config)
    try:
        yield
    finally:
        log.info('joining scrub')
        scrubber.do_join()

class Scrubber:
    """
    Periodically issue a scrub (or deep-scrub) against a random up OSD.

    Constructing an instance spawns a gevent greenlet running do_scrub();
    the scrubbing itself happens in that greenlet, not in __init__.
    Call do_join() to stop the greenlet and wait for it.
    """
    def __init__(self, manager, config):
        """
        Record the up OSDs and spawn the scrubbing greenlet.

        Calls manager.wait_for_clean() first, then snapshots the 'up'
        entry of manager.get_osd_status() as the candidate OSD list.

        :param manager: object providing wait_for_clean(),
                        get_osd_status() and raw_cluster_cmd()
        :param config: dict that may hold "frequency" and "deep",
                       or None for the defaults
        """
        self.ceph_manager = manager
        self.ceph_manager.wait_for_clean()

        osd_status = self.ceph_manager.get_osd_status()
        self.osds = osd_status['up']

        self.config = {} if config is None else config

        def tmp(x):
            """Local display"""
            print(x)
        # Assigned unconditionally so the attribute exists whether or not
        # a config was supplied (it used to be missing for config=None).
        self.log = tmp

        self.stopping = False

        log.info("spawning thread")

        self.thread = gevent.spawn(self.do_scrub)

    def do_join(self):
        """
        Signal the scrubbing greenlet to stop and wait for it to finish.

        Greenlet.get() re-raises any exception that escaped do_scrub().
        """
        self.stopping = True
        self.thread.get()

    def do_scrub(self):
        """
        Scrub loop: pick a random OSD, scrub it, pause, repeat.

        Reads "frequency" (seconds between scrubs, default 30) and "deep"
        (truthy selects 'deep-scrub', default 0) from the config, and runs
        until ``stopping`` becomes true.
        """
        frequency = self.config.get("frequency", 30)
        # The command never changes between iterations, so pick it once.
        cmd = 'deep-scrub' if self.config.get("deep", 0) else 'scrub'

        log.info("stopping %s", self.stopping)

        while not self.stopping:
            osd = str(random.choice(self.osds))

            # cmd + "bing" renders as "scrubbing" / "deep-scrubbing".
            log.info('%sbing %s', cmd, osd)
            self.ceph_manager.raw_cluster_cmd('osd', cmd, osd)

            self._pause(frequency)

    def _pause(self, seconds):
        """
        Sleep for *seconds*, returning early once ``stopping`` is set.

        Sleeping in slices of at most one second means do_join() does not
        have to wait out a whole scrub interval.  At least one sleep call
        is always made, as before.
        """
        remaining = seconds
        while True:
            step = min(remaining, 1)
            time.sleep(step)
            remaining -= step
            if self.stopping or remaining <= 0:
                return