summaryrefslogtreecommitdiffstats
path: root/plugins/imptcp
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-15 16:28:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-15 16:28:20 +0000
commitdcc721a95bef6f0d8e6d8775b8efe33e5aecd562 (patch)
tree66a2774cd0ee294d019efd71d2544c70f42b2842 /plugins/imptcp
parentInitial commit. (diff)
downloadrsyslog-dcc721a95bef6f0d8e6d8775b8efe33e5aecd562.tar.xz
rsyslog-dcc721a95bef6f0d8e6d8775b8efe33e5aecd562.zip
Adding upstream version 8.2402.0.upstream/8.2402.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'plugins/imptcp')
-rw-r--r--plugins/imptcp/Makefile.am6
-rw-r--r--plugins/imptcp/Makefile.in796
-rw-r--r--plugins/imptcp/imptcp.c2676
3 files changed, 3478 insertions, 0 deletions
diff --git a/plugins/imptcp/Makefile.am b/plugins/imptcp/Makefile.am
new file mode 100644
index 0000000..bfacc88
--- /dev/null
+++ b/plugins/imptcp/Makefile.am
@@ -0,0 +1,6 @@
+pkglib_LTLIBRARIES = imptcp.la
+
+imptcp_la_SOURCES = imptcp.c
+imptcp_la_CPPFLAGS = -I$(top_srcdir) $(PTHREADS_CFLAGS) $(RSRT_CFLAGS)
+imptcp_la_LDFLAGS = -module -avoid-version
+imptcp_la_LIBADD =
diff --git a/plugins/imptcp/Makefile.in b/plugins/imptcp/Makefile.in
new file mode 100644
index 0000000..a24d6fe
--- /dev/null
+++ b/plugins/imptcp/Makefile.in
@@ -0,0 +1,796 @@
+# Makefile.in generated by automake 1.16.1 from Makefile.am.
+# @configure_input@
+
+# Copyright (C) 1994-2018 Free Software Foundation, Inc.
+
+# This Makefile.in is free software; the Free Software Foundation
+# gives unlimited permission to copy and/or distribute it,
+# with or without modifications, as long as this notice is preserved.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY, to the extent permitted by law; without
+# even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+# PARTICULAR PURPOSE.
+
+@SET_MAKE@
+
+VPATH = @srcdir@
+am__is_gnu_make = { \
+ if test -z '$(MAKELEVEL)'; then \
+ false; \
+ elif test -n '$(MAKE_HOST)'; then \
+ true; \
+ elif test -n '$(MAKE_VERSION)' && test -n '$(CURDIR)'; then \
+ true; \
+ else \
+ false; \
+ fi; \
+}
+am__make_running_with_option = \
+ case $${target_option-} in \
+ ?) ;; \
+ *) echo "am__make_running_with_option: internal error: invalid" \
+ "target option '$${target_option-}' specified" >&2; \
+ exit 1;; \
+ esac; \
+ has_opt=no; \
+ sane_makeflags=$$MAKEFLAGS; \
+ if $(am__is_gnu_make); then \
+ sane_makeflags=$$MFLAGS; \
+ else \
+ case $$MAKEFLAGS in \
+ *\\[\ \ ]*) \
+ bs=\\; \
+ sane_makeflags=`printf '%s\n' "$$MAKEFLAGS" \
+ | sed "s/$$bs$$bs[$$bs $$bs ]*//g"`;; \
+ esac; \
+ fi; \
+ skip_next=no; \
+ strip_trailopt () \
+ { \
+ flg=`printf '%s\n' "$$flg" | sed "s/$$1.*$$//"`; \
+ }; \
+ for flg in $$sane_makeflags; do \
+ test $$skip_next = yes && { skip_next=no; continue; }; \
+ case $$flg in \
+ *=*|--*) continue;; \
+ -*I) strip_trailopt 'I'; skip_next=yes;; \
+ -*I?*) strip_trailopt 'I';; \
+ -*O) strip_trailopt 'O'; skip_next=yes;; \
+ -*O?*) strip_trailopt 'O';; \
+ -*l) strip_trailopt 'l'; skip_next=yes;; \
+ -*l?*) strip_trailopt 'l';; \
+ -[dEDm]) skip_next=yes;; \
+ -[JT]) skip_next=yes;; \
+ esac; \
+ case $$flg in \
+ *$$target_option*) has_opt=yes; break;; \
+ esac; \
+ done; \
+ test $$has_opt = yes
+am__make_dryrun = (target_option=n; $(am__make_running_with_option))
+am__make_keepgoing = (target_option=k; $(am__make_running_with_option))
+pkgdatadir = $(datadir)/@PACKAGE@
+pkgincludedir = $(includedir)/@PACKAGE@
+pkglibdir = $(libdir)/@PACKAGE@
+pkglibexecdir = $(libexecdir)/@PACKAGE@
+am__cd = CDPATH="$${ZSH_VERSION+.}$(PATH_SEPARATOR)" && cd
+install_sh_DATA = $(install_sh) -c -m 644
+install_sh_PROGRAM = $(install_sh) -c
+install_sh_SCRIPT = $(install_sh) -c
+INSTALL_HEADER = $(INSTALL_DATA)
+transform = $(program_transform_name)
+NORMAL_INSTALL = :
+PRE_INSTALL = :
+POST_INSTALL = :
+NORMAL_UNINSTALL = :
+PRE_UNINSTALL = :
+POST_UNINSTALL = :
+build_triplet = @build@
+host_triplet = @host@
+subdir = plugins/imptcp
+ACLOCAL_M4 = $(top_srcdir)/aclocal.m4
+am__aclocal_m4_deps = $(top_srcdir)/m4/ac_check_define.m4 \
+ $(top_srcdir)/m4/atomic_operations.m4 \
+ $(top_srcdir)/m4/atomic_operations_64bit.m4 \
+ $(top_srcdir)/m4/libtool.m4 $(top_srcdir)/m4/ltoptions.m4 \
+ $(top_srcdir)/m4/ltsugar.m4 $(top_srcdir)/m4/ltversion.m4 \
+ $(top_srcdir)/m4/lt~obsolete.m4 $(top_srcdir)/configure.ac
+am__configure_deps = $(am__aclocal_m4_deps) $(CONFIGURE_DEPENDENCIES) \
+ $(ACLOCAL_M4)
+DIST_COMMON = $(srcdir)/Makefile.am $(am__DIST_COMMON)
+mkinstalldirs = $(install_sh) -d
+CONFIG_HEADER = $(top_builddir)/config.h
+CONFIG_CLEAN_FILES =
+CONFIG_CLEAN_VPATH_FILES =
+am__vpath_adj_setup = srcdirstrip=`echo "$(srcdir)" | sed 's|.|.|g'`;
+am__vpath_adj = case $$p in \
+ $(srcdir)/*) f=`echo "$$p" | sed "s|^$$srcdirstrip/||"`;; \
+ *) f=$$p;; \
+ esac;
+am__strip_dir = f=`echo $$p | sed -e 's|^.*/||'`;
+am__install_max = 40
+am__nobase_strip_setup = \
+ srcdirstrip=`echo "$(srcdir)" | sed 's/[].[^$$\\*|]/\\\\&/g'`
+am__nobase_strip = \
+ for p in $$list; do echo "$$p"; done | sed -e "s|$$srcdirstrip/||"
+am__nobase_list = $(am__nobase_strip_setup); \
+ for p in $$list; do echo "$$p $$p"; done | \
+ sed "s| $$srcdirstrip/| |;"' / .*\//!s/ .*/ ./; s,\( .*\)/[^/]*$$,\1,' | \
+ $(AWK) 'BEGIN { files["."] = "" } { files[$$2] = files[$$2] " " $$1; \
+ if (++n[$$2] == $(am__install_max)) \
+ { print $$2, files[$$2]; n[$$2] = 0; files[$$2] = "" } } \
+ END { for (dir in files) print dir, files[dir] }'
+am__base_list = \
+ sed '$$!N;$$!N;$$!N;$$!N;$$!N;$$!N;$$!N;s/\n/ /g' | \
+ sed '$$!N;$$!N;$$!N;$$!N;s/\n/ /g'
+am__uninstall_files_from_dir = { \
+ test -z "$$files" \
+ || { test ! -d "$$dir" && test ! -f "$$dir" && test ! -r "$$dir"; } \
+ || { echo " ( cd '$$dir' && rm -f" $$files ")"; \
+ $(am__cd) "$$dir" && rm -f $$files; }; \
+ }
+am__installdirs = "$(DESTDIR)$(pkglibdir)"
+LTLIBRARIES = $(pkglib_LTLIBRARIES)
+imptcp_la_DEPENDENCIES =
+am_imptcp_la_OBJECTS = imptcp_la-imptcp.lo
+imptcp_la_OBJECTS = $(am_imptcp_la_OBJECTS)
+AM_V_lt = $(am__v_lt_@AM_V@)
+am__v_lt_ = $(am__v_lt_@AM_DEFAULT_V@)
+am__v_lt_0 = --silent
+am__v_lt_1 =
+imptcp_la_LINK = $(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) \
+ $(LIBTOOLFLAGS) --mode=link $(CCLD) $(AM_CFLAGS) $(CFLAGS) \
+ $(imptcp_la_LDFLAGS) $(LDFLAGS) -o $@
+AM_V_P = $(am__v_P_@AM_V@)
+am__v_P_ = $(am__v_P_@AM_DEFAULT_V@)
+am__v_P_0 = false
+am__v_P_1 = :
+AM_V_GEN = $(am__v_GEN_@AM_V@)
+am__v_GEN_ = $(am__v_GEN_@AM_DEFAULT_V@)
+am__v_GEN_0 = @echo " GEN " $@;
+am__v_GEN_1 =
+AM_V_at = $(am__v_at_@AM_V@)
+am__v_at_ = $(am__v_at_@AM_DEFAULT_V@)
+am__v_at_0 = @
+am__v_at_1 =
+DEFAULT_INCLUDES = -I.@am__isrc@ -I$(top_builddir)
+depcomp = $(SHELL) $(top_srcdir)/depcomp
+am__maybe_remake_depfiles = depfiles
+am__depfiles_remade = ./$(DEPDIR)/imptcp_la-imptcp.Plo
+am__mv = mv -f
+COMPILE = $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) \
+ $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS)
+LTCOMPILE = $(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) \
+ $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) \
+ $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) \
+ $(AM_CFLAGS) $(CFLAGS)
+AM_V_CC = $(am__v_CC_@AM_V@)
+am__v_CC_ = $(am__v_CC_@AM_DEFAULT_V@)
+am__v_CC_0 = @echo " CC " $@;
+am__v_CC_1 =
+CCLD = $(CC)
+LINK = $(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) \
+ $(LIBTOOLFLAGS) --mode=link $(CCLD) $(AM_CFLAGS) $(CFLAGS) \
+ $(AM_LDFLAGS) $(LDFLAGS) -o $@
+AM_V_CCLD = $(am__v_CCLD_@AM_V@)
+am__v_CCLD_ = $(am__v_CCLD_@AM_DEFAULT_V@)
+am__v_CCLD_0 = @echo " CCLD " $@;
+am__v_CCLD_1 =
+SOURCES = $(imptcp_la_SOURCES)
+DIST_SOURCES = $(imptcp_la_SOURCES)
+am__can_run_installinfo = \
+ case $$AM_UPDATE_INFO_DIR in \
+ n|no|NO) false;; \
+ *) (install-info --version) >/dev/null 2>&1;; \
+ esac
+am__tagged_files = $(HEADERS) $(SOURCES) $(TAGS_FILES) $(LISP)
+# Read a list of newline-separated strings from the standard input,
+# and print each of them once, without duplicates. Input order is
+# *not* preserved.
+am__uniquify_input = $(AWK) '\
+ BEGIN { nonempty = 0; } \
+ { items[$$0] = 1; nonempty = 1; } \
+ END { if (nonempty) { for (i in items) print i; }; } \
+'
+# Make sure the list of sources is unique. This is necessary because,
+# e.g., the same source file might be shared among _SOURCES variables
+# for different programs/libraries.
+am__define_uniq_tagged_files = \
+ list='$(am__tagged_files)'; \
+ unique=`for i in $$list; do \
+ if test -f "$$i"; then echo $$i; else echo $(srcdir)/$$i; fi; \
+ done | $(am__uniquify_input)`
+ETAGS = etags
+CTAGS = ctags
+am__DIST_COMMON = $(srcdir)/Makefile.in $(top_srcdir)/depcomp
+DISTFILES = $(DIST_COMMON) $(DIST_SOURCES) $(TEXINFOS) $(EXTRA_DIST)
+ACLOCAL = @ACLOCAL@
+AMTAR = @AMTAR@
+AM_DEFAULT_VERBOSITY = @AM_DEFAULT_VERBOSITY@
+APU_CFLAGS = @APU_CFLAGS@
+APU_LIBS = @APU_LIBS@
+AR = @AR@
+AUTOCONF = @AUTOCONF@
+AUTOHEADER = @AUTOHEADER@
+AUTOMAKE = @AUTOMAKE@
+AWK = @AWK@
+CC = @CC@
+CCDEPMODE = @CCDEPMODE@
+CFLAGS = @CFLAGS@
+CIVETWEB_LIBS = @CIVETWEB_LIBS@
+CONF_FILE_PATH = @CONF_FILE_PATH@
+CPP = @CPP@
+CPPFLAGS = @CPPFLAGS@
+CURL_CFLAGS = @CURL_CFLAGS@
+CURL_LIBS = @CURL_LIBS@
+CYGPATH_W = @CYGPATH_W@
+CZMQ_CFLAGS = @CZMQ_CFLAGS@
+CZMQ_LIBS = @CZMQ_LIBS@
+DEFS = @DEFS@
+DEPDIR = @DEPDIR@
+DLLTOOL = @DLLTOOL@
+DL_LIBS = @DL_LIBS@
+DSYMUTIL = @DSYMUTIL@
+DUMPBIN = @DUMPBIN@
+ECHO_C = @ECHO_C@
+ECHO_N = @ECHO_N@
+ECHO_T = @ECHO_T@
+EGREP = @EGREP@
+EXEEXT = @EXEEXT@
+FAUP_LIBS = @FAUP_LIBS@
+FGREP = @FGREP@
+GLIB_CFLAGS = @GLIB_CFLAGS@
+GLIB_LIBS = @GLIB_LIBS@
+GNUTLS_CFLAGS = @GNUTLS_CFLAGS@
+GNUTLS_LIBS = @GNUTLS_LIBS@
+GREP = @GREP@
+GSS_LIBS = @GSS_LIBS@
+GT_KSI_LS12_CFLAGS = @GT_KSI_LS12_CFLAGS@
+GT_KSI_LS12_LIBS = @GT_KSI_LS12_LIBS@
+HASH_XXHASH_LIBS = @HASH_XXHASH_LIBS@
+HIREDIS_CFLAGS = @HIREDIS_CFLAGS@
+HIREDIS_LIBS = @HIREDIS_LIBS@
+IMUDP_LIBS = @IMUDP_LIBS@
+INSTALL = @INSTALL@
+INSTALL_DATA = @INSTALL_DATA@
+INSTALL_PROGRAM = @INSTALL_PROGRAM@
+INSTALL_SCRIPT = @INSTALL_SCRIPT@
+INSTALL_STRIP_PROGRAM = @INSTALL_STRIP_PROGRAM@
+IP = @IP@
+JAVA = @JAVA@
+JAVAC = @JAVAC@
+LD = @LD@
+LDFLAGS = @LDFLAGS@
+LEX = @LEX@
+LEXLIB = @LEXLIB@
+LEX_OUTPUT_ROOT = @LEX_OUTPUT_ROOT@
+LIBCAPNG_CFLAGS = @LIBCAPNG_CFLAGS@
+LIBCAPNG_LIBS = @LIBCAPNG_LIBS@
+LIBCAPNG_PRESENT_CFLAGS = @LIBCAPNG_PRESENT_CFLAGS@
+LIBCAPNG_PRESENT_LIBS = @LIBCAPNG_PRESENT_LIBS@
+LIBDBI_CFLAGS = @LIBDBI_CFLAGS@
+LIBDBI_LIBS = @LIBDBI_LIBS@
+LIBESTR_CFLAGS = @LIBESTR_CFLAGS@
+LIBESTR_LIBS = @LIBESTR_LIBS@
+LIBEVENT_CFLAGS = @LIBEVENT_CFLAGS@
+LIBEVENT_LIBS = @LIBEVENT_LIBS@
+LIBFASTJSON_CFLAGS = @LIBFASTJSON_CFLAGS@
+LIBFASTJSON_LIBS = @LIBFASTJSON_LIBS@
+LIBGCRYPT_CFLAGS = @LIBGCRYPT_CFLAGS@
+LIBGCRYPT_CONFIG = @LIBGCRYPT_CONFIG@
+LIBGCRYPT_LIBS = @LIBGCRYPT_LIBS@
+LIBLOGGING_CFLAGS = @LIBLOGGING_CFLAGS@
+LIBLOGGING_LIBS = @LIBLOGGING_LIBS@
+LIBLOGGING_STDLOG_CFLAGS = @LIBLOGGING_STDLOG_CFLAGS@
+LIBLOGGING_STDLOG_LIBS = @LIBLOGGING_STDLOG_LIBS@
+LIBLOGNORM_CFLAGS = @LIBLOGNORM_CFLAGS@
+LIBLOGNORM_LIBS = @LIBLOGNORM_LIBS@
+LIBLZ4_CFLAGS = @LIBLZ4_CFLAGS@
+LIBLZ4_LIBS = @LIBLZ4_LIBS@
+LIBM = @LIBM@
+LIBMONGOC_CFLAGS = @LIBMONGOC_CFLAGS@
+LIBMONGOC_LIBS = @LIBMONGOC_LIBS@
+LIBOBJS = @LIBOBJS@
+LIBRDKAFKA_CFLAGS = @LIBRDKAFKA_CFLAGS@
+LIBRDKAFKA_LIBS = @LIBRDKAFKA_LIBS@
+LIBS = @LIBS@
+LIBSYSTEMD_CFLAGS = @LIBSYSTEMD_CFLAGS@
+LIBSYSTEMD_JOURNAL_CFLAGS = @LIBSYSTEMD_JOURNAL_CFLAGS@
+LIBSYSTEMD_JOURNAL_LIBS = @LIBSYSTEMD_JOURNAL_LIBS@
+LIBSYSTEMD_LIBS = @LIBSYSTEMD_LIBS@
+LIBTOOL = @LIBTOOL@
+LIBUUID_CFLAGS = @LIBUUID_CFLAGS@
+LIBUUID_LIBS = @LIBUUID_LIBS@
+LIPO = @LIPO@
+LN_S = @LN_S@
+LTLIBOBJS = @LTLIBOBJS@
+LT_SYS_LIBRARY_PATH = @LT_SYS_LIBRARY_PATH@
+MAKEINFO = @MAKEINFO@
+MANIFEST_TOOL = @MANIFEST_TOOL@
+MKDIR_P = @MKDIR_P@
+MYSQL_CFLAGS = @MYSQL_CFLAGS@
+MYSQL_CONFIG = @MYSQL_CONFIG@
+MYSQL_LIBS = @MYSQL_LIBS@
+NM = @NM@
+NMEDIT = @NMEDIT@
+OBJDUMP = @OBJDUMP@
+OBJEXT = @OBJEXT@
+OPENSSL_CFLAGS = @OPENSSL_CFLAGS@
+OPENSSL_LIBS = @OPENSSL_LIBS@
+OTOOL = @OTOOL@
+OTOOL64 = @OTOOL64@
+PACKAGE = @PACKAGE@
+PACKAGE_BUGREPORT = @PACKAGE_BUGREPORT@
+PACKAGE_NAME = @PACKAGE_NAME@
+PACKAGE_STRING = @PACKAGE_STRING@
+PACKAGE_TARNAME = @PACKAGE_TARNAME@
+PACKAGE_URL = @PACKAGE_URL@
+PACKAGE_VERSION = @PACKAGE_VERSION@
+PATH_SEPARATOR = @PATH_SEPARATOR@
+PGSQL_CFLAGS = @PGSQL_CFLAGS@
+PGSQL_LIBS = @PGSQL_LIBS@
+PG_CONFIG = @PG_CONFIG@
+PID_FILE_PATH = @PID_FILE_PATH@
+PKG_CONFIG = @PKG_CONFIG@
+PKG_CONFIG_LIBDIR = @PKG_CONFIG_LIBDIR@
+PKG_CONFIG_PATH = @PKG_CONFIG_PATH@
+PROTON_CFLAGS = @PROTON_CFLAGS@
+PROTON_LIBS = @PROTON_LIBS@
+PROTON_PROACTOR_CFLAGS = @PROTON_PROACTOR_CFLAGS@
+PROTON_PROACTOR_LIBS = @PROTON_PROACTOR_LIBS@
+PTHREADS_CFLAGS = @PTHREADS_CFLAGS@
+PTHREADS_LIBS = @PTHREADS_LIBS@
+PYTHON = @PYTHON@
+PYTHON_EXEC_PREFIX = @PYTHON_EXEC_PREFIX@
+PYTHON_PLATFORM = @PYTHON_PLATFORM@
+PYTHON_PREFIX = @PYTHON_PREFIX@
+PYTHON_VERSION = @PYTHON_VERSION@
+RABBITMQ_CFLAGS = @RABBITMQ_CFLAGS@
+RABBITMQ_LIBS = @RABBITMQ_LIBS@
+RANLIB = @RANLIB@
+READLINK = @READLINK@
+REDIS = @REDIS@
+RELP_CFLAGS = @RELP_CFLAGS@
+RELP_LIBS = @RELP_LIBS@
+RSRT_CFLAGS = @RSRT_CFLAGS@
+RSRT_CFLAGS1 = @RSRT_CFLAGS1@
+RSRT_LIBS = @RSRT_LIBS@
+RSRT_LIBS1 = @RSRT_LIBS1@
+RST2MAN = @RST2MAN@
+RT_LIBS = @RT_LIBS@
+SED = @SED@
+SET_MAKE = @SET_MAKE@
+SHELL = @SHELL@
+SNMP_CFLAGS = @SNMP_CFLAGS@
+SNMP_LIBS = @SNMP_LIBS@
+SOL_LIBS = @SOL_LIBS@
+STRIP = @STRIP@
+TCL_BIN_DIR = @TCL_BIN_DIR@
+TCL_INCLUDE_SPEC = @TCL_INCLUDE_SPEC@
+TCL_LIB_FILE = @TCL_LIB_FILE@
+TCL_LIB_FLAG = @TCL_LIB_FLAG@
+TCL_LIB_SPEC = @TCL_LIB_SPEC@
+TCL_PATCH_LEVEL = @TCL_PATCH_LEVEL@
+TCL_SRC_DIR = @TCL_SRC_DIR@
+TCL_STUB_LIB_FILE = @TCL_STUB_LIB_FILE@
+TCL_STUB_LIB_FLAG = @TCL_STUB_LIB_FLAG@
+TCL_STUB_LIB_SPEC = @TCL_STUB_LIB_SPEC@
+TCL_VERSION = @TCL_VERSION@
+UDPSPOOF_CFLAGS = @UDPSPOOF_CFLAGS@
+UDPSPOOF_LIBS = @UDPSPOOF_LIBS@
+VALGRIND = @VALGRIND@
+VERSION = @VERSION@
+WARN_CFLAGS = @WARN_CFLAGS@
+WARN_LDFLAGS = @WARN_LDFLAGS@
+WARN_SCANNERFLAGS = @WARN_SCANNERFLAGS@
+WGET = @WGET@
+YACC = @YACC@
+YACC_FOUND = @YACC_FOUND@
+YFLAGS = @YFLAGS@
+ZLIB_CFLAGS = @ZLIB_CFLAGS@
+ZLIB_LIBS = @ZLIB_LIBS@
+ZSTD_CFLAGS = @ZSTD_CFLAGS@
+ZSTD_LIBS = @ZSTD_LIBS@
+abs_builddir = @abs_builddir@
+abs_srcdir = @abs_srcdir@
+abs_top_builddir = @abs_top_builddir@
+abs_top_srcdir = @abs_top_srcdir@
+ac_ct_AR = @ac_ct_AR@
+ac_ct_CC = @ac_ct_CC@
+ac_ct_DUMPBIN = @ac_ct_DUMPBIN@
+am__include = @am__include@
+am__leading_dot = @am__leading_dot@
+am__quote = @am__quote@
+am__tar = @am__tar@
+am__untar = @am__untar@
+bindir = @bindir@
+build = @build@
+build_alias = @build_alias@
+build_cpu = @build_cpu@
+build_os = @build_os@
+build_vendor = @build_vendor@
+builddir = @builddir@
+datadir = @datadir@
+datarootdir = @datarootdir@
+docdir = @docdir@
+dvidir = @dvidir@
+exec_prefix = @exec_prefix@
+host = @host@
+host_alias = @host_alias@
+host_cpu = @host_cpu@
+host_os = @host_os@
+host_vendor = @host_vendor@
+htmldir = @htmldir@
+includedir = @includedir@
+infodir = @infodir@
+install_sh = @install_sh@
+libdir = @libdir@
+libexecdir = @libexecdir@
+localedir = @localedir@
+localstatedir = @localstatedir@
+mandir = @mandir@
+mkdir_p = @mkdir_p@
+moddirs = @moddirs@
+oldincludedir = @oldincludedir@
+pdfdir = @pdfdir@
+pkgpyexecdir = @pkgpyexecdir@
+pkgpythondir = @pkgpythondir@
+prefix = @prefix@
+program_transform_name = @program_transform_name@
+psdir = @psdir@
+pyexecdir = @pyexecdir@
+pythondir = @pythondir@
+runstatedir = @runstatedir@
+sbindir = @sbindir@
+sharedstatedir = @sharedstatedir@
+srcdir = @srcdir@
+sysconfdir = @sysconfdir@
+target_alias = @target_alias@
+top_build_prefix = @top_build_prefix@
+top_builddir = @top_builddir@
+top_srcdir = @top_srcdir@
+pkglib_LTLIBRARIES = imptcp.la
+imptcp_la_SOURCES = imptcp.c
+imptcp_la_CPPFLAGS = -I$(top_srcdir) $(PTHREADS_CFLAGS) $(RSRT_CFLAGS)
+imptcp_la_LDFLAGS = -module -avoid-version
+imptcp_la_LIBADD =
+all: all-am
+
+.SUFFIXES:
+.SUFFIXES: .c .lo .o .obj
+$(srcdir)/Makefile.in: $(srcdir)/Makefile.am $(am__configure_deps)
+ @for dep in $?; do \
+ case '$(am__configure_deps)' in \
+ *$$dep*) \
+ ( cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh ) \
+ && { if test -f $@; then exit 0; else break; fi; }; \
+ exit 1;; \
+ esac; \
+ done; \
+ echo ' cd $(top_srcdir) && $(AUTOMAKE) --gnu plugins/imptcp/Makefile'; \
+ $(am__cd) $(top_srcdir) && \
+ $(AUTOMAKE) --gnu plugins/imptcp/Makefile
+Makefile: $(srcdir)/Makefile.in $(top_builddir)/config.status
+ @case '$?' in \
+ *config.status*) \
+ cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh;; \
+ *) \
+ echo ' cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__maybe_remake_depfiles)'; \
+ cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__maybe_remake_depfiles);; \
+ esac;
+
+$(top_builddir)/config.status: $(top_srcdir)/configure $(CONFIG_STATUS_DEPENDENCIES)
+ cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh
+
+$(top_srcdir)/configure: $(am__configure_deps)
+ cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh
+$(ACLOCAL_M4): $(am__aclocal_m4_deps)
+ cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh
+$(am__aclocal_m4_deps):
+
+install-pkglibLTLIBRARIES: $(pkglib_LTLIBRARIES)
+ @$(NORMAL_INSTALL)
+ @list='$(pkglib_LTLIBRARIES)'; test -n "$(pkglibdir)" || list=; \
+ list2=; for p in $$list; do \
+ if test -f $$p; then \
+ list2="$$list2 $$p"; \
+ else :; fi; \
+ done; \
+ test -z "$$list2" || { \
+ echo " $(MKDIR_P) '$(DESTDIR)$(pkglibdir)'"; \
+ $(MKDIR_P) "$(DESTDIR)$(pkglibdir)" || exit 1; \
+ echo " $(LIBTOOL) $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=install $(INSTALL) $(INSTALL_STRIP_FLAG) $$list2 '$(DESTDIR)$(pkglibdir)'"; \
+ $(LIBTOOL) $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=install $(INSTALL) $(INSTALL_STRIP_FLAG) $$list2 "$(DESTDIR)$(pkglibdir)"; \
+ }
+
+uninstall-pkglibLTLIBRARIES:
+ @$(NORMAL_UNINSTALL)
+ @list='$(pkglib_LTLIBRARIES)'; test -n "$(pkglibdir)" || list=; \
+ for p in $$list; do \
+ $(am__strip_dir) \
+ echo " $(LIBTOOL) $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=uninstall rm -f '$(DESTDIR)$(pkglibdir)/$$f'"; \
+ $(LIBTOOL) $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=uninstall rm -f "$(DESTDIR)$(pkglibdir)/$$f"; \
+ done
+
+clean-pkglibLTLIBRARIES:
+ -test -z "$(pkglib_LTLIBRARIES)" || rm -f $(pkglib_LTLIBRARIES)
+ @list='$(pkglib_LTLIBRARIES)'; \
+ locs=`for p in $$list; do echo $$p; done | \
+ sed 's|^[^/]*$$|.|; s|/[^/]*$$||; s|$$|/so_locations|' | \
+ sort -u`; \
+ test -z "$$locs" || { \
+ echo rm -f $${locs}; \
+ rm -f $${locs}; \
+ }
+
+imptcp.la: $(imptcp_la_OBJECTS) $(imptcp_la_DEPENDENCIES) $(EXTRA_imptcp_la_DEPENDENCIES)
+ $(AM_V_CCLD)$(imptcp_la_LINK) -rpath $(pkglibdir) $(imptcp_la_OBJECTS) $(imptcp_la_LIBADD) $(LIBS)
+
+mostlyclean-compile:
+ -rm -f *.$(OBJEXT)
+
+distclean-compile:
+ -rm -f *.tab.c
+
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/imptcp_la-imptcp.Plo@am__quote@ # am--include-marker
+
+$(am__depfiles_remade):
+ @$(MKDIR_P) $(@D)
+ @echo '# dummy' >$@-t && $(am__mv) $@-t $@
+
+am--depfiles: $(am__depfiles_remade)
+
+.c.o:
+@am__fastdepCC_TRUE@ $(AM_V_CC)depbase=`echo $@ | sed 's|[^/]*$$|$(DEPDIR)/&|;s|\.o$$||'`;\
+@am__fastdepCC_TRUE@ $(COMPILE) -MT $@ -MD -MP -MF $$depbase.Tpo -c -o $@ $< &&\
+@am__fastdepCC_TRUE@ $(am__mv) $$depbase.Tpo $$depbase.Po
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ $(AM_V_CC)source='$<' object='$@' libtool=no @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
+@am__fastdepCC_FALSE@ $(AM_V_CC@am__nodep@)$(COMPILE) -c -o $@ $<
+
+.c.obj:
+@am__fastdepCC_TRUE@ $(AM_V_CC)depbase=`echo $@ | sed 's|[^/]*$$|$(DEPDIR)/&|;s|\.obj$$||'`;\
+@am__fastdepCC_TRUE@ $(COMPILE) -MT $@ -MD -MP -MF $$depbase.Tpo -c -o $@ `$(CYGPATH_W) '$<'` &&\
+@am__fastdepCC_TRUE@ $(am__mv) $$depbase.Tpo $$depbase.Po
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ $(AM_V_CC)source='$<' object='$@' libtool=no @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
+@am__fastdepCC_FALSE@ $(AM_V_CC@am__nodep@)$(COMPILE) -c -o $@ `$(CYGPATH_W) '$<'`
+
+.c.lo:
+@am__fastdepCC_TRUE@ $(AM_V_CC)depbase=`echo $@ | sed 's|[^/]*$$|$(DEPDIR)/&|;s|\.lo$$||'`;\
+@am__fastdepCC_TRUE@ $(LTCOMPILE) -MT $@ -MD -MP -MF $$depbase.Tpo -c -o $@ $< &&\
+@am__fastdepCC_TRUE@ $(am__mv) $$depbase.Tpo $$depbase.Plo
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ $(AM_V_CC)source='$<' object='$@' libtool=yes @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
+@am__fastdepCC_FALSE@ $(AM_V_CC@am__nodep@)$(LTCOMPILE) -c -o $@ $<
+
+imptcp_la-imptcp.lo: imptcp.c
+@am__fastdepCC_TRUE@ $(AM_V_CC)$(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(imptcp_la_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT imptcp_la-imptcp.lo -MD -MP -MF $(DEPDIR)/imptcp_la-imptcp.Tpo -c -o imptcp_la-imptcp.lo `test -f 'imptcp.c' || echo '$(srcdir)/'`imptcp.c
+@am__fastdepCC_TRUE@ $(AM_V_at)$(am__mv) $(DEPDIR)/imptcp_la-imptcp.Tpo $(DEPDIR)/imptcp_la-imptcp.Plo
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ $(AM_V_CC)source='imptcp.c' object='imptcp_la-imptcp.lo' libtool=yes @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
+@am__fastdepCC_FALSE@ $(AM_V_CC@am__nodep@)$(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(imptcp_la_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o imptcp_la-imptcp.lo `test -f 'imptcp.c' || echo '$(srcdir)/'`imptcp.c
+
+mostlyclean-libtool:
+ -rm -f *.lo
+
+clean-libtool:
+ -rm -rf .libs _libs
+
+ID: $(am__tagged_files)
+ $(am__define_uniq_tagged_files); mkid -fID $$unique
+tags: tags-am
+TAGS: tags
+
+tags-am: $(TAGS_DEPENDENCIES) $(am__tagged_files)
+ set x; \
+ here=`pwd`; \
+ $(am__define_uniq_tagged_files); \
+ shift; \
+ if test -z "$(ETAGS_ARGS)$$*$$unique"; then :; else \
+ test -n "$$unique" || unique=$$empty_fix; \
+ if test $$# -gt 0; then \
+ $(ETAGS) $(ETAGSFLAGS) $(AM_ETAGSFLAGS) $(ETAGS_ARGS) \
+ "$$@" $$unique; \
+ else \
+ $(ETAGS) $(ETAGSFLAGS) $(AM_ETAGSFLAGS) $(ETAGS_ARGS) \
+ $$unique; \
+ fi; \
+ fi
+ctags: ctags-am
+
+CTAGS: ctags
+ctags-am: $(TAGS_DEPENDENCIES) $(am__tagged_files)
+ $(am__define_uniq_tagged_files); \
+ test -z "$(CTAGS_ARGS)$$unique" \
+ || $(CTAGS) $(CTAGSFLAGS) $(AM_CTAGSFLAGS) $(CTAGS_ARGS) \
+ $$unique
+
+GTAGS:
+ here=`$(am__cd) $(top_builddir) && pwd` \
+ && $(am__cd) $(top_srcdir) \
+ && gtags -i $(GTAGS_ARGS) "$$here"
+cscopelist: cscopelist-am
+
+cscopelist-am: $(am__tagged_files)
+ list='$(am__tagged_files)'; \
+ case "$(srcdir)" in \
+ [\\/]* | ?:[\\/]*) sdir="$(srcdir)" ;; \
+ *) sdir=$(subdir)/$(srcdir) ;; \
+ esac; \
+ for i in $$list; do \
+ if test -f "$$i"; then \
+ echo "$(subdir)/$$i"; \
+ else \
+ echo "$$sdir/$$i"; \
+ fi; \
+ done >> $(top_builddir)/cscope.files
+
+distclean-tags:
+ -rm -f TAGS ID GTAGS GRTAGS GSYMS GPATH tags
+
+distdir: $(BUILT_SOURCES)
+ $(MAKE) $(AM_MAKEFLAGS) distdir-am
+
+distdir-am: $(DISTFILES)
+ @srcdirstrip=`echo "$(srcdir)" | sed 's/[].[^$$\\*]/\\\\&/g'`; \
+ topsrcdirstrip=`echo "$(top_srcdir)" | sed 's/[].[^$$\\*]/\\\\&/g'`; \
+ list='$(DISTFILES)'; \
+ dist_files=`for file in $$list; do echo $$file; done | \
+ sed -e "s|^$$srcdirstrip/||;t" \
+ -e "s|^$$topsrcdirstrip/|$(top_builddir)/|;t"`; \
+ case $$dist_files in \
+ */*) $(MKDIR_P) `echo "$$dist_files" | \
+ sed '/\//!d;s|^|$(distdir)/|;s,/[^/]*$$,,' | \
+ sort -u` ;; \
+ esac; \
+ for file in $$dist_files; do \
+ if test -f $$file || test -d $$file; then d=.; else d=$(srcdir); fi; \
+ if test -d $$d/$$file; then \
+ dir=`echo "/$$file" | sed -e 's,/[^/]*$$,,'`; \
+ if test -d "$(distdir)/$$file"; then \
+ find "$(distdir)/$$file" -type d ! -perm -700 -exec chmod u+rwx {} \;; \
+ fi; \
+ if test -d $(srcdir)/$$file && test $$d != $(srcdir); then \
+ cp -fpR $(srcdir)/$$file "$(distdir)$$dir" || exit 1; \
+ find "$(distdir)/$$file" -type d ! -perm -700 -exec chmod u+rwx {} \;; \
+ fi; \
+ cp -fpR $$d/$$file "$(distdir)$$dir" || exit 1; \
+ else \
+ test -f "$(distdir)/$$file" \
+ || cp -p $$d/$$file "$(distdir)/$$file" \
+ || exit 1; \
+ fi; \
+ done
+check-am: all-am
+check: check-am
+all-am: Makefile $(LTLIBRARIES)
+installdirs:
+ for dir in "$(DESTDIR)$(pkglibdir)"; do \
+ test -z "$$dir" || $(MKDIR_P) "$$dir"; \
+ done
+install: install-am
+install-exec: install-exec-am
+install-data: install-data-am
+uninstall: uninstall-am
+
+install-am: all-am
+ @$(MAKE) $(AM_MAKEFLAGS) install-exec-am install-data-am
+
+installcheck: installcheck-am
+install-strip:
+ if test -z '$(STRIP)'; then \
+ $(MAKE) $(AM_MAKEFLAGS) INSTALL_PROGRAM="$(INSTALL_STRIP_PROGRAM)" \
+ install_sh_PROGRAM="$(INSTALL_STRIP_PROGRAM)" INSTALL_STRIP_FLAG=-s \
+ install; \
+ else \
+ $(MAKE) $(AM_MAKEFLAGS) INSTALL_PROGRAM="$(INSTALL_STRIP_PROGRAM)" \
+ install_sh_PROGRAM="$(INSTALL_STRIP_PROGRAM)" INSTALL_STRIP_FLAG=-s \
+ "INSTALL_PROGRAM_ENV=STRIPPROG='$(STRIP)'" install; \
+ fi
+mostlyclean-generic:
+
+clean-generic:
+
+distclean-generic:
+ -test -z "$(CONFIG_CLEAN_FILES)" || rm -f $(CONFIG_CLEAN_FILES)
+ -test . = "$(srcdir)" || test -z "$(CONFIG_CLEAN_VPATH_FILES)" || rm -f $(CONFIG_CLEAN_VPATH_FILES)
+
+maintainer-clean-generic:
+ @echo "This command is intended for maintainers to use"
+ @echo "it deletes files that may require special tools to rebuild."
+clean: clean-am
+
+clean-am: clean-generic clean-libtool clean-pkglibLTLIBRARIES \
+ mostlyclean-am
+
+distclean: distclean-am
+ -rm -f ./$(DEPDIR)/imptcp_la-imptcp.Plo
+ -rm -f Makefile
+distclean-am: clean-am distclean-compile distclean-generic \
+ distclean-tags
+
+dvi: dvi-am
+
+dvi-am:
+
+html: html-am
+
+html-am:
+
+info: info-am
+
+info-am:
+
+install-data-am:
+
+install-dvi: install-dvi-am
+
+install-dvi-am:
+
+install-exec-am: install-pkglibLTLIBRARIES
+
+install-html: install-html-am
+
+install-html-am:
+
+install-info: install-info-am
+
+install-info-am:
+
+install-man:
+
+install-pdf: install-pdf-am
+
+install-pdf-am:
+
+install-ps: install-ps-am
+
+install-ps-am:
+
+installcheck-am:
+
+maintainer-clean: maintainer-clean-am
+ -rm -f ./$(DEPDIR)/imptcp_la-imptcp.Plo
+ -rm -f Makefile
+maintainer-clean-am: distclean-am maintainer-clean-generic
+
+mostlyclean: mostlyclean-am
+
+mostlyclean-am: mostlyclean-compile mostlyclean-generic \
+ mostlyclean-libtool
+
+pdf: pdf-am
+
+pdf-am:
+
+ps: ps-am
+
+ps-am:
+
+uninstall-am: uninstall-pkglibLTLIBRARIES
+
+.MAKE: install-am install-strip
+
+.PHONY: CTAGS GTAGS TAGS all all-am am--depfiles check check-am clean \
+ clean-generic clean-libtool clean-pkglibLTLIBRARIES \
+ cscopelist-am ctags ctags-am distclean distclean-compile \
+ distclean-generic distclean-libtool distclean-tags distdir dvi \
+ dvi-am html html-am info info-am install install-am \
+ install-data install-data-am install-dvi install-dvi-am \
+ install-exec install-exec-am install-html install-html-am \
+ install-info install-info-am install-man install-pdf \
+ install-pdf-am install-pkglibLTLIBRARIES install-ps \
+ install-ps-am install-strip installcheck installcheck-am \
+ installdirs maintainer-clean maintainer-clean-generic \
+ mostlyclean mostlyclean-compile mostlyclean-generic \
+ mostlyclean-libtool pdf pdf-am ps ps-am tags tags-am uninstall \
+ uninstall-am uninstall-pkglibLTLIBRARIES
+
+.PRECIOUS: Makefile
+
+
+# Tell versions [3.59,3.63) of GNU make to not export all variables.
+# Otherwise a system limit (for SysV at least) may be exceeded.
+.NOEXPORT:
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
new file mode 100644
index 0000000..351dee0
--- /dev/null
+++ b/plugins/imptcp/imptcp.c
@@ -0,0 +1,2676 @@
+/* imptcp.c
+ * This is a native implementation of plain tcp. It is intentionally
+ * duplicate work (imtcp). The intent is to gain very fast and simple
+ * native ptcp support, utilizing the best interfaces Linux (no cross-
+ * platform intended!) has to offer.
+ *
+ * Note that in this module we try out some new naming conventions,
+ * so it may look a bit "different" from the other modules. We are
+ * investigating if removing prefixes can help make code more readable.
+ *
+ * File begun on 2010-08-10 by RGerhards
+ *
+ * Copyright 2007-2022 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of rsyslog.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * -or-
+ * see COPYING.ASL20 in the source distribution
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#if !defined(HAVE_EPOLL_CREATE)
+# error imptcp requires OS support for epoll - can not build
+ /* imptcp gains speed by using modern Linux capabilities. As such,
+ * it can only be build on platforms supporting the epoll API.
+ */
+#endif
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <string.h>
+#include <errno.h>
+#include <unistd.h>
+#include <stdarg.h>
+#include <ctype.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <sys/epoll.h>
+#include <sys/queue.h>
+#include <netinet/tcp.h>
+#include <stdint.h>
+#include <zlib.h>
+#include <sys/stat.h>
+#include <regex.h>
+#if HAVE_FCNTL_H
+#include <fcntl.h>
+#endif
+#ifdef HAVE_SYS_PRCTL_H
+# include <sys/prctl.h>
+#endif
+#include "rsyslog.h"
+#include "cfsysline.h"
+#include "prop.h"
+#include "dirty.h"
+#include "module-template.h"
+#include "unicode-helper.h"
+#include "glbl.h"
+#include "errmsg.h"
+#include "srUtils.h"
+#include "datetime.h"
+#include "ruleset.h"
+#include "msg.h"
+#include "parserif.h"
+#include "statsobj.h"
+#include "ratelimit.h"
+#include "net.h" /* for permittedPeers, may be removed when this is removed */
+
+/* the define is from tcpsrv.h, we need to find a new (but easier!!!) abstraction layer some time ... */
+#define TCPSRV_NO_ADDTL_DELIMITER -1 /* specifies that no additional delimiter is to be used in TCP framing */
+
+MODULE_TYPE_INPUT
+MODULE_TYPE_NOKEEP
+MODULE_CNFNAME("imptcp")
+
+/* static data */
+DEF_IMOD_STATIC_DATA
+DEFobjCurrIf(glbl)
+DEFobjCurrIf(net)
+DEFobjCurrIf(prop)
+DEFobjCurrIf(datetime)
+DEFobjCurrIf(ruleset)
+DEFobjCurrIf(statsobj)
+
+/* forward references */
+static void * wrkr(void *myself);
+
+/* unfortunately, on some platforms EAGAIN == EWOULDBOLOCK and so checking against
+ * both of them generates a gcc 8 warning for this reason. We do not want to disable
+ * the warning, so we need to work around this via a macro.
+ */
+#if EAGAIN == EWOULDBLOCK
+ #define CHK_EAGAIN_EWOULDBLOCK (errno == EAGAIN)
+#else
+ #define CHK_EAGAIN_EWOULDBLOCK (errno == EAGAIN | errno == EWOULDBLOCK)
+#endif /* #if EAGAIN == EWOULDBOLOCK */
+
+#define DFLT_wrkrMax 2
+#define DFLT_inlineDispatchThreshold 1
+
+#define COMPRESS_NEVER 0
+#define COMPRESS_SINGLE_MSG 1 /* old, single-message compression */
+/* all other settings are for stream-compression */
+#define COMPRESS_STREAM_ALWAYS 2
+
+/* config settings */
+typedef struct configSettings_s {
+ int bKeepAlive; /* support keep-alive packets */
+ int iKeepAliveIntvl;
+ int iKeepAliveProbes;
+ int iKeepAliveTime;
+ int bEmitMsgOnClose; /* emit an informational message on close by remote peer */
+ int bEmitMsgOnOpen;
+ int bSuppOctetFram; /* support octet-counted framing? */
+ int iAddtlFrameDelim; /* addtl frame delimiter, e.g. for netscreen, default none */
+ int maxFrameSize;
+ uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */
+ uchar *lstnIP; /* which IP we should listen on? */
+ uchar *pszBindRuleset;
+ int wrkrMax; /* max number of workers (actually "helper workers") */
+ int iTCPSessMax; /* max open connections per instance */
+} configSettings_t;
+static configSettings_t cs;
+
+struct instanceConf_s {
+ int bKeepAlive; /* support keep-alive packets */
+ int iKeepAliveIntvl;
+ int iKeepAliveProbes;
+ int iKeepAliveTime;
+ int bEmitMsgOnClose;
+ int bEmitMsgOnOpen;
+ int bSuppOctetFram; /* support octet-counted framing? */
+ int bSPFramingFix;
+ int iAddtlFrameDelim;
+ int socketBacklog;
+ sbool multiLine;
+ uint8_t compressionMode;
+ uchar *pszBindPort; /* port to bind to */
+ uchar *pszLstnPortFileName; /* Name of the file with dynamic port used by testbench*/
+ uchar *pszBindAddr; /* IP to bind socket to */
+ uchar *pszBindPath; /* Path to bind socket to */
+ uchar *pszBindRuleset; /* name of ruleset to bind to */
+ uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */
+ int fCreateMode; /* file creation mode for open() */
+ uid_t fileUID; /* IDs for creation */
+ gid_t fileGID;
+ int maxFrameSize;
+ int bFailOnPerms; /* fail creation if chown fails? */
+ ruleset_t *pBindRuleset; /* ruleset to bind listener to (use system default if unspecified) */
+ uchar *dfltTZ;
+ sbool bUnlink;
+ sbool discardTruncatedMsg;
+ sbool flowControl;
+ unsigned int ratelimitInterval;
+ unsigned int ratelimitBurst;
+ uchar *startRegex;
+ regex_t start_preg; /* compiled version of startRegex */
+ int iTCPSessMax; /* max open connections */
+ struct instanceConf_s *next;
+};
+
+
+struct modConfData_s {
+ rsconf_t *pConf; /* our overall config object */
+ instanceConf_t *root, *tail;
+ int wrkrMax;
+ int bProcessOnPoller;
+ int iTCPSessMax;
+ sbool configSetViaV2Method;
+};
+
+static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */
+static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current load process */
+
+/* module-global parameters */
+static struct cnfparamdescr modpdescr[] = {
+ { "threads", eCmdHdlrPositiveInt, 0 },
+ { "maxsessions", eCmdHdlrInt, 0 },
+ { "processOnPoller", eCmdHdlrBinary, 0 }
+};
+static struct cnfparamblk modpblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(modpdescr)/sizeof(struct cnfparamdescr),
+ modpdescr
+ };
+
+/* input instance parameters */
+static struct cnfparamdescr inppdescr[] = {
+ { "port", eCmdHdlrString, 0 }, /* legacy: InputTCPServerRun */
+ { "address", eCmdHdlrString, 0 },
+ { "path", eCmdHdlrString, 0 },
+ { "unlink", eCmdHdlrBinary, 0 },
+ { "discardtruncatedmsg", eCmdHdlrBinary, 0 },
+ { "fileowner", eCmdHdlrUID, 0 },
+ { "fileownernum", eCmdHdlrInt, 0 },
+ { "filegroup", eCmdHdlrGID, 0 },
+ { "filegroupnum", eCmdHdlrInt, 0 },
+ { "filecreatemode", eCmdHdlrFileCreateMode, 0 },
+ { "failonchownfailure", eCmdHdlrBinary, 0 },
+ { "flowcontrol", eCmdHdlrBinary, 0 },
+ { "name", eCmdHdlrString, 0 },
+ { "maxframesize", eCmdHdlrInt, 0 },
+ { "framing.delimiter.regex", eCmdHdlrString, 0 },
+ { "ruleset", eCmdHdlrString, 0 },
+ { "defaulttz", eCmdHdlrString, 0 },
+ { "supportoctetcountedframing", eCmdHdlrBinary, 0 },
+ { "framingfix.cisco.asa", eCmdHdlrBinary, 0 },
+ { "maxsessions", eCmdHdlrInt, 0 },
+ { "notifyonconnectionclose", eCmdHdlrBinary, 0 },
+ { "notifyonconnectionopen", eCmdHdlrBinary, 0 },
+ { "compression.mode", eCmdHdlrGetWord, 0 },
+ { "keepalive", eCmdHdlrBinary, 0 },
+ { "keepalive.probes", eCmdHdlrInt, 0 },
+ { "keepalive.time", eCmdHdlrInt, 0 },
+ { "keepalive.interval", eCmdHdlrInt, 0 },
+ { "addtlframedelimiter", eCmdHdlrInt, 0 },
+ { "ratelimit.interval", eCmdHdlrInt, 0 },
+ { "ratelimit.burst", eCmdHdlrInt, 0 },
+ { "multiline", eCmdHdlrBinary, 0 },
+ { "listenportfilename", eCmdHdlrString, 0 },
+ { "socketbacklog", eCmdHdlrInt, 0 }
+};
+static struct cnfparamblk inppblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(inppdescr)/sizeof(struct cnfparamdescr),
+ inppdescr
+ };
+
+#include "im-helper.h" /* must be included AFTER the type definitions! */
+static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */
+
+/* data elements describing our running config */
+typedef struct ptcpsrv_s ptcpsrv_t;
+typedef struct ptcplstn_s ptcplstn_t;
+typedef struct ptcpsess_s ptcpsess_t;
+typedef struct epolld_s epolld_t;
+
+/* the ptcp server (listener) object
+ * Note that the object contains support for forming a linked list
+ * of them. It does not make sense to do this seperately.
+ */
+struct ptcpsrv_s {
+ ptcpsrv_t *pNext; /* linked list maintenance */
+ uchar *port; /* Port to listen to */
+ uchar *lstnIP; /* which IP we should listen on? */
+ uchar *path; /* Use a unix socket instead */
+ int fCreateMode; /* file creation mode for open() */
+ uid_t fileUID; /* IDs for creation */
+ gid_t fileGID;
+ int maxFrameSize;
+ int bFailOnPerms; /* fail creation if chown fails? */
+ sbool bUnixSocket;
+ int socketBacklog;
+ uchar *pszLstnPortFileName;
+ int iAddtlFrameDelim;
+ sbool multiLine;
+ int iKeepAliveIntvl;
+ int iKeepAliveProbes;
+ int iKeepAliveTime;
+ uint8_t compressionMode;
+ uchar *pszInputName;
+ uchar *dfltTZ;
+ prop_t *pInputName; /* InputName in (fast to process) property format */
+ ruleset_t *pRuleset;
+ ptcplstn_t *pLstn; /* root of our listeners */
+ ptcpsess_t *pSess; /* root of our sessions */
+ int iTCPSessCnt;
+ int iTCPSessMax;
+ pthread_mutex_t mutSessLst;
+ sbool bKeepAlive; /* support keep-alive packets */
+ sbool bEmitMsgOnClose;
+ sbool bEmitMsgOnOpen;
+ sbool bSuppOctetFram;
+ sbool bSPFramingFix;
+ sbool bUnlink;
+ sbool discardTruncatedMsg;
+ sbool flowControl;
+ ratelimit_t *ratelimiter;
+ instanceConf_t *inst;
+};
+
+/* the ptcp session object. Describes a single active session.
+ * includes support for doubly-linked list.
+ */
+struct ptcpsess_s {
+ ptcplstn_t *pLstn; /* our listener */
+ ptcpsess_t *prev, *next;
+ int sock;
+ epolld_t *epd;
+ sbool bzInitDone; /* did we do an init of zstrm already? */
+ z_stream zstrm; /* zip stream to use for tcp compression */
+ uint8_t compressionMode;
+ int iMsg; /* index of next char to store in msg */
+ int iCurrLine; /* 2nd char of current line in regex framing mode */
+ int bAtStrtOfFram; /* are we at the very beginning of a new frame? */
+ sbool bSuppOctetFram; /**< copy from listener, to speed up access */
+ sbool bSPFramingFix;
+ enum {
+ eAtStrtFram,
+ eInOctetCnt,
+ eInMsg,
+ eInMsgTruncation
+ } inputState; /* our current state */
+ int iOctetsRemain; /* Number of Octets remaining in message */
+ TCPFRAMINGMODE eFraming;
+ uchar *pMsg; /* message (fragment) received */
+ uchar *pMsg_save; /* message (fragment) save area in regex framing mode */
+ prop_t *peerName; /* host name we received messages from */
+ prop_t *peerIP;
+ const uchar *startRegex;/* cache for performance reasons */
+ int iAddtlFrameDelim; /* cache for performance reasons */
+};
+
+
+/* the ptcp listener object. Describes a single active listener.
+ */
+struct ptcplstn_s {
+ ptcpsrv_t *pSrv; /* our server */
+ ptcplstn_t *prev, *next;
+ int sock;
+ sbool bSuppOctetFram;
+ sbool bSPFramingFix;
+ epolld_t *epd;
+ statsobj_t *stats; /* listener stats */
+ intctr_t rcvdBytes;
+ intctr_t rcvdDecompressed;
+ STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
+ STATSCOUNTER_DEF(ctrSessOpen, mutCtrSessOpen)
+ STATSCOUNTER_DEF(ctrSessOpenErr, mutCtrSessOpenErr)
+ STATSCOUNTER_DEF(ctrSessClose, mutCtrSessClose)
+ DEF_ATOMIC_HELPER_MUT64(mut_rcvdBytes)
+};
+
+
+/* The following structure controls the worker threads. Global data is
+ * needed for their access.
+ */
+static struct wrkrInfo_s {
+ pthread_t tid; /* the worker's thread ID */
+ long long unsigned numCalled; /* how often was this called */
+ int wrkrIdx; /* index for this worker - shortcut for thread name */
+} *wrkrInfo;
+static int wrkrRunning;
+
+
+/* type of object stored in epoll descriptor */
+typedef enum {
+ epolld_lstn,
+ epolld_sess
+} epolld_type_t;
+
+/* an epoll descriptor. contains all information necessary to process
+ * the result of epoll.
+ */
+struct epolld_s {
+ epolld_type_t typ;
+ void *ptr;
+ int sock;
+ struct epoll_event ev;
+};
+
+typedef struct io_req_s {
+ STAILQ_ENTRY(io_req_s) link;
+ epolld_t *epd;
+} io_req_t;
+
+typedef struct io_q_s {
+ STAILQ_HEAD(ioq_s, io_req_s) q;
+ STATSCOUNTER_DEF(ctrEnq, mutCtrEnq);
+ int ctrMaxSz; //TODO: discuss potential problems around concurrent reads and writes
+ int sz; //current q size
+ statsobj_t *stats;
+ pthread_mutex_t mut;
+ pthread_cond_t wakeup_worker;
+} io_q_t;
+
+/* global data */
+pthread_attr_t wrkrThrdAttr; /* Attribute for session threads; read only after startup */
+static ptcpsrv_t *pSrvRoot = NULL;
+static int epollfd = -1; /* (sole) descriptor for epoll */
+static int iMaxLine; /* maximum size of a single message */
+static io_q_t io_q;
+
+/* forward definitions */
+static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal);
+static rsRetVal addLstn(ptcpsrv_t *pSrv, int sock, int isIPv6);
+
+
+/* function to suppress TSAN known-good case
+ * We do not use a mutex an epd, but we do always access it in
+ * pure sequence. Adding a mutex just to cover this "cosmetic"
+ * would result in uncesseary performance penalty.
+ */
+static void ATTR_NONNULL()
+imptcp_destruct_epd(ptcpsess_t *const pSess) {
+ free(pSess->epd);
+ pSess->epd = NULL;
+}
+
+/* some simple constructors/destructors */
+static void ATTR_NONNULL()
+destructSess(ptcpsess_t *const pSess)
+{
+ imptcp_destruct_epd(pSess);
+ free(pSess->pMsg_save);
+ free(pSess->pMsg);
+ prop.Destruct(&pSess->peerName);
+ prop.Destruct(&pSess->peerIP);
+ /* TODO: make these inits compile-time switch depending: */
+ pSess->pMsg = NULL;
+ free(pSess);
+}
+
+/* remove session from server */
+static void
+unlinkSess(ptcpsess_t *pSess) {
+ ptcpsrv_t *pSrv = pSess->pLstn->pSrv;
+ pthread_mutex_lock(&pSrv->mutSessLst);
+ pSrv->iTCPSessCnt--;
+ /* finally unlink session from structures */
+ if(pSess->next != NULL)
+ pSess->next->prev = pSess->prev;
+ if(pSess->prev == NULL) {
+ /* need to update root! */
+ pSrv->pSess = pSess->next;
+ } else {
+ pSess->prev->next = pSess->next;
+ }
+ pthread_mutex_unlock(&pSrv->mutSessLst);
+}
+
+static void
+destructSrv(ptcpsrv_t *pSrv)
+{
+ if(pSrv->ratelimiter != NULL)
+ ratelimitDestruct(pSrv->ratelimiter);
+ if(pSrv->pInputName != NULL)
+ prop.Destruct(&pSrv->pInputName);
+ pthread_mutex_destroy(&pSrv->mutSessLst);
+ free(pSrv->pszInputName);
+ free(pSrv->port);
+ free(pSrv->pszLstnPortFileName);
+ free(pSrv->path);
+ free(pSrv->lstnIP);
+ free(pSrv);
+}
+
+/****************************************** TCP SUPPORT FUNCTIONS ***********************************/
+/* We may later think about moving this into a helper library again. But the whole point
+ * so far was to keep everything related close togehter. -- rgerhards, 2010-08-10
+ */
+
+static rsRetVal startupUXSrv(ptcpsrv_t *pSrv) {
+ DEFiRet;
+ int sock;
+ int sockflags;
+ struct sockaddr_un local;
+
+ uchar *path = pSrv->path == NULL ? UCHAR_CONSTANT("") : pSrv->path;
+ DBGPRINTF("imptcp: creating listen unix socket at %s\n", path);
+
+ sock = socket(AF_UNIX, SOCK_STREAM, 0);
+ if(sock < 0) {
+ LogError(errno, RS_RET_ERR_CRE_AFUX, "imptcp: error creating unix socket");
+ ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX);
+ }
+
+ local.sun_family = AF_UNIX;
+ strncpy(local.sun_path, (char*) path, sizeof(local.sun_path)-1);
+ if (pSrv->bUnlink) {
+ unlink(local.sun_path);
+ }
+
+ /* We use non-blocking IO! */
+ if ((sockflags = fcntl(sock, F_GETFL)) != -1) {
+ sockflags |= O_NONBLOCK;
+ /* SETFL could fail too, so get it caught by the subsequent error check. */
+ sockflags = fcntl(sock, F_SETFL, sockflags);
+ }
+
+ if (sockflags == -1) {
+ LogError(errno, RS_RET_ERR_CRE_AFUX, "imptcp: error setting fcntl(O_NONBLOCK) on unix socket");
+ ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX);
+ }
+
+ if (bind(sock, (struct sockaddr *)&local, SUN_LEN(&local)) < 0) {
+ LogError(errno, RS_RET_ERR_CRE_AFUX, "imptcp: error while binding unix socket");
+ ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX);
+ }
+
+ if (listen(sock, pSrv->socketBacklog) < 0) {
+ LogError(errno, RS_RET_ERR_CRE_AFUX, "imptcp: unix socket listen error");
+ ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX);
+ }
+
+ if(chown(local.sun_path, pSrv->fileUID, pSrv->fileGID) != 0) {
+ if(pSrv->bFailOnPerms) {
+ LogError(errno, RS_RET_ERR_CRE_AFUX, "imptcp: unix socket chown error");
+ ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX);
+ }
+ }
+
+ if(chmod(local.sun_path, pSrv->fCreateMode) != 0) {
+ if(pSrv->bFailOnPerms) {
+ LogError(errno, RS_RET_ERR_CRE_AFUX, "imptcp: unix socket chmod error");
+ ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX);
+ }
+ }
+
+ CHKiRet(addLstn(pSrv, sock, 0));
+
+finalize_it:
+ if (iRet != RS_RET_OK) {
+ if (sock != -1) {
+ close(sock);
+ }
+ }
+
+ RETiRet;
+}
+
+/* Start up a server. That means all of its listeners are created.
+ * Does NOT yet accept/process any incoming data (but binds ports). Hint: this
+ * code is to be executed before dropping privileges.
+ */
+PRAGMA_DIAGNOSTIC_PUSH
+static rsRetVal
+startupSrv(ptcpsrv_t *pSrv)
+{
+ DEFiRet;
+ int error, maxs, on = 1;
+ int sock = -1;
+ int numSocks;
+ int sockflags;
+ struct addrinfo hints, *res = NULL, *r;
+ uchar *lstnIP;
+ int isIPv6 = 0;
+ int port_override = 0; /* if dyn port (0): use this for actually bound port */
+ union {
+ struct sockaddr *sa;
+ struct sockaddr_in *ipv4;
+ struct sockaddr_in6 *ipv6;
+ } savecast;
+
+ if (pSrv->bUnixSocket) {
+ return startupUXSrv(pSrv);
+ }
+
+ lstnIP = pSrv->lstnIP == NULL ? UCHAR_CONSTANT("") : pSrv->lstnIP;
+
+ DBGPRINTF("imptcp: creating listen socket on server '%s', port %s\n", lstnIP, pSrv->port);
+
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_family = glbl.GetDefPFFamily(runModConf->pConf);
+ hints.ai_socktype = SOCK_STREAM;
+
+ error = getaddrinfo((char*)pSrv->lstnIP, (char*) pSrv->port, &hints, &res);
+ if(error) {
+ DBGPRINTF("error %d querying server '%s', port '%s'\n", error, pSrv->lstnIP, pSrv->port);
+ ABORT_FINALIZE(RS_RET_INVALID_PORT);
+ }
+
+ /* Count max number of sockets we may open */
+ for(maxs = 0, r = res; r != NULL ; r = r->ai_next, maxs++) {
+ /* EMPTY */;
+ }
+
+ numSocks = 0; /* num of sockets counter at start of array */
+ for(r = res; r != NULL ; r = r->ai_next) {
+ if(port_override != 0) {
+ savecast.sa = (struct sockaddr*)r->ai_addr;
+ if(r->ai_family == AF_INET6) {
+ savecast.ipv6->sin6_port = port_override;
+ } else {
+ savecast.ipv4->sin_port = port_override;
+ }
+ }
+ sock = socket(r->ai_family, r->ai_socktype, r->ai_protocol);
+ if(sock < 0) {
+ if(!(r->ai_family == PF_INET6 && errno == EAFNOSUPPORT)) {
+ DBGPRINTF("error %d creating tcp listen socket", errno);
+ /* it is debatable if PF_INET with EAFNOSUPPORT should
+ * also be ignored...
+ */
+ }
+ continue;
+ }
+
+ if(r->ai_family == AF_INET6) {
+ isIPv6 = 1;
+#ifdef IPV6_V6ONLY
+ int iOn = 1;
+ if(setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&iOn, sizeof (iOn)) < 0) {
+ close(sock);
+ sock = -1;
+ continue;
+ }
+#endif
+ } else {
+ isIPv6 = 0;
+ }
+
+ if(setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof(on)) < 0 ) {
+ DBGPRINTF("error %d setting tcp socket option\n", errno);
+ close(sock);
+ sock = -1;
+ continue;
+ }
+
+ /* We use non-blocking IO! */
+ if((sockflags = fcntl(sock, F_GETFL)) != -1) {
+ sockflags |= O_NONBLOCK;
+ /* SETFL could fail too, so get it caught by the subsequent
+ * error check.
+ */
+ sockflags = fcntl(sock, F_SETFL, sockflags);
+ }
+ if(sockflags == -1) {
+ DBGPRINTF("error %d setting fcntl(O_NONBLOCK) on tcp socket", errno);
+ close(sock);
+ sock = -1;
+ continue;
+ }
+
+
+
+ /* We need to enable BSD compatibility. Otherwise an attacker
+ * could flood our log files by sending us tons of ICMP errors.
+ */
+#if !defined (_AIX)
+#ifndef BSD
+ if(net.should_use_so_bsdcompat()) {
+ if (setsockopt(sock, SOL_SOCKET, SO_BSDCOMPAT,
+ (char *) &on, sizeof(on)) < 0) {
+ LogError(errno, NO_ERRCODE, "TCP setsockopt(BSDCOMPAT)");
+ close(sock);
+ sock = -1;
+ continue;
+ }
+ }
+#endif
+#endif
+ if( (bind(sock, r->ai_addr, r->ai_addrlen) < 0)
+#ifndef IPV6_V6ONLY
+ && (errno != EADDRINUSE)
+#endif
+ ) {
+ /* TODO: check if *we* bound the socket - else we *have* an error! */
+ LogError(errno, NO_ERRCODE, "Error while binding tcp socket");
+ close(sock);
+ sock = -1;
+ continue;
+ }
+
+ /* if we bind to dynamic port (port 0 given), we will do so consistently. Thus
+ * once we got a dynamic port, we will keep it and use it for other protocols
+ * as well. As of my understanding, this should always work as the OS does not
+ * pick a port that is used by some protocol (well, at least this looks very
+ * unlikely...). If our asusmption is wrong, we should iterate until we find a
+ * combination that works - it is very unusual to have the same service listen
+ * on differnt ports on IPv4 and IPv6.
+ */
+ savecast.sa = (struct sockaddr*)r->ai_addr;
+ const int currport = (isIPv6) ? savecast.ipv6->sin6_port : savecast.ipv4->sin_port;
+ if(currport == 0) {
+ socklen_t socklen_r = r->ai_addrlen;
+ if(getsockname(sock, r->ai_addr, &socklen_r) == -1) {
+ LogError(errno, NO_ERRCODE, "nsd_ptcp: ListenPortFileName: getsockname:"
+ "error while trying to get socket");
+ }
+ r->ai_addrlen = socklen_r;
+ savecast.sa = (struct sockaddr*)r->ai_addr;
+ port_override = (isIPv6) ? savecast.ipv6->sin6_port : savecast.ipv4->sin_port;
+ if(pSrv->pszLstnPortFileName != NULL) {
+ FILE *fp;
+ if((fp = fopen((const char*)pSrv->pszLstnPortFileName, "w+")) == NULL) {
+ LogError(errno, RS_RET_IO_ERROR, "imptcp: ListenPortFileName: "
+ "error while trying to open file");
+ ABORT_FINALIZE(RS_RET_IO_ERROR);
+ }
+ if(isIPv6) {
+ fprintf(fp, "%d", ntohs(savecast.ipv6->sin6_port));
+ } else {
+ fprintf(fp, "%d", ntohs(savecast.ipv4->sin_port));
+ }
+ fclose(fp);
+ }
+ }
+
+ if(listen(sock, pSrv->socketBacklog) < 0) {
+ LogError(errno, NO_ERRCODE, "imptcp error listening on port");
+ DBGPRINTF("tcp listen error %d, suspending\n", errno);
+ close(sock);
+ sock = -1;
+ continue;
+ }
+
+ /* if we reach this point, we were able to obtain a valid socket, so we can
+ * create our listener object. -- rgerhards, 2010-08-10
+ */
+ CHKiRet(addLstn(pSrv, sock, isIPv6));
+ ++numSocks;
+ }
+
+ if(numSocks != maxs) {
+ DBGPRINTF("We could initialize %d TCP listen sockets out of %d we received "
+ "- this may or may not be an error indication.\n", numSocks, maxs);
+ }
+
+ if(numSocks == 0) {
+ DBGPRINTF("No TCP listen sockets could successfully be initialized");
+ ABORT_FINALIZE(RS_RET_COULD_NOT_BIND);
+ }
+
+finalize_it:
+ if(res != NULL) {
+ freeaddrinfo(res);
+ }
+
+ if(iRet != RS_RET_OK) {
+ if(sock != -1) {
+ close(sock);
+ }
+ }
+
+ RETiRet;
+}
+PRAGMA_DIAGNOSTIC_POP
+
+/* Set pRemHost based on the address provided. This is to be called upon accept()ing
+ * a connection request. It must be provided by the socket we received the
+ * message on as well as a NI_MAXHOST size large character buffer for the FQDN.
+ * Please see http://www.hmug.org/man/3/getnameinfo.php (under Caveats)
+ * for some explanation of the code found below. If we detect a malicious
+ * hostname, we return RS_RET_MALICIOUS_HNAME and let the caller decide
+ * on how to deal with that.
+ * rgerhards, 2008-03-31
+ */
+static rsRetVal
+getPeerNames(prop_t **peerName, prop_t **peerIP, struct sockaddr *pAddr, sbool bUXServer)
+{
+ int error;
+ uchar szIP[NI_MAXHOST+1] = "";
+ uchar szHname[NI_MAXHOST+1] = "";
+ struct addrinfo hints, *res;
+ sbool bMaliciousHName = 0;
+
+ DEFiRet;
+
+ *peerName = NULL;
+ *peerIP = NULL;
+
+ if (bUXServer) {
+ strncpy((char *) szHname, (char *) glbl.GetLocalHostName(), NI_MAXHOST);
+ strncpy((char *) szIP, (char *) glbl.GetLocalHostIP(), NI_MAXHOST);
+ szHname[NI_MAXHOST] = '\0';
+ szIP[NI_MAXHOST] = '\0';
+ } else {
+ error = getnameinfo(pAddr, SALEN(pAddr), (char *) szIP, sizeof(szIP), NULL, 0, NI_NUMERICHOST);
+ if (error) {
+ DBGPRINTF("Malformed from address %s\n", gai_strerror(error));
+ strcpy((char *) szHname, "???");
+ strcpy((char *) szIP, "???");
+ ABORT_FINALIZE(RS_RET_INVALID_HNAME);
+ }
+
+ if (!glbl.GetDisableDNS(runConf)) {
+ error = getnameinfo(pAddr, SALEN(pAddr), (char *) szHname, NI_MAXHOST, NULL, 0, NI_NAMEREQD);
+ if (error == 0) {
+ memset(&hints, 0, sizeof(struct addrinfo));
+ hints.ai_flags = AI_NUMERICHOST;
+ hints.ai_socktype = SOCK_STREAM;
+ /* we now do a lookup once again. This one should fail,
+ * because we should not have obtained a non-numeric address. If
+ * we got a numeric one, someone messed with DNS!
+ */
+ if (getaddrinfo((char *) szHname, NULL, &hints, &res) == 0) {
+ freeaddrinfo(res);
+ /* OK, we know we have evil, so let's indicate this to our caller */
+ snprintf((char *) szHname, sizeof(szHname), "[MALICIOUS:IP=%s]", szIP);
+ DBGPRINTF("Malicious PTR record, IP = \"%s\" HOST = \"%s\"", szIP, szHname);
+ bMaliciousHName = 1;
+ }
+ } else {
+ strcpy((char *) szHname, (char *) szIP);
+ }
+ } else {
+ strcpy((char *) szHname, (char *) szIP);
+ }
+ }
+
+ /* We now have the names, so now let's allocate memory and store them permanently. */
+ CHKiRet(prop.Construct(peerName));
+ CHKiRet(prop.SetString(*peerName, szHname, ustrlen(szHname)));
+ CHKiRet(prop.ConstructFinalize(*peerName));
+ CHKiRet(prop.Construct(peerIP));
+ CHKiRet(prop.SetString(*peerIP, szIP, ustrlen(szIP)));
+ CHKiRet(prop.ConstructFinalize(*peerIP));
+
+finalize_it:
+ if(iRet != RS_RET_OK) {
+ if(*peerName != NULL)
+ prop.Destruct(peerName);
+ if(*peerIP != NULL)
+ prop.Destruct(peerIP);
+ }
+ if(bMaliciousHName)
+ iRet = RS_RET_MALICIOUS_HNAME;
+ RETiRet;
+}
+
+
+/* Enable KEEPALIVE handling on the socket. */
+static rsRetVal
+EnableKeepAlive(ptcplstn_t *pLstn, int sock)
+{
+ int ret;
+ int optval;
+ socklen_t optlen;
+ DEFiRet;
+
+ optval = 1;
+ optlen = sizeof(optval);
+ ret = setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen);
+ if(ret < 0) {
+ dbgprintf("EnableKeepAlive socket call returns error %d\n", ret);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+
+# if defined(TCP_KEEPCNT)
+ if(pLstn->pSrv->iKeepAliveProbes > 0) {
+ optval = pLstn->pSrv->iKeepAliveProbes;
+ optlen = sizeof(optval);
+ ret = setsockopt(sock, IPPROTO_TCP, TCP_KEEPCNT, &optval, optlen);
+ } else {
+ ret = 0;
+ }
+# else
+ ret = -1;
+# endif
+ if(ret < 0) {
+ LogError(ret, NO_ERRCODE, "imptcp cannot set keepalive probes - ignored");
+ }
+
+# if defined(TCP_KEEPCNT)
+ if(pLstn->pSrv->iKeepAliveTime > 0) {
+ optval = pLstn->pSrv->iKeepAliveTime;
+ optlen = sizeof(optval);
+ ret = setsockopt(sock, IPPROTO_TCP, TCP_KEEPIDLE, &optval, optlen);
+ } else {
+ ret = 0;
+ }
+# else
+ ret = -1;
+# endif
+ if(ret < 0) {
+ LogError(ret, NO_ERRCODE, "imptcp cannot set keepalive time - ignored");
+ }
+
+# if defined(TCP_KEEPCNT)
+ if(pLstn->pSrv->iKeepAliveIntvl > 0) {
+ optval = pLstn->pSrv->iKeepAliveIntvl;
+ optlen = sizeof(optval);
+ ret = setsockopt(sock, IPPROTO_TCP, TCP_KEEPINTVL, &optval, optlen);
+ } else {
+ ret = 0;
+ }
+# else
+ ret = -1;
+# endif
+ if(ret < 0) {
+ LogError(errno, NO_ERRCODE, "imptcp cannot set keepalive intvl - ignored");
+ }
+
+ dbgprintf("KEEPALIVE enabled for socket %d\n", sock);
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* accept an incoming connection request
+ * rgerhards, 2008-04-22
+ */
+static rsRetVal ATTR_NONNULL()
+AcceptConnReq(ptcplstn_t *const pLstn, int *const newSock, prop_t **peerName, prop_t **peerIP)
+{
+ int sockflags;
+ struct sockaddr_storage addr;
+ socklen_t addrlen = sizeof(addr);
+ int iNewSock = -1;
+
+ DEFiRet;
+
+ *peerName = NULL; /* ensure we know if we don't have one! */
+ iNewSock = accept(pLstn->sock, (struct sockaddr*) &addr, &addrlen);
+ if(iNewSock < 0) {
+ if(CHK_EAGAIN_EWOULDBLOCK || errno == EMFILE)
+ ABORT_FINALIZE(RS_RET_NO_MORE_DATA);
+ LogError(errno, RS_RET_ACCEPT_ERR, "error accepting connection "
+ "on listen socket %d", pLstn->sock);
+ ABORT_FINALIZE(RS_RET_ACCEPT_ERR);
+ }
+ if(addrlen == 0) {
+ LogError(errno, RS_RET_ACCEPT_ERR, "AcceptConnReq could not obtain "
+ "remote peer identification on listen socket %d", pLstn->sock);
+ }
+
+ if(pLstn->pSrv->bKeepAlive)
+ EnableKeepAlive(pLstn, iNewSock);/* we ignore errors, best to do! */
+
+ CHKiRet(getPeerNames(peerName, peerIP, (struct sockaddr *) &addr, pLstn->pSrv->bUnixSocket));
+
+ /* set the new socket to non-blocking IO */
+ if((sockflags = fcntl(iNewSock, F_GETFL)) != -1) {
+ sockflags |= O_NONBLOCK;
+ /* SETFL could fail too, so get it caught by the subsequent
+ * error check.
+ */
+ sockflags = fcntl(iNewSock, F_SETFL, sockflags);
+ }
+
+ if(sockflags == -1) {
+ LogError(errno, RS_RET_IO_ERROR, "error setting fcntl(O_NONBLOCK) on "
+ "tcp socket %d", iNewSock);
+ prop.Destruct(peerName);
+ prop.Destruct(peerIP);
+ ABORT_FINALIZE(RS_RET_IO_ERROR);
+ }
+ if(pLstn->pSrv->bEmitMsgOnOpen) {
+ LogMsg(0, RS_RET_NO_ERRCODE, LOG_INFO,
+ "imptcp: connection established with host: %s",
+ propGetSzStr(*peerName));
+ }
+
+ STATSCOUNTER_INC(pLstn->ctrSessOpen, pLstn->mutCtrSessOpen);
+ *newSock = iNewSock;
+
+finalize_it:
+ DBGPRINTF("iRet: %d\n", iRet);
+ if(iRet != RS_RET_OK) {
+ if(iRet != RS_RET_NO_MORE_DATA && pLstn->pSrv->bEmitMsgOnOpen) {
+ LogError(0, NO_ERRCODE, "imptcp: connection could not be "
+ "established with host: %s",
+ *peerName == NULL ? "(could not query)"
+ : (const char*)propGetSzStr(*peerName));
+ }
+ STATSCOUNTER_INC(pLstn->ctrSessOpenErr, pLstn->mutCtrSessOpenErr);
+ /* the close may be redundant, but that doesn't hurt... */
+ if(iNewSock != -1)
+ close(iNewSock);
+ }
+
+ RETiRet;
+}
+
+
+/* This is a helper for submitting the message to the rsyslog core.
+ * It does some common processing, including resetting the various
+ * state variables to a "processed" state.
+ * Note that this function is also called if we had a buffer overflow
+ * due to a too-long message. So far, there is no indication this
+ * happened and it may be worth thinking about different handling
+ * of this case (what obviously would require a change to this
+ * function or some related code).
+ * rgerhards, 2009-04-23
+ * EXTRACT from tcps_sess.c
+ */
+static rsRetVal
+doSubmitMsg(ptcpsess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, multi_submit_t *pMultiSub)
+{
+ smsg_t *pMsg;
+ ptcpsrv_t *pSrv;
+ DEFiRet;
+
+ if(pThis->iMsg == 0) {
+ DBGPRINTF("discarding zero-sized message\n");
+ FINALIZE;
+ }
+ pSrv = pThis->pLstn->pSrv;
+
+ /* we now create our own message object and submit it to the queue */
+ CHKiRet(msgConstructWithTime(&pMsg, stTime, ttGenTime));
+ MsgSetRawMsg(pMsg, (char*)pThis->pMsg, pThis->iMsg);
+ MsgSetInputName(pMsg, pSrv->pInputName);
+ MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY);
+ if(pSrv->dfltTZ != NULL)
+ MsgSetDfltTZ(pMsg, (char*) pSrv->dfltTZ);
+ MsgSetFlowControlType(pMsg, pSrv->flowControl
+ ? eFLOWCTL_LIGHT_DELAY : eFLOWCTL_NO_DELAY);
+ pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME;
+ MsgSetRcvFrom(pMsg, pThis->peerName);
+ CHKiRet(MsgSetRcvFromIP(pMsg, pThis->peerIP));
+ MsgSetRuleset(pMsg, pSrv->pRuleset);
+ STATSCOUNTER_INC(pThis->pLstn->ctrSubmit, pThis->pLstn->mutCtrSubmit);
+
+ ratelimitAddMsg(pSrv->ratelimiter, pMultiSub, pMsg);
+
+finalize_it:
+ /* reset status variables */
+ pThis->bAtStrtOfFram = 1;
+ pThis->iMsg = 0;
+
+ RETiRet;
+}
+
+
+/* process the data received, special case if the framing is specified via
+ * a regex. For more info see processDataRcvd().
+ */
+static rsRetVal ATTR_NONNULL()
+processDataRcvd_regexFraming(ptcpsess_t *const __restrict__ pThis,
+ char **const buff,
+ struct syslogTime *const stTime,
+ const time_t ttGenTime,
+ multi_submit_t *const pMultiSub,
+ unsigned *const __restrict__ pnMsgs)
+{
+ DEFiRet;
+ const instanceConf_t *const inst = pThis->pLstn->pSrv->inst;
+ assert(inst->startRegex != NULL);
+ const char c = **buff;
+
+ pThis->pMsg[pThis->iMsg++] = c;
+ pThis->pMsg[pThis->iMsg] = '\0';
+
+ if(pThis->iMsg == 2*iMaxLine) {
+ LogError(0, RS_RET_OVERSIZE_MSG, "imptcp: more then double max message size (%d) "
+ "received without finding frame terminator via regex - assuming "
+ "end of frame now.", pThis->iMsg+1);
+ doSubmitMsg(pThis, stTime, ttGenTime, pMultiSub);
+ ++(*pnMsgs);
+ pThis->iMsg = 0;
+ pThis->iCurrLine = 1;
+ }
+
+
+ if(c == '\n') {
+ pThis->iCurrLine = pThis->iMsg;
+ } else {
+ const int isMatch = !regexec(&inst->start_preg, (char*)pThis->pMsg+pThis->iCurrLine, 0, NULL, 0);
+ if(isMatch) {
+ DBGPRINTF("regex match (%d), framing line: %s\n", pThis->iCurrLine, pThis->pMsg);
+ strcpy((char*)pThis->pMsg_save, (char*) pThis->pMsg+pThis->iCurrLine);
+ pThis->iMsg = pThis->iCurrLine - 1;
+
+ doSubmitMsg(pThis, stTime, ttGenTime, pMultiSub);
+ ++(*pnMsgs);
+
+ strcpy((char*)pThis->pMsg, (char*)pThis->pMsg_save);
+ pThis->iMsg = ustrlen(pThis->pMsg_save);
+ pThis->iCurrLine = 1;
+ }
+ }
+
+ RETiRet;
+}
+
+
+/* process the data received. As TCP is stream based, we need to process the
+ * data inside a state machine. The actual data received is passed in byte-by-byte
+ * from DataRcvd, and this function here compiles messages from them and submits
+ * the end result to the queue. Introducing this function fixes a long-term bug ;)
+ * rgerhards, 2008-03-14
+ * EXTRACT from tcps_sess.c
+ */
+static rsRetVal ATTR_NONNULL(1, 2)
+processDataRcvd(ptcpsess_t *const __restrict__ pThis,
+ char **buff,
+ const int buffLen,
+ struct syslogTime *stTime,
+ const time_t ttGenTime,
+ multi_submit_t *pMultiSub,
+ unsigned *const __restrict__ pnMsgs)
+{
+ DEFiRet;
+ const char c = **buff;
+ int octetsToCopy, octetsToDiscard;
+
+ if(pThis->startRegex != NULL) {
+ processDataRcvd_regexFraming(pThis, buff, stTime, ttGenTime, pMultiSub, pnMsgs);
+ FINALIZE;
+ }
+
+ if(pThis->inputState == eAtStrtFram) {
+ if(pThis->bSuppOctetFram && isdigit((int) c)) {
+ pThis->inputState = eInOctetCnt;
+ pThis->iOctetsRemain = 0;
+ pThis->eFraming = TCP_FRAMING_OCTET_COUNTING;
+ } else if(pThis->bSPFramingFix && c == ' ') {
+ /* Cisco very occasionally sends a SP after a LF, which
+ * thrashes framing if not taken special care of. Here,
+ * we permit space *in front of the next frame* and
+ * ignore it.
+ */
+ FINALIZE;
+ } else {
+ pThis->inputState = eInMsg;
+ pThis->eFraming = TCP_FRAMING_OCTET_STUFFING;
+ }
+ }
+
+ if(pThis->inputState == eInOctetCnt) {
+ uchar *propPeerName = NULL;
+ int lenPeerName = 0;
+ uchar *propPeerIP = NULL;
+ int lenPeerIP = 0;
+ if(isdigit(c)) {
+ if(pThis->iOctetsRemain <= 200000000) {
+ pThis->iOctetsRemain = pThis->iOctetsRemain * 10 + c - '0';
+ }
+ if(pThis->iMsg < iMaxLine) {
+ *(pThis->pMsg + pThis->iMsg++) = c;
+ }
+ } else { /* done with the octet count, so this must be the SP terminator */
+ DBGPRINTF("TCP Message with octet-counter, size %d.\n", pThis->iOctetsRemain);
+ prop.GetString(pThis->peerName, &propPeerName, &lenPeerName);
+ prop.GetString(pThis->peerIP, &propPeerIP, &lenPeerIP);
+ if(c != ' ') {
+ LogError(0, NO_ERRCODE, "Framing Error in received TCP message "
+ "from peer: (hostname) %s, (ip) %s: delimiter is not "
+ "SP but has ASCII value %d.", propPeerName, propPeerIP, c);
+ }
+ if(pThis->iOctetsRemain < 1) {
+ /* TODO: handle the case where the octet count is 0! */
+ LogError(0, NO_ERRCODE, "Framing Error in received TCP message"
+ " from peer: (hostname) %s, (ip) %s: invalid octet count %d.",
+ propPeerName, propPeerIP, pThis->iOctetsRemain);
+ pThis->eFraming = TCP_FRAMING_OCTET_STUFFING;
+ } else if(pThis->iOctetsRemain > iMaxLine) {
+ /* while we can not do anything against it, we can at least log an indication
+ * that something went wrong) -- rgerhards, 2008-03-14
+ */
+ DBGPRINTF("truncating message with %d octets - max msg size is %d\n",
+ pThis->iOctetsRemain, iMaxLine);
+ LogError(0, NO_ERRCODE, "received oversize message from peer: "
+ "(hostname) %s, (ip) %s: size is %d bytes, max msg "
+ "size is %d, truncating...", propPeerName, propPeerIP,
+ pThis->iOctetsRemain, iMaxLine);
+ }
+ if(pThis->iOctetsRemain > pThis->pLstn->pSrv->maxFrameSize) {
+ LogError(0, NO_ERRCODE, "Framing Error in received TCP message "
+ "from peer: (hostname) %s, (ip) %s: frame too large: %d, "
+ "change to octet stuffing", propPeerName, propPeerIP,
+ pThis->iOctetsRemain);
+ pThis->eFraming = TCP_FRAMING_OCTET_STUFFING;
+ } else {
+ pThis->iMsg = 0;
+ }
+ pThis->inputState = eInMsg;
+ }
+ } else if(pThis->inputState == eInMsgTruncation) {
+ if ((c == '\n')
+ || ((pThis->iAddtlFrameDelim != TCPSRV_NO_ADDTL_DELIMITER)
+ && (c == pThis->iAddtlFrameDelim))) {
+ pThis->inputState = eAtStrtFram;
+ }
+ } else {
+ assert(pThis->inputState == eInMsg);
+ if (pThis->eFraming == TCP_FRAMING_OCTET_STUFFING) {
+ int iMsg = pThis->iMsg; /* cache value for faster access */
+ if(iMsg >= iMaxLine) {
+ /* emergency, we now need to flush, no matter if we are at end of message or not... */
+ int i = 1;
+ char currBuffChar;
+ while(i < buffLen && ((currBuffChar = (*buff)[i]) != '\n'
+ && (pThis->iAddtlFrameDelim == TCPSRV_NO_ADDTL_DELIMITER
+ || currBuffChar != pThis->iAddtlFrameDelim))) {
+ i++;
+ }
+ LogError(0, NO_ERRCODE, "imptcp %s: message received is at least %d byte larger than "
+ "max msg size; message will be split starting at: \"%.*s\"\n",
+ pThis->pLstn->pSrv->pszInputName, i, (i < 32) ? i : 32, *buff);
+ doSubmitMsg(pThis, stTime, ttGenTime, pMultiSub);
+ iMsg = 0;
+ ++(*pnMsgs);
+ if(pThis->pLstn->pSrv->discardTruncatedMsg == 1) {
+ pThis->inputState = eInMsgTruncation;
+ }
+ /* we might think if it is better to ignore the rest of the
+ * message than to treat it as a new one. Maybe this is a good
+ * candidate for a configuration parameter...
+ * rgerhards, 2006-12-04
+ */
+ }
+ if ((c == '\n')
+ || ((pThis->iAddtlFrameDelim != TCPSRV_NO_ADDTL_DELIMITER)
+ && (c == pThis->iAddtlFrameDelim))
+ ) { /* record delimiter? */
+ if(pThis->pLstn->pSrv->multiLine) {
+ if((buffLen == 1) || ((*buff)[1] == '<')) {
+ doSubmitMsg(pThis, stTime, ttGenTime, pMultiSub);
+ iMsg = 0; /* Reset cached value! */
+ ++(*pnMsgs);
+ pThis->inputState = eAtStrtFram;
+ } else {
+ if(iMsg < iMaxLine) {
+ pThis->pMsg[iMsg++] = c;
+ }
+ }
+ } else {
+ doSubmitMsg(pThis, stTime, ttGenTime, pMultiSub);
+ iMsg = 0; /* Reset cached value! */
+ ++(*pnMsgs);
+ pThis->inputState = eAtStrtFram;
+ }
+ } else {
+ /* IMPORTANT: here we copy the actual frame content to the message - for BOTH
+ * framing modes! If we have a message that is larger than the max msg size,
+ * we truncate it. This is the best we can do in light of what the engine supports.
+ * -- rgerhards, 2008-03-14
+ */
+ if(likely(iMsg < iMaxLine)) {
+ pThis->pMsg[iMsg++] = c;
+ }
+ }
+ pThis->iMsg = iMsg; /* update "real value" with cached one */
+ } else {
+ assert(pThis->eFraming == TCP_FRAMING_OCTET_COUNTING);
+ octetsToCopy = pThis->iOctetsRemain;
+ octetsToDiscard = 0;
+ if (buffLen < octetsToCopy) {
+ octetsToCopy = buffLen;
+ }
+ if (octetsToCopy + pThis->iMsg > iMaxLine) {
+ octetsToDiscard = octetsToCopy - (iMaxLine - pThis->iMsg);
+ octetsToCopy = iMaxLine - pThis->iMsg;
+ }
+
+ memcpy(pThis->pMsg + pThis->iMsg, *buff, octetsToCopy);
+ pThis->iMsg += octetsToCopy;
+ pThis->iOctetsRemain -= (octetsToCopy + octetsToDiscard);
+ *buff += (octetsToCopy + octetsToDiscard - 1);
+ if (pThis->iOctetsRemain == 0) {
+ /* we have end of frame! */
+ doSubmitMsg(pThis, stTime, ttGenTime, pMultiSub);
+ ++(*pnMsgs);
+ pThis->inputState = eAtStrtFram;
+ }
+ }
+
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* Processes the data received via a TCP session. If there
+ * is no other way to handle it, data is discarded.
+ * Input parameter data is the data received, iLen is its
+ * len as returned from recv(). iLen must be 1 or more (that
+ * is errors must be handled by caller!). iTCPSess must be
+ * the index of the TCP session that received the data.
+ * rgerhards 2005-07-04
+ * And another change while generalizing. We now return either
+ * RS_RET_OK, which means the session should be kept open
+ * or anything else, which means it must be closed.
+ * rgerhards, 2008-03-01
+ * As a performance optimization, we pick up the timestamp here. Acutally,
+ * this *is* the *correct* reception step for all the data we received, because
+ * we have just received a bunch of data! -- rgerhards, 2009-06-16
+ * EXTRACT from tcps_sess.c
+ */
+static rsRetVal ATTR_NONNULL(1, 2)
+DataRcvdUncompressed(ptcpsess_t *pThis, char *pData, const size_t iLen, struct syslogTime *stTime, time_t ttGenTime)
+{
+ multi_submit_t multiSub;
+ smsg_t *pMsgs[CONF_NUM_MULTISUB];
+ char *pEnd;
+ unsigned nMsgs = 0;
+ DEFiRet;
+
+ assert(iLen > 0);
+
+ if(ttGenTime == 0)
+ datetime.getCurrTime(stTime, &ttGenTime, TIME_IN_LOCALTIME);
+ multiSub.ppMsgs = pMsgs;
+ multiSub.maxElem = CONF_NUM_MULTISUB;
+ multiSub.nElem = 0;
+
+ /* We now copy the message to the session buffer. */
+ pEnd = pData + iLen; /* this is one off, which is intensional */
+
+ while(pData < pEnd) {
+ CHKiRet(processDataRcvd(pThis, &pData, pEnd - pData, stTime, ttGenTime, &multiSub, &nMsgs));
+ pData++;
+ }
+
+ iRet = multiSubmitFlush(&multiSub);
+
+ if(runConf->globals.senderKeepTrack)
+ statsRecordSender(propGetSzStr(pThis->peerName), nMsgs, ttGenTime);
+
+finalize_it:
+ RETiRet;
+}
+
+static rsRetVal
+DataRcvdCompressed(ptcpsess_t *pThis, char *buf, size_t len)
+{
+ struct syslogTime stTime;
+ time_t ttGenTime;
+ int zRet; /* zlib return state */
+ unsigned outavail;
+ uchar zipBuf[64*1024]; // TODO: alloc on heap, and much larger (512KiB? batch size!)
+ DEFiRet;
+ // TODO: can we do stats counters? Even if they are not 100% correct under all cases,
+ // by simply updating the input and output sizes?
+ uint64_t outtotal;
+
+ datetime.getCurrTime(&stTime, &ttGenTime, TIME_IN_LOCALTIME);
+ outtotal = 0;
+
+ if(!pThis->bzInitDone) {
+ /* allocate deflate state */
+ pThis->zstrm.zalloc = Z_NULL;
+ pThis->zstrm.zfree = Z_NULL;
+ pThis->zstrm.opaque = Z_NULL;
+ zRet = inflateInit(&pThis->zstrm);
+ if(zRet != Z_OK) {
+ DBGPRINTF("imptcp: error %d returned from zlib/inflateInit()\n", zRet);
+ ABORT_FINALIZE(RS_RET_ZLIB_ERR);
+ }
+ pThis->bzInitDone = RSTRUE;
+ }
+
+ pThis->zstrm.next_in = (Bytef*) buf;
+ pThis->zstrm.avail_in = len;
+ /* run inflate() on buffer until everything has been uncompressed */
+ do {
+ DBGPRINTF("imptcp: in inflate() loop, avail_in %d, total_in %ld\n",
+ pThis->zstrm.avail_in, pThis->zstrm.total_in);
+ pThis->zstrm.avail_out = sizeof(zipBuf);
+ pThis->zstrm.next_out = zipBuf;
+ zRet = inflate(&pThis->zstrm, Z_SYNC_FLUSH); /* no bad return value */
+ //zRet = inflate(&pThis->zstrm, Z_NO_FLUSH); /* no bad return value */
+ DBGPRINTF("after inflate, ret %d, avail_out %d\n", zRet, pThis->zstrm.avail_out);
+ outavail = sizeof(zipBuf) - pThis->zstrm.avail_out;
+ if(outavail != 0) {
+ outtotal += outavail;
+ pThis->pLstn->rcvdDecompressed += outavail;
+ CHKiRet(DataRcvdUncompressed(pThis, (char*)zipBuf, outavail, &stTime, ttGenTime));
+ }
+ } while (pThis->zstrm.avail_out == 0);
+
+ dbgprintf("end of DataRcvCompress, sizes: in %lld, out %llu\n", (long long) len,
+ (long long unsigned) outtotal);
+finalize_it:
+ RETiRet;
+}
+
+static rsRetVal
+DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen)
+{
+ struct syslogTime stTime;
+ DEFiRet;
+ ATOMIC_ADD_uint64(&pThis->pLstn->rcvdBytes, &pThis->pLstn->mut_rcvdBytes, iLen);
+ if(pThis->compressionMode >= COMPRESS_STREAM_ALWAYS)
+ iRet = DataRcvdCompressed(pThis, pData, iLen);
+ else
+ iRet = DataRcvdUncompressed(pThis, pData, iLen, &stTime, 0);
+ RETiRet;
+}
+
+
+/****************************************** --END-- TCP SUPPORT FUNCTIONS ***********************************/
+
+
+static void
+initConfigSettings(void)
+{
+ cs.bEmitMsgOnClose = 0;
+ cs.bEmitMsgOnOpen = 0;
+ cs.wrkrMax = DFLT_wrkrMax;
+ cs.bSuppOctetFram = 1;
+ cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
+ cs.maxFrameSize = 200000;
+ cs.pszInputName = NULL;
+ cs.pszBindRuleset = NULL;
+ cs.pszInputName = NULL;
+ cs.lstnIP = NULL;
+}
+
+
+/* add socket to the epoll set
+ */
+static rsRetVal
+addEPollSock(epolld_type_t typ, void *ptr, int sock, epolld_t **pEpd)
+{
+ DEFiRet;
+ epolld_t *epd = NULL;
+
+ CHKmalloc(epd = calloc(1, sizeof(epolld_t)));
+ epd->typ = typ;
+ epd->ptr = ptr;
+ epd->sock = sock;
+ *pEpd = epd;
+ epd->ev.events = EPOLLIN|EPOLLONESHOT;
+ epd->ev.data.ptr = (void*) epd;
+
+ if(epoll_ctl(epollfd, EPOLL_CTL_ADD, sock, &(epd->ev)) != 0) {
+ LogError(errno, RS_RET_EPOLL_CTL_FAILED, "os error during epoll ADD");
+ ABORT_FINALIZE(RS_RET_EPOLL_CTL_FAILED);
+ }
+
+ DBGPRINTF("imptcp: added socket %d to epoll[%d] set\n", sock, epollfd);
+
+finalize_it:
+ if(iRet != RS_RET_OK) {
+ if (epd != NULL) {
+ LogError(0, RS_RET_INTERNAL_ERROR, "error: could not initialize mutex for ptcp "
+ "connection for socket: %d", sock);
+ }
+ free(epd);
+ }
+ RETiRet;
+}
+
+
+/* add a listener to the server
+ */
+static rsRetVal
+addLstn(ptcpsrv_t *pSrv, int sock, int isIPv6)
+{
+ DEFiRet;
+ ptcplstn_t *pLstn = NULL;
+ uchar statname[64];
+
+ CHKmalloc(pLstn = calloc(1, sizeof(ptcplstn_t)));
+ pLstn->pSrv = pSrv;
+ pLstn->bSuppOctetFram = pSrv->bSuppOctetFram;
+ pLstn->bSPFramingFix = pSrv->bSPFramingFix;
+ pLstn->sock = sock;
+ /* support statistics gathering */
+ uchar *inputname;
+ if(pSrv->pszInputName == NULL) {
+ inputname = (uchar*)"imptcp";
+ } else {
+ inputname = pSrv->pszInputName;
+ }
+ CHKiRet(statsobj.Construct(&(pLstn->stats)));
+ snprintf((char*)statname, sizeof(statname), "%s(%s/%s/%s)", inputname,
+ (pSrv->lstnIP == NULL) ? "*" : (char*)pSrv->lstnIP, pSrv->port,
+ isIPv6 ? "IPv6" : "IPv4");
+ statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */
+ CHKiRet(statsobj.SetName(pLstn->stats, statname));
+ CHKiRet(statsobj.SetOrigin(pLstn->stats, (uchar*)"imptcp"));
+ STATSCOUNTER_INIT(pLstn->ctrSubmit, pLstn->mutCtrSubmit);
+ CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("submitted"),
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pLstn->ctrSubmit)));
+ STATSCOUNTER_INIT(pLstn->ctrSessOpen, pLstn->mutCtrSessOpen);
+ CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("sessions.opened"),
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pLstn->ctrSessOpen)));
+ STATSCOUNTER_INIT(pLstn->ctrSessOpenErr, pLstn->mutCtrSessOpenErr);
+ CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("sessions.openfailed"),
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pLstn->ctrSessOpenErr)));
+ STATSCOUNTER_INIT(pLstn->ctrSessClose, pLstn->mutCtrSessClose);
+ CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("sessions.closed"),
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pLstn->ctrSessClose)));
+ /* the following counters are not protected by mutexes; we accept
+ * that they may not be 100% correct */
+ pLstn->rcvdBytes = 0,
+ pLstn->rcvdDecompressed = 0;
+ INIT_ATOMIC_HELPER_MUT64(pLstn->mut_rcvdBytes);
+ CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("bytes.received"),
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pLstn->rcvdBytes)));
+ CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("bytes.decompressed"),
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pLstn->rcvdDecompressed)));
+ CHKiRet(statsobj.ConstructFinalize(pLstn->stats));
+
+ CHKiRet(addEPollSock(epolld_lstn, pLstn, sock, &pLstn->epd));
+
+ /* add to start of server's listener list */
+ pLstn->prev = NULL;
+ pLstn->next = pSrv->pLstn;
+ if(pSrv->pLstn != NULL)
+ pSrv->pLstn->prev = pLstn;
+ pSrv->pLstn = pLstn;
+
+finalize_it:
+ if(iRet != RS_RET_OK) {
+ if(pLstn != NULL) {
+ if(pLstn->stats != NULL)
+ statsobj.Destruct(&(pLstn->stats));
+ free(pLstn);
+ }
+ }
+
+ RETiRet;
+}
+
+
+/* add a session to the server
+ */
+static rsRetVal
+addSess(ptcplstn_t *pLstn, int sock, prop_t *peerName, prop_t *peerIP)
+{
+ DEFiRet;
+ ptcpsess_t *pSess = NULL;
+ ptcpsrv_t *pSrv = pLstn->pSrv;
+ int pmsg_size_factor;
+
+ CHKmalloc(pSess = malloc(sizeof(ptcpsess_t)));
+ pSess->next = NULL;
+ if(pLstn->pSrv->inst->startRegex == NULL) {
+ pmsg_size_factor = 1;
+ pSess->pMsg_save = NULL;
+ } else {
+ pmsg_size_factor = 2;
+ pSess->pMsg = NULL;
+ CHKmalloc(pSess->pMsg_save = malloc(1 + iMaxLine * pmsg_size_factor));
+ }
+ CHKmalloc(pSess->pMsg = malloc(1 + iMaxLine * pmsg_size_factor));
+ pSess->pLstn = pLstn;
+ pSess->sock = sock;
+ pSess->bSuppOctetFram = pLstn->bSuppOctetFram;
+ pSess->bSPFramingFix = pLstn->bSPFramingFix;
+ pSess->inputState = eAtStrtFram;
+ pSess->iMsg = 0;
+ pSess->iCurrLine = 1;
+ pSess->bzInitDone = 0;
+ pSess->bAtStrtOfFram = 1;
+ pSess->peerName = peerName;
+ pSess->peerIP = peerIP;
+ pSess->compressionMode = pLstn->pSrv->compressionMode;
+ pSess->startRegex = pLstn->pSrv->inst->startRegex;
+ pSess->iAddtlFrameDelim = pLstn->pSrv->iAddtlFrameDelim;
+
+ /* add to start of server's listener list */
+ pSess->prev = NULL;
+
+ pthread_mutex_lock(&pSrv->mutSessLst);
+ int iTCPSessMax = pSrv->inst->iTCPSessMax;
+ if (iTCPSessMax > 0 && pSrv->iTCPSessCnt >= iTCPSessMax) {
+ pthread_mutex_unlock(&pSrv->mutSessLst);
+ LogError(0, RS_RET_MAX_SESS_REACHED,
+ "too many tcp sessions - dropping incoming request");
+ ABORT_FINALIZE(RS_RET_MAX_SESS_REACHED);
+ }
+
+ pSrv->iTCPSessCnt++;
+ pSess->next = pSrv->pSess;
+ if(pSrv->pSess != NULL)
+ pSrv->pSess->prev = pSess;
+ pSrv->pSess = pSess;
+ pthread_mutex_unlock(&pSrv->mutSessLst);
+
+ CHKiRet(addEPollSock(epolld_sess, pSess, sock, &pSess->epd));
+
+finalize_it:
+ if(iRet != RS_RET_OK) {
+ if(pSess != NULL) {
+ if (pSess->next != NULL) {
+ unlinkSess(pSess);
+ }
+ free(pSess->pMsg_save);
+ free(pSess->pMsg);
+ free(pSess);
+ }
+ }
+
+ RETiRet;
+}
+
+
+/* finish zlib buffer, to be called before closing the session.
+ */
+static rsRetVal
+doZipFinish(ptcpsess_t *pSess)
+{
+ int zRet; /* zlib return state */
+ DEFiRet;
+ unsigned outavail;
+ struct syslogTime stTime;
+ uchar zipBuf[32*1024]; // TODO: use "global" one from pSess
+
+ if(!pSess->bzInitDone)
+ goto done;
+
+ pSess->zstrm.avail_in = 0;
+ /* run inflate() on buffer until everything has been compressed */
+ do {
+ DBGPRINTF("doZipFinish: in inflate() loop, avail_in %d, total_in %ld\n", pSess->zstrm.avail_in,
+ pSess->zstrm.total_in);
+ pSess->zstrm.avail_out = sizeof(zipBuf);
+ pSess->zstrm.next_out = zipBuf;
+ zRet = inflate(&pSess->zstrm, Z_FINISH); /* no bad return value */
+ DBGPRINTF("after inflate, ret %d, avail_out %d\n", zRet, pSess->zstrm.avail_out);
+ outavail = sizeof(zipBuf) - pSess->zstrm.avail_out;
+ if(outavail != 0) {
+ pSess->pLstn->rcvdDecompressed += outavail;
+ CHKiRet(DataRcvdUncompressed(pSess, (char*)zipBuf, outavail, &stTime, 0));
+ // TODO: query time!
+ }
+ } while (pSess->zstrm.avail_out == 0);
+
+finalize_it:
+ zRet = inflateEnd(&pSess->zstrm);
+ if(zRet != Z_OK) {
+ DBGPRINTF("imptcp: error %d returned from zlib/inflateEnd()\n", zRet);
+ }
+
+ pSess->bzInitDone = 0;
+done: RETiRet;
+}
+
+/* close/remove a session
+ * NOTE: we do not need to remove the socket from the epoll set, as according
+ * to the epoll man page it is automatically removed on close (Q6). The only
+ * exception is duplicated file handles, which we do not create.
+ */
+static rsRetVal
+closeSess(ptcpsess_t *pSess)
+{
+ DEFiRet;
+
+ if(pSess->compressionMode >= COMPRESS_STREAM_ALWAYS)
+ doZipFinish(pSess);
+
+ const int sock = pSess->sock;
+ close(sock);
+
+ unlinkSess(pSess);
+
+ if(pSess->pLstn->pSrv->bEmitMsgOnClose) {
+ LogMsg(0, RS_RET_NO_ERRCODE, LOG_INFO, "imptcp: session on socket %d closed "
+ "with iRet %d.\n", sock, iRet);
+ }
+ STATSCOUNTER_INC(pSess->pLstn->ctrSessClose, pSess->pLstn->mutCtrSessClose);
+
+ /* unlinked, now remove structure */
+ destructSess(pSess);
+
+ DBGPRINTF("imptcp: session on socket %d closed with iRet %d.\n", sock, iRet);
+ RETiRet;
+}
+
+
+/* create input instance, set default parameters, and
+ * add it to the list of instances.
+ */
+static rsRetVal
+createInstance(instanceConf_t **pinst)
+{
+ instanceConf_t *inst;
+ DEFiRet;
+ CHKmalloc(inst = malloc(sizeof(instanceConf_t)));
+ inst->next = NULL;
+
+ inst->pszBindPort = NULL;
+ inst->pszBindAddr = NULL;
+ inst->pszBindPath = NULL;
+ inst->fileUID = -1;
+ inst->fileGID = -1;
+ inst->maxFrameSize = 200000;
+ inst->fCreateMode = 0644;
+ inst->bFailOnPerms = 1;
+ inst->bUnlink = 0;
+ inst->discardTruncatedMsg = 0;
+ inst->flowControl = 1;
+ inst->pszBindRuleset = NULL;
+ inst->pszInputName = NULL;
+ inst->bSuppOctetFram = 1;
+ inst->bSPFramingFix = 0;
+ inst->bKeepAlive = 0;
+ inst->iKeepAliveIntvl = 0;
+ inst->iKeepAliveProbes = 0;
+ inst->iKeepAliveTime = 0;
+ inst->bEmitMsgOnClose = 0;
+ inst->bEmitMsgOnOpen = 0;
+ inst->dfltTZ = NULL;
+ inst->iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
+ inst->startRegex = NULL;
+ inst->pBindRuleset = NULL;
+ inst->ratelimitBurst = 10000; /* arbitrary high limit */
+ inst->ratelimitInterval = 0; /* off */
+ inst->compressionMode = COMPRESS_SINGLE_MSG;
+ inst->multiLine = 0;
+ inst->socketBacklog = 5;
+ inst->pszLstnPortFileName = NULL;
+ inst->iTCPSessMax = -1;
+
+ /* node created, let's add to config */
+ if(loadModConf->tail == NULL) {
+ loadModConf->tail = loadModConf->root = inst;
+ } else {
+ loadModConf->tail->next = inst;
+ loadModConf->tail = inst;
+ }
+
+ *pinst = inst;
+finalize_it:
+ RETiRet;
+}
+
+
+/* This function is called when a new listener instace shall be added to
+ * the current config object via the legacy config system. It just shuffles
+ * all parameters to the listener in-memory instance.
+ */
+static rsRetVal addInstance(void __attribute__((unused)) *pVal, uchar *const pNewVal)
+{
+ instanceConf_t *inst;
+ DEFiRet;
+
+ if(pNewVal == NULL || *pNewVal == '\0') {
+ parser_errmsg("imptcp: port number must be specified, listener ignored");
+ ABORT_FINALIZE(RS_RET_PARAM_ERROR);
+ }
+
+ /* if we reach this point, a valid port is given in pNewVal */
+ CHKiRet(createInstance(&inst));
+ CHKmalloc(inst->pszBindPort = ustrdup(pNewVal));
+ if((cs.lstnIP == NULL) || (cs.lstnIP[0] == '\0')) {
+ inst->pszBindAddr = NULL;
+ } else {
+ CHKmalloc(inst->pszBindAddr = ustrdup(cs.lstnIP));
+ }
+ if((cs.pszBindRuleset == NULL) || (cs.pszBindRuleset[0] == '\0')) {
+ inst->pszBindRuleset = NULL;
+ } else {
+ CHKmalloc(inst->pszBindRuleset = ustrdup(cs.pszBindRuleset));
+ }
+ if((cs.pszInputName == NULL) || (cs.pszInputName[0] == '\0')) {
+ inst->pszInputName = NULL;
+ } else {
+ CHKmalloc(inst->pszInputName = ustrdup(cs.pszInputName));
+ }
+ inst->pBindRuleset = NULL;
+ inst->bSuppOctetFram = cs.bSuppOctetFram;
+ inst->bKeepAlive = cs.bKeepAlive;
+ inst->iKeepAliveIntvl = cs.iKeepAliveIntvl;
+ inst->iKeepAliveProbes = cs.iKeepAliveProbes;
+ inst->iKeepAliveTime = cs.iKeepAliveTime;
+ inst->bEmitMsgOnClose = cs.bEmitMsgOnClose;
+ inst->bEmitMsgOnOpen = cs.bEmitMsgOnOpen;
+ inst->iAddtlFrameDelim = cs.iAddtlFrameDelim;
+ inst->maxFrameSize = cs.maxFrameSize;
+ inst->iTCPSessMax = cs.iTCPSessMax;
+
+finalize_it:
+ free(pNewVal);
+ RETiRet;
+}
+
+
+static rsRetVal
+addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst)
+{
+ DEFiRet;
+ ptcpsrv_t *pSrv = NULL;
+
+ CHKmalloc(pSrv = calloc(1, sizeof(ptcpsrv_t)));
+ pthread_mutex_init(&pSrv->mutSessLst, NULL);
+ pSrv->ratelimiter = NULL;
+ pSrv->pSess = NULL;
+ pSrv->pLstn = NULL;
+ pSrv->inst = inst;
+ pSrv->bSuppOctetFram = inst->bSuppOctetFram;
+ pSrv->bSPFramingFix = inst->bSPFramingFix;
+ pSrv->bKeepAlive = inst->bKeepAlive;
+ pSrv->iKeepAliveIntvl = inst->iKeepAliveIntvl;
+ pSrv->iKeepAliveProbes = inst->iKeepAliveProbes;
+ pSrv->iKeepAliveTime = inst->iKeepAliveTime;
+ pSrv->bEmitMsgOnClose = inst->bEmitMsgOnClose;
+ pSrv->bEmitMsgOnOpen = inst->bEmitMsgOnOpen;
+ pSrv->compressionMode = inst->compressionMode;
+ pSrv->dfltTZ = inst->dfltTZ;
+ if (inst->pszBindPort != NULL) {
+ CHKmalloc(pSrv->port = ustrdup(inst->pszBindPort));
+ }
+ pSrv->iAddtlFrameDelim = inst->iAddtlFrameDelim;
+ pSrv->multiLine = inst->multiLine;
+ pSrv->socketBacklog = inst->socketBacklog;
+ pSrv->pszLstnPortFileName = inst->pszLstnPortFileName;
+ pSrv->maxFrameSize = inst->maxFrameSize;
+ if (inst->pszBindAddr == NULL) {
+ pSrv->lstnIP = NULL;
+ } else {
+ CHKmalloc(pSrv->lstnIP = ustrdup(inst->pszBindAddr));
+ }
+ if (inst->pszBindPath == NULL) {
+ pSrv->path = NULL;
+ } else {
+ CHKmalloc(pSrv->path = ustrdup(inst->pszBindPath));
+ CHKmalloc(pSrv->port = ustrdup(inst->pszBindPath));
+ pSrv->bUnixSocket = 1;
+ pSrv->fCreateMode = inst->fCreateMode;
+ pSrv->fileUID = inst->fileUID;
+ pSrv->fileGID = inst->fileGID;
+ pSrv->bFailOnPerms = inst->bFailOnPerms;
+ }
+
+ pSrv->bUnlink = inst->bUnlink;
+ pSrv->discardTruncatedMsg = inst->discardTruncatedMsg;
+ pSrv->flowControl = inst->flowControl;
+ pSrv->pRuleset = inst->pBindRuleset;
+ pSrv->pszInputName = ustrdup((inst->pszInputName == NULL) ? UCHAR_CONSTANT("imptcp") : inst->pszInputName);
+ pSrv->iTCPSessMax = inst->iTCPSessMax;
+ CHKiRet(prop.Construct(&pSrv->pInputName));
+ CHKiRet(prop.SetString(pSrv->pInputName, pSrv->pszInputName, ustrlen(pSrv->pszInputName)));
+ CHKiRet(prop.ConstructFinalize(pSrv->pInputName));
+
+ CHKiRet(ratelimitNew(&pSrv->ratelimiter, "imptcp", (char*) pSrv->port));
+ ratelimitSetLinuxLike(pSrv->ratelimiter, inst->ratelimitInterval, inst->ratelimitBurst);
+ ratelimitSetThreadSafe(pSrv->ratelimiter);
+ /* add to linked list */
+ pSrv->pNext = pSrvRoot;
+ pSrvRoot = pSrv;
+
+ /* all config vars are auto-reset -- this also is very useful with the
+ * new config format effort (v6).
+ */
+ resetConfigVariables(NULL, NULL);
+
+finalize_it:
+ if(iRet != RS_RET_OK) {
+ LogError(0, NO_ERRCODE, "error %d trying to add listener", iRet);
+ if(pSrv != NULL) {
+ destructSrv(pSrv);
+ }
+ }
+ RETiRet;
+}
+
+
+/* destroy worker pool structures and wait for workers to terminate
+ */
+static void
+startWorkerPool(void)
+{
+ int i;
+ pthread_mutex_lock(&io_q.mut); /* locking to keep Coverity happy */
+ wrkrRunning = 0;
+ pthread_mutex_unlock(&io_q.mut);
+ DBGPRINTF("imptcp: starting worker pool, %d workers\n", runModConf->wrkrMax);
+ wrkrInfo = calloc(runModConf->wrkrMax, sizeof(struct wrkrInfo_s));
+ if (wrkrInfo == NULL) {
+ LogError(errno, RS_RET_OUT_OF_MEMORY, "imptcp: worker-info array allocation failed.");
+ return;
+ }
+ for(i = 0 ; i < runModConf->wrkrMax ; ++i) {
+ /* init worker info structure! */
+ wrkrInfo[i].wrkrIdx = i;
+ wrkrInfo[i].numCalled = 0;
+ pthread_create(&wrkrInfo[i].tid, &wrkrThrdAttr, wrkr, &(wrkrInfo[i]));
+ }
+
+}
+
+/* destroy worker pool structures and wait for workers to terminate
+ */
+static void
+stopWorkerPool(void)
+{
+ int i;
+ DBGPRINTF("imptcp: stopping worker pool\n");
+ pthread_mutex_lock(&io_q.mut);
+ pthread_cond_broadcast(&io_q.wakeup_worker); /* awake wrkr if not running */
+ pthread_mutex_unlock(&io_q.mut);
+ for(i = 0 ; i < runModConf->wrkrMax ; ++i) {
+ pthread_join(wrkrInfo[i].tid, NULL);
+ DBGPRINTF("imptcp: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled);
+ }
+ free(wrkrInfo);
+}
+
+
+
+/* start up all listeners
+ * This is a one-time stop once the module is set to start.
+ */
+static rsRetVal
+startupServers(void)
+{
+ DEFiRet;
+ rsRetVal localRet, lastErr;
+ int iOK;
+ int iAll;
+ ptcpsrv_t *pSrv;
+
+ iAll = iOK = 0;
+ lastErr = RS_RET_ERR;
+ pSrv = pSrvRoot;
+ while(pSrv != NULL) {
+ DBGPRINTF("imptcp: starting up server for port %s, name '%s'\n", pSrv->port, pSrv->pszInputName);
+ localRet = startupSrv(pSrv);
+ if(localRet == RS_RET_OK)
+ iOK++;
+ else
+ lastErr = localRet;
+ ++iAll;
+ pSrv = pSrv->pNext;
+ }
+
+ DBGPRINTF("imptcp: %d out of %d servers started successfully\n", iOK, iAll);
+ if(iOK == 0) /* iff all fails, we report an error */
+ iRet = lastErr;
+
+ RETiRet;
+}
+
+/* process new activity on listener. This means we need to accept a new
+ * connection.
+ */
+static rsRetVal ATTR_NONNULL()
+lstnActivity(ptcplstn_t *const pLstn)
+{
+ int newSock = -1;
+ prop_t *peerName;
+ prop_t *peerIP;
+ rsRetVal localRet;
+ DEFiRet;
+
+ DBGPRINTF("imptcp: new connection on listen socket %d\n", pLstn->sock);
+ while(glbl.GetGlobalInputTermState() == 0) {
+ localRet = AcceptConnReq(pLstn, &newSock, &peerName, &peerIP);
+ DBGPRINTF("imptcp: AcceptConnReq on listen socket %d returned %d\n", pLstn->sock, localRet);
+ if(localRet == RS_RET_NO_MORE_DATA || glbl.GetGlobalInputTermState() == 1) {
+ break;
+ }
+ CHKiRet(localRet);
+ localRet = addSess(pLstn, newSock, peerName, peerIP);
+ if(localRet != RS_RET_OK) {
+ close(newSock);
+ prop.Destruct(&peerName);
+ prop.Destruct(&peerIP);
+ ABORT_FINALIZE(localRet);
+ }
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+/* process new activity on session. This means we need to accept data
+ * or close the session.
+ */
+static rsRetVal
+sessActivity(ptcpsess_t *const pSess, int *const continue_polling)
+{
+ int lenRcv;
+ int lenBuf;
+ uchar *peerName;
+ int lenPeer;
+ int remsock = 0; /* init just to keep compiler happy... :-( */
+ sbool bEmitOnClose = 0;
+ char rcvBuf[128*1024];
+ int runs = 0;
+ DEFiRet;
+
+ DBGPRINTF("imptcp: new activity on session socket %d\n", pSess->sock);
+
+ while(runs++ < 16) {
+ lenBuf = sizeof(rcvBuf);
+ lenRcv = recv(pSess->sock, rcvBuf, lenBuf, 0);
+
+ if(lenRcv > 0) {
+ /* have data, process it */
+ DBGPRINTF("imptcp: data(%d) on socket %d: %s\n", lenBuf, pSess->sock, rcvBuf);
+ CHKiRet(DataRcvd(pSess, rcvBuf, lenRcv));
+ } else if (lenRcv == 0) {
+ /* session was closed, do clean-up */
+ if(pSess->pLstn->pSrv->bEmitMsgOnClose) {
+ prop.GetString(pSess->peerName, &peerName, &lenPeer),
+ remsock = pSess->sock;
+ bEmitOnClose = 1;
+ }
+ *continue_polling = 0;
+ if(bEmitOnClose) {
+ LogError(0, RS_RET_PEER_CLOSED_CONN, "imptcp session %d closed by "
+ "remote peer %s.", remsock, peerName);
+ }
+ CHKiRet(closeSess(pSess)); /* close may emit more messages in strmzip mode! */
+ break;
+ } else {
+ if(CHK_EAGAIN_EWOULDBLOCK)
+ break;
+ DBGPRINTF("imptcp: error on session socket %d - closed.\n", pSess->sock);
+ *continue_polling = 0;
+ closeSess(pSess); /* try clean-up by dropping session */
+ break;
+ }
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* This function is called to process a single request. This may
+ * be carried out by the main worker or a helper. It can be run
+ * concurrently.
+ */
+static void
+processWorkItem(epolld_t *epd)
+{
+
+ int continue_polling = 1;
+
+ switch(epd->typ) {
+ case epolld_lstn:
+ /* listener never stops polling (except server shutdown) */
+ lstnActivity((ptcplstn_t *) epd->ptr);
+ break;
+ case epolld_sess:
+ sessActivity((ptcpsess_t *) epd->ptr, &continue_polling);
+ break;
+ default:
+ LogError(0, RS_RET_INTERNAL_ERROR, "error: invalid epolld_type_t %d after epoll", epd->typ);
+ break;
+ }
+ if (continue_polling == 1) {
+ epoll_ctl(epollfd, EPOLL_CTL_MOD, epd->sock, &(epd->ev));
+ }
+}
+
+
+static rsRetVal
+initIoQ(void)
+{
+ DEFiRet;
+ CHKiConcCtrl(pthread_mutex_init(&io_q.mut, NULL));
+ CHKiConcCtrl(pthread_cond_init(&io_q.wakeup_worker, NULL));
+ STAILQ_INIT(&io_q.q);
+ io_q.sz = 0;
+ io_q.ctrMaxSz = 0; /* TODO: discuss this and fix potential concurrent read/write issues */
+ CHKiRet(statsobj.Construct(&io_q.stats));
+ CHKiRet(statsobj.SetName(io_q.stats, (uchar*) "io-work-q"));
+ CHKiRet(statsobj.SetOrigin(io_q.stats, (uchar*) "imptcp"));
+ STATSCOUNTER_INIT(io_q.ctrEnq, io_q.mutCtrEnq);
+ CHKiRet(statsobj.AddCounter(io_q.stats, UCHAR_CONSTANT("enqueued"),
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &io_q.ctrEnq));
+ CHKiRet(statsobj.AddCounter(io_q.stats, UCHAR_CONSTANT("maxqsize"),
+ ctrType_Int, CTR_FLAG_NONE, &io_q.ctrMaxSz));
+ CHKiRet(statsobj.ConstructFinalize(io_q.stats));
+finalize_it:
+ RETiRet;
+}
+
+static void
+destroyIoQ(void)
+{
+ io_req_t *n;
+ if (io_q.stats != NULL) {
+ statsobj.Destruct(&io_q.stats);
+ }
+ pthread_mutex_lock(&io_q.mut);
+ while (!STAILQ_EMPTY(&io_q.q)) {
+ n = STAILQ_FIRST(&io_q.q);
+ STAILQ_REMOVE_HEAD(&io_q.q, link);
+ LogError(0, RS_RET_INTERNAL_ERROR, "imptcp: discarded enqueued io-work to allow shutdown "
+ "- ignored");
+ free(n);
+ }
+ io_q.sz = 0;
+ pthread_mutex_unlock(&io_q.mut);
+ pthread_cond_destroy(&io_q.wakeup_worker);
+ pthread_mutex_destroy(&io_q.mut);
+}
+
+static rsRetVal
+enqueueIoWork(epolld_t *epd, int dispatchInlineIfQueueFull) {
+ io_req_t *n;
+ int dispatchInline;
+ int inlineDispatchThreshold;
+ DEFiRet;
+
+ CHKmalloc(n = malloc(sizeof(io_req_t)));
+ n->epd = epd;
+
+ inlineDispatchThreshold = DFLT_inlineDispatchThreshold * runModConf->wrkrMax;
+ dispatchInline = 0;
+
+ pthread_mutex_lock(&io_q.mut);
+ if (dispatchInlineIfQueueFull && io_q.sz > inlineDispatchThreshold) {
+ dispatchInline = 1;
+ } else {
+ STAILQ_INSERT_TAIL(&io_q.q, n, link);
+ io_q.sz++;
+ STATSCOUNTER_INC(io_q.ctrEnq, io_q.mutCtrEnq);
+ STATSCOUNTER_SETMAX_NOMUT(io_q.ctrMaxSz, io_q.sz);
+ pthread_cond_signal(&io_q.wakeup_worker);
+ }
+ pthread_mutex_unlock(&io_q.mut);
+
+ if (dispatchInline == 1) {
+ free(n);
+ processWorkItem(epd);
+ }
+finalize_it:
+ if (iRet != RS_RET_OK) {
+ if (n == NULL) {
+ LogError(0, iRet, "imptcp: couldn't allocate memory to enqueue io-request - ignored");
+ }
+ }
+ RETiRet;
+}
+
+/* This function is called to process a complete workset, that
+ * is a set of events returned from epoll.
+ */
+static void
+processWorkSet(int nEvents, struct epoll_event events[])
+{
+ int iEvt;
+ int remainEvents;
+ remainEvents = nEvents;
+ epolld_t *epd;
+
+ for(iEvt = 0 ; (iEvt < nEvents) && (glbl.GetGlobalInputTermState() == 0) ; ++iEvt) {
+ epd = (epolld_t*)events[iEvt].data.ptr;
+ if(runModConf->bProcessOnPoller && remainEvents == 1) {
+ /* process self, save context switch */
+ processWorkItem(epd);
+ } else {
+ enqueueIoWork(epd, runModConf->bProcessOnPoller);
+ }
+ --remainEvents;
+ }
+}
+
+
+/* worker to process incoming requests
+ */
+static void *
+wrkr(void *myself)
+{
+ struct wrkrInfo_s *me = (struct wrkrInfo_s*) myself;
+ pthread_mutex_lock(&io_q.mut);
+ ++wrkrRunning;
+ pthread_mutex_unlock(&io_q.mut);
+
+ uchar thrdName[32];
+ snprintf((char*)thrdName, sizeof(thrdName), "imptcp/w%d", me->wrkrIdx);
+# if defined(HAVE_PRCTL) && defined(PR_SET_NAME)
+ /* set thread name - we ignore if the call fails, has no harsh consequences... */
+ if(prctl(PR_SET_NAME, thrdName, 0, 0, 0) != 0) {
+ DBGPRINTF("prctl failed, not setting thread name for '%s'\n", thrdName);
+ }
+# endif
+
+ io_req_t *n;
+ while(1) {
+ n = NULL;
+ pthread_mutex_lock(&io_q.mut);
+ if (io_q.sz == 0) {
+ --wrkrRunning;
+ if (glbl.GetGlobalInputTermState() != 0) {
+ pthread_mutex_unlock(&io_q.mut);
+ break;
+ } else {
+ DBGPRINTF("imptcp: worker %llu waiting on new work items\n",
+ (unsigned long long) me->tid);
+ pthread_cond_wait(&io_q.wakeup_worker, &io_q.mut);
+ DBGPRINTF("imptcp: worker %llu awoken\n", (unsigned long long) me->tid);
+ }
+ ++wrkrRunning;
+ }
+ if (io_q.sz > 0) {
+ n = STAILQ_FIRST(&io_q.q);
+ STAILQ_REMOVE_HEAD(&io_q.q, link);
+ io_q.sz--;
+ }
+ pthread_mutex_unlock(&io_q.mut);
+
+ if (n != NULL) {
+ ++me->numCalled;
+ processWorkItem(n->epd);
+ free(n);
+ }
+ }
+ return NULL;
+}
+
+
+BEGINnewInpInst
+ struct cnfparamvals *pvals;
+ instanceConf_t *inst;
+ char *cstr;
+ int i;
+CODESTARTnewInpInst
+ DBGPRINTF("newInpInst (imptcp)\n");
+
+ if((pvals = nvlstGetParams(lst, &inppblk, NULL)) == NULL) {
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ if(Debug) {
+ dbgprintf("input param blk in imptcp:\n");
+ cnfparamsPrint(&inppblk, pvals);
+ }
+
+ CHKiRet(createInstance(&inst));
+
+ for(i = 0 ; i < inppblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(inppblk.descr[i].name, "port")) {
+ inst->pszBindPort = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "address")) {
+ inst->pszBindAddr = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "path")) {
+ inst->pszBindPath = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "unlink")) {
+ inst->bUnlink = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "discardtruncatedmsg")) {
+ inst->discardTruncatedMsg = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "flowcontrol")) {
+ inst->flowControl = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "fileowner")) {
+ inst->fileUID = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "fileownernum")) {
+ inst->fileUID = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "filegroup")) {
+ inst->fileGID = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "filegroupnum")) {
+ inst->fileGID = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "filecreatemode")) {
+ inst->fCreateMode = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "failonpermsfailure")) {
+ inst->bFailOnPerms = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "name")) {
+ inst->pszInputName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "maxframesize")) {
+ const int max = (int) pvals[i].val.d.n;
+ if(max <= 200000000) {
+ inst->maxFrameSize = max;
+ } else {
+ parser_errmsg("imptcp: invalid value for 'maxFrameSize' "
+ "parameter given is %d, max is 200000000", max);
+ ABORT_FINALIZE(RS_RET_PARAM_ERROR);
+ }
+ } else if(!strcmp(inppblk.descr[i].name, "framing.delimiter.regex")) {
+ inst->startRegex = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "ruleset")) {
+ inst->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "supportoctetcountedframing")) {
+ inst->bSuppOctetFram = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "framingfix.cisco.asa")) {
+ inst->bSPFramingFix = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "compression.mode")) {
+ cstr = es_str2cstr(pvals[i].val.d.estr, NULL);
+ if(!strcasecmp(cstr, "stream:always")) {
+ inst->compressionMode = COMPRESS_STREAM_ALWAYS;
+ } else if(!strcasecmp(cstr, "none")) {
+ inst->compressionMode = COMPRESS_NEVER;
+ } else {
+ parser_errmsg("imptcp: invalid value for 'compression.mode' "
+ "parameter (given is '%s')", cstr);
+ free(cstr);
+ ABORT_FINALIZE(RS_RET_PARAM_ERROR);
+ }
+ free(cstr);
+ } else if(!strcmp(inppblk.descr[i].name, "maxsessions")) {
+ inst->iTCPSessMax = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "keepalive")) {
+ inst->bKeepAlive = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "keepalive.probes")) {
+ inst->iKeepAliveProbes = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "keepalive.time")) {
+ inst->iKeepAliveTime = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "keepalive.interval")) {
+ inst->iKeepAliveIntvl = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "addtlframedelimiter")) {
+ inst->iAddtlFrameDelim = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "notifyonconnectionclose")) {
+ inst->bEmitMsgOnClose = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "notifyonconnectionopen")) {
+ inst->bEmitMsgOnOpen = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "defaulttz")) {
+ inst->dfltTZ = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "ratelimit.burst")) {
+ inst->ratelimitBurst = (unsigned int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "ratelimit.interval")) {
+ inst->ratelimitInterval = (unsigned int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "multiline")) {
+ inst->multiLine = (sbool) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "listenportfilename")) {
+ inst->pszLstnPortFileName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "socketbacklog")) {
+ inst->socketBacklog = (int) pvals[i].val.d.n;
+ } else {
+ dbgprintf("imptcp: program error, non-handled "
+ "param '%s'\n", inppblk.descr[i].name);
+ }
+
+ char *bindPort = (char *) inst->pszBindPort;
+ char *bindPath = (char *) inst->pszBindPath;
+ if ((bindPort == NULL || strlen(bindPort) < 1) && (bindPath == NULL || strlen (bindPath) < 1)) {
+ parser_errmsg("imptcp: Must have either port or path defined");
+ ABORT_FINALIZE(RS_RET_PARAM_ERROR);
+ }
+ }
+
+ if(inst->startRegex != NULL) {
+ const int errcode = regcomp(&inst->start_preg, (char*)inst->startRegex, REG_EXTENDED);
+ if(errcode != 0) {
+ char errbuff[512];
+ regerror(errcode, &inst->start_preg, errbuff, sizeof(errbuff));
+ parser_errmsg("imptcp: error in framing.delimiter.regex expansion: %s", errbuff);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ }
+
+ if (inst->iTCPSessMax == -1) {
+ inst->iTCPSessMax = loadModConf->iTCPSessMax;
+ }
+finalize_it:
+CODE_STD_FINALIZERnewInpInst
+ cnfparamvalsDestruct(pvals, &inppblk);
+ENDnewInpInst
+
+
+BEGINbeginCnfLoad
+CODESTARTbeginCnfLoad
+ loadModConf = pModConf;
+ pModConf->pConf = pConf;
+ /* init our settings */
+ loadModConf->wrkrMax = DFLT_wrkrMax;
+ loadModConf->bProcessOnPoller = 1;
+ loadModConf->configSetViaV2Method = 0;
+ bLegacyCnfModGlobalsPermitted = 1;
+ /* init legacy config vars */
+ initConfigSettings();
+ENDbeginCnfLoad
+
+
+BEGINsetModCnf
+ struct cnfparamvals *pvals = NULL;
+ int i;
+CODESTARTsetModCnf
+ pvals = nvlstGetParams(lst, &modpblk, NULL);
+ if(pvals == NULL) {
+ LogError(0, RS_RET_MISSING_CNFPARAMS, "imptcp: error processing module "
+ "config parameters [module(...)]");
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ if(Debug) {
+ dbgprintf("module (global) param blk for imptcp:\n");
+ cnfparamsPrint(&modpblk, pvals);
+ }
+
+ for(i = 0 ; i < modpblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(modpblk.descr[i].name, "threads")) {
+ loadModConf->wrkrMax = (int) pvals[i].val.d.n;
+ } else if(!strcmp(modpblk.descr[i].name, "maxsessions")) {
+ loadModConf->iTCPSessMax = (int) pvals[i].val.d.n;
+ } else if(!strcmp(modpblk.descr[i].name, "processOnPoller")) {
+ loadModConf->bProcessOnPoller = (int) pvals[i].val.d.n;
+ } else {
+ dbgprintf("imptcp: program error, non-handled "
+ "param '%s' in beginCnfLoad\n", modpblk.descr[i].name);
+ }
+ }
+
+ /* remove all of our legacy handlers, as they can not used in addition
+ * the the new-style config method.
+ */
+ bLegacyCnfModGlobalsPermitted = 0;
+ loadModConf->configSetViaV2Method = 1;
+
+finalize_it:
+ if(pvals != NULL)
+ cnfparamvalsDestruct(pvals, &modpblk);
+ENDsetModCnf
+
+
+BEGINendCnfLoad
+CODESTARTendCnfLoad
+ if(!loadModConf->configSetViaV2Method) {
+ /* persist module-specific settings from legacy config system */
+ loadModConf->wrkrMax = cs.wrkrMax;
+ }
+
+ loadModConf = NULL; /* done loading */
+ /* free legacy config vars */
+ free(cs.pszInputName);
+ free(cs.lstnIP);
+ cs.pszInputName = NULL;
+ cs.lstnIP = NULL;
+ENDendCnfLoad
+
+
+/* function to generate error message if framework does not find requested ruleset */
+static inline void
+std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, instanceConf_t *inst)
+{
+ LogError(0, NO_ERRCODE, "imptcp: ruleset '%s' for port %s not found - "
+ "using default ruleset instead", inst->pszBindRuleset,
+ inst->pszBindPort);
+}
+BEGINcheckCnf
+ instanceConf_t *inst;
+CODESTARTcheckCnf
+ for(inst = pModConf->root ; inst != NULL ; inst = inst->next) {
+ std_checkRuleset(pModConf, inst);
+ }
+ENDcheckCnf
+
+
+BEGINactivateCnfPrePrivDrop
+ instanceConf_t *inst;
+CODESTARTactivateCnfPrePrivDrop
+ iMaxLine = glbl.GetMaxLine(runConf); /* get maximum size we currently support */
+ DBGPRINTF("imptcp: config params iMaxLine %d\n", iMaxLine);
+
+ runModConf = pModConf;
+ for(inst = runModConf->root ; inst != NULL ; inst = inst->next) {
+ addListner(pModConf, inst);
+ }
+ if(pSrvRoot == NULL) {
+ LogError(0, RS_RET_NO_LSTN_DEFINED, "imptcp: no ptcp server defined, module can not run.");
+ ABORT_FINALIZE(RS_RET_NO_RUN);
+ }
+
+# if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1)
+ DBGPRINTF("imptcp uses epoll_create1()\n");
+ epollfd = epoll_create1(EPOLL_CLOEXEC);
+ if(epollfd < 0 && errno == ENOSYS)
+# endif
+ {
+ DBGPRINTF("imptcp uses epoll_create()\n");
+ /* reading the docs, the number of epoll events passed to
+ * epoll_create() seems not to be used at all in kernels. So
+ * we just provide "a" number, happens to be 10.
+ */
+ epollfd = epoll_create(10);
+ }
+
+ if(epollfd < 0) {
+ LogError(0, RS_RET_EPOLL_CR_FAILED, "error: epoll_create() failed");
+ ABORT_FINALIZE(RS_RET_NO_RUN);
+ }
+
+ /* start up servers, but do not yet read input data */
+ CHKiRet(startupServers());
+ DBGPRINTF("imptcp started up, but not yet receiving data\n");
+finalize_it:
+ENDactivateCnfPrePrivDrop
+
+
+BEGINactivateCnf
+CODESTARTactivateCnf
+ /* nothing to do, all done pre priv drop */
+ENDactivateCnf
+
+
+BEGINfreeCnf
+ instanceConf_t *inst, *del;
+CODESTARTfreeCnf
+ for(inst = pModConf->root ; inst != NULL ; ) {
+ free(inst->pszBindPort);
+ free(inst->pszBindPath);
+ free(inst->pszBindAddr);
+ free(inst->pszBindRuleset);
+ free(inst->pszInputName);
+ free(inst->dfltTZ);
+ if(inst->startRegex != NULL) {
+ regfree(&inst->start_preg);
+ free(inst->startRegex);
+ }
+ del = inst;
+ inst = inst->next;
+ free(del);
+ }
+ENDfreeCnf
+
+
+/* This function is called to gather input.
+ */
+BEGINrunInput
+ int nEvents;
+ struct epoll_event events[128];
+CODESTARTrunInput
+ initIoQ();
+ startWorkerPool();
+ DBGPRINTF("imptcp: now beginning to process input data\n");
+ while(glbl.GetGlobalInputTermState() == 0) {
+ DBGPRINTF("imptcp going on epoll_wait\n");
+ nEvents = epoll_wait(epollfd, events, sizeof(events)/sizeof(struct epoll_event), -1);
+ DBGPRINTF("imptcp: epoll returned %d events\n", nEvents);
+ processWorkSet(nEvents, events);
+ }
+ DBGPRINTF("imptcp: successfully terminated\n");
+ /* we stop the worker pool in AfterRun, in case we get cancelled for some reason (old Interface) */
+ENDrunInput
+
+
+/* initialize and return if will run or not */
+BEGINwillRun
+CODESTARTwillRun
+ENDwillRun
+
+
+/* completely shut down a server, that means closing all of its
+ * listeners and sessions.
+ */
+static void
+shutdownSrv(ptcpsrv_t *pSrv)
+{
+ ptcplstn_t *pLstn, *lstnDel;
+ ptcpsess_t *pSess, *sessDel;
+
+ /* listeners */
+ pLstn = pSrv->pLstn;
+ while(pLstn != NULL) {
+ close(pLstn->sock);
+ statsobj.Destruct(&(pLstn->stats));
+ /* now unlink listner */
+ lstnDel = pLstn;
+ pLstn = pLstn->next;
+ DBGPRINTF("imptcp shutdown listen socket %d (rcvd %lld bytes, "
+ "decompressed %lld)\n", lstnDel->sock, lstnDel->rcvdBytes,
+ lstnDel->rcvdDecompressed);
+ free(lstnDel->epd);
+ free(lstnDel);
+ }
+
+ if (pSrv->bUnixSocket && pSrv->bUnlink) {
+ unlink((char*) pSrv->path);
+ }
+
+ /* sessions */
+ pSess = pSrv->pSess;
+ while(pSess != NULL) {
+ close(pSess->sock);
+ sessDel = pSess;
+ pSess = pSess->next;
+ DBGPRINTF("imptcp shutdown session socket %d\n", sessDel->sock);
+ destructSess(sessDel);
+ }
+}
+
+
+BEGINafterRun
+ ptcpsrv_t *pSrv, *srvDel;
+CODESTARTafterRun
+ stopWorkerPool();
+ destroyIoQ();
+
+ /* we need to close everything that is still open */
+ pSrv = pSrvRoot;
+ while(pSrv != NULL) {
+ srvDel = pSrv;
+ pSrv = pSrv->pNext;
+ shutdownSrv(srvDel);
+ destructSrv(srvDel);
+ }
+
+ close(epollfd);
+ENDafterRun
+
+
+BEGINmodExit
+CODESTARTmodExit
+ pthread_attr_destroy(&wrkrThrdAttr);
+ /* release objects we used */
+ objRelease(glbl, CORE_COMPONENT);
+ objRelease(statsobj, CORE_COMPONENT);
+ objRelease(prop, CORE_COMPONENT);
+ objRelease(net, LM_NET_FILENAME);
+ objRelease(datetime, CORE_COMPONENT);
+ objRelease(ruleset, CORE_COMPONENT);
+ENDmodExit
+
+
+static rsRetVal
+resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
+{
+ cs.bEmitMsgOnClose = 0;
+ cs.bEmitMsgOnOpen = 0;
+ cs.wrkrMax = DFLT_wrkrMax;
+ cs.bKeepAlive = 0;
+ cs.iKeepAliveProbes = 0;
+ cs.iKeepAliveTime = 0;
+ cs.iKeepAliveIntvl = 0;
+ cs.bSuppOctetFram = 1;
+ cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
+ cs.maxFrameSize = 200000;
+ free(cs.pszInputName);
+ cs.pszInputName = NULL;
+ free(cs.lstnIP);
+ cs.lstnIP = NULL;
+ return RS_RET_OK;
+}
+
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ if(eFeat == sFEATURENonCancelInputTermination)
+ iRet = RS_RET_OK;
+ENDisCompatibleWithFeature
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_IMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_QUERIES
+CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
+CODEqueryEtryPt_STD_CONF2_PREPRIVDROP_QUERIES
+CODEqueryEtryPt_STD_CONF2_IMOD_QUERIES
+CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
+ENDqueryEtryPt
+
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
+CODEmodInit_QueryRegCFSLineHdlr
+ /* request objects we use */
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
+ CHKiRet(objUse(statsobj, CORE_COMPONENT));
+ CHKiRet(objUse(prop, CORE_COMPONENT));
+ CHKiRet(objUse(net, LM_NET_FILENAME));
+ CHKiRet(objUse(datetime, CORE_COMPONENT));
+ CHKiRet(objUse(ruleset, CORE_COMPONENT));
+
+ /* initialize "read-only" thread attributes */
+ pthread_attr_init(&wrkrThrdAttr);
+ pthread_attr_setstacksize(&wrkrThrdAttr, 4096*1024);
+
+ /* init legacy config settings */
+ initConfigSettings();
+
+ /* register config file handlers */
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverrun"), 0, eCmdHdlrGetWord,
+ addInstance, NULL, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive"), 0, eCmdHdlrBinary,
+ NULL, &cs.bKeepAlive, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive_probes"), 0, eCmdHdlrInt,
+ NULL, &cs.iKeepAliveProbes, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive_time"), 0, eCmdHdlrInt,
+ NULL, &cs.iKeepAliveTime, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive_intvl"), 0, eCmdHdlrInt,
+ NULL, &cs.iKeepAliveIntvl, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserversupportoctetcountedframing"), 0, eCmdHdlrBinary,
+ NULL, &cs.bSuppOctetFram, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpservernotifyonconnectionclose"), 0,
+ eCmdHdlrBinary, NULL, &cs.bEmitMsgOnClose, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserveraddtlframedelimiter"), 0, eCmdHdlrInt,
+ NULL, &cs.iAddtlFrameDelim, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverinputname"), 0,
+ eCmdHdlrGetWord, NULL, &cs.pszInputName, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverlistenip"), 0,
+ eCmdHdlrGetWord, NULL, &cs.lstnIP, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverbindruleset"), 0,
+ eCmdHdlrGetWord, NULL, &cs.pszBindRuleset, STD_LOADABLE_MODULE_ID));
+ /* module-global parameters */
+ CHKiRet(regCfSysLineHdlr2(UCHAR_CONSTANT("inputptcpserverhelperthreads"), 0, eCmdHdlrInt,
+ NULL, &cs.wrkrMax, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("resetconfigvariables"), 1, eCmdHdlrCustomHandler,
+ resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
+ENDmodInit
+
+
+/* vim:set ai:
+ */