summaryrefslogtreecommitdiffstats
path: root/qa/tasks/ceph.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--qa/tasks/ceph.py1960
1 files changed, 1960 insertions, 0 deletions
diff --git a/qa/tasks/ceph.py b/qa/tasks/ceph.py
new file mode 100644
index 000000000..105362d48
--- /dev/null
+++ b/qa/tasks/ceph.py
@@ -0,0 +1,1960 @@
+"""
+Ceph cluster task.
+
+Handle the setup, starting, and clean-up of a Ceph cluster.
+"""
+from copy import deepcopy
+from io import BytesIO
+from io import StringIO
+
+import argparse
+import configobj
+import contextlib
+import errno
+import logging
+import os
+import json
+import time
+import gevent
+import re
+import socket
+import yaml
+
+from paramiko import SSHException
+from tasks.ceph_manager import CephManager, write_conf, get_valgrind_args
+from tarfile import ReadError
+from tasks.cephfs.filesystem import MDSCluster, Filesystem
+from teuthology import misc as teuthology
+from teuthology import contextutil
+from teuthology import exceptions
+from teuthology.orchestra import run
+from tasks import ceph_client as cclient
+from teuthology.orchestra.daemon import DaemonGroup
+from tasks.daemonwatchdog import DaemonWatchdog
+
+CEPH_ROLE_TYPES = ['mon', 'mgr', 'osd', 'mds', 'rgw']
+DATA_PATH = '/var/lib/ceph/{type_}/{cluster}-{id_}'
+
+log = logging.getLogger(__name__)
+
+
+def generate_caps(type_):
+ """
+ Each call will return the next capability for each system type
+ (essentially a subset of possible role values). Valid types are osd,
+ mds and client.
+ """
+ defaults = dict(
+ osd=dict(
+ mon='allow profile osd',
+ mgr='allow profile osd',
+ osd='allow *',
+ ),
+ mgr=dict(
+ mon='allow profile mgr',
+ osd='allow *',
+ mds='allow *',
+ ),
+ mds=dict(
+ mon='allow *',
+ mgr='allow *',
+ osd='allow *',
+ mds='allow',
+ ),
+ client=dict(
+ mon='allow rw',
+ mgr='allow r',
+ osd='allow rwx',
+ mds='allow',
+ ),
+ )
+ for subsystem, capability in defaults[type_].items():
+ yield '--cap'
+ yield subsystem
+ yield capability
+
+
+def update_archive_setting(ctx, key, value):
+ """
+ Add logs directory to job's info log file
+ """
+ if ctx.archive is None:
+ return
+ with open(os.path.join(ctx.archive, 'info.yaml'), 'r+') as info_file:
+ info_yaml = yaml.safe_load(info_file)
+ info_file.seek(0)
+ if 'archive' in info_yaml:
+ info_yaml['archive'][key] = value
+ else:
+ info_yaml['archive'] = {key: value}
+ yaml.safe_dump(info_yaml, info_file, default_flow_style=False)
+
+
+@contextlib.contextmanager
+def ceph_crash(ctx, config):
+ """
+ Gather crash dumps from /var/lib/ceph/crash
+ """
+
+ # Add crash directory to job's archive
+ update_archive_setting(ctx, 'crash', '/var/lib/ceph/crash')
+
+ try:
+ yield
+
+ finally:
+ if ctx.archive is not None:
+ log.info('Archiving crash dumps...')
+ path = os.path.join(ctx.archive, 'remote')
+ try:
+ os.makedirs(path)
+ except OSError:
+ pass
+ for remote in ctx.cluster.remotes.keys():
+ sub = os.path.join(path, remote.shortname)
+ try:
+ os.makedirs(sub)
+ except OSError:
+ pass
+ try:
+ teuthology.pull_directory(remote, '/var/lib/ceph/crash',
+ os.path.join(sub, 'crash'))
+ except ReadError:
+ pass
+
+
+@contextlib.contextmanager
+def ceph_log(ctx, config):
+ """
+ Create /var/log/ceph log directory that is open to everyone.
+ Add valgrind and profiling-logger directories.
+
+ :param ctx: Context
+ :param config: Configuration
+ """
+ log.info('Making ceph log dir writeable by non-root...')
+ run.wait(
+ ctx.cluster.run(
+ args=[
+ 'sudo',
+ 'chmod',
+ '777',
+ '/var/log/ceph',
+ ],
+ wait=False,
+ )
+ )
+ log.info('Disabling ceph logrotate...')
+ run.wait(
+ ctx.cluster.run(
+ args=[
+ 'sudo',
+ 'rm', '-f', '--',
+ '/etc/logrotate.d/ceph',
+ ],
+ wait=False,
+ )
+ )
+ log.info('Creating extra log directories...')
+ run.wait(
+ ctx.cluster.run(
+ args=[
+ 'sudo',
+ 'install', '-d', '-m0777', '--',
+ '/var/log/ceph/valgrind',
+ '/var/log/ceph/profiling-logger',
+ ],
+ wait=False,
+ )
+ )
+
+ # Add logs directory to job's info log file
+ update_archive_setting(ctx, 'log', '/var/log/ceph')
+
+ class Rotater(object):
+ stop_event = gevent.event.Event()
+
+ def invoke_logrotate(self):
+ # 1) install ceph-test.conf in /etc/logrotate.d
+ # 2) continuously loop over logrotate invocation with ceph-test.conf
+ while not self.stop_event.is_set():
+ self.stop_event.wait(timeout=30)
+ try:
+ procs = ctx.cluster.run(
+ args=['sudo', 'logrotate', '/etc/logrotate.d/ceph-test.conf'],
+ wait=False,
+ stderr=StringIO()
+ )
+ run.wait(procs)
+ except exceptions.ConnectionLostError as e:
+ # Some tests may power off nodes during test, in which
+ # case we will see connection errors that we should ignore.
+ log.debug("Missed logrotate, node '{0}' is offline".format(
+ e.node))
+ except EOFError:
+ # Paramiko sometimes raises this when it fails to
+ # connect to a node during open_session. As with
+ # ConnectionLostError, we ignore this because nodes
+ # are allowed to get power cycled during tests.
+ log.debug("Missed logrotate, EOFError")
+ except SSHException:
+ log.debug("Missed logrotate, SSHException")
+ except run.CommandFailedError as e:
+ for p in procs:
+ if p.finished and p.exitstatus != 0:
+ err = p.stderr.getvalue()
+ if 'error: error renaming temp state file' in err:
+ log.info('ignoring transient state error: %s', e)
+ else:
+ raise
+ except socket.error as e:
+ if e.errno in (errno.EHOSTUNREACH, errno.ECONNRESET):
+ log.debug("Missed logrotate, host unreachable")
+ else:
+ raise
+
+ def begin(self):
+ self.thread = gevent.spawn(self.invoke_logrotate)
+
+ def end(self):
+ self.stop_event.set()
+ self.thread.get()
+
+ def write_rotate_conf(ctx, daemons):
+ testdir = teuthology.get_testdir(ctx)
+ remote_logrotate_conf = '%s/logrotate.ceph-test.conf' % testdir
+ rotate_conf_path = os.path.join(os.path.dirname(__file__), 'logrotate.conf')
+ with open(rotate_conf_path) as f:
+ conf = ""
+ for daemon, size in daemons.items():
+ log.info('writing logrotate stanza for {}'.format(daemon))
+ conf += f.read().format(daemon_type=daemon,
+ max_size=size)
+ f.seek(0, 0)
+
+ for remote in ctx.cluster.remotes.keys():
+ remote.write_file(remote_logrotate_conf, BytesIO(conf.encode()))
+ remote.sh(
+ f'sudo mv {remote_logrotate_conf} /etc/logrotate.d/ceph-test.conf && '
+ 'sudo chmod 0644 /etc/logrotate.d/ceph-test.conf && '
+ 'sudo chown root.root /etc/logrotate.d/ceph-test.conf')
+ remote.chcon('/etc/logrotate.d/ceph-test.conf',
+ 'system_u:object_r:etc_t:s0')
+
+ if ctx.config.get('log-rotate'):
+ daemons = ctx.config.get('log-rotate')
+ log.info('Setting up log rotation with ' + str(daemons))
+ write_rotate_conf(ctx, daemons)
+ logrotater = Rotater()
+ logrotater.begin()
+ try:
+ yield
+
+ finally:
+ if ctx.config.get('log-rotate'):
+ log.info('Shutting down logrotate')
+ logrotater.end()
+ ctx.cluster.sh('sudo rm /etc/logrotate.d/ceph-test.conf')
+ if ctx.archive is not None and \
+ not (ctx.config.get('archive-on-error') and ctx.summary['success']):
+ # and logs
+ log.info('Compressing logs...')
+ run.wait(
+ ctx.cluster.run(
+ args=[
+ 'sudo',
+ 'find',
+ '/var/log/ceph',
+ '-name',
+ '*.log',
+ '-print0',
+ run.Raw('|'),
+ 'sudo',
+ 'xargs',
+ '-0',
+ '--no-run-if-empty',
+ '--',
+ 'gzip',
+ '--',
+ ],
+ wait=False,
+ ),
+ )
+
+ log.info('Archiving logs...')
+ path = os.path.join(ctx.archive, 'remote')
+ try:
+ os.makedirs(path)
+ except OSError:
+ pass
+ for remote in ctx.cluster.remotes.keys():
+ sub = os.path.join(path, remote.shortname)
+ try:
+ os.makedirs(sub)
+ except OSError:
+ pass
+ teuthology.pull_directory(remote, '/var/log/ceph',
+ os.path.join(sub, 'log'))
+
+
+def assign_devs(roles, devs):
+ """
+ Create a dictionary of devs indexed by roles
+
+ :param roles: List of roles
+ :param devs: Corresponding list of devices.
+ :returns: Dictionary of devs indexed by roles.
+ """
+ return dict(zip(roles, devs))
+
+
+@contextlib.contextmanager
+def valgrind_post(ctx, config):
+ """
+ After the tests run, look through all the valgrind logs. Exceptions are raised
+ if textual errors occurred in the logs, or if valgrind exceptions were detected in
+ the logs.
+
+ :param ctx: Context
+ :param config: Configuration
+ """
+ try:
+ yield
+ finally:
+ lookup_procs = list()
+ log.info('Checking for errors in any valgrind logs...')
+ for remote in ctx.cluster.remotes.keys():
+ # look at valgrind logs for each node
+ proc = remote.run(
+ args="sudo zgrep '<kind>' /var/log/ceph/valgrind/* "
+ # include a second file so that we always get
+ # a filename prefix on the output
+ "/dev/null | sort | uniq",
+ wait=False,
+ check_status=False,
+ stdout=StringIO(),
+ )
+ lookup_procs.append((proc, remote))
+
+ valgrind_exception = None
+ for (proc, remote) in lookup_procs:
+ proc.wait()
+ out = proc.stdout.getvalue()
+ for line in out.split('\n'):
+ if line == '':
+ continue
+ try:
+ (file, kind) = line.split(':')
+ except Exception:
+ log.error('failed to split line %s', line)
+ raise
+ log.debug('file %s kind %s', file, kind)
+ if (file.find('mds') >= 0) and kind.find('Lost') > 0:
+ continue
+ log.error('saw valgrind issue %s in %s', kind, file)
+ valgrind_exception = Exception('saw valgrind issues')
+
+ if config.get('expect_valgrind_errors'):
+ if not valgrind_exception:
+ raise Exception('expected valgrind issues and found none')
+ else:
+ if valgrind_exception:
+ raise valgrind_exception
+
+
+@contextlib.contextmanager
+def crush_setup(ctx, config):
+ cluster_name = config['cluster']
+ first_mon = teuthology.get_first_mon(ctx, config, cluster_name)
+ (mon_remote,) = ctx.cluster.only(first_mon).remotes.keys()
+
+ profile = config.get('crush_tunables', 'default')
+ log.info('Setting crush tunables to %s', profile)
+ mon_remote.run(
+ args=['sudo', 'ceph', '--cluster', cluster_name,
+ 'osd', 'crush', 'tunables', profile])
+ yield
+
+
+@contextlib.contextmanager
+def check_enable_crimson(ctx, config):
+ # enable crimson-osds if crimson
+ log.info("check_enable_crimson: {}".format(is_crimson(config)))
+ if is_crimson(config):
+ cluster_name = config['cluster']
+ first_mon = teuthology.get_first_mon(ctx, config, cluster_name)
+ (mon_remote,) = ctx.cluster.only(first_mon).remotes.keys()
+ log.info('check_enable_crimson: setting set-allow-crimson')
+ mon_remote.run(
+ args=[
+ 'sudo', 'ceph', '--cluster', cluster_name,
+ 'osd', 'set-allow-crimson', '--yes-i-really-mean-it'
+ ]
+ )
+ yield
+
+
+@contextlib.contextmanager
+def setup_manager(ctx, config):
+ first_mon = teuthology.get_first_mon(ctx, config, config['cluster'])
+ (mon,) = ctx.cluster.only(first_mon).remotes.keys()
+ if not hasattr(ctx, 'managers'):
+ ctx.managers = {}
+ ctx.managers[config['cluster']] = CephManager(
+ mon,
+ ctx=ctx,
+ logger=log.getChild('ceph_manager.' + config['cluster']),
+ cluster=config['cluster'],
+ )
+ yield
+
+@contextlib.contextmanager
+def create_rbd_pool(ctx, config):
+ cluster_name = config['cluster']
+ first_mon = teuthology.get_first_mon(ctx, config, cluster_name)
+ (mon_remote,) = ctx.cluster.only(first_mon).remotes.keys()
+ log.info('Waiting for OSDs to come up')
+ teuthology.wait_until_osds_up(
+ ctx,
+ cluster=ctx.cluster,
+ remote=mon_remote,
+ ceph_cluster=cluster_name,
+ )
+ if config.get('create_rbd_pool', True):
+ log.info('Creating RBD pool')
+ mon_remote.run(
+ args=['sudo', 'ceph', '--cluster', cluster_name,
+ 'osd', 'pool', 'create', 'rbd', '8'])
+ mon_remote.run(
+ args=[
+ 'sudo', 'ceph', '--cluster', cluster_name,
+ 'osd', 'pool', 'application', 'enable',
+ 'rbd', 'rbd', '--yes-i-really-mean-it'
+ ],
+ check_status=False)
+ yield
+
+@contextlib.contextmanager
+def cephfs_setup(ctx, config):
+ cluster_name = config['cluster']
+
+ first_mon = teuthology.get_first_mon(ctx, config, cluster_name)
+ (mon_remote,) = ctx.cluster.only(first_mon).remotes.keys()
+ mdss = ctx.cluster.only(teuthology.is_type('mds', cluster_name))
+ # If there are any MDSs, then create a filesystem for them to use
+ # Do this last because requires mon cluster to be up and running
+ if mdss.remotes:
+ log.info('Setting up CephFS filesystem(s)...')
+ cephfs_config = config.get('cephfs', {})
+ fs_configs = cephfs_config.pop('fs', [{'name': 'cephfs'}])
+
+ # wait for standbys to become available (slow due to valgrind, perhaps)
+ mdsc = MDSCluster(ctx)
+ mds_count = len(list(teuthology.all_roles_of_type(ctx.cluster, 'mds')))
+ with contextutil.safe_while(sleep=2,tries=150) as proceed:
+ while proceed():
+ if len(mdsc.get_standby_daemons()) >= mds_count:
+ break
+
+ fss = []
+ for fs_config in fs_configs:
+ assert isinstance(fs_config, dict)
+ name = fs_config.pop('name')
+ temp = deepcopy(cephfs_config)
+ teuthology.deep_merge(temp, fs_config)
+ subvols = config.get('subvols', None)
+ if subvols:
+ teuthology.deep_merge(temp, {'subvols': subvols})
+ fs = Filesystem(ctx, fs_config=temp, name=name, create=True)
+ fss.append(fs)
+
+ yield
+
+ for fs in fss:
+ fs.destroy()
+ else:
+ yield
+
+@contextlib.contextmanager
+def watchdog_setup(ctx, config):
+ ctx.ceph[config['cluster']].thrashers = []
+ ctx.ceph[config['cluster']].watchdog = DaemonWatchdog(ctx, config, ctx.ceph[config['cluster']].thrashers)
+ ctx.ceph[config['cluster']].watchdog.start()
+ yield
+
+def get_mons(roles, ips, cluster_name,
+ mon_bind_msgr2=False,
+ mon_bind_addrvec=False):
+ """
+ Get monitors and their associated addresses
+ """
+ mons = {}
+ v1_ports = {}
+ v2_ports = {}
+ is_mon = teuthology.is_type('mon', cluster_name)
+ for idx, roles in enumerate(roles):
+ for role in roles:
+ if not is_mon(role):
+ continue
+ if ips[idx] not in v1_ports:
+ v1_ports[ips[idx]] = 6789
+ else:
+ v1_ports[ips[idx]] += 1
+ if mon_bind_msgr2:
+ if ips[idx] not in v2_ports:
+ v2_ports[ips[idx]] = 3300
+ addr = '{ip}'.format(ip=ips[idx])
+ else:
+ assert mon_bind_addrvec
+ v2_ports[ips[idx]] += 1
+ addr = '[v2:{ip}:{port2},v1:{ip}:{port1}]'.format(
+ ip=ips[idx],
+ port2=v2_ports[ips[idx]],
+ port1=v1_ports[ips[idx]],
+ )
+ elif mon_bind_addrvec:
+ addr = '[v1:{ip}:{port}]'.format(
+ ip=ips[idx],
+ port=v1_ports[ips[idx]],
+ )
+ else:
+ addr = '{ip}:{port}'.format(
+ ip=ips[idx],
+ port=v1_ports[ips[idx]],
+ )
+ mons[role] = addr
+ assert mons
+ return mons
+
+def skeleton_config(ctx, roles, ips, mons, cluster='ceph'):
+ """
+ Returns a ConfigObj that is prefilled with a skeleton config.
+
+ Use conf[section][key]=value or conf.merge to change it.
+
+ Use conf.write to write it out, override .filename first if you want.
+ """
+ path = os.path.join(os.path.dirname(__file__), 'ceph.conf.template')
+ conf = configobj.ConfigObj(path, file_error=True)
+ mon_hosts = []
+ for role, addr in mons.items():
+ mon_cluster, _, _ = teuthology.split_role(role)
+ if mon_cluster != cluster:
+ continue
+ name = teuthology.ceph_role(role)
+ conf.setdefault(name, {})
+ mon_hosts.append(addr)
+ conf.setdefault('global', {})
+ conf['global']['mon host'] = ','.join(mon_hosts)
+ # set up standby mds's
+ is_mds = teuthology.is_type('mds', cluster)
+ for roles_subset in roles:
+ for role in roles_subset:
+ if is_mds(role):
+ name = teuthology.ceph_role(role)
+ conf.setdefault(name, {})
+ return conf
+
+def create_simple_monmap(ctx, remote, conf, mons,
+ path=None,
+ mon_bind_addrvec=False):
+ """
+ Writes a simple monmap based on current ceph.conf into path, or
+ <testdir>/monmap by default.
+
+ Assumes ceph_conf is up to date.
+
+ Assumes mon sections are named "mon.*", with the dot.
+
+ :return the FSID (as a string) of the newly created monmap
+ """
+
+ addresses = list(mons.items())
+ assert addresses, "There are no monitors in config!"
+ log.debug('Ceph mon addresses: %s', addresses)
+
+ try:
+ log.debug('writing out conf {c}'.format(c=conf))
+ except:
+ log.debug('my conf logging attempt failed')
+ testdir = teuthology.get_testdir(ctx)
+ tmp_conf_path = '{tdir}/ceph.tmp.conf'.format(tdir=testdir)
+ conf_fp = BytesIO()
+ conf.write(conf_fp)
+ conf_fp.seek(0)
+ teuthology.write_file(remote, tmp_conf_path, conf_fp)
+ args = [
+ 'adjust-ulimits',
+ 'ceph-coverage',
+ '{tdir}/archive/coverage'.format(tdir=testdir),
+ 'monmaptool',
+ '-c',
+ '{conf}'.format(conf=tmp_conf_path),
+ '--create',
+ '--clobber',
+ ]
+ if mon_bind_addrvec:
+ args.extend(['--enable-all-features'])
+ for (role, addr) in addresses:
+ _, _, n = teuthology.split_role(role)
+ if mon_bind_addrvec and (',' in addr or 'v' in addr or ':' in addr):
+ args.extend(('--addv', n, addr))
+ else:
+ args.extend(('--add', n, addr))
+ if not path:
+ path = '{tdir}/monmap'.format(tdir=testdir)
+ args.extend([
+ '--print',
+ path
+ ])
+
+ monmap_output = remote.sh(args)
+ fsid = re.search("generated fsid (.+)$",
+ monmap_output, re.MULTILINE).group(1)
+ teuthology.delete_file(remote, tmp_conf_path)
+ return fsid
+
+
+def is_crimson(config):
+ return config.get('flavor', 'default') == 'crimson'
+
+
+def maybe_redirect_stderr(config, type_, args, log_path):
+ if type_ == 'osd' and is_crimson(config):
+ # teuthworker uses ubuntu:ubuntu to access the test nodes
+ create_log_cmd = \
+ f'sudo install -b -o ubuntu -g ubuntu /dev/null {log_path}'
+ return create_log_cmd, args + [run.Raw('2>>'), log_path]
+ else:
+ return None, args
+
+
+@contextlib.contextmanager
+def cluster(ctx, config):
+ """
+ Handle the creation and removal of a ceph cluster.
+
+ On startup:
+ Create directories needed for the cluster.
+ Create remote journals for all osds.
+ Create and set keyring.
+ Copy the monmap to the test systems.
+ Setup mon nodes.
+ Setup mds nodes.
+ Mkfs osd nodes.
+ Add keyring information to monmaps
+ Mkfs mon nodes.
+
+ On exit:
+ If errors occurred, extract a failure message and store in ctx.summary.
+ Unmount all test files and temporary journaling files.
+ Save the monitor information and archive all ceph logs.
+ Cleanup the keyring setup, and remove all monitor map and data files left over.
+
+ :param ctx: Context
+ :param config: Configuration
+ """
+ if ctx.config.get('use_existing_cluster', False) is True:
+ log.info("'use_existing_cluster' is true; skipping cluster creation")
+ yield
+
+ testdir = teuthology.get_testdir(ctx)
+ cluster_name = config['cluster']
+ data_dir = '{tdir}/{cluster}.data'.format(tdir=testdir, cluster=cluster_name)
+ log.info('Creating ceph cluster %s...', cluster_name)
+ log.info('config %s', config)
+ log.info('ctx.config %s', ctx.config)
+ run.wait(
+ ctx.cluster.run(
+ args=[
+ 'install', '-d', '-m0755', '--',
+ data_dir,
+ ],
+ wait=False,
+ )
+ )
+
+ run.wait(
+ ctx.cluster.run(
+ args=[
+ 'sudo',
+ 'install', '-d', '-m0777', '--', '/var/run/ceph',
+ ],
+ wait=False,
+ )
+ )
+
+ devs_to_clean = {}
+ remote_to_roles_to_devs = {}
+ osds = ctx.cluster.only(teuthology.is_type('osd', cluster_name))
+ for remote, roles_for_host in osds.remotes.items():
+ devs = teuthology.get_scratch_devices(remote)
+ roles_to_devs = assign_devs(
+ teuthology.cluster_roles_of_type(roles_for_host, 'osd', cluster_name), devs
+ )
+ devs_to_clean[remote] = []
+ log.info('osd dev map: {}'.format(roles_to_devs))
+ assert roles_to_devs, \
+ "remote {} has osd roles, but no osd devices were specified!".format(remote.hostname)
+ remote_to_roles_to_devs[remote] = roles_to_devs
+ log.info("remote_to_roles_to_devs: {}".format(remote_to_roles_to_devs))
+ for osd_role, dev_name in remote_to_roles_to_devs.items():
+ assert dev_name, "{} has no associated device!".format(osd_role)
+
+ log.info('Generating config...')
+ remotes_and_roles = ctx.cluster.remotes.items()
+ roles = [role_list for (remote, role_list) in remotes_and_roles]
+ ips = [host for (host, port) in
+ (remote.ssh.get_transport().getpeername() for (remote, role_list) in remotes_and_roles)]
+ mons = get_mons(
+ roles, ips, cluster_name,
+ mon_bind_msgr2=config.get('mon_bind_msgr2'),
+ mon_bind_addrvec=config.get('mon_bind_addrvec'),
+ )
+ conf = skeleton_config(
+ ctx, roles=roles, ips=ips, mons=mons, cluster=cluster_name,
+ )
+ for section, keys in config['conf'].items():
+ for key, value in keys.items():
+ log.info("[%s] %s = %s" % (section, key, value))
+ if section not in conf:
+ conf[section] = {}
+ conf[section][key] = value
+
+ if not hasattr(ctx, 'ceph'):
+ ctx.ceph = {}
+ ctx.ceph[cluster_name] = argparse.Namespace()
+ ctx.ceph[cluster_name].conf = conf
+ ctx.ceph[cluster_name].mons = mons
+
+ default_keyring = '/etc/ceph/{cluster}.keyring'.format(cluster=cluster_name)
+ keyring_path = config.get('keyring_path', default_keyring)
+
+ coverage_dir = '{tdir}/archive/coverage'.format(tdir=testdir)
+
+ firstmon = teuthology.get_first_mon(ctx, config, cluster_name)
+
+ log.info('Setting up %s...' % firstmon)
+ ctx.cluster.only(firstmon).run(
+ args=[
+ 'sudo',
+ 'adjust-ulimits',
+ 'ceph-coverage',
+ coverage_dir,
+ 'ceph-authtool',
+ '--create-keyring',
+ keyring_path,
+ ],
+ )
+ ctx.cluster.only(firstmon).run(
+ args=[
+ 'sudo',
+ 'adjust-ulimits',
+ 'ceph-coverage',
+ coverage_dir,
+ 'ceph-authtool',
+ '--gen-key',
+ '--name=mon.',
+ keyring_path,
+ ],
+ )
+ ctx.cluster.only(firstmon).run(
+ args=[
+ 'sudo',
+ 'chmod',
+ '0644',
+ keyring_path,
+ ],
+ )
+ (mon0_remote,) = ctx.cluster.only(firstmon).remotes.keys()
+ monmap_path = '{tdir}/{cluster}.monmap'.format(tdir=testdir,
+ cluster=cluster_name)
+ fsid = create_simple_monmap(
+ ctx,
+ remote=mon0_remote,
+ conf=conf,
+ mons=mons,
+ path=monmap_path,
+ mon_bind_addrvec=config.get('mon_bind_addrvec'),
+ )
+ ctx.ceph[cluster_name].fsid = fsid
+ if not 'global' in conf:
+ conf['global'] = {}
+ conf['global']['fsid'] = fsid
+
+ default_conf_path = '/etc/ceph/{cluster}.conf'.format(cluster=cluster_name)
+ conf_path = config.get('conf_path', default_conf_path)
+ log.info('Writing %s for FSID %s...' % (conf_path, fsid))
+ write_conf(ctx, conf_path, cluster_name)
+
+ log.info('Creating admin key on %s...' % firstmon)
+ ctx.cluster.only(firstmon).run(
+ args=[
+ 'sudo',
+ 'adjust-ulimits',
+ 'ceph-coverage',
+ coverage_dir,
+ 'ceph-authtool',
+ '--gen-key',
+ '--name=client.admin',
+ '--cap', 'mon', 'allow *',
+ '--cap', 'osd', 'allow *',
+ '--cap', 'mds', 'allow *',
+ '--cap', 'mgr', 'allow *',
+ keyring_path,
+ ],
+ )
+
+ log.info('Copying monmap to all nodes...')
+ keyring = mon0_remote.read_file(keyring_path)
+ monmap = mon0_remote.read_file(monmap_path)
+
+ for rem in ctx.cluster.remotes.keys():
+ # copy mon key and initial monmap
+ log.info('Sending monmap to node {remote}'.format(remote=rem))
+ rem.write_file(keyring_path, keyring, mode='0644', sudo=True)
+ rem.write_file(monmap_path, monmap)
+
+ log.info('Setting up mon nodes...')
+ mons = ctx.cluster.only(teuthology.is_type('mon', cluster_name))
+
+ if not config.get('skip_mgr_daemons', False):
+ log.info('Setting up mgr nodes...')
+ mgrs = ctx.cluster.only(teuthology.is_type('mgr', cluster_name))
+ for remote, roles_for_host in mgrs.remotes.items():
+ for role in teuthology.cluster_roles_of_type(roles_for_host, 'mgr',
+ cluster_name):
+ _, _, id_ = teuthology.split_role(role)
+ mgr_dir = DATA_PATH.format(
+ type_='mgr', cluster=cluster_name, id_=id_)
+ remote.run(
+ args=[
+ 'sudo',
+ 'mkdir',
+ '-p',
+ mgr_dir,
+ run.Raw('&&'),
+ 'sudo',
+ 'adjust-ulimits',
+ 'ceph-coverage',
+ coverage_dir,
+ 'ceph-authtool',
+ '--create-keyring',
+ '--gen-key',
+ '--name=mgr.{id}'.format(id=id_),
+ mgr_dir + '/keyring',
+ ],
+ )
+
+ log.info('Setting up mds nodes...')
+ mdss = ctx.cluster.only(teuthology.is_type('mds', cluster_name))
+ for remote, roles_for_host in mdss.remotes.items():
+ for role in teuthology.cluster_roles_of_type(roles_for_host, 'mds',
+ cluster_name):
+ _, _, id_ = teuthology.split_role(role)
+ mds_dir = DATA_PATH.format(
+ type_='mds', cluster=cluster_name, id_=id_)
+ remote.run(
+ args=[
+ 'sudo',
+ 'mkdir',
+ '-p',
+ mds_dir,
+ run.Raw('&&'),
+ 'sudo',
+ 'adjust-ulimits',
+ 'ceph-coverage',
+ coverage_dir,
+ 'ceph-authtool',
+ '--create-keyring',
+ '--gen-key',
+ '--name=mds.{id}'.format(id=id_),
+ mds_dir + '/keyring',
+ ],
+ )
+ remote.run(args=[
+ 'sudo', 'chown', '-R', 'ceph:ceph', mds_dir
+ ])
+
+ cclient.create_keyring(ctx, cluster_name)
+ log.info('Running mkfs on osd nodes...')
+
+ if not hasattr(ctx, 'disk_config'):
+ ctx.disk_config = argparse.Namespace()
+ if not hasattr(ctx.disk_config, 'remote_to_roles_to_dev'):
+ ctx.disk_config.remote_to_roles_to_dev = {}
+ if not hasattr(ctx.disk_config, 'remote_to_roles_to_dev_mount_options'):
+ ctx.disk_config.remote_to_roles_to_dev_mount_options = {}
+ if not hasattr(ctx.disk_config, 'remote_to_roles_to_dev_fstype'):
+ ctx.disk_config.remote_to_roles_to_dev_fstype = {}
+
+ teuthology.deep_merge(ctx.disk_config.remote_to_roles_to_dev, remote_to_roles_to_devs)
+
+ log.info("ctx.disk_config.remote_to_roles_to_dev: {r}".format(r=str(ctx.disk_config.remote_to_roles_to_dev)))
+
+ for remote, roles_for_host in osds.remotes.items():
+ roles_to_devs = remote_to_roles_to_devs[remote]
+
+ for role in teuthology.cluster_roles_of_type(roles_for_host, 'osd', cluster_name):
+ _, _, id_ = teuthology.split_role(role)
+ mnt_point = DATA_PATH.format(
+ type_='osd', cluster=cluster_name, id_=id_)
+ remote.run(
+ args=[
+ 'sudo',
+ 'mkdir',
+ '-p',
+ mnt_point,
+ ])
+ log.info('roles_to_devs: {}'.format(roles_to_devs))
+ log.info('role: {}'.format(role))
+ if roles_to_devs.get(role):
+ dev = roles_to_devs[role]
+ fs = config.get('fs')
+ package = None
+ mkfs_options = config.get('mkfs_options')
+ mount_options = config.get('mount_options')
+ if fs == 'btrfs':
+ # package = 'btrfs-tools'
+ if mount_options is None:
+ mount_options = ['noatime', 'user_subvol_rm_allowed']
+ if mkfs_options is None:
+ mkfs_options = ['-m', 'single',
+ '-l', '32768',
+ '-n', '32768']
+ if fs == 'xfs':
+ # package = 'xfsprogs'
+ if mount_options is None:
+ mount_options = ['noatime']
+ if mkfs_options is None:
+ mkfs_options = ['-f', '-i', 'size=2048']
+ if fs == 'ext4' or fs == 'ext3':
+ if mount_options is None:
+ mount_options = ['noatime', 'user_xattr']
+
+ if mount_options is None:
+ mount_options = []
+ if mkfs_options is None:
+ mkfs_options = []
+ mkfs = ['mkfs.%s' % fs] + mkfs_options
+ log.info('%s on %s on %s' % (mkfs, dev, remote))
+ if package is not None:
+ remote.sh('sudo apt-get install -y %s' % package)
+
+ try:
+ remote.run(args=['yes', run.Raw('|')] + ['sudo'] + mkfs + [dev])
+ except run.CommandFailedError:
+ # Newer btfs-tools doesn't prompt for overwrite, use -f
+ if '-f' not in mount_options:
+ mkfs_options.append('-f')
+ mkfs = ['mkfs.%s' % fs] + mkfs_options
+ log.info('%s on %s on %s' % (mkfs, dev, remote))
+ remote.run(args=['yes', run.Raw('|')] + ['sudo'] + mkfs + [dev])
+
+ log.info('mount %s on %s -o %s' % (dev, remote,
+ ','.join(mount_options)))
+ remote.run(
+ args=[
+ 'sudo',
+ 'mount',
+ '-t', fs,
+ '-o', ','.join(mount_options),
+ dev,
+ mnt_point,
+ ]
+ )
+ remote.run(
+ args=[
+ 'sudo', '/sbin/restorecon', mnt_point,
+ ],
+ check_status=False,
+ )
+ if not remote in ctx.disk_config.remote_to_roles_to_dev_mount_options:
+ ctx.disk_config.remote_to_roles_to_dev_mount_options[remote] = {}
+ ctx.disk_config.remote_to_roles_to_dev_mount_options[remote][role] = mount_options
+ if not remote in ctx.disk_config.remote_to_roles_to_dev_fstype:
+ ctx.disk_config.remote_to_roles_to_dev_fstype[remote] = {}
+ ctx.disk_config.remote_to_roles_to_dev_fstype[remote][role] = fs
+ devs_to_clean[remote].append(mnt_point)
+
+ for role in teuthology.cluster_roles_of_type(roles_for_host, 'osd', cluster_name):
+ _, _, id_ = teuthology.split_role(role)
+ try:
+ args = ['sudo',
+ 'MALLOC_CHECK_=3',
+ 'adjust-ulimits',
+ 'ceph-coverage', coverage_dir,
+ 'ceph-osd',
+ '--no-mon-config',
+ '--cluster', cluster_name,
+ '--mkfs',
+ '--mkkey',
+ '-i', id_,
+ '--monmap', monmap_path]
+ log_path = f'/var/log/ceph/{cluster_name}-osd.{id_}.log'
+ create_log_cmd, args = \
+ maybe_redirect_stderr(config, 'osd', args, log_path)
+ if create_log_cmd:
+ remote.sh(create_log_cmd)
+ remote.run(args=args)
+ except run.CommandFailedError:
+ # try without --no-mon-config.. this may be an upgrade test
+ remote.run(
+ args=[
+ 'sudo',
+ 'MALLOC_CHECK_=3',
+ 'adjust-ulimits',
+ 'ceph-coverage',
+ coverage_dir,
+ 'ceph-osd',
+ '--cluster',
+ cluster_name,
+ '--mkfs',
+ '--mkkey',
+ '-i', id_,
+ '--monmap', monmap_path,
+ ],
+ )
+ mnt_point = DATA_PATH.format(
+ type_='osd', cluster=cluster_name, id_=id_)
+ remote.run(args=[
+ 'sudo', 'chown', '-R', 'ceph:ceph', mnt_point
+ ])
+
+ log.info('Reading keys from all nodes...')
+ keys_fp = BytesIO()
+ keys = []
+ for remote, roles_for_host in ctx.cluster.remotes.items():
+ for type_ in ['mgr', 'mds', 'osd']:
+ if type_ == 'mgr' and config.get('skip_mgr_daemons', False):
+ continue
+ for role in teuthology.cluster_roles_of_type(roles_for_host, type_, cluster_name):
+ _, _, id_ = teuthology.split_role(role)
+ data = remote.read_file(
+ os.path.join(
+ DATA_PATH.format(
+ type_=type_, id_=id_, cluster=cluster_name),
+ 'keyring',
+ ),
+ sudo=True,
+ )
+ keys.append((type_, id_, data))
+ keys_fp.write(data)
+ for remote, roles_for_host in ctx.cluster.remotes.items():
+ for role in teuthology.cluster_roles_of_type(roles_for_host, 'client', cluster_name):
+ _, _, id_ = teuthology.split_role(role)
+ data = remote.read_file(
+ '/etc/ceph/{cluster}.client.{id}.keyring'.format(id=id_, cluster=cluster_name)
+ )
+ keys.append(('client', id_, data))
+ keys_fp.write(data)
+
+ log.info('Adding keys to all mons...')
+ writes = mons.run(
+ args=[
+ 'sudo', 'tee', '-a',
+ keyring_path,
+ ],
+ stdin=run.PIPE,
+ wait=False,
+ stdout=BytesIO(),
+ )
+ keys_fp.seek(0)
+ teuthology.feed_many_stdins_and_close(keys_fp, writes)
+ run.wait(writes)
+ for type_, id_, data in keys:
+ run.wait(
+ mons.run(
+ args=[
+ 'sudo',
+ 'adjust-ulimits',
+ 'ceph-coverage',
+ coverage_dir,
+ 'ceph-authtool',
+ keyring_path,
+ '--name={type}.{id}'.format(
+ type=type_,
+ id=id_,
+ ),
+ ] + list(generate_caps(type_)),
+ wait=False,
+ ),
+ )
+
+ log.info('Running mkfs on mon nodes...')
+ for remote, roles_for_host in mons.remotes.items():
+ for role in teuthology.cluster_roles_of_type(roles_for_host, 'mon', cluster_name):
+ _, _, id_ = teuthology.split_role(role)
+ mnt_point = DATA_PATH.format(
+ type_='mon', id_=id_, cluster=cluster_name)
+ remote.run(
+ args=[
+ 'sudo',
+ 'mkdir',
+ '-p',
+ mnt_point,
+ ],
+ )
+ remote.run(
+ args=[
+ 'sudo',
+ 'adjust-ulimits',
+ 'ceph-coverage',
+ coverage_dir,
+ 'ceph-mon',
+ '--cluster', cluster_name,
+ '--mkfs',
+ '-i', id_,
+ '--monmap', monmap_path,
+ '--keyring', keyring_path,
+ ],
+ )
+ remote.run(args=[
+ 'sudo', 'chown', '-R', 'ceph:ceph', mnt_point
+ ])
+
+ run.wait(
+ mons.run(
+ args=[
+ 'rm',
+ '--',
+ monmap_path,
+ ],
+ wait=False,
+ ),
+ )
+
+ try:
+ yield
+ except Exception:
+ # we need to know this below
+ ctx.summary['success'] = False
+ raise
+ finally:
+ (mon0_remote,) = ctx.cluster.only(firstmon).remotes.keys()
+
+ log.info('Checking cluster log for badness...')
+
+ def first_in_ceph_log(pattern, excludes):
+ """
+ Find the first occurrence of the pattern specified in the Ceph log,
+ Returns None if none found.
+
+ :param pattern: Pattern scanned for.
+ :param excludes: Patterns to ignore.
+ :return: First line of text (or None if not found)
+ """
+ args = [
+ 'sudo',
+ 'egrep', pattern,
+ '/var/log/ceph/{cluster}.log'.format(cluster=cluster_name),
+ ]
+ for exclude in excludes:
+ args.extend([run.Raw('|'), 'egrep', '-v', exclude])
+ args.extend([
+ run.Raw('|'), 'head', '-n', '1',
+ ])
+ stdout = mon0_remote.sh(args)
+ return stdout or None
+
+ if first_in_ceph_log('\[ERR\]|\[WRN\]|\[SEC\]',
+ config['log_ignorelist']) is not None:
+ log.warning('Found errors (ERR|WRN|SEC) in cluster log')
+ ctx.summary['success'] = False
+ # use the most severe problem as the failure reason
+ if 'failure_reason' not in ctx.summary:
+ for pattern in ['\[SEC\]', '\[ERR\]', '\[WRN\]']:
+ match = first_in_ceph_log(pattern, config['log_ignorelist'])
+ if match is not None:
+ ctx.summary['failure_reason'] = \
+ '"{match}" in cluster log'.format(
+ match=match.rstrip('\n'),
+ )
+ break
+
+ for remote, dirs in devs_to_clean.items():
+ for dir_ in dirs:
+ log.info('Unmounting %s on %s' % (dir_, remote))
+ try:
+ remote.run(
+ args=[
+ 'sync',
+ run.Raw('&&'),
+ 'sudo',
+ 'umount',
+ '-f',
+ dir_
+ ]
+ )
+ except Exception as e:
+ remote.run(args=[
+ 'sudo',
+ run.Raw('PATH=/usr/sbin:$PATH'),
+ 'lsof',
+ run.Raw(';'),
+ 'ps', 'auxf',
+ ])
+ raise e
+
+ if ctx.archive is not None and \
+ not (ctx.config.get('archive-on-error') and ctx.summary['success']):
+
+ # archive mon data, too
+ log.info('Archiving mon data...')
+ path = os.path.join(ctx.archive, 'data')
+ try:
+ os.makedirs(path)
+ except OSError as e:
+ if e.errno == errno.EEXIST:
+ pass
+ else:
+ raise
+ for remote, roles in mons.remotes.items():
+ for role in roles:
+ is_mon = teuthology.is_type('mon', cluster_name)
+ if is_mon(role):
+ _, _, id_ = teuthology.split_role(role)
+ mon_dir = DATA_PATH.format(
+ type_='mon', id_=id_, cluster=cluster_name)
+ teuthology.pull_directory_tarball(
+ remote,
+ mon_dir,
+ path + '/' + role + '.tgz')
+
+ log.info('Cleaning ceph cluster...')
+ run.wait(
+ ctx.cluster.run(
+ args=[
+ 'sudo',
+ 'rm',
+ '-rf',
+ '--',
+ conf_path,
+ keyring_path,
+ data_dir,
+ monmap_path,
+ run.Raw('{tdir}/../*.pid'.format(tdir=testdir)),
+ ],
+ wait=False,
+ ),
+ )
+
+
+def osd_scrub_pgs(ctx, config):
+ """
+ Scrub pgs when we exit.
+
+ First make sure all pgs are active and clean.
+ Next scrub all osds.
+ Then periodically check until all pgs have scrub time stamps that
+ indicate the last scrub completed. Time out if no progress is made
+ here after two minutes.
+ """
+ retries = 40
+ delays = 20
+ cluster_name = config['cluster']
+ manager = ctx.managers[cluster_name]
+ for _ in range(retries):
+ stats = manager.get_pg_stats()
+ unclean = [stat['pgid'] for stat in stats if 'active+clean' not in stat['state']]
+ split_merge = []
+ osd_dump = manager.get_osd_dump_json()
+ try:
+ split_merge = [i['pool_name'] for i in osd_dump['pools'] if i['pg_num'] != i['pg_num_target']]
+ except KeyError:
+ # we don't support pg_num_target before nautilus
+ pass
+ if not unclean and not split_merge:
+ break
+ waiting_on = []
+ if unclean:
+ waiting_on.append(f'{unclean} to go clean')
+ if split_merge:
+ waiting_on.append(f'{split_merge} to split/merge')
+ waiting_on = ' and '.join(waiting_on)
+ log.info('Waiting for all PGs to be active+clean and split+merged, waiting on %s', waiting_on)
+ time.sleep(delays)
+ else:
+ raise RuntimeError("Scrubbing terminated -- not all pgs were active and clean.")
+ check_time_now = time.localtime()
+ time.sleep(1)
+ all_roles = teuthology.all_roles(ctx.cluster)
+ for role in teuthology.cluster_roles_of_type(all_roles, 'osd', cluster_name):
+ log.info("Scrubbing {osd}".format(osd=role))
+ _, _, id_ = teuthology.split_role(role)
+ # allow this to fail; in certain cases the OSD might not be up
+ # at this point. we will catch all pgs below.
+ try:
+ manager.raw_cluster_cmd('tell', 'osd.' + id_, 'config', 'set',
+ 'osd_debug_deep_scrub_sleep', '0');
+ manager.raw_cluster_cmd('osd', 'deep-scrub', id_)
+ except run.CommandFailedError:
+ pass
+ prev_good = 0
+ gap_cnt = 0
+ loop = True
+ while loop:
+ stats = manager.get_pg_stats()
+ timez = [(stat['pgid'],stat['last_scrub_stamp']) for stat in stats]
+ loop = False
+ thiscnt = 0
+ re_scrub = []
+ for (pgid, tmval) in timez:
+ t = tmval[0:tmval.find('.')].replace(' ', 'T')
+ pgtm = time.strptime(t, '%Y-%m-%dT%H:%M:%S')
+ if pgtm > check_time_now:
+ thiscnt += 1
+ else:
+ log.info('pgid %s last_scrub_stamp %s %s <= %s', pgid, tmval, pgtm, check_time_now)
+ loop = True
+ re_scrub.append(pgid)
+ if thiscnt > prev_good:
+ prev_good = thiscnt
+ gap_cnt = 0
+ else:
+ gap_cnt += 1
+ if gap_cnt % 6 == 0:
+ for pgid in re_scrub:
+ # re-request scrub every so often in case the earlier
+ # request was missed. do not do it every time because
+ # the scrub may be in progress or not reported yet and
+ # we will starve progress.
+ manager.raw_cluster_cmd('pg', 'deep-scrub', pgid)
+ if gap_cnt > retries:
+ raise RuntimeError('Exiting scrub checking -- not all pgs scrubbed.')
+ if loop:
+ log.info('Still waiting for all pgs to be scrubbed.')
+ time.sleep(delays)
+
+
+@contextlib.contextmanager
+def run_daemon(ctx, config, type_):
+ """
+ Run daemons for a role type. Handle the startup and termination of a a daemon.
+ On startup -- set coverages, cpu_profile, valgrind values for all remotes,
+ and a max_mds value for one mds.
+ On cleanup -- Stop all existing daemons of this type.
+
+ :param ctx: Context
+ :param config: Configuration
+ :param type_: Role type
+ """
+ cluster_name = config['cluster']
+ log.info('Starting %s daemons in cluster %s...', type_, cluster_name)
+ testdir = teuthology.get_testdir(ctx)
+ daemons = ctx.cluster.only(teuthology.is_type(type_, cluster_name))
+
+ # check whether any daemons if this type are configured
+ if daemons is None:
+ return
+ coverage_dir = '{tdir}/archive/coverage'.format(tdir=testdir)
+
+ daemon_signal = 'kill'
+ if config.get('coverage') or config.get('valgrind') is not None:
+ daemon_signal = 'term'
+
+ # create osds in order. (this only matters for pre-luminous, which might
+ # be jewel/hammer, which doesn't take an id_ argument to legacy 'osd create').
+ osd_uuids = {}
+ for remote, roles_for_host in daemons.remotes.items():
+ is_type_ = teuthology.is_type(type_, cluster_name)
+ for role in roles_for_host:
+ if not is_type_(role):
+ continue
+ _, _, id_ = teuthology.split_role(role)
+
+
+ if type_ == 'osd':
+ datadir='/var/lib/ceph/osd/{cluster}-{id}'.format(
+ cluster=cluster_name, id=id_)
+ osd_uuid = remote.read_file(
+ datadir + '/fsid', sudo=True).decode().strip()
+ osd_uuids[id_] = osd_uuid
+ for osd_id in range(len(osd_uuids)):
+ id_ = str(osd_id)
+ osd_uuid = osd_uuids.get(id_)
+ try:
+ remote.run(
+ args=[
+ 'sudo', 'ceph', '--cluster', cluster_name,
+ 'osd', 'new', osd_uuid, id_,
+ ]
+ )
+ except:
+ # fallback to pre-luminous (jewel)
+ remote.run(
+ args=[
+ 'sudo', 'ceph', '--cluster', cluster_name,
+ 'osd', 'create', osd_uuid,
+ ]
+ )
+ if config.get('add_osds_to_crush'):
+ remote.run(
+ args=[
+ 'sudo', 'ceph', '--cluster', cluster_name,
+ 'osd', 'crush', 'create-or-move', 'osd.' + id_,
+ '1.0', 'host=localhost', 'root=default',
+ ]
+ )
+
+ for remote, roles_for_host in daemons.remotes.items():
+ is_type_ = teuthology.is_type(type_, cluster_name)
+ for role in roles_for_host:
+ if not is_type_(role):
+ continue
+ _, _, id_ = teuthology.split_role(role)
+
+ run_cmd = [
+ 'sudo',
+ 'adjust-ulimits',
+ 'ceph-coverage',
+ coverage_dir,
+ 'daemon-helper',
+ daemon_signal,
+ ]
+ run_cmd_tail = [
+ 'ceph-%s' % (type_),
+ '-f',
+ '--cluster', cluster_name,
+ '-i', id_]
+
+ if type_ in config.get('cpu_profile', []):
+ profile_path = '/var/log/ceph/profiling-logger/%s.prof' % (role)
+ run_cmd.extend(['env', 'CPUPROFILE=%s' % profile_path])
+
+ vc = config.get('valgrind')
+ if vc is not None:
+ valgrind_args = None
+ if type_ in vc:
+ valgrind_args = vc[type_]
+ if role in vc:
+ valgrind_args = vc[role]
+ exit_on_first_error = vc.get('exit_on_first_error', True)
+ run_cmd = get_valgrind_args(testdir, role, run_cmd, valgrind_args,
+ exit_on_first_error=exit_on_first_error)
+
+ run_cmd.extend(run_cmd_tail)
+ log_path = f'/var/log/ceph/{cluster_name}-{type_}.{id_}.log'
+ create_log_cmd, run_cmd = \
+ maybe_redirect_stderr(config, type_, run_cmd, log_path)
+ if create_log_cmd:
+ remote.sh(create_log_cmd)
+ # always register mgr; don't necessarily start
+ ctx.daemons.register_daemon(
+ remote, type_, id_,
+ cluster=cluster_name,
+ args=run_cmd,
+ logger=log.getChild(role),
+ stdin=run.PIPE,
+ wait=False
+ )
+ if type_ != 'mgr' or not config.get('skip_mgr_daemons', False):
+ role = cluster_name + '.' + type_
+ ctx.daemons.get_daemon(type_, id_, cluster_name).restart()
+
+ # kludge: run any pre-manager commands
+ if type_ == 'mon':
+ for cmd in config.get('pre-mgr-commands', []):
+ firstmon = teuthology.get_first_mon(ctx, config, cluster_name)
+ (remote,) = ctx.cluster.only(firstmon).remotes.keys()
+ remote.run(args=cmd.split(' '))
+
+ try:
+ yield
+ finally:
+ teuthology.stop_daemons_of_type(ctx, type_, cluster_name)
+
+
+def healthy(ctx, config):
+ """
+ Wait for all osd's to be up, and for the ceph health monitor to return HEALTH_OK.
+
+ :param ctx: Context
+ :param config: Configuration
+ """
+ config = config if isinstance(config, dict) else dict()
+ cluster_name = config.get('cluster', 'ceph')
+ log.info('Waiting until %s daemons up and pgs clean...', cluster_name)
+ manager = ctx.managers[cluster_name]
+ try:
+ manager.wait_for_mgr_available(timeout=30)
+ except (run.CommandFailedError, AssertionError) as e:
+ log.info('ignoring mgr wait error, probably testing upgrade: %s', e)
+
+ manager.wait_for_all_osds_up(timeout=300)
+
+ try:
+ manager.flush_all_pg_stats()
+ except (run.CommandFailedError, Exception) as e:
+ log.info('ignoring flush pg stats error, probably testing upgrade: %s', e)
+ manager.wait_for_clean()
+
+ if config.get('wait-for-healthy', True):
+ log.info('Waiting until ceph cluster %s is healthy...', cluster_name)
+ manager.wait_until_healthy(timeout=300)
+
+ if ctx.cluster.only(teuthology.is_type('mds', cluster_name)).remotes:
+ # Some MDSs exist, wait for them to be healthy
+ for fs in Filesystem.get_all_fs(ctx):
+ fs.wait_for_daemons(timeout=300)
+
+def wait_for_mon_quorum(ctx, config):
+ """
+ Check renote ceph status until all monitors are up.
+
+ :param ctx: Context
+ :param config: Configuration
+ """
+ if isinstance(config, dict):
+ mons = config['daemons']
+ cluster_name = config.get('cluster', 'ceph')
+ else:
+ assert isinstance(config, list)
+ mons = config
+ cluster_name = 'ceph'
+ firstmon = teuthology.get_first_mon(ctx, config, cluster_name)
+ (remote,) = ctx.cluster.only(firstmon).remotes.keys()
+ with contextutil.safe_while(sleep=10, tries=60,
+ action='wait for monitor quorum') as proceed:
+ while proceed():
+ quorum_status = remote.sh('sudo ceph quorum_status',
+ logger=log.getChild('quorum_status'))
+ j = json.loads(quorum_status)
+ q = j.get('quorum_names', [])
+ log.debug('Quorum: %s', q)
+ if sorted(q) == sorted(mons):
+ break
+
+
+def created_pool(ctx, config):
+ """
+ Add new pools to the dictionary of pools that the ceph-manager
+ knows about.
+ """
+ for new_pool in config:
+ if new_pool not in ctx.managers['ceph'].pools:
+ ctx.managers['ceph'].pools[new_pool] = ctx.managers['ceph'].get_pool_int_property(
+ new_pool, 'pg_num')
+
+
+@contextlib.contextmanager
+def suppress_mon_health_to_clog(ctx, config):
+ """
+ set the option, and then restore it with its original value
+
+ Note, due to the way how tasks are executed/nested, it's not suggested to
+ use this method as a standalone task. otherwise, it's likely that it will
+ restore the tweaked option at the /end/ of 'tasks' block.
+ """
+ if config.get('mon-health-to-clog', 'true') == 'false':
+ cluster = config.get('cluster', 'ceph')
+ manager = ctx.managers[cluster]
+ manager.raw_cluster_command(
+ 'config', 'set', 'mon', 'mon_health_to_clog', 'false'
+ )
+ yield
+ manager.raw_cluster_command(
+ 'config', 'rm', 'mon', 'mon_health_to_clog'
+ )
+ else:
+ yield
+
+@contextlib.contextmanager
+def restart(ctx, config):
+ """
+ restart ceph daemons
+
+ For example::
+ tasks:
+ - ceph.restart: [all]
+
+ For example::
+ tasks:
+ - ceph.restart: [osd.0, mon.1, mds.*]
+
+ or::
+
+ tasks:
+ - ceph.restart:
+ daemons: [osd.0, mon.1]
+ wait-for-healthy: false
+ wait-for-osds-up: true
+
+ :param ctx: Context
+ :param config: Configuration
+ """
+ if config is None:
+ config = {}
+ elif isinstance(config, list):
+ config = {'daemons': config}
+
+ daemons = ctx.daemons.resolve_role_list(config.get('daemons', None), CEPH_ROLE_TYPES, True)
+ clusters = set()
+
+ with suppress_mon_health_to_clog(ctx, config):
+ for role in daemons:
+ cluster, type_, id_ = teuthology.split_role(role)
+ ctx.daemons.get_daemon(type_, id_, cluster).stop()
+ if type_ == 'osd':
+ ctx.managers[cluster].mark_down_osd(id_)
+ ctx.daemons.get_daemon(type_, id_, cluster).restart()
+ clusters.add(cluster)
+
+ if config.get('wait-for-healthy', True):
+ for cluster in clusters:
+ healthy(ctx=ctx, config=dict(cluster=cluster))
+ if config.get('wait-for-osds-up', False):
+ for cluster in clusters:
+ ctx.managers[cluster].wait_for_all_osds_up()
+ if config.get('expected-failure') is not None:
+ log.info('Checking for expected-failure in osds logs after restart...')
+ expected_fail = config.get('expected-failure')
+ is_osd = teuthology.is_type('osd')
+ for role in daemons:
+ if not is_osd(role):
+ continue
+ (remote,) = ctx.cluster.only(role).remotes.keys()
+ cluster, type_, id_ = teuthology.split_role(role)
+ remote.run(
+ args = ['sudo',
+ 'egrep', expected_fail,
+ '/var/log/ceph/{cluster}-{type_}.{id_}.log'.format(cluster=cluster, type_=type_, id_=id_),
+ ])
+ yield
+
+
+@contextlib.contextmanager
+def stop(ctx, config):
+ """
+ Stop ceph daemons
+
+ For example::
+ tasks:
+ - ceph.stop: [mds.*]
+
+ tasks:
+ - ceph.stop: [osd.0, osd.2]
+
+ tasks:
+ - ceph.stop:
+ daemons: [osd.0, osd.2]
+
+ """
+ if config is None:
+ config = {}
+ elif isinstance(config, list):
+ config = {'daemons': config}
+
+ daemons = ctx.daemons.resolve_role_list(config.get('daemons', None), CEPH_ROLE_TYPES, True)
+ clusters = set()
+
+ for role in daemons:
+ cluster, type_, id_ = teuthology.split_role(role)
+ ctx.daemons.get_daemon(type_, id_, cluster).stop()
+ clusters.add(cluster)
+
+
+ for cluster in clusters:
+ ctx.ceph[cluster].watchdog.stop()
+ ctx.ceph[cluster].watchdog.join()
+
+ yield
+
+
+@contextlib.contextmanager
+def wait_for_failure(ctx, config):
+ """
+ Wait for a failure of a ceph daemon
+
+ For example::
+ tasks:
+ - ceph.wait_for_failure: [mds.*]
+
+ tasks:
+ - ceph.wait_for_failure: [osd.0, osd.2]
+
+ tasks:
+ - ceph.wait_for_failure:
+ daemons: [osd.0, osd.2]
+
+ """
+ if config is None:
+ config = {}
+ elif isinstance(config, list):
+ config = {'daemons': config}
+
+ daemons = ctx.daemons.resolve_role_list(config.get('daemons', None), CEPH_ROLE_TYPES, True)
+ for role in daemons:
+ cluster, type_, id_ = teuthology.split_role(role)
+ try:
+ ctx.daemons.get_daemon(type_, id_, cluster).wait()
+ except:
+ log.info('Saw expected daemon failure. Continuing.')
+ pass
+ else:
+ raise RuntimeError('daemon %s did not fail' % role)
+
+ yield
+
+
+def validate_config(ctx, config):
+ """
+ Perform some simple validation on task configuration.
+ Raises exceptions.ConfigError if an error is found.
+ """
+ # check for osds from multiple clusters on the same host
+ for remote, roles_for_host in ctx.cluster.remotes.items():
+ last_cluster = None
+ last_role = None
+ for role in roles_for_host:
+ role_cluster, role_type, _ = teuthology.split_role(role)
+ if role_type != 'osd':
+ continue
+ if last_cluster and last_cluster != role_cluster:
+ msg = "Host should not have osds (%s and %s) from multiple clusters" % (
+ last_role, role)
+ raise exceptions.ConfigError(msg)
+ last_cluster = role_cluster
+ last_role = role
+
+
+@contextlib.contextmanager
+def task(ctx, config):
+ """
+ Set up and tear down a Ceph cluster.
+
+ For example::
+
+ tasks:
+ - ceph:
+ - interactive:
+
+ You can also specify what branch to run::
+
+ tasks:
+ - ceph:
+ branch: foo
+
+ Or a tag::
+
+ tasks:
+ - ceph:
+ tag: v0.42.13
+
+ Or a sha1::
+
+ tasks:
+ - ceph:
+ sha1: 1376a5ab0c89780eab39ffbbe436f6a6092314ed
+
+ Or a local source dir::
+
+ tasks:
+ - ceph:
+ path: /home/sage/ceph
+
+ To capture code coverage data, use::
+
+ tasks:
+ - ceph:
+ coverage: true
+
+ To use btrfs, ext4, or xfs on the target's scratch disks, use::
+
+ tasks:
+ - ceph:
+ fs: xfs
+ mkfs_options: [-b,size=65536,-l,logdev=/dev/sdc1]
+ mount_options: [nobarrier, inode64]
+
+ To change the cephfs's default max_mds (1), use::
+
+ tasks:
+ - ceph:
+ cephfs:
+ max_mds: 2
+
+ To change the max_mds of a specific filesystem, use::
+
+ tasks:
+ - ceph:
+ cephfs:
+ max_mds: 2
+ fs:
+ - name: a
+ max_mds: 3
+ - name: b
+
+ In the above example, filesystem 'a' will have 'max_mds' 3,
+ and filesystme 'b' will have 'max_mds' 2.
+
+ To change the mdsmap's default session_timeout (60 seconds), use::
+
+ tasks:
+ - ceph:
+ cephfs:
+ session_timeout: 300
+
+ Note, this will cause the task to check the /scratch_devs file on each node
+ for available devices. If no such file is found, /dev/sdb will be used.
+
+ To run some daemons under valgrind, include their names
+ and the tool/args to use in a valgrind section::
+
+ tasks:
+ - ceph:
+ valgrind:
+ mds.1: --tool=memcheck
+ osd.1: [--tool=memcheck, --leak-check=no]
+
+ Those nodes which are using memcheck or valgrind will get
+ checked for bad results.
+
+ To adjust or modify config options, use::
+
+ tasks:
+ - ceph:
+ conf:
+ section:
+ key: value
+
+ For example::
+
+ tasks:
+ - ceph:
+ conf:
+ mds.0:
+ some option: value
+ other key: other value
+ client.0:
+ debug client: 10
+ debug ms: 1
+
+ By default, the cluster log is checked for errors and warnings,
+ and the run marked failed if any appear. You can ignore log
+ entries by giving a list of egrep compatible regexes, i.e.:
+
+ tasks:
+ - ceph:
+ log-ignorelist: ['foo.*bar', 'bad message']
+
+ To run multiple ceph clusters, use multiple ceph tasks, and roles
+ with a cluster name prefix, e.g. cluster1.client.0. Roles with no
+ cluster use the default cluster name, 'ceph'. OSDs from separate
+ clusters must be on separate hosts. Clients and non-osd daemons
+ from multiple clusters may be colocated. For each cluster, add an
+ instance of the ceph task with the cluster name specified, e.g.::
+
+ roles:
+ - [mon.a, osd.0, osd.1]
+ - [backup.mon.a, backup.osd.0, backup.osd.1]
+ - [client.0, backup.client.0]
+ tasks:
+ - ceph:
+ cluster: ceph
+ - ceph:
+ cluster: backup
+
+ :param ctx: Context
+ :param config: Configuration
+
+ """
+ if config is None:
+ config = {}
+ assert isinstance(config, dict), \
+ "task ceph only supports a dictionary for configuration"
+
+ overrides = ctx.config.get('overrides', {})
+ teuthology.deep_merge(config, overrides.get('ceph', {}))
+
+ first_ceph_cluster = False
+ if not hasattr(ctx, 'daemons'):
+ first_ceph_cluster = True
+ ctx.daemons = DaemonGroup()
+
+ testdir = teuthology.get_testdir(ctx)
+ if config.get('coverage'):
+ coverage_dir = '{tdir}/archive/coverage'.format(tdir=testdir)
+ log.info('Creating coverage directory...')
+ run.wait(
+ ctx.cluster.run(
+ args=[
+ 'install', '-d', '-m0755', '--',
+ coverage_dir,
+ ],
+ wait=False,
+ )
+ )
+
+ if 'cluster' not in config:
+ config['cluster'] = 'ceph'
+
+ validate_config(ctx, config)
+
+ subtasks = []
+ if first_ceph_cluster:
+ # these tasks handle general log setup and parsing on all hosts,
+ # so they should only be run once
+ subtasks = [
+ lambda: ceph_log(ctx=ctx, config=None),
+ lambda: ceph_crash(ctx=ctx, config=None),
+ lambda: valgrind_post(ctx=ctx, config=config),
+ ]
+
+ subtasks += [
+ lambda: cluster(ctx=ctx, config=dict(
+ conf=config.get('conf', {}),
+ fs=config.get('fs', 'xfs'),
+ mkfs_options=config.get('mkfs_options', None),
+ mount_options=config.get('mount_options', None),
+ skip_mgr_daemons=config.get('skip_mgr_daemons', False),
+ log_ignorelist=config.get('log-ignorelist', []),
+ cpu_profile=set(config.get('cpu_profile', []),),
+ cluster=config['cluster'],
+ mon_bind_msgr2=config.get('mon_bind_msgr2', True),
+ mon_bind_addrvec=config.get('mon_bind_addrvec', True),
+ )),
+ lambda: run_daemon(ctx=ctx, config=config, type_='mon'),
+ lambda: run_daemon(ctx=ctx, config=config, type_='mgr'),
+ lambda: crush_setup(ctx=ctx, config=config),
+ lambda: check_enable_crimson(ctx=ctx, config=config),
+ lambda: run_daemon(ctx=ctx, config=config, type_='osd'),
+ lambda: setup_manager(ctx=ctx, config=config),
+ lambda: create_rbd_pool(ctx=ctx, config=config),
+ lambda: run_daemon(ctx=ctx, config=config, type_='mds'),
+ lambda: cephfs_setup(ctx=ctx, config=config),
+ lambda: watchdog_setup(ctx=ctx, config=config),
+ ]
+
+ with contextutil.nested(*subtasks):
+ try:
+ if config.get('wait-for-healthy', True):
+ healthy(ctx=ctx, config=dict(cluster=config['cluster']))
+
+ yield
+ finally:
+ # set pg_num_targets back to actual pg_num, so we don't have to
+ # wait for pending merges (which can take a while!)
+ if not config.get('skip_stop_pg_num_changes', True):
+ ctx.managers[config['cluster']].stop_pg_num_changes()
+
+ if config.get('wait-for-scrub', True):
+ # wait for pgs to become active+clean in case any
+ # recoveries were triggered since the last health check
+ ctx.managers[config['cluster']].wait_for_clean()
+ osd_scrub_pgs(ctx, config)
+
+ # stop logging health to clog during shutdown, or else we generate
+ # a bunch of scary messages unrelated to our actual run.
+ firstmon = teuthology.get_first_mon(ctx, config, config['cluster'])
+ (mon0_remote,) = ctx.cluster.only(firstmon).remotes.keys()
+ mon0_remote.run(
+ args=[
+ 'sudo',
+ 'ceph',
+ '--cluster', config['cluster'],
+ 'config', 'set', 'global',
+ 'mon_health_to_clog', 'false',
+ ],
+ check_status=False,
+ )