summaryrefslogtreecommitdiffstats
path: root/plugins/imkafka/imkafka.c
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/imkafka/imkafka.c')
-rw-r--r--plugins/imkafka/imkafka.c904
1 files changed, 904 insertions, 0 deletions
diff --git a/plugins/imkafka/imkafka.c b/plugins/imkafka/imkafka.c
new file mode 100644
index 0000000..6351280
--- /dev/null
+++ b/plugins/imkafka/imkafka.c
@@ -0,0 +1,904 @@
+/* imkafka.c
+ *
+ * This input plugin is a consumer for Apache Kafka.
+ *
+ * File begun on 2017-04-25 by alorbach
+ *
+ * Copyright 2008-2017 Adiscon GmbH.
+ *
+ * This file is part of rsyslog.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * -or-
+ * see COPYING.ASL20 in the source distribution
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include <stdio.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <pthread.h>
+#include <sys/uio.h>
+#include <librdkafka/rdkafka.h>
+
+#include "rsyslog.h"
+#include "conf.h"
+#include "syslogd-types.h"
+#include "srUtils.h"
+#include "template.h"
+#include "module-template.h"
+#include "errmsg.h"
+#include "atomic.h"
+#include "statsobj.h"
+#include "unicode-helper.h"
+#include "prop.h"
+#include "ruleset.h"
+#include "glbl.h"
+#include "cfsysline.h"
+#include "msg.h"
+#include "dirty.h"
+
+MODULE_TYPE_INPUT
+MODULE_TYPE_NOKEEP
+MODULE_CNFNAME("imkafka")
+
+/* static data */
+DEF_IMOD_STATIC_DATA
+DEFobjCurrIf(prop)
+DEFobjCurrIf(ruleset)
+DEFobjCurrIf(glbl)
+DEFobjCurrIf(statsobj)
+
+/* forward references */
+static void * imkafkawrkr(void *myself);
+
+
+struct kafka_params {
+ const char *name;
+ const char *val;
+};
+
+/* Module static data */
+static struct configSettings_s {
+ uchar *topic;
+ uchar *consumergroup;
+ char *brokers;
+ uchar *pszBindRuleset;
+ int nConfParams;
+ struct kafka_params *confParams;
+} cs;
+
+struct instanceConf_s {
+ uchar *topic;
+ uchar *consumergroup;
+ char *brokers;
+ int64_t offset;
+ ruleset_t *pBindRuleset; /* ruleset to bind listener to (use system default if unspecified) */
+ uchar *pszBindRuleset; /* default name of Ruleset to bind to */
+ int bReportErrs;
+ int nConfParams;
+ struct kafka_params *confParams;
+ int bIsConnected;
+ rd_kafka_conf_t *conf;
+ rd_kafka_t *rk;
+ rd_kafka_topic_conf_t *topic_conf;
+ int partition;
+ int bIsSubscribed;
+ int nMsgParsingFlags;
+
+ struct instanceConf_s *next;
+};
+
+
+struct modConfData_s {
+ rsconf_t *pConf; /* our overall config object */
+ uchar *topic;
+ uchar *consumergroup;
+ char *brokers;
+ instanceConf_t *root, *tail;
+ ruleset_t *pBindRuleset; /* ruleset to bind listener to (use system default if unspecified) */
+ uchar *pszBindRuleset; /* default name of Ruleset to bind to */
+};
+
+/* global data */
+pthread_attr_t wrkrThrdAttr; /* Attribute for worker threads ; read only after startup */
+static int activeKafkaworkers = 0;
+/* The following structure controls the worker threads. Global data is
+ * needed for their access.
+ */
+static struct kafkaWrkrInfo_s {
+ pthread_t tid; /* the worker's thread ID */
+ instanceConf_t *inst; /* Pointer to imkafka instance */
+} *kafkaWrkrInfo;
+
+static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */
+static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current load process */
+
+static prop_t *pInputName = NULL;
+/* there is only one global inputName for all messages generated by this input */
+
+/* module-global parameters */
+static struct cnfparamdescr modpdescr[] = {
+ { "ruleset", eCmdHdlrGetWord, 0 },
+};
+static struct cnfparamblk modpblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(modpdescr)/sizeof(struct cnfparamdescr),
+ modpdescr
+ };
+
+/* input instance parameters */
+static struct cnfparamdescr inppdescr[] = {
+ { "topic", eCmdHdlrString, CNFPARAM_REQUIRED },
+ { "broker", eCmdHdlrArray, 0 },
+ { "confparam", eCmdHdlrArray, 0 },
+ { "consumergroup", eCmdHdlrString, 0},
+ { "ruleset", eCmdHdlrString, 0 },
+ { "parsehostname", eCmdHdlrBinary, 0 },
+};
+static struct cnfparamblk inppblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(inppdescr)/sizeof(struct cnfparamdescr),
+ inppdescr
+ };
+
+#include "im-helper.h" /* must be included AFTER the type definitions! */
+
+/* ------------------------------ callbacks ------------------------------ */
+
+
+
+
+/* ------------------------------ end callbacks ------------------------------ */
+
+static void
+kafkaLogger(const rd_kafka_t __attribute__((unused)) *rk, int level,
+ const char *fac, const char *buf)
+{
+ DBGPRINTF("imkafka: kafka log message [%d,%s]: %s\n",
+ level, fac, buf);
+}
+
+
+/* enqueue the kafka message. The provided string is
+ * not freed - thuis must be done by the caller.
+ */
+static rsRetVal enqMsg(instanceConf_t *const __restrict__ inst,
+ rd_kafka_message_t *const __restrict__ rkmessage)
+{
+ DEFiRet;
+ smsg_t *pMsg;
+
+ if((int)rkmessage->len == 0) {
+ /* we do not process empty lines */
+ FINALIZE;
+ }
+
+DBGPRINTF("imkafka: enqMsg: Msg: %.*s\n", (int)rkmessage->len, (char *)rkmessage->payload);
+
+ CHKiRet(msgConstruct(&pMsg));
+ MsgSetInputName(pMsg, pInputName);
+ MsgSetRawMsg(pMsg, (char*)rkmessage->payload, (int)rkmessage->len);
+ MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY);
+ MsgSetRuleset(pMsg, inst->pBindRuleset);
+ pMsg->msgFlags = inst->nMsgParsingFlags;
+ /* Optional Fields */
+ if (rkmessage->key_len) {
+ DBGPRINTF("imkafka: enqMsg: Key: %.*s\n", (int)rkmessage->key_len, (char *)rkmessage->key);
+ MsgSetTAG(pMsg, (const uchar *)rkmessage->key, (int)rkmessage->key_len);
+ }
+ MsgSetMSGoffs(pMsg, 0); /* we do not have a header... */
+
+ CHKiRet(submitMsg2(pMsg));
+
+finalize_it:
+ RETiRet;
+}
+
+/**
+ * Handle Kafka Consumer Loop until all msgs are processed
+ */
+static void msgConsume (instanceConf_t *inst) {
+ rd_kafka_message_t *rkmessage = NULL;
+
+ do { /* Consume messages */
+ rkmessage = rd_kafka_consumer_poll(inst->rk, 1000); /* Block for 1000 ms max */
+ if(rkmessage == NULL) {
+ DBGPRINTF("imkafka: msgConsume EMPTY Loop on %s/%s/%s\n",
+ inst->topic, inst->consumergroup, inst->brokers);
+ goto done;
+ }
+
+ if (rkmessage->err) {
+ if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
+ /* not an error, just a regular status! */
+ DBGPRINTF("imkafka: Consumer "
+ "reached end of topic \"%s\" [%"PRId32"]"
+ "message queue offset %"PRId64"\n",
+ rd_kafka_topic_name(rkmessage->rkt),
+ rkmessage->partition,
+ rkmessage->offset);
+ goto done;
+ }
+ if (rkmessage->rkt) {
+ LogError(0, RS_RET_KAFKA_ERROR,
+ "imkafka: Consumer error for topic \"%s\" [%"PRId32"]"
+ "message queue offset %"PRId64": %s\n",
+ rd_kafka_topic_name(rkmessage->rkt),
+ rkmessage->partition,
+ rkmessage->offset,
+ rd_kafka_message_errstr(rkmessage));
+ } else {
+ LogError(0, RS_RET_KAFKA_ERROR,
+ "imkafka: Consumer error for topic \"%s\": \"%s\"\n",
+ rd_kafka_err2str(rkmessage->err),
+ rd_kafka_message_errstr(rkmessage));
+ }
+ goto done;
+ }
+
+ DBGPRINTF("imkafka: msgConsume Loop on %s/%s/%s: [%"PRId32"], "
+ "offset %"PRId64", %zd bytes):\n",
+ rd_kafka_topic_name(rkmessage->rkt) /*inst->topic*/,
+ inst->consumergroup,
+ inst->brokers,
+ rkmessage->partition,
+ rkmessage->offset,
+ rkmessage->len);
+ enqMsg(inst, rkmessage);
+ /* Destroy message and continue */
+ rd_kafka_message_destroy(rkmessage);
+ rkmessage = NULL;
+ } while(1); /* loop broken inside */
+done:
+ /* Destroy message in case rkmessage->err was set */
+ if(rkmessage != NULL) {
+ rd_kafka_message_destroy(rkmessage);
+ }
+ return;
+}
+
+
+
+/* create input instance, set default parameters, and
+ * add it to the list of instances.
+ */
+static rsRetVal
+createInstance(instanceConf_t **pinst)
+{
+ instanceConf_t *inst;
+ DEFiRet;
+ CHKmalloc(inst = malloc(sizeof(instanceConf_t)));
+ inst->next = NULL;
+
+ inst->brokers = NULL;
+ inst->topic = NULL;
+ inst->consumergroup = NULL;
+ inst->pszBindRuleset = NULL;
+ inst->nConfParams = 0;
+ inst->confParams = NULL;
+ inst->pBindRuleset = NULL;
+ inst->bReportErrs = 1; /* Fixed for now */
+ inst->nMsgParsingFlags = NEEDS_PARSING;
+ inst->bIsConnected = 0;
+ inst->bIsSubscribed = 0;
+ /* Kafka objects */
+ inst->conf = NULL;
+ inst->rk = NULL;
+ inst->topic_conf = NULL;
+ inst->partition = RD_KAFKA_PARTITION_UA;
+
+ /* node created, let's add to config */
+ if(loadModConf->tail == NULL) {
+ loadModConf->tail = loadModConf->root = inst;
+ } else {
+ loadModConf->tail->next = inst;
+ loadModConf->tail = inst;
+ }
+
+ *pinst = inst;
+finalize_it:
+ RETiRet;
+}
+
+/* this function checks instance parameters and does some required pre-processing
+ */
+static rsRetVal ATTR_NONNULL()
+checkInstance(instanceConf_t *const inst)
+{
+ DEFiRet;
+ char kafkaErrMsg[1024];
+
+ /* main kafka conf */
+ inst->conf = rd_kafka_conf_new();
+ if(inst->conf == NULL) {
+ if(inst->bReportErrs) {
+ LogError(0, RS_RET_KAFKA_ERROR,
+ "imkafka: error creating kafka conf obj: %s\n",
+ rd_kafka_err2str(rd_kafka_last_error()));
+ }
+ ABORT_FINALIZE(RS_RET_KAFKA_ERROR);
+ }
+
+# ifdef DEBUG
+ /* enable kafka debug output */
+ if(rd_kafka_conf_set(inst->conf, "debug", RD_KAFKA_DEBUG_CONTEXTS,
+ kafkaErrMsg, sizeof(kafkaErrMsg)) != RD_KAFKA_CONF_OK) {
+ LogError(0, RS_RET_KAFKA_ERROR, "imkafka: error setting kafka debug option: %s\n", kafkaErrMsg);
+ /* DO NOT ABORT IN THIS CASE! */
+ }
+# endif
+
+ /* Set custom configuration parameters */
+ for(int i = 0 ; i < inst->nConfParams ; ++i) {
+ assert(inst->confParams+i != NULL); /* invariant: nConfParams MUST exist! */
+ DBGPRINTF("imkafka: setting custom configuration parameter: %s:%s\n",
+ inst->confParams[i].name,
+ inst->confParams[i].val);
+ if(rd_kafka_conf_set(inst->conf,
+ inst->confParams[i].name,
+ inst->confParams[i].val,
+ kafkaErrMsg, sizeof(kafkaErrMsg)) != RD_KAFKA_CONF_OK) {
+ if(inst->bReportErrs) {
+ LogError(0, RS_RET_PARAM_ERROR, "error setting custom configuration "
+ "parameter '%s=%s': %s",
+ inst->confParams[i].name,
+ inst->confParams[i].val, kafkaErrMsg);
+ } else {
+ DBGPRINTF("imkafka: error setting custom configuration parameter '%s=%s': %s",
+ inst->confParams[i].name,
+ inst->confParams[i].val, kafkaErrMsg);
+ }
+ ABORT_FINALIZE(RS_RET_PARAM_ERROR);
+ }
+ }
+
+ /* Topic configuration */
+ inst->topic_conf = rd_kafka_topic_conf_new();
+
+ /* Assign kafka group id */
+ if (inst->consumergroup != NULL) {
+ DBGPRINTF("imkafka: setting consumergroup: '%s'\n", inst->consumergroup);
+ if (rd_kafka_conf_set(inst->conf, "group.id", (char*) inst->consumergroup,
+ kafkaErrMsg, sizeof(kafkaErrMsg)) != RD_KAFKA_CONF_OK) {
+ if(inst->bReportErrs) {
+ LogError(0, RS_RET_KAFKA_ERROR,
+ "imkafka: error assigning consumergroup %s to "
+ "kafka config: %s\n", inst->consumergroup,
+ kafkaErrMsg);
+ }
+ ABORT_FINALIZE(RS_RET_KAFKA_ERROR);
+ }
+
+
+ /* Set default for auto offset reset */
+ if (rd_kafka_topic_conf_set(inst->topic_conf, "auto.offset.reset",
+ "smallest", kafkaErrMsg, sizeof(kafkaErrMsg)) != RD_KAFKA_CONF_OK) {
+ if(inst->bReportErrs) {
+ LogError(0, RS_RET_KAFKA_ERROR,
+ "imkafka: error setting kafka auto.offset.reset on %s: %s\n",
+ inst->consumergroup,
+ kafkaErrMsg);
+ }
+ ABORT_FINALIZE(RS_RET_KAFKA_ERROR);
+ }
+ /* Consumer groups always use broker based offset storage */
+ if (rd_kafka_topic_conf_set(inst->topic_conf, "offset.store.method",
+ "broker", kafkaErrMsg, sizeof(kafkaErrMsg)) != RD_KAFKA_CONF_OK) {
+ if(inst->bReportErrs) {
+ LogError(0, RS_RET_KAFKA_ERROR,
+ "imkafka: error setting kafka offset.store.method on %s: %s\n",
+ inst->consumergroup,
+ kafkaErrMsg);
+ }
+ ABORT_FINALIZE(RS_RET_KAFKA_ERROR);
+ }
+
+ /* Set default topic config for pattern-matched topics. */
+ rd_kafka_conf_set_default_topic_conf(inst->conf, inst->topic_conf);
+ }
+
+ #if RD_KAFKA_VERSION >= 0x00090001
+ rd_kafka_conf_set_log_cb(inst->conf, kafkaLogger);
+ #endif
+
+ /* Create Kafka Consumer */
+ inst->rk = rd_kafka_new(RD_KAFKA_CONSUMER, inst->conf,
+ kafkaErrMsg, sizeof(kafkaErrMsg));
+ if(inst->rk == NULL) {
+ if(inst->bReportErrs) {
+ LogError(0, RS_RET_KAFKA_ERROR,
+ "imkafka: error creating kafka handle: %s\n", kafkaErrMsg);
+ }
+ ABORT_FINALIZE(RS_RET_KAFKA_ERROR);
+ }
+ #if RD_KAFKA_VERSION < 0x00090001
+ rd_kafka_set_logger(inst->rk, kafkaLogger);
+ #endif
+
+ DBGPRINTF("imkafka: setting brokers: '%s'\n", inst->brokers);
+ if(rd_kafka_brokers_add(inst->rk, (char*)inst->brokers) == 0) {
+ if(inst->bReportErrs) {
+ LogError(0, RS_RET_KAFKA_NO_VALID_BROKERS,
+ "imkafka: no valid brokers specified: %s", inst->brokers);
+ }
+ ABORT_FINALIZE(RS_RET_KAFKA_NO_VALID_BROKERS);
+ }
+
+ /* Kafka Consumer is opened */
+ inst->bIsConnected = 1;
+
+finalize_it:
+ if(iRet != RS_RET_OK) {
+ if(inst->rk == NULL) {
+ if(inst->conf != NULL) {
+ rd_kafka_conf_destroy(inst->conf);
+ inst->conf = NULL;
+ }
+ } else { /* inst->rk != NULL ! */
+ rd_kafka_destroy(inst->rk);
+ inst->rk = NULL;
+ }
+ }
+
+ RETiRet;
+}
+
+/* function to generate an error message if the ruleset cannot be found */
+static inline void
+std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, instanceConf_t *inst)
+{
+ if(inst->bReportErrs) {
+ LogError(0, NO_ERRCODE, "imkafka: ruleset '%s' not found - "
+ "using default ruleset instead",
+ inst->pszBindRuleset);
+ }
+}
+
+
+static rsRetVal ATTR_NONNULL(2)
+addConsumer(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst)
+{
+ DEFiRet;
+ rd_kafka_resp_err_t err;
+
+ assert(inst != NULL);
+
+ rd_kafka_topic_partition_list_t *topics = NULL;
+ DBGPRINTF("imkafka: creating kafka consumer on %s/%s/%s\n",
+ inst->topic, inst->consumergroup, inst->brokers);
+
+ /* Redirect rd_kafka_poll() to consumer_poll() */
+ rd_kafka_poll_set_consumer(inst->rk);
+
+ topics = rd_kafka_topic_partition_list_new(1);
+ rd_kafka_topic_partition_list_add(topics, (const char*)inst->topic, inst->partition);
+ DBGPRINTF("imkafka: Created topics(%d) for %s)\n",
+ topics->cnt, inst->topic);
+ if ((err = rd_kafka_subscribe(inst->rk, topics))) {
+ /* Subscription failed */
+ inst->bIsSubscribed = 0;
+ LogError(0, RS_RET_KAFKA_ERROR, "imkafka: Failed to start consuming "
+ "topics: %s\n", rd_kafka_err2str(err));
+ ABORT_FINALIZE(RS_RET_KAFKA_ERROR);
+ } else {
+ DBGPRINTF("imkafka: Successfully subscribed to %s/%s/%s\n",
+ inst->topic, inst->consumergroup, inst->brokers);
+ /* Subscription is working */
+ inst->bIsSubscribed = 1;
+ }
+finalize_it:
+ if(topics != NULL)
+ rd_kafka_topic_partition_list_destroy(topics);
+ RETiRet;
+}
+
+static rsRetVal ATTR_NONNULL()
+processKafkaParam(char *const param,
+ const char **const name,
+ const char **const paramval)
+{
+ DEFiRet;
+ char *val = strstr(param, "=");
+ if(val == NULL) {
+ LogError(0, RS_RET_PARAM_ERROR, "missing equal sign in "
+ "parameter '%s'", param);
+ ABORT_FINALIZE(RS_RET_PARAM_ERROR);
+ }
+ *val = '\0'; /* terminates name */
+ ++val; /* now points to begin of value */
+ CHKmalloc(*name = strdup(param));
+ CHKmalloc(*paramval = strdup(val));
+finalize_it:
+ RETiRet;
+}
+
+BEGINnewInpInst
+ struct cnfparamvals *pvals;
+ instanceConf_t *inst;
+ int i;
+CODESTARTnewInpInst
+ DBGPRINTF("newInpInst (imkafka)\n");
+
+ if((pvals = nvlstGetParams(lst, &inppblk, NULL)) == NULL) {
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ if(Debug) {
+ dbgprintf("input param blk in imkafka:\n");
+ cnfparamsPrint(&inppblk, pvals);
+ }
+
+ CHKiRet(createInstance(&inst));
+
+ for(i = 0 ; i < inppblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(inppblk.descr[i].name, "broker")) {
+ es_str_t *es = es_newStr(128);
+ int bNeedComma = 0;
+ for(int j = 0 ; j < pvals[i].val.d.ar->nmemb ; ++j) {
+ if(bNeedComma)
+ es_addChar(&es, ',');
+ es_addStr(&es, pvals[i].val.d.ar->arr[j]);
+ bNeedComma = 1;
+ }
+ inst->brokers = es_str2cstr(es, NULL);
+ es_deleteStr(es);
+ } else if(!strcmp(inppblk.descr[i].name, "confparam")) {
+ inst->nConfParams = pvals[i].val.d.ar->nmemb;
+ CHKmalloc(inst->confParams = malloc(sizeof(struct kafka_params)*inst->nConfParams));
+ for(int j = 0; j < inst->nConfParams; j++) {
+ char *cstr = es_str2cstr(pvals[i].val.d.ar->arr[j], NULL);
+ CHKiRet(processKafkaParam(cstr, &inst->confParams[j].name,
+ &inst->confParams[j].val));
+ free(cstr);
+ }
+ } else if(!strcmp(inppblk.descr[i].name, "topic")) {
+ inst->topic = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "consumergroup")) {
+ inst->consumergroup = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "ruleset")) {
+ inst->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "parsehostname")) {
+ if (pvals[i].val.d.n) {
+ inst->nMsgParsingFlags = NEEDS_PARSING | PARSE_HOSTNAME;
+ } else {
+ inst->nMsgParsingFlags = NEEDS_PARSING;
+ }
+ } else {
+ dbgprintf("imkafka: program error, non-handled "
+ "param '%s'\n", inppblk.descr[i].name);
+ }
+ }
+
+ if(inst->brokers == NULL) {
+ CHKmalloc(inst->brokers = strdup("localhost:9092"));
+ LogMsg(0, NO_ERRCODE, LOG_INFO, "imkafka: \"broker\" parameter not specified "
+ "using default of localhost:9092 -- this may not be what you want!");
+ }
+
+ DBGPRINTF("imkafka: newInpIns brokers=%s, topic=%s, consumergroup=%s\n",
+ inst->brokers, inst->topic, inst->consumergroup);
+
+finalize_it:
+CODE_STD_FINALIZERnewInpInst
+ cnfparamvalsDestruct(pvals, &inppblk);
+ENDnewInpInst
+
+
+BEGINbeginCnfLoad
+CODESTARTbeginCnfLoad
+ loadModConf = pModConf;
+ pModConf->pConf = pConf;
+ pModConf->pszBindRuleset = NULL;
+ENDbeginCnfLoad
+
+
+BEGINsetModCnf
+ struct cnfparamvals *pvals = NULL;
+ int i;
+CODESTARTsetModCnf
+ pvals = nvlstGetParams(lst, &modpblk, NULL);
+ if(pvals == NULL) {
+ LogError(0, RS_RET_MISSING_CNFPARAMS, "imkafka: error processing module "
+ "config parameters [module(...)]");
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ if(Debug) {
+ dbgprintf("module (global) param blk for imkafka:\n");
+ cnfparamsPrint(&modpblk, pvals);
+ }
+
+ for(i = 0 ; i < modpblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(modpblk.descr[i].name, "ruleset")) {
+ loadModConf->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else {
+ dbgprintf("imkafka: program error, non-handled "
+ "param '%s' in beginCnfLoad\n", modpblk.descr[i].name);
+ }
+ }
+finalize_it:
+ if(pvals != NULL)
+ cnfparamvalsDestruct(pvals, &modpblk);
+ENDsetModCnf
+
+BEGINendCnfLoad
+CODESTARTendCnfLoad
+ if(loadModConf->pszBindRuleset == NULL) {
+ if((cs.pszBindRuleset == NULL) || (cs.pszBindRuleset[0] == '\0')) {
+ loadModConf->pszBindRuleset = NULL;
+ } else {
+ CHKmalloc(loadModConf->pszBindRuleset = ustrdup(cs.pszBindRuleset));
+ }
+ }
+finalize_it:
+ free(cs.pszBindRuleset);
+ cs.pszBindRuleset = NULL;
+ loadModConf = NULL; /* done loading */
+ENDendCnfLoad
+
+BEGINcheckCnf
+ instanceConf_t *inst;
+CODESTARTcheckCnf
+ for(inst = pModConf->root ; inst != NULL ; inst = inst->next) {
+ if(inst->pszBindRuleset == NULL && pModConf->pszBindRuleset != NULL) {
+ CHKmalloc(inst->pszBindRuleset = ustrdup(pModConf->pszBindRuleset));
+ }
+ std_checkRuleset(pModConf, inst);
+ }
+finalize_it:
+ENDcheckCnf
+
+
+BEGINactivateCnfPrePrivDrop
+CODESTARTactivateCnfPrePrivDrop
+ runModConf = pModConf;
+ENDactivateCnfPrePrivDrop
+
+BEGINactivateCnf
+CODESTARTactivateCnf
+ for(instanceConf_t *inst = pModConf->root ; inst != NULL ; inst = inst->next) {
+ iRet = checkInstance(inst);
+ }
+ENDactivateCnf
+
+
+BEGINfreeCnf
+ instanceConf_t *inst, *del;
+CODESTARTfreeCnf
+ for(inst = pModConf->root ; inst != NULL ; ) {
+ free(inst->topic);
+ free(inst->consumergroup);
+ free(inst->brokers);
+ free(inst->pszBindRuleset);
+ for(int i = 0; i < inst->nConfParams; i++) {
+ free((void*)inst->confParams[i].name);
+ free((void*)inst->confParams[i].val);
+ }
+ free((void*)inst->confParams);
+ del = inst;
+ inst = inst->next;
+ free(del);
+ }
+ free(pModConf->pszBindRuleset);
+ENDfreeCnf
+
+
+/* Cleanup imkafka worker threads */
+static void
+shutdownKafkaWorkers(void)
+{
+ int i;
+ instanceConf_t *inst;
+
+ assert(kafkaWrkrInfo != NULL);
+
+ DBGPRINTF("imkafka: waiting on imkafka workerthread termination\n");
+ for(i = 0 ; i < activeKafkaworkers ; ++i) {
+ pthread_join(kafkaWrkrInfo[i].tid, NULL);
+ DBGPRINTF("imkafka: Stopped worker %d\n", i);
+ }
+ free(kafkaWrkrInfo);
+ kafkaWrkrInfo = NULL;
+
+ for(inst = runModConf->root ; inst != NULL ; inst = inst->next) {
+ DBGPRINTF("imkafka: stop consuming %s/%s/%s\n",
+ inst->topic, inst->consumergroup, inst->brokers);
+ rd_kafka_consumer_close(inst->rk); /* Close the consumer, committing final offsets, etc. */
+ rd_kafka_destroy(inst->rk); /* Destroy handle object */
+ DBGPRINTF("imkafka: stopped consuming %s/%s/%s\n",
+ inst->topic, inst->consumergroup, inst->brokers);
+
+ #if RD_KAFKA_VERSION < 0x00090001
+ /* Wait for kafka being destroyed in old API */
+ if (rd_kafka_wait_destroyed(10000) < 0) {
+ DBGPRINTF("imkafka: error, rd_kafka_destroy did not finish after grace "
+ "timeout (10s)!\n");
+ } else {
+ DBGPRINTF("imkafka: rd_kafka_destroy successfully finished\n");
+ }
+ #endif
+ }
+}
+
+
+/* This function is called to gather input. */
+BEGINrunInput
+ int i;
+ instanceConf_t *inst;
+CODESTARTrunInput
+ DBGPRINTF("imkafka: runInput loop started ...\n");
+ activeKafkaworkers = 0;
+ for(inst = runModConf->root ; inst != NULL ; inst = inst->next) {
+ if(inst->rk != NULL) {
+ ++activeKafkaworkers;
+ }
+ }
+
+ if(activeKafkaworkers == 0) {
+ LogError(0, RS_RET_ERR, "imkafka: no active inputs, input does "
+ "not run - there should have been additional error "
+ "messages given previously");
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+
+
+ DBGPRINTF("imkafka: Starting %d imkafka workerthreads\n", activeKafkaworkers);
+ kafkaWrkrInfo = calloc(activeKafkaworkers, sizeof(struct kafkaWrkrInfo_s));
+ if (kafkaWrkrInfo == NULL) {
+ LogError(errno, RS_RET_OUT_OF_MEMORY, "imkafka: worker-info array allocation failed.");
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ }
+
+ /* Start worker threads for each imkafka input source
+ */
+ i = 0;
+ for(inst = runModConf->root ; inst != NULL ; inst = inst->next) {
+ /* init worker info structure! */
+ kafkaWrkrInfo[i].inst = inst; /* Set reference pointer */
+ pthread_create(&kafkaWrkrInfo[i].tid, &wrkrThrdAttr, imkafkawrkr, &(kafkaWrkrInfo[i]));
+ i++;
+ }
+
+ while(glbl.GetGlobalInputTermState() == 0) {
+
+ /* Note: the additional 10000ns wait is vitally important. It guards rsyslog
+ * against totally hogging the CPU if the users selects a polling interval
+ * of 0 seconds. It doesn't hurt any other valid scenario. So do not remove.
+ */
+ if(glbl.GetGlobalInputTermState() == 0)
+ srSleep(0, 100000);
+ }
+ DBGPRINTF("imkafka: terminating upon request of rsyslog core\n");
+
+ /* we need to shutdown kafak worker threads here because this operation can
+ * potentially block (e.g. when no kafka broker is available!). If this
+ * happens in runInput, the rsyslog core can cancel our thread. However,
+ * in afterRun this is not possible, because the core does not assume it
+ * can block there. -- rgerhards, 2018-10-23
+ */
+ shutdownKafkaWorkers();
+finalize_it:
+ENDrunInput
+
+
+BEGINwillRun
+CODESTARTwillRun
+ /* we need to create the inputName property (only once during our lifetime) */
+ CHKiRet(prop.Construct(&pInputName));
+ CHKiRet(prop.SetString(pInputName, UCHAR_CONSTANT("imkafka"), sizeof("imkafka") - 1));
+ CHKiRet(prop.ConstructFinalize(pInputName));
+finalize_it:
+ENDwillRun
+
+
+BEGINafterRun
+CODESTARTafterRun
+ if(pInputName != NULL)
+ prop.Destruct(&pInputName);
+
+ENDafterRun
+
+
+BEGINmodExit
+CODESTARTmodExit
+ pthread_attr_destroy(&wrkrThrdAttr);
+ /* release objects we used */
+ objRelease(statsobj, CORE_COMPONENT);
+ objRelease(ruleset, CORE_COMPONENT);
+ objRelease(glbl, CORE_COMPONENT);
+ objRelease(prop, CORE_COMPONENT);
+ENDmodExit
+
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ if(eFeat == sFEATURENonCancelInputTermination)
+ iRet = RS_RET_OK;
+ENDisCompatibleWithFeature
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_IMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_QUERIES
+CODEqueryEtryPt_STD_CONF2_PREPRIVDROP_QUERIES
+CODEqueryEtryPt_STD_CONF2_IMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
+CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
+ENDqueryEtryPt
+
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION;
+CODEmodInit_QueryRegCFSLineHdlr
+ /* request objects we use */
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
+ CHKiRet(objUse(prop, CORE_COMPONENT));
+ CHKiRet(objUse(ruleset, CORE_COMPONENT));
+ CHKiRet(objUse(statsobj, CORE_COMPONENT));
+
+ /* initialize "read-only" thread attributes */
+ pthread_attr_init(&wrkrThrdAttr);
+ pthread_attr_setstacksize(&wrkrThrdAttr, 4096*1024);
+
+ DBGPRINTF("imkafka %s using librdkafka version %s, 0x%x\n",
+ VERSION, rd_kafka_version_str(), rd_kafka_version());
+ENDmodInit
+
+/*
+* Workerthread function for a single kafka consomer
+ */
+static void *
+imkafkawrkr(void *myself)
+{
+ struct kafkaWrkrInfo_s *me = (struct kafkaWrkrInfo_s*) myself;
+ DBGPRINTF("imkafka: started kafka consumer workerthread on %s/%s/%s\n",
+ me->inst->topic, me->inst->consumergroup, me->inst->brokers);
+
+ do {
+ if(glbl.GetGlobalInputTermState() == 1)
+ break; /* terminate input! */
+
+ if(me->inst->rk == NULL) {
+ continue;
+ }
+
+ // Try to add consumer only if connected! */
+ if(me->inst->bIsConnected == 1 && me->inst->bIsSubscribed == 0 ) {
+ addConsumer(runModConf, me->inst);
+ }
+ if(me->inst->bIsSubscribed == 1 ) {
+ msgConsume(me->inst);
+ }
+ /* Note: the additional 10000ns wait is vitally important. It guards rsyslog
+ * against totally hogging the CPU if the users selects a polling interval
+ * of 0 seconds. It doesn't hurt any other valid scenario. So do not remove.
+ * rgerhards, 2008-02-14
+ */
+ if(glbl.GetGlobalInputTermState() == 0)
+ srSleep(0, 100000);
+ } while(glbl.GetGlobalInputTermState() == 0);
+
+ DBGPRINTF("imkafka: stopped kafka consumer workerthread on %s/%s/%s\n",
+ me->inst->topic, me->inst->consumergroup, me->inst->brokers);
+ return NULL;
+}