diff options
Diffstat (limited to 'plugins/omkafka/omkafka.c')
-rw-r--r-- | plugins/omkafka/omkafka.c | 2141 |
1 files changed, 2141 insertions, 0 deletions
diff --git a/plugins/omkafka/omkafka.c b/plugins/omkafka/omkafka.c new file mode 100644 index 0000000..e8eae08 --- /dev/null +++ b/plugins/omkafka/omkafka.c @@ -0,0 +1,2141 @@ +/* omkafka.c + * This output plugin make rsyslog talk to Apache Kafka. + * + * Copyright 2014-2017 by Adiscon GmbH. + * + * This file is part of rsyslog. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * -or- + * see COPYING.ASL20 in the source distribution + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "config.h" +#include <stdio.h> +#include <stdarg.h> +#include <stdlib.h> +#include <string.h> +#include <strings.h> +#include <assert.h> +#include <errno.h> +#include <fcntl.h> +#include <pthread.h> +#include <sys/uio.h> +#include <sys/queue.h> +#include <sys/types.h> +#include <math.h> +#ifdef HAVE_SYS_STAT_H +# include <sys/stat.h> +#endif +#include <unistd.h> +#include <librdkafka/rdkafka.h> + +#include "rsyslog.h" +#include "syslogd-types.h" +#include "srUtils.h" +#include "template.h" +#include "module-template.h" +#include "errmsg.h" +#include "atomic.h" +#include "statsobj.h" +#include "unicode-helper.h" +#include "datetime.h" + +MODULE_TYPE_OUTPUT +MODULE_TYPE_NOKEEP +MODULE_CNFNAME("omkafka") + +/* internal structures + */ +DEF_OMOD_STATIC_DATA +DEFobjCurrIf(datetime) +DEFobjCurrIf(strm) +DEFobjCurrIf(statsobj) + +statsobj_t *kafkaStats; +STATSCOUNTER_DEF(ctrQueueSize, mutCtrQueueSize); +STATSCOUNTER_DEF(ctrTopicSubmit, mutCtrTopicSubmit); +STATSCOUNTER_DEF(ctrKafkaFail, mutCtrKafkaFail); 
+STATSCOUNTER_DEF(ctrCacheMiss, mutCtrCacheMiss); +STATSCOUNTER_DEF(ctrCacheEvict, mutCtrCacheEvict); +STATSCOUNTER_DEF(ctrCacheSkip, mutCtrCacheSkip); +STATSCOUNTER_DEF(ctrKafkaAck, mutCtrKafkaAck); +STATSCOUNTER_DEF(ctrKafkaMsgTooLarge, mutCtrKafkaMsgTooLarge); +STATSCOUNTER_DEF(ctrKafkaUnknownTopic, mutCtrKafkaUnknownTopic); +STATSCOUNTER_DEF(ctrKafkaQueueFull, mutCtrKafkaQueueFull); +STATSCOUNTER_DEF(ctrKafkaUnknownPartition, mutCtrKafkaUnknownPartition); +STATSCOUNTER_DEF(ctrKafkaOtherErrors, mutCtrKafkaOtherErrors); +STATSCOUNTER_DEF(ctrKafkaRespTimedOut, mutCtrKafkaRespTimedOut); +STATSCOUNTER_DEF(ctrKafkaRespTransport, mutCtrKafkaRespTransport); +STATSCOUNTER_DEF(ctrKafkaRespBrokerDown, mutCtrKafkaRespBrokerDown); +STATSCOUNTER_DEF(ctrKafkaRespAuth, mutCtrKafkaRespAuth); +STATSCOUNTER_DEF(ctrKafkaRespSSL, mutCtrKafkaRespSSL); +STATSCOUNTER_DEF(ctrKafkaRespOther, mutCtrKafkaRespOther); + +#define MAX_ERRMSG 1024 /* max size of error messages that we support */ + +#ifndef SLIST_INIT +#define SLIST_INIT(head) do { \ + (head)->slh_first = NULL; \ +} while (/*CONSTCOND*/0) +#endif + +#ifndef SLIST_ENTRY +#define SLIST_ENTRY(type) \ + struct { \ + struct type *sle_next; /* next element */ \ + } +#endif + +#ifndef SLIST_HEAD +#define SLIST_HEAD(name, type) \ +struct name { \ + struct type *slh_first; /* first element */ \ +} +#endif + +#ifndef SLIST_INSERT_HEAD +#define SLIST_INSERT_HEAD(head, elm, field) do { \ + (elm)->field.sle_next = (head)->slh_first; \ + (head)->slh_first = (elm); \ +} while (/*CONSTCOND*/0) +#endif + +#ifndef SLIST_REMOVE_HEAD +#define SLIST_REMOVE_HEAD(head, field) do { \ + (head)->slh_first = (head)->slh_first->field.sle_next; \ +} while (/*CONSTCOND*/0) +#endif + +#ifndef SLIST_FIRST +#define SLIST_FIRST(head) ((head)->slh_first) +#endif + +#ifndef SLIST_NEXT +#define SLIST_NEXT(elm, field) ((elm)->field.sle_next) +#endif + +#ifndef SLIST_EMPTY +#define SLIST_EMPTY(head) ((head)->slh_first == NULL) +#endif + +#ifndef SLIST_REMOVE 
+#define SLIST_REMOVE(head, elm, type, field) do { \ + if ((head)->slh_first == (elm)) { \ + SLIST_REMOVE_HEAD((head), field); \ + } \ + else { \ + struct type *curelm = (head)->slh_first; \ + while(curelm->field.sle_next != (elm)) \ + curelm = curelm->field.sle_next; \ + curelm->field.sle_next = curelm->field.sle_next->field.sle_next; \ + } \ +} while (/*CONSTCOND*/0) +#endif + +#define NO_FIXED_PARTITION -1 /* signifies that no fixed partition config exists */ + +struct kafka_params { + const char *name; + const char *val; +}; + +#ifndef O_LARGEFILE +#define O_LARGEFILE 0 +#endif + +/* flags for writeKafka: shall we resubmit a failed message? */ +#define RESUBMIT 1 +#define NO_RESUBMIT 0 + +#ifdef HAVE_ATOMIC_BUILTINS64 +static uint64 clockTopicAccess = 0; +#else +static unsigned clockTopicAccess = 0; +#endif +/* and the "tick" function */ +#ifndef HAVE_ATOMIC_BUILTINS +static pthread_mutex_t mutClock; +#endif +static uint64 +getClockTopicAccess(void) +{ +#ifdef HAVE_ATOMIC_BUILTINS64 + return ATOMIC_INC_AND_FETCH_uint64(&clockTopicAccess, &mutClock); +#else + return ATOMIC_INC_AND_FETCH_unsigned(&clockTopicAccess, &mutClock); +#endif +} + +/* Needed for Kafka timestamp librdkafka > 0.9.4 */ +#define KAFKA_TimeStamp "\"%timestamp:::date-unixtimestamp%\"" + +static int closeTimeout = 1000; +static pthread_mutex_t closeTimeoutMut = PTHREAD_MUTEX_INITIALIZER; + +/* stats callback window metrics */ +static uint64 rtt_avg_usec; +static uint64 throttle_avg_msec; +static uint64 int_latency_avg_usec; + +/* dynamic topic cache */ +struct s_dynaTopicCacheEntry { + uchar *pName; + rd_kafka_topic_t *pTopic; + uint64 clkTickAccessed; + pthread_rwlock_t lock; +}; +typedef struct s_dynaTopicCacheEntry dynaTopicCacheEntry; + +/* Struct for Failed Messages Listitems */ +struct s_failedmsg_entry { + uchar* key; + uchar* payload; + uchar* topicname; + SLIST_ENTRY(s_failedmsg_entry) entries; /* List. 
*/ +} ; +typedef struct s_failedmsg_entry failedmsg_entry; + +typedef struct _instanceData { + uchar *topic; + sbool dynaKey; + sbool dynaTopic; + dynaTopicCacheEntry **dynCache; + pthread_mutex_t mutDynCache; + rd_kafka_topic_t *pTopic; + int iCurrElt; + int iCurrCacheSize; + int bReportErrs; + int iDynaTopicCacheSize; + uchar *tplName; /* assigned output template */ + char *brokers; + sbool autoPartition; + int fixedPartition; + int nPartitions; + uint32_t currPartition; + DEF_ATOMIC_HELPER_MUT(mutCurrPartition); + int nConfParams; + struct kafka_params *confParams; + int nTopicConfParams; + struct kafka_params *topicConfParams; + uchar *errorFile; + uchar *key; + int bReopenOnHup; + int bResubmitOnFailure; /* Resubmit failed messages into kafka queue*/ + int bKeepFailedMessages;/* Keep Failed messages in memory, + only works if bResubmitOnFailure is enabled */ + uchar *failedMsgFile; /* file in which failed messages are being stored on + shutdown and loaded on startup */ + + int fdErrFile; /* error file fd or -1 if not open */ + pthread_mutex_t mutErrFile; + uchar *statsFile; + int fdStatsFile; /* stats file fd or -1 if not open */ + pthread_mutex_t mutStatsFile; + int bIsOpen; + int bIsSuspended; /* when broker fail, we need to suspend the action */ + pthread_rwlock_t rkLock; + pthread_mutex_t mut_doAction; /* make sure one wrkr instance max in parallel */ + rd_kafka_t *rk; + int closeTimeout; + SLIST_HEAD(failedmsg_listhead, s_failedmsg_entry) failedmsg_head; + + uchar *statsName; + statsobj_t *stats; + STATSCOUNTER_DEF(ctrTopicSubmit, mutCtrTopicSubmit); + STATSCOUNTER_DEF(ctrKafkaFail, mutCtrKafkaFail); + STATSCOUNTER_DEF(ctrKafkaAck, mutCtrKafkaAck); +} instanceData; + +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + +#define INST_STATSCOUNTER_INC(inst, ctr, mut) \ + do { \ + if (inst->stats) { STATSCOUNTER_INC(ctr, mut); } \ + } while(0); + +/* tables for interfacing with the v6 config system */ +/* action (instance) 
 parameters */
static struct cnfparamdescr actpdescr[] = {
	{ "topic", eCmdHdlrString, CNFPARAM_REQUIRED },
	{ "dynatopic", eCmdHdlrBinary, 0 },
	{ "dynatopic.cachesize", eCmdHdlrInt, 0 },
	{ "dynakey", eCmdHdlrBinary, 0 },
	{ "partitions.auto", eCmdHdlrBinary, 0 },	/* use librdkafka's automatic partitioning function */
	{ "partitions.number", eCmdHdlrPositiveInt, 0 },
	{ "partitions.usefixed", eCmdHdlrNonNegInt, 0 },	/* expert parameter, "nails" partition */
	{ "broker", eCmdHdlrArray, 0 },
	{ "confparam", eCmdHdlrArray, 0 },
	{ "topicconfparam", eCmdHdlrArray, 0 },
	{ "errorfile", eCmdHdlrGetWord, 0 },
	{ "statsfile", eCmdHdlrGetWord, 0 },
	{ "key", eCmdHdlrGetWord, 0 },
	{ "template", eCmdHdlrGetWord, 0 },
	{ "closetimeout", eCmdHdlrPositiveInt, 0 },
	{ "reopenonhup", eCmdHdlrBinary, 0 },
	{ "resubmitonfailure", eCmdHdlrBinary, 0 },	/* resubmit message into kafka queue on failure */
	{ "keepfailedmessages", eCmdHdlrBinary, 0 },
	{ "failedmsgfile", eCmdHdlrGetWord, 0 },
	{ "statsname", eCmdHdlrGetWord, 0 }
};
static struct cnfparamblk actpblk =
	{ CNFPARAMBLK_VERSION,
	  sizeof(actpdescr)/sizeof(struct cnfparamdescr),
	  actpdescr
	};

BEGINinitConfVars /* (re)set config variables to default values */
CODESTARTinitConfVars
ENDinitConfVars

