diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-15 16:28:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-15 16:28:20 +0000 |
commit | dcc721a95bef6f0d8e6d8775b8efe33e5aecd562 (patch) | |
tree | 66a2774cd0ee294d019efd71d2544c70f42b2842 /plugins/omelasticsearch | |
parent | Initial commit. (diff) | |
download | rsyslog-dcc721a95bef6f0d8e6d8775b8efe33e5aecd562.tar.xz rsyslog-dcc721a95bef6f0d8e6d8775b8efe33e5aecd562.zip |
Adding upstream version 8.2402.0.upstream/8.2402.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'plugins/omelasticsearch')
-rw-r--r-- | plugins/omelasticsearch/Makefile.am | 8 | ||||
-rw-r--r-- | plugins/omelasticsearch/Makefile.in | 801 | ||||
-rw-r--r-- | plugins/omelasticsearch/README | 21 | ||||
-rw-r--r-- | plugins/omelasticsearch/omelasticsearch.c | 2319 |
4 files changed, 3149 insertions, 0 deletions
diff --git a/plugins/omelasticsearch/Makefile.am b/plugins/omelasticsearch/Makefile.am new file mode 100644 index 0000000..ddaa59a --- /dev/null +++ b/plugins/omelasticsearch/Makefile.am @@ -0,0 +1,8 @@ +pkglib_LTLIBRARIES = omelasticsearch.la + +omelasticsearch_la_SOURCES = omelasticsearch.c +omelasticsearch_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(CURL_CFLAGS) +omelasticsearch_la_LDFLAGS = -module -avoid-version +omelasticsearch_la_LIBADD = $(CURL_LIBS) $(LIBM) + +EXTRA_DIST = diff --git a/plugins/omelasticsearch/Makefile.in b/plugins/omelasticsearch/Makefile.in new file mode 100644 index 0000000..e329e68 --- /dev/null +++ b/plugins/omelasticsearch/Makefile.in @@ -0,0 +1,801 @@ +# Makefile.in generated by automake 1.16.1 from Makefile.am. +# @configure_input@ + +# Copyright (C) 1994-2018 Free Software Foundation, Inc. + +# This Makefile.in is free software; the Free Software Foundation +# gives unlimited permission to copy and/or distribute it, +# with or without modifications, as long as this notice is preserved. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY, to the extent permitted by law; without +# even the implied warranty of MERCHANTABILITY or FITNESS FOR A +# PARTICULAR PURPOSE. + +@SET_MAKE@ + +VPATH = @srcdir@ +am__is_gnu_make = { \ + if test -z '$(MAKELEVEL)'; then \ + false; \ + elif test -n '$(MAKE_HOST)'; then \ + true; \ + elif test -n '$(MAKE_VERSION)' && test -n '$(CURDIR)'; then \ + true; \ + else \ + false; \ + fi; \ +} +am__make_running_with_option = \ + case $${target_option-} in \ + ?) 
;; \ + *) echo "am__make_running_with_option: internal error: invalid" \ + "target option '$${target_option-}' specified" >&2; \ + exit 1;; \ + esac; \ + has_opt=no; \ + sane_makeflags=$$MAKEFLAGS; \ + if $(am__is_gnu_make); then \ + sane_makeflags=$$MFLAGS; \ + else \ + case $$MAKEFLAGS in \ + *\\[\ \ ]*) \ + bs=\\; \ + sane_makeflags=`printf '%s\n' "$$MAKEFLAGS" \ + | sed "s/$$bs$$bs[$$bs $$bs ]*//g"`;; \ + esac; \ + fi; \ + skip_next=no; \ + strip_trailopt () \ + { \ + flg=`printf '%s\n' "$$flg" | sed "s/$$1.*$$//"`; \ + }; \ + for flg in $$sane_makeflags; do \ + test $$skip_next = yes && { skip_next=no; continue; }; \ + case $$flg in \ + *=*|--*) continue;; \ + -*I) strip_trailopt 'I'; skip_next=yes;; \ + -*I?*) strip_trailopt 'I';; \ + -*O) strip_trailopt 'O'; skip_next=yes;; \ + -*O?*) strip_trailopt 'O';; \ + -*l) strip_trailopt 'l'; skip_next=yes;; \ + -*l?*) strip_trailopt 'l';; \ + -[dEDm]) skip_next=yes;; \ + -[JT]) skip_next=yes;; \ + esac; \ + case $$flg in \ + *$$target_option*) has_opt=yes; break;; \ + esac; \ + done; \ + test $$has_opt = yes +am__make_dryrun = (target_option=n; $(am__make_running_with_option)) +am__make_keepgoing = (target_option=k; $(am__make_running_with_option)) +pkgdatadir = $(datadir)/@PACKAGE@ +pkgincludedir = $(includedir)/@PACKAGE@ +pkglibdir = $(libdir)/@PACKAGE@ +pkglibexecdir = $(libexecdir)/@PACKAGE@ +am__cd = CDPATH="$${ZSH_VERSION+.}$(PATH_SEPARATOR)" && cd +install_sh_DATA = $(install_sh) -c -m 644 +install_sh_PROGRAM = $(install_sh) -c +install_sh_SCRIPT = $(install_sh) -c +INSTALL_HEADER = $(INSTALL_DATA) +transform = $(program_transform_name) +NORMAL_INSTALL = : +PRE_INSTALL = : +POST_INSTALL = : +NORMAL_UNINSTALL = : +PRE_UNINSTALL = : +POST_UNINSTALL = : +build_triplet = @build@ +host_triplet = @host@ +subdir = plugins/omelasticsearch +ACLOCAL_M4 = $(top_srcdir)/aclocal.m4 +am__aclocal_m4_deps = $(top_srcdir)/m4/ac_check_define.m4 \ + $(top_srcdir)/m4/atomic_operations.m4 \ + 
$(top_srcdir)/m4/atomic_operations_64bit.m4 \ + $(top_srcdir)/m4/libtool.m4 $(top_srcdir)/m4/ltoptions.m4 \ + $(top_srcdir)/m4/ltsugar.m4 $(top_srcdir)/m4/ltversion.m4 \ + $(top_srcdir)/m4/lt~obsolete.m4 $(top_srcdir)/configure.ac +am__configure_deps = $(am__aclocal_m4_deps) $(CONFIGURE_DEPENDENCIES) \ + $(ACLOCAL_M4) +DIST_COMMON = $(srcdir)/Makefile.am $(am__DIST_COMMON) +mkinstalldirs = $(install_sh) -d +CONFIG_HEADER = $(top_builddir)/config.h +CONFIG_CLEAN_FILES = +CONFIG_CLEAN_VPATH_FILES = +am__vpath_adj_setup = srcdirstrip=`echo "$(srcdir)" | sed 's|.|.|g'`; +am__vpath_adj = case $$p in \ + $(srcdir)/*) f=`echo "$$p" | sed "s|^$$srcdirstrip/||"`;; \ + *) f=$$p;; \ + esac; +am__strip_dir = f=`echo $$p | sed -e 's|^.*/||'`; +am__install_max = 40 +am__nobase_strip_setup = \ + srcdirstrip=`echo "$(srcdir)" | sed 's/[].[^$$\\*|]/\\\\&/g'` +am__nobase_strip = \ + for p in $$list; do echo "$$p"; done | sed -e "s|$$srcdirstrip/||" +am__nobase_list = $(am__nobase_strip_setup); \ + for p in $$list; do echo "$$p $$p"; done | \ + sed "s| $$srcdirstrip/| |;"' / .*\//!s/ .*/ ./; s,\( .*\)/[^/]*$$,\1,' | \ + $(AWK) 'BEGIN { files["."] = "" } { files[$$2] = files[$$2] " " $$1; \ + if (++n[$$2] == $(am__install_max)) \ + { print $$2, files[$$2]; n[$$2] = 0; files[$$2] = "" } } \ + END { for (dir in files) print dir, files[dir] }' +am__base_list = \ + sed '$$!N;$$!N;$$!N;$$!N;$$!N;$$!N;$$!N;s/\n/ /g' | \ + sed '$$!N;$$!N;$$!N;$$!N;s/\n/ /g' +am__uninstall_files_from_dir = { \ + test -z "$$files" \ + || { test ! -d "$$dir" && test ! -f "$$dir" && test ! 
-r "$$dir"; } \ + || { echo " ( cd '$$dir' && rm -f" $$files ")"; \ + $(am__cd) "$$dir" && rm -f $$files; }; \ + } +am__installdirs = "$(DESTDIR)$(pkglibdir)" +LTLIBRARIES = $(pkglib_LTLIBRARIES) +am__DEPENDENCIES_1 = +omelasticsearch_la_DEPENDENCIES = $(am__DEPENDENCIES_1) \ + $(am__DEPENDENCIES_1) +am_omelasticsearch_la_OBJECTS = omelasticsearch_la-omelasticsearch.lo +omelasticsearch_la_OBJECTS = $(am_omelasticsearch_la_OBJECTS) +AM_V_lt = $(am__v_lt_@AM_V@) +am__v_lt_ = $(am__v_lt_@AM_DEFAULT_V@) +am__v_lt_0 = --silent +am__v_lt_1 = +omelasticsearch_la_LINK = $(LIBTOOL) $(AM_V_lt) --tag=CC \ + $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=link $(CCLD) \ + $(AM_CFLAGS) $(CFLAGS) $(omelasticsearch_la_LDFLAGS) \ + $(LDFLAGS) -o $@ +AM_V_P = $(am__v_P_@AM_V@) +am__v_P_ = $(am__v_P_@AM_DEFAULT_V@) +am__v_P_0 = false +am__v_P_1 = : +AM_V_GEN = $(am__v_GEN_@AM_V@) +am__v_GEN_ = $(am__v_GEN_@AM_DEFAULT_V@) +am__v_GEN_0 = @echo " GEN " $@; +am__v_GEN_1 = +AM_V_at = $(am__v_at_@AM_V@) +am__v_at_ = $(am__v_at_@AM_DEFAULT_V@) +am__v_at_0 = @ +am__v_at_1 = +DEFAULT_INCLUDES = -I.@am__isrc@ -I$(top_builddir) +depcomp = $(SHELL) $(top_srcdir)/depcomp +am__maybe_remake_depfiles = depfiles +am__depfiles_remade = \ + ./$(DEPDIR)/omelasticsearch_la-omelasticsearch.Plo +am__mv = mv -f +COMPILE = $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) \ + $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) +LTCOMPILE = $(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) \ + $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) \ + $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) \ + $(AM_CFLAGS) $(CFLAGS) +AM_V_CC = $(am__v_CC_@AM_V@) +am__v_CC_ = $(am__v_CC_@AM_DEFAULT_V@) +am__v_CC_0 = @echo " CC " $@; +am__v_CC_1 = +CCLD = $(CC) +LINK = $(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) \ + $(LIBTOOLFLAGS) --mode=link $(CCLD) $(AM_CFLAGS) $(CFLAGS) \ + $(AM_LDFLAGS) $(LDFLAGS) -o $@ +AM_V_CCLD = $(am__v_CCLD_@AM_V@) +am__v_CCLD_ = $(am__v_CCLD_@AM_DEFAULT_V@) +am__v_CCLD_0 = @echo " CCLD " $@; 
+am__v_CCLD_1 = +SOURCES = $(omelasticsearch_la_SOURCES) +DIST_SOURCES = $(omelasticsearch_la_SOURCES) +am__can_run_installinfo = \ + case $$AM_UPDATE_INFO_DIR in \ + n|no|NO) false;; \ + *) (install-info --version) >/dev/null 2>&1;; \ + esac +am__tagged_files = $(HEADERS) $(SOURCES) $(TAGS_FILES) $(LISP) +# Read a list of newline-separated strings from the standard input, +# and print each of them once, without duplicates. Input order is +# *not* preserved. +am__uniquify_input = $(AWK) '\ + BEGIN { nonempty = 0; } \ + { items[$$0] = 1; nonempty = 1; } \ + END { if (nonempty) { for (i in items) print i; }; } \ +' +# Make sure the list of sources is unique. This is necessary because, +# e.g., the same source file might be shared among _SOURCES variables +# for different programs/libraries. +am__define_uniq_tagged_files = \ + list='$(am__tagged_files)'; \ + unique=`for i in $$list; do \ + if test -f "$$i"; then echo $$i; else echo $(srcdir)/$$i; fi; \ + done | $(am__uniquify_input)` +ETAGS = etags +CTAGS = ctags +am__DIST_COMMON = $(srcdir)/Makefile.in $(top_srcdir)/depcomp README +DISTFILES = $(DIST_COMMON) $(DIST_SOURCES) $(TEXINFOS) $(EXTRA_DIST) +ACLOCAL = @ACLOCAL@ +AMTAR = @AMTAR@ +AM_DEFAULT_VERBOSITY = @AM_DEFAULT_VERBOSITY@ +APU_CFLAGS = @APU_CFLAGS@ +APU_LIBS = @APU_LIBS@ +AR = @AR@ +AUTOCONF = @AUTOCONF@ +AUTOHEADER = @AUTOHEADER@ +AUTOMAKE = @AUTOMAKE@ +AWK = @AWK@ +CC = @CC@ +CCDEPMODE = @CCDEPMODE@ +CFLAGS = @CFLAGS@ +CIVETWEB_LIBS = @CIVETWEB_LIBS@ +CONF_FILE_PATH = @CONF_FILE_PATH@ +CPP = @CPP@ +CPPFLAGS = @CPPFLAGS@ +CURL_CFLAGS = @CURL_CFLAGS@ +CURL_LIBS = @CURL_LIBS@ +CYGPATH_W = @CYGPATH_W@ +CZMQ_CFLAGS = @CZMQ_CFLAGS@ +CZMQ_LIBS = @CZMQ_LIBS@ +DEFS = @DEFS@ +DEPDIR = @DEPDIR@ +DLLTOOL = @DLLTOOL@ +DL_LIBS = @DL_LIBS@ +DSYMUTIL = @DSYMUTIL@ +DUMPBIN = @DUMPBIN@ +ECHO_C = @ECHO_C@ +ECHO_N = @ECHO_N@ +ECHO_T = @ECHO_T@ +EGREP = @EGREP@ +EXEEXT = @EXEEXT@ +FAUP_LIBS = @FAUP_LIBS@ +FGREP = @FGREP@ +GLIB_CFLAGS = @GLIB_CFLAGS@ +GLIB_LIBS = @GLIB_LIBS@ 
+GNUTLS_CFLAGS = @GNUTLS_CFLAGS@ +GNUTLS_LIBS = @GNUTLS_LIBS@ +GREP = @GREP@ +GSS_LIBS = @GSS_LIBS@ +GT_KSI_LS12_CFLAGS = @GT_KSI_LS12_CFLAGS@ +GT_KSI_LS12_LIBS = @GT_KSI_LS12_LIBS@ +HASH_XXHASH_LIBS = @HASH_XXHASH_LIBS@ +HIREDIS_CFLAGS = @HIREDIS_CFLAGS@ +HIREDIS_LIBS = @HIREDIS_LIBS@ +IMUDP_LIBS = @IMUDP_LIBS@ +INSTALL = @INSTALL@ +INSTALL_DATA = @INSTALL_DATA@ +INSTALL_PROGRAM = @INSTALL_PROGRAM@ +INSTALL_SCRIPT = @INSTALL_SCRIPT@ +INSTALL_STRIP_PROGRAM = @INSTALL_STRIP_PROGRAM@ +IP = @IP@ +JAVA = @JAVA@ +JAVAC = @JAVAC@ +LD = @LD@ +LDFLAGS = @LDFLAGS@ +LEX = @LEX@ +LEXLIB = @LEXLIB@ +LEX_OUTPUT_ROOT = @LEX_OUTPUT_ROOT@ +LIBCAPNG_CFLAGS = @LIBCAPNG_CFLAGS@ +LIBCAPNG_LIBS = @LIBCAPNG_LIBS@ +LIBCAPNG_PRESENT_CFLAGS = @LIBCAPNG_PRESENT_CFLAGS@ +LIBCAPNG_PRESENT_LIBS = @LIBCAPNG_PRESENT_LIBS@ +LIBDBI_CFLAGS = @LIBDBI_CFLAGS@ +LIBDBI_LIBS = @LIBDBI_LIBS@ +LIBESTR_CFLAGS = @LIBESTR_CFLAGS@ +LIBESTR_LIBS = @LIBESTR_LIBS@ +LIBEVENT_CFLAGS = @LIBEVENT_CFLAGS@ +LIBEVENT_LIBS = @LIBEVENT_LIBS@ +LIBFASTJSON_CFLAGS = @LIBFASTJSON_CFLAGS@ +LIBFASTJSON_LIBS = @LIBFASTJSON_LIBS@ +LIBGCRYPT_CFLAGS = @LIBGCRYPT_CFLAGS@ +LIBGCRYPT_CONFIG = @LIBGCRYPT_CONFIG@ +LIBGCRYPT_LIBS = @LIBGCRYPT_LIBS@ +LIBLOGGING_CFLAGS = @LIBLOGGING_CFLAGS@ +LIBLOGGING_LIBS = @LIBLOGGING_LIBS@ +LIBLOGGING_STDLOG_CFLAGS = @LIBLOGGING_STDLOG_CFLAGS@ +LIBLOGGING_STDLOG_LIBS = @LIBLOGGING_STDLOG_LIBS@ +LIBLOGNORM_CFLAGS = @LIBLOGNORM_CFLAGS@ +LIBLOGNORM_LIBS = @LIBLOGNORM_LIBS@ +LIBLZ4_CFLAGS = @LIBLZ4_CFLAGS@ +LIBLZ4_LIBS = @LIBLZ4_LIBS@ +LIBM = @LIBM@ +LIBMONGOC_CFLAGS = @LIBMONGOC_CFLAGS@ +LIBMONGOC_LIBS = @LIBMONGOC_LIBS@ +LIBOBJS = @LIBOBJS@ +LIBRDKAFKA_CFLAGS = @LIBRDKAFKA_CFLAGS@ +LIBRDKAFKA_LIBS = @LIBRDKAFKA_LIBS@ +LIBS = @LIBS@ +LIBSYSTEMD_CFLAGS = @LIBSYSTEMD_CFLAGS@ +LIBSYSTEMD_JOURNAL_CFLAGS = @LIBSYSTEMD_JOURNAL_CFLAGS@ +LIBSYSTEMD_JOURNAL_LIBS = @LIBSYSTEMD_JOURNAL_LIBS@ +LIBSYSTEMD_LIBS = @LIBSYSTEMD_LIBS@ +LIBTOOL = @LIBTOOL@ +LIBUUID_CFLAGS = @LIBUUID_CFLAGS@ +LIBUUID_LIBS = @LIBUUID_LIBS@ 
+LIPO = @LIPO@ +LN_S = @LN_S@ +LTLIBOBJS = @LTLIBOBJS@ +LT_SYS_LIBRARY_PATH = @LT_SYS_LIBRARY_PATH@ +MAKEINFO = @MAKEINFO@ +MANIFEST_TOOL = @MANIFEST_TOOL@ +MKDIR_P = @MKDIR_P@ +MYSQL_CFLAGS = @MYSQL_CFLAGS@ +MYSQL_CONFIG = @MYSQL_CONFIG@ +MYSQL_LIBS = @MYSQL_LIBS@ +NM = @NM@ +NMEDIT = @NMEDIT@ +OBJDUMP = @OBJDUMP@ +OBJEXT = @OBJEXT@ +OPENSSL_CFLAGS = @OPENSSL_CFLAGS@ +OPENSSL_LIBS = @OPENSSL_LIBS@ +OTOOL = @OTOOL@ +OTOOL64 = @OTOOL64@ +PACKAGE = @PACKAGE@ +PACKAGE_BUGREPORT = @PACKAGE_BUGREPORT@ +PACKAGE_NAME = @PACKAGE_NAME@ +PACKAGE_STRING = @PACKAGE_STRING@ +PACKAGE_TARNAME = @PACKAGE_TARNAME@ +PACKAGE_URL = @PACKAGE_URL@ +PACKAGE_VERSION = @PACKAGE_VERSION@ +PATH_SEPARATOR = @PATH_SEPARATOR@ +PGSQL_CFLAGS = @PGSQL_CFLAGS@ +PGSQL_LIBS = @PGSQL_LIBS@ +PG_CONFIG = @PG_CONFIG@ +PID_FILE_PATH = @PID_FILE_PATH@ +PKG_CONFIG = @PKG_CONFIG@ +PKG_CONFIG_LIBDIR = @PKG_CONFIG_LIBDIR@ +PKG_CONFIG_PATH = @PKG_CONFIG_PATH@ +PROTON_CFLAGS = @PROTON_CFLAGS@ +PROTON_LIBS = @PROTON_LIBS@ +PROTON_PROACTOR_CFLAGS = @PROTON_PROACTOR_CFLAGS@ +PROTON_PROACTOR_LIBS = @PROTON_PROACTOR_LIBS@ +PTHREADS_CFLAGS = @PTHREADS_CFLAGS@ +PTHREADS_LIBS = @PTHREADS_LIBS@ +PYTHON = @PYTHON@ +PYTHON_EXEC_PREFIX = @PYTHON_EXEC_PREFIX@ +PYTHON_PLATFORM = @PYTHON_PLATFORM@ +PYTHON_PREFIX = @PYTHON_PREFIX@ +PYTHON_VERSION = @PYTHON_VERSION@ +RABBITMQ_CFLAGS = @RABBITMQ_CFLAGS@ +RABBITMQ_LIBS = @RABBITMQ_LIBS@ +RANLIB = @RANLIB@ +READLINK = @READLINK@ +REDIS = @REDIS@ +RELP_CFLAGS = @RELP_CFLAGS@ +RELP_LIBS = @RELP_LIBS@ +RSRT_CFLAGS = @RSRT_CFLAGS@ +RSRT_CFLAGS1 = @RSRT_CFLAGS1@ +RSRT_LIBS = @RSRT_LIBS@ +RSRT_LIBS1 = @RSRT_LIBS1@ +RST2MAN = @RST2MAN@ +RT_LIBS = @RT_LIBS@ +SED = @SED@ +SET_MAKE = @SET_MAKE@ +SHELL = @SHELL@ +SNMP_CFLAGS = @SNMP_CFLAGS@ +SNMP_LIBS = @SNMP_LIBS@ +SOL_LIBS = @SOL_LIBS@ +STRIP = @STRIP@ +TCL_BIN_DIR = @TCL_BIN_DIR@ +TCL_INCLUDE_SPEC = @TCL_INCLUDE_SPEC@ +TCL_LIB_FILE = @TCL_LIB_FILE@ +TCL_LIB_FLAG = @TCL_LIB_FLAG@ +TCL_LIB_SPEC = @TCL_LIB_SPEC@ +TCL_PATCH_LEVEL = 
@TCL_PATCH_LEVEL@ +TCL_SRC_DIR = @TCL_SRC_DIR@ +TCL_STUB_LIB_FILE = @TCL_STUB_LIB_FILE@ +TCL_STUB_LIB_FLAG = @TCL_STUB_LIB_FLAG@ +TCL_STUB_LIB_SPEC = @TCL_STUB_LIB_SPEC@ +TCL_VERSION = @TCL_VERSION@ +UDPSPOOF_CFLAGS = @UDPSPOOF_CFLAGS@ +UDPSPOOF_LIBS = @UDPSPOOF_LIBS@ +VALGRIND = @VALGRIND@ +VERSION = @VERSION@ +WARN_CFLAGS = @WARN_CFLAGS@ +WARN_LDFLAGS = @WARN_LDFLAGS@ +WARN_SCANNERFLAGS = @WARN_SCANNERFLAGS@ +WGET = @WGET@ +YACC = @YACC@ +YACC_FOUND = @YACC_FOUND@ +YFLAGS = @YFLAGS@ +ZLIB_CFLAGS = @ZLIB_CFLAGS@ +ZLIB_LIBS = @ZLIB_LIBS@ +ZSTD_CFLAGS = @ZSTD_CFLAGS@ +ZSTD_LIBS = @ZSTD_LIBS@ +abs_builddir = @abs_builddir@ +abs_srcdir = @abs_srcdir@ +abs_top_builddir = @abs_top_builddir@ +abs_top_srcdir = @abs_top_srcdir@ +ac_ct_AR = @ac_ct_AR@ +ac_ct_CC = @ac_ct_CC@ +ac_ct_DUMPBIN = @ac_ct_DUMPBIN@ +am__include = @am__include@ +am__leading_dot = @am__leading_dot@ +am__quote = @am__quote@ +am__tar = @am__tar@ +am__untar = @am__untar@ +bindir = @bindir@ +build = @build@ +build_alias = @build_alias@ +build_cpu = @build_cpu@ +build_os = @build_os@ +build_vendor = @build_vendor@ +builddir = @builddir@ +datadir = @datadir@ +datarootdir = @datarootdir@ +docdir = @docdir@ +dvidir = @dvidir@ +exec_prefix = @exec_prefix@ +host = @host@ +host_alias = @host_alias@ +host_cpu = @host_cpu@ +host_os = @host_os@ +host_vendor = @host_vendor@ +htmldir = @htmldir@ +includedir = @includedir@ +infodir = @infodir@ +install_sh = @install_sh@ +libdir = @libdir@ +libexecdir = @libexecdir@ +localedir = @localedir@ +localstatedir = @localstatedir@ +mandir = @mandir@ +mkdir_p = @mkdir_p@ +moddirs = @moddirs@ +oldincludedir = @oldincludedir@ +pdfdir = @pdfdir@ +pkgpyexecdir = @pkgpyexecdir@ +pkgpythondir = @pkgpythondir@ +prefix = @prefix@ +program_transform_name = @program_transform_name@ +psdir = @psdir@ +pyexecdir = @pyexecdir@ +pythondir = @pythondir@ +runstatedir = @runstatedir@ +sbindir = @sbindir@ +sharedstatedir = @sharedstatedir@ +srcdir = @srcdir@ +sysconfdir = @sysconfdir@ 
+target_alias = @target_alias@ +top_build_prefix = @top_build_prefix@ +top_builddir = @top_builddir@ +top_srcdir = @top_srcdir@ +pkglib_LTLIBRARIES = omelasticsearch.la +omelasticsearch_la_SOURCES = omelasticsearch.c +omelasticsearch_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(CURL_CFLAGS) +omelasticsearch_la_LDFLAGS = -module -avoid-version +omelasticsearch_la_LIBADD = $(CURL_LIBS) $(LIBM) +EXTRA_DIST = +all: all-am + +.SUFFIXES: +.SUFFIXES: .c .lo .o .obj +$(srcdir)/Makefile.in: $(srcdir)/Makefile.am $(am__configure_deps) + @for dep in $?; do \ + case '$(am__configure_deps)' in \ + *$$dep*) \ + ( cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh ) \ + && { if test -f $@; then exit 0; else break; fi; }; \ + exit 1;; \ + esac; \ + done; \ + echo ' cd $(top_srcdir) && $(AUTOMAKE) --gnu plugins/omelasticsearch/Makefile'; \ + $(am__cd) $(top_srcdir) && \ + $(AUTOMAKE) --gnu plugins/omelasticsearch/Makefile +Makefile: $(srcdir)/Makefile.in $(top_builddir)/config.status + @case '$?' 
in \ + *config.status*) \ + cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh;; \ + *) \ + echo ' cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__maybe_remake_depfiles)'; \ + cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__maybe_remake_depfiles);; \ + esac; + +$(top_builddir)/config.status: $(top_srcdir)/configure $(CONFIG_STATUS_DEPENDENCIES) + cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh + +$(top_srcdir)/configure: $(am__configure_deps) + cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh +$(ACLOCAL_M4): $(am__aclocal_m4_deps) + cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh +$(am__aclocal_m4_deps): + +install-pkglibLTLIBRARIES: $(pkglib_LTLIBRARIES) + @$(NORMAL_INSTALL) + @list='$(pkglib_LTLIBRARIES)'; test -n "$(pkglibdir)" || list=; \ + list2=; for p in $$list; do \ + if test -f $$p; then \ + list2="$$list2 $$p"; \ + else :; fi; \ + done; \ + test -z "$$list2" || { \ + echo " $(MKDIR_P) '$(DESTDIR)$(pkglibdir)'"; \ + $(MKDIR_P) "$(DESTDIR)$(pkglibdir)" || exit 1; \ + echo " $(LIBTOOL) $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=install $(INSTALL) $(INSTALL_STRIP_FLAG) $$list2 '$(DESTDIR)$(pkglibdir)'"; \ + $(LIBTOOL) $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=install $(INSTALL) $(INSTALL_STRIP_FLAG) $$list2 "$(DESTDIR)$(pkglibdir)"; \ + } + +uninstall-pkglibLTLIBRARIES: + @$(NORMAL_UNINSTALL) + @list='$(pkglib_LTLIBRARIES)'; test -n "$(pkglibdir)" || list=; \ + for p in $$list; do \ + $(am__strip_dir) \ + echo " $(LIBTOOL) $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=uninstall rm -f '$(DESTDIR)$(pkglibdir)/$$f'"; \ + $(LIBTOOL) $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=uninstall rm -f "$(DESTDIR)$(pkglibdir)/$$f"; \ + done + +clean-pkglibLTLIBRARIES: + -test -z "$(pkglib_LTLIBRARIES)" || rm -f $(pkglib_LTLIBRARIES) + @list='$(pkglib_LTLIBRARIES)'; \ + locs=`for p in $$list; do echo $$p; done | \ + sed 's|^[^/]*$$|.|; s|/[^/]*$$||; s|$$|/so_locations|' | \ + sort -u`; \ + test -z "$$locs" || { 
\ + echo rm -f $${locs}; \ + rm -f $${locs}; \ + } + +omelasticsearch.la: $(omelasticsearch_la_OBJECTS) $(omelasticsearch_la_DEPENDENCIES) $(EXTRA_omelasticsearch_la_DEPENDENCIES) + $(AM_V_CCLD)$(omelasticsearch_la_LINK) -rpath $(pkglibdir) $(omelasticsearch_la_OBJECTS) $(omelasticsearch_la_LIBADD) $(LIBS) + +mostlyclean-compile: + -rm -f *.$(OBJEXT) + +distclean-compile: + -rm -f *.tab.c + +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/omelasticsearch_la-omelasticsearch.Plo@am__quote@ # am--include-marker + +$(am__depfiles_remade): + @$(MKDIR_P) $(@D) + @echo '# dummy' >$@-t && $(am__mv) $@-t $@ + +am--depfiles: $(am__depfiles_remade) + +.c.o: +@am__fastdepCC_TRUE@ $(AM_V_CC)depbase=`echo $@ | sed 's|[^/]*$$|$(DEPDIR)/&|;s|\.o$$||'`;\ +@am__fastdepCC_TRUE@ $(COMPILE) -MT $@ -MD -MP -MF $$depbase.Tpo -c -o $@ $< &&\ +@am__fastdepCC_TRUE@ $(am__mv) $$depbase.Tpo $$depbase.Po +@AMDEP_TRUE@@am__fastdepCC_FALSE@ $(AM_V_CC)source='$<' object='$@' libtool=no @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCC_FALSE@ $(AM_V_CC@am__nodep@)$(COMPILE) -c -o $@ $< + +.c.obj: +@am__fastdepCC_TRUE@ $(AM_V_CC)depbase=`echo $@ | sed 's|[^/]*$$|$(DEPDIR)/&|;s|\.obj$$||'`;\ +@am__fastdepCC_TRUE@ $(COMPILE) -MT $@ -MD -MP -MF $$depbase.Tpo -c -o $@ `$(CYGPATH_W) '$<'` &&\ +@am__fastdepCC_TRUE@ $(am__mv) $$depbase.Tpo $$depbase.Po +@AMDEP_TRUE@@am__fastdepCC_FALSE@ $(AM_V_CC)source='$<' object='$@' libtool=no @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCC_FALSE@ $(AM_V_CC@am__nodep@)$(COMPILE) -c -o $@ `$(CYGPATH_W) '$<'` + +.c.lo: +@am__fastdepCC_TRUE@ $(AM_V_CC)depbase=`echo $@ | sed 's|[^/]*$$|$(DEPDIR)/&|;s|\.lo$$||'`;\ +@am__fastdepCC_TRUE@ $(LTCOMPILE) -MT $@ -MD -MP -MF $$depbase.Tpo -c -o $@ $< &&\ +@am__fastdepCC_TRUE@ $(am__mv) $$depbase.Tpo $$depbase.Plo +@AMDEP_TRUE@@am__fastdepCC_FALSE@ $(AM_V_CC)source='$<' 
object='$@' libtool=yes @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCC_FALSE@ $(AM_V_CC@am__nodep@)$(LTCOMPILE) -c -o $@ $< + +omelasticsearch_la-omelasticsearch.lo: omelasticsearch.c +@am__fastdepCC_TRUE@ $(AM_V_CC)$(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(omelasticsearch_la_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT omelasticsearch_la-omelasticsearch.lo -MD -MP -MF $(DEPDIR)/omelasticsearch_la-omelasticsearch.Tpo -c -o omelasticsearch_la-omelasticsearch.lo `test -f 'omelasticsearch.c' || echo '$(srcdir)/'`omelasticsearch.c +@am__fastdepCC_TRUE@ $(AM_V_at)$(am__mv) $(DEPDIR)/omelasticsearch_la-omelasticsearch.Tpo $(DEPDIR)/omelasticsearch_la-omelasticsearch.Plo +@AMDEP_TRUE@@am__fastdepCC_FALSE@ $(AM_V_CC)source='omelasticsearch.c' object='omelasticsearch_la-omelasticsearch.lo' libtool=yes @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCC_FALSE@ $(AM_V_CC@am__nodep@)$(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(omelasticsearch_la_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o omelasticsearch_la-omelasticsearch.lo `test -f 'omelasticsearch.c' || echo '$(srcdir)/'`omelasticsearch.c + +mostlyclean-libtool: + -rm -f *.lo + +clean-libtool: + -rm -rf .libs _libs + +ID: $(am__tagged_files) + $(am__define_uniq_tagged_files); mkid -fID $$unique +tags: tags-am +TAGS: tags + +tags-am: $(TAGS_DEPENDENCIES) $(am__tagged_files) + set x; \ + here=`pwd`; \ + $(am__define_uniq_tagged_files); \ + shift; \ + if test -z "$(ETAGS_ARGS)$$*$$unique"; then :; else \ + test -n "$$unique" || unique=$$empty_fix; \ + if test $$# -gt 0; then \ + $(ETAGS) $(ETAGSFLAGS) $(AM_ETAGSFLAGS) $(ETAGS_ARGS) \ + "$$@" $$unique; \ + else \ + $(ETAGS) $(ETAGSFLAGS) 
$(AM_ETAGSFLAGS) $(ETAGS_ARGS) \ + $$unique; \ + fi; \ + fi +ctags: ctags-am + +CTAGS: ctags +ctags-am: $(TAGS_DEPENDENCIES) $(am__tagged_files) + $(am__define_uniq_tagged_files); \ + test -z "$(CTAGS_ARGS)$$unique" \ + || $(CTAGS) $(CTAGSFLAGS) $(AM_CTAGSFLAGS) $(CTAGS_ARGS) \ + $$unique + +GTAGS: + here=`$(am__cd) $(top_builddir) && pwd` \ + && $(am__cd) $(top_srcdir) \ + && gtags -i $(GTAGS_ARGS) "$$here" +cscopelist: cscopelist-am + +cscopelist-am: $(am__tagged_files) + list='$(am__tagged_files)'; \ + case "$(srcdir)" in \ + [\\/]* | ?:[\\/]*) sdir="$(srcdir)" ;; \ + *) sdir=$(subdir)/$(srcdir) ;; \ + esac; \ + for i in $$list; do \ + if test -f "$$i"; then \ + echo "$(subdir)/$$i"; \ + else \ + echo "$$sdir/$$i"; \ + fi; \ + done >> $(top_builddir)/cscope.files + +distclean-tags: + -rm -f TAGS ID GTAGS GRTAGS GSYMS GPATH tags + +distdir: $(BUILT_SOURCES) + $(MAKE) $(AM_MAKEFLAGS) distdir-am + +distdir-am: $(DISTFILES) + @srcdirstrip=`echo "$(srcdir)" | sed 's/[].[^$$\\*]/\\\\&/g'`; \ + topsrcdirstrip=`echo "$(top_srcdir)" | sed 's/[].[^$$\\*]/\\\\&/g'`; \ + list='$(DISTFILES)'; \ + dist_files=`for file in $$list; do echo $$file; done | \ + sed -e "s|^$$srcdirstrip/||;t" \ + -e "s|^$$topsrcdirstrip/|$(top_builddir)/|;t"`; \ + case $$dist_files in \ + */*) $(MKDIR_P) `echo "$$dist_files" | \ + sed '/\//!d;s|^|$(distdir)/|;s,/[^/]*$$,,' | \ + sort -u` ;; \ + esac; \ + for file in $$dist_files; do \ + if test -f $$file || test -d $$file; then d=.; else d=$(srcdir); fi; \ + if test -d $$d/$$file; then \ + dir=`echo "/$$file" | sed -e 's,/[^/]*$$,,'`; \ + if test -d "$(distdir)/$$file"; then \ + find "$(distdir)/$$file" -type d ! -perm -700 -exec chmod u+rwx {} \;; \ + fi; \ + if test -d $(srcdir)/$$file && test $$d != $(srcdir); then \ + cp -fpR $(srcdir)/$$file "$(distdir)$$dir" || exit 1; \ + find "$(distdir)/$$file" -type d ! 
-perm -700 -exec chmod u+rwx {} \;; \ + fi; \ + cp -fpR $$d/$$file "$(distdir)$$dir" || exit 1; \ + else \ + test -f "$(distdir)/$$file" \ + || cp -p $$d/$$file "$(distdir)/$$file" \ + || exit 1; \ + fi; \ + done +check-am: all-am +check: check-am +all-am: Makefile $(LTLIBRARIES) +installdirs: + for dir in "$(DESTDIR)$(pkglibdir)"; do \ + test -z "$$dir" || $(MKDIR_P) "$$dir"; \ + done +install: install-am +install-exec: install-exec-am +install-data: install-data-am +uninstall: uninstall-am + +install-am: all-am + @$(MAKE) $(AM_MAKEFLAGS) install-exec-am install-data-am + +installcheck: installcheck-am +install-strip: + if test -z '$(STRIP)'; then \ + $(MAKE) $(AM_MAKEFLAGS) INSTALL_PROGRAM="$(INSTALL_STRIP_PROGRAM)" \ + install_sh_PROGRAM="$(INSTALL_STRIP_PROGRAM)" INSTALL_STRIP_FLAG=-s \ + install; \ + else \ + $(MAKE) $(AM_MAKEFLAGS) INSTALL_PROGRAM="$(INSTALL_STRIP_PROGRAM)" \ + install_sh_PROGRAM="$(INSTALL_STRIP_PROGRAM)" INSTALL_STRIP_FLAG=-s \ + "INSTALL_PROGRAM_ENV=STRIPPROG='$(STRIP)'" install; \ + fi +mostlyclean-generic: + +clean-generic: + +distclean-generic: + -test -z "$(CONFIG_CLEAN_FILES)" || rm -f $(CONFIG_CLEAN_FILES) + -test . = "$(srcdir)" || test -z "$(CONFIG_CLEAN_VPATH_FILES)" || rm -f $(CONFIG_CLEAN_VPATH_FILES) + +maintainer-clean-generic: + @echo "This command is intended for maintainers to use" + @echo "it deletes files that may require special tools to rebuild." 
+clean: clean-am + +clean-am: clean-generic clean-libtool clean-pkglibLTLIBRARIES \ + mostlyclean-am + +distclean: distclean-am + -rm -f ./$(DEPDIR)/omelasticsearch_la-omelasticsearch.Plo + -rm -f Makefile +distclean-am: clean-am distclean-compile distclean-generic \ + distclean-tags + +dvi: dvi-am + +dvi-am: + +html: html-am + +html-am: + +info: info-am + +info-am: + +install-data-am: + +install-dvi: install-dvi-am + +install-dvi-am: + +install-exec-am: install-pkglibLTLIBRARIES + +install-html: install-html-am + +install-html-am: + +install-info: install-info-am + +install-info-am: + +install-man: + +install-pdf: install-pdf-am + +install-pdf-am: + +install-ps: install-ps-am + +install-ps-am: + +installcheck-am: + +maintainer-clean: maintainer-clean-am + -rm -f ./$(DEPDIR)/omelasticsearch_la-omelasticsearch.Plo + -rm -f Makefile +maintainer-clean-am: distclean-am maintainer-clean-generic + +mostlyclean: mostlyclean-am + +mostlyclean-am: mostlyclean-compile mostlyclean-generic \ + mostlyclean-libtool + +pdf: pdf-am + +pdf-am: + +ps: ps-am + +ps-am: + +uninstall-am: uninstall-pkglibLTLIBRARIES + +.MAKE: install-am install-strip + +.PHONY: CTAGS GTAGS TAGS all all-am am--depfiles check check-am clean \ + clean-generic clean-libtool clean-pkglibLTLIBRARIES \ + cscopelist-am ctags ctags-am distclean distclean-compile \ + distclean-generic distclean-libtool distclean-tags distdir dvi \ + dvi-am html html-am info info-am install install-am \ + install-data install-data-am install-dvi install-dvi-am \ + install-exec install-exec-am install-html install-html-am \ + install-info install-info-am install-man install-pdf \ + install-pdf-am install-pkglibLTLIBRARIES install-ps \ + install-ps-am install-strip installcheck installcheck-am \ + installdirs maintainer-clean maintainer-clean-generic \ + mostlyclean mostlyclean-compile mostlyclean-generic \ + mostlyclean-libtool pdf pdf-am ps ps-am tags tags-am uninstall \ + uninstall-am uninstall-pkglibLTLIBRARIES + +.PRECIOUS: 
Makefile + + +# Tell versions [3.59,3.63) of GNU make to not export all variables. +# Otherwise a system limit (for SysV at least) may be exceeded. +.NOEXPORT: diff --git a/plugins/omelasticsearch/README b/plugins/omelasticsearch/README new file mode 100644 index 0000000..b8bf415 --- /dev/null +++ b/plugins/omelasticsearch/README @@ -0,0 +1,21 @@ +How to access ElasticSearch on local machine (for testing): +=========================================================== +see: https://github.com/mobz/elasticsearch-head + +How to produce an error: +======================== +It's quite easy to get 400, if you put a wrong mapping to your +index. That would be easy to reproduce in "normal" omelasticsearch usage +conditions, by only altering the ES configuration: + +1. Make your index first. Let's call it "testindex": +$ curl -XPUT localhost:9200/testindex/ + +2. Put your mapping for a search type called "mytype", where you specify +that date property should be an integer: +$ curl -XPUT localhost:9200/testindex/mytype/_mapping -d '{"mytype":{"properties": {"timegenerated":{"type":"integer"}}}}' + +3. Now try to insert something where date is not an integer: +$ curl -XPOST localhost:9200/testindex/mytype/ -d '{"timegenerated":"bla"}' +{"error":"MapperParsingException[Failed to parse [date]]; nested: NumberFormatException[For input string: \"bla\"]; ","status":400} + diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c new file mode 100644 index 0000000..b297a92 --- /dev/null +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -0,0 +1,2319 @@ +/* omelasticsearch.c + * This is the http://www.elasticsearch.org/ output module. + * + * NOTE: read comments in module-template.h for more specifics! + * + * Copyright 2011 Nathan Scott. + * Copyright 2009-2022 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of rsyslog. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * -or- + * see COPYING.ASL20 in the source distribution + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "config.h" +#include "rsyslog.h" +#include <stdio.h> +#include <stdarg.h> +#include <stdlib.h> +#include <memory.h> +#include <string.h> +#include <curl/curl.h> +#include <curl/easy.h> +#include <assert.h> +#include <signal.h> +#include <errno.h> +#include <time.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#if defined(__FreeBSD__) +#include <unistd.h> +#endif +#include <json.h> +#include "conf.h" +#include "syslogd-types.h" +#include "srUtils.h" +#include "template.h" +#include "module-template.h" +#include "errmsg.h" +#include "statsobj.h" +#include "cfsysline.h" +#include "unicode-helper.h" +#include "obj-types.h" +#include "ratelimit.h" +#include "ruleset.h" + +#ifndef O_LARGEFILE +# define O_LARGEFILE 0 +#endif + +MODULE_TYPE_OUTPUT +MODULE_TYPE_NOKEEP +MODULE_CNFNAME("omelasticsearch") + +/* internal structures */ +DEF_OMOD_STATIC_DATA +DEFobjCurrIf(statsobj) +DEFobjCurrIf(prop) +DEFobjCurrIf(ruleset) + +statsobj_t *indexStats; +STATSCOUNTER_DEF(indexSubmit, mutIndexSubmit) +STATSCOUNTER_DEF(indexHTTPFail, mutIndexHTTPFail) +STATSCOUNTER_DEF(indexHTTPReqFail, mutIndexHTTPReqFail) +STATSCOUNTER_DEF(checkConnFail, mutCheckConnFail) +STATSCOUNTER_DEF(indexESFail, mutIndexESFail) +STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess) +STATSCOUNTER_DEF(indexBadResponse, mutIndexBadResponse) 
+STATSCOUNTER_DEF(indexDuplicate, mutIndexDuplicate) +STATSCOUNTER_DEF(indexBadArgument, mutIndexBadArgument) +STATSCOUNTER_DEF(indexBulkRejection, mutIndexBulkRejection) +STATSCOUNTER_DEF(indexOtherResponse, mutIndexOtherResponse) +STATSCOUNTER_DEF(rebinds, mutRebinds) + +static prop_t *pInputName = NULL; + +# define META_STRT "{\"index\":{\"_index\": \"" +# define META_STRT_CREATE "{\"create\":{" /* \"_index\": \" */ +# define META_IX "\"_index\": \"" +# define META_TYPE "\",\"_type\":\"" +# define META_PIPELINE "\",\"pipeline\":\"" +# define META_PARENT "\",\"_parent\":\"" +# define META_ID "\", \"_id\":\"" +# define META_END "\"}}\n" +# define META_END_NOQUOTE " }}\n" + +typedef enum { + ES_WRITE_INDEX, + ES_WRITE_CREATE, + ES_WRITE_UPDATE, /* not supported */ + ES_WRITE_UPSERT /* not supported */ +} es_write_ops_t; + +#define WRKR_DATA_TYPE_ES 0xBADF0001 + +#define DEFAULT_REBIND_INTERVAL -1 + +/* REST API for elasticsearch hits this URL: + * http://<hostName>:<restPort>/<searchIndex>/<searchType> + */ +/* bulk API uses /_bulk */ +typedef struct curl_slist HEADER; +typedef struct instanceConf_s { + int defaultPort; + int fdErrFile; /* error file fd or -1 if not open */ + pthread_mutex_t mutErrFile; + uchar **serverBaseUrls; + int numServers; + long healthCheckTimeout; + long indexTimeout; + uchar *uid; + uchar *pwd; + uchar *authBuf; + uchar *searchIndex; + uchar *searchType; + uchar *pipelineName; + sbool skipPipelineIfEmpty; + uchar *parent; + uchar *tplName; + uchar *timeout; + uchar *bulkId; + uchar *errorFile; + int esVersion; + sbool errorOnly; + sbool interleaved; + sbool dynSrchIdx; + sbool dynSrchType; + sbool dynParent; + sbool dynBulkId; + sbool dynPipelineName; + sbool bulkmode; + size_t maxbytes; + sbool useHttps; + sbool allowUnsignedCerts; + sbool skipVerifyHost; + uchar *caCertFile; + uchar *myCertFile; + uchar *myPrivKeyFile; + es_write_ops_t writeOperation; + sbool retryFailures; + unsigned int ratelimitInterval; + unsigned int 
ratelimitBurst; + /* for retries */ + ratelimit_t *ratelimiter; + uchar *retryRulesetName; + ruleset_t *retryRuleset; + int rebindInterval; + struct instanceConf_s *next; +} instanceData; + +struct modConfData_s { + rsconf_t *pConf; /* our overall config object */ + instanceConf_t *root, *tail; +}; +static modConfData_t *loadModConf = NULL; /* modConf ptr to use for the current load process */ + +typedef struct wrkrInstanceData { + PTR_ASSERT_DEF + instanceData *pData; + int serverIndex; + int replyLen; + size_t replyBufLen; + char *reply; + CURL *curlCheckConnHandle; /* libcurl session handle for checking the server connection */ + CURL *curlPostHandle; /* libcurl session handle for posting data to the server */ + HEADER *curlHeader; /* json POST request info */ + uchar *restURL; /* last used URL for error reporting */ + struct { + es_str_t *data; + int nmemb; /* number of messages in batch (for statistics counting) */ + uchar *currTpl1; + uchar *currTpl2; + } batch; + int nOperations; /* counter used with rebindInterval */ +} wrkrInstanceData_t; + +/* tables for interfacing with the v6 config system */ +/* action (instance) parameters */ +static struct cnfparamdescr actpdescr[] = { + { "server", eCmdHdlrArray, 0 }, + { "serverport", eCmdHdlrInt, 0 }, + { "healthchecktimeout", eCmdHdlrInt, 0 }, + { "indextimeout", eCmdHdlrInt, 0 }, + { "uid", eCmdHdlrGetWord, 0 }, + { "pwd", eCmdHdlrGetWord, 0 }, + { "searchindex", eCmdHdlrGetWord, 0 }, + { "searchtype", eCmdHdlrGetWord, 0 }, + { "pipelinename", eCmdHdlrGetWord, 0 }, + { "skippipelineifempty", eCmdHdlrBinary, 0 }, + { "parent", eCmdHdlrGetWord, 0 }, + { "dynsearchindex", eCmdHdlrBinary, 0 }, + { "dynsearchtype", eCmdHdlrBinary, 0 }, + { "dynparent", eCmdHdlrBinary, 0 }, + { "bulkmode", eCmdHdlrBinary, 0 }, + { "maxbytes", eCmdHdlrSize, 0 }, + { "asyncrepl", eCmdHdlrGoneAway, 0 }, + { "usehttps", eCmdHdlrBinary, 0 }, + { "timeout", eCmdHdlrGetWord, 0 }, + { "errorfile", eCmdHdlrGetWord, 0 }, + { "erroronly", 
eCmdHdlrBinary, 0 }, + { "interleaved", eCmdHdlrBinary, 0 }, + { "template", eCmdHdlrGetWord, 0 }, + { "dynbulkid", eCmdHdlrBinary, 0 }, + { "dynpipelinename", eCmdHdlrBinary, 0 }, + { "bulkid", eCmdHdlrGetWord, 0 }, + { "allowunsignedcerts", eCmdHdlrBinary, 0 }, + { "skipverifyhost", eCmdHdlrBinary, 0 }, + { "tls.cacert", eCmdHdlrString, 0 }, + { "tls.mycert", eCmdHdlrString, 0 }, + { "tls.myprivkey", eCmdHdlrString, 0 }, + { "writeoperation", eCmdHdlrGetWord, 0 }, + { "retryfailures", eCmdHdlrBinary, 0 }, + { "ratelimit.interval", eCmdHdlrInt, 0 }, + { "ratelimit.burst", eCmdHdlrInt, 0 }, + { "retryruleset", eCmdHdlrString, 0 }, + { "rebindinterval", eCmdHdlrInt, 0 }, + { "esversion.major", eCmdHdlrPositiveInt, 0 } +}; +static struct cnfparamblk actpblk = + { CNFPARAMBLK_VERSION, + sizeof(actpdescr)/sizeof(struct cnfparamdescr), + actpdescr + }; + +static rsRetVal curlSetup(wrkrInstanceData_t *pWrkrData); + +BEGINcreateInstance +CODESTARTcreateInstance + int r; + pData->fdErrFile = -1; + if((r = pthread_mutex_init(&pData->mutErrFile, NULL)) != 0) { + LogError(r, RS_RET_ERR, "omelasticsearch: cannot create " + "error file mutex, failing this action"); + ABORT_FINALIZE(RS_RET_ERR); + } + pData->caCertFile = NULL; + pData->myCertFile = NULL; + pData->myPrivKeyFile = NULL; + pData->ratelimiter = NULL; + pData->retryRulesetName = NULL; + pData->retryRuleset = NULL; + pData->rebindInterval = DEFAULT_REBIND_INTERVAL; + pData->esVersion = 0; +finalize_it: +ENDcreateInstance + +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance + PTR_ASSERT_SET_TYPE(pWrkrData, WRKR_DATA_TYPE_ES); + pWrkrData->curlHeader = NULL; + pWrkrData->curlPostHandle = NULL; + pWrkrData->curlCheckConnHandle = NULL; + pWrkrData->serverIndex = 0; + pWrkrData->restURL = NULL; + if(pData->bulkmode) { + pWrkrData->batch.currTpl1 = NULL; + pWrkrData->batch.currTpl2 = NULL; + if((pWrkrData->batch.data = es_newStr(1024)) == NULL) { + LogError(0, RS_RET_OUT_OF_MEMORY, + "omelasticsearch: error creating 
batch string " + "turned off bulk mode\n"); + pData->bulkmode = 0; /* at least it works */ + } + } + pWrkrData->nOperations = 0; + pWrkrData->reply = NULL; + pWrkrData->replyLen = 0; + pWrkrData->replyBufLen = 0; + iRet = curlSetup(pWrkrData); +ENDcreateWrkrInstance + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURERepeatedMsgReduction) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + +BEGINfreeInstance + int i; +CODESTARTfreeInstance + if(pData->fdErrFile != -1) + close(pData->fdErrFile); + + if(loadModConf != NULL) { + /* we keep our instances in our own internal list - this also + * means we need to cleanup that list, else we cause grief. + */ + instanceData *prev = NULL; + for(instanceData *inst = loadModConf->root ; inst != NULL ; inst = inst->next) { + if(inst == pData) { + if(loadModConf->tail == inst) { + loadModConf->tail = prev; + } + prev->next = inst->next; + /* no need to correct inst back to prev - we exit now! */ + break; + } else { + prev = inst; + } + } + } + + pthread_mutex_destroy(&pData->mutErrFile); + for(i = 0 ; i < pData->numServers ; ++i) + free(pData->serverBaseUrls[i]); + free(pData->serverBaseUrls); + free(pData->uid); + free(pData->pwd); + free(pData->authBuf); + free(pData->searchIndex); + free(pData->searchType); + free(pData->pipelineName); + free(pData->parent); + free(pData->tplName); + free(pData->timeout); + free(pData->errorFile); + free(pData->bulkId); + free(pData->caCertFile); + free(pData->myCertFile); + free(pData->myPrivKeyFile); + free(pData->retryRulesetName); + if (pData->ratelimiter != NULL) + ratelimitDestruct(pData->ratelimiter); +ENDfreeInstance + +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance + if(pWrkrData->curlHeader != NULL) { + curl_slist_free_all(pWrkrData->curlHeader); + pWrkrData->curlHeader = NULL; + } + if(pWrkrData->curlCheckConnHandle != NULL) { + curl_easy_cleanup(pWrkrData->curlCheckConnHandle); + pWrkrData->curlCheckConnHandle = NULL; + } + 
if(pWrkrData->curlPostHandle != NULL) { + curl_easy_cleanup(pWrkrData->curlPostHandle); + pWrkrData->curlPostHandle = NULL; + } + if (pWrkrData->restURL != NULL) { + free(pWrkrData->restURL); + pWrkrData->restURL = NULL; + } + es_deleteStr(pWrkrData->batch.data); + free(pWrkrData->reply); +ENDfreeWrkrInstance + +BEGINdbgPrintInstInfo + int i; +CODESTARTdbgPrintInstInfo + dbgprintf("omelasticsearch\n"); + dbgprintf("\ttemplate='%s'\n", pData->tplName); + dbgprintf("\tnumServers=%d\n", pData->numServers); + dbgprintf("\thealthCheckTimeout=%lu\n", pData->healthCheckTimeout); + dbgprintf("\tindexTimeout=%lu\n", pData->indexTimeout); + dbgprintf("\tserverBaseUrls="); + for(i = 0 ; i < pData->numServers ; ++i) + dbgprintf("%c'%s'", i == 0 ? '[' : ' ', pData->serverBaseUrls[i]); + dbgprintf("]\n"); + dbgprintf("\tdefaultPort=%d\n", pData->defaultPort); + dbgprintf("\tuid='%s'\n", pData->uid == NULL ? (uchar*)"(not configured)" : pData->uid); + dbgprintf("\tpwd=(%sconfigured)\n", pData->pwd == NULL ? "not " : ""); + dbgprintf("\tsearch index='%s'\n", pData->searchIndex == NULL + ? (uchar*)"(not configured)" : pData->searchIndex); + dbgprintf("\tsearch type='%s'\n", pData->searchType == NULL + ? 
(uchar*)"(not configured)" : pData->searchType); + dbgprintf("\tpipeline name='%s'\n", pData->pipelineName); + dbgprintf("\tdynamic pipeline name=%d\n", pData->dynPipelineName); + dbgprintf("\tskipPipelineIfEmpty=%d\n", pData->skipPipelineIfEmpty); + dbgprintf("\tparent='%s'\n", pData->parent); + dbgprintf("\ttimeout='%s'\n", pData->timeout); + dbgprintf("\tdynamic search index=%d\n", pData->dynSrchIdx); + dbgprintf("\tdynamic search type=%d\n", pData->dynSrchType); + dbgprintf("\tdynamic parent=%d\n", pData->dynParent); + dbgprintf("\tuse https=%d\n", pData->useHttps); + dbgprintf("\tbulkmode=%d\n", pData->bulkmode); + dbgprintf("\tmaxbytes=%zu\n", pData->maxbytes); + dbgprintf("\tallowUnsignedCerts=%d\n", pData->allowUnsignedCerts); + dbgprintf("\tskipVerifyHost=%d\n", pData->skipVerifyHost); + dbgprintf("\terrorfile='%s'\n", pData->errorFile == NULL ? + (uchar*)"(not configured)" : pData->errorFile); + dbgprintf("\terroronly=%d\n", pData->errorOnly); + dbgprintf("\tinterleaved=%d\n", pData->interleaved); + dbgprintf("\tdynbulkid=%d\n", pData->dynBulkId); + dbgprintf("\tbulkid='%s'\n", pData->bulkId); + dbgprintf("\ttls.cacert='%s'\n", pData->caCertFile); + dbgprintf("\ttls.mycert='%s'\n", pData->myCertFile); + dbgprintf("\ttls.myprivkey='%s'\n", pData->myPrivKeyFile); + dbgprintf("\twriteoperation='%d'\n", pData->writeOperation); + dbgprintf("\tretryfailures='%d'\n", pData->retryFailures); + dbgprintf("\tratelimit.interval='%u'\n", pData->ratelimitInterval); + dbgprintf("\tratelimit.burst='%u'\n", pData->ratelimitBurst); + dbgprintf("\trebindinterval='%d'\n", pData->rebindInterval); +ENDdbgPrintInstInfo + + +/* elasticsearch POST result string ... 
useful for debugging */ +static size_t +curlResult(void *const ptr, const size_t size, const size_t nmemb, void *const userdata) +{ + const char *const p = (const char *)ptr; + wrkrInstanceData_t *const pWrkrData = (wrkrInstanceData_t*) userdata; + char *buf; + const size_t size_add = size*nmemb; + size_t newlen; + PTR_ASSERT_CHK(pWrkrData, WRKR_DATA_TYPE_ES); + newlen = pWrkrData->replyLen + size_add; + if(newlen + 1 > pWrkrData->replyBufLen) { + if((buf = realloc(pWrkrData->reply, pWrkrData->replyBufLen + size_add + 1)) == NULL) { + LogError(errno, RS_RET_ERR, "omelasticsearch: realloc failed in curlResult"); + return 0; /* abort due to failure */ + } + pWrkrData->replyBufLen += size_add + 1; + pWrkrData->reply = buf; + } + memcpy(pWrkrData->reply+pWrkrData->replyLen, p, size_add); + pWrkrData->replyLen = newlen; + return size_add; +} + +/* Build basic URL part, which includes hostname and port as follows: + * http://hostname:port/ based on a server param + * Newly creates a cstr for this purpose. + * Note: serverParam MUST NOT end in '/' (caller must strip if it exists) + */ +static rsRetVal +computeBaseUrl(const char*const serverParam, + const int defaultPort, + const sbool useHttps, + uchar **baseUrl) +{ +# define SCHEME_HTTPS "https://" +# define SCHEME_HTTP "http://" + + char portBuf[64]; + int r = 0; + const char *host = serverParam; + DEFiRet; + + assert(serverParam[strlen(serverParam)-1] != '/'); + + es_str_t *urlBuf = es_newStr(256); + if (urlBuf == NULL) { + LogError(0, RS_RET_OUT_OF_MEMORY, + "omelasticsearch: failed to allocate es_str urlBuf in computeBaseUrl"); + ABORT_FINALIZE(RS_RET_ERR); + } + + /* Find where the hostname/ip of the server starts. If the scheme is not specified + in the uri, start the buffer with a scheme corresponding to the useHttps parameter. 
+ */ + if (strcasestr(serverParam, SCHEME_HTTP)) + host = serverParam + strlen(SCHEME_HTTP); + else if (strcasestr(serverParam, SCHEME_HTTPS)) + host = serverParam + strlen(SCHEME_HTTPS); + else + r = useHttps ? es_addBuf(&urlBuf, SCHEME_HTTPS, sizeof(SCHEME_HTTPS)-1) : + es_addBuf(&urlBuf, SCHEME_HTTP, sizeof(SCHEME_HTTP)-1); + + if (r == 0) r = es_addBuf(&urlBuf, (char *)serverParam, strlen(serverParam)); + if (r == 0 && !strchr(host, ':')) { + snprintf(portBuf, sizeof(portBuf), ":%d", defaultPort); + r = es_addBuf(&urlBuf, portBuf, strlen(portBuf)); + } + if (r == 0) r = es_addChar(&urlBuf, '/'); + if (r == 0) *baseUrl = (uchar*) es_str2cstr(urlBuf, NULL); + + if (r != 0 || baseUrl == NULL) { + LogError(0, RS_RET_ERR, + "omelasticsearch: error occurred computing baseUrl from server %s", serverParam); + ABORT_FINALIZE(RS_RET_ERR); + } +finalize_it: + if (urlBuf) { + es_deleteStr(urlBuf); + } + RETiRet; +} + +static inline void +incrementServerIndex(wrkrInstanceData_t *pWrkrData) +{ + pWrkrData->serverIndex = (pWrkrData->serverIndex + 1) % pWrkrData->pData->numServers; +} + + +/* checks if connection to ES can be established; also iterates over + * potential servers to support high availability (HA) feature. If it + * needs to switch server, will record new one in curl handle. 
+ */ +static rsRetVal ATTR_NONNULL() +checkConn(wrkrInstanceData_t *const pWrkrData) +{ +# define HEALTH_URI "_cat/health" + CURL *curl; + CURLcode res; + es_str_t *urlBuf; + char* healthUrl = NULL; + char* serverUrl; + int i; + int r; + DEFiRet; + + pWrkrData->replyLen = 0; + curl = pWrkrData->curlCheckConnHandle; + urlBuf = es_newStr(256); + if (urlBuf == NULL) { + LogError(0, RS_RET_OUT_OF_MEMORY, + "omelasticsearch: unable to allocate buffer for health check uri."); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + + for(i = 0; i < pWrkrData->pData->numServers; ++i) { + serverUrl = (char*) pWrkrData->pData->serverBaseUrls[pWrkrData->serverIndex]; + + es_emptyStr(urlBuf); + r = es_addBuf(&urlBuf, serverUrl, strlen(serverUrl)); + if(r == 0) r = es_addBuf(&urlBuf, HEALTH_URI, sizeof(HEALTH_URI)-1); + if(r == 0) healthUrl = es_str2cstr(urlBuf, NULL); + if(r != 0 || healthUrl == NULL) { + LogError(0, RS_RET_OUT_OF_MEMORY, + "omelasticsearch: unable to allocate buffer for health check uri."); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + + curl_easy_setopt(curl, CURLOPT_URL, healthUrl); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curlResult); + res = curl_easy_perform(curl); + free(healthUrl); + + if (res == CURLE_OK) { + DBGPRINTF("omelasticsearch: checkConn %s completed with success " + "on attempt %d\n", serverUrl, i); + ABORT_FINALIZE(RS_RET_OK); + } + + DBGPRINTF("omelasticsearch: checkConn %s failed on attempt %d: %s\n", + serverUrl, i, curl_easy_strerror(res)); + STATSCOUNTER_INC(checkConnFail, mutCheckConnFail); + incrementServerIndex(pWrkrData); + } + + LogMsg(0, RS_RET_SUSPENDED, LOG_WARNING, + "omelasticsearch: checkConn failed after %d attempts.", i); + ABORT_FINALIZE(RS_RET_SUSPENDED); + +finalize_it: + if(urlBuf != NULL) + es_deleteStr(urlBuf); + RETiRet; +} + + +BEGINtryResume +CODESTARTtryResume + DBGPRINTF("omelasticsearch: tryResume called\n"); + iRet = checkConn(pWrkrData); +ENDtryResume + + +/* get the current index and type for this message */ +static 
void ATTR_NONNULL(1) +getIndexTypeAndParent(const instanceData *const pData, uchar **const tpls, + uchar **const srchIndex, uchar **const srchType, uchar **const parent, + uchar **const bulkId, uchar **const pipelineName) +{ + *srchIndex = pData->searchIndex; + *parent = pData->parent; + *srchType = pData->searchType; + *bulkId = pData->bulkId; + *pipelineName = pData->pipelineName; + if(tpls == NULL) { + goto done; + } + + int iNumTpls = 1; + if(pData->dynSrchIdx) { + *srchIndex = tpls[iNumTpls]; + ++iNumTpls; + } + if(pData->dynSrchType) { + *srchType = tpls[iNumTpls]; + ++iNumTpls; + } + if(pData->dynParent) { + *parent = tpls[iNumTpls]; + ++iNumTpls; + } + if(pData->dynBulkId) { + *bulkId = tpls[iNumTpls]; + ++iNumTpls; + } + if(pData->dynPipelineName) { + *pipelineName = tpls[iNumTpls]; + ++iNumTpls; + } + +done: + return; +} + + +static rsRetVal ATTR_NONNULL(1) +setPostURL(wrkrInstanceData_t *const pWrkrData, uchar **const tpls) +{ + uchar *searchIndex = NULL; + uchar *searchType; + uchar *pipelineName; + uchar *parent; + uchar *bulkId; + char* baseUrl; + /* since 7.0, the API always requires /idx/_doc, so use that if searchType is not explicitly set */ + uchar* actualSearchType = (uchar*)"_doc"; + es_str_t *url; + int r = 0; + DEFiRet; + instanceData *const pData = pWrkrData->pData; + char separator; + const int bulkmode = pData->bulkmode; + + baseUrl = (char*)pData->serverBaseUrls[pWrkrData->serverIndex]; + url = es_newStrFromCStr(baseUrl, strlen(baseUrl)); + if (url == NULL) { + LogError(0, RS_RET_OUT_OF_MEMORY, + "omelasticsearch: error allocating new estr for POST url."); + ABORT_FINALIZE(RS_RET_ERR); + } + + separator = '?'; + + if(bulkmode) { + r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1); + parent = NULL; + } else { + getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent, &bulkId, &pipelineName); + if(searchIndex != NULL) { + r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex)); + if(searchType != NULL && searchType[0] != 
'\0') { + actualSearchType = searchType; + } + if(r == 0) r = es_addChar(&url, '/'); + if(r == 0) r = es_addBuf(&url, (char*)actualSearchType, ustrlen(actualSearchType)); + } + if(pipelineName != NULL && (!pData->skipPipelineIfEmpty || pipelineName[0] != '\0')) { + if(r == 0) r = es_addChar(&url, separator); + if(r == 0) r = es_addBuf(&url, "pipeline=", sizeof("pipeline=")-1); + if(r == 0) r = es_addBuf(&url, (char*)pipelineName, ustrlen(pipelineName)); + separator = '&'; + } + } + + if(pData->timeout != NULL) { + if(r == 0) r = es_addChar(&url, separator); + if(r == 0) r = es_addBuf(&url, "timeout=", sizeof("timeout=")-1); + if(r == 0) r = es_addBuf(&url, (char*)pData->timeout, ustrlen(pData->timeout)); + separator = '&'; + } + + if(parent != NULL) { + if(r == 0) r = es_addChar(&url, separator); + if(r == 0) r = es_addBuf(&url, "parent=", sizeof("parent=")-1); + if(r == 0) es_addBuf(&url, (char*)parent, ustrlen(parent)); + } + + if(pWrkrData->restURL != NULL) + free(pWrkrData->restURL); + + pWrkrData->restURL = (uchar*)es_str2cstr(url, NULL); + curl_easy_setopt(pWrkrData->curlPostHandle, CURLOPT_URL, pWrkrData->restURL); + DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", pWrkrData->restURL); + +finalize_it: + if (url != NULL) + es_deleteStr(url); + RETiRet; +} + + +/* this method computes the expected size of adding the next message into + * the batched request to elasticsearch + */ +static size_t +computeMessageSize(const wrkrInstanceData_t *const pWrkrData, + const uchar *const message, + uchar **const tpls) +{ + size_t r = sizeof(META_END)-1 + sizeof("\n")-1; + if (pWrkrData->pData->writeOperation == ES_WRITE_CREATE) + r += sizeof(META_STRT_CREATE)-1; + else + r += sizeof(META_STRT)-1; + + uchar *searchIndex = NULL; + uchar *searchType; + uchar *parent = NULL; + uchar *bulkId = NULL; + uchar *pipelineName; + + getIndexTypeAndParent(pWrkrData->pData, tpls, &searchIndex, &searchType, &parent, &bulkId, &pipelineName); + r += ustrlen((char *)message); + 
	if(searchIndex != NULL) {
		r += ustrlen(searchIndex);
	}
	if(searchType != NULL) {
		if(searchType[0] == '\0') {
			r += 4; // "_doc"
		} else {
			r += ustrlen(searchType);
		}
	}
	if(parent != NULL) {
		r += sizeof(META_PARENT)-1 + ustrlen(parent);
	}
	if(bulkId != NULL) {
		r += sizeof(META_ID)-1 + ustrlen(bulkId);
	}
	if(pipelineName != NULL && (!pWrkrData->pData->skipPipelineIfEmpty || pipelineName[0] != '\0')) {
		r += sizeof(META_PIPELINE)-1 + ustrlen(pipelineName);
	}

	/* NOTE(review): this is an upper-bound estimate for batch-size
	 * budgeting; e.g. 4 bytes are reserved for "_doc" even though
	 * buildBatch() emits no type at all for an empty searchType. */
	return r;
}


