Diffstat (limited to 'monitoring/ceph-mixin/tests_alerts/validate_rules.py')
 -rwxr-xr-x  monitoring/ceph-mixin/tests_alerts/validate_rules.py  571
 1 file changed, 571 insertions, 0 deletions
diff --git a/monitoring/ceph-mixin/tests_alerts/validate_rules.py b/monitoring/ceph-mixin/tests_alerts/validate_rules.py
new file mode 100755
index 000000000..c24ce5c59
--- /dev/null
+++ b/monitoring/ceph-mixin/tests_alerts/validate_rules.py
@@ -0,0 +1,571 @@
+#!/usr/bin/env python3
+#
+# Check the Prometheus rules for format and for integration
+# with the unit tests. This script has the following exit
+# codes:
+# 0 .. Everything worked
+# 4 .. rule problems or missing unit tests
+# 8 .. Missing fields in YAML
+# 12 .. Invalid YAML - unable to load
+# 16 .. Missing input files
+#
+# Externals
+# snmptranslate .. used to determine the OIDs in the MIB, to verify that the rule -> MIB mapping is correct
+#
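+# Example invocation (illustrative; assumes this directory is the CWD):
+#   ./validate_rules.py; rc=$?
+#   # rc=0 -> clean, rc=4 -> fix rules or add unit tests, rc>=8 -> input problems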
+
+import re
+import os
+import sys
+import yaml
+import shutil
+import string
+from bs4 import BeautifulSoup
+from typing import List, Any, Dict, Set, Optional, Tuple
+import subprocess
+
+import urllib.request
+import urllib.error
+from urllib.parse import urlparse
+
+from settings import ALERTS_FILE, MIB_FILE, UNIT_TESTS_FILE
+
+DOCLINK_NAME = 'documentation'
+
+
+def isascii(s: str) -> bool:
+ try:
+ s.encode('ascii')
+ except UnicodeEncodeError:
+ return False
+ return True
+
+
+def read_file(file_name: str) -> Tuple[str, str]:
+ try:
+ with open(file_name, 'r') as input_file:
+ raw_data = input_file.read()
+ except OSError:
+ return '', f"Unable to open {file_name}"
+
+ return raw_data, ''
+
+
+def load_yaml(file_name: str) -> Tuple[Dict[str, Any], str]:
+ data = {}
+ errs = ''
+
+ raw_data, err = read_file(file_name)
+    if not err:
+        try:
+            data = yaml.safe_load(raw_data)
+        except yaml.YAMLError:
+            errs = f"filename '{file_name}' is not a valid YAML file"
+
+ return data, errs
+
+
+def run_command(command: str) -> Tuple[int, List[str], List[str]]:
+ c = command.split()
+ completion = subprocess.run(c, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ return (completion.returncode,
+ completion.stdout.decode('utf-8').split('\n'),
+ completion.stderr.decode('utf-8').split('\n'))
+
+
+class HTMLCache:
+ def __init__(self) -> None:
+ self.cache: Dict[str, Tuple[int, str]] = {}
+
+    def fetch(self, url_str: str) -> Tuple[int, str]:
+ parsed = urlparse(url_str)
+ url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
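+        # note: the cache key drops the query string and fragment, so links to
+        # different anchors on the same page re-use a single fetch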
+
+ if url in self.cache:
+ return self.cache[url]
+
+ req = urllib.request.Request(url)
+ try:
+ r = urllib.request.urlopen(req)
+ except urllib.error.HTTPError as e:
+ self.cache[url] = e.code, e.reason
+ return self.cache[url]
+ except urllib.error.URLError as e:
+ self.cache[url] = 400, e.reason
+ return self.cache[url]
+
+ if r.status == 200:
+ html = r.read().decode('utf-8')
+ self.cache[url] = 200, html
+ return self.cache[url]
+
+ self.cache[url] = r.status, r.reason
+ return r.status, r.reason
+
+ @property
+ def cached_pages(self) -> List[str]:
+        return list(self.cache.keys())
+
+ @property
+ def cached_pages_total(self) -> int:
+        return len(self.cache)
+
+
+class PrometheusRule:
+ expected_attrs = [
+ 'alert',
+ 'expr',
+ 'labels',
+ 'annotations'
+ ]
+
+ def __init__(self, rule_group, rule_data: Dict[str, Any]):
+
+ assert 'alert' in rule_data
+ self.group: RuleGroup = rule_group
+ self.name = rule_data.get('alert')
+ self.rule = rule_data
+ self.errors: List[str] = []
+ self.warnings: List[str] = []
+ self.validate()
+
+ @property
+    def has_oid(self) -> bool:
+        return bool(self.rule.get('labels', {}).get('oid', ''))
+
+ @property
+ def labels(self) -> Dict[str, str]:
+ return self.rule.get('labels', {})
+
+ @property
+ def annotations(self) -> Dict[str, str]:
+ return self.rule.get('annotations', {})
+
+ def _check_alert_name(self):
+ # this is simplistic, but works in the context of the alert name
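+        # e.g. "CephMgrModuleCrash" passes, while "ceph_mgr_module_crash" or
+        # "CEPHMGRMODULECRASH" would trigger the warning below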
+ if self.name[0] in string.ascii_uppercase and \
+ self.name != self.name.lower() and \
+ self.name != self.name.upper() and \
+ " " not in self.name and \
+ "_" not in self.name:
+ return
+
+ self.warnings.append("Alert name is not in CamelCase format")
+
+ def _check_structure(self):
+ rule_attrs = self.rule.keys()
+ missing_attrs = [a for a in PrometheusRule.expected_attrs if a not in rule_attrs]
+
+ if missing_attrs:
+ self.errors.append(
+ f"invalid alert structure. Missing field{'s' if len(missing_attrs) > 1 else ''}"
+ f": {','.join(missing_attrs)}")
+
+ def _check_labels(self):
+ for rqd in ['severity', 'type']:
+ if rqd not in self.labels.keys():
+ self.errors.append(f"rule is missing {rqd} label definition")
+
+ def _check_annotations(self):
+ for rqd in ['summary', 'description']:
+ if rqd not in self.annotations:
+ self.errors.append(f"rule is missing {rqd} annotation definition")
+
+ def _check_doclink(self):
+ doclink = self.annotations.get(DOCLINK_NAME, '')
+
+ if doclink:
+ url = urlparse(doclink)
+ status, content = self.group.fetch_html_page(doclink)
+ if status == 200:
+ if url.fragment:
+ soup = BeautifulSoup(content, 'html.parser')
+ if not soup.find(id=url.fragment):
+ self.errors.append(f"documentation link error: {url.fragment} anchor not found on the page")
+ else:
+ # catch all
+ self.errors.append(f"documentation link error: {status} {content}")
+
+ def _check_snmp(self):
+ oid = self.labels.get('oid', '')
+
+ if self.labels.get('severity', '') == 'critical' and not oid:
+ self.warnings.append("critical level alert is missing an SNMP oid entry")
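+        # a conforming OID looks like 1.3.6.1.4.1.50495.1.2.1.4.1 (illustrative value)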
+        if oid and not re.search(r'^1\.3\.6\.1\.4\.1\.50495\.1\.2\.\d+\.\d+\.\d+$', oid):
+ self.errors.append("invalid OID format provided")
+ if self.group.get_oids():
+ if oid and oid not in self.group.get_oids():
+                self.errors.append(f"rule defines an OID {oid} that is missing from the MIB file ({os.path.basename(MIB_FILE)})")
+
+ def _check_ascii(self):
+ if 'oid' not in self.labels:
+ return
+
+ desc = self.annotations.get('description', '')
+ summary = self.annotations.get('summary', '')
+        if not isascii(desc):
+            self.errors.append("non-ascii characters found in 'description' field will cause issues in the associated snmp trap.")
+        if not isascii(summary):
+            self.errors.append("non-ascii characters found in 'summary' field will cause issues in the associated snmp trap.")
+
+ def validate(self):
+
+ self._check_alert_name()
+ self._check_structure()
+ self._check_labels()
+ self._check_annotations()
+ self._check_doclink()
+ self._check_snmp()
+ self._check_ascii()
+ char = '.'
+
+ if self.errors:
+ char = 'E'
+ self.group.update('error', self.name)
+ elif self.warnings:
+ char = 'W'
+ self.group.update('warning', self.name)
+
+ sys.stdout.write(char)
+
+
+class RuleGroup:
+
+ def __init__(self, rule_file, group_name: str, group_name_width: int):
+ self.rule_file: RuleFile = rule_file
+ self.group_name = group_name
+ self.rules: Dict[str, PrometheusRule] = {}
+ self.problems = {
+ "error": [],
+ "warning": [],
+ }
+
+ sys.stdout.write(f"\n\t{group_name:<{group_name_width}} : ")
+
+    def add_rule(self, rule_data: Dict[str, Any]):
+ alert_name = rule_data.get('alert')
+ self.rules[alert_name] = PrometheusRule(self, rule_data)
+
+    def update(self, problem_type: str, alert_name: str):
+ assert problem_type in ['error', 'warning']
+
+ self.problems[problem_type].append(alert_name)
+ self.rule_file.update(self.group_name)
+
+ def fetch_html_page(self, url):
+ return self.rule_file.fetch_html_page(url)
+
+ def get_oids(self):
+ return self.rule_file.oid_list
+
+ @property
+ def error_count(self):
+ return len(self.problems['error'])
+
+    @property
+    def warning_count(self):
+        return len(self.problems['warning'])
+
+ @property
+ def count(self):
+ return len(self.rules)
+
+
+class RuleFile:
+
+ def __init__(self, parent, file_name, rules, oid_list):
+ self.parent = parent
+ self.file_name = file_name
+ self.rules: Dict[str, Any] = rules
+ self.oid_list = oid_list
+ self.problems: Set[str] = set()
+ self.group: Dict[str, RuleGroup] = {}
+ self.alert_names_seen: Set[str] = set()
+        self.duplicate_alert_names: List[str] = []
+ self.html_cache = HTMLCache()
+
+ assert 'groups' in self.rules
+ self.max_group_name_width = self.get_max_group_name()
+ self.load_groups()
+
+ def update(self, group_name):
+ self.problems.add(group_name)
+ self.parent.mark_invalid()
+
+ def fetch_html_page(self, url):
+ return self.html_cache.fetch(url)
+
+ @property
+ def group_count(self):
+ return len(self.rules['groups'])
+
+ @property
+ def rule_count(self):
+ rule_count = 0
+ for _group_name, rule_group in self.group.items():
+ rule_count += rule_group.count
+ return rule_count
+
+ @property
+ def oid_count(self):
+ oid_count = 0
+ for _group_name, rule_group in self.group.items():
+ for _rule_name, rule in rule_group.rules.items():
+ if rule.has_oid:
+ oid_count += 1
+ return oid_count
+
+ @property
+ def group_names(self):
+ return self.group.keys()
+
+ @property
+ def problem_count(self):
+ return len(self.problems)
+
+ def get_max_group_name(self):
+ group_name_list = []
+ for group in self.rules.get('groups'):
+ group_name_list.append(group['name'])
+ return max([len(g) for g in group_name_list])
+
+ def load_groups(self):
+ sys.stdout.write("\nChecking rule groups")
+ for group in self.rules.get('groups'):
+ group_name = group['name']
+ rules = group['rules']
+ self.group[group_name] = RuleGroup(self, group_name, self.max_group_name_width)
+ for rule_data in rules:
+ if 'alert' in rule_data:
+ alert_name = rule_data.get('alert')
+ if alert_name in self.alert_names_seen:
+ self.duplicate_alert_names.append(alert_name)
+ else:
+ self.alert_names_seen.add(alert_name)
+ self.group[group_name].add_rule(rule_data)
+ else:
+ # skipped recording rule
+ pass
+
+ def report(self):
+ def max_width(item_list: Set[str], min_width: int = 0) -> int:
+ return max([len(i) for i in item_list] + [min_width])
+
+ if not self.problems and not self.duplicate_alert_names:
+ print("\nNo problems detected in the rule file")
+ return
+
+ print("\nProblem Report\n")
+
+ group_width = max_width(self.problems, 5)
+ alert_names = set()
+ for g in self.problems:
+ group = self.group[g]
+ alert_names.update(group.problems.get('error', []))
+ alert_names.update(group.problems.get('warning', []))
+ alert_width = max_width(alert_names, 10)
+
+ template = " {group:<{group_width}} {severity:<8} {alert_name:<{alert_width}} {description}"
+
+ print(template.format(
+ group="Group",
+ group_width=group_width,
+ severity="Severity",
+ alert_name="Alert Name",
+ alert_width=alert_width,
+ description="Problem Description"))
+
+ print(template.format(
+ group="-----",
+ group_width=group_width,
+ severity="--------",
+ alert_name="----------",
+ alert_width=alert_width,
+ description="-------------------"))
+
+ for group_name in sorted(self.problems):
+ group = self.group[group_name]
+ rules = group.rules
+ for alert_name in group.problems.get('error', []):
+ for desc in rules[alert_name].errors:
+ print(template.format(
+ group=group_name,
+ group_width=group_width,
+ severity="Error",
+ alert_name=alert_name,
+ alert_width=alert_width,
+ description=desc))
+ for alert_name in group.problems.get('warning', []):
+ for desc in rules[alert_name].warnings:
+ print(template.format(
+ group=group_name,
+ group_width=group_width,
+ severity="Warning",
+ alert_name=alert_name,
+ alert_width=alert_width,
+ description=desc))
+ if self.duplicate_alert_names:
+ print("Duplicate alert names detected:")
+ for a in self.duplicate_alert_names:
+ print(f" - {a}")
+
+
+class UnitTests:
+ expected_attrs = [
+ 'rule_files',
+ 'tests',
+ 'evaluation_interval'
+ ]
+ def __init__(self, filename):
+ self.filename = filename
+ self.unit_test_data: Dict[str, Any] = {}
+ self.alert_names_seen: Set[str] = set()
+ self.problems: List[str] = []
+ self.load()
+
+ def load(self):
+ self.unit_test_data, errs = load_yaml(self.filename)
+ if errs:
+ print(f"\n\nError in unit tests file: {errs}")
+ sys.exit(12)
+
+ missing_attr = [a for a in UnitTests.expected_attrs if a not in self.unit_test_data.keys()]
+ if missing_attr:
+ print(f"\nMissing attributes in unit tests: {','.join(missing_attr)}")
+ sys.exit(8)
+
+    def _check_alert_names(self, alert_names: Set[str]):
+ alerts_tested: Set[str] = set()
+ for t in self.unit_test_data.get('tests'):
+ test_cases = t.get('alert_rule_test', [])
+ if not test_cases:
+ continue
+ for case in test_cases:
+ alertname = case.get('alertname', '')
+ if alertname:
+ alerts_tested.add(alertname)
+
+ alerts_defined = set(alert_names)
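+        # report any alert defined in the rules file that no alert_rule_test case exercises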
+ self.problems = list(alerts_defined.difference(alerts_tested))
+
+    def process(self, defined_alert_names: Set[str]):
+ self._check_alert_names(defined_alert_names)
+
+ def report(self) -> None:
+
+ if not self.problems:
+ print("\nNo problems detected in unit tests file")
+ return
+
+ print("\nUnit tests are incomplete. Tests missing for the following alerts;")
+ for p in self.problems:
+ print(f" - {p}")
+
+
+class RuleChecker:
+
+    def __init__(self, rules_filename: Optional[str] = None, test_filename: Optional[str] = None):
+ self.rules_filename = rules_filename or ALERTS_FILE
+ self.test_filename = test_filename or UNIT_TESTS_FILE
+ self.rule_file: Optional[RuleFile] = None
+ self.unit_tests: Optional[UnitTests] = None
+ self.rule_file_problems: bool = False
+ self.errors = {}
+ self.warnings = {}
+ self.error_count = 0
+ self.warning_count = 0
+ self.oid_count = 0
+
+ self.oid_list = self.build_oid_list()
+
+ def build_oid_list(self) -> List[str]:
+
+ cmd = shutil.which('snmptranslate')
+ if not cmd:
+ return []
+
+ rc, stdout, stderr = run_command(f"{cmd} -Pu -Tz -M ../../snmp:/usr/share/snmp/mibs -m CEPH-MIB")
+ if rc != 0:
+ return []
+
+ oid_list: List[str] = []
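+        # each 'snmptranslate -Tz' output line looks like '"label"<TAB>"1.3.6.1..."',
+        # so strip the quotes, turn the tab into a space and split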
+ for line in stdout[:-1]:
+ _label, oid = line.replace('"', '').replace('\t', ' ').split()
+ oid_list.append(oid)
+
+ return oid_list
+
+ @property
+ def status(self):
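+        # mirrors the exit codes documented in the file header: 4 for rule or
+        # unit test problems, 0 for success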
+ if self.rule_file_problems or self.unit_tests.problems:
+ return 4
+
+ return 0
+
+ def mark_invalid(self):
+ self.rule_file_problems = True
+
+ def summarise_rule_file(self):
+ for group_name in self.rule_file.problems:
+ group = self.rule_file.group[group_name]
+ self.error_count += len(group.problems['error'])
+ self.warning_count += len(group.problems['warning'])
+
+ def ready(self):
+ errs: List[str] = []
+ ready_state = True
+ if not os.path.exists(self.rules_filename):
+ errs.append(f"rule file '{self.rules_filename}' not found")
+ ready_state = False
+
+ if not os.path.exists(self.test_filename):
+ errs.append(f"test file '{self.test_filename}' not found")
+ ready_state = False
+
+ return ready_state, errs
+
+ def run(self):
+
+ ready, errs = self.ready()
+ if not ready:
+ print("Unable to start:")
+ for e in errs:
+ print(f"- {e}")
+ sys.exit(16)
+
+ rules, errs = load_yaml(self.rules_filename)
+ if errs:
+ print(errs)
+ sys.exit(12)
+
+ self.rule_file = RuleFile(self, self.rules_filename, rules, self.oid_list)
+ self.summarise_rule_file()
+
+ self.unit_tests = UnitTests(self.test_filename)
+ self.unit_tests.process(self.rule_file.alert_names_seen)
+
+ def report(self):
+ print("\n\nSummary\n")
+ print(f"Rule file : {self.rules_filename}")
+ print(f"Unit Test file : {self.test_filename}")
+ print(f"\nRule groups processed : {self.rule_file.group_count:>3}")
+ print(f"Rules processed : {self.rule_file.rule_count:>3}")
+ print(f"SNMP OIDs declared : {self.rule_file.oid_count:>3} {'(snmptranslate missing, unable to cross check)' if not self.oid_list else ''}")
+ print(f"Rule errors : {self.error_count:>3}")
+ print(f"Rule warnings : {self.warning_count:>3}")
+ print(f"Rule name duplicates : {len(self.rule_file.duplicate_alert_names):>3}")
+ print(f"Unit tests missing : {len(self.unit_tests.problems):>3}")
+
+ self.rule_file.report()
+ self.unit_tests.report()
+
+
+def main():
+ checker = RuleChecker()
+
+ checker.run()
+ checker.report()
+ print()
+
+ sys.exit(checker.status)
+
+
+if __name__ == '__main__':
+ main()