diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 17:43:34 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 17:43:34 +0000 |
commit | 0fcce96a175531ec6042cde1b11a0052aa261dd5 (patch) | |
tree | 898a1e161c4984b41e6a732866bd73b24f0f7b7a /suricata/update/net.py | |
parent | Initial commit. (diff) | |
download | suricata-update-0fcce96a175531ec6042cde1b11a0052aa261dd5.tar.xz suricata-update-0fcce96a175531ec6042cde1b11a0052aa261dd5.zip |
Adding upstream version 1.3.2.upstream/1.3.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'suricata/update/net.py')
-rw-r--r-- | suricata/update/net.py | 175 |
1 files changed, 175 insertions, 0 deletions
diff --git a/suricata/update/net.py b/suricata/update/net.py new file mode 100644 index 0000000..eac060e --- /dev/null +++ b/suricata/update/net.py @@ -0,0 +1,175 @@ +# Copyright (C) 2017 Open Information Security Foundation +# Copyright (c) 2013 Jason Ish +# +# You can copy, redistribute or modify this Program under the terms of +# the GNU General Public License version 2 as published by the Free +# Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# version 2 along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA. + +""" Module for network related operations. """ + +import platform +import logging +import ssl +import re + +try: + # Python 3.3... + from urllib.request import urlopen, build_opener + from urllib.error import HTTPError + from urllib.request import HTTPSHandler +except ImportError: + # Python 2.6, 2.7. + from urllib2 import urlopen, build_opener + from urllib2 import HTTPError + from urllib2 import HTTPSHandler + +from suricata.update.version import version +from suricata.update import config +from suricata.update import osinfo + +logger = logging.getLogger() + +# Number of bytes to read at a time in a GET request. +GET_BLOCK_SIZE = 8192 + +user_agent_suricata_verison = "Unknown" +custom_user_agent = None + +def set_custom_user_agent(ua): + global custom_user_agent + custom_user_agent = ua + +def set_user_agent_suricata_version(version): + global user_agent_suricata_verison + user_agent_suricata_verison = version + +def build_user_agent(): + params = [] + has_custom_user_agent = config.has("user-agent") + if has_custom_user_agent: + user_agent = config.get("user-agent") + if user_agent is None or len(user_agent.strip()) == 0: + logger.debug("Suppressing HTTP User-Agent header") + return None + return user_agent + + params = [] + try: + params.append("OS: {}".format(platform.system())) + except Exception as err: + logger.error("Failed to set user-agent OS: {}".format(str(err))) + try: + params.append("CPU: {}".format(osinfo.arch())) + except Exception as err: + logger.error("Failed to set user-agent architecture: {}".format(str(err))) + try: + params.append("Python: {}".format(platform.python_version())) + except Exception as err: + logger.error("Failed to set user-agent python version: {}".format(str(err))) + try: + params.append("Dist: {}".format(osinfo.dist())) + except Exception as err: + logger.error("Failed to set user-agent distribution: {}".format(str(err))) + + params.append("Suricata: %s" % (user_agent_suricata_verison)) + + return "Suricata-Update/%s (%s)" % ( + version, "; ".join(params)) + + +def is_header_clean(header): + if len(header) != 2: + return False + name, val = header[0].strip(), header[1].strip() + if re.match( r"^[\w-]+$", name) and re.match(r"^[\w\s -~]+$", val): + return True + return False + + +def get(url, fileobj, progress_hook=None): + """ Perform a GET request against a URL writing the contents into + the provided file-like object. + + :param url: The URL to fetch + :param fileobj: The fileobj to write the content to + :param progress_hook: The function to call with progress updates + + :returns: Returns a tuple containing the number of bytes read and + the result of the info() function from urllib2.urlopen(). + + :raises: Exceptions from urllib2.urlopen() and writing to the + provided fileobj may occur. + """ + + user_agent = build_user_agent() + + try: + # Wrap in a try as Python versions prior to 2.7.9 don't have + # create_default_context, but some distros have backported it. + ssl_context = ssl.create_default_context() + if config.get("no-check-certificate"): + logger.debug("Disabling SSL/TLS certificate verification.") + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + opener = build_opener(HTTPSHandler(context=ssl_context)) + except: + opener = build_opener() + + if user_agent: + logger.debug("Setting HTTP User-Agent to %s", user_agent) + http_headers = [("User-Agent", user_agent)] + else: + http_headers = [(header, value) for header, + value in opener.addheaders if header.lower() != "user-agent"] + if isinstance(url, tuple): + header = url[1].split(":") if url[1] is not None else None + if header and is_header_clean(header=header): + name, val = header[0].strip(), header[1].strip() + logger.debug("Setting HTTP header %s to %s", name, val) + http_headers.append((name, val)) + elif header: + logger.error("Header not set as it does not meet the criteria") + url = url[0] + opener.addheaders = http_headers + + try: + remote = opener.open(url, timeout=30) + except ValueError as ve: + logger.error(ve) + else: + info = remote.info() + content_length = info.get("content-length") + content_length = int(content_length) if content_length else 0 + bytes_read = 0 + while True: + buf = remote.read(GET_BLOCK_SIZE) + if not buf: + # EOF + break + bytes_read += len(buf) + fileobj.write(buf) + if progress_hook: + progress_hook(content_length, bytes_read) + remote.close() + fileobj.flush() + return bytes_read, info + + +if __name__ == "__main__": + + import sys + + try: + get(sys.argv[1], sys.stdout) + except Exception as err: + print("ERROR: %s" % (err)) |