/* this method does not directly submit but builds a batch instead. It
 * may submit, if we have dynamic index/type and the current type or
 * index changes.
 */
static rsRetVal
buildBatch(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls)
{
	int length = strlen((char *)message);
	int r;
	int endQuote = 1;
	uchar *searchIndex = NULL;
	uchar *searchType;
	uchar *parent = NULL;
	uchar *bulkId = NULL;
	uchar *pipelineName;
	DEFiRet;

	getIndexTypeAndParent(pWrkrData->pData, tpls, &searchIndex, &searchType, &parent, &bulkId, &pipelineName);
	/* emit the bulk-API action metadata line; for "create" the opening
	 * brace carries no leading "_index" key, so endQuote tracks whether
	 * the line currently ends inside a quoted value (META_END) or not
	 * (META_END_NOQUOTE). */
	if (pWrkrData->pData->writeOperation == ES_WRITE_CREATE) {
		r = es_addBuf(&pWrkrData->batch.data, META_STRT_CREATE, sizeof(META_STRT_CREATE)-1);
		endQuote = 0;
	} else
		r = es_addBuf(&pWrkrData->batch.data, META_STRT, sizeof(META_STRT)-1);
	if(searchIndex != NULL) {
		endQuote = 1;
		if (pWrkrData->pData->writeOperation == ES_WRITE_CREATE)
			if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_IX, sizeof(META_IX)-1);
		if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchIndex,
			ustrlen(searchIndex));
		/* an empty searchType suppresses "_type" entirely (ES >= 7) */
		if(searchType != NULL && searchType[0] != '\0') {
			if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_TYPE, sizeof(META_TYPE)-1);
			if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchType,
				ustrlen(searchType));
		}
	}
	if(parent != NULL) {
		endQuote = 1;
		if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_PARENT, sizeof(META_PARENT)-1);
		if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)parent, ustrlen(parent));
	}
	if(pipelineName != NULL && (!pWrkrData->pData->skipPipelineIfEmpty || pipelineName[0] != '\0')) {
		endQuote = 1;
		if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_PIPELINE, sizeof(META_PIPELINE)-1);
		if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)pipelineName, ustrlen(pipelineName));
	}
	if(bulkId != NULL) {
		endQuote = 1;
		if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_ID, sizeof(META_ID)-1);
		if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)bulkId, ustrlen(bulkId));
	}
	if(endQuote == 0) {
		if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_END_NOQUOTE, sizeof(META_END_NOQUOTE)-1);
	} else {
		if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_END, sizeof(META_END)-1);
	}
	/* metadata line done - append the document source plus NDJSON newline */
	if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)message, length);
	if(r == 0) r = es_addBuf(&pWrkrData->batch.data, "\n", sizeof("\n")-1);
	if(r != 0) {
		LogError(0, RS_RET_ERR,
			"omelasticsearch: growing batch failed with code %d", r);
		ABORT_FINALIZE(RS_RET_ERR);
	}
	++pWrkrData->batch.nmemb;
	iRet = RS_RET_OK;