/* Select the partition to produce to.
 * Returns RD_KAFKA_PARTITION_UA (librdkafka chooses) when partitions.auto
 * is set; otherwise the configured fixed partition, or a round-robin value
 * derived from an atomically incremented counter modulo nPartitions.
 */
static uint32_t
getPartition(instanceData *const __restrict__ pData)
{
	if (pData->autoPartition) {
		return RD_KAFKA_PARTITION_UA;
	} else {
		return (pData->fixedPartition == NO_FIXED_PARTITION) ?
			ATOMIC_INC_AND_FETCH_unsigned(&pData->currPartition,
				&pData->mutCurrPartition) % pData->nPartitions
			: (unsigned) pData->fixedPartition;
	}
}

/* destroy a kafka topic handle and reset the pointer to NULL; no-op if
 * the handle is already NULL.
 * must always be called with appropriate locks taken */
static void
free_topic(rd_kafka_topic_t **topic)
{
	if (*topic != NULL) {
		DBGPRINTF("omkafka: closing topic %s\n", rd_kafka_topic_name(*topic));
		rd_kafka_topic_destroy(*topic);
		*topic = NULL;
	}
}

/* free a failed-message list entry, including all owned strings.
 * key may be NULL (free(NULL) is a no-op). */
static void ATTR_NONNULL(1)
failedmsg_entry_destruct(failedmsg_entry *const __restrict__ fmsgEntry) {
	free(fmsgEntry->key);
	free(fmsgEntry->payload);
	free(fmsgEntry->topicname);
	free(fmsgEntry);
}

/* allocate and populate a failed-message entry, deep-copying key (may be
 * NULL), payload and topic name. Returns NULL on any allocation failure;
 * partially built entries are fully released before returning, so nothing
 * leaks on the error paths.
 * note: we need the length of message as we need to deal with
 * non-NUL terminated strings under some circumstances.
 */
static failedmsg_entry * ATTR_NONNULL(3,5)
failedmsg_entry_construct(const char *const key, const size_t keylen, const char *const msg,
const size_t msglen, const char *const topicname)
{
	failedmsg_entry *etry = NULL;

	if((etry = malloc(sizeof(struct s_failedmsg_entry))) == NULL) {
		return NULL;
	}

	if (key) {
		/* copy key and force NUL termination (source may not be terminated) */
		if((etry->key = (uchar*)malloc(keylen+1)) == NULL) {
			free(etry);
			return NULL;
		}
		memcpy(etry->key, key, keylen);
		etry->key[keylen] = '\0';
	} else {
		etry->key=NULL;
	}

	if((etry->payload = (uchar*)malloc(msglen+1)) == NULL) {
		free(etry->key);
		free(etry);
		return NULL;
	}
	memcpy(etry->payload, msg, msglen);
	etry->payload[msglen] = '\0';
	if((etry->topicname = (uchar*)strdup(topicname)) == NULL) {
		free(etry->key);
		free(etry->payload);
		free(etry);
		return NULL;
	}
	return etry;
}

/* destroy topic item */
/* must be called with write(rkLock) */
static void
closeTopic(instanceData *__restrict__ const pData)
{
	free_topic(&pData->pTopic);
}

/* these dynaTopic* functions are only slightly modified versions of those found in omfile.c.
 * check the sources in omfile.c for more descriptive comments about each of these functions.
 * i will only put the bare descriptions in this one. 2015-01-09 - Tait Clarridge
 */

/* delete a cache entry from the dynamic topic cache.
 * Frees the cached topic name (marking the slot reusable); when bFreeEntry
 * is set, the entry struct and its rwlock are destroyed as well.
 * NOTE(review): pTopic is not destroyed here — unless the topic handle is
 * released elsewhere, this looks like a handle leak on eviction; verify.
 */
/* must be called with lock(mutDynCache) */
static rsRetVal
dynaTopicDelCacheEntry(instanceData *__restrict__ const pData, const int iEntry, const int bFreeEntry)
{
	dynaTopicCacheEntry **pCache = pData->dynCache;
	DEFiRet;
	assert(pCache != NULL);

	if(pCache[iEntry] == NULL)
		FINALIZE;
	pthread_rwlock_wrlock(&pCache[iEntry]->lock);

	DBGPRINTF("Removing entry %d for topic '%s' from dynaCache.\n", iEntry,
		pCache[iEntry]->pName == NULL ? UCHAR_CONSTANT("[OPEN FAILED]") : pCache[iEntry]->pName);

	if(pCache[iEntry]->pName != NULL) {
		free(pCache[iEntry]->pName);
		pCache[iEntry]->pName = NULL;
	}

	pthread_rwlock_unlock(&pCache[iEntry]->lock);

	if(bFreeEntry) {
		pthread_rwlock_destroy(&pCache[iEntry]->lock);
		free(pCache[iEntry]);
		pCache[iEntry] = NULL;
	}

finalize_it:
	RETiRet;
}

/* clear the entire dynamic topic cache (takes mutDynCache itself) */
static void
dynaTopicFreeCacheEntries(instanceData *__restrict__ const pData)
{
	register int i;
	assert(pData != NULL);

	pthread_mutex_lock(&pData->mutDynCache);
	for(i = 0 ; i < pData->iCurrCacheSize ; ++i) {
		dynaTopicDelCacheEntry(pData, i, 1);
	}
	pData->iCurrElt = -1; /* invalidate current element */
	pthread_mutex_unlock(&pData->mutDynCache);
}

/* create the topic object: build a fresh topic conf, apply all configured
 * topicconfparam settings, then create the topic handle. On success *topic
 * receives the new handle (conf ownership passes to librdkafka); on failure
 * *topic stays NULL.
 * NOTE(review): on the error paths topicconf does not appear to be
 * destroyed (rd_kafka_topic_conf_destroy) — possible leak; confirm against
 * librdkafka ownership rules for rd_kafka_topic_new.
 */
