From 0fcce96a175531ec6042cde1b11a0052aa261dd5 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 19 Apr 2024 19:43:34 +0200 Subject: Adding upstream version 1.3.2. Signed-off-by: Daniel Baumann --- suricata/update/engine.py | 196 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 196 insertions(+) create mode 100644 suricata/update/engine.py (limited to 'suricata/update/engine.py') diff --git a/suricata/update/engine.py b/suricata/update/engine.py new file mode 100644 index 0000000..22ad9b3 --- /dev/null +++ b/suricata/update/engine.py @@ -0,0 +1,196 @@ +# Copyright (C) 2017 Open Information Security Foundation +# Copyright (c) 2015 Jason Ish +# +# You can copy, redistribute or modify this Program under the terms of +# the GNU General Public License version 2 as published by the Free +# Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# version 2 along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA. + +# This module contains functions for interacting with the Suricata +# application (aka the engine). + +from __future__ import print_function + +import sys +import os +import os.path +import subprocess +import re +import logging +import shutil +import yaml +import tempfile +from collections import namedtuple + +logger = logging.getLogger() + +SuricataVersion = namedtuple( + "SuricataVersion", ["major", "minor", "patch", "full", "short", "raw"]) + +def get_build_info(suricata): + build_info = { + "features": [], + } + build_info_output = subprocess.check_output([suricata, "--build-info"]) + for line in build_info_output.decode("utf-8").split("\n"): + line = line.strip() + if line.startswith("--prefix"): + build_info["prefix"] = line.split()[-1].strip() + elif line.startswith("--sysconfdir"): + build_info["sysconfdir"] = line.split()[-1].strip() + elif line.startswith("--localstatedir"): + build_info["localstatedir"] = line.split()[-1].strip() + elif line.startswith("--datarootdir"): + build_info["datarootdir"] = line.split()[-1].strip() + elif line.startswith("Features:"): + build_info["features"] = line.split()[1:] + elif line.startswith("This is Suricata version"): + build_info["version"] = parse_version(line) + + if not "prefix" in build_info: + logger.warning("--prefix not found in build-info.") + if not "sysconfdir" in build_info: + logger.warning("--sysconfdir not found in build-info.") + if not "localstatedir" in build_info: + logger.warning("--localstatedir not found in build-info.") + + return build_info + +class Configuration: + """An abstraction over the Suricata configuration file.""" + + def __init__(self, conf, build_info = {}): + self.conf = conf + self.build_info = build_info + + def keys(self): + return self.conf.keys() + + def has_key(self, key): + return key in self.conf + + def get(self, key): + return self.conf.get(key, None) + + def is_true(self, key, truthy=[]): + if not key in self.conf: + logger.warning( + "Suricata configuration key does not exist: %s" % (key)) + return False + if key in self.conf: + val = self.conf[key] + if val.lower() in ["1", "yes", "true"] + truthy: + return True + return False + + @classmethod + def load(cls, config_filename, suricata_path=None): + env = build_env() + env["SC_LOG_LEVEL"] = "Error" + if not suricata_path: + suricata_path = get_path() + if not suricata_path: + raise Exception("Suricata program could not be found.") + if not os.path.exists(suricata_path): + raise Exception("Suricata program %s does not exist.", suricata_path) + configuration_dump = subprocess.check_output( + [suricata_path, "-c", config_filename, "--dump-config"], + env=env) + conf = {} + for line in configuration_dump.splitlines(): + try: + key, val = line.decode().split(" = ") + conf[key] = val + except: + logger.warning("Failed to parse: %s", line) + build_info = get_build_info(suricata_path) + return cls(conf, build_info) + +def get_path(program="suricata"): + """Find Suricata in the shell path.""" + # First look for Suricata relative to suricata-update. + relative_path = os.path.join(os.path.dirname(sys.argv[0]), "suricata") + if os.path.exists(relative_path): + logger.debug("Found suricata at %s" % (relative_path)) + return relative_path + + # Otherwise look for it in the path. + for path in os.environ["PATH"].split(os.pathsep): + if not path: + continue + suricata_path = os.path.join(path, program) + logger.debug("Looking for %s in %s" % (program, path)) + if os.path.exists(suricata_path): + logger.debug("Found %s." % (suricata_path)) + return suricata_path + return None + +def parse_version(buf): + m = re.search(r"((\d+)\.(\d+)(\.(\d+))?([\w\-]+)?)", str(buf).strip()) + if m: + full = m.group(1) + major = int(m.group(2)) + minor = int(m.group(3)) + if not m.group(5): + patch = 0 + else: + patch = int(m.group(5)) + short = "%s.%s" % (major, minor) + return SuricataVersion( + major=major, minor=minor, patch=patch, short=short, full=full, + raw=buf) + return None + +def get_version(path): + """Get a SuricataVersion named tuple describing the version. + + If no path argument is found, the envionment PATH will be + searched. + """ + if not path: + return None + output = subprocess.check_output([path, "-V"]) + if output: + return parse_version(output) + return None + +def test_configuration(suricata_path, suricata_conf=None, rule_filename=None): + """Test the Suricata configuration with -T.""" + tempdir = tempfile.mkdtemp() + test_command = [ + suricata_path, + "-T", + "-l", tempdir, + ] + if suricata_conf: + test_command += ["-c", suricata_conf] + if rule_filename: + test_command += ["-S", rule_filename] + + env = build_env() + env["SC_LOG_LEVEL"] = "Warning" + + logger.debug("Running %s; env=%s", " ".join(test_command), str(env)) + rc = subprocess.Popen(test_command, env=env).wait() + ret = True if rc == 0 else False + + # Cleanup the temp dir + shutil.rmtree(tempdir) + + return ret + +def build_env(): + env = os.environ.copy() + env["SC_LOG_FORMAT"] = "%t - <%d> -- " + env["SC_LOG_LEVEL"] = "Error" + env["ASAN_OPTIONS"] = "detect_leaks=0" + return env -- cgit v1.2.3