finalize_it:
	RETiRet;
}

/*
 * Dumps entire bulk request and response in error log
 */
static rsRetVal
getDataErrorDefault(wrkrInstanceData_t *pWrkrData,fjson_object **pReplyRoot,uchar *reqmsg,char **rendered)
{
	DEFiRet;
	fjson_object *req=NULL;
	fjson_object *errRoot=NULL;
	fjson_object *replyRoot = *pReplyRoot;

	if((req=fjson_object_new_object()) == NULL) ABORT_FINALIZE(RS_RET_ERR);
	fjson_object_object_add(req, "url", fjson_object_new_string((char*)pWrkrData->restURL));
	fjson_object_object_add(req, "postdata", fjson_object_new_string((char*)reqmsg));

	if((errRoot=fjson_object_new_object()) == NULL) ABORT_FINALIZE(RS_RET_ERR);
	/* both adds transfer ownership: req and replyRoot now belong to errRoot */
	fjson_object_object_add(errRoot, "request", req);
	fjson_object_object_add(errRoot, "reply", replyRoot);
	/* NOTE(review): strdup() result is not checked; on OOM *rendered is
	 * NULL and the caller must cope with that. */
	*rendered = strdup((char*)fjson_object_to_json_string(errRoot));

	req=NULL; /* owned by errRoot now - prevent double-put in finalize_it */
	fjson_object_put(errRoot);

	*pReplyRoot = NULL; /* tell caller not to delete once again! */

	finalize_it:
	fjson_object_put(req);
	RETiRet;
}

