diff options
Diffstat (limited to 'plugins/imptcp')
-rw-r--r-- | plugins/imptcp/Makefile.am | 6 | ||||
-rw-r--r-- | plugins/imptcp/Makefile.in | 796 | ||||
-rw-r--r-- | plugins/imptcp/imptcp.c | 2676 |
3 files changed, 3478 insertions, 0 deletions
diff --git a/plugins/imptcp/Makefile.am b/plugins/imptcp/Makefile.am new file mode 100644 index 0000000..bfacc88 --- /dev/null +++ b/plugins/imptcp/Makefile.am @@ -0,0 +1,6 @@ +pkglib_LTLIBRARIES = imptcp.la + +imptcp_la_SOURCES = imptcp.c +imptcp_la_CPPFLAGS = -I$(top_srcdir) $(PTHREADS_CFLAGS) $(RSRT_CFLAGS) +imptcp_la_LDFLAGS = -module -avoid-version +imptcp_la_LIBADD = diff --git a/plugins/imptcp/Makefile.in b/plugins/imptcp/Makefile.in new file mode 100644 index 0000000..a24d6fe --- /dev/null +++ b/plugins/imptcp/Makefile.in @@ -0,0 +1,796 @@ +# Makefile.in generated by automake 1.16.1 from Makefile.am. +# @configure_input@ + +# Copyright (C) 1994-2018 Free Software Foundation, Inc. + +# This Makefile.in is free software; the Free Software Foundation +# gives unlimited permission to copy and/or distribute it, +# with or without modifications, as long as this notice is preserved. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY, to the extent permitted by law; without +# even the implied warranty of MERCHANTABILITY or FITNESS FOR A +# PARTICULAR PURPOSE. + +@SET_MAKE@ + +VPATH = @srcdir@ +am__is_gnu_make = { \ + if test -z '$(MAKELEVEL)'; then \ + false; \ + elif test -n '$(MAKE_HOST)'; then \ + true; \ + elif test -n '$(MAKE_VERSION)' && test -n '$(CURDIR)'; then \ + true; \ + else \ + false; \ + fi; \ +} +am__make_running_with_option = \ + case $${target_option-} in \ + ?) ;; \ + *) echo "am__make_running_with_option: internal error: invalid" \ + "target option '$${target_option-}' specified" >&2; \ + exit 1;; \ + esac; \ + has_opt=no; \ + sane_makeflags=$$MAKEFLAGS; \ + if $(am__is_gnu_make); then \ + sane_makeflags=$$MFLAGS; \ + else \ + case $$MAKEFLAGS in \ + *\\[\ \ ]*) \ + bs=\\; \ + sane_makeflags=`printf '%s\n' "$$MAKEFLAGS" \ + | sed "s/$$bs$$bs[$$bs $$bs ]*//g"`;; \ + esac; \ + fi; \ + skip_next=no; \ + strip_trailopt () \ + { \ + flg=`printf '%s\n' "$$flg" | sed "s/$$1.*$$//"`; \ + }; \ + for flg in $$sane_makeflags; do \ + test $$skip_next = yes && { skip_next=no; continue; }; \ + case $$flg in \ + *=*|--*) continue;; \ + -*I) strip_trailopt 'I'; skip_next=yes;; \ + -*I?*) strip_trailopt 'I';; \ + -*O) strip_trailopt 'O'; skip_next=yes;; \ + -*O?*) strip_trailopt 'O';; \ + -*l) strip_trailopt 'l'; skip_next=yes;; \ + -*l?*) strip_trailopt 'l';; \ + -[dEDm]) skip_next=yes;; \ + -[JT]) skip_next=yes;; \ + esac; \ + case $$flg in \ + *$$target_option*) has_opt=yes; break;; \ + esac; \ + done; \ + test $$has_opt = yes +am__make_dryrun = (target_option=n; $(am__make_running_with_option)) +am__make_keepgoing = (target_option=k; $(am__make_running_with_option)) +pkgdatadir = $(datadir)/@PACKAGE@ +pkgincludedir = $(includedir)/@PACKAGE@ +pkglibdir = $(libdir)/@PACKAGE@ +pkglibexecdir = $(libexecdir)/@PACKAGE@ +am__cd = CDPATH="$${ZSH_VERSION+.}$(PATH_SEPARATOR)" && cd +install_sh_DATA = $(install_sh) -c -m 644 +install_sh_PROGRAM = $(install_sh) -c +install_sh_SCRIPT = $(install_sh) -c +INSTALL_HEADER = $(INSTALL_DATA) +transform = $(program_transform_name) +NORMAL_INSTALL = : +PRE_INSTALL = : +POST_INSTALL = : +NORMAL_UNINSTALL = : +PRE_UNINSTALL = : +POST_UNINSTALL = : +build_triplet = @build@ +host_triplet = @host@ +subdir = plugins/imptcp +ACLOCAL_M4 = $(top_srcdir)/aclocal.m4 +am__aclocal_m4_deps = $(top_srcdir)/m4/ac_check_define.m4 \ + $(top_srcdir)/m4/atomic_operations.m4 \ + $(top_srcdir)/m4/atomic_operations_64bit.m4 \ + $(top_srcdir)/m4/libtool.m4 $(top_srcdir)/m4/ltoptions.m4 \ + $(top_srcdir)/m4/ltsugar.m4 $(top_srcdir)/m4/ltversion.m4 \ + $(top_srcdir)/m4/lt~obsolete.m4 $(top_srcdir)/configure.ac +am__configure_deps = $(am__aclocal_m4_deps) $(CONFIGURE_DEPENDENCIES) \ + $(ACLOCAL_M4) +DIST_COMMON = $(srcdir)/Makefile.am $(am__DIST_COMMON) +mkinstalldirs = $(install_sh) -d +CONFIG_HEADER = $(top_builddir)/config.h +CONFIG_CLEAN_FILES = +CONFIG_CLEAN_VPATH_FILES = +am__vpath_adj_setup = srcdirstrip=`echo "$(srcdir)" | sed 's|.|.|g'`; +am__vpath_adj = case $$p in \ + $(srcdir)/*) f=`echo "$$p" | sed "s|^$$srcdirstrip/||"`;; \ + *) f=$$p;; \ + esac; +am__strip_dir = f=`echo $$p | sed -e 's|^.*/||'`; +am__install_max = 40 +am__nobase_strip_setup = \ + srcdirstrip=`echo "$(srcdir)" | sed 's/[].[^$$\\*|]/\\\\&/g'` +am__nobase_strip = \ + for p in $$list; do echo "$$p"; done | sed -e "s|$$srcdirstrip/||" +am__nobase_list = $(am__nobase_strip_setup); \ + for p in $$list; do echo "$$p $$p"; done | \ + sed "s| $$srcdirstrip/| |;"' / .*\//!s/ .*/ ./; s,\( .*\)/[^/]*$$,\1,' | \ + $(AWK) 'BEGIN { files["."] = "" } { files[$$2] = files[$$2] " " $$1; \ + if (++n[$$2] == $(am__install_max)) \ + { print $$2, files[$$2]; n[$$2] = 0; files[$$2] = "" } } \ + END { for (dir in files) print dir, files[dir] }' +am__base_list = \ + sed '$$!N;$$!N;$$!N;$$!N;$$!N;$$!N;$$!N;s/\n/ /g' | \ + sed '$$!N;$$!N;$$!N;$$!N;s/\n/ /g' +am__uninstall_files_from_dir = { \ + test -z "$$files" \ + || { test ! -d "$$dir" && test ! -f "$$dir" && test ! -r "$$dir"; } \ + || { echo " ( cd '$$dir' && rm -f" $$files ")"; \ + $(am__cd) "$$dir" && rm -f $$files; }; \ + } +am__installdirs = "$(DESTDIR)$(pkglibdir)" +LTLIBRARIES = $(pkglib_LTLIBRARIES) +imptcp_la_DEPENDENCIES = +am_imptcp_la_OBJECTS = imptcp_la-imptcp.lo +imptcp_la_OBJECTS = $(am_imptcp_la_OBJECTS) +AM_V_lt = $(am__v_lt_@AM_V@) +am__v_lt_ = $(am__v_lt_@AM_DEFAULT_V@) +am__v_lt_0 = --silent +am__v_lt_1 = +imptcp_la_LINK = $(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) \ + $(LIBTOOLFLAGS) --mode=link $(CCLD) $(AM_CFLAGS) $(CFLAGS) \ + $(imptcp_la_LDFLAGS) $(LDFLAGS) -o $@ +AM_V_P = $(am__v_P_@AM_V@) +am__v_P_ = $(am__v_P_@AM_DEFAULT_V@) +am__v_P_0 = false +am__v_P_1 = : +AM_V_GEN = $(am__v_GEN_@AM_V@) +am__v_GEN_ = $(am__v_GEN_@AM_DEFAULT_V@) +am__v_GEN_0 = @echo " GEN " $@; +am__v_GEN_1 = +AM_V_at = $(am__v_at_@AM_V@) +am__v_at_ = $(am__v_at_@AM_DEFAULT_V@) +am__v_at_0 = @ +am__v_at_1 = +DEFAULT_INCLUDES = -I.@am__isrc@ -I$(top_builddir) +depcomp = $(SHELL) $(top_srcdir)/depcomp +am__maybe_remake_depfiles = depfiles +am__depfiles_remade = ./$(DEPDIR)/imptcp_la-imptcp.Plo +am__mv = mv -f +COMPILE = $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) \ + $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) +LTCOMPILE = $(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) \ + $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) \ + $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) \ + $(AM_CFLAGS) $(CFLAGS) +AM_V_CC = $(am__v_CC_@AM_V@) +am__v_CC_ = $(am__v_CC_@AM_DEFAULT_V@) +am__v_CC_0 = @echo " CC " $@; +am__v_CC_1 = +CCLD = $(CC) +LINK = $(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) \ + $(LIBTOOLFLAGS) --mode=link $(CCLD) $(AM_CFLAGS) $(CFLAGS) \ + $(AM_LDFLAGS) $(LDFLAGS) -o $@ +AM_V_CCLD = $(am__v_CCLD_@AM_V@) +am__v_CCLD_ = $(am__v_CCLD_@AM_DEFAULT_V@) +am__v_CCLD_0 = @echo " CCLD " $@; +am__v_CCLD_1 = +SOURCES = $(imptcp_la_SOURCES) +DIST_SOURCES = $(imptcp_la_SOURCES) +am__can_run_installinfo = \ + case $$AM_UPDATE_INFO_DIR in \ + n|no|NO) false;; \ + *) (install-info --version) >/dev/null 2>&1;; \ + esac +am__tagged_files = $(HEADERS) $(SOURCES) $(TAGS_FILES) $(LISP) +# Read a list of newline-separated strings from the standard input, +# and print each of them once, without duplicates. Input order is +# *not* preserved. +am__uniquify_input = $(AWK) '\ + BEGIN { nonempty = 0; } \ + { items[$$0] = 1; nonempty = 1; } \ + END { if (nonempty) { for (i in items) print i; }; } \ +' +# Make sure the list of sources is unique. This is necessary because, +# e.g., the same source file might be shared among _SOURCES variables +# for different programs/libraries. +am__define_uniq_tagged_files = \ + list='$(am__tagged_files)'; \ + unique=`for i in $$list; do \ + if test -f "$$i"; then echo $$i; else echo $(srcdir)/$$i; fi; \ + done | $(am__uniquify_input)` +ETAGS = etags +CTAGS = ctags +am__DIST_COMMON = $(srcdir)/Makefile.in $(top_srcdir)/depcomp +DISTFILES = $(DIST_COMMON) $(DIST_SOURCES) $(TEXINFOS) $(EXTRA_DIST) +ACLOCAL = @ACLOCAL@ +AMTAR = @AMTAR@ +AM_DEFAULT_VERBOSITY = @AM_DEFAULT_VERBOSITY@ +APU_CFLAGS = @APU_CFLAGS@ +APU_LIBS = @APU_LIBS@ +AR = @AR@ +AUTOCONF = @AUTOCONF@ +AUTOHEADER = @AUTOHEADER@ +AUTOMAKE = @AUTOMAKE@ +AWK = @AWK@ +CC = @CC@ +CCDEPMODE = @CCDEPMODE@ +CFLAGS = @CFLAGS@ +CIVETWEB_LIBS = @CIVETWEB_LIBS@ +CONF_FILE_PATH = @CONF_FILE_PATH@ +CPP = @CPP@ +CPPFLAGS = @CPPFLAGS@ +CURL_CFLAGS = @CURL_CFLAGS@ +CURL_LIBS = @CURL_LIBS@ +CYGPATH_W = @CYGPATH_W@ +CZMQ_CFLAGS = @CZMQ_CFLAGS@ +CZMQ_LIBS = @CZMQ_LIBS@ +DEFS = @DEFS@ +DEPDIR = @DEPDIR@ +DLLTOOL = @DLLTOOL@ +DL_LIBS = @DL_LIBS@ +DSYMUTIL = @DSYMUTIL@ +DUMPBIN = @DUMPBIN@ +ECHO_C = @ECHO_C@ +ECHO_N = @ECHO_N@ +ECHO_T = @ECHO_T@ +EGREP = @EGREP@ +EXEEXT = @EXEEXT@ +FAUP_LIBS = @FAUP_LIBS@ +FGREP = @FGREP@ +GLIB_CFLAGS = @GLIB_CFLAGS@ +GLIB_LIBS = @GLIB_LIBS@ +GNUTLS_CFLAGS = @GNUTLS_CFLAGS@ +GNUTLS_LIBS = @GNUTLS_LIBS@ +GREP = @GREP@ +GSS_LIBS = @GSS_LIBS@ +GT_KSI_LS12_CFLAGS = @GT_KSI_LS12_CFLAGS@ +GT_KSI_LS12_LIBS = @GT_KSI_LS12_LIBS@ +HASH_XXHASH_LIBS = @HASH_XXHASH_LIBS@ +HIREDIS_CFLAGS = @HIREDIS_CFLAGS@ +HIREDIS_LIBS = @HIREDIS_LIBS@ +IMUDP_LIBS = @IMUDP_LIBS@ +INSTALL = @INSTALL@ +INSTALL_DATA = @INSTALL_DATA@ +INSTALL_PROGRAM = @INSTALL_PROGRAM@ +INSTALL_SCRIPT = @INSTALL_SCRIPT@ +INSTALL_STRIP_PROGRAM = @INSTALL_STRIP_PROGRAM@ +IP = @IP@ +JAVA = @JAVA@ +JAVAC = @JAVAC@ +LD = @LD@ +LDFLAGS = @LDFLAGS@ +LEX = @LEX@ +LEXLIB = @LEXLIB@ +LEX_OUTPUT_ROOT = @LEX_OUTPUT_ROOT@ +LIBCAPNG_CFLAGS = @LIBCAPNG_CFLAGS@ +LIBCAPNG_LIBS = @LIBCAPNG_LIBS@ +LIBCAPNG_PRESENT_CFLAGS = @LIBCAPNG_PRESENT_CFLAGS@ +LIBCAPNG_PRESENT_LIBS = @LIBCAPNG_PRESENT_LIBS@ +LIBDBI_CFLAGS = @LIBDBI_CFLAGS@ +LIBDBI_LIBS = @LIBDBI_LIBS@ +LIBESTR_CFLAGS = @LIBESTR_CFLAGS@ +LIBESTR_LIBS = @LIBESTR_LIBS@ +LIBEVENT_CFLAGS = @LIBEVENT_CFLAGS@ +LIBEVENT_LIBS = @LIBEVENT_LIBS@ +LIBFASTJSON_CFLAGS = @LIBFASTJSON_CFLAGS@ +LIBFASTJSON_LIBS = @LIBFASTJSON_LIBS@ +LIBGCRYPT_CFLAGS = @LIBGCRYPT_CFLAGS@ +LIBGCRYPT_CONFIG = @LIBGCRYPT_CONFIG@ +LIBGCRYPT_LIBS = @LIBGCRYPT_LIBS@ +LIBLOGGING_CFLAGS = @LIBLOGGING_CFLAGS@ +LIBLOGGING_LIBS = @LIBLOGGING_LIBS@ +LIBLOGGING_STDLOG_CFLAGS = @LIBLOGGING_STDLOG_CFLAGS@ +LIBLOGGING_STDLOG_LIBS = @LIBLOGGING_STDLOG_LIBS@ +LIBLOGNORM_CFLAGS = @LIBLOGNORM_CFLAGS@ +LIBLOGNORM_LIBS = @LIBLOGNORM_LIBS@ +LIBLZ4_CFLAGS = @LIBLZ4_CFLAGS@ +LIBLZ4_LIBS = @LIBLZ4_LIBS@ +LIBM = @LIBM@ +LIBMONGOC_CFLAGS = @LIBMONGOC_CFLAGS@ +LIBMONGOC_LIBS = @LIBMONGOC_LIBS@ +LIBOBJS = @LIBOBJS@ +LIBRDKAFKA_CFLAGS = @LIBRDKAFKA_CFLAGS@ +LIBRDKAFKA_LIBS = @LIBRDKAFKA_LIBS@ +LIBS = @LIBS@ +LIBSYSTEMD_CFLAGS = @LIBSYSTEMD_CFLAGS@ +LIBSYSTEMD_JOURNAL_CFLAGS = @LIBSYSTEMD_JOURNAL_CFLAGS@ +LIBSYSTEMD_JOURNAL_LIBS = @LIBSYSTEMD_JOURNAL_LIBS@ +LIBSYSTEMD_LIBS = @LIBSYSTEMD_LIBS@ +LIBTOOL = @LIBTOOL@ +LIBUUID_CFLAGS = @LIBUUID_CFLAGS@ +LIBUUID_LIBS = @LIBUUID_LIBS@ +LIPO = @LIPO@ +LN_S = @LN_S@ +LTLIBOBJS = @LTLIBOBJS@ +LT_SYS_LIBRARY_PATH = @LT_SYS_LIBRARY_PATH@ +MAKEINFO = @MAKEINFO@ +MANIFEST_TOOL = @MANIFEST_TOOL@ +MKDIR_P = @MKDIR_P@ +MYSQL_CFLAGS = @MYSQL_CFLAGS@ +MYSQL_CONFIG = @MYSQL_CONFIG@ +MYSQL_LIBS = @MYSQL_LIBS@ +NM = @NM@ +NMEDIT = @NMEDIT@ +OBJDUMP = @OBJDUMP@ +OBJEXT = @OBJEXT@ +OPENSSL_CFLAGS = @OPENSSL_CFLAGS@ +OPENSSL_LIBS = @OPENSSL_LIBS@ +OTOOL = @OTOOL@ +OTOOL64 = @OTOOL64@ +PACKAGE = @PACKAGE@ +PACKAGE_BUGREPORT = @PACKAGE_BUGREPORT@ +PACKAGE_NAME = @PACKAGE_NAME@ +PACKAGE_STRING = @PACKAGE_STRING@ +PACKAGE_TARNAME = @PACKAGE_TARNAME@ +PACKAGE_URL = @PACKAGE_URL@ +PACKAGE_VERSION = @PACKAGE_VERSION@ +PATH_SEPARATOR = @PATH_SEPARATOR@ +PGSQL_CFLAGS = @PGSQL_CFLAGS@ +PGSQL_LIBS = @PGSQL_LIBS@ +PG_CONFIG = @PG_CONFIG@ +PID_FILE_PATH = @PID_FILE_PATH@ +PKG_CONFIG = @PKG_CONFIG@ +PKG_CONFIG_LIBDIR = @PKG_CONFIG_LIBDIR@ +PKG_CONFIG_PATH = @PKG_CONFIG_PATH@ +PROTON_CFLAGS = @PROTON_CFLAGS@ +PROTON_LIBS = @PROTON_LIBS@ +PROTON_PROACTOR_CFLAGS = @PROTON_PROACTOR_CFLAGS@ +PROTON_PROACTOR_LIBS = @PROTON_PROACTOR_LIBS@ +PTHREADS_CFLAGS = @PTHREADS_CFLAGS@ +PTHREADS_LIBS = @PTHREADS_LIBS@ +PYTHON = @PYTHON@ +PYTHON_EXEC_PREFIX = @PYTHON_EXEC_PREFIX@ +PYTHON_PLATFORM = @PYTHON_PLATFORM@ +PYTHON_PREFIX = @PYTHON_PREFIX@ +PYTHON_VERSION = @PYTHON_VERSION@ +RABBITMQ_CFLAGS = @RABBITMQ_CFLAGS@ +RABBITMQ_LIBS = @RABBITMQ_LIBS@ +RANLIB = @RANLIB@ +READLINK = @READLINK@ +REDIS = @REDIS@ +RELP_CFLAGS = @RELP_CFLAGS@ +RELP_LIBS = @RELP_LIBS@ +RSRT_CFLAGS = @RSRT_CFLAGS@ +RSRT_CFLAGS1 = @RSRT_CFLAGS1@ +RSRT_LIBS = @RSRT_LIBS@ +RSRT_LIBS1 = @RSRT_LIBS1@ +RST2MAN = @RST2MAN@ +RT_LIBS = @RT_LIBS@ +SED = @SED@ +SET_MAKE = @SET_MAKE@ +SHELL = @SHELL@ +SNMP_CFLAGS = @SNMP_CFLAGS@ +SNMP_LIBS = @SNMP_LIBS@ +SOL_LIBS = @SOL_LIBS@ +STRIP = @STRIP@ +TCL_BIN_DIR = @TCL_BIN_DIR@ +TCL_INCLUDE_SPEC = @TCL_INCLUDE_SPEC@ +TCL_LIB_FILE = @TCL_LIB_FILE@ +TCL_LIB_FLAG = @TCL_LIB_FLAG@ +TCL_LIB_SPEC = @TCL_LIB_SPEC@ +TCL_PATCH_LEVEL = @TCL_PATCH_LEVEL@ +TCL_SRC_DIR = @TCL_SRC_DIR@ +TCL_STUB_LIB_FILE = @TCL_STUB_LIB_FILE@ +TCL_STUB_LIB_FLAG = @TCL_STUB_LIB_FLAG@ +TCL_STUB_LIB_SPEC = @TCL_STUB_LIB_SPEC@ +TCL_VERSION = @TCL_VERSION@ +UDPSPOOF_CFLAGS = @UDPSPOOF_CFLAGS@ +UDPSPOOF_LIBS = @UDPSPOOF_LIBS@ +VALGRIND = @VALGRIND@ +VERSION = @VERSION@ +WARN_CFLAGS = @WARN_CFLAGS@ +WARN_LDFLAGS = @WARN_LDFLAGS@ +WARN_SCANNERFLAGS = @WARN_SCANNERFLAGS@ +WGET = @WGET@ +YACC = @YACC@ +YACC_FOUND = @YACC_FOUND@ +YFLAGS = @YFLAGS@ +ZLIB_CFLAGS = @ZLIB_CFLAGS@ +ZLIB_LIBS = @ZLIB_LIBS@ +ZSTD_CFLAGS = @ZSTD_CFLAGS@ +ZSTD_LIBS = @ZSTD_LIBS@ +abs_builddir = @abs_builddir@ +abs_srcdir = @abs_srcdir@ +abs_top_builddir = @abs_top_builddir@ +abs_top_srcdir = @abs_top_srcdir@ +ac_ct_AR = @ac_ct_AR@ +ac_ct_CC = @ac_ct_CC@ +ac_ct_DUMPBIN = @ac_ct_DUMPBIN@ +am__include = @am__include@ +am__leading_dot = @am__leading_dot@ +am__quote = @am__quote@ +am__tar = @am__tar@ +am__untar = @am__untar@ +bindir = @bindir@ +build = @build@ +build_alias = @build_alias@ +build_cpu = @build_cpu@ +build_os = @build_os@ +build_vendor = @build_vendor@ +builddir = @builddir@ +datadir = @datadir@ +datarootdir = @datarootdir@ +docdir = @docdir@ +dvidir = @dvidir@ +exec_prefix = @exec_prefix@ +host = @host@ +host_alias = @host_alias@ +host_cpu = @host_cpu@ +host_os = @host_os@ +host_vendor = @host_vendor@ +htmldir = @htmldir@ +includedir = @includedir@ +infodir = @infodir@ +install_sh = @install_sh@ +libdir = @libdir@ +libexecdir = @libexecdir@ +localedir = @localedir@ +localstatedir = @localstatedir@ +mandir = @mandir@ +mkdir_p = @mkdir_p@ +moddirs = @moddirs@ +oldincludedir = @oldincludedir@ +pdfdir = @pdfdir@ +pkgpyexecdir = @pkgpyexecdir@ +pkgpythondir = @pkgpythondir@ +prefix = @prefix@ +program_transform_name = @program_transform_name@ +psdir = @psdir@ +pyexecdir = @pyexecdir@ +pythondir = @pythondir@ +runstatedir = @runstatedir@ +sbindir = @sbindir@ +sharedstatedir = @sharedstatedir@ +srcdir = @srcdir@ +sysconfdir = @sysconfdir@ +target_alias = @target_alias@ +top_build_prefix = @top_build_prefix@ +top_builddir = @top_builddir@ +top_srcdir = @top_srcdir@ +pkglib_LTLIBRARIES = imptcp.la +imptcp_la_SOURCES = imptcp.c +imptcp_la_CPPFLAGS = -I$(top_srcdir) $(PTHREADS_CFLAGS) $(RSRT_CFLAGS) +imptcp_la_LDFLAGS = -module -avoid-version +imptcp_la_LIBADD = +all: all-am + +.SUFFIXES: +.SUFFIXES: .c .lo .o .obj +$(srcdir)/Makefile.in: $(srcdir)/Makefile.am $(am__configure_deps) + @for dep in $?; do \ + case '$(am__configure_deps)' in \ + *$$dep*) \ + ( cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh ) \ + && { if test -f $@; then exit 0; else break; fi; }; \ + exit 1;; \ + esac; \ + done; \ + echo ' cd $(top_srcdir) && $(AUTOMAKE) --gnu plugins/imptcp/Makefile'; \ + $(am__cd) $(top_srcdir) && \ + $(AUTOMAKE) --gnu plugins/imptcp/Makefile +Makefile: $(srcdir)/Makefile.in $(top_builddir)/config.status + @case '$?' in \ + *config.status*) \ + cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh;; \ + *) \ + echo ' cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__maybe_remake_depfiles)'; \ + cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__maybe_remake_depfiles);; \ + esac; + +$(top_builddir)/config.status: $(top_srcdir)/configure $(CONFIG_STATUS_DEPENDENCIES) + cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh + +$(top_srcdir)/configure: $(am__configure_deps) + cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh +$(ACLOCAL_M4): $(am__aclocal_m4_deps) + cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh +$(am__aclocal_m4_deps): + +install-pkglibLTLIBRARIES: $(pkglib_LTLIBRARIES) + @$(NORMAL_INSTALL) + @list='$(pkglib_LTLIBRARIES)'; test -n "$(pkglibdir)" || list=; \ + list2=; for p in $$list; do \ + if test -f $$p; then \ + list2="$$list2 $$p"; \ + else :; fi; \ + done; \ + test -z "$$list2" || { \ + echo " $(MKDIR_P) '$(DESTDIR)$(pkglibdir)'"; \ + $(MKDIR_P) "$(DESTDIR)$(pkglibdir)" || exit 1; \ + echo " $(LIBTOOL) $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=install $(INSTALL) $(INSTALL_STRIP_FLAG) $$list2 '$(DESTDIR)$(pkglibdir)'"; \ + $(LIBTOOL) $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=install $(INSTALL) $(INSTALL_STRIP_FLAG) $$list2 "$(DESTDIR)$(pkglibdir)"; \ + } + +uninstall-pkglibLTLIBRARIES: + @$(NORMAL_UNINSTALL) + @list='$(pkglib_LTLIBRARIES)'; test -n "$(pkglibdir)" || list=; \ + for p in $$list; do \ + $(am__strip_dir) \ + echo " $(LIBTOOL) $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=uninstall rm -f '$(DESTDIR)$(pkglibdir)/$$f'"; \ + $(LIBTOOL) $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=uninstall rm -f "$(DESTDIR)$(pkglibdir)/$$f"; \ + done + +clean-pkglibLTLIBRARIES: + -test -z "$(pkglib_LTLIBRARIES)" || rm -f $(pkglib_LTLIBRARIES) + @list='$(pkglib_LTLIBRARIES)'; \ + locs=`for p in $$list; do echo $$p; done | \ + sed 's|^[^/]*$$|.|; s|/[^/]*$$||; s|$$|/so_locations|' | \ + sort -u`; \ + test -z "$$locs" || { \ + echo rm -f $${locs}; \ + rm -f $${locs}; \ + } + +imptcp.la: $(imptcp_la_OBJECTS) $(imptcp_la_DEPENDENCIES) $(EXTRA_imptcp_la_DEPENDENCIES) + $(AM_V_CCLD)$(imptcp_la_LINK) -rpath $(pkglibdir) $(imptcp_la_OBJECTS) $(imptcp_la_LIBADD) $(LIBS) + +mostlyclean-compile: + -rm -f *.$(OBJEXT) + +distclean-compile: + -rm -f *.tab.c + +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/imptcp_la-imptcp.Plo@am__quote@ # am--include-marker + +$(am__depfiles_remade): + @$(MKDIR_P) $(@D) + @echo '# dummy' >$@-t && $(am__mv) $@-t $@ + +am--depfiles: $(am__depfiles_remade) + +.c.o: +@am__fastdepCC_TRUE@ $(AM_V_CC)depbase=`echo $@ | sed 's|[^/]*$$|$(DEPDIR)/&|;s|\.o$$||'`;\ +@am__fastdepCC_TRUE@ $(COMPILE) -MT $@ -MD -MP -MF $$depbase.Tpo -c -o $@ $< &&\ +@am__fastdepCC_TRUE@ $(am__mv) $$depbase.Tpo $$depbase.Po +@AMDEP_TRUE@@am__fastdepCC_FALSE@ $(AM_V_CC)source='$<' object='$@' libtool=no @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCC_FALSE@ $(AM_V_CC@am__nodep@)$(COMPILE) -c -o $@ $< + +.c.obj: +@am__fastdepCC_TRUE@ $(AM_V_CC)depbase=`echo $@ | sed 's|[^/]*$$|$(DEPDIR)/&|;s|\.obj$$||'`;\ +@am__fastdepCC_TRUE@ $(COMPILE) -MT $@ -MD -MP -MF $$depbase.Tpo -c -o $@ `$(CYGPATH_W) '$<'` &&\ +@am__fastdepCC_TRUE@ $(am__mv) $$depbase.Tpo $$depbase.Po +@AMDEP_TRUE@@am__fastdepCC_FALSE@ $(AM_V_CC)source='$<' object='$@' libtool=no @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCC_FALSE@ $(AM_V_CC@am__nodep@)$(COMPILE) -c -o $@ `$(CYGPATH_W) '$<'` + +.c.lo: +@am__fastdepCC_TRUE@ $(AM_V_CC)depbase=`echo $@ | sed 's|[^/]*$$|$(DEPDIR)/&|;s|\.lo$$||'`;\ +@am__fastdepCC_TRUE@ $(LTCOMPILE) -MT $@ -MD -MP -MF $$depbase.Tpo -c -o $@ $< &&\ +@am__fastdepCC_TRUE@ $(am__mv) $$depbase.Tpo $$depbase.Plo +@AMDEP_TRUE@@am__fastdepCC_FALSE@ $(AM_V_CC)source='$<' object='$@' libtool=yes @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCC_FALSE@ $(AM_V_CC@am__nodep@)$(LTCOMPILE) -c -o $@ $< + +imptcp_la-imptcp.lo: imptcp.c +@am__fastdepCC_TRUE@ $(AM_V_CC)$(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(imptcp_la_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT imptcp_la-imptcp.lo -MD -MP -MF $(DEPDIR)/imptcp_la-imptcp.Tpo -c -o imptcp_la-imptcp.lo `test -f 'imptcp.c' || echo '$(srcdir)/'`imptcp.c +@am__fastdepCC_TRUE@ $(AM_V_at)$(am__mv) $(DEPDIR)/imptcp_la-imptcp.Tpo $(DEPDIR)/imptcp_la-imptcp.Plo +@AMDEP_TRUE@@am__fastdepCC_FALSE@ $(AM_V_CC)source='imptcp.c' object='imptcp_la-imptcp.lo' libtool=yes @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCC_FALSE@ $(AM_V_CC@am__nodep@)$(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(imptcp_la_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o imptcp_la-imptcp.lo `test -f 'imptcp.c' || echo '$(srcdir)/'`imptcp.c + +mostlyclean-libtool: + -rm -f *.lo + +clean-libtool: + -rm -rf .libs _libs + +ID: $(am__tagged_files) + $(am__define_uniq_tagged_files); mkid -fID $$unique +tags: tags-am +TAGS: tags + +tags-am: $(TAGS_DEPENDENCIES) $(am__tagged_files) + set x; \ + here=`pwd`; \ + $(am__define_uniq_tagged_files); \ + shift; \ + if test -z "$(ETAGS_ARGS)$$*$$unique"; then :; else \ + test -n "$$unique" || unique=$$empty_fix; \ + if test $$# -gt 0; then \ + $(ETAGS) $(ETAGSFLAGS) $(AM_ETAGSFLAGS) $(ETAGS_ARGS) \ + "$$@" $$unique; \ + else \ + $(ETAGS) $(ETAGSFLAGS) $(AM_ETAGSFLAGS) $(ETAGS_ARGS) \ + $$unique; \ + fi; \ + fi +ctags: ctags-am + +CTAGS: ctags +ctags-am: $(TAGS_DEPENDENCIES) $(am__tagged_files) + $(am__define_uniq_tagged_files); \ + test -z "$(CTAGS_ARGS)$$unique" \ + || $(CTAGS) $(CTAGSFLAGS) $(AM_CTAGSFLAGS) $(CTAGS_ARGS) \ + $$unique + +GTAGS: + here=`$(am__cd) $(top_builddir) && pwd` \ + && $(am__cd) $(top_srcdir) \ + && gtags -i $(GTAGS_ARGS) "$$here" +cscopelist: cscopelist-am + +cscopelist-am: $(am__tagged_files) + list='$(am__tagged_files)'; \ + case "$(srcdir)" in \ + [\\/]* | ?:[\\/]*) sdir="$(srcdir)" ;; \ + *) sdir=$(subdir)/$(srcdir) ;; \ + esac; \ + for i in $$list; do \ + if test -f "$$i"; then \ + echo "$(subdir)/$$i"; \ + else \ + echo "$$sdir/$$i"; \ + fi; \ + done >> $(top_builddir)/cscope.files + +distclean-tags: + -rm -f TAGS ID GTAGS GRTAGS GSYMS GPATH tags + +distdir: $(BUILT_SOURCES) + $(MAKE) $(AM_MAKEFLAGS) distdir-am + +distdir-am: $(DISTFILES) + @srcdirstrip=`echo "$(srcdir)" | sed 's/[].[^$$\\*]/\\\\&/g'`; \ + topsrcdirstrip=`echo "$(top_srcdir)" | sed 's/[].[^$$\\*]/\\\\&/g'`; \ + list='$(DISTFILES)'; \ + dist_files=`for file in $$list; do echo $$file; done | \ + sed -e "s|^$$srcdirstrip/||;t" \ + -e "s|^$$topsrcdirstrip/|$(top_builddir)/|;t"`; \ + case $$dist_files in \ + */*) $(MKDIR_P) `echo "$$dist_files" | \ + sed '/\//!d;s|^|$(distdir)/|;s,/[^/]*$$,,' | \ + sort -u` ;; \ + esac; \ + for file in $$dist_files; do \ + if test -f $$file || test -d $$file; then d=.; else d=$(srcdir); fi; \ + if test -d $$d/$$file; then \ + dir=`echo "/$$file" | sed -e 's,/[^/]*$$,,'`; \ + if test -d "$(distdir)/$$file"; then \ + find "$(distdir)/$$file" -type d ! -perm -700 -exec chmod u+rwx {} \;; \ + fi; \ + if test -d $(srcdir)/$$file && test $$d != $(srcdir); then \ + cp -fpR $(srcdir)/$$file "$(distdir)$$dir" || exit 1; \ + find "$(distdir)/$$file" -type d ! -perm -700 -exec chmod u+rwx {} \;; \ + fi; \ + cp -fpR $$d/$$file "$(distdir)$$dir" || exit 1; \ + else \ + test -f "$(distdir)/$$file" \ + || cp -p $$d/$$file "$(distdir)/$$file" \ + || exit 1; \ + fi; \ + done +check-am: all-am +check: check-am +all-am: Makefile $(LTLIBRARIES) +installdirs: + for dir in "$(DESTDIR)$(pkglibdir)"; do \ + test -z "$$dir" || $(MKDIR_P) "$$dir"; \ + done +install: install-am +install-exec: install-exec-am +install-data: install-data-am +uninstall: uninstall-am + +install-am: all-am + @$(MAKE) $(AM_MAKEFLAGS) install-exec-am install-data-am + +installcheck: installcheck-am +install-strip: + if test -z '$(STRIP)'; then \ + $(MAKE) $(AM_MAKEFLAGS) INSTALL_PROGRAM="$(INSTALL_STRIP_PROGRAM)" \ + install_sh_PROGRAM="$(INSTALL_STRIP_PROGRAM)" INSTALL_STRIP_FLAG=-s \ + install; \ + else \ + $(MAKE) $(AM_MAKEFLAGS) INSTALL_PROGRAM="$(INSTALL_STRIP_PROGRAM)" \ + install_sh_PROGRAM="$(INSTALL_STRIP_PROGRAM)" INSTALL_STRIP_FLAG=-s \ + "INSTALL_PROGRAM_ENV=STRIPPROG='$(STRIP)'" install; \ + fi +mostlyclean-generic: + +clean-generic: + +distclean-generic: + -test -z "$(CONFIG_CLEAN_FILES)" || rm -f $(CONFIG_CLEAN_FILES) + -test . = "$(srcdir)" || test -z "$(CONFIG_CLEAN_VPATH_FILES)" || rm -f $(CONFIG_CLEAN_VPATH_FILES) + +maintainer-clean-generic: + @echo "This command is intended for maintainers to use" + @echo "it deletes files that may require special tools to rebuild." +clean: clean-am + +clean-am: clean-generic clean-libtool clean-pkglibLTLIBRARIES \ + mostlyclean-am + +distclean: distclean-am + -rm -f ./$(DEPDIR)/imptcp_la-imptcp.Plo + -rm -f Makefile +distclean-am: clean-am distclean-compile distclean-generic \ + distclean-tags + +dvi: dvi-am + +dvi-am: + +html: html-am + +html-am: + +info: info-am + +info-am: + +install-data-am: + +install-dvi: install-dvi-am + +install-dvi-am: + +install-exec-am: install-pkglibLTLIBRARIES + +install-html: install-html-am + +install-html-am: + +install-info: install-info-am + +install-info-am: + +install-man: + +install-pdf: install-pdf-am + +install-pdf-am: + +install-ps: install-ps-am + +install-ps-am: + +installcheck-am: + +maintainer-clean: maintainer-clean-am + -rm -f ./$(DEPDIR)/imptcp_la-imptcp.Plo + -rm -f Makefile +maintainer-clean-am: distclean-am maintainer-clean-generic + +mostlyclean: mostlyclean-am + +mostlyclean-am: mostlyclean-compile mostlyclean-generic \ + mostlyclean-libtool + +pdf: pdf-am + +pdf-am: + +ps: ps-am + +ps-am: + +uninstall-am: uninstall-pkglibLTLIBRARIES + +.MAKE: install-am install-strip + +.PHONY: CTAGS GTAGS TAGS all all-am am--depfiles check check-am clean \ + clean-generic clean-libtool clean-pkglibLTLIBRARIES \ + cscopelist-am ctags ctags-am distclean distclean-compile \ + distclean-generic distclean-libtool distclean-tags distdir dvi \ + dvi-am html html-am info info-am install install-am \ + install-data install-data-am install-dvi install-dvi-am \ + install-exec install-exec-am install-html install-html-am \ + install-info install-info-am install-man install-pdf \ + install-pdf-am install-pkglibLTLIBRARIES install-ps \ + install-ps-am install-strip installcheck installcheck-am \ + installdirs maintainer-clean maintainer-clean-generic \ + mostlyclean mostlyclean-compile mostlyclean-generic \ + mostlyclean-libtool pdf pdf-am ps ps-am tags tags-am uninstall \ + uninstall-am uninstall-pkglibLTLIBRARIES + +.PRECIOUS: Makefile + + +# Tell versions [3.59,3.63) of GNU make to not export all variables. +# Otherwise a system limit (for SysV at least) may be exceeded. +.NOEXPORT: diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c new file mode 100644 index 0000000..351dee0 --- /dev/null +++ b/plugins/imptcp/imptcp.c @@ -0,0 +1,2676 @@ +/* imptcp.c + * This is a native implementation of plain tcp. It is intentionally + * duplicate work (imtcp). The intent is to gain very fast and simple + * native ptcp support, utilizing the best interfaces Linux (no cross- + * platform intended!) has to offer. + * + * Note that in this module we try out some new naming conventions, + * so it may look a bit "different" from the other modules. We are + * investigating if removing prefixes can help make code more readable. + * + * File begun on 2010-08-10 by RGerhards + * + * Copyright 2007-2022 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of rsyslog. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * -or- + * see COPYING.ASL20 in the source distribution + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "config.h" +#if !defined(HAVE_EPOLL_CREATE) +# error imptcp requires OS support for epoll - can not build + /* imptcp gains speed by using modern Linux capabilities. As such, + * it can only be build on platforms supporting the epoll API. + */ +#endif + +#include <stdio.h> +#include <stdlib.h> +#include <assert.h> +#include <string.h> +#include <errno.h> +#include <unistd.h> +#include <stdarg.h> +#include <ctype.h> +#include <netinet/in.h> +#include <netdb.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <sys/epoll.h> +#include <sys/queue.h> +#include <netinet/tcp.h> +#include <stdint.h> +#include <zlib.h> +#include <sys/stat.h> +#include <regex.h> +#if HAVE_FCNTL_H +#include <fcntl.h> +#endif +#ifdef HAVE_SYS_PRCTL_H +# include <sys/prctl.h> +#endif +#include "rsyslog.h" +#include "cfsysline.h" +#include "prop.h" +#include "dirty.h" +#include "module-template.h" +#include "unicode-helper.h" +#include "glbl.h" +#include "errmsg.h" +#include "srUtils.h" +#include "datetime.h" +#include "ruleset.h" +#include "msg.h" +#include "parserif.h" +#include "statsobj.h" +#include "ratelimit.h" +#include "net.h" /* for permittedPeers, may be removed when this is removed */ + +/* the define is from tcpsrv.h, we need to find a new (but easier!!!) abstraction layer some time ... */ +#define TCPSRV_NO_ADDTL_DELIMITER -1 /* specifies that no additional delimiter is to be used in TCP framing */ + +MODULE_TYPE_INPUT +MODULE_TYPE_NOKEEP +MODULE_CNFNAME("imptcp") + +/* static data */ +DEF_IMOD_STATIC_DATA +DEFobjCurrIf(glbl) +DEFobjCurrIf(net) +DEFobjCurrIf(prop) +DEFobjCurrIf(datetime) +DEFobjCurrIf(ruleset) +DEFobjCurrIf(statsobj) + +/* forward references */ +static void * wrkr(void *myself); + +/* unfortunately, on some platforms EAGAIN == EWOULDBOLOCK and so checking against + * both of them generates a gcc 8 warning for this reason. We do not want to disable + * the warning, so we need to work around this via a macro. + */ +#if EAGAIN == EWOULDBLOCK + #define CHK_EAGAIN_EWOULDBLOCK (errno == EAGAIN) +#else + #define CHK_EAGAIN_EWOULDBLOCK (errno == EAGAIN | errno == EWOULDBLOCK) +#endif /* #if EAGAIN == EWOULDBOLOCK */ + +#define DFLT_wrkrMax 2 +#define DFLT_inlineDispatchThreshold 1 + +#define COMPRESS_NEVER 0 +#define COMPRESS_SINGLE_MSG 1 /* old, single-message compression */ +/* all other settings are for stream-compression */ +#define COMPRESS_STREAM_ALWAYS 2 + +/* config settings */ +typedef struct configSettings_s { + int bKeepAlive; /* support keep-alive packets */ + int iKeepAliveIntvl; + int iKeepAliveProbes; + int iKeepAliveTime; + int bEmitMsgOnClose; /* emit an informational message on close by remote peer */ + int bEmitMsgOnOpen; + int bSuppOctetFram; /* support octet-counted framing? */ + int iAddtlFrameDelim; /* addtl frame delimiter, e.g. for netscreen, default none */ + int maxFrameSize; + uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */ + uchar *lstnIP; /* which IP we should listen on? */ + uchar *pszBindRuleset; + int wrkrMax; /* max number of workers (actually "helper workers") */ + int iTCPSessMax; /* max open connections per instance */ +} configSettings_t; +static configSettings_t cs; + +struct instanceConf_s { + int bKeepAlive; /* support keep-alive packets */ + int iKeepAliveIntvl; + int iKeepAliveProbes; + int iKeepAliveTime; + int bEmitMsgOnClose; + int bEmitMsgOnOpen; + int bSuppOctetFram; /* support octet-counted framing? */ + int bSPFramingFix; + int iAddtlFrameDelim; + int socketBacklog; + sbool multiLine; + uint8_t compressionMode; + uchar *pszBindPort; /* port to bind to */ + uchar *pszLstnPortFileName; /* Name of the file with dynamic port used by testbench*/ + uchar *pszBindAddr; /* IP to bind socket to */ + uchar *pszBindPath; /* Path to bind socket to */ + uchar *pszBindRuleset; /* name of ruleset to bind to */ + uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */ + int fCreateMode; /* file creation mode for open() */ + uid_t fileUID; /* IDs for creation */ + gid_t fileGID; + int maxFrameSize; + int bFailOnPerms; /* fail creation if chown fails? */ + ruleset_t *pBindRuleset; /* ruleset to bind listener to (use system default if unspecified) */ + uchar *dfltTZ; + sbool bUnlink; + sbool discardTruncatedMsg; + sbool flowControl; + unsigned int ratelimitInterval; + unsigned int ratelimitBurst; + uchar *startRegex; + regex_t start_preg; /* compiled version of startRegex */ + int iTCPSessMax; /* max open connections */ + struct instanceConf_s *next; +}; + + +struct modConfData_s { + rsconf_t *pConf; /* our overall config object */ + instanceConf_t *root, *tail; + int wrkrMax; + int bProcessOnPoller; + int iTCPSessMax; + sbool configSetViaV2Method; +}; + +static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ +static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current load process */ + +/* module-global parameters */ +static struct cnfparamdescr modpdescr[] = { + { "threads", eCmdHdlrPositiveInt, 0 }, + { "maxsessions", eCmdHdlrInt, 0 }, + { "processOnPoller", eCmdHdlrBinary, 0 } +}; +static struct cnfparamblk modpblk = + { CNFPARAMBLK_VERSION, + sizeof(modpdescr)/sizeof(struct cnfparamdescr), + modpdescr + }; + +/* input instance parameters */ +static struct cnfparamdescr inppdescr[] = { + { "port", eCmdHdlrString, 0 }, /* legacy: InputTCPServerRun */ + { "address", eCmdHdlrString, 0 }, + { "path", eCmdHdlrString, 0 }, + { "unlink", eCmdHdlrBinary, 0 }, + { "discardtruncatedmsg", eCmdHdlrBinary, 0 }, + { "fileowner", eCmdHdlrUID, 0 }, + { "fileownernum", eCmdHdlrInt, 0 }, + { "filegroup", eCmdHdlrGID, 0 }, + { "filegroupnum", eCmdHdlrInt, 0 }, + { "filecreatemode", eCmdHdlrFileCreateMode, 0 }, + { "failonchownfailure", eCmdHdlrBinary, 0 }, + { "flowcontrol", eCmdHdlrBinary, 0 }, + { "name", eCmdHdlrString, 0 }, + { "maxframesize", eCmdHdlrInt, 0 }, + { "framing.delimiter.regex", eCmdHdlrString, 0 }, + { "ruleset", eCmdHdlrString, 0 }, + { "defaulttz", eCmdHdlrString, 0 }, + { "supportoctetcountedframing", eCmdHdlrBinary, 0 }, + { "framingfix.cisco.asa", eCmdHdlrBinary, 0 }, + { "maxsessions", eCmdHdlrInt, 0 }, + { "notifyonconnectionclose", eCmdHdlrBinary, 0 }, + { "notifyonconnectionopen", eCmdHdlrBinary, 0 }, + { "compression.mode", eCmdHdlrGetWord, 0 }, + { "keepalive", eCmdHdlrBinary, 0 }, + { "keepalive.probes", eCmdHdlrInt, 0 }, + { "keepalive.time", eCmdHdlrInt, 0 }, + { "keepalive.interval", eCmdHdlrInt, 0 }, + { "addtlframedelimiter", eCmdHdlrInt, 0 }, + { "ratelimit.interval", eCmdHdlrInt, 0 }, + { "ratelimit.burst", eCmdHdlrInt, 0 }, + { "multiline", eCmdHdlrBinary, 0 }, + { "listenportfilename", eCmdHdlrString, 0 }, + { "socketbacklog", eCmdHdlrInt, 0 } +}; +static struct cnfparamblk inppblk = + { CNFPARAMBLK_VERSION, + sizeof(inppdescr)/sizeof(struct cnfparamdescr), + inppdescr + }; + +#include "im-helper.h" /* must be included AFTER the type definitions! */ +static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */ + +/* data elements describing our running config */ +typedef struct ptcpsrv_s ptcpsrv_t; +typedef struct ptcplstn_s ptcplstn_t; +typedef struct ptcpsess_s ptcpsess_t; +typedef struct epolld_s epolld_t; + +/* the ptcp server (listener) object + * Note that the object contains support for forming a linked list + * of them. It does not make sense to do this seperately. + */ +struct ptcpsrv_s { + ptcpsrv_t *pNext; /* linked list maintenance */ + uchar *port; /* Port to listen to */ + uchar *lstnIP; /* which IP we should listen on? */ + uchar *path; /* Use a unix socket instead */ + int fCreateMode; /* file creation mode for open() */ + uid_t fileUID; /* IDs for creation */ + gid_t fileGID; + int maxFrameSize; + int bFailOnPerms; /* fail creation if chown fails? */ + sbool bUnixSocket; + int socketBacklog; + uchar *pszLstnPortFileName; + int iAddtlFrameDelim; + sbool multiLine; + int iKeepAliveIntvl; + int iKeepAliveProbes; + int iKeepAliveTime; + uint8_t compressionMode; + uchar *pszInputName; + uchar *dfltTZ; + prop_t *pInputName; /* InputName in (fast to process) property format */ + ruleset_t *pRuleset; + ptcplstn_t *pLstn; /* root of our listeners */ + ptcpsess_t *pSess; /* root of our sessions */ + int iTCPSessCnt; + int iTCPSessMax; + pthread_mutex_t mutSessLst; + sbool bKeepAlive; /* support keep-alive packets */ + sbool bEmitMsgOnClose; + sbool bEmitMsgOnOpen; + sbool bSuppOctetFram; + sbool bSPFramingFix; + sbool bUnlink; + sbool discardTruncatedMsg; + sbool flowControl; + ratelimit_t *ratelimiter; + instanceConf_t *inst; +}; + +/* the ptcp session object. Describes a single active session. + * includes support for doubly-linked list. + */ +struct ptcpsess_s { + ptcplstn_t *pLstn; /* our listener */ + ptcpsess_t *prev, *next; + int sock; + epolld_t *epd; + sbool bzInitDone; /* did we do an init of zstrm already? */ + z_stream zstrm; /* zip stream to use for tcp compression */ + uint8_t compressionMode; + int iMsg; /* index of next char to store in msg */ + int iCurrLine; /* 2nd char of current line in regex framing mode */ + int bAtStrtOfFram; /* are we at the very beginning of a new frame? */ + sbool bSuppOctetFram; /**< copy from listener, to speed up access */ + sbool bSPFramingFix; + enum { + eAtStrtFram, + eInOctetCnt, + eInMsg, + eInMsgTruncation + } inputState; /* our current state */ + int iOctetsRemain; /* Number of Octets remaining in message */ + TCPFRAMINGMODE eFraming; + uchar *pMsg; /* message (fragment) received */ + uchar *pMsg_save; /* message (fragment) save area in regex framing mode */ + prop_t *peerName; /* host name we received messages from */ + prop_t *peerIP; + const uchar *startRegex;/* cache for performance reasons */ + int iAddtlFrameDelim; /* cache for performance reasons */ +}; + + +/* the ptcp listener object. Describes a single active listener. + */ +struct ptcplstn_s { + ptcpsrv_t *pSrv; /* our server */ + ptcplstn_t *prev, *next; + int sock; + sbool bSuppOctetFram; + sbool bSPFramingFix; + epolld_t *epd; + statsobj_t *stats; /* listener stats */ + intctr_t rcvdBytes; + intctr_t rcvdDecompressed; + STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) + STATSCOUNTER_DEF(ctrSessOpen, mutCtrSessOpen) + STATSCOUNTER_DEF(ctrSessOpenErr, mutCtrSessOpenErr) + STATSCOUNTER_DEF(ctrSessClose, mutCtrSessClose) + DEF_ATOMIC_HELPER_MUT64(mut_rcvdBytes) +}; + + +/* The following structure controls the worker threads. Global data is + * needed for their access. + */ +static struct wrkrInfo_s { + pthread_t tid; /* the worker's thread ID */ + long long unsigned numCalled; /* how often was this called */ + int wrkrIdx; /* index for this worker - shortcut for thread name */ +} *wrkrInfo; +static int wrkrRunning; + + +/* type of object stored in epoll descriptor */ +typedef enum { + epolld_lstn, + epolld_sess +} epolld_type_t; + +/* an epoll descriptor. contains all information necessary to process + * the result of epoll. + */ +struct epolld_s { + epolld_type_t typ; + void *ptr; + int sock; + struct epoll_event ev; +}; + +typedef struct io_req_s { + STAILQ_ENTRY(io_req_s) link; + epolld_t *epd; +} io_req_t; + +typedef struct io_q_s { + STAILQ_HEAD(ioq_s, io_req_s) q; + STATSCOUNTER_DEF(ctrEnq, mutCtrEnq); + int ctrMaxSz; //TODO: discuss potential problems around concurrent reads and writes + int sz; //current q size + statsobj_t *stats; + pthread_mutex_t mut; + pthread_cond_t wakeup_worker; +} io_q_t; + +/* global data */ +pthread_attr_t wrkrThrdAttr; /* Attribute for session threads; read only after startup */ +static ptcpsrv_t *pSrvRoot = NULL; +static int epollfd = -1; /* (sole) descriptor for epoll */ +static int iMaxLine; /* maximum size of a single message */ +static io_q_t io_q; + +/* forward definitions */ +static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal); +static rsRetVal addLstn(ptcpsrv_t *pSrv, int sock, int isIPv6); + + +/* function to suppress TSAN known-good case + * We do not use a mutex an epd, but we do always access it in + * pure sequence. Adding a mutex just to cover this "cosmetic" + * would result in uncesseary performance penalty. + */ +static void ATTR_NONNULL() +imptcp_destruct_epd(ptcpsess_t *const pSess) { + free(pSess->epd); + pSess->epd = NULL; +} + +/* some simple constructors/destructors */ +static void ATTR_NONNULL() +destructSess(ptcpsess_t *const pSess) +{ + imptcp_destruct_epd(pSess); + free(pSess->pMsg_save); + free(pSess->pMsg); + prop.Destruct(&pSess->peerName); + prop.Destruct(&pSess->peerIP); + /* TODO: make these inits compile-time switch depending: */ + pSess->pMsg = NULL; + free(pSess); +} + +/* remove session from server */ +static void +unlinkSess(ptcpsess_t *pSess) { + ptcpsrv_t *pSrv = pSess->pLstn->pSrv; + pthread_mutex_lock(&pSrv->mutSessLst); + pSrv->iTCPSessCnt--; + /* finally unlink session from structures */ + if(pSess->next != NULL) + pSess->next->prev = pSess->prev; + if(pSess->prev == NULL) { + /* need to update root! */ + pSrv->pSess = pSess->next; + } else { + pSess->prev->next = pSess->next; + } + pthread_mutex_unlock(&pSrv->mutSessLst); +} + +static void +destructSrv(ptcpsrv_t *pSrv) +{ + if(pSrv->ratelimiter != NULL) + ratelimitDestruct(pSrv->ratelimiter); + if(pSrv->pInputName != NULL) + prop.Destruct(&pSrv->pInputName); + pthread_mutex_destroy(&pSrv->mutSessLst); + free(pSrv->pszInputName); + free(pSrv->port); + free(pSrv->pszLstnPortFileName); + free(pSrv->path); + free(pSrv->lstnIP); + free(pSrv); +} + +/****************************************** TCP SUPPORT FUNCTIONS ***********************************/ +/* We may later think about moving this into a helper library again. But the whole point + * so far was to keep everything related close togehter. -- rgerhards, 2010-08-10 + */ + +static rsRetVal startupUXSrv(ptcpsrv_t *pSrv) { + DEFiRet; + int sock; + int sockflags; + struct sockaddr_un local; + + uchar *path = pSrv->path == NULL ? UCHAR_CONSTANT("") : pSrv->path; + DBGPRINTF("imptcp: creating listen unix socket at %s\n", path); + + sock = socket(AF_UNIX, SOCK_STREAM, 0); + if(sock < 0) { + LogError(errno, RS_RET_ERR_CRE_AFUX, "imptcp: error creating unix socket"); + ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX); + } + + local.sun_family = AF_UNIX; + strncpy(local.sun_path, (char*) path, sizeof(local.sun_path)-1); + if (pSrv->bUnlink) { + unlink(local.sun_path); + } + + /* We use non-blocking IO! */ + if ((sockflags = fcntl(sock, F_GETFL)) != -1) { + sockflags |= O_NONBLOCK; + /* SETFL could fail too, so get it caught by the subsequent error check. */ + sockflags = fcntl(sock, F_SETFL, sockflags); + } + + if (sockflags == -1) { + LogError(errno, RS_RET_ERR_CRE_AFUX, "imptcp: error setting fcntl(O_NONBLOCK) on unix socket"); + ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX); + } + + if (bind(sock, (struct sockaddr *)&local, SUN_LEN(&local)) < 0) { + LogError(errno, RS_RET_ERR_CRE_AFUX, "imptcp: error while binding unix socket"); + ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX); + } + + if (listen(sock, pSrv->socketBacklog) < 0) { + LogError(errno, RS_RET_ERR_CRE_AFUX, "imptcp: unix socket listen error"); + ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX); + } + + if(chown(local.sun_path, pSrv->fileUID, pSrv->fileGID) != 0) { + if(pSrv->bFailOnPerms) { + LogError(errno, RS_RET_ERR_CRE_AFUX, "imptcp: unix socket chown error"); + ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX); + } + } + + if(chmod(local.sun_path, pSrv->fCreateMode) != 0) { + if(pSrv->bFailOnPerms) { + LogError(errno, RS_RET_ERR_CRE_AFUX, "imptcp: unix socket chmod error"); + ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX); + } + } + + CHKiRet(addLstn(pSrv, sock, 0)); + +finalize_it: + if (iRet != RS_RET_OK) { + if (sock != -1) { + close(sock); + } + } + + RETiRet; +} + +/* Start up a server. That means all of its listeners are created. + * Does NOT yet accept/process any incoming data (but binds ports). Hint: this + * code is to be executed before dropping privileges. + */ +PRAGMA_DIAGNOSTIC_PUSH +static rsRetVal +startupSrv(ptcpsrv_t *pSrv) +{ + DEFiRet; + int error, maxs, on = 1; + int sock = -1; + int numSocks; + int sockflags; + struct addrinfo hints, *res = NULL, *r; + uchar *lstnIP; + int isIPv6 = 0; + int port_override = 0; /* if dyn port (0): use this for actually bound port */ + union { + struct sockaddr *sa; + struct sockaddr_in *ipv4; + struct sockaddr_in6 *ipv6; + } savecast; + + if (pSrv->bUnixSocket) { + return startupUXSrv(pSrv); + } + + lstnIP = pSrv->lstnIP == NULL ? UCHAR_CONSTANT("") : pSrv->lstnIP; + + DBGPRINTF("imptcp: creating listen socket on server '%s', port %s\n", lstnIP, pSrv->port); + + memset(&hints, 0, sizeof(hints)); + hints.ai_flags = AI_PASSIVE; + hints.ai_family = glbl.GetDefPFFamily(runModConf->pConf); + hints.ai_socktype = SOCK_STREAM; + + error = getaddrinfo((char*)pSrv->lstnIP, (char*) pSrv->port, &hints, &res); + if(error) { + DBGPRINTF("error %d querying server '%s', port '%s'\n", error, pSrv->lstnIP, pSrv->port); + ABORT_FINALIZE(RS_RET_INVALID_PORT); + } + + /* Count max number of sockets we may open */ + for(maxs = 0, r = res; r != NULL ; r = r->ai_next, maxs++) { + /* EMPTY */; + } + + numSocks = 0; /* num of sockets counter at start of array */ + for(r = res; r != NULL ; r = r->ai_next) { + if(port_override != 0) { + savecast.sa = (struct sockaddr*)r->ai_addr; + if(r->ai_family == AF_INET6) { + savecast.ipv6->sin6_port = port_override; + } else { + savecast.ipv4->sin_port = port_override; + } + } + sock = socket(r->ai_family, r->ai_socktype, r->ai_protocol); + if(sock < 0) { + if(!(r->ai_family == PF_INET6 && errno == EAFNOSUPPORT)) { + DBGPRINTF("error %d creating tcp listen socket", errno); + /* it is debatable if PF_INET with EAFNOSUPPORT should + * also be ignored... + */ + } + continue; + } + + if(r->ai_family == AF_INET6) { + isIPv6 = 1; +#ifdef IPV6_V6ONLY + int iOn = 1; + if(setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&iOn, sizeof (iOn)) < 0) { + close(sock); + sock = -1; + continue; + } +#endif + } else { + isIPv6 = 0; + } + + if(setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof(on)) < 0 ) { + DBGPRINTF("error %d setting tcp socket option\n", errno); + close(sock); + sock = -1; + continue; + } + + /* We use non-blocking IO! */ + if((sockflags = fcntl(sock, F_GETFL)) != -1) { + sockflags |= O_NONBLOCK; + /* SETFL could fail too, so get it caught by the subsequent + * error check. + */ + sockflags = fcntl(sock, F_SETFL, sockflags); + } + if(sockflags == -1) { + DBGPRINTF("error %d setting fcntl(O_NONBLOCK) on tcp socket", errno); + close(sock); + sock = -1; + continue; + } + + + + /* We need to enable BSD compatibility. Otherwise an attacker + * could flood our log files by sending us tons of ICMP errors. + */ +#if !defined (_AIX) +#ifndef BSD + if(net.should_use_so_bsdcompat()) { + if (setsockopt(sock, SOL_SOCKET, SO_BSDCOMPAT, + (char *) &on, sizeof(on)) < 0) { + LogError(errno, NO_ERRCODE, "TCP setsockopt(BSDCOMPAT)"); + close(sock); + sock = -1; + continue; + } + } +#endif +#endif + if( (bind(sock, r->ai_addr, r->ai_addrlen) < 0) +#ifndef IPV6_V6ONLY + && (errno != EADDRINUSE) +#endif + ) { + /* TODO: check if *we* bound the socket - else we *have* an error! */ + LogError(errno, NO_ERRCODE, "Error while binding tcp socket"); + close(sock); + sock = -1; + continue; + } + + /* if we bind to dynamic port (port 0 given), we will do so consistently. Thus + * once we got a dynamic port, we will keep it and use it for other protocols + * as well. As of my understanding, this should always work as the OS does not + * pick a port that is used by some protocol (well, at least this looks very + * unlikely...). If our asusmption is wrong, we should iterate until we find a + * combination that works - it is very unusual to have the same service listen + * on differnt ports on IPv4 and IPv6. + */ + savecast.sa = (struct sockaddr*)r->ai_addr; + const int currport = (isIPv6) ? savecast.ipv6->sin6_port : savecast.ipv4->sin_port; + if(currport == 0) { + socklen_t socklen_r = r->ai_addrlen; + if(getsockname(sock, r->ai_addr, &socklen_r) == -1) { + LogError(errno, NO_ERRCODE, "nsd_ptcp: ListenPortFileName: getsockname:" + "error while trying to get socket"); + } + r->ai_addrlen = socklen_r; + savecast.sa = (struct sockaddr*)r->ai_addr; + port_override = (isIPv6) ? savecast.ipv6->sin6_port : savecast.ipv4->sin_port; + if(pSrv->pszLstnPortFileName != NULL) { + FILE *fp; + if((fp = fopen((const char*)pSrv->pszLstnPortFileName, "w+")) == NULL) { + LogError(errno, RS_RET_IO_ERROR, "imptcp: ListenPortFileName: " + "error while trying to open file"); + ABORT_FINALIZE(RS_RET_IO_ERROR); + } + if(isIPv6) { + fprintf(fp, "%d", ntohs(savecast.ipv6->sin6_port)); + } else { + fprintf(fp, "%d", ntohs(savecast.ipv4->sin_port)); + } + fclose(fp); + } + } + + if(listen(sock, pSrv->socketBacklog) < 0) { + LogError(errno, NO_ERRCODE, "imptcp error listening on port"); + DBGPRINTF("tcp listen error %d, suspending\n", errno); + close(sock); + sock = -1; + continue; + } + + /* if we reach this point, we were able to obtain a valid socket, so we can + * create our listener object. -- rgerhards, 2010-08-10 + */ + CHKiRet(addLstn(pSrv, sock, isIPv6)); + ++numSocks; + } + + if(numSocks != maxs) { + DBGPRINTF("We could initialize %d TCP listen sockets out of %d we received " + "- this may or may not be an error indication.\n", numSocks, maxs); + } + + if(numSocks == 0) { + DBGPRINTF("No TCP listen sockets could successfully be initialized"); + ABORT_FINALIZE(RS_RET_COULD_NOT_BIND); + } + +finalize_it: + if(res != NULL) { + freeaddrinfo(res); + } + + if(iRet != RS_RET_OK) { + if(sock != -1) { + close(sock); + } + } + + RETiRet; +} +PRAGMA_DIAGNOSTIC_POP + +/* Set pRemHost based on the address provided. This is to be called upon accept()ing + * a connection request. It must be provided by the socket we received the + * message on as well as a NI_MAXHOST size large character buffer for the FQDN. + * Please see http://www.hmug.org/man/3/getnameinfo.php (under Caveats) + * for some explanation of the code found below. If we detect a malicious + * hostname, we return RS_RET_MALICIOUS_HNAME and let the caller decide + * on how to deal with that. + * rgerhards, 2008-03-31 + */ +static rsRetVal +getPeerNames(prop_t **peerName, prop_t **peerIP, struct sockaddr *pAddr, sbool bUXServer) +{ + int error; + uchar szIP[NI_MAXHOST+1] = ""; + uchar szHname[NI_MAXHOST+1] = ""; + struct addrinfo hints, *res; + sbool bMaliciousHName = 0; + + DEFiRet; + + *peerName = NULL; + *peerIP = NULL; + + if (bUXServer) { + strncpy((char *) szHname, (char *) glbl.GetLocalHostName(), NI_MAXHOST); + strncpy((char *) szIP, (char *) glbl.GetLocalHostIP(), NI_MAXHOST); + szHname[NI_MAXHOST] = '\0'; + szIP[NI_MAXHOST] = '\0'; + } else { + error = getnameinfo(pAddr, SALEN(pAddr), (char *) szIP, sizeof(szIP), NULL, 0, NI_NUMERICHOST); + if (error) { + DBGPRINTF("Malformed from address %s\n", gai_strerror(error)); + strcpy((char *) szHname, "???"); + strcpy((char *) szIP, "???"); + ABORT_FINALIZE(RS_RET_INVALID_HNAME); + } + + if (!glbl.GetDisableDNS(runConf)) { + error = getnameinfo(pAddr, SALEN(pAddr), (char *) szHname, NI_MAXHOST, NULL, 0, NI_NAMEREQD); + if (error == 0) { + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_flags = AI_NUMERICHOST; + hints.ai_socktype = SOCK_STREAM; + /* we now do a lookup once again. This one should fail, + * because we should not have obtained a non-numeric address. If + * we got a numeric one, someone messed with DNS! + */ + if (getaddrinfo((char *) szHname, NULL, &hints, &res) == 0) { + freeaddrinfo(res); + /* OK, we know we have evil, so let's indicate this to our caller */ + snprintf((char *) szHname, sizeof(szHname), "[MALICIOUS:IP=%s]", szIP); + DBGPRINTF("Malicious PTR record, IP = \"%s\" HOST = \"%s\"", szIP, szHname); + bMaliciousHName = 1; + } + } else { + strcpy((char *) szHname, (char *) szIP); + } + } else { + strcpy((char *) szHname, (char *) szIP); + } + } + + /* We now have the names, so now let's allocate memory and store them permanently. */ + CHKiRet(prop.Construct(peerName)); + CHKiRet(prop.SetString(*peerName, szHname, ustrlen(szHname))); + CHKiRet(prop.ConstructFinalize(*peerName)); + CHKiRet(prop.Construct(peerIP)); + CHKiRet(prop.SetString(*peerIP, szIP, ustrlen(szIP))); + CHKiRet(prop.ConstructFinalize(*peerIP)); + +finalize_it: + if(iRet != RS_RET_OK) { + if(*peerName != NULL) + prop.Destruct(peerName); + if(*peerIP != NULL) + prop.Destruct(peerIP); + } + if(bMaliciousHName) + iRet = RS_RET_MALICIOUS_HNAME; + RETiRet; +} + + +/* Enable KEEPALIVE handling on the socket. */ +static rsRetVal +EnableKeepAlive(ptcplstn_t *pLstn, int sock) +{ + int ret; + int optval; + socklen_t optlen; + DEFiRet; + + optval = 1; + optlen = sizeof(optval); + ret = setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen); + if(ret < 0) { + dbgprintf("EnableKeepAlive socket call returns error %d\n", ret); + ABORT_FINALIZE(RS_RET_ERR); + } + +# if defined(TCP_KEEPCNT) + if(pLstn->pSrv->iKeepAliveProbes > 0) { + optval = pLstn->pSrv->iKeepAliveProbes; + optlen = sizeof(optval); + ret = setsockopt(sock, IPPROTO_TCP, TCP_KEEPCNT, &optval, optlen); + } else { + ret = 0; + } +# else + ret = -1; +# endif + if(ret < 0) { + LogError(ret, NO_ERRCODE, "imptcp cannot set keepalive probes - ignored"); + } + +# if defined(TCP_KEEPCNT) + if(pLstn->pSrv->iKeepAliveTime > 0) { + optval = pLstn->pSrv->iKeepAliveTime; + optlen = sizeof(optval); + ret = setsockopt(sock, IPPROTO_TCP, TCP_KEEPIDLE, &optval, optlen); + } else { + ret = 0; + } +# else + ret = -1; +# endif + if(ret < 0) { + LogError(ret, NO_ERRCODE, "imptcp cannot set keepalive time - ignored"); + } + +# if defined(TCP_KEEPCNT) + if(pLstn->pSrv->iKeepAliveIntvl > 0) { + optval = pLstn->pSrv->iKeepAliveIntvl; + optlen = sizeof(optval); + ret = setsockopt(sock, IPPROTO_TCP, TCP_KEEPINTVL, &optval, optlen); + } else { + ret = 0; + } +# else + ret = -1; +# endif + if(ret < 0) { + LogError(errno, NO_ERRCODE, "imptcp cannot set keepalive intvl - ignored"); + } + + dbgprintf("KEEPALIVE enabled for socket %d\n", sock); + +finalize_it: + RETiRet; +} + + +/* accept an incoming connection request + * rgerhards, 2008-04-22 + */ +static rsRetVal ATTR_NONNULL() +AcceptConnReq(ptcplstn_t *const pLstn, int *const newSock, prop_t **peerName, prop_t **peerIP) +{ + int sockflags; + struct sockaddr_storage addr; + socklen_t addrlen = sizeof(addr); + int iNewSock = -1; + + DEFiRet; + + *peerName = NULL; /* ensure we know if we don't have one! */ + iNewSock = accept(pLstn->sock, (struct sockaddr*) &addr, &addrlen); + if(iNewSock < 0) { + if(CHK_EAGAIN_EWOULDBLOCK || errno == EMFILE) + ABORT_FINALIZE(RS_RET_NO_MORE_DATA); + LogError(errno, RS_RET_ACCEPT_ERR, "error accepting connection " + "on listen socket %d", pLstn->sock); + ABORT_FINALIZE(RS_RET_ACCEPT_ERR); + } + if(addrlen == 0) { + LogError(errno, RS_RET_ACCEPT_ERR, "AcceptConnReq could not obtain " + "remote peer identification on listen socket %d", pLstn->sock); + } + + if(pLstn->pSrv->bKeepAlive) + EnableKeepAlive(pLstn, iNewSock);/* we ignore errors, best to do! */ + + CHKiRet(getPeerNames(peerName, peerIP, (struct sockaddr *) &addr, pLstn->pSrv->bUnixSocket)); + + /* set the new socket to non-blocking IO */ + if((sockflags = fcntl(iNewSock, F_GETFL)) != -1) { + sockflags |= O_NONBLOCK; + /* SETFL could fail too, so get it caught by the subsequent + * error check. + */ + sockflags = fcntl(iNewSock, F_SETFL, sockflags); + } + + if(sockflags == -1) { + LogError(errno, RS_RET_IO_ERROR, "error setting fcntl(O_NONBLOCK) on " + "tcp socket %d", iNewSock); + prop.Destruct(peerName); + prop.Destruct(peerIP); + ABORT_FINALIZE(RS_RET_IO_ERROR); + } + if(pLstn->pSrv->bEmitMsgOnOpen) { + LogMsg(0, RS_RET_NO_ERRCODE, LOG_INFO, + "imptcp: connection established with host: %s", + propGetSzStr(*peerName)); + } + + STATSCOUNTER_INC(pLstn->ctrSessOpen, pLstn->mutCtrSessOpen); + *newSock = iNewSock; + +finalize_it: + DBGPRINTF("iRet: %d\n", iRet); + if(iRet != RS_RET_OK) { + if(iRet != RS_RET_NO_MORE_DATA && pLstn->pSrv->bEmitMsgOnOpen) { + LogError(0, NO_ERRCODE, "imptcp: connection could not be " + "established with host: %s", + *peerName == NULL ? "(could not query)" + : (const char*)propGetSzStr(*peerName)); + } + STATSCOUNTER_INC(pLstn->ctrSessOpenErr, pLstn->mutCtrSessOpenErr); + /* the close may be redundant, but that doesn't hurt... */ + if(iNewSock != -1) + close(iNewSock); + } + + RETiRet; +} + + +/* This is a helper for submitting the message to the rsyslog core. + * It does some common processing, including resetting the various + * state variables to a "processed" state. + * Note that this function is also called if we had a buffer overflow + * due to a too-long message. So far, there is no indication this + * happened and it may be worth thinking about different handling + * of this case (what obviously would require a change to this + * function or some related code). + * rgerhards, 2009-04-23 + * EXTRACT from tcps_sess.c + */ +static rsRetVal +doSubmitMsg(ptcpsess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, multi_submit_t *pMultiSub) +{ + smsg_t *pMsg; + ptcpsrv_t *pSrv; + DEFiRet; + + if(pThis->iMsg == 0) { + DBGPRINTF("discarding zero-sized message\n"); + FINALIZE; + } + pSrv = pThis->pLstn->pSrv; + + /* we now create our own message object and submit it to the queue */ + CHKiRet(msgConstructWithTime(&pMsg, stTime, ttGenTime)); + MsgSetRawMsg(pMsg, (char*)pThis->pMsg, pThis->iMsg); + MsgSetInputName(pMsg, pSrv->pInputName); + MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY); + if(pSrv->dfltTZ != NULL) + MsgSetDfltTZ(pMsg, (char*) pSrv->dfltTZ); + MsgSetFlowControlType(pMsg, pSrv->flowControl + ? eFLOWCTL_LIGHT_DELAY : eFLOWCTL_NO_DELAY); + pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME; + MsgSetRcvFrom(pMsg, pThis->peerName); + CHKiRet(MsgSetRcvFromIP(pMsg, pThis->peerIP)); + MsgSetRuleset(pMsg, pSrv->pRuleset); + STATSCOUNTER_INC(pThis->pLstn->ctrSubmit, pThis->pLstn->mutCtrSubmit); + + ratelimitAddMsg(pSrv->ratelimiter, pMultiSub, pMsg); + +finalize_it: + /* reset status variables */ + pThis->bAtStrtOfFram = 1; + pThis->iMsg = 0; + + RETiRet; +} + + +/* process the data received, special case if the framing is specified via + * a regex. For more info see processDataRcvd(). + */ +static rsRetVal ATTR_NONNULL() +processDataRcvd_regexFraming(ptcpsess_t *const __restrict__ pThis, + char **const buff, + struct syslogTime *const stTime, + const time_t ttGenTime, + multi_submit_t *const pMultiSub, + unsigned *const __restrict__ pnMsgs) +{ + DEFiRet; + const instanceConf_t *const inst = pThis->pLstn->pSrv->inst; + assert(inst->startRegex != NULL); + const char c = **buff; + + pThis->pMsg[pThis->iMsg++] = c; + pThis->pMsg[pThis->iMsg] = '\0'; + + if(pThis->iMsg == 2*iMaxLine) { + LogError(0, RS_RET_OVERSIZE_MSG, "imptcp: more then double max message size (%d) " + "received without finding frame terminator via regex - assuming " + "end of frame now.", pThis->iMsg+1); + doSubmitMsg(pThis, stTime, ttGenTime, pMultiSub); + ++(*pnMsgs); + pThis->iMsg = 0; + pThis->iCurrLine = 1; + } + + + if(c == '\n') { + pThis->iCurrLine = pThis->iMsg; + } else { + const int isMatch = !regexec(&inst->start_preg, (char*)pThis->pMsg+pThis->iCurrLine, 0, NULL, 0); + if(isMatch) { + DBGPRINTF("regex match (%d), framing line: %s\n", pThis->iCurrLine, pThis->pMsg); + strcpy((char*)pThis->pMsg_save, (char*) pThis->pMsg+pThis->iCurrLine); + pThis->iMsg = pThis->iCurrLine - 1; + + doSubmitMsg(pThis, stTime, ttGenTime, pMultiSub); + ++(*pnMsgs); + + strcpy((char*)pThis->pMsg, (char*)pThis->pMsg_save); + pThis->iMsg = ustrlen(pThis->pMsg_save); + pThis->iCurrLine = 1; + } + } + + RETiRet; +} + + +/* process the data received. As TCP is stream based, we need to process the + * data inside a state machine. The actual data received is passed in byte-by-byte + * from DataRcvd, and this function here compiles messages from them and submits + * the end result to the queue. Introducing this function fixes a long-term bug ;) + * rgerhards, 2008-03-14 + * EXTRACT from tcps_sess.c + */ +static rsRetVal ATTR_NONNULL(1, 2) +processDataRcvd(ptcpsess_t *const __restrict__ pThis, + char **buff, + const int buffLen, + struct syslogTime *stTime, + const time_t ttGenTime, + multi_submit_t *pMultiSub, + unsigned *const __restrict__ pnMsgs) +{ + DEFiRet; + const char c = **buff; + int octetsToCopy, octetsToDiscard; + + if(pThis->startRegex != NULL) { + processDataRcvd_regexFraming(pThis, buff, stTime, ttGenTime, pMultiSub, pnMsgs); + FINALIZE; + } + + if(pThis->inputState == eAtStrtFram) { + if(pThis->bSuppOctetFram && isdigit((int) c)) { + pThis->inputState = eInOctetCnt; + pThis->iOctetsRemain = 0; + pThis->eFraming = TCP_FRAMING_OCTET_COUNTING; + } else if(pThis->bSPFramingFix && c == ' ') { + /* Cisco very occasionally sends a SP after a LF, which + * thrashes framing if not taken special care of. Here, + * we permit space *in front of the next frame* and + * ignore it. + */ + FINALIZE; + } else { + pThis->inputState = eInMsg; + pThis->eFraming = TCP_FRAMING_OCTET_STUFFING; + } + } + + if(pThis->inputState == eInOctetCnt) { + uchar *propPeerName = NULL; + int lenPeerName = 0; + uchar *propPeerIP = NULL; + int lenPeerIP = 0; + if(isdigit(c)) { + if(pThis->iOctetsRemain <= 200000000) { + pThis->iOctetsRemain = pThis->iOctetsRemain * 10 + c - '0'; + } + if(pThis->iMsg < iMaxLine) { + *(pThis->pMsg + pThis->iMsg++) = c; + } + } else { /* done with the octet count, so this must be the SP terminator */ + DBGPRINTF("TCP Message with octet-counter, size %d.\n", pThis->iOctetsRemain); + prop.GetString(pThis->peerName, &propPeerName, &lenPeerName); + prop.GetString(pThis->peerIP, &propPeerIP, &lenPeerIP); + if(c != ' ') { + LogError(0, NO_ERRCODE, "Framing Error in received TCP message " + "from peer: (hostname) %s, (ip) %s: delimiter is not " + "SP but has ASCII value %d.", propPeerName, propPeerIP, c); + } + if(pThis->iOctetsRemain < 1) { + /* TODO: handle the case where the octet count is 0! */ + LogError(0, NO_ERRCODE, "Framing Error in received TCP message" + " from peer: (hostname) %s, (ip) %s: invalid octet count %d.", + propPeerName, propPeerIP, pThis->iOctetsRemain); + pThis->eFraming = TCP_FRAMING_OCTET_STUFFING; + } else if(pThis->iOctetsRemain > iMaxLine) { + /* while we can not do anything against it, we can at least log an indication + * that something went wrong) -- rgerhards, 2008-03-14 + */ + DBGPRINTF("truncating message with %d octets - max msg size is %d\n", + pThis->iOctetsRemain, iMaxLine); + LogError(0, NO_ERRCODE, "received oversize message from peer: " + "(hostname) %s, (ip) %s: size is %d bytes, max msg " + "size is %d, truncating...", propPeerName, propPeerIP, + pThis->iOctetsRemain, iMaxLine); + } + if(pThis->iOctetsRemain > pThis->pLstn->pSrv->maxFrameSize) { + LogError(0, NO_ERRCODE, "Framing Error in received TCP message " + "from peer: (hostname) %s, (ip) %s: frame too large: %d, " + "change to octet stuffing", propPeerName, propPeerIP, + pThis->iOctetsRemain); + pThis->eFraming = TCP_FRAMING_OCTET_STUFFING; + } else { + pThis->iMsg = 0; + } + pThis->inputState = eInMsg; + } + } else if(pThis->inputState == eInMsgTruncation) { + if ((c == '\n') + || ((pThis->iAddtlFrameDelim != TCPSRV_NO_ADDTL_DELIMITER) + && (c == pThis->iAddtlFrameDelim))) { + pThis->inputState = eAtStrtFram; + } + } else { + assert(pThis->inputState == eInMsg); + if (pThis->eFraming == TCP_FRAMING_OCTET_STUFFING) { + int iMsg = pThis->iMsg; /* cache value for faster access */ + if(iMsg >= iMaxLine) { + /* emergency, we now need to flush, no matter if we are at end of message or not... */ + int i = 1; + char currBuffChar; + while(i < buffLen && ((currBuffChar = (*buff)[i]) != '\n' + && (pThis->iAddtlFrameDelim == TCPSRV_NO_ADDTL_DELIMITER + || currBuffChar != pThis->iAddtlFrameDelim))) { + i++; + } + LogError(0, NO_ERRCODE, "imptcp %s: message received is at least %d byte larger than " + "max msg size; message will be split starting at: \"%.*s\"\n", + pThis->pLstn->pSrv->pszInputName, i, (i < 32) ? i : 32, *buff); + doSubmitMsg(pThis, stTime, ttGenTime, pMultiSub); + iMsg = 0; + ++(*pnMsgs); + if(pThis->pLstn->pSrv->discardTruncatedMsg == 1) { + pThis->inputState = eInMsgTruncation; + } + /* we might think if it is better to ignore the rest of the + * message than to treat it as a new one. Maybe this is a good + * candidate for a configuration parameter... + * rgerhards, 2006-12-04 + */ + } + if ((c == '\n') + || ((pThis->iAddtlFrameDelim != TCPSRV_NO_ADDTL_DELIMITER) + && (c == pThis->iAddtlFrameDelim)) + ) { /* record delimiter? */ + if(pThis->pLstn->pSrv->multiLine) { + if((buffLen == 1) || ((*buff)[1] == '<')) { + doSubmitMsg(pThis, stTime, ttGenTime, pMultiSub); + iMsg = 0; /* Reset cached value! */ + ++(*pnMsgs); + pThis->inputState = eAtStrtFram; + } else { + if(iMsg < iMaxLine) { + pThis->pMsg[iMsg++] = c; + } + } + } else { + doSubmitMsg(pThis, stTime, ttGenTime, pMultiSub); + iMsg = 0; /* Reset cached value! */ + ++(*pnMsgs); + pThis->inputState = eAtStrtFram; + } + } else { + /* IMPORTANT: here we copy the actual frame content to the message - for BOTH + * framing modes! If we have a message that is larger than the max msg size, + * we truncate it. This is the best we can do in light of what the engine supports. + * -- rgerhards, 2008-03-14 + */ + if(likely(iMsg < iMaxLine)) { + pThis->pMsg[iMsg++] = c; + } + } + pThis->iMsg = iMsg; /* update "real value" with cached one */ + } else { + assert(pThis->eFraming == TCP_FRAMING_OCTET_COUNTING); + octetsToCopy = pThis->iOctetsRemain; + octetsToDiscard = 0; + if (buffLen < octetsToCopy) { + octetsToCopy = buffLen; + } + if (octetsToCopy + pThis->iMsg > iMaxLine) { + octetsToDiscard = octetsToCopy - (iMaxLine - pThis->iMsg); + octetsToCopy = iMaxLine - pThis->iMsg; + } + + memcpy(pThis->pMsg + pThis->iMsg, *buff, octetsToCopy); + pThis->iMsg += octetsToCopy; + pThis->iOctetsRemain -= (octetsToCopy + octetsToDiscard); + *buff += (octetsToCopy + octetsToDiscard - 1); + if (pThis->iOctetsRemain == 0) { + /* we have end of frame! */ + doSubmitMsg(pThis, stTime, ttGenTime, pMultiSub); + ++(*pnMsgs); + pThis->inputState = eAtStrtFram; + } + } + + } + +finalize_it: + RETiRet; +} + + +/* Processes the data received via a TCP session. If there + * is no other way to handle it, data is discarded. + * Input parameter data is the data received, iLen is its + * len as returned from recv(). iLen must be 1 or more (that + * is errors must be handled by caller!). iTCPSess must be + * the index of the TCP session that received the data. + * rgerhards 2005-07-04 + * And another change while generalizing. We now return either + * RS_RET_OK, which means the session should be kept open + * or anything else, which means it must be closed. + * rgerhards, 2008-03-01 + * As a performance optimization, we pick up the timestamp here. Acutally, + * this *is* the *correct* reception step for all the data we received, because + * we have just received a bunch of data! -- rgerhards, 2009-06-16 + * EXTRACT from tcps_sess.c + */ +static rsRetVal ATTR_NONNULL(1, 2) +DataRcvdUncompressed(ptcpsess_t *pThis, char *pData, const size_t iLen, struct syslogTime *stTime, time_t ttGenTime) +{ + multi_submit_t multiSub; + smsg_t *pMsgs[CONF_NUM_MULTISUB]; + char *pEnd; + unsigned nMsgs = 0; + DEFiRet; + + assert(iLen > 0); + + if(ttGenTime == 0) + datetime.getCurrTime(stTime, &ttGenTime, TIME_IN_LOCALTIME); + multiSub.ppMsgs = pMsgs; + multiSub.maxElem = CONF_NUM_MULTISUB; + multiSub.nElem = 0; + + /* We now copy the message to the session buffer. */ + pEnd = pData + iLen; /* this is one off, which is intensional */ + + while(pData < pEnd) { + CHKiRet(processDataRcvd(pThis, &pData, pEnd - pData, stTime, ttGenTime, &multiSub, &nMsgs)); + pData++; + } + + iRet = multiSubmitFlush(&multiSub); + + if(runConf->globals.senderKeepTrack) + statsRecordSender(propGetSzStr(pThis->peerName), nMsgs, ttGenTime); + +finalize_it: + RETiRet; +} + +static rsRetVal +DataRcvdCompressed(ptcpsess_t *pThis, char *buf, size_t len) +{ + struct syslogTime stTime; + time_t ttGenTime; + int zRet; /* zlib return state */ + unsigned outavail; + uchar zipBuf[64*1024]; // TODO: alloc on heap, and much larger (512KiB? batch size!) + DEFiRet; + // TODO: can we do stats counters? Even if they are not 100% correct under all cases, + // by simply updating the input and output sizes? + uint64_t outtotal; + + datetime.getCurrTime(&stTime, &ttGenTime, TIME_IN_LOCALTIME); + outtotal = 0; + + if(!pThis->bzInitDone) { + /* allocate deflate state */ + pThis->zstrm.zalloc = Z_NULL; + pThis->zstrm.zfree = Z_NULL; + pThis->zstrm.opaque = Z_NULL; + zRet = inflateInit(&pThis->zstrm); + if(zRet != Z_OK) { + DBGPRINTF("imptcp: error %d returned from zlib/inflateInit()\n", zRet); + ABORT_FINALIZE(RS_RET_ZLIB_ERR); + } + pThis->bzInitDone = RSTRUE; + } + + pThis->zstrm.next_in = (Bytef*) buf; + pThis->zstrm.avail_in = len; + /* run inflate() on buffer until everything has been uncompressed */ + do { + DBGPRINTF("imptcp: in inflate() loop, avail_in %d, total_in %ld\n", + pThis->zstrm.avail_in, pThis->zstrm.total_in); + pThis->zstrm.avail_out = sizeof(zipBuf); + pThis->zstrm.next_out = zipBuf; + zRet = inflate(&pThis->zstrm, Z_SYNC_FLUSH); /* no bad return value */ + //zRet = inflate(&pThis->zstrm, Z_NO_FLUSH); /* no bad return value */ + DBGPRINTF("after inflate, ret %d, avail_out %d\n", zRet, pThis->zstrm.avail_out); + outavail = sizeof(zipBuf) - pThis->zstrm.avail_out; + if(outavail != 0) { + outtotal += outavail; + pThis->pLstn->rcvdDecompressed += outavail; + CHKiRet(DataRcvdUncompressed(pThis, (char*)zipBuf, outavail, &stTime, ttGenTime)); + } + } while (pThis->zstrm.avail_out == 0); + + dbgprintf("end of DataRcvCompress, sizes: in %lld, out %llu\n", (long long) len, + (long long unsigned) outtotal); +finalize_it: + RETiRet; +} + +static rsRetVal +DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen) +{ + struct syslogTime stTime; + DEFiRet; + ATOMIC_ADD_uint64(&pThis->pLstn->rcvdBytes, &pThis->pLstn->mut_rcvdBytes, iLen); + if(pThis->compressionMode >= COMPRESS_STREAM_ALWAYS) + iRet = DataRcvdCompressed(pThis, pData, iLen); + else + iRet = DataRcvdUncompressed(pThis, pData, iLen, &stTime, 0); + RETiRet; +} + + +/****************************************** --END-- TCP SUPPORT FUNCTIONS ***********************************/ + + +static void +initConfigSettings(void) +{ + cs.bEmitMsgOnClose = 0; + cs.bEmitMsgOnOpen = 0; + cs.wrkrMax = DFLT_wrkrMax; + cs.bSuppOctetFram = 1; + cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; + cs.maxFrameSize = 200000; + cs.pszInputName = NULL; + cs.pszBindRuleset = NULL; + cs.pszInputName = NULL; + cs.lstnIP = NULL; +} + + +/* add socket to the epoll set + */ +static rsRetVal +addEPollSock(epolld_type_t typ, void *ptr, int sock, epolld_t **pEpd) +{ + DEFiRet; + epolld_t *epd = NULL; + + CHKmalloc(epd = calloc(1, sizeof(epolld_t))); + epd->typ = typ; + epd->ptr = ptr; + epd->sock = sock; + *pEpd = epd; + epd->ev.events = EPOLLIN|EPOLLONESHOT; + epd->ev.data.ptr = (void*) epd; + + if(epoll_ctl(epollfd, EPOLL_CTL_ADD, sock, &(epd->ev)) != 0) { + LogError(errno, RS_RET_EPOLL_CTL_FAILED, "os error during epoll ADD"); + ABORT_FINALIZE(RS_RET_EPOLL_CTL_FAILED); + } + + DBGPRINTF("imptcp: added socket %d to epoll[%d] set\n", sock, epollfd); + +finalize_it: + if(iRet != RS_RET_OK) { + if (epd != NULL) { + LogError(0, RS_RET_INTERNAL_ERROR, "error: could not initialize mutex for ptcp " + "connection for socket: %d", sock); + } + free(epd); + } + RETiRet; +} + + +/* add a listener to the server + */ +static rsRetVal +addLstn(ptcpsrv_t *pSrv, int sock, int isIPv6) +{ + DEFiRet; + ptcplstn_t *pLstn = NULL; + uchar statname[64]; + + CHKmalloc(pLstn = calloc(1, sizeof(ptcplstn_t))); + pLstn->pSrv = pSrv; + pLstn->bSuppOctetFram = pSrv->bSuppOctetFram; + pLstn->bSPFramingFix = pSrv->bSPFramingFix; + pLstn->sock = sock; + /* support statistics gathering */ + uchar *inputname; + if(pSrv->pszInputName == NULL) { + inputname = (uchar*)"imptcp"; + } else { + inputname = pSrv->pszInputName; + } + CHKiRet(statsobj.Construct(&(pLstn->stats))); + snprintf((char*)statname, sizeof(statname), "%s(%s/%s/%s)", inputname, + (pSrv->lstnIP == NULL) ? "*" : (char*)pSrv->lstnIP, pSrv->port, + isIPv6 ? "IPv6" : "IPv4"); + statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */ + CHKiRet(statsobj.SetName(pLstn->stats, statname)); + CHKiRet(statsobj.SetOrigin(pLstn->stats, (uchar*)"imptcp")); + STATSCOUNTER_INIT(pLstn->ctrSubmit, pLstn->mutCtrSubmit); + CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("submitted"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pLstn->ctrSubmit))); + STATSCOUNTER_INIT(pLstn->ctrSessOpen, pLstn->mutCtrSessOpen); + CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("sessions.opened"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pLstn->ctrSessOpen))); + STATSCOUNTER_INIT(pLstn->ctrSessOpenErr, pLstn->mutCtrSessOpenErr); + CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("sessions.openfailed"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pLstn->ctrSessOpenErr))); + STATSCOUNTER_INIT(pLstn->ctrSessClose, pLstn->mutCtrSessClose); + CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("sessions.closed"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pLstn->ctrSessClose))); + /* the following counters are not protected by mutexes; we accept + * that they may not be 100% correct */ + pLstn->rcvdBytes = 0, + pLstn->rcvdDecompressed = 0; + INIT_ATOMIC_HELPER_MUT64(pLstn->mut_rcvdBytes); + CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("bytes.received"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pLstn->rcvdBytes))); + CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("bytes.decompressed"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pLstn->rcvdDecompressed))); + CHKiRet(statsobj.ConstructFinalize(pLstn->stats)); + + CHKiRet(addEPollSock(epolld_lstn, pLstn, sock, &pLstn->epd)); + + /* add to start of server's listener list */ + pLstn->prev = NULL; + pLstn->next = pSrv->pLstn; + if(pSrv->pLstn != NULL) + pSrv->pLstn->prev = pLstn; + pSrv->pLstn = pLstn; + +finalize_it: + if(iRet != RS_RET_OK) { + if(pLstn != NULL) { + if(pLstn->stats != NULL) + statsobj.Destruct(&(pLstn->stats)); + free(pLstn); + } + } + + RETiRet; +} + + +/* add a session to the server + */ +static rsRetVal +addSess(ptcplstn_t *pLstn, int sock, prop_t *peerName, prop_t *peerIP) +{ + DEFiRet; + ptcpsess_t *pSess = NULL; + ptcpsrv_t *pSrv = pLstn->pSrv; + int pmsg_size_factor; + + CHKmalloc(pSess = malloc(sizeof(ptcpsess_t))); + pSess->next = NULL; + if(pLstn->pSrv->inst->startRegex == NULL) { + pmsg_size_factor = 1; + pSess->pMsg_save = NULL; + } else { + pmsg_size_factor = 2; + pSess->pMsg = NULL; + CHKmalloc(pSess->pMsg_save = malloc(1 + iMaxLine * pmsg_size_factor)); + } + CHKmalloc(pSess->pMsg = malloc(1 + iMaxLine * pmsg_size_factor)); + pSess->pLstn = pLstn; + pSess->sock = sock; + pSess->bSuppOctetFram = pLstn->bSuppOctetFram; + pSess->bSPFramingFix = pLstn->bSPFramingFix; + pSess->inputState = eAtStrtFram; + pSess->iMsg = 0; + pSess->iCurrLine = 1; + pSess->bzInitDone = 0; + pSess->bAtStrtOfFram = 1; + pSess->peerName = peerName; + pSess->peerIP = peerIP; + pSess->compressionMode = pLstn->pSrv->compressionMode; + pSess->startRegex = pLstn->pSrv->inst->startRegex; + pSess->iAddtlFrameDelim = pLstn->pSrv->iAddtlFrameDelim; + + /* add to start of server's listener list */ + pSess->prev = NULL; + + pthread_mutex_lock(&pSrv->mutSessLst); + int iTCPSessMax = pSrv->inst->iTCPSessMax; + if (iTCPSessMax > 0 && pSrv->iTCPSessCnt >= iTCPSessMax) { + pthread_mutex_unlock(&pSrv->mutSessLst); + LogError(0, RS_RET_MAX_SESS_REACHED, + "too many tcp sessions - dropping incoming request"); + ABORT_FINALIZE(RS_RET_MAX_SESS_REACHED); + } + + pSrv->iTCPSessCnt++; + pSess->next = pSrv->pSess; + if(pSrv->pSess != NULL) + pSrv->pSess->prev = pSess; + pSrv->pSess = pSess; + pthread_mutex_unlock(&pSrv->mutSessLst); + + CHKiRet(addEPollSock(epolld_sess, pSess, sock, &pSess->epd)); + +finalize_it: + if(iRet != RS_RET_OK) { + if(pSess != NULL) { + if (pSess->next != NULL) { + unlinkSess(pSess); + } + free(pSess->pMsg_save); + free(pSess->pMsg); + free(pSess); + } + } + + RETiRet; +} + + +/* finish zlib buffer, to be called before closing the session. + */ +static rsRetVal +doZipFinish(ptcpsess_t *pSess) +{ + int zRet; /* zlib return state */ + DEFiRet; + unsigned outavail; + struct syslogTime stTime; + uchar zipBuf[32*1024]; // TODO: use "global" one from pSess + + if(!pSess->bzInitDone) + goto done; + + pSess->zstrm.avail_in = 0; + /* run inflate() on buffer until everything has been compressed */ + do { + DBGPRINTF("doZipFinish: in inflate() loop, avail_in %d, total_in %ld\n", pSess->zstrm.avail_in, + pSess->zstrm.total_in); + pSess->zstrm.avail_out = sizeof(zipBuf); + pSess->zstrm.next_out = zipBuf; + zRet = inflate(&pSess->zstrm, Z_FINISH); /* no bad return value */ + DBGPRINTF("after inflate, ret %d, avail_out %d\n", zRet, pSess->zstrm.avail_out); + outavail = sizeof(zipBuf) - pSess->zstrm.avail_out; + if(outavail != 0) { + pSess->pLstn->rcvdDecompressed += outavail; + CHKiRet(DataRcvdUncompressed(pSess, (char*)zipBuf, outavail, &stTime, 0)); + // TODO: query time! + } + } while (pSess->zstrm.avail_out == 0); + +finalize_it: + zRet = inflateEnd(&pSess->zstrm); + if(zRet != Z_OK) { + DBGPRINTF("imptcp: error %d returned from zlib/inflateEnd()\n", zRet); + } + + pSess->bzInitDone = 0; +done: RETiRet; +} + +/* close/remove a session + * NOTE: we do not need to remove the socket from the epoll set, as according + * to the epoll man page it is automatically removed on close (Q6). The only + * exception is duplicated file handles, which we do not create. + */ +static rsRetVal +closeSess(ptcpsess_t *pSess) +{ + DEFiRet; + + if(pSess->compressionMode >= COMPRESS_STREAM_ALWAYS) + doZipFinish(pSess); + + const int sock = pSess->sock; + close(sock); + + unlinkSess(pSess); + + if(pSess->pLstn->pSrv->bEmitMsgOnClose) { + LogMsg(0, RS_RET_NO_ERRCODE, LOG_INFO, "imptcp: session on socket %d closed " + "with iRet %d.\n", sock, iRet); + } + STATSCOUNTER_INC(pSess->pLstn->ctrSessClose, pSess->pLstn->mutCtrSessClose); + + /* unlinked, now remove structure */ + destructSess(pSess); + + DBGPRINTF("imptcp: session on socket %d closed with iRet %d.\n", sock, iRet); + RETiRet; +} + + +/* create input instance, set default parameters, and + * add it to the list of instances. + */ +static rsRetVal +createInstance(instanceConf_t **pinst) +{ + instanceConf_t *inst; + DEFiRet; + CHKmalloc(inst = malloc(sizeof(instanceConf_t))); + inst->next = NULL; + + inst->pszBindPort = NULL; + inst->pszBindAddr = NULL; + inst->pszBindPath = NULL; + inst->fileUID = -1; + inst->fileGID = -1; + inst->maxFrameSize = 200000; + inst->fCreateMode = 0644; + inst->bFailOnPerms = 1; + inst->bUnlink = 0; + inst->discardTruncatedMsg = 0; + inst->flowControl = 1; + inst->pszBindRuleset = NULL; + inst->pszInputName = NULL; + inst->bSuppOctetFram = 1; + inst->bSPFramingFix = 0; + inst->bKeepAlive = 0; + inst->iKeepAliveIntvl = 0; + inst->iKeepAliveProbes = 0; + inst->iKeepAliveTime = 0; + inst->bEmitMsgOnClose = 0; + inst->bEmitMsgOnOpen = 0; + inst->dfltTZ = NULL; + inst->iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; + inst->startRegex = NULL; + inst->pBindRuleset = NULL; + inst->ratelimitBurst = 10000; /* arbitrary high limit */ + inst->ratelimitInterval = 0; /* off */ + inst->compressionMode = COMPRESS_SINGLE_MSG; + inst->multiLine = 0; + inst->socketBacklog = 5; + inst->pszLstnPortFileName = NULL; + inst->iTCPSessMax = -1; + + /* node created, let's add to config */ + if(loadModConf->tail == NULL) { + loadModConf->tail = loadModConf->root = inst; + } else { + loadModConf->tail->next = inst; + loadModConf->tail = inst; + } + + *pinst = inst; +finalize_it: + RETiRet; +} + + +/* This function is called when a new listener instace shall be added to + * the current config object via the legacy config system. It just shuffles + * all parameters to the listener in-memory instance. + */ +static rsRetVal addInstance(void __attribute__((unused)) *pVal, uchar *const pNewVal) +{ + instanceConf_t *inst; + DEFiRet; + + if(pNewVal == NULL || *pNewVal == '\0') { + parser_errmsg("imptcp: port number must be specified, listener ignored"); + ABORT_FINALIZE(RS_RET_PARAM_ERROR); + } + + /* if we reach this point, a valid port is given in pNewVal */ + CHKiRet(createInstance(&inst)); + CHKmalloc(inst->pszBindPort = ustrdup(pNewVal)); + if((cs.lstnIP == NULL) || (cs.lstnIP[0] == '\0')) { + inst->pszBindAddr = NULL; + } else { + CHKmalloc(inst->pszBindAddr = ustrdup(cs.lstnIP)); + } + if((cs.pszBindRuleset == NULL) || (cs.pszBindRuleset[0] == '\0')) { + inst->pszBindRuleset = NULL; + } else { + CHKmalloc(inst->pszBindRuleset = ustrdup(cs.pszBindRuleset)); + } + if((cs.pszInputName == NULL) || (cs.pszInputName[0] == '\0')) { + inst->pszInputName = NULL; + } else { + CHKmalloc(inst->pszInputName = ustrdup(cs.pszInputName)); + } + inst->pBindRuleset = NULL; + inst->bSuppOctetFram = cs.bSuppOctetFram; + inst->bKeepAlive = cs.bKeepAlive; + inst->iKeepAliveIntvl = cs.iKeepAliveIntvl; + inst->iKeepAliveProbes = cs.iKeepAliveProbes; + inst->iKeepAliveTime = cs.iKeepAliveTime; + inst->bEmitMsgOnClose = cs.bEmitMsgOnClose; + inst->bEmitMsgOnOpen = cs.bEmitMsgOnOpen; + inst->iAddtlFrameDelim = cs.iAddtlFrameDelim; + inst->maxFrameSize = cs.maxFrameSize; + inst->iTCPSessMax = cs.iTCPSessMax; + +finalize_it: + free(pNewVal); + RETiRet; +} + + +static rsRetVal +addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst) +{ + DEFiRet; + ptcpsrv_t *pSrv = NULL; + + CHKmalloc(pSrv = calloc(1, sizeof(ptcpsrv_t))); + pthread_mutex_init(&pSrv->mutSessLst, NULL); + pSrv->ratelimiter = NULL; + pSrv->pSess = NULL; + pSrv->pLstn = NULL; + pSrv->inst = inst; + pSrv->bSuppOctetFram = inst->bSuppOctetFram; + pSrv->bSPFramingFix = inst->bSPFramingFix; + pSrv->bKeepAlive = inst->bKeepAlive; + pSrv->iKeepAliveIntvl = inst->iKeepAliveIntvl; + pSrv->iKeepAliveProbes = inst->iKeepAliveProbes; + pSrv->iKeepAliveTime = inst->iKeepAliveTime; + pSrv->bEmitMsgOnClose = inst->bEmitMsgOnClose; + pSrv->bEmitMsgOnOpen = inst->bEmitMsgOnOpen; + pSrv->compressionMode = inst->compressionMode; + pSrv->dfltTZ = inst->dfltTZ; + if (inst->pszBindPort != NULL) { + CHKmalloc(pSrv->port = ustrdup(inst->pszBindPort)); + } + pSrv->iAddtlFrameDelim = inst->iAddtlFrameDelim; + pSrv->multiLine = inst->multiLine; + pSrv->socketBacklog = inst->socketBacklog; + pSrv->pszLstnPortFileName = inst->pszLstnPortFileName; + pSrv->maxFrameSize = inst->maxFrameSize; + if (inst->pszBindAddr == NULL) { + pSrv->lstnIP = NULL; + } else { + CHKmalloc(pSrv->lstnIP = ustrdup(inst->pszBindAddr)); + } + if (inst->pszBindPath == NULL) { + pSrv->path = NULL; + } else { + CHKmalloc(pSrv->path = ustrdup(inst->pszBindPath)); + CHKmalloc(pSrv->port = ustrdup(inst->pszBindPath)); + pSrv->bUnixSocket = 1; + pSrv->fCreateMode = inst->fCreateMode; + pSrv->fileUID = inst->fileUID; + pSrv->fileGID = inst->fileGID; + pSrv->bFailOnPerms = inst->bFailOnPerms; + } + + pSrv->bUnlink = inst->bUnlink; + pSrv->discardTruncatedMsg = inst->discardTruncatedMsg; + pSrv->flowControl = inst->flowControl; + pSrv->pRuleset = inst->pBindRuleset; + pSrv->pszInputName = ustrdup((inst->pszInputName == NULL) ? UCHAR_CONSTANT("imptcp") : inst->pszInputName); + pSrv->iTCPSessMax = inst->iTCPSessMax; + CHKiRet(prop.Construct(&pSrv->pInputName)); + CHKiRet(prop.SetString(pSrv->pInputName, pSrv->pszInputName, ustrlen(pSrv->pszInputName))); + CHKiRet(prop.ConstructFinalize(pSrv->pInputName)); + + CHKiRet(ratelimitNew(&pSrv->ratelimiter, "imptcp", (char*) pSrv->port)); + ratelimitSetLinuxLike(pSrv->ratelimiter, inst->ratelimitInterval, inst->ratelimitBurst); + ratelimitSetThreadSafe(pSrv->ratelimiter); + /* add to linked list */ + pSrv->pNext = pSrvRoot; + pSrvRoot = pSrv; + + /* all config vars are auto-reset -- this also is very useful with the + * new config format effort (v6). + */ + resetConfigVariables(NULL, NULL); + +finalize_it: + if(iRet != RS_RET_OK) { + LogError(0, NO_ERRCODE, "error %d trying to add listener", iRet); + if(pSrv != NULL) { + destructSrv(pSrv); + } + } + RETiRet; +} + + +/* destroy worker pool structures and wait for workers to terminate + */ +static void +startWorkerPool(void) +{ + int i; + pthread_mutex_lock(&io_q.mut); /* locking to keep Coverity happy */ + wrkrRunning = 0; + pthread_mutex_unlock(&io_q.mut); + DBGPRINTF("imptcp: starting worker pool, %d workers\n", runModConf->wrkrMax); + wrkrInfo = calloc(runModConf->wrkrMax, sizeof(struct wrkrInfo_s)); + if (wrkrInfo == NULL) { + LogError(errno, RS_RET_OUT_OF_MEMORY, "imptcp: worker-info array allocation failed."); + return; + } + for(i = 0 ; i < runModConf->wrkrMax ; ++i) { + /* init worker info structure! */ + wrkrInfo[i].wrkrIdx = i; + wrkrInfo[i].numCalled = 0; + pthread_create(&wrkrInfo[i].tid, &wrkrThrdAttr, wrkr, &(wrkrInfo[i])); + } + +} + +/* destroy worker pool structures and wait for workers to terminate + */ +static void +stopWorkerPool(void) +{ + int i; + DBGPRINTF("imptcp: stopping worker pool\n"); + pthread_mutex_lock(&io_q.mut); + pthread_cond_broadcast(&io_q.wakeup_worker); /* awake wrkr if not running */ + pthread_mutex_unlock(&io_q.mut); + for(i = 0 ; i < runModConf->wrkrMax ; ++i) { + pthread_join(wrkrInfo[i].tid, NULL); + DBGPRINTF("imptcp: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled); + } + free(wrkrInfo); +} + + + +/* start up all listeners + * This is a one-time stop once the module is set to start. + */ +static rsRetVal +startupServers(void) +{ + DEFiRet; + rsRetVal localRet, lastErr; + int iOK; + int iAll; + ptcpsrv_t *pSrv; + + iAll = iOK = 0; + lastErr = RS_RET_ERR; + pSrv = pSrvRoot; + while(pSrv != NULL) { + DBGPRINTF("imptcp: starting up server for port %s, name '%s'\n", pSrv->port, pSrv->pszInputName); + localRet = startupSrv(pSrv); + if(localRet == RS_RET_OK) + iOK++; + else + lastErr = localRet; + ++iAll; + pSrv = pSrv->pNext; + } + + DBGPRINTF("imptcp: %d out of %d servers started successfully\n", iOK, iAll); + if(iOK == 0) /* iff all fails, we report an error */ + iRet = lastErr; + + RETiRet; +} + +/* process new activity on listener. This means we need to accept a new + * connection. + */ +static rsRetVal ATTR_NONNULL() +lstnActivity(ptcplstn_t *const pLstn) +{ + int newSock = -1; + prop_t *peerName; + prop_t *peerIP; + rsRetVal localRet; + DEFiRet; + + DBGPRINTF("imptcp: new connection on listen socket %d\n", pLstn->sock); + while(glbl.GetGlobalInputTermState() == 0) { + localRet = AcceptConnReq(pLstn, &newSock, &peerName, &peerIP); + DBGPRINTF("imptcp: AcceptConnReq on listen socket %d returned %d\n", pLstn->sock, localRet); + if(localRet == RS_RET_NO_MORE_DATA || glbl.GetGlobalInputTermState() == 1) { + break; + } + CHKiRet(localRet); + localRet = addSess(pLstn, newSock, peerName, peerIP); + if(localRet != RS_RET_OK) { + close(newSock); + prop.Destruct(&peerName); + prop.Destruct(&peerIP); + ABORT_FINALIZE(localRet); + } + } + +finalize_it: + RETiRet; +} + +/* process new activity on session. This means we need to accept data + * or close the session. + */ +static rsRetVal +sessActivity(ptcpsess_t *const pSess, int *const continue_polling) +{ + int lenRcv; + int lenBuf; + uchar *peerName; + int lenPeer; + int remsock = 0; /* init just to keep compiler happy... :-( */ + sbool bEmitOnClose = 0; + char rcvBuf[128*1024]; + int runs = 0; + DEFiRet; + + DBGPRINTF("imptcp: new activity on session socket %d\n", pSess->sock); + + while(runs++ < 16) { + lenBuf = sizeof(rcvBuf); + lenRcv = recv(pSess->sock, rcvBuf, lenBuf, 0); + + if(lenRcv > 0) { + /* have data, process it */ + DBGPRINTF("imptcp: data(%d) on socket %d: %s\n", lenBuf, pSess->sock, rcvBuf); + CHKiRet(DataRcvd(pSess, rcvBuf, lenRcv)); + } else if (lenRcv == 0) { + /* session was closed, do clean-up */ + if(pSess->pLstn->pSrv->bEmitMsgOnClose) { + prop.GetString(pSess->peerName, &peerName, &lenPeer), + remsock = pSess->sock; + bEmitOnClose = 1; + } + *continue_polling = 0; + if(bEmitOnClose) { + LogError(0, RS_RET_PEER_CLOSED_CONN, "imptcp session %d closed by " + "remote peer %s.", remsock, peerName); + } + CHKiRet(closeSess(pSess)); /* close may emit more messages in strmzip mode! */ + break; + } else { + if(CHK_EAGAIN_EWOULDBLOCK) + break; + DBGPRINTF("imptcp: error on session socket %d - closed.\n", pSess->sock); + *continue_polling = 0; + closeSess(pSess); /* try clean-up by dropping session */ + break; + } + } + +finalize_it: + RETiRet; +} + + +/* This function is called to process a single request. This may + * be carried out by the main worker or a helper. It can be run + * concurrently. + */ +static void +processWorkItem(epolld_t *epd) +{ + + int continue_polling = 1; + + switch(epd->typ) { + case epolld_lstn: + /* listener never stops polling (except server shutdown) */ + lstnActivity((ptcplstn_t *) epd->ptr); + break; + case epolld_sess: + sessActivity((ptcpsess_t *) epd->ptr, &continue_polling); + break; + default: + LogError(0, RS_RET_INTERNAL_ERROR, "error: invalid epolld_type_t %d after epoll", epd->typ); + break; + } + if (continue_polling == 1) { + epoll_ctl(epollfd, EPOLL_CTL_MOD, epd->sock, &(epd->ev)); + } +} + + +static rsRetVal +initIoQ(void) +{ + DEFiRet; + CHKiConcCtrl(pthread_mutex_init(&io_q.mut, NULL)); + CHKiConcCtrl(pthread_cond_init(&io_q.wakeup_worker, NULL)); + STAILQ_INIT(&io_q.q); + io_q.sz = 0; + io_q.ctrMaxSz = 0; /* TODO: discuss this and fix potential concurrent read/write issues */ + CHKiRet(statsobj.Construct(&io_q.stats)); + CHKiRet(statsobj.SetName(io_q.stats, (uchar*) "io-work-q")); + CHKiRet(statsobj.SetOrigin(io_q.stats, (uchar*) "imptcp")); + STATSCOUNTER_INIT(io_q.ctrEnq, io_q.mutCtrEnq); + CHKiRet(statsobj.AddCounter(io_q.stats, UCHAR_CONSTANT("enqueued"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &io_q.ctrEnq)); + CHKiRet(statsobj.AddCounter(io_q.stats, UCHAR_CONSTANT("maxqsize"), + ctrType_Int, CTR_FLAG_NONE, &io_q.ctrMaxSz)); + CHKiRet(statsobj.ConstructFinalize(io_q.stats)); +finalize_it: + RETiRet; +} + +static void +destroyIoQ(void) +{ + io_req_t *n; + if (io_q.stats != NULL) { + statsobj.Destruct(&io_q.stats); + } + pthread_mutex_lock(&io_q.mut); + while (!STAILQ_EMPTY(&io_q.q)) { + n = STAILQ_FIRST(&io_q.q); + STAILQ_REMOVE_HEAD(&io_q.q, link); + LogError(0, RS_RET_INTERNAL_ERROR, "imptcp: discarded enqueued io-work to allow shutdown " + "- ignored"); + free(n); + } + io_q.sz = 0; + pthread_mutex_unlock(&io_q.mut); + pthread_cond_destroy(&io_q.wakeup_worker); + pthread_mutex_destroy(&io_q.mut); +} + +static rsRetVal +enqueueIoWork(epolld_t *epd, int dispatchInlineIfQueueFull) { + io_req_t *n; + int dispatchInline; + int inlineDispatchThreshold; + DEFiRet; + + CHKmalloc(n = malloc(sizeof(io_req_t))); + n->epd = epd; + + inlineDispatchThreshold = DFLT_inlineDispatchThreshold * runModConf->wrkrMax; + dispatchInline = 0; + + pthread_mutex_lock(&io_q.mut); + if (dispatchInlineIfQueueFull && io_q.sz > inlineDispatchThreshold) { + dispatchInline = 1; + } else { + STAILQ_INSERT_TAIL(&io_q.q, n, link); + io_q.sz++; + STATSCOUNTER_INC(io_q.ctrEnq, io_q.mutCtrEnq); + STATSCOUNTER_SETMAX_NOMUT(io_q.ctrMaxSz, io_q.sz); + pthread_cond_signal(&io_q.wakeup_worker); + } + pthread_mutex_unlock(&io_q.mut); + + if (dispatchInline == 1) { + free(n); + processWorkItem(epd); + } +finalize_it: + if (iRet != RS_RET_OK) { + if (n == NULL) { + LogError(0, iRet, "imptcp: couldn't allocate memory to enqueue io-request - ignored"); + } + } + RETiRet; +} + +/* This function is called to process a complete workset, that + * is a set of events returned from epoll. + */ +static void +processWorkSet(int nEvents, struct epoll_event events[]) +{ + int iEvt; + int remainEvents; + remainEvents = nEvents; + epolld_t *epd; + + for(iEvt = 0 ; (iEvt < nEvents) && (glbl.GetGlobalInputTermState() == 0) ; ++iEvt) { + epd = (epolld_t*)events[iEvt].data.ptr; + if(runModConf->bProcessOnPoller && remainEvents == 1) { + /* process self, save context switch */ + processWorkItem(epd); + } else { + enqueueIoWork(epd, runModConf->bProcessOnPoller); + } + --remainEvents; + } +} + + +/* worker to process incoming requests + */ +static void * +wrkr(void *myself) +{ + struct wrkrInfo_s *me = (struct wrkrInfo_s*) myself; + pthread_mutex_lock(&io_q.mut); + ++wrkrRunning; + pthread_mutex_unlock(&io_q.mut); + + uchar thrdName[32]; + snprintf((char*)thrdName, sizeof(thrdName), "imptcp/w%d", me->wrkrIdx); +# if defined(HAVE_PRCTL) && defined(PR_SET_NAME) + /* set thread name - we ignore if the call fails, has no harsh consequences... */ + if(prctl(PR_SET_NAME, thrdName, 0, 0, 0) != 0) { + DBGPRINTF("prctl failed, not setting thread name for '%s'\n", thrdName); + } +# endif + + io_req_t *n; + while(1) { + n = NULL; + pthread_mutex_lock(&io_q.mut); + if (io_q.sz == 0) { + --wrkrRunning; + if (glbl.GetGlobalInputTermState() != 0) { + pthread_mutex_unlock(&io_q.mut); + break; + } else { + DBGPRINTF("imptcp: worker %llu waiting on new work items\n", + (unsigned long long) me->tid); + pthread_cond_wait(&io_q.wakeup_worker, &io_q.mut); + DBGPRINTF("imptcp: worker %llu awoken\n", (unsigned long long) me->tid); + } + ++wrkrRunning; + } + if (io_q.sz > 0) { + n = STAILQ_FIRST(&io_q.q); + STAILQ_REMOVE_HEAD(&io_q.q, link); + io_q.sz--; + } + pthread_mutex_unlock(&io_q.mut); + + if (n != NULL) { + ++me->numCalled; + processWorkItem(n->epd); + free(n); + } + } + return NULL; +} + + +BEGINnewInpInst + struct cnfparamvals *pvals; + instanceConf_t *inst; + char *cstr; + int i; +CODESTARTnewInpInst + DBGPRINTF("newInpInst (imptcp)\n"); + + if((pvals = nvlstGetParams(lst, &inppblk, NULL)) == NULL) { + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + if(Debug) { + dbgprintf("input param blk in imptcp:\n"); + cnfparamsPrint(&inppblk, pvals); + } + + CHKiRet(createInstance(&inst)); + + for(i = 0 ; i < inppblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(inppblk.descr[i].name, "port")) { + inst->pszBindPort = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "address")) { + inst->pszBindAddr = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "path")) { + inst->pszBindPath = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "unlink")) { + inst->bUnlink = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "discardtruncatedmsg")) { + inst->discardTruncatedMsg = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "flowcontrol")) { + inst->flowControl = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "fileowner")) { + inst->fileUID = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "fileownernum")) { + inst->fileUID = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "filegroup")) { + inst->fileGID = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "filegroupnum")) { + inst->fileGID = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "filecreatemode")) { + inst->fCreateMode = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "failonpermsfailure")) { + inst->bFailOnPerms = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "name")) { + inst->pszInputName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "maxframesize")) { + const int max = (int) pvals[i].val.d.n; + if(max <= 200000000) { + inst->maxFrameSize = max; + } else { + parser_errmsg("imptcp: invalid value for 'maxFrameSize' " + "parameter given is %d, max is 200000000", max); + ABORT_FINALIZE(RS_RET_PARAM_ERROR); + } + } else if(!strcmp(inppblk.descr[i].name, "framing.delimiter.regex")) { + inst->startRegex = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "ruleset")) { + inst->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "supportoctetcountedframing")) { + inst->bSuppOctetFram = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "framingfix.cisco.asa")) { + inst->bSPFramingFix = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "compression.mode")) { + cstr = es_str2cstr(pvals[i].val.d.estr, NULL); + if(!strcasecmp(cstr, "stream:always")) { + inst->compressionMode = COMPRESS_STREAM_ALWAYS; + } else if(!strcasecmp(cstr, "none")) { + inst->compressionMode = COMPRESS_NEVER; + } else { + parser_errmsg("imptcp: invalid value for 'compression.mode' " + "parameter (given is '%s')", cstr); + free(cstr); + ABORT_FINALIZE(RS_RET_PARAM_ERROR); + } + free(cstr); + } else if(!strcmp(inppblk.descr[i].name, "maxsessions")) { + inst->iTCPSessMax = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "keepalive")) { + inst->bKeepAlive = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "keepalive.probes")) { + inst->iKeepAliveProbes = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "keepalive.time")) { + inst->iKeepAliveTime = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "keepalive.interval")) { + inst->iKeepAliveIntvl = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "addtlframedelimiter")) { + inst->iAddtlFrameDelim = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "notifyonconnectionclose")) { + inst->bEmitMsgOnClose = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "notifyonconnectionopen")) { + inst->bEmitMsgOnOpen = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "defaulttz")) { + inst->dfltTZ = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "ratelimit.burst")) { + inst->ratelimitBurst = (unsigned int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "ratelimit.interval")) { + inst->ratelimitInterval = (unsigned int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "multiline")) { + inst->multiLine = (sbool) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "listenportfilename")) { + inst->pszLstnPortFileName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "socketbacklog")) { + inst->socketBacklog = (int) pvals[i].val.d.n; + } else { + dbgprintf("imptcp: program error, non-handled " + "param '%s'\n", inppblk.descr[i].name); + } + + char *bindPort = (char *) inst->pszBindPort; + char *bindPath = (char *) inst->pszBindPath; + if ((bindPort == NULL || strlen(bindPort) < 1) && (bindPath == NULL || strlen (bindPath) < 1)) { + parser_errmsg("imptcp: Must have either port or path defined"); + ABORT_FINALIZE(RS_RET_PARAM_ERROR); + } + } + + if(inst->startRegex != NULL) { + const int errcode = regcomp(&inst->start_preg, (char*)inst->startRegex, REG_EXTENDED); + if(errcode != 0) { + char errbuff[512]; + regerror(errcode, &inst->start_preg, errbuff, sizeof(errbuff)); + parser_errmsg("imptcp: error in framing.delimiter.regex expansion: %s", errbuff); + ABORT_FINALIZE(RS_RET_ERR); + } + } + + if (inst->iTCPSessMax == -1) { + inst->iTCPSessMax = loadModConf->iTCPSessMax; + } +finalize_it: +CODE_STD_FINALIZERnewInpInst + cnfparamvalsDestruct(pvals, &inppblk); +ENDnewInpInst + + +BEGINbeginCnfLoad +CODESTARTbeginCnfLoad + loadModConf = pModConf; + pModConf->pConf = pConf; + /* init our settings */ + loadModConf->wrkrMax = DFLT_wrkrMax; + loadModConf->bProcessOnPoller = 1; + loadModConf->configSetViaV2Method = 0; + bLegacyCnfModGlobalsPermitted = 1; + /* init legacy config vars */ + initConfigSettings(); +ENDbeginCnfLoad + + +BEGINsetModCnf + struct cnfparamvals *pvals = NULL; + int i; +CODESTARTsetModCnf + pvals = nvlstGetParams(lst, &modpblk, NULL); + if(pvals == NULL) { + LogError(0, RS_RET_MISSING_CNFPARAMS, "imptcp: error processing module " + "config parameters [module(...)]"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + if(Debug) { + dbgprintf("module (global) param blk for imptcp:\n"); + cnfparamsPrint(&modpblk, pvals); + } + + for(i = 0 ; i < modpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(modpblk.descr[i].name, "threads")) { + loadModConf->wrkrMax = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "maxsessions")) { + loadModConf->iTCPSessMax = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "processOnPoller")) { + loadModConf->bProcessOnPoller = (int) pvals[i].val.d.n; + } else { + dbgprintf("imptcp: program error, non-handled " + "param '%s' in beginCnfLoad\n", modpblk.descr[i].name); + } + } + + /* remove all of our legacy handlers, as they can not used in addition + * the the new-style config method. + */ + bLegacyCnfModGlobalsPermitted = 0; + loadModConf->configSetViaV2Method = 1; + +finalize_it: + if(pvals != NULL) + cnfparamvalsDestruct(pvals, &modpblk); +ENDsetModCnf + + +BEGINendCnfLoad +CODESTARTendCnfLoad + if(!loadModConf->configSetViaV2Method) { + /* persist module-specific settings from legacy config system */ + loadModConf->wrkrMax = cs.wrkrMax; + } + + loadModConf = NULL; /* done loading */ + /* free legacy config vars */ + free(cs.pszInputName); + free(cs.lstnIP); + cs.pszInputName = NULL; + cs.lstnIP = NULL; +ENDendCnfLoad + + +/* function to generate error message if framework does not find requested ruleset */ +static inline void +std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, instanceConf_t *inst) +{ + LogError(0, NO_ERRCODE, "imptcp: ruleset '%s' for port %s not found - " + "using default ruleset instead", inst->pszBindRuleset, + inst->pszBindPort); +} +BEGINcheckCnf + instanceConf_t *inst; +CODESTARTcheckCnf + for(inst = pModConf->root ; inst != NULL ; inst = inst->next) { + std_checkRuleset(pModConf, inst); + } +ENDcheckCnf + + +BEGINactivateCnfPrePrivDrop + instanceConf_t *inst; +CODESTARTactivateCnfPrePrivDrop + iMaxLine = glbl.GetMaxLine(runConf); /* get maximum size we currently support */ + DBGPRINTF("imptcp: config params iMaxLine %d\n", iMaxLine); + + runModConf = pModConf; + for(inst = runModConf->root ; inst != NULL ; inst = inst->next) { + addListner(pModConf, inst); + } + if(pSrvRoot == NULL) { + LogError(0, RS_RET_NO_LSTN_DEFINED, "imptcp: no ptcp server defined, module can not run."); + ABORT_FINALIZE(RS_RET_NO_RUN); + } + +# if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1) + DBGPRINTF("imptcp uses epoll_create1()\n"); + epollfd = epoll_create1(EPOLL_CLOEXEC); + if(epollfd < 0 && errno == ENOSYS) +# endif + { + DBGPRINTF("imptcp uses epoll_create()\n"); + /* reading the docs, the number of epoll events passed to + * epoll_create() seems not to be used at all in kernels. So + * we just provide "a" number, happens to be 10. + */ + epollfd = epoll_create(10); + } + + if(epollfd < 0) { + LogError(0, RS_RET_EPOLL_CR_FAILED, "error: epoll_create() failed"); + ABORT_FINALIZE(RS_RET_NO_RUN); + } + + /* start up servers, but do not yet read input data */ + CHKiRet(startupServers()); + DBGPRINTF("imptcp started up, but not yet receiving data\n"); +finalize_it: +ENDactivateCnfPrePrivDrop + + +BEGINactivateCnf +CODESTARTactivateCnf + /* nothing to do, all done pre priv drop */ +ENDactivateCnf + + +BEGINfreeCnf + instanceConf_t *inst, *del; +CODESTARTfreeCnf + for(inst = pModConf->root ; inst != NULL ; ) { + free(inst->pszBindPort); + free(inst->pszBindPath); + free(inst->pszBindAddr); + free(inst->pszBindRuleset); + free(inst->pszInputName); + free(inst->dfltTZ); + if(inst->startRegex != NULL) { + regfree(&inst->start_preg); + free(inst->startRegex); + } + del = inst; + inst = inst->next; + free(del); + } +ENDfreeCnf + + +/* This function is called to gather input. + */ +BEGINrunInput + int nEvents; + struct epoll_event events[128]; +CODESTARTrunInput + initIoQ(); + startWorkerPool(); + DBGPRINTF("imptcp: now beginning to process input data\n"); + while(glbl.GetGlobalInputTermState() == 0) { + DBGPRINTF("imptcp going on epoll_wait\n"); + nEvents = epoll_wait(epollfd, events, sizeof(events)/sizeof(struct epoll_event), -1); + DBGPRINTF("imptcp: epoll returned %d events\n", nEvents); + processWorkSet(nEvents, events); + } + DBGPRINTF("imptcp: successfully terminated\n"); + /* we stop the worker pool in AfterRun, in case we get cancelled for some reason (old Interface) */ +ENDrunInput + + +/* initialize and return if will run or not */ +BEGINwillRun +CODESTARTwillRun +ENDwillRun + + +/* completely shut down a server, that means closing all of its + * listeners and sessions. + */ +static void +shutdownSrv(ptcpsrv_t *pSrv) +{ + ptcplstn_t *pLstn, *lstnDel; + ptcpsess_t *pSess, *sessDel; + + /* listeners */ + pLstn = pSrv->pLstn; + while(pLstn != NULL) { + close(pLstn->sock); + statsobj.Destruct(&(pLstn->stats)); + /* now unlink listner */ + lstnDel = pLstn; + pLstn = pLstn->next; + DBGPRINTF("imptcp shutdown listen socket %d (rcvd %lld bytes, " + "decompressed %lld)\n", lstnDel->sock, lstnDel->rcvdBytes, + lstnDel->rcvdDecompressed); + free(lstnDel->epd); + free(lstnDel); + } + + if (pSrv->bUnixSocket && pSrv->bUnlink) { + unlink((char*) pSrv->path); + } + + /* sessions */ + pSess = pSrv->pSess; + while(pSess != NULL) { + close(pSess->sock); + sessDel = pSess; + pSess = pSess->next; + DBGPRINTF("imptcp shutdown session socket %d\n", sessDel->sock); + destructSess(sessDel); + } +} + + +BEGINafterRun + ptcpsrv_t *pSrv, *srvDel; +CODESTARTafterRun + stopWorkerPool(); + destroyIoQ(); + + /* we need to close everything that is still open */ + pSrv = pSrvRoot; + while(pSrv != NULL) { + srvDel = pSrv; + pSrv = pSrv->pNext; + shutdownSrv(srvDel); + destructSrv(srvDel); + } + + close(epollfd); +ENDafterRun + + +BEGINmodExit +CODESTARTmodExit + pthread_attr_destroy(&wrkrThrdAttr); + /* release objects we used */ + objRelease(glbl, CORE_COMPONENT); + objRelease(statsobj, CORE_COMPONENT); + objRelease(prop, CORE_COMPONENT); + objRelease(net, LM_NET_FILENAME); + objRelease(datetime, CORE_COMPONENT); + objRelease(ruleset, CORE_COMPONENT); +ENDmodExit + + +static rsRetVal +resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) +{ + cs.bEmitMsgOnClose = 0; + cs.bEmitMsgOnOpen = 0; + cs.wrkrMax = DFLT_wrkrMax; + cs.bKeepAlive = 0; + cs.iKeepAliveProbes = 0; + cs.iKeepAliveTime = 0; + cs.iKeepAliveIntvl = 0; + cs.bSuppOctetFram = 1; + cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; + cs.maxFrameSize = 200000; + free(cs.pszInputName); + cs.pszInputName = NULL; + free(cs.lstnIP); + cs.lstnIP = NULL; + return RS_RET_OK; +} + + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURENonCancelInputTermination) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_IMOD_QUERIES +CODEqueryEtryPt_STD_CONF2_QUERIES +CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES +CODEqueryEtryPt_STD_CONF2_PREPRIVDROP_QUERIES +CODEqueryEtryPt_STD_CONF2_IMOD_QUERIES +CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES +ENDqueryEtryPt + + +BEGINmodInit() +CODESTARTmodInit + *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ +CODEmodInit_QueryRegCFSLineHdlr + /* request objects we use */ + CHKiRet(objUse(glbl, CORE_COMPONENT)); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); + CHKiRet(objUse(prop, CORE_COMPONENT)); + CHKiRet(objUse(net, LM_NET_FILENAME)); + CHKiRet(objUse(datetime, CORE_COMPONENT)); + CHKiRet(objUse(ruleset, CORE_COMPONENT)); + + /* initialize "read-only" thread attributes */ + pthread_attr_init(&wrkrThrdAttr); + pthread_attr_setstacksize(&wrkrThrdAttr, 4096*1024); + + /* init legacy config settings */ + initConfigSettings(); + + /* register config file handlers */ + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverrun"), 0, eCmdHdlrGetWord, + addInstance, NULL, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive"), 0, eCmdHdlrBinary, + NULL, &cs.bKeepAlive, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive_probes"), 0, eCmdHdlrInt, + NULL, &cs.iKeepAliveProbes, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive_time"), 0, eCmdHdlrInt, + NULL, &cs.iKeepAliveTime, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive_intvl"), 0, eCmdHdlrInt, + NULL, &cs.iKeepAliveIntvl, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserversupportoctetcountedframing"), 0, eCmdHdlrBinary, + NULL, &cs.bSuppOctetFram, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpservernotifyonconnectionclose"), 0, + eCmdHdlrBinary, NULL, &cs.bEmitMsgOnClose, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserveraddtlframedelimiter"), 0, eCmdHdlrInt, + NULL, &cs.iAddtlFrameDelim, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverinputname"), 0, + eCmdHdlrGetWord, NULL, &cs.pszInputName, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverlistenip"), 0, + eCmdHdlrGetWord, NULL, &cs.lstnIP, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverbindruleset"), 0, + eCmdHdlrGetWord, NULL, &cs.pszBindRuleset, STD_LOADABLE_MODULE_ID)); + /* module-global parameters */ + CHKiRet(regCfSysLineHdlr2(UCHAR_CONSTANT("inputptcpserverhelperthreads"), 0, eCmdHdlrInt, + NULL, &cs.wrkrMax, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("resetconfigvariables"), 1, eCmdHdlrCustomHandler, + resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); +ENDmodInit + + +/* vim:set ai: + */ |