/* omhiredis.c * Copyright 2012 Talksum, Inc * Copyright 2015 DigitalOcean, Inc * * This program is free software: you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public License * as published by the Free Software Foundation, either version 3 of * the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this program. If not, see * . * * Author: Brian Knox * */ #include "config.h" #include #include #include #include #include #include #include #include #include #include #include "rsyslog.h" #include "conf.h" #include "syslogd-types.h" #include "srUtils.h" #include "template.h" #include "module-template.h" #include "errmsg.h" #include "cfsysline.h" #include "unicode-helper.h" MODULE_TYPE_OUTPUT MODULE_TYPE_NOKEEP MODULE_CNFNAME("omhiredis") /* internal structures */ DEF_OMOD_STATIC_DATA #define OMHIREDIS_MODE_TEMPLATE 0 #define OMHIREDIS_MODE_QUEUE 1 #define OMHIREDIS_MODE_PUBLISH 2 #define OMHIREDIS_MODE_SET 3 #define OMHIREDIS_MODE_STREAM 4 /* our instance data. * this will be accessable * via pData */ typedef struct _instanceData { uchar *server; /* redis server address */ int port; /* redis port */ uchar *serverpassword; /* redis password */ uchar *tplName; /* template name */ char *modeDescription; /* mode description */ int mode; /* mode constant */ uchar *key; /* key for QUEUE, PUBLISH and STREAM modes */ uchar *streamKeyAck; /* key name for STREAM ACKs (when enabled) */ uchar *streamGroupAck; /* group name for STREAM ACKs (when enabled) */ uchar *streamIndexAck; /* index name for STREAM ACKs (when enabled) */ int expiration; /* expiration value for SET/SETEX mode */ sbool dynaKey; /* Should we treat the key as a template? */ sbool streamDynaKeyAck; /* Should we treat the groupAck as a template? */ sbool streamDynaGroupAck; /* Should we treat the groupAck as a template? */ sbool streamDynaIndexAck; /* Should we treat the IndexAck as a template? */ sbool useRPush; /* Should we use RPUSH instead of LPUSH? */ uchar *streamOutField; /* Field to place message into (for stream insertions only) */ uint streamCapacityLimit; /* zero means stream is not capped (default) setting a non-zero value ultimately activates the approximate MAXLEN option '~' (see Redis XADD docs)*/ sbool streamAck; /* Should the module send an XACK for each inserted message? This feature requires that 3 infos are present in the '$.' object of the log: - $.redis!stream - $.redis!group - $.redis!index Those 3 infos can either be provided through usage of imhiredis or set manually with Rainerscript */ sbool streamDel; /* Should the module send an XDEL for each inserted message? This feature requires that 2 infos are present in the '$.' object of the log: - $.redis!stream - $.redis!index Those 2 infos can either be provided through usage of imhiredis or set manually with Rainerscript */ } instanceData; typedef struct wrkrInstanceData { instanceData *pData; /* instanc data */ redisContext *conn; /* redis connection */ int count; /* count of command sent for current batch */ } wrkrInstanceData_t; static struct cnfparamdescr actpdescr[] = { { "server", eCmdHdlrGetWord, 0 }, { "serverport", eCmdHdlrInt, 0 }, { "serverpassword", eCmdHdlrGetWord, 0 }, { "template", eCmdHdlrGetWord, 0 }, { "mode", eCmdHdlrGetWord, 0 }, { "key", eCmdHdlrGetWord, 0 }, { "expiration", eCmdHdlrInt, 0 }, { "dynakey", eCmdHdlrBinary, 0 }, { "userpush", eCmdHdlrBinary, 0 }, { "stream.outField", eCmdHdlrGetWord, 0 }, { "stream.capacityLimit", eCmdHdlrNonNegInt, 0 }, { "stream.ack", eCmdHdlrBinary, 0 }, { "stream.del", eCmdHdlrBinary, 0 }, { "stream.keyAck", eCmdHdlrGetWord, 0 }, { "stream.groupAck", eCmdHdlrGetWord, 0 }, { "stream.indexAck", eCmdHdlrGetWord, 0 }, { "stream.dynaKeyAck", eCmdHdlrBinary, 0 }, { "stream.dynaGroupAck", eCmdHdlrBinary, 0 }, { "stream.dynaIndexAck", eCmdHdlrBinary, 0 }, }; static struct cnfparamblk actpblk = { CNFPARAMBLK_VERSION, sizeof(actpdescr)/sizeof(struct cnfparamdescr), actpdescr }; BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance BEGINcreateWrkrInstance CODESTARTcreateWrkrInstance pWrkrData->conn = NULL; /* Connect later */ ENDcreateWrkrInstance BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURERepeatedMsgReduction) iRet = RS_RET_OK; ENDisCompatibleWithFeature /* called when closing */ static void closeHiredis(wrkrInstanceData_t *pWrkrData) { if(pWrkrData->conn != NULL) { redisFree(pWrkrData->conn); pWrkrData->conn = NULL; } } /* Free our instance data. */ BEGINfreeInstance CODESTARTfreeInstance if (pData->server != NULL) { free(pData->server); } free(pData->key); free(pData->modeDescription); free(pData->serverpassword); free(pData->tplName); free(pData->streamKeyAck); free(pData->streamGroupAck); free(pData->streamIndexAck); free(pData->streamOutField); ENDfreeInstance BEGINfreeWrkrInstance CODESTARTfreeWrkrInstance closeHiredis(pWrkrData); ENDfreeWrkrInstance BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo /* nothing special here */ ENDdbgPrintInstInfo /* establish our connection to redis */ static rsRetVal initHiredis(wrkrInstanceData_t *pWrkrData, int bSilent) { char *server; redisReply *reply = NULL; DEFiRet; server = (pWrkrData->pData->server == NULL) ? (char *)"127.0.0.1" : (char*) pWrkrData->pData->server; DBGPRINTF("omhiredis: trying connect to '%s' at port %d\n", server, pWrkrData->pData->port); struct timeval timeout = { 1, 500000 }; /* 1.5 seconds */ pWrkrData->conn = redisConnectWithTimeout(server, pWrkrData->pData->port, timeout); if (pWrkrData->conn->err) { if(!bSilent) LogError(0, RS_RET_SUSPENDED, "can not initialize redis handle"); ABORT_FINALIZE(RS_RET_SUSPENDED); } if (pWrkrData->pData->serverpassword != NULL) { reply = redisCommand(pWrkrData->conn, "AUTH %s", (char*) pWrkrData->pData->serverpassword); if (reply == NULL) { DBGPRINTF("omhiredis: could not get reply from AUTH command\n"); ABORT_FINALIZE(RS_RET_SUSPENDED); } else if (reply->type == REDIS_REPLY_ERROR) { LogError(0, NO_ERRCODE, "omhiredis: error while authenticating: %s", reply->str); ABORT_FINALIZE(RS_RET_ERR); } } finalize_it: if (iRet != RS_RET_OK && pWrkrData-> conn != NULL) { redisFree(pWrkrData->conn); pWrkrData->conn = NULL; } if (reply != NULL) freeReplyObject(reply); RETiRet; } static rsRetVal isMaster(wrkrInstanceData_t *pWrkrData) { DEFiRet; redisReply *reply = NULL; assert(pWrkrData->conn != NULL); reply = redisCommand(pWrkrData->conn, "ROLE"); if (reply == NULL) { DBGPRINTF("omhiredis: could not get reply from ROLE command\n"); ABORT_FINALIZE(RS_RET_SUSPENDED); } else if (reply->type == REDIS_REPLY_ERROR) { LogMsg(0, RS_RET_REDIS_ERROR, LOG_WARNING, "omhiredis: got an error while querying role -> " "%s\n", reply->str); ABORT_FINALIZE(RS_RET_SUSPENDED); } else if (reply->type != REDIS_REPLY_ARRAY || reply->element[0]->type != REDIS_REPLY_STRING) { LogMsg(0, RS_RET_REDIS_ERROR, LOG_ERR, "omhiredis: did not get a proper reply from ROLE command"); ABORT_FINALIZE(RS_RET_SUSPENDED); } else { if (strncmp(reply->element[0]->str, "master", 6)) { LogMsg(0, RS_RET_OK, LOG_WARNING, "omhiredis: current connected node is not a master"); ABORT_FINALIZE(RS_RET_SUSPENDED); } } finalize_it: free(reply); RETiRet; } static rsRetVal writeHiredis(uchar* key, uchar *message, wrkrInstanceData_t *pWrkrData) { DEFiRet; int rc, expire; size_t msgLen; char *formattedMsg = NULL; /* if we do not have a redis connection, call * initHiredis and try to establish one */ if(pWrkrData->conn == NULL) CHKiRet(initHiredis(pWrkrData, 0)); /* try to append the command to the pipeline. * REDIS_ERR reply indicates something bad * happened, in which case abort. otherwise * increase our current pipeline count * by 1 and continue. */ switch(pWrkrData->pData->mode) { case OMHIREDIS_MODE_TEMPLATE: rc = redisAppendCommand(pWrkrData->conn, (char*)message); break; case OMHIREDIS_MODE_QUEUE: rc = redisAppendCommand(pWrkrData->conn, pWrkrData->pData->useRPush ? "RPUSH %s %s" : "LPUSH %s %s", key, (char*)message); break; case OMHIREDIS_MODE_PUBLISH: rc = redisAppendCommand(pWrkrData->conn, "PUBLISH %s %s", key, (char*)message); break; case OMHIREDIS_MODE_SET: expire = pWrkrData->pData->expiration; if (expire > 0) msgLen = redisFormatCommand(&formattedMsg, "SETEX %s %d %s", key, expire, message); else msgLen = redisFormatCommand(&formattedMsg, "SET %s %s", key, message); if (msgLen) rc = redisAppendFormattedCommand(pWrkrData->conn, formattedMsg, msgLen); else { dbgprintf("omhiredis: could not append SET command\n"); rc = REDIS_ERR; } break; case OMHIREDIS_MODE_STREAM: if (pWrkrData->pData->streamCapacityLimit != 0) { rc = redisAppendCommand(pWrkrData->conn, "XADD %s MAXLEN ~ %d * %s %s", key, pWrkrData->pData->streamCapacityLimit, pWrkrData->pData->streamOutField, message); } else { rc = redisAppendCommand(pWrkrData->conn, "XADD %s * %s %s", key, pWrkrData->pData->streamOutField, message); } break; default: dbgprintf("omhiredis: mode %d is invalid something is really wrong\n", pWrkrData->pData->mode); ABORT_FINALIZE(RS_RET_ERR); } if (rc == REDIS_ERR) { LogError(0, NO_ERRCODE, "omhiredis: %s", pWrkrData->conn->errstr); dbgprintf("omhiredis: %s\n", pWrkrData->conn->errstr); ABORT_FINALIZE(RS_RET_ERR); } else { pWrkrData->count++; } finalize_it: free(formattedMsg); RETiRet; } static rsRetVal ackHiredisStreamIndex(wrkrInstanceData_t *pWrkrData, uchar *key, uchar *group, uchar *index) { DEFiRet; if (REDIS_ERR == redisAppendCommand(pWrkrData->conn, "XACK %s %s %s", key, group, index)) { LogError(0, NO_ERRCODE, "omhiredis: %s", pWrkrData->conn->errstr); DBGPRINTF("omhiredis: %s\n", pWrkrData->conn->errstr); ABORT_FINALIZE(RS_RET_ERR); } else { pWrkrData->count++; } finalize_it: RETiRet; } static rsRetVal delHiredisStreamIndex(wrkrInstanceData_t *pWrkrData, uchar *key, uchar *index) { DEFiRet; if (REDIS_ERR == redisAppendCommand(pWrkrData->conn, "XDEL %s %s", key, index)) { LogError(0, NO_ERRCODE, "omhiredis: %s", pWrkrData->conn->errstr); DBGPRINTF("omhiredis: %s\n", pWrkrData->conn->errstr); ABORT_FINALIZE(RS_RET_ERR); } else { pWrkrData->count++; } finalize_it: RETiRet; } /* called when resuming from suspended state. * try to restablish our connection to redis */ BEGINtryResume CODESTARTtryResume closeHiredis(pWrkrData); CHKiRet(initHiredis(pWrkrData, 0)); // Must get a master node for all modes, except 'publish' if(pWrkrData->pData->mode != OMHIREDIS_MODE_PUBLISH) { CHKiRet(isMaster(pWrkrData)); } finalize_it: ENDtryResume /* begin a transaction. * if I decide to use MULTI ... EXEC in the * future, this block should send the * MULTI command to redis. */ BEGINbeginTransaction CODESTARTbeginTransaction dbgprintf("omhiredis: beginTransaction called\n"); pWrkrData->count = 0; ENDbeginTransaction /* call writeHiredis for this log line, * which appends it as a command to the * current pipeline */ BEGINdoAction uchar *message, *key, *keyNameAck, *groupNameAck, *IndexNameAck; int inputIndex = 0; CODESTARTdoAction // Don't change the order of conditions/assignations here without changing the end of the newActInst function! message = ppString[inputIndex++]; key = pWrkrData->pData->dynaKey ? ppString[inputIndex++] : pWrkrData->pData->key; keyNameAck = pWrkrData->pData->streamDynaKeyAck ? ppString[inputIndex++] : pWrkrData->pData->streamKeyAck; groupNameAck = pWrkrData->pData->streamDynaGroupAck ? ppString[inputIndex++] : pWrkrData->pData->streamGroupAck; IndexNameAck = pWrkrData->pData->streamDynaIndexAck ? ppString[inputIndex++] : pWrkrData->pData->streamIndexAck; CHKiRet(writeHiredis(key, message, pWrkrData)); if(pWrkrData->pData->streamAck) { CHKiRet(ackHiredisStreamIndex(pWrkrData, keyNameAck, groupNameAck, IndexNameAck)); } if(pWrkrData->pData->streamDel) { CHKiRet(delHiredisStreamIndex(pWrkrData, keyNameAck, IndexNameAck)); } iRet = RS_RET_DEFER_COMMIT; finalize_it: ENDdoAction /* called when we have reached the end of a * batch (queue.dequeuebatchsize). this * iterates over the replies, putting them * into the pData->replies buffer. we currently * don't really bother to check for errors * which should be fixed */ BEGINendTransaction CODESTARTendTransaction dbgprintf("omhiredis: endTransaction called\n"); redisReply *reply; int i; for ( i = 0; i < pWrkrData->count; i++ ) { if( REDIS_OK != redisGetReply( pWrkrData->conn, (void*)&reply) || pWrkrData->conn->err ) { dbgprintf("omhiredis: %s\n", pWrkrData->conn->errstr); LogError(0, RS_RET_REDIS_ERROR, "Error while processing replies: %s", pWrkrData->conn->errstr); closeHiredis(pWrkrData); ABORT_FINALIZE(RS_RET_SUSPENDED); } else { if (reply->type == REDIS_REPLY_ERROR) { LogError(0, RS_RET_REDIS_ERROR, "Received error from redis -> %s", reply->str); closeHiredis(pWrkrData); freeReplyObject(reply); ABORT_FINALIZE(RS_RET_SUSPENDED); } freeReplyObject(reply); } } finalize_it: ENDendTransaction /* set defaults. note server is set to NULL * and is set to a default in initHiredis if * it is still null when it's called - I should * probable just set the default here instead */ static void setInstParamDefaults(instanceData *pData) { pData->server = NULL; pData->port = 6379; pData->serverpassword = NULL; pData->tplName = NULL; pData->mode = OMHIREDIS_MODE_TEMPLATE; pData->expiration = 0; pData->modeDescription = NULL; pData->key = NULL; pData->dynaKey = 0; pData->useRPush = 0; pData->streamOutField = NULL; pData->streamKeyAck = NULL; pData->streamDynaKeyAck = 0; pData->streamGroupAck = NULL; pData->streamDynaGroupAck = 0; pData->streamIndexAck = NULL; pData->streamDynaIndexAck = 0; pData->streamCapacityLimit = 0; pData->streamAck = 0; pData->streamDel = 0; } /* here is where the work to set up a new instance * is done. this reads the config options from * the rsyslog conf and takes appropriate setup * actions. */ BEGINnewActInst struct cnfparamvals *pvals; int i; int iNumTpls; uchar *strDup = NULL; CODESTARTnewActInst if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); CHKiRet(createInstance(&pData)); setInstParamDefaults(pData); for(i = 0 ; i < actpblk.nParams ; ++i) { if(!pvals[i].bUsed) continue; if(!strcmp(actpblk.descr[i].name, "server")) { pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "serverport")) { pData->port = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "serverpassword")) { pData->serverpassword = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "template")) { pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "dynakey")) { pData->dynaKey = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "userpush")) { pData->useRPush = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "stream.outField")) { pData->streamOutField = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "stream.keyAck")) { pData->streamKeyAck = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "stream.dynaKeyAck")) { pData->streamDynaKeyAck = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "stream.groupAck")) { pData->streamGroupAck = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "stream.dynaGroupAck")) { pData->streamDynaGroupAck = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "stream.indexAck")) { pData->streamIndexAck = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "stream.dynaIndexAck")) { pData->streamDynaIndexAck = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "stream.capacityLimit")) { pData->streamCapacityLimit = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "stream.ack")) { pData->streamAck = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "stream.del")) { pData->streamDel = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "mode")) { pData->modeDescription = es_str2cstr(pvals[i].val.d.estr, NULL); if (!strcmp(pData->modeDescription, "template")) { pData->mode = OMHIREDIS_MODE_TEMPLATE; } else if (!strcmp(pData->modeDescription, "queue")) { pData->mode = OMHIREDIS_MODE_QUEUE; } else if (!strcmp(pData->modeDescription, "publish")) { pData->mode = OMHIREDIS_MODE_PUBLISH; } else if (!strcmp(pData->modeDescription, "set")) { pData->mode = OMHIREDIS_MODE_SET; } else if (!strcmp(pData->modeDescription, "stream")) { pData->mode = OMHIREDIS_MODE_STREAM; } else { dbgprintf("omhiredis: unsupported mode %s\n", actpblk.descr[i].name); ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); } } else if(!strcmp(actpblk.descr[i].name, "key")) { pData->key = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "expiration")) { pData->expiration = pvals[i].val.d.n; dbgprintf("omhiredis: expiration set to %d\n", pData->expiration); } else { dbgprintf("omhiredis: program error, non-handled " "param '%s'\n", actpblk.descr[i].name); } } dbgprintf("omhiredis: checking config sanity\n"); if (!pData->modeDescription) { dbgprintf("omhiredis: no mode specified, setting it to 'template'\n"); pData->mode = OMHIREDIS_MODE_TEMPLATE; } if (pData->mode == OMHIREDIS_MODE_STREAM && !pData->streamOutField) { LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: no stream.outField set, "\ "using 'msg' as default"); pData->streamOutField = ustrdup("msg"); } if (pData->tplName == NULL) { if(pData->mode == OMHIREDIS_MODE_TEMPLATE) { LogError(0, RS_RET_CONF_PARSE_ERROR, "omhiredis: selected mode requires a template"); ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); } else { CHKmalloc(pData->tplName = ustrdup("RSYSLOG_ForwardFormat")); LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: no template set, "\ "using RSYSLOG_ForwardFormat as default"); } } if (pData->mode != OMHIREDIS_MODE_TEMPLATE && pData->key == NULL) { LogError(0, RS_RET_CONF_PARSE_ERROR, "omhiredis: mode %s requires a key", pData->modeDescription); ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); } if (pData->expiration && pData->mode != OMHIREDIS_MODE_SET) { LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: expiration set but mode is not "\ "'set', expiration will be ignored"); } if (pData->mode != OMHIREDIS_MODE_STREAM) { if (pData->streamOutField) { LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.outField set "\ "but mode is not 'stream', field will be ignored"); } if (pData->streamAck) { LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.ack set "\ "but mode is not 'stream', XACK will be ignored"); } if (pData->streamDel) { LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.del set "\ "but mode is not 'stream', XDEL will be ignored"); } if (pData->streamCapacityLimit) { LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.capacityLimit set "\ "but mode is not 'stream', stream trimming will be ignored"); } if (pData->streamKeyAck) { LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.keyAck set "\ "but mode is not 'stream', parameter will be ignored"); } if (pData->streamDynaKeyAck) { LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.dynaKeyAck set "\ "but mode is not 'stream', parameter will be ignored"); } if (pData->streamGroupAck) { LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.groupAck set "\ "but mode is not 'stream', parameter will be ignored"); } if (pData->streamDynaGroupAck) { LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.dynaGroupAck set "\ "but mode is not 'stream', parameter will be ignored"); } if (pData->streamIndexAck) { LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.indexAck set "\ "but mode is not 'stream', parameter will be ignored"); } if (pData->streamDynaIndexAck) { LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.dynaIndexAck set "\ "but mode is not 'stream', parameter will be ignored"); } } else { if(pData->streamAck) { if(!pData->streamKeyAck || !pData->streamGroupAck || !pData->streamIndexAck) { LogError(0, RS_RET_CONF_PARSE_ERROR, "omhiredis: 'stream.ack' is set but one of "\ "'stream.keyAck', 'stream.groupAck' or 'stream.indexAck' is missing"); ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); } } if(pData->streamDel) { if(!pData->streamKeyAck || !pData->streamIndexAck) { LogError(0, RS_RET_CONF_PARSE_ERROR, "omhiredis: 'stream.del' is set but one of "\ "'stream.keyAck' or 'stream.indexAck' is missing"); ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); } } } if (pData->streamDynaKeyAck && pData->streamKeyAck == NULL) { LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: 'stream.dynaKeyAck' set "\ "but 'stream.keyAck' is empty, disabling"); pData->streamDynaKeyAck = 0; } if (pData->streamDynaGroupAck && pData->streamGroupAck == NULL) { LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: 'stream.dynaGroupAck' set "\ "but 'stream.groupAck' is empty, disabling"); pData->streamDynaGroupAck = 0; } if (pData->streamDynaIndexAck && pData->streamIndexAck == NULL) { LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: 'stream.dynaGroupAck' set "\ "but 'stream.indexAck' is empty, disabling"); pData->streamDynaIndexAck = 0; } iNumTpls = 1; if (pData->dynaKey) { assert(pData->key != NULL); iNumTpls += 1; } if (pData->streamDynaKeyAck) { assert(pData->streamKeyAck != NULL); iNumTpls += 1; } if (pData->streamDynaGroupAck) { assert(pData->streamGroupAck != NULL); iNumTpls += 1; } if (pData->streamDynaIndexAck) { assert(pData->streamIndexAck != NULL); iNumTpls += 1; } CODE_STD_STRING_REQUESTnewActInst(iNumTpls); /* Insert templates in opposite order (keep in sync with doAction), order will be * - tplName * - key * - streamKeyAck * - streamGroupAck * - streamIndexAck */ if (pData->streamDynaIndexAck) { CHKmalloc(strDup = ustrdup(pData->streamIndexAck)); CHKiRet(OMSRsetEntry(*ppOMSR, --iNumTpls, strDup, OMSR_NO_RQD_TPL_OPTS)); strDup = NULL; /* handed over */ } if (pData->streamDynaGroupAck) { CHKmalloc(strDup = ustrdup(pData->streamGroupAck)); CHKiRet(OMSRsetEntry(*ppOMSR, --iNumTpls, strDup, OMSR_NO_RQD_TPL_OPTS)); strDup = NULL; /* handed over */ } if (pData->streamDynaKeyAck) { CHKmalloc(strDup = ustrdup(pData->streamKeyAck)); CHKiRet(OMSRsetEntry(*ppOMSR, --iNumTpls, strDup, OMSR_NO_RQD_TPL_OPTS)); strDup = NULL; /* handed over */ } if (pData->dynaKey) { CHKmalloc(strDup = ustrdup(pData->key)); CHKiRet(OMSRsetEntry(*ppOMSR, --iNumTpls, strDup, OMSR_NO_RQD_TPL_OPTS)); strDup = NULL; /* handed over */ } CHKiRet(OMSRsetEntry(*ppOMSR, --iNumTpls, ustrdup(pData->tplName), OMSR_NO_RQD_TPL_OPTS)); CODE_STD_FINALIZERnewActInst cnfparamvalsDestruct(pvals, &actpblk); free(strDup); ENDnewActInst NO_LEGACY_CONF_parseSelectorAct BEGINmodExit CODESTARTmodExit ENDmodExit /* register our plugin entry points * with the rsyslog core engine */ BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES CODEqueryEtryPt_TXIF_OMOD_QUERIES /* supports transaction interface */ ENDqueryEtryPt /* note we do not support rsyslog v5 syntax */ BEGINmodInit() CODESTARTmodInit *ipIFVersProvided = CURR_MOD_IF_VERSION; /* only supports rsyslog 6 configs */ CODEmodInit_QueryRegCFSLineHdlr INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING); if (!bCoreSupportsBatching) { LogError(0, NO_ERRCODE, "omhiredis: rsyslog core does not support batching - abort"); ABORT_FINALIZE(RS_RET_ERR); } DBGPRINTF("omhiredis: module compiled with rsyslog version %s.\n", VERSION); ENDmodInit