/*
 * Sets bulkRequestNextSectionStart pointer to next sections start in the buffer pointed by bulkRequest.
 * Sections are marked by { and }
 */
static rsRetVal
getSection(const char* bulkRequest, const char **bulkRequestNextSectionStart )
{
	DEFiRet;
	char* idx =0;
	if( (idx = strchr(bulkRequest,'\n')) != 0)/*intermediate section*/
	{
		/* point just past the newline, i.e. at the next section */
		*bulkRequestNextSectionStart = ++idx;
	}
	else
	{
		*bulkRequestNextSectionStart=0;
		ABORT_FINALIZE(RS_RET_ERR);
	}

	finalize_it:
	RETiRet;
}

/*
 * Sets the new string in singleRequest for one request in bulkRequest
 * and sets lastLocation pointer to the location till which bulkrequest has been parsed.
 * (used as input to make function thread safe.)
 */
static rsRetVal
getSingleRequest(const char* bulkRequest, char** singleRequest, const char **lastLocation)
{
	DEFiRet;
	const char *req = bulkRequest;
	const char *start = bulkRequest;
	/* one logical request = two newline-terminated lines:
	 * the action metadata line plus the document line */
	if (getSection(req,&req)!=RS_RET_OK)
		ABORT_FINALIZE(RS_RET_ERR);

	if (getSection(req,&req)!=RS_RET_OK)
		ABORT_FINALIZE(RS_RET_ERR);

	CHKmalloc(*singleRequest = (char*) calloc (req - start+ 1 + 1,1));
	/* (req - start+ 1 == length of data + 1 for terminal char)*/
	memcpy(*singleRequest,start,req - start);
	*lastLocation=req;

finalize_it:
	RETiRet;
}