/* must be called with _atleast_ read(rkLock) */
static rsRetVal
createTopic(instanceData *__restrict__ const pData, const uchar *__restrict__ const newTopicName,
rd_kafka_topic_t** topic) {
/* Get a new topic conf */
	rd_kafka_topic_conf_t *const topicconf = rd_kafka_topic_conf_new();
	char errstr[MAX_ERRMSG];
	rd_kafka_topic_t *rkt = NULL;
	DEFiRet;

	*topic = NULL;

	if(topicconf == NULL) {
		LogError(0, RS_RET_KAFKA_ERROR,
			"omkafka: error creating kafka topic conf obj: %s\n",
			rd_kafka_err2str(rd_kafka_last_error()));
		ABORT_FINALIZE(RS_RET_KAFKA_ERROR);
	}
	for(int i = 0 ; i < pData->nTopicConfParams ; ++i) {
		DBGPRINTF("omkafka: setting custom topic configuration parameter: %s:%s\n",
			pData->topicConfParams[i].name,
			pData->topicConfParams[i].val);
		if(rd_kafka_topic_conf_set(topicconf, pData->topicConfParams[i].name,
			pData->topicConfParams[i].val, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
			if(pData->bReportErrs) {
				LogError(0, RS_RET_PARAM_ERROR, "error in kafka "
					"topic conf parameter '%s=%s': %s",
					pData->topicConfParams[i].name,
					pData->topicConfParams[i].val, errstr);
			} else {
				DBGPRINTF("omkafka: setting custom topic configuration parameter '%s=%s': %s",
					pData->topicConfParams[i].name,
					pData->topicConfParams[i].val, errstr);
			}
			ABORT_FINALIZE(RS_RET_PARAM_ERROR);
		}
	}
	rkt = rd_kafka_topic_new(pData->rk, (char *)newTopicName, topicconf);
	if(rkt == NULL) {
		LogError(0, RS_RET_KAFKA_ERROR,
			"omkafka: error creating kafka topic: %s\n",
			rd_kafka_err2str(rd_kafka_last_error()));
		ABORT_FINALIZE(RS_RET_KAFKA_ERROR);
	}
	*topic = rkt;
finalize_it:
	RETiRet;
}

/* create the static (non-dynamic) topic object into pData->pTopic,
 * cleaning up the partially-set handle on failure. */
/* must be called with write(rkLock) */
static rsRetVal
prepareTopic(instanceData *__restrict__ const pData, const uchar *__restrict__ const newTopicName)
{
	DEFiRet;
	iRet = createTopic(pData, newTopicName, &pData->pTopic);
	if(iRet != RS_RET_OK) {
		if(pData->pTopic != NULL) {
			closeTopic(pData);
		}
	}
	RETiRet;
}

/* check dynamic topic cache for existence of the already created topic.
 * if it does not exist, create a new one, or if we are currently using it
 * as of the last message, keep using it.
 *
 * must be called with read(rkLock)
 * must be called with mutDynCache locked
 *
 * On success, *topic receives the cached/created topic handle and *lock the
 * per-entry rwlock the caller must rdlock while producing. On failure the
 * out-parameters are left untouched and iCurrElt is invalidated.
 */
static rsRetVal ATTR_NONNULL()
prepareDynTopic(instanceData *__restrict__ const pData, const uchar *__restrict__ const newTopicName,
	rd_kafka_topic_t** topic, pthread_rwlock_t** lock)
{
	uint64 ctOldest; /* LRU clock value of the oldest entry seen */
	int iOldest;
	int i;
	int iFirstFree;
	rsRetVal localRet;
	dynaTopicCacheEntry **pCache;
	dynaTopicCacheEntry *entry = NULL;
	rd_kafka_topic_t *tmpTopic = NULL;
	DEFiRet;
	assert(pData != NULL);
	assert(newTopicName != NULL);

	pCache = pData->dynCache;
	/* first check, if we still have the current topic */
	if ((pData->iCurrElt != -1)
		&& !ustrcmp(newTopicName, pCache[pData->iCurrElt]->pName)) {
		/* great, we are all set */
		pCache[pData->iCurrElt]->clkTickAccessed = getClockTopicAccess();
		entry = pCache[pData->iCurrElt];
		STATSCOUNTER_INC(ctrCacheSkip, mutCtrCacheSkip);
		FINALIZE;
	}

	/* ok, no luck. Now let's search the table if we find a matching spot.
	 * While doing so, we also prepare for creation of a new one.
	 */
	pData->iCurrElt = -1;
	iFirstFree = -1;
	iOldest = 0;
	ctOldest = getClockTopicAccess();
	for(i = 0 ; i < pData->iCurrCacheSize ; ++i) {
		if(pCache[i] == NULL || pCache[i]->pName == NULL) {
			if(iFirstFree == -1)
				iFirstFree = i;
		} else { /*got an element, let's see if it matches */
			if(!ustrcmp(newTopicName, pCache[i]->pName)) {
				/* we found our element! */
				entry = pCache[i];
				pData->iCurrElt = i;
				/* update "timestamp" for LRU */
				pCache[i]->clkTickAccessed = getClockTopicAccess();
				FINALIZE;
			}
			/* did not find it - so lets keep track of the counters for LRU */
			if(pCache[i]->clkTickAccessed < ctOldest) {
				ctOldest = pCache[i]->clkTickAccessed;
				iOldest = i;
			}
		}
	}
	STATSCOUNTER_INC(ctrCacheMiss, mutCtrCacheMiss);

	/* invalidate iCurrElt as we may error-exit out of this function when the currrent
	 * iCurrElt has been freed or otherwise become unusable. This is a precaution, and
	 * performance-wise it may be better to do that in each of the exits. However, that
	 * is error-prone, so I prefer to do it here. -- rgerhards, 2010-03-02
	 */
	pData->iCurrElt = -1;

	if(iFirstFree == -1 && (pData->iCurrCacheSize < pData->iDynaTopicCacheSize)) {
		/* there is space left, so set it to that index */
		iFirstFree = pData->iCurrCacheSize++;
	}

	if(iFirstFree == -1) {
		/* cache full: evict the LRU entry (name only; struct is reused) */
		dynaTopicDelCacheEntry(pData, iOldest, 0);
		STATSCOUNTER_INC(ctrCacheEvict, mutCtrCacheEvict);
		iFirstFree = iOldest; /* this one *is* now free ;) */
	} else {
		pCache[iFirstFree] = NULL;
	}
	/* we need to allocate memory for the cache structure */
	if(pCache[iFirstFree] == NULL) {
		CHKmalloc(pCache[iFirstFree] =
			(dynaTopicCacheEntry*) calloc(1, sizeof(dynaTopicCacheEntry)));
		/* NOTE(review): relies on pthread_rwlock_init returning 0 == RS_RET_OK
		 * on success; on failure the calloc'd entry stays in the cache with an
		 * uninitialized lock — verify cleanup expectations. */
		CHKiRet(pthread_rwlock_init(&pCache[iFirstFree]->lock, NULL));
	}

	/* Ok, we finally can open the topic */
	localRet = createTopic(pData, newTopicName, &tmpTopic);

	if(localRet != RS_RET_OK) {
		LogError(0, localRet, "Could not open dynamic topic '%s' "
			"[state %d] - discarding message",
			newTopicName, localRet);
		ABORT_FINALIZE(localRet);
	}

	if((pCache[iFirstFree]->pName = ustrdup(newTopicName)) == NULL) {
		free_topic(&tmpTopic);
		ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
	}
	/* NOTE(review): when an evicted slot is reused, the previous pTopic handle
	 * is overwritten here without rd_kafka_topic_destroy — looks like a handle
	 * leak unless it is destroyed elsewhere; confirm. */
	pCache[iFirstFree]->pTopic = tmpTopic;
	pCache[iFirstFree]->clkTickAccessed = getClockTopicAccess();
	entry = pCache[iFirstFree];
	pData->iCurrElt = iFirstFree;
	DBGPRINTF("Added new entry %d for topic cache, topic '%s'.\n", iFirstFree, newTopicName);

finalize_it:
	if (iRet == RS_RET_OK) {
		*topic = entry->pTopic;
		*lock = &entry->lock;
	}
	RETiRet;
}

/* write data error request/replies to separate error file
 * Note: we open the file but never close it before exit. If it
 * needs to be closed, HUP must be sent.
 */
static rsRetVal
writeDataError(instanceData *const pData,
	const char *const __restrict__ data,
	const size_t lenData,
	const int kafkaErr)
{
	int bLocked = 0; /* tracks whether mutErrFile is held, for finalize_it */
	struct json_object *json = NULL;
	DEFiRet;

	/* silently succeed when no error file is configured */
	if(pData->errorFile == NULL) {
		FINALIZE;
	}

	/* build one JSON record: { errcode, errmsg, data } */
	json = json_object_new_object();
	if(json == NULL) {
		ABORT_FINALIZE(RS_RET_ERR);
	}
	struct json_object *jval;
	jval = json_object_new_int(kafkaErr);
	json_object_object_add(json, "errcode", jval);
	jval = json_object_new_string(rd_kafka_err2str(kafkaErr));
	json_object_object_add(json, "errmsg", jval);
	jval = json_object_new_string_len(data, lenData);
	json_object_object_add(json, "data", jval);

	/* record + newline written in a single writev so concurrent writers
	 * cannot interleave within one record */
	struct iovec iov[2];
	iov[0].iov_base = (void*) json_object_get_string(json);
	iov[0].iov_len = strlen(iov[0].iov_base);
	iov[1].iov_base = (char *) "\n";
	iov[1].iov_len = 1;

	/* we must protect the file write do operations due to other wrks & HUP */
	pthread_mutex_lock(&pData->mutErrFile);
	bLocked = 1;
	if(pData->fdErrFile == -1) {
		/* lazy open; fd stays open until HUP closes it */
		pData->fdErrFile = open((char*)pData->errorFile,
			O_WRONLY|O_CREAT|O_APPEND|O_LARGEFILE|O_CLOEXEC,
			S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
		if(pData->fdErrFile == -1) {
			LogError(errno, RS_RET_ERR, "omkafka: error opening error file %s",
				pData->errorFile);
			ABORT_FINALIZE(RS_RET_ERR);
		}
	}

	/* Note: we do not do real error-handling on the err file, as this
	 * complicates things way to much.
	 */
	const ssize_t nwritten = writev(pData->fdErrFile, iov, sizeof(iov)/sizeof(struct iovec));
	if(nwritten != (ssize_t) iov[0].iov_len + 1) {
		LogError(errno, RS_RET_ERR,
			"omkafka: error writing error file, write returns %lld\n",
			(long long) nwritten);
	}

finalize_it:
	if(bLocked)
		pthread_mutex_unlock(&pData->mutErrFile);
	if(json != NULL)
		json_object_put(json);
	RETiRet;
}

/* write librdkafka stats object to a file
 * Note: we open the file but never close it before exit. If it
 * needs to be closed, HUP must be sent.
 * Assumes pData->statsFile != NULL.
 */
static rsRetVal
writeStats(instanceData *const pData,
	char *statsData,
	const size_t lenData)
{
	int bLocked = 0; /* tracks whether mutStatsFile is held, for finalize_it */
	DEFiRet;

	/* Protect the file write from operations due to other wrks & HUP */
	pthread_mutex_lock(&pData->mutStatsFile);
	bLocked = 1;
	if(pData->fdStatsFile == -1) {
		/* lazy open; fd stays open until HUP closes it */
		pData->fdStatsFile = open((char*)pData->statsFile,
			O_WRONLY|O_CREAT|O_APPEND|O_LARGEFILE|O_CLOEXEC,
			S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
		if(pData->fdStatsFile == -1) {
			LogError(errno, RS_RET_ERR, "omkafka: error opening stats file %s",
				pData->statsFile);
			ABORT_FINALIZE(RS_RET_ERR);
		}
	}

	/* NOTE(review): if the first write() fails with -1, the second write is
	 * still attempted and the -1 is folded into the sum, so the error report
	 * below may show a misleading total; also two writes (vs. one writev) can
	 * interleave with other processes appending to the same file. */
	ssize_t nwritten = write(pData->fdStatsFile, statsData, lenData);
	nwritten += write(pData->fdStatsFile, "\n", 1);
	if(nwritten != (ssize_t) lenData + 1) {
		LogError(errno, RS_RET_ERR,
			"omkafka: error writing stats file, write returns %lld, expected %lld\n",
			(long long) nwritten, (long long)(lenData + 1));
	}

finalize_it:
	if(bLocked)
		pthread_mutex_unlock(&pData->mutStatsFile);
	RETiRet;
}

/* identify and count specific types of kafka failures, bumping the
 * matching global stats counter; unrecognized codes fall into the
 * "other errors" bucket. Always returns RS_RET_OK.
 */
static rsRetVal
updateKafkaFailureCounts(rd_kafka_resp_err_t err) {
	DEFiRet;
	if (err == RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE) {
		STATSCOUNTER_INC(ctrKafkaMsgTooLarge, mutCtrKafkaMsgTooLarge);
	} else if (err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC) {
		STATSCOUNTER_INC(ctrKafkaUnknownTopic, mutCtrKafkaUnknownTopic);
	} else if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
		STATSCOUNTER_INC(ctrKafkaQueueFull, mutCtrKafkaQueueFull);
	} else if (err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION) {
		STATSCOUNTER_INC(ctrKafkaUnknownPartition, mutCtrKafkaUnknownPartition);
	} else {
		STATSCOUNTER_INC(ctrKafkaOtherErrors, mutCtrKafkaOtherErrors);
	}

	RETiRet;
}

/* must be called with read(rkLock)
 * b_do_resubmit tells if we shall resubmit on error or not. This is needed
 * when we submit already resubmitted messages.
 *
 * Enqueues one message to librdkafka (producev API when available,
 * legacy rd_kafka_produce otherwise). Resolves the topic through the
 * dynamic-topic cache when dynaTopic is enabled. On enqueue failure the
 * message is either appended to the failed-message list for retry or
 * logged, and the function returns RS_RET_SUSPENDED.
 */
static rsRetVal ATTR_NONNULL(1, 3)
writeKafka(instanceData *const pData, uchar *const key, uchar *const msg,
	uchar *const msgTimestamp, uchar *const topic, const int b_do_resubmit)
{
	DEFiRet;
	const int partition = getPartition(pData);
	rd_kafka_topic_t *rkt = NULL;
	pthread_rwlock_t *dynTopicLock = NULL;
	failedmsg_entry* fmsgEntry;
	int topic_mut_locked = 0; /* do we hold the dyn-topic entry rdlock? */
	rd_kafka_resp_err_t msg_kafka_response;
#if RD_KAFKA_VERSION >= 0x00090400
	int64_t ttMsgTimestamp;
#else
	int msg_enqueue_status = 0;
#endif

	DBGPRINTF("omkafka: trying to send: key:'%s', msg:'%s', timestamp:'%s'\n",
		key, msg, msgTimestamp);

	if(pData->dynaTopic) {
		DBGPRINTF("omkafka: topic to insert to: %s\n", topic);
		/* ensure locking happens all inside this function */
		pthread_mutex_lock(&pData->mutDynCache);
		const rsRetVal localRet = prepareDynTopic(pData, topic, &rkt, &dynTopicLock);
		if (localRet == RS_RET_OK) {
			/* take the entry rdlock before dropping mutDynCache so the
			 * entry cannot be evicted underneath us */
			pthread_rwlock_rdlock(dynTopicLock);
			topic_mut_locked = 1;
		}
		pthread_mutex_unlock(&pData->mutDynCache);
		CHKiRet(localRet);
	} else {
		rkt = pData->pTopic;
	}

#if RD_KAFKA_VERSION >= 0x00090400
	if (msgTimestamp == NULL) {
		/* Resubmitted items don't have a timestamp */
		ttMsgTimestamp = 0;
	} else {
		ttMsgTimestamp = atoi((char*)msgTimestamp); /* Convert timestamp into int */
		ttMsgTimestamp *= 1000; /* Timestamp in Milliseconds for kafka */
	}
	DBGPRINTF("omkafka: rd_kafka_producev timestamp=%s/%" PRId64 "\n", msgTimestamp, ttMsgTimestamp);

	/* Using new kafka producev API, includes Timestamp! */
	if (key == NULL) {
		msg_kafka_response = rd_kafka_producev(pData->rk,
				RD_KAFKA_V_RKT(rkt),
				RD_KAFKA_V_PARTITION(partition),
				RD_KAFKA_V_VALUE(msg, strlen((char*)msg)),
				RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
				RD_KAFKA_V_TIMESTAMP(ttMsgTimestamp),
				RD_KAFKA_V_KEY(NULL, 0),
				RD_KAFKA_V_END);
	} else {
		DBGPRINTF("omkafka: rd_kafka_producev key=%s\n", key);
		msg_kafka_response = rd_kafka_producev(pData->rk,
				RD_KAFKA_V_RKT(rkt),
				RD_KAFKA_V_PARTITION(partition),
				RD_KAFKA_V_VALUE(msg, strlen((char*)msg)),
				RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
				RD_KAFKA_V_TIMESTAMP(ttMsgTimestamp),
				RD_KAFKA_V_KEY(key,strlen((char*)key)),
				RD_KAFKA_V_END);
	}

	if (msg_kafka_response != RD_KAFKA_RESP_ERR_NO_ERROR ) {
		updateKafkaFailureCounts(msg_kafka_response);

		/* Put into kafka queue, again if configured! */
		if (pData->bResubmitOnFailure &&
			b_do_resubmit &&
			msg_kafka_response != RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE) {
			DBGPRINTF("omkafka: Failed to produce to topic '%s' (rd_kafka_producev)"
				"partition %d: '%d/%s' - adding MSG '%s' to failed for RETRY!\n",
				rd_kafka_topic_name(rkt), partition, msg_kafka_response,
				rd_kafka_err2str(msg_kafka_response), msg);
			CHKmalloc(fmsgEntry = failedmsg_entry_construct((char*) key, key ? strlen((char*)key) : 0,
				(char*) msg, strlen((char*)msg),rd_kafka_topic_name(rkt)));
			SLIST_INSERT_HEAD(&pData->failedmsg_head, fmsgEntry, entries);
		} else {
			/* NOTE(review): key may be NULL here and is passed to a %s
			 * conversion — undefined behavior on some libcs; verify. */
			LogError(0, RS_RET_KAFKA_PRODUCE_ERR,
				"omkafka: Failed to produce to topic '%s' (rd_kafka_producev)"
				"partition %d: %d/%s - KEY '%s' -MSG '%s'\n",
				rd_kafka_topic_name(rkt), partition, msg_kafka_response,
				rd_kafka_err2str(msg_kafka_response), key, msg);
		}
	}
#else

	DBGPRINTF("omkafka: rd_kafka_produce\n");
	/* Using old kafka produce API */
	msg_enqueue_status = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY,
			msg, strlen((char*)msg), key,
			key ? strlen((char*)key) : 0,
			NULL);
	if(msg_enqueue_status == -1) {
		msg_kafka_response = rd_kafka_last_error();
		updateKafkaFailureCounts(msg_kafka_response);

		/* Put into kafka queue, again if configured! */
		if (pData->bResubmitOnFailure &&
			b_do_resubmit &&
			msg_kafka_response != RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE) {
			DBGPRINTF("omkafka: Failed to produce to topic '%s' (rd_kafka_produce)"
				"partition %d: '%d/%s' - adding MSG '%s' KEY '%s' to failed for RETRY!\n",
				rd_kafka_topic_name(rkt), partition, msg_kafka_response,
				rd_kafka_err2str(rd_kafka_errno2err(errno)), msg, key ? (const char*) key : "");
			CHKmalloc(fmsgEntry = failedmsg_entry_construct((char*) key, key ? strlen((char*)key) : 0,
				(char*) msg, strlen((char*)msg),rd_kafka_topic_name(rkt)));
			SLIST_INSERT_HEAD(&pData->failedmsg_head, fmsgEntry, entries);
		} else {
			/* NOTE(review): key may be NULL here and is passed to %s — see above */
			LogError(0, RS_RET_KAFKA_PRODUCE_ERR,
				"omkafka: Failed to produce to topic '%s' (rd_kafka_produce) "
				"partition %d: %d/%s - MSG '%s' KEY '%s'\n",
				rd_kafka_topic_name(rkt), partition, msg_kafka_response,
				rd_kafka_err2str(msg_kafka_response), msg, key);
		}
	}
#endif

	const int callbacksCalled = rd_kafka_poll(pData->rk, 0); /* call callbacks */
	DBGPRINTF("omkafka: writeKafka kafka outqueue length: %d, callbacks called %d\n",
		rd_kafka_outq_len(pData->rk), callbacksCalled);

#if RD_KAFKA_VERSION >= 0x00090400
	if (msg_kafka_response != RD_KAFKA_RESP_ERR_NO_ERROR) {
#else
	if (msg_enqueue_status == -1) {
#endif
		STATSCOUNTER_INC(ctrKafkaFail, mutCtrKafkaFail);
		INST_STATSCOUNTER_INC(pData, pData->ctrKafkaFail, pData->mutCtrKafkaFail);
		ABORT_FINALIZE(RS_RET_KAFKA_PRODUCE_ERR);
		/* ABORT_FINALIZE isn't absolutely necessary as of now,
		because this is the last line anyway, but its useful to ensure
		correctness in case we add more stuff below this line at some point*/
	}

finalize_it:
	if(topic_mut_locked) {
		pthread_rwlock_unlock(dynTopicLock);
	}
	DBGPRINTF("omkafka: writeKafka returned %d\n", iRet);
	if(iRet != RS_RET_OK) {
		/* any failure maps to SUSPENDED so the action layer retries */
		iRet = RS_RET_SUSPENDED;
	}
	STATSCOUNTER_SETMAX_NOMUT(ctrQueueSize, (unsigned) rd_kafka_outq_len(pData->rk));
	STATSCOUNTER_INC(ctrTopicSubmit, mutCtrTopicSubmit);
	INST_STATSCOUNTER_INC(pData, pData->ctrTopicSubmit, pData->mutCtrTopicSubmit);
	RETiRet;
}

/* librdkafka per-message delivery report callback (invoked from
 * rd_kafka_poll). On failure, either queues the message for resubmission
 * or logs it and records it in the error file; updates ack/fail counters.
 * NOTE(review): the failed-message list is modified here without any
 * visible lock — verify that rd_kafka_poll is only driven from contexts
 * that already serialize access (e.g. mut_doAction).
 * NOTE(review): the '%.*s' precision (int)(rkmessage->len-1) becomes -1
 * for empty payloads, which printf treats as "no precision" — confirm
 * intended.
 */
static void
deliveryCallback(rd_kafka_t __attribute__((unused)) *rk,
	const rd_kafka_message_t *rkmessage,
	void *opaque)
{
	instanceData *const pData = (instanceData *) opaque;
	failedmsg_entry* fmsgEntry;
	DEFiRet;

	if (rkmessage->err) {
		updateKafkaFailureCounts(rkmessage->err);

		/* Put into kafka queue, again if configured! */
		if (pData->bResubmitOnFailure) {
			DBGPRINTF("omkafka: kafka delivery FAIL on Topic '%s', msg '%.*s', key '%.*s' -"
				" adding to FAILED MSGs for RETRY!\n",
				rd_kafka_topic_name(rkmessage->rkt),
				(int)(rkmessage->len-1), (char*)rkmessage->payload,
				(int)(rkmessage->key_len), (char*)rkmessage->key);
			CHKmalloc(fmsgEntry = failedmsg_entry_construct(rkmessage->key, rkmessage->key_len,
				rkmessage->payload, rkmessage->len,rd_kafka_topic_name(rkmessage->rkt)));
			SLIST_INSERT_HEAD(&pData->failedmsg_head, fmsgEntry, entries);
		} else {
			LogError(0, RS_RET_ERR,
				"omkafka: kafka delivery FAIL on Topic '%s', msg '%.*s', key '%.*s'\n",
				rd_kafka_topic_name(rkmessage->rkt),
				(int)(rkmessage->len-1), (char*)rkmessage->payload,
				(int)(rkmessage->key_len), (char*)rkmessage->key);
			writeDataError(pData, (char*) rkmessage->payload, rkmessage->len, rkmessage->err);
		}
		STATSCOUNTER_INC(ctrKafkaFail, mutCtrKafkaFail);
		INST_STATSCOUNTER_INC(pData, pData->ctrKafkaFail, pData->mutCtrKafkaFail);
	} else {
		DBGPRINTF("omkafka: kafka delivery SUCCESS on msg '%.*s'\n", (int)(rkmessage->len-1),
			(char*)rkmessage->payload);
		STATSCOUNTER_INC(ctrKafkaAck, mutCtrKafkaAck);
		INST_STATSCOUNTER_INC(pData, pData->ctrKafkaAck, pData->mutCtrKafkaAck);
	}
finalize_it:
	if(iRet != RS_RET_OK) {
		DBGPRINTF("omkafka: deliveryCallback returned failure %d\n", iRet);
	}
}

/**
 * This function looks for a json object that corresponds to the
 * passed name and returns it is found. Otherwise returns NULL.
 * It will be used for processing stats callback json object.
 * Note: matching uses strncmp with strlen(name), so a key that merely
 * starts with `name` also matches — first such key wins.
 */
static struct fjson_object *
get_object(struct fjson_object *fj_obj, const char * name) {
	struct fjson_object_iterator it = fjson_object_iter_begin(fj_obj);
	struct fjson_object_iterator itEnd = fjson_object_iter_end(fj_obj);
	while (!fjson_object_iter_equal (&it, &itEnd)) {
		const char * key = fjson_object_iter_peek_name (&it);
		struct fjson_object * val = fjson_object_iter_peek_value(&it);
		if(!strncmp(key, name, strlen(name))){
			return val;
		}
		fjson_object_iter_next (&it);
	}

	return NULL;
}

/**
 * This function performs a two level search in stats callback json
 * object. It iterates over broker objects and for each broker object
 * returns desired level2 value (such as avg/min/max) for specified
 * level1 window statistic (such as rtt/throttle/int_latency). Threshold
 * allows skipping values that are too small, so that they don't
 * impact on aggregate averaged value that is returned.
 * Returns 0 when the brokers object is missing, a broker lacks the
 * requested sub-objects, or no broker exceeded the threshold.
 */
static uint64
jsonExtractWindoStats(struct fjson_object * stats_object,
	const char * level1_obj_name, const char * level2_obj_name,
	unsigned long skip_threshold) {
	uint64 level2_val;
	uint64 agg_val = 0;
	uint64 ret_val = 0;
	int active_brokers = 0;

	struct fjson_object * brokers_obj = get_object(stats_object, "brokers");
	if (brokers_obj == NULL) {
		LogMsg(0, NO_ERRCODE, LOG_ERR, "jsonExtractWindowStat: failed to find brokers object");
		return ret_val;
	}

	/* iterate over borkers to get level1 window objects at level2 (min, max, avg, etc.) */
	struct fjson_object_iterator it = fjson_object_iter_begin(brokers_obj);
	struct fjson_object_iterator itEnd = fjson_object_iter_end(brokers_obj);
	while (!fjson_object_iter_equal (&it, &itEnd)) {
		struct fjson_object * val = fjson_object_iter_peek_value(&it);
		struct fjson_object * level1_obj = get_object(val, level1_obj_name);
		if(level1_obj == NULL)
			return ret_val;

		struct fjson_object * level2_obj = get_object(level1_obj, level2_obj_name);
		if(level2_obj == NULL)
			return ret_val;

		level2_val = fjson_object_get_int64(level2_obj);
		if (level2_val > skip_threshold) {
			agg_val += level2_val;
			active_brokers++;
		}
		fjson_object_iter_next (&it);
	}
	if(active_brokers > 0) {
		/* average across brokers that exceeded the threshold */
		ret_val = agg_val/active_brokers;
	}

	return ret_val;
}

/**
 * librdkafka will call this function after every statistics.interval.ms
 * interval, which is specified in confParam. See the explanation at:
 * https://github.com/edenhill/librdkafka/wiki/Statistics
 *
 * Here we have extracted windows stats: rtt, throttle time, and internal
 * latency averages. These values will be logged as impstats messages.
+ */ +static int +statsCallback(rd_kafka_t __attribute__((unused)) *rk, + char *json, size_t __attribute__((unused)) json_len, + void __attribute__((unused)) *opaque) { + instanceData *const pData = (instanceData *) opaque; + char buf[2048]; + char handler_name[1024] = "unknown"; + int replyq = 0; + int msg_cnt = 0; + int msg_size = 0; + uint64 msg_max = 0; + uint64 msg_size_max = 0; + + struct fjson_object * stats_object = NULL; + struct fjson_object * fj_obj = NULL; + + DBGPRINTF("omkafka: librdkafka stats callback: %s\n", json); + + /* prepare fjson object from stats callback for parsing */ + stats_object = fjson_tokener_parse(json); + if (stats_object == NULL) { + LogMsg(0, NO_ERRCODE, LOG_ERR, "statsCallback: fjson tokenizer failed:"); + return 0; + } + enum fjson_type type = fjson_object_get_type(stats_object); + if (type != fjson_type_object) { + LogMsg(0, NO_ERRCODE, LOG_ERR, "statsCallback: json is not of type object; can't process statsCB\n"); + return 0; + } + + /* top level stats extraction through libfastjson based parsing */ + fj_obj = get_object(stats_object, "name"); + if (fj_obj != NULL) + snprintf(handler_name, sizeof(handler_name), "%s", (char *)fjson_object_get_string(fj_obj)); + fj_obj = get_object(stats_object, "replyq"); + replyq = (fj_obj == NULL) ? 0 : fjson_object_get_int(fj_obj); + fj_obj = get_object(stats_object, "msg_cnt"); + msg_cnt = (fj_obj == NULL) ? 0 : fjson_object_get_int(fj_obj); + fj_obj = get_object(stats_object, "msg_size"); + msg_size = (fj_obj == NULL) ? 0 : fjson_object_get_int(fj_obj); + fj_obj = get_object(stats_object, "msg_max"); + msg_max = (fj_obj == NULL) ? 0 : fjson_object_get_int64(fj_obj); + fj_obj = get_object(stats_object, "msg_size_max"); + msg_size_max = (fj_obj == NULL) ? 
0 : fjson_object_get_int64(fj_obj); + + /* window stats extraction to be picked up by impstats counters */ + rtt_avg_usec = jsonExtractWindoStats(stats_object, "rtt", "avg", 100); + throttle_avg_msec = jsonExtractWindoStats(stats_object, "throttle", "avg", 0); + int_latency_avg_usec = jsonExtractWindoStats(stats_object, "int_latency", "avg", 0); + json_object_put (stats_object); + + /* emit a log line to get stats visibility per librdkafka client */ + snprintf(buf, sizeof(buf), + "statscb_window_stats: handler_name=%s replyq=%d msg_cnt=%d msg_size=%d " + "msg_max=%lld msg_size_max=%lld rtt_avg_usec=%lld throttle_avg_msec=%lld " + "int_latency_avg_usec=%lld", + handler_name, replyq, msg_cnt, msg_size, msg_max, msg_size_max, + rtt_avg_usec, throttle_avg_msec, int_latency_avg_usec); + LogMsg(0, NO_ERRCODE, LOG_INFO, "%s\n", buf); + + /* Write the entire json stats object, if requested */ + if (pData->statsFile != NULL) + writeStats(pData, json, json_len); + + return 0; +} + +static void +kafkaLogger(const rd_kafka_t __attribute__((unused)) *rk, int level, + const char *fac, const char *buf) +{ + DBGPRINTF("omkafka: kafka log message [%d,%s]: %s\n", + level, fac, buf); +} + +/* should be called with write(rkLock) */ +static void +do_rd_kafka_destroy(instanceData *const __restrict__ pData) +{ + if (pData->rk == NULL) { + DBGPRINTF("omkafka: onDestroy can't close, handle wasn't open\n"); + goto done; + } + int queuedCount = rd_kafka_outq_len(pData->rk); + DBGPRINTF("omkafka: onDestroy closing - items left in outqueue: %d\n", queuedCount); + + struct timespec tOut; + timeoutComp(&tOut, pData->closeTimeout); + + while (timeoutVal(&tOut) > 0) { + queuedCount = rd_kafka_outq_len(pData->rk); + if (queuedCount > 0) { + /* Flush all remaining kafka messages (rd_kafka_poll is called inside) */ + const int flushStatus = rd_kafka_flush(pData->rk, pData->closeTimeout); + if (flushStatus == RD_KAFKA_RESP_ERR_NO_ERROR) { + DBGPRINTF("omkafka: onDestroyflushed remaining '%d' messages 
" + "to kafka topic '%s'\n", queuedCount, + (pData->pTopic == NULL ? "NULL" : rd_kafka_topic_name(pData->pTopic)) + ); + + /* Trigger callbacks a last time before shutdown */ + const int callbacksCalled = rd_kafka_poll(pData->rk, 0); /* call callbacks */ + DBGPRINTF("omkafka: onDestroy kafka outqueue length: %d, " + "callbacks called %d\n", rd_kafka_outq_len(pData->rk), + callbacksCalled); + } else /* TODO: Handle unsend messages here! */ { + /* timeout = RD_KAFKA_RESP_ERR__TIMED_OUT */ + LogError(0, RS_RET_KAFKA_ERROR, "omkafka: onDestroy " + "Failed to send remaining '%d' messages to " + "topic '%s' on shutdown with error: '%s'", + queuedCount, + (pData->pTopic == NULL ? "NULL" : rd_kafka_topic_name(pData->pTopic)), + rd_kafka_err2str(flushStatus)); +#if RD_KAFKA_VERSION >= 0x010001ff + rd_kafka_purge(pData->rk, RD_KAFKA_PURGE_F_QUEUE | RD_KAFKA_PURGE_F_INFLIGHT); + /* Trigger callbacks a last time before shutdown */ + const int callbacksCalled = rd_kafka_poll(pData->rk, 0); /* call callbacks */ + DBGPRINTF("omkafka: onDestroy kafka outqueue length: %d, " + "callbacks called %d\n", rd_kafka_outq_len(pData->rk), + callbacksCalled); +#endif + } + } else { + break; + } + } + if (queuedCount > 0) { + LogMsg(0, RS_RET_ERR, LOG_WARNING, + "omkafka: queue-drain for close timed-out took too long, " + "items left in outqueue: %d -- this may indicate data loss", + rd_kafka_outq_len(pData->rk)); + } + if (pData->dynaTopic) { + dynaTopicFreeCacheEntries(pData); + } else { + closeTopic(pData); + } + + /* Final destroy of kafka!*/ + rd_kafka_destroy(pData->rk); + +# if RD_KAFKA_VERSION < 0x00090001 + /* Wait for kafka being destroyed in old API */ + if (rd_kafka_wait_destroyed(10000) < 0) { + LogError(0, RS_RET_ERR, "omkafka: rd_kafka_destroy did not finish after grace timeout (10s)!"); + } else { + DBGPRINTF("omkafka: rd_kafka_destroy successfully finished\n"); + } +# endif + + pData->rk = NULL; +done: return; +} + +/* should be called with write(rkLock) */ +static void 
+closeKafka(instanceData *const __restrict__ pData) +{ + if(pData->bIsOpen) { + do_rd_kafka_destroy(pData); + pData->bIsOpen = 0; + } +} + +static void +errorCallback(rd_kafka_t __attribute__((unused)) *rk, + int __attribute__((unused)) err, + const char *reason, + void __attribute__((unused)) *opaque) +{ + /* Get InstanceData pointer */ + instanceData *const pData = (instanceData *) opaque; + + /* count kafka transport errors that cause action suspension */ + if (err == RD_KAFKA_RESP_ERR__MSG_TIMED_OUT) { + STATSCOUNTER_INC(ctrKafkaRespTimedOut, mutCtrKafkaRespTimedOut); + } else if (err == RD_KAFKA_RESP_ERR__TRANSPORT) { + STATSCOUNTER_INC(ctrKafkaRespTransport, mutCtrKafkaRespTransport); + } else if (err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN) { + STATSCOUNTER_INC(ctrKafkaRespBrokerDown, mutCtrKafkaRespBrokerDown); + } else if (err == RD_KAFKA_RESP_ERR__AUTHENTICATION) { + STATSCOUNTER_INC(ctrKafkaRespAuth, mutCtrKafkaRespAuth); + } else if (err == RD_KAFKA_RESP_ERR__SSL) { + STATSCOUNTER_INC(ctrKafkaRespSSL, mutCtrKafkaRespSSL); + } else { + STATSCOUNTER_INC(ctrKafkaRespOther, mutCtrKafkaRespOther); + } + + /* Handle common transport error codes*/ + if (err == RD_KAFKA_RESP_ERR__MSG_TIMED_OUT || + err == RD_KAFKA_RESP_ERR__TRANSPORT || + err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN || + err == RD_KAFKA_RESP_ERR__AUTHENTICATION || + err == RD_KAFKA_RESP_ERR__SSL) { + /* Broker transport error, we need to disable the action for now!*/ + pData->bIsSuspended = 1; + LogMsg(0, RS_RET_KAFKA_ERROR, LOG_WARNING, + "omkafka: action will suspended due to kafka error %d: %s", + err, rd_kafka_err2str(err)); + } else { + LogError(0, RS_RET_KAFKA_ERROR, "omkafka: kafka error message: %d,'%s','%s'", + err, rd_kafka_err2str(err), reason); + } +} + + + +#if 0 /* the stock librdkafka version in Ubuntu 14.04 LTS does NOT support metadata :-( */ +/* Note: this is a skeleton, with some code missing--> add it when it is actually implemented. 
*/ +static int +getConfiguredPartitions() +{ + struct rd_kafka_metadata *pMetadata; + if(rd_kafka_metadata(pData->rk, 0, rkt, &pMetadata, 8) + == RD_KAFKA_RESP_ERR_NO_ERROR) { + dbgprintf("omkafka: topic '%s' has %d partitions\n", + pData->topic, pMetadata->topics[0]->partition_cnt); + rd_kafka_metadata_destroy(pMetadata); + } else { + dbgprintf("omkafka: error reading metadata\n"); + // TODO: handle this gracefull **when** we actually need + // the metadata -- or remove completely. 2014-12-12 rgerhards + } +} +#endif + +/* should be called with write(rkLock) */ +static rsRetVal +openKafka(instanceData *const __restrict__ pData) +{ + char errstr[MAX_ERRMSG]; + DEFiRet; + + if(pData->bIsOpen) + FINALIZE; + + pData->pTopic = NULL; + + /* main conf */ + rd_kafka_conf_t *const conf = rd_kafka_conf_new(); + if(conf == NULL) { + LogError(0, RS_RET_KAFKA_ERROR, "omkafka: error creating kafka conf obj: %s\n", + rd_kafka_err2str(rd_kafka_last_error())); + ABORT_FINALIZE(RS_RET_KAFKA_ERROR); + } + +#ifdef DEBUG + /* enable kafka debug output */ + if(rd_kafka_conf_set(conf, "debug", RD_KAFKA_DEBUG_CONTEXTS, + errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { + LogError(0, RS_RET_KAFKA_ERROR, "omkafka: error setting kafka debug option: %s\n", errstr); + /* DO NOT ABORT IN THIS CASE! 
*/ + } +#endif + + for(int i = 0 ; i < pData->nConfParams ; ++i) { + DBGPRINTF("omkafka: setting custom configuration parameter: %s:%s\n", + pData->confParams[i].name, + pData->confParams[i].val); + if(rd_kafka_conf_set(conf, pData->confParams[i].name, + pData->confParams[i].val, errstr, sizeof(errstr)) + != RD_KAFKA_CONF_OK) { + if(pData->bReportErrs) { + LogError(0, RS_RET_PARAM_ERROR, "error setting custom configuration " + "parameter '%s=%s': %s", + pData->confParams[i].name, + pData->confParams[i].val, errstr); + } else { + DBGPRINTF("omkafka: error setting custom configuration parameter '%s=%s': %s", + pData->confParams[i].name, + pData->confParams[i].val, errstr); + } + ABORT_FINALIZE(RS_RET_PARAM_ERROR); + } + } + rd_kafka_conf_set_opaque(conf, (void *) pData); + rd_kafka_conf_set_dr_msg_cb(conf, deliveryCallback); + rd_kafka_conf_set_error_cb(conf, errorCallback); + rd_kafka_conf_set_stats_cb(conf, statsCallback); +# if RD_KAFKA_VERSION >= 0x00090001 + rd_kafka_conf_set_log_cb(conf, kafkaLogger); +# endif + + char kafkaErrMsg[1024]; + pData->rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, kafkaErrMsg, sizeof(kafkaErrMsg)); + if(pData->rk == NULL) { + LogError(0, RS_RET_KAFKA_ERROR, + "omkafka: error creating kafka handle: %s\n", kafkaErrMsg); + ABORT_FINALIZE(RS_RET_KAFKA_ERROR); + } + +# if RD_KAFKA_VERSION < 0x00090001 + rd_kafka_conf_set_log_cb(pData->rk, kafkaLogger); +# endif + DBGPRINTF("omkafka setting brokers: '%s'n", pData->brokers); + if(rd_kafka_brokers_add(pData->rk, (char*)pData->brokers) == 0) { + LogError(0, RS_RET_KAFKA_NO_VALID_BROKERS, + "omkafka: no valid brokers specified: %s\n", pData->brokers); + ABORT_FINALIZE(RS_RET_KAFKA_NO_VALID_BROKERS); + } + + pData->bIsOpen = 1; +finalize_it: + if(iRet == RS_RET_OK) { + pData->bReportErrs = 1; + } else { + pData->bReportErrs = 0; + if(pData->rk != NULL) { + do_rd_kafka_destroy(pData); + } + } + RETiRet; +} + +static rsRetVal +setupKafkaHandle(instanceData *const __restrict__ pData, int recreate) +{ 
	DEFiRet;
	/* serialize (re)creation of the kafka handle against all users of rk */
	pthread_rwlock_wrlock(&pData->rkLock);
	if (recreate) {
		closeKafka(pData);
	}
	CHKiRet(openKafka(pData));
	if (! pData->dynaTopic) {
		/* static topic: create the topic handle once, up front */
		if( pData->pTopic == NULL)
			CHKiRet(prepareTopic(pData, pData->topic));
	}
finalize_it:
	if (iRet != RS_RET_OK) {
		if (pData->rk != NULL) {
			closeKafka(pData);
		}

		/* Parameter Error's cannot be resumed, so we need to disable the action */
		if (iRet == RS_RET_PARAM_ERROR) {
			iRet = RS_RET_DISABLE_ACTION;
			LogError(0, iRet, "omkafka: action will be disabled due invalid "
				"kafka configuration parameters\n");
		}

	}
	pthread_rwlock_unlock(&pData->rkLock);
	RETiRet;
}

/* Re-deliver all messages on the failed-message list. Stops and suspends
 * on the first message that fails again; successfully delivered entries
 * are removed from the list. */
static rsRetVal
checkFailedMessages(instanceData *const __restrict__ pData)
{
	failedmsg_entry* fmsgEntry;
	DEFiRet;

	/* Loop through failed messages, reprocess them first! */
	while (!SLIST_EMPTY(&pData->failedmsg_head)) {
		fmsgEntry = SLIST_FIRST(&pData->failedmsg_head);
		assert(fmsgEntry != NULL);
		/* Put back into kafka! NO_RESUBMIT avoids re-queueing loops from
		 * within the delivery callback. */
		iRet = writeKafka(pData, (uchar*) fmsgEntry->key, (uchar*) fmsgEntry->payload, NULL,
			fmsgEntry->topicname,NO_RESUBMIT);
		if(iRet != RS_RET_OK) {
			LogMsg(0, RS_RET_SUSPENDED, LOG_WARNING,
				"omkafka: failed to deliver failed msg '%.*s' with status %d. "
				"- suspending AGAIN!",
				(int)(strlen((char*)fmsgEntry->payload)-1),
				(char*)fmsgEntry->payload, iRet);
			ABORT_FINALIZE(RS_RET_SUSPENDED);
		} else {
			DBGPRINTF("omkafka: successfully delivered failed msg '%.*s'.\n",
				(int)(strlen((char*)fmsgEntry->payload)-1),
				(char*)fmsgEntry->payload);
			/* Note: we can use SLIST even though it is o(n), because the element
			 * in question is always either the root or the next element and
			 * SLIST_REMOVE iterates only until the element to be deleted is found.
			 * We cannot use SLIST_REMOVE_HEAD() as new elements may have been
			 * added in the delivery callback!
			 * TODO: sounds like bad logic -- why do we add and remove, just simply
			 * keep it in queue?
			 */
			SLIST_REMOVE(&pData->failedmsg_head, fmsgEntry, s_failedmsg_entry, entries);
			failedmsg_entry_destruct(fmsgEntry);
		}
	}

finalize_it:
	RETiRet;
}

/* This function persists failed messages into a data file, so they can
 * be resend on next startup.
 * Record format: "topic\tkey\tpayload" (payload presumably carries its own
 * trailing newline as record terminator -- TODO confirm against templates).
 * alorbach, 2017-06-02
 */
static rsRetVal ATTR_NONNULL(1)
persistFailedMsgs(instanceData *const __restrict__ pData)
{
	DEFiRet;
	int fdMsgFile = -1;
	ssize_t nwritten;

	if(SLIST_EMPTY(&pData->failedmsg_head)) {
		DBGPRINTF("omkafka: persistFailedMsgs: We do not need to persist failed messages.\n");
		FINALIZE;
	}

	fdMsgFile = open((char*)pData->failedMsgFile,
		O_WRONLY|O_CREAT|O_APPEND|O_LARGEFILE|O_CLOEXEC,
		S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
	if(fdMsgFile == -1) {
		LogError(errno, RS_RET_ERR, "omkafka: persistFailedMsgs error opening failed msg file %s",
			pData->failedMsgFile);
		ABORT_FINALIZE(RS_RET_ERR);
	}

	while (!SLIST_EMPTY(&pData->failedmsg_head)) {
		failedmsg_entry* fmsgEntry = SLIST_FIRST(&pData->failedmsg_head);
		assert(fmsgEntry != NULL);
		/* any single failed write aborts persisting; the key field may be
		 * empty (NULL key), the tab separators are always written */
		nwritten = write(fdMsgFile, fmsgEntry->topicname, ustrlen(fmsgEntry->topicname) );
		if(nwritten != -1)
			nwritten = write(fdMsgFile, "\t", 1);
		if((nwritten != -1) && (fmsgEntry->key))
			nwritten = write(fdMsgFile, fmsgEntry->key, ustrlen(fmsgEntry->key) );
		if(nwritten != -1)
			nwritten = write(fdMsgFile, "\t", 1);
		if(nwritten != -1)
			nwritten = write(fdMsgFile, fmsgEntry->payload, ustrlen(fmsgEntry->payload) );
		if(nwritten == -1) {
			LogError(errno, RS_RET_ERR, "omkafka: persistFailedMsgs error writing failed msg file");
			ABORT_FINALIZE(RS_RET_ERR);
		} else {
			DBGPRINTF("omkafka: persistFailedMsgs successfully written loaded msg '%.*s' for "
				"topic '%s'\n", (int)(strlen((char*)fmsgEntry->payload)-1),
				fmsgEntry->payload, fmsgEntry->topicname);
		}
		SLIST_REMOVE_HEAD(&pData->failedmsg_head, entries);
		failedmsg_entry_destruct(fmsgEntry);
	}

finalize_it:
	if(fdMsgFile != -1) {
		close(fdMsgFile);
	}

if(iRet != RS_RET_OK) { + LogError(0, iRet, "omkafka: could not persist failed messages " + "file %s - failed messages will be lost.", + (char*)pData->failedMsgFile); + } + RETiRet; +} + +/* This function loads failed messages from a data file, so they can + * be resend after action startup. + * alorbach, 2017-06-06 + */ +static rsRetVal +loadFailedMsgs(instanceData *const __restrict__ pData) +{ + DEFiRet; + struct stat stat_buf; + failedmsg_entry* fmsgEntry; + strm_t *pstrmFMSG = NULL; + cstr_t *pCStr = NULL; + uchar *puStr; + char *pStrTabPos; + char *pStrTabPos2; + + assert(pData->failedMsgFile != NULL); + + /* check if the file exists */ + if(stat((char*) pData->failedMsgFile, &stat_buf) == -1) { + if(errno == ENOENT) { + DBGPRINTF("omkafka: loadFailedMsgs failed messages file %s wasn't found, " + "continue startup\n", pData->failedMsgFile); + ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND); + } else { + LogError(errno, RS_RET_IO_ERROR, + "omkafka: loadFailedMsgs could not open failed messages file %s", + pData->failedMsgFile); + ABORT_FINALIZE(RS_RET_IO_ERROR); + } + } else { + DBGPRINTF("omkafka: loadFailedMsgs found failed message file %s.\n", + pData->failedMsgFile); + } + + /* File exists, we can load and process it */ + CHKiRet(strm.Construct(&pstrmFMSG)); + CHKiRet(strm.SettOperationsMode(pstrmFMSG, STREAMMODE_READ)); + CHKiRet(strm.SetsType(pstrmFMSG, STREAMTYPE_FILE_SINGLE)); + CHKiRet(strm.SetFName(pstrmFMSG, pData->failedMsgFile, ustrlen(pData->failedMsgFile))); + CHKiRet(strm.ConstructFinalize(pstrmFMSG)); + + while(strm.ReadLine(pstrmFMSG, &pCStr, 0, 0, NULL, 0, NULL) == RS_RET_OK) { + if(rsCStrLen(pCStr) == 0) { + /* we do not process empty lines */ + DBGPRINTF("omkafka: loadFailedMsgs msg was empty!"); + } else { + puStr = rsCStrGetSzStrNoNULL(pCStr); //topic + pStrTabPos = index((char*)puStr, '\t'); //key + pStrTabPos2 = index((char*)pStrTabPos+1, '\t'); //msg + if ((pStrTabPos != NULL) && (pStrTabPos2 != NULL)) { + *pStrTabPos = '\0'; /* split string 
into two */ + *pStrTabPos2 = '\0'; /* split string into two */ + DBGPRINTF("omkafka: loadFailedMsgs successfully loaded msg '%s' for " + "topic '%s' key '%s' \n", + pStrTabPos2+1, (char*)puStr, pStrTabPos+1); + if (strlen(pStrTabPos+1)) { + CHKmalloc(fmsgEntry = failedmsg_entry_construct( + pStrTabPos+1,strlen(pStrTabPos+1), + pStrTabPos2+1,strlen(pStrTabPos2+1), + (char*)puStr)); + } else { + CHKmalloc(fmsgEntry = failedmsg_entry_construct( + NULL,0, + pStrTabPos2+1,strlen(pStrTabPos2+1), + (char*)puStr)); + } + SLIST_INSERT_HEAD(&pData->failedmsg_head, fmsgEntry, entries); + } else { + LogError(0, RS_RET_ERR, "omkafka: loadFailedMsgs droping invalid msg found: %s", + (char*)rsCStrGetSzStrNoNULL(pCStr)); + } + } + + rsCStrDestruct(&pCStr); /* discard string (must be done by us!) */ + } +finalize_it: + if(pstrmFMSG != NULL) { + strm.Destruct(&pstrmFMSG); + } + + if(iRet != RS_RET_OK) { + /* We ignore FILE NOT FOUND here */ + if (iRet != RS_RET_FILE_NOT_FOUND) { + LogError(0, iRet, "omkafka: could not load failed messages " + "from file %s error %d - failed messages will not be resend.", + (char*)pData->failedMsgFile, iRet); + } + } else { + DBGPRINTF("omkafka: loadFailedMsgs unlinking '%s'\n", (char*)pData->failedMsgFile); + /* Delete file if still exists! 
*/ + const int r = unlink((char*)pData->failedMsgFile); + if(r != 0 && r != ENOENT) { + LogError(errno, RS_RET_ERR, "omkafka: loadFailedMsgs failed to remove " + "file \"%s\"", (char*)pData->failedMsgFile); + } + } + + RETiRet; +} + +BEGINdoHUP +CODESTARTdoHUP + pthread_mutex_lock(&pData->mutErrFile); + if(pData->fdErrFile != -1) { + close(pData->fdErrFile); + pData->fdErrFile = -1; + } + pthread_mutex_unlock(&pData->mutErrFile); + pthread_mutex_lock(&pData->mutStatsFile); + if(pData->fdStatsFile != -1) { + close(pData->fdStatsFile); + pData->fdStatsFile = -1; + } + pthread_mutex_unlock(&pData->mutStatsFile); + if (pData->bReopenOnHup) { + CHKiRet(setupKafkaHandle(pData, 1)); + } else { + /* Optional */ + const int callbacksCalled = rd_kafka_poll(pData->rk, 0); /* call callbacks */ + LogMsg(0, NO_ERRCODE, LOG_INFO, "omkafka: doHUP kafka - '%s' outqueue length: %d," + "callbacks called %d\n", pData->tplName, + rd_kafka_outq_len(pData->rk), callbacksCalled); + } +finalize_it: +ENDdoHUP + +BEGINcreateInstance +CODESTARTcreateInstance + pData->currPartition = 0; + pData->bIsOpen = 0; + pData->bIsSuspended = 0; + pData->fdErrFile = -1; + pData->fdStatsFile = -1; + pData->pTopic = NULL; + pData->bReportErrs = 1; + pData->bReopenOnHup = 1; + pData->bResubmitOnFailure = 0; + pData->bKeepFailedMessages = 0; + pData->failedMsgFile = NULL; + SLIST_INIT(&pData->failedmsg_head); + CHKiRet(pthread_mutex_init(&pData->mut_doAction, NULL)); + CHKiRet(pthread_mutex_init(&pData->mutErrFile, NULL)); + CHKiRet(pthread_mutex_init(&pData->mutStatsFile, NULL)); + CHKiRet(pthread_rwlock_init(&pData->rkLock, NULL)); + CHKiRet(pthread_mutex_init(&pData->mutDynCache, NULL)); + INIT_ATOMIC_HELPER_MUT(pData->mutCurrPartition); +finalize_it: +ENDcreateInstance + + +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature +ENDisCompatibleWithFeature + + +BEGINfreeInstance +CODESTARTfreeInstance + /* Helpers for 
 Failed Msg List */
	failedmsg_entry* fmsgEntry1;
	failedmsg_entry* fmsgEntry2;
	if(pData->fdErrFile != -1)
		close(pData->fdErrFile);
	if(pData->fdStatsFile != -1)
		close(pData->fdStatsFile);
	/* Closing Kafka first! */
	pthread_rwlock_wrlock(&pData->rkLock);
	closeKafka(pData);
	if(pData->dynaTopic && pData->dynCache != NULL) {
		/* cache entries themselves are freed in do_rd_kafka_destroy via
		 * dynaTopicFreeCacheEntries */
		free(pData->dynCache);
		pData->dynCache = NULL;
	}
	/* Persist failed messages */
	if (pData->bResubmitOnFailure && pData->bKeepFailedMessages && pData->failedMsgFile != NULL) {
		persistFailedMsgs(pData);
	}
	pthread_rwlock_unlock(&pData->rkLock);

	if (pData->stats) {
		statsobj.Destruct(&pData->stats);
	}

	/* Delete Linked List for failed msgs (entries not persisted above) */
	fmsgEntry1 = SLIST_FIRST(&pData->failedmsg_head);
	while (fmsgEntry1 != NULL) {
		fmsgEntry2 = SLIST_NEXT(fmsgEntry1, entries);
		failedmsg_entry_destruct(fmsgEntry1);
		fmsgEntry1 = fmsgEntry2;
	}
	SLIST_INIT(&pData->failedmsg_head);
	/* Free other mem */
	free(pData->errorFile);
	free(pData->statsFile);
	free(pData->failedMsgFile);
	free(pData->topic);
	free(pData->brokers);
	free(pData->tplName);
	free(pData->statsName);
	for(int i = 0 ; i < pData->nConfParams ; ++i) {
		free((void*) pData->confParams[i].name);
		free((void*) pData->confParams[i].val);
	}
	free(pData->confParams);
	for(int i = 0 ; i < pData->nTopicConfParams ; ++i) {
		free((void*) pData->topicConfParams[i].name);
		free((void*) pData->topicConfParams[i].val);
	}
	free(pData->topicConfParams);
	DESTROY_ATOMIC_HELPER_MUT(pData->mutCurrPartition);
	pthread_rwlock_destroy(&pData->rkLock);
	pthread_mutex_destroy(&pData->mut_doAction);
	pthread_mutex_destroy(&pData->mutErrFile);
	pthread_mutex_destroy(&pData->mutStatsFile);
	pthread_mutex_destroy(&pData->mutDynCache);
ENDfreeInstance

BEGINfreeWrkrInstance
CODESTARTfreeWrkrInstance
ENDfreeWrkrInstance


BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
ENDdbgPrintInstInfo


BEGINtryResume
	int iKafkaRet;
	const struct rd_kafka_metadata *metadata;
CODESTARTtryResume
	pthread_mutex_lock(&pWrkrData->pData->mut_doAction); /* see doAction header comment! */
	CHKiRet(setupKafkaHandle(pWrkrData->pData, 0));

	/* a successful metadata request proves at least one broker is reachable */
	if ((iKafkaRet = rd_kafka_metadata(pWrkrData->pData->rk, 0, NULL, &metadata, 1000))
			!= RD_KAFKA_RESP_ERR_NO_ERROR) {
		DBGPRINTF("omkafka: tryResume failed, brokers down %d,%s\n", iKafkaRet, rd_kafka_err2str(iKafkaRet));
		ABORT_FINALIZE(RS_RET_SUSPENDED);
	} else {
		DBGPRINTF("omkafka: tryResume success, %d brokers UP\n", metadata->broker_cnt);
		/* Reset suspended state */
		pWrkrData->pData->bIsSuspended = 0;
		/* free mem*/
		rd_kafka_metadata_destroy(metadata);
	}

finalize_it:
	pthread_mutex_unlock(&pWrkrData->pData->mut_doAction); /* see doAction header comment! */
	DBGPRINTF("omkafka: tryResume returned %d\n", iRet);
ENDtryResume


/* IMPORTANT NOTE on multithreading:
 * librdkafka creates background threads itself. So omkafka basically needs to move
 * memory buffers over to librdkafka, which then does the heavy hauling. As such, we
 * think that it is best to run max one wrkr instance of omkafka -- otherwise we just
 * get additional locking (contention) overhead without any real gain. As such,
 * we use a global mutex for doAction which ensures only one worker can be active
 * at any given time. That mutex is also used to guard utility functions (like
 * tryResume) which may also be accessed by multiple workers in parallel.
 * Note: shall this method be changed, the kafka connection/suspension handling needs
 * to be refactored. The current code assumes that all workers share state information
 * including librdkafka handles.
+ */ +BEGINdoAction +CODESTARTdoAction + failedmsg_entry* fmsgEntry; + instanceData *const pData = pWrkrData->pData; + int need_unlock = 0; + int dynaTopicID = 0; + int dynaKeyID = 0; + + if (pData->dynaKey) { + dynaKeyID=2; + if (pData->dynaTopic) { + dynaTopicID=3; + } + } else { + if (pData->dynaTopic) { + dynaTopicID=2; + } + } + pthread_mutex_lock(&pData->mut_doAction); + if (! pData->bIsOpen) + CHKiRet(setupKafkaHandle(pData, 0)); + + /* Lock here to prevent msg loss */ + pthread_rwlock_rdlock(&pData->rkLock); + need_unlock = 1; + + /* We need to trigger callbacks first in order to suspend the Action properly on failure */ + const int callbacksCalled = rd_kafka_poll(pData->rk, 0); /* call callbacks */ + DBGPRINTF("omkafka: doAction kafka outqueue length: %d, callbacks called %d\n", + rd_kafka_outq_len(pData->rk), callbacksCalled); + + /* Reprocess failed messages! */ + if (pData->bResubmitOnFailure) { + iRet = checkFailedMessages(pData); + if(iRet != RS_RET_OK) { + DBGPRINTF("omkafka: doAction failed to submit FAILED messages with status %d\n", iRet); + + if (pData->bResubmitOnFailure) { + if (pData->dynaKey || pData->key) { + DBGPRINTF("omkafka: also adding MSG '%.*s' for topic '%s' key '%s' " + "to failed for RETRY!\n", + (int)(strlen((char*)ppString[0])-1), ppString[0], + pData->dynaTopic ? ppString[dynaTopicID] : pData->topic, + pData->dynaKey ? ppString[dynaKeyID] : pData->key); + } else { + DBGPRINTF("omkafka: also adding MSG '%.*s' for topic '%s' " + "to failed for RETRY!\n", + (int)(strlen((char*)ppString[0])-1), ppString[0], + pData->dynaTopic ? ppString[dynaTopicID] : pData->topic); + } + CHKmalloc(fmsgEntry = failedmsg_entry_construct( + (char*) (pData->dynaKey ? ppString[dynaKeyID] : pData->key), + pData->dynaKey || pData->key ? + strlen((char*)(pData->dynaKey ? ppString[dynaKeyID] : pData->key)) : 0, + (char*)ppString[0], strlen((char*)ppString[0]), + (char*) (pData->dynaTopic ? 
ppString[dynaTopicID] : pData->topic))); + SLIST_INSERT_HEAD(&pData->failedmsg_head, fmsgEntry, entries); + } + ABORT_FINALIZE(iRet); + } + } + + /* support dynamic topic */ + iRet = writeKafka(pData, pData->dynaKey ? ppString[dynaKeyID] : pData->key, ppString[0], ppString[1], + pData->dynaTopic ? ppString[dynaTopicID] : pData->topic, RESUBMIT); + +finalize_it: + if(need_unlock) { + pthread_rwlock_unlock(&pData->rkLock); + } + + if(iRet != RS_RET_OK) { + DBGPRINTF("omkafka: doAction failed with status %d\n", iRet); + } + + /* Suspend Action if broker problems were reported in error callback */ + if (pData->bIsSuspended) { + DBGPRINTF("omkafka: doAction broker failure detected, suspending action\n"); + iRet = RS_RET_SUSPENDED; + } + pthread_mutex_unlock(&pData->mut_doAction); /* must be after last pData access! */ +ENDdoAction + + +static void +setInstParamDefaults(instanceData *pData) +{ + pData->topic = NULL; + pData->pTopic = NULL; + pData->dynaKey = 0; + pData->dynaTopic = 0; + pData->iDynaTopicCacheSize = 50; + pData->brokers = NULL; + pData->autoPartition = 0; + pData->fixedPartition = NO_FIXED_PARTITION; + pData->nPartitions = 1; + pData->nConfParams = 0; + pData->confParams = NULL; + pData->nTopicConfParams = 0; + pData->topicConfParams = NULL; + pData->errorFile = NULL; + pData->statsFile = NULL; + pData->failedMsgFile = NULL; + pData->key = NULL; + pData->closeTimeout = 2000; +} + +static rsRetVal +processKafkaParam(char *const param, + const char **const name, + const char **const paramval) +{ + DEFiRet; + char *val = strstr(param, "="); + if(val == NULL) { + LogError(0, RS_RET_PARAM_ERROR, "missing equal sign in " + "parameter '%s'", param); + ABORT_FINALIZE(RS_RET_PARAM_ERROR); + } + *val = '\0'; /* terminates name */ + ++val; /* now points to begin of value */ + CHKmalloc(*name = strdup(param)); + CHKmalloc(*paramval = strdup(val)); +finalize_it: + RETiRet; +} + +BEGINnewActInst + struct cnfparamvals *pvals; + int i; + int iNumTpls; 
+CODESTARTnewActInst + if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) { + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + CHKiRet(createInstance(&pData)); + setInstParamDefaults(pData); + + for(i = 0 ; i < actpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(actpblk.descr[i].name, "topic")) { + pData->topic = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "dynakey")) { + pData->dynaKey = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "dynatopic")) { + pData->dynaTopic = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "dynatopic.cachesize")) { + pData->iDynaTopicCacheSize = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "closetimeout")) { + pData->closeTimeout = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "partitions.auto")) { + pData->autoPartition = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "partitions.number")) { + pData->nPartitions = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "partitions.usefixed")) { + pData->fixedPartition = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "broker")) { + es_str_t *es = es_newStr(128); + int bNeedComma = 0; + for(int j = 0 ; j < pvals[i].val.d.ar->nmemb ; ++j) { + if(bNeedComma) + es_addChar(&es, ','); + es_addStr(&es, pvals[i].val.d.ar->arr[j]); + bNeedComma = 1; + } + pData->brokers = es_str2cstr(es, NULL); + es_deleteStr(es); + } else if(!strcmp(actpblk.descr[i].name, "confparam")) { + pData->nConfParams = pvals[i].val.d.ar->nmemb; + CHKmalloc(pData->confParams = malloc(sizeof(struct kafka_params) * + pvals[i].val.d.ar->nmemb )); + for(int j = 0 ; j < pvals[i].val.d.ar->nmemb ; ++j) { + char *cstr = es_str2cstr(pvals[i].val.d.ar->arr[j], NULL); + CHKiRet(processKafkaParam(cstr, &pData->confParams[j].name, + &pData->confParams[j].val)); + free(cstr); + } + } else if(!strcmp(actpblk.descr[i].name, "topicconfparam")) { + pData->nTopicConfParams = 
pvals[i].val.d.ar->nmemb; + CHKmalloc(pData->topicConfParams = malloc(sizeof(struct kafka_params) * + pvals[i].val.d.ar->nmemb )); + for(int j = 0 ; j < pvals[i].val.d.ar->nmemb ; ++j) { + char *cstr = es_str2cstr(pvals[i].val.d.ar->arr[j], NULL); + CHKiRet(processKafkaParam(cstr, &pData->topicConfParams[j].name, + &pData->topicConfParams[j].val)); + free(cstr); + } + } else if(!strcmp(actpblk.descr[i].name, "errorfile")) { + pData->errorFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "statsfile")) { + pData->statsFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "key")) { + pData->key = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "template")) { + pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "reopenonhup")) { + pData->bReopenOnHup = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "resubmitonfailure")) { + pData->bResubmitOnFailure = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "keepfailedmessages")) { + pData->bKeepFailedMessages = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "failedmsgfile")) { + pData->failedMsgFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "statsname")) { + pData->statsName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else { + LogError(0, RS_RET_INTERNAL_ERROR, + "omkafka: program error, non-handled param '%s'\n", actpblk.descr[i].name); + } + } + + if(pData->brokers == NULL) { + CHKmalloc(pData->brokers = strdup("localhost:9092")); + LogMsg(0, NO_ERRCODE, LOG_INFO, "imkafka: \"broker\" parameter not specified " + "using default of localhost:9092 -- this may not be what you want!"); + } + + if(pData->dynaKey && pData->key == NULL) { + LogError(0, RS_RET_CONFIG_ERROR, + "omkafka: requested dynamic key, but no " + "name for key template 
given - action definition invalid"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } + + if(pData->dynaTopic && pData->topic == NULL) { + LogError(0, RS_RET_CONFIG_ERROR, + "omkafka: requested dynamic topic, but no " + "name for topic template given - action definition invalid"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } + + iNumTpls = 2; + if(pData->dynaKey) ++iNumTpls; + if(pData->dynaTopic) ++iNumTpls; + CODE_STD_STRING_REQUESTnewActInst(iNumTpls); + CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ? + "RSYSLOG_FileFormat" : (char*)pData->tplName), + OMSR_NO_RQD_TPL_OPTS)); + + CHKiRet(OMSRsetEntry(*ppOMSR, 1, (uchar*)strdup(" KAFKA_TimeStamp"), + OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynaKey) + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->key), OMSR_NO_RQD_TPL_OPTS)); + + if(pData->dynaTopic) { + CHKiRet(OMSRsetEntry(*ppOMSR, pData->dynaKey?3:2, ustrdup(pData->topic), OMSR_NO_RQD_TPL_OPTS)); + CHKmalloc(pData->dynCache = (dynaTopicCacheEntry**) + calloc(pData->iDynaTopicCacheSize, sizeof(dynaTopicCacheEntry*))); + pData->iCurrElt = -1; + } + + pthread_mutex_lock(&closeTimeoutMut); + if (closeTimeout < pData->closeTimeout) { + closeTimeout = pData->closeTimeout; + } + pthread_mutex_unlock(&closeTimeoutMut); + + /* Load failed messages here (If enabled), do NOT check for IRET!*/ + if (pData->bKeepFailedMessages && pData->failedMsgFile != NULL) { + loadFailedMsgs(pData); + } + + if (pData->statsName) { + CHKiRet(statsobj.Construct(&pData->stats)); + CHKiRet(statsobj.SetName(pData->stats, (uchar *)pData->statsName)); + CHKiRet(statsobj.SetOrigin(pData->stats, (uchar *)"omkafka")); + + /* Track following stats */ + STATSCOUNTER_INIT(pData->ctrTopicSubmit, pData->mutCtrTopicSubmit); + CHKiRet(statsobj.AddCounter(pData->stats, (uchar *)"submitted", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pData->ctrTopicSubmit)); + STATSCOUNTER_INIT(pData->ctrKafkaFail, pData->mutCtrKafkaFail); + CHKiRet(statsobj.AddCounter(pData->stats, (uchar *)"failures", + 
			ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pData->ctrKafkaFail));
		STATSCOUNTER_INIT(pData->ctrKafkaAck, pData->mutCtrKafkaAck);
		CHKiRet(statsobj.AddCounter(pData->stats, (uchar *)"acked",
			ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pData->ctrKafkaAck));
		CHKiRet(statsobj.ConstructFinalize(pData->stats));
	}

