diff options
Diffstat (limited to 'suricata/update/matchers.py')
-rw-r--r-- | suricata/update/matchers.py | 331 |
1 files changed, 331 insertions, 0 deletions
diff --git a/suricata/update/matchers.py b/suricata/update/matchers.py new file mode 100644 index 0000000..56a9e29 --- /dev/null +++ b/suricata/update/matchers.py @@ -0,0 +1,331 @@ +# Copyright (C) 2017 Open Information Security Foundation +# +# You can copy, redistribute or modify this Program under the terms of +# the GNU General Public License version 2 as published by the Free +# Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# version 2 along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA. + +# This module contains functions for matching rules for disabling, +# enabling, converting to drop or modification. + +import re +import os.path +import logging +import shlex +import fnmatch +import suricata.update.rule + + +logger = logging.getLogger() + + +class AllRuleMatcher(object): + """Matcher object to match all rules. """ + + def match(self, rule): + return True + + @classmethod + def parse(cls, buf): + if buf.strip() == "*": + return cls() + return None + + +class ProtoRuleMatcher: + """A rule matcher that matches on the protocol of a rule.""" + + def __init__(self, proto): + self.proto = proto + + def match(self, rule): + return rule.proto == self.proto + + +class IdRuleMatcher(object): + """Matcher object to match an idstools rule object by its signature + ID.""" + + def __init__(self, generatorId=None, signatureId=None): + self.signatureIds = [] + if generatorId and signatureId: + self.signatureIds.append((generatorId, signatureId)) + + def match(self, rule): + for (generatorId, signatureId) in self.signatureIds: + if generatorId == rule.gid and signatureId == rule.sid: + return True + return False + + @classmethod + def parse(cls, buf): + matcher = cls() + + for entry in buf.split(","): + entry = entry.strip() + + parts = entry.split(":", 1) + if not parts: + return None + if len(parts) == 1: + try: + signatureId = int(parts[0]) + matcher.signatureIds.append((1, signatureId)) + except: + return None + else: + try: + generatorId = int(parts[0]) + signatureId = int(parts[1]) + matcher.signatureIds.append((generatorId, signatureId)) + except: + return None + + return matcher + + +class FilenameMatcher(object): + """Matcher object to match a rule by its filename. This is similar to + a group but has no specifier prefix. + """ + + def __init__(self, pattern): + self.pattern = pattern + + def match(self, rule): + if hasattr(rule, "group") and rule.group is not None: + return fnmatch.fnmatch(rule.group, self.pattern) + return False + + @classmethod + def parse(cls, buf): + if buf.startswith("filename:"): + try: + group = buf.split(":", 1)[1] + return cls(group.strip()) + except: + pass + return None + + +class GroupMatcher(object): + """Matcher object to match an idstools rule object by its group (ie: + filename). + + The group is just the basename of the rule file with or without + extension. + + Examples: + - emerging-shellcode + - emerging-trojan.rules + + """ + + def __init__(self, pattern): + self.pattern = pattern + + def match(self, rule): + if hasattr(rule, "group") and rule.group is not None: + if fnmatch.fnmatch(os.path.basename(rule.group), self.pattern): + return True + # Try matching against the rule group without the file + # extension. + if fnmatch.fnmatch( + os.path.splitext( + os.path.basename(rule.group))[0], self.pattern): + return True + return False + + @classmethod + def parse(cls, buf): + if buf.startswith("group:"): + try: + logger.debug("Parsing group matcher: %s" % (buf)) + group = buf.split(":", 1)[1] + return cls(group.strip()) + except: + pass + if buf.endswith(".rules"): + return cls(buf.strip()) + return None + + +class ReRuleMatcher(object): + """Matcher object to match an idstools rule object by regular + expression.""" + + def __init__(self, pattern): + self.pattern = pattern + + def match(self, rule): + if self.pattern.search(rule.raw): + return True + return False + + @classmethod + def parse(cls, buf): + if buf.startswith("re:"): + try: + logger.debug("Parsing regex matcher: %s" % (buf)) + patternstr = buf.split(":", 1)[1].strip() + pattern = re.compile(patternstr, re.I) + return cls(pattern) + except: + pass + return None + + +class MetadataRuleMatch(object): + """ Matcher that matches on key/value style metadata fields. Case insensitive. """ + + def __init__(self, key, value): + self.key = key + self.value = value + + def match(self, rule): + for entry in rule.metadata: + parts = entry.strip().split(" ", 1) + if parts[0].strip().lower() == self.key and parts[1].strip().lower() == self.value: + print(rule) + return True + return False + + @classmethod + def parse(cls, buf): + print(buf) + if buf.startswith("metadata:"): + buf = buf.split(":", 1)[1].strip() + parts = buf.split(" ", 1) + if len(parts) == 2: + key = parts[0].strip().lower() + val = parts[1].strip().lower() + return cls(key, val) + return None + + +class ModifyRuleFilter(object): + """Filter to modify an idstools rule object. + + Important note: This filter does not modify the rule inplace, but + instead returns a new rule object with the modification. + """ + + def __init__(self, matcher, pattern, repl): + self.matcher = matcher + self.pattern = pattern + self.repl = repl + + def match(self, rule): + return self.matcher.match(rule) + + def run(self, rule): + modified_rule = self.pattern.sub(self.repl, rule.format()) + parsed = suricata.update.rule.parse(modified_rule, rule.group) + if parsed is None: + logger.error("Modification of rule %s results in invalid rule: %s", + rule.idstr, modified_rule) + return rule + return parsed + + @classmethod + def parse(cls, buf): + tokens = shlex.split(buf) + if len(tokens) == 3: + matchstring, a, b = tokens + elif len(tokens) > 3 and tokens[0] == "modifysid": + matchstring, a, b = tokens[1], tokens[2], tokens[4] + else: + raise Exception("Bad number of arguments.") + matcher = parse_rule_match(matchstring) + if not matcher: + raise Exception("Bad match string: %s" % (matchstring)) + pattern = re.compile(a) + + # Convert Oinkmaster backticks to Python. + b = re.sub(r"\$\{(\d+)\}", "\\\\\\1", b) + + return cls(matcher, pattern, b) + + +class DropRuleFilter(object): + """ Filter to modify an idstools rule object to a drop rule. """ + + def __init__(self, matcher): + self.matcher = matcher + + def match(self, rule): + if rule["noalert"]: + return False + return self.matcher.match(rule) + + def run(self, rule): + drop_rule = suricata.update.rule.parse(re.sub( + r"^\w+", "drop", rule.raw)) + drop_rule.enabled = rule.enabled + return drop_rule + +class AddMetadataFilter(object): + + def __init__(self, matcher, key, val): + self.matcher = matcher + self.key = key + self.val = val + + def match(self, rule): + return self.matcher.match(rule) + + def run(self, rule): + new_rule_string = re.sub(r";\s*\)$", "; metadata: {} {};)".format(self.key, self.val), rule.format()) + new_rule = suricata.update.rule.parse(new_rule_string, rule.group) + if not new_rule: + logger.error("Rule is not valid after adding metadata: [{}]: {}".format(rule.idstr, new_rule_string)) + return rule + return new_rule + + @classmethod + def parse(cls, buf): + try: + command, match_string, key, val = shlex.split(buf) + except: + raise Exception("metadata-add: invalid number of arguments") + matcher = parse_rule_match(match_string) + if not matcher: + raise Exception("Bad match string: %s" % (matchstring)) + return cls(matcher, key, val) + + +def parse_rule_match(match): + matcher = AllRuleMatcher.parse(match) + if matcher: + return matcher + + matcher = IdRuleMatcher.parse(match) + if matcher: + return matcher + + matcher = ReRuleMatcher.parse(match) + if matcher: + return matcher + + matcher = FilenameMatcher.parse(match) + if matcher: + return matcher + + matcher = GroupMatcher.parse(match) + if matcher: + return matcher + + matcher = MetadataRuleMatch.parse(match) + if matcher: + return matcher + + return None |