/*
 * check the status of response from ES
 * returns nonzero (= error) unless "status" is an int in [0, 299]
 */
static int checkReplyStatus(fjson_object* ok) {
	return (ok == NULL || !fjson_object_is_type(ok, fjson_type_int) || fjson_object_get_int(ok) < 0 ||
		fjson_object_get_int(ok) > 299);
}

/*
 * Context object for error file content creation or status check
 * response_item - the full {"create":{"_index":"idxname",.....}}
 * response_body - the inner hash of the response_item - {"_index":"idxname",...}
 * status - the "status" field from the inner hash - "status":500
 * should be able to use
fjson_object_get_int(status) to get the http result code */
typedef struct exeContext{
	int statusCheckOnly;          /* nonzero: only detect failures, build no error file content */
	fjson_object *errRoot;        /* accumulator object the callback writes into */
	rsRetVal (*prepareErrorFileContent)(struct exeContext *ctx,int itemStatus,char *request,char *response,
		fjson_object *response_item, fjson_object *response_body, fjson_object *status);
	es_write_ops_t writeOperation;
	ratelimit_t *ratelimiter;
	ruleset_t *retryRuleset;
	struct json_tokener *jTokener;
} context;

/*
 * get content to be written in error file using context passed
 */
static rsRetVal
parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **pReplyRoot,uchar *reqmsg,context *ctx)
{
	DEFiRet;
	fjson_object *replyRoot = *pReplyRoot;
	int i;
	int numitems;
	fjson_object *items=NULL, *jo_errors = NULL;

	/*iterate over items*/
	if(!fjson_object_object_get_ex(replyRoot, "items", &items)) {
		LogError(0, RS_RET_DATAFAIL,
			"omelasticsearch: error in elasticsearch reply: "
			"bulkmode insert does not return array, reply is: %s",
			pWrkrData->reply);
		ABORT_FINALIZE(RS_RET_DATAFAIL);
	}

	numitems = fjson_object_array_length(items);

	/* fast path: top-level "errors":false means every item succeeded */
	int errors = 0;
	if(fjson_object_object_get_ex(replyRoot, "errors", &jo_errors)) {
		errors = fjson_object_get_boolean(jo_errors);
		if (!errors && pWrkrData->pData->retryFailures) {
			STATSCOUNTER_ADD(indexSuccess, mutIndexSuccess, numitems);
			return RS_RET_OK;
		}
	}

	if (reqmsg) {
		DBGPRINTF("omelasticsearch: Entire request %s\n", reqmsg);
	} else {
		DBGPRINTF("omelasticsearch: Empty request\n");
	}
	/* cursor into the original bulk request; advanced one request at a
	 * time by getSingleRequest() so item i pairs with request i */
	const char *lastReqRead= (char*)reqmsg;

	DBGPRINTF("omelasticsearch: %d items in reply\n", numitems);
	for(i = 0 ; i < numitems ; ++i) {

		fjson_object *item=NULL;
		fjson_object *result=NULL;
		fjson_object *ok=NULL;
		int itemStatus=0;
		item = fjson_object_array_get_idx(items, i);
		if(item == NULL) {
			LogError(0, RS_RET_DATAFAIL,
				"omelasticsearch: error in elasticsearch reply: "
				"cannot obtain reply array item %d", i);
			ABORT_FINALIZE(RS_RET_DATAFAIL);
		}
		/* the per-item body is keyed by the write operation:
		 * try "create" first, then fall back to "index" */
		fjson_object_object_get_ex(item, "create", &result);
		if(result == NULL || !fjson_object_is_type(result, fjson_type_object)) {
			fjson_object_object_get_ex(item, "index", &result);
			if(result == NULL || !fjson_object_is_type(result, fjson_type_object)) {
				LogError(0, RS_RET_DATAFAIL,
					"omelasticsearch: error in elasticsearch reply: "
					"cannot obtain 'result' item for #%d", i);
				ABORT_FINALIZE(RS_RET_DATAFAIL);
			}
		}

		fjson_object_object_get_ex(result, "status", &ok);
		itemStatus = checkReplyStatus(ok);

		char *request =0;
		char *response =0;
		if(ctx->statusCheckOnly || (NULL == lastReqRead)) {
			/* detection-only mode: first failing item aborts */
			if(itemStatus) {
				DBGPRINTF("omelasticsearch: error in elasticsearch reply: item %d, "
					"status is %d\n", i, fjson_object_get_int(ok));
				DBGPRINTF("omelasticsearch: status check found error.\n");
				ABORT_FINALIZE(RS_RET_DATAFAIL);
			}

		} else {
			if(getSingleRequest(lastReqRead,&request,&lastReqRead) != RS_RET_OK) {
				DBGPRINTF("omelasticsearch: Couldn't get post request\n");
				ABORT_FINALIZE(RS_RET_ERR);
			}
			response = (char*)fjson_object_to_json_string_ext(result, FJSON_TO_STRING_PLAIN);

			if(response==NULL) {
				free(request);/*as its has been assigned.*/
				DBGPRINTF("omelasticsearch: Error getting fjson_object_to_string_ext. Cannot "
					"continue\n");
				ABORT_FINALIZE(RS_RET_ERR);
			}

			/*call the context*/
			rsRetVal ret = ctx->prepareErrorFileContent(ctx, itemStatus, request,
				response, item, result, ok);

			/*free memory in any case*/
			free(request);

			if(ret != RS_RET_OK) {
				DBGPRINTF("omelasticsearch: Error in preparing errorfileContent. Cannot continue\n");
				ABORT_FINALIZE(RS_RET_ERR);
			}
		}
	}

finalize_it:
	RETiRet;
}