CODE_STD_FINALIZERnewActInst
	cnfparamvalsDestruct(pvals, &actpblk);
ENDnewActInst


/* module unload: release the global stats object and core interfaces,
 * then give librdkafka up to the configured close timeout to shut
 * its background threads down. */
BEGINmodExit
CODESTARTmodExit
	statsobj.Destruct(&kafkaStats);
	/* NOTE(review): CHKiRet presumably jumps to finalize_it on failure,
	 * which would skip the mutex destroy and librdkafka wait below --
	 * confirm this is intended. */
	CHKiRet(objRelease(statsobj, CORE_COMPONENT));
	DESTROY_ATOMIC_HELPER_MUT(mutClock);

	/* snapshot the shared timeout under lock before destroying the mutex */
	pthread_mutex_lock(&closeTimeoutMut);
	int timeout = closeTimeout;
	pthread_mutex_unlock(&closeTimeoutMut);
	pthread_mutex_destroy(&closeTimeoutMut);
	if (rd_kafka_wait_destroyed(timeout) != 0) {
		LogMsg(0, RS_RET_OK, LOG_WARNING,
			"omkafka: could not terminate librdkafka gracefully, "
			"%d threads still remain.\n", rd_kafka_thread_cnt());
	}
finalize_it:
ENDmodExit


