summaryrefslogtreecommitdiffstats
path: root/contrib/omhiredis/omhiredis.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--contrib/omhiredis/omhiredis.c753
1 files changed, 753 insertions, 0 deletions
diff --git a/contrib/omhiredis/omhiredis.c b/contrib/omhiredis/omhiredis.c
new file mode 100644
index 0000000..eb22ed8
--- /dev/null
+++ b/contrib/omhiredis/omhiredis.c
@@ -0,0 +1,753 @@
+/* omhiredis.c
+* Copyright 2012 Talksum, Inc
+* Copyright 2015 DigitalOcean, Inc
+*
+* This program is free software: you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public License
+* as published by the Free Software Foundation, either version 3 of
+* the License, or (at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful, but
+* WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this program. If not, see
+* <http://www.gnu.org/licenses/>.
+*
+* Author: Brian Knox
+* <bknox@digitalocean.com>
+*/
+
+
+#include "config.h"
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include <ctype.h>
+#include <errno.h>
+#include <assert.h>
+#include <signal.h>
+#include <time.h>
+#include <math.h>
+#include <hiredis/hiredis.h>
+
+#include "rsyslog.h"
+#include "conf.h"
+#include "syslogd-types.h"
+#include "srUtils.h"
+#include "template.h"
+#include "module-template.h"
+#include "errmsg.h"
+#include "cfsysline.h"
+#include "unicode-helper.h"
+
+MODULE_TYPE_OUTPUT
+MODULE_TYPE_NOKEEP
+MODULE_CNFNAME("omhiredis")
+/* internal structures
+ */
+DEF_OMOD_STATIC_DATA
+
+#define OMHIREDIS_MODE_TEMPLATE 0
+#define OMHIREDIS_MODE_QUEUE 1
+#define OMHIREDIS_MODE_PUBLISH 2
+#define OMHIREDIS_MODE_SET 3
+#define OMHIREDIS_MODE_STREAM 4
+
+/* our instance data.
+ * this will be accessable
+ * via pData */
+typedef struct _instanceData {
+ uchar *server; /* redis server address */
+ int port; /* redis port */
+ uchar *serverpassword; /* redis password */
+ uchar *tplName; /* template name */
+ char *modeDescription; /* mode description */
+ int mode; /* mode constant */
+ uchar *key; /* key for QUEUE, PUBLISH and STREAM modes */
+ uchar *streamKeyAck; /* key name for STREAM ACKs (when enabled) */
+ uchar *streamGroupAck; /* group name for STREAM ACKs (when enabled) */
+ uchar *streamIndexAck; /* index name for STREAM ACKs (when enabled) */
+ int expiration; /* expiration value for SET/SETEX mode */
+ sbool dynaKey; /* Should we treat the key as a template? */
+ sbool streamDynaKeyAck; /* Should we treat the groupAck as a template? */
+ sbool streamDynaGroupAck; /* Should we treat the groupAck as a template? */
+ sbool streamDynaIndexAck; /* Should we treat the IndexAck as a template? */
+ sbool useRPush; /* Should we use RPUSH instead of LPUSH? */
+ uchar *streamOutField; /* Field to place message into (for stream insertions only) */
+ uint streamCapacityLimit; /* zero means stream is not capped (default)
+ setting a non-zero value ultimately activates the approximate MAXLEN option '~'
+ (see Redis XADD docs)*/
+ sbool streamAck; /* Should the module send an XACK for each inserted message?
+ This feature requires that 3 infos are present in the '$.' object of the log:
+ - $.redis!stream
+ - $.redis!group
+ - $.redis!index
+ Those 3 infos can either be provided through usage of imhiredis
+ or set manually with Rainerscript */
+ sbool streamDel; /* Should the module send an XDEL for each inserted message?
+ This feature requires that 2 infos are present in the '$.' object of the log:
+ - $.redis!stream
+ - $.redis!index
+ Those 2 infos can either be provided through usage of imhiredis
+ or set manually with Rainerscript */
+
+} instanceData;
+
+typedef struct wrkrInstanceData {
+ instanceData *pData; /* instanc data */
+ redisContext *conn; /* redis connection */
+ int count; /* count of command sent for current batch */
+} wrkrInstanceData_t;
+
+static struct cnfparamdescr actpdescr[] = {
+ { "server", eCmdHdlrGetWord, 0 },
+ { "serverport", eCmdHdlrInt, 0 },
+ { "serverpassword", eCmdHdlrGetWord, 0 },
+ { "template", eCmdHdlrGetWord, 0 },
+ { "mode", eCmdHdlrGetWord, 0 },
+ { "key", eCmdHdlrGetWord, 0 },
+ { "expiration", eCmdHdlrInt, 0 },
+ { "dynakey", eCmdHdlrBinary, 0 },
+ { "userpush", eCmdHdlrBinary, 0 },
+ { "stream.outField", eCmdHdlrGetWord, 0 },
+ { "stream.capacityLimit", eCmdHdlrNonNegInt, 0 },
+ { "stream.ack", eCmdHdlrBinary, 0 },
+ { "stream.del", eCmdHdlrBinary, 0 },
+ { "stream.keyAck", eCmdHdlrGetWord, 0 },
+ { "stream.groupAck", eCmdHdlrGetWord, 0 },
+ { "stream.indexAck", eCmdHdlrGetWord, 0 },
+ { "stream.dynaKeyAck", eCmdHdlrBinary, 0 },
+ { "stream.dynaGroupAck", eCmdHdlrBinary, 0 },
+ { "stream.dynaIndexAck", eCmdHdlrBinary, 0 },
+};
+
+static struct cnfparamblk actpblk = {
+ CNFPARAMBLK_VERSION,
+ sizeof(actpdescr)/sizeof(struct cnfparamdescr),
+ actpdescr
+};
+
+BEGINcreateInstance
+CODESTARTcreateInstance
+ENDcreateInstance
+
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ pWrkrData->conn = NULL; /* Connect later */
+ENDcreateWrkrInstance
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ if(eFeat == sFEATURERepeatedMsgReduction)
+ iRet = RS_RET_OK;
+ENDisCompatibleWithFeature
+
+/* called when closing */
+static void closeHiredis(wrkrInstanceData_t *pWrkrData)
+{
+ if(pWrkrData->conn != NULL) {
+ redisFree(pWrkrData->conn);
+ pWrkrData->conn = NULL;
+ }
+}
+
+/* Free our instance data. */
+BEGINfreeInstance
+CODESTARTfreeInstance
+ if (pData->server != NULL) {
+ free(pData->server);
+ }
+ free(pData->key);
+ free(pData->modeDescription);
+ free(pData->serverpassword);
+ free(pData->tplName);
+ free(pData->streamKeyAck);
+ free(pData->streamGroupAck);
+ free(pData->streamIndexAck);
+ free(pData->streamOutField);
+ENDfreeInstance
+
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ closeHiredis(pWrkrData);
+ENDfreeWrkrInstance
+
+BEGINdbgPrintInstInfo
+CODESTARTdbgPrintInstInfo
+ /* nothing special here */
+ENDdbgPrintInstInfo
+
+/* establish our connection to redis */
+static rsRetVal initHiredis(wrkrInstanceData_t *pWrkrData, int bSilent)
+{
+ char *server;
+ redisReply *reply = NULL;
+ DEFiRet;
+
+ server = (pWrkrData->pData->server == NULL) ? (char *)"127.0.0.1" :
+ (char*) pWrkrData->pData->server;
+ DBGPRINTF("omhiredis: trying connect to '%s' at port %d\n", server,
+ pWrkrData->pData->port);
+
+ struct timeval timeout = { 1, 500000 }; /* 1.5 seconds */
+ pWrkrData->conn = redisConnectWithTimeout(server, pWrkrData->pData->port,
+ timeout);
+ if (pWrkrData->conn->err) {
+ if(!bSilent)
+ LogError(0, RS_RET_SUSPENDED,
+ "can not initialize redis handle");
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
+ }
+
+ if (pWrkrData->pData->serverpassword != NULL) {
+ reply = redisCommand(pWrkrData->conn, "AUTH %s", (char*) pWrkrData->pData->serverpassword);
+ if (reply == NULL) {
+ DBGPRINTF("omhiredis: could not get reply from AUTH command\n");
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
+ }
+ else if (reply->type == REDIS_REPLY_ERROR) {
+ LogError(0, NO_ERRCODE, "omhiredis: error while authenticating: %s", reply->str);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ }
+
+finalize_it:
+ if (iRet != RS_RET_OK && pWrkrData-> conn != NULL) {
+ redisFree(pWrkrData->conn);
+ pWrkrData->conn = NULL;
+ }
+ if (reply != NULL) freeReplyObject(reply);
+ RETiRet;
+}
+
+static rsRetVal isMaster(wrkrInstanceData_t *pWrkrData) {
+ DEFiRet;
+ redisReply *reply = NULL;
+
+ assert(pWrkrData->conn != NULL);
+
+ reply = redisCommand(pWrkrData->conn, "ROLE");
+ if (reply == NULL) {
+ DBGPRINTF("omhiredis: could not get reply from ROLE command\n");
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
+ }
+ else if (reply->type == REDIS_REPLY_ERROR) {
+ LogMsg(0, RS_RET_REDIS_ERROR, LOG_WARNING, "omhiredis: got an error while querying role -> "
+ "%s\n", reply->str);
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
+ }
+ else if (reply->type != REDIS_REPLY_ARRAY || reply->element[0]->type != REDIS_REPLY_STRING) {
+ LogMsg(0, RS_RET_REDIS_ERROR, LOG_ERR, "omhiredis: did not get a proper reply from ROLE command");
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
+ }
+ else {
+ if (strncmp(reply->element[0]->str, "master", 6)) {
+ LogMsg(0, RS_RET_OK, LOG_WARNING, "omhiredis: current connected node is not a master");
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
+ }
+ }
+
+finalize_it:
+ free(reply);
+ RETiRet;
+}
+
+static rsRetVal writeHiredis(uchar* key, uchar *message, wrkrInstanceData_t *pWrkrData)
+{
+ DEFiRet;
+ int rc, expire;
+ size_t msgLen;
+ char *formattedMsg = NULL;
+
+ /* if we do not have a redis connection, call
+ * initHiredis and try to establish one */
+ if(pWrkrData->conn == NULL)
+ CHKiRet(initHiredis(pWrkrData, 0));
+
+ /* try to append the command to the pipeline.
+ * REDIS_ERR reply indicates something bad
+ * happened, in which case abort. otherwise
+ * increase our current pipeline count
+ * by 1 and continue. */
+ switch(pWrkrData->pData->mode) {
+ case OMHIREDIS_MODE_TEMPLATE:
+ rc = redisAppendCommand(pWrkrData->conn, (char*)message);
+ break;
+ case OMHIREDIS_MODE_QUEUE:
+ rc = redisAppendCommand(pWrkrData->conn,
+ pWrkrData->pData->useRPush ? "RPUSH %s %s" : "LPUSH %s %s",
+ key, (char*)message);
+ break;
+ case OMHIREDIS_MODE_PUBLISH:
+ rc = redisAppendCommand(pWrkrData->conn, "PUBLISH %s %s", key, (char*)message);
+ break;
+ case OMHIREDIS_MODE_SET:
+ expire = pWrkrData->pData->expiration;
+
+ if (expire > 0)
+ msgLen = redisFormatCommand(&formattedMsg, "SETEX %s %d %s", key, expire, message);
+ else
+ msgLen = redisFormatCommand(&formattedMsg, "SET %s %s", key, message);
+ if (msgLen)
+ rc = redisAppendFormattedCommand(pWrkrData->conn, formattedMsg, msgLen);
+ else {
+ dbgprintf("omhiredis: could not append SET command\n");
+ rc = REDIS_ERR;
+ }
+ break;
+ case OMHIREDIS_MODE_STREAM:
+ if (pWrkrData->pData->streamCapacityLimit != 0) {
+ rc = redisAppendCommand(pWrkrData->conn, "XADD %s MAXLEN ~ %d * %s %s",
+ key,
+ pWrkrData->pData->streamCapacityLimit,
+ pWrkrData->pData->streamOutField,
+ message);
+ } else {
+ rc = redisAppendCommand(pWrkrData->conn, "XADD %s * %s %s",
+ key,
+ pWrkrData->pData->streamOutField,
+ message);
+ }
+ break;
+ default:
+ dbgprintf("omhiredis: mode %d is invalid something is really wrong\n",
+ pWrkrData->pData->mode);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+
+ if (rc == REDIS_ERR) {
+ LogError(0, NO_ERRCODE, "omhiredis: %s", pWrkrData->conn->errstr);
+ dbgprintf("omhiredis: %s\n", pWrkrData->conn->errstr);
+ ABORT_FINALIZE(RS_RET_ERR);
+ } else {
+ pWrkrData->count++;
+ }
+
+finalize_it:
+ free(formattedMsg);
+ RETiRet;
+}
+
+static rsRetVal ackHiredisStreamIndex(wrkrInstanceData_t *pWrkrData, uchar *key, uchar *group, uchar *index) {
+ DEFiRet;
+
+ if (REDIS_ERR == redisAppendCommand(pWrkrData->conn, "XACK %s %s %s", key, group, index)) {
+ LogError(0, NO_ERRCODE, "omhiredis: %s", pWrkrData->conn->errstr);
+ DBGPRINTF("omhiredis: %s\n", pWrkrData->conn->errstr);
+ ABORT_FINALIZE(RS_RET_ERR);
+ } else {
+ pWrkrData->count++;
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+static rsRetVal delHiredisStreamIndex(wrkrInstanceData_t *pWrkrData, uchar *key, uchar *index) {
+ DEFiRet;
+
+ if (REDIS_ERR == redisAppendCommand(pWrkrData->conn, "XDEL %s %s", key, index)) {
+ LogError(0, NO_ERRCODE, "omhiredis: %s", pWrkrData->conn->errstr);
+ DBGPRINTF("omhiredis: %s\n", pWrkrData->conn->errstr);
+ ABORT_FINALIZE(RS_RET_ERR);
+ } else {
+ pWrkrData->count++;
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+/* called when resuming from suspended state.
+ * try to restablish our connection to redis */
+BEGINtryResume
+CODESTARTtryResume
+ closeHiredis(pWrkrData);
+ CHKiRet(initHiredis(pWrkrData, 0));
+ // Must get a master node for all modes, except 'publish'
+ if(pWrkrData->pData->mode != OMHIREDIS_MODE_PUBLISH) {
+ CHKiRet(isMaster(pWrkrData));
+ }
+finalize_it:
+ENDtryResume
+
+/* begin a transaction.
+ * if I decide to use MULTI ... EXEC in the
+ * future, this block should send the
+ * MULTI command to redis. */
+BEGINbeginTransaction
+CODESTARTbeginTransaction
+ dbgprintf("omhiredis: beginTransaction called\n");
+ pWrkrData->count = 0;
+ENDbeginTransaction
+
+/* call writeHiredis for this log line,
+ * which appends it as a command to the
+ * current pipeline */
+BEGINdoAction
+ uchar *message, *key, *keyNameAck, *groupNameAck, *IndexNameAck;
+ int inputIndex = 0;
+CODESTARTdoAction
+ // Don't change the order of conditions/assignations here without changing the end of the newActInst function!
+ message = ppString[inputIndex++];
+ key = pWrkrData->pData->dynaKey ? ppString[inputIndex++] : pWrkrData->pData->key;
+ keyNameAck = pWrkrData->pData->streamDynaKeyAck ? ppString[inputIndex++] : pWrkrData->pData->streamKeyAck;
+ groupNameAck = pWrkrData->pData->streamDynaGroupAck ? ppString[inputIndex++] : pWrkrData->pData->streamGroupAck;
+ IndexNameAck = pWrkrData->pData->streamDynaIndexAck ? ppString[inputIndex++] : pWrkrData->pData->streamIndexAck;
+
+ CHKiRet(writeHiredis(key, message, pWrkrData));
+
+ if(pWrkrData->pData->streamAck) {
+ CHKiRet(ackHiredisStreamIndex(pWrkrData, keyNameAck, groupNameAck, IndexNameAck));
+ }
+ if(pWrkrData->pData->streamDel) {
+ CHKiRet(delHiredisStreamIndex(pWrkrData, keyNameAck, IndexNameAck));
+ }
+
+ iRet = RS_RET_DEFER_COMMIT;
+finalize_it:
+ENDdoAction
+
+/* called when we have reached the end of a
+ * batch (queue.dequeuebatchsize). this
+ * iterates over the replies, putting them
+ * into the pData->replies buffer. we currently
+ * don't really bother to check for errors
+ * which should be fixed */
+BEGINendTransaction
+CODESTARTendTransaction
+ dbgprintf("omhiredis: endTransaction called\n");
+ redisReply *reply;
+ int i;
+ for ( i = 0; i < pWrkrData->count; i++ ) {
+ if( REDIS_OK != redisGetReply( pWrkrData->conn, (void*)&reply) || pWrkrData->conn->err ) {
+ dbgprintf("omhiredis: %s\n", pWrkrData->conn->errstr);
+ LogError(0, RS_RET_REDIS_ERROR, "Error while processing replies: %s", pWrkrData->conn->errstr);
+ closeHiredis(pWrkrData);
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
+ }
+ else {
+ if (reply->type == REDIS_REPLY_ERROR) {
+ LogError(0, RS_RET_REDIS_ERROR, "Received error from redis -> %s", reply->str);
+ closeHiredis(pWrkrData);
+ freeReplyObject(reply);
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
+ }
+ freeReplyObject(reply);
+ }
+ }
+
+finalize_it:
+ENDendTransaction
+
+/* set defaults. note server is set to NULL
+ * and is set to a default in initHiredis if
+ * it is still null when it's called - I should
+ * probable just set the default here instead */
+static void
+setInstParamDefaults(instanceData *pData)
+{
+ pData->server = NULL;
+ pData->port = 6379;
+ pData->serverpassword = NULL;
+ pData->tplName = NULL;
+ pData->mode = OMHIREDIS_MODE_TEMPLATE;
+ pData->expiration = 0;
+ pData->modeDescription = NULL;
+ pData->key = NULL;
+ pData->dynaKey = 0;
+ pData->useRPush = 0;
+ pData->streamOutField = NULL;
+ pData->streamKeyAck = NULL;
+ pData->streamDynaKeyAck = 0;
+ pData->streamGroupAck = NULL;
+ pData->streamDynaGroupAck = 0;
+ pData->streamIndexAck = NULL;
+ pData->streamDynaIndexAck = 0;
+ pData->streamCapacityLimit = 0;
+ pData->streamAck = 0;
+ pData->streamDel = 0;
+}
+
+/* here is where the work to set up a new instance
+ * is done. this reads the config options from
+ * the rsyslog conf and takes appropriate setup
+ * actions. */
+BEGINnewActInst
+ struct cnfparamvals *pvals;
+ int i;
+ int iNumTpls;
+ uchar *strDup = NULL;
+CODESTARTnewActInst
+ if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL)
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+
+ CHKiRet(createInstance(&pData));
+ setInstParamDefaults(pData);
+
+ for(i = 0 ; i < actpblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+
+ if(!strcmp(actpblk.descr[i].name, "server")) {
+ pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "serverport")) {
+ pData->port = (int) pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "serverpassword")) {
+ pData->serverpassword = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "template")) {
+ pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "dynakey")) {
+ pData->dynaKey = pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "userpush")) {
+ pData->useRPush = pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "stream.outField")) {
+ pData->streamOutField = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "stream.keyAck")) {
+ pData->streamKeyAck = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "stream.dynaKeyAck")) {
+ pData->streamDynaKeyAck = pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "stream.groupAck")) {
+ pData->streamGroupAck = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "stream.dynaGroupAck")) {
+ pData->streamDynaGroupAck = pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "stream.indexAck")) {
+ pData->streamIndexAck = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "stream.dynaIndexAck")) {
+ pData->streamDynaIndexAck = pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "stream.capacityLimit")) {
+ pData->streamCapacityLimit = pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "stream.ack")) {
+ pData->streamAck = pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "stream.del")) {
+ pData->streamDel = pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "mode")) {
+ pData->modeDescription = es_str2cstr(pvals[i].val.d.estr, NULL);
+ if (!strcmp(pData->modeDescription, "template")) {
+ pData->mode = OMHIREDIS_MODE_TEMPLATE;
+ } else if (!strcmp(pData->modeDescription, "queue")) {
+ pData->mode = OMHIREDIS_MODE_QUEUE;
+ } else if (!strcmp(pData->modeDescription, "publish")) {
+ pData->mode = OMHIREDIS_MODE_PUBLISH;
+ } else if (!strcmp(pData->modeDescription, "set")) {
+ pData->mode = OMHIREDIS_MODE_SET;
+ } else if (!strcmp(pData->modeDescription, "stream")) {
+ pData->mode = OMHIREDIS_MODE_STREAM;
+ } else {
+ dbgprintf("omhiredis: unsupported mode %s\n", actpblk.descr[i].name);
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+ } else if(!strcmp(actpblk.descr[i].name, "key")) {
+ pData->key = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "expiration")) {
+ pData->expiration = pvals[i].val.d.n;
+ dbgprintf("omhiredis: expiration set to %d\n", pData->expiration);
+ } else {
+ dbgprintf("omhiredis: program error, non-handled "
+ "param '%s'\n", actpblk.descr[i].name);
+ }
+ }
+
+ dbgprintf("omhiredis: checking config sanity\n");
+
+ if (!pData->modeDescription) {
+ dbgprintf("omhiredis: no mode specified, setting it to 'template'\n");
+ pData->mode = OMHIREDIS_MODE_TEMPLATE;
+ }
+
+ if (pData->mode == OMHIREDIS_MODE_STREAM && !pData->streamOutField) {
+ LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: no stream.outField set, "\
+ "using 'msg' as default");
+ pData->streamOutField = ustrdup("msg");
+ }
+
+ if (pData->tplName == NULL) {
+ if(pData->mode == OMHIREDIS_MODE_TEMPLATE) {
+ LogError(0, RS_RET_CONF_PARSE_ERROR, "omhiredis: selected mode requires a template");
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ } else {
+ CHKmalloc(pData->tplName = ustrdup("RSYSLOG_ForwardFormat"));
+ LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: no template set, "\
+ "using RSYSLOG_ForwardFormat as default");
+ }
+ }
+
+ if (pData->mode != OMHIREDIS_MODE_TEMPLATE && pData->key == NULL) {
+
+ LogError(0, RS_RET_CONF_PARSE_ERROR,
+ "omhiredis: mode %s requires a key", pData->modeDescription);
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ if (pData->expiration && pData->mode != OMHIREDIS_MODE_SET) {
+ LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: expiration set but mode is not "\
+ "'set', expiration will be ignored");
+ }
+
+ if (pData->mode != OMHIREDIS_MODE_STREAM) {
+ if (pData->streamOutField) {
+ LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.outField set "\
+ "but mode is not 'stream', field will be ignored");
+ }
+ if (pData->streamAck) {
+ LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.ack set "\
+ "but mode is not 'stream', XACK will be ignored");
+ }
+ if (pData->streamDel) {
+ LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.del set "\
+ "but mode is not 'stream', XDEL will be ignored");
+ }
+ if (pData->streamCapacityLimit) {
+ LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.capacityLimit set "\
+ "but mode is not 'stream', stream trimming will be ignored");
+ }
+ if (pData->streamKeyAck) {
+ LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.keyAck set "\
+ "but mode is not 'stream', parameter will be ignored");
+ }
+ if (pData->streamDynaKeyAck) {
+ LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.dynaKeyAck set "\
+ "but mode is not 'stream', parameter will be ignored");
+ }
+ if (pData->streamGroupAck) {
+ LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.groupAck set "\
+ "but mode is not 'stream', parameter will be ignored");
+ }
+ if (pData->streamDynaGroupAck) {
+ LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.dynaGroupAck set "\
+ "but mode is not 'stream', parameter will be ignored");
+ }
+ if (pData->streamIndexAck) {
+ LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.indexAck set "\
+ "but mode is not 'stream', parameter will be ignored");
+ }
+ if (pData->streamDynaIndexAck) {
+ LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.dynaIndexAck set "\
+ "but mode is not 'stream', parameter will be ignored");
+ }
+ } else {
+ if(pData->streamAck) {
+ if(!pData->streamKeyAck || !pData->streamGroupAck || !pData->streamIndexAck) {
+ LogError(0, RS_RET_CONF_PARSE_ERROR,
+ "omhiredis: 'stream.ack' is set but one of "\
+ "'stream.keyAck', 'stream.groupAck' or 'stream.indexAck' is missing");
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+ }
+ if(pData->streamDel) {
+ if(!pData->streamKeyAck || !pData->streamIndexAck) {
+ LogError(0, RS_RET_CONF_PARSE_ERROR,
+ "omhiredis: 'stream.del' is set but one of "\
+ "'stream.keyAck' or 'stream.indexAck' is missing");
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+ }
+ }
+
+ if (pData->streamDynaKeyAck && pData->streamKeyAck == NULL) {
+ LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: 'stream.dynaKeyAck' set "\
+ "but 'stream.keyAck' is empty, disabling");
+ pData->streamDynaKeyAck = 0;
+ }
+ if (pData->streamDynaGroupAck && pData->streamGroupAck == NULL) {
+ LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: 'stream.dynaGroupAck' set "\
+ "but 'stream.groupAck' is empty, disabling");
+ pData->streamDynaGroupAck = 0;
+ }
+ if (pData->streamDynaIndexAck && pData->streamIndexAck == NULL) {
+ LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: 'stream.dynaGroupAck' set "\
+ "but 'stream.indexAck' is empty, disabling");
+ pData->streamDynaIndexAck = 0;
+ }
+
+ iNumTpls = 1;
+
+ if (pData->dynaKey) {
+ assert(pData->key != NULL);
+ iNumTpls += 1;
+ }
+ if (pData->streamDynaKeyAck) {
+ assert(pData->streamKeyAck != NULL);
+ iNumTpls += 1;
+ }
+ if (pData->streamDynaGroupAck) {
+ assert(pData->streamGroupAck != NULL);
+ iNumTpls += 1;
+ }
+ if (pData->streamDynaIndexAck) {
+ assert(pData->streamIndexAck != NULL);
+ iNumTpls += 1;
+ }
+ CODE_STD_STRING_REQUESTnewActInst(iNumTpls);
+
+ /* Insert templates in opposite order (keep in sync with doAction), order will be
+ * - tplName
+ * - key
+ * - streamKeyAck
+ * - streamGroupAck
+ * - streamIndexAck
+ */
+ if (pData->streamDynaIndexAck) {
+ CHKmalloc(strDup = ustrdup(pData->streamIndexAck));
+ CHKiRet(OMSRsetEntry(*ppOMSR, --iNumTpls, strDup, OMSR_NO_RQD_TPL_OPTS));
+ strDup = NULL; /* handed over */
+ }
+
+ if (pData->streamDynaGroupAck) {
+ CHKmalloc(strDup = ustrdup(pData->streamGroupAck));
+ CHKiRet(OMSRsetEntry(*ppOMSR, --iNumTpls, strDup, OMSR_NO_RQD_TPL_OPTS));
+ strDup = NULL; /* handed over */
+ }
+
+ if (pData->streamDynaKeyAck) {
+ CHKmalloc(strDup = ustrdup(pData->streamKeyAck));
+ CHKiRet(OMSRsetEntry(*ppOMSR, --iNumTpls, strDup, OMSR_NO_RQD_TPL_OPTS));
+ strDup = NULL; /* handed over */
+ }
+
+ if (pData->dynaKey) {
+ CHKmalloc(strDup = ustrdup(pData->key));
+ CHKiRet(OMSRsetEntry(*ppOMSR, --iNumTpls, strDup, OMSR_NO_RQD_TPL_OPTS));
+ strDup = NULL; /* handed over */
+ }
+
+ CHKiRet(OMSRsetEntry(*ppOMSR, --iNumTpls, ustrdup(pData->tplName), OMSR_NO_RQD_TPL_OPTS));
+
+CODE_STD_FINALIZERnewActInst
+ cnfparamvalsDestruct(pvals, &actpblk);
+ free(strDup);
+ENDnewActInst
+
+
+NO_LEGACY_CONF_parseSelectorAct
+
+
+BEGINmodExit
+CODESTARTmodExit
+ENDmodExit
+
+/* register our plugin entry points
+ * with the rsyslog core engine */
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
+CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+CODEqueryEtryPt_TXIF_OMOD_QUERIES /* supports transaction interface */
+ENDqueryEtryPt
+
+/* note we do not support rsyslog v5 syntax */
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION; /* only supports rsyslog 6 configs */
+CODEmodInit_QueryRegCFSLineHdlr
+ INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING);
+ if (!bCoreSupportsBatching) {
+ LogError(0, NO_ERRCODE, "omhiredis: rsyslog core does not support batching - abort");
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ DBGPRINTF("omhiredis: module compiled with rsyslog version %s.\n", VERSION);
+ENDmodInit