diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
commit | 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch) | |
tree | e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/rocksdb/tools/db_crashtest.py | |
parent | Initial commit. (diff) | |
download | ceph-upstream.tar.xz ceph-upstream.zip |
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/rocksdb/tools/db_crashtest.py | 405 |
1 files changed, 405 insertions, 0 deletions
diff --git a/src/rocksdb/tools/db_crashtest.py b/src/rocksdb/tools/db_crashtest.py new file mode 100644 index 00000000..dbabb2b4 --- /dev/null +++ b/src/rocksdb/tools/db_crashtest.py @@ -0,0 +1,405 @@ +#! /usr/bin/env python +import os +import sys +import time +import random +import tempfile +import subprocess +import shutil +import argparse + +# params overwrite priority: +# for default: +# default_params < {blackbox,whitebox}_default_params < args +# for simple: +# default_params < {blackbox,whitebox}_default_params < +# simple_default_params < +# {blackbox,whitebox}_simple_default_params < args +# for enable_atomic_flush: +# default_params < {blackbox,whitebox}_default_params < +# atomic_flush_params < args + +expected_values_file = tempfile.NamedTemporaryFile() + +default_params = { + "acquire_snapshot_one_in": 10000, + "block_size": 16384, + "cache_size": 1048576, + "checkpoint_one_in": 1000000, + "compression_type": "snappy", + "compression_max_dict_bytes": lambda: 16384 * random.randint(0, 1), + "compression_zstd_max_train_bytes": lambda: 65536 * random.randint(0, 1), + "clear_column_family_one_in": 0, + "compact_files_one_in": 1000000, + "compact_range_one_in": 1000000, + "delpercent": 4, + "delrangepercent": 1, + "destroy_db_initially": 0, + "enable_pipelined_write": lambda: random.randint(0, 1), + "expected_values_path": expected_values_file.name, + "flush_one_in": 1000000, + "max_background_compactions": 20, + "max_bytes_for_level_base": 10485760, + "max_key": 100000000, + "max_write_buffer_number": 3, + "mmap_read": lambda: random.randint(0, 1), + "nooverwritepercent": 1, + "open_files": 500000, + "prefixpercent": 5, + "progress_reports": 0, + "readpercent": 45, + "recycle_log_file_num": lambda: random.randint(0, 1), + "reopen": 20, + "snapshot_hold_ops": 100000, + "subcompactions": lambda: random.randint(1, 4), + "target_file_size_base": 2097152, + "target_file_size_multiplier": 2, + "use_direct_reads": lambda: random.randint(0, 1), + "use_direct_io_for_flush_and_compaction": lambda: random.randint(0, 1), + "use_full_merge_v1": lambda: random.randint(0, 1), + "use_merge": lambda: random.randint(0, 1), + "verify_checksum": 1, + "write_buffer_size": 4 * 1024 * 1024, + "writepercent": 35, + "format_version": lambda: random.randint(2, 4), + "index_block_restart_interval": lambda: random.choice(range(1, 16)), +} + +_TEST_DIR_ENV_VAR = 'TEST_TMPDIR' + + +def get_dbname(test_name): + test_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR) + if test_tmpdir is None or test_tmpdir == "": + dbname = tempfile.mkdtemp(prefix='rocksdb_crashtest_' + test_name) + else: + dbname = test_tmpdir + "/rocksdb_crashtest_" + test_name + shutil.rmtree(dbname, True) + os.mkdir(dbname) + return dbname + + +def is_direct_io_supported(dbname): + with tempfile.NamedTemporaryFile(dir=dbname) as f: + try: + os.open(f.name, os.O_DIRECT) + except: + return False + return True + + +blackbox_default_params = { + # total time for this script to test db_stress + "duration": 6000, + # time for one db_stress instance to run + "interval": 120, + # since we will be killing anyway, use large value for ops_per_thread + "ops_per_thread": 100000000, + "set_options_one_in": 10000, + "test_batches_snapshots": 1, +} + +whitebox_default_params = { + "duration": 10000, + "log2_keys_per_lock": 10, + "ops_per_thread": 200000, + "random_kill_odd": 888887, + "test_batches_snapshots": lambda: random.randint(0, 1), +} + +simple_default_params = { + "allow_concurrent_memtable_write": lambda: random.randint(0, 1), + "column_families": 1, + "max_background_compactions": 1, + "max_bytes_for_level_base": 67108864, + "memtablerep": "skip_list", + "prefixpercent": 25, + "readpercent": 25, + "target_file_size_base": 16777216, + "target_file_size_multiplier": 1, + "test_batches_snapshots": 0, + "write_buffer_size": 32 * 1024 * 1024, +} + +blackbox_simple_default_params = { + "open_files": -1, + "set_options_one_in": 0, +} + +whitebox_simple_default_params = {} + +atomic_flush_params = { + "disable_wal": 1, + "reopen": 0, + "test_atomic_flush": 1, + # use small value for write_buffer_size so that RocksDB triggers flush + # more frequently + "write_buffer_size": 1024 * 1024, +} + + +def finalize_and_sanitize(src_params): + dest_params = dict([(k, v() if callable(v) else v) + for (k, v) in src_params.items()]) + if dest_params.get("compression_type") != "zstd" or \ + dest_params.get("compression_max_dict_bytes") == 0: + dest_params["compression_zstd_max_train_bytes"] = 0 + if dest_params.get("allow_concurrent_memtable_write", 1) == 1: + dest_params["memtablerep"] = "skip_list" + if dest_params["mmap_read"] == 1 or not is_direct_io_supported( + dest_params["db"]): + dest_params["use_direct_io_for_flush_and_compaction"] = 0 + dest_params["use_direct_reads"] = 0 + if dest_params.get("test_batches_snapshots") == 1: + dest_params["delpercent"] += dest_params["delrangepercent"] + dest_params["delrangepercent"] = 0 + return dest_params + + +def gen_cmd_params(args): + params = {} + + params.update(default_params) + if args.test_type == 'blackbox': + params.update(blackbox_default_params) + if args.test_type == 'whitebox': + params.update(whitebox_default_params) + if args.simple: + params.update(simple_default_params) + if args.test_type == 'blackbox': + params.update(blackbox_simple_default_params) + if args.test_type == 'whitebox': + params.update(whitebox_simple_default_params) + if args.enable_atomic_flush: + params.update(atomic_flush_params) + + for k, v in vars(args).items(): + if v is not None: + params[k] = v + return params + + +def gen_cmd(params, unknown_params): + cmd = ['./db_stress'] + [ + '--{0}={1}'.format(k, v) + for k, v in finalize_and_sanitize(params).items() + if k not in set(['test_type', 'simple', 'duration', 'interval', + 'random_kill_odd', 'enable_atomic_flush']) + and v is not None] + unknown_params + return cmd + + +# This script runs and kills db_stress multiple times. It checks consistency +# in case of unsafe crashes in RocksDB. +def blackbox_crash_main(args, unknown_args): + cmd_params = gen_cmd_params(args) + dbname = get_dbname('blackbox') + exit_time = time.time() + cmd_params['duration'] + + print("Running blackbox-crash-test with \n" + + "interval_between_crash=" + str(cmd_params['interval']) + "\n" + + "total-duration=" + str(cmd_params['duration']) + "\n") + + while time.time() < exit_time: + run_had_errors = False + killtime = time.time() + cmd_params['interval'] + + cmd = gen_cmd(dict( + cmd_params.items() + + {'db': dbname}.items()), unknown_args) + + child = subprocess.Popen(cmd, stderr=subprocess.PIPE) + print("Running db_stress with pid=%d: %s\n\n" + % (child.pid, ' '.join(cmd))) + + stop_early = False + while time.time() < killtime: + if child.poll() is not None: + print("WARNING: db_stress ended before kill: exitcode=%d\n" + % child.returncode) + stop_early = True + break + time.sleep(1) + + if not stop_early: + if child.poll() is not None: + print("WARNING: db_stress ended before kill: exitcode=%d\n" + % child.returncode) + else: + child.kill() + print("KILLED %d\n" % child.pid) + time.sleep(1) # time to stabilize after a kill + + while True: + line = child.stderr.readline().strip() + if line == '': + break + elif not line.startswith('WARNING'): + run_had_errors = True + print('stderr has error message:') + print('***' + line + '***') + + if run_had_errors: + sys.exit(2) + + time.sleep(1) # time to stabilize before the next run + + # we need to clean up after ourselves -- only do this on test success + shutil.rmtree(dbname, True) + + +# This python script runs db_stress multiple times. Some runs with +# kill_random_test that causes rocksdb to crash at various points in code. +def whitebox_crash_main(args, unknown_args): + cmd_params = gen_cmd_params(args) + dbname = get_dbname('whitebox') + + cur_time = time.time() + exit_time = cur_time + cmd_params['duration'] + half_time = cur_time + cmd_params['duration'] / 2 + + print("Running whitebox-crash-test with \n" + + "total-duration=" + str(cmd_params['duration']) + "\n") + + total_check_mode = 4 + check_mode = 0 + kill_random_test = cmd_params['random_kill_odd'] + kill_mode = 0 + + while time.time() < exit_time: + if check_mode == 0: + additional_opts = { + # use large ops per thread since we will kill it anyway + "ops_per_thread": 100 * cmd_params['ops_per_thread'], + } + # run with kill_random_test, with three modes. + # Mode 0 covers all kill points. Mode 1 covers less kill points but + # increases change of triggering them. Mode 2 covers even less + # frequent kill points and further increases triggering change. + if kill_mode == 0: + additional_opts.update({ + "kill_random_test": kill_random_test, + }) + elif kill_mode == 1: + additional_opts.update({ + "kill_random_test": (kill_random_test / 10 + 1), + "kill_prefix_blacklist": "WritableFileWriter::Append," + + "WritableFileWriter::WriteBuffered", + }) + elif kill_mode == 2: + # TODO: May need to adjust random odds if kill_random_test + # is too small. + additional_opts.update({ + "kill_random_test": (kill_random_test / 5000 + 1), + "kill_prefix_blacklist": "WritableFileWriter::Append," + "WritableFileWriter::WriteBuffered," + "PosixMmapFile::Allocate,WritableFileWriter::Flush", + }) + # Run kill mode 0, 1 and 2 by turn. + kill_mode = (kill_mode + 1) % 3 + elif check_mode == 1: + # normal run with universal compaction mode + additional_opts = { + "kill_random_test": None, + "ops_per_thread": cmd_params['ops_per_thread'], + "compaction_style": 1, + } + elif check_mode == 2: + # normal run with FIFO compaction mode + # ops_per_thread is divided by 5 because FIFO compaction + # style is quite a bit slower on reads with lot of files + additional_opts = { + "kill_random_test": None, + "ops_per_thread": cmd_params['ops_per_thread'] / 5, + "compaction_style": 2, + } + else: + # normal run + additional_opts = { + "kill_random_test": None, + "ops_per_thread": cmd_params['ops_per_thread'], + } + + cmd = gen_cmd(dict(cmd_params.items() + additional_opts.items() + + {'db': dbname}.items()), unknown_args) + + print "Running:" + ' '.join(cmd) + "\n" # noqa: E999 T25377293 Grandfathered in + + popen = subprocess.Popen(cmd, stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + stdoutdata, stderrdata = popen.communicate() + retncode = popen.returncode + msg = ("check_mode={0}, kill option={1}, exitcode={2}\n".format( + check_mode, additional_opts['kill_random_test'], retncode)) + print msg + print stdoutdata + + expected = False + if additional_opts['kill_random_test'] is None and (retncode == 0): + # we expect zero retncode if no kill option + expected = True + elif additional_opts['kill_random_test'] is not None and retncode < 0: + # we expect negative retncode if kill option was given + expected = True + + if not expected: + print "TEST FAILED. See kill option and exit code above!!!\n" + sys.exit(1) + + stdoutdata = stdoutdata.lower() + errorcount = (stdoutdata.count('error') - + stdoutdata.count('got errors 0 times')) + print "#times error occurred in output is " + str(errorcount) + "\n" + + if (errorcount > 0): + print "TEST FAILED. Output has 'error'!!!\n" + sys.exit(2) + if (stdoutdata.find('fail') >= 0): + print "TEST FAILED. Output has 'fail'!!!\n" + sys.exit(2) + + # First half of the duration, keep doing kill test. For the next half, + # try different modes. + if time.time() > half_time: + # we need to clean up after ourselves -- only do this on test + # success + shutil.rmtree(dbname, True) + os.mkdir(dbname) + cmd_params.pop('expected_values_path', None) + check_mode = (check_mode + 1) % total_check_mode + + time.sleep(1) # time to stabilize after a kill + + +def main(): + parser = argparse.ArgumentParser(description="This script runs and kills \ + db_stress multiple times") + parser.add_argument("test_type", choices=["blackbox", "whitebox"]) + parser.add_argument("--simple", action="store_true") + parser.add_argument("--enable_atomic_flush", action='store_true') + + all_params = dict(default_params.items() + + blackbox_default_params.items() + + whitebox_default_params.items() + + simple_default_params.items() + + blackbox_simple_default_params.items() + + whitebox_simple_default_params.items()) + + for k, v in all_params.items(): + parser.add_argument("--" + k, type=type(v() if callable(v) else v)) + # unknown_args are passed directly to db_stress + args, unknown_args = parser.parse_known_args() + + test_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR) + if test_tmpdir is not None and not os.path.isdir(test_tmpdir): + print('%s env var is set to a non-existent directory: %s' % + (_TEST_DIR_ENV_VAR, test_tmpdir)) + sys.exit(1) + + if args.test_type == 'blackbox': + blackbox_crash_main(args, unknown_args) + if args.test_type == 'whitebox': + whitebox_crash_main(args, unknown_args) + +if __name__ == '__main__': + main() |