/*
 * Dumps only failed requests of bulk insert
 */
static rsRetVal
getDataErrorOnly(context *ctx,int itemStatus,char *request,char *response,
	fjson_object *response_item, fjson_object *response_body, fjson_object *status)
{
	DEFiRet;
	(void)response_item; /* unused */
	(void)response_body; /* unused */
	(void)status; /* unused */
	if(itemStatus) {
		fjson_object *onlyErrorResponses =NULL;
		fjson_object *onlyErrorRequests=NULL;

		if(!fjson_object_object_get_ex(ctx->errRoot, "reply", &onlyErrorResponses)) {
			DBGPRINTF("omelasticsearch: Failed to get reply json array. Invalid context. Cannot "
				"continue\n");
			ABORT_FINALIZE(RS_RET_ERR);
		}
		fjson_object_array_add(onlyErrorResponses, fjson_object_new_string(response));

		if(!fjson_object_object_get_ex(ctx->errRoot, "request", &onlyErrorRequests)) {
			DBGPRINTF("omelasticsearch: Failed to get request json array. Invalid context. Cannot "
				"continue\n");
			ABORT_FINALIZE(RS_RET_ERR);
		}

		fjson_object_array_add(onlyErrorRequests, fjson_object_new_string(request));

	}

finalize_it:
	RETiRet;
}

/*
 * Dumps all requests of bulk insert interleaved with request and response
 */

static rsRetVal
getDataInterleaved(context *ctx,
	int __attribute__((unused)) itemStatus,
	char *request,
	char *response,
	fjson_object *response_item,
	fjson_object *response_body,
	fjson_object *status
)
{
	DEFiRet;
	(void)response_item; /* unused */
	(void)response_body; /* unused */
	(void)status; /* unused */
	fjson_object *interleaved =NULL;
	if(!fjson_object_object_get_ex(ctx->errRoot, "response", &interleaved)) {
		DBGPRINTF("omelasticsearch: Failed to get response json array. Invalid context. Cannot continue\n");
		ABORT_FINALIZE(RS_RET_ERR);
	}

	fjson_object *interleavedNode=NULL;
	/*create interleaved node that has req and response json data*/
	if((interleavedNode=fjson_object_new_object()) == NULL)
	{
		DBGPRINTF("omelasticsearch: Failed to create interleaved node. Cann't continue\n");
		ABORT_FINALIZE(RS_RET_ERR);
	}
	fjson_object_object_add(interleavedNode,"request", fjson_object_new_string(request));
	fjson_object_object_add(interleavedNode,"reply", fjson_object_new_string(response));

	fjson_object_array_add(interleaved, interleavedNode);



	finalize_it:
	RETiRet;
}


/*
 * Dumps only failed requests of bulk insert interleaved with request and response
 */

static rsRetVal
getDataErrorOnlyInterleaved(context *ctx,int itemStatus,char *request,char *response,
	fjson_object *response_item, fjson_object *response_body, fjson_object *status)
{
	DEFiRet;
	/* delegate to getDataInterleaved, but only for failed items */
	if (itemStatus) {
		if(getDataInterleaved(ctx, itemStatus,request,response,
			response_item, response_body, status)!= RS_RET_OK) {
			ABORT_FINALIZE(RS_RET_ERR);
		}
	}

	finalize_it:
	RETiRet;
}

/* input JSON looks like this:
 * {"someoperation":{"field1":"value1","field2":{.....}}}
 * output looks like this:
 * {"writeoperation":"someoperation","field1":"value1","field2":{.....}}
 */
