summaryrefslogtreecommitdiffstats
path: root/contrib/mmkubernetes
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/mmkubernetes')
-rw-r--r--contrib/mmkubernetes/Makefile.am8
-rw-r--r--contrib/mmkubernetes/Makefile.in800
-rw-r--r--contrib/mmkubernetes/k8s_container_name.rulebase3
-rw-r--r--contrib/mmkubernetes/k8s_filename.rulebase2
-rw-r--r--contrib/mmkubernetes/mmkubernetes.c2085
5 files changed, 2898 insertions, 0 deletions
diff --git a/contrib/mmkubernetes/Makefile.am b/contrib/mmkubernetes/Makefile.am
new file mode 100644
index 0000000..88dca48
--- /dev/null
+++ b/contrib/mmkubernetes/Makefile.am
@@ -0,0 +1,8 @@
+pkglib_LTLIBRARIES = mmkubernetes.la
+
+mmkubernetes_la_SOURCES = mmkubernetes.c
+mmkubernetes_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(CURL_CFLAGS) $(LIBLOGNORM_CFLAGS)
+mmkubernetes_la_LDFLAGS = -module -avoid-version
+mmkubernetes_la_LIBADD = $(CURL_LIBS) $(LIBLOGNORM_LIBS)
+
+EXTRA_DIST = k8s_filename.rulebase k8s_container_name.rulebase
diff --git a/contrib/mmkubernetes/Makefile.in b/contrib/mmkubernetes/Makefile.in
new file mode 100644
index 0000000..983a32f
--- /dev/null
+++ b/contrib/mmkubernetes/Makefile.in
@@ -0,0 +1,800 @@
+# Makefile.in generated by automake 1.16.1 from Makefile.am.
+# @configure_input@
+
+# Copyright (C) 1994-2018 Free Software Foundation, Inc.
+
+# This Makefile.in is free software; the Free Software Foundation
+# gives unlimited permission to copy and/or distribute it,
+# with or without modifications, as long as this notice is preserved.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY, to the extent permitted by law; without
+# even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+# PARTICULAR PURPOSE.
+
+@SET_MAKE@
+
+VPATH = @srcdir@
+am__is_gnu_make = { \
+ if test -z '$(MAKELEVEL)'; then \
+ false; \
+ elif test -n '$(MAKE_HOST)'; then \
+ true; \
+ elif test -n '$(MAKE_VERSION)' && test -n '$(CURDIR)'; then \
+ true; \
+ else \
+ false; \
+ fi; \
+}
+am__make_running_with_option = \
+ case $${target_option-} in \
+ ?) ;; \
+ *) echo "am__make_running_with_option: internal error: invalid" \
+ "target option '$${target_option-}' specified" >&2; \
+ exit 1;; \
+ esac; \
+ has_opt=no; \
+ sane_makeflags=$$MAKEFLAGS; \
+ if $(am__is_gnu_make); then \
+ sane_makeflags=$$MFLAGS; \
+ else \
+ case $$MAKEFLAGS in \
+ *\\[\ \ ]*) \
+ bs=\\; \
+ sane_makeflags=`printf '%s\n' "$$MAKEFLAGS" \
+ | sed "s/$$bs$$bs[$$bs $$bs ]*//g"`;; \
+ esac; \
+ fi; \
+ skip_next=no; \
+ strip_trailopt () \
+ { \
+ flg=`printf '%s\n' "$$flg" | sed "s/$$1.*$$//"`; \
+ }; \
+ for flg in $$sane_makeflags; do \
+ test $$skip_next = yes && { skip_next=no; continue; }; \
+ case $$flg in \
+ *=*|--*) continue;; \
+ -*I) strip_trailopt 'I'; skip_next=yes;; \
+ -*I?*) strip_trailopt 'I';; \
+ -*O) strip_trailopt 'O'; skip_next=yes;; \
+ -*O?*) strip_trailopt 'O';; \
+ -*l) strip_trailopt 'l'; skip_next=yes;; \
+ -*l?*) strip_trailopt 'l';; \
+ -[dEDm]) skip_next=yes;; \
+ -[JT]) skip_next=yes;; \
+ esac; \
+ case $$flg in \
+ *$$target_option*) has_opt=yes; break;; \
+ esac; \
+ done; \
+ test $$has_opt = yes
+am__make_dryrun = (target_option=n; $(am__make_running_with_option))
+am__make_keepgoing = (target_option=k; $(am__make_running_with_option))
+pkgdatadir = $(datadir)/@PACKAGE@
+pkgincludedir = $(includedir)/@PACKAGE@
+pkglibdir = $(libdir)/@PACKAGE@
+pkglibexecdir = $(libexecdir)/@PACKAGE@
+am__cd = CDPATH="$${ZSH_VERSION+.}$(PATH_SEPARATOR)" && cd
+install_sh_DATA = $(install_sh) -c -m 644
+install_sh_PROGRAM = $(install_sh) -c
+install_sh_SCRIPT = $(install_sh) -c
+INSTALL_HEADER = $(INSTALL_DATA)
+transform = $(program_transform_name)
+NORMAL_INSTALL = :
+PRE_INSTALL = :
+POST_INSTALL = :
+NORMAL_UNINSTALL = :
+PRE_UNINSTALL = :
+POST_UNINSTALL = :
+build_triplet = @build@
+host_triplet = @host@
+subdir = contrib/mmkubernetes
+ACLOCAL_M4 = $(top_srcdir)/aclocal.m4
+am__aclocal_m4_deps = $(top_srcdir)/m4/ac_check_define.m4 \
+ $(top_srcdir)/m4/atomic_operations.m4 \
+ $(top_srcdir)/m4/atomic_operations_64bit.m4 \
+ $(top_srcdir)/m4/libtool.m4 $(top_srcdir)/m4/ltoptions.m4 \
+ $(top_srcdir)/m4/ltsugar.m4 $(top_srcdir)/m4/ltversion.m4 \
+ $(top_srcdir)/m4/lt~obsolete.m4 $(top_srcdir)/configure.ac
+am__configure_deps = $(am__aclocal_m4_deps) $(CONFIGURE_DEPENDENCIES) \
+ $(ACLOCAL_M4)
+DIST_COMMON = $(srcdir)/Makefile.am $(am__DIST_COMMON)
+mkinstalldirs = $(install_sh) -d
+CONFIG_HEADER = $(top_builddir)/config.h
+CONFIG_CLEAN_FILES =
+CONFIG_CLEAN_VPATH_FILES =
+am__vpath_adj_setup = srcdirstrip=`echo "$(srcdir)" | sed 's|.|.|g'`;
+am__vpath_adj = case $$p in \
+ $(srcdir)/*) f=`echo "$$p" | sed "s|^$$srcdirstrip/||"`;; \
+ *) f=$$p;; \
+ esac;
+am__strip_dir = f=`echo $$p | sed -e 's|^.*/||'`;
+am__install_max = 40
+am__nobase_strip_setup = \
+ srcdirstrip=`echo "$(srcdir)" | sed 's/[].[^$$\\*|]/\\\\&/g'`
+am__nobase_strip = \
+ for p in $$list; do echo "$$p"; done | sed -e "s|$$srcdirstrip/||"
+am__nobase_list = $(am__nobase_strip_setup); \
+ for p in $$list; do echo "$$p $$p"; done | \
+ sed "s| $$srcdirstrip/| |;"' / .*\//!s/ .*/ ./; s,\( .*\)/[^/]*$$,\1,' | \
+ $(AWK) 'BEGIN { files["."] = "" } { files[$$2] = files[$$2] " " $$1; \
+ if (++n[$$2] == $(am__install_max)) \
+ { print $$2, files[$$2]; n[$$2] = 0; files[$$2] = "" } } \
+ END { for (dir in files) print dir, files[dir] }'
+am__base_list = \
+ sed '$$!N;$$!N;$$!N;$$!N;$$!N;$$!N;$$!N;s/\n/ /g' | \
+ sed '$$!N;$$!N;$$!N;$$!N;s/\n/ /g'
+am__uninstall_files_from_dir = { \
+ test -z "$$files" \
+ || { test ! -d "$$dir" && test ! -f "$$dir" && test ! -r "$$dir"; } \
+ || { echo " ( cd '$$dir' && rm -f" $$files ")"; \
+ $(am__cd) "$$dir" && rm -f $$files; }; \
+ }
+am__installdirs = "$(DESTDIR)$(pkglibdir)"
+LTLIBRARIES = $(pkglib_LTLIBRARIES)
+am__DEPENDENCIES_1 =
+mmkubernetes_la_DEPENDENCIES = $(am__DEPENDENCIES_1) \
+ $(am__DEPENDENCIES_1)
+am_mmkubernetes_la_OBJECTS = mmkubernetes_la-mmkubernetes.lo
+mmkubernetes_la_OBJECTS = $(am_mmkubernetes_la_OBJECTS)
+AM_V_lt = $(am__v_lt_@AM_V@)
+am__v_lt_ = $(am__v_lt_@AM_DEFAULT_V@)
+am__v_lt_0 = --silent
+am__v_lt_1 =
+mmkubernetes_la_LINK = $(LIBTOOL) $(AM_V_lt) --tag=CC \
+ $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=link $(CCLD) \
+ $(AM_CFLAGS) $(CFLAGS) $(mmkubernetes_la_LDFLAGS) $(LDFLAGS) \
+ -o $@
+AM_V_P = $(am__v_P_@AM_V@)
+am__v_P_ = $(am__v_P_@AM_DEFAULT_V@)
+am__v_P_0 = false
+am__v_P_1 = :
+AM_V_GEN = $(am__v_GEN_@AM_V@)
+am__v_GEN_ = $(am__v_GEN_@AM_DEFAULT_V@)
+am__v_GEN_0 = @echo " GEN " $@;
+am__v_GEN_1 =
+AM_V_at = $(am__v_at_@AM_V@)
+am__v_at_ = $(am__v_at_@AM_DEFAULT_V@)
+am__v_at_0 = @
+am__v_at_1 =
+DEFAULT_INCLUDES = -I.@am__isrc@ -I$(top_builddir)
+depcomp = $(SHELL) $(top_srcdir)/depcomp
+am__maybe_remake_depfiles = depfiles
+am__depfiles_remade = ./$(DEPDIR)/mmkubernetes_la-mmkubernetes.Plo
+am__mv = mv -f
+COMPILE = $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) \
+ $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS)
+LTCOMPILE = $(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) \
+ $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) \
+ $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) \
+ $(AM_CFLAGS) $(CFLAGS)
+AM_V_CC = $(am__v_CC_@AM_V@)
+am__v_CC_ = $(am__v_CC_@AM_DEFAULT_V@)
+am__v_CC_0 = @echo " CC " $@;
+am__v_CC_1 =
+CCLD = $(CC)
+LINK = $(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) \
+ $(LIBTOOLFLAGS) --mode=link $(CCLD) $(AM_CFLAGS) $(CFLAGS) \
+ $(AM_LDFLAGS) $(LDFLAGS) -o $@
+AM_V_CCLD = $(am__v_CCLD_@AM_V@)
+am__v_CCLD_ = $(am__v_CCLD_@AM_DEFAULT_V@)
+am__v_CCLD_0 = @echo " CCLD " $@;
+am__v_CCLD_1 =
+SOURCES = $(mmkubernetes_la_SOURCES)
+DIST_SOURCES = $(mmkubernetes_la_SOURCES)
+am__can_run_installinfo = \
+ case $$AM_UPDATE_INFO_DIR in \
+ n|no|NO) false;; \
+ *) (install-info --version) >/dev/null 2>&1;; \
+ esac
+am__tagged_files = $(HEADERS) $(SOURCES) $(TAGS_FILES) $(LISP)
+# Read a list of newline-separated strings from the standard input,
+# and print each of them once, without duplicates. Input order is
+# *not* preserved.
+am__uniquify_input = $(AWK) '\
+ BEGIN { nonempty = 0; } \
+ { items[$$0] = 1; nonempty = 1; } \
+ END { if (nonempty) { for (i in items) print i; }; } \
+'
+# Make sure the list of sources is unique. This is necessary because,
+# e.g., the same source file might be shared among _SOURCES variables
+# for different programs/libraries.
+am__define_uniq_tagged_files = \
+ list='$(am__tagged_files)'; \
+ unique=`for i in $$list; do \
+ if test -f "$$i"; then echo $$i; else echo $(srcdir)/$$i; fi; \
+ done | $(am__uniquify_input)`
+ETAGS = etags
+CTAGS = ctags
+am__DIST_COMMON = $(srcdir)/Makefile.in $(top_srcdir)/depcomp
+DISTFILES = $(DIST_COMMON) $(DIST_SOURCES) $(TEXINFOS) $(EXTRA_DIST)
+ACLOCAL = @ACLOCAL@
+AMTAR = @AMTAR@
+AM_DEFAULT_VERBOSITY = @AM_DEFAULT_VERBOSITY@
+APU_CFLAGS = @APU_CFLAGS@
+APU_LIBS = @APU_LIBS@
+AR = @AR@
+AUTOCONF = @AUTOCONF@
+AUTOHEADER = @AUTOHEADER@
+AUTOMAKE = @AUTOMAKE@
+AWK = @AWK@
+CC = @CC@
+CCDEPMODE = @CCDEPMODE@
+CFLAGS = @CFLAGS@
+CIVETWEB_LIBS = @CIVETWEB_LIBS@
+CONF_FILE_PATH = @CONF_FILE_PATH@
+CPP = @CPP@
+CPPFLAGS = @CPPFLAGS@
+CURL_CFLAGS = @CURL_CFLAGS@
+CURL_LIBS = @CURL_LIBS@
+CYGPATH_W = @CYGPATH_W@
+CZMQ_CFLAGS = @CZMQ_CFLAGS@
+CZMQ_LIBS = @CZMQ_LIBS@
+DEFS = @DEFS@
+DEPDIR = @DEPDIR@
+DLLTOOL = @DLLTOOL@
+DL_LIBS = @DL_LIBS@
+DSYMUTIL = @DSYMUTIL@
+DUMPBIN = @DUMPBIN@
+ECHO_C = @ECHO_C@
+ECHO_N = @ECHO_N@
+ECHO_T = @ECHO_T@
+EGREP = @EGREP@
+EXEEXT = @EXEEXT@
+FAUP_LIBS = @FAUP_LIBS@
+FGREP = @FGREP@
+GLIB_CFLAGS = @GLIB_CFLAGS@
+GLIB_LIBS = @GLIB_LIBS@
+GNUTLS_CFLAGS = @GNUTLS_CFLAGS@
+GNUTLS_LIBS = @GNUTLS_LIBS@
+GREP = @GREP@
+GSS_LIBS = @GSS_LIBS@
+GT_KSI_LS12_CFLAGS = @GT_KSI_LS12_CFLAGS@
+GT_KSI_LS12_LIBS = @GT_KSI_LS12_LIBS@
+HASH_XXHASH_LIBS = @HASH_XXHASH_LIBS@
+HIREDIS_CFLAGS = @HIREDIS_CFLAGS@
+HIREDIS_LIBS = @HIREDIS_LIBS@
+IMUDP_LIBS = @IMUDP_LIBS@
+INSTALL = @INSTALL@
+INSTALL_DATA = @INSTALL_DATA@
+INSTALL_PROGRAM = @INSTALL_PROGRAM@
+INSTALL_SCRIPT = @INSTALL_SCRIPT@
+INSTALL_STRIP_PROGRAM = @INSTALL_STRIP_PROGRAM@
+IP = @IP@
+JAVA = @JAVA@
+JAVAC = @JAVAC@
+LD = @LD@
+LDFLAGS = @LDFLAGS@
+LEX = @LEX@
+LEXLIB = @LEXLIB@
+LEX_OUTPUT_ROOT = @LEX_OUTPUT_ROOT@
+LIBCAPNG_CFLAGS = @LIBCAPNG_CFLAGS@
+LIBCAPNG_LIBS = @LIBCAPNG_LIBS@
+LIBCAPNG_PRESENT_CFLAGS = @LIBCAPNG_PRESENT_CFLAGS@
+LIBCAPNG_PRESENT_LIBS = @LIBCAPNG_PRESENT_LIBS@
+LIBDBI_CFLAGS = @LIBDBI_CFLAGS@
+LIBDBI_LIBS = @LIBDBI_LIBS@
+LIBESTR_CFLAGS = @LIBESTR_CFLAGS@
+LIBESTR_LIBS = @LIBESTR_LIBS@
+LIBEVENT_CFLAGS = @LIBEVENT_CFLAGS@
+LIBEVENT_LIBS = @LIBEVENT_LIBS@
+LIBFASTJSON_CFLAGS = @LIBFASTJSON_CFLAGS@
+LIBFASTJSON_LIBS = @LIBFASTJSON_LIBS@
+LIBGCRYPT_CFLAGS = @LIBGCRYPT_CFLAGS@
+LIBGCRYPT_CONFIG = @LIBGCRYPT_CONFIG@
+LIBGCRYPT_LIBS = @LIBGCRYPT_LIBS@
+LIBLOGGING_CFLAGS = @LIBLOGGING_CFLAGS@
+LIBLOGGING_LIBS = @LIBLOGGING_LIBS@
+LIBLOGGING_STDLOG_CFLAGS = @LIBLOGGING_STDLOG_CFLAGS@
+LIBLOGGING_STDLOG_LIBS = @LIBLOGGING_STDLOG_LIBS@
+LIBLOGNORM_CFLAGS = @LIBLOGNORM_CFLAGS@
+LIBLOGNORM_LIBS = @LIBLOGNORM_LIBS@
+LIBLZ4_CFLAGS = @LIBLZ4_CFLAGS@
+LIBLZ4_LIBS = @LIBLZ4_LIBS@
+LIBM = @LIBM@
+LIBMONGOC_CFLAGS = @LIBMONGOC_CFLAGS@
+LIBMONGOC_LIBS = @LIBMONGOC_LIBS@
+LIBOBJS = @LIBOBJS@
+LIBRDKAFKA_CFLAGS = @LIBRDKAFKA_CFLAGS@
+LIBRDKAFKA_LIBS = @LIBRDKAFKA_LIBS@
+LIBS = @LIBS@
+LIBSYSTEMD_CFLAGS = @LIBSYSTEMD_CFLAGS@
+LIBSYSTEMD_JOURNAL_CFLAGS = @LIBSYSTEMD_JOURNAL_CFLAGS@
+LIBSYSTEMD_JOURNAL_LIBS = @LIBSYSTEMD_JOURNAL_LIBS@
+LIBSYSTEMD_LIBS = @LIBSYSTEMD_LIBS@
+LIBTOOL = @LIBTOOL@
+LIBUUID_CFLAGS = @LIBUUID_CFLAGS@
+LIBUUID_LIBS = @LIBUUID_LIBS@
+LIPO = @LIPO@
+LN_S = @LN_S@
+LTLIBOBJS = @LTLIBOBJS@
+LT_SYS_LIBRARY_PATH = @LT_SYS_LIBRARY_PATH@
+MAKEINFO = @MAKEINFO@
+MANIFEST_TOOL = @MANIFEST_TOOL@
+MKDIR_P = @MKDIR_P@
+MYSQL_CFLAGS = @MYSQL_CFLAGS@
+MYSQL_CONFIG = @MYSQL_CONFIG@
+MYSQL_LIBS = @MYSQL_LIBS@
+NM = @NM@
+NMEDIT = @NMEDIT@
+OBJDUMP = @OBJDUMP@
+OBJEXT = @OBJEXT@
+OPENSSL_CFLAGS = @OPENSSL_CFLAGS@
+OPENSSL_LIBS = @OPENSSL_LIBS@
+OTOOL = @OTOOL@
+OTOOL64 = @OTOOL64@
+PACKAGE = @PACKAGE@
+PACKAGE_BUGREPORT = @PACKAGE_BUGREPORT@
+PACKAGE_NAME = @PACKAGE_NAME@
+PACKAGE_STRING = @PACKAGE_STRING@
+PACKAGE_TARNAME = @PACKAGE_TARNAME@
+PACKAGE_URL = @PACKAGE_URL@
+PACKAGE_VERSION = @PACKAGE_VERSION@
+PATH_SEPARATOR = @PATH_SEPARATOR@
+PGSQL_CFLAGS = @PGSQL_CFLAGS@
+PGSQL_LIBS = @PGSQL_LIBS@
+PG_CONFIG = @PG_CONFIG@
+PID_FILE_PATH = @PID_FILE_PATH@
+PKG_CONFIG = @PKG_CONFIG@
+PKG_CONFIG_LIBDIR = @PKG_CONFIG_LIBDIR@
+PKG_CONFIG_PATH = @PKG_CONFIG_PATH@
+PROTON_CFLAGS = @PROTON_CFLAGS@
+PROTON_LIBS = @PROTON_LIBS@
+PROTON_PROACTOR_CFLAGS = @PROTON_PROACTOR_CFLAGS@
+PROTON_PROACTOR_LIBS = @PROTON_PROACTOR_LIBS@
+PTHREADS_CFLAGS = @PTHREADS_CFLAGS@
+PTHREADS_LIBS = @PTHREADS_LIBS@
+PYTHON = @PYTHON@
+PYTHON_EXEC_PREFIX = @PYTHON_EXEC_PREFIX@
+PYTHON_PLATFORM = @PYTHON_PLATFORM@
+PYTHON_PREFIX = @PYTHON_PREFIX@
+PYTHON_VERSION = @PYTHON_VERSION@
+RABBITMQ_CFLAGS = @RABBITMQ_CFLAGS@
+RABBITMQ_LIBS = @RABBITMQ_LIBS@
+RANLIB = @RANLIB@
+READLINK = @READLINK@
+REDIS = @REDIS@
+RELP_CFLAGS = @RELP_CFLAGS@
+RELP_LIBS = @RELP_LIBS@
+RSRT_CFLAGS = @RSRT_CFLAGS@
+RSRT_CFLAGS1 = @RSRT_CFLAGS1@
+RSRT_LIBS = @RSRT_LIBS@
+RSRT_LIBS1 = @RSRT_LIBS1@
+RST2MAN = @RST2MAN@
+RT_LIBS = @RT_LIBS@
+SED = @SED@
+SET_MAKE = @SET_MAKE@
+SHELL = @SHELL@
+SNMP_CFLAGS = @SNMP_CFLAGS@
+SNMP_LIBS = @SNMP_LIBS@
+SOL_LIBS = @SOL_LIBS@
+STRIP = @STRIP@
+TCL_BIN_DIR = @TCL_BIN_DIR@
+TCL_INCLUDE_SPEC = @TCL_INCLUDE_SPEC@
+TCL_LIB_FILE = @TCL_LIB_FILE@
+TCL_LIB_FLAG = @TCL_LIB_FLAG@
+TCL_LIB_SPEC = @TCL_LIB_SPEC@
+TCL_PATCH_LEVEL = @TCL_PATCH_LEVEL@
+TCL_SRC_DIR = @TCL_SRC_DIR@
+TCL_STUB_LIB_FILE = @TCL_STUB_LIB_FILE@
+TCL_STUB_LIB_FLAG = @TCL_STUB_LIB_FLAG@
+TCL_STUB_LIB_SPEC = @TCL_STUB_LIB_SPEC@
+TCL_VERSION = @TCL_VERSION@
+UDPSPOOF_CFLAGS = @UDPSPOOF_CFLAGS@
+UDPSPOOF_LIBS = @UDPSPOOF_LIBS@
+VALGRIND = @VALGRIND@
+VERSION = @VERSION@
+WARN_CFLAGS = @WARN_CFLAGS@
+WARN_LDFLAGS = @WARN_LDFLAGS@
+WARN_SCANNERFLAGS = @WARN_SCANNERFLAGS@
+WGET = @WGET@
+YACC = @YACC@
+YACC_FOUND = @YACC_FOUND@
+YFLAGS = @YFLAGS@
+ZLIB_CFLAGS = @ZLIB_CFLAGS@
+ZLIB_LIBS = @ZLIB_LIBS@
+ZSTD_CFLAGS = @ZSTD_CFLAGS@
+ZSTD_LIBS = @ZSTD_LIBS@
+abs_builddir = @abs_builddir@
+abs_srcdir = @abs_srcdir@
+abs_top_builddir = @abs_top_builddir@
+abs_top_srcdir = @abs_top_srcdir@
+ac_ct_AR = @ac_ct_AR@
+ac_ct_CC = @ac_ct_CC@
+ac_ct_DUMPBIN = @ac_ct_DUMPBIN@
+am__include = @am__include@
+am__leading_dot = @am__leading_dot@
+am__quote = @am__quote@
+am__tar = @am__tar@
+am__untar = @am__untar@
+bindir = @bindir@
+build = @build@
+build_alias = @build_alias@
+build_cpu = @build_cpu@
+build_os = @build_os@
+build_vendor = @build_vendor@
+builddir = @builddir@
+datadir = @datadir@
+datarootdir = @datarootdir@
+docdir = @docdir@
+dvidir = @dvidir@
+exec_prefix = @exec_prefix@
+host = @host@
+host_alias = @host_alias@
+host_cpu = @host_cpu@
+host_os = @host_os@
+host_vendor = @host_vendor@
+htmldir = @htmldir@
+includedir = @includedir@
+infodir = @infodir@
+install_sh = @install_sh@
+libdir = @libdir@
+libexecdir = @libexecdir@
+localedir = @localedir@
+localstatedir = @localstatedir@
+mandir = @mandir@
+mkdir_p = @mkdir_p@
+moddirs = @moddirs@
+oldincludedir = @oldincludedir@
+pdfdir = @pdfdir@
+pkgpyexecdir = @pkgpyexecdir@
+pkgpythondir = @pkgpythondir@
+prefix = @prefix@
+program_transform_name = @program_transform_name@
+psdir = @psdir@
+pyexecdir = @pyexecdir@
+pythondir = @pythondir@
+runstatedir = @runstatedir@
+sbindir = @sbindir@
+sharedstatedir = @sharedstatedir@
+srcdir = @srcdir@
+sysconfdir = @sysconfdir@
+target_alias = @target_alias@
+top_build_prefix = @top_build_prefix@
+top_builddir = @top_builddir@
+top_srcdir = @top_srcdir@
+pkglib_LTLIBRARIES = mmkubernetes.la
+mmkubernetes_la_SOURCES = mmkubernetes.c
+mmkubernetes_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(CURL_CFLAGS) $(LIBLOGNORM_CFLAGS)
+mmkubernetes_la_LDFLAGS = -module -avoid-version
+mmkubernetes_la_LIBADD = $(CURL_LIBS) $(LIBLOGNORM_LIBS)
+EXTRA_DIST = k8s_filename.rulebase k8s_container_name.rulebase
+all: all-am
+
+.SUFFIXES:
+.SUFFIXES: .c .lo .o .obj
+$(srcdir)/Makefile.in: $(srcdir)/Makefile.am $(am__configure_deps)
+ @for dep in $?; do \
+ case '$(am__configure_deps)' in \
+ *$$dep*) \
+ ( cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh ) \
+ && { if test -f $@; then exit 0; else break; fi; }; \
+ exit 1;; \
+ esac; \
+ done; \
+ echo ' cd $(top_srcdir) && $(AUTOMAKE) --gnu contrib/mmkubernetes/Makefile'; \
+ $(am__cd) $(top_srcdir) && \
+ $(AUTOMAKE) --gnu contrib/mmkubernetes/Makefile
+Makefile: $(srcdir)/Makefile.in $(top_builddir)/config.status
+ @case '$?' in \
+ *config.status*) \
+ cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh;; \
+ *) \
+ echo ' cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__maybe_remake_depfiles)'; \
+ cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__maybe_remake_depfiles);; \
+ esac;
+
+$(top_builddir)/config.status: $(top_srcdir)/configure $(CONFIG_STATUS_DEPENDENCIES)
+ cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh
+
+$(top_srcdir)/configure: $(am__configure_deps)
+ cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh
+$(ACLOCAL_M4): $(am__aclocal_m4_deps)
+ cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh
+$(am__aclocal_m4_deps):
+
+install-pkglibLTLIBRARIES: $(pkglib_LTLIBRARIES)
+ @$(NORMAL_INSTALL)
+ @list='$(pkglib_LTLIBRARIES)'; test -n "$(pkglibdir)" || list=; \
+ list2=; for p in $$list; do \
+ if test -f $$p; then \
+ list2="$$list2 $$p"; \
+ else :; fi; \
+ done; \
+ test -z "$$list2" || { \
+ echo " $(MKDIR_P) '$(DESTDIR)$(pkglibdir)'"; \
+ $(MKDIR_P) "$(DESTDIR)$(pkglibdir)" || exit 1; \
+ echo " $(LIBTOOL) $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=install $(INSTALL) $(INSTALL_STRIP_FLAG) $$list2 '$(DESTDIR)$(pkglibdir)'"; \
+ $(LIBTOOL) $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=install $(INSTALL) $(INSTALL_STRIP_FLAG) $$list2 "$(DESTDIR)$(pkglibdir)"; \
+ }
+
+uninstall-pkglibLTLIBRARIES:
+ @$(NORMAL_UNINSTALL)
+ @list='$(pkglib_LTLIBRARIES)'; test -n "$(pkglibdir)" || list=; \
+ for p in $$list; do \
+ $(am__strip_dir) \
+ echo " $(LIBTOOL) $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=uninstall rm -f '$(DESTDIR)$(pkglibdir)/$$f'"; \
+ $(LIBTOOL) $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=uninstall rm -f "$(DESTDIR)$(pkglibdir)/$$f"; \
+ done
+
+clean-pkglibLTLIBRARIES:
+ -test -z "$(pkglib_LTLIBRARIES)" || rm -f $(pkglib_LTLIBRARIES)
+ @list='$(pkglib_LTLIBRARIES)'; \
+ locs=`for p in $$list; do echo $$p; done | \
+ sed 's|^[^/]*$$|.|; s|/[^/]*$$||; s|$$|/so_locations|' | \
+ sort -u`; \
+ test -z "$$locs" || { \
+ echo rm -f $${locs}; \
+ rm -f $${locs}; \
+ }
+
+mmkubernetes.la: $(mmkubernetes_la_OBJECTS) $(mmkubernetes_la_DEPENDENCIES) $(EXTRA_mmkubernetes_la_DEPENDENCIES)
+ $(AM_V_CCLD)$(mmkubernetes_la_LINK) -rpath $(pkglibdir) $(mmkubernetes_la_OBJECTS) $(mmkubernetes_la_LIBADD) $(LIBS)
+
+mostlyclean-compile:
+ -rm -f *.$(OBJEXT)
+
+distclean-compile:
+ -rm -f *.tab.c
+
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/mmkubernetes_la-mmkubernetes.Plo@am__quote@ # am--include-marker
+
+$(am__depfiles_remade):
+ @$(MKDIR_P) $(@D)
+ @echo '# dummy' >$@-t && $(am__mv) $@-t $@
+
+am--depfiles: $(am__depfiles_remade)
+
+.c.o:
+@am__fastdepCC_TRUE@ $(AM_V_CC)depbase=`echo $@ | sed 's|[^/]*$$|$(DEPDIR)/&|;s|\.o$$||'`;\
+@am__fastdepCC_TRUE@ $(COMPILE) -MT $@ -MD -MP -MF $$depbase.Tpo -c -o $@ $< &&\
+@am__fastdepCC_TRUE@ $(am__mv) $$depbase.Tpo $$depbase.Po
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ $(AM_V_CC)source='$<' object='$@' libtool=no @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
+@am__fastdepCC_FALSE@ $(AM_V_CC@am__nodep@)$(COMPILE) -c -o $@ $<
+
+.c.obj:
+@am__fastdepCC_TRUE@ $(AM_V_CC)depbase=`echo $@ | sed 's|[^/]*$$|$(DEPDIR)/&|;s|\.obj$$||'`;\
+@am__fastdepCC_TRUE@ $(COMPILE) -MT $@ -MD -MP -MF $$depbase.Tpo -c -o $@ `$(CYGPATH_W) '$<'` &&\
+@am__fastdepCC_TRUE@ $(am__mv) $$depbase.Tpo $$depbase.Po
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ $(AM_V_CC)source='$<' object='$@' libtool=no @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
+@am__fastdepCC_FALSE@ $(AM_V_CC@am__nodep@)$(COMPILE) -c -o $@ `$(CYGPATH_W) '$<'`
+
+.c.lo:
+@am__fastdepCC_TRUE@ $(AM_V_CC)depbase=`echo $@ | sed 's|[^/]*$$|$(DEPDIR)/&|;s|\.lo$$||'`;\
+@am__fastdepCC_TRUE@ $(LTCOMPILE) -MT $@ -MD -MP -MF $$depbase.Tpo -c -o $@ $< &&\
+@am__fastdepCC_TRUE@ $(am__mv) $$depbase.Tpo $$depbase.Plo
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ $(AM_V_CC)source='$<' object='$@' libtool=yes @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
+@am__fastdepCC_FALSE@ $(AM_V_CC@am__nodep@)$(LTCOMPILE) -c -o $@ $<
+
+mmkubernetes_la-mmkubernetes.lo: mmkubernetes.c
+@am__fastdepCC_TRUE@ $(AM_V_CC)$(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(mmkubernetes_la_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT mmkubernetes_la-mmkubernetes.lo -MD -MP -MF $(DEPDIR)/mmkubernetes_la-mmkubernetes.Tpo -c -o mmkubernetes_la-mmkubernetes.lo `test -f 'mmkubernetes.c' || echo '$(srcdir)/'`mmkubernetes.c
+@am__fastdepCC_TRUE@ $(AM_V_at)$(am__mv) $(DEPDIR)/mmkubernetes_la-mmkubernetes.Tpo $(DEPDIR)/mmkubernetes_la-mmkubernetes.Plo
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ $(AM_V_CC)source='mmkubernetes.c' object='mmkubernetes_la-mmkubernetes.lo' libtool=yes @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
+@am__fastdepCC_FALSE@ $(AM_V_CC@am__nodep@)$(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(mmkubernetes_la_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o mmkubernetes_la-mmkubernetes.lo `test -f 'mmkubernetes.c' || echo '$(srcdir)/'`mmkubernetes.c
+
+mostlyclean-libtool:
+ -rm -f *.lo
+
+clean-libtool:
+ -rm -rf .libs _libs
+
+ID: $(am__tagged_files)
+ $(am__define_uniq_tagged_files); mkid -fID $$unique
+tags: tags-am
+TAGS: tags
+
+tags-am: $(TAGS_DEPENDENCIES) $(am__tagged_files)
+ set x; \
+ here=`pwd`; \
+ $(am__define_uniq_tagged_files); \
+ shift; \
+ if test -z "$(ETAGS_ARGS)$$*$$unique"; then :; else \
+ test -n "$$unique" || unique=$$empty_fix; \
+ if test $$# -gt 0; then \
+ $(ETAGS) $(ETAGSFLAGS) $(AM_ETAGSFLAGS) $(ETAGS_ARGS) \
+ "$$@" $$unique; \
+ else \
+ $(ETAGS) $(ETAGSFLAGS) $(AM_ETAGSFLAGS) $(ETAGS_ARGS) \
+ $$unique; \
+ fi; \
+ fi
+ctags: ctags-am
+
+CTAGS: ctags
+ctags-am: $(TAGS_DEPENDENCIES) $(am__tagged_files)
+ $(am__define_uniq_tagged_files); \
+ test -z "$(CTAGS_ARGS)$$unique" \
+ || $(CTAGS) $(CTAGSFLAGS) $(AM_CTAGSFLAGS) $(CTAGS_ARGS) \
+ $$unique
+
+GTAGS:
+ here=`$(am__cd) $(top_builddir) && pwd` \
+ && $(am__cd) $(top_srcdir) \
+ && gtags -i $(GTAGS_ARGS) "$$here"
+cscopelist: cscopelist-am
+
+cscopelist-am: $(am__tagged_files)
+ list='$(am__tagged_files)'; \
+ case "$(srcdir)" in \
+ [\\/]* | ?:[\\/]*) sdir="$(srcdir)" ;; \
+ *) sdir=$(subdir)/$(srcdir) ;; \
+ esac; \
+ for i in $$list; do \
+ if test -f "$$i"; then \
+ echo "$(subdir)/$$i"; \
+ else \
+ echo "$$sdir/$$i"; \
+ fi; \
+ done >> $(top_builddir)/cscope.files
+
+distclean-tags:
+ -rm -f TAGS ID GTAGS GRTAGS GSYMS GPATH tags
+
+distdir: $(BUILT_SOURCES)
+ $(MAKE) $(AM_MAKEFLAGS) distdir-am
+
+distdir-am: $(DISTFILES)
+ @srcdirstrip=`echo "$(srcdir)" | sed 's/[].[^$$\\*]/\\\\&/g'`; \
+ topsrcdirstrip=`echo "$(top_srcdir)" | sed 's/[].[^$$\\*]/\\\\&/g'`; \
+ list='$(DISTFILES)'; \
+ dist_files=`for file in $$list; do echo $$file; done | \
+ sed -e "s|^$$srcdirstrip/||;t" \
+ -e "s|^$$topsrcdirstrip/|$(top_builddir)/|;t"`; \
+ case $$dist_files in \
+ */*) $(MKDIR_P) `echo "$$dist_files" | \
+ sed '/\//!d;s|^|$(distdir)/|;s,/[^/]*$$,,' | \
+ sort -u` ;; \
+ esac; \
+ for file in $$dist_files; do \
+ if test -f $$file || test -d $$file; then d=.; else d=$(srcdir); fi; \
+ if test -d $$d/$$file; then \
+ dir=`echo "/$$file" | sed -e 's,/[^/]*$$,,'`; \
+ if test -d "$(distdir)/$$file"; then \
+ find "$(distdir)/$$file" -type d ! -perm -700 -exec chmod u+rwx {} \;; \
+ fi; \
+ if test -d $(srcdir)/$$file && test $$d != $(srcdir); then \
+ cp -fpR $(srcdir)/$$file "$(distdir)$$dir" || exit 1; \
+ find "$(distdir)/$$file" -type d ! -perm -700 -exec chmod u+rwx {} \;; \
+ fi; \
+ cp -fpR $$d/$$file "$(distdir)$$dir" || exit 1; \
+ else \
+ test -f "$(distdir)/$$file" \
+ || cp -p $$d/$$file "$(distdir)/$$file" \
+ || exit 1; \
+ fi; \
+ done
+check-am: all-am
+check: check-am
+all-am: Makefile $(LTLIBRARIES)
+installdirs:
+ for dir in "$(DESTDIR)$(pkglibdir)"; do \
+ test -z "$$dir" || $(MKDIR_P) "$$dir"; \
+ done
+install: install-am
+install-exec: install-exec-am
+install-data: install-data-am
+uninstall: uninstall-am
+
+install-am: all-am
+ @$(MAKE) $(AM_MAKEFLAGS) install-exec-am install-data-am
+
+installcheck: installcheck-am
+install-strip:
+ if test -z '$(STRIP)'; then \
+ $(MAKE) $(AM_MAKEFLAGS) INSTALL_PROGRAM="$(INSTALL_STRIP_PROGRAM)" \
+ install_sh_PROGRAM="$(INSTALL_STRIP_PROGRAM)" INSTALL_STRIP_FLAG=-s \
+ install; \
+ else \
+ $(MAKE) $(AM_MAKEFLAGS) INSTALL_PROGRAM="$(INSTALL_STRIP_PROGRAM)" \
+ install_sh_PROGRAM="$(INSTALL_STRIP_PROGRAM)" INSTALL_STRIP_FLAG=-s \
+ "INSTALL_PROGRAM_ENV=STRIPPROG='$(STRIP)'" install; \
+ fi
+mostlyclean-generic:
+
+clean-generic:
+
+distclean-generic:
+ -test -z "$(CONFIG_CLEAN_FILES)" || rm -f $(CONFIG_CLEAN_FILES)
+ -test . = "$(srcdir)" || test -z "$(CONFIG_CLEAN_VPATH_FILES)" || rm -f $(CONFIG_CLEAN_VPATH_FILES)
+
+maintainer-clean-generic:
+ @echo "This command is intended for maintainers to use"
+ @echo "it deletes files that may require special tools to rebuild."
+clean: clean-am
+
+clean-am: clean-generic clean-libtool clean-pkglibLTLIBRARIES \
+ mostlyclean-am
+
+distclean: distclean-am
+ -rm -f ./$(DEPDIR)/mmkubernetes_la-mmkubernetes.Plo
+ -rm -f Makefile
+distclean-am: clean-am distclean-compile distclean-generic \
+ distclean-tags
+
+dvi: dvi-am
+
+dvi-am:
+
+html: html-am
+
+html-am:
+
+info: info-am
+
+info-am:
+
+install-data-am:
+
+install-dvi: install-dvi-am
+
+install-dvi-am:
+
+install-exec-am: install-pkglibLTLIBRARIES
+
+install-html: install-html-am
+
+install-html-am:
+
+install-info: install-info-am
+
+install-info-am:
+
+install-man:
+
+install-pdf: install-pdf-am
+
+install-pdf-am:
+
+install-ps: install-ps-am
+
+install-ps-am:
+
+installcheck-am:
+
+maintainer-clean: maintainer-clean-am
+ -rm -f ./$(DEPDIR)/mmkubernetes_la-mmkubernetes.Plo
+ -rm -f Makefile
+maintainer-clean-am: distclean-am maintainer-clean-generic
+
+mostlyclean: mostlyclean-am
+
+mostlyclean-am: mostlyclean-compile mostlyclean-generic \
+ mostlyclean-libtool
+
+pdf: pdf-am
+
+pdf-am:
+
+ps: ps-am
+
+ps-am:
+
+uninstall-am: uninstall-pkglibLTLIBRARIES
+
+.MAKE: install-am install-strip
+
+.PHONY: CTAGS GTAGS TAGS all all-am am--depfiles check check-am clean \
+ clean-generic clean-libtool clean-pkglibLTLIBRARIES \
+ cscopelist-am ctags ctags-am distclean distclean-compile \
+ distclean-generic distclean-libtool distclean-tags distdir dvi \
+ dvi-am html html-am info info-am install install-am \
+ install-data install-data-am install-dvi install-dvi-am \
+ install-exec install-exec-am install-html install-html-am \
+ install-info install-info-am install-man install-pdf \
+ install-pdf-am install-pkglibLTLIBRARIES install-ps \
+ install-ps-am install-strip installcheck installcheck-am \
+ installdirs maintainer-clean maintainer-clean-generic \
+ mostlyclean mostlyclean-compile mostlyclean-generic \
+ mostlyclean-libtool pdf pdf-am ps ps-am tags tags-am uninstall \
+ uninstall-am uninstall-pkglibLTLIBRARIES
+
+.PRECIOUS: Makefile
+
+
+# Tell versions [3.59,3.63) of GNU make to not export all variables.
+# Otherwise a system limit (for SysV at least) may be exceeded.
+.NOEXPORT:
diff --git a/contrib/mmkubernetes/k8s_container_name.rulebase b/contrib/mmkubernetes/k8s_container_name.rulebase
new file mode 100644
index 0000000..1fe7373
--- /dev/null
+++ b/contrib/mmkubernetes/k8s_container_name.rulebase
@@ -0,0 +1,3 @@
+version=2
+rule=:%k8s_prefix:char-to:_%_%container_name:char-to:.%.%container_hash:char-to:_%_%pod_name:char-to:_%_%namespace_name:char-to:_%_%not_used_1:char-to:_%_%not_used_2:rest%
+rule=:%k8s_prefix:char-to:_%_%container_name:char-to:_%_%pod_name:char-to:_%_%namespace_name:char-to:_%_%not_used_1:char-to:_%_%not_used_2:rest%
diff --git a/contrib/mmkubernetes/k8s_filename.rulebase b/contrib/mmkubernetes/k8s_filename.rulebase
new file mode 100644
index 0000000..a7b2cac
--- /dev/null
+++ b/contrib/mmkubernetes/k8s_filename.rulebase
@@ -0,0 +1,2 @@
+version=2
+rule=:/var/log/containers/%pod_name:char-to:_%_%namespace_name:char-to:_%_%container_name_and_id:char-to:.%.log
diff --git a/contrib/mmkubernetes/mmkubernetes.c b/contrib/mmkubernetes/mmkubernetes.c
new file mode 100644
index 0000000..525962e
--- /dev/null
+++ b/contrib/mmkubernetes/mmkubernetes.c
@@ -0,0 +1,2085 @@
+/* mmkubernetes.c
+ * This is a message modification module. It uses metadata obtained
+ * from the message to query Kubernetes and obtain additional metadata
+ * relating to the container instance.
+ *
+ * Inspired by:
+ * https://github.com/fabric8io/fluent-plugin-kubernetes_metadata_filter
+ *
+ * NOTE: read comments in module-template.h for details on the calling interface!
+ *
+ * Copyright 2016 Red Hat Inc.
+ *
+ * This file is part of rsyslog.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * -or-
+ * see COPYING.ASL20 in the source distribution
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* needed for asprintf */
+#ifndef _GNU_SOURCE
+# define _GNU_SOURCE
+#endif
+
+#include "config.h"
+#include "rsyslog.h"
+#include <stdio.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <errno.h>
+#include <unistd.h>
+#include <sys/stat.h>
+#include <libestr.h>
+#include <liblognorm.h>
+#include <json.h>
+#include <curl/curl.h>
+#include <curl/easy.h>
+#include <pthread.h>
+#include "conf.h"
+#include "syslogd-types.h"
+#include "module-template.h"
+#include "errmsg.h"
+#include "statsobj.h"
+#include "regexp.h"
+#include "hashtable.h"
+#include "hashtable_itr.h"
+#include "srUtils.h"
+#include "unicode-helper.h"
+#include "datetime.h"
+
+/* static data */
+MODULE_TYPE_OUTPUT /* this is technically an output plugin */
+MODULE_TYPE_KEEP /* releasing the module would cause a leak through libcurl */
+MODULE_CNFNAME("mmkubernetes")
+DEF_OMOD_STATIC_DATA
+DEFobjCurrIf(regexp)
+DEFobjCurrIf(statsobj)
+DEFobjCurrIf(datetime)
+
+#define HAVE_LOADSAMPLESFROMSTRING 1
+#if defined(NO_LOADSAMPLESFROMSTRING)
+#undef HAVE_LOADSAMPLESFROMSTRING
+#endif
+/* original from fluentd plugin:
+ * 'var\.log\.containers\.(?<pod_name>[a-z0-9]([-a-z0-9]*[a-z0-9])?\
+ * (\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?<namespace>[^_]+)_\
+ * (?<container_name>.+)-(?<docker_id>[a-z0-9]{64})\.log$'
+ * this is for _tag_ match, not actual filename match - in_tail turns filename
+ * into a fluentd tag
+ */
+#define DFLT_FILENAME_LNRULES "rule=:/var/log/containers/%pod_name:char-to:_%_"\
+ "%namespace_name:char-to:_%_%container_name_and_id:char-to:.%.log"
+#define DFLT_FILENAME_RULEBASE "/etc/rsyslog.d/k8s_filename.rulebase"
+/* original from fluentd plugin:
+ * '^(?<name_prefix>[^_]+)_(?<container_name>[^\._]+)\
+ * (\.(?<container_hash>[^_]+))?_(?<pod_name>[^_]+)_\
+ * (?<namespace>[^_]+)_[^_]+_[^_]+$'
+ */
+#define DFLT_CONTAINER_LNRULES "rule=:%k8s_prefix:char-to:_%_%container_name:char-to:.%."\
+ "%container_hash:char-to:_%_"\
+ "%pod_name:char-to:_%_%namespace_name:char-to:_%_%not_used_1:char-to:_%_%not_used_2:rest%\n"\
+ "rule=:%k8s_prefix:char-to:_%_%container_name:char-to:_%_"\
+ "%pod_name:char-to:_%_%namespace_name:char-to:_%_%not_used_1:char-to:_%_%not_used_2:rest%"
+#define DFLT_CONTAINER_RULEBASE "/etc/rsyslog.d/k8s_container_name.rulebase"
+#define DFLT_SRCMD_PATH "$!metadata!filename"
+#define DFLT_DSTMD_PATH "$!"
+#define DFLT_DE_DOT 1 /* true */
+#define DFLT_DE_DOT_SEPARATOR "_"
+#define DFLT_CONTAINER_NAME "$!CONTAINER_NAME" /* name of variable holding CONTAINER_NAME value */
+#define DFLT_CONTAINER_ID_FULL "$!CONTAINER_ID_FULL" /* name of variable holding CONTAINER_ID_FULL value */
+#define DFLT_KUBERNETES_URL "https://kubernetes.default.svc.cluster.local:443"
+#define DFLT_BUSY_RETRY_INTERVAL 5 /* retry every 5 seconds */
+#define DFLT_SSL_PARTIAL_CHAIN 0 /* disallow X509_V_FLAG_PARTIAL_CHAIN by default */
+#define DFLT_CACHE_ENTRY_TTL 3600 /* delete entries from the cache older than 3600 seconds */
+#define DFLT_CACHE_EXPIRE_INTERVAL -1 /* delete all expired entries from the cache every N seconds
+ -1 disables cache expiration/ttl checking
+ 0 means - run cache expiration for every record */
+
+/* only support setting the partial chain flag on openssl platforms that have the define */
+#if defined(ENABLE_OPENSSL) && defined(X509_V_FLAG_PARTIAL_CHAIN)
+#define SUPPORT_SSL_PARTIAL_CHAIN 1
+#endif
+
+struct cache_entry_s {
+ time_t ttl; /* when this entry should expire */
+ void *data; /* the user data */
+};
+
+static struct cache_s {
+ const uchar *kbUrl;
+ struct hashtable *mdHt;
+ struct hashtable *nsHt;
+ pthread_mutex_t *cacheMtx;
+ int lastBusyTime; /* when we got the last busy response from kubernetes */
+ time_t expirationTime; /* if cache expiration checking is enable, time to check for expiration */
+} **caches;
+
+typedef struct {
+ int nmemb;
+ uchar **patterns;
+ regex_t *regexps;
+} annotation_match_t;
+
+/* module configuration data */
+struct modConfData_s {
+ rsconf_t *pConf; /* our overall config object */
+ uchar *kubernetesUrl; /* scheme, host, port, and optional path prefix for Kubernetes API lookups */
+ uchar *srcMetadataPath; /* where to get data for kubernetes queries */
+ uchar *dstMetadataPath; /* where to put metadata obtained from kubernetes */
+ uchar *caCertFile; /* File holding the CA cert (+optional chain) of CA that issued the Kubernetes server cert */
+ uchar *myCertFile; /* File holding cert corresponding to private key used for client cert auth */
+ uchar *myPrivKeyFile; /* File holding private key corresponding to cert used for client cert auth */
+ sbool allowUnsignedCerts; /* For testing/debugging - do not check for CA certs (CURLOPT_SSL_VERIFYPEER FALSE) */
+ sbool skipVerifyHost; /* For testing/debugging - skip cert hostname verify (CURLOPT_SSL_VERIFYHOST FALSE) */
+ uchar *token; /* The token value to use to authenticate to Kubernetes - takes precedence over tokenFile */
+ uchar *tokenFile; /* The file whose contents is the token value to use to authenticate to Kubernetes */
+ sbool de_dot; /* If true (default), convert '.' characters in labels & annotations to de_dot_separator */
+ uchar *de_dot_separator; /* separator character (default '_') to use for de_dotting */
+ size_t de_dot_separator_len; /* length of separator character */
+ annotation_match_t annotation_match; /* annotation keys must match these to be included in record */
+ char *fnRules; /* lognorm rules for container log filename match */
+ uchar *fnRulebase; /* lognorm rulebase filename for container log filename match */
+ char *contRules; /* lognorm rules for CONTAINER_NAME value match */
+ uchar *contRulebase; /* lognorm rulebase filename for CONTAINER_NAME value match */
+ int busyRetryInterval; /* how to handle 429 response - 0 means error, non-zero means retry every N seconds */
+ sbool sslPartialChain; /* if true, allow using intermediate certs without root certs */
+ int cacheEntryTTL; /* delete entries from the cache if they are older than this many seconds */
+ int cacheExpireInterval; /* delete all expired entries from the cache every this many seconds */
+};
+
+/* action (instance) configuration data */
+typedef struct _instanceData {
+ uchar *kubernetesUrl; /* scheme, host, port, and optional path prefix for Kubernetes API lookups */
+ msgPropDescr_t *srcMetadataDescr; /* where to get data for kubernetes queries */
+ uchar *dstMetadataPath; /* where to put metadata obtained from kubernetes */
+ uchar *caCertFile; /* File holding the CA cert (+optional chain) of CA that issued the Kubernetes server cert */
+ uchar *myCertFile; /* File holding cert corresponding to private key used for client cert auth */
+ uchar *myPrivKeyFile; /* File holding private key corresponding to cert used for client cert auth */
+ sbool allowUnsignedCerts; /* For testing/debugging - do not check for CA certs (CURLOPT_SSL_VERIFYPEER FALSE) */
+ sbool skipVerifyHost; /* For testing/debugging - skip cert hostname verify (CURLOPT_SSL_VERIFYHOST FALSE) */
+ uchar *token; /* The token value to use to authenticate to Kubernetes - takes precedence over tokenFile */
+ uchar *tokenFile; /* The file whose contents is the token value to use to authenticate to Kubernetes */
+ sbool de_dot; /* If true (default), convert '.' characters in labels & annotations to de_dot_separator */
+ uchar *de_dot_separator; /* separator character (default '_') to use for de_dotting */
+ size_t de_dot_separator_len; /* length of separator character */
+ annotation_match_t annotation_match; /* annotation keys must match these to be included in record */
+ char *fnRules; /* lognorm rules for container log filename match */
+ uchar *fnRulebase; /* lognorm rulebase filename for container log filename match */
+ ln_ctx fnCtxln; /**< context to be used for liblognorm */
+ char *contRules; /* lognorm rules for CONTAINER_NAME value match */
+ uchar *contRulebase; /* lognorm rulebase filename for CONTAINER_NAME value match */
+ ln_ctx contCtxln; /**< context to be used for liblognorm */
+ msgPropDescr_t *contNameDescr; /* CONTAINER_NAME field */
+ msgPropDescr_t *contIdFullDescr; /* CONTAINER_ID_FULL field */
+ struct cache_s *cache;
+ int busyRetryInterval; /* how to handle 429 response - 0 means error, non-zero means retry every N seconds */
+ sbool sslPartialChain; /* if true, allow using intermediate certs without root certs */
+ int cacheEntryTTL; /* delete entries from the cache if they are older than this many seconds */
+ int cacheExpireInterval; /* delete all expired entries from the cache every this many seconds */
+} instanceData;
+
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+ CURL *curlCtx;
+ struct curl_slist *curlHdr;
+ char *curlRply;
+ size_t curlRplyLen;
+ statsobj_t *stats; /* stats for this instance */
+ STATSCOUNTER_DEF(k8sRecordSeen, mutK8sRecordSeen);
+ STATSCOUNTER_DEF(namespaceMetadataSuccess, mutNamespaceMetadataSuccess);
+ STATSCOUNTER_DEF(namespaceMetadataNotFound, mutNamespaceMetadataNotFound);
+ STATSCOUNTER_DEF(namespaceMetadataBusy, mutNamespaceMetadataBusy);
+ STATSCOUNTER_DEF(namespaceMetadataError, mutNamespaceMetadataError);
+ STATSCOUNTER_DEF(podMetadataSuccess, mutPodMetadataSuccess);
+ STATSCOUNTER_DEF(podMetadataNotFound, mutPodMetadataNotFound);
+ STATSCOUNTER_DEF(podMetadataBusy, mutPodMetadataBusy);
+ STATSCOUNTER_DEF(podMetadataError, mutPodMetadataError);
+ STATSCOUNTER_DEF(podCacheNumEntries, mutPodCacheNumEntries);
+ STATSCOUNTER_DEF(namespaceCacheNumEntries, mutNamespaceCacheNumEntries);
+ STATSCOUNTER_DEF(podCacheHits, mutPodCacheHits);
+ STATSCOUNTER_DEF(namespaceCacheHits, mutNamespaceCacheHits);
+ /* cache misses should correspond to metadata success, busy, etc. k8s api calls */
+ STATSCOUNTER_DEF(podCacheMisses, mutPodCacheMisses);
+ STATSCOUNTER_DEF(namespaceCacheMisses, mutNamespaceCacheMisses);
+} wrkrInstanceData_t;
+
+/* module parameters (v6 config format) */
+static struct cnfparamdescr modpdescr[] = {
+ { "kubernetesurl", eCmdHdlrString, 0 },
+ { "srcmetadatapath", eCmdHdlrString, 0 },
+ { "dstmetadatapath", eCmdHdlrString, 0 },
+ { "tls.cacert", eCmdHdlrString, 0 },
+ { "tls.mycert", eCmdHdlrString, 0 },
+ { "tls.myprivkey", eCmdHdlrString, 0 },
+ { "allowunsignedcerts", eCmdHdlrBinary, 0 },
+ { "skipverifyhost", eCmdHdlrBinary, 0 },
+ { "token", eCmdHdlrString, 0 },
+ { "tokenfile", eCmdHdlrString, 0 },
+ { "annotation_match", eCmdHdlrArray, 0 },
+ { "de_dot", eCmdHdlrBinary, 0 },
+ { "de_dot_separator", eCmdHdlrString, 0 },
+ { "filenamerulebase", eCmdHdlrString, 0 },
+ { "containerrulebase", eCmdHdlrString, 0 },
+ { "busyretryinterval", eCmdHdlrInt, 0 },
+ { "sslpartialchain", eCmdHdlrBinary, 0 },
+ { "cacheentryttl", eCmdHdlrInt, 0 },
+ { "cacheexpireinterval", eCmdHdlrInt, 0 }
+#if HAVE_LOADSAMPLESFROMSTRING == 1
+ ,
+ { "filenamerules", eCmdHdlrArray, 0 },
+ { "containerrules", eCmdHdlrArray, 0 }
+#endif
+};
+static struct cnfparamblk modpblk = {
+ CNFPARAMBLK_VERSION,
+ sizeof(modpdescr)/sizeof(struct cnfparamdescr),
+ modpdescr
+};
+
+/* action (instance) parameters (v6 config format) */
+static struct cnfparamdescr actpdescr[] = {
+ { "kubernetesurl", eCmdHdlrString, 0 },
+ { "srcmetadatapath", eCmdHdlrString, 0 },
+ { "dstmetadatapath", eCmdHdlrString, 0 },
+ { "tls.cacert", eCmdHdlrString, 0 },
+ { "tls.mycert", eCmdHdlrString, 0 },
+ { "tls.myprivkey", eCmdHdlrString, 0 },
+ { "allowunsignedcerts", eCmdHdlrBinary, 0 },
+ { "skipverifyhost", eCmdHdlrBinary, 0 },
+ { "token", eCmdHdlrString, 0 },
+ { "tokenfile", eCmdHdlrString, 0 },
+ { "annotation_match", eCmdHdlrArray, 0 },
+ { "de_dot", eCmdHdlrBinary, 0 },
+ { "de_dot_separator", eCmdHdlrString, 0 },
+ { "filenamerulebase", eCmdHdlrString, 0 },
+ { "containerrulebase", eCmdHdlrString, 0 },
+ { "busyretryinterval", eCmdHdlrInt, 0 },
+ { "sslpartialchain", eCmdHdlrBinary, 0 },
+ { "cacheentryttl", eCmdHdlrInt, 0 },
+ { "cacheexpireinterval", eCmdHdlrInt, 0 }
+#if HAVE_LOADSAMPLESFROMSTRING == 1
+ ,
+ { "filenamerules", eCmdHdlrArray, 0 },
+ { "containerrules", eCmdHdlrArray, 0 }
+#endif
+};
+static struct cnfparamblk actpblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(actpdescr)/sizeof(struct cnfparamdescr),
+ actpdescr
+ };
+
+static modConfData_t *loadModConf = NULL; /* modConf ptr to use for the current load process */
+static modConfData_t *runModConf = NULL; /* modConf ptr to use for the current exec process */
+
+static void free_annotationmatch(annotation_match_t *match) {
+ if (match) {
+ for(int ii = 0 ; ii < match->nmemb; ++ii) {
+ if (match->patterns)
+ free(match->patterns[ii]);
+ if (match->regexps)
+ regexp.regfree(&match->regexps[ii]);
+ }
+ free(match->patterns);
+ match->patterns = NULL;
+ free(match->regexps);
+ match->regexps = NULL;
+ match->nmemb = 0;
+ }
+}
+
+static int init_annotationmatch(annotation_match_t *match, struct cnfarray *ar) {
+ DEFiRet;
+
+ match->nmemb = ar->nmemb;
+ CHKmalloc(match->patterns = calloc(sizeof(uchar*), match->nmemb));
+ CHKmalloc(match->regexps = calloc(sizeof(regex_t), match->nmemb));
+ for(int jj = 0; jj < ar->nmemb; ++jj) {
+ int rexret = 0;
+ match->patterns[jj] = (uchar*)es_str2cstr(ar->arr[jj], NULL);
+ rexret = regexp.regcomp(&match->regexps[jj],
+ (char *)match->patterns[jj], REG_EXTENDED|REG_NOSUB);
+ if (0 != rexret) {
+ char errMsg[512];
+ regexp.regerror(rexret, &match->regexps[jj], errMsg, sizeof(errMsg));
+ iRet = RS_RET_CONFIG_ERROR;
+ LogError(0, iRet,
+ "error: could not compile annotation_match string [%s]"
+ " into an extended regexp - %d: %s\n",
+ match->patterns[jj], rexret, errMsg);
+ break;
+ }
+ }
+finalize_it:
+ if (iRet)
+ free_annotationmatch(match);
+ RETiRet;
+}
+
+static int copy_annotationmatch(annotation_match_t *src, annotation_match_t *dest) {
+ DEFiRet;
+
+ dest->nmemb = src->nmemb;
+ CHKmalloc(dest->patterns = malloc(sizeof(uchar*) * dest->nmemb));
+ CHKmalloc(dest->regexps = calloc(sizeof(regex_t), dest->nmemb));
+ for(int jj = 0 ; jj < src->nmemb ; ++jj) {
+ CHKmalloc(dest->patterns[jj] = (uchar*)strdup((char *)src->patterns[jj]));
+ /* assumes was already successfully compiled */
+ regexp.regcomp(&dest->regexps[jj], (char *)dest->patterns[jj], REG_EXTENDED|REG_NOSUB);
+ }
+finalize_it:
+ if (iRet)
+ free_annotationmatch(dest);
+ RETiRet;
+}
+
+/* takes a hash of annotations and returns another json object hash containing only the
+ * keys that match - this logic is taken directly from fluent-plugin-kubernetes_metadata_filter
+ * except that we do not add the key multiple times to the object to be returned
+ */
+static struct json_object *match_annotations(annotation_match_t *match,
+ struct json_object *annotations) {
+ struct json_object *ret = NULL;
+
+ for (int jj = 0; jj < match->nmemb; ++jj) {
+ struct json_object_iterator it = json_object_iter_begin(annotations);
+ struct json_object_iterator itEnd = json_object_iter_end(annotations);
+ for (;!json_object_iter_equal(&it, &itEnd); json_object_iter_next(&it)) {
+ const char *const key = json_object_iter_peek_name(&it);
+ if (!ret || !fjson_object_object_get_ex(ret, key, NULL)) {
+ if (!regexp.regexec(&match->regexps[jj], key, 0, NULL, 0)) {
+ if (!ret) {
+ ret = json_object_new_object();
+ }
+ json_object_object_add(ret, key,
+ json_object_get(json_object_iter_peek_value(&it)));
+ }
+ }
+ }
+ }
+ return ret;
+}
+
+/* This will take a hash of labels or annotations and will de_dot the keys.
+ * It will return a brand new hash. AFAICT, there is no safe way to
+ * iterate over the hash while modifying it in place.
+ */
+static struct json_object *de_dot_json_object(struct json_object *jobj,
+ const char *delim, size_t delim_len) {
+ struct json_object *ret = NULL;
+ struct json_object_iterator it = json_object_iter_begin(jobj);
+ struct json_object_iterator itEnd = json_object_iter_end(jobj);
+ es_str_t *new_es_key = NULL;
+ DEFiRet;
+
+ ret = json_object_new_object();
+ while (!json_object_iter_equal(&it, &itEnd)) {
+ const char *const key = json_object_iter_peek_name(&it);
+ const char *cc = strstr(key, ".");
+ if (NULL == cc) {
+ json_object_object_add(ret, key,
+ json_object_get(json_object_iter_peek_value(&it)));
+ } else {
+ char *new_key = NULL;
+ const char *prevcc = key;
+ new_es_key = es_newStrFromCStr(key, (es_size_t)(cc-prevcc));
+ while (cc) {
+ if (es_addBuf(&new_es_key, (char *)delim, (es_size_t)delim_len))
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ cc += 1; /* one past . */
+ prevcc = cc; /* beginning of next substring */
+ if ((cc = strstr(prevcc, ".")) || (cc = strchr(prevcc, '\0'))) {
+ if (es_addBuf(&new_es_key, (char *)prevcc, (es_size_t)(cc-prevcc)))
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ if (!*cc)
+ cc = NULL; /* EOS - done */
+ }
+ }
+ new_key = es_str2cstr(new_es_key, NULL);
+ es_deleteStr(new_es_key);
+ new_es_key = NULL;
+ json_object_object_add(ret, new_key,
+ json_object_get(json_object_iter_peek_value(&it)));
+ free(new_key);
+ }
+ json_object_iter_next(&it);
+ }
+finalize_it:
+ if (iRet != RS_RET_OK) {
+ json_object_put(ret);
+ ret = NULL;
+ }
+ if (new_es_key)
+ es_deleteStr(new_es_key);
+ return ret;
+}
+
+/* given a "metadata" object field, do
+ * - make sure "annotations" field has only the matching keys
+ * - de_dot the "labels" and "annotations" fields keys
+ * This modifies the jMetadata object in place
+ */
+static void parse_labels_annotations(struct json_object *jMetadata,
+ annotation_match_t *match, sbool de_dot,
+ const char *delim, size_t delim_len) {
+ struct json_object *jo = NULL;
+
+ if (fjson_object_object_get_ex(jMetadata, "annotations", &jo)) {
+ if ((jo = match_annotations(match, jo)))
+ json_object_object_add(jMetadata, "annotations", jo);
+ else
+ json_object_object_del(jMetadata, "annotations");
+ }
+ /* dedot labels and annotations */
+ if (de_dot) {
+ struct json_object *jo2 = NULL;
+ if (fjson_object_object_get_ex(jMetadata, "annotations", &jo)) {
+ if ((jo2 = de_dot_json_object(jo, delim, delim_len))) {
+ json_object_object_add(jMetadata, "annotations", jo2);
+ }
+ }
+ if (fjson_object_object_get_ex(jMetadata, "labels", &jo)) {
+ if ((jo2 = de_dot_json_object(jo, delim, delim_len))) {
+ json_object_object_add(jMetadata, "labels", jo2);
+ }
+ }
+ }
+}
+
+#if HAVE_LOADSAMPLESFROMSTRING == 1
+static int array_to_rules(struct cnfarray *ar, char **rules) {
+ DEFiRet;
+ es_str_t *tmpstr = NULL;
+ es_size_t size = 0;
+
+ if (rules == NULL)
+ FINALIZE;
+ *rules = NULL;
+ if (!ar->nmemb)
+ FINALIZE;
+ for (int jj = 0; jj < ar->nmemb; jj++)
+ size += es_strlen(ar->arr[jj]);
+ if (!size)
+ FINALIZE;
+ CHKmalloc(tmpstr = es_newStr(size));
+ CHKiRet((es_addStr(&tmpstr, ar->arr[0])));
+ CHKiRet((es_addBufConstcstr(&tmpstr, "\n")));
+ for(int jj=1; jj < ar->nmemb; ++jj) {
+ CHKiRet((es_addStr(&tmpstr, ar->arr[jj])));
+ CHKiRet((es_addBufConstcstr(&tmpstr, "\n")));
+ }
+ CHKiRet((es_addBufConstcstr(&tmpstr, "\0")));
+ CHKmalloc(*rules = es_str2cstr(tmpstr, NULL));
+finalize_it:
+ if (tmpstr) {
+ es_deleteStr(tmpstr);
+ }
+ if (iRet != RS_RET_OK) {
+ free(*rules);
+ *rules = NULL;
+ }
+ RETiRet;
+}
+#endif
+
+/* callback for liblognorm error messages */
+static void
+errCallBack(void __attribute__((unused)) *cookie, const char *msg,
+ size_t __attribute__((unused)) lenMsg)
+{
+ LogError(0, RS_RET_ERR_LIBLOGNORM, "liblognorm error: %s", msg);
+}
+
+static rsRetVal
+set_lnctx(ln_ctx *ctxln, char *instRules, uchar *instRulebase, char *modRules, uchar *modRulebase)
+{
+ DEFiRet;
+ if (ctxln == NULL)
+ FINALIZE;
+ CHKmalloc(*ctxln = ln_initCtx());
+ ln_setErrMsgCB(*ctxln, errCallBack, NULL);
+ if(instRules) {
+#if HAVE_LOADSAMPLESFROMSTRING == 1
+ if(ln_loadSamplesFromString(*ctxln, instRules) !=0) {
+ LogError(0, RS_RET_NO_RULEBASE, "error: normalization rules '%s' "
+ "could not be loaded", instRules);
+ ABORT_FINALIZE(RS_RET_ERR_LIBLOGNORM_SAMPDB_LOAD);
+ }
+#else
+ (void)instRules;
+#endif
+ } else if(instRulebase) {
+ if(ln_loadSamples(*ctxln, (char*) instRulebase) != 0) {
+ LogError(0, RS_RET_NO_RULEBASE, "error: normalization rulebase '%s' "
+ "could not be loaded", instRulebase);
+ ABORT_FINALIZE(RS_RET_ERR_LIBLOGNORM_SAMPDB_LOAD);
+ }
+ } else if(modRules) {
+#if HAVE_LOADSAMPLESFROMSTRING == 1
+ if(ln_loadSamplesFromString(*ctxln, modRules) !=0) {
+ LogError(0, RS_RET_NO_RULEBASE, "error: normalization rules '%s' "
+ "could not be loaded", modRules);
+ ABORT_FINALIZE(RS_RET_ERR_LIBLOGNORM_SAMPDB_LOAD);
+ }
+#else
+ (void)modRules;
+#endif
+ } else if(modRulebase) {
+ if(ln_loadSamples(*ctxln, (char*) modRulebase) != 0) {
+ LogError(0, RS_RET_NO_RULEBASE, "error: normalization rulebase '%s' "
+ "could not be loaded", modRulebase);
+ ABORT_FINALIZE(RS_RET_ERR_LIBLOGNORM_SAMPDB_LOAD);
+ }
+ }
+finalize_it:
+ if (iRet != RS_RET_OK){
+ ln_exitCtx(*ctxln);
+ *ctxln = NULL;
+ }
+ RETiRet;
+}
+
+BEGINbeginCnfLoad
+CODESTARTbeginCnfLoad
+ loadModConf = pModConf;
+ pModConf->pConf = pConf;
+ENDbeginCnfLoad
+
+
+BEGINsetModCnf
+ struct cnfparamvals *pvals = NULL;
+ int i;
+ FILE *fp = NULL;
+ int ret;
+ char errStr[1024];
+CODESTARTsetModCnf
+ pvals = nvlstGetParams(lst, &modpblk, NULL);
+ if(pvals == NULL) {
+ LogError(0, RS_RET_MISSING_CNFPARAMS, "mmkubernetes: "
+ "error processing module config parameters [module(...)]");
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ if(Debug) {
+ dbgprintf("module (global) param blk for mmkubernetes:\n");
+ cnfparamsPrint(&modpblk, pvals);
+ }
+
+ loadModConf->de_dot = DFLT_DE_DOT;
+ loadModConf->busyRetryInterval = DFLT_BUSY_RETRY_INTERVAL;
+ loadModConf->sslPartialChain = DFLT_SSL_PARTIAL_CHAIN;
+ loadModConf->cacheEntryTTL = DFLT_CACHE_ENTRY_TTL;
+ loadModConf->cacheExpireInterval = DFLT_CACHE_EXPIRE_INTERVAL;
+ for(i = 0 ; i < modpblk.nParams ; ++i) {
+ if(!pvals[i].bUsed) {
+ continue;
+ } else if(!strcmp(modpblk.descr[i].name, "kubernetesurl")) {
+ free(loadModConf->kubernetesUrl);
+ loadModConf->kubernetesUrl = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(modpblk.descr[i].name, "srcmetadatapath")) {
+ free(loadModConf->srcMetadataPath);
+ loadModConf->srcMetadataPath = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
+ /* todo: sanitize the path */
+ } else if(!strcmp(modpblk.descr[i].name, "dstmetadatapath")) {
+ free(loadModConf->dstMetadataPath);
+ loadModConf->dstMetadataPath = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
+ /* todo: sanitize the path */
+ } else if(!strcmp(modpblk.descr[i].name, "tls.cacert")) {
+ free(loadModConf->caCertFile);
+ loadModConf->caCertFile = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
+ fp = fopen((const char*)loadModConf->caCertFile, "r");
+ if(fp == NULL) {
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ iRet = RS_RET_NO_FILE_ACCESS;
+ LogError(0, iRet,
+ "error: 'tls.cacert' file %s couldn't be accessed: %s\n",
+ loadModConf->caCertFile, errStr);
+ ABORT_FINALIZE(iRet);
+ } else {
+ fclose(fp);
+ fp = NULL;
+ }
+ } else if(!strcmp(modpblk.descr[i].name, "tls.mycert")) {
+ free(loadModConf->myCertFile);
+ loadModConf->myCertFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ fp = fopen((const char*)loadModConf->myCertFile, "r");
+ if(fp == NULL) {
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ iRet = RS_RET_NO_FILE_ACCESS;
+ LogError(0, iRet,
+ "error: 'tls.mycert' file %s couldn't be accessed: %s\n",
+ loadModConf->myCertFile, errStr);
+ } else {
+ fclose(fp);
+ fp = NULL;
+ }
+ } else if(!strcmp(modpblk.descr[i].name, "tls.myprivkey")) {
+ loadModConf->myPrivKeyFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ fp = fopen((const char*)loadModConf->myPrivKeyFile, "r");
+ if(fp == NULL) {
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ iRet = RS_RET_NO_FILE_ACCESS;
+ LogError(0, iRet,
+ "error: 'tls.myprivkey' file %s couldn't be accessed: %s\n",
+ loadModConf->myPrivKeyFile, errStr);
+ } else {
+ fclose(fp);
+ fp = NULL;
+ }
+ } else if(!strcmp(modpblk.descr[i].name, "allowunsignedcerts")) {
+ loadModConf->allowUnsignedCerts = pvals[i].val.d.n;
+ } else if(!strcmp(modpblk.descr[i].name, "skipverifyhost")) {
+ loadModConf->skipVerifyHost = pvals[i].val.d.n;
+ } else if(!strcmp(modpblk.descr[i].name, "token")) {
+ free(loadModConf->token);
+ loadModConf->token = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(modpblk.descr[i].name, "tokenfile")) {
+ free(loadModConf->tokenFile);
+ loadModConf->tokenFile = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
+ fp = fopen((const char*)loadModConf->tokenFile, "r");
+ if(fp == NULL) {
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ iRet = RS_RET_NO_FILE_ACCESS;
+ LogError(0, iRet,
+ "error: token file %s couldn't be accessed: %s\n",
+ loadModConf->tokenFile, errStr);
+ ABORT_FINALIZE(iRet);
+ } else {
+ fclose(fp);
+ fp = NULL;
+ }
+ } else if(!strcmp(modpblk.descr[i].name, "annotation_match")) {
+ free_annotationmatch(&loadModConf->annotation_match);
+ if ((ret = init_annotationmatch(&loadModConf->annotation_match, pvals[i].val.d.ar)))
+ ABORT_FINALIZE(ret);
+ } else if(!strcmp(modpblk.descr[i].name, "de_dot")) {
+ loadModConf->de_dot = pvals[i].val.d.n;
+ } else if(!strcmp(modpblk.descr[i].name, "de_dot_separator")) {
+ free(loadModConf->de_dot_separator);
+ loadModConf->de_dot_separator = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
+#if HAVE_LOADSAMPLESFROMSTRING == 1
+ } else if(!strcmp(modpblk.descr[i].name, "filenamerules")) {
+ free(loadModConf->fnRules);
+ CHKiRet((array_to_rules(pvals[i].val.d.ar, &loadModConf->fnRules)));
+#endif
+ } else if(!strcmp(modpblk.descr[i].name, "filenamerulebase")) {
+ free(loadModConf->fnRulebase);
+ loadModConf->fnRulebase = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
+ fp = fopen((const char*)loadModConf->fnRulebase, "r");
+ if(fp == NULL) {
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ iRet = RS_RET_NO_FILE_ACCESS;
+ LogError(0, iRet,
+ "error: filenamerulebase file %s couldn't be accessed: %s\n",
+ loadModConf->fnRulebase, errStr);
+ ABORT_FINALIZE(iRet);
+ } else {
+ fclose(fp);
+ fp = NULL;
+ }
+#if HAVE_LOADSAMPLESFROMSTRING == 1
+ } else if(!strcmp(modpblk.descr[i].name, "containerrules")) {
+ free(loadModConf->contRules);
+ CHKiRet((array_to_rules(pvals[i].val.d.ar, &loadModConf->contRules)));
+#endif
+ } else if(!strcmp(modpblk.descr[i].name, "containerrulebase")) {
+ free(loadModConf->contRulebase);
+ loadModConf->contRulebase = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
+ fp = fopen((const char*)loadModConf->contRulebase, "r");
+ if(fp == NULL) {
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ iRet = RS_RET_NO_FILE_ACCESS;
+ LogError(0, iRet,
+ "error: containerrulebase file %s couldn't be accessed: %s\n",
+ loadModConf->contRulebase, errStr);
+ ABORT_FINALIZE(iRet);
+ } else {
+ fclose(fp);
+ fp = NULL;
+ }
+ } else if(!strcmp(modpblk.descr[i].name, "busyretryinterval")) {
+ loadModConf->busyRetryInterval = pvals[i].val.d.n;
+ } else if(!strcmp(modpblk.descr[i].name, "sslpartialchain")) {
+#if defined(SUPPORT_SSL_PARTIAL_CHAIN)
+ loadModConf->sslPartialChain = pvals[i].val.d.n;
+#else
+ LogMsg(0, RS_RET_VALUE_NOT_IN_THIS_MODE, LOG_INFO,
+ "sslpartialchain is only supported for OpenSSL\n");
+#endif
+ } else if(!strcmp(modpblk.descr[i].name, "cacheentryttl")) {
+ loadModConf->cacheEntryTTL = pvals[i].val.d.n;
+ } else if(!strcmp(modpblk.descr[i].name, "cacheexpireinterval")) {
+ loadModConf->cacheExpireInterval = pvals[i].val.d.n;
+ } else {
+ dbgprintf("mmkubernetes: program error, non-handled "
+ "param '%s' in module() block\n", modpblk.descr[i].name);
+ /* todo: error message? */
+ }
+ }
+
+#if HAVE_LOADSAMPLESFROMSTRING == 1
+ if (loadModConf->fnRules && loadModConf->fnRulebase) {
+ LogError(0, RS_RET_CONFIG_ERROR,
+ "mmkubernetes: only 1 of filenamerules or filenamerulebase may be used");
+ ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
+ }
+ if (loadModConf->contRules && loadModConf->contRulebase) {
+ LogError(0, RS_RET_CONFIG_ERROR,
+ "mmkubernetes: only 1 of containerrules or containerrulebase may be used");
+ ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
+ }
+#endif
+
+ if ((loadModConf->cacheExpireInterval > -1)) {
+ if ((loadModConf->cacheEntryTTL < 0)) {
+ LogError(0, RS_RET_CONFIG_ERROR,
+ "mmkubernetes: cacheentryttl value [%d] is invalid - "
+ "value must be 0 or greater",
+ loadModConf->cacheEntryTTL);
+ ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
+ }
+ }
+
+ /* set defaults */
+ if(loadModConf->srcMetadataPath == NULL)
+ loadModConf->srcMetadataPath = (uchar *) strdup(DFLT_SRCMD_PATH);
+ if(loadModConf->dstMetadataPath == NULL)
+ loadModConf->dstMetadataPath = (uchar *) strdup(DFLT_DSTMD_PATH);
+ if(loadModConf->de_dot_separator == NULL)
+ loadModConf->de_dot_separator = (uchar *) strdup(DFLT_DE_DOT_SEPARATOR);
+ if(loadModConf->de_dot_separator)
+ loadModConf->de_dot_separator_len = strlen((const char *)loadModConf->de_dot_separator);
+#if HAVE_LOADSAMPLESFROMSTRING == 1
+ if (loadModConf->fnRules == NULL && loadModConf->fnRulebase == NULL)
+ loadModConf->fnRules = strdup(DFLT_FILENAME_LNRULES);
+ if (loadModConf->contRules == NULL && loadModConf->contRulebase == NULL)
+ loadModConf->contRules = strdup(DFLT_CONTAINER_LNRULES);
+#else
+ if (loadModConf->fnRulebase == NULL)
+ loadModConf->fnRulebase = (uchar *)strdup(DFLT_FILENAME_RULEBASE);
+ if (loadModConf->contRulebase == NULL)
+ loadModConf->contRulebase = (uchar *)strdup(DFLT_CONTAINER_RULEBASE);
+#endif
+ caches = calloc(1, sizeof(struct cache_s *));
+
+finalize_it:
+ if (fp)
+ fclose(fp);
+ if(pvals != NULL)
+ cnfparamvalsDestruct(pvals, &modpblk);
+ENDsetModCnf
+
+
+BEGINcreateInstance
+CODESTARTcreateInstance
+ENDcreateInstance
+
+
+BEGINfreeInstance
+CODESTARTfreeInstance
+ free(pData->kubernetesUrl);
+ msgPropDescrDestruct(pData->srcMetadataDescr);
+ free(pData->srcMetadataDescr);
+ free(pData->dstMetadataPath);
+ free(pData->caCertFile);
+ free(pData->myCertFile);
+ free(pData->myPrivKeyFile);
+ free(pData->token);
+ free(pData->tokenFile);
+ free(pData->fnRules);
+ free(pData->fnRulebase);
+ ln_exitCtx(pData->fnCtxln);
+ free(pData->contRules);
+ free(pData->contRulebase);
+ ln_exitCtx(pData->contCtxln);
+ free_annotationmatch(&pData->annotation_match);
+ free(pData->de_dot_separator);
+ msgPropDescrDestruct(pData->contNameDescr);
+ free(pData->contNameDescr);
+ msgPropDescrDestruct(pData->contIdFullDescr);
+ free(pData->contIdFullDescr);
+ENDfreeInstance
+
+static size_t curlCB(char *data, size_t size, size_t nmemb, void *usrptr)
+{
+ DEFiRet;
+ wrkrInstanceData_t *pWrkrData = (wrkrInstanceData_t *) usrptr;
+ char * buf;
+ size_t newlen;
+
+ newlen = pWrkrData->curlRplyLen + size * nmemb;
+ CHKmalloc(buf = realloc(pWrkrData->curlRply, newlen));
+ memcpy(buf + pWrkrData->curlRplyLen, data, size * nmemb);
+ pWrkrData->curlRply = buf;
+ pWrkrData->curlRplyLen = newlen;
+
+finalize_it:
+ if (iRet != RS_RET_OK) {
+ return 0;
+ }
+ return size * nmemb;
+}
+
+#if defined(SUPPORT_SSL_PARTIAL_CHAIN)
+static CURLcode set_ssl_partial_chain(CURL *curl, void *ssl_ctx, void *userptr)
+{
+ (void)userptr; /* currently unused */
+ CURLcode rv = CURLE_ABORTED_BY_CALLBACK;
+ X509_STORE *store = NULL;
+
+ store = SSL_CTX_get_cert_store((SSL_CTX *)ssl_ctx);
+ if(!store)
+ goto finalize_it;
+ if(!X509_STORE_set_flags(store, X509_V_FLAG_PARTIAL_CHAIN))
+ goto finalize_it;
+ rv = CURLE_OK;
+finalize_it:
+ return rv;
+}
+#endif
+
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ CURL *ctx;
+ struct curl_slist *hdr = NULL;
+ char *tokenHdr = NULL;
+ FILE *fp = NULL;
+ char *token = NULL;
+ char *statsName = NULL;
+
+ CHKiRet(statsobj.Construct(&(pWrkrData->stats)));
+ if ((-1 == asprintf(&statsName, "mmkubernetes(%s)", pWrkrData->pData->kubernetesUrl)) ||
+ (!statsName)) {
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ }
+ CHKiRet(statsobj.SetName(pWrkrData->stats, (uchar *)statsName));
+ free(statsName);
+ statsName = NULL;
+ CHKiRet(statsobj.SetOrigin(pWrkrData->stats, UCHAR_CONSTANT("mmkubernetes")));
+ STATSCOUNTER_INIT(pWrkrData->k8sRecordSeen, pWrkrData->mutK8sRecordSeen);
+ CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("recordseen"),
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->k8sRecordSeen)));
+ STATSCOUNTER_INIT(pWrkrData->namespaceMetadataSuccess, pWrkrData->mutNamespaceMetadataSuccess);
+ CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("namespacemetadatasuccess"),
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->namespaceMetadataSuccess)));
+ STATSCOUNTER_INIT(pWrkrData->namespaceMetadataNotFound, pWrkrData->mutNamespaceMetadataNotFound);
+ CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("namespacemetadatanotfound"),
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->namespaceMetadataNotFound)));
+ STATSCOUNTER_INIT(pWrkrData->namespaceMetadataBusy, pWrkrData->mutNamespaceMetadataBusy);
+ CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("namespacemetadatabusy"),
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->namespaceMetadataBusy)));
+ STATSCOUNTER_INIT(pWrkrData->namespaceMetadataError, pWrkrData->mutNamespaceMetadataError);
+ CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("namespacemetadataerror"),
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->namespaceMetadataError)));
+ STATSCOUNTER_INIT(pWrkrData->podMetadataSuccess, pWrkrData->mutPodMetadataSuccess);
+ CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("podmetadatasuccess"),
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->podMetadataSuccess)));
+ STATSCOUNTER_INIT(pWrkrData->podMetadataNotFound, pWrkrData->mutPodMetadataNotFound);
+ CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("podmetadatanotfound"),
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->podMetadataNotFound)));
+ STATSCOUNTER_INIT(pWrkrData->podMetadataBusy, pWrkrData->mutPodMetadataBusy);
+ CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("podmetadatabusy"),
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->podMetadataBusy)));
+ STATSCOUNTER_INIT(pWrkrData->podMetadataError, pWrkrData->mutPodMetadataError);
+ CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("podmetadataerror"),
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->podMetadataError)));
+ STATSCOUNTER_INIT(pWrkrData->namespaceCacheNumEntries, pWrkrData->mutNamespaceCacheNumEntries);
+ CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("namespacecachenumentries"),
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->namespaceCacheNumEntries)));
+ STATSCOUNTER_INIT(pWrkrData->podCacheNumEntries, pWrkrData->mutPodCacheNumEntries);
+ CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("podcachenumentries"),
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->podCacheNumEntries)));
+ STATSCOUNTER_INIT(pWrkrData->namespaceCacheHits, pWrkrData->mutNamespaceCacheHits);
+ CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("namespacecachehits"),
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->namespaceCacheHits)));
+ STATSCOUNTER_INIT(pWrkrData->podCacheHits, pWrkrData->mutPodCacheHits);
+ CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("podcachehits"),
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->podCacheHits)));
+ STATSCOUNTER_INIT(pWrkrData->namespaceCacheMisses, pWrkrData->mutNamespaceCacheMisses);
+ CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("namespacecachemisses"),
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->namespaceCacheMisses)));
+ STATSCOUNTER_INIT(pWrkrData->podCacheMisses, pWrkrData->mutPodCacheMisses);
+ CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("podcachemisses"),
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->podCacheMisses)));
+ CHKiRet(statsobj.ConstructFinalize(pWrkrData->stats));
+
+ hdr = curl_slist_append(hdr, "Content-Type: text/json; charset=utf-8");
+ if (pWrkrData->pData->token) {
+ if ((-1 == asprintf(&tokenHdr, "Authorization: Bearer %s", pWrkrData->pData->token)) ||
+ (!tokenHdr)) {
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ }
+ } else if (pWrkrData->pData->tokenFile) {
+ struct stat statbuf;
+ fp = fopen((const char*)pWrkrData->pData->tokenFile, "r");
+ if (fp && !fstat(fileno(fp), &statbuf)) {
+ size_t bytesread;
+ CHKmalloc(token = malloc((statbuf.st_size+1)*sizeof(char)));
+ if (0 < (bytesread = fread(token, sizeof(char), statbuf.st_size, fp))) {
+ token[bytesread] = '\0';
+ if ((-1 == asprintf(&tokenHdr, "Authorization: Bearer %s", token)) ||
+ (!tokenHdr)) {
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ }
+ }
+ free(token);
+ token = NULL;
+ }
+ if (fp) {
+ fclose(fp);
+ fp = NULL;
+ }
+ }
+ if (tokenHdr) {
+ hdr = curl_slist_append(hdr, tokenHdr);
+ free(tokenHdr);
+ }
+ pWrkrData->curlHdr = hdr;
+ ctx = curl_easy_init();
+ curl_easy_setopt(ctx, CURLOPT_HTTPHEADER, hdr);
+ curl_easy_setopt(ctx, CURLOPT_WRITEFUNCTION, curlCB);
+ curl_easy_setopt(ctx, CURLOPT_WRITEDATA, pWrkrData);
+ if(pWrkrData->pData->caCertFile)
+ curl_easy_setopt(ctx, CURLOPT_CAINFO, pWrkrData->pData->caCertFile);
+ if(pWrkrData->pData->myCertFile)
+ curl_easy_setopt(ctx, CURLOPT_SSLCERT, pWrkrData->pData->myCertFile);
+ if(pWrkrData->pData->myPrivKeyFile)
+ curl_easy_setopt(ctx, CURLOPT_SSLKEY, pWrkrData->pData->myPrivKeyFile);
+ if(pWrkrData->pData->allowUnsignedCerts)
+ curl_easy_setopt(ctx, CURLOPT_SSL_VERIFYPEER, 0);
+ if(pWrkrData->pData->skipVerifyHost)
+ curl_easy_setopt(ctx, CURLOPT_SSL_VERIFYHOST, 0);
+#if defined(SUPPORT_SSL_PARTIAL_CHAIN)
+ if(pWrkrData->pData->sslPartialChain) {
+ curl_easy_setopt(ctx, CURLOPT_SSL_CTX_FUNCTION, set_ssl_partial_chain);
+ curl_easy_setopt(ctx, CURLOPT_SSL_CTX_DATA, NULL);
+ }
+#endif
+ pWrkrData->curlCtx = ctx;
+finalize_it:
+ free(token);
+ free(statsName);
+ if ((iRet != RS_RET_OK) && pWrkrData->stats) {
+ statsobj.Destruct(&(pWrkrData->stats));
+ }
+ if (fp) {
+ fclose(fp);
+ }
+ENDcreateWrkrInstance
+
+
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ curl_easy_cleanup(pWrkrData->curlCtx);
+ curl_slist_free_all(pWrkrData->curlHdr);
+ statsobj.Destruct(&(pWrkrData->stats));
+ENDfreeWrkrInstance
+
+
+/* next function is work-around to avoid type-unsafe casts. It looks
+ * like not really needed in practice, but gcc 8 complains and doing
+ * it 100% correct for sure does not hurt ;-) -- rgerhards, 2018-07-19
+ */
+static void
+hashtable_json_object_put(void *jso)
+{
+ json_object_put((struct fjson_object *)jso);
+}
+
+static void
+cache_entry_free(struct cache_entry_s *cache_entry)
+{
+ if (NULL != cache_entry) {
+ if (cache_entry->data) {
+ hashtable_json_object_put(cache_entry->data);
+ cache_entry->data = NULL;
+ }
+ free(cache_entry);
+ }
+}
+
+static void
+cache_entry_free_raw(void *cache_entry_void)
+{
+ cache_entry_free((struct cache_entry_s *)cache_entry_void);
+}
+
+static struct cache_s *
+cacheNew(instanceData *pData)
+{
+ DEFiRet;
+ struct cache_s *cache = NULL;
+ time_t now;
+ int need_mutex_destroy = 0;
+
+ CHKmalloc(cache = (struct cache_s *)calloc(1, sizeof(struct cache_s)));
+ CHKmalloc(cache->cacheMtx = (pthread_mutex_t *)malloc(sizeof(pthread_mutex_t)));
+ CHKmalloc(cache->mdHt = create_hashtable(100, hash_from_string,
+ key_equals_string, cache_entry_free_raw));
+ CHKmalloc(cache->nsHt = create_hashtable(100, hash_from_string,
+ key_equals_string, cache_entry_free_raw));
+ CHKiConcCtrl(pthread_mutex_init(cache->cacheMtx, NULL));
+ need_mutex_destroy = 1;
+ datetime.GetTime(&now);
+ cache->kbUrl = pData->kubernetesUrl;
+ cache->expirationTime = 0;
+ if (pData->cacheExpireInterval > -1)
+ cache->expirationTime = pData->cacheExpireInterval + pData->cacheEntryTTL + now;
+ cache->lastBusyTime = 0;
+ dbgprintf("mmkubernetes: created cache mdht [%p] nsht [%p]\n",
+ cache->mdHt, cache->nsHt);
+
+finalize_it:
+ if (iRet != RS_RET_OK) {
+ LogError(errno, iRet, "mmkubernetes: cacheNew: unable to create metadata cache for %s",
+ pData->kubernetesUrl);
+ if (cache) {
+ if (cache->mdHt)
+ hashtable_destroy(cache->mdHt, 1);
+ if (cache->nsHt)
+ hashtable_destroy(cache->nsHt, 1);
+ if (cache->cacheMtx) {
+ if (need_mutex_destroy)
+ pthread_mutex_destroy(cache->cacheMtx);
+ free(cache->cacheMtx);
+ }
+ free(cache);
+ cache = NULL;
+ }
+ }
+ return cache;
+}
+
+
+static void cacheFree(struct cache_s *cache)
+{
+ hashtable_destroy(cache->mdHt, 1);
+ hashtable_destroy(cache->nsHt, 1);
+ pthread_mutex_destroy(cache->cacheMtx);
+ free(cache->cacheMtx);
+ free(cache);
+}
+
+/* must be called with cache->cacheMtx held */
+/* assumes caller has reference to jso (json_object_get or is a new object) */
+static struct cache_entry_s *cache_entry_new(time_t ttl, struct fjson_object *jso)
+{
+ DEFiRet;
+ struct cache_entry_s *cache_entry = NULL;
+
+ CHKmalloc(cache_entry = malloc(sizeof(struct cache_entry_s)));
+ cache_entry->ttl = ttl;
+ cache_entry->data = (void *)jso;
+finalize_it:
+ if (iRet) {
+ free(cache_entry);
+ cache_entry = NULL;
+ }
+ return cache_entry;
+}
+
+static int cache_delete_expired_entries(wrkrInstanceData_t *pWrkrData, int isnsmd, time_t now)
+{
+ struct hashtable *ht = isnsmd ? pWrkrData->pData->cache->nsHt : pWrkrData->pData->cache->mdHt;
+ struct hashtable_itr *itr = NULL;
+ int more;
+
+ if ((pWrkrData->pData->cacheExpireInterval < 0) || (now < pWrkrData->pData->cache->expirationTime)) {
+ return 0; /* not enabled or not time yet */
+ }
+
+ /* set next expiration time */
+ pWrkrData->pData->cache->expirationTime = now + pWrkrData->pData->cacheExpireInterval;
+
+ if (hashtable_count(ht) < 1)
+ return 1; /* expire interval hit but nothing to do */
+
+ itr = hashtable_iterator(ht);
+ if (NULL == itr)
+ return 1; /* expire interval hit but nothing to do - err? */
+
+ do {
+ struct cache_entry_s *cache_entry = (struct cache_entry_s *)hashtable_iterator_value(itr);
+
+ if (now >= cache_entry->ttl) {
+ cache_entry_free(cache_entry);
+ if (isnsmd) {
+ STATSCOUNTER_DEC(pWrkrData->namespaceCacheNumEntries,
+ pWrkrData->mutNamespaceCacheNumEntries);
+ } else {
+ STATSCOUNTER_DEC(pWrkrData->podCacheNumEntries,
+ pWrkrData->mutPodCacheNumEntries);
+ }
+ more = hashtable_iterator_remove(itr);
+ } else {
+ more = hashtable_iterator_advance(itr);
+ }
+ } while (more);
+ free(itr);
+ dbgprintf("mmkubernetes: cache_delete_expired_entries: cleaned [%s] cache - size is now [%llu]\n",
+ isnsmd ? "namespace" : "pod",
+ isnsmd ? pWrkrData->namespaceCacheNumEntries : pWrkrData->podCacheNumEntries);
+ return 1;
+}
+
+/* must be called with cache->cacheMtx held */
+static struct fjson_object *
+cache_entry_get(wrkrInstanceData_t *pWrkrData,
+ int isnsmd, const char *key, time_t now)
+{
+ struct fjson_object *jso = NULL;
+ struct cache_entry_s *cache_entry = NULL;
+ int checkttl = 1;
+ struct hashtable *ht = isnsmd ? pWrkrData->pData->cache->nsHt : pWrkrData->pData->cache->mdHt;
+
+ /* see if it is time for a general cache expiration */
+ if (cache_delete_expired_entries(pWrkrData, isnsmd, now))
+ checkttl = 0; /* no need to check ttl now */
+ cache_entry = (struct cache_entry_s *)hashtable_search(ht, (void *)key);
+ if (cache_entry && checkttl && (now >= cache_entry->ttl)) {
+ cache_entry = (struct cache_entry_s *)hashtable_remove(ht, (void *)key);
+ if (isnsmd) {
+ STATSCOUNTER_DEC(pWrkrData->namespaceCacheNumEntries,
+ pWrkrData->mutNamespaceCacheNumEntries);
+ } else {
+ STATSCOUNTER_DEC(pWrkrData->podCacheNumEntries,
+ pWrkrData->mutPodCacheNumEntries);
+ }
+ cache_entry_free(cache_entry);
+ cache_entry = NULL;
+ }
+ if (cache_entry) {
+ jso = (struct fjson_object *)cache_entry->data;
+ if (isnsmd) {
+ STATSCOUNTER_INC(pWrkrData->namespaceCacheHits,
+ pWrkrData->mutNamespaceCacheHits);
+ } else {
+ STATSCOUNTER_INC(pWrkrData->podCacheHits,
+ pWrkrData->mutPodCacheHits);
+ }
+ dbgprintf("mmkubernetes: cache_entry_get: cache hit for [%s] cache key [%s] - hits is now [%llu]\n",
+ isnsmd ? "namespace" : "pod", key,
+ isnsmd ? pWrkrData->namespaceCacheHits : pWrkrData->podCacheHits);
+ } else {
+ if (isnsmd) {
+ STATSCOUNTER_INC(pWrkrData->namespaceCacheMisses,
+ pWrkrData->mutNamespaceCacheMisses);
+ } else {
+ STATSCOUNTER_INC(pWrkrData->podCacheMisses,
+ pWrkrData->mutPodCacheMisses);
+ }
+ dbgprintf("mmkubernetes: cache_entry_get: cache miss for [%s] cache key [%s] - misses is now [%llu]\n",
+ isnsmd ? "namespace" : "pod", key,
+ isnsmd ? pWrkrData->namespaceCacheMisses : pWrkrData->podCacheMisses);
+ }
+
+ return jso;
+}
+
+/* must be called with cache->cacheMtx held */
+/* key is passed in - caller must copy or otherwise ensure it is ok to pass off
+ * ownership
+ */
+static rsRetVal
+cache_entry_add(wrkrInstanceData_t *pWrkrData,
+ int isnsmd, const char *key, struct fjson_object *jso, time_t now, const int bDupKey)
+{
+ DEFiRet;
+ struct cache_entry_s *cache_entry = NULL;
+ struct hashtable *ht = isnsmd ? pWrkrData->pData->cache->nsHt : pWrkrData->pData->cache->mdHt;
+
+ /* see if it is time for a general cache expiration */
+ (void)cache_delete_expired_entries(pWrkrData, isnsmd, now);
+ CHKmalloc(cache_entry = cache_entry_new(now + pWrkrData->pData->cacheEntryTTL, jso));
+ if (cache_entry) {
+ if (!hashtable_insert(ht, (void *)(bDupKey ? strdup(key) : key), cache_entry))
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+
+ if (isnsmd) {
+ STATSCOUNTER_INC(pWrkrData->namespaceCacheNumEntries,
+ pWrkrData->mutNamespaceCacheNumEntries);
+ } else {
+ STATSCOUNTER_INC(pWrkrData->podCacheNumEntries,
+ pWrkrData->mutPodCacheNumEntries);
+ }
+ cache_entry = NULL;
+ }
+finalize_it:
+ if (cache_entry)
+ cache_entry_free(cache_entry);
+ return iRet;
+}
+
+/* must be called with cache->cacheMtx held */
+static struct fjson_object *cache_entry_get_md(wrkrInstanceData_t *pWrkrData, const char *key, time_t now)
+{
+ return cache_entry_get(pWrkrData, 0, key, now);
+}
+
+/* must be called with cache->cacheMtx held */
+static struct fjson_object *cache_entry_get_nsmd(wrkrInstanceData_t *pWrkrData, const char *key, time_t now)
+{
+ return cache_entry_get(pWrkrData, 1, key, now);
+}
+
+/* must be called with cache->cacheMtx held */
+static rsRetVal cache_entry_add_md(wrkrInstanceData_t *pWrkrData, const char *key,
+ struct fjson_object *jso, time_t now)
+{
+ return cache_entry_add(pWrkrData, 0, key, jso, now, 0);
+}
+
+/* must be called with cache->cacheMtx held */
+static rsRetVal cache_entry_add_nsmd(wrkrInstanceData_t *pWrkrData, const char *key,
+ struct fjson_object *jso, time_t now)
+{
+ return cache_entry_add(pWrkrData, 1, key, jso, now, 1);
+}
+
+
+BEGINnewActInst
+ struct cnfparamvals *pvals = NULL;
+ int i;
+ FILE *fp = NULL;
+ char *rxstr = NULL;
+ char *srcMetadataPath = NULL;
+ char errStr[1024];
+CODESTARTnewActInst
+ DBGPRINTF("newActInst (mmkubernetes)\n");
+
+ pvals = nvlstGetParams(lst, &actpblk, NULL);
+ if(pvals == NULL) {
+ LogError(0, RS_RET_MISSING_CNFPARAMS, "mmkubernetes: "
+ "error processing config parameters [action(...)]");
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ if(Debug) {
+ dbgprintf("action param blk in mmkubernetes:\n");
+ cnfparamsPrint(&actpblk, pvals);
+ }
+
+ CODE_STD_STRING_REQUESTnewActInst(1)
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG));
+ CHKiRet(createInstance(&pData));
+
+ pData->de_dot = loadModConf->de_dot;
+ pData->allowUnsignedCerts = loadModConf->allowUnsignedCerts;
+ pData->skipVerifyHost = loadModConf->skipVerifyHost;
+ pData->busyRetryInterval = loadModConf->busyRetryInterval;
+ pData->sslPartialChain = loadModConf->sslPartialChain;
+ pData->cacheEntryTTL = loadModConf->cacheEntryTTL;
+ pData->cacheExpireInterval = loadModConf->cacheExpireInterval;
+ for(i = 0 ; i < actpblk.nParams ; ++i) {
+ if(!pvals[i].bUsed) {
+ continue;
+ } else if(!strcmp(actpblk.descr[i].name, "kubernetesurl")) {
+ free(pData->kubernetesUrl);
+ pData->kubernetesUrl = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "srcmetadatapath")) {
+ msgPropDescrDestruct(pData->srcMetadataDescr);
+ free(pData->srcMetadataDescr);
+ CHKmalloc(pData->srcMetadataDescr = malloc(sizeof(msgPropDescr_t)));
+ srcMetadataPath = es_str2cstr(pvals[i].val.d.estr, NULL);
+ CHKiRet(msgPropDescrFill(pData->srcMetadataDescr, (uchar *)srcMetadataPath,
+ strlen(srcMetadataPath)));
+ /* todo: sanitize the path */
+ } else if(!strcmp(actpblk.descr[i].name, "dstmetadatapath")) {
+ free(pData->dstMetadataPath);
+ pData->dstMetadataPath = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
+ /* todo: sanitize the path */
+ } else if(!strcmp(actpblk.descr[i].name, "tls.cacert")) {
+ free(pData->caCertFile);
+ pData->caCertFile = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
+ fp = fopen((const char*)pData->caCertFile, "r");
+ if(fp == NULL) {
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ iRet = RS_RET_NO_FILE_ACCESS;
+ LogError(0, iRet,
+ "error: certificate file %s couldn't be accessed: %s\n",
+ pData->caCertFile, errStr);
+ ABORT_FINALIZE(iRet);
+ } else {
+ fclose(fp);
+ fp = NULL;
+ }
+ } else if(!strcmp(actpblk.descr[i].name, "tls.mycert")) {
+ pData->myCertFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ fp = fopen((const char*)pData->myCertFile, "r");
+ if(fp == NULL) {
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ iRet = RS_RET_NO_FILE_ACCESS;
+ LogError(0, iRet,
+ "error: 'tls.mycert' file %s couldn't be accessed: %s\n",
+ pData->myCertFile, errStr);
+ } else {
+ fclose(fp);
+ fp = NULL;
+ }
+ } else if(!strcmp(actpblk.descr[i].name, "tls.myprivkey")) {
+ pData->myPrivKeyFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ fp = fopen((const char*)pData->myPrivKeyFile, "r");
+ if(fp == NULL) {
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ iRet = RS_RET_NO_FILE_ACCESS;
+ LogError(0, iRet,
+ "error: 'tls.myprivkey' file %s couldn't be accessed: %s\n",
+ pData->myPrivKeyFile, errStr);
+ } else {
+ fclose(fp);
+ fp = NULL;
+ }
+ } else if(!strcmp(actpblk.descr[i].name, "allowunsignedcerts")) {
+ pData->allowUnsignedCerts = pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "skipverifyhost")) {
+ pData->skipVerifyHost = pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "token")) {
+ free(pData->token);
+ pData->token = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "tokenfile")) {
+ free(pData->tokenFile);
+ pData->tokenFile = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
+ fp = fopen((const char*)pData->tokenFile, "r");
+ if(fp == NULL) {
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ iRet = RS_RET_NO_FILE_ACCESS;
+ LogError(0, iRet,
+ "error: token file %s couldn't be accessed: %s\n",
+ pData->tokenFile, errStr);
+ ABORT_FINALIZE(iRet);
+ } else {
+ fclose(fp);
+ fp = NULL;
+ }
+ } else if(!strcmp(actpblk.descr[i].name, "annotation_match")) {
+ free_annotationmatch(&pData->annotation_match);
+ if (RS_RET_OK != (iRet = init_annotationmatch(&pData->annotation_match, pvals[i].val.d.ar)))
+ ABORT_FINALIZE(iRet);
+ } else if(!strcmp(actpblk.descr[i].name, "de_dot")) {
+ pData->de_dot = pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "de_dot_separator")) {
+ free(pData->de_dot_separator);
+ pData->de_dot_separator = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
+#if HAVE_LOADSAMPLESFROMSTRING == 1
+ } else if(!strcmp(modpblk.descr[i].name, "filenamerules")) {
+ free(pData->fnRules);
+ CHKiRet((array_to_rules(pvals[i].val.d.ar, &pData->fnRules)));
+#endif
+ } else if(!strcmp(modpblk.descr[i].name, "filenamerulebase")) {
+ free(pData->fnRulebase);
+ pData->fnRulebase = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
+ fp = fopen((const char*)pData->fnRulebase, "r");
+ if(fp == NULL) {
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ iRet = RS_RET_NO_FILE_ACCESS;
+ LogError(0, iRet,
+ "error: filenamerulebase file %s couldn't be accessed: %s\n",
+ pData->fnRulebase, errStr);
+ ABORT_FINALIZE(iRet);
+ } else {
+ fclose(fp);
+ fp = NULL;
+ }
+#if HAVE_LOADSAMPLESFROMSTRING == 1
+ } else if(!strcmp(modpblk.descr[i].name, "containerrules")) {
+ free(pData->contRules);
+ CHKiRet((array_to_rules(pvals[i].val.d.ar, &pData->contRules)));
+#endif
+ } else if(!strcmp(modpblk.descr[i].name, "containerrulebase")) {
+ free(pData->contRulebase);
+ pData->contRulebase = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
+ fp = fopen((const char*)pData->contRulebase, "r");
+ if(fp == NULL) {
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ iRet = RS_RET_NO_FILE_ACCESS;
+ LogError(0, iRet,
+ "error: containerrulebase file %s couldn't be accessed: %s\n",
+ pData->contRulebase, errStr);
+ ABORT_FINALIZE(iRet);
+ } else {
+ fclose(fp);
+ fp = NULL;
+ }
+ } else if(!strcmp(actpblk.descr[i].name, "busyretryinterval")) {
+ pData->busyRetryInterval = pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "sslpartialchain")) {
+#if defined(SUPPORT_SSL_PARTIAL_CHAIN)
+ pData->sslPartialChain = pvals[i].val.d.n;
+#else
+ LogMsg(0, RS_RET_VALUE_NOT_IN_THIS_MODE, LOG_INFO,
+ "sslpartialchain is only supported for OpenSSL\n");
+#endif
+ } else if(!strcmp(actpblk.descr[i].name, "cacheentryttl")) {
+ pData->cacheEntryTTL = pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "cacheexpireinterval")) {
+ pData->cacheExpireInterval = pvals[i].val.d.n;
+ } else {
+ dbgprintf("mmkubernetes: program error, non-handled "
+ "param '%s' in action() block\n", actpblk.descr[i].name);
+ /* todo: error message? */
+ }
+ }
+
+#if HAVE_LOADSAMPLESFROMSTRING == 1
+ if (pData->fnRules && pData->fnRulebase) {
+ LogError(0, RS_RET_CONFIG_ERROR,
+ "mmkubernetes: only 1 of filenamerules or filenamerulebase may be used");
+ ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
+ }
+ if (pData->contRules && pData->contRulebase) {
+ LogError(0, RS_RET_CONFIG_ERROR,
+ "mmkubernetes: only 1 of containerrules or containerrulebase may be used");
+ ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
+ }
+#endif
+ CHKiRet(set_lnctx(&pData->fnCtxln, pData->fnRules, pData->fnRulebase,
+ loadModConf->fnRules, loadModConf->fnRulebase));
+ CHKiRet(set_lnctx(&pData->contCtxln, pData->contRules, pData->contRulebase,
+ loadModConf->contRules, loadModConf->contRulebase));
+
+ if ((pData->cacheExpireInterval > -1)) {
+ if ((pData->cacheEntryTTL < 0)) {
+ LogError(0, RS_RET_CONFIG_ERROR,
+ "mmkubernetes: cacheentryttl value [%d] is invalid - "
+ "value must be 0 or greater",
+ pData->cacheEntryTTL);
+ ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
+ }
+ }
+
+ if(pData->kubernetesUrl == NULL) {
+ if(loadModConf->kubernetesUrl == NULL) {
+ CHKmalloc(pData->kubernetesUrl = (uchar *) strdup(DFLT_KUBERNETES_URL));
+ } else {
+ CHKmalloc(pData->kubernetesUrl = (uchar *) strdup((char *) loadModConf->kubernetesUrl));
+ }
+ }
+ if(pData->srcMetadataDescr == NULL) {
+ CHKmalloc(pData->srcMetadataDescr = malloc(sizeof(msgPropDescr_t)));
+ CHKiRet(msgPropDescrFill(pData->srcMetadataDescr, loadModConf->srcMetadataPath,
+ strlen((char *)loadModConf->srcMetadataPath)));
+ }
+ if(pData->dstMetadataPath == NULL)
+ pData->dstMetadataPath = (uchar *) strdup((char *) loadModConf->dstMetadataPath);
+ if(pData->caCertFile == NULL && loadModConf->caCertFile)
+ pData->caCertFile = (uchar *) strdup((char *) loadModConf->caCertFile);
+ if(pData->myCertFile == NULL && loadModConf->myCertFile)
+ pData->myCertFile = (uchar *) strdup((char *) loadModConf->myCertFile);
+ if(pData->myPrivKeyFile == NULL && loadModConf->myPrivKeyFile)
+ pData->myPrivKeyFile = (uchar *) strdup((char *) loadModConf->myPrivKeyFile);
+ if(pData->token == NULL && loadModConf->token)
+ pData->token = (uchar *) strdup((char *) loadModConf->token);
+ if(pData->tokenFile == NULL && loadModConf->tokenFile)
+ pData->tokenFile = (uchar *) strdup((char *) loadModConf->tokenFile);
+ if(pData->de_dot_separator == NULL && loadModConf->de_dot_separator)
+ pData->de_dot_separator = (uchar *) strdup((char *) loadModConf->de_dot_separator);
+ if((pData->annotation_match.nmemb == 0) && (loadModConf->annotation_match.nmemb > 0))
+ copy_annotationmatch(&loadModConf->annotation_match, &pData->annotation_match);
+
+ if(pData->de_dot_separator)
+ pData->de_dot_separator_len = strlen((const char *)pData->de_dot_separator);
+
+ CHKmalloc(pData->contNameDescr = malloc(sizeof(msgPropDescr_t)));
+ CHKiRet(msgPropDescrFill(pData->contNameDescr, (uchar*) DFLT_CONTAINER_NAME,
+ strlen(DFLT_CONTAINER_NAME)));
+ CHKmalloc(pData->contIdFullDescr = malloc(sizeof(msgPropDescr_t)));
+ CHKiRet(msgPropDescrFill(pData->contIdFullDescr, (uchar*) DFLT_CONTAINER_ID_FULL,
+ strlen(DFLT_CONTAINER_NAME)));
+
+ /* get the cache for this url */
+ for(i = 0; caches[i] != NULL; i++) {
+ if(!strcmp((char *) pData->kubernetesUrl, (char *) caches[i]->kbUrl))
+ break;
+ }
+ if(caches[i] != NULL) {
+ pData->cache = caches[i];
+ } else {
+ CHKmalloc(pData->cache = cacheNew(pData));
+ struct cache_s **new_caches = realloc(caches, (i + 2) * sizeof(struct cache_s *));
+ CHKmalloc(new_caches);
+ caches = new_caches;
+ caches[i] = pData->cache;
+ caches[i + 1] = NULL;
+ }
+CODE_STD_FINALIZERnewActInst
+ if(pvals != NULL)
+ cnfparamvalsDestruct(pvals, &actpblk);
+ if(fp)
+ fclose(fp);
+ free(rxstr);
+ free(srcMetadataPath);
+ENDnewActInst
+
+
+/* legacy config format is not supported */
+BEGINparseSelectorAct
+CODESTARTparseSelectorAct
+CODE_STD_STRING_REQUESTparseSelectorAct(1)
+ if(strncmp((char *) p, ":mmkubernetes:", sizeof(":mmkubernetes:") - 1)) {
+ LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED,
+ "mmkubernetes supports only v6+ config format, use: "
+ "action(type=\"mmkubernetes\" ...)");
+ }
+ ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
+CODE_STD_FINALIZERparseSelectorAct
+ENDparseSelectorAct
+
+
+BEGINendCnfLoad
+CODESTARTendCnfLoad
+ENDendCnfLoad
+
+
+BEGINcheckCnf
+CODESTARTcheckCnf
+ENDcheckCnf
+
+
+BEGINactivateCnf
+CODESTARTactivateCnf
+ runModConf = pModConf;
+ENDactivateCnf
+
+
+BEGINfreeCnf
+CODESTARTfreeCnf
+ int i;
+
+ free(pModConf->kubernetesUrl);
+ free(pModConf->srcMetadataPath);
+ free(pModConf->dstMetadataPath);
+ free(pModConf->caCertFile);
+ free(pModConf->myCertFile);
+ free(pModConf->myPrivKeyFile);
+ free(pModConf->token);
+ free(pModConf->tokenFile);
+ free(pModConf->de_dot_separator);
+ free(pModConf->fnRules);
+ free(pModConf->fnRulebase);
+ free(pModConf->contRules);
+ free(pModConf->contRulebase);
+ free_annotationmatch(&pModConf->annotation_match);
+ for(i = 0; caches[i] != NULL; i++) {
+ dbgprintf("mmkubernetes: freeing cache [%d] mdht [%p] nsht [%p]\n",
+ i, caches[i]->mdHt, caches[i]->nsHt);
+ cacheFree(caches[i]);
+ }
+ free(caches);
+ENDfreeCnf
+
+
+BEGINdbgPrintInstInfo
+CODESTARTdbgPrintInstInfo
+ dbgprintf("mmkubernetes\n");
+ dbgprintf("\tkubernetesUrl='%s'\n", pData->kubernetesUrl);
+ dbgprintf("\tsrcMetadataPath='%s'\n", pData->srcMetadataDescr->name);
+ dbgprintf("\tdstMetadataPath='%s'\n", pData->dstMetadataPath);
+ dbgprintf("\ttls.cacert='%s'\n", pData->caCertFile);
+ dbgprintf("\ttls.mycert='%s'\n", pData->myCertFile);
+ dbgprintf("\ttls.myprivkey='%s'\n", pData->myPrivKeyFile);
+ dbgprintf("\tallowUnsignedCerts='%d'\n", pData->allowUnsignedCerts);
+ dbgprintf("\tskipVerifyHost='%d'\n", pData->skipVerifyHost);
+ dbgprintf("\ttoken='%s'\n", pData->token);
+ dbgprintf("\ttokenFile='%s'\n", pData->tokenFile);
+ dbgprintf("\tde_dot='%d'\n", pData->de_dot);
+ dbgprintf("\tde_dot_separator='%s'\n", pData->de_dot_separator);
+ dbgprintf("\tfilenamerulebase='%s'\n", pData->fnRulebase);
+ dbgprintf("\tcontainerrulebase='%s'\n", pData->contRulebase);
+#if HAVE_LOADSAMPLESFROMSTRING == 1
+ dbgprintf("\tfilenamerules='%s'\n", pData->fnRules);
+ dbgprintf("\tcontainerrules='%s'\n", pData->contRules);
+#endif
+ dbgprintf("\tbusyretryinterval='%d'\n", pData->busyRetryInterval);
+ dbgprintf("\tcacheentryttl='%d'\n", pData->cacheEntryTTL);
+ dbgprintf("\tcacheexpireinterval='%d'\n", pData->cacheExpireInterval);
+ENDdbgPrintInstInfo
+
+
+BEGINtryResume
+CODESTARTtryResume
+ENDtryResume
+
+static rsRetVal
+extractMsgMetadata(smsg_t *pMsg, instanceData *pData, struct json_object **json)
+{
+ DEFiRet;
+ uchar *filename = NULL, *container_name = NULL, *container_id_full = NULL;
+ rs_size_t fnLen, container_name_len, container_id_full_len;
+ unsigned short freeFn = 0, free_container_name = 0, free_container_id_full = 0;
+ int lnret;
+ struct json_object *cnid = NULL;
+
+ if (!json)
+ FINALIZE;
+ *json = NULL;
+ /* extract metadata from the CONTAINER_NAME field and see if CONTAINER_ID_FULL is present */
+ container_name = MsgGetProp(pMsg, NULL, pData->contNameDescr,
+ &container_name_len, &free_container_name, NULL);
+ container_id_full = MsgGetProp(
+ pMsg, NULL, pData->contIdFullDescr, &container_id_full_len, &free_container_id_full, NULL);
+
+ if (container_name && container_id_full && container_name_len && container_id_full_len) {
+ dbgprintf("mmkubernetes: CONTAINER_NAME: '%s' CONTAINER_ID_FULL: '%s'.\n",
+ container_name, container_id_full);
+ if ((lnret = ln_normalize(pData->contCtxln, (char*)container_name,
+ container_name_len, json))) {
+ if (LN_WRONGPARSER != lnret) {
+ LogMsg(0, RS_RET_ERR, LOG_ERR,
+ "mmkubernetes: error parsing container_name [%s]: [%d]",
+ container_name, lnret);
+
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ /* else assume parser didn't find a match and fall through */
+ } else if (fjson_object_object_get_ex(*json, "pod_name", NULL) &&
+ fjson_object_object_get_ex(*json, "namespace_name", NULL) &&
+ fjson_object_object_get_ex(*json, "container_name", NULL)) {
+ /* if we have fields for pod name, namespace name, container name,
+ * and container id, we are good to go */
+ /* add field for container id */
+ json_object_object_add(*json, "container_id",
+ json_object_new_string_len((const char *)container_id_full,
+ container_id_full_len));
+ ABORT_FINALIZE(RS_RET_OK);
+ }
+ }
+
+ /* extract metadata from the file name */
+ filename = MsgGetProp(pMsg, NULL, pData->srcMetadataDescr, &fnLen, &freeFn, NULL);
+ if((filename == NULL) || (fnLen == 0))
+ ABORT_FINALIZE(RS_RET_NOT_FOUND);
+
+ dbgprintf("mmkubernetes: filename: '%s' len %d.\n", filename, fnLen);
+ if ((lnret = ln_normalize(pData->fnCtxln, (char*)filename, fnLen, json))) {
+ if (LN_WRONGPARSER != lnret) {
+ LogMsg(0, RS_RET_ERR, LOG_ERR,
+ "mmkubernetes: error parsing container_name [%s]: [%d]",
+ filename, lnret);
+
+ ABORT_FINALIZE(RS_RET_ERR);
+ } else {
+ /* no match */
+ ABORT_FINALIZE(RS_RET_NOT_FOUND);
+ }
+ }
+ /* if we have fields for pod name, namespace name, container name,
+ * and container id, we are good to go */
+ if (fjson_object_object_get_ex(*json, "pod_name", NULL) &&
+ fjson_object_object_get_ex(*json, "namespace_name", NULL) &&
+ fjson_object_object_get_ex(*json, "container_name_and_id", &cnid)) {
+ /* parse container_name_and_id into container_name and container_id */
+ const char *container_name_and_id = json_object_get_string(cnid);
+ const char *last_dash = NULL;
+ if (container_name_and_id && (last_dash = strrchr(container_name_and_id, '-')) &&
+ *(last_dash + 1) && (last_dash != container_name_and_id)) {
+ json_object_object_add(*json, "container_name",
+ json_object_new_string_len(container_name_and_id,
+ (int)(last_dash-container_name_and_id)));
+ json_object_object_add(*json, "container_id",
+ json_object_new_string(last_dash + 1));
+ ABORT_FINALIZE(RS_RET_OK);
+ }
+ }
+ ABORT_FINALIZE(RS_RET_NOT_FOUND);
+finalize_it:
+ if(freeFn)
+ free(filename);
+ if (free_container_name)
+ free(container_name);
+ if (free_container_id_full)
+ free(container_id_full);
+ if (iRet != RS_RET_OK) {
+ json_object_put(*json);
+ *json = NULL;
+ }
+ RETiRet;
+}
+
+
+static rsRetVal
+queryKB(wrkrInstanceData_t *pWrkrData, char *url, time_t now, struct json_object **rply)
+{
+ DEFiRet;
+ CURLcode ccode;
+ struct json_tokener *jt = NULL;
+ struct json_object *jo;
+ long resp_code = 400;
+
+ if (pWrkrData->pData->cache->lastBusyTime) {
+ now -= pWrkrData->pData->cache->lastBusyTime;
+ if (now < pWrkrData->pData->busyRetryInterval) {
+ LogMsg(0, RS_RET_RETRY, LOG_DEBUG,
+ "mmkubernetes: Waited [%ld] of [%d] seconds for the requested url [%s]\n",
+ now, pWrkrData->pData->busyRetryInterval, url);
+ ABORT_FINALIZE(RS_RET_RETRY);
+ } else {
+ LogMsg(0, RS_RET_OK, LOG_DEBUG,
+ "mmkubernetes: Cleared busy status after [%d] seconds - "
+ "will retry the requested url [%s]\n",
+ pWrkrData->pData->busyRetryInterval, url);
+ pWrkrData->pData->cache->lastBusyTime = 0;
+ }
+ }
+
+ /* query kubernetes for pod info */
+ ccode = curl_easy_setopt(pWrkrData->curlCtx, CURLOPT_URL, url);
+ if(ccode != CURLE_OK)
+ ABORT_FINALIZE(RS_RET_ERR);
+ if(CURLE_OK != (ccode = curl_easy_perform(pWrkrData->curlCtx))) {
+ LogMsg(0, RS_RET_ERR, LOG_ERR,
+ "mmkubernetes: failed to connect to [%s] - %d:%s\n",
+ url, ccode, curl_easy_strerror(ccode));
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
+ }
+ if(CURLE_OK != (ccode = curl_easy_getinfo(pWrkrData->curlCtx,
+ CURLINFO_RESPONSE_CODE, &resp_code))) {
+ LogMsg(0, RS_RET_ERR, LOG_ERR,
+ "mmkubernetes: could not get response code from query to [%s] - %d:%s\n",
+ url, ccode, curl_easy_strerror(ccode));
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ if(resp_code == 401) {
+ LogMsg(0, RS_RET_ERR, LOG_ERR,
+ "mmkubernetes: Unauthorized: not allowed to view url - "
+ "check token/auth credentials [%s]\n",
+ url);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ if(resp_code == 403) {
+ LogMsg(0, RS_RET_ERR, LOG_ERR,
+ "mmkubernetes: Forbidden: no access - "
+ "check permissions to view url [%s]\n",
+ url);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ if(resp_code == 404) {
+ LogMsg(0, RS_RET_NOT_FOUND, LOG_INFO,
+ "mmkubernetes: Not Found: the resource does not exist at url [%s]\n",
+ url);
+ ABORT_FINALIZE(RS_RET_NOT_FOUND);
+ }
+ if(resp_code == 429) {
+ if (pWrkrData->pData->busyRetryInterval) {
+ pWrkrData->pData->cache->lastBusyTime = now;
+ }
+
+ LogMsg(0, RS_RET_RETRY, LOG_INFO,
+ "mmkubernetes: Too Many Requests: the server is too heavily loaded "
+ "to provide the data for the requested url [%s]\n",
+ url);
+ ABORT_FINALIZE(RS_RET_RETRY);
+ }
+ if(resp_code != 200) {
+ LogMsg(0, RS_RET_ERR, LOG_ERR,
+ "mmkubernetes: server returned unexpected code [%ld] for url [%s]\n",
+ resp_code, url);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ /* parse retrieved data */
+ jt = json_tokener_new();
+ json_tokener_reset(jt);
+ jo = json_tokener_parse_ex(jt, pWrkrData->curlRply, pWrkrData->curlRplyLen);
+ json_tokener_free(jt);
+ if(!json_object_is_type(jo, json_type_object)) {
+ json_object_put(jo);
+ jo = NULL;
+ LogMsg(0, RS_RET_JSON_PARSE_ERR, LOG_INFO,
+ "mmkubernetes: unable to parse string as JSON:[%.*s]\n",
+ (int)pWrkrData->curlRplyLen, pWrkrData->curlRply);
+ ABORT_FINALIZE(RS_RET_JSON_PARSE_ERR);
+ }
+
+ dbgprintf("mmkubernetes: queryKB reply:\n%s\n",
+ json_object_to_json_string_ext(jo, JSON_C_TO_STRING_PRETTY));
+
+ *rply = jo;
+
+finalize_it:
+ if(pWrkrData->curlRply != NULL) {
+ free(pWrkrData->curlRply);
+ pWrkrData->curlRply = NULL;
+ pWrkrData->curlRplyLen = 0;
+ }
+ RETiRet;
+}
+
+
+/* versions < 8.16.0 don't support BEGINdoAction_NoStrings */
+#if defined(BEGINdoAction_NoStrings)
+BEGINdoAction_NoStrings
+ smsg_t **ppMsg = (smsg_t **) pMsgData;
+ smsg_t *pMsg = ppMsg[0];
+#else
+BEGINdoAction
+ smsg_t *pMsg = (smsg_t*) ppString[0];
+#endif
+ const char *podName = NULL, *ns = NULL, *containerName = NULL,
+ *containerID = NULL;
+ char *mdKey = NULL;
+ struct json_object *jMetadata = NULL, *jMetadataCopy = NULL, *jMsgMeta = NULL,
+ *jo = NULL;
+ int add_pod_metadata = 1;
+ time_t now;
+
+CODESTARTdoAction
+ CHKiRet_Hdlr(extractMsgMetadata(pMsg, pWrkrData->pData, &jMsgMeta)) {
+ ABORT_FINALIZE((iRet == RS_RET_NOT_FOUND) ? RS_RET_OK : iRet);
+ }
+
+ datetime.GetTime(&now);
+ STATSCOUNTER_INC(pWrkrData->k8sRecordSeen, pWrkrData->mutK8sRecordSeen);
+
+ if (fjson_object_object_get_ex(jMsgMeta, "pod_name", &jo))
+ podName = json_object_get_string(jo);
+ if (fjson_object_object_get_ex(jMsgMeta, "namespace_name", &jo))
+ ns = json_object_get_string(jo);
+ if (fjson_object_object_get_ex(jMsgMeta, "container_name", &jo))
+ containerName = json_object_get_string(jo);
+ if (fjson_object_object_get_ex(jMsgMeta, "container_id", &jo))
+ containerID = json_object_get_string(jo);
+ assert(podName != NULL);
+ assert(ns != NULL);
+ assert(containerName != NULL);
+ assert(containerID != NULL);
+
+ dbgprintf("mmkubernetes:\n podName: '%s'\n namespace: '%s'\n containerName: '%s'\n"
+ " containerID: '%s'\n", podName, ns, containerName, containerID);
+
+ /* check cache for metadata */
+ if ((-1 == asprintf(&mdKey, "%s_%s_%s", ns, podName, containerName)) ||
+ (!mdKey)) {
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ }
+ pthread_mutex_lock(pWrkrData->pData->cache->cacheMtx);
+ jMetadata = cache_entry_get_md(pWrkrData, mdKey, now);
+
+ if(jMetadata == NULL) {
+ char *url = NULL;
+ struct json_object *jReply = NULL, *jo2 = NULL, *jNsMeta = NULL, *jPodData = NULL;
+
+ /* check cache for namespace metadata */
+ jNsMeta = cache_entry_get_nsmd(pWrkrData, (const char *)ns, now);
+
+ if(jNsMeta == NULL) {
+ /* query kubernetes for namespace info */
+ /* todo: move url definitions elsewhere */
+ if ((-1 == asprintf(&url, "%s/api/v1/namespaces/%s",
+ (char *) pWrkrData->pData->kubernetesUrl, ns)) ||
+ (!url)) {
+ pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx);
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ }
+ iRet = queryKB(pWrkrData, url, now, &jReply);
+ free(url);
+ if (iRet == RS_RET_NOT_FOUND) {
+ /* negative cache namespace - make a dummy empty namespace metadata object */
+ jNsMeta = json_object_new_object();
+ STATSCOUNTER_INC(pWrkrData->namespaceMetadataNotFound,
+ pWrkrData->mutNamespaceMetadataNotFound);
+ } else if (iRet == RS_RET_RETRY) {
+ /* server is busy - retry or error */
+ STATSCOUNTER_INC(pWrkrData->namespaceMetadataBusy,
+ pWrkrData->mutNamespaceMetadataBusy);
+ if (0 == pWrkrData->pData->busyRetryInterval) {
+ pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ add_pod_metadata = 0; /* don't cache pod metadata either - retry both */
+ } else if (iRet != RS_RET_OK) {
+ /* one of many possible transient errors: apiserver error, network, config, auth.
+ * Instead of causing hard error and disabling this module, we can return
+ * basic namespace metadata that is extracted from container log file path.
+ * When transient error resolves, other metadata will become
+ * available. For a new a new pod whose metadata is not yet cached, this
+ * will allow 401, 403, 500, etc. return status from apiserver treated
+ * similar to 404 returns.
+ * */
+ jNsMeta = json_object_new_object();
+ STATSCOUNTER_INC(pWrkrData->namespaceMetadataError,
+ pWrkrData->mutNamespaceMetadataError);
+ } else if (fjson_object_object_get_ex(jReply, "metadata", &jNsMeta)) {
+ jNsMeta = json_object_get(jNsMeta);
+ parse_labels_annotations(jNsMeta, &pWrkrData->pData->annotation_match,
+ pWrkrData->pData->de_dot,
+ (const char *)pWrkrData->pData->de_dot_separator,
+ pWrkrData->pData->de_dot_separator_len);
+ STATSCOUNTER_INC(pWrkrData->namespaceMetadataSuccess,
+ pWrkrData->mutNamespaceMetadataSuccess);
+ } else {
+ /* namespace with no metadata??? */
+ LogMsg(0, RS_RET_ERR, LOG_INFO,
+ "mmkubernetes: namespace [%s] has no metadata!\n", ns);
+ /* negative cache namespace - make a dummy empty namespace metadata object */
+ jNsMeta = json_object_new_object();
+ STATSCOUNTER_INC(pWrkrData->namespaceMetadataSuccess,
+ pWrkrData->mutNamespaceMetadataSuccess);
+ }
+
+ if(jNsMeta) {
+ if ((iRet = cache_entry_add_nsmd(pWrkrData, ns, jNsMeta, now))) {
+ ABORT_FINALIZE(iRet);
+ }
+ }
+ json_object_put(jReply);
+ jReply = NULL;
+ }
+
+ if ((-1 == asprintf(&url, "%s/api/v1/namespaces/%s/pods/%s",
+ (char *) pWrkrData->pData->kubernetesUrl, ns, podName)) ||
+ (!url)) {
+ pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx);
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ }
+ iRet = queryKB(pWrkrData, url, now, &jReply);
+ free(url);
+ if (iRet == RS_RET_NOT_FOUND) {
+ /* negative cache pod - make a dummy empty pod metadata object */
+ iRet = RS_RET_OK;
+ STATSCOUNTER_INC(pWrkrData->podMetadataNotFound, pWrkrData->mutPodMetadataNotFound);
+ } else if (iRet == RS_RET_RETRY) {
+ /* server is busy - retry or error */
+ STATSCOUNTER_INC(pWrkrData->podMetadataBusy, pWrkrData->mutPodMetadataBusy);
+ if (0 == pWrkrData->pData->busyRetryInterval) {
+ pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ add_pod_metadata = 0; /* do not cache so that we can retry */
+ iRet = RS_RET_OK;
+ } else if(iRet != RS_RET_OK) {
+ /* This is likely caused by transient apiserver errors: 401, 403, 500, etc.
+ * Treat it similar to 404 while returning file path based pod metadata.
+ * When transient error condition resolves, additional metadata will be
+ * available for events originating from a new pod whose metatadata is not
+ * yet cached.
+ * */
+ iRet = RS_RET_OK;
+ STATSCOUNTER_INC(pWrkrData->podMetadataError, pWrkrData->mutPodMetadataError);
+ } else {
+ STATSCOUNTER_INC(pWrkrData->podMetadataSuccess, pWrkrData->mutPodMetadataSuccess);
+ }
+
+ jo = json_object_new_object();
+ if(jNsMeta && fjson_object_object_get_ex(jNsMeta, "uid", &jo2))
+ json_object_object_add(jo, "namespace_id", json_object_get(jo2));
+ if(jNsMeta && fjson_object_object_get_ex(jNsMeta, "labels", &jo2))
+ json_object_object_add(jo, "namespace_labels", json_object_get(jo2));
+ if(jNsMeta && fjson_object_object_get_ex(jNsMeta, "annotations", &jo2))
+ json_object_object_add(jo, "namespace_annotations", json_object_get(jo2));
+ if(jNsMeta && fjson_object_object_get_ex(jNsMeta, "creationTimestamp", &jo2))
+ json_object_object_add(jo, "creation_timestamp", json_object_get(jo2));
+ if(fjson_object_object_get_ex(jReply, "metadata", &jPodData)) {
+ if(fjson_object_object_get_ex(jPodData, "uid", &jo2))
+ json_object_object_add(jo, "pod_id", json_object_get(jo2));
+ parse_labels_annotations(jPodData, &pWrkrData->pData->annotation_match,
+ pWrkrData->pData->de_dot,
+ (const char *)pWrkrData->pData->de_dot_separator,
+ pWrkrData->pData->de_dot_separator_len);
+ if(fjson_object_object_get_ex(jPodData, "annotations", &jo2))
+ json_object_object_add(jo, "annotations", json_object_get(jo2));
+ if(fjson_object_object_get_ex(jPodData, "labels", &jo2))
+ json_object_object_add(jo, "labels", json_object_get(jo2));
+ }
+ if(fjson_object_object_get_ex(jReply, "spec", &jPodData)) {
+ if(fjson_object_object_get_ex(jPodData, "nodeName", &jo2)) {
+ json_object_object_add(jo, "host", json_object_get(jo2));
+ }
+ }
+ json_object_put(jReply);
+ jReply = NULL;
+
+ if (fjson_object_object_get_ex(jMsgMeta, "pod_name", &jo2))
+ json_object_object_add(jo, "pod_name", json_object_get(jo2));
+ if (fjson_object_object_get_ex(jMsgMeta, "namespace_name", &jo2))
+ json_object_object_add(jo, "namespace_name", json_object_get(jo2));
+ if (fjson_object_object_get_ex(jMsgMeta, "container_name", &jo2))
+ json_object_object_add(jo, "container_name", json_object_get(jo2));
+ json_object_object_add(jo, "master_url",
+ json_object_new_string((const char *)pWrkrData->pData->kubernetesUrl));
+ jMetadata = json_object_new_object();
+ json_object_object_add(jMetadata, "kubernetes", jo);
+ jo = json_object_new_object();
+ if (fjson_object_object_get_ex(jMsgMeta, "container_id", &jo2))
+ json_object_object_add(jo, "container_id", json_object_get(jo2));
+ json_object_object_add(jMetadata, "docker", jo);
+
+ if (add_pod_metadata) {
+ if ((iRet = cache_entry_add_md(pWrkrData, mdKey, jMetadata, now)))
+ ABORT_FINALIZE(iRet);
+ mdKey = NULL;
+ }
+ }
+
+ /* make a copy of the metadata for the msg to own */
+ /* todo: use json_object_deep_copy when implementation available in libfastjson */
+ /* yes, this is expensive - but there is no other way to make this thread safe - we
+ * can't allow the msg to have a shared pointer to an element inside the cache,
+ * outside of the cache lock
+ */
+ jMetadataCopy = json_tokener_parse(json_object_get_string(jMetadata));
+ if (!add_pod_metadata) {
+ /* jMetadata object was created from scratch and not cached */
+ json_object_put(jMetadata);
+ jMetadata = NULL;
+ }
+ pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx);
+ /* the +1 is there to skip the leading '$' */
+ msgAddJSON(pMsg, (uchar *) pWrkrData->pData->dstMetadataPath + 1, jMetadataCopy, 0, 0);
+
+finalize_it:
+ json_object_put(jMsgMeta);
+ free(mdKey);
+ENDdoAction
+
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ENDisCompatibleWithFeature
+
+
+/* all the macros bellow have to be in a specific order */
+BEGINmodExit
+CODESTARTmodExit
+ curl_global_cleanup();
+
+ objRelease(datetime, CORE_COMPONENT);
+ objRelease(regexp, LM_REGEXP_FILENAME);
+ objRelease(statsobj, CORE_COMPONENT);
+ENDmodExit
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
+CODEqueryEtryPt_STD_CONF2_QUERIES
+CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
+CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+ENDqueryEtryPt
+
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
+CODEmodInit_QueryRegCFSLineHdlr
+ DBGPRINTF("mmkubernetes: module compiled with rsyslog version %s.\n", VERSION);
+ CHKiRet(objUse(statsobj, CORE_COMPONENT));
+ CHKiRet(objUse(regexp, LM_REGEXP_FILENAME));
+ CHKiRet(objUse(datetime, CORE_COMPONENT));
+ /* CURL_GLOBAL_ALL initializes more than is needed but the
+ * libcurl documentation discourages use of other values
+ */
+ curl_global_init(CURL_GLOBAL_ALL);
+ENDmodInit