diff options
Diffstat (limited to '')
-rw-r--r-- | src/rocksdb/tools/advisor/advisor/rule_parser.py | 510 |
1 files changed, 510 insertions, 0 deletions
diff --git a/src/rocksdb/tools/advisor/advisor/rule_parser.py b/src/rocksdb/tools/advisor/advisor/rule_parser.py new file mode 100644 index 000000000..169a55363 --- /dev/null +++ b/src/rocksdb/tools/advisor/advisor/rule_parser.py @@ -0,0 +1,510 @@ +# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +# This source code is licensed under both the GPLv2 (found in the +# COPYING file in the root directory) and Apache 2.0 License +# (found in the LICENSE.Apache file in the root directory). + +import re +from abc import ABC, abstractmethod +from enum import Enum + +from advisor.db_log_parser import DataSource, NO_COL_FAMILY +from advisor.db_timeseries_parser import TimeSeriesData +from advisor.ini_parser import IniParser + + +class Section(ABC): + def __init__(self, name): + self.name = name + + @abstractmethod + def set_parameter(self, key, value): + pass + + @abstractmethod + def perform_checks(self): + pass + + +class Rule(Section): + def __init__(self, name): + super().__init__(name) + self.conditions = None + self.suggestions = None + self.overlap_time_seconds = None + self.trigger_entities = None + self.trigger_column_families = None + + def set_parameter(self, key, value): + # If the Rule is associated with a single suggestion/condition, then + # value will be a string and not a list. Hence, convert it to a single + # element list before storing it in self.suggestions or + # self.conditions. + if key == "conditions": + if isinstance(value, str): + self.conditions = [value] + else: + self.conditions = value + elif key == "suggestions": + if isinstance(value, str): + self.suggestions = [value] + else: + self.suggestions = value + elif key == "overlap_time_period": + self.overlap_time_seconds = value + + def get_suggestions(self): + return self.suggestions + + def perform_checks(self): + if not self.conditions or len(self.conditions) < 1: + raise ValueError(self.name + ": rule must have at least one condition") + if not self.suggestions or len(self.suggestions) < 1: + raise ValueError(self.name + ": rule must have at least one suggestion") + if self.overlap_time_seconds: + if len(self.conditions) != 2: + raise ValueError( + self.name + + ": rule must be associated with 2 conditions\ + in order to check for a time dependency between them" + ) + time_format = "^\d+[s|m|h|d]$" # noqa + if not re.match(time_format, self.overlap_time_seconds, re.IGNORECASE): + raise ValueError( + self.name + ": overlap_time_seconds format: \d+[s|m|h|d]" + ) + else: # convert to seconds + in_seconds = int(self.overlap_time_seconds[:-1]) + if self.overlap_time_seconds[-1] == "m": + in_seconds *= 60 + elif self.overlap_time_seconds[-1] == "h": + in_seconds *= 60 * 60 + elif self.overlap_time_seconds[-1] == "d": + in_seconds *= 24 * 60 * 60 + self.overlap_time_seconds = in_seconds + + def get_overlap_timestamps(self, key1_trigger_epochs, key2_trigger_epochs): + # this method takes in 2 timeseries i.e. timestamps at which the + # rule's 2 TIME_SERIES conditions were triggered and it finds + # (if present) the first pair of timestamps at which the 2 conditions + # were triggered within 'overlap_time_seconds' of each other + key1_lower_bounds = [ + epoch - self.overlap_time_seconds for epoch in key1_trigger_epochs + ] + key1_lower_bounds.sort() + key2_trigger_epochs.sort() + trigger_ix = 0 + overlap_pair = None + for key1_lb in key1_lower_bounds: + while key2_trigger_epochs[trigger_ix] < key1_lb and trigger_ix < len( + key2_trigger_epochs + ): + trigger_ix += 1 + if trigger_ix >= len(key2_trigger_epochs): + break + if key2_trigger_epochs[trigger_ix] <= key1_lb + ( + 2 * self.overlap_time_seconds + ): + overlap_pair = ( + key2_trigger_epochs[trigger_ix], + key1_lb + self.overlap_time_seconds, + ) + break + return overlap_pair + + def get_trigger_entities(self): + return self.trigger_entities + + def get_trigger_column_families(self): + return self.trigger_column_families + + def is_triggered(self, conditions_dict, column_families): + if self.overlap_time_seconds: + condition1 = conditions_dict[self.conditions[0]] + condition2 = conditions_dict[self.conditions[1]] + if not ( + condition1.get_data_source() is DataSource.Type.TIME_SERIES + and condition2.get_data_source() is DataSource.Type.TIME_SERIES + ): + raise ValueError(self.name + ": need 2 timeseries conditions") + + map1 = condition1.get_trigger() + map2 = condition2.get_trigger() + if not (map1 and map2): + return False + + self.trigger_entities = {} + is_triggered = False + entity_intersection = set(map1.keys()).intersection(set(map2.keys())) + for entity in entity_intersection: + overlap_timestamps_pair = self.get_overlap_timestamps( + list(map1[entity].keys()), list(map2[entity].keys()) + ) + if overlap_timestamps_pair: + self.trigger_entities[entity] = overlap_timestamps_pair + is_triggered = True + if is_triggered: + self.trigger_column_families = set(column_families) + return is_triggered + else: + all_conditions_triggered = True + self.trigger_column_families = set(column_families) + for cond_name in self.conditions: + cond = conditions_dict[cond_name] + if not cond.get_trigger(): + all_conditions_triggered = False + break + if ( + cond.get_data_source() is DataSource.Type.LOG + or cond.get_data_source() is DataSource.Type.DB_OPTIONS + ): + cond_col_fam = set(cond.get_trigger().keys()) + if NO_COL_FAMILY in cond_col_fam: + cond_col_fam = set(column_families) + self.trigger_column_families = ( + self.trigger_column_families.intersection(cond_col_fam) + ) + elif cond.get_data_source() is DataSource.Type.TIME_SERIES: + cond_entities = set(cond.get_trigger().keys()) + if self.trigger_entities is None: + self.trigger_entities = cond_entities + else: + self.trigger_entities = self.trigger_entities.intersection( + cond_entities + ) + if not (self.trigger_entities or self.trigger_column_families): + all_conditions_triggered = False + break + if not all_conditions_triggered: # clean up if rule not triggered + self.trigger_column_families = None + self.trigger_entities = None + return all_conditions_triggered + + def __repr__(self): + # Append conditions + rule_string = "Rule: " + self.name + " has conditions:: " + is_first = True + for cond in self.conditions: + if is_first: + rule_string += cond + is_first = False + else: + rule_string += " AND " + cond + # Append suggestions + rule_string += "\nsuggestions:: " + is_first = True + for sugg in self.suggestions: + if is_first: + rule_string += sugg + is_first = False + else: + rule_string += ", " + sugg + if self.trigger_entities: + rule_string += ", entities:: " + str(self.trigger_entities) + if self.trigger_column_families: + rule_string += ", col_fam:: " + str(self.trigger_column_families) + # Return constructed string + return rule_string + + +class Suggestion(Section): + class Action(Enum): + set = 1 + increase = 2 + decrease = 3 + + def __init__(self, name): + super().__init__(name) + self.option = None + self.action = None + self.suggested_values = None + self.description = None + + def set_parameter(self, key, value): + if key == "option": + # Note: + # case 1: 'option' is supported by Rocksdb OPTIONS file; in this + # case the option belongs to one of the sections in the config + # file and it's name is prefixed by "<section_type>." + # case 2: 'option' is not supported by Rocksdb OPTIONS file; the + # option is not expected to have the character '.' in its name + self.option = value + elif key == "action": + if self.option and not value: + raise ValueError(self.name + ": provide action for option") + self.action = self.Action[value] + elif key == "suggested_values": + if isinstance(value, str): + self.suggested_values = [value] + else: + self.suggested_values = value + elif key == "description": + self.description = value + + def perform_checks(self): + if not self.description: + if not self.option: + raise ValueError(self.name + ": provide option or description") + if not self.action: + raise ValueError(self.name + ": provide action for option") + if self.action is self.Action.set and not self.suggested_values: + raise ValueError(self.name + ": provide suggested value for option") + + def __repr__(self): + sugg_string = "Suggestion: " + self.name + if self.description: + sugg_string += " description : " + self.description + else: + sugg_string += " option : " + self.option + " action : " + self.action.name + if self.suggested_values: + sugg_string += " suggested_values : " + str(self.suggested_values) + return sugg_string + + +class Condition(Section): + def __init__(self, name): + super().__init__(name) + self.data_source = None + self.trigger = None + + def perform_checks(self): + if not self.data_source: + raise ValueError(self.name + ": condition not tied to data source") + + def set_data_source(self, data_source): + self.data_source = data_source + + def get_data_source(self): + return self.data_source + + def reset_trigger(self): + self.trigger = None + + def set_trigger(self, condition_trigger): + self.trigger = condition_trigger + + def get_trigger(self): + return self.trigger + + def is_triggered(self): + if self.trigger: + return True + return False + + def set_parameter(self, key, value): + # must be defined by the subclass + raise NotImplementedError(self.name + ": provide source for condition") + + +class LogCondition(Condition): + @classmethod + def create(cls, base_condition): + base_condition.set_data_source(DataSource.Type["LOG"]) + base_condition.__class__ = cls + return base_condition + + def set_parameter(self, key, value): + if key == "regex": + self.regex = value + + def perform_checks(self): + super().perform_checks() + if not self.regex: + raise ValueError(self.name + ": provide regex for log condition") + + def __repr__(self): + log_cond_str = "LogCondition: " + self.name + log_cond_str += " regex: " + self.regex + # if self.trigger: + # log_cond_str += (" trigger: " + str(self.trigger)) + return log_cond_str + + +class OptionCondition(Condition): + @classmethod + def create(cls, base_condition): + base_condition.set_data_source(DataSource.Type["DB_OPTIONS"]) + base_condition.__class__ = cls + return base_condition + + def set_parameter(self, key, value): + if key == "options": + if isinstance(value, str): + self.options = [value] + else: + self.options = value + elif key == "evaluate": + self.eval_expr = value + + def perform_checks(self): + super().perform_checks() + if not self.options: + raise ValueError(self.name + ": options missing in condition") + if not self.eval_expr: + raise ValueError(self.name + ": expression missing in condition") + + def __repr__(self): + opt_cond_str = "OptionCondition: " + self.name + opt_cond_str += " options: " + str(self.options) + opt_cond_str += " expression: " + self.eval_expr + if self.trigger: + opt_cond_str += " trigger: " + str(self.trigger) + return opt_cond_str + + +class TimeSeriesCondition(Condition): + @classmethod + def create(cls, base_condition): + base_condition.set_data_source(DataSource.Type["TIME_SERIES"]) + base_condition.__class__ = cls + return base_condition + + def set_parameter(self, key, value): + if key == "keys": + if isinstance(value, str): + self.keys = [value] + else: + self.keys = value + elif key == "behavior": + self.behavior = TimeSeriesData.Behavior[value] + elif key == "rate_threshold": + self.rate_threshold = float(value) + elif key == "window_sec": + self.window_sec = int(value) + elif key == "evaluate": + self.expression = value + elif key == "aggregation_op": + self.aggregation_op = TimeSeriesData.AggregationOperator[value] + + def perform_checks(self): + if not self.keys: + raise ValueError(self.name + ": specify timeseries key") + if not self.behavior: + raise ValueError(self.name + ": specify triggering behavior") + if self.behavior is TimeSeriesData.Behavior.bursty: + if not self.rate_threshold: + raise ValueError(self.name + ": specify rate burst threshold") + if not self.window_sec: + self.window_sec = 300 # default window length is 5 minutes + if len(self.keys) > 1: + raise ValueError(self.name + ": specify only one key") + elif self.behavior is TimeSeriesData.Behavior.evaluate_expression: + if not (self.expression): + raise ValueError(self.name + ": specify evaluation expression") + else: + raise ValueError(self.name + ": trigger behavior not supported") + + def __repr__(self): + ts_cond_str = "TimeSeriesCondition: " + self.name + ts_cond_str += " statistics: " + str(self.keys) + ts_cond_str += " behavior: " + self.behavior.name + if self.behavior is TimeSeriesData.Behavior.bursty: + ts_cond_str += " rate_threshold: " + str(self.rate_threshold) + ts_cond_str += " window_sec: " + str(self.window_sec) + if self.behavior is TimeSeriesData.Behavior.evaluate_expression: + ts_cond_str += " expression: " + self.expression + if hasattr(self, "aggregation_op"): + ts_cond_str += " aggregation_op: " + self.aggregation_op.name + if self.trigger: + ts_cond_str += " trigger: " + str(self.trigger) + return ts_cond_str + + +class RulesSpec: + def __init__(self, rules_path): + self.file_path = rules_path + + def initialise_fields(self): + self.rules_dict = {} + self.conditions_dict = {} + self.suggestions_dict = {} + + def perform_section_checks(self): + for rule in self.rules_dict.values(): + rule.perform_checks() + for cond in self.conditions_dict.values(): + cond.perform_checks() + for sugg in self.suggestions_dict.values(): + sugg.perform_checks() + + def load_rules_from_spec(self): + self.initialise_fields() + with open(self.file_path, "r") as db_rules: + curr_section = None + for line in db_rules: + line = IniParser.remove_trailing_comment(line) + if not line: + continue + element = IniParser.get_element(line) + if element is IniParser.Element.comment: + continue + elif element is not IniParser.Element.key_val: + curr_section = element # it's a new IniParser header + section_name = IniParser.get_section_name(line) + if element is IniParser.Element.rule: + new_rule = Rule(section_name) + self.rules_dict[section_name] = new_rule + elif element is IniParser.Element.cond: + new_cond = Condition(section_name) + self.conditions_dict[section_name] = new_cond + elif element is IniParser.Element.sugg: + new_suggestion = Suggestion(section_name) + self.suggestions_dict[section_name] = new_suggestion + elif element is IniParser.Element.key_val: + key, value = IniParser.get_key_value_pair(line) + if curr_section is IniParser.Element.rule: + new_rule.set_parameter(key, value) + elif curr_section is IniParser.Element.cond: + if key == "source": + if value == "LOG": + new_cond = LogCondition.create(new_cond) + elif value == "OPTIONS": + new_cond = OptionCondition.create(new_cond) + elif value == "TIME_SERIES": + new_cond = TimeSeriesCondition.create(new_cond) + else: + new_cond.set_parameter(key, value) + elif curr_section is IniParser.Element.sugg: + new_suggestion.set_parameter(key, value) + + def get_rules_dict(self): + return self.rules_dict + + def get_conditions_dict(self): + return self.conditions_dict + + def get_suggestions_dict(self): + return self.suggestions_dict + + def get_triggered_rules(self, data_sources, column_families): + self.trigger_conditions(data_sources) + triggered_rules = [] + for rule in self.rules_dict.values(): + if rule.is_triggered(self.conditions_dict, column_families): + triggered_rules.append(rule) + return triggered_rules + + def trigger_conditions(self, data_sources): + for source_type in data_sources: + cond_subset = [ + cond + for cond in self.conditions_dict.values() + if cond.get_data_source() is source_type + ] + if not cond_subset: + continue + for source in data_sources[source_type]: + source.check_and_trigger_conditions(cond_subset) + + def print_rules(self, rules): + for rule in rules: + print("\nRule: " + rule.name) + for cond_name in rule.conditions: + print(repr(self.conditions_dict[cond_name])) + for sugg_name in rule.suggestions: + print(repr(self.suggestions_dict[sugg_name])) + if rule.trigger_entities: + print("scope: entities:") + print(rule.trigger_entities) + if rule.trigger_column_families: + print("scope: col_fam:") + print(rule.trigger_column_families) |