""" Run ceph-dedup-tool """ import contextlib import logging import gevent from teuthology import misc as teuthology import json import time from io import StringIO log = logging.getLogger(__name__) @contextlib.contextmanager def task(ctx, config): """ Run ceph-dedup-tool. The config should be as follows:: ceph-dedup-tool: clients: [client list] op: pool: chunk_pool: chunk_size: chunk_algorithm: fingerprint_algorithm: chunk_dedup_threashold: max_thread: wakeup_period: For example:: tasks: - exec: client.0: - sudo ceph osd pool create low_tier 4 - deduplication: clients: [client.0] op: 'sample-dedup' pool: 'default.rgw.buckets.data' chunk_pool: 'low_tier' chunk_size: 131072 chunk_algorithm: 'fastcdc' fingerprint_algorithm: 'sha1' chunk_dedup_threshold: 5 max_thread: 2 wakeup_period: 20 sampling_ratio: 100 """ log.info('Beginning deduplication...') assert isinstance(config, dict), \ "please list clients to run on" args = [ 'ceph-dedup-tool'] if config.get('op', None): args.extend(['--op', config.get('op', None)]) if config.get('chunk_pool', None): args.extend(['--chunk-pool', config.get('chunk_pool', None)]) if config.get('chunk_size', False): args.extend(['--chunk-size', str(config.get('chunk_size', 131072))]) if config.get('chunk_algorithm', False): args.extend(['--chunk-algorithm', config.get('chunk_algorithm', None)] ) if config.get('fingerprint_algorithm', False): args.extend(['--fingerprint-algorithm', config.get('fingerprint_algorithm', None)] ) if config.get('chunk_dedup_threshold', False): args.extend(['--chunk-dedup-threshold', str(config.get('chunk_dedup_threshold', 1))]) if config.get('max_thread', False): args.extend(['--max-thread', str(config.get('max_thread', 2))]) if config.get('sampling_ratio', False): args.extend(['--sampling-ratio', str(config.get('sampling_ratio', 100))]) if config.get('wakeup_period', False): args.extend(['--wakeup-period', str(config.get('wakeup_period', 20))]) if config.get('pool', False): args.extend(['--pool', config.get('pool', None)]) args.extend([ '--debug', '--daemon', '--loop']) def thread(): run_remote(args, False, 0) def run_remote(args, need_wait, client_num): clients = ['client.{id}'.format(id=id_) for id_ in teuthology.all_roles_of_type(ctx.cluster, 'client')] log.info('clients are %s' % clients) role = 'client.{id}'.format(id=client_num) if role not in clients: raise Exception('wrong client {c}'.format(c=role)) assert isinstance(role, str) PREFIX = 'client.' assert role.startswith(PREFIX) testdir = teuthology.get_testdir(ctx) cmd_args = [ 'adjust-ulimits', 'ceph-coverage', '{tdir}/archive/coverage'.format(tdir=testdir)] cmd_args.extend(args) log.info("cmd: %s", cmd_args) tries = 0 while True: (remote,) = ctx.cluster.only(role).remotes.keys() proc = remote.run( args=cmd_args, wait=need_wait, check_status=False, stdout=StringIO(), ) log.info('exitstatus {r}'.format(r=proc.exitstatus)) if proc.exitstatus == 0 or need_wait == False: log.info('proc stdout ', proc.stdout.getvalue()) return proc.stdout.getvalue().strip() tries += 1 if tries > 30: raise Exception('timed out getting correct exitstatus') time.sleep(30) def get_chunk_objs(chunk_pool): chunk_obj_list = run_remote(('rados ls -p ' + chunk_pool).split(), True, 1).split() if chunk_obj_list == False: return None else: return chunk_obj_list def get_ref_list(chunk_pool, chunk_obj): # get reference list of chunk object dump_str = run_remote( ('ceph-dedup-tool --op dump-chunk-refs --chunk-pool ' + chunk_pool + ' --object ' + chunk_obj).split(), True, 1 ) # fail in case that reference object is not written assert len(dump_str) > 0 log.info('{0} obj has {1} refs' .format(chunk_obj, json.loads(dump_str)['count'])) # check if chunk object's reference object exists in base-tier ref_list = json.loads(dump_str)['refs'] return ref_list # To validate whether the sample-dedup operation works well, this function checks if # 1. sample-dedup has been started and # 2. reference of chunk objects' exists in correct base pool def validate(): log.info('start validating sample-dedup') base_pool = config.get('pool', None) chunk_pool = config.get('chunk_pool', None) max_validation_cnt = 15 retry_cnt = 0 # chunk objs for re-validation after chunk-repair retry_chunk_objs = list() # check whether sample-dedup has been started chunk_obj_list = get_chunk_objs(chunk_pool) while (chunk_obj_list == None or len(chunk_obj_list) == 0) and retry_cnt < max_validation_cnt: # retry getting # chunk objs after 30 secs of sleep time.sleep(30) chunk_obj_list = get_chunk_objs(chunk_pool) retry_cnt += 1 log.info('chunk pool empty. retry ', retry_cnt) assert retry_cnt < max_validation_cnt log.info('sample-dedup started successfully') retry_cnt = 0 max_validation_cnt = 5 # validate chunk pool for max_validation_cnt times while retry_cnt < max_validation_cnt: for chunk_obj in chunk_obj_list: ref_list = get_ref_list(chunk_pool, chunk_obj) for ref in ref_list: ret = run_remote( ('rados -p ' + base_pool + ' stat ' + ref['oid']) .split(), True, 1 ) # check if ref exists in base pool if ret == False or len(ret) == 0: # if ref not exists in base pool, try repair in order to avoid # false-positive inconsistent reference ret = run_remote(('ceph osd pool stats ' + base_pool).split(), True, 1) assert len(ret) > 0 base_pool_id = ret.split()[3] ret = run_remote( ('ceph-dedup-tool --op chunk-repair --chunk-pool ' + chunk_pool + ' --object ' + chunk_obj + ' --target-ref ' + ref['oid'] + ' --target-ref-pool-id ' + base_pool_id) .split(), True, 1 ) retry_chunk_objs.append(chunk_obj) log.info('{0} obj exists in {1}'.format(ref['oid'], base_pool)) # retry validation for repaired objects for chunk_obj in retry_chunk_objs: ref_list = get_ref_list(chunk_pool, chunk_obj) for ref in ref_list: ret = run_remote( ('rados -p ' + base_pool + ' stat ' + ref['oid']) .split(), True, 1 ) assert len(ret) > 0 log.info( '{0} obj exists in {1} after repair'.format(ref['oid'], base_pool) ) retry_chunk_objs = list() # get chunk objects for the next loop chunk_obj_list = get_chunk_objs(chunk_pool) retry_cnt += 1 time.sleep(30) return True running = gevent.spawn(thread) checker = gevent.spawn(validate) try: yield finally: log.info('joining ceph-dedup-tool') running.get() checker.get()