| author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
|---|---|---|
| committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
| commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
| tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/tools/advisor | |
| parent | Initial commit. (diff) | |
| download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip | |
Adding upstream version 18.2.2.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/tools/advisor')
30 files changed, 3870 insertions, 0 deletions
diff --git a/src/rocksdb/tools/advisor/README.md b/src/rocksdb/tools/advisor/README.md
new file mode 100644
index 000000000..b02d7ec50
--- /dev/null
+++ b/src/rocksdb/tools/advisor/README.md
@@ -0,0 +1,96 @@
+# Rocksdb Tuning Advisor
+
+## Motivation
+
+The performance of Rocksdb is contingent on its tuning. However, because of
+the complexity of its underlying technology and the large number of
+configurable parameters, a good configuration is sometimes hard to obtain.
+The aim of the Python command-line tool, Rocksdb Advisor, is to automate the
+process of suggesting improvements in the configuration based on advice from
+Rocksdb experts.
+
+## Overview
+
+Experts share their wisdom as rules comprising conditions and suggestions in
+the INI format (refer
+[rules.ini](https://github.com/facebook/rocksdb/blob/main/tools/advisor/advisor/rules.ini)).
+Users provide the Rocksdb configuration that they want to improve upon (as the
+familiar Rocksdb OPTIONS file -
+[example](https://github.com/facebook/rocksdb/blob/main/examples/rocksdb_option_file_example.ini))
+and the path of the file which contains Rocksdb logs and statistics.
+The [Advisor](https://github.com/facebook/rocksdb/blob/main/tools/advisor/advisor/rule_parser_example.py)
+creates appropriate DataSource objects (for Rocksdb
+[logs](https://github.com/facebook/rocksdb/blob/main/tools/advisor/advisor/db_log_parser.py),
+[options](https://github.com/facebook/rocksdb/blob/main/tools/advisor/advisor/db_options_parser.py),
+[statistics](https://github.com/facebook/rocksdb/blob/main/tools/advisor/advisor/db_stats_fetcher.py) etc.)
+and provides them to the
+[Rules Engine](https://github.com/facebook/rocksdb/blob/main/tools/advisor/advisor/rule_parser.py).
+The Rules Engine uses the experts' rules to parse the data sources and trigger
+the appropriate rules. The Advisor's output gives information about which
+rules were triggered, why they were triggered and what each of them suggests.
+Each suggestion provided by a triggered rule advises some action on a Rocksdb
+configuration option, for example, increase CFOptions.write_buffer_size, set
+bloom_bits to 2, etc.
+
+## Usage
+
+### Prerequisites
+The tool needs the following to run:
+* python3
+
+### Running the tool
+An example command to run the tool:
+
+```shell
+cd rocksdb/tools/advisor
+python3 -m advisor.rule_parser_example --rules_spec=advisor/rules.ini --rocksdb_options=test/input_files/OPTIONS-000005 --log_files_path_prefix=test/input_files/LOG-0 --stats_dump_period_sec=20
+```
+
+### Command-line arguments
+
+The most important inputs that the Advisor needs are the rules spec and the
+starting Rocksdb configuration. The configuration is provided as the familiar
+Rocksdb OPTIONS file (refer
+[example](https://github.com/facebook/rocksdb/blob/main/examples/rocksdb_option_file_example.ini)).
+The rules spec is written in the INI format (more details in
+[rules.ini](https://github.com/facebook/rocksdb/blob/main/tools/advisor/advisor/rules.ini)).
+
+In brief, a Rule is made of conditions and is triggered when all its
+constituent conditions are triggered. When triggered, a Rule suggests changes
+(increase/decrease/set to a suggested value) to certain Rocksdb options that
+aim to improve Rocksdb performance. Every Condition has a 'source', i.e. the
+data source that is checked to decide whether that condition is triggered.
+For example, a log Condition (with 'source=LOG') is triggered if a particular
+'regex' is found in the Rocksdb LOG files. At present the Rules Engine
+supports 3 types of Conditions (and consequently data sources):
+LOG, OPTIONS, TIME_SERIES. The TIME_SERIES data can be sourced from the
+Rocksdb [statistics](https://github.com/facebook/rocksdb/blob/main/include/rocksdb/statistics.h)
+or [perf context](https://github.com/facebook/rocksdb/blob/main/include/rocksdb/perf_context.h).
+
+For more information about the remaining command-line arguments, run:
+
+```shell
+cd rocksdb/tools/advisor
+python3 -m advisor.rule_parser_example --help
+```
+
+### Sample output
+
+Here, a Rocksdb log-based rule has been triggered:
+
+```shell
+Rule: stall-too-many-memtables
+LogCondition: stall-too-many-memtables regex: Stopping writes because we have \d+ immutable memtables \(waiting for flush\), max_write_buffer_number is set to \d+
+Suggestion: inc-bg-flush option : DBOptions.max_background_flushes action : increase suggested_values : ['2']
+Suggestion: inc-write-buffer option : CFOptions.max_write_buffer_number action : increase
+scope: col_fam:
+{'default'}
+```
+
+## Running the tests
+
+Tests for the code have been added to the
+[test/](https://github.com/facebook/rocksdb/tree/main/tools/advisor/test)
+directory. For example, to run the unit tests for db_log_parser.py:
+
+```shell
+cd rocksdb/tools/advisor
+python3 -m unittest -v test.test_db_log_parser
+```
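The sketch below is an editor's illustration (not part of the upstream README or this patch) of how the pieces described above fit together when used programmatically. It mirrors what rule_parser_example.py and ConfigOptimizer.run() in this patch do; the input paths are the same placeholders used in the example command above.

```python
# Hedged sketch: mirrors advisor.rule_parser_example; paths are placeholders.
from advisor.db_log_parser import DatabaseLogs, DataSource
from advisor.db_options_parser import DatabaseOptions
from advisor.db_stats_fetcher import LogStatsParser
from advisor.rule_parser import RulesSpec

# parse and sanity-check the expert-specified rules
rules_spec = RulesSpec("advisor/rules.ini")
rules_spec.load_rules_from_spec()
rules_spec.perform_section_checks()

# wrap the OPTIONS file, the LOG files and the LOG statistics as data sources
db_options = DatabaseOptions("test/input_files/OPTIONS-000005")
db_logs = DatabaseLogs("test/input_files/LOG-0", db_options.get_column_families())
db_stats = LogStatsParser("test/input_files/LOG-0", stats_freq_sec=20)
data_sources = {
    DataSource.Type.DB_OPTIONS: [db_options],
    DataSource.Type.LOG: [db_logs],
    DataSource.Type.TIME_SERIES: [db_stats],
}

# evaluate the rules against the data sources and report what was triggered
triggered_rules = rules_spec.get_triggered_rules(
    data_sources, db_options.get_column_families()
)
rules_spec.print_rules(triggered_rules)
```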
diff --git a/src/rocksdb/tools/advisor/advisor/__init__.py b/src/rocksdb/tools/advisor/advisor/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/src/rocksdb/tools/advisor/advisor/__init__.py
diff --git a/src/rocksdb/tools/advisor/advisor/bench_runner.py b/src/rocksdb/tools/advisor/advisor/bench_runner.py
new file mode 100644
index 000000000..45d6c8313
--- /dev/null
+++ b/src/rocksdb/tools/advisor/advisor/bench_runner.py
@@ -0,0 +1,39 @@
+# Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+# This source code is licensed under both the GPLv2 (found in the
+# COPYING file in the root directory) and Apache 2.0 License
+# (found in the LICENSE.Apache file in the root directory).
+
+import re
+from abc import ABC, abstractmethod
+
+
+class BenchmarkRunner(ABC):
+    @staticmethod
+    @abstractmethod
+    def is_metric_better(new_metric, old_metric):
+        pass
+
+    @abstractmethod
+    def run_experiment(self):
+        # should return a list of DataSource objects
+        pass
+
+    @staticmethod
+    def get_info_log_file_name(log_dir, db_path):
+        # Example: DB Path = /dev/shm and OPTIONS file has option
+        # db_log_dir=/tmp/rocks/, then the name of the log file will be
+        # 'dev_shm_LOG' and its location will be /tmp/rocks. If db_log_dir is
+        # not specified in the OPTIONS file, then the location of the log file
+        # will be /dev/shm and the name of the file will be 'LOG'
+        file_name = ""
+        if log_dir:
+            # refer GetInfoLogPrefix() in rocksdb/util/filename.cc
+            # example db_path: /dev/shm/dbbench
+            file_name = db_path[1:]  # to ignore the leading '/' character
+            to_be_replaced = re.compile("[^0-9a-zA-Z\-_\.]")  # noqa
+            for character in to_be_replaced.findall(db_path):
+                file_name = file_name.replace(character, "_")
+            if not file_name.endswith("_"):
+                file_name += "_"
+        file_name += "LOG"
+        return file_name
diff --git a/src/rocksdb/tools/advisor/advisor/config_optimizer_example.py b/src/rocksdb/tools/advisor/advisor/config_optimizer_example.py
new file mode 100644
index 000000000..40e2bb953
--- /dev/null
+++ b/src/rocksdb/tools/advisor/advisor/config_optimizer_example.py
@@ -0,0 +1,140 @@
+# Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). + +import argparse + +from advisor.db_config_optimizer import ConfigOptimizer +from advisor.db_log_parser import NO_COL_FAMILY +from advisor.db_options_parser import DatabaseOptions +from advisor.rule_parser import RulesSpec + + +CONFIG_OPT_NUM_ITER = 10 + + +def main(args): + # initialise the RulesSpec parser + rule_spec_parser = RulesSpec(args.rules_spec) + # initialise the benchmark runner + bench_runner_module = __import__( + args.benchrunner_module, fromlist=[args.benchrunner_class] + ) + bench_runner_class = getattr(bench_runner_module, args.benchrunner_class) + ods_args = {} + if args.ods_client and args.ods_entity: + ods_args["client_script"] = args.ods_client + ods_args["entity"] = args.ods_entity + if args.ods_key_prefix: + ods_args["key_prefix"] = args.ods_key_prefix + db_bench_runner = bench_runner_class(args.benchrunner_pos_args, ods_args) + # initialise the database configuration + db_options = DatabaseOptions(args.rocksdb_options, args.misc_options) + # set the frequency at which stats are dumped in the LOG file and the + # location of the LOG file. + db_log_dump_settings = { + "DBOptions.stats_dump_period_sec": {NO_COL_FAMILY: args.stats_dump_period_sec} + } + db_options.update_options(db_log_dump_settings) + # initialise the configuration optimizer + config_optimizer = ConfigOptimizer( + db_bench_runner, db_options, rule_spec_parser, args.base_db_path + ) + # run the optimiser to improve the database configuration for given + # benchmarks, with the help of expert-specified rules + final_db_options = config_optimizer.run() + # generate the final rocksdb options file + print( + "Final configuration in: " + final_db_options.generate_options_config("final") + ) + print("Final miscellaneous options: " + repr(final_db_options.get_misc_options())) + + +if __name__ == "__main__": + """ + An example run of this tool from the command-line would look like: + python3 -m advisor.config_optimizer_example + --base_db_path=/tmp/rocksdbtest-155919/dbbench + --rocksdb_options=temp/OPTIONS_boot.tmp --misc_options bloom_bits=2 + --rules_spec=advisor/rules.ini --stats_dump_period_sec=20 + --benchrunner_module=advisor.db_bench_runner + --benchrunner_class=DBBenchRunner --benchrunner_pos_args ./../../db_bench + readwhilewriting use_existing_db=true duration=90 + """ + parser = argparse.ArgumentParser( + description="This script is used for\ + searching for a better database configuration" + ) + parser.add_argument( + "--rocksdb_options", + required=True, + type=str, + help="path of the starting Rocksdb OPTIONS file", + ) + # these are options that are column-family agnostic and are not yet + # supported by the Rocksdb Options file: eg. bloom_bits=2 + parser.add_argument( + "--misc_options", + nargs="*", + help="whitespace-separated list of options that are not supported " + + "by the Rocksdb OPTIONS file, given in the " + + '<option_name>=<option_value> format eg. 
"bloom_bits=2 ' + + 'rate_limiter_bytes_per_sec=128000000"', + ) + parser.add_argument( + "--base_db_path", required=True, type=str, help="path for the Rocksdb database" + ) + parser.add_argument( + "--rules_spec", + required=True, + type=str, + help="path of the file containing the expert-specified Rules", + ) + parser.add_argument( + "--stats_dump_period_sec", + required=True, + type=int, + help="the frequency (in seconds) at which STATISTICS are printed to " + + "the Rocksdb LOG file", + ) + # ODS arguments + parser.add_argument("--ods_client", type=str, help="the ODS client binary") + parser.add_argument( + "--ods_entity", + type=str, + help="the servers for which the ODS stats need to be fetched", + ) + parser.add_argument( + "--ods_key_prefix", + type=str, + help="the prefix that needs to be attached to the keys of time " + + "series to be fetched from ODS", + ) + # benchrunner_module example: advisor.db_benchmark_client + parser.add_argument( + "--benchrunner_module", + required=True, + type=str, + help="the module containing the BenchmarkRunner class to be used by " + + "the Optimizer, example: advisor.db_bench_runner", + ) + # benchrunner_class example: DBBenchRunner + parser.add_argument( + "--benchrunner_class", + required=True, + type=str, + help="the name of the BenchmarkRunner class to be used by the " + + "Optimizer, should be present in the module provided in the " + + "benchrunner_module argument, example: DBBenchRunner", + ) + parser.add_argument( + "--benchrunner_pos_args", + nargs="*", + help="whitespace-separated positional arguments that are passed on " + + "to the constructor of the BenchmarkRunner class provided in the " + + 'benchrunner_class argument, example: "use_existing_db=true ' + + 'duration=900"', + ) + args = parser.parse_args() + main(args) diff --git a/src/rocksdb/tools/advisor/advisor/db_bench_runner.py b/src/rocksdb/tools/advisor/advisor/db_bench_runner.py new file mode 100644 index 000000000..f5802ed15 --- /dev/null +++ b/src/rocksdb/tools/advisor/advisor/db_bench_runner.py @@ -0,0 +1,237 @@ +# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). + +import shutil +import subprocess +import time + +from advisor.bench_runner import BenchmarkRunner +from advisor.db_log_parser import DatabaseLogs, DataSource, NO_COL_FAMILY +from advisor.db_stats_fetcher import ( + DatabasePerfContext, + LogStatsParser, + OdsStatsFetcher, +) + + +""" +NOTE: This is not thread-safe, because the output file is simply overwritten. 
+""" + + +class DBBenchRunner(BenchmarkRunner): + OUTPUT_FILE = "temp/dbbench_out.tmp" + ERROR_FILE = "temp/dbbench_err.tmp" + DB_PATH = "DB path" + THROUGHPUT = "ops/sec" + PERF_CON = " PERF_CONTEXT:" + + @staticmethod + def is_metric_better(new_metric, old_metric): + # for db_bench 'throughput' is the metric returned by run_experiment + return new_metric >= old_metric + + @staticmethod + def get_opt_args_str(misc_options_dict): + # given a dictionary of options and their values, return a string + # that can be appended as command-line arguments + optional_args_str = "" + for option_name, option_value in misc_options_dict.items(): + if option_value: + optional_args_str += " --" + option_name + "=" + str(option_value) + return optional_args_str + + def __init__(self, positional_args, ods_args=None): + # parse positional_args list appropriately + self.db_bench_binary = positional_args[0] + self.benchmark = positional_args[1] + self.db_bench_args = None + if len(positional_args) > 2: + # options list with each option given as "<option>=<value>" + self.db_bench_args = positional_args[2:] + # save ods_args, if provided + self.ods_args = ods_args + + def _parse_output(self, get_perf_context=False): + """ + Sample db_bench output after running 'readwhilewriting' benchmark: + DB path: [/tmp/rocksdbtest-155919/dbbench]\n + readwhilewriting : 16.582 micros/op 60305 ops/sec; 4.2 MB/s (3433828\ + of 5427999 found)\n + PERF_CONTEXT:\n + user_key_comparison_count = 500466712, block_cache_hit_count = ...\n + """ + output = {self.THROUGHPUT: None, self.DB_PATH: None, self.PERF_CON: None} + perf_context_begins = False + with open(self.OUTPUT_FILE, "r") as fp: + for line in fp: + if line.startswith(self.benchmark): + # line from sample output: + # readwhilewriting : 16.582 micros/op 60305 ops/sec; \ + # 4.2 MB/s (3433828 of 5427999 found)\n + print(line) # print output of the benchmark run + token_list = line.strip().split() + for ix, token in enumerate(token_list): + if token.startswith(self.THROUGHPUT): + # in above example, throughput = 60305 ops/sec + output[self.THROUGHPUT] = float(token_list[ix - 1]) + break + elif get_perf_context and line.startswith(self.PERF_CON): + # the following lines in the output contain perf context + # statistics (refer example above) + perf_context_begins = True + elif get_perf_context and perf_context_begins: + # Sample perf_context output: + # user_key_comparison_count = 500, block_cache_hit_count =\ + # 468, block_read_count = 580, block_read_byte = 445, ... + token_list = line.strip().split(",") + # token_list = ['user_key_comparison_count = 500', + # 'block_cache_hit_count = 468','block_read_count = 580'... 
+ perf_context = { + tk.split("=")[0].strip(): tk.split("=")[1].strip() + for tk in token_list + if tk + } + # TODO(poojam23): this is a hack and should be replaced + # with the timestamp that db_bench will provide per printed + # perf_context + timestamp = int(time.time()) + perf_context_ts = {} + for stat in perf_context.keys(): + perf_context_ts[stat] = {timestamp: int(perf_context[stat])} + output[self.PERF_CON] = perf_context_ts + perf_context_begins = False + elif line.startswith(self.DB_PATH): + # line from sample output: + # DB path: [/tmp/rocksdbtest-155919/dbbench]\n + output[self.DB_PATH] = line.split("[")[1].split("]")[0] + return output + + def get_log_options(self, db_options, db_path): + # get the location of the LOG file and the frequency at which stats are + # dumped in the LOG file + log_dir_path = None + stats_freq_sec = None + logs_file_prefix = None + + # fetch frequency at which the stats are dumped in the Rocksdb logs + dump_period = "DBOptions.stats_dump_period_sec" + # fetch the directory, if specified, in which the Rocksdb logs are + # dumped, by default logs are dumped in same location as database + log_dir = "DBOptions.db_log_dir" + log_options = db_options.get_options([dump_period, log_dir]) + if dump_period in log_options: + stats_freq_sec = int(log_options[dump_period][NO_COL_FAMILY]) + if log_dir in log_options: + log_dir_path = log_options[log_dir][NO_COL_FAMILY] + + log_file_name = DBBenchRunner.get_info_log_file_name(log_dir_path, db_path) + + if not log_dir_path: + log_dir_path = db_path + if not log_dir_path.endswith("/"): + log_dir_path += "/" + + logs_file_prefix = log_dir_path + log_file_name + return (logs_file_prefix, stats_freq_sec) + + def _get_options_command_line_args_str(self, curr_options): + """ + This method uses the provided Rocksdb OPTIONS to create a string of + command-line arguments for db_bench. + The --options_file argument is always given and the options that are + not supported by the OPTIONS file are given as separate arguments. 
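        Illustrative example (editor's addition; the values are assumed): with
        the misc option bloom_bits=2 and the nonce "12345" used below, the
        returned string would look roughly like
        " --bloom_bits=2 --options_file=<advisor_dir>/../temp/OPTIONS_12345.tmp".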
+ """ + optional_args_str = DBBenchRunner.get_opt_args_str( + curr_options.get_misc_options() + ) + # generate an options configuration file + options_file = curr_options.generate_options_config(nonce="12345") + optional_args_str += " --options_file=" + options_file + return optional_args_str + + def _setup_db_before_experiment(self, curr_options, db_path): + # remove destination directory if it already exists + try: + shutil.rmtree(db_path, ignore_errors=True) + except OSError as e: + print("Error: rmdir " + e.filename + " " + e.strerror) + # setup database with a million keys using the fillrandom benchmark + command = "%s --benchmarks=fillrandom --db=%s --num=1000000" % ( + self.db_bench_binary, + db_path, + ) + args_str = self._get_options_command_line_args_str(curr_options) + command += args_str + self._run_command(command) + + def _build_experiment_command(self, curr_options, db_path): + command = "%s --benchmarks=%s --statistics --perf_level=3 --db=%s" % ( + self.db_bench_binary, + self.benchmark, + db_path, + ) + # fetch the command-line arguments string for providing Rocksdb options + args_str = self._get_options_command_line_args_str(curr_options) + # handle the command-line args passed in the constructor, these + # arguments are specific to db_bench + for cmd_line_arg in self.db_bench_args: + args_str += " --" + cmd_line_arg + command += args_str + return command + + def _run_command(self, command): + out_file = open(self.OUTPUT_FILE, "w+") + err_file = open(self.ERROR_FILE, "w+") + print("executing... - " + command) + subprocess.call(command, shell=True, stdout=out_file, stderr=err_file) + out_file.close() + err_file.close() + + def run_experiment(self, db_options, db_path): + # setup the Rocksdb database before running experiment + self._setup_db_before_experiment(db_options, db_path) + # get the command to run the experiment + command = self._build_experiment_command(db_options, db_path) + experiment_start_time = int(time.time()) + # run experiment + self._run_command(command) + experiment_end_time = int(time.time()) + # parse the db_bench experiment output + parsed_output = self._parse_output(get_perf_context=True) + + # get the log files path prefix and frequency at which Rocksdb stats + # are dumped in the logs + logs_file_prefix, stats_freq_sec = self.get_log_options( + db_options, parsed_output[self.DB_PATH] + ) + # create the Rocksbd LOGS object + db_logs = DatabaseLogs(logs_file_prefix, db_options.get_column_families()) + # Create the Log STATS object + db_log_stats = LogStatsParser(logs_file_prefix, stats_freq_sec) + # Create the PerfContext STATS object + db_perf_context = DatabasePerfContext(parsed_output[self.PERF_CON], 0, False) + # create the data-sources dictionary + data_sources = { + DataSource.Type.DB_OPTIONS: [db_options], + DataSource.Type.LOG: [db_logs], + DataSource.Type.TIME_SERIES: [db_log_stats, db_perf_context], + } + # Create the ODS STATS object + if self.ods_args: + key_prefix = "" + if "key_prefix" in self.ods_args: + key_prefix = self.ods_args["key_prefix"] + data_sources[DataSource.Type.TIME_SERIES].append( + OdsStatsFetcher( + self.ods_args["client_script"], + self.ods_args["entity"], + experiment_start_time, + experiment_end_time, + key_prefix, + ) + ) + # return the experiment's data-sources and throughput + return data_sources, parsed_output[self.THROUGHPUT] diff --git a/src/rocksdb/tools/advisor/advisor/db_config_optimizer.py b/src/rocksdb/tools/advisor/advisor/db_config_optimizer.py new file mode 100644 index 000000000..413778478 --- 
/dev/null +++ b/src/rocksdb/tools/advisor/advisor/db_config_optimizer.py @@ -0,0 +1,293 @@ +# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). + +import copy +import random + +from advisor.db_log_parser import NO_COL_FAMILY +from advisor.db_options_parser import DatabaseOptions +from advisor.rule_parser import Suggestion + + +class ConfigOptimizer: + SCOPE = "scope" + SUGG_VAL = "suggested values" + + @staticmethod + def apply_action_on_value(old_value, action, suggested_values): + chosen_sugg_val = None + if suggested_values: + chosen_sugg_val = random.choice(list(suggested_values)) + new_value = None + if action is Suggestion.Action.set or not old_value: + assert chosen_sugg_val + new_value = chosen_sugg_val + else: + # For increase/decrease actions, currently the code tries to make + # a 30% change in the option's value per iteration. An addend is + # also present (+1 or -1) to handle the cases when the option's + # old value was 0 or the final int() conversion suppressed the 30% + # change made to the option + old_value = float(old_value) + mul = 0 + add = 0 + if action is Suggestion.Action.increase: + if old_value < 0: + mul = 0.7 + add = 2 + else: + mul = 1.3 + add = 2 + elif action is Suggestion.Action.decrease: + if old_value < 0: + mul = 1.3 + add = -2 + else: + mul = 0.7 + add = -2 + new_value = int(old_value * mul + add) + return new_value + + @staticmethod + def improve_db_config(options, rule, suggestions_dict): + # this method takes ONE 'rule' and applies all its suggestions on the + # appropriate options + required_options = [] + rule_suggestions = [] + for sugg_name in rule.get_suggestions(): + option = suggestions_dict[sugg_name].option + action = suggestions_dict[sugg_name].action + # A Suggestion in the rules spec must have the 'option' and + # 'action' fields defined, always call perform_checks() method + # after parsing the rules file using RulesSpec + assert option + assert action + required_options.append(option) + rule_suggestions.append(suggestions_dict[sugg_name]) + current_config = options.get_options(required_options) + # Create the updated configuration from the rule's suggestions + updated_config = {} + for sugg in rule_suggestions: + # case: when the option is not present in the current configuration + if sugg.option not in current_config: + try: + new_value = ConfigOptimizer.apply_action_on_value( + None, sugg.action, sugg.suggested_values + ) + if sugg.option not in updated_config: + updated_config[sugg.option] = {} + if DatabaseOptions.is_misc_option(sugg.option): + # this suggestion is on an option that is not yet + # supported by the Rocksdb OPTIONS file and so it is + # not prefixed by a section type. 
+ updated_config[sugg.option][NO_COL_FAMILY] = new_value + else: + for col_fam in rule.get_trigger_column_families(): + updated_config[sugg.option][col_fam] = new_value + except AssertionError: + print( + "WARNING(ConfigOptimizer): provide suggested_values " + + "for " + + sugg.option + ) + continue + # case: when the option is present in the current configuration + if NO_COL_FAMILY in current_config[sugg.option]: + old_value = current_config[sugg.option][NO_COL_FAMILY] + try: + new_value = ConfigOptimizer.apply_action_on_value( + old_value, sugg.action, sugg.suggested_values + ) + if sugg.option not in updated_config: + updated_config[sugg.option] = {} + updated_config[sugg.option][NO_COL_FAMILY] = new_value + except AssertionError: + print( + "WARNING(ConfigOptimizer): provide suggested_values " + + "for " + + sugg.option + ) + else: + for col_fam in rule.get_trigger_column_families(): + old_value = None + if col_fam in current_config[sugg.option]: + old_value = current_config[sugg.option][col_fam] + try: + new_value = ConfigOptimizer.apply_action_on_value( + old_value, sugg.action, sugg.suggested_values + ) + if sugg.option not in updated_config: + updated_config[sugg.option] = {} + updated_config[sugg.option][col_fam] = new_value + except AssertionError: + print( + "WARNING(ConfigOptimizer): provide " + + "suggested_values for " + + sugg.option + ) + return current_config, updated_config + + @staticmethod + def pick_rule_to_apply(rules, last_rule_name, rules_tried, backtrack): + if not rules: + print("\nNo more rules triggered!") + return None + # if the last rule provided an improvement in the database performance, + # and it was triggered again (i.e. it is present in 'rules'), then pick + # the same rule for this iteration too. + if last_rule_name and not backtrack: + for rule in rules: + if rule.name == last_rule_name: + return rule + # there was no previous rule OR the previous rule did not improve db + # performance OR it was not triggered for this iteration, + # then pick another rule that has not been tried yet + for rule in rules: + if rule.name not in rules_tried: + return rule + print("\nAll rules have been exhausted") + return None + + @staticmethod + def apply_suggestions( + triggered_rules, + current_rule_name, + rules_tried, + backtrack, + curr_options, + suggestions_dict, + ): + curr_rule = ConfigOptimizer.pick_rule_to_apply( + triggered_rules, current_rule_name, rules_tried, backtrack + ) + if not curr_rule: + return tuple([None] * 4) + # if a rule has been picked for improving db_config, update rules_tried + rules_tried.add(curr_rule.name) + # get updated config based on the picked rule + curr_conf, updated_conf = ConfigOptimizer.improve_db_config( + curr_options, curr_rule, suggestions_dict + ) + conf_diff = DatabaseOptions.get_options_diff(curr_conf, updated_conf) + if not conf_diff: # the current and updated configs are the same + ( + curr_rule, + rules_tried, + curr_conf, + updated_conf, + ) = ConfigOptimizer.apply_suggestions( + triggered_rules, + None, + rules_tried, + backtrack, + curr_options, + suggestions_dict, + ) + print("returning from apply_suggestions") + return (curr_rule, rules_tried, curr_conf, updated_conf) + + # TODO(poojam23): check if this method is required or can we directly set + # the config equal to the curr_config + @staticmethod + def get_backtrack_config(curr_config, updated_config): + diff = DatabaseOptions.get_options_diff(curr_config, updated_config) + bt_config = {} + for option in diff: + bt_config[option] = {} + for col_fam in 
diff[option]: + bt_config[option][col_fam] = diff[option][col_fam][0] + print(bt_config) + return bt_config + + def __init__(self, bench_runner, db_options, rule_parser, base_db): + self.bench_runner = bench_runner + self.db_options = db_options + self.rule_parser = rule_parser + self.base_db_path = base_db + + def run(self): + # In every iteration of this method's optimization loop we pick ONE + # RULE from all the triggered rules and apply all its suggestions to + # the appropriate options. + # bootstrapping the optimizer + print("Bootstrapping optimizer:") + options = copy.deepcopy(self.db_options) + old_data_sources, old_metric = self.bench_runner.run_experiment( + options, self.base_db_path + ) + print("Initial metric: " + str(old_metric)) + self.rule_parser.load_rules_from_spec() + self.rule_parser.perform_section_checks() + triggered_rules = self.rule_parser.get_triggered_rules( + old_data_sources, options.get_column_families() + ) + print("\nTriggered:") + self.rule_parser.print_rules(triggered_rules) + backtrack = False + rules_tried = set() + ( + curr_rule, + rules_tried, + curr_conf, + updated_conf, + ) = ConfigOptimizer.apply_suggestions( + triggered_rules, + None, + rules_tried, + backtrack, + options, + self.rule_parser.get_suggestions_dict(), + ) + # the optimizer loop + while curr_rule: + print("\nRule picked for next iteration:") + print(curr_rule.name) + print("\ncurrent config:") + print(curr_conf) + print("updated config:") + print(updated_conf) + options.update_options(updated_conf) + # run bench_runner with updated config + new_data_sources, new_metric = self.bench_runner.run_experiment( + options, self.base_db_path + ) + print("\nnew metric: " + str(new_metric)) + backtrack = not self.bench_runner.is_metric_better(new_metric, old_metric) + # update triggered_rules, metric, data_sources, if required + if backtrack: + # revert changes to options config + print("\nBacktracking to previous configuration") + backtrack_conf = ConfigOptimizer.get_backtrack_config( + curr_conf, updated_conf + ) + options.update_options(backtrack_conf) + else: + # run advisor on new data sources + self.rule_parser.load_rules_from_spec() # reboot the advisor + self.rule_parser.perform_section_checks() + triggered_rules = self.rule_parser.get_triggered_rules( + new_data_sources, options.get_column_families() + ) + print("\nTriggered:") + self.rule_parser.print_rules(triggered_rules) + old_metric = new_metric + old_data_sources = new_data_sources + rules_tried = set() + # pick rule to work on and set curr_rule to that + ( + curr_rule, + rules_tried, + curr_conf, + updated_conf, + ) = ConfigOptimizer.apply_suggestions( + triggered_rules, + curr_rule.name, + rules_tried, + backtrack, + options, + self.rule_parser.get_suggestions_dict(), + ) + # return the final database options configuration + return options diff --git a/src/rocksdb/tools/advisor/advisor/db_log_parser.py b/src/rocksdb/tools/advisor/advisor/db_log_parser.py new file mode 100644 index 000000000..9ba541fc3 --- /dev/null +++ b/src/rocksdb/tools/advisor/advisor/db_log_parser.py @@ -0,0 +1,134 @@ +# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). 
+ +import glob +import re +import time +from abc import ABC, abstractmethod +from calendar import timegm +from enum import Enum + + +NO_COL_FAMILY = "DB_WIDE" + + +class DataSource(ABC): + class Type(Enum): + LOG = 1 + DB_OPTIONS = 2 + TIME_SERIES = 3 + + def __init__(self, type): + self.type = type + + @abstractmethod + def check_and_trigger_conditions(self, conditions): + pass + + +class Log: + @staticmethod + def is_new_log(log_line): + # The assumption is that a new log will start with a date printed in + # the below regex format. + date_regex = "\d{4}/\d{2}/\d{2}-\d{2}:\d{2}:\d{2}\.\d{6}" # noqa + return re.match(date_regex, log_line) + + def __init__(self, log_line, column_families): + token_list = log_line.strip().split() + self.time = token_list[0] + self.context = token_list[1] + self.message = " ".join(token_list[2:]) + self.column_family = None + # example log for 'default' column family: + # "2018/07/25-17:29:05.176080 7f969de68700 [db/compaction_job.cc:1634] + # [default] [JOB 3] Compacting 24@0 + 16@1 files to L1, score 6.00\n" + for col_fam in column_families: + search_for_str = "\[" + col_fam + "\]" # noqa + if re.search(search_for_str, self.message): + self.column_family = col_fam + break + if not self.column_family: + self.column_family = NO_COL_FAMILY + + def get_human_readable_time(self): + # example from a log line: '2018/07/25-11:25:45.782710' + return self.time + + def get_column_family(self): + return self.column_family + + def get_context(self): + return self.context + + def get_message(self): + return self.message + + def append_message(self, remaining_log): + self.message = self.message + "\n" + remaining_log.strip() + + def get_timestamp(self): + # example: '2018/07/25-11:25:45.782710' will be converted to the GMT + # Unix timestamp 1532517945 (note: this method assumes that self.time + # is in GMT) + hr_time = self.time + "GMT" + timestamp = timegm(time.strptime(hr_time, "%Y/%m/%d-%H:%M:%S.%f%Z")) + return timestamp + + def __repr__(self): + return ( + "time: " + + self.time + + "; context: " + + self.context + + "; col_fam: " + + self.column_family + + "; message: " + + self.message + ) + + +class DatabaseLogs(DataSource): + def __init__(self, logs_path_prefix, column_families): + super().__init__(DataSource.Type.LOG) + self.logs_path_prefix = logs_path_prefix + self.column_families = column_families + + def trigger_conditions_for_log(self, conditions, log): + # For a LogCondition object, trigger is: + # Dict[column_family_name, List[Log]]. This explains why the condition + # was triggered and for which column families. 
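# (Editor's note, illustrative only.) For the stall rule shown in the README's
# sample output, a triggered log condition's trigger could end up looking like
#   {'default': [<Log object for the "Stopping writes because ..." line>]}
# i.e. the matching Log objects grouped by the column family they mention.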
+ for cond in conditions: + if re.search(cond.regex, log.get_message(), re.IGNORECASE): + trigger = cond.get_trigger() + if not trigger: + trigger = {} + if log.get_column_family() not in trigger: + trigger[log.get_column_family()] = [] + trigger[log.get_column_family()].append(log) + cond.set_trigger(trigger) + + def check_and_trigger_conditions(self, conditions): + for file_name in glob.glob(self.logs_path_prefix + "*"): + # TODO(poojam23): find a way to distinguish between log files + # - generated in the current experiment but are labeled 'old' + # because they LOGs exceeded the file size limit AND + # - generated in some previous experiment that are also labeled + # 'old' and were not deleted for some reason + if re.search("old", file_name, re.IGNORECASE): + continue + with open(file_name, "r") as db_logs: + new_log = None + for line in db_logs: + if Log.is_new_log(line): + if new_log: + self.trigger_conditions_for_log(conditions, new_log) + new_log = Log(line, self.column_families) + else: + # To account for logs split into multiple lines + new_log.append_message(line) + # Check for the last log in the file. + if new_log: + self.trigger_conditions_for_log(conditions, new_log) diff --git a/src/rocksdb/tools/advisor/advisor/db_options_parser.py b/src/rocksdb/tools/advisor/advisor/db_options_parser.py new file mode 100644 index 000000000..062aeeec4 --- /dev/null +++ b/src/rocksdb/tools/advisor/advisor/db_options_parser.py @@ -0,0 +1,348 @@ +# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). + +import copy +import os + +from advisor.db_log_parser import DataSource, NO_COL_FAMILY +from advisor.ini_parser import IniParser + + +class OptionsSpecParser(IniParser): + @staticmethod + def is_new_option(line): + return "=" in line + + @staticmethod + def get_section_type(line): + """ + Example section header: [TableOptions/BlockBasedTable "default"] + Here ConfigurationOptimizer returned would be + 'TableOptions.BlockBasedTable' + """ + section_path = line.strip()[1:-1].split()[0] + section_type = ".".join(section_path.split("/")) + return section_type + + @staticmethod + def get_section_name(line): + # example: get_section_name('[CFOptions "default"]') + token_list = line.strip()[1:-1].split('"') + # token_list = ['CFOptions', 'default', ''] + if len(token_list) < 3: + return None + return token_list[1] # return 'default' + + @staticmethod + def get_section_str(section_type, section_name): + # Example: + # Case 1: get_section_str('DBOptions', NO_COL_FAMILY) + # Case 2: get_section_str('TableOptions.BlockBasedTable', 'default') + section_type = "/".join(section_type.strip().split(".")) + # Case 1: section_type = 'DBOptions' + # Case 2: section_type = 'TableOptions/BlockBasedTable' + section_str = "[" + section_type + if section_name == NO_COL_FAMILY: + # Case 1: '[DBOptions]' + return section_str + "]" + else: + # Case 2: '[TableOptions/BlockBasedTable "default"]' + return section_str + ' "' + section_name + '"]' + + @staticmethod + def get_option_str(key, values): + option_str = key + "=" + # get_option_str('db_log_dir', None), returns 'db_log_dir=' + if values: + # example: + # get_option_str('max_bytes_for_level_multiplier_additional', + # [1,1,1,1,1,1,1]), returned string: + # 'max_bytes_for_level_multiplier_additional=1:1:1:1:1:1:1' + if isinstance(values, list): + for value in values: + 
option_str += str(value) + ":" + option_str = option_str[:-1] + else: + # example: get_option_str('write_buffer_size', 1048576) + # returned string: 'write_buffer_size=1048576' + option_str += str(values) + return option_str + + +class DatabaseOptions(DataSource): + @staticmethod + def is_misc_option(option_name): + # these are miscellaneous options that are not yet supported by the + # Rocksdb options file, hence they are not prefixed with any section + # name + return "." not in option_name + + @staticmethod + def get_options_diff(opt_old, opt_new): + # type: Dict[option, Dict[col_fam, value]] X 2 -> + # Dict[option, Dict[col_fam, Tuple(old_value, new_value)]] + # note: diff should contain a tuple of values only if they are + # different from each other + options_union = set(opt_old.keys()).union(set(opt_new.keys())) + diff = {} + for opt in options_union: + diff[opt] = {} + # if option in options_union, then it must be in one of the configs + if opt not in opt_old: + for col_fam in opt_new[opt]: + diff[opt][col_fam] = (None, opt_new[opt][col_fam]) + elif opt not in opt_new: + for col_fam in opt_old[opt]: + diff[opt][col_fam] = (opt_old[opt][col_fam], None) + else: + for col_fam in opt_old[opt]: + if col_fam in opt_new[opt]: + if opt_old[opt][col_fam] != opt_new[opt][col_fam]: + diff[opt][col_fam] = ( + opt_old[opt][col_fam], + opt_new[opt][col_fam], + ) + else: + diff[opt][col_fam] = (opt_old[opt][col_fam], None) + for col_fam in opt_new[opt]: + if col_fam in opt_old[opt]: + if opt_old[opt][col_fam] != opt_new[opt][col_fam]: + diff[opt][col_fam] = ( + opt_old[opt][col_fam], + opt_new[opt][col_fam], + ) + else: + diff[opt][col_fam] = (None, opt_new[opt][col_fam]) + if not diff[opt]: + diff.pop(opt) + return diff + + def __init__(self, rocksdb_options, misc_options=None): + super().__init__(DataSource.Type.DB_OPTIONS) + # The options are stored in the following data structure: + # Dict[section_type, Dict[section_name, Dict[option_name, value]]] + self.options_dict = None + self.column_families = None + # Load the options from the given file to a dictionary. 
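# (Editor's note, illustrative only.) After loading, options_dict might look like
#   {'DBOptions': {'DB_WIDE': {'stats_dump_period_sec': '20'}},
#    'CFOptions': {'default': {'write_buffer_size': '1048576'}}}
# where 'DB_WIDE' is the NO_COL_FAMILY placeholder used for sections that carry
# no column-family name.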
+ self.load_from_source(rocksdb_options) + # Setup the miscellaneous options expected to be List[str], where each + # element in the List has the format "<option_name>=<option_value>" + # These options are the ones that are not yet supported by the Rocksdb + # OPTIONS file, so they are provided separately + self.setup_misc_options(misc_options) + + def setup_misc_options(self, misc_options): + self.misc_options = {} + if misc_options: + for option_pair_str in misc_options: + option_name = option_pair_str.split("=")[0].strip() + option_value = option_pair_str.split("=")[1].strip() + self.misc_options[option_name] = option_value + + def load_from_source(self, options_path): + self.options_dict = {} + with open(options_path, "r") as db_options: + for line in db_options: + line = OptionsSpecParser.remove_trailing_comment(line) + if not line: + continue + if OptionsSpecParser.is_section_header(line): + curr_sec_type = OptionsSpecParser.get_section_type(line) + curr_sec_name = OptionsSpecParser.get_section_name(line) + if curr_sec_type not in self.options_dict: + self.options_dict[curr_sec_type] = {} + if not curr_sec_name: + curr_sec_name = NO_COL_FAMILY + self.options_dict[curr_sec_type][curr_sec_name] = {} + # example: if the line read from the Rocksdb OPTIONS file + # is [CFOptions "default"], then the section type is + # CFOptions and 'default' is the name of a column family + # that for this database, so it's added to the list of + # column families stored in this object + if curr_sec_type == "CFOptions": + if not self.column_families: + self.column_families = [] + self.column_families.append(curr_sec_name) + elif OptionsSpecParser.is_new_option(line): + key, value = OptionsSpecParser.get_key_value_pair(line) + self.options_dict[curr_sec_type][curr_sec_name][key] = value + else: + error = "Not able to parse line in Options file." + OptionsSpecParser.exit_with_parse_error(line, error) + + def get_misc_options(self): + # these are options that are not yet supported by the Rocksdb OPTIONS + # file, hence they are provided and stored separately + return self.misc_options + + def get_column_families(self): + return self.column_families + + def get_all_options(self): + # This method returns all the options that are stored in this object as + # a: Dict[<sec_type>.<option_name>: Dict[col_fam, option_value]] + all_options = [] + # Example: in the section header '[CFOptions "default"]' read from the + # OPTIONS file, sec_type='CFOptions' + for sec_type in self.options_dict: + for col_fam in self.options_dict[sec_type]: + for opt_name in self.options_dict[sec_type][col_fam]: + option = sec_type + "." + opt_name + all_options.append(option) + all_options.extend(list(self.misc_options.keys())) + return self.get_options(all_options) + + def get_options(self, reqd_options): + # type: List[str] -> Dict[str, Dict[str, Any]] + # List[option] -> Dict[option, Dict[col_fam, value]] + reqd_options_dict = {} + for option in reqd_options: + if DatabaseOptions.is_misc_option(option): + # the option is not prefixed by '<section_type>.' 
because it is + # not yet supported by the Rocksdb OPTIONS file; so it has to + # be fetched from the misc_options dictionary + if option not in self.misc_options: + continue + if option not in reqd_options_dict: + reqd_options_dict[option] = {} + reqd_options_dict[option][NO_COL_FAMILY] = self.misc_options[option] + else: + # Example: option = 'TableOptions.BlockBasedTable.block_align' + # then, sec_type = 'TableOptions.BlockBasedTable' + sec_type = ".".join(option.split(".")[:-1]) + # opt_name = 'block_align' + opt_name = option.split(".")[-1] + if sec_type not in self.options_dict: + continue + for col_fam in self.options_dict[sec_type]: + if opt_name in self.options_dict[sec_type][col_fam]: + if option not in reqd_options_dict: + reqd_options_dict[option] = {} + reqd_options_dict[option][col_fam] = self.options_dict[ + sec_type + ][col_fam][opt_name] + return reqd_options_dict + + def update_options(self, options): + # An example 'options' object looks like: + # {'DBOptions.max_background_jobs': {NO_COL_FAMILY: 2}, + # 'CFOptions.write_buffer_size': {'default': 1048576, 'cf_A': 128000}, + # 'bloom_bits': {NO_COL_FAMILY: 4}} + for option in options: + if DatabaseOptions.is_misc_option(option): + # this is a misc_option i.e. an option that is not yet + # supported by the Rocksdb OPTIONS file, so it is not prefixed + # by '<section_type>.' and must be stored in the separate + # misc_options dictionary + if NO_COL_FAMILY not in options[option]: + print( + "WARNING(DatabaseOptions.update_options): not " + + "updating option " + + option + + " because it is in " + + "misc_option format but its scope is not " + + NO_COL_FAMILY + + ". Check format of option." + ) + continue + self.misc_options[option] = options[option][NO_COL_FAMILY] + else: + sec_name = ".".join(option.split(".")[:-1]) + opt_name = option.split(".")[-1] + if sec_name not in self.options_dict: + self.options_dict[sec_name] = {} + for col_fam in options[option]: + # if the option is not already present in the dictionary, + # it will be inserted, else it will be updated to the new + # value + if col_fam not in self.options_dict[sec_name]: + self.options_dict[sec_name][col_fam] = {} + self.options_dict[sec_name][col_fam][opt_name] = copy.deepcopy( + options[option][col_fam] + ) + + def generate_options_config(self, nonce): + # this method generates a Rocksdb OPTIONS file in the INI format from + # the options stored in self.options_dict + this_path = os.path.abspath(os.path.dirname(__file__)) + file_name = "../temp/OPTIONS_" + str(nonce) + ".tmp" + file_path = os.path.join(this_path, file_name) + with open(file_path, "w") as fp: + for section in self.options_dict: + for col_fam in self.options_dict[section]: + fp.write(OptionsSpecParser.get_section_str(section, col_fam) + "\n") + for option in self.options_dict[section][col_fam]: + values = self.options_dict[section][col_fam][option] + fp.write( + OptionsSpecParser.get_option_str(option, values) + "\n" + ) + fp.write("\n") + return file_path + + def check_and_trigger_conditions(self, conditions): + for cond in conditions: + reqd_options_dict = self.get_options(cond.options) + # This contains the indices of options that are specific to some + # column family and are not database-wide options. 
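# (Editor's note; the option name and expression below are hypothetical.)
# A condition might carry cond.options = ['CFOptions.write_buffer_size'] and
# cond.eval_expr = "int(options[0]) < 67108864"; the local list 'options'
# built below is what that expression is eval()-ed against.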
+ incomplete_option_ix = [] + options = [] + missing_reqd_option = False + for ix, option in enumerate(cond.options): + if option not in reqd_options_dict: + print( + "WARNING(DatabaseOptions.check_and_trigger): " + + "skipping condition " + + cond.name + + " because it " + "requires option " + + option + + " but this option is" + + " not available" + ) + missing_reqd_option = True + break # required option is absent + if NO_COL_FAMILY in reqd_options_dict[option]: + options.append(reqd_options_dict[option][NO_COL_FAMILY]) + else: + options.append(None) + incomplete_option_ix.append(ix) + + if missing_reqd_option: + continue + + # if all the options are database-wide options + if not incomplete_option_ix: + try: + if eval(cond.eval_expr): + cond.set_trigger({NO_COL_FAMILY: options}) + except Exception as e: + print("WARNING(DatabaseOptions) check_and_trigger:" + str(e)) + continue + + # for all the options that are not database-wide, we look for their + # values specific to column families + col_fam_options_dict = {} + for col_fam in self.column_families: + present = True + for ix in incomplete_option_ix: + option = cond.options[ix] + if col_fam not in reqd_options_dict[option]: + present = False + break + options[ix] = reqd_options_dict[option][col_fam] + if present: + try: + if eval(cond.eval_expr): + col_fam_options_dict[col_fam] = copy.deepcopy(options) + except Exception as e: + print("WARNING(DatabaseOptions) check_and_trigger: " + str(e)) + # Trigger for an OptionCondition object is of the form: + # Dict[col_fam_name: List[option_value]] + # where col_fam_name is the name of a column family for which + # 'eval_expr' evaluated to True and List[option_value] is the list + # of values of the options specified in the condition's 'options' + # field + if col_fam_options_dict: + cond.set_trigger(col_fam_options_dict) diff --git a/src/rocksdb/tools/advisor/advisor/db_stats_fetcher.py b/src/rocksdb/tools/advisor/advisor/db_stats_fetcher.py new file mode 100755 index 000000000..30d1ad8b3 --- /dev/null +++ b/src/rocksdb/tools/advisor/advisor/db_stats_fetcher.py @@ -0,0 +1,346 @@ +# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). + +import copy +import glob +import re +import subprocess +import time +from typing import List + +from advisor.db_log_parser import Log +from advisor.db_timeseries_parser import NO_ENTITY, TimeSeriesData + + +class LogStatsParser(TimeSeriesData): + STATS = "STATISTICS:" + + @staticmethod + def parse_log_line_for_stats(log_line): + # Example stat line (from LOG file): + # "rocksdb.db.get.micros P50 : 8.4 P95 : 21.8 P99 : 33.9 P100 : 92.0\n" + token_list = log_line.strip().split() + # token_list = ['rocksdb.db.get.micros', 'P50', ':', '8.4', 'P95', ':', + # '21.8', 'P99', ':', '33.9', 'P100', ':', '92.0'] + stat_prefix = token_list[0] + "." # 'rocksdb.db.get.micros.' 
+ stat_values = [token for token in token_list[1:] if token != ":"] + # stat_values = ['P50', '8.4', 'P95', '21.8', 'P99', '33.9', 'P100', + # '92.0'] + stat_dict = {} + for ix, metric in enumerate(stat_values): + if ix % 2 == 0: + stat_name = stat_prefix + metric + stat_name = stat_name.lower() # Note: case insensitive names + else: + stat_dict[stat_name] = float(metric) + # stat_dict = {'rocksdb.db.get.micros.p50': 8.4, + # 'rocksdb.db.get.micros.p95': 21.8, 'rocksdb.db.get.micros.p99': 33.9, + # 'rocksdb.db.get.micros.p100': 92.0} + return stat_dict + + def __init__(self, logs_path_prefix, stats_freq_sec): + super().__init__() + self.logs_file_prefix = logs_path_prefix + self.stats_freq_sec = stats_freq_sec + self.duration_sec = 60 + + def get_keys_from_conditions(self, conditions): + # Note: case insensitive stat names + reqd_stats = [] + for cond in conditions: + for key in cond.keys: + key = key.lower() + # some keys are prepended with '[]' for OdsStatsFetcher to + # replace this with the appropriate key_prefix, remove these + # characters here since the LogStatsParser does not need + # a prefix + if key.startswith("[]"): + reqd_stats.append(key[2:]) + else: + reqd_stats.append(key) + return reqd_stats + + def add_to_timeseries(self, log, reqd_stats): + # this method takes in the Log object that contains the Rocksdb stats + # and a list of required stats, then it parses the stats line by line + # to fetch required stats and add them to the keys_ts object + # Example: reqd_stats = ['rocksdb.block.cache.hit.count', + # 'rocksdb.db.get.micros.p99'] + # Let log.get_message() returns following string: + # "[WARN] [db/db_impl.cc:485] STATISTICS:\n + # rocksdb.block.cache.miss COUNT : 1459\n + # rocksdb.block.cache.hit COUNT : 37\n + # ... + # rocksdb.db.get.micros P50 : 15.6 P95 : 39.7 P99 : 62.6 P100 : 148.0\n + # ..." + new_lines = log.get_message().split("\n") + # let log_ts = 1532518219 + log_ts = log.get_timestamp() + # example updates to keys_ts: + # keys_ts[NO_ENTITY]['rocksdb.db.get.micros.p99'][1532518219] = 62.6 + # keys_ts[NO_ENTITY]['rocksdb.block.cache.hit.count'][1532518219] = 37 + for line in new_lines[1:]: # new_lines[0] does not contain any stats + stats_on_line = self.parse_log_line_for_stats(line) + for stat in stats_on_line: + if stat in reqd_stats: + if stat not in self.keys_ts[NO_ENTITY]: + self.keys_ts[NO_ENTITY][stat] = {} + self.keys_ts[NO_ENTITY][stat][log_ts] = stats_on_line[stat] + + def fetch_timeseries(self, reqd_stats): + # this method parses the Rocksdb LOG file and generates timeseries for + # each of the statistic in the list reqd_stats + self.keys_ts = {NO_ENTITY: {}} + for file_name in glob.glob(self.logs_file_prefix + "*"): + # TODO(poojam23): find a way to distinguish between 'old' log files + # from current and previous experiments, present in the same + # directory + if re.search("old", file_name, re.IGNORECASE): + continue + with open(file_name, "r") as db_logs: + new_log = None + for line in db_logs: + if Log.is_new_log(line): + if new_log and re.search(self.STATS, new_log.get_message()): + self.add_to_timeseries(new_log, reqd_stats) + new_log = Log(line, column_families=[]) + else: + # To account for logs split into multiple lines + new_log.append_message(line) + # Check for the last log in the file. 
+ if new_log and re.search(self.STATS, new_log.get_message()): + self.add_to_timeseries(new_log, reqd_stats) + + +class DatabasePerfContext(TimeSeriesData): + # TODO(poojam23): check if any benchrunner provides PerfContext sampled at + # regular intervals + def __init__(self, perf_context_ts, stats_freq_sec, cumulative): + """ + perf_context_ts is expected to be in the following format: + Dict[metric, Dict[timestamp, value]], where for + each (metric, timestamp) pair, the value is database-wide (i.e. + summed over all the threads involved) + if stats_freq_sec == 0, per-metric only one value is reported + """ + super().__init__() + self.stats_freq_sec = stats_freq_sec + self.keys_ts = {NO_ENTITY: perf_context_ts} + if cumulative: + self.unaccumulate_metrics() + + def unaccumulate_metrics(self): + # if the perf context metrics provided are cumulative in nature, this + # method can be used to convert them to a disjoint format + epoch_ts = copy.deepcopy(self.keys_ts) + for stat in self.keys_ts[NO_ENTITY]: + timeseries = sorted( + list(self.keys_ts[NO_ENTITY][stat].keys()), reverse=True + ) + if len(timeseries) < 2: + continue + for ix, ts in enumerate(timeseries[:-1]): + epoch_ts[NO_ENTITY][stat][ts] = ( + epoch_ts[NO_ENTITY][stat][ts] + - epoch_ts[NO_ENTITY][stat][timeseries[ix + 1]] + ) + if epoch_ts[NO_ENTITY][stat][ts] < 0: + raise ValueError("DBPerfContext: really cumulative?") + # drop the smallest timestamp in the timeseries for this metric + epoch_ts[NO_ENTITY][stat].pop(timeseries[-1]) + self.keys_ts = epoch_ts + + def get_keys_from_conditions(self, conditions): + reqd_stats = [] + for cond in conditions: + reqd_stats.extend([key.lower() for key in cond.keys]) + return reqd_stats + + def fetch_timeseries(self, statistics): + # this method is redundant for DatabasePerfContext because the __init__ + # does the job of populating 'keys_ts' + pass + + +class OdsStatsFetcher(TimeSeriesData): + # class constants + OUTPUT_FILE = "temp/stats_out.tmp" + ERROR_FILE = "temp/stats_err.tmp" + RAPIDO_COMMAND = "%s --entity=%s --key=%s --tstart=%s --tend=%s --showtime" + + # static methods + @staticmethod + def _get_string_in_quotes(value): + return '"' + str(value) + '"' + + @staticmethod + def _get_time_value_pair(pair_string): + # example pair_string: '[1532544591, 97.3653601828]' + pair_string = pair_string.replace("[", "") + pair_string = pair_string.replace("]", "") + pair = pair_string.split(",") + first = int(pair[0].strip()) + second = float(pair[1].strip()) + return [first, second] + + @staticmethod + def _get_ods_cli_stime(start_time): + diff = int(time.time() - int(start_time)) + stime = str(diff) + "_s" + return stime + + def __init__(self, client, entities, start_time, end_time, key_prefix=None): + super().__init__() + self.client = client + self.entities = entities + self.start_time = start_time + self.end_time = end_time + self.key_prefix = key_prefix + self.stats_freq_sec = 60 + self.duration_sec = 60 + + def execute_script(self, command): + print("executing...") + print(command) + out_file = open(self.OUTPUT_FILE, "w+") + err_file = open(self.ERROR_FILE, "w+") + subprocess.call(command, shell=True, stdout=out_file, stderr=err_file) + out_file.close() + err_file.close() + + def parse_rapido_output(self): + # Output looks like the following: + # <entity_name>\t<key_name>\t[[ts, value], [ts, value], ...] 
+ # ts = timestamp; value = value of key_name in entity_name at time ts + self.keys_ts = {} + with open(self.OUTPUT_FILE, "r") as fp: + for line in fp: + token_list = line.strip().split("\t") + entity = token_list[0] + key = token_list[1] + if entity not in self.keys_ts: + self.keys_ts[entity] = {} + if key not in self.keys_ts[entity]: + self.keys_ts[entity][key] = {} + list_of_lists = [ + self._get_time_value_pair(pair_string) + for pair_string in token_list[2].split("],") + ] + value = {pair[0]: pair[1] for pair in list_of_lists} + self.keys_ts[entity][key] = value + + def parse_ods_output(self): + # Output looks like the following: + # <entity_name>\t<key_name>\t<timestamp>\t<value> + # there is one line per (entity_name, key_name, timestamp) + self.keys_ts = {} + with open(self.OUTPUT_FILE, "r") as fp: + for line in fp: + token_list = line.split() + entity = token_list[0] + if entity not in self.keys_ts: + self.keys_ts[entity] = {} + key = token_list[1] + if key not in self.keys_ts[entity]: + self.keys_ts[entity][key] = {} + self.keys_ts[entity][key][token_list[2]] = token_list[3] + + def fetch_timeseries(self, statistics): + # this method fetches the timeseries of required stats from the ODS + # service and populates the 'keys_ts' object appropriately + print("OdsStatsFetcher: fetching " + str(statistics)) + if re.search("rapido", self.client, re.IGNORECASE): + command = self.RAPIDO_COMMAND % ( + self.client, + self._get_string_in_quotes(self.entities), + self._get_string_in_quotes(",".join(statistics)), + self._get_string_in_quotes(self.start_time), + self._get_string_in_quotes(self.end_time), + ) + # Run the tool and fetch the time-series data + self.execute_script(command) + # Parse output and populate the 'keys_ts' map + self.parse_rapido_output() + elif re.search("ods", self.client, re.IGNORECASE): + command = ( + self.client + + " " + + "--stime=" + + self._get_ods_cli_stime(self.start_time) + + " " + + self._get_string_in_quotes(self.entities) + + " " + + self._get_string_in_quotes(",".join(statistics)) + ) + # Run the tool and fetch the time-series data + self.execute_script(command) + # Parse output and populate the 'keys_ts' map + self.parse_ods_output() + + def get_keys_from_conditions(self, conditions): + reqd_stats = [] + for cond in conditions: + for key in cond.keys: + use_prefix = False + if key.startswith("[]"): + use_prefix = True + key = key[2:] + # TODO(poojam23): this is very hacky and needs to be improved + if key.startswith("rocksdb"): + key += ".60" + if use_prefix: + if not self.key_prefix: + print("Warning: OdsStatsFetcher might need key prefix") + print("for the key: " + key) + else: + key = self.key_prefix + "." 
+ key + reqd_stats.append(key) + return reqd_stats + + def fetch_rate_url( + self, + entities: List[str], + keys: List[str], + window_len: str, + percent: str, + display: bool, + ) -> str: + transform_desc = ( + "rate(" + str(window_len) + ",duration=" + str(self.duration_sec) + ) + if percent: + transform_desc = transform_desc + ",%)" + else: + transform_desc = transform_desc + ")" + if re.search("rapido", self.client, re.IGNORECASE): + command = self.RAPIDO_COMMAND + " --transform=%s --url=%s" + command = command % ( + self.client, + self._get_string_in_quotes(",".join(entities)), + self._get_string_in_quotes(",".join(keys)), + self._get_string_in_quotes(self.start_time), + self._get_string_in_quotes(self.end_time), + self._get_string_in_quotes(transform_desc), + self._get_string_in_quotes(display), + ) + elif re.search("ods", self.client, re.IGNORECASE): + command = ( + self.client + + " " + + "--stime=" + + self._get_ods_cli_stime(self.start_time) + + " " + + "--fburlonly " + + self._get_string_in_quotes(entities) + + " " + + self._get_string_in_quotes(",".join(keys)) + + " " + + self._get_string_in_quotes(transform_desc) + ) + self.execute_script(command) + url = "" + with open(self.OUTPUT_FILE, "r") as fp: + url = fp.readline() + return url diff --git a/src/rocksdb/tools/advisor/advisor/db_timeseries_parser.py b/src/rocksdb/tools/advisor/advisor/db_timeseries_parser.py new file mode 100644 index 000000000..5840d7b90 --- /dev/null +++ b/src/rocksdb/tools/advisor/advisor/db_timeseries_parser.py @@ -0,0 +1,203 @@ +# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). + +import math +from abc import abstractmethod +from enum import Enum +from typing import Dict + +from advisor.db_log_parser import DataSource + + +NO_ENTITY = "ENTITY_PLACEHOLDER" + + +class TimeSeriesData(DataSource): + class Behavior(Enum): + bursty = 1 + evaluate_expression = 2 + + class AggregationOperator(Enum): + avg = 1 + max = 2 + min = 3 + latest = 4 + oldest = 5 + + def __init__(self): + super().__init__(DataSource.Type.TIME_SERIES) + self.keys_ts = None # Dict[entity, Dict[key, Dict[timestamp, value]]] + self.stats_freq_sec = None + + @abstractmethod + def get_keys_from_conditions(self, conditions): + # This method takes in a list of time-series conditions; for each + # condition it manipulates the 'keys' in the way that is supported by + # the subclass implementing this method + pass + + @abstractmethod + def fetch_timeseries(self, required_statistics): + # this method takes in a list of statistics and fetches the timeseries + # for each of them and populates the 'keys_ts' dictionary + pass + + def fetch_burst_epochs( + self, + entities: str, + statistic: int, + window_sec: float, + threshold: bool, + percent: bool, + ) -> Dict[str, Dict[int, float]]: + # this method calculates the (percent) rate change in the 'statistic' + # for each entity (over 'window_sec' seconds) and returns the epochs + # where this rate change is greater than or equal to the 'threshold' + # value + if self.stats_freq_sec == 0: + # not time series data, cannot check for bursty behavior + return + if window_sec < self.stats_freq_sec: + window_sec = self.stats_freq_sec + # 'window_samples' is the number of windows to go back to + # compare the current window with, while calculating rate change. 
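+ # e.g. with stats_freq_sec=20 and window_sec=60, window_samples is
+ # ceil(60/20)=3, so each sample is compared with the sample taken
+ # 3 intervals (60 seconds) earlier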
+ window_samples = math.ceil(window_sec / self.stats_freq_sec) + burst_epochs = {} + # if percent = False: + # curr_val = value at window for which rate change is being calculated + # prev_val = value at window that is window_samples behind curr_window + # Then rate_without_percent = + # ((curr_val-prev_val)*duration_sec)/(curr_timestamp-prev_timestamp) + # if percent = True: + # rate_with_percent = (rate_without_percent * 100) / prev_val + # These calculations are in line with the rate() transform supported + # by ODS + for entity in entities: + if statistic not in self.keys_ts[entity]: + continue + timestamps = sorted(list(self.keys_ts[entity][statistic].keys())) + for ix in range(window_samples, len(timestamps), 1): + first_ts = timestamps[ix - window_samples] + last_ts = timestamps[ix] + first_val = self.keys_ts[entity][statistic][first_ts] + last_val = self.keys_ts[entity][statistic][last_ts] + diff = last_val - first_val + if percent: + diff = diff * 100 / first_val + rate = (diff * self.duration_sec) / (last_ts - first_ts) + # if the rate change is greater than the provided threshold, + # then the condition is triggered for entity at time 'last_ts' + if rate >= threshold: + if entity not in burst_epochs: + burst_epochs[entity] = {} + burst_epochs[entity][last_ts] = rate + return burst_epochs + + def fetch_aggregated_values(self, entity, statistics, aggregation_op): + # this method performs the aggregation specified by 'aggregation_op' + # on the timeseries of 'statistics' for 'entity' and returns: + # Dict[statistic, aggregated_value] + result = {} + for stat in statistics: + if stat not in self.keys_ts[entity]: + continue + agg_val = None + if aggregation_op is self.AggregationOperator.latest: + latest_timestamp = max(list(self.keys_ts[entity][stat].keys())) + agg_val = self.keys_ts[entity][stat][latest_timestamp] + elif aggregation_op is self.AggregationOperator.oldest: + oldest_timestamp = min(list(self.keys_ts[entity][stat].keys())) + agg_val = self.keys_ts[entity][stat][oldest_timestamp] + elif aggregation_op is self.AggregationOperator.max: + agg_val = max(list(self.keys_ts[entity][stat].values())) + elif aggregation_op is self.AggregationOperator.min: + agg_val = min(list(self.keys_ts[entity][stat].values())) + elif aggregation_op is self.AggregationOperator.avg: + values = list(self.keys_ts[entity][stat].values()) + agg_val = sum(values) / len(values) + result[stat] = agg_val + return result + + def check_and_trigger_conditions(self, conditions): + # get the list of statistics that need to be fetched + reqd_keys = self.get_keys_from_conditions(conditions) + # fetch the required statistics and populate the map 'keys_ts' + self.fetch_timeseries(reqd_keys) + # Trigger the appropriate conditions + for cond in conditions: + complete_keys = self.get_keys_from_conditions([cond]) + # Get the entities that have all statistics required by 'cond': + # an entity is checked for a given condition only if we possess all + # of the condition's 'keys' for that entity + entities_with_stats = [] + for entity in self.keys_ts: + stat_missing = False + for stat in complete_keys: + if stat not in self.keys_ts[entity]: + stat_missing = True + break + if not stat_missing: + entities_with_stats.append(entity) + if not entities_with_stats: + continue + if cond.behavior is self.Behavior.bursty: + # for a condition that checks for bursty behavior, only one key + # should be present in the condition's 'keys' field + result = self.fetch_burst_epochs( + entities_with_stats, + complete_keys[0], # there 
should be only one key + cond.window_sec, + cond.rate_threshold, + True, + ) + # Trigger in this case is: + # Dict[entity_name, Dict[timestamp, rate_change]] + # where the inner dictionary contains rate_change values when + # the rate_change >= threshold provided, with the + # corresponding timestamps + if result: + cond.set_trigger(result) + elif cond.behavior is self.Behavior.evaluate_expression: + self.handle_evaluate_expression( + cond, complete_keys, entities_with_stats + ) + + def handle_evaluate_expression(self, condition, statistics, entities): + trigger = {} + # check 'condition' for each of these entities + for entity in entities: + if hasattr(condition, "aggregation_op"): + # in this case, the aggregation operation is performed on each + # of the condition's 'keys' and then with aggregated values + # condition's 'expression' is evaluated; if it evaluates to + # True, then list of the keys values is added to the + # condition's trigger: Dict[entity_name, List[stats]] + result = self.fetch_aggregated_values( + entity, statistics, condition.aggregation_op + ) + keys = [result[key] for key in statistics] + try: + if eval(condition.expression): + trigger[entity] = keys + except Exception as e: + print("WARNING(TimeSeriesData) check_and_trigger: " + str(e)) + else: + # assumption: all stats have same series of timestamps + # this is similar to the above but 'expression' is evaluated at + # each timestamp, since there is no aggregation, and all the + # epochs are added to the trigger when the condition's + # 'expression' evaluated to true; so trigger is: + # Dict[entity, Dict[timestamp, List[stats]]] + for epoch in self.keys_ts[entity][statistics[0]].keys(): + keys = [self.keys_ts[entity][key][epoch] for key in statistics] + try: + if eval(condition.expression): + if entity not in trigger: + trigger[entity] = {} + trigger[entity][epoch] = keys + except Exception as e: + print("WARNING(TimeSeriesData) check_and_trigger: " + str(e)) + if trigger: + condition.set_trigger(trigger) diff --git a/src/rocksdb/tools/advisor/advisor/ini_parser.py b/src/rocksdb/tools/advisor/advisor/ini_parser.py new file mode 100644 index 000000000..3379ea3cd --- /dev/null +++ b/src/rocksdb/tools/advisor/advisor/ini_parser.py @@ -0,0 +1,76 @@ +# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). + +from enum import Enum + + +class IniParser: + class Element(Enum): + rule = 1 + cond = 2 + sugg = 3 + key_val = 4 + comment = 5 + + @staticmethod + def remove_trailing_comment(line): + line = line.strip() + comment_start = line.find("#") + if comment_start > -1: + return line[:comment_start] + return line + + @staticmethod + def is_section_header(line): + # A section header looks like: [Rule "my-new-rule"]. Essentially, + # a line that is in square-brackets. + line = line.strip() + if line.startswith("[") and line.endswith("]"): + return True + return False + + @staticmethod + def get_section_name(line): + # For a section header: [Rule "my-new-rule"], this method will return + # "my-new-rule". 
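+ # e.g. '[Rule "my-new-rule"]' -> strip the brackets -> split on '"' ->
+ # ['Rule ', 'my-new-rule', ''] -> token_list[1] is the section name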
+ token_list = line.strip()[1:-1].split('"') + if len(token_list) < 3: + error = 'needed section header: [<section_type> "<section_name>"]' + raise ValueError("Parsing error: " + error + "\n" + line) + return token_list[1] + + @staticmethod + def get_element(line): + line = IniParser.remove_trailing_comment(line) + if not line: + return IniParser.Element.comment + if IniParser.is_section_header(line): + if line.strip()[1:-1].startswith("Suggestion"): + return IniParser.Element.sugg + if line.strip()[1:-1].startswith("Rule"): + return IniParser.Element.rule + if line.strip()[1:-1].startswith("Condition"): + return IniParser.Element.cond + if "=" in line: + return IniParser.Element.key_val + error = "not a recognizable RulesSpec element" + raise ValueError("Parsing error: " + error + "\n" + line) + + @staticmethod + def get_key_value_pair(line): + line = line.strip() + key = line.split("=")[0].strip() + value = "=".join(line.split("=")[1:]) + if value == "": # if the option has no value + return (key, None) + values = IniParser.get_list_from_value(value) + if len(values) == 1: + return (key, value) + return (key, values) + + @staticmethod + def get_list_from_value(value): + values = value.strip().split(":") + return values diff --git a/src/rocksdb/tools/advisor/advisor/rule_parser.py b/src/rocksdb/tools/advisor/advisor/rule_parser.py new file mode 100644 index 000000000..169a55363 --- /dev/null +++ b/src/rocksdb/tools/advisor/advisor/rule_parser.py @@ -0,0 +1,510 @@ +# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). + +import re +from abc import ABC, abstractmethod +from enum import Enum + +from advisor.db_log_parser import DataSource, NO_COL_FAMILY +from advisor.db_timeseries_parser import TimeSeriesData +from advisor.ini_parser import IniParser + + +class Section(ABC): + def __init__(self, name): + self.name = name + + @abstractmethod + def set_parameter(self, key, value): + pass + + @abstractmethod + def perform_checks(self): + pass + + +class Rule(Section): + def __init__(self, name): + super().__init__(name) + self.conditions = None + self.suggestions = None + self.overlap_time_seconds = None + self.trigger_entities = None + self.trigger_column_families = None + + def set_parameter(self, key, value): + # If the Rule is associated with a single suggestion/condition, then + # value will be a string and not a list. Hence, convert it to a single + # element list before storing it in self.suggestions or + # self.conditions. 
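+ # e.g. 'conditions=cond-a' reaches here as the string 'cond-a' and is
+ # stored as ['cond-a'], whereas 'conditions=cond-a:cond-b' already
+ # arrives as the list ['cond-a', 'cond-b']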
+ if key == "conditions": + if isinstance(value, str): + self.conditions = [value] + else: + self.conditions = value + elif key == "suggestions": + if isinstance(value, str): + self.suggestions = [value] + else: + self.suggestions = value + elif key == "overlap_time_period": + self.overlap_time_seconds = value + + def get_suggestions(self): + return self.suggestions + + def perform_checks(self): + if not self.conditions or len(self.conditions) < 1: + raise ValueError(self.name + ": rule must have at least one condition") + if not self.suggestions or len(self.suggestions) < 1: + raise ValueError(self.name + ": rule must have at least one suggestion") + if self.overlap_time_seconds: + if len(self.conditions) != 2: + raise ValueError( + self.name + + ": rule must be associated with 2 conditions\ + in order to check for a time dependency between them" + ) + time_format = "^\d+[s|m|h|d]$" # noqa + if not re.match(time_format, self.overlap_time_seconds, re.IGNORECASE): + raise ValueError( + self.name + ": overlap_time_seconds format: \d+[s|m|h|d]" + ) + else: # convert to seconds + in_seconds = int(self.overlap_time_seconds[:-1]) + if self.overlap_time_seconds[-1] == "m": + in_seconds *= 60 + elif self.overlap_time_seconds[-1] == "h": + in_seconds *= 60 * 60 + elif self.overlap_time_seconds[-1] == "d": + in_seconds *= 24 * 60 * 60 + self.overlap_time_seconds = in_seconds + + def get_overlap_timestamps(self, key1_trigger_epochs, key2_trigger_epochs): + # this method takes in 2 timeseries i.e. timestamps at which the + # rule's 2 TIME_SERIES conditions were triggered and it finds + # (if present) the first pair of timestamps at which the 2 conditions + # were triggered within 'overlap_time_seconds' of each other + key1_lower_bounds = [ + epoch - self.overlap_time_seconds for epoch in key1_trigger_epochs + ] + key1_lower_bounds.sort() + key2_trigger_epochs.sort() + trigger_ix = 0 + overlap_pair = None + for key1_lb in key1_lower_bounds: + while key2_trigger_epochs[trigger_ix] < key1_lb and trigger_ix < len( + key2_trigger_epochs + ): + trigger_ix += 1 + if trigger_ix >= len(key2_trigger_epochs): + break + if key2_trigger_epochs[trigger_ix] <= key1_lb + ( + 2 * self.overlap_time_seconds + ): + overlap_pair = ( + key2_trigger_epochs[trigger_ix], + key1_lb + self.overlap_time_seconds, + ) + break + return overlap_pair + + def get_trigger_entities(self): + return self.trigger_entities + + def get_trigger_column_families(self): + return self.trigger_column_families + + def is_triggered(self, conditions_dict, column_families): + if self.overlap_time_seconds: + condition1 = conditions_dict[self.conditions[0]] + condition2 = conditions_dict[self.conditions[1]] + if not ( + condition1.get_data_source() is DataSource.Type.TIME_SERIES + and condition2.get_data_source() is DataSource.Type.TIME_SERIES + ): + raise ValueError(self.name + ": need 2 timeseries conditions") + + map1 = condition1.get_trigger() + map2 = condition2.get_trigger() + if not (map1 and map2): + return False + + self.trigger_entities = {} + is_triggered = False + entity_intersection = set(map1.keys()).intersection(set(map2.keys())) + for entity in entity_intersection: + overlap_timestamps_pair = self.get_overlap_timestamps( + list(map1[entity].keys()), list(map2[entity].keys()) + ) + if overlap_timestamps_pair: + self.trigger_entities[entity] = overlap_timestamps_pair + is_triggered = True + if is_triggered: + self.trigger_column_families = set(column_families) + return is_triggered + else: + all_conditions_triggered = True + 
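+ # start from the full set of column families and narrow it down below
+ # using the column families attached to each triggered LOG or OPTIONS
+ # condition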
self.trigger_column_families = set(column_families) + for cond_name in self.conditions: + cond = conditions_dict[cond_name] + if not cond.get_trigger(): + all_conditions_triggered = False + break + if ( + cond.get_data_source() is DataSource.Type.LOG + or cond.get_data_source() is DataSource.Type.DB_OPTIONS + ): + cond_col_fam = set(cond.get_trigger().keys()) + if NO_COL_FAMILY in cond_col_fam: + cond_col_fam = set(column_families) + self.trigger_column_families = ( + self.trigger_column_families.intersection(cond_col_fam) + ) + elif cond.get_data_source() is DataSource.Type.TIME_SERIES: + cond_entities = set(cond.get_trigger().keys()) + if self.trigger_entities is None: + self.trigger_entities = cond_entities + else: + self.trigger_entities = self.trigger_entities.intersection( + cond_entities + ) + if not (self.trigger_entities or self.trigger_column_families): + all_conditions_triggered = False + break + if not all_conditions_triggered: # clean up if rule not triggered + self.trigger_column_families = None + self.trigger_entities = None + return all_conditions_triggered + + def __repr__(self): + # Append conditions + rule_string = "Rule: " + self.name + " has conditions:: " + is_first = True + for cond in self.conditions: + if is_first: + rule_string += cond + is_first = False + else: + rule_string += " AND " + cond + # Append suggestions + rule_string += "\nsuggestions:: " + is_first = True + for sugg in self.suggestions: + if is_first: + rule_string += sugg + is_first = False + else: + rule_string += ", " + sugg + if self.trigger_entities: + rule_string += ", entities:: " + str(self.trigger_entities) + if self.trigger_column_families: + rule_string += ", col_fam:: " + str(self.trigger_column_families) + # Return constructed string + return rule_string + + +class Suggestion(Section): + class Action(Enum): + set = 1 + increase = 2 + decrease = 3 + + def __init__(self, name): + super().__init__(name) + self.option = None + self.action = None + self.suggested_values = None + self.description = None + + def set_parameter(self, key, value): + if key == "option": + # Note: + # case 1: 'option' is supported by Rocksdb OPTIONS file; in this + # case the option belongs to one of the sections in the config + # file and it's name is prefixed by "<section_type>." + # case 2: 'option' is not supported by Rocksdb OPTIONS file; the + # option is not expected to have the character '.' 
in its name + self.option = value + elif key == "action": + if self.option and not value: + raise ValueError(self.name + ": provide action for option") + self.action = self.Action[value] + elif key == "suggested_values": + if isinstance(value, str): + self.suggested_values = [value] + else: + self.suggested_values = value + elif key == "description": + self.description = value + + def perform_checks(self): + if not self.description: + if not self.option: + raise ValueError(self.name + ": provide option or description") + if not self.action: + raise ValueError(self.name + ": provide action for option") + if self.action is self.Action.set and not self.suggested_values: + raise ValueError(self.name + ": provide suggested value for option") + + def __repr__(self): + sugg_string = "Suggestion: " + self.name + if self.description: + sugg_string += " description : " + self.description + else: + sugg_string += " option : " + self.option + " action : " + self.action.name + if self.suggested_values: + sugg_string += " suggested_values : " + str(self.suggested_values) + return sugg_string + + +class Condition(Section): + def __init__(self, name): + super().__init__(name) + self.data_source = None + self.trigger = None + + def perform_checks(self): + if not self.data_source: + raise ValueError(self.name + ": condition not tied to data source") + + def set_data_source(self, data_source): + self.data_source = data_source + + def get_data_source(self): + return self.data_source + + def reset_trigger(self): + self.trigger = None + + def set_trigger(self, condition_trigger): + self.trigger = condition_trigger + + def get_trigger(self): + return self.trigger + + def is_triggered(self): + if self.trigger: + return True + return False + + def set_parameter(self, key, value): + # must be defined by the subclass + raise NotImplementedError(self.name + ": provide source for condition") + + +class LogCondition(Condition): + @classmethod + def create(cls, base_condition): + base_condition.set_data_source(DataSource.Type["LOG"]) + base_condition.__class__ = cls + return base_condition + + def set_parameter(self, key, value): + if key == "regex": + self.regex = value + + def perform_checks(self): + super().perform_checks() + if not self.regex: + raise ValueError(self.name + ": provide regex for log condition") + + def __repr__(self): + log_cond_str = "LogCondition: " + self.name + log_cond_str += " regex: " + self.regex + # if self.trigger: + # log_cond_str += (" trigger: " + str(self.trigger)) + return log_cond_str + + +class OptionCondition(Condition): + @classmethod + def create(cls, base_condition): + base_condition.set_data_source(DataSource.Type["DB_OPTIONS"]) + base_condition.__class__ = cls + return base_condition + + def set_parameter(self, key, value): + if key == "options": + if isinstance(value, str): + self.options = [value] + else: + self.options = value + elif key == "evaluate": + self.eval_expr = value + + def perform_checks(self): + super().perform_checks() + if not self.options: + raise ValueError(self.name + ": options missing in condition") + if not self.eval_expr: + raise ValueError(self.name + ": expression missing in condition") + + def __repr__(self): + opt_cond_str = "OptionCondition: " + self.name + opt_cond_str += " options: " + str(self.options) + opt_cond_str += " expression: " + self.eval_expr + if self.trigger: + opt_cond_str += " trigger: " + str(self.trigger) + return opt_cond_str + + +class TimeSeriesCondition(Condition): + @classmethod + def create(cls, base_condition): + 
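+ # tag the parsed base Condition with the TIME_SERIES data source and
+ # swap its class in place so that the attributes already set on it are
+ # preserved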
base_condition.set_data_source(DataSource.Type["TIME_SERIES"]) + base_condition.__class__ = cls + return base_condition + + def set_parameter(self, key, value): + if key == "keys": + if isinstance(value, str): + self.keys = [value] + else: + self.keys = value + elif key == "behavior": + self.behavior = TimeSeriesData.Behavior[value] + elif key == "rate_threshold": + self.rate_threshold = float(value) + elif key == "window_sec": + self.window_sec = int(value) + elif key == "evaluate": + self.expression = value + elif key == "aggregation_op": + self.aggregation_op = TimeSeriesData.AggregationOperator[value] + + def perform_checks(self): + if not self.keys: + raise ValueError(self.name + ": specify timeseries key") + if not self.behavior: + raise ValueError(self.name + ": specify triggering behavior") + if self.behavior is TimeSeriesData.Behavior.bursty: + if not self.rate_threshold: + raise ValueError(self.name + ": specify rate burst threshold") + if not self.window_sec: + self.window_sec = 300 # default window length is 5 minutes + if len(self.keys) > 1: + raise ValueError(self.name + ": specify only one key") + elif self.behavior is TimeSeriesData.Behavior.evaluate_expression: + if not (self.expression): + raise ValueError(self.name + ": specify evaluation expression") + else: + raise ValueError(self.name + ": trigger behavior not supported") + + def __repr__(self): + ts_cond_str = "TimeSeriesCondition: " + self.name + ts_cond_str += " statistics: " + str(self.keys) + ts_cond_str += " behavior: " + self.behavior.name + if self.behavior is TimeSeriesData.Behavior.bursty: + ts_cond_str += " rate_threshold: " + str(self.rate_threshold) + ts_cond_str += " window_sec: " + str(self.window_sec) + if self.behavior is TimeSeriesData.Behavior.evaluate_expression: + ts_cond_str += " expression: " + self.expression + if hasattr(self, "aggregation_op"): + ts_cond_str += " aggregation_op: " + self.aggregation_op.name + if self.trigger: + ts_cond_str += " trigger: " + str(self.trigger) + return ts_cond_str + + +class RulesSpec: + def __init__(self, rules_path): + self.file_path = rules_path + + def initialise_fields(self): + self.rules_dict = {} + self.conditions_dict = {} + self.suggestions_dict = {} + + def perform_section_checks(self): + for rule in self.rules_dict.values(): + rule.perform_checks() + for cond in self.conditions_dict.values(): + cond.perform_checks() + for sugg in self.suggestions_dict.values(): + sugg.perform_checks() + + def load_rules_from_spec(self): + self.initialise_fields() + with open(self.file_path, "r") as db_rules: + curr_section = None + for line in db_rules: + line = IniParser.remove_trailing_comment(line) + if not line: + continue + element = IniParser.get_element(line) + if element is IniParser.Element.comment: + continue + elif element is not IniParser.Element.key_val: + curr_section = element # it's a new IniParser header + section_name = IniParser.get_section_name(line) + if element is IniParser.Element.rule: + new_rule = Rule(section_name) + self.rules_dict[section_name] = new_rule + elif element is IniParser.Element.cond: + new_cond = Condition(section_name) + self.conditions_dict[section_name] = new_cond + elif element is IniParser.Element.sugg: + new_suggestion = Suggestion(section_name) + self.suggestions_dict[section_name] = new_suggestion + elif element is IniParser.Element.key_val: + key, value = IniParser.get_key_value_pair(line) + if curr_section is IniParser.Element.rule: + new_rule.set_parameter(key, value) + elif curr_section is IniParser.Element.cond: + 
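+ # 'source' must be the first key specified for a Condition: it converts
+ # the base Condition into the LogCondition/OptionCondition/
+ # TimeSeriesCondition subclass that handles the remaining key-value pairs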
if key == "source": + if value == "LOG": + new_cond = LogCondition.create(new_cond) + elif value == "OPTIONS": + new_cond = OptionCondition.create(new_cond) + elif value == "TIME_SERIES": + new_cond = TimeSeriesCondition.create(new_cond) + else: + new_cond.set_parameter(key, value) + elif curr_section is IniParser.Element.sugg: + new_suggestion.set_parameter(key, value) + + def get_rules_dict(self): + return self.rules_dict + + def get_conditions_dict(self): + return self.conditions_dict + + def get_suggestions_dict(self): + return self.suggestions_dict + + def get_triggered_rules(self, data_sources, column_families): + self.trigger_conditions(data_sources) + triggered_rules = [] + for rule in self.rules_dict.values(): + if rule.is_triggered(self.conditions_dict, column_families): + triggered_rules.append(rule) + return triggered_rules + + def trigger_conditions(self, data_sources): + for source_type in data_sources: + cond_subset = [ + cond + for cond in self.conditions_dict.values() + if cond.get_data_source() is source_type + ] + if not cond_subset: + continue + for source in data_sources[source_type]: + source.check_and_trigger_conditions(cond_subset) + + def print_rules(self, rules): + for rule in rules: + print("\nRule: " + rule.name) + for cond_name in rule.conditions: + print(repr(self.conditions_dict[cond_name])) + for sugg_name in rule.suggestions: + print(repr(self.suggestions_dict[sugg_name])) + if rule.trigger_entities: + print("scope: entities:") + print(rule.trigger_entities) + if rule.trigger_column_families: + print("scope: col_fam:") + print(rule.trigger_column_families) diff --git a/src/rocksdb/tools/advisor/advisor/rule_parser_example.py b/src/rocksdb/tools/advisor/advisor/rule_parser_example.py new file mode 100644 index 000000000..6c04ff2bf --- /dev/null +++ b/src/rocksdb/tools/advisor/advisor/rule_parser_example.py @@ -0,0 +1,98 @@ +# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). 
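+# Example driver: builds DatabaseOptions, DatabaseLogs and LogStatsParser data
+# sources from the command-line arguments (plus OdsStatsFetcher when
+# --ods_client is given) and prints the rules from --rules_spec that they
+# trigger.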
+ +import argparse + +from advisor.db_log_parser import DatabaseLogs, DataSource +from advisor.db_options_parser import DatabaseOptions +from advisor.db_stats_fetcher import LogStatsParser, OdsStatsFetcher +from advisor.rule_parser import RulesSpec + + +def main(args): + # initialise the RulesSpec parser + rule_spec_parser = RulesSpec(args.rules_spec) + rule_spec_parser.load_rules_from_spec() + rule_spec_parser.perform_section_checks() + # initialize the DatabaseOptions object + db_options = DatabaseOptions(args.rocksdb_options) + # Create DatabaseLogs object + db_logs = DatabaseLogs(args.log_files_path_prefix, db_options.get_column_families()) + # Create the Log STATS object + db_log_stats = LogStatsParser( + args.log_files_path_prefix, args.stats_dump_period_sec + ) + data_sources = { + DataSource.Type.DB_OPTIONS: [db_options], + DataSource.Type.LOG: [db_logs], + DataSource.Type.TIME_SERIES: [db_log_stats], + } + if args.ods_client: + data_sources[DataSource.Type.TIME_SERIES].append( + OdsStatsFetcher( + args.ods_client, + args.ods_entity, + args.ods_tstart, + args.ods_tend, + args.ods_key_prefix, + ) + ) + triggered_rules = rule_spec_parser.get_triggered_rules( + data_sources, db_options.get_column_families() + ) + rule_spec_parser.print_rules(triggered_rules) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Use this script to get\ + suggestions for improving Rocksdb performance." + ) + parser.add_argument( + "--rules_spec", + required=True, + type=str, + help="path of the file containing the expert-specified Rules", + ) + parser.add_argument( + "--rocksdb_options", + required=True, + type=str, + help="path of the starting Rocksdb OPTIONS file", + ) + parser.add_argument( + "--log_files_path_prefix", + required=True, + type=str, + help="path prefix of the Rocksdb LOG files", + ) + parser.add_argument( + "--stats_dump_period_sec", + required=True, + type=int, + help="the frequency (in seconds) at which STATISTICS are printed to " + + "the Rocksdb LOG file", + ) + # ODS arguments + parser.add_argument("--ods_client", type=str, help="the ODS client binary") + parser.add_argument( + "--ods_entity", + type=str, + help="the servers for which the ODS stats need to be fetched", + ) + parser.add_argument( + "--ods_key_prefix", + type=str, + help="the prefix that needs to be attached to the keys of time " + + "series to be fetched from ODS", + ) + parser.add_argument( + "--ods_tstart", type=int, help="start time of timeseries to be fetched from ODS" + ) + parser.add_argument( + "--ods_tend", type=int, help="end time of timeseries to be fetched from ODS" + ) + args = parser.parse_args() + main(args) diff --git a/src/rocksdb/tools/advisor/advisor/rules.ini b/src/rocksdb/tools/advisor/advisor/rules.ini new file mode 100644 index 000000000..ec7a07e60 --- /dev/null +++ b/src/rocksdb/tools/advisor/advisor/rules.ini @@ -0,0 +1,214 @@ +# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). +# +# FORMAT: very similar to the Rocksdb ini file in terms of syntax +# (refer rocksdb/examples/rocksdb_option_file_example.ini) +# +# The Rules INI file is made up of multiple sections and each section is made +# up of multiple key-value pairs. The recognized section types are: +# Rule, Suggestion, Condition. Each section must have a name specified in "" +# in the section header. 
This name acts as an identifier in that section +# type's namespace. A section header looks like: +# [<section_type> "<section_name_identifier>"] +# +# There should be at least one Rule section in the file with its corresponding +# Condition and Suggestion sections. A Rule is triggered only when all of its +# conditions are triggered. The order in which a Rule's conditions and +# suggestions are specified has no significance. +# +# A Condition must be associated with a data source specified by the parameter +# 'source' and this must be the first parameter specified for the Condition. +# A condition can be associated with one or more Rules. +# +# A Suggestion is an advised change to a Rocksdb option to improve the +# performance of the database in some way. Every suggestion can be a part of +# one or more Rules. + +[Rule "stall-too-many-memtables"] +suggestions=inc-bg-flush:inc-write-buffer +conditions=stall-too-many-memtables + +[Condition "stall-too-many-memtables"] +source=LOG +regex=Stopping writes because we have \d+ immutable memtables \(waiting for flush\), max_write_buffer_number is set to \d+ + +[Rule "stall-too-many-L0"] +suggestions=inc-max-subcompactions:inc-max-bg-compactions:inc-write-buffer-size:dec-max-bytes-for-level-base:inc-l0-slowdown-writes-trigger +conditions=stall-too-many-L0 + +[Condition "stall-too-many-L0"] +source=LOG +regex=Stalling writes because we have \d+ level-0 files + +[Rule "stop-too-many-L0"] +suggestions=inc-max-bg-compactions:inc-write-buffer-size:inc-l0-stop-writes-trigger +conditions=stop-too-many-L0 + +[Condition "stop-too-many-L0"] +source=LOG +regex=Stopping writes because we have \d+ level-0 files + +[Rule "stall-too-many-compaction-bytes"] +suggestions=inc-max-bg-compactions:inc-write-buffer-size:inc-hard-pending-compaction-bytes-limit:inc-soft-pending-compaction-bytes-limit +conditions=stall-too-many-compaction-bytes + +[Condition "stall-too-many-compaction-bytes"] +source=LOG +regex=Stalling writes because of estimated pending compaction bytes \d+ + +[Suggestion "inc-bg-flush"] +option=DBOptions.max_background_flushes +action=increase +suggested_values=2 + +[Suggestion "inc-write-buffer"] +option=CFOptions.max_write_buffer_number +action=increase + +[Suggestion "inc-max-subcompactions"] +option=DBOptions.max_subcompactions +action=increase + +[Suggestion "inc-max-bg-compactions"] +option=DBOptions.max_background_compactions +action=increase +suggested_values=2 + +[Suggestion "inc-write-buffer-size"] +option=CFOptions.write_buffer_size +action=increase + +[Suggestion "dec-max-bytes-for-level-base"] +option=CFOptions.max_bytes_for_level_base +action=decrease + +[Suggestion "inc-l0-slowdown-writes-trigger"] +option=CFOptions.level0_slowdown_writes_trigger +action=increase + +[Suggestion "inc-l0-stop-writes-trigger"] +option=CFOptions.level0_stop_writes_trigger +action=increase + +[Suggestion "inc-hard-pending-compaction-bytes-limit"] +option=CFOptions.hard_pending_compaction_bytes_limit +action=increase + +[Suggestion "inc-soft-pending-compaction-bytes-limit"] +option=CFOptions.soft_pending_compaction_bytes_limit +action=increase + +[Rule "level0-level1-ratio"] +conditions=level0-level1-ratio +suggestions=inc-base-max-bytes + +[Condition "level0-level1-ratio"] +source=OPTIONS +options=CFOptions.level0_file_num_compaction_trigger:CFOptions.write_buffer_size:CFOptions.max_bytes_for_level_base +evaluate=int(options[0])*int(options[1])-int(options[2])>=1 # should evaluate to a boolean, condition triggered if evaluates to true + +[Suggestion 
"inc-base-max-bytes"] +option=CFOptions.max_bytes_for_level_base +action=increase + +[Rules "tuning-iostat-burst"] +conditions=large-db-get-p99 +suggestions=bytes-per-sync-non0:wal-bytes-per-sync-non0:set-rate-limiter +#overlap_time_period=10m + +[Condition "write-burst"] +source=TIME_SERIES +keys=dyno.flash_write_bytes_per_sec +behavior=bursty +window_sec=300 # the smaller this window, the more sensitivity to changes in the time series, so the rate_threshold should be bigger; when it's 60, then same as diff(%) +rate_threshold=20 + +[Condition "large-p99-read-latency"] +source=TIME_SERIES +keys=[]rocksdb.read.block.get.micros.p99 +behavior=bursty +window_sec=300 +rate_threshold=10 + +[Condition "large-db-get-p99"] +source=TIME_SERIES +keys=[]rocksdb.db.get.micros.p50:[]rocksdb.db.get.micros.p99 +behavior=evaluate_expression +evaluate=(keys[1]/keys[0])>5 + +[Suggestion "bytes-per-sync-non0"] +option=DBOptions.bytes_per_sync +action=set +suggested_values=1048576 + +[Suggestion "wal-bytes-per-sync-non0"] +option=DBOptions.wal_bytes_per_sync +action=set +suggested_values=1048576 + +[Suggestion "set-rate-limiter"] +option=rate_limiter_bytes_per_sec +action=set +suggested_values=1024000 + +[Rule "bloom-filter-percent-useful"] +conditions=bloom-filter-percent-useful +suggestions=inc-bloom-bits-per-key + +[Condition "bloom-filter-percent-useful"] +source=TIME_SERIES +keys=[]rocksdb.bloom.filter.useful.count:[]rocksdb.bloom.filter.full.positive.count:[]rocksdb.bloom.filter.full.true.positive.count +behavior=evaluate_expression +evaluate=((keys[0]+keys[2])/(keys[0]+keys[1]))<0.9 # should evaluate to a boolean +aggregation_op=latest + +[Rule "bloom-not-enabled"] +conditions=bloom-not-enabled +suggestions=inc-bloom-bits-per-key + +[Condition "bloom-not-enabled"] +source=TIME_SERIES +keys=[]rocksdb.bloom.filter.useful.count:[]rocksdb.bloom.filter.full.positive.count:[]rocksdb.bloom.filter.full.true.positive.count +behavior=evaluate_expression +evaluate=keys[0]+keys[1]+keys[2]==0 +aggregation_op=avg + +[Suggestion "inc-bloom-bits-per-key"] +option=bloom_bits +action=increase +suggested_values=2 + +[Rule "small-l0-files"] +conditions=small-l0-files +suggestions=dec-max-bytes-for-level-base:inc-write-buffer-size + +[Condition "small-l0-files"] +source=OPTIONS +options=CFOptions.max_bytes_for_level_base:CFOptions.level0_file_num_compaction_trigger:CFOptions.write_buffer_size +evaluate=int(options[0])>(10*int(options[1])*int(options[2])) + +[Rule "decompress-time-long"] +conditions=decompress-time-long +suggestions=dec-block-size:inc-block-cache-size:faster-compression-type + +[Condition "decompress-time-long"] +source=TIME_SERIES +keys=block_decompress_time:block_read_time:block_checksum_time +behavior=evaluate_expression +evaluate=(keys[0]/(keys[0]+keys[1]+keys[2]))>0.3 + +[Suggestion "dec-block-size"] +option=TableOptions.BlockBasedTable.block_size +action=decrease + +[Suggestion "inc-block-cache-size"] +option=cache_size +action=increase +suggested_values=16000000 + +[Suggestion "faster-compression-type"] +option=CFOptions.compression +action=set +suggested_values=kLZ4Compression diff --git a/src/rocksdb/tools/advisor/test/__init__.py b/src/rocksdb/tools/advisor/test/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/src/rocksdb/tools/advisor/test/__init__.py diff --git a/src/rocksdb/tools/advisor/test/input_files/LOG-0 b/src/rocksdb/tools/advisor/test/input_files/LOG-0 new file mode 100644 index 000000000..3c9d51641 --- /dev/null +++ 
b/src/rocksdb/tools/advisor/test/input_files/LOG-0 @@ -0,0 +1,30 @@ +2018/05/25-14:30:05.601692 7f82bd676200 RocksDB version: 5.14.0 +2018/05/25-14:30:07.626719 7f82ba72e700 (Original Log Time 2018/05/25-14:30:07.621966) [db/db_impl_compaction_flush.cc:1424] Calling FlushMemTableToOutputFile with column family [default], flush slots available 1, compaction slots available 1, flush slots scheduled 1, compaction slots scheduled 0 +2018/05/25-14:30:07.626725 7f82ba72e700 [db/flush_job.cc:301] [default] [JOB 3] Flushing memtable with next log file: 8 +2018/05/25-14:30:07.626738 7f82ba72e700 EVENT_LOG_v1 {"time_micros": 1527283807626732, "job": 3, "event": "flush_started", "num_memtables": 1, "num_entries": 28018, "num_deletes": 0, "memory_usage": 4065512, "flush_reason": "Write Buffer Full"} +2018/05/25-14:30:07.626740 7f82ba72e700 [db/flush_job.cc:331] [default] [JOB 3] Level-0 flush table #10: started +2018/05/25-14:30:07.764232 7f82b2f20700 [db/db_impl_write.cc:1373] [default] New memtable created with log file: #11. Immutable memtables: 1. +2018/05/25-14:30:07.764240 7f82b2f20700 [WARN] [db/column_family.cc:743] [default] Stopping writes because we have 2 immutable memtables (waiting for flush), max_write_buffer_number is set to 2 +2018/05/23-11:53:12.800143 7f9f36b40700 [WARN] [db/column_family.cc:799] [default] Stalling writes because we have 4 level-0 files rate 39886 +2018/05/23-11:53:12.800143 7f9f36b40700 [WARN] [db/column_family.cc:799] [default] Stopping writes because we have 4 level-0 files rate 39886 +2018/05/25-14:30:09.398302 7f82ba72e700 EVENT_LOG_v1 {"time_micros": 1527283809398276, "cf_name": "default", "job": 3, "event": "table_file_creation", "file_number": 10, "file_size": 1890434, "table_properties": {"data_size": 1876749, "index_size": 23346, "filter_size": 0, "raw_key_size": 663120, "raw_average_key_size": 24, "raw_value_size": 2763000, "raw_average_value_size": 100, "num_data_blocks": 838, "num_entries": 27630, "filter_policy_name": "", "kDeletedKeys": "0", "kMergeOperands": "0"}} +2018/05/25-14:30:09.398351 7f82ba72e700 [db/flush_job.cc:371] [default] [JOB 3] Level-0 flush table #10: 1890434 bytes OK +2018/05/25-14:30:25.491635 7f82ba72e700 [db/flush_job.cc:331] [default] [JOB 10] Level-0 flush table #23: started +2018/05/25-14:30:25.643618 7f82b2f20700 [db/db_impl_write.cc:1373] [default] New memtable created with log file: #24. Immutable memtables: 1. 
+2018/05/25-14:30:25.643633 7f82b2f20700 [WARN] [db/column_family.cc:743] [default] Stopping writes because we have 2 immutable memtables (waiting for flush), max_write_buffer_number is set to 2 +2018/05/25-14:30:27.288181 7f82ba72e700 EVENT_LOG_v1 {"time_micros": 1527283827288158, "cf_name": "default", "job": 10, "event": "table_file_creation", "file_number": 23, "file_size": 1893200, "table_properties": {"data_size": 1879460, "index_size": 23340, "filter_size": 0, "raw_key_size": 663360, "raw_average_key_size": 24, "raw_value_size": 2764000, "raw_average_value_size": 100, "num_data_blocks": 838, "num_entries": 27640, "filter_policy_name": "", "kDeletedKeys": "0", "kMergeOperands": "0"}} +2018/05/25-14:30:27.288210 7f82ba72e700 [db/flush_job.cc:371] [default] [JOB 10] Level-0 flush table #23: 1893200 bytes OK +2018/05/25-14:30:27.289353 7f82ba72e700 [WARN] [db/column_family.cc:764] [default] Stalling writes because of estimated pending compaction bytes 14410584 +2018/05/25-14:30:27.289390 7f82ba72e700 (Original Log Time 2018/05/25-14:30:27.288829) [db/memtable_list.cc:377] [default] Level-0 commit table #23 started +2018/05/25-14:30:27.289393 7f82ba72e700 (Original Log Time 2018/05/25-14:30:27.289332) [db/memtable_list.cc:409] [default] Level-0 commit table #23: memtable #1 done +2018/05/25-14:34:21.047206 7f82ba72e700 EVENT_LOG_v1 {"time_micros": 1527284061047181, "cf_name": "default", "job": 44, "event": "table_file_creation", "file_number": 84, "file_size": 1890780, "table_properties": {"data_size": 1877100, "index_size": 23309, "filter_size": 0, "raw_key_size": 662808, "raw_average_key_size": 24, "raw_value_size": 2761700, "raw_average_value_size": 100, "num_data_blocks": 837, "num_entries": 27617, "filter_policy_name": "", "kDeletedKeys": "0", "kMergeOperands": "0"}} +2018/05/25-14:34:21.047233 7f82ba72e700 [db/flush_job.cc:371] [default] [JOB 44] Level-0 flush table #84: 1890780 bytes OK +2018/05/25-14:34:21.048017 7f82ba72e700 (Original Log Time 2018/05/25-14:34:21.048005) EVENT_LOG_v1 {"time_micros": 1527284061047997, "job": 44, "event": "flush_finished", "output_compression": "Snappy", "lsm_state": [2, 1, 0, 0, 0, 0, 0], "immutable_memtables": 1} +2018/05/25-14:34:21.048592 7f82bd676200 [DEBUG] [db/db_impl_files.cc:261] [JOB 45] Delete /tmp/rocksdbtest-155919/dbbench/000084.sst type=2 #84 -- OK +2018/05/25-14:34:21.048603 7f82bd676200 EVENT_LOG_v1 {"time_micros": 1527284061048600, "job": 45, "event": "table_file_deletion", "file_number": 84} +2018/05/25-14:34:21.048981 7f82bd676200 [db/db_impl.cc:398] Shutdown complete +2018/05/25-14:34:21.049000 7f82bd676200 [db/db_impl.cc:563] [col-fam-A] random log message for testing +2018/05/25-14:34:21.049010 7f82bd676200 [db/db_impl.cc:234] [col-fam-B] log continuing on next line +remaining part of the log +2018/05/25-14:34:21.049020 7f82bd676200 [db/db_impl.cc:653] [col-fam-A] another random log message +2018/05/25-14:34:21.049025 7f82bd676200 [db/db_impl.cc:331] [unknown] random log message no column family diff --git a/src/rocksdb/tools/advisor/test/input_files/LOG-1 b/src/rocksdb/tools/advisor/test/input_files/LOG-1 new file mode 100644 index 000000000..b163f9a99 --- /dev/null +++ b/src/rocksdb/tools/advisor/test/input_files/LOG-1 @@ -0,0 +1,25 @@ +2018/05/25-14:30:05.601692 7f82bd676200 RocksDB version: 5.14.0 +2018/05/25-14:30:07.626719 7f82ba72e700 (Original Log Time 2018/05/25-14:30:07.621966) [db/db_impl_compaction_flush.cc:1424] Calling FlushMemTableToOutputFile with column family [default], flush slots available 1, compaction 
slots available 1, flush slots scheduled 1, compaction slots scheduled 0 +2018/05/25-14:30:07.626725 7f82ba72e700 [db/flush_job.cc:301] [default] [JOB 3] Flushing memtable with next log file: 8 +2018/05/25-14:30:07.626738 7f82ba72e700 EVENT_LOG_v1 {"time_micros": 1527283807626732, "job": 3, "event": "flush_started", "num_memtables": 1, "num_entries": 28018, "num_deletes": 0, "memory_usage": 4065512, "flush_reason": "Write Buffer Full"} +2018/05/25-14:30:07.626740 7f82ba72e700 [db/flush_job.cc:331] [default] [JOB 3] Level-0 flush table #10: started +2018/05/25-14:30:07.764232 7f82b2f20700 [db/db_impl_write.cc:1373] [default] New memtable created with log file: #11. Immutable memtables: 1. +2018/05/25-14:30:07.764240 7f82b2f20700 [WARN] [db/column_family.cc:743] [default] Stopping writes because we have 2 immutable memtables (waiting for flush), max_write_buffer_number is set to 2 +2018/05/23-11:53:12.800143 7f9f36b40700 [WARN] [db/column_family.cc:799] [default] Stalling writes because we have 4 level-0 files rate 39886 +2018/05/23-11:53:12.800143 7f9f36b40700 [WARN] [db/column_family.cc:799] [default] Stopping writes because we have 4 level-0 files rate 39886 +2018/05/25-14:30:09.398302 7f82ba72e700 EVENT_LOG_v1 {"time_micros": 1527283809398276, "cf_name": "default", "job": 3, "event": "table_file_creation", "file_number": 10, "file_size": 1890434, "table_properties": {"data_size": 1876749, "index_size": 23346, "filter_size": 0, "raw_key_size": 663120, "raw_average_key_size": 24, "raw_value_size": 2763000, "raw_average_value_size": 100, "num_data_blocks": 838, "num_entries": 27630, "filter_policy_name": "", "kDeletedKeys": "0", "kMergeOperands": "0"}} +2018/05/25-14:30:09.398351 7f82ba72e700 [db/flush_job.cc:371] [default] [JOB 3] Level-0 flush table #10: 1890434 bytes OK +2018/05/25-14:30:25.491635 7f82ba72e700 [db/flush_job.cc:331] [default] [JOB 10] Level-0 flush table #23: started +2018/05/25-14:30:25.643618 7f82b2f20700 [db/db_impl_write.cc:1373] [default] New memtable created with log file: #24. Immutable memtables: 1. 
+2018/05/25-14:30:25.643633 7f82b2f20700 [WARN] [db/column_family.cc:743] [default] Stopping writes because we have 2 immutable memtables (waiting for flush), max_write_buffer_number is set to 2 +2018/05/25-14:30:27.288181 7f82ba72e700 EVENT_LOG_v1 {"time_micros": 1527283827288158, "cf_name": "default", "job": 10, "event": "table_file_creation", "file_number": 23, "file_size": 1893200, "table_properties": {"data_size": 1879460, "index_size": 23340, "filter_size": 0, "raw_key_size": 663360, "raw_average_key_size": 24, "raw_value_size": 2764000, "raw_average_value_size": 100, "num_data_blocks": 838, "num_entries": 27640, "filter_policy_name": "", "kDeletedKeys": "0", "kMergeOperands": "0"}} +2018/05/25-14:30:27.288210 7f82ba72e700 [db/flush_job.cc:371] [default] [JOB 10] Level-0 flush table #23: 1893200 bytes OK +2018/05/25-14:30:27.289353 7f82ba72e700 [WARN] [db/column_family.cc:764] [default] Stopping writes because of estimated pending compaction bytes 14410584 +2018/05/25-14:30:27.289390 7f82ba72e700 (Original Log Time 2018/05/25-14:30:27.288829) [db/memtable_list.cc:377] [default] Level-0 commit table #23 started +2018/05/25-14:30:27.289393 7f82ba72e700 (Original Log Time 2018/05/25-14:30:27.289332) [db/memtable_list.cc:409] [default] Level-0 commit table #23: memtable #1 done +2018/05/25-14:34:21.047206 7f82ba72e700 EVENT_LOG_v1 {"time_micros": 1527284061047181, "cf_name": "default", "job": 44, "event": "table_file_creation", "file_number": 84, "file_size": 1890780, "table_properties": {"data_size": 1877100, "index_size": 23309, "filter_size": 0, "raw_key_size": 662808, "raw_average_key_size": 24, "raw_value_size": 2761700, "raw_average_value_size": 100, "num_data_blocks": 837, "num_entries": 27617, "filter_policy_name": "", "kDeletedKeys": "0", "kMergeOperands": "0"}} +2018/05/25-14:34:21.047233 7f82ba72e700 [db/flush_job.cc:371] [default] [JOB 44] Level-0 flush table #84: 1890780 bytes OK +2018/05/25-14:34:21.048017 7f82ba72e700 (Original Log Time 2018/05/25-14:34:21.048005) EVENT_LOG_v1 {"time_micros": 1527284061047997, "job": 44, "event": "flush_finished", "output_compression": "Snappy", "lsm_state": [2, 1, 0, 0, 0, 0, 0], "immutable_memtables": 1} +2018/05/25-14:34:21.048592 7f82bd676200 [DEBUG] [db/db_impl_files.cc:261] [JOB 45] Delete /tmp/rocksdbtest-155919/dbbench/000084.sst type=2 #84 -- OK +2018/05/25-14:34:21.048603 7f82bd676200 EVENT_LOG_v1 {"time_micros": 1527284061048600, "job": 45, "event": "table_file_deletion", "file_number": 84} +2018/05/25-14:34:21.048981 7f82bd676200 [db/db_impl.cc:398] Shutdown complete diff --git a/src/rocksdb/tools/advisor/test/input_files/OPTIONS-000005 b/src/rocksdb/tools/advisor/test/input_files/OPTIONS-000005 new file mode 100644 index 000000000..009edb04d --- /dev/null +++ b/src/rocksdb/tools/advisor/test/input_files/OPTIONS-000005 @@ -0,0 +1,49 @@ +# This is a RocksDB option file. 
+# +# For detailed file format spec, please refer to the example file +# in examples/rocksdb_option_file_example.ini +# + +[Version] + rocksdb_version=5.14.0 + options_file_version=1.1 + +[DBOptions] + manual_wal_flush=false + allow_ingest_behind=false + db_write_buffer_size=0 + db_log_dir= + random_access_max_buffer_size=1048576 + +[CFOptions "default"] + ttl=0 + max_bytes_for_level_base=268435456 + max_bytes_for_level_multiplier=10.000000 + level0_file_num_compaction_trigger=4 + level0_stop_writes_trigger=36 + write_buffer_size=4194000 + min_write_buffer_number_to_merge=1 + num_levels=7 + compaction_filter_factory=nullptr + compaction_style=kCompactionStyleLevel + +[TableOptions/BlockBasedTable "default"] + block_align=false + index_type=kBinarySearch + +[CFOptions "col_fam_A"] +ttl=0 +max_bytes_for_level_base=268435456 +max_bytes_for_level_multiplier=10.000000 +level0_file_num_compaction_trigger=5 +level0_stop_writes_trigger=36 +write_buffer_size=1024000 +min_write_buffer_number_to_merge=1 +num_levels=5 +compaction_filter_factory=nullptr +compaction_style=kCompactionStyleLevel + +[TableOptions/BlockBasedTable "col_fam_A"] +block_align=true +block_restart_interval=16 +index_type=kBinarySearch diff --git a/src/rocksdb/tools/advisor/test/input_files/log_stats_parser_keys_ts b/src/rocksdb/tools/advisor/test/input_files/log_stats_parser_keys_ts new file mode 100644 index 000000000..e8ade9e3e --- /dev/null +++ b/src/rocksdb/tools/advisor/test/input_files/log_stats_parser_keys_ts @@ -0,0 +1,3 @@ +rocksdb.number.block.decompressed.count: 1530896335 88.0, 1530896361 788338.0, 1530896387 1539256.0, 1530896414 2255696.0, 1530896440 3009325.0, 1530896466 3767183.0, 1530896492 4529775.0, 1530896518 5297809.0, 1530896545 6033802.0, 1530896570 6794129.0 +rocksdb.db.get.micros.p50: 1530896335 295.5, 1530896361 16.561841, 1530896387 16.20677, 1530896414 16.31508, 1530896440 16.346602, 1530896466 16.284669, 1530896492 16.16005, 1530896518 16.069096, 1530896545 16.028746, 1530896570 15.9638 +rocksdb.manifest.file.sync.micros.p99: 1530896335 649.0, 1530896361 835.0, 1530896387 1435.0, 1530896414 9938.0, 1530896440 9938.0, 1530896466 9938.0, 1530896492 9938.0, 1530896518 1882.0, 1530896545 1837.0, 1530896570 1792.0 diff --git a/src/rocksdb/tools/advisor/test/input_files/rules_err1.ini b/src/rocksdb/tools/advisor/test/input_files/rules_err1.ini new file mode 100644 index 000000000..23be55dde --- /dev/null +++ b/src/rocksdb/tools/advisor/test/input_files/rules_err1.ini @@ -0,0 +1,56 @@ +[Rule "missing-suggestions"] +suggestions= +conditions=missing-source + +[Condition "normal-rule"] +source=LOG +regex=Stopping writes because we have \d+ immutable memtables \(waiting for flush\), max_write_buffer_number is set to \d+ + +[Suggestion "inc-bg-flush"] +option=DBOptions.max_background_flushes +action=increase + +[Suggestion "inc-write-buffer"] +option=CFOptions.max_write_buffer_number +action=increase + +[Rule "missing-conditions"] +conditions= +suggestions=missing-description + +[Condition "missing-options"] +source=OPTIONS +options= +evaluate=int(options[0])*int(options[1])-int(options[2])<(-251659456) # should evaluate to a boolean + +[Rule "missing-expression"] +conditions=missing-expression +suggestions=missing-description + +[Condition "missing-expression"] +source=OPTIONS +options=CFOptions.level0_file_num_compaction_trigger:CFOptions.write_buffer_size:CFOptions.max_bytes_for_level_base +evaluate= + +[Suggestion "missing-description"] +description= + +[Rule "stop-too-many-L0"] 
+suggestions=inc-max-bg-compactions:missing-action:inc-l0-stop-writes-trigger +conditions=missing-regex + +[Condition "missing-regex"] +source=LOG +regex= + +[Suggestion "missing-option"] +option= +action=increase + +[Suggestion "normal-suggestion"] +option=CFOptions.write_buffer_size +action=increase + +[Suggestion "inc-l0-stop-writes-trigger"] +option=CFOptions.level0_stop_writes_trigger +action=increase diff --git a/src/rocksdb/tools/advisor/test/input_files/rules_err2.ini b/src/rocksdb/tools/advisor/test/input_files/rules_err2.ini new file mode 100644 index 000000000..bce21dba9 --- /dev/null +++ b/src/rocksdb/tools/advisor/test/input_files/rules_err2.ini @@ -0,0 +1,15 @@ +[Rule "normal-rule"] +suggestions=inc-bg-flush:inc-write-buffer +conditions=missing-source + +[Condition "missing-source"] +source= +regex=Stopping writes because we have \d+ immutable memtables \(waiting for flush\), max_write_buffer_number is set to \d+ + +[Suggestion "inc-bg-flush"] +option=DBOptions.max_background_flushes +action=increase + +[Suggestion "inc-write-buffer"] +option=CFOptions.max_write_buffer_number +action=increase diff --git a/src/rocksdb/tools/advisor/test/input_files/rules_err3.ini b/src/rocksdb/tools/advisor/test/input_files/rules_err3.ini new file mode 100644 index 000000000..73c06e469 --- /dev/null +++ b/src/rocksdb/tools/advisor/test/input_files/rules_err3.ini @@ -0,0 +1,15 @@ +[Rule "normal-rule"] +suggestions=missing-action:inc-write-buffer +conditions=missing-source + +[Condition "normal-condition"] +source=LOG +regex=Stopping writes because we have \d+ immutable memtables \(waiting for flush\), max_write_buffer_number is set to \d+ + +[Suggestion "missing-action"] +option=DBOptions.max_background_flushes +action= + +[Suggestion "inc-write-buffer"] +option=CFOptions.max_write_buffer_number +action=increase diff --git a/src/rocksdb/tools/advisor/test/input_files/rules_err4.ini b/src/rocksdb/tools/advisor/test/input_files/rules_err4.ini new file mode 100644 index 000000000..4d4aa3c70 --- /dev/null +++ b/src/rocksdb/tools/advisor/test/input_files/rules_err4.ini @@ -0,0 +1,15 @@ +[Rule "normal-rule"] +suggestions=inc-bg-flush +conditions=missing-source + +[Condition "normal-condition"] +source=LOG +regex=Stopping writes because we have \d+ immutable memtables \(waiting for flush\), max_write_buffer_number is set to \d+ + +[Suggestion "inc-bg-flush"] +option=DBOptions.max_background_flushes +action=increase + +[Suggestion] # missing section name +option=CFOptions.max_write_buffer_number +action=increase diff --git a/src/rocksdb/tools/advisor/test/input_files/test_rules.ini b/src/rocksdb/tools/advisor/test/input_files/test_rules.ini new file mode 100644 index 000000000..97b9374fc --- /dev/null +++ b/src/rocksdb/tools/advisor/test/input_files/test_rules.ini @@ -0,0 +1,47 @@ +[Rule "single-condition-false"] +suggestions=inc-bg-flush:inc-write-buffer +conditions=log-4-false + +[Rule "multiple-conds-true"] +suggestions=inc-write-buffer +conditions=log-1-true:log-2-true:log-3-true + +[Rule "multiple-conds-one-false"] +suggestions=inc-bg-flush +conditions=log-1-true:log-4-false:log-3-true + +[Rule "multiple-conds-all-false"] +suggestions=l0-l1-ratio-health-check +conditions=log-4-false:options-1-false + +[Condition "log-1-true"] +source=LOG +regex=Stopping writes because we have \d+ immutable memtables \(waiting for flush\), max_write_buffer_number is set to \d+ + +[Condition "log-2-true"] +source=LOG +regex=Stalling writes because we have \d+ level-0 files + +[Condition "log-3-true"] +source=LOG 
+regex=Stopping writes because we have \d+ level-0 files + +[Condition "log-4-false"] +source=LOG +regex=Stalling writes because of estimated pending compaction bytes \d+ + +[Condition "options-1-false"] +source=OPTIONS +options=CFOptions.level0_file_num_compaction_trigger:CFOptions.write_buffer_size:DBOptions.random_access_max_buffer_size +evaluate=int(options[0])*int(options[1])-int(options[2])<0 # should evaluate to a boolean + +[Suggestion "inc-bg-flush"] +option=DBOptions.max_background_flushes +action=increase + +[Suggestion "inc-write-buffer"] +option=CFOptions.max_write_buffer_number +action=increase + +[Suggestion "l0-l1-ratio-health-check"] +description='modify options such that (level0_file_num_compaction_trigger * write_buffer_size - max_bytes_for_level_base < 5) is satisfied' diff --git a/src/rocksdb/tools/advisor/test/input_files/triggered_rules.ini b/src/rocksdb/tools/advisor/test/input_files/triggered_rules.ini new file mode 100644 index 000000000..83b96da2b --- /dev/null +++ b/src/rocksdb/tools/advisor/test/input_files/triggered_rules.ini @@ -0,0 +1,83 @@ +[Rule "stall-too-many-memtables"] +suggestions=inc-bg-flush:inc-write-buffer +conditions=stall-too-many-memtables + +[Condition "stall-too-many-memtables"] +source=LOG +regex=Stopping writes because we have \d+ immutable memtables \(waiting for flush\), max_write_buffer_number is set to \d+ + +[Rule "stall-too-many-L0"] +suggestions=inc-max-subcompactions:inc-max-bg-compactions:inc-write-buffer-size:dec-max-bytes-for-level-base:inc-l0-slowdown-writes-trigger +conditions=stall-too-many-L0 + +[Condition "stall-too-many-L0"] +source=LOG +regex=Stalling writes because we have \d+ level-0 files + +[Rule "stop-too-many-L0"] +suggestions=inc-max-bg-compactions:inc-write-buffer-size:inc-l0-stop-writes-trigger +conditions=stop-too-many-L0 + +[Condition "stop-too-many-L0"] +source=LOG +regex=Stopping writes because we have \d+ level-0 files + +[Rule "stall-too-many-compaction-bytes"] +suggestions=inc-max-bg-compactions:inc-write-buffer-size:inc-hard-pending-compaction-bytes-limit:inc-soft-pending-compaction-bytes-limit +conditions=stall-too-many-compaction-bytes + +[Condition "stall-too-many-compaction-bytes"] +source=LOG +regex=Stalling writes because of estimated pending compaction bytes \d+ + +[Suggestion "inc-bg-flush"] +option=DBOptions.max_background_flushes +action=increase + +[Suggestion "inc-write-buffer"] +option=CFOptions.max_write_buffer_number +action=increase + +[Suggestion "inc-max-subcompactions"] +option=DBOptions.max_subcompactions +action=increase + +[Suggestion "inc-max-bg-compactions"] +option=DBOptions.max_background_compactions +action=increase + +[Suggestion "inc-write-buffer-size"] +option=CFOptions.write_buffer_size +action=increase + +[Suggestion "dec-max-bytes-for-level-base"] +option=CFOptions.max_bytes_for_level_base +action=decrease + +[Suggestion "inc-l0-slowdown-writes-trigger"] +option=CFOptions.level0_slowdown_writes_trigger +action=increase + +[Suggestion "inc-l0-stop-writes-trigger"] +option=CFOptions.level0_stop_writes_trigger +action=increase + +[Suggestion "inc-hard-pending-compaction-bytes-limit"] +option=CFOptions.hard_pending_compaction_bytes_limit +action=increase + +[Suggestion "inc-soft-pending-compaction-bytes-limit"] +option=CFOptions.soft_pending_compaction_bytes_limit +action=increase + +[Rule "level0-level1-ratio"] +conditions=level0-level1-ratio +suggestions=l0-l1-ratio-health-check + +[Condition "level0-level1-ratio"] +source=OPTIONS 
+options=CFOptions.level0_file_num_compaction_trigger:CFOptions.write_buffer_size:CFOptions.max_bytes_for_level_base +evaluate=int(options[0])*int(options[1])-int(options[2])>=-268173312 # should evaluate to a boolean, condition triggered if evaluates to true + +[Suggestion "l0-l1-ratio-health-check"] +description='modify options such that (level0_file_num_compaction_trigger * write_buffer_size - max_bytes_for_level_base < -268173312) is satisfied' diff --git a/src/rocksdb/tools/advisor/test/test_db_bench_runner.py b/src/rocksdb/tools/advisor/test/test_db_bench_runner.py new file mode 100644 index 000000000..57306c942 --- /dev/null +++ b/src/rocksdb/tools/advisor/test/test_db_bench_runner.py @@ -0,0 +1,141 @@ +# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). + +import os +import unittest + +from advisor.db_bench_runner import DBBenchRunner +from advisor.db_log_parser import DataSource, NO_COL_FAMILY +from advisor.db_options_parser import DatabaseOptions + + +class TestDBBenchRunnerMethods(unittest.TestCase): + def setUp(self): + self.pos_args = [ + "./../../db_bench", + "overwrite", + "use_existing_db=true", + "duration=10", + ] + self.bench_runner = DBBenchRunner(self.pos_args) + this_path = os.path.abspath(os.path.dirname(__file__)) + options_path = os.path.join(this_path, "input_files/OPTIONS-000005") + self.db_options = DatabaseOptions(options_path) + + def test_setup(self): + self.assertEqual(self.bench_runner.db_bench_binary, self.pos_args[0]) + self.assertEqual(self.bench_runner.benchmark, self.pos_args[1]) + self.assertSetEqual( + set(self.bench_runner.db_bench_args), set(self.pos_args[2:]) + ) + + def test_get_info_log_file_name(self): + log_file_name = DBBenchRunner.get_info_log_file_name(None, "random_path") + self.assertEqual(log_file_name, "LOG") + + log_file_name = DBBenchRunner.get_info_log_file_name( + "/dev/shm/", "/tmp/rocksdbtest-155919/dbbench/" + ) + self.assertEqual(log_file_name, "tmp_rocksdbtest-155919_dbbench_LOG") + + def test_get_opt_args_str(self): + misc_opt_dict = {"bloom_bits": 2, "empty_opt": None, "rate_limiter": 3} + optional_args_str = DBBenchRunner.get_opt_args_str(misc_opt_dict) + self.assertEqual(optional_args_str, " --bloom_bits=2 --rate_limiter=3") + + def test_get_log_options(self): + db_path = "/tmp/rocksdb-155919/dbbench" + # when db_log_dir is present in the db_options + update_dict = { + "DBOptions.db_log_dir": {NO_COL_FAMILY: "/dev/shm"}, + "DBOptions.stats_dump_period_sec": {NO_COL_FAMILY: "20"}, + } + self.db_options.update_options(update_dict) + log_file_prefix, stats_freq = self.bench_runner.get_log_options( + self.db_options, db_path + ) + self.assertEqual(log_file_prefix, "/dev/shm/tmp_rocksdb-155919_dbbench_LOG") + self.assertEqual(stats_freq, 20) + + update_dict = { + "DBOptions.db_log_dir": {NO_COL_FAMILY: None}, + "DBOptions.stats_dump_period_sec": {NO_COL_FAMILY: "30"}, + } + self.db_options.update_options(update_dict) + log_file_prefix, stats_freq = self.bench_runner.get_log_options( + self.db_options, db_path + ) + self.assertEqual(log_file_prefix, "/tmp/rocksdb-155919/dbbench/LOG") + self.assertEqual(stats_freq, 30) + + def test_build_experiment_command(self): + # add some misc_options to db_options + update_dict = { + "bloom_bits": {NO_COL_FAMILY: 2}, + "rate_limiter_bytes_per_sec": {NO_COL_FAMILY: 128000000}, + } + 
self.db_options.update_options(update_dict) + db_path = "/dev/shm" + experiment_command = self.bench_runner._build_experiment_command( + self.db_options, db_path + ) + opt_args_str = DBBenchRunner.get_opt_args_str( + self.db_options.get_misc_options() + ) + opt_args_str += " --options_file=" + self.db_options.generate_options_config( + "12345" + ) + for arg in self.pos_args[2:]: + opt_args_str += " --" + arg + expected_command = ( + self.pos_args[0] + + " --benchmarks=" + + self.pos_args[1] + + " --statistics --perf_level=3 --db=" + + db_path + + opt_args_str + ) + self.assertEqual(experiment_command, expected_command) + + +class TestDBBenchRunner(unittest.TestCase): + def setUp(self): + # Note: the db_bench binary should be present in the rocksdb/ directory + self.pos_args = [ + "./../../db_bench", + "overwrite", + "use_existing_db=true", + "duration=20", + ] + self.bench_runner = DBBenchRunner(self.pos_args) + this_path = os.path.abspath(os.path.dirname(__file__)) + options_path = os.path.join(this_path, "input_files/OPTIONS-000005") + self.db_options = DatabaseOptions(options_path) + + def test_experiment_output(self): + update_dict = {"bloom_bits": {NO_COL_FAMILY: 2}} + self.db_options.update_options(update_dict) + db_path = "/dev/shm" + data_sources, throughput = self.bench_runner.run_experiment( + self.db_options, db_path + ) + self.assertEqual( + data_sources[DataSource.Type.DB_OPTIONS][0].type, DataSource.Type.DB_OPTIONS + ) + self.assertEqual(data_sources[DataSource.Type.LOG][0].type, DataSource.Type.LOG) + self.assertEqual(len(data_sources[DataSource.Type.TIME_SERIES]), 2) + self.assertEqual( + data_sources[DataSource.Type.TIME_SERIES][0].type, + DataSource.Type.TIME_SERIES, + ) + self.assertEqual( + data_sources[DataSource.Type.TIME_SERIES][1].type, + DataSource.Type.TIME_SERIES, + ) + self.assertEqual(data_sources[DataSource.Type.TIME_SERIES][1].stats_freq_sec, 0) + + +if __name__ == "__main__": + unittest.main() diff --git a/src/rocksdb/tools/advisor/test/test_db_log_parser.py b/src/rocksdb/tools/advisor/test/test_db_log_parser.py new file mode 100644 index 000000000..6862691c1 --- /dev/null +++ b/src/rocksdb/tools/advisor/test/test_db_log_parser.py @@ -0,0 +1,96 @@ +# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). 
+ +import os +import unittest + +from advisor.db_log_parser import DatabaseLogs, Log, NO_COL_FAMILY +from advisor.rule_parser import Condition, LogCondition + + +class TestLog(unittest.TestCase): + def setUp(self): + self.column_families = ["default", "col_fam_A"] + + def test_get_column_family(self): + test_log = ( + "2018/05/25-14:34:21.047233 7f82ba72e700 [db/flush_job.cc:371] " + + "[col_fam_A] [JOB 44] Level-0 flush table #84: 1890780 bytes OK" + ) + db_log = Log(test_log, self.column_families) + self.assertEqual("col_fam_A", db_log.get_column_family()) + + test_log = ( + "2018/05/25-14:34:21.047233 7f82ba72e700 [db/flush_job.cc:371] " + + "[JOB 44] Level-0 flush table #84: 1890780 bytes OK" + ) + db_log = Log(test_log, self.column_families) + db_log.append_message("[default] some remaining part of log") + self.assertEqual(NO_COL_FAMILY, db_log.get_column_family()) + + def test_get_methods(self): + hr_time = "2018/05/25-14:30:25.491635" + context = "7f82ba72e700" + message = ( + "[db/flush_job.cc:331] [default] [JOB 10] Level-0 flush table " + + "#23: started" + ) + test_log = hr_time + " " + context + " " + message + db_log = Log(test_log, self.column_families) + self.assertEqual(db_log.get_message(), message) + remaining_message = "[col_fam_A] some more logs" + db_log.append_message(remaining_message) + self.assertEqual(db_log.get_human_readable_time(), "2018/05/25-14:30:25.491635") + self.assertEqual(db_log.get_context(), "7f82ba72e700") + self.assertEqual(db_log.get_timestamp(), 1527258625) + self.assertEqual(db_log.get_message(), str(message + "\n" + remaining_message)) + + def test_is_new_log(self): + new_log = "2018/05/25-14:34:21.047233 context random new log" + remaining_log = "2018/05/25 not really a new log" + self.assertTrue(Log.is_new_log(new_log)) + self.assertFalse(Log.is_new_log(remaining_log)) + + +class TestDatabaseLogs(unittest.TestCase): + def test_check_and_trigger_conditions(self): + this_path = os.path.abspath(os.path.dirname(__file__)) + logs_path_prefix = os.path.join(this_path, "input_files/LOG-0") + column_families = ["default", "col-fam-A", "col-fam-B"] + db_logs = DatabaseLogs(logs_path_prefix, column_families) + # matches, has 2 col_fams + condition1 = LogCondition.create(Condition("cond-A")) + condition1.set_parameter("regex", "random log message") + # matches, multiple lines message + condition2 = LogCondition.create(Condition("cond-B")) + condition2.set_parameter("regex", "continuing on next line") + # does not match + condition3 = LogCondition.create(Condition("cond-C")) + condition3.set_parameter("regex", "this should match no log") + db_logs.check_and_trigger_conditions([condition1, condition2, condition3]) + cond1_trigger = condition1.get_trigger() + self.assertEqual(2, len(cond1_trigger.keys())) + self.assertSetEqual({"col-fam-A", NO_COL_FAMILY}, set(cond1_trigger.keys())) + self.assertEqual(2, len(cond1_trigger["col-fam-A"])) + messages = [ + "[db/db_impl.cc:563] [col-fam-A] random log message for testing", + "[db/db_impl.cc:653] [col-fam-A] another random log message", + ] + self.assertIn(cond1_trigger["col-fam-A"][0].get_message(), messages) + self.assertIn(cond1_trigger["col-fam-A"][1].get_message(), messages) + self.assertEqual(1, len(cond1_trigger[NO_COL_FAMILY])) + self.assertEqual( + cond1_trigger[NO_COL_FAMILY][0].get_message(), + "[db/db_impl.cc:331] [unknown] random log message no column family", + ) + cond2_trigger = condition2.get_trigger() + self.assertEqual(["col-fam-B"], list(cond2_trigger.keys())) + self.assertEqual(1, 
len(cond2_trigger["col-fam-B"])) + self.assertEqual( + cond2_trigger["col-fam-B"][0].get_message(), + "[db/db_impl.cc:234] [col-fam-B] log continuing on next line\n" + + "remaining part of the log", + ) + self.assertIsNone(condition3.get_trigger()) diff --git a/src/rocksdb/tools/advisor/test/test_db_options_parser.py b/src/rocksdb/tools/advisor/test/test_db_options_parser.py new file mode 100644 index 000000000..cdeebaefa --- /dev/null +++ b/src/rocksdb/tools/advisor/test/test_db_options_parser.py @@ -0,0 +1,214 @@ +# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). + +import os +import unittest + +from advisor.db_log_parser import NO_COL_FAMILY +from advisor.db_options_parser import DatabaseOptions +from advisor.rule_parser import Condition, OptionCondition + + +class TestDatabaseOptions(unittest.TestCase): + def setUp(self): + self.this_path = os.path.abspath(os.path.dirname(__file__)) + self.og_options = os.path.join(self.this_path, "input_files/OPTIONS-000005") + misc_options = ["bloom_bits = 4", "rate_limiter_bytes_per_sec = 1024000"] + # create the options object + self.db_options = DatabaseOptions(self.og_options, misc_options) + # perform clean-up before running tests + self.generated_options = os.path.join( + self.this_path, "../temp/OPTIONS_testing.tmp" + ) + if os.path.isfile(self.generated_options): + os.remove(self.generated_options) + + def test_get_options_diff(self): + old_opt = { + "DBOptions.stats_dump_freq_sec": {NO_COL_FAMILY: "20"}, + "CFOptions.write_buffer_size": { + "default": "1024000", + "col_fam_A": "128000", + "col_fam_B": "128000000", + }, + "DBOptions.use_fsync": {NO_COL_FAMILY: "true"}, + "DBOptions.max_log_file_size": {NO_COL_FAMILY: "128000000"}, + } + new_opt = { + "bloom_bits": {NO_COL_FAMILY: "4"}, + "CFOptions.write_buffer_size": { + "default": "128000000", + "col_fam_A": "128000", + "col_fam_C": "128000000", + }, + "DBOptions.use_fsync": {NO_COL_FAMILY: "true"}, + "DBOptions.max_log_file_size": {NO_COL_FAMILY: "0"}, + } + diff = DatabaseOptions.get_options_diff(old_opt, new_opt) + + expected_diff = { + "DBOptions.stats_dump_freq_sec": {NO_COL_FAMILY: ("20", None)}, + "bloom_bits": {NO_COL_FAMILY: (None, "4")}, + "CFOptions.write_buffer_size": { + "default": ("1024000", "128000000"), + "col_fam_B": ("128000000", None), + "col_fam_C": (None, "128000000"), + }, + "DBOptions.max_log_file_size": {NO_COL_FAMILY: ("128000000", "0")}, + } + self.assertDictEqual(diff, expected_diff) + + def test_is_misc_option(self): + self.assertTrue(DatabaseOptions.is_misc_option("bloom_bits")) + self.assertFalse( + DatabaseOptions.is_misc_option("DBOptions.stats_dump_freq_sec") + ) + + def test_set_up(self): + options = self.db_options.get_all_options() + self.assertEqual(22, len(options.keys())) + expected_misc_options = { + "bloom_bits": "4", + "rate_limiter_bytes_per_sec": "1024000", + } + self.assertDictEqual(expected_misc_options, self.db_options.get_misc_options()) + self.assertListEqual( + ["default", "col_fam_A"], self.db_options.get_column_families() + ) + + def test_get_options(self): + opt_to_get = [ + "DBOptions.manual_wal_flush", + "DBOptions.db_write_buffer_size", + "bloom_bits", + "CFOptions.compaction_filter_factory", + "CFOptions.num_levels", + "rate_limiter_bytes_per_sec", + "TableOptions.BlockBasedTable.block_align", + "random_option", + ] + options = 
self.db_options.get_options(opt_to_get) + expected_options = { + "DBOptions.manual_wal_flush": {NO_COL_FAMILY: "false"}, + "DBOptions.db_write_buffer_size": {NO_COL_FAMILY: "0"}, + "bloom_bits": {NO_COL_FAMILY: "4"}, + "CFOptions.compaction_filter_factory": { + "default": "nullptr", + "col_fam_A": "nullptr", + }, + "CFOptions.num_levels": {"default": "7", "col_fam_A": "5"}, + "rate_limiter_bytes_per_sec": {NO_COL_FAMILY: "1024000"}, + "TableOptions.BlockBasedTable.block_align": { + "default": "false", + "col_fam_A": "true", + }, + } + self.assertDictEqual(expected_options, options) + + def test_update_options(self): + # add new, update old, set old + # before updating + expected_old_opts = { + "DBOptions.db_log_dir": {NO_COL_FAMILY: None}, + "DBOptions.manual_wal_flush": {NO_COL_FAMILY: "false"}, + "bloom_bits": {NO_COL_FAMILY: "4"}, + "CFOptions.num_levels": {"default": "7", "col_fam_A": "5"}, + "TableOptions.BlockBasedTable.block_restart_interval": {"col_fam_A": "16"}, + } + get_opts = list(expected_old_opts.keys()) + options = self.db_options.get_options(get_opts) + self.assertEqual(expected_old_opts, options) + # after updating options + update_opts = { + "DBOptions.db_log_dir": {NO_COL_FAMILY: "/dev/shm"}, + "DBOptions.manual_wal_flush": {NO_COL_FAMILY: "true"}, + "bloom_bits": {NO_COL_FAMILY: "2"}, + "CFOptions.num_levels": {"col_fam_A": "7"}, + "TableOptions.BlockBasedTable.block_restart_interval": {"default": "32"}, + "random_misc_option": {NO_COL_FAMILY: "something"}, + } + self.db_options.update_options(update_opts) + update_opts["CFOptions.num_levels"]["default"] = "7" + update_opts["TableOptions.BlockBasedTable.block_restart_interval"] = { + "default": "32", + "col_fam_A": "16", + } + get_opts.append("random_misc_option") + options = self.db_options.get_options(get_opts) + self.assertDictEqual(update_opts, options) + expected_misc_options = { + "bloom_bits": "2", + "rate_limiter_bytes_per_sec": "1024000", + "random_misc_option": "something", + } + self.assertDictEqual(expected_misc_options, self.db_options.get_misc_options()) + + def test_generate_options_config(self): + # make sure file does not exist from before + self.assertFalse(os.path.isfile(self.generated_options)) + self.db_options.generate_options_config("testing") + self.assertTrue(os.path.isfile(self.generated_options)) + + def test_check_and_trigger_conditions(self): + # options only from CFOptions + # setup the OptionCondition objects to check and trigger + update_dict = { + "CFOptions.level0_file_num_compaction_trigger": {"col_fam_A": "4"}, + "CFOptions.max_bytes_for_level_base": {"col_fam_A": "10"}, + } + self.db_options.update_options(update_dict) + cond1 = Condition("opt-cond-1") + cond1 = OptionCondition.create(cond1) + cond1.set_parameter( + "options", + [ + "CFOptions.level0_file_num_compaction_trigger", + "TableOptions.BlockBasedTable.block_restart_interval", + "CFOptions.max_bytes_for_level_base", + ], + ) + cond1.set_parameter( + "evaluate", "int(options[0])*int(options[1])-int(options[2])>=0" + ) + # only DBOptions + cond2 = Condition("opt-cond-2") + cond2 = OptionCondition.create(cond2) + cond2.set_parameter( + "options", + [ + "DBOptions.db_write_buffer_size", + "bloom_bits", + "rate_limiter_bytes_per_sec", + ], + ) + cond2.set_parameter( + "evaluate", "(int(options[2]) * int(options[1]) * int(options[0]))==0" + ) + # mix of CFOptions and DBOptions + cond3 = Condition("opt-cond-3") + cond3 = OptionCondition.create(cond3) + cond3.set_parameter( + "options", + [ + "DBOptions.db_write_buffer_size", # 0 + 
"CFOptions.num_levels", # 5, 7 + "bloom_bits", # 4 + ], + ) + cond3.set_parameter( + "evaluate", "int(options[2])*int(options[0])+int(options[1])>6" + ) + self.db_options.check_and_trigger_conditions([cond1, cond2, cond3]) + + cond1_trigger = {"col_fam_A": ["4", "16", "10"]} + self.assertDictEqual(cond1_trigger, cond1.get_trigger()) + cond2_trigger = {NO_COL_FAMILY: ["0", "4", "1024000"]} + self.assertDictEqual(cond2_trigger, cond2.get_trigger()) + cond3_trigger = {"default": ["0", "7", "4"]} + self.assertDictEqual(cond3_trigger, cond3.get_trigger()) + + +if __name__ == "__main__": + unittest.main() diff --git a/src/rocksdb/tools/advisor/test/test_db_stats_fetcher.py b/src/rocksdb/tools/advisor/test/test_db_stats_fetcher.py new file mode 100644 index 000000000..e2c29ab74 --- /dev/null +++ b/src/rocksdb/tools/advisor/test/test_db_stats_fetcher.py @@ -0,0 +1,121 @@ +# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). + +import os +import time +import unittest +from unittest.mock import MagicMock + +from advisor.db_stats_fetcher import DatabasePerfContext, LogStatsParser +from advisor.db_timeseries_parser import NO_ENTITY +from advisor.rule_parser import Condition, TimeSeriesCondition + + +class TestLogStatsParser(unittest.TestCase): + def setUp(self): + this_path = os.path.abspath(os.path.dirname(__file__)) + stats_file = os.path.join(this_path, "input_files/log_stats_parser_keys_ts") + # populate the keys_ts dictionary of LogStatsParser + self.stats_dict = {NO_ENTITY: {}} + with open(stats_file, "r") as fp: + for line in fp: + stat_name = line.split(":")[0].strip() + self.stats_dict[NO_ENTITY][stat_name] = {} + token_list = line.split(":")[1].strip().split(",") + for token in token_list: + timestamp = int(token.split()[0]) + value = float(token.split()[1]) + self.stats_dict[NO_ENTITY][stat_name][timestamp] = value + self.log_stats_parser = LogStatsParser("dummy_log_file", 20) + self.log_stats_parser.keys_ts = self.stats_dict + + def test_check_and_trigger_conditions_bursty(self): + # mock fetch_timeseries() because 'keys_ts' has been pre-populated + self.log_stats_parser.fetch_timeseries = MagicMock() + # condition: bursty + cond1 = Condition("cond-1") + cond1 = TimeSeriesCondition.create(cond1) + cond1.set_parameter("keys", "rocksdb.db.get.micros.p50") + cond1.set_parameter("behavior", "bursty") + cond1.set_parameter("window_sec", 40) + cond1.set_parameter("rate_threshold", 0) + self.log_stats_parser.check_and_trigger_conditions([cond1]) + expected_cond_trigger = {NO_ENTITY: {1530896440: 0.9767546362322214}} + self.assertDictEqual(expected_cond_trigger, cond1.get_trigger()) + # ensure that fetch_timeseries() was called once + self.log_stats_parser.fetch_timeseries.assert_called_once() + + def test_check_and_trigger_conditions_eval_agg(self): + # mock fetch_timeseries() because 'keys_ts' has been pre-populated + self.log_stats_parser.fetch_timeseries = MagicMock() + # condition: evaluate_expression + cond1 = Condition("cond-1") + cond1 = TimeSeriesCondition.create(cond1) + cond1.set_parameter("keys", "rocksdb.db.get.micros.p50") + cond1.set_parameter("behavior", "evaluate_expression") + keys = ["rocksdb.manifest.file.sync.micros.p99", "rocksdb.db.get.micros.p50"] + cond1.set_parameter("keys", keys) + cond1.set_parameter("aggregation_op", "latest") + # condition evaluates to FALSE + 
cond1.set_parameter("evaluate", "keys[0]-(keys[1]*100)>200") + self.log_stats_parser.check_and_trigger_conditions([cond1]) + expected_cond_trigger = {NO_ENTITY: [1792.0, 15.9638]} + self.assertIsNone(cond1.get_trigger()) + # condition evaluates to TRUE + cond1.set_parameter("evaluate", "keys[0]-(keys[1]*100)<200") + self.log_stats_parser.check_and_trigger_conditions([cond1]) + expected_cond_trigger = {NO_ENTITY: [1792.0, 15.9638]} + self.assertDictEqual(expected_cond_trigger, cond1.get_trigger()) + # ensure that fetch_timeseries() was called + self.log_stats_parser.fetch_timeseries.assert_called() + + def test_check_and_trigger_conditions_eval(self): + # mock fetch_timeseries() because 'keys_ts' has been pre-populated + self.log_stats_parser.fetch_timeseries = MagicMock() + # condition: evaluate_expression + cond1 = Condition("cond-1") + cond1 = TimeSeriesCondition.create(cond1) + cond1.set_parameter("keys", "rocksdb.db.get.micros.p50") + cond1.set_parameter("behavior", "evaluate_expression") + keys = ["rocksdb.manifest.file.sync.micros.p99", "rocksdb.db.get.micros.p50"] + cond1.set_parameter("keys", keys) + cond1.set_parameter("evaluate", "keys[0]-(keys[1]*100)>500") + self.log_stats_parser.check_and_trigger_conditions([cond1]) + expected_trigger = { + NO_ENTITY: { + 1530896414: [9938.0, 16.31508], + 1530896440: [9938.0, 16.346602], + 1530896466: [9938.0, 16.284669], + 1530896492: [9938.0, 16.16005], + } + } + self.assertDictEqual(expected_trigger, cond1.get_trigger()) + self.log_stats_parser.fetch_timeseries.assert_called_once() + + +class TestDatabasePerfContext(unittest.TestCase): + def test_unaccumulate_metrics(self): + perf_dict = { + "user_key_comparison_count": 675903942, + "block_cache_hit_count": 830086, + } + timestamp = int(time.time()) + perf_ts = {} + for key in perf_dict: + perf_ts[key] = {} + start_val = perf_dict[key] + for ix in range(5): + perf_ts[key][timestamp + (ix * 10)] = start_val + (2 * ix * ix) + db_perf_context = DatabasePerfContext(perf_ts, 10, True) + timestamps = [timestamp + (ix * 10) for ix in range(1, 5, 1)] + values = [val for val in range(2, 15, 4)] + inner_dict = {timestamps[ix]: values[ix] for ix in range(4)} + expected_keys_ts = { + NO_ENTITY: { + "user_key_comparison_count": inner_dict, + "block_cache_hit_count": inner_dict, + } + } + self.assertDictEqual(expected_keys_ts, db_perf_context.keys_ts) diff --git a/src/rocksdb/tools/advisor/test/test_rule_parser.py b/src/rocksdb/tools/advisor/test/test_rule_parser.py new file mode 100644 index 000000000..4ea4ca159 --- /dev/null +++ b/src/rocksdb/tools/advisor/test/test_rule_parser.py @@ -0,0 +1,226 @@ +# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). 
+ +import os +import unittest + +from advisor.db_log_parser import DatabaseLogs, DataSource +from advisor.db_options_parser import DatabaseOptions +from advisor.rule_parser import RulesSpec + +RuleToSuggestions = { + "stall-too-many-memtables": ["inc-bg-flush", "inc-write-buffer"], + "stall-too-many-L0": [ + "inc-max-subcompactions", + "inc-max-bg-compactions", + "inc-write-buffer-size", + "dec-max-bytes-for-level-base", + "inc-l0-slowdown-writes-trigger", + ], + "stop-too-many-L0": [ + "inc-max-bg-compactions", + "inc-write-buffer-size", + "inc-l0-stop-writes-trigger", + ], + "stall-too-many-compaction-bytes": [ + "inc-max-bg-compactions", + "inc-write-buffer-size", + "inc-hard-pending-compaction-bytes-limit", + "inc-soft-pending-compaction-bytes-limit", + ], + "level0-level1-ratio": ["l0-l1-ratio-health-check"], +} + + +class TestAllRulesTriggered(unittest.TestCase): + def setUp(self): + # load the Rules + this_path = os.path.abspath(os.path.dirname(__file__)) + ini_path = os.path.join(this_path, "input_files/triggered_rules.ini") + self.db_rules = RulesSpec(ini_path) + self.db_rules.load_rules_from_spec() + self.db_rules.perform_section_checks() + # load the data sources: LOG and OPTIONS + log_path = os.path.join(this_path, "input_files/LOG-0") + options_path = os.path.join(this_path, "input_files/OPTIONS-000005") + db_options_parser = DatabaseOptions(options_path) + self.column_families = db_options_parser.get_column_families() + db_logs_parser = DatabaseLogs(log_path, self.column_families) + self.data_sources = { + DataSource.Type.DB_OPTIONS: [db_options_parser], + DataSource.Type.LOG: [db_logs_parser], + } + + def test_triggered_conditions(self): + conditions_dict = self.db_rules.get_conditions_dict() + rules_dict = self.db_rules.get_rules_dict() + # Make sure none of the conditions is triggered beforehand + for cond in conditions_dict.values(): + self.assertFalse(cond.is_triggered(), repr(cond)) + for rule in rules_dict.values(): + self.assertFalse( + rule.is_triggered(conditions_dict, self.column_families), repr(rule) + ) + + # # Trigger the conditions as per the data sources. 
+ # trigger_conditions(, conditions_dict) + + # Get the set of rules that have been triggered + triggered_rules = self.db_rules.get_triggered_rules( + self.data_sources, self.column_families + ) + + # Make sure each condition and rule is triggered + for cond in conditions_dict.values(): + if cond.get_data_source() is DataSource.Type.TIME_SERIES: + continue + self.assertTrue(cond.is_triggered(), repr(cond)) + + for rule in rules_dict.values(): + self.assertIn(rule, triggered_rules) + # Check the suggestions made by the triggered rules + for sugg in rule.get_suggestions(): + self.assertIn(sugg, RuleToSuggestions[rule.name]) + + for rule in triggered_rules: + self.assertIn(rule, rules_dict.values()) + for sugg in RuleToSuggestions[rule.name]: + self.assertIn(sugg, rule.get_suggestions()) + + +class TestConditionsConjunctions(unittest.TestCase): + def setUp(self): + # load the Rules + this_path = os.path.abspath(os.path.dirname(__file__)) + ini_path = os.path.join(this_path, "input_files/test_rules.ini") + self.db_rules = RulesSpec(ini_path) + self.db_rules.load_rules_from_spec() + self.db_rules.perform_section_checks() + # load the data sources: LOG and OPTIONS + log_path = os.path.join(this_path, "input_files/LOG-1") + options_path = os.path.join(this_path, "input_files/OPTIONS-000005") + db_options_parser = DatabaseOptions(options_path) + self.column_families = db_options_parser.get_column_families() + db_logs_parser = DatabaseLogs(log_path, self.column_families) + self.data_sources = { + DataSource.Type.DB_OPTIONS: [db_options_parser], + DataSource.Type.LOG: [db_logs_parser], + } + + def test_condition_conjunctions(self): + conditions_dict = self.db_rules.get_conditions_dict() + rules_dict = self.db_rules.get_rules_dict() + # Make sure none of the conditions is triggered beforehand + for cond in conditions_dict.values(): + self.assertFalse(cond.is_triggered(), repr(cond)) + for rule in rules_dict.values(): + self.assertFalse( + rule.is_triggered(conditions_dict, self.column_families), repr(rule) + ) + + # Trigger the conditions as per the data sources. 
+ self.db_rules.trigger_conditions(self.data_sources) + + # Check for the conditions + conds_triggered = ["log-1-true", "log-2-true", "log-3-true"] + conds_not_triggered = ["log-4-false", "options-1-false"] + for cond in conds_triggered: + self.assertTrue(conditions_dict[cond].is_triggered(), repr(cond)) + for cond in conds_not_triggered: + self.assertFalse(conditions_dict[cond].is_triggered(), repr(cond)) + + # Check for the rules + rules_triggered = ["multiple-conds-true"] + rules_not_triggered = [ + "single-condition-false", + "multiple-conds-one-false", + "multiple-conds-all-false", + ] + for rule_name in rules_triggered: + rule = rules_dict[rule_name] + self.assertTrue( + rule.is_triggered(conditions_dict, self.column_families), repr(rule) + ) + for rule_name in rules_not_triggered: + rule = rules_dict[rule_name] + self.assertFalse( + rule.is_triggered(conditions_dict, self.column_families), repr(rule) + ) + + +class TestSanityChecker(unittest.TestCase): + def setUp(self): + this_path = os.path.abspath(os.path.dirname(__file__)) + ini_path = os.path.join(this_path, "input_files/rules_err1.ini") + db_rules = RulesSpec(ini_path) + db_rules.load_rules_from_spec() + self.rules_dict = db_rules.get_rules_dict() + self.conditions_dict = db_rules.get_conditions_dict() + self.suggestions_dict = db_rules.get_suggestions_dict() + + def test_rule_missing_suggestions(self): + regex = ".*rule must have at least one suggestion.*" + with self.assertRaisesRegex(ValueError, regex): + self.rules_dict["missing-suggestions"].perform_checks() + + def test_rule_missing_conditions(self): + regex = ".*rule must have at least one condition.*" + with self.assertRaisesRegex(ValueError, regex): + self.rules_dict["missing-conditions"].perform_checks() + + def test_condition_missing_regex(self): + regex = ".*provide regex for log condition.*" + with self.assertRaisesRegex(ValueError, regex): + self.conditions_dict["missing-regex"].perform_checks() + + def test_condition_missing_options(self): + regex = ".*options missing in condition.*" + with self.assertRaisesRegex(ValueError, regex): + self.conditions_dict["missing-options"].perform_checks() + + def test_condition_missing_expression(self): + regex = ".*expression missing in condition.*" + with self.assertRaisesRegex(ValueError, regex): + self.conditions_dict["missing-expression"].perform_checks() + + def test_suggestion_missing_option(self): + regex = ".*provide option or description.*" + with self.assertRaisesRegex(ValueError, regex): + self.suggestions_dict["missing-option"].perform_checks() + + def test_suggestion_missing_description(self): + regex = ".*provide option or description.*" + with self.assertRaisesRegex(ValueError, regex): + self.suggestions_dict["missing-description"].perform_checks() + + +class TestParsingErrors(unittest.TestCase): + def setUp(self): + self.this_path = os.path.abspath(os.path.dirname(__file__)) + + def test_condition_missing_source(self): + ini_path = os.path.join(self.this_path, "input_files/rules_err2.ini") + db_rules = RulesSpec(ini_path) + regex = ".*provide source for condition.*" + with self.assertRaisesRegex(NotImplementedError, regex): + db_rules.load_rules_from_spec() + + def test_suggestion_missing_action(self): + ini_path = os.path.join(self.this_path, "input_files/rules_err3.ini") + db_rules = RulesSpec(ini_path) + regex = ".*provide action for option.*" + with self.assertRaisesRegex(ValueError, regex): + db_rules.load_rules_from_spec() + + def test_section_no_name(self): + ini_path = os.path.join(self.this_path, 
"input_files/rules_err4.ini") + db_rules = RulesSpec(ini_path) + regex = "Parsing error: needed section header:.*" + with self.assertRaisesRegex(ValueError, regex): + db_rules.load_rules_from_spec() + + +if __name__ == "__main__": + unittest.main() |