static rsRetVal
formatBulkReqOrResp(fjson_object *jo_input, fjson_object *jo_output)
{
	DEFiRet;
	fjson_object *jo = NULL;
	struct json_object_iterator it = json_object_iter_begin(jo_input);
	struct json_object_iterator itEnd = json_object_iter_end(jo_input);

	/* set the writeoperation if not already set */
	if (!fjson_object_object_get_ex(jo_output, "writeoperation", NULL)) {
		const char *optype = NULL;
		if (!json_object_iter_equal(&it, &itEnd))
			optype = json_object_iter_peek_name(&it);
		if (optype) {
			jo = json_object_new_string(optype);
		} else {
			jo = json_object_new_string("unknown");
		}
		CHKmalloc(jo);
		json_object_object_add(jo_output,
"writeoperation", jo); + } + if (!json_object_iter_equal(&it, &itEnd)) { + /* now iterate the operation object */ + jo = json_object_iter_peek_value(&it); + it = json_object_iter_begin(jo); + itEnd = json_object_iter_end(jo); + while (!json_object_iter_equal(&it, &itEnd)) { + const char *name = json_object_iter_peek_name(&it); + /* do not overwrite existing fields */ + if (!fjson_object_object_get_ex(jo_output, name, NULL)) { + json_object_object_add(jo_output, name, + json_object_get(json_object_iter_peek_value(&it))); + } + json_object_iter_next(&it); + } + } +finalize_it: + RETiRet; +} + +/* request string looks like this (other fields are "_parent" and "pipeline") + * "{\"create\":{\"_index\": \"rsyslog_testbench\",\"_type\":\"test-type\", + * \"_id\":\"FAEAFC0D17C847DA8BD6F47BC5B3800A\"}}\n + * {\"msgnum\":\"x00000000\",\"viaq_msg_id\":\"FAEAFC0D17C847DA8BD6F47BC5B3800A\"}\n" + * store the metadata header fields in the metadata object + * start = first \n + 1 + * end = last \n + */ +static rsRetVal +createMsgFromRequest(const char *request, context *ctx, smsg_t **msg, fjson_object *omes) +{ + DEFiRet; + fjson_object *jo_msg = NULL, *jo_metadata = NULL, *jo_request = NULL; + const char *datastart, *dataend; + size_t datalen; + enum json_tokener_error json_error; + + *msg = NULL; + if (!(datastart = strchr(request, '\n')) || (datastart[1] != '{')) { + LogError(0, RS_RET_ERR, + "omelasticsearch: malformed original request - " + "could not find start of original data [%s]", + request); + ABORT_FINALIZE(RS_RET_ERR); + } + datalen = datastart - request; + json_tokener_reset(ctx->jTokener); + jo_metadata = json_tokener_parse_ex(ctx->jTokener, request, datalen); + json_error = fjson_tokener_get_error(ctx->jTokener); + if (!jo_metadata || (json_error != fjson_tokener_success)) { + LogError(0, RS_RET_ERR, + "omelasticsearch: parse error [%s] - could not convert original " + "request metadata header JSON back into JSON object [%s]", + 
fjson_tokener_error_desc(json_error), request); + ABORT_FINALIZE(RS_RET_ERR); + } + CHKiRet(formatBulkReqOrResp(jo_metadata, omes)); + + datastart++; /* advance to '{' */ + if (!(dataend = strchr(datastart, '\n')) || (dataend[1] != '\0')) { + LogError(0, RS_RET_ERR, + "omelasticsearch: malformed original request - " + "could not find end of original data [%s]", + request); + ABORT_FINALIZE(RS_RET_ERR); + } + datalen = dataend - datastart; + json_tokener_reset(ctx->jTokener); + jo_request = json_tokener_parse_ex(ctx->jTokener, datastart, datalen); + json_error = fjson_tokener_get_error(ctx->jTokener); + if (!jo_request || (json_error != fjson_tokener_success)) { + LogError(0, RS_RET_ERR, + "omelasticsearch: parse error [%s] - could not convert original " + "request JSON back into JSON object [%s]", + fjson_tokener_error_desc(json_error), request); + ABORT_FINALIZE(RS_RET_ERR); + } + + CHKiRet(msgConstruct(msg)); + MsgSetFlowControlType(*msg, eFLOWCTL_FULL_DELAY); + MsgSetInputName(*msg, pInputName); + if (fjson_object_object_get_ex(jo_request, "message", &jo_msg)) { + const char *rawmsg = json_object_get_string(jo_msg); + const size_t msgLen = (size_t)json_object_get_string_len(jo_msg); + MsgSetRawMsg(*msg, rawmsg, msgLen); + } else { + /* use entire data part of request as rawmsg */ + MsgSetRawMsg(*msg, datastart, datalen); + } + MsgSetMSGoffs(*msg, 0); /* we do not have a header... 
*/ + MsgSetTAG(*msg, (const uchar *)"omes", 4); + CHKiRet(msgAddJSON(*msg, (uchar*)"!", jo_request, 0, 0)); + +finalize_it: + if (jo_metadata) + json_object_put(jo_metadata); + RETiRet; +} + + +static rsRetVal +getDataRetryFailures(context *ctx,int itemStatus,char *request,char *response, + fjson_object *response_item, fjson_object *response_body, fjson_object *status) +{ + DEFiRet; + fjson_object *omes = NULL, *jo = NULL; + int istatus = fjson_object_get_int(status); + int iscreateop = 0; + const char *optype = NULL; + smsg_t *msg = NULL; + int need_free_omes = 0; + + (void)response; + (void)itemStatus; + (void)response_body; + CHKmalloc(omes = json_object_new_object()); + need_free_omes = 1; + /* this adds metadata header fields to omes */ + if (RS_RET_OK != (iRet = createMsgFromRequest(request, ctx, &msg, omes))) { + if (iRet != RS_RET_OUT_OF_MEMORY) { + STATSCOUNTER_INC(indexBadResponse, mutIndexBadResponse); + } else { + ABORT_FINALIZE(iRet); + } + } + CHKmalloc(msg); + /* this adds response fields as local variables to omes */ + if (RS_RET_OK != (iRet = formatBulkReqOrResp(response_item, omes))) { + if (iRet != RS_RET_OUT_OF_MEMORY) { + STATSCOUNTER_INC(indexBadResponse, mutIndexBadResponse); + } else { + ABORT_FINALIZE(iRet); + } + } + if (fjson_object_object_get_ex(omes, "writeoperation", &jo)) { + optype = json_object_get_string(jo); + if (optype && !strcmp("create", optype)) + iscreateop = 1; + if (optype && !strcmp("index", optype) && (ctx->writeOperation == ES_WRITE_INDEX)) + iscreateop = 1; + } + + if (!optype) { + STATSCOUNTER_INC(indexBadResponse, mutIndexBadResponse); + LogMsg(0, RS_RET_ERR, LOG_INFO, + "omelasticsearch: no recognized operation type in response [%s]", + response); + } else if ((istatus == 200) || (istatus == 201)) { + STATSCOUNTER_INC(indexSuccess, mutIndexSuccess); + } else if ((istatus == 409) && iscreateop) { + STATSCOUNTER_INC(indexDuplicate, mutIndexDuplicate); + } else if ((istatus == 400) || (istatus < 200)) { + 
STATSCOUNTER_INC(indexBadArgument, mutIndexBadArgument); + } else { + fjson_object *error = NULL, *errtype = NULL; + if(fjson_object_object_get_ex(omes, "error", &error) && + fjson_object_object_get_ex(error, "type", &errtype)) { + if (istatus == 429) { + STATSCOUNTER_INC(indexBulkRejection, mutIndexBulkRejection); + } else { + STATSCOUNTER_INC(indexOtherResponse, mutIndexOtherResponse); + } + } else { + STATSCOUNTER_INC(indexBadResponse, mutIndexBadResponse); + LogMsg(0, RS_RET_ERR, LOG_INFO, + "omelasticsearch: unexpected error response [%s]", + response); + } + } + + need_free_omes = 0; + CHKiRet(msgAddJSON(msg, (uchar*)".omes", omes, 0, 0)); + MsgSetRuleset(msg, ctx->retryRuleset); + CHKiRet(ratelimitAddMsg(ctx->ratelimiter, NULL, msg)); +finalize_it: + if (need_free_omes) + json_object_put(omes); + RETiRet; +} + +/* + * get erroronly context + */ +static rsRetVal +initializeErrorOnlyConext(wrkrInstanceData_t *pWrkrData,context *ctx){ + DEFiRet; + ctx->statusCheckOnly=0; + fjson_object *errRoot=NULL; + fjson_object *onlyErrorResponses =NULL; + fjson_object *onlyErrorRequests=NULL; + if((errRoot=fjson_object_new_object()) == NULL) ABORT_FINALIZE(RS_RET_ERR); + + if((onlyErrorResponses=fjson_object_new_array()) == NULL) { + fjson_object_put(errRoot); + ABORT_FINALIZE(RS_RET_ERR); + } + if((onlyErrorRequests=fjson_object_new_array()) == NULL) { + fjson_object_put(errRoot); + fjson_object_put(onlyErrorResponses); + ABORT_FINALIZE(RS_RET_ERR); + } + + fjson_object_object_add(errRoot, "url", fjson_object_new_string((char*)pWrkrData->restURL)); + fjson_object_object_add(errRoot,"request",onlyErrorRequests); + fjson_object_object_add(errRoot, "reply", onlyErrorResponses); + ctx->errRoot = errRoot; + ctx->prepareErrorFileContent= &getDataErrorOnly; + finalize_it: + RETiRet; +} + +/* + * get interleaved context + */ +static rsRetVal +initializeInterleavedConext(wrkrInstanceData_t *pWrkrData,context *ctx){ + DEFiRet; + ctx->statusCheckOnly=0; + fjson_object *errRoot=NULL; 
+ fjson_object *interleaved =NULL; + if((errRoot=fjson_object_new_object()) == NULL) ABORT_FINALIZE(RS_RET_ERR); + if((interleaved=fjson_object_new_array()) == NULL) { + fjson_object_put(errRoot); + ABORT_FINALIZE(RS_RET_ERR); + } + + + fjson_object_object_add(errRoot, "url", fjson_object_new_string((char*)pWrkrData->restURL)); + fjson_object_object_add(errRoot,"response",interleaved); + ctx->errRoot = errRoot; + ctx->prepareErrorFileContent= &getDataInterleaved; + finalize_it: + RETiRet; +} + +/*get interleaved context*/ +static rsRetVal +initializeErrorInterleavedConext(wrkrInstanceData_t *pWrkrData,context *ctx){ + DEFiRet; + ctx->statusCheckOnly=0; + fjson_object *errRoot=NULL; + fjson_object *interleaved =NULL; + if((errRoot=fjson_object_new_object()) == NULL) ABORT_FINALIZE(RS_RET_ERR); + if((interleaved=fjson_object_new_array()) == NULL) { + fjson_object_put(errRoot); + ABORT_FINALIZE(RS_RET_ERR); + } + + + fjson_object_object_add(errRoot, "url", fjson_object_new_string((char*)pWrkrData->restURL)); + fjson_object_object_add(errRoot,"response",interleaved); + ctx->errRoot = errRoot; + ctx->prepareErrorFileContent= &getDataErrorOnlyInterleaved; + finalize_it: + RETiRet; +} + +/*get retry failures context*/ +static rsRetVal +initializeRetryFailuresContext(wrkrInstanceData_t *pWrkrData,context *ctx){ + DEFiRet; + ctx->statusCheckOnly=0; + fjson_object *errRoot=NULL; + if((errRoot=fjson_object_new_object()) == NULL) ABORT_FINALIZE(RS_RET_ERR); + + + fjson_object_object_add(errRoot, "url", fjson_object_new_string((char*)pWrkrData->restURL)); + ctx->errRoot = errRoot; + ctx->prepareErrorFileContent= &getDataRetryFailures; + CHKmalloc(ctx->jTokener = json_tokener_new()); + finalize_it: + RETiRet; +} + + +/* write data error request/replies to separate error file + * Note: we open the file but never close it before exit. If it + * needs to be closed, HUP must be sent. 
+ */ +static rsRetVal ATTR_NONNULL() +writeDataError(wrkrInstanceData_t *const pWrkrData, + instanceData *const pData, fjson_object **const pReplyRoot, + uchar *const reqmsg) +{ + char *rendered = NULL; + size_t toWrite; + ssize_t wrRet; + sbool bMutLocked = 0; + context ctx; + ctx.errRoot=0; + ctx.writeOperation = pWrkrData->pData->writeOperation; + ctx.ratelimiter = pWrkrData->pData->ratelimiter; + ctx.retryRuleset = pWrkrData->pData->retryRuleset; + ctx.jTokener = NULL; + DEFiRet; + + if(pData->errorFile == NULL) { + DBGPRINTF("omelasticsearch: no local error logger defined - " + "ignoring ES error information\n"); + FINALIZE; + } + + pthread_mutex_lock(&pData->mutErrFile); + bMutLocked = 1; + + DBGPRINTF("omelasticsearch: error file mode: erroronly='%d' errorInterleaved='%d'\n", + pData->errorOnly, pData->interleaved); + + if(pData->interleaved ==0 && pData->errorOnly ==0)/*default write*/ + { + if(getDataErrorDefault(pWrkrData,pReplyRoot, reqmsg, &rendered) != RS_RET_OK) { + ABORT_FINALIZE(RS_RET_ERR); + } + } else { + /*get correct context.*/ + if(pData->interleaved && pData->errorOnly) + { + if(initializeErrorInterleavedConext(pWrkrData, &ctx) != RS_RET_OK) { + DBGPRINTF("omelasticsearch: error initializing error interleaved context.\n"); + ABORT_FINALIZE(RS_RET_ERR); + } + + } else if(pData->errorOnly) { + if(initializeErrorOnlyConext(pWrkrData, &ctx) != RS_RET_OK) { + + DBGPRINTF("omelasticsearch: error initializing error only context.\n"); + ABORT_FINALIZE(RS_RET_ERR); + } + } else if(pData->interleaved) { + if(initializeInterleavedConext(pWrkrData, &ctx) != RS_RET_OK) { + DBGPRINTF("omelasticsearch: error initializing error interleaved context.\n"); + ABORT_FINALIZE(RS_RET_ERR); + } + } else if(pData->retryFailures) { + if(initializeRetryFailuresContext(pWrkrData, &ctx) != RS_RET_OK) { + DBGPRINTF("omelasticsearch: error initializing retry failures context.\n"); + ABORT_FINALIZE(RS_RET_ERR); + } + } else { + DBGPRINTF("omelasticsearch: None of the modes 
match file write. No data to write.\n"); + ABORT_FINALIZE(RS_RET_ERR); + } + + /*execute context*/ + if(parseRequestAndResponseForContext(pWrkrData, pReplyRoot, reqmsg, &ctx)!= RS_RET_OK) { + DBGPRINTF("omelasticsearch: error creating file content.\n"); + ABORT_FINALIZE(RS_RET_ERR); + } + CHKmalloc(rendered = strdup((char*)fjson_object_to_json_string(ctx.errRoot))); + } + + + if(pData->fdErrFile == -1) { + pData->fdErrFile = open((char*)pData->errorFile, + O_WRONLY|O_CREAT|O_APPEND|O_LARGEFILE|O_CLOEXEC, + S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP); + if(pData->fdErrFile == -1) { + LogError(errno, RS_RET_ERR, "omelasticsearch: error opening error file %s", + pData->errorFile); + ABORT_FINALIZE(RS_RET_ERR); + } + } + + /* we do not do real error-handling on the err file, as this finally complicates + * things way to much. + */ + DBGPRINTF("omelasticsearch: error record: '%s'\n", rendered); + toWrite = strlen(rendered) + 1; + /* Note: we overwrite the '\0' terminator with '\n' -- so we avoid + * caling malloc() -- write() does NOT need '\0'! + */ + rendered[toWrite-1] = '\n'; /* NO LONGER A STRING! 
*/ + wrRet = write(pData->fdErrFile, rendered, toWrite); + if(wrRet != (ssize_t) toWrite) { + LogError(errno, RS_RET_IO_ERROR, + "omelasticsearch: error writing error file %s, write returned %lld", + pData->errorFile, (long long) wrRet); + } + +finalize_it: + if(bMutLocked) + pthread_mutex_unlock(&pData->mutErrFile); + free(rendered); + fjson_object_put(ctx.errRoot); + if (ctx.jTokener) + json_tokener_free(ctx.jTokener); + RETiRet; +} + + +static rsRetVal +checkResultBulkmode(wrkrInstanceData_t *pWrkrData, fjson_object *root, uchar *reqmsg) +{ + DEFiRet; + context ctx; + ctx.errRoot = 0; + ctx.writeOperation = pWrkrData->pData->writeOperation; + ctx.ratelimiter = pWrkrData->pData->ratelimiter; + ctx.retryRuleset = pWrkrData->pData->retryRuleset; + ctx.statusCheckOnly=1; + ctx.jTokener = NULL; + if (pWrkrData->pData->retryFailures) { + ctx.statusCheckOnly=0; + CHKiRet(initializeRetryFailuresContext(pWrkrData, &ctx)); + } + if(parseRequestAndResponseForContext(pWrkrData,&root,reqmsg,&ctx)!= RS_RET_OK) { + DBGPRINTF("omelasticsearch: error found in elasticsearch reply\n"); + ABORT_FINALIZE(RS_RET_DATAFAIL); + } + +finalize_it: + fjson_object_put(ctx.errRoot); + if (ctx.jTokener) + json_tokener_free(ctx.jTokener); + RETiRet; +} + + +static rsRetVal +checkResult(wrkrInstanceData_t *pWrkrData, uchar *reqmsg) +{ + fjson_object *root; + fjson_object *status; + DEFiRet; + + root = fjson_tokener_parse(pWrkrData->reply); + if(root == NULL) { + LogMsg(0, RS_RET_ERR, LOG_WARNING, + "omelasticsearch: could not parse JSON result"); + ABORT_FINALIZE(RS_RET_ERR); + } + + if(pWrkrData->pData->bulkmode) { + iRet = checkResultBulkmode(pWrkrData, root, reqmsg); + } else { + if(fjson_object_object_get_ex(root, "status", &status)) { + iRet = RS_RET_DATAFAIL; + } + } + + /* Note: we ignore errors writing the error file, as we cannot handle + * these in any case. 
+ */ + if(iRet == RS_RET_DATAFAIL) { + STATSCOUNTER_INC(indexESFail, mutIndexESFail); + writeDataError(pWrkrData, pWrkrData->pData, &root, reqmsg); + iRet = RS_RET_OK; /* we have handled the problem! */ + } + +finalize_it: + if(root != NULL) + fjson_object_put(root); + if(iRet != RS_RET_OK) { + STATSCOUNTER_INC(indexESFail, mutIndexESFail); + } + RETiRet; +} + +static void ATTR_NONNULL() +initializeBatch(wrkrInstanceData_t *pWrkrData) +{ + es_emptyStr(pWrkrData->batch.data); + pWrkrData->batch.nmemb = 0; +} + +static rsRetVal ATTR_NONNULL(1, 2) +curlPost(wrkrInstanceData_t *pWrkrData, uchar *message, int msglen, uchar **tpls, const int nmsgs) +{ + CURLcode code; + CURL *const curl = pWrkrData->curlPostHandle; + char errbuf[CURL_ERROR_SIZE] = ""; + DEFiRet; + + PTR_ASSERT_SET_TYPE(pWrkrData, WRKR_DATA_TYPE_ES); + + if ((pWrkrData->pData->rebindInterval > -1) && + (pWrkrData->nOperations > pWrkrData->pData->rebindInterval)) { + curl_easy_setopt(curl, CURLOPT_FRESH_CONNECT, 1); + pWrkrData->nOperations = 0; + STATSCOUNTER_INC(rebinds, mutRebinds); + } else { + /* by default, reuse existing connections */ + curl_easy_setopt(curl, CURLOPT_FRESH_CONNECT, 0); + } + if ((pWrkrData->pData->rebindInterval > -1) && + (pWrkrData->nOperations == pWrkrData->pData->rebindInterval)) { + curl_easy_setopt(curl, CURLOPT_FORBID_REUSE, 1); + } else { + curl_easy_setopt(curl, CURLOPT_FORBID_REUSE, 0); + } + + if(pWrkrData->pData->numServers > 1) { + /* needs to be called to support ES HA feature */ + CHKiRet(checkConn(pWrkrData)); + } + pWrkrData->replyLen = 0; + CHKiRet(setPostURL(pWrkrData, tpls)); + + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); + curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, errbuf); + code = curl_easy_perform(curl); + DBGPRINTF("curl returned %lld\n", (long long) code); + if (code != CURLE_OK && code != CURLE_HTTP_RETURNED_ERROR) { + STATSCOUNTER_INC(indexHTTPReqFail, mutIndexHTTPReqFail); + 
indexHTTPFail += nmsgs; + LogError(0, RS_RET_SUSPENDED, + "omelasticsearch: we are suspending ourselfs due " + "to server failure %lld: %s", (long long) code, errbuf); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + + if (pWrkrData->pData->rebindInterval > -1) + pWrkrData->nOperations++; + + if(pWrkrData->reply == NULL) { + DBGPRINTF("omelasticsearch: pWrkrData reply==NULL, replyLen = '%d'\n", + pWrkrData->replyLen); + } else { + DBGPRINTF("omelasticsearch: pWrkrData replyLen = '%d'\n", pWrkrData->replyLen); + if(pWrkrData->replyLen > 0) { + pWrkrData->reply[pWrkrData->replyLen] = '\0'; + /* Append 0 Byte if replyLen is above 0 - byte has been reserved in malloc */ + } + DBGPRINTF("omelasticsearch: pWrkrData reply: '%s'\n", pWrkrData->reply); + CHKiRet(checkResult(pWrkrData, message)); + } + +finalize_it: + incrementServerIndex(pWrkrData); + RETiRet; +} + +static rsRetVal +submitBatch(wrkrInstanceData_t *pWrkrData) +{ + char *cstr = NULL; + DEFiRet; + + cstr = es_str2cstr(pWrkrData->batch.data, NULL); + dbgprintf("omelasticsearch: submitBatch, batch: '%s'\n", cstr); + + CHKiRet(curlPost(pWrkrData, (uchar*) cstr, strlen(cstr), NULL, pWrkrData->batch.nmemb)); + +finalize_it: + free(cstr); + RETiRet; +} + +BEGINbeginTransaction +CODESTARTbeginTransaction + if(!pWrkrData->pData->bulkmode) { + FINALIZE; + } + + initializeBatch(pWrkrData); +finalize_it: +ENDbeginTransaction + +BEGINdoAction +CODESTARTdoAction + STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); + + if(pWrkrData->pData->bulkmode) { + const size_t nBytes = computeMessageSize(pWrkrData, ppString[0], ppString); + + /* If max bytes is set and this next message will put us over the limit, + * submit the current buffer and reset */ + if(pWrkrData->pData->maxbytes > 0 + && es_strlen(pWrkrData->batch.data) + nBytes > pWrkrData->pData->maxbytes ) { + dbgprintf("omelasticsearch: maxbytes limit reached, submitting partial " + "batch of %d elements.\n", pWrkrData->batch.nmemb); + CHKiRet(submitBatch(pWrkrData)); + 
initializeBatch(pWrkrData); + } + CHKiRet(buildBatch(pWrkrData, ppString[0], ppString)); + + /* If there is only one item in the batch, all previous items have been + * submitted or this is the first item for this transaction. Return previous + * committed so that all items leading up to the current (exclusive) + * are not replayed should a failure occur anywhere else in the transaction. */ + iRet = pWrkrData->batch.nmemb == 1 ? RS_RET_PREVIOUS_COMMITTED : RS_RET_DEFER_COMMIT; + } else { + CHKiRet(curlPost(pWrkrData, ppString[0], strlen((char*)ppString[0]), + ppString, 1)); + } +finalize_it: +ENDdoAction + + +BEGINendTransaction +CODESTARTendTransaction + /* End Transaction only if batch data is not empty */ + if (pWrkrData->batch.data != NULL && pWrkrData->batch.nmemb > 0) { + CHKiRet(submitBatch(pWrkrData)); + } else { + dbgprintf("omelasticsearch: endTransaction, pWrkrData->batch.data is NULL, " + "nothing to send. \n"); + } +finalize_it: +ENDendTransaction + +static rsRetVal +computeAuthHeader(char* uid, char* pwd, uchar** authBuf) { + int r; + DEFiRet; + + es_str_t* auth = es_newStr(1024); + if (auth == NULL) { + LogError(0, RS_RET_OUT_OF_MEMORY, + "omelasticsearch: failed to allocate es_str auth for auth header construction"); + ABORT_FINALIZE(RS_RET_ERR); + } + + r = es_addBuf(&auth, uid, strlen(uid)); + if(r == 0) r = es_addChar(&auth, ':'); + if(r == 0 && pwd != NULL) r = es_addBuf(&auth, pwd, strlen(pwd)); + if(r == 0) *authBuf = (uchar*) es_str2cstr(auth, NULL); + + if (r != 0 || *authBuf == NULL) { + LogError(0, RS_RET_ERR, "omelasticsearch: failed to build auth header\n"); + ABORT_FINALIZE(RS_RET_ERR); + } + +finalize_it: + if (auth != NULL) + es_deleteStr(auth); + RETiRet; +} + +static void ATTR_NONNULL() +curlSetupCommon(wrkrInstanceData_t *const pWrkrData, CURL *const handle) +{ + PTR_ASSERT_SET_TYPE(pWrkrData, WRKR_DATA_TYPE_ES); + curl_easy_setopt(handle, CURLOPT_HTTPHEADER, pWrkrData->curlHeader); + curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 
TRUE); + curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult); + curl_easy_setopt(handle, CURLOPT_WRITEDATA, pWrkrData); + if(pWrkrData->pData->allowUnsignedCerts) + curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, FALSE); + if(pWrkrData->pData->skipVerifyHost) + curl_easy_setopt(handle, CURLOPT_SSL_VERIFYHOST, FALSE); + if(pWrkrData->pData->authBuf != NULL) { + curl_easy_setopt(handle, CURLOPT_USERPWD, pWrkrData->pData->authBuf); + curl_easy_setopt(handle, CURLOPT_PROXYAUTH, CURLAUTH_ANY); + } + if(pWrkrData->pData->caCertFile) + curl_easy_setopt(handle, CURLOPT_CAINFO, pWrkrData->pData->caCertFile); + if(pWrkrData->pData->myCertFile) + curl_easy_setopt(handle, CURLOPT_SSLCERT, pWrkrData->pData->myCertFile); + if(pWrkrData->pData->myPrivKeyFile) + curl_easy_setopt(handle, CURLOPT_SSLKEY, pWrkrData->pData->myPrivKeyFile); + /* uncomment for in-dept debuggung: + curl_easy_setopt(handle, CURLOPT_VERBOSE, TRUE); */ +} + +static void ATTR_NONNULL() +curlCheckConnSetup(wrkrInstanceData_t *const pWrkrData) +{ + PTR_ASSERT_SET_TYPE(pWrkrData, WRKR_DATA_TYPE_ES); + curlSetupCommon(pWrkrData, pWrkrData->curlCheckConnHandle); + curl_easy_setopt(pWrkrData->curlCheckConnHandle, + CURLOPT_TIMEOUT_MS, pWrkrData->pData->healthCheckTimeout); +} + +static void ATTR_NONNULL(1) +curlPostSetup(wrkrInstanceData_t *const pWrkrData) +{ + PTR_ASSERT_SET_TYPE(pWrkrData, WRKR_DATA_TYPE_ES); + curlSetupCommon(pWrkrData, pWrkrData->curlPostHandle); + curl_easy_setopt(pWrkrData->curlPostHandle, CURLOPT_POST, 1); + curl_easy_setopt(pWrkrData->curlPostHandle, + CURLOPT_TIMEOUT_MS, pWrkrData->pData->indexTimeout); +} + +#define CONTENT_JSON "Content-Type: application/json; charset=utf-8" + +static rsRetVal ATTR_NONNULL() +curlSetup(wrkrInstanceData_t *const pWrkrData) +{ + DEFiRet; + pWrkrData->curlHeader = curl_slist_append(NULL, CONTENT_JSON); + CHKmalloc(pWrkrData->curlPostHandle = curl_easy_init());; + curlPostSetup(pWrkrData); + + CHKmalloc(pWrkrData->curlCheckConnHandle = 
curl_easy_init()); + curlCheckConnSetup(pWrkrData); + +finalize_it: + if(iRet != RS_RET_OK && pWrkrData->curlPostHandle != NULL) { + curl_easy_cleanup(pWrkrData->curlPostHandle); + pWrkrData->curlPostHandle = NULL; + } + RETiRet; +} + +static void ATTR_NONNULL() +setInstParamDefaults(instanceData *const pData) +{ + pData->serverBaseUrls = NULL; + pData->defaultPort = 9200; + pData->healthCheckTimeout = 3500; + pData->indexTimeout = 0; + pData->uid = NULL; + pData->pwd = NULL; + pData->authBuf = NULL; + pData->searchIndex = NULL; + pData->searchType = NULL; + pData->pipelineName = NULL; + pData->dynPipelineName = 0; + pData->skipPipelineIfEmpty = 0; + pData->parent = NULL; + pData->timeout = NULL; + pData->dynSrchIdx = 0; + pData->dynSrchType = 0; + pData->dynParent = 0; + pData->useHttps = 0; + pData->bulkmode = 0; + pData->maxbytes = 104857600; //100 MB Is the default max message size that ships with ElasticSearch + pData->allowUnsignedCerts = 0; + pData->skipVerifyHost = 0; + pData->tplName = NULL; + pData->errorFile = NULL; + pData->errorOnly=0; + pData->interleaved=0; + pData->dynBulkId= 0; + pData->bulkId = NULL; + pData->caCertFile = NULL; + pData->myCertFile = NULL; + pData->myPrivKeyFile = NULL; + pData->writeOperation = ES_WRITE_INDEX; + pData->retryFailures = 0; + pData->ratelimitBurst = 20000; + pData->ratelimitInterval = 600; + pData->ratelimiter = NULL; + pData->retryRulesetName = NULL; + pData->retryRuleset = NULL; + pData->rebindInterval = DEFAULT_REBIND_INTERVAL; +} + +BEGINnewActInst + struct cnfparamvals *pvals; + char* serverParam = NULL; + struct cnfarray* servers = NULL; + int i; + int iNumTpls; + FILE *fp; + char errStr[1024]; +CODESTARTnewActInst + if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) { + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + CHKiRet(createInstance(&pData)); + setInstParamDefaults(pData); + + for(i = 0 ; i < actpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(actpblk.descr[i].name, 
"server")) { + servers = pvals[i].val.d.ar; + } else if(!strcmp(actpblk.descr[i].name, "errorfile")) { + pData->errorFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "erroronly")) { + pData->errorOnly = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "interleaved")) { + pData->interleaved = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "serverport")) { + pData->defaultPort = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "healthchecktimeout")) { + pData->healthCheckTimeout = (long) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "indextimeout")) { + pData->indexTimeout = (long) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "uid")) { + pData->uid = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "pwd")) { + pData->pwd = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "searchindex")) { + pData->searchIndex = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "searchtype")) { + pData->searchType = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "pipelinename")) { + pData->pipelineName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "dynpipelinename")) { + pData->dynPipelineName = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "skippipelineifempty")) { + pData->skipPipelineIfEmpty = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "parent")) { + pData->parent = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "dynsearchindex")) { + pData->dynSrchIdx = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "dynsearchtype")) { + pData->dynSrchType = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "dynparent")) { + pData->dynParent = pvals[i].val.d.n; + 
} else if(!strcmp(actpblk.descr[i].name, "bulkmode")) { + pData->bulkmode = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "maxbytes")) { + pData->maxbytes = (size_t) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "allowunsignedcerts")) { + pData->allowUnsignedCerts = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "skipverifyhost")) { + pData->skipVerifyHost = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "timeout")) { + pData->timeout = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "usehttps")) { + pData->useHttps = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "template")) { + pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "dynbulkid")) { + pData->dynBulkId = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "bulkid")) { + pData->bulkId = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "tls.cacert")) { + pData->caCertFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + fp = fopen((const char*)pData->caCertFile, "r"); + if(fp == NULL) { + rs_strerror_r(errno, errStr, sizeof(errStr)); + LogError(0, RS_RET_NO_FILE_ACCESS, + "error: 'tls.cacert' file %s couldn't be accessed: %s\n", + pData->caCertFile, errStr); + } else { + fclose(fp); + } + } else if(!strcmp(actpblk.descr[i].name, "tls.mycert")) { + pData->myCertFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + fp = fopen((const char*)pData->myCertFile, "r"); + if(fp == NULL) { + rs_strerror_r(errno, errStr, sizeof(errStr)); + LogError(0, RS_RET_NO_FILE_ACCESS, + "error: 'tls.mycert' file %s couldn't be accessed: %s\n", + pData->myCertFile, errStr); + } else { + fclose(fp); + } + } else if(!strcmp(actpblk.descr[i].name, "tls.myprivkey")) { + pData->myPrivKeyFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + fp = fopen((const char*)pData->myPrivKeyFile, "r"); + if(fp == 
NULL) { + rs_strerror_r(errno, errStr, sizeof(errStr)); + LogError(0, RS_RET_NO_FILE_ACCESS, + "error: 'tls.myprivkey' file %s couldn't be accessed: %s\n", + pData->myPrivKeyFile, errStr); + } else { + fclose(fp); + } + } else if(!strcmp(actpblk.descr[i].name, "writeoperation")) { + char *writeop = es_str2cstr(pvals[i].val.d.estr, NULL); + if (writeop && !strcmp(writeop, "create")) { + pData->writeOperation = ES_WRITE_CREATE; + } else if (writeop && !strcmp(writeop, "index")) { + pData->writeOperation = ES_WRITE_INDEX; + } else if (writeop) { + LogError(0, RS_RET_CONFIG_ERROR, + "omelasticsearch: invalid value '%s' for writeoperation: " + "must be one of 'index' or 'create' - using default value 'index'", writeop); + pData->writeOperation = ES_WRITE_INDEX; + } + free(writeop); + } else if(!strcmp(actpblk.descr[i].name, "retryfailures")) { + pData->retryFailures = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "ratelimit.burst")) { + pData->ratelimitBurst = (unsigned int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "ratelimit.interval")) { + pData->ratelimitInterval = (unsigned int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "retryruleset")) { + pData->retryRulesetName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "rebindinterval")) { + pData->rebindInterval = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "esversion.major")) { + pData->esVersion = pvals[i].val.d.n; + } else { + LogError(0, RS_RET_INTERNAL_ERROR, "omelasticsearch: program error, " + "non-handled param '%s'", actpblk.descr[i].name); + } + } + + if(pData->pwd != NULL && pData->uid == NULL) { + LogError(0, RS_RET_UID_MISSING, + "omelasticsearch: password is provided, but no uid " + "- action definition invalid"); + ABORT_FINALIZE(RS_RET_UID_MISSING); + } + if(pData->dynSrchIdx && pData->searchIndex == NULL) { + LogError(0, RS_RET_CONFIG_ERROR, + "omelasticsearch: requested dynamic search 
index, but no " + "name for index template given - action definition invalid"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } + if(pData->dynSrchType && pData->searchType == NULL) { + LogError(0, RS_RET_CONFIG_ERROR, + "omelasticsearch: requested dynamic search type, but no " + "name for type template given - action definition invalid"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } + if(pData->dynParent && pData->parent == NULL) { + LogError(0, RS_RET_CONFIG_ERROR, + "omelasticsearch: requested dynamic parent, but no " + "name for parent template given - action definition invalid"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } + if(pData->dynBulkId && pData->bulkId == NULL) { + LogError(0, RS_RET_CONFIG_ERROR, + "omelasticsearch: requested dynamic bulkid, but no " + "name for bulkid template given - action definition invalid"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } + if(pData->dynPipelineName && pData->pipelineName == NULL) { + LogError(0, RS_RET_CONFIG_ERROR, + "omelasticsearch: requested dynamic pipeline name, but no " + "name for pipelineName template given - action definition invalid"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } + + if (pData->uid != NULL) + CHKiRet(computeAuthHeader((char*) pData->uid, (char*) pData->pwd, &pData->authBuf)); + + iNumTpls = 1; + if(pData->dynSrchIdx) ++iNumTpls; + if(pData->dynSrchType) ++iNumTpls; + if(pData->dynParent) ++iNumTpls; + if(pData->dynBulkId) ++iNumTpls; + if(pData->dynPipelineName) ++iNumTpls; + DBGPRINTF("omelasticsearch: requesting %d templates\n", iNumTpls); + CODE_STD_STRING_REQUESTnewActInst(iNumTpls) + + CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ? + " StdJSONFmt" : (char*)pData->tplName), + OMSR_NO_RQD_TPL_OPTS)); + + + /* we need to request additional templates. If we have a dynamic search index, + * it will always be string 1. Type may be 1 or 2, depending on whether search + * index is dynamic as well. Rule needs to be followed throughout the module. 
+ */ + iNumTpls = 1; + if(pData->dynSrchIdx) { + CHKiRet(OMSRsetEntry(*ppOMSR, iNumTpls, ustrdup(pData->searchIndex), + OMSR_NO_RQD_TPL_OPTS)); + ++iNumTpls; + } + if(pData->dynSrchType) { + CHKiRet(OMSRsetEntry(*ppOMSR, iNumTpls, ustrdup(pData->searchType), + OMSR_NO_RQD_TPL_OPTS)); + ++iNumTpls; + } + if(pData->dynParent) { + CHKiRet(OMSRsetEntry(*ppOMSR, iNumTpls, ustrdup(pData->parent), + OMSR_NO_RQD_TPL_OPTS)); + ++iNumTpls; + } + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, iNumTpls, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + ++iNumTpls; + } + if(pData->dynPipelineName) { + CHKiRet(OMSRsetEntry(*ppOMSR, iNumTpls, ustrdup(pData->pipelineName), + OMSR_NO_RQD_TPL_OPTS)); + ++iNumTpls; + } + + + if (servers != NULL) { + pData->numServers = servers->nmemb; + pData->serverBaseUrls = malloc(servers->nmemb * sizeof(uchar*)); + if (pData->serverBaseUrls == NULL) { + LogError(0, RS_RET_ERR, "omelasticsearch: unable to allocate buffer " + "for ElasticSearch server configuration."); + ABORT_FINALIZE(RS_RET_ERR); + } + + for(i = 0 ; i < servers->nmemb ; ++i) { + serverParam = es_str2cstr(servers->arr[i], NULL); + if (serverParam == NULL) { + LogError(0, RS_RET_ERR, "omelasticsearch: unable to allocate buffer " + "for ElasticSearch server configuration."); + ABORT_FINALIZE(RS_RET_ERR); + } + /* Remove a trailing slash if it exists */ + const size_t serverParamLastChar = strlen(serverParam)-1; + if (serverParam[serverParamLastChar] == '/') { + serverParam[serverParamLastChar] = '\0'; + } + CHKiRet(computeBaseUrl(serverParam, pData->defaultPort, pData->useHttps, + pData->serverBaseUrls + i)); + free(serverParam); + serverParam = NULL; + } + } else { + LogMsg(0, RS_RET_OK, LOG_WARNING, + "omelasticsearch: No servers specified, using localhost"); + pData->numServers = 1; + pData->serverBaseUrls = malloc(sizeof(uchar*)); + if (pData->serverBaseUrls == NULL) { + LogError(0, RS_RET_ERR, "omelasticsearch: unable to allocate buffer " + "for ElasticSearch server 
configuration."); + ABORT_FINALIZE(RS_RET_ERR); + } + CHKiRet(computeBaseUrl("localhost", pData->defaultPort, pData->useHttps, pData->serverBaseUrls)); + } + + if(pData->esVersion < 8) { + if(pData->searchIndex == NULL) + pData->searchIndex = (uchar*) strdup("system"); + if(pData->searchType == NULL) + pData->searchType = (uchar*) strdup("events"); + + if ((pData->writeOperation != ES_WRITE_INDEX) && (pData->bulkId == NULL)) { + LogError(0, RS_RET_CONFIG_ERROR, + "omelasticsearch: writeoperation '%d' requires bulkid", pData->writeOperation); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } + } + + if (pData->retryFailures) { + CHKiRet(ratelimitNew(&pData->ratelimiter, "omelasticsearch", NULL)); + ratelimitSetLinuxLike(pData->ratelimiter, pData->ratelimitInterval, pData->ratelimitBurst); + ratelimitSetNoTimeCache(pData->ratelimiter); + } + + /* node created, let's add to list of instance configs for the module */ + if(loadModConf->tail == NULL) { + loadModConf->tail = loadModConf->root = pData; + } else { + loadModConf->tail->next = pData; + loadModConf->tail = pData; + } + +CODE_STD_FINALIZERnewActInst + cnfparamvalsDestruct(pvals, &actpblk); + if (serverParam) + free(serverParam); +ENDnewActInst + + +BEGINbeginCnfLoad +CODESTARTbeginCnfLoad + loadModConf = pModConf; + pModConf->pConf = pConf; + pModConf->root = pModConf->tail = NULL; +ENDbeginCnfLoad + + +BEGINendCnfLoad +CODESTARTendCnfLoad + loadModConf = NULL; /* done loading */ +ENDendCnfLoad + + +BEGINcheckCnf + instanceConf_t *inst; +CODESTARTcheckCnf + for(inst = pModConf->root ; inst != NULL ; inst = inst->next) { + ruleset_t *pRuleset; + rsRetVal localRet; + + if (inst->retryRulesetName) { + localRet = ruleset.GetRuleset(pModConf->pConf, &pRuleset, inst->retryRulesetName); + if(localRet == RS_RET_NOT_FOUND) { + LogError(0, localRet, "omelasticsearch: retryruleset '%s' not found - " + "no retry ruleset will be used", inst->retryRulesetName); + } else { + inst->retryRuleset = pRuleset; + } + } + } +ENDcheckCnf + 
+ +BEGINactivateCnf +CODESTARTactivateCnf +ENDactivateCnf + + +BEGINfreeCnf +CODESTARTfreeCnf +ENDfreeCnf + + +BEGINdoHUP +CODESTARTdoHUP + pthread_mutex_lock(&pData->mutErrFile); + if(pData->fdErrFile != -1) { + close(pData->fdErrFile); + pData->fdErrFile = -1; + } + pthread_mutex_unlock(&pData->mutErrFile); +ENDdoHUP + + +BEGINmodExit +CODESTARTmodExit + if(pInputName != NULL) + prop.Destruct(&pInputName); + curl_global_cleanup(); + statsobj.Destruct(&indexStats); + objRelease(statsobj, CORE_COMPONENT); + objRelease(prop, CORE_COMPONENT); + objRelease(ruleset, CORE_COMPONENT); +ENDmodExit + +NO_LEGACY_CONF_parseSelectorAct + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES +CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES +CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES +CODEqueryEtryPt_doHUP +CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */ +CODEqueryEtryPt_STD_CONF2_QUERIES +ENDqueryEtryPt + + +BEGINmodInit() +CODESTARTmodInit + *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ +CODEmodInit_QueryRegCFSLineHdlr + CHKiRet(objUse(statsobj, CORE_COMPONENT)); + CHKiRet(objUse(prop, CORE_COMPONENT)); + CHKiRet(objUse(ruleset, CORE_COMPONENT)); + + if (curl_global_init(CURL_GLOBAL_ALL) != 0) { + LogError(0, RS_RET_OBJ_CREATION_FAILED, "CURL fail. 
-elasticsearch indexing disabled"); + ABORT_FINALIZE(RS_RET_OBJ_CREATION_FAILED); + } + + /* support statistics gathering */ + CHKiRet(statsobj.Construct(&indexStats)); + CHKiRet(statsobj.SetName(indexStats, (uchar *)"omelasticsearch")); + CHKiRet(statsobj.SetOrigin(indexStats, (uchar *)"omelasticsearch")); + STATSCOUNTER_INIT(indexSubmit, mutIndexSubmit); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"submitted", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexSubmit)); + STATSCOUNTER_INIT(indexHTTPFail, mutIndexHTTPFail); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.http", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexHTTPFail)); + STATSCOUNTER_INIT(indexHTTPReqFail, mutIndexHTTPReqFail); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.httprequests", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexHTTPReqFail)); + STATSCOUNTER_INIT(checkConnFail, mutCheckConnFail); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.checkConn", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &checkConnFail)); + STATSCOUNTER_INIT(indexESFail, mutIndexESFail); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.es", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexESFail)); + STATSCOUNTER_INIT(indexSuccess, mutIndexSuccess); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.success", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexSuccess)); + STATSCOUNTER_INIT(indexBadResponse, mutIndexBadResponse); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.bad", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexBadResponse)); + STATSCOUNTER_INIT(indexDuplicate, mutIndexDuplicate); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.duplicate", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexDuplicate)); + STATSCOUNTER_INIT(indexBadArgument, mutIndexBadArgument); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.badargument", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexBadArgument)); + STATSCOUNTER_INIT(indexBulkRejection, 
mutIndexBulkRejection); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.bulkrejection", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexBulkRejection)); + STATSCOUNTER_INIT(indexOtherResponse, mutIndexOtherResponse); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.other", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexOtherResponse)); + STATSCOUNTER_INIT(rebinds, mutRebinds); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"rebinds", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &rebinds)); + CHKiRet(statsobj.ConstructFinalize(indexStats)); + CHKiRet(prop.Construct(&pInputName)); + CHKiRet(prop.SetString(pInputName, UCHAR_CONSTANT("omelasticsearch"), sizeof("omelasticsearch") - 1)); + CHKiRet(prop.ConstructFinalize(pInputName)); +ENDmodInit + +/* vi:set ai: + */ |