summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/tools/advisor/advisor/rule_parser.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/rocksdb/tools/advisor/advisor/rule_parser.py510
1 files changed, 510 insertions, 0 deletions
diff --git a/src/rocksdb/tools/advisor/advisor/rule_parser.py b/src/rocksdb/tools/advisor/advisor/rule_parser.py
new file mode 100644
index 000000000..169a55363
--- /dev/null
+++ b/src/rocksdb/tools/advisor/advisor/rule_parser.py
@@ -0,0 +1,510 @@
+# Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+# This source code is licensed under both the GPLv2 (found in the
+# COPYING file in the root directory) and Apache 2.0 License
+# (found in the LICENSE.Apache file in the root directory).
+
+import re
+from abc import ABC, abstractmethod
+from enum import Enum
+
+from advisor.db_log_parser import DataSource, NO_COL_FAMILY
+from advisor.db_timeseries_parser import TimeSeriesData
+from advisor.ini_parser import IniParser
+
+
+class Section(ABC):
+ def __init__(self, name):
+ self.name = name
+
+ @abstractmethod
+ def set_parameter(self, key, value):
+ pass
+
+ @abstractmethod
+ def perform_checks(self):
+ pass
+
+
+class Rule(Section):
+ def __init__(self, name):
+ super().__init__(name)
+ self.conditions = None
+ self.suggestions = None
+ self.overlap_time_seconds = None
+ self.trigger_entities = None
+ self.trigger_column_families = None
+
+ def set_parameter(self, key, value):
+ # If the Rule is associated with a single suggestion/condition, then
+ # value will be a string and not a list. Hence, convert it to a single
+ # element list before storing it in self.suggestions or
+ # self.conditions.
+ if key == "conditions":
+ if isinstance(value, str):
+ self.conditions = [value]
+ else:
+ self.conditions = value
+ elif key == "suggestions":
+ if isinstance(value, str):
+ self.suggestions = [value]
+ else:
+ self.suggestions = value
+ elif key == "overlap_time_period":
+ self.overlap_time_seconds = value
+
+ def get_suggestions(self):
+ return self.suggestions
+
+ def perform_checks(self):
+ if not self.conditions or len(self.conditions) < 1:
+ raise ValueError(self.name + ": rule must have at least one condition")
+ if not self.suggestions or len(self.suggestions) < 1:
+ raise ValueError(self.name + ": rule must have at least one suggestion")
+ if self.overlap_time_seconds:
+ if len(self.conditions) != 2:
+ raise ValueError(
+ self.name
+ + ": rule must be associated with 2 conditions\
+ in order to check for a time dependency between them"
+ )
+ time_format = "^\d+[s|m|h|d]$" # noqa
+ if not re.match(time_format, self.overlap_time_seconds, re.IGNORECASE):
+ raise ValueError(
+ self.name + ": overlap_time_seconds format: \d+[s|m|h|d]"
+ )
+ else: # convert to seconds
+ in_seconds = int(self.overlap_time_seconds[:-1])
+ if self.overlap_time_seconds[-1] == "m":
+ in_seconds *= 60
+ elif self.overlap_time_seconds[-1] == "h":
+ in_seconds *= 60 * 60
+ elif self.overlap_time_seconds[-1] == "d":
+ in_seconds *= 24 * 60 * 60
+ self.overlap_time_seconds = in_seconds
+
+ def get_overlap_timestamps(self, key1_trigger_epochs, key2_trigger_epochs):
+ # this method takes in 2 timeseries i.e. timestamps at which the
+ # rule's 2 TIME_SERIES conditions were triggered and it finds
+ # (if present) the first pair of timestamps at which the 2 conditions
+ # were triggered within 'overlap_time_seconds' of each other
+ key1_lower_bounds = [
+ epoch - self.overlap_time_seconds for epoch in key1_trigger_epochs
+ ]
+ key1_lower_bounds.sort()
+ key2_trigger_epochs.sort()
+ trigger_ix = 0
+ overlap_pair = None
+ for key1_lb in key1_lower_bounds:
+ while key2_trigger_epochs[trigger_ix] < key1_lb and trigger_ix < len(
+ key2_trigger_epochs
+ ):
+ trigger_ix += 1
+ if trigger_ix >= len(key2_trigger_epochs):
+ break
+ if key2_trigger_epochs[trigger_ix] <= key1_lb + (
+ 2 * self.overlap_time_seconds
+ ):
+ overlap_pair = (
+ key2_trigger_epochs[trigger_ix],
+ key1_lb + self.overlap_time_seconds,
+ )
+ break
+ return overlap_pair
+
+ def get_trigger_entities(self):
+ return self.trigger_entities
+
+ def get_trigger_column_families(self):
+ return self.trigger_column_families
+
+ def is_triggered(self, conditions_dict, column_families):
+ if self.overlap_time_seconds:
+ condition1 = conditions_dict[self.conditions[0]]
+ condition2 = conditions_dict[self.conditions[1]]
+ if not (
+ condition1.get_data_source() is DataSource.Type.TIME_SERIES
+ and condition2.get_data_source() is DataSource.Type.TIME_SERIES
+ ):
+ raise ValueError(self.name + ": need 2 timeseries conditions")
+
+ map1 = condition1.get_trigger()
+ map2 = condition2.get_trigger()
+ if not (map1 and map2):
+ return False
+
+ self.trigger_entities = {}
+ is_triggered = False
+ entity_intersection = set(map1.keys()).intersection(set(map2.keys()))
+ for entity in entity_intersection:
+ overlap_timestamps_pair = self.get_overlap_timestamps(
+ list(map1[entity].keys()), list(map2[entity].keys())
+ )
+ if overlap_timestamps_pair:
+ self.trigger_entities[entity] = overlap_timestamps_pair
+ is_triggered = True
+ if is_triggered:
+ self.trigger_column_families = set(column_families)
+ return is_triggered
+ else:
+ all_conditions_triggered = True
+ self.trigger_column_families = set(column_families)
+ for cond_name in self.conditions:
+ cond = conditions_dict[cond_name]
+ if not cond.get_trigger():
+ all_conditions_triggered = False
+ break
+ if (
+ cond.get_data_source() is DataSource.Type.LOG
+ or cond.get_data_source() is DataSource.Type.DB_OPTIONS
+ ):
+ cond_col_fam = set(cond.get_trigger().keys())
+ if NO_COL_FAMILY in cond_col_fam:
+ cond_col_fam = set(column_families)
+ self.trigger_column_families = (
+ self.trigger_column_families.intersection(cond_col_fam)
+ )
+ elif cond.get_data_source() is DataSource.Type.TIME_SERIES:
+ cond_entities = set(cond.get_trigger().keys())
+ if self.trigger_entities is None:
+ self.trigger_entities = cond_entities
+ else:
+ self.trigger_entities = self.trigger_entities.intersection(
+ cond_entities
+ )
+ if not (self.trigger_entities or self.trigger_column_families):
+ all_conditions_triggered = False
+ break
+ if not all_conditions_triggered: # clean up if rule not triggered
+ self.trigger_column_families = None
+ self.trigger_entities = None
+ return all_conditions_triggered
+
+ def __repr__(self):
+ # Append conditions
+ rule_string = "Rule: " + self.name + " has conditions:: "
+ is_first = True
+ for cond in self.conditions:
+ if is_first:
+ rule_string += cond
+ is_first = False
+ else:
+ rule_string += " AND " + cond
+ # Append suggestions
+ rule_string += "\nsuggestions:: "
+ is_first = True
+ for sugg in self.suggestions:
+ if is_first:
+ rule_string += sugg
+ is_first = False
+ else:
+ rule_string += ", " + sugg
+ if self.trigger_entities:
+ rule_string += ", entities:: " + str(self.trigger_entities)
+ if self.trigger_column_families:
+ rule_string += ", col_fam:: " + str(self.trigger_column_families)
+ # Return constructed string
+ return rule_string
+
+
+class Suggestion(Section):
+ class Action(Enum):
+ set = 1
+ increase = 2
+ decrease = 3
+
+ def __init__(self, name):
+ super().__init__(name)
+ self.option = None
+ self.action = None
+ self.suggested_values = None
+ self.description = None
+
+ def set_parameter(self, key, value):
+ if key == "option":
+ # Note:
+ # case 1: 'option' is supported by Rocksdb OPTIONS file; in this
+ # case the option belongs to one of the sections in the config
+ # file and it's name is prefixed by "<section_type>."
+ # case 2: 'option' is not supported by Rocksdb OPTIONS file; the
+ # option is not expected to have the character '.' in its name
+ self.option = value
+ elif key == "action":
+ if self.option and not value:
+ raise ValueError(self.name + ": provide action for option")
+ self.action = self.Action[value]
+ elif key == "suggested_values":
+ if isinstance(value, str):
+ self.suggested_values = [value]
+ else:
+ self.suggested_values = value
+ elif key == "description":
+ self.description = value
+
+ def perform_checks(self):
+ if not self.description:
+ if not self.option:
+ raise ValueError(self.name + ": provide option or description")
+ if not self.action:
+ raise ValueError(self.name + ": provide action for option")
+ if self.action is self.Action.set and not self.suggested_values:
+ raise ValueError(self.name + ": provide suggested value for option")
+
+ def __repr__(self):
+ sugg_string = "Suggestion: " + self.name
+ if self.description:
+ sugg_string += " description : " + self.description
+ else:
+ sugg_string += " option : " + self.option + " action : " + self.action.name
+ if self.suggested_values:
+ sugg_string += " suggested_values : " + str(self.suggested_values)
+ return sugg_string
+
+
+class Condition(Section):
+ def __init__(self, name):
+ super().__init__(name)
+ self.data_source = None
+ self.trigger = None
+
+ def perform_checks(self):
+ if not self.data_source:
+ raise ValueError(self.name + ": condition not tied to data source")
+
+ def set_data_source(self, data_source):
+ self.data_source = data_source
+
+ def get_data_source(self):
+ return self.data_source
+
+ def reset_trigger(self):
+ self.trigger = None
+
+ def set_trigger(self, condition_trigger):
+ self.trigger = condition_trigger
+
+ def get_trigger(self):
+ return self.trigger
+
+ def is_triggered(self):
+ if self.trigger:
+ return True
+ return False
+
+ def set_parameter(self, key, value):
+ # must be defined by the subclass
+ raise NotImplementedError(self.name + ": provide source for condition")
+
+
+class LogCondition(Condition):
+ @classmethod
+ def create(cls, base_condition):
+ base_condition.set_data_source(DataSource.Type["LOG"])
+ base_condition.__class__ = cls
+ return base_condition
+
+ def set_parameter(self, key, value):
+ if key == "regex":
+ self.regex = value
+
+ def perform_checks(self):
+ super().perform_checks()
+ if not self.regex:
+ raise ValueError(self.name + ": provide regex for log condition")
+
+ def __repr__(self):
+ log_cond_str = "LogCondition: " + self.name
+ log_cond_str += " regex: " + self.regex
+ # if self.trigger:
+ # log_cond_str += (" trigger: " + str(self.trigger))
+ return log_cond_str
+
+
+class OptionCondition(Condition):
+ @classmethod
+ def create(cls, base_condition):
+ base_condition.set_data_source(DataSource.Type["DB_OPTIONS"])
+ base_condition.__class__ = cls
+ return base_condition
+
+ def set_parameter(self, key, value):
+ if key == "options":
+ if isinstance(value, str):
+ self.options = [value]
+ else:
+ self.options = value
+ elif key == "evaluate":
+ self.eval_expr = value
+
+ def perform_checks(self):
+ super().perform_checks()
+ if not self.options:
+ raise ValueError(self.name + ": options missing in condition")
+ if not self.eval_expr:
+ raise ValueError(self.name + ": expression missing in condition")
+
+ def __repr__(self):
+ opt_cond_str = "OptionCondition: " + self.name
+ opt_cond_str += " options: " + str(self.options)
+ opt_cond_str += " expression: " + self.eval_expr
+ if self.trigger:
+ opt_cond_str += " trigger: " + str(self.trigger)
+ return opt_cond_str
+
+
+class TimeSeriesCondition(Condition):
+ @classmethod
+ def create(cls, base_condition):
+ base_condition.set_data_source(DataSource.Type["TIME_SERIES"])
+ base_condition.__class__ = cls
+ return base_condition
+
+ def set_parameter(self, key, value):
+ if key == "keys":
+ if isinstance(value, str):
+ self.keys = [value]
+ else:
+ self.keys = value
+ elif key == "behavior":
+ self.behavior = TimeSeriesData.Behavior[value]
+ elif key == "rate_threshold":
+ self.rate_threshold = float(value)
+ elif key == "window_sec":
+ self.window_sec = int(value)
+ elif key == "evaluate":
+ self.expression = value
+ elif key == "aggregation_op":
+ self.aggregation_op = TimeSeriesData.AggregationOperator[value]
+
+ def perform_checks(self):
+ if not self.keys:
+ raise ValueError(self.name + ": specify timeseries key")
+ if not self.behavior:
+ raise ValueError(self.name + ": specify triggering behavior")
+ if self.behavior is TimeSeriesData.Behavior.bursty:
+ if not self.rate_threshold:
+ raise ValueError(self.name + ": specify rate burst threshold")
+ if not self.window_sec:
+ self.window_sec = 300 # default window length is 5 minutes
+ if len(self.keys) > 1:
+ raise ValueError(self.name + ": specify only one key")
+ elif self.behavior is TimeSeriesData.Behavior.evaluate_expression:
+ if not (self.expression):
+ raise ValueError(self.name + ": specify evaluation expression")
+ else:
+ raise ValueError(self.name + ": trigger behavior not supported")
+
+ def __repr__(self):
+ ts_cond_str = "TimeSeriesCondition: " + self.name
+ ts_cond_str += " statistics: " + str(self.keys)
+ ts_cond_str += " behavior: " + self.behavior.name
+ if self.behavior is TimeSeriesData.Behavior.bursty:
+ ts_cond_str += " rate_threshold: " + str(self.rate_threshold)
+ ts_cond_str += " window_sec: " + str(self.window_sec)
+ if self.behavior is TimeSeriesData.Behavior.evaluate_expression:
+ ts_cond_str += " expression: " + self.expression
+ if hasattr(self, "aggregation_op"):
+ ts_cond_str += " aggregation_op: " + self.aggregation_op.name
+ if self.trigger:
+ ts_cond_str += " trigger: " + str(self.trigger)
+ return ts_cond_str
+
+
+class RulesSpec:
+ def __init__(self, rules_path):
+ self.file_path = rules_path
+
+ def initialise_fields(self):
+ self.rules_dict = {}
+ self.conditions_dict = {}
+ self.suggestions_dict = {}
+
+ def perform_section_checks(self):
+ for rule in self.rules_dict.values():
+ rule.perform_checks()
+ for cond in self.conditions_dict.values():
+ cond.perform_checks()
+ for sugg in self.suggestions_dict.values():
+ sugg.perform_checks()
+
+ def load_rules_from_spec(self):
+ self.initialise_fields()
+ with open(self.file_path, "r") as db_rules:
+ curr_section = None
+ for line in db_rules:
+ line = IniParser.remove_trailing_comment(line)
+ if not line:
+ continue
+ element = IniParser.get_element(line)
+ if element is IniParser.Element.comment:
+ continue
+ elif element is not IniParser.Element.key_val:
+ curr_section = element # it's a new IniParser header
+ section_name = IniParser.get_section_name(line)
+ if element is IniParser.Element.rule:
+ new_rule = Rule(section_name)
+ self.rules_dict[section_name] = new_rule
+ elif element is IniParser.Element.cond:
+ new_cond = Condition(section_name)
+ self.conditions_dict[section_name] = new_cond
+ elif element is IniParser.Element.sugg:
+ new_suggestion = Suggestion(section_name)
+ self.suggestions_dict[section_name] = new_suggestion
+ elif element is IniParser.Element.key_val:
+ key, value = IniParser.get_key_value_pair(line)
+ if curr_section is IniParser.Element.rule:
+ new_rule.set_parameter(key, value)
+ elif curr_section is IniParser.Element.cond:
+ if key == "source":
+ if value == "LOG":
+ new_cond = LogCondition.create(new_cond)
+ elif value == "OPTIONS":
+ new_cond = OptionCondition.create(new_cond)
+ elif value == "TIME_SERIES":
+ new_cond = TimeSeriesCondition.create(new_cond)
+ else:
+ new_cond.set_parameter(key, value)
+ elif curr_section is IniParser.Element.sugg:
+ new_suggestion.set_parameter(key, value)
+
+ def get_rules_dict(self):
+ return self.rules_dict
+
+ def get_conditions_dict(self):
+ return self.conditions_dict
+
+ def get_suggestions_dict(self):
+ return self.suggestions_dict
+
+ def get_triggered_rules(self, data_sources, column_families):
+ self.trigger_conditions(data_sources)
+ triggered_rules = []
+ for rule in self.rules_dict.values():
+ if rule.is_triggered(self.conditions_dict, column_families):
+ triggered_rules.append(rule)
+ return triggered_rules
+
+ def trigger_conditions(self, data_sources):
+ for source_type in data_sources:
+ cond_subset = [
+ cond
+ for cond in self.conditions_dict.values()
+ if cond.get_data_source() is source_type
+ ]
+ if not cond_subset:
+ continue
+ for source in data_sources[source_type]:
+ source.check_and_trigger_conditions(cond_subset)
+
+ def print_rules(self, rules):
+ for rule in rules:
+ print("\nRule: " + rule.name)
+ for cond_name in rule.conditions:
+ print(repr(self.conditions_dict[cond_name]))
+ for sugg_name in rule.suggestions:
+ print(repr(self.suggestions_dict[sugg_name]))
+ if rule.trigger_entities:
+ print("scope: entities:")
+ print(rule.trigger_entities)
+ if rule.trigger_column_families:
+ print("scope: col_fam:")
+ print(rule.trigger_column_families)