NO_LEGACY_CONF_parseSelectorAct
/* entry-point table: standard output-module queries plus v2 config and HUP */
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
CODEqueryEtryPt_doHUP
ENDqueryEtryPt


/* module init: obtain core interfaces, set up the module-global
 * "omkafka" stats object and register the KAFKA_TimeStamp template. */
BEGINmodInit()
CODESTARTmodInit
	uchar *pTmp;
INITLegCnfVars
	*ipIFVersProvided = CURR_MOD_IF_VERSION;
CODEmodInit_QueryRegCFSLineHdlr
	/* the sqrt() call only forces libm to be linked for librdkafka */
	dbgprintf("just because librdkafka needs it, sqrt of 4 is %f\n", sqrt(4.0));
	CHKiRet(objUse(datetime, CORE_COMPONENT));
	CHKiRet(objUse(strm, CORE_COMPONENT));
	CHKiRet(objUse(statsobj, CORE_COMPONENT));

	INIT_ATOMIC_HELPER_MUT(mutClock);

	DBGPRINTF("omkafka %s using librdkafka version %s, 0x%x\n",
		VERSION, rd_kafka_version_str(), rd_kafka_version());
	CHKiRet(statsobj.Construct(&kafkaStats));
	CHKiRet(statsobj.SetName(kafkaStats, (uchar *)"omkafka"));
	CHKiRet(statsobj.SetOrigin(kafkaStats, (uchar*)"omkafka"));
	STATSCOUNTER_INIT(ctrTopicSubmit, mutCtrTopicSubmit);
	CHKiRet(statsobj.AddCounter(kafkaStats, (uchar
		*)"submitted",
		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrTopicSubmit));
	/* register all module-global counters; RESETTABLE ones can be
	 * zeroed via impstats, the plain ctrType_Int gauges below cannot */
	STATSCOUNTER_INIT(ctrQueueSize, mutCtrQueueSize);
	CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"maxoutqsize",
		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrQueueSize));
	STATSCOUNTER_INIT(ctrKafkaFail, mutCtrKafkaFail);
	CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"failures",
		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrKafkaFail));
	/* dynamic-topic cache effectiveness counters */
	STATSCOUNTER_INIT(ctrCacheSkip, mutCtrCacheSkip);
	CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"topicdynacache.skipped",
		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrCacheSkip));
	STATSCOUNTER_INIT(ctrCacheMiss, mutCtrCacheMiss);
	CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"topicdynacache.miss",
		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrCacheMiss));
	STATSCOUNTER_INIT(ctrCacheEvict, mutCtrCacheEvict);
	CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"topicdynacache.evicted",
		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrCacheEvict));
	STATSCOUNTER_INIT(ctrKafkaAck, mutCtrKafkaAck);
	CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"acked",
		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrKafkaAck));
	/* per-cause delivery failure counters */
	STATSCOUNTER_INIT(ctrKafkaMsgTooLarge, mutCtrKafkaMsgTooLarge);
	CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"failures_msg_too_large",
		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrKafkaMsgTooLarge));
	STATSCOUNTER_INIT(ctrKafkaUnknownTopic, mutCtrKafkaUnknownTopic);
	CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"failures_unknown_topic",
		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrKafkaUnknownTopic));
	STATSCOUNTER_INIT(ctrKafkaQueueFull, mutCtrKafkaQueueFull);
	CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"failures_queue_full",
		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrKafkaQueueFull));
	STATSCOUNTER_INIT(ctrKafkaUnknownPartition, mutCtrKafkaUnknownPartition);
	CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"failures_unknown_partition",
		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrKafkaUnknownPartition));
	STATSCOUNTER_INIT(ctrKafkaOtherErrors, mutCtrKafkaOtherErrors);
	CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"failures_other",
		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrKafkaOtherErrors));
	/* broker/transport level error counters */
	STATSCOUNTER_INIT(ctrKafkaRespTimedOut, mutCtrKafkaRespTimedOut);
	CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"errors_timed_out",
		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrKafkaRespTimedOut));
	STATSCOUNTER_INIT(ctrKafkaRespTransport, mutCtrKafkaRespTransport);
	CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"errors_transport",
		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrKafkaRespTransport));
	STATSCOUNTER_INIT(ctrKafkaRespBrokerDown, mutCtrKafkaRespBrokerDown);
	CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"errors_broker_down",
		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrKafkaRespBrokerDown));
	STATSCOUNTER_INIT(ctrKafkaRespAuth, mutCtrKafkaRespAuth);
	CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"errors_auth",
		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrKafkaRespAuth));
	STATSCOUNTER_INIT(ctrKafkaRespSSL, mutCtrKafkaRespSSL);
	CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"errors_ssl",
		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrKafkaRespSSL));
	STATSCOUNTER_INIT(ctrKafkaRespOther, mutCtrKafkaRespOther);
	CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"errors_other",
		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrKafkaRespOther));
	/* latency gauges -- presumably fed from librdkafka's statistics
	 * callback elsewhere in this file; not resettable by design */
	CHKiRet(statsobj.AddCounter(kafkaStats, UCHAR_CONSTANT("rtt_avg_usec"),
		ctrType_Int, CTR_FLAG_NONE, &rtt_avg_usec));
	CHKiRet(statsobj.AddCounter(kafkaStats, UCHAR_CONSTANT("throttle_avg_msec"),
		ctrType_Int, CTR_FLAG_NONE, &throttle_avg_msec));
	CHKiRet(statsobj.AddCounter(kafkaStats, UCHAR_CONSTANT("int_latency_avg_usec"),
		ctrType_Int, CTR_FLAG_NONE, &int_latency_avg_usec));
	CHKiRet(statsobj.ConstructFinalize(kafkaStats));

	/* register the module-provided timestamp template exactly once */
	DBGPRINTF("omkafka: Add KAFKA_TimeStamp to template system ONCE\n");
	pTmp = (uchar*) KAFKA_TimeStamp;
	tplAddLine(ourConf, " KAFKA_TimeStamp", &pTmp);
ENDmodInit