1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
"""
Scrub osds
"""
import contextlib
import gevent
import logging
import random
import time
from tasks import ceph_manager
from teuthology import misc as teuthology
log = logging.getLogger(__name__)
@contextlib.contextmanager
def task(ctx, config):
    """
    Scrub a randomly chosen OSD at a fixed interval while the enclosed
    tasks run.

    Accepted configuration (every key is optional)::

        scrub:
            frequency: <seconds between scrubs>
            deep: <bool for deepness>

    Example::

        tasks:
        - ceph:
        - scrub:
            frequency: 30
            deep: 0

    A ``config`` of None is treated as an empty dict; any other non-dict
    value fails the assertion below.
    """
    config = {} if config is None else config
    assert isinstance(config, dict), \
        'scrub task only accepts a dict for configuration'

    log.info('Beginning scrub...')

    # Commands are issued through the remote hosting the first monitor.
    mon_name = teuthology.get_first_mon(ctx, config)
    (mon_remote,) = ctx.cluster.only(mon_name).remotes.keys()
    manager = ceph_manager.CephManager(
        mon_remote,
        ctx=ctx,
        logger=log.getChild('ceph_manager'),
    )

    # Hold off until every OSD in the cluster is reported as up,
    # polling every ten seconds.
    expected_osds = teuthology.num_instances_of_type(ctx.cluster, 'osd')
    while True:
        up_osds = manager.get_osd_status()['up']
        if len(up_osds) >= expected_osds:
            break
        time.sleep(10)

    # Constructing the Scrubber starts the background scrubbing.
    scrubber = Scrubber(manager, config)
    try:
        yield
    finally:
        log.info('joining scrub')
        scrubber.do_join()
class Scrubber:
    """
    Periodically scrub a randomly chosen OSD from a background greenlet.

    The greenlet is spawned by the constructor and keeps running until
    do_join() is called.
    """

    # Longest single sleep, in seconds, between checks of ``stopping``
    # while waiting for the next scrub. Keeps do_join() from blocking for
    # a whole ``frequency`` interval.
    _STOP_POLL_INTERVAL = 1

    def __init__(self, manager, config):
        """
        Wait for the cluster to become clean, record the OSDs that are up,
        and spawn the scrubbing greenlet.

        ``manager`` must provide wait_for_clean(), get_osd_status() and
        raw_cluster_cmd(). ``config`` is a dict that may carry the
        "frequency" and "deep" keys; None means "use the defaults".
        """
        self.ceph_manager = manager
        self.ceph_manager.wait_for_clean()

        osd_status = self.ceph_manager.get_osd_status()
        self.osds = osd_status['up']

        self.config = dict() if config is None else config

        def tmp(x):
            """Local display"""
            print(x)
        # Assigned unconditionally so every instance exposes ``log``,
        # whether or not a config was supplied.
        self.log = tmp

        self.stopping = False

        log.info("spawning thread")
        self.thread = gevent.spawn(self.do_scrub)

    def do_join(self):
        """
        Ask the scrubbing greenlet to stop and wait for it to finish.

        An exception raised inside the greenlet is expected to surface
        here through get() (see gevent's Greenlet.get documentation).
        """
        self.stopping = True
        self.thread.get()

    def do_scrub(self):
        """
        Scrub a random up OSD every ``frequency`` seconds (default 30)
        until ``stopping`` is set. A truthy "deep" config value selects
        deep-scrub instead of a regular scrub.
        """
        frequency = self.config.get("frequency", 30)
        deep = self.config.get("deep", 0)
        # The command never changes between iterations, so pick it once.
        cmd = 'deep-scrub' if deep else 'scrub'

        log.info("stopping %s", self.stopping)

        while not self.stopping:
            osd = str(random.choice(self.osds))

            # Renders as "scrubbing <osd>" / "deep-scrubbing <osd>".
            log.info('%sbing %s', cmd, osd)
            self.ceph_manager.raw_cluster_cmd('osd', cmd, osd)

            self._sleep_until_next_scrub(frequency)

    def _sleep_until_next_scrub(self, duration):
        """
        Sleep for ``duration`` seconds, returning early once ``stopping``
        is set.

        The wait is split into slices of at most _STOP_POLL_INTERVAL
        seconds so that a stop request is noticed promptly.
        """
        deadline = time.time() + duration
        while not self.stopping:
            remaining = deadline - time.time()
            if remaining <= 0:
                break
            time.sleep(min(remaining, self._STOP_POLL_INTERVAL))
|