/* omhiredis.c
* Copyright 2012 Talksum, Inc
* Copyright 2015 DigitalOcean, Inc
*
* This program is free software: you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
* as published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this program. If not, see
* .
*
* Author: Brian Knox
*
*/
#include "config.h"
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "rsyslog.h"
#include "conf.h"
#include "syslogd-types.h"
#include "srUtils.h"
#include "template.h"
#include "module-template.h"
#include "errmsg.h"
#include "cfsysline.h"
#include "unicode-helper.h"
MODULE_TYPE_OUTPUT
MODULE_TYPE_NOKEEP
MODULE_CNFNAME("omhiredis")
/* internal structures
*/
DEF_OMOD_STATIC_DATA
#define OMHIREDIS_MODE_TEMPLATE 0
#define OMHIREDIS_MODE_QUEUE 1
#define OMHIREDIS_MODE_PUBLISH 2
#define OMHIREDIS_MODE_SET 3
#define OMHIREDIS_MODE_STREAM 4
/* our instance data.
* this will be accessable
* via pData */
typedef struct _instanceData {
uchar *server; /* redis server address */
int port; /* redis port */
uchar *serverpassword; /* redis password */
uchar *tplName; /* template name */
char *modeDescription; /* mode description */
int mode; /* mode constant */
uchar *key; /* key for QUEUE, PUBLISH and STREAM modes */
uchar *streamKeyAck; /* key name for STREAM ACKs (when enabled) */
uchar *streamGroupAck; /* group name for STREAM ACKs (when enabled) */
uchar *streamIndexAck; /* index name for STREAM ACKs (when enabled) */
int expiration; /* expiration value for SET/SETEX mode */
sbool dynaKey; /* Should we treat the key as a template? */
sbool streamDynaKeyAck; /* Should we treat the groupAck as a template? */
sbool streamDynaGroupAck; /* Should we treat the groupAck as a template? */
sbool streamDynaIndexAck; /* Should we treat the IndexAck as a template? */
sbool useRPush; /* Should we use RPUSH instead of LPUSH? */
uchar *streamOutField; /* Field to place message into (for stream insertions only) */
uint streamCapacityLimit; /* zero means stream is not capped (default)
setting a non-zero value ultimately activates the approximate MAXLEN option '~'
(see Redis XADD docs)*/
sbool streamAck; /* Should the module send an XACK for each inserted message?
This feature requires that 3 infos are present in the '$.' object of the log:
- $.redis!stream
- $.redis!group
- $.redis!index
Those 3 infos can either be provided through usage of imhiredis
or set manually with Rainerscript */
sbool streamDel; /* Should the module send an XDEL for each inserted message?
This feature requires that 2 infos are present in the '$.' object of the log:
- $.redis!stream
- $.redis!index
Those 2 infos can either be provided through usage of imhiredis
or set manually with Rainerscript */
} instanceData;
typedef struct wrkrInstanceData {
instanceData *pData; /* instanc data */
redisContext *conn; /* redis connection */
int count; /* count of command sent for current batch */
} wrkrInstanceData_t;
static struct cnfparamdescr actpdescr[] = {
{ "server", eCmdHdlrGetWord, 0 },
{ "serverport", eCmdHdlrInt, 0 },
{ "serverpassword", eCmdHdlrGetWord, 0 },
{ "template", eCmdHdlrGetWord, 0 },
{ "mode", eCmdHdlrGetWord, 0 },
{ "key", eCmdHdlrGetWord, 0 },
{ "expiration", eCmdHdlrInt, 0 },
{ "dynakey", eCmdHdlrBinary, 0 },
{ "userpush", eCmdHdlrBinary, 0 },
{ "stream.outField", eCmdHdlrGetWord, 0 },
{ "stream.capacityLimit", eCmdHdlrNonNegInt, 0 },
{ "stream.ack", eCmdHdlrBinary, 0 },
{ "stream.del", eCmdHdlrBinary, 0 },
{ "stream.keyAck", eCmdHdlrGetWord, 0 },
{ "stream.groupAck", eCmdHdlrGetWord, 0 },
{ "stream.indexAck", eCmdHdlrGetWord, 0 },
{ "stream.dynaKeyAck", eCmdHdlrBinary, 0 },
{ "stream.dynaGroupAck", eCmdHdlrBinary, 0 },
{ "stream.dynaIndexAck", eCmdHdlrBinary, 0 },
};
static struct cnfparamblk actpblk = {
CNFPARAMBLK_VERSION,
sizeof(actpdescr)/sizeof(struct cnfparamdescr),
actpdescr
};
BEGINcreateInstance
CODESTARTcreateInstance
ENDcreateInstance
BEGINcreateWrkrInstance
CODESTARTcreateWrkrInstance
pWrkrData->conn = NULL; /* Connect later */
ENDcreateWrkrInstance
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
if(eFeat == sFEATURERepeatedMsgReduction)
iRet = RS_RET_OK;
ENDisCompatibleWithFeature
/* called when closing */
static void closeHiredis(wrkrInstanceData_t *pWrkrData)
{
if(pWrkrData->conn != NULL) {
redisFree(pWrkrData->conn);
pWrkrData->conn = NULL;
}
}
/* Free our instance data. */
BEGINfreeInstance
CODESTARTfreeInstance
if (pData->server != NULL) {
free(pData->server);
}
free(pData->key);
free(pData->modeDescription);
free(pData->serverpassword);
free(pData->tplName);
free(pData->streamKeyAck);
free(pData->streamGroupAck);
free(pData->streamIndexAck);
free(pData->streamOutField);
ENDfreeInstance
BEGINfreeWrkrInstance
CODESTARTfreeWrkrInstance
closeHiredis(pWrkrData);
ENDfreeWrkrInstance
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
/* nothing special here */
ENDdbgPrintInstInfo
/* establish our connection to redis */
static rsRetVal initHiredis(wrkrInstanceData_t *pWrkrData, int bSilent)
{
char *server;
redisReply *reply = NULL;
DEFiRet;
server = (pWrkrData->pData->server == NULL) ? (char *)"127.0.0.1" :
(char*) pWrkrData->pData->server;
DBGPRINTF("omhiredis: trying connect to '%s' at port %d\n", server,
pWrkrData->pData->port);
struct timeval timeout = { 1, 500000 }; /* 1.5 seconds */
pWrkrData->conn = redisConnectWithTimeout(server, pWrkrData->pData->port,
timeout);
if (pWrkrData->conn->err) {
if(!bSilent)
LogError(0, RS_RET_SUSPENDED,
"can not initialize redis handle");
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
if (pWrkrData->pData->serverpassword != NULL) {
reply = redisCommand(pWrkrData->conn, "AUTH %s", (char*) pWrkrData->pData->serverpassword);
if (reply == NULL) {
DBGPRINTF("omhiredis: could not get reply from AUTH command\n");
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
else if (reply->type == REDIS_REPLY_ERROR) {
LogError(0, NO_ERRCODE, "omhiredis: error while authenticating: %s", reply->str);
ABORT_FINALIZE(RS_RET_ERR);
}
}
finalize_it:
if (iRet != RS_RET_OK && pWrkrData-> conn != NULL) {
redisFree(pWrkrData->conn);
pWrkrData->conn = NULL;
}
if (reply != NULL) freeReplyObject(reply);
RETiRet;
}
static rsRetVal isMaster(wrkrInstanceData_t *pWrkrData) {
DEFiRet;
redisReply *reply = NULL;
assert(pWrkrData->conn != NULL);
reply = redisCommand(pWrkrData->conn, "ROLE");
if (reply == NULL) {
DBGPRINTF("omhiredis: could not get reply from ROLE command\n");
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
else if (reply->type == REDIS_REPLY_ERROR) {
LogMsg(0, RS_RET_REDIS_ERROR, LOG_WARNING, "omhiredis: got an error while querying role -> "
"%s\n", reply->str);
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
else if (reply->type != REDIS_REPLY_ARRAY || reply->element[0]->type != REDIS_REPLY_STRING) {
LogMsg(0, RS_RET_REDIS_ERROR, LOG_ERR, "omhiredis: did not get a proper reply from ROLE command");
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
else {
if (strncmp(reply->element[0]->str, "master", 6)) {
LogMsg(0, RS_RET_OK, LOG_WARNING, "omhiredis: current connected node is not a master");
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
}
finalize_it:
free(reply);
RETiRet;
}
static rsRetVal writeHiredis(uchar* key, uchar *message, wrkrInstanceData_t *pWrkrData)
{
DEFiRet;
int rc, expire;
size_t msgLen;
char *formattedMsg = NULL;
/* if we do not have a redis connection, call
* initHiredis and try to establish one */
if(pWrkrData->conn == NULL)
CHKiRet(initHiredis(pWrkrData, 0));
/* try to append the command to the pipeline.
* REDIS_ERR reply indicates something bad
* happened, in which case abort. otherwise
* increase our current pipeline count
* by 1 and continue. */
switch(pWrkrData->pData->mode) {
case OMHIREDIS_MODE_TEMPLATE:
rc = redisAppendCommand(pWrkrData->conn, (char*)message);
break;
case OMHIREDIS_MODE_QUEUE:
rc = redisAppendCommand(pWrkrData->conn,
pWrkrData->pData->useRPush ? "RPUSH %s %s" : "LPUSH %s %s",
key, (char*)message);
break;
case OMHIREDIS_MODE_PUBLISH:
rc = redisAppendCommand(pWrkrData->conn, "PUBLISH %s %s", key, (char*)message);
break;
case OMHIREDIS_MODE_SET:
expire = pWrkrData->pData->expiration;
if (expire > 0)
msgLen = redisFormatCommand(&formattedMsg, "SETEX %s %d %s", key, expire, message);
else
msgLen = redisFormatCommand(&formattedMsg, "SET %s %s", key, message);
if (msgLen)
rc = redisAppendFormattedCommand(pWrkrData->conn, formattedMsg, msgLen);
else {
dbgprintf("omhiredis: could not append SET command\n");
rc = REDIS_ERR;
}
break;
case OMHIREDIS_MODE_STREAM:
if (pWrkrData->pData->streamCapacityLimit != 0) {
rc = redisAppendCommand(pWrkrData->conn, "XADD %s MAXLEN ~ %d * %s %s",
key,
pWrkrData->pData->streamCapacityLimit,
pWrkrData->pData->streamOutField,
message);
} else {
rc = redisAppendCommand(pWrkrData->conn, "XADD %s * %s %s",
key,
pWrkrData->pData->streamOutField,
message);
}
break;
default:
dbgprintf("omhiredis: mode %d is invalid something is really wrong\n",
pWrkrData->pData->mode);
ABORT_FINALIZE(RS_RET_ERR);
}
if (rc == REDIS_ERR) {
LogError(0, NO_ERRCODE, "omhiredis: %s", pWrkrData->conn->errstr);
dbgprintf("omhiredis: %s\n", pWrkrData->conn->errstr);
ABORT_FINALIZE(RS_RET_ERR);
} else {
pWrkrData->count++;
}
finalize_it:
free(formattedMsg);
RETiRet;
}
static rsRetVal ackHiredisStreamIndex(wrkrInstanceData_t *pWrkrData, uchar *key, uchar *group, uchar *index) {
DEFiRet;
if (REDIS_ERR == redisAppendCommand(pWrkrData->conn, "XACK %s %s %s", key, group, index)) {
LogError(0, NO_ERRCODE, "omhiredis: %s", pWrkrData->conn->errstr);
DBGPRINTF("omhiredis: %s\n", pWrkrData->conn->errstr);
ABORT_FINALIZE(RS_RET_ERR);
} else {
pWrkrData->count++;
}
finalize_it:
RETiRet;
}
static rsRetVal delHiredisStreamIndex(wrkrInstanceData_t *pWrkrData, uchar *key, uchar *index) {
DEFiRet;
if (REDIS_ERR == redisAppendCommand(pWrkrData->conn, "XDEL %s %s", key, index)) {
LogError(0, NO_ERRCODE, "omhiredis: %s", pWrkrData->conn->errstr);
DBGPRINTF("omhiredis: %s\n", pWrkrData->conn->errstr);
ABORT_FINALIZE(RS_RET_ERR);
} else {
pWrkrData->count++;
}
finalize_it:
RETiRet;
}
/* called when resuming from suspended state.
* try to restablish our connection to redis */
BEGINtryResume
CODESTARTtryResume
closeHiredis(pWrkrData);
CHKiRet(initHiredis(pWrkrData, 0));
// Must get a master node for all modes, except 'publish'
if(pWrkrData->pData->mode != OMHIREDIS_MODE_PUBLISH) {
CHKiRet(isMaster(pWrkrData));
}
finalize_it:
ENDtryResume
/* begin a transaction.
* if I decide to use MULTI ... EXEC in the
* future, this block should send the
* MULTI command to redis. */
BEGINbeginTransaction
CODESTARTbeginTransaction
dbgprintf("omhiredis: beginTransaction called\n");
pWrkrData->count = 0;
ENDbeginTransaction
/* call writeHiredis for this log line,
* which appends it as a command to the
* current pipeline */
BEGINdoAction
uchar *message, *key, *keyNameAck, *groupNameAck, *IndexNameAck;
int inputIndex = 0;
CODESTARTdoAction
// Don't change the order of conditions/assignations here without changing the end of the newActInst function!
message = ppString[inputIndex++];
key = pWrkrData->pData->dynaKey ? ppString[inputIndex++] : pWrkrData->pData->key;
keyNameAck = pWrkrData->pData->streamDynaKeyAck ? ppString[inputIndex++] : pWrkrData->pData->streamKeyAck;
groupNameAck = pWrkrData->pData->streamDynaGroupAck ? ppString[inputIndex++] : pWrkrData->pData->streamGroupAck;
IndexNameAck = pWrkrData->pData->streamDynaIndexAck ? ppString[inputIndex++] : pWrkrData->pData->streamIndexAck;
CHKiRet(writeHiredis(key, message, pWrkrData));
if(pWrkrData->pData->streamAck) {
CHKiRet(ackHiredisStreamIndex(pWrkrData, keyNameAck, groupNameAck, IndexNameAck));
}
if(pWrkrData->pData->streamDel) {
CHKiRet(delHiredisStreamIndex(pWrkrData, keyNameAck, IndexNameAck));
}
iRet = RS_RET_DEFER_COMMIT;
finalize_it:
ENDdoAction
/* called when we have reached the end of a
* batch (queue.dequeuebatchsize). this
* iterates over the replies, putting them
* into the pData->replies buffer. we currently
* don't really bother to check for errors
* which should be fixed */
BEGINendTransaction
CODESTARTendTransaction
dbgprintf("omhiredis: endTransaction called\n");
redisReply *reply;
int i;
for ( i = 0; i < pWrkrData->count; i++ ) {
if( REDIS_OK != redisGetReply( pWrkrData->conn, (void*)&reply) || pWrkrData->conn->err ) {
dbgprintf("omhiredis: %s\n", pWrkrData->conn->errstr);
LogError(0, RS_RET_REDIS_ERROR, "Error while processing replies: %s", pWrkrData->conn->errstr);
closeHiredis(pWrkrData);
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
else {
if (reply->type == REDIS_REPLY_ERROR) {
LogError(0, RS_RET_REDIS_ERROR, "Received error from redis -> %s", reply->str);
closeHiredis(pWrkrData);
freeReplyObject(reply);
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
freeReplyObject(reply);
}
}
finalize_it:
ENDendTransaction
/* set defaults. note server is set to NULL
* and is set to a default in initHiredis if
* it is still null when it's called - I should
* probable just set the default here instead */
static void
setInstParamDefaults(instanceData *pData)
{
pData->server = NULL;
pData->port = 6379;
pData->serverpassword = NULL;
pData->tplName = NULL;
pData->mode = OMHIREDIS_MODE_TEMPLATE;
pData->expiration = 0;
pData->modeDescription = NULL;
pData->key = NULL;
pData->dynaKey = 0;
pData->useRPush = 0;
pData->streamOutField = NULL;
pData->streamKeyAck = NULL;
pData->streamDynaKeyAck = 0;
pData->streamGroupAck = NULL;
pData->streamDynaGroupAck = 0;
pData->streamIndexAck = NULL;
pData->streamDynaIndexAck = 0;
pData->streamCapacityLimit = 0;
pData->streamAck = 0;
pData->streamDel = 0;
}
/* here is where the work to set up a new instance
* is done. this reads the config options from
* the rsyslog conf and takes appropriate setup
* actions. */
BEGINnewActInst
struct cnfparamvals *pvals;
int i;
int iNumTpls;
uchar *strDup = NULL;
CODESTARTnewActInst
if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL)
ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
CHKiRet(createInstance(&pData));
setInstParamDefaults(pData);
for(i = 0 ; i < actpblk.nParams ; ++i) {
if(!pvals[i].bUsed)
continue;
if(!strcmp(actpblk.descr[i].name, "server")) {
pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "serverport")) {
pData->port = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "serverpassword")) {
pData->serverpassword = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "template")) {
pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "dynakey")) {
pData->dynaKey = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "userpush")) {
pData->useRPush = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "stream.outField")) {
pData->streamOutField = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "stream.keyAck")) {
pData->streamKeyAck = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "stream.dynaKeyAck")) {
pData->streamDynaKeyAck = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "stream.groupAck")) {
pData->streamGroupAck = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "stream.dynaGroupAck")) {
pData->streamDynaGroupAck = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "stream.indexAck")) {
pData->streamIndexAck = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "stream.dynaIndexAck")) {
pData->streamDynaIndexAck = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "stream.capacityLimit")) {
pData->streamCapacityLimit = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "stream.ack")) {
pData->streamAck = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "stream.del")) {
pData->streamDel = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "mode")) {
pData->modeDescription = es_str2cstr(pvals[i].val.d.estr, NULL);
if (!strcmp(pData->modeDescription, "template")) {
pData->mode = OMHIREDIS_MODE_TEMPLATE;
} else if (!strcmp(pData->modeDescription, "queue")) {
pData->mode = OMHIREDIS_MODE_QUEUE;
} else if (!strcmp(pData->modeDescription, "publish")) {
pData->mode = OMHIREDIS_MODE_PUBLISH;
} else if (!strcmp(pData->modeDescription, "set")) {
pData->mode = OMHIREDIS_MODE_SET;
} else if (!strcmp(pData->modeDescription, "stream")) {
pData->mode = OMHIREDIS_MODE_STREAM;
} else {
dbgprintf("omhiredis: unsupported mode %s\n", actpblk.descr[i].name);
ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
}
} else if(!strcmp(actpblk.descr[i].name, "key")) {
pData->key = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "expiration")) {
pData->expiration = pvals[i].val.d.n;
dbgprintf("omhiredis: expiration set to %d\n", pData->expiration);
} else {
dbgprintf("omhiredis: program error, non-handled "
"param '%s'\n", actpblk.descr[i].name);
}
}
dbgprintf("omhiredis: checking config sanity\n");
if (!pData->modeDescription) {
dbgprintf("omhiredis: no mode specified, setting it to 'template'\n");
pData->mode = OMHIREDIS_MODE_TEMPLATE;
}
if (pData->mode == OMHIREDIS_MODE_STREAM && !pData->streamOutField) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: no stream.outField set, "\
"using 'msg' as default");
pData->streamOutField = ustrdup("msg");
}
if (pData->tplName == NULL) {
if(pData->mode == OMHIREDIS_MODE_TEMPLATE) {
LogError(0, RS_RET_CONF_PARSE_ERROR, "omhiredis: selected mode requires a template");
ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
} else {
CHKmalloc(pData->tplName = ustrdup("RSYSLOG_ForwardFormat"));
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: no template set, "\
"using RSYSLOG_ForwardFormat as default");
}
}
if (pData->mode != OMHIREDIS_MODE_TEMPLATE && pData->key == NULL) {
LogError(0, RS_RET_CONF_PARSE_ERROR,
"omhiredis: mode %s requires a key", pData->modeDescription);
ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
}
if (pData->expiration && pData->mode != OMHIREDIS_MODE_SET) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: expiration set but mode is not "\
"'set', expiration will be ignored");
}
if (pData->mode != OMHIREDIS_MODE_STREAM) {
if (pData->streamOutField) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.outField set "\
"but mode is not 'stream', field will be ignored");
}
if (pData->streamAck) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.ack set "\
"but mode is not 'stream', XACK will be ignored");
}
if (pData->streamDel) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.del set "\
"but mode is not 'stream', XDEL will be ignored");
}
if (pData->streamCapacityLimit) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.capacityLimit set "\
"but mode is not 'stream', stream trimming will be ignored");
}
if (pData->streamKeyAck) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.keyAck set "\
"but mode is not 'stream', parameter will be ignored");
}
if (pData->streamDynaKeyAck) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.dynaKeyAck set "\
"but mode is not 'stream', parameter will be ignored");
}
if (pData->streamGroupAck) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.groupAck set "\
"but mode is not 'stream', parameter will be ignored");
}
if (pData->streamDynaGroupAck) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.dynaGroupAck set "\
"but mode is not 'stream', parameter will be ignored");
}
if (pData->streamIndexAck) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.indexAck set "\
"but mode is not 'stream', parameter will be ignored");
}
if (pData->streamDynaIndexAck) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.dynaIndexAck set "\
"but mode is not 'stream', parameter will be ignored");
}
} else {
if(pData->streamAck) {
if(!pData->streamKeyAck || !pData->streamGroupAck || !pData->streamIndexAck) {
LogError(0, RS_RET_CONF_PARSE_ERROR,
"omhiredis: 'stream.ack' is set but one of "\
"'stream.keyAck', 'stream.groupAck' or 'stream.indexAck' is missing");
ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
}
}
if(pData->streamDel) {
if(!pData->streamKeyAck || !pData->streamIndexAck) {
LogError(0, RS_RET_CONF_PARSE_ERROR,
"omhiredis: 'stream.del' is set but one of "\
"'stream.keyAck' or 'stream.indexAck' is missing");
ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
}
}
}
if (pData->streamDynaKeyAck && pData->streamKeyAck == NULL) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: 'stream.dynaKeyAck' set "\
"but 'stream.keyAck' is empty, disabling");
pData->streamDynaKeyAck = 0;
}
if (pData->streamDynaGroupAck && pData->streamGroupAck == NULL) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: 'stream.dynaGroupAck' set "\
"but 'stream.groupAck' is empty, disabling");
pData->streamDynaGroupAck = 0;
}
if (pData->streamDynaIndexAck && pData->streamIndexAck == NULL) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: 'stream.dynaGroupAck' set "\
"but 'stream.indexAck' is empty, disabling");
pData->streamDynaIndexAck = 0;
}
iNumTpls = 1;
if (pData->dynaKey) {
assert(pData->key != NULL);
iNumTpls += 1;
}
if (pData->streamDynaKeyAck) {
assert(pData->streamKeyAck != NULL);
iNumTpls += 1;
}
if (pData->streamDynaGroupAck) {
assert(pData->streamGroupAck != NULL);
iNumTpls += 1;
}
if (pData->streamDynaIndexAck) {
assert(pData->streamIndexAck != NULL);
iNumTpls += 1;
}
CODE_STD_STRING_REQUESTnewActInst(iNumTpls);
/* Insert templates in opposite order (keep in sync with doAction), order will be
* - tplName
* - key
* - streamKeyAck
* - streamGroupAck
* - streamIndexAck
*/
if (pData->streamDynaIndexAck) {
CHKmalloc(strDup = ustrdup(pData->streamIndexAck));
CHKiRet(OMSRsetEntry(*ppOMSR, --iNumTpls, strDup, OMSR_NO_RQD_TPL_OPTS));
strDup = NULL; /* handed over */
}
if (pData->streamDynaGroupAck) {
CHKmalloc(strDup = ustrdup(pData->streamGroupAck));
CHKiRet(OMSRsetEntry(*ppOMSR, --iNumTpls, strDup, OMSR_NO_RQD_TPL_OPTS));
strDup = NULL; /* handed over */
}
if (pData->streamDynaKeyAck) {
CHKmalloc(strDup = ustrdup(pData->streamKeyAck));
CHKiRet(OMSRsetEntry(*ppOMSR, --iNumTpls, strDup, OMSR_NO_RQD_TPL_OPTS));
strDup = NULL; /* handed over */
}
if (pData->dynaKey) {
CHKmalloc(strDup = ustrdup(pData->key));
CHKiRet(OMSRsetEntry(*ppOMSR, --iNumTpls, strDup, OMSR_NO_RQD_TPL_OPTS));
strDup = NULL; /* handed over */
}
CHKiRet(OMSRsetEntry(*ppOMSR, --iNumTpls, ustrdup(pData->tplName), OMSR_NO_RQD_TPL_OPTS));
CODE_STD_FINALIZERnewActInst
cnfparamvalsDestruct(pvals, &actpblk);
free(strDup);
ENDnewActInst
NO_LEGACY_CONF_parseSelectorAct
BEGINmodExit
CODESTARTmodExit
ENDmodExit
/* register our plugin entry points
* with the rsyslog core engine */
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
CODEqueryEtryPt_TXIF_OMOD_QUERIES /* supports transaction interface */
ENDqueryEtryPt
/* note we do not support rsyslog v5 syntax */
BEGINmodInit()
CODESTARTmodInit
*ipIFVersProvided = CURR_MOD_IF_VERSION; /* only supports rsyslog 6 configs */
CODEmodInit_QueryRegCFSLineHdlr
INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING);
if (!bCoreSupportsBatching) {
LogError(0, NO_ERRCODE, "omhiredis: rsyslog core does not support batching - abort");
ABORT_FINALIZE(RS_RET_ERR);
}
DBGPRINTF("omhiredis: module compiled with rsyslog version %s.\n", VERSION);
ENDmodInit