Diffstat (limited to 'src/rocksdb/tools/advisor')
30 files changed, 3902 insertions, 0 deletions
diff --git a/src/rocksdb/tools/advisor/README.md b/src/rocksdb/tools/advisor/README.md
new file mode 100644
index 000000000..f1e7165e4
--- /dev/null
+++ b/src/rocksdb/tools/advisor/README.md
@@ -0,0 +1,96 @@
+# Rocksdb Tuning Advisor
+
+## Motivation
+
+The performance of Rocksdb is contingent on its tuning. However, because of
+the complexity of its underlying technology and the large number of
+configurable parameters, a good configuration is sometimes hard to obtain.
+The aim of the Python command-line tool, Rocksdb Advisor, is to automate the
+process of suggesting improvements to the configuration, based on advice from
+Rocksdb experts.
+
+## Overview
+
+Experts share their wisdom as rules comprising conditions and suggestions in
+the INI format (refer
+[rules.ini](https://github.com/facebook/rocksdb/blob/master/tools/advisor/advisor/rules.ini)).
+Users provide the Rocksdb configuration that they want to improve upon (as the
+familiar Rocksdb OPTIONS file —
+[example](https://github.com/facebook/rocksdb/blob/master/examples/rocksdb_option_file_example.ini))
+and the path of the file which contains the Rocksdb logs and statistics.
+The [Advisor](https://github.com/facebook/rocksdb/blob/master/tools/advisor/advisor/rule_parser_example.py)
+creates appropriate DataSource objects (for Rocksdb
+[logs](https://github.com/facebook/rocksdb/blob/master/tools/advisor/advisor/db_log_parser.py),
+[options](https://github.com/facebook/rocksdb/blob/master/tools/advisor/advisor/db_options_parser.py),
+[statistics](https://github.com/facebook/rocksdb/blob/master/tools/advisor/advisor/db_stats_fetcher.py) etc.)
+and provides them to the [Rules Engine](https://github.com/facebook/rocksdb/blob/master/tools/advisor/advisor/rule_parser.py).
+The Rules Engine uses the expert-specified rules to parse the data-sources and
+trigger the appropriate rules. The Advisor's output gives information about
+which rules were triggered, why they were triggered and what each of them
+suggests. Each suggestion provided by a triggered rule advises some action on
+a Rocksdb configuration option, for example, increase
+CFOptions.write_buffer_size, set bloom_bits to 2 etc.
+
+## Usage
+
+### Prerequisites
+The tool needs the following to run:
+* python3
+
+### Running the tool
+An example command to run the tool:
+
+```shell
+cd rocksdb/tools/advisor
+python3 -m advisor.rule_parser_example --rules_spec=advisor/rules.ini --rocksdb_options=test/input_files/OPTIONS-000005 --log_files_path_prefix=test/input_files/LOG-0 --stats_dump_period_sec=20
+```
+
+### Command-line arguments
+
+The most important inputs that the Advisor needs are the rules spec and the
+starting Rocksdb configuration. The configuration is provided as the familiar
+Rocksdb OPTIONS file (refer
+[example](https://github.com/facebook/rocksdb/blob/master/examples/rocksdb_option_file_example.ini)).
+The rules spec is written in the INI format (more details in
+[rules.ini](https://github.com/facebook/rocksdb/blob/master/tools/advisor/advisor/rules.ini)).
+
+In brief, a Rule is made of conditions and is triggered when all its
+constituent conditions are triggered. When triggered, a Rule suggests changes
+(increase/decrease/set to a suggested value) to certain Rocksdb options that
+aim to improve Rocksdb performance. Every Condition has a 'source', i.e. the
+data source that is checked for triggering that condition. For example, a log
+Condition (with 'source=LOG') is triggered if a particular 'regex' is found in
+the Rocksdb LOG files. As of now the Rules Engine supports 3 types of
+Conditions (and consequently data-sources): LOG, OPTIONS, TIME_SERIES. The
+TIME_SERIES data can be sourced from the Rocksdb
+[statistics](https://github.com/facebook/rocksdb/blob/master/include/rocksdb/statistics.h)
+or [perf context](https://github.com/facebook/rocksdb/blob/master/include/rocksdb/perf_context.h).
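To make the rule format concrete, here is a sketch of how a rule tying a LOG
condition to its suggestions might be laid out. The section and field names
below are inferred from the sample output shown later in this README; refer to
rules.ini for the authoritative syntax.

```ini
[Rule "stall-too-many-memtables"]
suggestions=inc-bg-flush:inc-write-buffer
conditions=stall-too-many-memtables

[Condition "stall-too-many-memtables"]
source=LOG
regex=Stopping writes because we have \d+ immutable memtables \(waiting for flush\), max_write_buffer_number is set to \d+

[Suggestion "inc-bg-flush"]
option=DBOptions.max_background_flushes
action=increase
suggested_values=2

[Suggestion "inc-write-buffer"]
option=CFOptions.max_write_buffer_number
action=increase
```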
+For more information about the remaining command-line arguments, run:
+
+```shell
+cd rocksdb/tools/advisor
+python3 -m advisor.rule_parser_example --help
+```
+
+### Sample output
+
+Here, a Rocksdb log-based rule has been triggered:
+
+```shell
+Rule: stall-too-many-memtables
+LogCondition: stall-too-many-memtables regex: Stopping writes because we have \d+ immutable memtables \(waiting for flush\), max_write_buffer_number is set to \d+
+Suggestion: inc-bg-flush option : DBOptions.max_background_flushes action : increase suggested_values : ['2']
+Suggestion: inc-write-buffer option : CFOptions.max_write_buffer_number action : increase
+scope: col_fam:
+{'default'}
+```
+
+## Running the tests
+
+Tests for the code have been added to the
+[test/](https://github.com/facebook/rocksdb/tree/master/tools/advisor/test)
+directory. For example, to run the unit tests for db_log_parser.py:
+
+```shell
+cd rocksdb/tools/advisor
+python3 -m unittest -v test.test_db_log_parser
+```
diff --git a/src/rocksdb/tools/advisor/advisor/__init__.py b/src/rocksdb/tools/advisor/advisor/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/src/rocksdb/tools/advisor/advisor/__init__.py
diff --git a/src/rocksdb/tools/advisor/advisor/bench_runner.py b/src/rocksdb/tools/advisor/advisor/bench_runner.py
new file mode 100644
index 000000000..7c7ee7882
--- /dev/null
+++ b/src/rocksdb/tools/advisor/advisor/bench_runner.py
@@ -0,0 +1,39 @@
+# Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+# This source code is licensed under both the GPLv2 (found in the
+# COPYING file in the root directory) and Apache 2.0 License
+# (found in the LICENSE.Apache file in the root directory).
+
+from abc import ABC, abstractmethod
+import re
+
+
+class BenchmarkRunner(ABC):
+    @staticmethod
+    @abstractmethod
+    def is_metric_better(new_metric, old_metric):
+        pass
+
+    @abstractmethod
+    def run_experiment(self):
+        # should return a list of DataSource objects
+        pass
+
+    @staticmethod
+    def get_info_log_file_name(log_dir, db_path):
+        # Example: DB Path = /dev/shm and OPTIONS file has option
+        # db_log_dir=/tmp/rocks/, then the name of the log file will be
+        # 'dev_shm_LOG' and its location will be /tmp/rocks. If db_log_dir is
+        # not specified in the OPTIONS file, then the location of the log file
+        # will be /dev/shm and the name of the file will be 'LOG'
+        file_name = ''
+        if log_dir:
+            # refer GetInfoLogPrefix() in rocksdb/util/filename.cc
+            # example db_path: /dev/shm/dbbench
+            file_name = db_path[1:]  # to ignore the leading '/' character
+            to_be_replaced = re.compile('[^0-9a-zA-Z\-_\.]')
+            for character in to_be_replaced.findall(db_path):
+                file_name = file_name.replace(character, '_')
+            if not file_name.endswith('_'):
+                file_name += '_'
+        file_name += 'LOG'
+        return file_name
diff --git a/src/rocksdb/tools/advisor/advisor/config_optimizer_example.py b/src/rocksdb/tools/advisor/advisor/config_optimizer_example.py
new file mode 100644
index 000000000..e3736387e
--- /dev/null
+++ b/src/rocksdb/tools/advisor/advisor/config_optimizer_example.py
@@ -0,0 +1,134 @@
+# Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). + +import argparse +from advisor.db_config_optimizer import ConfigOptimizer +from advisor.db_log_parser import NO_COL_FAMILY +from advisor.db_options_parser import DatabaseOptions +from advisor.rule_parser import RulesSpec + + +CONFIG_OPT_NUM_ITER = 10 + + +def main(args): + # initialise the RulesSpec parser + rule_spec_parser = RulesSpec(args.rules_spec) + # initialise the benchmark runner + bench_runner_module = __import__( + args.benchrunner_module, fromlist=[args.benchrunner_class] + ) + bench_runner_class = getattr(bench_runner_module, args.benchrunner_class) + ods_args = {} + if args.ods_client and args.ods_entity: + ods_args['client_script'] = args.ods_client + ods_args['entity'] = args.ods_entity + if args.ods_key_prefix: + ods_args['key_prefix'] = args.ods_key_prefix + db_bench_runner = bench_runner_class(args.benchrunner_pos_args, ods_args) + # initialise the database configuration + db_options = DatabaseOptions(args.rocksdb_options, args.misc_options) + # set the frequency at which stats are dumped in the LOG file and the + # location of the LOG file. + db_log_dump_settings = { + "DBOptions.stats_dump_period_sec": { + NO_COL_FAMILY: args.stats_dump_period_sec + } + } + db_options.update_options(db_log_dump_settings) + # initialise the configuration optimizer + config_optimizer = ConfigOptimizer( + db_bench_runner, + db_options, + rule_spec_parser, + args.base_db_path + ) + # run the optimiser to improve the database configuration for given + # benchmarks, with the help of expert-specified rules + final_db_options = config_optimizer.run() + # generate the final rocksdb options file + print( + 'Final configuration in: ' + + final_db_options.generate_options_config('final') + ) + print( + 'Final miscellaneous options: ' + + repr(final_db_options.get_misc_options()) + ) + + +if __name__ == '__main__': + ''' + An example run of this tool from the command-line would look like: + python3 -m advisor.config_optimizer_example + --base_db_path=/tmp/rocksdbtest-155919/dbbench + --rocksdb_options=temp/OPTIONS_boot.tmp --misc_options bloom_bits=2 + --rules_spec=advisor/rules.ini --stats_dump_period_sec=20 + --benchrunner_module=advisor.db_bench_runner + --benchrunner_class=DBBenchRunner --benchrunner_pos_args ./../../db_bench + readwhilewriting use_existing_db=true duration=90 + ''' + parser = argparse.ArgumentParser(description='This script is used for\ + searching for a better database configuration') + parser.add_argument( + '--rocksdb_options', required=True, type=str, + help='path of the starting Rocksdb OPTIONS file' + ) + # these are options that are column-family agnostic and are not yet + # supported by the Rocksdb Options file: eg. bloom_bits=2 + parser.add_argument( + '--misc_options', nargs='*', + help='whitespace-separated list of options that are not supported ' + + 'by the Rocksdb OPTIONS file, given in the ' + + '<option_name>=<option_value> format eg. 
"bloom_bits=2 ' + + 'rate_limiter_bytes_per_sec=128000000"') + parser.add_argument( + '--base_db_path', required=True, type=str, + help='path for the Rocksdb database' + ) + parser.add_argument( + '--rules_spec', required=True, type=str, + help='path of the file containing the expert-specified Rules' + ) + parser.add_argument( + '--stats_dump_period_sec', required=True, type=int, + help='the frequency (in seconds) at which STATISTICS are printed to ' + + 'the Rocksdb LOG file' + ) + # ODS arguments + parser.add_argument( + '--ods_client', type=str, help='the ODS client binary' + ) + parser.add_argument( + '--ods_entity', type=str, + help='the servers for which the ODS stats need to be fetched' + ) + parser.add_argument( + '--ods_key_prefix', type=str, + help='the prefix that needs to be attached to the keys of time ' + + 'series to be fetched from ODS' + ) + # benchrunner_module example: advisor.db_benchmark_client + parser.add_argument( + '--benchrunner_module', required=True, type=str, + help='the module containing the BenchmarkRunner class to be used by ' + + 'the Optimizer, example: advisor.db_bench_runner' + ) + # benchrunner_class example: DBBenchRunner + parser.add_argument( + '--benchrunner_class', required=True, type=str, + help='the name of the BenchmarkRunner class to be used by the ' + + 'Optimizer, should be present in the module provided in the ' + + 'benchrunner_module argument, example: DBBenchRunner' + ) + parser.add_argument( + '--benchrunner_pos_args', nargs='*', + help='whitespace-separated positional arguments that are passed on ' + + 'to the constructor of the BenchmarkRunner class provided in the ' + + 'benchrunner_class argument, example: "use_existing_db=true ' + + 'duration=900"' + ) + args = parser.parse_args() + main(args) diff --git a/src/rocksdb/tools/advisor/advisor/db_bench_runner.py b/src/rocksdb/tools/advisor/advisor/db_bench_runner.py new file mode 100644 index 000000000..54424440b --- /dev/null +++ b/src/rocksdb/tools/advisor/advisor/db_bench_runner.py @@ -0,0 +1,245 @@ +# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). + +from advisor.bench_runner import BenchmarkRunner +from advisor.db_log_parser import DataSource, DatabaseLogs, NO_COL_FAMILY +from advisor.db_stats_fetcher import ( + LogStatsParser, OdsStatsFetcher, DatabasePerfContext +) +import shutil +import subprocess +import time + + +''' +NOTE: This is not thread-safe, because the output file is simply overwritten. 
+''' + + +class DBBenchRunner(BenchmarkRunner): + OUTPUT_FILE = "temp/dbbench_out.tmp" + ERROR_FILE = "temp/dbbench_err.tmp" + DB_PATH = "DB path" + THROUGHPUT = "ops/sec" + PERF_CON = " PERF_CONTEXT:" + + @staticmethod + def is_metric_better(new_metric, old_metric): + # for db_bench 'throughput' is the metric returned by run_experiment + return new_metric >= old_metric + + @staticmethod + def get_opt_args_str(misc_options_dict): + # given a dictionary of options and their values, return a string + # that can be appended as command-line arguments + optional_args_str = "" + for option_name, option_value in misc_options_dict.items(): + if option_value: + optional_args_str += ( + " --" + option_name + "=" + str(option_value) + ) + return optional_args_str + + def __init__(self, positional_args, ods_args=None): + # parse positional_args list appropriately + self.db_bench_binary = positional_args[0] + self.benchmark = positional_args[1] + self.db_bench_args = None + if len(positional_args) > 2: + # options list with each option given as "<option>=<value>" + self.db_bench_args = positional_args[2:] + # save ods_args, if provided + self.ods_args = ods_args + + def _parse_output(self, get_perf_context=False): + ''' + Sample db_bench output after running 'readwhilewriting' benchmark: + DB path: [/tmp/rocksdbtest-155919/dbbench]\n + readwhilewriting : 16.582 micros/op 60305 ops/sec; 4.2 MB/s (3433828\ + of 5427999 found)\n + PERF_CONTEXT:\n + user_key_comparison_count = 500466712, block_cache_hit_count = ...\n + ''' + output = { + self.THROUGHPUT: None, self.DB_PATH: None, self.PERF_CON: None + } + perf_context_begins = False + with open(self.OUTPUT_FILE, 'r') as fp: + for line in fp: + if line.startswith(self.benchmark): + # line from sample output: + # readwhilewriting : 16.582 micros/op 60305 ops/sec; \ + # 4.2 MB/s (3433828 of 5427999 found)\n + print(line) # print output of the benchmark run + token_list = line.strip().split() + for ix, token in enumerate(token_list): + if token.startswith(self.THROUGHPUT): + # in above example, throughput = 60305 ops/sec + output[self.THROUGHPUT] = ( + float(token_list[ix - 1]) + ) + break + elif get_perf_context and line.startswith(self.PERF_CON): + # the following lines in the output contain perf context + # statistics (refer example above) + perf_context_begins = True + elif get_perf_context and perf_context_begins: + # Sample perf_context output: + # user_key_comparison_count = 500, block_cache_hit_count =\ + # 468, block_read_count = 580, block_read_byte = 445, ... + token_list = line.strip().split(',') + # token_list = ['user_key_comparison_count = 500', + # 'block_cache_hit_count = 468','block_read_count = 580'... 
+ perf_context = { + tk.split('=')[0].strip(): tk.split('=')[1].strip() + for tk in token_list + if tk + } + # TODO(poojam23): this is a hack and should be replaced + # with the timestamp that db_bench will provide per printed + # perf_context + timestamp = int(time.time()) + perf_context_ts = {} + for stat in perf_context.keys(): + perf_context_ts[stat] = { + timestamp: int(perf_context[stat]) + } + output[self.PERF_CON] = perf_context_ts + perf_context_begins = False + elif line.startswith(self.DB_PATH): + # line from sample output: + # DB path: [/tmp/rocksdbtest-155919/dbbench]\n + output[self.DB_PATH] = ( + line.split('[')[1].split(']')[0] + ) + return output + + def get_log_options(self, db_options, db_path): + # get the location of the LOG file and the frequency at which stats are + # dumped in the LOG file + log_dir_path = None + stats_freq_sec = None + logs_file_prefix = None + + # fetch frequency at which the stats are dumped in the Rocksdb logs + dump_period = 'DBOptions.stats_dump_period_sec' + # fetch the directory, if specified, in which the Rocksdb logs are + # dumped, by default logs are dumped in same location as database + log_dir = 'DBOptions.db_log_dir' + log_options = db_options.get_options([dump_period, log_dir]) + if dump_period in log_options: + stats_freq_sec = int(log_options[dump_period][NO_COL_FAMILY]) + if log_dir in log_options: + log_dir_path = log_options[log_dir][NO_COL_FAMILY] + + log_file_name = DBBenchRunner.get_info_log_file_name( + log_dir_path, db_path + ) + + if not log_dir_path: + log_dir_path = db_path + if not log_dir_path.endswith('/'): + log_dir_path += '/' + + logs_file_prefix = log_dir_path + log_file_name + return (logs_file_prefix, stats_freq_sec) + + def _get_options_command_line_args_str(self, curr_options): + ''' + This method uses the provided Rocksdb OPTIONS to create a string of + command-line arguments for db_bench. + The --options_file argument is always given and the options that are + not supported by the OPTIONS file are given as separate arguments. 
+ ''' + optional_args_str = DBBenchRunner.get_opt_args_str( + curr_options.get_misc_options() + ) + # generate an options configuration file + options_file = curr_options.generate_options_config(nonce='12345') + optional_args_str += " --options_file=" + options_file + return optional_args_str + + def _setup_db_before_experiment(self, curr_options, db_path): + # remove destination directory if it already exists + try: + shutil.rmtree(db_path, ignore_errors=True) + except OSError as e: + print('Error: rmdir ' + e.filename + ' ' + e.strerror) + # setup database with a million keys using the fillrandom benchmark + command = "%s --benchmarks=fillrandom --db=%s --num=1000000" % ( + self.db_bench_binary, db_path + ) + args_str = self._get_options_command_line_args_str(curr_options) + command += args_str + self._run_command(command) + + def _build_experiment_command(self, curr_options, db_path): + command = "%s --benchmarks=%s --statistics --perf_level=3 --db=%s" % ( + self.db_bench_binary, self.benchmark, db_path + ) + # fetch the command-line arguments string for providing Rocksdb options + args_str = self._get_options_command_line_args_str(curr_options) + # handle the command-line args passed in the constructor, these + # arguments are specific to db_bench + for cmd_line_arg in self.db_bench_args: + args_str += (" --" + cmd_line_arg) + command += args_str + return command + + def _run_command(self, command): + out_file = open(self.OUTPUT_FILE, "w+") + err_file = open(self.ERROR_FILE, "w+") + print('executing... - ' + command) + subprocess.call(command, shell=True, stdout=out_file, stderr=err_file) + out_file.close() + err_file.close() + + def run_experiment(self, db_options, db_path): + # setup the Rocksdb database before running experiment + self._setup_db_before_experiment(db_options, db_path) + # get the command to run the experiment + command = self._build_experiment_command(db_options, db_path) + experiment_start_time = int(time.time()) + # run experiment + self._run_command(command) + experiment_end_time = int(time.time()) + # parse the db_bench experiment output + parsed_output = self._parse_output(get_perf_context=True) + + # get the log files path prefix and frequency at which Rocksdb stats + # are dumped in the logs + logs_file_prefix, stats_freq_sec = self.get_log_options( + db_options, parsed_output[self.DB_PATH] + ) + # create the Rocksbd LOGS object + db_logs = DatabaseLogs( + logs_file_prefix, db_options.get_column_families() + ) + # Create the Log STATS object + db_log_stats = LogStatsParser(logs_file_prefix, stats_freq_sec) + # Create the PerfContext STATS object + db_perf_context = DatabasePerfContext( + parsed_output[self.PERF_CON], 0, False + ) + # create the data-sources dictionary + data_sources = { + DataSource.Type.DB_OPTIONS: [db_options], + DataSource.Type.LOG: [db_logs], + DataSource.Type.TIME_SERIES: [db_log_stats, db_perf_context] + } + # Create the ODS STATS object + if self.ods_args: + key_prefix = '' + if 'key_prefix' in self.ods_args: + key_prefix = self.ods_args['key_prefix'] + data_sources[DataSource.Type.TIME_SERIES].append(OdsStatsFetcher( + self.ods_args['client_script'], + self.ods_args['entity'], + experiment_start_time, + experiment_end_time, + key_prefix + )) + # return the experiment's data-sources and throughput + return data_sources, parsed_output[self.THROUGHPUT] diff --git a/src/rocksdb/tools/advisor/advisor/db_config_optimizer.py b/src/rocksdb/tools/advisor/advisor/db_config_optimizer.py new file mode 100644 index 000000000..508c0f8fe --- 
/dev/null +++ b/src/rocksdb/tools/advisor/advisor/db_config_optimizer.py @@ -0,0 +1,282 @@ +# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). + +from advisor.db_log_parser import NO_COL_FAMILY +from advisor.db_options_parser import DatabaseOptions +from advisor.rule_parser import Suggestion +import copy +import random + + +class ConfigOptimizer: + SCOPE = 'scope' + SUGG_VAL = 'suggested values' + + @staticmethod + def apply_action_on_value(old_value, action, suggested_values): + chosen_sugg_val = None + if suggested_values: + chosen_sugg_val = random.choice(list(suggested_values)) + new_value = None + if action is Suggestion.Action.set or not old_value: + assert(chosen_sugg_val) + new_value = chosen_sugg_val + else: + # For increase/decrease actions, currently the code tries to make + # a 30% change in the option's value per iteration. An addend is + # also present (+1 or -1) to handle the cases when the option's + # old value was 0 or the final int() conversion suppressed the 30% + # change made to the option + old_value = float(old_value) + mul = 0 + add = 0 + if action is Suggestion.Action.increase: + if old_value < 0: + mul = 0.7 + add = 2 + else: + mul = 1.3 + add = 2 + elif action is Suggestion.Action.decrease: + if old_value < 0: + mul = 1.3 + add = -2 + else: + mul = 0.7 + add = -2 + new_value = int(old_value * mul + add) + return new_value + + @staticmethod + def improve_db_config(options, rule, suggestions_dict): + # this method takes ONE 'rule' and applies all its suggestions on the + # appropriate options + required_options = [] + rule_suggestions = [] + for sugg_name in rule.get_suggestions(): + option = suggestions_dict[sugg_name].option + action = suggestions_dict[sugg_name].action + # A Suggestion in the rules spec must have the 'option' and + # 'action' fields defined, always call perform_checks() method + # after parsing the rules file using RulesSpec + assert(option) + assert(action) + required_options.append(option) + rule_suggestions.append(suggestions_dict[sugg_name]) + current_config = options.get_options(required_options) + # Create the updated configuration from the rule's suggestions + updated_config = {} + for sugg in rule_suggestions: + # case: when the option is not present in the current configuration + if sugg.option not in current_config: + try: + new_value = ConfigOptimizer.apply_action_on_value( + None, sugg.action, sugg.suggested_values + ) + if sugg.option not in updated_config: + updated_config[sugg.option] = {} + if DatabaseOptions.is_misc_option(sugg.option): + # this suggestion is on an option that is not yet + # supported by the Rocksdb OPTIONS file and so it is + # not prefixed by a section type. 
+ updated_config[sugg.option][NO_COL_FAMILY] = new_value + else: + for col_fam in rule.get_trigger_column_families(): + updated_config[sugg.option][col_fam] = new_value + except AssertionError: + print( + 'WARNING(ConfigOptimizer): provide suggested_values ' + + 'for ' + sugg.option + ) + continue + # case: when the option is present in the current configuration + if NO_COL_FAMILY in current_config[sugg.option]: + old_value = current_config[sugg.option][NO_COL_FAMILY] + try: + new_value = ConfigOptimizer.apply_action_on_value( + old_value, sugg.action, sugg.suggested_values + ) + if sugg.option not in updated_config: + updated_config[sugg.option] = {} + updated_config[sugg.option][NO_COL_FAMILY] = new_value + except AssertionError: + print( + 'WARNING(ConfigOptimizer): provide suggested_values ' + + 'for ' + sugg.option + ) + else: + for col_fam in rule.get_trigger_column_families(): + old_value = None + if col_fam in current_config[sugg.option]: + old_value = current_config[sugg.option][col_fam] + try: + new_value = ConfigOptimizer.apply_action_on_value( + old_value, sugg.action, sugg.suggested_values + ) + if sugg.option not in updated_config: + updated_config[sugg.option] = {} + updated_config[sugg.option][col_fam] = new_value + except AssertionError: + print( + 'WARNING(ConfigOptimizer): provide ' + + 'suggested_values for ' + sugg.option + ) + return current_config, updated_config + + @staticmethod + def pick_rule_to_apply(rules, last_rule_name, rules_tried, backtrack): + if not rules: + print('\nNo more rules triggered!') + return None + # if the last rule provided an improvement in the database performance, + # and it was triggered again (i.e. it is present in 'rules'), then pick + # the same rule for this iteration too. + if last_rule_name and not backtrack: + for rule in rules: + if rule.name == last_rule_name: + return rule + # there was no previous rule OR the previous rule did not improve db + # performance OR it was not triggered for this iteration, + # then pick another rule that has not been tried yet + for rule in rules: + if rule.name not in rules_tried: + return rule + print('\nAll rules have been exhausted') + return None + + @staticmethod + def apply_suggestions( + triggered_rules, + current_rule_name, + rules_tried, + backtrack, + curr_options, + suggestions_dict + ): + curr_rule = ConfigOptimizer.pick_rule_to_apply( + triggered_rules, current_rule_name, rules_tried, backtrack + ) + if not curr_rule: + return tuple([None]*4) + # if a rule has been picked for improving db_config, update rules_tried + rules_tried.add(curr_rule.name) + # get updated config based on the picked rule + curr_conf, updated_conf = ConfigOptimizer.improve_db_config( + curr_options, curr_rule, suggestions_dict + ) + conf_diff = DatabaseOptions.get_options_diff(curr_conf, updated_conf) + if not conf_diff: # the current and updated configs are the same + curr_rule, rules_tried, curr_conf, updated_conf = ( + ConfigOptimizer.apply_suggestions( + triggered_rules, + None, + rules_tried, + backtrack, + curr_options, + suggestions_dict + ) + ) + print('returning from apply_suggestions') + return (curr_rule, rules_tried, curr_conf, updated_conf) + + # TODO(poojam23): check if this method is required or can we directly set + # the config equal to the curr_config + @staticmethod + def get_backtrack_config(curr_config, updated_config): + diff = DatabaseOptions.get_options_diff(curr_config, updated_config) + bt_config = {} + for option in diff: + bt_config[option] = {} + for col_fam in diff[option]: + 
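                # each diff entry maps a column family to a
                # (current_value, updated_value) tuple, so index 0 below
                # restores the value from before the update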
bt_config[option][col_fam] = diff[option][col_fam][0] + print(bt_config) + return bt_config + + def __init__(self, bench_runner, db_options, rule_parser, base_db): + self.bench_runner = bench_runner + self.db_options = db_options + self.rule_parser = rule_parser + self.base_db_path = base_db + + def run(self): + # In every iteration of this method's optimization loop we pick ONE + # RULE from all the triggered rules and apply all its suggestions to + # the appropriate options. + # bootstrapping the optimizer + print('Bootstrapping optimizer:') + options = copy.deepcopy(self.db_options) + old_data_sources, old_metric = ( + self.bench_runner.run_experiment(options, self.base_db_path) + ) + print('Initial metric: ' + str(old_metric)) + self.rule_parser.load_rules_from_spec() + self.rule_parser.perform_section_checks() + triggered_rules = self.rule_parser.get_triggered_rules( + old_data_sources, options.get_column_families() + ) + print('\nTriggered:') + self.rule_parser.print_rules(triggered_rules) + backtrack = False + rules_tried = set() + curr_rule, rules_tried, curr_conf, updated_conf = ( + ConfigOptimizer.apply_suggestions( + triggered_rules, + None, + rules_tried, + backtrack, + options, + self.rule_parser.get_suggestions_dict() + ) + ) + # the optimizer loop + while curr_rule: + print('\nRule picked for next iteration:') + print(curr_rule.name) + print('\ncurrent config:') + print(curr_conf) + print('updated config:') + print(updated_conf) + options.update_options(updated_conf) + # run bench_runner with updated config + new_data_sources, new_metric = ( + self.bench_runner.run_experiment(options, self.base_db_path) + ) + print('\nnew metric: ' + str(new_metric)) + backtrack = not self.bench_runner.is_metric_better( + new_metric, old_metric + ) + # update triggered_rules, metric, data_sources, if required + if backtrack: + # revert changes to options config + print('\nBacktracking to previous configuration') + backtrack_conf = ConfigOptimizer.get_backtrack_config( + curr_conf, updated_conf + ) + options.update_options(backtrack_conf) + else: + # run advisor on new data sources + self.rule_parser.load_rules_from_spec() # reboot the advisor + self.rule_parser.perform_section_checks() + triggered_rules = self.rule_parser.get_triggered_rules( + new_data_sources, options.get_column_families() + ) + print('\nTriggered:') + self.rule_parser.print_rules(triggered_rules) + old_metric = new_metric + old_data_sources = new_data_sources + rules_tried = set() + # pick rule to work on and set curr_rule to that + curr_rule, rules_tried, curr_conf, updated_conf = ( + ConfigOptimizer.apply_suggestions( + triggered_rules, + curr_rule.name, + rules_tried, + backtrack, + options, + self.rule_parser.get_suggestions_dict() + ) + ) + # return the final database options configuration + return options diff --git a/src/rocksdb/tools/advisor/advisor/db_log_parser.py b/src/rocksdb/tools/advisor/advisor/db_log_parser.py new file mode 100644 index 000000000..efd41a81a --- /dev/null +++ b/src/rocksdb/tools/advisor/advisor/db_log_parser.py @@ -0,0 +1,131 @@ +# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). 
+ +from abc import ABC, abstractmethod +from calendar import timegm +from enum import Enum +import glob +import re +import time + + +NO_COL_FAMILY = 'DB_WIDE' + + +class DataSource(ABC): + class Type(Enum): + LOG = 1 + DB_OPTIONS = 2 + TIME_SERIES = 3 + + def __init__(self, type): + self.type = type + + @abstractmethod + def check_and_trigger_conditions(self, conditions): + pass + + +class Log: + @staticmethod + def is_new_log(log_line): + # The assumption is that a new log will start with a date printed in + # the below regex format. + date_regex = '\d{4}/\d{2}/\d{2}-\d{2}:\d{2}:\d{2}\.\d{6}' + return re.match(date_regex, log_line) + + def __init__(self, log_line, column_families): + token_list = log_line.strip().split() + self.time = token_list[0] + self.context = token_list[1] + self.message = " ".join(token_list[2:]) + self.column_family = None + # example log for 'default' column family: + # "2018/07/25-17:29:05.176080 7f969de68700 [db/compaction_job.cc:1634] + # [default] [JOB 3] Compacting 24@0 + 16@1 files to L1, score 6.00\n" + for col_fam in column_families: + search_for_str = '\[' + col_fam + '\]' + if re.search(search_for_str, self.message): + self.column_family = col_fam + break + if not self.column_family: + self.column_family = NO_COL_FAMILY + + def get_human_readable_time(self): + # example from a log line: '2018/07/25-11:25:45.782710' + return self.time + + def get_column_family(self): + return self.column_family + + def get_context(self): + return self.context + + def get_message(self): + return self.message + + def append_message(self, remaining_log): + self.message = self.message + '\n' + remaining_log.strip() + + def get_timestamp(self): + # example: '2018/07/25-11:25:45.782710' will be converted to the GMT + # Unix timestamp 1532517945 (note: this method assumes that self.time + # is in GMT) + hr_time = self.time + 'GMT' + timestamp = timegm(time.strptime(hr_time, "%Y/%m/%d-%H:%M:%S.%f%Z")) + return timestamp + + def __repr__(self): + return ( + 'time: ' + self.time + '; context: ' + self.context + + '; col_fam: ' + self.column_family + + '; message: ' + self.message + ) + + +class DatabaseLogs(DataSource): + def __init__(self, logs_path_prefix, column_families): + super().__init__(DataSource.Type.LOG) + self.logs_path_prefix = logs_path_prefix + self.column_families = column_families + + def trigger_conditions_for_log(self, conditions, log): + # For a LogCondition object, trigger is: + # Dict[column_family_name, List[Log]]. This explains why the condition + # was triggered and for which column families. 
+ for cond in conditions: + if re.search(cond.regex, log.get_message(), re.IGNORECASE): + trigger = cond.get_trigger() + if not trigger: + trigger = {} + if log.get_column_family() not in trigger: + trigger[log.get_column_family()] = [] + trigger[log.get_column_family()].append(log) + cond.set_trigger(trigger) + + def check_and_trigger_conditions(self, conditions): + for file_name in glob.glob(self.logs_path_prefix + '*'): + # TODO(poojam23): find a way to distinguish between log files + # - generated in the current experiment but are labeled 'old' + # because they LOGs exceeded the file size limit AND + # - generated in some previous experiment that are also labeled + # 'old' and were not deleted for some reason + if re.search('old', file_name, re.IGNORECASE): + continue + with open(file_name, 'r') as db_logs: + new_log = None + for line in db_logs: + if Log.is_new_log(line): + if new_log: + self.trigger_conditions_for_log( + conditions, new_log + ) + new_log = Log(line, self.column_families) + else: + # To account for logs split into multiple lines + new_log.append_message(line) + # Check for the last log in the file. + if new_log: + self.trigger_conditions_for_log(conditions, new_log) diff --git a/src/rocksdb/tools/advisor/advisor/db_options_parser.py b/src/rocksdb/tools/advisor/advisor/db_options_parser.py new file mode 100644 index 000000000..e689d892a --- /dev/null +++ b/src/rocksdb/tools/advisor/advisor/db_options_parser.py @@ -0,0 +1,358 @@ +# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). + +import copy +from advisor.db_log_parser import DataSource, NO_COL_FAMILY +from advisor.ini_parser import IniParser +import os + + +class OptionsSpecParser(IniParser): + @staticmethod + def is_new_option(line): + return '=' in line + + @staticmethod + def get_section_type(line): + ''' + Example section header: [TableOptions/BlockBasedTable "default"] + Here ConfigurationOptimizer returned would be + 'TableOptions.BlockBasedTable' + ''' + section_path = line.strip()[1:-1].split()[0] + section_type = '.'.join(section_path.split('/')) + return section_type + + @staticmethod + def get_section_name(line): + # example: get_section_name('[CFOptions "default"]') + token_list = line.strip()[1:-1].split('"') + # token_list = ['CFOptions', 'default', ''] + if len(token_list) < 3: + return None + return token_list[1] # return 'default' + + @staticmethod + def get_section_str(section_type, section_name): + # Example: + # Case 1: get_section_str('DBOptions', NO_COL_FAMILY) + # Case 2: get_section_str('TableOptions.BlockBasedTable', 'default') + section_type = '/'.join(section_type.strip().split('.')) + # Case 1: section_type = 'DBOptions' + # Case 2: section_type = 'TableOptions/BlockBasedTable' + section_str = '[' + section_type + if section_name == NO_COL_FAMILY: + # Case 1: '[DBOptions]' + return (section_str + ']') + else: + # Case 2: '[TableOptions/BlockBasedTable "default"]' + return section_str + ' "' + section_name + '"]' + + @staticmethod + def get_option_str(key, values): + option_str = key + '=' + # get_option_str('db_log_dir', None), returns 'db_log_dir=' + if values: + # example: + # get_option_str('max_bytes_for_level_multiplier_additional', + # [1,1,1,1,1,1,1]), returned string: + # 'max_bytes_for_level_multiplier_additional=1:1:1:1:1:1:1' + if isinstance(values, list): + for value in 
values: + option_str += (str(value) + ':') + option_str = option_str[:-1] + else: + # example: get_option_str('write_buffer_size', 1048576) + # returned string: 'write_buffer_size=1048576' + option_str += str(values) + return option_str + + +class DatabaseOptions(DataSource): + + @staticmethod + def is_misc_option(option_name): + # these are miscellaneous options that are not yet supported by the + # Rocksdb options file, hence they are not prefixed with any section + # name + return '.' not in option_name + + @staticmethod + def get_options_diff(opt_old, opt_new): + # type: Dict[option, Dict[col_fam, value]] X 2 -> + # Dict[option, Dict[col_fam, Tuple(old_value, new_value)]] + # note: diff should contain a tuple of values only if they are + # different from each other + options_union = set(opt_old.keys()).union(set(opt_new.keys())) + diff = {} + for opt in options_union: + diff[opt] = {} + # if option in options_union, then it must be in one of the configs + if opt not in opt_old: + for col_fam in opt_new[opt]: + diff[opt][col_fam] = (None, opt_new[opt][col_fam]) + elif opt not in opt_new: + for col_fam in opt_old[opt]: + diff[opt][col_fam] = (opt_old[opt][col_fam], None) + else: + for col_fam in opt_old[opt]: + if col_fam in opt_new[opt]: + if opt_old[opt][col_fam] != opt_new[opt][col_fam]: + diff[opt][col_fam] = ( + opt_old[opt][col_fam], + opt_new[opt][col_fam] + ) + else: + diff[opt][col_fam] = (opt_old[opt][col_fam], None) + for col_fam in opt_new[opt]: + if col_fam in opt_old[opt]: + if opt_old[opt][col_fam] != opt_new[opt][col_fam]: + diff[opt][col_fam] = ( + opt_old[opt][col_fam], + opt_new[opt][col_fam] + ) + else: + diff[opt][col_fam] = (None, opt_new[opt][col_fam]) + if not diff[opt]: + diff.pop(opt) + return diff + + def __init__(self, rocksdb_options, misc_options=None): + super().__init__(DataSource.Type.DB_OPTIONS) + # The options are stored in the following data structure: + # Dict[section_type, Dict[section_name, Dict[option_name, value]]] + self.options_dict = None + self.column_families = None + # Load the options from the given file to a dictionary. 
+ self.load_from_source(rocksdb_options) + # Setup the miscellaneous options expected to be List[str], where each + # element in the List has the format "<option_name>=<option_value>" + # These options are the ones that are not yet supported by the Rocksdb + # OPTIONS file, so they are provided separately + self.setup_misc_options(misc_options) + + def setup_misc_options(self, misc_options): + self.misc_options = {} + if misc_options: + for option_pair_str in misc_options: + option_name = option_pair_str.split('=')[0].strip() + option_value = option_pair_str.split('=')[1].strip() + self.misc_options[option_name] = option_value + + def load_from_source(self, options_path): + self.options_dict = {} + with open(options_path, 'r') as db_options: + for line in db_options: + line = OptionsSpecParser.remove_trailing_comment(line) + if not line: + continue + if OptionsSpecParser.is_section_header(line): + curr_sec_type = ( + OptionsSpecParser.get_section_type(line) + ) + curr_sec_name = OptionsSpecParser.get_section_name(line) + if curr_sec_type not in self.options_dict: + self.options_dict[curr_sec_type] = {} + if not curr_sec_name: + curr_sec_name = NO_COL_FAMILY + self.options_dict[curr_sec_type][curr_sec_name] = {} + # example: if the line read from the Rocksdb OPTIONS file + # is [CFOptions "default"], then the section type is + # CFOptions and 'default' is the name of a column family + # that for this database, so it's added to the list of + # column families stored in this object + if curr_sec_type == 'CFOptions': + if not self.column_families: + self.column_families = [] + self.column_families.append(curr_sec_name) + elif OptionsSpecParser.is_new_option(line): + key, value = OptionsSpecParser.get_key_value_pair(line) + self.options_dict[curr_sec_type][curr_sec_name][key] = ( + value + ) + else: + error = 'Not able to parse line in Options file.' + OptionsSpecParser.exit_with_parse_error(line, error) + + def get_misc_options(self): + # these are options that are not yet supported by the Rocksdb OPTIONS + # file, hence they are provided and stored separately + return self.misc_options + + def get_column_families(self): + return self.column_families + + def get_all_options(self): + # This method returns all the options that are stored in this object as + # a: Dict[<sec_type>.<option_name>: Dict[col_fam, option_value]] + all_options = [] + # Example: in the section header '[CFOptions "default"]' read from the + # OPTIONS file, sec_type='CFOptions' + for sec_type in self.options_dict: + for col_fam in self.options_dict[sec_type]: + for opt_name in self.options_dict[sec_type][col_fam]: + option = sec_type + '.' + opt_name + all_options.append(option) + all_options.extend(list(self.misc_options.keys())) + return self.get_options(all_options) + + def get_options(self, reqd_options): + # type: List[str] -> Dict[str, Dict[str, Any]] + # List[option] -> Dict[option, Dict[col_fam, value]] + reqd_options_dict = {} + for option in reqd_options: + if DatabaseOptions.is_misc_option(option): + # the option is not prefixed by '<section_type>.' 
because it is + # not yet supported by the Rocksdb OPTIONS file; so it has to + # be fetched from the misc_options dictionary + if option not in self.misc_options: + continue + if option not in reqd_options_dict: + reqd_options_dict[option] = {} + reqd_options_dict[option][NO_COL_FAMILY] = ( + self.misc_options[option] + ) + else: + # Example: option = 'TableOptions.BlockBasedTable.block_align' + # then, sec_type = 'TableOptions.BlockBasedTable' + sec_type = '.'.join(option.split('.')[:-1]) + # opt_name = 'block_align' + opt_name = option.split('.')[-1] + if sec_type not in self.options_dict: + continue + for col_fam in self.options_dict[sec_type]: + if opt_name in self.options_dict[sec_type][col_fam]: + if option not in reqd_options_dict: + reqd_options_dict[option] = {} + reqd_options_dict[option][col_fam] = ( + self.options_dict[sec_type][col_fam][opt_name] + ) + return reqd_options_dict + + def update_options(self, options): + # An example 'options' object looks like: + # {'DBOptions.max_background_jobs': {NO_COL_FAMILY: 2}, + # 'CFOptions.write_buffer_size': {'default': 1048576, 'cf_A': 128000}, + # 'bloom_bits': {NO_COL_FAMILY: 4}} + for option in options: + if DatabaseOptions.is_misc_option(option): + # this is a misc_option i.e. an option that is not yet + # supported by the Rocksdb OPTIONS file, so it is not prefixed + # by '<section_type>.' and must be stored in the separate + # misc_options dictionary + if NO_COL_FAMILY not in options[option]: + print( + 'WARNING(DatabaseOptions.update_options): not ' + + 'updating option ' + option + ' because it is in ' + + 'misc_option format but its scope is not ' + + NO_COL_FAMILY + '. Check format of option.' + ) + continue + self.misc_options[option] = options[option][NO_COL_FAMILY] + else: + sec_name = '.'.join(option.split('.')[:-1]) + opt_name = option.split('.')[-1] + if sec_name not in self.options_dict: + self.options_dict[sec_name] = {} + for col_fam in options[option]: + # if the option is not already present in the dictionary, + # it will be inserted, else it will be updated to the new + # value + if col_fam not in self.options_dict[sec_name]: + self.options_dict[sec_name][col_fam] = {} + self.options_dict[sec_name][col_fam][opt_name] = ( + copy.deepcopy(options[option][col_fam]) + ) + + def generate_options_config(self, nonce): + # this method generates a Rocksdb OPTIONS file in the INI format from + # the options stored in self.options_dict + this_path = os.path.abspath(os.path.dirname(__file__)) + file_name = '../temp/OPTIONS_' + str(nonce) + '.tmp' + file_path = os.path.join(this_path, file_name) + with open(file_path, 'w') as fp: + for section in self.options_dict: + for col_fam in self.options_dict[section]: + fp.write( + OptionsSpecParser.get_section_str(section, col_fam) + + '\n' + ) + for option in self.options_dict[section][col_fam]: + values = self.options_dict[section][col_fam][option] + fp.write( + OptionsSpecParser.get_option_str(option, values) + + '\n' + ) + fp.write('\n') + return file_path + + def check_and_trigger_conditions(self, conditions): + for cond in conditions: + reqd_options_dict = self.get_options(cond.options) + # This contains the indices of options that are specific to some + # column family and are not database-wide options. 
+ incomplete_option_ix = [] + options = [] + missing_reqd_option = False + for ix, option in enumerate(cond.options): + if option not in reqd_options_dict: + print( + 'WARNING(DatabaseOptions.check_and_trigger): ' + + 'skipping condition ' + cond.name + ' because it ' + 'requires option ' + option + ' but this option is' + + ' not available' + ) + missing_reqd_option = True + break # required option is absent + if NO_COL_FAMILY in reqd_options_dict[option]: + options.append(reqd_options_dict[option][NO_COL_FAMILY]) + else: + options.append(None) + incomplete_option_ix.append(ix) + + if missing_reqd_option: + continue + + # if all the options are database-wide options + if not incomplete_option_ix: + try: + if eval(cond.eval_expr): + cond.set_trigger({NO_COL_FAMILY: options}) + except Exception as e: + print( + 'WARNING(DatabaseOptions) check_and_trigger:' + str(e) + ) + continue + + # for all the options that are not database-wide, we look for their + # values specific to column families + col_fam_options_dict = {} + for col_fam in self.column_families: + present = True + for ix in incomplete_option_ix: + option = cond.options[ix] + if col_fam not in reqd_options_dict[option]: + present = False + break + options[ix] = reqd_options_dict[option][col_fam] + if present: + try: + if eval(cond.eval_expr): + col_fam_options_dict[col_fam] = ( + copy.deepcopy(options) + ) + except Exception as e: + print( + 'WARNING(DatabaseOptions) check_and_trigger: ' + + str(e) + ) + # Trigger for an OptionCondition object is of the form: + # Dict[col_fam_name: List[option_value]] + # where col_fam_name is the name of a column family for which + # 'eval_expr' evaluated to True and List[option_value] is the list + # of values of the options specified in the condition's 'options' + # field + if col_fam_options_dict: + cond.set_trigger(col_fam_options_dict) diff --git a/src/rocksdb/tools/advisor/advisor/db_stats_fetcher.py b/src/rocksdb/tools/advisor/advisor/db_stats_fetcher.py new file mode 100755 index 000000000..cf497cf1f --- /dev/null +++ b/src/rocksdb/tools/advisor/advisor/db_stats_fetcher.py @@ -0,0 +1,338 @@ +# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). + +from advisor.db_log_parser import Log +from advisor.db_timeseries_parser import TimeSeriesData, NO_ENTITY +import copy +import glob +import re +import subprocess +import time + + +class LogStatsParser(TimeSeriesData): + STATS = 'STATISTICS:' + + @staticmethod + def parse_log_line_for_stats(log_line): + # Example stat line (from LOG file): + # "rocksdb.db.get.micros P50 : 8.4 P95 : 21.8 P99 : 33.9 P100 : 92.0\n" + token_list = log_line.strip().split() + # token_list = ['rocksdb.db.get.micros', 'P50', ':', '8.4', 'P95', ':', + # '21.8', 'P99', ':', '33.9', 'P100', ':', '92.0'] + stat_prefix = token_list[0] + '.' # 'rocksdb.db.get.micros.' 
+ stat_values = [ + token + for token in token_list[1:] + if token != ':' + ] + # stat_values = ['P50', '8.4', 'P95', '21.8', 'P99', '33.9', 'P100', + # '92.0'] + stat_dict = {} + for ix, metric in enumerate(stat_values): + if ix % 2 == 0: + stat_name = stat_prefix + metric + stat_name = stat_name.lower() # Note: case insensitive names + else: + stat_dict[stat_name] = float(metric) + # stat_dict = {'rocksdb.db.get.micros.p50': 8.4, + # 'rocksdb.db.get.micros.p95': 21.8, 'rocksdb.db.get.micros.p99': 33.9, + # 'rocksdb.db.get.micros.p100': 92.0} + return stat_dict + + def __init__(self, logs_path_prefix, stats_freq_sec): + super().__init__() + self.logs_file_prefix = logs_path_prefix + self.stats_freq_sec = stats_freq_sec + self.duration_sec = 60 + + def get_keys_from_conditions(self, conditions): + # Note: case insensitive stat names + reqd_stats = [] + for cond in conditions: + for key in cond.keys: + key = key.lower() + # some keys are prepended with '[]' for OdsStatsFetcher to + # replace this with the appropriate key_prefix, remove these + # characters here since the LogStatsParser does not need + # a prefix + if key.startswith('[]'): + reqd_stats.append(key[2:]) + else: + reqd_stats.append(key) + return reqd_stats + + def add_to_timeseries(self, log, reqd_stats): + # this method takes in the Log object that contains the Rocksdb stats + # and a list of required stats, then it parses the stats line by line + # to fetch required stats and add them to the keys_ts object + # Example: reqd_stats = ['rocksdb.block.cache.hit.count', + # 'rocksdb.db.get.micros.p99'] + # Let log.get_message() returns following string: + # "[WARN] [db/db_impl.cc:485] STATISTICS:\n + # rocksdb.block.cache.miss COUNT : 1459\n + # rocksdb.block.cache.hit COUNT : 37\n + # ... + # rocksdb.db.get.micros P50 : 15.6 P95 : 39.7 P99 : 62.6 P100 : 148.0\n + # ..." + new_lines = log.get_message().split('\n') + # let log_ts = 1532518219 + log_ts = log.get_timestamp() + # example updates to keys_ts: + # keys_ts[NO_ENTITY]['rocksdb.db.get.micros.p99'][1532518219] = 62.6 + # keys_ts[NO_ENTITY]['rocksdb.block.cache.hit.count'][1532518219] = 37 + for line in new_lines[1:]: # new_lines[0] does not contain any stats + stats_on_line = self.parse_log_line_for_stats(line) + for stat in stats_on_line: + if stat in reqd_stats: + if stat not in self.keys_ts[NO_ENTITY]: + self.keys_ts[NO_ENTITY][stat] = {} + self.keys_ts[NO_ENTITY][stat][log_ts] = stats_on_line[stat] + + def fetch_timeseries(self, reqd_stats): + # this method parses the Rocksdb LOG file and generates timeseries for + # each of the statistic in the list reqd_stats + self.keys_ts = {NO_ENTITY: {}} + for file_name in glob.glob(self.logs_file_prefix + '*'): + # TODO(poojam23): find a way to distinguish between 'old' log files + # from current and previous experiments, present in the same + # directory + if re.search('old', file_name, re.IGNORECASE): + continue + with open(file_name, 'r') as db_logs: + new_log = None + for line in db_logs: + if Log.is_new_log(line): + if ( + new_log and + re.search(self.STATS, new_log.get_message()) + ): + self.add_to_timeseries(new_log, reqd_stats) + new_log = Log(line, column_families=[]) + else: + # To account for logs split into multiple lines + new_log.append_message(line) + # Check for the last log in the file. 
+ if new_log and re.search(self.STATS, new_log.get_message()): + self.add_to_timeseries(new_log, reqd_stats) + + +class DatabasePerfContext(TimeSeriesData): + # TODO(poojam23): check if any benchrunner provides PerfContext sampled at + # regular intervals + def __init__(self, perf_context_ts, stats_freq_sec, cumulative): + ''' + perf_context_ts is expected to be in the following format: + Dict[metric, Dict[timestamp, value]], where for + each (metric, timestamp) pair, the value is database-wide (i.e. + summed over all the threads involved) + if stats_freq_sec == 0, per-metric only one value is reported + ''' + super().__init__() + self.stats_freq_sec = stats_freq_sec + self.keys_ts = {NO_ENTITY: perf_context_ts} + if cumulative: + self.unaccumulate_metrics() + + def unaccumulate_metrics(self): + # if the perf context metrics provided are cumulative in nature, this + # method can be used to convert them to a disjoint format + epoch_ts = copy.deepcopy(self.keys_ts) + for stat in self.keys_ts[NO_ENTITY]: + timeseries = sorted( + list(self.keys_ts[NO_ENTITY][stat].keys()), reverse=True + ) + if len(timeseries) < 2: + continue + for ix, ts in enumerate(timeseries[:-1]): + epoch_ts[NO_ENTITY][stat][ts] = ( + epoch_ts[NO_ENTITY][stat][ts] - + epoch_ts[NO_ENTITY][stat][timeseries[ix+1]] + ) + if epoch_ts[NO_ENTITY][stat][ts] < 0: + raise ValueError('DBPerfContext: really cumulative?') + # drop the smallest timestamp in the timeseries for this metric + epoch_ts[NO_ENTITY][stat].pop(timeseries[-1]) + self.keys_ts = epoch_ts + + def get_keys_from_conditions(self, conditions): + reqd_stats = [] + for cond in conditions: + reqd_stats.extend([key.lower() for key in cond.keys]) + return reqd_stats + + def fetch_timeseries(self, statistics): + # this method is redundant for DatabasePerfContext because the __init__ + # does the job of populating 'keys_ts' + pass + + +class OdsStatsFetcher(TimeSeriesData): + # class constants + OUTPUT_FILE = 'temp/stats_out.tmp' + ERROR_FILE = 'temp/stats_err.tmp' + RAPIDO_COMMAND = "%s --entity=%s --key=%s --tstart=%s --tend=%s --showtime" + + # static methods + @staticmethod + def _get_string_in_quotes(value): + return '"' + str(value) + '"' + + @staticmethod + def _get_time_value_pair(pair_string): + # example pair_string: '[1532544591, 97.3653601828]' + pair_string = pair_string.replace('[', '') + pair_string = pair_string.replace(']', '') + pair = pair_string.split(',') + first = int(pair[0].strip()) + second = float(pair[1].strip()) + return [first, second] + + @staticmethod + def _get_ods_cli_stime(start_time): + diff = int(time.time() - int(start_time)) + stime = str(diff) + '_s' + return stime + + def __init__( + self, client, entities, start_time, end_time, key_prefix=None + ): + super().__init__() + self.client = client + self.entities = entities + self.start_time = start_time + self.end_time = end_time + self.key_prefix = key_prefix + self.stats_freq_sec = 60 + self.duration_sec = 60 + + def execute_script(self, command): + print('executing...') + print(command) + out_file = open(self.OUTPUT_FILE, "w+") + err_file = open(self.ERROR_FILE, "w+") + subprocess.call(command, shell=True, stdout=out_file, stderr=err_file) + out_file.close() + err_file.close() + + def parse_rapido_output(self): + # Output looks like the following: + # <entity_name>\t<key_name>\t[[ts, value], [ts, value], ...] 
+ # ts = timestamp; value = value of key_name in entity_name at time ts + self.keys_ts = {} + with open(self.OUTPUT_FILE, 'r') as fp: + for line in fp: + token_list = line.strip().split('\t') + entity = token_list[0] + key = token_list[1] + if entity not in self.keys_ts: + self.keys_ts[entity] = {} + if key not in self.keys_ts[entity]: + self.keys_ts[entity][key] = {} + list_of_lists = [ + self._get_time_value_pair(pair_string) + for pair_string in token_list[2].split('],') + ] + value = {pair[0]: pair[1] for pair in list_of_lists} + self.keys_ts[entity][key] = value + + def parse_ods_output(self): + # Output looks like the following: + # <entity_name>\t<key_name>\t<timestamp>\t<value> + # there is one line per (entity_name, key_name, timestamp) + self.keys_ts = {} + with open(self.OUTPUT_FILE, 'r') as fp: + for line in fp: + token_list = line.split() + entity = token_list[0] + if entity not in self.keys_ts: + self.keys_ts[entity] = {} + key = token_list[1] + if key not in self.keys_ts[entity]: + self.keys_ts[entity][key] = {} + self.keys_ts[entity][key][token_list[2]] = token_list[3] + + def fetch_timeseries(self, statistics): + # this method fetches the timeseries of required stats from the ODS + # service and populates the 'keys_ts' object appropriately + print('OdsStatsFetcher: fetching ' + str(statistics)) + if re.search('rapido', self.client, re.IGNORECASE): + command = self.RAPIDO_COMMAND % ( + self.client, + self._get_string_in_quotes(self.entities), + self._get_string_in_quotes(','.join(statistics)), + self._get_string_in_quotes(self.start_time), + self._get_string_in_quotes(self.end_time) + ) + # Run the tool and fetch the time-series data + self.execute_script(command) + # Parse output and populate the 'keys_ts' map + self.parse_rapido_output() + elif re.search('ods', self.client, re.IGNORECASE): + command = ( + self.client + ' ' + + '--stime=' + self._get_ods_cli_stime(self.start_time) + ' ' + + self._get_string_in_quotes(self.entities) + ' ' + + self._get_string_in_quotes(','.join(statistics)) + ) + # Run the tool and fetch the time-series data + self.execute_script(command) + # Parse output and populate the 'keys_ts' map + self.parse_ods_output() + + def get_keys_from_conditions(self, conditions): + reqd_stats = [] + for cond in conditions: + for key in cond.keys: + use_prefix = False + if key.startswith('[]'): + use_prefix = True + key = key[2:] + # TODO(poojam23): this is very hacky and needs to be improved + if key.startswith("rocksdb"): + key += ".60" + if use_prefix: + if not self.key_prefix: + print('Warning: OdsStatsFetcher might need key prefix') + print('for the key: ' + key) + else: + key = self.key_prefix + "." 
+ key + reqd_stats.append(key) + return reqd_stats + + def fetch_rate_url(self, entities, keys, window_len, percent, display): + # type: (List[str], List[str], str, str, bool) -> str + transform_desc = ( + "rate(" + str(window_len) + ",duration=" + str(self.duration_sec) + ) + if percent: + transform_desc = transform_desc + ",%)" + else: + transform_desc = transform_desc + ")" + if re.search('rapido', self.client, re.IGNORECASE): + command = self.RAPIDO_COMMAND + " --transform=%s --url=%s" + command = command % ( + self.client, + self._get_string_in_quotes(','.join(entities)), + self._get_string_in_quotes(','.join(keys)), + self._get_string_in_quotes(self.start_time), + self._get_string_in_quotes(self.end_time), + self._get_string_in_quotes(transform_desc), + self._get_string_in_quotes(display) + ) + elif re.search('ods', self.client, re.IGNORECASE): + command = ( + self.client + ' ' + + '--stime=' + self._get_ods_cli_stime(self.start_time) + ' ' + + '--fburlonly ' + + self._get_string_in_quotes(entities) + ' ' + + self._get_string_in_quotes(','.join(keys)) + ' ' + + self._get_string_in_quotes(transform_desc) + ) + self.execute_script(command) + url = "" + with open(self.OUTPUT_FILE, 'r') as fp: + url = fp.readline() + return url diff --git a/src/rocksdb/tools/advisor/advisor/db_timeseries_parser.py b/src/rocksdb/tools/advisor/advisor/db_timeseries_parser.py new file mode 100644 index 000000000..308eb139a --- /dev/null +++ b/src/rocksdb/tools/advisor/advisor/db_timeseries_parser.py @@ -0,0 +1,208 @@ +# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). + +from abc import abstractmethod +from advisor.db_log_parser import DataSource +from enum import Enum +import math + + +NO_ENTITY = 'ENTITY_PLACEHOLDER' + + +class TimeSeriesData(DataSource): + class Behavior(Enum): + bursty = 1 + evaluate_expression = 2 + + class AggregationOperator(Enum): + avg = 1 + max = 2 + min = 3 + latest = 4 + oldest = 5 + + def __init__(self): + super().__init__(DataSource.Type.TIME_SERIES) + self.keys_ts = None # Dict[entity, Dict[key, Dict[timestamp, value]]] + self.stats_freq_sec = None + + @abstractmethod + def get_keys_from_conditions(self, conditions): + # This method takes in a list of time-series conditions; for each + # condition it manipulates the 'keys' in the way that is supported by + # the subclass implementing this method + pass + + @abstractmethod + def fetch_timeseries(self, required_statistics): + # this method takes in a list of statistics and fetches the timeseries + # for each of them and populates the 'keys_ts' dictionary + pass + + def fetch_burst_epochs( + self, entities, statistic, window_sec, threshold, percent + ): + # type: (str, int, float, bool) -> Dict[str, Dict[int, float]] + # this method calculates the (percent) rate change in the 'statistic' + # for each entity (over 'window_sec' seconds) and returns the epochs + # where this rate change is greater than or equal to the 'threshold' + # value + if self.stats_freq_sec == 0: + # not time series data, cannot check for bursty behavior + return + if window_sec < self.stats_freq_sec: + window_sec = self.stats_freq_sec + # 'window_samples' is the number of windows to go back to + # compare the current window with, while calculating rate change. 
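To make the windowing concrete (illustrative numbers only): with one STATS sample per minute and a five-minute burst window, each point is compared with the point five samples back, exactly as computed in the method body that follows.

```python
import math

stats_freq_sec = 60   # one sample per minute
window_sec = 300      # condition asks for a 5-minute burst window
window_samples = math.ceil(window_sec / stats_freq_sec)
print(window_samples)  # 5 -> compare each value with the one 5 samples earlier
```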
+ window_samples = math.ceil(window_sec / self.stats_freq_sec) + burst_epochs = {} + # if percent = False: + # curr_val = value at window for which rate change is being calculated + # prev_val = value at window that is window_samples behind curr_window + # Then rate_without_percent = + # ((curr_val-prev_val)*duration_sec)/(curr_timestamp-prev_timestamp) + # if percent = True: + # rate_with_percent = (rate_without_percent * 100) / prev_val + # These calculations are in line with the rate() transform supported + # by ODS + for entity in entities: + if statistic not in self.keys_ts[entity]: + continue + timestamps = sorted(list(self.keys_ts[entity][statistic].keys())) + for ix in range(window_samples, len(timestamps), 1): + first_ts = timestamps[ix - window_samples] + last_ts = timestamps[ix] + first_val = self.keys_ts[entity][statistic][first_ts] + last_val = self.keys_ts[entity][statistic][last_ts] + diff = last_val - first_val + if percent: + diff = diff * 100 / first_val + rate = (diff * self.duration_sec) / (last_ts - first_ts) + # if the rate change is greater than the provided threshold, + # then the condition is triggered for entity at time 'last_ts' + if rate >= threshold: + if entity not in burst_epochs: + burst_epochs[entity] = {} + burst_epochs[entity][last_ts] = rate + return burst_epochs + + def fetch_aggregated_values(self, entity, statistics, aggregation_op): + # type: (str, AggregationOperator) -> Dict[str, float] + # this method performs the aggregation specified by 'aggregation_op' + # on the timeseries of 'statistics' for 'entity' and returns: + # Dict[statistic, aggregated_value] + result = {} + for stat in statistics: + if stat not in self.keys_ts[entity]: + continue + agg_val = None + if aggregation_op is self.AggregationOperator.latest: + latest_timestamp = max(list(self.keys_ts[entity][stat].keys())) + agg_val = self.keys_ts[entity][stat][latest_timestamp] + elif aggregation_op is self.AggregationOperator.oldest: + oldest_timestamp = min(list(self.keys_ts[entity][stat].keys())) + agg_val = self.keys_ts[entity][stat][oldest_timestamp] + elif aggregation_op is self.AggregationOperator.max: + agg_val = max(list(self.keys_ts[entity][stat].values())) + elif aggregation_op is self.AggregationOperator.min: + agg_val = min(list(self.keys_ts[entity][stat].values())) + elif aggregation_op is self.AggregationOperator.avg: + values = list(self.keys_ts[entity][stat].values()) + agg_val = sum(values) / len(values) + result[stat] = agg_val + return result + + def check_and_trigger_conditions(self, conditions): + # get the list of statistics that need to be fetched + reqd_keys = self.get_keys_from_conditions(conditions) + # fetch the required statistics and populate the map 'keys_ts' + self.fetch_timeseries(reqd_keys) + # Trigger the appropriate conditions + for cond in conditions: + complete_keys = self.get_keys_from_conditions([cond]) + # Get the entities that have all statistics required by 'cond': + # an entity is checked for a given condition only if we possess all + # of the condition's 'keys' for that entity + entities_with_stats = [] + for entity in self.keys_ts: + stat_missing = False + for stat in complete_keys: + if stat not in self.keys_ts[entity]: + stat_missing = True + break + if not stat_missing: + entities_with_stats.append(entity) + if not entities_with_stats: + continue + if cond.behavior is self.Behavior.bursty: + # for a condition that checks for bursty behavior, only one key + # should be present in the condition's 'keys' field + result = 
self.fetch_burst_epochs( + entities_with_stats, + complete_keys[0], # there should be only one key + cond.window_sec, + cond.rate_threshold, + True + ) + # Trigger in this case is: + # Dict[entity_name, Dict[timestamp, rate_change]] + # where the inner dictionary contains rate_change values when + # the rate_change >= threshold provided, with the + # corresponding timestamps + if result: + cond.set_trigger(result) + elif cond.behavior is self.Behavior.evaluate_expression: + self.handle_evaluate_expression( + cond, + complete_keys, + entities_with_stats + ) + + def handle_evaluate_expression(self, condition, statistics, entities): + trigger = {} + # check 'condition' for each of these entities + for entity in entities: + if hasattr(condition, 'aggregation_op'): + # in this case, the aggregation operation is performed on each + # of the condition's 'keys' and then with aggregated values + # condition's 'expression' is evaluated; if it evaluates to + # True, then list of the keys values is added to the + # condition's trigger: Dict[entity_name, List[stats]] + result = self.fetch_aggregated_values( + entity, statistics, condition.aggregation_op + ) + keys = [result[key] for key in statistics] + try: + if eval(condition.expression): + trigger[entity] = keys + except Exception as e: + print( + 'WARNING(TimeSeriesData) check_and_trigger: ' + str(e) + ) + else: + # assumption: all stats have same series of timestamps + # this is similar to the above but 'expression' is evaluated at + # each timestamp, since there is no aggregation, and all the + # epochs are added to the trigger when the condition's + # 'expression' evaluated to true; so trigger is: + # Dict[entity, Dict[timestamp, List[stats]]] + for epoch in self.keys_ts[entity][statistics[0]].keys(): + keys = [ + self.keys_ts[entity][key][epoch] + for key in statistics + ] + try: + if eval(condition.expression): + if entity not in trigger: + trigger[entity] = {} + trigger[entity][epoch] = keys + except Exception as e: + print( + 'WARNING(TimeSeriesData) check_and_trigger: ' + + str(e) + ) + if trigger: + condition.set_trigger(trigger) diff --git a/src/rocksdb/tools/advisor/advisor/ini_parser.py b/src/rocksdb/tools/advisor/advisor/ini_parser.py new file mode 100644 index 000000000..4776ef209 --- /dev/null +++ b/src/rocksdb/tools/advisor/advisor/ini_parser.py @@ -0,0 +1,76 @@ +# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). + +from enum import Enum + + +class IniParser: + class Element(Enum): + rule = 1 + cond = 2 + sugg = 3 + key_val = 4 + comment = 5 + + @staticmethod + def remove_trailing_comment(line): + line = line.strip() + comment_start = line.find('#') + if comment_start > -1: + return line[:comment_start] + return line + + @staticmethod + def is_section_header(line): + # A section header looks like: [Rule "my-new-rule"]. Essentially, + # a line that is in square-brackets. + line = line.strip() + if line.startswith('[') and line.endswith(']'): + return True + return False + + @staticmethod + def get_section_name(line): + # For a section header: [Rule "my-new-rule"], this method will return + # "my-new-rule". 
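As a quick illustration of the header handling (using a rule name that appears in the shipped rules.ini):

```python
from advisor.ini_parser import IniParser

header = '[Rule "stall-too-many-memtables"]'
IniParser.is_section_header(header)   # True
IniParser.get_section_name(header)    # 'stall-too-many-memtables'
IniParser.get_element(header)         # IniParser.Element.rule (see get_element below)
```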
+ token_list = line.strip()[1:-1].split('"') + if len(token_list) < 3: + error = 'needed section header: [<section_type> "<section_name>"]' + raise ValueError('Parsing error: ' + error + '\n' + line) + return token_list[1] + + @staticmethod + def get_element(line): + line = IniParser.remove_trailing_comment(line) + if not line: + return IniParser.Element.comment + if IniParser.is_section_header(line): + if line.strip()[1:-1].startswith('Suggestion'): + return IniParser.Element.sugg + if line.strip()[1:-1].startswith('Rule'): + return IniParser.Element.rule + if line.strip()[1:-1].startswith('Condition'): + return IniParser.Element.cond + if '=' in line: + return IniParser.Element.key_val + error = 'not a recognizable RulesSpec element' + raise ValueError('Parsing error: ' + error + '\n' + line) + + @staticmethod + def get_key_value_pair(line): + line = line.strip() + key = line.split('=')[0].strip() + value = "=".join(line.split('=')[1:]) + if value == "": # if the option has no value + return (key, None) + values = IniParser.get_list_from_value(value) + if len(values) == 1: + return (key, value) + return (key, values) + + @staticmethod + def get_list_from_value(value): + values = value.strip().split(':') + return values diff --git a/src/rocksdb/tools/advisor/advisor/rule_parser.py b/src/rocksdb/tools/advisor/advisor/rule_parser.py new file mode 100644 index 000000000..592218f4a --- /dev/null +++ b/src/rocksdb/tools/advisor/advisor/rule_parser.py @@ -0,0 +1,528 @@ +# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). + +from abc import ABC, abstractmethod +from advisor.db_log_parser import DataSource, NO_COL_FAMILY +from advisor.db_timeseries_parser import TimeSeriesData +from enum import Enum +from advisor.ini_parser import IniParser +import re + + +class Section(ABC): + def __init__(self, name): + self.name = name + + @abstractmethod + def set_parameter(self, key, value): + pass + + @abstractmethod + def perform_checks(self): + pass + + +class Rule(Section): + def __init__(self, name): + super().__init__(name) + self.conditions = None + self.suggestions = None + self.overlap_time_seconds = None + self.trigger_entities = None + self.trigger_column_families = None + + def set_parameter(self, key, value): + # If the Rule is associated with a single suggestion/condition, then + # value will be a string and not a list. Hence, convert it to a single + # element list before storing it in self.suggestions or + # self.conditions. 
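For example (values taken from the shipped rules.ini), a single-condition rule and a colon-separated suggestion list are normalized like this:

```python
from advisor.rule_parser import Rule

rule = Rule('stall-too-many-memtables')
rule.set_parameter('conditions', 'stall-too-many-memtables')
# a lone string is wrapped: rule.conditions == ['stall-too-many-memtables']
rule.set_parameter('suggestions', ['inc-bg-flush', 'inc-write-buffer'])
# an already-split list (from IniParser.get_list_from_value) is stored as-is
```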
+ if key == 'conditions': + if isinstance(value, str): + self.conditions = [value] + else: + self.conditions = value + elif key == 'suggestions': + if isinstance(value, str): + self.suggestions = [value] + else: + self.suggestions = value + elif key == 'overlap_time_period': + self.overlap_time_seconds = value + + def get_suggestions(self): + return self.suggestions + + def perform_checks(self): + if not self.conditions or len(self.conditions) < 1: + raise ValueError( + self.name + ': rule must have at least one condition' + ) + if not self.suggestions or len(self.suggestions) < 1: + raise ValueError( + self.name + ': rule must have at least one suggestion' + ) + if self.overlap_time_seconds: + if len(self.conditions) != 2: + raise ValueError( + self.name + ": rule must be associated with 2 conditions\ + in order to check for a time dependency between them" + ) + time_format = '^\d+[s|m|h|d]$' + if ( + not + re.match(time_format, self.overlap_time_seconds, re.IGNORECASE) + ): + raise ValueError( + self.name + ": overlap_time_seconds format: \d+[s|m|h|d]" + ) + else: # convert to seconds + in_seconds = int(self.overlap_time_seconds[:-1]) + if self.overlap_time_seconds[-1] == 'm': + in_seconds *= 60 + elif self.overlap_time_seconds[-1] == 'h': + in_seconds *= (60 * 60) + elif self.overlap_time_seconds[-1] == 'd': + in_seconds *= (24 * 60 * 60) + self.overlap_time_seconds = in_seconds + + def get_overlap_timestamps(self, key1_trigger_epochs, key2_trigger_epochs): + # this method takes in 2 timeseries i.e. timestamps at which the + # rule's 2 TIME_SERIES conditions were triggered and it finds + # (if present) the first pair of timestamps at which the 2 conditions + # were triggered within 'overlap_time_seconds' of each other + key1_lower_bounds = [ + epoch - self.overlap_time_seconds + for epoch in key1_trigger_epochs + ] + key1_lower_bounds.sort() + key2_trigger_epochs.sort() + trigger_ix = 0 + overlap_pair = None + for key1_lb in key1_lower_bounds: + while ( + key2_trigger_epochs[trigger_ix] < key1_lb and + trigger_ix < len(key2_trigger_epochs) + ): + trigger_ix += 1 + if trigger_ix >= len(key2_trigger_epochs): + break + if ( + key2_trigger_epochs[trigger_ix] <= + key1_lb + (2 * self.overlap_time_seconds) + ): + overlap_pair = ( + key2_trigger_epochs[trigger_ix], + key1_lb + self.overlap_time_seconds + ) + break + return overlap_pair + + def get_trigger_entities(self): + return self.trigger_entities + + def get_trigger_column_families(self): + return self.trigger_column_families + + def is_triggered(self, conditions_dict, column_families): + if self.overlap_time_seconds: + condition1 = conditions_dict[self.conditions[0]] + condition2 = conditions_dict[self.conditions[1]] + if not ( + condition1.get_data_source() is DataSource.Type.TIME_SERIES and + condition2.get_data_source() is DataSource.Type.TIME_SERIES + ): + raise ValueError(self.name + ': need 2 timeseries conditions') + + map1 = condition1.get_trigger() + map2 = condition2.get_trigger() + if not (map1 and map2): + return False + + self.trigger_entities = {} + is_triggered = False + entity_intersection = ( + set(map1.keys()).intersection(set(map2.keys())) + ) + for entity in entity_intersection: + overlap_timestamps_pair = ( + self.get_overlap_timestamps( + list(map1[entity].keys()), list(map2[entity].keys()) + ) + ) + if overlap_timestamps_pair: + self.trigger_entities[entity] = overlap_timestamps_pair + is_triggered = True + if is_triggered: + self.trigger_column_families = set(column_families) + return is_triggered + else: + 
all_conditions_triggered = True + self.trigger_column_families = set(column_families) + for cond_name in self.conditions: + cond = conditions_dict[cond_name] + if not cond.get_trigger(): + all_conditions_triggered = False + break + if ( + cond.get_data_source() is DataSource.Type.LOG or + cond.get_data_source() is DataSource.Type.DB_OPTIONS + ): + cond_col_fam = set(cond.get_trigger().keys()) + if NO_COL_FAMILY in cond_col_fam: + cond_col_fam = set(column_families) + self.trigger_column_families = ( + self.trigger_column_families.intersection(cond_col_fam) + ) + elif cond.get_data_source() is DataSource.Type.TIME_SERIES: + cond_entities = set(cond.get_trigger().keys()) + if self.trigger_entities is None: + self.trigger_entities = cond_entities + else: + self.trigger_entities = ( + self.trigger_entities.intersection(cond_entities) + ) + if not (self.trigger_entities or self.trigger_column_families): + all_conditions_triggered = False + break + if not all_conditions_triggered: # clean up if rule not triggered + self.trigger_column_families = None + self.trigger_entities = None + return all_conditions_triggered + + def __repr__(self): + # Append conditions + rule_string = "Rule: " + self.name + " has conditions:: " + is_first = True + for cond in self.conditions: + if is_first: + rule_string += cond + is_first = False + else: + rule_string += (" AND " + cond) + # Append suggestions + rule_string += "\nsuggestions:: " + is_first = True + for sugg in self.suggestions: + if is_first: + rule_string += sugg + is_first = False + else: + rule_string += (", " + sugg) + if self.trigger_entities: + rule_string += (', entities:: ' + str(self.trigger_entities)) + if self.trigger_column_families: + rule_string += (', col_fam:: ' + str(self.trigger_column_families)) + # Return constructed string + return rule_string + + +class Suggestion(Section): + class Action(Enum): + set = 1 + increase = 2 + decrease = 3 + + def __init__(self, name): + super().__init__(name) + self.option = None + self.action = None + self.suggested_values = None + self.description = None + + def set_parameter(self, key, value): + if key == 'option': + # Note: + # case 1: 'option' is supported by Rocksdb OPTIONS file; in this + # case the option belongs to one of the sections in the config + # file and it's name is prefixed by "<section_type>." + # case 2: 'option' is not supported by Rocksdb OPTIONS file; the + # option is not expected to have the character '.' 
in its name + self.option = value + elif key == 'action': + if self.option and not value: + raise ValueError(self.name + ': provide action for option') + self.action = self.Action[value] + elif key == 'suggested_values': + if isinstance(value, str): + self.suggested_values = [value] + else: + self.suggested_values = value + elif key == 'description': + self.description = value + + def perform_checks(self): + if not self.description: + if not self.option: + raise ValueError(self.name + ': provide option or description') + if not self.action: + raise ValueError(self.name + ': provide action for option') + if self.action is self.Action.set and not self.suggested_values: + raise ValueError( + self.name + ': provide suggested value for option' + ) + + def __repr__(self): + sugg_string = "Suggestion: " + self.name + if self.description: + sugg_string += (' description : ' + self.description) + else: + sugg_string += ( + ' option : ' + self.option + ' action : ' + self.action.name + ) + if self.suggested_values: + sugg_string += ( + ' suggested_values : ' + str(self.suggested_values) + ) + return sugg_string + + +class Condition(Section): + def __init__(self, name): + super().__init__(name) + self.data_source = None + self.trigger = None + + def perform_checks(self): + if not self.data_source: + raise ValueError(self.name + ': condition not tied to data source') + + def set_data_source(self, data_source): + self.data_source = data_source + + def get_data_source(self): + return self.data_source + + def reset_trigger(self): + self.trigger = None + + def set_trigger(self, condition_trigger): + self.trigger = condition_trigger + + def get_trigger(self): + return self.trigger + + def is_triggered(self): + if self.trigger: + return True + return False + + def set_parameter(self, key, value): + # must be defined by the subclass + raise NotImplementedError(self.name + ': provide source for condition') + + +class LogCondition(Condition): + @classmethod + def create(cls, base_condition): + base_condition.set_data_source(DataSource.Type['LOG']) + base_condition.__class__ = cls + return base_condition + + def set_parameter(self, key, value): + if key == 'regex': + self.regex = value + + def perform_checks(self): + super().perform_checks() + if not self.regex: + raise ValueError(self.name + ': provide regex for log condition') + + def __repr__(self): + log_cond_str = "LogCondition: " + self.name + log_cond_str += (" regex: " + self.regex) + # if self.trigger: + # log_cond_str += (" trigger: " + str(self.trigger)) + return log_cond_str + + +class OptionCondition(Condition): + @classmethod + def create(cls, base_condition): + base_condition.set_data_source(DataSource.Type['DB_OPTIONS']) + base_condition.__class__ = cls + return base_condition + + def set_parameter(self, key, value): + if key == 'options': + if isinstance(value, str): + self.options = [value] + else: + self.options = value + elif key == 'evaluate': + self.eval_expr = value + + def perform_checks(self): + super().perform_checks() + if not self.options: + raise ValueError(self.name + ': options missing in condition') + if not self.eval_expr: + raise ValueError(self.name + ': expression missing in condition') + + def __repr__(self): + opt_cond_str = "OptionCondition: " + self.name + opt_cond_str += (" options: " + str(self.options)) + opt_cond_str += (" expression: " + self.eval_expr) + if self.trigger: + opt_cond_str += (" trigger: " + str(self.trigger)) + return opt_cond_str + + +class TimeSeriesCondition(Condition): + @classmethod + def 
create(cls, base_condition): + base_condition.set_data_source(DataSource.Type['TIME_SERIES']) + base_condition.__class__ = cls + return base_condition + + def set_parameter(self, key, value): + if key == 'keys': + if isinstance(value, str): + self.keys = [value] + else: + self.keys = value + elif key == 'behavior': + self.behavior = TimeSeriesData.Behavior[value] + elif key == 'rate_threshold': + self.rate_threshold = float(value) + elif key == 'window_sec': + self.window_sec = int(value) + elif key == 'evaluate': + self.expression = value + elif key == 'aggregation_op': + self.aggregation_op = TimeSeriesData.AggregationOperator[value] + + def perform_checks(self): + if not self.keys: + raise ValueError(self.name + ': specify timeseries key') + if not self.behavior: + raise ValueError(self.name + ': specify triggering behavior') + if self.behavior is TimeSeriesData.Behavior.bursty: + if not self.rate_threshold: + raise ValueError(self.name + ': specify rate burst threshold') + if not self.window_sec: + self.window_sec = 300 # default window length is 5 minutes + if len(self.keys) > 1: + raise ValueError(self.name + ': specify only one key') + elif self.behavior is TimeSeriesData.Behavior.evaluate_expression: + if not (self.expression): + raise ValueError(self.name + ': specify evaluation expression') + else: + raise ValueError(self.name + ': trigger behavior not supported') + + def __repr__(self): + ts_cond_str = "TimeSeriesCondition: " + self.name + ts_cond_str += (" statistics: " + str(self.keys)) + ts_cond_str += (" behavior: " + self.behavior.name) + if self.behavior is TimeSeriesData.Behavior.bursty: + ts_cond_str += (" rate_threshold: " + str(self.rate_threshold)) + ts_cond_str += (" window_sec: " + str(self.window_sec)) + if self.behavior is TimeSeriesData.Behavior.evaluate_expression: + ts_cond_str += (" expression: " + self.expression) + if hasattr(self, 'aggregation_op'): + ts_cond_str += (" aggregation_op: " + self.aggregation_op.name) + if self.trigger: + ts_cond_str += (" trigger: " + str(self.trigger)) + return ts_cond_str + + +class RulesSpec: + def __init__(self, rules_path): + self.file_path = rules_path + + def initialise_fields(self): + self.rules_dict = {} + self.conditions_dict = {} + self.suggestions_dict = {} + + def perform_section_checks(self): + for rule in self.rules_dict.values(): + rule.perform_checks() + for cond in self.conditions_dict.values(): + cond.perform_checks() + for sugg in self.suggestions_dict.values(): + sugg.perform_checks() + + def load_rules_from_spec(self): + self.initialise_fields() + with open(self.file_path, 'r') as db_rules: + curr_section = None + for line in db_rules: + line = IniParser.remove_trailing_comment(line) + if not line: + continue + element = IniParser.get_element(line) + if element is IniParser.Element.comment: + continue + elif element is not IniParser.Element.key_val: + curr_section = element # it's a new IniParser header + section_name = IniParser.get_section_name(line) + if element is IniParser.Element.rule: + new_rule = Rule(section_name) + self.rules_dict[section_name] = new_rule + elif element is IniParser.Element.cond: + new_cond = Condition(section_name) + self.conditions_dict[section_name] = new_cond + elif element is IniParser.Element.sugg: + new_suggestion = Suggestion(section_name) + self.suggestions_dict[section_name] = new_suggestion + elif element is IniParser.Element.key_val: + key, value = IniParser.get_key_value_pair(line) + if curr_section is IniParser.Element.rule: + new_rule.set_parameter(key, value) + 
elif curr_section is IniParser.Element.cond: + if key == 'source': + if value == 'LOG': + new_cond = LogCondition.create(new_cond) + elif value == 'OPTIONS': + new_cond = OptionCondition.create(new_cond) + elif value == 'TIME_SERIES': + new_cond = TimeSeriesCondition.create(new_cond) + else: + new_cond.set_parameter(key, value) + elif curr_section is IniParser.Element.sugg: + new_suggestion.set_parameter(key, value) + + def get_rules_dict(self): + return self.rules_dict + + def get_conditions_dict(self): + return self.conditions_dict + + def get_suggestions_dict(self): + return self.suggestions_dict + + def get_triggered_rules(self, data_sources, column_families): + self.trigger_conditions(data_sources) + triggered_rules = [] + for rule in self.rules_dict.values(): + if rule.is_triggered(self.conditions_dict, column_families): + triggered_rules.append(rule) + return triggered_rules + + def trigger_conditions(self, data_sources): + for source_type in data_sources: + cond_subset = [ + cond + for cond in self.conditions_dict.values() + if cond.get_data_source() is source_type + ] + if not cond_subset: + continue + for source in data_sources[source_type]: + source.check_and_trigger_conditions(cond_subset) + + def print_rules(self, rules): + for rule in rules: + print('\nRule: ' + rule.name) + for cond_name in rule.conditions: + print(repr(self.conditions_dict[cond_name])) + for sugg_name in rule.suggestions: + print(repr(self.suggestions_dict[sugg_name])) + if rule.trigger_entities: + print('scope: entities:') + print(rule.trigger_entities) + if rule.trigger_column_families: + print('scope: col_fam:') + print(rule.trigger_column_families) diff --git a/src/rocksdb/tools/advisor/advisor/rule_parser_example.py b/src/rocksdb/tools/advisor/advisor/rule_parser_example.py new file mode 100644 index 000000000..d2348e5ae --- /dev/null +++ b/src/rocksdb/tools/advisor/advisor/rule_parser_example.py @@ -0,0 +1,89 @@ +# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). 
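Before the example driver below, a small sketch of how the `source` key turns a plain Condition into its typed variant, mirroring what load_rules_from_spec does above (the rule name is from rules.ini; the regex is abridged for the example):

```python
from advisor.rule_parser import Condition, LogCondition

cond = Condition('stall-too-many-memtables')
cond = LogCondition.create(cond)   # equivalent of 'source=LOG'
cond.set_parameter(
    'regex', r'Stopping writes because we have \d+ immutable memtables'
)
cond.perform_checks()              # passes: data source and regex are both set
```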
+ +from advisor.rule_parser import RulesSpec +from advisor.db_log_parser import DatabaseLogs, DataSource +from advisor.db_options_parser import DatabaseOptions +from advisor.db_stats_fetcher import LogStatsParser, OdsStatsFetcher +import argparse + + +def main(args): + # initialise the RulesSpec parser + rule_spec_parser = RulesSpec(args.rules_spec) + rule_spec_parser.load_rules_from_spec() + rule_spec_parser.perform_section_checks() + # initialize the DatabaseOptions object + db_options = DatabaseOptions(args.rocksdb_options) + # Create DatabaseLogs object + db_logs = DatabaseLogs( + args.log_files_path_prefix, db_options.get_column_families() + ) + # Create the Log STATS object + db_log_stats = LogStatsParser( + args.log_files_path_prefix, args.stats_dump_period_sec + ) + data_sources = { + DataSource.Type.DB_OPTIONS: [db_options], + DataSource.Type.LOG: [db_logs], + DataSource.Type.TIME_SERIES: [db_log_stats] + } + if args.ods_client: + data_sources[DataSource.Type.TIME_SERIES].append(OdsStatsFetcher( + args.ods_client, + args.ods_entity, + args.ods_tstart, + args.ods_tend, + args.ods_key_prefix + )) + triggered_rules = rule_spec_parser.get_triggered_rules( + data_sources, db_options.get_column_families() + ) + rule_spec_parser.print_rules(triggered_rules) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='Use this script to get\ + suggestions for improving Rocksdb performance.') + parser.add_argument( + '--rules_spec', required=True, type=str, + help='path of the file containing the expert-specified Rules' + ) + parser.add_argument( + '--rocksdb_options', required=True, type=str, + help='path of the starting Rocksdb OPTIONS file' + ) + parser.add_argument( + '--log_files_path_prefix', required=True, type=str, + help='path prefix of the Rocksdb LOG files' + ) + parser.add_argument( + '--stats_dump_period_sec', required=True, type=int, + help='the frequency (in seconds) at which STATISTICS are printed to ' + + 'the Rocksdb LOG file' + ) + # ODS arguments + parser.add_argument( + '--ods_client', type=str, help='the ODS client binary' + ) + parser.add_argument( + '--ods_entity', type=str, + help='the servers for which the ODS stats need to be fetched' + ) + parser.add_argument( + '--ods_key_prefix', type=str, + help='the prefix that needs to be attached to the keys of time ' + + 'series to be fetched from ODS' + ) + parser.add_argument( + '--ods_tstart', type=int, + help='start time of timeseries to be fetched from ODS' + ) + parser.add_argument( + '--ods_tend', type=int, + help='end time of timeseries to be fetched from ODS' + ) + args = parser.parse_args() + main(args) diff --git a/src/rocksdb/tools/advisor/advisor/rules.ini b/src/rocksdb/tools/advisor/advisor/rules.ini new file mode 100644 index 000000000..ec7a07e60 --- /dev/null +++ b/src/rocksdb/tools/advisor/advisor/rules.ini @@ -0,0 +1,214 @@ +# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). +# +# FORMAT: very similar to the Rocksdb ini file in terms of syntax +# (refer rocksdb/examples/rocksdb_option_file_example.ini) +# +# The Rules INI file is made up of multiple sections and each section is made +# up of multiple key-value pairs. The recognized section types are: +# Rule, Suggestion, Condition. Each section must have a name specified in "" +# in the section header. 
This name acts as an identifier in that section +# type's namespace. A section header looks like: +# [<section_type> "<section_name_identifier>"] +# +# There should be at least one Rule section in the file with its corresponding +# Condition and Suggestion sections. A Rule is triggered only when all of its +# conditions are triggered. The order in which a Rule's conditions and +# suggestions are specified has no significance. +# +# A Condition must be associated with a data source specified by the parameter +# 'source' and this must be the first parameter specified for the Condition. +# A condition can be associated with one or more Rules. +# +# A Suggestion is an advised change to a Rocksdb option to improve the +# performance of the database in some way. Every suggestion can be a part of +# one or more Rules. + +[Rule "stall-too-many-memtables"] +suggestions=inc-bg-flush:inc-write-buffer +conditions=stall-too-many-memtables + +[Condition "stall-too-many-memtables"] +source=LOG +regex=Stopping writes because we have \d+ immutable memtables \(waiting for flush\), max_write_buffer_number is set to \d+ + +[Rule "stall-too-many-L0"] +suggestions=inc-max-subcompactions:inc-max-bg-compactions:inc-write-buffer-size:dec-max-bytes-for-level-base:inc-l0-slowdown-writes-trigger +conditions=stall-too-many-L0 + +[Condition "stall-too-many-L0"] +source=LOG +regex=Stalling writes because we have \d+ level-0 files + +[Rule "stop-too-many-L0"] +suggestions=inc-max-bg-compactions:inc-write-buffer-size:inc-l0-stop-writes-trigger +conditions=stop-too-many-L0 + +[Condition "stop-too-many-L0"] +source=LOG +regex=Stopping writes because we have \d+ level-0 files + +[Rule "stall-too-many-compaction-bytes"] +suggestions=inc-max-bg-compactions:inc-write-buffer-size:inc-hard-pending-compaction-bytes-limit:inc-soft-pending-compaction-bytes-limit +conditions=stall-too-many-compaction-bytes + +[Condition "stall-too-many-compaction-bytes"] +source=LOG +regex=Stalling writes because of estimated pending compaction bytes \d+ + +[Suggestion "inc-bg-flush"] +option=DBOptions.max_background_flushes +action=increase +suggested_values=2 + +[Suggestion "inc-write-buffer"] +option=CFOptions.max_write_buffer_number +action=increase + +[Suggestion "inc-max-subcompactions"] +option=DBOptions.max_subcompactions +action=increase + +[Suggestion "inc-max-bg-compactions"] +option=DBOptions.max_background_compactions +action=increase +suggested_values=2 + +[Suggestion "inc-write-buffer-size"] +option=CFOptions.write_buffer_size +action=increase + +[Suggestion "dec-max-bytes-for-level-base"] +option=CFOptions.max_bytes_for_level_base +action=decrease + +[Suggestion "inc-l0-slowdown-writes-trigger"] +option=CFOptions.level0_slowdown_writes_trigger +action=increase + +[Suggestion "inc-l0-stop-writes-trigger"] +option=CFOptions.level0_stop_writes_trigger +action=increase + +[Suggestion "inc-hard-pending-compaction-bytes-limit"] +option=CFOptions.hard_pending_compaction_bytes_limit +action=increase + +[Suggestion "inc-soft-pending-compaction-bytes-limit"] +option=CFOptions.soft_pending_compaction_bytes_limit +action=increase + +[Rule "level0-level1-ratio"] +conditions=level0-level1-ratio +suggestions=inc-base-max-bytes + +[Condition "level0-level1-ratio"] +source=OPTIONS +options=CFOptions.level0_file_num_compaction_trigger:CFOptions.write_buffer_size:CFOptions.max_bytes_for_level_base +evaluate=int(options[0])*int(options[1])-int(options[2])>=1 # should evaluate to a boolean, condition triggered if evaluates to true + +[Suggestion 
"inc-base-max-bytes"] +option=CFOptions.max_bytes_for_level_base +action=increase + +[Rules "tuning-iostat-burst"] +conditions=large-db-get-p99 +suggestions=bytes-per-sync-non0:wal-bytes-per-sync-non0:set-rate-limiter +#overlap_time_period=10m + +[Condition "write-burst"] +source=TIME_SERIES +keys=dyno.flash_write_bytes_per_sec +behavior=bursty +window_sec=300 # the smaller this window, the more sensitivity to changes in the time series, so the rate_threshold should be bigger; when it's 60, then same as diff(%) +rate_threshold=20 + +[Condition "large-p99-read-latency"] +source=TIME_SERIES +keys=[]rocksdb.read.block.get.micros.p99 +behavior=bursty +window_sec=300 +rate_threshold=10 + +[Condition "large-db-get-p99"] +source=TIME_SERIES +keys=[]rocksdb.db.get.micros.p50:[]rocksdb.db.get.micros.p99 +behavior=evaluate_expression +evaluate=(keys[1]/keys[0])>5 + +[Suggestion "bytes-per-sync-non0"] +option=DBOptions.bytes_per_sync +action=set +suggested_values=1048576 + +[Suggestion "wal-bytes-per-sync-non0"] +option=DBOptions.wal_bytes_per_sync +action=set +suggested_values=1048576 + +[Suggestion "set-rate-limiter"] +option=rate_limiter_bytes_per_sec +action=set +suggested_values=1024000 + +[Rule "bloom-filter-percent-useful"] +conditions=bloom-filter-percent-useful +suggestions=inc-bloom-bits-per-key + +[Condition "bloom-filter-percent-useful"] +source=TIME_SERIES +keys=[]rocksdb.bloom.filter.useful.count:[]rocksdb.bloom.filter.full.positive.count:[]rocksdb.bloom.filter.full.true.positive.count +behavior=evaluate_expression +evaluate=((keys[0]+keys[2])/(keys[0]+keys[1]))<0.9 # should evaluate to a boolean +aggregation_op=latest + +[Rule "bloom-not-enabled"] +conditions=bloom-not-enabled +suggestions=inc-bloom-bits-per-key + +[Condition "bloom-not-enabled"] +source=TIME_SERIES +keys=[]rocksdb.bloom.filter.useful.count:[]rocksdb.bloom.filter.full.positive.count:[]rocksdb.bloom.filter.full.true.positive.count +behavior=evaluate_expression +evaluate=keys[0]+keys[1]+keys[2]==0 +aggregation_op=avg + +[Suggestion "inc-bloom-bits-per-key"] +option=bloom_bits +action=increase +suggested_values=2 + +[Rule "small-l0-files"] +conditions=small-l0-files +suggestions=dec-max-bytes-for-level-base:inc-write-buffer-size + +[Condition "small-l0-files"] +source=OPTIONS +options=CFOptions.max_bytes_for_level_base:CFOptions.level0_file_num_compaction_trigger:CFOptions.write_buffer_size +evaluate=int(options[0])>(10*int(options[1])*int(options[2])) + +[Rule "decompress-time-long"] +conditions=decompress-time-long +suggestions=dec-block-size:inc-block-cache-size:faster-compression-type + +[Condition "decompress-time-long"] +source=TIME_SERIES +keys=block_decompress_time:block_read_time:block_checksum_time +behavior=evaluate_expression +evaluate=(keys[0]/(keys[0]+keys[1]+keys[2]))>0.3 + +[Suggestion "dec-block-size"] +option=TableOptions.BlockBasedTable.block_size +action=decrease + +[Suggestion "inc-block-cache-size"] +option=cache_size +action=increase +suggested_values=16000000 + +[Suggestion "faster-compression-type"] +option=CFOptions.compression +action=set +suggested_values=kLZ4Compression diff --git a/src/rocksdb/tools/advisor/test/__init__.py b/src/rocksdb/tools/advisor/test/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/src/rocksdb/tools/advisor/test/__init__.py diff --git a/src/rocksdb/tools/advisor/test/input_files/LOG-0 b/src/rocksdb/tools/advisor/test/input_files/LOG-0 new file mode 100644 index 000000000..3c9d51641 --- /dev/null +++ 
b/src/rocksdb/tools/advisor/test/input_files/LOG-0 @@ -0,0 +1,30 @@ +2018/05/25-14:30:05.601692 7f82bd676200 RocksDB version: 5.14.0 +2018/05/25-14:30:07.626719 7f82ba72e700 (Original Log Time 2018/05/25-14:30:07.621966) [db/db_impl_compaction_flush.cc:1424] Calling FlushMemTableToOutputFile with column family [default], flush slots available 1, compaction slots available 1, flush slots scheduled 1, compaction slots scheduled 0 +2018/05/25-14:30:07.626725 7f82ba72e700 [db/flush_job.cc:301] [default] [JOB 3] Flushing memtable with next log file: 8 +2018/05/25-14:30:07.626738 7f82ba72e700 EVENT_LOG_v1 {"time_micros": 1527283807626732, "job": 3, "event": "flush_started", "num_memtables": 1, "num_entries": 28018, "num_deletes": 0, "memory_usage": 4065512, "flush_reason": "Write Buffer Full"} +2018/05/25-14:30:07.626740 7f82ba72e700 [db/flush_job.cc:331] [default] [JOB 3] Level-0 flush table #10: started +2018/05/25-14:30:07.764232 7f82b2f20700 [db/db_impl_write.cc:1373] [default] New memtable created with log file: #11. Immutable memtables: 1. +2018/05/25-14:30:07.764240 7f82b2f20700 [WARN] [db/column_family.cc:743] [default] Stopping writes because we have 2 immutable memtables (waiting for flush), max_write_buffer_number is set to 2 +2018/05/23-11:53:12.800143 7f9f36b40700 [WARN] [db/column_family.cc:799] [default] Stalling writes because we have 4 level-0 files rate 39886 +2018/05/23-11:53:12.800143 7f9f36b40700 [WARN] [db/column_family.cc:799] [default] Stopping writes because we have 4 level-0 files rate 39886 +2018/05/25-14:30:09.398302 7f82ba72e700 EVENT_LOG_v1 {"time_micros": 1527283809398276, "cf_name": "default", "job": 3, "event": "table_file_creation", "file_number": 10, "file_size": 1890434, "table_properties": {"data_size": 1876749, "index_size": 23346, "filter_size": 0, "raw_key_size": 663120, "raw_average_key_size": 24, "raw_value_size": 2763000, "raw_average_value_size": 100, "num_data_blocks": 838, "num_entries": 27630, "filter_policy_name": "", "kDeletedKeys": "0", "kMergeOperands": "0"}} +2018/05/25-14:30:09.398351 7f82ba72e700 [db/flush_job.cc:371] [default] [JOB 3] Level-0 flush table #10: 1890434 bytes OK +2018/05/25-14:30:25.491635 7f82ba72e700 [db/flush_job.cc:331] [default] [JOB 10] Level-0 flush table #23: started +2018/05/25-14:30:25.643618 7f82b2f20700 [db/db_impl_write.cc:1373] [default] New memtable created with log file: #24. Immutable memtables: 1. 
+2018/05/25-14:30:25.643633 7f82b2f20700 [WARN] [db/column_family.cc:743] [default] Stopping writes because we have 2 immutable memtables (waiting for flush), max_write_buffer_number is set to 2 +2018/05/25-14:30:27.288181 7f82ba72e700 EVENT_LOG_v1 {"time_micros": 1527283827288158, "cf_name": "default", "job": 10, "event": "table_file_creation", "file_number": 23, "file_size": 1893200, "table_properties": {"data_size": 1879460, "index_size": 23340, "filter_size": 0, "raw_key_size": 663360, "raw_average_key_size": 24, "raw_value_size": 2764000, "raw_average_value_size": 100, "num_data_blocks": 838, "num_entries": 27640, "filter_policy_name": "", "kDeletedKeys": "0", "kMergeOperands": "0"}} +2018/05/25-14:30:27.288210 7f82ba72e700 [db/flush_job.cc:371] [default] [JOB 10] Level-0 flush table #23: 1893200 bytes OK +2018/05/25-14:30:27.289353 7f82ba72e700 [WARN] [db/column_family.cc:764] [default] Stalling writes because of estimated pending compaction bytes 14410584 +2018/05/25-14:30:27.289390 7f82ba72e700 (Original Log Time 2018/05/25-14:30:27.288829) [db/memtable_list.cc:377] [default] Level-0 commit table #23 started +2018/05/25-14:30:27.289393 7f82ba72e700 (Original Log Time 2018/05/25-14:30:27.289332) [db/memtable_list.cc:409] [default] Level-0 commit table #23: memtable #1 done +2018/05/25-14:34:21.047206 7f82ba72e700 EVENT_LOG_v1 {"time_micros": 1527284061047181, "cf_name": "default", "job": 44, "event": "table_file_creation", "file_number": 84, "file_size": 1890780, "table_properties": {"data_size": 1877100, "index_size": 23309, "filter_size": 0, "raw_key_size": 662808, "raw_average_key_size": 24, "raw_value_size": 2761700, "raw_average_value_size": 100, "num_data_blocks": 837, "num_entries": 27617, "filter_policy_name": "", "kDeletedKeys": "0", "kMergeOperands": "0"}} +2018/05/25-14:34:21.047233 7f82ba72e700 [db/flush_job.cc:371] [default] [JOB 44] Level-0 flush table #84: 1890780 bytes OK +2018/05/25-14:34:21.048017 7f82ba72e700 (Original Log Time 2018/05/25-14:34:21.048005) EVENT_LOG_v1 {"time_micros": 1527284061047997, "job": 44, "event": "flush_finished", "output_compression": "Snappy", "lsm_state": [2, 1, 0, 0, 0, 0, 0], "immutable_memtables": 1} +2018/05/25-14:34:21.048592 7f82bd676200 [DEBUG] [db/db_impl_files.cc:261] [JOB 45] Delete /tmp/rocksdbtest-155919/dbbench/000084.sst type=2 #84 -- OK +2018/05/25-14:34:21.048603 7f82bd676200 EVENT_LOG_v1 {"time_micros": 1527284061048600, "job": 45, "event": "table_file_deletion", "file_number": 84} +2018/05/25-14:34:21.048981 7f82bd676200 [db/db_impl.cc:398] Shutdown complete +2018/05/25-14:34:21.049000 7f82bd676200 [db/db_impl.cc:563] [col-fam-A] random log message for testing +2018/05/25-14:34:21.049010 7f82bd676200 [db/db_impl.cc:234] [col-fam-B] log continuing on next line +remaining part of the log +2018/05/25-14:34:21.049020 7f82bd676200 [db/db_impl.cc:653] [col-fam-A] another random log message +2018/05/25-14:34:21.049025 7f82bd676200 [db/db_impl.cc:331] [unknown] random log message no column family diff --git a/src/rocksdb/tools/advisor/test/input_files/LOG-1 b/src/rocksdb/tools/advisor/test/input_files/LOG-1 new file mode 100644 index 000000000..b163f9a99 --- /dev/null +++ b/src/rocksdb/tools/advisor/test/input_files/LOG-1 @@ -0,0 +1,25 @@ +2018/05/25-14:30:05.601692 7f82bd676200 RocksDB version: 5.14.0 +2018/05/25-14:30:07.626719 7f82ba72e700 (Original Log Time 2018/05/25-14:30:07.621966) [db/db_impl_compaction_flush.cc:1424] Calling FlushMemTableToOutputFile with column family [default], flush slots available 1, compaction 
slots available 1, flush slots scheduled 1, compaction slots scheduled 0 +2018/05/25-14:30:07.626725 7f82ba72e700 [db/flush_job.cc:301] [default] [JOB 3] Flushing memtable with next log file: 8 +2018/05/25-14:30:07.626738 7f82ba72e700 EVENT_LOG_v1 {"time_micros": 1527283807626732, "job": 3, "event": "flush_started", "num_memtables": 1, "num_entries": 28018, "num_deletes": 0, "memory_usage": 4065512, "flush_reason": "Write Buffer Full"} +2018/05/25-14:30:07.626740 7f82ba72e700 [db/flush_job.cc:331] [default] [JOB 3] Level-0 flush table #10: started +2018/05/25-14:30:07.764232 7f82b2f20700 [db/db_impl_write.cc:1373] [default] New memtable created with log file: #11. Immutable memtables: 1. +2018/05/25-14:30:07.764240 7f82b2f20700 [WARN] [db/column_family.cc:743] [default] Stopping writes because we have 2 immutable memtables (waiting for flush), max_write_buffer_number is set to 2 +2018/05/23-11:53:12.800143 7f9f36b40700 [WARN] [db/column_family.cc:799] [default] Stalling writes because we have 4 level-0 files rate 39886 +2018/05/23-11:53:12.800143 7f9f36b40700 [WARN] [db/column_family.cc:799] [default] Stopping writes because we have 4 level-0 files rate 39886 +2018/05/25-14:30:09.398302 7f82ba72e700 EVENT_LOG_v1 {"time_micros": 1527283809398276, "cf_name": "default", "job": 3, "event": "table_file_creation", "file_number": 10, "file_size": 1890434, "table_properties": {"data_size": 1876749, "index_size": 23346, "filter_size": 0, "raw_key_size": 663120, "raw_average_key_size": 24, "raw_value_size": 2763000, "raw_average_value_size": 100, "num_data_blocks": 838, "num_entries": 27630, "filter_policy_name": "", "kDeletedKeys": "0", "kMergeOperands": "0"}} +2018/05/25-14:30:09.398351 7f82ba72e700 [db/flush_job.cc:371] [default] [JOB 3] Level-0 flush table #10: 1890434 bytes OK +2018/05/25-14:30:25.491635 7f82ba72e700 [db/flush_job.cc:331] [default] [JOB 10] Level-0 flush table #23: started +2018/05/25-14:30:25.643618 7f82b2f20700 [db/db_impl_write.cc:1373] [default] New memtable created with log file: #24. Immutable memtables: 1. 
+2018/05/25-14:30:25.643633 7f82b2f20700 [WARN] [db/column_family.cc:743] [default] Stopping writes because we have 2 immutable memtables (waiting for flush), max_write_buffer_number is set to 2 +2018/05/25-14:30:27.288181 7f82ba72e700 EVENT_LOG_v1 {"time_micros": 1527283827288158, "cf_name": "default", "job": 10, "event": "table_file_creation", "file_number": 23, "file_size": 1893200, "table_properties": {"data_size": 1879460, "index_size": 23340, "filter_size": 0, "raw_key_size": 663360, "raw_average_key_size": 24, "raw_value_size": 2764000, "raw_average_value_size": 100, "num_data_blocks": 838, "num_entries": 27640, "filter_policy_name": "", "kDeletedKeys": "0", "kMergeOperands": "0"}} +2018/05/25-14:30:27.288210 7f82ba72e700 [db/flush_job.cc:371] [default] [JOB 10] Level-0 flush table #23: 1893200 bytes OK +2018/05/25-14:30:27.289353 7f82ba72e700 [WARN] [db/column_family.cc:764] [default] Stopping writes because of estimated pending compaction bytes 14410584 +2018/05/25-14:30:27.289390 7f82ba72e700 (Original Log Time 2018/05/25-14:30:27.288829) [db/memtable_list.cc:377] [default] Level-0 commit table #23 started +2018/05/25-14:30:27.289393 7f82ba72e700 (Original Log Time 2018/05/25-14:30:27.289332) [db/memtable_list.cc:409] [default] Level-0 commit table #23: memtable #1 done +2018/05/25-14:34:21.047206 7f82ba72e700 EVENT_LOG_v1 {"time_micros": 1527284061047181, "cf_name": "default", "job": 44, "event": "table_file_creation", "file_number": 84, "file_size": 1890780, "table_properties": {"data_size": 1877100, "index_size": 23309, "filter_size": 0, "raw_key_size": 662808, "raw_average_key_size": 24, "raw_value_size": 2761700, "raw_average_value_size": 100, "num_data_blocks": 837, "num_entries": 27617, "filter_policy_name": "", "kDeletedKeys": "0", "kMergeOperands": "0"}} +2018/05/25-14:34:21.047233 7f82ba72e700 [db/flush_job.cc:371] [default] [JOB 44] Level-0 flush table #84: 1890780 bytes OK +2018/05/25-14:34:21.048017 7f82ba72e700 (Original Log Time 2018/05/25-14:34:21.048005) EVENT_LOG_v1 {"time_micros": 1527284061047997, "job": 44, "event": "flush_finished", "output_compression": "Snappy", "lsm_state": [2, 1, 0, 0, 0, 0, 0], "immutable_memtables": 1} +2018/05/25-14:34:21.048592 7f82bd676200 [DEBUG] [db/db_impl_files.cc:261] [JOB 45] Delete /tmp/rocksdbtest-155919/dbbench/000084.sst type=2 #84 -- OK +2018/05/25-14:34:21.048603 7f82bd676200 EVENT_LOG_v1 {"time_micros": 1527284061048600, "job": 45, "event": "table_file_deletion", "file_number": 84} +2018/05/25-14:34:21.048981 7f82bd676200 [db/db_impl.cc:398] Shutdown complete diff --git a/src/rocksdb/tools/advisor/test/input_files/OPTIONS-000005 b/src/rocksdb/tools/advisor/test/input_files/OPTIONS-000005 new file mode 100644 index 000000000..009edb04d --- /dev/null +++ b/src/rocksdb/tools/advisor/test/input_files/OPTIONS-000005 @@ -0,0 +1,49 @@ +# This is a RocksDB option file. 
+# +# For detailed file format spec, please refer to the example file +# in examples/rocksdb_option_file_example.ini +# + +[Version] + rocksdb_version=5.14.0 + options_file_version=1.1 + +[DBOptions] + manual_wal_flush=false + allow_ingest_behind=false + db_write_buffer_size=0 + db_log_dir= + random_access_max_buffer_size=1048576 + +[CFOptions "default"] + ttl=0 + max_bytes_for_level_base=268435456 + max_bytes_for_level_multiplier=10.000000 + level0_file_num_compaction_trigger=4 + level0_stop_writes_trigger=36 + write_buffer_size=4194000 + min_write_buffer_number_to_merge=1 + num_levels=7 + compaction_filter_factory=nullptr + compaction_style=kCompactionStyleLevel + +[TableOptions/BlockBasedTable "default"] + block_align=false + index_type=kBinarySearch + +[CFOptions "col_fam_A"] +ttl=0 +max_bytes_for_level_base=268435456 +max_bytes_for_level_multiplier=10.000000 +level0_file_num_compaction_trigger=5 +level0_stop_writes_trigger=36 +write_buffer_size=1024000 +min_write_buffer_number_to_merge=1 +num_levels=5 +compaction_filter_factory=nullptr +compaction_style=kCompactionStyleLevel + +[TableOptions/BlockBasedTable "col_fam_A"] +block_align=true +block_restart_interval=16 +index_type=kBinarySearch diff --git a/src/rocksdb/tools/advisor/test/input_files/log_stats_parser_keys_ts b/src/rocksdb/tools/advisor/test/input_files/log_stats_parser_keys_ts new file mode 100644 index 000000000..e8ade9e3e --- /dev/null +++ b/src/rocksdb/tools/advisor/test/input_files/log_stats_parser_keys_ts @@ -0,0 +1,3 @@ +rocksdb.number.block.decompressed.count: 1530896335 88.0, 1530896361 788338.0, 1530896387 1539256.0, 1530896414 2255696.0, 1530896440 3009325.0, 1530896466 3767183.0, 1530896492 4529775.0, 1530896518 5297809.0, 1530896545 6033802.0, 1530896570 6794129.0 +rocksdb.db.get.micros.p50: 1530896335 295.5, 1530896361 16.561841, 1530896387 16.20677, 1530896414 16.31508, 1530896440 16.346602, 1530896466 16.284669, 1530896492 16.16005, 1530896518 16.069096, 1530896545 16.028746, 1530896570 15.9638 +rocksdb.manifest.file.sync.micros.p99: 1530896335 649.0, 1530896361 835.0, 1530896387 1435.0, 1530896414 9938.0, 1530896440 9938.0, 1530896466 9938.0, 1530896492 9938.0, 1530896518 1882.0, 1530896545 1837.0, 1530896570 1792.0 diff --git a/src/rocksdb/tools/advisor/test/input_files/rules_err1.ini b/src/rocksdb/tools/advisor/test/input_files/rules_err1.ini new file mode 100644 index 000000000..23be55dde --- /dev/null +++ b/src/rocksdb/tools/advisor/test/input_files/rules_err1.ini @@ -0,0 +1,56 @@ +[Rule "missing-suggestions"] +suggestions= +conditions=missing-source + +[Condition "normal-rule"] +source=LOG +regex=Stopping writes because we have \d+ immutable memtables \(waiting for flush\), max_write_buffer_number is set to \d+ + +[Suggestion "inc-bg-flush"] +option=DBOptions.max_background_flushes +action=increase + +[Suggestion "inc-write-buffer"] +option=CFOptions.max_write_buffer_number +action=increase + +[Rule "missing-conditions"] +conditions= +suggestions=missing-description + +[Condition "missing-options"] +source=OPTIONS +options= +evaluate=int(options[0])*int(options[1])-int(options[2])<(-251659456) # should evaluate to a boolean + +[Rule "missing-expression"] +conditions=missing-expression +suggestions=missing-description + +[Condition "missing-expression"] +source=OPTIONS +options=CFOptions.level0_file_num_compaction_trigger:CFOptions.write_buffer_size:CFOptions.max_bytes_for_level_base +evaluate= + +[Suggestion "missing-description"] +description= + +[Rule "stop-too-many-L0"] 
+suggestions=inc-max-bg-compactions:missing-action:inc-l0-stop-writes-trigger +conditions=missing-regex + +[Condition "missing-regex"] +source=LOG +regex= + +[Suggestion "missing-option"] +option= +action=increase + +[Suggestion "normal-suggestion"] +option=CFOptions.write_buffer_size +action=increase + +[Suggestion "inc-l0-stop-writes-trigger"] +option=CFOptions.level0_stop_writes_trigger +action=increase diff --git a/src/rocksdb/tools/advisor/test/input_files/rules_err2.ini b/src/rocksdb/tools/advisor/test/input_files/rules_err2.ini new file mode 100644 index 000000000..bce21dba9 --- /dev/null +++ b/src/rocksdb/tools/advisor/test/input_files/rules_err2.ini @@ -0,0 +1,15 @@ +[Rule "normal-rule"] +suggestions=inc-bg-flush:inc-write-buffer +conditions=missing-source + +[Condition "missing-source"] +source= +regex=Stopping writes because we have \d+ immutable memtables \(waiting for flush\), max_write_buffer_number is set to \d+ + +[Suggestion "inc-bg-flush"] +option=DBOptions.max_background_flushes +action=increase + +[Suggestion "inc-write-buffer"] +option=CFOptions.max_write_buffer_number +action=increase diff --git a/src/rocksdb/tools/advisor/test/input_files/rules_err3.ini b/src/rocksdb/tools/advisor/test/input_files/rules_err3.ini new file mode 100644 index 000000000..73c06e469 --- /dev/null +++ b/src/rocksdb/tools/advisor/test/input_files/rules_err3.ini @@ -0,0 +1,15 @@ +[Rule "normal-rule"] +suggestions=missing-action:inc-write-buffer +conditions=missing-source + +[Condition "normal-condition"] +source=LOG +regex=Stopping writes because we have \d+ immutable memtables \(waiting for flush\), max_write_buffer_number is set to \d+ + +[Suggestion "missing-action"] +option=DBOptions.max_background_flushes +action= + +[Suggestion "inc-write-buffer"] +option=CFOptions.max_write_buffer_number +action=increase diff --git a/src/rocksdb/tools/advisor/test/input_files/rules_err4.ini b/src/rocksdb/tools/advisor/test/input_files/rules_err4.ini new file mode 100644 index 000000000..4d4aa3c70 --- /dev/null +++ b/src/rocksdb/tools/advisor/test/input_files/rules_err4.ini @@ -0,0 +1,15 @@ +[Rule "normal-rule"] +suggestions=inc-bg-flush +conditions=missing-source + +[Condition "normal-condition"] +source=LOG +regex=Stopping writes because we have \d+ immutable memtables \(waiting for flush\), max_write_buffer_number is set to \d+ + +[Suggestion "inc-bg-flush"] +option=DBOptions.max_background_flushes +action=increase + +[Suggestion] # missing section name +option=CFOptions.max_write_buffer_number +action=increase diff --git a/src/rocksdb/tools/advisor/test/input_files/test_rules.ini b/src/rocksdb/tools/advisor/test/input_files/test_rules.ini new file mode 100644 index 000000000..97b9374fc --- /dev/null +++ b/src/rocksdb/tools/advisor/test/input_files/test_rules.ini @@ -0,0 +1,47 @@ +[Rule "single-condition-false"] +suggestions=inc-bg-flush:inc-write-buffer +conditions=log-4-false + +[Rule "multiple-conds-true"] +suggestions=inc-write-buffer +conditions=log-1-true:log-2-true:log-3-true + +[Rule "multiple-conds-one-false"] +suggestions=inc-bg-flush +conditions=log-1-true:log-4-false:log-3-true + +[Rule "multiple-conds-all-false"] +suggestions=l0-l1-ratio-health-check +conditions=log-4-false:options-1-false + +[Condition "log-1-true"] +source=LOG +regex=Stopping writes because we have \d+ immutable memtables \(waiting for flush\), max_write_buffer_number is set to \d+ + +[Condition "log-2-true"] +source=LOG +regex=Stalling writes because we have \d+ level-0 files + +[Condition "log-3-true"] +source=LOG 
+regex=Stopping writes because we have \d+ level-0 files + +[Condition "log-4-false"] +source=LOG +regex=Stalling writes because of estimated pending compaction bytes \d+ + +[Condition "options-1-false"] +source=OPTIONS +options=CFOptions.level0_file_num_compaction_trigger:CFOptions.write_buffer_size:DBOptions.random_access_max_buffer_size +evaluate=int(options[0])*int(options[1])-int(options[2])<0 # should evaluate to a boolean + +[Suggestion "inc-bg-flush"] +option=DBOptions.max_background_flushes +action=increase + +[Suggestion "inc-write-buffer"] +option=CFOptions.max_write_buffer_number +action=increase + +[Suggestion "l0-l1-ratio-health-check"] +description='modify options such that (level0_file_num_compaction_trigger * write_buffer_size - max_bytes_for_level_base < 5) is satisfied' diff --git a/src/rocksdb/tools/advisor/test/input_files/triggered_rules.ini b/src/rocksdb/tools/advisor/test/input_files/triggered_rules.ini new file mode 100644 index 000000000..83b96da2b --- /dev/null +++ b/src/rocksdb/tools/advisor/test/input_files/triggered_rules.ini @@ -0,0 +1,83 @@ +[Rule "stall-too-many-memtables"] +suggestions=inc-bg-flush:inc-write-buffer +conditions=stall-too-many-memtables + +[Condition "stall-too-many-memtables"] +source=LOG +regex=Stopping writes because we have \d+ immutable memtables \(waiting for flush\), max_write_buffer_number is set to \d+ + +[Rule "stall-too-many-L0"] +suggestions=inc-max-subcompactions:inc-max-bg-compactions:inc-write-buffer-size:dec-max-bytes-for-level-base:inc-l0-slowdown-writes-trigger +conditions=stall-too-many-L0 + +[Condition "stall-too-many-L0"] +source=LOG +regex=Stalling writes because we have \d+ level-0 files + +[Rule "stop-too-many-L0"] +suggestions=inc-max-bg-compactions:inc-write-buffer-size:inc-l0-stop-writes-trigger +conditions=stop-too-many-L0 + +[Condition "stop-too-many-L0"] +source=LOG +regex=Stopping writes because we have \d+ level-0 files + +[Rule "stall-too-many-compaction-bytes"] +suggestions=inc-max-bg-compactions:inc-write-buffer-size:inc-hard-pending-compaction-bytes-limit:inc-soft-pending-compaction-bytes-limit +conditions=stall-too-many-compaction-bytes + +[Condition "stall-too-many-compaction-bytes"] +source=LOG +regex=Stalling writes because of estimated pending compaction bytes \d+ + +[Suggestion "inc-bg-flush"] +option=DBOptions.max_background_flushes +action=increase + +[Suggestion "inc-write-buffer"] +option=CFOptions.max_write_buffer_number +action=increase + +[Suggestion "inc-max-subcompactions"] +option=DBOptions.max_subcompactions +action=increase + +[Suggestion "inc-max-bg-compactions"] +option=DBOptions.max_background_compactions +action=increase + +[Suggestion "inc-write-buffer-size"] +option=CFOptions.write_buffer_size +action=increase + +[Suggestion "dec-max-bytes-for-level-base"] +option=CFOptions.max_bytes_for_level_base +action=decrease + +[Suggestion "inc-l0-slowdown-writes-trigger"] +option=CFOptions.level0_slowdown_writes_trigger +action=increase + +[Suggestion "inc-l0-stop-writes-trigger"] +option=CFOptions.level0_stop_writes_trigger +action=increase + +[Suggestion "inc-hard-pending-compaction-bytes-limit"] +option=CFOptions.hard_pending_compaction_bytes_limit +action=increase + +[Suggestion "inc-soft-pending-compaction-bytes-limit"] +option=CFOptions.soft_pending_compaction_bytes_limit +action=increase + +[Rule "level0-level1-ratio"] +conditions=level0-level1-ratio +suggestions=l0-l1-ratio-health-check + +[Condition "level0-level1-ratio"] +source=OPTIONS 
+options=CFOptions.level0_file_num_compaction_trigger:CFOptions.write_buffer_size:CFOptions.max_bytes_for_level_base +evaluate=int(options[0])*int(options[1])-int(options[2])>=-268173312 # should evaluate to a boolean, condition triggered if evaluates to true + +[Suggestion "l0-l1-ratio-health-check"] +description='modify options such that (level0_file_num_compaction_trigger * write_buffer_size - max_bytes_for_level_base < -268173312) is satisfied' diff --git a/src/rocksdb/tools/advisor/test/test_db_bench_runner.py b/src/rocksdb/tools/advisor/test/test_db_bench_runner.py new file mode 100644 index 000000000..1c4f77d50 --- /dev/null +++ b/src/rocksdb/tools/advisor/test/test_db_bench_runner.py @@ -0,0 +1,147 @@ +# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). + +from advisor.db_bench_runner import DBBenchRunner +from advisor.db_log_parser import NO_COL_FAMILY, DataSource +from advisor.db_options_parser import DatabaseOptions +import os +import unittest + + +class TestDBBenchRunnerMethods(unittest.TestCase): + def setUp(self): + self.pos_args = [ + './../../db_bench', + 'overwrite', + 'use_existing_db=true', + 'duration=10' + ] + self.bench_runner = DBBenchRunner(self.pos_args) + this_path = os.path.abspath(os.path.dirname(__file__)) + options_path = os.path.join(this_path, 'input_files/OPTIONS-000005') + self.db_options = DatabaseOptions(options_path) + + def test_setup(self): + self.assertEqual(self.bench_runner.db_bench_binary, self.pos_args[0]) + self.assertEqual(self.bench_runner.benchmark, self.pos_args[1]) + self.assertSetEqual( + set(self.bench_runner.db_bench_args), set(self.pos_args[2:]) + ) + + def test_get_info_log_file_name(self): + log_file_name = DBBenchRunner.get_info_log_file_name( + None, 'random_path' + ) + self.assertEqual(log_file_name, 'LOG') + + log_file_name = DBBenchRunner.get_info_log_file_name( + '/dev/shm/', '/tmp/rocksdbtest-155919/dbbench/' + ) + self.assertEqual(log_file_name, 'tmp_rocksdbtest-155919_dbbench_LOG') + + def test_get_opt_args_str(self): + misc_opt_dict = {'bloom_bits': 2, 'empty_opt': None, 'rate_limiter': 3} + optional_args_str = DBBenchRunner.get_opt_args_str(misc_opt_dict) + self.assertEqual(optional_args_str, ' --bloom_bits=2 --rate_limiter=3') + + def test_get_log_options(self): + db_path = '/tmp/rocksdb-155919/dbbench' + # when db_log_dir is present in the db_options + update_dict = { + 'DBOptions.db_log_dir': {NO_COL_FAMILY: '/dev/shm'}, + 'DBOptions.stats_dump_period_sec': {NO_COL_FAMILY: '20'} + } + self.db_options.update_options(update_dict) + log_file_prefix, stats_freq = self.bench_runner.get_log_options( + self.db_options, db_path + ) + self.assertEqual( + log_file_prefix, '/dev/shm/tmp_rocksdb-155919_dbbench_LOG' + ) + self.assertEqual(stats_freq, 20) + + update_dict = { + 'DBOptions.db_log_dir': {NO_COL_FAMILY: None}, + 'DBOptions.stats_dump_period_sec': {NO_COL_FAMILY: '30'} + } + self.db_options.update_options(update_dict) + log_file_prefix, stats_freq = self.bench_runner.get_log_options( + self.db_options, db_path + ) + self.assertEqual(log_file_prefix, '/tmp/rocksdb-155919/dbbench/LOG') + self.assertEqual(stats_freq, 30) + + def test_build_experiment_command(self): + # add some misc_options to db_options + update_dict = { + 'bloom_bits': {NO_COL_FAMILY: 2}, + 'rate_limiter_bytes_per_sec': {NO_COL_FAMILY: 128000000} + } + 
self.db_options.update_options(update_dict) + db_path = '/dev/shm' + experiment_command = self.bench_runner._build_experiment_command( + self.db_options, db_path + ) + opt_args_str = DBBenchRunner.get_opt_args_str( + self.db_options.get_misc_options() + ) + opt_args_str += ( + ' --options_file=' + + self.db_options.generate_options_config('12345') + ) + for arg in self.pos_args[2:]: + opt_args_str += (' --' + arg) + expected_command = ( + self.pos_args[0] + ' --benchmarks=' + self.pos_args[1] + + ' --statistics --perf_level=3 --db=' + db_path + opt_args_str + ) + self.assertEqual(experiment_command, expected_command) + + +class TestDBBenchRunner(unittest.TestCase): + def setUp(self): + # Note: the db_bench binary should be present in the rocksdb/ directory + self.pos_args = [ + './../../db_bench', + 'overwrite', + 'use_existing_db=true', + 'duration=20' + ] + self.bench_runner = DBBenchRunner(self.pos_args) + this_path = os.path.abspath(os.path.dirname(__file__)) + options_path = os.path.join(this_path, 'input_files/OPTIONS-000005') + self.db_options = DatabaseOptions(options_path) + + def test_experiment_output(self): + update_dict = {'bloom_bits': {NO_COL_FAMILY: 2}} + self.db_options.update_options(update_dict) + db_path = '/dev/shm' + data_sources, throughput = self.bench_runner.run_experiment( + self.db_options, db_path + ) + self.assertEqual( + data_sources[DataSource.Type.DB_OPTIONS][0].type, + DataSource.Type.DB_OPTIONS + ) + self.assertEqual( + data_sources[DataSource.Type.LOG][0].type, + DataSource.Type.LOG + ) + self.assertEqual(len(data_sources[DataSource.Type.TIME_SERIES]), 2) + self.assertEqual( + data_sources[DataSource.Type.TIME_SERIES][0].type, + DataSource.Type.TIME_SERIES + ) + self.assertEqual( + data_sources[DataSource.Type.TIME_SERIES][1].type, + DataSource.Type.TIME_SERIES + ) + self.assertEqual( + data_sources[DataSource.Type.TIME_SERIES][1].stats_freq_sec, 0 + ) + + +if __name__ == '__main__': + unittest.main() diff --git a/src/rocksdb/tools/advisor/test/test_db_log_parser.py b/src/rocksdb/tools/advisor/test/test_db_log_parser.py new file mode 100644 index 000000000..b70430433 --- /dev/null +++ b/src/rocksdb/tools/advisor/test/test_db_log_parser.py @@ -0,0 +1,103 @@ +# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). 
+ +from advisor.db_log_parser import DatabaseLogs, Log, NO_COL_FAMILY +from advisor.rule_parser import Condition, LogCondition +import os +import unittest + + +class TestLog(unittest.TestCase): + def setUp(self): + self.column_families = ['default', 'col_fam_A'] + + def test_get_column_family(self): + test_log = ( + "2018/05/25-14:34:21.047233 7f82ba72e700 [db/flush_job.cc:371] " + + "[col_fam_A] [JOB 44] Level-0 flush table #84: 1890780 bytes OK" + ) + db_log = Log(test_log, self.column_families) + self.assertEqual('col_fam_A', db_log.get_column_family()) + + test_log = ( + "2018/05/25-14:34:21.047233 7f82ba72e700 [db/flush_job.cc:371] " + + "[JOB 44] Level-0 flush table #84: 1890780 bytes OK" + ) + db_log = Log(test_log, self.column_families) + db_log.append_message('[default] some remaining part of log') + self.assertEqual(NO_COL_FAMILY, db_log.get_column_family()) + + def test_get_methods(self): + hr_time = "2018/05/25-14:30:25.491635" + context = "7f82ba72e700" + message = ( + "[db/flush_job.cc:331] [default] [JOB 10] Level-0 flush table " + + "#23: started" + ) + test_log = hr_time + " " + context + " " + message + db_log = Log(test_log, self.column_families) + self.assertEqual(db_log.get_message(), message) + remaining_message = "[col_fam_A] some more logs" + db_log.append_message(remaining_message) + self.assertEqual( + db_log.get_human_readable_time(), "2018/05/25-14:30:25.491635" + ) + self.assertEqual(db_log.get_context(), "7f82ba72e700") + self.assertEqual(db_log.get_timestamp(), 1527258625) + self.assertEqual( + db_log.get_message(), str(message + '\n' + remaining_message) + ) + + def test_is_new_log(self): + new_log = "2018/05/25-14:34:21.047233 context random new log" + remaining_log = "2018/05/25 not really a new log" + self.assertTrue(Log.is_new_log(new_log)) + self.assertFalse(Log.is_new_log(remaining_log)) + + +class TestDatabaseLogs(unittest.TestCase): + def test_check_and_trigger_conditions(self): + this_path = os.path.abspath(os.path.dirname(__file__)) + logs_path_prefix = os.path.join(this_path, 'input_files/LOG-0') + column_families = ['default', 'col-fam-A', 'col-fam-B'] + db_logs = DatabaseLogs(logs_path_prefix, column_families) + # matches, has 2 col_fams + condition1 = LogCondition.create(Condition('cond-A')) + condition1.set_parameter('regex', 'random log message') + # matches, multiple lines message + condition2 = LogCondition.create(Condition('cond-B')) + condition2.set_parameter('regex', 'continuing on next line') + # does not match + condition3 = LogCondition.create(Condition('cond-C')) + condition3.set_parameter('regex', 'this should match no log') + db_logs.check_and_trigger_conditions( + [condition1, condition2, condition3] + ) + cond1_trigger = condition1.get_trigger() + self.assertEqual(2, len(cond1_trigger.keys())) + self.assertSetEqual( + {'col-fam-A', NO_COL_FAMILY}, set(cond1_trigger.keys()) + ) + self.assertEqual(2, len(cond1_trigger['col-fam-A'])) + messages = [ + "[db/db_impl.cc:563] [col-fam-A] random log message for testing", + "[db/db_impl.cc:653] [col-fam-A] another random log message" + ] + self.assertIn(cond1_trigger['col-fam-A'][0].get_message(), messages) + self.assertIn(cond1_trigger['col-fam-A'][1].get_message(), messages) + self.assertEqual(1, len(cond1_trigger[NO_COL_FAMILY])) + self.assertEqual( + cond1_trigger[NO_COL_FAMILY][0].get_message(), + "[db/db_impl.cc:331] [unknown] random log message no column family" + ) + cond2_trigger = condition2.get_trigger() + self.assertEqual(['col-fam-B'], list(cond2_trigger.keys())) + 
self.assertEqual(1, len(cond2_trigger['col-fam-B'])) + self.assertEqual( + cond2_trigger['col-fam-B'][0].get_message(), + "[db/db_impl.cc:234] [col-fam-B] log continuing on next line\n" + + "remaining part of the log" + ) + self.assertIsNone(condition3.get_trigger()) diff --git a/src/rocksdb/tools/advisor/test/test_db_options_parser.py b/src/rocksdb/tools/advisor/test/test_db_options_parser.py new file mode 100644 index 000000000..d53a9bdb5 --- /dev/null +++ b/src/rocksdb/tools/advisor/test/test_db_options_parser.py @@ -0,0 +1,216 @@ +# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). + +from advisor.db_log_parser import NO_COL_FAMILY +from advisor.db_options_parser import DatabaseOptions +from advisor.rule_parser import Condition, OptionCondition +import os +import unittest + + +class TestDatabaseOptions(unittest.TestCase): + def setUp(self): + self.this_path = os.path.abspath(os.path.dirname(__file__)) + self.og_options = os.path.join( + self.this_path, 'input_files/OPTIONS-000005' + ) + misc_options = [ + 'bloom_bits = 4', 'rate_limiter_bytes_per_sec = 1024000' + ] + # create the options object + self.db_options = DatabaseOptions(self.og_options, misc_options) + # perform clean-up before running tests + self.generated_options = os.path.join( + self.this_path, '../temp/OPTIONS_testing.tmp' + ) + if os.path.isfile(self.generated_options): + os.remove(self.generated_options) + + def test_get_options_diff(self): + old_opt = { + 'DBOptions.stats_dump_freq_sec': {NO_COL_FAMILY: '20'}, + 'CFOptions.write_buffer_size': { + 'default': '1024000', + 'col_fam_A': '128000', + 'col_fam_B': '128000000' + }, + 'DBOptions.use_fsync': {NO_COL_FAMILY: 'true'}, + 'DBOptions.max_log_file_size': {NO_COL_FAMILY: '128000000'} + } + new_opt = { + 'bloom_bits': {NO_COL_FAMILY: '4'}, + 'CFOptions.write_buffer_size': { + 'default': '128000000', + 'col_fam_A': '128000', + 'col_fam_C': '128000000' + }, + 'DBOptions.use_fsync': {NO_COL_FAMILY: 'true'}, + 'DBOptions.max_log_file_size': {NO_COL_FAMILY: '0'} + } + diff = DatabaseOptions.get_options_diff(old_opt, new_opt) + + expected_diff = { + 'DBOptions.stats_dump_freq_sec': {NO_COL_FAMILY: ('20', None)}, + 'bloom_bits': {NO_COL_FAMILY: (None, '4')}, + 'CFOptions.write_buffer_size': { + 'default': ('1024000', '128000000'), + 'col_fam_B': ('128000000', None), + 'col_fam_C': (None, '128000000') + }, + 'DBOptions.max_log_file_size': {NO_COL_FAMILY: ('128000000', '0')} + } + self.assertDictEqual(diff, expected_diff) + + def test_is_misc_option(self): + self.assertTrue(DatabaseOptions.is_misc_option('bloom_bits')) + self.assertFalse( + DatabaseOptions.is_misc_option('DBOptions.stats_dump_freq_sec') + ) + + def test_set_up(self): + options = self.db_options.get_all_options() + self.assertEqual(22, len(options.keys())) + expected_misc_options = { + 'bloom_bits': '4', 'rate_limiter_bytes_per_sec': '1024000' + } + self.assertDictEqual( + expected_misc_options, self.db_options.get_misc_options() + ) + self.assertListEqual( + ['default', 'col_fam_A'], self.db_options.get_column_families() + ) + + def test_get_options(self): + opt_to_get = [ + 'DBOptions.manual_wal_flush', 'DBOptions.db_write_buffer_size', + 'bloom_bits', 'CFOptions.compaction_filter_factory', + 'CFOptions.num_levels', 'rate_limiter_bytes_per_sec', + 'TableOptions.BlockBasedTable.block_align', 'random_option' + 
] + options = self.db_options.get_options(opt_to_get) + expected_options = { + 'DBOptions.manual_wal_flush': {NO_COL_FAMILY: 'false'}, + 'DBOptions.db_write_buffer_size': {NO_COL_FAMILY: '0'}, + 'bloom_bits': {NO_COL_FAMILY: '4'}, + 'CFOptions.compaction_filter_factory': { + 'default': 'nullptr', 'col_fam_A': 'nullptr' + }, + 'CFOptions.num_levels': {'default': '7', 'col_fam_A': '5'}, + 'rate_limiter_bytes_per_sec': {NO_COL_FAMILY: '1024000'}, + 'TableOptions.BlockBasedTable.block_align': { + 'default': 'false', 'col_fam_A': 'true' + } + } + self.assertDictEqual(expected_options, options) + + def test_update_options(self): + # add new, update old, set old + # before updating + expected_old_opts = { + 'DBOptions.db_log_dir': {NO_COL_FAMILY: None}, + 'DBOptions.manual_wal_flush': {NO_COL_FAMILY: 'false'}, + 'bloom_bits': {NO_COL_FAMILY: '4'}, + 'CFOptions.num_levels': {'default': '7', 'col_fam_A': '5'}, + 'TableOptions.BlockBasedTable.block_restart_interval': { + 'col_fam_A': '16' + } + } + get_opts = list(expected_old_opts.keys()) + options = self.db_options.get_options(get_opts) + self.assertEqual(expected_old_opts, options) + # after updating options + update_opts = { + 'DBOptions.db_log_dir': {NO_COL_FAMILY: '/dev/shm'}, + 'DBOptions.manual_wal_flush': {NO_COL_FAMILY: 'true'}, + 'bloom_bits': {NO_COL_FAMILY: '2'}, + 'CFOptions.num_levels': {'col_fam_A': '7'}, + 'TableOptions.BlockBasedTable.block_restart_interval': { + 'default': '32' + }, + 'random_misc_option': {NO_COL_FAMILY: 'something'} + } + self.db_options.update_options(update_opts) + update_opts['CFOptions.num_levels']['default'] = '7' + update_opts['TableOptions.BlockBasedTable.block_restart_interval'] = { + 'default': '32', 'col_fam_A': '16' + } + get_opts.append('random_misc_option') + options = self.db_options.get_options(get_opts) + self.assertDictEqual(update_opts, options) + expected_misc_options = { + 'bloom_bits': '2', + 'rate_limiter_bytes_per_sec': '1024000', + 'random_misc_option': 'something' + } + self.assertDictEqual( + expected_misc_options, self.db_options.get_misc_options() + ) + + def test_generate_options_config(self): + # make sure file does not exist from before + self.assertFalse(os.path.isfile(self.generated_options)) + self.db_options.generate_options_config('testing') + self.assertTrue(os.path.isfile(self.generated_options)) + + def test_check_and_trigger_conditions(self): + # options only from CFOptions + # setup the OptionCondition objects to check and trigger + update_dict = { + 'CFOptions.level0_file_num_compaction_trigger': {'col_fam_A': '4'}, + 'CFOptions.max_bytes_for_level_base': {'col_fam_A': '10'} + } + self.db_options.update_options(update_dict) + cond1 = Condition('opt-cond-1') + cond1 = OptionCondition.create(cond1) + cond1.set_parameter( + 'options', [ + 'CFOptions.level0_file_num_compaction_trigger', + 'TableOptions.BlockBasedTable.block_restart_interval', + 'CFOptions.max_bytes_for_level_base' + ] + ) + cond1.set_parameter( + 'evaluate', + 'int(options[0])*int(options[1])-int(options[2])>=0' + ) + # only DBOptions + cond2 = Condition('opt-cond-2') + cond2 = OptionCondition.create(cond2) + cond2.set_parameter( + 'options', [ + 'DBOptions.db_write_buffer_size', + 'bloom_bits', + 'rate_limiter_bytes_per_sec' + ] + ) + cond2.set_parameter( + 'evaluate', + '(int(options[2]) * int(options[1]) * int(options[0]))==0' + ) + # mix of CFOptions and DBOptions + cond3 = Condition('opt-cond-3') + cond3 = OptionCondition.create(cond3) + cond3.set_parameter( + 'options', [ + 
'DBOptions.db_write_buffer_size', # 0 + 'CFOptions.num_levels', # 5, 7 + 'bloom_bits' # 4 + ] + ) + cond3.set_parameter( + 'evaluate', 'int(options[2])*int(options[0])+int(options[1])>6' + ) + self.db_options.check_and_trigger_conditions([cond1, cond2, cond3]) + + cond1_trigger = {'col_fam_A': ['4', '16', '10']} + self.assertDictEqual(cond1_trigger, cond1.get_trigger()) + cond2_trigger = {NO_COL_FAMILY: ['0', '4', '1024000']} + self.assertDictEqual(cond2_trigger, cond2.get_trigger()) + cond3_trigger = {'default': ['0', '7', '4']} + self.assertDictEqual(cond3_trigger, cond3.get_trigger()) + + +if __name__ == '__main__': + unittest.main() diff --git a/src/rocksdb/tools/advisor/test/test_db_stats_fetcher.py b/src/rocksdb/tools/advisor/test/test_db_stats_fetcher.py new file mode 100644 index 000000000..afbbe8339 --- /dev/null +++ b/src/rocksdb/tools/advisor/test/test_db_stats_fetcher.py @@ -0,0 +1,126 @@ +# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). + +from advisor.db_stats_fetcher import LogStatsParser, DatabasePerfContext +from advisor.db_timeseries_parser import NO_ENTITY +from advisor.rule_parser import Condition, TimeSeriesCondition +import os +import time +import unittest +from unittest.mock import MagicMock + + +class TestLogStatsParser(unittest.TestCase): + def setUp(self): + this_path = os.path.abspath(os.path.dirname(__file__)) + stats_file = os.path.join( + this_path, 'input_files/log_stats_parser_keys_ts' + ) + # populate the keys_ts dictionary of LogStatsParser + self.stats_dict = {NO_ENTITY: {}} + with open(stats_file, 'r') as fp: + for line in fp: + stat_name = line.split(':')[0].strip() + self.stats_dict[NO_ENTITY][stat_name] = {} + token_list = line.split(':')[1].strip().split(',') + for token in token_list: + timestamp = int(token.split()[0]) + value = float(token.split()[1]) + self.stats_dict[NO_ENTITY][stat_name][timestamp] = value + self.log_stats_parser = LogStatsParser('dummy_log_file', 20) + self.log_stats_parser.keys_ts = self.stats_dict + + def test_check_and_trigger_conditions_bursty(self): + # mock fetch_timeseries() because 'keys_ts' has been pre-populated + self.log_stats_parser.fetch_timeseries = MagicMock() + # condition: bursty + cond1 = Condition('cond-1') + cond1 = TimeSeriesCondition.create(cond1) + cond1.set_parameter('keys', 'rocksdb.db.get.micros.p50') + cond1.set_parameter('behavior', 'bursty') + cond1.set_parameter('window_sec', 40) + cond1.set_parameter('rate_threshold', 0) + self.log_stats_parser.check_and_trigger_conditions([cond1]) + expected_cond_trigger = { + NO_ENTITY: {1530896440: 0.9767546362322214} + } + self.assertDictEqual(expected_cond_trigger, cond1.get_trigger()) + # ensure that fetch_timeseries() was called once + self.log_stats_parser.fetch_timeseries.assert_called_once() + + def test_check_and_trigger_conditions_eval_agg(self): + # mock fetch_timeseries() because 'keys_ts' has been pre-populated + self.log_stats_parser.fetch_timeseries = MagicMock() + # condition: evaluate_expression + cond1 = Condition('cond-1') + cond1 = TimeSeriesCondition.create(cond1) + cond1.set_parameter('keys', 'rocksdb.db.get.micros.p50') + cond1.set_parameter('behavior', 'evaluate_expression') + keys = [ + 'rocksdb.manifest.file.sync.micros.p99', + 'rocksdb.db.get.micros.p50' + ] + cond1.set_parameter('keys', keys) + 
cond1.set_parameter('aggregation_op', 'latest') + # condition evaluates to FALSE + cond1.set_parameter('evaluate', 'keys[0]-(keys[1]*100)>200') + self.log_stats_parser.check_and_trigger_conditions([cond1]) + expected_cond_trigger = {NO_ENTITY: [1792.0, 15.9638]} + self.assertIsNone(cond1.get_trigger()) + # condition evaluates to TRUE + cond1.set_parameter('evaluate', 'keys[0]-(keys[1]*100)<200') + self.log_stats_parser.check_and_trigger_conditions([cond1]) + expected_cond_trigger = {NO_ENTITY: [1792.0, 15.9638]} + self.assertDictEqual(expected_cond_trigger, cond1.get_trigger()) + # ensure that fetch_timeseries() was called + self.log_stats_parser.fetch_timeseries.assert_called() + + def test_check_and_trigger_conditions_eval(self): + # mock fetch_timeseries() because 'keys_ts' has been pre-populated + self.log_stats_parser.fetch_timeseries = MagicMock() + # condition: evaluate_expression + cond1 = Condition('cond-1') + cond1 = TimeSeriesCondition.create(cond1) + cond1.set_parameter('keys', 'rocksdb.db.get.micros.p50') + cond1.set_parameter('behavior', 'evaluate_expression') + keys = [ + 'rocksdb.manifest.file.sync.micros.p99', + 'rocksdb.db.get.micros.p50' + ] + cond1.set_parameter('keys', keys) + cond1.set_parameter('evaluate', 'keys[0]-(keys[1]*100)>500') + self.log_stats_parser.check_and_trigger_conditions([cond1]) + expected_trigger = {NO_ENTITY: { + 1530896414: [9938.0, 16.31508], + 1530896440: [9938.0, 16.346602], + 1530896466: [9938.0, 16.284669], + 1530896492: [9938.0, 16.16005] + }} + self.assertDictEqual(expected_trigger, cond1.get_trigger()) + self.log_stats_parser.fetch_timeseries.assert_called_once() + + +class TestDatabasePerfContext(unittest.TestCase): + def test_unaccumulate_metrics(self): + perf_dict = { + "user_key_comparison_count": 675903942, + "block_cache_hit_count": 830086, + } + timestamp = int(time.time()) + perf_ts = {} + for key in perf_dict: + perf_ts[key] = {} + start_val = perf_dict[key] + for ix in range(5): + perf_ts[key][timestamp+(ix*10)] = start_val + (2 * ix * ix) + db_perf_context = DatabasePerfContext(perf_ts, 10, True) + timestamps = [timestamp+(ix*10) for ix in range(1, 5, 1)] + values = [val for val in range(2, 15, 4)] + inner_dict = {timestamps[ix]: values[ix] for ix in range(4)} + expected_keys_ts = {NO_ENTITY: { + 'user_key_comparison_count': inner_dict, + 'block_cache_hit_count': inner_dict + }} + self.assertDictEqual(expected_keys_ts, db_perf_context.keys_ts) diff --git a/src/rocksdb/tools/advisor/test/test_rule_parser.py b/src/rocksdb/tools/advisor/test/test_rule_parser.py new file mode 100644 index 000000000..9f1d0bf5c --- /dev/null +++ b/src/rocksdb/tools/advisor/test/test_rule_parser.py @@ -0,0 +1,234 @@ +# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). 
+ +import os +import unittest +from advisor.rule_parser import RulesSpec +from advisor.db_log_parser import DatabaseLogs, DataSource +from advisor.db_options_parser import DatabaseOptions + +RuleToSuggestions = { + "stall-too-many-memtables": [ + 'inc-bg-flush', + 'inc-write-buffer' + ], + "stall-too-many-L0": [ + 'inc-max-subcompactions', + 'inc-max-bg-compactions', + 'inc-write-buffer-size', + 'dec-max-bytes-for-level-base', + 'inc-l0-slowdown-writes-trigger' + ], + "stop-too-many-L0": [ + 'inc-max-bg-compactions', + 'inc-write-buffer-size', + 'inc-l0-stop-writes-trigger' + ], + "stall-too-many-compaction-bytes": [ + 'inc-max-bg-compactions', + 'inc-write-buffer-size', + 'inc-hard-pending-compaction-bytes-limit', + 'inc-soft-pending-compaction-bytes-limit' + ], + "level0-level1-ratio": [ + 'l0-l1-ratio-health-check' + ] +} + + +class TestAllRulesTriggered(unittest.TestCase): + def setUp(self): + # load the Rules + this_path = os.path.abspath(os.path.dirname(__file__)) + ini_path = os.path.join(this_path, 'input_files/triggered_rules.ini') + self.db_rules = RulesSpec(ini_path) + self.db_rules.load_rules_from_spec() + self.db_rules.perform_section_checks() + # load the data sources: LOG and OPTIONS + log_path = os.path.join(this_path, 'input_files/LOG-0') + options_path = os.path.join(this_path, 'input_files/OPTIONS-000005') + db_options_parser = DatabaseOptions(options_path) + self.column_families = db_options_parser.get_column_families() + db_logs_parser = DatabaseLogs(log_path, self.column_families) + self.data_sources = { + DataSource.Type.DB_OPTIONS: [db_options_parser], + DataSource.Type.LOG: [db_logs_parser] + } + + def test_triggered_conditions(self): + conditions_dict = self.db_rules.get_conditions_dict() + rules_dict = self.db_rules.get_rules_dict() + # Make sure none of the conditions is triggered beforehand + for cond in conditions_dict.values(): + self.assertFalse(cond.is_triggered(), repr(cond)) + for rule in rules_dict.values(): + self.assertFalse( + rule.is_triggered(conditions_dict, self.column_families), + repr(rule) + ) + + # # Trigger the conditions as per the data sources. 
+ # trigger_conditions(, conditions_dict) + + # Get the set of rules that have been triggered + triggered_rules = self.db_rules.get_triggered_rules( + self.data_sources, self.column_families + ) + + # Make sure each condition and rule is triggered + for cond in conditions_dict.values(): + if cond.get_data_source() is DataSource.Type.TIME_SERIES: + continue + self.assertTrue(cond.is_triggered(), repr(cond)) + + for rule in rules_dict.values(): + self.assertIn(rule, triggered_rules) + # Check the suggestions made by the triggered rules + for sugg in rule.get_suggestions(): + self.assertIn(sugg, RuleToSuggestions[rule.name]) + + for rule in triggered_rules: + self.assertIn(rule, rules_dict.values()) + for sugg in RuleToSuggestions[rule.name]: + self.assertIn(sugg, rule.get_suggestions()) + + +class TestConditionsConjunctions(unittest.TestCase): + def setUp(self): + # load the Rules + this_path = os.path.abspath(os.path.dirname(__file__)) + ini_path = os.path.join(this_path, 'input_files/test_rules.ini') + self.db_rules = RulesSpec(ini_path) + self.db_rules.load_rules_from_spec() + self.db_rules.perform_section_checks() + # load the data sources: LOG and OPTIONS + log_path = os.path.join(this_path, 'input_files/LOG-1') + options_path = os.path.join(this_path, 'input_files/OPTIONS-000005') + db_options_parser = DatabaseOptions(options_path) + self.column_families = db_options_parser.get_column_families() + db_logs_parser = DatabaseLogs(log_path, self.column_families) + self.data_sources = { + DataSource.Type.DB_OPTIONS: [db_options_parser], + DataSource.Type.LOG: [db_logs_parser] + } + + def test_condition_conjunctions(self): + conditions_dict = self.db_rules.get_conditions_dict() + rules_dict = self.db_rules.get_rules_dict() + # Make sure none of the conditions is triggered beforehand + for cond in conditions_dict.values(): + self.assertFalse(cond.is_triggered(), repr(cond)) + for rule in rules_dict.values(): + self.assertFalse( + rule.is_triggered(conditions_dict, self.column_families), + repr(rule) + ) + + # Trigger the conditions as per the data sources. 
+ self.db_rules.trigger_conditions(self.data_sources) + + # Check for the conditions + conds_triggered = ['log-1-true', 'log-2-true', 'log-3-true'] + conds_not_triggered = ['log-4-false', 'options-1-false'] + for cond in conds_triggered: + self.assertTrue(conditions_dict[cond].is_triggered(), repr(cond)) + for cond in conds_not_triggered: + self.assertFalse(conditions_dict[cond].is_triggered(), repr(cond)) + + # Check for the rules + rules_triggered = ['multiple-conds-true'] + rules_not_triggered = [ + 'single-condition-false', + 'multiple-conds-one-false', + 'multiple-conds-all-false' + ] + for rule_name in rules_triggered: + rule = rules_dict[rule_name] + self.assertTrue( + rule.is_triggered(conditions_dict, self.column_families), + repr(rule) + ) + for rule_name in rules_not_triggered: + rule = rules_dict[rule_name] + self.assertFalse( + rule.is_triggered(conditions_dict, self.column_families), + repr(rule) + ) + + +class TestSanityChecker(unittest.TestCase): + def setUp(self): + this_path = os.path.abspath(os.path.dirname(__file__)) + ini_path = os.path.join(this_path, 'input_files/rules_err1.ini') + db_rules = RulesSpec(ini_path) + db_rules.load_rules_from_spec() + self.rules_dict = db_rules.get_rules_dict() + self.conditions_dict = db_rules.get_conditions_dict() + self.suggestions_dict = db_rules.get_suggestions_dict() + + def test_rule_missing_suggestions(self): + regex = '.*rule must have at least one suggestion.*' + with self.assertRaisesRegex(ValueError, regex): + self.rules_dict['missing-suggestions'].perform_checks() + + def test_rule_missing_conditions(self): + regex = '.*rule must have at least one condition.*' + with self.assertRaisesRegex(ValueError, regex): + self.rules_dict['missing-conditions'].perform_checks() + + def test_condition_missing_regex(self): + regex = '.*provide regex for log condition.*' + with self.assertRaisesRegex(ValueError, regex): + self.conditions_dict['missing-regex'].perform_checks() + + def test_condition_missing_options(self): + regex = '.*options missing in condition.*' + with self.assertRaisesRegex(ValueError, regex): + self.conditions_dict['missing-options'].perform_checks() + + def test_condition_missing_expression(self): + regex = '.*expression missing in condition.*' + with self.assertRaisesRegex(ValueError, regex): + self.conditions_dict['missing-expression'].perform_checks() + + def test_suggestion_missing_option(self): + regex = '.*provide option or description.*' + with self.assertRaisesRegex(ValueError, regex): + self.suggestions_dict['missing-option'].perform_checks() + + def test_suggestion_missing_description(self): + regex = '.*provide option or description.*' + with self.assertRaisesRegex(ValueError, regex): + self.suggestions_dict['missing-description'].perform_checks() + + +class TestParsingErrors(unittest.TestCase): + def setUp(self): + self.this_path = os.path.abspath(os.path.dirname(__file__)) + + def test_condition_missing_source(self): + ini_path = os.path.join(self.this_path, 'input_files/rules_err2.ini') + db_rules = RulesSpec(ini_path) + regex = '.*provide source for condition.*' + with self.assertRaisesRegex(NotImplementedError, regex): + db_rules.load_rules_from_spec() + + def test_suggestion_missing_action(self): + ini_path = os.path.join(self.this_path, 'input_files/rules_err3.ini') + db_rules = RulesSpec(ini_path) + regex = '.*provide action for option.*' + with self.assertRaisesRegex(ValueError, regex): + db_rules.load_rules_from_spec() + + def test_section_no_name(self): + ini_path = os.path.join(self.this_path, 
'input_files/rules_err4.ini')
+        db_rules = RulesSpec(ini_path)
+        regex = 'Parsing error: needed section header:.*'
+        with self.assertRaisesRegex(ValueError, regex):
+            db_rules.load_rules_from_spec()
+
+
+if __name__ == '__main__':
+    unittest.main()
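
Taken together, the fixtures and suites above show how the pieces fit: a rules spec is loaded, LOG and OPTIONS data sources are built, and the Rules Engine reports which rules fired. The condensed sketch below mirrors the flow that `TestAllRulesTriggered` drives; it is a sketch only, and the fixture paths are placeholders that would need to point at a real rules spec, OPTIONS file and LOG path prefix.

```python
# Sketch of the rule-triggering flow exercised by TestAllRulesTriggered.
# The three paths are illustrative placeholders taken from the test fixtures.
from advisor.rule_parser import RulesSpec
from advisor.db_log_parser import DatabaseLogs, DataSource
from advisor.db_options_parser import DatabaseOptions

rules_spec = RulesSpec('test/input_files/triggered_rules.ini')
rules_spec.load_rules_from_spec()
rules_spec.perform_section_checks()

db_options = DatabaseOptions('test/input_files/OPTIONS-000005')
column_families = db_options.get_column_families()
db_logs = DatabaseLogs('test/input_files/LOG-0', column_families)

data_sources = {
    DataSource.Type.DB_OPTIONS: [db_options],
    DataSource.Type.LOG: [db_logs],
}

# get_triggered_rules() returns the rules whose conditions were all
# triggered by the given data sources.
for rule in rules_spec.get_triggered_rules(data_sources, column_families):
    print(rule.name, '->', rule.get_suggestions())
```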
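
The `evaluate=` entries in the rules specs above (for example in `[Condition "options-1-false"]` and `[Condition "level0-level1-ratio"]`) are boolean expressions over the values of the listed options. The following is a minimal sketch of building and checking such a condition directly, mirroring `test_check_and_trigger_conditions` in `test_db_options_parser.py`; the option names and threshold are copied from the fixtures, not a tuning recommendation.

```python
# Sketch of an OPTIONS-backed condition, mirroring the INI entries above.
from advisor.db_options_parser import DatabaseOptions
from advisor.rule_parser import Condition, OptionCondition

db_options = DatabaseOptions('test/input_files/OPTIONS-000005')

cond = OptionCondition.create(Condition('l0-l1-ratio'))
cond.set_parameter('options', [
    'CFOptions.level0_file_num_compaction_trigger',  # options[0]
    'CFOptions.write_buffer_size',                    # options[1]
    'CFOptions.max_bytes_for_level_base',             # options[2]
])
# Evaluated per column family with 'options' bound to the values above;
# the condition is triggered wherever the expression is true.
cond.set_parameter(
    'evaluate', 'int(options[0])*int(options[1])-int(options[2])>=0'
)

db_options.check_and_trigger_conditions([cond])
# A dict of {column_family: [option values]} when triggered, else None.
print(cond.get_trigger())
```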
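
Statistics-based conditions work the same way but read time series parsed from periodic statistics dumps in the LOG, as exercised by `TestLogStatsParser`. The sketch below follows that test; the log path, window and threshold are placeholders, and for anything to trigger the LOG must actually contain statistics dumps (written every `DBOptions.stats_dump_period_sec` seconds).

```python
# Sketch of a time-series condition over statistics from the LOG,
# mirroring TestLogStatsParser. Path and thresholds are illustrative.
from advisor.db_stats_fetcher import LogStatsParser
from advisor.rule_parser import Condition, TimeSeriesCondition

# Second argument is the stats dump period (in seconds) used when the
# LOG was produced.
stats_parser = LogStatsParser('test/input_files/LOG-0', 20)

cond = TimeSeriesCondition.create(Condition('bursty-get-p50'))
cond.set_parameter('keys', 'rocksdb.db.get.micros.p50')
cond.set_parameter('behavior', 'bursty')
cond.set_parameter('window_sec', 40)
cond.set_parameter('rate_threshold', 2)

stats_parser.check_and_trigger_conditions([cond])
# {entity: {timestamp: observed rate}} when triggered, else None.
print(cond.get_trigger())
```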