/* imkafka.c * * This input plugin is a consumer for Apache Kafka. * * File begun on 2017-04-25 by alorbach * * Copyright 2008-2017 Adiscon GmbH. * * This file is part of rsyslog. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * -or- * see COPYING.ASL20 in the source distribution * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "config.h" #include #include #include #include #include #include #include #include #include #include #include "rsyslog.h" #include "conf.h" #include "syslogd-types.h" #include "srUtils.h" #include "template.h" #include "module-template.h" #include "errmsg.h" #include "atomic.h" #include "statsobj.h" #include "unicode-helper.h" #include "prop.h" #include "ruleset.h" #include "glbl.h" #include "cfsysline.h" #include "msg.h" #include "dirty.h" MODULE_TYPE_INPUT MODULE_TYPE_NOKEEP MODULE_CNFNAME("imkafka") /* static data */ DEF_IMOD_STATIC_DATA DEFobjCurrIf(prop) DEFobjCurrIf(ruleset) DEFobjCurrIf(glbl) DEFobjCurrIf(statsobj) /* forward references */ static void * imkafkawrkr(void *myself); struct kafka_params { const char *name; const char *val; }; /* Module static data */ static struct configSettings_s { uchar *topic; uchar *consumergroup; char *brokers; uchar *pszBindRuleset; int nConfParams; struct kafka_params *confParams; } cs; struct instanceConf_s { uchar *topic; uchar *consumergroup; char *brokers; int64_t offset; ruleset_t *pBindRuleset; /* ruleset to bind listener to (use system default if unspecified) */ uchar *pszBindRuleset; /* default name of Ruleset to bind to */ int bReportErrs; int nConfParams; struct kafka_params *confParams; int bIsConnected; rd_kafka_conf_t *conf; rd_kafka_t *rk; rd_kafka_topic_conf_t *topic_conf; int partition; int bIsSubscribed; int nMsgParsingFlags; struct instanceConf_s *next; }; struct modConfData_s { rsconf_t *pConf; /* our overall config object */ uchar *topic; uchar *consumergroup; char *brokers; instanceConf_t *root, *tail; ruleset_t *pBindRuleset; /* ruleset to bind listener to (use system default if unspecified) */ uchar *pszBindRuleset; /* default name of Ruleset to bind to */ }; /* global data */ pthread_attr_t wrkrThrdAttr; /* Attribute for worker threads ; read only after startup */ static int activeKafkaworkers = 0; /* The following structure controls the worker threads. Global data is * needed for their access. */ static struct kafkaWrkrInfo_s { pthread_t tid; /* the worker's thread ID */ instanceConf_t *inst; /* Pointer to imkafka instance */ } *kafkaWrkrInfo; static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current load process */ static prop_t *pInputName = NULL; /* there is only one global inputName for all messages generated by this input */ /* module-global parameters */ static struct cnfparamdescr modpdescr[] = { { "ruleset", eCmdHdlrGetWord, 0 }, }; static struct cnfparamblk modpblk = { CNFPARAMBLK_VERSION, sizeof(modpdescr)/sizeof(struct cnfparamdescr), modpdescr }; /* input instance parameters */ static struct cnfparamdescr inppdescr[] = { { "topic", eCmdHdlrString, CNFPARAM_REQUIRED }, { "broker", eCmdHdlrArray, 0 }, { "confparam", eCmdHdlrArray, 0 }, { "consumergroup", eCmdHdlrString, 0}, { "ruleset", eCmdHdlrString, 0 }, { "parsehostname", eCmdHdlrBinary, 0 }, }; static struct cnfparamblk inppblk = { CNFPARAMBLK_VERSION, sizeof(inppdescr)/sizeof(struct cnfparamdescr), inppdescr }; #include "im-helper.h" /* must be included AFTER the type definitions! */ /* ------------------------------ callbacks ------------------------------ */ /* ------------------------------ end callbacks ------------------------------ */ static void kafkaLogger(const rd_kafka_t __attribute__((unused)) *rk, int level, const char *fac, const char *buf) { DBGPRINTF("imkafka: kafka log message [%d,%s]: %s\n", level, fac, buf); } /* enqueue the kafka message. The provided string is * not freed - thuis must be done by the caller. */ static rsRetVal enqMsg(instanceConf_t *const __restrict__ inst, rd_kafka_message_t *const __restrict__ rkmessage) { DEFiRet; smsg_t *pMsg; if((int)rkmessage->len == 0) { /* we do not process empty lines */ FINALIZE; } DBGPRINTF("imkafka: enqMsg: Msg: %.*s\n", (int)rkmessage->len, (char *)rkmessage->payload); CHKiRet(msgConstruct(&pMsg)); MsgSetInputName(pMsg, pInputName); MsgSetRawMsg(pMsg, (char*)rkmessage->payload, (int)rkmessage->len); MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY); MsgSetRuleset(pMsg, inst->pBindRuleset); pMsg->msgFlags = inst->nMsgParsingFlags; /* Optional Fields */ if (rkmessage->key_len) { DBGPRINTF("imkafka: enqMsg: Key: %.*s\n", (int)rkmessage->key_len, (char *)rkmessage->key); MsgSetTAG(pMsg, (const uchar *)rkmessage->key, (int)rkmessage->key_len); } MsgSetMSGoffs(pMsg, 0); /* we do not have a header... */ CHKiRet(submitMsg2(pMsg)); finalize_it: RETiRet; } /** * Handle Kafka Consumer Loop until all msgs are processed */ static void msgConsume (instanceConf_t *inst) { rd_kafka_message_t *rkmessage = NULL; do { /* Consume messages */ rkmessage = rd_kafka_consumer_poll(inst->rk, 1000); /* Block for 1000 ms max */ if(rkmessage == NULL) { DBGPRINTF("imkafka: msgConsume EMPTY Loop on %s/%s/%s\n", inst->topic, inst->consumergroup, inst->brokers); goto done; } if (rkmessage->err) { if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { /* not an error, just a regular status! */ DBGPRINTF("imkafka: Consumer " "reached end of topic \"%s\" [%"PRId32"]" "message queue offset %"PRId64"\n", rd_kafka_topic_name(rkmessage->rkt), rkmessage->partition, rkmessage->offset); goto done; } if (rkmessage->rkt) { LogError(0, RS_RET_KAFKA_ERROR, "imkafka: Consumer error for topic \"%s\" [%"PRId32"]" "message queue offset %"PRId64": %s\n", rd_kafka_topic_name(rkmessage->rkt), rkmessage->partition, rkmessage->offset, rd_kafka_message_errstr(rkmessage)); } else { LogError(0, RS_RET_KAFKA_ERROR, "imkafka: Consumer error for topic \"%s\": \"%s\"\n", rd_kafka_err2str(rkmessage->err), rd_kafka_message_errstr(rkmessage)); } goto done; } DBGPRINTF("imkafka: msgConsume Loop on %s/%s/%s: [%"PRId32"], " "offset %"PRId64", %zd bytes):\n", rd_kafka_topic_name(rkmessage->rkt) /*inst->topic*/, inst->consumergroup, inst->brokers, rkmessage->partition, rkmessage->offset, rkmessage->len); enqMsg(inst, rkmessage); /* Destroy message and continue */ rd_kafka_message_destroy(rkmessage); rkmessage = NULL; } while(1); /* loop broken inside */ done: /* Destroy message in case rkmessage->err was set */ if(rkmessage != NULL) { rd_kafka_message_destroy(rkmessage); } return; } /* create input instance, set default parameters, and * add it to the list of instances. */ static rsRetVal createInstance(instanceConf_t **pinst) { instanceConf_t *inst; DEFiRet; CHKmalloc(inst = malloc(sizeof(instanceConf_t))); inst->next = NULL; inst->brokers = NULL; inst->topic = NULL; inst->consumergroup = NULL; inst->pszBindRuleset = NULL; inst->nConfParams = 0; inst->confParams = NULL; inst->pBindRuleset = NULL; inst->bReportErrs = 1; /* Fixed for now */ inst->nMsgParsingFlags = NEEDS_PARSING; inst->bIsConnected = 0; inst->bIsSubscribed = 0; /* Kafka objects */ inst->conf = NULL; inst->rk = NULL; inst->topic_conf = NULL; inst->partition = RD_KAFKA_PARTITION_UA; /* node created, let's add to config */ if(loadModConf->tail == NULL) { loadModConf->tail = loadModConf->root = inst; } else { loadModConf->tail->next = inst; loadModConf->tail = inst; } *pinst = inst; finalize_it: RETiRet; } /* this function checks instance parameters and does some required pre-processing */ static rsRetVal ATTR_NONNULL() checkInstance(instanceConf_t *const inst) { DEFiRet; char kafkaErrMsg[1024]; /* main kafka conf */ inst->conf = rd_kafka_conf_new(); if(inst->conf == NULL) { if(inst->bReportErrs) { LogError(0, RS_RET_KAFKA_ERROR, "imkafka: error creating kafka conf obj: %s\n", rd_kafka_err2str(rd_kafka_last_error())); } ABORT_FINALIZE(RS_RET_KAFKA_ERROR); } # ifdef DEBUG /* enable kafka debug output */ if(rd_kafka_conf_set(inst->conf, "debug", RD_KAFKA_DEBUG_CONTEXTS, kafkaErrMsg, sizeof(kafkaErrMsg)) != RD_KAFKA_CONF_OK) { LogError(0, RS_RET_KAFKA_ERROR, "imkafka: error setting kafka debug option: %s\n", kafkaErrMsg); /* DO NOT ABORT IN THIS CASE! */ } # endif /* Set custom configuration parameters */ for(int i = 0 ; i < inst->nConfParams ; ++i) { assert(inst->confParams+i != NULL); /* invariant: nConfParams MUST exist! */ DBGPRINTF("imkafka: setting custom configuration parameter: %s:%s\n", inst->confParams[i].name, inst->confParams[i].val); if(rd_kafka_conf_set(inst->conf, inst->confParams[i].name, inst->confParams[i].val, kafkaErrMsg, sizeof(kafkaErrMsg)) != RD_KAFKA_CONF_OK) { if(inst->bReportErrs) { LogError(0, RS_RET_PARAM_ERROR, "error setting custom configuration " "parameter '%s=%s': %s", inst->confParams[i].name, inst->confParams[i].val, kafkaErrMsg); } else { DBGPRINTF("imkafka: error setting custom configuration parameter '%s=%s': %s", inst->confParams[i].name, inst->confParams[i].val, kafkaErrMsg); } ABORT_FINALIZE(RS_RET_PARAM_ERROR); } } /* Topic configuration */ inst->topic_conf = rd_kafka_topic_conf_new(); /* Assign kafka group id */ if (inst->consumergroup != NULL) { DBGPRINTF("imkafka: setting consumergroup: '%s'\n", inst->consumergroup); if (rd_kafka_conf_set(inst->conf, "group.id", (char*) inst->consumergroup, kafkaErrMsg, sizeof(kafkaErrMsg)) != RD_KAFKA_CONF_OK) { if(inst->bReportErrs) { LogError(0, RS_RET_KAFKA_ERROR, "imkafka: error assigning consumergroup %s to " "kafka config: %s\n", inst->consumergroup, kafkaErrMsg); } ABORT_FINALIZE(RS_RET_KAFKA_ERROR); } /* Set default for auto offset reset */ if (rd_kafka_topic_conf_set(inst->topic_conf, "auto.offset.reset", "smallest", kafkaErrMsg, sizeof(kafkaErrMsg)) != RD_KAFKA_CONF_OK) { if(inst->bReportErrs) { LogError(0, RS_RET_KAFKA_ERROR, "imkafka: error setting kafka auto.offset.reset on %s: %s\n", inst->consumergroup, kafkaErrMsg); } ABORT_FINALIZE(RS_RET_KAFKA_ERROR); } /* Consumer groups always use broker based offset storage */ if (rd_kafka_topic_conf_set(inst->topic_conf, "offset.store.method", "broker", kafkaErrMsg, sizeof(kafkaErrMsg)) != RD_KAFKA_CONF_OK) { if(inst->bReportErrs) { LogError(0, RS_RET_KAFKA_ERROR, "imkafka: error setting kafka offset.store.method on %s: %s\n", inst->consumergroup, kafkaErrMsg); } ABORT_FINALIZE(RS_RET_KAFKA_ERROR); } /* Set default topic config for pattern-matched topics. */ rd_kafka_conf_set_default_topic_conf(inst->conf, inst->topic_conf); } #if RD_KAFKA_VERSION >= 0x00090001 rd_kafka_conf_set_log_cb(inst->conf, kafkaLogger); #endif /* Create Kafka Consumer */ inst->rk = rd_kafka_new(RD_KAFKA_CONSUMER, inst->conf, kafkaErrMsg, sizeof(kafkaErrMsg)); if(inst->rk == NULL) { if(inst->bReportErrs) { LogError(0, RS_RET_KAFKA_ERROR, "imkafka: error creating kafka handle: %s\n", kafkaErrMsg); } ABORT_FINALIZE(RS_RET_KAFKA_ERROR); } #if RD_KAFKA_VERSION < 0x00090001 rd_kafka_set_logger(inst->rk, kafkaLogger); #endif DBGPRINTF("imkafka: setting brokers: '%s'\n", inst->brokers); if(rd_kafka_brokers_add(inst->rk, (char*)inst->brokers) == 0) { if(inst->bReportErrs) { LogError(0, RS_RET_KAFKA_NO_VALID_BROKERS, "imkafka: no valid brokers specified: %s", inst->brokers); } ABORT_FINALIZE(RS_RET_KAFKA_NO_VALID_BROKERS); } /* Kafka Consumer is opened */ inst->bIsConnected = 1; finalize_it: if(iRet != RS_RET_OK) { if(inst->rk == NULL) { if(inst->conf != NULL) { rd_kafka_conf_destroy(inst->conf); inst->conf = NULL; } } else { /* inst->rk != NULL ! */ rd_kafka_destroy(inst->rk); inst->rk = NULL; } } RETiRet; } /* function to generate an error message if the ruleset cannot be found */ static inline void std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, instanceConf_t *inst) { if(inst->bReportErrs) { LogError(0, NO_ERRCODE, "imkafka: ruleset '%s' not found - " "using default ruleset instead", inst->pszBindRuleset); } } static rsRetVal ATTR_NONNULL(2) addConsumer(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst) { DEFiRet; rd_kafka_resp_err_t err; assert(inst != NULL); rd_kafka_topic_partition_list_t *topics = NULL; DBGPRINTF("imkafka: creating kafka consumer on %s/%s/%s\n", inst->topic, inst->consumergroup, inst->brokers); /* Redirect rd_kafka_poll() to consumer_poll() */ rd_kafka_poll_set_consumer(inst->rk); topics = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(topics, (const char*)inst->topic, inst->partition); DBGPRINTF("imkafka: Created topics(%d) for %s)\n", topics->cnt, inst->topic); if ((err = rd_kafka_subscribe(inst->rk, topics))) { /* Subscription failed */ inst->bIsSubscribed = 0; LogError(0, RS_RET_KAFKA_ERROR, "imkafka: Failed to start consuming " "topics: %s\n", rd_kafka_err2str(err)); ABORT_FINALIZE(RS_RET_KAFKA_ERROR); } else { DBGPRINTF("imkafka: Successfully subscribed to %s/%s/%s\n", inst->topic, inst->consumergroup, inst->brokers); /* Subscription is working */ inst->bIsSubscribed = 1; } finalize_it: if(topics != NULL) rd_kafka_topic_partition_list_destroy(topics); RETiRet; } static rsRetVal ATTR_NONNULL() processKafkaParam(char *const param, const char **const name, const char **const paramval) { DEFiRet; char *val = strstr(param, "="); if(val == NULL) { LogError(0, RS_RET_PARAM_ERROR, "missing equal sign in " "parameter '%s'", param); ABORT_FINALIZE(RS_RET_PARAM_ERROR); } *val = '\0'; /* terminates name */ ++val; /* now points to begin of value */ CHKmalloc(*name = strdup(param)); CHKmalloc(*paramval = strdup(val)); finalize_it: RETiRet; } BEGINnewInpInst struct cnfparamvals *pvals; instanceConf_t *inst; int i; CODESTARTnewInpInst DBGPRINTF("newInpInst (imkafka)\n"); if((pvals = nvlstGetParams(lst, &inppblk, NULL)) == NULL) { ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); } if(Debug) { dbgprintf("input param blk in imkafka:\n"); cnfparamsPrint(&inppblk, pvals); } CHKiRet(createInstance(&inst)); for(i = 0 ; i < inppblk.nParams ; ++i) { if(!pvals[i].bUsed) continue; if(!strcmp(inppblk.descr[i].name, "broker")) { es_str_t *es = es_newStr(128); int bNeedComma = 0; for(int j = 0 ; j < pvals[i].val.d.ar->nmemb ; ++j) { if(bNeedComma) es_addChar(&es, ','); es_addStr(&es, pvals[i].val.d.ar->arr[j]); bNeedComma = 1; } inst->brokers = es_str2cstr(es, NULL); es_deleteStr(es); } else if(!strcmp(inppblk.descr[i].name, "confparam")) { inst->nConfParams = pvals[i].val.d.ar->nmemb; CHKmalloc(inst->confParams = malloc(sizeof(struct kafka_params)*inst->nConfParams)); for(int j = 0; j < inst->nConfParams; j++) { char *cstr = es_str2cstr(pvals[i].val.d.ar->arr[j], NULL); CHKiRet(processKafkaParam(cstr, &inst->confParams[j].name, &inst->confParams[j].val)); free(cstr); } } else if(!strcmp(inppblk.descr[i].name, "topic")) { inst->topic = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(inppblk.descr[i].name, "consumergroup")) { inst->consumergroup = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(inppblk.descr[i].name, "ruleset")) { inst->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(inppblk.descr[i].name, "parsehostname")) { if (pvals[i].val.d.n) { inst->nMsgParsingFlags = NEEDS_PARSING | PARSE_HOSTNAME; } else { inst->nMsgParsingFlags = NEEDS_PARSING; } } else { dbgprintf("imkafka: program error, non-handled " "param '%s'\n", inppblk.descr[i].name); } } if(inst->brokers == NULL) { CHKmalloc(inst->brokers = strdup("localhost:9092")); LogMsg(0, NO_ERRCODE, LOG_INFO, "imkafka: \"broker\" parameter not specified " "using default of localhost:9092 -- this may not be what you want!"); } DBGPRINTF("imkafka: newInpIns brokers=%s, topic=%s, consumergroup=%s\n", inst->brokers, inst->topic, inst->consumergroup); finalize_it: CODE_STD_FINALIZERnewInpInst cnfparamvalsDestruct(pvals, &inppblk); ENDnewInpInst BEGINbeginCnfLoad CODESTARTbeginCnfLoad loadModConf = pModConf; pModConf->pConf = pConf; pModConf->pszBindRuleset = NULL; ENDbeginCnfLoad BEGINsetModCnf struct cnfparamvals *pvals = NULL; int i; CODESTARTsetModCnf pvals = nvlstGetParams(lst, &modpblk, NULL); if(pvals == NULL) { LogError(0, RS_RET_MISSING_CNFPARAMS, "imkafka: error processing module " "config parameters [module(...)]"); ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); } if(Debug) { dbgprintf("module (global) param blk for imkafka:\n"); cnfparamsPrint(&modpblk, pvals); } for(i = 0 ; i < modpblk.nParams ; ++i) { if(!pvals[i].bUsed) continue; if(!strcmp(modpblk.descr[i].name, "ruleset")) { loadModConf->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else { dbgprintf("imkafka: program error, non-handled " "param '%s' in beginCnfLoad\n", modpblk.descr[i].name); } } finalize_it: if(pvals != NULL) cnfparamvalsDestruct(pvals, &modpblk); ENDsetModCnf BEGINendCnfLoad CODESTARTendCnfLoad if(loadModConf->pszBindRuleset == NULL) { if((cs.pszBindRuleset == NULL) || (cs.pszBindRuleset[0] == '\0')) { loadModConf->pszBindRuleset = NULL; } else { CHKmalloc(loadModConf->pszBindRuleset = ustrdup(cs.pszBindRuleset)); } } finalize_it: free(cs.pszBindRuleset); cs.pszBindRuleset = NULL; loadModConf = NULL; /* done loading */ ENDendCnfLoad BEGINcheckCnf instanceConf_t *inst; CODESTARTcheckCnf for(inst = pModConf->root ; inst != NULL ; inst = inst->next) { if(inst->pszBindRuleset == NULL && pModConf->pszBindRuleset != NULL) { CHKmalloc(inst->pszBindRuleset = ustrdup(pModConf->pszBindRuleset)); } std_checkRuleset(pModConf, inst); } finalize_it: ENDcheckCnf BEGINactivateCnfPrePrivDrop CODESTARTactivateCnfPrePrivDrop runModConf = pModConf; ENDactivateCnfPrePrivDrop BEGINactivateCnf CODESTARTactivateCnf for(instanceConf_t *inst = pModConf->root ; inst != NULL ; inst = inst->next) { iRet = checkInstance(inst); } ENDactivateCnf BEGINfreeCnf instanceConf_t *inst, *del; CODESTARTfreeCnf for(inst = pModConf->root ; inst != NULL ; ) { free(inst->topic); free(inst->consumergroup); free(inst->brokers); free(inst->pszBindRuleset); for(int i = 0; i < inst->nConfParams; i++) { free((void*)inst->confParams[i].name); free((void*)inst->confParams[i].val); } free((void*)inst->confParams); del = inst; inst = inst->next; free(del); } free(pModConf->pszBindRuleset); ENDfreeCnf /* Cleanup imkafka worker threads */ static void shutdownKafkaWorkers(void) { int i; instanceConf_t *inst; assert(kafkaWrkrInfo != NULL); DBGPRINTF("imkafka: waiting on imkafka workerthread termination\n"); for(i = 0 ; i < activeKafkaworkers ; ++i) { pthread_join(kafkaWrkrInfo[i].tid, NULL); DBGPRINTF("imkafka: Stopped worker %d\n", i); } free(kafkaWrkrInfo); kafkaWrkrInfo = NULL; for(inst = runModConf->root ; inst != NULL ; inst = inst->next) { DBGPRINTF("imkafka: stop consuming %s/%s/%s\n", inst->topic, inst->consumergroup, inst->brokers); rd_kafka_consumer_close(inst->rk); /* Close the consumer, committing final offsets, etc. */ rd_kafka_destroy(inst->rk); /* Destroy handle object */ DBGPRINTF("imkafka: stopped consuming %s/%s/%s\n", inst->topic, inst->consumergroup, inst->brokers); #if RD_KAFKA_VERSION < 0x00090001 /* Wait for kafka being destroyed in old API */ if (rd_kafka_wait_destroyed(10000) < 0) { DBGPRINTF("imkafka: error, rd_kafka_destroy did not finish after grace " "timeout (10s)!\n"); } else { DBGPRINTF("imkafka: rd_kafka_destroy successfully finished\n"); } #endif } } /* This function is called to gather input. */ BEGINrunInput int i; instanceConf_t *inst; CODESTARTrunInput DBGPRINTF("imkafka: runInput loop started ...\n"); activeKafkaworkers = 0; for(inst = runModConf->root ; inst != NULL ; inst = inst->next) { if(inst->rk != NULL) { ++activeKafkaworkers; } } if(activeKafkaworkers == 0) { LogError(0, RS_RET_ERR, "imkafka: no active inputs, input does " "not run - there should have been additional error " "messages given previously"); ABORT_FINALIZE(RS_RET_ERR); } DBGPRINTF("imkafka: Starting %d imkafka workerthreads\n", activeKafkaworkers); kafkaWrkrInfo = calloc(activeKafkaworkers, sizeof(struct kafkaWrkrInfo_s)); if (kafkaWrkrInfo == NULL) { LogError(errno, RS_RET_OUT_OF_MEMORY, "imkafka: worker-info array allocation failed."); ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); } /* Start worker threads for each imkafka input source */ i = 0; for(inst = runModConf->root ; inst != NULL ; inst = inst->next) { /* init worker info structure! */ kafkaWrkrInfo[i].inst = inst; /* Set reference pointer */ pthread_create(&kafkaWrkrInfo[i].tid, &wrkrThrdAttr, imkafkawrkr, &(kafkaWrkrInfo[i])); i++; } while(glbl.GetGlobalInputTermState() == 0) { /* Note: the additional 10000ns wait is vitally important. It guards rsyslog * against totally hogging the CPU if the users selects a polling interval * of 0 seconds. It doesn't hurt any other valid scenario. So do not remove. */ if(glbl.GetGlobalInputTermState() == 0) srSleep(0, 100000); } DBGPRINTF("imkafka: terminating upon request of rsyslog core\n"); /* we need to shutdown kafak worker threads here because this operation can * potentially block (e.g. when no kafka broker is available!). If this * happens in runInput, the rsyslog core can cancel our thread. However, * in afterRun this is not possible, because the core does not assume it * can block there. -- rgerhards, 2018-10-23 */ shutdownKafkaWorkers(); finalize_it: ENDrunInput BEGINwillRun CODESTARTwillRun /* we need to create the inputName property (only once during our lifetime) */ CHKiRet(prop.Construct(&pInputName)); CHKiRet(prop.SetString(pInputName, UCHAR_CONSTANT("imkafka"), sizeof("imkafka") - 1)); CHKiRet(prop.ConstructFinalize(pInputName)); finalize_it: ENDwillRun BEGINafterRun CODESTARTafterRun if(pInputName != NULL) prop.Destruct(&pInputName); ENDafterRun BEGINmodExit CODESTARTmodExit pthread_attr_destroy(&wrkrThrdAttr); /* release objects we used */ objRelease(statsobj, CORE_COMPONENT); objRelease(ruleset, CORE_COMPONENT); objRelease(glbl, CORE_COMPONENT); objRelease(prop, CORE_COMPONENT); ENDmodExit BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURENonCancelInputTermination) iRet = RS_RET_OK; ENDisCompatibleWithFeature BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_IMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES CODEqueryEtryPt_STD_CONF2_PREPRIVDROP_QUERIES CODEqueryEtryPt_STD_CONF2_IMOD_QUERIES CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES ENDqueryEtryPt BEGINmodInit() CODESTARTmodInit *ipIFVersProvided = CURR_MOD_IF_VERSION; CODEmodInit_QueryRegCFSLineHdlr /* request objects we use */ CHKiRet(objUse(glbl, CORE_COMPONENT)); CHKiRet(objUse(prop, CORE_COMPONENT)); CHKiRet(objUse(ruleset, CORE_COMPONENT)); CHKiRet(objUse(statsobj, CORE_COMPONENT)); /* initialize "read-only" thread attributes */ pthread_attr_init(&wrkrThrdAttr); pthread_attr_setstacksize(&wrkrThrdAttr, 4096*1024); DBGPRINTF("imkafka %s using librdkafka version %s, 0x%x\n", VERSION, rd_kafka_version_str(), rd_kafka_version()); ENDmodInit /* * Workerthread function for a single kafka consomer */ static void * imkafkawrkr(void *myself) { struct kafkaWrkrInfo_s *me = (struct kafkaWrkrInfo_s*) myself; DBGPRINTF("imkafka: started kafka consumer workerthread on %s/%s/%s\n", me->inst->topic, me->inst->consumergroup, me->inst->brokers); do { if(glbl.GetGlobalInputTermState() == 1) break; /* terminate input! */ if(me->inst->rk == NULL) { continue; } // Try to add consumer only if connected! */ if(me->inst->bIsConnected == 1 && me->inst->bIsSubscribed == 0 ) { addConsumer(runModConf, me->inst); } if(me->inst->bIsSubscribed == 1 ) { msgConsume(me->inst); } /* Note: the additional 10000ns wait is vitally important. It guards rsyslog * against totally hogging the CPU if the users selects a polling interval * of 0 seconds. It doesn't hurt any other valid scenario. So do not remove. * rgerhards, 2008-02-14 */ if(glbl.GetGlobalInputTermState() == 0) srSleep(0, 100000); } while(glbl.GetGlobalInputTermState() == 0); DBGPRINTF("imkafka: stopped kafka consumer workerthread on %s/%s/%s\n", me->inst->topic, me->inst->consumergroup, me->inst->brokers); return NULL; }