diff options
Diffstat (limited to '')
-rw-r--r-- | plugins/omkafka/Makefile.am | 18 | ||||
-rw-r--r-- | plugins/omkafka/Makefile.in | 815 | ||||
-rw-r--r-- | plugins/omkafka/dummy.c | 171 | ||||
-rw-r--r-- | plugins/omkafka/omkafka.c | 2141 |
4 files changed, 3145 insertions, 0 deletions
diff --git a/plugins/omkafka/Makefile.am b/plugins/omkafka/Makefile.am new file mode 100644 index 0000000..dd7b69a --- /dev/null +++ b/plugins/omkafka/Makefile.am @@ -0,0 +1,18 @@ +pkglib_LTLIBRARIES = omkafka.la + +if OMKAFKA_USE_DUMMY +omkafka_la_SOURCES = dummy.c +omkafka_la_LDFLAGS = -module -avoid-version +omkafka_la_CPPFLAGS = -I$(top_srcdir) $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) -D MODNAME=omkafka +else +omkafka_la_SOURCES = omkafka.c +if ENABLE_KAFKA_STATIC +omkafka_la_LDFLAGS = -module -avoid-version -Wl,--whole-archive -l:librdkafka-static.a -Wl,--no-whole-archive -ldl -lresolv -lcurl -lssl -lpthread -lcrypto -lsasl2 -lz -llz4 -lrt -lm +else +omkafka_la_LDFLAGS = -module -avoid-version $(LIBRDKAFKA_LIBS) -lm +endif +omkafka_la_CPPFLAGS = -I$(top_srcdir) $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) +endif + +omkafka_la_LIBADD = +EXTRA_DIST = diff --git a/plugins/omkafka/Makefile.in b/plugins/omkafka/Makefile.in new file mode 100644 index 0000000..c0148d0 --- /dev/null +++ b/plugins/omkafka/Makefile.in @@ -0,0 +1,815 @@ +# Makefile.in generated by automake 1.16.1 from Makefile.am. +# @configure_input@ + +# Copyright (C) 1994-2018 Free Software Foundation, Inc. + +# This Makefile.in is free software; the Free Software Foundation +# gives unlimited permission to copy and/or distribute it, +# with or without modifications, as long as this notice is preserved. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY, to the extent permitted by law; without +# even the implied warranty of MERCHANTABILITY or FITNESS FOR A +# PARTICULAR PURPOSE. + +@SET_MAKE@ + +VPATH = @srcdir@ +am__is_gnu_make = { \ + if test -z '$(MAKELEVEL)'; then \ + false; \ + elif test -n '$(MAKE_HOST)'; then \ + true; \ + elif test -n '$(MAKE_VERSION)' && test -n '$(CURDIR)'; then \ + true; \ + else \ + false; \ + fi; \ +} +am__make_running_with_option = \ + case $${target_option-} in \ + ?) ;; \ + *) echo "am__make_running_with_option: internal error: invalid" \ + "target option '$${target_option-}' specified" >&2; \ + exit 1;; \ + esac; \ + has_opt=no; \ + sane_makeflags=$$MAKEFLAGS; \ + if $(am__is_gnu_make); then \ + sane_makeflags=$$MFLAGS; \ + else \ + case $$MAKEFLAGS in \ + *\\[\ \ ]*) \ + bs=\\; \ + sane_makeflags=`printf '%s\n' "$$MAKEFLAGS" \ + | sed "s/$$bs$$bs[$$bs $$bs ]*//g"`;; \ + esac; \ + fi; \ + skip_next=no; \ + strip_trailopt () \ + { \ + flg=`printf '%s\n' "$$flg" | sed "s/$$1.*$$//"`; \ + }; \ + for flg in $$sane_makeflags; do \ + test $$skip_next = yes && { skip_next=no; continue; }; \ + case $$flg in \ + *=*|--*) continue;; \ + -*I) strip_trailopt 'I'; skip_next=yes;; \ + -*I?*) strip_trailopt 'I';; \ + -*O) strip_trailopt 'O'; skip_next=yes;; \ + -*O?*) strip_trailopt 'O';; \ + -*l) strip_trailopt 'l'; skip_next=yes;; \ + -*l?*) strip_trailopt 'l';; \ + -[dEDm]) skip_next=yes;; \ + -[JT]) skip_next=yes;; \ + esac; \ + case $$flg in \ + *$$target_option*) has_opt=yes; break;; \ + esac; \ + done; \ + test $$has_opt = yes +am__make_dryrun = (target_option=n; $(am__make_running_with_option)) +am__make_keepgoing = (target_option=k; $(am__make_running_with_option)) +pkgdatadir = $(datadir)/@PACKAGE@ +pkgincludedir = $(includedir)/@PACKAGE@ +pkglibdir = $(libdir)/@PACKAGE@ +pkglibexecdir = $(libexecdir)/@PACKAGE@ +am__cd = CDPATH="$${ZSH_VERSION+.}$(PATH_SEPARATOR)" && cd +install_sh_DATA = $(install_sh) -c -m 644 +install_sh_PROGRAM = $(install_sh) -c +install_sh_SCRIPT = $(install_sh) -c +INSTALL_HEADER = $(INSTALL_DATA) +transform = $(program_transform_name) +NORMAL_INSTALL = : +PRE_INSTALL = : +POST_INSTALL = : +NORMAL_UNINSTALL = : +PRE_UNINSTALL = : +POST_UNINSTALL = : +build_triplet = @build@ +host_triplet = @host@ +subdir = plugins/omkafka +ACLOCAL_M4 = $(top_srcdir)/aclocal.m4 +am__aclocal_m4_deps = $(top_srcdir)/m4/ac_check_define.m4 \ + $(top_srcdir)/m4/atomic_operations.m4 \ + $(top_srcdir)/m4/atomic_operations_64bit.m4 \ + $(top_srcdir)/m4/libtool.m4 $(top_srcdir)/m4/ltoptions.m4 \ + $(top_srcdir)/m4/ltsugar.m4 $(top_srcdir)/m4/ltversion.m4 \ + $(top_srcdir)/m4/lt~obsolete.m4 $(top_srcdir)/configure.ac +am__configure_deps = $(am__aclocal_m4_deps) $(CONFIGURE_DEPENDENCIES) \ + $(ACLOCAL_M4) +DIST_COMMON = $(srcdir)/Makefile.am $(am__DIST_COMMON) +mkinstalldirs = $(install_sh) -d +CONFIG_HEADER = $(top_builddir)/config.h +CONFIG_CLEAN_FILES = +CONFIG_CLEAN_VPATH_FILES = +am__vpath_adj_setup = srcdirstrip=`echo "$(srcdir)" | sed 's|.|.|g'`; +am__vpath_adj = case $$p in \ + $(srcdir)/*) f=`echo "$$p" | sed "s|^$$srcdirstrip/||"`;; \ + *) f=$$p;; \ + esac; +am__strip_dir = f=`echo $$p | sed -e 's|^.*/||'`; +am__install_max = 40 +am__nobase_strip_setup = \ + srcdirstrip=`echo "$(srcdir)" | sed 's/[].[^$$\\*|]/\\\\&/g'` +am__nobase_strip = \ + for p in $$list; do echo "$$p"; done | sed -e "s|$$srcdirstrip/||" +am__nobase_list = $(am__nobase_strip_setup); \ + for p in $$list; do echo "$$p $$p"; done | \ + sed "s| $$srcdirstrip/| |;"' / .*\//!s/ .*/ ./; s,\( .*\)/[^/]*$$,\1,' | \ + $(AWK) 'BEGIN { files["."] = "" } { files[$$2] = files[$$2] " " $$1; \ + if (++n[$$2] == $(am__install_max)) \ + { print $$2, files[$$2]; n[$$2] = 0; files[$$2] = "" } } \ + END { for (dir in files) print dir, files[dir] }' +am__base_list = \ + sed '$$!N;$$!N;$$!N;$$!N;$$!N;$$!N;$$!N;s/\n/ /g' | \ + sed '$$!N;$$!N;$$!N;$$!N;s/\n/ /g' +am__uninstall_files_from_dir = { \ + test -z "$$files" \ + || { test ! -d "$$dir" && test ! -f "$$dir" && test ! -r "$$dir"; } \ + || { echo " ( cd '$$dir' && rm -f" $$files ")"; \ + $(am__cd) "$$dir" && rm -f $$files; }; \ + } +am__installdirs = "$(DESTDIR)$(pkglibdir)" +LTLIBRARIES = $(pkglib_LTLIBRARIES) +omkafka_la_DEPENDENCIES = +am__omkafka_la_SOURCES_DIST = omkafka.c dummy.c +@OMKAFKA_USE_DUMMY_FALSE@am_omkafka_la_OBJECTS = \ +@OMKAFKA_USE_DUMMY_FALSE@ omkafka_la-omkafka.lo +@OMKAFKA_USE_DUMMY_TRUE@am_omkafka_la_OBJECTS = omkafka_la-dummy.lo +omkafka_la_OBJECTS = $(am_omkafka_la_OBJECTS) +AM_V_lt = $(am__v_lt_@AM_V@) +am__v_lt_ = $(am__v_lt_@AM_DEFAULT_V@) +am__v_lt_0 = --silent +am__v_lt_1 = +omkafka_la_LINK = $(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) \ + $(LIBTOOLFLAGS) --mode=link $(CCLD) $(AM_CFLAGS) $(CFLAGS) \ + $(omkafka_la_LDFLAGS) $(LDFLAGS) -o $@ +AM_V_P = $(am__v_P_@AM_V@) +am__v_P_ = $(am__v_P_@AM_DEFAULT_V@) +am__v_P_0 = false +am__v_P_1 = : +AM_V_GEN = $(am__v_GEN_@AM_V@) +am__v_GEN_ = $(am__v_GEN_@AM_DEFAULT_V@) +am__v_GEN_0 = @echo " GEN " $@; +am__v_GEN_1 = +AM_V_at = $(am__v_at_@AM_V@) +am__v_at_ = $(am__v_at_@AM_DEFAULT_V@) +am__v_at_0 = @ +am__v_at_1 = +DEFAULT_INCLUDES = -I.@am__isrc@ -I$(top_builddir) +depcomp = $(SHELL) $(top_srcdir)/depcomp +am__maybe_remake_depfiles = depfiles +am__depfiles_remade = ./$(DEPDIR)/omkafka_la-dummy.Plo \ + ./$(DEPDIR)/omkafka_la-omkafka.Plo +am__mv = mv -f +COMPILE = $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) \ + $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) +LTCOMPILE = $(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) \ + $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) \ + $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) \ + $(AM_CFLAGS) $(CFLAGS) +AM_V_CC = $(am__v_CC_@AM_V@) +am__v_CC_ = $(am__v_CC_@AM_DEFAULT_V@) +am__v_CC_0 = @echo " CC " $@; +am__v_CC_1 = +CCLD = $(CC) +LINK = $(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) \ + $(LIBTOOLFLAGS) --mode=link $(CCLD) $(AM_CFLAGS) $(CFLAGS) \ + $(AM_LDFLAGS) $(LDFLAGS) -o $@ +AM_V_CCLD = $(am__v_CCLD_@AM_V@) +am__v_CCLD_ = $(am__v_CCLD_@AM_DEFAULT_V@) +am__v_CCLD_0 = @echo " CCLD " $@; +am__v_CCLD_1 = +SOURCES = $(omkafka_la_SOURCES) +DIST_SOURCES = $(am__omkafka_la_SOURCES_DIST) +am__can_run_installinfo = \ + case $$AM_UPDATE_INFO_DIR in \ + n|no|NO) false;; \ + *) (install-info --version) >/dev/null 2>&1;; \ + esac +am__tagged_files = $(HEADERS) $(SOURCES) $(TAGS_FILES) $(LISP) +# Read a list of newline-separated strings from the standard input, +# and print each of them once, without duplicates. Input order is +# *not* preserved. +am__uniquify_input = $(AWK) '\ + BEGIN { nonempty = 0; } \ + { items[$$0] = 1; nonempty = 1; } \ + END { if (nonempty) { for (i in items) print i; }; } \ +' +# Make sure the list of sources is unique. This is necessary because, +# e.g., the same source file might be shared among _SOURCES variables +# for different programs/libraries. +am__define_uniq_tagged_files = \ + list='$(am__tagged_files)'; \ + unique=`for i in $$list; do \ + if test -f "$$i"; then echo $$i; else echo $(srcdir)/$$i; fi; \ + done | $(am__uniquify_input)` +ETAGS = etags +CTAGS = ctags +am__DIST_COMMON = $(srcdir)/Makefile.in $(top_srcdir)/depcomp +DISTFILES = $(DIST_COMMON) $(DIST_SOURCES) $(TEXINFOS) $(EXTRA_DIST) +ACLOCAL = @ACLOCAL@ +AMTAR = @AMTAR@ +AM_DEFAULT_VERBOSITY = @AM_DEFAULT_VERBOSITY@ +APU_CFLAGS = @APU_CFLAGS@ +APU_LIBS = @APU_LIBS@ +AR = @AR@ +AUTOCONF = @AUTOCONF@ +AUTOHEADER = @AUTOHEADER@ +AUTOMAKE = @AUTOMAKE@ +AWK = @AWK@ +CC = @CC@ +CCDEPMODE = @CCDEPMODE@ +CFLAGS = @CFLAGS@ +CIVETWEB_LIBS = @CIVETWEB_LIBS@ +CONF_FILE_PATH = @CONF_FILE_PATH@ +CPP = @CPP@ +CPPFLAGS = @CPPFLAGS@ +CURL_CFLAGS = @CURL_CFLAGS@ +CURL_LIBS = @CURL_LIBS@ +CYGPATH_W = @CYGPATH_W@ +CZMQ_CFLAGS = @CZMQ_CFLAGS@ +CZMQ_LIBS = @CZMQ_LIBS@ +DEFS = @DEFS@ +DEPDIR = @DEPDIR@ +DLLTOOL = @DLLTOOL@ +DL_LIBS = @DL_LIBS@ +DSYMUTIL = @DSYMUTIL@ +DUMPBIN = @DUMPBIN@ +ECHO_C = @ECHO_C@ +ECHO_N = @ECHO_N@ +ECHO_T = @ECHO_T@ +EGREP = @EGREP@ +EXEEXT = @EXEEXT@ +FAUP_LIBS = @FAUP_LIBS@ +FGREP = @FGREP@ +GLIB_CFLAGS = @GLIB_CFLAGS@ +GLIB_LIBS = @GLIB_LIBS@ +GNUTLS_CFLAGS = @GNUTLS_CFLAGS@ +GNUTLS_LIBS = @GNUTLS_LIBS@ +GREP = @GREP@ +GSS_LIBS = @GSS_LIBS@ +GT_KSI_LS12_CFLAGS = @GT_KSI_LS12_CFLAGS@ +GT_KSI_LS12_LIBS = @GT_KSI_LS12_LIBS@ +HASH_XXHASH_LIBS = @HASH_XXHASH_LIBS@ +HIREDIS_CFLAGS = @HIREDIS_CFLAGS@ +HIREDIS_LIBS = @HIREDIS_LIBS@ +IMUDP_LIBS = @IMUDP_LIBS@ +INSTALL = @INSTALL@ +INSTALL_DATA = @INSTALL_DATA@ +INSTALL_PROGRAM = @INSTALL_PROGRAM@ +INSTALL_SCRIPT = @INSTALL_SCRIPT@ +INSTALL_STRIP_PROGRAM = @INSTALL_STRIP_PROGRAM@ +IP = @IP@ +JAVA = @JAVA@ +JAVAC = @JAVAC@ +LD = @LD@ +LDFLAGS = @LDFLAGS@ +LEX = @LEX@ +LEXLIB = @LEXLIB@ +LEX_OUTPUT_ROOT = @LEX_OUTPUT_ROOT@ +LIBCAPNG_CFLAGS = @LIBCAPNG_CFLAGS@ +LIBCAPNG_LIBS = @LIBCAPNG_LIBS@ +LIBCAPNG_PRESENT_CFLAGS = @LIBCAPNG_PRESENT_CFLAGS@ +LIBCAPNG_PRESENT_LIBS = @LIBCAPNG_PRESENT_LIBS@ +LIBDBI_CFLAGS = @LIBDBI_CFLAGS@ +LIBDBI_LIBS = @LIBDBI_LIBS@ +LIBESTR_CFLAGS = @LIBESTR_CFLAGS@ +LIBESTR_LIBS = @LIBESTR_LIBS@ +LIBEVENT_CFLAGS = @LIBEVENT_CFLAGS@ +LIBEVENT_LIBS = @LIBEVENT_LIBS@ +LIBFASTJSON_CFLAGS = @LIBFASTJSON_CFLAGS@ +LIBFASTJSON_LIBS = @LIBFASTJSON_LIBS@ +LIBGCRYPT_CFLAGS = @LIBGCRYPT_CFLAGS@ +LIBGCRYPT_CONFIG = @LIBGCRYPT_CONFIG@ +LIBGCRYPT_LIBS = @LIBGCRYPT_LIBS@ +LIBLOGGING_CFLAGS = @LIBLOGGING_CFLAGS@ +LIBLOGGING_LIBS = @LIBLOGGING_LIBS@ +LIBLOGGING_STDLOG_CFLAGS = @LIBLOGGING_STDLOG_CFLAGS@ +LIBLOGGING_STDLOG_LIBS = @LIBLOGGING_STDLOG_LIBS@ +LIBLOGNORM_CFLAGS = @LIBLOGNORM_CFLAGS@ +LIBLOGNORM_LIBS = @LIBLOGNORM_LIBS@ +LIBLZ4_CFLAGS = @LIBLZ4_CFLAGS@ +LIBLZ4_LIBS = @LIBLZ4_LIBS@ +LIBM = @LIBM@ +LIBMONGOC_CFLAGS = @LIBMONGOC_CFLAGS@ +LIBMONGOC_LIBS = @LIBMONGOC_LIBS@ +LIBOBJS = @LIBOBJS@ +LIBRDKAFKA_CFLAGS = @LIBRDKAFKA_CFLAGS@ +LIBRDKAFKA_LIBS = @LIBRDKAFKA_LIBS@ +LIBS = @LIBS@ +LIBSYSTEMD_CFLAGS = @LIBSYSTEMD_CFLAGS@ +LIBSYSTEMD_JOURNAL_CFLAGS = @LIBSYSTEMD_JOURNAL_CFLAGS@ +LIBSYSTEMD_JOURNAL_LIBS = @LIBSYSTEMD_JOURNAL_LIBS@ +LIBSYSTEMD_LIBS = @LIBSYSTEMD_LIBS@ +LIBTOOL = @LIBTOOL@ +LIBUUID_CFLAGS = @LIBUUID_CFLAGS@ +LIBUUID_LIBS = @LIBUUID_LIBS@ +LIPO = @LIPO@ +LN_S = @LN_S@ +LTLIBOBJS = @LTLIBOBJS@ +LT_SYS_LIBRARY_PATH = @LT_SYS_LIBRARY_PATH@ +MAKEINFO = @MAKEINFO@ +MANIFEST_TOOL = @MANIFEST_TOOL@ +MKDIR_P = @MKDIR_P@ +MYSQL_CFLAGS = @MYSQL_CFLAGS@ +MYSQL_CONFIG = @MYSQL_CONFIG@ +MYSQL_LIBS = @MYSQL_LIBS@ +NM = @NM@ +NMEDIT = @NMEDIT@ +OBJDUMP = @OBJDUMP@ +OBJEXT = @OBJEXT@ +OPENSSL_CFLAGS = @OPENSSL_CFLAGS@ +OPENSSL_LIBS = @OPENSSL_LIBS@ +OTOOL = @OTOOL@ +OTOOL64 = @OTOOL64@ +PACKAGE = @PACKAGE@ +PACKAGE_BUGREPORT = @PACKAGE_BUGREPORT@ +PACKAGE_NAME = @PACKAGE_NAME@ +PACKAGE_STRING = @PACKAGE_STRING@ +PACKAGE_TARNAME = @PACKAGE_TARNAME@ +PACKAGE_URL = @PACKAGE_URL@ +PACKAGE_VERSION = @PACKAGE_VERSION@ +PATH_SEPARATOR = @PATH_SEPARATOR@ +PGSQL_CFLAGS = @PGSQL_CFLAGS@ +PGSQL_LIBS = @PGSQL_LIBS@ +PG_CONFIG = @PG_CONFIG@ +PID_FILE_PATH = @PID_FILE_PATH@ +PKG_CONFIG = @PKG_CONFIG@ +PKG_CONFIG_LIBDIR = @PKG_CONFIG_LIBDIR@ +PKG_CONFIG_PATH = @PKG_CONFIG_PATH@ +PROTON_CFLAGS = @PROTON_CFLAGS@ +PROTON_LIBS = @PROTON_LIBS@ +PROTON_PROACTOR_CFLAGS = @PROTON_PROACTOR_CFLAGS@ +PROTON_PROACTOR_LIBS = @PROTON_PROACTOR_LIBS@ +PTHREADS_CFLAGS = @PTHREADS_CFLAGS@ +PTHREADS_LIBS = @PTHREADS_LIBS@ +PYTHON = @PYTHON@ +PYTHON_EXEC_PREFIX = @PYTHON_EXEC_PREFIX@ +PYTHON_PLATFORM = @PYTHON_PLATFORM@ +PYTHON_PREFIX = @PYTHON_PREFIX@ +PYTHON_VERSION = @PYTHON_VERSION@ +RABBITMQ_CFLAGS = @RABBITMQ_CFLAGS@ +RABBITMQ_LIBS = @RABBITMQ_LIBS@ +RANLIB = @RANLIB@ +READLINK = @READLINK@ +REDIS = @REDIS@ +RELP_CFLAGS = @RELP_CFLAGS@ +RELP_LIBS = @RELP_LIBS@ +RSRT_CFLAGS = @RSRT_CFLAGS@ +RSRT_CFLAGS1 = @RSRT_CFLAGS1@ +RSRT_LIBS = @RSRT_LIBS@ +RSRT_LIBS1 = @RSRT_LIBS1@ +RST2MAN = @RST2MAN@ +RT_LIBS = @RT_LIBS@ +SED = @SED@ +SET_MAKE = @SET_MAKE@ +SHELL = @SHELL@ +SNMP_CFLAGS = @SNMP_CFLAGS@ +SNMP_LIBS = @SNMP_LIBS@ +SOL_LIBS = @SOL_LIBS@ +STRIP = @STRIP@ +TCL_BIN_DIR = @TCL_BIN_DIR@ +TCL_INCLUDE_SPEC = @TCL_INCLUDE_SPEC@ +TCL_LIB_FILE = @TCL_LIB_FILE@ +TCL_LIB_FLAG = @TCL_LIB_FLAG@ +TCL_LIB_SPEC = @TCL_LIB_SPEC@ +TCL_PATCH_LEVEL = @TCL_PATCH_LEVEL@ +TCL_SRC_DIR = @TCL_SRC_DIR@ +TCL_STUB_LIB_FILE = @TCL_STUB_LIB_FILE@ +TCL_STUB_LIB_FLAG = @TCL_STUB_LIB_FLAG@ +TCL_STUB_LIB_SPEC = @TCL_STUB_LIB_SPEC@ +TCL_VERSION = @TCL_VERSION@ +UDPSPOOF_CFLAGS = @UDPSPOOF_CFLAGS@ +UDPSPOOF_LIBS = @UDPSPOOF_LIBS@ +VALGRIND = @VALGRIND@ +VERSION = @VERSION@ +WARN_CFLAGS = @WARN_CFLAGS@ +WARN_LDFLAGS = @WARN_LDFLAGS@ +WARN_SCANNERFLAGS = @WARN_SCANNERFLAGS@ +WGET = @WGET@ +YACC = @YACC@ +YACC_FOUND = @YACC_FOUND@ +YFLAGS = @YFLAGS@ +ZLIB_CFLAGS = @ZLIB_CFLAGS@ +ZLIB_LIBS = @ZLIB_LIBS@ +ZSTD_CFLAGS = @ZSTD_CFLAGS@ +ZSTD_LIBS = @ZSTD_LIBS@ +abs_builddir = @abs_builddir@ +abs_srcdir = @abs_srcdir@ +abs_top_builddir = @abs_top_builddir@ +abs_top_srcdir = @abs_top_srcdir@ +ac_ct_AR = @ac_ct_AR@ +ac_ct_CC = @ac_ct_CC@ +ac_ct_DUMPBIN = @ac_ct_DUMPBIN@ +am__include = @am__include@ +am__leading_dot = @am__leading_dot@ +am__quote = @am__quote@ +am__tar = @am__tar@ +am__untar = @am__untar@ +bindir = @bindir@ +build = @build@ +build_alias = @build_alias@ +build_cpu = @build_cpu@ +build_os = @build_os@ +build_vendor = @build_vendor@ +builddir = @builddir@ +datadir = @datadir@ +datarootdir = @datarootdir@ +docdir = @docdir@ +dvidir = @dvidir@ +exec_prefix = @exec_prefix@ +host = @host@ +host_alias = @host_alias@ +host_cpu = @host_cpu@ +host_os = @host_os@ +host_vendor = @host_vendor@ +htmldir = @htmldir@ +includedir = @includedir@ +infodir = @infodir@ +install_sh = @install_sh@ +libdir = @libdir@ +libexecdir = @libexecdir@ +localedir = @localedir@ +localstatedir = @localstatedir@ +mandir = @mandir@ +mkdir_p = @mkdir_p@ +moddirs = @moddirs@ +oldincludedir = @oldincludedir@ +pdfdir = @pdfdir@ +pkgpyexecdir = @pkgpyexecdir@ +pkgpythondir = @pkgpythondir@ +prefix = @prefix@ +program_transform_name = @program_transform_name@ +psdir = @psdir@ +pyexecdir = @pyexecdir@ +pythondir = @pythondir@ +runstatedir = @runstatedir@ +sbindir = @sbindir@ +sharedstatedir = @sharedstatedir@ +srcdir = @srcdir@ +sysconfdir = @sysconfdir@ +target_alias = @target_alias@ +top_build_prefix = @top_build_prefix@ +top_builddir = @top_builddir@ +top_srcdir = @top_srcdir@ +pkglib_LTLIBRARIES = omkafka.la +@OMKAFKA_USE_DUMMY_FALSE@omkafka_la_SOURCES = omkafka.c +@OMKAFKA_USE_DUMMY_TRUE@omkafka_la_SOURCES = dummy.c +@ENABLE_KAFKA_STATIC_FALSE@@OMKAFKA_USE_DUMMY_FALSE@omkafka_la_LDFLAGS = -module -avoid-version $(LIBRDKAFKA_LIBS) -lm +@ENABLE_KAFKA_STATIC_TRUE@@OMKAFKA_USE_DUMMY_FALSE@omkafka_la_LDFLAGS = -module -avoid-version -Wl,--whole-archive -l:librdkafka-static.a -Wl,--no-whole-archive -ldl -lresolv -lcurl -lssl -lpthread -lcrypto -lsasl2 -lz -llz4 -lrt -lm +@OMKAFKA_USE_DUMMY_TRUE@omkafka_la_LDFLAGS = -module -avoid-version +@OMKAFKA_USE_DUMMY_FALSE@omkafka_la_CPPFLAGS = -I$(top_srcdir) $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) +@OMKAFKA_USE_DUMMY_TRUE@omkafka_la_CPPFLAGS = -I$(top_srcdir) $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) -D MODNAME=omkafka +omkafka_la_LIBADD = +EXTRA_DIST = +all: all-am + +.SUFFIXES: +.SUFFIXES: .c .lo .o .obj +$(srcdir)/Makefile.in: $(srcdir)/Makefile.am $(am__configure_deps) + @for dep in $?; do \ + case '$(am__configure_deps)' in \ + *$$dep*) \ + ( cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh ) \ + && { if test -f $@; then exit 0; else break; fi; }; \ + exit 1;; \ + esac; \ + done; \ + echo ' cd $(top_srcdir) && $(AUTOMAKE) --gnu plugins/omkafka/Makefile'; \ + $(am__cd) $(top_srcdir) && \ + $(AUTOMAKE) --gnu plugins/omkafka/Makefile +Makefile: $(srcdir)/Makefile.in $(top_builddir)/config.status + @case '$?' in \ + *config.status*) \ + cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh;; \ + *) \ + echo ' cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__maybe_remake_depfiles)'; \ + cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__maybe_remake_depfiles);; \ + esac; + +$(top_builddir)/config.status: $(top_srcdir)/configure $(CONFIG_STATUS_DEPENDENCIES) + cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh + +$(top_srcdir)/configure: $(am__configure_deps) + cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh +$(ACLOCAL_M4): $(am__aclocal_m4_deps) + cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh +$(am__aclocal_m4_deps): + +install-pkglibLTLIBRARIES: $(pkglib_LTLIBRARIES) + @$(NORMAL_INSTALL) + @list='$(pkglib_LTLIBRARIES)'; test -n "$(pkglibdir)" || list=; \ + list2=; for p in $$list; do \ + if test -f $$p; then \ + list2="$$list2 $$p"; \ + else :; fi; \ + done; \ + test -z "$$list2" || { \ + echo " $(MKDIR_P) '$(DESTDIR)$(pkglibdir)'"; \ + $(MKDIR_P) "$(DESTDIR)$(pkglibdir)" || exit 1; \ + echo " $(LIBTOOL) $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=install $(INSTALL) $(INSTALL_STRIP_FLAG) $$list2 '$(DESTDIR)$(pkglibdir)'"; \ + $(LIBTOOL) $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=install $(INSTALL) $(INSTALL_STRIP_FLAG) $$list2 "$(DESTDIR)$(pkglibdir)"; \ + } + +uninstall-pkglibLTLIBRARIES: + @$(NORMAL_UNINSTALL) + @list='$(pkglib_LTLIBRARIES)'; test -n "$(pkglibdir)" || list=; \ + for p in $$list; do \ + $(am__strip_dir) \ + echo " $(LIBTOOL) $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=uninstall rm -f '$(DESTDIR)$(pkglibdir)/$$f'"; \ + $(LIBTOOL) $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=uninstall rm -f "$(DESTDIR)$(pkglibdir)/$$f"; \ + done + +clean-pkglibLTLIBRARIES: + -test -z "$(pkglib_LTLIBRARIES)" || rm -f $(pkglib_LTLIBRARIES) + @list='$(pkglib_LTLIBRARIES)'; \ + locs=`for p in $$list; do echo $$p; done | \ + sed 's|^[^/]*$$|.|; s|/[^/]*$$||; s|$$|/so_locations|' | \ + sort -u`; \ + test -z "$$locs" || { \ + echo rm -f $${locs}; \ + rm -f $${locs}; \ + } + +omkafka.la: $(omkafka_la_OBJECTS) $(omkafka_la_DEPENDENCIES) $(EXTRA_omkafka_la_DEPENDENCIES) + $(AM_V_CCLD)$(omkafka_la_LINK) -rpath $(pkglibdir) $(omkafka_la_OBJECTS) $(omkafka_la_LIBADD) $(LIBS) + +mostlyclean-compile: + -rm -f *.$(OBJEXT) + +distclean-compile: + -rm -f *.tab.c + +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/omkafka_la-dummy.Plo@am__quote@ # am--include-marker +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/omkafka_la-omkafka.Plo@am__quote@ # am--include-marker + +$(am__depfiles_remade): + @$(MKDIR_P) $(@D) + @echo '# dummy' >$@-t && $(am__mv) $@-t $@ + +am--depfiles: $(am__depfiles_remade) + +.c.o: +@am__fastdepCC_TRUE@ $(AM_V_CC)depbase=`echo $@ | sed 's|[^/]*$$|$(DEPDIR)/&|;s|\.o$$||'`;\ +@am__fastdepCC_TRUE@ $(COMPILE) -MT $@ -MD -MP -MF $$depbase.Tpo -c -o $@ $< &&\ +@am__fastdepCC_TRUE@ $(am__mv) $$depbase.Tpo $$depbase.Po +@AMDEP_TRUE@@am__fastdepCC_FALSE@ $(AM_V_CC)source='$<' object='$@' libtool=no @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCC_FALSE@ $(AM_V_CC@am__nodep@)$(COMPILE) -c -o $@ $< + +.c.obj: +@am__fastdepCC_TRUE@ $(AM_V_CC)depbase=`echo $@ | sed 's|[^/]*$$|$(DEPDIR)/&|;s|\.obj$$||'`;\ +@am__fastdepCC_TRUE@ $(COMPILE) -MT $@ -MD -MP -MF $$depbase.Tpo -c -o $@ `$(CYGPATH_W) '$<'` &&\ +@am__fastdepCC_TRUE@ $(am__mv) $$depbase.Tpo $$depbase.Po +@AMDEP_TRUE@@am__fastdepCC_FALSE@ $(AM_V_CC)source='$<' object='$@' libtool=no @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCC_FALSE@ $(AM_V_CC@am__nodep@)$(COMPILE) -c -o $@ `$(CYGPATH_W) '$<'` + +.c.lo: +@am__fastdepCC_TRUE@ $(AM_V_CC)depbase=`echo $@ | sed 's|[^/]*$$|$(DEPDIR)/&|;s|\.lo$$||'`;\ +@am__fastdepCC_TRUE@ $(LTCOMPILE) -MT $@ -MD -MP -MF $$depbase.Tpo -c -o $@ $< &&\ +@am__fastdepCC_TRUE@ $(am__mv) $$depbase.Tpo $$depbase.Plo +@AMDEP_TRUE@@am__fastdepCC_FALSE@ $(AM_V_CC)source='$<' object='$@' libtool=yes @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCC_FALSE@ $(AM_V_CC@am__nodep@)$(LTCOMPILE) -c -o $@ $< + +omkafka_la-omkafka.lo: omkafka.c +@am__fastdepCC_TRUE@ $(AM_V_CC)$(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(omkafka_la_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT omkafka_la-omkafka.lo -MD -MP -MF $(DEPDIR)/omkafka_la-omkafka.Tpo -c -o omkafka_la-omkafka.lo `test -f 'omkafka.c' || echo '$(srcdir)/'`omkafka.c +@am__fastdepCC_TRUE@ $(AM_V_at)$(am__mv) $(DEPDIR)/omkafka_la-omkafka.Tpo $(DEPDIR)/omkafka_la-omkafka.Plo +@AMDEP_TRUE@@am__fastdepCC_FALSE@ $(AM_V_CC)source='omkafka.c' object='omkafka_la-omkafka.lo' libtool=yes @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCC_FALSE@ $(AM_V_CC@am__nodep@)$(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(omkafka_la_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o omkafka_la-omkafka.lo `test -f 'omkafka.c' || echo '$(srcdir)/'`omkafka.c + +omkafka_la-dummy.lo: dummy.c +@am__fastdepCC_TRUE@ $(AM_V_CC)$(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(omkafka_la_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT omkafka_la-dummy.lo -MD -MP -MF $(DEPDIR)/omkafka_la-dummy.Tpo -c -o omkafka_la-dummy.lo `test -f 'dummy.c' || echo '$(srcdir)/'`dummy.c +@am__fastdepCC_TRUE@ $(AM_V_at)$(am__mv) $(DEPDIR)/omkafka_la-dummy.Tpo $(DEPDIR)/omkafka_la-dummy.Plo +@AMDEP_TRUE@@am__fastdepCC_FALSE@ $(AM_V_CC)source='dummy.c' object='omkafka_la-dummy.lo' libtool=yes @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCC_FALSE@ $(AM_V_CC@am__nodep@)$(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(omkafka_la_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o omkafka_la-dummy.lo `test -f 'dummy.c' || echo '$(srcdir)/'`dummy.c + +mostlyclean-libtool: + -rm -f *.lo + +clean-libtool: + -rm -rf .libs _libs + +ID: $(am__tagged_files) + $(am__define_uniq_tagged_files); mkid -fID $$unique +tags: tags-am +TAGS: tags + +tags-am: $(TAGS_DEPENDENCIES) $(am__tagged_files) + set x; \ + here=`pwd`; \ + $(am__define_uniq_tagged_files); \ + shift; \ + if test -z "$(ETAGS_ARGS)$$*$$unique"; then :; else \ + test -n "$$unique" || unique=$$empty_fix; \ + if test $$# -gt 0; then \ + $(ETAGS) $(ETAGSFLAGS) $(AM_ETAGSFLAGS) $(ETAGS_ARGS) \ + "$$@" $$unique; \ + else \ + $(ETAGS) $(ETAGSFLAGS) $(AM_ETAGSFLAGS) $(ETAGS_ARGS) \ + $$unique; \ + fi; \ + fi +ctags: ctags-am + +CTAGS: ctags +ctags-am: $(TAGS_DEPENDENCIES) $(am__tagged_files) + $(am__define_uniq_tagged_files); \ + test -z "$(CTAGS_ARGS)$$unique" \ + || $(CTAGS) $(CTAGSFLAGS) $(AM_CTAGSFLAGS) $(CTAGS_ARGS) \ + $$unique + +GTAGS: + here=`$(am__cd) $(top_builddir) && pwd` \ + && $(am__cd) $(top_srcdir) \ + && gtags -i $(GTAGS_ARGS) "$$here" +cscopelist: cscopelist-am + +cscopelist-am: $(am__tagged_files) + list='$(am__tagged_files)'; \ + case "$(srcdir)" in \ + [\\/]* | ?:[\\/]*) sdir="$(srcdir)" ;; \ + *) sdir=$(subdir)/$(srcdir) ;; \ + esac; \ + for i in $$list; do \ + if test -f "$$i"; then \ + echo "$(subdir)/$$i"; \ + else \ + echo "$$sdir/$$i"; \ + fi; \ + done >> $(top_builddir)/cscope.files + +distclean-tags: + -rm -f TAGS ID GTAGS GRTAGS GSYMS GPATH tags + +distdir: $(BUILT_SOURCES) + $(MAKE) $(AM_MAKEFLAGS) distdir-am + +distdir-am: $(DISTFILES) + @srcdirstrip=`echo "$(srcdir)" | sed 's/[].[^$$\\*]/\\\\&/g'`; \ + topsrcdirstrip=`echo "$(top_srcdir)" | sed 's/[].[^$$\\*]/\\\\&/g'`; \ + list='$(DISTFILES)'; \ + dist_files=`for file in $$list; do echo $$file; done | \ + sed -e "s|^$$srcdirstrip/||;t" \ + -e "s|^$$topsrcdirstrip/|$(top_builddir)/|;t"`; \ + case $$dist_files in \ + */*) $(MKDIR_P) `echo "$$dist_files" | \ + sed '/\//!d;s|^|$(distdir)/|;s,/[^/]*$$,,' | \ + sort -u` ;; \ + esac; \ + for file in $$dist_files; do \ + if test -f $$file || test -d $$file; then d=.; else d=$(srcdir); fi; \ + if test -d $$d/$$file; then \ + dir=`echo "/$$file" | sed -e 's,/[^/]*$$,,'`; \ + if test -d "$(distdir)/$$file"; then \ + find "$(distdir)/$$file" -type d ! -perm -700 -exec chmod u+rwx {} \;; \ + fi; \ + if test -d $(srcdir)/$$file && test $$d != $(srcdir); then \ + cp -fpR $(srcdir)/$$file "$(distdir)$$dir" || exit 1; \ + find "$(distdir)/$$file" -type d ! -perm -700 -exec chmod u+rwx {} \;; \ + fi; \ + cp -fpR $$d/$$file "$(distdir)$$dir" || exit 1; \ + else \ + test -f "$(distdir)/$$file" \ + || cp -p $$d/$$file "$(distdir)/$$file" \ + || exit 1; \ + fi; \ + done +check-am: all-am +check: check-am +all-am: Makefile $(LTLIBRARIES) +installdirs: + for dir in "$(DESTDIR)$(pkglibdir)"; do \ + test -z "$$dir" || $(MKDIR_P) "$$dir"; \ + done +install: install-am +install-exec: install-exec-am +install-data: install-data-am +uninstall: uninstall-am + +install-am: all-am + @$(MAKE) $(AM_MAKEFLAGS) install-exec-am install-data-am + +installcheck: installcheck-am +install-strip: + if test -z '$(STRIP)'; then \ + $(MAKE) $(AM_MAKEFLAGS) INSTALL_PROGRAM="$(INSTALL_STRIP_PROGRAM)" \ + install_sh_PROGRAM="$(INSTALL_STRIP_PROGRAM)" INSTALL_STRIP_FLAG=-s \ + install; \ + else \ + $(MAKE) $(AM_MAKEFLAGS) INSTALL_PROGRAM="$(INSTALL_STRIP_PROGRAM)" \ + install_sh_PROGRAM="$(INSTALL_STRIP_PROGRAM)" INSTALL_STRIP_FLAG=-s \ + "INSTALL_PROGRAM_ENV=STRIPPROG='$(STRIP)'" install; \ + fi +mostlyclean-generic: + +clean-generic: + +distclean-generic: + -test -z "$(CONFIG_CLEAN_FILES)" || rm -f $(CONFIG_CLEAN_FILES) + -test . = "$(srcdir)" || test -z "$(CONFIG_CLEAN_VPATH_FILES)" || rm -f $(CONFIG_CLEAN_VPATH_FILES) + +maintainer-clean-generic: + @echo "This command is intended for maintainers to use" + @echo "it deletes files that may require special tools to rebuild." +clean: clean-am + +clean-am: clean-generic clean-libtool clean-pkglibLTLIBRARIES \ + mostlyclean-am + +distclean: distclean-am + -rm -f ./$(DEPDIR)/omkafka_la-dummy.Plo + -rm -f ./$(DEPDIR)/omkafka_la-omkafka.Plo + -rm -f Makefile +distclean-am: clean-am distclean-compile distclean-generic \ + distclean-tags + +dvi: dvi-am + +dvi-am: + +html: html-am + +html-am: + +info: info-am + +info-am: + +install-data-am: + +install-dvi: install-dvi-am + +install-dvi-am: + +install-exec-am: install-pkglibLTLIBRARIES + +install-html: install-html-am + +install-html-am: + +install-info: install-info-am + +install-info-am: + +install-man: + +install-pdf: install-pdf-am + +install-pdf-am: + +install-ps: install-ps-am + +install-ps-am: + +installcheck-am: + +maintainer-clean: maintainer-clean-am + -rm -f ./$(DEPDIR)/omkafka_la-dummy.Plo + -rm -f ./$(DEPDIR)/omkafka_la-omkafka.Plo + -rm -f Makefile +maintainer-clean-am: distclean-am maintainer-clean-generic + +mostlyclean: mostlyclean-am + +mostlyclean-am: mostlyclean-compile mostlyclean-generic \ + mostlyclean-libtool + +pdf: pdf-am + +pdf-am: + +ps: ps-am + +ps-am: + +uninstall-am: uninstall-pkglibLTLIBRARIES + +.MAKE: install-am install-strip + +.PHONY: CTAGS GTAGS TAGS all all-am am--depfiles check check-am clean \ + clean-generic clean-libtool clean-pkglibLTLIBRARIES \ + cscopelist-am ctags ctags-am distclean distclean-compile \ + distclean-generic distclean-libtool distclean-tags distdir dvi \ + dvi-am html html-am info info-am install install-am \ + install-data install-data-am install-dvi install-dvi-am \ + install-exec install-exec-am install-html install-html-am \ + install-info install-info-am install-man install-pdf \ + install-pdf-am install-pkglibLTLIBRARIES install-ps \ + install-ps-am install-strip installcheck installcheck-am \ + installdirs maintainer-clean maintainer-clean-generic \ + mostlyclean mostlyclean-compile mostlyclean-generic \ + mostlyclean-libtool pdf pdf-am ps ps-am tags tags-am uninstall \ + uninstall-am uninstall-pkglibLTLIBRARIES + +.PRECIOUS: Makefile + + +# Tell versions [3.59,3.63) of GNU make to not export all variables. +# Otherwise a system limit (for SysV at least) may be exceeded. +.NOEXPORT: diff --git a/plugins/omkafka/dummy.c b/plugins/omkafka/dummy.c new file mode 100644 index 0000000..fe8c249 --- /dev/null +++ b/plugins/omkafka/dummy.c @@ -0,0 +1,171 @@ +/* a dummy module to be loaded if we cannot build this module, but + * configure required it to be "optional". + * + * Copyright 2020 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of rsyslog. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * -or- + * see COPYING.ASL20 in the source distribution + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "config.h" +#include "rsyslog.h" +#include <stdio.h> +#include <stdarg.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <signal.h> +#include <errno.h> +#include <unistd.h> +#include <stdint.h> +#include <pthread.h> +#include "conf.h" +#include "syslogd-types.h" +#include "srUtils.h" +#include "template.h" +#include "module-template.h" +#include "errmsg.h" +#include "parserif.h" + +#define MODULE_NAME(x) #x + +MODULE_TYPE_OUTPUT +MODULE_TYPE_NOKEEP +MODULE_CNFNAME(MODULE_NAME(MODNAME)) + + +DEF_OMOD_STATIC_DATA + +/* config variables */ +typedef struct _instanceData { + char *dummy; +} instanceData; + +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + +struct modConfData_s { +}; + +/* modConf ptr to use for the current load process */ +static modConfData_t *loadModConf = NULL; +/* modConf ptr to use for the current exec process */ +static modConfData_t *runModConf = NULL; + + + + +BEGINbeginCnfLoad +CODESTARTbeginCnfLoad + loadModConf = pModConf; +ENDbeginCnfLoad + +BEGINendCnfLoad +CODESTARTendCnfLoad +ENDendCnfLoad + +BEGINcheckCnf +CODESTARTcheckCnf +ENDcheckCnf + +BEGINactivateCnf +CODESTARTactivateCnf + runModConf = pModConf; +ENDactivateCnf + +BEGINfreeCnf +CODESTARTfreeCnf +ENDfreeCnf + + +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature +ENDisCompatibleWithFeature + + +BEGINfreeInstance +CODESTARTfreeInstance +ENDfreeInstance + + +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + +BEGINsetModCnf +CODESTARTsetModCnf + (void) lst; + parser_errmsg("%s is an optional module which could not be built on your platform " + "please remove it from the configuration or upgrade your platform", MODULE_NAME(MODNAME)); +ENDsetModCnf + + +BEGINnewActInst +CODESTARTnewActInst + (void) pData; + (void) ppModData; + parser_errmsg("%s is an optional module which could not be built on your platform " + "please remove it from the configuration or upgrade your platform", MODULE_NAME(MODNAME)); +ENDnewActInst + + +BEGINdbgPrintInstInfo +CODESTARTdbgPrintInstInfo +ENDdbgPrintInstInfo + + +BEGINtryResume +CODESTARTtryResume +ENDtryResume + + +BEGINdoAction_NoStrings +CODESTARTdoAction + (void) pMsgData; +ENDdoAction + + +NO_LEGACY_CONF_parseSelectorAct + + +BEGINmodExit +CODESTARTmodExit +ENDmodExit + + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES +CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES +CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES +CODEqueryEtryPt_STD_CONF2_QUERIES +ENDqueryEtryPt + + +BEGINmodInit() +CODESTARTmodInit + /* we only support the current interface specification */ + *ipIFVersProvided = CURR_MOD_IF_VERSION; +CODEmodInit_QueryRegCFSLineHdlr + dbgprintf("dummy module compiled with rsyslog version %s.\n", VERSION); +ENDmodInit diff --git a/plugins/omkafka/omkafka.c b/plugins/omkafka/omkafka.c new file mode 100644 index 0000000..e8eae08 --- /dev/null +++ b/plugins/omkafka/omkafka.c @@ -0,0 +1,2141 @@ +/* omkafka.c + * This output plugin make rsyslog talk to Apache Kafka. + * + * Copyright 2014-2017 by Adiscon GmbH. + * + * This file is part of rsyslog. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * -or- + * see COPYING.ASL20 in the source distribution + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "config.h" +#include <stdio.h> +#include <stdarg.h> +#include <stdlib.h> +#include <string.h> +#include <strings.h> +#include <assert.h> +#include <errno.h> +#include <fcntl.h> +#include <pthread.h> +#include <sys/uio.h> +#include <sys/queue.h> +#include <sys/types.h> +#include <math.h> +#ifdef HAVE_SYS_STAT_H +# include <sys/stat.h> +#endif +#include <unistd.h> +#include <librdkafka/rdkafka.h> + +#include "rsyslog.h" +#include "syslogd-types.h" +#include "srUtils.h" +#include "template.h" +#include "module-template.h" +#include "errmsg.h" +#include "atomic.h" +#include "statsobj.h" +#include "unicode-helper.h" +#include "datetime.h" + +MODULE_TYPE_OUTPUT +MODULE_TYPE_NOKEEP +MODULE_CNFNAME("omkafka") + +/* internal structures + */ +DEF_OMOD_STATIC_DATA +DEFobjCurrIf(datetime) +DEFobjCurrIf(strm) +DEFobjCurrIf(statsobj) + +statsobj_t *kafkaStats; +STATSCOUNTER_DEF(ctrQueueSize, mutCtrQueueSize); +STATSCOUNTER_DEF(ctrTopicSubmit, mutCtrTopicSubmit); +STATSCOUNTER_DEF(ctrKafkaFail, mutCtrKafkaFail); +STATSCOUNTER_DEF(ctrCacheMiss, mutCtrCacheMiss); +STATSCOUNTER_DEF(ctrCacheEvict, mutCtrCacheEvict); +STATSCOUNTER_DEF(ctrCacheSkip, mutCtrCacheSkip); +STATSCOUNTER_DEF(ctrKafkaAck, mutCtrKafkaAck); +STATSCOUNTER_DEF(ctrKafkaMsgTooLarge, mutCtrKafkaMsgTooLarge); +STATSCOUNTER_DEF(ctrKafkaUnknownTopic, mutCtrKafkaUnknownTopic); +STATSCOUNTER_DEF(ctrKafkaQueueFull, mutCtrKafkaQueueFull); +STATSCOUNTER_DEF(ctrKafkaUnknownPartition, mutCtrKafkaUnknownPartition); +STATSCOUNTER_DEF(ctrKafkaOtherErrors, mutCtrKafkaOtherErrors); +STATSCOUNTER_DEF(ctrKafkaRespTimedOut, mutCtrKafkaRespTimedOut); +STATSCOUNTER_DEF(ctrKafkaRespTransport, mutCtrKafkaRespTransport); +STATSCOUNTER_DEF(ctrKafkaRespBrokerDown, mutCtrKafkaRespBrokerDown); +STATSCOUNTER_DEF(ctrKafkaRespAuth, mutCtrKafkaRespAuth); +STATSCOUNTER_DEF(ctrKafkaRespSSL, mutCtrKafkaRespSSL); +STATSCOUNTER_DEF(ctrKafkaRespOther, mutCtrKafkaRespOther); + +#define MAX_ERRMSG 1024 /* max size of error messages that we support */ + +#ifndef SLIST_INIT +#define SLIST_INIT(head) do { \ + (head)->slh_first = NULL; \ +} while (/*CONSTCOND*/0) +#endif + +#ifndef SLIST_ENTRY +#define SLIST_ENTRY(type) \ + struct { \ + struct type *sle_next; /* next element */ \ + } +#endif + +#ifndef SLIST_HEAD +#define SLIST_HEAD(name, type) \ +struct name { \ + struct type *slh_first; /* first element */ \ +} +#endif + +#ifndef SLIST_INSERT_HEAD +#define SLIST_INSERT_HEAD(head, elm, field) do { \ + (elm)->field.sle_next = (head)->slh_first; \ + (head)->slh_first = (elm); \ +} while (/*CONSTCOND*/0) +#endif + +#ifndef SLIST_REMOVE_HEAD +#define SLIST_REMOVE_HEAD(head, field) do { \ + (head)->slh_first = (head)->slh_first->field.sle_next; \ +} while (/*CONSTCOND*/0) +#endif + +#ifndef SLIST_FIRST +#define SLIST_FIRST(head) ((head)->slh_first) +#endif + +#ifndef SLIST_NEXT +#define SLIST_NEXT(elm, field) ((elm)->field.sle_next) +#endif + +#ifndef SLIST_EMPTY +#define SLIST_EMPTY(head) ((head)->slh_first == NULL) +#endif + +#ifndef SLIST_REMOVE +#define SLIST_REMOVE(head, elm, type, field) do { \ + if ((head)->slh_first == (elm)) { \ + SLIST_REMOVE_HEAD((head), field); \ + } \ + else { \ + struct type *curelm = (head)->slh_first; \ + while(curelm->field.sle_next != (elm)) \ + curelm = curelm->field.sle_next; \ + curelm->field.sle_next = curelm->field.sle_next->field.sle_next; \ + } \ +} while (/*CONSTCOND*/0) +#endif + +#define NO_FIXED_PARTITION -1 /* signifies that no fixed partition config exists */ + +struct kafka_params { + const char *name; + const char *val; +}; + +#ifndef O_LARGEFILE +#define O_LARGEFILE 0 +#endif + +/* flags for writeKafka: shall we resubmit a failed message? */ +#define RESUBMIT 1 +#define NO_RESUBMIT 0 + +#ifdef HAVE_ATOMIC_BUILTINS64 +static uint64 clockTopicAccess = 0; +#else +static unsigned clockTopicAccess = 0; +#endif +/* and the "tick" function */ +#ifndef HAVE_ATOMIC_BUILTINS +static pthread_mutex_t mutClock; +#endif +static uint64 +getClockTopicAccess(void) +{ +#ifdef HAVE_ATOMIC_BUILTINS64 + return ATOMIC_INC_AND_FETCH_uint64(&clockTopicAccess, &mutClock); +#else + return ATOMIC_INC_AND_FETCH_unsigned(&clockTopicAccess, &mutClock); +#endif +} + +/* Needed for Kafka timestamp librdkafka > 0.9.4 */ +#define KAFKA_TimeStamp "\"%timestamp:::date-unixtimestamp%\"" + +static int closeTimeout = 1000; +static pthread_mutex_t closeTimeoutMut = PTHREAD_MUTEX_INITIALIZER; + +/* stats callback window metrics */ +static uint64 rtt_avg_usec; +static uint64 throttle_avg_msec; +static uint64 int_latency_avg_usec; + +/* dynamic topic cache */ +struct s_dynaTopicCacheEntry { + uchar *pName; + rd_kafka_topic_t *pTopic; + uint64 clkTickAccessed; + pthread_rwlock_t lock; +}; +typedef struct s_dynaTopicCacheEntry dynaTopicCacheEntry; + +/* Struct for Failed Messages Listitems */ +struct s_failedmsg_entry { + uchar* key; + uchar* payload; + uchar* topicname; + SLIST_ENTRY(s_failedmsg_entry) entries; /* List. */ +} ; +typedef struct s_failedmsg_entry failedmsg_entry; + +typedef struct _instanceData { + uchar *topic; + sbool dynaKey; + sbool dynaTopic; + dynaTopicCacheEntry **dynCache; + pthread_mutex_t mutDynCache; + rd_kafka_topic_t *pTopic; + int iCurrElt; + int iCurrCacheSize; + int bReportErrs; + int iDynaTopicCacheSize; + uchar *tplName; /* assigned output template */ + char *brokers; + sbool autoPartition; + int fixedPartition; + int nPartitions; + uint32_t currPartition; + DEF_ATOMIC_HELPER_MUT(mutCurrPartition); + int nConfParams; + struct kafka_params *confParams; + int nTopicConfParams; + struct kafka_params *topicConfParams; + uchar *errorFile; + uchar *key; + int bReopenOnHup; + int bResubmitOnFailure; /* Resubmit failed messages into kafka queue*/ + int bKeepFailedMessages;/* Keep Failed messages in memory, + only works if bResubmitOnFailure is enabled */ + uchar *failedMsgFile; /* file in which failed messages are being stored on + shutdown and loaded on startup */ + + int fdErrFile; /* error file fd or -1 if not open */ + pthread_mutex_t mutErrFile; + uchar *statsFile; + int fdStatsFile; /* stats file fd or -1 if not open */ + pthread_mutex_t mutStatsFile; + int bIsOpen; + int bIsSuspended; /* when broker fail, we need to suspend the action */ + pthread_rwlock_t rkLock; + pthread_mutex_t mut_doAction; /* make sure one wrkr instance max in parallel */ + rd_kafka_t *rk; + int closeTimeout; + SLIST_HEAD(failedmsg_listhead, s_failedmsg_entry) failedmsg_head; + + uchar *statsName; + statsobj_t *stats; + STATSCOUNTER_DEF(ctrTopicSubmit, mutCtrTopicSubmit); + STATSCOUNTER_DEF(ctrKafkaFail, mutCtrKafkaFail); + STATSCOUNTER_DEF(ctrKafkaAck, mutCtrKafkaAck); +} instanceData; + +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + +#define INST_STATSCOUNTER_INC(inst, ctr, mut) \ + do { \ + if (inst->stats) { STATSCOUNTER_INC(ctr, mut); } \ + } while(0); + +/* tables for interfacing with the v6 config system */ +/* action (instance) parameters */ +static struct cnfparamdescr actpdescr[] = { + { "topic", eCmdHdlrString, CNFPARAM_REQUIRED }, + { "dynatopic", eCmdHdlrBinary, 0 }, + { "dynatopic.cachesize", eCmdHdlrInt, 0 }, + { "dynakey", eCmdHdlrBinary, 0 }, + { "partitions.auto", eCmdHdlrBinary, 0 }, /* use librdkafka's automatic partitioning function */ + { "partitions.number", eCmdHdlrPositiveInt, 0 }, + { "partitions.usefixed", eCmdHdlrNonNegInt, 0 }, /* expert parameter, "nails" partition */ + { "broker", eCmdHdlrArray, 0 }, + { "confparam", eCmdHdlrArray, 0 }, + { "topicconfparam", eCmdHdlrArray, 0 }, + { "errorfile", eCmdHdlrGetWord, 0 }, + { "statsfile", eCmdHdlrGetWord, 0 }, + { "key", eCmdHdlrGetWord, 0 }, + { "template", eCmdHdlrGetWord, 0 }, + { "closetimeout", eCmdHdlrPositiveInt, 0 }, + { "reopenonhup", eCmdHdlrBinary, 0 }, + { "resubmitonfailure", eCmdHdlrBinary, 0 }, /* Resubmit message into kafaj queue on failure */ + { "keepfailedmessages", eCmdHdlrBinary, 0 }, + { "failedmsgfile", eCmdHdlrGetWord, 0 }, + { "statsname", eCmdHdlrGetWord, 0 } +}; +static struct cnfparamblk actpblk = + { CNFPARAMBLK_VERSION, + sizeof(actpdescr)/sizeof(struct cnfparamdescr), + actpdescr + }; + +BEGINinitConfVars /* (re)set config variables to default values */ +CODESTARTinitConfVars +ENDinitConfVars + +static uint32_t +getPartition(instanceData *const __restrict__ pData) +{ + if (pData->autoPartition) { + return RD_KAFKA_PARTITION_UA; + } else { + return (pData->fixedPartition == NO_FIXED_PARTITION) ? + ATOMIC_INC_AND_FETCH_unsigned(&pData->currPartition, + &pData->mutCurrPartition) % pData->nPartitions + : (unsigned) pData->fixedPartition; + } +} + +/* must always be called with appropriate locks taken */ +static void +free_topic(rd_kafka_topic_t **topic) +{ + if (*topic != NULL) { + DBGPRINTF("omkafka: closing topic %s\n", rd_kafka_topic_name(*topic)); + rd_kafka_topic_destroy(*topic); + *topic = NULL; + } +} + +static void ATTR_NONNULL(1) +failedmsg_entry_destruct(failedmsg_entry *const __restrict__ fmsgEntry) { + free(fmsgEntry->key); + free(fmsgEntry->payload); + free(fmsgEntry->topicname); + free(fmsgEntry); +} + +/* note: we need the length of message as we need to deal with + * non-NUL terminated strings under some circumstances. + */ +static failedmsg_entry * ATTR_NONNULL(3,5) +failedmsg_entry_construct(const char *const key, const size_t keylen, const char *const msg, +const size_t msglen, const char *const topicname) +{ + failedmsg_entry *etry = NULL; + + if((etry = malloc(sizeof(struct s_failedmsg_entry))) == NULL) { + return NULL; + } + + if (key) { + if((etry->key = (uchar*)malloc(keylen+1)) == NULL) { + free(etry); + return NULL; + } + memcpy(etry->key, key, keylen); + etry->key[keylen] = '\0'; + } else { + etry->key=NULL; + } + + if((etry->payload = (uchar*)malloc(msglen+1)) == NULL) { + free(etry->key); + free(etry); + return NULL; + } + memcpy(etry->payload, msg, msglen); + etry->payload[msglen] = '\0'; + if((etry->topicname = (uchar*)strdup(topicname)) == NULL) { + free(etry->key); + free(etry->payload); + free(etry); + return NULL; + } + return etry; +} + +/* destroy topic item */ +/* must be called with write(rkLock) */ +static void +closeTopic(instanceData *__restrict__ const pData) +{ + free_topic(&pData->pTopic); +} + +/* these dynaTopic* functions are only slightly modified versions of those found in omfile.c. + * check the sources in omfile.c for more descriptive comments about each of these functions. + * i will only put the bare descriptions in this one. 2015-01-09 - Tait Clarridge + */ + +/* delete a cache entry from the dynamic topic cache */ +/* must be called with lock(mutDynCache) */ +static rsRetVal +dynaTopicDelCacheEntry(instanceData *__restrict__ const pData, const int iEntry, const int bFreeEntry) +{ + dynaTopicCacheEntry **pCache = pData->dynCache; + DEFiRet; + assert(pCache != NULL); + + if(pCache[iEntry] == NULL) + FINALIZE; + pthread_rwlock_wrlock(&pCache[iEntry]->lock); + + DBGPRINTF("Removing entry %d for topic '%s' from dynaCache.\n", iEntry, + pCache[iEntry]->pName == NULL ? UCHAR_CONSTANT("[OPEN FAILED]") : pCache[iEntry]->pName); + + if(pCache[iEntry]->pName != NULL) { + free(pCache[iEntry]->pName); + pCache[iEntry]->pName = NULL; + } + + pthread_rwlock_unlock(&pCache[iEntry]->lock); + + if(bFreeEntry) { + pthread_rwlock_destroy(&pCache[iEntry]->lock); + free(pCache[iEntry]); + pCache[iEntry] = NULL; + } + +finalize_it: + RETiRet; +} + +/* clear the entire dynamic topic cache */ +static void +dynaTopicFreeCacheEntries(instanceData *__restrict__ const pData) +{ + register int i; + assert(pData != NULL); + + pthread_mutex_lock(&pData->mutDynCache); + for(i = 0 ; i < pData->iCurrCacheSize ; ++i) { + dynaTopicDelCacheEntry(pData, i, 1); + } + pData->iCurrElt = -1; /* invalidate current element */ + pthread_mutex_unlock(&pData->mutDynCache); +} + +/* create the topic object */ +/* must be called with _atleast_ read(rkLock) */ +static rsRetVal +createTopic(instanceData *__restrict__ const pData, const uchar *__restrict__ const newTopicName, +rd_kafka_topic_t** topic) { +/* Get a new topic conf */ + rd_kafka_topic_conf_t *const topicconf = rd_kafka_topic_conf_new(); + char errstr[MAX_ERRMSG]; + rd_kafka_topic_t *rkt = NULL; + DEFiRet; + + *topic = NULL; + + if(topicconf == NULL) { + LogError(0, RS_RET_KAFKA_ERROR, + "omkafka: error creating kafka topic conf obj: %s\n", + rd_kafka_err2str(rd_kafka_last_error())); + ABORT_FINALIZE(RS_RET_KAFKA_ERROR); + } + for(int i = 0 ; i < pData->nTopicConfParams ; ++i) { + DBGPRINTF("omkafka: setting custom topic configuration parameter: %s:%s\n", + pData->topicConfParams[i].name, + pData->topicConfParams[i].val); + if(rd_kafka_topic_conf_set(topicconf, pData->topicConfParams[i].name, + pData->topicConfParams[i].val, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { + if(pData->bReportErrs) { + LogError(0, RS_RET_PARAM_ERROR, "error in kafka " + "topic conf parameter '%s=%s': %s", + pData->topicConfParams[i].name, + pData->topicConfParams[i].val, errstr); + } else { + DBGPRINTF("omkafka: setting custom topic configuration parameter '%s=%s': %s", + pData->topicConfParams[i].name, + pData->topicConfParams[i].val, errstr); + } + ABORT_FINALIZE(RS_RET_PARAM_ERROR); + } + } + rkt = rd_kafka_topic_new(pData->rk, (char *)newTopicName, topicconf); + if(rkt == NULL) { + LogError(0, RS_RET_KAFKA_ERROR, + "omkafka: error creating kafka topic: %s\n", + rd_kafka_err2str(rd_kafka_last_error())); + ABORT_FINALIZE(RS_RET_KAFKA_ERROR); + } + *topic = rkt; +finalize_it: + RETiRet; +} + +/* create the topic object */ +/* must be called with write(rkLock) */ +static rsRetVal +prepareTopic(instanceData *__restrict__ const pData, const uchar *__restrict__ const newTopicName) +{ + DEFiRet; + iRet = createTopic(pData, newTopicName, &pData->pTopic); + if(iRet != RS_RET_OK) { + if(pData->pTopic != NULL) { + closeTopic(pData); + } + } + RETiRet; +} + +/* check dynamic topic cache for existence of the already created topic. + * if it does not exist, create a new one, or if we are currently using it + * as of the last message, keep using it. + * + * must be called with read(rkLock) + * must be called with mutDynCache locked + */ +static rsRetVal ATTR_NONNULL() +prepareDynTopic(instanceData *__restrict__ const pData, const uchar *__restrict__ const newTopicName, + rd_kafka_topic_t** topic, pthread_rwlock_t** lock) +{ + uint64 ctOldest; + int iOldest; + int i; + int iFirstFree; + rsRetVal localRet; + dynaTopicCacheEntry **pCache; + dynaTopicCacheEntry *entry = NULL; + rd_kafka_topic_t *tmpTopic = NULL; + DEFiRet; + assert(pData != NULL); + assert(newTopicName != NULL); + + pCache = pData->dynCache; + /* first check, if we still have the current topic */ + if ((pData->iCurrElt != -1) + && !ustrcmp(newTopicName, pCache[pData->iCurrElt]->pName)) { + /* great, we are all set */ + pCache[pData->iCurrElt]->clkTickAccessed = getClockTopicAccess(); + entry = pCache[pData->iCurrElt]; + STATSCOUNTER_INC(ctrCacheSkip, mutCtrCacheSkip); + FINALIZE; + } + + /* ok, no luck. Now let's search the table if we find a matching spot. + * While doing so, we also prepare for creation of a new one. + */ + pData->iCurrElt = -1; + iFirstFree = -1; + iOldest = 0; + ctOldest = getClockTopicAccess(); + for(i = 0 ; i < pData->iCurrCacheSize ; ++i) { + if(pCache[i] == NULL || pCache[i]->pName == NULL) { + if(iFirstFree == -1) + iFirstFree = i; + } else { /*got an element, let's see if it matches */ + if(!ustrcmp(newTopicName, pCache[i]->pName)) { + /* we found our element! */ + entry = pCache[i]; + pData->iCurrElt = i; + /* update "timestamp" for LRU */ + pCache[i]->clkTickAccessed = getClockTopicAccess(); + FINALIZE; + } + /* did not find it - so lets keep track of the counters for LRU */ + if(pCache[i]->clkTickAccessed < ctOldest) { + ctOldest = pCache[i]->clkTickAccessed; + iOldest = i; + } + } + } + STATSCOUNTER_INC(ctrCacheMiss, mutCtrCacheMiss); + + /* invalidate iCurrElt as we may error-exit out of this function when the currrent + * iCurrElt has been freed or otherwise become unusable. This is a precaution, and + * performance-wise it may be better to do that in each of the exits. However, that + * is error-prone, so I prefer to do it here. -- rgerhards, 2010-03-02 + */ + pData->iCurrElt = -1; + + if(iFirstFree == -1 && (pData->iCurrCacheSize < pData->iDynaTopicCacheSize)) { + /* there is space left, so set it to that index */ + iFirstFree = pData->iCurrCacheSize++; + } + + if(iFirstFree == -1) { + dynaTopicDelCacheEntry(pData, iOldest, 0); + STATSCOUNTER_INC(ctrCacheEvict, mutCtrCacheEvict); + iFirstFree = iOldest; /* this one *is* now free ;) */ + } else { + pCache[iFirstFree] = NULL; + } + /* we need to allocate memory for the cache structure */ + if(pCache[iFirstFree] == NULL) { + CHKmalloc(pCache[iFirstFree] = + (dynaTopicCacheEntry*) calloc(1, sizeof(dynaTopicCacheEntry))); + CHKiRet(pthread_rwlock_init(&pCache[iFirstFree]->lock, NULL)); + } + + /* Ok, we finally can open the topic */ + localRet = createTopic(pData, newTopicName, &tmpTopic); + + if(localRet != RS_RET_OK) { + LogError(0, localRet, "Could not open dynamic topic '%s' " + "[state %d] - discarding message", + newTopicName, localRet); + ABORT_FINALIZE(localRet); + } + + if((pCache[iFirstFree]->pName = ustrdup(newTopicName)) == NULL) { + free_topic(&tmpTopic); + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + } + pCache[iFirstFree]->pTopic = tmpTopic; + pCache[iFirstFree]->clkTickAccessed = getClockTopicAccess(); + entry = pCache[iFirstFree]; + pData->iCurrElt = iFirstFree; + DBGPRINTF("Added new entry %d for topic cache, topic '%s'.\n", iFirstFree, newTopicName); + +finalize_it: + if (iRet == RS_RET_OK) { + *topic = entry->pTopic; + *lock = &entry->lock; + } + RETiRet; +} + +/* write data error request/replies to separate error file + * Note: we open the file but never close it before exit. If it + * needs to be closed, HUP must be sent. + */ +static rsRetVal +writeDataError(instanceData *const pData, + const char *const __restrict__ data, + const size_t lenData, + const int kafkaErr) +{ + int bLocked = 0; + struct json_object *json = NULL; + DEFiRet; + + if(pData->errorFile == NULL) { + FINALIZE; + } + + json = json_object_new_object(); + if(json == NULL) { + ABORT_FINALIZE(RS_RET_ERR); + } + struct json_object *jval; + jval = json_object_new_int(kafkaErr); + json_object_object_add(json, "errcode", jval); + jval = json_object_new_string(rd_kafka_err2str(kafkaErr)); + json_object_object_add(json, "errmsg", jval); + jval = json_object_new_string_len(data, lenData); + json_object_object_add(json, "data", jval); + + struct iovec iov[2]; + iov[0].iov_base = (void*) json_object_get_string(json); + iov[0].iov_len = strlen(iov[0].iov_base); + iov[1].iov_base = (char *) "\n"; + iov[1].iov_len = 1; + + /* we must protect the file write do operations due to other wrks & HUP */ + pthread_mutex_lock(&pData->mutErrFile); + bLocked = 1; + if(pData->fdErrFile == -1) { + pData->fdErrFile = open((char*)pData->errorFile, + O_WRONLY|O_CREAT|O_APPEND|O_LARGEFILE|O_CLOEXEC, + S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP); + if(pData->fdErrFile == -1) { + LogError(errno, RS_RET_ERR, "omkafka: error opening error file %s", + pData->errorFile); + ABORT_FINALIZE(RS_RET_ERR); + } + } + + /* Note: we do not do real error-handling on the err file, as this + * complicates things way to much. + */ + const ssize_t nwritten = writev(pData->fdErrFile, iov, sizeof(iov)/sizeof(struct iovec)); + if(nwritten != (ssize_t) iov[0].iov_len + 1) { + LogError(errno, RS_RET_ERR, + "omkafka: error writing error file, write returns %lld\n", + (long long) nwritten); + } + +finalize_it: + if(bLocked) + pthread_mutex_unlock(&pData->mutErrFile); + if(json != NULL) + json_object_put(json); + RETiRet; +} + +/* write librdkafka stats object to a file + * Note: we open the file but never close it before exit. If it + * needs to be closed, HUP must be sent. + * Assumes pData->statsFile != NULL. + */ +static rsRetVal +writeStats(instanceData *const pData, + char *statsData, + const size_t lenData) +{ + int bLocked = 0; + DEFiRet; + + /* Protect the file write from operations due to other wrks & HUP */ + pthread_mutex_lock(&pData->mutStatsFile); + bLocked = 1; + if(pData->fdStatsFile == -1) { + pData->fdStatsFile = open((char*)pData->statsFile, + O_WRONLY|O_CREAT|O_APPEND|O_LARGEFILE|O_CLOEXEC, + S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP); + if(pData->fdStatsFile == -1) { + LogError(errno, RS_RET_ERR, "omkafka: error opening stats file %s", + pData->statsFile); + ABORT_FINALIZE(RS_RET_ERR); + } + } + + ssize_t nwritten = write(pData->fdStatsFile, statsData, lenData); + nwritten += write(pData->fdStatsFile, "\n", 1); + if(nwritten != (ssize_t) lenData + 1) { + LogError(errno, RS_RET_ERR, + "omkafka: error writing stats file, write returns %lld, expected %lld\n", + (long long) nwritten, (long long)(lenData + 1)); + } + +finalize_it: + if(bLocked) + pthread_mutex_unlock(&pData->mutStatsFile); + RETiRet; +} + +/* identify and count specific types of kafka failures. + */ +static rsRetVal +updateKafkaFailureCounts(rd_kafka_resp_err_t err) { + DEFiRet; + if (err == RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE) { + STATSCOUNTER_INC(ctrKafkaMsgTooLarge, mutCtrKafkaMsgTooLarge); + } else if (err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC) { + STATSCOUNTER_INC(ctrKafkaUnknownTopic, mutCtrKafkaUnknownTopic); + } else if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) { + STATSCOUNTER_INC(ctrKafkaQueueFull, mutCtrKafkaQueueFull); + } else if (err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION) { + STATSCOUNTER_INC(ctrKafkaUnknownPartition, mutCtrKafkaUnknownPartition); + } else { + STATSCOUNTER_INC(ctrKafkaOtherErrors, mutCtrKafkaOtherErrors); + } + + RETiRet; +} + +/* must be called with read(rkLock) + * b_do_resubmit tells if we shall resubmit on error or not. This is needed + * when we submit already resubmitted messages. + */ +static rsRetVal ATTR_NONNULL(1, 3) +writeKafka(instanceData *const pData, uchar *const key, uchar *const msg, + uchar *const msgTimestamp, uchar *const topic, const int b_do_resubmit) +{ + DEFiRet; + const int partition = getPartition(pData); + rd_kafka_topic_t *rkt = NULL; + pthread_rwlock_t *dynTopicLock = NULL; + failedmsg_entry* fmsgEntry; + int topic_mut_locked = 0; + rd_kafka_resp_err_t msg_kafka_response; +#if RD_KAFKA_VERSION >= 0x00090400 + int64_t ttMsgTimestamp; +#else + int msg_enqueue_status = 0; +#endif + + DBGPRINTF("omkafka: trying to send: key:'%s', msg:'%s', timestamp:'%s'\n", + key, msg, msgTimestamp); + + if(pData->dynaTopic) { + DBGPRINTF("omkafka: topic to insert to: %s\n", topic); + /* ensure locking happens all inside this function */ + pthread_mutex_lock(&pData->mutDynCache); + const rsRetVal localRet = prepareDynTopic(pData, topic, &rkt, &dynTopicLock); + if (localRet == RS_RET_OK) { + pthread_rwlock_rdlock(dynTopicLock); + topic_mut_locked = 1; + } + pthread_mutex_unlock(&pData->mutDynCache); + CHKiRet(localRet); + } else { + rkt = pData->pTopic; + } + +#if RD_KAFKA_VERSION >= 0x00090400 + if (msgTimestamp == NULL) { + /* Resubmitted items don't have a timestamp */ + ttMsgTimestamp = 0; + } else { + ttMsgTimestamp = atoi((char*)msgTimestamp); /* Convert timestamp into int */ + ttMsgTimestamp *= 1000; /* Timestamp in Milliseconds for kafka */ + } + DBGPRINTF("omkafka: rd_kafka_producev timestamp=%s/%" PRId64 "\n", msgTimestamp, ttMsgTimestamp); + + /* Using new kafka producev API, includes Timestamp! */ + if (key == NULL) { + msg_kafka_response = rd_kafka_producev(pData->rk, + RD_KAFKA_V_RKT(rkt), + RD_KAFKA_V_PARTITION(partition), + RD_KAFKA_V_VALUE(msg, strlen((char*)msg)), + RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), + RD_KAFKA_V_TIMESTAMP(ttMsgTimestamp), + RD_KAFKA_V_KEY(NULL, 0), + RD_KAFKA_V_END); + } else { + DBGPRINTF("omkafka: rd_kafka_producev key=%s\n", key); + msg_kafka_response = rd_kafka_producev(pData->rk, + RD_KAFKA_V_RKT(rkt), + RD_KAFKA_V_PARTITION(partition), + RD_KAFKA_V_VALUE(msg, strlen((char*)msg)), + RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), + RD_KAFKA_V_TIMESTAMP(ttMsgTimestamp), + RD_KAFKA_V_KEY(key,strlen((char*)key)), + RD_KAFKA_V_END); + } + + if (msg_kafka_response != RD_KAFKA_RESP_ERR_NO_ERROR ) { + updateKafkaFailureCounts(msg_kafka_response); + + /* Put into kafka queue, again if configured! */ + if (pData->bResubmitOnFailure && + b_do_resubmit && + msg_kafka_response != RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE) { + DBGPRINTF("omkafka: Failed to produce to topic '%s' (rd_kafka_producev)" + "partition %d: '%d/%s' - adding MSG '%s' to failed for RETRY!\n", + rd_kafka_topic_name(rkt), partition, msg_kafka_response, + rd_kafka_err2str(msg_kafka_response), msg); + CHKmalloc(fmsgEntry = failedmsg_entry_construct((char*) key, key ? strlen((char*)key) : 0, + (char*) msg, strlen((char*)msg),rd_kafka_topic_name(rkt))); + SLIST_INSERT_HEAD(&pData->failedmsg_head, fmsgEntry, entries); + } else { + LogError(0, RS_RET_KAFKA_PRODUCE_ERR, + "omkafka: Failed to produce to topic '%s' (rd_kafka_producev)" + "partition %d: %d/%s - KEY '%s' -MSG '%s'\n", + rd_kafka_topic_name(rkt), partition, msg_kafka_response, + rd_kafka_err2str(msg_kafka_response), key, msg); + } + } +#else + + DBGPRINTF("omkafka: rd_kafka_produce\n"); + /* Using old kafka produce API */ + msg_enqueue_status = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, + msg, strlen((char*)msg), key, + key ? strlen((char*)key) : 0, + NULL); + if(msg_enqueue_status == -1) { + msg_kafka_response = rd_kafka_last_error(); + updateKafkaFailureCounts(msg_kafka_response); + + /* Put into kafka queue, again if configured! */ + if (pData->bResubmitOnFailure && + b_do_resubmit && + msg_kafka_response != RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE) { + DBGPRINTF("omkafka: Failed to produce to topic '%s' (rd_kafka_produce)" + "partition %d: '%d/%s' - adding MSG '%s' KEY '%s' to failed for RETRY!\n", + rd_kafka_topic_name(rkt), partition, msg_kafka_response, + rd_kafka_err2str(rd_kafka_errno2err(errno)), msg, key ? (const char*) key : ""); + CHKmalloc(fmsgEntry = failedmsg_entry_construct((char*) key, key ? strlen((char*)key) : 0, + (char*) msg, strlen((char*)msg),rd_kafka_topic_name(rkt))); + SLIST_INSERT_HEAD(&pData->failedmsg_head, fmsgEntry, entries); + } else { + LogError(0, RS_RET_KAFKA_PRODUCE_ERR, + "omkafka: Failed to produce to topic '%s' (rd_kafka_produce) " + "partition %d: %d/%s - MSG '%s' KEY '%s'\n", + rd_kafka_topic_name(rkt), partition, msg_kafka_response, + rd_kafka_err2str(msg_kafka_response), msg, key); + } + } +#endif + + const int callbacksCalled = rd_kafka_poll(pData->rk, 0); /* call callbacks */ + DBGPRINTF("omkafka: writeKafka kafka outqueue length: %d, callbacks called %d\n", + rd_kafka_outq_len(pData->rk), callbacksCalled); + +#if RD_KAFKA_VERSION >= 0x00090400 + if (msg_kafka_response != RD_KAFKA_RESP_ERR_NO_ERROR) { +#else + if (msg_enqueue_status == -1) { +#endif + STATSCOUNTER_INC(ctrKafkaFail, mutCtrKafkaFail); + INST_STATSCOUNTER_INC(pData, pData->ctrKafkaFail, pData->mutCtrKafkaFail); + ABORT_FINALIZE(RS_RET_KAFKA_PRODUCE_ERR); + /* ABORT_FINALIZE isn't absolutely necessary as of now, + because this is the last line anyway, but its useful to ensure + correctness in case we add more stuff below this line at some point*/ + } + +finalize_it: + if(topic_mut_locked) { + pthread_rwlock_unlock(dynTopicLock); + } + DBGPRINTF("omkafka: writeKafka returned %d\n", iRet); + if(iRet != RS_RET_OK) { + iRet = RS_RET_SUSPENDED; + } + STATSCOUNTER_SETMAX_NOMUT(ctrQueueSize, (unsigned) rd_kafka_outq_len(pData->rk)); + STATSCOUNTER_INC(ctrTopicSubmit, mutCtrTopicSubmit); + INST_STATSCOUNTER_INC(pData, pData->ctrTopicSubmit, pData->mutCtrTopicSubmit); + RETiRet; +} + +static void +deliveryCallback(rd_kafka_t __attribute__((unused)) *rk, + const rd_kafka_message_t *rkmessage, + void *opaque) +{ + instanceData *const pData = (instanceData *) opaque; + failedmsg_entry* fmsgEntry; + DEFiRet; + + if (rkmessage->err) { + updateKafkaFailureCounts(rkmessage->err); + + /* Put into kafka queue, again if configured! */ + if (pData->bResubmitOnFailure) { + DBGPRINTF("omkafka: kafka delivery FAIL on Topic '%s', msg '%.*s', key '%.*s' -" + " adding to FAILED MSGs for RETRY!\n", + rd_kafka_topic_name(rkmessage->rkt), + (int)(rkmessage->len-1), (char*)rkmessage->payload, + (int)(rkmessage->key_len), (char*)rkmessage->key); + CHKmalloc(fmsgEntry = failedmsg_entry_construct(rkmessage->key, rkmessage->key_len, + rkmessage->payload, rkmessage->len,rd_kafka_topic_name(rkmessage->rkt))); + SLIST_INSERT_HEAD(&pData->failedmsg_head, fmsgEntry, entries); + } else { + LogError(0, RS_RET_ERR, + "omkafka: kafka delivery FAIL on Topic '%s', msg '%.*s', key '%.*s'\n", + rd_kafka_topic_name(rkmessage->rkt), + (int)(rkmessage->len-1), (char*)rkmessage->payload, + (int)(rkmessage->key_len), (char*)rkmessage->key); + writeDataError(pData, (char*) rkmessage->payload, rkmessage->len, rkmessage->err); + } + STATSCOUNTER_INC(ctrKafkaFail, mutCtrKafkaFail); + INST_STATSCOUNTER_INC(pData, pData->ctrKafkaFail, pData->mutCtrKafkaFail); + } else { + DBGPRINTF("omkafka: kafka delivery SUCCESS on msg '%.*s'\n", (int)(rkmessage->len-1), + (char*)rkmessage->payload); + STATSCOUNTER_INC(ctrKafkaAck, mutCtrKafkaAck); + INST_STATSCOUNTER_INC(pData, pData->ctrKafkaAck, pData->mutCtrKafkaAck); + } +finalize_it: + if(iRet != RS_RET_OK) { + DBGPRINTF("omkafka: deliveryCallback returned failure %d\n", iRet); + } +} + +/** + * This function looks for a json object that corresponds to the + * passed name and returns it is found. Otherwise returns NULL. + * It will be used for processing stats callback json object. + */ +static struct fjson_object * +get_object(struct fjson_object *fj_obj, const char * name) { + struct fjson_object_iterator it = fjson_object_iter_begin(fj_obj); + struct fjson_object_iterator itEnd = fjson_object_iter_end(fj_obj); + while (!fjson_object_iter_equal (&it, &itEnd)) { + const char * key = fjson_object_iter_peek_name (&it); + struct fjson_object * val = fjson_object_iter_peek_value(&it); + if(!strncmp(key, name, strlen(name))){ + return val; + } + fjson_object_iter_next (&it); + } + + return NULL; +} + +/** + * This function performs a two level search in stats callback json + * object. It iterates over broker objects and for each broker object + * returns desired level2 value (such as avg/min/max) for specified + * level1 window statistic (such as rtt/throttle/int_latency). Threshold + * allows skipping values that are too small, so that they don't + * impact on aggregate averaged value that is returned. + */ +static uint64 +jsonExtractWindoStats(struct fjson_object * stats_object, + const char * level1_obj_name, const char * level2_obj_name, + unsigned long skip_threshold) { + uint64 level2_val; + uint64 agg_val = 0; + uint64 ret_val = 0; + int active_brokers = 0; + + struct fjson_object * brokers_obj = get_object(stats_object, "brokers"); + if (brokers_obj == NULL) { + LogMsg(0, NO_ERRCODE, LOG_ERR, "jsonExtractWindowStat: failed to find brokers object"); + return ret_val; + } + + /* iterate over borkers to get level1 window objects at level2 (min, max, avg, etc.) */ + struct fjson_object_iterator it = fjson_object_iter_begin(brokers_obj); + struct fjson_object_iterator itEnd = fjson_object_iter_end(brokers_obj); + while (!fjson_object_iter_equal (&it, &itEnd)) { + struct fjson_object * val = fjson_object_iter_peek_value(&it); + struct fjson_object * level1_obj = get_object(val, level1_obj_name); + if(level1_obj == NULL) + return ret_val; + + struct fjson_object * level2_obj = get_object(level1_obj, level2_obj_name); + if(level2_obj == NULL) + return ret_val; + + level2_val = fjson_object_get_int64(level2_obj); + if (level2_val > skip_threshold) { + agg_val += level2_val; + active_brokers++; + } + fjson_object_iter_next (&it); + } + if(active_brokers > 0) { + ret_val = agg_val/active_brokers; + } + + return ret_val; +} + +/** + * librdkafka will call this function after every statistics.interval.ms + * interval, which is specified in confParam. See the explanation at: + * https://github.com/edenhill/librdkafka/wiki/Statistics + * + * Here we have extracted windows stats: rtt, throttle time, and internal + * latency averages. These values will be logged as impstats messages. + */ +static int +statsCallback(rd_kafka_t __attribute__((unused)) *rk, + char *json, size_t __attribute__((unused)) json_len, + void __attribute__((unused)) *opaque) { + instanceData *const pData = (instanceData *) opaque; + char buf[2048]; + char handler_name[1024] = "unknown"; + int replyq = 0; + int msg_cnt = 0; + int msg_size = 0; + uint64 msg_max = 0; + uint64 msg_size_max = 0; + + struct fjson_object * stats_object = NULL; + struct fjson_object * fj_obj = NULL; + + DBGPRINTF("omkafka: librdkafka stats callback: %s\n", json); + + /* prepare fjson object from stats callback for parsing */ + stats_object = fjson_tokener_parse(json); + if (stats_object == NULL) { + LogMsg(0, NO_ERRCODE, LOG_ERR, "statsCallback: fjson tokenizer failed:"); + return 0; + } + enum fjson_type type = fjson_object_get_type(stats_object); + if (type != fjson_type_object) { + LogMsg(0, NO_ERRCODE, LOG_ERR, "statsCallback: json is not of type object; can't process statsCB\n"); + return 0; + } + + /* top level stats extraction through libfastjson based parsing */ + fj_obj = get_object(stats_object, "name"); + if (fj_obj != NULL) + snprintf(handler_name, sizeof(handler_name), "%s", (char *)fjson_object_get_string(fj_obj)); + fj_obj = get_object(stats_object, "replyq"); + replyq = (fj_obj == NULL) ? 0 : fjson_object_get_int(fj_obj); + fj_obj = get_object(stats_object, "msg_cnt"); + msg_cnt = (fj_obj == NULL) ? 0 : fjson_object_get_int(fj_obj); + fj_obj = get_object(stats_object, "msg_size"); + msg_size = (fj_obj == NULL) ? 0 : fjson_object_get_int(fj_obj); + fj_obj = get_object(stats_object, "msg_max"); + msg_max = (fj_obj == NULL) ? 0 : fjson_object_get_int64(fj_obj); + fj_obj = get_object(stats_object, "msg_size_max"); + msg_size_max = (fj_obj == NULL) ? 0 : fjson_object_get_int64(fj_obj); + + /* window stats extraction to be picked up by impstats counters */ + rtt_avg_usec = jsonExtractWindoStats(stats_object, "rtt", "avg", 100); + throttle_avg_msec = jsonExtractWindoStats(stats_object, "throttle", "avg", 0); + int_latency_avg_usec = jsonExtractWindoStats(stats_object, "int_latency", "avg", 0); + json_object_put (stats_object); + + /* emit a log line to get stats visibility per librdkafka client */ + snprintf(buf, sizeof(buf), + "statscb_window_stats: handler_name=%s replyq=%d msg_cnt=%d msg_size=%d " + "msg_max=%lld msg_size_max=%lld rtt_avg_usec=%lld throttle_avg_msec=%lld " + "int_latency_avg_usec=%lld", + handler_name, replyq, msg_cnt, msg_size, msg_max, msg_size_max, + rtt_avg_usec, throttle_avg_msec, int_latency_avg_usec); + LogMsg(0, NO_ERRCODE, LOG_INFO, "%s\n", buf); + + /* Write the entire json stats object, if requested */ + if (pData->statsFile != NULL) + writeStats(pData, json, json_len); + + return 0; +} + +static void +kafkaLogger(const rd_kafka_t __attribute__((unused)) *rk, int level, + const char *fac, const char *buf) +{ + DBGPRINTF("omkafka: kafka log message [%d,%s]: %s\n", + level, fac, buf); +} + +/* should be called with write(rkLock) */ +static void +do_rd_kafka_destroy(instanceData *const __restrict__ pData) +{ + if (pData->rk == NULL) { + DBGPRINTF("omkafka: onDestroy can't close, handle wasn't open\n"); + goto done; + } + int queuedCount = rd_kafka_outq_len(pData->rk); + DBGPRINTF("omkafka: onDestroy closing - items left in outqueue: %d\n", queuedCount); + + struct timespec tOut; + timeoutComp(&tOut, pData->closeTimeout); + + while (timeoutVal(&tOut) > 0) { + queuedCount = rd_kafka_outq_len(pData->rk); + if (queuedCount > 0) { + /* Flush all remaining kafka messages (rd_kafka_poll is called inside) */ + const int flushStatus = rd_kafka_flush(pData->rk, pData->closeTimeout); + if (flushStatus == RD_KAFKA_RESP_ERR_NO_ERROR) { + DBGPRINTF("omkafka: onDestroyflushed remaining '%d' messages " + "to kafka topic '%s'\n", queuedCount, + (pData->pTopic == NULL ? "NULL" : rd_kafka_topic_name(pData->pTopic)) + ); + + /* Trigger callbacks a last time before shutdown */ + const int callbacksCalled = rd_kafka_poll(pData->rk, 0); /* call callbacks */ + DBGPRINTF("omkafka: onDestroy kafka outqueue length: %d, " + "callbacks called %d\n", rd_kafka_outq_len(pData->rk), + callbacksCalled); + } else /* TODO: Handle unsend messages here! */ { + /* timeout = RD_KAFKA_RESP_ERR__TIMED_OUT */ + LogError(0, RS_RET_KAFKA_ERROR, "omkafka: onDestroy " + "Failed to send remaining '%d' messages to " + "topic '%s' on shutdown with error: '%s'", + queuedCount, + (pData->pTopic == NULL ? "NULL" : rd_kafka_topic_name(pData->pTopic)), + rd_kafka_err2str(flushStatus)); +#if RD_KAFKA_VERSION >= 0x010001ff + rd_kafka_purge(pData->rk, RD_KAFKA_PURGE_F_QUEUE | RD_KAFKA_PURGE_F_INFLIGHT); + /* Trigger callbacks a last time before shutdown */ + const int callbacksCalled = rd_kafka_poll(pData->rk, 0); /* call callbacks */ + DBGPRINTF("omkafka: onDestroy kafka outqueue length: %d, " + "callbacks called %d\n", rd_kafka_outq_len(pData->rk), + callbacksCalled); +#endif + } + } else { + break; + } + } + if (queuedCount > 0) { + LogMsg(0, RS_RET_ERR, LOG_WARNING, + "omkafka: queue-drain for close timed-out took too long, " + "items left in outqueue: %d -- this may indicate data loss", + rd_kafka_outq_len(pData->rk)); + } + if (pData->dynaTopic) { + dynaTopicFreeCacheEntries(pData); + } else { + closeTopic(pData); + } + + /* Final destroy of kafka!*/ + rd_kafka_destroy(pData->rk); + +# if RD_KAFKA_VERSION < 0x00090001 + /* Wait for kafka being destroyed in old API */ + if (rd_kafka_wait_destroyed(10000) < 0) { + LogError(0, RS_RET_ERR, "omkafka: rd_kafka_destroy did not finish after grace timeout (10s)!"); + } else { + DBGPRINTF("omkafka: rd_kafka_destroy successfully finished\n"); + } +# endif + + pData->rk = NULL; +done: return; +} + +/* should be called with write(rkLock) */ +static void +closeKafka(instanceData *const __restrict__ pData) +{ + if(pData->bIsOpen) { + do_rd_kafka_destroy(pData); + pData->bIsOpen = 0; + } +} + +static void +errorCallback(rd_kafka_t __attribute__((unused)) *rk, + int __attribute__((unused)) err, + const char *reason, + void __attribute__((unused)) *opaque) +{ + /* Get InstanceData pointer */ + instanceData *const pData = (instanceData *) opaque; + + /* count kafka transport errors that cause action suspension */ + if (err == RD_KAFKA_RESP_ERR__MSG_TIMED_OUT) { + STATSCOUNTER_INC(ctrKafkaRespTimedOut, mutCtrKafkaRespTimedOut); + } else if (err == RD_KAFKA_RESP_ERR__TRANSPORT) { + STATSCOUNTER_INC(ctrKafkaRespTransport, mutCtrKafkaRespTransport); + } else if (err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN) { + STATSCOUNTER_INC(ctrKafkaRespBrokerDown, mutCtrKafkaRespBrokerDown); + } else if (err == RD_KAFKA_RESP_ERR__AUTHENTICATION) { + STATSCOUNTER_INC(ctrKafkaRespAuth, mutCtrKafkaRespAuth); + } else if (err == RD_KAFKA_RESP_ERR__SSL) { + STATSCOUNTER_INC(ctrKafkaRespSSL, mutCtrKafkaRespSSL); + } else { + STATSCOUNTER_INC(ctrKafkaRespOther, mutCtrKafkaRespOther); + } + + /* Handle common transport error codes*/ + if (err == RD_KAFKA_RESP_ERR__MSG_TIMED_OUT || + err == RD_KAFKA_RESP_ERR__TRANSPORT || + err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN || + err == RD_KAFKA_RESP_ERR__AUTHENTICATION || + err == RD_KAFKA_RESP_ERR__SSL) { + /* Broker transport error, we need to disable the action for now!*/ + pData->bIsSuspended = 1; + LogMsg(0, RS_RET_KAFKA_ERROR, LOG_WARNING, + "omkafka: action will suspended due to kafka error %d: %s", + err, rd_kafka_err2str(err)); + } else { + LogError(0, RS_RET_KAFKA_ERROR, "omkafka: kafka error message: %d,'%s','%s'", + err, rd_kafka_err2str(err), reason); + } +} + + + +#if 0 /* the stock librdkafka version in Ubuntu 14.04 LTS does NOT support metadata :-( */ +/* Note: this is a skeleton, with some code missing--> add it when it is actually implemented. */ +static int +getConfiguredPartitions() +{ + struct rd_kafka_metadata *pMetadata; + if(rd_kafka_metadata(pData->rk, 0, rkt, &pMetadata, 8) + == RD_KAFKA_RESP_ERR_NO_ERROR) { + dbgprintf("omkafka: topic '%s' has %d partitions\n", + pData->topic, pMetadata->topics[0]->partition_cnt); + rd_kafka_metadata_destroy(pMetadata); + } else { + dbgprintf("omkafka: error reading metadata\n"); + // TODO: handle this gracefull **when** we actually need + // the metadata -- or remove completely. 2014-12-12 rgerhards + } +} +#endif + +/* should be called with write(rkLock) */ +static rsRetVal +openKafka(instanceData *const __restrict__ pData) +{ + char errstr[MAX_ERRMSG]; + DEFiRet; + + if(pData->bIsOpen) + FINALIZE; + + pData->pTopic = NULL; + + /* main conf */ + rd_kafka_conf_t *const conf = rd_kafka_conf_new(); + if(conf == NULL) { + LogError(0, RS_RET_KAFKA_ERROR, "omkafka: error creating kafka conf obj: %s\n", + rd_kafka_err2str(rd_kafka_last_error())); + ABORT_FINALIZE(RS_RET_KAFKA_ERROR); + } + +#ifdef DEBUG + /* enable kafka debug output */ + if(rd_kafka_conf_set(conf, "debug", RD_KAFKA_DEBUG_CONTEXTS, + errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { + LogError(0, RS_RET_KAFKA_ERROR, "omkafka: error setting kafka debug option: %s\n", errstr); + /* DO NOT ABORT IN THIS CASE! */ + } +#endif + + for(int i = 0 ; i < pData->nConfParams ; ++i) { + DBGPRINTF("omkafka: setting custom configuration parameter: %s:%s\n", + pData->confParams[i].name, + pData->confParams[i].val); + if(rd_kafka_conf_set(conf, pData->confParams[i].name, + pData->confParams[i].val, errstr, sizeof(errstr)) + != RD_KAFKA_CONF_OK) { + if(pData->bReportErrs) { + LogError(0, RS_RET_PARAM_ERROR, "error setting custom configuration " + "parameter '%s=%s': %s", + pData->confParams[i].name, + pData->confParams[i].val, errstr); + } else { + DBGPRINTF("omkafka: error setting custom configuration parameter '%s=%s': %s", + pData->confParams[i].name, + pData->confParams[i].val, errstr); + } + ABORT_FINALIZE(RS_RET_PARAM_ERROR); + } + } + rd_kafka_conf_set_opaque(conf, (void *) pData); + rd_kafka_conf_set_dr_msg_cb(conf, deliveryCallback); + rd_kafka_conf_set_error_cb(conf, errorCallback); + rd_kafka_conf_set_stats_cb(conf, statsCallback); +# if RD_KAFKA_VERSION >= 0x00090001 + rd_kafka_conf_set_log_cb(conf, kafkaLogger); +# endif + + char kafkaErrMsg[1024]; + pData->rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, kafkaErrMsg, sizeof(kafkaErrMsg)); + if(pData->rk == NULL) { + LogError(0, RS_RET_KAFKA_ERROR, + "omkafka: error creating kafka handle: %s\n", kafkaErrMsg); + ABORT_FINALIZE(RS_RET_KAFKA_ERROR); + } + +# if RD_KAFKA_VERSION < 0x00090001 + rd_kafka_conf_set_log_cb(pData->rk, kafkaLogger); +# endif + DBGPRINTF("omkafka setting brokers: '%s'n", pData->brokers); + if(rd_kafka_brokers_add(pData->rk, (char*)pData->brokers) == 0) { + LogError(0, RS_RET_KAFKA_NO_VALID_BROKERS, + "omkafka: no valid brokers specified: %s\n", pData->brokers); + ABORT_FINALIZE(RS_RET_KAFKA_NO_VALID_BROKERS); + } + + pData->bIsOpen = 1; +finalize_it: + if(iRet == RS_RET_OK) { + pData->bReportErrs = 1; + } else { + pData->bReportErrs = 0; + if(pData->rk != NULL) { + do_rd_kafka_destroy(pData); + } + } + RETiRet; +} + +static rsRetVal +setupKafkaHandle(instanceData *const __restrict__ pData, int recreate) +{ + DEFiRet; + pthread_rwlock_wrlock(&pData->rkLock); + if (recreate) { + closeKafka(pData); + } + CHKiRet(openKafka(pData)); + if (! pData->dynaTopic) { + if( pData->pTopic == NULL) + CHKiRet(prepareTopic(pData, pData->topic)); + } +finalize_it: + if (iRet != RS_RET_OK) { + if (pData->rk != NULL) { + closeKafka(pData); + } + + /* Parameter Error's cannot be resumed, so we need to disable the action */ + if (iRet == RS_RET_PARAM_ERROR) { + iRet = RS_RET_DISABLE_ACTION; + LogError(0, iRet, "omkafka: action will be disabled due invalid " + "kafka configuration parameters\n"); + } + + } + pthread_rwlock_unlock(&pData->rkLock); + RETiRet; +} + +static rsRetVal +checkFailedMessages(instanceData *const __restrict__ pData) +{ + failedmsg_entry* fmsgEntry; + DEFiRet; + + /* Loop through failed messages, reprocess them first! */ + while (!SLIST_EMPTY(&pData->failedmsg_head)) { + fmsgEntry = SLIST_FIRST(&pData->failedmsg_head); + assert(fmsgEntry != NULL); + /* Put back into kafka! */ + iRet = writeKafka(pData, (uchar*) fmsgEntry->key, (uchar*) fmsgEntry->payload, NULL, + fmsgEntry->topicname,NO_RESUBMIT); + if(iRet != RS_RET_OK) { + LogMsg(0, RS_RET_SUSPENDED, LOG_WARNING, + "omkafka: failed to deliver failed msg '%.*s' with status %d. " + "- suspending AGAIN!", + (int)(strlen((char*)fmsgEntry->payload)-1), + (char*)fmsgEntry->payload, iRet); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } else { + DBGPRINTF("omkafka: successfully delivered failed msg '%.*s'.\n", + (int)(strlen((char*)fmsgEntry->payload)-1), + (char*)fmsgEntry->payload); + /* Note: we can use SLIST even though it is o(n), because the element + * in question is always either the root or the next element and + * SLIST_REMOVE iterates only until the element to be deleted is found. + * We cannot use SLIST_REMOVE_HEAD() as new elements may have been + * added in the delivery callback! + * TODO: sounds like bad logic -- why do we add and remove, just simply + * keep it in queue? + */ + SLIST_REMOVE(&pData->failedmsg_head, fmsgEntry, s_failedmsg_entry, entries); + failedmsg_entry_destruct(fmsgEntry); + } + } + +finalize_it: + RETiRet; +} + +/* This function persists failed messages into a data file, so they can + * be resend on next startup. + * alorbach, 2017-06-02 + */ +static rsRetVal ATTR_NONNULL(1) +persistFailedMsgs(instanceData *const __restrict__ pData) +{ + DEFiRet; + int fdMsgFile = -1; + ssize_t nwritten; + + if(SLIST_EMPTY(&pData->failedmsg_head)) { + DBGPRINTF("omkafka: persistFailedMsgs: We do not need to persist failed messages.\n"); + FINALIZE; + } + + fdMsgFile = open((char*)pData->failedMsgFile, + O_WRONLY|O_CREAT|O_APPEND|O_LARGEFILE|O_CLOEXEC, + S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP); + if(fdMsgFile == -1) { + LogError(errno, RS_RET_ERR, "omkafka: persistFailedMsgs error opening failed msg file %s", + pData->failedMsgFile); + ABORT_FINALIZE(RS_RET_ERR); + } + + while (!SLIST_EMPTY(&pData->failedmsg_head)) { + failedmsg_entry* fmsgEntry = SLIST_FIRST(&pData->failedmsg_head); + assert(fmsgEntry != NULL); + nwritten = write(fdMsgFile, fmsgEntry->topicname, ustrlen(fmsgEntry->topicname) ); + if(nwritten != -1) + nwritten = write(fdMsgFile, "\t", 1); + if((nwritten != -1) && (fmsgEntry->key)) + nwritten = write(fdMsgFile, fmsgEntry->key, ustrlen(fmsgEntry->key) ); + if(nwritten != -1) + nwritten = write(fdMsgFile, "\t", 1); + if(nwritten != -1) + nwritten = write(fdMsgFile, fmsgEntry->payload, ustrlen(fmsgEntry->payload) ); + if(nwritten == -1) { + LogError(errno, RS_RET_ERR, "omkafka: persistFailedMsgs error writing failed msg file"); + ABORT_FINALIZE(RS_RET_ERR); + } else { + DBGPRINTF("omkafka: persistFailedMsgs successfully written loaded msg '%.*s' for " + "topic '%s'\n", (int)(strlen((char*)fmsgEntry->payload)-1), + fmsgEntry->payload, fmsgEntry->topicname); + } + SLIST_REMOVE_HEAD(&pData->failedmsg_head, entries); + failedmsg_entry_destruct(fmsgEntry); + } + +finalize_it: + if(fdMsgFile != -1) { + close(fdMsgFile); + } + if(iRet != RS_RET_OK) { + LogError(0, iRet, "omkafka: could not persist failed messages " + "file %s - failed messages will be lost.", + (char*)pData->failedMsgFile); + } + RETiRet; +} + +/* This function loads failed messages from a data file, so they can + * be resend after action startup. + * alorbach, 2017-06-06 + */ +static rsRetVal +loadFailedMsgs(instanceData *const __restrict__ pData) +{ + DEFiRet; + struct stat stat_buf; + failedmsg_entry* fmsgEntry; + strm_t *pstrmFMSG = NULL; + cstr_t *pCStr = NULL; + uchar *puStr; + char *pStrTabPos; + char *pStrTabPos2; + + assert(pData->failedMsgFile != NULL); + + /* check if the file exists */ + if(stat((char*) pData->failedMsgFile, &stat_buf) == -1) { + if(errno == ENOENT) { + DBGPRINTF("omkafka: loadFailedMsgs failed messages file %s wasn't found, " + "continue startup\n", pData->failedMsgFile); + ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND); + } else { + LogError(errno, RS_RET_IO_ERROR, + "omkafka: loadFailedMsgs could not open failed messages file %s", + pData->failedMsgFile); + ABORT_FINALIZE(RS_RET_IO_ERROR); + } + } else { + DBGPRINTF("omkafka: loadFailedMsgs found failed message file %s.\n", + pData->failedMsgFile); + } + + /* File exists, we can load and process it */ + CHKiRet(strm.Construct(&pstrmFMSG)); + CHKiRet(strm.SettOperationsMode(pstrmFMSG, STREAMMODE_READ)); + CHKiRet(strm.SetsType(pstrmFMSG, STREAMTYPE_FILE_SINGLE)); + CHKiRet(strm.SetFName(pstrmFMSG, pData->failedMsgFile, ustrlen(pData->failedMsgFile))); + CHKiRet(strm.ConstructFinalize(pstrmFMSG)); + + while(strm.ReadLine(pstrmFMSG, &pCStr, 0, 0, NULL, 0, NULL) == RS_RET_OK) { + if(rsCStrLen(pCStr) == 0) { + /* we do not process empty lines */ + DBGPRINTF("omkafka: loadFailedMsgs msg was empty!"); + } else { + puStr = rsCStrGetSzStrNoNULL(pCStr); //topic + pStrTabPos = index((char*)puStr, '\t'); //key + pStrTabPos2 = index((char*)pStrTabPos+1, '\t'); //msg + if ((pStrTabPos != NULL) && (pStrTabPos2 != NULL)) { + *pStrTabPos = '\0'; /* split string into two */ + *pStrTabPos2 = '\0'; /* split string into two */ + DBGPRINTF("omkafka: loadFailedMsgs successfully loaded msg '%s' for " + "topic '%s' key '%s' \n", + pStrTabPos2+1, (char*)puStr, pStrTabPos+1); + if (strlen(pStrTabPos+1)) { + CHKmalloc(fmsgEntry = failedmsg_entry_construct( + pStrTabPos+1,strlen(pStrTabPos+1), + pStrTabPos2+1,strlen(pStrTabPos2+1), + (char*)puStr)); + } else { + CHKmalloc(fmsgEntry = failedmsg_entry_construct( + NULL,0, + pStrTabPos2+1,strlen(pStrTabPos2+1), + (char*)puStr)); + } + SLIST_INSERT_HEAD(&pData->failedmsg_head, fmsgEntry, entries); + } else { + LogError(0, RS_RET_ERR, "omkafka: loadFailedMsgs droping invalid msg found: %s", + (char*)rsCStrGetSzStrNoNULL(pCStr)); + } + } + + rsCStrDestruct(&pCStr); /* discard string (must be done by us!) */ + } +finalize_it: + if(pstrmFMSG != NULL) { + strm.Destruct(&pstrmFMSG); + } + + if(iRet != RS_RET_OK) { + /* We ignore FILE NOT FOUND here */ + if (iRet != RS_RET_FILE_NOT_FOUND) { + LogError(0, iRet, "omkafka: could not load failed messages " + "from file %s error %d - failed messages will not be resend.", + (char*)pData->failedMsgFile, iRet); + } + } else { + DBGPRINTF("omkafka: loadFailedMsgs unlinking '%s'\n", (char*)pData->failedMsgFile); + /* Delete file if still exists! */ + const int r = unlink((char*)pData->failedMsgFile); + if(r != 0 && r != ENOENT) { + LogError(errno, RS_RET_ERR, "omkafka: loadFailedMsgs failed to remove " + "file \"%s\"", (char*)pData->failedMsgFile); + } + } + + RETiRet; +} + +BEGINdoHUP +CODESTARTdoHUP + pthread_mutex_lock(&pData->mutErrFile); + if(pData->fdErrFile != -1) { + close(pData->fdErrFile); + pData->fdErrFile = -1; + } + pthread_mutex_unlock(&pData->mutErrFile); + pthread_mutex_lock(&pData->mutStatsFile); + if(pData->fdStatsFile != -1) { + close(pData->fdStatsFile); + pData->fdStatsFile = -1; + } + pthread_mutex_unlock(&pData->mutStatsFile); + if (pData->bReopenOnHup) { + CHKiRet(setupKafkaHandle(pData, 1)); + } else { + /* Optional */ + const int callbacksCalled = rd_kafka_poll(pData->rk, 0); /* call callbacks */ + LogMsg(0, NO_ERRCODE, LOG_INFO, "omkafka: doHUP kafka - '%s' outqueue length: %d," + "callbacks called %d\n", pData->tplName, + rd_kafka_outq_len(pData->rk), callbacksCalled); + } +finalize_it: +ENDdoHUP + +BEGINcreateInstance +CODESTARTcreateInstance + pData->currPartition = 0; + pData->bIsOpen = 0; + pData->bIsSuspended = 0; + pData->fdErrFile = -1; + pData->fdStatsFile = -1; + pData->pTopic = NULL; + pData->bReportErrs = 1; + pData->bReopenOnHup = 1; + pData->bResubmitOnFailure = 0; + pData->bKeepFailedMessages = 0; + pData->failedMsgFile = NULL; + SLIST_INIT(&pData->failedmsg_head); + CHKiRet(pthread_mutex_init(&pData->mut_doAction, NULL)); + CHKiRet(pthread_mutex_init(&pData->mutErrFile, NULL)); + CHKiRet(pthread_mutex_init(&pData->mutStatsFile, NULL)); + CHKiRet(pthread_rwlock_init(&pData->rkLock, NULL)); + CHKiRet(pthread_mutex_init(&pData->mutDynCache, NULL)); + INIT_ATOMIC_HELPER_MUT(pData->mutCurrPartition); +finalize_it: +ENDcreateInstance + + +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature +ENDisCompatibleWithFeature + + +BEGINfreeInstance +CODESTARTfreeInstance + /* Helpers for Failed Msg List */ + failedmsg_entry* fmsgEntry1; + failedmsg_entry* fmsgEntry2; + if(pData->fdErrFile != -1) + close(pData->fdErrFile); + if(pData->fdStatsFile != -1) + close(pData->fdStatsFile); + /* Closing Kafka first! */ + pthread_rwlock_wrlock(&pData->rkLock); + closeKafka(pData); + if(pData->dynaTopic && pData->dynCache != NULL) { + free(pData->dynCache); + pData->dynCache = NULL; + } + /* Persist failed messages */ + if (pData->bResubmitOnFailure && pData->bKeepFailedMessages && pData->failedMsgFile != NULL) { + persistFailedMsgs(pData); + } + pthread_rwlock_unlock(&pData->rkLock); + + if (pData->stats) { + statsobj.Destruct(&pData->stats); + } + + /* Delete Linked List for failed msgs */ + fmsgEntry1 = SLIST_FIRST(&pData->failedmsg_head); + while (fmsgEntry1 != NULL) { + fmsgEntry2 = SLIST_NEXT(fmsgEntry1, entries); + failedmsg_entry_destruct(fmsgEntry1); + fmsgEntry1 = fmsgEntry2; + } + SLIST_INIT(&pData->failedmsg_head); + /* Free other mem */ + free(pData->errorFile); + free(pData->statsFile); + free(pData->failedMsgFile); + free(pData->topic); + free(pData->brokers); + free(pData->tplName); + free(pData->statsName); + for(int i = 0 ; i < pData->nConfParams ; ++i) { + free((void*) pData->confParams[i].name); + free((void*) pData->confParams[i].val); + } + free(pData->confParams); + for(int i = 0 ; i < pData->nTopicConfParams ; ++i) { + free((void*) pData->topicConfParams[i].name); + free((void*) pData->topicConfParams[i].val); + } + free(pData->topicConfParams); + DESTROY_ATOMIC_HELPER_MUT(pData->mutCurrPartition); + pthread_rwlock_destroy(&pData->rkLock); + pthread_mutex_destroy(&pData->mut_doAction); + pthread_mutex_destroy(&pData->mutErrFile); + pthread_mutex_destroy(&pData->mutStatsFile); + pthread_mutex_destroy(&pData->mutDynCache); +ENDfreeInstance + +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + +BEGINdbgPrintInstInfo +CODESTARTdbgPrintInstInfo +ENDdbgPrintInstInfo + + +BEGINtryResume + int iKafkaRet; + const struct rd_kafka_metadata *metadata; +CODESTARTtryResume + pthread_mutex_lock(&pWrkrData->pData->mut_doAction); /* see doAction header comment! */ + CHKiRet(setupKafkaHandle(pWrkrData->pData, 0)); + + if ((iKafkaRet = rd_kafka_metadata(pWrkrData->pData->rk, 0, NULL, &metadata, 1000)) + != RD_KAFKA_RESP_ERR_NO_ERROR) { + DBGPRINTF("omkafka: tryResume failed, brokers down %d,%s\n", iKafkaRet, rd_kafka_err2str(iKafkaRet)); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } else { + DBGPRINTF("omkafka: tryResume success, %d brokers UP\n", metadata->broker_cnt); + /* Reset suspended state */ + pWrkrData->pData->bIsSuspended = 0; + /* free mem*/ + rd_kafka_metadata_destroy(metadata); + } + +finalize_it: + pthread_mutex_unlock(&pWrkrData->pData->mut_doAction); /* see doAction header comment! */ + DBGPRINTF("omkafka: tryResume returned %d\n", iRet); +ENDtryResume + + +/* IMPORTANT NOTE on multithreading: + * librdkafka creates background threads itself. So omkafka basically needs to move + * memory buffers over to librdkafka, which then does the heavy hauling. As such, we + * think that it is best to run max one wrkr instance of omkafka -- otherwise we just + * get additional locking (contention) overhead without any real gain. As such, + * we use a global mutex for doAction which ensures only one worker can be active + * at any given time. That mutex is also used to guard utility functions (like + * tryResume) which may also be accessed by multiple workers in parallel. + * Note: shall this method be changed, the kafka connection/suspension handling needs + * to be refactored. The current code assumes that all workers share state information + * including librdkafka handles. + */ +BEGINdoAction +CODESTARTdoAction + failedmsg_entry* fmsgEntry; + instanceData *const pData = pWrkrData->pData; + int need_unlock = 0; + int dynaTopicID = 0; + int dynaKeyID = 0; + + if (pData->dynaKey) { + dynaKeyID=2; + if (pData->dynaTopic) { + dynaTopicID=3; + } + } else { + if (pData->dynaTopic) { + dynaTopicID=2; + } + } + pthread_mutex_lock(&pData->mut_doAction); + if (! pData->bIsOpen) + CHKiRet(setupKafkaHandle(pData, 0)); + + /* Lock here to prevent msg loss */ + pthread_rwlock_rdlock(&pData->rkLock); + need_unlock = 1; + + /* We need to trigger callbacks first in order to suspend the Action properly on failure */ + const int callbacksCalled = rd_kafka_poll(pData->rk, 0); /* call callbacks */ + DBGPRINTF("omkafka: doAction kafka outqueue length: %d, callbacks called %d\n", + rd_kafka_outq_len(pData->rk), callbacksCalled); + + /* Reprocess failed messages! */ + if (pData->bResubmitOnFailure) { + iRet = checkFailedMessages(pData); + if(iRet != RS_RET_OK) { + DBGPRINTF("omkafka: doAction failed to submit FAILED messages with status %d\n", iRet); + + if (pData->bResubmitOnFailure) { + if (pData->dynaKey || pData->key) { + DBGPRINTF("omkafka: also adding MSG '%.*s' for topic '%s' key '%s' " + "to failed for RETRY!\n", + (int)(strlen((char*)ppString[0])-1), ppString[0], + pData->dynaTopic ? ppString[dynaTopicID] : pData->topic, + pData->dynaKey ? ppString[dynaKeyID] : pData->key); + } else { + DBGPRINTF("omkafka: also adding MSG '%.*s' for topic '%s' " + "to failed for RETRY!\n", + (int)(strlen((char*)ppString[0])-1), ppString[0], + pData->dynaTopic ? ppString[dynaTopicID] : pData->topic); + } + CHKmalloc(fmsgEntry = failedmsg_entry_construct( + (char*) (pData->dynaKey ? ppString[dynaKeyID] : pData->key), + pData->dynaKey || pData->key ? + strlen((char*)(pData->dynaKey ? ppString[dynaKeyID] : pData->key)) : 0, + (char*)ppString[0], strlen((char*)ppString[0]), + (char*) (pData->dynaTopic ? ppString[dynaTopicID] : pData->topic))); + SLIST_INSERT_HEAD(&pData->failedmsg_head, fmsgEntry, entries); + } + ABORT_FINALIZE(iRet); + } + } + + /* support dynamic topic */ + iRet = writeKafka(pData, pData->dynaKey ? ppString[dynaKeyID] : pData->key, ppString[0], ppString[1], + pData->dynaTopic ? ppString[dynaTopicID] : pData->topic, RESUBMIT); + +finalize_it: + if(need_unlock) { + pthread_rwlock_unlock(&pData->rkLock); + } + + if(iRet != RS_RET_OK) { + DBGPRINTF("omkafka: doAction failed with status %d\n", iRet); + } + + /* Suspend Action if broker problems were reported in error callback */ + if (pData->bIsSuspended) { + DBGPRINTF("omkafka: doAction broker failure detected, suspending action\n"); + iRet = RS_RET_SUSPENDED; + } + pthread_mutex_unlock(&pData->mut_doAction); /* must be after last pData access! */ +ENDdoAction + + +static void +setInstParamDefaults(instanceData *pData) +{ + pData->topic = NULL; + pData->pTopic = NULL; + pData->dynaKey = 0; + pData->dynaTopic = 0; + pData->iDynaTopicCacheSize = 50; + pData->brokers = NULL; + pData->autoPartition = 0; + pData->fixedPartition = NO_FIXED_PARTITION; + pData->nPartitions = 1; + pData->nConfParams = 0; + pData->confParams = NULL; + pData->nTopicConfParams = 0; + pData->topicConfParams = NULL; + pData->errorFile = NULL; + pData->statsFile = NULL; + pData->failedMsgFile = NULL; + pData->key = NULL; + pData->closeTimeout = 2000; +} + +static rsRetVal +processKafkaParam(char *const param, + const char **const name, + const char **const paramval) +{ + DEFiRet; + char *val = strstr(param, "="); + if(val == NULL) { + LogError(0, RS_RET_PARAM_ERROR, "missing equal sign in " + "parameter '%s'", param); + ABORT_FINALIZE(RS_RET_PARAM_ERROR); + } + *val = '\0'; /* terminates name */ + ++val; /* now points to begin of value */ + CHKmalloc(*name = strdup(param)); + CHKmalloc(*paramval = strdup(val)); +finalize_it: + RETiRet; +} + +BEGINnewActInst + struct cnfparamvals *pvals; + int i; + int iNumTpls; +CODESTARTnewActInst + if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) { + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + CHKiRet(createInstance(&pData)); + setInstParamDefaults(pData); + + for(i = 0 ; i < actpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(actpblk.descr[i].name, "topic")) { + pData->topic = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "dynakey")) { + pData->dynaKey = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "dynatopic")) { + pData->dynaTopic = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "dynatopic.cachesize")) { + pData->iDynaTopicCacheSize = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "closetimeout")) { + pData->closeTimeout = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "partitions.auto")) { + pData->autoPartition = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "partitions.number")) { + pData->nPartitions = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "partitions.usefixed")) { + pData->fixedPartition = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "broker")) { + es_str_t *es = es_newStr(128); + int bNeedComma = 0; + for(int j = 0 ; j < pvals[i].val.d.ar->nmemb ; ++j) { + if(bNeedComma) + es_addChar(&es, ','); + es_addStr(&es, pvals[i].val.d.ar->arr[j]); + bNeedComma = 1; + } + pData->brokers = es_str2cstr(es, NULL); + es_deleteStr(es); + } else if(!strcmp(actpblk.descr[i].name, "confparam")) { + pData->nConfParams = pvals[i].val.d.ar->nmemb; + CHKmalloc(pData->confParams = malloc(sizeof(struct kafka_params) * + pvals[i].val.d.ar->nmemb )); + for(int j = 0 ; j < pvals[i].val.d.ar->nmemb ; ++j) { + char *cstr = es_str2cstr(pvals[i].val.d.ar->arr[j], NULL); + CHKiRet(processKafkaParam(cstr, &pData->confParams[j].name, + &pData->confParams[j].val)); + free(cstr); + } + } else if(!strcmp(actpblk.descr[i].name, "topicconfparam")) { + pData->nTopicConfParams = pvals[i].val.d.ar->nmemb; + CHKmalloc(pData->topicConfParams = malloc(sizeof(struct kafka_params) * + pvals[i].val.d.ar->nmemb )); + for(int j = 0 ; j < pvals[i].val.d.ar->nmemb ; ++j) { + char *cstr = es_str2cstr(pvals[i].val.d.ar->arr[j], NULL); + CHKiRet(processKafkaParam(cstr, &pData->topicConfParams[j].name, + &pData->topicConfParams[j].val)); + free(cstr); + } + } else if(!strcmp(actpblk.descr[i].name, "errorfile")) { + pData->errorFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "statsfile")) { + pData->statsFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "key")) { + pData->key = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "template")) { + pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "reopenonhup")) { + pData->bReopenOnHup = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "resubmitonfailure")) { + pData->bResubmitOnFailure = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "keepfailedmessages")) { + pData->bKeepFailedMessages = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "failedmsgfile")) { + pData->failedMsgFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "statsname")) { + pData->statsName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else { + LogError(0, RS_RET_INTERNAL_ERROR, + "omkafka: program error, non-handled param '%s'\n", actpblk.descr[i].name); + } + } + + if(pData->brokers == NULL) { + CHKmalloc(pData->brokers = strdup("localhost:9092")); + LogMsg(0, NO_ERRCODE, LOG_INFO, "imkafka: \"broker\" parameter not specified " + "using default of localhost:9092 -- this may not be what you want!"); + } + + if(pData->dynaKey && pData->key == NULL) { + LogError(0, RS_RET_CONFIG_ERROR, + "omkafka: requested dynamic key, but no " + "name for key template given - action definition invalid"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } + + if(pData->dynaTopic && pData->topic == NULL) { + LogError(0, RS_RET_CONFIG_ERROR, + "omkafka: requested dynamic topic, but no " + "name for topic template given - action definition invalid"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } + + iNumTpls = 2; + if(pData->dynaKey) ++iNumTpls; + if(pData->dynaTopic) ++iNumTpls; + CODE_STD_STRING_REQUESTnewActInst(iNumTpls); + CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ? + "RSYSLOG_FileFormat" : (char*)pData->tplName), + OMSR_NO_RQD_TPL_OPTS)); + + CHKiRet(OMSRsetEntry(*ppOMSR, 1, (uchar*)strdup(" KAFKA_TimeStamp"), + OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynaKey) + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->key), OMSR_NO_RQD_TPL_OPTS)); + + if(pData->dynaTopic) { + CHKiRet(OMSRsetEntry(*ppOMSR, pData->dynaKey?3:2, ustrdup(pData->topic), OMSR_NO_RQD_TPL_OPTS)); + CHKmalloc(pData->dynCache = (dynaTopicCacheEntry**) + calloc(pData->iDynaTopicCacheSize, sizeof(dynaTopicCacheEntry*))); + pData->iCurrElt = -1; + } + + pthread_mutex_lock(&closeTimeoutMut); + if (closeTimeout < pData->closeTimeout) { + closeTimeout = pData->closeTimeout; + } + pthread_mutex_unlock(&closeTimeoutMut); + + /* Load failed messages here (If enabled), do NOT check for IRET!*/ + if (pData->bKeepFailedMessages && pData->failedMsgFile != NULL) { + loadFailedMsgs(pData); + } + + if (pData->statsName) { + CHKiRet(statsobj.Construct(&pData->stats)); + CHKiRet(statsobj.SetName(pData->stats, (uchar *)pData->statsName)); + CHKiRet(statsobj.SetOrigin(pData->stats, (uchar *)"omkafka")); + + /* Track following stats */ + STATSCOUNTER_INIT(pData->ctrTopicSubmit, pData->mutCtrTopicSubmit); + CHKiRet(statsobj.AddCounter(pData->stats, (uchar *)"submitted", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pData->ctrTopicSubmit)); + STATSCOUNTER_INIT(pData->ctrKafkaFail, pData->mutCtrKafkaFail); + CHKiRet(statsobj.AddCounter(pData->stats, (uchar *)"failures", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pData->ctrKafkaFail)); + STATSCOUNTER_INIT(pData->ctrKafkaAck, pData->mutCtrKafkaAck); + CHKiRet(statsobj.AddCounter(pData->stats, (uchar *)"acked", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pData->ctrKafkaAck)); + CHKiRet(statsobj.ConstructFinalize(pData->stats)); + } + +CODE_STD_FINALIZERnewActInst + cnfparamvalsDestruct(pvals, &actpblk); +ENDnewActInst + + +BEGINmodExit +CODESTARTmodExit + statsobj.Destruct(&kafkaStats); + CHKiRet(objRelease(statsobj, CORE_COMPONENT)); + DESTROY_ATOMIC_HELPER_MUT(mutClock); + + pthread_mutex_lock(&closeTimeoutMut); + int timeout = closeTimeout; + pthread_mutex_unlock(&closeTimeoutMut); + pthread_mutex_destroy(&closeTimeoutMut); + if (rd_kafka_wait_destroyed(timeout) != 0) { + LogMsg(0, RS_RET_OK, LOG_WARNING, + "omkafka: could not terminate librdkafka gracefully, " + "%d threads still remain.\n", rd_kafka_thread_cnt()); + } +finalize_it: +ENDmodExit + + +NO_LEGACY_CONF_parseSelectorAct +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES +CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES +CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES +CODEqueryEtryPt_doHUP +ENDqueryEtryPt + + +BEGINmodInit() +CODESTARTmodInit + uchar *pTmp; +INITLegCnfVars + *ipIFVersProvided = CURR_MOD_IF_VERSION; +CODEmodInit_QueryRegCFSLineHdlr + dbgprintf("just because librdkafka needs it, sqrt of 4 is %f\n", sqrt(4.0)); + CHKiRet(objUse(datetime, CORE_COMPONENT)); + CHKiRet(objUse(strm, CORE_COMPONENT)); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); + + INIT_ATOMIC_HELPER_MUT(mutClock); + + DBGPRINTF("omkafka %s using librdkafka version %s, 0x%x\n", + VERSION, rd_kafka_version_str(), rd_kafka_version()); + CHKiRet(statsobj.Construct(&kafkaStats)); + CHKiRet(statsobj.SetName(kafkaStats, (uchar *)"omkafka")); + CHKiRet(statsobj.SetOrigin(kafkaStats, (uchar*)"omkafka")); + STATSCOUNTER_INIT(ctrTopicSubmit, mutCtrTopicSubmit); + CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"submitted", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrTopicSubmit)); + STATSCOUNTER_INIT(ctrQueueSize, mutCtrQueueSize); + CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"maxoutqsize", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrQueueSize)); + STATSCOUNTER_INIT(ctrKafkaFail, mutCtrKafkaFail); + CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"failures", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrKafkaFail)); + STATSCOUNTER_INIT(ctrCacheSkip, mutCtrCacheSkip); + CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"topicdynacache.skipped", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrCacheSkip)); + STATSCOUNTER_INIT(ctrCacheMiss, mutCtrCacheMiss); + CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"topicdynacache.miss", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrCacheMiss)); + STATSCOUNTER_INIT(ctrCacheEvict, mutCtrCacheEvict); + CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"topicdynacache.evicted", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrCacheEvict)); + STATSCOUNTER_INIT(ctrKafkaAck, mutCtrKafkaAck); + CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"acked", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrKafkaAck)); + STATSCOUNTER_INIT(ctrKafkaMsgTooLarge, mutCtrKafkaMsgTooLarge); + CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"failures_msg_too_large", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrKafkaMsgTooLarge)); + STATSCOUNTER_INIT(ctrKafkaUnknownTopic, mutCtrKafkaUnknownTopic); + CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"failures_unknown_topic", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrKafkaUnknownTopic)); + STATSCOUNTER_INIT(ctrKafkaQueueFull, mutCtrKafkaQueueFull); + CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"failures_queue_full", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrKafkaQueueFull)); + STATSCOUNTER_INIT(ctrKafkaUnknownPartition, mutCtrKafkaUnknownPartition); + CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"failures_unknown_partition", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrKafkaUnknownPartition)); + STATSCOUNTER_INIT(ctrKafkaOtherErrors, mutCtrKafkaOtherErrors); + CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"failures_other", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrKafkaOtherErrors)); + STATSCOUNTER_INIT(ctrKafkaRespTimedOut, mutCtrKafkaRespTimedOut); + CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"errors_timed_out", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrKafkaRespTimedOut)); + STATSCOUNTER_INIT(ctrKafkaRespTransport, mutCtrKafkaRespTransport); + CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"errors_transport", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrKafkaRespTransport)); + STATSCOUNTER_INIT(ctrKafkaRespBrokerDown, mutCtrKafkaRespBrokerDown); + CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"errors_broker_down", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrKafkaRespBrokerDown)); + STATSCOUNTER_INIT(ctrKafkaRespAuth, mutCtrKafkaRespAuth); + CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"errors_auth", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrKafkaRespAuth)); + STATSCOUNTER_INIT(ctrKafkaRespSSL, mutCtrKafkaRespSSL); + CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"errors_ssl", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrKafkaRespSSL)); + STATSCOUNTER_INIT(ctrKafkaRespOther, mutCtrKafkaRespOther); + CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"errors_other", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrKafkaRespOther)); + CHKiRet(statsobj.AddCounter(kafkaStats, UCHAR_CONSTANT("rtt_avg_usec"), + ctrType_Int, CTR_FLAG_NONE, &rtt_avg_usec)); + CHKiRet(statsobj.AddCounter(kafkaStats, UCHAR_CONSTANT("throttle_avg_msec"), + ctrType_Int, CTR_FLAG_NONE, &throttle_avg_msec)); + CHKiRet(statsobj.AddCounter(kafkaStats, UCHAR_CONSTANT("int_latency_avg_usec"), + ctrType_Int, CTR_FLAG_NONE, &int_latency_avg_usec)); + CHKiRet(statsobj.ConstructFinalize(kafkaStats)); + + DBGPRINTF("omkafka: Add KAFKA_TimeStamp to template system ONCE\n"); + pTmp = (uchar*) KAFKA_TimeStamp; + tplAddLine(ourConf, " KAFKA_TimeStamp", &pTmp); +ENDmodInit |