From dcc721a95bef6f0d8e6d8775b8efe33e5aecd562 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Mon, 15 Apr 2024 18:28:20 +0200 Subject: Adding upstream version 8.2402.0. Signed-off-by: Daniel Baumann --- action.c | 2398 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 2398 insertions(+) create mode 100644 action.c (limited to 'action.c') diff --git a/action.c b/action.c new file mode 100644 index 0000000..1f77529 --- /dev/null +++ b/action.c @@ -0,0 +1,2398 @@ +/* action.c + * + * Implementation of the action object. + * + * File begun on 2007-08-06 by RGerhards (extracted from syslogd.c) + * + * Some notes on processing (this hopefully makes it easier to find + * the right code in question): For performance reasons, this module + * uses different methods of message submission based on the user-selected + * configuration. This code is similar, but can not be abstracted because + * of the performance-affecting differences in it. As such, it is often + * necessary to triple-check that everything works well in *all* modes. + * The different modes (and calling sequence) are: + * + * if set iExecEveryNthOccur > 1 || iSecsExecOnceInterval + * - doSubmitToActionQComplex + * handles mark message reduction, but in essence calls + * - actionWriteToAction + * - doSubmitToActionQ + * (now queue engine processing) + * if(pThis->bWriteAllMarkMsgs == RSFALSE) + * - doSubmitToActionQNotAllMark + * - doSubmitToActionQ (and from here like in the else case below!) + * else + * - doSubmitToActionQ + * - qqueueEnqObj + * (now queue engine processing) + * + * Note that bWriteAllMakrMsgs on or off creates almost the same processing. + * The difference ist that if WriteAllMarkMsgs is not set, we need to + * preprocess the batch and drop mark messages which are not yet due for + * writing. + * + * After dequeue, processing is as follows: + * - processBatchMain + * - processMsgMain (direct entry for DIRECT queue!) + * - ... + * + * MORE ON PROCESSING, QUEUES and FILTERING + * All filtering needs to be done BEFORE messages are enqueued to an + * action. In previous code, part of the filtering was done at the + * "remote end" of the action queue, which lead to problems in + * non-direct mode (because then things run asynchronously). In order + * to solve this problem once and for all, I have changed the code so + * that all filtering is done before enq, and processing on the + * dequeue side of action processing now always executes whatever is + * enqueued. This is the only way to handle things consistently and + * (as much as possible) in a queue-type agnostic way. However, it is + * a rather radical change, which I unfortunately needed to make from + * stable version 5.8.1 to 5.8.2. If new problems pop up, you now know + * what may be their cause. In any case, the way it is done now is the + * only correct one. + * A problem is that, under fortunate conditions, we use the current + * batch for the output system as well. This is very good from a performance + * point of view, but makes the distinction between enq and deq side of + * the queue a bit hard. The current idea is that the filter condition + * alone is checked at the deq side of the queue (seems to be unavoidable + * to do it that way), but all other complex conditons (like failover + * handling) go into the computation of the filter condition. For + * non-direct queues, we still enqueue only what is acutally necessary. + * Note that in this case the rest of the code must ensure that the filter + * is set to "true". While this is not perfect and not as simple as + * we would like to see it, it looks like the best way to tackle that + * beast. + * rgerhards, 2011-06-15 + * + * Copyright 2007-2022 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of rsyslog. + * + * Rsyslog is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Rsyslog is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Rsyslog. If not, see . + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + */ +#include "config.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "rsyslog.h" +#include "dirty.h" +#include "template.h" +#include "action.h" +#include "modules.h" +#include "cfsysline.h" +#include "srUtils.h" +#include "errmsg.h" +#include "batch.h" +#include "wti.h" +#include "rsconf.h" +#include "datetime.h" +#include "unicode-helper.h" +#include "atomic.h" +#include "ruleset.h" +#include "parserif.h" +#include "statsobj.h" + +/* AIXPORT : cs renamed to legacy_cs as clashes with libpthreads variable in complete file*/ +#ifdef _AIX +#define cs legacy_cs +#endif + +PRAGMA_INGORE_Wswitch_enum + +#ifndef O_LARGEFILE +#define O_LARGEFILE 0 +#endif + +#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ + +/* forward definitions */ +static rsRetVal ATTR_NONNULL() processBatchMain(void *pVoid, batch_t *pBatch, wti_t * const pWti); +static rsRetVal doSubmitToActionQ(action_t * const pAction, wti_t * const pWti, smsg_t*); +static rsRetVal doSubmitToActionQComplex(action_t * const pAction, wti_t * const pWti, smsg_t*); +static rsRetVal doSubmitToActionQNotAllMark(action_t * const pAction, wti_t * const pWti, smsg_t*); +static void ATTR_NONNULL() actionSuspend(action_t * const pThis, wti_t * const pWti); +static void ATTR_NONNULL() actionRetry(action_t * const pThis, wti_t * const pWti); + +/* object static data (once for all instances) */ +DEFobjCurrIf(obj) +DEFobjCurrIf(datetime) +DEFobjCurrIf(module) +DEFobjCurrIf(statsobj) +DEFobjCurrIf(ruleset) + + +typedef struct configSettings_s { + int bActExecWhenPrevSusp; /* execute action only when previous one was suspended? */ + int bActionWriteAllMarkMsgs; /* should all mark messages be unconditionally written? */ + int iActExecOnceInterval; /* execute action once every nn seconds */ + int iActExecEveryNthOccur; /* execute action every n-th occurrence (0,1=always) */ + time_t iActExecEveryNthOccurTO; /* timeout for n-occurrence setting (in seconds, 0=never) */ + int glbliActionResumeInterval; + int glbliActionResumeRetryCount; /* how often should suspended actions be retried? */ + int bActionRepMsgHasMsg; /* last messsage repeated... has msg fragment in it */ + uchar *pszActionName; /* short name for the action */ + /* action queue and its configuration parameters */ + queueType_t ActionQueType; /* type of the main message queue above */ + int iActionQueueSize; /* size of the main message queue above */ + int iActionQueueDeqBatchSize; /* batch size for action queues */ + int iActionQHighWtrMark; /* high water mark for disk-assisted queues */ + int iActionQLowWtrMark; /* low water mark for disk-assisted queues */ + int iActionQDiscardMark; /* begin to discard messages */ + int iActionQDiscardSeverity; + /* by default, discard nothing to prevent unintentional loss */ + int iActionQueueNumWorkers; /* number of worker threads for the mm queue above */ + uchar *pszActionQFName; /* prefix for the main message queue file */ + int64 iActionQueMaxFileSize; + int iActionQPersistUpdCnt; /* persist queue info every n updates */ + int bActionQSyncQeueFiles; /* sync queue files */ + int iActionQtoQShutdown; /* queue shutdown */ + int iActionQtoActShutdown; /* action shutdown (in phase 2) */ + int iActionQtoEnq; /* timeout for queue enque */ + int iActionQtoWrkShutdown; /* timeout for worker thread shutdown */ + int iActionQWrkMinMsgs; /* minimum messages per worker needed to start a new one */ + int bActionQSaveOnShutdown; /* save queue on shutdown (when DA enabled)? */ + int64 iActionQueMaxDiskSpace; /* max disk space allocated 0 ==> unlimited */ + int iActionQueueDeqSlowdown; /* dequeue slowdown (simple rate limiting) */ + int iActionQueueDeqtWinFromHr; /* hour begin of time frame when queue is to be dequeued */ + int iActionQueueDeqtWinToHr; /* hour begin of time frame when queue is to be dequeued */ +} configSettings_t; + + +static configSettings_t cs; /* our current config settings */ + +/* tables for interfacing with the v6 config system */ +static struct cnfparamdescr cnfparamdescr[] = { + { "name", eCmdHdlrGetWord, 0 }, /* legacy: actionname */ + { "type", eCmdHdlrString, CNFPARAM_REQUIRED }, /* legacy: actionname */ + { "action.errorfile", eCmdHdlrString, 0 }, + { "action.errorfile.maxsize", eCmdHdlrInt, 0 }, + { "action.writeallmarkmessages", eCmdHdlrBinary, 0 }, /* legacy: actionwriteallmarkmessages */ + { "action.execonlyeverynthtime", eCmdHdlrInt, 0 }, /* legacy: actionexeconlyeverynthtime */ + { "action.execonlyeverynthtimetimeout", eCmdHdlrInt, 0 }, /* legacy: actionexeconlyeverynthtimetimeout */ + { "action.execonlyonceeveryinterval", eCmdHdlrInt, 0 }, /* legacy: actionexeconlyonceeveryinterval */ + { "action.execonlywhenpreviousissuspended", eCmdHdlrBinary, 0 }, + /* legacy: actionexeconlywhenpreviousissuspended */ + { "action.repeatedmsgcontainsoriginalmsg", eCmdHdlrBinary, 0 }, /* legacy: repeatedmsgcontainsoriginalmsg */ + { "action.resumeretrycount", eCmdHdlrInt, 0 }, /* legacy: actionresumeretrycount */ + { "action.reportsuspension", eCmdHdlrBinary, 0 }, + { "action.reportsuspensioncontinuation", eCmdHdlrBinary, 0 }, + { "action.resumeintervalmax", eCmdHdlrPositiveInt, 0 }, + { "action.resumeinterval", eCmdHdlrInt, 0 }, + { "action.externalstate.file", eCmdHdlrString, 0 }, + { "action.copymsg", eCmdHdlrBinary, 0 } +}; +static struct cnfparamblk pblk = + { CNFPARAMBLK_VERSION, + sizeof(cnfparamdescr)/sizeof(struct cnfparamdescr), + cnfparamdescr + }; + + +/* primarily a helper for debug purposes, get human-readble name of state */ +/* currently not needed, but may be useful in the future! */ +#if 0 +static const char * +batchState2String(const batch_state_t state) +{ + switch(state) { + case BATCH_STATE_RDY: + return "BATCH_STATE_RDY"; + case BATCH_STATE_BAD: + return "BATCH_STATE_BAD"; + case BATCH_STATE_SUB: + return "BATCH_STATE_SUB"; + case BATCH_STATE_COMM: + return "BATCH_STATE_COMM"; + case BATCH_STATE_DISC: + return "BATCH_STATE_DISC"; + default: + return "ERROR, batch state not known!"; + } +} +#endif // #if 0 + +/* ------------------------------ methods ------------------------------ */ + +/* This function returns the "current" time for this action. Current time + * is not necessarily real-time. In order to enhance performance, current + * system time is obtained the first time an action needs to know the time + * and then kept cached inside the action structure. Later requests will + * always return that very same time. Wile not totally accurate, it is far + * accurate in most cases and considered "acurate enough" for all cases. + * When changing the threading model, please keep in mind that this + * logic needs to be changed should we once allow more than one parallel + * call into the same action (object). As this is currently not supported, + * we simply cache the time inside the action object itself, after it + * is under mutex protection. + * Side-note: the value -1 is used as tActNow, because it also is the + * error return value of time(). So we would do a retry with the next + * invocation if time() failed. Then, of course, we would probably already + * be in trouble, but for the sake of performance we accept this very, + * very slight risk. + * This logic has been added as part of an overall performance improvment + * effort inspired by David Lang. -- rgerhards, 2008-09-16 + * Note: this function does not use the usual iRet call conventions + * because that would provide little to no benefit but complicate things + * a lot. So we simply return the system time. + */ +static time_t +getActNow(action_t * const pThis) +{ + assert(pThis != NULL); + if(pThis->tActNow == -1) { + pThis->tActNow = datetime.GetTime(NULL); /* good time call - the only one done */ + if(pThis->tLastExec > pThis->tActNow) { + /* if we are traveling back in time, reset tLastExec */ + pThis->tLastExec = (time_t) 0; + } + } + + return pThis->tActNow; +} + + +/* resets action queue parameters to their default values. This happens + * after each action has been created in order to prevent any wild defaults + * to be used. It is somewhat against the original spirit of the config file + * reader, but I think it is a good thing to do. + * rgerhards, 2008-01-29 + */ +static rsRetVal +actionResetQueueParams(void) +{ + DEFiRet; + + cs.ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */ + cs.iActionQueueSize = 1000; /* size of the main message queue above */ + cs.iActionQueueDeqBatchSize = 16; /* default batch size */ + cs.iActionQHighWtrMark = -1; /* high water mark for disk-assisted queues */ + cs.iActionQLowWtrMark = -1; /* low water mark for disk-assisted queues */ + cs.iActionQDiscardMark = 980; /* begin to discard messages */ + cs.iActionQDiscardSeverity = 8; /* discard warning and above */ + cs.iActionQueueNumWorkers = 1; /* number of worker threads for the mm queue above */ + cs.iActionQueMaxFileSize = 1024*1024; + cs.iActionQPersistUpdCnt = 0; /* persist queue info every n updates */ + cs.bActionQSyncQeueFiles = 0; + cs.iActionQtoQShutdown = 0; /* queue shutdown */ + cs.iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */ + cs.iActionQtoEnq = 50; /* timeout for queue enque */ + cs.iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */ + cs.iActionQWrkMinMsgs = -1; /* minimum messages per worker needed to start a new one */ + cs.bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ + cs.iActionQueMaxDiskSpace = 0; + cs.iActionQueueDeqSlowdown = 0; + cs.iActionQueueDeqtWinFromHr = 0; + cs.iActionQueueDeqtWinToHr = 25; /* 25 disables time windowed dequeuing */ + + cs.glbliActionResumeRetryCount = 0; /* I guess it is smart to reset this one, too */ + + free(cs.pszActionQFName); + cs.pszActionQFName = NULL; /* prefix for the main message queue file */ + + RETiRet; +} + +/* free action worker data table +*/ +static void freeWrkrDataTable(action_t * const pThis) +{ + int freeSpot; + for(freeSpot = 0; freeSpot < pThis->wrkrDataTableSize; ++freeSpot) { + if(pThis->wrkrDataTable[freeSpot] != NULL) { + pThis->pMod->mod.om.freeWrkrInstance(pThis->wrkrDataTable[freeSpot]); + pThis->wrkrDataTable[freeSpot] = NULL; + } + } + free(pThis->wrkrDataTable); + return; +} + +/* destructs an action descriptor object + * rgerhards, 2007-08-01 + */ +rsRetVal actionDestruct(action_t * const pThis) +{ + DEFiRet; + assert(pThis != NULL); + + if(!strcmp((char*)modGetName(pThis->pMod), "builtin:omdiscard")) { + /* discard actions will be optimized out */ + FINALIZE; + } + + if(pThis->pQueue != NULL) { + qqueueDestruct(&pThis->pQueue); + } + + /* destroy stats object, if we have one (may not always be + * be the case, e.g. if turned off) + */ + if(pThis->statsobj != NULL) + statsobj.Destruct(&pThis->statsobj); + + if(pThis->pModData != NULL) + pThis->pMod->freeInstance(pThis->pModData); + + if(pThis->fdErrFile != -1) + close(pThis->fdErrFile); + pthread_mutex_destroy(&pThis->mutErrFile); + pthread_mutex_destroy(&pThis->mutAction); + pthread_mutex_destroy(&pThis->mutWrkrDataTable); + free((void*)pThis->pszErrFile); + free((void*)pThis->pszExternalStateFile); + free(pThis->pszName); + free(pThis->ppTpl); + free(pThis->peParamPassing); + freeWrkrDataTable(pThis); + +finalize_it: + free(pThis); + RETiRet; +} + + +/* Disable action, this means it will never again be usable + * until rsyslog is reloaded. Use only as a last resort, but + * depends on output module. + * rgerhards, 2007-08-02 + */ +static inline void +actionDisable(action_t *__restrict__ const pThis) +{ + pThis->bDisabled = 1; +} + + + +/* create a new action descriptor object + * rgerhards, 2007-08-01 + * Note that it is vital to set proper initial values as the v6 config + * system depends on these! + */ +rsRetVal actionConstruct(action_t **ppThis) +{ + DEFiRet; + action_t *pThis; + + assert(ppThis != NULL); + + CHKmalloc(pThis = (action_t*) calloc(1, sizeof(action_t))); + pThis->iResumeInterval = 30; + pThis->iResumeIntervalMax = 1800; /* max interval default is half an hour */ + pThis->iResumeRetryCount = 0; + pThis->pszName = NULL; + pThis->pszErrFile = NULL; + pThis->maxErrFileSize = 0; + pThis->currentErrFileSize = 0; + pThis->pszExternalStateFile = NULL; + pThis->fdErrFile = -1; + pThis->bWriteAllMarkMsgs = 1; + pThis->iExecEveryNthOccur = 0; + pThis->iExecEveryNthOccurTO = 0; + pThis->iSecsExecOnceInterval = 0; + pThis->bExecWhenPrevSusp = 0; + pThis->bRepMsgHasMsg = 0; + pThis->bDisabled = 0; + pThis->isTransactional = 0; + pThis->bReportSuspension = -1; /* indicate "not yet set" */ + pThis->bReportSuspensionCont = -1; /* indicate "not yet set" */ + pThis->bCopyMsg = 0; + pThis->tLastOccur = datetime.GetTime(NULL); /* done once per action on startup only */ + pThis->iActionNbr = loadConf->actions.iActionNbr; + pthread_mutex_init(&pThis->mutErrFile, NULL); + pthread_mutex_init(&pThis->mutAction, NULL); + pthread_mutex_init(&pThis->mutWrkrDataTable, NULL); + INIT_ATOMIC_HELPER_MUT(pThis->mutCAS); + + /* indicate we have a new action */ + loadConf->actions.iActionNbr++; + +finalize_it: + *ppThis = pThis; + RETiRet; +} + + +/* action construction finalizer + */ +rsRetVal +actionConstructFinalize(action_t *__restrict__ const pThis, struct nvlst *lst) +{ + DEFiRet; + uchar pszAName[64]; /* friendly name of our action */ + + if(!strcmp((char*)modGetName(pThis->pMod), "builtin:omdiscard")) { + /* discard actions will be optimized out */ + FINALIZE; + } + /* generate a friendly name for us action stats */ + if(pThis->pszName == NULL) { + snprintf((char*) pszAName, sizeof(pszAName), "action-%d-%s", + pThis->iActionNbr, pThis->pMod->pszName); + pThis->pszName = ustrdup(pszAName); + } + + /* cache transactional attribute */ + pThis->isTransactional = pThis->pMod->mod.om.supportsTX; + if(pThis->isTransactional) { + int i; + for(i = 0 ; i < pThis->iNumTpls ; ++i) { + if(pThis->peParamPassing[i] != ACT_STRING_PASSING) { + LogError(0, RS_RET_INVLD_OMOD, "action '%s'(%d) is transactional but " + "parameter %d " + "uses invalid parameter passing mode -- disabling " + "action. This is probably caused by a pre-v7 " + "output module that needs upgrade.", + pThis->pszName, pThis->iActionNbr, i); + actionDisable(pThis); + ABORT_FINALIZE(RS_RET_INVLD_OMOD); + + } + } + } + + + /* support statistics gathering */ + CHKiRet(statsobj.Construct(&pThis->statsobj)); + CHKiRet(statsobj.SetName(pThis->statsobj, pThis->pszName)); + CHKiRet(statsobj.SetOrigin(pThis->statsobj, (uchar*)"core.action")); + + STATSCOUNTER_INIT(pThis->ctrProcessed, pThis->mutCtrProcessed); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("processed"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrProcessed)); + + STATSCOUNTER_INIT(pThis->ctrFail, pThis->mutCtrFail); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("failed"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrFail)); + + STATSCOUNTER_INIT(pThis->ctrSuspend, pThis->mutCtrSuspend); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("suspended"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrSuspend)); + STATSCOUNTER_INIT(pThis->ctrSuspendDuration, pThis->mutCtrSuspendDuration); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("suspended.duration"), + ctrType_IntCtr, 0, &pThis->ctrSuspendDuration)); + + STATSCOUNTER_INIT(pThis->ctrResume, pThis->mutCtrResume); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("resumed"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrResume)); + + CHKiRet(statsobj.ConstructFinalize(pThis->statsobj)); + + /* create our queue */ + + /* generate a friendly name for the queue */ + snprintf((char*) pszAName, sizeof(pszAName), "%s queue", + pThis->pszName); + + /* now check if we can run the action in "firehose mode" during stage one of + * its processing (that is before messages are enqueued into the action q). + * This is only possible if some features, which require strict sequence, are + * not used. Thankfully, that is usually the case. The benefit of firehose + * mode is much faster processing (and simpler code) -- rgerhards, 2010-06-08 + */ + if( pThis->iExecEveryNthOccur > 1 + || pThis->iSecsExecOnceInterval + ) { + DBGPRINTF("info: firehose mode disabled for action because " + "iExecEveryNthOccur=%d, iSecsExecOnceInterval=%d\n", + pThis->iExecEveryNthOccur, pThis->iSecsExecOnceInterval); + pThis->submitToActQ = doSubmitToActionQComplex; + } else if(pThis->bWriteAllMarkMsgs) { + /* full firehose submission mode, default case*/ + pThis->submitToActQ = doSubmitToActionQ; + } else { + /* nearly full-speed submission mode */ + pThis->submitToActQ = doSubmitToActionQNotAllMark; + } + + /* create queue */ + /* action queues always (for now) have just one worker. This may change when + * we begin to implement an interface the enable output modules to request + * to be run on multiple threads. So far, this is forbidden by the interface + * spec. -- rgerhards, 2008-01-30 + */ + CHKiRet(qqueueConstruct(&pThis->pQueue, cs.ActionQueType, 1, cs.iActionQueueSize, + processBatchMain)); + obj.SetName((obj_t*) pThis->pQueue, pszAName); + qqueueSetpAction(pThis->pQueue, pThis); + + if(lst == NULL) { /* use legacy params? */ + /* ... set some properties ... */ +# define setQPROP(func, directive, data) \ + CHKiRet_Hdlr(func(pThis->pQueue, data)) { \ + LogError(0, NO_ERRCODE, "Invalid " #directive ", \ + error %d. Ignored, running with default setting", iRet); \ + } +# define setQPROPstr(func, directive, data) \ + CHKiRet_Hdlr(func(pThis->pQueue, data, (data == NULL)? 0 : strlen((char*) data))) { \ + LogError(0, NO_ERRCODE, "Invalid " #directive ", \ + error %d. Ignored, running with default setting", iRet); \ + } + setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", cs.iActionQueMaxDiskSpace); + setQPROP(qqueueSetiDeqBatchSize, "$ActionQueueDequeueBatchSize", cs.iActionQueueDeqBatchSize); + setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", cs.iActionQueMaxFileSize); + setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", cs.pszActionQFName); + setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", cs.iActionQPersistUpdCnt); + setQPROP(qqueueSetbSyncQueueFiles, "$ActionQueueSyncQueueFiles", cs.bActionQSyncQeueFiles); + setQPROP(qqueueSettoQShutdown, "$ActionQueueTimeoutShutdown", cs.iActionQtoQShutdown ); + setQPROP(qqueueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", cs.iActionQtoActShutdown); + setQPROP(qqueueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", cs.iActionQtoWrkShutdown); + setQPROP(qqueueSettoEnq, "$ActionQueueTimeoutEnqueue", cs.iActionQtoEnq); + setQPROP(qqueueSetiHighWtrMrk, "$ActionQueueHighWaterMark", cs.iActionQHighWtrMark); + setQPROP(qqueueSetiLowWtrMrk, "$ActionQueueLowWaterMark", cs.iActionQLowWtrMark); + setQPROP(qqueueSetiDiscardMrk, "$ActionQueueDiscardMark", cs.iActionQDiscardMark); + setQPROP(qqueueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", cs.iActionQDiscardSeverity); + setQPROP(qqueueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", cs.iActionQWrkMinMsgs); + setQPROP(qqueueSetiNumWorkerThreads, "$ActionQueueWorkerThreads", cs.iActionQueueNumWorkers); + setQPROP(qqueueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", cs.bActionQSaveOnShutdown); + setQPROP(qqueueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", cs.iActionQueueDeqSlowdown); + setQPROP(qqueueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", cs.iActionQueueDeqtWinFromHr); + setQPROP(qqueueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", cs.iActionQueueDeqtWinToHr); + } else { + /* we have v6-style config params */ + qqueueSetDefaultsActionQueue(pThis->pQueue); + qqueueApplyCnfParam(pThis->pQueue, lst); + } + qqueueCorrectParams(pThis->pQueue); + +# undef setQPROP +# undef setQPROPstr + + qqueueDbgPrint(pThis->pQueue); + + DBGPRINTF("Action %p: queue %p created\n", pThis, pThis->pQueue); + + if(pThis->bUsesMsgPassingMode && pThis->pQueue->qType != QUEUETYPE_DIRECT) { + parser_warnmsg("module %s with message passing mode uses " + "non-direct queue. This most probably leads to undesired " + "results. For message modificaton modules (mm*), this means " + "that they will have no effect - " + "see https://www.rsyslog.com/mm-no-queue/", (char*)modGetName(pThis->pMod)); + } + + /* and now reset the queue params (see comment in its function header!) */ + actionResetQueueParams(); + +finalize_it: + RETiRet; +} + + + +/* set the global resume interval + */ +rsRetVal actionSetGlobalResumeInterval(int iNewVal) +{ + cs.glbliActionResumeInterval = iNewVal; + return RS_RET_OK; +} + + +/* returns the action state name in human-readable form + * returned string must not be modified. + * rgerhards, 2009-05-07 + */ +static uchar *getActStateName(action_t * const pThis, wti_t * const pWti) +{ + switch(getActionState(pWti, pThis)) { + case ACT_STATE_RDY: + return (uchar*) "rdy"; + case ACT_STATE_ITX: + return (uchar*) "itx"; + case ACT_STATE_RTRY: + return (uchar*) "rtry"; + case ACT_STATE_SUSP: + return (uchar*) "susp"; + case ACT_STATE_DATAFAIL: + return (uchar*) "datafail"; + default: + return (uchar*) "ERROR/UNKNWON"; + } +} + + +/* returns a suitable return code based on action state + * rgerhards, 2009-05-07 + */ +static rsRetVal getReturnCode(action_t * const pThis, wti_t * const pWti) +{ + DEFiRet; + + switch(getActionState(pWti, pThis)) { + case ACT_STATE_RDY: + iRet = RS_RET_OK; + break; + case ACT_STATE_ITX: + if(pWti->actWrkrInfo[pThis->iActionNbr].bHadAutoCommit) { + pWti->actWrkrInfo[pThis->iActionNbr].bHadAutoCommit = 0; /* auto-reset */ + iRet = RS_RET_PREVIOUS_COMMITTED; + } else { + iRet = RS_RET_DEFER_COMMIT; + } + break; + case ACT_STATE_RTRY: + iRet = RS_RET_SUSPENDED; + break; + case ACT_STATE_SUSP: + iRet = RS_RET_ACTION_FAILED; + break; + case ACT_STATE_DATAFAIL: + iRet = RS_RET_DATAFAIL; + break; + default: + DBGPRINTF("Invalid action engine state %u, program error\n", + getActionState(pWti, pThis)); + iRet = RS_RET_ERR; + break; + } + + RETiRet; +} + + +/* set the action to a new state + * rgerhards, 2007-08-02 + */ +static void +actionSetState(action_t * const pThis, wti_t * const pWti, uint8_t newState) +{ + setActionState(pWti, pThis, newState); + DBGPRINTF("action[%s] transitioned to state: %s\n", + pThis->pszName, getActStateName(pThis, pWti)); +} + +/* Handles the transient commit state. So far, this is + * mostly a dummy... + * rgerhards, 2007-08-02 + */ +static void actionCommitted(action_t * const pThis, wti_t * const pWti) +{ + actionSetState(pThis, pWti, ACT_STATE_RDY); +} + + +/* set action state according to external state file (if configured) +*/ +static rsRetVal ATTR_NONNULL() +checkExternalStateFile(action_t *const pThis, wti_t *const pWti) +{ + char filebuf[1024]; + int fd = -1; + int r; + DEFiRet; + + DBGPRINTF("checking external state file\n"); + + if(pThis->pszExternalStateFile == NULL) { + FINALIZE; + } + + fd = open(pThis->pszExternalStateFile, O_RDONLY|O_CLOEXEC); + if(fd == -1) { + dbgprintf("could not read external state file\n"); + FINALIZE; + } + + r = read(fd, filebuf, sizeof(filebuf) - 1); + if(r < 1) { + dbgprintf("checkExternalStateFile read() returned %d\n", r); + FINALIZE; + } + + filebuf[r] = '\0'; + dbgprintf("external state file content: '%s'\n", filebuf); + /* trim trailing whitespace */ + for(int j = r-1 ; j > 0 ; --j) { + if(filebuf[j] == '\n' || filebuf[j] == '\t' || filebuf[j] == ' ') { + filebuf[j] = '\0'; + } else { + break; + } + } + if(!strcmp(filebuf, "SUSPENDED")) { + LogMsg(0, RS_RET_SUSPENDED, LOG_WARNING, + "action '%s' suspended (module '%s') by external state file", + pThis->pszName, pThis->pMod->pszName); + actionRetry(pThis, pWti); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + +finalize_it: + if(fd != -1) { + close(fd); + } + DBGPRINTF("done checking external state file, iRet=%d\n", iRet); + RETiRet; +} + + +/* we need to defer setting the action's own bReportSuspension state until + * after the full config has been processed. So the most simple case to do + * that is here. It's not a performance problem, as it happens infrequently. + * it's not a threading race problem, as always the same value will be written. + * As we need to do this in several places, we have moved the code to its own + * helper function. + */ +static void +setSuspendMessageConfVars(action_t *__restrict__ const pThis) +{ + if(pThis->bReportSuspension == -1) + pThis->bReportSuspension = runConf->globals.bActionReportSuspension; + if(pThis->bReportSuspensionCont == -1) { + pThis->bReportSuspensionCont = runConf->globals.bActionReportSuspensionCont; + if(pThis->bReportSuspensionCont == -1) + pThis->bReportSuspensionCont = 1; + } +} + + +/* set action to "rtry" state. + * rgerhards, 2007-08-02 + */ +static void ATTR_NONNULL() actionRetry(action_t * const pThis, wti_t * const pWti) +{ + setSuspendMessageConfVars(pThis); + actionSetState(pThis, pWti, ACT_STATE_RTRY); + if(pThis->bReportSuspension) { + LogMsg(0, RS_RET_SUSPENDED, LOG_WARNING, + "action '%s' suspended (module '%s'), retry %d. There should " + "be messages before this one giving the reason for suspension.", + pThis->pszName, pThis->pMod->pszName, + getActionNbrResRtry(pWti, pThis)); + } + incActionResumeInRow(pWti, pThis); +} + +/* Suspend action, this involves changing the action state as well + * as setting the next retry time. + * if we have more than 10 retries, we prolong the + * retry interval. If something is really stalled, it will + * get re-tried only very, very seldom - but that saves + * CPU time. TODO: maybe a config option for that? + * rgerhards, 2007-08-02 + */ +static void ATTR_NONNULL() +actionSuspend(action_t * const pThis, wti_t * const pWti) +{ + time_t ttNow; + int suspendDuration; + char timebuf[32]; + + setSuspendMessageConfVars(pThis); + + /* note: we can NOT use a cached timestamp, as time may have evolved + * since caching, and this would break logic (and it actually did so!) + */ + datetime.GetTime(&ttNow); + suspendDuration = pThis->iResumeInterval * (getActionNbrResRtry(pWti, pThis) / 10 + 1); + if(pThis->iResumeIntervalMax > 0 && suspendDuration > pThis->iResumeIntervalMax) { + suspendDuration = pThis->iResumeIntervalMax; + } + pThis->ttResumeRtry = ttNow + suspendDuration; + actionSetState(pThis, pWti, ACT_STATE_SUSP); + pThis->ctrSuspendDuration += suspendDuration; + if(getActionNbrResRtry(pWti, pThis) == 0) { + STATSCOUNTER_INC(pThis->ctrSuspend, pThis->mutCtrSuspend); + } + + if( pThis->bReportSuspensionCont + || (pThis->bReportSuspension && getActionNbrResRtry(pWti, pThis) == 0) ) { + ctime_r(&pThis->ttResumeRtry, timebuf); + timebuf[strlen(timebuf)-1] = '\0'; /* strip LF */ + LogMsg(0, RS_RET_SUSPENDED, LOG_WARNING, + "action '%s' suspended (module '%s'), next retry is %s, retry nbr %d. " + "There should be messages before this one giving the reason for suspension.", + pThis->pszName, pThis->pMod->pszName, timebuf, + getActionNbrResRtry(pWti, pThis)); + } + DBGPRINTF("action '%s' suspended, earliest retry=%lld (now %lld), iNbrResRtry %d, " + "duration %d\n", + pThis->pszName, (long long) pThis->ttResumeRtry, (long long) ttNow, + getActionNbrResRtry(pWti, pThis), suspendDuration); +} + + +/* actually do retry processing. Note that the function receives a timestamp so + * that we do not need to call the (expensive) time() API. + * Note that we do the full retry processing here, doing the configured number of + * iterations. -- rgerhards, 2009-05-07 + * We need to guard against module which always return RS_RET_OK from their tryResume() + * entry point. This is invalid, but has harsh consequences: it will cause the rsyslog + * engine to go into a tight loop. That obviously is not acceptable. As such, we track the + * count of iterations that a tryResume returning RS_RET_OK is immediately followed by + * an unsuccessful call to doAction(). If that happens more than 10 times, we assume + * the return acutally is a RS_RET_SUSPENDED. In order to go through the various + * resumption stages, we do this for every 10 requests. This magic number 10 may + * not be the most appropriate, but it should be thought of a "if nothing else helps" + * kind of facility: in the first place, the module should return a proper indication + * of its inability to recover. -- rgerhards, 2010-04-26. + */ +static rsRetVal ATTR_NONNULL() +actionDoRetry(action_t * const pThis, wti_t * const pWti) +{ + int iRetries; + int iSleepPeriod; + int bTreatOKasSusp; + DEFiRet; + + assert(pThis != NULL); + + iRetries = 0; + while((*pWti->pbShutdownImmediate == 0) && getActionState(pWti, pThis) == ACT_STATE_RTRY) { + DBGPRINTF("actionDoRetry: %s enter loop, iRetries=%d, ResumeInRow %d\n", + pThis->pszName, iRetries, getActionResumeInRow(pWti, pThis)); + iRet = pThis->pMod->tryResume(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); + DBGPRINTF("actionDoRetry: %s action->tryResume returned %d\n", pThis->pszName, iRet); + if((getActionResumeInRow(pWti, pThis) > 9) && (getActionResumeInRow(pWti, pThis) % 10 == 0)) { + bTreatOKasSusp = 1; + setActionResumeInRow(pWti, pThis, 0); + iRet = RS_RET_SUSPENDED; + } else { + bTreatOKasSusp = 0; + } + if((iRet == RS_RET_OK) && (!bTreatOKasSusp)) { + DBGPRINTF("actionDoRetry: %s had success RDY again (iRet=%d)\n", + pThis->pszName, iRet); + STATSCOUNTER_INC(pThis->ctrResume, pThis->mutCtrResume); + if(pThis->bReportSuspension) { + LogMsg(0, RS_RET_RESUMED, LOG_INFO, "action '%s' " + "resumed (module '%s')", + pThis->pszName, pThis->pMod->pszName); + } + actionSetState(pThis, pWti, ACT_STATE_RDY); + } else if(iRet == RS_RET_SUSPENDED || bTreatOKasSusp) { + /* max retries reached? */ + DBGPRINTF("actionDoRetry: %s check for max retries, iResumeRetryCount " + "%d, iRetries %d\n", + pThis->pszName, pThis->iResumeRetryCount, iRetries); + if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) { + actionSuspend(pThis, pWti); + if(getActionNbrResRtry(pWti, pThis) < 20) + incActionNbrResRtry(pWti, pThis); + } else { + ++iRetries; + iSleepPeriod = pThis->iResumeInterval; + srSleep(iSleepPeriod, 0); + if(*pWti->pbShutdownImmediate) { + ABORT_FINALIZE(RS_RET_FORCE_TERM); + } + } + } else if(iRet == RS_RET_DISABLE_ACTION) { + actionDisable(pThis); + } + } + + if(getActionState(pWti, pThis) == ACT_STATE_RDY) { + setActionNbrResRtry(pWti, pThis, 0); + } + +finalize_it: + RETiRet; +} + + +/* special retry handling if disabled via file: simply wait for the file + * to indicate whether or not it is ready again + */ +static rsRetVal ATTR_NONNULL() +actionDoRetry_extFile(action_t *const pThis, wti_t *const pWti) +{ + int iRetries; + int iSleepPeriod; + DEFiRet; + + assert(pThis != NULL); + + DBGPRINTF("actionDoRetry_extFile: enter, actionState: %d\n",getActionState(pWti, pThis)); + iRetries = 0; + while((*pWti->pbShutdownImmediate == 0) && getActionState(pWti, pThis) == ACT_STATE_RTRY) { + DBGPRINTF("actionDoRetry_extFile: %s enter loop, iRetries=%d, ResumeInRow %d\n", + pThis->pszName, iRetries, getActionResumeInRow(pWti, pThis)); + iRet = checkExternalStateFile(pThis, pWti); + DBGPRINTF("actionDoRetry_extFile: %s checkExternalStateFile returned %d\n", pThis->pszName, iRet); + if(iRet == RS_RET_OK) { + DBGPRINTF("actionDoRetry_extFile: %s had success RDY again (iRet=%d)\n", + pThis->pszName, iRet); + if(pThis->bReportSuspension) { + LogMsg(0, RS_RET_RESUMED, LOG_INFO, "action '%s' " + "resumed (module '%s') via external state file", + pThis->pszName, pThis->pMod->pszName); + } + actionSetState(pThis, pWti, ACT_STATE_RDY); + } else if(iRet == RS_RET_SUSPENDED) { + /* max retries reached? */ + DBGPRINTF("actionDoRetry_extFile: %s check for max retries, iResumeRetryCount " + "%d, iRetries %d\n", + pThis->pszName, pThis->iResumeRetryCount, iRetries); + if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) { + DBGPRINTF("actionDoRetry_extFile: did not work out, suspending\n"); + actionSuspend(pThis, pWti); + pWti->execState.bPrevWasSuspended = 1; + if(getActionNbrResRtry(pWti, pThis) < 20) + incActionNbrResRtry(pWti, pThis); + } else { + ++iRetries; + iSleepPeriod = pThis->iResumeInterval; + srSleep(iSleepPeriod, 0); + if(*pWti->pbShutdownImmediate) { + ABORT_FINALIZE(RS_RET_FORCE_TERM); + } + } + } else if(iRet == RS_RET_DISABLE_ACTION) { + actionDisable(pThis); + } + } + + if(getActionState(pWti, pThis) == ACT_STATE_RDY) { + setActionNbrResRtry(pWti, pThis, 0); + } + +finalize_it: + RETiRet; +} + +static rsRetVal +actionCheckAndCreateWrkrInstance(action_t * const pThis, const wti_t *const pWti) +{ + int locked = 0; + DEFiRet; + if(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData == NULL) { + DBGPRINTF("wti %p: we need to create a new action worker instance for " + "action %d\n", pWti, pThis->iActionNbr); + CHKiRet(pThis->pMod->mod.om.createWrkrInstance(&(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData), + pThis->pModData)); + pWti->actWrkrInfo[pThis->iActionNbr].pAction = pThis; + setActionState(pWti, pThis, ACT_STATE_RDY); /* action is enabled */ + + /* maintain worker data table -- only needed if wrkrHUP is requested! */ + + pthread_mutex_lock(&pThis->mutWrkrDataTable); + locked = 1; + int freeSpot; + for(freeSpot = 0 ; freeSpot < pThis->wrkrDataTableSize ; ++freeSpot) + if(pThis->wrkrDataTable[freeSpot] == NULL) + break; + if(pThis->nWrkr == pThis->wrkrDataTableSize) { + void *const newTable = realloc(pThis->wrkrDataTable, + (pThis->wrkrDataTableSize + 1) * sizeof(void*)); + if(newTable == NULL) { + DBGPRINTF("actionCheckAndCreateWrkrInstance: out of " + "memory realloc wrkrDataTable\n") + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + } + pThis->wrkrDataTable = newTable; + pThis->wrkrDataTableSize++; + } + pThis->wrkrDataTable[freeSpot] = pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData; + pThis->nWrkr++; + DBGPRINTF("wti %p: created action worker instance %d for " + "action %d\n", pWti, pThis->nWrkr, pThis->iActionNbr); + } +finalize_it: + if(locked) { + pthread_mutex_unlock(&pThis->mutWrkrDataTable); + } + RETiRet; +} + +/* try to resume an action -- rgerhards, 2007-08-02 + * changed to new action state engine -- rgerhards, 2009-05-07 + */ +static rsRetVal +actionTryResume(action_t * const pThis, wti_t * const pWti) +{ + DEFiRet; + time_t ttNow = NO_TIME_PROVIDED; + + if(getActionState(pWti, pThis) == ACT_STATE_SUSP) { + /* if we are suspended, we need to check if the timeout expired. + * for this handling, we must always obtain a fresh timestamp. We used + * to use the action timestamp, but in this case we will never reach a + * point where a resumption is actually tried, because the action timestamp + * is always in the past. So we can not avoid doing a fresh time() call + * here. -- rgerhards, 2009-03-18 + */ + datetime.GetTime(&ttNow); /* cache "now" */ + if(ttNow >= pThis->ttResumeRtry) { + actionSetState(pThis, pWti, ACT_STATE_RTRY); /* back to retries */ + } + } + + if(getActionState(pWti, pThis) == ACT_STATE_RTRY) { + CHKiRet(actionDoRetry(pThis, pWti)); + } + + if(Debug && (getActionState(pWti, pThis) == ACT_STATE_RTRY || + getActionState(pWti, pThis) == ACT_STATE_SUSP)) { + if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */ + datetime.GetTime(&ttNow); + dbgprintf("actionTryResume: action[%s] state: %s, next retry (if applicable): %u [now %u]\n", + pThis->pszName, getActStateName(pThis, pWti), + (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); + } + +finalize_it: + RETiRet; +} + + +/* prepare an action for performing work. This involves trying to recover it, + * depending on its current state. + * rgerhards, 2009-05-07 + */ +static rsRetVal ATTR_NONNULL() +actionPrepare(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti) +{ + DEFiRet; + +DBGPRINTF("actionPrepare[%s]: enter\n", pThis->pszName); + CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti)); + CHKiRet(actionTryResume(pThis, pWti)); + + /* if we are now ready, we initialize the transaction and advance + * action state accordingly + */ + if(getActionState(pWti, pThis) == ACT_STATE_RDY) { + iRet = checkExternalStateFile(pThis, pWti); + if(iRet == RS_RET_SUSPENDED) { + DBGPRINTF("actionPrepare[%s]: SUSPENDED via external state file, " + "doing retry processing\n", pThis->pszName); + CHKiRet(actionDoRetry_extFile(pThis, pWti)); + } + iRet = pThis->pMod->mod.om.beginTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); + switch(iRet) { + case RS_RET_OK: + actionSetState(pThis, pWti, ACT_STATE_ITX); + break; + case RS_RET_SUSPENDED: + actionRetry(pThis, pWti); + break; + case RS_RET_DISABLE_ACTION: + actionDisable(pThis); + break; + default:FINALIZE; + } + } + +finalize_it: + RETiRet; +} + + +/* prepare the calling parameters for doAction() + * rgerhards, 2009-05-07 + */ +static rsRetVal +prepareDoActionParams(action_t * __restrict__ const pAction, + wti_t * __restrict__ const pWti, + smsg_t *__restrict__ const pMsg, + struct syslogTime *ttNow) +{ + int i; + struct json_object *json; + actWrkrIParams_t *iparams; + actWrkrInfo_t *__restrict__ pWrkrInfo; + DEFiRet; + + pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]); + if(pAction->isTransactional) { + CHKiRet(wtiNewIParam(pWti, pAction, &iparams)); + for(i = 0 ; i < pAction->iNumTpls ; ++i) { + CHKiRet(tplToString(pAction->ppTpl[i], pMsg, + &actParam(iparams, pAction->iNumTpls, 0, i), + ttNow)); + } + } else { + for(i = 0 ; i < pAction->iNumTpls ; ++i) { + switch(pAction->peParamPassing[i]) { + case ACT_STRING_PASSING: + CHKiRet(tplToString(pAction->ppTpl[i], pMsg, + &(pWrkrInfo->p.nontx.actParams[i]), + ttNow)); + break; + /* note: ARRAY_PASSING mode has been removed in 8.26.0; if it + * is ever needed again, it can be found in 8.25.0. + * rgerhards 2017-03-06 + */ + case ACT_MSG_PASSING: + pWrkrInfo->p.nontx.actParams[i].param = (void*) pMsg; + break; + case ACT_JSON_PASSING: + CHKiRet(tplToJSON(pAction->ppTpl[i], pMsg, &json, ttNow)); + pWrkrInfo->p.nontx.actParams[i].param = (void*) json; + break; + default:dbgprintf("software bug/error: unknown " + "pAction->peParamPassing[%d] %d in prepareDoActionParams\n", + i, (int) pAction->peParamPassing[i]); + break; + } + } + } + +finalize_it: + RETiRet; +} + + +void +releaseDoActionParams(action_t *__restrict__ const pAction, wti_t *__restrict__ const pWti, int action_destruct) +{ + int j; + actWrkrInfo_t *__restrict__ pWrkrInfo; + + pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]); + for(j = 0 ; j < pAction->iNumTpls ; ++j) { + if (action_destruct) { + if (ACT_STRING_PASSING == pAction->peParamPassing[j]) { + free(pWrkrInfo->p.nontx.actParams[j].param); + pWrkrInfo->p.nontx.actParams[j].param = NULL; + pWrkrInfo->p.nontx.actParams[j].lenBuf = 0; + pWrkrInfo->p.nontx.actParams[j].lenStr = 0; + } + } else { + switch(pAction->peParamPassing[j]) { + case ACT_ARRAY_PASSING: + LogError(0, RS_RET_ERR, "plugin error: no longer supported " + "ARRAY_PASSING mode is used (see action.c)"); + return; + case ACT_JSON_PASSING: + json_object_put((struct json_object*) + pWrkrInfo->p.nontx.actParams[j].param); + pWrkrInfo->p.nontx.actParams[j].param = NULL; + pWrkrInfo->p.nontx.actParams[j].lenBuf = 0; + pWrkrInfo->p.nontx.actParams[j].lenStr = 0; + break; + case ACT_STRING_PASSING: + case ACT_MSG_PASSING: + /* no need to do anything with these */ + break; + } + } + } + + return; +} + + +/* This is used in resume processing. We only finally know that a resume + * worked when we have been able to actually process a messages. As such, + * we need to do some cleanup and status tracking in that case. + */ +static void +actionSetActionWorked(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti) +{ + setActionResumeInRow(pWti, pThis, 0); +} + +static rsRetVal +handleActionExecResult(action_t *__restrict__ const pThis, + wti_t *__restrict__ const pWti, + const rsRetVal ret) +{ + DEFiRet; + switch(ret) { + case RS_RET_OK: + actionCommitted(pThis, pWti); + actionSetActionWorked(pThis, pWti); /* we had a successful call! */ + break; + case RS_RET_DEFER_COMMIT: + actionSetActionWorked(pThis, pWti); /* we had a successful call! */ + /* we are done, action state remains the same */ + break; + case RS_RET_PREVIOUS_COMMITTED: + /* action state remains the same, but we had a commit. */ + pWti->actWrkrInfo[pThis->iActionNbr].bHadAutoCommit = 1; + actionSetActionWorked(pThis, pWti); /* we had a successful call! */ + break; + case RS_RET_DISABLE_ACTION: + actionDisable(pThis); + break; + case RS_RET_SUSPENDED: + actionRetry(pThis, pWti); + break; + default:/* error happened - if it hits us here, we assume the message cannot + * be processed but an retry makes no sense. Usually, this should be + * return code RS_RET_DATAFAIL. -- rgerhards, 2017-10-06 + */ + LogError(0, ret, "action '%s' (module '%s') " + "message lost, could not be processed. Check for " + "additional error messages before this one.", + pThis->pszName, pThis->pMod->pszName); + actionSetState(pThis, pWti, ACT_STATE_DATAFAIL); + break; + } + iRet = getReturnCode(pThis, pWti); + + RETiRet; +} + +/* call the DoAction output plugin entry point + * rgerhards, 2008-01-28 + */ +static rsRetVal +actionCallDoAction(action_t *__restrict__ const pThis, + actWrkrIParams_t *__restrict__ const iparams, + wti_t *__restrict__ const pWti) +{ + void *param[CONF_OMOD_NUMSTRINGS_MAXSIZE]; + int i; + DEFiRet; + + DBGPRINTF("entering actionCalldoAction(), state: %s, actionNbr %d\n", + getActStateName(pThis, pWti), pThis->iActionNbr); + + pWti->actWrkrInfo[pThis->iActionNbr].bHadAutoCommit = 0; + /* for this interface, we need to emulate the old style way + * of parameter passing. + */ + for(i = 0 ; i < pThis->iNumTpls ; ++i) { + param[i] = actParam(iparams, pThis->iNumTpls, 0, i).param; + } + + iRet = pThis->pMod->mod.om.doAction(param, + pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); + iRet = handleActionExecResult(pThis, pWti, iRet); + RETiRet; +} + + +/* call the commitTransaction output plugin entry point */ +static rsRetVal ATTR_NONNULL() +actionCallCommitTransaction(action_t * const pThis, + wti_t *const pWti, + actWrkrIParams_t *__restrict__ const iparams, const int nparams) +{ + DEFiRet; + + DBGPRINTF("entering actionCallCommitTransaction[%s], state: %s, nMsgs %u\n", + pThis->pszName, getActStateName(pThis, pWti), nparams); + + iRet = pThis->pMod->mod.om.commitTransaction( + pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData, + iparams, nparams); + DBGPRINTF("actionCallCommitTransaction[%s] state: %s " + "mod commitTransaction returned %d\n", + pThis->pszName, getActStateName(pThis, pWti), iRet); + iRet = handleActionExecResult(pThis, pWti, iRet); + RETiRet; +} + + +/* process a message + * this readies the action and then calls doAction() + * rgerhards, 2008-01-28 + */ +static rsRetVal +actionProcessMessage(action_t * const pThis, void *actParams, wti_t * const pWti) +{ + DEFiRet; + + CHKiRet(actionPrepare(pThis, pWti)); + if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL) + pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pWti->pbShutdownImmediate); + if(getActionState(pWti, pThis) == ACT_STATE_ITX) + CHKiRet(actionCallDoAction(pThis, actParams, pWti)); + + iRet = getReturnCode(pThis, pWti); +finalize_it: + RETiRet; +} + + +/* the following function uses the new-style transactional interface */ +static rsRetVal +doTransaction(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti, + actWrkrIParams_t *__restrict__ const iparams, const int nparams) +{ + actWrkrInfo_t *wrkrInfo; + int i; + DEFiRet; + + wrkrInfo = &(pWti->actWrkrInfo[pThis->iActionNbr]); + if(pThis->pMod->mod.om.commitTransaction != NULL) { + DBGPRINTF("doTransaction: have commitTransaction IF, using that, pWrkrInfo %p\n", wrkrInfo); + CHKiRet(actionCallCommitTransaction(pThis, pWti, iparams, nparams)); + } else { /* note: this branch is for compatibility with old TX modules */ + DBGPRINTF("doTransaction: action '%s', currIParam %d\n", + pThis->pszName, wrkrInfo->p.tx.currIParam); + for(i = 0 ; i < nparams ; ++i) { + /* Note: we provide the message's base iparam - actionProcessMessage() + * uses this as *base* address. + */ + iRet = actionProcessMessage(pThis, + &actParam(iparams, pThis->iNumTpls, i, 0), pWti); + DBGPRINTF("doTransaction: action %d, processing msg %d, result %d\n", + pThis->iActionNbr, i,iRet); + if(iRet == RS_RET_SUSPENDED) { + --i; /* we need to re-submit */ + /* note: we are suspended and need to retry. In order not to + * hammer the CPU, we now do a voluntarly wait of 1 second. + * The rest will be handled by the standard retry handler. + */ + srSleep(1, 0); + } else if(iRet != RS_RET_DEFER_COMMIT && iRet != RS_RET_PREVIOUS_COMMITTED && + iRet != RS_RET_OK) { + FINALIZE; /* let upper peer handle the error condition! */ + } + } + } +finalize_it: + if(iRet == RS_RET_DEFER_COMMIT || iRet == RS_RET_PREVIOUS_COMMITTED) + iRet = RS_RET_OK; /* this is expected for transactional action! */ + RETiRet; +} + + +/* Commit try committing (do not handle retry processing and such) */ +static rsRetVal ATTR_NONNULL() +actionTryCommit(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti, + actWrkrIParams_t *__restrict__ const iparams, const int nparams) +{ + DEFiRet; + + DBGPRINTF("actionTryCommit[%s] enter\n", pThis->pszName); + CHKiRet(actionPrepare(pThis, pWti)); + + CHKiRet(doTransaction(pThis, pWti, iparams, nparams)); + + if(getActionState(pWti, pThis) == ACT_STATE_ITX) { + iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); + switch(iRet) { + case RS_RET_OK: + actionCommitted(pThis, pWti); + break; + case RS_RET_SUSPENDED: + actionRetry(pThis, pWti); + break; + case RS_RET_DISABLE_ACTION: + actionDisable(pThis); + break; + case RS_RET_DEFER_COMMIT: + DBGPRINTF("output plugin error: endTransaction() returns RS_RET_DEFER_COMMIT " + "- ignored\n"); + actionCommitted(pThis, pWti); + break; + case RS_RET_PREVIOUS_COMMITTED: + DBGPRINTF("output plugin error: endTransaction() returns RS_RET_PREVIOUS_COMMITTED " + "- ignored\n"); + actionCommitted(pThis, pWti); + break; + default:/* permanent failure of this message - no sense in retrying. This is + * not yet handled (but easy TODO) + */ + DBGPRINTF("action[%s]: actionTryCommit receveived iRet %d\n", + pThis->pszName, iRet); + FINALIZE; + } + } + iRet = getReturnCode(pThis, pWti); + +finalize_it: + RETiRet; +} + +/* If a transcation failed, we write the error file (if configured). + */ +static void ATTR_NONNULL() +actionWriteErrorFile(action_t *__restrict__ const pThis, const rsRetVal ret, + actWrkrIParams_t *__restrict__ const iparams, const int nparams) +{ + fjson_object *etry=NULL; + int bNeedUnlock = 0; + + STATSCOUNTER_INC(pThis->ctrFail, pThis->mutCtrFail); + + if(pThis->pszErrFile == NULL) { + DBGPRINTF("action %s: commit failed, no error file set, silently " + "discarding %d messages\n", pThis->pszName, nparams); + goto done; + } + + DBGPRINTF("action %d commit failed, writing %u messages (%d tpls) to error file\n", + pThis->iActionNbr, nparams, pThis->iNumTpls); + + pthread_mutex_lock(&pThis->mutErrFile); + bNeedUnlock = 1; + + if(pThis->fdErrFile == -1) { + pThis->fdErrFile = open(pThis->pszErrFile, + O_WRONLY|O_CREAT|O_APPEND|O_LARGEFILE|O_CLOEXEC, + S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP); + if(pThis->fdErrFile == -1) { + LogError(errno, RS_RET_ERR, "action %s: error opening error file %s", + pThis->pszName, pThis->pszErrFile); + goto done; + } + if (pThis->maxErrFileSize > 0) { + struct stat statbuf; + if (fstat(pThis->fdErrFile, &statbuf) == -1) { + LogError(errno, RS_RET_ERR, "failed to fstat %s", pThis->pszErrFile); + goto done; + } + pThis->currentErrFileSize = statbuf.st_size; + } + } + + for(int i = 0 ; i < nparams ; ++i) { + if((etry = fjson_object_new_object()) == NULL) + goto done; + fjson_object_object_add(etry, "action", fjson_object_new_string((char*)pThis->pszName)); + fjson_object_object_add(etry, "status", fjson_object_new_int(ret)); + for(int j = 0 ; j < pThis->iNumTpls ; ++j) { + char tplname[20]; + snprintf(tplname, sizeof(tplname), "template%d", j); + tplname[sizeof(tplname)-1] = '\0'; + fjson_object_object_add(etry, tplname, + fjson_object_new_string((char*)actParam(iparams, 1, i, j).param)); + } + + char *const rendered = strdup((char*)fjson_object_to_json_string(etry)); + if(rendered == NULL) + goto done; + + size_t toWrite = strlen(rendered) + 1; + // Check if need to truncate the amount of bytes to write + if (pThis->maxErrFileSize > 0) { + if (pThis->currentErrFileSize + toWrite > pThis->maxErrFileSize) { + // Truncate to the pending available + toWrite = pThis->maxErrFileSize - pThis->currentErrFileSize; + } + pThis->currentErrFileSize += toWrite; + } + if(toWrite > 0) { + /* note: we use the '\0' inside the string to store a LF - we do not + * otherwise need it and it safes us a copy/realloc. + */ + rendered[toWrite-1] = '\n'; /* NO LONGER A STRING! */ + const ssize_t wrRet = write(pThis->fdErrFile, rendered, toWrite); + if(wrRet != (ssize_t) toWrite) { + LogError(errno, RS_RET_IO_ERROR, + "action %s: error writing errorFile %s, write returned %lld", + pThis->pszName, pThis->pszErrFile, (long long) wrRet); + } + } + free(rendered); + + fjson_object_put(etry); + etry = NULL; + } +done: + if(bNeedUnlock) { + pthread_mutex_unlock(&pThis->mutErrFile); + } + fjson_object_put(etry); + return; +} + + +static rsRetVal +actionTryRemoveHardErrorsFromBatch(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti, + actWrkrIParams_t *const new_iparams, unsigned *new_nMsgs) +{ + actWrkrInfo_t *const wrkrInfo = &(pWti->actWrkrInfo[pThis->iActionNbr]); + const unsigned nMsgs = wrkrInfo->p.tx.currIParam; + actWrkrIParams_t oneParamSet[CONF_OMOD_NUMSTRINGS_MAXSIZE]; + rsRetVal ret; + DEFiRet; + + *new_nMsgs = 0; + for(unsigned i = 0 ; i < nMsgs ; ++i) { + setActionResumeInRow(pWti, pThis, 0); // make sure we do not trigger OK-as-SUSPEND handling + memcpy(&oneParamSet, &actParam(wrkrInfo->p.tx.iparams, pThis->iNumTpls, i, 0), + sizeof(actWrkrIParams_t) * pThis->iNumTpls); + ret = actionTryCommit(pThis, pWti, oneParamSet, 1); + if(ret == RS_RET_SUSPENDED) { + memcpy(new_iparams + (*new_nMsgs * pThis->iNumTpls), &oneParamSet, + sizeof(actWrkrIParams_t) * pThis->iNumTpls); + ++(*new_nMsgs); + } else if(ret != RS_RET_OK) { + actionWriteErrorFile(pThis, ret, oneParamSet, 1); + } + } + RETiRet; +} + +/* Note: we currently need to return an iRet, as this is used in + * direct mode. TODO: However, it may be worth further investigating this, + * as it looks like there is no ultimate consumer of this code. + * rgerhards, 2013-11-06 + */ +static rsRetVal ATTR_NONNULL() +actionCommit(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti) +{ + actWrkrInfo_t *const wrkrInfo = &(pWti->actWrkrInfo[pThis->iActionNbr]); + /* Variables that permit us to override the batch of messages */ + unsigned nMsgs = 0; + actWrkrIParams_t *iparams = NULL; + int needfree_iparams = 0; // work-around for clang static analyzer false positive + DEFiRet; + + DBGPRINTF("actionCommit[%s]: enter, %d msgs\n", pThis->pszName, wrkrInfo->p.tx.currIParam); + if(!pThis->isTransactional || pWti->actWrkrInfo[pThis->iActionNbr].p.tx.currIParam == 0) { + FINALIZE; + } else if(getActionState(pWti, pThis) == ACT_STATE_SUSP) { + /* if we are suspended, we already tried everything to recover the + * action - and failed. So all we can do here is write the error file. + */ + actionWriteErrorFile(pThis, iRet, wrkrInfo->p.tx.iparams, wrkrInfo->p.tx.currIParam); + FINALIZE; + } + DBGPRINTF("actionCommit[%s]: processing...\n", pThis->pszName); + + /* we now do one try at commiting the whole batch. Usually, this will + * succeed. If so, we are happy and done. If not, we dig into the details + * of finding out if we have a non-temporary error and try to handle this + * as well as retry processing. Due to this logic we do a bit more retries + * than configured (if temporary failure), but this unavoidable and should + * do no real harm. - rgerhards, 2017-10-06 + */ + iRet = actionTryCommit(pThis, pWti, wrkrInfo->p.tx.iparams, wrkrInfo->p.tx.currIParam); +DBGPRINTF("actionCommit[%s]: return actionTryCommit %d\n", pThis->pszName, iRet); + if(iRet == RS_RET_OK) { + FINALIZE; + } + + /* check if this was a single-message batch. If it had a datafail error, we + * are done. If it is a multi-message batch, we need to sort out the individual + * message states. + */ + if(wrkrInfo->p.tx.currIParam == 1) { + needfree_iparams = 0; + iparams = wrkrInfo->p.tx.iparams; + nMsgs = wrkrInfo->p.tx.currIParam; + if(iRet == RS_RET_DATAFAIL) { + FINALIZE; + } + } else { + DBGPRINTF("actionCommit[%s]: somewhat unhappy, full batch of %d msgs returned " + "status %d. Trying messages as individual actions.\n", + pThis->pszName, wrkrInfo->p.tx.currIParam, iRet); + CHKmalloc(iparams = malloc(sizeof(actWrkrIParams_t) * pThis->iNumTpls + * wrkrInfo->p.tx.currIParam)); + needfree_iparams = 1; + actionTryRemoveHardErrorsFromBatch(pThis, pWti, iparams, &nMsgs); + } + + if(nMsgs == 0) { + ABORT_FINALIZE(RS_RET_OK); // here, we consider everyting OK + } + + /* We still have some messages with suspend error. So now let's do our + * "regular" retry and suspend processing. + */ + DBGPRINTF("actionCommit[%s]: unhappy, we still have %d uncommitted messages.\n", + pThis->pszName, nMsgs); + int bDone = 0; + do { + iRet = actionTryCommit(pThis, pWti, iparams, nMsgs); + DBGPRINTF("actionCommit[%s]: in retry loop, iRet %d\n", + pThis->pszName, iRet); + if(iRet == RS_RET_FORCE_TERM) { + ABORT_FINALIZE(RS_RET_FORCE_TERM); + } else if(iRet == RS_RET_SUSPENDED) { + iRet = actionDoRetry(pThis, pWti); + DBGPRINTF("actionCommit[%s]: actionDoRetry returned %d\n", + pThis->pszName, iRet); + if(iRet == RS_RET_FORCE_TERM) { + ABORT_FINALIZE(RS_RET_FORCE_TERM); + } else if(iRet != RS_RET_OK) { + actionWriteErrorFile(pThis, iRet, iparams, nMsgs); + bDone = 1; + } + continue; + } else if(iRet == RS_RET_OK || + iRet == RS_RET_SUSPENDED || + iRet == RS_RET_ACTION_FAILED) { + bDone = 1; + } + if(getActionState(pWti, pThis) == ACT_STATE_RDY || + getActionState(pWti, pThis) == ACT_STATE_SUSP) { + bDone = 1; + } + } while(!bDone); +finalize_it: + DBGPRINTF("actionCommit[%s]: done, iRet %d\n", pThis->pszName, iRet); + if(needfree_iparams) { + free(iparams); + } + wrkrInfo->p.tx.currIParam = 0; /* reset to beginning */ + RETiRet; +} + +/* Commit all active transactions in *DIRECT mode* */ +void ATTR_NONNULL() +actionCommitAllDirect(wti_t *__restrict__ const pWti) +{ + int i; + action_t *pAction; + + for(i = 0 ; i < runConf->actions.iActionNbr ; ++i) { + pAction = pWti->actWrkrInfo[i].pAction; + if(pAction == NULL) + continue; + DBGPRINTF("actionCommitAllDirect: action %d, state %u, nbr to commit %d " + "isTransactional %d\n", + i, getActionStateByNbr(pWti, i), pWti->actWrkrInfo->p.tx.currIParam, + pAction->isTransactional); + if(pAction->pQueue->qType == QUEUETYPE_DIRECT) + actionCommit(pAction, pWti); + } +} + +/* process a single message. This is both called if we run from the + * consumer side of an action queue as well as directly from the main + * queue thread if the action queue is set to "direct". + */ +static rsRetVal +processMsgMain(action_t *__restrict__ const pAction, + wti_t *__restrict__ const pWti, + smsg_t *__restrict__ const pMsg, + struct syslogTime *ttNow) +{ + DEFiRet; + + CHKiRet(prepareDoActionParams(pAction, pWti, pMsg, ttNow)); + + if(pAction->isTransactional) { + pWti->actWrkrInfo[pAction->iActionNbr].pAction = pAction; + DBGPRINTF("action '%s': is transactional - executing in commit phase\n", pAction->pszName); + actionPrepare(pAction, pWti); + iRet = getReturnCode(pAction, pWti); + FINALIZE; + } + + iRet = actionProcessMessage(pAction, + pWti->actWrkrInfo[pAction->iActionNbr].p.nontx.actParams, + pWti); + if(pAction->bNeedReleaseBatch) + releaseDoActionParams(pAction, pWti, 0); +finalize_it: + if(iRet == RS_RET_OK) { + if(pWti->execState.bDoAutoCommit) + iRet = actionCommit(pAction, pWti); + } + RETiRet; +} + +/* This entry point is called by the ACTION queue (not main queue!) + */ +static rsRetVal ATTR_NONNULL() +processBatchMain(void *__restrict__ const pVoid, + batch_t *__restrict__ const pBatch, + wti_t *__restrict__ const pWti) +{ + action_t *__restrict__ const pAction = (action_t*__restrict__ const) pVoid; + int i; + struct syslogTime ttNow; + DEFiRet; + + wtiResetExecState(pWti, pBatch); + /* indicate we have not yet read the date */ + ttNow.year = 0; + + for(i = 0 ; i < batchNumMsgs(pBatch) && !*pWti->pbShutdownImmediate ; ++i) { + if(batchIsValidElem(pBatch, i)) { + /* we do not check error state below, because aborting would be + * more harmful than continuing. + */ + rsRetVal localRet = processMsgMain(pAction, pWti, pBatch->pElem[i].pMsg, &ttNow); + DBGPRINTF("processBatchMain: i %d, processMsgMain iRet %d\n", i, localRet); + if( localRet == RS_RET_OK + || localRet == RS_RET_DEFER_COMMIT + || localRet == RS_RET_ACTION_FAILED + || localRet == RS_RET_PREVIOUS_COMMITTED ) { + batchSetElemState(pBatch, i, BATCH_STATE_COMM); + DBGPRINTF("processBatchMain: i %d, COMM state set\n", i); + } + } + } + + iRet = actionCommit(pAction, pWti); + RETiRet; +} + + +/* remove an action worker instance from our table of + * workers. To be called from worker handler (wti). + */ +void +actionRemoveWorker(action_t *const __restrict__ pAction, + void *const __restrict__ actWrkrData) +{ + pthread_mutex_lock(&pAction->mutWrkrDataTable); + pAction->nWrkr--; + for(int w = 0 ; w < pAction->wrkrDataTableSize ; ++w) { + if(pAction->wrkrDataTable[w] == actWrkrData) { + pAction->wrkrDataTable[w] = NULL; + break; /* done */ + } + } + pthread_mutex_unlock(&pAction->mutWrkrDataTable); +} + + +/* call the HUP handler for a given action, if such a handler is defined. + * Note that the action must be able to service HUP requests concurrently + * to any current doAction() processing. + */ +rsRetVal +actionCallHUPHdlr(action_t * const pAction) +{ + DEFiRet; + + assert(pAction != NULL); + DBGPRINTF("Action %p checks HUP hdlr, act level: %p, wrkr level %p\n", + pAction, pAction->pMod->doHUP, pAction->pMod->doHUPWrkr); + + if(pAction->pMod->doHUP != NULL) { + CHKiRet(pAction->pMod->doHUP(pAction->pModData)); + } + + if(pAction->pMod->doHUPWrkr != NULL) { + pthread_mutex_lock(&pAction->mutWrkrDataTable); + for(int i = 0 ; i < pAction->wrkrDataTableSize ; ++i) { + dbgprintf("HUP: table entry %d: %p %s\n", i, + pAction->wrkrDataTable[i], + pAction->wrkrDataTable[i] == NULL ? "[unused]" : ""); + if(pAction->wrkrDataTable[i] != NULL) { + const rsRetVal localRet + = pAction->pMod->doHUPWrkr(pAction->wrkrDataTable[i]); + if(localRet != RS_RET_OK) { + DBGPRINTF("HUP handler returned error state %d - " + "ignored\n", localRet); + } + } + } + pthread_mutex_unlock(&pAction->mutWrkrDataTable); + } + +finalize_it: + RETiRet; +} + + +/* set the action message queue mode + * TODO: probably move this into queue object, merge with MainMsgQueue! + * rgerhards, 2008-01-28 + */ +static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszType) +{ + DEFiRet; + + if (!strcasecmp((char *) pszType, "fixedarray")) { + cs.ActionQueType = QUEUETYPE_FIXED_ARRAY; + DBGPRINTF("action queue type set to FIXED_ARRAY\n"); + } else if (!strcasecmp((char *) pszType, "linkedlist")) { + cs.ActionQueType = QUEUETYPE_LINKEDLIST; + DBGPRINTF("action queue type set to LINKEDLIST\n"); + } else if (!strcasecmp((char *) pszType, "disk")) { + cs.ActionQueType = QUEUETYPE_DISK; + DBGPRINTF("action queue type set to DISK\n"); + } else if (!strcasecmp((char *) pszType, "direct")) { + cs.ActionQueType = QUEUETYPE_DIRECT; + DBGPRINTF("action queue type set to DIRECT (no queueing at all)\n"); + } else { + LogError(0, RS_RET_INVALID_PARAMS, "unknown actionqueue parameter: %s", (char *) pszType); + iRet = RS_RET_INVALID_PARAMS; + } + free(pszType); /* no longer needed */ + + RETiRet; +} + + +/* This submits the message to the action queue in case we do NOT need to handle repeat + * message processing. That case permits us to gain lots of freedom during processing + * and thus speed. This is also utilized to submit messages in more complex cases once + * the complex logic has been applied ;) + * rgerhards, 2010-06-08 + */ +static rsRetVal ATTR_NONNULL() +doSubmitToActionQ(action_t * const pAction, wti_t * const pWti, smsg_t *pMsg) +{ + struct syslogTime ttNow; // TODO: think if we can buffer this in pWti + DEFiRet; + + DBGPRINTF("action '%s': called, logging to %s (susp %d/%d, direct q %d)\n", + pAction->pszName, module.GetStateName(pAction->pMod), + pAction->bExecWhenPrevSusp, pWti->execState.bPrevWasSuspended, + pAction->pQueue->qType == QUEUETYPE_DIRECT); + + if( pAction->bExecWhenPrevSusp + && !pWti->execState.bPrevWasSuspended) { + DBGPRINTF("action '%s': NOT executing, as previous action was " + "not suspended\n", pAction->pszName); + FINALIZE; + } + + STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); + if(pAction->pQueue->qType == QUEUETYPE_DIRECT) { + ttNow.year = 0; + iRet = processMsgMain(pAction, pWti, pMsg, &ttNow); + } else {/* in this case, we do single submits to the queue. + * TODO: optimize this, we may do at least a multi-submit! + */ + iRet = qqueueEnqMsg(pAction->pQueue, eFLOWCTL_NO_DELAY, + pAction->bCopyMsg ? MsgDup(pMsg) : MsgAddRef(pMsg)); + } + pWti->execState.bPrevWasSuspended = + (iRet == RS_RET_SUSPENDED || iRet == RS_RET_ACTION_FAILED); + + if (iRet == RS_RET_ACTION_FAILED) /* Increment failed counter */ + STATSCOUNTER_INC(pAction->ctrFail, pAction->mutCtrFail); + + DBGPRINTF("action '%s': set suspended state to %d\n", + pAction->pszName, pWti->execState.bPrevWasSuspended); + +finalize_it: + RETiRet; +} + + +/* This function builds up a batch of messages to be (later) + * submitted to the action queue. + * Important: this function MUST not be called with messages that are to + * be discarded due to their "prevWasSuspended" state. It will not check for + * this and submit all messages to the queue for execution. So these must + * be filtered out before calling us (what is done currently!). + */ +rsRetVal +actionWriteToAction(action_t * const pAction, smsg_t *pMsg, wti_t * const pWti) +{ + DEFiRet; + + /* first, we check if the action should actually be called. The action-specific + * $ActionExecOnlyEveryNthTime permits us to execute an action only every Nth + * time. So we need to check if we need to drop the (otherwise perfectly executable) + * action for this reason. Note that in case we need to drop it, we return RS_RET_OK + * as the action was properly "passed to execution" from the upper layer's point + * of view. -- rgerhards, 2008-08-07. + */ + if(pAction->iExecEveryNthOccur > 1) { + /* we need to care about multiple occurrences */ + if( pAction->iExecEveryNthOccurTO > 0 + && (getActNow(pAction) - pAction->tLastOccur) > pAction->iExecEveryNthOccurTO) { + DBGPRINTF("n-th occurrence handling timed out (%d sec), restarting from 0\n", + (int) (getActNow(pAction) - pAction->tLastOccur)); + pAction->iNbrNoExec = 0; + pAction->tLastOccur = getActNow(pAction); + } + if(pAction->iNbrNoExec < pAction->iExecEveryNthOccur - 1) { + ++pAction->iNbrNoExec; + DBGPRINTF("action %p passed %d times to execution - less than configured - discarding\n", + pAction, pAction->iNbrNoExec); + FINALIZE; + } else { + pAction->iNbrNoExec = 0; /* we execute the action now, so the number of no execs is down to */ + } + } + + DBGPRINTF("Called action(complex case), logging to %s\n", module.GetStateName(pAction->pMod)); + + /* now check if we need to drop the message because otherwise the action would be too + * frequently called. -- rgerhards, 2008-04-08 + * Note that the check for "pAction->iSecsExecOnceInterval > 0" is not necessary from + * a purely logical point of view. However, if safes us to check the system time in + * (those common) cases where ExecOnceInterval is not used. -- rgerhards, 2008-09-16 + */ + if(pAction->iSecsExecOnceInterval > 0 && + pAction->iSecsExecOnceInterval + pAction->tLastExec > getActNow(pAction)) { + /* in this case we need to discard the message - its not yet time to exec the action */ + DBGPRINTF("action not yet ready again to be executed, onceInterval %d, tCurr %d, tNext %d\n", + (int) pAction->iSecsExecOnceInterval, (int) getActNow(pAction), + (int) (pAction->iSecsExecOnceInterval + pAction->tLastExec)); + FINALIZE; + } + + /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) + * rgerhards, 2008-09-17 */ + pAction->tLastExec = getActNow(pAction); /* re-init time flags */ + pAction->f_time = pMsg->ttGenTime; + + /* When we reach this point, we have a valid, non-disabled action. + * So let's enqueue our message for execution. -- rgerhards, 2007-07-24 + */ + iRet = doSubmitToActionQ(pAction, pWti, pMsg); + +finalize_it: + RETiRet; +} + + +/* Call configured action, most complex case with all features supported (and thus slow). + * rgerhards, 2010-06-08 + */ + +PRAGMA_DIAGNOSTIC_PUSH; +PRAGMA_IGNORE_Wempty_body; +static rsRetVal +doSubmitToActionQComplex(action_t * const pAction, wti_t * const pWti, smsg_t *pMsg) +{ + DEFiRet; + + d_pthread_mutex_lock(&pAction->mutAction); + pthread_cleanup_push(mutexCancelCleanup, &pAction->mutAction); + DBGPRINTF("Called action %p (complex case), logging to %s\n", + pAction, module.GetStateName(pAction->pMod)); + + pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */ + // TODO: can we optimize the "now" handling again (was batch, I guess...)? + + /* don't output marks to recently written outputs */ + if(pAction->bWriteAllMarkMsgs == 0 + && (pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) { + ABORT_FINALIZE(RS_RET_OK); + } + + /* call the output driver */ + iRet = actionWriteToAction(pAction, pMsg, pWti); + +finalize_it: + d_pthread_mutex_unlock(&pAction->mutAction); + pthread_cleanup_pop(0); /* remove mutex cleanup handler */ + + RETiRet; +} +PRAGMA_DIAGNOSTIC_POP + + +/* helper to activateActions, it activates a specific action. + */ +DEFFUNC_llExecFunc(doActivateActions) +{ + rsRetVal localRet; + action_t * const pThis = (action_t*) pData; + localRet = qqueueStart(runConf, pThis->pQueue); + if(localRet != RS_RET_OK) { + if(runConf->globals.bAbortOnFailedQueueStartup) { + fprintf(stderr, "rsyslogd: error %d starting up action queue, " + "abortOnFailedQueueStartup is set, so we abort rsyslog now.", localRet); + fflush(stderr); + exit(1); /* "good" exit, this is intended here */ + } + LogError(0, localRet, "error starting up action queue"); + if(localRet == RS_RET_FILE_PREFIX_MISSING) { + LogError(0, localRet, "file prefix (work directory?) " + "is missing"); + } + actionDisable(pThis); + } + DBGPRINTF("Action %s[%p]: queue %p started\n", modGetName(pThis->pMod), + pThis, pThis->pQueue); + return RS_RET_OK; /* we ignore errors, we can not do anything either way */ +} + + +/* This function "activates" the action after privileges have been dropped. Currently, + * this means that the queues are started. + * rgerhards, 2011-05-02 + */ +rsRetVal +activateActions(void) +{ + DEFiRet; + iRet = ruleset.IterateAllActions(runConf, doActivateActions, NULL); + RETiRet; +} + + + +/* This submits the message to the action queue in case where we need to handle + * bWriteAllMarkMessage == RSFALSE only. Note that we use a non-blocking CAS loop + * for the synchronization. Here, we just modify the filter condition to be false when + * a mark message must not be written. However, in this case we must save the previous + * filter as we may need it in the next action (potential future optimization: check if this is + * the last action TODO). + * rgerhards, 2010-06-08 + */ +static rsRetVal +doSubmitToActionQNotAllMark(action_t * const pAction, wti_t * const pWti, smsg_t * const pMsg) +{ + int doProcess = 1; + time_t lastAct; + DEFiRet; + + /* TODO: think about the whole logic. If messages come in out of order, things + * tend to become a bit unreliable. On the other hand, this only happens if we have + * very high traffic, in which this use case here is not really affected (as the + * MarkInterval is pretty corase). + */ + /* CAS loop, we write back a bit early, but that's OK... */ + /* we use reception time, not dequeue time - this is considered more appropriate and + * also faster ;) -- rgerhards, 2008-09-17 */ + do { + lastAct = pAction->f_time; + if(pMsg->msgFlags & MARK) { + if((pMsg->ttGenTime - lastAct) < MarkInterval / 2) { + doProcess = 0; + DBGPRINTF("action was recently called, ignoring mark message\n"); + break; /* do not update timestamp for non-written mark messages */ + } + } + } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct, + pMsg->ttGenTime, &pAction->mutCAS) == 0); + + if(doProcess) { + DBGPRINTF("Called action(NotAllMark), processing via '%s'\n", + module.GetStateName(pAction->pMod)); + iRet = doSubmitToActionQ(pAction, pWti, pMsg); + } + + RETiRet; +} + + +/* apply all params from param block to action. This supports the v6 config system. + * Defaults must have been set appropriately during action construct! + * rgerhards, 2011-08-01 + */ +static rsRetVal +actionApplyCnfParam(action_t * const pAction, struct cnfparamvals * const pvals) +{ + int i; + + for(i = 0 ; i < pblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(pblk.descr[i].name, "name")) { + pAction->pszName = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(pblk.descr[i].name, "type")) { + continue; /* this is handled seperately during module select! */ + } else if(!strcmp(pblk.descr[i].name, "action.errorfile")) { + pAction->pszErrFile = es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(pblk.descr[i].name, "action.errorfile.maxsize")) { + pAction->maxErrFileSize = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "action.externalstate.file")) { + pAction->pszExternalStateFile = es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(pblk.descr[i].name, "action.writeallmarkmessages")) { + pAction->bWriteAllMarkMsgs = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "action.execonlyeverynthtime")) { + pAction->iExecEveryNthOccur = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "action.execonlyeverynthtimetimeout")) { + pAction->iExecEveryNthOccurTO = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "action.execonlyonceeveryinterval")) { + pAction->iSecsExecOnceInterval = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "action.execonlywhenpreviousissuspended")) { + pAction->bExecWhenPrevSusp = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "action.repeatedmsgcontainsoriginalmsg")) { + pAction->bRepMsgHasMsg = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "action.resumeretrycount")) { + pAction->iResumeRetryCount = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "action.reportsuspension")) { + pAction->bReportSuspension = (int) pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "action.reportsuspensioncontinuation")) { + pAction->bReportSuspensionCont = (int) pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "action.copymsg")) { + pAction->bCopyMsg = (int) pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "action.resumeinterval")) { + pAction->iResumeInterval = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "action.resumeintervalmax")) { + pAction->iResumeIntervalMax = pvals[i].val.d.n; + } else { + dbgprintf("action: program error, non-handled " + "param '%s'\n", pblk.descr[i].name); + } + } + return RS_RET_OK; +} + + +/* add an Action to the current selector + * The pOMSR is freed, as it is not needed after this function. + * Note: this function pulls global data that specifies action config state. + * rgerhards, 2007-07-27 + */ +rsRetVal +addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, + omodStringRequest_t *pOMSR, struct cnfparamvals *actParams, + struct nvlst * const lst) +{ + DEFiRet; + int i; + int iTplOpts; + uchar *pTplName; + action_t *pAction; + char errMsg[512]; + + assert(ppAction != NULL); + assert(pMod != NULL); + assert(pOMSR != NULL); + DBGPRINTF("Module %s processes this action.\n", module.GetName(pMod)); + + CHKiRet(actionConstruct(&pAction)); /* create action object first */ + pAction->pMod = pMod; + pAction->pModData = pModData; + if(actParams == NULL) { /* use legacy systemn */ + pAction->pszName = cs.pszActionName; + pAction->iResumeInterval = cs.glbliActionResumeInterval; + pAction->iResumeRetryCount = cs.glbliActionResumeRetryCount; + pAction->bWriteAllMarkMsgs = cs.bActionWriteAllMarkMsgs; + pAction->bExecWhenPrevSusp = cs.bActExecWhenPrevSusp; + pAction->iSecsExecOnceInterval = cs.iActExecOnceInterval; + pAction->iExecEveryNthOccur = cs.iActExecEveryNthOccur; + pAction->iExecEveryNthOccurTO = cs.iActExecEveryNthOccurTO; + pAction->bRepMsgHasMsg = cs.bActionRepMsgHasMsg; + cs.iActExecEveryNthOccur = 0; /* auto-reset */ + cs.iActExecEveryNthOccurTO = 0; /* auto-reset */ + cs.bActionWriteAllMarkMsgs = 1; /* auto-reset */ + cs.pszActionName = NULL; /* free again! */ + } else { + actionApplyCnfParam(pAction, actParams); + } + + /* check if we can obtain the template pointers - TODO: move to separate function? */ + pAction->iNumTpls = OMSRgetEntryCount(pOMSR); + assert(pAction->iNumTpls >= 0); /* only debug check because this "can not happen" */ + /* please note: iNumTpls may validly be zero. This is the case if the module + * does not request any templates. This sounds unlikely, but an actual example is + * the discard action, which does not require a string. -- rgerhards, 2007-07-30 + */ + if(pAction->iNumTpls > 0) { + /* we first need to create the template arrays */ + CHKmalloc(pAction->ppTpl = (struct template **)calloc(pAction->iNumTpls, sizeof(struct template *))); + CHKmalloc(pAction->peParamPassing = (paramPassing_t*)calloc(pAction->iNumTpls, + sizeof(paramPassing_t))); + } + + pAction->bUsesMsgPassingMode = 0; + pAction->bNeedReleaseBatch = 0; + for(i = 0 ; i < pAction->iNumTpls ; ++i) { + CHKiRet(OMSRgetEntry(pOMSR, i, &pTplName, &iTplOpts)); + /* Ok, we got everything, so it now is time to look up the template + * (Hint: templates MUST be defined before they are used!) + */ + if(!(iTplOpts & OMSR_TPL_AS_MSG)) { + if((pAction->ppTpl[i] = + tplFind(loadConf, (char*)pTplName, strlen((char*)pTplName))) == NULL) { + snprintf(errMsg, sizeof(errMsg), + " Could not find template %d '%s' - action disabled", + i, pTplName); + errno = 0; + LogError(0, RS_RET_NOT_FOUND, "%s", errMsg); + ABORT_FINALIZE(RS_RET_NOT_FOUND); + } + /* check required template options */ + if( (iTplOpts & OMSR_RQD_TPL_OPT_SQL) + && (pAction->ppTpl[i]->optFormatEscape == 0)) { + errno = 0; + LogError(0, RS_RET_RQD_TPLOPT_MISSING, "Action disabled." + " To use this action, you have to specify " + "the SQL or stdSQL option in your template!\n"); + ABORT_FINALIZE(RS_RET_RQD_TPLOPT_MISSING); + } + } + + /* set parameter-passing mode */ + if(iTplOpts & OMSR_TPL_AS_ARRAY) { + ABORT_FINALIZE(RS_RET_ERR); + } else if(iTplOpts & OMSR_TPL_AS_MSG) { + pAction->peParamPassing[i] = ACT_MSG_PASSING; + pAction->bUsesMsgPassingMode = 1; + } else if(iTplOpts & OMSR_TPL_AS_JSON) { + pAction->peParamPassing[i] = ACT_JSON_PASSING; + pAction->bNeedReleaseBatch = 1; + } else { + pAction->peParamPassing[i] = ACT_STRING_PASSING; + } + + DBGPRINTF("template: '%s' assigned\n", pTplName); + } + + pAction->pMod = pMod; + pAction->pModData = pModData; + + CHKiRet(actionConstructFinalize(pAction, lst)); + + *ppAction = pAction; /* finally store the action pointer */ + +finalize_it: + if(iRet == RS_RET_OK) + iRet = OMSRdestruct(pOMSR); + else { + /* do not overwrite error state! */ + OMSRdestruct(pOMSR); + if(pAction != NULL) + actionDestruct(pAction); + } + + RETiRet; +} + + +/* Reset config variables to default values. + * rgerhards, 2009-11-12 + */ +static rsRetVal +resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) +{ + cs.iActExecOnceInterval = 0; + cs.bActExecWhenPrevSusp = 0; + return RS_RET_OK; +} + + +/* initialize (current) config variables. + * Used at program start and when a new scope is created. + */ +static void +initConfigVariables(void) +{ + cs.bActionWriteAllMarkMsgs = 1; + cs.glbliActionResumeRetryCount = 0; + cs.bActExecWhenPrevSusp = 0; + cs.iActExecOnceInterval = 0; + cs.iActExecEveryNthOccur = 0; + cs.iActExecEveryNthOccurTO = 0; + cs.glbliActionResumeInterval = 30; + cs.glbliActionResumeRetryCount = 0; + cs.bActionRepMsgHasMsg = 0; + if(cs.pszActionName != NULL) { + free(cs.pszActionName); + cs.pszActionName = NULL; + } + actionResetQueueParams(); +} + + +rsRetVal +actionNewInst(struct nvlst *lst, action_t **ppAction) +{ + struct cnfparamvals *paramvals; + modInfo_t *pMod; + uchar *cnfModName = NULL; + omodStringRequest_t *pOMSR; + void *pModData; + action_t *pAction; + DEFiRet; + + paramvals = nvlstGetParams(lst, &pblk, NULL); + if(paramvals == NULL) { + ABORT_FINALIZE(RS_RET_PARAM_ERROR); + } + dbgprintf("action param blk after actionNewInst:\n"); + cnfparamsPrint(&pblk, paramvals); + cnfModName = (uchar*)es_str2cstr(paramvals[cnfparamGetIdx(&pblk, ("type"))].val.d.estr, NULL); + if((pMod = module.FindWithCnfName(loadConf, cnfModName, eMOD_OUT)) == NULL) { + LogError(0, RS_RET_MOD_UNKNOWN, "module name '%s' is unknown", cnfModName); + ABORT_FINALIZE(RS_RET_MOD_UNKNOWN); + } + CHKiRet(pMod->mod.om.newActInst(cnfModName, lst, &pModData, &pOMSR)); + + if((iRet = addAction(&pAction, pMod, pModData, pOMSR, paramvals, lst)) == RS_RET_OK) { + /* check if the module is compatible with select features + * (currently no such features exist) */ + loadConf->actions.nbrActions++; /* one more active action! */ + *ppAction = pAction; + } else { + // TODO: cleanup + } + +finalize_it: + free(cnfModName); + cnfparamvalsDestruct(paramvals, &pblk); + RETiRet; +} + +rsRetVal actionClassInit(void) +{ + DEFiRet; + /* request objects we use */ + CHKiRet(objGetObjInterface(&obj)); /* this provides the root pointer for all other queries */ + CHKiRet(objUse(datetime, CORE_COMPONENT)); + CHKiRet(objUse(module, CORE_COMPONENT)); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); + CHKiRet(objUse(ruleset, CORE_COMPONENT)); + + CHKiRet(regCfSysLineHdlr((uchar *)"actionname", 0, eCmdHdlrGetWord, NULL, &cs.pszActionName, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, + &cs.pszActionQFName, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &cs.iActionQueueSize, + NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionwriteallmarkmessages", 0, eCmdHdlrBinary, NULL, + &cs.bActionWriteAllMarkMsgs, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuebatchsize", 0, eCmdHdlrInt, NULL, + &cs.iActionQueueDeqBatchSize, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, + &cs.iActionQueMaxDiskSpace, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, + &cs.iActionQHighWtrMark, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, + &cs.iActionQLowWtrMark, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, + &cs.iActionQDiscardMark, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardseverity", 0, eCmdHdlrInt, NULL, + &cs.iActionQDiscardSeverity, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, + &cs.iActionQPersistUpdCnt, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesyncqueuefiles", 0, eCmdHdlrBinary, NULL, + &cs.bActionQSyncQeueFiles, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetype", 0, eCmdHdlrGetWord, setActionQueType, NULL, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreads", 0, eCmdHdlrInt, NULL, + &cs.iActionQueueNumWorkers, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, + &cs.iActionQtoQShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL, + &cs.iActionQtoActShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, + &cs.iActionQtoEnq, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkertimeoutthreadshutdown", 0, eCmdHdlrInt, NULL, + &cs.iActionQtoWrkShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, + &cs.iActionQWrkMinMsgs, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxfilesize", 0, eCmdHdlrSize, NULL, + &cs.iActionQueMaxFileSize, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, + &cs.bActionQSaveOnShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, + &cs.iActionQueueDeqSlowdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimebegin", 0, eCmdHdlrInt, NULL, + &cs.iActionQueueDeqtWinFromHr, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, + &cs.iActionQueueDeqtWinToHr, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtime", 0, eCmdHdlrInt, NULL, + &cs.iActExecEveryNthOccur, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtimetimeout", 0, eCmdHdlrInt, NULL, + &cs.iActExecEveryNthOccurTO, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyonceeveryinterval", 0, eCmdHdlrInt, NULL, + &cs.iActExecOnceInterval, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgcontainsoriginalmsg", 0, eCmdHdlrBinary, NULL, + &cs.bActionRepMsgHasMsg, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlywhenpreviousissuspended", 0, eCmdHdlrBinary, NULL, + &cs.bActExecWhenPrevSusp, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionresumeretrycount", 0, eCmdHdlrInt, NULL, + &cs.glbliActionResumeRetryCount, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, + resetConfigVariables, NULL, NULL)); + + initConfigVariables(); /* first-time init of config setings */ + +finalize_it: + RETiRet; +} + +/* vi:set ai: + */ -- cgit v1.2.3